ceph: take the inode lock before acquiring cap refs

Most of the time, we (or the vfs layer) takes the inode_lock and then
acquires caps, but ceph_read_iter does the opposite, and that can lead
to a deadlock.

When there are multiple clients treading over the same data, we can end
up in a situation where a reader takes caps and then tries to acquire
the inode_lock. Another task holds the inode_lock and issues a request
to the MDS which needs to revoke the caps, but that can't happen until
the inode_lock is unwedged.

Fix this by having ceph_read_iter take the inode_lock earlier, before
attempting to acquire caps.

Fixes: 321fe13c9398 ("ceph: add buffered/direct exclusionary locking for reads and writes")
Link: https://tracker.ceph.com/issues/36348
Signed-off-by: Jeff Layton <jlayton@kernel.org>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index bd77adb..06efeaf 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -1264,14 +1264,24 @@
 	dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
 	     inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode);
 
+	if (iocb->ki_flags & IOCB_DIRECT)
+		ceph_start_io_direct(inode);
+	else
+		ceph_start_io_read(inode);
+
 	if (fi->fmode & CEPH_FILE_MODE_LAZY)
 		want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
 	else
 		want = CEPH_CAP_FILE_CACHE;
 	ret = ceph_get_caps(filp, CEPH_CAP_FILE_RD, want, -1,
 			    &got, &pinned_page);
-	if (ret < 0)
+	if (ret < 0) {
+		if (iocb->ki_flags & IOCB_DIRECT)
+			ceph_end_io_direct(inode);
+		else
+			ceph_end_io_read(inode);
 		return ret;
+	}
 
 	if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 ||
 	    (iocb->ki_flags & IOCB_DIRECT) ||
@@ -1283,16 +1293,12 @@
 
 		if (ci->i_inline_version == CEPH_INLINE_NONE) {
 			if (!retry_op && (iocb->ki_flags & IOCB_DIRECT)) {
-				ceph_start_io_direct(inode);
 				ret = ceph_direct_read_write(iocb, to,
 							     NULL, NULL);
-				ceph_end_io_direct(inode);
 				if (ret >= 0 && ret < len)
 					retry_op = CHECK_EOF;
 			} else {
-				ceph_start_io_read(inode);
 				ret = ceph_sync_read(iocb, to, &retry_op);
-				ceph_end_io_read(inode);
 			}
 		} else {
 			retry_op = READ_INLINE;
@@ -1303,11 +1309,10 @@
 		     inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
 		     ceph_cap_string(got));
 		ceph_add_rw_context(fi, &rw_ctx);
-		ceph_start_io_read(inode);
 		ret = generic_file_read_iter(iocb, to);
-		ceph_end_io_read(inode);
 		ceph_del_rw_context(fi, &rw_ctx);
 	}
+
 	dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
 	     inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
 	if (pinned_page) {
@@ -1315,6 +1320,12 @@
 		pinned_page = NULL;
 	}
 	ceph_put_cap_refs(ci, got);
+
+	if (iocb->ki_flags & IOCB_DIRECT)
+		ceph_end_io_direct(inode);
+	else
+		ceph_end_io_read(inode);
+
 	if (retry_op > HAVE_RETRIED && ret >= 0) {
 		int statret;
 		struct page *page = NULL;