Merge tag 'nfs-rdma-for-4.2' of git://git.linux-nfs.org/projects/anna/nfs-rdma

NFS: NFSoRDMA Client Changes

These patches continue the work of improving the rsize and wsize that the
NFS client uses when talking over RDMA.  In addition, they add scalability
enhancements and other bug fixes.

Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>

* tag 'nfs-rdma-for-4.2' of git://git.linux-nfs.org/projects/anna/nfs-rdma: (142 commits)
  xprtrdma: Reduce per-transport MR allocation
  xprtrdma: Stack relief in fmr_op_map()
  xprtrdma: Split rb_lock
  xprtrdma: Remove rpcrdma_ia::ri_memreg_strategy
  xprtrdma: Remove ->ro_reset
  xprtrdma: Remove unused LOCAL_INV recovery logic
  xprtrdma: Acquire MRs in rpcrdma_register_external()
  xprtrdma: Introduce an FRMR recovery workqueue
  xprtrdma: Acquire FMRs in rpcrdma_fmr_register_external()
  xprtrdma: Introduce helpers for allocating MWs
  xprtrdma: Use ib_device pointer safely
  xprtrdma: Remove rr_func
  xprtrdma: Replace rpcrdma_rep::rr_buffer with rr_rxprt
  xprtrdma: Warn when there are orphaned IB objects
  ...
diff --git a/fs/nfs/callback.c b/fs/nfs/callback.c
index 8d129bb..682529c 100644
--- a/fs/nfs/callback.c
+++ b/fs/nfs/callback.c
@@ -458,7 +458,7 @@
  * pg_authenticate method for nfsv4 callback threads.
  *
  * The authflavor has been negotiated, so an incorrect flavor is a server
- * bug. Drop packets with incorrect authflavor.
+ * bug. Deny packets with incorrect authflavor.
  *
  * All other checking done after NFS decoding where the nfs_client can be
  * found in nfs4_callback_compound
@@ -468,12 +468,12 @@
 	switch (rqstp->rq_authop->flavour) {
 	case RPC_AUTH_NULL:
 		if (rqstp->rq_proc != CB_NULL)
-			return SVC_DROP;
+			return SVC_DENIED;
 		break;
 	case RPC_AUTH_GSS:
 		/* No RPC_AUTH_GSS support yet in NFSv4.1 */
 		 if (svc_is_backchannel(rqstp))
-			return SVC_DROP;
+			return SVC_DENIED;
 	}
 	return SVC_OK;
 }
diff --git a/fs/nfs/callback_xdr.c b/fs/nfs/callback_xdr.c
index 19ca95c..6b1697a 100644
--- a/fs/nfs/callback_xdr.c
+++ b/fs/nfs/callback_xdr.c
@@ -909,7 +909,7 @@
 	xdr_init_encode(&xdr_out, &rqstp->rq_res, p);
 
 	status = decode_compound_hdr_arg(&xdr_in, &hdr_arg);
