Misc. experimental features. (1) thread affinity (2) event_base_flag_precise_timer (3) configurable agent port (4) per-connection qps moderation

This commit is contained in:
Jacob Leverich 2013-04-15 13:27:06 -07:00
parent 86ad565956
commit ebd96ada3c
8 changed files with 89 additions and 12 deletions

View File

@ -37,6 +37,8 @@ Connection::Connection(struct event_base* _base, struct evdns_base* _evdns,
read_state = INIT_READ; read_state = INIT_READ;
write_state = INIT_WRITE; write_state = INIT_WRITE;
last_tx = last_rx = 0.0;
bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE); bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, bev_read_cb, bev_write_cb, bev_event_cb, this); bufferevent_setcb(bev, bev_read_cb, bev_write_cb, bev_event_cb, this);
bufferevent_enable(bev, EV_READ | EV_WRITE); bufferevent_enable(bev, EV_READ | EV_WRITE);
@ -251,9 +253,24 @@ void Connection::drive_write_machine(double now) {
write_state = WAITING_FOR_TIME; write_state = WAITING_FOR_TIME;
break; // We want to run through the state machine one more time break; // We want to run through the state machine one more time
// to make sure the timer is armed. // to make sure the timer is armed.
// } else if (options.moderate && options.lambda > 0.0 &&
// now < last_rx + 0.25 / options.lambda) {
} else if (options.moderate && now < last_rx + 0.00025) {
write_state = WAITING_FOR_TIME;
if (!event_pending(timer, EV_TIMEOUT, NULL)) {
// delay = last_rx + 0.25 / options.lambda - now;
delay = last_rx + 0.00025 - now;
// I("MODERATE %f %f %f %f %f", now - last_rx, 0.25/options.lambda,
// 1/options.lambda, now-last_tx, delay);
double_to_tv(delay, &tv);
evtimer_add(timer, &tv);
}
return;
} }
issue_something(now); issue_something(now);
last_tx = now;
stats.log_op(op_queue.size()); stats.log_op(op_queue.size());
next_time += iagen->generate(); next_time += iagen->generate();
@ -262,7 +279,7 @@ void Connection::drive_write_machine(double now) {
now - next_time > 0.005000 && now - next_time > 0.005000 &&
op_queue.size() >= (size_t) options.depth) { op_queue.size() >= (size_t) options.depth) {
while (next_time < now) { while (next_time < now - 0.004000) {
stats.skips++; stats.skips++;
next_time += iagen->generate(); next_time += iagen->generate();
} }
@ -367,6 +384,7 @@ void Connection::read_callback() {
#endif #endif
stats.log_get(*op); stats.log_get(*op);
last_rx = now;
pop_op(); pop_op();
drive_write_machine(now); drive_write_machine(now);
break; break;
@ -399,6 +417,7 @@ void Connection::read_callback() {
free(buf); free(buf);
last_rx = now;
pop_op(); pop_op();
drive_write_machine(); drive_write_machine();
break; break;
@ -453,6 +472,7 @@ void Connection::read_callback() {
free(buf); free(buf);
last_rx = now;
pop_op(); pop_op();
drive_write_machine(now); drive_write_machine(now);
break; break;
@ -484,6 +504,7 @@ void Connection::read_callback() {
if (!options.binary) if (!options.binary)
free(buf); free(buf);
last_rx = now;
pop_op(); pop_op();
drive_write_machine(now); drive_write_machine(now);
break; break;

View File

@ -91,7 +91,10 @@ private:
struct bufferevent *bev; struct bufferevent *bev;
struct event *timer; // Used to control inter-transmission time. struct event *timer; // Used to control inter-transmission time.
double lambda, next_time; // Inter-transmission time parameters. // double lambda;
double next_time; // Inter-transmission time parameters.
double last_rx; // Used to moderate transmission rate.
double last_tx;
int data_length; // When waiting for data, how much we're peeking for. int data_length; // When waiting for data, how much we're peeking for.

View File

@ -40,6 +40,8 @@ typedef struct {
int lambda_denom; int lambda_denom;
bool oob_thread; bool oob_thread;
bool moderate;
} options_t; } options_t;
#endif // CONNECTIONOPTIONS_H #endif // CONNECTIONOPTIONS_H

View File

@ -124,7 +124,7 @@ class ConnectionStats {
static void print_header() { static void print_header() {
printf("%-7s %7s %7s %7s %7s %7s %7s %7s %7s\n", printf("%-7s %7s %7s %7s %7s %7s %7s %7s %7s\n",
"#type", "avg", "min", "1st", "5th", "10th", "#type", "avg", "std", "min", /*"1st",*/ "5th", "10th",
"90th", "95th", "99th"); "90th", "95th", "99th");
} }
@ -200,8 +200,8 @@ class ConnectionStats {
} }
printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f", printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f",
tag, sampler.average(), tag, sampler.average(), sampler.stddev(),
sampler.get_nth(0), sampler.get_nth(1), sampler.get_nth(5), sampler.get_nth(0), /*sampler.get_nth(1),*/ sampler.get_nth(5),
sampler.get_nth(10), sampler.get_nth(90), sampler.get_nth(10), sampler.get_nth(90),
sampler.get_nth(95), sampler.get_nth(99)); sampler.get_nth(95), sampler.get_nth(99));

View File

@ -17,6 +17,7 @@ public:
std::vector<uint64_t> bins; std::vector<uint64_t> bins;
double sum; double sum;
double sum_sq;
LogHistogramSampler() = delete; LogHistogramSampler() = delete;
LogHistogramSampler(int _bins) : sum(0.0) { LogHistogramSampler(int _bins) : sum(0.0) {
@ -34,6 +35,7 @@ public:
size_t bin = log(s)/log(_POW); size_t bin = log(s)/log(_POW);
sum += s; sum += s;
sum_sq += s*s;
// I("%f", sum); // I("%f", sum);
@ -51,6 +53,11 @@ public:
return sum / total(); return sum / total();
} }
double stddev() {
  // Population standard deviation of the recorded samples, derived from the
  // running totals: sqrt(E[x^2] - E[x]^2), using sum, sum_sq and total().
  //
  // Returns 0.0 when no samples have been recorded, instead of the NaN that
  // 0/0 would otherwise produce.
  const double n = static_cast<double>(total());
  if (n <= 0.0) return 0.0;

  const double mean = sum / n;
  const double variance = sum_sq / n - mean * mean;

  // The two terms are nearly equal when the samples are tightly clustered,
  // so rounding can leave the difference a hair below zero; sqrt() of that
  // would be NaN. Clamp to zero instead.
  return variance > 0.0 ? sqrt(variance) : 0.0;
}
double minimum() { double minimum() {
for (size_t i = 0; i < bins.size(); i++) for (size_t i = 0; i < bins.size(); i++)
if (bins[i] > 0) return pow(_POW, (double) i + 0.5); if (bins[i] > 0) return pow(_POW, (double) i + 0.5);
@ -89,6 +96,7 @@ public:
for (size_t i = 0; i < bins.size(); i++) bins[i] += h.bins[i]; for (size_t i = 0; i < bins.size(); i++) bins[i] += h.bins[i];
sum += h.sum; sum += h.sum;
sum_sq += h.sum_sq;
} }
}; };

