This commit is contained in:
Daniel Byrne 2021-12-14 17:43:31 -05:00
parent b02676c8a8
commit eb4ed61847
6 changed files with 484 additions and 292 deletions

View File

@ -214,7 +214,10 @@ public:
int eof;
uint32_t get_cid();
//void set_queue(ConcurrentQueue<string> *a_trace_queue);
void set_queue(queue<string> *a_trace_queue);
int add_to_wb_keys(string wb_key);
void del_wb_keys(string wb_key);
void set_g_wbkeys(unordered_map<string,int> *a_wb_keys);
void set_queue(queue<Operation> *a_trace_queue);
void set_lock(pthread_mutex_t* a_lock);
private:
@ -222,6 +225,8 @@ private:
string hostname2;
string port;
double o_percent;
int trace_queue_n;
struct event_base *base;
struct evdns_base *evdns;
struct bufferevent *bev1;
@ -279,8 +284,8 @@ private:
KeyGenerator *keygen;
Generator *iagen;
pthread_mutex_t* lock;
//ConcurrentQueue<string> *trace_queue;
queue<string> *trace_queue;
unordered_map<string,int> *g_wb_keys;
queue<Operation> *trace_queue;
// state machine functions / event processing
void pop_op(Operation *op);

View File

@ -109,6 +109,8 @@ typedef struct _evicted_type {
char *evictedData;
} evicted_t;
static vector<double> cid_rate;
extern int max_n[3];
static void init_inclusives(char *inclusive_str) {
@ -128,7 +130,7 @@ static void init_classes() {
double factor = 1.25;
unsigned int chunk_size = 48;
unsigned int item_size = 24;
unsigned int size = item_size + chunk_size;
unsigned int size = 96; //warning if you change this you die
unsigned int i = 0;
unsigned int chunk_size_max = 1048576/2;
while (++i < NCLASSES-1) {
@ -146,11 +148,12 @@ static void init_classes() {
}
static int get_class(int vl, uint32_t kl) {
int vsize = vl+kl+24+1+2;
//warning if you change this you die
int vsize = vl+kl+48+1+2;
int res = 1;
while (vsize > sizes[res])
if (res++ == classes) {
fprintf(stderr,"item larger than max class size. vsize: %d, class size: %d\n",vsize,sizes[res]);
//fprintf(stderr,"item larger than max class size. vsize: %d, class size: %d\n",vsize,sizes[res]);
return -1;
}
return res;
@ -158,7 +161,11 @@ static int get_class(int vl, uint32_t kl) {
static int get_incl(int vl, int kl) {
int clsid = get_class(vl,kl);
return inclusives[clsid];
if (clsid) {
return inclusives[clsid];
} else {
return -1;
}
}
void ConnectionMulti::output_op(Operation *op, int type, bool found) {
@ -224,8 +231,19 @@ ConnectionMulti::ConnectionMulti(struct event_base* _base, struct evdns_base* _e
start_time(0), stats(sampling), options(_options),
hostname1(_hostname1), hostname2(_hostname2), port(_port), base(_base), evdns(_evdns)
{
init_classes();
init_inclusives(options.inclusives);
pthread_mutex_lock(&cid_lock_m);
cid = connids_m++;
if (cid == 1) {
cid_rate.push_back(100);
cid_rate.push_back(0);
init_classes();
init_inclusives(options.inclusives);
} else {
cid_rate.push_back(0);
}
pthread_mutex_unlock(&cid_lock_m);
valuesize = createGenerator(options.valuesize);
keysize = createGenerator(options.keysize);
srand(time(NULL));
@ -233,6 +251,7 @@ ConnectionMulti::ConnectionMulti(struct event_base* _base, struct evdns_base* _e
total = 0;
eof = 0;
o_percent = 0;
if (options.lambda <= 0) {
iagen = createGenerator("0");
@ -249,9 +268,6 @@ ConnectionMulti::ConnectionMulti(struct event_base* _base, struct evdns_base* _e
last_tx = last_rx = 0.0;
pthread_mutex_lock(&cid_lock_m);
cid = connids_m++;
pthread_mutex_unlock(&cid_lock_m);
op_queue_size = (uint32_t*)malloc(sizeof(uint32_t)*(LEVELS+1));
opaque = (uint32_t*)malloc(sizeof(uint32_t)*(LEVELS+1));
@ -279,18 +295,56 @@ ConnectionMulti::ConnectionMulti(struct event_base* _base, struct evdns_base* _e
}
void ConnectionMulti::set_queue(queue<string>* a_trace_queue) {
void ConnectionMulti::set_queue(queue<Operation>* a_trace_queue) {
trace_queue = a_trace_queue;
trace_queue_n = a_trace_queue->size();
}
void ConnectionMulti::set_lock(pthread_mutex_t* a_lock) {
lock = a_lock;
}
void ConnectionMulti::set_g_wbkeys(unordered_map<string,int> *a_wb_keys) {
g_wb_keys = a_wb_keys;
}
uint32_t ConnectionMulti::get_cid() {
return cid;
}
int ConnectionMulti::add_to_wb_keys(string key) {
int ret = -1;
pthread_mutex_lock(lock);
auto pos = g_wb_keys->find(key);
if (pos == g_wb_keys->end()) {
g_wb_keys->insert( {key,cid });
ret = 1;
//fprintf(stderr,"----set: %s----\n",Op.key.c_str());
//for (auto iter = g_wb_keys->begin(); iter != g_wb_keys->end(); ++iter){
// fprintf(stderr,"%s,%d\n",iter->first.c_str(),iter->second);
//}
//fprintf(stderr,"----%d----\n",cid);
} else {
ret = 2;
}
pthread_mutex_unlock(lock);
return ret;
}
void ConnectionMulti::del_wb_keys(string key) {
pthread_mutex_lock(lock);
auto position = g_wb_keys->find(key);
if (position != g_wb_keys->end()) {
g_wb_keys->erase(position);
} else {
fprintf(stderr,"expected %s, got nuthin\n",key.c_str());
}
pthread_mutex_unlock(lock);
}
int ConnectionMulti::do_connect() {
int connected = 0;
@ -390,152 +444,156 @@ void ConnectionMulti::set_priority(int pri) {
int ConnectionMulti::issue_getsetorset(double now) {
string line;
string rT;
string rApp;
string rOp;
string rKey;
string rKeySize;
string rvaluelen;
int ret = 0;
int nissued = 0;
while (nissued < options.depth) {
//while (nissued < options.depth) {
//pthread_mutex_lock(lock);
if (!trace_queue->empty()) {
Operation Op = trace_queue->front();
if (Op.type == Operation::SASL) {
eof = 1;
cid_rate[cid] = 100;
fprintf(stderr,"cid %d done\n",cid);
string op_queue1;
string op_queue2;
for (int j = 0; j < 2; j++) {
for (int i = 0; i < OPAQUE_MAX; i++) {
if (op_queue[j+1][i] != NULL) {
if (j == 0) {
op_queue1 = op_queue1 + "," + op_queue[j+1][i]->key;
} else {
op_queue2 = op_queue2 + "," + op_queue[j+1][i]->key;
}
}
}
}
fprintf(stderr,"cid %d op_queue1: %s op_queue2: %s, op_queue_size1: %d, op_queue_size2: %d\n",cid,op_queue1.c_str(),op_queue2.c_str(),op_queue_size[1],op_queue_size[2]);
return 1;
}
//pthread_mutex_lock(lock);
if (!trace_queue->empty()) {
line = trace_queue->front();
if (line.compare("EOF") == 0) {
eof = 1;
fprintf(stderr,"cid %d done\n",cid);
return 1;
}
stringstream ss(line);
//pthread_mutex_unlock(lock);
int Op = 0;
int vl = 0;
if (options.twitter_trace == 1) {
getline( ss, rT, ',' );
getline( ss, rKey, ',' );
getline( ss, rKeySize, ',' );
getline( ss, rvaluelen, ',' );
getline( ss, rApp, ',' );
getline( ss, rOp, ',' );
vl = stoi(rvaluelen);
if (vl < 1) continue;
if (vl > 524000) vl = 524000;
if (rOp.compare("get") == 0) {
Op = 1;
} else if (rOp.compare("set") == 0) {
Op = 2;
} else {
Op = 0;
}
} else if (options.twitter_trace == 2) {
getline( ss, rT, ',' );
getline( ss, rApp, ',' );
getline( ss, rOp, ',' );
getline( ss, rKey, ',' );
getline( ss, rvaluelen, ',' );
Op = stoi(rOp);
vl = stoi(rvaluelen);
if (vl < 1 || vl > 524000) {
fprintf(stderr,"bad val for line: %s, %d\n",line.c_str(),vl);
}
} else {
getline( ss, rT, ',' );
getline( ss, rApp, ',' );
getline( ss, rOp, ',' );
getline( ss, rKey, ',' );
getline( ss, rvaluelen, ',' );
vl = stoi(rvaluelen);
if (rOp.compare("read") == 0)
Op = 1;
if (rOp.compare("write") == 0)
Op = 2;
}
if ((int)wb_keys.size() > options.depth*10) {
fprintf(stderr,"wb_queue size %d\n",(int)wb_keys.size());
return 1;
}
/* if this key is currently being written back wait for it to clear */
if (find(wb_keys.begin(), wb_keys.end(), rKey) != wb_keys.end()) {
return 1;
}
char key[256];
memset(key,0,256);
strncpy(key, rKey.c_str(),255);
/* check if in global wb queue */
pthread_mutex_lock(lock);
double percent = (double)total/((double)trace_queue_n) * 100;
if (percent > o_percent+1) {
//update the percentage table and see if we should execute
std::vector<double>::iterator mp = std::min_element(cid_rate.begin(), cid_rate.end());
double min_percent = *mp;
trace_queue->pop();
int issued = 0;
int incl = get_incl(vl,strlen(key));
int flags = 0;
int touch = (rand() % 100);
SET_INCL(incl,flags);
switch(Op)
{
case 0:
//fprintf(stderr,"invalid line: %s, vl: %d @T: %d\n",
// key,vl,stoi(rT));
break;
case 1:
//if (nissued < options.depth-1) {
// issued = issue_get_with_len(key, vl, now, false, 1, flags, 0, 1);
// last_quiet1 = false;
//} else {
//}
if (options.threshold > 0) {
key_hist[key]++;
}
issued = issue_get_with_len(key, vl, now, false, flags | LOG_OP | ITEM_L1);
if (touch == 0 && incl == 1) {
issue_touch(key,vl,now, ITEM_L2 | SRC_L1_H);
}
last_quiet1 = false;
this->stats.gets++;
break;
case 2:
if (last_quiet1) {
issue_noop(now,1);
}
int index = lrand48() % (1024 * 1024);
wb_keys.push_back(string(key));
if (touch == 0 && incl == 1) {
issue_touch(key,vl,now, ITEM_L2 | SRC_DIRECT_SET);
issued = issue_set(key, &random_char[index], vl, now, flags | LOG_OP | ITEM_L1 | SRC_DIRECT_SET);
} else {
issued = issue_set(key, &random_char[index], vl, now, flags | LOG_OP | ITEM_L1 | SRC_DIRECT_SET | ITEM_DIRTY);
}
last_quiet1 = false;
this->stats.sets++;
break;
if (percent > min_percent+2) {
pthread_mutex_unlock(lock);
struct timeval tv;
tv.tv_sec = 1;
tv.tv_usec = 0;
int good = 0;
if (!event_pending(timer, EV_TIMEOUT, NULL)) {
good = evtimer_add(timer, &tv);
}
if (good != 0) {
fprintf(stderr,"eventimer is messed up!\n");
return 2;
}
return 1;
}
if (issued) {
nissued++;
total++;
} else {
if (Op != 0) {
fprintf(stderr,"failed to issue line: %s, vl: %d @T: %d\n",
key,vl,stoi(rT));
}
break;
}
} else {
//pthread_mutex_unlock(lock);
//we should protect this with a condition variable
//since trace queue size is 0 and not EOF.
return 0;
cid_rate[cid] = percent;
fprintf(stderr,"%f,%d,%.4f\n",now,cid,percent);
o_percent = percent;
}
auto check = g_wb_keys->find(Op.key);
if (check != g_wb_keys->end()) {
pthread_mutex_unlock(lock);
struct timeval tv;
tv.tv_sec = 1;
tv.tv_usec = 0;
int good = 0;
if (!event_pending(timer, EV_TIMEOUT, NULL)) {
good = evtimer_add(timer, &tv);
}
if (good != 0) {
fprintf(stderr,"eventimer is messed up in checking for key: %s\n",Op.key.c_str());
return 2;
}
return 1;
} else {
g_wb_keys->insert( {Op.key, cid} );
//g_wb_keys->insert( {Op.key+"l2", cid} );
}
pthread_mutex_unlock(lock);
char key[256];
memset(key,0,256);
strncpy(key, Op.key.c_str(),255);
int vl = Op.valuelen;
trace_queue->pop();
int issued = 0;
int incl = get_incl(vl,strlen(key));
int cid = get_class(vl,strlen(key));
int flags = 0;
int touch = (rand() % 100);
int index = lrand48() % (1024 * 1024);
//int touch = 1;
SET_INCL(incl,flags);
switch(Op.type)
{
case Operation::GET:
//if (nissued < options.depth-1) {
// issued = issue_get_with_len(key, vl, now, false, 1, flags, 0, 1);
// last_quiet1 = false;
//} else {
//}
if (options.threshold > 0) {
if (Op.future) {
key_hist[key] = 1;
}
}
issued = issue_get_with_len(key, vl, now, false, flags | LOG_OP | ITEM_L1);
if (touch == 1 && incl == 1) {
issue_touch(key,vl,now, ITEM_L2 | SRC_L1_H);
}
last_quiet1 = false;
this->stats.gets++;
this->stats.gets_cid[cid]++;
break;
case Operation::SET:
if (last_quiet1) {
issue_noop(now,1);
}
if (incl == 1) {
issue_touch(key,vl,now, ITEM_L2 | SRC_DIRECT_SET);
} else if (incl == 2) {
issue_delete(key,now, ITEM_L2 | SRC_DIRECT_SET );
}
issued = issue_set(key, &random_char[index], vl, now, flags | LOG_OP | ITEM_L1 | SRC_DIRECT_SET);
last_quiet1 = false;
this->stats.sets++;
this->stats.sets_cid[cid]++;
break;
case Operation::DELETE:
case Operation::TOUCH:
case Operation::NOOP:
case Operation::SASL:
fprintf(stderr,"invalid line: %s, vl: %d\n",key,vl);
break;
}
if (issued) {
nissued++;
total++;
} else {
fprintf(stderr,"failed to issue line: %s, vl: %d @T: XX \n",key,vl);
}
} else {
return 1;
}
//}
if (last_quiet1) {
issue_noop(now,1);
last_quiet1 = false;
@ -595,7 +653,7 @@ int ConnectionMulti::issue_get_with_len(const char* key, int valuelen, double no
op_queue_size[level]++;
#ifdef DEBUGS
fprintf(stderr,"issing get: %s, size: %u, level %d, flags: %d, opaque: %d\n",key,valuelen,level,flags,pop->opaque);
fprintf(stderr,"cid: %d issing get: %s, size: %u, level %d, flags: %d, opaque: %d\n",cid,key,valuelen,level,flags,pop->opaque);
#endif
if (opaque[level] > OPAQUE_MAX) {
@ -681,6 +739,9 @@ int ConnectionMulti::issue_touch(const char* key, int valuelen, double now, int
h.opaque = htonl(pop->opaque);
uint32_t exp = 0;
if (flags & ITEM_DIRTY) {
exp = htonl(flags);
}
evbuffer_add(output, &h, 24);
evbuffer_add(output, &exp, 4);
evbuffer_add(output, key, keylen);
@ -738,6 +799,9 @@ int ConnectionMulti::issue_delete(const char* key, double now, uint32_t flags) {
if (opaque[level] > OPAQUE_MAX) {
opaque[level] = 1;
}
#ifdef DEBUGS
fprintf(stderr,"cid: %d issing delete: %s, level %d, flags: %d, opaque: %d\n",cid,key,level,flags,pop->opaque);
#endif
//if (read_state == IDLE) read_state = WAITING_FOR_GET;
uint16_t keylen = strlen(key);
@ -818,7 +882,7 @@ int ConnectionMulti::issue_set(const char* key, const char* value, int length, d
//op_queue[level].push(op);
op_queue_size[level]++;
#ifdef DEBUGS
fprintf(stderr,"issing set: %s, size: %u, level %d, flags: %d, opaque: %d\n",key,length,level,flags,pop->opaque);
fprintf(stderr,"cid: %d issing set: %s, size: %u, level %d, flags: %d, opaque: %d\n",cid,key,length,level,flags,pop->opaque);
#endif
if (opaque[level] > OPAQUE_MAX) {
@ -956,12 +1020,14 @@ void ConnectionMulti::finish_op(Operation *op, int was_hit) {
uint8_t level = OP_level(op);
if (op->l1 != NULL) {
delete op_queue[1][op->l1->opaque];
op_queue[1][op->l1->opaque] = 0;
op_queue_size[1]--;
}
//op_queue[level].erase(op_queue[level].begin()+opopq);
if (op == op_queue[level][op->opaque] &&
op->opaque == op_queue[level][op->opaque]->opaque) {
delete op_queue[level][op->opaque];
op_queue[level][op->opaque] = 0;
} else {
fprintf(stderr,"op_queue out of sync! Expected %p, got %p, opa1: %d opaq2: %d\n",
op,op_queue[level][op->opaque],op->opaque,op_queue[level][op->opaque]->opaque);
@ -978,38 +1044,10 @@ void ConnectionMulti::finish_op(Operation *op, int was_hit) {
* Check if our testing is done and we should exit.
*/
bool ConnectionMulti::check_exit_condition(double now) {
if (eof) {
if (eof && op_queue_size[1] == 0 && op_queue_size[2] == 0) {
return true;
}
if (read_state == INIT_READ) return false;
if (now == 0.0) now = get_time();
if (options.read_file) {
if (eof) {
return true;
}
else if ((options.queries == 1) &&
(now > start_time + options.time))
{
return true;
}
else {
return false;
}
} else {
if (options.queries != 0 &&
(((long unsigned)options.queries) == (stats.accesses)))
{
return true;
}
if ((options.queries == 0) &&
(now > start_time + options.time))
{
return true;
}
if (options.loadonly && read_state == IDLE) return true;
}
return false;
}
@ -1120,7 +1158,7 @@ void ConnectionMulti::drive_write_machine(double now) {
if (options.getsetorset) {
int ret = issue_getsetorset(now);
if (ret) return; //if at EOF
if (ret == 1) return; //if at EOF
}
last_tx = now;
@ -1136,6 +1174,10 @@ void ConnectionMulti::drive_write_machine(double now) {
case WAITING_FOR_OPQ:
if ( (op_queue_size[1] >= (size_t) options.depth) ||
(op_queue_size[2] >= (size_t) options.depth) ) {
//double delay = 0.01;
//struct timeval tv;
//double_to_tv(delay, &tv);
//evtimer_add(timer, &tv);
return;
} else {
write_state = ISSUING;
@ -1176,7 +1218,7 @@ static bool handle_response(ConnectionMulti *conn, evbuffer *input, bool &done,
opaque = ntohl(h->opaque);
uint16_t status = ntohs(h->status);
#ifdef DEBUGMC
fprintf(stderr,"handle resp from l%d - opcode: %u opaque: %u keylen: %u extralen: %u datalen: %u status: %u\n",level,
fprintf(stderr,"cid: %d handle resp from l%d - opcode: %u opaque: %u keylen: %u extralen: %u datalen: %u status: %u\n",conn->get_cid(),level,
h->opcode,ntohl(h->opaque),ntohs(h->key_len),h->extra_len,
ntohl(h->body_len),ntohs(h->status));
#endif
@ -1204,6 +1246,7 @@ static bool handle_response(ConnectionMulti *conn, evbuffer *input, bool &done,
evbuffer_drain(input,24);
unsigned char *buf = evbuffer_pullup(input,bl);
evict->clsid = *((uint32_t*)buf);
evict->clsid = ntohl(evict->clsid);
buf += 4;
@ -1334,11 +1377,8 @@ void ConnectionMulti::read_callback1() {
//finish_op(op,0);
} else {
if (found) {
finish_op(op,1);
} else {
finish_op(op,0);
}
del_wb_keys(op->key);
finish_op(op,found);
}
} else {
char out[128];
@ -1347,40 +1387,46 @@ void ConnectionMulti::read_callback1() {
}
break;
case Operation::SET:
if (OP_src(op) == SRC_L1_COPY ||
OP_src(op) == SRC_DIRECT_SET ||
OP_src(op) == SRC_L2_M ) {
vector<string>::iterator position = std::find(wb_keys.begin(), wb_keys.end(), op->key);
if (position != wb_keys.end()) {
wb_keys.erase(position);
} else {
fprintf(stderr,"expected %s, got nuthin\n",op->key.c_str());
}
}
//if (OP_src(op) == SRC_L1_COPY ||
// OP_src(op) == SRC_DIRECT_SET ||
// OP_src(op) == SRC_L2_M ) {
//}
if (evict->evicted) {
string wb_key(evict->evictedKey);
if ((evict->evictedFlags & ITEM_INCL) && (evict->evictedFlags & ITEM_DIRTY)) {
//strncpy(wb_key,evict->evictedKey,255);
string wb_key(evict->evictedKey);
wb_keys.push_back(wb_key);
issue_set(evict->evictedKey, evict->evictedData, evict->evictedLen, now, ITEM_L2 | ITEM_INCL | ITEM_DIRTY | LOG_OP | SRC_WB);
//wb_keys.push_back(wb_key);
int ret = add_to_wb_keys(wb_key);
if (ret == 1) {
issue_set(evict->evictedKey, evict->evictedData, evict->evictedLen, now, ITEM_L2 | ITEM_INCL | LOG_OP | SRC_WB);
}
//fprintf(stderr,"incl writeback %s\n",evict->evictedKey);
this->stats.incl_wbs++;
} else if (evict->evictedFlags & ITEM_EXCL) {
//fprintf(stderr,"excl writeback %s\n",evict->evictedKey);
//strncpy(wb_key,evict->evictedKey,255);
string wb_key(evict->evictedKey);
if (options.rand_admit && wb == 0) {
wb_keys.push_back(wb_key);
issue_set(evict->evictedKey, evict->evictedData, evict->evictedLen, now, ITEM_L2 | ITEM_EXCL | LOG_OP | SRC_WB);
} else if (options.threshold && (key_hist[wb_key] >= options.threshold)) {
wb_keys.push_back(wb_key);
issue_set(evict->evictedKey, evict->evictedData, evict->evictedLen, now, ITEM_L2 | ITEM_EXCL | LOG_OP | SRC_WB);
} else if (options.wb_all) {
wb_keys.push_back(wb_key);
issue_set(evict->evictedKey, evict->evictedData, evict->evictedLen, now, ITEM_L2 | ITEM_EXCL | LOG_OP | SRC_WB);
if ( (options.rand_admit && wb == 0) ||
(options.threshold && (key_hist[wb_key] == 1)) ||
(options.wb_all) ) {
int ret = add_to_wb_keys(wb_key);
if (ret == 1) {
issue_set(evict->evictedKey, evict->evictedData, evict->evictedLen, now, ITEM_L2 | ITEM_EXCL | LOG_OP | SRC_WB);
}
this->stats.excl_wbs++;
}
this->stats.excl_wbs++;
}
/*
if (evict->serverFlags & ITEM_SIZE_CHANGE && OP_src(op) == SRC_DIRECT_SET) {
char key[256];
memset(key,0,256);
strncpy(key, op->key.c_str(),255);
if (evict->serverFlags & ITEM_INCL) {
int index = lrand48() % (1024 * 1024);
int valuelen = op->valuelen;
//the item's size was changed, issue a SET to L2 as a new command
issue_set(key, &random_char[index], valuelen, now, ITEM_L2 | ITEM_INCL | LOG_OP | SRC_L2_M);
}
}
*/
if (OP_src(op) == SRC_DIRECT_SET) {
if ( (evict->serverFlags & ITEM_SIZE_CHANGE) || ((evict->serverFlags & ITEM_WAS_HIT) == 0)) {
this->stats.set_misses_l1++;
@ -1391,19 +1437,10 @@ void ConnectionMulti::read_callback1() {
}
}
}
/*
if (OP_incl(op) && (OP_src(op) == SRC_DIRECT_SET)) {
char key[256];
memset(key,0,256);
strncpy(key, op->key.c_str(),255);
int valuelen = op->valuelen;
if (class_change) {
int index = lrand48() % (1024 * 1024);
issue_set(key, &random_char[index], valuelen, now, ITEM_L2 | ITEM_INCL | LOG_OP);
} else {
}
}
*/
del_wb_keys(op->key);
finish_op(op,1);
break;
case Operation::TOUCH:
finish_op(op,1);
break;
default:
@ -1506,14 +1543,21 @@ void ConnectionMulti::read_callback2() {
int valuelen = op->valuelen;
int index = lrand48() % (1024 * 1024);
int flags = OP_clu(op) | SRC_L2_M | LOG_OP;
wb_keys.push_back(op->key);
issue_set(key, &random_char[index], valuelen, now, flags | ITEM_L1);
//wb_keys.push_back(op->key);
last_quiet1 = false;
if (OP_incl(op)) {
wb_keys.push_back(op->key);
//wb_keys.push_back(op->key);
issue_set(key, &random_char[index], valuelen, now, flags | ITEM_L2);
last_quiet2 = false;
}
//pthread_mutex_lock(lock);
//fprintf(stderr,"----miss: %s----\n",key);
//for (auto iter = g_wb_keys->begin(); iter != g_wb_keys->end(); ++iter){
// fprintf(stderr,"%s,%d\n",iter->first.c_str(),iter->second);
//}
//fprintf(stderr,"----%d----\n",cid);
//pthread_mutex_unlock(lock);
finish_op(op,0); // sets read_state = IDLE
} else {
@ -1525,7 +1569,7 @@ void ConnectionMulti::read_callback2() {
int index = lrand48() % (1024 * 1024);
int flags = OP_clu(op) | ITEM_L1 | SRC_L1_COPY;
//found in l2, set in l1
wb_keys.push_back(op->key);
//wb_keys.push_back(op->key);
issue_set(key, &random_char[index],valuelen, now, flags);
this->stats.copies_to_l1++;
//if (OP_excl(op)) {
@ -1544,26 +1588,27 @@ void ConnectionMulti::read_callback2() {
}
break;
case Operation::SET:
if (OP_src(op) == SRC_WB || OP_src(op) == SRC_L2_M) {
vector<string>::iterator position = std::find(wb_keys.begin(), wb_keys.end(), op->key);
if (position != wb_keys.end()) {
wb_keys.erase(position);
} else {
fprintf(stderr,"expected wb %s, got nuthin\n",op->key.c_str());
}
if (OP_src(op) == SRC_WB) {
del_wb_keys(op->key);
}
finish_op(op,1);
break;
case Operation::TOUCH:
if (!found && (OP_src(op) == SRC_DIRECT_SET)) {
if (OP_src(op) == SRC_DIRECT_SET) {
char key[256];
memset(key,0,256);
strncpy(key, op->key.c_str(),255);
int valuelen = op->valuelen;
int index = lrand48() % (1024 * 1024);
wb_keys.push_back(op->key);
issue_set(key, &random_char[index],valuelen,now, ITEM_INCL | ITEM_L2 | LOG_OP | SRC_L2_M);
this->stats.set_misses_l2++;
if (!found) {
int index = lrand48() % (1024 * 1024);
//int ret = add_to_wb_keys(op->key+"l2");
//if (ret == 1) {
issue_set(key, &random_char[index],valuelen,now, ITEM_INCL | ITEM_L2 | LOG_OP | SRC_L2_M);
//}
this->stats.set_misses_l2++;
} else {
issue_touch(key,valuelen,now, ITEM_L1 | SRC_L2_H | ITEM_DIRTY);
}
}
//if (!found) {
// //int incl = op->incl;
@ -1637,7 +1682,8 @@ void ConnectionMulti::write_callback() {
/**
* Callback for timer timeouts.
*/
void ConnectionMulti::timer_callback() {
void ConnectionMulti::timer_callback() {
//fprintf(stderr,"timer up: %d\n",cid);
drive_write_machine();
}

View File

@ -49,6 +49,7 @@ class ConnectionStats {
copies_to_l1(0),
delete_misses_l2(0),
delete_hits_l2(0),
gets_cid(40), sets_cid(40),
set_incl_hits_l1(0),set_excl_hits_l1(0),
window_gets(0), window_sets(0), window_accesses(0),
window_get_misses(0), skips(0), sampling(_sampling) {}
@ -92,6 +93,8 @@ class ConnectionStats {
uint64_t copies_to_l1;
uint64_t delete_misses_l2;
uint64_t delete_hits_l2;
vector<uint64_t> gets_cid;
vector<uint64_t> sets_cid;
uint64_t set_incl_hits_l1, set_excl_hits_l1;
uint64_t window_gets, window_sets, window_accesses, window_get_misses;
uint64_t skips;
@ -174,6 +177,10 @@ class ConnectionStats {
op_sampler.accumulate(cs.op_sampler);
#endif
for (int i = 0; i < 40; i++) {
gets_cid[i] += cs.gets_cid[i];
sets_cid[i] += cs.sets_cid[i];
}
rx_bytes += cs.rx_bytes;
tx_bytes += cs.tx_bytes;
gets += cs.gets;

View File

@ -20,6 +20,8 @@ public:
uint32_t opaque;
uint32_t flags;
uint16_t clsid;
uint32_t future;
uint32_t curr;
string key;
Operation *l1;

View File

@ -51,8 +51,8 @@ env.Append(CPPFLAGS = ' -O3 -Wall -g')
#env.Append(LDFLAGS = '-fsantize=address')
#env.Append(CFLAGS = ' -O3 -Wall -g -fsantize=address')
#env.Append(CPPFLAGS = ' -O3 -Wall -g -fsanitize=address')
#env.Append(CFLAGS = ' -O0 -Wall -g -fsantize=address')
#env.Append(CPPFLAGS = ' -O0 -Wall -g -fsanitize=address')
#env.Append(CFLAGS = ' -O0 -Wall -g')
#env.Append(CPPFLAGS = ' -O0 -Wall -g')
#env.Append(CFLAGS = '-g -std=c++11 -D_GNU_SOURCE -static-libsan -fsanitize=address -I/u/dbyrne99/local/include' )
#env.Append(CCFLAGS = '-g -std=c++11 -D_GNU_SOURCE -static-libsan -fsanitize=address -I/u/dbyrne99/local/include' )

View File

@ -82,13 +82,15 @@ struct thread_data {
#endif
int id;
//std::vector<ConcurrentQueue<string>*> trace_queue;
std::vector<queue<string>*> *trace_queue;
std::vector<pthread_mutex_t*> *mutexes;
std::vector<queue<Operation>*> *trace_queue;
//std::vector<pthread_mutex_t*> *mutexes;
pthread_mutex_t* g_lock;
std::unordered_map<string,int> *g_wb_keys;
};
struct reader_data {
//std::vector<ConcurrentQueue<string>*> trace_queue;
std::vector<queue<string>*> *trace_queue;
std::vector<queue<Operation>*> *trace_queue;
std::vector<pthread_mutex_t*> *mutexes;
string *trace_filename;
int twitter_trace;
@ -114,7 +116,7 @@ void go(const vector<string> &servers, options_t &options,
//void do_mutilate(const vector<string> &servers, options_t &options,
// ConnectionStats &stats,std::vector<ConcurrentQueue<string>*> trace_queue, bool master = true
void do_mutilate(const vector<string> &servers, options_t &options,
ConnectionStats &stats,std::vector<queue<string>*> *trace_queue, std::vector<pthread_mutex_t*> *mutexes, bool master = true
ConnectionStats &stats,std::vector<queue<Operation>*> *trace_queue, pthread_mutex_t *g_lock, unordered_map<string,int> *g_wb_keys, bool master = true
#ifdef HAVE_LIBZMQ
, zmq::socket_t* socket = NULL
#endif
@ -669,6 +671,10 @@ int main(int argc, char **argv) {
additional = 0;
}
}
for (int i = 0; i < 40; i++) {
fprintf(stderr,"class %d, gets: %lu, sets: %lu\n",i,stats.gets_cid[i],stats.sets_cid[i]);
}
//printf("Misses (L1) = %" PRIu64 " (%.1f%%)\n", stats.get_misses_l1 + stats.set_misses_l1,
// (double) (stats.get_misses_l1 + stats.set_misses_l1) /(stats.gets + stats.sets)*100);
printf("Misses (L1) = %" PRIu64 " (%.1f%%)\n", stats.get_misses_l1 ,
@ -737,15 +743,20 @@ void go(const vector<string>& servers, options_t& options,
#endif
//std::vector<ConcurrentQueue<string>*> trace_queue; // = (ConcurrentQueue<string>**)malloc(sizeof(ConcurrentQueue<string>)
std::vector<queue<string>*> *trace_queue = new std::vector<queue<string>*>();
std::vector<queue<Operation>*> *trace_queue = new std::vector<queue<Operation>*>();
// = (ConcurrentQueue<string>**)malloc(sizeof(ConcurrentQueue<string>)
std::vector<pthread_mutex_t*> *mutexes = new std::vector<pthread_mutex_t*>();
//std::vector<pthread_mutex_t*> *mutexes = new std::vector<pthread_mutex_t*>();
pthread_mutex_t *g_lock = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
*g_lock = PTHREAD_MUTEX_INITIALIZER;
unordered_map<string,int> *g_wb_keys = new unordered_map<string,int>();
for (int i = 0; i <= options.apps; i++) {
//trace_queue.push_back(new ConcurrentQueue<string>(2000000));
trace_queue->push_back(new std::queue<string>());
pthread_mutex_t *lock = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
*lock = PTHREAD_MUTEX_INITIALIZER;
mutexes->push_back(lock);
// //trace_queue.push_back(new ConcurrentQueue<string>(2000000));
// pthread_mutex_t *lock = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
// *lock = PTHREAD_MUTEX_INITIALIZER;
// mutexes->push_back(lock);
trace_queue->push_back(new std::queue<Operation>());
}
pthread_mutex_init(&reader_l, NULL);
pthread_cond_init(&reader_ready, NULL);
@ -753,7 +764,7 @@ void go(const vector<string>& servers, options_t& options,
//ConcurrentQueue<string> *trace_queue = new ConcurrentQueue<string>(20000000);
struct reader_data *rdata = (struct reader_data*)malloc(sizeof(struct reader_data));
rdata->trace_queue = trace_queue;
rdata->mutexes = mutexes;
//rdata->mutexes = mutexes;
rdata->twitter_trace = options.twitter_trace;
pthread_t rtid;
if (options.read_file) {
@ -794,7 +805,8 @@ void go(const vector<string>& servers, options_t& options,
td[t].options = &options;
td[t].id = t;
td[t].trace_queue = trace_queue;
td[t].mutexes = mutexes;
td[t].g_lock = g_lock;
td[t].g_wb_keys = g_wb_keys;
#ifdef HAVE_LIBZMQ
td[t].socket = socket;
#endif
@ -856,7 +868,7 @@ void go(const vector<string>& servers, options_t& options,
//delete trace_queue;
} else if (options.threads == 1) {
do_mutilate(servers, options, stats, trace_queue, mutexes, true
do_mutilate(servers, options, stats, trace_queue, g_lock, g_wb_keys, true
#ifdef HAVE_LIBZMQ
, socket
#endif
@ -977,10 +989,11 @@ static char *get_stream(ZSTD_DCtx* dctx, FILE *fin, size_t const buffInSize, voi
void* reader_thread(void *arg) {
struct reader_data *rdata = (struct reader_data *) arg;
//std::vector<ConcurrentQueue<string>*> trace_queue = (std::vector<ConcurrentQueue<string>*>) rdata->trace_queue;
std::vector<queue<string>*> *trace_queue = (std::vector<queue<string>*>*) rdata->trace_queue;
std::vector<pthread_mutex_t*> *mutexes = (std::vector<pthread_mutex_t*>*) rdata->mutexes;
std::vector<queue<Operation>*> *trace_queue = (std::vector<queue<Operation>*>*) rdata->trace_queue;
// std::vector<pthread_mutex_t*> *mutexes = (std::vector<pthread_mutex_t*>*) rdata->mutexes;
int twitter_trace = rdata->twitter_trace;
string fn = *(rdata->trace_filename);
srand(time(NULL));
if (hasEnding(fn,".zst")) {
//init
const char *filename = fn.c_str();
@ -990,13 +1003,16 @@ void* reader_thread(void *arg) {
size_t const buffOutSize = ZSTD_DStreamOutSize()*1000;
void* const buffOut = malloc_orDie(buffOutSize);
map<string,Operation> key_hist;
ZSTD_DCtx* const dctx = ZSTD_createDCtx();
CHECK(dctx != NULL, "ZSTD_createDCtx() failed!");
//CHECK(dctx != NULL, "ZSTD_createDCtx() failed!");
//char *leftover = malloc(buffOutSize);
//memset(leftover,0,buffOutSize);
//char *trace = (char*)decompress(filename);
uint64_t nwrites = 0;
uint64_t n = 0;
uint64_t nout = 1;
int cappid = 1;
fprintf(stderr,"%lu trace queues for connections\n",trace_queue->size());
char *trace = get_stream(dctx, fin, buffInSize, buffIn, buffOutSize, buffOut);
while (trace != NULL) {
char *ftrace = trace;
@ -1009,15 +1025,16 @@ void* reader_thread(void *arg) {
int appid = 0;
if (full_line.length() > 10) {
if (trace_queue->size() > 1) {
if (trace_queue->size() > 0) {
stringstream ss(full_line);
string rT;
string rApp;
string rKey;
string rOp;
string rvaluelen;
Operation Op;
if (twitter_trace == 1) {
string rKeySize;
string rvaluelen;
size_t n = std::count(full_line.begin(), full_line.end(), ',');
if (n == 6) {
getline( ss, rT, ',' );
@ -1025,7 +1042,25 @@ void* reader_thread(void *arg) {
getline( ss, rKeySize, ',' );
getline( ss, rvaluelen, ',' );
getline( ss, rApp, ',' );
appid = (stoi(rApp)) % trace_queue->size();
getline( ss, rOp, ',' );
if (rOp.compare("get") == 0) {
Op.type = Operation::GET;
} else if (rOp.compare("set") == 0) {
Op.type = Operation::SET;
}
if (rvaluelen.compare("") == 0 || rvaluelen.size() < 1 || rvaluelen.empty()) {
continue;
}
//appid = cappid;
//if (nout % 1000 == 0) {
// cappid++;
// cappid = cappid % trace_queue->size();
// if (cappid == 0) cappid = 1;
//}
appid = stoi(rApp) % trace_queue->size();
if (appid == 0) appid = 1;
} else {
continue;
}
@ -1038,20 +1073,102 @@ void* reader_thread(void *arg) {
getline( ss, rApp, ',');
getline( ss, rOp, ',' );
getline( ss, rKey, ',' );
getline( ss, rvaluelen, ',' );
int ot = stoi(rOp);
switch (ot) {
case 1:
Op.type = Operation::GET;
break;
case 2:
Op.type = Operation::SET;
break;
}
appid = (stoi(rApp)) % trace_queue->size();
//appid = (nout) % trace_queue->size();
} else {
continue;
}
}
if (appid < (int)trace_queue->size()) {
//pthread_mutex_lock(mutexes[appid]);
trace_queue->at(appid)->push(full_line);
//pthread_mutex_unlock(mutexes[appid]);
else if (twitter_trace == 3) {
size_t n = std::count(full_line.begin(), full_line.end(), ',');
if (n == 4) {
getline( ss, rT, ',');
getline( ss, rApp, ',');
getline( ss, rOp, ',' );
getline( ss, rKey, ',' );
getline( ss, rvaluelen, ',' );
int ot = stoi(rOp);
switch (ot) {
case 1:
Op.type = Operation::GET;
break;
case 2:
Op.type = Operation::SET;
break;
}
//appid = (rand() % (trace_queue->size()-1)) + 1;
appid = cappid;
if (nout % 1000 == 0) {
cappid++;
cappid = cappid % trace_queue->size();
if (cappid == 0) cappid = 1;
}
} else {
continue;
}
}
else if (twitter_trace == 4) {
size_t n = std::count(full_line.begin(), full_line.end(), ',');
if (n == 3) {
getline( ss, rT, ',');
getline( ss, rKey, ',' );
getline( ss, rOp, ',' );
getline( ss, rvaluelen, ',' );
int ot = stoi(rOp);
switch (ot) {
case 1:
Op.type = Operation::GET;
break;
case 2:
Op.type = Operation::SET;
break;
}
if (rvaluelen == "0") {
rvaluelen = "50000";
}
appid = (rand() % (trace_queue->size()-1)) + 1;
//appid = cappid;
//if (nout % 1000 == 0) {
// cappid++;
// cappid = cappid % trace_queue->size();
// if (cappid == 0) cappid = 1;
//}
} else {
continue;
}
}
int vl = stoi(rvaluelen);
if (appid < (int)trace_queue->size() && vl < 524000 && vl > 1) {
Op.valuelen = vl;
Op.key = rKey;
if (Op.type == Operation::GET) {
//find when was last read
Operation last_op = key_hist[rKey];
if (last_op.valuelen > 0) {
last_op.future = nout; //THE FUTURE IS NOW
Op.curr = nout;
key_hist[rKey] = Op;
} else {
//first ref
key_hist[rKey] = Op;
}
}
trace_queue->at(appid)->push(Op);
}
} else {
//pthread_mutex_lock(mutexes[appid]);
trace_queue->at(appid)->push(full_line);
//pthread_mutex_unlock(mutexes[appid]);
fprintf(stderr,"big error!\n");
}
}
//bool res = trace_queue[appid]->try_enqueue(full_line);
@ -1060,8 +1177,8 @@ void* reader_thread(void *arg) {
// //res = trace_queue[appid]->try_enqueue(full_line);
// nwrites++;
//}
n++;
if (n % 1000000 == 0) fprintf(stderr,"decompressed requests: %lu, waits: %lu\n",n,nwrites);
nout++;
if (nout % 1000000 == 0) fprintf(stderr,"decompressed requests: %lu, waits: %lu\n",nout,nwrites);
//if (n > 100000000) {
// pthread_mutex_lock(&reader_l);
// reader_not_ready = 0;
@ -1075,11 +1192,15 @@ void* reader_thread(void *arg) {
trace = get_stream(dctx, fin, buffInSize, buffIn, buffOutSize, buffOut);
}
string eof = "EOF";
for (int i = 0; i < 1000; i++) {
for (int j = 0; j < (int)trace_queue->size(); j++) {
//trace_queue[j]->enqueue(eof);
Operation eof;
eof.type = Operation::SASL;
trace_queue->at(j)->push(eof);
if (i == 0) {
fprintf(stderr,"appid %d, tq size: %ld\n",j,trace_queue->at(j)->size());
}
}
}
@ -1126,7 +1247,7 @@ void* thread_main(void *arg) {
}
ConnectionStats *cs = new ConnectionStats();
do_mutilate(*td->servers, *td->options, *cs, td->trace_queue, td->mutexes, td->master
do_mutilate(*td->servers, *td->options, *cs, td->trace_queue, td->g_lock, td->g_wb_keys, td->master
#ifdef HAVE_LIBZMQ
, td->socket
#endif
@ -1136,7 +1257,7 @@ void* thread_main(void *arg) {
}
void do_mutilate(const vector<string>& servers, options_t& options,
ConnectionStats& stats, vector<queue<string>*> *trace_queue, vector<pthread_mutex_t*> *mutexes, bool master
ConnectionStats& stats, vector<queue<Operation>*> *trace_queue, pthread_mutex_t* g_lock, unordered_map<string,int> *g_wb_keys, bool master
#ifdef HAVE_LIBZMQ
, zmq::socket_t* socket
#endif
@ -1215,9 +1336,9 @@ void do_mutilate(const vector<string>& servers, options_t& options,
sleep(d);
}
if (connected) {
fprintf(stderr,"cid %d gets trace_queue\nfirst: %s",conn->get_cid(),trace_queue->at(conn->get_cid())->front().c_str());
conn->set_queue(trace_queue->at(conn->get_cid()));
conn->set_lock(mutexes->at(conn->get_cid()));
//fprintf(stderr,"cid %d gets trace_queue\nfirst: %s",conn->get_cid(),trace_queue->at(conn->get_cid())->front().c_str());
//conn->set_queue(trace_queue->at(conn->get_cid()));
//conn->set_lock(mutexes->at(conn->get_cid()));
connections.push_back(conn);
} else {
fprintf(stderr,"conn: %d, not connected!!\n",c);
@ -1455,17 +1576,18 @@ void do_mutilate(const vector<string>& servers, options_t& options,
int addrlen;
addrlen = sizeof(sin1);
int max_tries = 13;
int max_tries = 50;
int n_tries = 0;
int s = 2;
int s = 10;
while (connect(fd1, (struct sockaddr*)&sin1, addrlen) == -1) {
perror("l1 connect error");
if (n_tries++ > max_tries) {
fprintf(stderr,"conn l1 %d unable to connect after sleep for %d\n",c+1,s);
exit(-1);
}
int d = s + rand() % 10;
sleep(d);
s += 4;
int d = s + rand() % 100;
usleep(d);
s = (int)((double)s*1.25);
}
int fd2 = -1;
@ -1480,15 +1602,16 @@ void do_mutilate(const vector<string>& servers, options_t& options,
fcntl(fd2, F_SETFL, O_NONBLOCK); /* Change the socket into non-blocking state */
addrlen = sizeof(sin2);
n_tries = 0;
s = 2;
s = 10;
while (connect(fd2, (struct sockaddr*)&sin2, addrlen) == -1) {
perror("l2 connect error");
if (n_tries++ > max_tries) {
fprintf(stderr,"conn l2 %d unable to connect after sleep for %d\n",c+1,s);
exit(-1);
}
int d = s + rand() % 10;
sleep(d);
s += 4;
int d = s + rand() % 100;
usleep(d);
s = (int)((double)s*1.25);
}
@ -1503,16 +1626,23 @@ void do_mutilate(const vector<string>& servers, options_t& options,
if (connected) {
fprintf(stderr,"cid %d gets l1 fd %d l2 fd %d\n",cid,fd1,fd2);
fprintf(stderr,"cid %d gets trace_queue\nfirst: %s\n",cid,trace_queue->at(cid)->front().c_str());
fprintf(stderr,"cid %d gets trace_queue\nfirst: %s\n",cid,trace_queue->at(cid)->front().key.c_str());
if (g_lock != NULL) {
conn->set_g_wbkeys(g_wb_keys);
conn->set_lock(g_lock);
}
conn->set_queue(trace_queue->at(cid));
conn->set_lock(mutexes->at(cid));
connections.push_back(conn);
} else {
fprintf(stderr,"conn multi: %d, not connected!!\n",c);
}
}
// wait for all threads to reach here
pthread_barrier_wait(&barrier);
fprintf(stderr,"thread %ld gtg\n",pthread_self());
// Wait for all Connections to become IDLE.
while (1) {
// FIXME: If all connections become ready before event_base_loop
@ -1526,14 +1656,16 @@ void do_mutilate(const vector<string>& servers, options_t& options,
if (restart) continue;
else break;
}
double start = get_time();
double now = start;
for (ConnectionMulti *conn: connections) {
conn->start_time = start;
conn->start(); // Kick the Connection into motion.
}
fprintf(stderr,"Start = %f\n", start);
//fprintf(stderr,"Start = %f\n", start);
// Main event loop.
while (1) {