Initial check-in.

This is extracted from another git repo.  This is the first release, and
the prior commit history is not terribly interesting, so I'm not going
to bother using filter-branch to try to cleanly isolate the history for
this tool.

Cheers.
This commit is contained in:
Jacob Leverich 2012-08-23 14:30:32 -07:00
parent bcad3cfbc3
commit 386320d266
26 changed files with 2755 additions and 1 deletions

10
.gitignore vendored Normal file
View File

@ -0,0 +1,10 @@
*~
*.[oda]
.scon*
config.log
gmon.out
mutilate
cmdline.cc
cmdline.h
gtest
config.h

98
AdaptiveSampler.h Normal file
View File

@ -0,0 +1,98 @@
/* -*- c++ -*- */
#ifndef ADAPTIVESAMPLER_H
#define ADAPTIVESAMPLER_H
// Simple exponential-backoff adaptive time series sampler. Will
// record at most max_samples samples out of however many samples are
// thrown at it. Makes a vague effort to do this evenly over the
// samples given to it. The sampling is time invariant (i.e. if you
// start inserting samples at a slower rate, they will be
// under-represented).
#include <assert.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <algorithm>
#include <vector>
#include "log.h"
template <class T> class AdaptiveSampler {
public:
std::vector<T> samples;
unsigned int sample_rate;
unsigned int max_samples;
unsigned int total_samples;
AdaptiveSampler() = delete;
AdaptiveSampler(int max) :
sample_rate(1), max_samples(max), total_samples(0) {
}
void sample(T s) {
total_samples++;
if (drand48() < (1/(double) sample_rate))
samples.push_back(s);
// Throw out half of the samples, double sample_rate.
if (samples.size() >= max_samples) {
sample_rate *= 2;
std::vector<T> half_samples;
for (unsigned int i = 0; i < samples.size(); i++) {
if (drand48() > .5) half_samples.push_back(samples[i]);
}
samples = half_samples;
}
}
void save_samples(const char* type, const char* filename) {
FILE *file;
if ((file = fopen(filename, "a")) == NULL) {
W("fopen() failed: %s", strerror(errno));
return;
}
for (size_t i = 0; i < samples.size(); i++) {
fprintf(file, "%s %" PRIu64 " %f\n", type, i, samples[i]);
}
}
double average() {
double result = 0.0;
size_t length = samples.size();
for (size_t i = 0; i < length; i++) result += samples[i];
return result/length;
}
void print_header() {
printf("#%-6s %6s %8s %8s %8s %8s %8s %8s\n", "type", "size",
"min", "max", "avg", "90th", "95th", "99th");
}
void print_stats(const char *type, const char *size) {
std::vector<double> samples_copy = samples;
size_t l = samples_copy.size();
if (l == 0) {
printf("%-7s %6s %8.1f %8.1f %8.1f %8.1f %8.1f %8.1f\n", type, size,
0.0, 0.0, 0.0, 0.0, 0.0, 0.0);
return;
}
sort(samples_copy.begin(), samples_copy.end());
printf("%-7s %6s %8.1f %8.1f %8.1f %8.1f %8.1f %8.1f\n", type, size,
samples_copy[0], samples_copy[l-1], average(),
samples_copy[(l*90)/100], samples_copy[(l*95)/100],
samples_copy[(l*99)/100]);
}
};
#endif // ADAPTIVESAMPLER_H

13
AgentStats.h Normal file
View File

@ -0,0 +1,13 @@
/* -*- c++ -*- */
#ifndef AGENTSTATS_H
#define AGENTSTATS_H
class AgentStats {
public:
uint64_t rx_bytes, tx_bytes;
uint64_t gets, sets, get_misses;
double start, stop;
};
#endif // AGENTSTATS_H

24
COPYING Normal file
View File

@ -0,0 +1,24 @@
Copyright (c) 2012, Jacob Leverich, Stanford University
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of Stanford University nor the
names of its contributors may be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL STANFORD UNIVERSITY BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

496
Connection.cc Normal file
View File

