Merge tag 'afs-fixes-20201016' of git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs

Pull afs updates from David Howells:
 "A collection of fixes for afs_cell struct refcounting, thereby
  fixing a slew of related syzbot bugs:

   - Fix the cell tree in the netns to use an rwsem rather than RCU.

     There seem to be some problems deriving from the use of RCU and a
     seqlock to walk the rbtree, but it's not entirely clear what since
     there are several different failures being seen.

     Changing things to use an rwsem instead makes it more robust. The
     extra performance derived from using RCU isn't necessary in this
     case since the only time we're looking up a cell is during mount or
     when cells are being manually added.

   - Fix the refcounting by splitting the usage counter into a memory
     refcount and an active users counter. The usage counter was doing
     double duty, keeping track of whether a cell is still in use and
     keeping track of when it needs to be destroyed - but this makes the
     clean up tricky. Separating these out simplifies the logic.

   - Fix purging a cell that has an alias. A cell alias pins the cell
     it's an alias of, but the alias is always later in the list. Trying
     to purge in a single pass causes rmmod to hang in such a case.

   - Fix cell removal. If a cell's manager is requeued whilst it's
     removing itself, the manager will run again and re-remove itself,
     causing problems in various places. Follow Hillf Danton's
     suggestion to insert a more terminal state that causes the manager
     to do nothing post-removal.

  In addition to the above, two other changes:

   - Add a tracepoint for the cell refcount and active users count. This
     helped with debugging the above and may be useful again in future.

   - Downgrade an assertion to a print when a still-active server is
     seen during purging. This was happening as a consequence of
     incomplete cell removal before the servers were cleaned up."

* tag 'afs-fixes-20201016' of git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs:
  afs: Don't assert on unpurgeable server records
  afs: Add tracing for cell refcount and active user count
  afs: Fix cell removal
  afs: Fix cell purging with aliases
  afs: Fix cell refcounting by splitting the usage counter
  afs: Fix rapid cell addition/removal by not using RCU on cells tree
diff --git a/fs/afs/cell.c b/fs/afs/cell.c
index 5b79cdc..52233fa 100644
--- a/fs/afs/cell.c
+++ b/fs/afs/cell.c
@@ -18,8 +18,10 @@
 static unsigned __read_mostly afs_cell_gc_delay = 10;
 static unsigned __read_mostly afs_cell_min_ttl = 10 * 60;
 static unsigned __read_mostly afs_cell_max_ttl = 24 * 60 * 60;
+static atomic_t cell_debug_id;
 
-static void afs_manage_cell(struct work_struct *);
+static void afs_queue_cell_manager(struct afs_net *);
+static void afs_manage_cell_work(struct work_struct *);
 
 static void afs_dec_cells_outstanding(struct afs_net *net)
 {
@@ -37,19 +39,22 @@
 		atomic_inc(&net->cells_outstanding);
 		if (timer_reduce(&net->cells_timer, jiffies + delay * HZ))
 			afs_dec_cells_outstanding(net);
+	} else {
+		afs_queue_cell_manager(net);
 	}
 }
 
 /*
- * Look up and get an activation reference on a cell record under RCU
- * conditions.  The caller must hold the RCU read lock.
+ * Look up and get an activation reference on a cell record.  The caller must
+ * hold net->cells_lock at least read-locked.
  */
-struct afs_cell *afs_lookup_cell_rcu(struct afs_net *net,
-				     const char *name, unsigned int namesz)
+static struct afs_cell *afs_find_cell_locked(struct afs_net *net,
+					     const char *name, unsigned int namesz,
+					     enum afs_cell_trace reason)
 {
 	struct afs_cell *cell = NULL;
 	struct rb_node *p;
-	int n, seq = 0, ret = 0;
+	int n;
 
 	_enter("%*.*s", namesz, namesz, name);
 
@@ -58,61 +63,48 @@
 	if (namesz > AFS_MAXCELLNAME)
 		return ERR_PTR(-ENAMETOOLONG);
 
-	do {
-		/* Unfortunately, rbtree walking doesn't give reliable results
-		 * under just the RCU read lock, so we have to check for
-		 * changes.
-		 */
-		if (cell)
-			afs_put_cell(net, cell);
-		cell = NULL;
-		ret = -ENOENT;
+	if (!name) {
+		cell = net->ws_cell;
+		if (!cell)
+			return ERR_PTR(-EDESTADDRREQ);
+		goto found;
+	}
 
-		read_seqbegin_or_lock(&net->cells_lock, &seq);
+	p = net->cells.rb_node;
+	while (p) {
+		cell = rb_entry(p, struct afs_cell, net_node);
 
-		if (!name) {
-			cell = rcu_dereference_raw(net->ws_cell);
-			if (cell) {
-				afs_get_cell(cell);
-				ret = 0;
-				break;
-			}
-			ret = -EDESTADDRREQ;
-			continue;
-		}
+		n = strncasecmp(cell->name, name,
+				min_t(size_t, cell->name_len, namesz));
+		if (n == 0)
+			n = cell->name_len - namesz;
+		if (n < 0)
+			p = p->rb_left;
+		else if (n > 0)
+			p = p->rb_right;
+		else
+			goto found;
+	}
 
-		p = rcu_dereference_raw(net->cells.rb_node);
-		while (p) {
-			cell = rb_entry(p, struct afs_cell, net_node);
+	return ERR_PTR(-ENOENT);
 
-			n = strncasecmp(cell->name, name,
-					min_t(size_t, cell->name_len, namesz));
-			if (n == 0)
-				n = cell->name_len - namesz;
-			if (n < 0) {
-				p = rcu_dereference_raw(p->rb_left);
-			} else if (n > 0) {
-				p = rcu_dereference_raw(p->rb_right);
-			} else {
-				if (atomic_inc_not_zero(&cell->usage)) {
-					ret = 0;
-					break;
-				}
-				/* We want to repeat the search, this time with
-				 * the lock properly locked.
-				 */
-			}
-			cell = NULL;
-		}
+found:
+	return afs_use_cell(cell, reason);
+}
 
-	} while (need_seqretry(&net->cells_lock, seq));
+/*
+ * Look up and get an activation reference on a cell record.
+ */
+struct afs_cell *afs_find_cell(struct afs_net *net,
+			       const char *name, unsigned int namesz,
+			       enum afs_cell_trace reason)
+{
+	struct afs_cell *cell;
 
-	done_seqretry(&net->cells_lock, seq);
-
-	if (ret != 0 && cell)
-		afs_put_cell(net, cell);
-
-	return ret == 0 ? cell : ERR_PTR(ret);
+	down_read(&net->cells_lock);
+	cell = afs_find_cell_locked(net, name, namesz, reason);
+	up_read(&net->cells_lock);
+	return cell;
 }
 
 /*
@@ -166,8 +158,9 @@
 		cell->name[i] = tolower(name[i]);
 	cell->name[i] = 0;
 
-	atomic_set(&cell->usage, 2);
-	INIT_WORK(&cell->manager, afs_manage_cell);
+	atomic_set(&cell->ref, 1);
+	atomic_set(&cell->active, 0);
+	INIT_WORK(&cell->manager, afs_manage_cell_work);
 	cell->volumes = RB_ROOT;
 	INIT_HLIST_HEAD(&cell->proc_volumes);
 	seqlock_init(&cell->volume_lock);
@@ -206,6 +199,9 @@
 	cell->dns_source = vllist->source;
 	cell->dns_status = vllist->status;
 	smp_store_release(&cell->dns_lookup_count, 1); /* vs source/status */
