From bb0a09167a10d751b609d33514e96581b9e21498 Mon Sep 17 00:00:00 2001 From: oscar Date: Tue, 7 Mar 2023 21:46:23 -0500 Subject: [PATCH] wip --- common/io.cc | 55 +++++++++-- dsmbr/dsmbr.cc | 234 ++++++++++++++++----------------------------- dsmbr/generator.cc | 75 +++++++++++++++ include/msg.h | 8 +- include/util.h | 11 ++- 5 files changed, 217 insertions(+), 166 deletions(-) create mode 100644 dsmbr/generator.cc diff --git a/common/io.cc b/common/io.cc index 6704991..9bc101a 100644 --- a/common/io.cc +++ b/common/io.cc @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -106,6 +107,15 @@ struct bsock_ringbuf_io ppd_bsock_io_ssl() return io; } +/* + * returns: 0 on success + * -1 + errno on fail + * recoverable errnos: + * ENOMEM - buffer is not big enough to hold message + * ENODATA - bsock does not have enough data for the entire message + * unrecoverable errnos: + * EIO - libbsock has enough data but didn't give us +*/ int ppd_readmsg(struct bsock * bsock, char *buf, size_t len) { @@ -113,44 +123,73 @@ ppd_readmsg(struct bsock * bsock, char *buf, size_t len) struct ppd_msg *msg = (struct ppd_msg *)buf; if (len < sizeof(struct ppd_msg)) { - errno = EOVERFLOW; + errno = EINVAL; return -1; } status = bsock_peek(bsock, buf, sizeof(struct ppd_msg)); - if (status != 0) { + if (status < 0) { return status; + } else if (status != sizeof(struct ppd_msg)) { + status = ENODATA; + return -1; } int sz = ntohl(msg->size); - if (sz > (int)len) { - errno = EOVERFLOW; + if (sz > (int)(len - sizeof(struct ppd_msg))) { + errno = ENOMEM; + return -1; + } + + if (bsock_read_avail_size(bsock) < (int)(sizeof(struct ppd_msg) + sz)) { + status = ENODATA; return -1; } status = bsock_read(bsock, buf, sizeof(struct ppd_msg) + sz); - if (status != 0) { + if (status < 0) { return status; + } else if (status != sizeof(struct ppd_msg) + sz) { + // this should never happen unless there is a bug with libbsock + status = EIO; + return -1; } msg->size = sz; return status; } +/* + * returns: 0 on success + + + + * -1 + errno on fail + * unrecoverable errnos: + * EAGAIN - not all data is written. How much data is written is unknown. +*/ int ppd_writemsg(struct bsock * bsock, struct ppd_msg *msg) { int status; int sz = msg->size; - msg->size = htonl(msg->size); + msg->size = htonl(sz); - return bsock_write(bsock, (char *)msg, sizeof(struct ppd_msg) + sz); + status = bsock_write(bsock, (char *)msg, sizeof(struct ppd_msg) + sz); + + if (status < 0) { + // not all message was sent + return status; + } else if (status != sz) { + errno = EAGAIN; + return -1; + } + return 0; } - int ppd_readbuf(int fd, void *buf, int len) { diff --git a/dsmbr/dsmbr.cc b/dsmbr/dsmbr.cc index 0977c31..5673d30 100755 --- a/dsmbr/dsmbr.cc +++ b/dsmbr/dsmbr.cc @@ -1,3 +1,4 @@ +#include #include #include #include @@ -25,9 +26,9 @@ #include #include #include +#include +#include -#include "openssl/ssl.h" -#include "openssl/err.h" #include "util.h" #include "generator.h" #include "logger.h" @@ -44,20 +45,21 @@ static constexpr int CTRL_PORT = 15367; static constexpr int CTRL_TIMEOUT = 3; static constexpr int CTRL_BACKLOG = 4096; static constexpr int BSOCK_BUF_SZ = 4096; -static constexpr int KQ_TIMER_MAGIC = 0x3355; + +static constexpr int STATE_WAITING = 0; +static constexpr int STATE_RUN = 1; +static constexpr int STATE_STOP = 2; struct dsmbr_thread_ctx { int tid; - int ctrl_pipe_r; - int ctrl_pipe_w; int kqfd; + + struct ppd_msg * mbuf; pthread_t thrd; - sem_t start_sem; void * m_ctx; std::vector conns; }; - struct dsmbr_request_record { uint64_t recv_size; uint64_t send_size; @@ -73,8 +75,7 @@ struct dsmbr_conn { int conn_fd; struct bsock * bsock; struct ppd_bsock_io_ssl_ctx bsock_ctx; - - int timer_expired; + void * m_ctx; uint64_t next_send; std::vector stats; @@ -112,6 +113,7 @@ struct dsmbr_options { int num_slaves; // global states + std::atomic state = STATE_WAITING; int is_master; int is_slave; int slave_sockfds[MAX_SLAVES]; @@ -281,8 +283,6 @@ dsmbr_conn_create(struct dsmbr_thread_ctx * thrd_ctx) conn->bsock = bsock_create(bsock_ctx, &io, BSOCK_BUF_SZ, BSOCK_BUF_SZ); - conn->next_send = 0; - conn->timer_expired = 0; conn->ia_gen = createGenerator(options.ia_dist); if (conn->ia_gen == NULL) { E("Failed to create generator \"%s\"\n", options.ia_dist); @@ -300,6 +300,7 @@ dsmbr_conn_create(struct dsmbr_thread_ctx * thrd_ctx) E("kevent() failed: %d\n", errno); } + conn->next_send = 0; thrd_ctx->conns.push_back(conn); return conn; } @@ -334,145 +335,80 @@ dsmbr_conn_free(struct dsmbr_thread_ctx *ctx, struct dsmbr_conn *conn) delete conn; } - -void dsmbr_handle_event(struct dsmbr_thread_ctx *tinfo, struct kevent *kev) -{ - int kev_fd = ; - - if (kev_fd != conn->conn_fd && kev_fd != KQ_TIMER_MAGIC) { - E("kevent() returned unknown fd %d\n", kev_fd); - } - - if (kev_fd == KQ_TIMER_MAGIC) { - conn->timer_expired = 1; - } - - int status; - struct kevent tmpev; - - if (kev_fd != KQ_TIMER_MAGIC) { - - } else { - - } - - if (kev_fd == KQ_TIMER_MAGIC) { - conn->next_send = ; - EV_SET(&tmpev, KQ_TIMER_MAGIC, EVFILT_TIMER, EV_ADD | EV_ONESHOT, NOTE_USECONDS, conn->next_send, conn); - if (kevent(tinfo->kqfd, &tmpev, 1, nullptr, 0, nullptr) != 0) { - E("Error arming timer. ERR %d\n", conn->timer, errno); - } - - - } - - case STATE_LIMITING: - if (conn->depth >= options.depth) { - return; - } - conn->state = STATE_WAITING; - break; - - case STATE_SENDING: { - /* send one packet and transfer to wait state */ - if (conn->depth >= options.depth) { - conn->state = STATE_LIMITING; - /* wait for the read to call us */ - break; - } else { - now = get_time_us(); - if (now < (int64_t)conn->next_send) { - /* only STATE_WAITING transfers us to this state - * the above condition cannot be true - */ - E("Oscar fucked up\n"); - } - - conn->depth++; - conn->next_send += (int)(conn->gen->generate() * 1000000.0); - conn->last_send = now; - conn->state = STATE_WAITING; - - if (conn->rgen->send_req(conn->conn_fd) < 0) { - /* effectively skipping this packet */ - W("Cannot write to connection %d\n", conn->conn_fd); - } - - break; - } - } - - default: - E("Unknown state %d\n", conn->state); - } - } - - if ((int)kev->ident == conn->conn_fd || (int)kev->ident == KQ_TIMER_MAGIC) { - /* we got something to read */ - - if (status < 0) { - E("Connection %d read_resp failed. ERR: %d\n", conn->conn_fd, errno); - } - - conn->depth--; - if (conn->depth < 0) { - E("More recved packets than sent.\n"); - } - - if ((long)cur_time >= options.warmup) { - thrd_perf_counters[id]->cnt++; - if (!options.client_mode) { - struct datapt* dat = new struct datapt; - dat->qps = 0; - dat->lat = get_time_us() - conn->last_send; - conn->stats->push_back(dat); - V("Conn %d: TS: %d LAT: %ld, QPS: %ld\n", conn->conn_fd, (int)cur_time, dat->lat, (long)dat->qps); - } - } - - kqconn_state_machine(conn); - } else -} - -void +void * dsmbr_worker_main(void * ctx) { - struct dsmbr_thread_ctx * tinfo = (struct dsmbr_thread_ctx * tinfo)ctx; + struct dsmbr_thread_ctx * tinfo = (struct dsmbr_thread_ctx *)ctx; + struct timespec timesp = {.tv_sec = 0, .tv_nsec = 0}; + struct kevent kevs[NEVENT]; - V("Thread %d has established %ld connections.\n", id, conns.size()); - pthread_barrier_wait(&prepare_barrier); - - V("Thread %d waiting for START...\n", id); - pthread_barrier_wait(&ok_barrier); - - V("Thread %d running...\n", id); - /* send the initial packet now */ - for (uint32_t i = 0; i < conns.size(); i++) { - conns.at(i)->next_send = get_time_us(); + V("Thread %d waiting for start...\n", tinfo->tid); + while(options.state.load(std::memory_order_relaxed) == STATE_WAITING) { + }; + for (struct dsmbr_conn * conn : tinfo->conns) { + conn->next_send = get_time_us(); } + V("Thread %d starting...\n", tinfo->tid); while(true) { - struct kevent kevs[NEVENT]; - if (kevent(kq, NULL, 0, &ev, 1, NULL) == 1) { - struct kqconn *conn = (struct kqconn *)ev.udata; + int ret = 0; + struct kevent kev; - if (ev.flags & EV_EOF) { - E("Connection %d dropped due to EV_EOF. ERR: %d\n", conn->conn_fd, ev.fflags); + if (options.state.load(std::memory_order_relaxed) == STATE_STOP) { + break; + } + + // read resp first if possible + if ((ret = kevent(tinfo->kqfd, NULL, 0, kevs, NEVENT, ×p)) < 0) { + E("Thread %d kevent() error %d.\n", tinfo->tid, errno); + } + + int size = ret; + for (int i = 0; i < size; i++) { + struct dsmbr_conn * conn = (struct dsmbr_conn *)kevs[i].udata; + if (kevs[i].flags & EV_EOF) { + E("Thread %d connection %d EV_EOF ERR: %d.\n", tinfo->tid, conn->conn_fd, kevs[i].fflags); + } + ret = bsock_poll(conn->bsock); + if (ret < 0) { + E("Thread %d connection %d bsock_poll() ERR %d.\n", tinfo->tid, conn->conn_fd, ret); + } + } + + uint64_t cur_ts = get_time_us(); + for (struct dsmbr_conn * conn : tinfo->conns) { + ret = ppd_readmsg(conn->bsock, (char *)tinfo->mbuf, PPD_MSG_MAX_SZ); + if (ret == 0) { + ret = options.m_info->conn_recv_cb(tinfo->mbuf->data, tinfo->mbuf->size, options.m_global_ctx, tinfo->m_ctx, conn->m_ctx); + if (ret != 0) { + E("Thread %d connection %d conn_recv_cb() failed ERR %d.\n", tinfo->tid, conn->conn_fd, errno); + } + + } + + if (conn->next_send <= cur_ts && conn->req_in_flight.size() < (unsigned int)options.depth + 1) { + // send + size_t out_sz; + ret = options.m_info->conn_send_cb(tinfo->mbuf->data, PPD_MSG_MAX_SZ, &out_sz, options.m_global_ctx, tinfo->m_ctx, conn->m_ctx); + if (ret != 0) { + E("Thread %d connection %d conn_send_cb() failed ERR %d.\n", tinfo->tid, conn->conn_fd, errno); + } + + tinfo->mbuf->size = out_sz; + // no blocking, always flush + ret = ppd_writemsg(conn->bsock, tinfo->mbuf); + if (ret != 0) { + E("Thread %d connection %d ppd_writemsg() failed ERR %d.\n", tinfo->tid, conn->conn_fd, errno); + } + conn->req_in_flight + conn->in_flight++; + conn->next_send += (uint64_t)(conn->ia_gen->generate() * (double)S2US); } - - dsmbr_handle_event(id, &ev, conn); - } else { - E("Thread %d kevent failed. ERR %d\n", id, errno); } } - for (uint32_t i = 0; i < conns.size(); i++) { - kqconn_cleanup(conns.at(i)); - } - - close(kq); - - V("Thread %d exiting...\n", id); + V("Thread %d exiting...\n", tinfo->tid); + return NULL; } static struct dsmbr_thread_ctx * @@ -486,20 +422,10 @@ dsmbr_create_worker(int tid, int core) } tinfo->kqfd = kqfd; - int pipes[2]; - if (pipe(pipes) != 0) { - E("pipe() failed: %d\n", errno); - } - tinfo->ctrl_pipe_r = pipes[0]; - tinfo->ctrl_pipe_w = pipes[1]; - struct kevent kev; - EV_SET(&kev, tinfo->ctrl_pipe_r, EVFILT_READ, EV_ADD, 0, 0, NULL); - if (kevent(kqfd, &kev, 1, NULL, 0, NULL) == -1) { - E("kevent() failed: %d\n", errno); - } - - if (sem_init(&tinfo->start_sem, 0, 0) != 0) { - E("sem_init() failed: %d\n", errno); + tinfo->mbuf = (struct ppd_msg *)new char[PPD_MSG_MAX_SZ]; + + if (options.m_info->thread_create_cb(core, options.m_global_ctx, &tinfo->m_ctx) != 0) { + E("thread_create_cb() failed: %d\n", errno); } pthread_attr_t attr; @@ -539,7 +465,7 @@ dsmbr_create_workers() } static std::string -get_ip_from_hostname(std::string hostname) +dsmbr_nslookup(std::string hostname) { static char rt[100] = {0}; struct in_addr **addr; diff --git a/dsmbr/generator.cc b/dsmbr/generator.cc new file mode 100644 index 0000000..c8871c9 --- /dev/null +++ b/dsmbr/generator.cc @@ -0,0 +1,75 @@ +// modified from mutilate +#include "generator.h" + +Generator* createFacebookKey() { return new GEV(30.7984, 8.20449, 0.078688); } + +Generator* createFacebookValue() { + Generator* g = new GPareto(15.0, 214.476, 0.348238); + + Discrete* d = new Discrete(g); + d->add(0.00536, 0.0); + d->add(0.00047, 1.0); + d->add(0.17820, 2.0); + d->add(0.09239, 3.0); + d->add(0.00018, 4.0); + d->add(0.02740, 5.0); + d->add(0.00065, 6.0); + d->add(0.00606, 7.0); + d->add(0.00023, 8.0); + d->add(0.00837, 9.0); + d->add(0.00837, 10.0); + d->add(0.08989, 11.0); + d->add(0.00092, 12.0); + d->add(0.00326, 13.0); + d->add(0.01980, 14.0); + + return d; +} + +Generator* createFacebookIA() { return new GPareto(0, 16.0292, 0.154971); } + +Generator* createGenerator(std::string str) { + if (!strcmp(str.c_str(), "fb_key")) return createFacebookKey(); + else if (!strcmp(str.c_str(), "fb_value")) return createFacebookValue(); + else if (!strcmp(str.c_str(), "fb_ia")) return createFacebookIA(); + + char *s_copy = new char[str.length() + 1]; + strcpy(s_copy, str.c_str()); + char *saveptr = NULL; + + if (atoi(s_copy) != 0 || !strcmp(s_copy, "0")) { + double v = atof(s_copy); + delete[] s_copy; + return new Fixed(v); + } + + char *t_ptr = strtok_r(s_copy, ":", &saveptr); + char *a_ptr = strtok_r(NULL, ":", &saveptr); + + if (t_ptr == NULL) // || a_ptr == NULL) + DIE("strtok(.., \":\") failed to parse %s", str.c_str()); + + char t = t_ptr[0]; + + saveptr = NULL; + char *s1 = strtok_r(a_ptr, ",", &saveptr); + char *s2 = strtok_r(NULL, ",", &saveptr); + char *s3 = strtok_r(NULL, ",", &saveptr); + + double a1 = s1 ? atof(s1) : 0.0; + double a2 = s2 ? atof(s2) : 0.0; + double a3 = s3 ? atof(s3) : 0.0; + + delete[] s_copy; + + if (strcasestr(str.c_str(), "fixed")) return new Fixed(a1); + else if (strcasestr(str.c_str(), "normal")) return new Normal(a1, a2); + else if (strcasestr(str.c_str(), "exponential")) return new Exponential(a1); + else if (strcasestr(str.c_str(), "pareto")) return new GPareto(a1, a2, a3); + else if (strcasestr(str.c_str(), "gev")) return new GEV(a1, a2, a3); + else if (strcasestr(str.c_str(), "uniform")) return new Uniform(a1); + + DIE("Unable to create Generator '%s'", str.c_str()); + + return NULL; +} \ No newline at end of file diff --git a/include/msg.h b/include/msg.h index 76cfb78..deaa383 100644 --- a/include/msg.h +++ b/include/msg.h @@ -1,8 +1,14 @@ #pragma once #include +#include +// Max 16MB per message struct ppd_msg { uint32_t size; char data[0]; -}; \ No newline at end of file +} __attribute__((packed)); + +static constexpr unsigned long PPD_MSG_MAX_SZ = 1024 * 16 * 1024; +static constexpr unsigned long PPD_MSG_HDR_SZ = offsetof(struct ppd_msg, data); +static constexpr unsigned long PPD_MSG_MAX_DATA_SZ = PPD_MSG_MAX_SZ - PPD_MSG_HDR_SZ; \ No newline at end of file diff --git a/include/util.h b/include/util.h index 3261b23..b858b4c 100644 --- a/include/util.h +++ b/include/util.h @@ -1,5 +1,6 @@ #pragma once +#include #ifdef __cplusplus extern "C" { #endif @@ -8,20 +9,24 @@ extern "C" { #include #include #include + +#include #include #include #define inline inline __attribute__((unused)) +#define S2US (1000000) + static inline uint64_t get_time_us() { struct timespec ts; - clock_gettime(CLOCK_REALTIME, &ts); - // clock_gettime(CLOCK_REALTIME, &ts); - return ts.tv_sec * 1000000 + ts.tv_nsec / 1000; + clock_gettime(CLOCK_MONOTONIC, &ts); + return ts.tv_sec * S2US + ts.tv_nsec / 1000; } + static inline void cpulist_to_cpuset(char *cpulist, cpuset_t *cpuset) {