From d3fa8769ae62fb5697406ae628d974e68e15b2b1 Mon Sep 17 00:00:00 2001 From: Stephen Yang Date: Fri, 1 Mar 2013 05:36:35 -0800 Subject: [PATCH] Added binary support, toggable via --binary. --- Connection.cc | 136 +++++++++++++++++++++++++++++++++++++++------- Connection.h | 15 ++++- binary_protocol.h | 35 ++++++++++++ cmdline.ggo | 1 + mutilate.cc | 1 + 5 files changed, 164 insertions(+), 24 deletions(-) create mode 100644 binary_protocol.h diff --git a/Connection.cc b/Connection.cc index 32bc5ce..006135e 100644 --- a/Connection.cc +++ b/Connection.cc @@ -13,13 +13,15 @@ #include "distributions.h" #include "Generator.h" #include "mutilate.h" +#include "binary_protocol.h" #include "util.h" Connection::Connection(struct event_base* _base, struct evdns_base* _evdns, string _hostname, string _port, options_t _options, - bool sampling) : + bool useBinary,bool sampling) : hostname(_hostname), port(_port), start_time(0), - stats(sampling), options(_options), base(_base), evdns(_evdns) + stats(sampling), options(_options), base(_base), evdns(_evdns), + useBinary(useBinary) { valuesize = createGenerator(options.valuesize); keysize = createGenerator(options.keysize); @@ -71,8 +73,10 @@ void Connection::reset() { stats = ConnectionStats(stats.sampling); } -void Connection::issue_get(const char* key, double now) { +void Connection::issue_get(const char* key, uint16_t keylen, double now) { Operation op; + binary_header h; + int l; #if HAVE_CLOCK_GETTIME op.start_time = get_time_accurate(); @@ -99,13 +103,27 @@ void Connection::issue_get(const char* key, double now) { if (read_state == IDLE) read_state = WAITING_FOR_GET; - int l = evbuffer_add_printf(bufferevent_get_output(bev), "get %s\r\n", key); + if (useBinary) { + // each line is 4-bytes + h = {0x80, CMD_GET, htons(keylen), + 0x00, 0x00, htons(0), //TODO(syang0) get actual vbucket? + htonl(keylen) }; + + l = bufferevent_write(bev, &h, 24); // size does not include extras + l += bufferevent_write(bev, key, keylen); + } else { + l = evbuffer_add_printf(bufferevent_get_output(bev), "get %s\r\n", key); + } + if (read_state != LOADING) stats.tx_bytes += l; } -void Connection::issue_set(const char* key, const char* value, int length, +void Connection::issue_set(const char* key, uint16_t keylen, + const char* value, int length, double now) { Operation op; + binary_header h; + int l; #if HAVE_CLOCK_GETTIME op.start_time = get_time_accurate(); @@ -120,10 +138,22 @@ void Connection::issue_set(const char* key, const char* value, int length, if (read_state == IDLE) read_state = WAITING_FOR_SET; - int l = evbuffer_add_printf(bufferevent_get_output(bev), - "set %s 0 0 %d\r\n", key, length); - l += bufferevent_write(bev, value, length); - l += bufferevent_write(bev, "\r\n", 2); + if (useBinary) { + // each line is 4-bytes + h = { 0x80, CMD_SET, htons(keylen), + 0x08, 0x00, htons(0), //TODO(syang0) get actual vbucket? + htonl(keylen + 8 + length)}; + + bufferevent_write(bev, &h, 32); // With extras + bufferevent_write(bev, key, keylen); + bufferevent_write(bev, value, length); + l = 24 + h.body_len; + } else { + l = evbuffer_add_printf(bufferevent_get_output(bev), + "set %s 0 0 %d\r\n", key, length); + l += bufferevent_write(bev, value, length); + l += bufferevent_write(bev, "\r\n", 2); + } if (read_state != LOADING) stats.tx_bytes += l; } @@ -133,15 +163,16 @@ void Connection::issue_something(double now) { // FIXME: generate key distribution here! string keystr = keygen->generate(lrand48() % options.records); strcpy(key, keystr.c_str()); + uint8_t keylen = keystr.length(); // int key_index = lrand48() % options.records; // generate_key(key_index, options.keysize, key); if (drand48() < options.update) { int index = lrand48() % (1024 * 1024); // issue_set(key, &random_char[index], options.valuesize, now); - issue_set(key, &random_char[index], valuesize->generate(), now); + issue_set(key, keylen, &random_char[index], valuesize->generate(), now); } else { - issue_get(key, now); + issue_get(key, keylen, now); } } @@ -300,9 +331,33 @@ void Connection::read_callback() { case INIT_READ: DIE("event from uninitialized connection"); case IDLE: return; // We munched all the data we expected? + // Note: for binary, the whole get suite (GET, GET_DATA, END) is collapsed + // into one state case WAITING_FOR_GET: assert(op_queue.size() > 0); + if (useBinary) { + if (consume_binary_response(input)) { +#if USE_CACHED_TIME + now = tv_to_double(&now_tv); +#else + now = get_time(); +#endif +#if HAVE_CLOCK_GETTIME + op->end_time = get_time_accurate(); +#else + op->end_time = now; +#endif + stats.log_get(*op); + + pop_op(); + drive_write_machine(now); + break; + } else { + return; + } + } + buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF); if (buf == NULL) return; // A whole line not received yet. Punt. @@ -391,10 +446,13 @@ void Connection::read_callback() { case WAITING_FOR_SET: assert(op_queue.size() > 0); - buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF); - if (buf == NULL) return; // Haven't received a whole line yet. Punt. - - stats.rx_bytes += n_read_out; + if (useBinary) { + if (!consume_binary_response(input)) return; + } else { + buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF); + if (buf == NULL) return; // Haven't received a whole line yet. Punt. + stats.rx_bytes += n_read_out; + } now = get_time(); @@ -406,7 +464,8 @@ void Connection::read_callback() { stats.log_set(*op); - free(buf); + if (!useBinary) + free(buf); pop_op(); drive_write_machine(now); @@ -415,9 +474,13 @@ void Connection::read_callback() { case LOADING: assert(op_queue.size() > 0); - buf = evbuffer_readln(input, NULL, EVBUFFER_EOL_CRLF); - if (buf == NULL) return; // Haven't received a whole line yet. - free(buf); + if (useBinary) { + if (!consume_binary_response(input)) return; + } else { + buf = evbuffer_readln(input, NULL, EVBUFFER_EOL_CRLF); + if (buf == NULL) return; // Haven't received a whole line yet. + free(buf); + } loader_completed++; pop_op(); @@ -435,7 +498,8 @@ void Connection::read_callback() { int index = lrand48() % (1024 * 1024); // generate_key(loader_issued, options.keysize, key); // issue_set(key, &random_char[index], options.valuesize); - issue_set(key, &random_char[index], valuesize->generate()); + issue_set(key, keystr.length(), &random_char[index], + valuesize->generate()); loader_issued++; } @@ -448,6 +512,36 @@ void Connection::read_callback() { } } +/** + * Tries to consume a binary response (in its entirety) from an evbuffer. + * + * @param input evBuffer to read response from + * @return true if consumed, false if not enough data in buffer. + */ +bool Connection::consume_binary_response(evbuffer *input) { + // Read the first 24 bytes as a header + int length = evbuffer_get_length(input); + if (length < 24) return false; + binary_header* h = + reinterpret_cast(evbuffer_pullup(input, 24)); + assert(h); + + // Not whole response + int targetLen = 24 + ntohl(h->body_len); + if (length < targetLen) { + return false; + } + + // if something other than success, count it as a miss + if (h->opcode == CMD_GET && h->status) { + stats.get_misses++; + } + + evbuffer_drain(input, targetLen); + stats.rx_bytes += targetLen; + return true; +} + void Connection::write_callback() {} void Connection::timer_callback() { drive_write_machine(); } @@ -490,7 +584,7 @@ void Connection::start_loading() { strcpy(key, keystr.c_str()); // generate_key(loader_issued, options.keysize, key); // issue_set(key, &random_char[index], options.valuesize); - issue_set(key, &random_char[index], valuesize->generate()); + issue_set(key, keystr.length(), &random_char[index], valuesize->generate()); loader_issued++; } } diff --git a/Connection.h b/Connection.h index 5b7c1e2..87b2aa1 100644 --- a/Connection.h +++ b/Connection.h @@ -27,7 +27,7 @@ class Connection { public: Connection(struct event_base* _base, struct evdns_base* _evdns, string _hostname, string _port, options_t options, - bool sampling = true); + bool useBinary, bool sampling = true); ~Connection(); string hostname; @@ -59,8 +59,8 @@ public: ConnectionStats stats; - void issue_get(const char* key, double now = 0.0); - void issue_set(const char* key, const char* value, int length, + void issue_get(const char* key, uint16_t keylen, double now = 0.0); + void issue_set(const char* key, uint16_t keylen, const char* value, int length, double now = 0.0); void issue_something(double now = 0.0); void pop_op(); @@ -75,6 +75,7 @@ public: void read_callback(); void write_callback(); void timer_callback(); + bool consume_binary_response(evbuffer *input); void set_priority(int pri); @@ -99,4 +100,12 @@ private: Generator *keysize; KeyGenerator *keygen; Generator *iagen; + + // Pisces specific + bool useBinary; + + //TODO(syang0) give meaning to SASL fields! + bool useSASL; + const char* username; + const char* password; }; diff --git a/binary_protocol.h b/binary_protocol.h new file mode 100644 index 0000000..6fd3588 --- /dev/null +++ b/binary_protocol.h @@ -0,0 +1,35 @@ +/* + * File: binary_protocol.h + * Author: syang0 + * + * Created on March 1, 2013, 12:59 AM + */ + +#ifndef BINARY_PROTOCOL_H +#define BINARY_PROTOCOL_H + +#define CMD_GET 0x00 +#define CMD_SET 0x01 + +typedef struct __attribute__ ((__packed__)) { + uint8_t magic; + uint8_t opcode; + uint16_t key_len; + + uint8_t extra_len; + uint8_t data_type; + union { + uint16_t vbucket; // request use + uint16_t status; // response use + }; + + uint32_t body_len; + uint32_t opaque; + uint64_t version; + + // Used for set only. + uint64_t extras; +} binary_header; + +#endif /* BINARY_PROTOCOL_H */ + diff --git a/cmdline.ggo b/cmdline.ggo index 8be8f72..81b305d 100644 --- a/cmdline.ggo +++ b/cmdline.ggo @@ -12,6 +12,7 @@ text "\nBasic options:" option "server" s "Memcached server hostname[:port]. \ Repeat to specify multiple servers." string multiple +option "binary" - "Use binary memcached protocol instead of ASCII." option "qps" q "Target aggregate QPS. 0 = peak QPS." int default="0" option "time" t "Maximum time to run (seconds)." int default="5" diff --git a/mutilate.cc b/mutilate.cc index 87e3514..8a18df8 100644 --- a/mutilate.cc +++ b/mutilate.cc @@ -629,6 +629,7 @@ void do_mutilate(const vector& servers, options_t& options, for (int c = 0; c < options.connections; c++) { Connection* conn = new Connection(base, evdns, hostname, port, options, + args.binary_given, args.agentmode_given ? false : true); connections.push_back(conn);