This commit is contained in:
oscar 2023-02-24 18:25:52 -05:00
parent 0951aafc4c
commit fb3ece074c
7 changed files with 123 additions and 84 deletions

View File

@ -24,7 +24,6 @@
#include <pthread.h>
#include <pthread_np.h>
#include "msg.h"
#include "openssl/ssl.h"
#include "openssl/err.h"
#include "util.h"
@ -62,7 +61,7 @@ struct dsmbr_request_record {
uint64_t epoch;
};
struct dsmbr_ctrl_msg{
struct dsmbr_ctrl_msg {
int code;
char data[32]; //fixed size for simplicity
};
@ -521,7 +520,7 @@ void dsmbr_handle_event(struct dsmbr_thread_ctx *tinfo, struct kevent *kev)
}
void
worker_main(void * ctx)
dsmbr_worker_main(void * ctx)
{
struct dsmbr_thread_ctx * tinfo = (struct dsmbr_thread_ctx * tinfo)ctx;
@ -537,7 +536,7 @@ worker_main(void * ctx)
conns.at(i)->next_send = get_time_us();
}
while(1) {
while(true) {
struct kevent kevs[NEVENT];
if (kevent(kq, NULL, 0, &ev, 1, NULL) == 1) {
struct kqconn *conn = (struct kqconn *)ev.udata;
@ -546,7 +545,7 @@ worker_main(void * ctx)
E("Connection %d dropped due to EV_EOF. ERR: %d\n", conn->conn_fd, ev.fflags);
}
handle_event(id, &ev, conn);
dsmbr_handle_event(id, &ev, conn);
} else {
E("Thread %d kevent failed. ERR %d\n", id, errno);
}
@ -716,7 +715,7 @@ static void wait_master_prepare()
E("setsockopt reuseport");
}
if (::bind(listen_fd, (struct sockaddr*)&csock_addr, sizeof(csock_addr)) < 0) {
if (bind(listen_fd, (struct sockaddr*)&csock_addr, sizeof(csock_addr)) < 0) {
E("bind");
}

9
dsmbr/msg.h Normal file
View File

@ -0,0 +1,9 @@
#pragma once
#include <stdint.h>
#include <sys/endian.h>
struct dsmbr_msg {
uint32_t size;
char data[0];
};

View File

@ -6,6 +6,10 @@ extern "C" {
#include <stdint.h>
#include <stdio.h>
#include <dlfcn.h>
#include <stdlib.h>
#include "logger.h"
typedef int (*ppd_global_init_cb)(int argc, char** argk, char** argv, void **ctx);
@ -43,6 +47,26 @@ typedef struct ppd_mod_info * (*ppd_get_mod_info_fn)(void);
#define PPD_GET_MOD_INFO_ID ppd_getmod_info
#define PPD_GET_MOD_INFO_FN ("##PPD_GET_MOD_INFO_ID")
#define inline inline __attribute__((unused))
static inline struct ppd_mod_info *
ppd_load_module(const char *path)
{
void *handle = dlopen(path, RTLD_NOW | RTLD_LOCAL);
if (handle == NULL) {
E("Failed to load module %s: %s.\n", path, dlerror());
}
ppd_get_mod_info_fn fn = (ppd_get_mod_info_fn)dlfunc(handle, PPD_GET_MOD_INFO_FN);
if (fn == NULL) {
E("Failed to find symbol %s: %s\n", PPD_GET_MOD_INFO_FN, dlerror());
}
return fn();
}
#undef inline
#ifdef __cplusplus
}
#endif

41
include/util.h Normal file
View File

@ -0,0 +1,41 @@
#pragma once
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <string.h>
#include <stdlib.h>
#include <time.h>
#include <sys/param.h>
#include <sys/cpuset.h>
#define inline inline __attribute__((unused))
static inline uint64_t
get_time_us()
{
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
// clock_gettime(CLOCK_REALTIME, &ts);
return ts.tv_sec * 1000000 + ts.tv_nsec / 1000;
}
static inline void
cpulist_to_cpuset(char *cpulist, cpuset_t *cpuset)
{
char *cpu = strtok(cpulist, ",");
CPU_ZERO(cpuset);
while (cpu != nullptr) {
CPU_SET(atoi(cpu), cpuset);
cpu = strtok(nullptr, ",");
}
}
#undef inline
#ifdef __cplusplus
}
#endif

View File

