updated samples

This commit is contained in:
Daniel Byrne 2019-02-22 09:28:13 -05:00
parent e76e69d939
commit ba0349cb05
7 changed files with 244 additions and 100 deletions

View File

@ -73,8 +73,8 @@ public:
}
void print_header() {
printf("#%-6s %6s %8s %8s %8s %8s %8s %8s\n", "type", "size",
"min", "max", "avg", "90th", "95th", "99th");
printf("#%-6s %6s %8s %8s %8s %8s %8s %8s %8s %8s\n", "type", "size",
"min", "max", "avg", "50th", "90th", "95th", "99th", "99.9th");
}
void print_stats(const char *type, const char *size) {
@ -82,17 +82,18 @@ public:
size_t l = samples_copy.size();
if (l == 0) {
printf("%-7s %6s %8.1f %8.1f %8.1f %8.1f %8.1f %8.1f\n", type, size,
printf("%-7s %6s %8.1f %8.1f %8.1f %8.1f %8.1f %8.1f %8.1f %8.1f\n", type, size,
0.0, 0.0, 0.0, 0.0, 0.0, 0.0);
return;
}
sort(samples_copy.begin(), samples_copy.end());
printf("%-7s %6s %8.1f %8.1f %8.1f %8.1f %8.1f %8.1f\n", type, size,
printf("%-7s %6s %8.1f %8.1f% 8.1f %8.1f %8.1f %8.1f %8.1f %8.1f\n", type, size,
samples_copy[0], samples_copy[l-1], average(),
samples_copy[(l*50)/100],
samples_copy[(l*90)/100], samples_copy[(l*95)/100],
samples_copy[(l*99)/100]);
samples_copy[(l*99)/100], samples_copy[(l*99.9)/100]);
}
};

View File

