include $(MK)/prologue.mk

.DEFAULT_GOAL := all

# OBJ accumulates every object file contributed by the per-directory Rules.mk
# files included below; all of them are linked into $(OUT)/vrsrv.

dir := proto
include $(dir)/Rules.mk
dir := server
include $(dir)/Rules.mk
dir := client
include $(dir)/Rules.mk

# The last line deliberately has no trailing backslash: a dangling continuation
# would splice whatever line follows into LDFLAGS_TMP.
LDFLAGS_TMP := -L/usr/local/lib `pkg-config --libs protobuf grpc++ grpc` \
               -Wl,--no-as-needed -lgrpc++_reflection -Wl,--as-needed \
               -ldl

# Neither target produces a file of its own name, so both are phony; otherwise
# a stray file called "clean" or "all" would silently disable the rule.
.PHONY: all clean
all: $(OUT)/vrsrv

# Use $(OUT) rather than a literal "out" so the binary follows the build
# directory configured in the top-level Makefile.
$(OUT)/vrsrv: $(OBJ)
	$(LINK)

clean:
	rm -rf $(OUT)

include $(MK)/epilogue.mk
// Adapted from https://stackoverflow.com/questions/14265581/parse-split-a-string-in-c-using-string-delimiter-standard-c
/**
 * Splits `str` at every occurrence of `delim` and returns the pieces in order.
 *
 * Empty pieces (produced by leading, trailing or consecutive delimiters) are
 * dropped, so the result never contains an empty string.
 *
 * @param str   text to split
 * @param delim separator; may be longer than one character
 * @return the non-empty tokens of `str`
 */
std::vector<std::string> split(const std::string& str, const std::string& delim)
{
    std::vector<std::string> tokens;

    // An empty delimiter matches at every position without consuming input,
    // which used to make the scan loop forever. Treat it as "no separator":
    // the whole input is a single token.
    if (delim.empty())
    {
        if (!str.empty())
        {
            tokens.push_back(str);
        }
        return tokens;
    }

    std::size_t start = 0;
    while (start < str.length())
    {
        std::size_t stop = str.find(delim, start);
        if (stop == std::string::npos)
        {
            stop = str.length();
        }

        if (stop > start)
        {
            tokens.emplace_back(str, start, stop - start);
        }

        // Resume right after the delimiter that ended this token.
        start = stop + delim.length();
    }

    return tokens;
}
<< endl; + return -1; + } + + string arg(argv[1]); + vr_config *conf = parse_config_file(arg); + vr_init(*conf, stoi(argv[2])); + + if (stoi(argv[2]) == 0) + { + memset(block, 6, BLK_SIZE); + + for (uint32_t i = 0; i < TRANSFER_BLK; i++) + { + cout << "appending block " << i << endl; + vr_append(block, BLK_SIZE); + } + + vr_append(block, 10); + vr_append(block, 10); + } + + while(!endd) + { + + } + +// string each; +// while(!cin.eof()) +// { +// cin >> each; +// vr_append(each.c_str(), each.length() + 1); +// } + + return 0; +} + + diff --git a/config b/config new file mode 100644 index 0000000..6a07aa3 --- /dev/null +++ b/config @@ -0,0 +1,4 @@ +1 +p localhost 10777 +r localhost 11777 +r localhost 11888 \ No newline at end of file diff --git a/inc/common.h b/inc/common.h new file mode 100644 index 0000000..777b8b5 --- /dev/null +++ b/inc/common.h @@ -0,0 +1,108 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +/* Max read size is 3MB every time */ +#define MAX_READ_SIZE (3*1024*1024) + +/* Max write size is 3MB every time */ +#define MAX_WRITE_SIZE (3*1024*1024) + +/* Port number for target URL */ +#define SERVER_PORT (12344) + +#define COMMIT_NUM_INVALID (0) + +#define DUM_FD_START (0xDEADBEEF) + +#define RPC_TIMEOUT_MS (5000) + +#define FILE_NAME_DELIMITER (':') + + +struct WriteEntry +{ + uint64_t offset; + uint32_t size; + uint32_t commit_num; + char *buffer; +}; + +struct WriteLog +{ + std::mutex lock; + std::vector entries; + /* The commit number is to signal clients up to what commit it can safely delete + * This is because in case of commit IO failure, client would know what are committed and + * what are not so client doesn't resend already-committed writes + */ + uint32_t latest_commit; + + WriteLog() + { + latest_commit = 1; + } +}; + +struct WSessionInfo +{ + /* global map from client to filename to commit history */ + std::shared_mutex log_lk; + std::map log_map; + + WriteLog* get_write_log(const std::string& path) + { 
+ WriteLog* log = nullptr; + bool free = false; + + log_lk.lock_shared(); + if (log_map.count(path) != 0) { + log = log_map[path]; + } + + log_lk.unlock_shared(); + + if (log == nullptr) { + + std::cout << "Creating new WriteLog for " << path << std::endl; + log = new WriteLog; + + log_lk.lock(); + if (log_map.count(path) == 0) { + log_map[path] = log; + } else { + free = true; + log = log_map[path]; + } + log_lk.unlock(); + + if (free) { + delete log; + } + + } else { + std::cout << "Found existing WriteLog for " << path << std::endl; + } + + return log; + } + + /* Wsession is used to identify whether the server has failed. + * wsession is generated every time the server starts, and it's supposed to be unique + * for distinct server instances. Clients query for this session number upon init + * If a server has failed and restarted then it acknowledges commits even when there + * is nothing to commit. After receiving invalid wsession a client is expected to update + * its stored wsession number resent all writes stored for the file */ + uint64_t wsession; + + WSessionInfo() + { + wsession = (uint64_t) std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()).count(); + } +}; \ No newline at end of file diff --git a/inc/vr.h b/inc/vr.h new file mode 100644 index 0000000..44a8d10 --- /dev/null +++ b/inc/vr.h @@ -0,0 +1,9 @@ +#pragma once + +#include "vrp.h" + +void vr_append(const char *data, size_t len); + +void vr_accept(const char *data, size_t len); + +void vr_init(vr_config& config, uint32_t config_index); diff --git a/inc/vrp.h b/inc/vrp.h new file mode 100644 index 0000000..32fcfcd --- /dev/null +++ b/inc/vrp.h @@ -0,0 +1,284 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +enum vr_status +{ + NORMAL, + RECOVERING, + VIEW_CHANGE +}; + +enum vr_role +{ + PRIMARY, + REPLICA 
/**
 * A batch of log records plus its wire encoding, used for state transfer.
 *
 * Wire format, repeated once per record with no padding:
 *   [op_num : uint32_t][size : uint32_t][payload : `size` bytes]
 * Integers are copied with memcpy in host byte order.
 *
 * The three vectors are parallel: element i of each describes record i.
 * `datas` holds non-owning pointers; this class never frees them.
 */