-	if (status == __constant_htonl(NFS4ERR_RESOURCE))
+	if (status == htonl(NFS4ERR_RESOURCE))
 		return rpc_garbage_args;
 
 	if (hdr_arg.minorversion == 0) {
diff --git a/fs/nfs/client.c b/fs/nfs/client.c
index 892aeff..1e37491 100644
--- a/fs/nfs/client.c
+++ b/fs/nfs/client.c
@@ -1364,27 +1364,29 @@
 {
 	struct nfs_server *server;
 	struct nfs_client *clp;
-	char dev[8], fsid[17];
+	char dev[13];	// 8 for 2^24, 1 for ':', 3 for 2^8, 1 for '\0'
+	char fsid[34];	// 2 * 16 for %llx, 1 for ':', 1 for '\0'
 	struct nfs_net *nn = net_generic(seq_file_net(m), nfs_net_id);
 
 	/* display header on line 1 */
 	if (v == &nn->nfs_volume_list) {
-		seq_puts(m, "NV SERVER   PORT DEV     FSID              FSC\n");
+		seq_puts(m, "NV SERVER   PORT DEV          FSID"
+			    "                              FSC\n");
 		return 0;
 	}
 	/* display one transport per line on subsequent lines */
 	server = list_entry(v, struct nfs_server, master_link);
 	clp = server->nfs_client;
 
-	snprintf(dev, 8, "%u:%u",
+	snprintf(dev, sizeof(dev), "%u:%u",
 		 MAJOR(server->s_dev), MINOR(server->s_dev));
 
-	snprintf(fsid, 17, "%llx:%llx",
+	snprintf(fsid, sizeof(fsid), "%llx:%llx",
 		 (unsigned long long) server->fsid.major,
 		 (unsigned long long) server->fsid.minor);
 
 	rcu_read_lock();
-	seq_printf(m, "v%u %s %s %-7s %-17s %s\n",
+	seq_printf(m, "v%u %s %s %-12s %-33s %s\n",
 		   clp->rpc_ops->version,
 		   rpc_peeraddr2str(clp->cl_rpcclient, RPC_DISPLAY_HEX_ADDR),
 		   rpc_peeraddr2str(clp->cl_rpcclient, RPC_DISPLAY_HEX_PORT),
diff --git a/fs/nfs/file.c b/fs/nfs/file.c
index 8b8d83a..cc4fa1e 100644
--- a/fs/nfs/file.c
+++ b/fs/nfs/file.c
@@ -555,31 +555,22 @@
 	return nfs_wb_page(inode, page);
 }
 
-#ifdef CONFIG_NFS_SWAP
 static int nfs_swap_activate(struct swap_info_struct *sis, struct file *file,
 						sector_t *span)
 {
-	int ret;
 	struct rpc_clnt *clnt = NFS_CLIENT(file->f_mapping->host);
 
 	*span = sis->pages;
 
-	rcu_read_lock();
-	ret = xs_swapper(rcu_dereference(clnt->cl_xprt), 1);
-	rcu_read_unlock();
-
-	return ret;
+	return rpc_clnt_swap_activate(clnt);
 }
 
 static void nfs_swap_deactivate(struct file *file)
 {
 	struct rpc_clnt *clnt = NFS_CLIENT(file->f_mapping->host);
 
-	rcu_read_lock();
-	xs_swapper(rcu_dereference(clnt->cl_xprt), 0);
-	rcu_read_unlock();
+	rpc_clnt_swap_deactivate(clnt);
 }
-#endif
 
 const struct address_space_operations nfs_file_aops = {
 	.readpage = nfs_readpage,
@@ -596,10 +587,8 @@
 	.launder_page = nfs_launder_page,
 	.is_dirty_writeback = nfs_check_dirty_writeback,
 	.error_remove_page = generic_error_remove_page,
-#ifdef CONFIG_NFS_SWAP
 	.swap_activate = nfs_swap_activate,
 	.swap_deactivate = nfs_swap_deactivate,
-#endif
 };
 
 /*
diff --git a/fs/nfs/flexfilelayout/flexfilelayout.c b/fs/nfs/flexfilelayout/flexfilelayout.c
index 7d05089..0f1410c 100644
--- a/fs/nfs/flexfilelayout/flexfilelayout.c
+++ b/fs/nfs/flexfilelayout/flexfilelayout.c
@@ -182,17 +182,14 @@
 
 static void ff_layout_sort_mirrors(struct nfs4_ff_layout_segment *fls)
 {
-	struct nfs4_ff_layout_mirror *tmp;
 	int i, j;
 
 	for (i = 0; i < fls->mirror_array_cnt - 1; i++) {
 		for (j = i + 1; j < fls->mirror_array_cnt; j++)
 			if (fls->mirror_array[i]->efficiency <
-			    fls->mirror_array[j]->efficiency) {
-				tmp = fls->mirror_array[i];
-				fls->mirror_array[i] = fls->mirror_array[j];
-				fls->mirror_array[j] = tmp;
-			}
+			    fls->mirror_array[j]->efficiency)
+				swap(fls->mirror_array[i],
+				     fls->mirror_array[j]);
 	}
 }
 
diff --git a/fs/nfs/inode.c b/fs/nfs/inode.c
index f734562..14b4709 100644
--- a/fs/nfs/inode.c
+++ b/fs/nfs/inode.c
@@ -678,6 +678,8 @@
 	if (!err) {
 		generic_fillattr(inode, stat);
 		stat->ino = nfs_compat_user_ino64(NFS_FILEID(inode));
+		if (S_ISDIR(inode->i_mode))
+			stat->blksize = NFS_SERVER(inode)->dtsize;
 	}
 out:
 	trace_nfs_getattr_exit(inode, err);
diff --git a/fs/nfs/nfs3xdr.c b/fs/nfs/nfs3xdr.c
index 53852a4..9b04c2e 100644
--- a/fs/nfs/nfs3xdr.c
+++ b/fs/nfs/nfs3xdr.c
@@ -1342,7 +1342,7 @@
 	if (args->npages != 0)
 		xdr_write_pages(xdr, args->pages, 0, args->len);
 	else
-		xdr_reserve_space(xdr, NFS_ACL_INLINE_BUFSIZE);
+		xdr_reserve_space(xdr, args->len);
 
 	error = nfsacl_encode(xdr->buf, base, args->inode,
 			    (args->mask & NFS_ACL) ?
diff --git a/fs/nfs/nfs4idmap.c b/fs/nfs/nfs4idmap.c
index 2e1737c..535dfc6 100644
--- a/fs/nfs/nfs4idmap.c
+++ b/fs/nfs/nfs4idmap.c
@@ -494,12 +494,7 @@
 
 int nfs_idmap_init(void)
 {
-	int ret;
-	ret = nfs_idmap_init_keyring();
-	if (ret != 0)
-		goto out;
-out:
-	return ret;
+	return nfs_idmap_init_keyring();
 }
 
 void nfs_idmap_quit(void)
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index 55e1e3a..0fc1b0c 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -356,6 +356,9 @@
 		case 0:
 			return 0;
 		case -NFS4ERR_OPENMODE:
+		case -NFS4ERR_DELEG_REVOKED:
+		case -NFS4ERR_ADMIN_REVOKED:
+		case -NFS4ERR_BAD_STATEID:
 			if (inode && nfs4_have_delegation(inode, FMODE_READ)) {
 				nfs4_inode_return_delegation(inode);
 				exception->retry = 1;
@@ -367,15 +370,6 @@
 			if (ret < 0)
 				break;
 			goto wait_on_recovery;
-		case -NFS4ERR_DELEG_REVOKED:
-		case -NFS4ERR_ADMIN_REVOKED:
-		case -NFS4ERR_BAD_STATEID:
-			if (state == NULL)
-				break;
-			ret = nfs4_schedule_stateid_recovery(server, state);
-			if (ret < 0)
-				break;
-			goto wait_on_recovery;
 		case -NFS4ERR_EXPIRED:
 			if (state != NULL) {
 				ret = nfs4_schedule_stateid_recovery(server, state);
@@ -3355,6 +3349,8 @@
 			goto out;
 		case -NFS4ERR_MOVED:
 			err = nfs4_get_referral(client, dir, name, fattr, fhandle);
+			if (err == -NFS4ERR_MOVED)
+				err = nfs4_handle_exception(NFS_SERVER(dir), err, &exception);
 			goto out;
 		case -NFS4ERR_WRONGSEC:
 			err = -EPERM;
@@ -4955,49 +4951,128 @@
 	memcpy(bootverf->data, verf, sizeof(bootverf->data));
 }
 
-static unsigned int
-nfs4_init_nonuniform_client_string(struct nfs_client *clp,
-				   char *buf, size_t len)
+static int
+nfs4_init_nonuniform_client_string(struct nfs_client *clp)
 {
-	unsigned int result;
+	int result;
+	size_t len;
+	char *str;
+	bool retried = false;
 
 	if (clp->cl_owner_id != NULL)
-		return strlcpy(buf, clp->cl_owner_id, len);
+		return 0;
+retry:
+	rcu_read_lock();
+	len = 14 + strlen(clp->cl_ipaddr) + 1 +	/* 14 == strlen("Linux NFSv4.0 "), 1 for '/' */
+		strlen(rpc_peeraddr2str(clp->cl_rpcclient, RPC_DISPLAY_ADDR)) +
+		1 +	/* ' ' separating address and protocol */
+		strlen(rpc_peeraddr2str(clp->cl_rpcclient, RPC_DISPLAY_PROTO)) +
+		1;	/* terminating '\0' */
+	rcu_read_unlock();
+
+	if (len > NFS4_OPAQUE_LIMIT + 1)
+		return -EINVAL;
+
+	/*
+	 * Since this string is allocated at mount time, and held until the
+	 * nfs_client is destroyed, we can use GFP_KERNEL here w/o worrying
+	 * about a memory-reclaim deadlock.
+	 */
+	str = kmalloc(len, GFP_KERNEL);
+	if (!str)
+		return -ENOMEM;
 
 	rcu_read_lock();
-	result = scnprintf(buf, len, "Linux NFSv4.0 %s/%s %s",
-				clp->cl_ipaddr,
-				rpc_peeraddr2str(clp->cl_rpcclient,
-							RPC_DISPLAY_ADDR),
-				rpc_peeraddr2str(clp->cl_rpcclient,
-							RPC_DISPLAY_PROTO));
+	result = scnprintf(str, len, "Linux NFSv4.0 %s/%s %s",
+			clp->cl_ipaddr,
+			rpc_peeraddr2str(clp->cl_rpcclient, RPC_DISPLAY_ADDR),
+			rpc_peeraddr2str(clp->cl_rpcclient, RPC_DISPLAY_PROTO));
 	rcu_read_unlock();
-	clp->cl_owner_id = kstrdup(buf, GFP_KERNEL);
-	return result;
+
+	/* Did something change? */
+	if (result >= len) {
+		kfree(str);
+		if (retried)
+			return -EINVAL;
+		retried = true;
+		goto retry;
+	}
+	clp->cl_owner_id = str;
+	return 0;
 }
 
-static unsigned int
-nfs4_init_uniform_client_string(struct nfs_client *clp,
-				char *buf, size_t len)
+static int
+nfs4_init_uniquifier_client_string(struct nfs_client *clp)
 {
-	const char *nodename = clp->cl_rpcclient->cl_nodename;
-	unsigned int result;
+	int result;
+	size_t len;
+	char *str;
+
+	len = 10 + 10 + 1 + 10 + 1 +
+		strlen(nfs4_client_id_uniquifier) + 1 +
+		strlen(clp->cl_rpcclient->cl_nodename) + 1;
+
+	if (len > NFS4_OPAQUE_LIMIT + 1)
+		return -EINVAL;
+
+	/*
+	 * Since this string is allocated at mount time, and held until the
+	 * nfs_client is destroyed, we can use GFP_KERNEL here w/o worrying
+	 * about a memory-reclaim deadlock.
+	 */
+	str = kmalloc(len, GFP_KERNEL);
+	if (!str)
+		return -ENOMEM;
+
+	result = scnprintf(str, len, "Linux NFSv%u.%u %s/%s",
+			clp->rpc_ops->version, clp->cl_minorversion,
+			nfs4_client_id_uniquifier,
+			clp->cl_rpcclient->cl_nodename);
+	if (result >= len) {
+		kfree(str);
+		return -EINVAL;
+	}
+	clp->cl_owner_id = str;
+	return 0;
+}
+
+static int
+nfs4_init_uniform_client_string(struct nfs_client *clp)
+{
+	int result;
+	size_t len;
+	char *str;
 
 	if (clp->cl_owner_id != NULL)
-		return strlcpy(buf, clp->cl_owner_id, len);
+		return 0;
 
 	if (nfs4_client_id_uniquifier[0] != '\0')
-		result = scnprintf(buf, len, "Linux NFSv%u.%u %s/%s",
-				clp->rpc_ops->version,
-				clp->cl_minorversion,
-				nfs4_client_id_uniquifier,
-				nodename);
-	else
-		result = scnprintf(buf, len, "Linux NFSv%u.%u %s",
-				clp->rpc_ops->version, clp->cl_minorversion,
-				nodename);
-	clp->cl_owner_id = kstrdup(buf, GFP_KERNEL);
-	return result;
+		return nfs4_init_uniquifier_client_string(clp);
+
+	len = 10 + 10 + 1 + 10 + 1 +
+		strlen(clp->cl_rpcclient->cl_nodename) + 1;
+
+	if (len > NFS4_OPAQUE_LIMIT + 1)
+		return -EINVAL;
+
+	/*
+	 * Since this string is allocated at mount time, and held until the
+	 * nfs_client is destroyed, we can use GFP_KERNEL here w/o worrying
+	 * about a memory-reclaim deadlock.
+	 */
+	str = kmalloc(len, GFP_KERNEL);
+	if (!str)
+		return -ENOMEM;
+
+	result = scnprintf(str, len, "Linux NFSv%u.%u %s",
+			clp->rpc_ops->version, clp->cl_minorversion,
+			clp->cl_rpcclient->cl_nodename);
+	if (result >= len) {
+		kfree(str);
+		return -EINVAL;
+	}
+	clp->cl_owner_id = str;
+	return 0;
 }
 
 /*
@@ -5044,7 +5119,7 @@
 	struct nfs4_setclientid setclientid = {
 		.sc_verifier = &sc_verifier,
 		.sc_prog = program,
-		.sc_cb_ident = clp->cl_cb_ident,
+		.sc_clnt = clp,
 	};
 	struct rpc_message msg = {
 		.rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_SETCLIENTID],
@@ -5064,16 +5139,15 @@
 
 	/* nfs_client_id4 */
 	nfs4_init_boot_verifier(clp, &sc_verifier);
+
 	if (test_bit(NFS_CS_MIGRATION, &clp->cl_flags))
-		setclientid.sc_name_len =
-				nfs4_init_uniform_client_string(clp,
-						setclientid.sc_name,
-						sizeof(setclientid.sc_name));
+		status = nfs4_init_uniform_client_string(clp);
 	else
-		setclientid.sc_name_len =
-				nfs4_init_nonuniform_client_string(clp,
-						setclientid.sc_name,
-						sizeof(setclientid.sc_name));
+		status = nfs4_init_nonuniform_client_string(clp);
+
+	if (status)
+		goto out;
+
 	/* cb_client4 */
 	setclientid.sc_netid_len =
 				nfs4_init_callback_netid(clp,
@@ -5083,9 +5157,9 @@
 				sizeof(setclientid.sc_uaddr), "%s.%u.%u",
 				clp->cl_ipaddr, port >> 8, port & 255);
 
-	dprintk("NFS call  setclientid auth=%s, '%.*s'\n",
+	dprintk("NFS call  setclientid auth=%s, '%s'\n",
 		clp->cl_rpcclient->cl_auth->au_ops->au_name,
-		setclientid.sc_name_len, setclientid.sc_name);
+		clp->cl_owner_id);
 	task = rpc_run_task(&task_setup_data);
 	if (IS_ERR(task)) {
 		status = PTR_ERR(task);
@@ -6846,11 +6920,14 @@
 	};
 
 	nfs4_init_boot_verifier(clp, &verifier);
-	args.id_len = nfs4_init_uniform_client_string(clp, args.id,
-							sizeof(args.id));
-	dprintk("NFS call  exchange_id auth=%s, '%.*s'\n",
+
+	status = nfs4_init_uniform_client_string(clp);
+	if (status)
+		goto out;
+
+	dprintk("NFS call  exchange_id auth=%s, '%s'\n",
 		clp->cl_rpcclient->cl_auth->au_ops->au_name,
-		args.id_len, args.id);
+		clp->cl_owner_id);
 
 	res.server_owner = kzalloc(sizeof(struct nfs41_server_owner),
 					GFP_NOFS);
diff --git a/fs/nfs/nfs4state.c b/fs/nfs/nfs4state.c
index 2782cfc..605840d 100644
--- a/fs/nfs/nfs4state.c
+++ b/fs/nfs/nfs4state.c
@@ -309,7 +309,6 @@
 
 	if (test_bit(NFS4CLNT_LEASE_CONFIRM, &clp->cl_state))
 		goto do_confirm;
-	nfs4_begin_drain_session(clp);
 	status = nfs4_proc_exchange_id(clp, cred);
 	if (status != 0)
 		goto out;
@@ -1482,6 +1481,8 @@
 					spin_unlock(&state->state_lock);
 				}
 				nfs4_put_open_state(state);
+				clear_bit(NFS4CLNT_RECLAIM_NOGRACE,
+					&state->flags);
 				spin_lock(&sp->so_lock);
 				goto restart;
 			}
@@ -1830,6 +1831,7 @@
 		clp->cl_mvops->reboot_recovery_ops;
 	int status;
 
+	nfs4_begin_drain_session(clp);
 	cred = nfs4_get_clid_cred(clp);
 	if (cred == NULL)
 		return -ENOENT;
diff --git a/fs/nfs/nfs4xdr.c b/fs/nfs/nfs4xdr.c
index 0aea978..a07555d 100644
--- a/fs/nfs/nfs4xdr.c
+++ b/fs/nfs/nfs4xdr.c
@@ -139,7 +139,8 @@
 #define encode_setclientid_maxsz \
 				(op_encode_hdr_maxsz + \
 				XDR_QUADLEN(NFS4_VERIFIER_SIZE) + \
-				XDR_QUADLEN(NFS4_SETCLIENTID_NAMELEN) + \
+				/* client name */ \
+				1 + XDR_QUADLEN(NFS4_OPAQUE_LIMIT) + \
 				1 /* sc_prog */ + \
 				1 + XDR_QUADLEN(RPCBIND_MAXNETIDLEN) + \
 				1 + XDR_QUADLEN(RPCBIND_MAXUADDRLEN) + \
@@ -288,7 +289,8 @@
 #define encode_exchange_id_maxsz (op_encode_hdr_maxsz + \
 				encode_verifier_maxsz + \
 				1 /* co_ownerid.len */ + \
-				XDR_QUADLEN(NFS4_EXCHANGE_ID_LEN) + \
+				/* eia_clientowner */ \
+				1 + XDR_QUADLEN(NFS4_OPAQUE_LIMIT) + \
 				1 /* flags */ + \
 				1 /* spa_how */ + \
 				/* max is SP4_MACH_CRED (for now) */ + \
@@ -1667,13 +1669,14 @@
 	encode_op_hdr(xdr, OP_SETCLIENTID, decode_setclientid_maxsz, hdr);
 	encode_nfs4_verifier(xdr, setclientid->sc_verifier);
 
-	encode_string(xdr, setclientid->sc_name_len, setclientid->sc_name);
+	encode_string(xdr, strlen(setclientid->sc_clnt->cl_owner_id),
+			setclientid->sc_clnt->cl_owner_id);
 	p = reserve_space(xdr, 4);
 	*p = cpu_to_be32(setclientid->sc_prog);
 	encode_string(xdr, setclientid->sc_netid_len, setclientid->sc_netid);
 	encode_string(xdr, setclientid->sc_uaddr_len, setclientid->sc_uaddr);
 	p = reserve_space(xdr, 4);
-	*p = cpu_to_be32(setclientid->sc_cb_ident);
+	*p = cpu_to_be32(setclientid->sc_clnt->cl_cb_ident);
 }
 
 static void encode_setclientid_confirm(struct xdr_stream *xdr, const struct nfs4_setclientid_res *arg, struct compound_hdr *hdr)
@@ -1747,7 +1750,8 @@
 	encode_op_hdr(xdr, OP_EXCHANGE_ID, decode_exchange_id_maxsz, hdr);
 	encode_nfs4_verifier(xdr, args->verifier);
 
-	encode_string(xdr, args->id_len, args->id);
+	encode_string(xdr, strlen(args->client->cl_owner_id),
+			args->client->cl_owner_id);
 
 	encode_uint32(xdr, args->flags);
 	encode_uint32(xdr, args->state_protect.how);
diff --git a/fs/nfs/pagelist.c b/fs/nfs/pagelist.c
index 282b393..e9953ab 100644
--- a/fs/nfs/pagelist.c
+++ b/fs/nfs/pagelist.c
@@ -690,8 +690,6 @@
 static void nfs_pgio_release(void *calldata)
 {
 	struct nfs_pgio_header *hdr = calldata;
-	if (hdr->rw_ops->rw_release)
-		hdr->rw_ops->rw_release(hdr);
 	nfs_pgio_data_destroy(hdr);
 	hdr->completion_ops->completion(hdr);
 }
diff --git a/fs/nfs/write.c b/fs/nfs/write.c
index dfc19f1..d0f1f21 100644
--- a/fs/nfs/write.c
+++ b/fs/nfs/write.c
@@ -1347,11 +1347,6 @@
 	NFS_PROTO(data->inode)->commit_rpc_prepare(task, data);
 }
 
-static void nfs_writeback_release_common(struct nfs_pgio_header *hdr)
-{
-	/* do nothing! */
-}
-
 /*
  * Special version of should_remove_suid() that ignores capabilities.
  */
@@ -2012,7 +2007,6 @@
 	.rw_mode		= FMODE_WRITE,
 	.rw_alloc_header	= nfs_writehdr_alloc,
 	.rw_free_header		= nfs_writehdr_free,
-	.rw_release		= nfs_writeback_release_common,
 	.rw_done		= nfs_writeback_done,
 	.rw_result		= nfs_writeback_result,
 	.rw_initiate		= nfs_initiate_write,
diff --git a/include/linux/nfs_page.h b/include/linux/nfs_page.h
index 3eb072d..f2f650f 100644
--- a/include/linux/nfs_page.h
+++ b/include/linux/nfs_page.h
@@ -67,7 +67,6 @@
 	const fmode_t rw_mode;
 	struct nfs_pgio_header *(*rw_alloc_header)(void);
 	void (*rw_free_header)(struct nfs_pgio_header *);
-	void (*rw_release)(struct nfs_pgio_header *);
 	int  (*rw_done)(struct rpc_task *, struct nfs_pgio_header *,
 			struct inode *);
 	void (*rw_result)(struct rpc_task *, struct nfs_pgio_header *);
diff --git a/include/linux/nfs_xdr.h b/include/linux/nfs_xdr.h
index 93ab607..c959e7e 100644
--- a/include/linux/nfs_xdr.h
+++ b/include/linux/nfs_xdr.h
@@ -984,17 +984,14 @@
 	struct nfs4_sequence_res	seq_res;
 };
 
