Merge tag 'nfsd-5.12-1' of git://git.kernel.org/pub/scm/linux/kernel/git/cel/linux

Pull more nfsd updates from Chuck Lever:
 "Here are a few additional NFSD commits for the merge window:

 Optimization:
   - Cork the socket while there are queued replies

  Fixes:
   - DRC shutdown ordering
   - svc_rdma_accept() lockdep splat"

* tag 'nfsd-5.12-1' of git://git.kernel.org/pub/scm/linux/kernel/git/cel/linux:
  SUNRPC: Further clean up svc_tcp_sendmsg()
  SUNRPC: Remove redundant socket flags from svc_tcp_sendmsg()
  SUNRPC: Use TCP_CORK to optimise send performance on the server
  svcrdma: Hold private mutex while invoking rdma_accept()
  nfsd: register pernet ops last, unregister first
diff --git a/fs/nfsd/nfsctl.c b/fs/nfsd/nfsctl.c
index 4f6e514..ef86ed2 100644
--- a/fs/nfsd/nfsctl.c
+++ b/fs/nfsd/nfsctl.c
@@ -1525,12 +1525,9 @@
 	int retval;
 	printk(KERN_INFO "Installing knfsd (copyright (C) 1996 okir@monad.swb.de).\n");
 
-	retval = register_pernet_subsys(&nfsd_net_ops);
-	if (retval < 0)
-		return retval;
 	retval = register_cld_notifier();
 	if (retval)
-		goto out_unregister_pernet;
+		return retval;
 	retval = nfsd4_init_slabs();
 	if (retval)
 		goto out_unregister_notifier;
@@ -1549,9 +1546,14 @@
 		goto out_free_lockd;
 	retval = register_filesystem(&nfsd_fs_type);
 	if (retval)
+		goto out_free_exports;
+	retval = register_pernet_subsys(&nfsd_net_ops);
+	if (retval < 0)
 		goto out_free_all;
 	return 0;
 out_free_all:
+	unregister_pernet_subsys(&nfsd_net_ops);
+out_free_exports:
 	remove_proc_entry("fs/nfs/exports", NULL);
 	remove_proc_entry("fs/nfs", NULL);
 out_free_lockd:
@@ -1565,13 +1567,12 @@
 	nfsd4_free_slabs();
 out_unregister_notifier:
 	unregister_cld_notifier();
-out_unregister_pernet:
-	unregister_pernet_subsys(&nfsd_net_ops);
 	return retval;
 }
 
 static void __exit exit_nfsd(void)
 {
+	unregister_pernet_subsys(&nfsd_net_ops);
 	nfsd_drc_slab_free();
 	remove_proc_entry("fs/nfs/exports", NULL);
 	remove_proc_entry("fs/nfs", NULL);
@@ -1581,7 +1582,6 @@
 	nfsd4_exit_pnfs();
 	unregister_filesystem(&nfsd_fs_type);
 	unregister_cld_notifier();
-	unregister_pernet_subsys(&nfsd_net_ops);
 }
 
 MODULE_AUTHOR("Olaf Kirch <okir@monad.swb.de>");
diff --git a/include/linux/sunrpc/svcsock.h b/include/linux/sunrpc/svcsock.h
index b7ac7fe6..bcc555c 100644
--- a/include/linux/sunrpc/svcsock.h
+++ b/include/linux/sunrpc/svcsock.h
@@ -35,6 +35,8 @@
 	/* Total length of the data (not including fragment headers)
 	 * received so far in the fragments making up this rpc: */
 	u32			sk_datalen;
+	/* Number of queued send requests */
+	atomic_t		sk_sendqlen;
 
 	struct page *		sk_pages[RPCSVC_MAXPAGES];	/* received data */
 };
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index 5a809c6..2e2f007 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -1078,9 +1078,8 @@
  * In addition, the logic assumes that * .bv_len is never larger
  * than PAGE_SIZE.
  */
-static int svc_tcp_sendmsg(struct socket *sock, struct msghdr *msg,
-			   struct xdr_buf *xdr, rpc_fraghdr marker,
-			   unsigned int *sentp)
+static int svc_tcp_sendmsg(struct socket *sock, struct xdr_buf *xdr,
+			   rpc_fraghdr marker, unsigned int *sentp)
 {
 	const struct kvec *head = xdr->head;
 	const struct kvec *tail = xdr->tail;
@@ -1088,21 +1087,22 @@
 		.iov_base	= &marker,
 		.iov_len	= sizeof(marker),
 	};
-	int flags, ret;
+	struct msghdr msg = {
+		.msg_flags	= 0,
+	};
+	int ret;
 
 	*sentp = 0;
 	xdr_alloc_bvec(xdr, GFP_KERNEL);
 
