From ebd96ada3c9d0733507bfa0ed33860419945a6ae Mon Sep 17 00:00:00 2001 From: Jacob Leverich Date: Mon, 15 Apr 2013 13:27:06 -0700 Subject: [PATCH] Misc. experimental features. (1) thread affinity (2) event_base_flag_precise_timer (3) configurable agent port (4) per-connection qps moderation --- Connection.cc | 23 ++++++++++++++++++- Connection.h | 5 ++++- ConnectionOptions.h | 2 ++ ConnectionStats.h | 6 ++--- LogHistogramSampler.h | 8 +++++++ SConstruct | 2 +- cmdline.ggo | 4 ++++ mutilate.cc | 51 ++++++++++++++++++++++++++++++++++++++----- 8 files changed, 89 insertions(+), 12 deletions(-) diff --git a/Connection.cc b/Connection.cc index 584fa9c..279b6af 100644 --- a/Connection.cc +++ b/Connection.cc @@ -37,6 +37,8 @@ Connection::Connection(struct event_base* _base, struct evdns_base* _evdns, read_state = INIT_READ; write_state = INIT_WRITE; + last_tx = last_rx = 0.0; + bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE); bufferevent_setcb(bev, bev_read_cb, bev_write_cb, bev_event_cb, this); bufferevent_enable(bev, EV_READ | EV_WRITE); @@ -251,9 +253,24 @@ void Connection::drive_write_machine(double now) { write_state = WAITING_FOR_TIME; break; // We want to run through the state machine one more time // to make sure the timer is armed. + // } else if (options.moderate && options.lambda > 0.0 && + // now < last_rx + 0.25 / options.lambda) { + } else if (options.moderate && now < last_rx + 0.00025) { + write_state = WAITING_FOR_TIME; + if (!event_pending(timer, EV_TIMEOUT, NULL)) { + // delay = last_rx + 0.25 / options.lambda - now; + delay = last_rx + 0.00025 - now; + // I("MODERATE %f %f %f %f %f", now - last_rx, 0.25/options.lambda, + // 1/options.lambda, now-last_tx, delay); + + double_to_tv(delay, &tv); + evtimer_add(timer, &tv); + } + return; } issue_something(now); + last_tx = now; stats.log_op(op_queue.size()); next_time += iagen->generate(); @@ -262,7 +279,7 @@ void Connection::drive_write_machine(double now) { now - next_time > 0.005000 && op_queue.size() >= (size_t) options.depth) { - while (next_time < now) { + while (next_time < now - 0.004000) { stats.skips++; next_time += iagen->generate(); } @@ -367,6 +384,7 @@ void Connection::read_callback() { #endif stats.log_get(*op); + last_rx = now; pop_op(); drive_write_machine(now); break; @@ -399,6 +417,7 @@ void Connection::read_callback() { free(buf); + last_rx = now; pop_op(); drive_write_machine(); break; @@ -453,6 +472,7 @@ void Connection::read_callback() { free(buf); + last_rx = now; pop_op(); drive_write_machine(now); break; @@ -484,6 +504,7 @@ void Connection::read_callback() { if (!options.binary) free(buf); + last_rx = now; pop_op(); drive_write_machine(now); break; diff --git a/Connection.h b/Connection.h index f157db7..00e238b 100644 --- a/Connection.h +++ b/Connection.h @@ -91,7 +91,10 @@ private: struct bufferevent *bev; struct event *timer; // Used to control inter-transmission time. - double lambda, next_time; // Inter-transmission time parameters. + // double lambda; + double next_time; // Inter-transmission time parameters. + double last_rx; // Used to moderate transmission rate. + double last_tx; int data_length; // When waiting for data, how much we're peeking for. diff --git a/ConnectionOptions.h b/ConnectionOptions.h index 5851b4e..ba3d70c 100644 --- a/ConnectionOptions.h +++ b/ConnectionOptions.h @@ -40,6 +40,8 @@ typedef struct { int lambda_denom; bool oob_thread; + + bool moderate; } options_t; #endif // CONNECTIONOPTIONS_H diff --git a/ConnectionStats.h b/ConnectionStats.h index b3fb6a7..e957c19 100644 --- a/ConnectionStats.h +++ b/ConnectionStats.h @@ -124,7 +124,7 @@ class ConnectionStats { static void print_header() { printf("%-7s %7s %7s %7s %7s %7s %7s %7s %7s\n", - "#type", "avg", "min", "1st", "5th", "10th", + "#type", "avg", "std", "min", /*"1st",*/ "5th", "10th", "90th", "95th", "99th"); } @@ -200,8 +200,8 @@ class ConnectionStats { } printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f", - tag, sampler.average(), - sampler.get_nth(0), sampler.get_nth(1), sampler.get_nth(5), + tag, sampler.average(), sampler.stddev(), + sampler.get_nth(0), /*sampler.get_nth(1),*/ sampler.get_nth(5), sampler.get_nth(10), sampler.get_nth(90), sampler.get_nth(95), sampler.get_nth(99)); diff --git a/LogHistogramSampler.h b/LogHistogramSampler.h index 44fada3..1fa4301 100644 --- a/LogHistogramSampler.h +++ b/LogHistogramSampler.h @@ -17,6 +17,7 @@ public: std::vector bins; double sum; + double sum_sq; LogHistogramSampler() = delete; LogHistogramSampler(int _bins) : sum(0.0) { @@ -34,6 +35,7 @@ public: size_t bin = log(s)/log(_POW); sum += s; + sum_sq += s*s; // I("%f", sum); @@ -51,6 +53,11 @@ public: return sum / total(); } + double stddev() { + // I("%f %d", sum, total()); + return sqrt(sum_sq / total() - pow(sum / total(), 2.0)); + } + double minimum() { for (size_t i = 0; i < bins.size(); i++) if (bins[i] > 0) return pow(_POW, (double) i + 0.5); @@ -89,6 +96,7 @@ public: for (size_t i = 0; i < bins.size(); i++) bins[i] += h.bins[i]; sum += h.sum; + sum_sq += h.sum_sq; } }; diff --git a/SConstruct b/SConstruct index e8a5b8e..ee6f099 100644 --- a/SConstruct +++ b/SConstruct @@ -31,7 +31,7 @@ if not conf.CheckLibWithHeader("pthread", "pthread.h", "C++"): Exit(1) conf.CheckLib("rt", "clock_gettime", language="C++") conf.CheckLibWithHeader("zmq", "zmq.hpp", "C++") -conf.CheckFunc('clock_gettime') +# conf.CheckFunc('clock_gettime') if not conf.CheckFunc('pthread_barrier_init'): conf.env['HAVE_POSIX_BARRIER'] = False diff --git a/cmdline.ggo b/cmdline.ggo index b342900..ffe6531 100644 --- a/cmdline.ggo +++ b/cmdline.ggo @@ -32,17 +32,20 @@ text "\nAdvanced options:" option "username" U "Username to use for SASL authentication." string option "password" P "Password to use for SASL authentication." string option "threads" T "Number of threads to spawn." int default="1" +option "affinity" - "Set CPU affinity for threads, round-robin" option "connections" c "Connections to establish per server." int default="1" option "depth" d "Maximum depth to pipeline requests." int default="1" option "roundrobin" R "Assign threads to servers in round-robin fashion. \ By default, each thread connects to every server." +option "cork" - "Minimum timer interval, in usecs. (experimental)" int option "iadist" i "Inter-arrival distribution (distribution). Note: \ The distribution will automatically be adjusted to match the QPS given \ by --qps." string default="exponential" option "skip" S "Skip transmissions if previous requests are late. This \ harms the long-term QPS average, but reduces spikes in QPS after \ long latency requests." +option "moderate" - "Enforce a minimum delay of ~1/lambda between requests." option "noload" - "Skip database loading." option "loadonly" - "Load database and then exit." @@ -62,6 +65,7 @@ option "scan" - "Scan latency across QPS rates from min to max." text "\nAgent-mode options:" option "agentmode" A "Run client in agent mode." option "agent" a "Enlist remote agent." string typestr="host" multiple +option "agent_port" p "Agent port." string default="5556" option "lambda_mul" l "Lambda multiplier. Increases share of \ QPS for this client." int default="1" option "measure_connections" C "Master client connections per server, \ diff --git a/mutilate.cc b/mutilate.cc index cacf4db..00d6aa7 100644 --- a/mutilate.cc +++ b/mutilate.cc @@ -157,7 +157,7 @@ void agent() { zmq::context_t context(1); zmq::socket_t socket(context, ZMQ_REP); - socket.bind("tcp://*:5555"); + socket.bind((string("tcp://*:")+string(args.agent_port_arg)).c_str()); while (true) { zmq::message_t request; @@ -456,7 +456,8 @@ int main(int argc, char **argv) { } else if (args.agent_given) { for (unsigned int i = 0; i < args.agent_given; i++) { zmq::socket_t *s = new zmq::socket_t(context, ZMQ_REQ); - string host = string("tcp://") + string(args.agent_arg[i]) + string(":5555"); + string host = string("tcp://") + string(args.agent_arg[i]) + + string(":") + string(args.agent_port_arg); s->connect(host.c_str()); agent_sockets.push_back(s); } @@ -600,7 +601,8 @@ int main(int argc, char **argv) { printf("Misses = %" PRIu64 " (%.1f%%)\n", stats.get_misses, (double) stats.get_misses/stats.gets*100); - printf("Skipped TXs = %" PRIu64 "\n\n", stats.skips); + printf("Skipped TXs = %" PRIu64 " (%.1f%%)\n\n", stats.skips, + (double) stats.skips / total * 100); printf("RX %10" PRIu64 " bytes : %6.1f MB/s\n", stats.rx_bytes, @@ -646,6 +648,8 @@ void go(const vector& servers, options_t& options, vector ts[options.threads]; #endif + int current_cpu = -1; + for (int t = 0; t < options.threads; t++) { td[t].options = &options; #ifdef HAVE_LIBZMQ @@ -664,7 +668,32 @@ void go(const vector& servers, options_t& options, td[t].servers = &servers; } - if (pthread_create(&pt[t], NULL, thread_main, &td[t])) + pthread_attr_t attr; + pthread_attr_init(&attr); + + if (args.affinity_given) { + int max_cpus = 8 * sizeof(cpu_set_t); + cpu_set_t m; + CPU_ZERO(&m); + sched_getaffinity(0, sizeof(cpu_set_t), &m); + + for (int i = 0; i < max_cpus; i++) { + int c = (current_cpu + i + 1) % max_cpus; + if (CPU_ISSET(c, &m)) { + CPU_ZERO(&m); + CPU_SET(c, &m); + int ret; + if ((ret = pthread_attr_setaffinity_np(&attr, + sizeof(cpu_set_t), &m))) + DIE("pthread_attr_setaffinity_np(%d) failed: %s", + c, strerror(ret)); + current_cpu = c; + break; + } + } + } + + if (pthread_create(&pt[t], &attr, thread_main, &td[t])) DIE("pthread_create() failed"); } @@ -728,13 +757,21 @@ void do_mutilate(const vector& servers, options_t& options, struct event_base *base; struct evdns_base *evdns; + struct event_config *config; + + if ((config = event_config_new()) == NULL) DIE("event_config_new() fail"); + + if (event_config_set_flag(config, EVENT_BASE_FLAG_PRECISE_TIMER)) + DIE("event_config_set_flag(EVENT_BASE_FLAG_PRECISE_TIMER) fail"); + + if ((base = event_base_new_with_config(config)) == NULL) + DIE("event_base_new() fail"); - if ((base = event_base_new()) == NULL) DIE("event_base_new() fail"); // evthread_use_pthreads(); if ((evdns = evdns_base_new(base, 1)) == 0) DIE("evdns"); - event_base_priority_init(base, 2); + // event_base_priority_init(base, 2); // FIXME: May want to move this to after all connections established. double start = get_time(); @@ -964,6 +1001,7 @@ void do_mutilate(const vector& servers, options_t& options, stats.start = start; stats.stop = now; + event_config_free(config); evdns_base_free(evdns, 0); event_base_free(base); } @@ -1033,6 +1071,7 @@ void args_to_options(options_t* options) { options->warmup = args.warmup_given ? args.warmup_arg : 0; options->oob_thread = false; options->skip = args.skip_given; + options->moderate = args.moderate_given; } void init_random_stuff() {