initial commit

This commit is contained in:
quackerd 2020-11-13 09:46:27 -05:00
commit cadd5a21a4
6 changed files with 936 additions and 0 deletions

23
CMakeLists.txt Normal file
View File

@ -0,0 +1,23 @@
cmake_minimum_required(VERSION 3.0)
project(khat)
list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}")
find_package(dpdk REQUIRED)
set(CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11 -Wno-deprecated-declarations -msse4)
include_directories(${CMAKE_SOURCE_DIR}/inc)
include_directories(${dpdk_INCLUDE_DIRS})
add_executable(khat khat/khat.cc)
add_executable(cat cat/cat.cc)
set(LINK_LIBS ${dpdk_LIBRARIES} pthread numa)
target_link_libraries(khat ${LINK_LIBS})
target_compile_options(khat PRIVATE ${CC_FLAGS})
target_link_libraries(cat ${LINK_LIBS})
target_compile_options(cat PRIVATE ${CC_FLAGS})

142
Finddpdk.cmake Normal file
View File

@ -0,0 +1,142 @@
# Try to find dpdk
#
# Once done, this will define
#
# dpdk::dpdk
# dpdk_FOUND
# dpdk_INCLUDE_DIR
# dpdk_LIBRARIES
find_package(PkgConfig QUIET)
if(PKG_CONFIG_FOUND)
pkg_check_modules(dpdk QUIET libdpdk)
endif()
if(dpdk_INCLUDE_DIRS)
# good
elseif(TARGET dpdk::dpdk)
get_target_property(dpdk_INCLUDE_DIRS
dpdk::dpdk INTERFACE_INCLUDE_DIRECTORIES)
else()
find_path(dpdk_config_INCLUDE_DIR rte_config.h
HINTS
ENV DPDK_DIR
PATH_SUFFIXES
dpdk
include)
find_path(dpdk_common_INCLUDE_DIR rte_common.h
HINTS
ENC DPDK_DIR
PATH_SUFFIXES
dpdk
include)
set(dpdk_INCLUDE_DIRS "${dpdk_config_INCLUDE_DIR}")
if(NOT dpdk_config_INCLUDE_DIR EQUAL dpdk_common_INCLUDE_DIR)
list(APPEND dpdk_INCLUDE_DIRS "${dpdk_common_INCLUDE_DIR}")
endif()
endif()
set(components
bus_pci
bus_vdev
cfgfile
cmdline
eal
ethdev
hash
kvargs
mbuf
mempool
mempool_ring
mempool_stack
net
pci
pmd_af_packet
pmd_bnxt
pmd_bond
pmd_cxgbe
pmd_e1000
pmd_ena
pmd_enic
pmd_i40e
pmd_ixgbe
pmd_mlx5
pmd_nfp
pmd_qede
pmd_ring
pmd_sfc_efx
pmd_vmxnet3_uio
ring
timer)
# for collecting dpdk library targets, it will be used when defining dpdk::dpdk
set(_dpdk_libs)
# for list of dpdk library archive paths
set(dpdk_LIBRARIES)
foreach(c ${components})
set(dpdk_lib dpdk::${c})
if(TARGET ${dpdk_lib})
get_target_property(DPDK_rte_${c}_LIBRARY
${dpdk_lib} IMPORTED_LOCATION)
else()
find_library(DPDK_rte_${c}_LIBRARY rte_${c}
HINTS
ENV DPDK_DIR
${dpdk_LIBRARY_DIRS}
PATH_SUFFIXES lib)
endif()
if(DPDK_rte_${c}_LIBRARY)
if (NOT TARGET ${dpdk_lib})
add_library(${dpdk_lib} UNKNOWN IMPORTED)
set_target_properties(${dpdk_lib} PROPERTIES
INTERFACE_INCLUDE_DIRECTORIES "${dpdk_INCLUDE_DIRS}"
IMPORTED_LOCATION "${DPDK_rte_${c}_LIBRARY}")
if(c STREQUAL pmd_mlx5)
find_package(verbs QUIET)
if(verbs_FOUND)
target_link_libraries(${dpdk_lib} INTERFACE IBVerbs::verbs)
endif()
endif()
endif()
list(APPEND _dpdk_libs ${dpdk_lib})
list(APPEND dpdk_LIBRARIES ${DPDK_rte_${c}_LIBRARY})
endif()
endforeach()
mark_as_advanced(dpdk_INCLUDE_DIRS ${dpdk_LIBRARIES})
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(dpdk DEFAULT_MSG
dpdk_INCLUDE_DIRS
dpdk_LIBRARIES)
if(dpdk_FOUND)
if(NOT TARGET dpdk::cflags)
if(CMAKE_SYSTEM_PROCESSOR MATCHES "amd64|x86_64|AMD64")
set(rte_cflags "-march=core2")
elseif(CMAKE_SYSTEM_PROCESSOR MATCHES "arm|ARM")
set(rte_cflags "-march=armv7-a")
elseif(CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64|AARCH64")
set(rte_cflags "-march=armv8-a+crc")
endif()
add_library(dpdk::cflags INTERFACE IMPORTED)
if (rte_cflags)
set_target_properties(dpdk::cflags PROPERTIES
INTERFACE_COMPILE_OPTIONS "${rte_cflags}")
endif()
endif()
if(NOT TARGET dpdk::dpdk)
add_library(dpdk::dpdk INTERFACE IMPORTED)
find_package(Threads QUIET)
list(APPEND _dpdk_libs
Threads::Threads
dpdk::cflags)
set_target_properties(dpdk::dpdk PROPERTIES
INTERFACE_LINK_LIBRARIES "${_dpdk_libs}"
INTERFACE_INCLUDE_DIRECTORIES "${dpdk_INCLUDE_DIRS}")
endif()
endif()
unset(_dpdk_libs)

400
cat/cat.cc Normal file
View File

@ -0,0 +1,400 @@
#include <ctime>
#include <netinet/in.h>
#include <rte_config.h>
#include <rte_common.h>
#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_cycles.h>
#include <rte_lcore.h>
#include <rte_mbuf.h>
#include <rte_ether.h>
#include <rte_launch.h>
#include <rte_log.h>
#include <atomic>
#include <vector>
#include <fstream>
#include <unistd.h>
#include "pkt.h"
constexpr unsigned int MBUF_MAX_COUNT = 1024;
constexpr unsigned int MBUF_CACHE_SIZE = 256;
constexpr unsigned int RX_RING_SIZE = 1024;
constexpr unsigned int TX_RING_SIZE = 1024;
constexpr unsigned int RX_RING_NUM = 1;
constexpr unsigned int TX_RING_NUM = 1;
constexpr unsigned int BURST_SIZE = 4;
struct datapt{
uint64_t server_proc = 0;
uint64_t rtt = 0;
};
struct options_t {
unsigned int run_time = 5;
unsigned int warmup_time = 0;
char output[256] = "output.txt";
struct rte_ether_addr server_mac;
// states
std::atomic<bool> s_stop {false};
std::atomic<bool> s_record {false};
std::vector<struct datapt *> s_stats;
struct rte_mempool * s_mbuf_pool;
uint16_t s_portid;
struct rte_ether_addr s_host_mac;
};
struct options_t options;
static uint16_t
rx_calc_latency(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
struct rte_mbuf **pkts, uint16_t nb_pkts, uint16_t max_pkts __rte_unused, void *_ __rte_unused)
{
// XXX: need to get the timestamp in every loop?
uint64_t now = rte_rdtsc();
struct packet_data * pkt_data;
for (int i = 0; i < nb_pkts; i++) {
pkt_data = check_valid_packet(pkts[i]);
if (pkt_data == NULL) {
rte_log(RTE_LOG_WARNING, RTE_LOGTYPE_USER1, "rx_calc_latency: ignoring invalid packet 0x%p.", (void*)pkts[i]);
continue;
}
pkt_data->clt_ts_rx = htonll(now);
}
return nb_pkts;
}
static uint16_t
tx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
struct rte_mbuf **pkts, uint16_t nb_pkts, void *_ __rte_unused)
{
// XXX: need to get the timestamp in every loop?
uint64_t now = rte_rdtsc();
struct packet_data * pkt_data;
for (int i = 0; i < nb_pkts; i++) {
pkt_data = check_valid_packet(pkts[i]);
if (pkt_data == NULL) {
rte_log(RTE_LOG_WARNING, RTE_LOGTYPE_USER1, "tx_add_timestamp: ignoring invalid packet 0x%p.", (void*)pkts[i]);
continue;
}
pkt_data->clt_ts_tx = htonll(now);
}
return nb_pkts;
}
#define STATE_SEND (0)
#define STATE_RECV (1)
static int
locore_main(void * _unused __rte_unused)
{
struct rte_mbuf *tx_buf;
struct rte_mbuf *rx_bufs[BURST_SIZE];
struct packet_data *pkt_data;
uint32_t core_id = rte_lcore_id();
uint32_t epoch = 0;
int state = STATE_SEND;
if (rte_eth_dev_socket_id(options.s_portid) > 0 && rte_eth_dev_socket_id(options.s_portid) != (int)rte_socket_id()) {
rte_log(RTE_LOG_WARNING, RTE_LOGTYPE_USER1, "locore_main: WARNING, port %d is on remote NUMA node to "
"polling thread.\n\tPerformance will "
"not be optimal.\n", options.s_portid);
}
rte_log(RTE_LOG_INFO, RTE_LOGTYPE_USER1, "locore_main: core %d sending packets... [Ctrl+C to quit]\n", core_id);
tx_buf = rte_pktmbuf_alloc(options.s_mbuf_pool);
if (tx_buf == NULL) {
rte_exit(EXIT_FAILURE, "cannot allocate tx_buf");
}
pkt_data = (struct packet_data *)rte_pktmbuf_append(tx_buf, sizeof(struct packet_data));
if (pkt_data == NULL) {
rte_exit(EXIT_FAILURE, "cannot allocate space for packet_data in mbuf");
}
// construct the packet ether type
pkt_data->eth_hdr.ether_type = 0x1234;
rte_ether_addr_copy(&options.server_mac, &pkt_data->eth_hdr.d_addr);
rte_ether_addr_copy(&options.s_host_mac, &pkt_data->eth_hdr.s_addr);
while(!options.s_stop.load()) {
// always pop incoming packets
const uint16_t nb_rx = rte_eth_rx_burst(options.s_portid, 0, rx_bufs, BURST_SIZE);
if (nb_rx != 0 && state == STATE_RECV) {
// only process packets when we are ready to receive
for (int i = 0; i < nb_rx; i++) {
struct packet_data * each = check_valid_packet(rx_bufs[i]);
if (each == NULL) {
rte_log(RTE_LOG_WARNING, RTE_LOGTYPE_USER1, "locore_main: ignoring invalid packet 0x%p.", (void*)rx_bufs[i]);
continue;
}
if (each->epoch == epoch) {
rte_log(RTE_LOG_INFO, RTE_LOGTYPE_USER1, "locore_main: received packet 0x%p for epoch %d", (void*)rx_bufs[i], epoch);
if (options.s_record.load()) {
// keep statistics
struct datapt * dpt = new datapt;
dpt->rtt = ntohll(pkt_data->clt_ts_rx) - ntohll(pkt_data->clt_ts_tx);
dpt->server_proc = ntohll(pkt_data->srv_ts_tx) - ntohll(pkt_data->srv_ts_rx);
options.s_stats.push_back(dpt);
}
// bump the epoch and stop processing other packets
state = STATE_SEND;
epoch++;
break;
} else {
rte_log(RTE_LOG_WARNING, RTE_LOGTYPE_USER1, "locore_main: ignoring packet 0x%p with invalid epoch %d.", (void*)rx_bufs[i], epoch);
}
}
}
if (state == STATE_SEND) {
// set new epoch
pkt_data->epoch = epoch;
rte_log(RTE_LOG_INFO, RTE_LOGTYPE_USER1, "locore_main: sending packet 0x%p with epoch %d", (void*)tx_buf, epoch);
const uint16_t nb_tx = rte_eth_tx_burst(options.s_portid, 0, &tx_buf, 1);
if (nb_tx < 1) {
rte_exit(EXIT_FAILURE, "failed to send packet 0x%p, epoch %d", (void*)tx_buf, epoch);
}
}
}
rte_pktmbuf_free(tx_buf);
rte_log(RTE_LOG_INFO, RTE_LOGTYPE_USER1, "locore_main: core %d successfully stopped.", core_id);
return 0;
}
static int
port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
{
struct rte_eth_dev_info dev_info;
struct rte_eth_conf port_conf;
struct rte_eth_txconf txconf;
uint16_t nb_rxd = RX_RING_SIZE;
uint16_t nb_txd = TX_RING_SIZE;
port_conf.rxmode.max_rx_pkt_len = RTE_ETHER_MAX_LEN;
if(!rte_eth_dev_is_valid_port(portid)) {
return -1;
}
int ret = rte_eth_dev_info_get(portid, &dev_info);
if (ret != 0) {
return ret;
}
if(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE;
if(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_IPV4_CKSUM)
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_IPV4_CKSUM;
if(dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM)
port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_IPV4_CKSUM;
/* Configure the Ethernet device. */
ret = rte_eth_dev_configure(portid, RX_RING_NUM, TX_RING_NUM, &port_conf);
if (ret != 0)
return ret;
ret = rte_eth_dev_adjust_nb_rx_tx_desc(portid, &nb_rxd, &nb_txd);
if (ret != 0)
return ret;
/* Allocate and set up 1 RX queue per Ethernet port. */
for (uint32_t i = 0; i < RX_RING_NUM; i++) {
ret = rte_eth_rx_queue_setup(portid, i, nb_rxd, rte_eth_dev_socket_id(portid), NULL, mbuf_pool);
if (ret < 0)
return ret;
}
txconf = dev_info.default_txconf;
txconf.offloads = port_conf.txmode.offloads;
/* Allocate and set up 1 TX queue per Ethernet port. */
for (uint32_t i = 0; i < TX_RING_NUM; i++) {
ret = rte_eth_tx_queue_setup(portid, i, nb_txd, rte_eth_dev_socket_id(portid), &txconf);
if (ret < 0)
return ret;
}
ret = rte_eth_dev_start(portid);
if (ret < 0)
return ret;
/* Display the port MAC address. */
struct rte_ether_addr addr;
ret = rte_eth_macaddr_get(portid, &addr);
if (ret != 0)
return ret;
printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
" %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
portid,
addr.addr_bytes[0], addr.addr_bytes[1],
addr.addr_bytes[2], addr.addr_bytes[3],
addr.addr_bytes[4], addr.addr_bytes[5]);
/* Enable RX in promiscuous mode for the Ethernet device. */
ret = rte_eth_promiscuous_enable(portid);
if (ret != 0)
return ret;
rte_eth_add_tx_callback(portid, 0, tx_add_timestamp, NULL);
rte_eth_add_rx_callback(portid, 0, rx_calc_latency, NULL);
return 0;
}
static void usage()
{
fprintf(stdout, "Usage:\n \
-v: verbose mode\n \
-h: display the information\n");
}
int main(int argc, char* argv[])
{
unsigned int nb_ports;
struct rte_mempool *mbuf_pool;
std::ofstream log_file;
// init dpdk
int ret = rte_eal_init(argc, argv);
if (ret < 0) {
rte_exit(EXIT_FAILURE, "rte_eal_init failed!");
}
argc -= ret;
argv += ret;
// set warning level
rte_log_set_level(RTE_LOGTYPE_USER1, RTE_LOG_WARNING);
{
int c;
// parse arguments
while((c = getopt(argc, argv, "hvo:t:T:s:")) != -1) {
switch (c) {
case 'v':
rte_log_set_level(RTE_LOGTYPE_USER1, RTE_LOG_INFO);
break;
case 's':
if (rte_ether_unformat_addr(optarg, &options.server_mac) == -1) {
rte_exit(EXIT_FAILURE, "cannot parse %s as mac address.", optarg);
}
break;
case 't':
options.run_time = atoi(optarg);
break;
case 'T':
options.warmup_time = atoi(optarg);
break;
case 'h':
usage();
rte_exit(EXIT_SUCCESS, NULL);
break;
case 'o':
strncpy(options.output, optarg, sizeof(options.output) - 1);
break;
default:
usage();
rte_exit(EXIT_FAILURE, "unknown argument: %c", c);
break;
}
}
}
// open log file for writing
log_file.open(options.output, std::ofstream::out);
if (!log_file) {
rte_exit(EXIT_FAILURE, "failed to open log file");
}
nb_ports = rte_eth_dev_count_avail();
if (nb_ports < 2 || (nb_ports & 1)) {
rte_exit(EXIT_FAILURE, "number of ports must be even");
}
// create a mbuf memory pool on the socket
mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", MBUF_MAX_COUNT * nb_ports, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
if (mbuf_pool == nullptr) {
rte_exit(EXIT_FAILURE, "cannot create mbuf pool");
}
options.s_mbuf_pool = mbuf_pool;
uint16_t portid = rte_eth_find_next(0);
if (portid == RTE_MAX_ETHPORTS) {
rte_exit(EXIT_FAILURE, "cannot find an available port");
}
options.s_portid = portid;
if (port_init(portid, mbuf_pool) != 0) {
rte_exit(EXIT_FAILURE, "cannot init port %d", portid);
}
if (rte_eth_macaddr_get(portid, &options.s_host_mac) != 0) {
rte_exit(EXIT_FAILURE, "cannot get mac address of port %d", portid);
}
rte_log(RTE_LOG_INFO, RTE_LOGTYPE_USER1, "Configured port %d with mac addr %x-%x-%x-%x-%x-%x", portid,
options.s_host_mac.addr_bytes[0],
options.s_host_mac.addr_bytes[1],
options.s_host_mac.addr_bytes[2],
options.s_host_mac.addr_bytes[3],
options.s_host_mac.addr_bytes[4],
options.s_host_mac.addr_bytes[5]);
uint16_t core_id = rte_get_next_lcore(0, true, false);
if (rte_eal_remote_launch(locore_main, NULL, core_id) != 0) {
rte_exit(EXIT_FAILURE, "failed to launch function on locore");
}
// poor man's timer
// XXX: use kqueue instead
struct timespec ts;
ts.tv_sec = 1;
ts.tv_nsec = 0;
uint32_t second = 0;
while(true) {
if (second >= options.warmup_time) {
options.s_record.store(true);
}
if (second >= options.run_time + options.warmup_time) {
options.s_stop.store(true);
break;
}
clock_nanosleep(CLOCK_REALTIME, 0, &ts, NULL);
second++;
}
if (rte_eal_wait_lcore(core_id) < 0)
rte_exit(EXIT_FAILURE, "failed to wait for job completion");
// dump stats
for (auto it = std::begin(options.s_stats); it != std::end(options.s_stats); ++it) {
log_file << (*it)->rtt << "," << (*it)->server_proc << std::endl;
delete *it;
}
log_file.close();
// clean up
rte_eth_dev_stop(portid);
rte_eth_dev_close(portid);
return 0;
}

