Stephen Hemminger cb056611a8 eal: rename lcore master and slave
Replace master lcore with main lcore and
replace slave lcore with worker lcore.

Keep the old functions and macros but mark them as deprecated
for this release.

The "--master-lcore" command line option is also deprecated
and any usage will print a warning and use "--main-lcore"
as replacement.

Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
Acked-by: Anatoly Burakov <anatoly.burakov@intel.com>
2020-10-20 13:17:08 +02:00

736 lines
19 KiB
C

/* SPDX-License-Identifier: BSD-3-Clause
* Copyright(c) 2010-2016 Intel Corporation
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <inttypes.h>
#include <sys/types.h>
#include <sys/queue.h>
#include <netinet/in.h>
#include <setjmp.h>
#include <stdarg.h>
#include <ctype.h>
#include <errno.h>
#include <getopt.h>
#include <rte_common.h>
#include <rte_log.h>
#include <rte_malloc.h>
#include <rte_memory.h>
#include <rte_memcpy.h>
#include <rte_eal.h>
#include <rte_launch.h>
#include <rte_atomic.h>
#include <rte_cycles.h>
#include <rte_prefetch.h>
#include <rte_lcore.h>
#include <rte_per_lcore.h>
#include <rte_branch_prediction.h>
#include <rte_interrupts.h>
#include <rte_random.h>
#include <rte_debug.h>
#include <rte_ether.h>
#include <rte_ethdev.h>
#include <rte_mempool.h>
#include <rte_mbuf.h>
/* Log type used by the RTE_LOG(..., LSI, ...) calls in this file. */
#define RTE_LOGTYPE_LSI RTE_LOGTYPE_USER1

/* Number of mbufs requested when the packet pool is created in main(). */
#define NB_MBUF 8192

/* Size of one RX burst and of each per-port TX buffer. */
#define MAX_PKT_BURST 32
#define BURST_TX_DRAIN_US 100 /* TX drain every ~100us */

/*
 * Configurable number of RX/TX ring descriptors
 */
#define RTE_TEST_RX_DESC_DEFAULT 1024
#define RTE_TEST_TX_DESC_DEFAULT 1024
static uint16_t nb_rxd = RTE_TEST_RX_DESC_DEFAULT;
static uint16_t nb_txd = RTE_TEST_TX_DESC_DEFAULT;

/* ethernet addresses of ports */
static struct rte_ether_addr lsi_ports_eth_addr[RTE_MAX_ETHPORTS];

/* mask of enabled ports (bit N set means port N is used), set by -p */
static uint32_t lsi_enabled_port_mask = 0;

/* number of RX ports handled by one lcore, set by -q */
static unsigned int lsi_rx_queue_per_lcore = 1;

/* destination port for L2 forwarding */
static unsigned lsi_dst_ports[RTE_MAX_ETHPORTS] = {0};

/* Capacity of lcore_queue_conf.rx_port_list; -q must stay below it. */
#define MAX_RX_QUEUE_PER_LCORE 16
#define MAX_TX_QUEUE_PER_PORT 16
/*
 * Per-lcore forwarding configuration: the list of ports an lcore polls for
 * received packets. The array below holds one entry per possible lcore and
 * is indexed by lcore id.
 */
struct lcore_queue_conf {
unsigned n_rx_port; /* number of valid entries in rx_port_list */
unsigned rx_port_list[MAX_RX_QUEUE_PER_LCORE]; /* ids of the ports polled by this lcore */
unsigned tx_queue_id; /* NOTE(review): not referenced in the visible part of this file */
} __rte_cache_aligned;
struct lcore_queue_conf lcore_queue_conf[RTE_MAX_LCORE];
/* Per-port TX buffers, indexed by port id; allocated and initialised in main(). */
struct rte_eth_dev_tx_buffer *tx_buffer[RTE_MAX_ETHPORTS];
/*
 * Template port configuration. main() takes a copy for every port and may
 * add TX offload flags to that copy before configuring the device.
 */
