diff --git a/Connection.cc b/Connection.cc index 92cac73..d4fe464 100644 --- a/Connection.cc +++ b/Connection.cc @@ -18,10 +18,9 @@ Connection::Connection(struct event_base* _base, struct evdns_base* _evdns, string _hostname, string _port, options_t _options, - bool useBinary,bool sampling) : + bool sampling) : hostname(_hostname), port(_port), start_time(0), - stats(sampling), options(_options), base(_base), evdns(_evdns), - useBinary(useBinary) + stats(sampling), options(_options), base(_base), evdns(_evdns) { valuesize = createGenerator(options.valuesize); keysize = createGenerator(options.keysize); @@ -74,30 +73,26 @@ void Connection::reset() { } -void Connection::doPlaintextSASL(string username, string password) { - // each line is 4 bytes - binary_header h = { 0x80, CMD_SASL, htons(0x05), - 0x00, 0x00, 0x00, - htonl(5 + 1 + username.length() + 1 + password.length())}; - Operation op; - - // This is only done once, so efficiency is not as important. - bufferevent_write(bev, &h, 24); - bufferevent_write(bev, "PLAIN", 5); - bufferevent_write(bev, "\0" , 1); - bufferevent_write(bev, username.c_str(), username.length()); - bufferevent_write(bev, "\0" , 1); - bufferevent_write(bev, password.c_str(), password.length()); - - // Don't care about key nor time - op.type = Operation::SASL; - op_queue.push(op); +void Connection::issue_sasl() { read_state = WAITING_FOR_SASL; + + string username = string(options.username); + string password = string(options.password); + + binary_header_t header = {0x80, CMD_SASL, 0, 0, 0, 0, 0, 0, 0}; + header.key_len = htons(5); + header.body_len = htonl(6 + username.length() + 1 + password.length()); + + bufferevent_write(bev, &header, 24); + bufferevent_write(bev, "PLAIN\0", 6); + bufferevent_write(bev, username.c_str(), username.length() + 1); + bufferevent_write(bev, password.c_str(), password.length()); } -void Connection::issue_get(const char* key, uint16_t keylen, double now) { +void Connection::issue_get(const char* key, double now) { Operation op; int l; + uint16_t keylen = strlen(key); #if HAVE_CLOCK_GETTIME op.start_time = get_time_accurate(); @@ -124,26 +119,28 @@ void Connection::issue_get(const char* key, uint16_t keylen, double now) { if (read_state == IDLE) read_state = WAITING_FOR_GET; - if (useBinary) { + if (options.binary) { // each line is 4-bytes - binary_header h = {0x80, CMD_GET, htons(keylen), + binary_header_t h = {0x80, CMD_GET, htons(keylen), 0x00, 0x00, htons(0), //TODO(syang0) get actual vbucket? htonl(keylen) }; - l = bufferevent_write(bev, &h, 24); // size does not include extras - l += bufferevent_write(bev, key, keylen); + bufferevent_write(bev, &h, 24); // size does not include extras + bufferevent_write(bev, key, keylen); + l = 24 + keylen; } else { - l = evbuffer_add_printf(bufferevent_get_output(bev), "get %s\r\n", key); + l = evbuffer_add_printf(bufferevent_get_output(bev), "get %s\r\n", key); } if (read_state != LOADING) stats.tx_bytes += l; } -void Connection::issue_set(const char* key, uint16_t keylen, +void Connection::issue_set(const char* key, const char* value, int length, double now) { Operation op; int l; + uint16_t keylen = strlen(key); #if HAVE_CLOCK_GETTIME op.start_time = get_time_accurate(); @@ -158,9 +155,9 @@ void Connection::issue_set(const char* key, uint16_t keylen, if (read_state == IDLE) read_state = WAITING_FOR_SET; - if (useBinary) { + if (options.binary) { // each line is 4-bytes - binary_header h = { 0x80, CMD_SET, htons(keylen), + binary_header_t h = { 0x80, CMD_SET, htons(keylen), 0x08, 0x00, htons(0), //TODO(syang0) get actual vbucket? htonl(keylen + 8 + length)}; @@ -171,8 +168,9 @@ void Connection::issue_set(const char* key, uint16_t keylen, } else { l = evbuffer_add_printf(bufferevent_get_output(bev), "set %s 0 0 %d\r\n", key, length); - l += bufferevent_write(bev, value, length); - l += bufferevent_write(bev, "\r\n", 2); + bufferevent_write(bev, value, length); + bufferevent_write(bev, "\r\n", 2); + l += length + 2; } if (read_state != LOADING) stats.tx_bytes += l; @@ -183,16 +181,15 @@ void Connection::issue_something(double now) { // FIXME: generate key distribution here! string keystr = keygen->generate(lrand48() % options.records); strcpy(key, keystr.c_str()); - uint8_t keylen = keystr.length(); // int key_index = lrand48() % options.records; // generate_key(key_index, options.keysize, key); if (drand48() < options.update) { int index = lrand48() % (1024 * 1024); // issue_set(key, &random_char[index], options.valuesize, now); - issue_set(key, keylen, &random_char[index], valuesize->generate(), now); + issue_set(key, &random_char[index], valuesize->generate(), now); } else { - issue_get(key, keylen, now); + issue_get(key, now); } } @@ -315,7 +312,10 @@ void Connection::event_callback(short events) { DIE("setsockopt()"); } - read_state = IDLE; // This is the most important part! + if (options.sasl) + issue_sasl(); + else + read_state = IDLE; // This is the most important part! } else if (events & BEV_EVENT_ERROR) { int err = bufferevent_socket_get_dns_error(bev); if (err) DIE("DNS error: %s", evutil_gai_strerror(err)); @@ -356,7 +356,7 @@ void Connection::read_callback() { case WAITING_FOR_GET: assert(op_queue.size() > 0); - if (useBinary) { + if (options.binary) { if (consume_binary_response(input)) { #if USE_CACHED_TIME now = tv_to_double(&now_tv); @@ -466,7 +466,7 @@ void Connection::read_callback() { case WAITING_FOR_SET: assert(op_queue.size() > 0); - if (useBinary) { + if (options.binary) { if (!consume_binary_response(input)) return; } else { buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF); @@ -484,7 +484,7 @@ void Connection::read_callback() { stats.log_set(*op); - if (!useBinary) + if (!options.binary) free(buf); pop_op(); @@ -494,7 +494,7 @@ void Connection::read_callback() { case LOADING: assert(op_queue.size() > 0); - if (useBinary) { + if (options.binary) { if (!consume_binary_response(input)) return; } else { buf = evbuffer_readln(input, NULL, EVBUFFER_EOL_CRLF); @@ -518,8 +518,7 @@ void Connection::read_callback() { int index = lrand48() % (1024 * 1024); // generate_key(loader_issued, options.keysize, key); // issue_set(key, &random_char[index], options.valuesize); - issue_set(key, keystr.length(), &random_char[index], - valuesize->generate()); + issue_set(key, &random_char[index], valuesize->generate()); loader_issued++; } @@ -528,9 +527,9 @@ void Connection::read_callback() { break; case WAITING_FOR_SASL: - assert(op_queue.size() > 0); + assert(options.binary); if (!consume_binary_response(input)) return; - pop_op(); + read_state = IDLE; break; default: DIE("not implemented"); @@ -548,8 +547,8 @@ bool Connection::consume_binary_response(evbuffer *input) { // Read the first 24 bytes as a header int length = evbuffer_get_length(input); if (length < 24) return false; - binary_header* h = - reinterpret_cast(evbuffer_pullup(input, 24)); + binary_header_t* h = + reinterpret_cast(evbuffer_pullup(input, 24)); assert(h); // Not whole response @@ -619,7 +618,7 @@ void Connection::start_loading() { strcpy(key, keystr.c_str()); // generate_key(loader_issued, options.keysize, key); // issue_set(key, &random_char[index], options.valuesize); - issue_set(key, keystr.length(), &random_char[index], valuesize->generate()); + issue_set(key, &random_char[index], valuesize->generate()); loader_issued++; } } diff --git a/Connection.h b/Connection.h index b5191b5..00fe03b 100644 --- a/Connection.h +++ b/Connection.h @@ -27,7 +27,7 @@ class Connection { public: Connection(struct event_base* _base, struct evdns_base* _evdns, string _hostname, string _port, options_t options, - bool useBinary, bool sampling = true); + bool sampling = true); ~Connection(); string hostname; @@ -60,8 +60,8 @@ public: ConnectionStats stats; - void issue_get(const char* key, uint16_t keylen, double now = 0.0); - void issue_set(const char* key, uint16_t keylen, const char* value, int length, + void issue_get(const char* key, double now = 0.0); + void issue_set(const char* key, const char* value, int length, double now = 0.0); void issue_something(double now = 0.0); void pop_op(); @@ -71,7 +71,7 @@ public: void start_loading(); void reset(); - void doPlaintextSASL(string username, string password); + void issue_sasl(); void event_callback(short events); void read_callback(); diff --git a/ConnectionOptions.h b/ConnectionOptions.h index b81a8e4..2315fb1 100644 --- a/ConnectionOptions.h +++ b/ConnectionOptions.h @@ -10,6 +10,11 @@ typedef struct { int qps; int records; + bool binary; + bool sasl; + char username[32]; + char password[32]; + char keysize[32]; char valuesize[32]; // int keysize; diff --git a/binary_protocol.h b/binary_protocol.h index c8f3ba4..2b5ef66 100644 --- a/binary_protocol.h +++ b/binary_protocol.h @@ -1,10 +1,3 @@ -/* - * File: binary_protocol.h - * Author: syang0 - * - * Created on March 1, 2013, 12:59 AM - */ - #ifndef BINARY_PROTOCOL_H #define BINARY_PROTOCOL_H @@ -33,7 +26,6 @@ typedef struct __attribute__ ((__packed__)) { // Used for set only. uint64_t extras; -} binary_header; - -#endif /* BINARY_PROTOCOL_H */ +} binary_header_t; +#endif /* BINARY_PROTOCOL_H */ diff --git a/cmdline.ggo b/cmdline.ggo index 640dd44..eb5f9e1 100644 --- a/cmdline.ggo +++ b/cmdline.ggo @@ -29,12 +29,14 @@ option "update" u "Ratio of set:get commands." float default="0.0" text "\nAdvanced options:" +option "username" U "Username to use for SASL authentication." string +option "password" P "Password to use for SASL authentication." string option "threads" T "Number of threads to spawn." int default="1" option "connections" c "Connections to establish per server." int default="1" option "depth" d "Maximum depth to pipeline requests." int default="1" -option "sasl" - "Perform a binary SASL authentication (plaintext) before \ -issuing any requests. String format is user:pass (Note: this does NOT -automatically set --binary)" string +#option "sasl" - "Perform a binary SASL authentication (plaintext) before \ +#issuing any requests. String format is user:pass (Note: this does NOT +#automatically set --binary)" string option "roundrobin" R "Assign threads to servers in round-robin fashion. \ By default, each thread connects to every server." diff --git a/mutilate.cc b/mutilate.cc index 9f3ad1f..792fcb1 100644 --- a/mutilate.cc +++ b/mutilate.cc @@ -629,7 +629,6 @@ void do_mutilate(const vector& servers, options_t& options, for (int c = 0; c < options.connections; c++) { Connection* conn = new Connection(base, evdns, hostname, port, options, - args.binary_given || args.sasl_given, args.agentmode_given ? false : true); connections.push_back(conn); @@ -652,36 +651,6 @@ void do_mutilate(const vector& servers, options_t& options, else break; } - // Do SASL and wait to complete - if(args.sasl_given) { - char *user, *pass; - saveptr = NULL; - - user = strtok_r(args.sasl_arg, ":", &saveptr); - pass = strtok_r(NULL, ":", &saveptr); - - if (user == NULL) - DIE("strtok(.., \":\") failed to parse %s", args.sasl_arg); - - for (Connection *conn: connections) { - conn->doPlaintextSASL(user, (pass == NULL) ? "" : pass); - } - - while (1) { - // FIXME: If all connections become ready before event_base_loop - // is called, this will deadlock. - event_base_loop(base, EVLOOP_ONCE); - - bool restart = false; - for (Connection *conn: connections) - if (conn->read_state != Connection::IDLE) - restart = true; - - if (restart) continue; - else break; - } - } - // Load database on lead connection for each server. if (!options.noload) { V("Loading database."); @@ -899,6 +868,19 @@ void args_to_options(options_t* options) { // else options->records = args.records_arg / options->server_given; + options->binary = args.binary_given; + options->sasl = args.username_given; + + if (args.password_given) + strcpy(options->password, args.password_arg); + else + strcpy(options->password, ""); + + if (args.username_given) + strcpy(options->username, args.username_arg); + else + strcpy(options->username, ""); + D("options->records = %d", options->records); if (!options->records) options->records = 1;