net/af_xdp: optimize Rx mbuf allocation

In the Rx path, the max burst number of mbufs is always allocated up
front; if the hardware then delivers fewer packets than that, the
surplus mbufs are freed again, which hurts performance.

Optimize the Rx path by allocating mbufs based on the result of
xsk_ring_cons__peek(), which avoids the surplus allocation and the
need to free mbufs while receiving packets.

If the mbuf allocation fails, the Rx ring's cached_cons must be rolled
back, since xsk_ring_cons__peek() has already advanced it.

Signed-off-by: RongQing Li <lirongqing@baidu.com>
Signed-off-by: Dongsheng Rong <rongdongsheng@baidu.com>
Acked-by: Ciara Loftus <ciara.loftus@intel.com>
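
In short, the patch inverts the order of peek and allocate. Below is a
minimal sketch of the new flow, assuming the libbpf AF_XDP helpers; the
function name rx_burst_sketch and the elided descriptor loop are
illustrative, not the driver's actual code:

#include <bpf/xsk.h>   /* xsk_ring_cons__peek()/__release() */
#include <rte_mbuf.h>  /* rte_pktmbuf_alloc_bulk() */

static uint16_t
rx_burst_sketch(struct xsk_ring_cons *rx, struct rte_mempool *pool,
		struct rte_mbuf **bufs, uint16_t nb_pkts)
{
	uint32_t idx_rx = 0;

	/* 1. Ask the ring how many descriptors are actually ready. */
	nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
	if (nb_pkts == 0)
		return 0;

	/* 2. Allocate exactly that many mbufs, not the max burst. */
	if (rte_pktmbuf_alloc_bulk(pool, bufs, nb_pkts) != 0) {
		/* 3. peek() already advanced cached_cons; undo it so
		 * the descriptors are seen again on the next burst.
		 */
		rx->cached_cons -= nb_pkts;
		return 0;
	}

	/* ... process the nb_pkts descriptors starting at idx_rx ... */

	xsk_ring_cons__release(rx, nb_pkts);
	return nb_pkts;
}

The rollback is safe because peek() only moves the userspace-cached
consumer index; nothing is published to the kernel until
xsk_ring_cons__release().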
--- a/drivers/net/af_xdp/rte_eth_af_xdp.c
+++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
@@ -255,28 +255,32 @@ af_xdp_rx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 	struct xsk_umem_info *umem = rxq->umem;
 	uint32_t idx_rx = 0;
 	unsigned long rx_bytes = 0;
-	int rcvd, i;
+	int i;
 	struct rte_mbuf *fq_bufs[ETH_AF_XDP_RX_BATCH_SIZE];
 
-	/* allocate bufs for fill queue replenishment after rx */
-	if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
-		AF_XDP_LOG(DEBUG,
-			"Failed to get enough buffers for fq.\n");
-		return 0;
-	}
-
-	rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
+	nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
 
-	if (rcvd == 0) {
+	if (nb_pkts == 0) {
 #if defined(XDP_USE_NEED_WAKEUP)
 		if (xsk_ring_prod__needs_wakeup(fq))
 			(void)poll(rxq->fds, 1, 1000);
 #endif
 
-		goto out;
+		return 0;
 	}
 
-	for (i = 0; i < rcvd; i++) {
+	/* allocate bufs for fill queue replenishment after rx */
+	if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
+		AF_XDP_LOG(DEBUG,
+			"Failed to get enough buffers for fq.\n");
+		/* rollback cached_cons which is added by
+		 * xsk_ring_cons__peek
+		 */
+		rx->cached_cons -= nb_pkts;
+		return 0;
+	}
+
+	for (i = 0; i < nb_pkts; i++) {
 		const struct xdp_desc *desc;
 		uint64_t addr;
 		uint32_t len;
@@ -301,20 +305,14 @@ af_xdp_rx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 		rx_bytes += len;
 	}
 
-	xsk_ring_cons__release(rx, rcvd);
-
-	(void)reserve_fill_queue(umem, rcvd, fq_bufs, fq);
+	xsk_ring_cons__release(rx, nb_pkts);
+	(void)reserve_fill_queue(umem, nb_pkts, fq_bufs, fq);
 
 	/* statistics */
-	rxq->stats.rx_pkts += rcvd;
+	rxq->stats.rx_pkts += nb_pkts;
 	rxq->stats.rx_bytes += rx_bytes;
 
-out:
-	if (rcvd != nb_pkts)
-		rte_mempool_put_bulk(umem->mb_pool, (void **)&fq_bufs[rcvd],
-				     nb_pkts - rcvd);
-
-	return rcvd;
+	return nb_pkts;
 }
 #else
 static uint16_t
@@ -326,7 +324,7 @@ af_xdp_rx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 	struct xsk_ring_prod *fq = &rxq->fq;
 	uint32_t idx_rx = 0;
 	unsigned long rx_bytes = 0;
-	int rcvd, i;
+	int i;
 	uint32_t free_thresh = fq->size >> 1;
 	struct rte_mbuf *mbufs[ETH_AF_XDP_RX_BATCH_SIZE];
 
@@ -334,20 +332,24 @@ af_xdp_rx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 		(void)reserve_fill_queue(umem, ETH_AF_XDP_RX_BATCH_SIZE,
 					 NULL, fq);
 
-	if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts) != 0))
-		return 0;
-
-	rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
-
-	if (rcvd == 0) {
+	nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
+	if (nb_pkts == 0) {
 #if defined(XDP_USE_NEED_WAKEUP)
 		if (xsk_ring_prod__needs_wakeup(fq))
 			(void)poll(rxq->fds, 1, 1000);
 #endif
-		goto out;
+		return 0;
 	}
 
-	for (i = 0; i < rcvd; i++) {
+	if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts))) {
+		/* rollback cached_cons which is added by
+		 * xsk_ring_cons__peek
+		 */
+		rx->cached_cons -= nb_pkts;
+		return 0;
+	}
+
+	for (i = 0; i < nb_pkts; i++) {
 		const struct xdp_desc *desc;
 		uint64_t addr;
 		uint32_t len;
@@ -366,18 +368,13 @@ af_xdp_rx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 		bufs[i] = mbufs[i];
 	}
 
-	xsk_ring_cons__release(rx, rcvd);
+	xsk_ring_cons__release(rx, nb_pkts);
 
 	/* statistics */
-	rxq->stats.rx_pkts += rcvd;
+	rxq->stats.rx_pkts += nb_pkts;
 	rxq->stats.rx_bytes += rx_bytes;
 
-out:
-	if (rcvd != nb_pkts)
-		rte_mempool_put_bulk(rxq->mb_pool, (void **)&mbufs[rcvd],
-				     nb_pkts - rcvd);
-
-	return rcvd;
+	return nb_pkts;
 }
 #endif
 
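
Why subtracting from cached_cons is a complete rollback:
xsk_ring_cons__peek() reserves entries by advancing only the
userspace-cached consumer index, roughly as below. This is a paraphrase
of libbpf's helper, not the verbatim library code; available_entries()
is a hypothetical stand-in for its internal availability check, and the
memory barrier is elided:

/* Paraphrase of libbpf's xsk_ring_cons__peek(); available_entries()
 * is a hypothetical stand-in for the internal availability check.
 */
static inline uint32_t
peek_sketch(struct xsk_ring_cons *cons, uint32_t nb, uint32_t *idx)
{
	uint32_t entries = available_entries(cons, nb); /* <= nb */

	if (entries > 0) {
		*idx = cons->cached_cons;     /* first ready descriptor */
		cons->cached_cons += entries; /* the advance the driver
					       * undoes on alloc failure */
	}
	return entries;
}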