diff --git a/CMakeLists.txt b/CMakeLists.txt index a9a1b42..733e1c8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -11,7 +11,11 @@ project(khat) list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}") find_package(dpdk REQUIRED) -set(CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11 -Wno-deprecated-declarations -msse4) +set(CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11 + -Wno-deprecated-declarations + -Wno-packed-not-aligned + -Wno-address-of-packed-member + -msse4) include_directories(${CMAKE_SOURCE_DIR}/inc) include_directories(${dpdk_INCLUDE_DIRS}) diff --git a/cat/cat.cc b/cat/cat.cc index f078da7..4407683 100644 --- a/cat/cat.cc +++ b/cat/cat.cc @@ -19,6 +19,7 @@ #include "ntrlog.h" #include "pkt.h" #include "rte_byteorder.h" +#include "rte_ip.h" // init NTRLOG NTR_DECL_IMPL; @@ -31,7 +32,6 @@ constexpr unsigned int RX_RING_NUM = 1; constexpr unsigned int TX_RING_NUM = 1; constexpr unsigned int BURST_SIZE = 32; - static const struct rte_eth_conf port_conf_default{}; struct datapt{ @@ -114,6 +114,9 @@ locore_main(void * _unused __rte_unused) uint32_t epoch = 0; int state = STATE_SEND; + // XXX: check link status instead + + sleep(1); if (rte_eth_dev_socket_id(options.s_portid) > 0 && rte_eth_dev_socket_id(options.s_portid) != (int)rte_socket_id()) { ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: WARNING, port %d is on remote NUMA node to " "polling thread.\n\tPerformance will " @@ -128,34 +131,32 @@ locore_main(void * _unused __rte_unused) rte_exit(EXIT_FAILURE, "cannot allocate tx_buf\n"); } - tx_buf->l2_len = sizeof(struct rte_ether_hdr); - tx_buf->nb_segs = 1; - - pkt_data = (struct packet_data *)rte_pktmbuf_append(tx_buf, sizeof(struct packet_data)); + pkt_data = construct_udp_pkt_hdr(tx_buf, + &options.s_host_mac, &options.server_mac, + RTE_IPV4(192, 168, 100, 150), RTE_IPV4(192, 168, 100, 151), + 1337, 1337); if (pkt_data == NULL) { rte_exit(EXIT_FAILURE, "cannot allocate space for packet_data in mbuf\n"); } - pkt_data->magic = 
rte_cpu_to_be_32(ETHER_FRAME_MAGIC); - pkt_data->eth_hdr.ether_type = rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4); - rte_ether_addr_copy(&options.server_mac, &pkt_data->eth_hdr.d_addr); - rte_ether_addr_copy(&options.s_host_mac, &pkt_data->eth_hdr.s_addr); while(!options.s_stop.load()) { // always pop incoming packets const uint16_t nb_rx = rte_eth_rx_burst(options.s_portid, 0, rx_bufs, BURST_SIZE); - if (nb_rx != 0 && state == STATE_RECV) { + if (nb_rx != 0) { // only process packets when we are ready to receive for (int i = 0; i < nb_rx; i++) { struct packet_data * each = check_valid_packet(rx_bufs[i]); if (each == NULL) { ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: ignoring invalid packet %p.\n", (void*)rx_bufs[i]); + dump_pkt(rx_bufs[i]); + rte_pktmbuf_free(rx_bufs[i]); continue; } - if (each->epoch == epoch) { + if (rte_be_to_cpu_32(each->epoch) == epoch && state == STATE_RECV) { ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: received packet %p for epoch %d\n", (void*)rx_bufs[i], epoch); if (options.s_record.load()) { @@ -169,16 +170,17 @@ locore_main(void * _unused __rte_unused) // bump the epoch and stop processing other packets state = STATE_SEND; epoch++; - break; } else { ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: ignoring packet 0x%p with invalid epoch %d.\n", (void*)rx_bufs[i], epoch); } + + rte_pktmbuf_free(rx_bufs[i]); } } if (state == STATE_SEND) { // set new epoch - pkt_data->epoch = epoch; + pkt_data->epoch = rte_cpu_to_be_32(epoch); ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: sending packet %p with epoch %d\n", (void*)tx_buf, epoch); const uint16_t nb_tx = rte_eth_tx_burst(options.s_portid, 0, &tx_buf, 1); @@ -218,9 +220,12 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool) return ret; } - if(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE) - port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE; port_conf.rxmode.max_rx_pkt_len = RTE_ETHER_MAX_LEN; + port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_UDP_CKSUM; + 
port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_IPV4_CKSUM; + port_conf.txmode.offloads |= DEV_TX_OFFLOAD_UDP_CKSUM; + port_conf.txmode.offloads |= DEV_TX_OFFLOAD_IPV4_CKSUM; + port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE; /* Configure the Ethernet device. */ ret = rte_eth_dev_configure(portid, RX_RING_NUM, TX_RING_NUM, &port_conf); @@ -235,7 +240,7 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool) rxconf = dev_info.default_rxconf; rxconf.offloads = port_conf.rxmode.offloads; for (uint32_t i = 0; i < RX_RING_NUM; i++) { - ret = rte_eth_rx_queue_setup(portid, i, nb_rxd, rte_eth_dev_socket_id(portid), NULL, mbuf_pool); + ret = rte_eth_rx_queue_setup(portid, i, nb_rxd, rte_eth_dev_socket_id(portid), &rxconf, mbuf_pool); if (ret < 0) return ret; } @@ -290,9 +295,14 @@ static void dump_options() static void usage() { - fprintf(stdout, "Usage:\n \ - -v: verbose mode\n \ - -h: display the information\n\n"); + fprintf(stdout, + "Usage:\n " \ + " -v(vv): verbose mode\n" \ + " -h: display the information\n" \ + " -o: output filename\n" \ + " -t: run time\n" \ + " -T: warmup time\n" \ + " -s: server's mac\n\n" ); } int main(int argc, char* argv[]) diff --git a/inc/pkt.h b/inc/pkt.h index fb1bcdb..fe4d08e 100644 --- a/inc/pkt.h +++ b/inc/pkt.h @@ -1,15 +1,34 @@ #pragma once +#include +#include +#include #include #include #include +#include #include #include +#include +#include + +#define IP_DEFTTL 64 /* from RFC 1340. */ +#define IP_VERSION 0x40 +#define IP_HDRLEN 0x05 /* default IP header length == five 32-bits words. 
*/ +#define IP_VHL_DEF (IP_VERSION | IP_HDRLEN) +#define IP_ADDR_FMT_SIZE 15 constexpr static uint32_t ETHER_FRAME_MAGIC = 0xDCDCE5E5; + +struct packet_hdr { + struct rte_ether_hdr eth_hdr; + struct rte_ipv4_hdr ipv4_hdr; + struct rte_udp_hdr udp_hdr; +} __attribute__((packed)); + struct packet_data { - struct rte_ether_hdr eth_hdr; + struct packet_hdr pkt_hdr; uint32_t magic; uint32_t epoch; uint64_t clt_ts_tx; @@ -18,6 +37,125 @@ struct packet_data uint64_t srv_ts_rx; }; +static inline void +print_mac(struct rte_ether_addr * mac) +{ + printf("%x:%x:%x:%x:%x:%x", mac->addr_bytes[0], + mac->addr_bytes[1], + mac->addr_bytes[2], + mac->addr_bytes[3], + mac->addr_bytes[4], + mac->addr_bytes[5]); +} + +static inline void +print_ipv4(uint32_t ip) +{ + printf("%d-%d-%d-%d", (ip >> 24) & 0xff, + (ip >> 16) & 0xff, + (ip >> 8) & 0xff, + (ip >> 0) & 0xff); +} + +static inline void +dump_pkt(struct rte_mbuf *pkt) +{ + if(rte_pktmbuf_data_len(pkt) < sizeof(struct rte_ether_hdr)) { + return; + } + + struct rte_ether_hdr _eth_hdr; + struct rte_ether_hdr * eth_hdr = (struct rte_ether_hdr *)rte_pktmbuf_read(pkt, 0, sizeof(struct rte_ether_hdr), &_eth_hdr); + if (eth_hdr == NULL) { + return; + } + + // ethernet frame + printf("Packet %p: Length 0x%x\n", (void*)pkt, rte_pktmbuf_data_len(pkt)); + printf(" Ethernet header:\n"); + printf(" Src:"); + print_mac(&eth_hdr->s_addr); + printf("\n"); + printf(" Dst:"); + print_mac(&eth_hdr->d_addr); + printf("\n"); + printf(" Type: 0x%x\n", rte_be_to_cpu_16(eth_hdr->ether_type)); + + uint16_t ether_type = rte_be_to_cpu_16(eth_hdr->ether_type); + if (ether_type != RTE_ETHER_TYPE_IPV4) { + return; + } + + if(rte_pktmbuf_data_len(pkt) < sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr)) { + return; + } + + // dump ip header + struct rte_ipv4_hdr * ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1); + printf(" IPv4 header:\n"); + printf(" Src:"); + print_ipv4(rte_be_to_cpu_32(ipv4_hdr->src_addr)); + printf("\n"); + printf(" Dst:"); + 
print_ipv4(rte_be_to_cpu_32(ipv4_hdr->dst_addr)); + printf("\n"); + printf(" Protocol: 0x%x\n", ipv4_hdr->next_proto_id); + +} + +static inline +struct packet_data * construct_udp_pkt_hdr(struct rte_mbuf * buf, + struct rte_ether_addr * src_mac, struct rte_ether_addr * dst_mac, + uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port) +{ + rte_pktmbuf_reset(buf); + + struct packet_data * pkt_data = (struct packet_data *)rte_pktmbuf_append(buf, sizeof(struct packet_data)); + struct rte_ether_hdr * eth_hdr; + struct rte_ipv4_hdr * ipv4_hdr; + struct rte_udp_hdr * udp_hdr; + + if (pkt_data == NULL) + return NULL; + + // single segment + buf->nb_segs = 1; + + // construct l2 header + eth_hdr = &pkt_data->pkt_hdr.eth_hdr; + rte_ether_addr_copy(src_mac, &eth_hdr->s_addr); + rte_ether_addr_copy(dst_mac, &eth_hdr->d_addr); + eth_hdr->ether_type = rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4); + buf->l2_len = sizeof(struct rte_ether_hdr); + + // construct l3 header + ipv4_hdr = &pkt_data->pkt_hdr.ipv4_hdr; + memset(ipv4_hdr, 0, sizeof(struct rte_ipv4_hdr)); + ipv4_hdr->version_ihl = IP_VHL_DEF; + ipv4_hdr->type_of_service = 0; + ipv4_hdr->fragment_offset = 0; + ipv4_hdr->time_to_live = IP_DEFTTL; + ipv4_hdr->next_proto_id = IPPROTO_UDP; + ipv4_hdr->packet_id = 0; + ipv4_hdr->src_addr = rte_cpu_to_be_32(src_ip); + ipv4_hdr->dst_addr = rte_cpu_to_be_32(dst_ip); + ipv4_hdr->total_length = rte_cpu_to_be_16(sizeof(struct packet_data) - sizeof(struct rte_ether_hdr)); + ipv4_hdr->hdr_checksum = 0; + buf->l3_len = sizeof(struct rte_ipv4_hdr); + + // construct l4 header + udp_hdr = &pkt_data->pkt_hdr.udp_hdr; + udp_hdr->src_port = rte_cpu_to_be_16(src_port); + udp_hdr->dst_port = rte_cpu_to_be_16(dst_port); + udp_hdr->dgram_cksum = 0; /* No UDP checksum. 
*/ + udp_hdr->dgram_len = rte_cpu_to_be_16(sizeof(struct packet_data) - + sizeof(struct rte_ether_hdr) - + sizeof(struct rte_udp_hdr)); + buf->l4_len = sizeof(struct rte_udp_hdr); + + return pkt_data; +} + static inline struct packet_data * check_valid_packet(struct rte_mbuf * pkt) { @@ -29,7 +167,7 @@ struct packet_data * check_valid_packet(struct rte_mbuf * pkt) pkt_data = rte_pktmbuf_mtod(pkt, struct packet_data *); - if (rte_be_to_cpu_16(pkt_data->eth_hdr.ether_type) == RTE_ETHER_TYPE_IPV4 && rte_be_to_cpu_32(pkt_data->magic) == ETHER_FRAME_MAGIC) { + if (rte_be_to_cpu_32(pkt_data->magic) == ETHER_FRAME_MAGIC) { return pkt_data; } diff --git a/khat/khat.cc b/khat/khat.cc index 552acf5..ab509d3 100644 --- a/khat/khat.cc +++ b/khat/khat.cc @@ -15,6 +15,8 @@ #include "pkt.h" #include "ntrlog.h" +#include "rte_arp.h" +#include "rte_mbuf_core.h" NTR_DECL_IMPL; @@ -32,6 +34,7 @@ struct options_t { //states uint16_t s_portid; struct rte_ether_addr s_host_mac; + struct rte_mempool * s_pkt_mempool; }; struct options_t options; @@ -86,7 +89,6 @@ locore_main(void * _unused __rte_unused) struct rte_mbuf *bufs[BURST_SIZE]; struct rte_mbuf *tx_bufs[BURST_SIZE]; struct packet_data *pkt_data; - struct rte_ether_addr eth_addr; uint32_t core_id = rte_lcore_id(); if (rte_eth_dev_socket_id(options.s_portid) > 0 && rte_eth_dev_socket_id(options.s_portid) != (int)rte_socket_id()) { @@ -111,33 +113,75 @@ locore_main(void * _unused __rte_unused) if (pkt_data == NULL) { ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: core %d skipping invalid packet %p.\n", core_id, (void*)bufs[i]); + dump_pkt(bufs[i]); + rte_pktmbuf_free(bufs[i]); continue; } - ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d rx packet %p from %x:%x:%x:%x:%x:%x\n", core_id, (void*)bufs[i], - pkt_data->eth_hdr.s_addr.addr_bytes[0], - pkt_data->eth_hdr.s_addr.addr_bytes[1], - pkt_data->eth_hdr.s_addr.addr_bytes[2], - pkt_data->eth_hdr.s_addr.addr_bytes[3], - pkt_data->eth_hdr.s_addr.addr_bytes[4], - 
pkt_data->eth_hdr.s_addr.addr_bytes[5]); - + uint32_t dst_ip = rte_be_to_cpu_32(pkt_data->pkt_hdr.ipv4_hdr.dst_addr); + uint32_t src_ip = rte_be_to_cpu_32(pkt_data->pkt_hdr.ipv4_hdr.src_addr); + uint16_t src_port = rte_be_to_cpu_16(pkt_data->pkt_hdr.udp_hdr.src_port); + uint16_t dst_port = rte_be_to_cpu_16(pkt_data->pkt_hdr.udp_hdr.dst_port); + ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d packet %p from %d.%d.%d.%d(%x:%x:%x:%x:%x:%x) to %d.%d.%d.%d(%x:%x:%x:%x:%x:%x), sport %d, dport %d, epoch %d\n", + core_id, + (void*)bufs[i], + (src_ip >> 24) & 0xff, + (src_ip >> 16) & 0xff, + (src_ip >> 8) & 0xff, + (src_ip >> 0) & 0xff, + pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[0], + pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[1], + pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[2], + pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[3], + pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[4], + pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[5], + (dst_ip >> 24) & 0xff, + (dst_ip >> 16) & 0xff, + (dst_ip >> 8) & 0xff, + (dst_ip >> 0) & 0xff, + pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[0], + pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[1], + pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[2], + pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[3], + pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[4], + pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[5], + src_port, + dst_port, + rte_be_to_cpu_32(pkt_data->epoch)); // swap s_addr and d_addr - rte_ether_addr_copy(&pkt_data->eth_hdr.s_addr, &eth_addr); - rte_ether_addr_copy(&pkt_data->eth_hdr.d_addr, &pkt_data->eth_hdr.s_addr); - rte_ether_addr_copy(&eth_addr, &pkt_data->eth_hdr.d_addr); + struct rte_mbuf * pkt_buf = rte_pktmbuf_alloc(options.s_pkt_mempool); + if (pkt_buf == NULL) { + rte_exit(EXIT_FAILURE, "locore_main: failed to allocate memory for pkt_buf"); + } + struct packet_data * tx_data = construct_udp_pkt_hdr(pkt_buf, + &options.s_host_mac, + &pkt_data->pkt_hdr.eth_hdr.s_addr, + dst_ip, + src_ip, + dst_port, + src_port); + if (tx_data == NULL) { 
+ rte_exit(EXIT_FAILURE, "failed to construct tx packet %p", (void*)pkt_buf); + } + // copy, endianness doesn't matter + tx_data->epoch = pkt_data->epoch; + tx_data->magic = pkt_data->magic; + tx_data->clt_ts_rx = pkt_data->clt_ts_rx; + tx_data->clt_ts_tx = pkt_data->clt_ts_tx; + tx_data->srv_ts_rx = pkt_data->srv_ts_rx; + tx_data->srv_ts_tx = pkt_data->srv_ts_tx; // queue for burst send - tx_bufs[nb_tx++] = bufs[i]; + tx_bufs[nb_tx++] = pkt_buf; + // free rx packet + rte_pktmbuf_free(bufs[i]); } const uint16_t nb_tx_succ = rte_eth_tx_burst(options.s_portid, 0, tx_bufs, nb_tx); // cleanup unsent packets + // don't need to free others because it's offloaded if (nb_tx_succ < nb_tx) { rte_exit(EXIT_FAILURE, "locore_main: failed to send some packets.\n"); - // rte_log(RTE_LOG_WARNING, RTE_LOGTYPE_USER1,"locore_main: core %d some packets failed to queue to tx - expected: %d sent: %d", core_id, nb_tx, nb_tx_succ); - // for (uint16_t buf = nb_tx_succ; buf < nb_tx; buf++) - // rte_pktmbuf_free(tx_bufs[buf]); } } @@ -164,9 +208,12 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool) return ret; } - if(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE) - port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE; port_conf.rxmode.max_rx_pkt_len = RTE_ETHER_MAX_LEN; + port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_UDP_CKSUM; + port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_IPV4_CKSUM; + port_conf.txmode.offloads |= DEV_TX_OFFLOAD_UDP_CKSUM; + port_conf.txmode.offloads |= DEV_TX_OFFLOAD_IPV4_CKSUM; + port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE; /* Configure the Ethernet device. 
*/ ret = rte_eth_dev_configure(portid, RX_RING_NUM, TX_RING_NUM, &port_conf); @@ -218,15 +265,16 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool) static void usage() { - fprintf(stdout, "Usage:\n \ - -v: verbose mode\n \ - -h: display the information\n"); + fprintf(stdout, + "Usage:\n" \ + " -v(vv): verbose mode\n" \ + " -h: display the information\n"); } int main(int argc, char* argv[]) { unsigned int nb_ports; - struct rte_mempool *mbuf_pool; + struct rte_mempool *mbuf_pool, *mbuf_pool_pkt; // init dpdk int ret = rte_eal_init(argc, argv); @@ -259,6 +307,8 @@ int main(int argc, char* argv[]) } } + // XXX: signal handler to exit + nb_ports = rte_eth_dev_count_avail(); if (nb_ports == 0) { rte_exit(EXIT_FAILURE, "number of ports must be > 0\n"); @@ -270,6 +320,14 @@ int main(int argc, char* argv[]) rte_exit(EXIT_FAILURE, "cannot create mbuf pool\n"); } + // create a pkt mbuf memory pool on the socket + mbuf_pool_pkt = rte_pktmbuf_pool_create("MBUF_POOL_PKT", MBUF_MAX_COUNT * nb_ports, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); + if (mbuf_pool_pkt == nullptr) { + rte_exit(EXIT_FAILURE, "cannot create mbuf_pkt pool\n"); + } + options.s_pkt_mempool = mbuf_pool_pkt; + + uint16_t portid = rte_eth_find_next(0); if (portid == RTE_MAX_ETHPORTS) { rte_exit(EXIT_FAILURE, "cannot find an available port\n");