+	atomic_inc(&net->cells_outstanding);
+	cell->debug_id = atomic_inc_return(&cell_debug_id);
+	trace_afs_cell(cell->debug_id, 1, 0, afs_cell_trace_alloc);
 
 	_leave(" = %p", cell);
 	return cell;
@@ -245,9 +241,7 @@
 	_enter("%s,%s", name, vllist);
 
 	if (!excl) {
-		rcu_read_lock();
-		cell = afs_lookup_cell_rcu(net, name, namesz);
-		rcu_read_unlock();
+		cell = afs_find_cell(net, name, namesz, afs_cell_trace_use_lookup);
 		if (!IS_ERR(cell))
 			goto wait_for_cell;
 	}
@@ -268,7 +262,7 @@
 	/* Find the insertion point and check to see if someone else added a
 	 * cell whilst we were allocating.
 	 */
-	write_seqlock(&net->cells_lock);
+	down_write(&net->cells_lock);
 
 	pp = &net->cells.rb_node;
 	parent = NULL;
@@ -290,23 +284,26 @@
 
 	cell = candidate;
 	candidate = NULL;
+	atomic_set(&cell->active, 2);
+	trace_afs_cell(cell->debug_id, atomic_read(&cell->ref), 2, afs_cell_trace_insert);
 	rb_link_node_rcu(&cell->net_node, parent, pp);
 	rb_insert_color(&cell->net_node, &net->cells);
-	atomic_inc(&net->cells_outstanding);
-	write_sequnlock(&net->cells_lock);
+	up_write(&net->cells_lock);
 
-	queue_work(afs_wq, &cell->manager);
+	afs_queue_cell(cell, afs_cell_trace_get_queue_new);
 
 wait_for_cell:
+	trace_afs_cell(cell->debug_id, atomic_read(&cell->ref), atomic_read(&cell->active),
+		       afs_cell_trace_wait);
 	_debug("wait_for_cell");
 	wait_var_event(&cell->state,
 		       ({
 			       state = smp_load_acquire(&cell->state); /* vs error */
-			       state == AFS_CELL_ACTIVE || state == AFS_CELL_FAILED;
+			       state == AFS_CELL_ACTIVE || state == AFS_CELL_REMOVED;
 		       }));
 
 	/* Check the state obtained from the wait check. */
-	if (state == AFS_CELL_FAILED) {
+	if (state == AFS_CELL_REMOVED) {
 		ret = cell->error;
 		goto error;
 	}
@@ -320,16 +317,17 @@
 	if (excl) {
 		ret = -EEXIST;
 	} else {
-		afs_get_cell(cursor);
+		afs_use_cell(cursor, afs_cell_trace_use_lookup);
 		ret = 0;
 	}
-	write_sequnlock(&net->cells_lock);
-	kfree(candidate);
+	up_write(&net->cells_lock);
+	if (candidate)
+		afs_put_cell(candidate, afs_cell_trace_put_candidate);
 	if (ret == 0)
 		goto wait_for_cell;
 	goto error_noput;
 error:
-	afs_put_cell(net, cell);
+	afs_unuse_cell(net, cell, afs_cell_trace_unuse_lookup);
 error_noput:
 	_leave(" = %d [error]", ret);
 	return ERR_PTR(ret);
@@ -374,15 +372,16 @@
 	}
 
 	if (!test_and_set_bit(AFS_CELL_FL_NO_GC, &new_root->flags))
-		afs_get_cell(new_root);
+		afs_use_cell(new_root, afs_cell_trace_use_pin);
 
 	/* install the new cell */
-	write_seqlock(&net->cells_lock);
-	old_root = rcu_access_pointer(net->ws_cell);
-	rcu_assign_pointer(net->ws_cell, new_root);
-	write_sequnlock(&net->cells_lock);
+	down_write(&net->cells_lock);
+	afs_see_cell(new_root, afs_cell_trace_see_ws);
+	old_root = net->ws_cell;
+	net->ws_cell = new_root;
+	up_write(&net->cells_lock);
 
