This commit is contained in:
Daniel Byrne 2021-05-05 16:48:53 -04:00
parent 9ca42781af
commit 6b0303e41c
3 changed files with 102 additions and 101 deletions

View File

@ -35,7 +35,7 @@ extern pthread_mutex_t flock;
*/
Connection::Connection(struct event_base* _base, struct evdns_base* _evdns,
string _hostname, string _port, options_t _options,
BlockingConcurrentQueue<string>* a_trace_queue,
ConcurrentQueue<string>* a_trace_queue,
bool sampling ) :
start_time(0), stats(sampling), options(_options),
hostname(_hostname), port(_port), base(_base), evdns(_evdns)
@ -407,101 +407,102 @@ int Connection::issue_getsetorset(double now) {
string rvaluelen;
trace_queue->wait_dequeue(line);
if (line.compare("EOF") == 0) {
eof = 1;
return 1;
}
/*
pthread_mutex_lock(&flock);
if (kvfile.good()) {
getline(kvfile,line);
pthread_mutex_unlock(&flock);
}
else {
pthread_mutex_unlock(&flock);
return 1;
}
*/
stringstream ss(line);
int Op = 0;
int vl = 0;
if (options.twitter_trace == 1) {
getline( ss, rT, ',' );
getline( ss, rKey, ',' );
getline( ss, rKeySize, ',' );
getline( ss, rvaluelen, ',' );
getline( ss, rApp, ',' );
getline( ss, rOp, ',' );
vl = atoi(rvaluelen.c_str());
if (vl < 1) vl = 1;
if (vl > 524000) vl = 524000;
if (rOp.compare("get") == 0) {
Op = 1;
} else if (rOp.compare("set") == 0) {
Op = 2;
} else {
Op = 0;
bool res = trace_queue->try_dequeue(line);
if (res) {
if (line.compare("EOF") == 0) {
eof = 1;
return 1;
}
/*
pthread_mutex_lock(&flock);
if (kvfile.good()) {
getline(kvfile,line);
pthread_mutex_unlock(&flock);
}
else {
pthread_mutex_unlock(&flock);
return 1;
}
*/
stringstream ss(line);
int Op = 0;
int vl = 0;
if (options.twitter_trace == 1) {
getline( ss, rT, ',' );
getline( ss, rKey, ',' );
getline( ss, rKeySize, ',' );
getline( ss, rvaluelen, ',' );
getline( ss, rApp, ',' );
getline( ss, rOp, ',' );
vl = atoi(rvaluelen.c_str());
if (vl < 1) vl = 1;
if (vl > 524000) vl = 524000;
if (rOp.compare("get") == 0) {
Op = 1;
} else if (rOp.compare("set") == 0) {
Op = 2;
} else {
Op = 0;
}
} else if (options.twitter_trace == 2) {
getline( ss, rT, ',' );
getline( ss, rKey, ',' );
getline( ss, rvaluelen, ',' );
getline( ss, rOp, ',' );
int op_n = atoi(rOp.c_str());
if (op_n == 1)
Op = 1;
else if (op_n == 0) {
Op = 2;
}
vl = atoi(rvaluelen.c_str()) - 76;
vl = vl - strlen(rKey.c_str());
}
else {
getline( ss, rT, ',' );
getline( ss, rApp, ',' );
getline( ss, rOp, ',' );
getline( ss, rKey, ',' );
getline( ss, rvaluelen, ',' );
vl = atoi(rvaluelen.c_str());
if (rOp.compare("read") == 0)
Op = 1;
if (rOp.compare("write") == 0)
Op = 2;
}
char key[1024];
char skey[1024];
memset(key,0,1024);
memset(skey,0,1024);
strncpy(skey, rKey.c_str(),strlen(rKey.c_str()));
if (args.prefix_given) {
strncpy(key, options.prefix,strlen(options.prefix));
}
strncat(key,skey,strlen(skey));
//if (strcmp(key,"100004781") == 0) {
// fprintf(stderr,"ready!\n");
//}
switch(Op)
{
case 0:
fprintf(stderr,"invalid line: %s, vl: %d @T: %d\n",
key,vl,atoi(rT.c_str()));
break;
case 1:
issue_get_with_len(key, vl, now);
break;
case 2:
int index = lrand48() % (1024 * 1024);
issue_set(key, &random_char[index], vl, now,true);
break;
} else if (options.twitter_trace == 2) {
getline( ss, rT, ',' );
getline( ss, rKey, ',' );
getline( ss, rvaluelen, ',' );
getline( ss, rOp, ',' );
int op_n = atoi(rOp.c_str());
if (op_n == 1)
Op = 1;
else if (op_n == 0) {
Op = 2;
}
vl = atoi(rvaluelen.c_str()) - 76;
vl = vl - strlen(rKey.c_str());
}
else {
getline( ss, rT, ',' );
getline( ss, rApp, ',' );
getline( ss, rOp, ',' );
getline( ss, rKey, ',' );
getline( ss, rvaluelen, ',' );
vl = atoi(rvaluelen.c_str());
if (rOp.compare("read") == 0)
Op = 1;
if (rOp.compare("write") == 0)
Op = 2;
}
char key[1024];
char skey[1024];
memset(key,0,1024);
memset(skey,0,1024);
strncpy(skey, rKey.c_str(),strlen(rKey.c_str()));
if (args.prefix_given) {
strncpy(key, options.prefix,strlen(options.prefix));
}
strncat(key,skey,strlen(skey));
//if (strcmp(key,"100004781") == 0) {
// fprintf(stderr,"ready!\n");
//}
switch(Op)
{
case 0:
fprintf(stderr,"invalid line: %s, vl: %d @T: %d\n",
key,vl,atoi(rT.c_str()));
break;
case 1:
issue_get_with_len(key, vl, now);
break;
case 2:
int index = lrand48() % (1024 * 1024);
issue_set(key, &random_char[index], vl, now,true);
break;
}
}

View File

@ -36,7 +36,7 @@ class Connection {
public:
Connection(struct event_base* _base, struct evdns_base* _evdns,
string _hostname, string _port, options_t options,
BlockingConcurrentQueue<string> *a_trace_queue,
ConcurrentQueue<string> *a_trace_queue,
bool sampling = true);
~Connection();
@ -121,7 +121,7 @@ private:
Generator *iagen;
std::queue<Operation> op_queue;
BlockingConcurrentQueue<string> *trace_queue;
ConcurrentQueue<string> *trace_queue;
// state machine functions / event processing
void pop_op();

View File

@ -64,11 +64,11 @@ struct thread_data {
zmq::socket_t *socket;
#endif
int id;
BlockingConcurrentQueue<string> *trace_queue;
ConcurrentQueue<string> *trace_queue;
};
struct reader_data {
BlockingConcurrentQueue<string> *trace_queue;
ConcurrentQueue<string> *trace_queue;
string trace_filename;
};
@ -90,7 +90,7 @@ void go(const vector<string> &servers, options_t &options,
);
void do_mutilate(const vector<string> &servers, options_t &options,
ConnectionStats &stats,BlockingConcurrentQueue<string> *trace_queue, bool master = true
ConnectionStats &stats,ConcurrentQueue<string> *trace_queue, bool master = true
#ifdef HAVE_LIBZMQ
, zmq::socket_t* socket = NULL
#endif
@ -676,7 +676,7 @@ void go(const vector<string>& servers, options_t& options,
}
#endif
BlockingConcurrentQueue<string> *trace_queue = new BlockingConcurrentQueue<string>;
ConcurrentQueue<string> *trace_queue = new ConcurrentQueue<string>;
struct reader_data *rdata = (struct reader_data*)malloc(sizeof(struct reader_data));
rdata->trace_queue = trace_queue;
if (options.read_file) {
@ -800,7 +800,7 @@ int stick_this_thread_to_core(int core_id) {
void* reader_thread(void *arg) {
struct reader_data *rdata = (struct reader_data *) arg;
BlockingConcurrentQueue<string> *trace_queue = (BlockingConcurrentQueue<string>*) rdata->trace_queue;
ConcurrentQueue<string> *trace_queue = (ConcurrentQueue<string>*) rdata->trace_queue;
ifstream trace_file;
trace_file.open(rdata->trace_filename);
@ -836,7 +836,7 @@ void* thread_main(void *arg) {
}
void do_mutilate(const vector<string>& servers, options_t& options,
ConnectionStats& stats, BlockingConcurrentQueue<string> *trace_queue, bool master
ConnectionStats& stats, ConcurrentQueue<string> *trace_queue, bool master
#ifdef HAVE_LIBZMQ
, zmq::socket_t* socket
#endif