85cffb2ecc
In weak memory models, such as the arm64 model, the read of prod.tail
may be reordered after the reads of the ring slots, which corrupts the
ring and causes stale data to be observed.
This issue was reported by NXP on an 8-A72 DPAA2 board. The problem is
most likely caused by the missing acquire semantics when reading
prod.tail (in the SC dequeue), which makes it possible to read stale
values from the ring slots.
For the MP (and MC) case, rte_atomic32_cmpset() already provides the
required ordering. For the SP case, the control dependency between the
if-statement (which depends on the read of r->cons.tail) and the later
stores to the ring slots makes the RMB unnecessary. For more on control
dependencies, see:
https://www.cl.cam.ac.uk/~pes20/ppc-supplemental/test7.pdf
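
As a rough illustration of that control dependency, here is a minimal
sketch of the SP enqueue shape (the names are made up for illustration,
and this is a hardware-level ordering argument as the paper above
explains, not something the C standard by itself guarantees):

#include <stdint.h>

/* Hypothetical single-producer enqueue fragment. The store to slots[]
 * sits inside an if whose condition depends on the load of *cons_tail,
 * so the CPU cannot make the store visible before that load resolves:
 * a load->store control dependency, with no explicit barrier needed.
 */
static int
sp_enqueue_one(uint32_t capacity, const volatile uint32_t *cons_tail,
		uint32_t prod_head, uint32_t *slots, uint32_t mask,
		uint32_t obj)
{
	uint32_t free_entries = capacity + *cons_tail - prod_head;

	if (free_entries == 0)		/* branch depends on the load */
		return 0;

	slots[prod_head & mask] = obj;	/* store ordered after the load */
	return 1;
}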
This patch adds the required read barrier to prevent the reads of the
ring slots from being reordered before the read of prod.tail in the SC
case.
Fixes: c9fb3c6289 ("ring: move code in a new header file")
Cc: stable@dpdk.org
Signed-off-by: Gavin Hu <gavin.hu@arm.com>
Reviewed-by: Ola Liljedahl <ola.liljedahl@arm.com>
Tested-by: Nipun Gupta <nipun.gupta@nxp.com>
Acked-by: Nipun Gupta <nipun.gupta@nxp.com>
Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
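
For readers less familiar with DPDK's barrier macros, the same ordering
requirement can be expressed with C11 atomics. This is a minimal,
self-contained sketch (toy_ring and the toy_* functions are
illustrative, not DPDK API); the acquire load of prod_tail plays the
role of the rte_smp_rmb() added by this patch:

#include <stdatomic.h>
#include <stdint.h>

struct toy_ring {
	uint32_t slots[4];		/* ring storage, power-of-two size */
	_Atomic uint32_t prod_tail;	/* producer publishes up to here */
};

/* Producer: write the slot, then publish it with release semantics. */
static void
toy_enqueue(struct toy_ring *r, uint32_t head, uint32_t val)
{
	r->slots[head & 3] = val;
	atomic_store_explicit(&r->prod_tail, head + 1, memory_order_release);
}

/* Single consumer: without the acquire on the tail read, a weakly
 * ordered CPU such as arm64 may hoist the slot read above it and
 * observe stale data.
 */
static int
toy_dequeue(struct toy_ring *r, uint32_t head, uint32_t *val)
{
	uint32_t tail = atomic_load_explicit(&r->prod_tail,
			memory_order_acquire);

	if (tail == head)
		return 0;		/* ring empty */
	*val = r->slots[head & 3];	/* ordered after the tail read */
	return 1;
}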
/* SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright (c) 2010-2017 Intel Corporation
 * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
 * All rights reserved.
 * Derived from FreeBSD's bufring.h
 * Used as BSD-3 Licensed with permission from Kip Macy.
 */

#ifndef _RTE_RING_GENERIC_H_
#define _RTE_RING_GENERIC_H_

static __rte_always_inline void
update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
		uint32_t single, uint32_t enqueue)
{
	/*
	 * On enqueue, the write barrier makes the slot writes visible
	 * before the tail is published; on dequeue, the read barrier makes
	 * sure the slot reads complete before the tail moves and the slots
	 * can be reused.
	 */
	if (enqueue)
		rte_smp_wmb();
	else
		rte_smp_rmb();
	/*
	 * If there are other enqueues/dequeues in progress that preceded us,
	 * we need to wait for them to complete
	 */
	if (!single)
		while (unlikely(ht->tail != old_val))
			rte_pause();

	ht->tail = new_val;
}

/**
 * @internal This function updates the producer head for enqueue
 *
 * @param r
 *   A pointer to the ring structure
 * @param is_sp
 *   Indicates whether the multi-producer path is needed or not
 * @param n
 *   The number of elements we will want to enqueue, i.e. how far should the
 *   head be moved
 * @param behavior
 *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to a ring
 *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
 * @param old_head
 *   Returns head value as it was before the move, i.e. where enqueue starts
 * @param new_head
 *   Returns the current/new head value, i.e. where enqueue finishes
 * @param free_entries
 *   Returns the amount of free space in the ring BEFORE the head was moved
 * @return
 *   Actual number of objects enqueued.
 *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
 */
static __rte_always_inline unsigned int
__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
		unsigned int n, enum rte_ring_queue_behavior behavior,
		uint32_t *old_head, uint32_t *new_head,
		uint32_t *free_entries)
{
	const uint32_t capacity = r->capacity;
	unsigned int max = n;
	int success;

	do {
		/* Reset n to the initial burst count */
		n = max;

		*old_head = r->prod.head;

		/* Add an rmb barrier to avoid load/load reordering in the
		 * weak memory model. It is a no-op on x86.
		 */
		rte_smp_rmb();

		/*
		 * The subtraction is done between two unsigned 32-bit values
		 * (the result is always modulo 32 bits even if we have
		 * *old_head > cons_tail). So 'free_entries' is always between
		 * 0 and capacity (which is < size).
		 */
		*free_entries = (capacity + r->cons.tail - *old_head);

		/* check that we have enough room in the ring */
		if (unlikely(n > *free_entries))
			n = (behavior == RTE_RING_QUEUE_FIXED) ?
					0 : *free_entries;

		if (n == 0)
			return 0;

		*new_head = *old_head + n;
		if (is_sp)
			r->prod.head = *new_head, success = 1;
		else
			success = rte_atomic32_cmpset(&r->prod.head,
					*old_head, *new_head);
	} while (unlikely(success == 0));
	return n;
}

/**
 * @internal This function updates the consumer head for dequeue
 *
 * @param r
 *   A pointer to the ring structure
 * @param is_sc
 *   Indicates whether the multi-consumer path is needed or not
 * @param n
 *   The number of elements we will want to dequeue, i.e. how far should the
 *   head be moved
 * @param behavior
 *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
 *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
 * @param old_head
 *   Returns head value as it was before the move, i.e. where dequeue starts
 * @param new_head
 *   Returns the current/new head value, i.e. where dequeue finishes
 * @param entries
 *   Returns the number of entries in the ring BEFORE the head was moved
 * @return
 *   - Actual number of objects dequeued.
 *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
 */
static __rte_always_inline unsigned int
__rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
		unsigned int n, enum rte_ring_queue_behavior behavior,
		uint32_t *old_head, uint32_t *new_head,
		uint32_t *entries)
{
	unsigned int max = n;
	int success;

	/* move cons.head atomically */
	do {
		/* Restore n as it may change every loop */
		n = max;

		*old_head = r->cons.head;

		/* Add an rmb barrier to avoid load/load reordering in the
		 * weak memory model. It is a no-op on x86.
		 */
		rte_smp_rmb();

		/* The subtraction is done between two unsigned 32-bit values
		 * (the result is always modulo 32 bits even if we have
		 * cons_head > prod_tail). So 'entries' is always between 0
		 * and size(ring)-1.
		 */
		*entries = (r->prod.tail - *old_head);

		/* Set the actual entries for dequeue */
		if (n > *entries)
			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;

		if (unlikely(n == 0))
			return 0;

		*new_head = *old_head + n;
		if (is_sc) {
			r->cons.head = *new_head;
			/* Prevent the reads of the ring slots (done after
			 * this function returns) from being hoisted above
			 * the read of prod.tail on weakly ordered CPUs.
			 */
			rte_smp_rmb();
			success = 1;
		} else {
			success = rte_atomic32_cmpset(&r->cons.head, *old_head,
					*new_head);
		}
	} while (unlikely(success == 0));
	return n;
}

#endif /* _RTE_RING_GENERIC_H_ */
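
For context, here is a simplified sketch of how the dequeue path in
rte_ring.h composes these helpers (paraphrased, not the verbatim DPDK
source; the slot-copy loop is abbreviated as DEQUEUE_PTRS). It shows
that the rmb added above executes between the read of prod.tail and the
reads of the ring slots:

/* Sketch: move the head, read the slots, then publish the new tail. */
static __rte_always_inline unsigned int
__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
		unsigned int n, enum rte_ring_queue_behavior behavior,
		unsigned int is_sc, unsigned int *available)
{
	uint32_t cons_head, cons_next;
	uint32_t entries;

	n = __rte_ring_move_cons_head(r, is_sc, n, behavior,
			&cons_head, &cons_next, &entries);
	if (n == 0)
		goto end;

	DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);

	update_tail(&r->cons, cons_head, cons_next, is_sc, 0);

end:
	if (available != NULL)
		*available = entries - n;
	return n;
}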