285 lines
6.7 KiB
C++
285 lines
6.7 KiB
C++
#pragma once
|
|
|
|
#include <dirent.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>

#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include <grpcpp/grpcpp.h>
#include <vr.grpc.pb.h>
|
|
|
|
// Status values stored in vr_state::status.
// A newly constructed vr_state starts out as NORMAL.
enum vr_status
{
    NORMAL = 0,
    RECOVERING = 1,
    VIEW_CHANGE = 2
};
|
|
|
|
// Role assigned to a configuration entry (see vr_config_entry::role).
// vr_config::add_config accepts at most one PRIMARY entry.
enum vr_role
{
    PRIMARY = 0,
    REPLICA = 1
};
|
|
|
|
class vr_config_entry
|
|
{
|
|
public:
|
|
vr_role role;
|
|
std::string ip;
|
|
int port;
|
|
|
|
vr_config_entry() = delete;
|
|
|
|
vr_config_entry(vr_role role, std::string &ip, int port) : role(role), ip(ip), port(port)
|
|
{
|
|
}
|
|
};
|
|
|
|
class vr_config
|
|
{
|
|
public:
|
|
std::vector<vr_config_entry *> entries;
|
|
uint32_t primary_idx;
|
|
uint32_t f;
|
|
|
|
explicit vr_config(uint32_t f) : primary_idx((uint32_t) -1), f(f)
|
|
{
|
|
}
|
|
|
|
~vr_config()
|
|
{
|
|
for (auto &i : entries)
|
|
{
|
|
delete i;
|
|
}
|
|
}
|
|
|
|
vr_config_entry *get_entry(uint32_t idx)
|
|
{
|
|
assert(idx < entries.size());
|
|
return entries.at(idx);
|
|
}
|
|
|
|
void add_config(vr_role role, std::string &ip, int port)
|
|
{
|
|
if (role == PRIMARY)
|
|
{
|
|
if (primary_idx == (uint32_t) -1)
|
|
{
|
|
primary_idx = entries.size();
|
|
}
|
|
else
|
|
{
|
|
std::cerr << "Duplicate primary" << std::endl;
|
|
return;
|
|
}
|
|
}
|
|
|
|
auto *ent = new vr_config_entry(role, ip, port);
|
|
entries.push_back(ent);
|
|
}
|
|
};
|
|
|
|
// A batch of log records that can be flattened to / parsed from a byte buffer.
//
// The three vectors are parallel: element i of each describes record i.
// Wire layout per record, in host byte order (fields are copied with memcpy):
//     [op_num : uint32_t][size : uint32_t][payload : size bytes]
//
// Payload pointers are never owned or copied by this class; the buffers they
// point into must outlive the vr_log_data that references them.
class vr_log_data
{
public:
    std::vector<uint32_t> sizes;     // payload length of each record
    std::vector<const char *> datas; // borrowed pointer to each payload
    std::vector<uint32_t> op_nums;   // op number of each record

    vr_log_data() = default;

    // Appends one record. `data` is stored as-is (no copy is made).
    void add_log(uint32_t op_num, const char *data, uint32_t size)
    {
        sizes.push_back(size);
        datas.push_back(data);
        op_nums.push_back(op_num);
    }

    // Parses `len` bytes at `data` and appends every record found.
    // The stored payload pointers point INTO `data`.
    //
    // The buffer must consist of whole records only; trailing or truncated
    // bytes trip the final assert (in release builds parsing just stops early).
    void from_bytes(const char *data, uint32_t len)
    {
        const uint32_t header_len = 2 * sizeof(uint32_t);

        uint32_t cur = 0;
        // A record whose payload is empty occupies exactly header_len bytes, so
        // the loop must still run when exactly header_len bytes remain
        // (a "<=" cut-off here would drop a trailing zero-length record).
        while (len - cur >= header_len)
        {
            uint32_t each_op;
            uint32_t each_size;

            std::memcpy(&each_op, &data[cur], sizeof(uint32_t));
            std::memcpy(&each_size, &data[cur + sizeof(uint32_t)], sizeof(uint32_t));
            cur += header_len;

            if (len - cur < each_size)
            {
                // Payload is truncated; leave `cur` past the header so the assert below fires.
                break;
            }

            // don't copy here
            datas.push_back(&data[cur]);
            sizes.push_back(each_size);
            op_nums.push_back(each_op);
            cur += each_size;
        }

        assert(cur == len);
    }

