event/sw: support event ports

Add the data structures for the ports used by workers to send
packets to and from the scheduler, along with the functions to
create and destroy those ports.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
Signed-off-by: Harry van Haaren <harry.van.haaren@intel.com>
Acked-by: Jerin Jacob <jerin.jacob@caviumnetworks.com>
Author: Bruce Richardson, 2017-03-30 20:30:36 +01:00; committed by Jerin Jacob
commit 98dc055fd4 (parent 5ffb2f142d)
3 changed files with 346 additions and 0 deletions

drivers/event/sw/event_ring.h (new file)

@@ -0,0 +1,185 @@
/*-
* BSD LICENSE
*
* Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Intel Corporation nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/*
* Generic ring structure for passing events from one core to another.
*
* Used by the software scheduler for the producer and consumer rings for
* each port, i.e. for passing events from worker cores to scheduler and
* vice-versa. Designed for single-producer, single-consumer use with two
* cores working on each ring.
*/
#ifndef _EVENT_RING_
#define _EVENT_RING_
#include <stdint.h>
#include <rte_common.h>
#include <rte_memory.h>
#include <rte_malloc.h>
#define QE_RING_NAMESIZE 32
struct qe_ring {
	char name[QE_RING_NAMESIZE] __rte_cache_aligned;
	uint32_t ring_size; /* size of memory block allocated to the ring */
	uint32_t mask;      /* mask for read/write values == ring_size -1 */
	uint32_t size;      /* actual usable space in the ring */
	volatile uint32_t write_idx __rte_cache_aligned;
	volatile uint32_t read_idx __rte_cache_aligned;

	struct rte_event ring[0] __rte_cache_aligned;
};
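/*
 * Note on the index scheme, inferred from the functions below: write_idx
 * and read_idx are free-running 32-bit counters that are never wrapped to
 * the ring size. A slot is selected with (idx & mask), and the fill level
 * is the unsigned difference write_idx - read_idx, which remains correct
 * across 32-bit overflow.
 */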
#ifndef force_inline
#define force_inline inline __attribute__((always_inline))
#endif
static inline struct qe_ring *
qe_ring_create(const char *name, unsigned int size, unsigned int socket_id)
{
	struct qe_ring *retval;
	const uint32_t ring_size = rte_align32pow2(size + 1);
	size_t memsize = sizeof(*retval) +
			(ring_size * sizeof(retval->ring[0]));

	retval = rte_zmalloc_socket(NULL, memsize, 0, socket_id);
	if (retval == NULL)
		goto end;

	snprintf(retval->name, sizeof(retval->name), "EVDEV_RG_%s", name);
	retval->ring_size = ring_size;
	retval->mask = ring_size - 1;
	retval->size = size;
end:
	return retval;
}
static inline void
qe_ring_destroy(struct qe_ring *r)
{
	rte_free(r);
}
static force_inline unsigned int
qe_ring_count(const struct qe_ring *r)
{
	return r->write_idx - r->read_idx;
}

static force_inline unsigned int
qe_ring_free_count(const struct qe_ring *r)
{
	return r->size - qe_ring_count(r);
}
static force_inline unsigned int
qe_ring_enqueue_burst(struct qe_ring *r, const struct rte_event *qes,
		unsigned int nb_qes, uint16_t *free_count)
{
	const uint32_t size = r->size;
	const uint32_t mask = r->mask;
	const uint32_t read = r->read_idx;
	uint32_t write = r->write_idx;
	const uint32_t space = read + size - write;
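	/* e.g. read == 0, size == 4096, write == 4000 gives space == 96;
	 * unsigned arithmetic keeps this correct once the free-running
	 * indices wrap past UINT32_MAX
	 */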
	uint32_t i;

	if (space < nb_qes)
		nb_qes = space;

	for (i = 0; i < nb_qes; i++, write++)
		r->ring[write & mask] = qes[i];

	rte_smp_wmb();

	if (nb_qes != 0)
		r->write_idx = write;

	*free_count = space - nb_qes;

	return nb_qes;
}
static force_inline unsigned int
qe_ring_enqueue_burst_with_ops(struct qe_ring *r, const struct rte_event *qes,
		unsigned int nb_qes, uint8_t *ops)
{
	const uint32_t size = r->size;
	const uint32_t mask = r->mask;
	const uint32_t read = r->read_idx;
	uint32_t write = r->write_idx;
	const uint32_t space = read + size - write;
	uint32_t i;

	if (space < nb_qes)
		nb_qes = space;

	for (i = 0; i < nb_qes; i++, write++) {
		r->ring[write & mask] = qes[i];
		r->ring[write & mask].op = ops[i];
	}

	rte_smp_wmb();

	if (nb_qes != 0)
		r->write_idx = write;

	return nb_qes;
}
static force_inline unsigned int
qe_ring_dequeue_burst(struct qe_ring *r, struct rte_event *qes,
		unsigned int nb_qes)
{
	const uint32_t mask = r->mask;
	uint32_t read = r->read_idx;
	const uint32_t write = r->write_idx;
	const uint32_t items = write - read;
	uint32_t i;

	if (items < nb_qes)
		nb_qes = items;

	for (i = 0; i < nb_qes; i++, read++)
		qes[i] = r->ring[read & mask];

	rte_smp_rmb();

	if (nb_qes != 0)
		r->read_idx += nb_qes;

	return nb_qes;
}
#endif
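As a usage illustration (not part of this commit), a producer/consumer pair
might drive this ring as follows; the ring name, event initialisation and
burst sizes here are assumptions for the sketch:

/* minimal sketch: one producer core, one consumer core, per the SPSC
 * design note above */
#include <rte_eventdev.h>
#include "event_ring.h"

static void
qe_ring_example(void)
{
	struct qe_ring *r = qe_ring_create("demo", 128, SOCKET_ID_ANY);
	if (r == NULL)
		return;

	struct rte_event ev = {
		.queue_id = 0,
		.sched_type = RTE_SCHED_TYPE_ATOMIC,
	};
	uint16_t free_count;
	struct rte_event burst[32];

	/* producer side: returns the number actually enqueued (0 or 1 here)
	 * and reports the space left via free_count */
	unsigned int nb_enq = qe_ring_enqueue_burst(r, &ev, 1, &free_count);

	/* consumer side: pulls up to RTE_DIM(burst) events */
	unsigned int nb_deq = qe_ring_dequeue_burst(r, burst, RTE_DIM(burst));

	RTE_SET_USED(nb_enq);
	RTE_SET_USED(nb_deq);
	qe_ring_destroy(r);
}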

drivers/event/sw/sw_evdev.c

@@ -39,12 +39,91 @@
#include "sw_evdev.h"
#include "iq_ring.h"
#include "event_ring.h"
#define EVENTDEV_NAME_SW_PMD event_sw
#define NUMA_NODE_ARG "numa_node"
#define SCHED_QUANTA_ARG "sched_quanta"
#define CREDIT_QUANTA_ARG "credit_quanta"
static void
sw_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info);
static int
sw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
		const struct rte_event_port_conf *conf)
{
	struct sw_evdev *sw = sw_pmd_priv(dev);
	struct sw_port *p = &sw->ports[port_id];
	char buf[QE_RING_NAMESIZE];
	unsigned int i;

	struct rte_event_dev_info info;
	sw_info_get(dev, &info);

	/* detect re-configuring and return credits to instance if needed */
	if (p->initialized) {
		/* taking credits from pool is done one quanta at a time, and
		 * credits may be spent (counted in p->inflights) or still
		 * available in the port (p->inflight_credits). We must return
		 * the sum so that no credits are leaked.
		 */
		int possible_inflights = p->inflight_credits + p->inflights;
		rte_atomic32_sub(&sw->inflights, possible_inflights);
	}

	*p = (struct sw_port){0}; /* zero entire structure */
	p->id = port_id;
	p->sw = sw;

	snprintf(buf, sizeof(buf), "sw%d_%s", dev->data->dev_id,
			"rx_worker_ring");
	p->rx_worker_ring = qe_ring_create(buf, MAX_SW_PROD_Q_DEPTH,
			dev->data->socket_id);
	if (p->rx_worker_ring == NULL) {
		SW_LOG_ERR("Error creating RX worker ring for port %d\n",
				port_id);
		return -1;
	}

	p->inflight_max = conf->new_event_threshold;

	snprintf(buf, sizeof(buf), "sw%d_%s", dev->data->dev_id,
			"cq_worker_ring");
	p->cq_worker_ring = qe_ring_create(buf, conf->dequeue_depth,
			dev->data->socket_id);
	if (p->cq_worker_ring == NULL) {
		qe_ring_destroy(p->rx_worker_ring);
		SW_LOG_ERR("Error creating CQ worker ring for port %d\n",
				port_id);
		return -1;
	}
	sw->cq_ring_space[port_id] = conf->dequeue_depth;

	/* set hist list contents to empty */
	for (i = 0; i < SW_PORT_HIST_LIST; i++) {
		p->hist_list[i].fid = -1;
		p->hist_list[i].qid = -1;
	}

	dev->data->ports[port_id] = p;
	rte_smp_wmb();
	p->initialized = 1;

	return 0;
}
static void
sw_port_release(void *port)
{
	struct sw_port *p = (void *)port;

	if (p == NULL)
		return;

	qe_ring_destroy(p->rx_worker_ring);
	qe_ring_destroy(p->cq_worker_ring);
	memset(p, 0, sizeof(*p));
}
static int32_t
qid_init(struct sw_evdev *sw, unsigned int idx, int type,
		const struct rte_event_queue_conf *queue_conf)
@@ -319,6 +398,8 @@ sw_probe(const char *name, const char *params)
			.queue_setup = sw_queue_setup,
			.queue_release = sw_queue_release,
			.port_def_conf = sw_port_def_conf,
			.port_setup = sw_port_setup,
			.port_release = sw_port_release,
	};

	static const char *const args[] = {

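For context, a minimal sketch of how an application reaches sw_port_setup()
through the public eventdev API; the device id, port id and limit values
below are illustrative assumptions, not part of this patch:

#include <rte_eventdev.h>

static int
setup_one_port(uint8_t dev_id, uint8_t port_id)
{
	struct rte_event_port_conf conf;

	/* start from the PMD defaults (sw_port_def_conf in this driver) */
	int ret = rte_event_port_default_conf_get(dev_id, port_id, &conf);
	if (ret < 0)
		return ret;

	conf.new_event_threshold = 1024; /* becomes p->inflight_max */
	conf.dequeue_depth = 32;         /* sizes the cq_worker_ring */

	/* invokes the PMD's port_setup op, i.e. sw_port_setup() above */
	return rte_event_port_setup(dev_id, port_id, &conf);
}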
drivers/event/sw/sw_evdev.h

@@ -49,6 +49,13 @@
#define MAX_SW_PROD_Q_DEPTH 4096
#define SW_FRAGMENTS_MAX 16
/* report dequeue burst sizes in buckets */
#define SW_DEQ_STAT_BUCKET_SHIFT 2
/* how many packets pulled from port by sched */
#define SCHED_DEQUEUE_BURST_SIZE 32
#define SW_PORT_HIST_LIST (MAX_SW_PROD_Q_DEPTH) /* size of our history list */
#define EVENTDEV_NAME_SW_PMD event_sw
#define SW_PMD_NAME RTE_STR(event_sw)
@@ -129,12 +136,82 @@ struct sw_qid {
	uint8_t priority;
};

struct sw_hist_list_entry {
	int32_t qid;
	int32_t fid;
	struct reorder_buffer_entry *rob_entry;
};
struct sw_evdev;
struct sw_port {
	/* new enqueue / dequeue API doesn't have an instance pointer, only the
	 * pointer to the port being enqueued/dequeued from
	 */
	struct sw_evdev *sw;

	/* set when the port is initialized */
	uint8_t initialized;
	/* A numeric ID for the port */
	uint8_t id;

	int16_t is_directed; /**< Takes from a single directed QID */
	/**
	 * For load-balanced ports we can optimise pulling packets from
	 * producers if there is no reordering involved
	 */
	int16_t num_ordered_qids;

	/** Ring and buffer for pulling events from workers for scheduling */
	struct qe_ring *rx_worker_ring __rte_cache_aligned;
	/** Ring and buffer for pushing packets to workers after scheduling */
	struct qe_ring *cq_worker_ring;

	/* hole */

	/* num releases yet to be completed on this port */
	uint16_t outstanding_releases __rte_cache_aligned;
	uint16_t inflight_max; /* app requested max inflights for this port */
	uint16_t inflight_credits; /* num credits this port has right now */

	uint16_t last_dequeue_burst_sz; /* how big the burst was */
	uint64_t last_dequeue_ticks; /* used to track burst processing time */
	uint64_t avg_pkt_ticks;      /* tracks average over NUM_SAMPLES bursts */
	uint64_t total_polls;        /* how many polls were counted in stats */
	uint64_t zero_polls;         /* tracks polls returning nothing */
	uint32_t poll_buckets[MAX_SW_CONS_Q_DEPTH >> SW_DEQ_STAT_BUCKET_SHIFT];
		/* bucket values in 4s for shorter reporting */

	/* History list structs, containing info on pkts egressed to worker */
	uint16_t hist_head __rte_cache_aligned;
	uint16_t hist_tail;
	uint16_t inflights;
	struct sw_hist_list_entry hist_list[SW_PORT_HIST_LIST];

	/* track packets in and out of this port */
	struct sw_point_stats stats;

	uint32_t pp_buf_start;
	uint32_t pp_buf_count;
	uint16_t cq_buf_count;
	struct rte_event pp_buf[SCHED_DEQUEUE_BURST_SIZE];
	struct rte_event cq_buf[MAX_SW_CONS_Q_DEPTH];

	uint8_t num_qids_mapped;
};
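/*
 * Credit accounting, as used by the re-configure path in sw_port_setup():
 * a port's share of the device-wide inflight pool is split between credits
 * already spent on events (inflights, above) and credits fetched from the
 * pool but not yet used (inflight_credits); both parts must be returned to
 * sw->inflights when the port is reconfigured.
 */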
struct sw_evdev {
	struct rte_eventdev_data *data;

	uint32_t port_count;
	uint32_t qid_count;

	/* Contains all ports - load balanced and directed */
	struct sw_port ports[SW_PORTS_MAX] __rte_cache_aligned;

	rte_atomic32_t inflights __rte_cache_aligned;

	/*
	 * max events in this instance. Cached here for performance.
	 * (also available in data->conf.nb_events_limit)
@@ -144,6 +221,9 @@ struct sw_evdev {
	/* Internal queues - one per logical queue */
	struct sw_qid qids[RTE_EVENT_MAX_QUEUES_PER_DEV] __rte_cache_aligned;

	/* Cache how many packets are in each cq */
	uint16_t cq_ring_space[SW_PORTS_MAX] __rte_cache_aligned;

	int32_t sched_quanta;
	uint32_t credit_update_quanta;