app/testeventdev: add order queue worker functions

Signed-off-by: Jerin Jacob <jerin.jacob@caviumnetworks.com>
Acked-by: Harry van Haaren <harry.van.haaren@intel.com>
This commit is contained in:
Jerin Jacob 2017-07-04 10:23:12 +05:30
parent 02ec2955cf
commit 4d04346f1e
2 changed files with 147 additions and 0 deletions

View File

@ -90,6 +90,53 @@ order_nb_event_ports(struct evt_options *opt)
return evt_nr_active_lcores(opt->wlcores) + 1 /* producer */;
}
static inline __attribute__((always_inline)) void
order_process_stage_1(struct test_order *const t,
struct rte_event *const ev, const uint32_t nb_flows,
uint32_t *const expected_flow_seq,
rte_atomic64_t *const outstand_pkts)
{
const uint32_t flow = (uintptr_t)ev->mbuf % nb_flows;
/* compare the seqn against expected value */
if (ev->mbuf->seqn != expected_flow_seq[flow]) {
evt_err("flow=%x seqn mismatch got=%x expected=%x",
flow, ev->mbuf->seqn, expected_flow_seq[flow]);
t->err = true;
rte_smp_wmb();
}
/*
* Events from an atomic flow of an event queue can be scheduled only to
* a single port at a time. The port is guaranteed to have exclusive
* (atomic) access for given atomic flow.So we don't need to update
* expected_flow_seq in critical section.
*/
expected_flow_seq[flow]++;
rte_pktmbuf_free(ev->mbuf);
rte_atomic64_sub(outstand_pkts, 1);
}
static inline __attribute__((always_inline)) void
order_process_stage_invalid(struct test_order *const t,
struct rte_event *const ev)
{
evt_err("invalid queue %d", ev->queue_id);
t->err = true;
rte_smp_wmb();
}
#define ORDER_WORKER_INIT\
struct worker_data *w = arg;\
struct test_order *t = w->t;\
struct evt_options *opt = t->opt;\
const uint8_t dev_id = w->dev_id;\
const uint8_t port = w->port_id;\
const uint32_t nb_flows = t->nb_flows;\
uint32_t *expected_flow_seq = t->expected_flow_seq;\
rte_atomic64_t *outstand_pkts = &t->outstand_pkts;\
if (opt->verbose_level > 1)\
printf("%s(): lcore %d dev_id %d port=%d\n",\
__func__, rte_lcore_id(), dev_id, port)
int order_test_result(struct evt_test *test, struct evt_options *opt);
int order_opt_check(struct evt_options *opt);
int order_test_setup(struct evt_test *test, struct evt_options *opt);

View File

@ -37,6 +37,105 @@
/* See http://dpdk.org/doc/guides/tools/testeventdev.html for test details */
static inline __attribute__((always_inline)) void
order_queue_process_stage_0(struct rte_event *const ev)
{
ev->queue_id = 1; /* q1 atomic queue */
ev->op = RTE_EVENT_OP_FORWARD;
ev->sched_type = RTE_SCHED_TYPE_ATOMIC;
ev->event_type = RTE_EVENT_TYPE_CPU;
}
static int
order_queue_worker(void *arg)
{
ORDER_WORKER_INIT;
struct rte_event ev;
while (t->err == false) {
uint16_t event = rte_event_dequeue_burst(dev_id, port,
&ev, 1, 0);
if (!event) {
if (rte_atomic64_read(outstand_pkts) <= 0)
break;
rte_pause();
continue;
}
if (ev.queue_id == 0) { /* from ordered queue */
order_queue_process_stage_0(&ev);
while (rte_event_enqueue_burst(dev_id, port, &ev, 1)
!= 1)
rte_pause();
} else if (ev.queue_id == 1) { /* from atomic queue */
order_process_stage_1(t, &ev, nb_flows,
expected_flow_seq, outstand_pkts);
} else {
order_process_stage_invalid(t, &ev);
}
}
return 0;
}
static int
order_queue_worker_burst(void *arg)
{
ORDER_WORKER_INIT;
struct rte_event ev[BURST_SIZE];
uint16_t i;
while (t->err == false) {
uint16_t const nb_rx = rte_event_dequeue_burst(dev_id, port, ev,
BURST_SIZE, 0);
if (nb_rx == 0) {
if (rte_atomic64_read(outstand_pkts) <= 0)
break;
rte_pause();
continue;
}
for (i = 0; i < nb_rx; i++) {
if (ev[i].queue_id == 0) { /* from ordered queue */
order_queue_process_stage_0(&ev[i]);
} else if (ev[i].queue_id == 1) {/* from atomic queue */
order_process_stage_1(t, &ev[i], nb_flows,
expected_flow_seq, outstand_pkts);
ev[i].op = RTE_EVENT_OP_RELEASE;
} else {
order_process_stage_invalid(t, &ev[i]);
}
}
uint16_t enq;
enq = rte_event_enqueue_burst(dev_id, port, ev, nb_rx);
while (enq < nb_rx) {
enq += rte_event_enqueue_burst(dev_id, port,
ev + enq, nb_rx - enq);
}
}
return 0;
}
static int
worker_wrapper(void *arg)
{
struct worker_data *w = arg;
const bool burst = evt_has_burst_mode(w->dev_id);
if (burst)
return order_queue_worker_burst(arg);
else
return order_queue_worker(arg);
}
static int
order_queue_launch_lcores(struct evt_test *test, struct evt_options *opt)
{
return order_launch_lcores(test, opt, worker_wrapper);
}
#define NB_QUEUES 2
static int
order_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt)
@ -133,6 +232,7 @@ static const struct evt_test_ops order_queue = {
.test_setup = order_test_setup,
.mempool_setup = order_mempool_setup,
.eventdev_setup = order_queue_eventdev_setup,
.launch_lcores = order_queue_launch_lcores,
.eventdev_destroy = order_eventdev_destroy,
.mempool_destroy = order_mempool_destroy,
.test_result = order_test_result,