Seperate out protocol handling from connection.

This commit is contained in:
David Terei 2014-09-12 13:47:01 -07:00
parent cfd4688510
commit ecfbd75065
6 changed files with 508 additions and 416 deletions

View File

@ -16,11 +16,14 @@
#include "binary_protocol.h" #include "binary_protocol.h"
#include "util.h" #include "util.h"
/**
* Create a new connection to a server endpoint.
*/
Connection::Connection(struct event_base* _base, struct evdns_base* _evdns, Connection::Connection(struct event_base* _base, struct evdns_base* _evdns,
string _hostname, string _port, options_t _options, string _hostname, string _port, options_t _options,
bool sampling) : bool sampling) :
hostname(_hostname), port(_port), start_time(0), start_time(0), stats(sampling), options(_options),
stats(sampling), options(_options), base(_base), evdns(_evdns) hostname(_hostname), port(_port), base(_base), evdns(_evdns)
{ {
valuesize = createGenerator(options.valuesize); valuesize = createGenerator(options.valuesize);
keysize = createGenerator(options.keysize); keysize = createGenerator(options.keysize);
@ -43,20 +46,28 @@ Connection::Connection(struct event_base* _base, struct evdns_base* _evdns,
bufferevent_setcb(bev, bev_read_cb, bev_write_cb, bev_event_cb, this); bufferevent_setcb(bev, bev_read_cb, bev_write_cb, bev_event_cb, this);
bufferevent_enable(bev, EV_READ | EV_WRITE); bufferevent_enable(bev, EV_READ | EV_WRITE);
if (options.binary) {
prot = new ProtocolBinary(options, this, bev);
} else {
prot = new ProtocolAscii(options, this, bev);
}
if (bufferevent_socket_connect_hostname(bev, evdns, AF_UNSPEC, if (bufferevent_socket_connect_hostname(bev, evdns, AF_UNSPEC,
hostname.c_str(), hostname.c_str(),
atoi(port.c_str()))) atoi(port.c_str()))) {
DIE("bufferevent_socket_connect_hostname()"); DIE("bufferevent_socket_connect_hostname()");
}
timer = evtimer_new(base, timer_cb, this); timer = evtimer_new(base, timer_cb, this);
} }
/**
* Destroy a connection, performing cleanup.
*/
Connection::~Connection() { Connection::~Connection() {
event_free(timer); event_free(timer);
timer = NULL; timer = NULL;
// FIXME: W("Drain op_q?"); // FIXME: W("Drain op_q?");
bufferevent_free(bev); bufferevent_free(bev);
delete iagen; delete iagen;
@ -65,6 +76,9 @@ Connection::~Connection() {
delete valuesize; delete valuesize;
} }
/**
* Reset the connection back to an initial, fresh state.
*/
void Connection::reset() { void Connection::reset() {
// FIXME: Actually check the connection, drain all bufferevents, drain op_q. // FIXME: Actually check the connection, drain all bufferevents, drain op_q.
assert(op_queue.size() == 0); assert(op_queue.size() == 0);
@ -74,27 +88,56 @@ void Connection::reset() {
stats = ConnectionStats(stats.sampling); stats = ConnectionStats(stats.sampling);
} }
/**
void Connection::issue_sasl() { * Set our event processing priority.
read_state = WAITING_FOR_SASL; */
void Connection::set_priority(int pri) {
string username = string(options.username); if (bufferevent_priority_set(bev, pri)) {
string password = string(options.password); DIE("bufferevent_set_priority(bev, %d) failed", pri);
}
binary_header_t header = {0x80, CMD_SASL, 0, 0, 0, {0}, 0, 0, 0};
header.key_len = htons(5);
header.body_len = htonl(6 + username.length() + 1 + password.length());
bufferevent_write(bev, &header, 24);
bufferevent_write(bev, "PLAIN\0", 6);
bufferevent_write(bev, username.c_str(), username.length() + 1);
bufferevent_write(bev, password.c_str(), password.length());
} }
/**
* Load any required test data onto the server.
*/
void Connection::start_loading() {
read_state = LOADING;
loader_issued = loader_completed = 0;
for (int i = 0; i < LOADER_CHUNK; i++) {
if (loader_issued >= options.records) break;
char key[256];
int index = lrand48() % (1024 * 1024);
string keystr = keygen->generate(loader_issued);
strcpy(key, keystr.c_str());
issue_set(key, &random_char[index], valuesize->generate());
loader_issued++;
}
}
/**
* Issue either a get or set request to the server according to our probability distribution.
*/
void Connection::issue_something(double now) {
char key[256];
// FIXME: generate key distribution here!
string keystr = keygen->generate(lrand48() % options.records);
strcpy(key, keystr.c_str());
if (drand48() < options.update) {
int index = lrand48() % (1024 * 1024);
issue_set(key, &random_char[index], valuesize->generate(), now);
} else {
issue_get(key, now);
}
}
/**
* Issue a get request to the server.
*/
void Connection::issue_get(const char* key, double now) { void Connection::issue_get(const char* key, double now) {
Operation op; Operation op;
int l; int l;
uint16_t keylen = strlen(key);
#if HAVE_CLOCK_GETTIME #if HAVE_CLOCK_GETTIME
op.start_time = get_time_accurate(); op.start_time = get_time_accurate();
@ -103,7 +146,6 @@ void Connection::issue_get(const char* key, double now) {
#if USE_CACHED_TIME #if USE_CACHED_TIME
struct timeval now_tv; struct timeval now_tv;
event_base_gettimeofday_cached(base, &now_tv); event_base_gettimeofday_cached(base, &now_tv);
op.start_time = tv_to_double(&now_tv); op.start_time = tv_to_double(&now_tv);
#else #else
op.start_time = get_time(); op.start_time = get_time();
@ -113,35 +155,22 @@ void Connection::issue_get(const char* key, double now) {
} }
#endif #endif
op.type = Operation::GET;
op.key = string(key); op.key = string(key);
op.type = Operation::GET;
op_queue.push(op); op_queue.push(op);
if (read_state == IDLE) if (read_state == IDLE) read_state = WAITING_FOR_GET;
read_state = WAITING_FOR_GET; l = prot->get_request(key);
if (options.binary) {
// each line is 4-bytes
binary_header_t h = {0x80, CMD_GET, htons(keylen),
0x00, 0x00, {htons(0)}, //TODO(syang0) get actual vbucket?
htonl(keylen) };
bufferevent_write(bev, &h, 24); // size does not include extras
bufferevent_write(bev, key, keylen);
l = 24 + keylen;
} else {
l = evbuffer_add_printf(bufferevent_get_output(bev), "get %s\r\n", key);
}
if (read_state != LOADING) stats.tx_bytes += l; if (read_state != LOADING) stats.tx_bytes += l;
} }
/**
* Issue a set request to the server.
*/
void Connection::issue_set(const char* key, const char* value, int length, void Connection::issue_set(const char* key, const char* value, int length,
double now) { double now) {
Operation op; Operation op;
int l; int l;
uint16_t keylen = strlen(key);
#if HAVE_CLOCK_GETTIME #if HAVE_CLOCK_GETTIME
op.start_time = get_time_accurate(); op.start_time = get_time_accurate();
@ -153,47 +182,14 @@ void Connection::issue_set(const char* key, const char* value, int length,
op.type = Operation::SET; op.type = Operation::SET;
op_queue.push(op); op_queue.push(op);
if (read_state == IDLE) if (read_state == IDLE) read_state = WAITING_FOR_SET;
read_state = WAITING_FOR_SET; l = prot->set_request(key, value, length);
if (options.binary) {
// each line is 4-bytes
binary_header_t h = { 0x80, CMD_SET, htons(keylen),
0x08, 0x00, {htons(0)}, //TODO(syang0) get actual vbucket?
htonl(keylen + 8 + length)};
bufferevent_write(bev, &h, 32); // With extras
bufferevent_write(bev, key, keylen);
bufferevent_write(bev, value, length);
l = 24 + h.body_len;
} else {
l = evbuffer_add_printf(bufferevent_get_output(bev),
"set %s 0 0 %d\r\n", key, length);
bufferevent_write(bev, value, length);
bufferevent_write(bev, "\r\n", 2);
l += length + 2;
}
if (read_state != LOADING) stats.tx_bytes += l; if (read_state != LOADING) stats.tx_bytes += l;
} }
void Connection::issue_something(double now) { /**
char key[256]; * Return the oldest live operation in progress.
// FIXME: generate key distribution here! */
string keystr = keygen->generate(lrand48() % options.records);
strcpy(key, keystr.c_str());
// int key_index = lrand48() % options.records;
// generate_key(key_index, options.keysize, key);
if (drand48() < options.update) {
int index = lrand48() % (1024 * 1024);
// issue_set(key, &random_char[index], options.valuesize, now);
issue_set(key, &random_char[index], valuesize->generate(), now);
} else {
issue_get(key, now);
}
}
void Connection::pop_op() { void Connection::pop_op() {
assert(op_queue.size() > 0); assert(op_queue.size() > 0);
@ -213,6 +209,39 @@ void Connection::pop_op() {
} }
} }
/**
* Finish up (record stats) an operation that just returned from the
* server.
*/
void Connection::finish_op(Operation *op) {
double now;
#if USE_CACHED_TIME
struct timeval now_tv;
event_base_gettimeofday_cached(base, &now_tv);
now = tv_to_double(&now_tv);
#else
now = get_time();
#endif
#if HAVE_CLOCK_GETTIME
op->end_time = get_time_accurate();
#else
op->end_time = now;
#endif
switch (op->type) {
case Operation::GET: stats.log_get(*op); break;
case Operation::SET: stats.log_set(*op); break;
default: DIE("Not implemented.");
}
last_rx = now;
pop_op();
drive_write_machine();
}
/**
* Check if our testing is done and we should exit.
*/
bool Connection::check_exit_condition(double now) { bool Connection::check_exit_condition(double now) {
if (read_state == INIT_READ) return false; if (read_state == INIT_READ) return false;
if (now == 0.0) now = get_time(); if (now == 0.0) now = get_time();
@ -221,10 +250,43 @@ bool Connection::check_exit_condition(double now) {
return false; return false;
} }
// drive_write_machine() determines whether or not to issue a new /**
// command. Note that this function loops. Be wary of break * Handle new connection and error events.
// vs. return. */
void Connection::event_callback(short events) {
if (events & BEV_EVENT_CONNECTED) {
D("Connected to %s:%s.", hostname.c_str(), port.c_str());
int fd = bufferevent_getfd(bev);
if (fd < 0) DIE("bufferevent_getfd");
if (!options.no_nodelay) {
int one = 1;
if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,
(void *) &one, sizeof(one)) < 0)
DIE("setsockopt()");
}
read_state = CONN_SETUP;
if (prot->setup_connection_w()) {
read_state = IDLE;
}
} else if (events & BEV_EVENT_ERROR) {
int err = bufferevent_socket_get_dns_error(bev);
if (err) DIE("DNS error: %s", evutil_gai_strerror(err));
DIE("BEV_EVENT_ERROR: %s", strerror(errno));
} else if (events & BEV_EVENT_EOF) {
DIE("Unexpected EOF from server.");
}
}
/**
* Request generation loop. Determines whether or not to issue a new command,
* based on timer events.
*
* Note that this function loops. Be wary of break vs. return.
*/
void Connection::drive_write_machine(double now) { void Connection::drive_write_machine(double now) {
if (now == 0.0) now = get_time(); if (now == 0.0) now = get_time();
@ -237,11 +299,9 @@ void Connection::drive_write_machine(double now) {
switch (write_state) { switch (write_state) {
case INIT_WRITE: case INIT_WRITE:
delay = iagen->generate(); delay = iagen->generate();
next_time = now + delay; next_time = now + delay;
double_to_tv(delay, &tv); double_to_tv(delay, &tv);
evtimer_add(timer, &tv); evtimer_add(timer, &tv);
write_state = WAITING_FOR_TIME; write_state = WAITING_FOR_TIME;
break; break;
@ -253,16 +313,10 @@ void Connection::drive_write_machine(double now) {
write_state = WAITING_FOR_TIME; write_state = WAITING_FOR_TIME;
break; // We want to run through the state machine one more time break; // We want to run through the state machine one more time
// to make sure the timer is armed. // to make sure the timer is armed.
// } else if (options.moderate && options.lambda > 0.0 &&
// now < last_rx + 0.25 / options.lambda) {
} else if (options.moderate && now < last_rx + 0.00025) { } else if (options.moderate && now < last_rx + 0.00025) {
write_state = WAITING_FOR_TIME; write_state = WAITING_FOR_TIME;
if (!event_pending(timer, EV_TIMEOUT, NULL)) { if (!event_pending(timer, EV_TIMEOUT, NULL)) {
// delay = last_rx + 0.25 / options.lambda - now;
delay = last_rx + 0.00025 - now; delay = last_rx + 0.00025 - now;
// I("MODERATE %f %f %f %f %f", now - last_rx, 0.25/options.lambda,
// 1/options.lambda, now-last_tx, delay);
double_to_tv(delay, &tv); double_to_tv(delay, &tv);
evtimer_add(timer, &tv); evtimer_add(timer, &tv);
} }
@ -272,7 +326,6 @@ void Connection::drive_write_machine(double now) {
issue_something(now); issue_something(now);
last_tx = now; last_tx = now;
stats.log_op(op_queue.size()); stats.log_op(op_queue.size());
next_time += iagen->generate(); next_time += iagen->generate();
if (options.skip && options.lambda > 0.0 && if (options.skip && options.lambda > 0.0 &&
@ -284,7 +337,6 @@ void Connection::drive_write_machine(double now) {
next_time += iagen->generate(); next_time += iagen->generate();
} }
} }
break; break;
case WAITING_FOR_TIME: case WAITING_FOR_TIME:
@ -296,7 +348,6 @@ void Connection::drive_write_machine(double now) {
} }
return; return;
} }
write_state = ISSUING; write_state = ISSUING;
break; break;
@ -310,51 +361,14 @@ void Connection::drive_write_machine(double now) {
} }
} }
void Connection::event_callback(short events) { /**
// struct timeval now_tv; * Handle incoming data (responses).
// event_base_gettimeofday_cached(base, &now_tv); */
if (events & BEV_EVENT_CONNECTED) {
D("Connected to %s:%s.", hostname.c_str(), port.c_str());
int fd = bufferevent_getfd(bev);
if (fd < 0) DIE("bufferevent_getfd");
if (!options.no_nodelay) {
int one = 1;
if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,
(void *) &one, sizeof(one)) < 0)
DIE("setsockopt()");
}
if (options.sasl)
issue_sasl();
else
read_state = IDLE; // This is the most important part!
} else if (events & BEV_EVENT_ERROR) {
int err = bufferevent_socket_get_dns_error(bev);
if (err) DIE("DNS error: %s", evutil_gai_strerror(err));
DIE("BEV_EVENT_ERROR: %s", strerror(errno));
} else if (events & BEV_EVENT_EOF) {
DIE("Unexpected EOF from server.");
}
}
void Connection::read_callback() { void Connection::read_callback() {
struct evbuffer *input = bufferevent_get_input(bev); struct evbuffer *input = bufferevent_get_input(bev);
#if USE_CACHED_TIME
struct timeval now_tv;
event_base_gettimeofday_cached(base, &now_tv);
#endif
char *buf = NULL;
Operation *op = NULL; Operation *op = NULL;
int length; bool done, full_read;
size_t n_read_out;
double now;
// Protocol processing loop.
if (op_queue.size() == 0) V("Spurious read callback."); if (op_queue.size() == 0) V("Spurious read callback.");
@ -365,161 +379,25 @@ void Connection::read_callback() {
case INIT_READ: DIE("event from uninitialized connection"); case INIT_READ: DIE("event from uninitialized connection");
case IDLE: return; // We munched all the data we expected? case IDLE: return; // We munched all the data we expected?
// Note: for binary, the whole get suite (GET, GET_DATA, END) is collapsed
// into one state
case WAITING_FOR_GET: case WAITING_FOR_GET:
assert(op_queue.size() > 0); assert(op_queue.size() > 0);
full_read = prot->handle_response(input, done);
if (options.binary) { if (!full_read) {
if (consume_binary_response(input)) {
#if USE_CACHED_TIME
now = tv_to_double(&now_tv);
#else
now = get_time();
#endif
#if HAVE_CLOCK_GETTIME
op->end_time = get_time_accurate();
#else
op->end_time = now;
#endif
stats.log_get(*op);
last_rx = now;
pop_op();
drive_write_machine(now);
break;
} else {
return; return;
} else if (done) {
finish_op(op); // sets read_state = IDLE
} }
}
buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF);
if (buf == NULL) return; // A whole line not received yet. Punt.
stats.rx_bytes += n_read_out; // strlen(buf);
if (!strcmp(buf, "END")) {
// D("GET (%s) miss.", op->key.c_str());
stats.get_misses++;
#if USE_CACHED_TIME
now = tv_to_double(&now_tv);
#else
now = get_time();
#endif
#if HAVE_CLOCK_GETTIME
op->end_time = get_time_accurate();
#else
op->end_time = now;
#endif
stats.log_get(*op);
free(buf);
last_rx = now;
pop_op();
drive_write_machine();
break; break;
} else if (!strncmp(buf, "VALUE", 5)) {
sscanf(buf, "VALUE %*s %*d %d", &length);
// FIXME: check key name to see if it corresponds to the op at
// the head of the op queue? This will be necessary to
// support "gets" where there may be misses.
data_length = length;
read_state = WAITING_FOR_GET_DATA;
}
free(buf);
case WAITING_FOR_GET_DATA:
assert(op_queue.size() > 0);
length = evbuffer_get_length(input);
if (length >= data_length + 2) {
// FIXME: Actually parse the value? Right now we just drain it.
evbuffer_drain(input, data_length + 2);
read_state = WAITING_FOR_END;
stats.rx_bytes += data_length + 2;
} else {
return;
}
case WAITING_FOR_END:
assert(op_queue.size() > 0);
buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF);
if (buf == NULL) return; // Haven't received a whole line yet. Punt.
stats.rx_bytes += n_read_out;
if (!strcmp(buf, "END")) {
#if USE_CACHED_TIME
now = tv_to_double(&now_tv);
#else
now = get_time();
#endif
#if HAVE_CLOCK_GETTIME
op->end_time = get_time_accurate();
#else
op->end_time = now;
#endif
stats.log_get(*op);
free(buf);
last_rx = now;
pop_op();
drive_write_machine(now);
break;
} else {
DIE("Unexpected result when waiting for END");
}
case WAITING_FOR_SET: case WAITING_FOR_SET:
assert(op_queue.size() > 0); assert(op_queue.size() > 0);
if (!prot->handle_response(input, done)) return;
if (options.binary) { finish_op(op);
if (!consume_binary_response(input)) return;
} else {
buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF);
if (buf == NULL) return; // Haven't received a whole line yet. Punt.
stats.rx_bytes += n_read_out;
}
now = get_time();
#if HAVE_CLOCK_GETTIME
op->end_time = get_time_accurate();
#else
op->end_time = now;
#endif
stats.log_set(*op);
if (!options.binary)
free(buf);
last_rx = now;
pop_op();
drive_write_machine(now);
break; break;
case LOADING: case LOADING:
assert(op_queue.size() > 0); assert(op_queue.size() > 0);
if (!prot->handle_response(input, done)) return;
if (options.binary) {
if (!consume_binary_response(input)) return;
} else {
buf = evbuffer_readln(input, NULL, EVBUFFER_EOL_CRLF);
if (buf == NULL) return; // Haven't received a whole line yet.
free(buf);
}
loader_completed++; loader_completed++;
pop_op(); pop_op();
@ -534,8 +412,6 @@ void Connection::read_callback() {
string keystr = keygen->generate(loader_issued); string keystr = keygen->generate(loader_issued);
strcpy(key, keystr.c_str()); strcpy(key, keystr.c_str());
int index = lrand48() % (1024 * 1024); int index = lrand48() % (1024 * 1024);
// generate_key(loader_issued, options.keysize, key);
// issue_set(key, &random_char[index], options.valuesize);
issue_set(key, &random_char[index], valuesize->generate()); issue_set(key, &random_char[index], valuesize->generate());
loader_issued++; loader_issued++;
@ -544,9 +420,9 @@ void Connection::read_callback() {
break; break;
case WAITING_FOR_SASL: case CONN_SETUP:
assert(options.binary); assert(options.binary);
if (!consume_binary_response(input)) return; if (!prot->setup_connection_r(input)) return;
read_state = IDLE; read_state = IDLE;
break; break;
@ -556,48 +432,17 @@ void Connection::read_callback() {
} }
/** /**
* Tries to consume a binary response (in its entirety) from an evbuffer. * Callback called when write requests finish.
*
* @param input evBuffer to read response from
* @return true if consumed, false if not enough data in buffer.
*/ */
bool Connection::consume_binary_response(evbuffer *input) {
// Read the first 24 bytes as a header
int length = evbuffer_get_length(input);
if (length < 24) return false;
binary_header_t* h =
reinterpret_cast<binary_header_t*>(evbuffer_pullup(input, 24));
assert(h);
// Not whole response
int targetLen = 24 + ntohl(h->body_len);
if (length < targetLen) {
return false;
}
// if something other than success, count it as a miss
if (h->opcode == CMD_GET && h->status) {
stats.get_misses++;
}
#define unlikely(x) __builtin_expect((x),0)
if (unlikely(h->opcode == CMD_SASL)) {
if (h->status == RESP_OK) {
V("SASL authentication succeeded");
} else {
DIE("SASL authentication failed");
}
}
evbuffer_drain(input, targetLen);
stats.rx_bytes += targetLen;
return true;
}
void Connection::write_callback() {} void Connection::write_callback() {}
/**
* Callback for timer timeouts.
*/
void Connection::timer_callback() { drive_write_machine(); } void Connection::timer_callback() { drive_write_machine(); }
// The follow are C trampolines for libevent callbacks.
/* The follow are C trampolines for libevent callbacks. */
void bev_event_cb(struct bufferevent *bev, short events, void *ptr) { void bev_event_cb(struct bufferevent *bev, short events, void *ptr) {
Connection* conn = (Connection*) ptr; Connection* conn = (Connection*) ptr;
conn->event_callback(events); conn->event_callback(events);
@ -618,25 +463,3 @@ void timer_cb(evutil_socket_t fd, short what, void *ptr) {
conn->timer_callback(); conn->timer_callback();
} }
void Connection::set_priority(int pri) {
if (bufferevent_priority_set(bev, pri))
DIE("bufferevent_set_priority(bev, %d) failed", pri);
}
void Connection::start_loading() {
read_state = LOADING;
loader_issued = loader_completed = 0;
for (int i = 0; i < LOADER_CHUNK; i++) {
if (loader_issued >= options.records) break;
char key[256];
int index = lrand48() % (1024 * 1024);
string keystr = keygen->generate(loader_issued);
strcpy(key, keystr.c_str());
// generate_key(loader_issued, options.keysize, key);
// issue_set(key, &random_char[index], options.valuesize);
issue_set(key, &random_char[index], valuesize->generate());
loader_issued++;
}
}

View File

@ -1,4 +1,6 @@
// -*- c++-mode -*- // -*- c++-mode -*-
#ifndef CONNECTION_H
#define CONNECTION_H
#include <queue> #include <queue>
#include <string> #include <string>
@ -16,6 +18,8 @@
#include "Operation.h" #include "Operation.h"
#include "util.h" #include "util.h"
#include "Protocol.h"
using namespace std; using namespace std;
void bev_event_cb(struct bufferevent *bev, short events, void *ptr); void bev_event_cb(struct bufferevent *bev, short events, void *ptr);
@ -23,6 +27,8 @@ void bev_read_cb(struct bufferevent *bev, void *ptr);
void bev_write_cb(struct bufferevent *bev, void *ptr); void bev_write_cb(struct bufferevent *bev, void *ptr);
void timer_cb(evutil_socket_t fd, short what, void *ptr); void timer_cb(evutil_socket_t fd, short what, void *ptr);
class Protocol;
class Connection { class Connection {
public: public:
Connection(struct event_base* _base, struct evdns_base* _evdns, Connection(struct event_base* _base, struct evdns_base* _evdns,
@ -30,19 +36,44 @@ public:
bool sampling = true); bool sampling = true);
~Connection(); ~Connection();
double start_time; // Time when this connection began operations.
ConnectionStats stats;
options_t options;
bool is_ready() { return read_state == IDLE; }
void set_priority(int pri);
// state commands
void start() { drive_write_machine(); }
void start_loading();
void reset();
bool check_exit_condition(double now = 0.0);
// event callbacks
void event_callback(short events);
void read_callback();
void write_callback();
void timer_callback();
private:
string hostname; string hostname;
string port; string port;
double start_time; // Time when this connection began operations. struct event_base *base;
struct evdns_base *evdns;
struct bufferevent *bev;
struct event *timer; // Used to control inter-transmission time.
double next_time; // Inter-transmission time parameters.
double last_rx; // Used to moderate transmission rate.
double last_tx;
enum read_state_enum { enum read_state_enum {
INIT_READ, INIT_READ,
CONN_SETUP,
LOADING, LOADING,
IDLE, IDLE,
WAITING_FOR_SASL,
WAITING_FOR_GET, WAITING_FOR_GET,
WAITING_FOR_GET_DATA,
WAITING_FOR_END,
WAITING_FOR_SET, WAITING_FOR_SET,
MAX_READ_STATE, MAX_READ_STATE,
}; };
@ -58,51 +89,36 @@ public:
read_state_enum read_state; read_state_enum read_state;
write_state_enum write_state; write_state_enum write_state;
ConnectionStats stats;
void issue_get(const char* key, double now = 0.0);
void issue_set(const char* key, const char* value, int length,
double now = 0.0);
void issue_something(double now = 0.0);
void pop_op();
bool check_exit_condition(double now = 0.0);
void drive_write_machine(double now = 0.0);
void start_loading();
void reset();
void issue_sasl();
void event_callback(short events);
void read_callback();
void write_callback();
void timer_callback();
bool consume_binary_response(evbuffer *input);
void set_priority(int pri);
options_t options;
std::queue<Operation> op_queue;
private:
struct event_base *base;
struct evdns_base *evdns;
struct bufferevent *bev;
struct event *timer; // Used to control inter-transmission time.
// double lambda;
double next_time; // Inter-transmission time parameters.
double last_rx; // Used to moderate transmission rate.
double last_tx;
int data_length; // When waiting for data, how much we're peeking for.
// Parameters to track progress of the data loader. // Parameters to track progress of the data loader.
int loader_issued, loader_completed; int loader_issued, loader_completed;
Protocol *prot;
Generator *valuesize; Generator *valuesize;
Generator *keysize; Generator *keysize;
KeyGenerator *keygen; KeyGenerator *keygen;
Generator *iagen; Generator *iagen;
std::queue<Operation> op_queue;
// state machine functions / event processing
void pop_op();
void finish_op(Operation *op);
void issue_something(double now = 0.0);
void drive_write_machine(double now = 0.0);
// request functions
void issue_sasl();
void issue_get(const char* key, double now = 0.0);
void issue_set(const char* key, const char* value, int length,
double now = 0.0);
// protocol fucntions
int set_request_ascii(const char* key, const char* value, int length);
int set_request_binary(const char* key, const char* value, int length);
int get_request_ascii(const char* key);
int get_request_binary(const char* key);
bool consume_binary_response(evbuffer *input);
bool consume_ascii_line(evbuffer *input, bool &done);
}; };
#endif

202
Protocol.cc Normal file
View File

@ -0,0 +1,202 @@
#include <netinet/tcp.h>
#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/dns.h>
#include <event2/event.h>
#include <event2/thread.h>
#include <event2/util.h>
#include "config.h"
#include "Protocol.h"
#include "Connection.h"
#include "distributions.h"
#include "Generator.h"
#include "mutilate.h"
#include "binary_protocol.h"
#include "util.h"
#define unlikely(x) __builtin_expect((x),0)
/**
* Send an ascii get request.
*/
int ProtocolAscii::get_request(const char* key) {
int l;
l = evbuffer_add_printf(
bufferevent_get_output(bev), "get %s\r\n", key);
if (read_state == IDLE) read_state = WAITING_FOR_GET;
return l;
}
/**
* Send an ascii set request.
*/
int ProtocolAscii::set_request(const char* key, const char* value, int len) {
int l;
l = evbuffer_add_printf(bufferevent_get_output(bev),
"set %s 0 0 %d\r\n", key, len);
bufferevent_write(bev, value, len);
bufferevent_write(bev, "\r\n", 2);
l += len + 2;
if (read_state == IDLE) read_state = WAITING_FOR_END;
return l;
}
/**
* Handle an ascii response.
*/
bool ProtocolAscii::handle_response(evbuffer *input, bool &done) {
char *buf = NULL;
int len;
size_t n_read_out;
switch (read_state) {
case WAITING_FOR_GET:
case WAITING_FOR_END:
buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF);
if (buf == NULL) return false;
conn->stats.rx_bytes += n_read_out;
if (!strncmp(buf, "END", 3)) {
if (read_state == WAITING_FOR_GET) conn->stats.get_misses++;
read_state = WAITING_FOR_GET;
done = true;
} else if (!strncmp(buf, "VALUE", 5)) {
sscanf(buf, "VALUE %*s %*d %d", &len);
// FIXME: check key name to see if it corresponds to the op at
// the head of the op queue? This will be necessary to
// support "gets" where there may be misses.
data_length = len;
read_state = WAITING_FOR_GET_DATA;
done = false;
} else {
// must be a value line..
done = false;
}
free(buf);
return true;
case WAITING_FOR_GET_DATA:
len = evbuffer_get_length(input);
if (len >= data_length + 2) {
evbuffer_drain(input, data_length + 2);
read_state = WAITING_FOR_END;
conn->stats.rx_bytes += data_length + 2;
done = false;
return true;
}
return false;
default: printf("state: %d\n", read_state); DIE("Unimplemented!");
}
DIE("Shouldn't ever reach here...");
}
/**
* Perform SASL authentication if requested (write).
*/
bool ProtocolBinary::setup_connection_w() {
if (!opts.sasl) return true;
string user = string(opts.username);
string pass = string(opts.password);
binary_header_t header = {0x80, CMD_SASL, 0, 0, 0, {0}, 0, 0, 0};
header.key_len = htons(5);
header.body_len = htonl(6 + user.length() + 1 + pass.length());
bufferevent_write(bev, &header, 24);
bufferevent_write(bev, "PLAIN\0", 6);
bufferevent_write(bev, user.c_str(), user.length() + 1);
bufferevent_write(bev, pass.c_str(), pass.length());
return false;
}
/**
* Perform SASL authentication if requested (read).
*/
bool ProtocolBinary::setup_connection_r(evbuffer* input) {
if (!opts.sasl) return true;
bool b;
return handle_response(input, b);
}
/**
* Send a binary get request.
*/
int ProtocolBinary::get_request(const char* key) {
uint16_t keylen = strlen(key);
// each line is 4-bytes
binary_header_t h = { 0x80, CMD_GET, htons(keylen),
0x00, 0x00, {htons(0)},
htonl(keylen) };
bufferevent_write(bev, &h, 24); // size does not include extras
bufferevent_write(bev, key, keylen);
return 24 + keylen;
}
/**
* Send a binary set request.
*/
int ProtocolBinary::set_request(const char* key, const char* value, int len) {
uint16_t keylen = strlen(key);
// each line is 4-bytes
binary_header_t h = { 0x80, CMD_SET, htons(keylen),
0x08, 0x00, {htons(0)},
htonl(keylen + 8 + len) };
bufferevent_write(bev, &h, 32); // With extras
bufferevent_write(bev, key, keylen);
bufferevent_write(bev, value, len);
return 24 + h.body_len;
}
/**
* Tries to consume a binary response (in its entirety) from an evbuffer.
*
* @param input evBuffer to read response from
* @return true if consumed, false if not enough data in buffer.
*/
bool ProtocolBinary::handle_response(evbuffer *input, bool &done) {
// Read the first 24 bytes as a header
int length = evbuffer_get_length(input);
if (length < 24) return false;
binary_header_t* h =
reinterpret_cast<binary_header_t*>(evbuffer_pullup(input, 24));
assert(h);
// Not whole response
int targetLen = 24 + ntohl(h->body_len);
if (length < targetLen) return false;
// If something other than success, count it as a miss
if (h->opcode == CMD_GET && h->status) {
conn->stats.get_misses++;
}
if (unlikely(h->opcode == CMD_SASL)) {
if (h->status == RESP_OK) {
V("SASL authentication succeeded");
} else {
DIE("SASL authentication failed");
}
}
evbuffer_drain(input, targetLen);
conn->stats.rx_bytes += targetLen;
done = true;
return true;
}

71
Protocol.h Normal file
View File

@ -0,0 +1,71 @@
// -*- c++-mode -*-
#ifndef PROTOCOL_H
#define PROTOCOL_H
#include <event2/bufferevent.h>
#include "ConnectionOptions.h"
using namespace std;
class Connection;
class Protocol {
public:
Protocol(options_t _opts, Connection* _conn, bufferevent* _bev):
opts(_opts), conn(_conn), bev(_bev) {};
~Protocol() {};
virtual bool setup_connection_w() = 0;
virtual bool setup_connection_r(evbuffer* input) = 0;
virtual int get_request(const char* key) = 0;
virtual int set_request(const char* key, const char* value, int len) = 0;
virtual bool handle_response(evbuffer* input, bool &done) = 0;
protected:
options_t opts;
Connection* conn;
bufferevent* bev;
};
class ProtocolAscii : public Protocol {
public:
ProtocolAscii(options_t opts, Connection* conn, bufferevent* bev):
Protocol(opts, conn, bev) {
read_state = IDLE;
};
~ProtocolAscii() {};
virtual bool setup_connection_w() { return true; }
virtual bool setup_connection_r(evbuffer* input) { return true; }
virtual int get_request(const char* key);
virtual int set_request(const char* key, const char* value, int len);
virtual bool handle_response(evbuffer* input, bool &done);
private:
enum read_fsm {
IDLE,
WAITING_FOR_GET,
WAITING_FOR_GET_DATA,
WAITING_FOR_END,
};
read_fsm read_state;
int data_length;
};
class ProtocolBinary : public Protocol {
public:
ProtocolBinary(options_t opts, Connection* conn, bufferevent* bev):
Protocol(opts, conn, bev) {};
~ProtocolBinary() {};
virtual bool setup_connection_w();
virtual bool setup_connection_r(evbuffer* input);
virtual int get_request(const char* key);
virtual int set_request(const char* key, const char* value, int len);
virtual bool handle_response(evbuffer* input, bool &done);
};
#endif

View File

@ -8,7 +8,7 @@ env['HAVE_POSIX_BARRIER'] = True
env.Append(CPPPATH = ['/usr/local/include', '/opt/local/include']) env.Append(CPPPATH = ['/usr/local/include', '/opt/local/include'])
env.Append(LIBPATH = ['/opt/local/lib']) env.Append(LIBPATH = ['/opt/local/lib'])
env.Append(CCFLAGS = '-std=c++0x -D_GNU_SOURCE') # -D__STDC_FORMAT_MACROS') env.Append(CCFLAGS = '-std=c++0x -D_GNU_SOURCE')
if sys.platform == 'darwin': if sys.platform == 'darwin':
env['CC'] = 'clang' env['CC'] = 'clang'
env['CXX'] = 'clang++' env['CXX'] = 'clang++'
@ -32,7 +32,6 @@ if not conf.CheckLibWithHeader("pthread", "pthread.h", "C++"):
Exit(1) Exit(1)
conf.CheckLib("rt", "clock_gettime", language="C++") conf.CheckLib("rt", "clock_gettime", language="C++")
conf.CheckLibWithHeader("zmq", "zmq.hpp", "C++") conf.CheckLibWithHeader("zmq", "zmq.hpp", "C++")
# conf.CheckFunc('clock_gettime')
if not conf.CheckFunc('pthread_barrier_init'): if not conf.CheckFunc('pthread_barrier_init'):
conf.env['HAVE_POSIX_BARRIER'] = False conf.env['HAVE_POSIX_BARRIER'] = False
@ -40,13 +39,11 @@ env = conf.Finish()
env.Append(CFLAGS = ' -O3 -Wall -g') env.Append(CFLAGS = ' -O3 -Wall -g')
env.Append(CPPFLAGS = ' -O3 -Wall -g') env.Append(CPPFLAGS = ' -O3 -Wall -g')
#env.Append(CPPFLAGS = ' -D_GNU_SOURCE -D__STDC_FORMAT_MACROS')
#env.Append(CPPFLAGS = ' -DUSE_ADAPTIVE_SAMPLER')
env.Command(['cmdline.cc', 'cmdline.h'], 'cmdline.ggo', 'gengetopt < $SOURCE') env.Command(['cmdline.cc', 'cmdline.h'], 'cmdline.ggo', 'gengetopt < $SOURCE')
src = Split("""mutilate.cc cmdline.cc log.cc distributions.cc util.cc src = Split("""mutilate.cc cmdline.cc log.cc distributions.cc util.cc
Connection.cc Generator.cc""") Connection.cc Protocol.cc Generator.cc""")
if not env['HAVE_POSIX_BARRIER']: # USE_POSIX_BARRIER: if not env['HAVE_POSIX_BARRIER']: # USE_POSIX_BARRIER:
src += ['barrier.cc'] src += ['barrier.cc']

View File

@ -830,8 +830,7 @@ void do_mutilate(const vector<string>& servers, options_t& options,
bool restart = false; bool restart = false;
for (Connection *conn: connections) for (Connection *conn: connections)
if (conn->read_state != Connection::IDLE) if (!conn->is_ready()) restart = true;
restart = true;
if (restart) continue; if (restart) continue;
else break; else break;
@ -851,8 +850,7 @@ void do_mutilate(const vector<string>& servers, options_t& options,
bool restart = false; bool restart = false;
for (Connection *conn: connections) for (Connection *conn: connections)
if (conn->read_state != Connection::IDLE) if (!conn->is_ready()) restart = true;
restart = true;
if (restart) continue; if (restart) continue;
else break; else break;
@ -895,7 +893,7 @@ void do_mutilate(const vector<string>& servers, options_t& options,
for (Connection *conn: connections) { for (Connection *conn: connections) {
conn->start_time = start; conn->start_time = start;
conn->options.time = options.warmup; conn->options.time = options.warmup;
conn->drive_write_machine(); // Kick the Connection into motion. conn->start(); // Kick the Connection into motion.
} }
while (1) { while (1) {
@ -920,8 +918,7 @@ void do_mutilate(const vector<string>& servers, options_t& options,
bool restart = false; bool restart = false;
for (Connection *conn: connections) for (Connection *conn: connections)
if (conn->read_state != Connection::IDLE) if (!conn->is_ready()) restart = true;
restart = true;
if (restart) { if (restart) {
@ -935,18 +932,15 @@ void do_mutilate(const vector<string>& servers, options_t& options,
bool restart = false; bool restart = false;
for (Connection *conn: connections) for (Connection *conn: connections)
if (conn->read_state != Connection::IDLE) if (!conn->is_ready()) restart = true;
restart = true;
if (restart) continue; if (restart) continue;
else break; else break;
} }
} }
// options.time = old_time;
for (Connection *conn: connections) { for (Connection *conn: connections) {
conn->reset(); conn->reset();
// conn->stats = ConnectionStats();
conn->options.time = old_time; conn->options.time = old_time;
} }
@ -983,7 +977,7 @@ void do_mutilate(const vector<string>& servers, options_t& options,
start = get_time(); start = get_time();
for (Connection *conn: connections) { for (Connection *conn: connections) {
conn->start_time = start; conn->start_time = start;
conn->drive_write_machine(); // Kick the Connection into motion. conn->start(); // Kick the Connection into motion.
} }
// V("Start = %f", start); // V("Start = %f", start);
@ -1096,18 +1090,7 @@ void args_to_options(options_t* options) {
void init_random_stuff() { void init_random_stuff() {
static char lorem[] = static char lorem[] =
R"(Lorem ipsum dolor sit amet, consectetur adipiscing elit. Maecenas R"(Lorem ipsum dolor sit amet, consectetur adipiscing elit. Maecenas turpis dui, suscipit non vehicula non, malesuada id sem. Phasellus suscipit nisl ut dui consectetur ultrices tincidunt eros aliquet. Donec feugiat lectus sed nibh ultrices ultrices. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia Curae; Mauris suscipit eros sed justo lobortis at ultrices lacus molestie. Duis in diam mi. Cum sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Ut cursus viverra sagittis. Vivamus non facilisis tortor. Integer lectus arcu, sagittis et eleifend rutrum, condimentum eget sem. Vestibulum tempus tellus non risus semper semper. Morbi molestie rhoncus mi, in egestas dui facilisis et.)";
turpis dui, suscipit non vehicula non, malesuada id sem. Phasellus
suscipit nisl ut dui consectetur ultrices tincidunt eros
aliquet. Donec feugiat lectus sed nibh ultrices ultrices. Vestibulum
ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia
Curae; Mauris suscipit eros sed justo lobortis at ultrices lacus
molestie. Duis in diam mi. Cum sociis natoque penatibus et magnis dis
parturient montes, nascetur ridiculus mus. Ut cursus viverra
sagittis. Vivamus non facilisis tortor. Integer lectus arcu, sagittis
et eleifend rutrum, condimentum eget sem. Vestibulum tempus tellus non
risus semper semper. Morbi molestie rhoncus mi, in egestas dui
facilisis et.)";
size_t cursor = 0; size_t cursor = 0;