-	afs_put_cell(net, old_root);
+	afs_unuse_cell(net, old_root, afs_cell_trace_unuse_ws);
 	_leave(" = 0");
 	return 0;
 }
@@ -488,18 +487,22 @@
 static void afs_cell_destroy(struct rcu_head *rcu)
 {
 	struct afs_cell *cell = container_of(rcu, struct afs_cell, rcu);
+	struct afs_net *net = cell->net;
+	int u;
 
 	_enter("%p{%s}", cell, cell->name);
 
-	ASSERTCMP(atomic_read(&cell->usage), ==, 0);
+	u = atomic_read(&cell->ref);
+	ASSERTCMP(u, ==, 0);
+	trace_afs_cell(cell->debug_id, u, atomic_read(&cell->active), afs_cell_trace_free);
 
-	afs_put_volume(cell->net, cell->root_volume, afs_volume_trace_put_cell_root);
-	afs_put_vlserverlist(cell->net, rcu_access_pointer(cell->vl_servers));
-	afs_put_cell(cell->net, cell->alias_of);
+	afs_put_vlserverlist(net, rcu_access_pointer(cell->vl_servers));
+	afs_unuse_cell(net, cell->alias_of, afs_cell_trace_unuse_alias);
 	key_put(cell->anonymous_key);
 	kfree(cell->name);
 	kfree(cell);
 
+	afs_dec_cells_outstanding(net);
 	_leave(" [destroyed]");
 }
 
@@ -532,18 +535,63 @@
 /*
  * Get a reference on a cell record.
  */
-struct afs_cell *afs_get_cell(struct afs_cell *cell)
+struct afs_cell *afs_get_cell(struct afs_cell *cell, enum afs_cell_trace reason)
 {
-	atomic_inc(&cell->usage);
+	int u;
+
+	if (atomic_read(&cell->ref) <= 0)
+		BUG();
+
+	u = atomic_inc_return(&cell->ref);
+	trace_afs_cell(cell->debug_id, u, atomic_read(&cell->active), reason);
 	return cell;
 }
 
 /*
  * Drop a reference on a cell record.
  */
-void afs_put_cell(struct afs_net *net, struct afs_cell *cell)
+void afs_put_cell(struct afs_cell *cell, enum afs_cell_trace reason)
 {
+	if (cell) {
+		unsigned int debug_id = cell->debug_id;
+		unsigned int u, a;
+
+		a = atomic_read(&cell->active);
+		u = atomic_dec_return(&cell->ref);
+		trace_afs_cell(debug_id, u, a, reason);
+		if (u == 0) {
+			a = atomic_read(&cell->active);
+			WARN(a != 0, "Cell active count %u > 0\n", a);
+			call_rcu(&cell->rcu, afs_cell_destroy);
+		}
+	}
+}
+
+/*
+ * Note a cell becoming more active.
+ */
+struct afs_cell *afs_use_cell(struct afs_cell *cell, enum afs_cell_trace reason)
+{
+	int u, a;
+
+	if (atomic_read(&cell->ref) <= 0)
+		BUG();
+
+	u = atomic_read(&cell->ref);
+	a = atomic_inc_return(&cell->active);
+	trace_afs_cell(cell->debug_id, u, a, reason);
+	return cell;
+}
+
+/*
+ * Record a cell becoming less active.  When the active counter reaches 1, it
+ * is scheduled for destruction, but may get reactivated.
+ */
+void afs_unuse_cell(struct afs_net *net, struct afs_cell *cell, enum afs_cell_trace reason)
+{
+	unsigned int debug_id = cell->debug_id;
 	time64_t now, expire_delay;
+	int u, a;
 
 	if (!cell)
 		return;
@@ -556,11 +604,35 @@
 	if (cell->vl_servers->nr_servers)
 		expire_delay = afs_cell_gc_delay;
 
-	if (atomic_dec_return(&cell->usage) > 1)
-		return;
+	u = atomic_read(&cell->ref);
+	a = atomic_dec_return(&cell->active);
+	trace_afs_cell(debug_id, u, a, reason);
+	WARN_ON(a == 0);
+	if (a == 1)
+		/* 'cell' may now be garbage collected. */
+		afs_set_cell_timer(net, expire_delay);
+}
 
-	/* 'cell' may now be garbage collected. */
-	afs_set_cell_timer(net, expire_delay);
+/*
+ * Note that a cell has been seen.
+ */
+void afs_see_cell(struct afs_cell *cell, enum afs_cell_trace reason)
+{
+	int u, a;
+
+	u = atomic_read(&cell->ref);
+	a = atomic_read(&cell->active);
+	trace_afs_cell(cell->debug_id, u, a, reason);
+}
+
+/*
+ * Queue a cell for management, giving the workqueue a ref to hold.
+ */
+void afs_queue_cell(struct afs_cell *cell, enum afs_cell_trace reason)
+{
+	afs_get_cell(cell, reason);
+	if (!queue_work(afs_wq, &cell->manager))
+		afs_put_cell(cell, afs_cell_trace_put_queue_fail);
 }
 
 /*
@@ -660,12 +732,10 @@
  * Manage a cell record, initialising and destroying it, maintaining its DNS
  * records.
  */
