Rework the ng_item queueing on nodes:

- Introduce a new flags NGQF_QREADER and NGQF_QWRITER,
    which tell how the item should be actually applied,
    overriding NGQF_READER/NGQF_WRITER flags.
  - Do not differ between pending reader or writer. Use only
    one flag that is raised, when there are pending items.
  - Schedule netgraph ISR in ng_queue_rw(), so that callers
    do not need to do this job.
  - Fix several comments.

Submitted by:	julian
This commit is contained in:
glebius 2005-11-02 14:27:24 +00:00
parent 3e97a4d0bd
commit e33b2b4ba9
2 changed files with 118 additions and 124 deletions

View File

@ -618,9 +618,13 @@ struct ng_item {
#define NGQF_FN 0x02 /* the queue element is a function */
#define NGQF_UNDEF 0x03 /* UNDEFINED */
#define NGQF_RW 0x04 /* MASK for queue entry read/write */
#define NGQF_READER 0x04 /* queued as a reader */
#define NGQF_WRITER 0x00 /* queued as a writer */
#define NGQF_RW 0x04 /* MASK for wanted queue mode */
#define NGQF_READER 0x04 /* wants to be a reader */
#define NGQF_WRITER 0x00 /* wants to be a writer */
#define NGQF_QMODE 0x08 /* MASK for how it was queued */
#define NGQF_QREADER 0x08 /* was queued as a reader */
#define NGQF_QWRITER 0x00 /* was queued as a writer */
/*
* Get the mbuf (etc) out of an item.
@ -805,6 +809,11 @@ _ngi_hook(item_p item, char *file, int line)
_NGI_HOOK(i) = NULL; \
} while (0)
#define NGI_SET_WRITER(i) ((i)->el_flags &= ~NGQF_QMODE)
#define NGI_SET_READER(i) ((i)->el_flags |= NGQF_QREADER)
#define NGI_QUEUED_READER(i) ((i)->el_flags & NGQF_QREADER)
#define NGI_QUEUED_WRITER(i) (((i)->el_flags & NGQF_QMODE) == NGQF_QWRITER)
/**********************************************************************
* Data macros. Send, manipulate and free.

View File

@ -1725,42 +1725,52 @@ static __inline void ng_queue_rw(struct ng_queue * ngq,
|
V
+-------+-------+-------+-------+-------+-------+-------+-------+
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | |
| |A|c|t|i|v|e| |R|e|a|d|e|r| |C|o|u|n|t| | | | | | | | | |R|A|W|
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | |P|W|P|
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | |
| |A|c|t|i|v|e| |R|e|a|d|e|r| |C|o|u|n|t| | | | | | | | | |P|A|
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | |O|W|
+-------+-------+-------+-------+-------+-------+-------+-------+
\___________________________ ____________________________/ | | |
V | | |
[active reader count] | | |
| | |
Read Pending ------------------------------------+ | |
\___________________________ ____________________________/ | |
V | |
[active reader count] | |
| |
Active Writer -------------------------------------+ |
Operation Pending -------------------------------+ |
|
Write Pending ---------------------------------------+
Active Writer ---------------------------------------+
*/
#define WRITE_PENDING 0x00000001
#define WRITER_ACTIVE 0x00000002
#define READ_PENDING 0x00000004
#define READER_INCREMENT 0x00000008
#define READER_MASK 0xfffffff8 /* Not valid if WRITER_ACTIVE is set */
#define SAFETY_BARRIER 0x00100000 /* 64K items queued should be enough */
#define WRITER_ACTIVE 0x00000001
#define OP_PENDING 0x00000002
#define READER_INCREMENT 0x00000004
#define READER_MASK 0xfffffffc /* Not valid if WRITER_ACTIVE is set */
#define SAFETY_BARRIER 0x00100000 /* 128K items queued should be enough */
/* Defines of more elaborate states on the queue */
/* Mask of bits a read cares about */
#define NGQ_RMASK (WRITE_PENDING|WRITER_ACTIVE|READ_PENDING)
/* Mask of bits a new read cares about */
#define NGQ_RMASK (WRITER_ACTIVE|OP_PENDING)
/* Mask of bits a write cares about */
/* Mask of bits a new write cares about */
#define NGQ_WMASK (NGQ_RMASK|READER_MASK)
/* tests to decide if we could get a read or write off the queue */
#define CAN_GET_READ(flag) ((flag & NGQ_RMASK) == READ_PENDING)
#define CAN_GET_WRITE(flag) ((flag & NGQ_WMASK) == WRITE_PENDING)
/* Test to decide if there is something on the queue. */
#define QUEUE_ACTIVE(QP) ((QP)->q_flags & OP_PENDING)
/* How to decide what the next queued item is. */
#define HEAD_IS_READER(QP) NGI_QUEUED_READER((QP)->queue)
#define HEAD_IS_WRITER(QP) NGI_QUEUED_WRITER((QP)->queue) /* notused */
/* Read the status to decide if the next item on the queue can now run. */
#define QUEUED_READER_CAN_PROCEED(QP) \
(((QP)->q_flags & (NGQ_RMASK & ~OP_PENDING)) == 0)
#define QUEUED_WRITER_CAN_PROCEED(QP) \
(((QP)->q_flags & (NGQ_WMASK & ~OP_PENDING)) == 0)
/* Is there a chance of getting ANY work off the queue? */
#define CAN_GET_WORK(flag) (CAN_GET_READ(flag) || CAN_GET_WRITE(flag))
#define NEXT_QUEUED_ITEM_CAN_PROCEED(QP) \
(QUEUE_ACTIVE(QP) && \
((HEAD_IS_READER(QP)) ? QUEUED_READER_CAN_PROCEED(QP) : \
QUEUED_WRITER_CAN_PROCEED(QP)))
#define NGQRW_R 0
#define NGQRW_W 1
@ -1781,17 +1791,38 @@ ng_dequeue(struct ng_queue *ngq, int *rw)
u_int add_arg;
mtx_assert(&ngq->q_mtx, MA_OWNED);
/*
* If there is nothing queued, then just return.
* No point in continuing.
* XXXGL: assert this?
*/
if (!QUEUE_ACTIVE(ngq)) {
return (NULL);
}
if (CAN_GET_READ(ngq->q_flags)) {
/*
* From here, we can assume there is a head item.
* We need to find out what it is and if it can be dequeued, given
* the current state of the node.
*/
if (HEAD_IS_READER(ngq)) {
if (!QUEUED_READER_CAN_PROCEED(ngq)) {
/*
* It's a reader but we can't use it.
* We are stalled so make sure we don't
* get called again until something changes.
*/
ng_worklist_remove(ngq->q_node);
return (NULL);
}
/*
* Head of queue is a reader and we have no write active.
* We don't care how many readers are already active.
* Adjust the flags for the item we are about to dequeue.
* Add the correct increment for the reader count as well.
* Add the correct increment for the reader count.
*/
add_arg = (READER_INCREMENT - READ_PENDING);
add_arg = READER_INCREMENT;
*rw = NGQRW_R;
} else if (CAN_GET_WRITE(ngq->q_flags)) {
} else if (QUEUED_WRITER_CAN_PROCEED(ngq)) {
/*
* There is a pending write, no readers and no active writer.
* This means we can go ahead with the pending writer. Note
@ -1806,14 +1837,13 @@ ng_dequeue(struct ng_queue *ngq, int *rw)
* flags for a while. Since we ignore the reader count
* entirely when the WRITER_ACTIVE flag is set, this should
* not matter (in fact it is defined that way). If it tests
* the flag before this operation, the WRITE_PENDING flag
* the flag before this operation, the OP_PENDING flag
* will make it fail, and if it tests it later, the
* WRITER_ACTIVE flag will do the same. If it is SO slow that
* we have actually completed the operation, and neither flag
* is set (nor the READ_PENDING) by the time that it tests
* the flags, then it is actually ok for it to continue. If
* it completes and we've finished and the read pending is
* set it still fails.
* is set by the time that it tests the flags, then it is
* actually ok for it to continue. If it completes and we've
* finished and the read pending is set it still fails.
*
* So we can just ignore it, as long as we can ensure that the
* transition from WRITE_PENDING state to the WRITER_ACTIVE
@ -1825,10 +1855,9 @@ ng_dequeue(struct ng_queue *ngq, int *rw)
* leave empty handed because the ACTIVE WRITER bit will be
* set.
*
* Adjust the flags for the item we are about to dequeue
* and for the new active writer.
* Adjust the flags for the new active writer.
*/
add_arg = (WRITER_ACTIVE - WRITE_PENDING);
add_arg = WRITER_ACTIVE;
*rw = NGQRW_W;
/*
* We want to write "active writer, no readers " Now go make
@ -1843,12 +1872,14 @@ ng_dequeue(struct ng_queue *ngq, int *rw)
* have a write pending and the readers count is non zero. If
* we got here because a reader hit us just at the wrong
* moment with the fasttrack code, and put us in a strange
* state, then it will be through in just a moment, (as soon
* as we release the mutex) and keep things moving.
* Make sure we remove ourselves from the work queue.
* state, then it will be coming through in just a moment,
* (just as soon as we release the mutex) and keep things
* moving.
* Make sure we remove ourselves from the work queue. It
* would be a waste of effort to do all this again.
*/
ng_worklist_remove(ngq->q_node);
return (0);
return (NULL);
}
/*
@ -1860,9 +1891,10 @@ ng_dequeue(struct ng_queue *ngq, int *rw)
if (ngq->last == &(item->el_next)) {
/*
* that was the last entry in the queue so set the 'last
* pointer up correctly and make sure the pending flags are
* pointer up correctly and make sure the pending flag is
* clear.
*/
add_arg += -OP_PENDING;
ngq->last = &(ngq->queue);
/*
* Whatever flag was set will be cleared and
@ -1873,36 +1905,19 @@ ng_dequeue(struct ng_queue *ngq, int *rw)
atomic_add_long(&ngq->q_flags, add_arg);
ng_worklist_remove(ngq->q_node);
} else {
item_p item = ngq->queue;
hook_p hook = NGI_HOOK(item);
node_p node = NGI_NODE(item);
/*
* Since there is something on the queue, note what it is
* in the flags word. Don't forget about overrides, too.
* Since there is still something on the queue
* we don't need to change the PENDING flag.
*/
if ((node->nd_flags & NGF_FORCE_WRITER)
|| (hook && (hook->hk_flags & HK_FORCE_WRITER)))
add_arg += WRITE_PENDING;
else if ((item->el_flags & NGQF_RW) == NGQF_READER)
add_arg += READ_PENDING;
else
add_arg += WRITE_PENDING;
atomic_add_long(&ngq->q_flags, add_arg);
/*
* If we see more doable work, make sure we are
* on the work queue.
*/
if (CAN_GET_WORK(ngq->q_flags)) {
if (NEXT_QUEUED_ITEM_CAN_PROCEED(ngq)) {
ng_setisr(ngq->q_node);
}
}
/*
* We have successfully cleared the old pending flag, set the new one
* if it is needed, and incremented the appropriate active field.
* (all in one atomic addition.. )
*/
return (item);
}
@ -1910,30 +1925,33 @@ ng_dequeue(struct ng_queue *ngq, int *rw)
* Queue a packet to be picked up by someone else.
* We really don't care who, but we can't or don't want to hang around
* to process it ourselves. We are probably an interrupt routine..
* If the queue could be run, flag the netisr handler to start.
*/
static __inline void
ng_queue_rw(struct ng_queue * ngq, item_p item, int rw)
{
mtx_assert(&ngq->q_mtx, MA_OWNED);
if (rw == NGQRW_W)
NGI_SET_WRITER(item);
else
NGI_SET_READER(item);
item->el_next = NULL; /* maybe not needed */
*ngq->last = item;
/*
* If it was the first item in the queue then we need to
* set the last pointer and the type flags.
*/
if (ngq->last == &(ngq->queue)) {
/*
* When called with constants for rw, the optimiser will
* remove the unneeded branch below.
*/
if (rw == NGQRW_W) {
atomic_add_long(&ngq->q_flags, WRITE_PENDING);
} else {
atomic_add_long(&ngq->q_flags, READ_PENDING);
}
}
if (ngq->last == &(ngq->queue))
atomic_add_long(&ngq->q_flags, OP_PENDING);
ngq->last = &(item->el_next);
/*
* We can take the worklist lock with the node locked
* BUT NOT THE REVERSE!
*/
if (NEXT_QUEUED_ITEM_CAN_PROCEED(ngq))
ng_setisr(ngq->q_node);
}
@ -1975,6 +1993,8 @@ ng_acquire_read(struct ng_queue *ngq, item_p item)
* have removed the last queued item that was stopping us from
* running, between the previous test, and the moment that we took
* the mutex. (Or maybe a writer completed.)
* Even if another fast-track reader hits during this period
* we don't care as multiple readers is OK.
*/
if ((ngq->q_flags & NGQ_RMASK) == 0) {
atomic_add_long(&ngq->q_flags, READER_INCREMENT);
@ -1986,8 +2006,6 @@ ng_acquire_read(struct ng_queue *ngq, item_p item)
* and queue the request for later.
*/
ng_queue_rw(ngq, item, NGQRW_R);
if (CAN_GET_WORK(ngq->q_flags))
ng_setisr(ngq->q_node);
mtx_unlock_spin(&(ngq->q_mtx));
return (NULL);
@ -2004,6 +2022,7 @@ ng_acquire_write(struct ng_queue *ngq, item_p item)
* request
*/
if ((ngq->q_flags & NGQ_WMASK) == 0) {
/* collision could happen *HERE* */
atomic_add_long(&ngq->q_flags, WRITER_ACTIVE);
mtx_unlock_spin((&ngq->q_mtx));
if (ngq->q_flags & READER_MASK) {
@ -2018,8 +2037,6 @@ ng_acquire_write(struct ng_queue *ngq, item_p item)
* and queue the request for later.
*/
ng_queue_rw(ngq, item, NGQRW_W);
if (CAN_GET_WORK(ngq->q_flags))
ng_setisr(ngq->q_node);
mtx_unlock_spin(&(ngq->q_mtx));
return (NULL);
@ -2041,32 +2058,18 @@ static void
ng_flush_input_queue(struct ng_queue * ngq)
{
item_p item;
u_int add_arg;
mtx_lock_spin(&ngq->q_mtx);
for (;;) {
/* Now take a look at what's on the queue */
if (ngq->q_flags & READ_PENDING) {
add_arg = -READ_PENDING;
} else if (ngq->q_flags & WRITE_PENDING) {
add_arg = -WRITE_PENDING;
} else {
break;
}
mtx_lock_spin(&ngq->q_mtx);
while (ngq->queue) {
item = ngq->queue;
ngq->queue = item->el_next;
if (ngq->last == &(item->el_next)) {
ngq->last = &(ngq->queue);
} else {
if ((ngq->queue->el_flags & NGQF_RW) == NGQF_READER) {
add_arg += READ_PENDING;
} else {
add_arg += WRITE_PENDING;
}
atomic_add_long(&ngq->q_flags, -OP_PENDING);
}
atomic_add_long(&ngq->q_flags, add_arg);
mtx_unlock_spin(&ngq->q_mtx);
/* If the item is supplying a callback, call it with an error */
if (item->apply != NULL) {
(item->apply)(item->context, ENOENT);
item->apply = NULL;
@ -2201,21 +2204,10 @@ ng_snd_item(item_p item, int flags)
if (queue) {
/* Put it on the queue for that node*/
#ifdef NETGRAPH_DEBUG
_ngi_check(item, __FILE__, __LINE__);
_ngi_check(item, __FILE__, __LINE__);
#endif
mtx_lock_spin(&(ngq->q_mtx));
ng_queue_rw(ngq, item, rw);
/*
* If there are active elements then we can rely on
* them. if not we should not rely on another packet
* coming here by another path,
* so it is best to put us in the netisr list.
* We can take the worklist lock with the node locked
* BUT NOT THE REVERSE!
*/
if (CAN_GET_WORK(ngq->q_flags)) {
ng_setisr(node);
}
mtx_unlock_spin(&(ngq->q_mtx));
if (flags & NG_PROGRESS)
@ -2233,11 +2225,7 @@ ng_snd_item(item_p item, int flags)
else
item = ng_acquire_write(ngq, item);
/*
* May have come back with a different item.
* or maybe none at all. The one we started with will
* have been queued in thises cases.
*/
if (item == NULL) {
if (flags & NG_PROGRESS)
return (EINPROGRESS);
@ -2248,10 +2236,7 @@ ng_snd_item(item_p item, int flags)
#ifdef NETGRAPH_DEBUG
_ngi_check(item, __FILE__, __LINE__);
#endif
/*
* Take over the reference frm the item.
* Hold it until the called function returns.
*/
NGI_GET_NODE(item, node); /* zaps stored node */
error = ng_apply_item(node, item, rw); /* drops r/w lock when done */
@ -2266,7 +2251,7 @@ ng_snd_item(item_p item, int flags)
}
mtx_lock_spin(&(ngq->q_mtx));
if (CAN_GET_WORK(ngq->q_flags))
if (NEXT_QUEUED_ITEM_CAN_PROCEED(ngq))
ng_setisr(ngq->q_node);
mtx_unlock_spin(&(ngq->q_mtx));
@ -2294,9 +2279,9 @@ ng_apply_item(node_p node, item_p item, int rw)
#endif
/*
* If item has apply callback, store it. Clear callback
* immediately, two avoid another call in case if
* item would be reused by destination node.
* If the item has an "apply" callback, store it.
* Clear item's callback immediately, to avoid an extra call if
* the item is reused by the destination node.
*/
if (item->apply != NULL) {
apply = item->apply;
@ -3308,7 +3293,7 @@ ng_setisr(node_p node)
* remote node might go away in this timescale.
* We know the hooks can't go away because that would require getting
* a writer item on both nodes and we must have at least a reader
* here to eb able to do this.
* here to be able to do this.
* Note that the hook loaded is the REMOTE hook.
*
* This is possibly in the critical path for new data.