@ -1,6 +1,5 @@
#include <netinet/in.h>
#include <dlfcn.h>
#include <openssl/ssl.h>
#include <stdio.h>
#include <unistd.h>
@ -8,27 +7,11 @@
#include "bsock/bsock.h"
#include "logger.h"
#include "msg.h"
#include "util.h"
#include "io.h"
#include <algorithm>
#include <cerrno>
struct ppd_mod_info *
ppd_load_module(const char *path)
{
void *handle = dlopen(path, RTLD_NOW | RTLD_LOCAL);
if (handle == NULL) {
E("Failed to load module %s: %s.\n", path, dlerror());
}
ppd_get_mod_info_fn fn = (ppd_get_mod_info_fn)dlfunc(handle, PPD_GET_MOD_INFO_FN);
if (fn == NULL) {
E("Failed to find symbol %s: %s\n", PPD_GET_MOD_INFO_FN, dlerror());
}
return fn();
}
static ssize_t
ppd_read_ssl(void * _ctx, void *buf, size_t len)
{

View File

@ -14,20 +14,6 @@
#include "mod.h"
struct ppd_mod_info *ppd_load_module(const char *path);
static inline void
cpulist_to_cpuset(char *cpulist, cpuset_t *cpuset)
{
char *cpu = strtok(cpulist, ",");
CPU_ZERO(cpuset);
while (cpu != nullptr) {
CPU_SET(atoi(cpu), cpuset);
cpu = strtok(nullptr, ",");
}
}
struct ppd_bsock_io_ssl_ctx {
SSL * ssl;
char * ssl_readbuf;
@ -48,11 +34,3 @@ int ppd_readmsg(struct bsock *bsock, char *buf, size_t len);
int ppd_writemsg(struct bsock *bsock, struct ppd_msg *msg);
static inline uint64_t
get_time_us()
{
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
// clock_gettime(CLOCK_REALTIME, &ts);
return ts.tv_sec * 1000000 + ts.tv_nsec / 1000;
}

View File

@ -21,6 +21,7 @@
#include "mod.h"
#include "msg.h"
#include "util.h"
#include "io.h"
#include <cerrno>
#include <csignal>
@ -287,47 +288,49 @@ handle_event(struct ppd_thread_ctx *tinfo, struct kevent *kev)
goto fail;
}
status = ppd_readmsg(hint->bsock, tinfo->m_buf, MBUF_SZ);
if (status != 0) {
if (errno == ERANGE) {
// not enough data yet. try again later.
goto end;
} else {
W("Thread %d dropped connection %d due to ppd_readmsg error %d\n", tinfo->tid, conn_fd, errno);
while (true) {
status = ppd_readmsg(hint->bsock, tinfo->m_buf, MBUF_SZ);
if (status != 0) {
if (errno == ERANGE) {
// not enough data yet. try again later.
goto end;
} else {
W("Thread %d dropped connection %d due to ppd_readmsg error %d\n", tinfo->tid, conn_fd, errno);
goto fail;
}
}
msg = (struct ppd_msg *)tinfo->m_buf;
status = options.m_info->conn_recv_cb(msg->data, msg->size, options.m_global_ctx,
tinfo->m_thread_ctx, hint->m_conn_ctx);
if (status != 0) {
W("Thread %d dropped connection %d due to conn_recv_cb error %d\n", tinfo->tid,
conn_fd, status);
goto fail;
}
}
msg = (struct ppd_msg *)tinfo->m_buf;
status = options.m_info->conn_recv_cb(msg->data, msg->size, options.m_global_ctx,
tinfo->m_thread_ctx, hint->m_conn_ctx);
if (status != 0) {
W("Thread %d dropped connection %d due to conn_recv_cb error %d\n", tinfo->tid,
conn_fd, status);
goto fail;
}
status = options.m_info->conn_send_cb(msg->data, MBUF_SZ - sizeof(struct ppd_msg), &out_sz,
options.m_global_ctx, tinfo->m_thread_ctx, hint->m_conn_ctx);
if (status != 0) {
W("Thread %d dropped connection %d due to conn_send_cb error %d\n", tinfo->tid,
conn_fd, status);
goto fail;
}
status = options.m_info->conn_send_cb(msg->data, MBUF_SZ - sizeof(struct ppd_msg), &out_sz,
options.m_global_ctx, tinfo->m_thread_ctx, hint->m_conn_ctx);
if (status != 0) {
W("Thread %d dropped connection %d due to conn_send_cb error %d\n", tinfo->tid,
conn_fd, status);
goto fail;
}
msg->size = out_sz;
status = ppd_writemsg(hint->bsock, msg);
if (status != 0) {
// shouldn't be error here unless msg is too big to fit in bsock buffer
W("Thread %d dropped connection %d due to ppd_writemsg error %d\n", tinfo->tid, conn_fd, errno);
goto fail;
}
msg->size = out_sz;
status = ppd_writemsg(hint->bsock, msg);
if (status != 0) {
// shouldn't be error here unless msg is too big to fit in bsock buffer
W("Thread %d dropped connection %d due to ppd_writemsg error %d\n", tinfo->tid, conn_fd, errno);
goto fail;
}
// flush bsock immediately
status = bsock_flush(hint->bsock);
if (status <= 0) {
W("Thread %d dropped connection %d due to bsock_flush ret %d errno %d\n", tinfo->tid, conn_fd, status, errno);
goto fail;
// flush bsock immediately
status = bsock_flush(hint->bsock);
if (status <= 0) {
W("Thread %d dropped connection %d due to bsock_flush ret %d errno %d\n", tinfo->tid, conn_fd, status, errno);
goto fail;
}
}
tinfo->evcnt++;
@ -581,7 +584,9 @@ loop_main(int m_kq, std::vector<struct ppd_thread_ctx *> *workers)
{
struct kevent kev;
int cur_conn = 0;
long cur_ts = 0;
#ifdef FKQMULTI
int cur_ts = 0;
#endif
while (1) {
if (kevent(m_kq, NULL, 0, &kev, 1, NULL) != 1) {
E("kevent() failed: %d\n", errno);
@ -600,8 +605,8 @@ loop_main(int m_kq, std::vector<struct ppd_thread_ctx *> *workers)
fprintf(stdout, "====== KQ DUMP ======\n%s\n", dbuf);
}
}
#endif
cur_ts++;
#endif
continue;
}