static struct rte_eth_conf port_conf = {
.rxmode = {
.split_hdr_size = 0,
},
.txmode = {
.mq_mode = ETH_MQ_TX_NONE,
},
.intr_conf = {
.lsc = 1, /**< lsc interrupt feature enabled */
},
};
/* Mbuf pool handed to the RX queue of every port; created in main(). */
struct rte_mempool * lsi_pktmbuf_pool = NULL;
/*
 * Per-port statistics struct.
 * The array below is indexed by port id; print_stats() reads it.
 */
struct lsi_port_statistics {
uint64_t tx; /* packets reported as sent by the TX buffer calls */
uint64_t rx; /* packets returned by rte_eth_rx_burst() */
uint64_t dropped; /* updated through the TX buffer error callback set in main() */
} __rte_cache_aligned;
struct lsi_port_statistics port_statistics[RTE_MAX_ETHPORTS];
/* A tsc-based timer responsible for triggering statistics printout */
#define TIMER_MILLISECOND 2000000ULL /* around 1ms at 2 Ghz */
#define MAX_TIMER_PERIOD 86400 /* 1 day max */
/*
 * Statistics refresh period, compared against accumulated TSC differences in
 * the main loop. A value of 0 disables the printout; -T overrides the default.
 */
static int64_t timer_period = 10 * TIMER_MILLISECOND * 1000; /* default period is 10 seconds */
/*
 * Clear the terminal and print per-port and aggregate packet statistics.
 *
 * For each port enabled in lsi_enabled_port_mask the link state is queried
 * with rte_eth_link_get_nowait() and printed together with the tx, rx and
 * dropped counters from port_statistics[]. The totals over all enabled
 * ports follow, and stdout is flushed at the end.
 */
static void
print_stats(void)
{
	/* ANSI escape sequences: ESC[2J and ESC[1;1H */
	const char clr[] = { 27, '[', '2', 'J', '\0' };
	const char topLeft[] = { 27, '[', '1', ';', '1', 'H', '\0' };
	uint64_t total_packets_dropped = 0;
	uint64_t total_packets_tx = 0;
	uint64_t total_packets_rx = 0;
	struct rte_eth_link link;
	int link_get_err;
	uint16_t portid;

	/* Clear screen and move to top left */
	printf("%s%s", clr, topLeft);

	printf("\nPort statistics ====================================");

	for (portid = 0; portid < RTE_MAX_ETHPORTS; portid++) {
		/*
		 * The mask cannot describe ports beyond its own width, and
		 * shifting by that many bits would be undefined behaviour.
		 */
		if (portid >= 8 * sizeof(lsi_enabled_port_mask))
			break;
		/* skip ports that are not enabled (unsigned shift: bit 31 is valid) */
		if ((lsi_enabled_port_mask & (UINT32_C(1) << portid)) == 0)
			continue;

		memset(&link, 0, sizeof(link));
		link_get_err = rte_eth_link_get_nowait(portid, &link);
		printf("\nStatistics for port %u ------------------------------"
			   "\nLink status: %25s"
			   "\nLink speed: %26s"
			   "\nLink duplex: %25s"
			   "\nPackets sent: %24"PRIu64
			   "\nPackets received: %20"PRIu64
			   "\nPackets dropped: %21"PRIu64,
			   portid,
			   link_get_err < 0 ? "Link get failed" :
			   (link.link_status ? "Link up" : "Link down"),
			   link_get_err < 0 ? "0" :
			   rte_eth_link_speed_to_str(link.link_speed),
			   link_get_err < 0 ? "Link get failed" :
			   (link.link_duplex == ETH_LINK_FULL_DUPLEX ?
					"full-duplex" : "half-duplex"),
			   port_statistics[portid].tx,
			   port_statistics[portid].rx,
			   port_statistics[portid].dropped);

		total_packets_dropped += port_statistics[portid].dropped;
		total_packets_tx += port_statistics[portid].tx;
		total_packets_rx += port_statistics[portid].rx;
	}

	printf("\nAggregate statistics ==============================="
		   "\nTotal packets sent: %18"PRIu64
		   "\nTotal packets received: %14"PRIu64
		   "\nTotal packets dropped: %15"PRIu64,
		   total_packets_tx,
		   total_packets_rx,
		   total_packets_dropped);
	printf("\n====================================================\n");

	fflush(stdout);
}
static void
lsi_simple_forward(struct rte_mbuf *m, unsigned portid)
{
struct rte_ether_hdr *eth;
void *tmp;
unsigned dst_port = lsi_dst_ports[portid];
int sent;
struct rte_eth_dev_tx_buffer *buffer;
eth = rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
/* 02:00:00:00:00:xx */
tmp = &eth->d_addr.addr_bytes[0];
*((uint64_t *)tmp) = 0x000000000002 + ((uint64_t)dst_port << 40);
/* src addr */
rte_ether_addr_copy(&lsi_ports_eth_addr[dst_port], &eth->s_addr);
buffer = tx_buffer[dst_port];
sent = rte_eth_tx_buffer(dst_port, 0, buffer, m);
if (sent)
port_statistics[dst_port].tx += sent;
}
/*
 * Main processing loop, run on every lcore.
 *
 * Returns immediately when the calling lcore has no RX ports assigned.
 * Otherwise it loops forever:
 *  - about every BURST_TX_DRAIN_US microseconds it flushes the TX buffer of
 *    each destination port served by this lcore and advances the statistics
 *    timer; when the timer expires the main lcore prints the statistics;
 *  - it polls RX queue 0 of each assigned port and forwards every received
 *    packet through lsi_simple_forward().
 */
