1. Implemented --measure_* options to reasonably separate

configuration of measurement client (mutilate master) from
load generating clients (mutilate agents).  Use -C and
--measure_depth to approximate open-loop measurement control
(i.e. so long latency requests don't screw up the sample
population), and use -Q to ensure that you see minimal
client-side queuing delay.

2. Quickly documented agent "protocol".  Lots of improvement
to be had here...
This commit is contained in:
Jacob Leverich 2013-03-08 13:19:47 -08:00
parent 960b30dc7b
commit fc0f1a78a4
2 changed files with 135 additions and 6 deletions

View File

@ -59,8 +59,25 @@ option "scan" - "Scan latency across QPS rates from min to max."
text "\nAgent-mode options:" text "\nAgent-mode options:"
option "agentmode" A "Run client in agent mode." option "agentmode" A "Run client in agent mode."
option "agent" a "Enlist remote agent." string typestr="host" multiple option "agent" a "Enlist remote agent." string typestr="host" multiple
option "lambda_mul" l "Lambda multiplier. Increases share of QPS for this client." int default="1" option "lambda_mul" l "Lambda multiplier. Increases share of \
option "measure_mode" M "Master client ignores --connections, only uses 1 connection per thread." QPS for this client." int default="1"
option "measure_connections" C "Master client connections per server, \
overrides --connections." int
option "measure_qps" Q "Explicitly set master client QPS, \
spread across threads and connections." int
option "measure_depth" - "Set master client depth." int
text "
The --measure_* options aid in taking latency measurements of the
memcached server without incurring significant client-side queuing
delay. --measure_connections allows the master to override the
--connections option. --measure_depth allows the master to operate as
an \"open-loop\" client while other agents continue as a regular
closed-loop clients. --measure_qps lets you modulate the QPS the
master queries at independent of other clients. This theoretically
normalizes the baseline queuing delay you expect to see across a wide
range of --qps values.
"
text " text "
Some options take a 'distribution' as an argument. Some options take a 'distribution' as an argument.

View File