@ -130,13 +130,10 @@ void Connection::start_loading() {
* Issue either a get or set request to the server according to our probability distribution.
*/
void Connection::issue_something(double now) {
char skey[256];
char key[256];
// FIXME: generate key distribution here!
string keystr = keygen->generate(lrand48() % options.records);
strcpy(skey, keystr.c_str());
strcpy(key, options.prefix);
strcat(key,skey);
strcpy(key, keystr.c_str());
if (drand48() < options.update) {
int index = lrand48() % (1024 * 1024);
@ -201,11 +198,8 @@ void Connection::issue_getset(double now) {
{
string keystr;
char key[256];
char skey[256];
keystr = keygen->generate(lrand48() % options.records);
strcpy(skey, keystr.c_str());
strcpy(key,options.prefix);
strcat(key,skey);
strcpy(key, keystr.c_str());
char log[256];
int length = valuesize->generate();
@ -229,10 +223,7 @@ void Connection::issue_getset(double now) {
key_len[rKey] = rvaluelen;
char key[256];
char skey[256];
strcpy(skey, rKey.c_str());
strcpy(key,options.prefix);
strcat(key,skey);
strcpy(key, rKey.c_str());
issue_get(key, now);
}
}
@ -555,6 +546,9 @@ void Connection::read_callback() {
bool done, found, full_read;
//initially assume found (for sets that may come through here)
//is this correct? do we want to assume true in case that
//GET was found, but wrong value size (i.e. update value)
//
found = true;
if (op_queue.size() == 0) V("Spurious read callback.");
@ -568,31 +562,59 @@ void Connection::read_callback() {
case WAITING_FOR_GET:
assert(op_queue.size() > 0);
full_read = prot->handle_response(input, done, found);
int obj_size;
full_read = prot->handle_response(input, done, found, obj_size);
if (!full_read) {
return;
} else if (done) {
if (!found && options.getset && stats.gets >= 1)
{
string keystr = op->key;
strcpy(last_key, keystr.c_str());
last_miss = 1;
}
else if (options.getset)
{
string keystr = op->key;
strcpy(last_key, keystr.c_str());
last_miss = 0;
}
if (!found && options.getset)
{
string keystr = op->key;
strcpy(last_key, keystr.c_str());
last_miss = 1;
}
else if (found && options.getset)
{
string keystr = op->key;
strcpy(last_key, keystr.c_str());
char vlen[256];
string valuelen = key_len[keystr];
strcpy(vlen, valuelen.c_str());
size_t vl = atoi(vlen);
//char log[256];
//sprintf(log,"key %s, resp size: %d, last GET size %lu\n",keystr.c_str(),obj_size, vl);
//write(2,log,strlen(log));
if (obj_size != (int)vl)
{
stats.window_get_misses++;
stats.get_misses++;
//char log[256];
//sprintf(log,"update key %s\n",keystr.c_str());
//write(2,log,strlen(log));
last_miss = 1;
}
else
{
//char log[256];
//sprintf(log,"same key %s\n",keystr.c_str());
//write(2,log,strlen(log));
last_miss = 0;
}
}
//char log[256];
//sprintf(log,"%f,%d,%d,%d,%d,%d,%d\n",
// r_time,r_appid,r_type,r_ksize,r_vsize,r_key,r_hit);
//write(2,log,strlen(log));
finish_op(op); // sets read_state = IDLE
}
@ -601,7 +623,7 @@ void Connection::read_callback() {
case WAITING_FOR_SET:
assert(op_queue.size() > 0);
if (!prot->handle_response(input, done, found)) return;
if (!prot->handle_response(input, done, found, obj_size)) return;
//char log[256];
@ -613,13 +635,13 @@ void Connection::read_callback() {
break;
case WAITING_FOR_DELETE:
if (!prot->handle_response(input,done,found)) return;
if (!prot->handle_response(input,done,found, obj_size)) return;
finish_op(op);
break;
case LOADING:
assert(op_queue.size() > 0);
if (!prot->handle_response(input, done, found)) return;
if (!prot->handle_response(input, done, found, obj_size)) return;
loader_completed++;
pop_op();

View File

@ -125,9 +125,9 @@ class ConnectionStats {
}
static void print_header() {
printf("%-7s %7s %7s %7s %7s %7s %7s %7s %7s\n",
printf("%-7s %7s %7s %7s %7s %7s %7s %7s %7s %7s %7s\n",
"#type", "avg", "std", "min", /*"1st",*/ "5th", "10th",
"90th", "95th", "99th");
"50th", "90th", "95th", "99th", "99.9th");
}
#ifdef USE_ADAPTIVE_SAMPLER
@ -139,18 +139,18 @@ class ConnectionStats {
size_t l = copy.size();
if (l == 0) {
printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f",
tag, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0);
printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f",
tag, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0);
if (newline) printf("\n");
return;
}
sort(copy.begin(), copy.end());
printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f",
printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f",
tag, std::accumulate(copy.begin(), copy.end(), 0.0) / l,
copy[0], copy[(l*1) / 100], copy[(l*5) / 100], copy[(l*10) / 100],
copy[(l*90) / 100], copy[(l*95) / 100], copy[(l*99) / 100]
copy[0], copy[(l*1) / 100], copy[(l*5) / 100], copy[(l*10) / 100], copy[(l*50) / 100],
copy[(l*90) / 100], copy[(l*95) / 100], copy[(l*99) / 100], copy[(l*99.9) / 100]
);
if (newline) printf("\n");
}
@ -166,10 +166,10 @@ class ConnectionStats {
sort(copy.begin(), copy.end());
printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f",
printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f",
tag, std::accumulate(copy.begin(), copy.end(), 0.0) / l,
copy[0], copy[(l*1) / 100], copy[(l*5) / 100], copy[(l*10) / 100],
copy[(l*90) / 100], copy[(l*95) / 100], copy[(l*99) / 100]
copy[0], copy[(l*1) / 100], copy[(l*5) / 100], copy[(l*10) / 100], copy[(l*50) / 100],
copy[(l*90) / 100], copy[(l*95) / 100], copy[(l*99) / 100], copy[(l*99.9) / 100]
);
if (newline) printf("\n");
}
@ -177,8 +177,8 @@ class ConnectionStats {
void print_stats(const char *tag, HistogramSampler &sampler,
bool newline = true) {
if (sampler.total() == 0) {
printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f",
tag, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0);
printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f",
tag, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0);
if (newline) printf("\n");
return;
}
@ -186,8 +186,8 @@ class ConnectionStats {
printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f",
tag, sampler.average(),
sampler.get_nth(0), sampler.get_nth(1), sampler.get_nth(5),
sampler.get_nth(10), sampler.get_nth(90),
sampler.get_nth(95), sampler.get_nth(99));
sampler.get_nth(10), sampler.get_nth(50), sampler.get_nth(90),
sampler.get_nth(95), sampler.get_nth(99), sampler.get_nth(99.9));
if (newline) printf("\n");
}
@ -195,17 +195,18 @@ class ConnectionStats {
void print_stats(const char *tag, LogHistogramSampler &sampler,
bool newline = true) {
if (sampler.total() == 0) {
printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f",
tag, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0);
printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f",
tag, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0);
if (newline) printf("\n");
return;
}
printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f",
printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f",
tag, sampler.average(), sampler.stddev(),
sampler.get_nth(0), /*sampler.get_nth(1),*/ sampler.get_nth(5),
sampler.get_nth(10), sampler.get_nth(90),
sampler.get_nth(95), sampler.get_nth(99));
sampler.get_nth(0), sampler.get_nth(5),
sampler.get_nth(10), sampler.get_nth(50),
sampler.get_nth(90), sampler.get_nth(95),
sampler.get_nth(99), sampler.get_nth(99.9) );
if (newline) printf("\n");
}

