WIP, broken

This commit is contained in:
quackerd 2023-01-17 21:22:43 +01:00
parent e2f4842849
commit b2e7c471d0
53 changed files with 3533 additions and 1988 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

198
.clang-format Normal file
View File

@ -0,0 +1,198 @@
# $FreeBSD$
# Basic .clang-format
---
BasedOnStyle: WebKit
AlignAfterOpenBracket: DontAlign
AlignConsecutiveAssignments: false
AlignConsecutiveDeclarations: false
AlignEscapedNewlines: Left
AlignOperands: false
AlignTrailingComments: true
AllowAllArgumentsOnNextLine: false
AllowAllParametersOfDeclarationOnNextLine: false
AllowShortBlocksOnASingleLine: Never
AllowShortCaseLabelsOnASingleLine: false
AllowShortFunctionsOnASingleLine: InlineOnly
AllowShortIfStatementsOnASingleLine: Never
AllowShortLoopsOnASingleLine: false
AlwaysBreakAfterReturnType: TopLevelDefinitions
AlwaysBreakBeforeMultilineStrings: false
AlwaysBreakTemplateDeclarations: MultiLine
BinPackArguments: true
BinPackParameters: true
BreakBeforeBinaryOperators: None
BreakBeforeBraces: WebKit
BreakBeforeTernaryOperators: false
# TODO: BreakStringLiterals can cause very strange formatting so turn it off?
BreakStringLiterals: false
# Prefer:
# some_var = function(arg1,
# arg2)
# over:
# some_var =
# function(arg1, arg2)
PenaltyBreakAssignment: 100
# Prefer:
# some_long_function(arg1, arg2
# arg3)
# over:
# some_long_function(
# arg1, arg2, arg3)
PenaltyBreakBeforeFirstCallParameter: 100
CompactNamespaces: true
DerivePointerAlignment: false
DisableFormat: false
ForEachMacros:
- ARB_ARRFOREACH
- ARB_ARRFOREACH_REVWCOND
- ARB_ARRFOREACH_REVERSE
- ARB_FOREACH
- ARB_FOREACH_FROM
- ARB_FOREACH_SAFE
- ARB_FOREACH_REVERSE
- ARB_FOREACH_REVERSE_FROM
- ARB_FOREACH_REVERSE_SAFE
- BIT_FOREACH_ISCLR
- BIT_FOREACH_ISSET
- CPU_FOREACH
- CPU_FOREACH_ISCLR
- CPU_FOREACH_ISSET
- FOREACH_THREAD_IN_PROC
- FOREACH_PROC_IN_SYSTEM
- FOREACH_PRISON_CHILD
- FOREACH_PRISON_DESCENDANT
- FOREACH_PRISON_DESCENDANT_LOCKED
- FOREACH_PRISON_DESCENDANT_LOCKED_LEVEL
- MNT_VNODE_FOREACH_ALL
- MNT_VNODE_FOREACH_ACTIVE
- RB_FOREACH
- RB_FOREACH_FROM
- RB_FOREACH_SAFE
- RB_FOREACH_REVERSE
- RB_FOREACH_REVERSE_FROM
- RB_FOREACH_REVERSE_SAFE
- SLIST_FOREACH
- SLIST_FOREACH_FROM
- SLIST_FOREACH_FROM_SAFE
- SLIST_FOREACH_SAFE
- SLIST_FOREACH_PREVPTR
- SPLAY_FOREACH
- LIST_FOREACH
- LIST_FOREACH_FROM
- LIST_FOREACH_FROM_SAFE
- LIST_FOREACH_SAFE
- STAILQ_FOREACH
- STAILQ_FOREACH_FROM
- STAILQ_FOREACH_FROM_SAFE
- STAILQ_FOREACH_SAFE
- TAILQ_FOREACH
- TAILQ_FOREACH_FROM
- TAILQ_FOREACH_FROM_SAFE
- TAILQ_FOREACH_REVERSE
- TAILQ_FOREACH_REVERSE_FROM
- TAILQ_FOREACH_REVERSE_FROM_SAFE
- TAILQ_FOREACH_REVERSE_SAFE
- TAILQ_FOREACH_SAFE
- VM_MAP_ENTRY_FOREACH
- VM_PAGE_DUMP_FOREACH
IndentCaseLabels: false
IndentPPDirectives: None
Language: Cpp
NamespaceIndentation: None
PointerAlignment: Right
ContinuationIndentWidth: 4
IndentWidth: 8
TabWidth: 8
ColumnLimit: 100
UseTab: Always
SpaceAfterCStyleCast: false
IncludeBlocks: Regroup
IncludeCategories:
- Regex: '^\"opt_.*\.h\"'
Priority: 1
SortPriority: 10
- Regex: '^<sys/cdefs\.h>'
Priority: 2
SortPriority: 20
- Regex: '^<sys/types\.h>'
Priority: 2
SortPriority: 21
- Regex: '^<sys/param\.h>'
Priority: 2
SortPriority: 22
- Regex: '^<sys/systm\.h>'
Priority: 2
SortPriority: 23
- Regex: '^<sys.*/'
Priority: 2
SortPriority: 24
- Regex: '^<vm/vm\.h>'
Priority: 3
SortPriority: 30
- Regex: '^<vm/'
Priority: 3
SortPriority: 31
- Regex: '^<machine/'
Priority: 4
SortPriority: 40
- Regex: '^<(x86|amd64|i386|xen)/'
Priority: 5
SortPriority: 50
- Regex: '^<dev/'
Priority: 6
SortPriority: 60
- Regex: '^<net.*/'
Priority: 7
SortPriority: 70
- Regex: '^<protocols/'
Priority: 7
SortPriority: 71
- Regex: '^<(fs|nfs(|client|server)|ufs)/'
Priority: 8
SortPriority: 80
- Regex: '^<[^/].*\.h'
Priority: 9
SortPriority: 90
- Regex: '^\".*\.h\"'
Priority: 10
SortPriority: 100
# LLVM's header include ordering style is almost the exact opposite of ours.
# Unfortunately, they have hard-coded their preferences into clang-format.
# Clobbering this regular expression to avoid matching prevents non-system
# headers from being forcibly moved to the top of the include list.
# http://llvm.org/docs/CodingStandards.html#include-style
IncludeIsMainRegex: 'BLAH_DONT_MATCH_ANYTHING'
SortIncludes: true
KeepEmptyLinesAtTheStartOfBlocks: true
TypenameMacros:
- ARB_ELMTYPE
- ARB_HEAD
- ARB8_HEAD
- ARB16_HEAD
- ARB32_HEAD
- ARB_ENTRY
- ARB8_ENTRY
- ARB16_ENTRY
- ARB32_ENTRY
- LIST_CLASS_ENTRY
- LIST_CLASS_HEAD
- LIST_ENTRY
- LIST_HEAD
- QUEUE_TYPEOF
- RB_ENTRY
- RB_HEAD
- SLIST_CLASS_HEAD
- SLIST_CLASS_ENTRY
- SLIST_HEAD
- SLIST_ENTRY
- SMR_POINTER
- SPLAY_ENTRY
- SPLAY_HEAD
- STAILQ_CLASS_ENTRY
- STAILQ_CLASS_HEAD
- STAILQ_ENTRY
- STAILQ_HEAD
- TAILQ_CLASS_ENTRY
- TAILQ_CLASS_HEAD
- TAILQ_ENTRY
- TAILQ_HEAD

View File

@ -1,5 +1,5 @@
cmake_minimum_required(VERSION 3.10.0)
project(pingpong)
project(ppd)
list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR})

BIN
build/dismember Executable file

Binary file not shown.

1991
build/msg/msg.pb.cc Normal file

File diff suppressed because it is too large Load Diff

View File