View File

@ -31,7 +31,7 @@ if not conf.CheckLibWithHeader("pthread", "pthread.h", "C++"):
Exit(1) Exit(1)
conf.CheckLib("rt", "clock_gettime", language="C++") conf.CheckLib("rt", "clock_gettime", language="C++")
conf.CheckLibWithHeader("zmq", "zmq.hpp", "C++") conf.CheckLibWithHeader("zmq", "zmq.hpp", "C++")
conf.CheckFunc('clock_gettime') # conf.CheckFunc('clock_gettime')
if not conf.CheckFunc('pthread_barrier_init'): if not conf.CheckFunc('pthread_barrier_init'):
conf.env['HAVE_POSIX_BARRIER'] = False conf.env['HAVE_POSIX_BARRIER'] = False

View File

@ -32,17 +32,20 @@ text "\nAdvanced options:"
option "username" U "Username to use for SASL authentication." string option "username" U "Username to use for SASL authentication." string
option "password" P "Password to use for SASL authentication." string option "password" P "Password to use for SASL authentication." string
option "threads" T "Number of threads to spawn." int default="1" option "threads" T "Number of threads to spawn." int default="1"
option "affinity" - "Set CPU affinity for threads, round-robin"
option "connections" c "Connections to establish per server." int default="1" option "connections" c "Connections to establish per server." int default="1"
option "depth" d "Maximum depth to pipeline requests." int default="1" option "depth" d "Maximum depth to pipeline requests." int default="1"
option "roundrobin" R "Assign threads to servers in round-robin fashion. \ option "roundrobin" R "Assign threads to servers in round-robin fashion. \
By default, each thread connects to every server." By default, each thread connects to every server."
option "cork" - "Minimum timer interval, in usecs. (experimental)" int
option "iadist" i "Inter-arrival distribution (distribution). Note: \ option "iadist" i "Inter-arrival distribution (distribution). Note: \
The distribution will automatically be adjusted to match the QPS given \ The distribution will automatically be adjusted to match the QPS given \
by --qps." string default="exponential" by --qps." string default="exponential"
option "skip" S "Skip transmissions if previous requests are late. This \ option "skip" S "Skip transmissions if previous requests are late. This \
harms the long-term QPS average, but reduces spikes in QPS after \ harms the long-term QPS average, but reduces spikes in QPS after \
long latency requests." long latency requests."
option "moderate" - "Enforce a minimum delay of ~1/lambda between requests."
option "noload" - "Skip database loading." option "noload" - "Skip database loading."
option "loadonly" - "Load database and then exit." option "loadonly" - "Load database and then exit."
@ -62,6 +65,7 @@ option "scan" - "Scan latency across QPS rates from min to max."
text "\nAgent-mode options:" text "\nAgent-mode options:"
option "agentmode" A "Run client in agent mode." option "agentmode" A "Run client in agent mode."
option "agent" a "Enlist remote agent." string typestr="host" multiple option "agent" a "Enlist remote agent." string typestr="host" multiple
option "agent_port" p "Agent port." string default="5556"
option "lambda_mul" l "Lambda multiplier. Increases share of \ option "lambda_mul" l "Lambda multiplier. Increases share of \
QPS for this client." int default="1" QPS for this client." int default="1"
option "measure_connections" C "Master client connections per server, \ option "measure_connections" C "Master client connections per server, \

