ppd/dsmbr/dsmbr.cc
2023-04-03 20:13:54 -04:00

981 lines
26 KiB
C++
Executable File

#include <sys/types.h>
#include <sys/param.h>
#include <sys/cpuset.h>
#include <sys/event.h>
#include <sys/queue.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <bsock/bsock.h>
#include <netdb.h>
#include <openssl/err.h>
#include <openssl/ssl.h>
#include <pthread.h>
#include <pthread_np.h>
#include <semaphore.h>
#include <unistd.h>
#include "dmsg.hh"
#include "generator.hh"
#include "io.hh"
#include "logger.h"
#include "mod.h"
#include "msg.hh"
#include "util.h"
#include <atomic>
#include <cerrno>
#include <csignal>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <list>
#include <set>
#include <sstream>
#include <thread>
#include <unordered_map>
#include <vector>
LOGGER_VAR_DECL;

/* Limits on -O module arguments: number of pairs and per-key/value buffer size. */
static constexpr int MAX_MOD_ARGS{ 32 };
static constexpr int MAX_MOD_ARG_LEN{ 128 };
/* Maximum number of -M slave addresses. */
static constexpr int MAX_SLAVES{ 32 };
/* kevents drained per kevent() call. */
static constexpr int NEVENT{ 64 };

/* Control channel / connection setup. */
static constexpr int CTRL_PORT{ 15545 };     /* slave control listen port */
static constexpr int CTRL_TIMEOUT{ 3 };      /* seconds, SO_RCVTIMEO on server connections */
static constexpr int CTRL_BACKLOG{ 4096 };   /* listen() backlog of the slave control socket */

static constexpr int BSOCK_BUF_SZ{ 4096 };        /* bsock buffer and TLS read buffer size */
static constexpr int THRD_INIT_PARALLEL{ 2 };    /* workers allowed to initialize concurrently */
static constexpr int TIMER_MAGIC{ 0x123456 };     /* kqueue ident of main's per-second timer */
static constexpr int DEFAULT_SERVER_PORT{ 9898 };

/* Values held in options.state. */
static constexpr int STATE_WAITING{ 0 };
static constexpr int STATE_RUN{ 1 };
static constexpr int STATE_STOP{ 2 };
/*
 * One sample per completed request. Sizes are payload plus message header;
 * timestamps come from get_time_us().
 */
struct dsmbr_request_record {
	uint64_t recv_size;
	uint64_t send_size;
	uint64_t send_ts;
	uint64_t recv_ts;
};

/* Per-worker-thread state; `stats` owns the records it holds. */
struct dsmbr_thread_ctx {
	int tid;                                 /* logical worker index */
	int core;                                /* CPU the worker is pinned to */
	std::list<dsmbr_request_record *> stats; /* completed requests */
	pthread_t thrd;
};
/*
 * State of one client connection to the server under test.
 * Created by dsmbr_conn_create() and released by dsmbr_conn_free().
 * NOTE: the member `bsock` has the same name as its type, so the `struct`
 * keyword in its declaration is required and must not be dropped.
 */
struct dsmbr_conn {
Generator *ia_gen; /* interarrival generator; its output * S2US advances next_send */
SSL *ssl; /* nullptr when TLS is disabled */
char *ssl_readbuf; /* BSOCK_BUF_SZ-byte buffer, TLS only */
int conn_fd;
struct bsock *bsock;
struct ppd_bsock_io_ssl_ctx bsock_ctx; /* bsock io context, used only with TLS */
void *m_ctx; /* module per-connection context from conn_init_cb */
uint64_t next_send; /* get_time_us() time at which the next request is due */
std::list<struct dsmbr_request_record *> req_in_flight; /* sent, awaiting response, FIFO */
};
/*
 * Master-side record of one slave: the control socket, followed by the
 * totals the slave reports in CTRL_STAT (run time in us, bytes sent,
 * bytes received, completed requests), in that order.
 */
struct dsmbr_slave {
	int conn_fd;
	uint64_t time, send_sz, recv_sz, reqs;
};
/*
 * Command-line configuration plus process-wide state shared with the worker
 * threads. The global `options` below uses C++20 designated initializers,
 * which must list members in this declaration order.
 */
struct dsmbr_options {
int verbose; /* -v count, forwarded to logger_set_verbosity() */
char server_ip[64]; /* -s, filled by nslookup() */
int server_port; /* -p */
int conn_per_thread; /* -c */
int target_qps; /* -q, split over conn_per_thread * CPU_COUNT(cpuset) connections */
int depth; /* -d, max requests in flight per connection */
int warmup; /* -T, seconds */
int duration; /* -t, seconds */
char ia_dist[128]; /* -i, name passed to createGenerator() */
char output_file[128]; /* -o */
cpuset_t cpuset; /* -a, one worker thread per CPU in the set */
char module_path[128]; /* -m */
char *mod_argk[MAX_MOD_ARGS]; /* -O keys, new[]'d in dsmbr_getopt */
char *mod_argv[MAX_MOD_ARGS]; /* -O values, new[]'d in dsmbr_getopt */
int mod_argc;
int enable_tls; /* NOTE(review): no option in dsmbr_getopt sets this */
int master_mode; /* set by -M */
int slave_mode; /* -S */
// master mode
char slave_ips[MAX_SLAVES][64];
int num_slaves;
// global states
std::atomic<int> state = STATE_WAITING; /* STATE_WAITING -> STATE_RUN -> STATE_STOP, written by main */
sem_t worker_init_sem; /* throttles worker initialization, THRD_INIT_PARALLEL at a time */
std::atomic<int> worker_init_comp = 0; /* workers that finished initialization */
SSL_CTX *ssl_ctx;
ppd_mod_info *m_info;
void *m_global_ctx;
};
/*
 * Process-wide configuration with defaults; dsmbr_getopt() overrides fields.
 * Members not named here take their default member initializer or are
 * zero-initialized. CPUSET_T_INITIALIZER(1) presumably selects CPU 0 only —
 * confirm against sys/cpuset.h.
 */