@ -50,6 +50,9 @@ extern ppd_echo_reqDefaultTypeInternal _ppd_echo_req_default_instance_;
class ppd_echo_resp;
struct ppd_echo_respDefaultTypeInternal;
extern ppd_echo_respDefaultTypeInternal _ppd_echo_resp_default_instance_;
class ppd_master_start;
struct ppd_master_startDefaultTypeInternal;
extern ppd_master_startDefaultTypeInternal _ppd_master_start_default_instance_;
class ppd_rdb_req;
struct ppd_rdb_reqDefaultTypeInternal;
extern ppd_rdb_reqDefaultTypeInternal _ppd_rdb_req_default_instance_;
@ -68,6 +71,7 @@ extern ppd_touch_respDefaultTypeInternal _ppd_touch_resp_default_instance_;
PROTOBUF_NAMESPACE_OPEN
template<> ::ppd_echo_req* Arena::CreateMaybeMessage<::ppd_echo_req>(Arena*);
template<> ::ppd_echo_resp* Arena::CreateMaybeMessage<::ppd_echo_resp>(Arena*);
template<> ::ppd_master_start* Arena::CreateMaybeMessage<::ppd_master_start>(Arena*);
template<> ::ppd_rdb_req* Arena::CreateMaybeMessage<::ppd_rdb_req>(Arena*);
template<> ::ppd_rdb_resp* Arena::CreateMaybeMessage<::ppd_rdb_resp>(Arena*);
template<> ::ppd_slave_resp* Arena::CreateMaybeMessage<::ppd_slave_resp>(Arena*);
@ -242,6 +246,149 @@ class ppd_slave_resp final :
};
// -------------------------------------------------------------------
class ppd_master_start final :
public ::PROTOBUF_NAMESPACE_ID::Message /* @@protoc_insertion_point(class_definition:ppd_master_start) */ {
public:
inline ppd_master_start() : ppd_master_start(nullptr) {}
~ppd_master_start() override;
explicit PROTOBUF_CONSTEXPR ppd_master_start(::PROTOBUF_NAMESPACE_ID::internal::ConstantInitialized);
ppd_master_start(const ppd_master_start& from);
ppd_master_start(ppd_master_start&& from) noexcept
: ppd_master_start() {
*this = ::std::move(from);
}
inline ppd_master_start& operator=(const ppd_master_start& from) {
CopyFrom(from);
return *this;
}
inline ppd_master_start& operator=(ppd_master_start&& from) noexcept {
if (this == &from) return *this;
if (GetOwningArena() == from.GetOwningArena()
#ifdef PROTOBUF_FORCE_COPY_IN_MOVE
&& GetOwningArena() != nullptr
#endif // !PROTOBUF_FORCE_COPY_IN_MOVE
) {
InternalSwap(&from);
} else {
CopyFrom(from);
}
return *this;
}
static const ::PROTOBUF_NAMESPACE_ID::Descriptor* descriptor() {
return GetDescriptor();
}
static const ::PROTOBUF_NAMESPACE_ID::Descriptor* GetDescriptor() {
return default_instance().GetMetadata().descriptor;
}
static const ::PROTOBUF_NAMESPACE_ID::Reflection* GetReflection() {
return default_instance().GetMetadata().reflection;
}
static const ppd_master_start& default_instance() {
return *internal_default_instance();
}
static inline const ppd_master_start* internal_default_instance() {
return reinterpret_cast<const ppd_master_start*>(
&_ppd_master_start_default_instance_);
}
static constexpr int kIndexInFileMessages =
1;
friend void swap(ppd_master_start& a, ppd_master_start& b) {
a.Swap(&b);
}
inline void Swap(ppd_master_start* other) {
if (other == this) return;
#ifdef PROTOBUF_FORCE_COPY_IN_SWAP
if (GetOwningArena() != nullptr &&
GetOwningArena() == other->GetOwningArena()) {
#else // PROTOBUF_FORCE_COPY_IN_SWAP
if (GetOwningArena() == other->GetOwningArena()) {
#endif // !PROTOBUF_FORCE_COPY_IN_SWAP
InternalSwap(other);
} else {
::PROTOBUF_NAMESPACE_ID::internal::GenericSwap(this, other);
}
}
void UnsafeArenaSwap(ppd_master_start* other) {
if (other == this) return;
GOOGLE_DCHECK(GetOwningArena() == other->GetOwningArena());
InternalSwap(other);
}
// implements Message ----------------------------------------------
ppd_master_start* New(::PROTOBUF_NAMESPACE_ID::Arena* arena = nullptr) const final {
return CreateMaybeMessage<ppd_master_start>(arena);
}
using ::PROTOBUF_NAMESPACE_ID::Message::CopyFrom;
void CopyFrom(const ppd_master_start& from);
using ::PROTOBUF_NAMESPACE_ID::Message::MergeFrom;
void MergeFrom(const ppd_master_start& from);
private:
static void MergeImpl(::PROTOBUF_NAMESPACE_ID::Message* to, const ::PROTOBUF_NAMESPACE_ID::Message& from);
public:
PROTOBUF_ATTRIBUTE_REINITIALIZES void Clear() final;
bool IsInitialized() const final;
size_t ByteSizeLong() const final;
const char* _InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::internal::ParseContext* ctx) final;
uint8_t* _InternalSerialize(
uint8_t* target, ::PROTOBUF_NAMESPACE_ID::io::EpsCopyOutputStream* stream) const final;
int GetCachedSize() const final { return _cached_size_.Get(); }
private:
void SharedCtor();
void SharedDtor();
void SetCachedSize(int size) const final;
void InternalSwap(ppd_master_start* other);
private:
friend class ::PROTOBUF_NAMESPACE_ID::internal::AnyMetadata;
static ::PROTOBUF_NAMESPACE_ID::StringPiece FullMessageName() {
return "ppd_master_start";
}
protected:
explicit ppd_master_start(::PROTOBUF_NAMESPACE_ID::Arena* arena,
bool is_message_owned = false);
public:
static const ClassData _class_data_;
const ::PROTOBUF_NAMESPACE_ID::Message::ClassData*GetClassData() const final;
::PROTOBUF_NAMESPACE_ID::Metadata GetMetadata() const final;
// nested types ----------------------------------------------------
// accessors -------------------------------------------------------
enum : int {
kDummyFieldNumber = 1,
};
// int32 dummy = 1;
void clear_dummy();
int32_t dummy() const;
void set_dummy(int32_t value);
private:
int32_t _internal_dummy() const;
void _internal_set_dummy(int32_t value);
public:
// @@protoc_insertion_point(class_scope:ppd_master_start)
private:
class _Internal;
template <typename T> friend class ::PROTOBUF_NAMESPACE_ID::Arena::InternalHelper;
typedef void InternalArenaConstructable_;
typedef void DestructorSkippable_;
int32_t dummy_;
mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_;
friend struct ::TableStruct_msg_2eproto;
};
// -------------------------------------------------------------------
class ppd_rdb_req final :
public ::PROTOBUF_NAMESPACE_ID::Message /* @@protoc_insertion_point(class_definition:ppd_rdb_req) */ {
public:
@ -290,7 +437,7 @@ class ppd_rdb_req final :
&_ppd_rdb_req_default_instance_);
}
static constexpr int kIndexInFileMessages =
1;
2;
friend void swap(ppd_rdb_req& a, ppd_rdb_req& b) {
a.Swap(&b);
@ -476,7 +623,7 @@ class ppd_rdb_resp final :
&_ppd_rdb_resp_default_instance_);
}
static constexpr int kIndexInFileMessages =
2;
3;
friend void swap(ppd_rdb_resp& a, ppd_rdb_resp& b) {
a.Swap(&b);
@ -635,7 +782,7 @@ class ppd_echo_req final :
&_ppd_echo_req_default_instance_);
}
static constexpr int kIndexInFileMessages =
3;
4;
friend void swap(ppd_echo_req& a, ppd_echo_req& b) {
a.Swap(&b);
@ -789,7 +936,7 @@ class ppd_echo_resp final :
&_ppd_echo_resp_default_instance_);
}
static constexpr int kIndexInFileMessages =
4;
5;
friend void swap(ppd_echo_resp& a, ppd_echo_resp& b) {
a.Swap(&b);
@ -948,7 +1095,7 @@ class ppd_touch_req final :
&_ppd_touch_req_default_instance_);
}
static constexpr int kIndexInFileMessages =
5;
6;
friend void swap(ppd_touch_req& a, ppd_touch_req& b) {
a.Swap(&b);
@ -1102,7 +1249,7 @@ class ppd_touch_resp final :
&_ppd_touch_resp_default_instance_);
}
static constexpr int kIndexInFileMessages =
6;
7;
friend void swap(ppd_touch_resp& a, ppd_touch_resp& b) {
a.Swap(&b);
@ -1268,6 +1415,30 @@ inline void ppd_slave_resp::set_resp_sz(int32_t value) {
// -------------------------------------------------------------------
// ppd_master_start
// int32 dummy = 1;
inline void ppd_master_start::clear_dummy() {
dummy_ = 0;
}
inline int32_t ppd_master_start::_internal_dummy() const {
return dummy_;
}
inline int32_t ppd_master_start::dummy() const {
// @@protoc_insertion_point(field_get:ppd_master_start.dummy)
return _internal_dummy();
}
inline void ppd_master_start::_internal_set_dummy(int32_t value) {
dummy_ = value;
}
inline void ppd_master_start::set_dummy(int32_t value) {
_internal_set_dummy(value);
// @@protoc_insertion_point(field_set:ppd_master_start.dummy)
}
// -------------------------------------------------------------------
// ppd_rdb_req
// int32 op = 1;
@ -1685,6 +1856,8 @@ inline void ppd_touch_resp::set_status(int32_t value) {
// -------------------------------------------------------------------
// -------------------------------------------------------------------
// @@protoc_insertion_point(namespace_scope)

View File

@ -1,124 +0,0 @@
#pragma once
#include <arpa/inet.h>
#include <string.h>
#include <atomic>
#include <const.h>
#include <util.h>
static constexpr const int MAX_GEN_LEN = 31;
static constexpr const int DEFAULT_SERVER_CLIENT_CONN_PORT = 9898;
static constexpr const int DEFAULT_CLIENT_CTL_PORT = 9901;
static constexpr const char* DEFAULT_OUTPUT = "output.sample";
static constexpr const int MAX_GEN_PARAMS = 8;
static constexpr const int MAX_GEN_PARAMS_LEN = 31;
struct option;
extern option options;
struct option {
int verbose;
enum WORKLOAD_TYPE workload_type;
const char *output_name;
int client_num;
int client_thread_count;
int master_thread_count;
int client_conn;
int master_conn;
int target_qps;
int master_qps;
int master_mode;
int client_mode;
std::atomic_int global_conn_start_idx;
char server_ip[INET_ADDRSTRLEN + 1];
char generator_name[MAX_GEN_LEN + 1];
char gen_params[MAX_GEN_PARAMS][MAX_GEN_PARAMS_LEN + 1];
int num_gen_params;
int master_server_ip_given;
char master_server_ip[INET_ADDRSTRLEN + 1];
int server_port;
int depth_limit;
int warmup;
int duration;
option()
{
this->verbose = 0;
this->depth_limit = 1;
this->output_name = DEFAULT_OUTPUT;
this->client_thread_count = 1;
this->master_thread_count = -1;
this->client_conn = 1;
this->master_conn = -1;
this->target_qps = 0;
this->master_qps = -1;
this->client_mode = 0;
this->master_mode = 0;
this->warmup = 0;
this->duration = 10;
this->server_port = DEFAULT_SERVER_CLIENT_CONN_PORT;
this->master_server_ip_given = 0;
this->workload_type = WORKLOAD_TYPE::ECHO;
this->num_gen_params = 0;
this->global_conn_start_idx = 0;
for(int i = 0; i < MAX_GEN_PARAMS; i++) {
memset(gen_params[i], 0, MAX_GEN_LEN + 1);
}
/* set default values */
strncpy(this->generator_name, "fb_ia" , MAX_GEN_LEN);
strncpy(this->server_ip, "127.0.0.1" , INET_ADDRSTRLEN);
strncpy(this->master_server_ip, "127.0.0.1", INET_ADDRSTRLEN);
}
void dump()
{
V ("Configuration:\n"
" Connections per thread: %d\n"
" Num threads: %d\n"
" Target QPS: %d\n"
" warmup: %d\n"
" duration: %d\n"
" master_mode: %d\n"
" client_mode: %d\n"
" output_file: %s\n"
" server_ip: %s\n"
" server_port: %d\n"
" IA_DIST: %s\n"
" master_server_ip: %s\n"
" workload_type: %d\n"
" num_workload_param: %d\n"
" global_conn_idx: %d\n",
this->client_conn,
this->client_thread_count,
this->target_qps,
this->warmup,
this->duration,
this->master_mode,
this->client_mode,
this->output_name,
this->server_ip,
this->server_port,
this->generator_name,
this->master_server_ip,
this->workload_type,
this->num_gen_params,
this->global_conn_start_idx.load());
}
};
/* Don't send god damn vtables */
static_assert(std::is_standard_layout<option>(), "struct option must be standard layout");

View File

@ -1,30 +0,0 @@
#include <inttypes.h>
#include <stdio.h>
#include <sys/time.h>
#include <unistd.h>
#include "util.h"
void sleep_time(double duration) {
if (duration > 0) usleep((useconds_t) (duration * 1000000));
}
#define FNV_64_PRIME (0x100000001b3ULL)
#define FNV1_64_INIT (0xcbf29ce484222325ULL)
uint64_t fnv_64_buf(const void* buf, size_t len) {
uint64_t hval = FNV1_64_INIT;
unsigned char *bp = (unsigned char *)buf; /* start of buffer */
unsigned char *be = bp + len; /* beyond end of buffer */
while (bp < be) {
hval ^= (uint64_t)*bp++;
hval *= FNV_64_PRIME;
}
return hval;
}
void generate_key(int n, int length, char *buf) {
snprintf(buf, length + 1, "%0*d", length, n);
}

View File

@ -1,52 +0,0 @@
#ifndef UTIL_H
#define UTIL_H
#include <sys/time.h>
#include <time.h>
inline double tv_to_double(struct timeval *tv) {
return tv->tv_sec + (double) tv->tv_usec / 1000000;
}
inline void double_to_tv(double val, struct timeval *tv) {
long long secs = (long long) val;
long long usecs = (long long) ((val - secs) * 1000000);
tv->tv_sec = secs;
tv->tv_usec = usecs;
}
inline double get_time_accurate() {
#if USE_CLOCK_GETTIME
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC_RAW, &ts);
// clock_gettime(CLOCK_REALTIME, &ts);
return ts.tv_sec + (double) ts.tv_nsec / 1000000000;
#else
struct timeval tv;
gettimeofday(&tv, NULL);
return tv_to_double(&tv);
#endif
}
inline double get_time() {
//#if USE_CLOCK_GETTIME
// struct timespec ts;
// clock_gettime(CLOCK_MONOTONIC_RAW, &ts);
// // clock_gettime(CLOCK_REALTIME, &ts);
// return ts.tv_sec + (double) ts.tv_nsec / 1000000000;
//#else
struct timeval tv;
gettimeofday(&tv, NULL);
return tv_to_double(&tv);
//#endif
}
void sleep_time(double duration);
uint64_t fnv_64_buf(const void* buf, size_t len);
inline uint64_t fnv_64(uint64_t in) { return fnv_64_buf(&in, sizeof(in)); }
void generate_key(int n, int length, char *buf);
#endif // UTIL_H

View File

@ -23,10 +23,17 @@
#include <string.h>
#include <sys/param.h>
#include "util.h"
#define D(fmt, ...)
#define DIE(fmt, ...)
uint64_t
fnv_64_buf(const void* buf, size_t len);
static inline uint64_t
fnv_64(uint64_t in) {
return fnv_64_buf(&in, sizeof(in));
}
// Generator syntax:
//
// \d+ == fixed

View File

@ -1,19 +0,0 @@
#pragma once
#include <stdint.h>
enum WORKLOAD_TYPE {
ECHO = 0,
TOUCH = 1,
RDB = 2,
HTTP = 3,
};
struct ppd_msg {
uint32_t size;
char payload[0];
};
static constexpr int PPD_RDB_OP_GET = 0;
static constexpr int PPD_RDB_OP_PUT = 1;
static constexpr int PPD_RDB_OP_SEEK = 2;

17
include/logger.h Normal file
View File

@ -0,0 +1,17 @@
#pragma once
#include <stdio.h>
#define W(fmt, ...) do { \
fprintf(stderr, "[WARN] %s: " fmt, __func__, ##__VA_ARGS__); \
} while(0)
#define E(fmt, ...) do { \
fprintf(stderr, "[ERROR] %s: " fmt, __func__, ##__VA_ARGS__); \
exit(1); \
} while(0)
#define V(fmt, ...) do { \
if (options.verbose) { \
fprintf(stdout, "[INFO] %s: " fmt, __func__, ##__VA_ARGS__); \
} \
} while(0)

48
include/mod.h Normal file
View File

@ -0,0 +1,48 @@
#pragma once
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <stdio.h>
typedef int (*ppd_global_init_cb)(int argc, char** argk, char** argv, void **ctx);
typedef int (*ppd_thread_create_cb)(int core, void * global_ctx, void **ctx);
typedef int (*ppd_conn_create_cb)(void * global_ctx, void * thread_ctx, void **ctx);
typedef void (*ppd_conn_destroy_cb)(void * global_ctx, void * thread_ctx, void *conn_ctx);
typedef int (*ppd_conn_recv_cb)(const char * data, size_t sz, void * global_ctx, void * thread_ctx, void * conn_ctx);
typedef int (*ppd_conn_send_cb)(const char * out, size_t sz, size_t * out_sz, void * global_ctx, void * thread_ctx, void * conn_ctx);
struct ppd_mod_info {
const char * name;
ppd_global_init_cb global_init_cb;
ppd_thread_create_cb thread_create_cb;
ppd_conn_create_cb conn_create_cb;
ppd_conn_destroy_cb conn_destroy_cb;
ppd_conn_send_cb conn_send_cb;
ppd_conn_recv_cb conn_recv_cb;
};
// struct dismemebr_mod_info {
// const char * name;
// ppd_global_init_cb global_init_cb;
// ppd_thread_create_cb thread_create_cb;
// ppd_conn_create_cb conn_create_cb;
// ppd_conn_destroy_cb conn_destroy_cb;
// ppd_conn_send_cb conn_send_cb;
// ppd_conn_recv_cb conn_recv_cb;
// };
typedef struct ppd_mod_info * (*ppd_get_mod_info_fn)(void);
#define PPD_GET_MOD_INFO_ID ppd_getmod_info
#define PPD_GET_MOD_INFO_FN ("##PPD_GET_MOD_INFO_ID")
#ifdef __cplusplus
}
#endif

View File

@ -1,128 +0,0 @@
#pragma once
#include "const.h"
#include <cerrno>
#include <sys/ioctl.h>
#include <sys/event.h>
#include <stdint.h>
#include <sys/socket.h>
#include <thread>
#include <stdio.h>
#include <sys/param.h>
#include <sys/types.h>
#include <sys/_cpuset.h>
#include <sys/cpuset.h>
#define W(fmt, ...) do { \
fprintf(stderr, "[WARN] %s: " fmt, __func__, ##__VA_ARGS__); \
} while(0)
#define E(fmt, ...) do { \
fprintf(stderr, "[ERROR] %s: " fmt, __func__, ##__VA_ARGS__); \
exit(1); \
} while(0)
#define V(fmt, ...) do { \
if (options.verbose) { \
fprintf(stdout, "[INFO] %s: " fmt, __func__, ##__VA_ARGS__); \
} \
} while(0)
static inline void
cpulist_to_cpuset(char * cpulist, cpuset_t * cpuset)
{
char * cpu = strtok(cpulist, ",");
CPU_ZERO(cpuset);
while (cpu != nullptr) {
CPU_SET(atoi(cpu), cpuset);
cpu = strtok(nullptr, ",");
}
}
static inline int
readbuf(int fd, void *buf, int len)
{
int status;
do {
if ((status = recv(fd, buf, len, 0)) < 0) {
return -1;
} else if (status == 0) { // connection disconnected.
return -1;
}
buf = (char*)buf + status;
len -= status;
} while (len > 0);
return 0;
}
static inline int
writebuf(int fd, void * buf, int len)
{
int status;
do {
if ((status = send(fd, buf, len, 0)) < 0) {
return -1;
} else if (status == 0) {
return -1;
}
buf = (char*) buf + status;
len -= status;
} while (len > 0);
return 0;
}
static inline int
readmsg(int fd, char *buf, int len)
{
if ((uint)len < sizeof(struct ppd_msg)) {
return EOVERFLOW;
}
int status = readbuf(fd, buf, sizeof(struct ppd_msg));
if (status != 0) {
return status;
}
if (((struct ppd_msg *)buf)->size + sizeof(ppd_msg) > (uint)len) {
return EOVERFLOW;
}
if (((struct ppd_msg *)buf)->size > 0) {
status = readbuf(fd, buf + sizeof(ppd_msg), ((struct ppd_msg *)buf)->size);
}
return status;
}
static inline int
writemsg(int fd, char *buf, int len, const char *msg, int mlen)
{
int real_len = sizeof(struct ppd_msg) + mlen;
if (len < real_len) {
return EOVERFLOW;
}
struct ppd_msg * m = (struct ppd_msg *)buf;
memmove(m->payload, msg, mlen);
m->size = mlen;
return writebuf(fd, buf, real_len);
}
static inline uint64_t
get_time_us()
{
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
// clock_gettime(CLOCK_REALTIME, &ts);
return ts.tv_sec * 1000000 + ts.tv_nsec / 1000;
}
#define UNUSED(x) (x)

View File

@ -1,116 +0,0 @@
#pragma once
#include "const.h"
#include <cerrno>
#include <stdint.h>
#include <thread>
#include <stdio.h>
#include <ff_api.h>
#define W(fmt, ...) do { \
fprintf(stderr, "[WARN] " fmt, ##__VA_ARGS__); \
} while(0)
#define E(fmt, ...) do { \
fprintf(stderr, "[ERROR] " fmt, ##__VA_ARGS__); \
exit(1); \
} while(0)
#define V(fmt, ...) do { \
if (options.verbose) { \
fprintf(stdout, "[INFO] " fmt, ##__VA_ARGS__); \
} \
} while(0)
static inline int
readbuf(int fd, void *buf, int len)
{
int status;
do {
if ((status = ff_recv(fd, buf, len, 0)) < 0) {
return -1;
} else if (status == 0) { // connection disconnected.
return -1;
}
buf = (char*)buf + status;
len -= status;
} while (len > 0);
return 0;
}
static inline int
writebuf(int fd, void * buf, int len)
{
int status;
do {
if ((status = ff_send(fd, buf, len, 0)) < 0) {
return -1;
} else if (status == 0) {
return -1;
}
buf = (char*) buf + status;
len -= status;
} while (len > 0);
return 0;
}
static inline int
readmsg(int fd, char *buf, int len)
{
if ((uint)len < sizeof(struct ppd_msg)) {
return EOVERFLOW;
}
int status = readbuf(fd, buf, sizeof(struct ppd_msg));
if (status != 0) {
return status;
}
if (((struct ppd_msg *)buf)->size + sizeof(ppd_msg) > (uint)len) {
return EOVERFLOW;
}
if (((struct ppd_msg *)buf)->size > 0) {
status = readbuf(fd, buf + sizeof(ppd_msg), ((struct ppd_msg *)buf)->size);
}
return status;
}
static inline int
writemsg(int fd, char *buf, int len, const char *msg, int mlen)
{
int real_len = sizeof(struct ppd_msg) + mlen;
if (len < real_len) {
return EOVERFLOW;
}
struct ppd_msg * m = (struct ppd_msg *)buf;
memmove(m->payload, msg, mlen);
m->size = mlen;
return writebuf(fd, buf, real_len);
}
static inline int
get_numcpus()
{
return std::thread::hardware_concurrency();
}
static inline uint64_t
get_time_us()
{
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
// clock_gettime(CLOCK_REALTIME, &ts);
return ts.tv_sec * 1000000 + ts.tv_nsec / 1000;
}
#define UNUSED(x) (x)

View File

@ -2,6 +2,23 @@
#include "Generator.h"
#define FNV_64_PRIME (0x100000001b3ULL)
#define FNV1_64_INIT (0xcbf29ce484222325ULL)
uint64_t fnv_64_buf(const void* buf, size_t len)
{
uint64_t hval = FNV1_64_INIT;
unsigned char *bp = (unsigned char *)buf; /* start of buffer */
unsigned char *be = bp + len; /* beyond end of buffer */
while (bp < be) {
hval ^= (uint64_t)*bp++;
hval *= FNV_64_PRIME;
}
return hval;
}
Generator* createFacebookKey() { return new GEV(30.7984, 8.20449, 0.078688); }
Generator* createFacebookValue() {

10
modules/dismember.proto Normal file
View File

@ -0,0 +1,10 @@
syntax = "proto3";
message dismember_stat {
int32 type = 1;
bytes data = 2;
}
message dismember_ctrl {
int32 type = 1;
}

View File

@ -1,4 +1,3 @@
#include "reqproc.h"
#include <arpa/inet.h>
#include <atomic>
#include <cassert>

View File

@ -1,43 +0,0 @@
syntax = "proto3";
message ppd_slave_resp {
int32 qps = 1;
int32 send_sz = 2;
int32 resp_sz = 3;
}
message ppd_master_start {
int32 dummy = 0;
}
message ppd_rdb_req {
int32 op = 1;
int32 optarg = 2;
bytes key = 3;
bytes val = 4;
}
message ppd_rdb_resp {
int32 status = 1;
bytes result = 2;
}
message ppd_echo_req {
int32 enable_delay = 1;
int32 data_size = 2;
}
message ppd_echo_resp {
int32 status = 1;
bytes data = 2;
}
message ppd_touch_req {
int32 touch_cnt = 1;
int32 inc = 2;
}
message ppd_touch_resp {
int32 status = 1;
}

View File

File diff suppressed because it is too large Load Diff

9
ppd/msg.h Normal file
View File

@ -0,0 +1,9 @@
#pragma once
#include <stdint.h>
#include <sys/endian.h>
struct ppd_msg {
uint32_t size;
char data[0];
};

View File

@ -1,39 +0,0 @@
#pragma once
#include "openssl/types.h"
#include <vector>
#include <util.h>
#include <openssl/ssl.h>
static constexpr int MAX_MODE_PARAMS = 8;
static constexpr int MAX_MODE_PARAMS_LEN = 64;
struct ppd_options {
int port;
int shared_kq;
cpuset_t cpuset;
int verbose;
int enable_tls;
int enable_ktls;
char secret_key_fn[128];
char cert_fn[128];
std::vector<char*> hpip;
int skq_dump;
int skq_flag;
int skq_rtshare;
int skq_rtfreq;
/* the mode this server runs in */
int mode;
char mode_params[MAX_MODE_PARAMS][MAX_MODE_PARAMS_LEN + 1];
int num_mode_params;
/* runtime states */
SSL_CTX * ssl_ctx; // static SSL_CTX
};
extern ppd_options options;

View File

@ -1,56 +1,79 @@
#include <cerrno>
#include <cstdio>
#include <unordered_map>
#include <cstring>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <stdalign.h>
#include <stdio.h>
#include <sys/cpuset.h>
#include <unistd.h>
#include <err.h>
#include <time.h>
#include <errno.h>
#include <signal.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/event.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread_np.h>
#include <sys/types.h>
#include <sys/cpuset.h>
#include <sys/event.h>
#include <sys/socket.h>
#include <sys/sysctl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <vector>
#include <sstream>
#include <openssl/ssl.h>
#include <openssl/err.h>
#include <openssl/ssl.h>
#include <pthread.h>
#include <pthread_np.h>
#include <unistd.h>
#include "options.h"
#include "reqproc.h"
#include <const.h>
#include <util.h>
#include "logger.h"
#include "mod.h"
#include "msg.h"
#include "util.h"
#include <msg.pb.h>
#include <cerrno>
#include <csignal>
#include <cstddef>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <sstream>
#include <unordered_map>
#include <vector>
static constexpr int NEVENT = 2048;
static constexpr int NEVENT = 64;
static constexpr int SOCK_BACKLOG = 10000;
static constexpr int SINGLE_LEGACY = -1;
static constexpr int DEFAULT_PORT = 9898;
// 16MB max per message
static constexpr int MBUF_SZ = 1024 * 1024 * 16;
static constexpr int MAX_MODE_PARAMS = 16;
static constexpr int MAX_MODE_PARAMS_LEN = 128;
static std::unordered_map<std::string, std::string> mode_params;
struct ppd_options {
int port;
int shared_kq;
cpuset_t cpuset;
int verbose;
int enable_tls;
int enable_ktls;
char secret_key_fn[128];
char cert_fn[128];
struct alignas(CACHE_LINE_SIZE) ppd_cache_item {
int val;
std::vector<char *> hpip;
int skq_dump;
int skq_flag;
int skq_rtshare;
int skq_rtfreq;
/* the mode this server runs in */
char module_path[128];
char *mod_argk[MAX_MODE_PARAMS];
char *mod_argv[MAX_MODE_PARAMS];
int mod_argc;
/* runtime states */
SSL_CTX *ssl_ctx; // static SSL_CTX
struct ppd_mod_info *m_info;
void *m_global_ctx;
};
static_assert(sizeof(struct ppd_cache_item) == CACHE_LINE_SIZE, "cache_item not cache line sized");
struct ppd_conn {
int conn_fd;
req_proc * proc;
SSL * ssl;
SSL *ssl;
void *m_conn_ctx;
};
struct ppd_thread_ctx {
@ -58,24 +81,28 @@ struct ppd_thread_ctx {
pthread_t thrd;
int kqfd;
long evcnt;
void *m_thread_ctx;
char *m_buf;
};
ppd_options options = {.port = DEFAULT_PORT,
.shared_kq = 0,
.cpuset = CPUSET_T_INITIALIZER(1), // first core
.verbose = 0,
.enable_tls = 0,
.enable_ktls = 0,
.secret_key_fn = {0},
.cert_fn = {0},
.skq_dump = 0,
.skq_flag = 0,
.skq_rtshare = 100,
.skq_rtfreq = 0,
.mode = WORKLOAD_TYPE::ECHO,
.num_mode_params = 0 };
static ppd_options options = { .port = DEFAULT_PORT,
.shared_kq = 0,
.cpuset = CPUSET_T_INITIALIZER(1), // first core
.verbose = 0,
.enable_tls = 0,
.enable_ktls = 0,
.secret_key_fn = { 0 },
.cert_fn = { 0 },
.skq_dump = 0,
.skq_flag = 0,
.skq_rtshare = 100,
.skq_rtfreq = 0,
.module_path = { 0 },
.mod_argc = 0,
.ssl_ctx = nullptr,
.m_info = nullptr };
static int
static int
main_kqueue_create(std::vector<int> *server_socks)
{
int status;
@ -86,7 +113,7 @@ main_kqueue_create(std::vector<int> *server_socks)
E("Failed to create kqueue: %d.\n", errno);
}
for(uint32_t i = 0; i < server_socks->size(); i++) {
for (uint32_t i = 0; i < server_socks->size(); i++) {
EV_SET(&kev, server_socks->at(i), EVFILT_READ, EV_ADD, 0, 0, NULL);
status = kevent(mkqfd, &kev, 1, NULL, 0, NULL);
@ -101,43 +128,46 @@ main_kqueue_create(std::vector<int> *server_socks)
if (status == -1) {
E("kevent() timer failed: %d.\n", errno);
}
return mkqfd;
}
static void
create_tls_context()
{
SSL_CTX *ctx;
SSL_CTX *ctx;
ctx = SSL_CTX_new(TLS_server_method());
if (!ctx) {
E("SSL_CTX_new() failed: %ld\n", ERR_get_error());
}
ctx = SSL_CTX_new(TLS_server_method());
if (!ctx) {
E("SSL_CTX_new() failed: %ld\n", ERR_get_error());
}
SSL_CTX_set_min_proto_version(ctx, TLS1_2_VERSION);
SSL_CTX_set_max_proto_version(ctx, TLS1_2_VERSION);
SSL_CTX_set_max_proto_version(ctx, TLS1_2_VERSION);
if (options.enable_ktls) {
if ((SSL_CTX_set_options(ctx, SSL_OP_ENABLE_KTLS) & SSL_OP_ENABLE_KTLS) != SSL_OP_ENABLE_KTLS) {
E("SSL_CTX_set_options() SSL_OP_ENABLE_KTLS failed.\n");
}
if ((SSL_CTX_set_options(ctx, SSL_OP_ENABLE_KTLS) & SSL_OP_ENABLE_KTLS) !=
SSL_OP_ENABLE_KTLS) {
E("SSL_CTX_set_options() SSL_OP_ENABLE_KTLS failed.\n");
}
}
/* Set the key and cert */
if (SSL_CTX_use_certificate_file(ctx, options.cert_fn, SSL_FILETYPE_PEM) <= 0) {
E("SSL_CTX_use_certificate_file() %s failed: %ld\n", options.cert_fn, ERR_get_error());
}
if (SSL_CTX_use_certificate_file(ctx, options.cert_fn, SSL_FILETYPE_PEM) <= 0) {
E("SSL_CTX_use_certificate_file() %s failed: %ld\n", options.cert_fn,
ERR_get_error());
}
if (SSL_CTX_use_PrivateKey_file(ctx, options.secret_key_fn, SSL_FILETYPE_PEM) <= 0 ) {
E("SSL_CTX_use_PrivateKey_file() %s failed: %ld\n", options.secret_key_fn, ERR_get_error());
}
if (SSL_CTX_use_PrivateKey_file(ctx, options.secret_key_fn, SSL_FILETYPE_PEM) <= 0) {
E("SSL_CTX_use_PrivateKey_file() %s failed: %ld\n", options.secret_key_fn,
ERR_get_error());
}
options.ssl_ctx = ctx;
options.ssl_ctx = ctx;
}
static void
listen_socket_create(std::vector<int> *socks)
static void
listen_socket_create(std::vector<int> *socks)
{
struct sockaddr_in server_addr;
int status;
@ -164,12 +194,12 @@ listen_socket_create(std::vector<int> *socks)
E("setsockopt() NODELAY %d", errno);
}
status = bind(fd, (struct sockaddr*)&server_addr, sizeof(server_addr));
status = bind(fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
if (status < 0) {
E("bind() returned %d", errno);
}
status = listen(fd, SOCK_BACKLOG);
status = listen(fd, SOCK_BACKLOG);
if (status < 0) {
E("listen() returned %d", errno);
}
@ -178,16 +208,25 @@ listen_socket_create(std::vector<int> *socks)
}
}
static void
ppd_conn_free(struct ppd_conn *conn)
static void
ppd_conn_free_no_ctx(struct ppd_conn *conn)
{
SSL_shutdown(conn->ssl);
SSL_free(conn->ssl);
if (conn->ssl != nullptr) {
SSL_shutdown(conn->ssl);
SSL_free(conn->ssl);
}
close(conn->conn_fd);
delete conn->proc;
delete conn;
}
static void
ppd_conn_free(struct ppd_thread_ctx *tinfo, struct ppd_conn *conn)
{
void *conn_ctx = conn->m_conn_ctx;
ppd_conn_free_no_ctx(conn);
options.m_info->conn_destroy_cb(options.m_global_ctx, tinfo->m_thread_ctx, conn_ctx);
}
static void
drop_conn(struct ppd_thread_ctx *tinfo, struct kevent *kev)
{
@ -195,18 +234,18 @@ drop_conn(struct ppd_thread_ctx *tinfo, struct kevent *kev)
int conn_fd = kev->ident;
struct kevent ev;
EV_SET(&ev, conn_fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
status = kevent(tinfo->kqfd, &ev, 1, 0, 0, NULL);
if (status < 0) {
E("Failed to drop connection %d from kqueue: %d\n", conn_fd, errno);
}
ppd_conn_free((struct ppd_conn *)kev->udata);
ppd_conn_free(tinfo, (struct ppd_conn *)kev->udata);
}
static int
handle_event(struct ppd_thread_ctx *tinfo, struct kevent* kev)
handle_event(struct ppd_thread_ctx *tinfo, struct kevent *kev)
{
int conn_fd;
int status;
@ -217,28 +256,57 @@ handle_event(struct ppd_thread_ctx *tinfo, struct kevent* kev)
}
conn_fd = kev->ident;
hint = (struct ppd_conn*)kev->udata;
hint = (struct ppd_conn *)kev->udata;
struct ppd_msg *msg;
size_t out_sz;
if (kev->flags & EV_EOF) {
V("Thread %d dropped connection %d due to EOF.\n", tinfo->tid, conn_fd);
drop_conn(tinfo, kev);
return ECONNRESET;
status = ECONNRESET;
goto fail;
}
status = hint->proc->proc_req(conn_fd);
status = ppd_readmsg(conn_fd, hint->ssl, tinfo->m_buf, MBUF_SZ);
if (status != 0) {
W("Thread %d dropped connection %d due to ppd_readmsg error %d\n", tinfo->tid,
conn_fd, errno);
goto fail;
}
if (status < 0) {
W("Thread %d dropped connection %d due to req_proc error %d\n", tinfo->tid, conn_fd, status);
drop_conn(tinfo, kev);
return status;
msg = (struct ppd_msg *)tinfo->m_buf;
status = options.m_info->conn_recv_cb(msg->data, msg->size, options.m_global_ctx,
tinfo->m_thread_ctx, hint->m_conn_ctx);
if (status != 0) {
W("Thread %d dropped connection %d due to conn_recv_cb error %d\n", tinfo->tid,
conn_fd, status);
goto fail;
}
status = options.m_info->conn_send_cb(msg->data, MBUF_SZ - sizeof(struct ppd_msg), &out_sz,
options.m_global_ctx, tinfo->m_thread_ctx, hint->m_conn_ctx);
if (status != 0) {
W("Thread %d dropped connection %d due to conn_send_cb error %d\n", tinfo->tid,
conn_fd, status);
goto fail;
}
msg->size = out_sz;
status = ppd_writemsg(conn_fd, hint->ssl, msg);
if (status != 0) {
W("Thread %d dropped connection %d due to ppd_writemsg error %d\n", tinfo->tid,
conn_fd, errno);
goto fail;
}
tinfo->evcnt++;
return 0;
fail:
drop_conn(tinfo, kev);
return status;
}
static void*
worker_main(void *info)
static void *
worker_main(void *info)
{
struct ppd_thread_ctx *tinfo = (struct ppd_thread_ctx *)info;
struct kevent kev[NEVENT];
@ -247,7 +315,7 @@ worker_main(void *info)
int status;
V("Thread %d started.\n", tinfo->tid);
skev_sz = 0;
while (1) {
status = kevent(tinfo->kqfd, skev, skev_sz, kev, NEVENT, NULL);
@ -260,7 +328,8 @@ worker_main(void *info)
for (int i = 0; i < status; i++) {
if (handle_event(tinfo, &kev[i]) == 0) {
if (options.shared_kq && options.skq_flag == SINGLE_LEGACY) {
EV_SET(&skev[skev_sz], (int)kev[i].ident, EVFILT_READ, EV_ENABLE, 0, 0, kev[i].udata);
EV_SET(&skev[skev_sz], (int)kev[i].ident, EVFILT_READ,
EV_ENABLE, 0, 0, kev[i].udata);
skev_sz++;
}
}
@ -274,13 +343,13 @@ dump_options()
std::stringstream ss;
ss << "Configuration:\n"
<< " port: " << options.port << std::endl
<< " mode: " << options.mode << std::endl
<< " module: " << options.module_path << std::endl
<< " enable TLS: " << options.enable_tls << std::endl
<< " enable kTLS: " << options.enable_ktls << std::endl
<< " TLS secret key: " << options.secret_key_fn << std::endl
<< " TLS certificate: " << options.cert_fn << std::endl
<< " shared Kqueue: " << options.shared_kq << std::endl
<< " number of threads: " << CPU_COUNT(&options.cpuset) << std::endl
<< " number of threads: " << CPU_COUNT(&options.cpuset) << std::endl
<< " verbose: " << options.verbose << std::endl
<< " SKQ flags: " << options.skq_flag << std::endl
<< " SKQ dump:" << options.skq_dump << std::endl
@ -288,7 +357,7 @@ dump_options()
<< " SKQ rtfreq: " << options.skq_rtfreq << std::endl
<< " SKQ high priority clients (" << options.hpip.size() << "): " << std::endl;
for(uint i = 0; i < options.hpip.size(); i++) {
for (uint i = 0; i < options.hpip.size(); i++) {
ss << " " << options.hpip.at(i) << std::endl;
}
@ -296,32 +365,29 @@ dump_options()
}
static void
usage()
usage()
{
fprintf(stdout, "Usage:\n"
" p: server port\n"
" v: verbose mode\n"
" m: shared Kqueue across workers\n"
" s: enable TLS\n"
" k: enable kTLS\n"
" S: TLS private key file\n"
" C: TLS certificate file\n"
" c: server thread cpu list\n"
" h: show help\n"
" d: SKQ dump interval (s)\n"
" r: SKQ realtime client hostname(s)\n"
" R: SKQ rt_share\n"
" F: SKQ rt_freq\n"
" M: server mode: 0 - ECHO, 1 - TOUCH, 2 - HTTP, 3 - RDB\n"
" O: mode specific parameters in the format \"key=value\"\n"
" Workload specific parameters:\n"
" TOUCH:\n"
" ENTRIES - Number of cache-aligned entries per connection.\n\n");
fprintf(stdout,
"Usage:\n"
" p: server port\n"
" v: verbose mode\n"
" m: shared Kqueue across workers\n"
" s: enable TLS\n"
" k: enable kTLS\n"
" S: TLS private key file\n"
" C: TLS certificate file\n"
" c: server thread cpu list\n"
" h: show help\n"
" d: SKQ dump interval (s)\n"
" r: SKQ realtime client hostname(s)\n"
" R: SKQ rt_share\n"
" F: SKQ rt_freq\n"
" M: module path\n"
" O: module specific parameters of format \"key=value\"\n");
}
static void
create_workers(std::vector<struct ppd_thread_ctx*> *workers)
create_workers(std::vector<struct ppd_thread_ctx *> *workers)
{
int kq = -1;
@ -353,7 +419,7 @@ create_workers(std::vector<struct ppd_thread_ctx*> *workers)
E("freq ioctl failed. ERR %d\n", errno);
}
V("KQ IOCTL: RTSHARE %d TFREQ %d\n", options.kq_rtshare, options.kq_tfreq);
#else
#else
/* legacy single KQ only supports flag -1 */
options.skq_flag = SINGLE_LEGACY;
#endif
@ -361,13 +427,15 @@ create_workers(std::vector<struct ppd_thread_ctx*> *workers)
int tid = 0;
int core;
CPU_FOREACH_ISSET(core, &options.cpuset) {
CPU_FOREACH_ISSET (core, &options.cpuset) {
V("Creating worker thread on core %d...\n", core);
struct ppd_thread_ctx *thrd = (struct ppd_thread_ctx *)aligned_alloc(CACHE_LINE_SIZE, sizeof(struct ppd_thread_ctx));
struct ppd_thread_ctx *thrd = (struct ppd_thread_ctx *)
aligned_alloc(CACHE_LINE_SIZE, sizeof(struct ppd_thread_ctx));
thrd->evcnt = 0;
thrd->tid = tid;
options.m_info->thread_create_cb(core, options.m_global_ctx, &thrd->m_thread_ctx);
if (!options.shared_kq) {
kq = kqueue();
if (kq <= 0) {
@ -380,7 +448,7 @@ create_workers(std::vector<struct ppd_thread_ctx*> *workers)
if (status == -1) {
E("rtshare ioctl failed. ERR %d\n", errno);
}
para = KQTUNE_MAKE(KQTUNE_FREQ, options.kq_tfreq);
status = ioctl(kq, FKQTUNE, &para);
if (status == -1) {
@ -390,21 +458,26 @@ create_workers(std::vector<struct ppd_thread_ctx*> *workers)
}
thrd->kqfd = kq;
pthread_attr_t attr;
pthread_attr_init(&attr);
int status;
if ((status = pthread_attr_init(&attr)) != 0) {
E("pthread_attr_init() failed: %d\n", status);
}
cpuset_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(core, &cpuset);
int status = pthread_attr_setaffinity_np(&attr, sizeof(cpuset), &cpuset);
status = pthread_attr_setaffinity_np(&attr, sizeof(cpuset), &cpuset);
if (status != 0) {
E("pthread_attr_setaffinity_np() failed : %d\n", status);
E("pthread_attr_setaffinity_np() failed : %d\n", status);
}
status = pthread_create(&thrd->thrd, &attr, worker_main, thrd);
if (status != 0) {
E("pthread_create() failed: %d\n", status);
}
workers->push_back(thrd);
tid++;
@ -412,41 +485,38 @@ create_workers(std::vector<struct ppd_thread_ctx*> *workers)
}
static void
get_ip_from_hostname(const char* hostname, char* buf, int len)
get_ip_from_hostname(const char *hostname, char *buf, int len)
{
struct in_addr **addr;
struct hostent *he;
struct in_addr **addr;
struct hostent *he;
if ((he = gethostbyname(hostname)) == NULL) {
E("Hostname %s cannot be resolved.\n", hostname);
}
addr = (struct in_addr**)he->h_addr_list;
for (int i=0;addr[i]!=NULL;i++) {
strncpy(buf, inet_ntoa(*addr[i]), len);
break;
}
if ((he = gethostbyname(hostname)) == NULL) {
E("Hostname %s cannot be resolved.\n", hostname);
}
addr = (struct in_addr **)he->h_addr_list;
for (int i = 0; addr[i] != NULL; i++) {
strncpy(buf, inet_ntoa(*addr[i]), len);
break;
}
}
static void
parse_mode_params()
parse_mod_arg(char *arg, char *argk, size_t szk, char *argv, size_t szv)
{
char * saveptr;
char *saveptr = NULL;
for (int i = 0; i < options.num_mode_params; i++) {
saveptr = NULL;
char *key = strtok_r(options.mode_params[i], "=", &saveptr);
char *val = strtok_r(NULL, "=", &saveptr);
mode_params.insert({key, val});
char *key = strtok_r(arg, "=", &saveptr);
char *val = strtok_r(NULL, "=", &saveptr);
V("Parsed workload specific parameter: %s = %s\n", key, val);
}
V("Parsed module arg: %s = %s\n", key, val);
snprintf(argk, szk, "%s", key);
snprintf(argv, szv, "%s", val);
}
static SSL *
tls_handshake_server(int conn_fd)
{
SSL * ssl;
SSL *ssl;
int r;
ssl = SSL_new(options.ssl_ctx);
@ -464,7 +534,8 @@ tls_handshake_server(int conn_fd)
if (options.enable_ktls) {
// make sure ktls is enabled
if (!(BIO_get_ktls_send(SSL_get_wbio(ssl)) && BIO_get_ktls_recv(SSL_get_rbio(ssl)))) {
if (!(BIO_get_ktls_send(SSL_get_wbio(ssl)) &&
BIO_get_ktls_recv(SSL_get_rbio(ssl)))) {
E("kTLS not enabled on %d: %ld\n", conn_fd, ERR_get_error());
}
}
@ -473,7 +544,7 @@ tls_handshake_server(int conn_fd)
}
static void
loop_main(int m_kq, std::vector<struct ppd_thread_ctx*> *workers)
loop_main(int m_kq, std::vector<struct ppd_thread_ctx *> *workers)
{
struct kevent kev;
int cur_conn = 0;
@ -505,7 +576,7 @@ loop_main(int m_kq, std::vector<struct ppd_thread_ctx*> *workers)
struct sockaddr_in client_addr;
socklen_t client_addr_size = sizeof(client_addr);
int conn_fd = accept(kev.ident, (struct sockaddr*)&client_addr, &client_addr_size);
int conn_fd = accept(kev.ident, (struct sockaddr *)&client_addr, &client_addr_size);
if (conn_fd < 0) {
W("accept() failed: %d\n", errno);
@ -514,8 +585,10 @@ loop_main(int m_kq, std::vector<struct ppd_thread_ctx*> *workers)
{
const int enable = 1;
if (setsockopt(conn_fd, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) < 0) {
W("setsockopt() nodelay failed on conn %d: err %d\n", conn_fd, errno);
if (setsockopt(conn_fd, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) <
0) {
W("setsockopt() nodelay failed on conn %d: err %d\n", conn_fd,
errno);
continue;
}
}
@ -529,30 +602,23 @@ loop_main(int m_kq, std::vector<struct ppd_thread_ctx*> *workers)
struct ppd_conn *conn = new struct ppd_conn;
conn->conn_fd = conn_fd;
switch (options.mode) {
case WORKLOAD_TYPE::ECHO:
conn->proc = new echo_proc(&mode_params);
break;
case WORKLOAD_TYPE::TOUCH:
conn->proc = new touch_proc(&mode_params);
break;
#ifdef WITH_ROCKSDB
case WORKLOAD_TYPE::RDB:
hint->proc = new rdb_proc(&mode_params);
break;
#endif
default:
E("Unknown server mode %d", options.mode);
}
if (options.enable_tls) {
conn->ssl = tls_handshake_server(conn_fd);
V("Established TLS on connection %d...\n", conn_fd);
} else {
conn->ssl = nullptr;
}
int target_kq = workers->at(cur_conn % workers->size())->kqfd;
int ev_flags = EV_ADD;
int worker_idx = cur_conn % workers->size();
if (options.m_info->conn_create_cb(options.m_global_ctx,
workers->at(worker_idx)->m_thread_ctx, &conn->m_conn_ctx) != 0) {
W("Failed to create connection %d ctx, dropped.\n", conn_fd);
ppd_conn_free_no_ctx(conn);
continue;
}
int ev_flags = EV_ADD;
#ifdef FKQMULTI
for (uint32_t i = 0; i < options.hpip.size(); i++) {
if (strcmp(ipaddr, options.hpip.at(i)) == 0) {
@ -566,25 +632,25 @@ loop_main(int m_kq, std::vector<struct ppd_thread_ctx*> *workers)
ev_flags |= EV_DISPATCH;
}
int target_kq = workers->at(worker_idx)->kqfd;
EV_SET(&kev, conn_fd, EVFILT_READ, ev_flags, 0, 0, conn);
int status = kevent(target_kq, &kev, 1, NULL, 0, NULL);
if (status == -1) {
if (kevent(target_kq, &kev, 1, NULL, 0, NULL) == -1) {
E("kevent() failed: %d\n", errno);
}
V("Connection %d assigned to thread %d\n", conn_fd, workers->at(cur_conn % workers->size())->tid);
V("Connection %d assigned to thread %d\n", conn_fd,
workers->at(cur_conn % workers->size())->tid);
cur_conn++;
}
}
int
main(int argc, char *argv[])
int
main(int argc, char *argv[])
{
GOOGLE_PROTOBUF_VERIFY_VERSION;
SSL_library_init();
srand(time(NULL));
// don't raise SIGPIPE when sending into broken TCP connections
::signal(SIGPIPE, SIG_IGN);
::signal(SIGPIPE, SIG_IGN);
#ifdef FKQMULTI
char dbuf[1024 * 1024 + 1];
@ -592,70 +658,86 @@ main(int argc, char *argv[])
char ch;
while ((ch = getopt(argc, argv, "d:p:c:m:vhr:R:F:M:O:skS:C:")) != -1) {
switch (ch) {
case 'd':
options.skq_dump = atoi(optarg);
break;
case 'p':
options.port = atoi(optarg);
break;
case 'c':
cpulist_to_cpuset(optarg, &options.cpuset);
break;
case 'm':
options.shared_kq = 1;
options.skq_flag = atoi(optarg);
break;
case 'O': {
if (options.num_mode_params >= MAX_MODE_PARAMS) {
E("Too many workload parameters.\n");
}
snprintf(options.mode_params[options.num_mode_params], MAX_MODE_PARAMS_LEN, "%s", optarg);
options.num_mode_params++;
break;
case 'd':
options.skq_dump = atoi(optarg);
break;
case 'p':
options.port = atoi(optarg);
break;
case 'c':
cpulist_to_cpuset(optarg, &options.cpuset);
break;
case 'm':
options.shared_kq = 1;
options.skq_flag = atoi(optarg);
break;
case 'O': {
if (options.mod_argc >= MAX_MODE_PARAMS) {
E("Too many module args, max %d.\n", MAX_MODE_PARAMS);
}
case 'M':
options.mode = atoi(optarg);
break;
case 'v':
options.verbose = 1;
W("Verbose mode can cause SUBSTANTIAL latency fluctuations in some terminals!\n");
break;
case 'r': {
char* eip = new char[INET_ADDRSTRLEN + 1];
get_ip_from_hostname(optarg, eip, INET_ADDRSTRLEN);
options.hpip.push_back(eip);
break;
}
case 'R':
options.skq_rtshare = atoi(optarg);
break;
case 'F':
options.skq_rtfreq = atoi(optarg);
break;
case 's':
options.enable_tls = 1;
break;
case 'k':
options.enable_ktls = 1;
break;
case 'S':
snprintf(options.secret_key_fn, sizeof(options.secret_key_fn), "%s", optarg);
break;
case 'C':
snprintf(options.cert_fn, sizeof(options.cert_fn), "%s", optarg);
break;
case 'h':
usage();
exit(0);
default:
E("Unrecognized option -%c. See -h.\n\n", ch);
options.mod_argk[options.mod_argc] = new char[MAX_MODE_PARAMS_LEN];
options.mod_argv[options.mod_argc] = new char[MAX_MODE_PARAMS_LEN];
parse_mod_arg(optarg, options.mod_argk[options.mod_argc],
MAX_MODE_PARAMS_LEN, options.mod_argv[options.mod_argc],
MAX_MODE_PARAMS_LEN);
options.mod_argc++;
break;
}
case 'M':
snprintf(options.module_path, sizeof(options.module_path), "%s", optarg);
break;
case 'v':
options.verbose = 1;
W("Verbose mode can cause SUBSTANTIAL latency fluctuations in some terminals!\n");
break;
case 'r': {
char *eip = new char[INET_ADDRSTRLEN + 1];
get_ip_from_hostname(optarg, eip, INET_ADDRSTRLEN);
options.hpip.push_back(eip);
break;
}
case 'R':
options.skq_rtshare = atoi(optarg);
break;
case 'F':
options.skq_rtfreq = atoi(optarg);
break;
case 's':
options.enable_tls = 1;
break;
case 'k':
options.enable_ktls = 1;
break;
case 'S':
snprintf(options.secret_key_fn, sizeof(options.secret_key_fn), "%s",
optarg);
break;
case 'C':
snprintf(options.cert_fn, sizeof(options.cert_fn), "%s", optarg);
break;
case 'h':
usage();
exit(0);
default:
E("Unrecognized option -%c. See -h.\n\n", ch);
}
}
dump_options();
parse_mode_params();
std::vector<struct ppd_thread_ctx *> wrk_thrds;
std::vector<int> server_socks;
std::vector<int> server_socks;
if (strlen(options.module_path) == 0) {
E("Must specify module path.\n");
}
V("Loading module %s...\n", options.module_path);
options.m_info = ppd_load_module(options.module_path);
V("Loaded module \"%s\".\n", options.m_info->name);
V("Initializing module...\n");
options.m_info->global_init_cb(options.mod_argc, options.mod_argk, options.mod_argv,
&options.m_global_ctx);
V("Setting up server sockets...\n");
listen_socket_create(&server_socks);

175
ppd/util.cc Normal file
View File

@ -0,0 +1,175 @@
#include <netinet/in.h>
#include <dlfcn.h>
#include <openssl/ssl.h>
#include <stdio.h>
#include <unistd.h>
#include "logger.h"
#include "msg.h"
#include "util.h"
#include <cerrno>
struct ppd_mod_info *
ppd_load_module(const char *path)
{
void *handle = dlopen(path, RTLD_NOW | RTLD_LOCAL);
if (handle == NULL) {
E("Failed to load module %s: %s.\n", path, dlerror());
}
ppd_get_mod_info_fn fn = (ppd_get_mod_info_fn)dlfunc(handle, PPD_GET_MOD_INFO_FN);
if (fn == NULL) {
E("Failed to find symbol %s: %s\n", PPD_GET_MOD_INFO_FN, dlerror());
}
return fn();
}
static int
ppd_ssl_error_retryable(int err)
{
return (err == SSL_ERROR_WANT_READ) || (err == SSL_ERROR_WANT_WRITE) ||
(err == SSL_ERROR_WANT_CONNECT) || (err == SSL_ERROR_WANT_ACCEPT) ||
(err == SSL_ERROR_WANT_X509_LOOKUP) || (err == SSL_ERROR_WANT_CLIENT_HELLO_CB);
}
int
ppd_readbuf_ssl(SSL *ssl, void *buf, int len)
{
int status;
while (len > 0) {
if ((status = SSL_read(ssl, buf, len)) > 0) {
buf = (char *)buf + status;
len -= status;
} else {
status = SSL_get_error(ssl, status);
if (!ppd_ssl_error_retryable(status)) {
errno = status;
return -1;
}
}
};
return 0;
}
int
ppd_writebuf_ssl(SSL *ssl, void *buf, int len)
{
int status;
while (len > 0) {
if ((status = SSL_write(ssl, buf, len)) > 0) {
buf = (char *)buf + status;
len -= status;
} else {
status = SSL_get_error(ssl, status);
if (!ppd_ssl_error_retryable(status)) {
errno = status;
return -1;
}
}
};
return 0;
}
int
ppd_readbuf(int fd, void *buf, int len)
{
int status;
while (len > 0) {
if ((status = recv(fd, buf, len, 0)) > 0) {
buf = (char *)buf + status;
len -= status;
} else if (status == 0) {
errno = ECONNRESET;
return -1;
} else {
if (errno != EINTR) {
return -1;
}
}
};
return 0;
}
int
ppd_writebuf(int fd, void *buf, int len)
{
int status;
while (len > 0) {
if ((status = send(fd, buf, len, 0)) > 0) {
buf = (char *)buf + status;
len -= status;
} else if (status == 0) {
errno = ECONNRESET;
return -1;
} else {
return -1;
}
};
return 0;
}
int
ppd_readmsg(int fd, SSL *ssl, char *buf, size_t len)
{
int status;
struct ppd_msg *msg = (struct ppd_msg *)buf;
if (len < sizeof(struct ppd_msg)) {
return EOVERFLOW;
}
if (ssl != nullptr) {
status = ppd_readbuf_ssl(ssl, msg, sizeof(struct ppd_msg));
} else {
status = ppd_readbuf(fd, msg, sizeof(struct ppd_msg));
}
if (status != 0) {
return status;
}
int sz = ntohl(msg->size);
msg->size = sz;
if (sz > (int)len) {
return EOVERFLOW;
}
if (((struct ppd_msg *)buf)->size > 0) {
if (ssl != nullptr) {
status = ppd_readbuf_ssl(ssl, buf, sz);
} else {
status = ppd_readbuf(fd, buf, sz);
}
}
return status;
}
int
ppd_writemsg(int fd, SSL *ssl, struct ppd_msg *msg)
{
int status;
int sz = msg->size;
msg->size = htonl(msg->size);
if (ssl != nullptr) {
status = ppd_writebuf_ssl(ssl, msg, sizeof(struct ppd_msg) + sz);
} else {
status = ppd_writebuf(fd, msg, sizeof(struct ppd_msg) + sz);
}
return status;
}

49
ppd/util.h Normal file
View File

@ -0,0 +1,49 @@
#pragma once
#include <sys/types.h>
#include <sys/param.h>
#include <sys/cpuset.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <openssl/ssl.h>
#include <stdint.h>
#include <stdio.h>
#include <time.h>
#include "mod.h"
struct ppd_mod_info *ppd_load_module(const char *path);
static inline void
cpulist_to_cpuset(char *cpulist, cpuset_t *cpuset)
{
char *cpu = strtok(cpulist, ",");
CPU_ZERO(cpuset);
while (cpu != nullptr) {
CPU_SET(atoi(cpu), cpuset);
cpu = strtok(nullptr, ",");
}
}
int ppd_readbuf_ssl(SSL *ssl, void *buf, int len);
int ppd_writebuf_ssl(SSL *ssl, void *buf, int len);
int ppd_readbuf(int fd, void *buf, int len);
int ppd_writebuf(int fd, void *buf, int len);
int ppd_readmsg(int fd, SSL *ssl, char *buf, size_t len);
int ppd_writemsg(int fd, SSL *ssl, struct ppd_msg *msg);
static inline uint64_t
get_time_us()
{
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
// clock_gettime(CLOCK_REALTIME, &ts);
return ts.tv_sec * 1000000 + ts.tv_nsec / 1000;
}

View File

@ -1,19 +0,0 @@
#pragma once
#include <vector>
#include <unordered_map>
#include <string>
#include "const.h"
static constexpr int MAX_MODE_PARAMS = 8;
static constexpr int MAX_MODE_PARAMS_LEN = 64;
struct server_options {
int port = 9898;
WORKLOAD_TYPE mode = WORKLOAD_TYPE::ECHO;
int verbose = 0;
int num_mode_params;
char mode_params[MAX_MODE_PARAMS][MAX_MODE_PARAMS_LEN];
std::unordered_map<std::string, std::string> parsed_mode_params;
};
extern server_options options;

View File

@ -1,315 +0,0 @@
#include <cstdlib>
#include <strings.h>
#include <vector>
#include <sstream>
#include <getopt.h>
#include <unordered_map>
#include <ff_api.h>
#include <ff_config.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include "const.h"
#include "util_ff.h"
#include "reqproc.h"
#include "msg.pb.h"
#include "options.h"
static constexpr int NEVENT = 128;
static constexpr int SOCK_BACKLOG = 512;
static constexpr int DEFAULT_PORT = 9898;
struct alignas(CACHE_LINE_SIZE) cache_item {
int val;
};
static_assert(sizeof(struct cache_item) == CACHE_LINE_SIZE, "cache_item not cache line sized");
struct conn_hint {
req_proc * proc;
};
struct alignas(CACHE_LINE_SIZE) loop_info {
int kqfd;
int srv_sock;
};
static_assert(sizeof(struct loop_info) == CACHE_LINE_SIZE, "loop_info not cache line sized");
struct server_options options;
static void
conn_hint_destroy(struct conn_hint *hint)
{
delete hint->proc;
delete hint;
}
static int
server_socket_create(int port)
{
struct sockaddr_in server_addr;
int status;
const int enable = 1;
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
int fd = ff_socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (fd < 0) {
E("server listen socket");
}
// if (ff_setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)) < 0) {
// E("server listen setsockopt reuseaddr");
// }
// if (ff_setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)) < 0) {
// E("server listen setsockopt reuseport");
// }
if (ff_setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) < 0) {
E("server listen setsockopt NODELAY");
}
status = ff_bind(fd, (struct linux_sockaddr*)&server_addr, sizeof(server_addr));
if (status < 0) {
E("server listen bind");
}
status = ff_listen(fd, SOCK_BACKLOG);
if (status < 0) {
E("listen");
}
return fd;
}
static void
drop_conn(int kqfd, int connfd, struct conn_hint * hint)
{
int status;
struct kevent ev;
EV_SET(&ev, connfd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
status = ff_kevent(kqfd, &ev, 1, 0, 0, NULL);
if (status < 0) {
E("Failed to delete connection %d from kqueue\n", connfd);
}
ff_close(connfd);
conn_hint_destroy(hint);
}
static int
loopm(void *info)
{
struct loop_info *linfo = (struct loop_info *)info;
struct kevent kev[NEVENT];
struct kevent skev[NEVENT + 1];
std::vector<int> socks;
int skev_sz = 0;
int status;
status = ff_kevent(linfo->kqfd, NULL, 0, kev, NEVENT, NULL);
if (status < 0) {
E("main_loop: kevent() failed with %d\n", errno);
}
skev_sz = 0;
for (int i = 0; i < status; i++) {
int fd = (int)kev[i].ident;
struct conn_hint * conn_hint = (struct conn_hint *)kev[i].udata;
V("Event fired on fd %d\n", fd);
if (kev->flags & EV_EOF) {
if (fd != linfo->srv_sock) {
V("main_loop: connection %d dropped due to EOF. ERR: %d\n", fd, kev->fflags);
drop_conn(linfo->kqfd, fd, conn_hint);
continue;
} else {
E("main_loop: unexpected EOF received on server socket.\n");
}
} else if (kev->flags & EV_ERROR) {
W("main_loop: kevent() fd %d EV_ERROR set : 0x%lx. Dropping...\n", fd, kev[i].data);
continue;
}
if (fd == linfo->srv_sock) {
struct sockaddr_in client_addr;
socklen_t client_addr_size = sizeof(client_addr);
int client_fd = ff_accept(fd, (struct linux_sockaddr*)&client_addr, &client_addr_size);
if (client_fd < 0) {
W("main_loop: ff_accept() failed\n");
continue;
}
// const int enable = 1;
// if (setsockopt(client_fd, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) < 0) {
// W("setsockopt() failed on socket %d\n", client_fd);
// ff_close(client_fd);
// continue;
// }
char ipaddr[INET_ADDRSTRLEN + 1];
strncpy(ipaddr, inet_ntoa(client_addr.sin_addr), INET_ADDRSTRLEN);
ipaddr[INET_ADDRSTRLEN] = 0;
V("main_loop: accepted new connection %d from %s\n", client_fd, ipaddr);
struct conn_hint *hint = new struct conn_hint;
switch (options.mode) {
case WORKLOAD_TYPE::ECHO:
hint->proc = new echo_proc(&options.parsed_mode_params);
break;
case WORKLOAD_TYPE::TOUCH:
hint->proc = new touch_proc(&options.parsed_mode_params);
break;
// case WORKLOAD_TYPE::RDB:
// hint->proc = new rdb_proc(&options.parsed_mode_params);
// break;
default:
E("Unknown server mode %d", options.mode);
}
EV_SET(&skev[skev_sz], client_fd, EVFILT_READ, EV_ADD, 0, 0, hint);
skev_sz++;
} else {
status = conn_hint->proc->proc_req(fd);
if (status < 0) {
W("Connection %d proc_req returned error %d\n", fd, status);
drop_conn(linfo->kqfd, fd, conn_hint);
} else {
// EV_SET(&skev[skev_sz], fd, EVFILT_READ, EV_ENABLE, 0, 0, conn_hint);
// skev_sz++;
}
}
}
assert(skev_sz <= NEVENT + 1);
status = ff_kevent(linfo->kqfd, skev, skev_sz, NULL, 0, NULL);
return 0;
}
void dump_options()
{
std::stringstream ss;
ss << "Configuration:" << std::endl
<< " port: " << options.port << std::endl
<< " mode: " << options.mode << std::endl
<< " verbose: " << options.verbose << std::endl
<< " mode parameters: " << std::endl;
for(int i = 0; i < options.num_mode_params; i++) {
ss << " " << options.mode_params[i] << std::endl;
}
V("%s", ss.str().c_str());
}
static void
usage()
{
fprintf(stdout, "Usage:\n"
" p: listen port\n"
" v: verbose mode\n"
" h: show help\n"
" M: server mode: 0 - ECHO, 1 - TOUCH, 2 - HTTP, 3 - RDB\n"
" O: mode specific parameters in the format \"key=value\"\n"
" Workload specific parameters:\n"
" TOUCH:\n"
" ENTRIES - Number of cache-aligned entries per connection.\n\n");
}
void parse_mode_params()
{
char * saveptr;
for (int i = 0; i < options.num_mode_params; i++) {
saveptr = NULL;
char *key = strtok_r(options.mode_params[i], "=", &saveptr);
char *val = strtok_r(NULL, "=", &saveptr);
options.parsed_mode_params.insert({key, val});
V("Parsed workload parameter: %s = %s\n", key, val);
}
}
int
main(int argc, char *argv[])
{
GOOGLE_PROTOBUF_VERIFY_VERSION;
// don't raise SIGPIPE when sending into broken TCP connections
::signal(SIGPIPE, SIG_IGN);
srand(time(NULL));
ff_init(argc, argv);
char ch;
while ((ch = getopt(argc, argv, "M:h:O:vp:")) != -1) {
switch (ch) {
case 'O': {
strncpy(options.mode_params[options.num_mode_params], optarg, MAX_MODE_PARAMS_LEN);
options.num_mode_params++;
break;
}
case 'M':
options.mode = (WORKLOAD_TYPE)strtod(optarg, nullptr);
break;
case 'v':
options.verbose = 1;
W("Verbose mode can cause SUBSTANTIAL latency fluctuations in some terminals!\n");
break;
case 'p':
options.port = strtod(optarg, nullptr);
break;
case 'h':
usage();
exit(0);
default:
E("Unrecognized option -%c. See -h.\n\n", ch);
}
}
parse_mode_params();
dump_options();
V("Setting up listen sockets...\n");
int srv_sock = server_socket_create(options.port);
int kq = ff_kqueue();
if (kq <= 0) {
E("Cannot create kqueue\n");
}
struct kevent kev;
EV_SET(&kev, srv_sock, EVFILT_READ, EV_ADD, 0, 0, NULL);
int status = ff_kevent(kq, &kev, 1, NULL, 0, NULL);
if (status == -1) {
E("kevent() add srv_sock failed %d\n", errno);
}
V("Entering main event loop...\n");
struct loop_info info;
info.kqfd = kq;
info.srv_sock = srv_sock;
ff_run(loopm, &info);
// shouldn't get here
assert(false);
google::protobuf::ShutdownProtobufLibrary();
return 0;
}

View File

@ -1,305 +0,0 @@
#include <arpa/inet.h>
#include <atomic>
#include <cassert>
#include <cerrno>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <new>
#include <random>
#include <stdint.h>
#include <sstream>
#include <sys/endian.h>
#include <sys/param.h>
#include <chrono>
#include <unistd.h>
// #include <rocksdb/cache.h>
// #include <rocksdb/status.h>
// #include <rocksdb/cache.h>
// #include <rocksdb/db.h>
// #include <rocksdb/env.h>
// #include <rocksdb/filter_policy.h>
// #include <rocksdb/memtablerep.h>
// #include <rocksdb/options.h>
// #include <rocksdb/perf_context.h>
// #include <rocksdb/persistent_cache.h>
// #include <rocksdb/rate_limiter.h>
// #include <rocksdb/slice.h>
// #include <rocksdb/slice_transform.h>
// #include <rocksdb/stats_history.h>
// #include <rocksdb/utilities/object_registry.h>
// #include <rocksdb/utilities/optimistic_transaction_db.h>
// #include <rocksdb/utilities/options_util.h>
// #include <rocksdb/utilities/sim_cache.h>
// #include <rocksdb/utilities/transaction.h>
// #include <rocksdb/utilities/transaction_db.h>
// #include <rocksdb/write_batch.h>
// #include <rocksdb/table.h>
#include "msg.pb.h"
#include "util_ff.h"
#include "const.h"
#include "reqproc.h"
#include "options.h"
////////////////
// TOUCH Generator
////////////////
touch_proc::touch_proc(std::unordered_map<std::string, std::string>* args) : req_proc()
{
if (args->find(PARAM_TBSZ) != args->end()) {
this->buffer_sz = atoi(args->at(PARAM_TBSZ).c_str());
} else {
this->buffer_sz = PARAM_TBSZ_DEFAULT;
}
V("Allocating %d items x %d CASZ for connnection\n", this->buffer_sz, CACHE_LINE_SIZE);
this->buffer = (struct ppd_touch_cache_item *)aligned_alloc(CACHE_LINE_SIZE, this->buffer_sz * sizeof(struct ppd_touch_cache_item));
this->rgen = new std::default_random_engine();
this->rdist = new std::uniform_int_distribution<int>(0, INT32_MAX);
}
touch_proc::~touch_proc()
{
delete this->buffer;
delete this->rgen;
delete this->rdist;
}
int touch_proc::proc_req(int fd)
{
ppd_touch_req req;
ppd_touch_resp resp;
struct ppd_msg *msg = (struct ppd_msg *)this->read_buf;
int ret = readmsg(fd, this->read_buf, this->MAX_READ_BUF_SIZE);
if (ret < 0) {
W("Readmsg failed with %d for connection %d\n", ret, fd);
return ret;
}
if (!req.ParseFromArray(msg->payload, msg->size)) {
W("ParseFromArray failed for connection %d\n", fd);
return EINVAL;
}
V("Conn %d touching %d items...\n", fd, req.touch_cnt());
int sum = 0;
int rand = (*this->rdist)(*this->rgen);
for(int64_t i = rand; i < rand + req.touch_cnt(); i++) {
if (req.inc() > 0) {
this->buffer[i % this->buffer_sz].val += 1;
} else {
/* only read */
sum += this->buffer[i % this->buffer_sz].val;
}
}
sum = sum + 1;
resp.set_status(0);
if (!resp.SerializeToArray(this->read_buf, MAX_READ_BUF_SIZE)) {
W("Couldn't searialize to array connection %d\n", fd);
}
ret = writemsg(fd, this->read_buf, MAX_READ_BUF_SIZE, this->read_buf, resp.ByteSizeLong());
if (ret < 0) {
W("Writemsg failed with %d for connection %d\n", ret, fd);
}
return ret;
}
////////////////
// ECHO Generator
////////////////
echo_proc::echo_proc(std::unordered_map<std::string, std::string>*) : req_proc()
{
}
echo_proc::~echo_proc()
{
}
int echo_proc::proc_req(int fd)
{
uint64_t ms1, ms2;
struct ppd_msg *msg = (struct ppd_msg *)this->read_buf;
int ret = readmsg(fd, this->read_buf, MAX_READ_BUF_SIZE);
if (ret < 0) {
W("Readbuf failed with %d for connection %d\n", ret, fd);
return ret;
}
ms1 = get_time_us();
if (!req.ParseFromArray(msg->payload, msg->size)) {
W("ParseFromArray failed for connection %d\n", fd);
return EINVAL;
}
V("Connection %d delay %d us \n", fd, req.enable_delay());
if (req.enable_delay() > 0) {
uint64_t server_delay = req.enable_delay();
uint64_t now = get_time_us();
while (get_time_us() - now <= server_delay) {};
}
resp.set_status(0);
if (!resp.SerializeToArray(this->read_buf, MAX_READ_BUF_SIZE)) {
W("Couldn't searialize to array connection %d\n", fd);
}
ms2 = get_time_us();
V("Serialization: TIME: %ld\n", ms2 - ms1);
ret = writemsg(fd, this->read_buf, MAX_READ_BUF_SIZE, this->read_buf, resp.ByteSizeLong());
if (ret < 0) {
W("Writemsg failed with %d for connection %d\n", ret, fd);
}
return ret;
}
// ////////////////
// // rdb Generator
// ////////////////
// rocksdb::DB * rdb_proc::db = nullptr;
// std::atomic<int> rdb_proc::db_init {0};
// rdb_proc::rdb_proc(std::unordered_map<std::string, std::string>* args) : req_proc()
// {
// const char * db_path;
// int desired = 0;
// int target = 1;
// if (std::atomic_compare_exchange_strong(&rdb_proc::db_init, &desired, target)) {
// if (args->find(PARAM_PATH) != args->end()) {
// db_path = args->at(PARAM_PATH).c_str();
// } else {
// E("Must specify -OPATH for rocksdb.\n");
// }
// V("Initializing rocksdb, path: %s.\n", db_path);
// rocksdb::Options opt;
// std::shared_ptr<rocksdb::Cache> cache = rocksdb::NewLRUCache(CACHE_SIZE, 6, false, 0.0);
// opt.use_direct_io_for_flush_and_compaction = USE_DIRECT_IO_FOR_FLUSH_AND_COMPACTION;
// opt.use_direct_reads = USE_DIRECT_READS;
// rocksdb::BlockBasedTableOptions block_based_options;
// block_based_options.index_type = rocksdb::BlockBasedTableOptions::kBinarySearch;
// block_based_options.block_cache = cache;
// opt.table_factory.reset(rocksdb::NewBlockBasedTableFactory(block_based_options));
// opt.IncreaseParallelism(12);
// opt.OptimizeLevelStyleCompaction(1024 * 1024 * 1024);
// opt.OptimizeUniversalStyleCompaction(1024 * 1024 * 1024);
// opt.write_buffer_size = 1024 * 1024 * 1024;
// opt.create_if_missing = false;
// opt.compression = rocksdb::kNoCompression;
// rocksdb::Status s = rocksdb::DB::Open(opt, std::string(db_path), &this->db);
// if (!s.ok()) {
// E("Could not open rocksdb! Err %s\n", s.ToString().c_str());
// }
// rdb_proc::db_init.store(2);
// V("Finished initializing rocksdb.\n");
// } else {
// V("Checking for rocksdb initialization...\n");
// while(rdb_proc::db_init.load() != 2) {};
// V("Detected initialized rocksdb.\n");
// }
// }
// rdb_proc::~rdb_proc()
// {
// }
// int rdb_proc::proc_req(int fd)
// {
// ppd_rdb_resp resp;
// ppd_rdb_req req;
// rocksdb::Status s;
// struct ppd_msg *msg = (struct ppd_msg *)this->read_buf;
// int i = 0;
// int status = readmsg(fd, this->read_buf, MAX_READ_BUF_SIZE);
// if (status != 0) {
// W("Readmsg failed with %d for connection %d\n", status, fd);
// return status;
// }
// if (!req.ParseFromArray(msg->payload, msg->size)) {
// W("ParseFromArray failed for connection %d\n", fd);
// return EINVAL;
// }
// V("Connection %d op: %d, key: %s. val: %s. optarg: %d.\n", fd, req.op(), req.key().c_str(), req.val().c_str(), req.optarg());
// switch (req.op()) {
// case PPD_RDB_OP_PUT:{
// s = this->db->Put(rocksdb::WriteOptions(), req.key(), req.val());
// resp.set_status(s.code());
// break;
// }
// case PPD_RDB_OP_GET: {
// std::string val;
// s = this->db->Get(rocksdb::ReadOptions(), req.key(), &val);
// if (s.ok()) {
// resp.set_result(val);
// }
// resp.set_status(s.code());
// break;
// }
// case PPD_RDB_OP_SEEK: {
// rocksdb::Slice val;
// rocksdb::Iterator *it = this->db->NewIterator(rocksdb::ReadOptions(false, true));
// it->Seek(req.key());
// resp.set_status(it->status().code());
// if (it->Valid()) {
// val = it->value();
// resp.set_result(val.data(), val.size());
// }
// for(int64_t j = 0; j < req.optarg() && it->Valid(); j++) {
// rocksdb::Slice val = it->value();
// // do something about the key
// std::memcpy(this->read_buf, val.data(), MIN(val.size(), MAX_READ_BUF_SIZE));
// it->Next();
// if (!it->status().ok()) {
// resp.set_status(it->status().code());
// break;
// }
// }
// delete it;
// break;
// }
// default: {
// W("Invalid opcode %d for connection %d\n", req.op(), fd);
// return EINVAL;
// }
// }
// resp.set_status(0);
// status = writemsg(fd, this->read_buf, MAX_READ_BUF_SIZE, this->read_buf, resp.ByteSizeLong());
// if (status < 0) {
// W("Writemsg failed with %d for connection %d\n", status, fd);
// }
// return status;
// }

View File

@ -1,78 +0,0 @@
#pragma once
#include <cstdint>
#include <string>
#include <random>
#include <unordered_map>
#include <const.h>
#include <atomic>
#include <sys/param.h>
#include <msg.pb.h>
#define DISABLE_EVIL_CONSTRUCTORS(name) \
name(const name&) = delete; \
void operator=(const name) = delete
struct alignas(CACHE_LINE_SIZE) ppd_touch_cache_item {
int val;
};
class req_proc {
protected:
constexpr static int MAX_READ_BUF_SIZE = 1024 * 1024;
char * read_buf;
public:
req_proc() {this->read_buf = new char[MAX_READ_BUF_SIZE]; };
virtual ~req_proc() {delete[] this->read_buf;};
virtual int proc_req(int fd) = 0;
};
class touch_proc : public req_proc
{
private:
int buffer_sz;
std::default_random_engine * rgen;
std::uniform_int_distribution<int> * rdist;
struct ppd_touch_cache_item* buffer;
static constexpr const char* PARAM_TBSZ = "ENTRIES";
static constexpr const int PARAM_TBSZ_DEFAULT = 64;
constexpr static int MAX_SZ = 1024 * 1024;
public:
touch_proc(std::unordered_map<std::string, std::string>* args);
touch_proc() = delete;
~touch_proc();
DISABLE_EVIL_CONSTRUCTORS(touch_proc);
int proc_req(int fd);
};
class echo_proc : public req_proc
{
private:
ppd_echo_req req;
ppd_echo_resp resp;
public:
echo_proc(std::unordered_map<std::string, std::string>* args);
echo_proc() = delete;
~echo_proc();
DISABLE_EVIL_CONSTRUCTORS(echo_proc);
int proc_req(int fd);
};
// class rdb_proc : public req_proc
// {
// private:
// constexpr static bool USE_DIRECT_IO_FOR_FLUSH_AND_COMPACTION = true;
// constexpr static bool USE_DIRECT_READS = true;
// constexpr static int CACHE_SIZE = 268435456;
// constexpr static int MAX_MSG_SZ = 4096;
// static constexpr const char* PARAM_PATH = "PATH";
// static std::atomic<int> db_init;
// static rocksdb::DB *db;
// public:
// rdb_proc(std::unordered_map<std::string, std::string>* args);
// rdb_proc() = delete;
// ~rdb_proc();
// DISABLE_EVIL_CONSTRUCTORS(rdb_proc);
// int proc_req(int fd);
// };