static void
lsi_main_loop(void)
{
	struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
	struct rte_mbuf *m;
	unsigned lcore_id;
	unsigned sent;
	uint64_t prev_tsc, diff_tsc, cur_tsc, timer_tsc;
	unsigned i, j, portid, nb_rx;
	struct lcore_queue_conf *qconf;
	/* TSC ticks per microsecond (rounded up) times the drain interval */
	const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) / US_PER_S *
			BURST_TX_DRAIN_US;
	struct rte_eth_dev_tx_buffer *buffer;

	prev_tsc = 0;
	timer_tsc = 0;

	lcore_id = rte_lcore_id();
	qconf = &lcore_queue_conf[lcore_id];

	if (qconf->n_rx_port == 0) {
		RTE_LOG(INFO, LSI, "lcore %u has nothing to do\n", lcore_id);
		return;
	}

	RTE_LOG(INFO, LSI, "entering main loop on lcore %u\n", lcore_id);

	for (i = 0; i < qconf->n_rx_port; i++) {
		portid = qconf->rx_port_list[i];
		RTE_LOG(INFO, LSI, " -- lcoreid=%u portid=%u\n", lcore_id,
			portid);
	}

	while (1) {
		cur_tsc = rte_rdtsc();

		/*
		 * TX burst queue drain
		 */
		diff_tsc = cur_tsc - prev_tsc;
		if (unlikely(diff_tsc > drain_tsc)) {
			for (i = 0; i < qconf->n_rx_port; i++) {
				portid = lsi_dst_ports[qconf->rx_port_list[i]];
				buffer = tx_buffer[portid];

				sent = rte_eth_tx_buffer_flush(portid, 0, buffer);
				if (sent)
					port_statistics[portid].tx += sent;
			}

			/* if timer is enabled */
			if (timer_period > 0) {
				/* advance the timer */
				timer_tsc += diff_tsc;

				/* if timer has reached its timeout */
				if (unlikely(timer_tsc >= (uint64_t) timer_period)) {
					/*
					 * do this only on main core; on the
					 * other lcores the timer is never
					 * reset and nothing is printed
					 */
					if (lcore_id == rte_get_main_lcore()) {
						print_stats();
						/* reset the timer */
						timer_tsc = 0;
					}
				}
			}

			prev_tsc = cur_tsc;
		}

		/*
		 * Read packet from RX queues
		 */
		for (i = 0; i < qconf->n_rx_port; i++) {
			portid = qconf->rx_port_list[i];
			/*
			 * Pass the port id unmodified: the former (uint8_t)
			 * cast would truncate ids above 255.
			 */
			nb_rx = rte_eth_rx_burst(portid, 0,
						 pkts_burst, MAX_PKT_BURST);

			port_statistics[portid].rx += nb_rx;

			for (j = 0; j < nb_rx; j++) {
				m = pkts_burst[j];
				rte_prefetch0(rte_pktmbuf_mtod(m, void *));
				lsi_simple_forward(m, portid);
			}
		}
	}
}
/*
 * Per-lcore entry point passed to rte_eal_mp_remote_launch().
 * Runs the forwarding loop; the argument is ignored.
 *
 * @return
 *   Always 0, reached only if lsi_main_loop() returns.
 */
