Add tcpp -- TCP parallelism microbenchmark.

This tool creates large numbers of TCP connections, each of which will
transmit a fixed amount of data, between client and server hosts.  tcpp can
use multiple workers (typically up to the number of hardware cores), and can
use multiple source IPs in order to use an expanded port/IP 4-tuple space to
avoid problems from reusing 4-tuples too quickly.  Aggregate bandwidth use
will be reported after a client run.

While by no means a perfect tool, it has proven quite useful in generating
and optimizing TCP stack lock contention by easily generating high-intensity
workloads.  It also proves surprisingly good at finding device driver bugs.
This commit is contained in:
Robert Watson 2009-03-10 14:52:17 +00:00
parent 062ef8a5f8
commit b860cb2652
7 changed files with 1098 additions and 0 deletions

View File

@ -0,0 +1,9 @@
# $FreeBSD$
PROG= tcpp
INCS= tcpp.h
NO_MAN=
SRCS= tcpp.c tcpp_client.c tcpp_server.c tcpp_util.c
WARNS= 3
.include <bsd.prog.mk>

View File

@ -0,0 +1,99 @@
tcpp -- Parallel TCP Exercise Tool
This is a new tool, and is rife with bugs. However, it appears to create
even more problems for device drivers and the kernel, so that's OK.
This tool generates large numbers of TCP connections and stuffs lots of data
into them. One binary encapsulates both a client and a server. Each of the
client and the server generates a certain number of worker processes, each of
which in turn uses its own TCP port. The number of server processes must be
>= the number of client processes, or some of the ports required by the
client won't have a listener. The client then proceeds to make connections
and send data to the server. Each worker multiplexes many connections at
once, up to a maximum parallelism limit. The client can use one or many IP
addresses, in order to make more 4-tuples available for testing, and will
automatically spread the load of new connections across available source
addresses.
You will need to retune your TCP stack for high volume, see Configuration
Notes below.
The server has very little to configure, use the following command line
flags:
-s Select server mode
-p <numprocs> Number of workers, should be >= client -p arg
-r <baseport> Non-default base TCP port, should match client
-T Print CPU usage every ten seconds
-m <maxconnectionsperproc> Maximum simultaneous connections/proc, should
be >= client setting.
Typical use:
./tcpp -s -p 4 -m 1000000
This selects server mode, four workers, and at most 1 million TCP connections
per worker at a time.
The client has more to configure, with the following flags:
-c <remoteIP> Select client mode, and specific dest IP
-C Print connections/second instead of GBps
-M <localIPcount> Number of sequential local IPs to use; req. -l
-T Include CPU use summary in stats at end of run
-b <bytespertcp> Data bytes per connection
-l <localIPbase> Starting local IP address to bind
-m <maxtcpsatonce> Max simultaneous conn/worker (see server -m)
-p <numprocs> Number of workers, should be <= server -p
-r <baseport> Non-default base TCP port, should match server
-t <tcpsperproc> How many connections to use per worker
Typical use:
./tcpp -c 192.168.100.201 -p 4 -t 100000 -m 10000 -b 100000 \
-l 192.168.100.101 -M 4
This creates four workers, each of which will (over its lifetime) set up and
use 100,000 TCP connections carrying 100K of data, up to 10,000 simultaneous
connection at any given moment. tcpp will use four source IP addresses,
starting with 192.168.100.101, and all connections will be to the single
destination IP of 192.168.100.201.
Having (p) <= the number of cores is advisable. When multiple IPs are used
on the client, they will be sequential starting with the localIPbase set with
-l.
Known Issues
------------
The bandwidth estimate doesn't handle failures well. It also has serious
rounding errors and probably conceptual problems.
It's not clear that kevent() is "fair" to multiple connections.
Rather than passing the length for each connection, we might want to pass
it once with a control connection up front. On the other hand, the server
is quite dumb right now, so we could take advantage of this to do size
mixes.
Configuration Notes
-------------------
In my testing, I use sysctl.conf entries of:
net.inet.ip.portrange.first=100
kern.ipc.maxsockets=1000000
net.inet.tcp.maxtcptw=3000000
kern.ipc.somaxconn=49152
# For running !multiq, do this before loading the driver:
kenv hw.cxgb.singleq="1"
kldload if_cxgb
# Consider turning off TSO and/or adjusting the MTU for some scenarios:
ifconfig cxgb0 -tso
ifconfig cxgb0 mtu 1500
$FreeBSD$