static struct dsmbr_options options = { .verbose = 0,
.server_ip = "127.0.0.1",
.server_port = DEFAULT_SERVER_PORT,
.conn_per_thread = 1,
.target_qps = 0,
.depth = 1,
.warmup = 0,
.duration = 5,
.ia_dist = "fb_ia",
.output_file = "samples.txt",
.cpuset = CPUSET_T_INITIALIZER(1),
.module_path = { 0 },
.mod_argc = 0,
.enable_tls = 0,
.master_mode = 0,
.slave_mode = 0,
.num_slaves = 0,
.ssl_ctx = nullptr,
.m_info = nullptr,
.m_global_ctx = nullptr };
/*
 * Log the effective configuration at verbose (V) level.
 * Builds the whole report in a stringstream and emits it in a single log call.
 */
static void
dsmbr_dump_options()
{
	std::stringstream ss;
	/* '\n' rather than std::endl: flushing a stringstream per line is pointless */
	ss << "Configuration:\n"
	   << " server: " << options.server_ip << '\n'
	   << " port: " << options.server_port << '\n'
	   << " connections per thread: " << options.conn_per_thread << '\n'
	   << " enable TLS: " << options.enable_tls << '\n'
	   << " target qps: " << options.target_qps << '\n'
	   << " number of threads: " << CPU_COUNT(&options.cpuset) << '\n'
	   << " verbose: " << options.verbose << '\n'
	   << " master mode: " << options.master_mode << '\n'
	   << " slave mode: " << options.slave_mode << '\n'
	   << " module: " << options.module_path << '\n'
	   << " run time: " << options.duration << '\n'
	   << " warmup time: " << options.warmup << '\n'
	   << " interarrival dist: " << options.ia_dist << '\n'
	   << " output file: " << options.output_file << '\n'
	   << " depth: " << options.depth << '\n'
	   << " slaves:";
	for (int i = 0; i < options.num_slaves; i++) {
		ss << " " << options.slave_ips[i];
	}
	ss << '\n';
	V("%s", ss.str().c_str());
}
/*
 * Create an SSL object from options.ssl_ctx on the already-connected socket
 * conn_fd and run the client side of the TLS handshake. Failures are reported
 * through E().
 *
 * @param conn_fd  connected TCP socket.
 * @return the SSL object (freed later by dsmbr_conn_free()).
 */
static SSL *
dsmbr_tls_handshake_client(int conn_fd)
{
	SSL *ssl;
	int r;
	ssl = SSL_new(options.ssl_ctx);
	if (ssl == NULL) {
		/* ERR_get_error() returns unsigned long */
		E("SSL_new() failed: %lu\n", ERR_get_error());
	}
	if (SSL_set_fd(ssl, conn_fd) == 0) {
		E("SSL_set_fd() failed: %lu\n", ERR_get_error());
	}
	if ((r = SSL_connect(ssl)) <= 0) {
		/*
		 * SSL_get_error() inspects the error queue, so it must run before
		 * ERR_get_error() pops it; it also classifies failures (e.g.
		 * SSL_ERROR_SYSCALL) that leave the queue empty.
		 */
		int ssl_err = SSL_get_error(ssl, r);
		E("SSL_connect() failed: %d (%lu)\n", ssl_err, ERR_get_error());
	}
	return ssl;
}
/*
 * Create the process-wide client SSL_CTX, pinned to TLS 1.2, and store it in
 * options.ssl_ctx. Failures are reported through E().
 */
static void
dsmbr_create_tls_context()
{
	SSL_CTX *ctx;
	ctx = SSL_CTX_new(TLS_client_method());
	if (!ctx) {
		E("SSL_CTX_new() failed: %lu\n", ERR_get_error());
	}
	/* both setters return 1 on success and 0 on failure */
	if (SSL_CTX_set_min_proto_version(ctx, TLS1_2_VERSION) != 1) {
		E("SSL_CTX_set_min_proto_version() failed: %lu\n", ERR_get_error());
	}
	if (SSL_CTX_set_max_proto_version(ctx, TLS1_2_VERSION) != 1) {
		E("SSL_CTX_set_max_proto_version() failed: %lu\n", ERR_get_error());
	}
	options.ssl_ctx = ctx;
}
/*
 * Open one TCP connection to options.server_ip:server_port and optionally
 * run a TLS handshake. Wrap the socket in a bsock, attach an interarrival
 * generator, and create the module's per-connection context.
 *
 * @param thrd_ctx  module per-thread context, forwarded to conn_init_cb.
 * @return a new connection; release with dsmbr_conn_free().
 */
