Added binary support, toggable via --binary.

This commit is contained in:
Stephen Yang 2013-03-01 05:36:35 -08:00
parent 6ffebb2ec7
commit d3fa8769ae
5 changed files with 164 additions and 24 deletions

View File

@ -13,13 +13,15 @@
#include "distributions.h" #include "distributions.h"
#include "Generator.h" #include "Generator.h"
#include "mutilate.h" #include "mutilate.h"
#include "binary_protocol.h"
#include "util.h" #include "util.h"
Connection::Connection(struct event_base* _base, struct evdns_base* _evdns, Connection::Connection(struct event_base* _base, struct evdns_base* _evdns,
string _hostname, string _port, options_t _options, string _hostname, string _port, options_t _options,
bool sampling) : bool useBinary,bool sampling) :
hostname(_hostname), port(_port), start_time(0), hostname(_hostname), port(_port), start_time(0),
stats(sampling), options(_options), base(_base), evdns(_evdns) stats(sampling), options(_options), base(_base), evdns(_evdns),
useBinary(useBinary)
{ {
valuesize = createGenerator(options.valuesize); valuesize = createGenerator(options.valuesize);
keysize = createGenerator(options.keysize); keysize = createGenerator(options.keysize);
@ -71,8 +73,10 @@ void Connection::reset() {
stats = ConnectionStats(stats.sampling); stats = ConnectionStats(stats.sampling);
} }
void Connection::issue_get(const char* key, double now) { void Connection::issue_get(const char* key, uint16_t keylen, double now) {
Operation op; Operation op;
binary_header h;
int l;
#if HAVE_CLOCK_GETTIME #if HAVE_CLOCK_GETTIME
op.start_time = get_time_accurate(); op.start_time = get_time_accurate();
@ -99,13 +103,27 @@ void Connection::issue_get(const char* key, double now) {
if (read_state == IDLE) if (read_state == IDLE)
read_state = WAITING_FOR_GET; read_state = WAITING_FOR_GET;
int l = evbuffer_add_printf(bufferevent_get_output(bev), "get %s\r\n", key); if (useBinary) {
// each line is 4-bytes
h = {0x80, CMD_GET, htons(keylen),
0x00, 0x00, htons(0), //TODO(syang0) get actual vbucket?
htonl(keylen) };
l = bufferevent_write(bev, &h, 24); // size does not include extras
l += bufferevent_write(bev, key, keylen);
} else {
l = evbuffer_add_printf(bufferevent_get_output(bev), "get %s\r\n", key);
}
if (read_state != LOADING) stats.tx_bytes += l; if (read_state != LOADING) stats.tx_bytes += l;
} }
void Connection::issue_set(const char* key, const char* value, int length, void Connection::issue_set(const char* key, uint16_t keylen,
const char* value, int length,
double now) { double now) {
Operation op; Operation op;
binary_header h;
int l;
#if HAVE_CLOCK_GETTIME #if HAVE_CLOCK_GETTIME
op.start_time = get_time_accurate(); op.start_time = get_time_accurate();
@ -120,10 +138,22 @@ void Connection::issue_set(const char* key, const char* value, int length,
if (read_state == IDLE) if (read_state == IDLE)
read_state = WAITING_FOR_SET; read_state = WAITING_FOR_SET;
int l = evbuffer_add_printf(bufferevent_get_output(bev), if (useBinary) {
// each line is 4-bytes
h = { 0x80, CMD_SET, htons(keylen),
0x08, 0x00, htons(0), //TODO(syang0) get actual vbucket?
htonl(keylen + 8 + length)};
bufferevent_write(bev, &h, 32); // With extras
bufferevent_write(bev, key, keylen);
bufferevent_write(bev, value, length);
l = 24 + h.body_len;
} else {
l = evbuffer_add_printf(bufferevent_get_output(bev),
"set %s 0 0 %d\r\n", key, length); "set %s 0 0 %d\r\n", key, length);
l += bufferevent_write(bev, value, length); l += bufferevent_write(bev, value, length);
l += bufferevent_write(bev, "\r\n", 2); l += bufferevent_write(bev, "\r\n", 2);
}
if (read_state != LOADING) stats.tx_bytes += l; if (read_state != LOADING) stats.tx_bytes += l;
} }
@ -133,15 +163,16 @@ void Connection::issue_something(double now) {
// FIXME: generate key distribution here! // FIXME: generate key distribution here!
string keystr = keygen->generate(lrand48() % options.records); string keystr = keygen->generate(lrand48() % options.records);
strcpy(key, keystr.c_str()); strcpy(key, keystr.c_str());
uint8_t keylen = keystr.length();
// int key_index = lrand48() % options.records; // int key_index = lrand48() % options.records;
// generate_key(key_index, options.keysize, key); // generate_key(key_index, options.keysize, key);
if (drand48() < options.update) { if (drand48() < options.update) {
int index = lrand48() % (1024 * 1024); int index = lrand48() % (1024 * 1024);
// issue_set(key, &random_char[index], options.valuesize, now); // issue_set(key, &random_char[index], options.valuesize, now);
issue_set(key, &random_char[index], valuesize->generate(), now); issue_set(key, keylen, &random_char[index], valuesize->generate(), now);
} else { } else {
issue_get(key, now); issue_get(key, keylen, now);
} }
} }
@ -300,9 +331,33 @@ void Connection::read_callback() {
case INIT_READ: DIE("event from uninitialized connection"); case INIT_READ: DIE("event from uninitialized connection");
case IDLE: return; // We munched all the data we expected? case IDLE: return; // We munched all the data we expected?
// Note: for binary, the whole get suite (GET, GET_DATA, END) is collapsed
// into one state
case WAITING_FOR_GET: case WAITING_FOR_GET:
assert(op_queue.size() > 0); assert(op_queue.size() > 0);
if (useBinary) {
if (consume_binary_response(input)) {
#if USE_CACHED_TIME
now = tv_to_double(&now_tv);
#else
now = get_time();
#endif
#if HAVE_CLOCK_GETTIME
op->end_time = get_time_accurate();
#else
op->end_time = now;
#endif
stats.log_get(*op);
pop_op();
drive_write_machine(now);
break;
} else {
return;
}
}
buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF); buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF);
if (buf == NULL) return; // A whole line not received yet. Punt. if (buf == NULL) return; // A whole line not received yet. Punt.
@ -391,10 +446,13 @@ void Connection::read_callback() {
case WAITING_FOR_SET: case WAITING_FOR_SET:
assert(op_queue.size() > 0); assert(op_queue.size() > 0);
if (useBinary) {
if (!consume_binary_response(input)) return;
} else {
buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF); buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF);
if (buf == NULL) return; // Haven't received a whole line yet. Punt. if (buf == NULL) return; // Haven't received a whole line yet. Punt.
stats.rx_bytes += n_read_out; stats.rx_bytes += n_read_out;
}
now = get_time(); now = get_time();
@ -406,6 +464,7 @@ void Connection::read_callback() {
stats.log_set(*op); stats.log_set(*op);
if (!useBinary)
free(buf); free(buf);
pop_op(); pop_op();
@ -415,9 +474,13 @@ void Connection::read_callback() {
case LOADING: case LOADING:
assert(op_queue.size() > 0); assert(op_queue.size() > 0);
if (useBinary) {
if (!consume_binary_response(input)) return;
} else {
buf = evbuffer_readln(input, NULL, EVBUFFER_EOL_CRLF); buf = evbuffer_readln(input, NULL, EVBUFFER_EOL_CRLF);
if (buf == NULL) return; // Haven't received a whole line yet. if (buf == NULL) return; // Haven't received a whole line yet.
free(buf); free(buf);
}
loader_completed++; loader_completed++;
pop_op(); pop_op();
@ -435,7 +498,8 @@ void Connection::read_callback() {
int index = lrand48() % (1024 * 1024); int index = lrand48() % (1024 * 1024);
// generate_key(loader_issued, options.keysize, key); // generate_key(loader_issued, options.keysize, key);
// issue_set(key, &random_char[index], options.valuesize); // issue_set(key, &random_char[index], options.valuesize);
issue_set(key, &random_char[index], valuesize->generate()); issue_set(key, keystr.length(), &random_char[index],
valuesize->generate());
loader_issued++; loader_issued++;
} }
@ -448,6 +512,36 @@ void Connection::read_callback() {
} }
} }
/**
* Tries to consume a binary response (in its entirety) from an evbuffer.
*
* @param input evBuffer to read response from
* @return true if consumed, false if not enough data in buffer.
*/
bool Connection::consume_binary_response(evbuffer *input) {
// Read the first 24 bytes as a header
int length = evbuffer_get_length(input);
if (length < 24) return false;
binary_header* h =
reinterpret_cast<binary_header*>(evbuffer_pullup(input, 24));
assert(h);
// Not whole response
int targetLen = 24 + ntohl(h->body_len);
if (length < targetLen) {
return false;
}
// if something other than success, count it as a miss
if (h->opcode == CMD_GET && h->status) {
stats.get_misses++;
}
evbuffer_drain(input, targetLen);
stats.rx_bytes += targetLen;
return true;
}
void Connection::write_callback() {} void Connection::write_callback() {}
void Connection::timer_callback() { drive_write_machine(); } void Connection::timer_callback() { drive_write_machine(); }
@ -490,7 +584,7 @@ void Connection::start_loading() {
strcpy(key, keystr.c_str()); strcpy(key, keystr.c_str());
// generate_key(loader_issued, options.keysize, key); // generate_key(loader_issued, options.keysize, key);
// issue_set(key, &random_char[index], options.valuesize); // issue_set(key, &random_char[index], options.valuesize);
issue_set(key, &random_char[index], valuesize->generate()); issue_set(key, keystr.length(), &random_char[index], valuesize->generate());
loader_issued++; loader_issued++;
} }
} }

