ring: make bulk and burst return values consistent

The bulk functions for rings return 0 when all elements are enqueued or
dequeued, and a negative errno when there is no space. Change that to make
them consistent with the burst functions, i.e. return the number of elements
enqueued/dequeued, which is either 0 or N. This also allows the return value
from enqueue/dequeue to be used directly, without a branch for error checking.
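
For example, a caller that previously had to branch on a negative errno
before using the count can now use the return value directly. A minimal
sketch (the function and counter names here are placeholders, not part of
the patch):

#include <rte_ring.h>

/* Sketch: enqueue a burst and account for it in one step. Under the
 * new convention rte_ring_enqueue_bulk() returns the number of objects
 * enqueued (0 or n) instead of 0 / -ENOBUFS. */
static unsigned int
tx_burst(struct rte_ring *r, void **objs, unsigned int n, uint64_t *tx_count)
{
	unsigned int done = rte_ring_enqueue_bulk(r, objs, n);

	*tx_count += done;	/* usable directly, no error branch */
	return done;		/* 0 means the ring was full */
}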

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
Reviewed-by: Yuanhan Liu <yuanhan.liu@linux.intel.com>
Acked-by: Olivier Matz <olivier.matz@6wind.com>
Bruce Richardson, 2017-03-29 16:21:23 +01:00; committed by Thomas Monjalon
parent 77dd306427
commit cfa7c9e6fc
14 changed files with 106 additions and 130 deletions


@@ -137,6 +137,17 @@ API Changes
* removed the build-time setting ``CONFIG_RTE_RING_PAUSE_REP_COUNT``
* removed the function ``rte_ring_set_water_mark`` as part of a general
removal of watermarks support in the library.
+* changed the return value of the enqueue and dequeue bulk functions to
+  match that of the burst equivalents. In all cases, ring functions which
+  operate on multiple packets now return the number of elements enqueued
+  or dequeued, as appropriate. The updated functions are:
+  - ``rte_ring_mp_enqueue_bulk``
+  - ``rte_ring_sp_enqueue_bulk``
+  - ``rte_ring_enqueue_bulk``
+  - ``rte_ring_mc_dequeue_bulk``
+  - ``rte_ring_sc_dequeue_bulk``
+  - ``rte_ring_dequeue_bulk``
ABI Changes
-----------
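
Most of the application updates below follow one pattern: a failed
all-or-nothing bulk enqueue is now signalled by a return of 0 rather than a
negative errno, and the packets are dropped. A minimal sketch of that
pattern, with hypothetical names mirroring the examples that follow:

#include <rte_mbuf.h>
#include <rte_ring.h>

/* Sketch: hand a burst to a client ring; free the mbufs if the ring is
 * full (bulk enqueue is all-or-nothing, so 0 means none went in). */
static void
flush_or_drop(struct rte_ring *q, struct rte_mbuf **bufs, unsigned int count)
{
	unsigned int j;

	if (rte_ring_enqueue_bulk(q, (void **)bufs, count) == 0) {
		for (j = 0; j < count; j++)
			rte_pktmbuf_free(bufs[j]);
	}
}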


@@ -286,7 +286,7 @@ repeated infinitely.
cl = &nodes[node];
if (rte_ring_enqueue_bulk(cl->rx_q, (void **)cl_rx_buf[node].buffer,
-cl_rx_buf[node].count) != 0){
+cl_rx_buf[node].count) != cl_rx_buf[node].count){
for (j = 0; j < cl_rx_buf[node].count; j++)
rte_pktmbuf_free(cl_rx_buf[node].buffer[j]);
cl->stats.rx_drop += cl_rx_buf[node].count;


@@ -146,7 +146,7 @@ app_lcore_io_rx_buffer_to_send (
(void **) lp->rx.mbuf_out[worker].array,
bsz);
-if (unlikely(ret == -ENOBUFS)) {
+if (unlikely(ret == 0)) {
uint32_t k;
for (k = 0; k < bsz; k ++) {
struct rte_mbuf *m = lp->rx.mbuf_out[worker].array[k];
@@ -312,7 +312,7 @@ app_lcore_io_rx_flush(struct app_lcore_params_io *lp, uint32_t n_workers)
(void **) lp->rx.mbuf_out[worker].array,
lp->rx.mbuf_out[worker].n_mbufs);
-if (unlikely(ret < 0)) {
+if (unlikely(ret == 0)) {
uint32_t k;
for (k = 0; k < lp->rx.mbuf_out[worker].n_mbufs; k ++) {
struct rte_mbuf *pkt_to_free = lp->rx.mbuf_out[worker].array[k];
@@ -349,9 +349,8 @@ app_lcore_io_tx(
(void **) &lp->tx.mbuf_out[port].array[n_mbufs],
bsz_rd);
-if (unlikely(ret == -ENOENT)) {
+if (unlikely(ret == 0))
continue;
-}
n_mbufs += bsz_rd;
@@ -505,9 +504,8 @@ app_lcore_worker(
(void **) lp->mbuf_in.array,
bsz_rd);
-if (unlikely(ret == -ENOENT)) {
+if (unlikely(ret == 0))
continue;
-}
#if APP_WORKER_DROP_ALL_PACKETS
for (j = 0; j < bsz_rd; j ++) {
@@ -559,7 +557,7 @@ app_lcore_worker(
#if APP_STATS
lp->rings_out_iters[port] ++;
-if (ret == 0) {
+if (ret > 0) {
lp->rings_out_count[port] += 1;
}
if (lp->rings_out_iters[port] == APP_STATS){
@@ -572,7 +570,7 @@
}
#endif
-if (unlikely(ret == -ENOBUFS)) {
+if (unlikely(ret == 0)) {
uint32_t k;
for (k = 0; k < bsz_wr; k ++) {
struct rte_mbuf *pkt_to_free = lp->mbuf_out[port].array[k];
@@ -609,7 +607,7 @@ app_lcore_worker_flush(struct app_lcore_params_worker *lp)
(void **) lp->mbuf_out[port].array,
lp->mbuf_out[port].n_mbufs);
-if (unlikely(ret < 0)) {
+if (unlikely(ret == 0)) {
uint32_t k;
for (k = 0; k < lp->mbuf_out[port].n_mbufs; k ++) {
struct rte_mbuf *pkt_to_free = lp->mbuf_out[port].array[k];


@@ -276,14 +276,10 @@ main(int argc, char *argv[])
printf("[Press Ctrl-C to quit ...]\n");
for (;;) {
-uint16_t i, rx_pkts = PKT_READ_SIZE;
+uint16_t i, rx_pkts;
uint8_t port;
-/* try dequeuing max possible packets first, if that fails, get the
- * most we can. Loop body should only execute once, maximum */
-while (rx_pkts > 0 &&
-unlikely(rte_ring_dequeue_bulk(rx_ring, pkts, rx_pkts) != 0))
-rx_pkts = (uint16_t)RTE_MIN(rte_ring_count(rx_ring), PKT_READ_SIZE);
+rx_pkts = rte_ring_dequeue_burst(rx_ring, pkts, PKT_READ_SIZE);
if (unlikely(rx_pkts == 0)){
if (need_flush)
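
Where the caller only wants as many packets as are available, the burst API
replaces the old dequeue-and-resize loop outright, as in the change above.
A sketch under the same assumptions (PKT_READ_SIZE and the ring name are
placeholders):

#include <rte_ring.h>

#define PKT_READ_SIZE 32	/* placeholder burst cap */

/* Sketch: one burst dequeue returns however many objects were
 * available, up to the cap; no retry loop is needed. */
static unsigned int
read_available(struct rte_ring *rx_ring, void **pkts)
{
	return rte_ring_dequeue_burst(rx_ring, pkts, PKT_READ_SIZE);
}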


@@ -227,7 +227,7 @@ flush_rx_queue(uint16_t client)
cl = &clients[client];
if (rte_ring_enqueue_bulk(cl->rx_q, (void **)cl_rx_buf[client].buffer,
-cl_rx_buf[client].count) != 0){
+cl_rx_buf[client].count) == 0){
for (j = 0; j < cl_rx_buf[client].count; j++)
rte_pktmbuf_free(cl_rx_buf[client].buffer[j]);
cl->stats.rx_drop += cl_rx_buf[client].count;


@@ -107,7 +107,7 @@ app_rx_thread(struct thread_conf **confs)
}
if (unlikely(rte_ring_sp_enqueue_bulk(conf->rx_ring,
-(void **)rx_mbufs, nb_rx) != 0)) {
+(void **)rx_mbufs, nb_rx) == 0)) {
for(i = 0; i < nb_rx; i++) {
rte_pktmbuf_free(rx_mbufs[i]);
@@ -180,7 +180,7 @@ app_tx_thread(struct thread_conf **confs)
while ((conf = confs[conf_idx])) {
retval = rte_ring_sc_dequeue_bulk(conf->tx_ring, (void **)mbufs,
burst_conf.qos_dequeue);
-if (likely(retval == 0)) {
+if (likely(retval != 0)) {
app_send_packets(conf, mbufs, burst_conf.qos_dequeue);
conf->counter = 0; /* reset empty read loop counter */
@@ -230,7 +230,9 @@ app_worker_thread(struct thread_conf **confs)
nb_pkt = rte_sched_port_dequeue(conf->sched_port, mbufs,
burst_conf.qos_dequeue);
if (likely(nb_pkt > 0))
-while (rte_ring_sp_enqueue_bulk(conf->tx_ring, (void **)mbufs, nb_pkt) != 0);
+while (rte_ring_sp_enqueue_bulk(conf->tx_ring,
+(void **)mbufs, nb_pkt) == 0)
+; /* empty body */
conf_idx++;
if (confs[conf_idx] == NULL)


@@ -392,7 +392,7 @@ main(int argc, char *argv[])
*/
while (rx_pkts > 0 &&
unlikely(rte_ring_dequeue_bulk(rx_ring, pkts,
-rx_pkts) != 0))
+rx_pkts) == 0))
rx_pkts = (uint16_t)RTE_MIN(rte_ring_count(rx_ring),
PKT_READ_SIZE);


@@ -247,7 +247,7 @@ flush_rx_queue(uint16_t node)
cl = &nodes[node];
if (rte_ring_enqueue_bulk(cl->rx_q, (void **)cl_rx_buf[node].buffer,
-cl_rx_buf[node].count) != 0){
+cl_rx_buf[node].count) != cl_rx_buf[node].count){
for (j = 0; j < cl_rx_buf[node].count; j++)
rte_pktmbuf_free(cl_rx_buf[node].buffer[j]);
cl->stats.rx_drop += cl_rx_buf[node].count;


@@ -42,26 +42,30 @@ static int
common_ring_mp_enqueue(struct rte_mempool *mp, void * const *obj_table,
unsigned n)
{
-return rte_ring_mp_enqueue_bulk(mp->pool_data, obj_table, n);
+return rte_ring_mp_enqueue_bulk(mp->pool_data,
+obj_table, n) == 0 ? -ENOBUFS : 0;
}
static int
common_ring_sp_enqueue(struct rte_mempool *mp, void * const *obj_table,
unsigned n)
{
-return rte_ring_sp_enqueue_bulk(mp->pool_data, obj_table, n);
+return rte_ring_sp_enqueue_bulk(mp->pool_data,
+obj_table, n) == 0 ? -ENOBUFS : 0;
}
static int
common_ring_mc_dequeue(struct rte_mempool *mp, void **obj_table, unsigned n)
{
-return rte_ring_mc_dequeue_bulk(mp->pool_data, obj_table, n);
+return rte_ring_mc_dequeue_bulk(mp->pool_data,
+obj_table, n) == 0 ? -ENOBUFS : 0;
}
static int
common_ring_sc_dequeue(struct rte_mempool *mp, void **obj_table, unsigned n)
{
-return rte_ring_sc_dequeue_bulk(mp->pool_data, obj_table, n);
+return rte_ring_sc_dequeue_bulk(mp->pool_data,
+obj_table, n) == 0 ? -ENOBUFS : 0;
}
}
static unsigned
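
The mempool ops above keep their existing contract of 0 on success and a
negative errno on failure, hence the explicit mapping from the ring's new
0-or-n return. Callers of the mempool API are therefore unchanged; a
sketch, assuming a pool mp created elsewhere:

#include <rte_mempool.h>

/* Sketch: rte_mempool_get_bulk() still reports 0 / -ENOENT, so this
 * caller is unaffected by the ring change underneath. */
static int
grab_and_release(struct rte_mempool *mp, void **objs, unsigned int n)
{
	if (rte_mempool_get_bulk(mp, objs, n) < 0)
		return -1;	/* pool had fewer than n objects */
	rte_mempool_put_bulk(mp, objs, n);
	return 0;
}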


@@ -349,14 +349,10 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
* RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring
* RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring
* @return
-* Depend on the behavior value
-* if behavior = RTE_RING_QUEUE_FIXED
-* - 0: Success; objects enqueue.
-* - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
-* if behavior = RTE_RING_QUEUE_VARIABLE
-* - n: Actual number of objects enqueued.
+* Actual number of objects enqueued.
+* If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
*/
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
__rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
unsigned n, enum rte_ring_queue_behavior behavior)
{
@@ -388,7 +384,7 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
/* check that we have enough room in ring */
if (unlikely(n > free_entries)) {
if (behavior == RTE_RING_QUEUE_FIXED)
-return -ENOBUFS;
+return 0;
else {
/* No free entry available */
if (unlikely(free_entries == 0))
@@ -414,7 +410,7 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
rte_pause();
r->prod.tail = prod_next;
-return (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
+return n;
}
/**
@@ -430,14 +426,10 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
* RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring
* RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring
* @return
-* Depend on the behavior value
-* if behavior = RTE_RING_QUEUE_FIXED
-* - 0: Success; objects enqueue.
-* - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
-* if behavior = RTE_RING_QUEUE_VARIABLE
-* - n: Actual number of objects enqueued.
+* Actual number of objects enqueued.
+* If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
*/
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
__rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
unsigned n, enum rte_ring_queue_behavior behavior)
{
@@ -457,7 +449,7 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
/* check that we have enough room in ring */
if (unlikely(n > free_entries)) {
if (behavior == RTE_RING_QUEUE_FIXED)
-return -ENOBUFS;
+return 0;
else {
/* No free entry available */
if (unlikely(free_entries == 0))
@@ -474,7 +466,7 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
rte_smp_wmb();
r->prod.tail = prod_next;
-return (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
+return n;
}
/**
@@ -495,16 +487,11 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
* RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
* RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring
* @return
-* Depend on the behavior value
-* if behavior = RTE_RING_QUEUE_FIXED
-* - 0: Success; objects dequeued.
-* - -ENOENT: Not enough entries in the ring to dequeue; no object is
-* dequeued.
-* if behavior = RTE_RING_QUEUE_VARIABLE
-* - n: Actual number of objects dequeued.
+* - Actual number of objects dequeued.
+* If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
*/
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
__rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
unsigned n, enum rte_ring_queue_behavior behavior)
{
@@ -536,7 +523,7 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
/* Set the actual entries for dequeue */
if (n > entries) {
if (behavior == RTE_RING_QUEUE_FIXED)
-return -ENOENT;
+return 0;
else {
if (unlikely(entries == 0))
return 0;
@@ -562,7 +549,7 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
r->cons.tail = cons_next;
-return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
+return n;
}
/**
@@ -580,15 +567,10 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
* RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
* RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring
* @return
-* Depend on the behavior value
-* if behavior = RTE_RING_QUEUE_FIXED
-* - 0: Success; objects dequeued.
-* - -ENOENT: Not enough entries in the ring to dequeue; no object is
-* dequeued.
-* if behavior = RTE_RING_QUEUE_VARIABLE
-* - n: Actual number of objects dequeued.
+* - Actual number of objects dequeued.
+* If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
*/
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
__rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
unsigned n, enum rte_ring_queue_behavior behavior)
{
@@ -607,7 +589,7 @@ __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
if (n > entries) {
if (behavior == RTE_RING_QUEUE_FIXED)
-return -ENOENT;
+return 0;
else {
if (unlikely(entries == 0))
return 0;
@@ -623,7 +605,7 @@ __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
rte_smp_rmb();
r->cons.tail = cons_next;
-return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
+return n;
}
/**
@@ -639,10 +621,9 @@ __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
* @param n
* The number of objects to add in the ring from the obj_table.
* @return
-* - 0: Success; objects enqueue.
-* - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
+* The number of objects enqueued, either 0 or n
*/
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
unsigned n)
{
@@ -659,10 +640,9 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
* @param n
* The number of objects to add in the ring from the obj_table.
* @return
-* - 0: Success; objects enqueued.
-* - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+* The number of objects enqueued, either 0 or n
*/
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
unsigned n)
{
@@ -683,10 +663,9 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
* @param n
* The number of objects to add in the ring from the obj_table.
* @return
-* - 0: Success; objects enqueued.
-* - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+* The number of objects enqueued, either 0 or n
*/
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
unsigned n)
{
@@ -713,7 +692,7 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
static inline int __attribute__((always_inline))
rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
{
-return rte_ring_mp_enqueue_bulk(r, &obj, 1);
+return rte_ring_mp_enqueue_bulk(r, &obj, 1) ? 0 : -ENOBUFS;
}
/**
@@ -730,7 +709,7 @@ rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
static inline int __attribute__((always_inline))
rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
{
-return rte_ring_sp_enqueue_bulk(r, &obj, 1);
+return rte_ring_sp_enqueue_bulk(r, &obj, 1) ? 0 : -ENOBUFS;
}
/**
@@ -751,10 +730,7 @@ rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
static inline int __attribute__((always_inline))
rte_ring_enqueue(struct rte_ring *r, void *obj)
{
-if (r->prod.single)
-return rte_ring_sp_enqueue(r, obj);
-else
-return rte_ring_mp_enqueue(r, obj);
+return rte_ring_enqueue_bulk(r, &obj, 1) ? 0 : -ENOBUFS;
}
/**
@@ -770,11 +746,9 @@ rte_ring_enqueue(struct rte_ring *r, void *obj)
* @param n
* The number of objects to dequeue from the ring to the obj_table.
* @return
-* - 0: Success; objects dequeued.
-* - -ENOENT: Not enough entries in the ring to dequeue; no object is
-* dequeued.
+* The number of objects dequeued, either 0 or n
*/
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
{
return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
@@ -791,11 +765,9 @@ rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
* The number of objects to dequeue from the ring to the obj_table,
* must be strictly positive.
* @return
-* - 0: Success; objects dequeued.
-* - -ENOENT: Not enough entries in the ring to dequeue; no object is
-* dequeued.
+* The number of objects dequeued, either 0 or n
*/
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
{
return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
@@ -815,11 +787,9 @@ rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
* @param n
* The number of objects to dequeue from the ring to the obj_table.
* @return
-* - 0: Success; objects dequeued.
-* - -ENOENT: Not enough entries in the ring to dequeue, no object is
-* dequeued.
+* The number of objects dequeued, either 0 or n
*/
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
{
if (r->cons.single)
@@ -846,7 +816,7 @@ rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
static inline int __attribute__((always_inline))
rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
{
-return rte_ring_mc_dequeue_bulk(r, obj_p, 1);
+return rte_ring_mc_dequeue_bulk(r, obj_p, 1) ? 0 : -ENOBUFS;
}
/**
@@ -864,7 +834,7 @@ rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
static inline int __attribute__((always_inline))
rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
{
-return rte_ring_sc_dequeue_bulk(r, obj_p, 1);
+return rte_ring_sc_dequeue_bulk(r, obj_p, 1) ? 0 : -ENOBUFS;
}
/**
@@ -886,10 +856,7 @@ rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
static inline int __attribute__((always_inline))
rte_ring_dequeue(struct rte_ring *r, void **obj_p)
{
-if (r->cons.single)
-return rte_ring_sc_dequeue(r, obj_p);
-else
-return rte_ring_mc_dequeue(r, obj_p);
+return rte_ring_dequeue_bulk(r, obj_p, 1) ? 0 : -ENOBUFS;
}
/**
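
The single-object enqueue/dequeue wrappers above deliberately keep their
int return of 0 or a negative errno, now derived from the bulk count. A
usage sketch under that assumption:

#include <rte_ring.h>

/* Sketch: the one-object API still reports 0 / -errno, so existing
 * callers compile and behave as before. */
static int
pass_one_through(struct rte_ring *r, void *obj)
{
	void *out;

	if (rte_ring_enqueue(r, obj) < 0)
		return -1;	/* ring full */
	if (rte_ring_dequeue(r, &out) < 0)
		return -1;	/* ring empty */
	return 0;
}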


@@ -547,6 +547,6 @@ app_main_loop_rx_metadata(void) {
app.rings_rx[i],
(void **) app.mbuf_rx.array,
n_mbufs);
-} while (ret < 0);
+} while (ret == 0);
}
}


@@ -98,7 +98,7 @@ app_main_loop_rx(void) {
app.rings_rx[i],
(void **) app.mbuf_rx.array,
n_mbufs);
-} while (ret < 0);
+} while (ret == 0);
}
}
@@ -123,7 +123,7 @@ app_main_loop_worker(void) {
(void **) worker_mbuf->array,
app.burst_size_worker_read);
-if (ret == -ENOENT)
+if (ret == 0)
continue;
do {
@@ -131,7 +131,7 @@ app_main_loop_worker(void) {
app.rings_tx[i ^ 1],
(void **) worker_mbuf->array,
app.burst_size_worker_write);
-} while (ret < 0);
+} while (ret == 0);
}
}
@@ -152,7 +152,7 @@ app_main_loop_tx(void) {
(void **) &app.mbuf_tx[i].array[n_mbufs],
app.burst_size_tx_read);
-if (ret == -ENOENT)
+if (ret == 0)
continue;
n_mbufs += app.burst_size_tx_read;


@@ -117,20 +117,18 @@ test_ring_basic_full_empty(void * const src[], void *dst[])
rand = RTE_MAX(rte_rand() % RING_SIZE, 1UL);
printf("%s: iteration %u, random shift: %u;\n",
__func__, i, rand);
-TEST_RING_VERIFY(-ENOBUFS != rte_ring_enqueue_bulk(r, src,
-rand));
-TEST_RING_VERIFY(0 == rte_ring_dequeue_bulk(r, dst, rand));
+TEST_RING_VERIFY(rte_ring_enqueue_bulk(r, src, rand) != 0);
+TEST_RING_VERIFY(rte_ring_dequeue_bulk(r, dst, rand) == rand);
/* fill the ring */
-TEST_RING_VERIFY(-ENOBUFS != rte_ring_enqueue_bulk(r, src,
-rsz));
+TEST_RING_VERIFY(rte_ring_enqueue_bulk(r, src, rsz) != 0);
TEST_RING_VERIFY(0 == rte_ring_free_count(r));
TEST_RING_VERIFY(rsz == rte_ring_count(r));
TEST_RING_VERIFY(rte_ring_full(r));
TEST_RING_VERIFY(0 == rte_ring_empty(r));
/* empty the ring */
-TEST_RING_VERIFY(0 == rte_ring_dequeue_bulk(r, dst, rsz));
+TEST_RING_VERIFY(rte_ring_dequeue_bulk(r, dst, rsz) == rsz);
TEST_RING_VERIFY(rsz == rte_ring_free_count(r));
TEST_RING_VERIFY(0 == rte_ring_count(r));
TEST_RING_VERIFY(0 == rte_ring_full(r));
@@ -171,37 +169,37 @@ test_ring_basic(void)
printf("enqueue 1 obj\n");
ret = rte_ring_sp_enqueue_bulk(r, cur_src, 1);
cur_src += 1;
-if (ret != 0)
+if (ret == 0)
goto fail;
printf("enqueue 2 objs\n");
ret = rte_ring_sp_enqueue_bulk(r, cur_src, 2);
cur_src += 2;
-if (ret != 0)
+if (ret == 0)
goto fail;
printf("enqueue MAX_BULK objs\n");
ret = rte_ring_sp_enqueue_bulk(r, cur_src, MAX_BULK);
cur_src += MAX_BULK;
-if (ret != 0)
+if (ret == 0)
goto fail;
printf("dequeue 1 obj\n");
ret = rte_ring_sc_dequeue_bulk(r, cur_dst, 1);
cur_dst += 1;
-if (ret != 0)
+if (ret == 0)
goto fail;
printf("dequeue 2 objs\n");
ret = rte_ring_sc_dequeue_bulk(r, cur_dst, 2);
cur_dst += 2;
-if (ret != 0)
+if (ret == 0)
goto fail;
printf("dequeue MAX_BULK objs\n");
ret = rte_ring_sc_dequeue_bulk(r, cur_dst, MAX_BULK);
cur_dst += MAX_BULK;
-if (ret != 0)
+if (ret == 0)
goto fail;
/* check data */
@@ -217,37 +215,37 @@ test_ring_basic(void)
printf("enqueue 1 obj\n");
ret = rte_ring_mp_enqueue_bulk(r, cur_src, 1);
cur_src += 1;
-if (ret != 0)
+if (ret == 0)
goto fail;
printf("enqueue 2 objs\n");
ret = rte_ring_mp_enqueue_bulk(r, cur_src, 2);
cur_src += 2;
-if (ret != 0)
+if (ret == 0)
goto fail;
printf("enqueue MAX_BULK objs\n");
ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK);
cur_src += MAX_BULK;
-if (ret != 0)
+if (ret == 0)
goto fail;
printf("dequeue 1 obj\n");
ret = rte_ring_mc_dequeue_bulk(r, cur_dst, 1);
cur_dst += 1;
-if (ret != 0)
+if (ret == 0)
goto fail;
printf("dequeue 2 objs\n");
ret = rte_ring_mc_dequeue_bulk(r, cur_dst, 2);
cur_dst += 2;
-if (ret != 0)
+if (ret == 0)
goto fail;
printf("dequeue MAX_BULK objs\n");
ret = rte_ring_mc_dequeue_bulk(r, cur_dst, MAX_BULK);
cur_dst += MAX_BULK;
-if (ret != 0)
+if (ret == 0)
goto fail;
/* check data */
@@ -264,11 +262,11 @@ test_ring_basic(void)
for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK);
cur_src += MAX_BULK;
-if (ret != 0)
+if (ret == 0)
goto fail;
ret = rte_ring_mc_dequeue_bulk(r, cur_dst, MAX_BULK);
cur_dst += MAX_BULK;
-if (ret != 0)
+if (ret == 0)
goto fail;
}
@@ -294,25 +292,25 @@ test_ring_basic(void)
ret = rte_ring_enqueue_bulk(r, cur_src, num_elems);
cur_src += num_elems;
-if (ret != 0) {
+if (ret == 0) {
printf("Cannot enqueue\n");
goto fail;
}
ret = rte_ring_enqueue_bulk(r, cur_src, num_elems);
cur_src += num_elems;
-if (ret != 0) {
+if (ret == 0) {
printf("Cannot enqueue\n");
goto fail;
}
ret = rte_ring_dequeue_bulk(r, cur_dst, num_elems);
cur_dst += num_elems;
-if (ret != 0) {
+if (ret == 0) {
printf("Cannot dequeue\n");
goto fail;
}
ret = rte_ring_dequeue_bulk(r, cur_dst, num_elems);
cur_dst += num_elems;
-if (ret != 0) {
+if (ret == 0) {
printf("Cannot dequeue2\n");
goto fail;
}


@@ -195,13 +195,13 @@ enqueue_bulk(void *p)
const uint64_t sp_start = rte_rdtsc();
for (i = 0; i < iterations; i++)
-while (rte_ring_sp_enqueue_bulk(r, burst, size) != 0)
+while (rte_ring_sp_enqueue_bulk(r, burst, size) == 0)
rte_pause();
const uint64_t sp_end = rte_rdtsc();
const uint64_t mp_start = rte_rdtsc();
for (i = 0; i < iterations; i++)
-while (rte_ring_mp_enqueue_bulk(r, burst, size) != 0)
+while (rte_ring_mp_enqueue_bulk(r, burst, size) == 0)
rte_pause();
const uint64_t mp_end = rte_rdtsc();
@@ -230,13 +230,13 @@ dequeue_bulk(void *p)
const uint64_t sc_start = rte_rdtsc();
for (i = 0; i < iterations; i++)
-while (rte_ring_sc_dequeue_bulk(r, burst, size) != 0)
+while (rte_ring_sc_dequeue_bulk(r, burst, size) == 0)
rte_pause();
const uint64_t sc_end = rte_rdtsc();
const uint64_t mc_start = rte_rdtsc();
for (i = 0; i < iterations; i++)
-while (rte_ring_mc_dequeue_bulk(r, burst, size) != 0)
+while (rte_ring_mc_dequeue_bulk(r, burst, size) == 0)
rte_pause();
const uint64_t mc_end = rte_rdtsc();
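
With both failure cases collapsed to a return of 0, the blocking retry
idiom in the performance tests above is the same for enqueue and dequeue.
A final sketch (the ring and burst names are placeholders):

#include <rte_cycles.h>
#include <rte_ring.h>

/* Sketch: spin until the all-or-nothing bulk enqueue takes the
 * whole burst; 0 means nothing was enqueued, so try again. */
static void
enqueue_blocking(struct rte_ring *r, void **burst, unsigned int n)
{
	while (rte_ring_sp_enqueue_bulk(r, burst, n) == 0)
		rte_pause();
}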