Add memory barriers to the node locking operations.

Add some comments.
This commit is contained in:
Alexander Motin 2008-04-09 19:03:19 +00:00
parent c5876e1889
commit 8f9ac44aa7

View File

@ -1806,8 +1806,19 @@ static __inline void ng_queue_rw(struct ng_queue * ngq,
|
Active Writer ---------------------------------------+
Node queue has such semantics:
- All flags modifications are atomic.
- Reader count can be incremented only if there is no writer or pending flags.
As soon as this can't be done with single operation, it is implemented with
spin loop and atomic_cmpset().
- Writer flag can be set only if there is no any bits set.
It is implemented with atomic_cmpset().
- Pending flag can be set any time, but to avoid collision on queue processing
all queue fields are protected by the mutex.
- Queue processing thread reads queue holding the mutex, but releases it while
processing. When queue is empty pending flag is removed.
*/
#define WRITER_ACTIVE 0x00000001
#define OP_PENDING 0x00000002
#define READER_INCREMENT 0x00000004
@ -1849,19 +1860,16 @@ static __inline void ng_queue_rw(struct ng_queue * ngq,
* nothing we could return, either because there really was nothing there, or
* because the node was in a state where it cannot yet process the next item
* on the queue.
*
* This MUST MUST MUST be called with the mutex held.
*/
static __inline item_p
ng_dequeue(struct ng_queue *ngq, int *rw)
{
item_p item;
/* This MUST be called with the mutex held. */
mtx_assert(&ngq->q_mtx, MA_OWNED);
/*
* If there is nothing queued, then just return.
* No point in continuing.
*/
/* If there is nothing queued, then just return. */
if (!QUEUE_ACTIVE(ngq)) {
CTR4(KTR_NET, "%20s: node [%x] (%p) queue empty; "
"queue flags 0x%lx", __func__,
@ -1878,22 +1886,25 @@ ng_dequeue(struct ng_queue *ngq, int *rw)
while (1) {
long t = ngq->q_flags;
if (t & WRITER_ACTIVE) {
/* It's a reader but we can't use it. */
/* There is writer, reader can't proceed. */
CTR4(KTR_NET, "%20s: node [%x] (%p) queued reader "
"can't proceed; queue flags 0x%lx", __func__,
ngq->q_node->nd_ID, ngq->q_node, t);
return (NULL);
}
if (atomic_cmpset_long(&ngq->q_flags, t,
if (atomic_cmpset_acq_long(&ngq->q_flags, t,
t + READER_INCREMENT))
break;
cpu_spinwait();
}
/* We have got reader lock for the node. */
*rw = NGQRW_R;
} else if (atomic_cmpset_long(&ngq->q_flags, OP_PENDING,
} else if (atomic_cmpset_acq_long(&ngq->q_flags, OP_PENDING,
OP_PENDING + WRITER_ACTIVE)) {
/* We have got writer lock for the node. */
*rw = NGQRW_W;
} else {
/* There is somebody other, writer can't proceed. */
CTR4(KTR_NET, "%20s: node [%x] (%p) queued writer "
"can't proceed; queue flags 0x%lx", __func__,
ngq->q_node->nd_ID, ngq->q_node, ngq->q_flags);
@ -1918,10 +1929,8 @@ ng_dequeue(struct ng_queue *ngq, int *rw)
}
/*
* Queue a packet to be picked up by someone else.
* We really don't care who, but we can't or don't want to hang around
* to process it ourselves. We are probably an interrupt routine..
* If the queue could be run, flag the netisr handler to start.
* Queue a packet to be picked up later by someone else.
* If the queue could be run now, add node to the queue handler's worklist.
*/
static __inline void
ng_queue_rw(struct ng_queue * ngq, item_p item, int rw)
@ -1950,6 +1959,7 @@ ng_queue_rw(struct ng_queue * ngq, item_p item, int rw)
NG_QUEUE_UNLOCK(ngq);
}
/* Acquire reader lock on node. If node is busy, queue the packet. */
static __inline item_p
ng_acquire_read(struct ng_queue *ngq, item_p item)
{
@ -1961,7 +1971,7 @@ ng_acquire_read(struct ng_queue *ngq, item_p item)
long t = ngq->q_flags;
if (t & NGQ_RMASK)
break; /* Node is not ready for reader. */
if (atomic_cmpset_long(&ngq->q_flags, t, t + READER_INCREMENT)) {
if (atomic_cmpset_acq_long(&ngq->q_flags, t, t + READER_INCREMENT)) {
/* Successfully grabbed node */
CTR4(KTR_NET, "%20s: node [%x] (%p) acquired item %p",
__func__, ngq->q_node->nd_ID, ngq->q_node, item);
@ -1976,6 +1986,7 @@ ng_acquire_read(struct ng_queue *ngq, item_p item)
return (NULL);
}
/* Acquire writer lock on node. If node is busy, queue the packet. */
static __inline item_p
ng_acquire_write(struct ng_queue *ngq, item_p item)
{
@ -1983,7 +1994,7 @@ ng_acquire_write(struct ng_queue *ngq, item_p item)
("%s: working on deadnode", __func__));
/* Writer needs completely idle node. */
if (atomic_cmpset_long(&ngq->q_flags, 0, WRITER_ACTIVE)) {
if (atomic_cmpset_acq_long(&ngq->q_flags, 0, WRITER_ACTIVE)) {
/* Successfully grabbed node */
CTR4(KTR_NET, "%20s: node [%x] (%p) acquired item %p",
__func__, ngq->q_node->nd_ID, ngq->q_node, item);
@ -2070,18 +2081,21 @@ ng_upgrade_write(struct ng_queue *ngq, item_p item)
#endif
/* Release reader lock. */
static __inline void
ng_leave_read(struct ng_queue *ngq)
{
atomic_subtract_long(&ngq->q_flags, READER_INCREMENT);
atomic_subtract_rel_long(&ngq->q_flags, READER_INCREMENT);
}
/* Release writer lock. */
static __inline void
ng_leave_write(struct ng_queue *ngq)
{
atomic_clear_long(&ngq->q_flags, WRITER_ACTIVE);
atomic_clear_rel_long(&ngq->q_flags, WRITER_ACTIVE);
}
/* Purge node queue. Called on node shutdown. */
static void
ng_flush_input_queue(struct ng_queue * ngq)
{