diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..f784c57 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,53 @@ +cmake_minimum_required(VERSION 3.10.0) +project(pingpong) + +list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}) + +add_custom_command(OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/msg/msg.pb.cc + COMMAND mkdir -p ${CMAKE_CURRENT_BINARY_DIR}/msg/ + COMMAND protoc --cpp_out=${CMAKE_CURRENT_BINARY_DIR}/msg/ --proto_path=${CMAKE_CURRENT_SOURCE_DIR}/msg/ msg.proto + DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/msg/msg.proto) + +find_package(PkgConfig REQUIRED) +pkg_check_modules(rocksdb rocksdb) +pkg_check_modules(protobuf REQUIRED protobuf) + + +if (${ENABLE_FSTACK} MATCHES "y") + pkg_check_modules(dpdk REQUIRED libdpdk) + pkg_check_modules(bsdtopo REQUIRED bsdtopo) + pkg_check_modules(ssl REQUIRED libssl) + include_directories(${dpdk_INCLUDE_DIRS}) + include_directories(${ssl_INCLUDE_DIRS}) + include_directories(${bsdtopo_INCLUDE_DIRS}) +endif() + +include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include) +include_directories(${rocksdb_INCLUDE_DIRS}) +include_directories(${CMAKE_CURRENT_BINARY_DIR}/msg) +include_directories(${protobuf_INCLUDE_DIRS}) + +set(CFLAGS -Wall -Wextra -Werror -Wno-unused-parameter -Wno-unused-variable -std=c++17 -O2 -g) + +add_executable(dismember ${CMAKE_CURRENT_SOURCE_DIR}/dismember/dismember.cc + ${CMAKE_CURRENT_SOURCE_DIR}/dismember/Generator.cc + ${CMAKE_CURRENT_SOURCE_DIR}/dismember/reqgen.cc + ${CMAKE_CURRENT_SOURCE_DIR}/dismember/util.cc + ${CMAKE_CURRENT_BINARY_DIR}/msg/msg.pb.cc) +target_link_libraries(dismember ${protobuf_LINK_LIBRARIES} ${rocksdb_LINK_LIBRARIES} bz2 z pthread) +target_compile_options(dismember PRIVATE ${CFLAGS}) + +add_executable(ppd ${CMAKE_CURRENT_SOURCE_DIR}/ppd/ppd.cc + ${CMAKE_CURRENT_SOURCE_DIR}/ppd/reqproc.cc + ${CMAKE_CURRENT_BINARY_DIR}/msg/msg.pb.cc) +target_link_libraries(ppd ${protobuf_LINK_LIBRARIES} ${rocksdb_LINK_LIBRARIES} bz2 z pthread) +target_compile_options(ppd PRIVATE ${CFLAGS}) + +if (${ENABLE_FSTACK} MATCHES "y") + add_executable(ppd_ff ${CMAKE_CURRENT_SOURCE_DIR}/ppd_ff/ppd.cc + ${CMAKE_CURRENT_SOURCE_DIR}/ppd_ff/reqproc.cc + ${CMAKE_CURRENT_BINARY_DIR}/msg/msg.pb.cc) + target_link_libraries(ppd_ff ${protobuf_LINK_LIBRARIES} fstack ${ssl_LINK_LIBRARIES} bz2 z crypto ${dpdk_LIBRARIES} ${bsdtopo_LIBRARIES} librte_net_bond.a librte_bus_vdev.a) + target_link_directories(ppd_ff PRIVATE /usr/local/lib ${dpdk_LIBRARY_DIRS} ${bsdtopo_LIBRARY_DIRS}) + target_compile_options(ppd_ff PRIVATE ${CFLAGS} ${dpdk_CFLAGS}) +endif() diff --git a/dismember/Generator.cc b/dismember/Generator.cc new file mode 100644 index 0000000..e72b864 --- /dev/null +++ b/dismember/Generator.cc @@ -0,0 +1,76 @@ +// modified from mutilate + +#include "Generator.h" + +Generator* createFacebookKey() { return new GEV(30.7984, 8.20449, 0.078688); } + +Generator* createFacebookValue() { + Generator* g = new GPareto(15.0, 214.476, 0.348238); + + Discrete* d = new Discrete(g); + d->add(0.00536, 0.0); + d->add(0.00047, 1.0); + d->add(0.17820, 2.0); + d->add(0.09239, 3.0); + d->add(0.00018, 4.0); + d->add(0.02740, 5.0); + d->add(0.00065, 6.0); + d->add(0.00606, 7.0); + d->add(0.00023, 8.0); + d->add(0.00837, 9.0); + d->add(0.00837, 10.0); + d->add(0.08989, 11.0); + d->add(0.00092, 12.0); + d->add(0.00326, 13.0); + d->add(0.01980, 14.0); + + return d; +} + +Generator* createFacebookIA() { return new GPareto(0, 16.0292, 0.154971); } + +Generator* createGenerator(std::string str) { + if (!strcmp(str.c_str(), "fb_key")) return createFacebookKey(); + else if (!strcmp(str.c_str(), "fb_value")) return createFacebookValue(); + else if (!strcmp(str.c_str(), "fb_ia")) return createFacebookIA(); + + char *s_copy = new char[str.length() + 1]; + strcpy(s_copy, str.c_str()); + char *saveptr = NULL; + + if (atoi(s_copy) != 0 || !strcmp(s_copy, "0")) { + double v = atof(s_copy); + delete[] s_copy; + return new Fixed(v); + } + + char *t_ptr = strtok_r(s_copy, ":", &saveptr); + char *a_ptr = strtok_r(NULL, ":", &saveptr); + + if (t_ptr == NULL) // || a_ptr == NULL) + DIE("strtok(.., \":\") failed to parse %s", str.c_str()); + + char t = t_ptr[0]; + + saveptr = NULL; + char *s1 = strtok_r(a_ptr, ",", &saveptr); + char *s2 = strtok_r(NULL, ",", &saveptr); + char *s3 = strtok_r(NULL, ",", &saveptr); + + double a1 = s1 ? atof(s1) : 0.0; + double a2 = s2 ? atof(s2) : 0.0; + double a3 = s3 ? atof(s3) : 0.0; + + delete[] s_copy; + + if (strcasestr(str.c_str(), "fixed")) return new Fixed(a1); + else if (strcasestr(str.c_str(), "normal")) return new Normal(a1, a2); + else if (strcasestr(str.c_str(), "exponential")) return new Exponential(a1); + else if (strcasestr(str.c_str(), "pareto")) return new GPareto(a1, a2, a3); + else if (strcasestr(str.c_str(), "gev")) return new GEV(a1, a2, a3); + else if (strcasestr(str.c_str(), "uniform")) return new Uniform(a1); + + DIE("Unable to create Generator '%s'", str.c_str()); + + return NULL; +} diff --git a/dismember/Generator.h b/dismember/Generator.h new file mode 100644 index 0000000..4031889 --- /dev/null +++ b/dismember/Generator.h @@ -0,0 +1,217 @@ +// modified from mutilate +// -*- c++ -*- + +// 1. implement "fixed" generator +// 2. implement discrete generator +// 3. implement combine generator? + +#ifndef GENERATOR_H +#define GENERATOR_H + +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "util.h" +#define D(fmt, ...) +#define DIE(fmt, ...) + +// Generator syntax: +// +// \d+ == fixed +// n[ormal]:mean,sd +// e[xponential]:lambda +// p[areto]:scale,shape +// g[ev]:loc,scale,shape +// fb_value, fb_key, fb_rate + +class Generator { +public: + Generator() {} + // Generator(const Generator &g) = delete; + // virtual Generator& operator=(const Generator &g) = delete; + virtual ~Generator() {} + + virtual double generate(double U = -1.0) = 0; + virtual void set_lambda(double lambda) {DIE("set_lambda() not implemented");} +protected: + std::string type; +}; + +class Fixed : public Generator { +public: + Fixed(double _value = 1.0) : value(_value) { D("Fixed(%f)", value); } + virtual double generate(double U = -1.0) { return value; } + virtual void set_lambda(double lambda) { + if (lambda > 0.0) value = 1.0 / lambda; + else value = 0.0; + } + +private: + double value; +}; + +class Uniform : public Generator { +public: + Uniform(double _scale) : scale(_scale) { D("Uniform(%f)", scale); } + + virtual double generate(double U = -1.0) { + if (U < 0.0) U = drand48(); + return scale * U; + } + + virtual void set_lambda(double lambda) { + if (lambda > 0.0) scale = 2.0 / lambda; + else scale = 0.0; + } + +private: + double scale; +}; + +class Normal : public Generator { +public: + Normal(double _mean = 1.0, double _sd = 1.0) : mean(_mean), sd(_sd) { + D("Normal(mean=%f, sd=%f)", mean, sd); + } + + virtual double generate(double U = -1.0) { + if (U < 0.0) U = drand48(); + double V = U; // drand48(); + double N = sqrt(-2 * log(U)) * cos(2 * M_PI * V); + return mean + sd * N; + } + + virtual void set_lambda(double lambda) { + if (lambda > 0.0) mean = 1.0 / lambda; + else mean = 0.0; + } + +private: + double mean, sd; +}; + +class Exponential : public Generator { +public: + Exponential(double _lambda = 1.0) : lambda(_lambda) { + D("Exponential(lambda=%f)", lambda); + } + + virtual double generate(double U = -1.0) { + if (lambda <= 0.0) return 0.0; + if (U < 0.0) U = drand48(); + return -log(U) / lambda; + } + + virtual void set_lambda(double lambda) { this->lambda = lambda; } + +private: + double lambda; +}; + +class GPareto : public Generator { +public: + GPareto(double _loc = 0.0, double _scale = 1.0, double _shape = 1.0) : + loc(_loc), scale(_scale), shape(_shape) { + assert(shape != 0.0); + D("GPareto(loc=%f, scale=%f, shape=%f)", loc, scale, shape); + } + + virtual double generate(double U = -1.0) { + if (U < 0.0) U = drand48(); + return loc + scale * (pow(U, -shape) - 1) / shape; + } + + virtual void set_lambda(double lambda) { + if (lambda <= 0.0) scale = 0.0; + else scale = (1 - shape) / lambda - (1 - shape) * loc; + } + +private: + double loc /* mu */; + double scale /* sigma */, shape /* k */; +}; + +class GEV : public Generator { +public: + GEV(double _loc = 0.0, double _scale = 1.0, double _shape = 1.0) : + e(1.0), loc(_loc), scale(_scale), shape(_shape) { + assert(shape != 0.0); + D("GEV(loc=%f, scale=%f, shape=%f)", loc, scale, shape); + } + + virtual double generate(double U = -1.0) { + return loc + scale * (pow(e.generate(U), -shape) - 1) / shape; + } + +private: + Exponential e; + double loc /* mu */, scale /* sigma */, shape /* k */; +}; + +class Discrete : public Generator { +public: + ~Discrete() { delete def; } + Discrete(Generator* _def = NULL) : def(_def) { + if (def == NULL) def = new Fixed(0.0); + } + + virtual double generate(double U = -1.0) { + double Uc = U; + if (pv.size() > 0 && U < 0.0) U = drand48(); + + double sum = 0; + + for (auto p: pv) { + sum += p.first; + if (U < sum) return p.second; + } + + return def->generate(Uc); + } + + void add(double p, double v) { + pv.push_back(std::pair(p, v)); + } + +private: + Generator *def; + std::vector< std::pair > pv; +}; + +class KeyGenerator { +public: + KeyGenerator(Generator* _g, double _max = 10000) : g(_g), max(_max) {} + std::string generate(uint64_t ind) { + uint64_t h = fnv_64(ind); + double U = (double) h / (double)ULLONG_MAX; + double G = g->generate(U); + int keylen = MAX(round(G), floor(log10(max)) + 1); + char key[256]; + snprintf(key, 256, "%0*" PRIu64, keylen, ind); + + // D("%d = %s", ind, key); + return std::string(key); + } +private: + Generator* g; + double max; +}; + +Generator* createGenerator(std::string str); +Generator* createFacebookKey(); +Generator* createFacebookValue(); +Generator* createFacebookIA(); + +#endif // GENERATOR_H diff --git a/dismember/dismember.cc b/dismember/dismember.cc new file mode 100755 index 0000000..f70686b --- /dev/null +++ b/dismember/dismember.cc @@ -0,0 +1,1056 @@ +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include "Generator.h" +#include "msg.pb.h" +#include "reqgen.h" +#include "options.h" + +#define MSG_TEST_OK (0x1234) +#define MSG_TEST_START (0x2345) +#define MSG_TEST_STOP (0x3456) +#define MSG_TEST_QPS_ACK (0x4567) + +using namespace std; + +option options; + +struct slave_stats { + int qps; + int send_sz; + int recv_sz; +}; + +struct perf_counter { + char pad1[64]; + int cnt; + char pad2[64]; +}; + +static vector thrd_perf_counters; + +/* client server stuff */ +static vector client_ips; +static vector client_fds; +static unordered_map rgen_params; +static int master_fd = 0; + +static int mt_kq; /* kq for the main thread. Has all connections and the clock timer */ + +static pthread_barrier_t prepare_barrier; +static pthread_barrier_t ok_barrier; + +static const int depth_limit = 1; +static volatile uint64_t cur_time = 0; + +struct datapt { + uint64_t qps; + uint64_t lat; +}; + +enum conn_state { + STATE_LIMITING, + STATE_WAITING, + STATE_SENDING +}; + +struct kqconn { + req_gen *rgen; + Generator *gen; + int conn_fd; + int conn_id; + int timer; + int timer_expired; + int kq; + int depth; + enum conn_state state; + uint64_t next_send; + uint64_t last_send; + vector *conns; + vector *stats; +}; + +req_gen * create_rgen(WORKLOAD_TYPE type, const int conn_id, std::unordered_map *args) +{ + switch (type) { + case WORKLOAD_TYPE::ECHO : + return new echo_gen(conn_id, args); + break; + case WORKLOAD_TYPE::TOUCH : + return new touch_gen(conn_id, args); + break; + case WORKLOAD_TYPE::HTTP : + return new http_gen(conn_id, std::string(options.server_ip) + ":" + std::to_string(options.server_port) ,args); + break; +#ifdef WITH_ROCKSDB + case WORKLOAD_TYPE::RDB : + return new rdb_gen(conn_id, args); + break; +#endif + default: + E("Unsupported workload type %d\n", type); + } +} + +void parse_rgen_params() +{ + char * saveptr; + + for (int i = 0; i < options.num_gen_params; i++) { + saveptr = NULL; + char *key = strtok_r(options.gen_params[i], "=", &saveptr); + char *val = strtok_r(NULL, "=", &saveptr); + + rgen_params.insert({key, val}); + + V("Parsed workload parameter: %s = %s\n", key, val); + } +} + +void kqconn_cleanup(struct kqconn *conn) +{ + int status; + struct kevent ev[2]; + int nev = 1; + EV_SET(&ev[0], conn->conn_fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); + if (!conn->timer_expired) { + EV_SET(&ev[1], conn->timer, EVFILT_TIMER, EV_DELETE, 0, 0, NULL); + nev++; + } + + status = kevent(conn->kq, ev, nev, NULL, 0, NULL); + if (status == -1) { + E("Error kevent kqconn_cleanup\n"); + } + + delete conn->rgen; + delete conn->gen; + delete conn; +} + +void kqconn_state_machine(struct kqconn *conn) +{ + int64_t now; + struct kevent ev; + while(1) { + switch (conn->state) { + case STATE_WAITING: { + now = conn->next_send - (int64_t)get_time_us(); + if (now > 0) { + if (conn->timer_expired) { + EV_SET(&ev, conn->timer, EVFILT_TIMER, EV_ADD | EV_ONESHOT, NOTE_USECONDS, now, conn); + if (kevent(conn->kq, &ev, 1, NULL, 0, NULL) == -1) { + E("Error arming timer %d. ERR %d\n", conn->timer, errno); + } + conn->timer_expired = 0; + } + return; + } else { + conn->state = STATE_SENDING; + break; + } + } + + case STATE_LIMITING: + if (conn->depth >= depth_limit) { + return; + } + conn->state = STATE_WAITING; + break; + + case STATE_SENDING: { + /* send one packet and transfer to wait state */ + if (conn->depth >= depth_limit) { + conn->state = STATE_LIMITING; + /* wait for the read to call us */ + break; + } else { + now = get_time_us(); + if (now < (int64_t)conn->next_send) { + /* only STATE_WAITING transfers us to this state + * the above condition cannot be true + */ + E("Oscar fucked up\n"); + } + + conn->depth++; + conn->next_send += (int)(conn->gen->generate() * 1000000.0); + conn->last_send = now; + conn->state = STATE_WAITING; + + if (conn->rgen->send_req(conn->conn_fd) < 0) { + /* effectively skipping this packet */ + W("Cannot write to connection %d\n", conn->conn_fd); + } + + break; + } + } + + default: + E("Oscar fucked up hard\n"); + } + } +} + +void ev_loop(int id, struct kevent *ev, struct kqconn *conn) +{ + int status; + + if ((int)ev->ident == conn->conn_fd) { + /* we got something to read */ + status = conn->rgen->read_resp(conn->conn_fd); + if (status < 0) { + E("Connection %d read_resp failed. ERR: %d\n", conn->conn_fd, errno); + } + + conn->depth--; + if (conn->depth < 0) { + E("More recved packets than sent.\n"); + } + + if ((long)cur_time >= options.warmup) { + thrd_perf_counters[id]->cnt++; + if (!options.client_mode) { + struct datapt* dat = new struct datapt; + dat->qps = 0; + dat->lat = get_time_us() - conn->last_send; + conn->stats->push_back(dat); + V("Conn %d: TS: %d LAT: %ld, QPS: %ld\n", conn->conn_fd, (int)cur_time, dat->lat, (long)dat->qps); + } + } + + kqconn_state_machine(conn); + + } else if ((int)ev->ident == conn->timer) { + conn->timer_expired = 1; + kqconn_state_machine(conn); + } else { + E("Oscar really fucked up hard\n"); + } +} +static int send_sz = 0; +static int recv_sz = 0; +void +worker_thread(int id, int notif_pipe, vector *data) +{ + int timer_start = -1; + int conn_fd; + struct sockaddr_in server_addr; + vector conns; + struct kevent ev[2]; + int kq = kqueue(); + + EV_SET(&ev[0], notif_pipe, EVFILT_READ, EV_ADD, 0, 0, NULL); + if (kevent(kq, &ev[0], 1, NULL, 0, NULL) == -1) { + E("conn fd event kq reg problem"); + } + + server_addr.sin_family = AF_INET; + server_addr.sin_port = htons(options.server_port); + server_addr.sin_addr.s_addr = inet_addr(options.master_mode ? options.master_server_ip : options.server_ip); + + // initialize all the connections + for (int i = 0 ; i < options.client_conn; i++) { + while (true) { + int enable = 1; + struct timeval tv = { .tv_sec = 5, .tv_usec = 0 }; + + conn_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (conn_fd == -1) { + W("Error in creating socket, will retry.\n"); + continue; + } + if (setsockopt(conn_fd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof(tv)) < 0) { + E("setsockopt rcvtimeo"); + } + if (setsockopt(conn_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)) < 0) { + E("setsockopt reuseaddr"); + } + if (setsockopt(conn_fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)) < 0) { + E("setsockopt reuseport"); + } + if (setsockopt(conn_fd, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) < 0) { + E("setsockopt() failed on socket %d\n", conn_fd); + } + if (connect(conn_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) != 0) { + E("Connection %d connect() failed. Dropping. Err: %d\n", conn_fd, errno); + } + + struct kqconn* conn = new struct kqconn; + conn->conn_fd = conn_fd; + conn->conn_id = options.global_conn_start_idx.fetch_add(1); + conn->timer = timer_start--; + conn->kq = kq; + conn->depth = 0; + conn->conns = &conns; + conn->next_send = 0; + conn->timer_expired = 0; + conn->state = STATE_WAITING; + conn->stats = data; + conn->gen = createGenerator(options.generator_name); + if (conn->gen == NULL) { + E("Unknown generator \"%s\"\n", options.generator_name); + } + conn->gen->set_lambda((double)options.target_qps / (double)(options.client_thread_count * options.client_conn)); + conn->rgen = create_rgen(options.workload_type, conn->conn_id, &rgen_params); + + EV_SET(&ev[0], conn->conn_fd, EVFILT_READ, EV_ADD, 0, 0, conn); + EV_SET(&ev[1], conn->timer, EVFILT_TIMER, EV_ADD | EV_ONESHOT, NOTE_USECONDS, 0, conn); + if (kevent(kq, ev, 2, NULL, 0, NULL) == -1) { + E("conn fd event kq reg problem"); + } + + conns.push_back(conn); + V("Established connection %d with global id %d\n", conn->conn_fd, conn->conn_id); + usleep(50); + break; + } + } + + V("Thread %d has established %ld connections.\n", id, conns.size()); + pthread_barrier_wait(&prepare_barrier); + + V("Thread %d waiting for START...\n", id); + pthread_barrier_wait(&ok_barrier); + + V("Thread %d running...\n", id); + /* send the initial packet now */ + for (uint32_t i = 0; i < conns.size(); i++) { + conns.at(i)->next_send = get_time_us(); + } + + while(1) { + struct kevent ev; + if (kevent(kq, NULL, 0, &ev, 1, NULL) == 1) { + struct kqconn *conn = (struct kqconn *)ev.udata; + + if (ev.flags & EV_EOF) { + /* just error */ + E("Connection %d dropped due to EOF. ERR: %d\n", conn->conn_fd, ev.fflags); + continue; + } + + if ((int)ev.ident == notif_pipe) { + char c; + + if (read(notif_pipe, &c, sizeof(c)) == -1) + E("Error reading pipe. ERR %d\n", errno); + + if (c == 'e') + break; + } + + ev_loop(id, &ev, conn); + } else { + E("Thread %d kevent failed. ERR %d\n", id, errno); + } + } + + if (options.workload_type == WORKLOAD_TYPE::ECHO) { + send_sz = ((echo_gen*)conns.at(0)->rgen)->get_send_sz(); + recv_sz = ((echo_gen*)conns.at(0)->rgen)->get_recv_sz(); + } + + for (uint32_t i = 0; i < conns.size(); i++) { + kqconn_cleanup(conns.at(i)); + } + + close(kq); + + V("Thread %d exiting...\n", id); +} + +void +master_recv_stats(struct slave_stats *stats) +{ + int tot = 0; + struct kevent kev; + ppd_slave_resp resp; + char msg[1024]; + + while(tot < options.client_num) { + + if (kevent(mt_kq, NULL, 0, &kev, 1, NULL) != 1) { + E("Error recving qps. ERR %d\n", errno); + } + + if (kev.flags & EV_EOF) { + continue; + } + + long sz; + if ((sz = readbuf(kev.ident, msg, sizeof(long))) == -1) { + E("Failed to read from master_fd or message invalid. ERR %d\n", errno); + } + memcpy(&sz, msg, sizeof(long)); + sz = ntohl(sz); + if (readbuf(kev.ident, msg, sz) == -1) { + E("Failed to read from master_fd or message invalid 2. ERR %d\n", errno); + } + + resp.ParseFromArray(msg, sz); + stats->qps += resp.qps(); + stats->send_sz = resp.send_sz(); + stats->recv_sz = resp.resp_sz(); + + V("Received qps %d send %d recv %d from client %d\n", resp.qps(), resp.send_sz(), resp.resp_sz(), (int)kev.ident); + + int msg = MSG_TEST_QPS_ACK; + if (writebuf(kev.ident, &msg, sizeof(int)) == -1) { + E("Failed to send ACK to client %d. ERR %d\n", (int)kev.ident, errno); + } + + V("Sent ACK to client %d\n", (int)kev.ident); + tot++; + } +} + +void +client_stop(struct slave_stats *stats) +{ + int qps; + int msg = MSG_TEST_STOP; + int stop_cnt = 0; + struct kevent kev; + + V("Sending clients STOP...\n"); + for (uint32_t i=0;ih_addr_list; + for (int i=0;addr[i]!=NULL;i++) { + strncpy(rt, inet_ntoa(*addr[i]), 99); + return rt; + } + return rt; +} + +static void send_master_ok() +{ + struct kevent kev; + int status; + int msg = MSG_TEST_OK; + status = writebuf(master_fd, &msg, sizeof(msg)); + + if (status < 0) { + E("Failed to respond OK to master. ERR %d\n", errno); + } + + /* wait for START */ + status = kevent(mt_kq, NULL, 0, &kev, 1, NULL); + + if (status != 1) { + E("Failed to wait for START. ERR %d\n", errno); + } + + status = readbuf(master_fd, &msg, sizeof(msg)); + + if (status == -1 || msg != MSG_TEST_START) { + E("Failed to read START from master. ERR %d\n", errno); + } +} + +static void wait_clients_ok() +{ + set acked; + int status; + int msg; + struct kevent kev; + + /* wait for client OKs */ + while(acked.size() < client_fds.size()) { + status = kevent(mt_kq, NULL, 0, &kev, 1, NULL); + + if (status != 1) { + E("kevent wait for client ok %d\n", errno); + } + + V("Received client ok from %d\n", (int)kev.ident); + + status = readbuf(kev.ident, &msg, sizeof(int)); + + if (status < 0) { + E("readbuf wait for client ok %d\n", errno); + } + + if (msg != MSG_TEST_OK || acked.find(kev.ident) != acked.end()) { + E("Duplicate or invalid client ok message\n"); + } + + acked.insert(kev.ident); + } + + sleep(1); + /* start all clients */ + msg = MSG_TEST_START; + for (uint32_t i = 0; i < client_fds.size(); i++) { + if(writebuf(client_fds.at(i), &msg, sizeof(int)) < 0) { + E("Error sending START to client %d. ERR %d\n",client_fds.at(i) , errno); + } + } +} + +static void prepare_clients() +{ + int real_conn = options.master_conn == -1 ? options.client_conn : options.master_conn; + int real_thread = options.master_thread_count == -1 ? options.client_thread_count : options.master_thread_count; + int real_qps; + + if (options.master_qps != -1) { + real_qps = options.master_qps; + } else { + real_qps = options.target_qps * (real_conn * real_thread) / (real_conn * real_thread + options.client_num * options.client_conn * options.client_thread_count); + } + + /* this is the qps for each client */ + options.target_qps = (options.target_qps - real_qps) / client_ips.size(); + + struct kevent *kev = new struct kevent[client_ips.size()]; + + /* create a connection to the clients */ + for (uint32_t i=0; i < client_ips.size(); i++) { + struct sockaddr_in csock_addr; + int c_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + csock_addr.sin_family = AF_INET; + csock_addr.sin_addr.s_addr = inet_addr(client_ips[i]); + csock_addr.sin_port = htons(DEFAULT_CLIENT_CTL_PORT); + + V("Connecting to client %s...\n", client_ips.at(i)); + + if (connect(c_fd, (struct sockaddr*)&csock_addr, sizeof(csock_addr)) != 0) { + E("Connect failed. ERR %d\n", errno); + } + + if (writebuf(c_fd, &options, sizeof(options)) < 0) { + E("Write to client. ERR %d\n", errno); + } + + client_fds.push_back(c_fd); + V("Client connected %d/%lu.\n", i + 1, client_ips.size()); + + EV_SET(&kev[i], c_fd, EVFILT_READ, EV_ADD, 0, 0, NULL); + + options.global_conn_start_idx += options.client_conn * options.client_thread_count; + } + + V("Registering client fds to mtkq...\n"); + /* add to main thread's kq */ + if (kevent(mt_kq, kev, client_ips.size(), NULL, 0, NULL) == -1) { + E("Failed to add some clients to mtkq. ERR %d\n", errno); + } + + delete[] kev; + + /* adjust the existing settings */ + options.client_conn = real_conn; + options.client_thread_count = real_thread; + options.target_qps = real_qps; +} + +static void client_send_stats(int qps, int send_sz, int recv_sz) +{ + struct kevent kev; + int msg = 0; + ppd_slave_resp resp; + char buf[1024]; + + resp.set_qps(qps); + resp.set_send_sz(send_sz); + resp.set_resp_sz(recv_sz); + resp.SerializeToArray(buf + sizeof(long), 1024 - sizeof(long)); + long sz = htonl(resp.ByteSizeLong()); + memcpy(buf, &sz, sizeof(long)); + + /* clients need to send qps */ + V("Sending master stats, qps %d, send %d, resp %d...\n", qps, send_sz, recv_sz); + if (writebuf(master_fd, buf, resp.ByteSizeLong() + sizeof(long)) < 0) { + E("Error writing stats to master\n"); + } + + V("Waiting for master ACK...\n"); + + if (kevent(mt_kq, NULL, 0, &kev, 1, NULL) != 1) { + E("kevent wait for master ack %d\n", errno); + } + + if (readbuf((int)kev.ident, &msg, sizeof(int)) < 0) { + E("Failed to receive ack from master\n"); + } + + if (msg != MSG_TEST_QPS_ACK) { + E("Invalid ack message\n"); + } +} + +static void wait_master_prepare() +{ + V("Waiting for master's PREPARE...\n"); + + struct sockaddr_in csock_addr; + int listen_fd; + int enable = 1; + csock_addr.sin_family = AF_INET; + csock_addr.sin_addr.s_addr = htonl(INADDR_ANY); + csock_addr.sin_port = htons(DEFAULT_CLIENT_CTL_PORT); + listen_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + + if (listen_fd < 0) { + E("socket"); + } + + if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)) < 0) { + E("setsockopt reuseaddr"); + } + + if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)) < 0) { + E("setsockopt reuseport"); + } + + if (::bind(listen_fd, (struct sockaddr*)&csock_addr, sizeof(csock_addr)) < 0) { + E("bind"); + } + + if (listen(listen_fd, 10) < 0) { + E("ctl listen"); + } + + master_fd = accept(listen_fd, NULL, NULL); + if (master_fd == -1) { + E("Failed to accept master. ERR %d\n", errno); + } + + close(listen_fd); + + if (readbuf(master_fd, &options, sizeof(options)) < 0) { + E("Failed to receive options from master. ERR %d\n", errno); + } + + V("Registering master fd to mtkq...\n"); + struct kevent kev; + + EV_SET(&kev, master_fd, EVFILT_READ, EV_ADD, 0, 0, NULL); + + if (kevent(mt_kq, &kev, 1, NULL, 0, NULL) == -1) { + E("Failed to register master fd to mtkq. ERR %d\n", errno); + } + + /* set the correct mode */ + options.master_mode = 0; + options.client_mode = 1; + options.output_name = NULL; +} + +static void usage() +{ + fprintf(stdout, "Usage:\n" + " -s: server addr.\n" + " -p: server port.\n" + " -q: target qps.\n" + " -t: thread's cpu list.\n" + " -c: connections per thread.\n" + " -o: test output file name.\n" + " -h: show help.\n" + " -v: verbose mode.\n" + " -W: warm up time.\n" + " -w: test duration.\n" + " -i: interarrival distribution. Default fb_ia. See mutilate.\n" + " -l: workload type. ECHO(0), TOUCH(1), RDB(2), HTTP(3). Default 0.\n" + " -O: workload specific parameters. Format: param=value. E.g. -Otsala=patis.\n\n" + "Master mode:\n" + " -a: client addr.\n" + " -A: client mode.\n" + " -C: master connections.\n" + " -Q: master qps.\n" + " -T: master threads.\n" + " -S: master mode server ip.\n\n" + "Workload specific parameters:\n" + " ECHO:\n" + " GEN: the generator for request delay. Default fixed:0.\n" + " CDELAY: enable per-connection delay. Default 0.\n" + " SIZE: the request payload size for resp filled by the server. Default 0." + " TOUCH:\n" + " GEN: the generator for items touched per request. Default fixed:64.\n" + " UPDATE: the update ratio of request. Default 0.\n" + " HTTP:\n" + " N/A\n" + " RDB:\n" + " N/A\n\n"); +} + +/* + * protocol: + * + * master -> client options + * client and master all establish connections to the server + * client -> master OK + * master -> client START (client runs forever) + * master RUNS for X seconds + * master -> client STOP + */ +int +main(int argc, char* argv[]) +{ + int ch; + FILE *resp_fp_csv; + + while ((ch = getopt(argc, argv, "q:s:C:p:o:t:c:hvW:w:T:Aa:Q:i:S:l:O:")) != -1) { + switch (ch) { + case 'q': + options.target_qps = atoi(optarg); + if (options.target_qps < 0) { + E("Target QPS must be positive\n"); + } + break; + case 'Q': + options.master_qps = atoi(optarg); + if (options.master_qps < 0) { + E("Master QPS must be positive\n"); + } + break; + case 's': { + string ip = get_ip_from_hostname(optarg); + strncpy(options.server_ip, ip.c_str(), INET_ADDRSTRLEN); + break; + } + case 'p': + options.server_port = atoi(optarg); + if (options.server_port <= 0) { + E("Server port must be positive\n"); + } + break; + case 'o': + options.output_name = optarg; + break; + case 't': + options.client_thread_count = atoi(optarg); + if (options.client_thread_count <= 0) { + E("Client threads must be positive\n"); + } + break; + case 'T': + options.master_thread_count = atoi(optarg); + if (options.master_thread_count <= 0) { + E("Master threads must be positive\n"); + } + break; + case 'c': + options.client_conn = atoi(optarg); + if (options.client_conn <= 0) { + E("Client connections must be positive\n"); + } + break; + case 'l': { + options.workload_type = static_cast(atoi(optarg)); + break; + } + case 'C': + options.master_conn = atoi(optarg); + if (options.master_conn <= 0) { + E("Master connections must be positive\n"); + } + break; + case 'S': { + string ip = get_ip_from_hostname(optarg); + options.master_server_ip_given = 1; + strncpy(options.master_server_ip, ip.c_str(), INET_ADDRSTRLEN); + break; + } + case 'O': { + strncpy(options.gen_params[options.num_gen_params], optarg, MAX_GEN_PARAMS_LEN); + options.num_gen_params++; + break; + } + case 'h': + case '?': + usage(); + exit(0); + case 'v': + options.verbose = 1; + W("Verbose mode can cause SUBSTANTIAL latency fluctuations in some(XFCE) terminals!\n"); + break; + case 'a': { + if (options.client_mode == 1) { + E("Cannot be both master and client\n"); + } + string ip = get_ip_from_hostname(optarg); + char *rip = new char[INET_ADDRSTRLEN + 1]; + strncpy(rip, ip.c_str(), INET_ADDRSTRLEN); + client_ips.push_back(rip); + options.master_mode = 1; + break; + } + case 'W': + options.warmup = atoi(optarg); + if (options.warmup < 0) { + E("Warmup must be positive\n"); + } + break; + case 'w': + options.duration = atoi(optarg); + if (options.duration <= 0) { + E("Test duration must be positive\n"); + } + break; + case 'A': { + if (options.master_mode == 1) { + E("Cannot be both master and client\n"); + } + options.client_mode = 1; + break; + } + case 'i': { + strncpy(options.generator_name, optarg, MAX_GEN_LEN); + break; + } + default: + E("Unrecognized option -%c\n\n", ch); + } + } + + ::signal(SIGPIPE, SIG_IGN); + + if (!options.client_mode) { + resp_fp_csv = fopen(options.output_name, "w"); + if (resp_fp_csv == NULL) { + E("cannot open file for writing %s.", options.output_name); + exit(1); + } + } + + if (options.master_mode && options.master_server_ip_given == 0) { + /* fall back to ip from -s */ + strncpy(options.master_server_ip, options.server_ip, INET_ADDRSTRLEN); + } + + options.client_num = client_ips.size(); + + mt_kq = kqueue(); + + /* connect to clients and sync options */ + if (options.master_mode) { + /* make master connections to clients */ + prepare_clients(); + } else if (options.client_mode) { + /* in client mode we receive all parameters from the server */ + wait_master_prepare(); + } + + /* here EVERYONE is on the same page */ + options.dump(); + parse_rgen_params(); + + vector threads; + vector send_pipes; + vector recv_pipes; + send_pipes.resize(options.client_thread_count); + recv_pipes.resize(options.client_thread_count); + + /* do our setup according to the options */ + pthread_barrier_init(&prepare_barrier, NULL, options.client_thread_count + 1); + pthread_barrier_init(&ok_barrier, NULL, options.client_thread_count + 1); + + V("Creating %d threads...\n", options.client_thread_count); + + /* create worker threads */ + vector *data = new vector[options.client_thread_count]; + for (int i = 0; i < options.client_thread_count; i++) { + int pipes[2]; + if (pipe(&pipes[0]) == -1) { + E("Cannot create pipes. ERR %d\n", errno); + } + + struct perf_counter *perf = new struct perf_counter; + perf->cnt = 0; + send_pipes[i] = pipes[0]; + recv_pipes[i] = pipes[1]; + + thrd_perf_counters.push_back(perf); + std::thread * thrd = new std::thread(worker_thread, i, pipes[1], &data[i]); + threads.push_back(thrd); + pthread_t handle = thrd->native_handle(); + cpuset_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(i * 2, &cpuset); + V("Setting thread %d's affinity to %d.\n", i , i * 2); + if (pthread_setaffinity_np(handle, sizeof(cpuset_t), &cpuset) != 0) { + E("Failed to set affinity for thread %d\n", i); + } + } + + V("Waiting for thread connection establishment.\n"); + pthread_barrier_wait(&prepare_barrier); + + if (options.master_mode) { + V("Waiting for clients...\n"); + /* wait for ok messages from the clients */ + wait_clients_ok(); + } else if (options.client_mode) { + V("Sending OK to master...\n"); + /* send ok messages to the master */ + send_master_ok(); + } + + + /* create main thread timer loop */ + struct kevent kev; +#define TIMER_FD (-1) + EV_SET(&kev, TIMER_FD, EVFILT_TIMER, EV_ADD, NOTE_SECONDS, 1, NULL); + + if (kevent(mt_kq, &kev, 1, NULL, 0, NULL) == -1) { + E("Cannot create kevent timer. ERR %d\n", errno); + } + + /* kick off worker threads */ + pthread_barrier_wait(&ok_barrier); + /* now we are free to start the experiment */ + V("Main thread running.\n"); + + while(1) { + /* client mode runs forever unless server sends us */ + if ((int)cur_time >= options.duration + options.warmup && !options.client_mode) { + break; + } + + if (kevent(mt_kq, NULL, 0, &kev, 1, NULL) != 1) { + E("Error in main event loop. ERR %d\n", errno); + } + + if ((int)kev.ident == TIMER_FD) { + cur_time++; + } else { + /* its from either master or client */ + if (kev.flags & EV_EOF) { + E("Client or master %d disconnected\n", (int)kev.ident); + } + + int msg; + + if (readbuf(kev.ident, &msg, sizeof(msg)) == -1) + E("Failed to read from master_fd or message invalid. ERR %d\n", errno); + + if (msg == MSG_TEST_STOP) { + V("Received STOP from master\n"); + break; + } else { + E("Unexpected message from master: %d\n", msg); + } + } + } + + V("Signaling threads to exit...\n"); + for (int i = 0; i < options.client_thread_count; i++) { + if (write(send_pipes[i], "e", sizeof(char)) == -1) { + E("Couldn't write to thread pipe %d. ERR %d\n", send_pipes[i], errno); + } + } + + for (int i = 0; i < options.client_thread_count; i++) { + threads.at(i)->join(); + delete(threads.at(i)); + delete thrd_perf_counters[i]; + close(send_pipes[i]); + close(recv_pipes[i]); + } + + int qps = 0; + + for(int i = 0; i < options.client_thread_count; i++) { + qps += thrd_perf_counters[i]->cnt; + } + V("Total requests: %d\n", qps); + + if (options.client_mode) { + client_send_stats(qps, send_sz, recv_sz); + close(master_fd); + } + + struct slave_stats stats; + stats.qps = 0; + stats.recv_sz = 0; + stats.send_sz = 0; + if (options.master_mode) { + V("Shutting down clients...\n"); + client_stop(&stats); + qps += stats.qps; + } + + V("Aggregated %d operations over %d seconds\n", qps, options.duration); + qps = qps / (options.duration); + + if (!options.client_mode) { + /* stop the measurement */ + V("Saving results...\n"); + for (int i = 0; i < options.client_thread_count; i++) { + fprintf(resp_fp_csv, "%d,%d,%d\n", qps, stats.send_sz, stats.recv_sz); + for (uint32_t j = 0; j < data[i].size(); j++) { + struct datapt *pt = data[i].at(j); + fprintf(resp_fp_csv, "%ld\n", pt->lat); + delete pt; + } + } + + delete[] data; + + fclose(resp_fp_csv); + } + + close(mt_kq); + + return 0; +} diff --git a/dismember/options.h b/dismember/options.h new file mode 100644 index 0000000..28eab24 --- /dev/null +++ b/dismember/options.h @@ -0,0 +1,124 @@ +#pragma once + +#include +#include +#include + +#include +#include + +static constexpr const int MAX_GEN_LEN = 31; +static constexpr const int DEFAULT_SERVER_CLIENT_CONN_PORT = 9898; +static constexpr const int DEFAULT_CLIENT_CTL_PORT = 9901; +static constexpr const char* DEFAULT_OUTPUT = "output.sample"; +static constexpr const int MAX_GEN_PARAMS = 8; +static constexpr const int MAX_GEN_PARAMS_LEN = 31; + +struct option; + +extern option options; + +struct option { + int verbose; + + enum WORKLOAD_TYPE workload_type; + + const char *output_name; + + int client_num; + int client_thread_count; + int master_thread_count; + + int client_conn; + int master_conn; + + int target_qps; + int master_qps; + + int master_mode; + int client_mode; + + std::atomic_int global_conn_start_idx; + + char server_ip[INET_ADDRSTRLEN + 1]; + char generator_name[MAX_GEN_LEN + 1]; + + char gen_params[MAX_GEN_PARAMS][MAX_GEN_PARAMS_LEN + 1]; + int num_gen_params; + + int master_server_ip_given; + char master_server_ip[INET_ADDRSTRLEN + 1]; + int server_port; + int depth_limit; + + int warmup; + int duration; + + option() + { + this->verbose = 0; + this->depth_limit = 1; + this->output_name = DEFAULT_OUTPUT; + this->client_thread_count = 1; + this->master_thread_count = -1; + this->client_conn = 1; + this->master_conn = -1; + this->target_qps = 0; + this->master_qps = -1; + this->client_mode = 0; + this->master_mode = 0; + this->warmup = 0; + this->duration = 10; + this->server_port = DEFAULT_SERVER_CLIENT_CONN_PORT; + this->master_server_ip_given = 0; + this->workload_type = WORKLOAD_TYPE::ECHO; + this->num_gen_params = 0; + this->global_conn_start_idx = 0; + + for(int i = 0; i < MAX_GEN_PARAMS; i++) { + memset(gen_params[i], 0, MAX_GEN_LEN + 1); + } + + /* set default values */ + strncpy(this->generator_name, "fb_ia" , MAX_GEN_LEN); + strncpy(this->server_ip, "127.0.0.1" , INET_ADDRSTRLEN); + strncpy(this->master_server_ip, "127.0.0.1", INET_ADDRSTRLEN); + } + + void dump() + { + V ("Configuration:\n" + " Connections per thread: %d\n" + " Num threads: %d\n" + " Target QPS: %d\n" + " warmup: %d\n" + " duration: %d\n" + " master_mode: %d\n" + " client_mode: %d\n" + " output_file: %s\n" + " server_ip: %s\n" + " server_port: %d\n" + " IA_DIST: %s\n" + " master_server_ip: %s\n" + " workload_type: %d\n" + " num_workload_param: %d\n" + " global_conn_idx: %d\n", + this->client_conn, + this->client_thread_count, + this->target_qps, + this->warmup, + this->duration, + this->master_mode, + this->client_mode, + this->output_name, + this->server_ip, + this->server_port, + this->generator_name, + this->master_server_ip, + this->workload_type, + this->num_gen_params, + this->global_conn_start_idx.load()); + } +}; +/* Don't send god damn vtables */ +static_assert(std::is_standard_layout