diff --git a/AdaptiveSampler.h b/AdaptiveSampler.h index e6efbc5..e2f08a6 100644 --- a/AdaptiveSampler.h +++ b/AdaptiveSampler.h @@ -73,8 +73,8 @@ public: } void print_header() { - printf("#%-6s %6s %8s %8s %8s %8s %8s %8s\n", "type", "size", - "min", "max", "avg", "90th", "95th", "99th"); + printf("#%-6s %6s %8s %8s %8s %8s %8s %8s %8s %8s\n", "type", "size", + "min", "max", "avg", "50th", "90th", "95th", "99th", "99.9th"); } void print_stats(const char *type, const char *size) { @@ -82,17 +82,18 @@ public: size_t l = samples_copy.size(); if (l == 0) { - printf("%-7s %6s %8.1f %8.1f %8.1f %8.1f %8.1f %8.1f\n", type, size, + printf("%-7s %6s %8.1f %8.1f %8.1f %8.1f %8.1f %8.1f %8.1f %8.1f\n", type, size, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0); return; } sort(samples_copy.begin(), samples_copy.end()); - printf("%-7s %6s %8.1f %8.1f %8.1f %8.1f %8.1f %8.1f\n", type, size, + printf("%-7s %6s %8.1f %8.1f% 8.1f %8.1f %8.1f %8.1f %8.1f %8.1f\n", type, size, samples_copy[0], samples_copy[l-1], average(), + samples_copy[(l*50)/100], samples_copy[(l*90)/100], samples_copy[(l*95)/100], - samples_copy[(l*99)/100]); + samples_copy[(l*99)/100], samples_copy[(l*99.9)/100]); } }; diff --git a/Connection.cc b/Connection.cc index 6077741..8457b1c 100644 --- a/Connection.cc +++ b/Connection.cc @@ -130,13 +130,10 @@ void Connection::start_loading() { * Issue either a get or set request to the server according to our probability distribution. */ void Connection::issue_something(double now) { - char skey[256]; char key[256]; // FIXME: generate key distribution here! string keystr = keygen->generate(lrand48() % options.records); - strcpy(skey, keystr.c_str()); - strcpy(key, options.prefix); - strcat(key,skey); + strcpy(key, keystr.c_str()); if (drand48() < options.update) { int index = lrand48() % (1024 * 1024); @@ -201,11 +198,8 @@ void Connection::issue_getset(double now) { { string keystr; char key[256]; - char skey[256]; keystr = keygen->generate(lrand48() % options.records); - strcpy(skey, keystr.c_str()); - strcpy(key,options.prefix); - strcat(key,skey); + strcpy(key, keystr.c_str()); char log[256]; int length = valuesize->generate(); @@ -229,10 +223,7 @@ void Connection::issue_getset(double now) { key_len[rKey] = rvaluelen; char key[256]; - char skey[256]; - strcpy(skey, rKey.c_str()); - strcpy(key,options.prefix); - strcat(key,skey); + strcpy(key, rKey.c_str()); issue_get(key, now); } } @@ -555,6 +546,9 @@ void Connection::read_callback() { bool done, found, full_read; //initially assume found (for sets that may come through here) + //is this correct? do we want to assume true in case that + //GET was found, but wrong value size (i.e. update value) + // found = true; if (op_queue.size() == 0) V("Spurious read callback."); @@ -568,31 +562,59 @@ void Connection::read_callback() { case WAITING_FOR_GET: assert(op_queue.size() > 0); - full_read = prot->handle_response(input, done, found); + + int obj_size; + full_read = prot->handle_response(input, done, found, obj_size); if (!full_read) { return; } else if (done) { - if (!found && options.getset && stats.gets >= 1) - { - string keystr = op->key; - strcpy(last_key, keystr.c_str()); - last_miss = 1; - } - else if (options.getset) - { - string keystr = op->key; - strcpy(last_key, keystr.c_str()); - last_miss = 0; - } + if (!found && options.getset) + { + string keystr = op->key; + strcpy(last_key, keystr.c_str()); + last_miss = 1; + } + else if (found && options.getset) + { + string keystr = op->key; + strcpy(last_key, keystr.c_str()); + + + char vlen[256]; + string valuelen = key_len[keystr]; + strcpy(vlen, valuelen.c_str()); + size_t vl = atoi(vlen); + + //char log[256]; + //sprintf(log,"key %s, resp size: %d, last GET size %lu\n",keystr.c_str(),obj_size, vl); + //write(2,log,strlen(log)); + if (obj_size != (int)vl) + { + + stats.window_get_misses++; + stats.get_misses++; + //char log[256]; + //sprintf(log,"update key %s\n",keystr.c_str()); + //write(2,log,strlen(log)); + last_miss = 1; + } + else + { + //char log[256]; + //sprintf(log,"same key %s\n",keystr.c_str()); + //write(2,log,strlen(log)); + last_miss = 0; + } + } //char log[256]; //sprintf(log,"%f,%d,%d,%d,%d,%d,%d\n", // r_time,r_appid,r_type,r_ksize,r_vsize,r_key,r_hit); //write(2,log,strlen(log)); - + finish_op(op); // sets read_state = IDLE } @@ -601,7 +623,7 @@ void Connection::read_callback() { case WAITING_FOR_SET: assert(op_queue.size() > 0); - if (!prot->handle_response(input, done, found)) return; + if (!prot->handle_response(input, done, found, obj_size)) return; //char log[256]; @@ -613,13 +635,13 @@ void Connection::read_callback() { break; case WAITING_FOR_DELETE: - if (!prot->handle_response(input,done,found)) return; + if (!prot->handle_response(input,done,found, obj_size)) return; finish_op(op); break; case LOADING: assert(op_queue.size() > 0); - if (!prot->handle_response(input, done, found)) return; + if (!prot->handle_response(input, done, found, obj_size)) return; loader_completed++; pop_op(); diff --git a/ConnectionStats.h b/ConnectionStats.h index 9b492b3..8bd5dd6 100644 --- a/ConnectionStats.h +++ b/ConnectionStats.h @@ -125,9 +125,9 @@ class ConnectionStats { } static void print_header() { - printf("%-7s %7s %7s %7s %7s %7s %7s %7s %7s\n", + printf("%-7s %7s %7s %7s %7s %7s %7s %7s %7s %7s %7s\n", "#type", "avg", "std", "min", /*"1st",*/ "5th", "10th", - "90th", "95th", "99th"); + "50th", "90th", "95th", "99th", "99.9th"); } #ifdef USE_ADAPTIVE_SAMPLER @@ -139,18 +139,18 @@ class ConnectionStats { size_t l = copy.size(); if (l == 0) { - printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f", - tag, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0); + printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f", + tag, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0); if (newline) printf("\n"); return; } sort(copy.begin(), copy.end()); - printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f", + printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f", tag, std::accumulate(copy.begin(), copy.end(), 0.0) / l, - copy[0], copy[(l*1) / 100], copy[(l*5) / 100], copy[(l*10) / 100], - copy[(l*90) / 100], copy[(l*95) / 100], copy[(l*99) / 100] + copy[0], copy[(l*1) / 100], copy[(l*5) / 100], copy[(l*10) / 100], copy[(l*50) / 100], + copy[(l*90) / 100], copy[(l*95) / 100], copy[(l*99) / 100], copy[(l*99.9) / 100] ); if (newline) printf("\n"); } @@ -166,10 +166,10 @@ class ConnectionStats { sort(copy.begin(), copy.end()); - printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f", + printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f", tag, std::accumulate(copy.begin(), copy.end(), 0.0) / l, - copy[0], copy[(l*1) / 100], copy[(l*5) / 100], copy[(l*10) / 100], - copy[(l*90) / 100], copy[(l*95) / 100], copy[(l*99) / 100] + copy[0], copy[(l*1) / 100], copy[(l*5) / 100], copy[(l*10) / 100], copy[(l*50) / 100], + copy[(l*90) / 100], copy[(l*95) / 100], copy[(l*99) / 100], copy[(l*99.9) / 100] ); if (newline) printf("\n"); } @@ -177,8 +177,8 @@ class ConnectionStats { void print_stats(const char *tag, HistogramSampler &sampler, bool newline = true) { if (sampler.total() == 0) { - printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f", - tag, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0); + printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f", + tag, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0); if (newline) printf("\n"); return; } @@ -186,8 +186,8 @@ class ConnectionStats { printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f", tag, sampler.average(), sampler.get_nth(0), sampler.get_nth(1), sampler.get_nth(5), - sampler.get_nth(10), sampler.get_nth(90), - sampler.get_nth(95), sampler.get_nth(99)); + sampler.get_nth(10), sampler.get_nth(50), sampler.get_nth(90), + sampler.get_nth(95), sampler.get_nth(99), sampler.get_nth(99.9)); if (newline) printf("\n"); } @@ -195,17 +195,18 @@ class ConnectionStats { void print_stats(const char *tag, LogHistogramSampler &sampler, bool newline = true) { if (sampler.total() == 0) { - printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f", - tag, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0); + printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f", + tag, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0); if (newline) printf("\n"); return; } - printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f", + printf("%-7s %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f %7.1f", tag, sampler.average(), sampler.stddev(), - sampler.get_nth(0), /*sampler.get_nth(1),*/ sampler.get_nth(5), - sampler.get_nth(10), sampler.get_nth(90), - sampler.get_nth(95), sampler.get_nth(99)); + sampler.get_nth(0), sampler.get_nth(5), + sampler.get_nth(10), sampler.get_nth(50), + sampler.get_nth(90), sampler.get_nth(95), + sampler.get_nth(99), sampler.get_nth(99.9) ); if (newline) printf("\n"); } diff --git a/Operation.h b/Operation.h index a0d8517..d56be55 100644 --- a/Operation.h +++ b/Operation.h @@ -17,7 +17,6 @@ public: type_enum type; string key; - // string value; double time() const { return (end_time - start_time) * 1000000; } }; diff --git a/Protocol.cc b/Protocol.cc index 7cf7031..294eb41 100644 --- a/Protocol.cc +++ b/Protocol.cc @@ -68,7 +68,7 @@ int ProtocolRESP::set_request(const char* key, const char* value, int len) { "*3\r\n$3\r\nSET\r\n$%lu\r\n%s\r\n$%d\r\n%s\r\n", strlen(key),key,len,val); l += len + 2; - if (read_state == IDLE) read_state = WAITING_FOR_END; + if (read_state == IDLE) read_state = WAITING_FOR_GET; free(val); return l; } @@ -188,49 +188,167 @@ int ProtocolRESP::delete90_request() { * * */ -bool ProtocolRESP::handle_response(evbuffer *input, bool &done, bool &found) { +bool ProtocolRESP::handle_response(evbuffer *input, bool &done, bool &found, int &obj_size) { + char *buf = NULL; + char *databuf = NULL; + char *obj_size_str = NULL; + int len; size_t n_read_out; - buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF_STRICT); - if (buf == NULL) - { - done = false; - return false; - } - conn->stats.rx_bytes += n_read_out; - - //RESP null response => miss - if (!strncmp(buf,"$-1",3)) - { - conn->stats.get_misses++; - conn->stats.window_get_misses++; - found = false; + switch (read_state) { + + case WAITING_FOR_GET: - } - //HSET or SET response was good, just consume the input and move on - //with our lives - else if (!strncmp(buf,"+OK",3) || !strncmp(buf,":1",2) || !strncmp(buf,":0",2) ) - { + buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF); + if (buf == NULL) return false; + + obj_size_str = buf+1; + obj_size = atoi(obj_size_str); + + conn->stats.rx_bytes += n_read_out; + + databuf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF); + //fprintf(stderr,"--------------------\n"); + //fprintf(stderr,"resp size %lu\n",n_read_out); + //fprintf(stderr,"data size %d\n",obj_size); + //fprintf(stderr,"-------header---------\n"); + //fprintf(stderr,"%s\n",buf); + //fprintf(stderr,"-------data-----------\n"); + //fprintf(stderr,"%s\n",databuf); + + conn->stats.rx_bytes += n_read_out; + + if (!strncmp(buf,"$-1",3)) { + conn->stats.get_misses++; + conn->stats.window_get_misses++; + found = false; + done = true; + } else if ((int)n_read_out != obj_size) { + + + // FIXME: check key name to see if it corresponds to the op at + // the head of the op queue? This will be necessary to + // support "gets" where there may be misses. + + data_length = obj_size; + read_state = WAITING_FOR_GET_DATA; + done = false; + } else if (!strncmp(buf,"+OK",3) || !strncmp(buf,":1",2) || !strncmp(buf,":0",2) ) { found = false; done = true; + } else { + // got all the data.. + found = true; + done = true; + } + if (databuf) + free(databuf); + free(buf); + return true; + + case WAITING_FOR_GET_DATA: + + len = evbuffer_get_length(input); + + //finally got all data... + if (len >= data_length + 2) { + evbuffer_drain(input, data_length + 2); + conn->stats.rx_bytes += data_length + 2; + read_state = WAITING_FOR_GET; + obj_size = data_length; + found = true; + done = true; + return true; + } + return false; + + default: printf("state: %d\n", read_state); DIE("Unimplemented!"); } - //else we got a hit - else - { - if (buf) - free(buf); - // Consume the next "foobar" - buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF_STRICT); - conn->stats.rx_bytes += n_read_out; - found = true; - } - done = true; - free(buf); - return true; + + DIE("Shouldn't ever reach here..."); +} + + //char *buf = NUL; //for initial readline + //char *dbuf = NULL; //for data readline + //size_t n_read_out; + + //buf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF); + //if (buf == NULL) + //{ + // done = false; + // return false; + //} + //conn->stats.rx_bytes += n_read_out; + // + //size_t len = evbuffer_get_length(input); + + //fprintf(stderr,"--------------------\n"); + //fprintf(stderr,"resp size %lu\n",n_read_out); + //fprintf(stderr,"ev len %lu\n",len); + //fprintf(stderr,"--------------------\n"); + //fprintf(stderr,"%s\n",buf); + ////RESP null response => miss + //if (!strncmp(buf,"$-1",3)) + //{ + // conn->stats.get_misses++; + // conn->stats.window_get_misses++; + // found = false; + // + //} + ////HSET or SET response was good, just consume the input and move on + ////with our lives + //else if (!strncmp(buf,"+OK",3) || !strncmp(buf,":1",2) || !strncmp(buf,":0",2) ) + //{ + // found = false; + // done = true; + //} + ////else we got a hit + //else + //{ + // char* nlen = buf+1; + // //fprintf(stderr,"%s\n",nlen); + // obj_size = atoi(nlen); + // // Consume the next "foobar" + // //size_t len = evbuffer_get_length(input); + // //dbuf = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF); + // //if (!dbuf) + // //{ + // // fprintf(stderr,"--------------------\n"); + // // fprintf(stderr,"next foobar (null) %lu\n",n_read_out); + // // fprintf(stderr,"ev len %lu\n",len); + // // fprintf(stderr,"--------------------\n"); + // // fprintf(stderr,"%s\n",dbuf); + + // // //read_state = WAITING_FOR_GET_DATA; + // // //done = false; + // // //return false; + // //} + // //else + // //{ + + // // fprintf(stderr,"--------------------\n"); + // // fprintf(stderr,"next foobar (null) %lu\n",n_read_out); + // // fprintf(stderr,"ev len %lu\n",len); + // // fprintf(stderr,"--------------------\n"); + // // fprintf(stderr,"%s\n",dbuf); + // //} + + // //conn->stats.rx_bytes += n_read_out; + // found = true; + //} + ////read_state = WAITING_FOR_GET; + ////fprintf(stderr,"--------------------\n"); + ////fprintf(stderr,"read_state %u\n",read_state); + ////fprintf(stderr,"--------------------\n"); + //done = true; + ////if (dbuf) + //// free(dbuf); + //free(buf); + //return true; -} +//} /** * Send an ascii get request. @@ -269,7 +387,7 @@ int ProtocolAscii::delete90_request() { /** * Handle an ascii response. */ -bool ProtocolAscii::handle_response(evbuffer *input, bool &done, bool &found) { +bool ProtocolAscii::handle_response(evbuffer *input, bool &done, bool &found, int &obj_size) { char *buf = NULL; int len; size_t n_read_out; @@ -299,6 +417,7 @@ bool ProtocolAscii::handle_response(evbuffer *input, bool &done, bool &found) { // support "gets" where there may be misses. data_length = len; + obj_size = len; read_state = WAITING_FOR_GET_DATA; done = false; } else { @@ -353,7 +472,8 @@ bool ProtocolBinary::setup_connection_r(evbuffer* input) { if (!opts.sasl) return true; bool b,c; - return handle_response(input, b, c); + int obj_size; + return handle_response(input, b, c, obj_size); } /** @@ -406,7 +526,7 @@ int ProtocolBinary::delete90_request() { * @param input evBuffer to read response from * @return true if consumed, false if not enough data in buffer. */ -bool ProtocolBinary::handle_response(evbuffer *input, bool &done, bool &found) { +bool ProtocolBinary::handle_response(evbuffer *input, bool &done, bool &found, int &obj_size) { // Read the first 24 bytes as a header int length = evbuffer_get_length(input); if (length < 24) return false; @@ -418,6 +538,7 @@ bool ProtocolBinary::handle_response(evbuffer *input, bool &done, bool &found) { int targetLen = 24 + ntohl(h->body_len); if (length < targetLen) return false; + obj_size = ntohl(h->body_len); // If something other than success, count it as a miss if (h->opcode == CMD_GET && h->status) { conn->stats.get_misses++; diff --git a/Protocol.h b/Protocol.h index 91e480a..dd29d49 100644 --- a/Protocol.h +++ b/Protocol.h @@ -21,7 +21,7 @@ public: virtual int get_request(const char* key) = 0; virtual int set_request(const char* key, const char* value, int len) = 0; virtual int delete90_request() = 0; - virtual bool handle_response(evbuffer* input, bool &done, bool &found) = 0; + virtual bool handle_response(evbuffer* input, bool &done, bool &found, int &obj_size) = 0; protected: options_t opts; @@ -43,7 +43,7 @@ public: virtual int get_request(const char* key); virtual int set_request(const char* key, const char* value, int len); virtual int delete90_request(); - virtual bool handle_response(evbuffer* input, bool &done, bool &found); + virtual bool handle_response(evbuffer* input, bool &done, bool &found, int &obj_size); private: enum read_fsm { @@ -68,7 +68,7 @@ public: virtual int get_request(const char* key); virtual int set_request(const char* key, const char* value, int len); virtual int delete90_request(); - virtual bool handle_response(evbuffer* input, bool &done, bool &found); + virtual bool handle_response(evbuffer* input, bool &done, bool &found, int &obj_size); }; class ProtocolRESP : public Protocol { @@ -84,7 +84,7 @@ public: virtual int hget_request(const char* key); virtual int hset_request(const char* key, const char* value, int len); virtual int delete90_request(); - virtual bool handle_response(evbuffer* input, bool &done, bool &found); + virtual bool handle_response(evbuffer* input, bool &done, bool &found, int &obj_size); private: enum read_fsm { diff --git a/SConstruct b/SConstruct index d220aee..d386fe0 100644 --- a/SConstruct +++ b/SConstruct @@ -8,7 +8,7 @@ env['HAVE_POSIX_BARRIER'] = True env.Append(CPPPATH = ['/usr/local/include', '/opt/local/include']) env.Append(LIBPATH = ['/opt/local/lib']) -env.Append(CCFLAGS = '-std=c++11 -D_GNU_SOURCE ') +env.Append(CCFLAGS = '-std=c++11 -D_GNU_SOURCE') if sys.platform == 'darwin': env['CC'] = 'clang' env['CXX'] = 'clang++' @@ -37,8 +37,8 @@ if not conf.CheckFunc('pthread_barrier_init'): env = conf.Finish() -env.Append(CFLAGS = ' -O3 -Wall -g') -env.Append(CPPFLAGS = ' -O3 -Wall -g') +env.Append(CFLAGS = ' -O0 -Wall -g') +env.Append(CPPFLAGS = ' -O0 -Wall -g') env.Command(['cmdline.cc', 'cmdline.h'], 'cmdline.ggo', 'gengetopt < $SOURCE')