-static void afs_manage_cell(struct work_struct *work)
+static void afs_manage_cell(struct afs_cell *cell)
 {
-	struct afs_cell *cell = container_of(work, struct afs_cell, manager);
 	struct afs_net *net = cell->net;
-	bool deleted;
-	int ret, usage;
+	int ret, active;
 
 	_enter("%s", cell->name);
 
@@ -674,14 +744,19 @@
 	switch (cell->state) {
 	case AFS_CELL_INACTIVE:
 	case AFS_CELL_FAILED:
-		write_seqlock(&net->cells_lock);
-		usage = 1;
-		deleted = atomic_try_cmpxchg_relaxed(&cell->usage, &usage, 0);
-		if (deleted)
+		down_write(&net->cells_lock);
+		active = 1;
+		if (atomic_try_cmpxchg_relaxed(&cell->active, &active, 0)) {
 			rb_erase(&cell->net_node, &net->cells);
-		write_sequnlock(&net->cells_lock);
-		if (deleted)
+			trace_afs_cell(cell->debug_id, atomic_read(&cell->ref), 0,
+				       afs_cell_trace_unuse_delete);
+			smp_store_release(&cell->state, AFS_CELL_REMOVED);
+		}
+		up_write(&net->cells_lock);
+		if (cell->state == AFS_CELL_REMOVED) {
+			wake_up_var(&cell->state);
 			goto final_destruction;
+		}
 		if (cell->state == AFS_CELL_FAILED)
 			goto done;
 		smp_store_release(&cell->state, AFS_CELL_UNSET);
@@ -703,7 +778,7 @@
 		goto again;
 
 	case AFS_CELL_ACTIVE:
-		if (atomic_read(&cell->usage) > 1) {
+		if (atomic_read(&cell->active) > 1) {
 			if (test_and_clear_bit(AFS_CELL_FL_DO_LOOKUP, &cell->flags)) {
 				ret = afs_update_cell(cell);
 				if (ret < 0)
@@ -716,13 +791,16 @@
 		goto again;
 
 	case AFS_CELL_DEACTIVATING:
-		if (atomic_read(&cell->usage) > 1)
+		if (atomic_read(&cell->active) > 1)
 			goto reverse_deactivation;
 		afs_deactivate_cell(net, cell);
 		smp_store_release(&cell->state, AFS_CELL_INACTIVE);
 		wake_up_var(&cell->state);
 		goto again;
 
+	case AFS_CELL_REMOVED:
+		goto done;
+
 	default:
 		break;
 	}
@@ -748,9 +826,18 @@
 	return;
 
 final_destruction:
-	call_rcu(&cell->rcu, afs_cell_destroy);
-	afs_dec_cells_outstanding(net);
-	_leave(" [destruct %d]", atomic_read(&net->cells_outstanding));
+	/* The root volume is pinning the cell */
+	afs_put_volume(cell->net, cell->root_volume, afs_volume_trace_put_cell_root);
+	cell->root_volume = NULL;
+	afs_put_cell(cell, afs_cell_trace_put_destroy);
+}
+
+static void afs_manage_cell_work(struct work_struct *work)
+{
+	struct afs_cell *cell = container_of(work, struct afs_cell, manager);
+
+	afs_manage_cell(cell);
+	afs_put_cell(cell, afs_cell_trace_put_queue_work);
 }
 
 /*
@@ -779,26 +866,29 @@
 	 * lack of use and cells whose DNS results have expired and dispatch
 	 * their managers.
 	 */
-	read_seqlock_excl(&net->cells_lock);
+	down_read(&net->cells_lock);
 
 	for (cursor = rb_first(&net->cells); cursor; cursor = rb_next(cursor)) {
 		struct afs_cell *cell =
 			rb_entry(cursor, struct afs_cell, net_node);
-		unsigned usage;
+		unsigned active;
 		bool sched_cell = false;
 
-		usage = atomic_read(&cell->usage);
-		_debug("manage %s %u", cell->name, usage);
+		active = atomic_read(&cell->active);
+		trace_afs_cell(cell->debug_id, atomic_read(&cell->ref),
+			       active, afs_cell_trace_manage);
 
-		ASSERTCMP(usage, >=, 1);
+		ASSERTCMP(active, >=, 1);
 
 		if (purging) {
-			if (test_and_clear_bit(AFS_CELL_FL_NO_GC, &cell->flags))
-				usage = atomic_dec_return(&cell->usage);
-			ASSERTCMP(usage, ==, 1);
+			if (test_and_clear_bit(AFS_CELL_FL_NO_GC, &cell->flags)) {
+				active = atomic_dec_return(&cell->active);
+				trace_afs_cell(cell->debug_id, atomic_read(&cell->ref),
+					       active, afs_cell_trace_unuse_pin);
+			}
 		}
 
-		if (usage == 1) {
+		if (active == 1) {
 			struct afs_vlserver_list *vllist;
 			time64_t expire_at = cell->last_inactive;
 
@@ -821,10 +911,10 @@
 		}
 
 		if (sched_cell)
-			queue_work(afs_wq, &cell->manager);
+			afs_queue_cell(cell, afs_cell_trace_get_queue_manage);
 	}
 
-	read_sequnlock_excl(&net->cells_lock);
+	up_read(&net->cells_lock);
 
 	/* Update the timer on the way out.  We have to pass an increment on
 	 * cells_outstanding in the namespace that we are in to the timer or
@@ -854,11 +944,11 @@
 
 	_enter("");
 
-	write_seqlock(&net->cells_lock);
-	ws = rcu_access_pointer(net->ws_cell);
-	RCU_INIT_POINTER(net->ws_cell, NULL);
-	write_sequnlock(&net->cells_lock);
-	afs_put_cell(net, ws);
+	down_write(&net->cells_lock);
+	ws = net->ws_cell;
+	net->ws_cell = NULL;
+	up_write(&net->cells_lock);
+	afs_unuse_cell(net, ws, afs_cell_trace_unuse_ws);
 
 	_debug("del timer");
 	if (del_timer_sync(&net->cells_timer))
diff --git a/fs/afs/dynroot.c b/fs/afs/dynroot.c
index 7b784af..db832cc 100644
--- a/fs/afs/dynroot.c
+++ b/fs/afs/dynroot.c
@@ -123,9 +123,9 @@
 		len--;
 	}
 
-	cell = afs_lookup_cell_rcu(net, name, len);
+	cell = afs_find_cell(net, name, len, afs_cell_trace_use_probe);
 	if (!IS_ERR(cell)) {
-		afs_put_cell(net, cell);
+		afs_unuse_cell(net, cell, afs_cell_trace_unuse_probe);
 		return 0;
 	}
 
@@ -179,7 +179,6 @@
 	struct afs_cell *cell;
 	struct afs_net *net = afs_d2net(dentry);
 	struct dentry *ret;
-	unsigned int seq = 0;
 	char *name;
 	int len;
 
@@ -191,17 +190,13 @@
 	if (!name)
 		goto out_p;
 
-	rcu_read_lock();
-	do {
-		read_seqbegin_or_lock(&net->cells_lock, &seq);
-		cell = rcu_dereference_raw(net->ws_cell);
-		if (cell) {
-			len = cell->name_len;
-			memcpy(name, cell->name, len + 1);
-		}
-	} while (need_seqretry(&net->cells_lock, seq));
-	done_seqretry(&net->cells_lock, seq);
-	rcu_read_unlock();
+	down_read(&net->cells_lock);
+	cell = net->ws_cell;
+	if (cell) {
+		len = cell->name_len;
+		memcpy(name, cell->name, len + 1);
+	}
+	up_read(&net->cells_lock);
 
 	ret = ERR_PTR(-ENOENT);
 	if (!cell)
diff --git a/fs/afs/internal.h b/fs/afs/internal.h
index e5f0446..81b0485 100644
--- a/fs/afs/internal.h
+++ b/fs/afs/internal.h
@@ -263,11 +263,11 @@
 
 	/* Cell database */
 	struct rb_root		cells;
-	struct afs_cell __rcu	*ws_cell;
+	struct afs_cell		*ws_cell;
 	struct work_struct	cells_manager;
 	struct timer_list	cells_timer;
 	atomic_t		cells_outstanding;
-	seqlock_t		cells_lock;
+	struct rw_semaphore	cells_lock;
 	struct mutex		cells_alias_lock;
 
 	struct mutex		proc_cells_lock;
@@ -326,6 +326,7 @@
 	AFS_CELL_DEACTIVATING,
 	AFS_CELL_INACTIVE,
 	AFS_CELL_FAILED,
+	AFS_CELL_REMOVED,
 };
 
 /*
@@ -363,7 +364,8 @@
 #endif
 	time64_t		dns_expiry;	/* Time AFSDB/SRV record expires */
 	time64_t		last_inactive;	/* Time of last drop of usage count */
-	atomic_t		usage;
+	atomic_t		ref;		/* Struct refcount */
+	atomic_t		active;		/* Active usage counter */
 	unsigned long		flags;
 #define AFS_CELL_FL_NO_GC	0		/* The cell was added manually, don't auto-gc */
 #define AFS_CELL_FL_DO_LOOKUP	1		/* DNS lookup requested */
@@ -373,6 +375,7 @@
 	enum dns_record_source	dns_source:8;	/* Latest source of data from lookup */
 	enum dns_lookup_status	dns_status:8;	/* Latest status of data from lookup */
 	unsigned int		dns_lookup_count; /* Counter of DNS lookups */
+	unsigned int		debug_id;
 
 	/* The volumes belonging to this cell */
 	struct rb_root		volumes;	/* Tree of volumes on this server */
@@ -917,11 +920,16 @@
  * cell.c
  */
 extern int afs_cell_init(struct afs_net *, const char *);
-extern struct afs_cell *afs_lookup_cell_rcu(struct afs_net *, const char *, unsigned);
+extern struct afs_cell *afs_find_cell(struct afs_net *, const char *, unsigned,
+				      enum afs_cell_trace);
 extern struct afs_cell *afs_lookup_cell(struct afs_net *, const char *, unsigned,
 					const char *, bool);
-extern struct afs_cell *afs_get_cell(struct afs_cell *);
-extern void afs_put_cell(struct afs_net *, struct afs_cell *);
+extern struct afs_cell *afs_use_cell(struct afs_cell *, enum afs_cell_trace);
+extern void afs_unuse_cell(struct afs_net *, struct afs_cell *, enum afs_cell_trace);
+extern struct afs_cell *afs_get_cell(struct afs_cell *, enum afs_cell_trace);
+extern void afs_see_cell(struct afs_cell *, enum afs_cell_trace);
+extern void afs_put_cell(struct afs_cell *, enum afs_cell_trace);
+extern void afs_queue_cell(struct afs_cell *, enum afs_cell_trace);
 extern void afs_manage_cells(struct work_struct *);
 extern void afs_cells_timer(struct timer_list *);
 extern void __net_exit afs_cell_purge(struct afs_net *);
diff --git a/fs/afs/main.c b/fs/afs/main.c
index 31b472f..accdd89 100644
--- a/fs/afs/main.c
+++ b/fs/afs/main.c
@@ -78,7 +78,7 @@
 	mutex_init(&net->socket_mutex);
 
 	net->cells = RB_ROOT;
-	seqlock_init(&net->cells_lock);
+	init_rwsem(&net->cells_lock);
 	INIT_WORK(&net->cells_manager, afs_manage_cells);
 	timer_setup(&net->cells_timer, afs_cells_timer, 0);
 
diff --git a/fs/afs/mntpt.c b/fs/afs/mntpt.c
index 79bc5f1..052dab2 100644
--- a/fs/afs/mntpt.c
+++ b/fs/afs/mntpt.c
@@ -88,7 +88,7 @@
 		ctx->force = true;
 	}
 	if (ctx->cell) {
-		afs_put_cell(ctx->net, ctx->cell);
+		afs_unuse_cell(ctx->net, ctx->cell, afs_cell_trace_unuse_mntpt);
 		ctx->cell = NULL;
 	}
 	if (test_bit(AFS_VNODE_PSEUDODIR, &vnode->flags)) {
@@ -124,7 +124,7 @@
 		char *buf;
 
 		if (src_as->cell)
-			ctx->cell = afs_get_cell(src_as->cell);
+			ctx->cell = afs_use_cell(src_as->cell, afs_cell_trace_use_mntpt);
 
 		if (size < 2 || size > PAGE_SIZE - 1)
 			return -EINVAL;
diff --git a/fs/afs/proc.c b/fs/afs/proc.c
index e8babb6..065a28b 100644
--- a/fs/afs/proc.c
+++ b/fs/afs/proc.c
@@ -38,7 +38,7 @@
 
 	if (v == SEQ_START_TOKEN) {
 		/* display header on line 1 */
-		seq_puts(m, "USE    TTL SV ST NAME\n");
+		seq_puts(m, "USE ACT    TTL SV ST NAME\n");
 		return 0;
 	}
 
@@ -46,10 +46,11 @@
 	vllist = rcu_dereference(cell->vl_servers);
 
 	/* display one cell per line on subsequent lines */
-	seq_printf(m, "%3u %6lld %2u %2u %s\n",
-		   atomic_read(&cell->usage),
+	seq_printf(m, "%3u %3u %6lld %2u %2u %s\n",
+		   atomic_read(&cell->ref),
+		   atomic_read(&cell->active),
 		   cell->dns_expiry - ktime_get_real_seconds(),
-		   vllist->nr_servers,
+		   vllist ? vllist->nr_servers : 0,
 		   cell->state,
 		   cell->name);
 	return 0;
@@ -128,7 +129,7 @@
 		}
 
 		if (test_and_set_bit(AFS_CELL_FL_NO_GC, &cell->flags))
-			afs_put_cell(net, cell);
+			afs_unuse_cell(net, cell, afs_cell_trace_unuse_no_pin);
 	} else {
 		goto inval;
 	}
@@ -154,13 +155,11 @@
 	struct afs_net *net;
 
 	net = afs_seq2net_single(m);
-	if (rcu_access_pointer(net->ws_cell)) {
-		rcu_read_lock();
-		cell = rcu_dereference(net->ws_cell);
-		if (cell)
-			seq_printf(m, "%s\n", cell->name);
-		rcu_read_unlock();
-	}
+	down_read(&net->cells_lock);
+	cell = net->ws_cell;
+	if (cell)
+		seq_printf(m, "%s\n", cell->name);
+	up_read(&net->cells_lock);
 	return 0;
 }
 
diff --git a/fs/afs/server.c b/fs/afs/server.c
index e82e452..684a2b0 100644
--- a/fs/afs/server.c
+++ b/fs/afs/server.c
@@ -550,7 +550,12 @@
 
 		_debug("manage %pU %u", &server->uuid, active);
 
-		ASSERTIFCMP(purging, active, ==, 0);
+		if (purging) {
+			trace_afs_server(server, atomic_read(&server->ref),
+					 active, afs_server_trace_purging);
+			if (active != 0)
+				pr_notice("Can't purge s=%08x\n", server->debug_id);
+		}
 
 		if (active == 0) {
 			time64_t expire_at = server->unuse_time;
diff --git a/fs/afs/super.c b/fs/afs/super.c
index 3a40ee7..6c5900d 100644
--- a/fs/afs/super.c
+++ b/fs/afs/super.c
@@ -294,7 +294,8 @@
 			       cellnamesz, cellnamesz, cellname ?: "");
 			return PTR_ERR(cell);
 		}