View File

@ -0,0 +1,204 @@
/*-
* Copyright (c) 2008-2009 Robert N. M. Watson
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* $FreeBSD$
*/
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <err.h>
#include <getopt.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sysexits.h>
#include <unistd.h>
#include "tcpp.h"
#define BYTES_DEFAULT 10*1024*1024 /* Data per connection. */
#define MAXTCPS_DEFAULT 32 /* Number of TCPs at a time per proc. */
#define PROCS_DEFAULT 1 /* Processes used in run. */
#define TCPS_DEFAULT 1 /* Number of connections per process. */
#define BASEPORT_DEFAULT 10000
struct sockaddr_in remoteip; /* Base target address. */
struct sockaddr_in localipbase; /* Base local address, if -l. */
int cflag, lflag, mflag, pflag, sflag, tflag, Cflag, Mflag, Tflag;
uint64_t bflag;
u_short rflag;
static void
usage(void)
{
fprintf(stderr, "client: tcpp"
" -c remoteIP"
" [-CT]"
" [-M localIPcount]"
" [-l localIPbase]"
" [-b bytespertcp]"
" [-m maxtcpsatonce]"
"\n"
"\t"
" [-p procs]"
" [-t tcpsperproc]"
" [-r baseport]"
"\n");
fprintf(stderr, "server: tcpp"
" -s"
" [-T]"
" [-l localIPbase]"
" [-m maxtcpsatonce]"
" [-p procs]"
" [-r baseport]"
"\n");
exit(EX_USAGE);
}
int
main(int argc, char *argv[])
{
long long ll;
char *dummy;
int ch;
bzero(&localipbase, sizeof(localipbase));
localipbase.sin_len = sizeof(localipbase);
localipbase.sin_family = AF_INET;
localipbase.sin_addr.s_addr = htonl(INADDR_ANY); /* Default. */
localipbase.sin_port = htons(0); /* Default. */
bzero(&remoteip, sizeof(remoteip));
remoteip.sin_len = sizeof(remoteip);
remoteip.sin_family = AF_INET;
remoteip.sin_addr.s_addr = htonl(INADDR_LOOPBACK); /* Default. */
remoteip.sin_port = htons(0); /* Default. */
bflag = BYTES_DEFAULT;
mflag = MAXTCPS_DEFAULT;
pflag = PROCS_DEFAULT;
rflag = BASEPORT_DEFAULT;
tflag = TCPS_DEFAULT;
Mflag = 1;
while ((ch = getopt(argc, argv, "b:c:l:m:p:r:st:CM:T")) != -1) {
switch (ch) {
case 'b':
ll = strtoll(optarg, &dummy, 10);
if (*dummy != '\0' || ll <= 0)
usage();
bflag = ll;
break;
case 'c':
cflag++;
if (inet_aton(optarg, &remoteip.sin_addr) != 1)
err(-1, "inet_aton: %s", optarg);
break;
case 'l':
lflag++;
if (inet_aton(optarg, &localipbase.sin_addr) != 1)
err(-1, "inet_aton: %s", optarg);
break;
case 'm':
ll = strtoll(optarg, &dummy, 10);
if (*dummy != '\0' || ll <= 0)
usage();
mflag = ll;
break;
case 'p':
ll = strtoll(optarg, &dummy, 10);
if (*dummy != '\0' || ll <= 0)
usage();
pflag = ll;
break;
case 'r':
ll = strtol(optarg, &dummy, 10);
if (*dummy != '\0' || ll < 1 || ll > 65535)
usage();
rflag = ll;
break;
case 's':
sflag++;
break;
case 't':
ll = strtoll(optarg, &dummy, 10);
if (*dummy != '\0' || ll <= 0)
usage();
tflag = ll;
break;
case 'C':
Cflag++;
break;
case 'M':
ll = strtoll(optarg, &dummy, 10);
if (*dummy != '\0' || ll <= 1)
usage();
Mflag = ll;
break;
case 'T':
Tflag++;
break;
default:
usage();
}
}
/* Exactly one of client and server. */
if (cflag > 1 || sflag > 1)
usage();
if ((cflag && sflag) || (!cflag && !sflag))
usage();
/* If Mflag is specified, we must have the lflag for a local IP. */
if (Mflag > 1 && !lflag)
usage();
/* Several flags are valid only on the client, disallow if server. */
if (sflag && (Cflag || Mflag > 1))
usage();
if (cflag)
tcpp_client();
else
tcpp_server();
exit(0);
}