-#define NFS4_SETCLIENTID_NAMELEN	(127)
 struct nfs4_setclientid {
 	const nfs4_verifier *		sc_verifier;
-	unsigned int			sc_name_len;
-	char				sc_name[NFS4_SETCLIENTID_NAMELEN + 1];
 	u32				sc_prog;
 	unsigned int			sc_netid_len;
 	char				sc_netid[RPCBIND_MAXNETIDLEN + 1];
 	unsigned int			sc_uaddr_len;
 	char				sc_uaddr[RPCBIND_MAXUADDRLEN + 1];
-	u32				sc_cb_ident;
+	struct nfs_client		*sc_clnt;
 	struct rpc_cred			*sc_cred;
 };
 
@@ -1142,12 +1139,9 @@
 	struct nfs4_op_map allow;
 };
 
-#define NFS4_EXCHANGE_ID_LEN	(48)
 struct nfs41_exchange_id_args {
 	struct nfs_client		*client;
 	nfs4_verifier			*verifier;
-	unsigned int 			id_len;
-	char 				id[NFS4_EXCHANGE_ID_LEN];
 	u32				flags;
 	struct nfs41_state_protection	state_protect;
 };
diff --git a/include/linux/sunrpc/bc_xprt.h b/include/linux/sunrpc/bc_xprt.h
index 2ca67b5..8df43c9f 100644
--- a/include/linux/sunrpc/bc_xprt.h
+++ b/include/linux/sunrpc/bc_xprt.h
@@ -37,7 +37,6 @@
 void xprt_free_bc_request(struct rpc_rqst *req);
 int xprt_setup_backchannel(struct rpc_xprt *, unsigned int min_reqs);
 void xprt_destroy_backchannel(struct rpc_xprt *, unsigned int max_reqs);
