#include #include #include #include #include #include #include #include #include #include #include "gen.h" #include "nm.h" #include "ntr.h" #include "pkt.h" #include "util.h" #include #include #include #include #include #include constexpr static unsigned int MBUF_MAX_COUNT = 65536; constexpr static unsigned int MBUF_CACHE_SIZE = 512; constexpr static unsigned int RX_RING_SIZE = 1024; constexpr static unsigned int TX_RING_SIZE = 1024; constexpr static unsigned int BURST_SIZE = 8; static const struct rte_eth_conf port_conf_default { }; static unsigned int epoch_mk(unsigned int id, unsigned int epoch) { return (id << 24) | epoch; } static unsigned int epoch_get_id(unsigned int epoch) { return epoch >> 24; } static unsigned int epoch_get_epoch(unsigned int epoch) { return epoch & 0x00FFFFFF; } struct epoch_info { unsigned int epoch; uint64_t ts; }; struct thread_info { unsigned int id { 0 }; unsigned int lcore_id { 0 }; unsigned int rxqid { 0 }; unsigned int txqid { 0 }; // this field is read by the stat collecting thread std::atomic total_pkts { 0 }; std::atomic lost_pkts { 0 }; Generator *ia_gen { nullptr }; Generator *load_gen { nullptr }; std::mutex mtx; // this lock protects data shared between worker threads, i.e.: std::list recved_epochs; }; constexpr static int STATE_SYNC = 0; // waiting for SYNC constexpr static int STATE_SYNC_ACK = 1; // Waiting for sending SYNC_ACK constexpr static int STATE_RUNNING = 2; // Running constexpr static int STATE_FIN = 3; // FIN received struct options_t { unsigned int run_time { 5 }; // parameters int slave_mode { 0 }; uint32_t rage_quit_time { UINT32_MAX }; char ia_gen[256] { "fixed" }; char ld_gen[256] { "fixed:0" }; uint32_t target_qps { 0 }; uint32_t depth = 1; struct net_spec server_spec { }; uint64_t cpu_mask { 0x4 }; // 1 thread @ core 2 uint32_t pkt_loss_delay_ms = UINT32_MAX; // states unsigned int s_num_threads { 1 }; // 1 thread struct rte_mempool *mbuf_pool { nullptr }; struct net_spec s_host_spec { }; struct net_spec s_master_spec { }; struct conn_spec s_master_cspec { .src = &s_host_spec, .src_port = DEFAULT_RAT_PORT, .dst = &s_master_spec, .dst_port = DEFAULT_RAT_PORT, }; uint16_t s_portid { 0 }; std::vector s_thr_info; std::atomic s_state { STATE_RUNNING }; // default non master mode // states for qps std::atomic s_ts_begin { 0 }; }; static struct options_t options; static inline void calc_stats( uint64_t now, uint32_t *qps, uint32_t *total_pkt, uint32_t *total_loss) { uint32_t tot = 0; uint32_t loss = 0; for (auto i : options.s_thr_info) { tot += i->total_pkts.load(); loss += i->lost_pkts.load(); } if (total_pkt != nullptr) { *total_pkt = tot; } if (total_loss != nullptr) { *total_loss = loss; } if (qps != nullptr) { *qps = (uint32_t)((double)(tot - loss) / ((double)(now - options.s_ts_begin.load()) / (double)S2NS)); } } static void proto_loop(struct thread_info *tinfo) { struct rte_mbuf *tx_buf; struct rte_mbuf *rx_bufs[BURST_SIZE]; struct pkt_hdr *pkt_data; ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "proto_loop : waiting for SYNC from cat\n", tinfo->id); while (options.s_state.load() == STATE_SYNC) { const uint16_t nb_rx = rte_eth_rx_burst( options.s_portid, tinfo->rxqid, rx_bufs, BURST_SIZE); if (nb_rx > 0) { for (int i = 0; i < nb_rx; i++) { struct pkt_hdr *each = check_valid_packet( rx_bufs[i], &options.s_host_spec.mac_addr); if (each != nullptr) { uint16_t type = rte_be_to_cpu_16( each->type); if (type == PKT_TYPE_SYNC) { int expected = STATE_SYNC; ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "proto_loop : received SYNC from cat\n", tinfo->id); if (!options.s_state .compare_exchange_strong( expected, STATE_SYNC_ACK)) { // someone barged in, // listen to that guy ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "proto_loop : failed to cmpxchg sync_recv.\n", tinfo->id); } else { pkt_hdr_to_netspec(each, &options .s_master_spec, nullptr, nullptr, nullptr); if (alloc_pkt_hdr( options .mbuf_pool, PKT_TYPE_SYNC_ACK, &options .s_master_cspec, &tx_buf, &pkt_data) != 0) { rte_exit( EXIT_FAILURE, "failed to alloc pkt hdr\n"); } if (rte_eth_tx_burst( options .s_portid, tinfo->txqid, &tx_buf, 1) != 1) { rte_exit( EXIT_FAILURE, "failed to send packet\n"); } expected = STATE_SYNC_ACK; // we've done our job, // set off the threads if (!options.s_state .compare_exchange_strong( expected, STATE_RUNNING)) { rte_exit( EXIT_FAILURE, "state unexpectedly changed\n"); } ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "proto_loop : sent SYNC_ACK to cat\n", tinfo->id); } } else { ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "proto_loop : ignoring invalid packet %p type %d.\n", tinfo->id, (void *)rx_bufs[i], type); } } else { ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "proto_loop : ignoring invalid packet %p.\n", tinfo->id, (void *)rx_bufs[i]); } rte_pktmbuf_free(rx_bufs[i]); } } } ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "proto_loop : exiting loop...\n", tinfo->id); } static void pkt_loop(struct thread_info *tinfo) { struct rte_mbuf *tx_bufs[BURST_SIZE]; struct rte_mbuf *rx_bufs[BURST_SIZE]; std::vector recved_epochs; std::map sent_epochs; uint64_t cur_epoch = 0; uint64_t next_ts; uint64_t last_recv_ts = 0; struct conn_spec srv_cspec; rdport_generator src_port_gen(MIN_RANDOM_PORT); rdport_generator dst_port_gen(MIN_RANDOM_PORT); srv_cspec.src = &options.s_host_spec; srv_cspec.dst = &options.server_spec; next_ts = nm_get_uptime_ns(); ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "pkt_loop : entering\n", tinfo->id); while (options.s_state.load() == STATE_RUNNING) { uint64_t now = nm_get_uptime_ns(); // always pop incoming packets const uint16_t nb_rx = rte_eth_rx_burst( options.s_portid, tinfo->rxqid, rx_bufs, BURST_SIZE); if (nb_rx > 0) { for (int i = 0; i < nb_rx; i++) { struct pkt_hdr *each = check_valid_packet( rx_bufs[i], &options.s_host_spec.mac_addr); if (each == nullptr) { ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "pkt_loop : ignoring invalid packet %p.\n", tinfo->id, (void *)rx_bufs[i]); rte_pktmbuf_free(rx_bufs[i]); continue; } uint16_t type = rte_be_to_cpu_16(each->type); NTR_PKT(NTR_DEP_USER1, NTR_LEVEL_DEBUG, each, "locore_main : ", tinfo->id); struct pkt_payload_epoch *pld_epoch; struct epoch_info *einfo; uint32_t epoch; uint32_t id; struct thread_info *other_t; int int_expected = STATE_RUNNING; switch (type) { case PKT_TYPE_LOAD_RESP: pld_epoch = (struct pkt_payload_epoch *) each->payload; epoch = rte_be_to_cpu_32( pld_epoch->epoch); id = epoch_get_id(epoch); ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "pkt_loop : packet %p epoch 0x%x id %d.\n", tinfo->id, (void *)rx_bufs[i], epoch, id); if (id >= options.s_num_threads) { ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "pkt_loop : packet %p invalid id %d.\n", tinfo->id, (void *)rx_bufs[i], id); break; } einfo = new struct epoch_info; einfo->epoch = epoch; einfo->ts = now; other_t = options.s_thr_info.at(id); other_t->mtx.lock(); other_t->recved_epochs.push_back(einfo); other_t->mtx.unlock(); break; case PKT_TYPE_FIN: if (rte_is_same_ether_addr( &each->eth_hdr.s_addr, &options.s_master_spec .mac_addr)) { ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "pkt_loop : recved FIN from cat.\n", tinfo->id); // master told us to stop! if (!options.s_state .compare_exchange_strong( int_expected, STATE_FIN)) { ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "pkt_loop : failed to cmpxchg state.\n", tinfo->id); } uint32_t qps; uint32_t total_pkt; uint32_t total_loss; calc_stats(now, &qps, &total_pkt, &total_loss); struct pkt_hdr *pkt_hdr; if (alloc_pkt_hdr( options.mbuf_pool, PKT_TYPE_FIN_ACK, &options.s_master_cspec, &tx_bufs[0], &pkt_hdr) != 0) { rte_exit(EXIT_FAILURE, "failed to allocate pkt hdr\n"); } auto pld_qps = (struct pkt_payload_qps *) pkt_hdr->payload; pld_qps->qps = rte_cpu_to_be_32( qps); pld_qps->total_loss = rte_cpu_to_be_32( total_loss); pld_qps->total_pkts = rte_cpu_to_be_32(total_pkt); const uint16_t nb_tx = rte_eth_tx_burst( options.s_portid, tinfo->txqid, &tx_bufs[0], 1); if (nb_tx != 1) { rte_exit(EXIT_FAILURE, "failed to send packet\n"); } options.s_state.store(STATE_FIN); ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "pkt_loop : sent FIN_ACK to cat. QPS = %d.\n", tinfo->id, qps); } else { ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "pkt_loop : invalid FIN packet from a different cat.\n", tinfo->id); } break; default: ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "pkt_loop: ignoring packet %p with unknown type %d.\n", (void *)rx_bufs[i], type); } rte_pktmbuf_free(rx_bufs[i]); } } // dequeue receved epochs struct epoch_info *einfo; tinfo->mtx.lock(); while (!tinfo->recved_epochs.empty()) { // only dequeue, process later einfo = tinfo->recved_epochs.front(); tinfo->recved_epochs.pop_front(); // XXX: might call into the allocator // otherwise we need to have an array and do batching // => complex code and don't think it's worth it recved_epochs.push_back(einfo); } tinfo->mtx.unlock(); if (!recved_epochs.empty()) ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "pkt_loop : dequeued %lu received epochs\n", tinfo->id, recved_epochs.size()); // process epochs while (!recved_epochs.empty()) { einfo = recved_epochs.back(); recved_epochs.pop_back(); auto it = sent_epochs.find(einfo->epoch); if (it != sent_epochs.end()) { ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "pkt_loop : received epoch 0x%x\n", tinfo->id, epoch_get_epoch(einfo->epoch)); if (einfo->ts > last_recv_ts) { last_recv_ts = einfo->ts; } delete it->second; sent_epochs.erase(it); } else { // we recved an epoch we never sent ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "pkt_loop : received epoch 0x%x but never sent it. Packet loss?\n", tinfo->id, einfo->epoch); } delete einfo; } // handle packet loss for (auto it = sent_epochs.begin(); it != sent_epochs.end();) { einfo = it->second; if (now - einfo->ts > options.pkt_loss_delay_ms * MS2NS) { // timed out ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "pkt_loop : epoch 0x%x is lost after not receiving for too long\n", tinfo->id, einfo->epoch); delete it->second; it = sent_epochs.erase(it); tinfo->lost_pkts.fetch_add(1); } else { ++it; } } // check to send the next packet uint32_t total_send = 0; while (now >= next_ts && sent_epochs.size() < options.depth && total_send < BURST_SIZE) { struct pkt_payload_load *pld_load; struct pkt_hdr *pkt_data; next_ts += (int)(tinfo->ia_gen->generate() * S2NS); // change dst port for every packet for RSS srv_cspec.dst_port = dst_port_gen.next(); srv_cspec.src_port = src_port_gen.next(); if (alloc_pkt_hdr(options.mbuf_pool, PKT_TYPE_LOAD, &srv_cspec, &tx_bufs[total_send], &pkt_data) != 0) { rte_exit(EXIT_FAILURE, "failed to allocate pkt hdr\n"); } pld_load = (struct pkt_payload_load *)pkt_data->payload; pld_load->load = rte_cpu_to_be_32( tinfo->load_gen->generate()); unsigned int epoch = epoch_mk(tinfo->id, cur_epoch); pld_load->epoch = rte_cpu_to_be_32(epoch); cur_epoch++; einfo = new struct epoch_info; einfo->epoch = epoch; einfo->ts = now; sent_epochs.insert({ epoch, einfo }); tinfo->total_pkts.fetch_add(1); ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "pkt_loop : sending packet %p with epoch 0x%x\n", tinfo->id, (void *)tx_bufs[total_send], epoch); total_send++; } if (total_send > 0) { const uint16_t nb_tx = rte_eth_tx_burst( options.s_portid, tinfo->txqid, tx_bufs, total_send); if (nb_tx != total_send) { rte_exit( EXIT_FAILURE, "failed to send packet\n"); } } // check rage quit if (last_recv_ts == 0) { last_recv_ts = nm_get_uptime_ns(); } if (nm_get_uptime_ns() - last_recv_ts > options.rage_quit_time * MS2NS) { rte_exit(EXIT_FAILURE, "rat: thread %d waiting too long for resp. I QUIT!!\n", tinfo->id); } } // clean up for (auto it = sent_epochs.begin(); it != sent_epochs.end();) { delete it->second; ++it; } ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "pkt_loop : exiting loop...\n", tinfo->id); } static int locore_main(void *tif) { auto tinfo = (struct thread_info *)tif; uint32_t core_id = rte_lcore_id(); ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main : running on core %d...\n", tinfo->id, core_id); if (rte_eth_dev_socket_id(options.s_portid) > 0 && rte_eth_dev_socket_id(options.s_portid) != (int)rte_socket_id()) { ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main : WARNING, port %d is on remote NUMA node to " "polling thread.\n\tPerformance will " "not be optimal.\n", tinfo->id, options.s_portid); } if (options.slave_mode == 1) { // perform rat protocol proto_loop(tinfo); } // wait for the primary thread sending SYNC_ACK while (options.s_state.load() != STATE_RUNNING) { } // store the current timestamp options.s_ts_begin.store(nm_get_uptime_ns()); pkt_loop(tinfo); ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main : exited\n", tinfo->id); return 0; } static int port_init(uint16_t portid, struct rte_mempool *mbuf_pool) { struct rte_eth_dev_info dev_info { }; struct rte_eth_conf port_conf = port_conf_default; struct rte_eth_txconf txconf { }; struct rte_eth_rxconf rxconf { }; uint16_t nb_rxd = RX_RING_SIZE; uint16_t nb_txd = TX_RING_SIZE; if (!rte_eth_dev_is_valid_port(portid)) { return -1; } int ret = rte_eth_dev_info_get(portid, &dev_info); if (ret != 0) { return ret; } port_conf.rxmode.max_rx_pkt_len = RTE_ETHER_MAX_LEN; port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS; port_conf.rx_adv_conf.rss_conf.rss_hf = ETH_RSS_NONFRAG_IPV4_UDP | ETH_RSS_L2_PAYLOAD | ETH_RSS_NONFRAG_IPV4_TCP; port_conf.rx_adv_conf.rss_conf.rss_key = nullptr; port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_UDP_CKSUM; port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_IPV4_CKSUM; port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_RSS_HASH; port_conf.txmode.offloads |= DEV_TX_OFFLOAD_UDP_CKSUM; port_conf.txmode.offloads |= DEV_TX_OFFLOAD_IPV4_CKSUM; port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE; /* Configure the Ethernet device. */ ret = rte_eth_dev_configure( portid, options.s_num_threads, options.s_num_threads, &port_conf); if (ret != 0) return ret; ret = rte_eth_dev_adjust_nb_rx_tx_desc(portid, &nb_rxd, &nb_txd); if (ret != 0) return ret; /* Allocate and set up 1 RX queue per thread . */ rxconf = dev_info.default_rxconf; rxconf.offloads = port_conf.rxmode.offloads; for (uint32_t i = 0; i < options.s_num_threads; i++) { ret = rte_eth_rx_queue_setup(portid, options.s_thr_info.at(i)->rxqid, nb_rxd, rte_eth_dev_socket_id(portid), &rxconf, mbuf_pool); if (ret < 0) return ret; } txconf = dev_info.default_txconf; txconf.offloads = port_conf.txmode.offloads; /* Allocate and set up 1 TX queue per Ethernet port. */ for (uint32_t i = 0; i < options.s_num_threads; i++) { ret = rte_eth_tx_queue_setup(portid, options.s_thr_info.at(i)->txqid, nb_txd, rte_eth_dev_socket_id(portid), &txconf); if (ret < 0) return ret; } ret = rte_eth_dev_start(portid); if (ret < 0) return ret; /* Display the port MAC address. */ struct rte_ether_addr addr { }; ret = rte_eth_macaddr_get(portid, &addr); if (ret != 0) return ret; // no promiscuous mode required return 0; } static void dump_options() { ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "Configuration:\n" " verbosity = +%d\n" " run time = %d\n" " num threads = %d\n" " rage quit time = %ul\n" " cpu mask = 0x%lx\n" " slave mode = %d\n" " interarrival dist = %s\n" " workload dist = %s\n" " qps = %d\n" " host IP = 0x%x\n" " depth = %u\n" " packet loss time threshold = %u\n", ntr_get_level(NTR_DEP_USER1) - NTR_LEVEL_WARNING, options.run_time, options.s_num_threads, options.rage_quit_time, options.cpu_mask, options.slave_mode, options.ia_gen, options.ld_gen, options.target_qps, options.s_host_spec.ip, options.depth, options.pkt_loss_delay_ms); } static void usage() { fprintf(stdout, "Usage:\n" " -v(vv): verbose mode\n" " -h: display the information\n" " -t: run time\n" " -s: server net spec\n" " -S: slave(rat) mode\n" " -A: affinity mask\n" " -i: inter-arrival time distribution\n" " -w: workload distribution\n" " -r: rage quit time (in ms)\n" " -q: target QPS\n" " -H: host net spec\n" " -D: max number of packets in flight\n" " -l: packet loss time threshold\n"); } int main(int argc, char *argv[]) { unsigned int nb_ports; struct rte_mempool *mbuf_pool; struct thread_info *tinfo; bool has_host_spec = false; ntr_init(); // init dpdk int ret = rte_eal_init(argc, argv); if (ret < 0) { rte_exit(EXIT_FAILURE, "rte_eal_init failed!\n"); } argc -= ret; argv += ret; // set warning level ntr_set_level(NTR_DEP_USER1, NTR_LEVEL_WARNING); { int c; // parse arguments while ( (c = getopt(argc, argv, "vht:s:SA:i:w:r:q:H:D:l:")) != -1) { switch (c) { case 'v': ntr_set_level(NTR_DEP_USER1, ntr_get_level(NTR_DEP_USER1) + 1); break; case 'h': usage(); rte_exit(EXIT_SUCCESS, "\n"); case 't': options.run_time = strtol(optarg, nullptr, 10); break; case 's': if (str_to_netspec( optarg, &options.server_spec) != 0) { rte_exit(EXIT_FAILURE, "invalid server net spec\n"); } break; case 'S': options.slave_mode = 1; options.s_state = STATE_SYNC; // set state to wait for SYNC break; case 'A': options.cpu_mask = strtoull( optarg, nullptr, 16); options.s_num_threads = cmask_get_num_cpus( options.cpu_mask); if (options.s_num_threads == 0) { rte_exit(EXIT_FAILURE, "invalid cpu mask 0x%lx\n", options.cpu_mask); } break; case 'i': strncpy(options.ia_gen, optarg, sizeof(options.ia_gen) - 1); break; case 'w': strncpy(options.ld_gen, optarg, sizeof(options.ld_gen) - 1); break; case 'r': options.rage_quit_time = strtol( optarg, nullptr, 10); break; case 'q': options.target_qps = strtol( optarg, nullptr, 10); break; case 'H': has_host_spec = true; if (str_to_netspec( optarg, &options.s_host_spec) != 0) { rte_exit(EXIT_FAILURE, "invalid host net spec.\n"); } break; case 'D': options.depth = strtol(optarg, nullptr, 10); if (options.depth == 0) { options.depth = UINT32_MAX; } break; case 'l': options.pkt_loss_delay_ms = strtol( optarg, nullptr, 10); if (options.pkt_loss_delay_ms == 0) { options.pkt_loss_delay_ms = UINT32_MAX; } break; default: usage(); rte_exit( EXIT_FAILURE, "unknown argument: %c\n", c); } } } if (!has_host_spec) { rte_exit(EXIT_FAILURE, "Must specify host IP.\n"); } // init nm if (nm_init(ntr_get_level(NTR_DEP_USER1) - NTR_LEVEL_WARNING) != 0) { rte_exit(EXIT_FAILURE, "nm init failed!\n"); } dump_options(); nb_ports = rte_eth_dev_count_avail(); if (nb_ports == 0) { rte_exit(EXIT_FAILURE, "number of ports must be > 0\n"); } uint16_t portid = rte_eth_find_next(0); if (portid == RTE_MAX_ETHPORTS) { rte_exit(EXIT_FAILURE, "cannot find an available port\n"); } options.s_portid = portid; if (rte_eth_macaddr_get(portid, &options.s_host_spec.mac_addr) != 0) { rte_exit(EXIT_FAILURE, "cannot get mac address of port %d\n", portid); } // create a mbuf memory pool on the socket mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", MBUF_MAX_COUNT, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_eth_dev_socket_id(options.s_portid)); if (mbuf_pool == nullptr) { rte_exit(EXIT_FAILURE, "cannot create mbuf pool\n"); } options.mbuf_pool = mbuf_pool; uint64_t cmask = options.cpu_mask; for (unsigned int i = 0; i < options.s_num_threads; i++) { tinfo = new thread_info; tinfo->ia_gen = createGenerator(options.ia_gen); tinfo->load_gen = createGenerator(options.ld_gen); if (tinfo->ia_gen == nullptr || tinfo->load_gen == nullptr) { rte_exit( EXIT_FAILURE, "invalid ia_gen or ld_gen string\n"); } tinfo->ia_gen->set_lambda((double)options.target_qps / (double)(options.s_num_threads)); tinfo->id = i; tinfo->lcore_id = cmask_get_next_cpu(&cmask); tinfo->rxqid = i; tinfo->txqid = i; options.s_thr_info.push_back(tinfo); } if (port_init(portid, mbuf_pool) != 0) { rte_exit(EXIT_FAILURE, "cannot init port %d\n", portid); } ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "Configured port %d with mac addr %x:%x:%x:%x:%x:%x\n", portid, options.s_host_spec.mac_addr.addr_bytes[0], options.s_host_spec.mac_addr.addr_bytes[1], options.s_host_spec.mac_addr.addr_bytes[2], options.s_host_spec.mac_addr.addr_bytes[3], options.s_host_spec.mac_addr.addr_bytes[4], options.s_host_spec.mac_addr.addr_bytes[5]); sleep(INIT_DELAY); for (unsigned int i = 0; i < options.s_num_threads; i++) { tinfo = options.s_thr_info.at(i); ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "main: launching thread %d on locore %d\n", tinfo->id, tinfo->lcore_id); if (rte_eal_remote_launch(locore_main, (void *)options.s_thr_info.at(i), tinfo->lcore_id) != 0) { rte_exit(EXIT_FAILURE, "failed to launch function on locore %d\n", tinfo->lcore_id); } } // poor man's timer uint32_t second = 0; // this loop exit is signaled by SYNC_FIN in slave mode and by itself in // non slave mode while (options.s_state.load() != STATE_FIN) { if (options.slave_mode != 1) { if (second >= options.run_time) { options.s_state.store(STATE_FIN); break; } usleep(1 * S2US); second++; } } for (unsigned int i = 0; i < options.s_num_threads; i++) { tinfo = options.s_thr_info.at(i); ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "main: waiting for locore %d...\n", tinfo->lcore_id); if (rte_eal_wait_lcore(tinfo->lcore_id) != 0) { rte_exit(EXIT_FAILURE, "failed to wait for locore %d\n", tinfo->lcore_id); } } uint32_t qps; uint32_t total_pkts; uint32_t total_loss; calc_stats(nm_get_uptime_ns(), &qps, &total_pkts, &total_loss); ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "qps = %d, total = %d, loss = %d\n", qps, total_pkts, total_loss); for (auto each : options.s_thr_info) { delete each->load_gen; delete each->ia_gen; delete each; } // clean up rte_eth_dev_stop(portid); return 0; }