This commit is contained in:
Daniel Byrne 2022-06-11 06:52:42 -04:00
parent 50cdc24f04
commit e878bbfd7b
5 changed files with 1943 additions and 29 deletions

View File

@ -733,6 +733,8 @@ public:
bipbuf_t* bipbuf_out[3];
pthread_mutex_t* lock_in[3];
pthread_mutex_t* lock_out[3];
pthread_cond_t* cond_in[3];
pthread_cond_t* cond_out[3];
private:
string hostname1;
@ -841,4 +843,172 @@ private:
bool consume_resp_line(evbuffer *input, bool &done);
};
class ConnectionMultiApproxBatchShm {
public:
ConnectionMultiApproxBatchShm(options_t options, bool sampling = true);
~ConnectionMultiApproxBatchShm();
int do_connect();
double start_time; // Time when this connection began operations.
ConnectionStats stats;
options_t options;
bool is_ready() { return read_state == IDLE; }
void set_priority(int pri);
void start_loading();
void reset();
bool check_exit_condition(double now = 0.0);
void read_callback1();
void read_callback2();
int eof;
uint32_t get_cid();
//void set_queue(ConcurrentQueue<string> *a_trace_queue);
int add_to_wb_keys(string wb_key);
int add_to_copy_keys(string key);
int add_to_touch_keys(string key);
void del_wb_keys(string wb_key);
void del_copy_keys(string key);
void del_touch_keys(string key);
void set_g_wbkeys(unordered_map<string,vector<Operation*>> *a_wb_keys);
void set_queue(queue<Operation*> *a_trace_queue);
void set_lock(pthread_mutex_t* a_lock);
size_t handle_response_batch(unsigned char *rbuf_pos, resp_t *resp,
size_t read_bytes, size_t consumed_bytes,
int level, int extra);
void drive_write_machine_shm(double now = 0.0);
bipbuf_t* bipbuf_in[3];
bipbuf_t* bipbuf_out[3];
pthread_mutex_t* lock_in[3];
pthread_mutex_t* lock_out[3];
pthread_cond_t* cond_in[3];
pthread_cond_t* cond_out[3];
private:
string hostname1;
string hostname2;
string port;
double o_percent;
int trace_queue_n;
struct event *timer; // Used to control inter-transmission time.
double next_time; // Inter-transmission time parameters.
double last_rx; // Used to moderate transmission rate.
double last_tx;
enum read_state_enum {
INIT_READ,
CONN_SETUP,
LOADING,
IDLE,
WAITING_FOR_GET,
WAITING_FOR_SET,
WAITING_FOR_DELETE,
MAX_READ_STATE,
};
enum write_state_enum {
INIT_WRITE,
ISSUING,
WAITING_FOR_TIME,
WAITING_FOR_OPQ,
MAX_WRITE_STATE,
};
read_state_enum read_state;
write_state_enum write_state;
// Parameters to track progress of the data loader.
int loader_issued, loader_completed;
uint32_t *opaque;
int *issue_buf_size;
int *issue_buf_n;
unsigned char **issue_buf_pos;
unsigned char **issue_buf;
bool last_quiet1;
bool last_quiet2;
uint32_t total;
uint32_t cid;
uint32_t gets;
uint32_t gloc;
uint32_t ghits;
uint32_t sloc;
uint32_t esets;
uint32_t isets;
uint32_t iloc;
uint32_t buffer_size_;
unsigned char* buffer_write[MAX_LEVELS];
unsigned char* buffer_read[MAX_LEVELS];
unsigned char* buffer_write_pos[MAX_LEVELS];
unsigned char* buffer_read_pos[MAX_LEVELS];
unsigned char* buffer_lasthdr[MAX_LEVELS];
unsigned char* buffer_leftover[MAX_LEVELS];
uint32_t buffer_read_n[MAX_LEVELS];
uint32_t buffer_write_n[MAX_LEVELS];
uint32_t buffer_read_nbytes[MAX_LEVELS];
uint32_t buffer_write_nbytes[MAX_LEVELS];
//std::vector<std::queue<Operation>> op_queue;
Operation ***op_queue;
uint32_t *op_queue_size;
uint32_t *issued_queue;
Generator *valuesize;
Generator *keysize;
KeyGenerator *keygen;
Generator *iagen;
pthread_mutex_t* lock;
unordered_map<string,vector<Operation*>> *g_wb_keys;
queue<Operation*> *trace_queue;
queue<Operation*> extra_queue;
// state machine functions / event processing
void pop_op(Operation *op);
void output_op(Operation *op, int type, bool was_found);
//void finish_op(Operation *op);
void finish_op(Operation *op,int was_hit);
int issue_getsetorset(double now = 0.0);
// request functions
void issue_sasl();
int issue_op(Operation* op);
int issue_noop(int level = 1);
int issue_touch(const char* key, int valuelen, double now, int level);
int issue_delete(const char* key, double now, uint32_t flags);
int issue_get_with_len(const char* key, int valuelen, double now, bool quiet, uint32_t flags, Operation *l1 = NULL);
int issue_get_with_len(Operation *pop, double now, bool quiet, uint32_t flags, Operation *l1 = NULL);
int issue_set(const char* key, const char* value, int length, double now, uint32_t flags);
int issue_set(Operation *pop, const char* value, double now, uint32_t flags);
int offer_set(Operation *pop, int extra = 0);
int offer_get(Operation *pop, int extra = 0);
int send_write_buffer(int level);
size_t fill_read_buffer(int level, int *extra);
int add_get_op_to_queue(Operation *pop, int level, int cb = 0);
int add_set_to_queue(Operation *pop, int level, const char *value, int cb = 0);
int read_response_l1();
void read_response_l2();
// protocol fucntions
int set_request_ascii(const char* key, const char* value, int length);
int set_request_binary(const char* key, const char* value, int length);
int set_request_resp(const char* key, const char* value, int length);
int get_request_ascii(const char* key);
int get_request_binary(const char* key);
int get_request_resp(const char* key);
bool consume_binary_response(evbuffer *input);
bool consume_ascii_line(evbuffer *input, bool &done);
bool consume_resp_line(evbuffer *input, bool &done);
};
#endif

