examples/distributor: update dynamic configuration
In this patch, * It is possible to switch the running mode of the distributor using the command line argument. * With "-c" parameter, you can run RX and Distributor on the same core. * Without "-c" parameter, you can run RX and Distributor on the different core. * Consecutive termination of the lcores fixed. The termination order was wrong, and you couldn't terminate the application while traffic was capturing. The current order is RX -> Distributor -> TX -> Workers * When "-c" parameter is active, the wasted distributor core is also deactivated in the main function. Signed-off-by: Abdullah Ömer Yamaç <omer.yamac@ceng.metu.edu.tr>
This commit is contained in:
parent
450356b2ed
commit
3429d6dd5c
@ -42,11 +42,12 @@ Running the Application
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
./<build-dir>/examples/dpdk-distributor [EAL options] -- -p PORTMASK
|
||||
./<build-dir>/examples/dpdk-distributor [EAL options] -- -p PORTMASK [-c]
|
||||
|
||||
where,
|
||||
|
||||
* -p PORTMASK: Hexadecimal bitmask of ports to configure
|
||||
* -c: Combines the RX core with distribution core
|
||||
|
||||
#. To run the application in linux environment with 10 lcores, 4 ports,
|
||||
issue the command:
|
||||
|
@ -40,6 +40,8 @@ volatile uint8_t quit_signal_rx;
|
||||
volatile uint8_t quit_signal_dist;
|
||||
volatile uint8_t quit_signal_work;
|
||||
unsigned int power_lib_initialised;
|
||||
bool enable_lcore_rx_distributor;
|
||||
unsigned int num_workers;
|
||||
|
||||
static volatile struct app_stats {
|
||||
struct {
|
||||
@ -257,13 +259,81 @@ lcore_rx(struct lcore_params *p)
|
||||
}
|
||||
app_stats.rx.rx_pkts += nb_rx;
|
||||
|
||||
/*
|
||||
* You can run the distributor on the rx core with this code. Returned
|
||||
/*
|
||||
* Swap the following two lines if you want the rx traffic
|
||||
* to go directly to tx, no distribution.
|
||||
*/
|
||||
struct rte_ring *out_ring = p->rx_dist_ring;
|
||||
/* struct rte_ring *out_ring = p->dist_tx_ring; */
|
||||
|
||||
uint16_t sent = rte_ring_enqueue_burst(out_ring,
|
||||
(void *)bufs, nb_rx, NULL);
|
||||
|
||||
app_stats.rx.enqueued_pkts += sent;
|
||||
if (unlikely(sent < nb_rx)) {
|
||||
app_stats.rx.enqdrop_pkts += nb_rx - sent;
|
||||
RTE_LOG_DP(DEBUG, DISTRAPP,
|
||||
"%s:Packet loss due to full ring\n", __func__);
|
||||
while (sent < nb_rx)
|
||||
rte_pktmbuf_free(bufs[sent++]);
|
||||
}
|
||||
if (++port == nb_ports)
|
||||
port = 0;
|
||||
}
|
||||
if (power_lib_initialised)
|
||||
rte_power_exit(rte_lcore_id());
|
||||
printf("\nCore %u exiting rx task.\n", rte_lcore_id());
|
||||
/* set distributor threads quit flag */
|
||||
quit_signal_dist = 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
lcore_rx_and_distributor(struct lcore_params *p)
|
||||
{
|
||||
struct rte_distributor *d = p->d;
|
||||
const uint16_t nb_ports = rte_eth_dev_count_avail();
|
||||
const int socket_id = rte_socket_id();
|
||||
uint16_t port;
|
||||
struct rte_mbuf *bufs[BURST_SIZE*2];
|
||||
|
||||
RTE_ETH_FOREACH_DEV(port) {
|
||||
/* skip ports that are not enabled */
|
||||
if ((enabled_port_mask & (1 << port)) == 0)
|
||||
continue;
|
||||
|
||||
if (rte_eth_dev_socket_id(port) > 0 &&
|
||||
rte_eth_dev_socket_id(port) != socket_id)
|
||||
printf("WARNING, port %u is on remote NUMA node to "
|
||||
"RX thread.\n\tPerformance will not "
|
||||
"be optimal.\n", port);
|
||||
}
|
||||
|
||||
printf("\nCore %u doing packet RX and Distributor.\n", rte_lcore_id());
|
||||
port = 0;
|
||||
while (!quit_signal_rx) {
|
||||
|
||||
/* skip ports that are not enabled */
|
||||
if ((enabled_port_mask & (1 << port)) == 0) {
|
||||
if (++port == nb_ports)
|
||||
port = 0;
|
||||
continue;
|
||||
}
|
||||
const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
|
||||
BURST_SIZE);
|
||||
if (unlikely(nb_rx == 0)) {
|
||||
if (++port == nb_ports)
|
||||
port = 0;
|
||||
continue;
|
||||
}
|
||||
app_stats.rx.rx_pkts += nb_rx;
|
||||
|
||||
/*
|
||||
* Run the distributor on the rx core. Returned
|
||||
* packets are then send straight to the tx core.
|
||||
*/
|
||||
#if 0
|
||||
rte_distributor_process(p->d, bufs, nb_rx);
|
||||
const uint16_t nb_ret = rte_distributor_returned_pkts(p->d,
|
||||
rte_distributor_process(d, bufs, nb_rx);
|
||||
const uint16_t nb_ret = rte_distributor_returned_pkts(d,
|
||||
bufs, BURST_SIZE*2);
|
||||
|
||||
app_stats.rx.returned_pkts += nb_ret;
|
||||
@ -276,18 +346,6 @@ lcore_rx(struct lcore_params *p)
|
||||
struct rte_ring *tx_ring = p->dist_tx_ring;
|
||||
uint16_t sent = rte_ring_enqueue_burst(tx_ring,
|
||||
(void *)bufs, nb_ret, NULL);
|
||||
#else
|
||||
uint16_t nb_ret = nb_rx;
|
||||
/*
|
||||
* Swap the following two lines if you want the rx traffic
|
||||
* to go directly to tx, no distribution.
|
||||
*/
|
||||
struct rte_ring *out_ring = p->rx_dist_ring;
|
||||
/* struct rte_ring *out_ring = p->dist_tx_ring; */
|
||||
|
||||
uint16_t sent = rte_ring_enqueue_burst(out_ring,
|
||||
(void *)bufs, nb_ret, NULL);
|
||||
#endif
|
||||
|
||||
app_stats.rx.enqueued_pkts += sent;
|
||||
if (unlikely(sent < nb_ret)) {
|
||||
@ -302,9 +360,14 @@ lcore_rx(struct lcore_params *p)
|
||||
}
|
||||
if (power_lib_initialised)
|
||||
rte_power_exit(rte_lcore_id());
|
||||
/* set worker & tx threads quit flag */
|
||||
printf("\nCore %u exiting rx task.\n", rte_lcore_id());
|
||||
/* set tx threads quit flag */
|
||||
quit_signal = 1;
|
||||
/* set worker threads quit flag */
|
||||
quit_signal_work = 1;
|
||||
rte_distributor_flush(d);
|
||||
/* Unblock any returns so workers can exit */
|
||||
rte_distributor_clear_returns(d);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -382,14 +445,16 @@ lcore_distributor(struct lcore_params *p)
|
||||
}
|
||||
}
|
||||
}
|
||||
printf("\nCore %u exiting distributor task.\n", rte_lcore_id());
|
||||
quit_signal_work = 1;
|
||||
if (power_lib_initialised)
|
||||
rte_power_exit(rte_lcore_id());
|
||||
printf("\nCore %u exiting distributor task.\n", rte_lcore_id());
|
||||
/* set tx threads quit flag */
|
||||
quit_signal = 1;
|
||||
/* set worker threads quit flag */
|
||||
quit_signal_work = 1;
|
||||
rte_distributor_flush(d);
|
||||
/* Unblock any returns so workers can exit */
|
||||
rte_distributor_clear_returns(d);
|
||||
quit_signal_rx = 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -468,7 +533,7 @@ int_handler(int sig_num)
|
||||
{
|
||||
printf("Exiting on signal %d\n", sig_num);
|
||||
/* set quit flag for rx thread to exit */
|
||||
quit_signal_dist = 1;
|
||||
quit_signal_rx = 1;
|
||||
}
|
||||
|
||||
static void
|
||||
@ -476,7 +541,6 @@ print_stats(void)
|
||||
{
|
||||
struct rte_eth_stats eth_stats;
|
||||
unsigned int i, j;
|
||||
const unsigned int num_workers = rte_lcore_count() - 4;
|
||||
|
||||
RTE_ETH_FOREACH_DEV(i) {
|
||||
rte_eth_stats_get(i, ð_stats);
|
||||
@ -505,6 +569,7 @@ print_stats(void)
|
||||
prev_app_stats.rx.enqdrop_pkts)/1000000.0,
|
||||
ANSI_COLOR_RESET);
|
||||
|
||||
if (!enable_lcore_rx_distributor) {
|
||||
printf("Distributor thread:\n");
|
||||
printf(" - In: %5.2f\n",
|
||||
(app_stats.dist.in_pkts -
|
||||
@ -519,6 +584,7 @@ print_stats(void)
|
||||
(app_stats.dist.enqdrop_pkts -
|
||||
prev_app_stats.dist.enqdrop_pkts)/1000000.0,
|
||||
ANSI_COLOR_RESET);
|
||||
}
|
||||
|
||||
printf("TX thread:\n");
|
||||
printf(" - Dequeued: %5.2f\n",
|
||||
@ -630,8 +696,9 @@ init_power_library(void)
|
||||
static void
|
||||
print_usage(const char *prgname)
|
||||
{
|
||||
printf("%s [EAL options] -- -p PORTMASK\n"
|
||||
" -p PORTMASK: hexadecimal bitmask of ports to configure\n",
|
||||
printf("%s [EAL options] -- -p PORTMASK [-c]\n"
|
||||
" -p PORTMASK: hexadecimal bitmask of ports to configure\n"
|
||||
" -c: Combines the RX core with the distribution core\n",
|
||||
prgname);
|
||||
}
|
||||
|
||||
@ -662,8 +729,8 @@ parse_args(int argc, char **argv)
|
||||
};
|
||||
|
||||
argvopt = argv;
|
||||
|
||||
while ((opt = getopt_long(argc, argvopt, "p:",
|
||||
enable_lcore_rx_distributor = false;
|
||||
while ((opt = getopt_long(argc, argvopt, "cp:",
|
||||
lgopts, &option_index)) != EOF) {
|
||||
|
||||
switch (opt) {
|
||||
@ -677,6 +744,10 @@ parse_args(int argc, char **argv)
|
||||
}
|
||||
break;
|
||||
|
||||
case 'c':
|
||||
enable_lcore_rx_distributor = true;
|
||||
break;
|
||||
|
||||
default:
|
||||
print_usage(prgname);
|
||||
return -1;
|
||||
@ -706,6 +777,7 @@ main(int argc, char *argv[])
|
||||
unsigned int lcore_id, worker_id = 0;
|
||||
int distr_core_id = -1, rx_core_id = -1, tx_core_id = -1;
|
||||
unsigned nb_ports;
|
||||
unsigned int min_cores;
|
||||
uint16_t portid;
|
||||
uint16_t nb_ports_available;
|
||||
uint64_t t, freq;
|
||||
@ -725,12 +797,21 @@ main(int argc, char *argv[])
|
||||
if (ret < 0)
|
||||
rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n");
|
||||
|
||||
if (rte_lcore_count() < 5)
|
||||
if (enable_lcore_rx_distributor) {
|
||||
/* RX and distributor combined, 3 fixed function cores (stat, TX, at least 1 worker) */
|
||||
min_cores = 4;
|
||||
num_workers = rte_lcore_count() - 3;
|
||||
} else {
|
||||
/* separate RX and distributor, 3 fixed function cores (stat, TX, at least 1 worker) */
|
||||
min_cores = 5;
|
||||
num_workers = rte_lcore_count() - 4;
|
||||
}
|
||||
|
||||
if (rte_lcore_count() < min_cores)
|
||||
rte_exit(EXIT_FAILURE, "Error, This application needs at "
|
||||
"least 5 logical cores to run:\n"
|
||||
"least 4 logical cores to run:\n"
|
||||
"1 lcore for stats (can be core 0)\n"
|
||||
"1 lcore for packet RX\n"
|
||||
"1 lcore for distribution\n"
|
||||
"1 or 2 lcore for packet RX and distribution\n"
|
||||
"1 lcore for packet TX\n"
|
||||
"and at least 1 lcore for worker threads\n");
|
||||
|
||||
@ -773,7 +854,7 @@ main(int argc, char *argv[])
|
||||
}
|
||||
|
||||
d = rte_distributor_create("PKT_DIST", rte_socket_id(),
|
||||
rte_lcore_count() - 4,
|
||||
num_workers,
|
||||
RTE_DIST_ALG_BURST);
|
||||
if (d == NULL)
|
||||
rte_exit(EXIT_FAILURE, "Cannot create distributor\n");
|
||||
@ -809,7 +890,7 @@ main(int argc, char *argv[])
|
||||
if (lcore_cap.priority != 1)
|
||||
continue;
|
||||
|
||||
if (distr_core_id < 0) {
|
||||
if (distr_core_id < 0 && !enable_lcore_rx_distributor) {
|
||||
distr_core_id = lcore_id;
|
||||
printf("Distributor on priority core %d\n",
|
||||
lcore_id);
|
||||
@ -840,7 +921,7 @@ main(int argc, char *argv[])
|
||||
lcore_id == (unsigned int)rx_core_id ||
|
||||
lcore_id == (unsigned int)tx_core_id)
|
||||
continue;
|
||||
if (distr_core_id < 0) {
|
||||
if (distr_core_id < 0 && !enable_lcore_rx_distributor) {
|
||||
distr_core_id = lcore_id;
|
||||
printf("Distributor on core %d\n", lcore_id);
|
||||
continue;
|
||||
@ -857,6 +938,11 @@ main(int argc, char *argv[])
|
||||
}
|
||||
}
|
||||
|
||||
if (enable_lcore_rx_distributor)
|
||||
printf(" tx id %d, rx id %d\n",
|
||||
tx_core_id,
|
||||
rx_core_id);
|
||||
else
|
||||
printf(" tx id %d, dist id %d, rx id %d\n",
|
||||
tx_core_id,
|
||||
distr_core_id,
|
||||
@ -889,15 +975,16 @@ main(int argc, char *argv[])
|
||||
dist_tx_ring, tx_core_id);
|
||||
|
||||
/* Start distributor core */
|
||||
struct lcore_params *pd =
|
||||
rte_malloc(NULL, sizeof(*pd), 0);
|
||||
struct lcore_params *pd = NULL;
|
||||
if (!enable_lcore_rx_distributor) {
|
||||
pd = rte_malloc(NULL, sizeof(*pd), 0);
|
||||
if (!pd)
|
||||
rte_panic("malloc failure\n");
|
||||
*pd = (struct lcore_params){worker_id++, d,
|
||||
rx_dist_ring, dist_tx_ring, mbuf_pool};
|
||||
rte_eal_remote_launch(
|
||||
(lcore_function_t *)lcore_distributor,
|
||||
rte_eal_remote_launch((lcore_function_t *)lcore_distributor,
|
||||
pd, distr_core_id);
|
||||
}
|
||||
|
||||
/* Start rx core */
|
||||
struct lcore_params *pr =
|
||||
@ -906,12 +993,16 @@ main(int argc, char *argv[])
|
||||
rte_panic("malloc failure\n");
|
||||
*pr = (struct lcore_params){worker_id++, d, rx_dist_ring,
|
||||
dist_tx_ring, mbuf_pool};
|
||||
if (enable_lcore_rx_distributor)
|
||||
rte_eal_remote_launch((lcore_function_t *)lcore_rx_and_distributor,
|
||||
pr, rx_core_id);
|
||||
else
|
||||
rte_eal_remote_launch((lcore_function_t *)lcore_rx,
|
||||
pr, rx_core_id);
|
||||
|
||||
freq = rte_get_timer_hz();
|
||||
t = rte_rdtsc() + freq;
|
||||
while (!quit_signal_dist) {
|
||||
while (!quit_signal) {
|
||||
if (t < rte_rdtsc()) {
|
||||
print_stats();
|
||||
t = rte_rdtsc() + freq;
|
||||
@ -926,6 +1017,7 @@ main(int argc, char *argv[])
|
||||
|
||||
print_stats();
|
||||
|
||||
if (pd)
|
||||
rte_free(pd);
|
||||
rte_free(pr);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user