Rewrite node's r/w/q-lock semantics using only atomics instead of mutex

and atomics combination. Mutex is now used only for queue protection.
Also avoid unneded extra swi scheduling calls.
This commit is contained in:
Alexander Motin 2008-04-06 15:26:32 +00:00
parent 0e7cce1381
commit 394cb30a36
Notes: svn2git 2020-12-20 02:59:44 +00:00
svn path=/head/; revision=177953

View File

@ -61,6 +61,7 @@
#include <sys/syslog.h>
#include <sys/refcount.h>
#include <sys/proc.h>
#include <machine/cpu.h>
#include <net/netisr.h>
@ -201,11 +202,10 @@ static int ng_add_hook(node_p node, const char *name, hook_p * hookp);
static int ng_generic_msg(node_p here, item_p item, hook_p lasthook);
static ng_ID_t ng_decodeidname(const char *name);
static int ngb_mod_event(module_t mod, int event, void *data);
static void ng_worklist_remove(node_p node);
static void ng_worklist_add(node_p node);
static void ngintr(void);
static int ng_apply_item(node_p node, item_p item, int rw);
static void ng_flush_input_queue(struct ng_queue * ngq);
static void ng_setisr(node_p node);
static node_p ng_ID2noderef(ng_ID_t ID);
static int ng_con_nodes(item_p item, node_p node, const char *name,
node_p node2, const char *name2);
@ -1836,9 +1836,8 @@ static __inline void ng_queue_rw(struct ng_queue * ngq,
/* Is there a chance of getting ANY work off the queue? */
#define NEXT_QUEUED_ITEM_CAN_PROCEED(QP) \
(QUEUE_ACTIVE(QP) && \
((HEAD_IS_READER(QP)) ? QUEUED_READER_CAN_PROCEED(QP) : \
QUEUED_WRITER_CAN_PROCEED(QP)))
QUEUED_WRITER_CAN_PROCEED(QP))
#define NGQRW_R 0
@ -1857,13 +1856,11 @@ static __inline item_p
ng_dequeue(struct ng_queue *ngq, int *rw)
{
item_p item;
u_int add_arg;
mtx_assert(&ngq->q_mtx, MA_OWNED);
/*
* If there is nothing queued, then just return.
* No point in continuing.
* XXXGL: assert this?
*/
if (!QUEUE_ACTIVE(ngq)) {
CTR4(KTR_NET, "%20s: node [%x] (%p) queue empty; "
@ -1878,84 +1875,27 @@ ng_dequeue(struct ng_queue *ngq, int *rw)
* the current state of the node.
*/
if (HEAD_IS_READER(ngq)) {
if (!QUEUED_READER_CAN_PROCEED(ngq)) {
/*
* It's a reader but we can't use it.
* We are stalled so make sure we don't
* get called again until something changes.
*/
ng_worklist_remove(ngq->q_node);
CTR4(KTR_NET, "%20s: node [%x] (%p) queued reader "
"can't proceed; queue flags 0x%lx", __func__,
ngq->q_node->nd_ID, ngq->q_node, ngq->q_flags);
return (NULL);
while (1) {
long t = ngq->q_flags;
if (t & WRITER_ACTIVE) {
/* It's a reader but we can't use it. */
CTR4(KTR_NET, "%20s: node [%x] (%p) queued reader "
"can't proceed; queue flags 0x%lx", __func__,
ngq->q_node->nd_ID, ngq->q_node, t);
return (NULL);
}
if (atomic_cmpset_long(&ngq->q_flags, t,
t + READER_INCREMENT))
break;
cpu_spinwait();
}
/*
* Head of queue is a reader and we have no write active.
* We don't care how many readers are already active.
* Add the correct increment for the reader count.
*/
add_arg = READER_INCREMENT;
*rw = NGQRW_R;
} else if (QUEUED_WRITER_CAN_PROCEED(ngq)) {
/*
* There is a pending write, no readers and no active writer.
* This means we can go ahead with the pending writer. Note
* the fact that we now have a writer, ready for when we take
* it off the queue.
*
* We don't need to worry about a possible collision with the
* fasttrack reader.
*
* The fasttrack thread may take a long time to discover that we
* are running so we would have an inconsistent state in the
* flags for a while. Since we ignore the reader count
* entirely when the WRITER_ACTIVE flag is set, this should
* not matter (in fact it is defined that way). If it tests
* the flag before this operation, the OP_PENDING flag
* will make it fail, and if it tests it later, the
* WRITER_ACTIVE flag will do the same. If it is SO slow that
* we have actually completed the operation, and neither flag
* is set by the time that it tests the flags, then it is
* actually ok for it to continue. If it completes and we've
* finished and the read pending is set it still fails.
*
* So we can just ignore it, as long as we can ensure that the
* transition from WRITE_PENDING state to the WRITER_ACTIVE
* state is atomic.
*
* After failing, first it will be held back by the mutex, then
* when it can proceed, it will queue its request, then it
* would arrive at this function. Usually it will have to
* leave empty handed because the ACTIVE WRITER bit will be
* set.
*
* Adjust the flags for the new active writer.
*/
add_arg = WRITER_ACTIVE;
} else if (atomic_cmpset_long(&ngq->q_flags, OP_PENDING,
OP_PENDING + WRITER_ACTIVE)) {
*rw = NGQRW_W;
/*
* We want to write "active writer, no readers " Now go make
* it true. In fact there may be a number in the readers
* count but we know it is not true and will be fixed soon.
* We will fix the flags for the next pending entry in a
* moment.
*/
} else {
/*
* We can't dequeue anything.. return and say so. Probably we
* have a write pending and the readers count is non zero. If
* we got here because a reader hit us just at the wrong
* moment with the fasttrack code, and put us in a strange
* state, then it will be coming through in just a moment,
* (just as soon as we release the mutex) and keep things
* moving.
* Make sure we remove ourselves from the work queue. It
* would be a waste of effort to do all this again.
*/
ng_worklist_remove(ngq->q_node);
CTR4(KTR_NET, "%20s: node [%x] (%p) can't dequeue anything; "
"queue flags 0x%lx", __func__,
CTR4(KTR_NET, "%20s: node [%x] (%p) queued writer "
"can't proceed; queue flags 0x%lx", __func__,
ngq->q_node->nd_ID, ngq->q_node, ngq->q_flags);
return (NULL);
}
@ -1966,31 +1906,9 @@ ng_dequeue(struct ng_queue *ngq, int *rw)
*/
item = ngq->queue;
ngq->queue = item->el_next;
CTR6(KTR_NET, "%20s: node [%x] (%p) dequeued item %p with flags 0x%lx; "
"queue flags 0x%lx", __func__,
ngq->q_node->nd_ID,ngq->q_node, item, item->el_flags, ngq->q_flags);
if (ngq->last == &(item->el_next)) {
/*
* that was the last entry in the queue so set the 'last
* pointer up correctly and make sure the pending flag is
* clear.
*/
add_arg += -OP_PENDING;
ngq->last = &(ngq->queue);
/*
* Whatever flag was set will be cleared and
* the new acive field will be set by the add as well,
* so we don't need to change add_arg.
* But we know we don't need to be on the work list.
*/
atomic_add_long(&ngq->q_flags, add_arg);
ng_worklist_remove(ngq->q_node);
} else {
/*
* Since there is still something on the queue
* we don't need to change the PENDING flag.
*/
atomic_add_long(&ngq->q_flags, add_arg);
atomic_clear_long(&ngq->q_flags, OP_PENDING);
}
CTR6(KTR_NET, "%20s: node [%x] (%p) returning item %p as %s; "
"queue flags 0x%lx", __func__,
@ -2008,94 +1926,52 @@ ng_dequeue(struct ng_queue *ngq, int *rw)
static __inline void
ng_queue_rw(struct ng_queue * ngq, item_p item, int rw)
{
mtx_assert(&ngq->q_mtx, MA_OWNED);
if (rw == NGQRW_W)
NGI_SET_WRITER(item);
else
NGI_SET_READER(item);
item->el_next = NULL; /* maybe not needed */
NG_QUEUE_LOCK(ngq);
/* Set OP_PENDING flag and enqueue the item. */
atomic_set_long(&ngq->q_flags, OP_PENDING);
*ngq->last = item;
ngq->last = &(item->el_next);
CTR5(KTR_NET, "%20s: node [%x] (%p) queued item %p as %s", __func__,
ngq->q_node->nd_ID, ngq->q_node, item, rw ? "WRITER" : "READER" );
/*
* If it was the first item in the queue then we need to
* set the last pointer and the type flags.
*/
if (ngq->last == &(ngq->queue)) {
atomic_add_long(&ngq->q_flags, OP_PENDING);
CTR3(KTR_NET, "%20s: node [%x] (%p) set OP_PENDING", __func__,
ngq->q_node->nd_ID, ngq->q_node);
}
ngq->last = &(item->el_next);
/*
* We can take the worklist lock with the node locked
* BUT NOT THE REVERSE!
*/
if (NEXT_QUEUED_ITEM_CAN_PROCEED(ngq))
ng_setisr(ngq->q_node);
ng_worklist_add(ngq->q_node);
NG_QUEUE_UNLOCK(ngq);
}
/*
* This function 'cheats' in that it first tries to 'grab' the use of the
* node, without going through the mutex. We can do this becasue of the
* semantics of the lock. The semantics include a clause that says that the
* value of the readers count is invalid if the WRITER_ACTIVE flag is set. It
* also says that the WRITER_ACTIVE flag cannot be set if the readers count
* is not zero. Note that this talks about what is valid to SET the
* WRITER_ACTIVE flag, because from the moment it is set, the value if the
* reader count is immaterial, and not valid. The two 'pending' flags have a
* similar effect, in that If they are orthogonal to the two active fields in
* how they are set, but if either is set, the attempted 'grab' need to be
* backed out because there is earlier work, and we maintain ordering in the
* queue. The result of this is that the reader request can try obtain use of
* the node with only a single atomic addition, and without any of the mutex
* overhead. If this fails the operation degenerates to the same as for other
* cases.
*
*/
static __inline item_p
ng_acquire_read(struct ng_queue *ngq, item_p item)
{
KASSERT(ngq != &ng_deadnode.nd_input_queue,
("%s: working on deadnode", __func__));
/* ######### Hack alert ######### */
atomic_add_long(&ngq->q_flags, READER_INCREMENT);
if ((ngq->q_flags & NGQ_RMASK) == 0) {
/* Successfully grabbed node */
CTR4(KTR_NET, "%20s: node [%x] (%p) fast acquired item %p",
__func__, ngq->q_node->nd_ID, ngq->q_node, item);
return (item);
}
/* undo the damage if we didn't succeed */
atomic_subtract_long(&ngq->q_flags, READER_INCREMENT);
/* Reader needs node without writer and pending items. */
while (1) {
long t = ngq->q_flags;
if (t & NGQ_RMASK)
break; /* Node is not ready for reader. */
if (atomic_cmpset_long(&ngq->q_flags, t, t + READER_INCREMENT)) {
/* Successfully grabbed node */
CTR4(KTR_NET, "%20s: node [%x] (%p) acquired item %p",
__func__, ngq->q_node->nd_ID, ngq->q_node, item);
return (item);
}
cpu_spinwait();
};
/* ######### End Hack alert ######### */
NG_QUEUE_LOCK(ngq);
/*
* Try again. Another processor (or interrupt for that matter) may
* have removed the last queued item that was stopping us from
* running, between the previous test, and the moment that we took
* the mutex. (Or maybe a writer completed.)
* Even if another fast-track reader hits during this period
* we don't care as multiple readers is OK.
*/
if ((ngq->q_flags & NGQ_RMASK) == 0) {
atomic_add_long(&ngq->q_flags, READER_INCREMENT);
NG_QUEUE_UNLOCK(ngq);
CTR4(KTR_NET, "%20s: node [%x] (%p) slow acquired item %p",
__func__, ngq->q_node->nd_ID, ngq->q_node, item);
return (item);
}
/*
* and queue the request for later.
*/
/* Queue the request for later. */
ng_queue_rw(ngq, item, NGQRW_R);
NG_QUEUE_UNLOCK(ngq);
return (NULL);
}
@ -2106,32 +1982,16 @@ ng_acquire_write(struct ng_queue *ngq, item_p item)
KASSERT(ngq != &ng_deadnode.nd_input_queue,
("%s: working on deadnode", __func__));
restart:
NG_QUEUE_LOCK(ngq);
/*
* If there are no readers, no writer, and no pending packets, then
* we can just go ahead. In all other situations we need to queue the
* request
*/
if ((ngq->q_flags & NGQ_WMASK) == 0) {
/* collision could happen *HERE* */
atomic_add_long(&ngq->q_flags, WRITER_ACTIVE);
NG_QUEUE_UNLOCK(ngq);
if (ngq->q_flags & READER_MASK) {
/* Collision with fast-track reader */
atomic_subtract_long(&ngq->q_flags, WRITER_ACTIVE);
goto restart;
}
/* Writer needs completely idle node. */
if (atomic_cmpset_long(&ngq->q_flags, 0, WRITER_ACTIVE)) {
/* Successfully grabbed node */
CTR4(KTR_NET, "%20s: node [%x] (%p) acquired item %p",
__func__, ngq->q_node->nd_ID, ngq->q_node, item);
return (item);
}
/*
* and queue the request for later.
*/
/* Queue the request for later. */
ng_queue_rw(ngq, item, NGQRW_W);
NG_QUEUE_UNLOCK(ngq);
return (NULL);
}
@ -2190,7 +2050,7 @@ ng_upgrade_write(struct ng_queue *ngq, item_p item)
ngq->last = &(item->el_next);
/* We've gone from, 0 to 1 item in the queue */
atomic_add_long(&ngq->q_flags, OP_PENDING);
atomic_set_long(&ngq->q_flags, OP_PENDING);
CTR3(KTR_NET, "%20s: node [%x] (%p) set OP_PENDING", __func__,
ngq->q_node->nd_ID, ngq->q_node);
@ -2201,8 +2061,8 @@ ng_upgrade_write(struct ng_queue *ngq, item_p item)
/* Reverse what we did above. That downgrades us back to reader */
atomic_add_long(&ngq->q_flags, READER_INCREMENT - WRITER_ACTIVE);
if (NEXT_QUEUED_ITEM_CAN_PROCEED(ngq))
ng_setisr(ngq->q_node);
if (QUEUE_ACTIVE(ngq) && NEXT_QUEUED_ITEM_CAN_PROCEED(ngq))
ng_worklist_add(ngq->q_node);
mtx_unlock_spin(&(ngq->q_mtx));
return;
@ -2219,7 +2079,7 @@ ng_leave_read(struct ng_queue *ngq)
static __inline void
ng_leave_write(struct ng_queue *ngq)
{
atomic_subtract_long(&ngq->q_flags, WRITER_ACTIVE);
atomic_clear_long(&ngq->q_flags, WRITER_ACTIVE);
}
static void
@ -2233,7 +2093,7 @@ ng_flush_input_queue(struct ng_queue * ngq)
ngq->queue = item->el_next;
if (ngq->last == &(item->el_next)) {
ngq->last = &(ngq->queue);
atomic_add_long(&ngq->q_flags, -OP_PENDING);
atomic_clear_long(&ngq->q_flags, OP_PENDING);
}
NG_QUEUE_UNLOCK(ngq);
@ -2249,11 +2109,6 @@ ng_flush_input_queue(struct ng_queue * ngq)
NG_FREE_ITEM(item);
NG_QUEUE_LOCK(ngq);
}
/*
* Take us off the work queue if we are there.
* We definately have no work to be done.
*/
ng_worklist_remove(ngq->q_node);
NG_QUEUE_UNLOCK(ngq);
}
@ -2355,15 +2210,9 @@ ng_snd_item(item_p item, int flags)
ngq = &node->nd_input_queue;
if (queue) {
/* Put it on the queue for that node*/
#ifdef NETGRAPH_DEBUG
_ngi_check(item, __FILE__, __LINE__);
#endif
item->depth = 1;
NG_QUEUE_LOCK(ngq);
/* Put it on the queue for that node*/
ng_queue_rw(ngq, item, rw);
NG_QUEUE_UNLOCK(ngq);
return ((flags & NG_PROGRESS) ? EINPROGRESS : 0);
}
@ -2376,32 +2225,28 @@ ng_snd_item(item_p item, int flags)
else
item = ng_acquire_write(ngq, item);
/* Item was queued while trying to get permission. */
if (item == NULL)
return ((flags & NG_PROGRESS) ? EINPROGRESS : 0);
#ifdef NETGRAPH_DEBUG
_ngi_check(item, __FILE__, __LINE__);
#endif
NGI_GET_NODE(item, node); /* zaps stored node */
item->depth++;
error = ng_apply_item(node, item, rw); /* drops r/w lock when done */
/*
* If the node goes away when we remove the reference,
* whatever we just did caused it.. whatever we do, DO NOT
* access the node again!
*/
if (NG_NODE_UNREF(node) == 0)
return (error);
/* If something is waiting on queue and ready, schedule it. */
if (QUEUE_ACTIVE(ngq)) {
NG_QUEUE_LOCK(ngq);
if (QUEUE_ACTIVE(ngq) && NEXT_QUEUED_ITEM_CAN_PROCEED(ngq))
ng_worklist_add(ngq->q_node);
NG_QUEUE_UNLOCK(ngq);
}
NG_QUEUE_LOCK(ngq);
if (NEXT_QUEUED_ITEM_CAN_PROCEED(ngq))
ng_setisr(ngq->q_node);
NG_QUEUE_UNLOCK(ngq);
/*
* Node may go away as soon as we remove the reference.
* Whatever we do, DO NOT access the node again!
*/
NG_NODE_UNREF(node);
return (error);
@ -3350,17 +3195,16 @@ SYSCTL_PROC(_debug, OID_AUTO, ng_dump_items, CTLTYPE_INT | CTLFLAG_RW,
static void
ngintr(void)
{
item_p item;
node_p node = NULL;
for (;;) {
node_p node;
/* Get node from the worklist. */
NG_WORKLIST_LOCK();
node = TAILQ_FIRST(&ng_worklist);
if (!node) {
NG_WORKLIST_UNLOCK();
break;
}
node->nd_flags &= ~NGF_WORKQ;
TAILQ_REMOVE(&ng_worklist, node, nd_work);
NG_WORKLIST_UNLOCK();
CTR3(KTR_NET, "%20s: node [%x] (%p) taken off worklist",
@ -3375,11 +3219,13 @@ ngintr(void)
* Let the reference go at the last minute.
*/
for (;;) {
item_p item;
int rw;
NG_QUEUE_LOCK(&node->nd_input_queue);
item = ng_dequeue(&node->nd_input_queue, &rw);
if (item == NULL) {
atomic_clear_int(&node->nd_flags, NGF_WORKQ);
NG_QUEUE_UNLOCK(&node->nd_input_queue);
break; /* go look for another node */
} else {
@ -3393,31 +3239,13 @@ ngintr(void)
}
}
static void
ng_worklist_remove(node_p node)
{
mtx_assert(&node->nd_input_queue.q_mtx, MA_OWNED);
NG_WORKLIST_LOCK();
if (node->nd_flags & NGF_WORKQ) {
node->nd_flags &= ~NGF_WORKQ;
TAILQ_REMOVE(&ng_worklist, node, nd_work);
NG_WORKLIST_UNLOCK();
NG_NODE_UNREF(node);
CTR3(KTR_NET, "%20s: node [%x] (%p) removed from worklist",
__func__, node->nd_ID, node);
} else {
NG_WORKLIST_UNLOCK();
}
}
/*
* XXX
* It's posible that a debugging NG_NODE_REF may need
* to be outside the mutex zone
*/
static void
ng_setisr(node_p node)
ng_worklist_add(node_p node)
{
mtx_assert(&node->nd_input_queue.q_mtx, MA_OWNED);
@ -3427,17 +3255,18 @@ ng_setisr(node_p node)
* If we are not already on the work queue,
* then put us on.
*/
node->nd_flags |= NGF_WORKQ;
atomic_set_int(&node->nd_flags, NGF_WORKQ);
NG_NODE_REF(node); /* XXX fafe in mutex? */
NG_WORKLIST_LOCK();
TAILQ_INSERT_TAIL(&ng_worklist, node, nd_work);
NG_WORKLIST_UNLOCK();
NG_NODE_REF(node); /* XXX fafe in mutex? */
schednetisr(NETISR_NETGRAPH);
CTR3(KTR_NET, "%20s: node [%x] (%p) put on worklist", __func__,
node->nd_ID, node);
} else
} else {
CTR3(KTR_NET, "%20s: node [%x] (%p) already on worklist",
__func__, node->nd_ID, node);
schednetisr(NETISR_NETGRAPH);
}
}