This commit is contained in:
oscar 2023-03-07 21:46:23 -05:00
parent da273c5f39
commit bb0a09167a
5 changed files with 217 additions and 166 deletions

View File

@ -1,4 +1,5 @@
#include <netinet/in.h>
#include <errno.h>
#include <openssl/ssl.h>
#include <stdio.h>
#include <unistd.h>
@ -106,6 +107,15 @@ struct bsock_ringbuf_io ppd_bsock_io_ssl()
return io;
}
/*
* returns: 0 on success
* -1 + errno on fail
* recoverable errnos:
* ENOMEM - buffer is not big enough to hold message
* ENODATA - bsock does not have enough data for the entire message
* unrecoverable errnos:
* EIO - libbsock has enough data but didn't give us
*/
int
ppd_readmsg(struct bsock * bsock, char *buf, size_t len)
{
@ -113,44 +123,73 @@ ppd_readmsg(struct bsock * bsock, char *buf, size_t len)
struct ppd_msg *msg = (struct ppd_msg *)buf;
if (len < sizeof(struct ppd_msg)) {
errno = EOVERFLOW;
errno = EINVAL;
return -1;
}
status = bsock_peek(bsock, buf, sizeof(struct ppd_msg));
if (status != 0) {
if (status < 0) {
return status;
} else if (status != sizeof(struct ppd_msg)) {
status = ENODATA;
return -1;
}
int sz = ntohl(msg->size);
if (sz > (int)len) {
errno = EOVERFLOW;
if (sz > (int)(len - sizeof(struct ppd_msg))) {
errno = ENOMEM;
return -1;
}
if (bsock_read_avail_size(bsock) < (int)(sizeof(struct ppd_msg) + sz)) {
status = ENODATA;
return -1;
}
status = bsock_read(bsock, buf, sizeof(struct ppd_msg) + sz);
if (status != 0) {
if (status < 0) {
return status;
} else if (status != sizeof(struct ppd_msg) + sz) {
// this should never happen unless there is a bug with libbsock
status = EIO;
return -1;
}
msg->size = sz;
return status;
}
/*
 * Write one message (header + msg->size payload bytes) to bsock.
 * On entry msg->size is the payload length in host byte order. It is
 * converted to network byte order in place and is left that way when this
 * function returns, so the caller must set it again before reusing msg.
 *
 * returns: 0 on success
 *          -1 + errno on fail
 *          a negative status from libbsock is returned unchanged
 * unrecoverable errnos:
 *      EAGAIN - not all data is written. How much data is written is unknown.
 */
int
ppd_writemsg(struct bsock * bsock, struct ppd_msg *msg)
{
    int status;
    const uint32_t sz = msg->size;
    const size_t total = sizeof(struct ppd_msg) + sz;

    msg->size = htonl(sz);

    status = bsock_write(bsock, (char *)msg, total);
    if (status < 0) {
        return status;
    } else if ((size_t)status != total) {
        // not all message was sent (the header counts towards the total)
        errno = EAGAIN;
        return -1;
    }

    return 0;
}
int
ppd_readbuf(int fd, void *buf, int len)
{

View File

@ -1,3 +1,4 @@
#include <atomic>
#include <cstring>
#include <csignal>
#include <cerrno>
@ -25,9 +26,9 @@
#include <pthread.h>
#include <pthread_np.h>
#include <bsock/bsock.h>
#include <openssl/ssl.h>
#include <openssl/err.h>
#include "openssl/ssl.h"
#include "openssl/err.h"
#include "util.h"
#include "generator.h"
#include "logger.h"
@ -44,20 +45,21 @@ static constexpr int CTRL_PORT = 15367;
// NOTE(review): unit not visible here - presumably seconds; confirm at the use site.
static constexpr int CTRL_TIMEOUT = 3;
// NOTE(review): presumably the listen() backlog of the control socket; confirm.
static constexpr int CTRL_BACKLOG = 4096;
// Passed twice to bsock_create() when a connection is created.
static constexpr int BSOCK_BUF_SZ = 4096;
// Used as the kevent ident of the EVFILT_TIMER event in dsmbr_handle_event().
static constexpr int KQ_TIMER_MAGIC = 0x3355;
// Values of the global options.state flag polled by the worker threads:
// workers spin while it is STATE_WAITING and leave their loop on STATE_STOP.
static constexpr int STATE_WAITING = 0;
static constexpr int STATE_RUN = 1;
static constexpr int STATE_STOP = 2;
// Per-worker-thread state, passed to dsmbr_worker_main() as its void * argument.
// NOTE(review): the field list appears to mix two revisions of the struct
// (pipe/semaphore fields next to the message-buffer/module-context fields).
struct dsmbr_thread_ctx {
// worker id, printed in this thread's log messages
int tid;
// read/write ends of the pipe() created in dsmbr_create_worker(); the read
// end is registered on kqfd with EVFILT_READ
int ctrl_pipe_r;
int ctrl_pipe_w;
// kqueue descriptor the worker passes to kevent()
int kqfd;
// scratch message buffer of PPD_MSG_MAX_SZ bytes, reused for every read and send
struct ppd_msg * mbuf;
// NOTE(review): presumably the handle of the worker's pthread; confirm
pthread_t thrd;
// semaphore initialised to 0 by sem_init() in dsmbr_create_worker()
sem_t start_sem;
// per-thread module context filled in by thread_create_cb() and handed
// back to the conn_recv_cb()/conn_send_cb() callbacks
void * m_ctx;
// connections owned by this thread; appended to by dsmbr_conn_create()
std::vector<struct dsmbr_conn *> conns;
};
struct dsmbr_request_record {
uint64_t recv_size;
uint64_t send_size;
@ -73,8 +75,7 @@ struct dsmbr_conn {
int conn_fd;
struct bsock * bsock;
struct ppd_bsock_io_ssl_ctx bsock_ctx;
int timer_expired;
void * m_ctx;
uint64_t next_send;
std::vector<struct dsmbr_request_record *> stats;
@ -112,6 +113,7 @@ struct dsmbr_options {
int num_slaves;
// global states
std::atomic<int> state = STATE_WAITING;
int is_master;
int is_slave;
int slave_sockfds[MAX_SLAVES];
@ -281,8 +283,6 @@ dsmbr_conn_create(struct dsmbr_thread_ctx * thrd_ctx)
conn->bsock = bsock_create(bsock_ctx, &io, BSOCK_BUF_SZ, BSOCK_BUF_SZ);
conn->next_send = 0;
conn->timer_expired = 0;
conn->ia_gen = createGenerator(options.ia_dist);
if (conn->ia_gen == NULL) {
E("Failed to create generator \"%s\"\n", options.ia_dist);
@ -300,6 +300,7 @@ dsmbr_conn_create(struct dsmbr_thread_ctx * thrd_ctx)
E("kevent() failed: %d\n", errno);
}
conn->next_send = 0;
thrd_ctx->conns.push_back(conn);
return conn;
}
@ -334,145 +335,80 @@ dsmbr_conn_free(struct dsmbr_thread_ctx *ctx, struct dsmbr_conn *conn)
delete conn;
}
// Handles one kevent for a connection: either socket readiness or the
// per-connection timer identified by KQ_TIMER_MAGIC.
// NOTE(review): this body is not well-formed as shown. `kev_fd` and
// `conn->next_send` have empty initialisers, the `case` labels have no
// enclosing switch, and `conn`, `now`, `id` and `cur_time` are never declared
// here. It looks like leftover work-in-progress code; confirm before reuse.
void dsmbr_handle_event(struct dsmbr_thread_ctx *tinfo, struct kevent *kev)
{
int kev_fd = ;
if (kev_fd != conn->conn_fd && kev_fd != KQ_TIMER_MAGIC) {
E("kevent() returned unknown fd %d\n", kev_fd);
}
if (kev_fd == KQ_TIMER_MAGIC) {
conn->timer_expired = 1;
}
int status;
struct kevent tmpev;
if (kev_fd != KQ_TIMER_MAGIC) {
} else {
}
if (kev_fd == KQ_TIMER_MAGIC) {
conn->next_send = ;
// re-arm a one-shot timer, NOTE_USECONDS resolution, with conn as udata
EV_SET(&tmpev, KQ_TIMER_MAGIC, EVFILT_TIMER, EV_ADD | EV_ONESHOT, NOTE_USECONDS, conn->next_send, conn);
if (kevent(tinfo->kqfd, &tmpev, 1, nullptr, 0, nullptr) != 0) {
// NOTE(review): the format string has one %d but two arguments are passed
E("Error arming timer. ERR %d\n", conn->timer, errno);
}
}
case STATE_LIMITING:
if (conn->depth >= options.depth) {
return;
}
conn->state = STATE_WAITING;
break;
case STATE_SENDING: {
/* send one packet and transfer to wait state */
if (conn->depth >= options.depth) {
conn->state = STATE_LIMITING;
/* wait for the read to call us */
break;
} else {
now = get_time_us();
if (now < (int64_t)conn->next_send) {
/* only STATE_WAITING transfers us to this state
* the above condition cannot be true
*/
E("Oscar fucked up\n");
}
conn->depth++;
// generate() is scaled by 1e6 before being added to next_send
conn->next_send += (int)(conn->gen->generate() * 1000000.0);
conn->last_send = now;
conn->state = STATE_WAITING;
if (conn->rgen->send_req(conn->conn_fd) < 0) {
/* effectively skipping this packet */
W("Cannot write to connection %d\n", conn->conn_fd);
}
break;
}
}
default:
E("Unknown state %d\n", conn->state);
}
}
if ((int)kev->ident == conn->conn_fd || (int)kev->ident == KQ_TIMER_MAGIC) {
/* we got something to read */
// NOTE(review): `status` is tested here but never assigned in the visible code
if (status < 0) {
E("Connection %d read_resp failed. ERR: %d\n", conn->conn_fd, errno);
}
conn->depth--;
if (conn->depth < 0) {
E("More recved packets than sent.\n");
}
// samples are only recorded once cur_time has reached options.warmup
if ((long)cur_time >= options.warmup) {
thrd_perf_counters[id]->cnt++;
if (!options.client_mode) {
struct datapt* dat = new struct datapt;
dat->qps = 0;
dat->lat = get_time_us() - conn->last_send;
conn->stats->push_back(dat);
V("Conn %d: TS: %d LAT: %ld, QPS: %ld\n", conn->conn_fd, (int)cur_time, dat->lat, (long)dat->qps);
}
}
kqconn_state_machine(conn);
} else
}
// Worker thread entry point; ctx is the thread's struct dsmbr_thread_ctx.
// Visible flow: wait for the global start signal, stamp every connection's
// next_send with the current time, then loop polling kqueue, draining
// readable connections through bsock and sending new requests that are due.
// NOTE(review): the lines below interleave two revisions of this function:
// both `void` and `void *` return types, two `tinfo` declarations, and uses
// of `id`, `conns`, `kq` and `ev`, which are not declared in this body.
void
void *
dsmbr_worker_main(void * ctx)
{
struct dsmbr_thread_ctx * tinfo = (struct dsmbr_thread_ctx * tinfo)ctx;
struct dsmbr_thread_ctx * tinfo = (struct dsmbr_thread_ctx *)ctx;
// zero timeout: kevent() below is used as a non-blocking poll
struct timespec timesp = {.tv_sec = 0, .tv_nsec = 0};
struct kevent kevs[NEVENT];
V("Thread %d has established %ld connections.\n", id, conns.size());
pthread_barrier_wait(&prepare_barrier);
V("Thread %d waiting for START...\n", id);
pthread_barrier_wait(&ok_barrier);
V("Thread %d running...\n", id);
/* send the initial packet now */
for (uint32_t i = 0; i < conns.size(); i++) {
conns.at(i)->next_send = get_time_us();
V("Thread %d waiting for start...\n", tinfo->tid);
// busy-wait (no sleep or yield) until options.state leaves STATE_WAITING
while(options.state.load(std::memory_order_relaxed) == STATE_WAITING) {
};
for (struct dsmbr_conn * conn : tinfo->conns) {
conn->next_send = get_time_us();
}
V("Thread %d starting...\n", tinfo->tid);
while(true) {
struct kevent kevs[NEVENT];
if (kevent(kq, NULL, 0, &ev, 1, NULL) == 1) {
struct kqconn *conn = (struct kqconn *)ev.udata;
int ret = 0;
struct kevent kev;
if (ev.flags & EV_EOF) {
E("Connection %d dropped due to EV_EOF. ERR: %d\n", conn->conn_fd, ev.fflags);
// leave the loop once the global state is STATE_STOP
if (options.state.load(std::memory_order_relaxed) == STATE_STOP) {
break;
}
// read resp first if possible
if ((ret = kevent(tinfo->kqfd, NULL, 0, kevs, NEVENT, &timesp)) < 0) {
E("Thread %d kevent() error %d.\n", tinfo->tid, errno);
}
int size = ret;
// for every returned event, let bsock process the connection stored in udata
for (int i = 0; i < size; i++) {
struct dsmbr_conn * conn = (struct dsmbr_conn *)kevs[i].udata;
if (kevs[i].flags & EV_EOF) {
E("Thread %d connection %d EV_EOF ERR: %d.\n", tinfo->tid, conn->conn_fd, kevs[i].fflags);
}
ret = bsock_poll(conn->bsock);
if (ret < 0) {
E("Thread %d connection %d bsock_poll() ERR %d.\n", tinfo->tid, conn->conn_fd, ret);
}
}
uint64_t cur_ts = get_time_us();
for (struct dsmbr_conn * conn : tinfo->conns) {
// only a return value of exactly 0 is treated as "message received";
// any other result is ignored here
ret = ppd_readmsg(conn->bsock, (char *)tinfo->mbuf, PPD_MSG_MAX_SZ);
if (ret == 0) {
ret = options.m_info->conn_recv_cb(tinfo->mbuf->data, tinfo->mbuf->size, options.m_global_ctx, tinfo->m_ctx, conn->m_ctx);
if (ret != 0) {
E("Thread %d connection %d conn_recv_cb() failed ERR %d.\n", tinfo->tid, conn->conn_fd, errno);
}
}
// send when the scheduled time has passed and fewer than options.depth + 1
// requests are outstanding
if (conn->next_send <= cur_ts && conn->req_in_flight.size() < (unsigned int)options.depth + 1) {
// send
size_t out_sz;
// NOTE(review): the callback writes into mbuf->data but is given the
// capacity of the whole buffer (PPD_MSG_MAX_SZ), header included - confirm
ret = options.m_info->conn_send_cb(tinfo->mbuf->data, PPD_MSG_MAX_SZ, &out_sz, options.m_global_ctx, tinfo->m_ctx, conn->m_ctx);
if (ret != 0) {
E("Thread %d connection %d conn_send_cb() failed ERR %d.\n", tinfo->tid, conn->conn_fd, errno);
}
tinfo->mbuf->size = out_sz;
// no blocking, always flush
ret = ppd_writemsg(conn->bsock, tinfo->mbuf);
if (ret != 0) {
E("Thread %d connection %d ppd_writemsg() failed ERR %d.\n", tinfo->tid, conn->conn_fd, errno);
}
// NOTE(review): the next line is an incomplete statement
conn->req_in_flight
conn->in_flight++;
// schedule the next send: generate() is scaled by S2US
conn->next_send += (uint64_t)(conn->ia_gen->generate() * (double)S2US);
}
dsmbr_handle_event(id, &ev, conn);
} else {
E("Thread %d kevent failed. ERR %d\n", id, errno);
}
}
for (uint32_t i = 0; i < conns.size(); i++) {
kqconn_cleanup(conns.at(i));
}
close(kq);
V("Thread %d exiting...\n", id);
V("Thread %d exiting...\n", tinfo->tid);
return NULL;
}
static struct dsmbr_thread_ctx *
@ -486,20 +422,10 @@ dsmbr_create_worker(int tid, int core)
}
tinfo->kqfd = kqfd;
int pipes[2];
if (pipe(pipes) != 0) {
E("pipe() failed: %d\n", errno);
}
tinfo->ctrl_pipe_r = pipes[0];
tinfo->ctrl_pipe_w = pipes[1];
struct kevent kev;
EV_SET(&kev, tinfo->ctrl_pipe_r, EVFILT_READ, EV_ADD, 0, 0, NULL);
if (kevent(kqfd, &kev, 1, NULL, 0, NULL) == -1) {
E("kevent() failed: %d\n", errno);
}
if (sem_init(&tinfo->start_sem, 0, 0) != 0) {
E("sem_init() failed: %d\n", errno);
tinfo->mbuf = (struct ppd_msg *)new char[PPD_MSG_MAX_SZ];
if (options.m_info->thread_create_cb(core, options.m_global_ctx, &tinfo->m_ctx) != 0) {
E("thread_create_cb() failed: %d\n", errno);
}
pthread_attr_t attr;
@ -539,7 +465,7 @@ dsmbr_create_workers()
}
static std::string
get_ip_from_hostname(std::string hostname)
dsmbr_nslookup(std::string hostname)
{
static char rt[100] = {0};
struct in_addr **addr;

75
dsmbr/generator.cc Normal file
View File

@ -0,0 +1,75 @@
// modified from mutilate
#include "generator.h"
// Preset "fb_key" generator: a heap-allocated GEV with fixed parameters.
// The returned object is created with new and handed to the caller.
Generator* createFacebookKey() {
  Generator* key_gen = new GEV(30.7984, 8.20449, 0.078688);
  return key_gen;
}
// Preset "fb_value" generator: a Discrete generator wrapping a GPareto one,
// with fifteen add() entries registered in the order listed below.
// The returned object is created with new and handed to the caller.
Generator* createFacebookValue() {
  // Arguments for Discrete::add(), in call order.
  // NOTE(review): presumably {probability, value} pairs - confirm in generator.h.
  const double entries[][2] = {
    {0.00536, 0.0},  {0.00047, 1.0},  {0.17820, 2.0},
    {0.09239, 3.0},  {0.00018, 4.0},  {0.02740, 5.0},
    {0.00065, 6.0},  {0.00606, 7.0},  {0.00023, 8.0},
    {0.00837, 9.0},  {0.00837, 10.0}, {0.08989, 11.0},
    {0.00092, 12.0}, {0.00326, 13.0}, {0.01980, 14.0},
  };

  Generator* inner = new GPareto(15.0, 214.476, 0.348238);
  Discrete* dist = new Discrete(inner);

  for (const auto& entry : entries) {
    dist->add(entry[0], entry[1]);
  }

  return dist;
}
// Preset "fb_ia" generator: a heap-allocated GPareto with fixed parameters.
// The returned object is created with new and handed to the caller.
Generator* createFacebookIA() {
  Generator* ia_gen = new GPareto(0, 16.0292, 0.154971);
  return ia_gen;
}
/*
 * Create a Generator from a textual description.
 *
 * Accepted forms, checked in this order:
 *   "fb_key" | "fb_value" | "fb_ia"  - the preset generators in this file
 *   "<number>"                       - Fixed(<number>), e.g. "5", "0", "0.5"
 *   "<type>[:a1[,a2[,a3]]]"          - <type> is looked up case-insensitively
 *                                      as a substring of the whole string:
 *                                      fixed, normal, exponential, pareto,
 *                                      gev, uniform. Missing arguments
 *                                      default to 0.0.
 *
 * The returned object is allocated with new. DIE() is called when the string
 * cannot be interpreted.
 */
Generator* createGenerator(std::string str) {
  const char *spec = str.c_str();

  if (!strcmp(spec, "fb_key")) return createFacebookKey();
  else if (!strcmp(spec, "fb_value")) return createFacebookValue();
  else if (!strcmp(spec, "fb_ia")) return createFacebookIA();

  // A bare number selects a fixed-value generator. The atoi()/"0" test is the
  // long-standing rule (it also accepts a numeric prefix). The strtod() test
  // additionally accepts plain decimals such as "0.5", which atoi() truncates
  // to 0 and which used to be rejected. The strspn() filter keeps "inf" and
  // "nan" out.
  char *num_end = NULL;
  strtod(spec, &num_end);
  const bool plain_decimal = num_end != spec && *num_end == '\0' &&
                             spec[strspn(spec, "0123456789+-.eE")] == '\0';
  if (atoi(spec) != 0 || !strcmp(spec, "0") || plain_decimal) {
    return new Fixed(atof(spec));
  }

  // strtok_r() writes NULs into its input, so tokenize a private copy.
  std::string scratch(str);
  char *saveptr = NULL;
  char *t_ptr = strtok_r(&scratch[0], ":", &saveptr);
  char *a_ptr = strtok_r(NULL, ":", &saveptr);

  if (t_ptr == NULL)
    DIE("strtok(.., \":\") failed to parse %s", str.c_str());

  // The argument list is optional. Only tokenize it when present: calling
  // strtok_r(NULL, ...) with a NULL save pointer is not portable.
  char *s1 = NULL;
  char *s2 = NULL;
  char *s3 = NULL;
  if (a_ptr != NULL) {
    saveptr = NULL;
    s1 = strtok_r(a_ptr, ",", &saveptr);
    s2 = strtok_r(NULL, ",", &saveptr);
    s3 = strtok_r(NULL, ",", &saveptr);
  }

  double a1 = s1 ? atof(s1) : 0.0;
  double a2 = s2 ? atof(s2) : 0.0;
  double a3 = s3 ? atof(s3) : 0.0;

  if (strcasestr(spec, "fixed")) return new Fixed(a1);
  else if (strcasestr(spec, "normal")) return new Normal(a1, a2);
  else if (strcasestr(spec, "exponential")) return new Exponential(a1);
  else if (strcasestr(spec, "pareto")) return new GPareto(a1, a2, a3);
  else if (strcasestr(spec, "gev")) return new GEV(a1, a2, a3);
  else if (strcasestr(spec, "uniform")) return new Uniform(a1);

  DIE("Unable to create Generator '%s'", str.c_str());

  return NULL;
}

View File

@ -1,8 +1,14 @@
#pragma once
#include <stdint.h>
#include <cstddef>
// Max 16MB per message
// One message: a 32-bit length header immediately followed by the payload.
struct ppd_msg {
// payload length in bytes, not counting this header. ppd_writemsg() applies
// htonl() before sending and ppd_readmsg() applies ntohl() after receiving,
// so the field is in network byte order on the wire.
uint32_t size;
// zero-length trailing array marking where the payload starts
char data[0];
};
} __attribute__((packed));
// upper bound for a whole message (header + payload): 16 * 1024 * 1024 bytes
static constexpr unsigned long PPD_MSG_MAX_SZ = 1024 * 16 * 1024;
// header size, i.e. the offset of the payload inside struct ppd_msg
static constexpr unsigned long PPD_MSG_HDR_SZ = offsetof(struct ppd_msg, data);
// largest payload that still fits in a PPD_MSG_MAX_SZ message
static constexpr unsigned long PPD_MSG_MAX_DATA_SZ = PPD_MSG_MAX_SZ - PPD_MSG_HDR_SZ;

View File

@ -1,5 +1,6 @@
#pragma once
#include <sys/_clock_id.h>
#ifdef __cplusplus
extern "C" {
#endif
@ -8,20 +9,24 @@ extern "C" {
#include <string.h>
#include <stdlib.h>
#include <time.h>
#include <sys/clock.h>
#include <sys/param.h>
#include <sys/cpuset.h>
#define inline inline __attribute__((unused))
/* microseconds per second */
#define S2US (1000000)
/*
 * Current reading of the monotonic clock in microseconds.
 *
 * The value is meant for measuring intervals (send scheduling, latency), so
 * it is taken from CLOCK_MONOTONIC rather than the wall clock, which can jump
 * when the system time is adjusted. It is not a calendar timestamp.
 */
static inline uint64_t
get_time_us()
{
	struct timespec ts;
	clock_gettime(CLOCK_MONOTONIC, &ts);
	/* widen before multiplying so a 32-bit time_t cannot overflow */
	return (uint64_t)ts.tv_sec * S2US + (uint64_t)ts.tv_nsec / 1000;
}
static inline void
cpulist_to_cpuset(char *cpulist, cpuset_t *cpuset)
{