From b57a79658b78d76a39aab4ac895c6e8585eb744f Mon Sep 17 00:00:00 2001
From: Julian Elischer <julian@FreeBSD.org>
Date: Thu, 1 Feb 2001 20:51:23 +0000
Subject: [PATCH] Clean up reference counting with relation to queued packets
 and the worklist, and while I'm there, clean up the worklist insertion and
 removal.

Inspired by: Harti Brandt <brandt@fokus.gmd.de>
---
 sys/netgraph/ng_base.c    | 235 +++++++++++++++++---------------------
 sys/netgraph/ng_message.h |  10 +-
 2 files changed, 107 insertions(+), 138 deletions(-)

diff --git a/sys/netgraph/ng_base.c b/sys/netgraph/ng_base.c
index 3e4e174a4e9f..d071f5fb9939 100644
--- a/sys/netgraph/ng_base.c
+++ b/sys/netgraph/ng_base.c
@@ -151,7 +151,7 @@ struct ng_hook ng_deadhook = {
  */
 /* List nodes with unallocated work */
 static TAILQ_HEAD(, ng_node) ng_worklist = TAILQ_HEAD_INITIALIZER(ng_worklist);
-static struct mtx	ng_worklist_mtx;
+static struct mtx	ng_worklist_mtx;   /* MUST LOCK NODE FIRST */
 
 /* List of installed types */
 static LIST_HEAD(, ng_type) ng_typelist;
@@ -703,15 +703,10 @@ ng_rmnode(node_p node, hook_p dummy1, void *dummy2, int dummy3)
 	 * it has no hooks so what's it going to do, bleed on someone?
 	 * Theoretically we came here from a queue entry that was added
 	 * Just before the queue was closed, so it should be empty anyway.
+	 * Also removes us from worklist if needed.
 	 */
 	ng_flush_input_queue(&node->nd_input_queue);
 
-	/*
-	 * Take us off the work queue if we are there.
-	 * We definatly have no work to be done.
-	 */
-	ng_worklist_remove(node);
-
 	/* Ask the type if it has anything to do in this case */
 	if (node->nd_type && node->nd_type->shutdown) {
 		(*node->nd_type->shutdown)(node);
@@ -1699,15 +1694,7 @@ static __inline void	ng_queue_rw(struct ng_queue * ngq,
  * Defined here rather than in netgraph.h because no-one should fiddle
  * with them.
  *
- * The ordering here is important! don't shuffle these. If you add
- * READ_PENDING to the word when it has READ_PENDING already set, you
- * generate a carry into the reader count, this you atomically add a reader,
- * and remove the pending reader count! Similarly for the pending writer
- * flag, adding WRITE_PENDING generates a carry and sets the WRITER_ACTIVE
- * flag, while clearing WRITE_PENDING. When 'SINGLE_THREAD_ONLY' is set, then
- * it is only permitted to do WRITER operations. Reader operations will
- * result in errors.
- * But that "hack" is unnecessary: "cpp" can do the math for us!
+ * The ordering here may be important! don't shuffle these.
  */
 /*-
  Safety Barrier--------+ (adjustable to suit taste) (not used yet)
@@ -1715,28 +1702,42 @@ static __inline void	ng_queue_rw(struct ng_queue * ngq,
                        V
 +-------+-------+-------+-------+-------+-------+-------+-------+
 | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | |
-|A|c|t|i|v|e| |R|e|a|d|e|r| |C|o|u|n|t| | | | | | | | | |R|A|W|S|
-| | | | | | | | | | | | | | | | | | | | | | | | | | | | |P|W|P|T|
+| |A|c|t|i|v|e| |R|e|a|d|e|r| |C|o|u|n|t| | | | | | | | | |R|A|W|
+| | | | | | | | | | | | | | | | | | | | | | | | | | | | | |P|W|P|
 +-------+-------+-------+-------+-------+-------+-------+-------+
-\_________________________ ____________________________/ | | | |
-                          V                              | | | |
-                [active reader count]                    | | | |
-                                                         | | | |
-        Read Pending ------------------------------------+ | | |
+\___________________________ ____________________________/ | | |
+                            V                              | | |
+                  [active reader count]                    | | |
                                                            | | |
-        Active Writer -------------------------------------+ | |
+          Read Pending ------------------------------------+ | |
                                                              | |
-        Write Pending ---------------------------------------+ |
+          Active Writer -------------------------------------+ |
                                                                |
-        Single Threading Only ---------------------------------+
+          Write Pending ---------------------------------------+
+
+
 */
-#define	SINGLE_THREAD_ONLY 0x00000001	/* if set, even reads single thread */
-#define WRITE_PENDING	0x00000002
-#define	WRITER_ACTIVE	0x00000004
-#define READ_PENDING	0x00000008
-#define	READER_INCREMENT 0x00000010
-#define	READER_MASK	0xfffffff0	/* Not valid if WRITER_ACTIVE is set */
+#define WRITE_PENDING	0x00000001
+#define WRITER_ACTIVE	0x00000002
+#define READ_PENDING	0x00000004
+#define READER_INCREMENT 0x00000008
+#define READER_MASK	0xfffffff0	/* Not valid if WRITER_ACTIVE is set */
 #define SAFETY_BARRIER	0x00100000	/* 64K items queued should be enough */
+
+/* Defines of more elaborate states on the queue */
+/* Mask of bits a read cares about */
+#define NGQ_RMASK	(WRITE_PENDING|WRITER_ACTIVE|READ_PENDING)
+
+/* Mask of bits a write cares about */
+#define NGQ_WMASK	(NGQ_RMASK|READER_MASK)
+
+/* tests to decide if we could get a read or write off the queue */
+#define CAN_GET_READ(flag)	((flag & NGQ_RMASK) == READ_PENDING)
+#define CAN_GET_WRITE(flag)	((flag & NGQ_WMASK) == WRITE_PENDING)
+
+/* Is there a chance of getting ANY work off the queue? */
+#define CAN_GET_WORK(flag)	(CAN_GET_READ(flag) || CAN_GET_WRITE(flag))
+
 /*
  * Taking into account the current state of the queue and node, possibly take
  * the next entry off the queue and return it. Return NULL if there was
@@ -1751,24 +1752,16 @@ ng_dequeue(struct ng_queue *ngq)
 {
 	item_p item;
 	u_int		add_arg;
-	/*
-	 * If there is a writer, then the answer is "no". Everything else
-	 * stops when there is a WRITER.
-	 */
-	if (ngq->q_flags & WRITER_ACTIVE) {
-		return (NULL);
-	}
-	/* Now take a look at what's on the queue and what's running */
-	if ((ngq->q_flags & ~(READER_MASK | SINGLE_THREAD_ONLY)) == READ_PENDING) {
+
+	if (CAN_GET_READ(ngq->q_flags)) {
 		/*
-		 * It was a reader and we have no write active. We don't care
-		 * how many readers are already active. Adjust the count for
-		 * the item we are about to dequeue. Adding READ_PENDING to
-		 * the exisiting READ_PENDING clears it and generates a carry
-		 * into the reader count.
+		 * Head of queue is a reader and we have no write active.
+		 * We don't care how many readers are already active. 
+		 * Adjust the flags for the item we are about to dequeue.
+		 * Add the correct increment for the reader count as well.
 		 */
-		add_arg = READ_PENDING;
-	} else if ((ngq->q_flags & ~SINGLE_THREAD_ONLY) == WRITE_PENDING) {
+		add_arg = (READER_INCREMENT - READ_PENDING);
+	} else if (CAN_GET_WRITE(ngq->q_flags)) {
 		/*
 		 * There is a pending write, no readers and no active writer.
 		 * This means we can go ahead with the pending writer. Note
@@ -1785,7 +1778,7 @@ ng_dequeue(struct ng_queue *ngq)
 		 * not matter (in fact it is defined that way). If it tests
 		 * the flag before this operation, the WRITE_PENDING flag
 		 * will make it fail, and if it tests it later, the
-		 * ACTIVE_WRITER flag will do the same. If it is SO slow that
+		 * WRITER_ACTIVE flag will do the same. If it is SO slow that
 		 * we have actually completed the operation, and neither flag
 		 * is set (nor the READ_PENDING) by the time that it tests
 		 * the flags, then it is actually ok for it to continue. If
@@ -1799,16 +1792,13 @@ ng_dequeue(struct ng_queue *ngq)
 		 * After failing, first it will be held back by the mutex, then
 		 * when it can proceed, it will queue its request, then it
 		 * would arrive at this function. Usually it will have to
-		 * leave empty handed because the ACTIVE WRITER bit wil be
+		 * leave empty handed because the ACTIVE WRITER bit will be
 		 * set.
+		 *
+		 * Adjust the flags for the item we are about to dequeue
+		 * and for the new active writer.
 		 */
-		/*
-		 * Adjust the flags for the item we are about to dequeue.
-		 * Adding WRITE_PENDING to the exisiting WRITE_PENDING clears
-		 * it and generates a carry into the WRITER_ACTIVE flag, all
-		 * atomically.
-		 */
-		add_arg = WRITE_PENDING;
+		add_arg = (WRITER_ACTIVE - WRITE_PENDING);
 		/*
 		 * We want to write "active writer, no readers " Now go make
 		 * it true. In fact there may be a number in the readers
@@ -1824,7 +1814,9 @@ ng_dequeue(struct ng_queue *ngq)
 		 * moment with the fasttrack code, and put us in a strange
 		 * state, then it will be through in just a moment, (as soon
 		 * as we release the mutex) and keep things moving.
+		 * Make sure we remove ourselves from the work queue.
 		 */
+		ng_worklist_remove(ngq->q_node);
 		return (0);
 	}
 
@@ -1842,40 +1834,36 @@ ng_dequeue(struct ng_queue *ngq)
 		 */
 		ngq->last = &(ngq->queue);
 		/*
-		 * Whatever flag was set is cleared and the carry sets the
-		 * correct new active state/count. So we don't need to change
-		 * add_arg.
+		 * Whatever flag was set will be cleared and
+		 * the new acive field will be set by the add as well,
+		 * so we don't need to change add_arg.
+		 * But we know we don't need to be on the work list.
 		 */
+		atomic_add_long(&ngq->q_flags, add_arg);
+		ng_worklist_remove(ngq->q_node);
 	} else {
+		/* 
+		 * Since there is something on the queue, note what it is
+		 * in the flags word.
+		 */
 		if ((ngq->queue->el_flags & NGQF_RW) == NGQF_READER) {
-			/*
-			 * If we had a READ_PENDING and have another one, we
-			 * just want to add READ_PENDING twice (the same as
-			 * adding READER_INCREMENT). If we had WRITE_PENDING,
-			 * we want to add READ_PENDING + WRITE_PENDING to
-			 * clear the old WRITE_PENDING, set ACTIVE_WRITER,
-			 * and set READ_PENDING. Either way we just add
-			 * READ_PENDING to whatever we already had.
-			 */
 			add_arg += READ_PENDING;
 		} else {
-			/*
-			 * If we had a WRITE_PENDING and have another one, we
-			 * just want to add WRITE_PENDING twice (the same as
-			 * adding ACTIVE_WRITER). If we had READ_PENDING, we
-			 * want to add READ_PENDING + WRITE_PENDING to clear
-			 * the old READ_PENDING, increment the readers, and
-			 * set WRITE_PENDING. Either way we just add
-			 * WRITE_PENDING to whatever we already had.
-			 */
 			add_arg += WRITE_PENDING;
 		}
+		atomic_add_long(&ngq->q_flags, add_arg);
+		/*
+		 * If we see more doable work, make sure we are
+		 * on the work queue.
+		 */
+		if (CAN_GET_WORK(ngq->q_flags)) {
+			ng_setisr(ngq->q_node);
+		}
 	}
-	atomic_add_long(&ngq->q_flags, add_arg);
 	/*
 	 * We have successfully cleared the old pending flag, set the new one
 	 * if it is needed, and incremented the appropriate active field.
-	 * (all in one atomic addition.. wow)
+	 * (all in one atomic addition.. )
 	 */
 	return (item);
 }
@@ -1885,8 +1873,6 @@ ng_dequeue(struct ng_queue *ngq)
  * We really don't care who, but we can't or don't want to hang around
  * to process it ourselves. We are probably an interrupt routine..
  * 1 = writer, 0 = reader
- * We should set something to indicate NETISR requested
- * If it's the first item queued.
  */
 #define NGQRW_R 0
 #define NGQRW_W 1
@@ -1938,7 +1924,7 @@ ng_acquire_read(struct ng_queue *ngq, item_p item)
 
 	/* ######### Hack alert ######### */
 	atomic_add_long(&ngq->q_flags, READER_INCREMENT);
-	if ((ngq->q_flags & (~READER_MASK)) == 0) {
+	if ((ngq->q_flags & NGQ_RMASK) == 0) {
 		/* Successfully grabbed node */
 		return (item);
 	}
@@ -1953,19 +1939,12 @@ ng_acquire_read(struct ng_queue *ngq, item_p item)
 	 * running, between the previous test, and the moment that we took
 	 * the mutex. (Or maybe a writer completed.)
 	 */
-	if ((ngq->q_flags & (~READER_MASK)) == 0) {
+	if ((ngq->q_flags & NGQ_RMASK) == 0) {
 		atomic_add_long(&ngq->q_flags, READER_INCREMENT);
 		mtx_exit((&ngq->q_mtx), MTX_SPIN);
 		return (item);
 	}
 
-	/*
-	 * Quick check that we are doing things right.
-	 */
-	if (ngq->q_flags & SINGLE_THREAD_ONLY) {
-		panic("Calling single treaded queue incorrectly");
-	}
-
 	/*
 	 * and queue the request for later.
 	 */
@@ -1991,15 +1970,14 @@ restart:
 	 * we can just go ahead. In all other situations we need to queue the
 	 * request
 	 */
-	if ((ngq->q_flags & (~SINGLE_THREAD_ONLY)) == 0) {
+	if ((ngq->q_flags & NGQ_WMASK) == 0) {
 		atomic_add_long(&ngq->q_flags, WRITER_ACTIVE);
 		mtx_exit((&ngq->q_mtx), MTX_SPIN);
 		if (ngq->q_flags & READER_MASK) {
 			/* Collision with fast-track reader */
-			atomic_add_long(&ngq->q_flags, -WRITER_ACTIVE);
+			atomic_subtract_long(&ngq->q_flags, WRITER_ACTIVE);
 			goto restart;
 		}
-
 		return (item);
 	}
 
@@ -2063,6 +2041,11 @@ ng_flush_input_queue(struct ng_queue * ngq)
 		NG_FREE_ITEM(item);
 		mtx_enter(&ngq->q_mtx, MTX_SPIN);
 	}
+	/*
+	 * Take us off the work queue if we are there.
+	 * We definatly have no work to be done.
+	 */
+	ng_worklist_remove(ngq->q_node);
 	mtx_exit(&ngq->q_mtx, MTX_SPIN);
 }
 
@@ -2093,11 +2076,11 @@ int
 ng_snd_item(item_p item, int queue)
 {
 	hook_p hook = NGI_HOOK(item);
-	node_p dest = NGI_NODE(item);
+	node_p node = NGI_NODE(item);
 	int rw;
 	int error = 0, ierror;
 	item_p	oitem;
-	struct ng_queue * ngq = &dest->nd_input_queue;
+	struct ng_queue * ngq = &node->nd_input_queue;
 
 #ifdef	NETGRAPH_DEBUG
         _ngi_check(item, __FILE__, __LINE__);
@@ -2107,7 +2090,7 @@ ng_snd_item(item_p item, int queue)
 		TRAP_ERROR();
 		return (EINVAL);	/* failed to get queue element */
 	}
-	if (dest == NULL) {
+	if (node == NULL) {
 		NG_FREE_ITEM(item);
 		TRAP_ERROR();
 		return (EINVAL);	/* No address */
@@ -2173,8 +2156,7 @@ ng_snd_item(item_p item, int queue)
 	 * Similarly the node may say one hook always produces writers.
 	 * These are over-rides.
 	 */
-	if ((ngq->q_flags & SINGLE_THREAD_ONLY)
-	|| (dest->nd_flags & NG_FORCE_WRITER)
+	if ((node->nd_flags & NG_FORCE_WRITER)
 	|| (hook && (hook->hk_flags & HK_FORCE_WRITER))) {
 			rw = NGQRW_W;
 			item->el_flags &= ~NGQF_READER;
@@ -2186,16 +2168,18 @@ ng_snd_item(item_p item, int queue)
 #endif
 		mtx_enter(&(ngq->q_mtx), MTX_SPIN);
 		ng_queue_rw(ngq, item, rw);
-		mtx_exit(&(ngq->q_mtx), MTX_SPIN);
 		/*
 		 * If there are active elements then we can rely on
 		 * them. if not we should not rely on another packet
 		 * coming here by another path,
 		 * so it is best to put us in the netisr list.
+		 * We can take the worklist lock with the node locked
+		 * BUT NOT THE REVERSE!
 		 */
-		if ((ngq->q_flags & (READER_MASK|WRITER_ACTIVE)) == 0) {
-			ng_setisr(ngq->q_node);
+		if (CAN_GET_WORK(ngq->q_flags)) {
+			ng_setisr(node);
 		}
+		mtx_exit(&(ngq->q_mtx), MTX_SPIN);
 		return (0);
 	}
 	/*
@@ -2245,30 +2229,17 @@ ng_snd_item(item_p item, int queue)
 	 * in SMP systems. :-)
 	 */
 	for (;;) {
-		/* quick hack to save all that mutex stuff */
-		if ((ngq->q_flags & (WRITE_PENDING | READ_PENDING)) == 0) {
-			if (dest->nd_flags & NG_WORKQ)
-				ng_worklist_remove(dest);
-			return (error);
-		}
-
 		/*
 		 * dequeue acquires and adjusts the input_queue as it dequeues
 		 * packets. It acquires the rw lock as needed.
 		 */
 		mtx_enter(&ngq->q_mtx, MTX_SPIN);
-		item = ng_dequeue(ngq);
-		mtx_exit(&ngq->q_mtx, MTX_SPIN);
+		item = ng_dequeue(ngq); /* fixes worklist too*/
 		if (!item) {
-			/*
-			 * If we have no work to do
-			 * then we certainly don't need to be
-			 * on the worklist.
-			 */
-			if (dest->nd_flags & NG_WORKQ)
-				ng_worklist_remove(dest);
+			mtx_exit(&ngq->q_mtx, MTX_SPIN);
 			return (error);
 		}
+		mtx_exit(&ngq->q_mtx, MTX_SPIN);
 
 		/*
 		 * We have the appropriate lock, so run the item.
@@ -2950,7 +2921,7 @@ ngb_mod_event(module_t mod, int event, void *data)
 	switch (event) {
 	case MOD_LOAD:
 		/* Register line discipline */
-		mtx_init(&ng_worklist_mtx, "netgraph worklist mutex", 0);
+		mtx_init(&ng_worklist_mtx, "netgraph worklist mutex", MTX_SPIN);
 		mtx_init(&ng_typelist_mtx, "netgraph types mutex", 0);
 		mtx_init(&ng_nodelist_mtx, "netgraph nodelist mutex", 0);
 		mtx_init(&ng_idhash_mtx, "netgraph idhash mutex", 0);
@@ -3266,14 +3237,15 @@ ngintr(void)
 	node_p  node = NULL;
 
 	for (;;) {
-		mtx_enter(&ng_worklist_mtx, MTX_DEF);
+		mtx_enter(&ng_worklist_mtx, MTX_SPIN);
 		node = TAILQ_FIRST(&ng_worklist);
 		if (!node) {
-			mtx_exit(&ng_worklist_mtx, MTX_DEF);
+			mtx_exit(&ng_worklist_mtx, MTX_SPIN);
 			break;
 		}
+		node->nd_flags &= ~NG_WORKQ;	
 		TAILQ_REMOVE(&ng_worklist, node, nd_work);
-		mtx_exit(&ng_worklist_mtx, MTX_DEF);
+		mtx_exit(&ng_worklist_mtx, MTX_SPIN);
 		/*
 		 * We have the node. We also take over the reference
 		 * that the list had on it.
@@ -3282,18 +3254,15 @@ ngintr(void)
 		 * All this time, keep the reference
 		 * that lets us be sure that the node still exists.
 		 * Let the reference go at the last minute.
+		 * ng_dequeue will put us back on the worklist
+		 * if there is more too do. This may be of use if there
+		 * are Multiple Processors and multiple Net threads in the 
+		 * future.
 		 */
 		for (;;) {
 			mtx_enter(&node->nd_input_queue.q_mtx, MTX_SPIN);
 			item = ng_dequeue(&node->nd_input_queue);
 			if (item == NULL) {
-				/*
-				 * Say we are on the queue as long as
-				 * we are processing it here.
-				 * it probably wouldn't come here while we
-				 * are processing anyhow.
-				 */
-				node->nd_flags &= ~NG_WORKQ;	
 				mtx_exit(&node->nd_input_queue.q_mtx, MTX_SPIN);
 				NG_NODE_UNREF(node);
 				break; /* go look for another node */
@@ -3308,19 +3277,19 @@ ngintr(void)
 static void
 ng_worklist_remove(node_p node)
 {
-	mtx_enter(&ng_worklist_mtx, MTX_DEF);
+	mtx_enter(&ng_worklist_mtx, MTX_SPIN);
 	if (node->nd_flags & NG_WORKQ) {
 		TAILQ_REMOVE(&ng_worklist, node, nd_work);
 		NG_NODE_UNREF(node);
 	}
 	node->nd_flags &= ~NG_WORKQ;
-	mtx_exit(&ng_worklist_mtx, MTX_DEF);
+	mtx_exit(&ng_worklist_mtx, MTX_SPIN);
 }
 
 static void
 ng_setisr(node_p node)
 {
-	mtx_enter(&ng_worklist_mtx, MTX_DEF);
+	mtx_enter(&ng_worklist_mtx, MTX_SPIN);
 	if ((node->nd_flags & NG_WORKQ) == 0) {
 		/*
 		 * If we are not already on the work queue,
@@ -3330,7 +3299,7 @@ ng_setisr(node_p node)
 		TAILQ_INSERT_TAIL(&ng_worklist, node, nd_work);
 		NG_NODE_REF(node);
 	}
-	mtx_exit(&ng_worklist_mtx, MTX_DEF);
+	mtx_exit(&ng_worklist_mtx, MTX_SPIN);
 	schednetisr(NETISR_NETGRAPH);
 }
 
diff --git a/sys/netgraph/ng_message.h b/sys/netgraph/ng_message.h
index e051f79d738c..7aac041c4d08 100644
--- a/sys/netgraph/ng_message.h
+++ b/sys/netgraph/ng_message.h
@@ -144,14 +144,14 @@ struct ng_mesg {
 
 /* Downstream messages */
 #define NGM_DROP_LINK		41	/* drop DTR, etc. - stay in the graph */
-#define NGM_RAISE LINK		42	/* if you previously dropped it */
-#define NGM_FLUSH_QUEUE		43		/* no data */
+#define NGM_RAISE_LINK		42	/* if you previously dropped it */
+#define NGM_FLUSH_QUEUE		43	/* no data */
 #define NGM_GET_BANDWIDTH	(44|NGM_READONLY)	/* either real or measured */
-#define NGM_SET_XMIT_Q_LIMITS	45		/* includes queue state */
+#define NGM_SET_XMIT_Q_LIMITS	45	/* includes queue state */
 #define NGM_GET_XMIT_Q_LIMITS	(46|NGM_READONLY)	/* returns queue state */
-#define NGM_MICROMANAGE		47		/* We want sync. queue state
+#define NGM_MICROMANAGE		47	/* We want sync. queue state
 						reply for each packet sent */
-#define NGM_SET_FLOW_MANAGER	48		/* send flow control here */ 
+#define NGM_SET_FLOW_MANAGER	48	/* send flow control here */ 
 /* Structure used for NGM_MKPEER */
 struct ngm_mkpeer {
 	char	type[NG_TYPELEN + 1];			/* peer type */