diff --git a/cat/cat.cc b/cat/cat.cc index f727ccc..101bbcc 100644 --- a/cat/cat.cc +++ b/cat/cat.cc @@ -11,68 +11,117 @@ #include #include #include +#include +#include #include #include #include #include +#include "generator.h" #include "ntrlog.h" #include "pkt.h" -#include "rte_byteorder.h" -#include "rte_ip.h" +#include "util.h" // init NTRLOG NTR_DECL_IMPL; -constexpr unsigned int MBUF_MAX_COUNT = 8191; -constexpr unsigned int MBUF_CACHE_SIZE = 250; -constexpr unsigned int RX_RING_SIZE = 1024; -constexpr unsigned int TX_RING_SIZE = 1024; -constexpr unsigned int RX_RING_NUM = 1; -constexpr unsigned int TX_RING_NUM = 1; -constexpr unsigned int BURST_SIZE = 32; +constexpr static unsigned int MBUF_MAX_COUNT = 16384; +constexpr static unsigned int MBUF_CACHE_SIZE = 512; +constexpr static unsigned int RX_RING_SIZE = 4096; +constexpr static unsigned int TX_RING_SIZE = 4096; +constexpr static unsigned int BURST_SIZE = 32; + +constexpr static unsigned int MODE_MASTER = 0; +constexpr static unsigned int MODE_CLIENT = 1; static const struct rte_eth_conf port_conf_default{}; -struct datapt{ - uint64_t server_proc = 0; - uint64_t rtt = 0; +struct sendpt { + uint32_t epoch; + uint32_t valid; + uint64_t clt_hw_tx; + uint64_t clt_sw_tx; +}; + + +struct recvpt { + uint32_t epoch; + uint32_t valid; + uint64_t clt_hw_rx; + uint64_t clt_sw_rx; + uint64_t srv_hw_tx; + uint64_t srv_sw_tx; + uint64_t srv_hw_rx; + uint64_t srv_sw_rx; +}; + +struct thread_info { + unsigned int id; + unsigned int rxqid{0}; + unsigned int txqid{0}; + std::vector send_data; + std::vector recv_data; + unsigned int tot_send{0}; + unsigned int tot_recv{0}; + Generator * ia_gen; }; struct options_t { - unsigned int run_time = 5; - unsigned int warmup_time = 0; + unsigned int run_time{5}; + unsigned int warmup_time{0}; + unsigned int num_threads{1}; + unsigned int mode{MODE_MASTER}; char output[256] = "output.txt"; + char ia_gen[256] = "fixed:1"; struct rte_ether_addr server_mac; + uint64_t cpu_mask; // states 
- std::atomic s_stop {false}; - std::atomic s_record {false}; - std::vector s_stats; - struct rte_mempool * s_mbuf_pool; - uint16_t s_portid; + struct rte_mempool * mbuf_pool; struct rte_ether_addr s_host_mac; + uint16_t s_portid; + std::vector s_thr_info; + std::atomic s_epoch; + std::atomic s_stop {false}; + std::atomic s_record {0}; }; -struct options_t options; +static struct options_t options; static uint16_t -rx_calc_latency(uint16_t port __rte_unused, uint16_t qidx __rte_unused, +rx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused, struct rte_mbuf **pkts, uint16_t nb_pkts, uint16_t max_pkts __rte_unused, void *_ __rte_unused) { - // XXX: need to get the timestamp in every loop? uint64_t now = rte_rdtsc(); - struct packet_data * pkt_data; + struct pkt_hdr * pkt_data; + struct timespec ts; + int ret; for (int i = 0; i < nb_pkts; i++) { pkt_data = check_valid_packet(pkts[i]); if (pkt_data == NULL) { - ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_calc_latency: ignoring invalid packet 0x%p.\n", (void*)pkts[i]); + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: ignoring invalid packet 0x%p.\n", (void*)pkts[i]); continue; } - - ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "rx_calc_latency: tagged packet %p with %llu.\n", (void*)pkts[i], now); - pkt_data->clt_ts_rx = rte_cpu_to_be_64(now); + + if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_STAT) { + pkts[i]->userdata = nullptr; + if ((ret = rte_eth_timesync_read_rx_timestamp(port, &ts, pkts[i]->timesync & 0x3)) == 0) { + // has hw rx timestamp + struct recvpt * datapt = new struct recvpt; + datapt->valid = options.s_record.load(); + datapt->clt_hw_rx = ts.tv_sec * S2NS + ts.tv_nsec; + datapt->clt_sw_rx = now; + pkts[i]->userdata = datapt; + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: tagged packet %p with sw: %llu hw: %llu.\n", (void*)pkts[i], now, datapt->clt_hw_rx); + } else { + // leave as null + ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "rx_add_timestamp: packet %p not tagged - hw ts not 
available - %d.\n", (void*)pkts[i], ret); + } + } else { + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], pkt_data->type); + } } return nb_pkts; @@ -82,38 +131,46 @@ static uint16_t tx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused, struct rte_mbuf **pkts, uint16_t nb_pkts, void *_ __rte_unused) { - // XXX: need to get the timestamp in every loop? uint64_t now = rte_rdtsc(); - struct packet_data * pkt_data; + struct pkt_hdr * pkt_data; for (int i = 0; i < nb_pkts; i++) { pkt_data = check_valid_packet(pkts[i]); if (pkt_data == NULL) { ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: ignoring invalid packet 0x%p.\n", (void*)pkts[i]); - continue; + continue; } - ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "tx_add_timestamp: tagged packet %p with %llu.\n", (void*)pkts[i], now); - pkt_data->clt_ts_tx = rte_cpu_to_be_64(now); + if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE) { + ((struct sendpt *)pkts[i]->userdata)->clt_sw_tx = now; + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: tagged packet %p with sw: %llu.\n", (void*)pkts[i], now); + } else { + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], pkt_data->type); + } } return nb_pkts; } -#define STATE_SEND (0) -#define STATE_RECV (1) - static int -locore_main(void * _unused __rte_unused) +locore_main(void * tif) { + struct thread_info * tinfo = (struct thread_info *)tif; struct rte_mbuf *tx_buf; struct rte_mbuf *rx_bufs[BURST_SIZE]; - struct packet_data *pkt_data; + struct pkt_hdr *pkt_data; uint32_t core_id = rte_lcore_id(); - uint32_t epoch = 0; - int state = STATE_SEND; + uint32_t epoch; + struct pkt_payload_epoch * pld_epoch; + struct pkt_payload_stat * pld_stat; + struct recvpt * recvpt; + int32_t ret; + struct sendpt * last_sendpt; + bool pending_hw_tx; + + uint64_t next_ts; // XXX: check link status instead sleep(1); @@ -123,72 +180,101 @@ locore_main(void * 
_unused __rte_unused) "not be optimal.\n", options.s_portid); } - ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d running...\n", core_id); + ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d running thread %d...\n", core_id, tinfo->id); - tx_buf = rte_pktmbuf_alloc(options.s_mbuf_pool); - - if (tx_buf == NULL) { - rte_exit(EXIT_FAILURE, "cannot allocate tx_buf\n"); - } - - pkt_data = construct_udp_pkt_hdr(tx_buf, - &options.s_host_mac, &options.server_mac, - RTE_IPV4(192, 168, 100, 150), RTE_IPV4(192, 168, 100, 151), - 333, 319); - if (pkt_data == NULL) { - rte_exit(EXIT_FAILURE, "cannot allocate space for packet_data in mbuf\n"); - } - pkt_data->magic = rte_cpu_to_be_32(ETHER_FRAME_MAGIC); + next_ts = get_time_us(); + pending_hw_tx = false; while(!options.s_stop.load()) { + uint64_t now = get_time_us(); // always pop incoming packets const uint16_t nb_rx = rte_eth_rx_burst(options.s_portid, 0, rx_bufs, BURST_SIZE); - if (nb_rx != 0) { - // only process packets when we are ready to receive + if (nb_rx > 0) { for (int i = 0; i < nb_rx; i++) { - struct packet_data * each = check_valid_packet(rx_bufs[i]); + struct pkt_hdr * each = check_valid_packet(rx_bufs[i]); if (each == NULL) { ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: ignoring invalid packet %p.\n", (void*)rx_bufs[i]); - dump_pkt(rx_bufs[i]); rte_pktmbuf_free(rx_bufs[i]); continue; } - if (rte_be_to_cpu_32(each->epoch) == epoch && state == STATE_RECV) { - ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: received packet %p for epoch %d\n", (void*)rx_bufs[i], epoch); + uint16_t type = rte_be_to_cpu_16(each->type); - if (options.s_record.load()) { - // keep statistics - struct datapt * dpt = new datapt; - dpt->rtt = rte_be_to_cpu_64(each->clt_ts_rx) - rte_be_to_cpu_64(each->clt_ts_tx); - dpt->server_proc = rte_be_to_cpu_64(each->srv_ts_tx) - rte_be_to_cpu_64(each->srv_ts_rx); - options.s_stats.push_back(dpt); - } - - // bump the epoch and stop processing other packets - state = STATE_SEND; - 
epoch++; - } else { - ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: ignoring packet 0x%p with invalid epoch %d.\n", (void*)rx_bufs[i], epoch); + switch (type) { + case PKT_TYPE_RESP: + tinfo->tot_recv++; + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: received packet %p type %d epoch %d.\n", (void*)rx_bufs[i], type, + rte_be_to_cpu_32(((struct pkt_payload_epoch *)(each->payload))->epoch)); + break; + case PKT_TYPE_STAT: + tinfo->tot_recv++; + recvpt = (struct recvpt *)rx_bufs[i]->userdata; + pld_stat = (struct pkt_payload_stat *)each->payload; + // keep stats: epoch is 32-bit, the four server timestamps are 64-bit big-endian fields + recvpt->epoch = rte_be_to_cpu_32(pld_stat->epoch); + recvpt->srv_hw_tx = rte_be_to_cpu_64(pld_stat->hw_tx); + recvpt->srv_hw_rx = rte_be_to_cpu_64(pld_stat->hw_rx); + recvpt->srv_sw_tx = rte_be_to_cpu_64(pld_stat->sw_tx); + recvpt->srv_sw_rx = rte_be_to_cpu_64(pld_stat->sw_rx); + tinfo->recv_data.push_back(recvpt); + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: received packet %p type %d epoch %d.\n", (void*)rx_bufs[i], type, recvpt->epoch); + break; + default: + ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: ignoring packet %p with invalid type %d.\n", (void*)rx_bufs[i], type); + rte_pktmbuf_free(rx_bufs[i]); + continue; } rte_pktmbuf_free(rx_bufs[i]); } } - if (state == STATE_SEND) { - // set new epoch - pkt_data->epoch = rte_cpu_to_be_32(epoch); - ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: sending packet %p with epoch %d\n", (void*)tx_buf, epoch); + if (now >= next_ts) { //&& !pending_hw_tx) { + next_ts += (int)(tinfo->ia_gen->generate() * 1000000.0); - const uint16_t nb_tx = rte_eth_tx_burst(options.s_portid, 0, &tx_buf, 1); + // generate the packet + tx_buf = rte_pktmbuf_alloc(options.mbuf_pool); - if (nb_tx < 1) { + if (tx_buf == NULL) { + rte_exit(EXIT_FAILURE, "cannot allocate tx_buf\n"); + } + + pkt_data = construct_pkt_hdr(tx_buf, PKT_TYPE_PROBE, + &options.s_host_mac, &options.server_mac, + RTE_IPV4(192, 168, 100, 150), RTE_IPV4(192, 168, 100, 151), 319, 319); + if (pkt_data 
== NULL) { + rte_exit(EXIT_FAILURE, "cannot allocate space for packet_data in mbuf\n"); + } + + pld_epoch = (struct pkt_payload_epoch *)pkt_data->payload; + epoch = options.s_epoch.fetch_add(1); + pld_epoch->epoch = rte_cpu_to_be_32(epoch); + last_sendpt = new sendpt; + last_sendpt->epoch = 0; //epoch; + last_sendpt->valid = options.s_record.load(); + tx_buf->userdata = last_sendpt; + pending_hw_tx = true; + + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: sending packet %p with epoch %d\n", (void*)tx_buf, epoch); + const uint16_t nb_tx = rte_eth_tx_burst(options.s_portid, tinfo->txqid, &tx_buf, 1); + + if (nb_tx != 1) { rte_exit(EXIT_FAILURE, "failed to send packet 0x%p, epoch %d\n", (void*)tx_buf, epoch); } - state = STATE_RECV; + } + + if (pending_hw_tx) { + struct timespec ts; + if ((ret = rte_eth_timesync_read_tx_timestamp(options.s_portid, &ts)) == 0) { + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: read hw tx timestamp %lld.\n", ts.tv_nsec + ts.tv_sec * S2NS); + last_sendpt->clt_hw_tx = ts.tv_nsec + ts.tv_sec * S2NS; + tinfo->send_data.push_back(last_sendpt); + pending_hw_tx = false; + } else { + //ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: read hw tx timestamp failed - %d.\n", ret); + } } } @@ -228,7 +314,7 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool) port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE; /* Configure the Ethernet device. */ - ret = rte_eth_dev_configure(portid, RX_RING_NUM, TX_RING_NUM, &port_conf); + ret = rte_eth_dev_configure(portid, options.num_threads, options.num_threads, &port_conf); if (ret != 0) return ret; @@ -236,10 +322,10 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool) if (ret != 0) return ret; - /* Allocate and set up 1 RX queue per Ethernet port. */ + /* Allocate and set up 1 RX queue per thread . 
*/ rxconf = dev_info.default_rxconf; rxconf.offloads = port_conf.rxmode.offloads; - for (uint32_t i = 0; i < RX_RING_NUM; i++) { + for (uint32_t i = 0; i < options.num_threads; i++) { ret = rte_eth_rx_queue_setup(portid, i, nb_rxd, rte_eth_dev_socket_id(portid), &rxconf, mbuf_pool); if (ret < 0) return ret; @@ -248,7 +334,7 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool) txconf = dev_info.default_txconf; txconf.offloads = port_conf.txmode.offloads; /* Allocate and set up 1 TX queue per Ethernet port. */ - for (uint32_t i = 0; i < TX_RING_NUM; i++) { + for (uint32_t i = 0; i < options.num_threads; i++) { ret = rte_eth_tx_queue_setup(portid, i, nb_txd, rte_eth_dev_socket_id(portid), &txconf); if (ret < 0) return ret; @@ -274,7 +360,7 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool) return ret; rte_eth_add_tx_callback(portid, 0, tx_add_timestamp, NULL); - rte_eth_add_rx_callback(portid, 0, rx_calc_latency, NULL); + rte_eth_add_rx_callback(portid, 0, rx_add_timestamp, NULL); return 0; } @@ -306,15 +392,27 @@ static void usage() " -o: output filename\n" \ " -t: run time\n" \ " -T: warmup time\n" \ - " -s: server's mac\n\n" ); + " -s: server's mac\n" \ + " -A: affinity mask\n" \ + " -a: number of threads\n" \ + " -C: client mode\n" + " -i: inter-arrival time distribution\n\n"); } +// static void int_handler(int) +// { +// //rte_exit(EXIT_SUCCESS, "Caught SIGINT, exiting...\n"); +// } int main(int argc, char* argv[]) { unsigned int nb_ports; - struct rte_mempool *mbuf_pool, *mbuf_pool_pkt; + struct rte_mempool *mbuf_pool; std::ofstream log_file; + struct thread_info *tinfo; + + // signal(SIGINT, int_handler); + // init dpdk int ret = rte_eal_init(argc, argv); if (ret < 0) { @@ -329,7 +427,7 @@ int main(int argc, char* argv[]) { int c; // parse arguments - while((c = getopt(argc, argv, "hvo:t:T:s:")) != -1) { + while((c = getopt(argc, argv, "hvo:t:T:s:A:a:Ci:")) != -1) { switch (c) { case 'v': ntr_set_level(NTR_DEP_USER1, 
ntr_get_level(NTR_DEP_USER1) + 1); @@ -352,6 +450,18 @@ int main(int argc, char* argv[]) case 'o': strncpy(options.output, optarg, sizeof(options.output) - 1); break; + case 'A': + options.cpu_mask = atoll(optarg); + break; + case 'a': + options.num_threads = atoi(optarg); + break; + case 'C': + options.mode = MODE_CLIENT; + break; + case 'i': + strncpy(options.ia_gen, optarg, sizeof(options.ia_gen) - 1); + break; default: usage(); rte_exit(EXIT_FAILURE, "unknown argument: %c\n", c); @@ -371,24 +481,27 @@ int main(int argc, char* argv[]) rte_exit(EXIT_FAILURE, "number of ports must be > 0\n"); } - // create a mbuf memory pool on the socket - mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", MBUF_MAX_COUNT * nb_ports, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); - if (mbuf_pool == nullptr) { - rte_exit(EXIT_FAILURE, "cannot create mbuf pool\n"); - } - - mbuf_pool_pkt = rte_pktmbuf_pool_create("MBUF_POOL_PKT", MBUF_MAX_COUNT * nb_ports, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); - if (mbuf_pool_pkt == nullptr) { - rte_exit(EXIT_FAILURE, "cannot create mbuf pool\n"); - } - options.s_mbuf_pool = mbuf_pool_pkt; - uint16_t portid = rte_eth_find_next(0); if (portid == RTE_MAX_ETHPORTS) { rte_exit(EXIT_FAILURE, "cannot find an available port\n"); } options.s_portid = portid; + + // create a mbuf memory pool on the socket + mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", MBUF_MAX_COUNT, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_eth_dev_socket_id(options.s_portid)); + if (mbuf_pool == nullptr) { + rte_exit(EXIT_FAILURE, "cannot create mbuf pool\n"); + } + options.mbuf_pool = mbuf_pool; + + for(int i = 0; i < 1; i++) { + tinfo = new thread_info; + tinfo->id = i; + tinfo->ia_gen = createGenerator(options.ia_gen); + options.s_thr_info.push_back(tinfo); + } + if (port_init(portid, mbuf_pool) != 0) { rte_exit(EXIT_FAILURE, "cannot init port %d\n", portid); } @@ -407,11 +520,11 @@ int main(int argc, char* argv[]) dump_options(); - 
uint16_t core_id = rte_get_next_lcore(0, true, false); - sleep(1); - if (rte_eal_remote_launch(locore_main, NULL, core_id) != 0) { + uint16_t core_id = rte_get_next_lcore(0, true, false); + + if (rte_eal_remote_launch(locore_main, options.s_thr_info.at(0), core_id) != 0) { rte_exit(EXIT_FAILURE, "failed to launch function on locore\n"); } @@ -423,7 +536,7 @@ int main(int argc, char* argv[]) uint32_t second = 0; while(true) { if (second >= options.warmup_time) { - options.s_record.store(true); + options.s_record.store(1); } if (second >= options.run_time + options.warmup_time) { options.s_stop.store(true); @@ -437,10 +550,10 @@ int main(int argc, char* argv[]) rte_exit(EXIT_FAILURE, "failed to wait for job completion\n"); // dump stats - for (auto it = std::begin(options.s_stats); it != std::end(options.s_stats); ++it) { - log_file << (*it)->rtt << "," << (*it)->server_proc << std::endl; - delete *it; - } + // for (auto it = std::begin(options.s_stats); it != std::end(options.s_stats); ++it) { + // log_file << (*it)->rtt << "," << (*it)->server_proc << std::endl; + // delete *it; + // } log_file.close(); // clean up diff --git a/cat/generator.cc b/cat/generator.cc new file mode 100644 index 0000000..335b4e4 --- /dev/null +++ b/cat/generator.cc @@ -0,0 +1,74 @@ +// modified from mutilate + +#include "generator.h" + +Generator* createFacebookKey() { return new GEV(30.7984, 8.20449, 0.078688); } + +Generator* createFacebookValue() { + Generator* g = new GPareto(15.0, 214.476, 0.348238); + + Discrete* d = new Discrete(g); + d->add(0.00536, 0.0); + d->add(0.00047, 1.0); + d->add(0.17820, 2.0); + d->add(0.09239, 3.0); + d->add(0.00018, 4.0); + d->add(0.02740, 5.0); + d->add(0.00065, 6.0); + d->add(0.00606, 7.0); + d->add(0.00023, 8.0); + d->add(0.00837, 9.0); + d->add(0.00837, 10.0); + d->add(0.08989, 11.0); + d->add(0.00092, 12.0); + d->add(0.00326, 13.0); + d->add(0.01980, 14.0); + + return d; +} + +Generator* createFacebookIA() { return new GPareto(0, 16.0292, 
0.154971); } + +Generator* createGenerator(std::string str) { + if (!strcmp(str.c_str(), "fb_key")) return createFacebookKey(); + else if (!strcmp(str.c_str(), "fb_value")) return createFacebookValue(); + else if (!strcmp(str.c_str(), "fb_ia")) return createFacebookIA(); + + char *s_copy = new char[str.length() + 1]; + strcpy(s_copy, str.c_str()); + char *saveptr = NULL; + + if (atoi(s_copy) != 0 || !strcmp(s_copy, "0")) { + double v = atof(s_copy); + delete[] s_copy; + return new Fixed(v); + } + + char *t_ptr = strtok_r(s_copy, ":", &saveptr); + char *a_ptr = strtok_r(NULL, ":", &saveptr); + + if (t_ptr == NULL) // || a_ptr == NULL) + DIE("strtok(.., \":\") failed to parse %s", str.c_str()); + + saveptr = NULL; + char *s1 = a_ptr ? strtok_r(a_ptr, ",", &saveptr) : NULL; // a_ptr is NULL when str has no ':' argument list; never tokenize from a NULL saveptr + char *s2 = s1 ? strtok_r(NULL, ",", &saveptr) : NULL; + char *s3 = s2 ? strtok_r(NULL, ",", &saveptr) : NULL; + + double a1 = s1 ? atof(s1) : 0.0; + double a2 = s2 ? atof(s2) : 0.0; + double a3 = s3 ? atof(s3) : 0.0; + + delete[] s_copy; + + if (strcasestr(str.c_str(), "fixed")) return new Fixed(a1); + else if (strcasestr(str.c_str(), "normal")) return new Normal(a1, a2); + else if (strcasestr(str.c_str(), "exponential")) return new Exponential(a1); + else if (strcasestr(str.c_str(), "pareto")) return new GPareto(a1, a2, a3); + else if (strcasestr(str.c_str(), "gev")) return new GEV(a1, a2, a3); + else if (strcasestr(str.c_str(), "uniform")) return new Uniform(a1); + + DIE("Unable to create Generator '%s'", str.c_str()); + + return NULL; +} \ No newline at end of file diff --git a/cat/generator.h b/cat/generator.h new file mode 100644 index 0000000..1b52cce --- /dev/null +++ b/cat/generator.h @@ -0,0 +1,237 @@ +// modified from mutilate +// -*- c++ -*- + +// 1. implement "fixed" generator +// 2. implement discrete generator +// 3. implement combine generator? 
+ +#ifndef GENERATOR_H +#define GENERATOR_H + +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "util.h" + +#define D(fmt, ...) +#define DIE(fmt, ...) (void)0; + +#define FNV_64_PRIME (0x100000001b3ULL) +#define FNV1_64_INIT (0xcbf29ce484222325ULL) +static inline uint64_t fnv_64_buf(const void* buf, size_t len) { + uint64_t hval = FNV1_64_INIT; + + unsigned char *bp = (unsigned char *)buf; /* start of buffer */ + unsigned char *be = bp + len; /* beyond end of buffer */ + + while (bp < be) { + hval ^= (uint64_t)*bp++; + hval *= FNV_64_PRIME; + } + + return hval; +} + +static inline uint64_t fnv_64(uint64_t in) { return fnv_64_buf(&in, sizeof(in)); } + + +// Generator syntax: +// +// \d+ == fixed +// n[ormal]:mean,sd +// e[xponential]:lambda +// p[areto]:scale,shape +// g[ev]:loc,scale,shape +// fb_value, fb_key, fb_rate + +class Generator { +public: + Generator() {} + // Generator(const Generator &g) = delete; + // virtual Generator& operator=(const Generator &g) = delete; + virtual ~Generator() {} + + virtual double generate(double U = -1.0) = 0; + virtual void set_lambda(double) {DIE("set_lambda() not implemented");} +protected: + std::string type; +}; + +class Fixed : public Generator { +public: + Fixed(double _value = 1.0) : value(_value) { D("Fixed(%f)", value); } + virtual double generate(double) { return value; } + virtual void set_lambda(double lambda) { + if (lambda > 0.0) value = 1.0 / lambda; + else value = 0.0; + } + +private: + double value; +}; + +class Uniform : public Generator { +public: + Uniform(double _scale) : scale(_scale) { D("Uniform(%f)", scale); } + + virtual double generate(double U = -1.0) { + if (U < 0.0) U = drand48(); + return scale * U; + } + + virtual void set_lambda(double lambda) { + if (lambda > 0.0) scale = 2.0 / lambda; + else scale = 0.0; + } + +private: + double scale; +}; + +class Normal : public Generator { +public: + Normal(double _mean = 
1.0, double _sd = 1.0) : mean(_mean), sd(_sd) { + D("Normal(mean=%f, sd=%f)", mean, sd); + } + + virtual double generate(double U = -1.0) { + if (U < 0.0) U = drand48(); + double V = U; // drand48(); + double N = sqrt(-2 * log(U)) * cos(2 * M_PI * V); + return mean + sd * N; + } + + virtual void set_lambda(double lambda) { + if (lambda > 0.0) mean = 1.0 / lambda; + else mean = 0.0; + } + +private: + double mean, sd; +}; + +class Exponential : public Generator { +public: + Exponential(double _lambda = 1.0) : lambda(_lambda) { + D("Exponential(lambda=%f)", lambda); + } + + virtual double generate(double U = -1.0) { + if (lambda <= 0.0) return 0.0; + if (U < 0.0) U = drand48(); + return -log(U) / lambda; + } + + virtual void set_lambda(double lambda) { this->lambda = lambda; } + +private: + double lambda; +}; + +class GPareto : public Generator { +public: + GPareto(double _loc = 0.0, double _scale = 1.0, double _shape = 1.0) : + loc(_loc), scale(_scale), shape(_shape) { + assert(shape != 0.0); + D("GPareto(loc=%f, scale=%f, shape=%f)", loc, scale, shape); + } + + virtual double generate(double U = -1.0) { + if (U < 0.0) U = drand48(); + return loc + scale * (pow(U, -shape) - 1) / shape; + } + + virtual void set_lambda(double lambda) { + if (lambda <= 0.0) scale = 0.0; + else scale = (1 - shape) / lambda - (1 - shape) * loc; + } + +private: + double loc /* mu */; + double scale /* sigma */, shape /* k */; +}; + +class GEV : public Generator { +public: + GEV(double _loc = 0.0, double _scale = 1.0, double _shape = 1.0) : + e(1.0), loc(_loc), scale(_scale), shape(_shape) { + assert(shape != 0.0); + D("GEV(loc=%f, scale=%f, shape=%f)", loc, scale, shape); + } + + virtual double generate(double U = -1.0) { + return loc + scale * (pow(e.generate(U), -shape) - 1) / shape; + } + +private: + Exponential e; + double loc /* mu */, scale /* sigma */, shape /* k */; +}; + +class Discrete : public Generator { +public: + ~Discrete() { delete def; } + Discrete(Generator* _def = NULL) 
: def(_def) { + if (def == NULL) def = new Fixed(0.0); + } + + virtual double generate(double U = -1.0) { + double Uc = U; + if (pv.size() > 0 && U < 0.0) U = drand48(); + + double sum = 0; + + for (auto p: pv) { + sum += p.first; + if (U < sum) return p.second; + } + + return def->generate(Uc); + } + + void add(double p, double v) { + pv.push_back(std::pair(p, v)); + } + +private: + Generator *def; + std::vector< std::pair > pv; +}; + +class KeyGenerator { +public: + KeyGenerator(Generator* _g, double _max = 10000) : g(_g), max(_max) {} + std::string generate(uint64_t ind) { + uint64_t h = fnv_64(ind); + double U = (double) h / (double)ULLONG_MAX; + double G = g->generate(U); + int keylen = MAX(round(G), floor(log10(max)) + 1); + char key[256]; + snprintf(key, 256, "%0*" PRIu64, keylen, ind); + + // D("%d = %s", ind, key); + return std::string(key); + } +private: + Generator* g; + double max; +}; + +Generator* createGenerator(std::string str); +Generator* createFacebookKey(); +Generator* createFacebookValue(); +Generator* createFacebookIA(); + +#endif // GENERATOR_H \ No newline at end of file diff --git a/inc/pkt.h b/inc/pkt.h index 8fbddbd..4bcbcc8 100644 --- a/inc/pkt.h +++ b/inc/pkt.h @@ -20,23 +20,49 @@ constexpr static uint32_t ETHER_FRAME_MAGIC = 0xDCDCE5E5; -struct packet_hdr { +struct ptp_hdr { + uint8_t ptp_msg_type; + uint8_t ptp_ver; + uint8_t unused[34]; +} __attribute__((packed)); + +struct pkt_proto_hdr { struct rte_ether_hdr eth_hdr; struct rte_ipv4_hdr ipv4_hdr; struct rte_udp_hdr udp_hdr; - uint8_t ptp_msg_type; - uint8_t ptp_ver; + struct ptp_hdr ptp_hdr; } __attribute__((packed)); -struct packet_data -{ - struct packet_hdr pkt_hdr; +struct pkt_hdr { + struct pkt_proto_hdr hdr; + uint16_t type; uint32_t magic; + char payload[0]; +} __attribute__((packed)); + +constexpr static uint16_t PKT_TYPE_LOAD = 0; +constexpr static uint16_t PKT_TYPE_PROBE = 1; +constexpr static uint16_t PKT_TYPE_RESP = 2; +struct pkt_payload_epoch { uint32_t epoch; - 
uint64_t clt_ts_tx; - uint64_t clt_ts_rx; - uint64_t srv_ts_tx; - uint64_t srv_ts_rx; +}; + +constexpr static uint16_t PKT_TYPE_STAT = 3; +struct pkt_payload_stat { + uint32_t epoch; + uint64_t hw_rx; + uint64_t hw_tx; + uint64_t sw_rx; + uint64_t sw_tx; +}; + +constexpr static uint16_t NUM_PKT_TYPES = PKT_TYPE_STAT + 1; +// for fast packet verification +static const uint32_t expected_payload_size[NUM_PKT_TYPES] { + sizeof(struct pkt_payload_epoch), // LOAD + sizeof(struct pkt_payload_epoch), // PROBE + sizeof(struct pkt_payload_epoch), // RESP + sizeof(struct pkt_payload_stat) // STAT }; static inline void @@ -105,14 +131,17 @@ dump_pkt(struct rte_mbuf *pkt) } + +// fills the packet with the information except for the payload itself static inline -struct packet_data * construct_udp_pkt_hdr(struct rte_mbuf * buf, +struct pkt_hdr * construct_pkt_hdr(struct rte_mbuf * buf, uint16_t type, struct rte_ether_addr * src_mac, struct rte_ether_addr * dst_mac, uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port) { rte_pktmbuf_reset(buf); - struct packet_data * pkt_data = (struct packet_data *)rte_pktmbuf_append(buf, sizeof(struct packet_data)); + const uint32_t total_sz = sizeof(struct pkt_hdr) + expected_payload_size[type]; + struct pkt_hdr * pkt_data = (struct pkt_hdr *)rte_pktmbuf_append(buf, total_sz); struct rte_ether_hdr * eth_hdr; struct rte_ipv4_hdr * ipv4_hdr; struct rte_udp_hdr * udp_hdr; @@ -124,14 +153,14 @@ struct packet_data * construct_udp_pkt_hdr(struct rte_mbuf * buf, buf->nb_segs = 1; // construct l2 header - eth_hdr = &pkt_data->pkt_hdr.eth_hdr; + eth_hdr = &pkt_data->hdr.eth_hdr; rte_ether_addr_copy(src_mac, &eth_hdr->s_addr); rte_ether_addr_copy(dst_mac, &eth_hdr->d_addr); eth_hdr->ether_type = rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4); buf->l2_len = sizeof(struct rte_ether_hdr); // construct l3 header - ipv4_hdr = &pkt_data->pkt_hdr.ipv4_hdr; + ipv4_hdr = &pkt_data->hdr.ipv4_hdr; memset(ipv4_hdr, 0, sizeof(struct rte_ipv4_hdr)); 
ipv4_hdr->version_ihl = IP_VHL_DEF; ipv4_hdr->type_of_service = 0; @@ -141,46 +170,62 @@ struct packet_data * construct_udp_pkt_hdr(struct rte_mbuf * buf, ipv4_hdr->packet_id = 0; ipv4_hdr->src_addr = rte_cpu_to_be_32(src_ip); ipv4_hdr->dst_addr = rte_cpu_to_be_32(dst_ip); - ipv4_hdr->total_length = rte_cpu_to_be_16(sizeof(struct packet_data) - sizeof(struct rte_ether_hdr)); + ipv4_hdr->total_length = rte_cpu_to_be_16(total_sz - sizeof(struct rte_ether_hdr)); ipv4_hdr->hdr_checksum = 0; buf->l3_len = sizeof(struct rte_ipv4_hdr); // construct l4 header - udp_hdr = &pkt_data->pkt_hdr.udp_hdr; + udp_hdr = &pkt_data->hdr.udp_hdr; udp_hdr->src_port = rte_cpu_to_be_16(src_port); udp_hdr->dst_port = rte_cpu_to_be_16(dst_port); udp_hdr->dgram_cksum = 0; /* No UDP checksum. */ - udp_hdr->dgram_len = rte_cpu_to_be_16(sizeof(struct packet_data) - + udp_hdr->dgram_len = rte_cpu_to_be_16(total_sz - sizeof(struct rte_ether_hdr) - sizeof(struct rte_udp_hdr)); buf->l4_len = sizeof(struct rte_udp_hdr); - // construct ptp header - // so basically after some experiments at least the intel NIC categorizes PTP packets as: - // 1. Dest port 319 2. these two fields must be set and don't care about others - // Experiments: - // SPORT: 319 DPORT: 319 PTP HDR: valid => SET - // SPORT: 333 DPORT: 319 PTP HDR: valid => SET - // SPORT: 319 DPORT: 333 PTP HDR: valid => NOT SET - // SPORT: 319 DPORT: 319 PTP HDR: invalid => NOT SET - pkt_data->pkt_hdr.ptp_ver = 0x2; // VER 2 - pkt_data->pkt_hdr.ptp_msg_type = 0x0; // SYNC + /* construct ptp header + * so basically after some experiments at least the intel NIC categorizes PTP packets as: + * 1. Dest port 319 2. 
these two fields must be set and don't care about others + * Experiments: + * SPORT: 319 DPORT: 319 PTP HDR: valid => SET + * SPORT: 333 DPORT: 319 PTP HDR: valid => SET + * SPORT: 319 DPORT: 333 PTP HDR: valid => NOT SET + * SPORT: 319 DPORT: 319 PTP HDR: invalid => NOT SET */ + pkt_data->hdr.ptp_hdr.ptp_msg_type = 0x0; // SYNC + pkt_data->type = rte_cpu_to_be_16(type); + pkt_data->magic = rte_cpu_to_be_32(ETHER_FRAME_MAGIC); + if (type == PKT_TYPE_PROBE) { + // only timestamp PROBE pkts + pkt_data->hdr.ptp_hdr.ptp_ver = 0x2; // VER 2 + buf->ol_flags |= PKT_TX_IEEE1588_TMST; + } else { + pkt_data->hdr.ptp_hdr.ptp_ver = 0xff; + } return pkt_data; } static inline -struct packet_data * check_valid_packet(struct rte_mbuf * pkt) +struct pkt_hdr * check_valid_packet(struct rte_mbuf * pkt) { - struct packet_data * pkt_data = NULL; + struct pkt_hdr * pkt_data = NULL; + const uint32_t data_len = rte_pktmbuf_data_len(pkt); - if (rte_pktmbuf_data_len(pkt) < sizeof(struct packet_data)) { + if (data_len < sizeof(struct pkt_hdr)) { return NULL; } - pkt_data = rte_pktmbuf_mtod(pkt, struct packet_data *); + pkt_data = rte_pktmbuf_mtod(pkt, struct pkt_hdr *); - if (rte_be_to_cpu_32(pkt_data->magic) == ETHER_FRAME_MAGIC) { + // check MAGIC + if (rte_be_to_cpu_32(pkt_data->magic) != ETHER_FRAME_MAGIC) { + return NULL; + } + + // check type and payload size + if ((rte_be_to_cpu_16(pkt_data->type) < NUM_PKT_TYPES) && + (data_len >= (sizeof(struct pkt_hdr) + expected_payload_size[rte_be_to_cpu_16(pkt_data->type)]))) { return pkt_data; } diff --git a/inc/util.h b/inc/util.h new file mode 100644 index 0000000..9eca27a --- /dev/null +++ b/inc/util.h @@ -0,0 +1,17 @@ +#pragma once +#include +#include +#include + +constexpr static unsigned int S2NS = 1000000000UL; // nanoseconds per second (1e9) +constexpr static uint16_t SERVER_LOAD_PORT = 1234; +constexpr static uint16_t SERVER_PROBE_PORT = 319; +constexpr static uint32_t SERVER_IP = RTE_IPV4(192,168,123,0); + +static inline uint64_t +get_time_us() +{ + struct timespec 
ts; + clock_gettime(CLOCK_REALTIME, &ts); + return ts.tv_sec * 1000000 + ts.tv_nsec / 1000; +} diff --git a/khat/khat.cc b/khat/khat.cc index 61125ec..fc31c0c 100644 --- a/khat/khat.cc +++ b/khat/khat.cc @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -15,72 +16,126 @@ #include #include #include +#include #include "pkt.h" #include "ntrlog.h" -#include "rte_arp.h" -#include "rte_mbuf_core.h" +#include "util.h" NTR_DECL_IMPL; -constexpr unsigned int MBUF_MAX_COUNT = 8191; -constexpr unsigned int MBUF_CACHE_SIZE = 250; -constexpr unsigned int RX_RING_SIZE = 1024; -constexpr unsigned int TX_RING_SIZE = 1024; -constexpr unsigned int RX_RING_NUM = 1; -constexpr unsigned int TX_RING_NUM = 1; -constexpr unsigned int BURST_SIZE = 32; +constexpr static unsigned int MBUF_MAX_COUNT = 16384; +constexpr static unsigned int MBUF_CACHE_SIZE = 512; +constexpr static unsigned int RX_RING_SIZE = 4096; +constexpr static unsigned int TX_RING_SIZE = 4096; +constexpr static unsigned int BURST_SIZE = 32; + static const struct rte_eth_conf port_conf_default{}; +// keep track of the probe state +// when a probe packet first arrives this state is set to be influx and the rte_mbuf's userdata is set to PROBE_MAGIC +// which prevents other probe packets to be processed +// when the server sends the probe stats back to user influx is released +// this is to guarantee that the server only processes one probe packet at the time +// XXX: also this can be attached to the mbuf itself and processed by the lcore thread +// I kept this global because globally there could be only one pending probe request +// and rx_add_timestamp can save their shit here too +struct probe_state_t { + struct pkt_proto_hdr hdr; + uint32_t epoch; + uint32_t timesync; + uint64_t last_sw_rx; + uint64_t last_sw_tx; + uint64_t last_hw_rx; + uint64_t last_hw_tx; +}; + struct options_t { //states uint16_t s_portid; struct rte_ether_addr s_host_mac; struct rte_mempool * s_pkt_mempool; + std::atomic 
s_probe_influx{0}; + struct probe_state_t s_probe_info; }; -struct options_t options; +static struct options_t options; static uint16_t rx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused, struct rte_mbuf **pkts, uint16_t nb_pkts, uint16_t max_pkts __rte_unused, void *_ __rte_unused) { uint64_t now = rte_rdtsc(); - struct packet_data * pkt_data; + struct timespec ts; + struct pkt_hdr * pkt_data; for (int i = 0; i < nb_pkts; i++) { pkt_data = check_valid_packet(pkts[i]); if (pkt_data == NULL) { ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: ignoring invalid packet %p.\n", (void*)pkts[i]); - continue; + continue; } - ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "rx_add_timestamp: tagged packet %p with %llu.\n", (void*)pkts[i], now); - pkt_data->srv_ts_rx = rte_cpu_to_be_64(now); + if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE) { + int one = 0; + pkts[i]->userdata = nullptr; + if (rte_eth_timesync_read_rx_timestamp(port, &ts, pkts[i]->timesync & 0x3) == 0) { + if (options.s_probe_influx.compare_exchange_strong(one, 1)) { + // mark the mbuf as probe packet being processed + pkts[i]->userdata = &options.s_probe_info; + // jot down some information and stats + options.s_probe_info.epoch = rte_be_to_cpu_32(((struct pkt_payload_epoch *)pkt_data->payload)->epoch); + options.s_probe_info.last_hw_rx = ts.tv_nsec + ts.tv_sec * S2NS; + options.s_probe_info.last_sw_rx = now; + options.s_probe_info.timesync = pkts[i]->timesync; + // copy protocol header + rte_memcpy(&options.s_probe_info.hdr, &pkt_data->hdr, sizeof(struct pkt_proto_hdr)); + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: tagged packet %p epoch %d with sw: %llu hw:%llu.\n", (void*)pkts[i], options.s_probe_info.epoch, now, options.s_probe_info.last_hw_rx); + } else + ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "rx_add_timestamp: packet %p not tagged - another probe packet influx.\n", (void*)pkts[i]); + } else + ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "rx_add_timestamp: packet %p not tagged 
- hw rx timestamp not available.\n", (void*)pkts[i]); + } else + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], pkt_data->type); } return nb_pkts; } static uint16_t -tx_calc_latency(uint16_t port __rte_unused, uint16_t qidx __rte_unused, +tx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused, struct rte_mbuf **pkts, uint16_t nb_pkts, void *_ __rte_unused) { uint64_t now = rte_rdtsc(); - struct packet_data * pkt_data; + struct pkt_hdr * pkt_data; for (int i = 0; i < nb_pkts; i++) { pkt_data = check_valid_packet(pkts[i]); if (pkt_data == NULL) { - ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_calc_latency: ignoring invalid packet %p.\n", (void*)pkts[i]); + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: ignoring invalid packet %p.\n", (void*)pkts[i]); continue; } - ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "tx_calc_latency: tagged packet %p with %llu.\n", (void*)pkts[i], now); - pkt_data->srv_ts_tx = rte_cpu_to_be_64(now); + if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_STAT) { + // this packet is the response to PROBE packets + + // at this time the packet is not sent to the NIC yet so + // the state must be influx + // XXX: this should be an assert + if(options.s_probe_influx.load() != 1 || pkts[i]->userdata == nullptr) { + rte_exit(EXIT_FAILURE, "packet %p sent to NIC before sw callback\n", (void*)pkts[i]); + } + + // just tag the option + ((struct probe_state_t *)pkts[i]->userdata)->last_sw_tx = now; + + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: tagged packet %p with sw tx %llu\n", (void*)pkts[i], options.s_probe_info.last_sw_tx); + } else { + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: packet %p not tagged - type %d\n", (void*)pkts[i], pkt_data->type); + } } return nb_pkts; @@ -90,10 +145,15 @@ static int locore_main(void * _unused __rte_unused) { struct rte_mbuf *bufs[BURST_SIZE]; - struct rte_mbuf *tx_bufs[BURST_SIZE]; - struct packet_data *pkt_data; + // + 1 
because it might involve an extra PKT_TYPE_STAT packet + // when all tx timestamps are ready + struct rte_mbuf *tx_bufs[BURST_SIZE + 1]; + struct pkt_hdr *pkt_data; + struct probe_state_t *probe_state; uint32_t core_id = rte_lcore_id(); + bool pending_hw_txts = false; + if (rte_eth_dev_socket_id(options.s_portid) > 0 && rte_eth_dev_socket_id(options.s_portid) != (int)rte_socket_id()) { ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: WARNING, port %d is on remote NUMA node to " "polling thread.\n\tPerformance will " @@ -105,13 +165,12 @@ locore_main(void * _unused __rte_unused) while(true) { uint16_t nb_tx = 0; const uint16_t nb_rx = rte_eth_rx_burst(options.s_portid, 0, bufs, BURST_SIZE); + struct rte_mbuf * pkt_buf; + struct pkt_hdr * tx_data; - if (nb_rx == 0) { - continue; - } - for(int i = 0; i < nb_rx; i++) { - + // XXX: optimization: in rx_add_timestamp every packet is already validated once + // can just mark valid packet with a value so we can avoid this redundant check pkt_data = check_valid_packet(bufs[i]); if (pkt_data == NULL) { @@ -121,83 +180,122 @@ locore_main(void * _unused __rte_unused) continue; } - uint32_t dst_ip = rte_be_to_cpu_32(pkt_data->pkt_hdr.ipv4_hdr.dst_addr); - uint32_t src_ip = rte_be_to_cpu_32(pkt_data->pkt_hdr.ipv4_hdr.src_addr); - uint16_t src_port = rte_be_to_cpu_16(pkt_data->pkt_hdr.udp_hdr.src_port); - uint16_t dst_port = rte_be_to_cpu_16(pkt_data->pkt_hdr.udp_hdr.dst_port); - ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d packet %p from %d.%d.%d.%d(%x:%x:%x:%x:%x:%x) to %d.%d.%d.%d(%x:%x:%x:%x:%x:%x), sport %d, dport %d, epoch %d\n", + uint32_t dst_ip = rte_be_to_cpu_32(pkt_data->hdr.ipv4_hdr.dst_addr); + uint32_t src_ip = rte_be_to_cpu_32(pkt_data->hdr.ipv4_hdr.src_addr); + uint16_t src_port = rte_be_to_cpu_16(pkt_data->hdr.udp_hdr.src_port); + uint16_t dst_port = rte_be_to_cpu_16(pkt_data->hdr.udp_hdr.dst_port); + ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d packet %p from 
%d.%d.%d.%d(%x:%x:%x:%x:%x:%x) to %d.%d.%d.%d(%x:%x:%x:%x:%x:%x), sport %d, dport %d, type %d\n", core_id, (void*)bufs[i], (src_ip >> 24) & 0xff, (src_ip >> 16) & 0xff, (src_ip >> 8) & 0xff, (src_ip >> 0) & 0xff, - pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[0], - pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[1], - pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[2], - pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[3], - pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[4], - pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[5], + pkt_data->hdr.eth_hdr.s_addr.addr_bytes[0], + pkt_data->hdr.eth_hdr.s_addr.addr_bytes[1], + pkt_data->hdr.eth_hdr.s_addr.addr_bytes[2], + pkt_data->hdr.eth_hdr.s_addr.addr_bytes[3], + pkt_data->hdr.eth_hdr.s_addr.addr_bytes[4], + pkt_data->hdr.eth_hdr.s_addr.addr_bytes[5], (dst_ip >> 24) & 0xff, (dst_ip >> 16) & 0xff, (dst_ip >> 8) & 0xff, (dst_ip >> 0) & 0xff, - pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[0], - pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[1], - pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[2], - pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[3], - pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[4], - pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[5], + pkt_data->hdr.eth_hdr.d_addr.addr_bytes[0], + pkt_data->hdr.eth_hdr.d_addr.addr_bytes[1], + pkt_data->hdr.eth_hdr.d_addr.addr_bytes[2], + pkt_data->hdr.eth_hdr.d_addr.addr_bytes[3], + pkt_data->hdr.eth_hdr.d_addr.addr_bytes[4], + pkt_data->hdr.eth_hdr.d_addr.addr_bytes[5], src_port, dst_port, - rte_be_to_cpu_32(pkt_data->epoch)); - //if (bufs[i]->ol_flags & PKT_RX_IEEE1588_TMST) { - struct timespec ts; - if (rte_eth_timesync_read_rx_timestamp(options.s_portid, &ts, bufs[i]->timesync & 0x3) == 0) { + rte_be_to_cpu_16(pkt_data->type)); + + switch (rte_be_to_cpu_16(pkt_data->type)) { + case PKT_TYPE_PROBE: + if (bufs[i]->userdata != nullptr) { + probe_state = (struct probe_state_t *)bufs[i]->userdata; + pending_hw_txts = true; + } + break; + case PKT_TYPE_LOAD: + // swap s_addr and d_addr + // XXX: can we 
avoid this allocation? + pkt_buf = rte_pktmbuf_alloc(options.s_pkt_mempool); + if (pkt_buf == NULL) { + rte_exit(EXIT_FAILURE, "locore_main: failed to allocate memory for pkt_buf\n"); + } - ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "Timestamped sec: %lld nsec: %lld\n", ts.tv_sec, ts.tv_nsec); - } else { - ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "Not Timestamped!\n"); - } - // } - // else { - // ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "Not Timestamped!\n"); - // } - - // swap s_addr and d_addr - struct rte_mbuf * pkt_buf = rte_pktmbuf_alloc(options.s_pkt_mempool); - if (pkt_buf == NULL) { - rte_exit(EXIT_FAILURE, "locore_main: failed to allocate memory for pkt_buf"); - } + tx_data = construct_pkt_hdr(pkt_buf, PKT_TYPE_RESP, + &options.s_host_mac, + &pkt_data->hdr.eth_hdr.s_addr, + dst_ip, + src_ip, + dst_port, + src_port); + + if (tx_data == NULL) { + rte_exit(EXIT_FAILURE, "failed to construct tx packet %p", (void*)pkt_buf); + } - struct packet_data * tx_data = construct_udp_pkt_hdr(pkt_buf, - &options.s_host_mac, - &pkt_data->pkt_hdr.eth_hdr.s_addr, - dst_ip, - src_ip, - dst_port, - src_port); - if (tx_data == NULL) { - rte_exit(EXIT_FAILURE, "failed to construct tx packet %p", (void*)pkt_buf); + // endianess doesn't matter + rte_memcpy(tx_data->payload, pkt_data->payload, sizeof(struct pkt_payload_epoch)); + tx_data->magic = pkt_data->magic; + + // queue for burst send + tx_bufs[nb_tx++] = pkt_buf; + // free rx packet + rte_pktmbuf_free(bufs[i]); + break; + default: + continue; } - // copy, endianess doesn't matter - tx_data->epoch = pkt_data->epoch; - tx_data->magic = pkt_data->magic; - tx_data->clt_ts_rx = pkt_data->clt_ts_rx; - tx_data->clt_ts_tx = pkt_data->clt_ts_tx; - tx_data->srv_ts_rx = pkt_data->srv_ts_rx; - tx_data->srv_ts_tx = pkt_data->srv_ts_tx; - // queue for burst send - tx_bufs[nb_tx++] = pkt_buf; - // free rx packet - rte_pktmbuf_free(bufs[i]); } - const uint16_t nb_tx_succ = rte_eth_tx_burst(options.s_portid, 0, tx_bufs, nb_tx); - // cleanup unsent packets - 
// don't need to free others because it's offloaded - if (nb_tx_succ < nb_tx) { - rte_exit(EXIT_FAILURE, "locore_main: failed to send some packets.\n"); + // send the packets + if (nb_tx > 0) { + const uint16_t nb_tx_succ = rte_eth_tx_burst(options.s_portid, 0, tx_bufs, nb_tx); + if (nb_tx_succ < nb_tx) { + rte_exit(EXIT_FAILURE, "locore_main: failed to send some packets.\n"); + } + } + + // we wanna check every loop not only when there are packets + if (pending_hw_txts) { + struct timespec ts; + struct pkt_payload_stat * stat; + if (rte_eth_timesync_read_tx_timestamp(options.s_portid, &ts) == 0) { + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: obtained hw tx timestamp %lld.\n", ts.tv_sec * S2NS + ts.tv_nsec); + // now we have everything we need + pkt_buf = rte_pktmbuf_alloc(options.s_pkt_mempool); + if (pkt_buf == NULL) { + rte_exit(EXIT_FAILURE, "locore_main: failed to allocate memory for pkt_buf\n"); + } + + tx_data = construct_pkt_hdr(pkt_buf, PKT_TYPE_STAT, + &options.s_host_mac, + &probe_state->hdr.eth_hdr.s_addr, + rte_be_to_cpu_32(probe_state->hdr.ipv4_hdr.dst_addr), + rte_be_to_cpu_32(probe_state->hdr.ipv4_hdr.src_addr), + rte_be_to_cpu_16(probe_state->hdr.udp_hdr.dst_port), + rte_be_to_cpu_16(probe_state->hdr.udp_hdr.dst_port)); + + // populate stats + stat = (struct pkt_payload_stat *)tx_data->payload; + stat->epoch = rte_cpu_to_be_64(probe_state->epoch); + stat->hw_rx = rte_cpu_to_be_64(probe_state->last_hw_rx); + stat->hw_tx = rte_cpu_to_be_64(probe_state->last_hw_tx); + stat->sw_rx = rte_cpu_to_be_64(probe_state->last_sw_rx); + stat->sw_tx = rte_cpu_to_be_64(probe_state->last_sw_tx); + tx_data->magic = rte_cpu_to_be_32(ETHER_FRAME_MAGIC); + + // queue for burst send + tx_bufs[nb_tx++] = pkt_buf; + + // release flux + pending_hw_txts = false; + options.s_probe_influx.store(0); + } } } @@ -232,7 +330,7 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool) port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE; /* Configure the Ethernet 
device. */ - ret = rte_eth_dev_configure(portid, RX_RING_NUM, TX_RING_NUM, &port_conf); + ret = rte_eth_dev_configure(portid, 1, 1, &port_conf); if (ret != 0) return ret; @@ -242,7 +340,7 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool) /* Allocate and set up 1 RX queue per Ethernet port. */ rxconf = dev_info.default_rxconf; - for (uint32_t i = 0; i < RX_RING_NUM; i++) { + for (uint32_t i = 0; i < 1; i++) { ret = rte_eth_rx_queue_setup(portid, i, nb_rxd, rte_eth_dev_socket_id(portid), &rxconf, mbuf_pool); if (ret < 0) return ret; @@ -251,7 +349,7 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool) txconf = dev_info.default_txconf; txconf.offloads = port_conf.txmode.offloads; /* Allocate and set up 1 TX queue per Ethernet port. */ - for (uint32_t i = 0; i < TX_RING_NUM; i++) { + for (uint32_t i = 0; i < 1; i++) { ret = rte_eth_tx_queue_setup(portid, i, nb_txd, rte_eth_dev_socket_id(portid), &txconf); if (ret < 0) return ret; @@ -276,7 +374,7 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool) if (ret != 0) return ret; - if (rte_eth_add_tx_callback(portid, 0, tx_calc_latency, NULL) == NULL || rte_eth_add_rx_callback(portid, 0, rx_add_timestamp, NULL) == NULL) { + if (rte_eth_add_tx_callback(portid, 0, tx_add_timestamp, NULL) == NULL || rte_eth_add_rx_callback(portid, 0, rx_add_timestamp, NULL) == NULL) { return -1; } @@ -291,11 +389,18 @@ static void usage() " -h: display the information\n"); } +// static void int_handler(int) +// { +// //rte_exit(EXIT_SUCCESS, "Caught SIGINT, exiting...\n"); +// } + int main(int argc, char* argv[]) { unsigned int nb_ports; struct rte_mempool *mbuf_pool, *mbuf_pool_pkt; + //signal(SIGINT, int_handler); + // init dpdk int ret = rte_eal_init(argc, argv); if (ret < 0) {