49594a6314
When calling __atomic_compare_exchange_n, use relaxed ordering for the success case, as multiple producers/consumers do not release updates to each other, so there is no need for acquire or release ordering. Because the thread fence is in place, the ordering for the first iteration can be relaxed. Run the ring perf test on the following testbed: HW: ThunderX2 B0 CPU CN9975 v2.0, 2 sockets, 28 cores, 4 threads/core, 2.5GHz OS: Ubuntu 16.04.5 LTS, Kernel: 4.15.0-36-generic DPDK: 18.08, Configuration: arm64-armv8a-linuxapp-gcc gcc: 8.1.0 $sudo ./test/test/test -l 16-19,44-47,72-75,100-103 -n 4 \ --socket-mem=1024 -- -i Without the patch: *** Testing using two physical cores *** SP/SC bulk enq/dequeue (size: 8): 5.75 MP/MC bulk enq/dequeue (size: 8): 10.18 SP/SC bulk enq/dequeue (size: 32): 1.80 MP/MC bulk enq/dequeue (size: 32): 2.34 With the patch: *** Testing using two physical cores *** SP/SC bulk enq/dequeue (size: 8): 5.59 MP/MC bulk enq/dequeue (size: 8): 10.54 SP/SC bulk enq/dequeue (size: 32): 1.73 MP/MC bulk enq/dequeue (size: 32): 2.38 Neither a significant improvement nor a regression was seen, as the optimisation is not on the critical path. Fixes: 39368ebfc6 ("ring: introduce C11 memory model barrier option") Cc: stable@dpdk.org Signed-off-by: Gavin Hu <gavin.hu@arm.com> Reviewed-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com> Reviewed-by: Steve Capper <steve.capper@arm.com> Reviewed-by: Ola Liljedahl <ola.liljedahl@arm.com>
182 lines
5.3 KiB
C
182 lines
5.3 KiB
C
/* SPDX-License-Identifier: BSD-3-Clause
|
|
*
|
|
* Copyright (c) 2017,2018 HXT-semitech Corporation.
|
|
* Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
|
|
* All rights reserved.
|
|
* Derived from FreeBSD's bufring.h
|
|
* Used as BSD-3 Licensed with permission from Kip Macy.
|
|
*/
|
|
|
|
#ifndef _RTE_RING_C11_MEM_H_
|
|
#define _RTE_RING_C11_MEM_H_
|
|
|
|
static __rte_always_inline void
|
|
update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
|
|
uint32_t single, uint32_t enqueue)
|
|
{
|
|
RTE_SET_USED(enqueue);
|
|
|
|
/*
|
|
* If there are other enqueues/dequeues in progress that preceded us,
|
|
* we need to wait for them to complete
|
|
*/
|
|
if (!single)
|
|
while (unlikely(ht->tail != old_val))
|
|
rte_pause();
|
|
|
|
__atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
|
|
}
|
|
|
|
/**
|
|
* @internal This function updates the producer head for enqueue
|
|
*
|
|
* @param r
|
|
* A pointer to the ring structure
|
|
* @param is_sp
|
|
* Indicates whether multi-producer path is needed or not
|
|
* @param n
|
|
* The number of elements we will want to enqueue, i.e. how far should the
|
|
* head be moved
|
|
* @param behavior
|
|
* RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring
|
|
* RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
|
|
* @param old_head
|
|
* Returns head value as it was before the move, i.e. where enqueue starts
|
|
* @param new_head
|
|
* Returns the current/new head value i.e. where enqueue finishes
|
|
* @param free_entries
|
|
* Returns the amount of free space in the ring BEFORE head was moved
|
|
* @return
|
|
* Actual number of objects enqueued.
|
|
* If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
|
|
*/
|
|
static __rte_always_inline unsigned int
|
|
__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
|
|
unsigned int n, enum rte_ring_queue_behavior behavior,
|
|
uint32_t *old_head, uint32_t *new_head,
|
|
uint32_t *free_entries)
|
|
{
|
|
const uint32_t capacity = r->capacity;
|
|
uint32_t cons_tail;
|
|
unsigned int max = n;
|
|
int success;
|
|
|
|
*old_head = __atomic_load_n(&r->prod.head, __ATOMIC_RELAXED);
|
|
do {
|
|
/* Reset n to the initial burst count */
|
|
n = max;
|
|
|
|
/* Ensure the head is read before tail */
|
|
__atomic_thread_fence(__ATOMIC_ACQUIRE);
|
|
|
|
/* load-acquire synchronize with store-release of ht->tail
|
|
* in update_tail.
|
|
*/
|
|
cons_tail = __atomic_load_n(&r->cons.tail,
|
|
__ATOMIC_ACQUIRE);
|
|
|
|
/* The subtraction is done between two unsigned 32bits value
|
|
* (the result is always modulo 32 bits even if we have
|
|
* *old_head > cons_tail). So 'free_entries' is always between 0
|
|
* and capacity (which is < size).
|
|
*/
|
|
*free_entries = (capacity + cons_tail - *old_head);
|
|
|
|
/* check that we have enough room in ring */
|
|
if (unlikely(n > *free_entries))
|
|
n = (behavior == RTE_RING_QUEUE_FIXED) ?
|
|
0 : *free_entries;
|
|
|
|
if (n == 0)
|
|
return 0;
|
|
|
|
*new_head = *old_head + n;
|
|
if (is_sp)
|
|
r->prod.head = *new_head, success = 1;
|
|
else
|
|
/* on failure, *old_head is updated */
|
|
success = __atomic_compare_exchange_n(&r->prod.head,
|
|
old_head, *new_head,
|
|
0, __ATOMIC_RELAXED,
|
|
__ATOMIC_RELAXED);
|
|
} while (unlikely(success == 0));
|
|
return n;
|
|
}
|
|
|
|
/**
|
|
* @internal This function updates the consumer head for dequeue
|
|
*
|
|
* @param r
|
|
* A pointer to the ring structure
|
|
* @param is_sc
|
|
* Indicates whether multi-consumer path is needed or not
|
|
* @param n
|
|
* The number of elements we will want to enqueue, i.e. how far should the
|
|
* head be moved
|
|
* @param behavior
|
|
* RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
|
|
* RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
|
|
* @param old_head
|
|
* Returns head value as it was before the move, i.e. where dequeue starts
|
|
* @param new_head
|
|
* Returns the current/new head value i.e. where dequeue finishes
|
|
* @param entries
|
|
* Returns the number of entries in the ring BEFORE head was moved
|
|
* @return
|
|
* - Actual number of objects dequeued.
|
|
* If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
|
|
*/
|
|
static __rte_always_inline unsigned int
|
|
__rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
|
|
unsigned int n, enum rte_ring_queue_behavior behavior,
|
|
uint32_t *old_head, uint32_t *new_head,
|
|
uint32_t *entries)
|
|
{
|
|
unsigned int max = n;
|
|
uint32_t prod_tail;
|
|
int success;
|
|
|
|
/* move cons.head atomically */
|
|
*old_head = __atomic_load_n(&r->cons.head, __ATOMIC_RELAXED);
|
|
do {
|
|
/* Restore n as it may change every loop */
|
|
n = max;
|
|
|
|
/* Ensure the head is read before tail */
|
|
__atomic_thread_fence(__ATOMIC_ACQUIRE);
|
|
|
|
/* this load-acquire synchronize with store-release of ht->tail
|
|
* in update_tail.
|
|
*/
|
|
prod_tail = __atomic_load_n(&r->prod.tail,
|
|
__ATOMIC_ACQUIRE);
|
|
|
|
/* The subtraction is done between two unsigned 32bits value
|
|
* (the result is always modulo 32 bits even if we have
|
|
* cons_head > prod_tail). So 'entries' is always between 0
|
|
* and size(ring)-1.
|
|
*/
|
|
*entries = (prod_tail - *old_head);
|
|
|
|
/* Set the actual entries for dequeue */
|
|
if (n > *entries)
|
|
n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
|
|
|
|
if (unlikely(n == 0))
|
|
return 0;
|
|
|
|
*new_head = *old_head + n;
|
|
if (is_sc)
|
|
r->cons.head = *new_head, success = 1;
|
|
else
|
|
/* on failure, *old_head will be updated */
|
|
success = __atomic_compare_exchange_n(&r->cons.head,
|
|
old_head, *new_head,
|
|
0, __ATOMIC_RELAXED,
|
|
__ATOMIC_RELAXED);
|
|
} while (unlikely(success == 0));
|
|
return n;
|
|
}
|
|
|
|
#endif /* _RTE_RING_C11_MEM_H_ */
|