@ -0,0 +1,496 @@
#include <netinet/tcp.h>
#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/dns.h>
#include <event2/event.h>
#include <event2/thread.h>
#include <event2/util.h>
#include "config.h"
#include "Connection.h"
#include "distributions.h"
#include "Generator.h"
#include "mutilate.h"
#include "util.h"
Connection::Connection(struct event_base* _base, struct evdns_base* _evdns,
string _hostname, string _port, options_t _options,
bool sampling) :
hostname(_hostname), port(_port), start_time(0),
stats(sampling), options(_options), base(_base), evdns(_evdns)
{
valuesize = createGenerator(options.valuesize);
keysize = createGenerator(options.keysize);
keygen = new KeyGenerator(keysize, options.records);
if (options.lambda <= 0) {
iagen = createGenerator("0");
} else {
D("iagen = createGenerator(%s)", options.ia);
iagen = createGenerator(options.ia);
iagen->set_lambda(options.lambda);
}
read_state = INIT_READ;
write_state = INIT_WRITE;
bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, bev_read_cb, bev_write_cb, bev_event_cb, this);
bufferevent_enable(bev, EV_READ | EV_WRITE);
if (bufferevent_socket_connect_hostname(bev, evdns, AF_UNSPEC,
hostname.c_str(),
atoi(port.c_str())))
DIE("bufferevent_socket_connect_hostname()");
timer = evtimer_new(base, timer_cb, this);
}
Connection::~Connection() {
event_free(timer);
timer = NULL;
// FIXME: W("Drain op_q?");
bufferevent_free(bev);
delete iagen;
delete keygen;
delete keysize;
delete valuesize;
}
void Connection::reset() {
// FIXME: Actually check the connection, drain all bufferevents, drain op_q.
assert(op_queue.size() == 0);
evtimer_del(timer);
read_state = IDLE;
write_state = INIT_WRITE;
stats = ConnectionStats(stats.sampling);
}
void Connection::issue_get(const char* key, double now) {
Operation op;
#if HAVE_CLOCK_GETTIME
op.start_time = get_time_accurate();
#else
if (now == 0.0) {
#if USE_CACHED_TIME
struct timeval now_tv;
event_base_gettimeofday_cached(base, &now_tv);
op.start_time = tv_to_double(&now_tv);
#else
op.start_time = get_time();
#endif
} else {
op.start_time = now;
}
#endif
op.type = Operation::GET;
op.key = string(key);
op_queue.push(op);
if (read_state == IDLE)
read_state = WAITING_FOR_GET;
int l = evbuffer_add_printf(bufferevent_get_output(bev), "get %s\r\n", key);
if (read_state != LOADING) stats.tx_bytes += l;
}
void Connection::issue_set(const char* key, const char* value, int length,
double now) {
Operation op;
#if HAVE_CLOCK_GETTIME
op.start_time = get_time_accurate();
#else
if (now == 0.0) op.start_time = get_time();
else op.start_time = now;
#endif
op.type = Operation::SET;
op_queue.push(op);
if (read_state == IDLE)
read_state = WAITING_FOR_SET;
int l = evbuffer_add_printf(bufferevent_get_output(bev),
"set %s 0 0 %d\r\n", key, length);
l += bufferevent_write(bev, value, length);
l += bufferevent_write(bev, "\r\n", 2);
if (read_state != LOADING) stats.tx_bytes += l;
}
void Connection::issue_something(double now) {
char key[256];
// FIXME: generate key distribution here!
string keystr = keygen->generate(lrand48() % options.records);
strcpy(key, keystr.c_str());
// int key_index = lrand48() % options.records;
// generate_key(key_index, options.keysize, key);
if (drand48() < options.update) {
int index = lrand48() % (1024 * 1024);
// issue_set(key, &random_char[index], options.valuesize, now);
issue_set(key, &random_char[index], valuesize->generate(), now);
} else {
issue_get(key, now);
}
}
void Connection::pop_op() {
assert(op_queue.size() > 0);
op_queue.pop();
if (read_state == LOADING) return;
read_state = IDLE;
// Advance the read state machine.
if (op_queue.size() > 0) {
Operation& op = op_queue.front();
switch (op.type) {
case Operation::GET: read_state = WAITING_FOR_GET; break;
case Operation::SET: read_state = WAITING_FOR_SET; break;
default: DIE("Not implemented.");
}
}
}
bool Connection::check_exit_condition(double now) {
if (read_state == INIT_READ) return false;
if (now == 0.0) now = get_time();
if (now > start_time + options.time) return true;
if (options.loadonly && read_state == IDLE) return true;
return false;
}
// drive_write_machine() determines whether or not to issue a new
// command. Note that this function loops. Be wary of break
// vs. return.
void Connection::drive_write_machine(double now) {
if (now == 0.0) now = get_time();
double delay;
struct timeval tv;
if (check_exit_condition(now)) return;
while (1) {
switch (write_state) {
case INIT_WRITE:
/*
if (options.iadist == EXPONENTIAL)
delay = generate_poisson(options.lambda);
else
delay = generate_uniform(options.lambda);
*/
delay = iagen->generate();
next_time = now + delay;
double_to_tv(delay, &tv);
evtimer_add(timer, &tv);
write_state = WAITING_FOR_TIME;
break;
case ISSUING:
if (op_queue.size() >= (size_t) options.depth) {
write_state = WAITING_FOR_OPQ;
return;
} else if (now < next_time) {
write_state = WAITING_FOR_TIME;
break; // We want to run through the state machine one more time
// to make sure the timer is armed.
}
issue_something(now);
stats.log_op(op_queue.size());
/*
if (options.iadist == EXPONENTIAL)
next_time += generate_poisson(options.lambda);
else
next_time += generate_uniform(options.lambda);
*/
next_time += iagen->generate();
break;
case WAITING_FOR_TIME:
if (now < next_time) {
if (!event_pending(timer, EV_TIMEOUT, NULL)) {
delay = next_time - now;
double_to_tv(delay, &tv);
evtimer_add(timer, &tv);
}
return;
}
write_state = ISSUING;
break;
case WAITING_FOR_OPQ:
if (op_queue.size() >= (size_t) options.depth) return;
write_state = ISSUING;
break;
default: DIE("Not implemented");
}
}
}
void Connection::event_callback(short events) {
// struct timeval now_tv;
// event_base_gettimeofday_cached(base, &now_tv);
if (events & BEV_EVENT_CONNECTED) {
D("Connected to %s:%s.", hostname.c_str(), port.c_str());
int fd = bufferevent_getfd(bev);
if (fd < 0) DIE("bufferevent_getfd");
if (!options.no_nodelay) {
int one = 1;
if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,
(void *) &one, sizeof(one)) < 0)
DIE("setsockopt()");
}
read_state = IDLE; // This is the most important part!
} else if (events & BEV_EVENT_ERROR) {
int err = bufferevent_socket_get_dns_error(bev);
if (err) DIE("DNS error: %s", evutil_gai_strerror(err));
DIE("BEV_EVENT_ERROR: %s", strerror(errno));
} else if (events & BEV_EVENT_EOF) {
DIE("Unexpected EOF from server.");
}
}
void Connection::read_callback() {
struct evbuffer *input = bufferevent_get_input(bev);
#if USE_CACHED_TIME
struct timeval now_tv;
event_base_gettimeofday_cached(base, &now_tv);
#endif
char *buf;
Operation *op = NULL;
int length;
size_t n_read_out;
double now;
// Protocol processing loop.
if (op_queue.size() == 0) V("Spurious read callback.");
while (1) {
if (op_queue.size() > 0) op = &op_queue.front();
switch (read_state) {
case INIT_READ: DIE("event from uninitialized connection");
case IDLE: return; // We munched all the data we expected?
case WAITING_FOR_GET:
assert(op_queue.size() > 0);
buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF);
if (buf == NULL) return; // A whole line not received yet. Punt.
stats.rx_bytes += n_read_out; // strlen(buf);
if (!strcmp(buf, "END")) {
// D("GET (%s) miss.", op->key.c_str());
stats.get_misses++;
#if USE_CACHED_TIME
now = tv_to_double(&now_tv);
#else
now = get_time();
#endif
#if HAVE_CLOCK_GETTIME
op->end_time = get_time_accurate();
#else
op->end_time = now;
#endif
stats.log_get(*op);
free(buf);
pop_op();
drive_write_machine();
break;
} else if (!strncmp(buf, "VALUE", 5)) {
sscanf(buf, "VALUE %*s %*d %d", &length);
// FIXME: check key name to see if it corresponds to the op at
// the head of the op queue? This will be necessary to
// support "gets" where there may be misses.
data_length = length;
read_state = WAITING_FOR_GET_DATA;
}
free(buf);
case WAITING_FOR_GET_DATA:
assert(op_queue.size() > 0);
length = evbuffer_get_length(input);
if (length >= data_length + 2) {
// FIXME: Actually parse the value? Right now we just drain it.
evbuffer_drain(input, data_length + 2);
read_state = WAITING_FOR_END;
stats.rx_bytes += data_length + 2;
} else {
return;
}
case WAITING_FOR_END:
assert(op_queue.size() > 0);
buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF);
if (buf == NULL) return; // Haven't received a whole line yet. Punt.
stats.rx_bytes += n_read_out;
if (!strcmp(buf, "END")) {
#if USE_CACHED_TIME
now = tv_to_double(&now_tv);
#else
now = get_time();
#endif
#if HAVE_CLOCK_GETTIME
op->end_time = get_time_accurate();
#else
op->end_time = now;
#endif
stats.log_get(*op);
free(buf);
pop_op();
drive_write_machine(now);
break;
} else {
DIE("Unexpected result when waiting for END");
}
case WAITING_FOR_SET:
assert(op_queue.size() > 0);
buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF);
if (buf == NULL) return; // Haven't received a whole line yet. Punt.
stats.rx_bytes += n_read_out;
now = get_time();
#if HAVE_CLOCK_GETTIME
op->end_time = get_time_accurate();
#else
op->end_time = now;
#endif
stats.log_set(*op);
free(buf);
pop_op();
drive_write_machine(now);
break;
case LOADING:
assert(op_queue.size() > 0);
buf = evbuffer_readln(input, NULL, EVBUFFER_EOL_CRLF);
if (buf == NULL) return; // Haven't received a whole line yet.
free(buf);
loader_completed++;
pop_op();
if (loader_completed == options.records) {
D("Finished loading.");
read_state = IDLE;
} else {
while (loader_issued < loader_completed + LOADER_CHUNK) {
if (loader_issued >= options.records) break;
char key[256];
string keystr = keygen->generate(loader_issued);
strcpy(key, keystr.c_str());
int index = lrand48() % (1024 * 1024);
// generate_key(loader_issued, options.keysize, key);
// issue_set(key, &random_char[index], options.valuesize);
issue_set(key, &random_char[index], valuesize->generate());
loader_issued++;
}
}
break;
default: DIE("not implemented");
}
}
}
void Connection::write_callback() {}
void Connection::timer_callback() { drive_write_machine(); }
// The follow are C trampolines for libevent callbacks.
void bev_event_cb(struct bufferevent *bev, short events, void *ptr) {
Connection* conn = (Connection*) ptr;
conn->event_callback(events);
}
void bev_read_cb(struct bufferevent *bev, void *ptr) {
Connection* conn = (Connection*) ptr;
conn->read_callback();
}
void bev_write_cb(struct bufferevent *bev, void *ptr) {
Connection* conn = (Connection*) ptr;
conn->write_callback();
}
void timer_cb(evutil_socket_t fd, short what, void *ptr) {
Connection* conn = (Connection*) ptr;
conn->timer_callback();
}
void Connection::set_priority(int pri) {
if (bufferevent_priority_set(bev, pri))
DIE("bufferevent_set_priority(bev, %d) failed", pri);
}
void Connection::start_loading() {
read_state = LOADING;
loader_issued = loader_completed = 0;
for (int i = 0; i < LOADER_CHUNK; i++) {
if (loader_issued >= options.records) break;
char key[256];
int index = lrand48() % (1024 * 1024);
string keystr = keygen->generate(loader_issued);
strcpy(key, keystr.c_str());
// generate_key(loader_issued, options.keysize, key);
// issue_set(key, &random_char[index], options.valuesize);
issue_set(key, &random_char[index], valuesize->generate());
loader_issued++;
}
}

102
Connection.h Normal file
View File

@ -0,0 +1,102 @@
// -*- c++-mode -*-
#include <queue>
#include <string>
#include <event2/bufferevent.h>
#include <event2/dns.h>
#include <event2/event.h>
#include <event2/util.h>
#include "AdaptiveSampler.h"
#include "cmdline.h"
#include "ConnectionOptions.h"
#include "ConnectionStats.h"
#include "Generator.h"
#include "Operation.h"
#include "util.h"
using namespace std;
void bev_event_cb(struct bufferevent *bev, short events, void *ptr);
void bev_read_cb(struct bufferevent *bev, void *ptr);
void bev_write_cb(struct bufferevent *bev, void *ptr);
void timer_cb(evutil_socket_t fd, short what, void *ptr);
class Connection {
public:
Connection(struct event_base* _base, struct evdns_base* _evdns,
string _hostname, string _port, options_t options,
bool sampling = true);
~Connection();
string hostname;
string port;
double start_time; // Time when this connection began operations.
enum read_state_enum {
INIT_READ,
LOADING,
IDLE,
WAITING_FOR_GET,
WAITING_FOR_GET_DATA,
WAITING_FOR_END,
WAITING_FOR_SET,
MAX_READ_STATE,
};
enum write_state_enum {
INIT_WRITE,
ISSUING,
WAITING_FOR_TIME,
WAITING_FOR_OPQ,
MAX_WRITE_STATE,
};
read_state_enum read_state;
write_state_enum write_state;
ConnectionStats stats;
void issue_get(const char* key, double now = 0.0);
void issue_set(const char* key, const char* value, int length,
double now = 0.0);
void issue_something(double now = 0.0);
void pop_op();
bool check_exit_condition(double now = 0.0);
void drive_write_machine(double now = 0.0);
void start_loading();
void reset();
void event_callback(short events);
void read_callback();
void write_callback();
void timer_callback();
void set_priority(int pri);
options_t options;
std::queue<Operation> op_queue;
private:
struct event_base *base;
struct evdns_base *evdns;
struct bufferevent *bev;
struct event *timer; // Used to control inter-transmission time.
double lambda, next_time; // Inter-transmission time parameters.
int data_length; // When waiting for data, how much we're peeking for.
// Parameters to track progress of the data loader.
int loader_issued, loader_completed;
Generator *valuesize;
Generator *keysize;
KeyGenerator *keygen;
Generator *iagen;
};

39
ConnectionOptions.h Normal file
View File