-int bc_send(struct rpc_rqst *req);
 
 /*
  * Determine if a shared backchannel is in use
diff --git a/include/linux/sunrpc/clnt.h b/include/linux/sunrpc/clnt.h
index 598ba80..131032f 100644
--- a/include/linux/sunrpc/clnt.h
+++ b/include/linux/sunrpc/clnt.h
@@ -56,6 +56,7 @@
 	struct rpc_rtt *	cl_rtt;		/* RTO estimator data */
 	const struct rpc_timeout *cl_timeout;	/* Timeout strategy */
 
+	atomic_t		cl_swapper;	/* swapfile count */
 	int			cl_nodelen;	/* nodename length */
 	char 			cl_nodename[UNX_MAXNODENAME+1];
 	struct rpc_pipe_dir_head cl_pipedir_objects;
diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index 5f1e6bd..d703f0e 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -205,8 +205,7 @@
  */
 struct rpc_task *rpc_new_task(const struct rpc_task_setup *);
 struct rpc_task *rpc_run_task(const struct rpc_task_setup *);
-struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
-				const struct rpc_call_ops *ops);
+struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req);
 void		rpc_put_task(struct rpc_task *);
 void		rpc_put_task_async(struct rpc_task *);
 void		rpc_exit_task(struct rpc_task *);
@@ -269,4 +268,20 @@
 }
 #endif
 
+#if IS_ENABLED(CONFIG_SUNRPC_SWAP)
+int rpc_clnt_swap_activate(struct rpc_clnt *clnt);
+void rpc_clnt_swap_deactivate(struct rpc_clnt *clnt);
+#else
+static inline int
+rpc_clnt_swap_activate(struct rpc_clnt *clnt)
+{
+	return -EINVAL;
+}
+
+static inline void
+rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
+{
+}
+#endif /* CONFIG_SUNRPC_SWAP */
+
 #endif /* _LINUX_SUNRPC_SCHED_H_ */
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 8b93ef5..0fb9acb 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -133,6 +133,9 @@
 	void		(*close)(struct rpc_xprt *xprt);
 	void		(*destroy)(struct rpc_xprt *xprt);
 	void		(*print_stats)(struct rpc_xprt *xprt, struct seq_file *seq);
+	int		(*enable_swap)(struct rpc_xprt *xprt);
+	void		(*disable_swap)(struct rpc_xprt *xprt);
+	void		(*inject_disconnect)(struct rpc_xprt *xprt);
 };
 
 /*
@@ -180,7 +183,7 @@
 	atomic_t		num_reqs;	/* total slots */
 	unsigned long		state;		/* transport state */
 	unsigned char		resvport   : 1; /* use a reserved port */
-	unsigned int		swapper;	/* we're swapping over this
+	atomic_t		swapper;	/* we're swapping over this
 						   transport */
 	unsigned int		bind_index;	/* bind function index */
 
@@ -212,7 +215,8 @@
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 	struct svc_serv		*bc_serv;       /* The RPC service which will */
 						/* process the callback */
-	unsigned int		bc_alloc_count;	/* Total number of preallocs */
+	int			bc_alloc_count;	/* Total number of preallocs */
+	atomic_t		bc_free_slots;
 	spinlock_t		bc_pa_lock;	/* Protects the preallocated
 						 * items */
 	struct list_head	bc_pa_list;	/* List of preallocated
@@ -241,6 +245,7 @@
 	const char		*address_strings[RPC_DISPLAY_MAX];
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 	struct dentry		*debugfs;		/* debugfs directory */
+	atomic_t		inject_disconnect;
 #endif
 };
 
@@ -327,6 +332,18 @@
 	return p + xprt->tsh_size;
 }
 
+static inline int
+xprt_enable_swap(struct rpc_xprt *xprt)
+{
+	return xprt->ops->enable_swap(xprt);
+}
+
+static inline void
+xprt_disable_swap(struct rpc_xprt *xprt)
+{
+	xprt->ops->disable_swap(xprt);
+}
+
 /*
  * Transport switch helper functions
  */
@@ -345,7 +362,6 @@
 void			xprt_disconnect_done(struct rpc_xprt *xprt);
 void			xprt_force_disconnect(struct rpc_xprt *xprt);
 void			xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie);
-int			xs_swapper(struct rpc_xprt *xprt, int enable);
 
 bool			xprt_lock_connect(struct rpc_xprt *, struct rpc_task *, void *);
 void			xprt_unlock_connect(struct rpc_xprt *, void *);
