From 86ad5659568258caef78c0cb489124db984ad2ff Mon Sep 17 00:00:00 2001 From: Jacob Leverich Date: Wed, 13 Mar 2013 16:58:36 -0700 Subject: [PATCH] Updated sync_agent() to be less ridiculous. Still ridiculous, though. --- mutilate.cc | 63 +++++++++++++++++++++++++++++++++-------------------- 1 file changed, 39 insertions(+), 24 deletions(-) diff --git a/mutilate.cc b/mutilate.cc index a8a3e03..cacf4db 100644 --- a/mutilate.cc +++ b/mutilate.cc @@ -53,7 +53,7 @@ zmq::context_t context(1); struct thread_data { const vector *servers; options_t *options; - bool master; + bool master; // Thread #0, not to be confused with agent master. #ifdef HAVE_LIBZMQ zmq::socket_t *socket; #endif @@ -298,21 +298,20 @@ void finish_agent(ConnectionStats &stats) { } /* - * For each agent: - * Master -> Agent: sync1 - * Agent -> Master: sync + * This synchronization routine is ridiculous because the master only + * has a ZMQ_REQ socket to the agents, but it needs to wait for a + * message from each agent before it releases them. In order to get + * the ZMQ socket into a state where it'll allow the agent to send it + * a message, it must first send a message ("sync_req"). In order to + * not leave the socket dangling with an incomplete transaction, the + * agent must send a reply ("ack"). * - * This is a *TERRIBLE* barrier implementation and I am ashamed. This - * not only causes a lot of skew, but it also allows agents to - * continue before the master has spoken with all other agents. This - * is why you see sync_agent() repeated several times in mutilate.cc. - * - * It should be: + * Without this stupid complication it would be: * * For each agent: * Agent -> Master: sync * For each agent: - * Master -> Agent: sync1 + * Master -> Agent: proceed * * In this way, all agents must arrive at the barrier and the master * must receive a message from each of them before it continues. It @@ -324,13 +323,31 @@ void sync_agent(zmq::socket_t* socket) { // V("agent: synchronizing"); if (args.agent_given) { - for (auto s: agent_sockets) { - s_send(*s, "sync1"); - string rep = s_recv(*s); - } + for (auto s: agent_sockets) + s_send(*s, "sync_req"); + + /* The real sync */ + for (auto s: agent_sockets) + if (s_recv(*s).compare(string("sync"))) + DIE("sync_agent[M]: out of sync [1]"); + for (auto s: agent_sockets) + s_send(*s, "proceed"); + /* End sync */ + + for (auto s: agent_sockets) + if (s_recv(*s).compare(string("ack"))) + DIE("sync_agent[M]: out of sync [2]"); } else if (args.agentmode_given) { - string req = s_recv(*socket); + if (s_recv(*socket).compare(string("sync_req"))) + DIE("sync_agent[A]: out of sync [1]"); + + /* The real sync */ s_send(*socket, "sync"); + if (s_recv(*socket).compare(string("proceed"))) + DIE("sync_agent[A]: out of sync [2]"); + /* End sync */ + + s_send(*socket, "ack"); } // V("agent: synchronized"); @@ -667,7 +684,6 @@ void go(const vector& servers, options_t& options, #ifdef HAVE_LIBZMQ if (args.agent_given) { sync_agent(socket); - sync_agent(socket); } #endif } @@ -809,13 +825,14 @@ void do_mutilate(const vector& servers, options_t& options, #ifdef HAVE_LIBZMQ if (args.agent_given || args.agentmode_given) { if (master) V("Synchronizing."); + + // 1. thread barrier: make sure our threads ready before syncing agents + // 2. sync agents: all threads across all agents are now ready + // 3. thread barrier: don't release our threads until all agents ready + pthread_barrier_wait(&barrier); if (master) sync_agent(socket); - pthread_barrier_wait(&barrier); - if (master) sync_agent(socket); - - pthread_barrier_wait(&barrier); if (master) V("Synchronized."); } #endif @@ -900,13 +917,11 @@ void do_mutilate(const vector& servers, options_t& options, #ifdef HAVE_LIBZMQ if (args.agent_given || args.agentmode_given) { if (master) V("Synchronizing."); - if (master) sync_agent(socket); pthread_barrier_wait(&barrier); - if (master) sync_agent(socket); - pthread_barrier_wait(&barrier); + if (master) V("Synchronized."); } #endif