From 386320d266a483fe41050267bbbd595230ac7a8a Mon Sep 17 00:00:00 2001 From: Jacob Leverich Date: Thu, 23 Aug 2012 14:30:32 -0700 Subject: [PATCH] Initial check-in. This is extracted from another git repo. This is the first release, and the prior commit history is not terribly interesting, so I'm not going to bother using filter-branch to try to cleanly isolate the history for this tool. Cheers. --- .gitignore | 10 + AdaptiveSampler.h | 98 +++++ AgentStats.h | 13 + COPYING | 24 ++ Connection.cc | 496 ++++++++++++++++++++++++ Connection.h | 102 +++++ ConnectionOptions.h | 39 ++ ConnectionStats.h | 184 +++++++++ Generator.cc | 78 ++++ Generator.h | 216 +++++++++++ HistogramSampler.h | 92 +++++ Operation.h | 26 ++ README.md | 4 +- SConstruct | 49 +++ TestGenerator.cc | 86 +++++ barrier.cc | 33 ++ barrier.h | 25 ++ cmdline.ggo | 79 ++++ distributions.cc | 34 ++ distributions.h | 14 + log.cc | 19 + log.h | 25 ++ mutilate.cc | 910 ++++++++++++++++++++++++++++++++++++++++++++ mutilate.h | 17 + util.cc | 31 ++ util.h | 52 +++ 26 files changed, 2755 insertions(+), 1 deletion(-) create mode 100644 .gitignore create mode 100644 AdaptiveSampler.h create mode 100644 AgentStats.h create mode 100644 COPYING create mode 100644 Connection.cc create mode 100644 Connection.h create mode 100644 ConnectionOptions.h create mode 100644 ConnectionStats.h create mode 100644 Generator.cc create mode 100644 Generator.h create mode 100644 HistogramSampler.h create mode 100644 Operation.h create mode 100644 SConstruct create mode 100644 TestGenerator.cc create mode 100644 barrier.cc create mode 100644 barrier.h create mode 100644 cmdline.ggo create mode 100644 distributions.cc create mode 100644 distributions.h create mode 100644 log.cc create mode 100644 log.h create mode 100644 mutilate.cc create mode 100644 mutilate.h create mode 100644 util.cc create mode 100644 util.h diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7e8d211 --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +*~ +*.[oda] +.scon* +config.log +gmon.out +mutilate +cmdline.cc +cmdline.h +gtest +config.h diff --git a/AdaptiveSampler.h b/AdaptiveSampler.h new file mode 100644 index 0000000..df9c2d4 --- /dev/null +++ b/AdaptiveSampler.h @@ -0,0 +1,98 @@ +/* -*- c++ -*- */ + +#ifndef ADAPTIVESAMPLER_H +#define ADAPTIVESAMPLER_H + +// Simple exponential-backoff adaptive time series sampler. Will +// record at most max_samples samples out of however many samples are +// thrown at it. Makes a vague effort to do this evenly over the +// samples given to it. The sampling is time invariant (i.e. if you +// start inserting samples at a slower rate, they will be +// under-represented). + +#include +#include +#include +#include +#include + +#include +#include + +#include "log.h" + +template class AdaptiveSampler { +public: + std::vector samples; + unsigned int sample_rate; + unsigned int max_samples; + unsigned int total_samples; + + AdaptiveSampler() = delete; + AdaptiveSampler(int max) : + sample_rate(1), max_samples(max), total_samples(0) { + } + + void sample(T s) { + total_samples++; + + if (drand48() < (1/(double) sample_rate)) + samples.push_back(s); + + // Throw out half of the samples, double sample_rate. + if (samples.size() >= max_samples) { + sample_rate *= 2; + + std::vector half_samples; + for (unsigned int i = 0; i < samples.size(); i++) { + if (drand48() > .5) half_samples.push_back(samples[i]); + } + samples = half_samples; + } + } + + void save_samples(const char* type, const char* filename) { + FILE *file; + + if ((file = fopen(filename, "a")) == NULL) { + W("fopen() failed: %s", strerror(errno)); + return; + } + + for (size_t i = 0; i < samples.size(); i++) { + fprintf(file, "%s %" PRIu64 " %f\n", type, i, samples[i]); + } + } + + double average() { + double result = 0.0; + size_t length = samples.size(); + for (size_t i = 0; i < length; i++) result += samples[i]; + return result/length; + } + + void print_header() { + printf("#%-6s %6s %8s %8s %8s %8s %8s %8s\n", "type", "size", + "min", "max", "avg", "90th", "95th", "99th"); + } + + void print_stats(const char *type, const char *size) { + std::vector samples_copy = samples; + size_t l = samples_copy.size(); + + if (l == 0) { + printf("%-7s %6s %8.1f %8.1f %8.1f %8.1f %8.1f %8.1f\n", type, size, + 0.0, 0.0, 0.0, 0.0, 0.0, 0.0); + return; + } + + sort(samples_copy.begin(), samples_copy.end()); + + printf("%-7s %6s %8.1f %8.1f %8.1f %8.1f %8.1f %8.1f\n", type, size, + samples_copy[0], samples_copy[l-1], average(), + samples_copy[(l*90)/100], samples_copy[(l*95)/100], + samples_copy[(l*99)/100]); + } +}; + +#endif // ADAPTIVESAMPLER_H diff --git a/AgentStats.h b/AgentStats.h new file mode 100644 index 0000000..ab92267 --- /dev/null +++ b/AgentStats.h @@ -0,0 +1,13 @@ +/* -*- c++ -*- */ +#ifndef AGENTSTATS_H +#define AGENTSTATS_H + +class AgentStats { +public: + uint64_t rx_bytes, tx_bytes; + uint64_t gets, sets, get_misses; + + double start, stop; +}; + +#endif // AGENTSTATS_H diff --git a/COPYING b/COPYING new file mode 100644 index 0000000..e4c26bb --- /dev/null +++ b/COPYING @@ -0,0 +1,24 @@ +Copyright (c) 2012, Jacob Leverich, Stanford University +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + * Neither the name of Stanford University nor the + names of its contributors may be used to endorse or promote products + derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL STANFORD UNIVERSITY BE LIABLE FOR ANY +DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/Connection.cc b/Connection.cc new file mode 100644 index 0000000..32bc5ce --- /dev/null +++ b/Connection.cc @@ -0,0 +1,496 @@ +#include + +#include +#include +#include +#include +#include +#include + +#include "config.h" + +#include "Connection.h" +#include "distributions.h" +#include "Generator.h" +#include "mutilate.h" +#include "util.h" + +Connection::Connection(struct event_base* _base, struct evdns_base* _evdns, + string _hostname, string _port, options_t _options, + bool sampling) : + hostname(_hostname), port(_port), start_time(0), + stats(sampling), options(_options), base(_base), evdns(_evdns) +{ + valuesize = createGenerator(options.valuesize); + keysize = createGenerator(options.keysize); + keygen = new KeyGenerator(keysize, options.records); + + if (options.lambda <= 0) { + iagen = createGenerator("0"); + } else { + D("iagen = createGenerator(%s)", options.ia); + iagen = createGenerator(options.ia); + iagen->set_lambda(options.lambda); + } + + read_state = INIT_READ; + write_state = INIT_WRITE; + + bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE); + bufferevent_setcb(bev, bev_read_cb, bev_write_cb, bev_event_cb, this); + bufferevent_enable(bev, EV_READ | EV_WRITE); + + if (bufferevent_socket_connect_hostname(bev, evdns, AF_UNSPEC, + hostname.c_str(), + atoi(port.c_str()))) + DIE("bufferevent_socket_connect_hostname()"); + + timer = evtimer_new(base, timer_cb, this); +} + +Connection::~Connection() { + event_free(timer); + timer = NULL; + + // FIXME: W("Drain op_q?"); + + bufferevent_free(bev); + + delete iagen; + delete keygen; + delete keysize; + delete valuesize; +} + +void Connection::reset() { + // FIXME: Actually check the connection, drain all bufferevents, drain op_q. + assert(op_queue.size() == 0); + evtimer_del(timer); + read_state = IDLE; + write_state = INIT_WRITE; + stats = ConnectionStats(stats.sampling); +} + +void Connection::issue_get(const char* key, double now) { + Operation op; + +#if HAVE_CLOCK_GETTIME + op.start_time = get_time_accurate(); +#else + if (now == 0.0) { +#if USE_CACHED_TIME + struct timeval now_tv; + event_base_gettimeofday_cached(base, &now_tv); + + op.start_time = tv_to_double(&now_tv); +#else + op.start_time = get_time(); +#endif + } else { + op.start_time = now; + } +#endif + + op.type = Operation::GET; + op.key = string(key); + + op_queue.push(op); + + if (read_state == IDLE) + read_state = WAITING_FOR_GET; + + int l = evbuffer_add_printf(bufferevent_get_output(bev), "get %s\r\n", key); + if (read_state != LOADING) stats.tx_bytes += l; +} + +void Connection::issue_set(const char* key, const char* value, int length, + double now) { + Operation op; + +#if HAVE_CLOCK_GETTIME + op.start_time = get_time_accurate(); +#else + if (now == 0.0) op.start_time = get_time(); + else op.start_time = now; +#endif + + op.type = Operation::SET; + op_queue.push(op); + + if (read_state == IDLE) + read_state = WAITING_FOR_SET; + + int l = evbuffer_add_printf(bufferevent_get_output(bev), + "set %s 0 0 %d\r\n", key, length); + l += bufferevent_write(bev, value, length); + l += bufferevent_write(bev, "\r\n", 2); + + if (read_state != LOADING) stats.tx_bytes += l; +} + +void Connection::issue_something(double now) { + char key[256]; + // FIXME: generate key distribution here! + string keystr = keygen->generate(lrand48() % options.records); + strcpy(key, keystr.c_str()); + // int key_index = lrand48() % options.records; + // generate_key(key_index, options.keysize, key); + + if (drand48() < options.update) { + int index = lrand48() % (1024 * 1024); + // issue_set(key, &random_char[index], options.valuesize, now); + issue_set(key, &random_char[index], valuesize->generate(), now); + } else { + issue_get(key, now); + } +} + +void Connection::pop_op() { + assert(op_queue.size() > 0); + + op_queue.pop(); + + if (read_state == LOADING) return; + read_state = IDLE; + + // Advance the read state machine. + if (op_queue.size() > 0) { + Operation& op = op_queue.front(); + switch (op.type) { + case Operation::GET: read_state = WAITING_FOR_GET; break; + case Operation::SET: read_state = WAITING_FOR_SET; break; + default: DIE("Not implemented."); + } + } +} + +bool Connection::check_exit_condition(double now) { + if (read_state == INIT_READ) return false; + if (now == 0.0) now = get_time(); + if (now > start_time + options.time) return true; + if (options.loadonly && read_state == IDLE) return true; + return false; +} + +// drive_write_machine() determines whether or not to issue a new +// command. Note that this function loops. Be wary of break +// vs. return. + +void Connection::drive_write_machine(double now) { + if (now == 0.0) now = get_time(); + + double delay; + struct timeval tv; + + if (check_exit_condition(now)) return; + + while (1) { + switch (write_state) { + case INIT_WRITE: + /* + if (options.iadist == EXPONENTIAL) + delay = generate_poisson(options.lambda); + else + delay = generate_uniform(options.lambda); + */ + delay = iagen->generate(); + + next_time = now + delay; + double_to_tv(delay, &tv); + evtimer_add(timer, &tv); + + write_state = WAITING_FOR_TIME; + break; + + case ISSUING: + if (op_queue.size() >= (size_t) options.depth) { + write_state = WAITING_FOR_OPQ; + return; + } else if (now < next_time) { + write_state = WAITING_FOR_TIME; + break; // We want to run through the state machine one more time + // to make sure the timer is armed. + } + + issue_something(now); + stats.log_op(op_queue.size()); + + /* + if (options.iadist == EXPONENTIAL) + next_time += generate_poisson(options.lambda); + else + next_time += generate_uniform(options.lambda); + */ + next_time += iagen->generate(); + + break; + + case WAITING_FOR_TIME: + if (now < next_time) { + if (!event_pending(timer, EV_TIMEOUT, NULL)) { + delay = next_time - now; + double_to_tv(delay, &tv); + evtimer_add(timer, &tv); + } + return; + } + + write_state = ISSUING; + break; + + case WAITING_FOR_OPQ: + if (op_queue.size() >= (size_t) options.depth) return; + write_state = ISSUING; + break; + + default: DIE("Not implemented"); + } + } +} + +void Connection::event_callback(short events) { + // struct timeval now_tv; + // event_base_gettimeofday_cached(base, &now_tv); + + if (events & BEV_EVENT_CONNECTED) { + D("Connected to %s:%s.", hostname.c_str(), port.c_str()); + int fd = bufferevent_getfd(bev); + if (fd < 0) DIE("bufferevent_getfd"); + + if (!options.no_nodelay) { + int one = 1; + if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, + (void *) &one, sizeof(one)) < 0) + DIE("setsockopt()"); + } + + read_state = IDLE; // This is the most important part! + } else if (events & BEV_EVENT_ERROR) { + int err = bufferevent_socket_get_dns_error(bev); + if (err) DIE("DNS error: %s", evutil_gai_strerror(err)); + + DIE("BEV_EVENT_ERROR: %s", strerror(errno)); + } else if (events & BEV_EVENT_EOF) { + DIE("Unexpected EOF from server."); + } +} + +void Connection::read_callback() { + struct evbuffer *input = bufferevent_get_input(bev); +#if USE_CACHED_TIME + struct timeval now_tv; + event_base_gettimeofday_cached(base, &now_tv); +#endif + + char *buf; + Operation *op = NULL; + int length; + size_t n_read_out; + + double now; + + // Protocol processing loop. + + if (op_queue.size() == 0) V("Spurious read callback."); + + while (1) { + if (op_queue.size() > 0) op = &op_queue.front(); + + switch (read_state) { + case INIT_READ: DIE("event from uninitialized connection"); + case IDLE: return; // We munched all the data we expected? + + case WAITING_FOR_GET: + assert(op_queue.size() > 0); + + buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF); + if (buf == NULL) return; // A whole line not received yet. Punt. + + stats.rx_bytes += n_read_out; // strlen(buf); + + if (!strcmp(buf, "END")) { + // D("GET (%s) miss.", op->key.c_str()); + stats.get_misses++; + +#if USE_CACHED_TIME + now = tv_to_double(&now_tv); +#else + now = get_time(); +#endif +#if HAVE_CLOCK_GETTIME + op->end_time = get_time_accurate(); +#else + op->end_time = now; +#endif + + stats.log_get(*op); + + free(buf); + + pop_op(); + drive_write_machine(); + break; + } else if (!strncmp(buf, "VALUE", 5)) { + sscanf(buf, "VALUE %*s %*d %d", &length); + + // FIXME: check key name to see if it corresponds to the op at + // the head of the op queue? This will be necessary to + // support "gets" where there may be misses. + + data_length = length; + read_state = WAITING_FOR_GET_DATA; + } + + free(buf); + + case WAITING_FOR_GET_DATA: + assert(op_queue.size() > 0); + + length = evbuffer_get_length(input); + + if (length >= data_length + 2) { + // FIXME: Actually parse the value? Right now we just drain it. + evbuffer_drain(input, data_length + 2); + read_state = WAITING_FOR_END; + + stats.rx_bytes += data_length + 2; + } else { + return; + } + case WAITING_FOR_END: + assert(op_queue.size() > 0); + + buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF); + if (buf == NULL) return; // Haven't received a whole line yet. Punt. + + stats.rx_bytes += n_read_out; + + if (!strcmp(buf, "END")) { +#if USE_CACHED_TIME + now = tv_to_double(&now_tv); +#else + now = get_time(); +#endif +#if HAVE_CLOCK_GETTIME + op->end_time = get_time_accurate(); +#else + op->end_time = now; +#endif + + stats.log_get(*op); + + free(buf); + + pop_op(); + drive_write_machine(now); + break; + } else { + DIE("Unexpected result when waiting for END"); + } + + case WAITING_FOR_SET: + assert(op_queue.size() > 0); + + buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF); + if (buf == NULL) return; // Haven't received a whole line yet. Punt. + + stats.rx_bytes += n_read_out; + + now = get_time(); + +#if HAVE_CLOCK_GETTIME + op->end_time = get_time_accurate(); +#else + op->end_time = now; +#endif + + stats.log_set(*op); + + free(buf); + + pop_op(); + drive_write_machine(now); + break; + + case LOADING: + assert(op_queue.size() > 0); + + buf = evbuffer_readln(input, NULL, EVBUFFER_EOL_CRLF); + if (buf == NULL) return; // Haven't received a whole line yet. + free(buf); + + loader_completed++; + pop_op(); + + if (loader_completed == options.records) { + D("Finished loading."); + read_state = IDLE; + } else { + while (loader_issued < loader_completed + LOADER_CHUNK) { + if (loader_issued >= options.records) break; + + char key[256]; + string keystr = keygen->generate(loader_issued); + strcpy(key, keystr.c_str()); + int index = lrand48() % (1024 * 1024); + // generate_key(loader_issued, options.keysize, key); + // issue_set(key, &random_char[index], options.valuesize); + issue_set(key, &random_char[index], valuesize->generate()); + + loader_issued++; + } + } + + break; + + default: DIE("not implemented"); + } + } +} + +void Connection::write_callback() {} +void Connection::timer_callback() { drive_write_machine(); } + +// The follow are C trampolines for libevent callbacks. +void bev_event_cb(struct bufferevent *bev, short events, void *ptr) { + Connection* conn = (Connection*) ptr; + conn->event_callback(events); +} + +void bev_read_cb(struct bufferevent *bev, void *ptr) { + Connection* conn = (Connection*) ptr; + conn->read_callback(); +} + +void bev_write_cb(struct bufferevent *bev, void *ptr) { + Connection* conn = (Connection*) ptr; + conn->write_callback(); +} + +void timer_cb(evutil_socket_t fd, short what, void *ptr) { + Connection* conn = (Connection*) ptr; + conn->timer_callback(); +} + +void Connection::set_priority(int pri) { + if (bufferevent_priority_set(bev, pri)) + DIE("bufferevent_set_priority(bev, %d) failed", pri); +} + +void Connection::start_loading() { + read_state = LOADING; + loader_issued = loader_completed = 0; + + for (int i = 0; i < LOADER_CHUNK; i++) { + if (loader_issued >= options.records) break; + + char key[256]; + int index = lrand48() % (1024 * 1024); + string keystr = keygen->generate(loader_issued); + strcpy(key, keystr.c_str()); + // generate_key(loader_issued, options.keysize, key); + // issue_set(key, &random_char[index], options.valuesize); + issue_set(key, &random_char[index], valuesize->generate()); + loader_issued++; + } +} diff --git a/Connection.h b/Connection.h new file mode 100644 index 0000000..5b7c1e2 --- /dev/null +++ b/Connection.h @@ -0,0 +1,102 @@ +// -*- c++-mode -*- + +#include +#include + +#include +#include +#include +#include + +#include "AdaptiveSampler.h" +#include "cmdline.h" +#include "ConnectionOptions.h" +#include "ConnectionStats.h" +#include "Generator.h" +#include "Operation.h" +#include "util.h" + +using namespace std; + +void bev_event_cb(struct bufferevent *bev, short events, void *ptr); +void bev_read_cb(struct bufferevent *bev, void *ptr); +void bev_write_cb(struct bufferevent *bev, void *ptr); +void timer_cb(evutil_socket_t fd, short what, void *ptr); + +class Connection { +public: + Connection(struct event_base* _base, struct evdns_base* _evdns, + string _hostname, string _port, options_t options, + bool sampling = true); + ~Connection(); + + string hostname; + string port; + + double start_time; // Time when this connection began operations. + + enum read_state_enum { + INIT_READ, + LOADING, + IDLE, + WAITING_FOR_GET, + WAITING_FOR_GET_DATA, + WAITING_FOR_END, + WAITING_FOR_SET, + MAX_READ_STATE, + }; + + enum write_state_enum { + INIT_WRITE, + ISSUING, + WAITING_FOR_TIME, + WAITING_FOR_OPQ, + MAX_WRITE_STATE, + }; + + read_state_enum read_state; + write_state_enum write_state; + + ConnectionStats stats; + + void issue_get(const char* key, double now = 0.0); + void issue_set(const char* key, const char* value, int length, + double now = 0.0); + void issue_something(double now = 0.0); + void pop_op(); + bool check_exit_condition(double now = 0.0); + void drive_write_machine(double now = 0.0); + + void start_loading(); + + void reset(); + + void event_callback(short events); + void read_callback(); + void write_callback(); + void timer_callback(); + + void set_priority(int pri); + + options_t options; + + std::queue op_queue; + +private: + struct event_base *base; + struct evdns_base *evdns; + struct bufferevent *bev; + + struct event *timer; // Used to control inter-transmission time. + double lambda, next_time; // Inter-transmission time parameters. + + int data_length; // When waiting for data, how much we're peeking for. + + // Parameters to track progress of the data loader. + int loader_issued, loader_completed; + + Generator *valuesize; + Generator *keysize; + KeyGenerator *keygen; + Generator *iagen; +}; diff --git a/ConnectionOptions.h b/ConnectionOptions.h new file mode 100644 index 0000000..b81a8e4 --- /dev/null +++ b/ConnectionOptions.h @@ -0,0 +1,39 @@ +#ifndef CONNECTIONOPTIONS_H +#define CONNECTIONOPTIONS_H + +#include "distributions.h" + +typedef struct { + int connections; + bool blocking; + double lambda; + int qps; + int records; + + char keysize[32]; + char valuesize[32]; + // int keysize; + // int valuesize; + char ia[32]; + + // qps_per_connection + // iadist + + double update; + int time; + bool loadonly; + int depth; + bool no_nodelay; + bool noload; + int threads; + enum distribution_t iadist; + int warmup; + + bool roundrobin; + int server_given; + int lambda_denom; + + bool oob_thread; +} options_t; + +#endif // CONNECTIONOPTIONS_H diff --git a/ConnectionStats.h b/ConnectionStats.h new file mode 100644 index 0000000..2b90920 --- /dev/null +++ b/ConnectionStats.h @@ -0,0 +1,184 @@ +/* -*- c++ -*- */ +#ifndef CONNECTIONSTATS_H +#define CONNECTIONSTATS_H + +#include +#include +#include + +#ifdef USE_ADAPTIVE_SAMPLER +#include "AdaptiveSampler.h" +#else +#include "HistogramSampler.h" +#endif +#include "AgentStats.h" +#include "Operation.h" + +using namespace std; + +class ConnectionStats { + public: + ConnectionStats(bool _sampling = true) : +#ifdef USE_ADAPTIVE_SAMPLER + get_sampler(100000), set_sampler(100000), op_sampler(100000), +#else + get_sampler(10000,1), set_sampler(10000,1), op_sampler(1000,1), +#endif + rx_bytes(0), tx_bytes(0), gets(0), sets(0), + get_misses(0), sampling(_sampling) {} + +#ifdef USE_ADAPTIVE_SAMPLER + AdaptiveSampler get_sampler; + AdaptiveSampler set_sampler; + AdaptiveSampler op_sampler; +#else + HistogramSampler get_sampler; + HistogramSampler set_sampler; + HistogramSampler op_sampler; +#endif + + uint64_t rx_bytes, tx_bytes; + uint64_t gets, sets, get_misses; + + double start, stop; + + bool sampling; + + void log_get(Operation& op) { if (sampling) get_sampler.sample(op); gets++; } + void log_set(Operation& op) { if (sampling) set_sampler.sample(op); sets++; } + void log_op (double op) { if (sampling) op_sampler.sample(op); } + + double get_qps() { + return (gets + sets) / (stop - start); + } + +#ifdef USE_ADAPTIVE_SAMPLER + double get_nth(double nth) { + vector samples; + + if (samples.size() == 0) return 0; + + for (auto s: get_sampler.samples) + samples.push_back(s.time()); // (s.end_time - s.start_time) * 1000000); + for (auto s: set_sampler.samples) + samples.push_back(s.time()); // (s.end_time - s.start_time) * 1000000); + + sort(samples.begin(), samples.end()); + + int l = samples.size(); + int i = (int)((nth * l) / 100); + + assert(i < l); + + return samples[i]; + } +#else + double get_nth(double nth) { + // FIXME: nth across gets & sets? + return get_sampler.get_nth(nth); + } +#endif + + void accumulate(const ConnectionStats &cs) { +#ifdef USE_ADAPTIVE_SAMPLER + for (auto i: cs.get_sampler.samples) get_sampler.sample(i); //log_get(i); + for (auto i: cs.set_sampler.samples) set_sampler.sample(i); //log_set(i); + for (auto i: cs.op_sampler.samples) op_sampler.sample(i); //log_op(i); +#else + get_sampler.accumulate(cs.get_sampler); + set_sampler.accumulate(cs.set_sampler); + op_sampler.accumulate(cs.op_sampler); +#endif + + rx_bytes += cs.rx_bytes; + tx_bytes += cs.tx_bytes; + gets += cs.gets; + sets += cs.sets; + get_misses += cs.get_misses; + + start = cs.start; + stop = cs.stop; + } + + void accumulate(const AgentStats &as) { + rx_bytes += as.rx_bytes; + tx_bytes += as.tx_bytes; + gets += as.gets; + sets += as.sets; + get_misses += as.get_misses; + + start = as.start; + stop = as.stop; + } + + static void print_header() { + printf("%-7s %7s %7s %7s %7s %7s %7s %7s %7s\n", + "#type", "avg", "min", "1st", "5th", "10th", + "90th", "95th", "99th"); + } + +#ifdef USE_ADAPTIVE_SAMPLER + void print_stats(const char *tag, AdaptiveSampler &sampler, + bool newline = true) { + vector copy; + + for (auto i: sampler.samples) copy.push_back(i.time()); + size_t l = copy.size(); + + if (l == 0) { + printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f", + tag, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0); + if (newline) printf("\n"); + return; + } + + sort(copy.begin(), copy.end()); + + printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f", + tag, std::accumulate(copy.begin(), copy.end(), 0.0) / l, + copy[0], copy[(l*1) / 100], copy[(l*5) / 100], copy[(l*10) / 100], + copy[(l*90) / 100], copy[(l*95) / 100], copy[(l*99) / 100] + ); + if (newline) printf("\n"); + } + + void print_stats(const char *tag, AdaptiveSampler &sampler, + bool newline = true) { + vector copy; + + for (auto i: sampler.samples) copy.push_back(i); + size_t l = copy.size(); + + if (l == 0) { printf("%-7s 0", tag); if (newline) printf("\n"); return; } + + sort(copy.begin(), copy.end()); + + printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f", + tag, std::accumulate(copy.begin(), copy.end(), 0.0) / l, + copy[0], copy[(l*1) / 100], copy[(l*5) / 100], copy[(l*10) / 100], + copy[(l*90) / 100], copy[(l*95) / 100], copy[(l*99) / 100] + ); + if (newline) printf("\n"); + } +#else + void print_stats(const char *tag, HistogramSampler &sampler, + bool newline = true) { + if (sampler.total() == 0) { + printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f", + tag, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0); + if (newline) printf("\n"); + return; + } + + printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f", + tag, sampler.average(), + sampler.get_nth(0), sampler.get_nth(1), sampler.get_nth(5), + sampler.get_nth(10), sampler.get_nth(90), + sampler.get_nth(95), sampler.get_nth(99)); + + if (newline) printf("\n"); + } +#endif +}; + +#endif // CONNECTIONSTATS_H diff --git a/Generator.cc b/Generator.cc new file mode 100644 index 0000000..ac5acef --- /dev/null +++ b/Generator.cc @@ -0,0 +1,78 @@ +// -*- c++ -*- + +#include "config.h" + +#include "Generator.h" + +Generator* createFacebookKey() { return new GEV(30.7984, 8.20449, 0.078688); } + +Generator* createFacebookValue() { + Generator* g = new GPareto(214.476, 0.348238); + + Discrete* d = new Discrete(g); + d->add(0.00536, 0.0); + d->add(0.00047, 1.0); + d->add(0.17820, 2.0); + d->add(0.09239, 3.0); + d->add(0.00018, 4.0); + d->add(0.02740, 5.0); + d->add(0.00065, 6.0); + d->add(0.00606, 7.0); + d->add(0.00023, 8.0); + d->add(0.00837, 9.0); + d->add(0.00837, 10.0); + d->add(0.08989, 11.0); + d->add(0.00092, 12.0); + d->add(0.00326, 13.0); + d->add(0.01980, 14.0); + + return d; +} + +Generator* createFacebookIA() { return new GPareto(16.0292, 0.154971); } + +Generator* createGenerator(std::string str) { + if (!strcmp(str.c_str(), "fb_key")) return createFacebookKey(); + else if (!strcmp(str.c_str(), "fb_value")) return createFacebookValue(); + else if (!strcmp(str.c_str(), "fb_ia")) return createFacebookIA(); + + char *s_copy = new char[str.length() + 1]; + strcpy(s_copy, str.c_str()); + char *saveptr = NULL; + + if (atoi(s_copy) != 0 || !strcmp(s_copy, "0")) { + double v = atof(s_copy); + delete[] s_copy; + return new Fixed(v); + } + + char *t_ptr = strtok_r(s_copy, ":", &saveptr); + char *a_ptr = strtok_r(NULL, ":", &saveptr); + + if (t_ptr == NULL) // || a_ptr == NULL) + DIE("strtok(.., \":\") failed to parse %s", str.c_str()); + + char t = t_ptr[0]; + + saveptr = NULL; + char *s1 = strtok_r(a_ptr, ",", &saveptr); + char *s2 = strtok_r(NULL, ",", &saveptr); + char *s3 = strtok_r(NULL, ",", &saveptr); + + double a1 = s1 ? atof(s1) : 0.0; + double a2 = s2 ? atof(s2) : 0.0; + double a3 = s3 ? atof(s3) : 0.0; + + delete[] s_copy; + + if (strcasestr(str.c_str(), "fixed")) return new Fixed(a1); + else if (strcasestr(str.c_str(), "normal")) return new Normal(a1, a2); + else if (strcasestr(str.c_str(), "exponential")) return new Exponential(a1); + else if (strcasestr(str.c_str(), "pareto")) return new GPareto(a1, a2); + else if (strcasestr(str.c_str(), "gev")) return new GEV(a1, a2, a3); + else if (strcasestr(str.c_str(), "uniform")) return new Uniform(a1); + + DIE("Unable to create Generator '%s'", str.c_str()); + + return NULL; +} diff --git a/Generator.h b/Generator.h new file mode 100644 index 0000000..66c4d2f --- /dev/null +++ b/Generator.h @@ -0,0 +1,216 @@ +// -*- c++ -*- + +// 1. implement "fixed" generator +// 2. implement discrete generator +// 3. implement combine generator? + +#ifndef GENERATOR_H +#define GENERATOR_H + +#define MAX(a,b) ((a) > (b) ? (a) : (b)) + +#include "config.h" + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "log.h" +#include "util.h" + +// Generator syntax: +// +// \d+ == fixed +// n[ormal]:mean,sd +// e[xponential]:lambda +// p[areto]:scale,shape +// g[ev]:loc,scale,shape +// fb_value, fb_key, fb_rate + +class Generator { +public: + Generator() {} + // Generator(const Generator &g) = delete; + // virtual Generator& operator=(const Generator &g) = delete; + virtual ~Generator() {} + + virtual double generate(double U = -1.0) = 0; + virtual void set_lambda(double lambda) {DIE("set_lambda() not implemented");} +protected: + std::string type; +}; + +class Fixed : public Generator { +public: + Fixed(double _value = 1.0) : value(_value) { D("Fixed(%f)", value); } + virtual double generate(double U = -1.0) { return value; } + virtual void set_lambda(double lambda) { + if (lambda > 0.0) value = 1.0 / lambda; + else value = 0.0; + } + +private: + double value; +}; + +class Uniform : public Generator { +public: + Uniform(double _scale) : scale(_scale) { D("Uniform(%f)", scale); } + + virtual double generate(double U = -1.0) { + if (U < 0.0) U = drand48(); + return scale * U; + } + + virtual void set_lambda(double lambda) { + if (lambda > 0.0) scale = 2.0 / lambda; + else scale = 0.0; + } + +private: + double scale; +}; + +class Normal : public Generator { +public: + Normal(double _mean = 1.0, double _sd = 1.0) : mean(_mean), sd(_sd) { + D("Normal(mean=%f, sd=%f)", mean, sd); + } + + virtual double generate(double U = -1.0) { + if (U < 0.0) U = drand48(); + double V = U; // drand48(); + double N = sqrt(-2 * log(U)) * cos(2 * M_PI * V); + return mean + sd * N; + } + + virtual void set_lambda(double lambda) { + if (lambda > 0.0) mean = 1.0 / lambda; + else mean = 0.0; + } + +private: + double mean, sd; +}; + +class Exponential : public Generator { +public: + Exponential(double _lambda = 1.0) : lambda(_lambda) { + D("Exponential(lambda=%f)", lambda); + } + + virtual double generate(double U = -1.0) { + if (lambda <= 0.0) return 0.0; + if (U < 0.0) U = drand48(); + return -log(U) / lambda; + } + + virtual void set_lambda(double lambda) { this->lambda = lambda; } + +private: + double lambda; +}; + +class GPareto : public Generator { +public: + GPareto(double _scale = 1.0, double _shape = 1.0) : + scale(_scale), shape(_shape) { + assert(shape != 0.0); + D("GPareto(scale=%f, shape=%f)", scale, shape); + } + + virtual double generate(double U = -1.0) { + if (U < 0.0) U = drand48(); + return scale * (pow(U, -shape) - 1) / shape; + } + + virtual void set_lambda(double lambda) { + if (lambda <= 0.0) scale = 0.0; + else scale = (1 - shape) / lambda; + } + +private: + double scale /* sigma */, shape /* k */; +}; + +class GEV : public Generator { +public: + GEV(double _loc = 0.0, double _scale = 1.0, double _shape = 1.0) : + e(1.0), loc(_loc), scale(_scale), shape(_shape) { + assert(shape != 0.0); + D("GEV(loc=%f, scale=%f, shape=%f)", loc, scale, shape); + } + + virtual double generate(double U = -1.0) { + return loc + scale * (pow(e.generate(U), -shape) - 1) / shape; + } + +private: + Exponential e; + double loc /* mu */, scale /* sigma */, shape /* k */; +}; + +class Discrete : public Generator { +public: + ~Discrete() { delete def; } + Discrete(Generator* _def = NULL) : def(_def) { + if (def == NULL) def = new Fixed(0.0); + } + + virtual double generate(double U = -1.0) { + double Uc = U; + if (pv.size() > 0 && U < 0.0) U = drand48(); + + double sum = 0; + + for (auto p: pv) { + sum += p.first; + if (U < sum) return p.second; + } + + return def->generate(Uc); + // return 0.0; + } + + void add(double p, double v) { + pv.push_back(std::pair(p, v)); + } + +private: + Generator *def; + std::vector< std::pair > pv; +}; + +class KeyGenerator { +public: + KeyGenerator(Generator* _g, double _max = 10000) : g(_g), max(_max) {} + std::string generate(uint64_t ind) { + uint64_t h = fnv_64(ind); + double U = (double) h / ULLONG_MAX; + double G = g->generate(U); + int keylen = MAX(round(G), floor(log10(max)) + 1); + char key[256]; + snprintf(key, 256, "%0*" PRIu64, keylen, ind); + + // D("%d = %s", ind, key); + return std::string(key); + } +private: + Generator* g; + double max; +}; + +Generator* createGenerator(std::string str); +Generator* createFacebookKey(); +Generator* createFacebookValue(); +Generator* createFacebookIA(); + +#endif // GENERATOR_H diff --git a/HistogramSampler.h b/HistogramSampler.h new file mode 100644 index 0000000..f764029 --- /dev/null +++ b/HistogramSampler.h @@ -0,0 +1,92 @@ +/* -*- c++ -*- */ +#ifndef HISTOGRAMSAMPLER_H +#define HISTOGRAMSAMPLER_H + +#include + +#include +#include + +#include "Operation.h" + +// parameters: # of bins, range? size of bins? + +class HistogramSampler { +public: + std::vector bins; + int width; + + double overflow_sum; + + HistogramSampler() = delete; + HistogramSampler(int _bins, int _width) : overflow_sum(0.0) { + assert(_bins > 0 && _width > 0); + + bins.resize(_bins + 1, 0); + width = _width; + } + + void sample(const Operation &op) { + sample(op.time()); + } + + void sample(double s) { + assert(s >= 0); + size_t bin = s / width; + + if (bin >= bins.size()) { + bin = bins.size() - 1; + overflow_sum += s; + } + + bins[bin]++; + } + + double average() { + uint64_t count = total(); + double sum = 0.0; + + for (size_t i = 0; i < bins.size() - 1; i++) { + sum += bins[i] * (i*width + (i+1)*width) / 2; + } + + sum += overflow_sum; + + return sum / count; + } + + double get_nth(double nth) { + uint64_t count = total(); + uint64_t n = 0; + double target = count * nth/100; + + for (size_t i = 0; i < bins.size(); i++) { + n += bins[i]; + + if (n > target) { // The nth is inside bins[i]. + double left = target - (n - bins[i]); + return i*width + left / bins[i] * width; + } + } + + return bins.size() * width; + } + + uint64_t total() { + uint64_t sum = 0.0; + + for (auto i: bins) sum += i; + + return sum; + } + + void accumulate(const HistogramSampler &h) { + assert(width == h.width && bins.size() == h.bins.size()); + + for (size_t i = 0; i < bins.size(); i++) bins[i] += h.bins[i]; + + overflow_sum += h.overflow_sum; + } +}; + +#endif // HISTOGRAMSAMPLER_H diff --git a/Operation.h b/Operation.h new file mode 100644 index 0000000..b963c30 --- /dev/null +++ b/Operation.h @@ -0,0 +1,26 @@ +// -*- c++-mode -*- +#ifndef OPERATION_H +#define OPERATION_H + +#include + +using namespace std; + +class Operation { +public: + double start_time, end_time; + + enum type_enum { + GET, SET + }; + + type_enum type; + + string key; + // string value; + + double time() const { return (end_time - start_time) * 1000000; } +}; + + +#endif // OPERATION_H diff --git a/README.md b/README.md index a58aceb..f63a090 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,6 @@ mutilate ======== -Mutilate: high-performance memcached load generator \ No newline at end of file +Mutilate is a memcached load generator designed for high request +rates, good tail-latency measurements, and realistic request stream +generation. diff --git a/SConstruct b/SConstruct new file mode 100644 index 0000000..1cfd93b --- /dev/null +++ b/SConstruct @@ -0,0 +1,49 @@ +#!/usr/bin/python +import os + +env = Environment(ENV = os.environ) + +env['HAVE_POSIX_BARRIER'] = True + +env.Append(CPPPATH = ['/usr/local/include']) +env.Append(CCFLAGS = '-std=c++0x -D_GNU_SOURCE') # -D__STDC_FORMAT_MACROS') + +conf = env.Configure(config_h = "config.h") +conf.Define("__STDC_FORMAT_MACROS") +if not conf.CheckCXX(): + print "A compiler with C++11 support is required." + Exit(1) +print "Checking for gengetopt...", +if Execute("@which gengetopt &> /dev/null"): + print "not found (required)" + Exit(1) +else: print "found" +if not conf.CheckLibWithHeader("event", "event2/event.h", "C++"): + print "libevent required" + Exit(1) +if not conf.CheckLibWithHeader("pthread", "pthread.h", "C++"): + print "pthread required" + Exit(1) +conf.CheckLib("rt", "clock_gettime", language="C++") +conf.CheckLibWithHeader("zmq", "zmq.hpp", "C++") +conf.CheckFunc('clock_gettime') +if conf.CheckFunc('pthread_barrier_init'): + conf.env['HAVE_POSIX_BARRIER'] = False + +env = conf.Finish() + +env.Append(CFLAGS = ' -O3 -Wall -g') +#env.Append(CPPFLAGS = ' -D_GNU_SOURCE -D__STDC_FORMAT_MACROS') +#env.Append(CPPFLAGS = ' -DUSE_ADAPTIVE_SAMPLER') + +env.Command(['cmdline.cc', 'cmdline.h'], 'cmdline.ggo', 'gengetopt < $SOURCE') + +src = Split("""mutilate.cc cmdline.cc log.cc distributions.cc util.cc + Connection.cc Generator.cc""") + +if not env['HAVE_POSIX_BARRIER']: # USE_POSIX_BARRIER: + src += ['barrier.cc'] + +env.Program(target='mutilate', source=src) +env.Program(target='gtest', source=['TestGenerator.cc', 'log.cc', 'util.cc', + 'Generator.cc']) diff --git a/TestGenerator.cc b/TestGenerator.cc new file mode 100644 index 0000000..30696a9 --- /dev/null +++ b/TestGenerator.cc @@ -0,0 +1,86 @@ +#include "config.h" + +#include +#include +#include +#include + +#include +#include +#include + +#include "Generator.h" +#include "util.h" + +int main(int argc, char **argv) { + // double now = get_time(); + // uint64_t x = fnv_64_buf(&now, sizeof(now)); + + srand48(0xdeadbeef); + + /* + Generator *n = createGenerator("n:1,1"); // new Normal(1, 1); + Generator *e = createGenerator("e:1"); // new Exponential(1); + Generator *p = createGenerator("p:214.476,0.348238"); // new GPareto(214.476, 0.348238); + Generator *g = createGenerator("g:30.7984,8.20449,0.078688"); // new GEV(30.7984, 8.20449, 0.078688); + + printf("%f\n", n->generate()); + printf("%f\n", e->generate()); + printf("%f\n", p->generate()); + printf("%f\n", g->generate()); + + srand48(0); + + printf("\n\n"); + + Discrete *d = new Discrete(createGenerator("p:214.476,0.348238")); + // d->add(.5, -1.0); + //Generator *d = createGenerator("p:214.476,0.348238"); + + for (int i = 0; i < 20; i++) { + printf("d %d\n", (int) d->generate()); + } + + printf("\n\n"); + srand48(0); + + //Discrete *d2 = new Discrete(createGenerator("p:214.476,0.348238")); + // d->add(.5, -1.0); + Generator *d2 = createGenerator("p:214.476,0.348238"); + + for (int i = 0; i < 20; i++) { + printf("d %d\n", (int) d2->generate()); + } + + KeyGenerator kg(g); + */ + + Generator *p2 = createGenerator("p:214.476,0.348238"); + // for (int i = 0; i < 1000; i++) + // printf("%f\n", p2->generate()); + + p2->set_lambda(1000); + for (int i = 0; i < 1000; i++) + printf("%f\n", p2->generate()); + + // for (int i = 0; i < 10000; i++) + // printf("%s\n", kg.generate(i).c_str()); + + /* + for (uint64_t ind = 0; ind < 10000; ind++) { + // uint64_t ind = 0; + uint64_t h = fnv_64(ind); + double U = (double) h / ULLONG_MAX; + // double E = e->generate(U); // -log(U); + double G = g->generate(U); + int keylen = MAX(round(G), floor(log10(10000)) + 1); + + // printf("ind=%" PRIu64 "\n", ind); + // printf("h=%" PRIu64 "\n", h); + // printf("U=%f\n", U); + // printf("G=%f\n", G); + // printf("keylen=%d\n", keylen); + printf("%7" PRIu64 " %7d key=%0*" PRIu64 "\n", ind, keylen, keylen, ind); + } + */ +} diff --git a/barrier.cc b/barrier.cc new file mode 100644 index 0000000..1a59df9 --- /dev/null +++ b/barrier.cc @@ -0,0 +1,33 @@ +#include + +#include "barrier.h" + +int barrier_init(barrier_t *barrier,int needed) +{ + barrier->needed = needed; + barrier->called = 0; + pthread_mutex_init(&barrier->mutex,NULL); + pthread_cond_init(&barrier->cond,NULL); + return 0; +} + +int barrier_destroy(barrier_t *barrier) +{ + pthread_mutex_destroy(&barrier->mutex); + pthread_cond_destroy(&barrier->cond); + return 0; +} + +int barrier_wait(barrier_t *barrier) +{ + pthread_mutex_lock(&barrier->mutex); + barrier->called++; + if (barrier->called == barrier->needed) { + barrier->called = 0; + pthread_cond_broadcast(&barrier->cond); + } else { + pthread_cond_wait(&barrier->cond,&barrier->mutex); + } + pthread_mutex_unlock(&barrier->mutex); + return 0; +} diff --git a/barrier.h b/barrier.h new file mode 100644 index 0000000..17de3c8 --- /dev/null +++ b/barrier.h @@ -0,0 +1,25 @@ +#ifndef BARRIER_H +#define BARRIER_H + +#include "config.h" + +typedef struct { + int needed; + int called; + pthread_mutex_t mutex; + pthread_cond_t cond; +} barrier_t; + +int barrier_init(barrier_t *barrier,int needed); +int barrier_destroy(barrier_t *barrier); +int barrier_wait(barrier_t *barrier); + +#ifndef HAVE_PTHREAD_BARRIER_INIT +#define pthread_barrier_t barrier_t +#define pthread_barrier_attr_t barrier_attr_t +#define pthread_barrier_init(b,a,n) barrier_init(b,n) +#define pthread_barrier_destroy(b) barrier_destroy(b) +#define pthread_barrier_wait(b) barrier_wait(b) +#endif + +#endif // BARRIER_H diff --git a/cmdline.ggo b/cmdline.ggo new file mode 100644 index 0000000..a3e1b6b --- /dev/null +++ b/cmdline.ggo @@ -0,0 +1,79 @@ +package "mutilate3" +version "0.1" +usage "mutilate -s server[:port] [options]" +description "\"High-performance\" memcached benchmarking tool" + +args "-c cc --show-required -C --default-optional -l" + +option "verbose" v "Verbosity. Repeat for more verbose." multiple +option "quiet" - "Disable log messages." + +text "\nBasic options:" + +option "server" s "Memcached server hostname[:port]. \ +Repeat to specify multiple servers." string multiple +option "qps" q "Target aggregate QPS." int default="0" +option "time" t "Maximum time to run (seconds)." int default="5" + +option "keysize" K "Length of memcached keys (distribution)." + string default="30" +option "valuesize" V "Length of memcached values (distribution)." + string default="200" + +option "records" r "Number of memcached records to use. \ +If multiple memcached servers are given, this number is divided \ +by the number of servers." int default="10000" + +option "update" u "Ratio of set:get commands." float default="0.0" + +text "\nAdvanced options:" + +option "threads" T "Number of threads to spawn." int default="1" +option "connections" c "Connections to establish per server." int default="1" +option "depth" d "Maximum depth to pipeline requests." int default="1" +option "roundrobin" R "Assign threads to servers in round-robin fashion. \ +By default, each thread connects to every server." + +option "iadist" i "Inter-arrival distribution (distribution)." + string default="exponential" + +option "noload" - "Skip database loading." +option "loadonly" - "Load database and then exit." + +option "blocking" B "Use blocking epoll(). May increase latency." +option "no_nodelay" D "Don't use TCP_NODELAY." + +option "warmup" w "Warmup time before starting measurement." int +option "wait" W "Time to wait after startup to start measurement." int + +option "search" S "Search for the QPS where N-order statistic < Xus. \ +(i.e. --search 95:1000 means find the QPS where 95% of requests are \ +faster than 1000us)." string typestr="N:X" +option "scan" - "Scan latency across QPS rates from min to max." + string typestr="min:max:step" + +text "\nAgent-mode options:" +option "agentmode" A "Run client in agent mode." +option "agent" a "Enlist remote agent." string typestr="host" multiple +option "lambda_mul" l "Lambda multiplier. Increases share of QPS for this client." int default="1" + +text " +Some options take a 'distribution' as an argument. +Distributions are specified by [:[,...]]. +Parameters are not required. The following distributions are supported: + + [fixed:] Always generates . + uniform: Uniform distribution between 0 and . + normal:, Normal distribution. + exponential: Exponential distribution. + pareto:, Generalized Pareto distribution. + gev:,, Generalized Extreme Value distribution. + + fb_key ETC key-size distribution from [1]. + fb_value ETC value-size distribution from [1]. + fb_ia ETC inter-arrival distribution from [1]. + +[1] Berk Atikoglu et al., Workload Analysis of a Large-Scale Key-Value Store, + SIGMETRICS 2012 +" + diff --git a/distributions.cc b/distributions.cc new file mode 100644 index 0000000..ce939e7 --- /dev/null +++ b/distributions.cc @@ -0,0 +1,34 @@ +#include +#include +#include + +#include "distributions.h" +#include "log.h" + +const char* distributions[] = + { "uniform", "exponential", "zipfian", "latest", NULL }; + +distribution_t get_distribution(const char *name) { + for (int i = 0; distributions[i] != NULL; i++) + if (!strcmp(distributions[i], name)) + return (distribution_t) i; + return (distribution_t) -1; +} + +double generate_normal(double mean, double sd) { + double U = drand48(); + double V = drand48(); + double N = sqrt(-2 * log(U)) * cos(2 * M_PI * V); + return mean + sd * N; +} + +double generate_poisson(double lambda) { + if (lambda <= 0.0) return 0; + double U = drand48(); + return -log(U)/lambda; +} + +double generate_uniform(double lambda) { + if (lambda <= 0.0) return 0; + return 1.0 / lambda; +} diff --git a/distributions.h b/distributions.h new file mode 100644 index 0000000..de2a2de --- /dev/null +++ b/distributions.h @@ -0,0 +1,14 @@ +#ifndef DISTRIBUTIONS_H +#define DISTRIBUTIONS_H + +// If you change this, make sure to update distributions.cc. +enum distribution_t { UNIFORM, EXPONENTIAL, ZIPFIAN, LATEST }; +extern const char* distributions[]; + +double generate_normal(double mean, double sd); +double generate_poisson(double lambda); +double generate_uniform(double lambda); + +distribution_t get_distribution(const char *name); + +#endif // DISTRIBUTIONS_H diff --git a/log.cc b/log.cc new file mode 100644 index 0000000..e595153 --- /dev/null +++ b/log.cc @@ -0,0 +1,19 @@ +#include +#include + +#include "log.h" + +log_level_t log_level = INFO; + +void log_file_line(log_level_t level, const char *file, int line, + const char *format, ...) { + va_list args; + char new_format[512]; + + snprintf(new_format, sizeof(new_format), "%s(%d): %s\n", file, line, format); + + va_start(args, format); + if (level >= log_level) + vfprintf(stderr, new_format, args); + va_end(args); +} diff --git a/log.h b/log.h new file mode 100644 index 0000000..8566681 --- /dev/null +++ b/log.h @@ -0,0 +1,25 @@ +#ifndef LOG_H +#define LOG_H + +enum log_level_t { DEBUG, VERBOSE, INFO, WARN, QUIET }; +extern log_level_t log_level; + +void log_file_line(log_level_t level, const char *file, int line, + const char* format, ...); +#define L(level, args...) log_file_line(level, __FILE__, __LINE__, args) + +#define D(args...) L(DEBUG, args) +#define V(args...) L(VERBOSE, args) +#define I(args...) L(INFO, args) +#define W(args...) L(WARN, args) + +#define DIE(args...) do { W(args); exit(-1); } while (0) + +#define NOLOG(x) \ + do { log_level_t old = log_level; \ + log_level = QUIET; \ + (x); \ + log_level = old; \ + } while (0) + +#endif // LOG_H diff --git a/mutilate.cc b/mutilate.cc new file mode 100644 index 0000000..98fe0cf --- /dev/null +++ b/mutilate.cc @@ -0,0 +1,910 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "config.h" + +#ifdef HAVE_LIBZMQ +#include +#endif + +#include "AdaptiveSampler.h" +#include "AgentStats.h" +#ifndef HAVE_PTHREAD_BARRIER_INIT +#include "barrier.h" +#endif +#include "cmdline.h" +#include "Connection.h" +#include "ConnectionOptions.h" +#include "log.h" +#include "mutilate.h" +#include "util.h" + +#define MIN(a,b) ((a) < (b) ? (a) : (b)) + +using namespace std; + +gengetopt_args_info args; +char random_char[2 * 1024 * 1024]; // Buffer used to generate random values. + +#ifdef HAVE_LIBZMQ +vector agent_sockets; +zmq::context_t context(1); +#endif + +struct thread_data { + const vector *servers; + options_t *options; + bool master; +#ifdef HAVE_LIBZMQ + zmq::socket_t *socket; +#endif +}; + +// struct evdns_base *evdns; + +pthread_barrier_t barrier; + +double boot_time; + +void init_random_stuff(); + +void go(const vector &servers, options_t &options, + ConnectionStats &stats +#ifdef HAVE_LIBZMQ +, zmq::socket_t* socket = NULL +#endif +); + +void do_mutilate(const vector &servers, options_t &options, + ConnectionStats &stats, bool master = true +#ifdef HAVE_LIBZMQ +, zmq::socket_t* socket = NULL +#endif +); +void args_to_options(options_t* options); +void* thread_main(void *arg); + +#ifdef HAVE_LIBZMQ +static std::string s_recv (zmq::socket_t &socket) { + zmq::message_t message; + socket.recv(&message); + + return std::string(static_cast(message.data()), message.size()); +} + +// Convert string to 0MQ string and send to socket +static bool s_send (zmq::socket_t &socket, const std::string &string) { + zmq::message_t message(string.size()); + memcpy(message.data(), string.data(), string.size()); + + return socket.send(message); +} + +void agent() { + zmq::context_t context(1); + + zmq::socket_t socket(context, ZMQ_REP); + socket.bind("tcp://*:5555"); + + while (true) { + zmq::message_t request; + + socket.recv(&request); + + zmq::message_t num(sizeof(int)); + *((int *) num.data()) = args.threads_arg * args.lambda_mul_arg; + socket.send(num); + + options_t options; + memcpy(&options, request.data(), sizeof(options)); + + vector servers; + + for (int i = 0; i < options.server_given; i++) { + servers.push_back(s_recv(socket)); + s_send(socket, "ACK"); + } + + for (auto i: servers) { + V("Got server = %s", i.c_str()); + } + + options.threads = args.threads_arg; + + socket.recv(&request); + options.lambda_denom = *((int *) request.data()); + s_send(socket, "THANKS"); + + // V("AGENT SLEEPS"); sleep(1); + options.lambda = (double) options.qps / options.lambda_denom * args.lambda_mul_arg; + + // if (options.threads > 1) + pthread_barrier_init(&barrier, NULL, options.threads); + + ConnectionStats stats; + + go(servers, options, stats, &socket); + + AgentStats as; + + as.rx_bytes = stats.rx_bytes; + as.tx_bytes = stats.tx_bytes; + as.gets = stats.gets; + as.sets = stats.sets; + as.get_misses = stats.get_misses; + as.start = stats.start; + as.stop = stats.stop; + + string req = s_recv(socket); + // V("req = %s", req.c_str()); + request.rebuild(sizeof(as)); + memcpy(request.data(), &as, sizeof(as)); + socket.send(request); + } +} + +void prep_agent(const vector& servers, options_t& options) { + int sum = options.lambda_denom; + + for (auto s: agent_sockets) { + zmq::message_t message(sizeof(options_t)); + + memcpy((void *) message.data(), &options, sizeof(options_t)); + s->send(message); + + zmq::message_t rep; + s->recv(&rep); + unsigned int num = *((int *) rep.data()); + + sum += options.connections * (options.roundrobin ? + (servers.size() > num ? servers.size() : num) : + (servers.size() * num)); + + for (auto i: servers) { + s_send(*s, i); + string rep = s_recv(*s); + // V("Reply: %s", rep.c_str()); + } + } + + options.lambda_denom = sum; + options.lambda = (double) options.qps / options.lambda_denom * args.lambda_mul_arg; + + V("lambda_denom = %d", sum); + + for (auto s: agent_sockets) { + zmq::message_t message(sizeof(sum)); + *((int *) message.data()) = sum; + s->send(message); + string rep = s_recv(*s); + } + + V("MASTER SLEEPS"); sleep_time(1.5); +} + +void finish_agent(ConnectionStats &stats) { + for (auto s: agent_sockets) { + s_send(*s, "stats"); + + AgentStats as; + zmq::message_t message; + + s->recv(&message); + memcpy(&as, message.data(), sizeof(as)); + stats.accumulate(as); + } +} + +void sync_agent(zmq::socket_t* socket) { + // V("agent: synchronizing"); + + if (args.agent_given) { + for (auto s: agent_sockets) { + s_send(*s, "sync1"); + string rep = s_recv(*s); + } + } else if (args.agentmode_given) { + string req = s_recv(*socket); + s_send(*socket, "sync"); + } + + // V("agent: synchronized"); +} +#endif + +string name_to_ipaddr(string host) { + char *s_copy = new char[host.length() + 1]; + strcpy(s_copy, host.c_str()); + + char *saveptr = NULL; // For reentrant strtok(). + + char *h_ptr = strtok_r(s_copy, ":", &saveptr); + char *p_ptr = strtok_r(NULL, ":", &saveptr); + + char ipaddr[16]; + + if (h_ptr == NULL) + DIE("strtok(.., \":\") failed to parse %s", host.c_str()); + + string hostname = h_ptr; + string port = "11211"; + if (p_ptr) port = p_ptr; + + struct evutil_addrinfo hints; + struct evutil_addrinfo *answer = NULL; + int err; + + /* Build the hints to tell getaddrinfo how to act. */ + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; /* v4 or v6 is fine. */ + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; /* We want a TCP socket */ + /* Only return addresses we can use. */ + hints.ai_flags = EVUTIL_AI_ADDRCONFIG; + + /* Look up the hostname. */ + err = evutil_getaddrinfo(h_ptr, NULL, &hints, &answer); + if (err < 0) { + DIE("Error while resolving '%s': %s", + host.c_str(), evutil_gai_strerror(err)); + } + + if (answer == NULL) DIE("No DNS answer."); + + void *ptr = NULL; + switch (answer->ai_family) { + case AF_INET: + ptr = &((struct sockaddr_in *) answer->ai_addr)->sin_addr; + break; + case AF_INET6: + ptr = &((struct sockaddr_in6 *) answer->ai_addr)->sin6_addr; + break; + } + + inet_ntop (answer->ai_family, ptr, ipaddr, 16); + + D("Resolved %s to %s", h_ptr, (string(ipaddr) + ":" + string(port)).c_str()); + + delete[] s_copy; + + return string(ipaddr) + ":" + string(port); +} + +int main(int argc, char **argv) { + if (cmdline_parser(argc, argv, &args) != 0) exit(-1); + + for (unsigned int i = 0; i < args.verbose_given; i++) + log_level = (log_level_t) ((int) log_level - 1); + + if (args.quiet_given) log_level = QUIET; + + if (args.depth_arg < 1) DIE("--depth must be >= 1"); + // if (args.valuesize_arg < 1 || args.valuesize_arg > 1024*1024) + // DIE("--valuesize must be >= 1 and <= 1024*1024"); + if (args.qps_arg < 0) DIE("--qps must be >= 0"); + if (args.update_arg < 0.0 || args.update_arg > 1.0) + DIE("--update must be >= 0.0 and <= 1.0"); + if (args.time_arg < 1) DIE("--time must be >= 1"); + // if (args.keysize_arg < MINIMUM_KEY_LENGTH) + // DIE("--keysize must be >= %d", MINIMUM_KEY_LENGTH); + if (args.connections_arg < 1 || args.connections_arg > MAXIMUM_CONNECTIONS) + DIE("--connections must be between [1,%d]", MAXIMUM_CONNECTIONS); + // if (get_distribution(args.iadist_arg) == -1) + // DIE("--iadist invalid: %s", args.iadist_arg); + if (!args.server_given && !args.agentmode_given) + DIE("--server or --agentmode must be specified."); + + // TODO: Discover peers, share arguments. + + init_random_stuff(); + boot_time = get_time(); + setvbuf(stdout, NULL, _IONBF, 0); + + // struct event_base *base; + + // if ((base = event_base_new()) == NULL) DIE("event_base_new() fail"); + // evthread_use_pthreads(); + + // if ((evdns = evdns_base_new(base, 1)) == 0) DIE("evdns"); + +#ifdef HAVE_LIBZMQ + if (args.agentmode_given) { + agent(); + return 0; + } else if (args.agent_given) { + for (unsigned int i = 0; i < args.agent_given; i++) { + zmq::socket_t *s = new zmq::socket_t(context, ZMQ_REQ); + string host = string("tcp://") + string(args.agent_arg[i]) + string(":5555"); + s->connect(host.c_str()); + agent_sockets.push_back(s); + } + } +#endif + + options_t options; + args_to_options(&options); + + pthread_barrier_init(&barrier, NULL, options.threads); + + vector servers; + for (unsigned int s = 0; s < args.server_given; s++) + servers.push_back(name_to_ipaddr(string(args.server_arg[s]))); + + ConnectionStats stats; + + double peak_qps = 0.0; + + if (args.search_given) { + char *n_ptr = strtok(args.search_arg, ":"); + char *x_ptr = strtok(NULL, ":"); + + if (n_ptr == NULL || x_ptr == NULL) DIE("Invalid --search argument"); + + int n = atoi(n_ptr); + int x = atoi(x_ptr); + + I("Search-mode. Find QPS @ %dus %dth percentile.", x, n); + + int high_qps = 2000000; + int low_qps = 5000; + double nth; + int cur_qps; + + go(servers, options, stats); + + nth = stats.get_nth(n); + peak_qps = stats.get_qps(); + high_qps = stats.get_qps(); + cur_qps = stats.get_qps(); + + I("peak qps = %d", high_qps); + + if (nth > x) { + + while ((high_qps > low_qps * 1.02) && cur_qps > 10000) { + cur_qps = (high_qps + low_qps) / 2; + + args_to_options(&options); + + options.qps = cur_qps; + options.lambda = (double) options.qps / (double) options.lambda_denom * args.lambda_mul_arg; + + stats = ConnectionStats(); + + go(servers, options, stats); + + nth = stats.get_nth(n); + + I("cur_qps = %d, get_qps = %f, nth = %f", cur_qps, stats.get_qps(), nth); + + if (nth > x /*|| cur_qps > stats.get_qps() * 1.05*/) high_qps = cur_qps; + else low_qps = cur_qps; + } + + while (nth > x && cur_qps > 10000) { // > low_qps) { // 10000) { + cur_qps = cur_qps * 98 / 100; + + args_to_options(&options); + + options.qps = cur_qps; + options.lambda = (double) options.qps / (double) options.lambda_denom * args.lambda_mul_arg; + + stats = ConnectionStats(); + + go(servers, options, stats); + + nth = stats.get_nth(n); + + I("cur_qps = %d, get_qps = %f, nth = %f", cur_qps, stats.get_qps(), nth); + } + + } + } else if (args.scan_given) { + char *min_ptr = strtok(args.scan_arg, ":"); + char *max_ptr = strtok(NULL, ":"); + char *step_ptr = strtok(NULL, ":"); + + if (min_ptr == NULL || min_ptr == NULL || step_ptr == NULL) + DIE("Invalid --scan argument"); + + int min = atoi(min_ptr); + int max = atoi(max_ptr); + int step = atoi(step_ptr); + + printf("%-7s %7s %7s %7s %7s %7s %7s %7s %7s %8s %8s\n", + "#type", "avg", "min", "1st", "5th", "10th", + "90th", "95th", "99th", "QPS", "target"); + + for (int q = min; q <= max; q += step) { + args_to_options(&options); + + options.qps = q; + options.lambda = (double) options.qps / (double) options.lambda_denom * args.lambda_mul_arg; + // options.lambda = (double) options.qps / options.connections / + // args.server_given / + // (args.threads_arg < 1 ? 1 : args.threads_arg); + + stats = ConnectionStats(); + + go(servers, options, stats); + + stats.print_stats("read", stats.get_sampler, false); + printf(" %8.1f", stats.get_qps()); + printf(" %8d\n", q); + } + } else { + go(servers, options, stats); + } + + if (!args.scan_given && !args.loadonly_given) { + stats.print_header(); + stats.print_stats("read", stats.get_sampler); + stats.print_stats("update", stats.set_sampler); + stats.print_stats("op_q", stats.op_sampler); + + int total = stats.gets + stats.sets; + + printf("\nTotal QPS = %.1f (%d / %.1fs)\n\n", + total / (stats.stop - stats.start), + total, stats.stop - stats.start); + + if (args.search_given && peak_qps > 0.0) + printf("Peak QPS = %.1f\n\n", peak_qps); + + printf("Misses = %" PRIu64 " (%.1f%%)\n\n", stats.get_misses, + (double) stats.get_misses/stats.gets*100); + + printf("RX %10" PRIu64 " bytes : %6.1f MB/s\n", + stats.rx_bytes, + (double) stats.rx_bytes / 1024 / 1024 / (stats.stop - stats.start)); + printf("TX %10" PRIu64 " bytes : %6.1f MB/s\n", + stats.tx_bytes, + (double) stats.tx_bytes / 1024 / 1024 / (stats.stop - stats.start)); + } + + // if (args.threads_arg > 1) + pthread_barrier_destroy(&barrier); + +#ifdef HAVE_LIBZMQ + if (args.agent_given) { + for (auto i: agent_sockets) delete i; + } +#endif + + // evdns_base_free(evdns, 0); + // event_base_free(base); + + cmdline_parser_free(&args); +} + +void go(const vector& servers, options_t& options, + ConnectionStats &stats +#ifdef HAVE_LIBZMQ +, zmq::socket_t* socket +#endif +) { +#ifdef HAVE_LIBZMQ + if (args.agent_given > 0) { + prep_agent(servers, options); + } +#endif + + if (options.threads > 1) { + pthread_t pt[options.threads]; + struct thread_data td[options.threads]; + vector ts[options.threads]; + + for (int t = 0; t < options.threads; t++) { + td[t].options = &options; +#ifdef HAVE_LIBZMQ + td[t].socket = socket; +#endif + if (t == 0) td[t].master = true; + else td[t].master = false; + + if (options.roundrobin) { + for (unsigned int i = (t % servers.size()); + i < servers.size(); i += options.threads) + ts[t].push_back(servers[i % servers.size()]); + + td[t].servers = &ts[t]; + } else { + td[t].servers = &servers; + } + + if (pthread_create(&pt[t], NULL, thread_main, &td[t])) + DIE("pthread_create() failed"); + } + + for (int t = 0; t < options.threads; t++) { + ConnectionStats *cs; + if (pthread_join(pt[t], (void**) &cs)) DIE("pthread_join() failed"); + stats.accumulate(*cs); + delete cs; + } + } else if (options.threads == 1) { + do_mutilate(servers, options, stats, true +#ifdef HAVE_LIBZMQ +, socket +#endif +); + } else { +#ifdef HAVE_LIBZMQ + if (args.agent_given) { + sync_agent(socket); + sync_agent(socket); + } +#endif + } + +#ifdef HAVE_LIBZMQ + if (args.agent_given > 0) { + finish_agent(stats); + } +#endif +} + +void* thread_main(void *arg) { + struct thread_data *td = (struct thread_data *) arg; + + ConnectionStats *cs = new ConnectionStats(); + + do_mutilate(*td->servers, *td->options, *cs, td->master +#ifdef HAVE_LIBZMQ +, td->socket +#endif +); + + return cs; +} + +void do_mutilate(const vector& servers, options_t& options, + ConnectionStats& stats, bool master +#ifdef HAVE_LIBZMQ +, zmq::socket_t* socket +#endif +) { + int loop_flag = + (options.blocking || args.blocking_given) ? EVLOOP_ONCE : EVLOOP_NONBLOCK; + + char *saveptr = NULL; // For reentrant strtok(). + + struct event_base *base; + struct evdns_base *evdns; + + if ((base = event_base_new()) == NULL) DIE("event_base_new() fail"); + // evthread_use_pthreads(); + + if ((evdns = evdns_base_new(base, 1)) == 0) DIE("evdns"); + + event_base_priority_init(base, 2); + + // FIXME: May want to move this to after all connections established. + double start = get_time(); + double now = start; + + vector connections; + vector server_lead; + + for (auto s: servers) { + // Split args.server_arg[s] into host:port using strtok(). + char *s_copy = new char[s.length() + 1]; + strcpy(s_copy, s.c_str()); + + char *h_ptr = strtok_r(s_copy, ":", &saveptr); + char *p_ptr = strtok_r(NULL, ":", &saveptr); + + if (h_ptr == NULL) DIE("strtok(.., \":\") failed to parse %s", s.c_str()); + + string hostname = h_ptr; + string port = "11211"; + if (p_ptr) port = p_ptr; + + delete[] s_copy; + + for (int c = 0; c < options.connections; c++) { + Connection* conn = new Connection(base, evdns, hostname, port, options, + args.agentmode_given ? false : + true); + connections.push_back(conn); + if (c == 0) server_lead.push_back(conn); + } + } + + // Wait for all Connections to become IDLE. + while (1) { + // FIXME: If all connections become ready before event_base_loop + // is called, this will deadlock. + event_base_loop(base, EVLOOP_ONCE); + + bool restart = false; + for (Connection *conn: connections) + if (conn->read_state != Connection::IDLE) + restart = true; + + if (restart) continue; + else break; + } + + // Load database on lead connection for each server. + if (!options.noload) { + V("Loading database."); + + for (auto c: server_lead) c->start_loading(); + + // Wait for all Connections to become IDLE. + while (1) { + // FIXME: If all connections become ready before event_base_loop + // is called, this will deadlock. + event_base_loop(base, EVLOOP_ONCE); + + bool restart = false; + for (Connection *conn: connections) + if (conn->read_state != Connection::IDLE) + restart = true; + + if (restart) continue; + else break; + } + } + + if (options.loadonly) { + evdns_base_free(evdns, 0); + event_base_free(base); + return; + } + + // FIXME: Remove. Not needed, testing only. + // // FIXME: Synchronize start_time here across threads/nodes. + // pthread_barrier_wait(&barrier); + + // Warmup connection. + if (options.warmup > 0) { + if (master) V("Warmup start."); + +#ifdef HAVE_LIBZMQ + if (args.agent_given || args.agentmode_given) { + if (master) V("Synchronizing."); + if (master) sync_agent(socket); + + pthread_barrier_wait(&barrier); + + if (master) sync_agent(socket); + + pthread_barrier_wait(&barrier); + if (master) V("Synchronized."); + } +#endif + + int old_time = options.time; + // options.time = 1; + + start = get_time(); + for (Connection *conn: connections) { + conn->start_time = start; + conn->options.time = options.warmup; + conn->drive_write_machine(); // Kick the Connection into motion. + } + + while (1) { + event_base_loop(base, loop_flag); + + //#ifdef USE_CLOCK_GETTIME + // now = get_time(); + //#else + struct timeval now_tv; + event_base_gettimeofday_cached(base, &now_tv); + now = tv_to_double(&now_tv); + //#endif + + bool restart = false; + for (Connection *conn: connections) + if (!conn->check_exit_condition(now)) + restart = true; + + if (restart) continue; + else break; + } + + bool restart = false; + for (Connection *conn: connections) + if (conn->read_state != Connection::IDLE) + restart = true; + + if (restart) { + + // Wait for all Connections to become IDLE. + while (1) { + // FIXME: If there were to use EVLOOP_ONCE and all connections + // become ready before event_base_loop is called, this will + // deadlock. We should check for IDLE before calling + // event_base_loop. + event_base_loop(base, EVLOOP_ONCE); // EVLOOP_NONBLOCK); + + bool restart = false; + for (Connection *conn: connections) + if (conn->read_state != Connection::IDLE) + restart = true; + + if (restart) continue; + else break; + } + } + + // options.time = old_time; + for (Connection *conn: connections) { + conn->reset(); + // conn->stats = ConnectionStats(); + conn->options.time = old_time; + } + + if (master) V("Warmup stop."); + } + + + // FIXME: Synchronize start_time here across threads/nodes. + pthread_barrier_wait(&barrier); + + if (master && args.wait_given) { + if (get_time() < boot_time + args.wait_arg) { + double t = (boot_time + args.wait_arg)-get_time(); + V("Sleeping %.1fs for -W.", t); + sleep_time(t); + } + } + +#ifdef HAVE_LIBZMQ + if (args.agent_given || args.agentmode_given) { + if (master) V("Synchronizing."); + if (master) sync_agent(socket); + + pthread_barrier_wait(&barrier); + + if (master) sync_agent(socket); + + pthread_barrier_wait(&barrier); + if (master) V("Synchronized."); + } +#endif + + start = get_time(); + for (Connection *conn: connections) { + conn->start_time = start; + conn->drive_write_machine(); // Kick the Connection into motion. + } + + // V("Start = %f", start); + + // Main event loop. + while (1) { + event_base_loop(base, loop_flag); + + //#if USE_CLOCK_GETTIME + // now = get_time(); + //#else + struct timeval now_tv; + event_base_gettimeofday_cached(base, &now_tv); + now = tv_to_double(&now_tv); + //#endif + + bool restart = false; + for (Connection *conn: connections) + if (!conn->check_exit_condition(now)) + restart = true; + + if (restart) continue; + else break; + } + + // Tear-down and accumulate stats. + for (Connection *conn: connections) { + stats.accumulate(conn->stats); + delete conn; + } + + stats.start = start; + stats.stop = now; + + evdns_base_free(evdns, 0); + event_base_free(base); +} + +void args_to_options(options_t* options) { + // bzero(options, sizeof(options_t)); + options->connections = args.connections_arg; + options->blocking = args.blocking_given; + options->qps = args.qps_arg; + options->threads = args.threads_arg; + options->server_given = args.server_given; + options->roundrobin = args.roundrobin_given; + + int connections = options->connections; + if (options->roundrobin) { + connections *= (options->server_given > options->threads ? + options->server_given : options->threads); + } else { + connections *= options->server_given * options->threads; + } + + // if (args.agent_given) connections *= (1 + args.agent_given); + + options->lambda_denom = connections > 1 ? connections : 1; + if (args.lambda_mul_arg > 1) options->lambda_denom *= args.lambda_mul_arg; + + if (options->threads < 1) options->lambda_denom = 0; + + options->lambda = (double) options->qps / (double) options->lambda_denom * args.lambda_mul_arg; + + // V("%d %d %d %f", options->qps, options->connections, + // connections, options->lambda); + + // if (args.no_record_scale_given) + // options->records = args.records_arg; + // else + options->records = args.records_arg / options->server_given; + + D("options->records = %d", options->records); + + if (!options->records) options->records = 1; + strcpy(options->keysize, args.keysize_arg); + // options->keysize = args.keysize_arg; + strcpy(options->valuesize, args.valuesize_arg); + // options->valuesize = args.valuesize_arg; + options->update = args.update_arg; + options->time = args.time_arg; + options->loadonly = args.loadonly_given; + options->depth = args.depth_arg; + options->no_nodelay = args.no_nodelay_given; + options->noload = args.noload_given; + options->iadist = get_distribution(args.iadist_arg); + strcpy(options->ia, args.iadist_arg); + options->warmup = args.warmup_given ? args.warmup_arg : 0; + options->oob_thread = false; +} + +void init_random_stuff() { + static char lorem[] = + R"(Lorem ipsum dolor sit amet, consectetur adipiscing elit. Maecenas +turpis dui, suscipit non vehicula non, malesuada id sem. Phasellus +suscipit nisl ut dui consectetur ultrices tincidunt eros +aliquet. Donec feugiat lectus sed nibh ultrices ultrices. Vestibulum +ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia +Curae; Mauris suscipit eros sed justo lobortis at ultrices lacus +molestie. Duis in diam mi. Cum sociis natoque penatibus et magnis dis +parturient montes, nascetur ridiculus mus. Ut cursus viverra +sagittis. Vivamus non facilisis tortor. Integer lectus arcu, sagittis +et eleifend rutrum, condimentum eget sem. Vestibulum tempus tellus non +risus semper semper. Morbi molestie rhoncus mi, in egestas dui +facilisis et.)"; + + size_t cursor = 0; + + while (cursor < sizeof(random_char)) { + size_t max = sizeof(lorem); + if (sizeof(random_char) - cursor < max) + max = sizeof(random_char) - cursor; + + memcpy(&random_char[cursor], lorem, max); + cursor += max; + } +} + diff --git a/mutilate.h b/mutilate.h new file mode 100644 index 0000000..324262e --- /dev/null +++ b/mutilate.h @@ -0,0 +1,17 @@ +#ifndef MUTILATE_H +#define MUTILATE_H + +#include "cmdline.h" + +#define USE_CACHED_TIME 0 +#define MINIMUM_KEY_LENGTH 2 +#define MAXIMUM_CONNECTIONS 512 + +#define MAX_SAMPLES 100000 + +#define LOADER_CHUNK 1024 + +extern char random_char[]; +extern gengetopt_args_info args; + +#endif // MUTILATE_H diff --git a/util.cc b/util.cc new file mode 100644 index 0000000..714d415 --- /dev/null +++ b/util.cc @@ -0,0 +1,31 @@ +#include +#include +#include +#include + +#include "mutilate.h" +#include "util.h" + +void sleep_time(double duration) { + if (duration > 0) usleep((useconds_t) (duration * 1000000)); +} + +#define FNV_64_PRIME (0x100000001b3ULL) +#define FNV1_64_INIT (0xcbf29ce484222325ULL) +uint64_t fnv_64_buf(const void* buf, size_t len) { + uint64_t hval = FNV1_64_INIT; + + unsigned char *bp = (unsigned char *)buf; /* start of buffer */ + unsigned char *be = bp + len; /* beyond end of buffer */ + + while (bp < be) { + hval ^= (uint64_t)*bp++; + hval *= FNV_64_PRIME; + } + + return hval; +} + +void generate_key(int n, int length, char *buf) { + snprintf(buf, length + 1, "%0*d", length, n); +} diff --git a/util.h b/util.h new file mode 100644 index 0000000..6b09e77 --- /dev/null +++ b/util.h @@ -0,0 +1,52 @@ +#ifndef UTIL_H +#define UTIL_H + +#include +#include + +inline double tv_to_double(struct timeval *tv) { + return tv->tv_sec + (double) tv->tv_usec / 1000000; +} + +inline void double_to_tv(double val, struct timeval *tv) { + long long secs = (long long) val; + long long usecs = (long long) ((val - secs) * 1000000); + + tv->tv_sec = secs; + tv->tv_usec = usecs; +} + +inline double get_time_accurate() { +#if USE_CLOCK_GETTIME + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC_RAW, &ts); + // clock_gettime(CLOCK_REALTIME, &ts); + return ts.tv_sec + (double) ts.tv_nsec / 1000000000; +#else + struct timeval tv; + gettimeofday(&tv, NULL); + return tv_to_double(&tv); +#endif +} + +inline double get_time() { + //#if USE_CLOCK_GETTIME + // struct timespec ts; + // clock_gettime(CLOCK_MONOTONIC_RAW, &ts); + // // clock_gettime(CLOCK_REALTIME, &ts); + // return ts.tv_sec + (double) ts.tv_nsec / 1000000000; + //#else + struct timeval tv; + gettimeofday(&tv, NULL); + return tv_to_double(&tv); + //#endif +} + +void sleep_time(double duration); + +uint64_t fnv_64_buf(const void* buf, size_t len); +inline uint64_t fnv_64(uint64_t in) { return fnv_64_buf(&in, sizeof(in)); } + +void generate_key(int n, int length, char *buf); + +#endif // UTIL_H