static struct dsmbr_conn *
dsmbr_conn_create(void *thrd_ctx)
{
	/* zero the whole address first: sin_zero (and sin_len on BSD) must not be garbage */
	struct sockaddr_in server_addr;
	memset(&server_addr, 0, sizeof(server_addr));
	server_addr.sin_family = AF_INET;
	server_addr.sin_port = htons(options.server_port);
	server_addr.sin_addr.s_addr = inet_addr(options.server_ip);
	int enable = 1;
	struct timeval tv = { .tv_sec = CTRL_TIMEOUT, .tv_usec = 0 };
	int conn_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
	if (conn_fd == -1) {
		E("socket() error: %d\n", errno);
	}
	if (setsockopt(conn_fd, SOL_SOCKET, SO_RCVTIMEO, (const char *)&tv, sizeof(tv)) != 0) {
		E("setsockopt() rcvtimeo: %d\n", errno);
	}
	if (setsockopt(conn_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)) != 0) {
		E("setsockopt() reuseaddr: %d\n", errno);
	}
	if (setsockopt(conn_fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)) != 0) {
		E("setsockopt() reuseport: %d\n", errno);
	}
	if (setsockopt(conn_fd, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) != 0) {
		E("setsockopt() nodelay: %d\n", errno);
	}
	if (connect(conn_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) != 0) {
		E("connect() failed: %d\n", errno);
	}
	V("Established client connection %d...\n", conn_fd);

	/* value-initialize so every pointer member starts out as nullptr */
	struct dsmbr_conn *conn = new struct dsmbr_conn();
	conn->conn_fd = conn_fd;
	struct bsock_ringbuf_io io;
	void *bsock_ctx = nullptr;
	if (options.enable_tls) {
		V("Initiating TLS handshake on connection %d...\n", conn_fd);
		conn->ssl = dsmbr_tls_handshake_client(conn_fd);
		io = ppd_bsock_io_ssl();
		conn->ssl_readbuf = new char[BSOCK_BUF_SZ];
		conn->bsock_ctx.ssl_readbuf = conn->ssl_readbuf;
		conn->bsock_ctx.ssl_readbuf_len = BSOCK_BUF_SZ;
		conn->bsock_ctx.ssl = conn->ssl;
		bsock_ctx = &conn->bsock_ctx;
	} else {
		io = bsock_io_posix();
		conn->ssl_readbuf = nullptr;
		conn->ssl = nullptr;
		/* the posix io backend takes the fd itself as its context */
		bsock_ctx = (void *)(uintptr_t)conn_fd;
	}
	conn->bsock = bsock_create(bsock_ctx, &io, BSOCK_BUF_SZ, 0);
	/* assumes bsock_create() signals failure by returning NULL — confirm in bsock.h */
	if (conn->bsock == nullptr) {
		E("bsock_create() failed on connection %d\n", conn_fd);
	}
	conn->ia_gen = createGenerator(options.ia_dist);
	if (conn->ia_gen == NULL) {
		E("Failed to create generator \"%s\"\n", options.ia_dist);
	}
	/* split the target rate evenly across every connection of this instance */
	conn->ia_gen->set_lambda((double)options.target_qps /
	    (double)(options.conn_per_thread * CPU_COUNT(&options.cpuset)));
	int status;
	if ((status = options.m_info->conn_init_cb(options.m_global_ctx, thrd_ctx, &conn->m_ctx)) !=
	    0) {
		E("Failed to create conn m_ctx: %d\n", status);
	}
	conn->next_send = 0;
	return conn;
}
/*
 * Tear down a connection created by dsmbr_conn_create(): shut down TLS if
 * present, release the module's per-connection context, the bsock, the
 * socket, the generator, and any request records still awaiting a response.
 *
 * @param conn      connection to free; invalid afterwards.
 * @param thrd_ctx  module per-thread context, forwarded to conn_free_cb.
 */
static void
dsmbr_conn_free(struct dsmbr_conn *conn, void *thrd_ctx)
{
	if (conn->ssl != nullptr) {
		SSL_shutdown(conn->ssl);
		SSL_free(conn->ssl);
		delete[] conn->ssl_readbuf;
	}
	options.m_info->conn_free_cb(options.m_global_ctx, thrd_ctx, conn->m_ctx);
	bsock_free(conn->bsock);
	close(conn->conn_fd);
	delete conn->ia_gen;
	/*
	 * Requests still in flight when the run stopped were never moved to the
	 * thread's stats list, so nothing else owns them.
	 */
	for (struct dsmbr_request_record *rec : conn->req_in_flight) {
		delete rec;
	}
	conn->req_in_flight.clear();
	delete conn;
}
/*
 * Worker thread entry point (started by dsmbr_create_worker()).
 *
 *  1. Initialization is throttled by options.worker_init_sem. While holding a
 *     slot, the thread creates its kqueue, the module thread context, and
 *     options.conn_per_thread connections, then increments
 *     options.worker_init_comp.
 *  2. Spins until options.state leaves STATE_WAITING.
 *  3. Until options.state becomes STATE_STOP, it polls its sockets without
 *     blocking and drains every complete response. Each response moves one
 *     dsmbr_request_record from the connection's in-flight FIFO to
 *     tinfo->stats. It then sends a request on every connection whose
 *     next_send time has passed and that has fewer than options.depth
 *     requests in flight.
 *  4. Frees the connections and the module thread context.
 *
 * @param ctx  struct dsmbr_thread_ctx * for this worker.
 * @return NULL.
 */