class vr_log_data
{
public:
    std::vector<uint32_t> sizes;
    std::vector<const char *> datas;
    std::vector<uint32_t> op_nums;

    vr_log_data() = default;

    /**
     * Appends one record. `data` is stored by pointer, not copied, so it must
     * stay valid for as long as this object is used.
     */
    void add_log(uint32_t op_num, const char *data, uint32_t size)
    {
        sizes.push_back(size);
        datas.push_back(data);
        op_nums.push_back(op_num);
    }

    /**
     * Decodes `len` bytes at `data` and appends every record found.
     *
     * Payloads are not copied: the stored pointers refer into `data`, which
     * must therefore outlive this object.
     *
     * A malformed buffer (trailing bytes too short for a header, or a payload
     * running past the end) stops decoding at the last complete record and
     * trips the assertion below.
     */
    void from_bytes(const char *data, uint32_t len)
    {
        uint32_t cur = 0;

        // ">=" rather than ">": a record with an empty payload is exactly one
        // header long and must still be decoded when it is the last one.
        while (len - cur >= kHeaderBytes)
        {
            uint32_t each_op;
            uint32_t each_size;
            memcpy(&each_op, data + cur, sizeof(uint32_t));
            memcpy(&each_size, data + cur + sizeof(uint32_t), sizeof(uint32_t));

            // Validate before consuming the header so that a truncated record
            // leaves `cur` short of `len` and is caught by the assertion.
            if (each_size > len - cur - kHeaderBytes)
            {
                break;
            }
            cur += kHeaderBytes;

            // don't copy here
            datas.push_back(data + cur);
            sizes.push_back(each_size);
            op_nums.push_back(each_op);
            cur += each_size;
        }

        assert(cur == len);
    }

