ptp working
Summary: +arc stuff khat timestamp protocol working Test Plan: by hand Reviewers: ali Differential Revision: https://review.rcs.uwaterloo.ca/D408
This commit is contained in:
parent
855b9cf714
commit
82e1098f3b
@ -15,13 +15,16 @@ set(CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11
|
||||
-Wno-deprecated-declarations
|
||||
-Wno-packed-not-aligned
|
||||
-Wno-address-of-packed-member
|
||||
-msse4)
|
||||
-Wno-zero-length-array
|
||||
-Wno-gnu-zero-variadic-macro-arguments
|
||||
-msse4
|
||||
-mavx)
|
||||
|
||||
include_directories(${CMAKE_SOURCE_DIR}/inc)
|
||||
include_directories(${dpdk_INCLUDE_DIRS})
|
||||
|
||||
add_executable(khat khat/khat.cc)
|
||||
add_executable(cat cat/cat.cc)
|
||||
add_executable(khat khat/khat.cc )
|
||||
add_executable(cat cat/cat.cc cat/generator.cc)
|
||||
|
||||
set(LINK_LIBS ${dpdk_LIBRARIES} pthread)
|
||||
|
||||
|
170
cat/cat.cc
170
cat/cat.cc
@ -37,17 +37,11 @@ constexpr static unsigned int MODE_CLIENT = 1;
|
||||
|
||||
static const struct rte_eth_conf port_conf_default{};
|
||||
|
||||
struct sendpt {
|
||||
struct datapt {
|
||||
uint32_t epoch;
|
||||
uint32_t valid;
|
||||
uint64_t clt_hw_tx;
|
||||
uint64_t clt_sw_tx;
|
||||
};
|
||||
|
||||
|
||||
struct recvpt {
|
||||
uint32_t epoch;
|
||||
uint32_t valid;
|
||||
uint64_t clt_hw_rx;
|
||||
uint64_t clt_sw_rx;
|
||||
uint64_t srv_hw_tx;
|
||||
@ -60,8 +54,8 @@ struct thread_info {
|
||||
unsigned int id;
|
||||
unsigned int rxqid{0};
|
||||
unsigned int txqid{0};
|
||||
std::vector<struct sendpt *> send_data;
|
||||
std::vector<struct recvpt *> recv_data;
|
||||
std::vector<struct datapt *> data;
|
||||
struct datapt * last_datapt{nullptr};
|
||||
unsigned int tot_send{0};
|
||||
unsigned int tot_recv{0};
|
||||
Generator * ia_gen;
|
||||
@ -88,8 +82,13 @@ struct options_t {
|
||||
|
||||
static struct options_t options;
|
||||
|
||||
static struct thread_info * get_thread_info(int qid)
|
||||
{
|
||||
return options.s_thr_info.at(qid);
|
||||
}
|
||||
|
||||
static uint16_t
|
||||
rx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
|
||||
rx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx,
|
||||
struct rte_mbuf **pkts, uint16_t nb_pkts, uint16_t max_pkts __rte_unused, void *_ __rte_unused)
|
||||
{
|
||||
uint64_t now = rte_rdtsc();
|
||||
@ -105,22 +104,23 @@ rx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
|
||||
continue;
|
||||
}
|
||||
|
||||
if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_STAT) {
|
||||
pkts[i]->userdata = nullptr;
|
||||
if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE_RESP) {
|
||||
struct thread_info * tinfo = get_thread_info(qidx);
|
||||
uint32_t epoch = rte_be_to_cpu_32(((struct pkt_payload_epoch *)pkt_data->payload)->epoch);
|
||||
if (tinfo->last_datapt != nullptr && tinfo->last_datapt->epoch == epoch) {
|
||||
if ((ret = rte_eth_timesync_read_rx_timestamp(port, &ts, pkts[i]->timesync & 0x3)) == 0) {
|
||||
// has hw rx timestamp
|
||||
struct recvpt * datapt = new struct recvpt;
|
||||
datapt->valid = options.s_record.load();
|
||||
datapt->clt_hw_rx = ts.tv_sec * S2NS + ts.tv_nsec;
|
||||
datapt->clt_sw_rx = now;
|
||||
pkts[i]->userdata = datapt;
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: tagged packet %p with sw: %llu hw: %llu.\n", (void*)pkts[i], now, datapt->clt_hw_rx);
|
||||
tinfo->last_datapt->clt_hw_rx = ts.tv_sec * S2NS + ts.tv_nsec;
|
||||
tinfo->last_datapt->clt_sw_rx = now;
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: tagged packet %p with sw: %llu hw: %llu.\n", (void*)pkts[i], now, tinfo->last_datapt->clt_hw_rx);
|
||||
} else {
|
||||
// leave as null
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "rx_add_timestamp: packet %p not tagged - hw ts not available - %d.\n", (void*)pkts[i], ret);
|
||||
rte_exit(EXIT_FAILURE, "rx_add_timestamp: packet %p not tagged - hw ts not available - %d.\n", (void*)pkts[i], ret);
|
||||
}
|
||||
} else {
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], pkt_data->type);
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "rx_add_timestamp: packet %p epoch %d != last epoch %d.\n", (void*)pkts[i], epoch, tinfo->last_datapt->epoch);
|
||||
}
|
||||
} else {
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], rte_be_to_cpu_16(pkt_data->type));
|
||||
}
|
||||
}
|
||||
|
||||
@ -143,7 +143,14 @@ tx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
|
||||
}
|
||||
|
||||
if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE) {
|
||||
((struct sendpt *)pkts[i]->userdata)->clt_sw_tx = now;
|
||||
struct thread_info * tinfo = get_thread_info(qidx);
|
||||
uint32_t epoch = rte_be_to_cpu_32(((struct pkt_payload_epoch *)pkt_data->payload)->epoch);
|
||||
|
||||
if (tinfo->last_datapt == nullptr || epoch != tinfo->last_datapt->epoch) {
|
||||
rte_exit(EXIT_FAILURE, "tx_add_timestamp: packet epoch %d != last epoch %d\n", epoch, tinfo->last_datapt->epoch);
|
||||
}
|
||||
|
||||
tinfo->last_datapt->clt_sw_tx = now;
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: tagged packet %p with sw: %llu.\n", (void*)pkts[i], now);
|
||||
} else {
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], pkt_data->type);
|
||||
@ -161,14 +168,11 @@ locore_main(void * tif)
|
||||
struct rte_mbuf *rx_bufs[BURST_SIZE];
|
||||
struct pkt_hdr *pkt_data;
|
||||
uint32_t core_id = rte_lcore_id();
|
||||
uint32_t epoch;
|
||||
struct pkt_payload_epoch * pld_epoch;
|
||||
struct pkt_payload_stat * pld_stat;
|
||||
struct recvpt * recvpt;
|
||||
int32_t ret;
|
||||
|
||||
struct sendpt * last_sendpt;
|
||||
bool pending_hw_tx;
|
||||
bool read_tx = true;
|
||||
bool recv_stat = true;
|
||||
bool recv_resp = true;
|
||||
|
||||
uint64_t next_ts;
|
||||
// XXX: check link status instead
|
||||
@ -183,7 +187,6 @@ locore_main(void * tif)
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d running thread %d...\n", core_id, tinfo->id);
|
||||
|
||||
next_ts = get_time_us();
|
||||
pending_hw_tx = false;
|
||||
|
||||
while(!options.s_stop.load()) {
|
||||
uint64_t now = get_time_us();
|
||||
@ -201,28 +204,45 @@ locore_main(void * tif)
|
||||
}
|
||||
|
||||
uint16_t type = rte_be_to_cpu_16(each->type);
|
||||
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: received packet %p type %d.\n", (void*)rx_bufs[i], type);
|
||||
switch (type) {
|
||||
case PKT_TYPE_RESP:
|
||||
struct pkt_payload_epoch * pld_epoch;
|
||||
struct pkt_payload_stat * pld_stat;
|
||||
uint32_t epoch;
|
||||
|
||||
case PKT_TYPE_PROBE_RESP:
|
||||
pld_epoch = (struct pkt_payload_epoch *)each->payload;
|
||||
epoch = rte_be_to_cpu_32(pld_epoch->epoch);
|
||||
|
||||
if (tinfo->last_datapt == nullptr || epoch != tinfo->last_datapt->epoch) {
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: packet %p epoch %d doesn't match datapt %d.\n", (void*)rx_bufs[i], epoch, tinfo->last_datapt->epoch);
|
||||
break;
|
||||
}
|
||||
|
||||
tinfo->tot_recv++;
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: received packet %p type %d epoch %d.\n", (void*)rx_bufs[i], type,
|
||||
rte_be_to_cpu_32(((struct pkt_payload_epoch *)(each->payload))->epoch));
|
||||
|
||||
recv_resp = true;
|
||||
break;
|
||||
case PKT_TYPE_STAT:
|
||||
tinfo->tot_recv++;
|
||||
recvpt = (struct recvpt *)rx_bufs[i]->userdata;
|
||||
pld_stat = (struct pkt_payload_stat *)each->payload;
|
||||
// keep stats
|
||||
recvpt->epoch = rte_be_to_cpu_32(pld_stat->epoch);
|
||||
recvpt->srv_hw_tx = rte_be_to_cpu_32(pld_stat->hw_tx);
|
||||
recvpt->srv_hw_rx = rte_be_to_cpu_32(pld_stat->hw_rx);
|
||||
recvpt->srv_sw_tx = rte_be_to_cpu_32(pld_stat->sw_tx);
|
||||
recvpt->srv_sw_rx = rte_be_to_cpu_32(pld_stat->sw_rx);
|
||||
tinfo->recv_data.push_back(recvpt);
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: received packet %p type %d epoch %d.\n", (void*)rx_bufs[i], type, recvpt->epoch);
|
||||
epoch = rte_be_to_cpu_32(pld_stat->epoch);
|
||||
|
||||
if (tinfo->last_datapt == nullptr || epoch != tinfo->last_datapt->epoch) {
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: packet %p epoch %d doesn't match datapt %d.\n", (void*)rx_bufs[i], epoch, tinfo->last_datapt->epoch);
|
||||
break;
|
||||
}
|
||||
|
||||
tinfo->last_datapt->srv_hw_tx = rte_be_to_cpu_64(pld_stat->hw_tx);
|
||||
tinfo->last_datapt->srv_hw_rx = rte_be_to_cpu_64(pld_stat->hw_rx);
|
||||
tinfo->last_datapt->srv_sw_tx = rte_be_to_cpu_64(pld_stat->sw_tx);
|
||||
tinfo->last_datapt->srv_sw_rx = rte_be_to_cpu_64(pld_stat->sw_rx);
|
||||
|
||||
tinfo->tot_recv++;
|
||||
|
||||
recv_stat = true;
|
||||
break;
|
||||
default:
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: ignoring packet %p with invalid type %d.\n", (void*)rx_bufs[i], type);
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: ignoring packet %p with unknown type %d.\n", (void*)rx_bufs[i], type);
|
||||
rte_pktmbuf_free(rx_bufs[i]);
|
||||
continue;
|
||||
}
|
||||
@ -231,7 +251,40 @@ locore_main(void * tif)
|
||||
}
|
||||
}
|
||||
|
||||
if (now >= next_ts) { //&& !pending_hw_tx) {
|
||||
if (read_tx && recv_stat & recv_resp) {
|
||||
// if we have all the data
|
||||
|
||||
if (tinfo->last_datapt != nullptr) {
|
||||
// push the data to the queue if we haven't done so already
|
||||
tinfo->data.push_back(tinfo->last_datapt);
|
||||
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: datapt for epoch %d dump:\n" \
|
||||
" Valid: %d\n"
|
||||
" client TX HW: %llu\n" \
|
||||
" client TX SW: %llu\n" \
|
||||
" client RX HW: %llu\n" \
|
||||
" client RX SW: %llu\n" \
|
||||
" server TX HW: %llu\n" \
|
||||
" server TX SW: %llu\n" \
|
||||
" server RX HW: %llu\n" \
|
||||
" server RX SW: %llu\n\n",
|
||||
tinfo->last_datapt->epoch,
|
||||
tinfo->last_datapt->valid,
|
||||
tinfo->last_datapt->clt_hw_tx,
|
||||
tinfo->last_datapt->clt_sw_tx,
|
||||
tinfo->last_datapt->clt_hw_rx,
|
||||
tinfo->last_datapt->clt_sw_rx,
|
||||
tinfo->last_datapt->srv_hw_tx,
|
||||
tinfo->last_datapt->srv_sw_tx,
|
||||
tinfo->last_datapt->srv_hw_rx,
|
||||
tinfo->last_datapt->srv_sw_rx);
|
||||
tinfo->last_datapt = nullptr;
|
||||
}
|
||||
|
||||
if (now >= next_ts) {
|
||||
struct pkt_payload_epoch * pld_epoch;
|
||||
uint32_t epoch;
|
||||
|
||||
next_ts += (int)(tinfo->ia_gen->generate() * 1000000.0);
|
||||
|
||||
// generate the packet
|
||||
@ -242,20 +295,21 @@ locore_main(void * tif)
|
||||
}
|
||||
|
||||
pkt_data = construct_pkt_hdr(tx_buf, PKT_TYPE_PROBE,
|
||||
&options.s_host_mac, &options.server_mac,
|
||||
RTE_IPV4(192, 168, 100, 150), RTE_IPV4(192, 168, 100, 151), 319, 319);
|
||||
&options.s_host_mac, &options.server_mac);
|
||||
if (pkt_data == NULL) {
|
||||
rte_exit(EXIT_FAILURE, "cannot allocate space for packet_data in mbuf\n");
|
||||
}
|
||||
|
||||
pld_epoch = (struct pkt_payload_epoch *)pkt_data->payload;
|
||||
epoch = options.s_epoch.fetch_add(1);
|
||||
pld_epoch = (struct pkt_payload_epoch *)pkt_data->payload;
|
||||
pld_epoch->epoch = rte_cpu_to_be_32(epoch);
|
||||
last_sendpt = new sendpt;
|
||||
last_sendpt->epoch = 0; //epoch;
|
||||
last_sendpt->valid = options.s_record.load();
|
||||
tx_buf->userdata = last_sendpt;
|
||||
pending_hw_tx = true;
|
||||
tinfo->last_datapt = new struct datapt;
|
||||
tinfo->last_datapt->epoch = epoch;
|
||||
tinfo->last_datapt->valid = options.s_record.load();
|
||||
|
||||
read_tx = false;
|
||||
recv_resp = false;
|
||||
recv_stat = false;
|
||||
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: sending packet %p with epoch %d\n", (void*)tx_buf, epoch);
|
||||
const uint16_t nb_tx = rte_eth_tx_burst(options.s_portid, tinfo->txqid, &tx_buf, 1);
|
||||
@ -264,16 +318,14 @@ locore_main(void * tif)
|
||||
rte_exit(EXIT_FAILURE, "failed to send packet 0x%p, epoch %d\n", (void*)tx_buf, epoch);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (pending_hw_tx) {
|
||||
if (!read_tx) {
|
||||
struct timespec ts;
|
||||
if ((ret = rte_eth_timesync_read_tx_timestamp(options.s_portid, &ts)) == 0) {
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: read hw tx timestamp %lld.\n", ts.tv_nsec + ts.tv_sec * S2NS);
|
||||
last_sendpt->clt_hw_tx = ts.tv_nsec + ts.tv_sec * S2NS;
|
||||
tinfo->send_data.push_back(last_sendpt);
|
||||
pending_hw_tx = false;
|
||||
} else {
|
||||
//ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: read hw tx timestamp failed - %d.\n", ret);
|
||||
tinfo->last_datapt->clt_hw_tx = ts.tv_nsec + ts.tv_sec * S2NS;
|
||||
read_tx = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2,8 +2,12 @@
|
||||
-O2
|
||||
-std=c++11
|
||||
-Wall
|
||||
-Wextra
|
||||
-Werror
|
||||
-Wpedantic
|
||||
-I/usr/include/dpdk
|
||||
-Iinc
|
||||
-Wno-deprecated-declarations
|
||||
-Wno-packed-not-aligned
|
||||
-Wno-address-of-packed-member
|
||||
-Wno-zero-length-array
|
||||
-Wno-gnu-zero-variadic-macro-arguments
|
91
inc/pkt.h
91
inc/pkt.h
@ -12,13 +12,9 @@
|
||||
#include <rte_net.h>
|
||||
#include <rte_vxlan.h>
|
||||
|
||||
#define IP_DEFTTL 64 /* from RFC 1340. */
|
||||
#define IP_VERSION 0x40
|
||||
#define IP_HDRLEN 0x05 /* default IP header length == five 32-bits words. */
|
||||
#define IP_VHL_DEF (IP_VERSION | IP_HDRLEN)
|
||||
#define IP_ADDR_FMT_SIZE 15
|
||||
|
||||
constexpr static uint32_t ETHER_FRAME_MAGIC = 0xDCDCE5E5;
|
||||
const static struct rte_ether_addr PROBE_MAC_ADDR {0x01,0x1B,0x19,0x00,0x00,0x00};
|
||||
const static uint16_t ETHER_TYPE_LOCAL_EXP = 0x88b5;
|
||||
|
||||
struct ptp_hdr {
|
||||
uint8_t ptp_msg_type;
|
||||
@ -26,15 +22,9 @@ struct ptp_hdr {
|
||||
uint8_t unused[34];
|
||||
} __attribute__((packed));
|
||||
|
||||
struct pkt_proto_hdr {
|
||||
struct rte_ether_hdr eth_hdr;
|
||||
struct rte_ipv4_hdr ipv4_hdr;
|
||||
struct rte_udp_hdr udp_hdr;
|
||||
struct ptp_hdr ptp_hdr;
|
||||
} __attribute__((packed));
|
||||
|
||||
struct pkt_hdr {
|
||||
struct pkt_proto_hdr hdr;
|
||||
struct rte_ether_hdr eth_hdr;
|
||||
struct ptp_hdr ptp_hdr;
|
||||
uint16_t type;
|
||||
uint32_t magic;
|
||||
char payload[0];
|
||||
@ -42,12 +32,13 @@ struct pkt_hdr {
|
||||
|
||||
constexpr static uint16_t PKT_TYPE_LOAD = 0;
|
||||
constexpr static uint16_t PKT_TYPE_PROBE = 1;
|
||||
constexpr static uint16_t PKT_TYPE_RESP = 2;
|
||||
constexpr static uint16_t PKT_TYPE_LOAD_RESP = 2;
|
||||
constexpr static uint16_t PKT_TYPE_PROBE_RESP = 3;
|
||||
struct pkt_payload_epoch {
|
||||
uint32_t epoch;
|
||||
};
|
||||
|
||||
constexpr static uint16_t PKT_TYPE_STAT = 3;
|
||||
constexpr static uint16_t PKT_TYPE_STAT = 4;
|
||||
struct pkt_payload_stat {
|
||||
uint32_t epoch;
|
||||
uint64_t hw_rx;
|
||||
@ -61,8 +52,9 @@ constexpr static uint16_t NUM_PKT_TYPES = PKT_TYPE_STAT + 1;
|
||||
static const uint32_t expected_payload_size[NUM_PKT_TYPES] {
|
||||
sizeof(struct pkt_payload_epoch), // LOAD
|
||||
sizeof(struct pkt_payload_epoch), // PROBE
|
||||
sizeof(struct pkt_payload_epoch), // RESP
|
||||
sizeof(struct pkt_payload_stat) // STAT
|
||||
sizeof(struct pkt_payload_epoch), // LOAD_RESP
|
||||
sizeof(struct pkt_payload_epoch), // PROBE_RESP
|
||||
sizeof(struct pkt_payload_stat) //STAT
|
||||
};
|
||||
|
||||
static inline void
|
||||
@ -135,16 +127,13 @@ dump_pkt(struct rte_mbuf *pkt)
|
||||
// fills the packet with the information except for the payload itself
|
||||
static inline
|
||||
struct pkt_hdr * construct_pkt_hdr(struct rte_mbuf * buf, uint16_t type,
|
||||
struct rte_ether_addr * src_mac, struct rte_ether_addr * dst_mac,
|
||||
uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port)
|
||||
struct rte_ether_addr * src_mac, struct rte_ether_addr * dst_mac)
|
||||
{
|
||||
rte_pktmbuf_reset(buf);
|
||||
|
||||
const uint32_t total_sz = sizeof(struct pkt_hdr) + expected_payload_size[type];
|
||||
struct pkt_hdr * pkt_data = (struct pkt_hdr *)rte_pktmbuf_append(buf, total_sz);
|
||||
struct rte_ether_hdr * eth_hdr;
|
||||
struct rte_ipv4_hdr * ipv4_hdr;
|
||||
struct rte_udp_hdr * udp_hdr;
|
||||
|
||||
if (pkt_data == NULL)
|
||||
return NULL;
|
||||
@ -153,55 +142,23 @@ struct pkt_hdr * construct_pkt_hdr(struct rte_mbuf * buf, uint16_t type,
|
||||
buf->nb_segs = 1;
|
||||
|
||||
// construct l2 header
|
||||
eth_hdr = &pkt_data->hdr.eth_hdr;
|
||||
eth_hdr = &pkt_data->eth_hdr;
|
||||
rte_ether_addr_copy(src_mac, ð_hdr->s_addr);
|
||||
rte_ether_addr_copy(dst_mac, ð_hdr->d_addr);
|
||||
eth_hdr->ether_type = rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4);
|
||||
buf->l2_len = sizeof(struct rte_ether_hdr);
|
||||
|
||||
// construct l3 header
|
||||
ipv4_hdr = &pkt_data->hdr.ipv4_hdr;
|
||||
memset(ipv4_hdr, 0, sizeof(struct rte_ipv4_hdr));
|
||||
ipv4_hdr->version_ihl = IP_VHL_DEF;
|
||||
ipv4_hdr->type_of_service = 0;
|
||||
ipv4_hdr->fragment_offset = 0;
|
||||
ipv4_hdr->time_to_live = IP_DEFTTL;
|
||||
ipv4_hdr->next_proto_id = IPPROTO_UDP;
|
||||
ipv4_hdr->packet_id = 0;
|
||||
ipv4_hdr->src_addr = rte_cpu_to_be_32(src_ip);
|
||||
ipv4_hdr->dst_addr = rte_cpu_to_be_32(dst_ip);
|
||||
ipv4_hdr->total_length = rte_cpu_to_be_16(total_sz - sizeof(struct rte_ether_hdr));
|
||||
ipv4_hdr->hdr_checksum = 0;
|
||||
buf->l3_len = sizeof(struct rte_ipv4_hdr);
|
||||
|
||||
// construct l4 header
|
||||
udp_hdr = &pkt_data->hdr.udp_hdr;
|
||||
udp_hdr->src_port = rte_cpu_to_be_16(src_port);
|
||||
udp_hdr->dst_port = rte_cpu_to_be_16(dst_port);
|
||||
udp_hdr->dgram_cksum = 0; /* No UDP checksum. */
|
||||
udp_hdr->dgram_len = rte_cpu_to_be_16(total_sz -
|
||||
sizeof(struct rte_ether_hdr) -
|
||||
sizeof(struct rte_udp_hdr));
|
||||
buf->l4_len = sizeof(struct rte_udp_hdr);
|
||||
|
||||
/* construct ptp header
|
||||
* so basically after some experiments at least the intel NIC categorizes PTP packets as:
|
||||
* 1. Dest port 319 2. these two fields must be set and don't care about others
|
||||
* Experiments:
|
||||
* SPORT: 319 DPORT: 319 PTP HDR: valid => SET
|
||||
* SPORT: 333 DPORT: 319 PTP HDR: valid => SET
|
||||
* SPORT: 319 DPORT: 333 PTP HDR: valid => NOT SET
|
||||
* SPORT: 319 DPORT: 319 PTP HDR: invalid => NOT SET */
|
||||
pkt_data->hdr.ptp_hdr.ptp_msg_type = 0x0; // SYNC
|
||||
pkt_data->type = rte_cpu_to_be_16(type);
|
||||
pkt_data->magic = rte_cpu_to_be_32(ETHER_FRAME_MAGIC);
|
||||
if (type == PKT_TYPE_PROBE) {
|
||||
// only timestamp PROBE pkts
|
||||
pkt_data->hdr.ptp_hdr.ptp_ver = 0x2; // VER 2
|
||||
if (type == PKT_TYPE_PROBE || type == PKT_TYPE_PROBE_RESP) {
|
||||
rte_ether_addr_copy(&PROBE_MAC_ADDR, ð_hdr->d_addr);
|
||||
eth_hdr->ether_type = rte_cpu_to_be_16(RTE_ETHER_TYPE_1588);
|
||||
pkt_data->ptp_hdr.ptp_ver = 0x2; // VER 2
|
||||
buf->ol_flags |= PKT_TX_IEEE1588_TMST;
|
||||
} else {
|
||||
pkt_data->hdr.ptp_hdr.ptp_ver = 0xff;
|
||||
rte_ether_addr_copy(dst_mac, ð_hdr->d_addr);
|
||||
eth_hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_LOCAL_EXP);
|
||||
pkt_data->ptp_hdr.ptp_ver = 0xff;
|
||||
}
|
||||
buf->l2_len = sizeof(struct rte_ether_hdr);
|
||||
|
||||
pkt_data->ptp_hdr.ptp_msg_type = 0x0; // SYNC
|
||||
pkt_data->type = rte_cpu_to_be_16(type);
|
||||
pkt_data->magic = rte_cpu_to_be_32(ETHER_FRAME_MAGIC);
|
||||
|
||||
return pkt_data;
|
||||
}
|
||||
|
@ -3,7 +3,8 @@
|
||||
#include <time.h>
|
||||
#include <rte_ip.h>
|
||||
|
||||
constexpr static unsigned int S2NS = 100000000UL;
|
||||
constexpr static unsigned long S2NS = 100000000UL;
|
||||
constexpr static unsigned long S2US = 1000000L;
|
||||
constexpr static uint16_t SERVER_LOAD_PORT = 1234;
|
||||
constexpr static uint16_t SERVER_PROBE_PORT = 319;
|
||||
constexpr static uint32_t SERVER_IP = RTE_IPV4(192,168,123,0);
|
||||
|
192
khat/khat.cc
192
khat/khat.cc
@ -22,8 +22,20 @@
|
||||
#include "ntrlog.h"
|
||||
#include "util.h"
|
||||
|
||||
|
||||
/* Protocol:
|
||||
* regular client:
|
||||
* client -> LOAD -> server
|
||||
* server -> LOAD_RESP -> client
|
||||
* measuring client:
|
||||
* client -> PROBE -> server (client tx timestamps)
|
||||
* server -> PROBE_RESP -> client (client rx timestamps and server tx/rx timestamps)
|
||||
* server -> STAT -> client (server sends its tx/rx timestamps)
|
||||
*/
|
||||
|
||||
NTR_DECL_IMPL;
|
||||
|
||||
static void * const PROBE_MAGIC = (void*)0x12344444;
|
||||
constexpr static unsigned int MBUF_MAX_COUNT = 16384;
|
||||
constexpr static unsigned int MBUF_CACHE_SIZE = 512;
|
||||
constexpr static unsigned int RX_RING_SIZE = 4096;
|
||||
@ -42,21 +54,25 @@ static const struct rte_eth_conf port_conf_default{};
|
||||
// I kept this global because globally there could be only one pending probe request
|
||||
// and rx_add_timestamp can save their shit here too
|
||||
struct probe_state_t {
|
||||
struct pkt_proto_hdr hdr;
|
||||
struct rte_ether_hdr hdr;
|
||||
uint32_t epoch;
|
||||
uint32_t timesync;
|
||||
uint64_t last_sw_rx;
|
||||
uint64_t last_sw_tx;
|
||||
uint64_t last_hw_rx;
|
||||
uint64_t last_hw_tx;
|
||||
};
|
||||
|
||||
|
||||
// state machine:
|
||||
constexpr static int SERVER_STATE_WAIT = 0;
|
||||
constexpr static int SERVER_STATE_PROBE = 1;
|
||||
|
||||
struct options_t {
|
||||
//states
|
||||
uint16_t s_portid;
|
||||
struct rte_ether_addr s_host_mac;
|
||||
struct rte_mempool * s_pkt_mempool;
|
||||
std::atomic<int> s_probe_influx{0};
|
||||
std::atomic<int> s_state {SERVER_STATE_WAIT};
|
||||
struct probe_state_t s_probe_info;
|
||||
};
|
||||
|
||||
@ -78,26 +94,23 @@ rx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
|
||||
}
|
||||
|
||||
if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE) {
|
||||
int one = 0;
|
||||
int state_wait = SERVER_STATE_WAIT;
|
||||
pkts[i]->userdata = nullptr;
|
||||
if (rte_eth_timesync_read_rx_timestamp(port, &ts, pkts[i]->timesync & 0x3) == 0) {
|
||||
if (options.s_probe_influx.compare_exchange_strong(one, 1)) {
|
||||
if (options.s_state.compare_exchange_strong(state_wait, SERVER_STATE_PROBE)) {
|
||||
// mark the mbuf as probe packet being processed
|
||||
pkts[i]->userdata = &options.s_probe_info;
|
||||
// jot down some information and stats
|
||||
options.s_probe_info.epoch = rte_be_to_cpu_32(((struct pkt_payload_epoch *)pkt_data->payload)->epoch);
|
||||
// only the locore that receives the pkt w/ userdata != nullptr processes that packet
|
||||
pkts[i]->userdata = PROBE_MAGIC;
|
||||
// tag with timestamps
|
||||
options.s_probe_info.last_hw_rx = ts.tv_nsec + ts.tv_sec * S2NS;
|
||||
options.s_probe_info.last_sw_rx = now;
|
||||
options.s_probe_info.timesync = pkts[i]->timesync;
|
||||
// copy protocol header
|
||||
rte_memcpy(&options.s_probe_info.hdr, &pkt_data->hdr, sizeof(struct pkt_proto_hdr));
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: tagged packet %p epoch %d with sw: %llu hw:%llu.\n", (void*)pkts[i], options.s_probe_info.epoch, now, options.s_probe_info.last_hw_rx);
|
||||
} else
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "rx_add_timestamp: packet %p not tagged - another probe packet influx.\n", (void*)pkts[i]);
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "rx_add_timestamp: packet %p not tagged - server is processing a probe.\n", (void*)pkts[i]);
|
||||
} else
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "rx_add_timestamp: packet %p not tagged - hw rx timestamp not available.\n", (void*)pkts[i]);
|
||||
} else
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], pkt_data->type);
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], rte_be_to_cpu_16(pkt_data->type));
|
||||
}
|
||||
|
||||
return nb_pkts;
|
||||
@ -119,18 +132,17 @@ tx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
|
||||
continue;
|
||||
}
|
||||
|
||||
if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_STAT) {
|
||||
if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE_RESP) {
|
||||
// this packet is the response to PROBE packets
|
||||
|
||||
// at this time the packet is not sent to the NIC yet so
|
||||
// the state must be influx
|
||||
// the state must be waiting stats
|
||||
// XXX: this should be an assert
|
||||
if(options.s_probe_influx.load() != 1 || pkts[i]->userdata == nullptr) {
|
||||
if(options.s_state.load() != SERVER_STATE_PROBE || pkts[i]->userdata != PROBE_MAGIC) {
|
||||
rte_exit(EXIT_FAILURE, "packet %p sent to NIC before sw callback\n", (void*)pkts[i]);
|
||||
}
|
||||
|
||||
// just tag the option
|
||||
((struct probe_state_t *)pkts[i]->userdata)->last_sw_tx = now;
|
||||
options.s_probe_info.last_sw_tx = now;
|
||||
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: tagged packet %p with sw tx %llu\n", (void*)pkts[i], options.s_probe_info.last_sw_tx);
|
||||
} else {
|
||||
@ -147,12 +159,11 @@ locore_main(void * _unused __rte_unused)
|
||||
struct rte_mbuf *bufs[BURST_SIZE];
|
||||
// + 1 because it might involve an extra PKT_TYPE_STAT packet
|
||||
// when all tx timestamps are ready
|
||||
struct rte_mbuf *tx_bufs[BURST_SIZE + 1];
|
||||
struct rte_mbuf *tx_bufs[BURST_SIZE];
|
||||
struct pkt_hdr *pkt_data;
|
||||
struct probe_state_t *probe_state;
|
||||
uint32_t core_id = rte_lcore_id();
|
||||
|
||||
bool pending_hw_txts = false;
|
||||
bool pending_probe = false;
|
||||
|
||||
if (rte_eth_dev_socket_id(options.s_portid) > 0 && rte_eth_dev_socket_id(options.s_portid) != (int)rte_socket_id()) {
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: WARNING, port %d is on remote NUMA node to "
|
||||
@ -180,76 +191,84 @@ locore_main(void * _unused __rte_unused)
|
||||
continue;
|
||||
}
|
||||
|
||||
uint32_t dst_ip = rte_be_to_cpu_32(pkt_data->hdr.ipv4_hdr.dst_addr);
|
||||
uint32_t src_ip = rte_be_to_cpu_32(pkt_data->hdr.ipv4_hdr.src_addr);
|
||||
uint16_t src_port = rte_be_to_cpu_16(pkt_data->hdr.udp_hdr.src_port);
|
||||
uint16_t dst_port = rte_be_to_cpu_16(pkt_data->hdr.udp_hdr.dst_port);
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d packet %p from %d.%d.%d.%d(%x:%x:%x:%x:%x:%x) to %d.%d.%d.%d(%x:%x:%x:%x:%x:%x), sport %d, dport %d, type %d\n",
|
||||
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d packet %p from %x:%x:%x:%x:%x:%x to %x:%x:%x:%x:%x:%x, type %d\n",
|
||||
core_id,
|
||||
(void*)bufs[i],
|
||||
(src_ip >> 24) & 0xff,
|
||||
(src_ip >> 16) & 0xff,
|
||||
(src_ip >> 8) & 0xff,
|
||||
(src_ip >> 0) & 0xff,
|
||||
pkt_data->hdr.eth_hdr.s_addr.addr_bytes[0],
|
||||
pkt_data->hdr.eth_hdr.s_addr.addr_bytes[1],
|
||||
pkt_data->hdr.eth_hdr.s_addr.addr_bytes[2],
|
||||
pkt_data->hdr.eth_hdr.s_addr.addr_bytes[3],
|
||||
pkt_data->hdr.eth_hdr.s_addr.addr_bytes[4],
|
||||
pkt_data->hdr.eth_hdr.s_addr.addr_bytes[5],
|
||||
(dst_ip >> 24) & 0xff,
|
||||
(dst_ip >> 16) & 0xff,
|
||||
(dst_ip >> 8) & 0xff,
|
||||
(dst_ip >> 0) & 0xff,
|
||||
pkt_data->hdr.eth_hdr.d_addr.addr_bytes[0],
|
||||
pkt_data->hdr.eth_hdr.d_addr.addr_bytes[1],
|
||||
pkt_data->hdr.eth_hdr.d_addr.addr_bytes[2],
|
||||
pkt_data->hdr.eth_hdr.d_addr.addr_bytes[3],
|
||||
pkt_data->hdr.eth_hdr.d_addr.addr_bytes[4],
|
||||
pkt_data->hdr.eth_hdr.d_addr.addr_bytes[5],
|
||||
src_port,
|
||||
dst_port,
|
||||
pkt_data->eth_hdr.s_addr.addr_bytes[0],
|
||||
pkt_data->eth_hdr.s_addr.addr_bytes[1],
|
||||
pkt_data->eth_hdr.s_addr.addr_bytes[2],
|
||||
pkt_data->eth_hdr.s_addr.addr_bytes[3],
|
||||
pkt_data->eth_hdr.s_addr.addr_bytes[4],
|
||||
pkt_data->eth_hdr.s_addr.addr_bytes[5],
|
||||
pkt_data->eth_hdr.d_addr.addr_bytes[0],
|
||||
pkt_data->eth_hdr.d_addr.addr_bytes[1],
|
||||
pkt_data->eth_hdr.d_addr.addr_bytes[2],
|
||||
pkt_data->eth_hdr.d_addr.addr_bytes[3],
|
||||
pkt_data->eth_hdr.d_addr.addr_bytes[4],
|
||||
pkt_data->eth_hdr.d_addr.addr_bytes[5],
|
||||
rte_be_to_cpu_16(pkt_data->type));
|
||||
|
||||
|
||||
switch (rte_be_to_cpu_16(pkt_data->type)) {
|
||||
case PKT_TYPE_PROBE:
|
||||
if (bufs[i]->userdata != nullptr) {
|
||||
probe_state = (struct probe_state_t *)bufs[i]->userdata;
|
||||
pending_hw_txts = true;
|
||||
}
|
||||
break;
|
||||
case PKT_TYPE_LOAD:
|
||||
// swap s_addr and d_addr
|
||||
// XXX: can we avoid this allocation?
|
||||
case PKT_TYPE_PROBE: {
|
||||
if (options.s_state.load() == SERVER_STATE_PROBE && bufs[i]->userdata == PROBE_MAGIC) {
|
||||
// send back probe_resp pkt to probe for return latency
|
||||
pending_probe = true;
|
||||
|
||||
// book keep probe results
|
||||
options.s_probe_info.epoch = rte_be_to_cpu_32(((struct pkt_payload_epoch *)pkt_data->payload)->epoch);
|
||||
options.s_probe_info.timesync = bufs[i]->timesync;
|
||||
rte_memcpy(&options.s_probe_info.hdr, &pkt_data->eth_hdr, sizeof(struct rte_ether_hdr));
|
||||
|
||||
pkt_buf = rte_pktmbuf_alloc(options.s_pkt_mempool);
|
||||
|
||||
if (pkt_buf == NULL) {
|
||||
rte_exit(EXIT_FAILURE, "locore_main: failed to allocate memory for pkt_buf\n");
|
||||
}
|
||||
|
||||
tx_data = construct_pkt_hdr(pkt_buf, PKT_TYPE_RESP,
|
||||
tx_data = construct_pkt_hdr(pkt_buf, PKT_TYPE_PROBE_RESP,
|
||||
&options.s_host_mac,
|
||||
&pkt_data->hdr.eth_hdr.s_addr,
|
||||
dst_ip,
|
||||
src_ip,
|
||||
dst_port,
|
||||
src_port);
|
||||
&pkt_data->eth_hdr.s_addr);
|
||||
|
||||
if (tx_data == NULL) {
|
||||
rte_exit(EXIT_FAILURE, "failed to construct tx packet %p", (void*)pkt_buf);
|
||||
}
|
||||
|
||||
// endianess doesn't matter
|
||||
rte_memcpy(tx_data->payload, pkt_data->payload, sizeof(struct pkt_payload_epoch));
|
||||
tx_data->magic = pkt_data->magic;
|
||||
|
||||
pkt_buf->userdata = PROBE_MAGIC;
|
||||
|
||||
// queue for burst send
|
||||
tx_bufs[nb_tx++] = pkt_buf;
|
||||
// free rx packet
|
||||
rte_pktmbuf_free(bufs[i]);
|
||||
break;
|
||||
default:
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case PKT_TYPE_LOAD: {
|
||||
// we reply to load packet regardless of the server state
|
||||
pkt_buf = rte_pktmbuf_alloc(options.s_pkt_mempool);
|
||||
|
||||
if (pkt_buf == NULL) {
|
||||
rte_exit(EXIT_FAILURE, "locore_main: failed to allocate memory for pkt_buf\n");
|
||||
}
|
||||
|
||||
tx_data = construct_pkt_hdr(pkt_buf, PKT_TYPE_LOAD_RESP,
|
||||
&options.s_host_mac,
|
||||
&pkt_data->eth_hdr.s_addr);
|
||||
|
||||
if (tx_data == NULL) {
|
||||
rte_exit(EXIT_FAILURE, "failed to construct tx packet %p", (void*)pkt_buf);
|
||||
}
|
||||
|
||||
rte_memcpy(tx_data->payload, pkt_data->payload, sizeof(struct pkt_payload_epoch));
|
||||
|
||||
// queue for burst send
|
||||
tx_bufs[nb_tx++] = pkt_buf;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
rte_pktmbuf_free(bufs[i]);
|
||||
}
|
||||
|
||||
// send the packets
|
||||
@ -261,7 +280,7 @@ locore_main(void * _unused __rte_unused)
|
||||
}
|
||||
|
||||
// we wanna check every loop not only when there are packets
|
||||
if (pending_hw_txts) {
|
||||
if (pending_probe) {
|
||||
struct timespec ts;
|
||||
struct pkt_payload_stat * stat;
|
||||
if (rte_eth_timesync_read_tx_timestamp(options.s_portid, &ts) == 0) {
|
||||
@ -274,27 +293,28 @@ locore_main(void * _unused __rte_unused)
|
||||
|
||||
tx_data = construct_pkt_hdr(pkt_buf, PKT_TYPE_STAT,
|
||||
&options.s_host_mac,
|
||||
&probe_state->hdr.eth_hdr.s_addr,
|
||||
rte_be_to_cpu_32(probe_state->hdr.ipv4_hdr.dst_addr),
|
||||
rte_be_to_cpu_32(probe_state->hdr.ipv4_hdr.src_addr),
|
||||
rte_be_to_cpu_16(probe_state->hdr.udp_hdr.dst_port),
|
||||
rte_be_to_cpu_16(probe_state->hdr.udp_hdr.dst_port));
|
||||
&options.s_probe_info.hdr.s_addr);
|
||||
|
||||
// populate stats
|
||||
stat = (struct pkt_payload_stat *)tx_data->payload;
|
||||
stat->epoch = rte_cpu_to_be_64(probe_state->epoch);
|
||||
stat->hw_rx = rte_cpu_to_be_64(probe_state->last_hw_rx);
|
||||
stat->hw_tx = rte_cpu_to_be_64(probe_state->last_hw_tx);
|
||||
stat->sw_rx = rte_cpu_to_be_64(probe_state->last_sw_rx);
|
||||
stat->sw_tx = rte_cpu_to_be_64(probe_state->last_sw_tx);
|
||||
tx_data->magic = rte_cpu_to_be_32(ETHER_FRAME_MAGIC);
|
||||
stat->epoch = rte_cpu_to_be_32(options.s_probe_info.epoch);
|
||||
stat->hw_rx = rte_cpu_to_be_64(options.s_probe_info.last_hw_rx);
|
||||
stat->hw_tx = rte_cpu_to_be_64(ts.tv_nsec + ts.tv_sec * S2NS);
|
||||
stat->sw_rx = rte_cpu_to_be_64(options.s_probe_info.last_sw_rx);
|
||||
stat->sw_tx = rte_cpu_to_be_64(options.s_probe_info.last_sw_tx);
|
||||
|
||||
// queue for burst send
|
||||
tx_bufs[nb_tx++] = pkt_buf;
|
||||
// send the packet
|
||||
if (rte_eth_tx_burst(options.s_portid, 0, &pkt_buf, 1) < 1) {
|
||||
rte_exit(EXIT_FAILURE, "locore_main: failed to send some packets.\n");
|
||||
}
|
||||
|
||||
// release flux
|
||||
pending_hw_txts = false;
|
||||
options.s_probe_influx.store(0);
|
||||
pending_probe = false;
|
||||
|
||||
int expected = SERVER_STATE_PROBE;
|
||||
if (!options.s_state.compare_exchange_strong(expected, SERVER_STATE_WAIT)) {
|
||||
rte_exit(EXIT_FAILURE, "s_state changed unexpectedly!");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user