    // Serializes all records into a newly allocated buffer.
    //
    // On return *data points at a `new char[]` buffer of *len bytes; the caller
    // owns it and must release it with delete[].
    void to_bytes(char **data, uint32_t *len) const
    {
        uint32_t size = 0;
        for (uint32_t i = 0; i < op_nums.size(); i++)
        {
            size += 2 * sizeof(uint32_t);
            size += sizes.at(i);
        }

        uint32_t cur = 0;
        char *ret = new char[size];
        for (uint32_t i = 0; i < op_nums.size(); i++)
        {
            const uint32_t each_op = op_nums.at(i);
            std::memcpy(&ret[cur], &each_op, sizeof(uint32_t));
            cur += sizeof(uint32_t);

            const uint32_t each_size = sizes.at(i);
            std::memcpy(&ret[cur], &each_size, sizeof(uint32_t));
            cur += sizeof(uint32_t);

            // An empty record may carry a null payload pointer; memcpy must not see it.
            if (each_size != 0)
            {
                std::memcpy(&ret[cur], datas.at(i), each_size);
            }
            cur += each_size;
        }
        assert(cur == size);

        *data = ret;
        *len = size;
    }
};
|
|
|
|
// Vote bookkeeping record; vr_state::vote_tbl stores one of these per key.
class vr_vote_info
{
public:
    uint32_t total_vote;               // starts at zero
    std::map<uint32_t, bool> vote_map; // starts empty; presumably keyed by server id -- TODO confirm

    // Creates a record with no votes counted.
    vr_vote_info() : total_vote{0}, vote_map{}
    {
    }
};
|
|
|
|
// every 1s
|
|
// Initial value of vr_state::delay_hms.
// NOTE(review): "HMS" presumably means hundreds of milliseconds, which would
// make 10 match the "every 1s" remark above -- confirm against the code that
// consumes delay_hms.
#define COMMIT_DELAY_HMS (10)
|
|
|
|
// Forward declaration only; the full definition is expected to be provided elsewhere.
class vrsrvrpc_impl;
|
|
|
|
class vr_state
|
|
{
|
|
public:
|
|
vr_status status;
|
|
// giant lock, can be optimized
|
|
std::mutex lock;
|
|
vr_config config;
|
|
std::map<uint32_t, std::shared_ptr<grpc::Channel>> channels;
|
|
std::atomic<uint32_t> delay_hms;
|
|
|
|
uint32_t config_idx;
|
|
|
|
uint32_t view_id;
|
|
uint32_t op_num;
|
|
uint32_t commit_num;
|
|
|
|
std::map<uint32_t, std::pair<const char *, size_t> *> logs;
|
|
|
|
std::map<uint32_t, std::condition_variable *> prep_tbl;
|
|
std::map<uint32_t, std::condition_variable *> commit_tbl;
|
|
// map from op number to a map from server id to 1
|
|
std::map<uint32_t, vr_vote_info *> vote_tbl;
|
|
|
|
vr_state() = delete;
|
|
|
|
vr_state(vr_config &config, uint32_t config_idx) : status(NORMAL), config(config), delay_hms(COMMIT_DELAY_HMS),
|
|
config_idx(config_idx)
|
|
{
|
|
view_id = 0;
|
|
op_num = 0;
|
|
commit_num = 0;
|
|
|
|
if (cur_config()->role == PRIMARY)
|
|
{
|
|
std::cout << "[g_state] primary init" << std::endl;
|
|
// create channels to other clients
|
|
for (uint32_t i = 0; i < config.entries.size(); i++)
|
|
{
|
|
if (i == config.primary_idx)
|
|
{
|
|
continue;
|
|
}
|
|
vr_config_entry *each = config.get_entry(i);
|
|
assert(each->role == REPLICA);
|
|
channels[i] = grpc::CreateChannel(each->ip + ":" + std::to_string(each->port),
|
|
grpc::InsecureChannelCredentials());
|
|
std::cout << "[g_state] created channel to replica " << each->ip + ":" + std::to_string(each->port)
|
|
<< std::endl;
|
|
}
|
|
assert(config.f * 2 + 1 == config.entries.size());
|
|
}
|
|
else
|
|
{
|
|
// create channels to the primay
|
|
vr_config_entry *pri = config.get_entry(config.primary_idx);
|
|
assert(pri->role == PRIMARY);
|
|
channels[config.primary_idx] = grpc::CreateChannel(pri->ip + ":" + std::to_string(pri->port),
|
|
grpc::InsecureChannelCredentials());
|
|
std::cout << "[g_state] replica created channel to primary " << pri->ip + ":" + std::to_string(pri->port)
|
|
<< std::endl;
|
|
}
|
|
}
|
|
|
|
vr_config_entry *cur_config()
|
|
{
|
|
return config.get_entry(config_idx);
|
|
}
|
|
|
|
std::unique_ptr<vrrpc::vrsrvrpc::Stub> get_stub(int index)
|
|
{
|
|
std::shared_ptr<grpc::Channel> channel = channels[index];
|
|
|
|
return vrrpc::vrsrvrpc::NewStub(channel);
|
|
}
|
|
|
|
void append_to_log(uint32_t _op_num, const char *data, size_t len)
|
|
{
|
|
assert(logs.find(_op_num) == logs.end());
|
|
assert(logs.size() == op_num);
|
|
assert(_op_num == op_num + 1);
|
|
op_num++;
|
|
|
|
logs[op_num] = new std::pair(data, len);
|
|
}
|
|
|
|
std::string str()
|
|
{
|
|
std::stringstream ss;
|
|
ss << "view_id: " << view_id << " op_num: " << op_num << " commit_num: " << commit_num;
|
|
return ss.str();
|
|
}
|
|
};
|