View File

@ -157,7 +157,7 @@ void agent() {
zmq::context_t context(1); zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_REP); zmq::socket_t socket(context, ZMQ_REP);
socket.bind("tcp://*:5555"); socket.bind((string("tcp://*:")+string(args.agent_port_arg)).c_str());
while (true) { while (true) {
zmq::message_t request; zmq::message_t request;
@ -456,7 +456,8 @@ int main(int argc, char **argv) {
} else if (args.agent_given) { } else if (args.agent_given) {
for (unsigned int i = 0; i < args.agent_given; i++) { for (unsigned int i = 0; i < args.agent_given; i++) {
zmq::socket_t *s = new zmq::socket_t(context, ZMQ_REQ); zmq::socket_t *s = new zmq::socket_t(context, ZMQ_REQ);
string host = string("tcp://") + string(args.agent_arg[i]) + string(":5555"); string host = string("tcp://") + string(args.agent_arg[i]) +
string(":") + string(args.agent_port_arg);
s->connect(host.c_str()); s->connect(host.c_str());
agent_sockets.push_back(s); agent_sockets.push_back(s);
} }
@ -600,7 +601,8 @@ int main(int argc, char **argv) {
printf("Misses = %" PRIu64 " (%.1f%%)\n", stats.get_misses, printf("Misses = %" PRIu64 " (%.1f%%)\n", stats.get_misses,
(double) stats.get_misses/stats.gets*100); (double) stats.get_misses/stats.gets*100);
printf("Skipped TXs = %" PRIu64 "\n\n", stats.skips); printf("Skipped TXs = %" PRIu64 " (%.1f%%)\n\n", stats.skips,
(double) stats.skips / total * 100);
printf("RX %10" PRIu64 " bytes : %6.1f MB/s\n", printf("RX %10" PRIu64 " bytes : %6.1f MB/s\n",
stats.rx_bytes, stats.rx_bytes,
@ -646,6 +648,8 @@ void go(const vector<string>& servers, options_t& options,
vector<string> ts[options.threads]; vector<string> ts[options.threads];
#endif #endif
int current_cpu = -1;
for (int t = 0; t < options.threads; t++) { for (int t = 0; t < options.threads; t++) {
td[t].options = &options; td[t].options = &options;
#ifdef HAVE_LIBZMQ #ifdef HAVE_LIBZMQ
@ -664,7 +668,32 @@ void go(const vector<string>& servers, options_t& options,
td[t].servers = &servers; td[t].servers = &servers;
} }
if (pthread_create(&pt[t], NULL, thread_main, &td[t])) pthread_attr_t attr;
pthread_attr_init(&attr);
if (args.affinity_given) {
int max_cpus = 8 * sizeof(cpu_set_t);
cpu_set_t m;
CPU_ZERO(&m);
sched_getaffinity(0, sizeof(cpu_set_t), &m);
for (int i = 0; i < max_cpus; i++) {
int c = (current_cpu + i + 1) % max_cpus;
if (CPU_ISSET(c, &m)) {
CPU_ZERO(&m);
CPU_SET(c, &m);
int ret;
if ((ret = pthread_attr_setaffinity_np(&attr,
sizeof(cpu_set_t), &m)))
DIE("pthread_attr_setaffinity_np(%d) failed: %s",
c, strerror(ret));
current_cpu = c;
break;
}
}
}
if (pthread_create(&pt[t], &attr, thread_main, &td[t]))
DIE("pthread_create() failed"); DIE("pthread_create() failed");
} }
@ -728,13 +757,21 @@ void do_mutilate(const vector<string>& servers, options_t& options,
struct event_base *base; struct event_base *base;
struct evdns_base *evdns; struct evdns_base *evdns;
struct event_config *config;
if ((config = event_config_new()) == NULL) DIE("event_config_new() fail");
if (event_config_set_flag(config, EVENT_BASE_FLAG_PRECISE_TIMER))
DIE("event_config_set_flag(EVENT_BASE_FLAG_PRECISE_TIMER) fail");
if ((base = event_base_new_with_config(config)) == NULL)
DIE("event_base_new() fail");
if ((base = event_base_new()) == NULL) DIE("event_base_new() fail");
// evthread_use_pthreads(); // evthread_use_pthreads();
if ((evdns = evdns_base_new(base, 1)) == 0) DIE("evdns"); if ((evdns = evdns_base_new(base, 1)) == 0) DIE("evdns");
event_base_priority_init(base, 2); // event_base_priority_init(base, 2);
// FIXME: May want to move this to after all connections established. // FIXME: May want to move this to after all connections established.
double start = get_time(); double start = get_time();
@ -964,6 +1001,7 @@ void do_mutilate(const vector<string>& servers, options_t& options,
stats.start = start; stats.start = start;
stats.stop = now; stats.stop = now;
event_config_free(config);
evdns_base_free(evdns, 0); evdns_base_free(evdns, 0);
event_base_free(base); event_base_free(base);
} }
@ -1033,6 +1071,7 @@ void args_to_options(options_t* options) {
options->warmup = args.warmup_given ? args.warmup_arg : 0; options->warmup = args.warmup_given ? args.warmup_arg : 0;
options->oob_thread = false; options->oob_thread = false;
options->skip = args.skip_given; options->skip = args.skip_given;
options->moderate = args.moderate_given;
} }
void init_random_stuff() { void init_random_stuff() {