echo modules
This commit is contained in:
parent
666292478e
commit
62dddff785
@ -1,13 +1,6 @@
|
||||
cmake_minimum_required(VERSION 3.18.0)
|
||||
project(ppd)
|
||||
|
||||
list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR})
|
||||
|
||||
add_custom_command(OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/msg/msg.pb.cc
|
||||
COMMAND mkdir -p ${CMAKE_CURRENT_BINARY_DIR}/msg/
|
||||
COMMAND protoc --cpp_out=${CMAKE_CURRENT_BINARY_DIR}/msg/ --proto_path=${CMAKE_CURRENT_SOURCE_DIR}/msg/ msg.proto
|
||||
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/msg/msg.proto)
|
||||
|
||||
set(CMAKE_EXPORT_COMPILE_COMMANDS True)
|
||||
find_package(PkgConfig REQUIRED)
|
||||
find_package(OpenSSL REQUIRED)
|
||||
@ -35,6 +28,7 @@ target_link_directories(ppd PRIVATE ${bsock_LIBRARY_DIRS})
|
||||
target_compile_options(ppd PRIVATE ${CFLAGS})
|
||||
target_include_directories(ppd PRIVATE ${bsock_INCLUDE_DIRS} ${OPENSSL_INCLUDE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/include)
|
||||
|
||||
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/modules)
|
||||
# if (${ENABLE_FSTACK} MATCHES "y")
|
||||
# add_executable(ppd_ff ${CMAKE_CURRENT_SOURCE_DIR}/ppd_ff/ppd.cc
|
||||
# ${CMAKE_CURRENT_SOURCE_DIR}/ppd_ff/reqproc.cc
|
||||
|
@ -45,7 +45,8 @@ struct ppd_mod_info {
|
||||
|
||||
typedef struct ppd_mod_info * (*ppd_get_mod_info_fn)(void);
|
||||
#define PPD_GET_MOD_INFO_ID ppd_getmod_info
|
||||
#define PPD_GET_MOD_INFO_FN ("##PPD_GET_MOD_INFO_ID")
|
||||
#define STR(x) #x
|
||||
#define STRX(x) STR(x)
|
||||
|
||||
#define inline inline __attribute__((unused))
|
||||
|
||||
@ -57,9 +58,9 @@ ppd_load_module(const char *path)
|
||||
E("Failed to load module %s: %s.\n", path, dlerror());
|
||||
}
|
||||
|
||||
ppd_get_mod_info_fn fn = (ppd_get_mod_info_fn)dlfunc(handle, PPD_GET_MOD_INFO_FN);
|
||||
ppd_get_mod_info_fn fn = (ppd_get_mod_info_fn)dlfunc(handle, STRX(PPD_GET_MOD_INFO_ID));
|
||||
if (fn == NULL) {
|
||||
E("Failed to find symbol %s: %s\n", PPD_GET_MOD_INFO_FN, dlerror());
|
||||
E("Failed to find symbol %s: %s\n", STRX(PPD_GET_MOD_INFO_ID), dlerror());
|
||||
}
|
||||
|
||||
return fn();
|
||||
|
11
modules/CMakeLists.txt
Normal file
11
modules/CMakeLists.txt
Normal file
@ -0,0 +1,11 @@
|
||||
set(CFLAGS -O3 -g -Wall -Werror -Wextra -std=c2x)
|
||||
|
||||
add_library(echo_ppd SHARED ${CMAKE_CURRENT_SOURCE_DIR}/echo/ppd_mod.c)
|
||||
target_compile_options(echo_ppd PRIVATE ${CFLAGS})
|
||||
target_include_directories(echo_ppd PRIVATE ${CMAKE_SOURCE_DIR}/include)
|
||||
set_target_properties(echo_ppd PROPERTIES PREFIX "")
|
||||
|
||||
add_library(echo_dsmbr SHARED ${CMAKE_CURRENT_SOURCE_DIR}/echo/dsmbr_mod.c)
|
||||
target_compile_options(echo_dsmbr PRIVATE ${CFLAGS})
|
||||
target_include_directories(echo_dsmbr PRIVATE ${CMAKE_SOURCE_DIR}/include)
|
||||
set_target_properties(echo_dsmbr PROPERTIES PREFIX "")
|
@ -1,10 +0,0 @@
|
||||
syntax = "proto3";
|
||||
|
||||
message dismember_stat {
|
||||
int32 type = 1;
|
||||
bytes data = 2;
|
||||
}
|
||||
|
||||
message dismember_ctrl {
|
||||
int32 type = 1;
|
||||
}
|
54
modules/echo/dsmbr_mod.c
Normal file
54
modules/echo/dsmbr_mod.c
Normal file
@ -0,0 +1,54 @@
|
||||
#include <mod.h>
|
||||
#define UNUSED __attribute__((unused))
|
||||
|
||||
static int
|
||||
global_init_cb(UNUSED int argc, UNUSED char **argk, UNUSED char **argv, UNUSED void **ctx)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
thread_create_cb(UNUSED int core, UNUSED void *global_ctx, UNUSED void **ctx)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
conn_create_cb(UNUSED void *global_ctx, UNUSED void *thread_ctx, UNUSED void **ctx)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void
|
||||
conn_destroy_cb(UNUSED void *global_ctx, UNUSED void *thread_ctx, UNUSED void *conn_ctx)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
static int
|
||||
conn_recv_cb(UNUSED const char *data, UNUSED size_t sz, UNUSED void *global_ctx,
|
||||
UNUSED void *thread_ctx, UNUSED void *conn_ctx)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
conn_send_cb(UNUSED const char *out, UNUSED size_t sz, size_t *out_sz, UNUSED void *global_ctx,
|
||||
UNUSED void *thread_ctx, UNUSED void *conn_ctx)
|
||||
{
|
||||
*out_sz = 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static struct ppd_mod_info minfo = { .global_init_cb = global_init_cb,
|
||||
.thread_create_cb = thread_create_cb,
|
||||
.conn_create_cb = conn_create_cb,
|
||||
.conn_destroy_cb = conn_destroy_cb,
|
||||
.conn_send_cb = conn_send_cb,
|
||||
.conn_recv_cb = conn_recv_cb };
|
||||
|
||||
struct ppd_mod_info *
|
||||
PPD_GET_MOD_INFO_ID(void)
|
||||
{
|
||||
return &minfo;
|
||||
}
|
54
modules/echo/ppd_mod.c
Normal file
54
modules/echo/ppd_mod.c
Normal file
@ -0,0 +1,54 @@
|
||||
#include <mod.h>
|
||||
#define UNUSED __attribute__((unused))
|
||||
|
||||
static int
|
||||
global_init_cb(UNUSED int argc, UNUSED char **argk, UNUSED char **argv, UNUSED void **ctx)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
thread_create_cb(UNUSED int core, UNUSED void *global_ctx, UNUSED void **ctx)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
conn_create_cb(UNUSED void *global_ctx, UNUSED void *thread_ctx, UNUSED void **ctx)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void
|
||||
conn_destroy_cb(UNUSED void *global_ctx, UNUSED void *thread_ctx, UNUSED void *conn_ctx)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
static int
|
||||
conn_recv_cb(UNUSED const char *data, UNUSED size_t sz, UNUSED void *global_ctx,
|
||||
UNUSED void *thread_ctx, UNUSED void *conn_ctx)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
conn_send_cb(UNUSED const char *out, UNUSED size_t sz, size_t *out_sz, UNUSED void *global_ctx,
|
||||
UNUSED void *thread_ctx, UNUSED void *conn_ctx)
|
||||
{
|
||||
*out_sz = 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static struct ppd_mod_info minfo = { .global_init_cb = global_init_cb,
|
||||
.thread_create_cb = thread_create_cb,
|
||||
.conn_create_cb = conn_create_cb,
|
||||
.conn_destroy_cb = conn_destroy_cb,
|
||||
.conn_send_cb = conn_send_cb,
|
||||
.conn_recv_cb = conn_recv_cb };
|
||||
|
||||
struct ppd_mod_info *
|
||||
PPD_GET_MOD_INFO_ID(void)
|
||||
{
|
||||
return &minfo;
|
||||
}
|
@ -1,346 +0,0 @@
|
||||
#include "google/protobuf/stubs/common.h"
|
||||
#include "options.h"
|
||||
#include <msg.pb.h>
|
||||
#include <stdint.h>
|
||||
#include <sstream>
|
||||
#include <sys/_stdint.h>
|
||||
#include <unistd.h>
|
||||
#include <util.h>
|
||||
|
||||
#ifdef WITH_ROCKSDB
|
||||
#include <rocksdb/db.h>
|
||||
#endif
|
||||
|
||||
#include "reqgen.h"
|
||||
////////////////
|
||||
// TOUCH Generator
|
||||
////////////////
|
||||
|
||||
touch_gen::touch_gen(const int conn_id, std::unordered_map<std::string, std::string>* args) : req_gen(conn_id)
|
||||
{
|
||||
this->ugen = createGenerator("uniform:100");
|
||||
if (this->ugen == NULL) {
|
||||
E("Failed to create ugen for touch_gen\n");
|
||||
}
|
||||
|
||||
if (args->find(PARAM_GEN) == args->end()) {
|
||||
this->wgen = createGenerator(PARAM_GEN_DEFAULT);
|
||||
} else {
|
||||
this->wgen = createGenerator(args->at(PARAM_GEN));
|
||||
}
|
||||
|
||||
if (this->wgen == NULL) {
|
||||
E("Failed to create wgen for touch_gen\n");
|
||||
}
|
||||
|
||||
if (args->find(PARAM_UPDATE) == args->end()) {
|
||||
this->update_ratio = PARAM_UPDATE_DEFAULT;
|
||||
} else {
|
||||
this->update_ratio = atoi(args->at(PARAM_UPDATE).c_str());
|
||||
}
|
||||
}
|
||||
|
||||
touch_gen::~touch_gen()
|
||||
{
|
||||
delete wgen;
|
||||
delete ugen;
|
||||
}
|
||||
|
||||
int touch_gen::send_req(int fd)
|
||||
{
|
||||
ppd_touch_req req;
|
||||
|
||||
if (options.master_mode) {
|
||||
req.set_touch_cnt(0);
|
||||
} else {
|
||||
req.set_touch_cnt(this->wgen->generate());
|
||||
}
|
||||
|
||||
if (this->ugen->generate() < this->update_ratio) {
|
||||
req.set_inc(1);
|
||||
} else {
|
||||
req.set_inc(0);
|
||||
}
|
||||
|
||||
if (!req.SerializeToArray(this->send_buf, MAX_SEND_BUF_SIZE)) {
|
||||
E("Failed to serialize to array for fd %d", fd);
|
||||
}
|
||||
|
||||
return writemsg(fd, this->send_buf, this->MAX_SEND_BUF_SIZE, this->send_buf, req.ByteSizeLong());
|
||||
}
|
||||
|
||||
int touch_gen::read_resp(int fd)
|
||||
{
|
||||
ppd_touch_resp resp;
|
||||
struct ppd_msg * msg = (struct ppd_msg *)this->send_buf;
|
||||
|
||||
if (readmsg(fd, this->send_buf, MAX_SEND_BUF_SIZE) < 0) {
|
||||
E("Readbuf failed for fd %d\n", fd);
|
||||
}
|
||||
|
||||
resp.ParseFromArray(msg->payload, msg->size);
|
||||
|
||||
return resp.status();
|
||||
}
|
||||
|
||||
////////////////
|
||||
// ECHO Generator
|
||||
////////////////
|
||||
int echo_gen::delay_table[DT_SZ];
|
||||
std::atomic<int> echo_gen::delay_table_populated = ATOMIC_VAR_INIT(0);
|
||||
|
||||
void
|
||||
echo_gen::populate_delay_table()
|
||||
{
|
||||
int idx = 0;
|
||||
int expected = 0;
|
||||
|
||||
// hack
|
||||
if (echo_gen::DT_SZ != 100) {
|
||||
E("Delay table size isn't 100");
|
||||
}
|
||||
|
||||
/* 95 + 4 + 1 = 100 */
|
||||
if (!delay_table_populated.compare_exchange_weak(expected, 1)) {
|
||||
return;
|
||||
}
|
||||
|
||||
delay_table[idx++] = 200;
|
||||
|
||||
for(int i = 0; i < 4; i++) {
|
||||
delay_table[idx++] = 50;
|
||||
}
|
||||
|
||||
for(int i = 0; i < 95; i++) {
|
||||
delay_table[idx++] = 10;
|
||||
}
|
||||
}
|
||||
|
||||
echo_gen::echo_gen(const int conn_id, std::unordered_map<std::string, std::string>* args) : req_gen(conn_id)
|
||||
{
|
||||
|
||||
if (args->find(PARAM_GEN) == args->end()) {
|
||||
this->wgen = createGenerator(PARAM_GEN_DEFAULT);
|
||||
} else {
|
||||
this->wgen = createGenerator(args->at(PARAM_GEN));
|
||||
}
|
||||
|
||||
if (this->wgen == NULL) {
|
||||
E("Failed to create wgen for echo_gen");
|
||||
}
|
||||
|
||||
if (args->find(PARAM_CDELAY) == args->end()) {
|
||||
this->cdelay = PARAM_CDELAY_DEFAULT;
|
||||
} else {
|
||||
this->cdelay = atoi(args->at(PARAM_CDELAY).c_str());
|
||||
}
|
||||
|
||||
if (args->find(PARAM_SIZE) == args->end()) {
|
||||
this->data_sz = PARAM_SIZE_DEFAULT;
|
||||
} else {
|
||||
this->data_sz = atoi(args->at(PARAM_SIZE).c_str());
|
||||
}
|
||||
|
||||
if (this->cdelay) {
|
||||
populate_delay_table();
|
||||
}
|
||||
}
|
||||
|
||||
echo_gen::~echo_gen()
|
||||
{
|
||||
delete wgen;
|
||||
}
|
||||
|
||||
int echo_gen::get_delay()
|
||||
{
|
||||
if (cdelay) {
|
||||
return delay_table[conn_id % DT_SZ];
|
||||
} else {
|
||||
return this->wgen->generate();
|
||||
}
|
||||
}
|
||||
|
||||
int echo_gen::send_req(int fd)
|
||||
{
|
||||
ppd_echo_req req;
|
||||
|
||||
if (options.master_mode) {
|
||||
req.set_enable_delay(0);
|
||||
req.set_data_size(PARAM_SIZE_DEFAULT);
|
||||
} else {
|
||||
req.set_enable_delay(get_delay());
|
||||
req.set_data_size(this->data_sz);
|
||||
}
|
||||
|
||||
if (!req.SerializeToArray(this->send_buf, MAX_SEND_BUF_SIZE)) {
|
||||
E("Failed to serialize to array for fd %d\n", fd);
|
||||
}
|
||||
this->send_sz = req.ByteSizeLong() + sizeof(struct ppd_msg);
|
||||
return writemsg(fd, this->send_buf, MAX_SEND_BUF_SIZE, this->send_buf, req.ByteSizeLong());
|
||||
}
|
||||
|
||||
int echo_gen::read_resp(int fd)
|
||||
{
|
||||
ppd_echo_resp resp;
|
||||
struct ppd_msg * msg = (struct ppd_msg *)this->send_buf;
|
||||
|
||||
if (readmsg(fd, this->send_buf, MAX_SEND_BUF_SIZE) < 0) {
|
||||
E("Readbuf failed for fd %d\n", fd);
|
||||
}
|
||||
|
||||
resp.ParseFromArray(msg->payload, msg->size);
|
||||
this->recv_sz = msg->size + sizeof(struct ppd_msg);
|
||||
return resp.status();
|
||||
}
|
||||
|
||||
int echo_gen::get_send_sz()
|
||||
{
|
||||
return this->send_sz;
|
||||
}
|
||||
|
||||
int echo_gen::get_recv_sz()
|
||||
{
|
||||
return this->recv_sz;
|
||||
}
|
||||
|
||||
|
||||
////////////////
|
||||
// HTTP Generator
|
||||
////////////////
|
||||
char http_gen::cons_buf[CONS_SZ];
|
||||
|
||||
http_gen::http_gen(const int conn_id, const std::string& host, std::unordered_map<std::string, std::string>* args) : req_gen(conn_id)
|
||||
{
|
||||
// hack
|
||||
method = "GET";
|
||||
headers.insert({"Host", host});
|
||||
headers.insert({"Connection", "keep-alive"});
|
||||
major_ver = 1;
|
||||
minor_ver = 1;
|
||||
uri = "/";
|
||||
}
|
||||
|
||||
http_gen::~http_gen()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
std::string
|
||||
http_gen::build_req()
|
||||
{
|
||||
std::stringstream ss;
|
||||
|
||||
ss << method << ' ' \
|
||||
<< uri << ' ' \
|
||||
<< "HTTP/" + std::to_string(major_ver) + "." + std::to_string(minor_ver) \
|
||||
<< "\r\n";
|
||||
|
||||
for(auto &i : headers) {
|
||||
ss << i.first.c_str() << ": " << i.second.c_str() << "\r\n";
|
||||
}
|
||||
|
||||
ss << "\r\n";
|
||||
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
int http_gen::send_req(int fd)
|
||||
{
|
||||
std::string req = build_req();
|
||||
//V("Sending Request: %s\n", req.c_str());
|
||||
return writebuf(fd, (void*)req.c_str(), req.length());
|
||||
}
|
||||
|
||||
int http_gen::read_resp(int fd)
|
||||
{
|
||||
// hack
|
||||
// consume everything
|
||||
return read(fd, cons_buf, CONS_SZ);;
|
||||
}
|
||||
|
||||
#ifdef WITH_ROCKSDB
|
||||
////////////////
|
||||
// RDB Generator
|
||||
////////////////
|
||||
|
||||
rdb_gen::rdb_gen(const int conn_id, std::unordered_map<std::string, std::string>* args) : req_gen(conn_id), rand(1000 + conn_id)
|
||||
{
|
||||
this->key = AllocateKey(&this->key_guard, KEY_SIZE);
|
||||
std::vector<double> ratio {GET_RATIO, PUT_RATIO, SEEK_RATIO};
|
||||
this->query.Initiate(ratio);
|
||||
gen_exp.InitiateExpDistribution(TOTAL_KEYS, KEYRANGE_DIST_A, KEYRANGE_DIST_B, KEYRANGE_DIST_C, KEYRANGE_DIST_D);
|
||||
}
|
||||
|
||||
rdb_gen::~rdb_gen()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
int rdb_gen::send_req(int fd)
|
||||
{
|
||||
int status;
|
||||
ppd_rdb_req req;
|
||||
|
||||
int64_t ini_rand = GetRandomKey(&this->rand);
|
||||
int64_t key_rand = gen_exp.DistGetKeyID(ini_rand, this->KEYRANGE_DIST_A, this->KEYRANGE_DIST_B);
|
||||
|
||||
int64_t rand_v = ini_rand % TOTAL_KEYS;
|
||||
double u = (double)rand_v / TOTAL_KEYS;
|
||||
|
||||
int query_type = options.master_mode ? 0 : query.GetType(rand_v);
|
||||
|
||||
GenerateKeyFromInt(key_rand, TOTAL_KEYS, KEY_SIZE, &this->key);
|
||||
|
||||
V("SENDING KEY: %s.\n", this->key.data());
|
||||
switch (query_type) {
|
||||
case 0: {
|
||||
// get query
|
||||
req.set_op(PPD_RDB_OP_GET);
|
||||
req.set_key(this->key.data(), this->key.size());
|
||||
break;
|
||||
}
|
||||
case 1: {
|
||||
// put query
|
||||
int val_sz = ParetoCdfInversion(u, VALUE_THETA, VALUE_K, VALUE_SIGMA);
|
||||
rocksdb::Slice val = gen.Generate((unsigned int)val_sz);
|
||||
req.set_op(PPD_RDB_OP_PUT);
|
||||
req.set_key(this->key.data(), this->key.size());
|
||||
req.set_val(val.data(), val.size());
|
||||
break;
|
||||
}
|
||||
case 2: {
|
||||
// seek query
|
||||
int64_t scan_length = ParetoCdfInversion(u, ITER_THETA, ITER_K, ITER_SIGMA);
|
||||
req.set_op(PPD_RDB_OP_SEEK);
|
||||
req.set_key(this->key.data(), this->key.size());
|
||||
req.set_optarg(scan_length);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
E("Unsupported query type %d", query_type);
|
||||
}
|
||||
}
|
||||
|
||||
if (!req.SerializeToArray(this->send_buf, MAX_SEND_BUF_SIZE)) {
|
||||
E("Failed to serialize protobuf");
|
||||
}
|
||||
|
||||
status = writemsg(fd, this->send_buf, MAX_SEND_BUF_SIZE, this->send_buf, req.ByteSizeLong());
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
int rdb_gen::read_resp(int fd)
|
||||
{
|
||||
ppd_rdb_resp resp;
|
||||
struct ppd_msg * msg = (struct ppd_msg *)this->send_buf;
|
||||
|
||||
if (readmsg(fd, this->send_buf, MAX_SEND_BUF_SIZE) < 0) {
|
||||
E("Readbuf failed for fd %d", fd);
|
||||
}
|
||||
|
||||
resp.ParseFromArray(msg->payload, msg->size);
|
||||
|
||||
return resp.status();
|
||||
}
|
||||
#endif
|
622
modules/reqgen.h
622
modules/reqgen.h
@ -1,622 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <cstdlib>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include <random>
|
||||
#ifdef WITH_ROCKSDB
|
||||
#include <rocksdb/db.h>
|
||||
#endif
|
||||
#include "Generator.h"
|
||||
#include "options.h"
|
||||
#include <msg.pb.h>
|
||||
|
||||
#define DISABLE_EVIL_CONSTRUCTORS(name) \
|
||||
name(const name&) = delete; \
|
||||
void operator=(const name) = delete
|
||||
|
||||
class req_gen {
|
||||
protected:
|
||||
const int conn_id;
|
||||
char * send_buf;
|
||||
constexpr static int MAX_SEND_BUF_SIZE = 1024 * 1024;
|
||||
public:
|
||||
req_gen(const int id) : conn_id(id) { this->send_buf = new char[MAX_SEND_BUF_SIZE]; };
|
||||
virtual ~req_gen() { delete[] send_buf; };
|
||||
virtual int send_req(int fd) = 0;
|
||||
virtual int read_resp(int fd) = 0;
|
||||
};
|
||||
|
||||
class touch_gen : public req_gen
|
||||
{
|
||||
private:
|
||||
static constexpr const char* PARAM_GEN = "GEN";
|
||||
static constexpr const char* PARAM_GEN_DEFAULT = "fixed:64";
|
||||
static constexpr const char* PARAM_UPDATE = "UPDATE";
|
||||
static constexpr const int PARAM_UPDATE_DEFAULT = 0;
|
||||
Generator *wgen;
|
||||
Generator *ugen;
|
||||
int update_ratio;
|
||||
public:
|
||||
touch_gen(const int conn_id, std::unordered_map<std::string, std::string>* args);
|
||||
touch_gen() = delete;
|
||||
~touch_gen();
|
||||
DISABLE_EVIL_CONSTRUCTORS(touch_gen);
|
||||
int send_req(int fd);
|
||||
int read_resp(int fd);
|
||||
};
|
||||
|
||||
class echo_gen : public req_gen
|
||||
{
|
||||
private:
|
||||
static constexpr const char* PARAM_GEN = "GEN";
|
||||
static constexpr const char* PARAM_GEN_DEFAULT = "fixed:0";
|
||||
static constexpr const char* PARAM_CDELAY = "CDELAY";
|
||||
static constexpr const char* PARAM_SIZE = "SIZE";
|
||||
static constexpr const int PARAM_SIZE_DEFAULT = 0;
|
||||
static constexpr const int PARAM_CDELAY_DEFAULT = 0;
|
||||
static constexpr const int DT_SZ = 100;
|
||||
static int delay_table[DT_SZ];
|
||||
static void populate_delay_table();
|
||||
static std::atomic<int> delay_table_populated;
|
||||
|
||||
Generator *wgen;
|
||||
int cdelay;
|
||||
int data_sz;
|
||||
int recv_sz;
|
||||
int send_sz;
|
||||
int get_delay();
|
||||
public:
|
||||
echo_gen(const int conn_id, std::unordered_map<std::string, std::string>* args);
|
||||
echo_gen() = delete;
|
||||
~echo_gen();
|
||||
DISABLE_EVIL_CONSTRUCTORS(echo_gen);
|
||||
int send_req(int fd);
|
||||
int read_resp(int fd);
|
||||
int get_send_sz();
|
||||
int get_recv_sz();
|
||||
};
|
||||
|
||||
class http_gen : public req_gen
|
||||
{
|
||||
private:
|
||||
std::string build_req();
|
||||
std::string method;
|
||||
std::unordered_map<std::string, std::string> headers;
|
||||
std::string uri;
|
||||
int major_ver;
|
||||
int minor_ver;
|
||||
static constexpr const int CONS_SZ = 1024 * 1024 * 4;
|
||||
static char cons_buf[CONS_SZ];
|
||||
public:
|
||||
http_gen(const int conn_id, const std::string& host, std::unordered_map<std::string, std::string>* args);
|
||||
http_gen() = delete;
|
||||
~http_gen();
|
||||
DISABLE_EVIL_CONSTRUCTORS(http_gen);
|
||||
int send_req(int fd);
|
||||
int read_resp(int fd);
|
||||
};
|
||||
|
||||
#ifdef WITH_ROCKSDB
|
||||
class rdb_gen : public req_gen
|
||||
{
|
||||
private:
|
||||
enum DistributionType : unsigned char {
|
||||
kFixed = 0,
|
||||
kUniform,
|
||||
kNormal
|
||||
};
|
||||
|
||||
constexpr static int64_t TOTAL_KEYS = 50000000;
|
||||
constexpr static double GET_RATIO = 0.83;
|
||||
constexpr static double PUT_RATIO = 0.14;
|
||||
constexpr static double SEEK_RATIO = 0.03;
|
||||
constexpr static int KEYRANGE_NUM = 30;
|
||||
constexpr static double KEYRANGE_DIST_A = 14.18;
|
||||
constexpr static double KEYRANGE_DIST_B = -2.917;
|
||||
constexpr static double KEYRANGE_DIST_C = 0.0164;
|
||||
constexpr static double KEYRANGE_DIST_D = -0.08082;
|
||||
constexpr static double VALUE_THETA = 0;
|
||||
constexpr static double VALUE_K = 0.2615;
|
||||
constexpr static double VALUE_SIGMA = 25.45;
|
||||
constexpr static int VALUESIZE_MIN = 100;
|
||||
constexpr static int VALUESIZE_MAX = 102400;
|
||||
constexpr static DistributionType VALUESIZE_DIST = kFixed;
|
||||
constexpr static int KEY_SIZE = 48;
|
||||
constexpr static double ITER_THETA = 0;
|
||||
constexpr static double ITER_K = 2.517;
|
||||
constexpr static double ITER_SIGMA = 14.236;
|
||||
constexpr static double READ_RANDOM_EXP_RANGE = 0.0;
|
||||
constexpr static bool IS_LITTLE_ENDIAN = true;
|
||||
constexpr static int FIXED_VALUE_SIZE = 100;
|
||||
// A good 64-bit random number generator based on std::mt19937_64
|
||||
class Random64 {
|
||||
private:
|
||||
std::mt19937_64 generator_;
|
||||
|
||||
public:
|
||||
explicit Random64(uint64_t s) : generator_(s) { }
|
||||
|
||||
// Generates the next random number
|
||||
uint64_t Next() { return generator_(); }
|
||||
|
||||
// Returns a uniformly distributed value in the range [0..n-1]
|
||||
// REQUIRES: n > 0
|
||||
uint64_t Uniform(uint64_t n) {
|
||||
return std::uniform_int_distribution<uint64_t>(0, n - 1)(generator_);
|
||||
}
|
||||
|
||||
// Randomly returns true ~"1/n" of the time, and false otherwise.
|
||||
// REQUIRES: n > 0
|
||||
bool OneIn(uint64_t n) { return Uniform(n) == 0; }
|
||||
|
||||
// Skewed: pick "base" uniformly from range [0,max_log] and then
|
||||
// return "base" random bits. The effect is to pick a number in the
|
||||
// range [0,2^max_log-1] with exponential bias towards smaller numbers.
|
||||
uint64_t Skewed(int max_log) {
|
||||
return Uniform(uint64_t(1) << Uniform(max_log + 1));
|
||||
}
|
||||
};
|
||||
|
||||
struct KeyrangeUnit {
|
||||
int64_t keyrange_start;
|
||||
int64_t keyrange_access;
|
||||
int64_t keyrange_keys;
|
||||
};
|
||||
|
||||
class GenerateTwoTermExpKeys {
|
||||
public:
|
||||
int64_t keyrange_rand_max_;
|
||||
int64_t keyrange_size_;
|
||||
int64_t keyrange_num_;
|
||||
bool initiated_;
|
||||
std::vector<KeyrangeUnit> keyrange_set_;
|
||||
|
||||
GenerateTwoTermExpKeys() {
|
||||
keyrange_rand_max_ = TOTAL_KEYS;
|
||||
initiated_ = false;
|
||||
}
|
||||
|
||||
~GenerateTwoTermExpKeys() {}
|
||||
|
||||
// Initiate the KeyrangeUnit vector and calculate the size of each
|
||||
// KeyrangeUnit.
|
||||
rocksdb::Status InitiateExpDistribution(int64_t total_keys, double prefix_a,
|
||||
double prefix_b, double prefix_c,
|
||||
double prefix_d) {
|
||||
int64_t amplify = 0;
|
||||
int64_t keyrange_start = 0;
|
||||
initiated_ = true;
|
||||
if (KEYRANGE_NUM <= 0) {
|
||||
keyrange_num_ = 1;
|
||||
} else {
|
||||
keyrange_num_ = KEYRANGE_NUM;
|
||||
}
|
||||
keyrange_size_ = total_keys / keyrange_num_;
|
||||
|
||||
// Calculate the key-range shares size based on the input parameters
|
||||
for (int64_t pfx = keyrange_num_; pfx >= 1; pfx--) {
|
||||
// Step 1. Calculate the probability that this key range will be
|
||||
// accessed in a query. It is based on the two-term expoential
|
||||
// distribution
|
||||
double keyrange_p = prefix_a * std::exp(prefix_b * pfx) +
|
||||
prefix_c * std::exp(prefix_d * pfx);
|
||||
if (keyrange_p < std::pow(10.0, -16.0)) {
|
||||
keyrange_p = 0.0;
|
||||
}
|
||||
// Step 2. Calculate the amplify
|
||||
// In order to allocate a query to a key-range based on the random
|
||||
// number generated for this query, we need to extend the probability
|
||||
// of each key range from [0,1] to [0, amplify]. Amplify is calculated
|
||||
// by 1/(smallest key-range probability). In this way, we ensure that
|
||||
// all key-ranges are assigned with an Integer that >=0
|
||||
if (amplify == 0 && keyrange_p > 0) {
|
||||
amplify = static_cast<int64_t>(std::floor(1 / keyrange_p)) + 1;
|
||||
}
|
||||
|
||||
// Step 3. For each key-range, we calculate its position in the
|
||||
// [0, amplify] range, including the start, the size (keyrange_access)
|
||||
KeyrangeUnit p_unit;
|
||||
p_unit.keyrange_start = keyrange_start;
|
||||
if (0.0 >= keyrange_p) {
|
||||
p_unit.keyrange_access = 0;
|
||||
} else {
|
||||
p_unit.keyrange_access =
|
||||
static_cast<int64_t>(std::floor(amplify * keyrange_p));
|
||||
}
|
||||
p_unit.keyrange_keys = keyrange_size_;
|
||||
keyrange_set_.push_back(p_unit);
|
||||
keyrange_start += p_unit.keyrange_access;
|
||||
}
|
||||
keyrange_rand_max_ = keyrange_start;
|
||||
|
||||
// Step 4. Shuffle the key-ranges randomly
|
||||
// Since the access probability is calculated from small to large,
|
||||
// If we do not re-allocate them, hot key-ranges are always at the end
|
||||
// and cold key-ranges are at the begin of the key space. Therefore, the
|
||||
// key-ranges are shuffled and the rand seed is only decide by the
|
||||
// key-range hotness distribution. With the same distribution parameters
|
||||
// the shuffle results are the same.
|
||||
Random64 rand_loca(keyrange_rand_max_);
|
||||
for (int64_t i = 0; i < KEYRANGE_NUM; i++) {
|
||||
int64_t pos = rand_loca.Next() % KEYRANGE_NUM;
|
||||
assert(i >= 0 && i < static_cast<int64_t>(keyrange_set_.size()) &&
|
||||
pos >= 0 && pos < static_cast<int64_t>(keyrange_set_.size()));
|
||||
std::swap(keyrange_set_[i], keyrange_set_[pos]);
|
||||
}
|
||||
|
||||
// Step 5. Recalculate the prefix start postion after shuffling
|
||||
int64_t offset = 0;
|
||||
for (auto& p_unit : keyrange_set_) {
|
||||
p_unit.keyrange_start = offset;
|
||||
offset += p_unit.keyrange_access;
|
||||
}
|
||||
|
||||
return rocksdb::Status::OK();
|
||||
}
|
||||
|
||||
// Generate the Key ID according to the input ini_rand and key distribution
|
||||
int64_t DistGetKeyID(int64_t ini_rand, double key_dist_a,
|
||||
double key_dist_b) {
|
||||
int64_t keyrange_rand = ini_rand % keyrange_rand_max_;
|
||||
|
||||
// Calculate and select one key-range that contains the new key
|
||||
int64_t start = 0, end = static_cast<int64_t>(keyrange_set_.size());
|
||||
while (start + 1 < end) {
|
||||
int64_t mid = start + (end - start) / 2;
|
||||
assert(mid >= 0 && mid < static_cast<int64_t>(keyrange_set_.size()));
|
||||
if (keyrange_rand < keyrange_set_[mid].keyrange_start) {
|
||||
end = mid;
|
||||
} else {
|
||||
start = mid;
|
||||
}
|
||||
}
|
||||
int64_t keyrange_id = start;
|
||||
|
||||
// Select one key in the key-range and compose the keyID
|
||||
int64_t key_offset = 0, key_seed;
|
||||
if (key_dist_a == 0.0 && key_dist_b == 0.0) {
|
||||
key_offset = ini_rand % keyrange_size_;
|
||||
} else {
|
||||
key_seed = static_cast<int64_t>(
|
||||
ceil(std::pow((ini_rand / key_dist_a), (1 / key_dist_b))));
|
||||
Random64 rand_key(key_seed);
|
||||
key_offset = static_cast<int64_t>(rand_key.Next()) % keyrange_size_;
|
||||
}
|
||||
return keyrange_size_ * keyrange_id + key_offset;
|
||||
}
|
||||
};
|
||||
|
||||
// Decide the ratio of different query types
|
||||
// 0 Get, 1 Put, 2 Seek, 3 SeekForPrev, 4 Delete, 5 SingleDelete, 6 merge
|
||||
class QueryDecider {
|
||||
public:
|
||||
std::vector<int> type_;
|
||||
std::vector<double> ratio_;
|
||||
int range_;
|
||||
|
||||
QueryDecider() {}
|
||||
~QueryDecider() {}
|
||||
|
||||
rocksdb::Status Initiate(std::vector<double> ratio_input) {
|
||||
int range_max = 1000;
|
||||
double sum = 0.0;
|
||||
for (auto& ratio : ratio_input) {
|
||||
sum += ratio;
|
||||
}
|
||||
range_ = 0;
|
||||
for (auto& ratio : ratio_input) {
|
||||
range_ += static_cast<int>(ceil(range_max * (ratio / sum)));
|
||||
type_.push_back(range_);
|
||||
ratio_.push_back(ratio / sum);
|
||||
}
|
||||
return rocksdb::Status::OK();
|
||||
}
|
||||
|
||||
int GetType(int64_t rand_num) {
|
||||
if (rand_num < 0) {
|
||||
rand_num = rand_num * (-1);
|
||||
}
|
||||
assert(range_ != 0);
|
||||
int pos = static_cast<int>(rand_num % range_);
|
||||
for (int i = 0; i < static_cast<int>(type_.size()); i++) {
|
||||
if (pos < type_[i]) {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
|
||||
class BaseDistribution {
|
||||
public:
|
||||
BaseDistribution(unsigned int _min, unsigned int _max)
|
||||
: min_value_size_(_min), max_value_size_(_max) {}
|
||||
virtual ~BaseDistribution() {}
|
||||
|
||||
unsigned int Generate() {
|
||||
auto val = Get();
|
||||
if (NeedTruncate()) {
|
||||
val = std::max(min_value_size_, val);
|
||||
val = std::min(max_value_size_, val);
|
||||
}
|
||||
return val;
|
||||
}
|
||||
private:
|
||||
virtual unsigned int Get() = 0;
|
||||
virtual bool NeedTruncate() {
|
||||
return true;
|
||||
}
|
||||
unsigned int min_value_size_;
|
||||
unsigned int max_value_size_;
|
||||
};
|
||||
|
||||
|
||||
class FixedDistribution : public BaseDistribution
|
||||
{
|
||||
public:
|
||||
FixedDistribution(unsigned int size) :
|
||||
BaseDistribution(size, size),
|
||||
size_(size) {}
|
||||
private:
|
||||
virtual unsigned int Get() override {
|
||||
return size_;
|
||||
}
|
||||
virtual bool NeedTruncate() override {
|
||||
return false;
|
||||
}
|
||||
unsigned int size_;
|
||||
};
|
||||
|
||||
class UniformDistribution
|
||||
: public BaseDistribution,
|
||||
public std::uniform_int_distribution<unsigned int> {
|
||||
public:
|
||||
UniformDistribution(unsigned int _min, unsigned int _max)
|
||||
: BaseDistribution(_min, _max),
|
||||
std::uniform_int_distribution<unsigned int>(_min, _max),
|
||||
gen_(rd_()) {}
|
||||
|
||||
private:
|
||||
virtual unsigned int Get() override {
|
||||
return (*this)(gen_);
|
||||
}
|
||||
virtual bool NeedTruncate() override {
|
||||
return false;
|
||||
}
|
||||
std::random_device rd_;
|
||||
std::mt19937 gen_;
|
||||
};
|
||||
|
||||
class NormalDistribution
|
||||
: public BaseDistribution, public std::normal_distribution<double> {
|
||||
public:
|
||||
NormalDistribution(unsigned int _min, unsigned int _max)
|
||||
: BaseDistribution(_min, _max),
|
||||
// 99.7% values within the range [min, max].
|
||||
std::normal_distribution<double>(
|
||||
(double)(_min + _max) / 2.0 /*mean*/,
|
||||
(double)(_max - _min) / 6.0 /*stddev*/),
|
||||
gen_(rd_()) {}
|
||||
|
||||
private:
|
||||
virtual unsigned int Get() override {
|
||||
return static_cast<unsigned int>((*this)(gen_));
|
||||
}
|
||||
std::random_device rd_;
|
||||
std::mt19937 gen_;
|
||||
};
|
||||
|
||||
|
||||
class Random {
|
||||
private:
|
||||
enum : uint32_t {
|
||||
M = 2147483647L // 2^31-1
|
||||
};
|
||||
enum : uint64_t {
|
||||
A = 16807 // bits 14, 8, 7, 5, 2, 1, 0
|
||||
};
|
||||
|
||||
uint32_t seed_;
|
||||
|
||||
static uint32_t GoodSeed(uint32_t s) { return (s & M) != 0 ? (s & M) : 1; }
|
||||
|
||||
public:
|
||||
// This is the largest value that can be returned from Next()
|
||||
enum : uint32_t { kMaxNext = M };
|
||||
|
||||
explicit Random(uint32_t s) : seed_(GoodSeed(s)) {}
|
||||
|
||||
void Reset(uint32_t s) { seed_ = GoodSeed(s); }
|
||||
|
||||
uint32_t Next() {
|
||||
// We are computing
|
||||
// seed_ = (seed_ * A) % M, where M = 2^31-1
|
||||
//
|
||||
// seed_ must not be zero or M, or else all subsequent computed values
|
||||
// will be zero or M respectively. For all other values, seed_ will end
|
||||
// up cycling through every number in [1,M-1]
|
||||
uint64_t product = seed_ * A;
|
||||
|
||||
// Compute (product % M) using the fact that ((x << 31) % M) == x.
|
||||
seed_ = static_cast<uint32_t>((product >> 31) + (product & M));
|
||||
// The first reduction may overflow by 1 bit, so we may need to
|
||||
// repeat. mod == M is not possible; using > allows the faster
|
||||
// sign-bit-based test.
|
||||
if (seed_ > M) {
|
||||
seed_ -= M;
|
||||
}
|
||||
return seed_;
|
||||
}
|
||||
|
||||
// Returns a uniformly distributed value in the range [0..n-1]
|
||||
// REQUIRES: n > 0
|
||||
uint32_t Uniform(int n) { return Next() % n; }
|
||||
|
||||
// Randomly returns true ~"1/n" of the time, and false otherwise.
|
||||
// REQUIRES: n > 0
|
||||
bool OneIn(int n) { return Uniform(n) == 0; }
|
||||
|
||||
// "Optional" one-in-n, where 0 or negative always returns false
|
||||
// (may or may not consume a random value)
|
||||
bool OneInOpt(int n) { return n > 0 && OneIn(n); }
|
||||
|
||||
// Returns random bool that is true for the given percentage of
|
||||
// calls on average. Zero or less is always false and 100 or more
|
||||
// is always true (may or may not consume a random value)
|
||||
bool PercentTrue(int percentage) {
|
||||
return static_cast<int>(Uniform(100)) < percentage;
|
||||
}
|
||||
|
||||
// Skewed: pick "base" uniformly from range [0,max_log] and then
|
||||
// return "base" random bits. The effect is to pick a number in the
|
||||
// range [0,2^max_log-1] with exponential bias towards smaller numbers.
|
||||
uint32_t Skewed(int max_log) {
|
||||
return Uniform(1 << Uniform(max_log + 1));
|
||||
}
|
||||
|
||||
// Returns a Random instance for use by the current thread without
|
||||
// additional locking
|
||||
static Random* GetTLSInstance();
|
||||
};
|
||||
|
||||
static rocksdb::Slice RandomString(Random* rnd, int len, std::string* dst) {
|
||||
dst->resize(len);
|
||||
for (int i = 0; i < len; i++) {
|
||||
(*dst)[i] = static_cast<char>(' ' + rnd->Uniform(95)); // ' ' .. '~'
|
||||
}
|
||||
return rocksdb::Slice(*dst);
|
||||
}
|
||||
|
||||
static rocksdb::Slice CompressibleString(Random* rnd, double compressed_fraction,
|
||||
int len, std::string* dst) {
|
||||
int raw = static_cast<int>(len * compressed_fraction);
|
||||
if (raw < 1) raw = 1;
|
||||
std::string raw_data;
|
||||
RandomString(rnd, raw, &raw_data);
|
||||
|
||||
// Duplicate the random data until we have filled "len" bytes
|
||||
dst->clear();
|
||||
while (dst->size() < (unsigned int)len) {
|
||||
dst->append(raw_data);
|
||||
}
|
||||
dst->resize(len);
|
||||
return rocksdb::Slice(*dst);
|
||||
}
|
||||
|
||||
class RandomGenerator {
|
||||
private:
|
||||
std::string data_;
|
||||
unsigned int pos_;
|
||||
std::unique_ptr<BaseDistribution> dist_;
|
||||
|
||||
public:
|
||||
|
||||
RandomGenerator() {
|
||||
auto max_value_size = VALUESIZE_MAX;
|
||||
switch (VALUESIZE_DIST) {
|
||||
case kUniform:
|
||||
dist_.reset(new UniformDistribution(VALUESIZE_MIN,
|
||||
VALUESIZE_MAX));
|
||||
break;
|
||||
case kNormal:
|
||||
dist_.reset(new NormalDistribution(VALUESIZE_MIN,
|
||||
VALUESIZE_MAX));
|
||||
break;
|
||||
case kFixed:
|
||||
default:
|
||||
dist_.reset(new FixedDistribution(FIXED_VALUE_SIZE));
|
||||
max_value_size = FIXED_VALUE_SIZE;
|
||||
}
|
||||
// We use a limited amount of data over and over again and ensure
|
||||
// that it is larger than the compression window (32KB), and also
|
||||
// large enough to serve all typical value sizes we want to write.
|
||||
Random rnd(301);
|
||||
std::string piece;
|
||||
while (data_.size() < (unsigned)std::max(1048576, max_value_size)) {
|
||||
// Add a short fragment that is as compressible as specified
|
||||
// by FLAGS_compression_ratio.
|
||||
CompressibleString(&rnd, 0.5, 100, &piece);
|
||||
data_.append(piece);
|
||||
}
|
||||
pos_ = 0;
|
||||
}
|
||||
|
||||
rocksdb::Slice Generate(unsigned int len) {
|
||||
assert(len <= data_.size());
|
||||
if (pos_ + len > data_.size()) {
|
||||
pos_ = 0;
|
||||
}
|
||||
pos_ += len;
|
||||
return rocksdb::Slice(data_.data() + pos_ - len, len);
|
||||
}
|
||||
|
||||
rocksdb::Slice Generate() {
|
||||
auto len = dist_->Generate();
|
||||
return Generate(len);
|
||||
}
|
||||
};
|
||||
|
||||
// The inverse function of Pareto distribution
|
||||
static int64_t ParetoCdfInversion(double u, double theta, double k, double sigma) {
|
||||
double ret;
|
||||
if (k == 0.0) {
|
||||
ret = theta - sigma * std::log(u);
|
||||
} else {
|
||||
ret = theta + sigma * (std::pow(u, -1 * k) - 1) / k;
|
||||
}
|
||||
return static_cast<int64_t>(ceil(ret));
|
||||
}
|
||||
|
||||
|
||||
static int64_t GetRandomKey(Random64* rand) {
|
||||
uint64_t rand_int = rand->Next();
|
||||
int64_t key_rand = 0;
|
||||
if (READ_RANDOM_EXP_RANGE == 0) {
|
||||
key_rand = rand_int % TOTAL_KEYS;
|
||||
}
|
||||
return key_rand;
|
||||
}
|
||||
|
||||
static rocksdb::Slice AllocateKey(std::unique_ptr<const char[]>* key_guard, int key_size_) {
|
||||
char* data = new char[key_size_];
|
||||
const char* const_data = data;
|
||||
key_guard->reset(const_data);
|
||||
return rocksdb::Slice(key_guard->get(), key_size_);
|
||||
}
|
||||
|
||||
static void GenerateKeyFromInt(uint64_t v, int64_t num_keys, int key_size_, rocksdb::Slice* key) {
|
||||
char* start = const_cast<char*>(key->data());
|
||||
char* pos = start;
|
||||
|
||||
int bytes_to_fill = std::min(key_size_ - static_cast<int>(pos - start), 8);
|
||||
if (IS_LITTLE_ENDIAN) {
|
||||
for (int i = 0; i < bytes_to_fill; ++i) {
|
||||
pos[i] = (v >> ((bytes_to_fill - i - 1) << 3)) & 0xFF;
|
||||
}
|
||||
} else {
|
||||
memcpy(pos, static_cast<void*>(&v), bytes_to_fill);
|
||||
}
|
||||
pos += bytes_to_fill;
|
||||
if (key_size_ > pos - start) {
|
||||
memset(pos, '0', key_size_ - (pos - start));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
GenerateTwoTermExpKeys gen_exp;
|
||||
QueryDecider query;
|
||||
Random64 rand;
|
||||
RandomGenerator gen;
|
||||
rocksdb::Slice key;
|
||||
std::unique_ptr<const char[]> key_guard;
|
||||
|
||||
public:
|
||||
rdb_gen(const int conn_id, std::unordered_map<std::string, std::string>* args);
|
||||
rdb_gen() = delete;
|
||||
~rdb_gen();
|
||||
DISABLE_EVIL_CONSTRUCTORS(rdb_gen);
|
||||
int send_req(int fd);
|
||||
int read_resp(int fd);
|
||||
};
|
||||
#endif
|
@ -1,323 +0,0 @@
|
||||
#include <arpa/inet.h>
|
||||
#include <atomic>
|
||||
#include <cassert>
|
||||
#include <cerrno>
|
||||
#include <cstdint>
|
||||
#include <cstdlib>
|
||||
#include <cstring>
|
||||
#include <new>
|
||||
#include <random>
|
||||
#include <stdint.h>
|
||||
#include <sstream>
|
||||
#include <sys/endian.h>
|
||||
#include <sys/param.h>
|
||||
#include <chrono>
|
||||
#include <unistd.h>
|
||||
#include "options.h"
|
||||
|
||||
#ifdef WITH_ROCKSDB
|
||||
#include "rocksdb/cache.h"
|
||||
#include "rocksdb/status.h"
|
||||
|
||||
#include <rocksdb/cache.h>
|
||||
#include <rocksdb/db.h>
|
||||
#include <rocksdb/env.h>
|
||||
#include <rocksdb/filter_policy.h>
|
||||
#include <rocksdb/memtablerep.h>
|
||||
#include <rocksdb/options.h>
|
||||
#include <rocksdb/perf_context.h>
|
||||
#include <rocksdb/persistent_cache.h>
|
||||
#include <rocksdb/rate_limiter.h>
|
||||
#include <rocksdb/slice.h>
|
||||
#include <rocksdb/slice_transform.h>
|
||||
#include <rocksdb/stats_history.h>
|
||||
#include <rocksdb/utilities/object_registry.h>
|
||||
#include <rocksdb/utilities/optimistic_transaction_db.h>
|
||||
#include <rocksdb/utilities/options_util.h>
|
||||
#include <rocksdb/utilities/sim_cache.h>
|
||||
#include <rocksdb/utilities/transaction.h>
|
||||
#include <rocksdb/utilities/transaction_db.h>
|
||||
#include <rocksdb/write_batch.h>
|
||||
#include <rocksdb/table.h>
|
||||
#endif
|
||||
|
||||
#include <msg.pb.h>
|
||||
#include <util.h>
|
||||
#include <const.h>
|
||||
|
||||
////////////////
|
||||
// TOUCH Generator
|
||||
////////////////
|
||||
touch_proc::touch_proc(std::unordered_map<std::string, std::string>* args) : req_proc()
|
||||
{
|
||||
if (args->find(PARAM_TBSZ) != args->end()) {
|
||||
this->buffer_sz = atoi(args->at(PARAM_TBSZ).c_str());
|
||||
} else {
|
||||
this->buffer_sz = PARAM_TBSZ_DEFAULT;
|
||||
}
|
||||
|
||||
V("Allocating %d items x %d CASZ for connnection\n", this->buffer_sz, CACHE_LINE_SIZE);
|
||||
this->buffer = (struct ppd_touch_cache_item *)aligned_alloc(CACHE_LINE_SIZE, this->buffer_sz * sizeof(struct ppd_touch_cache_item));
|
||||
|
||||
this->rgen = new std::default_random_engine();
|
||||
this->rdist = new std::uniform_int_distribution<int>(0, INT32_MAX);
|
||||
}
|
||||
|
||||
touch_proc::~touch_proc()
|
||||
{
|
||||
delete this->buffer;
|
||||
delete this->rgen;
|
||||
delete this->rdist;
|
||||
}
|
||||
|
||||
int touch_proc::proc_req(int fd)
|
||||
{
|
||||
ppd_touch_req req;
|
||||
ppd_touch_resp resp;
|
||||
struct ppd_msg *msg = (struct ppd_msg *)this->read_buf;
|
||||
|
||||
int ret = readmsg(fd, this->read_buf, this->MAX_READ_BUF_SIZE);
|
||||
|
||||
if (ret < 0) {
|
||||
W("Readmsg failed with %d for connection %d\n", ret, fd);
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (!req.ParseFromArray(msg->payload, msg->size)) {
|
||||
W("ParseFromArray failed for connection %d\n", fd);
|
||||
return EINVAL;
|
||||
}
|
||||
|
||||
V("Conn %d touching %d items...\n", fd, req.touch_cnt());
|
||||
|
||||
int sum = 0;
|
||||
int rand = (*this->rdist)(*this->rgen);
|
||||
for(int64_t i = rand; i < rand + req.touch_cnt(); i++) {
|
||||
if (req.inc() > 0) {
|
||||
this->buffer[i % this->buffer_sz].val += 1;
|
||||
} else {
|
||||
/* only read */
|
||||
sum += this->buffer[i % this->buffer_sz].val;
|
||||
}
|
||||
}
|
||||
sum = sum + 1;
|
||||
|
||||
resp.set_status(0);
|
||||
if (!resp.SerializeToArray(this->read_buf, MAX_READ_BUF_SIZE)) {
|
||||
W("Couldn't searialize to array connection %d\n", fd);
|
||||
}
|
||||
|
||||
ret = writemsg(fd, this->read_buf, MAX_READ_BUF_SIZE, this->read_buf, resp.ByteSizeLong());
|
||||
if (ret < 0) {
|
||||
W("Writemsg failed with %d for connection %d\n", ret, fd);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
////////////////
|
||||
// ECHO Generator
|
||||
////////////////
|
||||
|
||||
echo_proc::echo_proc(std::unordered_map<std::string, std::string>*) : req_proc()
|
||||
{
|
||||
this->buf = new char[BUFFER_SZ];
|
||||
srand(time(NULL));
|
||||
for(size_t i = 0; i < BUFFER_SZ; i++) {
|
||||
char ch = (char)rand();
|
||||
this->buf[i] = ch;
|
||||
}
|
||||
}
|
||||
|
||||
echo_proc::~echo_proc()
|
||||
{
|
||||
delete this->buf;
|
||||
}
|
||||
|
||||
int echo_proc::proc_req(int fd)
|
||||
{
|
||||
uint64_t ms1, ms2;
|
||||
struct ppd_msg *msg = (struct ppd_msg *)this->read_buf;
|
||||
int ret = readmsg(fd, this->read_buf, MAX_READ_BUF_SIZE);
|
||||
|
||||
if (ret < 0) {
|
||||
W("Readbuf failed with %d for connection %d\n", ret, fd);
|
||||
return ret;
|
||||
}
|
||||
|
||||
ms1 = get_time_us();
|
||||
if (!req.ParseFromArray(msg->payload, msg->size)) {
|
||||
W("ParseFromArray failed for connection %d\n", fd);
|
||||
return EINVAL;
|
||||
}
|
||||
int data_sz = req.data_size();
|
||||
V("Connection %d delay %d us %d data size \n", fd, req.enable_delay(), data_sz);
|
||||
|
||||
if (req.enable_delay() > 0) {
|
||||
uint64_t server_delay = req.enable_delay();
|
||||
uint64_t now = get_time_us();
|
||||
while (get_time_us() - now <= server_delay) {};
|
||||
}
|
||||
|
||||
if (data_sz > (int)BUFFER_SZ) {
|
||||
data_sz = BUFFER_SZ;
|
||||
}
|
||||
if (data_sz <= 0) {
|
||||
data_sz = 1;
|
||||
}
|
||||
resp.set_data(this->buf, data_sz);
|
||||
resp.set_status(0);
|
||||
if (!resp.SerializeToArray(this->read_buf, MAX_READ_BUF_SIZE)) {
|
||||
W("Couldn't searialize to array connection %d\n", fd);
|
||||
}
|
||||
ms2 = get_time_us();
|
||||
|
||||
V("Serialization: TIME: %ld\n", ms2 - ms1);
|
||||
|
||||
ret = writemsg(fd, this->read_buf, MAX_READ_BUF_SIZE, this->read_buf, resp.ByteSizeLong());
|
||||
if (ret < 0) {
|
||||
W("Writemsg failed with %d for connection %d\n", ret, fd);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
#ifdef WITH_ROCKSDB
|
||||
////////////////
|
||||
// rdb Generator
|
||||
////////////////
|
||||
|
||||
rocksdb::DB * rdb_proc::db = nullptr;
|
||||
std::atomic<int> rdb_proc::db_init {0};
|
||||
|
||||
rdb_proc::rdb_proc(std::unordered_map<std::string, std::string>* args) : req_proc()
|
||||
{
|
||||
const char * db_path;
|
||||
int desired = 0;
|
||||
int target = 1;
|
||||
if (std::atomic_compare_exchange_strong(&rdb_proc::db_init, &desired, target)) {
|
||||
if (args->find(PARAM_PATH) != args->end()) {
|
||||
db_path = args->at(PARAM_PATH).c_str();
|
||||
} else {
|
||||
E("Must specify -OPATH for rocksdb.\n");
|
||||
}
|
||||
|
||||
V("Initializing rocksdb, path: %s.\n", db_path);
|
||||
|
||||
rocksdb::Options opt;
|
||||
std::shared_ptr<rocksdb::Cache> cache = rocksdb::NewLRUCache(CACHE_SIZE, 6, false, 0.0);
|
||||
opt.use_direct_io_for_flush_and_compaction = USE_DIRECT_IO_FOR_FLUSH_AND_COMPACTION;
|
||||
opt.use_direct_reads = USE_DIRECT_READS;
|
||||
|
||||
rocksdb::BlockBasedTableOptions block_based_options;
|
||||
block_based_options.index_type = rocksdb::BlockBasedTableOptions::kBinarySearch;
|
||||
block_based_options.block_cache = cache;
|
||||
opt.table_factory.reset(rocksdb::NewBlockBasedTableFactory(block_based_options));
|
||||
|
||||
opt.IncreaseParallelism(12);
|
||||
opt.OptimizeLevelStyleCompaction(1024 * 1024 * 1024);
|
||||
opt.OptimizeUniversalStyleCompaction(1024 * 1024 * 1024);
|
||||
opt.write_buffer_size = 1024 * 1024 * 1024;
|
||||
opt.create_if_missing = false;
|
||||
opt.compression = rocksdb::kNoCompression;
|
||||
|
||||
|
||||
rocksdb::Status s = rocksdb::DB::Open(opt, std::string(db_path), &this->db);
|
||||
if (!s.ok()) {
|
||||
E("Could not open rocksdb! Err %s\n", s.ToString().c_str());
|
||||
}
|
||||
|
||||
rdb_proc::db_init.store(2);
|
||||
V("Finished initializing rocksdb.\n");
|
||||
} else {
|
||||
V("Checking for rocksdb initialization...\n");
|
||||
while(rdb_proc::db_init.load() != 2) {};
|
||||
V("Detected initialized rocksdb.\n");
|
||||
}
|
||||
}
|
||||
|
||||
rdb_proc::~rdb_proc()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
int rdb_proc::proc_req(int fd)
|
||||
{
|
||||
ppd_rdb_resp resp;
|
||||
ppd_rdb_req req;
|
||||
rocksdb::Status s;
|
||||
struct ppd_msg *msg = (struct ppd_msg *)this->read_buf;
|
||||
|
||||
int i = 0;
|
||||
int status = readmsg(fd, this->read_buf, MAX_READ_BUF_SIZE);
|
||||
|
||||
if (status != 0) {
|
||||
W("Readmsg failed with %d for connection %d\n", status, fd);
|
||||
return status;
|
||||
}
|
||||
|
||||
if (!req.ParseFromArray(msg->payload, msg->size)) {
|
||||
W("ParseFromArray failed for connection %d\n", fd);
|
||||
return EINVAL;
|
||||
}
|
||||
|
||||
V("Connection %d op: %d, key: %s. val: %s. optarg: %d.\n", fd, req.op(), req.key().c_str(), req.val().c_str(), req.optarg());
|
||||
|
||||
switch (req.op()) {
|
||||
case PPD_RDB_OP_PUT:{
|
||||
s = this->db->Put(rocksdb::WriteOptions(), req.key(), req.val());
|
||||
resp.set_status(s.code());
|
||||
break;
|
||||
}
|
||||
case PPD_RDB_OP_GET: {
|
||||
std::string val;
|
||||
s = this->db->Get(rocksdb::ReadOptions(), req.key(), &val);
|
||||
if (s.ok()) {
|
||||
resp.set_result(val);
|
||||
}
|
||||
resp.set_status(s.code());
|
||||
break;
|
||||
}
|
||||
case PPD_RDB_OP_SEEK: {
|
||||
rocksdb::Slice val;
|
||||
rocksdb::Iterator *it = this->db->NewIterator(rocksdb::ReadOptions(false, true));
|
||||
|
||||
it->Seek(req.key());
|
||||
resp.set_status(it->status().code());
|
||||
|
||||
if (it->Valid()) {
|
||||
val = it->value();
|
||||
resp.set_result(val.data(), val.size());
|
||||
}
|
||||
|
||||
for(int64_t j = 0; j < req.optarg() && it->Valid(); j++) {
|
||||
rocksdb::Slice val = it->value();
|
||||
// do something about the key
|
||||
std::memcpy(this->read_buf, val.data(), MIN(val.size(), MAX_READ_BUF_SIZE));
|
||||
it->Next();
|
||||
if (!it->status().ok()) {
|
||||
resp.set_status(it->status().code());
|
||||
break;
|
||||
}
|
||||
}
|
||||
delete it;
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
W("Invalid opcode %d for connection %d\n", req.op(), fd);
|
||||
return EINVAL;
|
||||
}
|
||||
}
|
||||
|
||||
resp.set_status(0);
|
||||
|
||||
status = writemsg(fd, this->read_buf, MAX_READ_BUF_SIZE, this->read_buf, resp.ByteSizeLong());
|
||||
if (status < 0) {
|
||||
W("Writemsg failed with %d for connection %d\n", status, fd);
|
||||
}
|
||||
|
||||
|
||||
return status;
|
||||
}
|
||||
#endif
|
@ -1,85 +0,0 @@
|
||||
#pragma once
|
||||
#ifdef WITH_ROCKSDB
|
||||
#include <rocksdb/db.h>
|
||||
#endif
|
||||
|
||||
#include <cstdint>
|
||||
#include <string>
|
||||
#include <random>
|
||||
#include <unordered_map>
|
||||
#include <const.h>
|
||||
#include <atomic>
|
||||
#include <sys/param.h>
|
||||
|
||||
#include <msg.pb.h>
|
||||
|
||||
#define DISABLE_EVIL_CONSTRUCTORS(name) \
|
||||
name(const name&) = delete; \
|
||||
void operator=(const name) = delete
|
||||
|
||||
struct alignas(CACHE_LINE_SIZE) ppd_touch_cache_item {
|
||||
int val;
|
||||
};
|
||||
|
||||
class req_proc {
|
||||
protected:
|
||||
constexpr static int MAX_READ_BUF_SIZE = 1024 * 1024;
|
||||
char * read_buf;
|
||||
public:
|
||||
req_proc() {this->read_buf = new char[MAX_READ_BUF_SIZE]; };
|
||||
virtual ~req_proc() {delete[] this->read_buf;};
|
||||
virtual int proc_req(int fd) = 0;
|
||||
};
|
||||
|
||||
class touch_proc : public req_proc
|
||||
{
|
||||
private:
|
||||
int buffer_sz;
|
||||
std::default_random_engine * rgen;
|
||||
std::uniform_int_distribution<int> * rdist;
|
||||
struct ppd_touch_cache_item* buffer;
|
||||
static constexpr const char* PARAM_TBSZ = "ENTRIES";
|
||||
static constexpr const int PARAM_TBSZ_DEFAULT = 64;
|
||||
constexpr static int MAX_SZ = 1024 * 1024;
|
||||
public:
|
||||
touch_proc(std::unordered_map<std::string, std::string>* args);
|
||||
touch_proc() = delete;
|
||||
~touch_proc();
|
||||
DISABLE_EVIL_CONSTRUCTORS(touch_proc);
|
||||
int proc_req(int fd);
|
||||
};
|
||||
|
||||
class echo_proc : public req_proc
|
||||
{
|
||||
private:
|
||||
ppd_echo_req req;
|
||||
ppd_echo_resp resp;
|
||||
char * buf;
|
||||
static constexpr size_t BUFFER_SZ = 1024 * 1024;
|
||||
public:
|
||||
echo_proc(std::unordered_map<std::string, std::string>* args);
|
||||
echo_proc() = delete;
|
||||
~echo_proc();
|
||||
DISABLE_EVIL_CONSTRUCTORS(echo_proc);
|
||||
int proc_req(int fd);
|
||||
};
|
||||
|
||||
#ifdef WITH_ROCKSDB
|
||||
class rdb_proc : public req_proc
|
||||
{
|
||||
private:
|
||||
constexpr static bool USE_DIRECT_IO_FOR_FLUSH_AND_COMPACTION = true;
|
||||
constexpr static bool USE_DIRECT_READS = true;
|
||||
constexpr static int CACHE_SIZE = 268435456;
|
||||
constexpr static int MAX_MSG_SZ = 4096;
|
||||
static constexpr const char* PARAM_PATH = "PATH";
|
||||
static std::atomic<int> db_init;
|
||||
static rocksdb::DB *db;
|
||||
public:
|
||||
rdb_proc(std::unordered_map<std::string, std::string>* args);
|
||||
rdb_proc() = delete;
|
||||
~rdb_proc();
|
||||
DISABLE_EVIL_CONSTRUCTORS(rdb_proc);
|
||||
int proc_req(int fd);
|
||||
};
|
||||
#endif
|
Loading…
Reference in New Issue
Block a user