diff --git a/Connection.cc b/Connection.cc index 0df68b6..bb81b61 100644 --- a/Connection.cc +++ b/Connection.cc @@ -35,7 +35,7 @@ extern pthread_mutex_t flock; */ Connection::Connection(struct event_base* _base, struct evdns_base* _evdns, string _hostname, string _port, options_t _options, - BlockingConcurrentQueue* a_trace_queue, + ConcurrentQueue* a_trace_queue, bool sampling ) : start_time(0), stats(sampling), options(_options), hostname(_hostname), port(_port), base(_base), evdns(_evdns) @@ -407,101 +407,102 @@ int Connection::issue_getsetorset(double now) { string rvaluelen; - trace_queue->wait_dequeue(line); - if (line.compare("EOF") == 0) { - eof = 1; - return 1; - } - - /* - pthread_mutex_lock(&flock); - if (kvfile.good()) { - getline(kvfile,line); - pthread_mutex_unlock(&flock); - } - else { - pthread_mutex_unlock(&flock); - return 1; - } - */ - stringstream ss(line); - int Op = 0; - int vl = 0; - - if (options.twitter_trace == 1) { - getline( ss, rT, ',' ); - getline( ss, rKey, ',' ); - getline( ss, rKeySize, ',' ); - getline( ss, rvaluelen, ',' ); - getline( ss, rApp, ',' ); - getline( ss, rOp, ',' ); - vl = atoi(rvaluelen.c_str()); - if (vl < 1) vl = 1; - if (vl > 524000) vl = 524000; - if (rOp.compare("get") == 0) { - Op = 1; - } else if (rOp.compare("set") == 0) { - Op = 2; - } else { - Op = 0; + bool res = trace_queue->try_dequeue(line); + if (res) { + if (line.compare("EOF") == 0) { + eof = 1; + return 1; } + /* + pthread_mutex_lock(&flock); + if (kvfile.good()) { + getline(kvfile,line); + pthread_mutex_unlock(&flock); + } + else { + pthread_mutex_unlock(&flock); + return 1; + } + */ + stringstream ss(line); + int Op = 0; + int vl = 0; + + if (options.twitter_trace == 1) { + getline( ss, rT, ',' ); + getline( ss, rKey, ',' ); + getline( ss, rKeySize, ',' ); + getline( ss, rvaluelen, ',' ); + getline( ss, rApp, ',' ); + getline( ss, rOp, ',' ); + vl = atoi(rvaluelen.c_str()); + if (vl < 1) vl = 1; + if (vl > 524000) vl = 524000; + if (rOp.compare("get") == 0) { + Op = 1; + } else if (rOp.compare("set") == 0) { + Op = 2; + } else { + Op = 0; + } + + } else if (options.twitter_trace == 2) { + getline( ss, rT, ',' ); + getline( ss, rKey, ',' ); + getline( ss, rvaluelen, ',' ); + getline( ss, rOp, ',' ); + int op_n = atoi(rOp.c_str()); + + if (op_n == 1) + Op = 1; + else if (op_n == 0) { + Op = 2; + } + vl = atoi(rvaluelen.c_str()) - 76; + vl = vl - strlen(rKey.c_str()); + } + else { + getline( ss, rT, ',' ); + getline( ss, rApp, ',' ); + getline( ss, rOp, ',' ); + getline( ss, rKey, ',' ); + getline( ss, rvaluelen, ',' ); + vl = atoi(rvaluelen.c_str()); + if (rOp.compare("read") == 0) + Op = 1; + if (rOp.compare("write") == 0) + Op = 2; + } + + + char key[1024]; + char skey[1024]; + memset(key,0,1024); + memset(skey,0,1024); + strncpy(skey, rKey.c_str(),strlen(rKey.c_str())); + + if (args.prefix_given) { + strncpy(key, options.prefix,strlen(options.prefix)); + } + strncat(key,skey,strlen(skey)); + //if (strcmp(key,"100004781") == 0) { + // fprintf(stderr,"ready!\n"); + //} + switch(Op) + { + case 0: + fprintf(stderr,"invalid line: %s, vl: %d @T: %d\n", + key,vl,atoi(rT.c_str())); + break; + case 1: + issue_get_with_len(key, vl, now); + break; + case 2: + int index = lrand48() % (1024 * 1024); + issue_set(key, &random_char[index], vl, now,true); + break; - } else if (options.twitter_trace == 2) { - getline( ss, rT, ',' ); - getline( ss, rKey, ',' ); - getline( ss, rvaluelen, ',' ); - getline( ss, rOp, ',' ); - int op_n = atoi(rOp.c_str()); - - if (op_n == 1) - Op = 1; - else if (op_n == 0) { - Op = 2; } - vl = atoi(rvaluelen.c_str()) - 76; - vl = vl - strlen(rKey.c_str()); - } - else { - getline( ss, rT, ',' ); - getline( ss, rApp, ',' ); - getline( ss, rOp, ',' ); - getline( ss, rKey, ',' ); - getline( ss, rvaluelen, ',' ); - vl = atoi(rvaluelen.c_str()); - if (rOp.compare("read") == 0) - Op = 1; - if (rOp.compare("write") == 0) - Op = 2; - } - - - char key[1024]; - char skey[1024]; - memset(key,0,1024); - memset(skey,0,1024); - strncpy(skey, rKey.c_str(),strlen(rKey.c_str())); - - if (args.prefix_given) { - strncpy(key, options.prefix,strlen(options.prefix)); - } - strncat(key,skey,strlen(skey)); - //if (strcmp(key,"100004781") == 0) { - // fprintf(stderr,"ready!\n"); - //} - switch(Op) - { - case 0: - fprintf(stderr,"invalid line: %s, vl: %d @T: %d\n", - key,vl,atoi(rT.c_str())); - break; - case 1: - issue_get_with_len(key, vl, now); - break; - case 2: - int index = lrand48() % (1024 * 1024); - issue_set(key, &random_char[index], vl, now,true); - break; - } } diff --git a/Connection.h b/Connection.h index 3737e3a..20ed4e9 100644 --- a/Connection.h +++ b/Connection.h @@ -36,7 +36,7 @@ class Connection { public: Connection(struct event_base* _base, struct evdns_base* _evdns, string _hostname, string _port, options_t options, - BlockingConcurrentQueue *a_trace_queue, + ConcurrentQueue *a_trace_queue, bool sampling = true); ~Connection(); @@ -121,7 +121,7 @@ private: Generator *iagen; std::queue op_queue; - BlockingConcurrentQueue *trace_queue; + ConcurrentQueue *trace_queue; // state machine functions / event processing void pop_op(); diff --git a/mutilate.cc b/mutilate.cc index 4dcda4c..bca6a80 100644 --- a/mutilate.cc +++ b/mutilate.cc @@ -64,11 +64,11 @@ struct thread_data { zmq::socket_t *socket; #endif int id; - BlockingConcurrentQueue *trace_queue; + ConcurrentQueue *trace_queue; }; struct reader_data { - BlockingConcurrentQueue *trace_queue; + ConcurrentQueue *trace_queue; string trace_filename; }; @@ -90,7 +90,7 @@ void go(const vector &servers, options_t &options, ); void do_mutilate(const vector &servers, options_t &options, - ConnectionStats &stats,BlockingConcurrentQueue *trace_queue, bool master = true + ConnectionStats &stats,ConcurrentQueue *trace_queue, bool master = true #ifdef HAVE_LIBZMQ , zmq::socket_t* socket = NULL #endif @@ -676,7 +676,7 @@ void go(const vector& servers, options_t& options, } #endif - BlockingConcurrentQueue *trace_queue = new BlockingConcurrentQueue; + ConcurrentQueue *trace_queue = new ConcurrentQueue; struct reader_data *rdata = (struct reader_data*)malloc(sizeof(struct reader_data)); rdata->trace_queue = trace_queue; if (options.read_file) { @@ -800,7 +800,7 @@ int stick_this_thread_to_core(int core_id) { void* reader_thread(void *arg) { struct reader_data *rdata = (struct reader_data *) arg; - BlockingConcurrentQueue *trace_queue = (BlockingConcurrentQueue*) rdata->trace_queue; + ConcurrentQueue *trace_queue = (ConcurrentQueue*) rdata->trace_queue; ifstream trace_file; trace_file.open(rdata->trace_filename); @@ -836,7 +836,7 @@ void* thread_main(void *arg) { } void do_mutilate(const vector& servers, options_t& options, - ConnectionStats& stats, BlockingConcurrentQueue *trace_queue, bool master + ConnectionStats& stats, ConcurrentQueue *trace_queue, bool master #ifdef HAVE_LIBZMQ , zmq::socket_t* socket #endif