@@ -431,6 +447,23 @@
 	return test_and_set_bit(XPRT_BINDING, &xprt->state);
 }
 
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
+extern unsigned int rpc_inject_disconnect;
+static inline void xprt_inject_disconnect(struct rpc_xprt *xprt)
+{
+	if (!rpc_inject_disconnect)
+		return;
+	if (atomic_dec_return(&xprt->inject_disconnect))
+		return;
+	atomic_set(&xprt->inject_disconnect, rpc_inject_disconnect);
+	xprt->ops->inject_disconnect(xprt);
+}
+#else
+static inline void xprt_inject_disconnect(struct rpc_xprt *xprt)
+{
+}
+#endif
+
 #endif /* __KERNEL__*/
 
 #endif /* _LINUX_SUNRPC_XPRT_H */
diff --git a/net/sunrpc/Makefile b/net/sunrpc/Makefile
index 15e6f6c..1b8e68d 100644
--- a/net/sunrpc/Makefile
+++ b/net/sunrpc/Makefile
@@ -15,6 +15,6 @@
 	    sunrpc_syms.o cache.o rpc_pipe.o \
 	    svc_xprt.o
 sunrpc-$(CONFIG_SUNRPC_DEBUG) += debugfs.o
-sunrpc-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel_rqst.o bc_svc.o
+sunrpc-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel_rqst.o
 sunrpc-$(CONFIG_PROC_FS) += stats.o
 sunrpc-$(CONFIG_SYSCTL) += sysctl.o
diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
index 9dd0ea8d..9825ff0 100644
--- a/net/sunrpc/backchannel_rqst.c
+++ b/net/sunrpc/backchannel_rqst.c
@@ -37,16 +37,18 @@
  */
 static inline int xprt_need_to_requeue(struct rpc_xprt *xprt)
 {
-	return xprt->bc_alloc_count > 0;
+	return xprt->bc_alloc_count < atomic_read(&xprt->bc_free_slots);
 }
 
 static inline void xprt_inc_alloc_count(struct rpc_xprt *xprt, unsigned int n)
 {
+	atomic_add(n, &xprt->bc_free_slots);
 	xprt->bc_alloc_count += n;
 }
 
 static inline int xprt_dec_alloc_count(struct rpc_xprt *xprt, unsigned int n)
 {
+	atomic_sub(n, &xprt->bc_free_slots);
 	return xprt->bc_alloc_count -= n;
 }
 
@@ -60,13 +62,62 @@
 
 	dprintk("RPC:        free allocations for req= %p\n", req);
 	WARN_ON_ONCE(test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state));
-	xbufp = &req->rq_private_buf;
+	xbufp = &req->rq_rcv_buf;
 	free_page((unsigned long)xbufp->head[0].iov_base);
 	xbufp = &req->rq_snd_buf;
 	free_page((unsigned long)xbufp->head[0].iov_base);
 	kfree(req);
 }
 
+static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags)
+{
+	struct page *page;
+	/* Back one XDR buffer (receive or send) with a single fresh page */
+	page = alloc_page(gfp_flags);
+	if (page == NULL)
+		return -ENOMEM;
+	buf->head[0].iov_base = page_address(page);
+	buf->head[0].iov_len = PAGE_SIZE;
+	buf->tail[0].iov_base = NULL;
+	buf->tail[0].iov_len = 0;
+	buf->page_len = 0;
+	buf->len = 0;	/* starts empty; the receive-side caller sets PAGE_SIZE */
+	buf->buflen = PAGE_SIZE;
+	return 0;
+}
+
+static
+struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt, gfp_t gfp_flags)
+{
+	struct rpc_rqst *req;
+
+	/* Pre-allocate one backchannel rpc_rqst */
+	req = kzalloc(sizeof(*req), gfp_flags);
+	if (req == NULL)
+		return NULL;
+
+	req->rq_xprt = xprt;
+	INIT_LIST_HEAD(&req->rq_list);
+	INIT_LIST_HEAD(&req->rq_bc_list);
+
+	/* Preallocate one XDR receive buffer */
+	if (xprt_alloc_xdr_buf(&req->rq_rcv_buf, gfp_flags) < 0) {
+		printk(KERN_ERR "Failed to create bc receive xbuf\n");
+		goto out_free;
+	}
+	req->rq_rcv_buf.len = PAGE_SIZE;
+
+	/* Preallocate one XDR send buffer */
+	if (xprt_alloc_xdr_buf(&req->rq_snd_buf, gfp_flags) < 0) {
+		printk(KERN_ERR "Failed to create bc snd xbuf\n");
+		goto out_free;
+	}
+	return req;
+out_free:
+	xprt_free_allocation(req);
+	return NULL;
+}
+
 /*
  * Preallocate up to min_reqs structures and related buffers for use
  * by the backchannel.  This function can be called multiple times
@@ -87,9 +138,7 @@
  */
 int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
 {
-	struct page *page_rcv = NULL, *page_snd = NULL;
-	struct xdr_buf *xbufp = NULL;
-	struct rpc_rqst *req, *tmp;
+	struct rpc_rqst *req;
 	struct list_head tmp_list;
 	int i;
 
@@ -106,7 +155,7 @@
 	INIT_LIST_HEAD(&tmp_list);
 	for (i = 0; i < min_reqs; i++) {
 		/* Pre-allocate one backchannel rpc_rqst */
-		req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL);
+		req = xprt_alloc_bc_req(xprt, GFP_KERNEL);
 		if (req == NULL) {
 			printk(KERN_ERR "Failed to create bc rpc_rqst\n");
 			goto out_free;
@@ -115,41 +164,6 @@
 		/* Add the allocated buffer to the tmp list */
 		dprintk("RPC:       adding req= %p\n", req);
 		list_add(&req->rq_bc_pa_list, &tmp_list);
-
-		req->rq_xprt = xprt;
-		INIT_LIST_HEAD(&req->rq_list);
-		INIT_LIST_HEAD(&req->rq_bc_list);
-
-		/* Preallocate one XDR receive buffer */
-		page_rcv = alloc_page(GFP_KERNEL);
-		if (page_rcv == NULL) {
-			printk(KERN_ERR "Failed to create bc receive xbuf\n");
-			goto out_free;
-		}
-		xbufp = &req->rq_rcv_buf;
-		xbufp->head[0].iov_base = page_address(page_rcv);
-		xbufp->head[0].iov_len = PAGE_SIZE;
-		xbufp->tail[0].iov_base = NULL;
-		xbufp->tail[0].iov_len = 0;
-		xbufp->page_len = 0;
-		xbufp->len = PAGE_SIZE;
-		xbufp->buflen = PAGE_SIZE;
-
-		/* Preallocate one XDR send buffer */
-		page_snd = alloc_page(GFP_KERNEL);
-		if (page_snd == NULL) {
-			printk(KERN_ERR "Failed to create bc snd xbuf\n");
-			goto out_free;
-		}
-
-		xbufp = &req->rq_snd_buf;
-		xbufp->head[0].iov_base = page_address(page_snd);
-		xbufp->head[0].iov_len = 0;
-		xbufp->tail[0].iov_base = NULL;
-		xbufp->tail[0].iov_len = 0;
-		xbufp->page_len = 0;
-		xbufp->len = 0;
-		xbufp->buflen = PAGE_SIZE;
 	}
 
 	/*
@@ -167,7 +181,10 @@
 	/*
 	 * Memory allocation failed, free the temporary list
 	 */
-	list_for_each_entry_safe(req, tmp, &tmp_list, rq_bc_pa_list) {
+	while (!list_empty(&tmp_list)) {
+		req = list_first_entry(&tmp_list,
+				struct rpc_rqst,
+				rq_bc_pa_list);
 		list_del(&req->rq_bc_pa_list);
 		xprt_free_allocation(req);
 	}
@@ -217,9 +234,15 @@
 	struct rpc_rqst *req = NULL;
 
 	dprintk("RPC:       allocate a backchannel request\n");
-	if (list_empty(&xprt->bc_pa_list))
+	if (atomic_read(&xprt->bc_free_slots) <= 0)
 		goto not_found;
-
+	if (list_empty(&xprt->bc_pa_list)) {
+		req = xprt_alloc_bc_req(xprt, GFP_ATOMIC);
+		if (!req)
+			goto not_found;
+		/* Note: this 'free' request adds it to xprt->bc_pa_list */
+		xprt_free_bc_request(req);
+	}
 	req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst,
 				rq_bc_pa_list);
 	req->rq_reply_bytes_recvd = 0;
@@ -245,11 +268,21 @@
 
 	req->rq_connect_cookie = xprt->connect_cookie - 1;
 	smp_mb__before_atomic();
-	WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state));
 	clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
 	smp_mb__after_atomic();
 