    /**
     * Encodes all records into one newly allocated buffer.
     *
     * @param data receives the buffer, allocated with new[]; the caller must
     *             release it with delete[]
     * @param len  receives the buffer length in bytes
     */
    void to_bytes(char **data, uint32_t *len)
    {
        uint32_t total = 0;
        for (uint32_t i = 0; i < op_nums.size(); i++)
        {
            total += kHeaderBytes + sizes.at(i);
        }

        char *out = new char[total];
        uint32_t cur = 0;
        for (uint32_t i = 0; i < op_nums.size(); i++)
        {
            const uint32_t each_op = op_nums.at(i);
            const uint32_t each_size = sizes.at(i);

            memcpy(out + cur, &each_op, sizeof(uint32_t));
            cur += sizeof(uint32_t);
            memcpy(out + cur, &each_size, sizeof(uint32_t));
            cur += sizeof(uint32_t);

            // Skip the copy for empty payloads: their pointer may be null and
            // memcpy requires valid pointers even for a zero length.
            if (each_size != 0)
            {
                memcpy(out + cur, datas.at(i), each_size);
                cur += each_size;
            }
        }
        assert(cur == total);

        *data = out;
        *len = total;
    }

private:
    // Size of the per-record header: op_num followed by payload size.
    static constexpr uint32_t kHeaderBytes = 2 * sizeof(uint32_t);
};
channels[config.primary_idx] = grpc::CreateChannel(pri->ip + ":" + std::to_string(pri->port), + grpc::InsecureChannelCredentials()); + std::cout << "[g_state] replica created channel to primary " << pri->ip + ":" + std::to_string(pri->port) + << std::endl; + } + } + + vr_config_entry *cur_config() + { + return config.get_entry(config_idx); + } + + std::unique_ptr get_stub(int index) + { + std::shared_ptr channel = channels[index]; + + return vrrpc::vrsrvrpc::NewStub(channel); + } + + void append_to_log(uint32_t _op_num, const char *data, size_t len) + { + assert(logs.find(_op_num) == logs.end()); + assert(logs.size() == op_num); + assert(_op_num == op_num + 1); + op_num++; + + logs[op_num] = new std::pair(data, len); + } + + std::string str() + { + std::stringstream ss; + ss << "view_id: " << view_id << " op_num: " << op_num << " commit_num: " << commit_num; + return ss.str(); + } +}; diff --git a/mk/cc.mk b/mk/cc.mk new file mode 100644 index 0000000..9d85ef4 --- /dev/null +++ b/mk/cc.mk @@ -0,0 +1,12 @@ + +OBJ_$(d) := $(OBJ_$(d)) $(addprefix $(OUT)/$(d)/, $(SRC_$(d):.cc=.o)) + +$(OUT)/$(d)/%.o: MOD:=$(MOD) +$(OUT)/$(d)/%.o: d:=$(d) +$(OBJ_$(d)): $(OUT)/$(d)/%.o: $(d)/%.cc + $(MKDIR) + $(COMP) + +OBJ := $(OBJ) $(OBJ_$(d)) + +-include $(DEP_$(d)) diff --git a/mk/epilogue.mk b/mk/epilogue.mk new file mode 100644 index 0000000..75fe942 --- /dev/null +++ b/mk/epilogue.mk @@ -0,0 +1,2 @@ +d := $(dirstack_$(sp)) +sp := $(basename $(sp)) \ No newline at end of file diff --git a/mk/prologue.mk b/mk/prologue.mk new file mode 100644 index 0000000..ac63cfa --- /dev/null +++ b/mk/prologue.mk @@ -0,0 +1,3 @@ +sp := $(sp).x +dirstack_$(sp) := $(d) +d := $(dir) diff --git a/mk/proto.mk b/mk/proto.mk new file mode 100644 index 0000000..0b8840b --- /dev/null +++ b/mk/proto.mk @@ -0,0 +1,23 @@ +PRO_$(d) := $(addprefix $(OUT)/$(d)/, $(SRCPRO_$(d):.proto=.pb.cc)) +GRPC_$(d) := $(addprefix $(OUT)/$(d)/, $(SRCPRO_$(d):.proto=.grpc.pb.cc)) +OBJ_PRO_$(d) := $(PRO_$(d):.pb.cc=.pb.o) 
include $(MK)/prologue.mk

MOD := proto

