half done

This commit is contained in:
quackerd 2023-03-02 15:12:35 +01:00
parent 3c684a39e6
commit da273c5f39
7 changed files with 299 additions and 206 deletions

View File

@ -25,14 +25,15 @@ pkg_check_modules(bsock REQUIRED bsock)
set(CFLAGS -Wall -Wextra -Werror -Wno-unused-parameter -Wno-unused-variable -std=c++17 -O2 -g)
add_executable(dsmbr ${CMAKE_CURRENT_SOURCE_DIR}/ppd/dsmbr.cc
${CMAKE_CURRENT_SOURCE_DIR}/ppd/util.cc)
add_executable(dsmbr ${CMAKE_CURRENT_SOURCE_DIR}/dsmbr/dsmbr.cc
${CMAKE_CURRENT_SOURCE_DIR}/common/io.cc
${CMAKE_CURRENT_SOURCE_DIR}/dsmbr/dmsg.cc)
target_link_libraries(dsmbr pthread bsock)
target_compile_options(dsmbr PRIVATE ${CFLAGS} ${bsock_CFLAGS})
target_include_directories(dsmbr PRIVATE ${bsock_INCLUDE_DIRS} ${CMAKE_CURRENT_SOURCE_DIR}/include)
add_executable(ppd ${CMAKE_CURRENT_SOURCE_DIR}/ppd/ppd.cc
${CMAKE_CURRENT_SOURCE_DIR}/ppd/util.cc)
${CMAKE_CURRENT_SOURCE_DIR}/common/io.cc)
target_link_libraries(ppd pthread bsock ${bsock_CFLAGS})
target_compile_options(ppd PRIVATE ${CFLAGS} ${bsock_INCLUDE_DIRS})
target_include_directories(ppd PRIVATE ${bsock_INCLUDE_DIRS} ${CMAKE_CURRENT_SOURCE_DIR}/include)

View File

@ -1,10 +1,9 @@
#include <netinet/in.h>
#include <openssl/ssl.h>
#include <stdio.h>
#include <unistd.h>
#include <bsock/bsock.h>
#include "bsock/bsock.h"
#include "logger.h"
#include "msg.h"
#include "io.h"
@ -150,3 +149,46 @@ ppd_writemsg(struct bsock * bsock, struct ppd_msg *msg)
return bsock_write(bsock, (char *)msg, sizeof(struct ppd_msg) + sz);
}
int
ppd_readbuf(int fd, void *buf, int len)
{
int status;
while (len > 0) {
if ((status = recv(fd, buf, len, 0)) > 0) {
buf = (char *)buf + status;
len -= status;
} else if (status == 0) {
errno = ECONNRESET;
return -1;
} else {
if (errno != EINTR) {
return -1;
}
}
};
return 0;
}
int
ppd_writebuf(int fd, void *buf, int len)
{
int status;
while (len > 0) {
if ((status = send(fd, buf, len, 0)) > 0) {
buf = (char *)buf + status;
len -= status;
} else if (status == 0) {
errno = ECONNRESET;
return -1;
} else {
return -1;
}
};
return 0;
}

49
dsmbr/dmsg.cc Normal file
View File

@ -0,0 +1,49 @@
#include "dmsg.h"
#include "io.h"
#include <cstdlib>
#include <netinet/in.h>
int
dsmbr_send_ctrl_msg(int sockfd, struct dsmbr_ctrl_msg * msg)
{
return ppd_writebuf(sockfd, &msg, sizeof(struct dsmbr_ctrl_msg));
}
int
dsmbr_sendall_ctrl_msg(int * sockfds, int count, int code)
{
int status = 0;
for(int i = 0; i < count; i++) {
status = dsmbr_send_ctrl_msg(sockfds[i], code, nullptr, 0);
if (status != 0) {
break;
}
}
return status;
}
int
dsmbr_recv_ctrl_msg(int sockfd, struct dsmbr_ctrl_msg * msg)
{
int status = ppd_readbuf(sockfd, msg, sizeof(struct dsmbr_ctrl_msg));
if (status == 0) {
msg->code = ntohl(msg->code);
}
return status;
}
int
dsmbr_recvall_ctrl_msg(int * sockfds, int count, struct dsmbr_ctrl_msg * msgs)
{
int status = 0;
for(int i = 0; i < count; i++) {
status = dsmbr_recv_ctrl_msg(sockfds[i], &msgs[i]);
if (status != 0) {
break;
}
}
return status;
}

27
dsmbr/dmsg.h Normal file
View File

