Simplify ng_pipe locking model by relying on the netgraph framework
to provide serialization of calls into the node, which is accomplished by marking the node as single-threaded (NGF_FORCE_WRITER). The price we pay is that each ng_pipe instance now has its own callout handler which polls for queued frames on each clock tick, as long as the pipe has any frames in its internal queues. OTOH, we got rid of the global ng_pipe mutex, so from now on multiple ng_pipe instances can operate in parallel. This change also fixes counting of forwarded frames when an ng_pipe node is not enforcing any packet impairments. While here, attempt to improve adherence to style(9) throughout otherwise mostly unreadable code. MFC after: 3 days
This commit is contained in:
parent
24b7ca3412
commit
57ce8ebf8c
@ -1,5 +1,5 @@
|
||||
/*-
|
||||
* Copyright (c) 2004-2008 University of Zagreb
|
||||
* Copyright (c) 2004-2010 University of Zagreb
|
||||
* Copyright (c) 2007-2008 FreeBSD Foundation
|
||||
*
|
||||
* This software was developed by the University of Zagreb and the
|
||||
@ -62,8 +62,6 @@
|
||||
|
||||
static MALLOC_DEFINE(M_NG_PIPE, "ng_pipe", "ng_pipe");
|
||||
|
||||
struct mtx ng_pipe_giant;
|
||||
|
||||
/* Packet header struct */
|
||||
struct ngp_hdr {
|
||||
TAILQ_ENTRY(ngp_hdr) ngp_link; /* next pkt in queue */
|
||||
@ -88,7 +86,6 @@ struct hookinfo {
|
||||
int noqueue; /* bypass any processing */
|
||||
TAILQ_HEAD(, ngp_fifo) fifo_head; /* FIFO queues */
|
||||
TAILQ_HEAD(, ngp_hdr) qout_head; /* delay queue head */
|
||||
LIST_ENTRY(hookinfo) active_le; /* active hooks */
|
||||
struct timeval qin_utime;
|
||||
struct ng_pipe_hookcfg cfg;
|
||||
struct ng_pipe_hookrun run;
|
||||
@ -103,6 +100,8 @@ struct node_priv {
|
||||
u_int32_t header_offset;
|
||||
struct hookinfo lower;
|
||||
struct hookinfo upper;
|
||||
struct callout timer;
|
||||
int timer_scheduled;
|
||||
};
|
||||
typedef struct node_priv *priv_p;
|
||||
|
||||
@ -131,17 +130,9 @@ typedef struct node_priv *priv_p;
|
||||
static void parse_cfg(struct ng_pipe_hookcfg *, struct ng_pipe_hookcfg *,
|
||||
struct hookinfo *, priv_p);
|
||||
static void pipe_dequeue(struct hookinfo *, struct timeval *);
|
||||
static void pipe_scheduler(void *);
|
||||
static void pipe_poll(void);
|
||||
static void ngp_callout(node_p, hook_p, void *, int);
|
||||
static int ngp_modevent(module_t, int, void *);
|
||||
|
||||
/* linked list of active "pipe" hooks */
|
||||
static LIST_HEAD(, hookinfo) active_head;
|
||||
static int active_gen_id = 0;
|
||||
|
||||
/* timeout handle for pipe_scheduler */
|
||||
static struct callout polling_timer;
|
||||
|
||||
/* zone for storing ngp_hdr-s */
|
||||
static uma_zone_t ngp_zone;
|
||||
|
||||
@ -267,6 +258,11 @@ ngp_constructor(node_p node)
|
||||
return (ENOMEM);
|
||||
NG_NODE_SET_PRIVATE(node, priv);
|
||||
|
||||
/* Mark node as single-threaded */
|
||||
NG_NODE_FORCE_WRITER(node);
|
||||
|
||||
ng_callout_init(&priv->timer);
|
||||
|
||||
return (0);
|
||||
}
|
||||
|
||||
@ -310,8 +306,6 @@ ngp_rcvmsg(node_p node, item_p item, hook_p lasthook)
|
||||
struct ng_pipe_cfg *cfg;
|
||||
int error = 0;
|
||||
|
||||
mtx_lock(&ng_pipe_giant);
|
||||
|
||||
NGI_GET_MSG(item, msg);
|
||||
switch (msg->header.typecookie) {
|
||||
case NGM_PIPE_COOKIE:
|
||||
@ -326,7 +320,7 @@ ngp_rcvmsg(node_p node, item_p item, hook_p lasthook)
|
||||
error = ENOMEM;
|
||||
break;
|
||||
}
|
||||
stats = (struct ng_pipe_stats *)resp->data;
|
||||
stats = (struct ng_pipe_stats *) resp->data;
|
||||
bcopy(&priv->upper.stats, &stats->downstream,
|
||||
sizeof(stats->downstream));
|
||||
bcopy(&priv->lower.stats, &stats->upstream,
|
||||
@ -345,7 +339,7 @@ ngp_rcvmsg(node_p node, item_p item, hook_p lasthook)
|
||||
error = ENOMEM;
|
||||
break;
|
||||
}
|
||||
run = (struct ng_pipe_run *)resp->data;
|
||||
run = (struct ng_pipe_run *) resp->data;
|
||||
bcopy(&priv->upper.run, &run->downstream,
|
||||
sizeof(run->downstream));
|
||||
bcopy(&priv->lower.run, &run->upstream,
|
||||
@ -357,7 +351,7 @@ ngp_rcvmsg(node_p node, item_p item, hook_p lasthook)
|
||||
error = ENOMEM;
|
||||
break;
|
||||
}
|
||||
cfg = (struct ng_pipe_cfg *)resp->data;
|
||||
cfg = (struct ng_pipe_cfg *) resp->data;
|
||||
bcopy(&priv->upper.cfg, &cfg->downstream,
|
||||
sizeof(cfg->downstream));
|
||||
bcopy(&priv->lower.cfg, &cfg->upstream,
|
||||
@ -374,7 +368,7 @@ ngp_rcvmsg(node_p node, item_p item, hook_p lasthook)
|
||||
cfg->bandwidth = 0;
|
||||
break;
|
||||
case NGM_PIPE_SET_CFG:
|
||||
cfg = (struct ng_pipe_cfg *)msg->data;
|
||||
cfg = (struct ng_pipe_cfg *) msg->data;
|
||||
if (msg->header.arglen != sizeof(*cfg)) {
|
||||
error = EINVAL;
|
||||
break;
|
||||
@ -401,7 +395,8 @@ ngp_rcvmsg(node_p node, item_p item, hook_p lasthook)
|
||||
|
||||
if (cfg->overhead == -1)
|
||||
priv->overhead = 0;
|
||||
else if (cfg->overhead > 0 && cfg->overhead < 256)
|
||||
else if (cfg->overhead > 0 &&
|
||||
cfg->overhead < MAX_OHSIZE)
|
||||
priv->overhead = cfg->overhead;
|
||||
|
||||
if (cfg->header_offset == -1)
|
||||
@ -411,9 +406,9 @@ ngp_rcvmsg(node_p node, item_p item, hook_p lasthook)
|
||||
priv->header_offset = cfg->header_offset;
|
||||
|
||||
parse_cfg(&priv->upper.cfg, &cfg->downstream,
|
||||
&priv->upper, priv);
|
||||
&priv->upper, priv);
|
||||
parse_cfg(&priv->lower.cfg, &cfg->upstream,
|
||||
&priv->lower, priv);
|
||||
&priv->lower, priv);
|
||||
break;
|
||||
default:
|
||||
error = EINVAL;
|
||||
@ -427,8 +422,6 @@ ngp_rcvmsg(node_p node, item_p item, hook_p lasthook)
|
||||
NG_RESPOND_MSG(error, node, item, resp);
|
||||
NG_FREE_MSG(msg);
|
||||
|
||||
mtx_unlock(&ng_pipe_giant);
|
||||
|
||||
return (error);
|
||||
}
|
||||
|
||||
@ -449,9 +442,9 @@ parse_cfg(struct ng_pipe_hookcfg *current, struct ng_pipe_hookcfg *new,
|
||||
uint32_t fsize, i;
|
||||
|
||||
if (hinfo->ber_p == NULL)
|
||||
hinfo->ber_p = malloc(\
|
||||
(MAX_FSIZE + MAX_OHSIZE)*sizeof(uint64_t), \
|
||||
M_NG_PIPE, M_NOWAIT);
|
||||
hinfo->ber_p =
|
||||
malloc((MAX_FSIZE + MAX_OHSIZE) * sizeof(uint64_t),
|
||||
M_NG_PIPE, M_NOWAIT);
|
||||
current->ber = new->ber;
|
||||
|
||||
/*
|
||||
@ -467,10 +460,10 @@ parse_cfg(struct ng_pipe_hookcfg *current, struct ng_pipe_hookcfg *new,
|
||||
p = one;
|
||||
for (fsize = 0; fsize < MAX_FSIZE + MAX_OHSIZE; fsize++) {
|
||||
hinfo->ber_p[fsize] = p;
|
||||
for (i=0; i<8; i++)
|
||||
p = (p*(p0&0xffff)>>48) + \
|
||||
(p*((p0>>16)&0xffff)>>32) + \
|
||||
(p*(p0>>32)>>16);
|
||||
for (i = 0; i < 8; i++)
|
||||
p = (p * (p0 & 0xffff) >> 48) +
|
||||
(p * ((p0 >> 16) & 0xffff) >> 32) +
|
||||
(p * (p0 >> 32) >> 16);
|
||||
}
|
||||
}
|
||||
|
||||
@ -575,25 +568,42 @@ ngp_rcvdata(hook_p hook, item_p item)
|
||||
struct ngp_fifo *ngp_f = NULL, *ngp_f1;
|
||||
struct ngp_hdr *ngp_h = NULL;
|
||||
struct mbuf *m;
|
||||
int hash;
|
||||
int hash, plen;
|
||||
int error = 0;
|
||||
|
||||
if (hinfo->noqueue) {
|
||||
/*
|
||||
* Shortcut from inbound to outbound hook when neither of
|
||||
* bandwidth, delay, BER or duplication probability is
|
||||
* configured, nor we have queued frames to drain.
|
||||
*/
|
||||
if (hinfo->run.qin_frames == 0 && hinfo->run.qout_frames == 0 &&
|
||||
hinfo->noqueue) {
|
||||
struct hookinfo *dest;
|
||||
if (hinfo == &priv->lower)
|
||||
dest = &priv->upper;
|
||||
else
|
||||
dest = &priv->lower;
|
||||
|
||||
/* Send the frame. */
|
||||
plen = NGI_M(item)->m_pkthdr.len;
|
||||
NG_FWD_ITEM_HOOK(error, item, dest->hook);
|
||||
return error;
|
||||
|
||||
/* Update stats. */
|
||||
if (error) {
|
||||
hinfo->stats.out_disc_frames++;
|
||||
hinfo->stats.out_disc_octets += plen;
|
||||
} else {
|
||||
hinfo->stats.fwd_frames++;
|
||||
hinfo->stats.fwd_octets += plen;
|
||||
}
|
||||
|
||||
return (error);
|
||||
}
|
||||
|
||||
mtx_lock(&ng_pipe_giant);
|
||||
microuptime(now);
|
||||
|
||||
/*
|
||||
* Attach us to the list of active ng_pipes if this was an empty
|
||||
* one before, and also update the queue service deadline time.
|
||||
* If this was an empty queue, update service deadline time.
|
||||
*/
|
||||
if (hinfo->run.qin_frames == 0) {
|
||||
struct timeval *when = &hinfo->qin_utime;
|
||||
@ -602,8 +612,6 @@ ngp_rcvdata(hook_p hook, item_p item)
|
||||
when->tv_sec = now->tv_sec;
|
||||
when->tv_usec = now->tv_usec;
|
||||
}
|
||||
if (hinfo->run.qout_frames == 0)
|
||||
LIST_INSERT_HEAD(&active_head, hinfo, active_le);
|
||||
}
|
||||
|
||||
/* Populate the packet header */
|
||||
@ -702,9 +710,7 @@ ngp_rcvdata(hook_p hook, item_p item)
|
||||
}
|
||||
|
||||
/*
|
||||
* Try to start the dequeuing process immediately. We must
|
||||
* hold the ng_pipe_giant lock here and pipe_dequeue() will
|
||||
* release it
|
||||
* Try to start the dequeuing process immediately.
|
||||
*/
|
||||
pipe_dequeue(hinfo, now);
|
||||
|
||||
@ -720,27 +726,21 @@ ngp_rcvdata(hook_p hook, item_p item)
|
||||
* to outbound (delay) queue;
|
||||
* 4) Loop to 2) until bandwidth quota for this timeslice is reached, or
|
||||
* inbound queue is flushed completely;
|
||||
* 5) Extract the first frame from the outbound queue, if it's time has
|
||||
* come. Queue the frame for transmission on the outbound hook;
|
||||
* 6) Loop to 5) until outbound queue is flushed completely, or the next
|
||||
* frame in the queue is not scheduled to be dequeued yet;
|
||||
* 7) Transimit all frames queued in 5)
|
||||
*
|
||||
* Note: the caller must hold the ng_pipe_giant lock; this function
|
||||
* returns with the lock released.
|
||||
* 5) Dequeue frames from the outbound queue and send them downstream until
|
||||
* outbound queue is flushed completely, or the next frame in the queue
|
||||
* is not due to be dequeued yet
|
||||
*/
|
||||
static void
|
||||
pipe_dequeue(struct hookinfo *hinfo, struct timeval *now) {
|
||||
static uint64_t rand, oldrand;
|
||||
const priv_p priv = NG_NODE_PRIVATE(NG_HOOK_NODE(hinfo->hook));
|
||||
const node_p node = NG_HOOK_NODE(hinfo->hook);
|
||||
const priv_p priv = NG_NODE_PRIVATE(node);
|
||||
struct hookinfo *dest;
|
||||
struct ngp_fifo *ngp_f, *ngp_f1;
|
||||
struct ngp_hdr *ngp_h;
|
||||
struct timeval *when;
|
||||
struct mbuf *q_head = NULL;
|
||||
struct mbuf *q_tail = NULL;
|
||||
struct mbuf *m;
|
||||
int error = 0;
|
||||
int plen, error = 0;
|
||||
|
||||
/* Which one is the destination hook? */
|
||||
if (hinfo == &priv->lower)
|
||||
@ -791,13 +791,13 @@ pipe_dequeue(struct hookinfo *hinfo, struct timeval *now) {
|
||||
|
||||
/* Calculate the serialization delay */
|
||||
if (hinfo->cfg.bandwidth) {
|
||||
hinfo->qin_utime.tv_usec += ((uint64_t) m->m_pkthdr.len
|
||||
+ priv->overhead ) *
|
||||
8000000 / hinfo->cfg.bandwidth;
|
||||
hinfo->qin_utime.tv_usec +=
|
||||
((uint64_t) m->m_pkthdr.len + priv->overhead ) *
|
||||
8000000 / hinfo->cfg.bandwidth;
|
||||
hinfo->qin_utime.tv_sec +=
|
||||
hinfo->qin_utime.tv_usec / 1000000;
|
||||
hinfo->qin_utime.tv_usec / 1000000;
|
||||
hinfo->qin_utime.tv_usec =
|
||||
hinfo->qin_utime.tv_usec % 1000000;
|
||||
hinfo->qin_utime.tv_usec % 1000000;
|
||||
}
|
||||
when = &ngp_h->when;
|
||||
when->tv_sec = hinfo->qin_utime.tv_sec;
|
||||
@ -853,94 +853,56 @@ pipe_dequeue(struct hookinfo *hinfo, struct timeval *now) {
|
||||
|
||||
/* Delay queue processing */
|
||||
while ((ngp_h = TAILQ_FIRST(&hinfo->qout_head))) {
|
||||
struct mbuf *m = ngp_h->m;
|
||||
|
||||
when = &ngp_h->when;
|
||||
m = ngp_h->m;
|
||||
if (when->tv_sec > now->tv_sec ||
|
||||
(when->tv_sec == now->tv_sec &&
|
||||
when->tv_usec > now->tv_usec))
|
||||
break;
|
||||
|
||||
/* Update outbound queue stats */
|
||||
hinfo->stats.fwd_frames++;
|
||||
hinfo->stats.fwd_octets += m->m_pkthdr.len;
|
||||
plen = m->m_pkthdr.len;
|
||||
hinfo->run.qout_frames--;
|
||||
hinfo->run.qout_octets -= m->m_pkthdr.len;
|
||||
hinfo->run.qout_octets -= plen;
|
||||
|
||||
/* Dequeue the packet from qout */
|
||||
TAILQ_REMOVE(&hinfo->qout_head, ngp_h, ngp_link);
|
||||
uma_zfree(ngp_zone, ngp_h);
|
||||
|
||||
/* Enqueue locally for sending downstream */
|
||||
if (q_head == NULL)
|
||||
q_head = m;
|
||||
if (q_tail)
|
||||
q_tail->m_nextpkt = m;
|
||||
q_tail = m;
|
||||
m->m_nextpkt = NULL;
|
||||
}
|
||||
|
||||
/* If both queues are empty detach us from the list of active queues */
|
||||
if (hinfo->run.qin_frames + hinfo->run.qout_frames == 0) {
|
||||
LIST_REMOVE(hinfo, active_le);
|
||||
active_gen_id++;
|
||||
}
|
||||
|
||||
mtx_unlock(&ng_pipe_giant);
|
||||
|
||||
while ((m = q_head) != NULL) {
|
||||
q_head = m->m_nextpkt;
|
||||
m->m_nextpkt = NULL;
|
||||
NG_SEND_DATA(error, dest->hook, m, meta);
|
||||
if (error) {
|
||||
hinfo->stats.out_disc_frames++;
|
||||
hinfo->stats.out_disc_octets += plen;
|
||||
} else {
|
||||
hinfo->stats.fwd_frames++;
|
||||
hinfo->stats.fwd_octets += plen;
|
||||
}
|
||||
}
|
||||
|
||||
if ((hinfo->run.qin_frames != 0 || hinfo->run.qout_frames != 0) &&
|
||||
!priv->timer_scheduled) {
|
||||
ng_callout(&priv->timer, node, NULL, 1, ngp_callout, NULL, 0);
|
||||
priv->timer_scheduled = 1;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* This routine is called on every clock tick. We poll all nodes/hooks
|
||||
* This routine is called on every clock tick. We poll connected hooks
|
||||
* for queued frames by calling pipe_dequeue().
|
||||
*/
|
||||
static void
|
||||
pipe_scheduler(void *arg)
|
||||
ngp_callout(node_p node, hook_p hook, void *arg1, int arg2)
|
||||
{
|
||||
pipe_poll();
|
||||
|
||||
/* Reschedule */
|
||||
callout_reset(&polling_timer, 1, &pipe_scheduler, NULL);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Traverse the list of all active hooks and attempt to dequeue
|
||||
* some packets. Hooks with empty queues are not traversed since
|
||||
* they are not linked into this list.
|
||||
*/
|
||||
static void
|
||||
pipe_poll(void)
|
||||
{
|
||||
struct hookinfo *hinfo;
|
||||
const priv_p priv = NG_NODE_PRIVATE(node);
|
||||
struct timeval now;
|
||||
int old_gen_id = active_gen_id;
|
||||
|
||||
mtx_lock(&ng_pipe_giant);
|
||||
microuptime(&now);
|
||||
LIST_FOREACH(hinfo, &active_head, active_le) {
|
||||
CURVNET_SET(NG_HOOK_NODE(hinfo->hook)->nd_vnet);
|
||||
pipe_dequeue(hinfo, &now);
|
||||
CURVNET_RESTORE();
|
||||
mtx_lock(&ng_pipe_giant);
|
||||
if (old_gen_id != active_gen_id) {
|
||||
/* the list was updated; restart traversing */
|
||||
hinfo = LIST_FIRST(&active_head);
|
||||
if (hinfo == NULL)
|
||||
break;
|
||||
old_gen_id = active_gen_id;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
mtx_unlock(&ng_pipe_giant);
|
||||
}
|
||||
|
||||
priv->timer_scheduled = 0;
|
||||
microuptime(&now);
|
||||
if (priv->upper.hook != NULL)
|
||||
pipe_dequeue(&priv->upper, &now);
|
||||
if (priv->lower.hook != NULL)
|
||||
pipe_dequeue(&priv->lower, &now);
|
||||
}
|
||||
|
||||
/*
|
||||
* Shutdown processing
|
||||
@ -955,6 +917,8 @@ ngp_shutdown(node_p node)
|
||||
{
|
||||
const priv_p priv = NG_NODE_PRIVATE(node);
|
||||
|
||||
if (priv->timer_scheduled)
|
||||
ng_uncallout(&priv->timer, node);
|
||||
if (priv->lower.hook && priv->upper.hook)
|
||||
ng_bypass(priv->lower.hook, priv->upper.hook);
|
||||
else {
|
||||
@ -978,9 +942,6 @@ ngp_disconnect(hook_p hook)
|
||||
struct hookinfo *const hinfo = NG_HOOK_PRIVATE(hook);
|
||||
struct ngp_fifo *ngp_f;
|
||||
struct ngp_hdr *ngp_h;
|
||||
int removed = 0;
|
||||
|
||||
mtx_lock(&ng_pipe_giant);
|
||||
|
||||
KASSERT(hinfo != NULL, ("%s: null info", __FUNCTION__));
|
||||
hinfo->hook = NULL;
|
||||
@ -991,7 +952,6 @@ ngp_disconnect(hook_p hook)
|
||||
TAILQ_REMOVE(&ngp_f->packet_head, ngp_h, ngp_link);
|
||||
m_freem(ngp_h->m);
|
||||
uma_zfree(ngp_zone, ngp_h);
|
||||
removed++;
|
||||
}
|
||||
TAILQ_REMOVE(&hinfo->fifo_head, ngp_f, fifo_le);
|
||||
uma_zfree(ngp_zone, ngp_f);
|
||||
@ -1002,27 +962,12 @@ ngp_disconnect(hook_p hook)
|
||||
TAILQ_REMOVE(&hinfo->qout_head, ngp_h, ngp_link);
|
||||
m_freem(ngp_h->m);
|
||||
uma_zfree(ngp_zone, ngp_h);
|
||||
removed++;
|
||||
}
|
||||
|
||||
/*
|
||||
* Both queues should be empty by now, so detach us from
|
||||
* the list of active queues
|
||||
*/
|
||||
if (removed) {
|
||||
LIST_REMOVE(hinfo, active_le);
|
||||
active_gen_id++;
|
||||
}
|
||||
if (hinfo->run.qin_frames + hinfo->run.qout_frames != removed)
|
||||
printf("Mismatch: queued=%d but removed=%d !?!",
|
||||
hinfo->run.qin_frames + hinfo->run.qout_frames, removed);
|
||||
|
||||
/* Release the packet loss probability table (BER) */
|
||||
if (hinfo->ber_p)
|
||||
free(hinfo->ber_p, M_NG_PIPE);
|
||||
|
||||
mtx_unlock(&ng_pipe_giant);
|
||||
|
||||
return (0);
|
||||
}
|
||||
|
||||
@ -1038,16 +983,9 @@ ngp_modevent(module_t mod, int type, void *unused)
|
||||
UMA_ALIGN_PTR, 0);
|
||||
if (ngp_zone == NULL)
|
||||
panic("ng_pipe: couldn't allocate descriptor zone");
|
||||
|
||||
mtx_init(&ng_pipe_giant, "ng_pipe_giant", NULL, MTX_DEF);
|
||||
LIST_INIT(&active_head);
|
||||
callout_init(&polling_timer, CALLOUT_MPSAFE);
|
||||
callout_reset(&polling_timer, 1, &pipe_scheduler, NULL);
|
||||
break;
|
||||
case MOD_UNLOAD:
|
||||
callout_drain(&polling_timer);
|
||||
uma_zdestroy(ngp_zone);
|
||||
mtx_destroy(&ng_pipe_giant);
|
||||
break;
|
||||
default:
|
||||
error = EOPNOTSUPP;
|
||||
|
Loading…
x
Reference in New Issue
Block a user