-	if (!xprt_need_to_requeue(xprt)) {
+	/*
+	 * Return it to the list of preallocations so that it
+	 * may be reused by a new callback request.
+	 */
+	spin_lock_bh(&xprt->bc_pa_lock);
+	if (xprt_need_to_requeue(xprt)) {
+		list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);
+		xprt->bc_alloc_count++;
+		req = NULL;
+	}
+	spin_unlock_bh(&xprt->bc_pa_lock);
+	if (req != NULL) {
 		/*
 		 * The last remaining session was destroyed while this
 		 * entry was in use.  Free the entry and don't attempt
@@ -260,14 +293,6 @@
 		xprt_free_allocation(req);
 		return;
 	}
-
-	/*
-	 * Return it to the list of preallocations so that it
-	 * may be reused by a new callback request.
-	 */
-	spin_lock_bh(&xprt->bc_pa_lock);
-	list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);
-	spin_unlock_bh(&xprt->bc_pa_lock);
 }
 
 /*
@@ -311,6 +336,7 @@
 
 	spin_lock(&xprt->bc_pa_lock);
 	list_del(&req->rq_bc_pa_list);
+	xprt->bc_alloc_count--;
 	spin_unlock(&xprt->bc_pa_lock);
 
 	req->rq_private_buf.len = copied;
diff --git a/net/sunrpc/bc_svc.c b/net/sunrpc/bc_svc.c
deleted file mode 100644
index 15c7a8a..0000000
--- a/net/sunrpc/bc_svc.c
+++ /dev/null
@@ -1,63 +0,0 @@
-/******************************************************************************
-
-(c) 2007 Network Appliance, Inc.  All Rights Reserved.
-(c) 2009 NetApp.  All Rights Reserved.
-
-NetApp provides this source code under the GPL v2 License.
-The GPL v2 license is available at
-http://opensource.org/licenses/gpl-license.php.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
-CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
-EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
-PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
-PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
-LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
-NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-******************************************************************************/
-
-/*
- * The NFSv4.1 callback service helper routines.
- * They implement the transport level processing required to send the
- * reply over an existing open connection previously established by the client.
- */
-
-#include <linux/module.h>
-
-#include <linux/sunrpc/xprt.h>
-#include <linux/sunrpc/sched.h>
-#include <linux/sunrpc/bc_xprt.h>
-
-#define RPCDBG_FACILITY	RPCDBG_SVCDSP
-
-/* Empty callback ops */
-static const struct rpc_call_ops nfs41_callback_ops = {
-};
-
-
-/*
- * Send the callback reply
- */
-int bc_send(struct rpc_rqst *req)
-{
-	struct rpc_task *task;
-	int ret;
-
-	dprintk("RPC:       bc_send req= %p\n", req);
-	task = rpc_run_bc_task(req, &nfs41_callback_ops);
-	if (IS_ERR(task))
-		ret = PTR_ERR(task);
-	else {
-		WARN_ON_ONCE(atomic_read(&task->tk_count) != 1);
-		ret = task->tk_status;
-		rpc_put_task(task);
-	}
-	dprintk("RPC:       bc_send ret= %d\n", ret);
-	return ret;
-}
-
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index e6ce151..f41ed88 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -891,15 +891,8 @@
 			task->tk_flags |= RPC_TASK_SOFT;
 		if (clnt->cl_noretranstimeo)
 			task->tk_flags |= RPC_TASK_NO_RETRANS_TIMEOUT;
-		if (sk_memalloc_socks()) {
-			struct rpc_xprt *xprt;
-
-			rcu_read_lock();
-			xprt = rcu_dereference(clnt->cl_xprt);
-			if (xprt->swapper)
-				task->tk_flags |= RPC_TASK_SWAPPER;
-			rcu_read_unlock();
-		}
+		if (atomic_read(&clnt->cl_swapper))
+			task->tk_flags |= RPC_TASK_SWAPPER;
 		/* Add to the client's list of all tasks */
 		spin_lock(&clnt->cl_lock);
 		list_add_tail(&task->tk_task, &clnt->cl_tasks);
@@ -1031,15 +1024,13 @@
  * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
  * rpc_execute against it
  * @req: RPC request
- * @tk_ops: RPC call ops
  */
-struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
-				const struct rpc_call_ops *tk_ops)
+struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
 {
 	struct rpc_task *task;
 	struct xdr_buf *xbufp = &req->rq_snd_buf;
 	struct rpc_task_setup task_setup_data = {
-		.callback_ops = tk_ops,
+		.callback_ops = &rpc_default_ops,
 	};
 
 	dprintk("RPC: rpc_run_bc_task req= %p\n", req);
@@ -1614,6 +1605,7 @@
 					req->rq_callsize + req->rq_rcvsize);
 	if (req->rq_buffer != NULL)
 		return;
+	xprt_inject_disconnect(xprt);
 
 	dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
 
@@ -1951,24 +1943,22 @@
 {
 	struct rpc_rqst *req = task->tk_rqstp;
 
-	if (!xprt_prepare_transmit(task)) {
-		/*
-		 * Could not reserve the transport. Try again after the
-		 * transport is released.
-		 */
-		task->tk_status = 0;
-		task->tk_action = call_bc_transmit;
-		return;
-	}
+	if (!xprt_prepare_transmit(task))
+		goto out_retry;
 
-	task->tk_action = rpc_exit_task;
 	if (task->tk_status < 0) {
 		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
 			"error: %d\n", task->tk_status);
-		return;
+		goto out_done;
 	}
+	if (req->rq_connect_cookie != req->rq_xprt->connect_cookie)
+		req->rq_bytes_sent = 0;
 
 	xprt_transmit(task);
+
+	if (task->tk_status == -EAGAIN)
+		goto out_nospace;
+
 	xprt_end_transmit(task);
 	dprint_status(task);
 	switch (task->tk_status) {
@@ -2002,6 +1992,13 @@
 		break;
 	}
 	rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
+out_done:
+	task->tk_action = rpc_exit_task;
+	return;
+out_nospace:
+	req->rq_connect_cookie = req->rq_xprt->connect_cookie;
+out_retry:
+	task->tk_status = 0;
 }
 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
 
@@ -2476,3 +2473,59 @@
 	spin_unlock(&sn->rpc_client_lock);
 }
 #endif
+
+#if IS_ENABLED(CONFIG_SUNRPC_SWAP)
+int
+rpc_clnt_swap_activate(struct rpc_clnt *clnt)
+{
+	int ret = 0;	/* callers after the first get 0, xprt untouched */
+	struct rpc_xprt	*xprt;
+
+	if (atomic_inc_return(&clnt->cl_swapper) == 1) { /* 0 -> 1 only */
+retry:
+		rcu_read_lock();
+		xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
+		rcu_read_unlock();
+		if (!xprt) {
+			/*
+			 * No reference was obtained, most likely because we
+			 * are racing with a migration event. Wait for an RCU
+			 * grace period and look cl_xprt up again.
+			 */
+			synchronize_rcu();
+			goto retry;
+		}
+		/* NOTE(review): cl_swapper is not rolled back on failure. */
+		ret = xprt_enable_swap(xprt);
+		xprt_put(xprt);	/* pairs with xprt_get() above */
+	}
+	return ret;
+}
+EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate);
+
+void
+rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
+{
+	struct rpc_xprt	*xprt;
+
+	if (atomic_dec_if_positive(&clnt->cl_swapper) == 0) { /* 1 -> 0 only */
+retry:
+		rcu_read_lock();
+		xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
+		rcu_read_unlock();
+		if (!xprt) {
+			/*
+			 * No reference was obtained, most likely because we
+			 * are racing with a migration event. Wait for an RCU
+			 * grace period and look cl_xprt up again.
+			 */
+			synchronize_rcu();
+			goto retry;
+		}
+
+		xprt_disable_swap(xprt);
+		xprt_put(xprt);	/* pairs with xprt_get() above */
+	}
+}
+EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate);
+#endif /* CONFIG_SUNRPC_SWAP */
diff --git a/net/sunrpc/debugfs.c b/net/sunrpc/debugfs.c
index 82962f7..e7b4d93 100644
--- a/net/sunrpc/debugfs.c
+++ b/net/sunrpc/debugfs.c
@@ -10,9 +10,12 @@
 #include "netns.h"
 
 static struct dentry *topdir;