static int
lsi_launch_one_lcore(__rte_unused void *dummy)
{
	lsi_main_loop();

	return 0;
}
/*
 * Print the application command-line synopsis to stdout.
 *
 * @param prgname
 *   Program name shown at the start of the synopsis.
 */
static void
lsi_usage(const char *prgname)
{
	printf("%s [EAL options] -- -p PORTMASK [-q NQ]\n"
		" -p PORTMASK: hexadecimal bitmask of ports to configure\n"
		" -q NQ: number of queue (=ports) per lcore (default is 1)\n"
		" -T PERIOD: statistics will be refreshed each PERIOD seconds"
		" (0 to disable, 10 default, 86400 maximum)\n",
		prgname);
}
/*
 * Parse the -p argument: a hexadecimal bitmask of the ports to enable.
 *
 * @param portmask
 *   NUL-terminated string holding a hexadecimal number.
 * @return
 *   The parsed mask, or 0 when the string is empty, contains trailing
 *   characters, or does not fit the 32-bit port mask. The caller treats 0
 *   as "invalid portmask". A mask with bit 31 set comes back as a negative
 *   int and regains its value when the caller stores it in a uint32_t.
 */
static int
lsi_parse_portmask(const char *portmask)
{
	char *end = NULL;
	unsigned long pm;

	/* parse hexadecimal string */
	errno = 0;
	pm = strtoul(portmask, &end, 16);
	if ((portmask[0] == '\0') || (end == NULL) || (*end != '\0'))
		return 0;

	/*
	 * Reject values that strtoul() had to clamp or that are wider than
	 * the mask; truncating them would silently enable unrelated ports.
	 */
	if (errno == ERANGE || pm > UINT32_MAX)
		return 0;

	return (int)pm;
}
/*
 * Parse the -q argument: the number of RX ports handled by one lcore.
 *
 * @param q_arg
 *   NUL-terminated string holding a decimal number.
 * @return
 *   The parsed count when it lies in [1, MAX_RX_QUEUE_PER_LCORE - 1];
 *   0 when the string is empty, has trailing characters, or the value is
 *   outside that range.
 */
static unsigned int
lsi_parse_nqueue(const char *q_arg)
{
	char *tail = NULL;
	unsigned long count;
	int fully_parsed;

	/* parse decimal string */
	count = strtoul(q_arg, &tail, 10);

	fully_parsed = (q_arg[0] != '\0') && (tail != NULL) && (*tail == '\0');
	if (!fully_parsed)
		return 0;

	if (count == 0 || count >= MAX_RX_QUEUE_PER_LCORE)
		return 0;

	return count;
}
/*
 * Parse the -T argument: the statistics refresh period in seconds.
 *
 * @param q_arg
 *   NUL-terminated string holding a decimal number.
 * @return
 *   The period in seconds, in [0, MAX_TIMER_PERIOD] (0 disables the
 *   printout), or -1 when the string is empty, has trailing characters, or
 *   the value is negative or above MAX_TIMER_PERIOD.
 */
