test/distributor: fix return buffer queue overload

The distributor library uses a cyclic queue to store packets
returned from workers. These packets can later be collected
with the rte_distributor_returned_pkts() call.
However, the queue has limited capacity: it can hold only
127 packets (RTE_DISTRIB_RETURNS_MASK).

The big burst test sent 1024 packets in bursts of 32 without waiting
for the distributor to process them. When the test was run with a
large number of worker threads, more than 127 packets could be
returned from workers and placed into the cyclic queue at once.
The excess packets were dropped by the queue, making it impossible
to collect them later with rte_distributor_returned_pkts() calls.
The test, however, waited indefinitely for all packets to be returned.

This patch fixes the big burst test by never allowing more packets
than the queue capacity to be in processing at the same time, making
it impossible for the queue to drop any packets.
It also cleans up duplicated code in the same test.

Bugzilla ID: 612
Fixes: c0de0eb82e ("distributor: switch over to new API")
Cc: stable@dpdk.org

Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
Tested-by: David Marchand <david.marchand@redhat.com>
Reviewed-by: David Hunt <david.hunt@intel.com>
commit 95bb247702
parent b49c677a0d
Author: Lukasz Wojciechowski, 2021-01-19 04:59:10 +01:00
Committed by: David Marchand

@@ -217,6 +217,8 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
 	clear_packet_count();
 	struct rte_mbuf *many_bufs[BIG_BATCH], *return_bufs[BIG_BATCH];
 	unsigned num_returned = 0;
+	unsigned int num_being_processed = 0;
+	unsigned int return_buffer_capacity = 127;/* RTE_DISTRIB_RETURNS_MASK */
 
 	/* flush out any remaining packets */
 	rte_distributor_flush(db);
@@ -233,16 +235,16 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
 	for (i = 0; i < BIG_BATCH/BURST; i++) {
 		rte_distributor_process(db,
 				&many_bufs[i*BURST], BURST);
-		count = rte_distributor_returned_pkts(db,
-				&return_bufs[num_returned],
-				BIG_BATCH - num_returned);
-		num_returned += count;
+		num_being_processed += BURST;
+		do {
+			count = rte_distributor_returned_pkts(db,
+					&return_bufs[num_returned],
+					BIG_BATCH - num_returned);
+			num_being_processed -= count;
+			num_returned += count;
+			rte_distributor_flush(db);
+		} while (num_being_processed + BURST > return_buffer_capacity);
 	}
-	rte_distributor_flush(db);
-	count = rte_distributor_returned_pkts(db,
-			&return_bufs[num_returned],
-			BIG_BATCH - num_returned);
-	num_returned += count;
-
 	retries = 0;
 	do {
 		rte_distributor_flush(db);