View File

@ -17,7 +17,6 @@ public:
type_enum type;
string key;
// string value;
double time() const { return (end_time - start_time) * 1000000; }
};

View File

@ -68,7 +68,7 @@ int ProtocolRESP::set_request(const char* key, const char* value, int len) {
"*3\r\n$3\r\nSET\r\n$%lu\r\n%s\r\n$%d\r\n%s\r\n",
strlen(key),key,len,val);
l += len + 2;
if (read_state == IDLE) read_state = WAITING_FOR_END;
if (read_state == IDLE) read_state = WAITING_FOR_GET;
free(val);
return l;
}
@ -188,49 +188,167 @@ int ProtocolRESP::delete90_request() {
*
*
*/
bool ProtocolRESP::handle_response(evbuffer *input, bool &done, bool &found) {
bool ProtocolRESP::handle_response(evbuffer *input, bool &done, bool &found, int &obj_size) {
char *buf = NULL;
char *databuf = NULL;
char *obj_size_str = NULL;
int len;
size_t n_read_out;
buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF_STRICT);
if (buf == NULL)
{
done = false;
return false;
}
conn->stats.rx_bytes += n_read_out;
//RESP null response => miss
if (!strncmp(buf,"$-1",3))
{
conn->stats.get_misses++;
conn->stats.window_get_misses++;
found = false;
switch (read_state) {
case WAITING_FOR_GET:
}
//HSET or SET response was good, just consume the input and move on
//with our lives
else if (!strncmp(buf,"+OK",3) || !strncmp(buf,":1",2) || !strncmp(buf,":0",2) )
{
buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF);
if (buf == NULL) return false;
obj_size_str = buf+1;
obj_size = atoi(obj_size_str);
conn->stats.rx_bytes += n_read_out;
databuf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF);
//fprintf(stderr,"--------------------\n");
//fprintf(stderr,"resp size %lu\n",n_read_out);
//fprintf(stderr,"data size %d\n",obj_size);
//fprintf(stderr,"-------header---------\n");
//fprintf(stderr,"%s\n",buf);
//fprintf(stderr,"-------data-----------\n");
//fprintf(stderr,"%s\n",databuf);
conn->stats.rx_bytes += n_read_out;
if (!strncmp(buf,"$-1",3)) {
conn->stats.get_misses++;
conn->stats.window_get_misses++;
found = false;
done = true;
} else if ((int)n_read_out != obj_size) {
// FIXME: check key name to see if it corresponds to the op at
// the head of the op queue? This will be necessary to
// support "gets" where there may be misses.
data_length = obj_size;
read_state = WAITING_FOR_GET_DATA;
done = false;
} else if (!strncmp(buf,"+OK",3) || !strncmp(buf,":1",2) || !strncmp(buf,":0",2) ) {
found = false;
done = true;
} else {
// got all the data..
found = true;
done = true;
}
if (databuf)
free(databuf);
free(buf);
return true;
case WAITING_FOR_GET_DATA:
len = evbuffer_get_length(input);
//finally got all data...
if (len >= data_length + 2) {
evbuffer_drain(input, data_length + 2);
conn->stats.rx_bytes += data_length + 2;
read_state = WAITING_FOR_GET;
obj_size = data_length;
found = true;
done = true;
return true;
}
return false;
default: printf("state: %d\n", read_state); DIE("Unimplemented!");
}
//else we got a hit
else
{
if (buf)
free(buf);
// Consume the next "foobar"
buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF_STRICT);
conn->stats.rx_bytes += n_read_out;
found = true;
}
done = true;
free(buf);
return true;
DIE("Shouldn't ever reach here...");
}
//char *buf = NUL; //for initial readline
//char *dbuf = NULL; //for data readline
//size_t n_read_out;
//buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF);
//if (buf == NULL)
//{
// done = false;
// return false;
//}
//conn->stats.rx_bytes += n_read_out;
//
//size_t len = evbuffer_get_length(input);
//fprintf(stderr,"--------------------\n");
//fprintf(stderr,"resp size %lu\n",n_read_out);
//fprintf(stderr,"ev len %lu\n",len);
//fprintf(stderr,"--------------------\n");
//fprintf(stderr,"%s\n",buf);
////RESP null response => miss
//if (!strncmp(buf,"$-1",3))
//{
// conn->stats.get_misses++;
// conn->stats.window_get_misses++;
// found = false;
//
//}
////HSET or SET response was good, just consume the input and move on
////with our lives
//else if (!strncmp(buf,"+OK",3) || !strncmp(buf,":1",2) || !strncmp(buf,":0",2) )
//{
// found = false;
// done = true;
//}
////else we got a hit
//else
//{
// char* nlen = buf+1;
// //fprintf(stderr,"%s\n",nlen);
// obj_size = atoi(nlen);
// // Consume the next "foobar"
// //size_t len = evbuffer_get_length(input);
// //dbuf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF);
// //if (!dbuf)
// //{
// // fprintf(stderr,"--------------------\n");
// // fprintf(stderr,"next foobar (null) %lu\n",n_read_out);
// // fprintf(stderr,"ev len %lu\n",len);
// // fprintf(stderr,"--------------------\n");
// // fprintf(stderr,"%s\n",dbuf);
// // //read_state = WAITING_FOR_GET_DATA;
// // //done = false;
// // //return false;
// //}
// //else
// //{
// // fprintf(stderr,"--------------------\n");
// // fprintf(stderr,"next foobar (null) %lu\n",n_read_out);
// // fprintf(stderr,"ev len %lu\n",len);
// // fprintf(stderr,"--------------------\n");
// // fprintf(stderr,"%s\n",dbuf);
// //}
// //conn->stats.rx_bytes += n_read_out;
// found = true;
//}
////read_state = WAITING_FOR_GET;
////fprintf(stderr,"--------------------\n");
////fprintf(stderr,"read_state %u\n",read_state);
////fprintf(stderr,"--------------------\n");
//done = true;
////if (dbuf)
//// free(dbuf);
//free(buf);
//return true;
}
//}
/**
* Send an ascii get request.
@ -269,7 +387,7 @@ int ProtocolAscii::delete90_request() {
/**
* Handle an ascii response.
*/
bool ProtocolAscii::handle_response(evbuffer *input, bool &done, bool &found) {
bool ProtocolAscii::handle_response(evbuffer *input, bool &done, bool &found, int &obj_size) {
char *buf = NULL;
int len;
size_t n_read_out;
@ -299,6 +417,7 @@ bool ProtocolAscii::handle_response(evbuffer *input, bool &done, bool &found) {
// support "gets" where there may be misses.
data_length = len;
obj_size = len;
read_state = WAITING_FOR_GET_DATA;
done = false;
} else {
@ -353,7 +472,8 @@ bool ProtocolBinary::setup_connection_r(evbuffer* input) {
if (!opts.sasl) return true;
bool b,c;
return handle_response(input, b, c);
int obj_size;
return handle_response(input, b, c, obj_size);
}
/**
@ -406,7 +526,7 @@ int ProtocolBinary::delete90_request() {
* @param input evBuffer to read response from
* @return true if consumed, false if not enough data in buffer.
*/
bool ProtocolBinary::handle_response(evbuffer *input, bool &done, bool &found) {
bool ProtocolBinary::handle_response(evbuffer *input, bool &done, bool &found, int &obj_size) {
// Read the first 24 bytes as a header
int length = evbuffer_get_length(input);
if (length < 24) return false;
@ -418,6 +538,7 @@ bool ProtocolBinary::handle_response(evbuffer *input, bool &done, bool &found) {
int targetLen = 24 + ntohl(h->body_len);
if (length < targetLen) return false;
obj_size = ntohl(h->body_len);
// If something other than success, count it as a miss
if (h->opcode == CMD_GET && h->status) {
conn->stats.get_misses++;

View File

@ -21,7 +21,7 @@ public:
virtual int get_request(const char* key) = 0;
virtual int set_request(const char* key, const char* value, int len) = 0;
virtual int delete90_request() = 0;
virtual bool handle_response(evbuffer* input, bool &done, bool &found) = 0;
virtual bool handle_response(evbuffer* input, bool &done, bool &found, int &obj_size) = 0;
protected:
options_t opts;
@ -43,7 +43,7 @@ public:
virtual int get_request(const char* key);
virtual int set_request(const char* key, const char* value, int len);
virtual int delete90_request();
virtual bool handle_response(evbuffer* input, bool &done, bool &found);
virtual bool handle_response(evbuffer* input, bool &done, bool &found, int &obj_size);
private:
enum read_fsm {
@ -68,7 +68,7 @@ public:
virtual int get_request(const char* key);
virtual int set_request(const char* key, const char* value, int len);
virtual int delete90_request();
virtual bool handle_response(evbuffer* input, bool &done, bool &found);
virtual bool handle_response(evbuffer* input, bool &done, bool &found, int &obj_size);
};
class ProtocolRESP : public Protocol {
@ -84,7 +84,7 @@ public:
virtual int hget_request(const char* key);
virtual int hset_request(const char* key, const char* value, int len);
virtual int delete90_request();
virtual bool handle_response(evbuffer* input, bool &done, bool &found);
virtual bool handle_response(evbuffer* input, bool &done, bool &found, int &obj_size);
private:
enum read_fsm {

View File

@ -8,7 +8,7 @@ env['HAVE_POSIX_BARRIER'] = True
env.Append(CPPPATH = ['/usr/local/include', '/opt/local/include'])
env.Append(LIBPATH = ['/opt/local/lib'])
env.Append(CCFLAGS = '-std=c++11 -D_GNU_SOURCE ')
env.Append(CCFLAGS = '-std=c++11 -D_GNU_SOURCE')
if sys.platform == 'darwin':
env['CC'] = 'clang'
env['CXX'] = 'clang++'
@ -37,8 +37,8 @@ if not conf.CheckFunc('pthread_barrier_init'):
env = conf.Finish()
env.Append(CFLAGS = ' -O3 -Wall -g')
env.Append(CPPFLAGS = ' -O3 -Wall -g')
env.Append(CFLAGS = ' -O0 -Wall -g')
env.Append(CPPFLAGS = ' -O0 -Wall -g')
env.Command(['cmdline.cc', 'cmdline.h'], 'cmdline.ggo', 'gengetopt < $SOURCE')