diff --git a/Connection.h b/Connection.h index 7125eaa..b617b9e 100644 --- a/Connection.h +++ b/Connection.h @@ -733,8 +733,10 @@ public: bipbuf_t* bipbuf_out[3]; pthread_mutex_t* lock_in[3]; pthread_mutex_t* lock_out[3]; - pthread_cond_t* cond_in[3]; - pthread_cond_t* cond_out[3]; + pthread_cond_t* cond_in_not_empty[3]; + pthread_cond_t* cond_in_not_full[3]; + pthread_cond_t* cond_out_not_empty[3]; + pthread_cond_t* cond_out_not_full[3]; private: string hostname1; @@ -885,8 +887,13 @@ public: bipbuf_t* bipbuf_out[3]; pthread_mutex_t* lock_in[3]; pthread_mutex_t* lock_out[3]; - pthread_cond_t* cond_in[3]; - pthread_cond_t* cond_out[3]; + + int *bipbuf_out_bytes[3]; + int *bipbuf_in_bytes[3]; + pthread_cond_t* cond_in_not_empty[3]; + pthread_cond_t* cond_in_not_full[3]; + pthread_cond_t* cond_out_not_empty[3]; + pthread_cond_t* cond_out_not_full[3]; private: string hostname1; diff --git a/ConnectionMultiApproxBatchShm.cc b/ConnectionMultiApproxBatchShm.cc index 212f488..21e7593 100644 --- a/ConnectionMultiApproxBatchShm.cc +++ b/ConnectionMultiApproxBatchShm.cc @@ -463,8 +463,12 @@ int ConnectionMultiApproxBatchShm::do_connect() { bipbuf_t bipbuf_out; pthread_mutex_t lock_in; pthread_mutex_t lock_out; - pthread_cond_t cond_in; - pthread_cond_t cond_out; + pthread_cond_t cond_in_not_empty; + pthread_cond_t cond_in_not_full; + pthread_cond_t cond_out_not_empty; + pthread_cond_t cond_out_not_full; + int bipbuf_in_bytes; + int bipbuf_out_bytes; int shared_id; } shared_t; @@ -504,15 +508,24 @@ int ConnectionMultiApproxBatchShm::do_connect() { bipbuf_out[1] = &share_l1->bipbuf_in; bipbuf_out[2] = &share_l2->bipbuf_in; + bipbuf_in_bytes[1] = &share_l1->bipbuf_out_bytes; + bipbuf_in_bytes[2] = &share_l2->bipbuf_out_bytes; + bipbuf_out_bytes[1] = &share_l1->bipbuf_in_bytes; + bipbuf_out_bytes[2] = &share_l2->bipbuf_in_bytes; + lock_in[1] = &share_l1->lock_out; lock_in[2] = &share_l2->lock_out; lock_out[1] = &share_l1->lock_in; lock_out[2] = &share_l2->lock_in; - cond_in[1] = &share_l1->cond_out; - cond_in[2] = &share_l2->cond_out; - cond_out[1] = &share_l1->cond_in; - cond_out[2] = &share_l2->cond_in; + cond_in_not_empty[1] = &share_l1->cond_out_not_empty; + cond_in_not_empty[2] = &share_l2->cond_out_not_empty; + cond_in_not_full[1] = &share_l1->cond_out_not_full; + cond_in_not_full[2] = &share_l2->cond_out_not_full; + cond_out_not_empty[1] = &share_l1->cond_in_not_empty; + cond_out_not_empty[2] = &share_l2->cond_in_not_empty; + cond_out_not_full[1] = &share_l1->cond_in_not_full; + cond_out_not_full[2] = &share_l2->cond_in_not_full; read_state = IDLE; return connected; } @@ -631,24 +644,25 @@ int ConnectionMultiApproxBatchShm::send_write_buffer(int level) { pthread_mutex_lock(lock_out[level]); int to_write = buffer_write_nbytes[level]; int gtg = bipbuf_unused(bipbuf_out[level]) >= to_write ? 1 : 0; - if (gtg) { - int ret = bipbuf_offer(bipbuf_out[level],buffer_write[level],to_write); - if (ret != to_write) { - fprintf(stderr,"error writing buffer! level %d, size %d\n",level,to_write); - } - issued_queue[level] = buffer_write_n[level]; - buffer_write_n[level] = 0; - buffer_write_pos[level] = buffer_write[level]; - memset(buffer_write_pos[level],0,buffer_write_nbytes[level]); - stats.tx_bytes += buffer_write_nbytes[level]; - buffer_write_nbytes[level] = 0; - rc = 2; - pthread_cond_signal(cond_out[level]); - pthread_mutex_unlock(lock_out[level]); - } else { - pthread_mutex_unlock(lock_out[level]); - rc = 1; + while (gtg == 0) { + pthread_cond_wait(cond_out_not_full[level],lock_out[level]); + gtg = bipbuf_unused(bipbuf_out[level]) >= to_write ? 1 : 0; } + int ret = bipbuf_offer(bipbuf_out[level],buffer_write[level],to_write); + if (ret != to_write) { + fprintf(stderr,"error writing buffer! level %d, size %d\n",level,to_write); + } + *bipbuf_out_bytes[level] += to_write; + //fprintf(stderr,"writing %d to %d, total %d\n",to_write,level,*bipbuf_out_bytes[level]); + issued_queue[level] = buffer_write_n[level]; + buffer_write_n[level] = 0; + buffer_write_pos[level] = buffer_write[level]; + memset(buffer_write_pos[level],0,buffer_write_nbytes[level]); + stats.tx_bytes += buffer_write_nbytes[level]; + buffer_write_nbytes[level] = 0; + rc = 2; + pthread_cond_signal(cond_out_not_empty[level]); + pthread_mutex_unlock(lock_out[level]); return rc; } @@ -675,18 +689,6 @@ int ConnectionMultiApproxBatchShm::add_get_op_to_queue(Operation *pop, int level //} h.opaque = htonl(pop->opaque); - //int to_write = buffer_write_nbytes[level] + 24 + keylen; - //int gtg = bipbuf_unused(bipbuf_out[level]) >= to_write ? 1 : 0; - //if (gtg == 0) { - // switch (level) { - // case 1: - // read_callback1(); - // break; - // case 2: - // read_callback2(); - // break; - // } - //} memcpy(buffer_write_pos[level], &h, 24); buffer_write_pos[level] += 24; @@ -696,7 +698,7 @@ int ConnectionMultiApproxBatchShm::add_get_op_to_queue(Operation *pop, int level buffer_write_nbytes[level] += 24 + keylen; int res = 1; - if (buffer_write_n[level] >= (uint32_t)options.depth && cb == 0) { + if (buffer_write_n[level] >= (uint32_t)options.depth) { // && cb == 0) { res = send_write_buffer(level); } return res; @@ -733,7 +735,7 @@ int ConnectionMultiApproxBatchShm::issue_get_with_len(Operation *pop, double now } //put op into queue - return add_get_op_to_queue(pop,level); + return add_get_op_to_queue(pop,level,0); } /** @@ -771,7 +773,7 @@ int ConnectionMultiApproxBatchShm::issue_get_with_len(const char* key, int value pop->l1 = l1; } - return add_get_op_to_queue(pop,level,0); + return add_get_op_to_queue(pop,level,1); } @@ -978,7 +980,7 @@ int ConnectionMultiApproxBatchShm::add_set_to_queue(Operation *pop, int level, c buffer_write_nbytes[level] += length + 32 + keylen; int ret = 1; - if (buffer_write_n[level] >= (uint32_t)options.depth && cb == 0) { + if (buffer_write_n[level] >= (uint32_t)options.depth) { // && cb == 0) { ret = send_write_buffer(level); } return ret; @@ -1001,7 +1003,7 @@ int ConnectionMultiApproxBatchShm::issue_set(Operation *pop, const char* value, pop->opaque = opaque[level]++; pop->flags = flags; - return add_set_to_queue(pop,level,value); + return add_set_to_queue(pop,level,value,0); } @@ -1027,7 +1029,7 @@ int ConnectionMultiApproxBatchShm::issue_set(const char* key, const char* value, pop->flags = flags; pop->clsid = get_class(length,strlen(key)); - return add_set_to_queue(pop,level,value,0); + return add_set_to_queue(pop,level,value,1); } @@ -1202,7 +1204,7 @@ size_t ConnectionMultiApproxBatchShm::handle_response_batch(unsigned char *rbuf_ size_t read_bytes, size_t consumed_bytes, int level, int extra) { if (rbuf_pos[0] != 129) { - //fprintf(stderr,"cid %d we don't have a valid header %u\n",cid,rbuf_pos[0]); + fprintf(stderr,"cid %d we don't have a valid header %u\n",cid,rbuf_pos[0]); //buffer_read_pos[level] = rbuf_pos; //buffer_read_n[level] = 1; return 0; @@ -1216,7 +1218,7 @@ size_t ConnectionMultiApproxBatchShm::handle_response_batch(unsigned char *rbuf_ //buffer_lasthdr[level] = rbuf_pos; //buffer_read_n[level] = need; //buffer_read_nbytes[level] = have; - //fprintf(stderr,"cid %d - we don't have enough header data, need %lu more bytes, have %lu (targetLen: %d) (read_bytes %ld) (extra %d) %d)\n",cid,need,have,24,read_bytes,extra,level); + fprintf(stderr,"cid %d - we don't have enough header data, need %lu more bytes, have %lu (targetLen: %d) (read_bytes %ld) (extra %d) %d)\n",cid,need,have,24,read_bytes,extra,level); return 0; } @@ -1232,7 +1234,7 @@ size_t ConnectionMultiApproxBatchShm::handle_response_batch(unsigned char *rbuf_ buffer_read_n[level] = need; buffer_read_nbytes[level] = have; memcpy(buffer_leftover[level],rbuf_pos,have); - //fprintf(stderr,"cid %d - we don't have enough data, need %lu more bytes, have %lu (targetLen: %d) (read_bytes %ld) (extra %d) %d)\n",cid,need,have,targetLen,read_bytes,extra,level); + fprintf(stderr,"cid %d - we don't have enough data, need %lu more bytes, have %lu (targetLen: %d) (read_bytes %ld) (extra %d) %d)\n",cid,need,have,targetLen,read_bytes,extra,level); return 0; } @@ -1298,43 +1300,51 @@ size_t ConnectionMultiApproxBatchShm::fill_read_buffer(int level, int *extra) { size_t read_bytes = 0; pthread_mutex_lock(lock_in[level]); + //int len = *bipbuf_in_bytes[level]; int len = bipbuf_used(bipbuf_in[level]); while (len == 0) { - pthread_cond_wait(cond_in[level],lock_in[level]); + pthread_cond_wait(cond_in_not_empty[level],lock_in[level]); + //len = *bipbuf_in_bytes[level]; len = bipbuf_used(bipbuf_in[level]); } + unsigned int all = 0; + if (buffer_read_n[level] != 0) { uint32_t have = buffer_read_nbytes[level]; + fprintf(stderr,"already have %u\n",have); //if ((size_t)len < buffer_read_n[level]) { // pthread_mutex_unlock(lock_in[level]); // return 0; //} - unsigned char* input = bipbuf_poll(bipbuf_in[level],len); - if (!input || len == 0) { - if (!input && len > 0) - fprintf(stderr,"cid %d expected %d on level %d (already have %u)\n",cid,len,level,have); + unsigned char* input = bipbuf_peek_all(bipbuf_in[level],&all); + if (!input || all == 0) { + if (!input && all > 0) + fprintf(stderr,"cid %d expected %d on level %d (already have %u)\n",cid,all,level,have); pthread_mutex_unlock(lock_in[level]); return 0; } memcpy(buffer_read[level],buffer_leftover[level],have); buffer_read_pos[level] = buffer_read[level]; - memcpy(buffer_read_pos[level]+have,input,len); - read_bytes = len; + memcpy(buffer_read_pos[level]+have,input,all); + read_bytes = all; *extra = have; buffer_read_n[level] = 0; buffer_read_nbytes[level] = 0; } else { - unsigned char *input = bipbuf_poll(bipbuf_in[level],len); - if (!input || len == 0) { - if (!input && len > 0) - fprintf(stderr,"cid %d expected %d on level %d\n",cid,len,level); + unsigned char *input = bipbuf_peek_all(bipbuf_in[level],&all); + if (!input || all == 0) { + if (!input && all > 0) + fprintf(stderr,"cid %d expected %d on level %d\n",cid,all,level); pthread_mutex_unlock(lock_in[level]); return 0; } - read_bytes = len; + read_bytes = all; buffer_read_pos[level] = input; +#ifdef DEBUGMC + fprintf(stderr,"read %d of %d (avail: %d) on l%d\n",all,*bipbuf_in_bytes[level],len,level); +#endif //memcpy(buffer_read_pos[level],input,len); *extra = 0; @@ -1356,6 +1366,9 @@ void ConnectionMultiApproxBatchShm::read_callback1() { read_bytes = fill_read_buffer(level,&extra); if (read_bytes == 0) { + pthread_mutex_lock(lock_in[level]); + pthread_cond_signal(cond_in_not_full[level]); + pthread_mutex_unlock(lock_in[level]); return; } @@ -1468,16 +1481,19 @@ void ConnectionMultiApproxBatchShm::read_callback1() { free(evict); } nread_ops++; - if (buffer_read_pos[level][0] == 0) { - break; - } - if (buffer_read_pos[level][0] != 129) { - //fprintf(stderr,"cid %d we don't have a valid header post %u\n",cid,buffer_read_pos[level][0]); + if (buffer_read_pos[level][0] != 129 || (read_bytes - consumed_bytes == 0)) { break; } + //if (buffer_read_pos[level][0] != 129) { + // //fprintf(stderr,"cid %d we don't have a valid header post %u\n",cid,buffer_read_pos[level][0]); + // break; + //} } - - + pthread_mutex_lock(lock_in[level]); + bipbuf_poll(bipbuf_in[level],read_bytes); + *bipbuf_in_bytes[level] = *bipbuf_in_bytes[level] - read_bytes; + pthread_cond_signal(cond_in_not_full[level]); + pthread_mutex_unlock(lock_in[level]); double now = get_time(); last_tx = now; stats.log_op(op_queue_size[1]); @@ -1498,6 +1514,9 @@ void ConnectionMultiApproxBatchShm::read_callback2() { read_bytes = fill_read_buffer(level,&extra); if (read_bytes == 0) { + pthread_mutex_lock(lock_in[level]); + pthread_cond_signal(cond_in_not_full[level]); + pthread_mutex_unlock(lock_in[level]); return; } @@ -1599,11 +1618,7 @@ void ConnectionMultiApproxBatchShm::read_callback2() { DIE("not implemented"); } nread_ops++; - if (buffer_read_pos[level][0] == 0) { - break; - } - if (buffer_read_pos[level][0] != 129) { - //fprintf(stderr,"l2 cid %d we don't have a valid header post %u\n",cid,buffer_read_pos[level][0]); + if (buffer_read_pos[level][0] != 129 || (read_bytes - consumed_bytes == 0)) { break; } } @@ -1613,6 +1628,12 @@ void ConnectionMultiApproxBatchShm::read_callback2() { //if (nread_ops == 0) { // fprintf(stderr,"ugh l2 only got: %lu ops expected %lu\n",nread_ops,batch); //} + + pthread_mutex_lock(lock_in[level]); + bipbuf_poll(bipbuf_in[level],read_bytes); + *bipbuf_in_bytes[level] = *bipbuf_in_bytes[level] - read_bytes; + pthread_cond_signal(cond_in_not_full[level]); + pthread_mutex_unlock(lock_in[level]); double now = get_time(); diff --git a/SConstruct b/SConstruct index 212fb8f..f2a4e64 100644 --- a/SConstruct +++ b/SConstruct @@ -44,10 +44,10 @@ if not conf.CheckFunc('pthread_barrier_init'): env = conf.Finish() -env.Append(CFLAGS = '-O0 -Wall -g --std=c++17 -lstdc++fs') -env.Append(CPPFLAGS = '-O0 -Wall -g --std=c++17 -lstdc++fs') -#env.Append(CFLAGS = ' -O3 -Wall -g --std=c++17 -lstdc++fs') -#env.Append(CPPFLAGS = ' -O3 -Wall -g --std=c++17 -lstdc++fs') +#env.Append(CFLAGS = '-O0 -Wall -g --std=c++17 -lstdc++fs -fsanitize=address') +#env.Append(CPPFLAGS = '-O0 -Wall -g --std=c++17 -lstdc++fs -fsanitize=address') +env.Append(CFLAGS = ' -O2 -Wall -g --std=c++17 -lstdc++fs') +env.Append(CPPFLAGS = ' -O2 -Wall -g --std=c++17 -lstdc++fs') #env.Append(CFLAGS = ' -O3 -Wall -g') #env.Append(CPPFLAGS = ' -O3 -Wall -g') #env.Append(LDFLAGS = '-fsantize=address') @@ -62,7 +62,10 @@ env.Append(CPPFLAGS = '-O0 -Wall -g --std=c++17 -lstdc++fs') env.Command(['cmdline.cc', 'cmdline.h'], 'cmdline.ggo', 'gengetopt < $SOURCE') src = Split("""mutilate.cc cmdline.cc log.cc distributions.cc util.cc - Connection.cc ConnectionMulti.cc ConnectionMultiApprox.cc ConnectionMultiApproxBatch.cc ConnectionMultiApproxShm.cc Protocol.cc Generator.cc bipbuffer.cc""") + Connection.cc ConnectionMulti.cc ConnectionMultiApprox.cc ConnectionMultiApproxBatchShm.cc ConnectionMultiApproxBatch.cc ConnectionMultiApproxShm.cc Protocol.cc Generator.cc bipbuffer.cc""") + +#src = Split("""mutilate.cc cmdline.cc log.cc distributions.cc util.cc +# ConnectionMultiApprox.cc ConnectionMultiApproxBatchShm.cc Generator.cc bipbuffer.cc""") if not env['HAVE_POSIX_BARRIER']: # USE_POSIX_BARRIER: src += ['barrier.cc'] diff --git a/bipbuffer.cc b/bipbuffer.cc index 03e306f..b712617 100644 --- a/bipbuffer.cc +++ b/bipbuffer.cc @@ -69,8 +69,10 @@ int bipbuf_is_empty(const bipbuf_t* me) * ie. is the distance from A to buffer's end less than B to A? */ static void __check_for_switch_to_b(bipbuf_t* me) { - if (me->size - me->a_end < me->a_start - me->b_end) + if (me->size - me->a_end < me->a_start - me->b_end) { + //fprintf(stderr,"%p switching to b, a_start: %d, a_end: %d, b_end %d\n",me,me->a_start,me->a_end,me->b_end); me->b_inuse = 1; + } } /* TODO: DOCUMENT THESE TWO FUNCTIONS */ diff --git a/bipbuffer.h b/bipbuffer.h index 7ccf1f7..f99f148 100644 --- a/bipbuffer.h +++ b/bipbuffer.h @@ -3,6 +3,7 @@ #define BIPBUFSIZE 4*1024*1024 #include "binary_protocol.h" +#include extern "C" { typedef struct