@ -0,0 +1,39 @@
#ifndef CONNECTIONOPTIONS_H
#define CONNECTIONOPTIONS_H
#include "distributions.h"
typedef struct {
int connections;
bool blocking;
double lambda;
int qps;
int records;
char keysize[32];
char valuesize[32];
// int keysize;
// int valuesize;
char ia[32];
// qps_per_connection
// iadist
double update;
int time;
bool loadonly;
int depth;
bool no_nodelay;
bool noload;
int threads;
enum distribution_t iadist;
int warmup;
bool roundrobin;
int server_given;
int lambda_denom;
bool oob_thread;
} options_t;
#endif // CONNECTIONOPTIONS_H

184
ConnectionStats.h Normal file
View File

@ -0,0 +1,184 @@
/* -*- c++ -*- */
#ifndef CONNECTIONSTATS_H
#define CONNECTIONSTATS_H
#include <algorithm>
#include <inttypes.h>
#include <vector>
#ifdef USE_ADAPTIVE_SAMPLER
#include "AdaptiveSampler.h"
#else
#include "HistogramSampler.h"
#endif
#include "AgentStats.h"
#include "Operation.h"
using namespace std;
class ConnectionStats {
public:
ConnectionStats(bool _sampling = true) :
#ifdef USE_ADAPTIVE_SAMPLER
get_sampler(100000), set_sampler(100000), op_sampler(100000),
#else
get_sampler(10000,1), set_sampler(10000,1), op_sampler(1000,1),
#endif
rx_bytes(0), tx_bytes(0), gets(0), sets(0),
get_misses(0), sampling(_sampling) {}
#ifdef USE_ADAPTIVE_SAMPLER
AdaptiveSampler<Operation> get_sampler;
AdaptiveSampler<Operation> set_sampler;
AdaptiveSampler<double> op_sampler;
#else
HistogramSampler get_sampler;
HistogramSampler set_sampler;
HistogramSampler op_sampler;
#endif
uint64_t rx_bytes, tx_bytes;
uint64_t gets, sets, get_misses;
double start, stop;
bool sampling;
void log_get(Operation& op) { if (sampling) get_sampler.sample(op); gets++; }
void log_set(Operation& op) { if (sampling) set_sampler.sample(op); sets++; }
void log_op (double op) { if (sampling) op_sampler.sample(op); }
double get_qps() {
return (gets + sets) / (stop - start);
}
#ifdef USE_ADAPTIVE_SAMPLER
double get_nth(double nth) {
vector<double> samples;
if (samples.size() == 0) return 0;
for (auto s: get_sampler.samples)
samples.push_back(s.time()); // (s.end_time - s.start_time) * 1000000);
for (auto s: set_sampler.samples)
samples.push_back(s.time()); // (s.end_time - s.start_time) * 1000000);
sort(samples.begin(), samples.end());
int l = samples.size();
int i = (int)((nth * l) / 100);
assert(i < l);
return samples[i];
}
#else
double get_nth(double nth) {
// FIXME: nth across gets & sets?
return get_sampler.get_nth(nth);
}
#endif
void accumulate(const ConnectionStats &cs) {
#ifdef USE_ADAPTIVE_SAMPLER
for (auto i: cs.get_sampler.samples) get_sampler.sample(i); //log_get(i);
for (auto i: cs.set_sampler.samples) set_sampler.sample(i); //log_set(i);
for (auto i: cs.op_sampler.samples) op_sampler.sample(i); //log_op(i);
#else
get_sampler.accumulate(cs.get_sampler);
set_sampler.accumulate(cs.set_sampler);
op_sampler.accumulate(cs.op_sampler);
#endif
rx_bytes += cs.rx_bytes;
tx_bytes += cs.tx_bytes;
gets += cs.gets;
sets += cs.sets;
get_misses += cs.get_misses;
start = cs.start;
stop = cs.stop;
}
void accumulate(const AgentStats &as) {
rx_bytes += as.rx_bytes;
tx_bytes += as.tx_bytes;
gets += as.gets;
sets += as.sets;
get_misses += as.get_misses;
start = as.start;
stop = as.stop;
}
static void print_header() {
printf("%-7s %7s %7s %7s %7s %7s %7s %7s %7s\n",
"#type", "avg", "min", "1st", "5th", "10th",
"90th", "95th", "99th");
}
#ifdef USE_ADAPTIVE_SAMPLER
void print_stats(const char *tag, AdaptiveSampler<Operation> &sampler,
bool newline = true) {
vector<double> copy;
for (auto i: sampler.samples) copy.push_back(i.time());
size_t l = copy.size();
if (l == 0) {
printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f",
tag, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0);
if (newline) printf("\n");
return;
}
sort(copy.begin(), copy.end());
printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f",
tag, std::accumulate(copy.begin(), copy.end(), 0.0) / l,
copy[0], copy[(l*1) / 100], copy[(l*5) / 100], copy[(l*10) / 100],
copy[(l*90) / 100], copy[(l*95) / 100], copy[(l*99) / 100]
);
if (newline) printf("\n");
}
void print_stats(const char *tag, AdaptiveSampler<double> &sampler,
bool newline = true) {
vector<double> copy;
for (auto i: sampler.samples) copy.push_back(i);
size_t l = copy.size();
if (l == 0) { printf("%-7s 0", tag); if (newline) printf("\n"); return; }
sort(copy.begin(), copy.end());
printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f",
tag, std::accumulate(copy.begin(), copy.end(), 0.0) / l,
copy[0], copy[(l*1) / 100], copy[(l*5) / 100], copy[(l*10) / 100],
copy[(l*90) / 100], copy[(l*95) / 100], copy[(l*99) / 100]
);
if (newline) printf("\n");
}
#else
void print_stats(const char *tag, HistogramSampler &sampler,
bool newline = true) {
if (sampler.total() == 0) {
printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f",
tag, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0);
if (newline) printf("\n");
return;
}
printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f",
tag, sampler.average(),
sampler.get_nth(0), sampler.get_nth(1), sampler.get_nth(5),
sampler.get_nth(10), sampler.get_nth(90),
sampler.get_nth(95), sampler.get_nth(99));
if (newline) printf("\n");
}
#endif
};
#endif // CONNECTIONSTATS_H

78
Generator.cc Normal file
View File

@ -0,0 +1,78 @@
// -*- c++ -*-
#include "config.h"
#include "Generator.h"
Generator* createFacebookKey() { return new GEV(30.7984, 8.20449, 0.078688); }
Generator* createFacebookValue() {
Generator* g = new GPareto(214.476, 0.348238);
Discrete* d = new Discrete(g);
d->add(0.00536, 0.0);
d->add(0.00047, 1.0);
d->add(0.17820, 2.0);
d->add(0.09239, 3.0);
d->add(0.00018, 4.0);
d->add(0.02740, 5.0);
d->add(0.00065, 6.0);
d->add(0.00606, 7.0);
d->add(0.00023, 8.0);
d->add(0.00837, 9.0);
d->add(0.00837, 10.0);
d->add(0.08989, 11.0);
d->add(0.00092, 12.0);
d->add(0.00326, 13.0);
d->add(0.01980, 14.0);
return d;
}
Generator* createFacebookIA() { return new GPareto(16.0292, 0.154971); }
Generator* createGenerator(std::string str) {
if (!strcmp(str.c_str(), "fb_key")) return createFacebookKey();
else if (!strcmp(str.c_str(), "fb_value")) return createFacebookValue();
else if (!strcmp(str.c_str(), "fb_ia")) return createFacebookIA();
char *s_copy = new char[str.length() + 1];
strcpy(s_copy, str.c_str());
char *saveptr = NULL;
if (atoi(s_copy) != 0 || !strcmp(s_copy, "0")) {
double v = atof(s_copy);
delete[] s_copy;
return new Fixed(v);
}
char *t_ptr = strtok_r(s_copy, ":", &saveptr);
char *a_ptr = strtok_r(NULL, ":", &saveptr);
if (t_ptr == NULL) // || a_ptr == NULL)
DIE("strtok(.., \":\") failed to parse %s", str.c_str());
char t = t_ptr[0];
saveptr = NULL;
char *s1 = strtok_r(a_ptr, ",", &saveptr);
char *s2 = strtok_r(NULL, ",", &saveptr);
char *s3 = strtok_r(NULL, ",", &saveptr);
double a1 = s1 ? atof(s1) : 0.0;
double a2 = s2 ? atof(s2) : 0.0;
double a3 = s3 ? atof(s3) : 0.0;
delete[] s_copy;
if (strcasestr(str.c_str(), "fixed")) return new Fixed(a1);
else if (strcasestr(str.c_str(), "normal")) return new Normal(a1, a2);
else if (strcasestr(str.c_str(), "exponential")) return new Exponential(a1);
else if (strcasestr(str.c_str(), "pareto")) return new GPareto(a1, a2);
else if (strcasestr(str.c_str(), "gev")) return new GEV(a1, a2, a3);
else if (strcasestr(str.c_str(), "uniform")) return new Uniform(a1);
DIE("Unable to create Generator '%s'", str.c_str());
return NULL;
}

216
Generator.h Normal file
View File