static int
lsi_parse_timer_period(const char *q_arg)
{
	char *end = NULL;
	long n;

	/*
	 * Keep the full strtol() result: narrowing to int before the range
	 * check would let huge inputs wrap around into the accepted range.
	 */
	n = strtol(q_arg, &end, 10);
	if ((q_arg[0] == '\0') || (end == NULL) || (*end != '\0'))
		return -1;

	/*
	 * The usage text documents MAX_TIMER_PERIOD itself as allowed.
	 * Negative values are mapped to -1 so the caller's scaling to TSC
	 * units cannot overflow.
	 */
	if (n < 0 || n > MAX_TIMER_PERIOD)
		return -1;

	return (int)n;
}
/*
 * Parse the application arguments (those following the EAL ones).
 *
 * Recognised options: -p PORTMASK, -q NQ, -T PERIOD. Each one updates the
 * matching file-level setting. On any invalid or unknown option the usage
 * text is printed.
 *
 * @param argc
 *   Number of entries in argv.
 * @param argv
 *   Argument vector; argv[0] is taken as the program name.
 * @return
 *   optind - 1 on success (after which optind is reset to 1), -1 on error.
 */
static int
lsi_parse_args(int argc, char **argv)
{
	static struct option lgopts[] = {
		{NULL, 0, 0, 0}
	};
	char *prgname = argv[0];
	char **argvopt = argv;
	int option_index;
	int consumed;
	int opt;

	for (;;) {
		opt = getopt_long(argc, argvopt, "p:q:T:",
				  lgopts, &option_index);
		if (opt == EOF)
			break;

		/*
		 * A successfully handled option continues with the next one;
		 * everything that leaves the switch is an error.
		 */
		switch (opt) {
		/* portmask */
		case 'p':
			lsi_enabled_port_mask = lsi_parse_portmask(optarg);
			if (lsi_enabled_port_mask != 0)
				continue;
			printf("invalid portmask\n");
			break;

		/* nqueue */
		case 'q':
			lsi_rx_queue_per_lcore = lsi_parse_nqueue(optarg);
			if (lsi_rx_queue_per_lcore != 0)
				continue;
			printf("invalid queue number\n");
			break;

		/* timer period, converted from seconds to timer units */
		case 'T':
			timer_period = lsi_parse_timer_period(optarg) * 1000 * TIMER_MILLISECOND;
			if (timer_period >= 0)
				continue;
			printf("invalid timer period\n");
			break;

		/* long options (none are defined) and anything unknown */
		case 0:
		default:
			break;
		}

		lsi_usage(prgname);
		return -1;
	}

	if (optind >= 0)
		argv[optind - 1] = prgname;

	consumed = optind - 1;
	optind = 1; /* reset getopt lib */
	return consumed;
}
/**
 * Callback registered in main() for RTE_ETH_EVENT_INTR_LSC on each port.
 * It prints the event type and the current link status of the port.
 *
 * @param port_id
 *  Port id the event was raised for.
 * @param type
 *  Event type; anything other than RTE_ETH_EVENT_INTR_LSC is reported as
 *  "unknown event".
 * @param param
 *  Unused.
 * @param ret_param
 *  Unused.
 *
 * @return
 *  0 on success, or the negative value returned by
 *  rte_eth_link_get_nowait() when the link cannot be read.
 */
static int
lsi_event_callback(uint16_t port_id, enum rte_eth_event_type type, void *param,
		void *ret_param)
{
	char link_status_text[RTE_ETH_LINK_MAX_STR_LEN];
	struct rte_eth_link link;
	const char *event_name;
	int rc;

	RTE_SET_USED(param);
	RTE_SET_USED(ret_param);

	if (type == RTE_ETH_EVENT_INTR_LSC)
		event_name = "LSC interrupt";
	else
		event_name = "unknown event";

	printf("\n\nIn registered callback...\n");
	printf("Event type: %s\n", event_name);

	rc = rte_eth_link_get_nowait(port_id, &link);
	if (rc < 0) {
		printf("Failed link get on port %d: %s\n",
		       port_id, rte_strerror(-rc));
		return rc;
	}

	rte_eth_link_to_str(link_status_text, sizeof(link_status_text), &link);
	printf("Port %d %s\n\n", port_id, link_status_text);

	return 0;
}
/*
 * Check the link status of all ports in up to 9s, and print them finally.
 *
 * The enabled ports are polled every CHECK_INTERVAL ms, printing a dot per
 * round, until all links are up or the time budget is used. One more pass
 * is then made in which the status of each enabled port is printed on its
 * own line.
 *
 * @param port_num
 *   Number of port ids to examine (ids 0 .. port_num - 1).
 * @param port_mask
 *   Bitmask of enabled ports; ports whose bit is clear are skipped.
 */