-		afs_put_cell(ctx->net, ctx->cell);
+		afs_unuse_cell(ctx->net, ctx->cell, afs_cell_trace_unuse_parse);
+		afs_see_cell(cell, afs_cell_trace_see_source);
 		ctx->cell = cell;
 	}
 
@@ -389,8 +390,9 @@
 				_debug("switch to alias");
 				key_put(ctx->key);
 				ctx->key = NULL;
-				cell = afs_get_cell(ctx->cell->alias_of);
-				afs_put_cell(ctx->net, ctx->cell);
+				cell = afs_use_cell(ctx->cell->alias_of,
+						    afs_cell_trace_use_fc_alias);
+				afs_unuse_cell(ctx->net, ctx->cell, afs_cell_trace_unuse_fc);
 				ctx->cell = cell;
 				goto reget_key;
 			}
@@ -507,7 +509,7 @@
 		if (ctx->dyn_root) {
 			as->dyn_root = true;
 		} else {
-			as->cell = afs_get_cell(ctx->cell);
+			as->cell = afs_use_cell(ctx->cell, afs_cell_trace_use_sbi);
 			as->volume = afs_get_volume(ctx->volume,
 						    afs_volume_trace_get_alloc_sbi);
 		}
@@ -520,7 +522,7 @@
 	if (as) {
 		struct afs_net *net = afs_net(as->net_ns);
 		afs_put_volume(net, as->volume, afs_volume_trace_put_destroy_sbi);
-		afs_put_cell(net, as->cell);
+		afs_unuse_cell(net, as->cell, afs_cell_trace_unuse_sbi);
 		put_net(as->net_ns);
 		kfree(as);
 	}
