#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include //#define VR_DBG #include using grpc::ClientAsyncResponseReader; using grpc::CompletionQueue; using grpc::ClientContext; using grpc::Channel; using grpc::Server; using grpc::ServerBuilder; using grpc::ServerContext; using grpc::Status; using vrrpc::vrsrvrpc; using vrrpc::empty_resp; using vrrpc::prepare_req; using vrrpc::prepare_ok_req; using vrrpc::commit_req; using vrrpc::get_state_req; using vrrpc::new_state_req; using namespace std; using namespace std::chrono; vr_state *g_state; static void start_state_transfer() { ClientContext ctx; empty_resp resp; get_state_req req; g_state->lock.lock(); req.set_view_id(g_state->view_id); req.set_op_num(g_state->op_num); req.set_srv_id(g_state->config_idx); g_state->lock.unlock(); // send to primary g_state->get_stub(g_state->config.primary_idx)->get_state(&ctx, req, &resp); } static void local_commit(uint32_t commit_num) { for (uint32_t i = g_state->commit_num + 1; i <= commit_num; i++) { vr_accept(g_state->logs[i]->first, g_state->logs[i]->second); } g_state->commit_num = commit_num; } /* * Server functions */ class vrsrvrpc_impl final : public vrsrvrpc::Service { Status prepare(ServerContext *context, const prepare_req *req, empty_resp *reply) override { ClientContext ctx; empty_resp resp; uint32_t commit_num = req->commit_num(); uint32_t view_id = req->view_id(); uint32_t op_num = req->op_num(); prepare_ok_req prep_req; //vr_log *log; char *buf; unique_lock lk(g_state->lock); #ifdef VR_DBG cout << "[prepare] received view_id: " << view_id << " op_num: " << op_num << " commit_num: " << commit_num << " " << g_state->str() << endl << std::flush; #endif // ignore if we are primary or we are not normal if (g_state->status != NORMAL || g_state->cur_config()->role == PRIMARY || g_state->view_id != view_id) { lk.unlock(); goto end; } if (g_state->op_num > op_num - 1) { #ifdef VR_DBG cout << "[prepare] skipping: already processed op_num: " << op_num << " " << g_state->str() << endl << std::flush; #endif // if we have already processed this op // reply yes } else if (g_state->op_num == op_num - 1) { // if we haven't processed but no missing entries // append the message to the log and reply yes buf = new char[req->data().length()]; memcpy(buf, req->data().c_str(), req->data().length()); // log = new vr_log(op_num, buf); g_state->append_to_log(op_num, buf, req->data().length()); #ifdef VR_DBG cout << "[prepare] processed op_num: " << op_num << " New " << g_state->str() << endl << std::flush; #endif } else { // we are missing entries // perform state transfer and wait for reply if (g_state->prep_tbl.find(op_num - 1) == g_state->prep_tbl.end()) { g_state->prep_tbl[op_num - 1] = new condition_variable(); /* hack to get around sync */ std::thread transfer_thrd(start_state_transfer); #ifdef VR_DBG cout << "[prepare] waiting for state transfer for op_num: " << op_num - 1 << ". " << g_state->str() << endl << std::flush; #endif g_state->prep_tbl[op_num - 1]->wait(lk); transfer_thrd.join(); #ifdef VR_DBG cout << "[prepare] woke up from state transfer for op_num: " << op_num - 1 << ". " << g_state->str() << endl << std::flush; #endif assert(g_state->op_num > op_num - 1); delete g_state->prep_tbl[op_num - 1]; g_state->prep_tbl.erase(op_num - 1); } else { // if there is already a thread waiting, ignore lk.unlock(); goto end; } } // here we would be up-to-date // then handle commit if (g_state->commit_num < commit_num) { local_commit(commit_num); #ifdef VR_DBG cout << "[prepare] committed logs. New " << g_state->str() << endl << std::flush; #endif } lk.unlock(); // send prepare_ok messages to primary prep_req.set_op_num(g_state->op_num); prep_req.set_view_id(g_state->view_id); prep_req.set_srv_id(g_state->config_idx); g_state->get_stub(g_state->config.primary_idx)->prepare_ok(&ctx, prep_req, &resp); end: return Status::OK; } Status prepare_ok(ServerContext *context, const prepare_ok_req *req, empty_resp *reply) override { uint32_t view_id = req->view_id(); uint32_t op_num = req->op_num(); uint32_t srv_id = req->srv_id(); g_state->lock.lock(); #ifdef VR_DBG cout << "[prepare_ok] received view_id: " << view_id << " op_num: " << op_num << " server_id: " << srv_id << " " << g_state->str() << endl << std::flush; #endif // ignore if we aren't primary bla bla bla if (g_state->cur_config()->role != PRIMARY || g_state->status != NORMAL || g_state->view_id != view_id) { g_state->lock.unlock(); goto end; } assert(op_num <= g_state->op_num); // if we already committed prepared ok stuff, ignore // and get rid of whatever is in our table if (op_num <= g_state->commit_num) { // TODO: garbage collect maybe? in case of packet loss? #ifdef VR_DBG cout << "[prepare_ok] already committed. " << g_state->str() << endl << std::flush; #endif g_state->lock.unlock(); goto end; } // otherwise check if table has entry if (g_state->vote_tbl.find(op_num) == g_state->vote_tbl.end()) { g_state->vote_tbl[op_num] = new vr_vote_info; } if (g_state->vote_tbl[op_num]->vote_map.find(srv_id) == g_state->vote_tbl[op_num]->vote_map.end()) { #ifdef VR_DBG cout << "[prepare_ok] server_id: " << srv_id << " hasn't voted. " << g_state->str() << endl << std::flush; #endif g_state->vote_tbl[op_num]->vote_map[srv_id] = true; g_state->vote_tbl[op_num]->total_vote++; } else { #ifdef VR_DBG cout << "[prepare_ok] server_id: " << srv_id << " has already voted. " << g_state->str() << endl << std::flush; #endif } #ifdef VR_DBG cout << "[prepare_ok] op_num: " << op_num << " total votes: " << g_state->vote_tbl[op_num]->total_vote << ". " << g_state->str() << endl << std::flush; #endif if (g_state->vote_tbl[op_num]->total_vote == g_state->config.f) { delete g_state->vote_tbl[op_num]; // commit the operation if (op_num > g_state->commit_num) { local_commit(op_num); #ifdef VR_DBG cout << "[prepare_ok] committed logs. New " << g_state->str() << endl << std::flush; #endif } } g_state->lock.unlock(); end: return Status::OK; } Status commit(ServerContext *context, const commit_req *req, empty_resp *reply) override { uint32_t commit_num = req->commit_num(); uint32_t view_id = req->view_id(); unique_lock lk(g_state->lock); #ifdef VR_DBG cout << "[commit] received view_id: " << view_id << " commit_num: " << commit_num << " " << g_state->str() << endl << std::flush; #endif // ignore if we are primary or we are not normal if (g_state->status != NORMAL || g_state->cur_config()->role == PRIMARY || g_state->view_id != view_id) { goto end; } if (g_state->op_num < commit_num) { // we are missing entries, start a state transfer g_state->commit_tbl[commit_num] = new condition_variable(); /* Hack to get around sync */ std::thread transfer_thrd(start_state_transfer); #ifdef VR_DBG cout << "[commit] waiting for state transfer for commit_num: " << commit_num << ". " << g_state->str() << endl << std::flush; #endif g_state->commit_tbl[commit_num]->wait(lk); transfer_thrd.join(); #ifdef VR_DBG cout << "[commit] woke up from state transfer for commit_num: " << commit_num << ". " << g_state->str() << endl << std::flush; #endif assert(g_state->op_num >= commit_num); delete g_state->commit_tbl[commit_num]; g_state->commit_tbl.erase(commit_num); } // here we would be up-to-date // then handle commit if (g_state->commit_num < commit_num) { local_commit(commit_num); #ifdef VR_DBG cout << "[commit] committed logs. New " << g_state->str() << endl << std::flush; #endif } end: lk.unlock(); return Status::OK; } Status get_state(ServerContext *context, const get_state_req *req, empty_resp *reply) override { ClientContext ctx; new_state_req new_req; empty_resp resp; uint32_t op_num = req->op_num(); uint32_t view_id = req->view_id(); uint32_t srv_id = req->srv_id(); char *data; uint32_t size; vr_log_data log_data; g_state->lock.lock(); #ifdef VR_DBG cout << "[get_state] received view_id: " << view_id << " op_num: " << op_num << " server id: " << srv_id << " " << g_state->str() << endl << std::flush; #endif // only primary gets the request // ignore if we are primary or we are not normal if (g_state->status != NORMAL || g_state->cur_config()->role != PRIMARY || g_state->view_id != view_id) { goto end; } // cannot request information that doens't exist assert(g_state->op_num > op_num); if (g_state->op_num == op_num) { // nothing to transfer goto end; } for (uint32_t i = op_num + 1; i <= g_state->op_num; i++) { #ifdef VR_DBG cout << "[get_state] appending log op_num: " << i << " data: " << g_state->logs[i]->first << " " << g_state->str() << endl << std::flush; #endif log_data.add_log(i, g_state->logs[i]->first, g_state->logs[i]->second); } log_data.to_bytes(&data, &size); new_req.set_op_num(g_state->op_num); new_req.set_commit_num(g_state->commit_num); new_req.set_view_id(g_state->view_id); new_req.set_log_data(data, size); g_state->lock.unlock(); g_state->get_stub(srv_id)->new_state(&ctx, new_req, &resp); delete[] data; goto end_ul; end: g_state->lock.unlock(); end_ul: return Status::OK; } Status new_state(ServerContext *context, const new_state_req *req, empty_resp *reply) override { uint32_t op_num = req->op_num(); uint32_t view_id = req->view_id(); uint32_t commit_num = req->commit_num(); vr_log_data log_data; g_state->lock.lock(); #ifdef VR_DBG cout << "[new_state] received view_id: " << view_id << " op_num: " << op_num << " commit_num: " << commit_num << " " << g_state->str() << endl << std::flush; #endif // only replica gets the request // ignore if we are primary or we are not normal if (g_state->status != NORMAL || g_state->cur_config()->role == PRIMARY || g_state->view_id != view_id) { goto end; } if (g_state->op_num < op_num) { log_data.from_bytes(req->log_data().c_str(), req->log_data().length()); for (uint32_t i = 0; i < log_data.op_nums.size(); i++) { uint32_t each_op = log_data.op_nums.at(i); if (each_op <= g_state->op_num) { continue; } uint32_t each_len = log_data.sizes.at(i); char *each_log = new char[each_len]; memcpy(each_log, log_data.datas.at(i), each_len); // append to our own log g_state->append_to_log(each_op, each_log, each_len); #ifdef VR_DBG cout << "[new_state] processed op_num: " << each_op << " New " << g_state->str() << endl << std::flush; #endif // wake up threads waiting on this op_num if (g_state->prep_tbl.find(g_state->op_num) != g_state->prep_tbl.end()) { g_state->prep_tbl[g_state->op_num]->notify_all(); #ifdef VR_DBG cout << "[new_state] woke up prep_tbl op_num: " << each_op << " " << g_state->str() << endl << std::flush; #endif } // wakeup threads waiting on this op_num if (g_state->commit_tbl.find(g_state->op_num) != g_state->commit_tbl.end()) { g_state->commit_tbl[g_state->op_num]->notify_all(); #ifdef VR_DBG cout << "[new_state] woke up commit_tbl op_num: " << each_op << " " << g_state->str() << endl << std::flush; #endif } } assert(g_state->op_num == op_num); } if (g_state->commit_num < commit_num) { local_commit(commit_num); } #ifdef VR_DBG cout << "[new_state] finished processing all logs. New " << g_state->str() << endl << std::flush; #endif end: g_state->lock.unlock(); return Status::OK; } }; void commit_proc() { while (true) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); uint32_t cur = g_state->delay_hms.fetch_sub(1); if (cur == 1) { commit_req req; #ifdef VR_DBG cout << "[commit_proc] sending commit msg... " << g_state->str() << endl << std::flush; #endif g_state->lock.lock(); req.set_view_id(g_state->view_id); req.set_commit_num(g_state->commit_num); g_state->lock.unlock(); for (uint32_t i = 0; i < g_state->config.entries.size(); i++) { ClientContext ctx; empty_resp resp; if (i != g_state->config_idx) { g_state->get_stub(i)->commit(&ctx, req, &resp); } } g_state->delay_hms = COMMIT_DELAY_HMS; } } } extern "C" void vr_append(const char *_data, size_t len) { uint32_t op_num = g_state->op_num + 1; prepare_req req; char *data = new char[len]; memcpy(data, _data, len); g_state->lock.lock(); if (g_state->cur_config()->role != PRIMARY || g_state->status != NORMAL) { g_state->lock.unlock(); return; } #ifdef VR_DBG cout << "[vr_append] new op_num: " << op_num << " data_len: " << len << " " << g_state->str() << endl << std::flush; #endif // append to our log g_state->append_to_log(op_num, data, len); req.set_commit_num(g_state->commit_num); req.set_view_id(g_state->view_id); req.set_op_num(g_state->op_num); req.set_data(data, len); // reset delay g_state->delay_hms = COMMIT_DELAY_HMS; g_state->lock.unlock(); for (uint32_t i = 0; i < g_state->config.entries.size(); i++) { ClientContext ctx; empty_resp resp; if (i != g_state->config_idx) { g_state->get_stub(i)->prepare(&ctx, req, &resp); } } } volatile bool init = false; std::thread server_thrd; std::thread commit_thrd; void vr_init_thrd(vr_config *config, uint32_t config_index) { g_state = new vr_state(*config, config_index); vrsrvrpc_impl impl; // start local grpc server ServerBuilder builder; builder.AddListeningPort("0.0.0.0:" + std::to_string(g_state->cur_config()->port), grpc::InsecureServerCredentials()); builder.RegisterService(&impl); unique_ptr server(builder.BuildAndStart()); cout << "Server listening on " << "0.0.0.0:" + std::to_string(g_state->cur_config()->port) << endl << std::flush; init = true; // start commit proc if (g_state->cur_config()->role == PRIMARY) { commit_thrd = thread(commit_proc); } server->Wait(); } static void _vr_init(vr_config &config, uint32_t config_index) { server_thrd = thread(vr_init_thrd, &config, config_index); while(!init){} } // taken from https://stackoverflow.com/questions/14265581/parse-split-a-string-in-c-using-string-delimiter-standard-c vector split(const string& str, const string& delim) { vector tokens; size_t prev = 0, pos = 0; do { pos = str.find(delim, prev); if (pos == string::npos) pos = str.length(); string token = str.substr(prev, pos-prev); if (!token.empty()) tokens.push_back(token); prev = pos + delim.length(); } while (pos < str.length() && prev < str.length()); return tokens; } vr_config* parse_config_file(string& path) { uint32_t f; string line; ifstream infile(path); getline(infile, line); f = stoi(line); vr_config *config = new vr_config(f); while(getline(infile, line)) { vr_role role; vector vec = split(line, " "); assert(vec.size() == 3); if(vec.at(0) == "p") { role = PRIMARY; } else { role = REPLICA; } config->add_config(role, vec.at(1), stoi(vec.at(2))); cout << "Read config: " << (role == PRIMARY ? "PRIMARY" : "REPLICA") << " " << vec.at(1) << ":" << stoi(vec.at(2)) << endl; } return config; } extern "C" void vr_init(const char* path, uint32_t config_index) { vr_config* config; string _path(path); config = parse_config_file(_path); _vr_init(*config, config_index); }