static void
check_all_ports_link_status(uint16_t port_num, uint32_t port_mask)
{
#define CHECK_INTERVAL 100 /* 100ms */
#define MAX_CHECK_TIME 90 /* 9s (90 * 100ms) in total */
	uint8_t count, all_ports_up, print_flag = 0;
	uint16_t portid;
	struct rte_eth_link link;
	int ret;
	char link_status_text[RTE_ETH_LINK_MAX_STR_LEN];

	printf("\nChecking link status");
	fflush(stdout);
	for (count = 0; count <= MAX_CHECK_TIME; count++) {
		all_ports_up = 1;
		for (portid = 0; portid < port_num; portid++) {
			/*
			 * Ports beyond the mask width cannot be enabled, and
			 * shifting that far would be undefined behaviour.
			 */
			if (portid >= 8 * sizeof(port_mask))
				break;
			if ((port_mask & (UINT32_C(1) << portid)) == 0)
				continue;
			memset(&link, 0, sizeof(link));
			ret = rte_eth_link_get_nowait(portid, &link);
			if (ret < 0) {
				all_ports_up = 0;
				if (print_flag == 1)
					printf("Port %u link get failed: %s\n",
						portid, rte_strerror(-ret));
				continue;
			}
			/* print link status if flag set */
			if (print_flag == 1) {
				rte_eth_link_to_str(link_status_text,
					sizeof(link_status_text), &link);
				/*
				 * The status text carries no newline of its
				 * own, so terminate the line here.
				 */
				printf("Port %d %s\n", portid,
					link_status_text);
				continue;
			}
			/* clear all_ports_up flag if any link down */
			if (link.link_status == ETH_LINK_DOWN) {
				all_ports_up = 0;
				break;
			}
		}
		/* after finally printing all link status, get out */
		if (print_flag == 1)
			break;

		if (all_ports_up == 0) {
			printf(".");
			fflush(stdout);
			rte_delay_ms(CHECK_INTERVAL);
		}

		/* set the print_flag if all ports up or timeout */
		if (all_ports_up == 1 || count == (MAX_CHECK_TIME - 1)) {
			print_flag = 1;
			printf("done\n");
		}
	}
}
/*
 * Application entry point.
 *
 * Steps, in order: initialise the EAL, parse the application arguments,
 * create the mbuf pool, pair up the enabled ports for forwarding, assign
 * the enabled ports to lcores, configure and start every enabled port
 * (including registration of the link-status-change callback), wait for
 * the links, then run lsi_main_loop() on the lcores.
 *
 * Returns 0 once all worker lcores have finished, -1 if waiting for a
 * worker fails. Fatal setup errors terminate through rte_exit() or
 * rte_panic().
 */
