// numam/net/khat.cc
#include <rte_common.h>
#include <rte_config.h>
#include <rte_cycles.h>
#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_ether.h>
#include <rte_launch.h>
#include <rte_lcore.h>
#include <rte_mbuf.h>
#include <unistd.h>
#include "nm.h"
#include "ntr.h"
#include "net/pkt.h"
#include "net/util.h"
#include <atomic>
#include <ctime>
#include <vector>
constexpr static unsigned int MBUF_MAX_COUNT = 65536;
constexpr static unsigned int MBUF_CACHE_SIZE = 512;
constexpr static unsigned int RX_RING_SIZE = 2048;
constexpr static unsigned int TX_RING_SIZE = 2048;
constexpr static unsigned int BURST_SIZE = 32;
constexpr static unsigned int CACHELINE_SIZE = 64;
constexpr static uint16_t THREAD_LOAD_BUFFER_SZ = 16384; // XXX: hardcoded to be larger than the MTU
constexpr static size_t MEMPOOL_NAME_BUF_LEN = 64;
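// Per-mbuf dynamic field used to flag the single in-flight probe packet.
// The field is registered in main() via rte_mbuf_dynfield_register() and its
// offset is stored in PROBE_FLAG_OFFSET.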
static const struct rte_mbuf_dynfield rte_mbuf_dynfield_probe_flag = {
.name = "rte_mbuf_dynfield_probe_flag",
.size = sizeof(uint32_t),
.align = __alignof__(uint32_t),
.flags = 0
};
static int PROBE_FLAG_OFFSET { 0 };
static const struct rte_eth_conf port_conf_default {
};
// Probe state tracking:
// When a probe packet first arrives, the server state is set to in-flux and
// the mbuf's probe dynamic field is set, which prevents other probe packets
// from being processed. The state is released when the server sends the probe
// stats back to the client, guaranteeing that the server processes only one
// probe packet at a time.
// XXX: this could also be attached to the mbuf itself and processed by the
// lcore thread. It is kept global because there can only ever be one pending
// probe request, and rx_add_timestamp can store its timestamps here too.
struct thread_info {
int tid; // application-level thread index
int rxqid; // RX queue assigned in port_init()
int txqid; // TX queue assigned in port_init()
int lcore_id; // DPDK lcore the thread is launched on
int node_id; // NUMA node whose mempool this thread allocates from
void * cache_lines; // per-thread region of thread_cacheline_cnt cache lines
void * load_buffer; // scratch buffer for LOAD payloads (THREAD_LOAD_BUFFER_SZ bytes)
};
// state machine: WAIT -> PROBE when a probe packet is accepted,
// PROBE -> WAIT once the corresponding STAT packet has been sent
constexpr static int SERVER_STATE_WAIT = 0;
constexpr static int SERVER_STATE_PROBE = 1;
struct probe_state_t {
struct net_spec dst; // network spec of the probing client
struct conn_spec cspec {
.dst = &dst
};
uint32_t epoch; // epoch echoed from the probe packet
uint64_t last_sw_rx; // software RX timestamp of the probe packet
uint64_t last_sw_tx; // software TX timestamp of the probe response
uint64_t last_hw_rx; // hardware RX timestamp of the probe packet
};
struct options_t {
// config
int num_threads { 1 };
uint64_t cpuset { 0x4 }; // 2nd core
uint64_t memmask { 0x0 }; // same socket as the NIC
char mempool_name_buf[MEMPOOL_NAME_BUF_LEN];
bool jumbo_frame_enabled { false }; // setting this to true changes mbuf size and mtu
int port_mtu { MAX_STANDARD_MTU };
int thread_cacheline_cnt { 128 };
bool mlg_enabled { false };
uint64_t mlg_bps { 0 };
uint64_t mlg_cmask { 0 };
uint64_t mlg_dmask { 0 };
memload_generator *mlg { nullptr };
// states
uint16_t s_portid { 0 };
struct net_spec s_host_spec {
};
std::atomic<int> s_state { SERVER_STATE_WAIT };
struct probe_state_t s_probe_info;
std::vector<struct thread_info *> s_thr_info;
struct rte_mempool *s_mempools[MAX_NODES];
};
struct options_t options;
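// RX callback installed on every queue via rte_eth_add_rx_callback().
// For each incoming PROBE packet it records the software and hardware RX
// timestamps and, if the server is idle, moves the state machine to
// SERVER_STATE_PROBE and sets the probe dynamic field on the mbuf so that
// exactly one lcore handles the probe.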
static uint16_t
rx_add_timestamp(uint16_t port, uint16_t qidx __rte_unused,
struct rte_mbuf **pkts, uint16_t nb_pkts, uint16_t max_pkts __rte_unused,
void *_ __rte_unused)
{
uint64_t now = nm_get_uptime_ns();
struct timespec ts {
};
struct pkt_hdr *pkt_data;
for (int i = 0; i < nb_pkts; i++) {
pkt_data = check_valid_packet(
pkts[i], &options.s_host_spec.mac_addr);
if (pkt_data == nullptr) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"rx_add_timestamp: ignoring invalid packet %p.\n",
(void *)pkts[i]);
continue;
}
if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE) {
int state_wait = SERVER_STATE_WAIT;
*RTE_MBUF_DYNFIELD(
pkts[i], PROBE_FLAG_OFFSET, uint32_t *) = 0;
if (rte_eth_timesync_read_rx_timestamp(
port, &ts, pkts[i]->timesync & 0x3) == 0) {
if (options.s_state.compare_exchange_strong(
state_wait, SERVER_STATE_PROBE)) {
// mark this mbuf as the probe packet being
// processed; only the lcore that receives the
// mbuf with this flag set handles the probe
*RTE_MBUF_DYNFIELD(pkts[i],
PROBE_FLAG_OFFSET, uint32_t *) = 1;
// tag with timestamps
options.s_probe_info.last_hw_rx =
ts.tv_nsec + ts.tv_sec * S2NS;
options.s_probe_info.last_sw_rx = now;
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"rx_add_timestamp: tagged packet %p epoch %d with sw: %lu hw:%lu.\n",
(void *)pkts[i],
options.s_probe_info.epoch, now,
options.s_probe_info.last_hw_rx);
} else
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING,
"rx_add_timestamp: packet %p not tagged - server is processing a probe.\n",
(void *)pkts[i]);
} else
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING,
"rx_add_timestamp: packet %p not tagged - hw rx timestamp not available.\n",
(void *)pkts[i]);
} else
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"rx_add_timestamp: packet %p not tagged - type %d.\n",
(void *)pkts[i], rte_be_to_cpu_16(pkt_data->type));
}
return nb_pkts;
}
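// TX callback installed on every queue via rte_eth_add_tx_callback().
// Records the software TX timestamp of a PROBE_RESP packet right before it is
// handed to the NIC.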
static uint16_t
tx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
struct rte_mbuf **pkts, uint16_t nb_pkts, void *_ __rte_unused)
{
uint64_t now = nm_get_uptime_ns();
struct pkt_hdr *pkt_data;
for (int i = 0; i < nb_pkts; i++) {
pkt_data = check_valid_packet(
pkts[i], &options.s_host_spec.mac_addr);
if (pkt_data == nullptr) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"tx_add_timestamp: ignoring invalid packet %p.\n",
(void *)pkts[i]);
continue;
}
if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE_RESP) {
// this packet is the response to a PROBE packet.
// At this point it has not been handed to the NIC yet,
// so the server must still be in the probe state and the
// mbuf must carry the probe flag.
// XXX: this should be an assert
if (options.s_state.load() != SERVER_STATE_PROBE ||
*RTE_MBUF_DYNFIELD(
pkts[i], PROBE_FLAG_OFFSET, uint32_t *) != 1) {
rte_exit(EXIT_FAILURE,
"packet %p sent to NIC before sw callback\n",
(void *)pkts[i]);
}
options.s_probe_info.last_sw_tx = now;
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"tx_add_timestamp: tagged packet %p with sw tx %lu\n",
(void *)pkts[i], options.s_probe_info.last_sw_tx);
} else {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"tx_add_timestamp: packet %p not tagged - type %d\n",
(void *)pkts[i], rte_be_to_cpu_16(pkt_data->type));
}
}
return nb_pkts;
}
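// Per-worker lcore loop: burst-receives packets, answers PROBE packets with a
// PROBE_RESP and LOAD packets with a LOAD_RESP, and, while a probe is pending,
// polls for the hardware TX timestamp so the final PKT_TYPE_STAT packet can be
// sent back to the client.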
static int
locore_main(void *ti)
{
auto tinfo = (struct thread_info *)ti;
struct rte_mbuf *bufs[BURST_SIZE];
// at most one response is queued per received packet; the
// extra PKT_TYPE_STAT packet is sent separately once the hw
// tx timestamp becomes available
struct rte_mbuf *tx_bufs[BURST_SIZE];
struct pkt_hdr *pkt_data;
bool pending_probe = false;
if (rte_eth_dev_socket_id(options.s_portid) >= 0 &&
rte_eth_dev_socket_id(options.s_portid) != (int)rte_socket_id()) {
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING,
"locore_main <thread %d>: WARNING, port %d is on remote NUMA node to "
"polling thread.\n\tPerformance will "
"not be optimal.\n",
tinfo->tid, options.s_portid);
}
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"locore_main <thread %d>: running on locore %d with txidx %d and rxidx %d.\n",
tinfo->tid, rte_lcore_id(), tinfo->txqid, tinfo->rxqid);
while (true) {
uint16_t nb_tx = 0;
const uint16_t nb_rx = rte_eth_rx_burst(
options.s_portid, tinfo->rxqid, bufs, BURST_SIZE);
struct rte_mbuf *pkt_buf;
struct pkt_hdr *tx_data;
for (int i = 0; i < nb_rx; i++) {
// XXX: optimization: every packet has already been
// validated once in rx_add_timestamp; valid packets could
// be marked there so this redundant check can be skipped
pkt_data = check_valid_packet(
bufs[i], &options.s_host_spec.mac_addr);
if (pkt_data == nullptr) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"locore_main <thread %d>: skipping invalid packet %p.\n",
tinfo->tid, (void *)bufs[i]);
// dump_pkt(bufs[i]);
rte_pktmbuf_free(bufs[i]);
continue;
}
NTR_PKT(NTR_DEP_USER1, NTR_LEVEL_DEBUG, pkt_data,
"locore_main <thread %d>: ", tinfo->tid);
switch (rte_be_to_cpu_16(pkt_data->type)) {
case PKT_TYPE_PROBE: {
if (options.s_state.load() ==
SERVER_STATE_PROBE &&
*RTE_MBUF_DYNFIELD(bufs[i],
PROBE_FLAG_OFFSET, uint32_t *) == 1) {
// send back a PROBE_RESP packet so the client can
// measure return latency
pending_probe = true;
// bookkeep the probe results
options.s_probe_info.epoch =
rte_be_to_cpu_32(
((struct pkt_payload_epoch *)
pkt_data->payload)
->epoch);
pkt_hdr_to_netspec(pkt_data,
&options.s_probe_info.dst,
&options.s_probe_info.cspec
.dst_port,
nullptr,
&options.s_probe_info.cspec
.src_port);
options.s_probe_info.cspec.src =
&options.s_host_spec;
if (alloc_pkt_hdr(
options
.s_mempools[tinfo->node_id],
PKT_TYPE_PROBE_RESP,
&options.s_probe_info.cspec, 0,
&pkt_buf, &tx_data) != 0) {
rte_exit(EXIT_FAILURE,
"failed to allocate pkt\n");
}
rte_memcpy(tx_data->payload,
pkt_data->payload,
sizeof(struct pkt_payload_epoch));
*RTE_MBUF_DYNFIELD(pkt_buf,
PROBE_FLAG_OFFSET, uint32_t *) = 1;
// queue for burst send
tx_bufs[nb_tx++] = pkt_buf;
}
break;
}
case PKT_TYPE_LOAD: {
struct conn_spec cspec;
struct net_spec src;
struct net_spec dst;
// copy the payload into the scratch buffer so the dummy
// fields are actually touched
memcpy(tinfo->load_buffer, pkt_data->payload, MIN(bufs[i]->data_len - sizeof(struct pkt_hdr), THREAD_LOAD_BUFFER_SZ));
// perform the load: map the requested index onto a
// (thread, cache line) pair across all worker threads
auto pld = (struct pkt_payload_load *)pkt_data->payload;
uint32_t which = rte_be_to_cpu_32(pld->which);
uint32_t load = rte_be_to_cpu_32(pld->load);
uint32_t start_cacheline = which % (options.thread_cacheline_cnt * options.s_thr_info.size());
uint32_t thrd = start_cacheline / options.thread_cacheline_cnt;
uint32_t start = start_cacheline % options.thread_cacheline_cnt;
// perform `load` writes of successive cache line indices
// into the scratch buffer
for (uint32_t j = 0; j < load; j++) {
*(uint32_t *)tinfo->load_buffer = (start + j) % options.thread_cacheline_cnt;
}
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main <thread %d>: LOAD @ thread %d, start %d, load %d\n", tinfo->tid, thrd, start, load);
// reply
pkt_hdr_to_netspec(pkt_data, &src,
&cspec.dst_port, &dst, &cspec.src_port);
cspec.dst = &src;
cspec.src = &dst;
// printf("LOAD PKT SIZE: %d\n", bufs[i]->data_len);
// we reply to load packet regardless of the
// server state
if (alloc_pkt_hdr(
options.s_mempools[tinfo->node_id],
PKT_TYPE_LOAD_RESP, &cspec, 0, &pkt_buf,
&tx_data) != 0) {
rte_exit(EXIT_FAILURE,
"failed to allocate pkt\n");
}
rte_memcpy(tx_data->payload, pkt_data->payload,
sizeof(struct pkt_payload_load));
// queue for burst send
tx_bufs[nb_tx++] = pkt_buf;
break;
}
default:
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"locore_main <thread %d>: ignoring packet %p with unknown type %d.\n",
tinfo->tid, (void *)bufs[i],
rte_be_to_cpu_16(pkt_data->type));
break;
}
rte_pktmbuf_free(bufs[i]);
}
// send all packets
tx_burst_all(options.s_portid, tinfo->txqid, tx_bufs, nb_tx);
// check for the pending probe's hw tx timestamp on every
// iteration, not only when packets were received
if (pending_probe) {
struct timespec ts {
};
struct pkt_payload_stat *stat;
if (rte_eth_timesync_read_tx_timestamp(
options.s_portid, &ts) == 0) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"locore_main <thread %d>: obtained hw tx timestamp %lu.\n",
tinfo->tid,
(ts.tv_sec * S2NS + ts.tv_nsec));
// now we have everything we need
if (alloc_pkt_hdr(
options.s_mempools[tinfo->node_id],
PKT_TYPE_STAT,
&options.s_probe_info.cspec, 0,
&pkt_buf, &tx_data) != 0) {
rte_exit(EXIT_FAILURE,
"failed to alloc pkt_buf\n");
}
// populate stats
stat = (struct pkt_payload_stat *)
tx_data->payload;
stat->epoch = rte_cpu_to_be_32(
options.s_probe_info.epoch);
stat->hw_rx = rte_cpu_to_be_64(
options.s_probe_info.last_hw_rx);
stat->hw_tx = rte_cpu_to_be_64(
ts.tv_nsec + ts.tv_sec * S2NS);
stat->sw_rx = rte_cpu_to_be_64(
options.s_probe_info.last_sw_rx);
stat->sw_tx = rte_cpu_to_be_64(
options.s_probe_info.last_sw_tx);
// send the packet
tx_burst_all(options.s_portid, tinfo->txqid, &pkt_buf, 1);
// release the in-flux probe state
pending_probe = false;
int expected = SERVER_STATE_PROBE;
if (!options.s_state.compare_exchange_strong(
expected, SERVER_STATE_WAIT)) {
rte_exit(EXIT_FAILURE,
"s_state changed unexpectedly!");
}
}
}
}
}
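// Configures the port with one RX/TX queue pair per worker thread, RSS,
// checksum offloads, jumbo frames (if enabled), IEEE 1588 timestamping and
// promiscuous mode, then installs the rx/tx timestamp callbacks on every queue.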
static int
port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
{
struct rte_eth_dev_info dev_info {
};
struct rte_eth_conf port_conf = port_conf_default;
struct rte_eth_txconf txconf {
};
struct rte_eth_rxconf rxconf {
};
uint16_t nb_rxd = RX_RING_SIZE;
uint16_t nb_txd = TX_RING_SIZE;
if (!rte_eth_dev_is_valid_port(portid)) {
return -1;
}
int ret = rte_eth_dev_info_get(portid, &dev_info);
if (ret != 0) {
return ret;
}
port_conf.rxmode.max_rx_pkt_len = mtu_to_pkt_size(options.port_mtu);
port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
port_conf.rx_adv_conf.rss_conf.rss_hf = ETH_RSS_NONFRAG_IPV4_UDP |
ETH_RSS_L2_PAYLOAD | ETH_RSS_NONFRAG_IPV4_TCP;
port_conf.rx_adv_conf.rss_conf.rss_key = nullptr;
port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_RSS_HASH;
port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_UDP_CKSUM;
port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_IPV4_CKSUM;
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_UDP_CKSUM;
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_IPV4_CKSUM;
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE;
if (options.jumbo_frame_enabled) {
port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_JUMBO_FRAME;
}
/* Configure the Ethernet device. */
ret = rte_eth_dev_configure(
portid, options.num_threads, options.num_threads, &port_conf);
if (ret != 0)
return ret;
ret = rte_eth_dev_adjust_nb_rx_tx_desc(portid, &nb_rxd, &nb_txd);
if (ret != 0)
return ret;
/* Allocate and set up 1 RX queue per thread per Ethernet port. */
rxconf = dev_info.default_rxconf;
if (options.jumbo_frame_enabled) {
rxconf.offloads |= DEV_RX_OFFLOAD_JUMBO_FRAME;
}
for (int i = 0; i < options.num_threads; i++) {
ret = rte_eth_rx_queue_setup(portid, i, nb_rxd,
rte_eth_dev_socket_id(portid), &rxconf, mbuf_pool);
if (ret < 0)
return ret;
options.s_thr_info.at(i)->rxqid = i;
}
txconf = dev_info.default_txconf;
txconf.offloads = port_conf.txmode.offloads;
/* Allocate and set up 1 TX queue per thread per Ethernet port. */
for (int i = 0; i < options.num_threads; i++) {
ret = rte_eth_tx_queue_setup(
portid, i, nb_txd, rte_eth_dev_socket_id(portid), &txconf);
if (ret < 0)
return ret;
options.s_thr_info.at(i)->txqid = i;
}
// set mtu
ret = rte_eth_dev_set_mtu(portid, options.port_mtu);
if (ret != 0)
return ret;
ret = rte_eth_dev_start(portid);
if (ret < 0)
return ret;
/* Display the port MAC address. */
struct rte_ether_addr addr {
};
ret = rte_eth_macaddr_get(portid, &addr);
if (ret != 0)
return ret;
ret = rte_eth_timesync_enable(portid);
if (ret != 0)
return ret;
/* Enable RX in promiscuous mode for the Ethernet device. */
ret = rte_eth_promiscuous_enable(portid);
if (ret != 0)
return ret;
for (int i = 0; i < options.num_threads; i++) {
if (rte_eth_add_tx_callback(portid,
options.s_thr_info.at(i)->txqid, tx_add_timestamp,
nullptr) == nullptr ||
rte_eth_add_rx_callback(portid,
options.s_thr_info.at(i)->rxqid, rx_add_timestamp,
nullptr) == nullptr) {
return -1;
}
}
// sync_port_clock(portid);
return 0;
}
static void
usage()
{
fprintf(stdout,
"Usage:\n"
" -v(vv): verbose mode\n"
" -h: seek help\n"
" -A: cpu mask for worker threads\n"
" -M: mempool socket affinity mask\n"
" -m: enable memory load generator(MLG)\n"
" -b: MLG bytes per second\n"
" -x: MLG thread affinity mask\n"
" -X: MLG target domain affinity mask\n"
" -H: host spec\n"
" -J: enable jumbo frames\n");
fflush(stdout);
}
static void
dump_options()
{
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"main: khat configuration:\n"
" verbosity: +%d\n"
" thread count: %d\n"
" cpu mask: 0x%lx\n"
" mempool mask: 0x%lx\n"
" ip: 0x%x\n"
" MLG: %s [bps: %ld, threads: 0x%lx, domain: 0x%lx]\n"
" jumbo frame: %d\n",
ntr_get_level(NTR_DEP_USER1) - NTR_LEVEL_WARNING,
options.num_threads, options.cpuset, options.memmask,
options.s_host_spec.ip, options.mlg_enabled ? "on" : "off",
options.mlg_bps, options.mlg_cmask, options.mlg_dmask, options.jumbo_frame_enabled);
}
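// Entry point: initializes the EAL, parses arguments, creates the mbuf
// mempools and per-thread state, initializes the port, and launches one worker
// loop per configured lcore.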
int
main(int argc, char *argv[])
{
unsigned int nb_ports;
struct rte_mempool *mbuf_pool;
bool has_host_spec { false };
ntr_init();
// init dpdk
int ret = rte_eal_init(argc, argv);
if (ret < 0) {
rte_exit(EXIT_FAILURE, "rte_eal_init failed!\n");
}
argc -= ret;
argv += ret;
// set warning level
ntr_set_level(NTR_DEP_USER1, NTR_LEVEL_WARNING);
{
int c;
// parse arguments
while ((c = getopt(argc, argv, "hvA:M:H:mb:X:x:J")) != -1) {
switch (c) {
case 'v':
ntr_set_level(NTR_DEP_USER1,
ntr_get_level(NTR_DEP_USER1) + 1);
break;
case 'h':
usage();
rte_exit(EXIT_SUCCESS, "\n");
case 'A':
options.cpuset = strtoull(optarg, nullptr, 16);
options.num_threads = cmask_get_num_cpus(
options.cpuset);
if (options.num_threads == 0) {
rte_exit(EXIT_FAILURE,
"must run at least one thread\n");
}
break;
case 'M':
options.memmask = strtoull(optarg, nullptr, 16);
break;
case 'H':
if (str_to_netspec(
optarg, &options.s_host_spec) != 0) {
rte_exit(EXIT_FAILURE,
"invalid host spec\n");
}
has_host_spec = true;
break;
case 'm':
options.mlg_enabled = true;
break;
case 'b':
options.mlg_bps = strtoull(optarg, nullptr, 10);
break;
case 'X':
options.mlg_dmask = strtoull(
optarg, nullptr, 16);
break;
case 'x':
options.mlg_cmask = strtoull(
optarg, nullptr, 16);
break;
case 'J':
options.jumbo_frame_enabled = true;
options.port_mtu = MAX_JUMBO_MTU;
break;
default:
usage();
rte_exit(
EXIT_FAILURE, "unknown argument: %c\n", c);
}
}
}
if (!has_host_spec) {
rte_exit(EXIT_FAILURE, "Must specify host spec\n");
}
// init nm
if (nm_init(ntr_get_level(NTR_DEP_USER1) - NTR_LEVEL_WARNING) != 0) {
rte_exit(EXIT_FAILURE, "nm init failed!\n");
}
dump_options();
// init mlg
if (options.mlg_enabled) {
bool success = false;
options.mlg = new memload_generator(options.mlg_cmask,
options.mlg_dmask, options.mlg_bps, &success);
if (!success) {
rte_exit(EXIT_FAILURE, "failed to init mlg\n");
}
}
// register dynamic field
PROBE_FLAG_OFFSET = rte_mbuf_dynfield_register(
&rte_mbuf_dynfield_probe_flag);
if (PROBE_FLAG_OFFSET < 0) {
rte_exit(EXIT_FAILURE, "failed to register dynamic field\n");
}
nb_ports = rte_eth_dev_count_avail();
if (nb_ports == 0) {
rte_exit(EXIT_FAILURE, "number of ports must be > 0\n");
}
uint16_t portid = rte_eth_find_next(0);
if (portid == RTE_MAX_ETHPORTS) {
rte_exit(EXIT_FAILURE, "cannot find an available port\n");
}
options.s_portid = portid;
if (rte_eth_macaddr_get(portid, &options.s_host_spec.mac_addr) != 0) {
rte_exit(EXIT_FAILURE, "cannot get mac address of port %d\n",
portid);
}
if (nm_obj_count(NM_LEVEL_NUMA) > (int)MAX_NODES) {
rte_exit(EXIT_FAILURE, "Too many numa nodes\n");
}
for (int i = 0; i < nm_obj_count(NM_LEVEL_NUMA); i++) {
uint32_t nodeid = i;
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"main: creating mempool for node %d\n", nodeid);
// create one mbuf pool per socket
snprintf(options.mempool_name_buf, MEMPOOL_NAME_BUF_LEN,
"khat_mempool_%d", nodeid);
mbuf_pool = rte_pktmbuf_pool_create(options.mempool_name_buf,
MBUF_MAX_COUNT * nb_ports, MBUF_CACHE_SIZE, 0,
options.jumbo_frame_enabled ?
RTE_MBUF_DEFAULT_BUF_SIZE + (MAX_JUMBO_MTU - MAX_STANDARD_MTU) :
RTE_MBUF_DEFAULT_BUF_SIZE, nodeid);
if (mbuf_pool == nullptr) {
rte_exit(EXIT_FAILURE, "cannot create mbuf pool: %d\n",
rte_errno);
}
options.s_mempools[nodeid] = mbuf_pool;
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"main: created mempool for node %d\n", nodeid);
break; // XXX: hack - only the mempool for the first node is created and used
}
// init threads
// auto cores = nm_get_cores();
uint64_t cpuset = options.cpuset;
for (int i = 0; i < options.num_threads; i++) {
uint32_t lcore_id = cmask_get_next_cpu(&cpuset);
uint32_t node_id = get_node_from_core(lcore_id);
auto *tinfo = (struct thread_info *)nm_malloc(node_id, sizeof(struct thread_info));
tinfo->cache_lines = nm_malloc(node_id, CACHELINE_SIZE * options.thread_cacheline_cnt);
tinfo->load_buffer = nm_malloc(node_id, THREAD_LOAD_BUFFER_SZ);
tinfo->tid = i;
tinfo->lcore_id = lcore_id;
tinfo->node_id = 0; // XXX: hack - all threads allocate from the node 0 mempool
options.s_thr_info.push_back(tinfo);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"main: thread %d assigned to cpu %d, node %d\n", tinfo->tid,
tinfo->lcore_id, get_node_from_core(lcore_id));
}
if (port_init(portid, mbuf_pool) != 0) {
rte_exit(EXIT_FAILURE, "cannot init port %d\n", portid);
}
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"Configured port %d on socket %d with mac addr %x:%x:%x:%x:%x:%x\n",
portid, rte_eth_dev_socket_id(portid),
options.s_host_spec.mac_addr.addr_bytes[0],
options.s_host_spec.mac_addr.addr_bytes[1],
options.s_host_spec.mac_addr.addr_bytes[2],
options.s_host_spec.mac_addr.addr_bytes[3],
options.s_host_spec.mac_addr.addr_bytes[4],
options.s_host_spec.mac_addr.addr_bytes[5]);
sleep(INIT_DELAY);
for (int i = 0; i < options.num_threads; i++) {
struct thread_info *tinfo = options.s_thr_info.at(i);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"main: launching thread %d on locore %d\n", tinfo->tid,
tinfo->lcore_id);
if (rte_eal_remote_launch(locore_main,
(void *)options.s_thr_info.at(i),
tinfo->lcore_id) != 0) {
rte_exit(EXIT_FAILURE,
"failed to launch function on locore %d\n",
tinfo->lcore_id);
}
}
if (options.mlg_enabled)
options.mlg->start();
while (true) {
usleep(S2US);
if (options.mlg_enabled) {
uint64_t bps = options.mlg->get_bps();
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"main: MLG bps = %ld ~= %ldM\n", bps, bps / 1024 / 1024);
}
}
if (options.mlg_enabled)
options.mlg->stop();
for (int i = 0; i < options.num_threads; i++) {
struct thread_info *tinfo = options.s_thr_info.at(i);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"main: waiting for locore %d...\n", tinfo->lcore_id);
if (rte_eal_wait_lcore(tinfo->lcore_id) != 0) {
rte_exit(EXIT_FAILURE, "failed to wait for locore %d\n",
tinfo->lcore_id);
}
}
// shouldn't get here
// clean up
rte_eth_dev_stop(portid);
delete options.mlg;
return 0;
}