@ -0,0 +1,216 @@
// -*- c++ -*-
// 1. implement "fixed" generator
// 2. implement discrete generator
// 3. implement combine generator?
#ifndef GENERATOR_H
#define GENERATOR_H
#define MAX(a,b) ((a) > (b) ? (a) : (b))
#include "config.h"
#include <string>
#include <vector>
#include <utility>
#include <assert.h>
#include <inttypes.h>
#include <limits.h>
#include <math.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include "log.h"
#include "util.h"
// Generator syntax:
//
// \d+ == fixed
// n[ormal]:mean,sd
// e[xponential]:lambda
// p[areto]:scale,shape
// g[ev]:loc,scale,shape
// fb_value, fb_key, fb_rate
class Generator {
public:
Generator() {}
// Generator(const Generator &g) = delete;
// virtual Generator& operator=(const Generator &g) = delete;
virtual ~Generator() {}
virtual double generate(double U = -1.0) = 0;
virtual void set_lambda(double lambda) {DIE("set_lambda() not implemented");}
protected:
std::string type;
};
class Fixed : public Generator {
public:
Fixed(double _value = 1.0) : value(_value) { D("Fixed(%f)", value); }
virtual double generate(double U = -1.0) { return value; }
virtual void set_lambda(double lambda) {
if (lambda > 0.0) value = 1.0 / lambda;
else value = 0.0;
}
private:
double value;
};
class Uniform : public Generator {
public:
Uniform(double _scale) : scale(_scale) { D("Uniform(%f)", scale); }
virtual double generate(double U = -1.0) {
if (U < 0.0) U = drand48();
return scale * U;
}
virtual void set_lambda(double lambda) {
if (lambda > 0.0) scale = 2.0 / lambda;
else scale = 0.0;
}
private:
double scale;
};
class Normal : public Generator {
public:
Normal(double _mean = 1.0, double _sd = 1.0) : mean(_mean), sd(_sd) {
D("Normal(mean=%f, sd=%f)", mean, sd);
}
virtual double generate(double U = -1.0) {
if (U < 0.0) U = drand48();
double V = U; // drand48();
double N = sqrt(-2 * log(U)) * cos(2 * M_PI * V);
return mean + sd * N;
}
virtual void set_lambda(double lambda) {
if (lambda > 0.0) mean = 1.0 / lambda;
else mean = 0.0;
}
private:
double mean, sd;
};
class Exponential : public Generator {
public:
Exponential(double _lambda = 1.0) : lambda(_lambda) {
D("Exponential(lambda=%f)", lambda);
}
virtual double generate(double U = -1.0) {
if (lambda <= 0.0) return 0.0;
if (U < 0.0) U = drand48();
return -log(U) / lambda;
}
virtual void set_lambda(double lambda) { this->lambda = lambda; }
private:
double lambda;
};
class GPareto : public Generator {
public:
GPareto(double _scale = 1.0, double _shape = 1.0) :
scale(_scale), shape(_shape) {
assert(shape != 0.0);
D("GPareto(scale=%f, shape=%f)", scale, shape);
}
virtual double generate(double U = -1.0) {
if (U < 0.0) U = drand48();
return scale * (pow(U, -shape) - 1) / shape;
}
virtual void set_lambda(double lambda) {
if (lambda <= 0.0) scale = 0.0;
else scale = (1 - shape) / lambda;
}
private:
double scale /* sigma */, shape /* k */;
};
class GEV : public Generator {
public:
GEV(double _loc = 0.0, double _scale = 1.0, double _shape = 1.0) :
e(1.0), loc(_loc), scale(_scale), shape(_shape) {
assert(shape != 0.0);
D("GEV(loc=%f, scale=%f, shape=%f)", loc, scale, shape);
}
virtual double generate(double U = -1.0) {
return loc + scale * (pow(e.generate(U), -shape) - 1) / shape;
}
private:
Exponential e;
double loc /* mu */, scale /* sigma */, shape /* k */;
};
class Discrete : public Generator {
public:
~Discrete() { delete def; }
Discrete(Generator* _def = NULL) : def(_def) {
if (def == NULL) def = new Fixed(0.0);
}
virtual double generate(double U = -1.0) {
double Uc = U;
if (pv.size() > 0 && U < 0.0) U = drand48();
double sum = 0;
for (auto p: pv) {
sum += p.first;
if (U < sum) return p.second;
}
return def->generate(Uc);
// return 0.0;
}
void add(double p, double v) {
pv.push_back(std::pair<double,double>(p, v));
}
private:
Generator *def;
std::vector< std::pair<double,double> > pv;
};
class KeyGenerator {
public:
KeyGenerator(Generator* _g, double _max = 10000) : g(_g), max(_max) {}
std::string generate(uint64_t ind) {
uint64_t h = fnv_64(ind);
double U = (double) h / ULLONG_MAX;
double G = g->generate(U);
int keylen = MAX(round(G), floor(log10(max)) + 1);
char key[256];
snprintf(key, 256, "%0*" PRIu64, keylen, ind);
// D("%d = %s", ind, key);
return std::string(key);
}
private:
Generator* g;
double max;
};
Generator* createGenerator(std::string str);
Generator* createFacebookKey();
Generator* createFacebookValue();
Generator* createFacebookIA();
#endif // GENERATOR_H

92
HistogramSampler.h Normal file
View File

@ -0,0 +1,92 @@
/* -*- c++ -*- */
#ifndef HISTOGRAMSAMPLER_H
#define HISTOGRAMSAMPLER_H
#include <inttypes.h>
#include <assert.h>
#include <vector>
#include "Operation.h"
// parameters: # of bins, range? size of bins?
class HistogramSampler {
public:
std::vector<uint64_t> bins;
int width;
double overflow_sum;
HistogramSampler() = delete;
HistogramSampler(int _bins, int _width) : overflow_sum(0.0) {
assert(_bins > 0 && _width > 0);
bins.resize(_bins + 1, 0);
width = _width;
}
void sample(const Operation &op) {
sample(op.time());
}
void sample(double s) {
assert(s >= 0);
size_t bin = s / width;
if (bin >= bins.size()) {
bin = bins.size() - 1;
overflow_sum += s;
}
bins[bin]++;
}
double average() {
uint64_t count = total();
double sum = 0.0;
for (size_t i = 0; i < bins.size() - 1; i++) {
sum += bins[i] * (i*width + (i+1)*width) / 2;
}
sum += overflow_sum;
return sum / count;
}
double get_nth(double nth) {
uint64_t count = total();
uint64_t n = 0;
double target = count * nth/100;
for (size_t i = 0; i < bins.size(); i++) {
n += bins[i];
if (n > target) { // The nth is inside bins[i].
double left = target - (n - bins[i]);
return i*width + left / bins[i] * width;
}
}
return bins.size() * width;
}
uint64_t total() {
uint64_t sum = 0.0;
for (auto i: bins) sum += i;
return sum;
}
void accumulate(const HistogramSampler &h) {
assert(width == h.width && bins.size() == h.bins.size());
for (size_t i = 0; i < bins.size(); i++) bins[i] += h.bins[i];
overflow_sum += h.overflow_sum;
}
};
#endif // HISTOGRAMSAMPLER_H

26
Operation.h Normal file
View File

@ -0,0 +1,26 @@
// -*- c++-mode -*-
#ifndef OPERATION_H
#define OPERATION_H
#include <string>
using namespace std;
class Operation {
public:
double start_time, end_time;
enum type_enum {
GET, SET
};
type_enum type;
string key;
// string value;
double time() const { return (end_time - start_time) * 1000000; }
};
#endif // OPERATION_H

View File

@ -1,4 +1,6 @@
mutilate
========
Mutilate: high-performance memcached load generator
Mutilate is a memcached load generator designed for high request
rates, good tail-latency measurements, and realistic request stream
generation.

49
SConstruct Normal file
View File

@ -0,0 +1,49 @@
#!/usr/bin/python
import os
env = Environment(ENV = os.environ)
env['HAVE_POSIX_BARRIER'] = True
env.Append(CPPPATH = ['/usr/local/include'])
env.Append(CCFLAGS = '-std=c++0x -D_GNU_SOURCE') # -D__STDC_FORMAT_MACROS')
conf = env.Configure(config_h = "config.h")
conf.Define("__STDC_FORMAT_MACROS")
if not conf.CheckCXX():
print "A compiler with C++11 support is required."
Exit(1)
print "Checking for gengetopt...",
if Execute("@which gengetopt &> /dev/null"):
print "not found (required)"
Exit(1)
else: print "found"
if not conf.CheckLibWithHeader("event", "event2/event.h", "C++"):
print "libevent required"
Exit(1)
if not conf.CheckLibWithHeader("pthread", "pthread.h", "C++"):
print "pthread required"
Exit(1)
conf.CheckLib("rt", "clock_gettime", language="C++")
conf.CheckLibWithHeader("zmq", "zmq.hpp", "C++")
conf.CheckFunc('clock_gettime')
if conf.CheckFunc('pthread_barrier_init'):
conf.env['HAVE_POSIX_BARRIER'] = False
env = conf.Finish()
env.Append(CFLAGS = ' -O3 -Wall -g')
#env.Append(CPPFLAGS = ' -D_GNU_SOURCE -D__STDC_FORMAT_MACROS')
#env.Append(CPPFLAGS = ' -DUSE_ADAPTIVE_SAMPLER')
env.Command(['cmdline.cc', 'cmdline.h'], 'cmdline.ggo', 'gengetopt < $SOURCE')
src = Split("""mutilate.cc cmdline.cc log.cc distributions.cc util.cc
Connection.cc Generator.cc""")
if not env['HAVE_POSIX_BARRIER']: # USE_POSIX_BARRIER:
src += ['barrier.cc']
env.Program(target='mutilate', source=src)
env.Program(target='gtest', source=['TestGenerator.cc', 'log.cc', 'util.cc',
'Generator.cc'])

86
TestGenerator.cc Normal file
View File