View File

@ -0,0 +1,52 @@
/*-
* Copyright (c) 2008-2009 Robert N. M. Watson
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* $FreeBSD$
*/
#ifndef TCPP_H
#define TCPP_H
extern struct sockaddr_in localipbase, remoteip;
extern int cflag, lflag, mflag, pflag, sflag, tflag;
extern int Cflag, Iflag, Mflag, Tflag;
extern uint64_t bflag;
extern u_short rflag;
#define TCPP_MAGIC 0x84e812f7
struct tcpp_header {
u_int32_t th_magic;
u_int64_t th_len;
} __packed;
void tcpp_client(void);
void tcpp_header_encode(struct tcpp_header *thp);
void tcpp_header_decode(struct tcpp_header *thp);
void tcpp_server(void);
#define SYSCTLNAME_CPUS "kern.smp.cpus"
#define SYSCTLNAME_CPTIME "kern.cp_time"
#endif /* TCPP_H */

View File

@ -0,0 +1,346 @@
/*-
* Copyright (c) 2008-2009 Robert N. M. Watson
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* $FreeBSD$
*/
#include <sys/types.h>
#include <sys/event.h>
#include <sys/resource.h>
#include <sys/sched.h>
#include <sys/socket.h>
#include <sys/sysctl.h>
#include <sys/time.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "tcpp.h"
#define min(x, y) (x < y ? x : y)
#define timespecsub(vvp, uvp) \
do { \
(vvp)->tv_sec -= (uvp)->tv_sec; \
(vvp)->tv_nsec -= (uvp)->tv_nsec; \
if ((vvp)->tv_nsec < 0) { \
(vvp)->tv_sec--; \
(vvp)->tv_nsec += 1000000000; \
} \
} while (0)
/*
* Gist of each client worker: build up to mflag connections at a time, and
* pump data in to them somewhat fairly until tflag connections have been
* completed.
*/
#define CONNECTION_MAGIC 0x87a3f56e
struct connection {
uint32_t conn_magic; /* Just magic. */
int conn_fd;
struct tcpp_header conn_header; /* Header buffer. */
u_int conn_header_sent; /* Header bytes sent. */
u_int64_t conn_data_sent; /* Data bytes sent.*/
};
static u_char buffer[256 * 1024]; /* Buffer to send. */
static pid_t *pid_list;
static int kq;
static int started; /* Number started so far. */
static int finished; /* Number finished so far. */
static int counter; /* IP number offset. */
static struct connection *
tcpp_client_newconn(void)
{
struct sockaddr_in sin;
struct connection *conn;
struct kevent kev;
int fd, i;
/*
* Spread load over available IPs, roating through them as we go. No
* attempt to localize IPs to particular workers.
*/
sin = localipbase;
sin.sin_addr.s_addr = htonl(ntohl(localipbase.sin_addr.s_addr) +
(counter++ % Mflag));
fd = socket(PF_INET, SOCK_STREAM, 0);
if (fd < 0)
err(-1, "socket");
if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0)
err(-1, "fcntl");
i = 1;
if (setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &i, sizeof(i)) < 0)
err(-1, "setsockopt");
#if 0
i = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i)) < 0)
err(-1, "setsockopt");
#endif
if (lflag) {
if (bind(fd, (struct sockaddr *)&sin, sizeof(sin)) < 0)
err(-1, "bind");
}
if (connect(fd, (struct sockaddr *)&remoteip, sizeof(remoteip)) < 0 &&
errno != EINPROGRESS)
err(-1, "connect");
conn = malloc(sizeof(*conn));
if (conn == NULL)
return (NULL);
bzero(conn, sizeof(*conn));
conn->conn_magic = CONNECTION_MAGIC;
conn->conn_fd = fd;
conn->conn_header.th_magic = TCPP_MAGIC;
conn->conn_header.th_len = bflag;
tcpp_header_encode(&conn->conn_header);
EV_SET(&kev, fd, EVFILT_WRITE, EV_ADD, 0, 0, conn);
if (kevent(kq, &kev, 1, NULL, 0, NULL) < 0)
err(-1, "newconn kevent");
started++;
return (conn);
}
static void
tcpp_client_closeconn(struct connection *conn)
{
close(conn->conn_fd);
bzero(conn, sizeof(*conn));
free(conn);
finished++;
}
static void
tcpp_client_handleconn(struct kevent *kev)
{
struct connection *conn;
ssize_t len;
conn = kev->udata;
if (conn->conn_magic != CONNECTION_MAGIC)
errx(-1, "tcpp_client_handleconn: magic");
if (conn->conn_header_sent < sizeof(conn->conn_header)) {
len = write(conn->conn_fd, ((u_char *)&conn->conn_header) +
conn->conn_header_sent, sizeof(conn->conn_header) -
conn->conn_header_sent);
if (len < 0) {
tcpp_client_closeconn(conn);
err(-1, "tcpp_client_handleconn: header write");
}
if (len == 0) {
tcpp_client_closeconn(conn);
errx(-1, "tcpp_client_handleconn: header write "
"premature EOF");
}
conn->conn_header_sent += len;
} else {
len = write(conn->conn_fd, buffer, min(sizeof(buffer),
bflag - conn->conn_data_sent));
if (len < 0) {
tcpp_client_closeconn(conn);
err(-1, "tcpp_client_handleconn: data write");
}
if (len == 0) {
tcpp_client_closeconn(conn);
errx(-1, "tcpp_client_handleconn: data write: "
"premature EOF");
}
conn->conn_data_sent += len;
if (conn->conn_data_sent >= bflag) {
/*
* All is well.
*/
tcpp_client_closeconn(conn);
}
}
}
static void
tcpp_client_worker(int workernum)
{
struct kevent *kev_array;
int i, numevents, kev_bytes;
#if defined(CPU_SETSIZE) && 0
cpu_set_t mask;
int ncpus;
size_t len;
len = sizeof(ncpus);
if (sysctlbyname(SYSCTLNAME_CPUS, &ncpus, &len, NULL, 0) < 0)
err(-1, "sysctlbyname: %s", SYSCTLNAME_CPUS);
if (len != sizeof(ncpus))
errx(-1, "sysctlbyname: %s: len %jd", SYSCTLNAME_CPUS,
(intmax_t)len);
CPU_ZERO(&mask);
CPU_SET(workernum % ncpus, &mask);
if (sched_setaffinity(0, CPU_SETSIZE, &mask) < 0)
err(-1, "sched_setaffinity");
#endif
setproctitle("tcpp_client %d", workernum);
/*
* Add the worker number to the remote port.
*/
remoteip.sin_port = htons(rflag + workernum);
kev_bytes = sizeof(*kev_array) * mflag;
kev_array = malloc(kev_bytes);
if (kev_array == NULL)
err(-1, "malloc");
bzero(kev_array, kev_bytes);
kq = kqueue();
if (kq < 0)
err(-1, "kqueue");
while (finished < tflag) {
while ((started - finished < mflag) && (started < tflag))
(void)tcpp_client_newconn();
numevents = kevent(kq, NULL, 0, kev_array, mflag, NULL);
if (numevents < 0)
err(-1, "kevent");
if (numevents > mflag)
errx(-1, "kevent: %d", numevents);
for (i = 0; i < numevents; i++)
tcpp_client_handleconn(&kev_array[i]);
}
/* printf("Worker %d done - %d finished\n", workernum, finished); */
}
void
tcpp_client(void)
{
struct timespec ts_start, ts_finish;
long cp_time_start[CPUSTATES], cp_time_finish[CPUSTATES];
long ticks;
size_t size;
pid_t pid;
int i, failed, status;
pid_list = malloc(sizeof(*pid_list) * pflag);
if (pid_list == NULL)
err(-1, "malloc pid_list");
bzero(pid_list, sizeof(*pid_list) * pflag);
/*
* Start workers.
*/
size = sizeof(cp_time_start);
if (sysctlbyname(SYSCTLNAME_CPTIME, &cp_time_start, &size, NULL, 0)
< 0)
err(-1, "sysctlbyname: %s", SYSCTLNAME_CPTIME);
if (clock_gettime(CLOCK_REALTIME, &ts_start) < 0)
err(-1, "clock_gettime");
for (i = 0; i < pflag; i++) {
pid = fork();
if (pid < 0) {
warn("fork");
for (i = 0; i < pflag; i++) {
if (pid_list[i] != 0)
(void)kill(pid_list[i], SIGKILL);
}
exit(-1);
}
if (pid == 0) {
tcpp_client_worker(i);
exit(0);
}
pid_list[i] = pid;
}
/*
* GC workers.
*/
failed = 0;
for (i = 0; i < pflag; i++) {
if (pid_list[i] != 0) {
while (waitpid(pid_list[i], &status, 0) != pid_list[i]);
if (WEXITSTATUS(status) != 0)
failed = 1;
}
}
if (clock_gettime(CLOCK_REALTIME, &ts_finish) < 0)
err(-1, "clock_gettime");
size = sizeof(cp_time_finish);
if (sysctlbyname(SYSCTLNAME_CPTIME, &cp_time_finish, &size, NULL, 0)
< 0)
err(-1, "sysctlbyname: %s", SYSCTLNAME_CPTIME);
timespecsub(&ts_finish, &ts_start);
if (failed)
errx(-1, "Too many errors");
printf("%jd bytes transferred in %jd.%09jd seconds\n",
(bflag * tflag * pflag), (intmax_t)ts_finish.tv_sec,
(intmax_t)(ts_finish.tv_nsec));
if (Tflag)
printf("%d procs ", pflag);
if (Cflag) {
printf("%f cps%s", (double)(pflag * tflag)/
(ts_finish.tv_sec + ts_finish.tv_nsec * 1e-9),
Tflag ? " " : "\n");
} else {
printf("%f Gbps%s", (double)(bflag * tflag * pflag * 8) /
(ts_finish.tv_sec + ts_finish.tv_nsec * 1e-9) * 1e-9,
Tflag ? " " : "\n");
}
if (Tflag) {
ticks = 0;
for (i = 0; i < CPUSTATES; i++) {
cp_time_finish[i] -= cp_time_start[i];
ticks += cp_time_finish[i];
}
printf("user%% %lu nice%% %lu sys%% %lu intr%% %lu "
"idle%% %lu\n",
(100 * cp_time_finish[CP_USER]) / ticks,
(100 * cp_time_finish[CP_NICE]) / ticks,
(100 * cp_time_finish[CP_SYS]) / ticks,
(100 * cp_time_finish[CP_INTR]) / ticks,
(100 * cp_time_finish[CP_IDLE]) / ticks);
}
}