@@ -606,7 +608,7 @@
 
 	afs_destroy_sbi(fc->s_fs_info);
 	afs_put_volume(ctx->net, ctx->volume, afs_volume_trace_put_free_fc);
-	afs_put_cell(ctx->net, ctx->cell);
+	afs_unuse_cell(ctx->net, ctx->cell, afs_cell_trace_unuse_fc);
 	key_put(ctx->key);
 	kfree(ctx);
 }
@@ -633,9 +635,7 @@
 	ctx->net = afs_net(fc->net_ns);
 
 	/* Default to the workstation cell. */
-	rcu_read_lock();
-	cell = afs_lookup_cell_rcu(ctx->net, NULL, 0);
-	rcu_read_unlock();
+	cell = afs_find_cell(ctx->net, NULL, 0, afs_cell_trace_use_fc);
 	if (IS_ERR(cell))
 		cell = NULL;
 	ctx->cell = cell;
diff --git a/fs/afs/vl_alias.c b/fs/afs/vl_alias.c
index 5082ef0..f04a80e 100644
--- a/fs/afs/vl_alias.c
+++ b/fs/afs/vl_alias.c
@@ -177,7 +177,7 @@
 
 is_alias:
 	rcu_read_unlock();
-	cell->alias_of = afs_get_cell(p);
+	cell->alias_of = afs_use_cell(p, afs_cell_trace_use_alias);
 	return 1;
 }
 