-	msg->msg_flags = MSG_MORE;
-	ret = kernel_sendmsg(sock, msg, &rm, 1, rm.iov_len);
+	ret = kernel_sendmsg(sock, &msg, &rm, 1, rm.iov_len);
 	if (ret < 0)
 		return ret;
 	*sentp += ret;
 	if (ret != rm.iov_len)
 		return -EAGAIN;
 
-	flags = head->iov_len < xdr->len ? MSG_MORE | MSG_SENDPAGE_NOTLAST : 0;
-	ret = svc_tcp_send_kvec(sock, head, flags);
+	ret = svc_tcp_send_kvec(sock, head, 0);
 	if (ret < 0)
 		return ret;
 	*sentp += ret;
@@ -1116,15 +1116,11 @@
 		bvec = xdr->bvec + (xdr->page_base >> PAGE_SHIFT);
 		offset = offset_in_page(xdr->page_base);
 		remaining = xdr->page_len;
-		flags = MSG_MORE | MSG_SENDPAGE_NOTLAST;
 		while (remaining > 0) {
-			if (remaining <= PAGE_SIZE && tail->iov_len == 0)
-				flags = 0;
-
 			len = min(remaining, bvec->bv_len - offset);
 			ret = kernel_sendpage(sock, bvec->bv_page,
 					      bvec->bv_offset + offset,
-					      len, flags);
+					      len, 0);
 			if (ret < 0)
 				return ret;
 			*sentp += ret;
@@ -1163,26 +1159,28 @@
 	struct xdr_buf *xdr = &rqstp->rq_res;
 	rpc_fraghdr marker = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT |
 					 (u32)xdr->len);
-	struct msghdr msg = {
-		.msg_flags	= 0,
-	};
 	unsigned int sent;
 	int err;
 
 	svc_tcp_release_rqst(rqstp);
 
+	atomic_inc(&svsk->sk_sendqlen);
 	mutex_lock(&xprt->xpt_mutex);
 	if (svc_xprt_is_dead(xprt))
 		goto out_notconn;
-	err = svc_tcp_sendmsg(svsk->sk_sock, &msg, xdr, marker, &sent);
+	tcp_sock_set_cork(svsk->sk_sk, true);
+	err = svc_tcp_sendmsg(svsk->sk_sock, xdr, marker, &sent);
 	xdr_free_bvec(xdr);
 	trace_svcsock_tcp_send(xprt, err < 0 ? err : sent);
 	if (err < 0 || sent != (xdr->len + sizeof(marker)))
 		goto out_close;
+	if (atomic_dec_and_test(&svsk->sk_sendqlen))
+		tcp_sock_set_cork(svsk->sk_sk, false);
 	mutex_unlock(&xprt->xpt_mutex);
 	return sent;
 
 out_notconn:
+	atomic_dec(&svsk->sk_sendqlen);
 	mutex_unlock(&xprt->xpt_mutex);
 	return -ENOTCONN;
 out_close:
@@ -1192,6 +1190,7 @@
 		  (err < 0) ? err : sent, xdr->len);
 	set_bit(XPT_CLOSE, &xprt->xpt_flags);
 	svc_xprt_enqueue(xprt);
+	atomic_dec(&svsk->sk_sendqlen);
 	mutex_unlock(&xprt->xpt_mutex);
 	return -EAGAIN;
 }
@@ -1261,7 +1260,7 @@
 		svsk->sk_datalen = 0;
 		memset(&svsk->sk_pages[0], 0, sizeof(svsk->sk_pages));
 
-		tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;
+		tcp_sock_set_nodelay(sk);
 
 		set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
 		switch (sk->sk_state) {
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index afba4e9..c895f80 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -475,9 +475,6 @@
 	if (!svc_rdma_post_recvs(newxprt))
 		goto errout;
 
-	/* Swap out the handler */
-	newxprt->sc_cm_id->event_handler = svc_rdma_cma_handler;
-
 	/* Construct RDMA-CM private message */
 	pmsg.cp_magic = rpcrdma_cmp_magic;
 	pmsg.cp_version = RPCRDMA_CMP_VERSION;
@@ -498,7 +495,10 @@
 	}
 	conn_param.private_data = &pmsg;
 	conn_param.private_data_len = sizeof(pmsg);
+	rdma_lock_handler(newxprt->sc_cm_id);
+	newxprt->sc_cm_id->event_handler = svc_rdma_cma_handler;
 	ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
+	rdma_unlock_handler(newxprt->sc_cm_id);
 	if (ret) {
 		trace_svcrdma_accept_err(newxprt, ret);
 		goto errout;