numam/net/khat.cc
2023-01-04 17:25:32 +01:00

702 lines
19 KiB
C++

#include <atomic>
#include <cassert>
#include <cstdio>
#include <cstring>
#include <ctime>
#include <vector>
#include <unistd.h>
#include <sys/cpuset.h>
#include <sys/endian.h>
#include <sys/sched.h>
#include <sys/types.h>
#include <topo.h>
#include <rte_common.h>
#include <rte_config.h>
#include <rte_cycles.h>
#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_ether.h>
#include <rte_launch.h>
#include <rte_lcore.h>
#include <rte_mbuf.h>
#include "ntr.h"
//#include "gen.hh"
#include "net/netsup.hh"
#include "net/pkt.hh"
#include "nms.h"
#include "rte_byteorder.h"
constexpr static unsigned int BURST_SIZE = 32;
constexpr static unsigned int CACHELINE_SIZE = 64;
constexpr static uint16_t THREAD_LOAD_BUFFER_SZ = 16384;
struct probe_state_t {
struct net_spec dst;
struct conn_spec cspec {
.dst = &dst
};
uint64_t last_sw_rx;
uint64_t last_sw_tx;
uint64_t last_hw_rx;
uint32_t epoch;
};
// keep track of the probe state
// when a probe packet first arrives this state is set to be influx and the
// rte_mbuf's userdata is set to PROBE_MAGIC which prevents other probe packets
// to be processed when the server sends the probe stats back to user influx is
// released this is to guarantee that the server only processes one probe packet
// at the time
// XXX: also this can be attached to the mbuf itself and processed by the lcore
// thread
// I kept this global because globally there could be only one pending
// probe request and rx_add_timestamp can save their shit here too
struct thread_info {
int tid;
int rxqid;
int txqid;
int lcore_id;
int node_id;
void *cache_lines;
void *load_buffer;
};
struct options_t {
// config
int num_threads { 1 };
cpuset_t cpu_set = CPUSET_T_INITIALIZER(0x2); // 2nd core
bool jumbo_frame_enabled {
false
}; // setting this to true changes mbuf size and mtu
int port_mtu { MAX_STANDARD_MTU };
int thread_cacheline_cnt = { 1600 }; // 100MB data per thread
uint16_t portid { 0 };
// states
struct net_spec s_host_spec { };
std::vector<struct thread_info *> s_thr_info;
int probe_state_offset { 0 };
bool s_hwtimestamp { true };
struct probe_state_t s_probe_info;
std::atomic<bool> is_probing { false };
};
struct options_t options;
static bool
mbuf_is_probe_valid(struct rte_mbuf *pkt)
{
return *RTE_MBUF_DYNFIELD(pkt, options.probe_state_offset, bool *);
}
static void
mbuf_set_probe_valid(struct rte_mbuf *pkt, bool b)
{
*RTE_MBUF_DYNFIELD(pkt, options.probe_state_offset, bool *) = b;
}
static uint16_t
rx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
struct rte_mbuf **pkts, uint16_t nb_pkts, uint16_t max_pkts __rte_unused,
void *_ __rte_unused)
{
int rc = 0;
uint64_t now = topo_uptime_ns();
struct timespec ts { };
struct pkt_hdr *pkt_data;
for (int i = 0; i < nb_pkts; i++) {
pkt_data = check_valid_packet(pkts[i],
&options.s_host_spec.mac_addr);
if (pkt_data == nullptr) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"rx_add_timestamp: ignoring invalid packet %p.\n",
(void *)pkts[i]);
continue;
}
if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE) {
bool cmp = false;
mbuf_set_probe_valid(pkts[i], false);
if (options.is_probing.compare_exchange_strong(cmp,
true)) {
options.s_probe_info.last_sw_rx = now;
if (options.s_hwtimestamp) {
if ((rc = rte_eth_timesync_read_rx_timestamp(
port, &ts,
pkts[i]->timesync & 0x3)) ==
0) {
options.s_probe_info
.last_hw_rx = ts.tv_nsec +
ts.tv_sec * S2NS;
ntr(NTR_DEP_USER1,
NTR_LEVEL_DEBUG,
"rx_add_timestamp: tagged packet %p with sw rx: %lu hw rx:%lu.\n",
(void *)pkts[i],
options.s_probe_info
.last_sw_rx,
options.s_probe_info
.last_hw_rx);
mbuf_set_probe_valid(pkts[i],
true);
} else {
options.is_probing.store(false);
ntr(NTR_DEP_USER1,
NTR_LEVEL_WARNING,
"rx_add_timestamp: packet %p not tagged - failed to read hw rx timestamp: %d.\n",
(void *)pkts[i], rc);
}
} else {
mbuf_set_probe_valid(pkts[i], true);
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"rx_add_timestamp: tagged packet %p with sw rx only: %lu.\n",
(void *)pkts[i], now);
}
} else {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"rx_add_timestamp: packet %p not tagged - server is probing.\n",
(void *)pkts[i]);
}
} else {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"rx_add_timestamp: packet %p not tagged - not PROBE packet: type %d.\n",
(void *)pkts[i], rte_be_to_cpu_16(pkt_data->type));
}
}
return nb_pkts;
}
static uint16_t
tx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
struct rte_mbuf **pkts, uint16_t nb_pkts, void *_ __rte_unused)
{
uint64_t now = topo_uptime_ns();
struct pkt_hdr *pkt_data;
for (int i = 0; i < nb_pkts; i++) {
pkt_data = check_valid_packet(pkts[i],
&options.s_host_spec.mac_addr);
if (pkt_data == nullptr) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"tx_add_timestamp: ignoring invalid packet %p.\n",
(void *)pkts[i]);
continue;
}
if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE_RESP) {
// this packet is the response to PROBE packets
// at this time the packet is not sent to the NIC yet so
// the state must be waiting stats
assert(options.is_probing.load() &&
mbuf_is_probe_valid(pkts[i]));
options.s_probe_info.last_sw_tx = now;
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"tx_add_timestamp: tagged packet %p with sw tx %lu\n",
(void *)pkts[i], options.s_probe_info.last_sw_tx);
} else {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"tx_add_timestamp: packet %p not tagged - type %d\n",
(void *)pkts[i], pkt_data->type);
}
}
return nb_pkts;
}
static void
worker_cpu_load(unsigned long us)
{
uint64_t now = topo_uptime_ns();
while(true) {
uint64_t cur = topo_uptime_ns();
if (cur - now >= us * 1000) {
break;
}
}
}
static void
worker_memory_load(int tid, uint32_t which, uint32_t load)
{
uint32_t start_cacheline = which % (options.thread_cacheline_cnt * options.s_thr_info.size());
uint32_t thrd = start_cacheline / options.thread_cacheline_cnt;
uint32_t start = start_cacheline % options.thread_cacheline_cnt;
struct thread_info * cur = options.s_thr_info.at(tid);
struct thread_info * tgt = options.s_thr_info.at(thrd);
for (uint32_t i = 0; i < load; i++) {
*(uint32_t *)cur->load_buffer = *(uint32_t *)((char *)tgt->cache_lines + ((start + i) % options.thread_cacheline_cnt) * CACHELINE_SIZE);
}
}
static int
locore_main(void *ti)
{
auto tinfo = (struct thread_info *)ti;
struct rte_mbuf *bufs[BURST_SIZE];
// + 1 because it might involve an extra PKT_TYPE_STAT packet
// when all tx timestamps are ready
struct rte_mbuf *tx_bufs[BURST_SIZE];
struct pkt_hdr *pkt_data;
// XXX: hack hardcode to be larger than MTU
bool pending_probe = false;
if (rte_eth_dev_socket_id(options.portid) > 0 &&
rte_eth_dev_socket_id(options.portid) != (int)rte_socket_id()) {
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING,
"locore_main <thread %d>: WARNING, port %d is on remote NUMA node to "
"polling thread.\n\tPerformance will "
"not be optimal.\n",
tinfo->tid, options.portid);
}
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"locore_main <thread %d>: running on locore %d with txqid %d and rxqid %d.\n",
tinfo->tid, rte_lcore_id(), tinfo->txqid, tinfo->rxqid);
while (true) {
uint16_t nb_tx = 0;
const uint16_t nb_rx = rte_eth_rx_burst(options.portid,
tinfo->rxqid, bufs, BURST_SIZE);
struct rte_mbuf *pkt_buf;
struct pkt_hdr *tx_data;
for (int i = 0; i < nb_rx; i++) {
// XXX: optimization: in rx_add_timestamp every packet
// is already validated once can just mark valid packet
// with a value so we can avoid this redundant check
pkt_data = check_valid_packet(bufs[i],
&options.s_host_spec.mac_addr);
if (pkt_data == nullptr) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"locore_main <thread %d>: skipping invalid packet %p.\n",
tinfo->tid, (void *)bufs[i]);
// dump_pkt(bufs[i]);
rte_pktmbuf_free(bufs[i]);
continue;
}
NTR_PKT(NTR_DEP_USER1, NTR_LEVEL_DEBUG, pkt_data,
"locore_main <thread %d>: received packet ", tinfo->tid);
switch (rte_be_to_cpu_16(pkt_data->type)) {
case PKT_TYPE_PROBE: {
if (mbuf_is_probe_valid(bufs[i])) {
// send back probe_resp pkt to probe for
// return latency
pending_probe = true;
// book keep probe results
options.s_probe_info.epoch =
rte_be_to_cpu_32(
((struct pkt_payload_epoch *)
pkt_data->payload)
->epoch);
pkt_hdr_to_netspec(pkt_data,
&options.s_probe_info.dst,
&options.s_probe_info.cspec
.dst_port,
nullptr,
&options.s_probe_info.cspec
.src_port);
options.s_probe_info.cspec.src =
&options.s_host_spec;
if (alloc_pkt_hdr(mempool_get(
tinfo->node_id),
PKT_TYPE_PROBE_RESP,
&options.s_probe_info.cspec, 0,
&pkt_buf, &tx_data) != 0) {
rte_exit(EXIT_FAILURE,
"failed to allocate pkt\n");
}
rte_memcpy(tx_data->payload,
pkt_data->payload,
sizeof(struct pkt_payload_epoch));
mbuf_set_probe_valid(pkt_buf, true);
// queue for burst send
NTR_PKT(NTR_DEP_USER1, NTR_LEVEL_DEBUG, tx_data,
"locore_main <thread %d>: sending packet ", tinfo->tid);
tx_bufs[nb_tx++] = pkt_buf;
}
break;
}
case PKT_TYPE_LOAD: {
struct conn_spec cspec;
struct net_spec src;
struct net_spec dst;
// touch the unused data to pretend that we read
// those dummy fields
memcpy(tinfo->load_buffer, pkt_data->payload,
MIN(bufs[i]->data_len -
sizeof(struct pkt_hdr),
THREAD_LOAD_BUFFER_SZ));
// perform the load
auto pld = (struct pkt_payload_load *)
pkt_data->payload;
uint32_t load_type = rte_be_to_cpu_32(pld->type);
uint32_t load_arg0 = rte_be_to_cpu_32(pld->arg0);
uint32_t load_arg1 = rte_be_to_cpu_32(pld->arg1);
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"locore_main <thread %d>: LOAD type %d, arg0 %d, arg1 %d\n",
tinfo->tid, load_type, load_arg0, load_arg1);
if (load_type == LOAD_TYPE_CPU) {
worker_cpu_load(load_arg0);
} else if (load_type == LOAD_TYPE_MEM) {
worker_memory_load(tinfo->tid, load_arg0, load_arg1);
} else {
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING,
"locore_main <thread %d>: unknown LOAD type %d, ignoring...", tinfo->tid, load_type);
break;
}
// reply
pkt_hdr_to_netspec(pkt_data, &src,
&cspec.dst_port, &dst, &cspec.src_port);
cspec.dst = &src;
cspec.src = &dst;
// printf("LOAD PKT SIZE: %d\n",
// bufs[i]->data_len); we reply to load packet
// regardless of the server state
if (alloc_pkt_hdr(mempool_get(tinfo->node_id),
PKT_TYPE_LOAD_RESP, &cspec, 0, &pkt_buf,
&tx_data) != 0) {
rte_exit(EXIT_FAILURE,
"failed to allocate pkt\n");
}
rte_memcpy(tx_data->payload, pkt_data->payload,
sizeof(struct pkt_payload_load));
// queue for burst send
NTR_PKT(NTR_DEP_USER1, NTR_LEVEL_DEBUG, tx_data,
"locore_main <thread %d>: sending packet ", tinfo->tid);
tx_bufs[nb_tx++] = pkt_buf;
break;
}
default:
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"locore_main <thread %d>: ignoring packet %p with unknown type %d.\n",
tinfo->tid, (void *)bufs[i],
rte_be_to_cpu_16(pkt_data->type));
break;
}
rte_pktmbuf_free(bufs[i]);
}
// send all packets
tx_burst_all(options.portid, tinfo->txqid, tx_bufs, nb_tx);
// we wanna check every loop not only when there are packets
if (pending_probe) {
assert(options.is_probing.load());
struct timespec ts { };
struct pkt_payload_stat *stat;
int status = 0;
if (options.s_hwtimestamp) {
if ((status = rte_eth_timesync_read_tx_timestamp(
options.portid, &ts)) == 0) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"locore_main <thread %d>: obtained hw tx timestamp %lu.\n",
tinfo->tid,
(ts.tv_sec * S2NS + ts.tv_nsec));
} else {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"locore_main <thread %d>: failed to obtain hw tx timestamp: %d.\n",
tinfo->tid, status);
}
}
if (status == 0) {
// now we have everything we need
if (alloc_pkt_hdr(mempool_get(tinfo->node_id),
PKT_TYPE_STAT, &options.s_probe_info.cspec, 0,
&pkt_buf, &tx_data) != 0) {
rte_exit(EXIT_FAILURE,
"failed to alloc pkt_buf\n");
}
// populate stats
stat = (struct pkt_payload_stat *)tx_data->payload;
stat->epoch = rte_cpu_to_be_32(
options.s_probe_info.epoch);
if (options.s_hwtimestamp) {
stat->hw_rx = rte_cpu_to_be_64(
options.s_probe_info.last_hw_rx);
stat->hw_tx = rte_cpu_to_be_64(
ts.tv_nsec + ts.tv_sec * S2NS);
} else {
stat->hw_rx = 0;
stat->hw_tx = 0;
}
stat->sw_rx = rte_cpu_to_be_64(
options.s_probe_info.last_sw_rx);
stat->sw_tx = rte_cpu_to_be_64(
options.s_probe_info.last_sw_tx);
// send the packet
tx_burst_all(options.portid, tinfo->txqid, &pkt_buf, 1);
// release flux
pending_probe = false;
options.is_probing.store(false);
}
}
}
}
static void
usage()
{
fprintf(stdout,
"Usage:\n"
" -v(vv): verbose mode\n"
" -h: seek help\n"
" -A: cpu list for worker threads\n"
" -m: enable memory load generator(MLG)\n"
" -b: MLG trunk size\n"
" -x: MLG thread affinity mask\n"
" -X: MLG target domain affinity mask\n"
" -S: MLG shared buffer\n"
" -H: host spec\n"
" -J: enable jumbo frames\n"
" -p: port id\n");
fflush(stdout);
}
static void
dump_options()
{
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"main: khat configuration:\n"
" verbosity: +%d\n"
" thread count: %d\n"
" ip: 0x%x\n"
" jumbo frame: %d\n"
" port id: %d\n",
ntr_get_level(NTR_DEP_USER1) - NTR_LEVEL_WARNING,
options.num_threads, options.s_host_spec.ip,
options.jumbo_frame_enabled, options.portid);
}
int
main(int argc, char *argv[])
{
bool has_host_spec { false };
struct mem_conf mconf;
struct device_conf dconf;
ntr_init();
// init dpdk
int ret = rte_eal_init(argc, argv);
if (ret < 0) {
rte_exit(EXIT_FAILURE, "rte_eal_init failed!\n");
}
argc -= ret;
argv += ret;
// set warning level
ntr_set_level(NTR_DEP_USER1, NTR_LEVEL_WARNING);
{
int c;
// parse arguments
while ((c = getopt(argc, argv, "hvA:H:Jp:")) != -1) {
switch (c) {
case 'v':
ntr_set_level(NTR_DEP_USER1,
ntr_get_level(NTR_DEP_USER1) + 1);
break;
case 'h':
usage();
rte_exit(EXIT_SUCCESS, "\n");
case 'A':
cpulist_to_cpuset(optarg, &options.cpu_set);
options.num_threads = CPU_COUNT(
&options.cpu_set);
if (options.num_threads == 0) {
rte_exit(EXIT_FAILURE,
"must run at least one thread\n");
}
break;
case 'H':
if (str_to_netspec(optarg,
&options.s_host_spec) != 0) {
rte_exit(EXIT_FAILURE,
"invalid host spec\n");
}
has_host_spec = true;
break;
case 'J':
options.jumbo_frame_enabled = true;
options.port_mtu = MAX_JUMBO_MTU;
break;
case 'p':
options.portid = atoi(optarg);
break;
default:
usage();
rte_exit(EXIT_SUCCESS, "unknown argument: %c",
c);
}
}
}
if (!has_host_spec) {
rte_exit(EXIT_FAILURE, "Must specify host spec\n");
}
// init libtopo
if (topo_init(ntr_get_level(NTR_DEP_USER1) - NTR_LEVEL_WARNING) !=
0) {
rte_exit(EXIT_FAILURE, "libtopo init failed!\n");
}
// init libnms
if (nms_init(ntr_get_level(NTR_DEP_USER1) - NTR_LEVEL_WARNING) != 0) {
rte_exit(EXIT_FAILURE, "libnms init failed!\n");
}
dump_options();
// register dynamic field
struct rte_mbuf_dynfield rte_mbuf_dynfield_probe_flag = {
.name = "rte_mbuf_dynfield_probe_valid",
.size = sizeof(bool),
.align = __alignof__(uint32_t),
.flags = 0
};
options.probe_state_offset = rte_mbuf_dynfield_register(
&rte_mbuf_dynfield_probe_flag);
if (options.probe_state_offset == -1) {
rte_exit(EXIT_FAILURE, "failed to register dynamic field: %d\n",
rte_errno);
}
// configure memory and port
struct port_conf pconf;
portconf_get(options.portid, &pconf);
if (!pconf.timesync) {
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING,
"main: timesync disabled. hw timestamp unavailable.\n ");
options.s_hwtimestamp = false;
}
dconf.mtu = options.port_mtu;
CPU_COPY(&options.cpu_set, &dconf.core_affinity);
dconf.portid = options.portid;
dconf.rss_hf = pconf.rss_hf;
dconf.rx_offloads = pconf.rxoffload;
dconf.tx_offloads = pconf.txoffload;
dconf.timesync = pconf.timesync;
dconf.rx_fn = rx_add_timestamp;
dconf.rx_user = nullptr;
dconf.rx_ring_sz = 2048;
dconf.tx_fn = tx_add_timestamp;
dconf.tx_user = nullptr;
dconf.tx_ring_sz = 2048;
mconf.cache_size = 512;
mconf.priv_size = 0;
mconf.num_elements = (dconf.rx_ring_sz + dconf.tx_ring_sz) *
rte_lcore_count() / rte_socket_count();
mconf.data_room_size = RTE_MBUF_DEFAULT_BUF_SIZE + MAX_JUMBO_MTU -
MAX_STANDARD_MTU;
mconf.max_pools = -1;
dpdk_init(&dconf, &mconf);
if (rte_eth_macaddr_get(options.portid,
&options.s_host_spec.mac_addr) != 0) {
rte_exit(EXIT_FAILURE, "cannot get mac address of port %d\n",
options.portid);
}
// init threads
uint32_t cpu_idx = CPU_FFS(&options.cpu_set);
uint32_t tid = 0;
while (cpu_idx != 0) {
uint32_t lcore_id = cpu_idx - 1;
uint32_t node_id = rte_lcore_to_socket_id(lcore_id);
auto *tinfo = (struct thread_info *)nms_malloc(node_id,
sizeof(struct thread_info));
tinfo->cache_lines = nms_malloc(node_id,
CACHELINE_SIZE * options.thread_cacheline_cnt);
tinfo->load_buffer = nms_malloc(node_id,
THREAD_LOAD_BUFFER_SZ);
tinfo->tid = tid;
tinfo->lcore_id = lcore_id;
tinfo->node_id = node_id;
tinfo->rxqid = tid;
tinfo->txqid = tid;
options.s_thr_info.push_back(tinfo);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"main: thread %d assigned to cpu %d, node %d\n", tinfo->tid,
tinfo->lcore_id, topo_core_to_numa(lcore_id));
tid++;
CPU_CLR(cpu_idx - 1, &options.cpu_set);
cpu_idx = CPU_FFS(&options.cpu_set);
}
sleep(INIT_DELAY);
for (int i = 0; i < options.num_threads; i++) {
struct thread_info *tinfo = options.s_thr_info.at(i);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"main: launching thread %d on locore %d\n", tinfo->tid,
tinfo->lcore_id);
if (rte_eal_remote_launch(locore_main,
(void *)options.s_thr_info.at(i),
tinfo->lcore_id) != 0) {
rte_exit(EXIT_FAILURE,
"failed to launch function on locore %d\n",
tinfo->lcore_id);
}
}
while (true) {
usleep(S2US);
}
// shouldn't get here
// clean up
for (int i = 0; i < options.num_threads; i++) {
struct thread_info *tinfo = options.s_thr_info.at(i);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"main: waiting for locore %d...\n", tinfo->lcore_id);
if (rte_eal_wait_lcore(tinfo->lcore_id) != 0) {
rte_exit(EXIT_FAILURE, "failed to wait for locore %d\n",
tinfo->lcore_id);
}
}
dpdk_cleanup(&dconf);
return 0;
}