@ -0,0 +1,86 @@
#include "config.h"
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <time.h>
#include <inttypes.h>
#include <limits.h>
#include <math.h>
#include "Generator.h"
#include "util.h"
int main(int argc, char **argv) {
// double now = get_time();
// uint64_t x = fnv_64_buf(&now, sizeof(now));
srand48(0xdeadbeef);
/*
Generator *n = createGenerator("n:1,1"); // new Normal(1, 1);
Generator *e = createGenerator("e:1"); // new Exponential(1);
Generator *p = createGenerator("p:214.476,0.348238"); // new GPareto(214.476, 0.348238);
Generator *g = createGenerator("g:30.7984,8.20449,0.078688"); // new GEV(30.7984, 8.20449, 0.078688);
printf("%f\n", n->generate());
printf("%f\n", e->generate());
printf("%f\n", p->generate());
printf("%f\n", g->generate());
srand48(0);
printf("\n\n");
Discrete *d = new Discrete(createGenerator("p:214.476,0.348238"));
// d->add(.5, -1.0);
//Generator *d = createGenerator("p:214.476,0.348238");
for (int i = 0; i < 20; i++) {
printf("d %d\n", (int) d->generate());
}
printf("\n\n");
srand48(0);
//Discrete *d2 = new Discrete(createGenerator("p:214.476,0.348238"));
// d->add(.5, -1.0);
Generator *d2 = createGenerator("p:214.476,0.348238");
for (int i = 0; i < 20; i++) {
printf("d %d\n", (int) d2->generate());
}
KeyGenerator kg(g);
*/
Generator *p2 = createGenerator("p:214.476,0.348238");
// for (int i = 0; i < 1000; i++)
// printf("%f\n", p2->generate());
p2->set_lambda(1000);
for (int i = 0; i < 1000; i++)
printf("%f\n", p2->generate());
// for (int i = 0; i < 10000; i++)
// printf("%s\n", kg.generate(i).c_str());
/*
for (uint64_t ind = 0; ind < 10000; ind++) {
// uint64_t ind = 0;
uint64_t h = fnv_64(ind);
double U = (double) h / ULLONG_MAX;
// double E = e->generate(U); // -log(U);
double G = g->generate(U);
int keylen = MAX(round(G), floor(log10(10000)) + 1);
// printf("ind=%" PRIu64 "\n", ind);
// printf("h=%" PRIu64 "\n", h);
// printf("U=%f\n", U);
// printf("G=%f\n", G);
// printf("keylen=%d\n", keylen);
printf("%7" PRIu64 " %7d key=%0*" PRIu64 "\n", ind, keylen, keylen, ind);
}
*/
}

33
barrier.cc Normal file
View File

@ -0,0 +1,33 @@
#include <pthread.h>
#include "barrier.h"
int barrier_init(barrier_t *barrier,int needed)
{
barrier->needed = needed;
barrier->called = 0;
pthread_mutex_init(&barrier->mutex,NULL);
pthread_cond_init(&barrier->cond,NULL);
return 0;
}
int barrier_destroy(barrier_t *barrier)
{
pthread_mutex_destroy(&barrier->mutex);
pthread_cond_destroy(&barrier->cond);
return 0;
}
int barrier_wait(barrier_t *barrier)
{
pthread_mutex_lock(&barrier->mutex);
barrier->called++;
if (barrier->called == barrier->needed) {
barrier->called = 0;
pthread_cond_broadcast(&barrier->cond);
} else {
pthread_cond_wait(&barrier->cond,&barrier->mutex);
}
pthread_mutex_unlock(&barrier->mutex);
return 0;
}

25
barrier.h Normal file
View File

@ -0,0 +1,25 @@
#ifndef BARRIER_H
#define BARRIER_H
#include "config.h"
typedef struct {
int needed;
int called;
pthread_mutex_t mutex;
pthread_cond_t cond;
} barrier_t;
int barrier_init(barrier_t *barrier,int needed);
int barrier_destroy(barrier_t *barrier);
int barrier_wait(barrier_t *barrier);
#ifndef HAVE_PTHREAD_BARRIER_INIT
#define pthread_barrier_t barrier_t
#define pthread_barrier_attr_t barrier_attr_t
#define pthread_barrier_init(b,a,n) barrier_init(b,n)
#define pthread_barrier_destroy(b) barrier_destroy(b)
#define pthread_barrier_wait(b) barrier_wait(b)
#endif
#endif // BARRIER_H

79
cmdline.ggo Normal file
View File

@ -0,0 +1,79 @@
package "mutilate3"
version "0.1"
usage "mutilate -s server[:port] [options]"
description "\"High-performance\" memcached benchmarking tool"
args "-c cc --show-required -C --default-optional -l"
option "verbose" v "Verbosity. Repeat for more verbose." multiple
option "quiet" - "Disable log messages."
text "\nBasic options:"
option "server" s "Memcached server hostname[:port]. \
Repeat to specify multiple servers." string multiple
option "qps" q "Target aggregate QPS." int default="0"
option "time" t "Maximum time to run (seconds)." int default="5"
option "keysize" K "Length of memcached keys (distribution)."
string default="30"
option "valuesize" V "Length of memcached values (distribution)."
string default="200"
option "records" r "Number of memcached records to use. \
If multiple memcached servers are given, this number is divided \
by the number of servers." int default="10000"
option "update" u "Ratio of set:get commands." float default="0.0"
text "\nAdvanced options:"
option "threads" T "Number of threads to spawn." int default="1"
option "connections" c "Connections to establish per server." int default="1"
option "depth" d "Maximum depth to pipeline requests." int default="1"
option "roundrobin" R "Assign threads to servers in round-robin fashion. \
By default, each thread connects to every server."
option "iadist" i "Inter-arrival distribution (distribution)."
string default="exponential"
option "noload" - "Skip database loading."
option "loadonly" - "Load database and then exit."
option "blocking" B "Use blocking epoll(). May increase latency."
option "no_nodelay" D "Don't use TCP_NODELAY."
option "warmup" w "Warmup time before starting measurement." int
option "wait" W "Time to wait after startup to start measurement." int
option "search" S "Search for the QPS where N-order statistic < Xus. \
(i.e. --search 95:1000 means find the QPS where 95% of requests are \
faster than 1000us)." string typestr="N:X"
option "scan" - "Scan latency across QPS rates from min to max."
string typestr="min:max:step"
text "\nAgent-mode options:"
option "agentmode" A "Run client in agent mode."
option "agent" a "Enlist remote agent." string typestr="host" multiple
option "lambda_mul" l "Lambda multiplier. Increases share of QPS for this client." int default="1"
text "
Some options take a 'distribution' as an argument.
Distributions are specified by <distribution>[:<param1>[,...]].
Parameters are not required. The following distributions are supported:
[fixed:]<value> Always generates <value>.
uniform:<max> Uniform distribution between 0 and <max>.
normal:<mean>,<sd> Normal distribution.
exponential:<lambda> Exponential distribution.
pareto:<scale>,<shape> Generalized Pareto distribution.
gev:<loc>,<scale>,<shape> Generalized Extreme Value distribution.
fb_key ETC key-size distribution from [1].
fb_value ETC value-size distribution from [1].
fb_ia ETC inter-arrival distribution from [1].
[1] Berk Atikoglu et al., Workload Analysis of a Large-Scale Key-Value Store,
SIGMETRICS 2012
"

34
distributions.cc Normal file
View File

@ -0,0 +1,34 @@
#include <math.h>
#include <stdlib.h>
#include <string.h>
#include "distributions.h"
#include "log.h"
const char* distributions[] =
{ "uniform", "exponential", "zipfian", "latest", NULL };
distribution_t get_distribution(const char *name) {
for (int i = 0; distributions[i] != NULL; i++)
if (!strcmp(distributions[i], name))
return (distribution_t) i;
return (distribution_t) -1;
}
double generate_normal(double mean, double sd) {
double U = drand48();
double V = drand48();
double N = sqrt(-2 * log(U)) * cos(2 * M_PI * V);
return mean + sd * N;
}
double generate_poisson(double lambda) {
if (lambda <= 0.0) return 0;
double U = drand48();
return -log(U)/lambda;
}
double generate_uniform(double lambda) {
if (lambda <= 0.0) return 0;
return 1.0 / lambda;
}

14
distributions.h Normal file
View File

@ -0,0 +1,14 @@
#ifndef DISTRIBUTIONS_H
#define DISTRIBUTIONS_H
// If you change this, make sure to update distributions.cc.
enum distribution_t { UNIFORM, EXPONENTIAL, ZIPFIAN, LATEST };
extern const char* distributions[];
double generate_normal(double mean, double sd);
double generate_poisson(double lambda);
double generate_uniform(double lambda);
distribution_t get_distribution(const char *name);
#endif // DISTRIBUTIONS_H

19
log.cc Normal file
View File

@ -0,0 +1,19 @@
#include <stdio.h>
#include <stdarg.h>
#include "log.h"
log_level_t log_level = INFO;
void log_file_line(log_level_t level, const char *file, int line,
const char *format, ...) {
va_list args;
char new_format[512];
snprintf(new_format, sizeof(new_format), "%s(%d): %s\n", file, line, format);
va_start(args, format);
if (level >= log_level)
vfprintf(stderr, new_format, args);
va_end(args);
}

25
log.h Normal file
View File