View File

@ -27,7 +27,7 @@ class Connection {
public: public:
Connection(struct event_base* _base, struct evdns_base* _evdns, Connection(struct event_base* _base, struct evdns_base* _evdns,
string _hostname, string _port, options_t options, string _hostname, string _port, options_t options,
bool sampling = true); bool useBinary, bool sampling = true);
~Connection(); ~Connection();
string hostname; string hostname;
@ -59,8 +59,8 @@ public:
ConnectionStats stats; ConnectionStats stats;
void issue_get(const char* key, double now = 0.0); void issue_get(const char* key, uint16_t keylen, double now = 0.0);
void issue_set(const char* key, const char* value, int length, void issue_set(const char* key, uint16_t keylen, const char* value, int length,
double now = 0.0); double now = 0.0);
void issue_something(double now = 0.0); void issue_something(double now = 0.0);
void pop_op(); void pop_op();
@ -75,6 +75,7 @@ public:
void read_callback(); void read_callback();
void write_callback(); void write_callback();
void timer_callback(); void timer_callback();
bool consume_binary_response(evbuffer *input);
void set_priority(int pri); void set_priority(int pri);
@ -99,4 +100,12 @@ private:
Generator *keysize; Generator *keysize;
KeyGenerator *keygen; KeyGenerator *keygen;
Generator *iagen; Generator *iagen;
// Pisces specific
bool useBinary;
//TODO(syang0) give meaning to SASL fields!
bool useSASL;
const char* username;
const char* password;
}; };