9
compile_flags.txt Normal file
View File

@ -0,0 +1,9 @@
-xc++
-O2
-std=c++11
-Wall
-Werror
-Wpedantic
-I/usr/include/dpdk
-Iinc
-Wno-deprecated-declarations

54
inc/pkt.h Normal file
View File

@ -0,0 +1,54 @@
#pragma once
#include <stdint.h>
#include <rte_ether.h>
#include <unistd.h>
#include <netinet/in.h>
constexpr static uint16_t ETHER_FRAME_MAGIC = 0x1234;
struct packet_data
{
struct rte_ether_hdr eth_hdr;
uint32_t epoch;
uint64_t clt_ts_tx;
uint64_t clt_ts_rx;
uint64_t srv_ts_tx;
uint64_t srv_ts_rx;
};
static inline
struct packet_data * check_valid_packet(struct rte_mbuf * pkt)
{
struct packet_data * pkt_data = NULL;
if (rte_pktmbuf_data_len(pkt) < sizeof(struct packet_data)) {
return NULL;
}
pkt_data = rte_pktmbuf_mtod(pkt, struct packet_data *);
if (ntohs(pkt_data->eth_hdr.ether_type) != ETHER_FRAME_MAGIC) {
return NULL;
}
return pkt_data;
}
static inline uint64_t htonll(uint64_t x)
{
#if __BIG_ENDIAN__
return x;
#else
return ((uint64_t)htonl((x) & 0xFFFFFFFFLL) << 32) | htonl((x) >> 32);
#endif
}
static inline uint64_t ntohll(uint64_t x)
{
#if __BIG_ENDIAN__
return x;
#else
return ((uint64_t)ntohl((x) & 0xFFFFFFFFLL) << 32) | ntohl((x) >> 32);
#endif
}