@ -0,0 +1,25 @@
#ifndef LOG_H
#define LOG_H
enum log_level_t { DEBUG, VERBOSE, INFO, WARN, QUIET };
extern log_level_t log_level;
void log_file_line(log_level_t level, const char *file, int line,
const char* format, ...);
#define L(level, args...) log_file_line(level, __FILE__, __LINE__, args)
#define D(args...) L(DEBUG, args)
#define V(args...) L(VERBOSE, args)
#define I(args...) L(INFO, args)
#define W(args...) L(WARN, args)
#define DIE(args...) do { W(args); exit(-1); } while (0)
#define NOLOG(x) \
do { log_level_t old = log_level; \
log_level = QUIET; \
(x); \
log_level = old; \
} while (0)
#endif // LOG_H

910
mutilate.cc Normal file
View File

@ -0,0 +1,910 @@
#include <arpa/inet.h>
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include <queue>
#include <string>
#include <vector>
#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/dns.h>
#include <event2/event.h>
#include <event2/thread.h>
#include <event2/util.h>
#include "config.h"
#ifdef HAVE_LIBZMQ
#include <zmq.hpp>
#endif
#include "AdaptiveSampler.h"
#include "AgentStats.h"
#ifndef HAVE_PTHREAD_BARRIER_INIT
#include "barrier.h"
#endif
#include "cmdline.h"
#include "Connection.h"
#include "ConnectionOptions.h"
#include "log.h"
#include "mutilate.h"
#include "util.h"
#define MIN(a,b) ((a) < (b) ? (a) : (b))
using namespace std;
gengetopt_args_info args;
char random_char[2 * 1024 * 1024]; // Buffer used to generate random values.
#ifdef HAVE_LIBZMQ
vector<zmq::socket_t*> agent_sockets;
zmq::context_t context(1);
#endif
struct thread_data {
const vector<string> *servers;
options_t *options;
bool master;
#ifdef HAVE_LIBZMQ
zmq::socket_t *socket;
#endif
};
// struct evdns_base *evdns;
pthread_barrier_t barrier;
double boot_time;
void init_random_stuff();
void go(const vector<string> &servers, options_t &options,
ConnectionStats &stats
#ifdef HAVE_LIBZMQ
, zmq::socket_t* socket = NULL
#endif
);
void do_mutilate(const vector<string> &servers, options_t &options,
ConnectionStats &stats, bool master = true
#ifdef HAVE_LIBZMQ
, zmq::socket_t* socket = NULL
#endif
);
void args_to_options(options_t* options);
void* thread_main(void *arg);
#ifdef HAVE_LIBZMQ
static std::string s_recv (zmq::socket_t &socket) {
zmq::message_t message;
socket.recv(&message);
return std::string(static_cast<char*>(message.data()), message.size());
}
// Convert string to 0MQ string and send to socket
static bool s_send (zmq::socket_t &socket, const std::string &string) {
zmq::message_t message(string.size());
memcpy(message.data(), string.data(), string.size());
return socket.send(message);
}
void agent() {
zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_REP);
socket.bind("tcp://*:5555");
while (true) {
zmq::message_t request;
socket.recv(&request);
zmq::message_t num(sizeof(int));
*((int *) num.data()) = args.threads_arg * args.lambda_mul_arg;
socket.send(num);
options_t options;
memcpy(&options, request.data(), sizeof(options));
vector<string> servers;
for (int i = 0; i < options.server_given; i++) {
servers.push_back(s_recv(socket));
s_send(socket, "ACK");
}
for (auto i: servers) {
V("Got server = %s", i.c_str());
}
options.threads = args.threads_arg;
socket.recv(&request);
options.lambda_denom = *((int *) request.data());
s_send(socket, "THANKS");
// V("AGENT SLEEPS"); sleep(1);
options.lambda = (double) options.qps / options.lambda_denom * args.lambda_mul_arg;
// if (options.threads > 1)
pthread_barrier_init(&barrier, NULL, options.threads);
ConnectionStats stats;
go(servers, options, stats, &socket);
AgentStats as;
as.rx_bytes = stats.rx_bytes;
as.tx_bytes = stats.tx_bytes;
as.gets = stats.gets;
as.sets = stats.sets;
as.get_misses = stats.get_misses;
as.start = stats.start;
as.stop = stats.stop;
string req = s_recv(socket);
// V("req = %s", req.c_str());
request.rebuild(sizeof(as));
memcpy(request.data(), &as, sizeof(as));
socket.send(request);
}
}
void prep_agent(const vector<string>& servers, options_t& options) {
int sum = options.lambda_denom;
for (auto s: agent_sockets) {
zmq::message_t message(sizeof(options_t));
memcpy((void *) message.data(), &options, sizeof(options_t));
s->send(message);
zmq::message_t rep;
s->recv(&rep);
unsigned int num = *((int *) rep.data());
sum += options.connections * (options.roundrobin ?
(servers.size() > num ? servers.size() : num) :
(servers.size() * num));
for (auto i: servers) {
s_send(*s, i);
string rep = s_recv(*s);
// V("Reply: %s", rep.c_str());
}
}
options.lambda_denom = sum;
options.lambda = (double) options.qps / options.lambda_denom * args.lambda_mul_arg;
V("lambda_denom = %d", sum);
for (auto s: agent_sockets) {
zmq::message_t message(sizeof(sum));
*((int *) message.data()) = sum;
s->send(message);
string rep = s_recv(*s);
}
V("MASTER SLEEPS"); sleep_time(1.5);
}
void finish_agent(ConnectionStats &stats) {
for (auto s: agent_sockets) {
s_send(*s, "stats");
AgentStats as;
zmq::message_t message;
s->recv(&message);
memcpy(&as, message.data(), sizeof(as));
stats.accumulate(as);
}
}
void sync_agent(zmq::socket_t* socket) {
// V("agent: synchronizing");
if (args.agent_given) {
for (auto s: agent_sockets) {
s_send(*s, "sync1");
string rep = s_recv(*s);
}
} else if (args.agentmode_given) {
string req = s_recv(*socket);
s_send(*socket, "sync");
}
// V("agent: synchronized");
}
#endif
string name_to_ipaddr(string host) {
char *s_copy = new char[host.length() + 1];
strcpy(s_copy, host.c_str());
char *saveptr = NULL; // For reentrant strtok().
char *h_ptr = strtok_r(s_copy, ":", &saveptr);
char *p_ptr = strtok_r(NULL, ":", &saveptr);
char ipaddr[16];
if (h_ptr == NULL)
DIE("strtok(.., \":\") failed to parse %s", host.c_str());
string hostname = h_ptr;
string port = "11211";
if (p_ptr) port = p_ptr;
struct evutil_addrinfo hints;
struct evutil_addrinfo *answer = NULL;
int err;
/* Build the hints to tell getaddrinfo how to act. */
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC; /* v4 or v6 is fine. */
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP; /* We want a TCP socket */
/* Only return addresses we can use. */
hints.ai_flags = EVUTIL_AI_ADDRCONFIG;
/* Look up the hostname. */
err = evutil_getaddrinfo(h_ptr, NULL, &hints, &answer);
if (err < 0) {
DIE("Error while resolving '%s': %s",
host.c_str(), evutil_gai_strerror(err));
}
if (answer == NULL) DIE("No DNS answer.");
void *ptr = NULL;
switch (answer->ai_family) {
case AF_INET:
ptr = &((struct sockaddr_in *) answer->ai_addr)->sin_addr;
break;
case AF_INET6:
ptr = &((struct sockaddr_in6 *) answer->ai_addr)->sin6_addr;
break;
}
inet_ntop (answer->ai_family, ptr, ipaddr, 16);
D("Resolved %s to %s", h_ptr, (string(ipaddr) + ":" + string(port)).c_str());
delete[] s_copy;
return string(ipaddr) + ":" + string(port);
}
int main(int argc, char **argv) {
if (cmdline_parser(argc, argv, &args) != 0) exit(-1);
for (unsigned int i = 0; i < args.verbose_given; i++)
log_level = (log_level_t) ((int) log_level - 1);
if (args.quiet_given) log_level = QUIET;
if (args.depth_arg < 1) DIE("--depth must be >= 1");
// if (args.valuesize_arg < 1 || args.valuesize_arg > 1024*1024)
// DIE("--valuesize must be >= 1 and <= 1024*1024");
if (args.qps_arg < 0) DIE("--qps must be >= 0");
if (args.update_arg < 0.0 || args.update_arg > 1.0)
DIE("--update must be >= 0.0 and <= 1.0");
if (args.time_arg < 1) DIE("--time must be >= 1");
// if (args.keysize_arg < MINIMUM_KEY_LENGTH)
// DIE("--keysize must be >= %d", MINIMUM_KEY_LENGTH);
if (args.connections_arg < 1 || args.connections_arg > MAXIMUM_CONNECTIONS)
DIE("--connections must be between [1,%d]", MAXIMUM_CONNECTIONS);
// if (get_distribution(args.iadist_arg) == -1)
// DIE("--iadist invalid: %s", args.iadist_arg);
if (!args.server_given && !args.agentmode_given)
DIE("--server or --agentmode must be specified.");
// TODO: Discover peers, share arguments.
init_random_stuff();
boot_time = get_time();
setvbuf(stdout, NULL, _IONBF, 0);
// struct event_base *base;
// if ((base = event_base_new()) == NULL) DIE("event_base_new() fail");
// evthread_use_pthreads();
// if ((evdns = evdns_base_new(base, 1)) == 0) DIE("evdns");
#ifdef HAVE_LIBZMQ
if (args.agentmode_given) {
agent();
return 0;
} else if (args.agent_given) {
for (unsigned int i = 0; i < args.agent_given; i++) {
zmq::socket_t *s = new zmq::socket_t(context, ZMQ_REQ);
string host = string("tcp://") + string(args.agent_arg[i]) + string(":5555");
s->connect(host.c_str());
agent_sockets.push_back(s);
}
}
#endif
options_t options;
args_to_options(&options);
pthread_barrier_init(&barrier, NULL, options.threads);
vector<string> servers;
for (unsigned int s = 0; s < args.server_given; s++)
servers.push_back(name_to_ipaddr(string(args.server_arg[s])));
ConnectionStats stats;
double peak_qps = 0.0;
if (args.search_given) {
char *n_ptr = strtok(args.search_arg, ":");
char *x_ptr = strtok(NULL, ":");
if (n_ptr == NULL || x_ptr == NULL) DIE("Invalid --search argument");
int n = atoi(n_ptr);
int x = atoi(x_ptr);
I("Search-mode. Find QPS @ %dus %dth percentile.", x, n);
int high_qps = 2000000;
int low_qps = 5000;
double nth;
int cur_qps;
go(servers, options, stats);
nth = stats.get_nth(n);
peak_qps = stats.get_qps();
high_qps = stats.get_qps();
cur_qps = stats.get_qps();
I("peak qps = %d", high_qps);
if (nth > x) {
while ((high_qps > low_qps * 1.02) && cur_qps > 10000) {
cur_qps = (high_qps + low_qps) / 2;
args_to_options(&options);
options.qps = cur_qps;
options.lambda = (double) options.qps / (double) options.lambda_denom * args.lambda_mul_arg;
stats = ConnectionStats();
go(servers, options, stats);
nth = stats.get_nth(n);
I("cur_qps = %d, get_qps = %f, nth = %f", cur_qps, stats.get_qps(), nth);
if (nth > x /*|| cur_qps > stats.get_qps() * 1.05*/) high_qps = cur_qps;
else low_qps = cur_qps;
}
while (nth > x && cur_qps > 10000) { // > low_qps) { // 10000) {
cur_qps = cur_qps * 98 / 100;
args_to_options(&options);
options.qps = cur_qps;
options.lambda = (double) options.qps / (double) options.lambda_denom * args.lambda_mul_arg;
stats = ConnectionStats();
go(servers, options, stats);
nth = stats.get_nth(n);
I("cur_qps = %d, get_qps = %f, nth = %f", cur_qps, stats.get_qps(), nth);
}
}
} else if (args.scan_given) {
char *min_ptr = strtok(args.scan_arg, ":");
char *max_ptr = strtok(NULL, ":");
char *step_ptr = strtok(NULL, ":");
if (min_ptr == NULL || min_ptr == NULL || step_ptr == NULL)
DIE("Invalid --scan argument");
int min = atoi(min_ptr);
int max = atoi(max_ptr);
int step = atoi(step_ptr);
printf("%-7s %7s %7s %7s %7s %7s %7s %7s %7s %8s %8s\n",
"#type", "avg", "min", "1st", "5th", "10th",
"90th", "95th", "99th", "QPS", "target");
for (int q = min; q <= max; q += step) {
args_to_options(&options);
options.qps = q;
options.lambda = (double) options.qps / (double) options.lambda_denom * args.lambda_mul_arg;
// options.lambda = (double) options.qps / options.connections /
// args.server_given /
// (args.threads_arg < 1 ? 1 : args.threads_arg);
stats = ConnectionStats();
go(servers, options, stats);
stats.print_stats("read", stats.get_sampler, false);
printf(" %8.1f", stats.get_qps());
printf(" %8d\n", q);
}
} else {
go(servers, options, stats);
}
if (!args.scan_given && !args.loadonly_given) {
stats.print_header();
stats.print_stats("read", stats.get_sampler);
stats.print_stats("update", stats.set_sampler);
stats.print_stats("op_q", stats.op_sampler);
int total = stats.gets + stats.sets;
printf("\nTotal QPS = %.1f (%d / %.1fs)\n\n",
total / (stats.stop - stats.start),
total, stats.stop - stats.start);
if (args.search_given && peak_qps > 0.0)
printf("Peak QPS = %.1f\n\n", peak_qps);
printf("Misses = %" PRIu64 " (%.1f%%)\n\n", stats.get_misses,
(double) stats.get_misses/stats.gets*100);
printf("RX %10" PRIu64 " bytes : %6.1f MB/s\n",
stats.rx_bytes,
(double) stats.rx_bytes / 1024 / 1024 / (stats.stop - stats.start));
printf("TX %10" PRIu64 " bytes : %6.1f MB/s\n",
stats.tx_bytes,
(double) stats.tx_bytes / 1024 / 1024 / (stats.stop - stats.start));
}
// if (args.threads_arg > 1)
pthread_barrier_destroy(&barrier);
#ifdef HAVE_LIBZMQ
if (args.agent_given) {
for (auto i: agent_sockets) delete i;
}
#endif
// evdns_base_free(evdns, 0);
// event_base_free(base);
cmdline_parser_free(&args);
}
void go(const vector<string>& servers, options_t& options,
ConnectionStats &stats
#ifdef HAVE_LIBZMQ
, zmq::socket_t* socket
#endif
) {
#ifdef HAVE_LIBZMQ
if (args.agent_given > 0) {
prep_agent(servers, options);
}
#endif
if (options.threads > 1) {
pthread_t pt[options.threads];
struct thread_data td[options.threads];
vector<string> ts[options.threads];
for (int t = 0; t < options.threads; t++) {
td[t].options = &options;
#ifdef HAVE_LIBZMQ
td[t].socket = socket;
#endif
if (t == 0) td[t].master = true;
else td[t].master = false;
if (options.roundrobin) {
for (unsigned int i = (t % servers.size());
i < servers.size(); i += options.threads)
ts[t].push_back(servers[i % servers.size()]);
td[t].servers = &ts[t];
} else {
td[t].servers = &servers;
}
if (pthread_create(&pt[t], NULL, thread_main, &td[t]))
DIE("pthread_create() failed");
}
for (int t = 0; t < options.threads; t++) {
ConnectionStats *cs;
if (pthread_join(pt[t], (void**) &cs)) DIE("pthread_join() failed");
stats.accumulate(*cs);
delete cs;
}
} else if (options.threads == 1) {
do_mutilate(servers, options, stats, true
#ifdef HAVE_LIBZMQ
, socket
#endif
);
} else {
#ifdef HAVE_LIBZMQ
if (args.agent_given) {
sync_agent(socket);
sync_agent(socket);
}
#endif
}
#ifdef HAVE_LIBZMQ
if (args.agent_given > 0) {
finish_agent(stats);
}
#endif
}
void* thread_main(void *arg) {
struct thread_data *td = (struct thread_data *) arg;
ConnectionStats *cs = new ConnectionStats();
do_mutilate(*td->servers, *td->options, *cs, td->master
#ifdef HAVE_LIBZMQ
, td->socket
#endif
);
return cs;
}
void do_mutilate(const vector<string>& servers, options_t& options,
ConnectionStats& stats, bool master
#ifdef HAVE_LIBZMQ
, zmq::socket_t* socket
#endif
) {
int loop_flag =
(options.blocking || args.blocking_given) ? EVLOOP_ONCE : EVLOOP_NONBLOCK;
char *saveptr = NULL; // For reentrant strtok().
struct event_base *base;
struct evdns_base *evdns;
if ((base = event_base_new()) == NULL) DIE("event_base_new() fail");
// evthread_use_pthreads();
if ((evdns = evdns_base_new(base, 1)) == 0) DIE("evdns");
event_base_priority_init(base, 2);
// FIXME: May want to move this to after all connections established.
double start = get_time();
double now = start;
vector<Connection*> connections;
vector<Connection*> server_lead;
for (auto s: servers) {
// Split args.server_arg[s] into host:port using strtok().
char *s_copy = new char[s.length() + 1];
strcpy(s_copy, s.c_str());
char *h_ptr = strtok_r(s_copy, ":", &saveptr);
char *p_ptr = strtok_r(NULL, ":", &saveptr);
if (h_ptr == NULL) DIE("strtok(.., \":\") failed to parse %s", s.c_str());
string hostname = h_ptr;
string port = "11211";
if (p_ptr) port = p_ptr;
delete[] s_copy;
for (int c = 0; c < options.connections; c++) {
Connection* conn = new Connection(base, evdns, hostname, port, options,
args.agentmode_given ? false :
true);
connections.push_back(conn);
if (c == 0) server_lead.push_back(conn);
}
}
// Wait for all Connections to become IDLE.
while (1) {
// FIXME: If all connections become ready before event_base_loop
// is called, this will deadlock.
event_base_loop(base, EVLOOP_ONCE);
bool restart = false;
for (Connection *conn: connections)
if (conn->read_state != Connection::IDLE)
restart = true;
if (restart) continue;
else break;
}
// Load database on lead connection for each server.
if (!options.noload) {
V("Loading database.");
for (auto c: server_lead) c->start_loading();
// Wait for all Connections to become IDLE.
while (1) {
// FIXME: If all connections become ready before event_base_loop
// is called, this will deadlock.
event_base_loop(base, EVLOOP_ONCE);
bool restart = false;
for (Connection *conn: connections)
if (conn->read_state != Connection::IDLE)
restart = true;
if (restart) continue;
else break;
}
}
if (options.loadonly) {
evdns_base_free(evdns, 0);
event_base_free(base);
return;
}
// FIXME: Remove. Not needed, testing only.
// // FIXME: Synchronize start_time here across threads/nodes.
// pthread_barrier_wait(&barrier);
// Warmup connection.
if (options.warmup > 0) {
if (master) V("Warmup start.");
#ifdef HAVE_LIBZMQ
if (args.agent_given || args.agentmode_given) {
if (master) V("Synchronizing.");
if (master) sync_agent(socket);
pthread_barrier_wait(&barrier);
if (master) sync_agent(socket);
pthread_barrier_wait(&barrier);
if (master) V("Synchronized.");
}
#endif
int old_time = options.time;
// options.time = 1;
start = get_time();
for (Connection *conn: connections) {
conn->start_time = start;
conn->options.time = options.warmup;
conn->drive_write_machine(); // Kick the Connection into motion.
}
while (1) {
event_base_loop(base, loop_flag);
//#ifdef USE_CLOCK_GETTIME
// now = get_time();
//#else
struct timeval now_tv;
event_base_gettimeofday_cached(base, &now_tv);
now = tv_to_double(&now_tv);
//#endif
bool restart = false;
for (Connection *conn: connections)
if (!conn->check_exit_condition(now))
restart = true;
if (restart) continue;
else break;
}
bool restart = false;
for (Connection *conn: connections)
if (conn->read_state != Connection::IDLE)
restart = true;
if (restart) {
// Wait for all Connections to become IDLE.
while (1) {
// FIXME: If there were to use EVLOOP_ONCE and all connections
// become ready before event_base_loop is called, this will
// deadlock. We should check for IDLE before calling
// event_base_loop.
event_base_loop(base, EVLOOP_ONCE); // EVLOOP_NONBLOCK);
bool restart = false;
for (Connection *conn: connections)
if (conn->read_state != Connection::IDLE)
restart = true;
if (restart) continue;
else break;
}
}
// options.time = old_time;
for (Connection *conn: connections) {
conn->reset();
// conn->stats = ConnectionStats();
conn->options.time = old_time;
}
if (master) V("Warmup stop.");
}
// FIXME: Synchronize start_time here across threads/nodes.
pthread_barrier_wait(&barrier);
if (master && args.wait_given) {
if (get_time() < boot_time + args.wait_arg) {
double t = (boot_time + args.wait_arg)-get_time();
V("Sleeping %.1fs for -W.", t);
sleep_time(t);
}
}
#ifdef HAVE_LIBZMQ
if (args.agent_given || args.agentmode_given) {
if (master) V("Synchronizing.");
if (master) sync_agent(socket);
pthread_barrier_wait(&barrier);
if (master) sync_agent(socket);
pthread_barrier_wait(&barrier);
if (master) V("Synchronized.");
}
#endif
start = get_time();
for (Connection *conn: connections) {
conn->start_time = start;
conn->drive_write_machine(); // Kick the Connection into motion.
}
// V("Start = %f", start);
// Main event loop.
while (1) {
event_base_loop(base, loop_flag);
//#if USE_CLOCK_GETTIME
// now = get_time();
//#else
struct timeval now_tv;
event_base_gettimeofday_cached(base, &now_tv);
now = tv_to_double(&now_tv);
//#endif
bool restart = false;
for (Connection *conn: connections)
if (!conn->check_exit_condition(now))
restart = true;
if (restart) continue;
else break;
}
// Tear-down and accumulate stats.
for (Connection *conn: connections) {
stats.accumulate(conn->stats);
delete conn;
}
stats.start = start;
stats.stop = now;
evdns_base_free(evdns, 0);
event_base_free(base);
}
void args_to_options(options_t* options) {
// bzero(options, sizeof(options_t));
options->connections = args.connections_arg;
options->blocking = args.blocking_given;
options->qps = args.qps_arg;
options->threads = args.threads_arg;
options->server_given = args.server_given;
options->roundrobin = args.roundrobin_given;
int connections = options->connections;
if (options->roundrobin) {
connections *= (options->server_given > options->threads ?
options->server_given : options->threads);
} else {
connections *= options->server_given * options->threads;
}
// if (args.agent_given) connections *= (1 + args.agent_given);
options->lambda_denom = connections > 1 ? connections : 1;
if (args.lambda_mul_arg > 1) options->lambda_denom *= args.lambda_mul_arg;
if (options->threads < 1) options->lambda_denom = 0;
options->lambda = (double) options->qps / (double) options->lambda_denom * args.lambda_mul_arg;
// V("%d %d %d %f", options->qps, options->connections,
// connections, options->lambda);
// if (args.no_record_scale_given)
// options->records = args.records_arg;
// else
options->records = args.records_arg / options->server_given;
D("options->records = %d", options->records);
if (!options->records) options->records = 1;
strcpy(options->keysize, args.keysize_arg);
// options->keysize = args.keysize_arg;
strcpy(options->valuesize, args.valuesize_arg);
// options->valuesize = args.valuesize_arg;
options->update = args.update_arg;
options->time = args.time_arg;
options->loadonly = args.loadonly_given;
options->depth = args.depth_arg;
options->no_nodelay = args.no_nodelay_given;
options->noload = args.noload_given;
options->iadist = get_distribution(args.iadist_arg);
strcpy(options->ia, args.iadist_arg);
options->warmup = args.warmup_given ? args.warmup_arg : 0;
options->oob_thread = false;
}
void init_random_stuff() {
static char lorem[] =
R"(Lorem ipsum dolor sit amet, consectetur adipiscing elit. Maecenas
turpis dui, suscipit non vehicula non, malesuada id sem. Phasellus
suscipit nisl ut dui consectetur ultrices tincidunt eros
aliquet. Donec feugiat lectus sed nibh ultrices ultrices. Vestibulum
ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia
Curae; Mauris suscipit eros sed justo lobortis at ultrices lacus
molestie. Duis in diam mi. Cum sociis natoque penatibus et magnis dis
parturient montes, nascetur ridiculus mus. Ut cursus viverra
sagittis. Vivamus non facilisis tortor. Integer lectus arcu, sagittis
et eleifend rutrum, condimentum eget sem. Vestibulum tempus tellus non
risus semper semper. Morbi molestie rhoncus mi, in egestas dui
facilisis et.)";
size_t cursor = 0;
while (cursor < sizeof(random_char)) {
size_t max = sizeof(lorem);
if (sizeof(random_char) - cursor < max)
max = sizeof(random_char) - cursor;
memcpy(&random_char[cursor], lorem, max);
cursor += max;
}
}

