add support for rate-limiting in the sender (-R ...)

Obtained from:	Giuseppe Lettieri
This commit is contained in:
luigi 2013-05-30 11:10:42 +00:00
parent 4b534acddc
commit 3136540cb4

View File

@ -25,7 +25,7 @@
/*
* $FreeBSD$
* $Id: pkt-gen.c 12024 2013-01-25 05:41:51Z luigi $
* $Id$
*
* Example program to show how to build a multithreaded packet
* source/sink using the netmap device.
@ -89,6 +89,9 @@ struct glob_arg {
int dev_type;
pcap_t *p;
int tx_rate;
struct timespec tx_period;
int affinity;
int main_fd;
int report_interval;
@ -114,7 +117,7 @@ struct targ {
struct netmap_if *nifp;
uint16_t qfirst, qlast; /* range of queues to scan */
volatile uint64_t count;
struct timeval tic, toc;
struct timespec tic, toc;
int me;
pthread_t thread;
int affinity;
@ -669,6 +672,76 @@ ponger_body(void *data)
return NULL;
}
static __inline int
timespec_ge(const struct timespec *a, const struct timespec *b)
{
if (a->tv_sec > b->tv_sec)
return (1);
if (a->tv_sec < b->tv_sec)
return (0);
if (a->tv_nsec >= b->tv_nsec)
return (1);
return (0);
}
static __inline struct timespec
timeval2spec(const struct timeval *a)
{
struct timespec ts = {
.tv_sec = a->tv_sec,
.tv_nsec = a->tv_usec * 1000
};
return ts;
}
static __inline struct timeval
timespec2val(const struct timespec *a)
{
struct timeval tv = {
.tv_sec = a->tv_sec,
.tv_usec = a->tv_nsec / 1000
};
return tv;
}
static int
wait_time(struct timespec ts, struct timespec *wakeup_ts, long long *waited)
{
struct timespec curtime;
curtime.tv_sec = 0;
curtime.tv_nsec = 0;
if (clock_gettime(CLOCK_REALTIME_PRECISE, &curtime) == -1) {
D("clock_gettime: %s", strerror(errno));
return (-1);
}
while (timespec_ge(&ts, &curtime)) {
if (waited != NULL)
(*waited)++;
if (clock_gettime(CLOCK_REALTIME_PRECISE, &curtime) == -1) {
D("clock_gettime");
return (-1);
}
}
if (wakeup_ts != NULL)
*wakeup_ts = curtime;
return (0);
}
static __inline void
timespec_add(struct timespec *tsa, struct timespec *tsb)
{
tsa->tv_sec += tsb->tv_sec;
tsa->tv_nsec += tsb->tv_nsec;
if (tsa->tv_nsec >= 1000000000) {
tsa->tv_sec++;
tsa->tv_nsec -= 1000000000;
}
}
static void *
sender_body(void *data)
@ -680,7 +753,10 @@ sender_body(void *data)
struct netmap_ring *txring;
int i, n = targ->g->npackets / targ->g->nthreads, sent = 0;
int options = targ->g->options | OPT_COPY;
D("start");
struct timespec tmptime, nexttime = { 0, 0}; // XXX silence compiler
int rate_limit = targ->g->tx_rate;
long long waited = 0;
D("start");
if (setaffinity(targ->thread, targ->affinity))
goto quit;
/* setup poll(2) mechanism. */
@ -689,8 +765,18 @@ D("start");
fds[0].events = (POLLOUT);
/* main loop.*/
gettimeofday(&targ->tic, NULL);
clock_gettime(CLOCK_REALTIME_PRECISE, &targ->tic);
if (rate_limit) {
tmptime.tv_sec = 2;
tmptime.tv_nsec = 0;
timespec_add(&targ->tic, &tmptime);
targ->tic.tv_nsec = 0;
if (wait_time(targ->tic, NULL, NULL) == -1) {
D("wait_time: %s", strerror(errno));
goto quit;
}
nexttime = targ->tic;
}
if (targ->g->dev_type == DEV_PCAP) {
int size = targ->g->pkt_size;
void *pkt = &targ->pkt;
@ -718,8 +804,18 @@ D("start");
}
}
} else {
int tosend = 0;
while (!targ->cancel && (n == 0 || sent < n)) {
if (rate_limit && tosend <= 0) {
tosend = targ->g->burst;
timespec_add(&nexttime, &targ->g->tx_period);
if (wait_time(nexttime, &tmptime, &waited) == -1) {
D("wait_time");
goto quit;
}
}
/*
* wait for available room in the send queue(s)
*/
@ -737,7 +833,7 @@ D("start");
options &= ~OPT_COPY;
}
for (i = targ->qfirst; i < targ->qlast; i++) {
int m, limit = targ->g->burst;
int m, limit = rate_limit ? tosend : targ->g->burst;
if (n > 0 && n - sent < limit)
limit = n - sent;
txring = NETMAP_TXRING(nifp, i);
@ -746,6 +842,7 @@ D("start");
m = send_packets(txring, &targ->pkt, targ->g->pkt_size,
limit, options);
sent += m;
tosend -= m;
targ->count = sent;
}
}
@ -762,7 +859,7 @@ D("start");
}
}
gettimeofday(&targ->toc, NULL);
clock_gettime(CLOCK_REALTIME_PRECISE, &targ->toc);
targ->completed = 1;
targ->count = sent;
@ -834,7 +931,7 @@ receiver_body(void *data)
}
/* main loop, exit after 1s silence */
gettimeofday(&targ->tic, NULL);
clock_gettime(CLOCK_REALTIME_PRECISE, &targ->tic);
if (targ->g->dev_type == DEV_PCAP) {
while (!targ->cancel) {
/* XXX should we poll ? */
@ -852,8 +949,8 @@ receiver_body(void *data)
while (!targ->cancel) {
/* Once we started to receive packets, wait at most 1 seconds
before quitting. */
if (poll(fds, 1, 1 * 1000) <= 0 && targ->g->forever == 0) {
gettimeofday(&targ->toc, NULL);
if (poll(fds, 1, 1 * 1000) <= 0 && !targ->g->forever) {
clock_gettime(CLOCK_REALTIME_PRECISE, &targ->toc);
targ->toc.tv_sec -= 1; /* Subtract timeout time. */
break;
}
@ -1085,11 +1182,13 @@ main_thread(struct glob_arg *g)
timerclear(&tic);
timerclear(&toc);
for (i = 0; i < g->nthreads; i++) {
struct timespec t_tic, t_toc;
/*
* Join active threads, unregister interfaces and close
* file descriptors.
*/
pthread_join(targs[i].thread, NULL);
if (targs[i].used)
pthread_join(targs[i].thread, NULL);
close(targs[i].fd);
if (targs[i].completed == 0)
@ -1100,10 +1199,12 @@ main_thread(struct glob_arg *g)
* how long it took to send all the packets.
*/
count += targs[i].count;
if (!timerisset(&tic) || timercmp(&targs[i].tic, &tic, <))
tic = targs[i].tic;
if (!timerisset(&toc) || timercmp(&targs[i].toc, &toc, >))
toc = targs[i].toc;
t_tic = timeval2spec(&tic);
t_toc = timeval2spec(&toc);
if (!timerisset(&tic) || timespec_ge(&targs[i].tic, &t_tic))
tic = timespec2val(&targs[i].tic);
if (!timerisset(&toc) || timespec_ge(&targs[i].toc, &t_toc))
toc = timespec2val(&targs[i].toc);
}
/* print output. */
@ -1115,7 +1216,6 @@ main_thread(struct glob_arg *g)
rx_output(count, delta_t);
if (g->dev_type == DEV_NETMAP) {
ioctl(g->main_fd, NIOCUNREGIF, NULL); // XXX deprecated
munmap(g->mmap_addr, g->mmap_size);
close(g->main_fd);
}
@ -1224,9 +1324,10 @@ main(int arc, char **argv)
g.burst = 512; // default
g.nthreads = 1;
g.cpus = 1;
g.tx_rate = 0;
while ( (ch = getopt(arc, argv,
"a:f:n:i:t:r:l:d:s:D:S:b:c:o:p:PT:w:Wv")) != -1) {
"a:f:n:i:t:r:l:d:s:D:S:b:c:o:p:PT:w:WvR:")) != -1) {
struct sf *fn;
switch(ch) {
@ -1325,6 +1426,10 @@ main(int arc, char **argv)
break;
case 'v':
verbose++;
break;
case 'R':
g.tx_rate = atoi(optarg);
break;
}
}
@ -1473,6 +1578,23 @@ main(int arc, char **argv)
g.options & OPT_MEMCPY ? " memcpy" : "",
g.options & OPT_COPY ? " copy" : "");
}
if (g.tx_rate == 0) {
g.tx_period.tv_sec = 0;
g.tx_period.tv_nsec = 0;
} else if (g.tx_rate == 1) {
g.tx_period.tv_sec = 1;
g.tx_period.tv_nsec = 0;
} else {
g.tx_period.tv_sec = 0;
g.tx_period.tv_nsec = (1e9 / g.tx_rate) * g.burst;
if (g.tx_period.tv_nsec > 1000000000) {
g.tx_period.tv_sec = g.tx_period.tv_nsec / 1000000000;
g.tx_period.tv_nsec = g.tx_period.tv_nsec % 1000000000;
}
}
D("Sending %d packets every %d.%09d ns",
g.burst, (int)g.tx_period.tv_sec, (int)g.tx_period.tv_nsec);
/* Wait for PHY reset. */
D("Wait %d secs for phy reset", wait_link);
sleep(wait_link);