@@ -247,18 +247,18 @@
 			continue;
 		if (p->root_volume)
 			continue; /* Ignore cells that have a root.cell volume. */
-		afs_get_cell(p);
+		afs_use_cell(p, afs_cell_trace_use_check_alias);
 		mutex_unlock(&cell->net->proc_cells_lock);
 
 		if (afs_query_for_alias_one(cell, key, p) != 0)
 			goto is_alias;
 
 		if (mutex_lock_interruptible(&cell->net->proc_cells_lock) < 0) {
-			afs_put_cell(cell->net, p);
+			afs_unuse_cell(cell->net, p, afs_cell_trace_unuse_check_alias);
 			return -ERESTARTSYS;
 		}
 
-		afs_put_cell(cell->net, p);
+		afs_unuse_cell(cell->net, p, afs_cell_trace_unuse_check_alias);
 	}
 
 	mutex_unlock(&cell->net->proc_cells_lock);
diff --git a/fs/afs/vl_rotate.c b/fs/afs/vl_rotate.c
index c0458c9..488e584 100644
--- a/fs/afs/vl_rotate.c
+++ b/fs/afs/vl_rotate.c
@@ -45,7 +45,7 @@
 	    cell->dns_expiry <= ktime_get_real_seconds()) {
 		dns_lookup_count = smp_load_acquire(&cell->dns_lookup_count);
 		set_bit(AFS_CELL_FL_DO_LOOKUP, &cell->flags);
-		queue_work(afs_wq, &cell->manager);
+		afs_queue_cell(cell, afs_cell_trace_get_queue_dns);
 
 		if (cell->dns_source == DNS_RECORD_UNAVAILABLE) {
 			if (wait_var_event_interruptible(
diff --git a/fs/afs/volume.c b/fs/afs/volume.c
index 9bc0509..f84194b 100644
--- a/fs/afs/volume.c
+++ b/fs/afs/volume.c
@@ -83,7 +83,7 @@
 
 	volume->vid		= vldb->vid[params->type];
 	volume->update_at	= ktime_get_real_seconds() + afs_volume_record_life;
-	volume->cell		= afs_get_cell(params->cell);
+	volume->cell		= afs_get_cell(params->cell, afs_cell_trace_get_vol);
 	volume->type		= params->type;
 	volume->type_force	= params->force;
 	volume->name_len	= vldb->name_len;
@@ -106,7 +106,7 @@
 	return volume;
 
 error_1:
-	afs_put_cell(params->net, volume->cell);
+	afs_put_cell(volume->cell, afs_cell_trace_put_vol);
 	kfree(volume);
 error_0:
 	return ERR_PTR(ret);
@@ -228,7 +228,7 @@
 
 	afs_remove_volume_from_cell(volume);
 	afs_put_serverlist(net, rcu_access_pointer(volume->servers));
-	afs_put_cell(net, volume->cell);
+	afs_put_cell(volume->cell, afs_cell_trace_put_vol);
 	trace_afs_volume(volume->vid, atomic_read(&volume->usage),
 			 afs_volume_trace_free);
 	kfree_rcu(volume, rcu);
diff --git a/include/trace/events/afs.h b/include/trace/events/afs.h
index 5f0c1cf..8eb4923 100644
--- a/include/trace/events/afs.h
+++ b/include/trace/events/afs.h
@@ -40,6 +40,7 @@
 	afs_server_trace_get_new_cbi,
 	afs_server_trace_get_probe,
 	afs_server_trace_give_up_cb,
+	afs_server_trace_purging,
 	afs_server_trace_put_call,
 	afs_server_trace_put_cbi,
 	afs_server_trace_put_find_rsq,
@@ -50,6 +51,7 @@
 	afs_server_trace_update,
 };
 
+
 enum afs_volume_trace {
 	afs_volume_trace_alloc,
 	afs_volume_trace_free,
@@ -67,6 +69,46 @@
 	afs_volume_trace_remove,
 };
 
+enum afs_cell_trace {
+	afs_cell_trace_alloc,
+	afs_cell_trace_free,
+	afs_cell_trace_get_queue_dns,
+	afs_cell_trace_get_queue_manage,
+	afs_cell_trace_get_queue_new,
+	afs_cell_trace_get_vol,
+	afs_cell_trace_insert,
+	afs_cell_trace_manage,
+	afs_cell_trace_put_candidate,
+	afs_cell_trace_put_destroy,
+	afs_cell_trace_put_queue_fail,
+	afs_cell_trace_put_queue_work,
+	afs_cell_trace_put_vol,
+	afs_cell_trace_see_source,
+	afs_cell_trace_see_ws,
+	afs_cell_trace_unuse_alias,
+	afs_cell_trace_unuse_check_alias,
+	afs_cell_trace_unuse_delete,
+	afs_cell_trace_unuse_fc,
+	afs_cell_trace_unuse_lookup,
+	afs_cell_trace_unuse_mntpt,
+	afs_cell_trace_unuse_no_pin,
+	afs_cell_trace_unuse_parse,
+	afs_cell_trace_unuse_pin,
+	afs_cell_trace_unuse_probe,
+	afs_cell_trace_unuse_sbi,
+	afs_cell_trace_unuse_ws,
+	afs_cell_trace_use_alias,
+	afs_cell_trace_use_check_alias,
+	afs_cell_trace_use_fc,
+	afs_cell_trace_use_fc_alias,
+	afs_cell_trace_use_lookup,
+	afs_cell_trace_use_mntpt,
+	afs_cell_trace_use_pin,
+	afs_cell_trace_use_probe,
+	afs_cell_trace_use_sbi,
+	afs_cell_trace_wait,
+};
+
 enum afs_fs_operation {
 	afs_FS_FetchData		= 130,	/* AFS Fetch file data */
 	afs_FS_FetchACL			= 131,	/* AFS Fetch file ACL */
@@ -270,6 +312,7 @@
 	EM(afs_server_trace_get_new_cbi,	"GET cbi  ") \
 	EM(afs_server_trace_get_probe,		"GET probe") \
 	EM(afs_server_trace_give_up_cb,		"giveup-cb") \
+	EM(afs_server_trace_purging,		"PURGE    ") \
 	EM(afs_server_trace_put_call,		"PUT call ") \
 	EM(afs_server_trace_put_cbi,		"PUT cbi  ") \
 	EM(afs_server_trace_put_find_rsq,	"PUT f-rsq") \
@@ -295,6 +338,44 @@
 	EM(afs_volume_trace_put_validate_fc,	"PUT fc-validat") \
 	E_(afs_volume_trace_remove,		"REMOVE        ")
 
+#define afs_cell_traces \
+	EM(afs_cell_trace_alloc,		"ALLOC     ") \
+	EM(afs_cell_trace_free,			"FREE      ") \
+	EM(afs_cell_trace_get_queue_dns,	"GET q-dns ") \
+	EM(afs_cell_trace_get_queue_manage,	"GET q-mng ") \
+	EM(afs_cell_trace_get_queue_new,	"GET q-new ") \
+	EM(afs_cell_trace_get_vol,		"GET vol   ") \
+	EM(afs_cell_trace_insert,		"INSERT    ") \
+	EM(afs_cell_trace_manage,		"MANAGE    ") \
+	EM(afs_cell_trace_put_candidate,	"PUT candid") \
+	EM(afs_cell_trace_put_destroy,		"PUT destry") \
+	EM(afs_cell_trace_put_queue_work,	"PUT q-work") \
+	EM(afs_cell_trace_put_queue_fail,	"PUT q-fail") \
+	EM(afs_cell_trace_put_vol,		"PUT vol   ") \
+	EM(afs_cell_trace_see_source,		"SEE source") \
+	EM(afs_cell_trace_see_ws,		"SEE ws    ") \
+	EM(afs_cell_trace_unuse_alias,		"UNU alias ") \
+	EM(afs_cell_trace_unuse_check_alias,	"UNU chk-al") \
+	EM(afs_cell_trace_unuse_delete,		"UNU delete") \
+	EM(afs_cell_trace_unuse_fc,		"UNU fc    ") \
+	EM(afs_cell_trace_unuse_lookup,		"UNU lookup") \
+	EM(afs_cell_trace_unuse_mntpt,		"UNU mntpt ") \
+	EM(afs_cell_trace_unuse_parse,		"UNU parse ") \
+	EM(afs_cell_trace_unuse_pin,		"UNU pin   ") \
+	EM(afs_cell_trace_unuse_probe,		"UNU probe ") \
+	EM(afs_cell_trace_unuse_sbi,		"UNU sbi   ") \
+	EM(afs_cell_trace_unuse_ws,		"UNU ws    ") \
+	EM(afs_cell_trace_use_alias,		"USE alias ") \
+	EM(afs_cell_trace_use_check_alias,	"USE chk-al") \
+	EM(afs_cell_trace_use_fc,		"USE fc    ") \
+	EM(afs_cell_trace_use_fc_alias,		"USE fc-al ") \
+	EM(afs_cell_trace_use_lookup,		"USE lookup") \
+	EM(afs_cell_trace_use_mntpt,		"USE mntpt ") \
+	EM(afs_cell_trace_use_pin,		"USE pin   ") \
+	EM(afs_cell_trace_use_probe,		"USE probe ") \
+	EM(afs_cell_trace_use_sbi,		"USE sbi   ") \
+	E_(afs_cell_trace_wait,			"WAIT      ")
+
 #define afs_fs_operations \
 	EM(afs_FS_FetchData,			"FS.FetchData") \
 	EM(afs_FS_FetchStatus,			"FS.FetchStatus") \
@@ -483,6 +564,7 @@
 
 afs_call_traces;
 afs_server_traces;
+afs_cell_traces;
 afs_fs_operations;
 afs_vl_operations;
 afs_edit_dir_ops;
@@ -1358,6 +1440,33 @@
 		      __entry->ref)
 	    );
 
+TRACE_EVENT(afs_cell,
+	    TP_PROTO(unsigned int cell_debug_id, int usage, int active,
+		     enum afs_cell_trace reason),
+
+	    TP_ARGS(cell_debug_id, usage, active, reason),
+
+	    TP_STRUCT__entry(
+		    __field(unsigned int,		cell		)
+		    __field(int,			usage		)
+		    __field(int,			active		)
+		    __field(int,			reason		)
+			     ),
+
+	    TP_fast_assign(
+		    __entry->cell = cell_debug_id;
+		    __entry->usage = usage;
+		    __entry->active = active;
+		    __entry->reason = reason;
+			   ),
+
+	    TP_printk("L=%08x %s u=%d a=%d",
+		      __entry->cell,
+		      __print_symbolic(__entry->reason, afs_cell_traces),
+		      __entry->usage,
+		      __entry->active)
+	    );
+
 #endif /* _TRACE_AFS_H */
 
 /* This part must be outside protection */