+static struct dentry *rpc_fault_dir;
 static struct dentry *rpc_clnt_dir;
 static struct dentry *rpc_xprt_dir;
 
+unsigned int rpc_inject_disconnect;
+
 struct rpc_clnt_iter {
 	struct rpc_clnt	*clnt;
 	loff_t		pos;
@@ -257,6 +260,8 @@
 		debugfs_remove_recursive(xprt->debugfs);
 		xprt->debugfs = NULL;
 	}
+
+	atomic_set(&xprt->inject_disconnect, rpc_inject_disconnect);
 }
 
 void
@@ -266,11 +271,79 @@
 	xprt->debugfs = NULL;
 }
 
+static int
+fault_open(struct inode *inode, struct file *filp)
+{
+	/* Per-open scratch buffer used by read(); freed in fault_release(). */
+	filp->private_data = kmalloc(128, GFP_KERNEL);
+
+	return filp->private_data ? 0 : -ENOMEM;
+}
+
+static int
+fault_release(struct inode *inode, struct file *filp)
+{
+	kfree(filp->private_data);	/* buffer allocated in fault_open() */
+	return 0;
+}
+
+static ssize_t
+fault_disconnect_read(struct file *filp, char __user *user_buf,
+		      size_t len, loff_t *offset)
+{
+	/* Format the current setting into the buffer from fault_open(). */
+	char *text = filp->private_data;
+	size_t nbytes = sprintf(text, "%u\n", rpc_inject_disconnect);
+
+	return simple_read_from_buffer(user_buf, len, offset, text, nbytes);
+}
+
+static ssize_t
+fault_disconnect_write(struct file *filp, const char __user *user_buf,
+		       size_t len, loff_t *offset)
+{
+	char buffer[16];
+
+	if (len >= sizeof(buffer))
+		len = sizeof(buffer) - 1;	/* keep room for the NUL below */
+	if (copy_from_user(buffer, user_buf, len))
+		return -EFAULT;
+	buffer[len] = '\0';
+	if (kstrtouint(buffer, 10, &rpc_inject_disconnect))
+		return -EINVAL;	/* kstrtouint() rejected the text */
+	return len;	/* the clamped length; *offset is not used */
+}
+
+static const struct file_operations fault_disconnect_fops = {
+	.owner		= THIS_MODULE,
+	.open		= fault_open,
+	.read		= fault_disconnect_read,
+	.write		= fault_disconnect_write,
+	.release	= fault_release,
+};
+
+static struct dentry *
+inject_fault_dir(struct dentry *topdir)
+{
+	struct dentry *faultdir;
+
+	faultdir = debugfs_create_dir("inject_fault", topdir);
+	if (!faultdir)
+		return NULL;
+	/* "disconnect" has a .write handler, so create it owner-writable. */
+	if (!debugfs_create_file("disconnect", S_IFREG | S_IRUSR | S_IWUSR,
+				 faultdir, NULL, &fault_disconnect_fops))
+		return NULL;	/* caller removes topdir recursively */
+
+	return faultdir;
+}
+
 void __exit
 sunrpc_debugfs_exit(void)
 {
 	debugfs_remove_recursive(topdir);
 	topdir = NULL;
+	rpc_fault_dir = NULL;
 	rpc_clnt_dir = NULL;
 	rpc_xprt_dir = NULL;
 }
@@ -282,6 +355,10 @@
 	if (!topdir)
 		return;
 
+	rpc_fault_dir = inject_fault_dir(topdir);
+	if (!rpc_fault_dir)
+		goto out_remove;
+
 	rpc_clnt_dir = debugfs_create_dir("rpc_clnt", topdir);
 	if (!rpc_clnt_dir)
 		goto out_remove;
@@ -294,5 +371,6 @@
 out_remove:
 	debugfs_remove_recursive(topdir);
 	topdir = NULL;
+	rpc_fault_dir = NULL;
 	rpc_clnt_dir = NULL;
 }
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index 78974e4..b47f02f 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -1350,6 +1350,11 @@
 {
 	struct kvec	*argv = &rqstp->rq_arg.head[0];
 	struct kvec	*resv = &rqstp->rq_res.head[0];
+	struct rpc_task *task;
+	int proc_error;
+	int error;
+
+	dprintk("svc: %s(%p)\n", __func__, req);
 
 	/* Build the svc_rqst used by the common processing routine */
 	rqstp->rq_xprt = serv->sv_bc_xprt;
@@ -1372,21 +1377,36 @@
 
 	/*
 	 * Skip the next two words because they've already been
-	 * processed in the trasport
+	 * processed in the transport
 	 */
 	svc_getu32(argv);	/* XID */
 	svc_getnl(argv);	/* CALLDIR */
 
-	/* Returns 1 for send, 0 for drop */
-	if (svc_process_common(rqstp, argv, resv)) {
-		memcpy(&req->rq_snd_buf, &rqstp->rq_res,
-						sizeof(req->rq_snd_buf));
-		return bc_send(req);
-	} else {
-		/* drop request */
+	/* Parse and execute the bc call */
+	proc_error = svc_process_common(rqstp, argv, resv);
+
+	atomic_inc(&req->rq_xprt->bc_free_slots);
+	if (!proc_error) {
+		/* Processing error: drop the request */
 		xprt_free_bc_request(req);
 		return 0;
 	}
+
+	/* Finally, send the reply synchronously */
+	memcpy(&req->rq_snd_buf, &rqstp->rq_res, sizeof(req->rq_snd_buf));
+	task = rpc_run_bc_task(req);
+	if (IS_ERR(task)) {
+		error = PTR_ERR(task);
+		goto out;
+	}
+
+	WARN_ON_ONCE(atomic_read(&task->tk_count) != 1);
+	error = task->tk_status;
+	rpc_put_task(task);
+
+out:
+	dprintk("svc: %s(), error=%d\n", __func__, error);
+	return error;
 }
 EXPORT_SYMBOL_GPL(bc_svc_process);
 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 1d4fe24..3ca31f2 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -68,6 +68,7 @@
 static void	xprt_request_init(struct rpc_task *, struct rpc_xprt *);
 static void	xprt_connect_status(struct rpc_task *task);
 static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
+static void     __xprt_put_cong(struct rpc_xprt *, struct rpc_rqst *);
 static void	 xprt_destroy(struct rpc_xprt *xprt);
 
 static DEFINE_SPINLOCK(xprt_list_lock);
@@ -250,6 +251,8 @@
 	}
 	xprt_clear_locked(xprt);
 out_sleep:
+	if (req)
+		__xprt_put_cong(xprt, req);
 	dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
 	task->tk_timeout = 0;
 	task->tk_status = -EAGAIN;
@@ -967,6 +970,7 @@
 		task->tk_status = status;
 		return;
 	}
+	xprt_inject_disconnect(xprt);
 
 	dprintk("RPC: %5u xmit complete\n", task->tk_pid);
 	task->tk_flags |= RPC_TASK_SENT;
@@ -1285,6 +1289,7 @@
 	spin_unlock_bh(&xprt->transport_lock);
 	if (req->rq_buffer)
 		xprt->ops->buf_free(req->rq_buffer);
+	xprt_inject_disconnect(xprt);
 	if (req->rq_cred != NULL)
 		put_rpccred(req->rq_cred);
 	task->tk_rqstp = NULL;
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index ed861ff..9facc71 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -246,6 +246,16 @@
 	xprt_clear_connecting(xprt);
 }
 