@ -99,6 +99,60 @@ static bool s_send (zmq::socket_t &socket, const std::string &string) {
return socket.send(message); return socket.send(message);
} }
/*
* Agent protocol
*
* PREPARATION PHASE
*
* 1. Master -> Agent: options_t
*
* options_t contains most of the information needed to drive the
* client, including the aggregate QPS that has been requested.
* However, neither the master nor the agent know at this point how
* many total connections will be made to the memcached server.
*
* 2. Agent -> Master: int num = (--threads) * (--lambda_mul)
*
* The agent sends a number to the master indicating how many threads
* this mutilate agent will spawn, and a mutiplier that weights how
* many QPS this agent's connections will send relative to unweighted
* connections (i.e. we can request that a purely load-generating
* agent or an agent on a really fast network connection be more
* aggressive than other agents or the master).
*
* 3. Master -> Agent: lambda_denom
*
* The master aggregates all of the numbers collected in (2) and
* computes a global "lambda_denom". Which is essentially a count of
* the total number of Connections across all mutilate instances,
* weighted by lambda_mul if necessary. It broadcasts this number to
* all agents.
*
* Each instance of mutilate at this point adjusts the lambda in
* options_t sent in (1) to account for lambda_denom. Note that
* lambda_mul is specific to each instance of mutilate
* (i.e. --lambda_mul X) and not sent as part of options_t.
*
* lambda = qps / lambda_denom * args.lambda_mul;
*
* RUN PHASE
*
* After the PREP phase completes, everyone executes do_mutilate().
* All clients spawn threads, open connections, load the DB, and wait
* for all connections to become IDLE. Following that, they
* synchronize and finally do the heavy lifting.
*
* [IF WARMUP] -1: Master <-> Agent: Synchronize
* [IF WARMUP] 0: Everyone: RUN for options.warmup seconds.
* 1. Master <-> Agent: Synchronize
* 2. Everyone: RUN for options.time seconds.
* 3. Master -> Agent: Dummy message
* 4. Agent -> Master: Send AgentStats [w/ RX/TX bytes, # gets/sets]
*
* The master then aggregates AgentStats across all agents with its
* own ConnectionStats to compute overall statistics.
*/
void agent() { void agent() {
zmq::context_t context(1); zmq::context_t context(1);
@ -137,6 +191,9 @@ void agent() {
// V("AGENT SLEEPS"); sleep(1); // V("AGENT SLEEPS"); sleep(1);
options.lambda = (double) options.qps / options.lambda_denom * args.lambda_mul_arg; options.lambda = (double) options.qps / options.lambda_denom * args.lambda_mul_arg;
V("lambda_denom = %d, lambda = %f, qps = %d",
options.lambda_denom, options.lambda, options.qps);
// if (options.threads > 1) // if (options.threads > 1)
pthread_barrier_init(&barrier, NULL, options.threads); pthread_barrier_init(&barrier, NULL, options.threads);
@ -163,7 +220,15 @@ void agent() {
} }
void prep_agent(const vector<string>& servers, options_t& options) { void prep_agent(const vector<string>& servers, options_t& options) {
int sum = args.measure_mode_given ? options.server_given * options.threads : options.lambda_denom; int sum = options.lambda_denom;
if (args.measure_connections_given)
sum = args.measure_connections_arg * options.server_given * options.threads;
int master_sum = sum;
if (args.measure_qps_given) {
sum = 0;
if (options.qps) options.qps -= args.measure_qps_arg;
}
for (auto s: agent_sockets) { for (auto s: agent_sockets) {
zmq::message_t message(sizeof(options_t)); zmq::message_t message(sizeof(options_t));
@ -182,15 +247,28 @@ void prep_agent(const vector<string>& servers, options_t& options) {
for (auto i: servers) { for (auto i: servers) {
s_send(*s, i); s_send(*s, i);
string rep = s_recv(*s); string rep = s_recv(*s);
// V("Reply: %s", rep.c_str());
} }
} }
// Adjust options_t according to --measure_* arguments.
options.lambda_denom = sum; options.lambda_denom = sum;
options.lambda = (double) options.qps / options.lambda_denom * args.lambda_mul_arg; options.lambda = (double) options.qps / options.lambda_denom *
args.lambda_mul_arg;
V("lambda_denom = %d", sum); V("lambda_denom = %d", sum);
if (args.measure_qps_given) {
double master_lambda = (double) args.measure_qps_arg / master_sum;
if (options.qps && master_lambda > options.lambda)
V("warning: master_lambda (%f) > options.lambda (%f)",
master_lambda, options.lambda);
options.lambda = master_lambda;
}
if (args.measure_depth_given) options.depth = args.measure_depth_arg;
for (auto s: agent_sockets) { for (auto s: agent_sockets) {
zmq::message_t message(sizeof(sum)); zmq::message_t message(sizeof(sum));
*((int *) message.data()) = sum; *((int *) message.data()) = sum;
@ -198,6 +276,10 @@ void prep_agent(const vector<string>& servers, options_t& options) {
string rep = s_recv(*s); string rep = s_recv(*s);
} }
// Master sleeps here to give agents a chance to connect to
// memcached server before the master, so that the master is never
// the very first set of connections. Is this reasonable or
// necessary? Most probably not.
V("MASTER SLEEPS"); sleep_time(1.5); V("MASTER SLEEPS"); sleep_time(1.5);
} }
@ -214,6 +296,29 @@ void finish_agent(ConnectionStats &stats) {
} }
} }
/*
* For each agent:
* Master -> Agent: sync1
* Agent -> Master: sync
*
* This is a *TERRIBLE* barrier implementation and I am ashamed. This
* not only causes a lot of skew, but it also allows agents to
* continue before the master has spoken with all other agents. This
* is why you see sync_agent() repeated several times in mutilate.cc.
*
* It should be:
*
* For each agent:
* Agent -> Master: sync
* For each agent:
* Master -> Agent: sync1
*
* In this way, all agents must arrive at the barrier and the master
* must receive a message from each of them before it continues. It
* then broadcasts the message to proceed, which reasonably limits
* skew.
*/
void sync_agent(zmq::socket_t* socket) { void sync_agent(zmq::socket_t* socket) {
// V("agent: synchronizing"); // V("agent: synchronizing");
@ -564,6 +669,12 @@ void go(const vector<string>& servers, options_t& options,
#ifdef HAVE_LIBZMQ #ifdef HAVE_LIBZMQ
if (args.agent_given > 0) { if (args.agent_given > 0) {
int total = stats.gets + stats.sets;
V("Local QPS = %.1f (%d / %.1fs)",
total / (stats.stop - stats.start),
total, stats.stop - stats.start);
finish_agent(stats); finish_agent(stats);
} }
#endif #endif
@ -627,7 +738,8 @@ void do_mutilate(const vector<string>& servers, options_t& options,
delete[] s_copy; delete[] s_copy;
int conns = args.measure_mode_given ? 1 : options.connections; int conns = args.measure_connections_given ? args.measure_connections_arg :
options.connections;
for (int c = 0; c < conns; c++) { for (int c = 0; c < conns; c++) {
Connection* conn = new Connection(base, evdns, hostname, port, options, Connection* conn = new Connection(base, evdns, hostname, port, options,