void *
dsmbr_worker_main(void *ctx)
{
	struct dsmbr_thread_ctx *tinfo = (struct dsmbr_thread_ctx *)ctx;
	/* zero timeout: kevent() in the run loop only polls */
	struct timespec timesp = { .tv_sec = 0, .tv_nsec = 0 };
	struct kevent kevs[NEVENT];
	std::vector<struct dsmbr_conn *> conns;

	sem_wait(&options.worker_init_sem);
	V("Thread %d initializing...\n", tinfo->tid);
	int kqfd = kqueue();
	if (kqfd < 0) {
		E("Thread %d kqueue() failed: %d\n", tinfo->tid, errno);
	}
	/*
	 * Scratch buffer reused for every received and sent message. The char *
	 * is kept so the buffer is released as the type it was allocated with.
	 */
	char *mbuf_storage = new char[PPD_MSG_MAX_SZ];
	struct ppd_msg *mbuf = (struct ppd_msg *)mbuf_storage;
	void *m_ctx;
	if (options.m_info->thread_init_cb(tinfo->core, options.m_global_ctx, &m_ctx) != 0) {
		E("thread_init_cb() failed: %d\n", errno);
	}
	// create connections
	for (int i = 0; i < options.conn_per_thread; i++) {
		struct dsmbr_conn *conn = dsmbr_conn_create(m_ctx);
		conns.push_back(conn);
		struct kevent kev;
		EV_SET(&kev, conn->conn_fd, EVFILT_READ, EV_ADD, 0, 0, conn);
		if (kevent(kqfd, &kev, 1, NULL, 0, NULL) == -1) {
			E("kevent() failed: %d\n", errno);
		}
	}
	options.worker_init_comp.fetch_add(1, std::memory_order_relaxed);
	sem_post(&options.worker_init_sem);

	V("Thread %d waiting...\n", tinfo->tid);
	while (options.state.load(std::memory_order_relaxed) == STATE_WAITING) {
	}
	/* the first request on every connection is due immediately */
	for (struct dsmbr_conn *conn : conns) {
		conn->next_send = get_time_us();
	}
	V("Thread %d starting...\n", tinfo->tid);
	while (true) {
		int ret = 0;
		if (options.state.load(std::memory_order_relaxed) == STATE_STOP) {
			break;
		}
		// read resp first if possible
		if ((ret = kevent(kqfd, NULL, 0, kevs, NEVENT, &timesp)) < 0) {
			E("Thread %d kevent() error %d.\n", tinfo->tid, errno);
		}
		int size = ret;
		for (int i = 0; i < size; i++) {
			struct dsmbr_conn *conn = (struct dsmbr_conn *)kevs[i].udata;
			if (kevs[i].flags & EV_EOF) {
				E("Thread %d connection %d EV_EOF ERR: %d.\n", tinfo->tid,
				    conn->conn_fd, kevs[i].fflags);
			}
			ret = bsock_poll(conn->bsock);
			if (ret < 0) {
				E("Thread %d connection %d bsock_poll() ERR %d.\n", tinfo->tid,
				    conn->conn_fd, ret);
			}
		}
		uint64_t cur_ts = get_time_us();
		for (struct dsmbr_conn *conn : conns) {
			while (true) {
				// read as many messages as possible
				ret = ppd_readmsg(conn->bsock, (char *)mbuf, PPD_MSG_MAX_SZ);
				if (ret == 0) {
					size_t recv_size = mbuf->size + PPD_MSG_HDR_SZ;
					ret = options.m_info->conn_recv_cb(mbuf->data, mbuf->size,
					    options.m_global_ctx, m_ctx, conn->m_ctx);
					if (ret != 0) {
						E("Thread %d connection %d conn_recv_cb() failed ERR %d.\n",
						    tinfo->tid, conn->conn_fd, errno);
					}
					/* front() on an empty list is UB: reject unsolicited responses */
					if (conn->req_in_flight.empty()) {
						E("Thread %d connection %d received a response with no request in flight.\n",
						    tinfo->tid, conn->conn_fd);
						continue;
					}
					struct dsmbr_request_record *rec =
					    conn->req_in_flight.front();
					conn->req_in_flight.pop_front();
					rec->recv_size = recv_size;
					rec->recv_ts = cur_ts;
					tinfo->stats.push_back(rec);
					D("Thread %d connection %d RECV msg of size %u latency %lu\n",
					    tinfo->tid, conn->conn_fd, mbuf->size,
					    rec->recv_ts - rec->send_ts);
				} else {
					if (errno == ENODATA) {
						// not enough data for next msg
						break;
					} else {
						E("ppd_readmsg failed with err %d\n", errno);
					}
				}
			}
			if (conn->next_send <= cur_ts &&
			    conn->req_in_flight.size() < (unsigned int)options.depth) {
				// send
				size_t out_sz;
				ret = options.m_info->conn_send_cb(mbuf->data, PPD_MSG_MAX_DATA_SZ,
				    &out_sz, options.m_global_ctx, m_ctx, conn->m_ctx);
				if (ret != 0) {
					E("Thread %d connection %d conn_send_cb() failed ERR %d.\n",
					    tinfo->tid, conn->conn_fd, errno);
				}
				mbuf->size = out_sz;
				struct dsmbr_request_record *rec = new struct dsmbr_request_record;
				rec->send_size = mbuf->size + sizeof(struct ppd_msg);
				rec->send_ts = get_time_us();
				// no blocking, always flush
				ret = ppd_writemsg(conn->bsock, mbuf);
				if (ret != 0) {
					E("Thread %d connection %d ppd_writemsg() failed ERR %d.\n",
					    tinfo->tid, conn->conn_fd, errno);
				}
				conn->req_in_flight.push_back(rec);
				/* schedule relative to the previous deadline, not to now */
				conn->next_send += (uint64_t)(conn->ia_gen->generate() *
				    (double)S2US);
				D("Thread %d connection %d SEND msg of size %lu\n", tinfo->tid,
				    conn->conn_fd, out_sz);
			}
		}
	}
	V("Thread %d exiting...\n", tinfo->tid);
	close(kqfd);
	delete[] mbuf_storage;
	for (struct dsmbr_conn *conn : conns) {
		dsmbr_conn_free(conn, m_ctx);
	}
	options.m_info->thread_free_cb(tinfo->core, options.m_global_ctx, m_ctx);
	return NULL;
}
/*
 * Free a worker context together with every request record in its stats
 * list. main() calls this only after joining the worker's thread.
 */
