Several changes breaking netgraph module ABI collected together:

- reorder structures fields (XX_refs) a bit to group fields modified
   same time together. According to my tests it gives up to 10%
   SMP performance benefit on real workload due to reduced inter-CPU
   cache trashing.
 - change q_flags from long to int as long is not really needed there and
   it's usage with atomics is argued by some people.
 - move NGF_WORKQ flag into the separate field q_flags2 as it protected by
   queue mutex instead of node writer protection used by the rest of flags.
 - move nd_work queue entry to ng_queue structure to which it is more
   related and make it STAILQ instead of TAILQ as now it is a classic FIFO.
 - remove q_node pointer from ng_queue structure as it is not really needed.
 - reimplement item queue using STAILQ instead of own equal implementation.
   As soon as BT subsystem has own item queues using ng_item.el_next update
   it also.
 - change depth field in ng_item from uintptr_t to u_int. It was made
   uintptr_t to keep ABI compatibility.

Reviewed by:	julian, emax
Tested with:	Netperf cluster
This commit is contained in:
Alexander Motin 2008-04-15 21:15:32 +00:00
parent 8cd892f752
commit 9852972bb5
Notes: svn2git 2020-12-20 02:59:44 +00:00
svn path=/head/; revision=178228
3 changed files with 115 additions and 139 deletions

View File

