ring: introduce peek style API

For rings with producer/consumer in RTE_RING_SYNC_ST or RTE_RING_SYNC_MT_HTS
mode, provide the ability to split an enqueue/dequeue operation
into two phases:
      - enqueue/dequeue start
      - enqueue/dequeue finish
This allows the user to inspect objects in the ring without removing
them from it (aka MT safe peek).

Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
Acked-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Konstantin Ananyev 2020-04-20 13:28:28 +01:00 committed by David Marchand
parent 1cf65e4341
commit 664ff4b172
8 changed files with 614 additions and 1 deletion

@@ -405,6 +405,45 @@ update and helps to improve ring enqueue/dequeue behavior in overcommitted
scenarios. Another advantage of the fully serialized producer/consumer is
that it provides the ability to implement an MT safe peek API for rte_ring.
Ring Peek API
-------------

For rings with a serialized producer/consumer (HTS sync mode) it is possible
to split the public enqueue/dequeue API into two phases:

* enqueue/dequeue start
* enqueue/dequeue finish

This allows the user to inspect objects in the ring without removing them
from it (aka MT safe peek) and to reserve space for the objects in the ring
before the actual enqueue.
Note that this API is available only for two sync modes:

* Single Producer/Single Consumer (SP/SC)
* Multi-producer/Multi-consumer with Head/Tail Sync (HTS)

It is the user's responsibility to create/init the ring with the appropriate
sync modes selected; the enqueue sketch below also shows ring creation.
As an example of usage:

.. code-block:: c

    /* read 1 elem from the ring: */
    void *obj;
    uint32_t n = rte_ring_dequeue_bulk_start(ring, &obj, 1, NULL);
    if (n != 0) {
        /* examine object */
        if (object_examine(obj) == KEEP)
            /* decided to keep it in the ring. */
            rte_ring_dequeue_finish(ring, 0);
        else
            /* decided to remove it from the ring. */
            rte_ring_dequeue_finish(ring, n);
    }

Note that between ``_start_`` and ``_finish_`` no other thread can proceed
with an enqueue (or dequeue) operation until ``_finish_`` completes.
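
The enqueue side follows the same pattern: space in the ring is reserved
first, and the objects become visible to consumers only once the finish
step completes. Below is a minimal sketch of this, assuming an
application-provided ``fill_objects()`` helper and with error handling
omitted:

.. code-block:: c

    /* create a ring with HTS sync mode for both producer and consumer */
    struct rte_ring *ring = rte_ring_create("peek_ring", 1024,
            SOCKET_ID_ANY, RING_F_MP_HTS_ENQ | RING_F_MC_HTS_DEQ);

    void *objs[32];
    unsigned int n, free_space;

    /* reserve space for up to 32 objects */
    n = rte_ring_enqueue_burst_start(ring, 32, &free_space);
    if (n != 0) {
        /* fill the objects while the space is reserved */
        fill_objects(objs, n);
        /* copy the objects into the ring and make them visible */
        rte_ring_enqueue_finish(ring, objs, n);
    }
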
References
----------

@@ -56,13 +56,22 @@ New Features
Also, make sure to start the actual text at the margin.
=========================================================
* **New synchronization modes for rte_ring.**
* **Added new API for rte_ring.**
* New synchronization modes for rte_ring.
Introduced new optional MT synchronization modes for rte_ring:
Relaxed Tail Sync (RTS) mode and Head/Tail Sync (HTS) mode.
With these modes selected, rte_ring shows significant improvements for
average enqueue/dequeue times on overcommitted systems.
* Added peek style API for rte_ring.
For rings with producer/consumer in RTE_RING_SYNC_ST or RTE_RING_SYNC_MT_HTS
mode, provide the ability to split an enqueue/dequeue operation into two
phases (enqueue/dequeue start; enqueue/dequeue finish). This allows the user
to inspect objects in the ring without removing them from it
(aka MT safe peek).
* **Updated Mellanox mlx5 driver.**
Updated Mellanox mlx5 driver with new features and improvements, including:

@@ -22,6 +22,8 @@ SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
rte_ring_c11_mem.h \
rte_ring_hts.h \
rte_ring_hts_c11_mem.h \
rte_ring_peek.h \
rte_ring_peek_c11_mem.h \
rte_ring_rts.h \
rte_ring_rts_c11_mem.h

@@ -9,5 +9,7 @@ headers = files('rte_ring.h',
'rte_ring_generic.h',
'rte_ring_hts.h',
'rte_ring_hts_c11_mem.h',
'rte_ring_peek.h',
'rte_ring_peek_c11_mem.h',
'rte_ring_rts.h',
'rte_ring_rts_c11_mem.h')

@@ -25,6 +25,9 @@
* - Multi- or single-producer enqueue.
* - Bulk dequeue.
* - Bulk enqueue.
* - Ability to select different sync modes for producer/consumer.
* - Dequeue start/finish (depending on consumer sync mode).
* - Enqueue start/finish (depending on producer sync mode).
*
* Note: the ring implementation is not preemptible. Refer to Programmer's
* guide/Environment Abstraction Layer/Multiple pthread/Known Issues/rte_ring

@@ -1089,6 +1089,10 @@ rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
return 0;
}
#ifdef ALLOW_EXPERIMENTAL_API
#include <rte_ring_peek.h>
#endif
#include <rte_ring.h>
#ifdef __cplusplus

@@ -0,0 +1,444 @@
/* SPDX-License-Identifier: BSD-3-Clause
*
* Copyright (c) 2010-2020 Intel Corporation
* Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
* All rights reserved.
* Derived from FreeBSD's bufring.h
* Used as BSD-3 Licensed with permission from Kip Macy.
*/
#ifndef _RTE_RING_PEEK_H_
#define _RTE_RING_PEEK_H_
/**
* @file
* @b EXPERIMENTAL: this API may change without prior notice
* It is not recommended to include this file directly.
* Please include <rte_ring_elem.h> instead.
*
* Ring Peek API
* Introduction of rte_ring with serialized producer/consumer (HTS sync mode)
* makes it possible to split the public enqueue/dequeue API into two phases:
* - enqueue/dequeue start
* - enqueue/dequeue finish
* This allows the user to inspect objects in the ring without removing them
* from it (aka MT safe peek).
* Note that right now this new API is available only for two sync modes:
* 1) Single Producer/Single Consumer (RTE_RING_SYNC_ST)
* 2) Serialized Producer/Serialized Consumer (RTE_RING_SYNC_MT_HTS).
* It is the user's responsibility to create/init the ring with the
* appropriate sync modes selected.
* As an example:
* // read 1 elem from the ring:
* n = rte_ring_dequeue_bulk_start(ring, &obj, 1, NULL);
* if (n != 0) {
*	//examine object
*	if (object_examine(obj) == KEEP)
*		//decided to keep it in the ring.
*		rte_ring_dequeue_finish(ring, 0);
*	else
*		//decided to remove it from the ring.
*		rte_ring_dequeue_finish(ring, n);
* }
* Note that between _start_ and _finish_ no other thread can proceed
* with an enqueue (or dequeue) operation until _finish_ completes.
*/
#ifdef __cplusplus
extern "C" {
#endif
#include <rte_ring_peek_c11_mem.h>
/**
* @internal This function moves prod head value.
*/
static __rte_always_inline unsigned int
__rte_ring_do_enqueue_start(struct rte_ring *r, uint32_t n,
enum rte_ring_queue_behavior behavior, uint32_t *free_space)
{
uint32_t free, head, next;
switch (r->prod.sync_type) {
case RTE_RING_SYNC_ST:
n = __rte_ring_move_prod_head(r, RTE_RING_SYNC_ST, n,
behavior, &head, &next, &free);
break;
case RTE_RING_SYNC_MT_HTS:
n = __rte_ring_hts_move_prod_head(r, n, behavior,
&head, &free);
break;
default:
/* unsupported mode, shouldn't be here */
RTE_ASSERT(0);
n = 0;
}
if (free_space != NULL)
*free_space = free - n;
return n;
}
/**
* Start to enqueue several objects on the ring.
* Note that no actual objects are put in the queue by this function,
* it just reserves space for them.
* The user has to call the appropriate enqueue_elem_finish() to copy the
* objects into the queue and complete the enqueue operation.
*
* @param r
* A pointer to the ring structure.
* @param n
* The number of objects to reserve enqueue space for.
* @param free_space
* If non-NULL, returns the amount of space in the ring after the
* enqueue operation has finished.
* @return
* The number of objects that can be enqueued, either 0 or n.
*/
__rte_experimental
static __rte_always_inline unsigned int
rte_ring_enqueue_bulk_elem_start(struct rte_ring *r, unsigned int n,
unsigned int *free_space)
{
return __rte_ring_do_enqueue_start(r, n, RTE_RING_QUEUE_FIXED,
free_space);
}
/**
* Start to enqueue several objects on the ring.
* Note that no actual objects are put in the queue by this function,
* it just reserves space for them.
* The user has to call the appropriate enqueue_finish() to copy the
* objects into the queue and complete the enqueue operation.
*
* @param r
* A pointer to the ring structure.
* @param n
* The number of objects to reserve enqueue space for.
* @param free_space
* If non-NULL, returns the amount of space in the ring after the
* enqueue operation has finished.
* @return
* The number of objects that can be enqueued, either 0 or n.
*/
__rte_experimental
static __rte_always_inline unsigned int
rte_ring_enqueue_bulk_start(struct rte_ring *r, unsigned int n,
unsigned int *free_space)
{
return rte_ring_enqueue_bulk_elem_start(r, n, free_space);
}
/**
* Start to enqueue several objects on the ring.
* Note that no actual objects are put in the queue by this function,
* it just reserves space for them.
* The user has to call the appropriate enqueue_elem_finish() to copy the
* objects into the queue and complete the enqueue operation.
*
* @param r
* A pointer to the ring structure.
* @param n
* The number of objects to reserve enqueue space for.
* @param free_space
* If non-NULL, returns the amount of space in the ring after the
* enqueue operation has finished.
* @return
* The actual number of objects that can be enqueued.
*/
__rte_experimental
static __rte_always_inline unsigned int
rte_ring_enqueue_burst_elem_start(struct rte_ring *r, unsigned int n,
unsigned int *free_space)
{
return __rte_ring_do_enqueue_start(r, n, RTE_RING_QUEUE_VARIABLE,
free_space);
}
/**
* Start to enqueue several objects on the ring.
* Note that no actual objects are put in the queue by this function,
* it just reserves space for them.
* The user has to call the appropriate enqueue_finish() to copy the
* objects into the queue and complete the enqueue operation.
*
* @param r
* A pointer to the ring structure.
* @param n
* The number of objects to reserve enqueue space for.
* @param free_space
* If non-NULL, returns the amount of space in the ring after the
* enqueue operation has finished.
* @return
* The actual number of objects that can be enqueued.
*/
__rte_experimental
static __rte_always_inline unsigned int
rte_ring_enqueue_burst_start(struct rte_ring *r, unsigned int n,
unsigned int *free_space)
{
return rte_ring_enqueue_burst_elem_start(r, n, free_space);
}
/**
* Complete enqueuing several objects on the ring.
* Note that the number of objects to enqueue must not exceed the value
* returned by the previous enqueue_start call.
*
* @param r
* A pointer to the ring structure.
* @param obj_table
* A pointer to a table of objects.
* @param esize
* The size of ring element, in bytes. It must be a multiple of 4.
* This must be the same value used while creating the ring. Otherwise
* the results are undefined.
* @param n
* The number of objects to add to the ring from the obj_table.
*/
__rte_experimental
static __rte_always_inline void
rte_ring_enqueue_elem_finish(struct rte_ring *r, const void *obj_table,
unsigned int esize, unsigned int n)
{
uint32_t tail;
switch (r->prod.sync_type) {
case RTE_RING_SYNC_ST:
n = __rte_ring_st_get_tail(&r->prod, &tail, n);
if (n != 0)
__rte_ring_enqueue_elems(r, tail, obj_table, esize, n);
__rte_ring_st_set_head_tail(&r->prod, tail, n, 1);
break;
case RTE_RING_SYNC_MT_HTS:
n = __rte_ring_hts_get_tail(&r->hts_prod, &tail, n);
if (n != 0)
__rte_ring_enqueue_elems(r, tail, obj_table, esize, n);
__rte_ring_hts_set_head_tail(&r->hts_prod, tail, n, 1);
break;
default:
/* unsupported mode, shouldn't be here */
RTE_ASSERT(0);
}
}
/**
* Complete enqueuing several objects on the ring.
* Note that the number of objects to enqueue must not exceed the value
* returned by the previous enqueue_start call.
*
* @param r
* A pointer to the ring structure.
* @param obj_table
* A pointer to a table of objects.
* @param n
* The number of objects to add to the ring from the obj_table.
*/
__rte_experimental
static __rte_always_inline void
rte_ring_enqueue_finish(struct rte_ring *r, void * const *obj_table,
unsigned int n)
{
rte_ring_enqueue_elem_finish(r, obj_table, sizeof(uintptr_t), n);
}
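/*
 * A reservation can also be completed with a smaller count than was
 * returned by the start call: the head is rolled back and the unused
 * slots are released. A minimal illustrative sketch, assuming an
 * application-level produce_objects() helper (not part of this API):
 *
 *	n = rte_ring_enqueue_bulk_start(ring, 16, NULL);
 *	if (n != 0) {
 *		if (produce_objects(objs, n) == 0)
 *			// all objects ready, publish them
 *			rte_ring_enqueue_finish(ring, objs, n);
 *		else
 *			// production failed, release the reserved space
 *			rte_ring_enqueue_finish(ring, objs, 0);
 *	}
 */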
/**
* @internal This function moves cons head value and copies up to *n*
* objects from the ring to the user provided obj_table.
*/
static __rte_always_inline unsigned int
__rte_ring_do_dequeue_start(struct rte_ring *r, void *obj_table,
uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
uint32_t *available)
{
uint32_t avail, head, next;
switch (r->cons.sync_type) {
case RTE_RING_SYNC_ST:
n = __rte_ring_move_cons_head(r, RTE_RING_SYNC_ST, n,
behavior, &head, &next, &avail);
break;
case RTE_RING_SYNC_MT_HTS:
n = __rte_ring_hts_move_cons_head(r, n, behavior,
&head, &avail);
break;
default:
/* unsupported mode, shouldn't be here */
RTE_ASSERT(0);
n = 0;
}
if (n != 0)
__rte_ring_dequeue_elems(r, head, obj_table, esize, n);
if (available != NULL)
*available = avail - n;
return n;
}
/**
* Start to dequeue several objects from the ring.
* Note that the user has to call the appropriate dequeue_finish()
* to complete the dequeue operation and actually remove the objects from
* the ring.
*
* @param r
* A pointer to the ring structure.
* @param obj_table
* A pointer to a table of objects that will be filled.
* @param esize
* The size of ring element, in bytes. It must be a multiple of 4.
* This must be the same value used while creating the ring. Otherwise
* the results are undefined.
* @param n
* The number of objects to dequeue from the ring to the obj_table.
* @param available
* If non-NULL, returns the number of remaining ring entries after the
* dequeue has finished.
* @return
* The number of objects dequeued, either 0 or n.
*/
__rte_experimental
static __rte_always_inline unsigned int
rte_ring_dequeue_bulk_elem_start(struct rte_ring *r, void *obj_table,
unsigned int esize, unsigned int n, unsigned int *available)
{
return __rte_ring_do_dequeue_start(r, obj_table, esize, n,
RTE_RING_QUEUE_FIXED, available);
}
/**
* Start to dequeue several objects from the ring.
* Note that the user has to call the appropriate dequeue_finish()
* to complete the dequeue operation and actually remove the objects from
* the ring.
*
* @param r
* A pointer to the ring structure.
* @param obj_table
* A pointer to a table of void * pointers (objects) that will be filled.
* @param n
* The number of objects to dequeue from the ring to the obj_table.
* @param available
* If non-NULL, returns the number of remaining ring entries after the
* dequeue has finished.
* @return
* Actual number of objects dequeued.
*/
__rte_experimental
static __rte_always_inline unsigned int
rte_ring_dequeue_bulk_start(struct rte_ring *r, void **obj_table,
unsigned int n, unsigned int *available)
{
return rte_ring_dequeue_bulk_elem_start(r, obj_table, sizeof(uintptr_t),
n, available);
}
/**
* Start to dequeue several objects from the ring.
* Note that the user has to call the appropriate dequeue_finish()
* to complete the dequeue operation and actually remove the objects from
* the ring.
*
* @param r
* A pointer to the ring structure.
* @param obj_table
* A pointer to a table of objects that will be filled.
* @param esize
* The size of ring element, in bytes. It must be a multiple of 4.
* This must be the same value used while creating the ring. Otherwise
* the results are undefined.
* @param n
* The number of objects to dequeue from the ring to the obj_table.
* @param available
* If non-NULL, returns the number of remaining ring entries after the
* dequeue has finished.
* @return
* The actual number of objects dequeued.
*/
__rte_experimental
static __rte_always_inline unsigned int
rte_ring_dequeue_burst_elem_start(struct rte_ring *r, void *obj_table,
unsigned int esize, unsigned int n, unsigned int *available)
{
return __rte_ring_do_dequeue_start(r, obj_table, esize, n,
RTE_RING_QUEUE_VARIABLE, available);
}
/**
* Start to dequeue several objects from the ring.
* Note that the user has to call the appropriate dequeue_finish()
* to complete the dequeue operation and actually remove the objects from
* the ring.
*
* @param r
* A pointer to the ring structure.
* @param obj_table
* A pointer to a table of void * pointers (objects) that will be filled.
* @param n
* The number of objects to dequeue from the ring to the obj_table.
* @param available
* If non-NULL, returns the number of remaining ring entries after the
* dequeue has finished.
* @return
* The actual number of objects dequeued.
*/
__rte_experimental
static __rte_always_inline unsigned int
rte_ring_dequeue_burst_start(struct rte_ring *r, void **obj_table,
unsigned int n, unsigned int *available)
{
return rte_ring_dequeue_burst_elem_start(r, obj_table,
sizeof(uintptr_t), n, available);
}
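/*
 * Illustrative sketch of a burst peek, assuming an application-level
 * count_valid() helper: look at up to 32 objects, then remove only the
 * leading valid ones and leave the rest in the ring.
 *
 *	n = rte_ring_dequeue_burst_start(ring, objs, 32, NULL);
 *	if (n != 0) {
 *		m = count_valid(objs, n);
 *		// remove only the first m objects, keep the remainder
 *		rte_ring_dequeue_finish(ring, m);
 *	}
 */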
/**
* Complete dequeuing several objects from the ring.
* Note that the number of objects to dequeue must not exceed the value
* returned by the previous dequeue_start call.
*
* @param r
* A pointer to the ring structure.
* @param n
* The number of objects to remove from the ring.
*/
__rte_experimental
static __rte_always_inline void
rte_ring_dequeue_elem_finish(struct rte_ring *r, unsigned int n)
{
uint32_t tail;
switch (r->cons.sync_type) {
case RTE_RING_SYNC_ST:
n = __rte_ring_st_get_tail(&r->cons, &tail, n);
__rte_ring_st_set_head_tail(&r->cons, tail, n, 0);
break;
case RTE_RING_SYNC_MT_HTS:
n = __rte_ring_hts_get_tail(&r->hts_cons, &tail, n);
__rte_ring_hts_set_head_tail(&r->hts_cons, tail, n, 0);
break;
default:
/* unsupported mode, shouldn't be here */
RTE_ASSERT(0);
}
}
/**
* Complete dequeuing several objects from the ring.
* Note that the number of objects to dequeue must not exceed the value
* returned by the previous dequeue_start call.
*
* @param r
* A pointer to the ring structure.
* @param n
* The number of objects to remove from the ring.
*/
__rte_experimental
static __rte_always_inline void
rte_ring_dequeue_finish(struct rte_ring *r, unsigned int n)
{
rte_ring_dequeue_elem_finish(r, n);
}
#ifdef __cplusplus
}
#endif
#endif /* _RTE_RING_PEEK_H_ */

@@ -0,0 +1,110 @@
/* SPDX-License-Identifier: BSD-3-Clause
*
* Copyright (c) 2010-2020 Intel Corporation
* Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
* All rights reserved.
* Derived from FreeBSD's bufring.h
* Used as BSD-3 Licensed with permission from Kip Macy.
*/
#ifndef _RTE_RING_PEEK_C11_MEM_H_
#define _RTE_RING_PEEK_C11_MEM_H_
/**
* @file rte_ring_peek_c11_mem.h
* It is not recommended to include this file directly,
* include <rte_ring.h> instead.
* Contains internal helper functions for rte_ring peek API.
* For more information please refer to <rte_ring_peek.h>.
*/
/**
* @internal get current tail value.
* This function should be used only for single thread producer/consumer.
* It checks that the user didn't request to move the tail above the head.
* In that situation:
* - return zero, which will abort any pending changes and
* return the head to its previous position.
* - throw an assert in debug mode.
*/
static __rte_always_inline uint32_t
__rte_ring_st_get_tail(struct rte_ring_headtail *ht, uint32_t *tail,
uint32_t num)
{
uint32_t h, n, t;
h = ht->head;
t = ht->tail;
n = h - t;
RTE_ASSERT(n >= num);
num = (n >= num) ? num : 0;
/* return the old tail, i.e. the start of the reserved area */
*tail = t;
return num;
}
/**
* @internal set new values for head and tail.
* This function should be used only for single thread producer/consumer.
* Should be used only in conjunction with __rte_ring_st_get_tail.
*/
static __rte_always_inline void
__rte_ring_st_set_head_tail(struct rte_ring_headtail *ht, uint32_t tail,
uint32_t num, uint32_t enqueue)
{
uint32_t pos;
RTE_SET_USED(enqueue);
pos = tail + num;
ht->head = pos;
__atomic_store_n(&ht->tail, pos, __ATOMIC_RELEASE);
}
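/*
 * Worked example of the get_tail/set_head_tail pair above: with
 * tail == 100 and 4 slots reserved (head == 104), a finish with
 * num == 1 stores head = tail = 101, releasing the remaining 3
 * reserved slots back to the ring.
 */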
/**
* @internal get current tail value.
* This function should be used only for producer/consumer in MT_HTS mode.
* It checks that the user didn't request to move the tail above the head.
* In that situation:
* - return zero, which will abort any pending changes and
* return the head to its previous position.
* - throw an assert in debug mode.
*/
static __rte_always_inline uint32_t
__rte_ring_hts_get_tail(struct rte_ring_hts_headtail *ht, uint32_t *tail,
uint32_t num)
{
uint32_t n;
union __rte_ring_hts_pos p;
p.raw = __atomic_load_n(&ht->ht.raw, __ATOMIC_RELAXED);
n = p.pos.head - p.pos.tail;
RTE_ASSERT(n >= num);
num = (n >= num) ? num : 0;
*tail = p.pos.tail;
return num;
}
/**
* @internal set new values for head and tail as one atomic 64 bit operation.
* This function should be used only for producer/consumer in MT_HTS mode.
* Should be used only in conjunction with __rte_ring_hts_get_tail.
*/
static __rte_always_inline void
__rte_ring_hts_set_head_tail(struct rte_ring_hts_headtail *ht, uint32_t tail,
uint32_t num, uint32_t enqueue)
{
union __rte_ring_hts_pos p;
RTE_SET_USED(enqueue);
p.pos.head = tail + num;
p.pos.tail = p.pos.head;
__atomic_store_n(&ht->ht.raw, p.raw, __ATOMIC_RELEASE);
}
#endif /* _RTE_RING_PEEK_C11_MEM_H_ */