diff --git a/Connection.cc b/Connection.cc index 006135e..9f39a77 100644 --- a/Connection.cc +++ b/Connection.cc @@ -73,6 +73,29 @@ void Connection::reset() { stats = ConnectionStats(stats.sampling); } + +void Connection::doPlaintextSASL(string username, string password) { + // each line is 4 bytes + binary_header h = { 0x80, CMD_SASL, htons(0x05), + 0x00, 0x00, 0x00, + htonl(5 + 1 + username.length() + 1 + password.length()) + }; + Operation op; + + // This is only done once, so efficiency is not as important. + bufferevent_write(bev, &h, 24); + bufferevent_write(bev, "PLAIN", 5); + bufferevent_write(bev, "\0" , 1); + bufferevent_write(bev, username.c_str(), username.length()); + bufferevent_write(bev, "\0" , 1); + bufferevent_write(bev, password.c_str(), password.length()); + + // Don't care about key nor time + op.type = Operation::SASL; + op_queue.push(op); + read_state = WAITING_FOR_SASL; +} + void Connection::issue_get(const char* key, uint16_t keylen, double now) { Operation op; binary_header h; @@ -507,6 +530,12 @@ void Connection::read_callback() { break; + case WAITING_FOR_SASL: + assert(op_queue.size() > 0); + if (!consume_binary_response(input)) return; + pop_op(); + break; + default: DIE("not implemented"); } } @@ -537,6 +566,15 @@ bool Connection::consume_binary_response(evbuffer *input) { stats.get_misses++; } + #define unlikely(x) __builtin_expect((x),0) + if (unlikely(h->opcode == CMD_SASL)) { + if (h->status == RESP_OK) { + V("SASL authentication succeeded"); + } else { + DIE("SASL authentication failed"); + } + } + evbuffer_drain(input, targetLen); stats.rx_bytes += targetLen; return true; diff --git a/Connection.h b/Connection.h index 87b2aa1..b5191b5 100644 --- a/Connection.h +++ b/Connection.h @@ -39,6 +39,7 @@ public: INIT_READ, LOADING, IDLE, + WAITING_FOR_SASL, WAITING_FOR_GET, WAITING_FOR_GET_DATA, WAITING_FOR_END, @@ -70,6 +71,7 @@ public: void start_loading(); void reset(); + void doPlaintextSASL(string username, string password); void event_callback(short events); void read_callback(); @@ -101,11 +103,5 @@ private: KeyGenerator *keygen; Generator *iagen; - // Pisces specific bool useBinary; - - //TODO(syang0) give meaning to SASL fields! - bool useSASL; - const char* username; - const char* password; }; diff --git a/Operation.h b/Operation.h index b963c30..b594b17 100644 --- a/Operation.h +++ b/Operation.h @@ -11,7 +11,7 @@ public: double start_time, end_time; enum type_enum { - GET, SET + GET, SET, SASL }; type_enum type; diff --git a/binary_protocol.h b/binary_protocol.h index 6fd3588..c8f3ba4 100644 --- a/binary_protocol.h +++ b/binary_protocol.h @@ -8,8 +8,12 @@ #ifndef BINARY_PROTOCOL_H #define BINARY_PROTOCOL_H -#define CMD_GET 0x00 -#define CMD_SET 0x01 +#define CMD_GET 0x00 +#define CMD_SET 0x01 +#define CMD_SASL 0x21 + +#define RESP_OK 0x00 +#define RESP_SASL_ERR 0x20 typedef struct __attribute__ ((__packed__)) { uint8_t magic; diff --git a/cmdline.ggo b/cmdline.ggo index 81b305d..640dd44 100644 --- a/cmdline.ggo +++ b/cmdline.ggo @@ -32,6 +32,9 @@ text "\nAdvanced options:" option "threads" T "Number of threads to spawn." int default="1" option "connections" c "Connections to establish per server." int default="1" option "depth" d "Maximum depth to pipeline requests." int default="1" +option "sasl" - "Perform a binary SASL authentication (plaintext) before \ +issuing any requests. String format is user:pass (Note: this does NOT +automatically set --binary)" string option "roundrobin" R "Assign threads to servers in round-robin fashion. \ By default, each thread connects to every server." diff --git a/mutilate.cc b/mutilate.cc index 8a18df8..9f3ad1f 100644 --- a/mutilate.cc +++ b/mutilate.cc @@ -629,7 +629,7 @@ void do_mutilate(const vector& servers, options_t& options, for (int c = 0; c < options.connections; c++) { Connection* conn = new Connection(base, evdns, hostname, port, options, - args.binary_given, + args.binary_given || args.sasl_given, args.agentmode_given ? false : true); connections.push_back(conn); @@ -652,6 +652,36 @@ void do_mutilate(const vector& servers, options_t& options, else break; } + // Do SASL and wait to complete + if(args.sasl_given) { + char *user, *pass; + saveptr = NULL; + + user = strtok_r(args.sasl_arg, ":", &saveptr); + pass = strtok_r(NULL, ":", &saveptr); + + if (user == NULL) + DIE("strtok(.., \":\") failed to parse %s", args.sasl_arg); + + for (Connection *conn: connections) { + conn->doPlaintextSASL(user, (pass == NULL) ? "" : pass); + } + + while (1) { + // FIXME: If all connections become ready before event_base_loop + // is called, this will deadlock. + event_base_loop(base, EVLOOP_ONCE); + + bool restart = false; + for (Connection *conn: connections) + if (conn->read_state != Connection::IDLE) + restart = true; + + if (restart) continue; + else break; + } + } + // Load database on lead connection for each server. if (!options.noload) { V("Loading database.");