diff --git a/dsmbr/dmsg.cc b/dsmbr/dmsg.cc index df1c212..41d19cc 100644 --- a/dsmbr/dmsg.cc +++ b/dsmbr/dmsg.cc @@ -1,3 +1,4 @@ +#include #include "dmsg.h" #include "io.h" #include @@ -6,20 +7,13 @@ int dsmbr_send_ctrl_msg(int sockfd, struct dsmbr_ctrl_msg * msg) { - return ppd_writebuf(sockfd, &msg, sizeof(struct dsmbr_ctrl_msg)); -} + msg->code = htonl(msg->code); -int -dsmbr_sendall_ctrl_msg(int * sockfds, int count, int code) -{ - int status = 0; - for(int i = 0; i < count; i++) { - status = dsmbr_send_ctrl_msg(sockfds[i], code, nullptr, 0); - if (status != 0) { - break; - } + for (int i = 0; i < DSMBR_MSG_MAX_DATA; i++) { + msg->data[i] = htobe64(msg->data[i]); } - return status; + + return ppd_writebuf(sockfd, &msg, sizeof(struct dsmbr_ctrl_msg)); } int @@ -29,21 +23,11 @@ dsmbr_recv_ctrl_msg(int sockfd, struct dsmbr_ctrl_msg * msg) if (status == 0) { msg->code = ntohl(msg->code); - } - return status; -} - -int -dsmbr_recvall_ctrl_msg(int * sockfds, int count, struct dsmbr_ctrl_msg * msgs) -{ - int status = 0; - for(int i = 0; i < count; i++) { - status = dsmbr_recv_ctrl_msg(sockfds[i], &msgs[i]); - if (status != 0) { - break; + for (int i = 0; i < DSMBR_MSG_MAX_DATA; i++) { + msg->data[i] = be64toh(msg->data[i]); } } + return status; } - diff --git a/dsmbr/dmsg.h b/dsmbr/dmsg.h index 387b09b..553ada8 100644 --- a/dsmbr/dmsg.h +++ b/dsmbr/dmsg.h @@ -5,6 +5,7 @@ static constexpr int CTRL_SYNC = 0x1234; static constexpr int CTRL_ACK = 0x2345; +static constexpr int CTRL_START = 0x5678; static constexpr int CTRL_STOP = 0x3456; static constexpr int CTRL_STAT = 0x4567; @@ -14,14 +15,9 @@ struct dsmbr_ctrl_msg { uint64_t data[DSMBR_MSG_MAX_DATA]; }; -int -dsmbr_send_ctrl_msg(int sockfd, int code, char * data, size_t len); -int -dsmbr_sendall_ctrl_msg(int * sockfds, int count, int code); +int +dsmbr_send_ctrl_msg(int sockfd, struct dsmbr_ctrl_msg * msg); int dsmbr_recv_ctrl_msg(int sockfd, struct dsmbr_ctrl_msg * msg); - -int -dsmbr_recvall_ctrl_msg(int * sockfds, int count, struct dsmbr_ctrl_msg * msgs); \ No newline at end of file diff --git a/dsmbr/dsmbr.cc b/dsmbr/dsmbr.cc index 5673d30..886bf3f 100755 --- a/dsmbr/dsmbr.cc +++ b/dsmbr/dsmbr.cc @@ -1,41 +1,43 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include #include -#include +#include +#include #include +#include +#include + #include #include + #include -#include -#include +#include +#include #include +#include +#include #include #include -#include -#include -#include +#include +#include -#include "util.h" +#include "dmsg.h" #include "generator.h" +#include "io.h" #include "logger.h" #include "mod.h" #include "msg.h" -#include "io.h" -#include "dmsg.h" +#include "util.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include static constexpr int MAX_MOD_ARGS = 32; static constexpr int MAX_MOD_ARG_LEN = 128; @@ -45,6 +47,8 @@ static constexpr int CTRL_PORT = 15367; static constexpr int CTRL_TIMEOUT = 3; static constexpr int CTRL_BACKLOG = 4096; static constexpr int BSOCK_BUF_SZ = 4096; +static constexpr int THRD_INIT_PARALLEL = 2; +static constexpr int TIMER_MAGIC = 0x123456; static constexpr int STATE_WAITING = 0; static constexpr int STATE_RUN = 1; @@ -52,12 +56,9 @@ static constexpr int STATE_STOP = 2; struct dsmbr_thread_ctx { int tid; - int kqfd; - - struct ppd_msg * mbuf; + int core; + std::list stats; pthread_t thrd; - void * m_ctx; - std::vector conns; }; struct dsmbr_request_record { @@ -65,29 +66,28 @@ struct dsmbr_request_record { uint64_t send_size; uint64_t send_ts; uint64_t recv_ts; - uint64_t epoch; }; struct dsmbr_conn { Generator *ia_gen; - SSL* ssl; - char * ssl_readbuf; + SSL *ssl; + char *ssl_readbuf; int conn_fd; - struct bsock * bsock; + struct bsock *bsock; struct ppd_bsock_io_ssl_ctx bsock_ctx; - void * m_ctx; + void *m_ctx; uint64_t next_send; - std::vector stats; std::list req_in_flight; }; struct dsmbr_slave { int conn_fd; /* stats */ - uint64_t runtime; - uint64_t avg_pkt_sz; - uint64_t pkt_cnt; + uint64_t time; + uint64_t send_sz; + uint64_t recv_sz; + uint64_t reqs; }; struct dsmbr_options { @@ -103,54 +103,47 @@ struct dsmbr_options { char output_file[128]; cpuset_t cpuset; char module_path[128]; - char* mod_argk[MAX_MOD_ARGS]; - char* mod_argv[MAX_MOD_ARGS]; + char mod_argk[MAX_MOD_ARGS][64]; + char mod_argv[MAX_MOD_ARGS][64]; int mod_argc; int enable_tls; + int master_mode; + int slave_mode; // master mode - char * slave_ips[MAX_SLAVES]; + char slave_ips[MAX_SLAVES][64]; int num_slaves; - + // global states std::atomic state = STATE_WAITING; - int is_master; - int is_slave; - int slave_sockfds[MAX_SLAVES]; - int slave_ctrl_sock; - SSL_CTX * ssl_ctx; - - ppd_mod_info * m_info; - void * m_global_ctx; - std::vector workers; + sem_t worker_init_sem; + std::atomic worker_init_comp = 0; + SSL_CTX *ssl_ctx; + ppd_mod_info *m_info; + void *m_global_ctx; }; -static struct dsmbr_options options = { - .verbose = 0, - .server_ip = "127.0.0.1", - .conn_per_thread = 1, - .target_qps = 0, - .depth = 1, - .warmup = 0, - .duration = 5, - .ia_dist = "fb_ia", - .output_file = "samples.txt", - .cpuset = CPUSET_T_INITIALIZER(1), - .module_path = {0}, - .mod_argc = 0, - .enable_tls = 0, - .slave_ips = {nullptr}, - .num_slaves = 0, - .is_master = 0, - .is_slave = 0, - .slave_sockfds = {-1}, - .slave_ctrl_sock = -1, - .ssl_ctx = nullptr, - .m_info = nullptr, - .m_global_ctx = nullptr -}; +static struct dsmbr_options options = { .verbose = 0, + .server_ip = "127.0.0.1", + .conn_per_thread = 1, + .target_qps = 0, + .depth = 1, + .warmup = 0, + .duration = 5, + .ia_dist = "fb_ia", + .output_file = "samples.txt", + .cpuset = CPUSET_T_INITIALIZER(1), + .module_path = { 0 }, + .mod_argc = 0, + .enable_tls = 0, + .master_mode = 0, + .slave_mode = 0, + .num_slaves = 0, + .ssl_ctx = nullptr, + .m_info = nullptr, + .m_global_ctx = nullptr }; -static void +static void dsmbr_dump_options() { // V ("Configuration:\n" @@ -185,7 +178,6 @@ dsmbr_dump_options() return; } - static SSL * dsmbr_tls_handshake_client(int conn_fd) { @@ -208,7 +200,6 @@ dsmbr_tls_handshake_client(int conn_fd) return ssl; } - static void dsmbr_create_tls_context() { @@ -225,17 +216,16 @@ dsmbr_create_tls_context() options.ssl_ctx = ctx; } - -static struct dsmbr_conn * -dsmbr_conn_create(struct dsmbr_thread_ctx * thrd_ctx) +static struct dsmbr_conn * +dsmbr_conn_create(void *thrd_ctx) { struct sockaddr_in server_addr; - SSL * ssl = nullptr; + SSL *ssl = nullptr; server_addr.sin_family = AF_INET; server_addr.sin_port = htons(options.server_port); server_addr.sin_addr.s_addr = inet_addr(options.server_ip); - + int enable = 1; struct timeval tv = { .tv_sec = CTRL_TIMEOUT, .tv_usec = 0 }; @@ -243,9 +233,9 @@ dsmbr_conn_create(struct dsmbr_thread_ctx * thrd_ctx) if (conn_fd == -1) { E("socket() error: %d\n", errno); } - if (setsockopt(conn_fd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof(tv)) != 0) { + if (setsockopt(conn_fd, SOL_SOCKET, SO_RCVTIMEO, (const char *)&tv, sizeof(tv)) != 0) { E("setsockopt() rcvtimeo: %d\n", errno); - } + } if (setsockopt(conn_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)) != 0) { E("setsockopt() reuseaddr: %d\n", errno); } @@ -255,16 +245,15 @@ dsmbr_conn_create(struct dsmbr_thread_ctx * thrd_ctx) if (setsockopt(conn_fd, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) != 0) { E("setsockopt() nodelay: %d\n", errno); } - if (connect(conn_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) != 0) { + if (connect(conn_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) != 0) { E("connect() failed: %d\n", errno); } V("Established client connection %d...", conn_fd); - - struct dsmbr_conn * conn = new struct dsmbr_conn; + struct dsmbr_conn *conn = new struct dsmbr_conn; conn->conn_fd = conn_fd; struct bsock_ringbuf_io io; - void * bsock_ctx = nullptr; + void *bsock_ctx = nullptr; if (options.enable_tls) { V("Initiating TLS handshake on connection %d...", conn_fd); conn->ssl = dsmbr_tls_handshake_client(conn_fd); @@ -280,77 +269,86 @@ dsmbr_conn_create(struct dsmbr_thread_ctx * thrd_ctx) conn->ssl = nullptr; bsock_ctx = (void *)(uintptr_t)conn_fd; } - + conn->bsock = bsock_create(bsock_ctx, &io, BSOCK_BUF_SZ, BSOCK_BUF_SZ); conn->ia_gen = createGenerator(options.ia_dist); if (conn->ia_gen == NULL) { E("Failed to create generator \"%s\"\n", options.ia_dist); } - conn->ia_gen->set_lambda((double)options.target_qps / (double)(options.conn_per_thread * CPU_COUNT(&options.cpuset))); + conn->ia_gen->set_lambda((double)options.target_qps / + (double)(options.conn_per_thread * CPU_COUNT(&options.cpuset))); int status; - if ((status = options.m_info->conn_create_cb(options.m_global_ctx, thrd_ctx->m_ctx, &conn->m_ctx)) != 0) { + if ((status = options.m_info->conn_create_cb(options.m_global_ctx, thrd_ctx, + &conn->m_ctx)) != 0) { E("Failed to create conn m_ctx: %d\n", status); } - struct kevent kev; - EV_SET(&kev, conn->conn_fd, EVFILT_READ, EV_ADD, 0, 0, conn); - if (kevent(thrd_ctx->kqfd, &kev, 1, NULL, 0, NULL) == -1) { - E("kevent() failed: %d\n", errno); - } - conn->next_send = 0; - thrd_ctx->conns.push_back(conn); return conn; } static void -dsmbr_conn_free(struct dsmbr_thread_ctx *ctx, struct dsmbr_conn *conn) +dsmbr_conn_free(struct dsmbr_conn *conn, void *thrd_ctx) { - struct kevent kev; - EV_SET(&kev, conn->conn_fd, EVFILT_READ, EV_DELETE, 0, 0, conn); - if (kevent(ctx->kqfd, &kev, 1, NULL, 0, NULL) == -1) { - E("kevent() failed: %d\n", errno); - } - if (conn->ssl != nullptr) { SSL_shutdown(conn->ssl); SSL_free(conn->ssl); delete[] conn->ssl_readbuf; } + options.m_info->conn_destroy_cb(options.m_global_ctx, thrd_ctx, conn->m_ctx); bsock_free(conn->bsock); close(conn->conn_fd); - - for (auto it = ctx->conns.begin(); it != ctx->conns.end(); ) { - if (conn == *it) { - it = ctx->conns.erase(it); - } else { - ++it; - } - } - delete conn->ia_gen; delete conn; } void * -dsmbr_worker_main(void * ctx) +dsmbr_worker_main(void *ctx) { - struct dsmbr_thread_ctx * tinfo = (struct dsmbr_thread_ctx *)ctx; - struct timespec timesp = {.tv_sec = 0, .tv_nsec = 0}; + struct dsmbr_thread_ctx *tinfo = (struct dsmbr_thread_ctx *)ctx; + struct timespec timesp = { .tv_sec = 0, .tv_nsec = 0 }; struct kevent kevs[NEVENT]; + std::vector conns; + sem_wait(&options.worker_init_sem); + V("Thread %d initializing...\n", tinfo->tid); + int kqfd = kqueue(); + if (kqfd != 0) { + E("Thread %d kqueue() failed: %d\n", tinfo->tid, errno); + } + struct ppd_msg *mbuf = (struct ppd_msg *)new char[PPD_MSG_MAX_SZ]; + + void *m_ctx; + if (options.m_info->thread_create_cb(tinfo->core, options.m_global_ctx, &m_ctx) != 0) { + E("thread_create_cb() failed: %d\n", errno); + } + + for (int i = 0; i < options.conn_per_thread; i++) { + struct dsmbr_conn *conn = dsmbr_conn_create(m_ctx); + conns.push_back(conn); + + struct kevent kev; + EV_SET(&kev, conn->conn_fd, EVFILT_READ, EV_ADD, 0, 0, conn); + if (kevent(kqfd, &kev, 1, NULL, 0, NULL) == -1) { + E("kevent() failed: %d\n", errno); + } + } + options.worker_init_comp.fetch_add(1, std::memory_order_relaxed); + sem_post(&options.worker_init_sem); + + // create connections V("Thread %d waiting for start...\n", tinfo->tid); - while(options.state.load(std::memory_order_relaxed) == STATE_WAITING) { + while (options.state.load(std::memory_order_relaxed) == STATE_WAITING) { }; - for (struct dsmbr_conn * conn : tinfo->conns) { + for (struct dsmbr_conn *conn : conns) { conn->next_send = get_time_us(); } V("Thread %d starting...\n", tinfo->tid); - while(true) { + while (true) { int ret = 0; struct kevent kev; @@ -359,74 +357,110 @@ dsmbr_worker_main(void * ctx) } // read resp first if possible - if ((ret = kevent(tinfo->kqfd, NULL, 0, kevs, NEVENT, ×p)) < 0) { + if ((ret = kevent(kqfd, NULL, 0, kevs, NEVENT, ×p)) < 0) { E("Thread %d kevent() error %d.\n", tinfo->tid, errno); } int size = ret; for (int i = 0; i < size; i++) { - struct dsmbr_conn * conn = (struct dsmbr_conn *)kevs[i].udata; + struct dsmbr_conn *conn = (struct dsmbr_conn *)kevs[i].udata; if (kevs[i].flags & EV_EOF) { - E("Thread %d connection %d EV_EOF ERR: %d.\n", tinfo->tid, conn->conn_fd, kevs[i].fflags); + E("Thread %d connection %d EV_EOF ERR: %d.\n", tinfo->tid, + conn->conn_fd, kevs[i].fflags); } ret = bsock_poll(conn->bsock); if (ret < 0) { - E("Thread %d connection %d bsock_poll() ERR %d.\n", tinfo->tid, conn->conn_fd, ret); + E("Thread %d connection %d bsock_poll() ERR %d.\n", tinfo->tid, + conn->conn_fd, ret); } } uint64_t cur_ts = get_time_us(); - for (struct dsmbr_conn * conn : tinfo->conns) { - ret = ppd_readmsg(conn->bsock, (char *)tinfo->mbuf, PPD_MSG_MAX_SZ); - if (ret == 0) { - ret = options.m_info->conn_recv_cb(tinfo->mbuf->data, tinfo->mbuf->size, options.m_global_ctx, tinfo->m_ctx, conn->m_ctx); - if (ret != 0) { - E("Thread %d connection %d conn_recv_cb() failed ERR %d.\n", tinfo->tid, conn->conn_fd, errno); + for (struct dsmbr_conn *conn : conns) { + while (true) { + // read as many messages as possible + ret = ppd_readmsg(conn->bsock, (char *)mbuf, PPD_MSG_MAX_SZ); + if (ret == 0) { + size_t recv_size = mbuf->size + sizeof(struct ppd_msg); + ret = options.m_info->conn_recv_cb(mbuf->data, mbuf->size, + options.m_global_ctx, m_ctx, conn->m_ctx); + if (ret != 0) { + E("Thread %d connection %d conn_recv_cb() failed ERR %d.\n", + tinfo->tid, conn->conn_fd, errno); + } + + struct dsmbr_request_record *rec = + conn->req_in_flight.front(); + conn->req_in_flight.pop_front(); + rec->recv_size = recv_size; + rec->recv_ts = cur_ts; + tinfo->stats.push_back(rec); + } else { + if (errno == ENODATA) { + // not enough data for next msg + break; + } else { + E("ppd_readmsg failed with err %d\n", errno); + } } - } - if (conn->next_send <= cur_ts && conn->req_in_flight.size() < (unsigned int)options.depth + 1) { + if (conn->next_send <= cur_ts && + conn->req_in_flight.size() < (unsigned int)options.depth) { // send size_t out_sz; - ret = options.m_info->conn_send_cb(tinfo->mbuf->data, PPD_MSG_MAX_SZ, &out_sz, options.m_global_ctx, tinfo->m_ctx, conn->m_ctx); + ret = options.m_info->conn_send_cb(mbuf->data, PPD_MSG_MAX_DATA_SZ, + &out_sz, options.m_global_ctx, m_ctx, conn->m_ctx); if (ret != 0) { - E("Thread %d connection %d conn_send_cb() failed ERR %d.\n", tinfo->tid, conn->conn_fd, errno); + E("Thread %d connection %d conn_send_cb() failed ERR %d.\n", + tinfo->tid, conn->conn_fd, errno); } - tinfo->mbuf->size = out_sz; + mbuf->size = out_sz; + + struct dsmbr_request_record *rec = new struct dsmbr_request_record; + rec->send_size = mbuf->size + sizeof(struct ppd_msg); + rec->send_ts = get_time_us(); + // no blocking, always flush - ret = ppd_writemsg(conn->bsock, tinfo->mbuf); + ret = ppd_writemsg(conn->bsock, mbuf); if (ret != 0) { - E("Thread %d connection %d ppd_writemsg() failed ERR %d.\n", tinfo->tid, conn->conn_fd, errno); + E("Thread %d connection %d ppd_writemsg() failed ERR %d.\n", + tinfo->tid, conn->conn_fd, errno); } - conn->req_in_flight - conn->in_flight++; - conn->next_send += (uint64_t)(conn->ia_gen->generate() * (double)S2US); + + conn->req_in_flight.push_back(rec); + conn->next_send += (uint64_t)(conn->ia_gen->generate() * + (double)S2US); } } } V("Thread %d exiting...\n", tinfo->tid); + close(kqfd); + delete[] mbuf; + + for (unsigned int i = 0; i < conns.size(); i++) { + dsmbr_conn_free(conns.at(i), m_ctx); + } return NULL; } -static struct dsmbr_thread_ctx * +static void +dsmbr_free_worker(struct dsmbr_thread_ctx * worker) +{ + for (auto stat : worker->stats) { + delete stat; + } + delete worker; +} + +static struct dsmbr_thread_ctx * dsmbr_create_worker(int tid, int core) { - struct dsmbr_thread_ctx * tinfo = new struct dsmbr_thread_ctx; - - int kqfd = kqueue(); - if (kqfd != 0) { - E("kqueue() failed: %d\n", errno); - } - tinfo->kqfd = kqfd; - - tinfo->mbuf = (struct ppd_msg *)new char[PPD_MSG_MAX_SZ]; - - if (options.m_info->thread_create_cb(core, options.m_global_ctx, &tinfo->m_ctx) != 0) { - E("thread_create_cb() failed: %d\n", errno); - } + struct dsmbr_thread_ctx *tinfo = new struct dsmbr_thread_ctx; + tinfo->tid = tid; + tinfo->core = core; pthread_attr_t attr; int status; @@ -442,7 +476,7 @@ dsmbr_create_worker(int tid, int core) if (status != 0) { E("pthread_attr_setaffinity_np() failed : %d\n", status); } - + status = pthread_create(&tinfo->thrd, &attr, dsmbr_worker_main, tinfo); if (status != 0) { E("pthread_create() failed: %d\n", status); @@ -451,75 +485,44 @@ dsmbr_create_worker(int tid, int core) return tinfo; } -static void -dsmbr_create_workers() +static void +dsmbr_nslookup(const char *hostname, char *out, int len) { - int tid = 0; - int core; - - CPU_FOREACH_ISSET(core, &options.cpuset) { - options.workers.push_back(dsmbr_create_worker(tid, core)); - tid++; - V("Created thread %d on core %d\n", tid, core); + int ret; + struct addrinfo *addrinfo; + ret = getaddrinfo(hostname, nullptr, nullptr, &addrinfo); + if (ret != 0) { + E("getaddrinfo for host %s failed with %d\n", hostname, ret); } + if (inet_ntop(AF_UNSPEC, addrinfo->ai_addr, out, len) == NULL) { + E("inet_ntop for host %s failed with %d\n", hostname, errno); + } + freeaddrinfo(addrinfo); } -static std::string -dsmbr_nslookup(std::string hostname) -{ - static char rt[100] = {0}; - struct in_addr **addr; - struct hostent *he; - - if ((he = gethostbyname(hostname.c_str())) == NULL) { - E("Hostname %s cannot be resolved.\n", hostname.c_str()); - } - addr = (struct in_addr**)he->h_addr_list; - for (int i=0;addr[i]!=NULL;i++) { - strncpy(rt, inet_ntoa(*addr[i]), 99); - return rt; - } - return rt; -} - -static void -establish_slave_conns() +static void +dsmbr_master_connect_slaves(struct dsmbr_slave * slaves) { /* create a connection to the clients */ - for (uint32_t i=0; i < client_ips.size(); i++) { + for (int i = 0; i < options.num_slaves; i++) { struct sockaddr_in csock_addr; int c_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); csock_addr.sin_family = AF_INET; - csock_addr.sin_addr.s_addr = inet_addr(client_ips[i]); - csock_addr.sin_port = htons(DEFAULT_CLIENT_CTL_PORT); + csock_addr.sin_addr.s_addr = inet_addr(options.slave_ips[i]); + csock_addr.sin_port = htons(CTRL_PORT); - V("Connecting to client %s...\n", client_ips.at(i)); + V("Connecting to slave %s...\n", options.slave_ips[i]); - if (connect(c_fd, (struct sockaddr*)&csock_addr, sizeof(csock_addr)) != 0) { + if (connect(c_fd, (struct sockaddr *)&csock_addr, sizeof(csock_addr)) != 0) { E("Connect failed. ERR %d\n", errno); } - - if (writebuf(c_fd, &options, sizeof(options)) < 0) { - E("Write to client. ERR %d\n", errno); - } - - client_fds.push_back(c_fd); - V("Client connected %d/%lu.\n", i + 1, client_ips.size()); - - EV_SET(&kev[i], c_fd, EVFILT_READ, EV_ADD, 0, 0, NULL); - - options.global_conn_start_idx += options.client_conn * options.client_thread_count; - } - - V("Registering client fds to mtkq...\n"); - /* add to main thread's kq */ - if (kevent(mt_kq, kev, client_ips.size(), NULL, 0, NULL) == -1) { - E("Failed to add some clients to mtkq. ERR %d\n", errno); + slaves[i].conn_fd = c_fd; } } + static int -dsmbr_slave_ctrl_sock_create() +dsmbr_slave_ctrl_sock_create(void) { struct sockaddr_in server_addr; int status; @@ -550,440 +553,378 @@ dsmbr_slave_ctrl_sock_create() return fd; } -static void client_send_stats(int qps, int send_sz, int recv_sz) +static void +usage() { - struct kevent kev; - int msg = 0; - ppd_slave_resp resp; - char buf[1024]; + fprintf(stdout, + "Options:\n" + " -q: target qps.\n" + " -s: server addr.\n" + " -p: server port.\n" + " -o: output file path.\n" + " -a: worker thread cpulist.\n" + " -c: connections per thread.\n" + " -m: module path.\n" + " -M: slave ips, repeatable, enables master mode.\n" + " -S: slave mode.\n" + " -v: verbose mode.\n" + " -t: run time.\n" + " -T: warmup time.\n" + " -i: interarrival time distribution.\n" + " -O: module arguments ('key'='value'), repeatable.\n\n"); +} - resp.set_qps(qps); - resp.set_send_sz(send_sz); - resp.set_resp_sz(recv_sz); - resp.SerializeToArray(buf + sizeof(long), 1024 - sizeof(long)); - long sz = htonl(resp.ByteSizeLong()); - memcpy(buf, &sz, sizeof(long)); +static int +dsmbr_slave_accept_master(int slave_ctrl_sock) +{ + int ret; + int conn_fd; + struct sockaddr addr; + socklen_t addrlen; + char ip[INET_ADDRSTRLEN]; + struct in_addr inaddr = ((struct sockaddr_in *)&addr)->sin_addr; - /* clients need to send qps */ - V("Sending master stats, qps %d, send %d, resp %d...\n", qps, send_sz, recv_sz); - if (writebuf(master_fd, buf, resp.ByteSizeLong() + sizeof(long)) < 0) { - E("Error writing stats to master\n"); + ret = accept(slave_ctrl_sock, &addr, &addrlen); + if (ret < 0) { + E("Failed to accept master connection: %d\n", errno); + } + conn_fd = ret; + + if (inet_ntop(AF_INET, &inaddr, ip, INET_ADDRSTRLEN) == NULL) { + E("inet_ntop failed with %d\n", errno); } - V("Waiting for master ACK...\n"); + V("Accepted master connection %d from %s.", conn_fd, ip); - if (kevent(mt_kq, NULL, 0, &kev, 1, NULL) != 1) { - E("kevent wait for master ack %d\n", errno); - } + return conn_fd; +} - if (readbuf((int)kev.ident, &msg, sizeof(int)) < 0) { - E("Failed to receive ack from master\n"); - } - - if (msg != MSG_TEST_QPS_ACK) { - E("Invalid ack message\n"); +static void +dsmbr_getopt(int argc, char *argv[]) +{ + char ch; + while ((ch = getopt(argc, argv, "q:s:p:o:a:c:m:M:Sv:t:T:i:O:h")) != -1) { + switch (ch) { + case 'q': { + options.target_qps = strtoul(optarg, NULL, 10); + break; + } + case 's': { + dsmbr_nslookup(optarg, options.server_ip, sizeof(options.server_ip)); + break; + } + case 'p': { + options.server_port = strtoul(optarg, NULL, 10); + break; + } + case 'o': { + strncpy(options.output_file, optarg, sizeof(options.output_file)); + break; + } + case 'a': { + cpulist_to_cpuset(optarg, &options.cpuset); + break; + } + case 'c': { + options.conn_per_thread = strtoul(optarg, NULL, 10); + break; + } + case 'm': { + strncpy(options.module_path, optarg, sizeof(options.module_path)); + break; + } + case 'M': { + options.master_mode = 1; + dsmbr_nslookup(optarg, options.slave_ips[options.num_slaves], + sizeof(options.slave_ips[0])); + options.num_slaves++; + if (options.num_slaves > MAX_SLAVES) { + E("Too many slaves: %d > %d\n", options.num_slaves, MAX_SLAVES); + } + break; + } + case 'S': { + options.slave_mode = 1; + break; + } + case 'v': { + options.verbose = 1; + W("Verbose mode can cause SUBSTANTIAL latency fluctuations in some(XFCE) terminals!\n"); + break; + } + case 't': { + options.duration = strtoul(optarg, NULL, 10); + break; + } + case 'T': { + options.warmup = strtoul(optarg, NULL, 10); + break; + } + case 'i': { + strncpy(options.ia_dist, optarg, sizeof(options.ia_dist)); + break; + } + case 'O': { + split_kvstr(optarg, "=", options.mod_argk[options.mod_argc], + sizeof(options.mod_argk[0]), options.mod_argv[options.mod_argc], + sizeof(options.mod_argv[0])); + options.mod_argc++; + break; + } + case 'h': + usage(); + exit(0); + default: + E("Unrecognized option -%c\n\n", ch); + } } } -static void wait_master_prepare() +static void +dsmbr_send_ctrl_code(int sock, int code) { - V("Waiting for master's PREPARE...\n"); - - struct sockaddr_in csock_addr; - int listen_fd; - int enable = 1; - csock_addr.sin_family = AF_INET; - csock_addr.sin_addr.s_addr = htonl(INADDR_ANY); - csock_addr.sin_port = htons(DEFAULT_CLIENT_CTL_PORT); - listen_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); - - if (listen_fd < 0) { - E("socket"); + int ret; + struct dsmbr_ctrl_msg msg; + msg.code = code; + ret = dsmbr_send_ctrl_msg(sock, &msg); + if (ret != 0) { + E("Failed to send ctrl message %d\n", errno); } - - if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)) < 0) { - E("setsockopt reuseaddr"); - } - - if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)) < 0) { - E("setsockopt reuseport"); - } - - if (bind(listen_fd, (struct sockaddr*)&csock_addr, sizeof(csock_addr)) < 0) { - E("bind"); - } - - if (listen(listen_fd, 10) < 0) { - E("ctl listen"); - } - - master_fd = accept(listen_fd, NULL, NULL); - if (master_fd == -1) { - E("Failed to accept master. ERR %d\n", errno); - } - - close(listen_fd); - - if (readbuf(master_fd, &options, sizeof(options)) < 0) { - E("Failed to receive options from master. ERR %d\n", errno); - } - - V("Registering master fd to mtkq...\n"); - struct kevent kev; - - EV_SET(&kev, master_fd, EVFILT_READ, EV_ADD, 0, 0, NULL); - - if (kevent(mt_kq, &kev, 1, NULL, 0, NULL) == -1) { - E("Failed to register master fd to mtkq. ERR %d\n", errno); - } - - /* set the correct mode */ - options.master_mode = 0; - options.client_mode = 1; - options.output_name = NULL; + return; } -static void usage() +static void +dsmbr_wait_ctrl_code(int sock, int code) { - fprintf(stdout, "Usage:\n" - " -s: server addr.\n" - " -p: server port.\n" - " -q: target qps.\n" - " -c: worker thread cpulist.\n" - " -C: connections per worker thread.\n" - " -o: output file.\n" - " -h: show help.\n" - " -v: verbose mode.\n" - " -T: warm up time.\n" - " -t: duration.\n" - " -i: interarrival distribution.\n" - " -M: module path.\n" - " -O: module args of format key=value.\n" - " -a: client addrs (master mode).\n" - " -A: client mode.\n\n"); + int ret; + struct dsmbr_ctrl_msg msg; + ret = dsmbr_recv_ctrl_msg(sock, &msg); + if (ret != 0) { + E("Failed to recv ctrl message %d\n", errno); + } + if (msg.code != CTRL_SYNC) { + E("Unexpected message code %d, expected %d\n", msg.code, code); + } + return; } /* * protocol: - * + * * master -> client SYNC * client and master all establish connections to the server * client -> master ACK - * master -> client START (client runs forever) - * client -> master START_ACK - * master RUNS for X seconds + * master -> client START + * master RUNS for X seconds, client runs forever * master -> client STOP - * client -> master STATS + * client -> master STAT */ int -main(int argc, char* argv[]) +main(int argc, char *argv[]) { - int ch; - FILE *resp_fp_csv; + dsmbr_getopt(argc, argv); + dsmbr_dump_options(); - while ((ch = getopt(argc, argv, "q:s:C:p:o:t:c:hvW:w:T:Aa:Q:i:S:l:O:")) != -1) { - switch (ch) { - case 'q': - options.target_qps = atoi(optarg); - if (options.target_qps < 0) { - E("Target QPS must be positive\n"); - } - break; - case 'Q': - options.master_qps = atoi(optarg); - if (options.master_qps < 0) { - E("Master QPS must be positive\n"); - } - break; - case 's': { - string ip = get_ip_from_hostname(optarg); - strncpy(options.server_ip, ip.c_str(), INET_ADDRSTRLEN); - break; - } - case 'p': - options.server_port = atoi(optarg); - if (options.server_port <= 0) { - E("Server port must be positive\n"); - } - break; - case 'o': - options.output_name = optarg; - break; - case 't': - options.client_thread_count = atoi(optarg); - if (options.client_thread_count <= 0) { - E("Client threads must be positive\n"); - } - break; - case 'T': - options.master_thread_count = atoi(optarg); - if (options.master_thread_count <= 0) { - E("Master threads must be positive\n"); - } - break; - case 'c': - options.client_conn = atoi(optarg); - if (options.client_conn <= 0) { - E("Client connections must be positive\n"); - } - break; - case 'l': { - options.workload_type = static_cast(atoi(optarg)); - break; - } - case 'C': - options.master_conn = atoi(optarg); - if (options.master_conn <= 0) { - E("Master connections must be positive\n"); - } - break; - case 'S': { - string ip = get_ip_from_hostname(optarg); - options.master_server_ip_given = 1; - strncpy(options.master_server_ip, ip.c_str(), INET_ADDRSTRLEN); - break; - } - case 'O': { - strncpy(options.gen_params[options.num_gen_params], optarg, MAX_GEN_PARAMS_LEN); - options.num_gen_params++; - break; - } - case 'h': - case '?': - usage(); - exit(0); - case 'v': - options.verbose = 1; - W("Verbose mode can cause SUBSTANTIAL latency fluctuations in some(XFCE) terminals!\n"); - break; - case 'a': { - if (options.client_mode == 1) { - E("Cannot be both master and client\n"); - } - string ip = get_ip_from_hostname(optarg); - char *rip = new char[INET_ADDRSTRLEN + 1]; - strncpy(rip, ip.c_str(), INET_ADDRSTRLEN); - client_ips.push_back(rip); - options.master_mode = 1; - break; - } - case 'W': - options.warmup = atoi(optarg); - if (options.warmup < 0) { - E("Warmup must be positive\n"); - } - break; - case 'w': - options.duration = atoi(optarg); - if (options.duration <= 0) { - E("Test duration must be positive\n"); - } - break; - case 'A': { - if (options.master_mode == 1) { - E("Cannot be both master and client\n"); - } - options.client_mode = 1; - break; - } - case 'i': { - strncpy(options.generator_name, optarg, MAX_GEN_LEN); - break; - } - default: - E("Unrecognized option -%c\n\n", ch); - } + if (sem_init(&options.worker_init_sem, 0, THRD_INIT_PARALLEL) != 0) { + E("sem_init failed: %d\n", errno); } - if (!options.client_mode) { - resp_fp_csv = fopen(options.output_name, "w"); + FILE *resp_fp_csv = NULL; + if (!options.slave_mode) { + resp_fp_csv = fopen(options.output_file, "w"); if (resp_fp_csv == NULL) { - E("Cannot open file %s for writing %d.", options.output_name, errno); + E("Cannot open file %s for writing %d.", options.output_file, errno); exit(1); } } - /* create main thread's kqueue */ - int mt_kq = kqueue(); + struct dsmbr_slave slaves[MAX_SLAVES]; + int ret; + int slave_ctrl_sock = -1; + int master_ctrl_conn = -1; + std::vector workers; + uint64_t start_ts; + uint64_t end_ts; struct dsmbr_ctrl_msg ctrl_msg; - int ret = 0; - int slave_sock = -1; - int master_conn = -1; - std::vector slaves; + int kqfd = kqueue(); + if (kqfd < 0) { + E("kqueue() failed with %d\n", errno); + } if (options.master_mode) { /* initiate connections to clients */ - - } else if (options.client_mode) { - slave_sock = dsmbr_slave_ctrl_sock_create(); - V("Waiting for master connection...\n"); - ret = ppd_readbuf(slave_sock, &ctrl_msg, sizeof(ctrl_msg)); - if (ret != 0) { - E("Failed to read from listen socket: %d\n", errno); + V("M: connecting to slaves.\n"); + dsmbr_master_connect_slaves(slaves); + V("M: SYNC to slaves.\n"); + for (int i = 0; i < options.num_slaves; i++) { + dsmbr_send_ctrl_code(slaves[i].conn_fd, CTRL_SYNC); } - if (ctrl_msg.) + } else if (options.slave_mode) { + slave_ctrl_sock = dsmbr_slave_ctrl_sock_create(); + V("S: waiting for master connection...\n"); + master_ctrl_conn = dsmbr_slave_accept_master(slave_ctrl_sock); + // add to kqueue + struct kevent kev; + EV_SET(&kev, master_ctrl_conn, EVFILT_READ, EV_ADD, 0, 0, NULL); + if (kevent(kqfd, &kev, 1, NULL, 0, NULL) == -1) { + E("kevent() failed: %d\n", errno); + } + V("S: waiting for master SYNC...\n"); + dsmbr_wait_ctrl_code(master_ctrl_conn, CTRL_SYNC); } - /* here EVERYONE is on the same page */ - options.dump(); - parse_rgen_params(); + // here slaves and master are on the same page */ + int core; + int tid = 0; + CPU_FOREACH_ISSET(core, &options.cpuset) { + workers.push_back(dsmbr_create_worker(tid, core)); + V("Created thread %d on core %d.\n", tid, core); + tid++; + } - vector threads; - vector send_pipes; - vector recv_pipes; - send_pipes.resize(options.client_thread_count); - recv_pipes.resize(options.client_thread_count); + V("Waiting for worker threads connection establishment.\n"); + while(options.worker_init_comp.load(std::memory_order_relaxed) != CPU_COUNT(&options.cpuset)) {}; - /* do our setup according to the options */ - pthread_barrier_init(&prepare_barrier, NULL, options.client_thread_count + 1); - pthread_barrier_init(&ok_barrier, NULL, options.client_thread_count + 1); - - V("Creating %d threads...\n", options.client_thread_count); - - /* create worker threads */ - vector *data = new vector[options.client_thread_count]; - for (int i = 0; i < options.client_thread_count; i++) { - int pipes[2]; - if (pipe(&pipes[0]) == -1) { - E("Cannot create pipes. ERR %d\n", errno); + // now all connections are established + if (options.master_mode) { + V("M: waiting for slaves ACK...\n"); + for (int i = 0; i < options.num_slaves; i++) { + dsmbr_wait_ctrl_code(slaves[i].conn_fd, CTRL_ACK); } + V("M: START to slaves...\n"); + for (int i = 0; i < options.num_slaves; i++) { + dsmbr_send_ctrl_code(slaves[i].conn_fd, CTRL_START); + } + } else if (options.slave_mode) { + V("S: ACK to master...\n"); + dsmbr_send_ctrl_code(master_ctrl_conn, CTRL_ACK); + V("S: waiting for master START...\n"); + dsmbr_wait_ctrl_code(master_ctrl_conn, CTRL_START); + } - struct perf_counter *perf = new struct perf_counter; - perf->cnt = 0; - send_pipes[i] = pipes[0]; - recv_pipes[i] = pipes[1]; + V("Starting benchmark...\n"); + start_ts = get_time_us(); + options.state.store(STATE_RUN, std::memory_order_relaxed); - thrd_perf_counters.push_back(perf); - std::thread * thrd = new std::thread(worker_main, i, pipes[1], &data[i]); - threads.push_back(thrd); - pthread_t handle = thrd->native_handle(); - cpuset_t cpuset; - CPU_ZERO(&cpuset); - CPU_SET(i * 2, &cpuset); - V("Setting thread %d's affinity to %d.\n", i , i * 2); - if (pthread_setaffinity_np(handle, sizeof(cpuset_t), &cpuset) != 0) { - E("Failed to set affinity for thread %d\n", i); - } - } - - V("Waiting for thread connection establishment.\n"); - pthread_barrier_wait(&prepare_barrier); - - if (options.master_mode) { - V("Waiting for clients...\n"); - /* wait for ok messages from the clients */ - wait_clients_ok(); - } else if (options.client_mode) { - V("Sending OK to master...\n"); - /* send ok messages to the master */ - send_master_ok(); - } - - - /* create main thread timer loop */ + int cur_second = 0; struct kevent kev; -#define TIMER_FD (-1) - EV_SET(&kev, TIMER_FD, EVFILT_TIMER, EV_ADD, NOTE_SECONDS, 1, NULL); - - if (kevent(mt_kq, &kev, 1, NULL, 0, NULL) == -1) { - E("Cannot create kevent timer. ERR %d\n", errno); + if (!options.slave_mode) { + // slave mode runs forever + EV_SET(&kev, TIMER_MAGIC, EVFILT_TIMER, EV_ADD | EV_ONESHOT, NOTE_USECONDS, S2US, NULL); + if (kevent(kqfd, &kev, 1, NULL, 0, NULL) == -1) { + E("kevent() failed: %d\n", errno); + } } - - /* kick off worker threads */ - pthread_barrier_wait(&ok_barrier); - /* now we are free to start the experiment */ - V("Main thread running.\n"); - - while(1) { - /* client mode runs forever unless server sends us */ - if ((int)cur_time >= options.duration + options.warmup && !options.client_mode) { - break; + while (true) { + if (kevent(kqfd, NULL, 0, &kev, 1, NULL) == -1) { + E("kevent() failed: %d\n", errno); } - if (kevent(mt_kq, NULL, 0, &kev, 1, NULL) != 1) { - E("Error in main event loop. ERR %d\n", errno); - } - - if ((int)kev.ident == TIMER_FD) { - cur_time++; - } else { - /* its from either master or client */ - if (kev.flags & EV_EOF) { - E("Client or master %d disconnected\n", (int)kev.ident); - } - - int msg; - - if (readbuf(kev.ident, &msg, sizeof(msg)) == -1) - E("Failed to read from master_fd or message invalid. ERR %d\n", errno); - - if (msg == MSG_TEST_STOP) { - V("Received STOP from master\n"); + if (kev.ident == TIMER_MAGIC) { + cur_second++; + if ((int)cur_second >= options.duration + options.warmup) { break; - } else { - E("Unexpected message from master: %d\n", msg); } + } else if (options.slave_mode && (int)kev.ident == master_ctrl_conn) { + dsmbr_wait_ctrl_code(master_ctrl_conn, CTRL_STOP); + V("S: received STOP from master.\n"); + break; + } else { + E("Unknown kevent ident %d\n", (int)kev.ident); } } - V("Signaling threads to exit...\n"); - for (int i = 0; i < options.client_thread_count; i++) { - if (write(send_pipes[i], "e", sizeof(char)) == -1) { - E("Couldn't write to thread pipe %d. ERR %d\n", send_pipes[i], errno); - } - } + V("Stopping benchmark...\n"); + options.state.store(STATE_STOP, std::memory_order_relaxed); + end_ts = get_time_us(); - for (int i = 0; i < options.client_thread_count; i++) { - threads.at(i)->join(); - delete(threads.at(i)); - delete thrd_perf_counters[i]; - close(send_pipes[i]); - close(recv_pipes[i]); - } - - int qps = 0; - - for(int i = 0; i < options.client_thread_count; i++) { - qps += thrd_perf_counters[i]->cnt; - } - V("Total requests: %d\n", qps); - - if (options.client_mode) { - client_send_stats(qps, send_sz, recv_sz); - close(master_fd); - } - - struct slave_stats stats; - stats.qps = 0; - stats.recv_sz = 0; - stats.send_sz = 0; if (options.master_mode) { - V("Shutting down clients...\n"); - client_stop(&stats); - qps += stats.qps; + V("M: STOP to slaves...\n"); + for (int i = 0; i < options.num_slaves; i++) { + dsmbr_send_ctrl_code(slaves[i].conn_fd, CTRL_STOP); + } + } + + // wait for thread exit + for (unsigned int i = 0; i < workers.size(); i++) { + if ((ret = pthread_join(workers.at(i)->thrd, NULL)) != 0) { + E("pthread_join tid %d err %d\n", workers.at(i)->tid, ret); + } } - V("Aggregated %d operations over %d seconds\n", qps, options.duration); - qps = qps / (options.duration); + // obtain stats from threads + uint64_t total_req = 0; + uint64_t total_send = 0; + uint64_t total_recv = 0; + for (auto worker : workers) { + for (auto stat : worker->stats) { + total_req++; + total_send += stat->send_size; + total_recv += stat->recv_size; + } + } - if (!options.client_mode) { - /* stop the measurement */ - V("Saving results...\n"); - for (int i = 0; i < options.client_thread_count; i++) { - fprintf(resp_fp_csv, "%d,%d,%d\n", qps, stats.send_sz, stats.recv_sz); - for (uint32_t j = 0; j < data[i].size(); j++) { - struct datapt *pt = data[i].at(j); - fprintf(resp_fp_csv, "%ld\n", pt->lat); - delete pt; + V("Reqs: %lu Time: %lu us Send: %lu bytes Recv: %lu bytes\n", total_req, end_ts - start_ts, total_send, total_recv); + + if (options.master_mode) { + V("M: waiting for slaves STAT...\n"); + for (int i = 0; i < options.num_slaves; i++) { + if (dsmbr_recv_ctrl_msg(slaves[i].conn_fd, &ctrl_msg) != 0) { + E("dsmbr_recv_ctrl_msg failed with %d.\n", errno); + } + if (ctrl_msg.code != CTRL_STAT) { + E("Unexpected ctrl code %d, expected: %d.\n", ctrl_msg.code, CTRL_STAT); + } + slaves[i].reqs = ctrl_msg.data[0]; + slaves[i].time = ctrl_msg.data[1]; + slaves[i].send_sz = ctrl_msg.data[2]; + slaves[i].recv_sz = ctrl_msg.data[3]; + V("M: received stats - Reqs: %lu Time: %lu us Send: %lu bytes Recv: %lu bytes\n", slaves[i].reqs, slaves[i].time, slaves[i].send_sz, slaves[i].recv_sz); + } + } else if (options.slave_mode) { + V("M: STAT to master...\n"); + ctrl_msg.code = CTRL_STAT; + ctrl_msg.data[0] = total_req; + ctrl_msg.data[1] = end_ts - start_ts; + ctrl_msg.data[2] = total_send; + ctrl_msg.data[4] = total_recv; + if (dsmbr_send_ctrl_msg(master_ctrl_conn, &ctrl_msg) != 0) { + E("dsmbr_send_ctrl_msg failed with %d.\n", errno); + } + } + + V("Saving results to %s ...", options.output_file); + if (!options.slave_mode) { + for (int i = 0; i < options.num_slaves; i++) { + fprintf(resp_fp_csv, "S,%lu,%lu,%lu,%lu\n", slaves[i].reqs, slaves[i].time, slaves[i].send_sz, slaves[i].recv_sz); + } + fprintf(resp_fp_csv, "M,%lu,%lu,%lu,%lu\n", slaves[i].reqs, slaves[i].time, slaves[i].send_sz, slaves[i].recv_sz); + for (auto worker : workers) { + for (auto stat : worker->stats) { + fprintf(resp_fp_csv, "R,%lu,%lu,%lu,%lu\n", stat->send_ts, stat->recv_ts, stat->send_size, stat->recv_size); } } - - delete[] data; + fflush(resp_fp_csv); + } + // clean up + if (resp_fp_csv != NULL) { fclose(resp_fp_csv); } + sem_destroy(&options.worker_init_sem); + close(kqfd); + if (slave_ctrl_sock != -1) { + close(slave_ctrl_sock); + } + if (master_ctrl_conn != -1) { + close(master_ctrl_conn); + } + for (int i = 0; i < options.num_slaves; i++) { + close(slaves[i].conn_fd); + } + for (auto worker : workers) { + dsmbr_free_worker(worker); + } - close(mt_kq); - return 0; } diff --git a/include/util.h b/include/util.h index b858b4c..654e6a2 100644 --- a/include/util.h +++ b/include/util.h @@ -39,6 +39,23 @@ cpulist_to_cpuset(char *cpulist, cpuset_t *cpuset) } } +static inline int +split_kvstr(char * str, const char * delim, char * key, int klen, char * val, int vlen) +{ + char* token = strtok(str, delim); + + if (token == NULL) + return -1; + strncpy(key, token, klen); + + token = strtok(NULL, delim); + if (token == NULL) + return -1; + strncpy(val, token, vlen); + + return 0; +} + #undef inline #ifdef __cplusplus