event/dsw: add event scheduling and device start/stop
With this patch, the DSW event device can be started and stopped, and it also supports scheduling events between ports.

Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
Acked-by: Jerin Jacob <jerin.jacob@caviumnetworks.com>
parent 0cb8b0a03e
commit 1c8e3caa3b
@@ -21,6 +21,6 @@ LIBABIVER := 1
 
 EXPORT_MAP := rte_pmd_dsw_event_version.map
 
-SRCS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += dsw_evdev.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += dsw_evdev.c dsw_event.c
 
 include $(RTE_SDK)/mk/rte.lib.mk
@@ -6,6 +6,7 @@
#include <rte_eventdev_pmd.h>
#include <rte_eventdev_pmd_vdev.h>
#include <rte_random.h>

#include "dsw_evdev.h"

@@ -201,10 +202,125 @@ dsw_configure(const struct rte_eventdev *dev)
{
	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
	const struct rte_event_dev_config *conf = &dev->data->dev_conf;
	int32_t min_max_in_flight;

	dsw->num_ports = conf->nb_event_ports;
	dsw->num_queues = conf->nb_event_queues;

	/* Avoid a situation where consumer ports are holding all the
	 * credits, without making use of them.
	 */
	min_max_in_flight = conf->nb_event_ports * DSW_PORT_MAX_CREDITS;

	dsw->max_inflight = RTE_MAX(conf->nb_events_limit, min_max_in_flight);

	return 0;
}
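
As a worked example of the sizing logic above (illustrative numbers; DSW_PORT_MAX_CREDITS resolves to 128 via the defines in dsw_evdev.h further down):

	/* Suppose conf->nb_event_ports = 4 and conf->nb_events_limit = 256. */
	min_max_in_flight = 4 * DSW_PORT_MAX_CREDITS;	/* 4 * 128 = 512 */
	dsw->max_inflight = RTE_MAX(256, 512);		/* -> 512 */

A low application-configured event limit is thus raised to the point where every port can hold its full credit quota without starving the others.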
static void
initial_flow_to_port_assignment(struct dsw_evdev *dsw)
{
	uint8_t queue_id;
	for (queue_id = 0; queue_id < dsw->num_queues; queue_id++) {
		struct dsw_queue *queue = &dsw->queues[queue_id];
		uint16_t flow_hash;
		for (flow_hash = 0; flow_hash < DSW_MAX_FLOWS; flow_hash++) {
			uint8_t port_idx =
				rte_rand() % queue->num_serving_ports;
			uint8_t port_id =
				queue->serving_ports[port_idx];
			dsw->queues[queue_id].flow_to_port_map[flow_hash] =
				port_id;
		}
	}
}
static int
dsw_start(struct rte_eventdev *dev)
{
	struct dsw_evdev *dsw = dsw_pmd_priv(dev);

	rte_atomic32_init(&dsw->credits_on_loan);

	initial_flow_to_port_assignment(dsw);

	return 0;
}
static void
dsw_port_drain_buf(uint8_t dev_id, struct rte_event *buf, uint16_t buf_len,
		   eventdev_stop_flush_t flush, void *flush_arg)
{
	uint16_t i;

	for (i = 0; i < buf_len; i++)
		flush(dev_id, buf[i], flush_arg);
}
static void
dsw_port_drain_out(uint8_t dev_id, struct dsw_evdev *dsw, struct dsw_port *port,
		   eventdev_stop_flush_t flush, void *flush_arg)
{
	uint16_t dport_id;

	for (dport_id = 0; dport_id < dsw->num_ports; dport_id++)
		if (dport_id != port->id)
			dsw_port_drain_buf(dev_id, port->out_buffer[dport_id],
					   port->out_buffer_len[dport_id],
					   flush, flush_arg);
}
static void
dsw_port_drain_in_ring(uint8_t dev_id, struct dsw_port *port,
		       eventdev_stop_flush_t flush, void *flush_arg)
{
	struct rte_event ev;

	while (rte_event_ring_dequeue_burst(port->in_ring, &ev, 1, NULL))
		flush(dev_id, ev, flush_arg);
}
static void
dsw_drain(uint8_t dev_id, struct dsw_evdev *dsw,
	  eventdev_stop_flush_t flush, void *flush_arg)
{
	uint16_t port_id;

	if (flush == NULL)
		return;

	for (port_id = 0; port_id < dsw->num_ports; port_id++) {
		struct dsw_port *port = &dsw->ports[port_id];

		dsw_port_drain_out(dev_id, dsw, port, flush, flush_arg);
		dsw_port_drain_in_ring(dev_id, port, flush, flush_arg);
	}
}
static void
dsw_stop(struct rte_eventdev *dev)
{
	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
	uint8_t dev_id;
	eventdev_stop_flush_t flush;
	void *flush_arg;

	dev_id = dev->data->dev_id;
	flush = dev->dev_ops->dev_stop_flush;
	flush_arg = dev->data->dev_stop_flush_arg;

	dsw_drain(dev_id, dsw, flush, flush_arg);
}
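
The flush hook consumed by dsw_stop() is the generic eventdev stop-flush callback. A minimal sketch of the application side (the dsw_release_event() callback and its use of event_ptr are hypothetical, not part of this patch):

#include <stdlib.h>
#include <rte_eventdev.h>

static void
dsw_release_event(uint8_t dev_id __rte_unused, struct rte_event ev,
		  void *arg __rte_unused)
{
	/* Reclaim whatever resource the application attached to the event. */
	free(ev.event_ptr);
}

/* After rte_event_dev_configure(), before rte_event_dev_start(): */
rte_event_dev_stop_flush_callback_register(dev_id, dsw_release_event, NULL);

Without a registered callback, dsw_drain() returns early and any still-buffered events are simply discarded on stop.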
static int
dsw_close(struct rte_eventdev *dev)
{
	struct dsw_evdev *dsw = dsw_pmd_priv(dev);

	dsw->num_ports = 0;
	dsw->num_queues = 0;

	return 0;
}
@@ -219,6 +335,9 @@ static struct rte_eventdev_ops dsw_evdev_ops = {
	.port_unlink = dsw_port_unlink,
	.dev_infos_get = dsw_info_get,
	.dev_configure = dsw_configure,
	.dev_start = dsw_start,
	.dev_stop = dsw_stop,
	.dev_close = dsw_close
};

static int
@@ -236,6 +355,12 @@ dsw_probe(struct rte_vdev_device *vdev)
		return -EFAULT;

	dev->dev_ops = &dsw_evdev_ops;
	dev->enqueue = dsw_event_enqueue;
	dev->enqueue_burst = dsw_event_enqueue_burst;
	dev->enqueue_new_burst = dsw_event_enqueue_new_burst;
	dev->enqueue_forward_burst = dsw_event_enqueue_forward_burst;
	dev->dequeue = dsw_event_dequeue;
	dev->dequeue_burst = dsw_event_dequeue_burst;

	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
		return 0;
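
For reference, a virtual event device like this one is instantiated either on the EAL command line (e.g. --vdev=event_dsw0) or programmatically. A sketch, assuming the PMD registers under the name event_dsw:

#include <rte_bus_vdev.h>
#include <rte_eventdev.h>

/* Create a DSW instance and look up its event device id. */
if (rte_vdev_init("event_dsw0", NULL) == 0) {
	int dev_id = rte_event_dev_get_dev_id("event_dsw0");
	/* dev_id is now ready for rte_event_dev_configure() etc. */
}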
@@ -14,6 +14,7 @@
#define DSW_MAX_PORTS (64)
#define DSW_MAX_PORT_DEQUEUE_DEPTH (128)
#define DSW_MAX_PORT_ENQUEUE_DEPTH (128)
#define DSW_MAX_PORT_OUT_BUFFER (32)

#define DSW_MAX_QUEUES (16)
@@ -24,6 +25,24 @@
#define DSW_MAX_FLOWS (1<<(DSW_MAX_FLOWS_BITS))
#define DSW_MAX_FLOWS_MASK (DSW_MAX_FLOWS-1)

/* Eventdev RTE_SCHED_TYPE_PARALLEL doesn't have a concept of flows,
 * but the 'dsw' scheduler (more or less) randomly assigns flow ids to
 * events on parallel queues, to be able to reuse some of the
 * migration mechanism and scheduling logic from
 * RTE_SCHED_TYPE_ATOMIC. By moving one of the parallel "flows" from a
 * particular port, the likelihood of events being scheduled to this
 * port is reduced, and thus a kind of statistical load balancing is
 * achieved.
 */
#define DSW_PARALLEL_FLOWS (1024)

/* Avoid making small 'loans' from the central in-flight event credit
 * pool, to improve efficiency.
 */
#define DSW_MIN_CREDIT_LOAN (64)
#define DSW_PORT_MAX_CREDITS (2*DSW_MIN_CREDIT_LOAN)
#define DSW_PORT_MIN_CREDITS (DSW_MIN_CREDIT_LOAN)

/* The rings are dimensioned so that all in-flight events can reside
 * on any one of the port rings, to avoid the trouble of having to
 * care about the case where there's no room on the destination port's
@@ -44,8 +63,17 @@ struct dsw_port {
	uint16_t dequeue_depth;
	uint16_t enqueue_depth;

	int32_t inflight_credits;

	int32_t new_event_threshold;

	uint16_t pending_releases;

	uint16_t next_parallel_flow_id;

	uint16_t out_buffer_len[DSW_MAX_PORTS];
	struct rte_event out_buffer[DSW_MAX_PORTS][DSW_MAX_PORT_OUT_BUFFER];

	struct rte_event_ring *in_ring __rte_cache_aligned;
} __rte_cache_aligned;
@@ -53,6 +81,8 @@ struct dsw_queue {
	uint8_t schedule_type;
	uint8_t serving_ports[DSW_MAX_PORTS];
	uint16_t num_serving_ports;

	uint8_t flow_to_port_map[DSW_MAX_FLOWS] __rte_cache_aligned;
};

struct dsw_evdev {
@@ -62,12 +92,38 @@ struct dsw_evdev {
	uint16_t num_ports;
	struct dsw_queue queues[DSW_MAX_QUEUES];
	uint8_t num_queues;
	int32_t max_inflight;

	rte_atomic32_t credits_on_loan __rte_cache_aligned;
};

uint16_t dsw_event_enqueue(void *port, const struct rte_event *event);
uint16_t dsw_event_enqueue_burst(void *port,
				 const struct rte_event events[],
				 uint16_t events_len);
uint16_t dsw_event_enqueue_new_burst(void *port,
				     const struct rte_event events[],
				     uint16_t events_len);
uint16_t dsw_event_enqueue_forward_burst(void *port,
					 const struct rte_event events[],
					 uint16_t events_len);

uint16_t dsw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait);
uint16_t dsw_event_dequeue_burst(void *port, struct rte_event *events,
				 uint16_t num, uint64_t wait);

static inline struct dsw_evdev *
dsw_pmd_priv(const struct rte_eventdev *eventdev)
{
	return eventdev->data->dev_private;
}

#define DSW_LOG_DP(level, fmt, args...)				\
	RTE_LOG_DP(level, EVENTDEV, "[%s] %s() line %u: " fmt,	\
		   DSW_PMD_NAME,				\
		   __func__, __LINE__, ## args)

#define DSW_LOG_DP_PORT(level, port_id, fmt, args...)		\
	DSW_LOG_DP(level, "<Port %d> " fmt, port_id, ## args)

#endif
drivers/event/dsw/dsw_event.c (new file, 359 lines)
@@ -0,0 +1,359 @@
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2018 Ericsson AB
 */

#include "dsw_evdev.h"

#include <stdbool.h>

#include <rte_atomic.h>
#include <rte_random.h>

static bool
dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,
			 int32_t credits)
{
	int32_t inflight_credits = port->inflight_credits;
	int32_t missing_credits = credits - inflight_credits;
	int32_t total_on_loan;
	int32_t available;
	int32_t acquired_credits;
	int32_t new_total_on_loan;

	if (likely(missing_credits <= 0)) {
		port->inflight_credits -= credits;
		return true;
	}

	total_on_loan = rte_atomic32_read(&dsw->credits_on_loan);
	available = dsw->max_inflight - total_on_loan;
	acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);

	if (available < acquired_credits)
		return false;

	/* This is a race, no locks are involved, and thus some other
	 * thread can allocate tokens in between the check and the
	 * allocation.
	 */
	new_total_on_loan = rte_atomic32_add_return(&dsw->credits_on_loan,
						    acquired_credits);

	if (unlikely(new_total_on_loan > dsw->max_inflight)) {
		/* Some other port took the last credits */
		rte_atomic32_sub(&dsw->credits_on_loan, acquired_credits);
		return false;
	}

	DSW_LOG_DP_PORT(DEBUG, port->id, "Acquired %d tokens from pool.\n",
			acquired_credits);

	port->inflight_credits += acquired_credits;
	port->inflight_credits -= credits;

	return true;
}
static void
dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
			int32_t credits)
{
	port->inflight_credits += credits;

	if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {
		int32_t leave_credits = DSW_PORT_MIN_CREDITS;
		int32_t return_credits =
			port->inflight_credits - leave_credits;

		port->inflight_credits = leave_credits;

		rte_atomic32_sub(&dsw->credits_on_loan, return_credits);

		DSW_LOG_DP_PORT(DEBUG, port->id,
				"Returned %d tokens to pool.\n",
				return_credits);
	}
}
static uint8_t
dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
{
	struct dsw_queue *queue = &dsw->queues[queue_id];
	uint8_t port_id;

	if (queue->num_serving_ports > 1)
		port_id = queue->flow_to_port_map[flow_hash];
	else
		/* A single-link queue, or atomic/ordered/parallel but
		 * with just a single serving port.
		 */
		port_id = queue->serving_ports[0];

	DSW_LOG_DP(DEBUG, "Event with queue_id %d flow_hash %d is scheduled "
		   "to port %d.\n", queue_id, flow_hash, port_id);

	return port_id;
}
static void
dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
			   uint8_t dest_port_id)
{
	struct dsw_port *dest_port = &(dsw->ports[dest_port_id]);
	uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
	struct rte_event *buffer = source_port->out_buffer[dest_port_id];
	uint16_t enqueued = 0;

	if (*buffer_len == 0)
		return;

	/* The rings are dimensioned to fit all in-flight events (even
	 * on a single ring), so looping will work.
	 */
	do {
		enqueued +=
			rte_event_ring_enqueue_burst(dest_port->in_ring,
						     buffer+enqueued,
						     *buffer_len-enqueued,
						     NULL);
	} while (unlikely(enqueued != *buffer_len));

	(*buffer_len) = 0;
}
static uint16_t
dsw_port_get_parallel_flow_id(struct dsw_port *port)
{
	uint16_t flow_id = port->next_parallel_flow_id;

	port->next_parallel_flow_id =
		(port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;

	return flow_id;
}
static void
dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
			   uint8_t dest_port_id, const struct rte_event *event)
{
	struct rte_event *buffer = source_port->out_buffer[dest_port_id];
	uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];

	if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)
		dsw_port_transmit_buffered(dsw, source_port, dest_port_id);

	buffer[*buffer_len] = *event;

	(*buffer_len)++;
}
#define DSW_FLOW_ID_BITS (24)
static uint16_t
dsw_flow_id_hash(uint32_t flow_id)
{
	uint16_t hash = 0;
	uint16_t offset = 0;

	do {
		hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);
		offset += DSW_MAX_FLOWS_BITS;
	} while (offset < DSW_FLOW_ID_BITS);

	return hash;
}
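
To see what the fold above does, assume DSW_MAX_FLOWS_BITS is 15 (so DSW_MAX_FLOWS_MASK is 0x7fff; the actual value is defined in dsw_evdev.h, outside the hunks shown here):

/* flow_id = 0xabcdef (24 significant bits):
 *   pass 1: hash ^= (0xabcdef >> 0)  & 0x7fff  ->  hash = 0x4def
 *   pass 2: hash ^= (0xabcdef >> 15) & 0x7fff  ->  hash = 0x4def ^ 0x157 = 0x4cb8
 * offset is now 30 >= DSW_FLOW_ID_BITS, so the loop stops: the 24-bit
 * flow id has been folded into a flow hash that fits DSW_MAX_FLOWS.
 */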
static void
dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port,
			 struct rte_event event)
{
	uint8_t dest_port_id;

	event.flow_id = dsw_port_get_parallel_flow_id(source_port);

	dest_port_id = dsw_schedule(dsw, event.queue_id,
				    dsw_flow_id_hash(event.flow_id));

	dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);
}
static void
dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
		      const struct rte_event *event)
{
	uint16_t flow_hash;
	uint8_t dest_port_id;

	if (unlikely(dsw->queues[event->queue_id].schedule_type ==
		     RTE_SCHED_TYPE_PARALLEL)) {
		dsw_port_buffer_parallel(dsw, source_port, *event);
		return;
	}

	flow_hash = dsw_flow_id_hash(event->flow_id);

	dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);

	dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
}
static void
dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
{
	uint16_t dest_port_id;

	for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)
		dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
}
uint16_t
dsw_event_enqueue(void *port, const struct rte_event *ev)
{
	return dsw_event_enqueue_burst(port, ev, unlikely(ev == NULL) ? 0 : 1);
}
static __rte_always_inline uint16_t
dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],
				uint16_t events_len, bool op_types_known,
				uint16_t num_new, uint16_t num_release,
				uint16_t num_non_release)
{
	struct dsw_port *source_port = port;
	struct dsw_evdev *dsw = source_port->dsw;
	bool enough_credits;
	uint16_t i;

	DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
			"events to port %d.\n", events_len, source_port->id);

	/* XXX: For performance (=ring efficiency) reasons, the
	 * scheduler relies on internal non-ring buffers instead of
	 * immediately sending the event to the destination ring. For
	 * a producer that doesn't intend to produce or consume any
	 * more events, the scheduler provides a way to flush the
	 * buffer, by means of doing an enqueue of zero events. In
	 * addition, a port cannot be left "unattended" (e.g. unused)
	 * for long periods of time, since that would stall
	 * migration. Eventdev API extensions to provide a cleaner way
	 * to achieve both of these functions should be
	 * considered.
	 */
	if (unlikely(events_len == 0)) {
		dsw_port_flush_out_buffers(dsw, source_port);
		return 0;
	}

	if (unlikely(events_len > source_port->enqueue_depth))
		events_len = source_port->enqueue_depth;

	if (!op_types_known)
		for (i = 0; i < events_len; i++) {
			switch (events[i].op) {
			case RTE_EVENT_OP_RELEASE:
				num_release++;
				break;
			case RTE_EVENT_OP_NEW:
				num_new++;
				/* Falls through. */
			default:
				num_non_release++;
				break;
			}
		}

	/* Technically, we could allow the non-new events up to the
	 * first new event in the array into the system, but for
	 * simplicity reasons, we deny the whole burst if the port is
	 * above the water mark.
	 */
	if (unlikely(num_new > 0 && rte_atomic32_read(&dsw->credits_on_loan) >
		     source_port->new_event_threshold))
		return 0;

	enough_credits = dsw_port_acquire_credits(dsw, source_port,
						  num_non_release);
	if (unlikely(!enough_credits))
		return 0;

	source_port->pending_releases -= num_release;

	for (i = 0; i < events_len; i++) {
		const struct rte_event *event = &events[i];

		if (likely(num_release == 0 ||
			   event->op != RTE_EVENT_OP_RELEASE))
			dsw_port_buffer_event(dsw, source_port, event);
	}

	DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
			"accepted.\n", num_non_release);

	return num_non_release;
}
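
As the XXX comment above describes, an application flushes a port's internal output buffers by enqueueing zero events; a minimal usage sketch (dev_id and port_id are assumed to refer to a configured DSW device and port):

/* Producer has no more work for now: push out any buffered events. */
rte_event_enqueue_burst(dev_id, port_id, NULL, 0);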
uint16_t
dsw_event_enqueue_burst(void *port, const struct rte_event events[],
			uint16_t events_len)
{
	return dsw_event_enqueue_burst_generic(port, events, events_len, false,
					       0, 0, 0);
}
uint16_t
dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
			    uint16_t events_len)
{
	return dsw_event_enqueue_burst_generic(port, events, events_len, true,
					       events_len, 0, events_len);
}
uint16_t
dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
				uint16_t events_len)
{
	return dsw_event_enqueue_burst_generic(port, events, events_len, true,
					       0, 0, events_len);
}
uint16_t
dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
{
	return dsw_event_dequeue_burst(port, events, 1, wait);
}
static uint16_t
dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
		       uint16_t num)
{
	return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
}
uint16_t
dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
			uint64_t wait __rte_unused)
{
	struct dsw_port *source_port = port;
	struct dsw_evdev *dsw = source_port->dsw;
	uint16_t dequeued;

	source_port->pending_releases = 0;

	if (unlikely(num > source_port->dequeue_depth))
		num = source_port->dequeue_depth;

	dequeued = dsw_port_dequeue_burst(source_port, events, num);

	source_port->pending_releases = dequeued;

	if (dequeued > 0) {
		DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
				dequeued);

		dsw_port_return_credits(dsw, source_port, dequeued);
	}
	/* XXX: Assuming the port can't produce any more work,
	 * consider flushing the output buffer, on dequeued ==
	 * 0.
	 */

	return dequeued;
}
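
For context, these fast-path functions sit behind the generic eventdev API; a typical worker loop exercising them might look as follows (a sketch; dev_id, port_id and process_event() are placeholders, and backpressure handling on enqueue is omitted):

struct rte_event ev[16];
uint16_t n, i;

for (;;) {
	n = rte_event_dequeue_burst(dev_id, port_id, ev, 16, 0);
	for (i = 0; i < n; i++) {
		process_event(&ev[i]);		/* application stage */
		ev[i].op = RTE_EVENT_OP_FORWARD;
	}
	if (n > 0)
		rte_event_enqueue_burst(dev_id, port_id, ev, n);
	else
		/* Idle: flush this port's buffered output (see above). */
		rte_event_enqueue_burst(dev_id, port_id, NULL, 0);
}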
@@ -3,4 +3,4 @@
 
 allow_experimental_apis = true
 deps += ['bus_vdev']
-sources = files('dsw_evdev.c')
+sources = files('dsw_evdev.c', 'dsw_event.c')