cleanup and document in some detail the internals of the testing code

for dummynet schedulers
This commit is contained in:
Luigi Rizzo 2016-01-27 02:22:31 +00:00
parent ff8d60ab4d
commit 1cdc5f0b87
Notes: svn2git 2020-12-20 02:59:44 +00:00
svn path=/head/; revision=294882
5 changed files with 199 additions and 143 deletions

View File

@ -20,7 +20,7 @@ HEAP_OBJS=$(HEAP_SRCS:.c=.o)
VPATH= .:..
CFLAGS = -I.. -I. -Wall -Werror -O3 -DIPFW -Wextra
CFLAGS = -I.. -I. -Wall -Werror -O3 -Wextra
TARGETS= test_sched # no test_heap by default
all: $(TARGETS)

View File

@ -23,8 +23,8 @@ extern "C" {
extern int debug;
#define ND(fmt, args...) do {} while (0)
#define D1(fmt, args...) do {} while (0)
#define D(fmt, args...) fprintf(stderr, "%-8s " fmt "\n", \
__FUNCTION__, ## args)
#define D(fmt, args...) fprintf(stderr, "%-10s %4d %-8s " fmt "\n", \
__FILE__, __LINE__, __FUNCTION__, ## args)
#define DX(lev, fmt, args...) do { \
if (debug > lev) D(fmt, ## args); } while (0)
@ -53,11 +53,24 @@ enum {
DN_SCHED_WF2QP,
};
/* from ip_dummynet.h, fields used in ip_dn_private.h */
struct dn_id {
int type, subtype, len, id;
uint16_t len; /* total len inc. this header */
uint8_t type;
uint8_t subtype;
// uint32_t id; /* generic id */
};
/* (from ip_dummynet.h)
* A flowset, which is a template for flows. Contains parameters
* from the command line: id, target scheduler, queue sizes, plr,
* flow masks, buckets for the flow hash, and possibly scheduler-
* specific parameters (weight, quantum and so on).
*/
struct dn_fs {
/* generic scheduler parameters. Leave them at -1 if unset.
* Now we use 0: weight, 1: lmax, 2: priority
*/
int par[4]; /* flowset parameters */
/* simulation entries.
@ -78,16 +91,29 @@ struct dn_fs {
int cur;
};
/* (ip_dummynet.h)
* scheduler template, indicating nam, number, mask and buckets
*/
struct dn_sch {
};
/* (from ip_dummynet.h)
* dn_flow collects flow_id and stats for queues and scheduler
* instances, and is used to pass these info to userland.
* oid.type/oid.subtype describe the object, oid.id is number
* of the parent object.
*/
struct dn_flow {
struct dn_id oid;
int length;
int len_bytes;
int drops;
uint64_t tot_pkts;
uint64_t tot_bytes;
uint32_t flow_id;
uint32_t length; /* Queue length, in packets */
uint32_t len_bytes; /* Queue length, in bytes */
uint32_t drops;
//uint32_t flow_id;
/* the following fields are used by the traffic generator.
*/
struct list_head h; /* used by the generator */
/* bytes served by the flow since the last backlog time */
@ -96,6 +122,7 @@ struct dn_flow {
uint64_t sch_bytes;
};
/* the link */
struct dn_link {
};
@ -107,9 +134,9 @@ struct mbuf {
int len;
} m_pkthdr;
struct mbuf *m_nextpkt;
int flow_id; /* for testing, index of a flow */
uint32_t flow_id; /* for testing, index of a flow */
//int flowset_id; /* for testing, index of a flowset */
void *cfg; /* config args */
//void *cfg; /* config args */
};
#define MALLOC_DECLARE(x) extern volatile int __dummy__ ## x
@ -131,36 +158,9 @@ typedef struct _md_t moduledata_t;
moduledata_t *_g_##name = & b
#define MODULE_DEPEND(a, b, c, d, e)
#ifdef IPFW
#include <dn_heap.h>
#include <ip_dn_private.h>
#include <dn_sched.h>
#else
struct dn_queue {
struct dn_fsk *fs; /* parent flowset. */
struct dn_sch_inst *_si; /* parent sched instance. */
};
struct dn_schk {
};
struct dn_fsk {
struct dn_fs fs;
struct dn_schk *sched;
};
struct dn_sch_inst {
struct dn_schk *sched;
};
struct dn_alg {
int type;
const char *name;
void *enqueue, *dequeue;
int q_datalen, si_datalen, schk_datalen;
int (*config)(struct dn_schk *);
int (*new_sched)(struct dn_sch_inst *);
int (*new_fsk)(struct dn_fsk *);
int (*new_queue)(struct dn_queue *q);
};
#endif
#ifndef __FreeBSD__
int fls(int);

View File

@ -9,11 +9,9 @@
* keeping track of statistics.
*/
#include "dn_test.h"
// #define USE_BURST // what is this for ?
struct q_list {
struct list_head h;
};
#include "dn_test.h"
struct cfg_s {
int ac;
@ -30,14 +28,19 @@ struct cfg_s {
uint32_t dequeue;
/* generator parameters */
int th_min, th_max;
int32_t th_min, th_max; /* thresholds for hysteresis; negative means per flow */
#ifdef USE_BURST
int maxburst;
#endif /* USE_BURST */
int lmin, lmax; /* packet len */
int flows; /* number of flows */
int flowsets; /* number of flowsets */
int wsum; /* sum of weights of all flows */
#ifdef USE_CUR
int max_y; /* max random number in the generation */
int cur_y, cur_fs; /* used in generation, between 0 and max_y - 1 */
int cur_y
int cur_fs; /* used in generation, between 0 and max_y - 1 */
#endif /* USE_CUR */
const char *fs_config; /* flowset config */
int can_dequeue;
int burst; /* count of packets sent in a burst */
@ -52,17 +55,26 @@ struct cfg_s {
struct mbuf *);
struct mbuf * (*deq)(struct dn_sch_inst *);
/* size of the three fields including sched-specific areas */
int schk_len;
int q_len; /* size of a queue including sched-fields */
int si_len; /* size of a sch_inst including sched-fields */
uint32_t schk_len;
uint32_t q_len; /* size of a queue including sched-fields */
uint32_t si_len; /* size of a sch_inst including sched-fields */
char *q; /* array of flow queues */
/* use a char* because size is variable */
struct dn_fsk *fs; /* array of flowsets */
struct dn_sch_inst *si;
/*
* The scheduler template (one) followd by schk_datalen bytes
* for scheduler-specific parameters, total size is schk_len
*/
struct dn_schk *sched;
/*
* one scheduler instance, followed by si_datalen bytes
* for scheduler specific parameters of this instance,
* total size is si_len. si->sched points to sched
*/
struct dn_sch_inst *si;
struct dn_fsk *fs; /* array of flowsets */
/* generator state */
int state; /* 0 = going up, 1: going down */
int state; /* 0 = going up (enqueue), 1: going down (dequeue) */
/*
* We keep lists for each backlog level, and always serve
@ -72,17 +84,18 @@ struct cfg_s {
* XXX to optimize things, entry i could contain queues with
* 2^{i-1}+1 .. 2^i entries.
*/
#define BACKLOG 30
uint32_t llmask;
#define BACKLOG 30 /* this many backlogged classes, we only need BACKLOG+1 */
uint64_t llmask;
struct list_head ll[BACKLOG + 10];
double *q_wfi; /* (byte) Worst-case Fair Index of the flows */
double wfi; /* (byte) Worst-case Fair Index of the system */
};
/* FI2Q and Q2FI converts from flow_id to dn_queue and back.
* We cannot easily use pointer arithmetic because it is variable size.
*/
/* FI2Q and Q2FI converts from flow_id (i.e. queue index)
* to dn_queue and back. We cannot simply use pointer arithmetic
* because the queu has variable size, q_len
*/
#define FI2Q(c, i) ((struct dn_queue *)((c)->q + (c)->q_len * (i)))
#define Q2FI(c, q) (((char *)(q) - (c)->q)/(c)->q_len)
@ -92,10 +105,11 @@ struct dn_parms dn_cfg;
static void controller(struct cfg_s *c);
/* release a packet: put the mbuf in the freelist, and the queue in
* the bucket.
/* release a packet for a given flow_id.
* Put the mbuf in the freelist, and in case move the
* flow to the end of the bucket.
*/
int
static int
drop(struct cfg_s *c, struct mbuf *m)
{
struct dn_queue *q;
@ -118,31 +132,36 @@ drop(struct cfg_s *c, struct mbuf *m)
return 0;
}
/* dequeue returns NON-NULL when a packet is dropped */
/*
* dn_sch_inst does not have a queue, for the RR we
* allocate a mq right after si
*/
static int
enqueue(struct cfg_s *c, void *_m)
default_enqueue(struct dn_sch_inst *si, struct dn_queue *q, struct mbuf *m)
{
struct mbuf *m = _m;
if (c->enq)
return c->enq(c->si, FI2Q(c, m->flow_id), m);
if (c->head == NULL)
c->head = m;
struct mq *mq = (struct mq *)si;
(void)q;
/* this is the default function if no scheduler is provided */
if (mq->head == NULL)
mq->head = m;
else
c->tail->m_nextpkt = m;
c->tail = m;
mq->tail->m_nextpkt = m;
mq->tail = m;
return 0; /* default - success */
}
/* dequeue returns NON-NULL when a packet is available */
static void *
dequeue(struct cfg_s *c)
static struct mbuf *
default_dequeue(struct dn_sch_inst *si)
{
struct mq *mq = (struct mq *)si;
struct mbuf *m;
if (c->deq)
return c->deq(c->si);
if ((m = c->head)) {
m = c->head;
c->head = m->m_nextpkt;
/* this is the default function if no scheduler is provided */
if ((m = mq->head)) {
m = mq->head;
mq->head = m->m_nextpkt;
m->m_nextpkt = NULL;
}
return m;
@ -193,23 +212,34 @@ mainloop(struct cfg_s *c)
DX(3, "loop %d enq %d send %p rx %d",
i, c->_enqueue, c->tosend, c->can_dequeue);
if ( (m = c->tosend) ) {
int ret;
struct dn_queue *q = FI2Q(c, m->flow_id);
c->_enqueue++;
if (enqueue(c, m)) {
ret = c->enq(c->si, q, m);
if (ret) {
drop(c, m);
ND("loop %d enqueue fail", i );
D("loop %d enqueue fail", i );
/*
* XXX do not insist; rather, try dequeue
*/
goto do_dequeue;
} else {
ND("enqueue ok");
c->pending++;
gnet_stats_enq(c, m);
}
}
if (c->can_dequeue) {
} else if (c->can_dequeue) {
do_dequeue:
c->dequeue++;
if ((m = dequeue(c))) {
m = c->deq(c->si);
if (m) {
c->pending--;
drop(c, m);
c->drop--; /* compensate */
gnet_stats_deq(c, m);
} else {
D("--- ouch, cannot operate on iteration %d, pending %d", i, c->pending);
break;
}
}
}
@ -221,11 +251,10 @@ int
dump(struct cfg_s *c)
{
int i;
struct dn_queue *q;
for (i=0; i < c->flows; i++) {
q = FI2Q(c, i);
DX(1, "queue %4d tot %10llu", i,
//struct dn_queue *q = FI2Q(c, i);
ND(1, "queue %4d tot %10llu", i,
(unsigned long long)q->ni.tot_bytes);
}
DX(1, "done %d loops\n", c->loops);
@ -284,20 +313,25 @@ getnum(const char *s, char **next, const char *key)
* weight:maxlen:flows
* indicating how many flows are hooked to that fs.
* Both weight and range can be min-max-steps.
* In a first pass we just count the number of flowsets and flows,
* in a second pass we complete the setup.
* The first pass (fs != NULL) justs count the number of flowsets and flows,
* the second pass (fs == NULL) we complete the setup.
*/
static void
parse_flowsets(struct cfg_s *c, const char *fs, int pass)
parse_flowsets(struct cfg_s *c, const char *fs)
{
char *s, *cur, *next;
int n_flows = 0, n_fs = 0, wsum = 0;
int i, j;
struct dn_fs *prev = NULL;
int pass = (fs == NULL);
DX(3, "--- pass %d flows %d flowsets %d", pass, c->flows, c->flowsets);
if (pass == 0)
if (fs != NULL) { /* first pass */
if (c->fs_config)
D("warning, overwriting fs %s with %s",
c->fs_config, fs);
c->fs_config = fs;
}
s = c->fs_config ? strdup(c->fs_config) : NULL;
if (s == NULL) {
if (pass == 0)
@ -327,7 +361,7 @@ parse_flowsets(struct cfg_s *c, const char *fs, int pass)
w, w_h, w_steps, len, len_h, l_steps, flows);
if (w == 0 || w_h < w || len == 0 || len_h < len ||
flows == 0) {
DX(4,"wrong parameters %s", fs);
DX(4,"wrong parameters %s", s);
return;
}
n_flows += flows * w_steps * l_steps;
@ -361,7 +395,6 @@ parse_flowsets(struct cfg_s *c, const char *fs, int pass)
}
}
}
c->max_y = prev ? prev->base_y + prev->y : 0;
c->flows = n_flows;
c->flowsets = n_fs;
c->wsum = wsum;
@ -369,7 +402,11 @@ parse_flowsets(struct cfg_s *c, const char *fs, int pass)
return;
/* now link all flows to their parent flowsets */
DX(1,"%d flows on %d flowsets", c->flows, c->flowsets);
#ifdef USE_CUR
c->max_y = prev ? prev->base_y + prev->y : 0;
DX(1,"%d flows on %d flowsets max_y %d", c->flows, c->flowsets, c->max_y);
#endif /* USE_CUR */
for (i=0; i < c->flowsets; i++) {
struct dn_fs *fs = &c->fs[i].fs;
DX(1, "fs %3d w %5d l %4d flow %5d .. %5d y %6d .. %6d",
@ -383,6 +420,18 @@ parse_flowsets(struct cfg_s *c, const char *fs, int pass)
}
}
/* available schedulers */
extern moduledata_t *_g_dn_fifo;
extern moduledata_t *_g_dn_wf2qp;
extern moduledata_t *_g_dn_rr;
extern moduledata_t *_g_dn_qfq;
#ifdef WITH_QFQP
extern moduledata_t *_g_dn_qfqp;
#endif
#ifdef WITH_KPS
extern moduledata_t *_g_dn_kps;
#endif
static int
init(struct cfg_s *c)
{
@ -395,7 +444,7 @@ init(struct cfg_s *c)
moduledata_t *mod = NULL;
struct dn_alg *p = NULL;
c->th_min = 0;
c->th_min = -1; /* 1 packet per flow */
c->th_max = -20;/* 20 packets per flow */
c->lmin = c->lmax = 1280; /* packet len */
c->flows = 1;
@ -408,16 +457,6 @@ init(struct cfg_s *c)
} else if (!strcmp(*av, "-d")) {
debug = atoi(av[1]);
} else if (!strcmp(*av, "-alg")) {
extern moduledata_t *_g_dn_fifo;
extern moduledata_t *_g_dn_wf2qp;
extern moduledata_t *_g_dn_rr;
extern moduledata_t *_g_dn_qfq;
#ifdef WITH_QFQP
extern moduledata_t *_g_dn_qfqp;
#endif
#ifdef WITH_KPS
extern moduledata_t *_g_dn_kps;
#endif
if (!strcmp(av[1], "rr"))
mod = _g_dn_rr;
else if (!strcmp(av[1], "wf2qp"))
@ -443,9 +482,11 @@ init(struct cfg_s *c)
c->lmin = getnum(av[1], NULL, av[0]);
c->lmax = c->lmin;
DX(3, "setting max to %d", c->th_max);
#ifdef USE_BURST
} else if (!strcmp(*av, "-burst")) {
c->maxburst = getnum(av[1], NULL, av[0]);
DX(3, "setting max to %d", c->th_max);
#endif /* USE_BURST */
} else if (!strcmp(*av, "-qmax")) {
c->th_max = getnum(av[1], NULL, av[0]);
DX(3, "setting max to %d", c->th_max);
@ -456,15 +497,17 @@ init(struct cfg_s *c)
c->flows = getnum(av[1], NULL, av[0]);
DX(3, "setting flows to %d", c->flows);
} else if (!strcmp(*av, "-flowsets")) {
parse_flowsets(c, av[1], 0);
parse_flowsets(c, av[1]); /* first pass */
DX(3, "setting flowsets to %d", c->flowsets);
} else {
D("option %s not recognised, ignore", *av);
}
ac -= 2; av += 2;
}
#ifdef USE_BURST
if (c->maxburst <= 0)
c->maxburst = 1;
#endif /* USE_BURST */
if (c->loops <= 0)
c->loops = 1;
if (c->flows <= 0)
@ -482,45 +525,61 @@ init(struct cfg_s *c)
c->th_max = c->flows * -c->th_max;
if (c->th_max <= c->th_min)
c->th_max = c->th_min + 1;
/* now load parameters from the module */
if (mod) {
p = mod->p;
DX(3, "using module %s f %p p %p", mod->name, mod->f, mod->p);
DX(3, "modname %s ty %d", p->name, p->type);
// XXX check enq and deq not null
c->enq = p->enqueue;
c->deq = p->dequeue;
c->si_len += p->si_datalen;
c->q_len += p->q_datalen;
c->schk_len += p->schk_datalen;
} else {
/* make sure c->si has room for a queue */
c->enq = default_enqueue;
c->deq = default_dequeue;
}
/* allocate queues, flowsets and one scheduler */
c->q = calloc(c->flows, c->q_len);
c->q_wfi = (double *)calloc(c->flows, sizeof(double));
D("using %d flows, %d flowsets", c->flows, c->flowsets);
D("q_len %d dn_fsk %d si %d sched %d",
c->q_len, (int)sizeof(struct dn_fsk),
c->si_len, c->schk_len);
c->sched = calloc(1, c->schk_len); /* one parent scheduler */
c->si = calloc(1, c->si_len); /* one scheduler instance */
c->fs = calloc(c->flowsets, sizeof(struct dn_fsk));
c->si = calloc(1, c->si_len);
c->sched = calloc(c->flows, c->schk_len);
if (c->q == NULL || c->fs == NULL || !c->q_wfi) {
D("error allocating memory for flows");
c->q = calloc(c->flows, c->q_len); /* one queue per flow */
c->q_wfi = calloc(c->flows, sizeof(double)); /* stats, one per flow */
if (!c->sched || !c->si || !c->fs || !c->q || !c->q_wfi) {
D("error allocating memory");
exit(1);
}
c->si->sched = c->sched;
c->si->sched = c->sched; /* link scheduler instance to template */
if (p) {
/* run initialization code if needed */
if (p->config)
p->config(c->sched);
p->config(c->si->sched);
if (p->new_sched)
p->new_sched(c->si);
}
/* parse_flowsets links queues to their flowsets */
parse_flowsets(c, av[1], 1);
parse_flowsets(c, NULL); /* second pass */
/* complete the work calling new_fsk */
for (i = 0; i < c->flowsets; i++) {
if (c->fs[i].fs.par[1] == 0)
c->fs[i].fs.par[1] = 1000; /* default pkt len */
c->fs[i].sched = c->sched;
struct dn_fsk *fsk = &c->fs[i];
if (fsk->fs.par[1] == 0)
fsk->fs.par[1] = 1000; /* default pkt len */
fsk->sched = c->si->sched;
if (p && p->new_fsk)
p->new_fsk(&c->fs[i]);
p->new_fsk(fsk);
}
/* --- now the scheduler is initialized --- */
/* initialize the lists for the generator, and put
/*
* initialize the lists for the generator, and put
* all flows in the list for backlog = 0
*/
for (i=0; i <= BACKLOG+5; i++)
@ -536,7 +595,7 @@ init(struct cfg_s *c)
INIT_LIST_HEAD(&q->ni.h);
list_add_tail(&q->ni.h, &c->ll[0]);
}
c->llmask = 1;
c->llmask = 1; /* all flows are in the first list */
return 0;
}
@ -545,7 +604,6 @@ int
main(int ac, char *av[])
{
struct cfg_s c;
struct timeval end;
double ll;
int i;
char msg[40];
@ -555,16 +613,15 @@ main(int ac, char *av[])
c.av = av;
init(&c);
gettimeofday(&c.time, NULL);
D("th_min %d th_max %d", c.th_min, c.th_max);
mainloop(&c);
gettimeofday(&end, NULL);
end.tv_sec -= c.time.tv_sec;
end.tv_usec -= c.time.tv_usec;
if (end.tv_usec < 0) {
end.tv_usec += 1000000;
end.tv_sec--;
{
struct timeval end;
gettimeofday(&end, NULL);
timersub(&end, &c.time, &c.time);
}
c.time = end;
ll = end.tv_sec*1000000 + end.tv_usec;
ll = c.time.tv_sec*1000000 + c.time.tv_usec;
ll *= 1000; /* convert to nanoseconds */
ll /= c._enqueue;
sprintf(msg, "1::%d", c.flows);
@ -572,8 +629,10 @@ main(int ac, char *av[])
if (c.wfi < c.q_wfi[i])
c.wfi = c.q_wfi[i];
}
D("sched=%-12s\ttime=%d.%03d sec (%.0f nsec)\twfi=%.02f\tflow=%-16s",
c.name, (int)c.time.tv_sec, (int)c.time.tv_usec / 1000, ll, c.wfi,
D("sched=%-12s\ttime=%d.%03d sec (%.0f nsec) enq %lu %lu deq\n"
"\twfi=%.02f\tflow=%-16s",
c.name, (int)c.time.tv_sec, (int)c.time.tv_usec / 1000, ll,
(unsigned long)c._enqueue, (unsigned long)c.dequeue, c.wfi,
c.fs_config ? c.fs_config : msg);
dump(&c);
DX(1, "done ac %d av %p", ac, av);
@ -593,7 +652,7 @@ controller(struct cfg_s *c)
struct dn_fs *fs;
int flow_id;
/* histeresis between max and min */
/* hysteresis between max and min */
if (c->state == 0 && c->pending >= (uint32_t)c->th_max)
c->state = 1;
else if (c->state == 1 && c->pending <= (uint32_t)c->th_min)
@ -601,9 +660,14 @@ controller(struct cfg_s *c)
ND(1, "state %d pending %2d", c->state, c->pending);
c->can_dequeue = c->state;
c->tosend = NULL;
if (c->state)
if (c->can_dequeue)
return;
/*
* locate the flow to use for enqueueing
* We take the queue with the lowest number of queued packets,
* generate a packet for it, and put the queue in the next highest.
*/
if (1) {
int i;
struct dn_queue *q;
@ -611,7 +675,7 @@ controller(struct cfg_s *c)
i = ffs(c->llmask) - 1;
if (i < 0) {
DX(2, "no candidate");
D("no candidate");
c->can_dequeue = 1;
return;
}
@ -633,8 +697,9 @@ controller(struct cfg_s *c)
c->llmask |= 1<<(1+i);
}
fs = &q->fs->fs;
c->cur_fs = q->fs - c->fs;
fs->cur = flow_id;
#ifdef USE_CUR
c->cur_fs = q->fs - c->fs;
} else {
/* XXX this does not work ? */
/* now decide whom to send the packet, and the length */
@ -650,6 +715,7 @@ controller(struct cfg_s *c)
c->cur_y++;
if (c->cur_y >= fs->next_y)
c->cur_fs++;
#endif /* USE_CUR */
}
/* construct a packet */
@ -662,7 +728,7 @@ controller(struct cfg_s *c)
if (m == NULL)
return;
m->cfg = c;
//m->cfg = c;
m->m_nextpkt = NULL;
m->m_pkthdr.len = fs->par[1]; // XXX maxlen
m->flow_id = flow_id;
@ -672,15 +738,3 @@ controller(struct cfg_s *c)
fs->par[0], m->m_pkthdr.len);
}
/*
Packet allocation:
to achieve a distribution that matches weights, for each X=w/lmax class
we should generate a number of packets proportional to Y = X times the number
of flows in the class.
So we construct an array with the cumulative distribution of Y's,
and use it to identify the flow via inverse mapping (if the Y's are
not too many we can use an array for the lookup). In practice,
each flow will have X entries [virtually] pointing to it.
*/

View File

@ -6,6 +6,7 @@
#ifndef _MYLIST_H
#define _MYLIST_H
/* not just a head, also the link field for a list entry */
struct list_head {
struct list_head *prev, *next;
};

View File

@ -61,6 +61,7 @@ dn_enqueue(struct dn_queue *q, struct mbuf* m, int drop)
mq_append(&q->mq, m);
q->ni.length++;
q->ni.tot_bytes += m->m_pkthdr.len;
q->ni.tot_pkts++;
return 0;
drop: