event/dpaa: support select based event

This patch add support to use select call with qman portal fd
for timeout based dequeue request for eventdev.

If there is a event available qman portal fd will be set
and the function will be awakened. If no event is available,
it will only wait till the given timeout value.

In case of interrupt the timeout ticks are used as usecs.

Signed-off-by: Hemant Agrawal <hemant.agrawal@nxp.com>
This commit is contained in:
Hemant Agrawal 2018-09-25 12:32:35 +05:30 committed by Jerin Jacob
parent e7bbddb846
commit 77b5311d0e
3 changed files with 248 additions and 49 deletions

View File

@ -122,6 +122,8 @@ Example:
./your_eventdev_application --vdev="event_dpaa1"
* Use dev arg option ``disable_intr=1`` to disable the interrupt mode
Limitations
-----------

View File

@ -30,6 +30,7 @@
#include <rte_dpaa_bus.h>
#include <rte_dpaa_logs.h>
#include <rte_cycles.h>
#include <rte_kvargs.h>
#include <dpaa_ethdev.h>
#include "dpaa_eventdev.h"
@ -43,19 +44,31 @@
* 1 Eventdev can have N Eventqueue
*/
#define DISABLE_INTR_MODE "disable_intr"
static int
dpaa_event_dequeue_timeout_ticks(struct rte_eventdev *dev, uint64_t ns,
uint64_t *timeout_ticks)
{
uint64_t cycles_per_second;
EVENTDEV_INIT_FUNC_TRACE();
RTE_SET_USED(dev);
cycles_per_second = rte_get_timer_hz();
*timeout_ticks = ns * (cycles_per_second / NS_PER_S);
uint64_t cycles_per_second;
cycles_per_second = rte_get_timer_hz();
*timeout_ticks = (ns * cycles_per_second) / NS_PER_S;
return 0;
}
static int
dpaa_event_dequeue_timeout_ticks_intr(struct rte_eventdev *dev, uint64_t ns,
uint64_t *timeout_ticks)
{
RTE_SET_USED(dev);
*timeout_ticks = ns/1000;
return 0;
}
@ -100,6 +113,56 @@ dpaa_event_enqueue(void *port, const struct rte_event *ev)
return dpaa_event_enqueue_burst(port, ev, 1);
}
static void drain_4_bytes(int fd, fd_set *fdset)
{
if (FD_ISSET(fd, fdset)) {
/* drain 4 bytes */
uint32_t junk;
ssize_t sjunk = read(qman_thread_fd(), &junk, sizeof(junk));
if (sjunk != sizeof(junk))
DPAA_EVENTDEV_ERR("UIO irq read error");
}
}
static inline int
dpaa_event_dequeue_wait(uint64_t timeout_ticks)
{
int fd_qman, nfds;
int ret;
fd_set readset;
/* Go into (and back out of) IRQ mode for each select,
* it simplifies exit-path considerations and other
* potential nastiness.
*/
struct timeval tv = {
.tv_sec = timeout_ticks / 1000000,
.tv_usec = timeout_ticks % 1000000
};
fd_qman = qman_thread_fd();
nfds = fd_qman + 1;
FD_ZERO(&readset);
FD_SET(fd_qman, &readset);
qman_irqsource_add(QM_PIRQ_DQRI);
ret = select(nfds, &readset, NULL, NULL, &tv);
if (ret < 0)
return ret;
/* Calling irqsource_remove() prior to thread_irq()
* means thread_irq() will not process whatever caused
* the interrupts, however it does ensure that, once
* thread_irq() re-enables interrupts, they won't fire
* again immediately.
*/
qman_irqsource_remove(~0);
drain_4_bytes(fd_qman, &readset);
qman_thread_irq();
return ret;
}
static uint16_t
dpaa_event_dequeue_burst(void *port, struct rte_event ev[],
uint16_t nb_events, uint64_t timeout_ticks)
@ -107,8 +170,8 @@ dpaa_event_dequeue_burst(void *port, struct rte_event ev[],
int ret;
u16 ch_id;
void *buffers[8];
u32 num_frames, i;
uint64_t wait_time, cur_ticks, start_ticks;
u32 num_frames, i, irq = 0;
uint64_t cur_ticks = 0, wait_time_ticks = 0;
struct dpaa_port *portal = (struct dpaa_port *)port;
struct rte_mbuf *mbuf;
@ -147,20 +210,21 @@ dpaa_event_dequeue_burst(void *port, struct rte_event ev[],
}
DPAA_PER_LCORE_DQRR_HELD = 0;
if (portal->timeout == DPAA_EVENT_PORT_DEQUEUE_TIMEOUT_INVALID)
wait_time = timeout_ticks;
if (timeout_ticks)
wait_time_ticks = timeout_ticks;
else
wait_time = portal->timeout;
wait_time_ticks = portal->timeout_us;
/* Lets dequeue the frames */
start_ticks = rte_get_timer_cycles();
wait_time += start_ticks;
wait_time_ticks += rte_get_timer_cycles();
do {
/* Lets dequeue the frames */
num_frames = qman_portal_dequeue(ev, nb_events, buffers);
if (num_frames != 0)
if (irq)
irq = 0;
if (num_frames)
break;
cur_ticks = rte_get_timer_cycles();
} while (cur_ticks < wait_time);
} while (cur_ticks < wait_time_ticks);
return num_frames;
}
@ -171,6 +235,86 @@ dpaa_event_dequeue(void *port, struct rte_event *ev, uint64_t timeout_ticks)
return dpaa_event_dequeue_burst(port, ev, 1, timeout_ticks);
}
static uint16_t
dpaa_event_dequeue_burst_intr(void *port, struct rte_event ev[],
uint16_t nb_events, uint64_t timeout_ticks)
{
int ret;
u16 ch_id;
void *buffers[8];
u32 num_frames, i, irq = 0;
uint64_t cur_ticks = 0, wait_time_ticks = 0;
struct dpaa_port *portal = (struct dpaa_port *)port;
struct rte_mbuf *mbuf;
if (unlikely(!RTE_PER_LCORE(dpaa_io))) {
/* Affine current thread context to a qman portal */
ret = rte_dpaa_portal_init((void *)0);
if (ret) {
DPAA_EVENTDEV_ERR("Unable to initialize portal");
return ret;
}
}
if (unlikely(!portal->is_port_linked)) {
/*
* Affine event queue for current thread context
* to a qman portal.
*/
for (i = 0; i < portal->num_linked_evq; i++) {
ch_id = portal->evq_info[i].ch_id;
dpaa_eventq_portal_add(ch_id);
}
portal->is_port_linked = true;
}
/* Check if there are atomic contexts to be released */
i = 0;
while (DPAA_PER_LCORE_DQRR_SIZE) {
if (DPAA_PER_LCORE_DQRR_HELD & (1 << i)) {
qman_dca_index(i, 0);
mbuf = DPAA_PER_LCORE_DQRR_MBUF(i);
mbuf->seqn = DPAA_INVALID_MBUF_SEQN;
DPAA_PER_LCORE_DQRR_HELD &= ~(1 << i);
DPAA_PER_LCORE_DQRR_SIZE--;
}
i++;
}
DPAA_PER_LCORE_DQRR_HELD = 0;
if (timeout_ticks)
wait_time_ticks = timeout_ticks;
else
wait_time_ticks = portal->timeout_us;
do {
/* Lets dequeue the frames */
num_frames = qman_portal_dequeue(ev, nb_events, buffers);
if (irq)
irq = 0;
if (num_frames)
break;
if (wait_time_ticks) { /* wait for time */
if (dpaa_event_dequeue_wait(wait_time_ticks) > 0) {
irq = 1;
continue;
}
break; /* no event after waiting */
}
cur_ticks = rte_get_timer_cycles();
} while (cur_ticks < wait_time_ticks);
return num_frames;
}
static uint16_t
dpaa_event_dequeue_intr(void *port,
struct rte_event *ev,
uint64_t timeout_ticks)
{
return dpaa_event_dequeue_burst_intr(port, ev, 1, timeout_ticks);
}
static void
dpaa_event_dev_info_get(struct rte_eventdev *dev,
struct rte_event_dev_info *dev_info)
@ -184,7 +328,7 @@ dpaa_event_dev_info_get(struct rte_eventdev *dev,
dev_info->max_dequeue_timeout_ns =
DPAA_EVENT_MAX_DEQUEUE_TIMEOUT;
dev_info->dequeue_timeout_ns =
DPAA_EVENT_MIN_DEQUEUE_TIMEOUT;
DPAA_EVENT_PORT_DEQUEUE_TIMEOUT_NS;
dev_info->max_event_queues =
DPAA_EVENT_MAX_QUEUES;
dev_info->max_event_queue_flows =
@ -230,15 +374,6 @@ dpaa_event_dev_configure(const struct rte_eventdev *dev)
priv->nb_event_port_enqueue_depth = conf->nb_event_port_enqueue_depth;
priv->event_dev_cfg = conf->event_dev_cfg;
/* Check dequeue timeout method is per dequeue or global */
if (priv->event_dev_cfg & RTE_EVENT_DEV_CFG_PER_DEQUEUE_TIMEOUT) {
/*
* Use timeout value as given in dequeue operation.
* So invalidating this timetout value.
*/
priv->dequeue_timeout_ns = 0;
}
ch_id = rte_malloc("dpaa-channels",
sizeof(uint32_t) * priv->nb_event_queues,
RTE_CACHE_LINE_SIZE);
@ -260,24 +395,35 @@ dpaa_event_dev_configure(const struct rte_eventdev *dev)
/* Lets prepare event ports */
memset(&priv->ports[0], 0,
sizeof(struct dpaa_port) * priv->nb_event_ports);
/* Check dequeue timeout method is per dequeue or global */
if (priv->event_dev_cfg & RTE_EVENT_DEV_CFG_PER_DEQUEUE_TIMEOUT) {
for (i = 0; i < priv->nb_event_ports; i++) {
priv->ports[i].timeout =
DPAA_EVENT_PORT_DEQUEUE_TIMEOUT_INVALID;
}
} else if (priv->dequeue_timeout_ns == 0) {
for (i = 0; i < priv->nb_event_ports; i++) {
dpaa_event_dequeue_timeout_ticks(NULL,
DPAA_EVENT_PORT_DEQUEUE_TIMEOUT_NS,
&priv->ports[i].timeout);
}
/*
* Use timeout value as given in dequeue operation.
* So invalidating this timeout value.
*/
priv->dequeue_timeout_ns = 0;
} else if (conf->dequeue_timeout_ns == 0) {
priv->dequeue_timeout_ns = DPAA_EVENT_PORT_DEQUEUE_TIMEOUT_NS;
} else {
priv->dequeue_timeout_ns = conf->dequeue_timeout_ns;
}
for (i = 0; i < priv->nb_event_ports; i++) {
dpaa_event_dequeue_timeout_ticks(NULL,
priv->dequeue_timeout_ns,
&priv->ports[i].timeout);
if (priv->intr_mode) {
priv->ports[i].timeout_us =
priv->dequeue_timeout_ns/1000;
} else {
uint64_t cycles_per_second;
cycles_per_second = rte_get_timer_hz();
priv->ports[i].timeout_us =
(priv->dequeue_timeout_ns * cycles_per_second)
/ NS_PER_S;
}
}
/*
* TODO: Currently portals are affined with threads. Maximum threads
* can be created equals to number of lcore.
@ -454,6 +600,7 @@ dpaa_event_port_unlink(struct rte_eventdev *dev, void *port,
event_queue->event_port = NULL;
}
if (event_port->num_linked_evq)
event_port->num_linked_evq = event_port->num_linked_evq - i;
return (int)i;
@ -593,8 +740,44 @@ static struct rte_eventdev_ops dpaa_eventdev_ops = {
.eth_rx_adapter_stop = dpaa_event_eth_rx_adapter_stop,
};
static int flag_check_handler(__rte_unused const char *key,
const char *value, __rte_unused void *opaque)
{
if (strcmp(value, "1"))
return -1;
return 0;
}
static int
dpaa_event_dev_create(const char *name)
dpaa_event_check_flags(const char *params)
{
struct rte_kvargs *kvlist;
if (params == NULL || params[0] == '\0')
return 0;
kvlist = rte_kvargs_parse(params, NULL);
if (kvlist == NULL)
return 0;
if (!rte_kvargs_count(kvlist, DISABLE_INTR_MODE)) {
rte_kvargs_free(kvlist);
return 0;
}
/* INTR MODE is disabled when there's key-value pair: disable_intr = 1*/
if (rte_kvargs_process(kvlist, DISABLE_INTR_MODE,
flag_check_handler, NULL) < 0) {
rte_kvargs_free(kvlist);
return 0;
}
rte_kvargs_free(kvlist);
return 1;
}
static int
dpaa_event_dev_create(const char *name, const char *params)
{
struct rte_eventdev *eventdev;
struct dpaa_eventdev *priv;
@ -606,18 +789,27 @@ dpaa_event_dev_create(const char *name)
DPAA_EVENTDEV_ERR("Failed to create eventdev vdev %s", name);
goto fail;
}
priv = eventdev->data->dev_private;
eventdev->dev_ops = &dpaa_eventdev_ops;
eventdev->enqueue = dpaa_event_enqueue;
eventdev->enqueue_burst = dpaa_event_enqueue_burst;
if (dpaa_event_check_flags(params)) {
eventdev->dequeue = dpaa_event_dequeue;
eventdev->dequeue_burst = dpaa_event_dequeue_burst;
} else {
priv->intr_mode = 1;
eventdev->dev_ops->timeout_ticks =
dpaa_event_dequeue_timeout_ticks_intr;
eventdev->dequeue = dpaa_event_dequeue_intr;
eventdev->dequeue_burst = dpaa_event_dequeue_burst_intr;
}
/* For secondary processes, the primary has done all the work */
if (rte_eal_process_type() != RTE_PROC_PRIMARY)
return 0;
priv = eventdev->data->dev_private;
priv->max_event_queues = DPAA_EVENT_MAX_QUEUES;
return 0;
@ -629,11 +821,14 @@ static int
dpaa_event_dev_probe(struct rte_vdev_device *vdev)
{
const char *name;
const char *params;
name = rte_vdev_device_name(vdev);
DPAA_EVENTDEV_INFO("Initializing %s", name);
return dpaa_event_dev_create(name);
params = rte_vdev_device_args(vdev);
return dpaa_event_dev_create(name, params);
}
static int
@ -653,3 +848,5 @@ static struct rte_vdev_driver vdev_eventdev_dpaa_pmd = {
};
RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_DPAA_PMD, vdev_eventdev_dpaa_pmd);
RTE_PMD_REGISTER_PARAM_STRING(EVENTDEV_NAME_DPAA_PMD,
DISABLE_INTR_MODE "=<int>");

