eventdev: add implicit release disable capability
This commit introduces a capability for disabling a port's "implicit release" functionality, which otherwise causes the eventdev PMD to issue outstanding releases for previously dequeued events when a new batch of events is dequeued. If a PMD does not support this capability, the application receives an error if it attempts to set up a port with implicit releases disabled. Otherwise, if the port is configured with implicit releases disabled, the application must release each dequeued event by invoking rte_event_enqueue_burst() with RTE_EVENT_OP_RELEASE or RTE_EVENT_OP_FORWARD.

Signed-off-by: Gage Eads <gage.eads@intel.com>
Acked-by: Harry van Haaren <harry.van.haaren@intel.com>
parent dca926ca9f
commit ec36d881f5
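To make the intended usage concrete, the following is a minimal sketch of the application-side flow under this capability (illustrative only, not part of the patch; BURST_SIZE, done, and process() are hypothetical placeholders):

	/* Sketch: configure a port with implicit release disabled (only if the
	 * device advertises the capability) and explicitly hand back every
	 * dequeued event.
	 */
	struct rte_event_dev_info info;
	struct rte_event_port_conf pconf;

	rte_event_dev_info_get(dev_id, &info);
	rte_event_port_default_conf_get(dev_id, port_id, &pconf);

	if (info.event_dev_cap & RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE)
		pconf.disable_implicit_release = 1;

	if (rte_event_port_setup(dev_id, port_id, &pconf) < 0)
		rte_exit(EXIT_FAILURE, "port setup failed\n");

	while (!done) {
		struct rte_event ev[BURST_SIZE];
		uint16_t i, sent = 0;
		uint16_t n = rte_event_dequeue_burst(dev_id, port_id, ev,
						     BURST_SIZE, 0);

		for (i = 0; i < n; i++) {
			process(&ev[i]);	/* application-specific work */
			/* Forward to the next queue or release the scheduler
			 * context; with implicit release disabled, nothing is
			 * released automatically on the next dequeue.
			 */
			ev[i].op = RTE_EVENT_OP_RELEASE;
		}

		while (sent < n)
			sent += rte_event_enqueue_burst(dev_id, port_id,
							ev + sent, n - sent);
	}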
@@ -414,6 +414,7 @@ dpaa2_eventdev_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
 			DPAA2_EVENT_MAX_PORT_DEQUEUE_DEPTH;
 	port_conf->enqueue_depth =
 			DPAA2_EVENT_MAX_PORT_ENQUEUE_DEPTH;
+	port_conf->disable_implicit_release = 0;
 }
 
 static void
@@ -235,6 +235,7 @@ ssovf_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
 	port_conf->new_event_threshold = edev->max_num_events;
 	port_conf->dequeue_depth = 1;
 	port_conf->enqueue_depth = 1;
+	port_conf->disable_implicit_release = 0;
 }
 
 static void
@@ -209,6 +209,7 @@ skeleton_eventdev_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
 	port_conf->new_event_threshold = 32 * 1024;
 	port_conf->dequeue_depth = 16;
 	port_conf->enqueue_depth = 16;
+	port_conf->disable_implicit_release = 0;
 }
 
 static void
@@ -163,6 +163,7 @@ sw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
 	}
 
 	p->inflight_max = conf->new_event_threshold;
+	p->implicit_release = !conf->disable_implicit_release;
 
 	/* check if ring exists, same as rx_worker above */
 	snprintf(buf, sizeof(buf), "sw%d_p%u, %s", dev->data->dev_id,
@@ -385,6 +386,7 @@ sw_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
 	port_conf->new_event_threshold = 1024;
 	port_conf->dequeue_depth = 16;
 	port_conf->enqueue_depth = 16;
+	port_conf->disable_implicit_release = 0;
 }
 
 static int
@@ -454,9 +456,11 @@ sw_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info)
 			.max_event_port_dequeue_depth = MAX_SW_CONS_Q_DEPTH,
 			.max_event_port_enqueue_depth = MAX_SW_PROD_Q_DEPTH,
 			.max_num_events = SW_INFLIGHT_EVENTS_TOTAL,
-			.event_dev_cap = (RTE_EVENT_DEV_CAP_QUEUE_QOS |
-					RTE_EVENT_DEV_CAP_BURST_MODE |
-					RTE_EVENT_DEV_CAP_EVENT_QOS),
+			.event_dev_cap = (
+				RTE_EVENT_DEV_CAP_QUEUE_QOS |
+				RTE_EVENT_DEV_CAP_BURST_MODE |
+				RTE_EVENT_DEV_CAP_EVENT_QOS |
+				RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE),
 	};
 
 	*info = evdev_sw_info;
@@ -185,6 +185,7 @@ struct sw_port {
 	uint16_t outstanding_releases __rte_cache_aligned;
 	uint16_t inflight_max; /* app requested max inflights for this port */
 	uint16_t inflight_credits; /* num credits this port has right now */
+	uint8_t implicit_release; /* release events before dequeueing */
 
 	uint16_t last_dequeue_burst_sz; /* how big the burst was */
 	uint64_t last_dequeue_ticks; /* used to track burst processing time */
@@ -81,7 +81,7 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 		return 0;
 	}
 
-	uint32_t forwards = 0;
+	uint32_t completions = 0;
 	for (i = 0; i < num; i++) {
 		int op = ev[i].op;
 		int outstanding = p->outstanding_releases > 0;
@@ -90,7 +90,6 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 		p->inflight_credits -= (op == RTE_EVENT_OP_NEW);
 		p->inflight_credits += (op == RTE_EVENT_OP_RELEASE) *
 					outstanding;
-		forwards += (op == RTE_EVENT_OP_FORWARD);
 
 		new_ops[i] = sw_qe_flag_map[op];
 		new_ops[i] &= ~(invalid_qid << QE_FLAG_VALID_SHIFT);
@@ -99,8 +98,10 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 		 * correct usage of the API), providing very high correct
 		 * prediction rate.
 		 */
-		if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding)
+		if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding) {
 			p->outstanding_releases--;
+			completions++;
+		}
 
 		/* error case: branch to avoid touching p->stats */
 		if (unlikely(invalid_qid)) {
@@ -109,8 +110,8 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 		}
 	}
 
-	/* handle directed port forward credits */
-	p->inflight_credits -= forwards * p->is_directed;
+	/* handle directed port forward and release credits */
+	p->inflight_credits -= completions * p->is_directed;
 
 	/* returns number of events actually enqueued */
 	uint32_t enq = enqueue_burst_with_ops(p->rx_worker_ring, ev, i,
@@ -144,7 +145,7 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
 	uint32_t credit_update_quanta = sw->credit_update_quanta;
 
 	/* check that all previous dequeues have been released */
-	if (!p->is_directed) {
+	if (p->implicit_release && !p->is_directed) {
 		uint16_t out_rels = p->outstanding_releases;
 		uint16_t i;
 		for (i = 0; i < out_rels; i++)
@@ -154,7 +155,6 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
 	/* returns number of events actually dequeued */
 	uint16_t ndeq = rte_event_ring_dequeue_burst(ring, ev, num, NULL);
 	if (unlikely(ndeq == 0)) {
-		p->outstanding_releases = 0;
 		p->zero_polls++;
 		p->total_polls++;
 		goto end;
@@ -162,7 +162,7 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
 
 	/* only add credits for directed ports - LB ports send RELEASEs */
 	p->inflight_credits += ndeq * p->is_directed;
-	p->outstanding_releases = ndeq;
+	p->outstanding_releases += ndeq;
 	p->last_dequeue_burst_sz = ndeq;
 	p->last_dequeue_ticks = rte_get_timer_cycles();
 	p->poll_buckets[(ndeq - 1) >> SW_DEQ_STAT_BUCKET_SHIFT]++;
@@ -34,6 +34,7 @@ struct prod_data {
 struct cons_data {
 	uint8_t dev_id;
 	uint8_t port_id;
+	uint8_t release;
 } __rte_cache_aligned;
 
 static struct prod_data prod_data;
@@ -139,6 +140,18 @@ consumer(void)
 		uint8_t outport = packets[i].mbuf->port;
 		rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
 				packets[i].mbuf);
+
+		packets[i].op = RTE_EVENT_OP_RELEASE;
 	}
 
+	if (cons_data.release) {
+		uint16_t nb_tx;
+
+		nb_tx = rte_event_enqueue_burst(dev_id, port_id, packets, n);
+		while (nb_tx < n)
+			nb_tx += rte_event_enqueue_burst(dev_id, port_id,
+							 packets + nb_tx,
+							 n - nb_tx);
+	}
+
 	/* Print out mpps every 1<22 packets */
@@ -685,6 +698,7 @@ setup_eventdev(struct prod_data *prod_data,
 	};
 
 	struct port_link worker_queues[MAX_NUM_STAGES];
+	uint8_t disable_implicit_release;
 	struct port_link tx_queue;
 	unsigned int i;
 
@@ -698,6 +712,12 @@ setup_eventdev(struct prod_data *prod_data,
 	ret = rte_event_dev_info_get(dev_id, &dev_info);
 	printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);
 
+	disable_implicit_release = (dev_info.event_dev_cap &
+			RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE);
+
+	wkr_p_conf.disable_implicit_release = disable_implicit_release;
+	tx_p_conf.disable_implicit_release = disable_implicit_release;
+
 	if (dev_info.max_event_port_dequeue_depth <
 			config.nb_event_port_dequeue_depth)
 		config.nb_event_port_dequeue_depth =
@@ -806,6 +826,7 @@ setup_eventdev(struct prod_data *prod_data,
 		.dequeue_depth = 8,
 		.enqueue_depth = 8,
 		.new_event_threshold = 1200,
+		.disable_implicit_release = disable_implicit_release,
 	};
 
 	if (rx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
@@ -822,7 +843,8 @@ setup_eventdev(struct prod_data *prod_data,
 			.port_id = i + 1,
 			.qid = cdata.qid[0] };
 	*cons_data = (struct cons_data){.dev_id = dev_id,
-			.port_id = i };
+			.port_id = i,
+			.release = disable_implicit_release };
 
 	ret = rte_event_dev_service_id_get(dev_id,
 				&fdata->evdev_service_id);
@@ -658,6 +658,15 @@ rte_event_port_setup(uint8_t dev_id, uint8_t port_id,
 		return -EINVAL;
 	}
 
+	if (port_conf && port_conf->disable_implicit_release &&
+	    !(dev->data->event_dev_cap &
+	      RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE)) {
+		RTE_EDEV_LOG_ERR(
+		   "dev%d port%d Implicit release disable not supported",
+		   dev_id, port_id);
+		return -EINVAL;
+	}
+
 	if (dev->data->dev_started) {
 		RTE_EDEV_LOG_ERR(
 		    "device %d must be stopped to allow port setup", dev_id);
@@ -283,6 +283,16 @@ struct rte_mbuf; /* we just use mbuf pointers; no need to include rte_mbuf.h */
  *
  * @see rte_event_dequeue_burst() rte_event_enqueue_burst()
  */
+#define RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE (1ULL << 5)
+/**< Event device ports support disabling the implicit release feature, in
+ * which the port will release all unreleased events in its dequeue operation.
+ * If this capability is set and the port is configured with implicit release
+ * disabled, the application is responsible for explicitly releasing events
+ * using either the RTE_EVENT_OP_FORWARD or the RTE_EVENT_OP_RELEASE event
+ * enqueue operations.
+ *
+ * @see rte_event_dequeue_burst() rte_event_enqueue_burst()
+ */
 
 /* Event device priority levels */
 #define RTE_EVENT_DEV_PRIORITY_HIGHEST 0
@@ -687,6 +697,13 @@ struct rte_event_port_conf {
 	 * which previously supplied to rte_event_dev_configure().
	 * Ignored when device is not RTE_EVENT_DEV_CAP_BURST_MODE capable.
	 */
+	uint8_t disable_implicit_release;
+	/**< Configure the port not to release outstanding events in
+	 * rte_event_dev_dequeue_burst(). If true, all events received through
+	 * the port must be explicitly released with RTE_EVENT_OP_RELEASE or
+	 * RTE_EVENT_OP_FORWARD. Must be false when the device is not
+	 * RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE capable.
+	 */
 };
 
 /**
@@ -1382,9 +1399,9 @@ rte_event_dequeue_timeout_ticks(uint8_t dev_id, uint64_t ns,
  *
  * The number of events dequeued is the number of scheduler contexts held by
  * this port. These contexts are automatically released in the next
- * rte_event_dequeue_burst() invocation, or invoking rte_event_enqueue_burst()
- * with RTE_EVENT_OP_RELEASE operation can be used to release the
- * contexts early.
+ * rte_event_dequeue_burst() invocation if the port supports implicit
+ * releases, or invoking rte_event_enqueue_burst() with RTE_EVENT_OP_RELEASE
+ * operation can be used to release the contexts early.
 *
 * Event operations RTE_EVENT_OP_FORWARD and RTE_EVENT_OP_RELEASE must only be
 * enqueued to the same port that their associated events were dequeued from.
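As a usage note for the two header additions above, a small helper along these lines (hypothetical, error handling trimmed) keeps the field consistent with the capability bit, since disable_implicit_release must remain zero on devices that do not advertise RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE:

	/* Sketch: only request explicit-release behaviour when the device
	 * supports it; otherwise fall back to implicit releases.
	 */
	static int
	setup_worker_port(uint8_t dev_id, uint8_t port_id, int want_explicit)
	{
		struct rte_event_dev_info info;
		struct rte_event_port_conf conf;

		rte_event_dev_info_get(dev_id, &info);
		rte_event_port_default_conf_get(dev_id, port_id, &conf);

		conf.disable_implicit_release = (want_explicit &&
			(info.event_dev_cap &
			 RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE)) ? 1 : 0;

		return rte_event_port_setup(dev_id, port_id, &conf);
	}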
@@ -553,6 +553,15 @@ test_eventdev_port_setup(void)
 	ret = rte_event_port_setup(TEST_DEV_ID, 0, &pconf);
 	TEST_ASSERT(ret == -EINVAL, "Expected -EINVAL, %d", ret);
 
+	if (!(info.event_dev_cap &
+	      RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE)) {
+		pconf.enqueue_depth = info.max_event_port_enqueue_depth;
+		pconf.disable_implicit_release = 1;
+		ret = rte_event_port_setup(TEST_DEV_ID, 0, &pconf);
+		TEST_ASSERT(ret == -EINVAL, "Expected -EINVAL, %d", ret);
+		pconf.disable_implicit_release = 0;
+	}
+
 	ret = rte_event_port_setup(TEST_DEV_ID, info.max_event_ports,
 					&pconf);
 	TEST_ASSERT(ret == -EINVAL, "Expected -EINVAL, %d", ret);
@@ -171,6 +171,7 @@ create_ports(struct test *t, int num_ports)
 			.new_event_threshold = 1024,
 			.dequeue_depth = 32,
 			.enqueue_depth = 64,
+			.disable_implicit_release = 0,
 	};
 	if (num_ports > MAX_PORTS)
 		return -1;
@@ -1225,6 +1226,7 @@ port_reconfig_credits(struct test *t)
 			.new_event_threshold = 128,
 			.dequeue_depth = 32,
 			.enqueue_depth = 64,
+			.disable_implicit_release = 0,
 	};
 	if (rte_event_port_setup(evdev, 0, &port_conf) < 0) {
 		printf("%d Error setting up port\n", __LINE__);
@@ -1314,6 +1316,7 @@ port_single_lb_reconfig(struct test *t)
 		.new_event_threshold = 128,
 		.dequeue_depth = 32,
 		.enqueue_depth = 64,
+		.disable_implicit_release = 0,
 	};
 	if (rte_event_port_setup(evdev, 0, &port_conf) < 0) {
 		printf("%d Error setting up port\n", __LINE__);
@@ -2906,7 +2909,7 @@ worker_loopback_producer_fn(void *arg)
 }
 
 static int
-worker_loopback(struct test *t)
+worker_loopback(struct test *t, uint8_t disable_implicit_release)
 {
 	/* use a single producer core, and a worker core to see what happens
 	 * if the worker loops packets back multiple times
@@ -2932,6 +2935,7 @@ worker_loopback(struct test *t)
 	 * only be initialized once - and this needs to be set for multiple runs
 	 */
 	conf.new_event_threshold = 512;
+	conf.disable_implicit_release = disable_implicit_release;
 
 	if (rte_event_port_setup(evdev, 0, &conf) < 0) {
 		printf("Error setting up RX port\n");
@@ -3206,15 +3210,23 @@ test_sw_eventdev(void)
 	}
 	if (rte_lcore_count() >= 3) {
 		printf("*** Running Worker loopback test...\n");
-		ret = worker_loopback(t);
+		ret = worker_loopback(t, 0);
 		if (ret != 0) {
 			printf("ERROR - Worker loopback test FAILED.\n");
 			return ret;
 		}
+
+		printf("*** Running Worker loopback test (implicit release disabled)...\n");
+		ret = worker_loopback(t, 1);
+		if (ret != 0) {
+			printf("ERROR - Worker loopback test FAILED.\n");
+			return ret;
+		}
 	} else {
-		printf("### Not enough cores for worker loopback test.\n");
-		printf("### Need at least 3 cores for test.\n");
+		printf("### Not enough cores for worker loopback tests.\n");
+		printf("### Need at least 3 cores for the tests.\n");
 	}
 
 	/*
	 * Free test instance, leaving mempool initialized, and a pointer to it
	 * in static eventdev_func_mempool, as it is re-used on re-runs