static void
dsmbr_free_worker(struct dsmbr_thread_ctx *worker)
{
	while (!worker->stats.empty()) {
		delete worker->stats.front();
		worker->stats.pop_front();
	}
	delete worker;
}
/*
 * Allocate a worker context and start dsmbr_worker_main() on a thread pinned
 * to `core`. Failures are reported through E().
 *
 * @param tid   logical worker index (used in log messages).
 * @param core  CPU to pin the thread to.
 * @return the worker context; free with dsmbr_free_worker() after joining.
 */
static struct dsmbr_thread_ctx *
dsmbr_create_worker(int tid, int core)
{
	struct dsmbr_thread_ctx *tinfo = new struct dsmbr_thread_ctx;
	tinfo->tid = tid;
	tinfo->core = core;
	pthread_attr_t attr;
	int status;
	if ((status = pthread_attr_init(&attr)) != 0) {
		E("pthread_attr_init() failed: %d\n", status);
	}
	cpuset_t cpuset;
	CPU_ZERO(&cpuset);
	CPU_SET(core, &cpuset);
	status = pthread_attr_setaffinity_np(&attr, sizeof(cpuset), &cpuset);
	if (status != 0) {
		E("pthread_attr_setaffinity_np() failed : %d\n", status);
	}
	status = pthread_create(&tinfo->thrd, &attr, dsmbr_worker_main, tinfo);
	if (status != 0) {
		E("pthread_create() failed: %d\n", status);
	}
	/* attr is only consulted by pthread_create(); release it to avoid a leak */
	pthread_attr_destroy(&attr);
	return tinfo;
}
/*
 * Master mode: open a TCP control connection to CTRL_PORT on every slave
 * given with -M and store each descriptor in slaves[i].conn_fd.
 *
 * @param slaves  array with at least options.num_slaves entries.
 */
static void
dsmbr_master_connect_slaves(struct dsmbr_slave *slaves)
{
	for (int i = 0; i < options.num_slaves; i++) {
		/* zero first so sin_zero (and sin_len on BSD) are well defined */
		struct sockaddr_in csock_addr;
		memset(&csock_addr, 0, sizeof(csock_addr));
		csock_addr.sin_family = AF_INET;
		csock_addr.sin_addr.s_addr = inet_addr(options.slave_ips[i]);
		csock_addr.sin_port = htons(CTRL_PORT);
		int c_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
		if (c_fd < 0) {
			E("socket() failed. ERR %d\n", errno);
		}
		V("Connecting to slave %s...\n", options.slave_ips[i]);
		if (connect(c_fd, (struct sockaddr *)&csock_addr, sizeof(csock_addr)) != 0) {
			E("Connect failed. ERR %d\n", errno);
		}
		slaves[i].conn_fd = c_fd;
	}
}
/*
 * Slave mode: create the control socket listening on all IPv4 addresses at
 * CTRL_PORT (SO_REUSEADDR/SO_REUSEPORT/TCP_NODELAY set).
 *
 * @return the listening socket descriptor.
 */