17
mutilate.h Normal file
View File

@ -0,0 +1,17 @@
#ifndef MUTILATE_H
#define MUTILATE_H
#include "cmdline.h"
#define USE_CACHED_TIME 0
#define MINIMUM_KEY_LENGTH 2
#define MAXIMUM_CONNECTIONS 512
#define MAX_SAMPLES 100000
#define LOADER_CHUNK 1024
extern char random_char[];
extern gengetopt_args_info args;
#endif // MUTILATE_H

31
util.cc Normal file
View File

@ -0,0 +1,31 @@
#include <inttypes.h>
#include <stdio.h>
#include <sys/time.h>
#include <unistd.h>
#include "mutilate.h"
#include "util.h"
void sleep_time(double duration) {
if (duration > 0) usleep((useconds_t) (duration * 1000000));
}
#define FNV_64_PRIME (0x100000001b3ULL)
#define FNV1_64_INIT (0xcbf29ce484222325ULL)
uint64_t fnv_64_buf(const void* buf, size_t len) {
uint64_t hval = FNV1_64_INIT;
unsigned char *bp = (unsigned char *)buf; /* start of buffer */
unsigned char *be = bp + len; /* beyond end of buffer */
while (bp < be) {
hval ^= (uint64_t)*bp++;
hval *= FNV_64_PRIME;
}
return hval;
}
void generate_key(int n, int length, char *buf) {
snprintf(buf, length + 1, "%0*d", length, n);
}