+static void
+xprt_rdma_inject_disconnect(struct rpc_xprt *xprt)
+{
+	struct rpcrdma_xprt *rdma;
+
+	rdma = container_of(xprt, struct rpcrdma_xprt, rx_xprt);
+	pr_info("rpcrdma: injecting transport disconnect on xprt=%p\n", xprt);
+	rdma_disconnect(rdma->rx_ia.ri_id);
+}
+
 /*
  * xprt_rdma_destroy
  *
@@ -676,6 +686,17 @@
 	   r_xprt->rx_stats.bad_reply_count);
 }
 
+static int
+xprt_rdma_enable_swap(struct rpc_xprt *xprt)
+{
+	return -EINVAL;	/* unconditional: this transport never enables swap */
+}
+
+static void
+xprt_rdma_disable_swap(struct rpc_xprt *xprt)
+{
+}
+
 /*
  * Plumbing for rpc transport switch and kernel module
  */
@@ -694,7 +715,10 @@
 	.send_request		= xprt_rdma_send_request,
 	.close			= xprt_rdma_close,
 	.destroy		= xprt_rdma_destroy,
-	.print_stats		= xprt_rdma_print_stats
+	.print_stats		= xprt_rdma_print_stats,
+	.enable_swap		= xprt_rdma_enable_swap,
+	.disable_swap		= xprt_rdma_disable_swap,
+	.inject_disconnect	= xprt_rdma_inject_disconnect
 };
 
 static struct xprt_class xprt_rdma = {
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 66891e3..fda8ec8 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -827,6 +827,9 @@
 	if (sk == NULL)
 		return;
 
+	if (atomic_read(&transport->xprt.swapper))
+		sk_clear_memalloc(sk);
+
 	write_lock_bh(&sk->sk_callback_lock);
 	transport->inet = NULL;
 	transport->sock = NULL;
@@ -863,6 +866,13 @@
 	xprt_disconnect_done(xprt);
 }
 
+static void xs_inject_disconnect(struct rpc_xprt *xprt)
+{
+	/* .inject_disconnect hook for the UDP, TCP and backchannel ops. */
+	dprintk("RPC:       injecting transport disconnect on xprt=%p\n", xprt);
+	xprt_disconnect_done(xprt);
+}
+
 static void xs_xprt_free(struct rpc_xprt *xprt)
 {
 	xs_free_peer_addresses(xprt);
@@ -901,7 +911,6 @@
 /**
  * xs_local_data_ready - "data ready" callback for AF_LOCAL sockets
  * @sk: socket with data to read
- * @len: how much data to read
  *
  * Currently this assumes we can read the whole reply in a single gulp.
  */
@@ -965,7 +974,6 @@
 /**
  * xs_udp_data_ready - "data ready" callback for UDP sockets
  * @sk: socket with data to read
- * @len: how much data to read
  *
  */
 static void xs_udp_data_ready(struct sock *sk)
@@ -1389,7 +1397,6 @@
 /**
  * xs_tcp_data_ready - "data ready" callback for TCP sockets
  * @sk: socket with data to read
- * @bytes: how much data to read
  *
  */
 static void xs_tcp_data_ready(struct sock *sk)
@@ -1886,9 +1893,7 @@
 
 /**
  * xs_local_setup_socket - create AF_LOCAL socket, connect to a local endpoint
- * @xprt: RPC transport to connect
  * @transport: socket transport to connect
- * @create_sock: function to create a socket of the correct type
  */
 static int xs_local_setup_socket(struct sock_xprt *transport)
 {
@@ -1960,43 +1965,84 @@
 		msleep_interruptible(15000);
 }
 
-#ifdef CONFIG_SUNRPC_SWAP
+#if IS_ENABLED(CONFIG_SUNRPC_SWAP)
+/*
+ * Note that this should be called with XPRT_LOCKED held (or when we otherwise
+ * know that we have exclusive access to the socket), to guard against
+ * races with xs_reset_transport.
+ */
 static void xs_set_memalloc(struct rpc_xprt *xprt)
 {
 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt,
 			xprt);
 
-	if (xprt->swapper)
+	/*
+	 * If there's no sock, then we have nothing to set. The
+	 * reconnecting process will get it for us.
+	 */
+	if (!transport->inet)
+		return;
+	if (atomic_read(&xprt->swapper))
 		sk_set_memalloc(transport->inet);
 }
 
 /**
- * xs_swapper - Tag this transport as being used for swap.
+ * xs_enable_swap - Tag this transport as being used for swap.
  * @xprt: transport to tag
- * @enable: enable/disable
  *
+ * Bump the transport's swapper count. Only the caller that moves it from
+ * 0 to 1 takes XPRT_LOCKED and marks the socket, if any, as memalloc.
  */
-int xs_swapper(struct rpc_xprt *xprt, int enable)
+static int
+xs_enable_swap(struct rpc_xprt *xprt)
 {
-	struct sock_xprt *transport = container_of(xprt, struct sock_xprt,
-			xprt);
-	int err = 0;
+	struct sock_xprt *xs = container_of(xprt, struct sock_xprt, xprt);
 
-	if (enable) {
-		xprt->swapper++;
-		xs_set_memalloc(xprt);
-	} else if (xprt->swapper) {
-		xprt->swapper--;
-		sk_clear_memalloc(transport->inet);
-	}
-
-	return err;
+	if (atomic_inc_return(&xprt->swapper) != 1)
+		return 0;	/* not the 0 -> 1 transition */
+	if (wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_KILLABLE))
+		return -ERESTARTSYS; /* NOTE(review): swapper stays bumped */
+	if (xs->inet)
+		sk_set_memalloc(xs->inet);
+	xprt_release_xprt(xprt, NULL);
+	return 0;
 }
-EXPORT_SYMBOL_GPL(xs_swapper);
+
+/**
+ * xs_disable_swap - Untag this transport as being used for swap.
+ * @xprt: transport to untag
+ *
+ * Drop a "swapper" reference to this xprt on behalf of the rpc_clnt. If the
+ * swapper refcount goes to 0, untag the socket as a memalloc socket.
+ */
+static void
+xs_disable_swap(struct rpc_xprt *xprt)
+{
+	struct sock_xprt *xs = container_of(xprt, struct sock_xprt, xprt);
+
+	if (!atomic_dec_and_test(&xprt->swapper))
+		return;	/* count did not reach zero */
+	if (wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_KILLABLE))
+		return;	/* NOTE(review): wait failed, untag below is skipped */
+	if (xs->inet)
+		sk_clear_memalloc(xs->inet);
+	xprt_release_xprt(xprt, NULL);
+}
 #else
 static void xs_set_memalloc(struct rpc_xprt *xprt)
 {
 }
+
+static int
+xs_enable_swap(struct rpc_xprt *xprt)
+{
+	return -EINVAL;
+}
+
+static void
+xs_disable_swap(struct rpc_xprt *xprt)
+{
+}
 #endif
 
 static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
@@ -2125,9 +2171,6 @@
 
 /**
  * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint
- * @xprt: RPC transport to connect
- * @transport: socket transport to connect
- * @create_sock: function to create a socket of the correct type
  *
  * Invoked by a work queue tasklet.
  */
@@ -2463,6 +2506,8 @@
 	.close			= xs_close,
 	.destroy		= xs_destroy,
 	.print_stats		= xs_local_print_stats,
+	.enable_swap		= xs_enable_swap,
+	.disable_swap		= xs_disable_swap,
 };
 
 static struct rpc_xprt_ops xs_udp_ops = {
@@ -2482,6 +2527,9 @@
 	.close			= xs_close,
 	.destroy		= xs_destroy,
 	.print_stats		= xs_udp_print_stats,
+	.enable_swap		= xs_enable_swap,
+	.disable_swap		= xs_disable_swap,
+	.inject_disconnect	= xs_inject_disconnect,
 };
 
 static struct rpc_xprt_ops xs_tcp_ops = {
@@ -2498,6 +2546,9 @@
 	.close			= xs_tcp_shutdown,
 	.destroy		= xs_destroy,
 	.print_stats		= xs_tcp_print_stats,
+	.enable_swap		= xs_enable_swap,
+	.disable_swap		= xs_disable_swap,
+	.inject_disconnect	= xs_inject_disconnect,
 };
 
 /*
@@ -2515,6 +2566,9 @@
 	.close			= bc_close,
 	.destroy		= bc_destroy,
 	.print_stats		= xs_tcp_print_stats,
+	.enable_swap		= xs_enable_swap,
+	.disable_swap		= xs_disable_swap,
+	.inject_disconnect	= xs_inject_disconnect,
 };
 
 static int xs_init_anyaddr(const int family, struct sockaddr *sap)