View File

@ -0,0 +1,340 @@
/*-
* Copyright (c) 2008-2009 Robert N. M. Watson
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* $FreeBSD$
*/
#include <sys/types.h>
#include <sys/endian.h>
#include <sys/event.h>
#include <sys/resource.h>
#include <sys/sched.h>
#include <sys/socket.h>
#include <sys/sysctl.h>
#include <sys/time.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <err.h>
#include <fcntl.h>
#include <inttypes.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "tcpp.h"
/*
* Server side -- create a pool of processes, each listening on its own TCP
* port number for new connections. The first 8 bytes of each connection
* will be a network byte order length, then there will be that number of
* bytes of data. We use non-blocking sockets with kqueue to to avoid the
* overhead of threading or more than one process per processor, which makes
* things a bit awkward when dealing with data we care about. As such, we
* read into a small character buffer which we then convert to a length once
* we have all the data.
*/
#define CONNECTION_MAGIC 0x6392af27
struct connection {
uint32_t conn_magic; /* Just magic. */
int conn_fd;
struct tcpp_header conn_header; /* Header buffer. */
u_int conn_header_len; /* Bytes so far. */
u_int64_t conn_data_len; /* How much to sink. */
u_int64_t conn_data_received; /* How much so far. */
};
static pid_t *pid_list;
static int kq;
static struct connection *
tcpp_server_newconn(int listen_fd)
{
struct connection *conn;
struct kevent kev;
int fd;
fd = accept(listen_fd, NULL, NULL);
if (fd < 0) {
warn("accept");
return (NULL);
}
if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0)
err(-1, "fcntl");
conn = malloc(sizeof(*conn));
if (conn == NULL)
return (NULL);
bzero(conn, sizeof(*conn));
conn->conn_magic = CONNECTION_MAGIC;
conn->conn_fd = fd;
/*
* Register to read on the socket, and set our conn pointer as the
* udata so we can find it quickly in the future.
*/
EV_SET(&kev, fd, EVFILT_READ, EV_ADD, 0, 0, conn);
if (kevent(kq, &kev, 1, NULL, 0, NULL) < 0)
err(-1, "kevent");
return (conn);
}
static void
tcpp_server_closeconn(struct connection *conn)
{
/*
* Kqueue cleans up after itself once we close the socket, and since
* we are processing only one kevent at a time, we don't need to
* worry about watching out for future kevents referring to it.
*
* ... right?
*/
close(conn->conn_fd);
bzero(conn, sizeof(*conn));
free(conn);
}
static u_char buffer[256*1024]; /* Buffer in which to sink data. */
static void
tcpp_server_handleconn(struct kevent *kev)
{
struct connection *conn;
ssize_t len;
conn = kev->udata;
if (conn->conn_magic != CONNECTION_MAGIC)
errx(-1, "tcpp_server_handleconn: magic");
if (conn->conn_header_len < sizeof(conn->conn_header)) {
len = read(conn->conn_fd,
((u_char *)&conn->conn_header) + conn->conn_header_len,
sizeof(conn->conn_header) - conn->conn_header_len);
if (len < 0) {
warn("tcpp_server_handleconn: header read");
tcpp_server_closeconn(conn);
return;
}
if (len == 0) {
warnx("tcpp_server_handleconn: header premature eof");
tcpp_server_closeconn(conn);
return;
}
conn->conn_header_len += len;
if (conn->conn_header_len == sizeof(conn->conn_header)) {
tcpp_header_decode(&conn->conn_header);
if (conn->conn_header.th_magic != TCPP_MAGIC) {
warnx("tcpp_server_handleconn: bad magic");
tcpp_server_closeconn(conn);
return;
}
}
} else {
/*
* Drain up to a buffer from the connection, so that we pay
* attention to other connections too.
*/
len = read(conn->conn_fd, buffer, sizeof(buffer));
if (len < 0) {
warn("tcpp_server_handleconn: data bad read");
tcpp_server_closeconn(conn);
return;
}
if (len == 0 && conn->conn_data_received <
conn->conn_header.th_len) {
warnx("tcpp_server_handleconn: data premature eof");
tcpp_server_closeconn(conn);
return;
}
conn->conn_data_received += len;
if (conn->conn_data_received > conn->conn_header.th_len) {
warnx("tcpp_server_handleconn: too much data");
tcpp_server_closeconn(conn);
return;
}
if (conn->conn_data_received == conn->conn_header.th_len) {
/*
* All is well.
*/
tcpp_server_closeconn(conn);
return;
}
}
}
static void
tcpp_server_worker(int workernum)
{
int i, listen_sock, numevents;
struct kevent kev, *kev_array;
int kev_bytes;
#if defined(CPU_SETSIZE) && 0
cpu_set_t mask;
int ncpus;
ssize_t len;
len = sizeof(ncpus);
if (sysctlbyname(SYSCTLNAME_CPUS, &ncpus, &len, NULL, 0) < 0)
err(-1, "sysctlbyname: %s", SYSCTLNAME_CPUS);
if (len != sizeof(ncpus))
errx(-1, "sysctlbyname: %s: len %jd", SYSCTLNAME_CPUS,
(intmax_t)len);
CPU_ZERO(&mask);
CPU_SET(workernum % ncpus, &mask);
if (sched_setaffinity(0, CPU_SETSIZE, &mask) < 0)
err(-1, "sched_setaffinity");
#endif
setproctitle("tcpp_server %d", workernum);
/* Allow an extra kevent for the listen socket. */
kev_bytes = sizeof(*kev_array) * (mflag + 1);
kev_array = malloc(kev_bytes);
if (kev_array == NULL)
err(-1, "malloc");
bzero(kev_array, kev_bytes);
/* XXXRW: Want to set and pin the CPU here. */
/*
* Add the worker number to the local port.
*/
localipbase.sin_port = htons(rflag + workernum);
listen_sock = socket(PF_INET, SOCK_STREAM, 0);
if (listen_sock < 0)
err(-1, "socket");
i = 1;
if (setsockopt(listen_sock, SOL_SOCKET, SO_NOSIGPIPE, &i, sizeof(i))
< 0)
err(-1, "setsockopt");
i = 1;
if (setsockopt(listen_sock, SOL_SOCKET, SO_REUSEPORT, &i, sizeof(i))
< 0)
err(-1, "setsockopt");
if (bind(listen_sock, (struct sockaddr *)&localipbase,
sizeof(localipbase)) < 0)
err(-1, "bind");
if (listen(listen_sock, 16384))
err(-1, "listen");
if (fcntl(listen_sock, F_SETFL, O_NONBLOCK) < 0)
err(-1, "fcntl");
kq = kqueue();
if (kq < 0)
err(-1, "kqueue");
EV_SET(&kev, listen_sock, EVFILT_READ, EV_ADD, 0, 0, NULL);
if (kevent(kq, &kev, 1, NULL, 0, NULL) < 0)
err(-1, "kevent");
while ((numevents = kevent(kq, NULL, 0, kev_array, mflag + 1, NULL))
> 0) {
for (i = 0; i < numevents; i++) {
if (kev_array[i].ident == (u_int)listen_sock)
(void)tcpp_server_newconn(listen_sock);
else
tcpp_server_handleconn(&kev_array[i]);
}
}
printf("Worker %d done\n", workernum);
}
void
tcpp_server(void)
{
long cp_time_last[CPUSTATES], cp_time_now[CPUSTATES], ticks;
size_t size;
pid_t pid;
int i;
pid_list = malloc(sizeof(*pid_list) * pflag);
if (pid_list == NULL)
err(-1, "malloc pid_list");
bzero(pid_list, sizeof(*pid_list) * pflag);
/*
* Start workers.
*/
for (i = 0; i < pflag; i++) {
pid = fork();
if (pid < 0) {
warn("fork");
for (i = 0; i < pflag; i++) {
if (pid_list[i] != 0)
(void)kill(pid_list[i], SIGKILL);
}
exit(-1);
}
if (pid == 0) {
tcpp_server_worker(i);
exit(0);
}
pid_list[i] = pid;
}
if (Tflag) {
size = sizeof(cp_time_last);
if (sysctlbyname(SYSCTLNAME_CPTIME, &cp_time_last, &size,
NULL, 0) < 0)
err(-1, "sysctlbyname: %s", SYSCTLNAME_CPTIME);
while (1) {
sleep(10);
size = sizeof(cp_time_last);
if (sysctlbyname(SYSCTLNAME_CPTIME, &cp_time_now,
&size, NULL, 0) < 0)
err(-1, "sysctlbyname: %s",
SYSCTLNAME_CPTIME);
ticks = 0;
for (i = 0; i < CPUSTATES; i++) {
cp_time_last[i] = cp_time_now[i] -
cp_time_last[i];
ticks += cp_time_last[i];
}
printf("user%% %lu nice%% %lu sys%% %lu intr%% %lu "
"idle%% %lu\n",
(100 * cp_time_last[CP_USER]) / ticks,
(100 * cp_time_last[CP_NICE]) / ticks,
(100 * cp_time_last[CP_SYS]) / ticks,
(100 * cp_time_last[CP_INTR]) / ticks,
(100 * cp_time_last[CP_IDLE]) / ticks);
bcopy(cp_time_now, cp_time_last, sizeof(cp_time_last));
}
}
/*
* GC workers.
*/
for (i = 0; i < pflag; i++) {
if (pid_list[i] != 0) {
while (waitpid(pid_list[i], NULL, 0) != pid_list[i]);
}
}
}

View File

@ -0,0 +1,48 @@
/*-
* Copyright (c) 2008 Robert N. M. Watson
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* $FreeBSD$
*/
#include <sys/types.h>
#include <sys/endian.h>
#include "tcpp.h"
void
tcpp_header_encode(struct tcpp_header *thp)
{
thp->th_magic = htobe32(thp->th_magic);
thp->th_len = htobe64(thp->th_len);
}
void
tcpp_header_decode(struct tcpp_header *thp)
{
thp->th_magic = be32toh(thp->th_magic);
thp->th_len = be64toh(thp->th_len);
}