52
util.h Normal file
View File

@ -0,0 +1,52 @@
#ifndef UTIL_H
#define UTIL_H
#include <sys/time.h>
#include <time.h>
inline double tv_to_double(struct timeval *tv) {
return tv->tv_sec + (double) tv->tv_usec / 1000000;
}
inline void double_to_tv(double val, struct timeval *tv) {
long long secs = (long long) val;
long long usecs = (long long) ((val - secs) * 1000000);
tv->tv_sec = secs;
tv->tv_usec = usecs;
}
inline double get_time_accurate() {
#if USE_CLOCK_GETTIME
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC_RAW, &ts);
// clock_gettime(CLOCK_REALTIME, &ts);
return ts.tv_sec + (double) ts.tv_nsec / 1000000000;
#else
struct timeval tv;
gettimeofday(&tv, NULL);
return tv_to_double(&tv);
#endif
}
inline double get_time() {
//#if USE_CLOCK_GETTIME
// struct timespec ts;
// clock_gettime(CLOCK_MONOTONIC_RAW, &ts);
// // clock_gettime(CLOCK_REALTIME, &ts);
// return ts.tv_sec + (double) ts.tv_nsec / 1000000000;
//#else
struct timeval tv;
gettimeofday(&tv, NULL);
return tv_to_double(&tv);
//#endif
}
void sleep_time(double duration);
uint64_t fnv_64_buf(const void* buf, size_t len);
inline uint64_t fnv_64(uint64_t in) { return fnv_64_buf(&in, sizeof(in)); }
void generate_key(int n, int length, char *buf);
#endif // UTIL_H