static int
dsmbr_slave_ctrl_sock_create(void)
{
	struct sockaddr_in server_addr;
	int status;
	const int enable = 1;
	/* zero first so sin_zero (and sin_len on BSD) are well defined */
	memset(&server_addr, 0, sizeof(server_addr));
	server_addr.sin_family = AF_INET;
	server_addr.sin_port = htons(CTRL_PORT);
	server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
	int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
	if (fd < 0) {
		E("socket() returned %d\n", errno);
	}
	if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)) != 0) {
		E("setsockopt() reuseaddr: %d\n", errno);
	}
	if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)) != 0) {
		E("setsockopt() reuseport: %d\n", errno);
	}
	if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) < 0) {
		E("setsockopt() NODELAY %d\n", errno);
	}
	status = bind(fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
	if (status < 0) {
		E("bind() returned %d\n", errno);
	}
	status = listen(fd, CTRL_BACKLOG);
	if (status < 0) {
		E("listen() returned %d\n", errno);
	}
	return fd;
}
/*
 * Print the command-line option summary to stdout. The list matches the
 * getopt() string in dsmbr_getopt(), including -h.
 */
static void
usage()
{
	fprintf(stdout,
	    "Options:\n"
	    " -q: target qps.\n"
	    " -s: server addr.\n"
	    " -p: server port.\n"
	    " -o: output file path.\n"
	    " -a: worker thread cpulist.\n"
	    " -c: connections per thread.\n"
	    " -m: module path.\n"
	    " -M: slave ips, repeatable, enables master mode.\n"
	    " -S: slave mode.\n"
	    " -v: verbose mode.\n"
	    " -t: run time.\n"
	    " -T: warmup time.\n"
	    " -d: request depth.\n"
	    " -i: interarrival time distribution.\n"
	    " -O: module arguments ('key'='value'), repeatable.\n"
	    " -h: print this help.\n\n");
}
/*
 * Slave mode: accept one connection from the master on the listening control
 * socket and log the peer's address.
 *
 * @param slave_ctrl_sock  socket returned by dsmbr_slave_ctrl_sock_create().
 * @return the accepted control connection.
 */
