Merge tag 'ceph-for-5.12-rc1' of git://github.com/ceph/ceph-client

Pull ceph updates from Ilya Dryomov:
 "With netfs helper library and fscache rework delayed, just a few cap
  handling improvements to avoid grabbing mmap_lock in some code paths
  and deal with capsnaps better, and a mount option cleanup"

* tag 'ceph-for-5.12-rc1' of git://github.com/ceph/ceph-client:
  ceph: defer flushing the capsnap if the Fb is used
  libceph: remove osdtimeout option entirely
  libceph: deprecate [no]cephx_require_signatures options
  ceph: allow queueing cap/snap handling after putting cap references
  ceph: clean up inode work queueing
  ceph: fix flush_snap logic after putting caps
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 9505529..26e6643 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -1662,7 +1662,7 @@
 
 	dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %x\n",
 	     inode, off, len, ceph_cap_string(got), ret);
-	ceph_put_cap_refs(ci, got);
+	ceph_put_cap_refs_async(ci, got);
 out_free:
 	ceph_restore_sigs(&oldset);
 	sb_end_pagefault(inode->i_sb);
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 255a512..570731c 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -3027,6 +3027,12 @@
 	return 0;
 }
 
+enum put_cap_refs_mode {
+	PUT_CAP_REFS_SYNC = 0,
+	PUT_CAP_REFS_NO_CHECK,
+	PUT_CAP_REFS_ASYNC,
+};
+
 /*
  * Release cap refs.
  *
@@ -3037,10 +3043,11 @@
  * cap_snap, and wake up any waiters.
  */
 static void __ceph_put_cap_refs(struct ceph_inode_info *ci, int had,
-				bool skip_checking_caps)
+				enum put_cap_refs_mode mode)
 {
 	struct inode *inode = &ci->vfs_inode;
 	int last = 0, put = 0, flushsnaps = 0, wake = 0;
+	bool check_flushsnaps = false;
 
 	spin_lock(&ci->i_ceph_lock);
 	if (had & CEPH_CAP_PIN)
@@ -3057,26 +3064,17 @@
 	if (had & CEPH_CAP_FILE_BUFFER) {
 		if (--ci->i_wb_ref == 0) {
 			last++;
+			/* put the ref held by ceph_take_cap_refs() */
 			put++;
+			check_flushsnaps = true;
 		}
 		dout("put_cap_refs %p wb %d -> %d (?)\n",
 		     inode, ci->i_wb_ref+1, ci->i_wb_ref);
 	}
-	if (had & CEPH_CAP_FILE_WR)
+	if (had & CEPH_CAP_FILE_WR) {
 		if (--ci->i_wr_ref == 0) {
 			last++;
-			if (__ceph_have_pending_cap_snap(ci)) {
-				struct ceph_cap_snap *capsnap =
-					list_last_entry(&ci->i_cap_snaps,
-							struct ceph_cap_snap,
-							ci_item);
-				capsnap->writing = 0;
-				if (ceph_try_drop_cap_snap(ci, capsnap))
-					put++;
-				else if (__ceph_finish_cap_snap(ci, capsnap))
-					flushsnaps = 1;
-				wake = 1;
-			}
+			check_flushsnaps = true;
 			if (ci->i_wrbuffer_ref_head == 0 &&
 			    ci->i_dirty_caps == 0 &&
 			    ci->i_flushing_caps == 0) {
@@ -3088,15 +3086,42 @@
 			if (!__ceph_is_any_real_caps(ci) && ci->i_snap_realm)
 				drop_inode_snap_realm(ci);
 		}
+	}
+	if (check_flushsnaps && __ceph_have_pending_cap_snap(ci)) {
+		struct ceph_cap_snap *capsnap =
+			list_last_entry(&ci->i_cap_snaps,
+					struct ceph_cap_snap,
+					ci_item);
+
+		capsnap->writing = 0;
+		if (ceph_try_drop_cap_snap(ci, capsnap))
+			/* put the ref held by ceph_queue_cap_snap() */
+			put++;
+		else if (__ceph_finish_cap_snap(ci, capsnap))
+			flushsnaps = 1;
+		wake = 1;
+	}
 	spin_unlock(&ci->i_ceph_lock);
 
 	dout("put_cap_refs %p had %s%s%s\n", inode, ceph_cap_string(had),
 	     last ? " last" : "", put ? " put" : "");
 
-	if (last && !skip_checking_caps)
-		ceph_check_caps(ci, 0, NULL);
-	else if (flushsnaps)
-		ceph_flush_snaps(ci, NULL);
+	switch (mode) {
+	case PUT_CAP_REFS_SYNC:
+		if (last)
+			ceph_check_caps(ci, 0, NULL);
+		else if (flushsnaps)
+			ceph_flush_snaps(ci, NULL);
+		break;
+	case PUT_CAP_REFS_ASYNC:
+		if (last)
+			ceph_queue_check_caps(inode);
+		else if (flushsnaps)
+			ceph_queue_flush_snaps(inode);
+		break;
+	default:
+		break;
+	}
 	if (wake)
 		wake_up_all(&ci->i_cap_wq);
 	while (put-- > 0)
@@ -3105,12 +3130,17 @@
 
 void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
 {
-	__ceph_put_cap_refs(ci, had, false);
+	__ceph_put_cap_refs(ci, had, PUT_CAP_REFS_SYNC);
+}
+
+void ceph_put_cap_refs_async(struct ceph_inode_info *ci, int had)
+{
+	__ceph_put_cap_refs(ci, had, PUT_CAP_REFS_ASYNC);
 }
 
 void ceph_put_cap_refs_no_check_caps(struct ceph_inode_info *ci, int had)
 {
-	__ceph_put_cap_refs(ci, had, true);
+	__ceph_put_cap_refs(ci, had, PUT_CAP_REFS_NO_CHECK);
 }
 
 /*
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index adc8fc3..5d20a62 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -1816,60 +1816,17 @@
 	}
 }
 
-/*
- * Write back inode data in a worker thread.  (This can't be done
- * in the message handler context.)
- */
-void ceph_queue_writeback(struct inode *inode)
+void ceph_queue_inode_work(struct inode *inode, int work_bit)
 {
+	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	set_bit(CEPH_I_WORK_WRITEBACK, &ci->i_work_mask);
+	set_bit(work_bit, &ci->i_work_mask);
 
 	ihold(inode);
-	if (queue_work(ceph_inode_to_client(inode)->inode_wq,
-		       &ci->i_work)) {
-		dout("ceph_queue_writeback %p\n", inode);
+	if (queue_work(fsc->inode_wq, &ci->i_work)) {
+		dout("queue_inode_work %p, mask=%lx\n", inode, ci->i_work_mask);
 	} else {
-		dout("ceph_queue_writeback %p already queued, mask=%lx\n",
-		     inode, ci->i_work_mask);
-		iput(inode);
-	}
-}
-
-/*
- * queue an async invalidation
- */
-void ceph_queue_invalidate(struct inode *inode)
-{
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	set_bit(CEPH_I_WORK_INVALIDATE_PAGES, &ci->i_work_mask);
-
-	ihold(inode);
-	if (queue_work(ceph_inode_to_client(inode)->inode_wq,
-		       &ceph_inode(inode)->i_work)) {
-		dout("ceph_queue_invalidate %p\n", inode);
-	} else {
-		dout("ceph_queue_invalidate %p already queued, mask=%lx\n",
-		     inode, ci->i_work_mask);
-		iput(inode);
-	}
-}
-
-/*
- * Queue an async vmtruncate.  If we fail to queue work, we will handle
- * the truncation the next time we call __ceph_do_pending_vmtruncate.
- */
-void ceph_queue_vmtruncate(struct inode *inode)
-{
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	set_bit(CEPH_I_WORK_VMTRUNCATE, &ci->i_work_mask);
-
-	ihold(inode);
-	if (queue_work(ceph_inode_to_client(inode)->inode_wq,
-		       &ci->i_work)) {
-		dout("ceph_queue_vmtruncate %p\n", inode);
-	} else {
-		dout("ceph_queue_vmtruncate %p already queued, mask=%lx\n",
+		dout("queue_inode_work %p already queued, mask=%lx\n",
 		     inode, ci->i_work_mask);
 		iput(inode);
 	}
@@ -2008,6 +1965,12 @@
 	if (test_and_clear_bit(CEPH_I_WORK_VMTRUNCATE, &ci->i_work_mask))
 		__ceph_do_pending_vmtruncate(inode);
 
+	if (test_and_clear_bit(CEPH_I_WORK_CHECK_CAPS, &ci->i_work_mask))
+		ceph_check_caps(ci, 0, NULL);
+
+	if (test_and_clear_bit(CEPH_I_WORK_FLUSH_SNAPS, &ci->i_work_mask))
+		ceph_flush_snaps(ci, NULL);
+
 	iput(inode);
 }
 
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index b611f82..0728b01d 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -623,6 +623,16 @@
 		return 0;
 	}
 
+	/* Fb cap still in use, delay it */
+	if (ci->i_wb_ref) {
+		dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu "
+		     "used WRBUFFER, delaying\n", inode, capsnap,
+		     capsnap->context, capsnap->context->seq,
+		     ceph_cap_string(capsnap->dirty), capsnap->size);
+		capsnap->writing = 1;
+		return 0;
+	}
+
 	ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS;
 	dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu\n",
 	     inode, capsnap, capsnap->context,
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index b62d8fe..13b0288 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -562,9 +562,11 @@
 /*
  * Masks of ceph inode work.
  */
-#define CEPH_I_WORK_WRITEBACK		0 /* writeback */
-#define CEPH_I_WORK_INVALIDATE_PAGES	1 /* invalidate pages */
-#define CEPH_I_WORK_VMTRUNCATE		2 /* vmtruncate */
+#define CEPH_I_WORK_WRITEBACK		0
+#define CEPH_I_WORK_INVALIDATE_PAGES	1
+#define CEPH_I_WORK_VMTRUNCATE		2
+#define CEPH_I_WORK_CHECK_CAPS		3
+#define CEPH_I_WORK_FLUSH_SNAPS		4
 
 /*
  * We set the ERROR_WRITE bit when we start seeing write errors on an inode
@@ -962,11 +964,36 @@
 
 extern bool ceph_inode_set_size(struct inode *inode, loff_t size);
 extern void __ceph_do_pending_vmtruncate(struct inode *inode);
-extern void ceph_queue_vmtruncate(struct inode *inode);
-extern void ceph_queue_invalidate(struct inode *inode);
-extern void ceph_queue_writeback(struct inode *inode);
+
 extern void ceph_async_iput(struct inode *inode);
 
+void ceph_queue_inode_work(struct inode *inode, int work_bit);
+
+static inline void ceph_queue_vmtruncate(struct inode *inode)
+{
+	ceph_queue_inode_work(inode, CEPH_I_WORK_VMTRUNCATE);
+}
+
+static inline void ceph_queue_invalidate(struct inode *inode)
+{
+	ceph_queue_inode_work(inode, CEPH_I_WORK_INVALIDATE_PAGES);
+}
+
+static inline void ceph_queue_writeback(struct inode *inode)
+{
+	ceph_queue_inode_work(inode, CEPH_I_WORK_WRITEBACK);
+}
+
+static inline void ceph_queue_check_caps(struct inode *inode)
+{
+	ceph_queue_inode_work(inode, CEPH_I_WORK_CHECK_CAPS);
+}
+
+static inline void ceph_queue_flush_snaps(struct inode *inode)
+{
+	ceph_queue_inode_work(inode, CEPH_I_WORK_FLUSH_SNAPS);
+}
+
 extern int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
 			     int mask, bool force);
 static inline int ceph_do_getattr(struct inode *inode, int mask, bool force)
@@ -1105,6 +1132,7 @@
 				bool snap_rwsem_locked);
 extern void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps);
 extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);
+extern void ceph_put_cap_refs_async(struct ceph_inode_info *ci, int had);
 extern void ceph_put_cap_refs_no_check_caps(struct ceph_inode_info *ci,
 					    int had);
 extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index eb9008b..409d8c29 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -32,10 +32,9 @@
 #define CEPH_OPT_NOSHARE          (1<<1) /* don't share client with other sbs */
 #define CEPH_OPT_MYIP             (1<<2) /* specified my ip */
 #define CEPH_OPT_NOCRC            (1<<3) /* no data crc on writes (msgr1) */
-#define CEPH_OPT_NOMSGAUTH	  (1<<4) /* don't require msg signing feat */
-#define CEPH_OPT_TCP_NODELAY	  (1<<5) /* TCP_NODELAY on TCP sockets */
-#define CEPH_OPT_NOMSGSIGN	  (1<<6) /* don't sign msgs (msgr1) */
-#define CEPH_OPT_ABORT_ON_FULL	  (1<<7) /* abort w/ ENOSPC when full */
+#define CEPH_OPT_TCP_NODELAY      (1<<4) /* TCP_NODELAY on TCP sockets */
+#define CEPH_OPT_NOMSGSIGN        (1<<5) /* don't sign msgs (msgr1) */
+#define CEPH_OPT_ABORT_ON_FULL    (1<<6) /* abort w/ ENOSPC when full */
 
 #define CEPH_OPT_DEFAULT   (CEPH_OPT_TCP_NODELAY)
 
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 271287c..97d6ea7 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -252,7 +252,6 @@
  * ceph options
  */
 enum {
-	Opt_osdtimeout,
 	Opt_osdkeepalivetimeout,
 	Opt_mount_timeout,
 	Opt_osd_idle_ttl,
@@ -307,7 +306,8 @@
 
 static const struct fs_parameter_spec ceph_parameters[] = {
 	fsparam_flag	("abort_on_full",		Opt_abort_on_full),
-	fsparam_flag_no ("cephx_require_signatures",	Opt_cephx_require_signatures),
+	__fsparam	(NULL, "cephx_require_signatures", Opt_cephx_require_signatures,
+			 fs_param_neg_with_no|fs_param_deprecated, NULL),
 	fsparam_flag_no ("cephx_sign_messages",		Opt_cephx_sign_messages),
 	fsparam_flag_no ("crc",				Opt_crc),
 	fsparam_string	("crush_location",		Opt_crush_location),
@@ -319,8 +319,6 @@
 	fsparam_u32	("osd_idle_ttl",		Opt_osd_idle_ttl),
 	fsparam_u32	("osd_request_timeout",		Opt_osd_request_timeout),
 	fsparam_u32	("osdkeepalive",		Opt_osdkeepalivetimeout),
-	__fsparam	(fs_param_is_s32, "osdtimeout", Opt_osdtimeout,
-			 fs_param_deprecated, NULL),
 	fsparam_enum	("read_from_replica",		Opt_read_from_replica,
 			 ceph_param_read_from_replica),
 	fsparam_enum	("ms_mode",			Opt_ms_mode,
@@ -552,9 +550,6 @@
 		}
 		break;
 
-	case Opt_osdtimeout:
-		warn_plog(&log, "Ignoring osdtimeout");
-		break;
 	case Opt_osdkeepalivetimeout:
 		/* 0 isn't well defined right now, reject it */
 		if (result.uint_32 < 1 || result.uint_32 > INT_MAX / 1000)
@@ -596,9 +591,9 @@
 		break;
 	case Opt_cephx_require_signatures:
 		if (!result.negated)
-			opt->flags &= ~CEPH_OPT_NOMSGAUTH;
+			warn_plog(&log, "Ignoring cephx_require_signatures");
 		else
-			opt->flags |= CEPH_OPT_NOMSGAUTH;
+			warn_plog(&log, "Ignoring nocephx_require_signatures, use nocephx_sign_messages");
 		break;
 	case Opt_cephx_sign_messages:
 		if (!result.negated)
@@ -686,8 +681,6 @@
 		seq_puts(m, "noshare,");
 	if (opt->flags & CEPH_OPT_NOCRC)
 		seq_puts(m, "nocrc,");
-	if (opt->flags & CEPH_OPT_NOMSGAUTH)
-		seq_puts(m, "nocephx_require_signatures,");
 	if (opt->flags & CEPH_OPT_NOMSGSIGN)
 		seq_puts(m, "nocephx_sign_messages,");
 	if ((opt->flags & CEPH_OPT_TCP_NODELAY) == 0)
@@ -756,7 +749,7 @@
 	client->supported_features = CEPH_FEATURES_SUPPORTED_DEFAULT;
 	client->required_features = CEPH_FEATURES_REQUIRED_DEFAULT;
 
-	if (!ceph_test_opt(client, NOMSGAUTH))
+	if (!ceph_test_opt(client, NOMSGSIGN))
 		client->required_features |= CEPH_FEATURE_MSG_AUTH;
 
 	/* msgr */