int
main(int argc, char **argv)
{
struct lcore_queue_conf *qconf;
int ret;
uint16_t nb_ports;
uint16_t portid, portid_last = 0;
unsigned lcore_id, rx_lcore_id;
unsigned nb_ports_in_mask = 0;
/* init EAL */
ret = rte_eal_init(argc, argv);
if (ret < 0)
rte_exit(EXIT_FAILURE, "rte_eal_init failed");
/* skip the arguments consumed by the EAL */
argc -= ret;
argv += ret;
/* parse application arguments (after the EAL ones) */
ret = lsi_parse_args(argc, argv);
if (ret < 0)
rte_exit(EXIT_FAILURE, "Invalid arguments");
/* create the mbuf pool */
lsi_pktmbuf_pool =
rte_pktmbuf_pool_create("mbuf_pool", NB_MBUF, 32, 0,
RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
if (lsi_pktmbuf_pool == NULL)
rte_panic("Cannot init mbuf pool\n");
nb_ports = rte_eth_dev_count_avail();
if (nb_ports == 0)
rte_panic("No Ethernet port - bye\n");
/*
* Pair up the enabled ports: every second enabled port becomes the
* forwarding destination of the enabled port before it, and vice versa.
*/
for (portid = 0; portid < nb_ports; portid++) {
/* skip ports that are not enabled */
if ((lsi_enabled_port_mask & (1 << portid)) == 0)
continue;
/* save the destination port id */
if (nb_ports_in_mask % 2) {
lsi_dst_ports[portid] = portid_last;
lsi_dst_ports[portid_last] = portid;
}
else
portid_last = portid;
nb_ports_in_mask++;
}
/* an odd count would leave the last port without a partner */
if (nb_ports_in_mask < 2 || nb_ports_in_mask % 2)
rte_exit(EXIT_FAILURE, "Current enabled port number is %u, "
"but it should be even and at least 2\n",
nb_ports_in_mask);
rx_lcore_id = 0;
qconf = &lcore_queue_conf[rx_lcore_id];
/* Initialize the port/queue configuration of each logical core */
for (portid = 0; portid < nb_ports; portid++) {
/* skip ports that are not enabled */
if ((lsi_enabled_port_mask & (1 << portid)) == 0)
continue;
/* get the lcore_id for this port: next enabled lcore that is not yet full */
while (rte_lcore_is_enabled(rx_lcore_id) == 0 ||
lcore_queue_conf[rx_lcore_id].n_rx_port ==
lsi_rx_queue_per_lcore) {
rx_lcore_id++;
if (rx_lcore_id >= RTE_MAX_LCORE)
rte_exit(EXIT_FAILURE, "Not enough cores\n");
}
if (qconf != &lcore_queue_conf[rx_lcore_id])
/* Assigned a new logical core in the loop above. */
qconf = &lcore_queue_conf[rx_lcore_id];
qconf->rx_port_list[qconf->n_rx_port] = portid;
qconf->n_rx_port++;
printf("Lcore %u: RX port %u\n",rx_lcore_id, (unsigned) portid);
}
/* Initialise each port */
for (portid = 0; portid < nb_ports; portid++) {
struct rte_eth_rxconf rxq_conf;
struct rte_eth_txconf txq_conf;
/* per-port copy so offload flags can be adjusted without touching the template */
struct rte_eth_conf local_port_conf = port_conf;
struct rte_eth_dev_info dev_info;
/* skip ports that are not enabled */
if ((lsi_enabled_port_mask & (1 << portid)) == 0) {
printf("Skipping disabled port %u\n", (unsigned) portid);
continue;
}
/* init port */
printf("Initializing port %u... ", (unsigned) portid);
fflush(stdout);
ret = rte_eth_dev_info_get(portid, &dev_info);
if (ret != 0)
rte_exit(EXIT_FAILURE,
"Error during getting device (port %u) info: %s\n",
portid, strerror(-ret));
/* enable the fast-free TX offload only when the device reports it */
if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
local_port_conf.txmode.offloads |=
DEV_TX_OFFLOAD_MBUF_FAST_FREE;
/* one RX queue and one TX queue per port */
ret = rte_eth_dev_configure(portid, 1, 1, &local_port_conf);
if (ret < 0)
rte_exit(EXIT_FAILURE, "Cannot configure device: err=%d, port=%u\n",
ret, (unsigned) portid);
ret = rte_eth_dev_adjust_nb_rx_tx_desc(portid, &nb_rxd,
&nb_txd);
if (ret < 0)
rte_exit(EXIT_FAILURE,
"rte_eth_dev_adjust_nb_rx_tx_desc: err=%d, port=%u\n",
ret, (unsigned) portid);
/* register lsi interrupt callback, need to be after
* rte_eth_dev_configure(). if (intr_conf.lsc == 0), no
* lsc interrupt will be present, and below callback to
* be registered will never be called.
*/
/* NOTE(review): the return value of the registration is not checked */
rte_eth_dev_callback_register(portid,
RTE_ETH_EVENT_INTR_LSC, lsi_event_callback, NULL);
ret = rte_eth_macaddr_get(portid,
&lsi_ports_eth_addr[portid]);
if (ret < 0)
rte_exit(EXIT_FAILURE,
"rte_eth_macaddr_get: err=%d, port=%u\n",
ret, (unsigned int)portid);
/* init one RX queue */
fflush(stdout);
rxq_conf = dev_info.default_rxconf;
rxq_conf.offloads = local_port_conf.rxmode.offloads;
ret = rte_eth_rx_queue_setup(portid, 0, nb_rxd,
rte_eth_dev_socket_id(portid),
&rxq_conf,
lsi_pktmbuf_pool);
if (ret < 0)
rte_exit(EXIT_FAILURE, "rte_eth_rx_queue_setup: err=%d, port=%u\n",
ret, (unsigned) portid);
/* init one TX queue on each port */
fflush(stdout);
txq_conf = dev_info.default_txconf;
txq_conf.offloads = local_port_conf.txmode.offloads;
ret = rte_eth_tx_queue_setup(portid, 0, nb_txd,
rte_eth_dev_socket_id(portid),
&txq_conf);
if (ret < 0)
rte_exit(EXIT_FAILURE, "rte_eth_tx_queue_setup: err=%d,port=%u\n",
ret, (unsigned) portid);
/* Initialize TX buffers */
tx_buffer[portid] = rte_zmalloc_socket("tx_buffer",
RTE_ETH_TX_BUFFER_SIZE(MAX_PKT_BURST), 0,
rte_eth_dev_socket_id(portid));
if (tx_buffer[portid] == NULL)
rte_exit(EXIT_FAILURE, "Cannot allocate buffer for tx on port %u\n",
(unsigned) portid);
rte_eth_tx_buffer_init(tx_buffer[portid], MAX_PKT_BURST);
/* the error callback receives this port's "dropped" counter as its argument */
ret = rte_eth_tx_buffer_set_err_callback(tx_buffer[portid],
rte_eth_tx_buffer_count_callback,
&port_statistics[portid].dropped);
if (ret < 0)
rte_exit(EXIT_FAILURE, "Cannot set error callback for "
"tx buffer on port %u\n", (unsigned) portid);
/* Start device */
ret = rte_eth_dev_start(portid);
if (ret < 0)
rte_exit(EXIT_FAILURE, "rte_eth_dev_start: err=%d, port=%u\n",
ret, (unsigned) portid);
printf("done:\n");
ret = rte_eth_promiscuous_enable(portid);
if (ret != 0)
rte_exit(EXIT_FAILURE,
"rte_eth_promiscuous_enable: err=%s, port=%u\n",
rte_strerror(-ret), portid);
printf("Port %u, MAC address: %02X:%02X:%02X:%02X:%02X:%02X\n\n",
(unsigned) portid,
lsi_ports_eth_addr[portid].addr_bytes[0],
lsi_ports_eth_addr[portid].addr_bytes[1],
lsi_ports_eth_addr[portid].addr_bytes[2],
lsi_ports_eth_addr[portid].addr_bytes[3],
lsi_ports_eth_addr[portid].addr_bytes[4],
lsi_ports_eth_addr[portid].addr_bytes[5]);
/*
* initialize port stats: this clears the whole array (all ports) on
* every iteration, not just the entry of the current port
*/
memset(&port_statistics, 0, sizeof(port_statistics));
}
check_all_ports_link_status(nb_ports, lsi_enabled_port_mask);
/* launch the forwarding loop on every lcore; CALL_MAIN requests the main lcore too */
rte_eal_mp_remote_launch(lsi_launch_one_lcore, NULL, CALL_MAIN);
/* wait for each worker lcore; a negative result ends the program with -1 */
RTE_LCORE_FOREACH_WORKER(lcore_id) {
if (rte_eal_wait_lcore(lcore_id) < 0)
return -1;
}
return 0;
}