static int
dsmbr_slave_accept_master(int slave_ctrl_sock)
{
	int conn_fd;
	struct sockaddr_in addr;
	/* value-result argument: must hold the buffer size on input */
	socklen_t addrlen = sizeof(addr);
	char ip[INET_ADDRSTRLEN];

	conn_fd = accept(slave_ctrl_sock, (struct sockaddr *)&addr, &addrlen);
	if (conn_fd < 0) {
		E("Failed to accept master connection: %d\n", errno);
	}
	/* accept() overwrote addrlen; reset it before reusing the buffer */
	addrlen = sizeof(addr);
	if (getpeername(conn_fd, (struct sockaddr *)&addr, &addrlen) < 0) {
		E("getpeername failed: %d\n", errno);
	}
	if (inet_ntop(AF_INET, &addr.sin_addr, ip, sizeof(ip)) == NULL) {
		E("inet_ntop failed: %d\n", errno);
	}
	V("Accepted master connection %d from %s.\n", conn_fd, ip);
	return conn_fd;
}
static void
dsmbr_getopt(int argc, char *argv[])
{
int ch;
while ((ch = getopt(argc, argv, "q:s:p:o:a:c:m:M:Svt:T:i:O:d:h")) != -1) {
switch (ch) {
case 'q': {
options.target_qps = strtoul(optarg, NULL, 10);
break;
}
case 's': {
if (nslookup(optarg, options.server_ip, sizeof(options.server_ip)) != 0) {
E("nslookup failed with %d for %s\n", errno, optarg);
}
break;
}
case 'p': {
options.server_port = strtoul(optarg, NULL, 10);
break;
}
case 'o': {
strncpy(options.output_file, optarg, sizeof(options.output_file));
break;
}
case 'a': {
cpulist_to_cpuset(optarg, &options.cpuset);
break;
}
case 'c': {
options.conn_per_thread = strtoul(optarg, NULL, 10);
break;
}
case 'm': {
strncpy(options.module_path, optarg, sizeof(options.module_path));
break;
}
case 'M': {
options.master_mode = 1;
if (nslookup(optarg, options.slave_ips[options.num_slaves],
sizeof(options.slave_ips[0])) != 0) {
E("nslookup failed with %d for %s\n", errno, optarg);
}
options.num_slaves++;
if (options.num_slaves > MAX_SLAVES) {
E("Too many slaves: %d > %d\n", options.num_slaves, MAX_SLAVES);
}
break;
}
case 'S': {
options.slave_mode = 1;
break;
}
case 'v': {
options.verbose++;
if (options.verbose == INFO_VERBOSITY) {
W("Verbose mode may cause SUBSTANTIAL latency fluctuation in some terminals emulators\n");
} else if (options.verbose == DEBUG_VERBOSITY) {
W("Debug mode enabled. Expect very verbose output.\n");
}
logger_set_verbosity(options.verbose);
break;
}
case 't': {
options.duration = strtoul(optarg, NULL, 10);
break;
}
case 'T': {
options.warmup = strtoul(optarg, NULL, 10);
break;
}
case 'i': {
strncpy(options.ia_dist, optarg, sizeof(options.ia_dist));
break;
}
case 'O': {
options.mod_argk[options.mod_argc] = new char[MAX_MOD_ARG_LEN];
options.mod_argv[options.mod_argc] = new char[MAX_MOD_ARG_LEN];
split_kvstr(optarg, "=", options.mod_argk[options.mod_argc],
MAX_MOD_ARG_LEN, options.mod_argv[options.mod_argc], MAX_MOD_ARG_LEN);
options.mod_argc++;
break;
}
case 'd' : {
options.depth = strtoul(optarg, NULL, 10);
break;
}
case 'h':
usage();
exit(0);
default:
E("Unrecognized option -%c\n\n", ch);
}
}
}
static void
dsmbr_send_ctrl_code(int sock, uint32_t code)
{
int ret;
struct dsmbr_ctrl_msg msg;
msg.code = code;
ret = dsmbr_send_ctrl_msg(sock, &msg);
if (ret != 0) {
E("Failed to send ctrl message %d\n", errno);
}
return;
}
static void
dsmbr_wait_ctrl_code(int sock, uint32_t code)
{
int ret;
struct dsmbr_ctrl_msg msg;
ret = dsmbr_recv_ctrl_msg(sock, &msg);
if (ret != 0) {
E("Failed to recv ctrl message %d\n", errno);
}
if (msg.code != code) {
E("Unexpected message code 0x%x, expected 0x%x\n", msg.code, code);
}
return;
}
/*
* protocol:
*
* master -> client SYNC
* client and master all establish connections to the server
* client -> master ACK
* master -> client START
* master RUNS for X seconds, client runs forever
* master -> client STOP
* client -> master STAT
*/
int
main(int argc, char *argv[])
{
dsmbr_getopt(argc, argv);
dsmbr_dump_options();
dsmbr_create_tls_context();
if (sem_init(&options.worker_init_sem, 0, THRD_INIT_PARALLEL) != 0) {
E("sem_init failed: %d\n", errno);
}
V("Loading module %s...\n", options.module_path);
options.m_info = ppd_load_module(options.module_path);
V("Loaded module \"%s\".\n", options.m_info->name);
V("Initializing module...\n");
options.m_info->global_init_cb(options.mod_argc, options.mod_argk, options.mod_argv,
&options.m_global_ctx);
FILE *resp_fp_csv = NULL;
if (!options.slave_mode) {
resp_fp_csv = fopen(options.output_file, "w");
if (resp_fp_csv == NULL) {
E("Cannot open file %s for writing %d.", options.output_file, errno);
exit(1);
}
}
struct dsmbr_slave slaves[MAX_SLAVES];
int ret;
int slave_ctrl_sock = -1;
int master_ctrl_conn = -1;
std::vector<struct dsmbr_thread_ctx *> workers;
uint64_t start_ts;
uint64_t end_ts;
uint64_t warmup_ts;
struct dsmbr_ctrl_msg ctrl_msg;
int kqfd = kqueue();
if (kqfd < 0) {
E("kqueue() failed with %d\n", errno);
}
if (options.master_mode) {
/* initiate connections to clients */
V("M: connecting to slaves.\n");
dsmbr_master_connect_slaves(slaves);
V("M: SYNC to slaves.\n");
for (int i = 0; i < options.num_slaves; i++) {
dsmbr_send_ctrl_code(slaves[i].conn_fd, CTRL_SYNC);
}
} else if (options.slave_mode) {
slave_ctrl_sock = dsmbr_slave_ctrl_sock_create();
V("S: waiting for master connection...\n");
master_ctrl_conn = dsmbr_slave_accept_master(slave_ctrl_sock);
// add to kqueue
V("S: waiting for master SYNC %d...\n", master_ctrl_conn);
dsmbr_wait_ctrl_code(master_ctrl_conn, CTRL_SYNC);
struct kevent kev;
EV_SET(&kev, master_ctrl_conn, EVFILT_READ, EV_ADD, 0, 0, NULL);
if (kevent(kqfd, &kev, 1, NULL, 0, NULL) == -1) {
E("kevent() failed: %d\n", errno);
}
}
// here slaves and master are on the same page */
int core;
int tid = 0;
CPU_FOREACH_ISSET (core, &options.cpuset) {
workers.push_back(dsmbr_create_worker(tid, core));
V("Created thread %d on core %d.\n", tid, core);
tid++;
}
V("Waiting for worker threads connection establishment.\n");
while (options.worker_init_comp.load(std::memory_order_relaxed) !=
CPU_COUNT(&options.cpuset)) {
};
// now all connections are established
if (options.master_mode) {
V("M: waiting for slaves ACK...\n");
for (int i = 0; i < options.num_slaves; i++) {
dsmbr_wait_ctrl_code(slaves[i].conn_fd, CTRL_ACK);
}
V("M: START to slaves...\n");
for (int i = 0; i < options.num_slaves; i++) {
dsmbr_send_ctrl_code(slaves[i].conn_fd, CTRL_START);
}
} else if (options.slave_mode) {
V("S: ACK to master...\n");
dsmbr_send_ctrl_code(master_ctrl_conn, CTRL_ACK);
V("S: waiting for master START...\n");
dsmbr_wait_ctrl_code(master_ctrl_conn, CTRL_START);
}
V("Starting benchmark...\n");
start_ts = get_time_us();
warmup_ts = start_ts;
// seq_cst to make sure start_ts happens before store
options.state.store(STATE_RUN, std::memory_order_seq_cst);
int cur_second = 0;
struct kevent kev;
if (!options.slave_mode) {
// slave mode runs forever
EV_SET(&kev, TIMER_MAGIC, EVFILT_TIMER, EV_ADD, NOTE_USECONDS, S2US, NULL);
if (kevent(kqfd, &kev, 1, NULL, 0, NULL) == -1) {
E("kevent() failed: %d\n", errno);
}
}
while (true) {
if (kevent(kqfd, NULL, 0, &kev, 1, NULL) == -1) {
E("kevent() failed: %d\n", errno);
}
if (kev.ident == TIMER_MAGIC) {
if (kev.data != 1) {
E("Timer expired multiple times\n");
}
cur_second++;
if ((int)cur_second == options.warmup) {
warmup_ts = get_time_us();
}
if ((int)cur_second >= options.duration + options.warmup) {
break;
}
} else if (options.slave_mode && (int)kev.ident == master_ctrl_conn) {
dsmbr_wait_ctrl_code(master_ctrl_conn, CTRL_STOP);
V("S: received STOP from master.\n");
break;
} else {
E("Unknown kevent ident %d\n", (int)kev.ident);
}
}
V("Stopping benchmark...\n");
options.state.store(STATE_STOP, std::memory_order_seq_cst);
end_ts = get_time_us();
if (options.master_mode) {
V("M: STOP to slaves...\n");
for (int i = 0; i < options.num_slaves; i++) {
dsmbr_send_ctrl_code(slaves[i].conn_fd, CTRL_STOP);
}
}
// wait for thread exit
for (unsigned int i = 0; i < workers.size(); i++) {
if ((ret = pthread_join(workers.at(i)->thrd, NULL)) != 0) {
E("pthread_join tid %d err %d\n", workers.at(i)->tid, ret);
}
}
// obtain stats from threads
uint64_t total_req = 0;
uint64_t total_send = 0;
uint64_t total_recv = 0;
for (auto worker : workers) {
for (auto stat : worker->stats) {
total_req++;
total_send += stat->send_size;
total_recv += stat->recv_size;
}
}
V("Reqs: %lu Time: %lu us Send: %lu bytes Recv: %lu bytes\n", total_req, end_ts - start_ts, total_send, total_recv);
if (options.master_mode) {
V("M: waiting for slaves STAT...\n");
for (int i = 0; i < options.num_slaves; i++) {
if (dsmbr_recv_ctrl_msg(slaves[i].conn_fd, &ctrl_msg) != 0) {
E("dsmbr_recv_ctrl_msg failed with %d.\n", errno);
}
if (ctrl_msg.code != CTRL_STAT) {
E("Unexpected ctrl code %d, expected: %d.\n", ctrl_msg.code,
CTRL_STAT);
}
slaves[i].reqs = ctrl_msg.data[0];
slaves[i].time = ctrl_msg.data[1];
slaves[i].send_sz = ctrl_msg.data[2];
slaves[i].recv_sz = ctrl_msg.data[3];
V("M: received stats - Reqs: %lu Time: %lu us Send: %lu bytes Recv: %lu bytes\n",
slaves[i].reqs, slaves[i].time, slaves[i].send_sz, slaves[i].recv_sz);
}
} else if (options.slave_mode) {
V("M: STAT to master...\n");
ctrl_msg.code = CTRL_STAT;
ctrl_msg.data[0] = total_req;
ctrl_msg.data[1] = end_ts - start_ts;
ctrl_msg.data[2] = total_send;
ctrl_msg.data[3] = total_recv;
if (dsmbr_send_ctrl_msg(master_ctrl_conn, &ctrl_msg) != 0) {
E("dsmbr_send_ctrl_msg failed with %d.\n", errno);
}
}
if (!options.slave_mode) {
V("Saving results to %s ...\n", options.output_file);
for (int i = 0; i < options.num_slaves; i++) {
fprintf(resp_fp_csv, "S,%lu,%lu,%lu,%lu\n", slaves[i].reqs, slaves[i].time,
slaves[i].send_sz, slaves[i].recv_sz);
}
fprintf(resp_fp_csv, "M,%lu,%lu,%lu,%lu\n", total_req, end_ts - start_ts,
total_send, total_recv);
for (auto worker : workers) {
for (auto stat : worker->stats) {
if (stat->send_ts < warmup_ts) {
fprintf(resp_fp_csv, "W,%lu,%lu,%lu,%lu\n", stat->send_ts,
stat->recv_ts, stat->send_size, stat->recv_size);
} else {
fprintf(resp_fp_csv, "R,%lu,%lu,%lu,%lu\n", stat->send_ts,
stat->recv_ts, stat->send_size, stat->recv_size);
}
}
}
fflush(resp_fp_csv);
}
// clean up
if (resp_fp_csv != NULL) {
fclose(resp_fp_csv);
}
sem_destroy(&options.worker_init_sem);
close(kqfd);
if (slave_ctrl_sock != -1) {
close(slave_ctrl_sock);
}
if (master_ctrl_conn != -1) {
close(master_ctrl_conn);
}
for (int i = 0; i < options.num_slaves; i++) {
close(slaves[i].conn_fd);
}
for (auto worker : workers) {
dsmbr_free_worker(worker);
}
options.m_info->global_free_cb(options.m_global_ctx);
for (int i = 0; i < options.mod_argc; i++) {
delete[] options.mod_argk[i];
delete[] options.mod_argv[i];
}
return 0;
}