jvr/inc/vrp.h

285 lines
6.7 KiB
C++

#pragma once
#include <memory>
#include <string>
#include <mutex>
#include <sstream>
#include <cstring>
#include <chrono>
#include <cstdio>
#include <unistd.h>
#include <fcntl.h>
#include <cassert>
#include <sys/stat.h>
#include <dirent.h>
#include <grpcpp/grpcpp.h>
#include <vr.grpc.pb.h>
#include <map>
#include <iostream>
#include <vector>
#include <thread>
#include <cassert>
enum vr_status
{
NORMAL,
RECOVERING,
VIEW_CHANGE
};
enum vr_role
{
PRIMARY,
REPLICA
};
class vr_config_entry
{
public:
vr_role role;
std::string ip;
int port;
vr_config_entry() = delete;
vr_config_entry(vr_role role, std::string &ip, int port) : role(role), ip(ip), port(port)
{
}
};
class vr_config
{
public:
std::vector<vr_config_entry *> entries;
uint32_t primary_idx;
uint32_t f;
explicit vr_config(uint32_t f) : primary_idx((uint32_t) -1), f(f)
{
}
~vr_config()
{
for (auto &i : entries)
{
delete i;
}
}
vr_config_entry *get_entry(uint32_t idx)
{
assert(idx < entries.size());
return entries.at(idx);
}
void add_config(vr_role role, std::string &ip, int port)
{
if (role == PRIMARY)
{
if (primary_idx == (uint32_t) -1)
{
primary_idx = entries.size();
}
else
{
std::cerr << "Duplicate primary" << std::endl;
return;
}
}
auto *ent = new vr_config_entry(role, ip, port);
entries.push_back(ent);
}
};
class vr_log_data
{
public:
std::vector<uint32_t> sizes;
std::vector<const char *> datas;
std::vector<uint32_t> op_nums;
vr_log_data() = default;
void add_log(uint32_t op_num, const char *data, uint32_t size)
{
sizes.push_back(size);
datas.push_back(data);
op_nums.push_back(op_num);
}
void from_bytes(const char *data, uint32_t len)
{
uint32_t cur = 0;
while (true)
{
if (len - cur <= 2 * sizeof(uint32_t))
{
break;
}
uint32_t each_op;
uint32_t each_size;
memcpy(&each_op, &data[cur], sizeof(uint32_t));
cur += sizeof(uint32_t);
memcpy(&each_size, &data[cur], sizeof(uint32_t));
cur += sizeof(uint32_t);
if (len - cur < each_size)
{
break;
}
// don't copy here
datas.push_back(&data[cur]);
sizes.push_back(each_size);
op_nums.push_back(each_op);
cur += each_size;
}
assert(cur == len);
}
void to_bytes(char **data, uint32_t *len)
{
uint32_t size = 0;
for (uint32_t i = 0; i < op_nums.size(); i++)
{
size += 2 * sizeof(uint32_t);
size += sizes.at(i);
}
uint32_t cur = 0;
char *ret = new char[size];
for (uint32_t i = 0; i < op_nums.size(); i++)
{
uint32_t each_op = op_nums.at(i);
memcpy(&ret[cur], &each_op, sizeof(uint32_t));
cur += sizeof(uint32_t);
uint32_t each_size = sizes.at(i);
memcpy(&ret[cur], &each_size, sizeof(uint32_t));
cur += sizeof(uint32_t);
memcpy(&ret[cur], datas.at(i), each_size);
cur += each_size;
}
assert(cur == size);
*data = ret;
*len = size;
}
};
class vr_vote_info
{
public:
uint32_t total_vote;
std::map<uint32_t, bool> vote_map;
vr_vote_info() : total_vote(0)
{
};
};
// every 1s
#define COMMIT_DELAY_HMS (10)
class vrsrvrpc_impl;
class vr_state
{
public:
vr_status status;
// giant lock, can be optimized
std::mutex lock;
vr_config config;
std::map<uint32_t, std::shared_ptr<grpc::Channel>> channels;
std::atomic<uint32_t> delay_hms;
uint32_t config_idx;
uint32_t view_id;
uint32_t op_num;
uint32_t commit_num;
std::map<uint32_t, std::pair<const char *, size_t> *> logs;
std::map<uint32_t, std::condition_variable *> prep_tbl;
std::map<uint32_t, std::condition_variable *> commit_tbl;
// map from op number to a map from server id to 1
std::map<uint32_t, vr_vote_info *> vote_tbl;
vr_state() = delete;
vr_state(vr_config &config, uint32_t config_idx) : status(NORMAL), config(config), delay_hms(COMMIT_DELAY_HMS),
config_idx(config_idx)
{
view_id = 0;
op_num = 0;
commit_num = 0;
if (cur_config()->role == PRIMARY)
{
std::cout << "[g_state] primary init" << std::endl;
// create channels to other clients
for (uint32_t i = 0; i < config.entries.size(); i++)
{
if (i == config.primary_idx)
{
continue;
}
vr_config_entry *each = config.get_entry(i);
assert(each->role == REPLICA);
channels[i] = grpc::CreateChannel(each->ip + ":" + std::to_string(each->port),
grpc::InsecureChannelCredentials());
std::cout << "[g_state] created channel to replica " << each->ip + ":" + std::to_string(each->port)
<< std::endl;
}
assert(config.f * 2 + 1 == config.entries.size());
}
else
{
// create channels to the primay
vr_config_entry *pri = config.get_entry(config.primary_idx);
assert(pri->role == PRIMARY);
channels[config.primary_idx] = grpc::CreateChannel(pri->ip + ":" + std::to_string(pri->port),
grpc::InsecureChannelCredentials());
std::cout << "[g_state] replica created channel to primary " << pri->ip + ":" + std::to_string(pri->port)
<< std::endl;
}
}
vr_config_entry *cur_config()
{
return config.get_entry(config_idx);
}
std::unique_ptr<vrrpc::vrsrvrpc::Stub> get_stub(int index)
{
std::shared_ptr<grpc::Channel> channel = channels[index];
return vrrpc::vrsrvrpc::NewStub(channel);
}
void append_to_log(uint32_t _op_num, const char *data, size_t len)
{
assert(logs.find(_op_num) == logs.end());
assert(logs.size() == op_num);
assert(_op_num == op_num + 1);
op_num++;
logs[op_num] = new std::pair(data, len);
}
std::string str()
{
std::stringstream ss;
ss << "view_id: " << view_id << " op_num: " << op_num << " commit_num: " << commit_num;
return ss.str();
}
};