Clean up reference counting with relation to queued packets and the worklist,

and while I'm there, clean up the worklist insertion and removal.

Inspired by: Harti Brandt <brandt@fokus.gmd.de>
This commit is contained in:
Julian Elischer 2001-02-01 20:51:23 +00:00
parent a5108eaebf
commit b57a79658b
Notes: svn2git 2020-12-20 02:59:44 +00:00
svn path=/head/; revision=71902
2 changed files with 107 additions and 138 deletions

View File

@ -151,7 +151,7 @@ struct ng_hook ng_deadhook = {
*/
/* List nodes with unallocated work */
static TAILQ_HEAD(, ng_node) ng_worklist = TAILQ_HEAD_INITIALIZER(ng_worklist);
static struct mtx ng_worklist_mtx;
static struct mtx ng_worklist_mtx; /* MUST LOCK NODE FIRST */
/* List of installed types */
static LIST_HEAD(, ng_type) ng_typelist;
@ -703,15 +703,10 @@ ng_rmnode(node_p node, hook_p dummy1, void *dummy2, int dummy3)
* it has no hooks so what's it going to do, bleed on someone?
* Theoretically we came here from a queue entry that was added
* Just before the queue was closed, so it should be empty anyway.
* Also removes us from worklist if needed.
*/
ng_flush_input_queue(&node->nd_input_queue);
/*
* Take us off the work queue if we are there.
* We definatly have no work to be done.
*/
ng_worklist_remove(node);
/* Ask the type if it has anything to do in this case */
if (node->nd_type && node->nd_type->shutdown) {
(*node->nd_type->shutdown)(node);
@ -1699,15 +1694,7 @@ static __inline void ng_queue_rw(struct ng_queue * ngq,
* Defined here rather than in netgraph.h because no-one should fiddle
* with them.
*
* The ordering here is important! don't shuffle these. If you add
* READ_PENDING to the word when it has READ_PENDING already set, you
* generate a carry into the reader count, this you atomically add a reader,
* and remove the pending reader count! Similarly for the pending writer
* flag, adding WRITE_PENDING generates a carry and sets the WRITER_ACTIVE
* flag, while clearing WRITE_PENDING. When 'SINGLE_THREAD_ONLY' is set, then
* it is only permitted to do WRITER operations. Reader operations will
* result in errors.
* But that "hack" is unnecessary: "cpp" can do the math for us!
* The ordering here may be important! don't shuffle these.
*/
/*-
Safety Barrier--------+ (adjustable to suit taste) (not used yet)
@ -1715,28 +1702,42 @@ static __inline void ng_queue_rw(struct ng_queue * ngq,
V
+-------+-------+-------+-------+-------+-------+-------+-------+
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | |
|A|c|t|i|v|e| |R|e|a|d|e|r| |C|o|u|n|t| | | | | | | | | |R|A|W|S|
| | | | | | | | | | | | | | | | | | | | | | | | | | | | |P|W|P|T|
| |A|c|t|i|v|e| |R|e|a|d|e|r| |C|o|u|n|t| | | | | | | | | |R|A|W|
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | |P|W|P|
+-------+-------+-------+-------+-------+-------+-------+-------+
\_________________________ ____________________________/ | | | |
V | | | |
[active reader count] | | | |
| | | |
Read Pending ------------------------------------+ | | |
\___________________________ ____________________________/ | | |
V | | |
[active reader count] | | |
| | |
Active Writer -------------------------------------+ | |
Read Pending ------------------------------------+ | |
| |
Write Pending ---------------------------------------+ |
Active Writer -------------------------------------+ |
|
Single Threading Only ---------------------------------+
Write Pending ---------------------------------------+
*/
#define SINGLE_THREAD_ONLY 0x00000001 /* if set, even reads single thread */
#define WRITE_PENDING 0x00000002
#define WRITER_ACTIVE 0x00000004
#define READ_PENDING 0x00000008
#define READER_INCREMENT 0x00000010
#define READER_MASK 0xfffffff0 /* Not valid if WRITER_ACTIVE is set */
#define WRITE_PENDING 0x00000001
#define WRITER_ACTIVE 0x00000002
#define READ_PENDING 0x00000004
#define READER_INCREMENT 0x00000008
#define READER_MASK 0xfffffff0 /* Not valid if WRITER_ACTIVE is set */
#define SAFETY_BARRIER 0x00100000 /* 64K items queued should be enough */
/* Defines of more elaborate states on the queue */
/* Mask of bits a read cares about */
#define NGQ_RMASK (WRITE_PENDING|WRITER_ACTIVE|READ_PENDING)
/* Mask of bits a write cares about */
#define NGQ_WMASK (NGQ_RMASK|READER_MASK)
/* tests to decide if we could get a read or write off the queue */
#define CAN_GET_READ(flag) ((flag & NGQ_RMASK) == READ_PENDING)
#define CAN_GET_WRITE(flag) ((flag & NGQ_WMASK) == WRITE_PENDING)
/* Is there a chance of getting ANY work off the queue? */
#define CAN_GET_WORK(flag) (CAN_GET_READ(flag) || CAN_GET_WRITE(flag))
/*
* Taking into account the current state of the queue and node, possibly take
* the next entry off the queue and return it. Return NULL if there was
@ -1751,24 +1752,16 @@ ng_dequeue(struct ng_queue *ngq)
{
item_p item;
u_int add_arg;
/*
* If there is a writer, then the answer is "no". Everything else
* stops when there is a WRITER.
*/
if (ngq->q_flags & WRITER_ACTIVE) {
return (NULL);
}
/* Now take a look at what's on the queue and what's running */
if ((ngq->q_flags & ~(READER_MASK | SINGLE_THREAD_ONLY)) == READ_PENDING) {
if (CAN_GET_READ(ngq->q_flags)) {
/*
* It was a reader and we have no write active. We don't care
* how many readers are already active. Adjust the count for
* the item we are about to dequeue. Adding READ_PENDING to
* the exisiting READ_PENDING clears it and generates a carry
* into the reader count.
* Head of queue is a reader and we have no write active.
* We don't care how many readers are already active.
* Adjust the flags for the item we are about to dequeue.
* Add the correct increment for the reader count as well.
*/
add_arg = READ_PENDING;
} else if ((ngq->q_flags & ~SINGLE_THREAD_ONLY) == WRITE_PENDING) {
add_arg = (READER_INCREMENT - READ_PENDING);
} else if (CAN_GET_WRITE(ngq->q_flags)) {
/*
* There is a pending write, no readers and no active writer.
* This means we can go ahead with the pending writer. Note
@ -1785,7 +1778,7 @@ ng_dequeue(struct ng_queue *ngq)
* not matter (in fact it is defined that way). If it tests
* the flag before this operation, the WRITE_PENDING flag
* will make it fail, and if it tests it later, the
* ACTIVE_WRITER flag will do the same. If it is SO slow that
* WRITER_ACTIVE flag will do the same. If it is SO slow that
* we have actually completed the operation, and neither flag
* is set (nor the READ_PENDING) by the time that it tests
* the flags, then it is actually ok for it to continue. If
@ -1799,16 +1792,13 @@ ng_dequeue(struct ng_queue *ngq)
* After failing, first it will be held back by the mutex, then
* when it can proceed, it will queue its request, then it
* would arrive at this function. Usually it will have to
* leave empty handed because the ACTIVE WRITER bit wil be
* leave empty handed because the ACTIVE WRITER bit will be
* set.
*
* Adjust the flags for the item we are about to dequeue
* and for the new active writer.
*/
/*
* Adjust the flags for the item we are about to dequeue.
* Adding WRITE_PENDING to the exisiting WRITE_PENDING clears
* it and generates a carry into the WRITER_ACTIVE flag, all
* atomically.
*/
add_arg = WRITE_PENDING;
add_arg = (WRITER_ACTIVE - WRITE_PENDING);
/*
* We want to write "active writer, no readers " Now go make
* it true. In fact there may be a number in the readers
@ -1824,7 +1814,9 @@ ng_dequeue(struct ng_queue *ngq)
* moment with the fasttrack code, and put us in a strange
* state, then it will be through in just a moment, (as soon
* as we release the mutex) and keep things moving.
* Make sure we remove ourselves from the work queue.
*/
ng_worklist_remove(ngq->q_node);
return (0);
}
@ -1842,40 +1834,36 @@ ng_dequeue(struct ng_queue *ngq)
*/
ngq->last = &(ngq->queue);
/*
* Whatever flag was set is cleared and the carry sets the
* correct new active state/count. So we don't need to change
* add_arg.
* Whatever flag was set will be cleared and
* the new acive field will be set by the add as well,
* so we don't need to change add_arg.
* But we know we don't need to be on the work list.
*/
atomic_add_long(&ngq->q_flags, add_arg);
ng_worklist_remove(ngq->q_node);
} else {
/*
* Since there is something on the queue, note what it is
* in the flags word.
*/
if ((ngq->queue->el_flags & NGQF_RW) == NGQF_READER) {
/*
* If we had a READ_PENDING and have another one, we
* just want to add READ_PENDING twice (the same as
* adding READER_INCREMENT). If we had WRITE_PENDING,
* we want to add READ_PENDING + WRITE_PENDING to
* clear the old WRITE_PENDING, set ACTIVE_WRITER,
* and set READ_PENDING. Either way we just add
* READ_PENDING to whatever we already had.
*/
add_arg += READ_PENDING;
} else {
/*
* If we had a WRITE_PENDING and have another one, we
* just want to add WRITE_PENDING twice (the same as
* adding ACTIVE_WRITER). If we had READ_PENDING, we
* want to add READ_PENDING + WRITE_PENDING to clear
* the old READ_PENDING, increment the readers, and
* set WRITE_PENDING. Either way we just add
* WRITE_PENDING to whatever we already had.
*/
add_arg += WRITE_PENDING;
}
atomic_add_long(&ngq->q_flags, add_arg);
/*
* If we see more doable work, make sure we are
* on the work queue.
*/
if (CAN_GET_WORK(ngq->q_flags)) {
ng_setisr(ngq->q_node);
}
}
atomic_add_long(&ngq->q_flags, add_arg);
/*
* We have successfully cleared the old pending flag, set the new one
* if it is needed, and incremented the appropriate active field.
* (all in one atomic addition.. wow)
* (all in one atomic addition.. )
*/
return (item);
}
@ -1885,8 +1873,6 @@ ng_dequeue(struct ng_queue *ngq)
* We really don't care who, but we can't or don't want to hang around
* to process it ourselves. We are probably an interrupt routine..
* 1 = writer, 0 = reader
* We should set something to indicate NETISR requested
* If it's the first item queued.
*/
#define NGQRW_R 0
#define NGQRW_W 1
@ -1938,7 +1924,7 @@ ng_acquire_read(struct ng_queue *ngq, item_p item)
/* ######### Hack alert ######### */
atomic_add_long(&ngq->q_flags, READER_INCREMENT);
if ((ngq->q_flags & (~READER_MASK)) == 0) {
if ((ngq->q_flags & NGQ_RMASK) == 0) {
/* Successfully grabbed node */
return (item);
}
@ -1953,19 +1939,12 @@ ng_acquire_read(struct ng_queue *ngq, item_p item)
* running, between the previous test, and the moment that we took
* the mutex. (Or maybe a writer completed.)
*/
if ((ngq->q_flags & (~READER_MASK)) == 0) {
if ((ngq->q_flags & NGQ_RMASK) == 0) {
atomic_add_long(&ngq->q_flags, READER_INCREMENT);
mtx_exit((&ngq->q_mtx), MTX_SPIN);
return (item);
}
/*
* Quick check that we are doing things right.
*/
if (ngq->q_flags & SINGLE_THREAD_ONLY) {
panic("Calling single treaded queue incorrectly");
}
/*
* and queue the request for later.
*/
@ -1991,15 +1970,14 @@ ng_acquire_write(struct ng_queue *ngq, item_p item)
* we can just go ahead. In all other situations we need to queue the
* request
*/
if ((ngq->q_flags & (~SINGLE_THREAD_ONLY)) == 0) {
if ((ngq->q_flags & NGQ_WMASK) == 0) {
atomic_add_long(&ngq->q_flags, WRITER_ACTIVE);
mtx_exit((&ngq->q_mtx), MTX_SPIN);
if (ngq->q_flags & READER_MASK) {
/* Collision with fast-track reader */
atomic_add_long(&ngq->q_flags, -WRITER_ACTIVE);
atomic_subtract_long(&ngq->q_flags, WRITER_ACTIVE);
goto restart;
}
return (item);
}
@ -2063,6 +2041,11 @@ ng_flush_input_queue(struct ng_queue * ngq)
NG_FREE_ITEM(item);
mtx_enter(&ngq->q_mtx, MTX_SPIN);
}
/*
* Take us off the work queue if we are there.
* We definatly have no work to be done.
*/
ng_worklist_remove(ngq->q_node);
mtx_exit(&ngq->q_mtx, MTX_SPIN);
}
@ -2093,11 +2076,11 @@ int
ng_snd_item(item_p item, int queue)
{
hook_p hook = NGI_HOOK(item);
node_p dest = NGI_NODE(item);
node_p node = NGI_NODE(item);
int rw;
int error = 0, ierror;
item_p oitem;
struct ng_queue * ngq = &dest->nd_input_queue;
struct ng_queue * ngq = &node->nd_input_queue;
#ifdef NETGRAPH_DEBUG
_ngi_check(item, __FILE__, __LINE__);
@ -2107,7 +2090,7 @@ ng_snd_item(item_p item, int queue)
TRAP_ERROR();
return (EINVAL); /* failed to get queue element */
}
if (dest == NULL) {
if (node == NULL) {
NG_FREE_ITEM(item);
TRAP_ERROR();
return (EINVAL); /* No address */
@ -2173,8 +2156,7 @@ ng_snd_item(item_p item, int queue)
* Similarly the node may say one hook always produces writers.
* These are over-rides.
*/
if ((ngq->q_flags & SINGLE_THREAD_ONLY)
|| (dest->nd_flags & NG_FORCE_WRITER)
if ((node->nd_flags & NG_FORCE_WRITER)
|| (hook && (hook->hk_flags & HK_FORCE_WRITER))) {
rw = NGQRW_W;
item->el_flags &= ~NGQF_READER;
@ -2186,16 +2168,18 @@ ng_snd_item(item_p item, int queue)
#endif
mtx_enter(&(ngq->q_mtx), MTX_SPIN);
ng_queue_rw(ngq, item, rw);
mtx_exit(&(ngq->q_mtx), MTX_SPIN);
/*
* If there are active elements then we can rely on
* them. if not we should not rely on another packet
* coming here by another path,
* so it is best to put us in the netisr list.
* We can take the worklist lock with the node locked
* BUT NOT THE REVERSE!
*/
if ((ngq->q_flags & (READER_MASK|WRITER_ACTIVE)) == 0) {
ng_setisr(ngq->q_node);
if (CAN_GET_WORK(ngq->q_flags)) {
ng_setisr(node);
}
mtx_exit(&(ngq->q_mtx), MTX_SPIN);
return (0);
}
/*
@ -2245,30 +2229,17 @@ ng_snd_item(item_p item, int queue)
* in SMP systems. :-)
*/
for (;;) {
/* quick hack to save all that mutex stuff */
if ((ngq->q_flags & (WRITE_PENDING | READ_PENDING)) == 0) {
if (dest->nd_flags & NG_WORKQ)
ng_worklist_remove(dest);
return (error);
}
/*
* dequeue acquires and adjusts the input_queue as it dequeues
* packets. It acquires the rw lock as needed.
*/
mtx_enter(&ngq->q_mtx, MTX_SPIN);
item = ng_dequeue(ngq);
mtx_exit(&ngq->q_mtx, MTX_SPIN);
item = ng_dequeue(ngq); /* fixes worklist too*/
if (!item) {
/*
* If we have no work to do
* then we certainly don't need to be
* on the worklist.
*/
if (dest->nd_flags & NG_WORKQ)
ng_worklist_remove(dest);
mtx_exit(&ngq->q_mtx, MTX_SPIN);
return (error);
}
mtx_exit(&ngq->q_mtx, MTX_SPIN);
/*
* We have the appropriate lock, so run the item.
@ -2950,7 +2921,7 @@ ngb_mod_event(module_t mod, int event, void *data)
switch (event) {
case MOD_LOAD:
/* Register line discipline */
mtx_init(&ng_worklist_mtx, "netgraph worklist mutex", 0);
mtx_init(&ng_worklist_mtx, "netgraph worklist mutex", MTX_SPIN);
mtx_init(&ng_typelist_mtx, "netgraph types mutex", 0);
mtx_init(&ng_nodelist_mtx, "netgraph nodelist mutex", 0);
mtx_init(&ng_idhash_mtx, "netgraph idhash mutex", 0);
@ -3266,14 +3237,15 @@ ngintr(void)
node_p node = NULL;
for (;;) {
mtx_enter(&ng_worklist_mtx, MTX_DEF);
mtx_enter(&ng_worklist_mtx, MTX_SPIN);
node = TAILQ_FIRST(&ng_worklist);
if (!node) {
mtx_exit(&ng_worklist_mtx, MTX_DEF);
mtx_exit(&ng_worklist_mtx, MTX_SPIN);
break;
}
node->nd_flags &= ~NG_WORKQ;
TAILQ_REMOVE(&ng_worklist, node, nd_work);
mtx_exit(&ng_worklist_mtx, MTX_DEF);
mtx_exit(&ng_worklist_mtx, MTX_SPIN);
/*
* We have the node. We also take over the reference
* that the list had on it.
@ -3282,18 +3254,15 @@ ngintr(void)
* All this time, keep the reference
* that lets us be sure that the node still exists.
* Let the reference go at the last minute.
* ng_dequeue will put us back on the worklist
* if there is more too do. This may be of use if there
* are Multiple Processors and multiple Net threads in the
* future.
*/
for (;;) {
mtx_enter(&node->nd_input_queue.q_mtx, MTX_SPIN);
item = ng_dequeue(&node->nd_input_queue);
if (item == NULL) {
/*
* Say we are on the queue as long as
* we are processing it here.
* it probably wouldn't come here while we
* are processing anyhow.
*/
node->nd_flags &= ~NG_WORKQ;
mtx_exit(&node->nd_input_queue.q_mtx, MTX_SPIN);
NG_NODE_UNREF(node);
break; /* go look for another node */
@ -3308,19 +3277,19 @@ ngintr(void)
static void
ng_worklist_remove(node_p node)
{
mtx_enter(&ng_worklist_mtx, MTX_DEF);
mtx_enter(&ng_worklist_mtx, MTX_SPIN);
if (node->nd_flags & NG_WORKQ) {
TAILQ_REMOVE(&ng_worklist, node, nd_work);
NG_NODE_UNREF(node);
}
node->nd_flags &= ~NG_WORKQ;
mtx_exit(&ng_worklist_mtx, MTX_DEF);
mtx_exit(&ng_worklist_mtx, MTX_SPIN);
}
static void
ng_setisr(node_p node)
{
mtx_enter(&ng_worklist_mtx, MTX_DEF);
mtx_enter(&ng_worklist_mtx, MTX_SPIN);
if ((node->nd_flags & NG_WORKQ) == 0) {
/*
* If we are not already on the work queue,
@ -3330,7 +3299,7 @@ ng_setisr(node_p node)
TAILQ_INSERT_TAIL(&ng_worklist, node, nd_work);
NG_NODE_REF(node);
}
mtx_exit(&ng_worklist_mtx, MTX_DEF);
mtx_exit(&ng_worklist_mtx, MTX_SPIN);
schednetisr(NETISR_NETGRAPH);
}

View File

@ -144,14 +144,14 @@ struct ng_mesg {
/* Downstream messages */
#define NGM_DROP_LINK 41 /* drop DTR, etc. - stay in the graph */
#define NGM_RAISE LINK 42 /* if you previously dropped it */
#define NGM_FLUSH_QUEUE 43 /* no data */
#define NGM_RAISE_LINK 42 /* if you previously dropped it */
#define NGM_FLUSH_QUEUE 43 /* no data */
#define NGM_GET_BANDWIDTH (44|NGM_READONLY) /* either real or measured */
#define NGM_SET_XMIT_Q_LIMITS 45 /* includes queue state */
#define NGM_SET_XMIT_Q_LIMITS 45 /* includes queue state */
#define NGM_GET_XMIT_Q_LIMITS (46|NGM_READONLY) /* returns queue state */
#define NGM_MICROMANAGE 47 /* We want sync. queue state
#define NGM_MICROMANAGE 47 /* We want sync. queue state
reply for each packet sent */
#define NGM_SET_FLOW_MANAGER 48 /* send flow control here */
#define NGM_SET_FLOW_MANAGER 48 /* send flow control here */
/* Structure used for NGM_MKPEER */
struct ngm_mkpeer {
char type[NG_TYPELEN + 1]; /* peer type */