958 lines
25 KiB
C++
Executable File
958 lines
25 KiB
C++
Executable File
#include <sys/types.h>
|
|
#include <sys/param.h>
|
|
#include <sys/cpuset.h>
|
|
#include <sys/event.h>
|
|
#include <sys/queue.h>
|
|
#include <sys/socket.h>
|
|
|
|
#include <netinet/in.h>
|
|
#include <netinet/tcp.h>
|
|
|
|
#include <arpa/inet.h>
|
|
#include <bsock/bsock.h>
|
|
#include <errno.h>
|
|
#include <netdb.h>
|
|
#include <openssl/err.h>
|
|
#include <openssl/ssl.h>
|
|
#include <pthread.h>
|
|
#include <pthread_np.h>
|
|
#include <semaphore.h>
|
|
#include <unistd.h>
|
|
|
|
#include "dmsg.hh"
|
|
#include "generator.hh"
|
|
#include "io.hh"
|
|
#include "logger.h"
|
|
#include "mod.h"
|
|
#include "msg.hh"
|
|
#include "util.h"
|
|
|
|
#include <atomic>
|
|
#include <cerrno>
|
|
#include <csignal>
|
|
#include <cstdio>
|
|
#include <cstring>
|
|
#include <iostream>
|
|
#include <list>
|
|
#include <set>
|
|
#include <sstream>
|
|
#include <thread>
|
|
#include <unordered_map>
|
|
#include <vector>
|
|
|
|
LOGGER_VAR_DECL;
|
|
|
|
static constexpr int MAX_MOD_ARGS = 32;
|
|
static constexpr int MAX_MOD_ARG_LEN = 128;
|
|
static constexpr int MAX_SLAVES = 32;
|
|
static constexpr int NEVENT = 64;
|
|
static constexpr int CTRL_PORT = 15545;
|
|
static constexpr int CTRL_TIMEOUT = 3;
|
|
static constexpr int CTRL_BACKLOG = 4096;
|
|
static constexpr int BSOCK_BUF_SZ = 4096;
|
|
static constexpr int THRD_INIT_PARALLEL = 2;
|
|
static constexpr int TIMER_MAGIC = 0x123456;
|
|
|
|
static constexpr int STATE_WAITING = 0;
|
|
static constexpr int STATE_RUN = 1;
|
|
static constexpr int STATE_STOP = 2;
|
|
|
|
struct dsmbr_thread_ctx {
|
|
int tid;
|
|
int core;
|
|
std::list<struct dsmbr_request_record *> stats;
|
|
pthread_t thrd;
|
|
};
|
|
|
|
struct dsmbr_request_record {
|
|
uint64_t recv_size;
|
|
uint64_t send_size;
|
|
uint64_t send_ts;
|
|
uint64_t recv_ts;
|
|
};
|
|
|
|
struct dsmbr_conn {
|
|
Generator *ia_gen;
|
|
SSL *ssl;
|
|
char *ssl_readbuf;
|
|
int conn_fd;
|
|
struct bsock *bsock;
|
|
struct ppd_bsock_io_ssl_ctx bsock_ctx;
|
|
|
|
void *m_ctx;
|
|
uint64_t next_send;
|
|
std::list<struct dsmbr_request_record *> req_in_flight;
|
|
};
|
|
|
|
struct dsmbr_slave {
|
|
int conn_fd;
|
|
/* stats */
|
|
uint64_t time;
|
|
uint64_t send_sz;
|
|
uint64_t recv_sz;
|
|
uint64_t reqs;
|
|
};
|
|
|
|
struct dsmbr_options {
|
|
int verbose;
|
|
char server_ip[64];
|
|
int server_port;
|
|
int conn_per_thread;
|
|
int target_qps;
|
|
int depth;
|
|
int warmup;
|
|
int duration;
|
|
char ia_dist[128];
|
|
char output_file[128];
|
|
cpuset_t cpuset;
|
|
char module_path[128];
|
|
char *mod_argk[MAX_MOD_ARGS];
|
|
char *mod_argv[MAX_MOD_ARGS];
|
|
int mod_argc;
|
|
int enable_tls;
|
|
int master_mode;
|
|
int slave_mode;
|
|
|
|
// master mode
|
|
char slave_ips[MAX_SLAVES][64];
|
|
int num_slaves;
|
|
|
|
// global states
|
|
std::atomic<int> state = STATE_WAITING;
|
|
sem_t worker_init_sem;
|
|
std::atomic<int> worker_init_comp = 0;
|
|
SSL_CTX *ssl_ctx;
|
|
ppd_mod_info *m_info;
|
|
void *m_global_ctx;
|
|
};
|
|
|
|
static struct dsmbr_options options = { .verbose = 0,
|
|
.server_ip = "127.0.0.1",
|
|
.conn_per_thread = 1,
|
|
.target_qps = 0,
|
|
.depth = 1,
|
|
.warmup = 0,
|
|
.duration = 5,
|
|
.ia_dist = "fb_ia",
|
|
.output_file = "samples.txt",
|
|
.cpuset = CPUSET_T_INITIALIZER(1),
|
|
.module_path = { 0 },
|
|
.mod_argc = 0,
|
|
.enable_tls = 0,
|
|
.master_mode = 0,
|
|
.slave_mode = 0,
|
|
.num_slaves = 0,
|
|
.ssl_ctx = nullptr,
|
|
.m_info = nullptr,
|
|
.m_global_ctx = nullptr };
|
|
|
|
static void
|
|
dsmbr_dump_options()
|
|
{
|
|
std::stringstream ss;
|
|
ss << "Configuration:\n"
|
|
<< " server: " << options.server_ip << std::endl
|
|
<< " port: " << options.server_port << std::endl
|
|
<< " connections per thread: " << options.conn_per_thread << std::endl
|
|
<< " enable TLS: " << options.enable_tls << std::endl
|
|
<< " target qps: " << options.target_qps << std::endl
|
|
<< " number of threads: " << CPU_COUNT(&options.cpuset) << std::endl
|
|
<< " verbose: " << options.verbose << std::endl
|
|
<< " master mode: " << options.master_mode << std::endl
|
|
<< " slave mode:" << options.slave_mode << std::endl
|
|
<< " module: " << options.module_path << std::endl
|
|
<< " run time: " << options.duration << std::endl
|
|
<< " warmup time: " << options.warmup << std::endl
|
|
<< " interarrival dist: " << options.ia_dist << std::endl
|
|
<< " output file: " << options.output_file << std::endl
|
|
<< " depth: " << options.depth << std::endl
|
|
<< " slaves:";
|
|
|
|
for (int i = 0; i < options.num_slaves; i++) {
|
|
ss << " " << options.slave_ips[i];
|
|
}
|
|
|
|
ss << std::endl;
|
|
V("%s", ss.str().c_str());
|
|
}
|
|
|
|
static SSL *
|
|
dsmbr_tls_handshake_client(int conn_fd)
|
|
{
|
|
SSL *ssl;
|
|
int r;
|
|
|
|
ssl = SSL_new(options.ssl_ctx);
|
|
if (ssl == NULL) {
|
|
E("SSL_new() failed: %ld\n", ERR_get_error());
|
|
}
|
|
|
|
if (SSL_set_fd(ssl, conn_fd) == 0) {
|
|
E("SSL_set_fd() failed: %ld\n", ERR_get_error());
|
|
}
|
|
|
|
if ((r = SSL_connect(ssl)) <= 0) {
|
|
E("SSL_connect() failed: %ld\n", ERR_get_error());
|
|
}
|
|
|
|
return ssl;
|
|
}
|
|
|
|
static void
|
|
dsmbr_create_tls_context()
|
|
{
|
|
SSL_CTX *ctx;
|
|
|
|
ctx = SSL_CTX_new(TLS_client_method());
|
|
if (!ctx) {
|
|
E("SSL_CTX_new() failed: %ld\n", ERR_get_error());
|
|
}
|
|
|
|
SSL_CTX_set_min_proto_version(ctx, TLS1_2_VERSION);
|
|
SSL_CTX_set_max_proto_version(ctx, TLS1_2_VERSION);
|
|
|
|
options.ssl_ctx = ctx;
|
|
}
|
|
|
|
static struct dsmbr_conn *
|
|
dsmbr_conn_create(void *thrd_ctx)
|
|
{
|
|
struct sockaddr_in server_addr;
|
|
|
|
server_addr.sin_family = AF_INET;
|
|
server_addr.sin_port = htons(options.server_port);
|
|
server_addr.sin_addr.s_addr = inet_addr(options.server_ip);
|
|
|
|
int enable = 1;
|
|
struct timeval tv = { .tv_sec = CTRL_TIMEOUT, .tv_usec = 0 };
|
|
|
|
int conn_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
|
|
if (conn_fd == -1) {
|
|
E("socket() error: %d\n", errno);
|
|
}
|
|
if (setsockopt(conn_fd, SOL_SOCKET, SO_RCVTIMEO, (const char *)&tv, sizeof(tv)) != 0) {
|
|
E("setsockopt() rcvtimeo: %d\n", errno);
|
|
}
|
|
if (setsockopt(conn_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)) != 0) {
|
|
E("setsockopt() reuseaddr: %d\n", errno);
|
|
}
|
|
if (setsockopt(conn_fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)) != 0) {
|
|
E("setsockopt() reuseport: %d\n", errno);
|
|
}
|
|
if (setsockopt(conn_fd, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) != 0) {
|
|
E("setsockopt() nodelay: %d\n", errno);
|
|
}
|
|
if (connect(conn_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) != 0) {
|
|
E("connect() failed: %d\n", errno);
|
|
}
|
|
|
|
V("Established client connection %d...\n", conn_fd);
|
|
struct dsmbr_conn *conn = new struct dsmbr_conn;
|
|
conn->conn_fd = conn_fd;
|
|
struct bsock_ringbuf_io io;
|
|
void *bsock_ctx = nullptr;
|
|
if (options.enable_tls) {
|
|
V("Initiating TLS handshake on connection %d...\n", conn_fd);
|
|
conn->ssl = dsmbr_tls_handshake_client(conn_fd);
|
|
io = ppd_bsock_io_ssl();
|
|
conn->ssl_readbuf = new char[BSOCK_BUF_SZ];
|
|
conn->bsock_ctx.ssl_readbuf = conn->ssl_readbuf;
|
|
conn->bsock_ctx.ssl_readbuf_len = BSOCK_BUF_SZ;
|
|
conn->bsock_ctx.ssl = conn->ssl;
|
|
bsock_ctx = &conn->bsock_ctx;
|
|
} else {
|
|
io = bsock_io_posix();
|
|
conn->ssl_readbuf = nullptr;
|
|
conn->ssl = nullptr;
|
|
bsock_ctx = (void *)(uintptr_t)conn_fd;
|
|
}
|
|
|
|
conn->bsock = bsock_create(bsock_ctx, &io, BSOCK_BUF_SZ, 0);
|
|
|
|
conn->ia_gen = createGenerator(options.ia_dist);
|
|
if (conn->ia_gen == NULL) {
|
|
E("Failed to create generator \"%s\"\n", options.ia_dist);
|
|
}
|
|
conn->ia_gen->set_lambda((double)options.target_qps /
|
|
(double)(options.conn_per_thread * CPU_COUNT(&options.cpuset)));
|
|
|
|
int status;
|
|
if ((status = options.m_info->conn_init_cb(options.m_global_ctx, thrd_ctx, &conn->m_ctx)) !=
|
|
0) {
|
|
E("Failed to create conn m_ctx: %d\n", status);
|
|
}
|
|
|
|
conn->next_send = 0;
|
|
return conn;
|
|
}
|
|
|
|
static void
|
|
dsmbr_conn_free(struct dsmbr_conn *conn, void *thrd_ctx)
|
|
{
|
|
if (conn->ssl != nullptr) {
|
|
SSL_shutdown(conn->ssl);
|
|
SSL_free(conn->ssl);
|
|
delete[] conn->ssl_readbuf;
|
|
}
|
|
options.m_info->conn_free_cb(options.m_global_ctx, thrd_ctx, conn->m_ctx);
|
|
|
|
bsock_free(conn->bsock);
|
|
close(conn->conn_fd);
|
|
delete conn->ia_gen;
|
|
delete conn;
|
|
}
|
|
|
|
void *
|
|
dsmbr_worker_main(void *ctx)
|
|
{
|
|
struct dsmbr_thread_ctx *tinfo = (struct dsmbr_thread_ctx *)ctx;
|
|
struct timespec timesp = { .tv_sec = 0, .tv_nsec = 0 };
|
|
struct kevent kevs[NEVENT];
|
|
std::vector<struct dsmbr_conn *> conns;
|
|
|
|
sem_wait(&options.worker_init_sem);
|
|
V("Thread %d initializing...\n", tinfo->tid);
|
|
int kqfd = kqueue();
|
|
if (kqfd < 0) {
|
|
E("Thread %d kqueue() failed: %d\n", tinfo->tid, errno);
|
|
}
|
|
struct ppd_msg *mbuf = (struct ppd_msg *)new char[PPD_MSG_MAX_SZ];
|
|
|
|
void *m_ctx;
|
|
if (options.m_info->thread_init_cb(tinfo->core, options.m_global_ctx, &m_ctx) != 0) {
|
|
E("thread_create_cb() failed: %d\n", errno);
|
|
}
|
|
|
|
for (int i = 0; i < options.conn_per_thread; i++) {
|
|
struct dsmbr_conn *conn = dsmbr_conn_create(m_ctx);
|
|
conns.push_back(conn);
|
|
|
|
struct kevent kev;
|
|
EV_SET(&kev, conn->conn_fd, EVFILT_READ, EV_ADD, 0, 0, conn);
|
|
if (kevent(kqfd, &kev, 1, NULL, 0, NULL) == -1) {
|
|
E("kevent() failed: %d\n", errno);
|
|
}
|
|
}
|
|
options.worker_init_comp.fetch_add(1, std::memory_order_relaxed);
|
|
sem_post(&options.worker_init_sem);
|
|
|
|
// create connections
|
|
V("Thread %d waiting...\n", tinfo->tid);
|
|
while (options.state.load(std::memory_order_relaxed) == STATE_WAITING) {
|
|
};
|
|
for (struct dsmbr_conn *conn : conns) {
|
|
conn->next_send = get_time_us();
|
|
}
|
|
|
|
V("Thread %d starting...\n", tinfo->tid);
|
|
while (true) {
|
|
int ret = 0;
|
|
|
|
if (options.state.load(std::memory_order_relaxed) == STATE_STOP) {
|
|
break;
|
|
}
|
|
|
|
// read resp first if possible
|
|
if ((ret = kevent(kqfd, NULL, 0, kevs, NEVENT, ×p)) < 0) {
|
|
E("Thread %d kevent() error %d.\n", tinfo->tid, errno);
|
|
}
|
|
|
|
int size = ret;
|
|
for (int i = 0; i < size; i++) {
|
|
struct dsmbr_conn *conn = (struct dsmbr_conn *)kevs[i].udata;
|
|
if (kevs[i].flags & EV_EOF) {
|
|
E("Thread %d connection %d EV_EOF ERR: %d.\n", tinfo->tid,
|
|
conn->conn_fd, kevs[i].fflags);
|
|
}
|
|
ret = bsock_poll(conn->bsock);
|
|
if (ret < 0) {
|
|
E("Thread %d connection %d bsock_poll() ERR %d.\n", tinfo->tid,
|
|
conn->conn_fd, ret);
|
|
}
|
|
}
|
|
|
|
uint64_t cur_ts = get_time_us();
|
|
for (struct dsmbr_conn *conn : conns) {
|
|
while (true) {
|
|
// read as many messages as possible
|
|
ret = ppd_readmsg(conn->bsock, (char *)mbuf, PPD_MSG_MAX_SZ);
|
|
if (ret == 0) {
|
|
size_t recv_size = mbuf->size + PPD_MSG_HDR_SZ;
|
|
ret = options.m_info->conn_recv_cb(mbuf->data, mbuf->size,
|
|
options.m_global_ctx, m_ctx, conn->m_ctx);
|
|
if (ret != 0) {
|
|
E("Thread %d connection %d conn_recv_cb() failed ERR %d.\n",
|
|
tinfo->tid, conn->conn_fd, errno);
|
|
}
|
|
struct dsmbr_request_record *rec =
|
|
conn->req_in_flight.front();
|
|
conn->req_in_flight.pop_front();
|
|
rec->recv_size = recv_size;
|
|
rec->recv_ts = cur_ts;
|
|
tinfo->stats.push_back(rec);
|
|
D("Thread %d connection %d RECV msg of size %u latency %lu\n",
|
|
tinfo->tid, conn->conn_fd, mbuf->size,
|
|
rec->recv_ts - rec->send_ts);
|
|
} else {
|
|
if (errno == ENODATA) {
|
|
// not enough data for next msg
|
|
break;
|
|
} else {
|
|
E("ppd_readmsg failed with err %d\n", errno);
|
|
}
|
|
}
|
|
}
|
|
|
|
if (conn->next_send <= cur_ts &&
|
|
conn->req_in_flight.size() < (unsigned int)options.depth) {
|
|
// send
|
|
size_t out_sz;
|
|
ret = options.m_info->conn_send_cb(mbuf->data, PPD_MSG_MAX_DATA_SZ,
|
|
&out_sz, options.m_global_ctx, m_ctx, conn->m_ctx);
|
|
if (ret != 0) {
|
|
E("Thread %d connection %d conn_send_cb() failed ERR %d.\n",
|
|
tinfo->tid, conn->conn_fd, errno);
|
|
}
|
|
|
|
mbuf->size = out_sz;
|
|
|
|
struct dsmbr_request_record *rec = new struct dsmbr_request_record;
|
|
rec->send_size = mbuf->size + sizeof(struct ppd_msg);
|
|
rec->send_ts = get_time_us();
|
|
|
|
// no blocking, always flush
|
|
ret = ppd_writemsg(conn->bsock, mbuf);
|
|
if (ret != 0) {
|
|
E("Thread %d connection %d ppd_writemsg() failed ERR %d.\n",
|
|
tinfo->tid, conn->conn_fd, errno);
|
|
}
|
|
|
|
conn->req_in_flight.push_back(rec);
|
|
conn->next_send += (uint64_t)(conn->ia_gen->generate() *
|
|
(double)S2US);
|
|
D("Thread %d connection %d SEND msg of size %lu\n", tinfo->tid,
|
|
conn->conn_fd, out_sz);
|
|
}
|
|
}
|
|
}
|
|
|
|
V("Thread %d exiting...\n", tinfo->tid);
|
|
close(kqfd);
|
|
delete[] mbuf;
|
|
|
|
for (unsigned int i = 0; i < conns.size(); i++) {
|
|
dsmbr_conn_free(conns.at(i), m_ctx);
|
|
}
|
|
|
|
options.m_info->thread_free_cb(tinfo->core, options.m_global_ctx, m_ctx);
|
|
return NULL;
|
|
}
|
|
|
|
static void
|
|
dsmbr_free_worker(struct dsmbr_thread_ctx *worker)
|
|
{
|
|
for (auto stat : worker->stats) {
|
|
delete stat;
|
|
}
|
|
delete worker;
|
|
}
|
|
|
|
static struct dsmbr_thread_ctx *
|
|
dsmbr_create_worker(int tid, int core)
|
|
{
|
|
struct dsmbr_thread_ctx *tinfo = new struct dsmbr_thread_ctx;
|
|
tinfo->tid = tid;
|
|
tinfo->core = core;
|
|
|
|
pthread_attr_t attr;
|
|
int status;
|
|
if ((status = pthread_attr_init(&attr)) != 0) {
|
|
E("pthread_attr_init() failed: %d\n", status);
|
|
}
|
|
|
|
cpuset_t cpuset;
|
|
CPU_ZERO(&cpuset);
|
|
CPU_SET(core, &cpuset);
|
|
|
|
status = pthread_attr_setaffinity_np(&attr, sizeof(cpuset), &cpuset);
|
|
if (status != 0) {
|
|
E("pthread_attr_setaffinity_np() failed : %d\n", status);
|
|
}
|
|
|
|
status = pthread_create(&tinfo->thrd, &attr, dsmbr_worker_main, tinfo);
|
|
if (status != 0) {
|
|
E("pthread_create() failed: %d\n", status);
|
|
}
|
|
|
|
return tinfo;
|
|
}
|
|
|
|
static void
|
|
dsmbr_master_connect_slaves(struct dsmbr_slave *slaves)
|
|
{
|
|
/* create a connection to the clients */
|
|
for (int i = 0; i < options.num_slaves; i++) {
|
|
struct sockaddr_in csock_addr;
|
|
int c_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
|
|
csock_addr.sin_family = AF_INET;
|
|
csock_addr.sin_addr.s_addr = inet_addr(options.slave_ips[i]);
|
|
csock_addr.sin_port = htons(CTRL_PORT);
|
|
|
|
V("Connecting to slave %s...\n", options.slave_ips[i]);
|
|
|
|
if (connect(c_fd, (struct sockaddr *)&csock_addr, sizeof(csock_addr)) != 0) {
|
|
E("Connect failed. ERR %d\n", errno);
|
|
}
|
|
slaves[i].conn_fd = c_fd;
|
|
}
|
|
}
|
|
|
|
static int
|
|
dsmbr_slave_ctrl_sock_create(void)
|
|
{
|
|
struct sockaddr_in server_addr;
|
|
int status;
|
|
const int enable = 1;
|
|
server_addr.sin_family = AF_INET;
|
|
server_addr.sin_port = htons(CTRL_PORT);
|
|
server_addr.sin_addr.s_addr = INADDR_ANY;
|
|
|
|
int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
|
|
if (fd < 0) {
|
|
E("socket() returned %d\n", errno);
|
|
}
|
|
|
|
if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) < 0) {
|
|
E("setsockopt() NODELAY %d\n", errno);
|
|
}
|
|
|
|
status = bind(fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
|
|
if (status < 0) {
|
|
E("bind() returned %d\n", errno);
|
|
}
|
|
|
|
status = listen(fd, CTRL_BACKLOG);
|
|
if (status < 0) {
|
|
E("listen() returned %d\n", errno);
|
|
}
|
|
|
|
return fd;
|
|
}
|
|
|
|
static void
|
|
usage()
|
|
{
|
|
fprintf(stdout,
|
|
"Options:\n"
|
|
" -q: target qps.\n"
|
|
" -s: server addr.\n"
|
|
" -p: server port.\n"
|
|
" -o: output file path.\n"
|
|
" -a: worker thread cpulist.\n"
|
|
" -c: connections per thread.\n"
|
|
" -m: module path.\n"
|
|
" -M: slave ips, repeatable, enables master mode.\n"
|
|
" -S: slave mode.\n"
|
|
" -v: verbose mode.\n"
|
|
" -t: run time.\n"
|
|
" -T: warmup time.\n"
|
|
" -i: interarrival time distribution.\n"
|
|
" -O: module arguments ('key'='value'), repeatable.\n\n");
|
|
}
|
|
|
|
static int
|
|
dsmbr_slave_accept_master(int slave_ctrl_sock)
|
|
{
|
|
int ret;
|
|
int conn_fd;
|
|
struct sockaddr addr;
|
|
socklen_t addrlen;
|
|
|
|
ret = accept(slave_ctrl_sock, &addr, &addrlen);
|
|
if (ret < 0) {
|
|
E("Failed to accept master connection: %d\n", errno);
|
|
}
|
|
conn_fd = ret;
|
|
|
|
V("Accepted master connection %d.\n", conn_fd);
|
|
|
|
return conn_fd;
|
|
}
|
|
|
|
static void
|
|
dsmbr_getopt(int argc, char *argv[])
|
|
{
|
|
char ch;
|
|
while ((ch = getopt(argc, argv, "q:s:p:o:a:c:m:M:Svt:T:i:O:h")) != -1) {
|
|
switch (ch) {
|
|
case 'q': {
|
|
options.target_qps = strtoul(optarg, NULL, 10);
|
|
break;
|
|
}
|
|
case 's': {
|
|
if (nslookup(optarg, options.server_ip, sizeof(options.server_ip)) != 0) {
|
|
E("nslookup failed with %d for %s\n", errno, optarg);
|
|
}
|
|
break;
|
|
}
|
|
case 'p': {
|
|
options.server_port = strtoul(optarg, NULL, 10);
|
|
break;
|
|
}
|
|
case 'o': {
|
|
strncpy(options.output_file, optarg, sizeof(options.output_file));
|
|
break;
|
|
}
|
|
case 'a': {
|
|
cpulist_to_cpuset(optarg, &options.cpuset);
|
|
break;
|
|
}
|
|
case 'c': {
|
|
options.conn_per_thread = strtoul(optarg, NULL, 10);
|
|
break;
|
|
}
|
|
case 'm': {
|
|
strncpy(options.module_path, optarg, sizeof(options.module_path));
|
|
break;
|
|
}
|
|
case 'M': {
|
|
options.master_mode = 1;
|
|
if (nslookup(optarg, options.slave_ips[options.num_slaves],
|
|
sizeof(options.slave_ips[0])) != 0) {
|
|
E("nslookup failed with %d for %s\n", errno, optarg);
|
|
}
|
|
options.num_slaves++;
|
|
if (options.num_slaves > MAX_SLAVES) {
|
|
E("Too many slaves: %d > %d\n", options.num_slaves, MAX_SLAVES);
|
|
}
|
|
break;
|
|
}
|
|
case 'S': {
|
|
options.slave_mode = 1;
|
|
break;
|
|
}
|
|
case 'v': {
|
|
options.verbose++;
|
|
if (options.verbose == INFO_VERBOSITY) {
|
|
W("Verbose mode may cause SUBSTANTIAL latency fluctuation in some terminals emulators\n");
|
|
} else if (options.verbose == DEBUG_VERBOSITY) {
|
|
W("Debug mode enabled. Expect very verbose output.\n");
|
|
}
|
|
logger_set_verbosity(options.verbose);
|
|
break;
|
|
}
|
|
case 't': {
|
|
options.duration = strtoul(optarg, NULL, 10);
|
|
break;
|
|
}
|
|
case 'T': {
|
|
options.warmup = strtoul(optarg, NULL, 10);
|
|
break;
|
|
}
|
|
case 'i': {
|
|
strncpy(options.ia_dist, optarg, sizeof(options.ia_dist));
|
|
break;
|
|
}
|
|
case 'O': {
|
|
options.mod_argk[options.mod_argc] = new char[MAX_MOD_ARG_LEN];
|
|
options.mod_argv[options.mod_argc] = new char[MAX_MOD_ARG_LEN];
|
|
split_kvstr(optarg, "=", options.mod_argk[options.mod_argc],
|
|
MAX_MOD_ARG_LEN, options.mod_argv[options.mod_argc], MAX_MOD_ARG_LEN);
|
|
options.mod_argc++;
|
|
break;
|
|
}
|
|
case 'h':
|
|
usage();
|
|
exit(0);
|
|
default:
|
|
E("Unrecognized option -%c\n\n", ch);
|
|
}
|
|
}
|
|
}
|
|
|
|
static void
|
|
dsmbr_send_ctrl_code(int sock, uint32_t code)
|
|
{
|
|
int ret;
|
|
struct dsmbr_ctrl_msg msg;
|
|
msg.code = code;
|
|
ret = dsmbr_send_ctrl_msg(sock, &msg);
|
|
if (ret != 0) {
|
|
E("Failed to send ctrl message %d\n", errno);
|
|
}
|
|
return;
|
|
}
|
|
|
|
static void
|
|
dsmbr_wait_ctrl_code(int sock, uint32_t code)
|
|
{
|
|
int ret;
|
|
struct dsmbr_ctrl_msg msg;
|
|
ret = dsmbr_recv_ctrl_msg(sock, &msg);
|
|
if (ret != 0) {
|
|
E("Failed to recv ctrl message %d\n", errno);
|
|
}
|
|
if (msg.code != code) {
|
|
E("Unexpected message code 0x%x, expected 0x%x\n", msg.code, code);
|
|
}
|
|
return;
|
|
}
|
|
|
|
/*
|
|
* protocol:
|
|
*
|
|
* master -> client SYNC
|
|
* client and master all establish connections to the server
|
|
* client -> master ACK
|
|
* master -> client START
|
|
* master RUNS for X seconds, client runs forever
|
|
* master -> client STOP
|
|
* client -> master STAT
|
|
*/
|
|
int
|
|
main(int argc, char *argv[])
|
|
{
|
|
dsmbr_getopt(argc, argv);
|
|
dsmbr_dump_options();
|
|
|
|
dsmbr_create_tls_context();
|
|
|
|
if (sem_init(&options.worker_init_sem, 0, THRD_INIT_PARALLEL) != 0) {
|
|
E("sem_init failed: %d\n", errno);
|
|
}
|
|
|
|
V("Loading module %s...\n", options.module_path);
|
|
options.m_info = ppd_load_module(options.module_path);
|
|
V("Loaded module \"%s\".\n", options.m_info->name);
|
|
|
|
V("Initializing module...\n");
|
|
options.m_info->global_init_cb(options.mod_argc, options.mod_argk, options.mod_argv,
|
|
&options.m_global_ctx);
|
|
|
|
FILE *resp_fp_csv = NULL;
|
|
if (!options.slave_mode) {
|
|
resp_fp_csv = fopen(options.output_file, "w");
|
|
if (resp_fp_csv == NULL) {
|
|
E("Cannot open file %s for writing %d.", options.output_file, errno);
|
|
exit(1);
|
|
}
|
|
}
|
|
|
|
struct dsmbr_slave slaves[MAX_SLAVES];
|
|
int ret;
|
|
int slave_ctrl_sock = -1;
|
|
int master_ctrl_conn = -1;
|
|
std::vector<struct dsmbr_thread_ctx *> workers;
|
|
uint64_t start_ts;
|
|
uint64_t end_ts;
|
|
uint64_t warmup_ts;
|
|
struct dsmbr_ctrl_msg ctrl_msg;
|
|
int kqfd = kqueue();
|
|
if (kqfd < 0) {
|
|
E("kqueue() failed with %d\n", errno);
|
|
}
|
|
|
|
if (options.master_mode) {
|
|
/* initiate connections to clients */
|
|
V("M: connecting to slaves.\n");
|
|
dsmbr_master_connect_slaves(slaves);
|
|
V("M: SYNC to slaves.\n");
|
|
for (int i = 0; i < options.num_slaves; i++) {
|
|
dsmbr_send_ctrl_code(slaves[i].conn_fd, CTRL_SYNC);
|
|
}
|
|
} else if (options.slave_mode) {
|
|
slave_ctrl_sock = dsmbr_slave_ctrl_sock_create();
|
|
V("S: waiting for master connection...\n");
|
|
master_ctrl_conn = dsmbr_slave_accept_master(slave_ctrl_sock);
|
|
// add to kqueue
|
|
V("S: waiting for master SYNC %d...\n", master_ctrl_conn);
|
|
dsmbr_wait_ctrl_code(master_ctrl_conn, CTRL_SYNC);
|
|
|
|
struct kevent kev;
|
|
EV_SET(&kev, master_ctrl_conn, EVFILT_READ, EV_ADD, 0, 0, NULL);
|
|
if (kevent(kqfd, &kev, 1, NULL, 0, NULL) == -1) {
|
|
E("kevent() failed: %d\n", errno);
|
|
}
|
|
}
|
|
|
|
// here slaves and master are on the same page */
|
|
int core;
|
|
int tid = 0;
|
|
CPU_FOREACH_ISSET (core, &options.cpuset) {
|
|
workers.push_back(dsmbr_create_worker(tid, core));
|
|
V("Created thread %d on core %d.\n", tid, core);
|
|
tid++;
|
|
}
|
|
|
|
V("Waiting for worker threads connection establishment.\n");
|
|
while (options.worker_init_comp.load(std::memory_order_relaxed) !=
|
|
CPU_COUNT(&options.cpuset)) {
|
|
};
|
|
|
|
// now all connections are established
|
|
if (options.master_mode) {
|
|
V("M: waiting for slaves ACK...\n");
|
|
for (int i = 0; i < options.num_slaves; i++) {
|
|
dsmbr_wait_ctrl_code(slaves[i].conn_fd, CTRL_ACK);
|
|
}
|
|
V("M: START to slaves...\n");
|
|
for (int i = 0; i < options.num_slaves; i++) {
|
|
dsmbr_send_ctrl_code(slaves[i].conn_fd, CTRL_START);
|
|
}
|
|
} else if (options.slave_mode) {
|
|
V("S: ACK to master...\n");
|
|
dsmbr_send_ctrl_code(master_ctrl_conn, CTRL_ACK);
|
|
V("S: waiting for master START...\n");
|
|
dsmbr_wait_ctrl_code(master_ctrl_conn, CTRL_START);
|
|
}
|
|
|
|
V("Starting benchmark...\n");
|
|
start_ts = get_time_us();
|
|
warmup_ts = start_ts;
|
|
// seq_cst to make sure start_ts happens before store
|
|
options.state.store(STATE_RUN, std::memory_order_seq_cst);
|
|
|
|
int cur_second = 0;
|
|
struct kevent kev;
|
|
if (!options.slave_mode) {
|
|
// slave mode runs forever
|
|
EV_SET(&kev, TIMER_MAGIC, EVFILT_TIMER, EV_ADD, NOTE_USECONDS, S2US, NULL);
|
|
if (kevent(kqfd, &kev, 1, NULL, 0, NULL) == -1) {
|
|
E("kevent() failed: %d\n", errno);
|
|
}
|
|
}
|
|
while (true) {
|
|
if (kevent(kqfd, NULL, 0, &kev, 1, NULL) == -1) {
|
|
E("kevent() failed: %d\n", errno);
|
|
}
|
|
|
|
if (kev.ident == TIMER_MAGIC) {
|
|
if (kev.data != 1) {
|
|
E("Timer expired multiple times\n");
|
|
}
|
|
cur_second++;
|
|
if ((int)cur_second == options.warmup) {
|
|
warmup_ts = get_time_us();
|
|
}
|
|
if ((int)cur_second >= options.duration + options.warmup) {
|
|
break;
|
|
}
|
|
} else if (options.slave_mode && (int)kev.ident == master_ctrl_conn) {
|
|
dsmbr_wait_ctrl_code(master_ctrl_conn, CTRL_STOP);
|
|
V("S: received STOP from master.\n");
|
|
break;
|
|
} else {
|
|
E("Unknown kevent ident %d\n", (int)kev.ident);
|
|
}
|
|
}
|
|
|
|
V("Stopping benchmark...\n");
|
|
options.state.store(STATE_STOP, std::memory_order_seq_cst);
|
|
end_ts = get_time_us();
|
|
|
|
if (options.master_mode) {
|
|
V("M: STOP to slaves...\n");
|
|
for (int i = 0; i < options.num_slaves; i++) {
|
|
dsmbr_send_ctrl_code(slaves[i].conn_fd, CTRL_STOP);
|
|
}
|
|
}
|
|
|
|
// wait for thread exit
|
|
for (unsigned int i = 0; i < workers.size(); i++) {
|
|
if ((ret = pthread_join(workers.at(i)->thrd, NULL)) != 0) {
|
|
E("pthread_join tid %d err %d\n", workers.at(i)->tid, ret);
|
|
}
|
|
}
|
|
|
|
// obtain stats from threads
|
|
uint64_t total_req = 0;
|
|
uint64_t total_send = 0;
|
|
uint64_t total_recv = 0;
|
|
for (auto worker : workers) {
|
|
for (auto stat : worker->stats) {
|
|
total_req++;
|
|
total_send += stat->send_size;
|
|
total_recv += stat->recv_size;
|
|
}
|
|
}
|
|
|
|
V("Reqs: %lu Time: %lu us Send: %lu bytes Recv: %lu bytes\n", total_req, end_ts - start_ts,
|
|
total_send, total_recv);
|
|
|
|
if (options.master_mode) {
|
|
V("M: waiting for slaves STAT...\n");
|
|
for (int i = 0; i < options.num_slaves; i++) {
|
|
if (dsmbr_recv_ctrl_msg(slaves[i].conn_fd, &ctrl_msg) != 0) {
|
|
E("dsmbr_recv_ctrl_msg failed with %d.\n", errno);
|
|
}
|
|
if (ctrl_msg.code != CTRL_STAT) {
|
|
E("Unexpected ctrl code %d, expected: %d.\n", ctrl_msg.code,
|
|
CTRL_STAT);
|
|
}
|
|
slaves[i].reqs = ctrl_msg.data[0];
|
|
slaves[i].time = ctrl_msg.data[1];
|
|
slaves[i].send_sz = ctrl_msg.data[2];
|
|
slaves[i].recv_sz = ctrl_msg.data[3];
|
|
V("M: received stats - Reqs: %lu Time: %lu us Send: %lu bytes Recv: %lu bytes\n",
|
|
slaves[i].reqs, slaves[i].time, slaves[i].send_sz, slaves[i].recv_sz);
|
|
}
|
|
} else if (options.slave_mode) {
|
|
V("M: STAT to master...\n");
|
|
ctrl_msg.code = CTRL_STAT;
|
|
ctrl_msg.data[0] = total_req;
|
|
ctrl_msg.data[1] = end_ts - start_ts;
|
|
ctrl_msg.data[2] = total_send;
|
|
ctrl_msg.data[3] = total_recv;
|
|
if (dsmbr_send_ctrl_msg(master_ctrl_conn, &ctrl_msg) != 0) {
|
|
E("dsmbr_send_ctrl_msg failed with %d.\n", errno);
|
|
}
|
|
}
|
|
|
|
if (!options.slave_mode) {
|
|
V("Saving results to %s ...\n", options.output_file);
|
|
for (int i = 0; i < options.num_slaves; i++) {
|
|
fprintf(resp_fp_csv, "S,%lu,%lu,%lu,%lu\n", slaves[i].reqs, slaves[i].time,
|
|
slaves[i].send_sz, slaves[i].recv_sz);
|
|
}
|
|
fprintf(resp_fp_csv, "M,%lu,%lu,%lu,%lu\n", total_req, end_ts - start_ts,
|
|
total_send, total_recv);
|
|
for (auto worker : workers) {
|
|
for (auto stat : worker->stats) {
|
|
if (stat->send_ts < warmup_ts) {
|
|
fprintf(resp_fp_csv, "W,%lu,%lu,%lu,%lu\n", stat->send_ts,
|
|
stat->recv_ts, stat->send_size, stat->recv_size);
|
|
} else {
|
|
fprintf(resp_fp_csv, "R,%lu,%lu,%lu,%lu\n", stat->send_ts,
|
|
stat->recv_ts, stat->send_size, stat->recv_size);
|
|
}
|
|
}
|
|
}
|
|
fflush(resp_fp_csv);
|
|
}
|
|
|
|
// clean up
|
|
if (resp_fp_csv != NULL) {
|
|
fclose(resp_fp_csv);
|
|
}
|
|
sem_destroy(&options.worker_init_sem);
|
|
close(kqfd);
|
|
if (slave_ctrl_sock != -1) {
|
|
close(slave_ctrl_sock);
|
|
}
|
|
if (master_ctrl_conn != -1) {
|
|
close(master_ctrl_conn);
|
|
}
|
|
for (int i = 0; i < options.num_slaves; i++) {
|
|
close(slaves[i].conn_fd);
|
|
}
|
|
for (auto worker : workers) {
|
|
dsmbr_free_worker(worker);
|
|
}
|
|
options.m_info->global_free_cb(options.m_global_ctx);
|
|
for (int i = 0; i < options.mod_argc; i++) {
|
|
delete[] options.mod_argk[i];
|
|
delete[] options.mod_argv[i];
|
|
}
|
|
return 0;
|
|
}
|