Merge tag 'dlm-5.9' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm

Pull dlm updates from David Teigland:
 "This set includes a some improvements to the dlm networking layer:
  improving the ability to trace dlm messages for debugging, and
  improved handling of bad messages or disrupted connections"

* tag 'dlm-5.9' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm:
  fs: dlm: implement tcp graceful shutdown
  fs: dlm: change handling of reconnects
  fs: dlm: don't close socket on invalid message
  fs: dlm: set skb mark per peer socket
  fs: dlm: set skb mark for listen socket
  net: sock: add sock_set_mark
  dlm: Fix kobject memleak
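
For reference, the per-peer mark added by "fs: dlm: set skb mark per peer
socket" is exposed as a new "mark" attribute on each comm directory in dlm's
configfs tree. A minimal userspace sketch of setting it follows; the configfs
path (the usual /sys/kernel/config mount point and a comm directory named
after nodeid 2, as dlm_controld conventionally creates them) is an assumption
about the local layout:

	/* sketch: tag traffic to dlm peer node 2 with an fwmark via configfs */
	#include <fcntl.h>
	#include <stdio.h>
	#include <string.h>
	#include <unistd.h>

	int main(void)
	{
		const char *path = "/sys/kernel/config/dlm/cluster/comms/2/mark";
		const char *mark = "42\n";	/* any u32; 0 means no mark */
		int fd = open(path, O_WRONLY);

		if (fd < 0) {
			perror("open");
			return 1;
		}
		if (write(fd, mark, strlen(mark)) < 0)
			perror("write");
		close(fd);
		return 0;
	}

The listen socket's mark comes from the new cluster-wide "mark" attribute
(dlm_config.ci_mark) added in the same series.
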
diff --git a/fs/dlm/config.c b/fs/dlm/config.c
index 3b21082..47f0b98 100644
--- a/fs/dlm/config.c
+++ b/fs/dlm/config.c
@@ -73,6 +73,7 @@
 	unsigned int cl_log_debug;
 	unsigned int cl_log_info;
 	unsigned int cl_protocol;
+	unsigned int cl_mark;
 	unsigned int cl_timewarn_cs;
 	unsigned int cl_waitwarn_us;
 	unsigned int cl_new_rsb_count;
@@ -96,6 +97,7 @@
 	CLUSTER_ATTR_LOG_DEBUG,
 	CLUSTER_ATTR_LOG_INFO,
 	CLUSTER_ATTR_PROTOCOL,
+	CLUSTER_ATTR_MARK,
 	CLUSTER_ATTR_TIMEWARN_CS,
 	CLUSTER_ATTR_WAITWARN_US,
 	CLUSTER_ATTR_NEW_RSB_COUNT,
@@ -168,6 +170,7 @@
 CLUSTER_ATTR(log_debug, 0);
 CLUSTER_ATTR(log_info, 0);
 CLUSTER_ATTR(protocol, 0);
+CLUSTER_ATTR(mark, 0);
 CLUSTER_ATTR(timewarn_cs, 1);
 CLUSTER_ATTR(waitwarn_us, 0);
 CLUSTER_ATTR(new_rsb_count, 0);
@@ -183,6 +186,7 @@
 	[CLUSTER_ATTR_LOG_DEBUG] = &cluster_attr_log_debug,
 	[CLUSTER_ATTR_LOG_INFO] = &cluster_attr_log_info,
 	[CLUSTER_ATTR_PROTOCOL] = &cluster_attr_protocol,
+	[CLUSTER_ATTR_MARK] = &cluster_attr_mark,
 	[CLUSTER_ATTR_TIMEWARN_CS] = &cluster_attr_timewarn_cs,
 	[CLUSTER_ATTR_WAITWARN_US] = &cluster_attr_waitwarn_us,
 	[CLUSTER_ATTR_NEW_RSB_COUNT] = &cluster_attr_new_rsb_count,
@@ -196,6 +200,7 @@
 	COMM_ATTR_LOCAL,
 	COMM_ATTR_ADDR,
 	COMM_ATTR_ADDR_LIST,
+	COMM_ATTR_MARK,
 };
 
 enum {
@@ -228,6 +233,7 @@
 	int nodeid;
 	int local;
 	int addr_count;
+	unsigned int mark;
 	struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
 };
 
@@ -465,6 +471,7 @@
 	cm->nodeid = -1;
 	cm->local = 0;
 	cm->addr_count = 0;
+	cm->mark = 0;
 	return &cm->item;
 }
 
@@ -660,8 +667,28 @@
 	return 4096 - allowance;
 }
 
+static ssize_t comm_mark_show(struct config_item *item, char *buf)
+{
+	return sprintf(buf, "%u\n", config_item_to_comm(item)->mark);
+}
+
+static ssize_t comm_mark_store(struct config_item *item, const char *buf,
+			       size_t len)
+{
+	unsigned int mark;
+	int rc;
+
+	rc = kstrtouint(buf, 0, &mark);
+	if (rc)
+		return rc;
+
+	config_item_to_comm(item)->mark = mark;
+	return len;
+}
+
 CONFIGFS_ATTR(comm_, nodeid);
 CONFIGFS_ATTR(comm_, local);
+CONFIGFS_ATTR(comm_, mark);
 CONFIGFS_ATTR_WO(comm_, addr);
 CONFIGFS_ATTR_RO(comm_, addr_list);
 
@@ -670,6 +697,7 @@
 	[COMM_ATTR_LOCAL] = &comm_attr_local,
 	[COMM_ATTR_ADDR] = &comm_attr_addr,
 	[COMM_ATTR_ADDR_LIST] = &comm_attr_addr_list,
+	[COMM_ATTR_MARK] = &comm_attr_mark,
 	NULL,
 };
 
@@ -829,6 +857,20 @@
 	return 0;
 }
 
+int dlm_comm_mark(int nodeid, unsigned int *mark)
+{
+	struct dlm_comm *cm;
+
+	cm = get_comm(nodeid);
+	if (!cm)
+		return -ENOENT;
+
+	*mark = cm->mark;
+	put_comm(cm);
+
+	return 0;
+}
+
 int dlm_our_nodeid(void)
 {
 	return local_comm ? local_comm->nodeid : 0;
@@ -855,6 +897,7 @@
 #define DEFAULT_LOG_DEBUG          0
 #define DEFAULT_LOG_INFO           1
 #define DEFAULT_PROTOCOL           0
+#define DEFAULT_MARK               0
 #define DEFAULT_TIMEWARN_CS      500 /* 5 sec = 500 centiseconds */
 #define DEFAULT_WAITWARN_US	   0
 #define DEFAULT_NEW_RSB_COUNT    128
@@ -871,6 +914,7 @@
 	.ci_log_debug = DEFAULT_LOG_DEBUG,
 	.ci_log_info = DEFAULT_LOG_INFO,
 	.ci_protocol = DEFAULT_PROTOCOL,
+	.ci_mark = DEFAULT_MARK,
 	.ci_timewarn_cs = DEFAULT_TIMEWARN_CS,
 	.ci_waitwarn_us = DEFAULT_WAITWARN_US,
 	.ci_new_rsb_count = DEFAULT_NEW_RSB_COUNT,
diff --git a/fs/dlm/config.h b/fs/dlm/config.h
index 2b471aa..f62996c 100644
--- a/fs/dlm/config.h
+++ b/fs/dlm/config.h
@@ -31,6 +31,7 @@
 	int ci_log_debug;
 	int ci_log_info;
 	int ci_protocol;
+	int ci_mark;
 	int ci_timewarn_cs;
 	int ci_waitwarn_us;
 	int ci_new_rsb_count;
@@ -45,6 +46,7 @@
 int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out,
 		     int *count_out);
 int dlm_comm_seq(int nodeid, uint32_t *seq);
+int dlm_comm_mark(int nodeid, unsigned int *mark);
 int dlm_our_nodeid(void);
 int dlm_our_addr(struct sockaddr_storage *addr, int num);
 
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index e93670e..624617c 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -622,6 +622,9 @@
 	wait_event(ls->ls_recover_lock_wait,
 		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
 
+	/* let kobject handle freeing of ls if there's an error */
+	do_unreg = 1;
+
 	ls->ls_kobj.kset = dlm_kset;
 	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
 				     "%s", ls->ls_name);
@@ -629,9 +632,6 @@
 		goto out_recoverd;
 	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
 
-	/* let kobject handle freeing of ls if there's an error */
-	do_unreg = 1;
-
 	/* This uevent triggers dlm_controld in userspace to add us to the
 	   group of nodes that are members of this lockspace (managed by the
 	   cluster infrastructure.)  Once it's done that, it tells us who the
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 3543a8f..5050fe0 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -63,6 +63,7 @@
 
 /* Number of messages to send before rescheduling */
 #define MAX_SEND_MSG_COUNT 25
+#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)
 
 struct cbuf {
 	unsigned int base;
@@ -110,10 +111,12 @@
 #define CF_CLOSE 6
 #define CF_APP_LIMITED 7
 #define CF_CLOSING 8
+#define CF_SHUTDOWN 9
 	struct list_head writequeue;  /* List of outgoing writequeue_entries */
 	spinlock_t writequeue_lock;
 	int (*rx_action) (struct connection *);	/* What to do when active */
 	void (*connect_action) (struct connection *);	/* What to do to connect */
+	void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
 	struct page *rx_page;
 	struct cbuf cb;
 	int retries;
@@ -122,6 +125,7 @@
 	struct connection *othercon;
 	struct work_struct rwork; /* Receive workqueue */
 	struct work_struct swork; /* Send workqueue */
+	wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
 };
 #define sock2con(x) ((struct connection *)(x)->sk_user_data)
 
@@ -218,6 +222,7 @@
 	spin_lock_init(&con->writequeue_lock);
 	INIT_WORK(&con->swork, process_send_sockets);
 	INIT_WORK(&con->rwork, process_recv_sockets);
+	init_waitqueue_head(&con->shutdown_wait);
 
 	/* Setup action pointers for child sockets */
 	if (con->nodeid) {
@@ -619,6 +624,54 @@
 	clear_bit(CF_CLOSING, &con->flags);
 }
 
+static void shutdown_connection(struct connection *con)
+{
+	int ret;
+
+	if (cancel_work_sync(&con->swork)) {
+		log_print("canceled swork for node %d", con->nodeid);
+		clear_bit(CF_WRITE_PENDING, &con->flags);
+	}
+
+	mutex_lock(&con->sock_mutex);
+	/* nothing to shutdown */
+	if (!con->sock) {
+		mutex_unlock(&con->sock_mutex);
+		return;
+	}
+
+	set_bit(CF_SHUTDOWN, &con->flags);
+	ret = kernel_sock_shutdown(con->sock, SHUT_WR);
+	mutex_unlock(&con->sock_mutex);
+	if (ret) {
+		log_print("Connection %p failed to shutdown: %d will force close",
+			  con, ret);
+		goto force_close;
+	} else {
+		ret = wait_event_timeout(con->shutdown_wait,
+					 !test_bit(CF_SHUTDOWN, &con->flags),
+					 DLM_SHUTDOWN_WAIT_TIMEOUT);
+		if (ret == 0) {
+			log_print("Connection %p shutdown timed out, will force close",
+				  con);
+			goto force_close;
+		}
+	}
+
+	return;
+
+force_close:
+	clear_bit(CF_SHUTDOWN, &con->flags);
+	close_connection(con, false, true, true);
+}
+
+static void dlm_tcp_shutdown(struct connection *con)
+{
+	if (con->othercon)
+		shutdown_connection(con->othercon);
+	shutdown_connection(con);
+}
+
 /* Data received from remote end */
 static int receive_from_sock(struct connection *con)
 {
@@ -685,14 +738,14 @@
 					  page_address(con->rx_page),
 					  con->cb.base, con->cb.len,
 					  PAGE_SIZE);
-	if (ret == -EBADMSG) {
-		log_print("lowcomms: addr=%p, base=%u, len=%u, read=%d",
-			  page_address(con->rx_page), con->cb.base,
+	if (ret < 0) {
+		log_print("lowcomms err %d: addr=%p, base=%u, len=%u, read=%d",
+			  ret, page_address(con->rx_page), con->cb.base,
 			  con->cb.len, r);
+		cbuf_eat(&con->cb, r);
+	} else {
+		cbuf_eat(&con->cb, ret);
 	}
-	if (ret < 0)
-		goto out_close;
-	cbuf_eat(&con->cb, ret);
 
 	if (cbuf_empty(&con->cb) && !call_again_soon) {
 		__free_page(con->rx_page);
@@ -713,13 +766,18 @@
 out_close:
 	mutex_unlock(&con->sock_mutex);
 	if (ret != -EAGAIN) {
-		close_connection(con, true, true, false);
 		/* Reconnect when there is something to send */
+		close_connection(con, false, true, false);
+		if (ret == 0) {
+			log_print("connection %p got EOF from %d",
+				  con, con->nodeid);
+			/* handling for tcp shutdown */
+			clear_bit(CF_SHUTDOWN, &con->flags);
+			wake_up(&con->shutdown_wait);
+			/* signal to breaking receive worker */
+			ret = -1;
+		}
 	}
-	/* Don't return success if we really got EOF */
-	if (ret == 0)
-		ret = -EAGAIN;
-
 	return ret;
 }
 
@@ -803,22 +861,18 @@
 			spin_lock_init(&othercon->writequeue_lock);
 			INIT_WORK(&othercon->swork, process_send_sockets);
 			INIT_WORK(&othercon->rwork, process_recv_sockets);
+			init_waitqueue_head(&othercon->shutdown_wait);
 			set_bit(CF_IS_OTHERCON, &othercon->flags);
+		} else {
+			/* close other sock con if we have something new */
+			close_connection(othercon, false, true, false);
 		}
+
 		mutex_lock_nested(&othercon->sock_mutex, 2);
-		if (!othercon->sock) {
-			newcon->othercon = othercon;
-			add_sock(newsock, othercon);
-			addcon = othercon;
-			mutex_unlock(&othercon->sock_mutex);
-		}
-		else {
-			printk("Extra connection from node %d attempted\n", nodeid);
-			result = -EAGAIN;
-			mutex_unlock(&othercon->sock_mutex);
-			mutex_unlock(&newcon->sock_mutex);
-			goto accept_err;
-		}
+		newcon->othercon = othercon;
+		add_sock(newsock, othercon);
+		addcon = othercon;
+		mutex_unlock(&othercon->sock_mutex);
 	}
 	else {
 		newcon->rx_action = receive_from_sock;
@@ -914,6 +968,7 @@
 	int result;
 	int addr_len;
 	struct socket *sock;
+	unsigned int mark;
 
 	if (con->nodeid == 0) {
 		log_print("attempt to connect sock 0 foiled");
@@ -944,6 +999,13 @@
 	if (result < 0)
 		goto socket_err;
 
+	/* set skb mark */
+	result = dlm_comm_mark(con->nodeid, &mark);
+	if (result < 0)
+		goto bind_err;
+
+	sock_set_mark(sock->sk, mark);
+
 	con->rx_action = receive_from_sock;
 	con->connect_action = sctp_connect_to_sock;
 	add_sock(sock, con);
@@ -1006,6 +1068,7 @@
 	struct sockaddr_storage saddr, src_addr;
 	int addr_len;
 	struct socket *sock = NULL;
+	unsigned int mark;
 	int result;
 
 	if (con->nodeid == 0) {
@@ -1027,6 +1090,13 @@
 	if (result < 0)
 		goto out_err;
 
+	/* set skb mark */
+	result = dlm_comm_mark(con->nodeid, &mark);
+	if (result < 0)
+		goto out_err;
+
+	sock_set_mark(sock->sk, mark);
+
 	memset(&saddr, 0, sizeof(saddr));
 	result = nodeid_to_addr(con->nodeid, &saddr, NULL, false);
 	if (result < 0) {
@@ -1036,6 +1106,7 @@
 
 	con->rx_action = receive_from_sock;
 	con->connect_action = tcp_connect_to_sock;
+	con->shutdown_action = dlm_tcp_shutdown;
 	add_sock(sock, con);
 
 	/* Bind to our cluster-known address connecting to avoid
@@ -1111,6 +1182,8 @@
 		goto create_out;
 	}
 
+	sock_set_mark(sock->sk, dlm_config.ci_mark);
+
 	/* Turn off Nagle's algorithm */
 	tcp_sock_set_nodelay(sock->sk);
 
@@ -1185,6 +1258,7 @@
 	}
 
 	sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
+	sock_set_mark(sock->sk, dlm_config.ci_mark);
 	sctp_sock_set_nodelay(sock->sk);
 
 	write_lock_bh(&sock->sk->sk_callback_lock);
@@ -1396,7 +1470,7 @@
 
 send_error:
 	mutex_unlock(&con->sock_mutex);
-	close_connection(con, true, false, true);
+	close_connection(con, false, false, true);
 	/* Requeue the send work. When the work daemon runs again, it will try
 	   a new connection, then call this function again. */
 	queue_work(send_workqueue, &con->swork);
@@ -1528,6 +1602,12 @@
 	_stop_conn(con, true);
 }
 
+static void shutdown_conn(struct connection *con)
+{
+	if (con->shutdown_action)
+		con->shutdown_action(con);
+}
+
 static void free_conn(struct connection *con)
 {
 	close_connection(con, true, true, true);
@@ -1579,6 +1659,7 @@
 	mutex_lock(&connections_lock);
 	dlm_allow_conn = 0;
 	mutex_unlock(&connections_lock);
+	foreach_conn(shutdown_conn);
 	work_flush();
 	clean_writequeues();
 	foreach_conn(free_conn);
diff --git a/include/net/sock.h b/include/net/sock.h
index 497c64f..064637d 100644
--- a/include/net/sock.h
+++ b/include/net/sock.h
@@ -2696,6 +2696,7 @@
 void sock_set_keepalive(struct sock *sk);
 void sock_set_priority(struct sock *sk, u32 priority);
 void sock_set_rcvbuf(struct sock *sk, int val);
+void sock_set_mark(struct sock *sk, u32 val);
 void sock_set_reuseaddr(struct sock *sk);
 void sock_set_reuseport(struct sock *sk);
 void sock_set_sndtimeo(struct sock *sk, s64 secs);
diff --git a/net/core/sock.c b/net/core/sock.c
index 49cd5ff..d29709e 100644
--- a/net/core/sock.c
+++ b/net/core/sock.c
@@ -820,6 +820,14 @@
 }
 EXPORT_SYMBOL(sock_set_rcvbuf);
 
+void sock_set_mark(struct sock *sk, u32 val)
+{
+	lock_sock(sk);
+	sk->sk_mark = val;
+	release_sock(sk);
+}
+EXPORT_SYMBOL(sock_set_mark);
+
 /*
  *	This is meant for all protocols to use and covers goings on
  *	at the socket level. Everything here is generic.
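
The sock_set_mark() helper added above is a locked kernel-side setter for
sk->sk_mark, roughly the in-kernel counterpart of userspace
setsockopt(SOL_SOCKET, SO_MARK, ...) (minus the capability check, since
callers are kernel code). A minimal sketch of a kernel caller, with error
handling trimmed and an arbitrary mark value, might look like:

	struct socket *sock;
	int err;

	err = sock_create_kern(&init_net, AF_INET, SOCK_STREAM, IPPROTO_TCP,
			       &sock);
	if (err < 0)
		return err;

	/* tag traffic from this socket so ip rule / nftables can match it */
	sock_set_mark(sock->sk, 0x1);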