File diff suppressed because it is too large Load Diff

View File

@ -1250,6 +1250,86 @@ void ConnectionMultiApproxShm::drive_write_machine_shm(double now) {
}
/**
* Request generation loop
*/
//void ConnectionMultiApproxShm::drive_write_machine_shm_2(double now) {
//
// while (trace_queue->size() > 0) {
// int extra_tries = extra_queue.size();
// for (int i = 0; i < extra_tries; i++) {
// Operation *Op = extra_queue.front();
// switch(Op->type)
// {
// case Operation::GET:
// offer_get(Op,1);
// break;
// case Operation::SET:
// offer_set(Op,1);
// break;
// }
// }
//
// int nissued = 0;
// int nissuedl2 = 0;
// while (nissued < options.depth && extra_queue.size() == 0) {
// Operation *Op = trace_queue->front();
//
// if (Op == NULL || trace_queue->size() <= 0 || Op->type == Operation::SASL) {
// eof = 1;
// cid_rate.insert( {cid, 100 } );
// fprintf(stderr,"cid %d done\n",cid);
// string op_queue1;
// string op_queue2;
// for (int j = 0; j < 2; j++) {
// for (int i = 0; i < OPAQUE_MAX; i++) {
// if (op_queue[j+1][i] != NULL) {
// if (j == 0) {
// op_queue1 = op_queue1 + "," + op_queue[j+1][i]->key;
// } else {
// op_queue2 = op_queue2 + "," + op_queue[j+1][i]->key;
// }
// }
// }
// }
// fprintf(stderr,"cid %d op_queue1: %s op_queue2: %s, op_queue_size1: %d, op_queue_size2: %d\n",cid,op_queue1.c_str(),op_queue2.c_str(),op_queue_size[1],op_queue_size[2]);
// return;
// }
// int gtg = 0;
// pthread_mutex_lock(lock_out[1]);
// switch(Op->type)
// {
// case Operation::GET:
// gtg = bipbuf_unused(bipbuf_out[1]) > (int)(24+strlen(Op->key)) ? 1 : 0;
// break;
// case Operation::SET:
// gtg = bipbuf_unused(bipbuf_out[1]) > (int)(32+Op->valuelen) ? 1 : 0;
// break;
// }
// pthread_mutex_unlock(lock_out[1]);
//
//
// if (gtg) {
// trace_queue->pop();
// int l2issued = issue_op(Op);
// nissuedl2 += l2issued;
// nissued++;
// } else {
// break;
// }
// }
//
// //wait for response (at least nissued)
// int l2issued = read_response_l1();
// nissuedl2 += l2issued;
// if (nissuedl2 > 0) {
// read_response_l2();
// }
//
// }
//
//}
/**
* Tries to consume a binary response (in its entirety) from shared memory.
*
@ -1287,7 +1367,7 @@ static int handle_response(ConnectionMultiApproxShm *conn, unsigned char *input,
while ((abuf = bipbuf_poll(conn->bipbuf_in[level],targetLen)) == NULL) {
pthread_mutex_unlock(conn->lock_in[level]);
tries++;
if (tries > 100) {
if (tries > 10) {
//fprintf(stderr,"more than 10000 tries for cid: %d for length %d\n",conn->get_cid(),targetLen);
return 0;
@ -1348,7 +1428,7 @@ static int handle_response(ConnectionMultiApproxShm *conn, unsigned char *input,
evict->evictedData = (char*)malloc(evict->evictedLen);
memcpy(evict->evictedData,buf,evict->evictedLen);
evict->evicted = true;
fprintf(stderr,"class: %u, serverFlags: %u, evictedFlags: %u\n",evict->clsid,evict->serverFlags,evict->evictedFlags);
//fprintf(stderr,"class: %u, serverFlags: %u, evictedFlags: %u\n",evict->clsid,evict->serverFlags,evict->evictedFlags);
} else if ( (opcode == CMD_TOUCH && status == RESP_NOT_FOUND) ||
(opcode == CMD_DELETE && status == RESP_NOT_FOUND) ) {
found = false;
@ -1521,16 +1601,6 @@ int ConnectionMultiApproxShm::read_response_l1() {
}
pthread_mutex_lock(lock_in[1]);
unsigned char *in = bipbuf_peek(bipbuf_in[1],24);
//int tries = 0;
//while (input == NULL) {
// tries++;
// if (tries > 1000) {
// fprintf(stderr,"more than 1000 tries for header cid: %d\n",cid);
// break;
// }
// in = bipbuf_poll(bipbuf_in[1],24);
//
//}
if (in) {
memcpy(input,in,24);
pthread_mutex_unlock(lock_in[1]);
@ -1689,16 +1759,6 @@ void ConnectionMultiApproxShm::read_response_l2() {
pthread_mutex_lock(lock_in[2]);
unsigned char *in = bipbuf_peek(bipbuf_in[2],24);
//int tries = 0;
//while (in == NULL) {
// tries++;
// if (tries > 2000) {
// fprintf(stderr,"more than 2000 tries for header cid: %d\n",cid);
// break;
// }
// in = bipbuf_poll(bipbuf_in[2],24);
//
//}
if (in) {
memcpy(input,in,24);
pthread_mutex_unlock(lock_in[2]);

View File

@ -11,6 +11,7 @@ option "quiet" - "Disable log messages."
text "\nBasic options:"
option "use_shm" - "use shared memory"
option "use_shm_batch" - "use shared memory BATCHED"
option "ratelimit" - "limit conns from exceeding each other in requests"
option "v1callback" - "use v1 readcallbacks"
option "server" s "Memcached server hostname[:port]. \

View File

@ -1377,7 +1377,7 @@ void do_mutilate(const vector<string>& servers, options_t& options,
if (servers.size() == 1) {
vector<Connection*> connections;
vector<Connection*> server_lead;
for (auto s: servers) {
for (auto s: servers) {
// Split args.server_arg[s] into host:port using strtok().
char *s_copy = new char[s.length() + 1];
strcpy(s_copy, s.c_str());
@ -1629,7 +1629,7 @@ void do_mutilate(const vector<string>& servers, options_t& options,
event_config_free(config);
evdns_base_free(evdns, 0);
event_base_free(base);
} else if (servers.size() == 2 && ! ( args.approx_given || args.approx_batch_given || args.use_shm_given)) {
} else if (servers.size() == 2 && !(args.approx_given || args.approx_batch_given || args.use_shm_given || args.use_shm_batch_given)) {
vector<ConnectionMulti*> connections;
vector<ConnectionMulti*> server_lead;
@ -1786,7 +1786,7 @@ void do_mutilate(const vector<string>& servers, options_t& options,
event_config_free(config);
evdns_base_free(evdns, 0);
event_base_free(base);
} else if (servers.size() == 2 && args.approx_given && !args.approx_batch_given && !args.use_shm_given) {
} else if (servers.size() == 2 && args.approx_given) {
vector<ConnectionMultiApprox*> connections;
vector<ConnectionMultiApprox*> server_lead;
@ -1944,7 +1944,7 @@ void do_mutilate(const vector<string>& servers, options_t& options,
evdns_base_free(evdns, 0);
event_base_free(base);
} else if (servers.size() == 2 && args.approx_batch_given && !args.use_shm_given) {
} else if (servers.size() == 2 && args.approx_batch_given) {
vector<ConnectionMultiApproxBatch*> connections;
vector<ConnectionMultiApproxBatch*> server_lead;
@ -2139,8 +2139,8 @@ void do_mutilate(const vector<string>& servers, options_t& options,
// wait for all threads to reach here
pthread_barrier_wait(&barrier);
//fprintf(stderr,"Start = %f\n", start);
double start = get_time();
fprintf(stderr,"Start = %f\n", start);
double now = start;
for (ConnectionMultiApproxShm *conn: connections) {
conn->start_time = now;
@ -2157,9 +2157,68 @@ void do_mutilate(const vector<string>& servers, options_t& options,
stats.accumulate(conn->stats);
delete conn;
}
double stop = get_time();
fprintf(stderr,"Stop = %f\n", stop);
stats.start = start;
stats.stop = now;
stats.stop = stop;
} else if (servers.size() == 2 && args.use_shm_batch_given) {
vector<ConnectionMultiApproxBatchShm*> connections;
int conns = args.measure_connections_given ? args.measure_connections_arg :
options.connections;
srand(time(NULL));
for (int c = 0; c < conns; c++) {
ConnectionMultiApproxBatchShm* conn = new ConnectionMultiApproxBatchShm(options,args.agentmode_given ? false : true);
int connected = 0;
if (conn && conn->do_connect()) {
connected = 1;
}
int cid = conn->get_cid();
if (connected) {
fprintf(stderr,"cid %d gets trace_queue\nfirst: %s\n",cid,trace_queue->at(cid)->front()->key);
if (g_lock != NULL) {
conn->set_g_wbkeys(g_wb_keys);
conn->set_lock(g_lock);
}
conn->set_queue(trace_queue->at(cid));
connections.push_back(conn);
} else {
fprintf(stderr,"conn multi: %d, not connected!!\n",c);
}
}
// wait for all threads to reach here
pthread_barrier_wait(&barrier);
double start = get_time();
fprintf(stderr,"Start = %f\n", start);
double now = start;
for (ConnectionMultiApproxBatchShm *conn: connections) {
conn->start_time = now;
conn->drive_write_machine_shm(now);
}
if (master && !args.scan_given && !args.search_given)
V("stopped at %f options.time = %d", get_time(), options.time);
// Tear-down and accumulate stats.
for (ConnectionMultiApproxBatchShm *conn: connections) {
stats.accumulate(conn->stats);
delete conn;
}
double stop = get_time();
fprintf(stderr,"Stop = %f\n", stop);
stats.start = start;
stats.stop = stop;
}
}