Merged Stephen Yang's and David Terei's binary protocol and SASL implementations.

I ended up cherry-picking different parts from each implementation.
I liked David's changes to the read state machine to implement SASL.
I used Stephene's implementation of the binary protocol integrated with
the ASCII protocol.
This commit is contained in:
Jacob Leverich 2013-03-04 09:31:13 -08:00
parent ff6987e8d7
commit 4e8d2f6d10
6 changed files with 75 additions and 95 deletions

View File

@ -18,10 +18,9 @@
Connection::Connection(struct event_base* _base, struct evdns_base* _evdns,
string _hostname, string _port, options_t _options,
bool useBinary,bool sampling) :
bool sampling) :
hostname(_hostname), port(_port), start_time(0),
stats(sampling), options(_options), base(_base), evdns(_evdns),
useBinary(useBinary)
stats(sampling), options(_options), base(_base), evdns(_evdns)
{
valuesize = createGenerator(options.valuesize);
keysize = createGenerator(options.keysize);
@ -74,30 +73,26 @@ void Connection::reset() {
}
void Connection::doPlaintextSASL(string username, string password) {
// each line is 4 bytes
binary_header h = { 0x80, CMD_SASL, htons(0x05),
0x00, 0x00, 0x00,
htonl(5 + 1 + username.length() + 1 + password.length())};
Operation op;
// This is only done once, so efficiency is not as important.
bufferevent_write(bev, &h, 24);
bufferevent_write(bev, "PLAIN", 5);
bufferevent_write(bev, "\0" , 1);
bufferevent_write(bev, username.c_str(), username.length());
bufferevent_write(bev, "\0" , 1);
bufferevent_write(bev, password.c_str(), password.length());
// Don't care about key nor time
op.type = Operation::SASL;
op_queue.push(op);
void Connection::issue_sasl() {
read_state = WAITING_FOR_SASL;
string username = string(options.username);
string password = string(options.password);
binary_header_t header = {0x80, CMD_SASL, 0, 0, 0, 0, 0, 0, 0};
header.key_len = htons(5);
header.body_len = htonl(6 + username.length() + 1 + password.length());
bufferevent_write(bev, &header, 24);
bufferevent_write(bev, "PLAIN\0", 6);
bufferevent_write(bev, username.c_str(), username.length() + 1);
bufferevent_write(bev, password.c_str(), password.length());
}
void Connection::issue_get(const char* key, uint16_t keylen, double now) {
void Connection::issue_get(const char* key, double now) {
Operation op;
int l;
uint16_t keylen = strlen(key);
#if HAVE_CLOCK_GETTIME
op.start_time = get_time_accurate();
@ -124,26 +119,28 @@ void Connection::issue_get(const char* key, uint16_t keylen, double now) {
if (read_state == IDLE)
read_state = WAITING_FOR_GET;
if (useBinary) {
if (options.binary) {
// each line is 4-bytes
binary_header h = {0x80, CMD_GET, htons(keylen),
binary_header_t h = {0x80, CMD_GET, htons(keylen),
0x00, 0x00, htons(0), //TODO(syang0) get actual vbucket?
htonl(keylen) };
l = bufferevent_write(bev, &h, 24); // size does not include extras
l += bufferevent_write(bev, key, keylen);
bufferevent_write(bev, &h, 24); // size does not include extras
bufferevent_write(bev, key, keylen);
l = 24 + keylen;
} else {
l = evbuffer_add_printf(bufferevent_get_output(bev), "get %s\r\n", key);
l = evbuffer_add_printf(bufferevent_get_output(bev), "get %s\r\n", key);
}
if (read_state != LOADING) stats.tx_bytes += l;
}
void Connection::issue_set(const char* key, uint16_t keylen,
void Connection::issue_set(const char* key,
const char* value, int length,
double now) {
Operation op;
int l;
uint16_t keylen = strlen(key);
#if HAVE_CLOCK_GETTIME
op.start_time = get_time_accurate();
@ -158,9 +155,9 @@ void Connection::issue_set(const char* key, uint16_t keylen,
if (read_state == IDLE)
read_state = WAITING_FOR_SET;
if (useBinary) {
if (options.binary) {
// each line is 4-bytes
binary_header h = { 0x80, CMD_SET, htons(keylen),
binary_header_t h = { 0x80, CMD_SET, htons(keylen),
0x08, 0x00, htons(0), //TODO(syang0) get actual vbucket?
htonl(keylen + 8 + length)};
@ -171,8 +168,9 @@ void Connection::issue_set(const char* key, uint16_t keylen,
} else {
l = evbuffer_add_printf(bufferevent_get_output(bev),
"set %s 0 0 %d\r\n", key, length);
l += bufferevent_write(bev, value, length);
l += bufferevent_write(bev, "\r\n", 2);
bufferevent_write(bev, value, length);
bufferevent_write(bev, "\r\n", 2);
l += length + 2;
}
if (read_state != LOADING) stats.tx_bytes += l;
@ -183,16 +181,15 @@ void Connection::issue_something(double now) {
// FIXME: generate key distribution here!
string keystr = keygen->generate(lrand48() % options.records);
strcpy(key, keystr.c_str());
uint8_t keylen = keystr.length();
// int key_index = lrand48() % options.records;
// generate_key(key_index, options.keysize, key);
if (drand48() < options.update) {
int index = lrand48() % (1024 * 1024);
// issue_set(key, &random_char[index], options.valuesize, now);
issue_set(key, keylen, &random_char[index], valuesize->generate(), now);
issue_set(key, &random_char[index], valuesize->generate(), now);
} else {
issue_get(key, keylen, now);
issue_get(key, now);
}
}
@ -315,7 +312,10 @@ void Connection::event_callback(short events) {
DIE("setsockopt()");
}
read_state = IDLE; // This is the most important part!
if (options.sasl)
issue_sasl();
else
read_state = IDLE; // This is the most important part!
} else if (events & BEV_EVENT_ERROR) {
int err = bufferevent_socket_get_dns_error(bev);
if (err) DIE("DNS error: %s", evutil_gai_strerror(err));
@ -356,7 +356,7 @@ void Connection::read_callback() {
case WAITING_FOR_GET:
assert(op_queue.size() > 0);
if (useBinary) {
if (options.binary) {
if (consume_binary_response(input)) {
#if USE_CACHED_TIME
now = tv_to_double(&now_tv);
@ -466,7 +466,7 @@ void Connection::read_callback() {
case WAITING_FOR_SET:
assert(op_queue.size() > 0);
if (useBinary) {
if (options.binary) {
if (!consume_binary_response(input)) return;
} else {
buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF);
@ -484,7 +484,7 @@ void Connection::read_callback() {
stats.log_set(*op);
if (!useBinary)
if (!options.binary)
free(buf);
pop_op();
@ -494,7 +494,7 @@ void Connection::read_callback() {
case LOADING:
assert(op_queue.size() > 0);
if (useBinary) {
if (options.binary) {
if (!consume_binary_response(input)) return;
} else {
buf = evbuffer_readln(input, NULL, EVBUFFER_EOL_CRLF);
@ -518,8 +518,7 @@ void Connection::read_callback() {
int index = lrand48() % (1024 * 1024);
// generate_key(loader_issued, options.keysize, key);
// issue_set(key, &random_char[index], options.valuesize);
issue_set(key, keystr.length(), &random_char[index],
valuesize->generate());
issue_set(key, &random_char[index], valuesize->generate());
loader_issued++;
}
@ -528,9 +527,9 @@ void Connection::read_callback() {
break;
case WAITING_FOR_SASL:
assert(op_queue.size() > 0);
assert(options.binary);
if (!consume_binary_response(input)) return;
pop_op();
read_state = IDLE;
break;
default: DIE("not implemented");
@ -548,8 +547,8 @@ bool Connection::consume_binary_response(evbuffer *input) {
// Read the first 24 bytes as a header
int length = evbuffer_get_length(input);
if (length < 24) return false;
binary_header* h =
reinterpret_cast<binary_header*>(evbuffer_pullup(input, 24));
binary_header_t* h =
reinterpret_cast<binary_header_t*>(evbuffer_pullup(input, 24));
assert(h);
// Not whole response
@ -619,7 +618,7 @@ void Connection::start_loading() {
strcpy(key, keystr.c_str());
// generate_key(loader_issued, options.keysize, key);
// issue_set(key, &random_char[index], options.valuesize);
issue_set(key, keystr.length(), &random_char[index], valuesize->generate());
issue_set(key, &random_char[index], valuesize->generate());
loader_issued++;
}
}

View File

@ -27,7 +27,7 @@ class Connection {
public:
Connection(struct event_base* _base, struct evdns_base* _evdns,
string _hostname, string _port, options_t options,
bool useBinary, bool sampling = true);
bool sampling = true);
~Connection();
string hostname;
@ -60,8 +60,8 @@ public:
ConnectionStats stats;
void issue_get(const char* key, uint16_t keylen, double now = 0.0);
void issue_set(const char* key, uint16_t keylen, const char* value, int length,
void issue_get(const char* key, double now = 0.0);
void issue_set(const char* key, const char* value, int length,
double now = 0.0);
void issue_something(double now = 0.0);
void pop_op();
@ -71,7 +71,7 @@ public:
void start_loading();
void reset();
void doPlaintextSASL(string username, string password);
void issue_sasl();
void event_callback(short events);
void read_callback();

View File

@ -10,6 +10,11 @@ typedef struct {
int qps;
int records;
bool binary;
bool sasl;
char username[32];
char password[32];
char keysize[32];
char valuesize[32];
// int keysize;

View File

@ -1,10 +1,3 @@
/*
* File: binary_protocol.h
* Author: syang0
*
* Created on March 1, 2013, 12:59 AM
*/
#ifndef BINARY_PROTOCOL_H
#define BINARY_PROTOCOL_H
@ -33,7 +26,6 @@ typedef struct __attribute__ ((__packed__)) {
// Used for set only.
uint64_t extras;
} binary_header;
#endif /* BINARY_PROTOCOL_H */
} binary_header_t;
#endif /* BINARY_PROTOCOL_H */

View File

@ -29,12 +29,14 @@ option "update" u "Ratio of set:get commands." float default="0.0"
text "\nAdvanced options:"
option "username" U "Username to use for SASL authentication." string
option "password" P "Password to use for SASL authentication." string
option "threads" T "Number of threads to spawn." int default="1"
option "connections" c "Connections to establish per server." int default="1"
option "depth" d "Maximum depth to pipeline requests." int default="1"
option "sasl" - "Perform a binary SASL authentication (plaintext) before \
issuing any requests. String format is user:pass (Note: this does NOT
automatically set --binary)" string
#option "sasl" - "Perform a binary SASL authentication (plaintext) before \
#issuing any requests. String format is user:pass (Note: this does NOT
#automatically set --binary)" string
option "roundrobin" R "Assign threads to servers in round-robin fashion. \
By default, each thread connects to every server."

View File

@ -629,7 +629,6 @@ void do_mutilate(const vector<string>& servers, options_t& options,
for (int c = 0; c < options.connections; c++) {
Connection* conn = new Connection(base, evdns, hostname, port, options,
args.binary_given || args.sasl_given,
args.agentmode_given ? false :
true);
connections.push_back(conn);
@ -652,36 +651,6 @@ void do_mutilate(const vector<string>& servers, options_t& options,
else break;
}
// Do SASL and wait to complete
if(args.sasl_given) {
char *user, *pass;
saveptr = NULL;
user = strtok_r(args.sasl_arg, ":", &saveptr);
pass = strtok_r(NULL, ":", &saveptr);
if (user == NULL)
DIE("strtok(.., \":\") failed to parse %s", args.sasl_arg);
for (Connection *conn: connections) {
conn->doPlaintextSASL(user, (pass == NULL) ? "" : pass);
}
while (1) {
// FIXME: If all connections become ready before event_base_loop
// is called, this will deadlock.
event_base_loop(base, EVLOOP_ONCE);
bool restart = false;
for (Connection *conn: connections)
if (conn->read_state != Connection::IDLE)
restart = true;
if (restart) continue;
else break;
}
}
// Load database on lead connection for each server.
if (!options.noload) {
V("Loading database.");
@ -899,6 +868,19 @@ void args_to_options(options_t* options) {
// else
options->records = args.records_arg / options->server_given;
options->binary = args.binary_given;
options->sasl = args.username_given;
if (args.password_given)
strcpy(options->password, args.password_arg);
else
strcpy(options->password, "");
if (args.username_given)
strcpy(options->username, args.username_arg);
else
strcpy(options->username, "");
D("options->records = %d", options->records);
if (!options->records) options->records = 1;