308
khat/khat.cc Normal file
View File

@ -0,0 +1,308 @@
#include <cstdlib>
#include <rte_common.h>
#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_cycles.h>
#include <rte_lcore.h>
#include <rte_mbuf.h>
#include <atomic>
#include <unistd.h>
#include "pkt.h"
#include "rte_launch.h"
#include "rte_log.h"
constexpr unsigned int MBUF_MAX_COUNT = 1024;
constexpr unsigned int MBUF_CACHE_SIZE = 256;
constexpr unsigned int RX_RING_SIZE = 1024;
constexpr unsigned int TX_RING_SIZE = 1024;
constexpr unsigned int RX_RING_NUM = 1;
constexpr unsigned int TX_RING_NUM = 1;
constexpr unsigned int BURST_SIZE = 4;
struct options_t {
//states
uint16_t s_portid;
struct rte_ether_addr s_host_mac;
};
struct options_t options;
static uint16_t
rx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
struct rte_mbuf **pkts, uint16_t nb_pkts, uint16_t max_pkts __rte_unused, void *_ __rte_unused)
{
uint64_t now = rte_rdtsc();
struct packet_data * pkt_data;
for (int i = 0; i < nb_pkts; i++) {
pkt_data = check_valid_packet(pkts[i]);
if (pkt_data == NULL) {
rte_log(RTE_LOG_WARNING, RTE_LOGTYPE_USER1, "rx_add_timestamp: ignoring invalid packet 0x%p.", (void*)pkts[i]);
continue;
}
pkt_data->srv_ts_rx = htonll(now);
}
return nb_pkts;
}
static uint16_t
tx_calc_latency(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
struct rte_mbuf **pkts, uint16_t nb_pkts, void *_ __rte_unused)
{
uint64_t now = rte_rdtsc();
struct packet_data * pkt_data;
for (int i = 0; i < nb_pkts; i++) {
pkt_data = check_valid_packet(pkts[i]);
if (pkt_data == NULL) {
rte_log(RTE_LOG_WARNING, RTE_LOGTYPE_USER1, "tx_calc_latency: ignoring invalid packet 0x%p.", (void*)pkts[i]);
continue;
}
pkt_data->srv_ts_tx = htonll(now);
}
return nb_pkts;
}
static int
locore_main(void * _unused __rte_unused)
{
struct rte_mbuf *bufs[BURST_SIZE];
struct rte_mbuf *tx_bufs[BURST_SIZE];
struct packet_data *pkt_data;
struct rte_ether_addr eth_addr;
uint32_t core_id = rte_lcore_id();
if (rte_eth_dev_socket_id(options.s_portid) > 0 && rte_eth_dev_socket_id(options.s_portid) != (int)rte_socket_id()) {
rte_log(RTE_LOG_WARNING, RTE_LOGTYPE_USER1, "locore_main: WARNING, port %d is on remote NUMA node to "
"polling thread.\n\tPerformance will "
"not be optimal.\n", options.s_portid);
}
rte_log(RTE_LOG_INFO, RTE_LOGTYPE_USER1, "locore_main: core %d forwarding packets. [Ctrl+C to quit]\n", core_id);
while(1) {
uint16_t nb_tx = 0;
const uint16_t nb_rx = rte_eth_rx_burst(options.s_portid, 0, bufs, BURST_SIZE);
if (nb_rx == 0)
continue;
for(int i = 0; i < nb_rx; i++) {
pkt_data = check_valid_packet(bufs[i]);
if (pkt_data == NULL) {
rte_log(RTE_LOG_WARNING, RTE_LOGTYPE_USER1, "locore_main: core %d skipping invalid packet %p.", core_id, (void*)bufs[i]);
continue;
}
rte_log(RTE_LOG_INFO, RTE_LOGTYPE_USER1,"locore_main: core %d rx packet from %x-%x-%x-%x-%x-%x", core_id,
pkt_data->eth_hdr.s_addr.addr_bytes[0],
pkt_data->eth_hdr.s_addr.addr_bytes[1],
pkt_data->eth_hdr.s_addr.addr_bytes[2],
pkt_data->eth_hdr.s_addr.addr_bytes[3],
pkt_data->eth_hdr.s_addr.addr_bytes[4],
pkt_data->eth_hdr.s_addr.addr_bytes[5]);
// swap s_addr and d_addr
rte_ether_addr_copy(&pkt_data->eth_hdr.s_addr, &eth_addr);
rte_ether_addr_copy(&pkt_data->eth_hdr.d_addr, &pkt_data->eth_hdr.s_addr);
rte_ether_addr_copy(&eth_addr, &pkt_data->eth_hdr.d_addr);
// queue for burst send
tx_bufs[nb_tx++] = bufs[i];
}
const uint16_t nb_tx_succ = rte_eth_tx_burst(options.s_portid, 0, tx_bufs, nb_tx);
// cleanup unsent packets
if (nb_tx_succ < nb_tx) {
rte_exit(EXIT_FAILURE, "locore_main: failed to send some packets.");
// rte_log(RTE_LOG_WARNING, RTE_LOGTYPE_USER1,"locore_main: core %d some packets failed to queue to tx - expected: %d sent: %d", core_id, nb_tx, nb_tx_succ);
// for (uint16_t buf = nb_tx_succ; buf < nb_tx; buf++)
// rte_pktmbuf_free(tx_bufs[buf]);
}
}
return 0;
}
static int
port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
{
struct rte_eth_dev_info dev_info;
struct rte_eth_conf port_conf;
struct rte_eth_txconf txconf;
uint16_t nb_rxd = RX_RING_SIZE;
uint16_t nb_txd = TX_RING_SIZE;
port_conf.rxmode.max_rx_pkt_len = RTE_ETHER_MAX_LEN;
if(!rte_eth_dev_is_valid_port(portid)) {
return -1;
}
int ret = rte_eth_dev_info_get(portid, &dev_info);
if (ret != 0) {
return ret;
}
if(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE;
if(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_IPV4_CKSUM)
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_IPV4_CKSUM;
if(dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM)
port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_IPV4_CKSUM;
/* Configure the Ethernet device. */
ret = rte_eth_dev_configure(portid, RX_RING_NUM, TX_RING_NUM, &port_conf);
if (ret != 0)
return ret;
ret = rte_eth_dev_adjust_nb_rx_tx_desc(portid, &nb_rxd, &nb_txd);
if (ret != 0)
return ret;
/* Allocate and set up 1 RX queue per Ethernet port. */
for (uint32_t i = 0; i < RX_RING_NUM; i++) {
ret = rte_eth_rx_queue_setup(portid, i, nb_rxd, rte_eth_dev_socket_id(portid), NULL, mbuf_pool);
if (ret < 0)
return ret;
}
txconf = dev_info.default_txconf;
txconf.offloads = port_conf.txmode.offloads;
/* Allocate and set up 1 TX queue per Ethernet port. */
for (uint32_t i = 0; i < TX_RING_NUM; i++) {
ret = rte_eth_tx_queue_setup(portid, i, nb_txd, rte_eth_dev_socket_id(portid), &txconf);
if (ret < 0)
return ret;
}
ret = rte_eth_dev_start(portid);
if (ret < 0)
return ret;
/* Display the port MAC address. */
struct rte_ether_addr addr;
ret = rte_eth_macaddr_get(portid, &addr);
if (ret != 0)
return ret;
printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
" %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
portid,
addr.addr_bytes[0], addr.addr_bytes[1],
addr.addr_bytes[2], addr.addr_bytes[3],
addr.addr_bytes[4], addr.addr_bytes[5]);
/* Enable RX in promiscuous mode for the Ethernet device. */
ret = rte_eth_promiscuous_enable(portid);
if (ret != 0)
return ret;
rte_eth_add_tx_callback(portid, 0, tx_calc_latency, NULL);
rte_eth_add_rx_callback(portid, 0, rx_add_timestamp, NULL);
return 0;
}
static void usage()
{
fprintf(stdout, "Usage:\n \
-v: verbose mode\n \
-h: display the information\n");
}
int main(int argc, char* argv[])
{
unsigned int nb_ports;
struct rte_mempool *mbuf_pool;
// init dpdk
int ret = rte_eal_init(argc, argv);
if (ret < 0) {
rte_exit(EXIT_FAILURE, "rte_eal_init failed!");
}
argc -= ret;
argv += ret;
// set warning level
rte_log_set_level(RTE_LOGTYPE_USER1, RTE_LOG_WARNING);
{
int c;
// parse arguments
while((c = getopt(argc, argv, "hv")) != -1) {
switch (c) {
case 'v':
rte_log_set_level(RTE_LOGTYPE_USER1, RTE_LOG_INFO);
break;
case 'h':
usage();
rte_exit(EXIT_SUCCESS, NULL);
break;
default:
usage();
rte_exit(EXIT_SUCCESS, "unknown argument: %c", c);
break;
}
}
}
nb_ports = rte_eth_dev_count_avail();
if (nb_ports < 2 || (nb_ports & 1)) {
rte_exit(EXIT_FAILURE, "number of ports must be even");
}
// create a mbuf memory pool on the socket
mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", MBUF_MAX_COUNT * nb_ports, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
if (mbuf_pool == nullptr) {
rte_exit(EXIT_FAILURE, "cannot create mbuf pool");
}
uint16_t portid = rte_eth_find_next(0);
if (portid == RTE_MAX_ETHPORTS) {
rte_exit(EXIT_FAILURE, "cannot find an available port");
}
options.s_portid = portid;
if (port_init(portid, mbuf_pool) != 0) {
rte_exit(EXIT_FAILURE, "cannot init port %d", portid);
}
if (rte_eth_macaddr_get(portid, &options.s_host_mac) != 0) {
rte_exit(EXIT_FAILURE, "cannot get mac address of port %d", portid);
}
rte_log(RTE_LOG_INFO, RTE_LOGTYPE_USER1, "Configured port %d with mac addr %x-%x-%x-%x-%x-%x", portid,
options.s_host_mac.addr_bytes[0],
options.s_host_mac.addr_bytes[1],
options.s_host_mac.addr_bytes[2],
options.s_host_mac.addr_bytes[3],
options.s_host_mac.addr_bytes[4],
options.s_host_mac.addr_bytes[5]);
uint16_t lcore_id = rte_get_next_lcore(0, true, false);
if (rte_eal_remote_launch(locore_main, NULL, lcore_id) != 0) {
rte_exit(EXIT_FAILURE, "failed to launch function on locore %d", lcore_id);
}
if (rte_eal_wait_lcore(lcore_id) != 0) {
rte_exit(EXIT_FAILURE, "failed to wait for locore %d", lcore_id);
}
// shouldn't get here
return 0;
}