Added binary plaintext SASL support, toggable with --sasl=user[:pass]

This commit is contained in:
Stephen Yang 2013-03-02 05:46:39 -08:00
parent d3fa8769ae
commit 8658d6cbc4
6 changed files with 81 additions and 10 deletions

View File

@ -73,6 +73,29 @@ void Connection::reset() {
stats = ConnectionStats(stats.sampling);
}
void Connection::doPlaintextSASL(string username, string password) {
// each line is 4 bytes
binary_header h = { 0x80, CMD_SASL, htons(0x05),
0x00, 0x00, 0x00,
htonl(5 + 1 + username.length() + 1 + password.length())
};
Operation op;
// This is only done once, so efficiency is not as important.
bufferevent_write(bev, &h, 24);
bufferevent_write(bev, "PLAIN", 5);
bufferevent_write(bev, "\0" , 1);
bufferevent_write(bev, username.c_str(), username.length());
bufferevent_write(bev, "\0" , 1);
bufferevent_write(bev, password.c_str(), password.length());
// Don't care about key nor time
op.type = Operation::SASL;
op_queue.push(op);
read_state = WAITING_FOR_SASL;
}
void Connection::issue_get(const char* key, uint16_t keylen, double now) {
Operation op;
binary_header h;
@ -507,6 +530,12 @@ void Connection::read_callback() {
break;
case WAITING_FOR_SASL:
assert(op_queue.size() > 0);
if (!consume_binary_response(input)) return;
pop_op();
break;
default: DIE("not implemented");
}
}
@ -537,6 +566,15 @@ bool Connection::consume_binary_response(evbuffer *input) {
stats.get_misses++;
}
#define unlikely(x) __builtin_expect((x),0)
if (unlikely(h->opcode == CMD_SASL)) {
if (h->status == RESP_OK) {
V("SASL authentication succeeded");
} else {
DIE("SASL authentication failed");
}
}
evbuffer_drain(input, targetLen);
stats.rx_bytes += targetLen;
return true;

View File

@ -39,6 +39,7 @@ public:
INIT_READ,
LOADING,
IDLE,
WAITING_FOR_SASL,
WAITING_FOR_GET,
WAITING_FOR_GET_DATA,
WAITING_FOR_END,
@ -70,6 +71,7 @@ public:
void start_loading();
void reset();
void doPlaintextSASL(string username, string password);
void event_callback(short events);
void read_callback();
@ -101,11 +103,5 @@ private:
KeyGenerator *keygen;
Generator *iagen;
// Pisces specific
bool useBinary;
//TODO(syang0) give meaning to SASL fields!
bool useSASL;
const char* username;
const char* password;
};

View File

@ -11,7 +11,7 @@ public:
double start_time, end_time;
enum type_enum {
GET, SET
GET, SET, SASL
};
type_enum type;

View File

@ -10,6 +10,10 @@
#define CMD_GET 0x00
#define CMD_SET 0x01
#define CMD_SASL 0x21
#define RESP_OK 0x00
#define RESP_SASL_ERR 0x20
typedef struct __attribute__ ((__packed__)) {
uint8_t magic;

View File

@ -32,6 +32,9 @@ text "\nAdvanced options:"
option "threads" T "Number of threads to spawn." int default="1"
option "connections" c "Connections to establish per server." int default="1"
option "depth" d "Maximum depth to pipeline requests." int default="1"
option "sasl" - "Perform a binary SASL authentication (plaintext) before \
issuing any requests. String format is user:pass (Note: this does NOT
automatically set --binary)" string
option "roundrobin" R "Assign threads to servers in round-robin fashion. \
By default, each thread connects to every server."

View File

@ -629,7 +629,7 @@ void do_mutilate(const vector<string>& servers, options_t& options,
for (int c = 0; c < options.connections; c++) {
Connection* conn = new Connection(base, evdns, hostname, port, options,
args.binary_given,
args.binary_given || args.sasl_given,
args.agentmode_given ? false :
true);
connections.push_back(conn);
@ -652,6 +652,36 @@ void do_mutilate(const vector<string>& servers, options_t& options,
else break;
}
// Do SASL and wait to complete
if(args.sasl_given) {
char *user, *pass;
saveptr = NULL;
user = strtok_r(args.sasl_arg, ":", &saveptr);
pass = strtok_r(NULL, ":", &saveptr);
if (user == NULL)
DIE("strtok(.., \":\") failed to parse %s", args.sasl_arg);
for (Connection *conn: connections) {
conn->doPlaintextSASL(user, (pass == NULL) ? "" : pass);
}
while (1) {
// FIXME: If all connections become ready before event_base_loop
// is called, this will deadlock.
event_base_loop(base, EVLOOP_ONCE);
bool restart = false;
for (Connection *conn: connections)
if (conn->read_state != Connection::IDLE)
restart = true;
if (restart) continue;
else break;
}
}
// Load database on lead connection for each server.
if (!options.noload) {
V("Loading database.");