diff --git a/CMakeLists.txt b/CMakeLists.txt index 733e1c8..ddd08ac 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -15,13 +15,16 @@ set(CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11 -Wno-deprecated-declarations -Wno-packed-not-aligned -Wno-address-of-packed-member - -msse4) + -Wno-zero-length-array + -Wno-gnu-zero-variadic-macro-arguments + -msse4 + -mavx) include_directories(${CMAKE_SOURCE_DIR}/inc) include_directories(${dpdk_INCLUDE_DIRS}) -add_executable(khat khat/khat.cc) -add_executable(cat cat/cat.cc) +add_executable(khat khat/khat.cc ) +add_executable(cat cat/cat.cc cat/generator.cc) set(LINK_LIBS ${dpdk_LIBRARIES} pthread) diff --git a/cat/cat.cc b/cat/cat.cc index 101bbcc..9beef94 100644 --- a/cat/cat.cc +++ b/cat/cat.cc @@ -37,17 +37,11 @@ constexpr static unsigned int MODE_CLIENT = 1; static const struct rte_eth_conf port_conf_default{}; -struct sendpt { +struct datapt { uint32_t epoch; uint32_t valid; uint64_t clt_hw_tx; uint64_t clt_sw_tx; -}; - - -struct recvpt { - uint32_t epoch; - uint32_t valid; uint64_t clt_hw_rx; uint64_t clt_sw_rx; uint64_t srv_hw_tx; @@ -60,8 +54,8 @@ struct thread_info { unsigned int id; unsigned int rxqid{0}; unsigned int txqid{0}; - std::vector send_data; - std::vector recv_data; + std::vector data; + struct datapt * last_datapt{nullptr}; unsigned int tot_send{0}; unsigned int tot_recv{0}; Generator * ia_gen; @@ -88,8 +82,13 @@ struct options_t { static struct options_t options; +static struct thread_info * get_thread_info(int qid) +{ + return options.s_thr_info.at(qid); +} + static uint16_t -rx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused, +rx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx, struct rte_mbuf **pkts, uint16_t nb_pkts, uint16_t max_pkts __rte_unused, void *_ __rte_unused) { uint64_t now = rte_rdtsc(); @@ -105,22 +104,23 @@ rx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused, continue; } - if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_STAT) { - 
pkts[i]->userdata = nullptr; - if ((ret = rte_eth_timesync_read_rx_timestamp(port, &ts, pkts[i]->timesync & 0x3)) == 0) { - // has hw rx timestamp - struct recvpt * datapt = new struct recvpt; - datapt->valid = options.s_record.load(); - datapt->clt_hw_rx = ts.tv_sec * S2NS + ts.tv_nsec; - datapt->clt_sw_rx = now; - pkts[i]->userdata = datapt; - ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: tagged packet %p with sw: %llu hw: %llu.\n", (void*)pkts[i], now, datapt->clt_hw_rx); + if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE_RESP) { + struct thread_info * tinfo = get_thread_info(qidx); + uint32_t epoch = rte_be_to_cpu_32(((struct pkt_payload_epoch *)pkt_data->payload)->epoch); + if (tinfo->last_datapt != nullptr && tinfo->last_datapt->epoch == epoch) { + if ((ret = rte_eth_timesync_read_rx_timestamp(port, &ts, pkts[i]->timesync & 0x3)) == 0) { + // has hw rx timestamp + tinfo->last_datapt->clt_hw_rx = ts.tv_sec * S2NS + ts.tv_nsec; + tinfo->last_datapt->clt_sw_rx = now; + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: tagged packet %p with sw: %llu hw: %llu.\n", (void*)pkts[i], now, tinfo->last_datapt->clt_hw_rx); + } else { + rte_exit(EXIT_FAILURE, "rx_add_timestamp: packet %p not tagged - hw ts not available - %d.\n", (void*)pkts[i], ret); + } } else { - // leave as null - ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "rx_add_timestamp: packet %p not tagged - hw ts not available - %d.\n", (void*)pkts[i], ret); + ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "rx_add_timestamp: packet %p epoch %d != last epoch %d.\n", (void*)pkts[i], epoch, tinfo->last_datapt->epoch); } } else { - ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], pkt_data->type); + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], rte_be_to_cpu_16(pkt_data->type)); } } @@ -143,7 +143,14 @@ tx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused, } if 
(rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE) { - ((struct sendpt *)pkts[i]->userdata)->clt_sw_tx = now; + struct thread_info * tinfo = get_thread_info(qidx); + uint32_t epoch = rte_be_to_cpu_32(((struct pkt_payload_epoch *)pkt_data->payload)->epoch); + + if (tinfo->last_datapt == nullptr || epoch != tinfo->last_datapt->epoch) { + rte_exit(EXIT_FAILURE, "tx_add_timestamp: packet epoch %d != last epoch %d\n", epoch, tinfo->last_datapt->epoch); + } + + tinfo->last_datapt->clt_sw_tx = now; ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: tagged packet %p with sw: %llu.\n", (void*)pkts[i], now); } else { ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], pkt_data->type); @@ -161,14 +168,11 @@ locore_main(void * tif) struct rte_mbuf *rx_bufs[BURST_SIZE]; struct pkt_hdr *pkt_data; uint32_t core_id = rte_lcore_id(); - uint32_t epoch; - struct pkt_payload_epoch * pld_epoch; - struct pkt_payload_stat * pld_stat; - struct recvpt * recvpt; int32_t ret; - struct sendpt * last_sendpt; - bool pending_hw_tx; + bool read_tx = true; + bool recv_stat = true; + bool recv_resp = true; uint64_t next_ts; // XXX: check link status instead @@ -183,7 +187,6 @@ locore_main(void * tif) ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d running thread %d...\n", core_id, tinfo->id); next_ts = get_time_us(); - pending_hw_tx = false; while(!options.s_stop.load()) { uint64_t now = get_time_us(); @@ -201,28 +204,45 @@ locore_main(void * tif) } uint16_t type = rte_be_to_cpu_16(each->type); - + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: received packet %p type %d.\n", (void*)rx_bufs[i], type); switch (type) { - case PKT_TYPE_RESP: + struct pkt_payload_epoch * pld_epoch; + struct pkt_payload_stat * pld_stat; + uint32_t epoch; + + case PKT_TYPE_PROBE_RESP: + pld_epoch = (struct pkt_payload_epoch *)each->payload; + epoch = rte_be_to_cpu_32(pld_epoch->epoch); + + if (tinfo->last_datapt == nullptr || epoch != 
tinfo->last_datapt->epoch) { + ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: packet %p epoch %d doesn't match datapt %d.\n", (void*)rx_bufs[i], epoch, tinfo->last_datapt->epoch); + break; + } + tinfo->tot_recv++; - ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: received packet %p type %d epoch %d.\n", (void*)rx_bufs[i], type, - rte_be_to_cpu_32(((struct pkt_payload_epoch *)(each->payload))->epoch)); + + recv_resp = true; break; case PKT_TYPE_STAT: - tinfo->tot_recv++; - recvpt = (struct recvpt *)rx_bufs[i]->userdata; pld_stat = (struct pkt_payload_stat *)each->payload; - // keep stats - recvpt->epoch = rte_be_to_cpu_32(pld_stat->epoch); - recvpt->srv_hw_tx = rte_be_to_cpu_32(pld_stat->hw_tx); - recvpt->srv_hw_rx = rte_be_to_cpu_32(pld_stat->hw_rx); - recvpt->srv_sw_tx = rte_be_to_cpu_32(pld_stat->sw_tx); - recvpt->srv_sw_rx = rte_be_to_cpu_32(pld_stat->sw_rx); - tinfo->recv_data.push_back(recvpt); - ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: received packet %p type %d epoch %d.\n", (void*)rx_bufs[i], type, recvpt->epoch); + epoch = rte_be_to_cpu_32(pld_stat->epoch); + + if (tinfo->last_datapt == nullptr || epoch != tinfo->last_datapt->epoch) { + ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: packet %p epoch %d doesn't match datapt %d.\n", (void*)rx_bufs[i], epoch, tinfo->last_datapt->epoch); + break; + } + + tinfo->last_datapt->srv_hw_tx = rte_be_to_cpu_64(pld_stat->hw_tx); + tinfo->last_datapt->srv_hw_rx = rte_be_to_cpu_64(pld_stat->hw_rx); + tinfo->last_datapt->srv_sw_tx = rte_be_to_cpu_64(pld_stat->sw_tx); + tinfo->last_datapt->srv_sw_rx = rte_be_to_cpu_64(pld_stat->sw_rx); + + tinfo->tot_recv++; + + recv_stat = true; break; default: - ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: ignoring packet %p with invalid type %d.\n", (void*)rx_bufs[i], type); + ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: ignoring packet %p with unknown type %d.\n", (void*)rx_bufs[i], type); rte_pktmbuf_free(rx_bufs[i]); continue; } @@ -231,49 +251,81 @@ 
locore_main(void * tif) } } - if (now >= next_ts) { //&& !pending_hw_tx) { - next_ts += (int)(tinfo->ia_gen->generate() * 1000000.0); + if (read_tx && recv_stat & recv_resp) { + // if we have all the data - // generate the packet - tx_buf = rte_pktmbuf_alloc(options.mbuf_pool); + if (tinfo->last_datapt != nullptr) { + // push the data to the queue if we haven't done so already + tinfo->data.push_back(tinfo->last_datapt); - if (tx_buf == NULL) { - rte_exit(EXIT_FAILURE, "cannot allocate tx_buf\n"); + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: datapt for epoch %d dump:\n" \ + " Valid: %d\n" + " client TX HW: %llu\n" \ + " client TX SW: %llu\n" \ + " client RX HW: %llu\n" \ + " client RX SW: %llu\n" \ + " server TX HW: %llu\n" \ + " server TX SW: %llu\n" \ + " server RX HW: %llu\n" \ + " server RX SW: %llu\n\n", + tinfo->last_datapt->epoch, + tinfo->last_datapt->valid, + tinfo->last_datapt->clt_hw_tx, + tinfo->last_datapt->clt_sw_tx, + tinfo->last_datapt->clt_hw_rx, + tinfo->last_datapt->clt_sw_rx, + tinfo->last_datapt->srv_hw_tx, + tinfo->last_datapt->srv_sw_tx, + tinfo->last_datapt->srv_hw_rx, + tinfo->last_datapt->srv_sw_rx); + tinfo->last_datapt = nullptr; } - pkt_data = construct_pkt_hdr(tx_buf, PKT_TYPE_PROBE, - &options.s_host_mac, &options.server_mac, - RTE_IPV4(192, 168, 100, 150), RTE_IPV4(192, 168, 100, 151), 319, 319); - if (pkt_data == NULL) { - rte_exit(EXIT_FAILURE, "cannot allocate space for packet_data in mbuf\n"); - } + if (now >= next_ts) { + struct pkt_payload_epoch * pld_epoch; + uint32_t epoch; - pld_epoch = (struct pkt_payload_epoch *)pkt_data->payload; - epoch = options.s_epoch.fetch_add(1); - pld_epoch->epoch = rte_cpu_to_be_32(epoch); - last_sendpt = new sendpt; - last_sendpt->epoch = 0; //epoch; - last_sendpt->valid = options.s_record.load(); - tx_buf->userdata = last_sendpt; - pending_hw_tx = true; + next_ts += (int)(tinfo->ia_gen->generate() * 1000000.0); - ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: sending packet %p with 
epoch %d\n", (void*)tx_buf, epoch); - const uint16_t nb_tx = rte_eth_tx_burst(options.s_portid, tinfo->txqid, &tx_buf, 1); - - if (nb_tx != 1) { - rte_exit(EXIT_FAILURE, "failed to send packet 0x%p, epoch %d\n", (void*)tx_buf, epoch); + // generate the packet + tx_buf = rte_pktmbuf_alloc(options.mbuf_pool); + + if (tx_buf == NULL) { + rte_exit(EXIT_FAILURE, "cannot allocate tx_buf\n"); + } + + pkt_data = construct_pkt_hdr(tx_buf, PKT_TYPE_PROBE, + &options.s_host_mac, &options.server_mac); + if (pkt_data == NULL) { + rte_exit(EXIT_FAILURE, "cannot allocate space for packet_data in mbuf\n"); + } + + epoch = options.s_epoch.fetch_add(1); + pld_epoch = (struct pkt_payload_epoch *)pkt_data->payload; + pld_epoch->epoch = rte_cpu_to_be_32(epoch); + tinfo->last_datapt = new struct datapt; + tinfo->last_datapt->epoch = epoch; + tinfo->last_datapt->valid = options.s_record.load(); + + read_tx = false; + recv_resp = false; + recv_stat = false; + + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: sending packet %p with epoch %d\n", (void*)tx_buf, epoch); + const uint16_t nb_tx = rte_eth_tx_burst(options.s_portid, tinfo->txqid, &tx_buf, 1); + + if (nb_tx != 1) { + rte_exit(EXIT_FAILURE, "failed to send packet 0x%p, epoch %d\n", (void*)tx_buf, epoch); + } } } - if (pending_hw_tx) { + if (!read_tx) { struct timespec ts; if ((ret = rte_eth_timesync_read_tx_timestamp(options.s_portid, &ts)) == 0) { ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: read hw tx timestamp %lld.\n", ts.tv_nsec + ts.tv_sec * S2NS); - last_sendpt->clt_hw_tx = ts.tv_nsec + ts.tv_sec * S2NS; - tinfo->send_data.push_back(last_sendpt); - pending_hw_tx = false; - } else { - //ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: read hw tx timestamp failed - %d.\n", ret); + tinfo->last_datapt->clt_hw_tx = ts.tv_nsec + ts.tv_sec * S2NS; + read_tx = true; } } } diff --git a/compile_flags.txt b/compile_flags.txt index 03babec..53b2ac7 100644 --- a/compile_flags.txt +++ b/compile_flags.txt @@ -2,8 +2,12 @@ -O2 
-std=c++11 -Wall +-Wextra -Werror --Wpedantic -I/usr/include/dpdk -Iinc --Wno-deprecated-declarations \ No newline at end of file +-Wno-deprecated-declarations +-Wno-packed-not-aligned +-Wno-address-of-packed-member +-Wno-zero-length-array +-Wno-gnu-zero-variadic-macro-arguments \ No newline at end of file diff --git a/inc/pkt.h b/inc/pkt.h index 4bcbcc8..b1d59db 100644 --- a/inc/pkt.h +++ b/inc/pkt.h @@ -12,13 +12,9 @@ #include #include -#define IP_DEFTTL 64 /* from RFC 1340. */ -#define IP_VERSION 0x40 -#define IP_HDRLEN 0x05 /* default IP header length == five 32-bits words. */ -#define IP_VHL_DEF (IP_VERSION | IP_HDRLEN) -#define IP_ADDR_FMT_SIZE 15 - constexpr static uint32_t ETHER_FRAME_MAGIC = 0xDCDCE5E5; +const static struct rte_ether_addr PROBE_MAC_ADDR {0x01,0x1B,0x19,0x00,0x00,0x00}; +const static uint16_t ETHER_TYPE_LOCAL_EXP = 0x88b5; struct ptp_hdr { uint8_t ptp_msg_type; @@ -26,15 +22,9 @@ struct ptp_hdr { uint8_t unused[34]; } __attribute__((packed)); -struct pkt_proto_hdr { - struct rte_ether_hdr eth_hdr; - struct rte_ipv4_hdr ipv4_hdr; - struct rte_udp_hdr udp_hdr; - struct ptp_hdr ptp_hdr; -} __attribute__((packed)); - struct pkt_hdr { - struct pkt_proto_hdr hdr; + struct rte_ether_hdr eth_hdr; + struct ptp_hdr ptp_hdr; uint16_t type; uint32_t magic; char payload[0]; @@ -42,12 +32,13 @@ struct pkt_hdr { constexpr static uint16_t PKT_TYPE_LOAD = 0; constexpr static uint16_t PKT_TYPE_PROBE = 1; -constexpr static uint16_t PKT_TYPE_RESP = 2; +constexpr static uint16_t PKT_TYPE_LOAD_RESP = 2; +constexpr static uint16_t PKT_TYPE_PROBE_RESP = 3; struct pkt_payload_epoch { uint32_t epoch; }; -constexpr static uint16_t PKT_TYPE_STAT = 3; +constexpr static uint16_t PKT_TYPE_STAT = 4; struct pkt_payload_stat { uint32_t epoch; uint64_t hw_rx; @@ -61,8 +52,9 @@ constexpr static uint16_t NUM_PKT_TYPES = PKT_TYPE_STAT + 1; static const uint32_t expected_payload_size[NUM_PKT_TYPES] { sizeof(struct pkt_payload_epoch), // LOAD sizeof(struct pkt_payload_epoch), // 
PROBE - sizeof(struct pkt_payload_epoch), // RESP - sizeof(struct pkt_payload_stat) // STAT + sizeof(struct pkt_payload_epoch), // LOAD_RESP + sizeof(struct pkt_payload_epoch), // PROBE_RESP + sizeof(struct pkt_payload_stat) //STAT }; static inline void @@ -135,16 +127,13 @@ dump_pkt(struct rte_mbuf *pkt) // fills the packet with the information except for the payload itself static inline struct pkt_hdr * construct_pkt_hdr(struct rte_mbuf * buf, uint16_t type, - struct rte_ether_addr * src_mac, struct rte_ether_addr * dst_mac, - uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port) + struct rte_ether_addr * src_mac, struct rte_ether_addr * dst_mac) { rte_pktmbuf_reset(buf); const uint32_t total_sz = sizeof(struct pkt_hdr) + expected_payload_size[type]; struct pkt_hdr * pkt_data = (struct pkt_hdr *)rte_pktmbuf_append(buf, total_sz); struct rte_ether_hdr * eth_hdr; - struct rte_ipv4_hdr * ipv4_hdr; - struct rte_udp_hdr * udp_hdr; if (pkt_data == NULL) return NULL; @@ -153,55 +142,23 @@ struct pkt_hdr * construct_pkt_hdr(struct rte_mbuf * buf, uint16_t type, buf->nb_segs = 1; // construct l2 header - eth_hdr = &pkt_data->hdr.eth_hdr; + eth_hdr = &pkt_data->eth_hdr; rte_ether_addr_copy(src_mac, &eth_hdr->s_addr); - rte_ether_addr_copy(dst_mac, &eth_hdr->d_addr); - eth_hdr->ether_type = rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4); - buf->l2_len = sizeof(struct rte_ether_hdr); - - // construct l3 header - ipv4_hdr = &pkt_data->hdr.ipv4_hdr; - memset(ipv4_hdr, 0, sizeof(struct rte_ipv4_hdr)); - ipv4_hdr->version_ihl = IP_VHL_DEF; - ipv4_hdr->type_of_service = 0; - ipv4_hdr->fragment_offset = 0; - ipv4_hdr->time_to_live = IP_DEFTTL; - ipv4_hdr->next_proto_id = IPPROTO_UDP; - ipv4_hdr->packet_id = 0; - ipv4_hdr->src_addr = rte_cpu_to_be_32(src_ip); - ipv4_hdr->dst_addr = rte_cpu_to_be_32(dst_ip); - ipv4_hdr->total_length = rte_cpu_to_be_16(total_sz - sizeof(struct rte_ether_hdr)); - ipv4_hdr->hdr_checksum = 0; - buf->l3_len = sizeof(struct rte_ipv4_hdr); - - //
construct l4 header - udp_hdr = &pkt_data->hdr.udp_hdr; - udp_hdr->src_port = rte_cpu_to_be_16(src_port); - udp_hdr->dst_port = rte_cpu_to_be_16(dst_port); - udp_hdr->dgram_cksum = 0; /* No UDP checksum. */ - udp_hdr->dgram_len = rte_cpu_to_be_16(total_sz - - sizeof(struct rte_ether_hdr) - - sizeof(struct rte_udp_hdr)); - buf->l4_len = sizeof(struct rte_udp_hdr); - - /* construct ptp header - * so basically after some experiments at least the intel NIC categorizes PTP packets as: - * 1. Dest port 319 2. these two fields must be set and don't care about others - * Experiments: - * SPORT: 319 DPORT: 319 PTP HDR: valid => SET - * SPORT: 333 DPORT: 319 PTP HDR: valid => SET - * SPORT: 319 DPORT: 333 PTP HDR: valid => NOT SET - * SPORT: 319 DPORT: 319 PTP HDR: invalid => NOT SET */ - pkt_data->hdr.ptp_hdr.ptp_msg_type = 0x0; // SYNC - pkt_data->type = rte_cpu_to_be_16(type); - pkt_data->magic = rte_cpu_to_be_32(ETHER_FRAME_MAGIC); - if (type == PKT_TYPE_PROBE) { - // only timestamp PROBE pkts - pkt_data->hdr.ptp_hdr.ptp_ver = 0x2; // VER 2 + if (type == PKT_TYPE_PROBE || type == PKT_TYPE_PROBE_RESP) { + rte_ether_addr_copy(&PROBE_MAC_ADDR, &eth_hdr->d_addr); + eth_hdr->ether_type = rte_cpu_to_be_16(RTE_ETHER_TYPE_1588); + pkt_data->ptp_hdr.ptp_ver = 0x2; // VER 2 buf->ol_flags |= PKT_TX_IEEE1588_TMST; } else { - pkt_data->hdr.ptp_hdr.ptp_ver = 0xff; + rte_ether_addr_copy(dst_mac, &eth_hdr->d_addr); + eth_hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_LOCAL_EXP); + pkt_data->ptp_hdr.ptp_ver = 0xff; } + buf->l2_len = sizeof(struct rte_ether_hdr); + + pkt_data->ptp_hdr.ptp_msg_type = 0x0; // SYNC + pkt_data->type = rte_cpu_to_be_16(type); + pkt_data->magic = rte_cpu_to_be_32(ETHER_FRAME_MAGIC); return pkt_data; } diff --git a/inc/util.h b/inc/util.h index 9eca27a..20ab299 100644 --- a/inc/util.h +++ b/inc/util.h @@ -3,7 +3,8 @@ #include #include -constexpr static unsigned int S2NS = 100000000UL; +constexpr static unsigned long S2NS = 1000000000UL; +constexpr static unsigned long
S2US = 1000000L; constexpr static uint16_t SERVER_LOAD_PORT = 1234; constexpr static uint16_t SERVER_PROBE_PORT = 319; constexpr static uint32_t SERVER_IP = RTE_IPV4(192,168,123,0); diff --git a/khat/khat.cc b/khat/khat.cc index fc31c0c..03f6482 100644 --- a/khat/khat.cc +++ b/khat/khat.cc @@ -22,8 +22,20 @@ #include "ntrlog.h" #include "util.h" + +/* Protocol: +* regular client: +* client -> LOAD -> server +* server -> LOAD_RESP -> client +* measuring client: +* client -> PROBE -> server (client tx timestamps) +* server -> PROBE_RESP -> client (client rx timestamps and server tx/rx timestamps) +* server -> STAT -> client (server sends its tx/rx timestamps) +*/ + NTR_DECL_IMPL; +static void * const PROBE_MAGIC = (void*)0x12344444; constexpr static unsigned int MBUF_MAX_COUNT = 16384; constexpr static unsigned int MBUF_CACHE_SIZE = 512; constexpr static unsigned int RX_RING_SIZE = 4096; @@ -42,21 +54,25 @@ static const struct rte_eth_conf port_conf_default{}; // I kept this global because globally there could be only one pending probe request // and rx_add_timestamp can save their shit here too struct probe_state_t { - struct pkt_proto_hdr hdr; + struct rte_ether_hdr hdr; uint32_t epoch; uint32_t timesync; uint64_t last_sw_rx; uint64_t last_sw_tx; uint64_t last_hw_rx; - uint64_t last_hw_tx; }; + +// state machine: +constexpr static int SERVER_STATE_WAIT = 0; +constexpr static int SERVER_STATE_PROBE = 1; + struct options_t { //states uint16_t s_portid; struct rte_ether_addr s_host_mac; struct rte_mempool * s_pkt_mempool; - std::atomic s_probe_influx{0}; + std::atomic s_state {SERVER_STATE_WAIT}; struct probe_state_t s_probe_info; }; @@ -78,26 +94,23 @@ rx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused, } if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE) { - int one = 0; + int state_wait = SERVER_STATE_WAIT; pkts[i]->userdata = nullptr; if (rte_eth_timesync_read_rx_timestamp(port, &ts, pkts[i]->timesync & 0x3) == 0) { - if 
(options.s_probe_influx.compare_exchange_strong(one, 1)) { + if (options.s_state.compare_exchange_strong(state_wait, SERVER_STATE_PROBE)) { // mark the mbuf as probe packet being processed - pkts[i]->userdata = &options.s_probe_info; - // jot down some information and stats - options.s_probe_info.epoch = rte_be_to_cpu_32(((struct pkt_payload_epoch *)pkt_data->payload)->epoch); + // only the locore that receives the pkt w/ userdata != nullptr processes that packet + pkts[i]->userdata = PROBE_MAGIC; + // tag with timestamps options.s_probe_info.last_hw_rx = ts.tv_nsec + ts.tv_sec * S2NS; options.s_probe_info.last_sw_rx = now; - options.s_probe_info.timesync = pkts[i]->timesync; - // copy protocol header - rte_memcpy(&options.s_probe_info.hdr, &pkt_data->hdr, sizeof(struct pkt_proto_hdr)); ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: tagged packet %p epoch %d with sw: %llu hw:%llu.\n", (void*)pkts[i], options.s_probe_info.epoch, now, options.s_probe_info.last_hw_rx); } else - ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "rx_add_timestamp: packet %p not tagged - another probe packet influx.\n", (void*)pkts[i]); + ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "rx_add_timestamp: packet %p not tagged - server is processing a probe.\n", (void*)pkts[i]); } else ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "rx_add_timestamp: packet %p not tagged - hw rx timestamp not available.\n", (void*)pkts[i]); } else - ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], pkt_data->type); + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], rte_be_to_cpu_16(pkt_data->type)); } return nb_pkts; @@ -119,18 +132,17 @@ tx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused, continue; } - if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_STAT) { + if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE_RESP) { // this packet is the response to PROBE packets // at this time the 
packet is not sent to the NIC yet so - // the state must be influx + // the state must be waiting stats // XXX: this should be an assert - if(options.s_probe_influx.load() != 1 || pkts[i]->userdata == nullptr) { + if(options.s_state.load() != SERVER_STATE_PROBE || pkts[i]->userdata != PROBE_MAGIC) { rte_exit(EXIT_FAILURE, "packet %p sent to NIC before sw callback\n", (void*)pkts[i]); } - // just tag the option - ((struct probe_state_t *)pkts[i]->userdata)->last_sw_tx = now; + options.s_probe_info.last_sw_tx = now; ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: tagged packet %p with sw tx %llu\n", (void*)pkts[i], options.s_probe_info.last_sw_tx); } else { @@ -147,12 +159,11 @@ locore_main(void * _unused __rte_unused) struct rte_mbuf *bufs[BURST_SIZE]; // + 1 because it might involve an extra PKT_TYPE_STAT packet // when all tx timestamps are ready - struct rte_mbuf *tx_bufs[BURST_SIZE + 1]; + struct rte_mbuf *tx_bufs[BURST_SIZE]; struct pkt_hdr *pkt_data; - struct probe_state_t *probe_state; uint32_t core_id = rte_lcore_id(); - bool pending_hw_txts = false; + bool pending_probe = false; if (rte_eth_dev_socket_id(options.s_portid) > 0 && rte_eth_dev_socket_id(options.s_portid) != (int)rte_socket_id()) { ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: WARNING, port %d is on remote NUMA node to " @@ -180,78 +191,86 @@ locore_main(void * _unused __rte_unused) continue; } - uint32_t dst_ip = rte_be_to_cpu_32(pkt_data->hdr.ipv4_hdr.dst_addr); - uint32_t src_ip = rte_be_to_cpu_32(pkt_data->hdr.ipv4_hdr.src_addr); - uint16_t src_port = rte_be_to_cpu_16(pkt_data->hdr.udp_hdr.src_port); - uint16_t dst_port = rte_be_to_cpu_16(pkt_data->hdr.udp_hdr.dst_port); - ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d packet %p from %d.%d.%d.%d(%x:%x:%x:%x:%x:%x) to %d.%d.%d.%d(%x:%x:%x:%x:%x:%x), sport %d, dport %d, type %d\n", + ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d packet %p from %x:%x:%x:%x:%x:%x to %x:%x:%x:%x:%x:%x, type %d\n", core_id, 
(void*)bufs[i], - (src_ip >> 24) & 0xff, - (src_ip >> 16) & 0xff, - (src_ip >> 8) & 0xff, - (src_ip >> 0) & 0xff, - pkt_data->hdr.eth_hdr.s_addr.addr_bytes[0], - pkt_data->hdr.eth_hdr.s_addr.addr_bytes[1], - pkt_data->hdr.eth_hdr.s_addr.addr_bytes[2], - pkt_data->hdr.eth_hdr.s_addr.addr_bytes[3], - pkt_data->hdr.eth_hdr.s_addr.addr_bytes[4], - pkt_data->hdr.eth_hdr.s_addr.addr_bytes[5], - (dst_ip >> 24) & 0xff, - (dst_ip >> 16) & 0xff, - (dst_ip >> 8) & 0xff, - (dst_ip >> 0) & 0xff, - pkt_data->hdr.eth_hdr.d_addr.addr_bytes[0], - pkt_data->hdr.eth_hdr.d_addr.addr_bytes[1], - pkt_data->hdr.eth_hdr.d_addr.addr_bytes[2], - pkt_data->hdr.eth_hdr.d_addr.addr_bytes[3], - pkt_data->hdr.eth_hdr.d_addr.addr_bytes[4], - pkt_data->hdr.eth_hdr.d_addr.addr_bytes[5], - src_port, - dst_port, + pkt_data->eth_hdr.s_addr.addr_bytes[0], + pkt_data->eth_hdr.s_addr.addr_bytes[1], + pkt_data->eth_hdr.s_addr.addr_bytes[2], + pkt_data->eth_hdr.s_addr.addr_bytes[3], + pkt_data->eth_hdr.s_addr.addr_bytes[4], + pkt_data->eth_hdr.s_addr.addr_bytes[5], + pkt_data->eth_hdr.d_addr.addr_bytes[0], + pkt_data->eth_hdr.d_addr.addr_bytes[1], + pkt_data->eth_hdr.d_addr.addr_bytes[2], + pkt_data->eth_hdr.d_addr.addr_bytes[3], + pkt_data->eth_hdr.d_addr.addr_bytes[4], + pkt_data->eth_hdr.d_addr.addr_bytes[5], rte_be_to_cpu_16(pkt_data->type)); - + + switch (rte_be_to_cpu_16(pkt_data->type)) { - case PKT_TYPE_PROBE: - if (bufs[i]->userdata != nullptr) { - probe_state = (struct probe_state_t *)bufs[i]->userdata; - pending_hw_txts = true; + case PKT_TYPE_PROBE: { + if (options.s_state.load() == SERVER_STATE_PROBE && bufs[i]->userdata == PROBE_MAGIC) { + // send back probe_resp pkt to probe for return latency + pending_probe = true; + + // book keep probe results + options.s_probe_info.epoch = rte_be_to_cpu_32(((struct pkt_payload_epoch *)pkt_data->payload)->epoch); + options.s_probe_info.timesync = bufs[i]->timesync; + rte_memcpy(&options.s_probe_info.hdr, &pkt_data->eth_hdr, sizeof(struct rte_ether_hdr)); 
+ + pkt_buf = rte_pktmbuf_alloc(options.s_pkt_mempool); + + if (pkt_buf == NULL) { + rte_exit(EXIT_FAILURE, "locore_main: failed to allocate memory for pkt_buf\n"); + } + + tx_data = construct_pkt_hdr(pkt_buf, PKT_TYPE_PROBE_RESP, + &options.s_host_mac, + &pkt_data->eth_hdr.s_addr); + + if (tx_data == NULL) { + rte_exit(EXIT_FAILURE, "failed to construct tx packet %p", (void*)pkt_buf); + } + + rte_memcpy(tx_data->payload, pkt_data->payload, sizeof(struct pkt_payload_epoch)); + + pkt_buf->userdata = PROBE_MAGIC; + + // queue for burst send + tx_bufs[nb_tx++] = pkt_buf; } break; - case PKT_TYPE_LOAD: - // swap s_addr and d_addr - // XXX: can we avoid this allocation? + } + case PKT_TYPE_LOAD: { + // we reply to load packet regardless of the server state pkt_buf = rte_pktmbuf_alloc(options.s_pkt_mempool); + if (pkt_buf == NULL) { rte_exit(EXIT_FAILURE, "locore_main: failed to allocate memory for pkt_buf\n"); } - tx_data = construct_pkt_hdr(pkt_buf, PKT_TYPE_RESP, + tx_data = construct_pkt_hdr(pkt_buf, PKT_TYPE_LOAD_RESP, &options.s_host_mac, - &pkt_data->hdr.eth_hdr.s_addr, - dst_ip, - src_ip, - dst_port, - src_port); + &pkt_data->eth_hdr.s_addr); if (tx_data == NULL) { rte_exit(EXIT_FAILURE, "failed to construct tx packet %p", (void*)pkt_buf); } - // endianess doesn't matter rte_memcpy(tx_data->payload, pkt_data->payload, sizeof(struct pkt_payload_epoch)); - tx_data->magic = pkt_data->magic; // queue for burst send tx_bufs[nb_tx++] = pkt_buf; - // free rx packet - rte_pktmbuf_free(bufs[i]); break; + } default: - continue; + break; } + rte_pktmbuf_free(bufs[i]); } - + // send the packets if (nb_tx > 0) { const uint16_t nb_tx_succ = rte_eth_tx_burst(options.s_portid, 0, tx_bufs, nb_tx); @@ -261,7 +280,7 @@ locore_main(void * _unused __rte_unused) } // we wanna check every loop not only when there are packets - if (pending_hw_txts) { + if (pending_probe) { struct timespec ts; struct pkt_payload_stat * stat; if (rte_eth_timesync_read_tx_timestamp(options.s_portid, &ts) 
== 0) { @@ -274,27 +293,28 @@ locore_main(void * _unused __rte_unused) tx_data = construct_pkt_hdr(pkt_buf, PKT_TYPE_STAT, &options.s_host_mac, - &probe_state->hdr.eth_hdr.s_addr, - rte_be_to_cpu_32(probe_state->hdr.ipv4_hdr.dst_addr), - rte_be_to_cpu_32(probe_state->hdr.ipv4_hdr.src_addr), - rte_be_to_cpu_16(probe_state->hdr.udp_hdr.dst_port), - rte_be_to_cpu_16(probe_state->hdr.udp_hdr.dst_port)); + &options.s_probe_info.hdr.s_addr); // populate stats stat = (struct pkt_payload_stat *)tx_data->payload; - stat->epoch = rte_cpu_to_be_64(probe_state->epoch); - stat->hw_rx = rte_cpu_to_be_64(probe_state->last_hw_rx); - stat->hw_tx = rte_cpu_to_be_64(probe_state->last_hw_tx); - stat->sw_rx = rte_cpu_to_be_64(probe_state->last_sw_rx); - stat->sw_tx = rte_cpu_to_be_64(probe_state->last_sw_tx); - tx_data->magic = rte_cpu_to_be_32(ETHER_FRAME_MAGIC); - - // queue for burst send - tx_bufs[nb_tx++] = pkt_buf; + stat->epoch = rte_cpu_to_be_32(options.s_probe_info.epoch); + stat->hw_rx = rte_cpu_to_be_64(options.s_probe_info.last_hw_rx); + stat->hw_tx = rte_cpu_to_be_64(ts.tv_nsec + ts.tv_sec * S2NS); + stat->sw_rx = rte_cpu_to_be_64(options.s_probe_info.last_sw_rx); + stat->sw_tx = rte_cpu_to_be_64(options.s_probe_info.last_sw_tx); + + // send the packet + if (rte_eth_tx_burst(options.s_portid, 0, &pkt_buf, 1) < 1) { + rte_exit(EXIT_FAILURE, "locore_main: failed to send some packets.\n"); + } // release flux - pending_hw_txts = false; - options.s_probe_influx.store(0); + pending_probe = false; + + int expected = SERVER_STATE_PROBE; + if (!options.s_state.compare_exchange_strong(expected, SERVER_STATE_WAIT)) { + rte_exit(EXIT_FAILURE, "s_state changed unexpectedly!"); + } } } }