ppd <-> dsmbr working.

This commit is contained in:
quackerd 2023-03-13 08:32:55 +01:00
parent b67f77e824
commit b4e9801d98
8 changed files with 206 additions and 131 deletions

View File

@ -122,50 +122,47 @@ ppd_readmsg(struct bsock * bsock, char *buf, size_t len)
int status;
struct ppd_msg *msg = (struct ppd_msg *)buf;
if (len < sizeof(struct ppd_msg)) {
if (len < PPD_MSG_HDR_SZ) {
errno = EINVAL;
return -1;
}
status = bsock_peek(bsock, buf, sizeof(struct ppd_msg));
status = bsock_peek(bsock, buf, PPD_MSG_HDR_SZ);
if (status < 0) {
return status;
} else if (status != sizeof(struct ppd_msg)) {
status = ENODATA;
} else if (status != PPD_MSG_HDR_SZ) {
errno = ENODATA;
return -1;
}
int sz = ntohl(msg->size);
if (sz > (int)(len - sizeof(struct ppd_msg))) {
if (sz > (int)(len - PPD_MSG_HDR_SZ)) {
errno = ENOMEM;
return -1;
}
if (bsock_read_avail_size(bsock) < (int)(sizeof(struct ppd_msg) + sz)) {
status = ENODATA;
if (bsock_read_avail_size(bsock) < (int)(PPD_MSG_HDR_SZ + sz)) {
errno = ENODATA;
return -1;
}
status = bsock_read(bsock, buf, sizeof(struct ppd_msg) + sz);
status = bsock_read(bsock, buf, PPD_MSG_HDR_SZ + sz);
if (status < 0) {
return status;
} else if (status != sizeof(struct ppd_msg) + sz) {
} else if (status != PPD_MSG_HDR_SZ + sz) {
// this should never happen unless there is a bug with libbsock
status = EIO;
errno = EIO;
return -1;
}
msg->size = sz;
return status;
return 0;
}
/*
* returns: 0 on success
* -1 + errno on fail
* unrecoverable errnos:
* EAGAIN - not all data is written. How much data is written is unknown.
@ -178,12 +175,12 @@ ppd_writemsg(struct bsock * bsock, struct ppd_msg *msg)
msg->size = htonl(sz);
status = bsock_write(bsock, (char *)msg, sizeof(struct ppd_msg) + sz);
status = bsock_write(bsock, (char *)msg, PPD_MSG_HDR_SZ + sz);
if (status < 0) {
// not all message was sent
return status;
} else if (status != sz) {
} else if (status != PPD_MSG_HDR_SZ + sz) {
errno = EAGAIN;
return -1;
}

View File

@ -35,6 +35,7 @@
#include <iostream>
#include <list>
#include <set>
#include <sstream>
#include <thread>
#include <unordered_map>
#include <vector>
@ -103,8 +104,8 @@ struct dsmbr_options {
char output_file[128];
cpuset_t cpuset;
char module_path[128];
char mod_argk[MAX_MOD_ARGS][MAX_MOD_ARG_LEN];
char mod_argv[MAX_MOD_ARGS][MAX_MOD_ARG_LEN];
char *mod_argk[MAX_MOD_ARGS];
char *mod_argv[MAX_MOD_ARGS];
int mod_argc;
int enable_tls;
int master_mode;
@ -146,36 +147,31 @@ static struct dsmbr_options options = { .verbose = 0,
static void
dsmbr_dump_options()
{
// V ("Configuration:\n"
// " connections per thread: %d\n"
// " num threads: %d\n"
// " target QPS: %d\n"
// " warmup: %d\n"
// " duration: %d\n"
// " master mode: %d\n"
// " client mode: %d\n"
// " output file: %s\n"
// " server ip: %s\n"
// " server port: %d\n"
// " interarrival dist: %s\n"
// " num_workload_param: %d\n"
// " module path: %d\n",
// this->client_conn,
// this->client_thread_count,
// this->target_qps,
// this->warmup,
// this->duration,
// this->master_mode,
// this->client_mode,
// this->output_name,
// this->server_ip,
// this->server_port,
// this->generator_name,
// this->master_server_ip,
// this->workload_type,
// this->num_gen_params,
// this->global_conn_start_idx.load());
return;
std::stringstream ss;
ss << "Configuration:\n"
<< " server: " << options.server_ip << std::endl
<< " port: " << options.server_port << std::endl
<< " connections per thread: " << options.conn_per_thread << std::endl
<< " enable TLS: " << options.enable_tls << std::endl
<< " target qps: " << options.target_qps << std::endl
<< " number of threads: " << CPU_COUNT(&options.cpuset) << std::endl
<< " verbose: " << options.verbose << std::endl
<< " master mode: " << options.master_mode << std::endl
<< " slave mode:" << options.slave_mode << std::endl
<< " module: " << options.module_path << std::endl
<< " run time: " << options.duration << std::endl
<< " warmup time: " << options.warmup << std::endl
<< " interarrival dist: " << options.ia_dist << std::endl
<< " output file: " << options.output_file << std::endl
<< " depth: " << options.depth << std::endl
<< " slaves:";
for (int i = 0; i < options.num_slaves; i++) {
ss << " " << options.slave_ips[i];
}
ss << std::endl;
V("%s", ss.str().c_str());
}
static SSL *
@ -248,13 +244,13 @@ dsmbr_conn_create(void *thrd_ctx)
E("connect() failed: %d\n", errno);
}
V("Established client connection %d...", conn_fd);
V("Established client connection %d...\n", conn_fd);
struct dsmbr_conn *conn = new struct dsmbr_conn;
conn->conn_fd = conn_fd;
struct bsock_ringbuf_io io;
void *bsock_ctx = nullptr;
if (options.enable_tls) {
V("Initiating TLS handshake on connection %d...", conn_fd);
V("Initiating TLS handshake on connection %d...\n", conn_fd);
conn->ssl = dsmbr_tls_handshake_client(conn_fd);
io = ppd_bsock_io_ssl();
conn->ssl_readbuf = new char[BSOCK_BUF_SZ];
@ -279,8 +275,8 @@ dsmbr_conn_create(void *thrd_ctx)
(double)(options.conn_per_thread * CPU_COUNT(&options.cpuset)));
int status;
if ((status = options.m_info->conn_create_cb(options.m_global_ctx, thrd_ctx,
&conn->m_ctx)) != 0) {
if ((status = options.m_info->conn_init_cb(options.m_global_ctx, thrd_ctx, &conn->m_ctx)) !=
0) {
E("Failed to create conn m_ctx: %d\n", status);
}
@ -296,7 +292,7 @@ dsmbr_conn_free(struct dsmbr_conn *conn, void *thrd_ctx)
SSL_free(conn->ssl);
delete[] conn->ssl_readbuf;
}
options.m_info->conn_destroy_cb(options.m_global_ctx, thrd_ctx, conn->m_ctx);
options.m_info->conn_free_cb(options.m_global_ctx, thrd_ctx, conn->m_ctx);
bsock_free(conn->bsock);
close(conn->conn_fd);
@ -315,13 +311,13 @@ dsmbr_worker_main(void *ctx)
sem_wait(&options.worker_init_sem);
V("Thread %d initializing...\n", tinfo->tid);
int kqfd = kqueue();
if (kqfd != 0) {
if (kqfd < 0) {
E("Thread %d kqueue() failed: %d\n", tinfo->tid, errno);
}
struct ppd_msg *mbuf = (struct ppd_msg *)new char[PPD_MSG_MAX_SZ];
void *m_ctx;
if (options.m_info->thread_create_cb(tinfo->core, options.m_global_ctx, &m_ctx) != 0) {
if (options.m_info->thread_init_cb(tinfo->core, options.m_global_ctx, &m_ctx) != 0) {
E("thread_create_cb() failed: %d\n", errno);
}
@ -339,7 +335,7 @@ dsmbr_worker_main(void *ctx)
sem_post(&options.worker_init_sem);
// create connections
V("Thread %d waiting for start...\n", tinfo->tid);
V("Thread %d waiting...\n", tinfo->tid);
while (options.state.load(std::memory_order_relaxed) == STATE_WAITING) {
};
for (struct dsmbr_conn *conn : conns) {
@ -379,20 +375,22 @@ dsmbr_worker_main(void *ctx)
// read as many messages as possible
ret = ppd_readmsg(conn->bsock, (char *)mbuf, PPD_MSG_MAX_SZ);
if (ret == 0) {
size_t recv_size = mbuf->size + sizeof(struct ppd_msg);
size_t recv_size = mbuf->size + PPD_MSG_HDR_SZ;
ret = options.m_info->conn_recv_cb(mbuf->data, mbuf->size,
options.m_global_ctx, m_ctx, conn->m_ctx);
if (ret != 0) {
E("Thread %d connection %d conn_recv_cb() failed ERR %d.\n",
tinfo->tid, conn->conn_fd, errno);
}
struct dsmbr_request_record *rec =
conn->req_in_flight.front();
conn->req_in_flight.pop_front();
rec->recv_size = recv_size;
rec->recv_ts = cur_ts;
tinfo->stats.push_back(rec);
V("Thread %d connection %d RECV msg of size %u latency %lu\n",
tinfo->tid, conn->conn_fd, mbuf->size,
rec->recv_ts - rec->send_ts);
} else {
if (errno == ENODATA) {
// not enough data for next msg
@ -430,6 +428,8 @@ dsmbr_worker_main(void *ctx)
conn->req_in_flight.push_back(rec);
conn->next_send += (uint64_t)(conn->ia_gen->generate() *
(double)S2US);
V("Thread %d connection %d SEND msg of size %lu\n", tinfo->tid,
conn->conn_fd, out_sz);
}
}
}
@ -441,11 +441,13 @@ dsmbr_worker_main(void *ctx)
for (unsigned int i = 0; i < conns.size(); i++) {
dsmbr_conn_free(conns.at(i), m_ctx);
}
options.m_info->thread_free_cb(tinfo->core, options.m_global_ctx, m_ctx);
return NULL;
}
static void
dsmbr_free_worker(struct dsmbr_thread_ctx * worker)
dsmbr_free_worker(struct dsmbr_thread_ctx *worker)
{
for (auto stat : worker->stats) {
delete stat;
@ -484,7 +486,7 @@ dsmbr_create_worker(int tid, int core)
}
static void
dsmbr_master_connect_slaves(struct dsmbr_slave * slaves)
dsmbr_master_connect_slaves(struct dsmbr_slave *slaves)
{
/* create a connection to the clients */
for (int i = 0; i < options.num_slaves; i++) {
@ -503,7 +505,6 @@ dsmbr_master_connect_slaves(struct dsmbr_slave * slaves)
}
}
static int
dsmbr_slave_ctrl_sock_create(void)
{
@ -586,7 +587,7 @@ static void
dsmbr_getopt(int argc, char *argv[])
{
char ch;
while ((ch = getopt(argc, argv, "q:s:p:o:a:c:m:M:Sv:t:T:i:O:h")) != -1) {
while ((ch = getopt(argc, argv, "q:s:p:o:a:c:m:M:Svt:T:i:O:h")) != -1) {
switch (ch) {
case 'q': {
options.target_qps = strtoul(optarg, NULL, 10);
@ -620,8 +621,8 @@ dsmbr_getopt(int argc, char *argv[])
}
case 'M': {
options.master_mode = 1;
if(nslookup(optarg, options.slave_ips[options.num_slaves],
sizeof(options.slave_ips[0])) != 0) {
if (nslookup(optarg, options.slave_ips[options.num_slaves],
sizeof(options.slave_ips[0])) != 0) {
E("nslookup failed with %d for %s\n", errno, optarg);
}
options.num_slaves++;
@ -652,9 +653,10 @@ dsmbr_getopt(int argc, char *argv[])
break;
}
case 'O': {
options.mod_argk[options.mod_argc] = new char[MAX_MOD_ARG_LEN];
options.mod_argv[options.mod_argc] = new char[MAX_MOD_ARG_LEN];
split_kvstr(optarg, "=", options.mod_argk[options.mod_argc],
sizeof(options.mod_argk[0]), options.mod_argv[options.mod_argc],
sizeof(options.mod_argv[0]));
MAX_MOD_ARG_LEN, options.mod_argv[options.mod_argc], MAX_MOD_ARG_LEN);
options.mod_argc++;
break;
}
@ -713,11 +715,19 @@ main(int argc, char *argv[])
dsmbr_dump_options();
dsmbr_create_tls_context();
if (sem_init(&options.worker_init_sem, 0, THRD_INIT_PARALLEL) != 0) {
E("sem_init failed: %d\n", errno);
}
V("Loading module %s...\n", options.module_path);
options.m_info = ppd_load_module(options.module_path);
V("Loaded module \"%s\".\n", options.m_info->name);
V("Initializing module...\n");
options.m_info->global_init_cb(options.mod_argc, options.mod_argk, options.mod_argv,
&options.m_global_ctx);
FILE *resp_fp_csv = NULL;
if (!options.slave_mode) {
resp_fp_csv = fopen(options.output_file, "w");
@ -734,6 +744,7 @@ main(int argc, char *argv[])
std::vector<struct dsmbr_thread_ctx *> workers;
uint64_t start_ts;
uint64_t end_ts;
uint64_t warmup_ts;
struct dsmbr_ctrl_msg ctrl_msg;
int kqfd = kqueue();
if (kqfd < 0) {
@ -765,14 +776,16 @@ main(int argc, char *argv[])
// here slaves and master are on the same page */
int core;
int tid = 0;
CPU_FOREACH_ISSET(core, &options.cpuset) {
CPU_FOREACH_ISSET (core, &options.cpuset) {
workers.push_back(dsmbr_create_worker(tid, core));
V("Created thread %d on core %d.\n", tid, core);
tid++;
}
V("Waiting for worker threads connection establishment.\n");
while(options.worker_init_comp.load(std::memory_order_relaxed) != CPU_COUNT(&options.cpuset)) {};
while (options.worker_init_comp.load(std::memory_order_relaxed) !=
CPU_COUNT(&options.cpuset)) {
};
// now all connections are established
if (options.master_mode) {
@ -793,13 +806,15 @@ main(int argc, char *argv[])
V("Starting benchmark...\n");
start_ts = get_time_us();
options.state.store(STATE_RUN, std::memory_order_relaxed);
warmup_ts = start_ts;
// seq_cst to make sure start_ts happens before store
options.state.store(STATE_RUN, std::memory_order_seq_cst);
int cur_second = 0;
struct kevent kev;
if (!options.slave_mode) {
// slave mode runs forever
EV_SET(&kev, TIMER_MAGIC, EVFILT_TIMER, EV_ADD | EV_ONESHOT, NOTE_USECONDS, S2US, NULL);
EV_SET(&kev, TIMER_MAGIC, EVFILT_TIMER, EV_ADD, NOTE_USECONDS, S2US, NULL);
if (kevent(kqfd, &kev, 1, NULL, 0, NULL) == -1) {
E("kevent() failed: %d\n", errno);
}
@ -810,7 +825,13 @@ main(int argc, char *argv[])
}
if (kev.ident == TIMER_MAGIC) {
if (kev.data != 1) {
E("Timer expired multiple times\n");
}
cur_second++;
if ((int)cur_second == options.warmup) {
warmup_ts = get_time_us();
}
if ((int)cur_second >= options.duration + options.warmup) {
break;
}
@ -824,7 +845,7 @@ main(int argc, char *argv[])
}
V("Stopping benchmark...\n");
options.state.store(STATE_STOP, std::memory_order_relaxed);
options.state.store(STATE_STOP, std::memory_order_seq_cst);
end_ts = get_time_us();
if (options.master_mode) {
@ -833,7 +854,7 @@ main(int argc, char *argv[])
dsmbr_send_ctrl_code(slaves[i].conn_fd, CTRL_STOP);
}
}
// wait for thread exit
for (unsigned int i = 0; i < workers.size(); i++) {
if ((ret = pthread_join(workers.at(i)->thrd, NULL)) != 0) {
@ -846,14 +867,15 @@ main(int argc, char *argv[])
uint64_t total_send = 0;
uint64_t total_recv = 0;
for (auto worker : workers) {
for (auto stat : worker->stats) {
for (auto stat : worker->stats) {
total_req++;
total_send += stat->send_size;
total_recv += stat->recv_size;
}
}
V("Reqs: %lu Time: %lu us Send: %lu bytes Recv: %lu bytes\n", total_req, end_ts - start_ts, total_send, total_recv);
V("Reqs: %lu Time: %lu us Send: %lu bytes Recv: %lu bytes\n", total_req, end_ts - start_ts,
total_send, total_recv);
if (options.master_mode) {
V("M: waiting for slaves STAT...\n");
@ -862,13 +884,15 @@ main(int argc, char *argv[])
E("dsmbr_recv_ctrl_msg failed with %d.\n", errno);
}
if (ctrl_msg.code != CTRL_STAT) {
E("Unexpected ctrl code %d, expected: %d.\n", ctrl_msg.code, CTRL_STAT);
E("Unexpected ctrl code %d, expected: %d.\n", ctrl_msg.code,
CTRL_STAT);
}
slaves[i].reqs = ctrl_msg.data[0];
slaves[i].time = ctrl_msg.data[1];
slaves[i].send_sz = ctrl_msg.data[2];
slaves[i].recv_sz = ctrl_msg.data[3];
V("M: received stats - Reqs: %lu Time: %lu us Send: %lu bytes Recv: %lu bytes\n", slaves[i].reqs, slaves[i].time, slaves[i].send_sz, slaves[i].recv_sz);
V("M: received stats - Reqs: %lu Time: %lu us Send: %lu bytes Recv: %lu bytes\n",
slaves[i].reqs, slaves[i].time, slaves[i].send_sz, slaves[i].recv_sz);
}
} else if (options.slave_mode) {
V("M: STAT to master...\n");
@ -882,15 +906,23 @@ main(int argc, char *argv[])
}
}
V("Saving results to %s ...", options.output_file);
V("Saving results to %s ...\n", options.output_file);
if (!options.slave_mode) {
for (int i = 0; i < options.num_slaves; i++) {
fprintf(resp_fp_csv, "S,%lu,%lu,%lu,%lu\n", slaves[i].reqs, slaves[i].time, slaves[i].send_sz, slaves[i].recv_sz);
fprintf(resp_fp_csv, "S,%lu,%lu,%lu,%lu\n", slaves[i].reqs, slaves[i].time,
slaves[i].send_sz, slaves[i].recv_sz);
}
fprintf(resp_fp_csv, "M,%lu,%lu,%lu,%lu\n", total_req, end_ts - start_ts, total_send, total_recv);
fprintf(resp_fp_csv, "M,%lu,%lu,%lu,%lu\n", total_req, end_ts - start_ts,
total_send, total_recv);
for (auto worker : workers) {
for (auto stat : worker->stats) {
fprintf(resp_fp_csv, "R,%lu,%lu,%lu,%lu\n", stat->send_ts, stat->recv_ts, stat->send_size, stat->recv_size);
if (stat->send_ts < warmup_ts) {
fprintf(resp_fp_csv, "W,%lu,%lu,%lu,%lu\n", stat->send_ts,
stat->recv_ts, stat->send_size, stat->recv_size);
} else {
fprintf(resp_fp_csv, "R,%lu,%lu,%lu,%lu\n", stat->send_ts,
stat->recv_ts, stat->send_size, stat->recv_size);
}
}
}
fflush(resp_fp_csv);
@ -914,6 +946,10 @@ main(int argc, char *argv[])
for (auto worker : workers) {
dsmbr_free_worker(worker);
}
options.m_info->global_free_cb(options.m_global_ctx);
for (int i = 0; i < options.mod_argc; i++) {
delete[] options.mod_argk[i];
delete[] options.mod_argv[i];
}
return 0;
}

View File

@ -13,11 +13,15 @@ extern "C" {
typedef int (*ppd_global_init_cb)(int argc, char** argk, char** argv, void **ctx);
typedef int (*ppd_thread_create_cb)(int core, void * global_ctx, void **ctx);
typedef void (*ppd_global_free_cb)(void *ctx);
typedef int (*ppd_conn_create_cb)(void * global_ctx, void * thread_ctx, void **ctx);
typedef int (*ppd_thread_init_cb)(int core, void * global_ctx, void **ctx);
typedef void (*ppd_conn_destroy_cb)(void * global_ctx, void * thread_ctx, void *conn_ctx);
typedef void (*ppd_thread_free_cb)(int core, void *global_ctx, void *ctx);
typedef int (*ppd_conn_init_cb)(void * global_ctx, void * thread_ctx, void **ctx);
typedef void (*ppd_conn_free_cb)(void * global_ctx, void * thread_ctx, void *conn_ctx);
typedef int (*ppd_conn_recv_cb)(const char * data, size_t sz, void * global_ctx, void * thread_ctx, void * conn_ctx);
@ -26,23 +30,15 @@ typedef int (*ppd_conn_send_cb)(const char * out, size_t sz, size_t * out_sz, vo
struct ppd_mod_info {
const char * name;
ppd_global_init_cb global_init_cb;
ppd_thread_create_cb thread_create_cb;
ppd_conn_create_cb conn_create_cb;
ppd_conn_destroy_cb conn_destroy_cb;
ppd_global_free_cb global_free_cb;
ppd_thread_init_cb thread_init_cb;
ppd_thread_free_cb thread_free_cb;
ppd_conn_init_cb conn_init_cb;
ppd_conn_free_cb conn_free_cb;
ppd_conn_send_cb conn_send_cb;
ppd_conn_recv_cb conn_recv_cb;
};
// struct dismemebr_mod_info {
// const char * name;
// ppd_global_init_cb global_init_cb;
// ppd_thread_create_cb thread_create_cb;
// ppd_conn_create_cb conn_create_cb;
// ppd_conn_destroy_cb conn_destroy_cb;
// ppd_conn_send_cb conn_send_cb;
// ppd_conn_recv_cb conn_recv_cb;
// };
typedef struct ppd_mod_info * (*ppd_get_mod_info_fn)(void);
#define PPD_GET_MOD_INFO_ID ppd_getmod_info
#define STR(x) #x

View File

@ -11,4 +11,5 @@ struct ppd_msg {
static constexpr unsigned long PPD_MSG_MAX_SZ = 1024 * 16 * 1024;
static constexpr unsigned long PPD_MSG_HDR_SZ = offsetof(struct ppd_msg, data);
static_assert(PPD_MSG_HDR_SZ == sizeof(ppd_msg));
static constexpr unsigned long PPD_MSG_MAX_DATA_SZ = PPD_MSG_MAX_SZ - PPD_MSG_HDR_SZ;

View File

@ -4,6 +4,7 @@
extern "C" {
#endif
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <stdlib.h>
@ -15,8 +16,10 @@ extern "C" {
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#define inline inline __attribute__((unused))
@ -65,16 +68,27 @@ nslookup(const char *hostname, char *out, int len)
{
int ret;
struct addrinfo *addrinfo;
ret = getaddrinfo(hostname, nullptr, nullptr, &addrinfo);
struct addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET;
ret = getaddrinfo(hostname, nullptr, &hints, &addrinfo);
if (ret != 0) {
errno = ret;
return -1;
}
if (inet_ntop(AF_INET, addrinfo->ai_addr, out, len) == NULL) {
return -1;
for (struct addrinfo *cur = addrinfo; cur != NULL; cur = cur->ai_next) {
if (cur->ai_family == AF_INET) {
if (inet_ntop(AF_INET, &((struct sockaddr_in *)addrinfo->ai_addr)->sin_addr, out, len) == NULL) {
return -1;
}
freeaddrinfo(addrinfo);
return 0;
}
}
freeaddrinfo(addrinfo);
return 0;
errno = EADDRNOTAVAIL;
return -1;
}
#undef inline

View File

@ -7,20 +7,32 @@ global_init_cb(UNUSED int argc, UNUSED char **argk, UNUSED char **argv, UNUSED v
return 0;
}
static int
thread_create_cb(UNUSED int core, UNUSED void *global_ctx, UNUSED void **ctx)
static void
global_free_cb(UNUSED void *global_ctx)
{
return 0;
return;
}
static int
conn_create_cb(UNUSED void *global_ctx, UNUSED void *thread_ctx, UNUSED void **ctx)
thread_init_cb(UNUSED int core, UNUSED void *global_ctx, UNUSED void **ctx)
{
return 0;
}
static void
conn_destroy_cb(UNUSED void *global_ctx, UNUSED void *thread_ctx, UNUSED void *conn_ctx)
thread_free_cb(UNUSED int core, UNUSED void *global_ctx, UNUSED void *ctx)
{
return;
}
static int
conn_init_cb(UNUSED void *global_ctx, UNUSED void *thread_ctx, UNUSED void **ctx)
{
return 0;
}
static void
conn_free_cb(UNUSED void *global_ctx, UNUSED void *thread_ctx, UNUSED void *conn_ctx)
{
return;
}
@ -40,10 +52,13 @@ conn_send_cb(UNUSED const char *out, UNUSED size_t sz, size_t *out_sz, UNUSED vo
return 0;
}
static struct ppd_mod_info minfo = { .global_init_cb = global_init_cb,
.thread_create_cb = thread_create_cb,
.conn_create_cb = conn_create_cb,
.conn_destroy_cb = conn_destroy_cb,
static struct ppd_mod_info minfo = { .name = "ECHO_DSMBR",
.global_init_cb = global_init_cb,
.global_free_cb = global_free_cb,
.thread_init_cb = thread_init_cb,
.thread_free_cb = thread_free_cb,
.conn_init_cb = conn_init_cb,
.conn_free_cb = conn_free_cb,
.conn_send_cb = conn_send_cb,
.conn_recv_cb = conn_recv_cb };

View File

@ -7,20 +7,32 @@ global_init_cb(UNUSED int argc, UNUSED char **argk, UNUSED char **argv, UNUSED v
return 0;
}
static int
thread_create_cb(UNUSED int core, UNUSED void *global_ctx, UNUSED void **ctx)
static void
global_free_cb(UNUSED void *global_ctx)
{
return 0;
return;
}
static int
conn_create_cb(UNUSED void *global_ctx, UNUSED void *thread_ctx, UNUSED void **ctx)
thread_init_cb(UNUSED int core, UNUSED void *global_ctx, UNUSED void **ctx)
{
return 0;
}
static void
conn_destroy_cb(UNUSED void *global_ctx, UNUSED void *thread_ctx, UNUSED void *conn_ctx)
thread_free_cb(UNUSED int core, UNUSED void *global_ctx, UNUSED void *ctx)
{
return;
}
static int
conn_init_cb(UNUSED void *global_ctx, UNUSED void *thread_ctx, UNUSED void **ctx)
{
return 0;
}
static void
conn_free_cb(UNUSED void *global_ctx, UNUSED void *thread_ctx, UNUSED void *conn_ctx)
{
return;
}
@ -40,11 +52,13 @@ conn_send_cb(UNUSED const char *out, UNUSED size_t sz, size_t *out_sz, UNUSED vo
return 0;
}
static struct ppd_mod_info minfo = { .name = "ECHO",
static struct ppd_mod_info minfo = { .name = "ECHO_PPD",
.global_init_cb = global_init_cb,
.thread_create_cb = thread_create_cb,
.conn_create_cb = conn_create_cb,
.conn_destroy_cb = conn_destroy_cb,
.global_free_cb = global_free_cb,
.thread_init_cb = thread_init_cb,
.thread_free_cb = thread_free_cb,
.conn_init_cb = conn_init_cb,
.conn_free_cb = conn_free_cb,
.conn_send_cb = conn_send_cb,
.conn_recv_cb = conn_recv_cb };

View File

@ -40,7 +40,6 @@ static constexpr int SINGLE_LEGACY = -1;
static constexpr int DEFAULT_PORT = 9898;
static constexpr int BSOCK_BUF_SZ = 4096;
// 16MB max per message
static constexpr int MBUF_SZ = 1024 * 1024 * 16;
static constexpr int MAX_MODE_PARAMS = 16;
static constexpr int MAX_MODE_PARAMS_LEN = 128;
@ -238,7 +237,7 @@ ppd_conn_free(struct ppd_thread_ctx *tinfo, struct ppd_conn *conn)
{
void *conn_ctx = conn->m_conn_ctx;
ppd_conn_free_no_ctx(conn);
options.m_info->conn_destroy_cb(options.m_global_ctx, tinfo->m_thread_ctx, conn_ctx);
options.m_info->conn_free_cb(options.m_global_ctx, tinfo->m_thread_ctx, conn_ctx);
}
static void
@ -289,9 +288,9 @@ handle_event(struct ppd_thread_ctx *tinfo, struct kevent *kev)
}
while (true) {
status = ppd_readmsg(hint->bsock, tinfo->m_buf, MBUF_SZ);
status = ppd_readmsg(hint->bsock, tinfo->m_buf, PPD_MSG_MAX_SZ);
if (status != 0) {
if (errno == ERANGE) {
if (errno == ENODATA) {
// not enough data yet. try again later.
goto end;
} else {
@ -301,6 +300,7 @@ handle_event(struct ppd_thread_ctx *tinfo, struct kevent *kev)
}
msg = (struct ppd_msg *)tinfo->m_buf;
V("Thread %d connection %d RECV msg of size %u\n", tinfo->tid, conn_fd, msg->size);
status = options.m_info->conn_recv_cb(msg->data, msg->size, options.m_global_ctx,
tinfo->m_thread_ctx, hint->m_conn_ctx);
if (status != 0) {
@ -309,7 +309,7 @@ handle_event(struct ppd_thread_ctx *tinfo, struct kevent *kev)
goto fail;
}
status = options.m_info->conn_send_cb(msg->data, MBUF_SZ - sizeof(struct ppd_msg), &out_sz,
status = options.m_info->conn_send_cb(msg->data, PPD_MSG_MAX_SZ - PPD_MSG_HDR_SZ, &out_sz,
options.m_global_ctx, tinfo->m_thread_ctx, hint->m_conn_ctx);
if (status != 0) {
W("Thread %d dropped connection %d due to conn_send_cb error %d\n", tinfo->tid,
@ -325,9 +325,10 @@ handle_event(struct ppd_thread_ctx *tinfo, struct kevent *kev)
goto fail;
}
V("Thread %d connection %d SEND msg of size %lu\n", tinfo->tid, conn_fd, out_sz);
// flush bsock immediately
status = bsock_flush(hint->bsock);
if (status <= 0) {
if (status < 0) {
W("Thread %d dropped connection %d due to bsock_flush ret %d errno %d\n", tinfo->tid, conn_fd, status, errno);
goto fail;
}
@ -353,7 +354,7 @@ worker_main(void *info)
V("Thread %d started.\n", tinfo->tid);
skev_sz = 0;
while (1) {
while (true) {
status = kevent(tinfo->kqfd, skev, skev_sz, kev, NEVENT, NULL);
if (status <= 0) {
@ -470,7 +471,7 @@ create_workers(std::vector<struct ppd_thread_ctx *> *workers)
aligned_alloc(CACHE_LINE_SIZE, sizeof(struct ppd_thread_ctx));
thrd->evcnt = 0;
thrd->tid = tid;
options.m_info->thread_create_cb(core, options.m_global_ctx, &thrd->m_thread_ctx);
options.m_info->thread_init_cb(core, options.m_global_ctx, &thrd->m_thread_ctx);
if (!options.shared_kq) {
kq = kqueue();
@ -494,6 +495,7 @@ create_workers(std::vector<struct ppd_thread_ctx *> *workers)
}
thrd->kqfd = kq;
thrd->m_buf = new char[PPD_MSG_MAX_SZ];
pthread_attr_t attr;
int status;
@ -649,7 +651,7 @@ loop_main(int m_kq, std::vector<struct ppd_thread_ctx *> *workers)
}
int worker_idx = cur_conn % workers->size();
if (options.m_info->conn_create_cb(options.m_global_ctx,
if (options.m_info->conn_init_cb(options.m_global_ctx,
workers->at(worker_idx)->m_thread_ctx, &conn->m_conn_ctx) != 0) {
W("Failed to create connection %d ctx, dropped.\n", conn_fd);
ppd_conn_free_no_ctx(conn);