test/distributor: add test with packets marking

All of the former tests analyzed only statistics
of packets processed by all workers.
The new test verifies also if packets are processed
on workers as expected.
Every packets processed by the worker is marked
and analyzed after it is returned to distributor.

This test allows finding issues in matching algorithms.

Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
Acked-by: David Hunt <david.hunt@intel.com>
This commit is contained in:
Lukasz Wojciechowski 2020-10-17 05:06:57 +02:00 committed by David Marchand
parent 626ceefbf4
commit 3c57ec0098

View File

@ -543,6 +543,141 @@ test_flush_with_worker_shutdown(struct worker_params *wp,
return 0;
}
static int
handle_and_mark_work(void *arg)
{
struct rte_mbuf *buf[8] __rte_cache_aligned;
struct worker_params *wp = arg;
struct rte_distributor *db = wp->dist;
unsigned int num, i;
unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);
num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
while (!quit) {
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
__ATOMIC_RELAXED);
for (i = 0; i < num; i++)
buf[i]->udata64 += id + 1;
num = rte_distributor_get_pkt(db, id,
buf, buf, num);
}
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
__ATOMIC_RELAXED);
rte_distributor_return_pkt(db, id, buf, num);
return 0;
}
/* sanity_mark_test sends packets to workers which mark them.
* Every packet has also encoded sequence number.
* The returned packets are sorted and verified if they were handled
* by proper workers.
*/
static int
sanity_mark_test(struct worker_params *wp, struct rte_mempool *p)
{
const unsigned int buf_count = 24;
const unsigned int burst = 8;
const unsigned int shift = 12;
const unsigned int seq_shift = 10;
struct rte_distributor *db = wp->dist;
struct rte_mbuf *bufs[buf_count];
struct rte_mbuf *returns[buf_count];
unsigned int i, count, id;
unsigned int sorted[buf_count], seq;
unsigned int failed = 0;
printf("=== Marked packets test ===\n");
clear_packet_count();
if (rte_mempool_get_bulk(p, (void *)bufs, buf_count) != 0) {
printf("line %d: Error getting mbufs from pool\n", __LINE__);
return -1;
}
/* bufs' hashes will be like these below, but shifted left.
* The shifting is for avoiding collisions with backlogs
* and in-flight tags left by previous tests.
* [1, 1, 1, 1, 1, 1, 1, 1
* 1, 1, 1, 1, 2, 2, 2, 2
* 2, 2, 2, 2, 1, 1, 1, 1]
*/
for (i = 0; i < burst; i++) {
bufs[0 * burst + i]->hash.usr = 1 << shift;
bufs[1 * burst + i]->hash.usr = ((i < burst / 2) ? 1 : 2)
<< shift;
bufs[2 * burst + i]->hash.usr = ((i < burst / 2) ? 2 : 1)
<< shift;
}
/* Assign a sequence number to each packet. The sequence is shifted,
* so that lower bits of the udate64 will hold mark from worker.
*/
for (i = 0; i < buf_count; i++)
bufs[i]->udata64 = i << seq_shift;
count = 0;
for (i = 0; i < buf_count/burst; i++) {
rte_distributor_process(db, &bufs[i * burst], burst);
count += rte_distributor_returned_pkts(db, &returns[count],
buf_count - count);
}
do {
rte_distributor_flush(db);
count += rte_distributor_returned_pkts(db, &returns[count],
buf_count - count);
} while (count < buf_count);
for (i = 0; i < rte_lcore_count() - 1; i++)
printf("Worker %u handled %u packets\n", i,
__atomic_load_n(&worker_stats[i].handled_packets,
__ATOMIC_RELAXED));
/* Sort returned packets by sent order (sequence numbers). */
for (i = 0; i < buf_count; i++) {
seq = returns[i]->udata64 >> seq_shift;
id = returns[i]->udata64 - (seq << seq_shift);
sorted[seq] = id;
}
/* Verify that packets [0-11] and [20-23] were processed
* by the same worker
*/
for (i = 1; i < 12; i++) {
if (sorted[i] != sorted[0]) {
printf("Packet number %u processed by worker %u,"
" but should be processes by worker %u\n",
i, sorted[i], sorted[0]);
failed = 1;
}
}
for (i = 20; i < 24; i++) {
if (sorted[i] != sorted[0]) {
printf("Packet number %u processed by worker %u,"
" but should be processes by worker %u\n",
i, sorted[i], sorted[0]);
failed = 1;
}
}
/* And verify that packets [12-19] were processed
* by the another worker
*/
for (i = 13; i < 20; i++) {
if (sorted[i] != sorted[12]) {
printf("Packet number %u processed by worker %u,"
" but should be processes by worker %u\n",
i, sorted[i], sorted[12]);
failed = 1;
}
}
rte_mempool_put_bulk(p, (void *)bufs, buf_count);
if (failed)
return -1;
printf("Marked packets test passed\n");
return 0;
}
static
int test_error_distributor_create_name(void)
{
@ -727,6 +862,12 @@ test_distributor(void)
goto err;
quit_workers(&worker_params, p);
rte_eal_mp_remote_launch(handle_and_mark_work,
&worker_params, SKIP_MASTER);
if (sanity_mark_test(&worker_params, p) < 0)
goto err;
quit_workers(&worker_params, p);
} else {
printf("Too few cores to run worker shutdown test\n");
}