add ipv4 and udp. Need to fix link up.

This commit is contained in:
quackerd 2020-11-18 09:01:55 +08:00
parent c34be253f7
commit e9e15caea8
4 changed files with 254 additions and 44 deletions

View File

@ -11,7 +11,11 @@ project(khat)
list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}")
find_package(dpdk REQUIRED)
set(CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11 -Wno-deprecated-declarations -msse4)
set(CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11
-Wno-deprecated-declarations
-Wno-packed-not-aligned
-Wno-address-of-packed-member
-msse4)
include_directories(${CMAKE_SOURCE_DIR}/inc)
include_directories(${dpdk_INCLUDE_DIRS})

View File

@ -19,6 +19,7 @@
#include "ntrlog.h"
#include "pkt.h"
#include "rte_byteorder.h"
#include "rte_ip.h"
// init NTRLOG
NTR_DECL_IMPL;
@ -31,7 +32,6 @@ constexpr unsigned int RX_RING_NUM = 1;
constexpr unsigned int TX_RING_NUM = 1;
constexpr unsigned int BURST_SIZE = 32;
static const struct rte_eth_conf port_conf_default{};
struct datapt{
@ -114,6 +114,9 @@ locore_main(void * _unused __rte_unused)
uint32_t epoch = 0;
int state = STATE_SEND;
// XXX: check link status instead
sleep(1);
if (rte_eth_dev_socket_id(options.s_portid) > 0 && rte_eth_dev_socket_id(options.s_portid) != (int)rte_socket_id()) {
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: WARNING, port %d is on remote NUMA node to "
"polling thread.\n\tPerformance will "
@ -128,34 +131,32 @@ locore_main(void * _unused __rte_unused)
rte_exit(EXIT_FAILURE, "cannot allocate tx_buf\n");
}
tx_buf->l2_len = sizeof(struct rte_ether_hdr);
tx_buf->nb_segs = 1;
pkt_data = (struct packet_data *)rte_pktmbuf_append(tx_buf, sizeof(struct packet_data));
pkt_data = construct_udp_pkt_hdr(tx_buf,
&options.s_host_mac, &options.server_mac,
RTE_IPV4(192, 168, 100, 150), RTE_IPV4(192, 168, 100, 151),
1337, 1337);
if (pkt_data == NULL) {
rte_exit(EXIT_FAILURE, "cannot allocate space for packet_data in mbuf\n");
}
pkt_data->magic = rte_cpu_to_be_32(ETHER_FRAME_MAGIC);
pkt_data->eth_hdr.ether_type = rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4);
rte_ether_addr_copy(&options.server_mac, &pkt_data->eth_hdr.d_addr);
rte_ether_addr_copy(&options.s_host_mac, &pkt_data->eth_hdr.s_addr);
while(!options.s_stop.load()) {
// always pop incoming packets
const uint16_t nb_rx = rte_eth_rx_burst(options.s_portid, 0, rx_bufs, BURST_SIZE);
if (nb_rx != 0 && state == STATE_RECV) {
if (nb_rx != 0) {
// only process packets when we are ready to receive
for (int i = 0; i < nb_rx; i++) {
struct packet_data * each = check_valid_packet(rx_bufs[i]);
if (each == NULL) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: ignoring invalid packet %p.\n", (void*)rx_bufs[i]);
dump_pkt(rx_bufs[i]);
rte_pktmbuf_free(rx_bufs[i]);
continue;
}
if (each->epoch == epoch) {
if (rte_be_to_cpu_32(each->epoch) == epoch && state == STATE_RECV) {
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: received packet %p for epoch %d\n", (void*)rx_bufs[i], epoch);
if (options.s_record.load()) {
@ -169,16 +170,17 @@ locore_main(void * _unused __rte_unused)
// bump the epoch and stop processing other packets
state = STATE_SEND;
epoch++;
break;
} else {
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: ignoring packet 0x%p with invalid epoch %d.\n", (void*)rx_bufs[i], epoch);
}
rte_pktmbuf_free(rx_bufs[i]);
}
}
if (state == STATE_SEND) {
// set new epoch
pkt_data->epoch = epoch;
pkt_data->epoch = rte_cpu_to_be_32(epoch);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: sending packet %p with epoch %d\n", (void*)tx_buf, epoch);
const uint16_t nb_tx = rte_eth_tx_burst(options.s_portid, 0, &tx_buf, 1);
@ -218,9 +220,12 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
return ret;
}
if(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE;
port_conf.rxmode.max_rx_pkt_len = RTE_ETHER_MAX_LEN;
port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_UDP_CKSUM;
port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_IPV4_CKSUM;
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_UDP_CKSUM;
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_IPV4_CKSUM;
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE;
/* Configure the Ethernet device. */
ret = rte_eth_dev_configure(portid, RX_RING_NUM, TX_RING_NUM, &port_conf);
@ -235,7 +240,7 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
rxconf = dev_info.default_rxconf;
rxconf.offloads = port_conf.rxmode.offloads;
for (uint32_t i = 0; i < RX_RING_NUM; i++) {
ret = rte_eth_rx_queue_setup(portid, i, nb_rxd, rte_eth_dev_socket_id(portid), NULL, mbuf_pool);
ret = rte_eth_rx_queue_setup(portid, i, nb_rxd, rte_eth_dev_socket_id(portid), &rxconf, mbuf_pool);
if (ret < 0)
return ret;
}
@ -290,9 +295,14 @@ static void dump_options()
static void usage()
{
fprintf(stdout, "Usage:\n \
-v: verbose mode\n \
-h: display the information\n\n");
fprintf(stdout,
"Usage:\n " \
" -v(vv): verbose mode\n" \
" -h: display the information\n" \
" -o: output filename\n" \
" -t: run time\n" \
" -T: warmup time\n" \
" -s: server's mac\n\n" );
}
int main(int argc, char* argv[])

142
inc/pkt.h
View File

@ -1,15 +1,34 @@
#pragma once
#include <rte_mbuf_core.h>
#include <rte_mbuf.h>
#include <rte_udp.h>
#include <rte_byteorder.h>
#include <rte_ip.h>
#include <stdint.h>
#include <rte_flow.h>
#include <rte_ether.h>
#include <unistd.h>
#include <rte_net.h>
#include <rte_vxlan.h>
#define IP_DEFTTL 64 /* from RFC 1340. */
#define IP_VERSION 0x40
#define IP_HDRLEN 0x05 /* default IP header length == five 32-bits words. */
#define IP_VHL_DEF (IP_VERSION | IP_HDRLEN)
#define IP_ADDR_FMT_SIZE 15
constexpr static uint32_t ETHER_FRAME_MAGIC = 0xDCDCE5E5;
struct packet_hdr {
struct rte_ether_hdr eth_hdr;
struct rte_ipv4_hdr ipv4_hdr;
struct rte_udp_hdr udp_hdr;
} __attribute__((packed));
struct packet_data
{
struct rte_ether_hdr eth_hdr;
struct packet_hdr pkt_hdr;
uint32_t magic;
uint32_t epoch;
uint64_t clt_ts_tx;
@ -18,6 +37,125 @@ struct packet_data
uint64_t srv_ts_rx;
};
static inline void
print_mac(struct rte_ether_addr * mac)
{
printf("%x:%x:%x:%x:%x:%x", mac->addr_bytes[0],
mac->addr_bytes[1],
mac->addr_bytes[2],
mac->addr_bytes[3],
mac->addr_bytes[4],
mac->addr_bytes[5]);
}
static inline void
print_ipv4(uint32_t ip)
{
printf("%d-%d-%d-%d", (ip >> 24) & 0xff,
(ip >> 16) & 0xff,
(ip >> 8) & 0xff,
(ip >> 0) & 0xff);
}
static inline void
dump_pkt(struct rte_mbuf *pkt)
{
if(rte_pktmbuf_data_len(pkt) < sizeof(struct rte_ether_hdr)) {
return;
}
struct rte_ether_hdr _eth_hdr;
struct rte_ether_hdr * eth_hdr = (struct rte_ether_hdr *)rte_pktmbuf_read(pkt, 0, sizeof(struct rte_ether_hdr), &_eth_hdr);
if (eth_hdr == NULL) {
return;
}
// ethernet frame
printf("Packet %p: Length 0x%x\n", (void*)pkt, rte_pktmbuf_data_len(pkt));
printf(" Ethernet header:\n");
printf(" Src:");
print_mac(&eth_hdr->s_addr);
printf("\n");
printf(" Dst:");
print_mac(&eth_hdr->d_addr);
printf("\n");
printf(" Type: 0x%x\n", rte_be_to_cpu_16(eth_hdr->ether_type));
uint16_t ether_type = rte_be_to_cpu_16(eth_hdr->ether_type);
if (ether_type != RTE_ETHER_TYPE_IPV4) {
return;
}
if(rte_pktmbuf_data_len(pkt) < sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr)) {
return;
}
// dump ip header
struct rte_ipv4_hdr * ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
printf(" IPv4 header:\n");
printf(" Src:");
print_ipv4(rte_be_to_cpu_32(ipv4_hdr->src_addr));
printf("\n");
printf(" Dst:");
print_ipv4(rte_be_to_cpu_32(ipv4_hdr->dst_addr));
printf("\n");
printf(" Protocol: 0x%x\n", ipv4_hdr->next_proto_id);
}
static inline
struct packet_data * construct_udp_pkt_hdr(struct rte_mbuf * buf,
struct rte_ether_addr * src_mac, struct rte_ether_addr * dst_mac,
uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port)
{
rte_pktmbuf_reset(buf);
struct packet_data * pkt_data = (struct packet_data *)rte_pktmbuf_append(buf, sizeof(struct packet_data));
struct rte_ether_hdr * eth_hdr;
struct rte_ipv4_hdr * ipv4_hdr;
struct rte_udp_hdr * udp_hdr;
if (pkt_data == NULL)
return NULL;
// single segment
buf->nb_segs = 1;
// construct l2 header
eth_hdr = &pkt_data->pkt_hdr.eth_hdr;
rte_ether_addr_copy(src_mac, &eth_hdr->s_addr);
rte_ether_addr_copy(dst_mac, &eth_hdr->d_addr);
eth_hdr->ether_type = rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4);
buf->l2_len = sizeof(struct rte_ether_hdr);
// construct l3 header
ipv4_hdr = &pkt_data->pkt_hdr.ipv4_hdr;
memset(ipv4_hdr, 0, sizeof(struct rte_ipv4_hdr));
ipv4_hdr->version_ihl = IP_VHL_DEF;
ipv4_hdr->type_of_service = 0;
ipv4_hdr->fragment_offset = 0;
ipv4_hdr->time_to_live = IP_DEFTTL;
ipv4_hdr->next_proto_id = IPPROTO_UDP;
ipv4_hdr->packet_id = 0;
ipv4_hdr->src_addr = rte_cpu_to_be_32(src_ip);
ipv4_hdr->dst_addr = rte_cpu_to_be_32(dst_ip);
ipv4_hdr->total_length = rte_cpu_to_be_16(sizeof(struct packet_data) - sizeof(struct rte_ether_hdr));
ipv4_hdr->hdr_checksum = 0;
buf->l3_len = sizeof(struct rte_ipv4_hdr);
// construct l4 header
udp_hdr = &pkt_data->pkt_hdr.udp_hdr;
udp_hdr->src_port = rte_cpu_to_be_16(src_port);
udp_hdr->dst_port = rte_cpu_to_be_16(dst_port);
udp_hdr->dgram_cksum = 0; /* No UDP checksum. */
udp_hdr->dgram_len = rte_cpu_to_be_16(sizeof(struct packet_data) -
sizeof(struct rte_ether_hdr) -
sizeof(struct rte_udp_hdr));
buf->l4_len = sizeof(struct rte_udp_hdr);
return pkt_data;
}
static inline
struct packet_data * check_valid_packet(struct rte_mbuf * pkt)
{
@ -29,7 +167,7 @@ struct packet_data * check_valid_packet(struct rte_mbuf * pkt)
pkt_data = rte_pktmbuf_mtod(pkt, struct packet_data *);
if (rte_be_to_cpu_16(pkt_data->eth_hdr.ether_type) == RTE_ETHER_TYPE_IPV4 && rte_be_to_cpu_32(pkt_data->magic) == ETHER_FRAME_MAGIC) {
if (rte_be_to_cpu_32(pkt_data->magic) == ETHER_FRAME_MAGIC) {
return pkt_data;
}

View File

@ -15,6 +15,8 @@
#include "pkt.h"
#include "ntrlog.h"
#include "rte_arp.h"
#include "rte_mbuf_core.h"
NTR_DECL_IMPL;
@ -32,6 +34,7 @@ struct options_t {
//states
uint16_t s_portid;
struct rte_ether_addr s_host_mac;
struct rte_mempool * s_pkt_mempool;
};
struct options_t options;
@ -86,7 +89,6 @@ locore_main(void * _unused __rte_unused)
struct rte_mbuf *bufs[BURST_SIZE];
struct rte_mbuf *tx_bufs[BURST_SIZE];
struct packet_data *pkt_data;
struct rte_ether_addr eth_addr;
uint32_t core_id = rte_lcore_id();
if (rte_eth_dev_socket_id(options.s_portid) > 0 && rte_eth_dev_socket_id(options.s_portid) != (int)rte_socket_id()) {
@ -111,33 +113,75 @@ locore_main(void * _unused __rte_unused)
if (pkt_data == NULL) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: core %d skipping invalid packet %p.\n", core_id, (void*)bufs[i]);
dump_pkt(bufs[i]);
rte_pktmbuf_free(bufs[i]);
continue;
}
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d rx packet %p from %x:%x:%x:%x:%x:%x\n", core_id, (void*)bufs[i],
pkt_data->eth_hdr.s_addr.addr_bytes[0],
pkt_data->eth_hdr.s_addr.addr_bytes[1],
pkt_data->eth_hdr.s_addr.addr_bytes[2],
pkt_data->eth_hdr.s_addr.addr_bytes[3],
pkt_data->eth_hdr.s_addr.addr_bytes[4],
pkt_data->eth_hdr.s_addr.addr_bytes[5]);
uint32_t dst_ip = rte_be_to_cpu_32(pkt_data->pkt_hdr.ipv4_hdr.dst_addr);
uint32_t src_ip = rte_be_to_cpu_32(pkt_data->pkt_hdr.ipv4_hdr.src_addr);
uint16_t src_port = rte_be_to_cpu_16(pkt_data->pkt_hdr.udp_hdr.src_port);
uint16_t dst_port = rte_be_to_cpu_16(pkt_data->pkt_hdr.udp_hdr.dst_port);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d packet %p from %d.%d.%d.%d(%x:%x:%x:%x:%x:%x) to %d.%d.%d.%d(%x:%x:%x:%x:%x:%x), sport %d, dport %d, epoch %d\n",
core_id,
(void*)bufs[i],
(src_ip >> 24) & 0xff,
(src_ip >> 16) & 0xff,
(src_ip >> 8) & 0xff,
(src_ip >> 0) & 0xff,
pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[0],
pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[1],
pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[2],
pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[3],
pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[4],
pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[5],
(dst_ip >> 24) & 0xff,
(dst_ip >> 16) & 0xff,
(dst_ip >> 8) & 0xff,
(dst_ip >> 0) & 0xff,
pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[0],
pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[1],
pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[2],
pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[3],
pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[4],
pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[5],
src_port,
dst_port,
rte_be_to_cpu_32(pkt_data->epoch));
// swap s_addr and d_addr
rte_ether_addr_copy(&pkt_data->eth_hdr.s_addr, &eth_addr);
rte_ether_addr_copy(&pkt_data->eth_hdr.d_addr, &pkt_data->eth_hdr.s_addr);
rte_ether_addr_copy(&eth_addr, &pkt_data->eth_hdr.d_addr);
struct rte_mbuf * pkt_buf = rte_pktmbuf_alloc(options.s_pkt_mempool);
if (pkt_buf == NULL) {
rte_exit(EXIT_FAILURE, "locore_main: failed to allocate memory for pkt_buf");
}
struct packet_data * tx_data = construct_udp_pkt_hdr(pkt_buf,
&options.s_host_mac,
&pkt_data->pkt_hdr.eth_hdr.s_addr,
dst_ip,
src_ip,
dst_port,
src_port);
if (tx_data == NULL) {
rte_exit(EXIT_FAILURE, "failed to construct tx packet %p", (void*)pkt_buf);
}
// copy, endianess doesn't matter
tx_data->epoch = pkt_data->epoch;
tx_data->magic = pkt_data->magic;
tx_data->clt_ts_rx = pkt_data->clt_ts_rx;
tx_data->clt_ts_tx = pkt_data->clt_ts_tx;
tx_data->srv_ts_rx = pkt_data->srv_ts_rx;
tx_data->srv_ts_tx = pkt_data->srv_ts_tx;
// queue for burst send
tx_bufs[nb_tx++] = bufs[i];
tx_bufs[nb_tx++] = pkt_buf;
// free rx packet
rte_pktmbuf_free(bufs[i]);
}
const uint16_t nb_tx_succ = rte_eth_tx_burst(options.s_portid, 0, tx_bufs, nb_tx);
// cleanup unsent packets
// don't need to free others because it's offloaded
if (nb_tx_succ < nb_tx) {
rte_exit(EXIT_FAILURE, "locore_main: failed to send some packets.\n");
// rte_log(RTE_LOG_WARNING, RTE_LOGTYPE_USER1,"locore_main: core %d some packets failed to queue to tx - expected: %d sent: %d", core_id, nb_tx, nb_tx_succ);
// for (uint16_t buf = nb_tx_succ; buf < nb_tx; buf++)
// rte_pktmbuf_free(tx_bufs[buf]);
}
}
@ -164,9 +208,12 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
return ret;
}
if(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE;
port_conf.rxmode.max_rx_pkt_len = RTE_ETHER_MAX_LEN;
port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_UDP_CKSUM;
port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_IPV4_CKSUM;
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_UDP_CKSUM;
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_IPV4_CKSUM;
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE;
/* Configure the Ethernet device. */
ret = rte_eth_dev_configure(portid, RX_RING_NUM, TX_RING_NUM, &port_conf);
@ -218,15 +265,16 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
static void usage()
{
fprintf(stdout, "Usage:\n \
-v: verbose mode\n \
-h: display the information\n");
fprintf(stdout,
"Usage:\n" \
" -v(vv): verbose mode\n" \
" -h: display the information\n");
}
int main(int argc, char* argv[])
{
unsigned int nb_ports;
struct rte_mempool *mbuf_pool;
struct rte_mempool *mbuf_pool, *mbuf_pool_pkt;
// init dpdk
int ret = rte_eal_init(argc, argv);
@ -259,6 +307,8 @@ int main(int argc, char* argv[])
}
}
// XXX: singal handler to exit
nb_ports = rte_eth_dev_count_avail();
if (nb_ports == 0) {
rte_exit(EXIT_FAILURE, "number of ports must be > 0\n");
@ -270,6 +320,14 @@ int main(int argc, char* argv[])
rte_exit(EXIT_FAILURE, "cannot create mbuf pool\n");
}
// create a pkt mbuf memory pool on the socket
mbuf_pool_pkt = rte_pktmbuf_pool_create("MBUF_POOL_PKT", MBUF_MAX_COUNT * nb_ports, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
if (mbuf_pool_pkt == nullptr) {
rte_exit(EXIT_FAILURE, "cannot create mbuf_pkt pool\n");
}
options.s_pkt_mempool = mbuf_pool_pkt;
uint16_t portid = rte_eth_find_next(0);
if (portid == RTE_MAX_ETHPORTS) {
rte_exit(EXIT_FAILURE, "cannot find an available port\n");