examples/eventdev_pipeline: support Tx adapter

Redo the worker pipelines and offload transmission to service cores
seamlessly through the Tx adapter.

Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
Reviewed-by: Nikhil Rao <nikhil.rao@intel.com>
Tested-by: Nikhil Rao <nikhil.rao@intel.com>
Pavan Nikhilesh authored on 2018-09-24 15:42:08 +05:30; committed by Jerin Jacob
parent de3466e3a4
commit 085edac2ca
4 changed files with 206 additions and 335 deletions
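For orientation before the per-file diffs: the reworked do_capability_setup() in main.c picks between two pipelines based on the Tx adapter capability of each Ethernet port. Below is a minimal sketch of that probe; it is not part of the commit, the helper name need_generic_pipeline() is invented for illustration, and event device 0 is assumed as in the example.

#include <stdint.h>

#include <rte_ethdev.h>
#include <rte_event_eth_tx_adapter.h>

/* Sketch: probe every Ethernet port's Tx adapter capability on one event
 * device and report whether the generic, service-backed pipeline is needed,
 * i.e. whether at least one port lacks an internal event port for Tx. */
static int
need_generic_pipeline(uint8_t evdev_id)
{
	uint16_t i;
	int generic = 0;

	RTE_ETH_FOREACH_DEV(i) {
		uint32_t caps = 0;

		if (rte_event_eth_tx_adapter_caps_get(evdev_id, i, &caps))
			return -1;
		/* No internal port: Tx must be driven by a service core. */
		generic |= !(caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT);
	}

	return generic;
}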

examples/eventdev_pipeline/main.c

@@ -26,20 +26,6 @@ core_in_use(unsigned int lcore_id) {
fdata->tx_core[lcore_id] || fdata->worker_core[lcore_id]);
}
static void
eth_tx_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
void *userdata)
{
int port_id = (uintptr_t) userdata;
unsigned int _sent = 0;
do {
/* Note: hard-coded TX queue */
_sent += rte_eth_tx_burst(port_id, 0, &pkts[_sent],
unsent - _sent);
} while (_sent != unsent);
}
/*
* Parse the coremask given as argument (hexadecimal string) and fill
* the global configuration (core role and core count) with the parsed
@@ -263,6 +249,7 @@ parse_app_args(int argc, char **argv)
static inline int
port_init(uint8_t port, struct rte_mempool *mbuf_pool)
{
struct rte_eth_rxconf rx_conf;
static const struct rte_eth_conf port_conf_default = {
.rxmode = {
.mq_mode = ETH_MQ_RX_RSS,
@@ -291,6 +278,8 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
port_conf.txmode.offloads |=
DEV_TX_OFFLOAD_MBUF_FAST_FREE;
rx_conf = dev_info.default_rxconf;
rx_conf.offloads = port_conf.rxmode.offloads;
port_conf.rx_adv_conf.rss_conf.rss_hf &=
dev_info.flow_type_rss_offloads;
@@ -311,7 +300,8 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
/* Allocate and set up 1 RX queue per Ethernet port. */
for (q = 0; q < rx_rings; q++) {
retval = rte_eth_rx_queue_setup(port, q, rx_ring_size,
rte_eth_dev_socket_id(port), NULL, mbuf_pool);
rte_eth_dev_socket_id(port), &rx_conf,
mbuf_pool);
if (retval < 0)
return retval;
}
@@ -350,7 +340,7 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
static int
init_ports(uint16_t num_ports)
{
uint16_t portid, i;
uint16_t portid;
if (!cdata.num_mbuf)
cdata.num_mbuf = 16384 * num_ports;
@@ -367,36 +357,26 @@ init_ports(uint16_t num_ports)
rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16 "\n",
portid);
RTE_ETH_FOREACH_DEV(i) {
void *userdata = (void *)(uintptr_t) i;
fdata->tx_buf[i] =
rte_malloc(NULL, RTE_ETH_TX_BUFFER_SIZE(32), 0);
if (fdata->tx_buf[i] == NULL)
rte_panic("Out of memory\n");
rte_eth_tx_buffer_init(fdata->tx_buf[i], 32);
rte_eth_tx_buffer_set_err_callback(fdata->tx_buf[i],
eth_tx_buffer_retry,
userdata);
}
return 0;
}
static void
do_capability_setup(uint8_t eventdev_id)
{
int ret;
uint16_t i;
uint8_t mt_unsafe = 0;
uint8_t generic_pipeline = 0;
uint8_t burst = 0;
RTE_ETH_FOREACH_DEV(i) {
struct rte_eth_dev_info dev_info;
memset(&dev_info, 0, sizeof(struct rte_eth_dev_info));
uint32_t caps = 0;
rte_eth_dev_info_get(i, &dev_info);
/* Check if it is safe ask worker to tx. */
mt_unsafe |= !(dev_info.tx_offload_capa &
DEV_TX_OFFLOAD_MT_LOCKFREE);
ret = rte_event_eth_tx_adapter_caps_get(eventdev_id, i, &caps);
if (ret)
rte_exit(EXIT_FAILURE,
"Invalid capability for Tx adptr port %d\n", i);
generic_pipeline |= !(caps &
RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT);
}
struct rte_event_dev_info eventdev_info;
@@ -406,21 +386,42 @@ do_capability_setup(uint8_t eventdev_id)
burst = eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_BURST_MODE ? 1 :
0;
if (mt_unsafe)
if (generic_pipeline)
set_worker_generic_setup_data(&fdata->cap, burst);
else
set_worker_tx_setup_data(&fdata->cap, burst);
set_worker_tx_enq_setup_data(&fdata->cap, burst);
}
static void
signal_handler(int signum)
{
static uint8_t once;
uint16_t portid;
if (fdata->done)
rte_exit(1, "Exiting on signal %d\n", signum);
if (signum == SIGINT || signum == SIGTERM) {
if ((signum == SIGINT || signum == SIGTERM) && !once) {
printf("\n\nSignal %d received, preparing to exit...\n",
signum);
if (cdata.dump_dev)
rte_event_dev_dump(0, stdout);
once = 1;
fdata->done = 1;
rte_smp_wmb();
RTE_ETH_FOREACH_DEV(portid) {
rte_event_eth_rx_adapter_stop(portid);
rte_event_eth_tx_adapter_stop(portid);
rte_eth_dev_stop(portid);
}
rte_eal_mp_wait_lcore();
RTE_ETH_FOREACH_DEV(portid) {
rte_eth_dev_close(portid);
}
rte_event_dev_close(0);
}
if (signum == SIGTSTP)
rte_event_dev_dump(0, stdout);
@@ -499,7 +500,7 @@ main(int argc, char **argv)
if (worker_data == NULL)
rte_panic("rte_calloc failed\n");
int dev_id = fdata->cap.evdev_setup(&cons_data, worker_data);
int dev_id = fdata->cap.evdev_setup(worker_data);
if (dev_id < 0)
rte_exit(EXIT_FAILURE, "Error setting up eventdev\n");
@@ -524,8 +525,8 @@ main(int argc, char **argv)
if (fdata->tx_core[lcore_id])
printf(
"[%s()] lcore %d executing NIC Tx, and using eventdev port %u\n",
__func__, lcore_id, cons_data.port_id);
"[%s()] lcore %d executing NIC Tx\n",
__func__, lcore_id);
if (fdata->sched_core[lcore_id])
printf("[%s()] lcore %d executing scheduler\n",
@@ -555,9 +556,6 @@ main(int argc, char **argv)
rte_eal_mp_wait_lcore();
if (cdata.dump_dev)
rte_event_dev_dump(dev_id, stdout);
if (!cdata.quiet && (port_stat(dev_id, worker_data[0].port_id) !=
(uint64_t)-ENOTSUP)) {
printf("\nPort Workload distribution:\n");

examples/eventdev_pipeline/pipeline_common.h

@@ -16,6 +16,7 @@
#include <rte_ethdev.h>
#include <rte_eventdev.h>
#include <rte_event_eth_rx_adapter.h>
#include <rte_event_eth_tx_adapter.h>
#include <rte_service.h>
#include <rte_service_component.h>
@@ -23,38 +24,30 @@
#define BATCH_SIZE 16
#define MAX_NUM_CORE 64
struct cons_data {
uint8_t dev_id;
uint8_t port_id;
uint8_t release;
} __rte_cache_aligned;
struct worker_data {
uint8_t dev_id;
uint8_t port_id;
} __rte_cache_aligned;
typedef int (*worker_loop)(void *);
typedef int (*consumer_loop)(void);
typedef void (*schedule_loop)(unsigned int);
typedef int (*eventdev_setup)(struct cons_data *, struct worker_data *);
typedef void (*rx_adapter_setup)(uint16_t nb_ports);
typedef int (*eventdev_setup)(struct worker_data *);
typedef void (*adapter_setup)(uint16_t nb_ports);
typedef void (*opt_check)(void);
struct setup_data {
worker_loop worker;
consumer_loop consumer;
schedule_loop scheduler;
eventdev_setup evdev_setup;
rx_adapter_setup adptr_setup;
adapter_setup adptr_setup;
opt_check check_opt;
};
struct fastpath_data {
volatile int done;
uint32_t tx_lock;
uint32_t evdev_service_id;
uint32_t rxadptr_service_id;
uint32_t txadptr_service_id;
bool rx_single;
bool tx_single;
bool sched_single;
@@ -62,7 +55,6 @@ struct fastpath_data {
unsigned int tx_core[MAX_NUM_CORE];
unsigned int sched_core[MAX_NUM_CORE];
unsigned int worker_core[MAX_NUM_CORE];
struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS];
struct setup_data cap;
} __rte_cache_aligned;
@@ -88,6 +80,8 @@ struct config_data {
int16_t next_qid[MAX_NUM_STAGES+2];
int16_t qid[MAX_NUM_STAGES];
uint8_t rx_adapter_id;
uint8_t tx_adapter_id;
uint8_t tx_queue_id;
uint64_t worker_lcore_mask;
uint64_t rx_lcore_mask;
uint64_t tx_lcore_mask;
@@ -99,8 +93,6 @@ struct port_link {
uint8_t priority;
};
struct cons_data cons_data;
struct fastpath_data *fdata;
struct config_data cdata;
@@ -142,12 +134,11 @@ schedule_devices(unsigned int lcore_id)
}
}
if (fdata->tx_core[lcore_id] && (fdata->tx_single ||
rte_atomic32_cmpset(&(fdata->tx_lock), 0, 1))) {
fdata->cap.consumer();
rte_atomic32_clear((rte_atomic32_t *)&(fdata->tx_lock));
if (fdata->tx_core[lcore_id]) {
rte_service_run_iter_on_app_lcore(fdata->txadptr_service_id,
!fdata->tx_single);
}
}
void set_worker_generic_setup_data(struct setup_data *caps, bool burst);
void set_worker_tx_setup_data(struct setup_data *caps, bool burst);
void set_worker_tx_enq_setup_data(struct setup_data *caps, bool burst);
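As a side note to the schedule_devices() change above (not code from the commit): the Tx core no longer runs a consumer loop under a spinlock, it drives the Tx adapter's service directly. A small sketch of that call follows, assuming the fdata fields declared in this header; the wrapper name run_tx_adapter() is illustrative.

#include <rte_service.h>

/* Sketch: run one iteration of the Tx adapter service on a Tx lcore.
 * When several lcores share the Tx role (tx_single == false), the service
 * library is asked to serialize the calls; with a single Tx lcore that
 * atomic serialization is skipped. */
static inline void
run_tx_adapter(unsigned int lcore_id)
{
	if (fdata->tx_core[lcore_id])
		rte_service_run_iter_on_app_lcore(fdata->txadptr_service_id,
				!fdata->tx_single);
}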

examples/eventdev_pipeline/pipeline_worker_generic.c

@@ -119,153 +119,13 @@ worker_generic_burst(void *arg)
return 0;
}
static __rte_always_inline int
consumer(void)
{
const uint64_t freq_khz = rte_get_timer_hz() / 1000;
struct rte_event packet;
static uint64_t received;
static uint64_t last_pkts;
static uint64_t last_time;
static uint64_t start_time;
int i;
uint8_t dev_id = cons_data.dev_id;
uint8_t port_id = cons_data.port_id;
do {
uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
&packet, 1, 0);
if (n == 0) {
RTE_ETH_FOREACH_DEV(i)
rte_eth_tx_buffer_flush(i, 0, fdata->tx_buf[i]);
return 0;
}
if (start_time == 0)
last_time = start_time = rte_get_timer_cycles();
received++;
uint8_t outport = packet.mbuf->port;
exchange_mac(packet.mbuf);
rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
packet.mbuf);
if (cons_data.release)
rte_event_enqueue_burst(dev_id, port_id,
&packet, n);
/* Print out mpps every 1<22 packets */
if (!cdata.quiet && received >= last_pkts + (1<<22)) {
const uint64_t now = rte_get_timer_cycles();
const uint64_t total_ms = (now - start_time) / freq_khz;
const uint64_t delta_ms = (now - last_time) / freq_khz;
uint64_t delta_pkts = received - last_pkts;
printf("# %s RX=%"PRIu64", time %"PRIu64 "ms, "
"avg %.3f mpps [current %.3f mpps]\n",
__func__,
received,
total_ms,
received / (total_ms * 1000.0),
delta_pkts / (delta_ms * 1000.0));
last_pkts = received;
last_time = now;
}
cdata.num_packets--;
if (cdata.num_packets <= 0)
fdata->done = 1;
/* Be stuck in this loop if single. */
} while (!fdata->done && fdata->tx_single);
return 0;
}
static __rte_always_inline int
consumer_burst(void)
{
const uint64_t freq_khz = rte_get_timer_hz() / 1000;
struct rte_event packets[BATCH_SIZE];
static uint64_t received;
static uint64_t last_pkts;
static uint64_t last_time;
static uint64_t start_time;
unsigned int i, j;
uint8_t dev_id = cons_data.dev_id;
uint8_t port_id = cons_data.port_id;
do {
uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
packets, RTE_DIM(packets), 0);
if (n == 0) {
RTE_ETH_FOREACH_DEV(j)
rte_eth_tx_buffer_flush(j, 0, fdata->tx_buf[j]);
return 0;
}
if (start_time == 0)
last_time = start_time = rte_get_timer_cycles();
received += n;
for (i = 0; i < n; i++) {
uint8_t outport = packets[i].mbuf->port;
exchange_mac(packets[i].mbuf);
rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
packets[i].mbuf);
packets[i].op = RTE_EVENT_OP_RELEASE;
}
if (cons_data.release) {
uint16_t nb_tx;
nb_tx = rte_event_enqueue_burst(dev_id, port_id,
packets, n);
while (nb_tx < n)
nb_tx += rte_event_enqueue_burst(dev_id,
port_id, packets + nb_tx,
n - nb_tx);
}
/* Print out mpps every 1<22 packets */
if (!cdata.quiet && received >= last_pkts + (1<<22)) {
const uint64_t now = rte_get_timer_cycles();
const uint64_t total_ms = (now - start_time) / freq_khz;
const uint64_t delta_ms = (now - last_time) / freq_khz;
uint64_t delta_pkts = received - last_pkts;
printf("# consumer RX=%"PRIu64", time %"PRIu64 "ms, "
"avg %.3f mpps [current %.3f mpps]\n",
received,
total_ms,
received / (total_ms * 1000.0),
delta_pkts / (delta_ms * 1000.0));
last_pkts = received;
last_time = now;
}
cdata.num_packets -= n;
if (cdata.num_packets <= 0)
fdata->done = 1;
/* Be stuck in this loop if single. */
} while (!fdata->done && fdata->tx_single);
return 0;
}
static int
setup_eventdev_generic(struct cons_data *cons_data,
struct worker_data *worker_data)
setup_eventdev_generic(struct worker_data *worker_data)
{
const uint8_t dev_id = 0;
/* +1 stages is for a SINGLE_LINK TX stage */
const uint8_t nb_queues = cdata.num_stages + 1;
/* + 1 is one port for consumer */
const uint8_t nb_ports = cdata.num_workers + 1;
const uint8_t nb_ports = cdata.num_workers;
struct rte_event_dev_config config = {
.nb_event_queues = nb_queues,
.nb_event_ports = nb_ports,
@@ -285,11 +145,6 @@ setup_eventdev_generic(struct cons_data *cons_data,
.nb_atomic_flows = 1024,
.nb_atomic_order_sequences = 1024,
};
struct rte_event_port_conf tx_p_conf = {
.dequeue_depth = 128,
.enqueue_depth = 128,
.new_event_threshold = 4096,
};
struct rte_event_queue_conf tx_q_conf = {
.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST,
.event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_LINK,
@@ -297,7 +152,6 @@ setup_eventdev_generic(struct cons_data *cons_data,
struct port_link worker_queues[MAX_NUM_STAGES];
uint8_t disable_implicit_release;
struct port_link tx_queue;
unsigned int i;
int ret, ndev = rte_event_dev_count();
@@ -314,7 +168,6 @@ setup_eventdev_generic(struct cons_data *cons_data,
RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE);
wkr_p_conf.disable_implicit_release = disable_implicit_release;
tx_p_conf.disable_implicit_release = disable_implicit_release;
if (dev_info.max_event_port_dequeue_depth <
config.nb_event_port_dequeue_depth)
@@ -372,8 +225,7 @@ setup_eventdev_generic(struct cons_data *cons_data,
printf("%d: error creating qid %d\n", __LINE__, i);
return -1;
}
tx_queue.queue_id = i;
tx_queue.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST;
cdata.tx_queue_id = i;
if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
@@ -403,26 +255,6 @@ setup_eventdev_generic(struct cons_data *cons_data,
w->port_id = i;
}
if (tx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
tx_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
if (tx_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
tx_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;
/* port for consumer, linked to TX queue */
if (rte_event_port_setup(dev_id, i, &tx_p_conf) < 0) {
printf("Error setting up port %d\n", i);
return -1;
}
if (rte_event_port_link(dev_id, i, &tx_queue.queue_id,
&tx_queue.priority, 1) != 1) {
printf("%d: error creating link for port %d\n",
__LINE__, i);
return -1;
}
*cons_data = (struct cons_data){.dev_id = dev_id,
.port_id = i,
.release = disable_implicit_release };
ret = rte_event_dev_service_id_get(dev_id,
&fdata->evdev_service_id);
if (ret != -ESRCH && ret != 0) {
@@ -431,76 +263,107 @@ setup_eventdev_generic(struct cons_data *cons_data,
}
rte_service_runstate_set(fdata->evdev_service_id, 1);
rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);
if (rte_event_dev_start(dev_id) < 0) {
printf("Error starting eventdev\n");
return -1;
}
return dev_id;
}
static void
init_rx_adapter(uint16_t nb_ports)
init_adapters(uint16_t nb_ports)
{
int i;
int ret;
uint8_t tx_port_id = 0;
uint8_t evdev_id = 0;
struct rte_event_dev_info dev_info;
ret = rte_event_dev_info_get(evdev_id, &dev_info);
struct rte_event_port_conf rx_p_conf = {
.dequeue_depth = 8,
.enqueue_depth = 8,
.new_event_threshold = 1200,
struct rte_event_port_conf adptr_p_conf = {
.dequeue_depth = cdata.worker_cq_depth,
.enqueue_depth = 64,
.new_event_threshold = 4096,
};
if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth;
if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth;
if (adptr_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
adptr_p_conf.dequeue_depth =
dev_info.max_event_port_dequeue_depth;
if (adptr_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
adptr_p_conf.enqueue_depth =
dev_info.max_event_port_enqueue_depth;
/* Create one adapter for all the ethernet ports. */
ret = rte_event_eth_rx_adapter_create(cdata.rx_adapter_id, evdev_id,
&rx_p_conf);
&adptr_p_conf);
if (ret)
rte_exit(EXIT_FAILURE, "failed to create rx adapter[%d]",
cdata.rx_adapter_id);
ret = rte_event_eth_tx_adapter_create(cdata.tx_adapter_id, evdev_id,
&adptr_p_conf);
if (ret)
rte_exit(EXIT_FAILURE, "failed to create tx adapter[%d]",
cdata.tx_adapter_id);
struct rte_event_eth_rx_adapter_queue_conf queue_conf;
memset(&queue_conf, 0, sizeof(queue_conf));
queue_conf.ev.sched_type = cdata.queue_type;
queue_conf.ev.queue_id = cdata.qid[0];
for (i = 0; i < nb_ports; i++) {
uint32_t cap;
ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
if (ret)
rte_exit(EXIT_FAILURE,
"failed to get event rx adapter "
"capabilities");
ret = rte_event_eth_rx_adapter_queue_add(cdata.rx_adapter_id, i,
-1, &queue_conf);
if (ret)
rte_exit(EXIT_FAILURE,
"Failed to add queues to Rx adapter");
ret = rte_event_eth_tx_adapter_queue_add(cdata.tx_adapter_id, i,
-1);
if (ret)
rte_exit(EXIT_FAILURE,
"Failed to add queues to Tx adapter");
}
ret = rte_event_eth_tx_adapter_event_port_get(cdata.tx_adapter_id,
&tx_port_id);
if (ret)
rte_exit(EXIT_FAILURE,
"Failed to get Tx adapter port id");
ret = rte_event_port_link(evdev_id, tx_port_id, &cdata.tx_queue_id,
NULL, 1);
if (ret != 1)
rte_exit(EXIT_FAILURE,
"Unable to link Tx adapter port to Tx queue");
ret = rte_event_eth_rx_adapter_service_id_get(cdata.rx_adapter_id,
&fdata->rxadptr_service_id);
if (ret != -ESRCH && ret != 0) {
rte_exit(EXIT_FAILURE,
"Error getting the service ID for sw eventdev\n");
"Error getting the service ID for Rx adapter\n");
}
rte_service_runstate_set(fdata->rxadptr_service_id, 1);
rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id, 0);
ret = rte_event_eth_tx_adapter_service_id_get(cdata.tx_adapter_id,
&fdata->txadptr_service_id);
if (ret != -ESRCH && ret != 0) {
rte_exit(EXIT_FAILURE,
"Error getting the service ID for Tx adapter\n");
}
rte_service_runstate_set(fdata->txadptr_service_id, 1);
rte_service_set_runstate_mapped_check(fdata->txadptr_service_id, 0);
ret = rte_event_eth_rx_adapter_start(cdata.rx_adapter_id);
if (ret)
rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
cdata.rx_adapter_id);
ret = rte_event_eth_tx_adapter_start(cdata.tx_adapter_id);
if (ret)
rte_exit(EXIT_FAILURE, "Tx adapter[%d] start failed",
cdata.tx_adapter_id);
if (rte_event_dev_start(evdev_id) < 0)
rte_exit(EXIT_FAILURE, "Error starting eventdev");
}
static void
@@ -510,6 +373,7 @@ generic_opt_check(void)
int ret;
uint32_t cap = 0;
uint8_t rx_needed = 0;
uint8_t sched_needed = 0;
struct rte_event_dev_info eventdev_info;
memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
@@ -519,6 +383,8 @@ generic_opt_check(void)
RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES))
rte_exit(EXIT_FAILURE,
"Event dev doesn't support all type queues\n");
sched_needed = !(eventdev_info.event_dev_cap &
RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED);
RTE_ETH_FOREACH_DEV(i) {
ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap);
@@ -531,9 +397,8 @@ generic_opt_check(void)
if (cdata.worker_lcore_mask == 0 ||
(rx_needed && cdata.rx_lcore_mask == 0) ||
cdata.tx_lcore_mask == 0 || (cdata.sched_lcore_mask == 0
&& !(eventdev_info.event_dev_cap &
RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))) {
(cdata.tx_lcore_mask == 0) ||
(sched_needed && cdata.sched_lcore_mask == 0)) {
printf("Core part of pipeline was not assigned any cores. "
"This will stall the pipeline, please check core masks "
"(use -h for details on setting core masks):\n"
@@ -545,23 +410,24 @@ generic_opt_check(void)
rte_exit(-1, "Fix core masks\n");
}
if (eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)
if (!sched_needed)
memset(fdata->sched_core, 0,
sizeof(unsigned int) * MAX_NUM_CORE);
if (!rx_needed)
memset(fdata->rx_core, 0,
sizeof(unsigned int) * MAX_NUM_CORE);
}
void
set_worker_generic_setup_data(struct setup_data *caps, bool burst)
{
if (burst) {
caps->consumer = consumer_burst;
caps->worker = worker_generic_burst;
} else {
caps->consumer = consumer;
caps->worker = worker_generic;
}
caps->adptr_setup = init_rx_adapter;
caps->adptr_setup = init_adapters;
caps->scheduler = schedule_devices;
caps->evdev_setup = setup_eventdev_generic;
caps->check_opt = generic_opt_check;
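To tie the generic-pipeline pieces together, here is a condensed, hedged sketch of the Tx half of init_adapters() shown above: create one adapter for all ports, link its event port to the SINGLE_LINK Tx queue the workers forward to, expose its service to the Tx lcores and start it. It reuses cdata/fdata from the example and is a summary, not a verbatim extract of the commit; the function name setup_tx_adapter() is invented.

#include <stdlib.h>

#include <rte_eal.h>
#include <rte_eventdev.h>
#include <rte_event_eth_tx_adapter.h>
#include <rte_service.h>

/* Sketch: one Tx adapter serves all Ethernet ports in the generic pipeline. */
static void
setup_tx_adapter(uint8_t evdev_id, uint16_t nb_ports,
		struct rte_event_port_conf *adptr_p_conf)
{
	uint8_t tx_port_id = 0;
	uint16_t i;

	if (rte_event_eth_tx_adapter_create(cdata.tx_adapter_id, evdev_id,
				adptr_p_conf))
		rte_exit(EXIT_FAILURE, "failed to create tx adapter\n");

	/* Queue id -1 adds every Tx queue of the port to the adapter. */
	for (i = 0; i < nb_ports; i++)
		if (rte_event_eth_tx_adapter_queue_add(cdata.tx_adapter_id,
					i, -1))
			rte_exit(EXIT_FAILURE, "failed to add Tx queues\n");

	/* Workers forward finished events to cdata.tx_queue_id; link that
	 * queue to the event port owned by the adapter. */
	if (rte_event_eth_tx_adapter_event_port_get(cdata.tx_adapter_id,
				&tx_port_id) ||
			rte_event_port_link(evdev_id, tx_port_id,
				&cdata.tx_queue_id, NULL, 1) != 1)
		rte_exit(EXIT_FAILURE, "failed to link Tx adapter port\n");

	/* The software adapter runs as a service; let the Tx lcores run it. */
	if (rte_event_eth_tx_adapter_service_id_get(cdata.tx_adapter_id,
				&fdata->txadptr_service_id) == 0) {
		rte_service_runstate_set(fdata->txadptr_service_id, 1);
		rte_service_set_runstate_mapped_check(
				fdata->txadptr_service_id, 0);
	}

	if (rte_event_eth_tx_adapter_start(cdata.tx_adapter_id))
		rte_exit(EXIT_FAILURE, "Tx adapter start failed\n");
}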

examples/eventdev_pipeline/pipeline_worker_tx.c

@@ -36,10 +36,11 @@ worker_event_enqueue_burst(const uint8_t dev, const uint8_t port,
}
static __rte_always_inline void
worker_tx_pkt(struct rte_mbuf *mbuf)
worker_tx_pkt(const uint8_t dev, const uint8_t port, struct rte_event *ev)
{
exchange_mac(mbuf);
while (rte_eth_tx_burst(mbuf->port, 0, &mbuf, 1) != 1)
exchange_mac(ev->mbuf);
rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0);
while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1))
rte_pause();
}
@@ -64,15 +65,15 @@ worker_do_tx_single(void *arg)
received++;
if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
worker_tx_pkt(ev.mbuf);
worker_tx_pkt(dev, port, &ev);
tx++;
continue;
} else {
work();
ev.queue_id++;
worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
worker_event_enqueue(dev, port, &ev);
fwd++;
}
work();
ev.queue_id++;
worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
worker_event_enqueue(dev, port, &ev);
fwd++;
}
if (!cdata.quiet)
@@ -100,14 +101,14 @@ worker_do_tx_single_atq(void *arg)
received++;
if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
worker_tx_pkt(ev.mbuf);
worker_tx_pkt(dev, port, &ev);
tx++;
continue;
} else {
work();
worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
worker_event_enqueue(dev, port, &ev);
fwd++;
}
work();
worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
worker_event_enqueue(dev, port, &ev);
fwd++;
}
if (!cdata.quiet)
@@ -141,7 +142,7 @@ worker_do_tx_single_burst(void *arg)
rte_prefetch0(ev[i + 1].mbuf);
if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
worker_tx_pkt(ev[i].mbuf);
worker_tx_pkt(dev, port, &ev[i]);
ev[i].op = RTE_EVENT_OP_RELEASE;
tx++;
@@ -188,7 +189,7 @@ worker_do_tx_single_burst_atq(void *arg)
rte_prefetch0(ev[i + 1].mbuf);
if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
worker_tx_pkt(ev[i].mbuf);
worker_tx_pkt(dev, port, &ev[i]);
ev[i].op = RTE_EVENT_OP_RELEASE;
tx++;
} else
@@ -232,7 +233,7 @@ worker_do_tx(void *arg)
if (cq_id >= lst_qid) {
if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
worker_tx_pkt(ev.mbuf);
worker_tx_pkt(dev, port, &ev);
tx++;
continue;
}
@@ -280,7 +281,7 @@ worker_do_tx_atq(void *arg)
if (cq_id == lst_qid) {
if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
worker_tx_pkt(ev.mbuf);
worker_tx_pkt(dev, port, &ev);
tx++;
continue;
}
@@ -330,7 +331,7 @@ worker_do_tx_burst(void *arg)
if (cq_id >= lst_qid) {
if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
worker_tx_pkt(ev[i].mbuf);
worker_tx_pkt(dev, port, &ev[i]);
tx++;
ev[i].op = RTE_EVENT_OP_RELEASE;
continue;
@@ -387,7 +388,7 @@ worker_do_tx_burst_atq(void *arg)
if (cq_id == lst_qid) {
if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
worker_tx_pkt(ev[i].mbuf);
worker_tx_pkt(dev, port, &ev[i]);
tx++;
ev[i].op = RTE_EVENT_OP_RELEASE;
continue;
@@ -413,10 +414,8 @@ worker_do_tx_burst_atq(void *arg)
}
static int
setup_eventdev_worker_tx(struct cons_data *cons_data,
struct worker_data *worker_data)
setup_eventdev_worker_tx_enq(struct worker_data *worker_data)
{
RTE_SET_USED(cons_data);
uint8_t i;
const uint8_t atq = cdata.all_type_queues ? 1 : 0;
const uint8_t dev_id = 0;
@@ -575,10 +574,9 @@ setup_eventdev_worker_tx(struct cons_data *cons_data,
}
rte_service_runstate_set(fdata->evdev_service_id, 1);
rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);
if (rte_event_dev_start(dev_id) < 0) {
printf("Error starting eventdev\n");
return -1;
}
if (rte_event_dev_start(dev_id) < 0)
rte_exit(EXIT_FAILURE, "Error starting eventdev");
return dev_id;
}
@@ -602,7 +600,7 @@ service_rx_adapter(void *arg)
}
static void
init_rx_adapter(uint16_t nb_ports)
init_adapters(uint16_t nb_ports)
{
int i;
int ret;
@@ -613,17 +611,18 @@ init_rx_adapter(uint16_t nb_ports)
ret = rte_event_dev_info_get(evdev_id, &dev_info);
adptr_services = rte_zmalloc(NULL, sizeof(struct rx_adptr_services), 0);
struct rte_event_port_conf rx_p_conf = {
.dequeue_depth = 8,
.enqueue_depth = 8,
.new_event_threshold = 1200,
struct rte_event_port_conf adptr_p_conf = {
.dequeue_depth = cdata.worker_cq_depth,
.enqueue_depth = 64,
.new_event_threshold = 4096,
};
if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth;
if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth;
if (adptr_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
adptr_p_conf.dequeue_depth =
dev_info.max_event_port_dequeue_depth;
if (adptr_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
adptr_p_conf.enqueue_depth =
dev_info.max_event_port_enqueue_depth;
struct rte_event_eth_rx_adapter_queue_conf queue_conf;
memset(&queue_conf, 0, sizeof(queue_conf));
@@ -633,11 +632,11 @@ init_rx_adapter(uint16_t nb_ports)
uint32_t cap;
uint32_t service_id;
ret = rte_event_eth_rx_adapter_create(i, evdev_id, &rx_p_conf);
ret = rte_event_eth_rx_adapter_create(i, evdev_id,
&adptr_p_conf);
if (ret)
rte_exit(EXIT_FAILURE,
"failed to create rx adapter[%d]",
cdata.rx_adapter_id);
"failed to create rx adapter[%d]", i);
ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
if (ret)
@@ -654,7 +653,6 @@ init_rx_adapter(uint16_t nb_ports)
rte_exit(EXIT_FAILURE,
"Failed to add queues to Rx adapter");
/* Producer needs to be scheduled. */
if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) {
ret = rte_event_eth_rx_adapter_service_id_get(i,
@@ -680,9 +678,29 @@ init_rx_adapter(uint16_t nb_ports)
ret = rte_event_eth_rx_adapter_start(i);
if (ret)
rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
cdata.rx_adapter_id);
i);
}
/* We already know that Tx adapter has INTERNAL port cap*/
ret = rte_event_eth_tx_adapter_create(cdata.tx_adapter_id, evdev_id,
&adptr_p_conf);
if (ret)
rte_exit(EXIT_FAILURE, "failed to create tx adapter[%d]",
cdata.tx_adapter_id);
for (i = 0; i < nb_ports; i++) {
ret = rte_event_eth_tx_adapter_queue_add(cdata.tx_adapter_id, i,
-1);
if (ret)
rte_exit(EXIT_FAILURE,
"Failed to add queues to Tx adapter");
}
ret = rte_event_eth_tx_adapter_start(cdata.tx_adapter_id);
if (ret)
rte_exit(EXIT_FAILURE, "Tx adapter[%d] start failed",
cdata.tx_adapter_id);
if (adptr_services->nb_rx_adptrs) {
struct rte_service_spec service;
@@ -695,8 +713,7 @@ init_rx_adapter(uint16_t nb_ports)
&fdata->rxadptr_service_id);
if (ret)
rte_exit(EXIT_FAILURE,
"Rx adapter[%d] service register failed",
cdata.rx_adapter_id);
"Rx adapter service register failed");
rte_service_runstate_set(fdata->rxadptr_service_id, 1);
rte_service_component_runstate_set(fdata->rxadptr_service_id,
@@ -708,23 +725,19 @@ init_rx_adapter(uint16_t nb_ports)
rte_free(adptr_services);
}
if (!adptr_services->nb_rx_adptrs && fdata->cap.consumer == NULL &&
(dev_info.event_dev_cap &
if (!adptr_services->nb_rx_adptrs && (dev_info.event_dev_cap &
RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))
fdata->cap.scheduler = NULL;
if (dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)
memset(fdata->sched_core, 0,
sizeof(unsigned int) * MAX_NUM_CORE);
}
static void
worker_tx_opt_check(void)
worker_tx_enq_opt_check(void)
{
int i;
int ret;
uint32_t cap = 0;
uint8_t rx_needed = 0;
uint8_t sched_needed = 0;
struct rte_event_dev_info eventdev_info;
memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
@@ -734,32 +747,38 @@ worker_tx_opt_check(void)
RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES))
rte_exit(EXIT_FAILURE,
"Event dev doesn't support all type queues\n");
sched_needed = !(eventdev_info.event_dev_cap &
RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED);
RTE_ETH_FOREACH_DEV(i) {
ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap);
if (ret)
rte_exit(EXIT_FAILURE,
"failed to get event rx adapter "
"capabilities");
"failed to get event rx adapter capabilities");
rx_needed |=
!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT);
}
if (cdata.worker_lcore_mask == 0 ||
(rx_needed && cdata.rx_lcore_mask == 0) ||
(cdata.sched_lcore_mask == 0 &&
!(eventdev_info.event_dev_cap &
RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))) {
(sched_needed && cdata.sched_lcore_mask == 0)) {
printf("Core part of pipeline was not assigned any cores. "
"This will stall the pipeline, please check core masks "
"(use -h for details on setting core masks):\n"
"\trx: %"PRIu64"\n\ttx: %"PRIu64"\n\tsched: %"PRIu64
"\n\tworkers: %"PRIu64"\n",
cdata.rx_lcore_mask, cdata.tx_lcore_mask,
cdata.sched_lcore_mask,
cdata.worker_lcore_mask);
"\trx: %"PRIu64"\n\tsched: %"PRIu64
"\n\tworkers: %"PRIu64"\n", cdata.rx_lcore_mask,
cdata.sched_lcore_mask, cdata.worker_lcore_mask);
rte_exit(-1, "Fix core masks\n");
}
if (!sched_needed)
memset(fdata->sched_core, 0,
sizeof(unsigned int) * MAX_NUM_CORE);
if (!rx_needed)
memset(fdata->rx_core, 0,
sizeof(unsigned int) * MAX_NUM_CORE);
memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);
}
static worker_loop
@@ -821,18 +840,15 @@ get_worker_multi_stage(bool burst)
}
void
set_worker_tx_setup_data(struct setup_data *caps, bool burst)
set_worker_tx_enq_setup_data(struct setup_data *caps, bool burst)
{
if (cdata.num_stages == 1)
caps->worker = get_worker_single_stage(burst);
else
caps->worker = get_worker_multi_stage(burst);
memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);
caps->check_opt = worker_tx_opt_check;
caps->consumer = NULL;
caps->check_opt = worker_tx_enq_opt_check;
caps->scheduler = schedule_devices;
caps->evdev_setup = setup_eventdev_worker_tx;
caps->adptr_setup = init_rx_adapter;
caps->evdev_setup = setup_eventdev_worker_tx_enq;
caps->adptr_setup = init_adapters;
}
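
Finally, a condensed sketch of the worker-side transmit path introduced in this file, modeled on worker_do_tx_single(): atomic-stage events go straight to the Tx adapter through the worker's own event port, everything else is forwarded to the next stage. It assumes the helpers defined in this file (exchange_mac(), work(), worker_fwd_event(), worker_event_enqueue()) and Tx queue 0, as in worker_tx_pkt(); the wrapper name handle_last_stage() is illustrative, not part of the commit.

/* Sketch: last-stage handling inside a worker_do_tx_* loop. */
static __rte_always_inline void
handle_last_stage(const uint8_t dev, const uint8_t port, struct rte_event *ev)
{
	if (ev->sched_type == RTE_SCHED_TYPE_ATOMIC) {
		/* Hand the packet to the Tx adapter via this worker's port. */
		exchange_mac(ev->mbuf);
		rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0);
		while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1))
			rte_pause();
	} else {
		/* Not at the Tx stage yet: do the stage work and forward. */
		work();
		ev->queue_id++;
		worker_fwd_event(ev, RTE_SCHED_TYPE_ATOMIC);
		worker_event_enqueue(dev, port, ev);
	}
}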