numam/cat/cat.cc

621 lines
23 KiB
C++

#include <cstdio>
#include <ctime>
#include <netinet/in.h>
#include <rte_config.h>
#include <rte_common.h>
#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_cycles.h>
#include <rte_lcore.h>
#include <rte_mbuf.h>
#include <rte_ether.h>
#include <rte_launch.h>
#include <rte_log.h>
#include <rte_byteorder.h>
#include <rte_ip.h>
#include <atomic>
#include <vector>
#include <fstream>
#include <unistd.h>
#include "nm.h"
#include "gen.h"
#include "ntr.h"
#include "pkt.h"
#include "util.h"
constexpr static unsigned int MBUF_MAX_COUNT = 16384;
constexpr static unsigned int MBUF_CACHE_SIZE = 512;
constexpr static unsigned int RX_RING_SIZE = 4096;
constexpr static unsigned int TX_RING_SIZE = 4096;
constexpr static unsigned int BURST_SIZE = 32;
static const struct rte_eth_conf port_conf_default{};
struct datapt {
uint32_t epoch;
uint32_t valid;
uint64_t clt_hw_tx;
uint64_t clt_sw_tx;
uint64_t clt_hw_rx;
uint64_t clt_sw_rx;
uint64_t srv_hw_tx;
uint64_t srv_sw_tx;
uint64_t srv_hw_rx;
uint64_t srv_sw_rx;
};
struct options_t {
// parameters
unsigned int run_time{5};
unsigned int warmup_time{3};
char output[256] = "output.txt";
char ia_gen_str[256] = "fixed:0.01";
struct rte_ether_addr server_mac;
uint64_t cpu_mask{0x2}; // 2nd core
std::vector<struct rte_ether_addr *> slaves;
unsigned long rage_quit_time = (unsigned long)-1;
unsigned long last_sent_ts = 0;
// states
struct rte_mempool * mbuf_pool;
struct rte_ether_addr s_host_mac;
uint16_t s_portid;
unsigned int s_rxqid;
unsigned int s_txqid;
unsigned int s_total_pkts{0};
Generator * s_iagen{nullptr};
std::vector<struct datapt *> s_data;
struct datapt * s_last_datapt{nullptr};
uint32_t s_epoch;
std::atomic<bool> s_stop {false};
std::atomic<uint32_t> s_record {0};
};
static struct options_t options;
static uint16_t
rx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
struct rte_mbuf **pkts, uint16_t nb_pkts, uint16_t max_pkts __rte_unused, void *_ __rte_unused)
{
uint64_t now = rte_rdtsc();
struct pkt_hdr * pkt_data;
struct timespec ts;
int ret;
for (int i = 0; i < nb_pkts; i++) {
pkt_data = check_valid_packet(pkts[i]);
if (pkt_data == NULL) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: ignoring invalid packet 0x%p.\n", (void*)pkts[i]);
continue;
}
if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE_RESP) {
uint32_t epoch = rte_be_to_cpu_32(((struct pkt_payload_epoch *)pkt_data->payload)->epoch);
if (options.s_last_datapt != nullptr && options.s_last_datapt->epoch == epoch) {
if ((ret = rte_eth_timesync_read_rx_timestamp(port, &ts, pkts[i]->timesync & 0x3)) == 0) {
// has hw rx timestamp
options.s_last_datapt->clt_hw_rx = ts.tv_sec * S2NS + ts.tv_nsec;
options.s_last_datapt->clt_sw_rx = now;
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: tagged packet %p with sw: %llu hw: %llu.\n", (void*)pkts[i], now, options.s_last_datapt->clt_hw_rx);
} else {
rte_exit(EXIT_FAILURE, "rx_add_timestamp: packet %p not tagged - hw ts not available - %d.\n", (void*)pkts[i], ret);
}
} else {
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "rx_add_timestamp: packet %p epoch %d != last epoch %d.\n", (void*)pkts[i], epoch, options.s_last_datapt->epoch);
}
} else {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], rte_be_to_cpu_16(pkt_data->type));
}
}
return nb_pkts;
}
static uint16_t
tx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
struct rte_mbuf **pkts, uint16_t nb_pkts, void *_ __rte_unused)
{
uint64_t now = rte_rdtsc();
struct pkt_hdr * pkt_data;
for (int i = 0; i < nb_pkts; i++) {
pkt_data = check_valid_packet(pkts[i]);
if (pkt_data == NULL) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: ignoring invalid packet 0x%p.\n", (void*)pkts[i]);
continue;
}
if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE) {
uint32_t epoch = rte_be_to_cpu_32(((struct pkt_payload_epoch *)pkt_data->payload)->epoch);
if (options.s_last_datapt == nullptr || epoch != options.s_last_datapt->epoch) {
rte_exit(EXIT_FAILURE, "tx_add_timestamp: packet epoch %d != last epoch %d\n", epoch, options.s_last_datapt->epoch);
}
options.s_last_datapt->clt_sw_tx = now;
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: tagged packet %p with sw: %llu.\n", (void*)pkts[i], now);
} else {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], pkt_data->type);
}
}
return nb_pkts;
}
static int
locore_main(void * tif __rte_unused)
{
struct rte_mbuf *tx_buf;
struct rte_mbuf *rx_bufs[BURST_SIZE];
struct pkt_hdr *pkt_data;
uint32_t core_id = rte_lcore_id();
int32_t ret;
bool read_tx = true;
bool recv_stat = true;
bool recv_resp = true;
uint64_t next_ts;
// XXX: check link status instead
sleep(1);
if (rte_eth_dev_socket_id(options.s_portid) > 0 && rte_eth_dev_socket_id(options.s_portid) != (int)rte_socket_id()) {
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: WARNING, port %d is on remote NUMA node to "
"polling thread.\n\tPerformance will "
"not be optimal.\n", options.s_portid);
}
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d running...\n", core_id);
next_ts = get_time_us();
while(!options.s_stop.load()) {
uint64_t now = get_time_us();
// always pop incoming packets
const uint16_t nb_rx = rte_eth_rx_burst(options.s_portid, 0, rx_bufs, BURST_SIZE);
if (nb_rx > 0) {
for (int i = 0; i < nb_rx; i++) {
struct pkt_hdr * each = check_valid_packet(rx_bufs[i]);
if (each == NULL) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: ignoring invalid packet %p.\n", (void*)rx_bufs[i]);
rte_pktmbuf_free(rx_bufs[i]);
continue;
}
uint16_t type = rte_be_to_cpu_16(each->type);
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: received packet %p type %d.\n", (void*)rx_bufs[i], type);
switch (type) {
struct pkt_payload_epoch * pld_epoch;
struct pkt_payload_stat * pld_stat;
uint32_t epoch;
case PKT_TYPE_PROBE_RESP:
pld_epoch = (struct pkt_payload_epoch *)each->payload;
epoch = rte_be_to_cpu_32(pld_epoch->epoch);
if (options.s_last_datapt == nullptr || epoch != options.s_last_datapt->epoch) {
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: packet %p epoch %d doesn't match datapt %d.\n", (void*)rx_bufs[i], epoch, options.s_last_datapt->epoch);
break;
}
options.s_total_pkts++;
recv_resp = true;
break;
case PKT_TYPE_STAT:
pld_stat = (struct pkt_payload_stat *)each->payload;
epoch = rte_be_to_cpu_32(pld_stat->epoch);
if (options.s_last_datapt == nullptr || epoch != options.s_last_datapt->epoch) {
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: packet %p epoch %d doesn't match datapt %d.\n", (void*)rx_bufs[i], epoch, options.s_last_datapt->epoch);
break;
}
options.s_last_datapt->srv_hw_tx = rte_be_to_cpu_64(pld_stat->hw_tx);
options.s_last_datapt->srv_hw_rx = rte_be_to_cpu_64(pld_stat->hw_rx);
options.s_last_datapt->srv_sw_tx = rte_be_to_cpu_64(pld_stat->sw_tx);
options.s_last_datapt->srv_sw_rx = rte_be_to_cpu_64(pld_stat->sw_rx);
recv_stat = true;
break;
default:
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: ignoring packet %p with unknown type %d.\n", (void*)rx_bufs[i], type);
rte_pktmbuf_free(rx_bufs[i]);
continue;
}
rte_pktmbuf_free(rx_bufs[i]);
}
}
if (read_tx && recv_stat & recv_resp) {
// if we have all the data
if (options.s_last_datapt != nullptr) {
// push the data to the queue if we haven't done so already
options.s_data.push_back(options.s_last_datapt);
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: datapt for epoch %d dump:\n" \
" Valid: %d\n"
" client TX HW: %llu\n" \
" client TX SW: %llu\n" \
" client RX HW: %llu\n" \
" client RX SW: %llu\n" \
" server TX HW: %llu\n" \
" server TX SW: %llu\n" \
" server RX HW: %llu\n" \
" server RX SW: %llu\n\n",
options.s_last_datapt->epoch,
options.s_last_datapt->valid,
options.s_last_datapt->clt_hw_tx,
options.s_last_datapt->clt_sw_tx,
options.s_last_datapt->clt_hw_rx,
options.s_last_datapt->clt_sw_rx,
options.s_last_datapt->srv_hw_tx,
options.s_last_datapt->srv_sw_tx,
options.s_last_datapt->srv_hw_rx,
options.s_last_datapt->srv_sw_rx);
options.s_last_datapt = nullptr;
}
if (now >= next_ts) {
struct pkt_payload_epoch * pld_epoch;
uint32_t epoch;
next_ts += (int)(options.s_iagen->generate() * 1000000.0);
// generate the packet
tx_buf = rte_pktmbuf_alloc(options.mbuf_pool);
if (tx_buf == NULL) {
rte_exit(EXIT_FAILURE, "cannot allocate tx_buf\n");
}
pkt_data = construct_pkt_hdr(tx_buf, PKT_TYPE_PROBE,
&options.s_host_mac, &options.server_mac);
if (pkt_data == NULL) {
rte_exit(EXIT_FAILURE, "cannot allocate space for packet_data in mbuf\n");
}
epoch = options.s_epoch;
options.s_epoch++;
pld_epoch = (struct pkt_payload_epoch *)pkt_data->payload;
pld_epoch->epoch = rte_cpu_to_be_32(epoch);
options.s_last_datapt = new struct datapt;
options.s_last_datapt->epoch = epoch;
options.s_last_datapt->valid = options.s_record.load();
read_tx = false;
recv_resp = false;
recv_stat = false;
options.last_sent_ts = get_time_us();
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: sending packet %p with epoch %d\n", (void*)tx_buf, epoch);
const uint16_t nb_tx = rte_eth_tx_burst(options.s_portid, options.s_txqid, &tx_buf, 1);
if (nb_tx != 1) {
rte_exit(EXIT_FAILURE, "failed to send packet 0x%p, epoch %d\n", (void*)tx_buf, epoch);
}
}
}
if (!recv_stat) {
// if we haven't recevied the stats get ready to rage quit
if(get_time_us() - options.last_sent_ts > options.rage_quit_time * 1000) {
rte_exit(EXIT_FAILURE, "waiting too long for resp. I QUIT!!\n");
}
}
if (!read_tx) {
struct timespec ts;
if ((ret = rte_eth_timesync_read_tx_timestamp(options.s_portid, &ts)) == 0) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: read hw tx timestamp %lld.\n", ts.tv_nsec + ts.tv_sec * S2NS);
options.s_last_datapt->clt_hw_tx = ts.tv_nsec + ts.tv_sec * S2NS;
read_tx = true;
}
}
}
rte_pktmbuf_free(tx_buf);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d successfully stopped.\n", core_id);
return 0;
}
static int
port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
{
struct rte_eth_dev_info dev_info;
struct rte_eth_conf port_conf = port_conf_default;
struct rte_eth_txconf txconf;
struct rte_eth_rxconf rxconf;
uint16_t nb_rxd = RX_RING_SIZE;
uint16_t nb_txd = TX_RING_SIZE;
port_conf.rxmode.max_rx_pkt_len = RTE_ETHER_MAX_LEN;
if(!rte_eth_dev_is_valid_port(portid)) {
return -1;
}
int ret = rte_eth_dev_info_get(portid, &dev_info);
if (ret != 0) {
return ret;
}
port_conf.rxmode.max_rx_pkt_len = RTE_ETHER_MAX_LEN;
port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_UDP_CKSUM;
port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_IPV4_CKSUM;
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_UDP_CKSUM;
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_IPV4_CKSUM;
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE;
/* Configure the Ethernet device. */
ret = rte_eth_dev_configure(portid, 1, 1, &port_conf);
if (ret != 0)
return ret;
ret = rte_eth_dev_adjust_nb_rx_tx_desc(portid, &nb_rxd, &nb_txd);
if (ret != 0)
return ret;
/* Allocate and set up 1 RX queue per thread . */
rxconf = dev_info.default_rxconf;
rxconf.offloads = port_conf.rxmode.offloads;
for (uint32_t i = 0; i < 1; i++) {
ret = rte_eth_rx_queue_setup(portid, i, nb_rxd, rte_eth_dev_socket_id(portid), &rxconf, mbuf_pool);
if (ret < 0)
return ret;
}
txconf = dev_info.default_txconf;
txconf.offloads = port_conf.txmode.offloads;
/* Allocate and set up 1 TX queue per Ethernet port. */
for (uint32_t i = 0; i < 1; i++) {
ret = rte_eth_tx_queue_setup(portid, i, nb_txd, rte_eth_dev_socket_id(portid), &txconf);
if (ret < 0)
return ret;
}
ret = rte_eth_dev_start(portid);
if (ret < 0)
return ret;
/* Display the port MAC address. */
struct rte_ether_addr addr;
ret = rte_eth_macaddr_get(portid, &addr);
if (ret != 0)
return ret;
ret = rte_eth_timesync_enable(portid);
if (ret != 0)
return ret;
/* Enable RX in promiscuous mode for the Ethernet device. */
ret = rte_eth_promiscuous_enable(portid);
if (ret != 0)
return ret;
rte_eth_add_tx_callback(portid, 0, tx_add_timestamp, NULL);
rte_eth_add_rx_callback(portid, 0, rx_add_timestamp, NULL);
return 0;
}
static void dump_options()
{
fprintf(stdout, "Configuration:\n" \
" run time = %d\n" \
" warmup time = %d\n" \
" output file = %s\n" \
" rage quit time = %ld\n"\
" host MAC = %x:%x:%x:%x:%x:%x\n",
options.run_time,
options.warmup_time,
options.output,
options.rage_quit_time,
options.server_mac.addr_bytes[0],
options.server_mac.addr_bytes[1],
options.server_mac.addr_bytes[2],
options.server_mac.addr_bytes[3],
options.server_mac.addr_bytes[4],
options.server_mac.addr_bytes[5]);
}
static void usage()
{
fprintf(stdout,
"Usage:\n " \
" -v(vv): verbose mode\n" \
" -s: server's mac\n" \
" -S: slave(rat)'s mac\n" \
" -t: run time\n" \
" -T: warmup time\n" \
" -h: display the information\n" \
" -o: output filename\n" \
" -A: affinity mask\n" \
" -i: inter-arrival time distribution\n" \
" -r: rage quit time (in ms)\n");
fflush(stdout);
}
int main(int argc, char* argv[])
{
unsigned int nb_ports;
struct rte_mempool *mbuf_pool;
std::ofstream log_file;
ntr_init();
if (nm_init() != 0)
rte_exit(EXIT_FAILURE, "failed to init libnm\n");
// create default generator
options.s_iagen = createGenerator(options.ia_gen_str);
// init dpdk
int ret = rte_eal_init(argc, argv);
if (ret < 0) {
rte_exit(EXIT_FAILURE, "rte_eal_init failed!\n");
}
argc -= ret;
argv += ret;
// set warning level
ntr_set_level(NTR_DEP_USER1, NTR_LEVEL_WARNING);
{
int c;
// parse arguments
while((c = getopt(argc, argv, "vs:S:t:T:ho:A:i:r:")) != -1) {
switch (c) {
struct rte_ether_addr * addr;
case 'v':
ntr_set_level(NTR_DEP_USER1, ntr_get_level(NTR_DEP_USER1) + 1);
break;
case 's':
if (rte_ether_unformat_addr(optarg, &options.server_mac) == -1) {
rte_exit(EXIT_FAILURE, "cannot parse %s as mac address.\n", optarg);
}
break;
case 'S':
addr = new struct rte_ether_addr;
if (rte_ether_unformat_addr(optarg, addr) == -1) {
rte_exit(EXIT_FAILURE, "cannot parse %s as mac address.\n", optarg);
}
options.slaves.push_back(addr);
break;
case 't':
options.run_time = atoi(optarg);
break;
case 'T':
options.warmup_time = atoi(optarg);
break;
case 'h':
usage();
rte_exit(EXIT_SUCCESS, "\n");
case 'o':
strncpy(options.output, optarg, sizeof(options.output) - 1);
break;
case 'A':
options.cpu_mask = strtoull(optarg, nullptr, 16);
break;
case 'i':
strncpy(options.ia_gen_str, optarg, sizeof(options.ia_gen_str) - 1);
if (options.s_iagen != nullptr) {
delete options.s_iagen;
}
options.s_iagen = createGenerator(options.ia_gen_str);
if (options.s_iagen == nullptr) {
rte_exit(EXIT_FAILURE, "invalid generator string %s\n", options.ia_gen_str);
}
break;
case 'r':
options.rage_quit_time = atoi(optarg);
break;
default:
usage();
rte_exit(EXIT_FAILURE, "unknown argument: %c\n", c);
break;
}
}
}
// open log file for writing
log_file.open(options.output, std::ofstream::out);
if (!log_file) {
rte_exit(EXIT_FAILURE, "failed to open log file %s\n", options.output);
}
nb_ports = rte_eth_dev_count_avail();
if (nb_ports == 0) {
rte_exit(EXIT_FAILURE, "number of ports must be > 0\n");
}
uint16_t portid = rte_eth_find_next(0);
if (portid == RTE_MAX_ETHPORTS) {
rte_exit(EXIT_FAILURE, "cannot find an available port\n");
}
options.s_portid = portid;
// create a mbuf memory pool on the socket
mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", MBUF_MAX_COUNT, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_eth_dev_socket_id(options.s_portid));
if (mbuf_pool == nullptr) {
rte_exit(EXIT_FAILURE, "cannot create mbuf pool\n");
}
options.mbuf_pool = mbuf_pool;
if (port_init(portid, mbuf_pool) != 0) {
rte_exit(EXIT_FAILURE, "cannot init port %d\n", portid);
}
if (rte_eth_macaddr_get(portid, &options.s_host_mac) != 0) {
rte_exit(EXIT_FAILURE, "cannot get mac address of port %d\n", portid);
}
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "Configured port %d with mac addr %x:%x:%x:%x:%x:%x\n", portid,
options.s_host_mac.addr_bytes[0],
options.s_host_mac.addr_bytes[1],
options.s_host_mac.addr_bytes[2],
options.s_host_mac.addr_bytes[3],
options.s_host_mac.addr_bytes[4],
options.s_host_mac.addr_bytes[5]);
dump_options();
sleep(1);
uint64_t cmask = options.cpu_mask;
const int16_t core_id = cmask_get_next_cpu(&cmask);
if (core_id == NEXT_CPU_NULL) {
rte_exit(EXIT_FAILURE, "invalid cpu mask 0x%lx\n", cmask);
}
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "main: launching thread on core %d\n", core_id);
if (rte_eal_remote_launch(locore_main, nullptr, core_id) != 0) {
rte_exit(EXIT_FAILURE, "failed to launch function on locore\n");
}
// XXX: poor man's timer
uint32_t second = 0;
while(true) {
if (second >= options.warmup_time) {
options.s_record.store(1);
}
if (second >= options.run_time + options.warmup_time) {
options.s_stop.store(true);
break;
}
usleep(S2US);
second++;
}
if (rte_eal_wait_lcore(core_id) < 0)
rte_exit(EXIT_FAILURE, "failed to wait for job completion\n");
uint32_t qps = 0;
// dump stats
for (auto it : options.s_data) {
if (it->valid) {
qps++;
log_file << it->clt_sw_rx << ',' << it->clt_sw_tx << ','
<< it->clt_hw_rx << ',' << it->clt_hw_tx << ','
<< it->srv_sw_rx << ',' << it->srv_sw_tx << ','
<< it->srv_hw_rx << ',' << it->srv_hw_tx << std::endl;
}
}
log_file.close();
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "Processed %d packets in %d seconds, QPS: %d\n", qps, options.run_time, qps);
// clean up
rte_eth_dev_stop(portid);
rte_eth_dev_close(portid);
return 0;
}