ring/c11: synchronize load and store of the tail
Synchronize the load-acquire of the tail and the store-release
within update_tail, the store release ensures all the ring operations,
enqueue or dequeue, are seen by the observers on the other side as soon
as they see the updated tail. The load-acquire is needed here as the
data dependency is not a reliable way for ordering as the compiler might
break it by saving to temporary values to boost performance.
When computing the free_entries and avail_entries, use atomic semantics
to load the heads and tails instead.
The patch was benchmarked with test/ring_perf_autotest and it decreases
the enqueue/dequeue latency by 5% ~ 27.6% with two lcores, the real gains
are dependent on the number of lcores, depth of the ring, SPSC or MPMC.
For 1 lcore, it also improves a little, about 3 ~ 4%.
It is a big improvement, in case of MPMC, with two lcores and ring size
of 32, it saves latency up to (3.26-2.36)/3.26 = 27.6%.
This patch is a bug fix, while the improvement is a bonus. In our analysis
the improvement comes from the cacheline pre-filling after hoisting load-
acquire from _atomic_compare_exchange_n up above.
The test command:
$sudo ./test/test/test -l 16-19,44-47,72-75,100-103 -n 4 --socket-mem=\
1024 -- -i
Test result with this patch(two cores):
SP/SC bulk enq/dequeue (size: 8): 5.86
MP/MC bulk enq/dequeue (size: 8): 10.15
SP/SC bulk enq/dequeue (size: 32): 1.94
MP/MC bulk enq/dequeue (size: 32): 2.36
In comparison of the test result without this patch:
SP/SC bulk enq/dequeue (size: 8): 6.67
MP/MC bulk enq/dequeue (size: 8): 13.12
SP/SC bulk enq/dequeue (size: 32): 2.04
MP/MC bulk enq/dequeue (size: 32): 3.26
Fixes: 39368ebfc6
("ring: introduce C11 memory model barrier option")
Cc: stable@dpdk.org
Signed-off-by: Gavin Hu <gavin.hu@arm.com>
Reviewed-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Reviewed-by: Steve Capper <steve.capper@arm.com>
Reviewed-by: Ola Liljedahl <ola.liljedahl@arm.com>
Reviewed-by: Jia He <justin.he@arm.com>
Acked-by: Jerin Jacob <jerin.jacob@caviumnetworks.com>
Tested-by: Jerin Jacob <jerin.jacob@caviumnetworks.com>
Acked-by: Olivier Matz <olivier.matz@6wind.com>
This commit is contained in:
parent
305921f450
commit
9ed8770628
@ -57,6 +57,7 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
|
||||
uint32_t *free_entries)
|
||||
{
|
||||
const uint32_t capacity = r->capacity;
|
||||
uint32_t cons_tail;
|
||||
unsigned int max = n;
|
||||
int success;
|
||||
|
||||
@ -67,13 +68,18 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
|
||||
*old_head = __atomic_load_n(&r->prod.head,
|
||||
__ATOMIC_ACQUIRE);
|
||||
|
||||
/*
|
||||
* The subtraction is done between two unsigned 32bits value
|
||||
/* load-acquire synchronize with store-release of ht->tail
|
||||
* in update_tail.
|
||||
*/
|
||||
cons_tail = __atomic_load_n(&r->cons.tail,
|
||||
__ATOMIC_ACQUIRE);
|
||||
|
||||
/* The subtraction is done between two unsigned 32bits value
|
||||
* (the result is always modulo 32 bits even if we have
|
||||
* *old_head > cons_tail). So 'free_entries' is always between 0
|
||||
* and capacity (which is < size).
|
||||
*/
|
||||
*free_entries = (capacity + r->cons.tail - *old_head);
|
||||
*free_entries = (capacity + cons_tail - *old_head);
|
||||
|
||||
/* check that we have enough room in ring */
|
||||
if (unlikely(n > *free_entries))
|
||||
@ -125,21 +131,29 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
|
||||
uint32_t *entries)
|
||||
{
|
||||
unsigned int max = n;
|
||||
uint32_t prod_tail;
|
||||
int success;
|
||||
|
||||
/* move cons.head atomically */
|
||||
do {
|
||||
/* Restore n as it may change every loop */
|
||||
n = max;
|
||||
|
||||
*old_head = __atomic_load_n(&r->cons.head,
|
||||
__ATOMIC_ACQUIRE);
|
||||
|
||||
/* this load-acquire synchronize with store-release of ht->tail
|
||||
* in update_tail.
|
||||
*/
|
||||
prod_tail = __atomic_load_n(&r->prod.tail,
|
||||
__ATOMIC_ACQUIRE);
|
||||
|
||||
/* The subtraction is done between two unsigned 32bits value
|
||||
* (the result is always modulo 32 bits even if we have
|
||||
* cons_head > prod_tail). So 'entries' is always between 0
|
||||
* and size(ring)-1.
|
||||
*/
|
||||
*entries = (r->prod.tail - *old_head);
|
||||
*entries = (prod_tail - *old_head);
|
||||
|
||||
/* Set the actual entries for dequeue */
|
||||
if (n > *entries)
|
||||
|
Loading…
Reference in New Issue
Block a user