@ -34,6 +34,8 @@
#ifndef _NETGRAPH_BLUETOOTH_H_
#define _NETGRAPH_BLUETOOTH_H_
#include <sys/queue.h>
/*
* Version of the stack
*/
@ -147,8 +149,7 @@ typedef struct ng_bt_mbufq * ng_bt_mbufq_p;
struct ng_item;
struct ng_bt_itemq {
struct ng_item *head; /* first item in the queue */
struct ng_item *tail; /* last item in the queue */
STAILQ_HEAD(, ng_item) queue; /* actually items queue */
u_int32_t len; /* number of items in the queue */
u_int32_t maxlen; /* maximal number of items in the queue */
u_int32_t drops; /* number if dropped items */
@ -156,14 +157,20 @@ struct ng_bt_itemq {
typedef struct ng_bt_itemq ng_bt_itemq_t;
typedef struct ng_bt_itemq * ng_bt_itemq_p;
#define NG_BT_ITEMQ_INIT(q, _maxlen) NG_BT_MBUFQ_INIT((q), (_maxlen))
#define NG_BT_ITEMQ_INIT(q, _maxlen) \
do { \
STAILQ_INIT(&(q)->queue); \
(q)->len = 0; \
(q)->maxlen = (_maxlen); \
(q)->drops = 0; \
} while (0)
#define NG_BT_ITEMQ_DESTROY(q) \
do { \
NG_BT_ITEMQ_DRAIN((q)); \
} while (0)
#define NG_BT_ITEMQ_FIRST(q) NG_BT_MBUFQ_FIRST((q))
#define NG_BT_ITEMQ_FIRST(q) STAILQ_FIRST(&(q)->queue)
#define NG_BT_ITEMQ_LEN(q) NG_BT_MBUFQ_LEN((q))
@ -173,37 +180,22 @@ typedef struct ng_bt_itemq * ng_bt_itemq_p;
#define NG_BT_ITEMQ_ENQUEUE(q, i) \
do { \
(i)->el_next = NULL; \
\
if ((q)->tail == NULL) \
(q)->head = (i); \
else \
(q)->tail->el_next = (i); \
\
(q)->tail = (i); \
STAILQ_INSERT_TAIL(&(q)->queue, (i), el_next); \
(q)->len ++; \
} while (0)
#define NG_BT_ITEMQ_DEQUEUE(q, i) \
do { \
(i) = (q)->head; \
(i) = STAILQ_FIRST(&(q)->queue); \
if ((i) != NULL) { \
(q)->head = (q)->head->el_next; \
if ((q)->head == NULL) \
(q)->tail = NULL; \
\
STAILQ_REMOVE_HEAD(&(q)->queue, el_next); \
(q)->len --; \
(i)->el_next = NULL; \
} \
} while (0)
#define NG_BT_ITEMQ_PREPEND(q, i) \
do { \
(i)->el_next = (q)->head; \
if ((q)->tail == NULL) \
(q)->tail = (i); \
\
(q)->head = (i); \
STAILQ_INSERT_HEAD(&(q)->queue, (i), el_next); \
(q)->len ++; \
} while (0)

View File

@ -66,7 +66,7 @@
* Change it for NETGRAPH_DEBUG version so we cannot mix debug and non debug
* modules.
*/
#define _NG_ABI_VERSION 11
#define _NG_ABI_VERSION 12
#ifdef NETGRAPH_DEBUG /*----------------------------------------------*/
#define NG_ABI_VERSION (_NG_ABI_VERSION + 0x10000)
#else /* NETGRAPH_DEBUG */ /*----------------------------------------------*/
@ -107,13 +107,13 @@ struct ng_hook {
char hk_name[NG_HOOKSIZ]; /* what this node knows this link as */
void *hk_private; /* node dependant ID for this hook */
int hk_flags; /* info about this hook/link */
int hk_refs; /* dont actually free this till 0 */
int hk_type; /* tbd: hook data link type */
struct ng_hook *hk_peer; /* the other end of this link */
struct ng_node *hk_node; /* The node this hook is attached to */
LIST_ENTRY(ng_hook) hk_hooks; /* linked list of all hooks on node */
ng_rcvmsg_t *hk_rcvmsg; /* control messages come here */
ng_rcvdata_t *hk_rcvdata; /* data comes here */
int hk_refs; /* dont actually free this till 0 */
#ifdef NETGRAPH_DEBUG /*----------------------------------------------*/
#define HK_MAGIC 0x78573011
int hk_magic;
@ -331,26 +331,25 @@ _ng_hook_hi_stack(hook_p hook, char * file, int line)
* embedded in the node structure
*/
struct ng_queue {
u_long q_flags;
u_int q_flags; /* Current r/w/q lock flags */
u_int q_flags2; /* Other queue flags */
struct mtx q_mtx;
item_p queue;
item_p *last;
struct ng_node *q_node; /* find the front of the node.. */
STAILQ_ENTRY(ng_node) q_work; /* nodes with work to do */
STAILQ_HEAD(, ng_item) queue; /* actually items queue */
};
struct ng_node {
char nd_name[NG_NODESIZ]; /* optional globally unique name */
struct ng_type *nd_type; /* the installed 'type' */
int nd_flags; /* see below for bit definitions */
int nd_refs; /* # of references to this node */
int nd_numhooks; /* number of hooks */
void *nd_private; /* node type dependant node ID */
ng_ID_t nd_ID; /* Unique per node */
LIST_HEAD(hooks, ng_hook) nd_hooks; /* linked list of node hooks */
LIST_ENTRY(ng_node) nd_nodes; /* linked list of all nodes */
LIST_ENTRY(ng_node) nd_idnodes; /* ID hash collision list */
TAILQ_ENTRY(ng_node) nd_work; /* nodes with work to do */
struct ng_queue nd_input_queue; /* input queue for locking */
int nd_refs; /* # of references to this node */
#ifdef NETGRAPH_DEBUG /*----------------------------------------------*/
#define ND_MAGIC 0x59264837
int nd_magic;
@ -363,8 +362,6 @@ struct ng_node {
/* Flags for a node */
#define NGF_INVALID 0x00000001 /* free when refs go to 0 */
#define NG_INVALID NGF_INVALID /* compat for old code */
#define NGF_WORKQ 0x00000002 /* node is on the work queue */
#define NG_WORKQ NGF_WORKQ /* compat for old code */
#define NGF_FORCE_WRITER 0x00000004 /* Never multithread this node */
#define NG_FORCE_WRITER NGF_FORCE_WRITER /* compat for old code */
#define NGF_CLOSING 0x00000008 /* ng_rmnode() at work */
@ -612,7 +609,7 @@ struct ng_apply_info {
};
struct ng_item {
u_long el_flags;
item_p el_next;
STAILQ_ENTRY(ng_item) el_next;
node_p el_dest; /* The node it will be applied against (or NULL) */
hook_p el_hook; /* Entering hook. Optional in Control messages */
union {
@ -635,7 +632,7 @@ struct ng_item {
* and its context.
*/
struct ng_apply_info *apply;
uintptr_t depth;
u_int depth;
#ifdef NETGRAPH_DEBUG /*----------------------------------------------*/
char *lastfile;
int lastline;

View File

@ -113,20 +113,19 @@ struct ng_node ng_deadnode = {
"dead",
&ng_deadtype,
NGF_INVALID,
1, /* refs */
0, /* numhooks */
NULL, /* private */
0, /* ID */
LIST_HEAD_INITIALIZER(ng_deadnode.hooks),
{}, /* all_nodes list entry */
{}, /* id hashtable list entry */
{}, /* workqueue entry */
{ 0,
0,
{}, /* should never use! (should hang) */
NULL,
&ng_deadnode.nd_input_queue.queue,
&ng_deadnode
{}, /* workqueue entry */
STAILQ_HEAD_INITIALIZER(ng_deadnode.nd_input_queue.queue),
},
1, /* refs */
#ifdef NETGRAPH_DEBUG
ND_MAGIC,
__FILE__,
@ -139,13 +138,13 @@ struct ng_hook ng_deadhook = {
"dead",
NULL, /* private */
HK_INVALID | HK_DEAD,
1, /* refs always >= 1 */
0, /* undefined data link type */
&ng_deadhook, /* Peer is self */
&ng_deadnode, /* attached to deadnode */
{}, /* hooks list */
NULL, /* override rcvmsg() */
NULL, /* override rcvdata() */
1, /* refs always >= 1 */
#ifdef NETGRAPH_DEBUG
HK_MAGIC,
__FILE__,
@ -158,7 +157,7 @@ struct ng_hook ng_deadhook = {
* END DEAD STRUCTURES
*/
/* List nodes with unallocated work */
static TAILQ_HEAD(, ng_node) ng_worklist = TAILQ_HEAD_INITIALIZER(ng_worklist);
static STAILQ_HEAD(, ng_node) ng_worklist = STAILQ_HEAD_INITIALIZER(ng_worklist);
static struct mtx ng_worklist_mtx; /* MUST LOCK NODE FIRST */
/* List of installed types */
@ -205,7 +204,7 @@ static int ngb_mod_event(module_t mod, int event, void *data);
static void ng_worklist_add(node_p node);
static void ngintr(void);
static int ng_apply_item(node_p node, item_p item, int rw);
static void ng_flush_input_queue(struct ng_queue * ngq);
static void ng_flush_input_queue(node_p node);
static node_p ng_ID2noderef(ng_ID_t ID);
static int ng_con_nodes(item_p item, node_p node, const char *name,
node_p node2, const char *name2);
@ -631,10 +630,8 @@ ng_make_node_common(struct ng_type *type, node_p *nodepp)
type->refs++;
NG_QUEUE_LOCK_INIT(&node->nd_input_queue);
node->nd_input_queue.queue = NULL;
node->nd_input_queue.last = &node->nd_input_queue.queue;
STAILQ_INIT(&node->nd_input_queue.queue);
node->nd_input_queue.q_flags = 0;
node->nd_input_queue.q_node = node;
/* Initialize hook list for new node */
LIST_INIT(&node->nd_hooks);
@ -723,7 +720,7 @@ ng_rmnode(node_p node, hook_p dummy1, void *dummy2, int dummy3)
* Just before the queue was closed, so it should be empty anyway.
* Also removes us from worklist if needed.
*/
ng_flush_input_queue(&node->nd_input_queue);
ng_flush_input_queue(node);
/* Ask the type if it has anything to do in this case */
if (node->nd_type && node->nd_type->shutdown) {
@ -1772,15 +1769,12 @@ ng_path2noderef(node_p here, const char *address,
* read-write queue locking inline functions *
\***************************************************************/
static __inline item_p ng_dequeue(struct ng_queue * ngq, int *rw);
static __inline item_p ng_acquire_read(struct ng_queue * ngq,
item_p item);
static __inline item_p ng_acquire_write(struct ng_queue * ngq,
item_p item);
static __inline void ng_leave_read(struct ng_queue * ngq);
static __inline void ng_leave_write(struct ng_queue * ngq);
static __inline void ng_queue_rw(struct ng_queue * ngq,
item_p item, int rw);
static __inline void ng_queue_rw(node_p node, item_p item, int rw);
static __inline item_p ng_dequeue(node_p node, int *rw);
static __inline item_p ng_acquire_read(node_p node, item_p item);
static __inline item_p ng_acquire_write(node_p node, item_p item);
static __inline void ng_leave_read(node_p node);
static __inline void ng_leave_write(node_p node);
/*
* Definition of the bits fields in the ng_queue flag word.
@ -1836,8 +1830,8 @@ Node queue has such semantics:
#define QUEUE_ACTIVE(QP) ((QP)->q_flags & OP_PENDING)
/* How to decide what the next queued item is. */
#define HEAD_IS_READER(QP) NGI_QUEUED_READER((QP)->queue)
#define HEAD_IS_WRITER(QP) NGI_QUEUED_WRITER((QP)->queue) /* notused */
#define HEAD_IS_READER(QP) NGI_QUEUED_READER(STAILQ_FIRST(&(QP)->queue))
#define HEAD_IS_WRITER(QP) NGI_QUEUED_WRITER(STAILQ_FIRST(&(QP)->queue)) /* notused */
/* Read the status to decide if the next item on the queue can now run. */
#define QUEUED_READER_CAN_PROCEED(QP) \
@ -1850,10 +1844,11 @@ Node queue has such semantics:
((HEAD_IS_READER(QP)) ? QUEUED_READER_CAN_PROCEED(QP) : \
QUEUED_WRITER_CAN_PROCEED(QP))
#define NGQRW_R 0
#define NGQRW_W 1
#define NGQ2_WORKQ 0x00000001
/*
* Taking into account the current state of the queue and node, possibly take
* the next entry off the queue and return it. Return NULL if there was
@ -1862,9 +1857,10 @@ Node queue has such semantics:
* on the queue.
*/
static __inline item_p
ng_dequeue(struct ng_queue *ngq, int *rw)
ng_dequeue(node_p node, int *rw)
{
item_p item;
struct ng_queue *ngq = &node->nd_input_queue;
/* This MUST be called with the mutex held. */
mtx_assert(&ngq->q_mtx, MA_OWNED);
@ -1873,7 +1869,7 @@ ng_dequeue(struct ng_queue *ngq, int *rw)
if (!QUEUE_ACTIVE(ngq)) {
CTR4(KTR_NET, "%20s: node [%x] (%p) queue empty; "
"queue flags 0x%lx", __func__,
ngq->q_node->nd_ID, ngq->q_node, ngq->q_flags);
node->nd_ID, node, ngq->q_flags);
return (NULL);
}
@ -1889,17 +1885,17 @@ ng_dequeue(struct ng_queue *ngq, int *rw)
/* There is writer, reader can't proceed. */
CTR4(KTR_NET, "%20s: node [%x] (%p) queued reader "
"can't proceed; queue flags 0x%lx", __func__,
ngq->q_node->nd_ID, ngq->q_node, t);
node->nd_ID, node, t);
return (NULL);
}
if (atomic_cmpset_acq_long(&ngq->q_flags, t,
if (atomic_cmpset_acq_int(&ngq->q_flags, t,
t + READER_INCREMENT))
break;
cpu_spinwait();
}
/* We have got reader lock for the node. */
*rw = NGQRW_R;
} else if (atomic_cmpset_acq_long(&ngq->q_flags, OP_PENDING,
} else if (atomic_cmpset_acq_int(&ngq->q_flags, OP_PENDING,
OP_PENDING + WRITER_ACTIVE)) {
/* We have got writer lock for the node. */
*rw = NGQRW_W;
@ -1907,7 +1903,7 @@ ng_dequeue(struct ng_queue *ngq, int *rw)
/* There is somebody other, writer can't proceed. */
CTR4(KTR_NET, "%20s: node [%x] (%p) queued writer "
"can't proceed; queue flags 0x%lx", __func__,
ngq->q_node->nd_ID, ngq->q_node, ngq->q_flags);
node->nd_ID, node, ngq->q_flags);
return (NULL);
}
@ -1915,15 +1911,13 @@ ng_dequeue(struct ng_queue *ngq, int *rw)
* Now we dequeue the request (whatever it may be) and correct the
* pending flags and the next and last pointers.
*/
item = ngq->queue;
ngq->queue = item->el_next;
if (ngq->last == &(item->el_next)) {
ngq->last = &(ngq->queue);
atomic_clear_long(&ngq->q_flags, OP_PENDING);
}
item = STAILQ_FIRST(&ngq->queue);
STAILQ_REMOVE_HEAD(&ngq->queue, el_next);
if (STAILQ_EMPTY(&ngq->queue))
atomic_clear_int(&ngq->q_flags, OP_PENDING);
CTR6(KTR_NET, "%20s: node [%x] (%p) returning item %p as %s; "
"queue flags 0x%lx", __func__,
ngq->q_node->nd_ID, ngq->q_node, item, *rw ? "WRITER" : "READER" ,
node->nd_ID, node, item, *rw ? "WRITER" : "READER" ,
ngq->q_flags);
return (item);
}
@ -1933,90 +1927,92 @@ ng_dequeue(struct ng_queue *ngq, int *rw)
* If the queue could be run now, add node to the queue handler's worklist.
*/
static __inline void
ng_queue_rw(struct ng_queue * ngq, item_p item, int rw)
ng_queue_rw(node_p node, item_p item, int rw)
{
struct ng_queue *ngq = &node->nd_input_queue;
if (rw == NGQRW_W)
NGI_SET_WRITER(item);
else
NGI_SET_READER(item);
item->el_next = NULL; /* maybe not needed */
NG_QUEUE_LOCK(ngq);
/* Set OP_PENDING flag and enqueue the item. */
atomic_set_long(&ngq->q_flags, OP_PENDING);
*ngq->last = item;
ngq->last = &(item->el_next);
atomic_set_int(&ngq->q_flags, OP_PENDING);
STAILQ_INSERT_TAIL(&ngq->queue, item, el_next);
CTR5(KTR_NET, "%20s: node [%x] (%p) queued item %p as %s", __func__,
ngq->q_node->nd_ID, ngq->q_node, item, rw ? "WRITER" : "READER" );
node->nd_ID, node, item, rw ? "WRITER" : "READER" );
/*
* We can take the worklist lock with the node locked
* BUT NOT THE REVERSE!
*/
if (NEXT_QUEUED_ITEM_CAN_PROCEED(ngq))
ng_worklist_add(ngq->q_node);
ng_worklist_add(node);
NG_QUEUE_UNLOCK(ngq);
}
/* Acquire reader lock on node. If node is busy, queue the packet. */
static __inline item_p
ng_acquire_read(struct ng_queue *ngq, item_p item)
ng_acquire_read(node_p node, item_p item)
{
KASSERT(ngq != &ng_deadnode.nd_input_queue,
KASSERT(node != &ng_deadnode,
("%s: working on deadnode", __func__));
/* Reader needs node without writer and pending items. */
while (1) {
long t = ngq->q_flags;
long t = node->nd_input_queue.q_flags;
if (t & NGQ_RMASK)
break; /* Node is not ready for reader. */
if (atomic_cmpset_acq_long(&ngq->q_flags, t, t + READER_INCREMENT)) {
if (atomic_cmpset_acq_int(&node->nd_input_queue.q_flags,
t, t + READER_INCREMENT)) {
/* Successfully grabbed node */
CTR4(KTR_NET, "%20s: node [%x] (%p) acquired item %p",
__func__, ngq->q_node->nd_ID, ngq->q_node, item);
__func__, node->nd_ID, node, item);
return (item);
}
cpu_spinwait();
};
/* Queue the request for later. */
ng_queue_rw(ngq, item, NGQRW_R);
ng_queue_rw(node, item, NGQRW_R);
return (NULL);
}
/* Acquire writer lock on node. If node is busy, queue the packet. */
static __inline item_p
ng_acquire_write(struct ng_queue *ngq, item_p item)
ng_acquire_write(node_p node, item_p item)
{
KASSERT(ngq != &ng_deadnode.nd_input_queue,
KASSERT(node != &ng_deadnode,
("%s: working on deadnode", __func__));
/* Writer needs completely idle node. */
if (atomic_cmpset_acq_long(&ngq->q_flags, 0, WRITER_ACTIVE)) {
if (atomic_cmpset_acq_int(&node->nd_input_queue.q_flags,
0, WRITER_ACTIVE)) {
/* Successfully grabbed node */
CTR4(KTR_NET, "%20s: node [%x] (%p) acquired item %p",
__func__, ngq->q_node->nd_ID, ngq->q_node, item);
__func__, node->nd_ID, node, item);
return (item);
}
/* Queue the request for later. */
ng_queue_rw(ngq, item, NGQRW_W);
ng_queue_rw(node, item, NGQRW_W);
return (NULL);
}
#if 0
static __inline item_p
ng_upgrade_write(struct ng_queue *ngq, item_p item)
ng_upgrade_write(node_p node, item_p item)
{
KASSERT(ngq != &ng_deadnode.nd_input_queue,
struct ng_queue *ngq = &node->nd_input_queue;
KASSERT(node != &ng_deadnode,
("%s: working on deadnode", __func__));
NGI_SET_WRITER(item);
mtx_lock_spin(&(ngq->q_mtx));
NG_QUEUE_LOCK(ngq);
/*
* There will never be no readers as we are there ourselves.
@ -2028,9 +2024,9 @@ ng_upgrade_write(struct ng_queue *ngq, item_p item)
* if there are no readers. We don't really care if there are queued
* items as we will bypass them anyhow.
*/
atomic_add_long(&ngq->q_flags, WRITER_ACTIVE - READER_INCREMENT);
if (ngq->q_flags & (NGQ_WMASK & ~OP_PENDING) == WRITER_ACTIVE) {
mtx_unlock_spin(&(ngq->q_mtx));
atomic_add_int(&ngq->q_flags, WRITER_ACTIVE - READER_INCREMENT);
if ((ngq->q_flags & (NGQ_WMASK & ~OP_PENDING)) == WRITER_ACTIVE) {
NG_QUEUE_UNLOCK(ngq);
/* It's just us, act on the item. */
/* will NOT drop writer lock when done */
@ -2040,7 +2036,7 @@ ng_upgrade_write(struct ng_queue *ngq, item_p item)
* Having acted on the item, atomically
* down grade back to READER and finish up
*/
atomic_add_long(&ngq->q_flags,
atomic_add_int(&ngq->q_flags,
READER_INCREMENT - WRITER_ACTIVE);
/* Our caller will call ng_leave_read() */
@ -2053,62 +2049,53 @@ ng_upgrade_write(struct ng_queue *ngq, item_p item)
* through it once. If there is nothing else waiting,
* set the correct flags.
*/
if ((item->el_next = ngq->queue) == NULL) {
/*
* Set up the "last" pointer.
* We are the only (and thus last) item
*/
ngq->last = &(item->el_next);
if (STAILQ_EMPTY(&ngq->queue)) {
/* We've gone from, 0 to 1 item in the queue */
atomic_set_long(&ngq->q_flags, OP_PENDING);
atomic_set_int(&ngq->q_flags, OP_PENDING);
CTR3(KTR_NET, "%20s: node [%x] (%p) set OP_PENDING", __func__,
ngq->q_node->nd_ID, ngq->q_node);
node->nd_ID, node);
};
ngq->queue = item;
CTR5(KTR_NET, "%20s: node [%x] (%p) requeued item %p as WRITER",
__func__, ngq->q_node->nd_ID, ngq->q_node, item );
STAILQ_INSERT_HEAD(&ngq->queue, item, el_next);
CTR4(KTR_NET, "%20s: node [%x] (%p) requeued item %p as WRITER",
__func__, node->nd_ID, node, item );
/* Reverse what we did above. That downgrades us back to reader */
atomic_add_long(&ngq->q_flags, READER_INCREMENT - WRITER_ACTIVE);
atomic_add_int(&ngq->q_flags, READER_INCREMENT - WRITER_ACTIVE);
if (QUEUE_ACTIVE(ngq) && NEXT_QUEUED_ITEM_CAN_PROCEED(ngq))
ng_worklist_add(ngq->q_node);
mtx_unlock_spin(&(ngq->q_mtx));
ng_worklist_add(node);
NG_QUEUE_UNLOCK(ngq);
return;
}
#endif
/* Release reader lock. */
static __inline void
ng_leave_read(struct ng_queue *ngq)
ng_leave_read(node_p node)
{
atomic_subtract_rel_long(&ngq->q_flags, READER_INCREMENT);
atomic_subtract_rel_int(&node->nd_input_queue.q_flags, READER_INCREMENT);
}
/* Release writer lock. */
static __inline void
ng_leave_write(struct ng_queue *ngq)
ng_leave_write(node_p node)
{
atomic_clear_rel_long(&ngq->q_flags, WRITER_ACTIVE);
atomic_clear_rel_int(&node->nd_input_queue.q_flags, WRITER_ACTIVE);
}
/* Purge node queue. Called on node shutdown. */
static void
ng_flush_input_queue(struct ng_queue * ngq)
ng_flush_input_queue(node_p node)
{
struct ng_queue *ngq = &node->nd_input_queue;
item_p item;
NG_QUEUE_LOCK(ngq);
while (ngq->queue) {
item = ngq->queue;
ngq->queue = item->el_next;
if (ngq->last == &(item->el_next)) {
ngq->last = &(ngq->queue);
atomic_clear_long(&ngq->q_flags, OP_PENDING);
}
while ((item = STAILQ_FIRST(&ngq->queue)) != NULL) {
STAILQ_REMOVE_HEAD(&ngq->queue, el_next);
if (STAILQ_EMPTY(&ngq->queue))
atomic_clear_int(&ngq->q_flags, OP_PENDING);
NG_QUEUE_UNLOCK(ngq);
/* If the item is supplying a callback, call it with an error */
@ -2222,11 +2209,10 @@ ng_snd_item(item_p item, int flags)
#endif
}
ngq = &node->nd_input_queue;
if (queue) {
item->depth = 1;
/* Put it on the queue for that node*/
ng_queue_rw(ngq, item, rw);
ng_queue_rw(node, item, rw);
return ((flags & NG_PROGRESS) ? EINPROGRESS : 0);
}
@ -2235,9 +2221,9 @@ ng_snd_item(item_p item, int flags)
* Try get the appropriate operating permission.
*/
if (rw == NGQRW_R)
item = ng_acquire_read(ngq, item);
item = ng_acquire_read(node, item);
else
item = ng_acquire_write(ngq, item);
item = ng_acquire_write(node, item);
/* Item was queued while trying to get permission. */
if (item == NULL)
@ -2249,10 +2235,11 @@ ng_snd_item(item_p item, int flags)
error = ng_apply_item(node, item, rw); /* drops r/w lock when done */
/* If something is waiting on queue and ready, schedule it. */
ngq = &node->nd_input_queue;
if (QUEUE_ACTIVE(ngq)) {
NG_QUEUE_LOCK(ngq);
if (QUEUE_ACTIVE(ngq) && NEXT_QUEUED_ITEM_CAN_PROCEED(ngq))
ng_worklist_add(ngq->q_node);
ng_worklist_add(node);
NG_QUEUE_UNLOCK(ngq);
}
@ -2407,9 +2394,9 @@ ng_apply_item(node_p node, item_p item, int rw)
NG_HOOK_UNREF(hook);
if (rw == NGQRW_R)
ng_leave_read(&node->nd_input_queue);
ng_leave_read(node);
else
ng_leave_write(&node->nd_input_queue);
ng_leave_write(node);
/* Apply callback. */
if (apply != NULL) {
@ -3214,12 +3201,12 @@ ngintr(void)
/* Get node from the worklist. */
NG_WORKLIST_LOCK();
node = TAILQ_FIRST(&ng_worklist);
node = STAILQ_FIRST(&ng_worklist);
if (!node) {
NG_WORKLIST_UNLOCK();
break;
}
TAILQ_REMOVE(&ng_worklist, node, nd_work);
STAILQ_REMOVE_HEAD(&ng_worklist, nd_input_queue.q_work);
NG_WORKLIST_UNLOCK();
CTR3(KTR_NET, "%20s: node [%x] (%p) taken off worklist",
__func__, node->nd_ID, node);
@ -3237,9 +3224,9 @@ ngintr(void)
int rw;
NG_QUEUE_LOCK(&node->nd_input_queue);
item = ng_dequeue(&node->nd_input_queue, &rw);
item = ng_dequeue(node, &rw);
if (item == NULL) {
atomic_clear_int(&node->nd_flags, NGF_WORKQ);
node->nd_input_queue.q_flags2 &= ~NGQ2_WORKQ;
NG_QUEUE_UNLOCK(&node->nd_input_queue);
break; /* go look for another node */
} else {
@ -3264,15 +3251,15 @@ ng_worklist_add(node_p node)
mtx_assert(&node->nd_input_queue.q_mtx, MA_OWNED);
if ((node->nd_flags & NGF_WORKQ) == 0) {
if ((node->nd_input_queue.q_flags2 & NGQ2_WORKQ) == 0) {
/*
* If we are not already on the work queue,
* then put us on.
*/
atomic_set_int(&node->nd_flags, NGF_WORKQ);
node->nd_input_queue.q_flags2 |= NGQ2_WORKQ;
NG_NODE_REF(node); /* XXX fafe in mutex? */
NG_WORKLIST_LOCK();
TAILQ_INSERT_TAIL(&ng_worklist, node, nd_work);
STAILQ_INSERT_TAIL(&ng_worklist, node, nd_input_queue.q_work);
NG_WORKLIST_UNLOCK();
schednetisr(NETISR_NETGRAPH);
CTR3(KTR_NET, "%20s: node [%x] (%p) put on worklist", __func__,