From f2be62a9bef53fe0997d167d068b0ea20b6e06b9 Mon Sep 17 00:00:00 2001 From: quackerd Date: Sun, 31 Jan 2021 02:50:58 -0500 Subject: [PATCH] cat refactor + rat reborn + statskeeping --- CMakeLists.txt | 25 +- cat/cat.cc | 215 ++++++------ cat/generator.h => inc/gen.h | 5 +- inc/util.h | 15 + khat/khat.cc | 13 +- {cat => libgen}/generator.cc | 2 +- rat/rat.cc | 624 +++++++++++++++++++++++++++++++++++ 7 files changed, 762 insertions(+), 137 deletions(-) rename cat/generator.h => inc/gen.h (98%) rename {cat => libgen}/generator.cc (98%) create mode 100644 rat/rat.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index ba3f64f..d6c0ecd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -21,13 +21,19 @@ set(CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11 -msse4 -mavx) -set(LIBNM_CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11) -set(LIBNTR_C_FLAGS -O2 -g -Wall -Wextra -Werror -std=c11) include_directories(${CMAKE_SOURCE_DIR}/inc) include_directories(${dpdk_INCLUDE_DIRS}) include_directories(${Hwloc_INCLUDE_DIRS}) +set(LIBNM_CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11) +set(LIBNTR_C_FLAGS -O2 -g -Wall -Wextra -Werror -std=c11) +set(LIBGEN_CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11) + +set(KHAT_LINKLIBS pthread nm ntr) +set(CAT_LINKLIBS pthread nm ntr gen) +set(RAT_LINKLIBS pthread nm ntr gen) + add_library(nm libnm/nm.cc) target_link_libraries(nm ${Hwloc_LIBRARIES}) target_compile_options(nm PRIVATE ${LIBNM_CC_FLAGS}) @@ -35,11 +41,18 @@ target_compile_options(nm PRIVATE ${LIBNM_CC_FLAGS}) add_library(ntr libntr/ntr.c) target_compile_options(ntr PRIVATE ${LIBNTR_C_FLAGS}) -add_executable(khat khat/khat.cc ) -target_link_libraries(khat ${dpdk_LIBRARIES} pthread nm ntr) +add_library(gen libgen/generator.cc) +target_link_libraries(gen ${Hwloc_LIBRARIES}) +target_compile_options(gen PRIVATE ${LIBGEN_CC_FLAGS}) + +add_executable(khat khat/khat.cc) +target_link_libraries(khat ${dpdk_LIBRARIES} ${KHAT_LINKLIBS}) target_compile_options(khat PRIVATE ${CC_FLAGS}) -add_executable(cat cat/cat.cc cat/generator.cc) -target_link_libraries(cat ${dpdk_LIBRARIES} pthread nm ntr) +add_executable(cat cat/cat.cc) +target_link_libraries(cat ${dpdk_LIBRARIES} ${CAT_LINKLIBS}) target_compile_options(cat PRIVATE ${CC_FLAGS}) +add_executable(rat rat/rat.cc) +target_link_libraries(rat ${dpdk_LIBRARIES} ${RAT_LINKLIBS}) +target_compile_options(rat PRIVATE ${CC_FLAGS}) \ No newline at end of file diff --git a/cat/cat.cc b/cat/cat.cc index 7431fe5..eaba959 100644 --- a/cat/cat.cc +++ b/cat/cat.cc @@ -19,7 +19,7 @@ #include #include "nm.h" -#include "generator.h" +#include "gen.h" #include "ntr.h" #include "pkt.h" #include "util.h" @@ -30,9 +30,6 @@ constexpr static unsigned int RX_RING_SIZE = 4096; constexpr static unsigned int TX_RING_SIZE = 4096; constexpr static unsigned int BURST_SIZE = 32; -constexpr static unsigned int MODE_MASTER = 0; -constexpr static unsigned int MODE_CLIENT = 1; - static const struct rte_eth_conf port_conf_default{}; struct datapt { @@ -48,45 +45,35 @@ struct datapt { uint64_t srv_sw_rx; }; -struct thread_info { - unsigned int id; - unsigned int rxqid{0}; - unsigned int txqid{0}; - std::vector data; - struct datapt * last_datapt{nullptr}; - unsigned int tot_send{0}; - unsigned int tot_recv{0}; - Generator * ia_gen; -}; - struct options_t { + // parameters unsigned int run_time{5}; - unsigned int warmup_time{0}; - unsigned int num_threads{1}; - unsigned int mode{MODE_MASTER}; + unsigned int warmup_time{3}; char output[256] = "output.txt"; - char ia_gen[256] = "fixed:1"; + char ia_gen_str[256] = "fixed:0.01"; struct rte_ether_addr server_mac; - uint64_t cpu_mask; + uint64_t cpu_mask{0x2}; // 2nd core + std::vector slaves; + // states struct rte_mempool * mbuf_pool; struct rte_ether_addr s_host_mac; uint16_t s_portid; - std::vector s_thr_info; - std::atomic s_epoch; + unsigned int s_rxqid; + unsigned int s_txqid; + unsigned int s_total_pkts{0}; + Generator * s_iagen{nullptr}; + std::vector s_data; + struct datapt * s_last_datapt{nullptr}; + uint32_t s_epoch; std::atomic s_stop {false}; std::atomic s_record {0}; }; static struct options_t options; -static struct thread_info * get_thread_info(int qid) -{ - return options.s_thr_info.at(qid); -} - static uint16_t -rx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx, +rx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused, struct rte_mbuf **pkts, uint16_t nb_pkts, uint16_t max_pkts __rte_unused, void *_ __rte_unused) { uint64_t now = rte_rdtsc(); @@ -103,19 +90,18 @@ rx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx, } if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE_RESP) { - struct thread_info * tinfo = get_thread_info(qidx); uint32_t epoch = rte_be_to_cpu_32(((struct pkt_payload_epoch *)pkt_data->payload)->epoch); - if (tinfo->last_datapt != nullptr && tinfo->last_datapt->epoch == epoch) { + if (options.s_last_datapt != nullptr && options.s_last_datapt->epoch == epoch) { if ((ret = rte_eth_timesync_read_rx_timestamp(port, &ts, pkts[i]->timesync & 0x3)) == 0) { // has hw rx timestamp - tinfo->last_datapt->clt_hw_rx = ts.tv_sec * S2NS + ts.tv_nsec; - tinfo->last_datapt->clt_sw_rx = now; - ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: tagged packet %p with sw: %llu hw: %llu.\n", (void*)pkts[i], now, tinfo->last_datapt->clt_hw_rx); + options.s_last_datapt->clt_hw_rx = ts.tv_sec * S2NS + ts.tv_nsec; + options.s_last_datapt->clt_sw_rx = now; + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: tagged packet %p with sw: %llu hw: %llu.\n", (void*)pkts[i], now, options.s_last_datapt->clt_hw_rx); } else { rte_exit(EXIT_FAILURE, "rx_add_timestamp: packet %p not tagged - hw ts not available - %d.\n", (void*)pkts[i], ret); } } else { - ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "rx_add_timestamp: packet %p epoch %d != last epoch %d.\n", (void*)pkts[i], epoch, tinfo->last_datapt->epoch); + ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "rx_add_timestamp: packet %p epoch %d != last epoch %d.\n", (void*)pkts[i], epoch, options.s_last_datapt->epoch); } } else { ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], rte_be_to_cpu_16(pkt_data->type)); @@ -141,14 +127,13 @@ tx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused, } if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE) { - struct thread_info * tinfo = get_thread_info(qidx); uint32_t epoch = rte_be_to_cpu_32(((struct pkt_payload_epoch *)pkt_data->payload)->epoch); - if (tinfo->last_datapt == nullptr || epoch != tinfo->last_datapt->epoch) { - rte_exit(EXIT_FAILURE, "tx_add_timestamp: packet epoch %d != last epoch %d\n", epoch, tinfo->last_datapt->epoch); + if (options.s_last_datapt == nullptr || epoch != options.s_last_datapt->epoch) { + rte_exit(EXIT_FAILURE, "tx_add_timestamp: packet epoch %d != last epoch %d\n", epoch, options.s_last_datapt->epoch); } - tinfo->last_datapt->clt_sw_tx = now; + options.s_last_datapt->clt_sw_tx = now; ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: tagged packet %p with sw: %llu.\n", (void*)pkts[i], now); } else { ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], pkt_data->type); @@ -159,9 +144,8 @@ tx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused, } static int -locore_main(void * tif) +locore_main(void * tif __rte_unused) { - struct thread_info * tinfo = (struct thread_info *)tif; struct rte_mbuf *tx_buf; struct rte_mbuf *rx_bufs[BURST_SIZE]; struct pkt_hdr *pkt_data; @@ -182,7 +166,7 @@ locore_main(void * tif) "not be optimal.\n", options.s_portid); } - ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d running thread %d...\n", core_id, tinfo->id); + ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d running...\n", core_id); next_ts = get_time_us(); @@ -212,12 +196,12 @@ locore_main(void * tif) pld_epoch = (struct pkt_payload_epoch *)each->payload; epoch = rte_be_to_cpu_32(pld_epoch->epoch); - if (tinfo->last_datapt == nullptr || epoch != tinfo->last_datapt->epoch) { - ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: packet %p epoch %d doesn't match datapt %d.\n", (void*)rx_bufs[i], epoch, tinfo->last_datapt->epoch); + if (options.s_last_datapt == nullptr || epoch != options.s_last_datapt->epoch) { + ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: packet %p epoch %d doesn't match datapt %d.\n", (void*)rx_bufs[i], epoch, options.s_last_datapt->epoch); break; } - tinfo->tot_recv++; + options.s_total_pkts++; recv_resp = true; break; @@ -225,17 +209,15 @@ locore_main(void * tif) pld_stat = (struct pkt_payload_stat *)each->payload; epoch = rte_be_to_cpu_32(pld_stat->epoch); - if (tinfo->last_datapt == nullptr || epoch != tinfo->last_datapt->epoch) { - ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: packet %p epoch %d doesn't match datapt %d.\n", (void*)rx_bufs[i], epoch, tinfo->last_datapt->epoch); + if (options.s_last_datapt == nullptr || epoch != options.s_last_datapt->epoch) { + ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: packet %p epoch %d doesn't match datapt %d.\n", (void*)rx_bufs[i], epoch, options.s_last_datapt->epoch); break; } - tinfo->last_datapt->srv_hw_tx = rte_be_to_cpu_64(pld_stat->hw_tx); - tinfo->last_datapt->srv_hw_rx = rte_be_to_cpu_64(pld_stat->hw_rx); - tinfo->last_datapt->srv_sw_tx = rte_be_to_cpu_64(pld_stat->sw_tx); - tinfo->last_datapt->srv_sw_rx = rte_be_to_cpu_64(pld_stat->sw_rx); - - tinfo->tot_recv++; + options.s_last_datapt->srv_hw_tx = rte_be_to_cpu_64(pld_stat->hw_tx); + options.s_last_datapt->srv_hw_rx = rte_be_to_cpu_64(pld_stat->hw_rx); + options.s_last_datapt->srv_sw_tx = rte_be_to_cpu_64(pld_stat->sw_tx); + options.s_last_datapt->srv_sw_rx = rte_be_to_cpu_64(pld_stat->sw_rx); recv_stat = true; break; @@ -252,9 +234,9 @@ locore_main(void * tif) if (read_tx && recv_stat & recv_resp) { // if we have all the data - if (tinfo->last_datapt != nullptr) { + if (options.s_last_datapt != nullptr) { // push the data to the queue if we haven't done so already - tinfo->data.push_back(tinfo->last_datapt); + options.s_data.push_back(options.s_last_datapt); ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: datapt for epoch %d dump:\n" \ " Valid: %d\n" @@ -266,24 +248,24 @@ locore_main(void * tif) " server TX SW: %llu\n" \ " server RX HW: %llu\n" \ " server RX SW: %llu\n\n", - tinfo->last_datapt->epoch, - tinfo->last_datapt->valid, - tinfo->last_datapt->clt_hw_tx, - tinfo->last_datapt->clt_sw_tx, - tinfo->last_datapt->clt_hw_rx, - tinfo->last_datapt->clt_sw_rx, - tinfo->last_datapt->srv_hw_tx, - tinfo->last_datapt->srv_sw_tx, - tinfo->last_datapt->srv_hw_rx, - tinfo->last_datapt->srv_sw_rx); - tinfo->last_datapt = nullptr; + options.s_last_datapt->epoch, + options.s_last_datapt->valid, + options.s_last_datapt->clt_hw_tx, + options.s_last_datapt->clt_sw_tx, + options.s_last_datapt->clt_hw_rx, + options.s_last_datapt->clt_sw_rx, + options.s_last_datapt->srv_hw_tx, + options.s_last_datapt->srv_sw_tx, + options.s_last_datapt->srv_hw_rx, + options.s_last_datapt->srv_sw_rx); + options.s_last_datapt = nullptr; } if (now >= next_ts) { struct pkt_payload_epoch * pld_epoch; uint32_t epoch; - next_ts += (int)(tinfo->ia_gen->generate() * 1000000.0); + next_ts += (int)(options.s_iagen->generate() * 1000000.0); // generate the packet tx_buf = rte_pktmbuf_alloc(options.mbuf_pool); @@ -298,19 +280,20 @@ locore_main(void * tif) rte_exit(EXIT_FAILURE, "cannot allocate space for packet_data in mbuf\n"); } - epoch = options.s_epoch.fetch_add(1); + epoch = options.s_epoch; + options.s_epoch++; pld_epoch = (struct pkt_payload_epoch *)pkt_data->payload; pld_epoch->epoch = rte_cpu_to_be_32(epoch); - tinfo->last_datapt = new struct datapt; - tinfo->last_datapt->epoch = epoch; - tinfo->last_datapt->valid = options.s_record.load(); + options.s_last_datapt = new struct datapt; + options.s_last_datapt->epoch = epoch; + options.s_last_datapt->valid = options.s_record.load(); read_tx = false; recv_resp = false; recv_stat = false; ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: sending packet %p with epoch %d\n", (void*)tx_buf, epoch); - const uint16_t nb_tx = rte_eth_tx_burst(options.s_portid, tinfo->txqid, &tx_buf, 1); + const uint16_t nb_tx = rte_eth_tx_burst(options.s_portid, options.s_txqid, &tx_buf, 1); if (nb_tx != 1) { rte_exit(EXIT_FAILURE, "failed to send packet 0x%p, epoch %d\n", (void*)tx_buf, epoch); @@ -322,7 +305,7 @@ locore_main(void * tif) struct timespec ts; if ((ret = rte_eth_timesync_read_tx_timestamp(options.s_portid, &ts)) == 0) { ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: read hw tx timestamp %lld.\n", ts.tv_nsec + ts.tv_sec * S2NS); - tinfo->last_datapt->clt_hw_tx = ts.tv_nsec + ts.tv_sec * S2NS; + options.s_last_datapt->clt_hw_tx = ts.tv_nsec + ts.tv_sec * S2NS; read_tx = true; } } @@ -364,7 +347,7 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool) port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE; /* Configure the Ethernet device. */ - ret = rte_eth_dev_configure(portid, options.num_threads, options.num_threads, &port_conf); + ret = rte_eth_dev_configure(portid, 1, 1, &port_conf); if (ret != 0) return ret; @@ -375,7 +358,7 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool) /* Allocate and set up 1 RX queue per thread . */ rxconf = dev_info.default_rxconf; rxconf.offloads = port_conf.rxmode.offloads; - for (uint32_t i = 0; i < options.num_threads; i++) { + for (uint32_t i = 0; i < 1; i++) { ret = rte_eth_rx_queue_setup(portid, i, nb_rxd, rte_eth_dev_socket_id(portid), &rxconf, mbuf_pool); if (ret < 0) return ret; @@ -384,7 +367,7 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool) txconf = dev_info.default_txconf; txconf.offloads = port_conf.txmode.offloads; /* Allocate and set up 1 TX queue per Ethernet port. */ - for (uint32_t i = 0; i < options.num_threads; i++) { + for (uint32_t i = 0; i < 1; i++) { ret = rte_eth_tx_queue_setup(portid, i, nb_txd, rte_eth_dev_socket_id(portid), &txconf); if (ret < 0) return ret; @@ -438,33 +421,29 @@ static void usage() fprintf(stdout, "Usage:\n " \ " -v(vv): verbose mode\n" \ - " -h: display the information\n" \ - " -o: output filename\n" \ + " -s: server's mac\n" \ + " -S: slave(rat)'s mac\n" \ " -t: run time\n" \ " -T: warmup time\n" \ - " -s: server's mac\n" \ + " -h: display the information\n" \ + " -o: output filename\n" \ " -A: affinity mask\n" \ - " -a: number of threads\n" \ - " -C: client mode\n" " -i: inter-arrival time distribution\n\n"); } -// static void int_handler(int) -// { -// //rte_exit(EXIT_SUCCESS, "Caught SIGINT, exiting...\n"); -// } int main(int argc, char* argv[]) { unsigned int nb_ports; struct rte_mempool *mbuf_pool; std::ofstream log_file; - struct thread_info *tinfo; ntr_init(); if (nm_init() != 0) rte_exit(EXIT_FAILURE, "failed to init libnm\n"); - // signal(SIGINT, int_handler); + // create default generator + options.s_iagen = createGenerator(options.ia_gen_str); + // init dpdk int ret = rte_eal_init(argc, argv); if (ret < 0) { @@ -479,8 +458,9 @@ int main(int argc, char* argv[]) { int c; // parse arguments - while((c = getopt(argc, argv, "hvo:t:T:s:A:a:Ci:")) != -1) { + while((c = getopt(argc, argv, "vs:S:t:T:ho:A:i:")) != -1) { switch (c) { + struct rte_ether_addr * addr; case 'v': ntr_set_level(NTR_DEP_USER1, ntr_get_level(NTR_DEP_USER1) + 1); break; @@ -489,6 +469,13 @@ int main(int argc, char* argv[]) rte_exit(EXIT_FAILURE, "cannot parse %s as mac address.\n", optarg); } break; + case 'S': + addr = new struct rte_ether_addr; + if (rte_ether_unformat_addr(optarg, addr) == -1) { + rte_exit(EXIT_FAILURE, "cannot parse %s as mac address.\n", optarg); + } + options.slaves.push_back(addr); + break; case 't': options.run_time = atoi(optarg); break; @@ -504,14 +491,15 @@ int main(int argc, char* argv[]) case 'A': options.cpu_mask = atoll(optarg); break; - case 'a': - options.num_threads = atoi(optarg); - break; - case 'C': - options.mode = MODE_CLIENT; - break; case 'i': - strncpy(options.ia_gen, optarg, sizeof(options.ia_gen) - 1); + strncpy(options.ia_gen_str, optarg, sizeof(options.ia_gen_str) - 1); + if (options.s_iagen != nullptr) { + delete options.s_iagen; + } + options.s_iagen = createGenerator(options.ia_gen_str); + if (options.s_iagen == nullptr) { + rte_exit(EXIT_FAILURE, "invalid generator string %s\n", options.ia_gen_str); + } break; default: usage(); @@ -538,7 +526,6 @@ int main(int argc, char* argv[]) } options.s_portid = portid; - // create a mbuf memory pool on the socket mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", MBUF_MAX_COUNT, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_eth_dev_socket_id(options.s_portid)); if (mbuf_pool == nullptr) { @@ -546,13 +533,6 @@ int main(int argc, char* argv[]) } options.mbuf_pool = mbuf_pool; - for(int i = 0; i < 1; i++) { - tinfo = new thread_info; - tinfo->id = i; - tinfo->ia_gen = createGenerator(options.ia_gen); - options.s_thr_info.push_back(tinfo); - } - if (port_init(portid, mbuf_pool) != 0) { rte_exit(EXIT_FAILURE, "cannot init port %d\n", portid); } @@ -573,17 +553,13 @@ int main(int argc, char* argv[]) sleep(1); - uint16_t core_id = rte_get_next_lcore(0, true, false); - - if (rte_eal_remote_launch(locore_main, options.s_thr_info.at(0), core_id) != 0) { + uint64_t cmask = options.cpu_mask; + const uint16_t core_id = cmask_get_next_cpu(&cmask); + if (rte_eal_remote_launch(locore_main, nullptr, core_id) != 0) { rte_exit(EXIT_FAILURE, "failed to launch function on locore\n"); } - // poor man's timer - // XXX: use kqueue instead - struct timespec ts; - ts.tv_sec = 1; - ts.tv_nsec = 0; + // XXX: poor man's timer uint32_t second = 0; while(true) { if (second >= options.warmup_time) { @@ -593,20 +569,29 @@ int main(int argc, char* argv[]) options.s_stop.store(true); break; } - clock_nanosleep(CLOCK_REALTIME, 0, &ts, NULL); + usleep(S2US); second++; } if (rte_eal_wait_lcore(core_id) < 0) rte_exit(EXIT_FAILURE, "failed to wait for job completion\n"); + + uint32_t qps = 0; // dump stats - // for (auto it = std::begin(options.s_stats); it != std::end(options.s_stats); ++it) { - // log_file << (*it)->rtt << "," << (*it)->server_proc << std::endl; - // delete *it; - // } + for (auto it : options.s_data) { + if (it->valid) { + qps++; + log_file << it->clt_sw_rx << ',' << it->clt_sw_tx << ',' + << it->clt_hw_rx << ',' << it->clt_hw_tx << ',' + << it->srv_sw_rx << ',' << it->srv_sw_tx << ',' + << it->srv_hw_rx << ',' << it->srv_hw_tx << std::endl; + } + } log_file.close(); + ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "Processed %d packets in %d seconds, QPS: %d\n", qps, options.run_time, qps); + // clean up rte_eth_dev_stop(portid); rte_eth_dev_close(portid); diff --git a/cat/generator.h b/inc/gen.h similarity index 98% rename from cat/generator.h rename to inc/gen.h index 1b52cce..be8b976 100644 --- a/cat/generator.h +++ b/inc/gen.h @@ -5,8 +5,7 @@ // 2. implement discrete generator // 3. implement combine generator? -#ifndef GENERATOR_H -#define GENERATOR_H +#pragma once #include @@ -233,5 +232,3 @@ Generator* createGenerator(std::string str); Generator* createFacebookKey(); Generator* createFacebookValue(); Generator* createFacebookIA(); - -#endif // GENERATOR_H \ No newline at end of file diff --git a/inc/util.h b/inc/util.h index 20ab299..3165a9f 100644 --- a/inc/util.h +++ b/inc/util.h @@ -16,3 +16,18 @@ get_time_us() clock_gettime(CLOCK_REALTIME, &ts); return ts.tv_sec * 1000000 + ts.tv_nsec / 1000; } + +constexpr static int NEXT_CPU_NULL = -1; +static inline int +cmask_get_next_cpu(uint64_t * mask) +{ + int ffs = ffsll(*mask); + *mask &= ~(1 << (ffs - 1)); + return ffs - 1; +} + +static inline int +cmask_get_num_cpus(const uint64_t mask) +{ + return _mm_popcnt_u64(mask); +} diff --git a/khat/khat.cc b/khat/khat.cc index 98754db..6220638 100644 --- a/khat/khat.cc +++ b/khat/khat.cc @@ -473,7 +473,7 @@ int main(int argc, char* argv[]) break; case 'm': options.cpuset = strtoull(optarg, nullptr, 16); - options.num_threads = _mm_popcnt_u64(options.cpuset); + options.num_threads = cmask_get_num_cpus(options.cpuset); if (options.num_threads == 0) { rte_exit(EXIT_FAILURE, "must run at least one thread\n"); } @@ -512,9 +512,7 @@ int main(int argc, char* argv[]) for(int i = 0; i < options.num_threads; i++) { struct thread_info * tinfo = new thread_info; tinfo->tid = i; - int ffs = ffsll(cpuset); - tinfo->lcore_id = ffs - 1; - cpuset = cpuset & ~(1 << (ffs - 1)); + tinfo->lcore_id = cmask_get_next_cpu(&cpuset); options.s_thr_info.push_back(tinfo); } @@ -544,13 +542,6 @@ int main(int argc, char* argv[]) } } - // while(true) { - // struct rte_eth_stats stats; - // rte_eth_stats_get(portid, &stats); - // printf("recv: %d missed: %d err: %d\n",(uint32_t)stats.ipackets, (uint32_t)stats.imissed,(uint32_t)stats.ierrors); - // usleep(1000000); - // } - for(int i = 0; i < options.num_threads; i++) { struct thread_info * tinfo = options.s_thr_info.at(i); ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "main: waiting for locore %d...\n", tinfo->lcore_id); diff --git a/cat/generator.cc b/libgen/generator.cc similarity index 98% rename from cat/generator.cc rename to libgen/generator.cc index 335b4e4..57964f1 100644 --- a/cat/generator.cc +++ b/libgen/generator.cc @@ -1,6 +1,6 @@ // modified from mutilate -#include "generator.h" +#include "gen.h" Generator* createFacebookKey() { return new GEV(30.7984, 8.20449, 0.078688); } diff --git a/rat/rat.cc b/rat/rat.cc new file mode 100644 index 0000000..97dcac0 --- /dev/null +++ b/rat/rat.cc @@ -0,0 +1,624 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "nm.h" +#include "gen.h" +#include "ntr.h" +#include "pkt.h" +#include "util.h" + +constexpr static unsigned int MBUF_MAX_COUNT = 16384; +constexpr static unsigned int MBUF_CACHE_SIZE = 512; +constexpr static unsigned int RX_RING_SIZE = 4096; +constexpr static unsigned int TX_RING_SIZE = 4096; +constexpr static unsigned int BURST_SIZE = 32; + +constexpr static unsigned int MODE_MASTER = 0; +constexpr static unsigned int MODE_CLIENT = 1; + +static const struct rte_eth_conf port_conf_default{}; + +struct datapt { + uint32_t epoch; + uint32_t valid; + uint64_t clt_hw_tx; + uint64_t clt_sw_tx; + uint64_t clt_hw_rx; + uint64_t clt_sw_rx; + uint64_t srv_hw_tx; + uint64_t srv_sw_tx; + uint64_t srv_hw_rx; + uint64_t srv_sw_rx; +}; + +struct thread_info { + unsigned int id; + unsigned int rxqid{0}; + unsigned int txqid{0}; + std::vector data; + struct datapt * last_datapt{nullptr}; + unsigned int tot_send{0}; + unsigned int tot_recv{0}; + Generator * ia_gen; +}; + +struct options_t { + unsigned int run_time{5}; + unsigned int warmup_time{0}; + unsigned int num_threads{1}; + unsigned int mode{MODE_MASTER}; + char output[256] = "output.txt"; + char ia_gen[256] = "fixed:1"; + struct rte_ether_addr server_mac; + uint64_t cpu_mask; + // states + struct rte_mempool * mbuf_pool; + struct rte_ether_addr s_host_mac; + uint16_t s_portid; + std::vector s_thr_info; + std::atomic s_epoch; + std::atomic s_stop {false}; + std::atomic s_record {0}; +}; + +static struct options_t options; + +static struct thread_info * get_thread_info(int qid) +{ + return options.s_thr_info.at(qid); +} + +static uint16_t +rx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx, + struct rte_mbuf **pkts, uint16_t nb_pkts, uint16_t max_pkts __rte_unused, void *_ __rte_unused) +{ + uint64_t now = rte_rdtsc(); + struct pkt_hdr * pkt_data; + struct timespec ts; + int ret; + + for (int i = 0; i < nb_pkts; i++) { + pkt_data = check_valid_packet(pkts[i]); + + if (pkt_data == NULL) { + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: ignoring invalid packet 0x%p.\n", (void*)pkts[i]); + continue; + } + + if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE_RESP) { + struct thread_info * tinfo = get_thread_info(qidx); + uint32_t epoch = rte_be_to_cpu_32(((struct pkt_payload_epoch *)pkt_data->payload)->epoch); + if (tinfo->last_datapt != nullptr && tinfo->last_datapt->epoch == epoch) { + if ((ret = rte_eth_timesync_read_rx_timestamp(port, &ts, pkts[i]->timesync & 0x3)) == 0) { + // has hw rx timestamp + tinfo->last_datapt->clt_hw_rx = ts.tv_sec * S2NS + ts.tv_nsec; + tinfo->last_datapt->clt_sw_rx = now; + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: tagged packet %p with sw: %llu hw: %llu.\n", (void*)pkts[i], now, tinfo->last_datapt->clt_hw_rx); + } else { + rte_exit(EXIT_FAILURE, "rx_add_timestamp: packet %p not tagged - hw ts not available - %d.\n", (void*)pkts[i], ret); + } + } else { + ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "rx_add_timestamp: packet %p epoch %d != last epoch %d.\n", (void*)pkts[i], epoch, tinfo->last_datapt->epoch); + } + } else { + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], rte_be_to_cpu_16(pkt_data->type)); + } + } + + return nb_pkts; +} + +static uint16_t +tx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused, + struct rte_mbuf **pkts, uint16_t nb_pkts, void *_ __rte_unused) +{ + uint64_t now = rte_rdtsc(); + struct pkt_hdr * pkt_data; + + for (int i = 0; i < nb_pkts; i++) { + pkt_data = check_valid_packet(pkts[i]); + + if (pkt_data == NULL) { + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: ignoring invalid packet 0x%p.\n", (void*)pkts[i]); + continue; + } + + if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE) { + struct thread_info * tinfo = get_thread_info(qidx); + uint32_t epoch = rte_be_to_cpu_32(((struct pkt_payload_epoch *)pkt_data->payload)->epoch); + + if (tinfo->last_datapt == nullptr || epoch != tinfo->last_datapt->epoch) { + rte_exit(EXIT_FAILURE, "tx_add_timestamp: packet epoch %d != last epoch %d\n", epoch, tinfo->last_datapt->epoch); + } + + tinfo->last_datapt->clt_sw_tx = now; + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: tagged packet %p with sw: %llu.\n", (void*)pkts[i], now); + } else { + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], pkt_data->type); + } + } + + return nb_pkts; +} + +static int +locore_main(void * tif) +{ + struct thread_info * tinfo = (struct thread_info *)tif; + struct rte_mbuf *tx_buf; + struct rte_mbuf *rx_bufs[BURST_SIZE]; + struct pkt_hdr *pkt_data; + uint32_t core_id = rte_lcore_id(); + int32_t ret; + + bool read_tx = true; + bool recv_stat = true; + bool recv_resp = true; + + uint64_t next_ts; + // XXX: check link status instead + + sleep(1); + if (rte_eth_dev_socket_id(options.s_portid) > 0 && rte_eth_dev_socket_id(options.s_portid) != (int)rte_socket_id()) { + ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: WARNING, port %d is on remote NUMA node to " + "polling thread.\n\tPerformance will " + "not be optimal.\n", options.s_portid); + } + + ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d running thread %d...\n", core_id, tinfo->id); + + next_ts = get_time_us(); + + while(!options.s_stop.load()) { + uint64_t now = get_time_us(); + // always pop incoming packets + const uint16_t nb_rx = rte_eth_rx_burst(options.s_portid, 0, rx_bufs, BURST_SIZE); + + if (nb_rx > 0) { + for (int i = 0; i < nb_rx; i++) { + struct pkt_hdr * each = check_valid_packet(rx_bufs[i]); + + if (each == NULL) { + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: ignoring invalid packet %p.\n", (void*)rx_bufs[i]); + rte_pktmbuf_free(rx_bufs[i]); + continue; + } + + uint16_t type = rte_be_to_cpu_16(each->type); + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: received packet %p type %d.\n", (void*)rx_bufs[i], type); + switch (type) { + struct pkt_payload_epoch * pld_epoch; + struct pkt_payload_stat * pld_stat; + uint32_t epoch; + + case PKT_TYPE_PROBE_RESP: + pld_epoch = (struct pkt_payload_epoch *)each->payload; + epoch = rte_be_to_cpu_32(pld_epoch->epoch); + + if (tinfo->last_datapt == nullptr || epoch != tinfo->last_datapt->epoch) { + ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: packet %p epoch %d doesn't match datapt %d.\n", (void*)rx_bufs[i], epoch, tinfo->last_datapt->epoch); + break; + } + + tinfo->tot_recv++; + + recv_resp = true; + break; + case PKT_TYPE_STAT: + pld_stat = (struct pkt_payload_stat *)each->payload; + epoch = rte_be_to_cpu_32(pld_stat->epoch); + + if (tinfo->last_datapt == nullptr || epoch != tinfo->last_datapt->epoch) { + ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: packet %p epoch %d doesn't match datapt %d.\n", (void*)rx_bufs[i], epoch, tinfo->last_datapt->epoch); + break; + } + + tinfo->last_datapt->srv_hw_tx = rte_be_to_cpu_64(pld_stat->hw_tx); + tinfo->last_datapt->srv_hw_rx = rte_be_to_cpu_64(pld_stat->hw_rx); + tinfo->last_datapt->srv_sw_tx = rte_be_to_cpu_64(pld_stat->sw_tx); + tinfo->last_datapt->srv_sw_rx = rte_be_to_cpu_64(pld_stat->sw_rx); + + tinfo->tot_recv++; + + recv_stat = true; + break; + default: + ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: ignoring packet %p with unknown type %d.\n", (void*)rx_bufs[i], type); + rte_pktmbuf_free(rx_bufs[i]); + continue; + } + + rte_pktmbuf_free(rx_bufs[i]); + } + } + + if (read_tx && recv_stat & recv_resp) { + // if we have all the data + + if (tinfo->last_datapt != nullptr) { + // push the data to the queue if we haven't done so already + tinfo->data.push_back(tinfo->last_datapt); + + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: datapt for epoch %d dump:\n" \ + " Valid: %d\n" + " client TX HW: %llu\n" \ + " client TX SW: %llu\n" \ + " client RX HW: %llu\n" \ + " client RX SW: %llu\n" \ + " server TX HW: %llu\n" \ + " server TX SW: %llu\n" \ + " server RX HW: %llu\n" \ + " server RX SW: %llu\n\n", + tinfo->last_datapt->epoch, + tinfo->last_datapt->valid, + tinfo->last_datapt->clt_hw_tx, + tinfo->last_datapt->clt_sw_tx, + tinfo->last_datapt->clt_hw_rx, + tinfo->last_datapt->clt_sw_rx, + tinfo->last_datapt->srv_hw_tx, + tinfo->last_datapt->srv_sw_tx, + tinfo->last_datapt->srv_hw_rx, + tinfo->last_datapt->srv_sw_rx); + tinfo->last_datapt = nullptr; + } + + if (now >= next_ts) { + struct pkt_payload_epoch * pld_epoch; + uint32_t epoch; + + next_ts += (int)(tinfo->ia_gen->generate() * 1000000.0); + + // generate the packet + tx_buf = rte_pktmbuf_alloc(options.mbuf_pool); + + if (tx_buf == NULL) { + rte_exit(EXIT_FAILURE, "cannot allocate tx_buf\n"); + } + + pkt_data = construct_pkt_hdr(tx_buf, PKT_TYPE_PROBE, + &options.s_host_mac, &options.server_mac); + if (pkt_data == NULL) { + rte_exit(EXIT_FAILURE, "cannot allocate space for packet_data in mbuf\n"); + } + + epoch = options.s_epoch.fetch_add(1); + pld_epoch = (struct pkt_payload_epoch *)pkt_data->payload; + pld_epoch->epoch = rte_cpu_to_be_32(epoch); + tinfo->last_datapt = new struct datapt; + tinfo->last_datapt->epoch = epoch; + tinfo->last_datapt->valid = options.s_record.load(); + + read_tx = false; + recv_resp = false; + recv_stat = false; + + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: sending packet %p with epoch %d\n", (void*)tx_buf, epoch); + const uint16_t nb_tx = rte_eth_tx_burst(options.s_portid, tinfo->txqid, &tx_buf, 1); + + if (nb_tx != 1) { + rte_exit(EXIT_FAILURE, "failed to send packet 0x%p, epoch %d\n", (void*)tx_buf, epoch); + } + } + } + + if (!read_tx) { + struct timespec ts; + if ((ret = rte_eth_timesync_read_tx_timestamp(options.s_portid, &ts)) == 0) { + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: read hw tx timestamp %lld.\n", ts.tv_nsec + ts.tv_sec * S2NS); + tinfo->last_datapt->clt_hw_tx = ts.tv_nsec + ts.tv_sec * S2NS; + read_tx = true; + } + } + } + + rte_pktmbuf_free(tx_buf); + + ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d successfully stopped.\n", core_id); + + return 0; +} + +static int +port_init(uint16_t portid, struct rte_mempool *mbuf_pool) +{ + struct rte_eth_dev_info dev_info; + struct rte_eth_conf port_conf = port_conf_default; + struct rte_eth_txconf txconf; + struct rte_eth_rxconf rxconf; + + uint16_t nb_rxd = RX_RING_SIZE; + uint16_t nb_txd = TX_RING_SIZE; + port_conf.rxmode.max_rx_pkt_len = RTE_ETHER_MAX_LEN; + + if(!rte_eth_dev_is_valid_port(portid)) { + return -1; + } + + int ret = rte_eth_dev_info_get(portid, &dev_info); + if (ret != 0) { + return ret; + } + + port_conf.rxmode.max_rx_pkt_len = RTE_ETHER_MAX_LEN; + port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_UDP_CKSUM; + port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_IPV4_CKSUM; + port_conf.txmode.offloads |= DEV_TX_OFFLOAD_UDP_CKSUM; + port_conf.txmode.offloads |= DEV_TX_OFFLOAD_IPV4_CKSUM; + port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE; + + /* Configure the Ethernet device. */ + ret = rte_eth_dev_configure(portid, options.num_threads, options.num_threads, &port_conf); + if (ret != 0) + return ret; + + ret = rte_eth_dev_adjust_nb_rx_tx_desc(portid, &nb_rxd, &nb_txd); + if (ret != 0) + return ret; + + /* Allocate and set up 1 RX queue per thread . */ + rxconf = dev_info.default_rxconf; + rxconf.offloads = port_conf.rxmode.offloads; + for (uint32_t i = 0; i < options.num_threads; i++) { + ret = rte_eth_rx_queue_setup(portid, i, nb_rxd, rte_eth_dev_socket_id(portid), &rxconf, mbuf_pool); + if (ret < 0) + return ret; + } + + txconf = dev_info.default_txconf; + txconf.offloads = port_conf.txmode.offloads; + /* Allocate and set up 1 TX queue per Ethernet port. */ + for (uint32_t i = 0; i < options.num_threads; i++) { + ret = rte_eth_tx_queue_setup(portid, i, nb_txd, rte_eth_dev_socket_id(portid), &txconf); + if (ret < 0) + return ret; + } + + ret = rte_eth_dev_start(portid); + if (ret < 0) + return ret; + + /* Display the port MAC address. */ + struct rte_ether_addr addr; + ret = rte_eth_macaddr_get(portid, &addr); + if (ret != 0) + return ret; + + ret = rte_eth_timesync_enable(portid); + if (ret != 0) + return ret; + + /* Enable RX in promiscuous mode for the Ethernet device. */ + ret = rte_eth_promiscuous_enable(portid); + if (ret != 0) + return ret; + + rte_eth_add_tx_callback(portid, 0, tx_add_timestamp, NULL); + rte_eth_add_rx_callback(portid, 0, rx_add_timestamp, NULL); + + return 0; +} + +static void dump_options() +{ + fprintf(stdout, "Configuration:\n" \ + " run time = %d\n" \ + " warmup time = %d\n" \ + " output file = %s\n" \ + " server MAC = %x:%x:%x:%x:%x:%x\n", + options.run_time, + options.warmup_time, + options.output, + options.server_mac.addr_bytes[0], + options.server_mac.addr_bytes[1], + options.server_mac.addr_bytes[2], + options.server_mac.addr_bytes[3], + options.server_mac.addr_bytes[4], + options.server_mac.addr_bytes[5]); +} + +static void usage() +{ + fprintf(stdout, + "Usage:\n " \ + " -v(vv): verbose mode\n" \ + " -h: display the information\n" \ + " -o: output filename\n" \ + " -t: run time\n" \ + " -T: warmup time\n" \ + " -s: server's mac\n" \ + " -A: affinity mask\n" \ + " -a: number of threads\n" \ + " -C: client mode\n" + " -i: inter-arrival time distribution\n\n"); +} +// static void int_handler(int) +// { +// //rte_exit(EXIT_SUCCESS, "Caught SIGINT, exiting...\n"); +// } + +int main(int argc, char* argv[]) +{ + unsigned int nb_ports; + struct rte_mempool *mbuf_pool; + std::ofstream log_file; + struct thread_info *tinfo; + + ntr_init(); + if (nm_init() != 0) + rte_exit(EXIT_FAILURE, "failed to init libnm\n"); + // signal(SIGINT, int_handler); + + // init dpdk + int ret = rte_eal_init(argc, argv); + if (ret < 0) { + rte_exit(EXIT_FAILURE, "rte_eal_init failed!\n"); + } + + argc -= ret; + argv += ret; + + // set warning level + ntr_set_level(NTR_DEP_USER1, NTR_LEVEL_WARNING); + { + int c; + // parse arguments + while((c = getopt(argc, argv, "hvo:t:T:s:A:a:Ci:")) != -1) { + switch (c) { + case 'v': + ntr_set_level(NTR_DEP_USER1, ntr_get_level(NTR_DEP_USER1) + 1); + break; + case 's': + if (rte_ether_unformat_addr(optarg, &options.server_mac) == -1) { + rte_exit(EXIT_FAILURE, "cannot parse %s as mac address.\n", optarg); + } + break; + case 't': + options.run_time = atoi(optarg); + break; + case 'T': + options.warmup_time = atoi(optarg); + break; + case 'h': + usage(); + rte_exit(EXIT_SUCCESS, "success\n"); + case 'o': + strncpy(options.output, optarg, sizeof(options.output) - 1); + break; + case 'A': + options.cpu_mask = atoll(optarg); + break; + case 'a': + options.num_threads = atoi(optarg); + break; + case 'C': + options.mode = MODE_CLIENT; + break; + case 'i': + strncpy(options.ia_gen, optarg, sizeof(options.ia_gen) - 1); + break; + default: + usage(); + rte_exit(EXIT_FAILURE, "unknown argument: %c\n", c); + break; + } + } + } + + // open log file for writing + if (options.mode == MODE_MASTER) { + log_file.open(options.output, std::ofstream::out); + if (!log_file) { + rte_exit(EXIT_FAILURE, "failed to open log file %s\n", options.output); + } + } + + nb_ports = rte_eth_dev_count_avail(); + if (nb_ports == 0) { + rte_exit(EXIT_FAILURE, "number of ports must be > 0\n"); + } + + uint16_t portid = rte_eth_find_next(0); + if (portid == RTE_MAX_ETHPORTS) { + rte_exit(EXIT_FAILURE, "cannot find an available port\n"); + } + options.s_portid = portid; + + + // create a mbuf memory pool on the socket + mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", MBUF_MAX_COUNT, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_eth_dev_socket_id(options.s_portid)); + if (mbuf_pool == nullptr) { + rte_exit(EXIT_FAILURE, "cannot create mbuf pool\n"); + } + options.mbuf_pool = mbuf_pool; + + for(int i = 0; i < 1; i++) { + tinfo = new thread_info; + tinfo->id = i; + tinfo->ia_gen = createGenerator(options.ia_gen); + options.s_thr_info.push_back(tinfo); + } + + if (port_init(portid, mbuf_pool) != 0) { + rte_exit(EXIT_FAILURE, "cannot init port %d\n", portid); + } + + if (rte_eth_macaddr_get(portid, &options.s_host_mac) != 0) { + rte_exit(EXIT_FAILURE, "cannot get mac address of port %d\n", portid); + } + + ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "Configured port %d with mac addr %x:%x:%x:%x:%x:%x\n", portid, + options.s_host_mac.addr_bytes[0], + options.s_host_mac.addr_bytes[1], + options.s_host_mac.addr_bytes[2], + options.s_host_mac.addr_bytes[3], + options.s_host_mac.addr_bytes[4], + options.s_host_mac.addr_bytes[5]); + + dump_options(); + + sleep(1); + + uint16_t core_id = rte_get_next_lcore(0, true, false); + + if (rte_eal_remote_launch(locore_main, options.s_thr_info.at(0), core_id) != 0) { + rte_exit(EXIT_FAILURE, "failed to launch function on locore\n"); + } + + // poor man's timer + // XXX: use kqueue instead + struct timespec ts; + ts.tv_sec = 1; + ts.tv_nsec = 0; + uint32_t second = 0; + while(true) { + if (second >= options.warmup_time) { + options.s_record.store(1); + } + if (second >= options.run_time + options.warmup_time) { + options.s_stop.store(true); + break; + } + clock_nanosleep(CLOCK_REALTIME, 0, &ts, NULL); + second++; + } + + if (rte_eal_wait_lcore(core_id) < 0) + rte_exit(EXIT_FAILURE, "failed to wait for job completion\n"); + + // dump stats + if (options.mode == MODE_MASTER) { + thread_info * master_thrd = options.s_thr_info.at(0); + for (auto it : master_thrd->data) { + if (it->valid) { + log_file << it->clt_sw_rx << ',' << it->clt_sw_tx << ',' + << it->clt_hw_rx << ',' << it->clt_hw_tx << ',' + << it->srv_sw_rx << ',' << it->srv_sw_tx << ',' + << it->srv_hw_rx << ',' << it->srv_hw_tx << std::endl; + } + } + } + log_file.close(); + + // clean up + rte_eth_dev_stop(portid); + rte_eth_dev_close(portid); + + return 0; +} \ No newline at end of file