From 6750270f2334a9629aed8906d040da55a9c59185 Mon Sep 17 00:00:00 2001 From: Luigi Rizzo Date: Sun, 23 Dec 2012 23:03:45 +0000 Subject: [PATCH] small cleanup of the code, and add support for running multiple threads on each socket. --- tools/tools/netrate/netreceive/netreceive.c | 165 +++++++++++++++++--- 1 file changed, 144 insertions(+), 21 deletions(-) diff --git a/tools/tools/netrate/netreceive/netreceive.c b/tools/tools/netrate/netreceive/netreceive.c index 9300109e53a0..80be69374408 100644 --- a/tools/tools/netrate/netreceive/netreceive.c +++ b/tools/tools/netrate/netreceive/netreceive.c @@ -43,27 +43,158 @@ #define MAXSOCK 20 +#include +#include +#include /* clock_getres() */ + +static int round_to(int n, int l) +{ + return ((n + l - 1)/l)*l; +} + +/* + * Each socket uses multiple threads so the receiver is + * more efficient. A collector thread runs the stats. + */ +struct td_desc { + pthread_t td_id; + uint64_t count; /* rx counter */ + int fd; + char *buf; + int buflen; +}; + static void usage(void) { - fprintf(stderr, "netreceive [port]\n"); + fprintf(stderr, "netreceive port [nthreads]\n"); exit(-1); } +static __inline void +timespec_add(struct timespec *tsa, struct timespec *tsb) +{ + + tsa->tv_sec += tsb->tv_sec; + tsa->tv_nsec += tsb->tv_nsec; + if (tsa->tv_nsec >= 1000000000) { + tsa->tv_sec++; + tsa->tv_nsec -= 1000000000; + } +} + +static __inline void +timespec_sub(struct timespec *tsa, struct timespec *tsb) +{ + + tsa->tv_sec -= tsb->tv_sec; + tsa->tv_nsec -= tsb->tv_nsec; + if (tsa->tv_nsec < 0) { + tsa->tv_sec--; + tsa->tv_nsec += 1000000000; + } +} + +static void * +rx_body(void *data) +{ + struct td_desc *t = data; + struct pollfd fds; + int y; + + fds.fd = t->fd; + fds.events = POLLIN; + + for (;;) { + if (poll(&fds, 1, -1) < 0) + perror("poll on thread"); + if (!(fds.revents & POLLIN)) + continue; + for (;;) { + y = recv(t->fd, t->buf, t->buflen, MSG_DONTWAIT); + if (y < 0) + break; + t->count++; + } + } + return NULL; +} + +int +make_threads(struct td_desc **tp, int *s, int nsock, int nthreads) +{ + int i, si, nt = nsock * nthreads; + int lb = round_to(nt * sizeof (struct td_desc *), 64); + int td_len = round_to(sizeof(struct td_desc), 64); // cache align + char *m = calloc(1, lb + td_len * nt); + + printf("td len %d -> %d\n", (int)sizeof(struct td_desc) , td_len); + /* pointers plus the structs */ + if (m == NULL) { + perror("no room for pointers!"); + exit(1); + } + tp = (struct td_desc **)m; + m += lb; /* skip the pointers */ + for (si = i = 0; i < nt; i++, m += td_len) { + tp[i] = (struct td_desc *)m; + tp[i]->fd = s[si]; + if (++si == nsock) + si = 0; + if (pthread_create(&tp[i]->td_id, NULL, rx_body, tp[i])) { + perror("unable to create thread"); + exit(1); + } + } +} + +int +main_thread(struct td_desc **tp, int nsock, int nthreads) +{ + uint64_t c0, c1; + struct timespec now, then, delta; + /* now the parent collects and prints results */ + c0 = c1 = 0; + clock_gettime(CLOCK_REALTIME, &then); + fprintf(stderr, "start at %ld.%09ld\n", then.tv_sec, then.tv_nsec); + while (1) { + int i, nt = nsock * nthreads; + int64_t dn; + uint64_t pps; + + if (poll(NULL, 0, 500) < 0) + perror("poll"); + c0 = 0; + for (i = 0; i < nt; i++) { + c0 += tp[i]->count; + } + dn = c0 - c1; + clock_gettime(CLOCK_REALTIME, &now); + delta = now; + timespec_sub(&delta, &then); + then = now; + pps = dn; + pps = (pps * 1000000000) / (delta.tv_sec*1000000000 + delta.tv_nsec + 1); + fprintf(stderr, "%d pkts in %ld.%09ld ns %ld pps\n", + (int)dn, delta.tv_sec, delta.tv_nsec, (long)pps); + c1 = c0; + } +} + int main(int argc, char *argv[]) { struct addrinfo hints, *res, *res0; char *dummy, *packet; int port; - int error, v, i; + int error, v, nthreads = 1; + struct td_desc **tp; const char *cause = NULL; int s[MAXSOCK]; - struct pollfd fds[MAXSOCK]; int nsock; - if (argc != 2) + if (argc < 2) usage(); memset(&hints, 0, sizeof(hints)); @@ -74,6 +205,10 @@ main(int argc, char *argv[]) port = strtoul(argv[1], &dummy, 10); if (port < 1 || port > 65535 || *dummy != '\0') usage(); + if (argc > 2) + nthreads = strtoul(argv[2], &dummy, 10); + if (nthreads < 1 || nthreads > 64) + usage(); packet = malloc(65536); if (packet == NULL) { @@ -110,9 +245,6 @@ main(int argc, char *argv[]) continue; } (void) listen(s[nsock], 5); - fds[nsock].fd = s[nsock]; - fds[nsock].events = POLLIN; - nsock++; } if (nsock == 0) { @@ -121,21 +253,12 @@ main(int argc, char *argv[]) /*NOTREACHED*/ } - printf("netreceive listening on UDP port %d\n", (u_short)port); + printf("netreceive %d sockets x %d threads listening on UDP port %d\n", + nsock, nthreads, (u_short)port); + + make_threads(tp, s, nsock, nthreads); + main_thread(tp, nsock, nthreads); - while (1) { - if (poll(fds, nsock, -1) < 0) - perror("poll"); - for (i = 0; i < nsock; i++) { - if (fds[i].revents & POLLIN) { - if (recv(s[i], packet, 65536, 0) < 0) - perror("recv"); - } - if ((fds[i].revents &~ POLLIN) != 0) - perror("poll"); - } - } - /*NOTREACHED*/ freeaddrinfo(res0); }