@ -0,0 +1,27 @@
#pragma once
#include <cstdint>
#include <cstdio>
static constexpr int CTRL_SYNC = 0x1234;
static constexpr int CTRL_ACK = 0x2345;
static constexpr int CTRL_STOP = 0x3456;
static constexpr int CTRL_STAT = 0x4567;
static constexpr int DSMBR_MSG_MAX_DATA = 16;
struct dsmbr_ctrl_msg {
uint32_t code;
uint64_t data[DSMBR_MSG_MAX_DATA];
};
int
dsmbr_send_ctrl_msg(int sockfd, int code, char * data, size_t len);
int
dsmbr_sendall_ctrl_msg(int * sockfds, int count, int code);
int
dsmbr_recv_ctrl_msg(int sockfd, struct dsmbr_ctrl_msg * msg);
int
dsmbr_recvall_ctrl_msg(int * sockfds, int count, struct dsmbr_ctrl_msg * msgs);

View File

@ -4,6 +4,7 @@
#include <cstdio>
#include <iostream>
#include <sys/_pthreadtypes.h>
#include <sys/_stdint.h>
#include <sys/param.h>
#include <vector>
#include <thread>
@ -23,6 +24,7 @@
#include <netdb.h>
#include <pthread.h>
#include <pthread_np.h>
#include <bsock/bsock.h>
#include "openssl/ssl.h"
#include "openssl/err.h"
@ -31,11 +33,17 @@
#include "logger.h"
#include "mod.h"
#include "msg.h"
#include "io.h"
#include "dmsg.h"
static constexpr int MAX_MOD_ARGS = 32;
static constexpr int MAX_MOD_ARG_LEN = 128;
static constexpr int MAX_SLAVES = 32;
static constexpr int NEVENT = 64;
static constexpr int CTRL_PORT = 15367;
static constexpr int CTRL_TIMEOUT = 3;
static constexpr int CTRL_BACKLOG = 4096;
static constexpr int BSOCK_BUF_SZ = 4096;
static constexpr int KQ_TIMER_MAGIC = 0x3355;
struct dsmbr_thread_ctx {
@ -49,10 +57,6 @@ struct dsmbr_thread_ctx {
std::vector<struct dsmbr_conn *> conns;
};
static constexpr int CTRL_SYNC = 0x1234;
static constexpr int CTRL_ACK = 0x2345;
static constexpr int CTRL_STOP = 0x3456;
static constexpr int CTRL_STAT = 0x4567;
struct dsmbr_request_record {
uint64_t recv_size;
@ -62,16 +66,14 @@ struct dsmbr_request_record {
uint64_t epoch;
};
static constexpr int DSMBR_MSG_MAX_DATA = 16;
struct dsmbr_ctrl_msg {
uint32_t code;
uint64_t data[DSMBR_MSG_MAX_DATA];
};
struct dsmbr_conn {
Generator *ia_gen;
SSL* ssl;
char * ssl_readbuf;
int conn_fd;
struct bsock * bsock;
struct ppd_bsock_io_ssl_ctx bsock_ctx;
int timer_expired;
void * m_ctx;
uint64_t next_send;
@ -79,6 +81,14 @@ struct dsmbr_conn {
std::list<struct dsmbr_request_record *> req_in_flight;
};
struct dsmbr_slave {
int conn_fd;
/* stats */
uint64_t runtime;
uint64_t avg_pkt_sz;
uint64_t pkt_cnt;
};
struct dsmbr_options {
int verbose;
char server_ip[64];
@ -96,9 +106,6 @@ struct dsmbr_options {
char* mod_argv[MAX_MOD_ARGS];
int mod_argc;
int enable_tls;
int enable_ktls;
char tls_sk[128];
char tls_cert[128];
// master mode
char * slave_ips[MAX_SLAVES];
@ -130,9 +137,6 @@ static struct dsmbr_options options = {
.module_path = {0},
.mod_argc = 0,
.enable_tls = 0,
.enable_ktls = 0,
.tls_sk = {0},
.tls_cert = {0},
.slave_ips = {nullptr},
.num_slaves = 0,
.is_master = 0,
@ -179,59 +183,6 @@ dsmbr_dump_options()
return;
}
static int
dsmbr_send_ctrl_msg(int sockfd, int code, char * data, size_t len)
{
struct dsmbr_ctrl_msg msg;
if (len > sizeof(dsmbr_ctrl_msg::data)) {
return EOVERFLOW;
}
msg.code = htonl(code);
if (data != nullptr) {
memcpy(msg.data, data, len);
}
return ppd_writebuf(sockfd, &msg, sizeof(msg));
}
static int
dsmbr_sendall_ctrl_msg(int * sockfds, int count, int code)
{
int status = 0;
for(int i = 0; i < count; i++) {
status = dsmbr_send_ctrl_msg(sockfds[i], code, nullptr, 0);
if (status != 0) {
break;
}
}
return status;
}
static int
dsmbr_recv_ctrl_msg(int sockfd, struct dsmbr_ctrl_msg * msg)
{
int status = ppd_readbuf(sockfd, msg, sizeof(struct dsmbr_ctrl_msg));
if (status == 0) {
msg->code = ntohl(msg->code);
}
return status;
}
static int
dsmbr_recvall_ctrl_msg(int * sockfds, int count, struct dsmbr_ctrl_msg * msgs)
{
int status = 0;
for(int i = 0; i < count; i++) {
status = dsmbr_recv_ctrl_msg(sockfds[i], &msgs[i]);
if (status != 0) {
break;
}
}
return status;
}
static SSL *
dsmbr_tls_handshake_client(int conn_fd)
@ -252,28 +203,39 @@ dsmbr_tls_handshake_client(int conn_fd)
E("SSL_connect() failed: %ld\n", ERR_get_error());
}
if (options.enable_ktls) {
// make sure ktls is enabled
if (!(BIO_get_ktls_send(SSL_get_wbio(ssl)) &&
BIO_get_ktls_recv(SSL_get_rbio(ssl)))) {
E("kTLS not enabled on %d: %ld\n", conn_fd, ERR_get_error());
}
}
return ssl;
}
static void
dsmbr_create_tls_context()
{
SSL_CTX *ctx;
ctx = SSL_CTX_new(TLS_client_method());
if (!ctx) {
E("SSL_CTX_new() failed: %ld\n", ERR_get_error());
}
SSL_CTX_set_min_proto_version(ctx, TLS1_2_VERSION);
SSL_CTX_set_max_proto_version(ctx, TLS1_2_VERSION);
options.ssl_ctx = ctx;
}
static struct dsmbr_conn *
dsmbr_conn_create(struct dsmbr_thread_ctx * thrd_ctx)
{
struct sockaddr_in server_addr;
SSL * ssl = nullptr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(options.server_port);
server_addr.sin_addr.s_addr = inet_addr(options.server_ip);
int enable = 1;
struct timeval tv = { .tv_sec = 5, .tv_usec = 0 };
struct timeval tv = { .tv_sec = CTRL_TIMEOUT, .tv_usec = 0 };
int conn_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (conn_fd == -1) {
@ -294,25 +256,39 @@ dsmbr_conn_create(struct dsmbr_thread_ctx * thrd_ctx)
if (connect(conn_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) != 0) {
E("connect() failed: %d\n", errno);
}
V("Established client connection %d...", conn_fd);
struct dsmbr_conn * conn = new struct dsmbr_conn;
conn->conn_fd = conn_fd;
conn->depth = 0;
struct bsock_ringbuf_io io;
void * bsock_ctx = nullptr;
if (options.enable_tls) {
V("Initiating TLS handshake on connection %d...", conn_fd);
conn->ssl = dsmbr_tls_handshake_client(conn_fd);
io = ppd_bsock_io_ssl();
conn->ssl_readbuf = new char[BSOCK_BUF_SZ];
conn->bsock_ctx.ssl_readbuf = conn->ssl_readbuf;
conn->bsock_ctx.ssl_readbuf_len = BSOCK_BUF_SZ;
conn->bsock_ctx.ssl = conn->ssl;
bsock_ctx = &conn->bsock_ctx;
} else {
io = bsock_io_posix();
conn->ssl_readbuf = nullptr;
conn->ssl = nullptr;
bsock_ctx = (void *)(uintptr_t)conn_fd;
}
conn->bsock = bsock_create(bsock_ctx, &io, BSOCK_BUF_SZ, BSOCK_BUF_SZ);
conn->next_send = 0;
conn->last_send = 0;
conn->state = STATE_WAITING;
conn->timer_expired = 0;
conn->ia_gen = createGenerator(options.ia_dist);
if (conn->ia_gen == NULL) {
E("Failed to create generator \"%s\"\n", options.ia_dist);
}
conn->ia_gen->set_lambda((double)options.target_qps / (double)(options.conn_per_thread * CPU_COUNT(&options.cpuset)));
if (options.enable_tls) {
conn->ssl = dsmbr_tls_handshake_client(conn_fd);
} else {
conn->ssl = nullptr;
}
int status;
if ((status = options.m_info->conn_create_cb(options.m_global_ctx, thrd_ctx->m_ctx, &conn->m_ctx)) != 0) {
E("Failed to create conn m_ctx: %d\n", status);
@ -340,8 +316,10 @@ dsmbr_conn_free(struct dsmbr_thread_ctx *ctx, struct dsmbr_conn *conn)
if (conn->ssl != nullptr) {
SSL_shutdown(conn->ssl);
SSL_free(conn->ssl);
delete[] conn->ssl_readbuf;
}
bsock_free(conn->bsock);
close(conn->conn_fd);
for (auto it = ctx->conns.begin(); it != ctx->conns.end(); ) {
@ -356,70 +334,6 @@ dsmbr_conn_free(struct dsmbr_thread_ctx *ctx, struct dsmbr_conn *conn)
delete conn;
}
void * dsmbr_worker_main(void*);
static struct dsmbr_thread_ctx *
dsmbr_create_worker(int tid, int core)
{
struct dsmbr_thread_ctx * tinfo = new struct dsmbr_thread_ctx;
int kqfd = kqueue();
if (kqfd != 0) {
E("kqueue() failed: %d\n", errno);
}
tinfo->kqfd = kqfd;
int pipes[2];
if (pipe(pipes) != 0) {
E("pipe() failed: %d\n", errno);
}
tinfo->ctrl_pipe_r = pipes[0];
tinfo->ctrl_pipe_w = pipes[1];
struct kevent kev;
EV_SET(&kev, tinfo->ctrl_pipe_r, EVFILT_READ, EV_ADD, 0, 0, NULL);
if (kevent(kqfd, &kev, 1, NULL, 0, NULL) == -1) {
E("kevent() failed: %d\n", errno);
}
if (sem_init(&tinfo->start_sem, 0, 0) != 0) {
E("sem_init() failed: %d\n", errno);
}
pthread_attr_t attr;
int status;
if ((status = pthread_attr_init(&attr)) != 0) {
E("pthread_attr_init() failed: %d\n", status);
}
cpuset_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(core, &cpuset);
status = pthread_attr_setaffinity_np(&attr, sizeof(cpuset), &cpuset);
if (status != 0) {
E("pthread_attr_setaffinity_np() failed : %d\n", status);
}
status = pthread_create(&tinfo->thrd, &attr, dsmbr_worker_main, tinfo);
if (status != 0) {
E("pthread_create() failed: %d\n", status);
}
return tinfo;
}
static void
dsmbr_create_workers()
{
int tid = 0;
int core;
CPU_FOREACH_ISSET(core, &options.cpuset) {
options.workers.push_back(dsmbr_create_worker(tid, core));
tid++;
V("Created thread %d on core %d\n", tid, core);
}
}
void dsmbr_handle_event(struct dsmbr_thread_ctx *tinfo, struct kevent *kev)
{
@ -561,6 +475,69 @@ dsmbr_worker_main(void * ctx)
V("Thread %d exiting...\n", id);
}
static struct dsmbr_thread_ctx *
dsmbr_create_worker(int tid, int core)
{
struct dsmbr_thread_ctx * tinfo = new struct dsmbr_thread_ctx;
int kqfd = kqueue();
if (kqfd != 0) {
E("kqueue() failed: %d\n", errno);
}
tinfo->kqfd = kqfd;
int pipes[2];
if (pipe(pipes) != 0) {
E("pipe() failed: %d\n", errno);
}
tinfo->ctrl_pipe_r = pipes[0];
tinfo->ctrl_pipe_w = pipes[1];
struct kevent kev;
EV_SET(&kev, tinfo->ctrl_pipe_r, EVFILT_READ, EV_ADD, 0, 0, NULL);
if (kevent(kqfd, &kev, 1, NULL, 0, NULL) == -1) {
E("kevent() failed: %d\n", errno);
}
if (sem_init(&tinfo->start_sem, 0, 0) != 0) {
E("sem_init() failed: %d\n", errno);
}
pthread_attr_t attr;
int status;
if ((status = pthread_attr_init(&attr)) != 0) {
E("pthread_attr_init() failed: %d\n", status);
}
cpuset_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(core, &cpuset);
status = pthread_attr_setaffinity_np(&attr, sizeof(cpuset), &cpuset);
if (status != 0) {
E("pthread_attr_setaffinity_np() failed : %d\n", status);
}
status = pthread_create(&tinfo->thrd, &attr, dsmbr_worker_main, tinfo);
if (status != 0) {
E("pthread_create() failed: %d\n", status);
}
return tinfo;
}
static void
dsmbr_create_workers()
{
int tid = 0;
int core;
CPU_FOREACH_ISSET(core, &options.cpuset) {
options.workers.push_back(dsmbr_create_worker(tid, core));
tid++;
V("Created thread %d on core %d\n", tid, core);
}
}
static std::string
get_ip_from_hostname(std::string hostname)
{
@ -615,46 +592,36 @@ establish_slave_conns()
}
}
static void
slave_listen_socket_create()
static int
dsmbr_slave_ctrl_sock_create()
{
struct sockaddr_in server_addr;
int status;
const int enable = 1;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(MAX_SLAVES);
server_addr.sin_port = htons(CTRL_PORT);
server_addr.sin_addr.s_addr = INADDR_ANY;
for (int i = 0; i < 1; i++) {
int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (fd < 0) {
E("socket() returned %d", errno);
}
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)) < 0) {
E("setsockopt() reuseaddr %d", errno);
}
if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)) < 0) {
E("setsockopt() reuseport", errno);
}
if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) < 0) {
E("setsockopt() NODELAY %d", errno);
}
status = bind(fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
if (status < 0) {
E("bind() returned %d", errno);
}
status = listen(fd, SOCK_BACKLOG);
if (status < 0) {
E("listen() returned %d", errno);
}
socks->push_back(fd);
int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (fd < 0) {
E("socket() returned %d", errno);
}
if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) < 0) {
E("setsockopt() NODELAY %d", errno);
}
status = bind(fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
if (status < 0) {
E("bind() returned %d", errno);
}
status = listen(fd, CTRL_BACKLOG);
if (status < 0) {
E("listen() returned %d", errno);
}
return fd;
}
static void client_send_stats(int qps, int send_sz, int recv_sz)
@ -777,6 +744,7 @@ static void usage()
* client and master all establish connections to the server
* client -> master ACK
* master -> client START (client runs forever)
* client -> master START_ACK
* master RUNS for X seconds
* master -> client STOP
* client -> master STATS
@ -901,32 +869,33 @@ main(int argc, char* argv[])
}
}
::signal(SIGPIPE, SIG_IGN);
if (!options.client_mode) {
resp_fp_csv = fopen(options.output_name, "w");
if (resp_fp_csv == NULL) {
E("cannot open file for writing %s.", options.output_name);
E("Cannot open file %s for writing %d.", options.output_name, errno);
exit(1);
}
}
if (options.master_mode && options.master_server_ip_given == 0) {
/* fall back to ip from -s */
strncpy(options.master_server_ip, options.server_ip, INET_ADDRSTRLEN);
}
/* create main thread's kqueue */
int mt_kq = kqueue();
struct dsmbr_ctrl_msg ctrl_msg;
int ret = 0;
int slave_sock = -1;
int master_conn = -1;
std::vector<struct dsmbr_slave *> slaves;
options.client_num = client_ips.size();
mt_kq = kqueue();
/* connect to clients and sync options */
if (options.master_mode) {
/* make master connections to clients */
prepare_clients();
/* initiate connections to clients */
} else if (options.client_mode) {
/* in client mode we receive all parameters from the server */
wait_master_prepare();
slave_sock = dsmbr_slave_ctrl_sock_create();
V("Waiting for master connection...\n");
ret = ppd_readbuf(slave_sock, &ctrl_msg, sizeof(ctrl_msg));
if (ret != 0) {
E("Failed to read from listen socket: %d\n", errno);
}
if (ctrl_msg.)
}
/* here EVERYONE is on the same page */

View File

@ -20,9 +20,14 @@ struct ppd_bsock_io_ssl_ctx {
size_t ssl_readbuf_len;
};
struct bsock_ringbuf_io ppd_bsock_io_ssl();
struct bsock_ringbuf_io ppd_bsock_io_ssl(void);
int ppd_readmsg(struct bsock *bsock, char *buf, size_t len);
int ppd_writemsg(struct bsock *bsock, struct ppd_msg *msg);
int
ppd_readbuf(int fd, void *buf, int len);
int
ppd_writebuf(int fd, void *buf, int len);

View File

@ -195,7 +195,7 @@ listen_socket_create(std::vector<int> *socks)
}
if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)) < 0) {
E("setsockopt() reuseport", errno);
E("setsockopt() reuseport %d", errno);
}
if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) < 0) {