stuff

This commit is contained in:
parent 73c70a5c52
commit 855b9cf714

cat/cat.cc (329 changed lines)

@@ -11,68 +11,117 @@
 #include <rte_ether.h>
 #include <rte_launch.h>
 #include <rte_log.h>
+#include <rte_byteorder.h>
+#include <rte_ip.h>
 #include <atomic>
 #include <vector>
 #include <fstream>
 #include <unistd.h>

+#include "generator.h"
 #include "ntrlog.h"
 #include "pkt.h"
-#include "rte_byteorder.h"
-#include "rte_ip.h"
+#include "util.h"

 // init NTRLOG
 NTR_DECL_IMPL;

-constexpr unsigned int MBUF_MAX_COUNT = 8191;
-constexpr unsigned int MBUF_CACHE_SIZE = 250;
-constexpr unsigned int RX_RING_SIZE = 1024;
-constexpr unsigned int TX_RING_SIZE = 1024;
-constexpr unsigned int RX_RING_NUM = 1;
-constexpr unsigned int TX_RING_NUM = 1;
-constexpr unsigned int BURST_SIZE = 32;
+constexpr static unsigned int MBUF_MAX_COUNT = 16384;
+constexpr static unsigned int MBUF_CACHE_SIZE = 512;
+constexpr static unsigned int RX_RING_SIZE = 4096;
+constexpr static unsigned int TX_RING_SIZE = 4096;
+constexpr static unsigned int BURST_SIZE = 32;
+constexpr static unsigned int MODE_MASTER = 0;
+constexpr static unsigned int MODE_CLIENT = 1;

 static const struct rte_eth_conf port_conf_default{};

-struct datapt{
-    uint64_t server_proc = 0;
-    uint64_t rtt = 0;
+struct sendpt {
+    uint32_t epoch;
+    uint32_t valid;
+    uint64_t clt_hw_tx;
+    uint64_t clt_sw_tx;
+};
+
+struct recvpt {
+    uint32_t epoch;
+    uint32_t valid;
+    uint64_t clt_hw_rx;
+    uint64_t clt_sw_rx;
+    uint64_t srv_hw_tx;
+    uint64_t srv_sw_tx;
+    uint64_t srv_hw_rx;
+    uint64_t srv_sw_rx;
+};
+
+struct thread_info {
+    unsigned int id;
+    unsigned int rxqid{0};
+    unsigned int txqid{0};
+    std::vector<struct sendpt *> send_data;
+    std::vector<struct recvpt *> recv_data;
+    unsigned int tot_send{0};
+    unsigned int tot_recv{0};
+    Generator * ia_gen;
 };

 struct options_t {
-    unsigned int run_time = 5;
-    unsigned int warmup_time = 0;
+    unsigned int run_time{5};
+    unsigned int warmup_time{0};
+    unsigned int num_threads{1};
+    unsigned int mode{MODE_MASTER};
     char output[256] = "output.txt";
+    char ia_gen[256] = "fixed:1";
     struct rte_ether_addr server_mac;
+    uint64_t cpu_mask;
     // states
-    std::atomic<bool> s_stop {false};
-    std::atomic<bool> s_record {false};
-    std::vector<struct datapt *> s_stats;
-    struct rte_mempool * s_mbuf_pool;
-    uint16_t s_portid;
+    struct rte_mempool * mbuf_pool;
     struct rte_ether_addr s_host_mac;
+    uint16_t s_portid;
+    std::vector<struct thread_info *> s_thr_info;
+    std::atomic<uint32_t> s_epoch;
+    std::atomic<bool> s_stop {false};
+    std::atomic<uint32_t> s_record {0};
 };

-struct options_t options;
+static struct options_t options;

 static uint16_t
-rx_calc_latency(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
+rx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
     struct rte_mbuf **pkts, uint16_t nb_pkts, uint16_t max_pkts __rte_unused, void *_ __rte_unused)
 {
-    // XXX: need to get the timestamp in every loop?
     uint64_t now = rte_rdtsc();
-    struct packet_data * pkt_data;
+    struct pkt_hdr * pkt_data;
+    struct timespec ts;
+    int ret;

     for (int i = 0; i < nb_pkts; i++) {
         pkt_data = check_valid_packet(pkts[i]);

         if (pkt_data == NULL) {
-            ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_calc_latency: ignoring invalid packet 0x%p.\n", (void*)pkts[i]);
+            ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: ignoring invalid packet 0x%p.\n", (void*)pkts[i]);
             continue;
         }

-        ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "rx_calc_latency: tagged packet %p with %llu.\n", (void*)pkts[i], now);
-        pkt_data->clt_ts_rx = rte_cpu_to_be_64(now);
+        if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_STAT) {
+            pkts[i]->userdata = nullptr;
+            if ((ret = rte_eth_timesync_read_rx_timestamp(port, &ts, pkts[i]->timesync & 0x3)) == 0) {
+                // has hw rx timestamp
+                struct recvpt * datapt = new struct recvpt;
+                datapt->valid = options.s_record.load();
+                datapt->clt_hw_rx = ts.tv_sec * S2NS + ts.tv_nsec;
+                datapt->clt_sw_rx = now;
+                pkts[i]->userdata = datapt;
+                ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: tagged packet %p with sw: %llu hw: %llu.\n", (void*)pkts[i], now, datapt->clt_hw_rx);
+            } else {
+                // leave as null
+                ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "rx_add_timestamp: packet %p not tagged - hw ts not available - %d.\n", (void*)pkts[i], ret);
+            }
+        } else {
+            ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], pkt_data->type);
+        }
     }

     return nb_pkts;
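Review note: the sendpt/recvpt pair above splits what the old datapt kept in one place — the client now records its own hardware/software TX and RX stamps, and the server's four stamps arrive later in the STAT payload. A standalone sketch of how the four hardware stamps could be combined afterwards (the helper below is hypothetical post-processing, not code in this commit; each subtraction is taken on a single NIC's clock, so no cross-machine synchronization is required):

```cpp
#include <cstdint>

// Hypothetical post-processing helper; field names mirror sendpt/recvpt above.
struct probe_sample {
    uint64_t clt_hw_tx; // client NIC: probe left
    uint64_t clt_hw_rx; // client NIC: reply arrived
    uint64_t srv_hw_rx; // server NIC: probe arrived
    uint64_t srv_hw_tx; // server NIC: reply left
};

// Client-side round trip, as the old datapt::rtt measured it.
static inline uint64_t rtt(const probe_sample &s) { return s.clt_hw_rx - s.clt_hw_tx; }

// Server-side dwell time, as the old datapt::server_proc measured it.
static inline uint64_t server_proc(const probe_sample &s) { return s.srv_hw_tx - s.srv_hw_rx; }

// Network portion of the round trip, with the server dwell time removed.
static inline uint64_t net_latency(const probe_sample &s) { return rtt(s) - server_proc(s); }
```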
@@ -82,38 +131,46 @@ static uint16_t
 tx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
     struct rte_mbuf **pkts, uint16_t nb_pkts, void *_ __rte_unused)
 {
-    // XXX: need to get the timestamp in every loop?
     uint64_t now = rte_rdtsc();
-    struct packet_data * pkt_data;
+    struct pkt_hdr * pkt_data;

     for (int i = 0; i < nb_pkts; i++) {
         pkt_data = check_valid_packet(pkts[i]);

         if (pkt_data == NULL) {
             ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: ignoring invalid packet 0x%p.\n", (void*)pkts[i]);
             continue;
         }

-        ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "tx_add_timestamp: tagged packet %p with %llu.\n", (void*)pkts[i], now);
-        pkt_data->clt_ts_tx = rte_cpu_to_be_64(now);
+        if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE) {
+            ((struct sendpt *)pkts[i]->userdata)->clt_sw_tx = now;
+            ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: tagged packet %p with sw: %llu.\n", (void*)pkts[i], now);
+        } else {
+            ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], pkt_data->type);
+        }
     }

     return nb_pkts;
 }

-#define STATE_SEND (0)
-#define STATE_RECV (1)

 static int
-locore_main(void * _unused __rte_unused)
+locore_main(void * tif)
 {
+    struct thread_info * tinfo = (struct thread_info *)tif;
     struct rte_mbuf *tx_buf;
     struct rte_mbuf *rx_bufs[BURST_SIZE];
-    struct packet_data *pkt_data;
+    struct pkt_hdr *pkt_data;
     uint32_t core_id = rte_lcore_id();
-    uint32_t epoch = 0;
-    int state = STATE_SEND;
+    uint32_t epoch;
+    struct pkt_payload_epoch * pld_epoch;
+    struct pkt_payload_stat * pld_stat;
+    struct recvpt * recvpt;
+    int32_t ret;
+
+    struct sendpt * last_sendpt;
+    bool pending_hw_tx;
+
+    uint64_t next_ts;
     // XXX: check link status instead

     sleep(1);
@@ -123,72 +180,101 @@ locore_main(void * _unused __rte_unused)
         "not be optimal.\n", options.s_portid);
     }

-    ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d running...\n", core_id);
+    ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d running thread %d...\n", core_id, tinfo->id);

-    tx_buf = rte_pktmbuf_alloc(options.s_mbuf_pool);
-    if (tx_buf == NULL) {
-        rte_exit(EXIT_FAILURE, "cannot allocate tx_buf\n");
-    }
-
-    pkt_data = construct_udp_pkt_hdr(tx_buf,
-        &options.s_host_mac, &options.server_mac,
-        RTE_IPV4(192, 168, 100, 150), RTE_IPV4(192, 168, 100, 151),
-        333, 319);
-    if (pkt_data == NULL) {
-        rte_exit(EXIT_FAILURE, "cannot allocate space for packet_data in mbuf\n");
-    }
-    pkt_data->magic = rte_cpu_to_be_32(ETHER_FRAME_MAGIC);
+    next_ts = get_time_us();
+    pending_hw_tx = false;

     while(!options.s_stop.load()) {
+        uint64_t now = get_time_us();
         // always pop incoming packets
         const uint16_t nb_rx = rte_eth_rx_burst(options.s_portid, 0, rx_bufs, BURST_SIZE);

-        if (nb_rx != 0) {
-            // only process packets when we are ready to receive
+        if (nb_rx > 0) {
             for (int i = 0; i < nb_rx; i++) {
-                struct packet_data * each = check_valid_packet(rx_bufs[i]);
+                struct pkt_hdr * each = check_valid_packet(rx_bufs[i]);

                 if (each == NULL) {
                     ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: ignoring invalid packet %p.\n", (void*)rx_bufs[i]);
-                    dump_pkt(rx_bufs[i]);
                     rte_pktmbuf_free(rx_bufs[i]);
                     continue;
                 }

-                if (rte_be_to_cpu_32(each->epoch) == epoch && state == STATE_RECV) {
-                    ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: received packet %p for epoch %d\n", (void*)rx_bufs[i], epoch);
-
-                    if (options.s_record.load()) {
-                        // keep statistics
-                        struct datapt * dpt = new datapt;
-                        dpt->rtt = rte_be_to_cpu_64(each->clt_ts_rx) - rte_be_to_cpu_64(each->clt_ts_tx);
-                        dpt->server_proc = rte_be_to_cpu_64(each->srv_ts_tx) - rte_be_to_cpu_64(each->srv_ts_rx);
-                        options.s_stats.push_back(dpt);
-                    }
-
-                    // bump the epoch and stop processing other packets
-                    state = STATE_SEND;
-                    epoch++;
-                } else {
-                    ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: ignoring packet 0x%p with invalid epoch %d.\n", (void*)rx_bufs[i], epoch);
+                uint16_t type = rte_be_to_cpu_16(each->type);
+
+                switch (type) {
+                case PKT_TYPE_RESP:
+                    tinfo->tot_recv++;
+                    ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: received packet %p type %d epoch %d.\n", (void*)rx_bufs[i], type,
+                        rte_be_to_cpu_32(((struct pkt_payload_epoch *)(each->payload))->epoch));
+                    break;
+                case PKT_TYPE_STAT:
+                    tinfo->tot_recv++;
+                    recvpt = (struct recvpt *)rx_bufs[i]->userdata;
+                    pld_stat = (struct pkt_payload_stat *)each->payload;
+                    // keep stats
+                    recvpt->epoch = rte_be_to_cpu_32(pld_stat->epoch);
+                    recvpt->srv_hw_tx = rte_be_to_cpu_32(pld_stat->hw_tx);
+                    recvpt->srv_hw_rx = rte_be_to_cpu_32(pld_stat->hw_rx);
+                    recvpt->srv_sw_tx = rte_be_to_cpu_32(pld_stat->sw_tx);
+                    recvpt->srv_sw_rx = rte_be_to_cpu_32(pld_stat->sw_rx);
+                    tinfo->recv_data.push_back(recvpt);
+                    ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: received packet %p type %d epoch %d.\n", (void*)rx_bufs[i], type, recvpt->epoch);
+                    break;
+                default:
+                    ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: ignoring packet %p with invalid type %d.\n", (void*)rx_bufs[i], type);
+                    rte_pktmbuf_free(rx_bufs[i]);
+                    continue;
                 }

                 rte_pktmbuf_free(rx_bufs[i]);
             }
         }

-        if (state == STATE_SEND) {
-            // set new epoch
-            pkt_data->epoch = rte_cpu_to_be_32(epoch);
-            ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: sending packet %p with epoch %d\n", (void*)tx_buf, epoch);
-
-            const uint16_t nb_tx = rte_eth_tx_burst(options.s_portid, 0, &tx_buf, 1);
-
-            if (nb_tx < 1) {
+        if (now >= next_ts) { //&& !pending_hw_tx) {
+            next_ts += (int)(tinfo->ia_gen->generate() * 1000000.0);
+
+            // generate the packet
+            tx_buf = rte_pktmbuf_alloc(options.mbuf_pool);
+            if (tx_buf == NULL) {
+                rte_exit(EXIT_FAILURE, "cannot allocate tx_buf\n");
+            }
+
+            pkt_data = construct_pkt_hdr(tx_buf, PKT_TYPE_PROBE,
+                &options.s_host_mac, &options.server_mac,
+                RTE_IPV4(192, 168, 100, 150), RTE_IPV4(192, 168, 100, 151), 319, 319);
+            if (pkt_data == NULL) {
+                rte_exit(EXIT_FAILURE, "cannot allocate space for packet_data in mbuf\n");
+            }
+
+            pld_epoch = (struct pkt_payload_epoch *)pkt_data->payload;
+            epoch = options.s_epoch.fetch_add(1);
+            pld_epoch->epoch = rte_cpu_to_be_32(epoch);
+            last_sendpt = new sendpt;
+            last_sendpt->epoch = 0; //epoch;
+            last_sendpt->valid = options.s_record.load();
+            tx_buf->userdata = last_sendpt;
+            pending_hw_tx = true;
+
+            ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: sending packet %p with epoch %d\n", (void*)tx_buf, epoch);
+            const uint16_t nb_tx = rte_eth_tx_burst(options.s_portid, tinfo->txqid, &tx_buf, 1);
+
+            if (nb_tx != 1) {
                 rte_exit(EXIT_FAILURE, "failed to send packet 0x%p, epoch %d\n", (void*)tx_buf, epoch);
             }
-            state = STATE_RECV;
+        }
+
+        if (pending_hw_tx) {
+            struct timespec ts;
+            if ((ret = rte_eth_timesync_read_tx_timestamp(options.s_portid, &ts)) == 0) {
+                ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: read hw tx timestamp %lld.\n", ts.tv_nsec + ts.tv_sec * S2NS);
+                last_sendpt->clt_hw_tx = ts.tv_nsec + ts.tv_sec * S2NS;
+                tinfo->send_data.push_back(last_sendpt);
+                pending_hw_tx = false;
+            } else {
+                //ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: read hw tx timestamp failed - %d.\n", ret);
+            }
         }
     }
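The rewritten locore_main above replaces the old send-then-wait state machine with open-loop pacing: each probe's departure time comes from the inter-arrival generator, and the loop transmits only once the clock passes next_ts. A minimal standalone sketch of that pattern using the commit's own createGenerator() (std::chrono stands in for get_time_us(), the spec string is an arbitrary example, and DPDK headers are assumed to be on the include path because generator.h pulls in util.h):

```cpp
#include <chrono>
#include <cstdio>
#include "generator.h"

static uint64_t now_us()
{
    using namespace std::chrono;
    return duration_cast<microseconds>(steady_clock::now().time_since_epoch()).count();
}

int main()
{
    // Exponential inter-arrival times with rate 100/s (mean gap 10 ms).
    Generator *ia_gen = createGenerator("exponential:100");
    uint64_t next_ts = now_us();

    for (int sent = 0; sent < 10;) {
        uint64_t now = now_us();
        // rx-burst processing would sit here, exactly as in locore_main above
        if (now >= next_ts) {
            // schedule the next departure first, then do the send work
            next_ts += (uint64_t)(ia_gen->generate() * 1000000.0);
            std::printf("probe %d sent at %llu us\n", sent++, (unsigned long long)now);
        }
    }
    delete ia_gen;
    return 0;
}
```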
@@ -228,7 +314,7 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
     port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE;

     /* Configure the Ethernet device. */
-    ret = rte_eth_dev_configure(portid, RX_RING_NUM, TX_RING_NUM, &port_conf);
+    ret = rte_eth_dev_configure(portid, options.num_threads, options.num_threads, &port_conf);
     if (ret != 0)
         return ret;

@@ -236,10 +322,10 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
     if (ret != 0)
         return ret;

-    /* Allocate and set up 1 RX queue per Ethernet port. */
+    /* Allocate and set up 1 RX queue per thread. */
     rxconf = dev_info.default_rxconf;
     rxconf.offloads = port_conf.rxmode.offloads;
-    for (uint32_t i = 0; i < RX_RING_NUM; i++) {
+    for (uint32_t i = 0; i < options.num_threads; i++) {
         ret = rte_eth_rx_queue_setup(portid, i, nb_rxd, rte_eth_dev_socket_id(portid), &rxconf, mbuf_pool);
         if (ret < 0)
             return ret;
@@ -248,7 +334,7 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
     txconf = dev_info.default_txconf;
     txconf.offloads = port_conf.txmode.offloads;
     /* Allocate and set up 1 TX queue per Ethernet port. */
-    for (uint32_t i = 0; i < TX_RING_NUM; i++) {
+    for (uint32_t i = 0; i < options.num_threads; i++) {
         ret = rte_eth_tx_queue_setup(portid, i, nb_txd, rte_eth_dev_socket_id(portid), &txconf);
         if (ret < 0)
             return ret;
@@ -274,7 +360,7 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
     return ret;

     rte_eth_add_tx_callback(portid, 0, tx_add_timestamp, NULL);
-    rte_eth_add_rx_callback(portid, 0, rx_calc_latency, NULL);
+    rte_eth_add_rx_callback(portid, 0, rx_add_timestamp, NULL);

     return 0;
 }
@@ -306,15 +392,27 @@ static void usage()
     " -o: output filename\n" \
     " -t: run time\n" \
     " -T: warmup time\n" \
-    " -s: server's mac\n\n" );
+    " -s: server's mac\n" \
+    " -A: affinity mask\n" \
+    " -a: number of threads\n" \
+    " -C: client mode\n"
+    " -i: inter-arrival time distribution\n\n");
 }

+// static void int_handler(int)
+// {
+//     //rte_exit(EXIT_SUCCESS, "Caught SIGINT, exiting...\n");
+// }

 int main(int argc, char* argv[])
 {
     unsigned int nb_ports;
-    struct rte_mempool *mbuf_pool, *mbuf_pool_pkt;
+    struct rte_mempool *mbuf_pool;
     std::ofstream log_file;
+    struct thread_info *tinfo;

+    // signal(SIGINT, int_handler);

     // init dpdk
     int ret = rte_eal_init(argc, argv);
     if (ret < 0) {
@@ -329,7 +427,7 @@ int main(int argc, char* argv[])
     {
         int c;
         // parse arguments
-        while((c = getopt(argc, argv, "hvo:t:T:s:")) != -1) {
+        while((c = getopt(argc, argv, "hvo:t:T:s:A:a:Ci:")) != -1) {
             switch (c) {
             case 'v':
                 ntr_set_level(NTR_DEP_USER1, ntr_get_level(NTR_DEP_USER1) + 1);
@@ -352,6 +450,18 @@ int main(int argc, char* argv[])
             case 'o':
                 strncpy(options.output, optarg, sizeof(options.output) - 1);
                 break;
+            case 'A':
+                options.cpu_mask = atoll(optarg);
+                break;
+            case 'a':
+                options.num_threads = atoi(optarg);
+                break;
+            case 'C':
+                options.mode = MODE_CLIENT;
+                break;
+            case 'i':
+                strncpy(options.ia_gen, optarg, sizeof(options.ia_gen) - 1);
+                break;
             default:
                 usage();
                 rte_exit(EXIT_FAILURE, "unknown argument: %c\n", c);
@@ -371,24 +481,27 @@ int main(int argc, char* argv[])
         rte_exit(EXIT_FAILURE, "number of ports must be > 0\n");
     }

-    // create a mbuf memory pool on the socket
-    mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", MBUF_MAX_COUNT * nb_ports, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
-    if (mbuf_pool == nullptr) {
-        rte_exit(EXIT_FAILURE, "cannot create mbuf pool\n");
-    }
-
-    mbuf_pool_pkt = rte_pktmbuf_pool_create("MBUF_POOL_PKT", MBUF_MAX_COUNT * nb_ports, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
-    if (mbuf_pool_pkt == nullptr) {
-        rte_exit(EXIT_FAILURE, "cannot create mbuf pool\n");
-    }
-    options.s_mbuf_pool = mbuf_pool_pkt;
-
     uint16_t portid = rte_eth_find_next(0);
     if (portid == RTE_MAX_ETHPORTS) {
         rte_exit(EXIT_FAILURE, "cannot find an available port\n");
     }
     options.s_portid = portid;

+    // create a mbuf memory pool on the socket
+    mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", MBUF_MAX_COUNT, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_eth_dev_socket_id(options.s_portid));
+    if (mbuf_pool == nullptr) {
+        rte_exit(EXIT_FAILURE, "cannot create mbuf pool\n");
+    }
+    options.mbuf_pool = mbuf_pool;
+
+    for(int i = 0; i < 1; i++) {
+        tinfo = new thread_info;
+        tinfo->id = i;
+        tinfo->ia_gen = createGenerator(options.ia_gen);
+        options.s_thr_info.push_back(tinfo);
+    }
+
     if (port_init(portid, mbuf_pool) != 0) {
         rte_exit(EXIT_FAILURE, "cannot init port %d\n", portid);
     }
@@ -407,11 +520,11 @@ int main(int argc, char* argv[])

     dump_options();

-    uint16_t core_id = rte_get_next_lcore(0, true, false);

     sleep(1);

-    if (rte_eal_remote_launch(locore_main, NULL, core_id) != 0) {
+    uint16_t core_id = rte_get_next_lcore(0, true, false);
+
+    if (rte_eal_remote_launch(locore_main, options.s_thr_info.at(0), core_id) != 0) {
         rte_exit(EXIT_FAILURE, "failed to launch function on locore\n");
     }

@@ -423,7 +536,7 @@ int main(int argc, char* argv[])
     uint32_t second = 0;
     while(true) {
         if (second >= options.warmup_time) {
-            options.s_record.store(true);
+            options.s_record.store(1);
         }
         if (second >= options.run_time + options.warmup_time) {
             options.s_stop.store(true);
@@ -437,10 +550,10 @@ int main(int argc, char* argv[])
         rte_exit(EXIT_FAILURE, "failed to wait for job completion\n");

     // dump stats
-    for (auto it = std::begin(options.s_stats); it != std::end(options.s_stats); ++it) {
-        log_file << (*it)->rtt << "," << (*it)->server_proc << std::endl;
-        delete *it;
-    }
+    // for (auto it = std::begin(options.s_stats); it != std::end(options.s_stats); ++it) {
+    //     log_file << (*it)->rtt << "," << (*it)->server_proc << std::endl;
+    //     delete *it;
+    // }
     log_file.close();

     // clean up
cat/generator.cc (new file, 74 lines)

@@ -0,0 +1,74 @@
+// modified from mutilate
+
+#include "generator.h"
+
+Generator* createFacebookKey() { return new GEV(30.7984, 8.20449, 0.078688); }
+
+Generator* createFacebookValue() {
+    Generator* g = new GPareto(15.0, 214.476, 0.348238);
+
+    Discrete* d = new Discrete(g);
+    d->add(0.00536, 0.0);
+    d->add(0.00047, 1.0);
+    d->add(0.17820, 2.0);
+    d->add(0.09239, 3.0);
+    d->add(0.00018, 4.0);
+    d->add(0.02740, 5.0);
+    d->add(0.00065, 6.0);
+    d->add(0.00606, 7.0);
+    d->add(0.00023, 8.0);
+    d->add(0.00837, 9.0);
+    d->add(0.00837, 10.0);
+    d->add(0.08989, 11.0);
+    d->add(0.00092, 12.0);
+    d->add(0.00326, 13.0);
+    d->add(0.01980, 14.0);
+
+    return d;
+}
+
+Generator* createFacebookIA() { return new GPareto(0, 16.0292, 0.154971); }
+
+Generator* createGenerator(std::string str) {
+    if (!strcmp(str.c_str(), "fb_key")) return createFacebookKey();
+    else if (!strcmp(str.c_str(), "fb_value")) return createFacebookValue();
+    else if (!strcmp(str.c_str(), "fb_ia")) return createFacebookIA();
+
+    char *s_copy = new char[str.length() + 1];
+    strcpy(s_copy, str.c_str());
+    char *saveptr = NULL;
+
+    if (atoi(s_copy) != 0 || !strcmp(s_copy, "0")) {
+        double v = atof(s_copy);
+        delete[] s_copy;
+        return new Fixed(v);
+    }
+
+    char *t_ptr = strtok_r(s_copy, ":", &saveptr);
+    char *a_ptr = strtok_r(NULL, ":", &saveptr);
+
+    if (t_ptr == NULL) // || a_ptr == NULL)
+        DIE("strtok(.., \":\") failed to parse %s", str.c_str());
+
+    saveptr = NULL;
+    char *s1 = strtok_r(a_ptr, ",", &saveptr);
+    char *s2 = strtok_r(NULL, ",", &saveptr);
+    char *s3 = strtok_r(NULL, ",", &saveptr);
+
+    double a1 = s1 ? atof(s1) : 0.0;
+    double a2 = s2 ? atof(s2) : 0.0;
+    double a3 = s3 ? atof(s3) : 0.0;
+
+    delete[] s_copy;
+
+    if (strcasestr(str.c_str(), "fixed")) return new Fixed(a1);
+    else if (strcasestr(str.c_str(), "normal")) return new Normal(a1, a2);
+    else if (strcasestr(str.c_str(), "exponential")) return new Exponential(a1);
+    else if (strcasestr(str.c_str(), "pareto")) return new GPareto(a1, a2, a3);
+    else if (strcasestr(str.c_str(), "gev")) return new GEV(a1, a2, a3);
+    else if (strcasestr(str.c_str(), "uniform")) return new Uniform(a1);
+
+    DIE("Unable to create Generator '%s'", str.c_str());
+
+    return NULL;
+}
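A short usage sketch for the parser above; the spec strings follow the syntax comment in generator.h, and the numeric parameters are arbitrary examples (only "fixed:1" appears in this commit, as the client's default -i value):

```cpp
#include "generator.h"

// Assumes the project's include paths (generator.h pulls in util.h / DPDK headers).
void example()
{
    Generator *a = createGenerator("fixed:1");                       // the client's default: 1-second gaps
    Generator *b = createGenerator("2");                             // a bare integer also yields Fixed(2.0)
    Generator *c = createGenerator("exponential:100");               // Exponential with lambda = 100
    Generator *d = createGenerator("pareto:15.0,214.476,0.348238");  // GPareto(loc, scale, shape)
    Generator *e = createGenerator("fb_ia");                         // canned Facebook inter-arrival model

    double gap_seconds = c->generate(); // one sample; cat.cc scales this by 1e6 to get microseconds
    (void)a; (void)b; (void)d; (void)e; (void)gap_seconds;
}
```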
cat/generator.h (new file, 237 lines)

@@ -0,0 +1,237 @@
+// modified from mutilate
+// -*- c++ -*-
+
+// 1. implement "fixed" generator
+// 2. implement discrete generator
+// 3. implement combine generator?
+
+#ifndef GENERATOR_H
+#define GENERATOR_H
+
+#include <netinet/in.h>
+
+#include <string>
+#include <vector>
+#include <utility>
+
+#include <assert.h>
+#include <inttypes.h>
+#include <limits.h>
+#include <math.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/param.h>
+
+#include "util.h"
+
+#define D(fmt, ...)
+#define DIE(fmt, ...) (void)0;
+
+#define FNV_64_PRIME (0x100000001b3ULL)
+#define FNV1_64_INIT (0xcbf29ce484222325ULL)
+static inline uint64_t fnv_64_buf(const void* buf, size_t len) {
+    uint64_t hval = FNV1_64_INIT;
+
+    unsigned char *bp = (unsigned char *)buf; /* start of buffer */
+    unsigned char *be = bp + len; /* beyond end of buffer */
+
+    while (bp < be) {
+        hval ^= (uint64_t)*bp++;
+        hval *= FNV_64_PRIME;
+    }
+
+    return hval;
+}
+
+static inline uint64_t fnv_64(uint64_t in) { return fnv_64_buf(&in, sizeof(in)); }
+
+// Generator syntax:
+//
+// \d+ == fixed
+// n[ormal]:mean,sd
+// e[xponential]:lambda
+// p[areto]:scale,shape
+// g[ev]:loc,scale,shape
+// fb_value, fb_key, fb_rate
+
+class Generator {
+public:
+    Generator() {}
+    // Generator(const Generator &g) = delete;
+    // virtual Generator& operator=(const Generator &g) = delete;
+    virtual ~Generator() {}
+
+    virtual double generate(double U = -1.0) = 0;
+    virtual void set_lambda(double) {DIE("set_lambda() not implemented");}
+protected:
+    std::string type;
+};
+
+class Fixed : public Generator {
+public:
+    Fixed(double _value = 1.0) : value(_value) { D("Fixed(%f)", value); }
+    virtual double generate(double) { return value; }
+    virtual void set_lambda(double lambda) {
+        if (lambda > 0.0) value = 1.0 / lambda;
+        else value = 0.0;
+    }
+
+private:
+    double value;
+};
+
+class Uniform : public Generator {
+public:
+    Uniform(double _scale) : scale(_scale) { D("Uniform(%f)", scale); }
+
+    virtual double generate(double U = -1.0) {
+        if (U < 0.0) U = drand48();
+        return scale * U;
+    }
+
+    virtual void set_lambda(double lambda) {
+        if (lambda > 0.0) scale = 2.0 / lambda;
+        else scale = 0.0;
+    }
+
+private:
+    double scale;
+};
+
+class Normal : public Generator {
+public:
+    Normal(double _mean = 1.0, double _sd = 1.0) : mean(_mean), sd(_sd) {
+        D("Normal(mean=%f, sd=%f)", mean, sd);
+    }
+
+    virtual double generate(double U = -1.0) {
+        if (U < 0.0) U = drand48();
+        double V = U; // drand48();
+        double N = sqrt(-2 * log(U)) * cos(2 * M_PI * V);
+        return mean + sd * N;
+    }
+
+    virtual void set_lambda(double lambda) {
+        if (lambda > 0.0) mean = 1.0 / lambda;
+        else mean = 0.0;
+    }
+
+private:
+    double mean, sd;
+};
+
+class Exponential : public Generator {
+public:
+    Exponential(double _lambda = 1.0) : lambda(_lambda) {
+        D("Exponential(lambda=%f)", lambda);
+    }
+
+    virtual double generate(double U = -1.0) {
+        if (lambda <= 0.0) return 0.0;
+        if (U < 0.0) U = drand48();
+        return -log(U) / lambda;
+    }
+
+    virtual void set_lambda(double lambda) { this->lambda = lambda; }
+
+private:
+    double lambda;
+};
+
+class GPareto : public Generator {
+public:
+    GPareto(double _loc = 0.0, double _scale = 1.0, double _shape = 1.0) :
+        loc(_loc), scale(_scale), shape(_shape) {
+        assert(shape != 0.0);
+        D("GPareto(loc=%f, scale=%f, shape=%f)", loc, scale, shape);
+    }
+
+    virtual double generate(double U = -1.0) {
+        if (U < 0.0) U = drand48();
+        return loc + scale * (pow(U, -shape) - 1) / shape;
+    }
+
+    virtual void set_lambda(double lambda) {
+        if (lambda <= 0.0) scale = 0.0;
+        else scale = (1 - shape) / lambda - (1 - shape) * loc;
+    }
+
+private:
+    double loc /* mu */;
+    double scale /* sigma */, shape /* k */;
+};
+
+class GEV : public Generator {
+public:
+    GEV(double _loc = 0.0, double _scale = 1.0, double _shape = 1.0) :
+        e(1.0), loc(_loc), scale(_scale), shape(_shape) {
+        assert(shape != 0.0);
+        D("GEV(loc=%f, scale=%f, shape=%f)", loc, scale, shape);
+    }
+
+    virtual double generate(double U = -1.0) {
+        return loc + scale * (pow(e.generate(U), -shape) - 1) / shape;
+    }
+
+private:
+    Exponential e;
+    double loc /* mu */, scale /* sigma */, shape /* k */;
+};
+
+class Discrete : public Generator {
+public:
+    ~Discrete() { delete def; }
+    Discrete(Generator* _def = NULL) : def(_def) {
+        if (def == NULL) def = new Fixed(0.0);
+    }
+
+    virtual double generate(double U = -1.0) {
+        double Uc = U;
+        if (pv.size() > 0 && U < 0.0) U = drand48();
+
+        double sum = 0;
+
+        for (auto p: pv) {
+            sum += p.first;
+            if (U < sum) return p.second;
+        }
+
+        return def->generate(Uc);
+    }
+
+    void add(double p, double v) {
+        pv.push_back(std::pair<double,double>(p, v));
+    }
+
+private:
+    Generator *def;
+    std::vector< std::pair<double,double> > pv;
+};
+
+class KeyGenerator {
+public:
+    KeyGenerator(Generator* _g, double _max = 10000) : g(_g), max(_max) {}
+    std::string generate(uint64_t ind) {
+        uint64_t h = fnv_64(ind);
+        double U = (double) h / (double)ULLONG_MAX;
+        double G = g->generate(U);
+        int keylen = MAX(round(G), floor(log10(max)) + 1);
+        char key[256];
+        snprintf(key, 256, "%0*" PRIu64, keylen, ind);
+
+        // D("%d = %s", ind, key);
+        return std::string(key);
+    }
+private:
+    Generator* g;
+    double max;
+};
+
+Generator* createGenerator(std::string str);
+Generator* createFacebookKey();
+Generator* createFacebookValue();
+Generator* createFacebookIA();
+
+#endif // GENERATOR_H
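Each generate() above is inverse-transform sampling: draw $U$ uniform on $(0,1)$ and return $F^{-1}(U)$. As a quick check for the Exponential case (standard derivation, not part of the commit):

$$F(x) = 1 - e^{-\lambda x} \quad\Longrightarrow\quad x = F^{-1}(U) = -\frac{\ln(1-U)}{\lambda},$$

and since $1-U$ is itself uniform on $(0,1)$, the code's $-\ln(U)/\lambda$ has the same distribution. For $\lambda = 2$ and $U = 0.5$ this gives $x = -\ln(0.5)/2 \approx 0.347$, which is what Exponential::generate(0.5) returns.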
inc/pkt.h (109 changed lines)

@@ -20,23 +20,49 @@
 constexpr static uint32_t ETHER_FRAME_MAGIC = 0xDCDCE5E5;

-struct packet_hdr {
+struct ptp_hdr {
+    uint8_t ptp_msg_type;
+    uint8_t ptp_ver;
+    uint8_t unused[34];
+} __attribute__((packed));
+
+struct pkt_proto_hdr {
     struct rte_ether_hdr eth_hdr;
     struct rte_ipv4_hdr ipv4_hdr;
     struct rte_udp_hdr udp_hdr;
-    uint8_t ptp_msg_type;
-    uint8_t ptp_ver;
+    struct ptp_hdr ptp_hdr;
 } __attribute__((packed));

-struct packet_data
-{
-    struct packet_hdr pkt_hdr;
+struct pkt_hdr {
+    struct pkt_proto_hdr hdr;
+    uint16_t type;
     uint32_t magic;
+    char payload[0];
+} __attribute__((packed));
+
+constexpr static uint16_t PKT_TYPE_LOAD = 0;
+constexpr static uint16_t PKT_TYPE_PROBE = 1;
+constexpr static uint16_t PKT_TYPE_RESP = 2;
+struct pkt_payload_epoch {
     uint32_t epoch;
-    uint64_t clt_ts_tx;
-    uint64_t clt_ts_rx;
-    uint64_t srv_ts_tx;
-    uint64_t srv_ts_rx;
+};
+
+constexpr static uint16_t PKT_TYPE_STAT = 3;
+struct pkt_payload_stat {
+    uint32_t epoch;
+    uint64_t hw_rx;
+    uint64_t hw_tx;
+    uint64_t sw_rx;
+    uint64_t sw_tx;
+};
+
+constexpr static uint16_t NUM_PKT_TYPES = PKT_TYPE_STAT + 1;
+// for fast packet verification
+static const uint32_t expected_payload_size[NUM_PKT_TYPES] {
+    sizeof(struct pkt_payload_epoch), // LOAD
+    sizeof(struct pkt_payload_epoch), // PROBE
+    sizeof(struct pkt_payload_epoch), // RESP
+    sizeof(struct pkt_payload_stat)   // STAT
 };

 static inline void
@@ -105,14 +131,17 @@ dump_pkt(struct rte_mbuf *pkt)

 }

+// fills the packet with the information except for the payload itself
 static inline
-struct packet_data * construct_udp_pkt_hdr(struct rte_mbuf * buf,
+struct pkt_hdr * construct_pkt_hdr(struct rte_mbuf * buf, uint16_t type,
     struct rte_ether_addr * src_mac, struct rte_ether_addr * dst_mac,
     uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port)
 {
     rte_pktmbuf_reset(buf);

-    struct packet_data * pkt_data = (struct packet_data *)rte_pktmbuf_append(buf, sizeof(struct packet_data));
+    const uint32_t total_sz = sizeof(struct pkt_hdr) + expected_payload_size[type];
+    struct pkt_hdr * pkt_data = (struct pkt_hdr *)rte_pktmbuf_append(buf, total_sz);
     struct rte_ether_hdr * eth_hdr;
     struct rte_ipv4_hdr * ipv4_hdr;
     struct rte_udp_hdr * udp_hdr;
@@ -124,14 +153,14 @@ struct packet_data * construct_udp_pkt_hdr(struct rte_mbuf * buf,
     buf->nb_segs = 1;

     // construct l2 header
-    eth_hdr = &pkt_data->pkt_hdr.eth_hdr;
+    eth_hdr = &pkt_data->hdr.eth_hdr;
     rte_ether_addr_copy(src_mac, &eth_hdr->s_addr);
     rte_ether_addr_copy(dst_mac, &eth_hdr->d_addr);
     eth_hdr->ether_type = rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4);
     buf->l2_len = sizeof(struct rte_ether_hdr);

     // construct l3 header
-    ipv4_hdr = &pkt_data->pkt_hdr.ipv4_hdr;
+    ipv4_hdr = &pkt_data->hdr.ipv4_hdr;
     memset(ipv4_hdr, 0, sizeof(struct rte_ipv4_hdr));
     ipv4_hdr->version_ihl = IP_VHL_DEF;
     ipv4_hdr->type_of_service = 0;
@@ -141,46 +170,62 @@ struct packet_data * construct_udp_pkt_hdr(struct rte_mbuf * buf,
     ipv4_hdr->packet_id = 0;
     ipv4_hdr->src_addr = rte_cpu_to_be_32(src_ip);
     ipv4_hdr->dst_addr = rte_cpu_to_be_32(dst_ip);
-    ipv4_hdr->total_length = rte_cpu_to_be_16(sizeof(struct packet_data) - sizeof(struct rte_ether_hdr));
+    ipv4_hdr->total_length = rte_cpu_to_be_16(total_sz - sizeof(struct rte_ether_hdr));
     ipv4_hdr->hdr_checksum = 0;
     buf->l3_len = sizeof(struct rte_ipv4_hdr);

     // construct l4 header
-    udp_hdr = &pkt_data->pkt_hdr.udp_hdr;
+    udp_hdr = &pkt_data->hdr.udp_hdr;
     udp_hdr->src_port = rte_cpu_to_be_16(src_port);
     udp_hdr->dst_port = rte_cpu_to_be_16(dst_port);
     udp_hdr->dgram_cksum = 0; /* No UDP checksum. */
-    udp_hdr->dgram_len = rte_cpu_to_be_16(sizeof(struct packet_data) -
+    udp_hdr->dgram_len = rte_cpu_to_be_16(total_sz -
         sizeof(struct rte_ether_hdr) -
         sizeof(struct rte_udp_hdr));
     buf->l4_len = sizeof(struct rte_udp_hdr);

-    // construct ptp header
-    // so basically after some experiments at least the intel NIC categorizes PTP packets as:
-    // 1. Dest port 319 2. these two fields must be set and don't care about others
-    // Experiments:
-    // SPORT: 319 DPORT: 319 PTP HDR: valid => SET
-    // SPORT: 333 DPORT: 319 PTP HDR: valid => SET
-    // SPORT: 319 DPORT: 333 PTP HDR: valid => NOT SET
-    // SPORT: 319 DPORT: 319 PTP HDR: invalid => NOT SET
-    pkt_data->pkt_hdr.ptp_ver = 0x2; // VER 2
-    pkt_data->pkt_hdr.ptp_msg_type = 0x0; // SYNC
+    /* construct ptp header
+     * so basically after some experiments at least the intel NIC categorizes PTP packets as:
+     * 1. Dest port 319 2. these two fields must be set and don't care about others
+     * Experiments:
+     * SPORT: 319 DPORT: 319 PTP HDR: valid => SET
+     * SPORT: 333 DPORT: 319 PTP HDR: valid => SET
+     * SPORT: 319 DPORT: 333 PTP HDR: valid => NOT SET
+     * SPORT: 319 DPORT: 319 PTP HDR: invalid => NOT SET */
+    pkt_data->hdr.ptp_hdr.ptp_msg_type = 0x0; // SYNC
+    pkt_data->type = rte_cpu_to_be_16(type);
+    pkt_data->magic = rte_cpu_to_be_32(ETHER_FRAME_MAGIC);
+    if (type == PKT_TYPE_PROBE) {
+        // only timestamp PROBE pkts
+        pkt_data->hdr.ptp_hdr.ptp_ver = 0x2; // VER 2
+        buf->ol_flags |= PKT_TX_IEEE1588_TMST;
+    } else {
+        pkt_data->hdr.ptp_hdr.ptp_ver = 0xff;
+    }

     return pkt_data;
 }

 static inline
-struct packet_data * check_valid_packet(struct rte_mbuf * pkt)
+struct pkt_hdr * check_valid_packet(struct rte_mbuf * pkt)
 {
-    struct packet_data * pkt_data = NULL;
+    struct pkt_hdr * pkt_data = NULL;
+    const uint32_t data_len = rte_pktmbuf_data_len(pkt);

-    if (rte_pktmbuf_data_len(pkt) < sizeof(struct packet_data)) {
+    if (data_len < sizeof(struct pkt_hdr)) {
         return NULL;
     }

-    pkt_data = rte_pktmbuf_mtod(pkt, struct packet_data *);
+    pkt_data = rte_pktmbuf_mtod(pkt, struct pkt_hdr *);

-    if (rte_be_to_cpu_32(pkt_data->magic) == ETHER_FRAME_MAGIC) {
+    // check MAGIC
+    if (rte_be_to_cpu_32(pkt_data->magic) != ETHER_FRAME_MAGIC) {
+        return NULL;
+    }
+
+    // check type and payload size
+    if ((rte_be_to_cpu_16(pkt_data->type) < NUM_PKT_TYPES) &&
+        (data_len >= (sizeof(struct pkt_hdr) + expected_payload_size[rte_be_to_cpu_16(pkt_data->type)]))) {
         return pkt_data;
     }
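The layout above fixes the minimum frame sizes that check_valid_packet() enforces. A standalone back-of-the-envelope check (the 14/20/8-byte figures are the standard Ethernet/IPv4/UDP header sizes and are assumptions of this sketch, as is treating pkt_payload_stat as unpadded):

```cpp
#include <cstdint>
#include <cstdio>

// Assumed standard L2/L3/L4 header sizes; ptp_hdr and pkt_hdr are packed per the definitions above.
constexpr uint32_t ETH = 14, IP4 = 20, UDP = 8;
constexpr uint32_t PTP = 1 + 1 + 34;                  // ptp_msg_type + ptp_ver + unused[34]
constexpr uint32_t PROTO_HDR = ETH + IP4 + UDP + PTP; // pkt_proto_hdr = 78
constexpr uint32_t PKT_HDR = PROTO_HDR + 2 + 4;       // + type + magic = 84; payload[0] adds nothing
constexpr uint32_t EPOCH = 4;                         // pkt_payload_epoch
constexpr uint32_t STAT = 4 + 4 * 8;                  // pkt_payload_stat, ignoring alignment padding

int main()
{
    // check_valid_packet() requires data_len >= sizeof(pkt_hdr) + expected_payload_size[type]
    std::printf("LOAD/PROBE/RESP frame: >= %u bytes, STAT frame: >= %u bytes\n",
                PKT_HDR + EPOCH, PKT_HDR + STAT);
    return 0;
}
```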
inc/util.h (new file, 17 lines)

@@ -0,0 +1,17 @@
+#pragma once
+#include <stdint.h>
+#include <time.h>
+#include <rte_ip.h>
+
+constexpr static unsigned int S2NS = 100000000UL;
+constexpr static uint16_t SERVER_LOAD_PORT = 1234;
+constexpr static uint16_t SERVER_PROBE_PORT = 319;
+constexpr static uint32_t SERVER_IP = RTE_IPV4(192,168,123,0);
+
+static inline uint64_t
+get_time_us()
+{
+    struct timespec ts;
+    clock_gettime(CLOCK_REALTIME, &ts);
+    return ts.tv_sec * 1000000 + ts.tv_nsec / 1000;
+}
khat/khat.cc (285 changed lines)

@@ -1,4 +1,5 @@
 #include <cstdio>
+#include <cassert>
 #include <ctime>
 #include <netinet/in.h>
 #include <rte_config.h>
@@ -15,72 +16,126 @@
 #include <vector>
 #include <fstream>
 #include <unistd.h>
+#include <signal.h>

 #include "pkt.h"
 #include "ntrlog.h"
-#include "rte_arp.h"
-#include "rte_mbuf_core.h"
+#include "util.h"

 NTR_DECL_IMPL;

-constexpr unsigned int MBUF_MAX_COUNT = 8191;
-constexpr unsigned int MBUF_CACHE_SIZE = 250;
-constexpr unsigned int RX_RING_SIZE = 1024;
-constexpr unsigned int TX_RING_SIZE = 1024;
-constexpr unsigned int RX_RING_NUM = 1;
-constexpr unsigned int TX_RING_NUM = 1;
-constexpr unsigned int BURST_SIZE = 32;
+constexpr static unsigned int MBUF_MAX_COUNT = 16384;
+constexpr static unsigned int MBUF_CACHE_SIZE = 512;
+constexpr static unsigned int RX_RING_SIZE = 4096;
+constexpr static unsigned int TX_RING_SIZE = 4096;
+constexpr static unsigned int BURST_SIZE = 32;

 static const struct rte_eth_conf port_conf_default{};

+// keep track of the probe state
+// when a probe packet first arrives this state is set to be influx and the rte_mbuf's userdata is set to PROBE_MAGIC
+// which prevents other probe packets to be processed
+// when the server sends the probe stats back to user influx is released
+// this is to guarantee that the server only processes one probe packet at the time
+// XXX: also this can be attached to the mbuf itself and processed by the lcore thread
+// I kept this global because globally there could be only one pending probe request
+// and rx_add_timestamp can save their shit here too
+struct probe_state_t {
+    struct pkt_proto_hdr hdr;
+    uint32_t epoch;
+    uint32_t timesync;
+    uint64_t last_sw_rx;
+    uint64_t last_sw_tx;
+    uint64_t last_hw_rx;
+    uint64_t last_hw_tx;
+};

 struct options_t {
     //states
     uint16_t s_portid;
     struct rte_ether_addr s_host_mac;
     struct rte_mempool * s_pkt_mempool;
+    std::atomic<int> s_probe_influx{0};
+    struct probe_state_t s_probe_info;
 };

-struct options_t options;
+static struct options_t options;

 static uint16_t
 rx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
     struct rte_mbuf **pkts, uint16_t nb_pkts, uint16_t max_pkts __rte_unused, void *_ __rte_unused)
 {
     uint64_t now = rte_rdtsc();
-    struct packet_data * pkt_data;
+    struct timespec ts;
+    struct pkt_hdr * pkt_data;
     for (int i = 0; i < nb_pkts; i++) {
         pkt_data = check_valid_packet(pkts[i]);

         if (pkt_data == NULL) {
             ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: ignoring invalid packet %p.\n", (void*)pkts[i]);
             continue;
         }

-        ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "rx_add_timestamp: tagged packet %p with %llu.\n", (void*)pkts[i], now);
-        pkt_data->srv_ts_rx = rte_cpu_to_be_64(now);
+        if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE) {
+            int one = 0;
+            pkts[i]->userdata = nullptr;
+            if (rte_eth_timesync_read_rx_timestamp(port, &ts, pkts[i]->timesync & 0x3) == 0) {
+                if (options.s_probe_influx.compare_exchange_strong(one, 1)) {
+                    // mark the mbuf as probe packet being processed
+                    pkts[i]->userdata = &options.s_probe_info;
+                    // jot down some information and stats
+                    options.s_probe_info.epoch = rte_be_to_cpu_32(((struct pkt_payload_epoch *)pkt_data->payload)->epoch);
+                    options.s_probe_info.last_hw_rx = ts.tv_nsec + ts.tv_sec * S2NS;
+                    options.s_probe_info.last_sw_rx = now;
+                    options.s_probe_info.timesync = pkts[i]->timesync;
+                    // copy protocol header
+                    rte_memcpy(&options.s_probe_info.hdr, &pkt_data->hdr, sizeof(struct pkt_proto_hdr));
+                    ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: tagged packet %p epoch %d with sw: %llu hw:%llu.\n", (void*)pkts[i], options.s_probe_info.epoch, now, options.s_probe_info.last_hw_rx);
+                } else
+                    ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "rx_add_timestamp: packet %p not tagged - another probe packet influx.\n", (void*)pkts[i]);
+            } else
+                ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "rx_add_timestamp: packet %p not tagged - hw rx timestamp not available.\n", (void*)pkts[i]);
+        } else
+            ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], pkt_data->type);
     }

     return nb_pkts;
 }

 static uint16_t
-tx_calc_latency(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
+tx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
     struct rte_mbuf **pkts, uint16_t nb_pkts, void *_ __rte_unused)
 {
     uint64_t now = rte_rdtsc();
-    struct packet_data * pkt_data;
+    struct pkt_hdr * pkt_data;

     for (int i = 0; i < nb_pkts; i++) {

         pkt_data = check_valid_packet(pkts[i]);

         if (pkt_data == NULL) {
-            ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_calc_latency: ignoring invalid packet %p.\n", (void*)pkts[i]);
+            ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: ignoring invalid packet %p.\n", (void*)pkts[i]);
             continue;
         }

-        ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "tx_calc_latency: tagged packet %p with %llu.\n", (void*)pkts[i], now);
-        pkt_data->srv_ts_tx = rte_cpu_to_be_64(now);
+        if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_STAT) {
+            // this packet is the response to PROBE packets
+
+            // at this time the packet is not sent to the NIC yet so
+            // the state must be influx
+            // XXX: this should be an assert
+            if(options.s_probe_influx.load() != 1 || pkts[i]->userdata == nullptr) {
+                rte_exit(EXIT_FAILURE, "packet %p sent to NIC before sw callback\n", (void*)pkts[i]);
+            }
+
+            // just tag the option
+            ((struct probe_state_t *)pkts[i]->userdata)->last_sw_tx = now;
+
+            ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: tagged packet %p with sw tx %llu\n", (void*)pkts[i], options.s_probe_info.last_sw_tx);
+        } else {
+            ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: packet %p not tagged - type %d\n", (void*)pkts[i], pkt_data->type);
+        }
     }

     return nb_pkts;
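The probe_state_t comment block above describes a single-slot guard: the RX callback claims the slot with a compare-and-swap, the TX callback fills in the software TX stamp, and the slot is released once the STAT reply is out. A standalone sketch of just that claim/release pattern (names and the release site are illustrative, not lifted from the commit):

```cpp
#include <atomic>
#include <cstdio>

// One global slot: 0 = free, 1 = a probe is being processed ("influx").
static std::atomic<int> probe_influx{0};

// RX path: claim the slot; only one probe at a time wins the CAS.
static bool try_claim_probe()
{
    int expected = 0;
    return probe_influx.compare_exchange_strong(expected, 1);
}

// Release once the STAT reply (and its hw timestamp) has been collected.
static void release_probe()
{
    probe_influx.store(0);
}

int main()
{
    if (try_claim_probe()) {
        std::puts("probe accepted");                          // first probe wins
        if (!try_claim_probe())
            std::puts("second probe dropped while influx");   // as in rx_add_timestamp()
        release_probe();
    }
    return 0;
}
```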
@ -90,10 +145,15 @@ static int
|
|||||||
locore_main(void * _unused __rte_unused)
|
locore_main(void * _unused __rte_unused)
|
||||||
{
|
{
|
||||||
struct rte_mbuf *bufs[BURST_SIZE];
|
struct rte_mbuf *bufs[BURST_SIZE];
|
||||||
struct rte_mbuf *tx_bufs[BURST_SIZE];
|
// + 1 because it might involve an extra PKT_TYPE_STAT packet
|
||||||
struct packet_data *pkt_data;
|
// when all tx timestamps are ready
|
||||||
|
struct rte_mbuf *tx_bufs[BURST_SIZE + 1];
|
||||||
|
struct pkt_hdr *pkt_data;
|
||||||
|
struct probe_state_t *probe_state;
|
||||||
uint32_t core_id = rte_lcore_id();
|
uint32_t core_id = rte_lcore_id();
|
||||||
|
|
||||||
|
bool pending_hw_txts = false;
|
||||||
|
|
||||||
if (rte_eth_dev_socket_id(options.s_portid) > 0 && rte_eth_dev_socket_id(options.s_portid) != (int)rte_socket_id()) {
|
if (rte_eth_dev_socket_id(options.s_portid) > 0 && rte_eth_dev_socket_id(options.s_portid) != (int)rte_socket_id()) {
|
||||||
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: WARNING, port %d is on remote NUMA node to "
|
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: WARNING, port %d is on remote NUMA node to "
|
||||||
"polling thread.\n\tPerformance will "
|
"polling thread.\n\tPerformance will "
|
||||||
@ -105,13 +165,12 @@ locore_main(void * _unused __rte_unused)
|
|||||||
while(true) {
|
while(true) {
|
||||||
uint16_t nb_tx = 0;
|
uint16_t nb_tx = 0;
|
||||||
const uint16_t nb_rx = rte_eth_rx_burst(options.s_portid, 0, bufs, BURST_SIZE);
|
const uint16_t nb_rx = rte_eth_rx_burst(options.s_portid, 0, bufs, BURST_SIZE);
|
||||||
|
struct rte_mbuf * pkt_buf;
|
||||||
|
struct pkt_hdr * tx_data;
|
||||||
|
|
||||||
if (nb_rx == 0) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
for(int i = 0; i < nb_rx; i++) {
|
for(int i = 0; i < nb_rx; i++) {
|
||||||
|
// XXX: optimization: in rx_add_timestamp every packet is already validated once
|
||||||
|
// can just mark valid packet with a value so we can avoid this redundant check
|
||||||
pkt_data = check_valid_packet(bufs[i]);
|
pkt_data = check_valid_packet(bufs[i]);
|
||||||
|
|
||||||
if (pkt_data == NULL) {
|
if (pkt_data == NULL) {
|
||||||
@@ -121,83 +180,122 @@ locore_main(void * _unused __rte_unused)
                 continue;
             }
 
-            uint32_t dst_ip = rte_be_to_cpu_32(pkt_data->pkt_hdr.ipv4_hdr.dst_addr);
-            uint32_t src_ip = rte_be_to_cpu_32(pkt_data->pkt_hdr.ipv4_hdr.src_addr);
-            uint16_t src_port = rte_be_to_cpu_16(pkt_data->pkt_hdr.udp_hdr.src_port);
-            uint16_t dst_port = rte_be_to_cpu_16(pkt_data->pkt_hdr.udp_hdr.dst_port);
-            ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d packet %p from %d.%d.%d.%d(%x:%x:%x:%x:%x:%x) to %d.%d.%d.%d(%x:%x:%x:%x:%x:%x), sport %d, dport %d, epoch %d\n",
+            uint32_t dst_ip = rte_be_to_cpu_32(pkt_data->hdr.ipv4_hdr.dst_addr);
+            uint32_t src_ip = rte_be_to_cpu_32(pkt_data->hdr.ipv4_hdr.src_addr);
+            uint16_t src_port = rte_be_to_cpu_16(pkt_data->hdr.udp_hdr.src_port);
+            uint16_t dst_port = rte_be_to_cpu_16(pkt_data->hdr.udp_hdr.dst_port);
+            ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d packet %p from %d.%d.%d.%d(%x:%x:%x:%x:%x:%x) to %d.%d.%d.%d(%x:%x:%x:%x:%x:%x), sport %d, dport %d, type %d\n",
                 core_id,
                 (void*)bufs[i],
                 (src_ip >> 24) & 0xff,
                 (src_ip >> 16) & 0xff,
                 (src_ip >> 8) & 0xff,
                 (src_ip >> 0) & 0xff,
-                pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[0],
-                pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[1],
-                pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[2],
-                pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[3],
-                pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[4],
-                pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[5],
+                pkt_data->hdr.eth_hdr.s_addr.addr_bytes[0],
+                pkt_data->hdr.eth_hdr.s_addr.addr_bytes[1],
+                pkt_data->hdr.eth_hdr.s_addr.addr_bytes[2],
+                pkt_data->hdr.eth_hdr.s_addr.addr_bytes[3],
+                pkt_data->hdr.eth_hdr.s_addr.addr_bytes[4],
+                pkt_data->hdr.eth_hdr.s_addr.addr_bytes[5],
                 (dst_ip >> 24) & 0xff,
                 (dst_ip >> 16) & 0xff,
                 (dst_ip >> 8) & 0xff,
                 (dst_ip >> 0) & 0xff,
-                pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[0],
-                pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[1],
-                pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[2],
-                pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[3],
-                pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[4],
-                pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[5],
+                pkt_data->hdr.eth_hdr.d_addr.addr_bytes[0],
+                pkt_data->hdr.eth_hdr.d_addr.addr_bytes[1],
+                pkt_data->hdr.eth_hdr.d_addr.addr_bytes[2],
+                pkt_data->hdr.eth_hdr.d_addr.addr_bytes[3],
+                pkt_data->hdr.eth_hdr.d_addr.addr_bytes[4],
+                pkt_data->hdr.eth_hdr.d_addr.addr_bytes[5],
                 src_port,
                 dst_port,
-                rte_be_to_cpu_32(pkt_data->epoch));
+                rte_be_to_cpu_16(pkt_data->type));
 
-            //if (bufs[i]->ol_flags & PKT_RX_IEEE1588_TMST) {
-                struct timespec ts;
-                if (rte_eth_timesync_read_rx_timestamp(options.s_portid, &ts, bufs[i]->timesync & 0x3) == 0) {
-                    ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "Timestamped sec: %lld nsec: %lld\n", ts.tv_sec, ts.tv_nsec);
-                } else {
-                    ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "Not Timestamped!\n");
-                }
-            // }
-            // else {
-            //     ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "Not Timestamped!\n");
-            // }
-
-            // swap s_addr and d_addr
-            struct rte_mbuf * pkt_buf = rte_pktmbuf_alloc(options.s_pkt_mempool);
-            if (pkt_buf == NULL) {
-                rte_exit(EXIT_FAILURE, "locore_main: failed to allocate memory for pkt_buf");
-            }
-
-            struct packet_data * tx_data = construct_udp_pkt_hdr(pkt_buf,
-                &options.s_host_mac,
-                &pkt_data->pkt_hdr.eth_hdr.s_addr,
-                dst_ip,
-                src_ip,
-                dst_port,
-                src_port);
-            if (tx_data == NULL) {
-                rte_exit(EXIT_FAILURE, "failed to construct tx packet %p", (void*)pkt_buf);
-            }
-            // copy, endianess doesn't matter
-            tx_data->epoch = pkt_data->epoch;
-            tx_data->magic = pkt_data->magic;
-            tx_data->clt_ts_rx = pkt_data->clt_ts_rx;
-            tx_data->clt_ts_tx = pkt_data->clt_ts_tx;
-            tx_data->srv_ts_rx = pkt_data->srv_ts_rx;
-            tx_data->srv_ts_tx = pkt_data->srv_ts_tx;
-            // queue for burst send
-            tx_bufs[nb_tx++] = pkt_buf;
-            // free rx packet
-            rte_pktmbuf_free(bufs[i]);
+            switch (rte_be_to_cpu_16(pkt_data->type)) {
+                case PKT_TYPE_PROBE:
+                    if (bufs[i]->userdata != nullptr) {
+                        probe_state = (struct probe_state_t *)bufs[i]->userdata;
+                        pending_hw_txts = true;
+                    }
+                    break;
+                case PKT_TYPE_LOAD:
+                    // swap s_addr and d_addr
+                    // XXX: can we avoid this allocation?
+                    pkt_buf = rte_pktmbuf_alloc(options.s_pkt_mempool);
+                    if (pkt_buf == NULL) {
+                        rte_exit(EXIT_FAILURE, "locore_main: failed to allocate memory for pkt_buf\n");
+                    }
+
+                    tx_data = construct_pkt_hdr(pkt_buf, PKT_TYPE_RESP,
+                        &options.s_host_mac,
+                        &pkt_data->hdr.eth_hdr.s_addr,
+                        dst_ip,
+                        src_ip,
+                        dst_port,
+                        src_port);
+
+                    if (tx_data == NULL) {
+                        rte_exit(EXIT_FAILURE, "failed to construct tx packet %p", (void*)pkt_buf);
+                    }
+
+                    // endianess doesn't matter
+                    rte_memcpy(tx_data->payload, pkt_data->payload, sizeof(struct pkt_payload_epoch));
+                    tx_data->magic = pkt_data->magic;
+
+                    // queue for burst send
+                    tx_bufs[nb_tx++] = pkt_buf;
+                    // free rx packet
+                    rte_pktmbuf_free(bufs[i]);
+                    break;
+                default:
+                    continue;
+            }
         }
 
-        const uint16_t nb_tx_succ = rte_eth_tx_burst(options.s_portid, 0, tx_bufs, nb_tx);
-        // cleanup unsent packets
-        // don't need to free others because it's offloaded
-        if (nb_tx_succ < nb_tx) {
-            rte_exit(EXIT_FAILURE, "locore_main: failed to send some packets.\n");
-        }
+        // send the packets
+        if (nb_tx > 0) {
+            const uint16_t nb_tx_succ = rte_eth_tx_burst(options.s_portid, 0, tx_bufs, nb_tx);
+            if (nb_tx_succ < nb_tx) {
+                rte_exit(EXIT_FAILURE, "locore_main: failed to send some packets.\n");
+            }
+        }
+
+        // we wanna check every loop not only when there are packets
+        if (pending_hw_txts) {
+            struct timespec ts;
+            struct pkt_payload_stat * stat;
+            if (rte_eth_timesync_read_tx_timestamp(options.s_portid, &ts) == 0) {
+                ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: obtained hw tx timestamp %lld.\n", ts.tv_sec * S2NS + ts.tv_nsec);
+                // now we have everything we need
+                pkt_buf = rte_pktmbuf_alloc(options.s_pkt_mempool);
+                if (pkt_buf == NULL) {
+                    rte_exit(EXIT_FAILURE, "locore_main: failed to allocate memory for pkt_buf\n");
+                }
+
+                tx_data = construct_pkt_hdr(pkt_buf, PKT_TYPE_STAT,
+                    &options.s_host_mac,
+                    &probe_state->hdr.eth_hdr.s_addr,
+                    rte_be_to_cpu_32(probe_state->hdr.ipv4_hdr.dst_addr),
+                    rte_be_to_cpu_32(probe_state->hdr.ipv4_hdr.src_addr),
+                    rte_be_to_cpu_16(probe_state->hdr.udp_hdr.dst_port),
+                    rte_be_to_cpu_16(probe_state->hdr.udp_hdr.dst_port));
+
+                // populate stats
+                stat = (struct pkt_payload_stat *)tx_data->payload;
+                stat->epoch = rte_cpu_to_be_64(probe_state->epoch);
+                stat->hw_rx = rte_cpu_to_be_64(probe_state->last_hw_rx);
+                stat->hw_tx = rte_cpu_to_be_64(probe_state->last_hw_tx);
+                stat->sw_rx = rte_cpu_to_be_64(probe_state->last_sw_rx);
+                stat->sw_tx = rte_cpu_to_be_64(probe_state->last_sw_tx);
+                tx_data->magic = rte_cpu_to_be_32(ETHER_FRAME_MAGIC);
+
+                // queue for burst send
+                tx_bufs[nb_tx++] = pkt_buf;
+
+                // release flux
+                pending_hw_txts = false;
+                options.s_probe_influx.store(0);
+            }
+        }
     }
 }
 
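The PKT_TYPE_STAT path above only fires once rte_eth_timesync_read_tx_timestamp() reports that the NIC has latched the transmit timestamp of the probe; until then pending_hw_txts stays set and the loop simply retries on the next pass. A minimal sketch of that polling call in isolation (the helper name and the nanosecond conversion are illustrative, not part of cat.cc):

    #include <rte_ethdev.h>
    #include <ctime>
    #include <cstdint>

    // Poll the port for a pending hardware TX timestamp; returns true and fills
    // *ns when the NIC has one available, false if it is not ready yet.
    static bool try_read_hw_tx_timestamp(uint16_t port_id, uint64_t *ns)
    {
        struct timespec ts;
        if (rte_eth_timesync_read_tx_timestamp(port_id, &ts) != 0)
            return false;   // timestamp not latched yet, try again on the next loop iteration
        *ns = (uint64_t)ts.tv_sec * 1000000000ULL + (uint64_t)ts.tv_nsec;
        return true;
    }
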
@@ -232,7 +330,7 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
     port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE;
 
     /* Configure the Ethernet device. */
-    ret = rte_eth_dev_configure(portid, RX_RING_NUM, TX_RING_NUM, &port_conf);
+    ret = rte_eth_dev_configure(portid, 1, 1, &port_conf);
     if (ret != 0)
         return ret;
 
@@ -242,7 +340,7 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
 
     /* Allocate and set up 1 RX queue per Ethernet port. */
     rxconf = dev_info.default_rxconf;
-    for (uint32_t i = 0; i < RX_RING_NUM; i++) {
+    for (uint32_t i = 0; i < 1; i++) {
         ret = rte_eth_rx_queue_setup(portid, i, nb_rxd, rte_eth_dev_socket_id(portid), &rxconf, mbuf_pool);
         if (ret < 0)
             return ret;
@@ -251,7 +349,7 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
     txconf = dev_info.default_txconf;
     txconf.offloads = port_conf.txmode.offloads;
     /* Allocate and set up 1 TX queue per Ethernet port. */
-    for (uint32_t i = 0; i < TX_RING_NUM; i++) {
+    for (uint32_t i = 0; i < 1; i++) {
         ret = rte_eth_tx_queue_setup(portid, i, nb_txd, rte_eth_dev_socket_id(portid), &txconf);
         if (ret < 0)
             return ret;
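
The three port_init hunks above hard-code a single RX and a single TX queue in place of the old RX_RING_NUM/TX_RING_NUM loops. A condensed sketch of the same single-queue bring-up using the standard DPDK calls (the descriptor counts, the default queue configs passed as nullptr, and the helper name are illustrative, not copied from port_init):

    #include <rte_ethdev.h>

    // Minimal single-queue port bring-up, mirroring the sequence in port_init:
    // configure 1 RX + 1 TX queue, set both up on the device's NUMA node, then start.
    static int setup_single_queue_port(uint16_t portid, struct rte_mempool *pool)
    {
        struct rte_eth_conf conf = {};      // defaults are enough for this sketch
        int ret = rte_eth_dev_configure(portid, 1, 1, &conf);
        if (ret != 0)
            return ret;

        ret = rte_eth_rx_queue_setup(portid, 0, 1024, rte_eth_dev_socket_id(portid), nullptr, pool);
        if (ret < 0)
            return ret;

        ret = rte_eth_tx_queue_setup(portid, 0, 1024, rte_eth_dev_socket_id(portid), nullptr);
        if (ret < 0)
            return ret;

        return rte_eth_dev_start(portid);
    }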
@@ -276,7 +374,7 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
     if (ret != 0)
         return ret;
 
-    if (rte_eth_add_tx_callback(portid, 0, tx_calc_latency, NULL) == NULL || rte_eth_add_rx_callback(portid, 0, rx_add_timestamp, NULL) == NULL) {
+    if (rte_eth_add_tx_callback(portid, 0, tx_add_timestamp, NULL) == NULL || rte_eth_add_rx_callback(portid, 0, rx_add_timestamp, NULL) == NULL) {
         return -1;
     }
 
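The TX callback registered here is renamed from tx_calc_latency to tx_add_timestamp; the callback body itself is outside this diff, so the following is only an illustrative sketch of a TX callback with DPDK's rte_tx_callback_fn signature that takes a software timestamp per burst (the per-packet record keeping is stubbed out):

    #include <rte_ethdev.h>
    #include <rte_cycles.h>
    #include <rte_mbuf.h>

    // Illustrative TX callback: stamp a software TX time for the burst about to
    // hit the wire, then pass the packets through unchanged.
    static uint16_t
    example_tx_timestamp_cb(uint16_t port __rte_unused, uint16_t queue __rte_unused,
                            struct rte_mbuf **pkts, uint16_t nb_pkts, void *user __rte_unused)
    {
        uint64_t now = rte_rdtsc();          // TSC-based software timestamp
        for (uint16_t i = 0; i < nb_pkts; i++)
            (void)pkts[i], (void)now;        // real code would record `now` for each packet
        return nb_pkts;                      // a TX callback returns the number of packets it forwards
    }

    // Registration, analogous to the call in port_init:
    //   rte_eth_add_tx_callback(portid, 0, example_tx_timestamp_cb, NULL);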
@@ -291,11 +389,18 @@ static void usage()
            " -h: display the information\n");
 }
 
+// static void int_handler(int)
+// {
+//     //rte_exit(EXIT_SUCCESS, "Caught SIGINT, exiting...\n");
+// }
+
 int main(int argc, char* argv[])
 {
     unsigned int nb_ports;
     struct rte_mempool *mbuf_pool, *mbuf_pool_pkt;
 
+    //signal(SIGINT, int_handler);
+
     // init dpdk
     int ret = rte_eal_init(argc, argv);
     if (ret < 0) {
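
For context on the rte_eal_init() call at the end of this hunk: it parses and strips the EAL portion of the command line and returns how many argv entries it consumed (negative on failure), after which an application usually shifts argc/argv before its own option parsing. A minimal sketch of that convention, not the actual main() of cat.cc:

    #include <cstdlib>
    #include <rte_eal.h>
    #include <rte_debug.h>

    int main(int argc, char *argv[])
    {
        // rte_eal_init() consumes the EAL arguments (cores, memory, devices, ...)
        // and returns the number of argv entries it used.
        int ret = rte_eal_init(argc, argv);
        if (ret < 0)
            rte_exit(EXIT_FAILURE, "rte_eal_init failed\n");

        argc -= ret;    // whatever remains belongs to the application
        argv += ret;

        /* application-specific option parsing (e.g. getopt) would follow here */
        return 0;
    }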