From ecfbd75065cb2ebfd1cde606b59fe685c497d2c0 Mon Sep 17 00:00:00 2001 From: David Terei Date: Fri, 12 Sep 2014 13:47:01 -0700 Subject: [PATCH 1/6] Seperate out protocol handling from connection. --- Connection.cc | 509 ++++++++++++++++---------------------------------- Connection.h | 104 ++++++----- Protocol.cc | 202 ++++++++++++++++++++ Protocol.h | 71 +++++++ SConstruct | 7 +- mutilate.cc | 31 +-- 6 files changed, 508 insertions(+), 416 deletions(-) create mode 100644 Protocol.cc create mode 100644 Protocol.h diff --git a/Connection.cc b/Connection.cc index b7dcf1d..ea02899 100644 --- a/Connection.cc +++ b/Connection.cc @@ -16,11 +16,14 @@ #include "binary_protocol.h" #include "util.h" +/** + * Create a new connection to a server endpoint. + */ Connection::Connection(struct event_base* _base, struct evdns_base* _evdns, string _hostname, string _port, options_t _options, bool sampling) : - hostname(_hostname), port(_port), start_time(0), - stats(sampling), options(_options), base(_base), evdns(_evdns) + start_time(0), stats(sampling), options(_options), + hostname(_hostname), port(_port), base(_base), evdns(_evdns) { valuesize = createGenerator(options.valuesize); keysize = createGenerator(options.keysize); @@ -34,7 +37,7 @@ Connection::Connection(struct event_base* _base, struct evdns_base* _evdns, iagen->set_lambda(options.lambda); } - read_state = INIT_READ; + read_state = INIT_READ; write_state = INIT_WRITE; last_tx = last_rx = 0.0; @@ -43,20 +46,28 @@ Connection::Connection(struct event_base* _base, struct evdns_base* _evdns, bufferevent_setcb(bev, bev_read_cb, bev_write_cb, bev_event_cb, this); bufferevent_enable(bev, EV_READ | EV_WRITE); + if (options.binary) { + prot = new ProtocolBinary(options, this, bev); + } else { + prot = new ProtocolAscii(options, this, bev); + } + if (bufferevent_socket_connect_hostname(bev, evdns, AF_UNSPEC, hostname.c_str(), - atoi(port.c_str()))) + atoi(port.c_str()))) { DIE("bufferevent_socket_connect_hostname()"); + } timer = evtimer_new(base, timer_cb, this); } +/** + * Destroy a connection, performing cleanup. + */ Connection::~Connection() { event_free(timer); timer = NULL; - // FIXME: W("Drain op_q?"); - bufferevent_free(bev); delete iagen; @@ -65,6 +76,9 @@ Connection::~Connection() { delete valuesize; } +/** + * Reset the connection back to an initial, fresh state. + */ void Connection::reset() { // FIXME: Actually check the connection, drain all bufferevents, drain op_q. assert(op_queue.size() == 0); @@ -74,27 +88,56 @@ void Connection::reset() { stats = ConnectionStats(stats.sampling); } - -void Connection::issue_sasl() { - read_state = WAITING_FOR_SASL; - - string username = string(options.username); - string password = string(options.password); - - binary_header_t header = {0x80, CMD_SASL, 0, 0, 0, {0}, 0, 0, 0}; - header.key_len = htons(5); - header.body_len = htonl(6 + username.length() + 1 + password.length()); - - bufferevent_write(bev, &header, 24); - bufferevent_write(bev, "PLAIN\0", 6); - bufferevent_write(bev, username.c_str(), username.length() + 1); - bufferevent_write(bev, password.c_str(), password.length()); +/** + * Set our event processing priority. + */ +void Connection::set_priority(int pri) { + if (bufferevent_priority_set(bev, pri)) { + DIE("bufferevent_set_priority(bev, %d) failed", pri); + } } +/** + * Load any required test data onto the server. + */ +void Connection::start_loading() { + read_state = LOADING; + loader_issued = loader_completed = 0; + + for (int i = 0; i < LOADER_CHUNK; i++) { + if (loader_issued >= options.records) break; + char key[256]; + int index = lrand48() % (1024 * 1024); + string keystr = keygen->generate(loader_issued); + strcpy(key, keystr.c_str()); + issue_set(key, &random_char[index], valuesize->generate()); + loader_issued++; + } +} + +/** + * Issue either a get or set request to the server according to our probability distribution. + */ +void Connection::issue_something(double now) { + char key[256]; + // FIXME: generate key distribution here! + string keystr = keygen->generate(lrand48() % options.records); + strcpy(key, keystr.c_str()); + + if (drand48() < options.update) { + int index = lrand48() % (1024 * 1024); + issue_set(key, &random_char[index], valuesize->generate(), now); + } else { + issue_get(key, now); + } +} + +/** + * Issue a get request to the server. + */ void Connection::issue_get(const char* key, double now) { Operation op; int l; - uint16_t keylen = strlen(key); #if HAVE_CLOCK_GETTIME op.start_time = get_time_accurate(); @@ -103,7 +146,6 @@ void Connection::issue_get(const char* key, double now) { #if USE_CACHED_TIME struct timeval now_tv; event_base_gettimeofday_cached(base, &now_tv); - op.start_time = tv_to_double(&now_tv); #else op.start_time = get_time(); @@ -113,35 +155,22 @@ void Connection::issue_get(const char* key, double now) { } #endif - op.type = Operation::GET; op.key = string(key); - + op.type = Operation::GET; op_queue.push(op); - if (read_state == IDLE) - read_state = WAITING_FOR_GET; - - if (options.binary) { - // each line is 4-bytes - binary_header_t h = {0x80, CMD_GET, htons(keylen), - 0x00, 0x00, {htons(0)}, //TODO(syang0) get actual vbucket? - htonl(keylen) }; - - bufferevent_write(bev, &h, 24); // size does not include extras - bufferevent_write(bev, key, keylen); - l = 24 + keylen; - } else { - l = evbuffer_add_printf(bufferevent_get_output(bev), "get %s\r\n", key); - } - + if (read_state == IDLE) read_state = WAITING_FOR_GET; + l = prot->get_request(key); if (read_state != LOADING) stats.tx_bytes += l; } +/** + * Issue a set request to the server. + */ void Connection::issue_set(const char* key, const char* value, int length, double now) { Operation op; int l; - uint16_t keylen = strlen(key); #if HAVE_CLOCK_GETTIME op.start_time = get_time_accurate(); @@ -153,47 +182,14 @@ void Connection::issue_set(const char* key, const char* value, int length, op.type = Operation::SET; op_queue.push(op); - if (read_state == IDLE) - read_state = WAITING_FOR_SET; - - if (options.binary) { - // each line is 4-bytes - binary_header_t h = { 0x80, CMD_SET, htons(keylen), - 0x08, 0x00, {htons(0)}, //TODO(syang0) get actual vbucket? - htonl(keylen + 8 + length)}; - - bufferevent_write(bev, &h, 32); // With extras - bufferevent_write(bev, key, keylen); - bufferevent_write(bev, value, length); - l = 24 + h.body_len; - } else { - l = evbuffer_add_printf(bufferevent_get_output(bev), - "set %s 0 0 %d\r\n", key, length); - bufferevent_write(bev, value, length); - bufferevent_write(bev, "\r\n", 2); - l += length + 2; - } - + if (read_state == IDLE) read_state = WAITING_FOR_SET; + l = prot->set_request(key, value, length); if (read_state != LOADING) stats.tx_bytes += l; } -void Connection::issue_something(double now) { - char key[256]; - // FIXME: generate key distribution here! - string keystr = keygen->generate(lrand48() % options.records); - strcpy(key, keystr.c_str()); - // int key_index = lrand48() % options.records; - // generate_key(key_index, options.keysize, key); - - if (drand48() < options.update) { - int index = lrand48() % (1024 * 1024); - // issue_set(key, &random_char[index], options.valuesize, now); - issue_set(key, &random_char[index], valuesize->generate(), now); - } else { - issue_get(key, now); - } -} - +/** + * Return the oldest live operation in progress. + */ void Connection::pop_op() { assert(op_queue.size() > 0); @@ -213,6 +209,39 @@ void Connection::pop_op() { } } +/** + * Finish up (record stats) an operation that just returned from the + * server. + */ +void Connection::finish_op(Operation *op) { + double now; +#if USE_CACHED_TIME + struct timeval now_tv; + event_base_gettimeofday_cached(base, &now_tv); + now = tv_to_double(&now_tv); +#else + now = get_time(); +#endif +#if HAVE_CLOCK_GETTIME + op->end_time = get_time_accurate(); +#else + op->end_time = now; +#endif + + switch (op->type) { + case Operation::GET: stats.log_get(*op); break; + case Operation::SET: stats.log_set(*op); break; + default: DIE("Not implemented."); + } + + last_rx = now; + pop_op(); + drive_write_machine(); +} + +/** + * Check if our testing is done and we should exit. + */ bool Connection::check_exit_condition(double now) { if (read_state == INIT_READ) return false; if (now == 0.0) now = get_time(); @@ -221,10 +250,43 @@ bool Connection::check_exit_condition(double now) { return false; } -// drive_write_machine() determines whether or not to issue a new -// command. Note that this function loops. Be wary of break -// vs. return. +/** + * Handle new connection and error events. + */ +void Connection::event_callback(short events) { + if (events & BEV_EVENT_CONNECTED) { + D("Connected to %s:%s.", hostname.c_str(), port.c_str()); + int fd = bufferevent_getfd(bev); + if (fd < 0) DIE("bufferevent_getfd"); + if (!options.no_nodelay) { + int one = 1; + if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, + (void *) &one, sizeof(one)) < 0) + DIE("setsockopt()"); + } + + read_state = CONN_SETUP; + if (prot->setup_connection_w()) { + read_state = IDLE; + } + + } else if (events & BEV_EVENT_ERROR) { + int err = bufferevent_socket_get_dns_error(bev); + if (err) DIE("DNS error: %s", evutil_gai_strerror(err)); + DIE("BEV_EVENT_ERROR: %s", strerror(errno)); + + } else if (events & BEV_EVENT_EOF) { + DIE("Unexpected EOF from server."); + } +} + +/** + * Request generation loop. Determines whether or not to issue a new command, + * based on timer events. + * + * Note that this function loops. Be wary of break vs. return. + */ void Connection::drive_write_machine(double now) { if (now == 0.0) now = get_time(); @@ -237,11 +299,9 @@ void Connection::drive_write_machine(double now) { switch (write_state) { case INIT_WRITE: delay = iagen->generate(); - next_time = now + delay; double_to_tv(delay, &tv); evtimer_add(timer, &tv); - write_state = WAITING_FOR_TIME; break; @@ -253,16 +313,10 @@ void Connection::drive_write_machine(double now) { write_state = WAITING_FOR_TIME; break; // We want to run through the state machine one more time // to make sure the timer is armed. - // } else if (options.moderate && options.lambda > 0.0 && - // now < last_rx + 0.25 / options.lambda) { } else if (options.moderate && now < last_rx + 0.00025) { write_state = WAITING_FOR_TIME; if (!event_pending(timer, EV_TIMEOUT, NULL)) { - // delay = last_rx + 0.25 / options.lambda - now; delay = last_rx + 0.00025 - now; - // I("MODERATE %f %f %f %f %f", now - last_rx, 0.25/options.lambda, - // 1/options.lambda, now-last_tx, delay); - double_to_tv(delay, &tv); evtimer_add(timer, &tv); } @@ -272,7 +326,6 @@ void Connection::drive_write_machine(double now) { issue_something(now); last_tx = now; stats.log_op(op_queue.size()); - next_time += iagen->generate(); if (options.skip && options.lambda > 0.0 && @@ -284,7 +337,6 @@ void Connection::drive_write_machine(double now) { next_time += iagen->generate(); } } - break; case WAITING_FOR_TIME: @@ -296,7 +348,6 @@ void Connection::drive_write_machine(double now) { } return; } - write_state = ISSUING; break; @@ -310,51 +361,14 @@ void Connection::drive_write_machine(double now) { } } -void Connection::event_callback(short events) { - // struct timeval now_tv; - // event_base_gettimeofday_cached(base, &now_tv); - - if (events & BEV_EVENT_CONNECTED) { - D("Connected to %s:%s.", hostname.c_str(), port.c_str()); - int fd = bufferevent_getfd(bev); - if (fd < 0) DIE("bufferevent_getfd"); - - if (!options.no_nodelay) { - int one = 1; - if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, - (void *) &one, sizeof(one)) < 0) - DIE("setsockopt()"); - } - - if (options.sasl) - issue_sasl(); - else - read_state = IDLE; // This is the most important part! - } else if (events & BEV_EVENT_ERROR) { - int err = bufferevent_socket_get_dns_error(bev); - if (err) DIE("DNS error: %s", evutil_gai_strerror(err)); - - DIE("BEV_EVENT_ERROR: %s", strerror(errno)); - } else if (events & BEV_EVENT_EOF) { - DIE("Unexpected EOF from server."); - } -} - +/** + * Handle incoming data (responses). + */ void Connection::read_callback() { struct evbuffer *input = bufferevent_get_input(bev); -#if USE_CACHED_TIME - struct timeval now_tv; - event_base_gettimeofday_cached(base, &now_tv); -#endif - char *buf = NULL; Operation *op = NULL; - int length; - size_t n_read_out; - - double now; - - // Protocol processing loop. + bool done, full_read; if (op_queue.size() == 0) V("Spurious read callback."); @@ -365,161 +379,25 @@ void Connection::read_callback() { case INIT_READ: DIE("event from uninitialized connection"); case IDLE: return; // We munched all the data we expected? - // Note: for binary, the whole get suite (GET, GET_DATA, END) is collapsed - // into one state case WAITING_FOR_GET: assert(op_queue.size() > 0); - - if (options.binary) { - if (consume_binary_response(input)) { -#if USE_CACHED_TIME - now = tv_to_double(&now_tv); -#else - now = get_time(); -#endif -#if HAVE_CLOCK_GETTIME - op->end_time = get_time_accurate(); -#else - op->end_time = now; -#endif - stats.log_get(*op); - - last_rx = now; - pop_op(); - drive_write_machine(now); - break; - } else { - return; - } - } - - buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF); - if (buf == NULL) return; // A whole line not received yet. Punt. - - stats.rx_bytes += n_read_out; // strlen(buf); - - if (!strcmp(buf, "END")) { - // D("GET (%s) miss.", op->key.c_str()); - stats.get_misses++; - -#if USE_CACHED_TIME - now = tv_to_double(&now_tv); -#else - now = get_time(); -#endif -#if HAVE_CLOCK_GETTIME - op->end_time = get_time_accurate(); -#else - op->end_time = now; -#endif - - stats.log_get(*op); - - free(buf); - - last_rx = now; - pop_op(); - drive_write_machine(); - break; - } else if (!strncmp(buf, "VALUE", 5)) { - sscanf(buf, "VALUE %*s %*d %d", &length); - - // FIXME: check key name to see if it corresponds to the op at - // the head of the op queue? This will be necessary to - // support "gets" where there may be misses. - - data_length = length; - read_state = WAITING_FOR_GET_DATA; - } - - free(buf); - - case WAITING_FOR_GET_DATA: - assert(op_queue.size() > 0); - - length = evbuffer_get_length(input); - - if (length >= data_length + 2) { - // FIXME: Actually parse the value? Right now we just drain it. - evbuffer_drain(input, data_length + 2); - read_state = WAITING_FOR_END; - - stats.rx_bytes += data_length + 2; - } else { + full_read = prot->handle_response(input, done); + if (!full_read) { return; + } else if (done) { + finish_op(op); // sets read_state = IDLE } - case WAITING_FOR_END: - assert(op_queue.size() > 0); - - buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF); - if (buf == NULL) return; // Haven't received a whole line yet. Punt. - - stats.rx_bytes += n_read_out; - - if (!strcmp(buf, "END")) { -#if USE_CACHED_TIME - now = tv_to_double(&now_tv); -#else - now = get_time(); -#endif -#if HAVE_CLOCK_GETTIME - op->end_time = get_time_accurate(); -#else - op->end_time = now; -#endif - - stats.log_get(*op); - - free(buf); - - last_rx = now; - pop_op(); - drive_write_machine(now); - break; - } else { - DIE("Unexpected result when waiting for END"); - } + break; case WAITING_FOR_SET: assert(op_queue.size() > 0); - - if (options.binary) { - if (!consume_binary_response(input)) return; - } else { - buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF); - if (buf == NULL) return; // Haven't received a whole line yet. Punt. - stats.rx_bytes += n_read_out; - } - - now = get_time(); - -#if HAVE_CLOCK_GETTIME - op->end_time = get_time_accurate(); -#else - op->end_time = now; -#endif - - stats.log_set(*op); - - if (!options.binary) - free(buf); - - last_rx = now; - pop_op(); - drive_write_machine(now); + if (!prot->handle_response(input, done)) return; + finish_op(op); break; case LOADING: assert(op_queue.size() > 0); - - if (options.binary) { - if (!consume_binary_response(input)) return; - } else { - buf = evbuffer_readln(input, NULL, EVBUFFER_EOL_CRLF); - if (buf == NULL) return; // Haven't received a whole line yet. - free(buf); - } - + if (!prot->handle_response(input, done)) return; loader_completed++; pop_op(); @@ -534,8 +412,6 @@ void Connection::read_callback() { string keystr = keygen->generate(loader_issued); strcpy(key, keystr.c_str()); int index = lrand48() % (1024 * 1024); - // generate_key(loader_issued, options.keysize, key); - // issue_set(key, &random_char[index], options.valuesize); issue_set(key, &random_char[index], valuesize->generate()); loader_issued++; @@ -544,9 +420,9 @@ void Connection::read_callback() { break; - case WAITING_FOR_SASL: + case CONN_SETUP: assert(options.binary); - if (!consume_binary_response(input)) return; + if (!prot->setup_connection_r(input)) return; read_state = IDLE; break; @@ -556,48 +432,17 @@ void Connection::read_callback() { } /** - * Tries to consume a binary response (in its entirety) from an evbuffer. - * - * @param input evBuffer to read response from - * @return true if consumed, false if not enough data in buffer. + * Callback called when write requests finish. */ -bool Connection::consume_binary_response(evbuffer *input) { - // Read the first 24 bytes as a header - int length = evbuffer_get_length(input); - if (length < 24) return false; - binary_header_t* h = - reinterpret_cast(evbuffer_pullup(input, 24)); - assert(h); - - // Not whole response - int targetLen = 24 + ntohl(h->body_len); - if (length < targetLen) { - return false; - } - - // if something other than success, count it as a miss - if (h->opcode == CMD_GET && h->status) { - stats.get_misses++; - } - - #define unlikely(x) __builtin_expect((x),0) - if (unlikely(h->opcode == CMD_SASL)) { - if (h->status == RESP_OK) { - V("SASL authentication succeeded"); - } else { - DIE("SASL authentication failed"); - } - } - - evbuffer_drain(input, targetLen); - stats.rx_bytes += targetLen; - return true; -} - void Connection::write_callback() {} + +/** + * Callback for timer timeouts. + */ void Connection::timer_callback() { drive_write_machine(); } -// The follow are C trampolines for libevent callbacks. + +/* The follow are C trampolines for libevent callbacks. */ void bev_event_cb(struct bufferevent *bev, short events, void *ptr) { Connection* conn = (Connection*) ptr; conn->event_callback(events); @@ -618,25 +463,3 @@ void timer_cb(evutil_socket_t fd, short what, void *ptr) { conn->timer_callback(); } -void Connection::set_priority(int pri) { - if (bufferevent_priority_set(bev, pri)) - DIE("bufferevent_set_priority(bev, %d) failed", pri); -} - -void Connection::start_loading() { - read_state = LOADING; - loader_issued = loader_completed = 0; - - for (int i = 0; i < LOADER_CHUNK; i++) { - if (loader_issued >= options.records) break; - - char key[256]; - int index = lrand48() % (1024 * 1024); - string keystr = keygen->generate(loader_issued); - strcpy(key, keystr.c_str()); - // generate_key(loader_issued, options.keysize, key); - // issue_set(key, &random_char[index], options.valuesize); - issue_set(key, &random_char[index], valuesize->generate()); - loader_issued++; - } -} diff --git a/Connection.h b/Connection.h index 00e238b..fea451e 100644 --- a/Connection.h +++ b/Connection.h @@ -1,4 +1,6 @@ // -*- c++-mode -*- +#ifndef CONNECTION_H +#define CONNECTION_H #include #include @@ -16,6 +18,8 @@ #include "Operation.h" #include "util.h" +#include "Protocol.h" + using namespace std; void bev_event_cb(struct bufferevent *bev, short events, void *ptr); @@ -23,6 +27,8 @@ void bev_read_cb(struct bufferevent *bev, void *ptr); void bev_write_cb(struct bufferevent *bev, void *ptr); void timer_cb(evutil_socket_t fd, short what, void *ptr); +class Protocol; + class Connection { public: Connection(struct event_base* _base, struct evdns_base* _evdns, @@ -30,19 +36,44 @@ public: bool sampling = true); ~Connection(); + double start_time; // Time when this connection began operations. + ConnectionStats stats; + options_t options; + + bool is_ready() { return read_state == IDLE; } + void set_priority(int pri); + + // state commands + void start() { drive_write_machine(); } + void start_loading(); + void reset(); + bool check_exit_condition(double now = 0.0); + + // event callbacks + void event_callback(short events); + void read_callback(); + void write_callback(); + void timer_callback(); + +private: string hostname; string port; - double start_time; // Time when this connection began operations. + struct event_base *base; + struct evdns_base *evdns; + struct bufferevent *bev; + + struct event *timer; // Used to control inter-transmission time. + double next_time; // Inter-transmission time parameters. + double last_rx; // Used to moderate transmission rate. + double last_tx; enum read_state_enum { INIT_READ, + CONN_SETUP, LOADING, IDLE, - WAITING_FOR_SASL, WAITING_FOR_GET, - WAITING_FOR_GET_DATA, - WAITING_FOR_END, WAITING_FOR_SET, MAX_READ_STATE, }; @@ -58,51 +89,36 @@ public: read_state_enum read_state; write_state_enum write_state; - ConnectionStats stats; - - void issue_get(const char* key, double now = 0.0); - void issue_set(const char* key, const char* value, int length, - double now = 0.0); - void issue_something(double now = 0.0); - void pop_op(); - bool check_exit_condition(double now = 0.0); - void drive_write_machine(double now = 0.0); - - void start_loading(); - - void reset(); - void issue_sasl(); - - void event_callback(short events); - void read_callback(); - void write_callback(); - void timer_callback(); - bool consume_binary_response(evbuffer *input); - - void set_priority(int pri); - - options_t options; - - std::queue op_queue; - -private: - struct event_base *base; - struct evdns_base *evdns; - struct bufferevent *bev; - - struct event *timer; // Used to control inter-transmission time. - // double lambda; - double next_time; // Inter-transmission time parameters. - double last_rx; // Used to moderate transmission rate. - double last_tx; - - int data_length; // When waiting for data, how much we're peeking for. - // Parameters to track progress of the data loader. int loader_issued, loader_completed; + Protocol *prot; Generator *valuesize; Generator *keysize; KeyGenerator *keygen; Generator *iagen; + std::queue op_queue; + + // state machine functions / event processing + void pop_op(); + void finish_op(Operation *op); + void issue_something(double now = 0.0); + void drive_write_machine(double now = 0.0); + + // request functions + void issue_sasl(); + void issue_get(const char* key, double now = 0.0); + void issue_set(const char* key, const char* value, int length, + double now = 0.0); + + // protocol fucntions + int set_request_ascii(const char* key, const char* value, int length); + int set_request_binary(const char* key, const char* value, int length); + int get_request_ascii(const char* key); + int get_request_binary(const char* key); + + bool consume_binary_response(evbuffer *input); + bool consume_ascii_line(evbuffer *input, bool &done); }; + +#endif diff --git a/Protocol.cc b/Protocol.cc new file mode 100644 index 0000000..16005ad --- /dev/null +++ b/Protocol.cc @@ -0,0 +1,202 @@ +#include + +#include +#include +#include +#include +#include +#include + +#include "config.h" + +#include "Protocol.h" +#include "Connection.h" +#include "distributions.h" +#include "Generator.h" +#include "mutilate.h" +#include "binary_protocol.h" +#include "util.h" + +#define unlikely(x) __builtin_expect((x),0) + +/** + * Send an ascii get request. + */ +int ProtocolAscii::get_request(const char* key) { + int l; + l = evbuffer_add_printf( + bufferevent_get_output(bev), "get %s\r\n", key); + if (read_state == IDLE) read_state = WAITING_FOR_GET; + return l; +} + +/** + * Send an ascii set request. + */ +int ProtocolAscii::set_request(const char* key, const char* value, int len) { + int l; + l = evbuffer_add_printf(bufferevent_get_output(bev), + "set %s 0 0 %d\r\n", key, len); + bufferevent_write(bev, value, len); + bufferevent_write(bev, "\r\n", 2); + l += len + 2; + if (read_state == IDLE) read_state = WAITING_FOR_END; + return l; +} + +/** + * Handle an ascii response. + */ +bool ProtocolAscii::handle_response(evbuffer *input, bool &done) { + char *buf = NULL; + int len; + size_t n_read_out; + + switch (read_state) { + + case WAITING_FOR_GET: + case WAITING_FOR_END: + buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF); + if (buf == NULL) return false; + + conn->stats.rx_bytes += n_read_out; + + if (!strncmp(buf, "END", 3)) { + if (read_state == WAITING_FOR_GET) conn->stats.get_misses++; + read_state = WAITING_FOR_GET; + done = true; + } else if (!strncmp(buf, "VALUE", 5)) { + sscanf(buf, "VALUE %*s %*d %d", &len); + + // FIXME: check key name to see if it corresponds to the op at + // the head of the op queue? This will be necessary to + // support "gets" where there may be misses. + + data_length = len; + read_state = WAITING_FOR_GET_DATA; + done = false; + } else { + // must be a value line.. + done = false; + } + free(buf); + return true; + + case WAITING_FOR_GET_DATA: + len = evbuffer_get_length(input); + if (len >= data_length + 2) { + evbuffer_drain(input, data_length + 2); + read_state = WAITING_FOR_END; + conn->stats.rx_bytes += data_length + 2; + done = false; + return true; + } + return false; + + default: printf("state: %d\n", read_state); DIE("Unimplemented!"); + } + + DIE("Shouldn't ever reach here..."); +} + +/** + * Perform SASL authentication if requested (write). + */ +bool ProtocolBinary::setup_connection_w() { + if (!opts.sasl) return true; + + string user = string(opts.username); + string pass = string(opts.password); + + binary_header_t header = {0x80, CMD_SASL, 0, 0, 0, {0}, 0, 0, 0}; + header.key_len = htons(5); + header.body_len = htonl(6 + user.length() + 1 + pass.length()); + + bufferevent_write(bev, &header, 24); + bufferevent_write(bev, "PLAIN\0", 6); + bufferevent_write(bev, user.c_str(), user.length() + 1); + bufferevent_write(bev, pass.c_str(), pass.length()); + + return false; +} + +/** + * Perform SASL authentication if requested (read). + */ +bool ProtocolBinary::setup_connection_r(evbuffer* input) { + if (!opts.sasl) return true; + + bool b; + return handle_response(input, b); +} + +/** + * Send a binary get request. + */ +int ProtocolBinary::get_request(const char* key) { + uint16_t keylen = strlen(key); + + // each line is 4-bytes + binary_header_t h = { 0x80, CMD_GET, htons(keylen), + 0x00, 0x00, {htons(0)}, + htonl(keylen) }; + + bufferevent_write(bev, &h, 24); // size does not include extras + bufferevent_write(bev, key, keylen); + return 24 + keylen; +} + +/** + * Send a binary set request. + */ +int ProtocolBinary::set_request(const char* key, const char* value, int len) { + uint16_t keylen = strlen(key); + + // each line is 4-bytes + binary_header_t h = { 0x80, CMD_SET, htons(keylen), + 0x08, 0x00, {htons(0)}, + htonl(keylen + 8 + len) }; + + bufferevent_write(bev, &h, 32); // With extras + bufferevent_write(bev, key, keylen); + bufferevent_write(bev, value, len); + return 24 + h.body_len; +} + +/** + * Tries to consume a binary response (in its entirety) from an evbuffer. + * + * @param input evBuffer to read response from + * @return true if consumed, false if not enough data in buffer. + */ +bool ProtocolBinary::handle_response(evbuffer *input, bool &done) { + // Read the first 24 bytes as a header + int length = evbuffer_get_length(input); + if (length < 24) return false; + binary_header_t* h = + reinterpret_cast(evbuffer_pullup(input, 24)); + assert(h); + + // Not whole response + int targetLen = 24 + ntohl(h->body_len); + if (length < targetLen) return false; + + // If something other than success, count it as a miss + if (h->opcode == CMD_GET && h->status) { + conn->stats.get_misses++; + } + + if (unlikely(h->opcode == CMD_SASL)) { + if (h->status == RESP_OK) { + V("SASL authentication succeeded"); + } else { + DIE("SASL authentication failed"); + } + } + + evbuffer_drain(input, targetLen); + conn->stats.rx_bytes += targetLen; + done = true; + return true; +} + diff --git a/Protocol.h b/Protocol.h new file mode 100644 index 0000000..da7b253 --- /dev/null +++ b/Protocol.h @@ -0,0 +1,71 @@ +// -*- c++-mode -*- +#ifndef PROTOCOL_H +#define PROTOCOL_H + +#include + +#include "ConnectionOptions.h" + +using namespace std; + +class Connection; + +class Protocol { +public: + Protocol(options_t _opts, Connection* _conn, bufferevent* _bev): + opts(_opts), conn(_conn), bev(_bev) {}; + ~Protocol() {}; + + virtual bool setup_connection_w() = 0; + virtual bool setup_connection_r(evbuffer* input) = 0; + virtual int get_request(const char* key) = 0; + virtual int set_request(const char* key, const char* value, int len) = 0; + virtual bool handle_response(evbuffer* input, bool &done) = 0; + +protected: + options_t opts; + Connection* conn; + bufferevent* bev; +}; + +class ProtocolAscii : public Protocol { +public: + ProtocolAscii(options_t opts, Connection* conn, bufferevent* bev): + Protocol(opts, conn, bev) { + read_state = IDLE; + }; + + ~ProtocolAscii() {}; + + virtual bool setup_connection_w() { return true; } + virtual bool setup_connection_r(evbuffer* input) { return true; } + virtual int get_request(const char* key); + virtual int set_request(const char* key, const char* value, int len); + virtual bool handle_response(evbuffer* input, bool &done); + +private: + enum read_fsm { + IDLE, + WAITING_FOR_GET, + WAITING_FOR_GET_DATA, + WAITING_FOR_END, + }; + + read_fsm read_state; + int data_length; +}; + +class ProtocolBinary : public Protocol { +public: + ProtocolBinary(options_t opts, Connection* conn, bufferevent* bev): + Protocol(opts, conn, bev) {}; + ~ProtocolBinary() {}; + + virtual bool setup_connection_w(); + virtual bool setup_connection_r(evbuffer* input); + virtual int get_request(const char* key); + virtual int set_request(const char* key, const char* value, int len); + virtual bool handle_response(evbuffer* input, bool &done); +}; + +#endif diff --git a/SConstruct b/SConstruct index b782d90..3a1ab9a 100644 --- a/SConstruct +++ b/SConstruct @@ -8,7 +8,7 @@ env['HAVE_POSIX_BARRIER'] = True env.Append(CPPPATH = ['/usr/local/include', '/opt/local/include']) env.Append(LIBPATH = ['/opt/local/lib']) -env.Append(CCFLAGS = '-std=c++0x -D_GNU_SOURCE') # -D__STDC_FORMAT_MACROS') +env.Append(CCFLAGS = '-std=c++0x -D_GNU_SOURCE') if sys.platform == 'darwin': env['CC'] = 'clang' env['CXX'] = 'clang++' @@ -32,7 +32,6 @@ if not conf.CheckLibWithHeader("pthread", "pthread.h", "C++"): Exit(1) conf.CheckLib("rt", "clock_gettime", language="C++") conf.CheckLibWithHeader("zmq", "zmq.hpp", "C++") -# conf.CheckFunc('clock_gettime') if not conf.CheckFunc('pthread_barrier_init'): conf.env['HAVE_POSIX_BARRIER'] = False @@ -40,13 +39,11 @@ env = conf.Finish() env.Append(CFLAGS = ' -O3 -Wall -g') env.Append(CPPFLAGS = ' -O3 -Wall -g') -#env.Append(CPPFLAGS = ' -D_GNU_SOURCE -D__STDC_FORMAT_MACROS') -#env.Append(CPPFLAGS = ' -DUSE_ADAPTIVE_SAMPLER') env.Command(['cmdline.cc', 'cmdline.h'], 'cmdline.ggo', 'gengetopt < $SOURCE') src = Split("""mutilate.cc cmdline.cc log.cc distributions.cc util.cc - Connection.cc Generator.cc""") + Connection.cc Protocol.cc Generator.cc""") if not env['HAVE_POSIX_BARRIER']: # USE_POSIX_BARRIER: src += ['barrier.cc'] diff --git a/mutilate.cc b/mutilate.cc index 227db71..d18473e 100644 --- a/mutilate.cc +++ b/mutilate.cc @@ -830,8 +830,7 @@ void do_mutilate(const vector& servers, options_t& options, bool restart = false; for (Connection *conn: connections) - if (conn->read_state != Connection::IDLE) - restart = true; + if (!conn->is_ready()) restart = true; if (restart) continue; else break; @@ -851,8 +850,7 @@ void do_mutilate(const vector& servers, options_t& options, bool restart = false; for (Connection *conn: connections) - if (conn->read_state != Connection::IDLE) - restart = true; + if (!conn->is_ready()) restart = true; if (restart) continue; else break; @@ -895,7 +893,7 @@ void do_mutilate(const vector& servers, options_t& options, for (Connection *conn: connections) { conn->start_time = start; conn->options.time = options.warmup; - conn->drive_write_machine(); // Kick the Connection into motion. + conn->start(); // Kick the Connection into motion. } while (1) { @@ -920,8 +918,7 @@ void do_mutilate(const vector& servers, options_t& options, bool restart = false; for (Connection *conn: connections) - if (conn->read_state != Connection::IDLE) - restart = true; + if (!conn->is_ready()) restart = true; if (restart) { @@ -935,18 +932,15 @@ void do_mutilate(const vector& servers, options_t& options, bool restart = false; for (Connection *conn: connections) - if (conn->read_state != Connection::IDLE) - restart = true; + if (!conn->is_ready()) restart = true; if (restart) continue; else break; } } - // options.time = old_time; for (Connection *conn: connections) { conn->reset(); - // conn->stats = ConnectionStats(); conn->options.time = old_time; } @@ -983,7 +977,7 @@ void do_mutilate(const vector& servers, options_t& options, start = get_time(); for (Connection *conn: connections) { conn->start_time = start; - conn->drive_write_machine(); // Kick the Connection into motion. + conn->start(); // Kick the Connection into motion. } // V("Start = %f", start); @@ -1096,18 +1090,7 @@ void args_to_options(options_t* options) { void init_random_stuff() { static char lorem[] = - R"(Lorem ipsum dolor sit amet, consectetur adipiscing elit. Maecenas -turpis dui, suscipit non vehicula non, malesuada id sem. Phasellus -suscipit nisl ut dui consectetur ultrices tincidunt eros -aliquet. Donec feugiat lectus sed nibh ultrices ultrices. Vestibulum -ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia -Curae; Mauris suscipit eros sed justo lobortis at ultrices lacus -molestie. Duis in diam mi. Cum sociis natoque penatibus et magnis dis -parturient montes, nascetur ridiculus mus. Ut cursus viverra -sagittis. Vivamus non facilisis tortor. Integer lectus arcu, sagittis -et eleifend rutrum, condimentum eget sem. Vestibulum tempus tellus non -risus semper semper. Morbi molestie rhoncus mi, in egestas dui -facilisis et.)"; + R"(Lorem ipsum dolor sit amet, consectetur adipiscing elit. Maecenas turpis dui, suscipit non vehicula non, malesuada id sem. Phasellus suscipit nisl ut dui consectetur ultrices tincidunt eros aliquet. Donec feugiat lectus sed nibh ultrices ultrices. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia Curae; Mauris suscipit eros sed justo lobortis at ultrices lacus molestie. Duis in diam mi. Cum sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Ut cursus viverra sagittis. Vivamus non facilisis tortor. Integer lectus arcu, sagittis et eleifend rutrum, condimentum eget sem. Vestibulum tempus tellus non risus semper semper. Morbi molestie rhoncus mi, in egestas dui facilisis et.)"; size_t cursor = 0; From 1e78fc384780cb0476387a8613e69330435a3c2a Mon Sep 17 00:00:00 2001 From: David Terei Date: Tue, 16 Sep 2014 19:02:41 -0700 Subject: [PATCH 2/6] add travis support --- .travis.yml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .travis.yml diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..817d4f2 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,5 @@ +language: cpp +script: scons +before_install: + - sudo apt-get update -qq + - sudo apt-get install gengetopt From e1d371784ca93715a4cd4adf05e23cf854d0cccf Mon Sep 17 00:00:00 2001 From: David Terei Date: Thu, 11 Dec 2014 21:04:17 -0800 Subject: [PATCH 3/6] improve travis script --- .travis.yml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 817d4f2..615f52f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,6 @@ language: cpp script: scons before_install: - - sudo apt-get update -qq - - sudo apt-get install gengetopt + - travis_retry sudo apt-get update -qq +install: + - travis_retry sudo apt-get install gengetopt libzmq-dev libevent-dev scons From a742f44cb79113bab2733d9729b15aba0fa4ac0c Mon Sep 17 00:00:00 2001 From: David Terei Date: Thu, 11 Dec 2014 21:04:26 -0800 Subject: [PATCH 4/6] fix non linux support --- mutilate.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/mutilate.cc b/mutilate.cc index d18473e..426fd05 100644 --- a/mutilate.cc +++ b/mutilate.cc @@ -660,7 +660,9 @@ void go(const vector& servers, options_t& options, vector ts[options.threads]; #endif +#ifdef __linux__ int current_cpu = -1; +#endif for (int t = 0; t < options.threads; t++) { td[t].options = &options; @@ -683,6 +685,7 @@ void go(const vector& servers, options_t& options, pthread_attr_t attr; pthread_attr_init(&attr); +#ifdef __linux__ if (args.affinity_given) { int max_cpus = 8 * sizeof(cpu_set_t); cpu_set_t m; @@ -704,6 +707,7 @@ void go(const vector& servers, options_t& options, } } } +#endif if (pthread_create(&pt[t], &attr, thread_main, &td[t])) DIE("pthread_create() failed"); From 357f29acf5090796ca72f9508626fc118a4fb865 Mon Sep 17 00:00:00 2001 From: David Terei Date: Thu, 11 Dec 2014 21:04:37 -0800 Subject: [PATCH 5/6] specify c++11 in scons --- SConstruct | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/SConstruct b/SConstruct index 3a1ab9a..57d0054 100644 --- a/SConstruct +++ b/SConstruct @@ -8,7 +8,7 @@ env['HAVE_POSIX_BARRIER'] = True env.Append(CPPPATH = ['/usr/local/include', '/opt/local/include']) env.Append(LIBPATH = ['/opt/local/lib']) -env.Append(CCFLAGS = '-std=c++0x -D_GNU_SOURCE') +env.Append(CCFLAGS = '-std=c++11 -D_GNU_SOURCE') if sys.platform == 'darwin': env['CC'] = 'clang' env['CXX'] = 'clang++' From 224cca5bc741f9857d34dd6ea8d22337b84e4318 Mon Sep 17 00:00:00 2001 From: David Terei Date: Thu, 11 Dec 2014 21:04:47 -0800 Subject: [PATCH 6/6] update readme with travis badge --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 9ae67b0..e599886 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -Mutilate +Mutilate [![Build Status](https://img.shields.io/travis/dterei/mutilate.svg?style=flat)](https://travis-ci.org/dterei/mutilate) ======== Mutilate is a memcached load generator designed for high request