View File

@ -12,8 +12,8 @@
#define EVENTDEV_NAME_DPAA_PMD event_dpaa1
#define DPAA_EVENT_MAX_PORTS 8
#define DPAA_EVENT_MAX_QUEUES 16
#define DPAA_EVENT_MAX_PORTS 4
#define DPAA_EVENT_MAX_QUEUES 8
#define DPAA_EVENT_MIN_DEQUEUE_TIMEOUT 1
#define DPAA_EVENT_MAX_DEQUEUE_TIMEOUT (UINT32_MAX - 1)
#define DPAA_EVENT_MAX_QUEUE_FLOWS 2048
@ -21,7 +21,7 @@
#define DPAA_EVENT_MAX_EVENT_PRIORITY_LEVELS 0
#define DPAA_EVENT_MAX_EVENT_PORT RTE_MIN(RTE_MAX_LCORE, INT8_MAX)
#define DPAA_EVENT_MAX_PORT_DEQUEUE_DEPTH 8
#define DPAA_EVENT_PORT_DEQUEUE_TIMEOUT_NS 100UL
#define DPAA_EVENT_PORT_DEQUEUE_TIMEOUT_NS 100000UL
#define DPAA_EVENT_PORT_DEQUEUE_TIMEOUT_INVALID ((uint64_t)-1)
#define DPAA_EVENT_MAX_PORT_ENQUEUE_DEPTH 1
#define DPAA_EVENT_MAX_NUM_EVENTS (INT32_MAX - 1)
@ -54,7 +54,7 @@ struct dpaa_port {
struct dpaa_eventq evq_info[DPAA_EVENT_MAX_QUEUES];
uint8_t num_linked_evq;
uint8_t is_port_linked;
uint64_t timeout;
uint64_t timeout_us;
};
struct dpaa_eventdev {
@ -65,7 +65,7 @@ struct dpaa_eventdev {
uint8_t max_event_queues;
uint8_t nb_event_queues;
uint8_t nb_event_ports;
uint8_t resvd;
uint8_t intr_mode;
uint32_t nb_event_queue_flows;
uint32_t nb_event_port_dequeue_depth;
uint32_t nb_event_port_enqueue_depth;