35
binary_protocol.h Normal file
View File

@ -0,0 +1,35 @@
/*
* File: binary_protocol.h
* Author: syang0
*
* Created on March 1, 2013, 12:59 AM
*/
#ifndef BINARY_PROTOCOL_H
#define BINARY_PROTOCOL_H
#define CMD_GET 0x00
#define CMD_SET 0x01
typedef struct __attribute__ ((__packed__)) {
uint8_t magic;
uint8_t opcode;
uint16_t key_len;
uint8_t extra_len;
uint8_t data_type;
union {
uint16_t vbucket; // request use
uint16_t status; // response use
};
uint32_t body_len;
uint32_t opaque;
uint64_t version;
// Used for set only.
uint64_t extras;
} binary_header;
#endif /* BINARY_PROTOCOL_H */

View File

@ -12,6 +12,7 @@ text "\nBasic options:"
option "server" s "Memcached server hostname[:port]. \ option "server" s "Memcached server hostname[:port]. \
Repeat to specify multiple servers." string multiple Repeat to specify multiple servers." string multiple
option "binary" - "Use binary memcached protocol instead of ASCII."
option "qps" q "Target aggregate QPS. 0 = peak QPS." int default="0" option "qps" q "Target aggregate QPS. 0 = peak QPS." int default="0"
option "time" t "Maximum time to run (seconds)." int default="5" option "time" t "Maximum time to run (seconds)." int default="5"

View File

@ -629,6 +629,7 @@ void do_mutilate(const vector<string>& servers, options_t& options,
for (int c = 0; c < options.connections; c++) { for (int c = 0; c < options.connections; c++) {
Connection* conn = new Connection(base, evdns, hostname, port, options, Connection* conn = new Connection(base, evdns, hostname, port, options,
args.binary_given,
args.agentmode_given ? false : args.agentmode_given ? false :
true); true);
connections.push_back(conn); connections.push_back(conn);