# The top-level CFLAGS expands $(CFLAGS_$(MOD)); the variable must use exactly
# that spelling or these flags are silently dropped.
# Generated headers are written to $(OUT)/$(d) (see PROCFLAGS/GRPCFLAGS), so
# that is the directory to put on the include path.
CFLAGS_$(MOD) := `pkg-config --cflags protobuf grpc` \
                 $(addprefix -I, $(OUT)/$(d))

# ADD SOURCE FILES HERE
SRCPRO_$(d) := vr.proto

include $(MK)/proto.mk

OBJ_$(MOD) := $(OBJ_PRO_$(d))

include $(MK)/epilogue.mk
file mode 100644 index 0000000..c630eb2 --- /dev/null +++ b/server/Rules.mk @@ -0,0 +1,11 @@ +include $(MK)/prologue.mk + +MOD:=server +CFLAGS_$(MOD):=$(addprefix -I, $(OUT)/proto/) + +# ADD SOURCE FILES HERE +SRC_$(d) := server.cc + +include $(MK)/cc.mk + +include $(MK)/epilogue.mk diff --git a/server/server.cc b/server/server.cc new file mode 100644 index 0000000..c55dd3c --- /dev/null +++ b/server/server.cc @@ -0,0 +1,560 @@ +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +//#define VR_DBG + +#include + +using grpc::ClientAsyncResponseReader; +using grpc::CompletionQueue; +using grpc::ClientContext; +using grpc::Channel; +using grpc::Server; +using grpc::ServerBuilder; +using grpc::ServerContext; +using grpc::Status; +using vrrpc::vrsrvrpc; + +using vrrpc::empty_resp; +using vrrpc::prepare_req; +using vrrpc::prepare_ok_req; +using vrrpc::commit_req; +using vrrpc::get_state_req; +using vrrpc::new_state_req; + +using namespace std; +using namespace std::chrono; + +vr_state *g_state; + +static void start_state_transfer() +{ + ClientContext ctx; + empty_resp resp; + get_state_req req; + g_state->lock.lock(); + req.set_view_id(g_state->view_id); + req.set_op_num(g_state->op_num); + req.set_srv_id(g_state->config_idx); + + g_state->lock.unlock(); + + // send to primary + g_state->get_stub(g_state->config.primary_idx)->get_state(&ctx, req, &resp); +} + +static void local_commit(uint32_t commit_num) +{ + for (uint32_t i = g_state->commit_num + 1; i <= commit_num; i++) + { + vr_accept(g_state->logs[i]->first, g_state->logs[i]->second); + } + + g_state->commit_num = commit_num; +} + +/* + * Server functions + */ +class vrsrvrpc_impl final : public vrsrvrpc::Service +{ + Status prepare(ServerContext *context, const prepare_req *req, empty_resp *reply) override + { + ClientContext ctx; + empty_resp resp; + uint32_t commit_num = req->commit_num(); + uint32_t 
view_id = req->view_id(); + uint32_t op_num = req->op_num(); + prepare_ok_req prep_req; + //vr_log *log; + char *buf; + + unique_lock lk(g_state->lock); +#ifdef VR_DBG + cout << "[prepare] received view_id: " << view_id << " op_num: " << op_num << " commit_num: " << commit_num + << " " << g_state->str() << endl << std::flush; +#endif + + // ignore if we are primary or we are not normal + if (g_state->status != NORMAL || g_state->cur_config()->role == PRIMARY || g_state->view_id != view_id) + { + lk.unlock(); + goto end; + } + + if (g_state->op_num > op_num - 1) + { +#ifdef VR_DBG + cout << "[prepare] skipping: already processed op_num: " << op_num << " " << g_state->str() << endl << std::flush; +#endif + // if we have already processed this op + // reply yes + } + else if (g_state->op_num == op_num - 1) + { + // if we haven't processed but no missing entries + // append the message to the log and reply yes + buf = new char[req->data().length()]; + memcpy(buf, req->data().c_str(), req->data().length()); + // log = new vr_log(op_num, buf); + g_state->append_to_log(op_num, buf, req->data().length()); +#ifdef VR_DBG + cout << "[prepare] processed op_num: " << op_num << " New " << g_state->str() << endl << std::flush; +#endif + } + else + { + // we are missing entries + // perform state transfer and wait for reply + if (g_state->prep_tbl.find(op_num - 1) == g_state->prep_tbl.end()) + { + g_state->prep_tbl[op_num - 1] = new condition_variable(); + + /* hack to get around sync */ + std::thread transfer_thrd(start_state_transfer); +#ifdef VR_DBG + cout << "[prepare] waiting for state transfer for op_num: " << op_num - 1 << ". " << g_state->str() << endl << std::flush; +#endif + + g_state->prep_tbl[op_num - 1]->wait(lk); + + transfer_thrd.join(); +#ifdef VR_DBG + cout << "[prepare] woke up from state transfer for op_num: " << op_num - 1 << ". 
" << g_state->str() << endl << std::flush; +#endif + assert(g_state->op_num > op_num - 1); + + delete g_state->prep_tbl[op_num - 1]; + g_state->prep_tbl.erase(op_num - 1); + } + else + { + // if there is already a thread waiting, ignore + lk.unlock(); + goto end; + } + } + + // here we would be up-to-date + // then handle commit + if (g_state->commit_num < commit_num) + { + local_commit(commit_num); +#ifdef VR_DBG + cout << "[prepare] committed logs. New " << g_state->str() << endl << std::flush; +#endif + } + + lk.unlock(); + + // send prepare_ok messages to primary + prep_req.set_op_num(g_state->op_num); + prep_req.set_view_id(g_state->view_id); + prep_req.set_srv_id(g_state->config_idx); + + g_state->get_stub(g_state->config.primary_idx)->prepare_ok(&ctx, prep_req, &resp); +end: + return Status::OK; + } + + Status prepare_ok(ServerContext *context, const prepare_ok_req *req, empty_resp *reply) override + { + uint32_t view_id = req->view_id(); + uint32_t op_num = req->op_num(); + uint32_t srv_id = req->srv_id(); + + g_state->lock.lock(); + +#ifdef VR_DBG + cout << "[prepare_ok] received view_id: " << view_id << " op_num: " << op_num << " server_id: " << srv_id + << " " << g_state->str() << endl << std::flush; +#endif + + // ignore if we aren't primary bla bla bla + if (g_state->cur_config()->role != PRIMARY || g_state->status != NORMAL || g_state->view_id != view_id) + { + g_state->lock.unlock(); + goto end; + } + + assert(op_num <= g_state->op_num); + + // if we already committed prepared ok stuff, ignore + // and get rid of whatever is in our table + if (op_num <= g_state->commit_num) + { + // TODO: garbage collect maybe? in case of packet loss? +#ifdef VR_DBG + cout << "[prepare_ok] already committed. 
" << g_state->str() << endl << std::flush; +#endif + g_state->lock.unlock(); + goto end; + } + + // otherwise check if table has entry + if (g_state->vote_tbl.find(op_num) == g_state->vote_tbl.end()) + { + g_state->vote_tbl[op_num] = new vr_vote_info; + } + + if (g_state->vote_tbl[op_num]->vote_map.find(srv_id) == g_state->vote_tbl[op_num]->vote_map.end()) + { +#ifdef VR_DBG + cout << "[prepare_ok] server_id: " << srv_id << " hasn't voted. " << g_state->str() << endl << std::flush; +#endif + g_state->vote_tbl[op_num]->vote_map[srv_id] = true; + g_state->vote_tbl[op_num]->total_vote++; + } + else + { +#ifdef VR_DBG + cout << "[prepare_ok] server_id: " << srv_id << " has already voted. " << g_state->str() << endl << std::flush; +#endif + } + +#ifdef VR_DBG + cout << "[prepare_ok] op_num: " << op_num << " total votes: " << g_state->vote_tbl[op_num]->total_vote + << ". " << g_state->str() << endl << std::flush; +#endif + if (g_state->vote_tbl[op_num]->total_vote == g_state->config.f) + { + delete g_state->vote_tbl[op_num]; + // commit the operation + if (op_num > g_state->commit_num) + { + local_commit(op_num); +#ifdef VR_DBG + cout << "[prepare_ok] committed logs. 
New " << g_state->str() << endl << std::flush; +#endif + } + } + + g_state->lock.unlock(); +end: + return Status::OK; + } + + Status commit(ServerContext *context, const commit_req *req, empty_resp *reply) override + { + uint32_t commit_num = req->commit_num(); + uint32_t view_id = req->view_id(); + + unique_lock lk(g_state->lock); + +#ifdef VR_DBG + cout << "[commit] received view_id: " << view_id << " commit_num: " << commit_num << " " + << g_state->str() << endl << std::flush; +#endif + + // ignore if we are primary or we are not normal + if (g_state->status != NORMAL || g_state->cur_config()->role == PRIMARY || g_state->view_id != view_id) + { + goto end; + } + + if (g_state->op_num < commit_num) + { + // we are missing entries, start a state transfer + g_state->commit_tbl[commit_num] = new condition_variable(); + + /* Hack to get around sync */ + std::thread transfer_thrd(start_state_transfer); + +#ifdef VR_DBG + cout << "[commit] waiting for state transfer for commit_num: " << commit_num << ". " << g_state->str() << endl << std::flush; +#endif + g_state->commit_tbl[commit_num]->wait(lk); + + transfer_thrd.join(); +#ifdef VR_DBG + cout << "[commit] woke up from state transfer for commit_num: " << commit_num << ". " << g_state->str() << endl << std::flush; +#endif + assert(g_state->op_num >= commit_num); + + delete g_state->commit_tbl[commit_num]; + g_state->commit_tbl.erase(commit_num); + + } + + // here we would be up-to-date + // then handle commit + if (g_state->commit_num < commit_num) + { + local_commit(commit_num); +#ifdef VR_DBG + cout << "[commit] committed logs. 
New " << g_state->str() << endl << std::flush; +#endif + } + +end: + lk.unlock(); + return Status::OK; + } + + Status get_state(ServerContext *context, const get_state_req *req, empty_resp *reply) override + { + ClientContext ctx; + new_state_req new_req; + empty_resp resp; + uint32_t op_num = req->op_num(); + uint32_t view_id = req->view_id(); + uint32_t srv_id = req->srv_id(); + char *data; + uint32_t size; + vr_log_data log_data; + + g_state->lock.lock(); + +#ifdef VR_DBG + cout << "[get_state] received view_id: " << view_id << " op_num: " << op_num << " server id: " << srv_id + << " " << g_state->str() << endl << std::flush; +#endif + // only primary gets the request + // ignore if we are primary or we are not normal + if (g_state->status != NORMAL || g_state->cur_config()->role != PRIMARY || g_state->view_id != view_id) + { + goto end; + } + + // cannot request information that doens't exist + assert(g_state->op_num > op_num); + + if (g_state->op_num == op_num) + { + // nothing to transfer + goto end; + } + + for (uint32_t i = op_num + 1; i <= g_state->op_num; i++) + { +#ifdef VR_DBG + cout << "[get_state] appending log op_num: " << i << " data: " << g_state->logs[i]->first << " " << g_state->str() << endl << std::flush; +#endif + log_data.add_log(i, g_state->logs[i]->first, g_state->logs[i]->second); + } + + + log_data.to_bytes(&data, &size); + + new_req.set_op_num(g_state->op_num); + new_req.set_commit_num(g_state->commit_num); + new_req.set_view_id(g_state->view_id); + new_req.set_log_data(data, size); + + g_state->lock.unlock(); + g_state->get_stub(srv_id)->new_state(&ctx, new_req, &resp); + delete[] data; + goto end_ul; + +end: + g_state->lock.unlock(); +end_ul: + return Status::OK; + } + + Status new_state(ServerContext *context, const new_state_req *req, empty_resp *reply) override + { + uint32_t op_num = req->op_num(); + uint32_t view_id = req->view_id(); + uint32_t commit_num = req->commit_num(); + vr_log_data log_data; + + g_state->lock.lock(); 
+#ifdef VR_DBG + cout << "[new_state] received view_id: " << view_id << " op_num: " << op_num << " commit_num: " << commit_num + << " " << g_state->str() << endl << std::flush; +#endif + // only replica gets the request + // ignore if we are primary or we are not normal + if (g_state->status != NORMAL || g_state->cur_config()->role == PRIMARY || g_state->view_id != view_id) + { + goto end; + } + + if (g_state->op_num < op_num) + { + log_data.from_bytes(req->log_data().c_str(), req->log_data().length()); + + for (uint32_t i = 0; i < log_data.op_nums.size(); i++) + { + uint32_t each_op = log_data.op_nums.at(i); + + if (each_op <= g_state->op_num) + { + continue; + } + + uint32_t each_len = log_data.sizes.at(i); + + char *each_log = new char[each_len]; + + memcpy(each_log, log_data.datas.at(i), each_len); + + // append to our own log + g_state->append_to_log(each_op, each_log, each_len); +#ifdef VR_DBG + cout << "[new_state] processed op_num: " << each_op << " New " << g_state->str() << endl << std::flush; +#endif + + // wake up threads waiting on this op_num + if (g_state->prep_tbl.find(g_state->op_num) != g_state->prep_tbl.end()) + { + g_state->prep_tbl[g_state->op_num]->notify_all(); +#ifdef VR_DBG + cout << "[new_state] woke up prep_tbl op_num: " << each_op << " " << g_state->str() << endl << std::flush; +#endif + } + + // wakeup threads waiting on this op_num + if (g_state->commit_tbl.find(g_state->op_num) != g_state->commit_tbl.end()) + { + g_state->commit_tbl[g_state->op_num]->notify_all(); +#ifdef VR_DBG + cout << "[new_state] woke up commit_tbl op_num: " << each_op << " " << g_state->str() << endl << std::flush; +#endif + } + } + + assert(g_state->op_num == op_num); + } + + if (g_state->commit_num < commit_num) + { + local_commit(commit_num); + } + +#ifdef VR_DBG + cout << "[new_state] finished processing all logs. 
New " << g_state->str() << endl << std::flush; +#endif + +end: + g_state->lock.unlock(); + return Status::OK; + } +}; + +void commit_proc() +{ + while (true) + { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + uint32_t cur = g_state->delay_hms.fetch_sub(1); + + if (cur == 1) + { + commit_req req; + +#ifdef VR_DBG + cout << "[commit_proc] sending commit msg... " << g_state->str() << endl << std::flush; +#endif + g_state->lock.lock(); + + req.set_view_id(g_state->view_id); + req.set_commit_num(g_state->commit_num); + + g_state->lock.unlock(); + + for (uint32_t i = 0; i < g_state->config.entries.size(); i++) + { + ClientContext ctx; + empty_resp resp; + if (i != g_state->config_idx) + { + g_state->get_stub(i)->commit(&ctx, req, &resp); + } + } + + g_state->delay_hms = COMMIT_DELAY_HMS; + } + } +} + +void vr_append(const char *_data, size_t len) +{ + uint32_t op_num = g_state->op_num + 1; + prepare_req req; + char *data = new char[len]; + memcpy(data, _data, len); + + g_state->lock.lock(); + if (g_state->cur_config()->role != PRIMARY || g_state->status != NORMAL) + { + g_state->lock.unlock(); + return; + } + +#ifdef VR_DBG + cout << "[vr_append] new op_num: " << op_num << " data_len: " << len << " " << g_state->str() << endl << std::flush; +#endif + + // append to our log + g_state->append_to_log(op_num, data, len); + + req.set_commit_num(g_state->commit_num); + req.set_view_id(g_state->view_id); + req.set_op_num(g_state->op_num); + req.set_data(data, len); + + // reset delay + g_state->delay_hms = COMMIT_DELAY_HMS; + + g_state->lock.unlock(); + + for (uint32_t i = 0; i < g_state->config.entries.size(); i++) + { + ClientContext ctx; + empty_resp resp; + if (i != g_state->config_idx) + { + g_state->get_stub(i)->prepare(&ctx, req, &resp); + } + } +} + +volatile bool init = false; +std::thread server_thrd; +std::thread commit_thrd; + +void vr_init_thrd(vr_config *config, uint32_t config_index) +{ + g_state = new vr_state(*config, config_index); + + 
vrsrvrpc_impl impl; + // start local grpc server + ServerBuilder builder; + builder.AddListeningPort("0.0.0.0:" + std::to_string(g_state->cur_config()->port), grpc::InsecureServerCredentials()); + builder.RegisterService(&impl); + unique_ptr server(builder.BuildAndStart()); + + cout << "Server listening on " << "0.0.0.0:" + std::to_string(g_state->cur_config()->port) << endl << std::flush; + + init = true; + // start commit proc + if (g_state->cur_config()->role == PRIMARY) + { + commit_thrd = thread(commit_proc); + } + server->Wait(); +} + +void vr_init(vr_config &config, uint32_t config_index) +{ + server_thrd = thread(vr_init_thrd, &config, config_index); + while(!init){} +}