From 87eda68957aaec25a51dcbea8eca9070c2302976 Mon Sep 17 00:00:00 2001 From: Cristian Dumitrescu Date: Fri, 5 Aug 2022 22:00:25 +0000 Subject: [PATCH] port: rework ring output port behavior to non-blocking Drop packets that cannot be sent instead of retry sending the same packets potentially forever when the ring consumer that is down. Signed-off-by: Cristian Dumitrescu --- lib/port/rte_swx_port_ring.c | 93 ++++++++++++++++++++++++------------ 1 file changed, 63 insertions(+), 30 deletions(-) diff --git a/lib/port/rte_swx_port_ring.c b/lib/port/rte_swx_port_ring.c index c726e50a57..0ae23de8e8 100644 --- a/lib/port/rte_swx_port_ring.c +++ b/lib/port/rte_swx_port_ring.c @@ -173,6 +173,7 @@ struct writer { struct rte_mbuf **pkts; int n_pkts; + uint32_t n_bytes; }; static void * @@ -219,31 +220,55 @@ writer_create(void *args) return NULL; } -static void +static inline void __writer_flush(struct writer *p) { - int n_pkts; + struct rte_mbuf **pkts = p->pkts; + uint64_t n_pkts_total = p->stats.n_pkts; + uint64_t n_bytes_total = p->stats.n_bytes; + uint64_t n_pkts_drop_total = p->stats.n_pkts_drop; + uint64_t n_bytes_drop_total = p->stats.n_bytes_drop; + int n_pkts = p->n_pkts, n_pkts_drop, n_pkts_tx; + uint32_t n_bytes = p->n_bytes, n_bytes_drop = 0; - for (n_pkts = 0; ; ) { - n_pkts += rte_ring_sp_enqueue_burst(p->params.ring, - (void **)p->pkts + n_pkts, - p->n_pkts - n_pkts, - NULL); + /* Packet TX. */ + n_pkts_tx = rte_ring_sp_enqueue_burst(p->params.ring, + (void **)pkts, + n_pkts, + NULL); - TRACE("[Ring %s] %d packets out\n", p->params.name, n_pkts); + /* Packet drop. */ + n_pkts_drop = n_pkts - n_pkts_tx; - if (n_pkts == p->n_pkts) - break; + for ( ; n_pkts_tx < n_pkts; n_pkts_tx++) { + struct rte_mbuf *m = pkts[n_pkts_tx]; + + n_bytes_drop += m->pkt_len; + rte_pktmbuf_free(m); } + /* Port update. */ + p->stats.n_pkts = n_pkts_total + n_pkts - n_pkts_drop; + p->stats.n_bytes = n_bytes_total + n_bytes - n_bytes_drop; + p->stats.n_pkts_drop = n_pkts_drop_total + n_pkts_drop; + p->stats.n_bytes_drop = n_bytes_drop_total + n_bytes_drop; p->n_pkts = 0; + p->n_bytes = 0; + + TRACE("[Ring %s] Buffered packets flushed: %d out, %d dropped\n", + p->params.name, + n_pkts - n_pkts_drop, + n_pkts_drop); } static void writer_pkt_tx(void *port, struct rte_swx_pkt *pkt) { struct writer *p = port; + int n_pkts = p->n_pkts; + uint32_t n_bytes = p->n_bytes; struct rte_mbuf *m = pkt->handle; + uint32_t pkt_length = pkt->length; TRACE("[Ring %s] Pkt %d (%u bytes at offset %u)\n", p->params.name, @@ -253,15 +278,15 @@ writer_pkt_tx(void *port, struct rte_swx_pkt *pkt) if (TRACE_LEVEL) rte_hexdump(stdout, NULL, &pkt->pkt[pkt->offset], pkt->length); - m->data_len = (uint16_t)(pkt->length + m->data_len - m->pkt_len); - m->pkt_len = pkt->length; + m->data_len = (uint16_t)(pkt_length + m->data_len - m->pkt_len); + m->pkt_len = pkt_length; m->data_off = (uint16_t)pkt->offset; - p->stats.n_pkts++; - p->stats.n_bytes += pkt->length; + p->pkts[n_pkts++] = m; + p->n_pkts = n_pkts; + p->n_bytes = n_bytes + pkt_length; - p->pkts[p->n_pkts++] = m; - if (p->n_pkts == (int)p->params.burst_size) + if (n_pkts == (int)p->params.burst_size) __writer_flush(p); } @@ -269,7 +294,11 @@ static void writer_pkt_fast_clone_tx(void *port, struct rte_swx_pkt *pkt) { struct writer *p = port; + int n_pkts = p->n_pkts; + uint32_t n_bytes = p->n_bytes; + uint64_t n_pkts_clone = p->stats.n_pkts_clone; struct rte_mbuf *m = pkt->handle; + uint32_t pkt_length = pkt->length; TRACE("[Ring %s] Pkt %d (%u bytes at offset %u) (fast clone)\n", p->params.name, @@ -279,17 +308,17 @@ writer_pkt_fast_clone_tx(void *port, struct rte_swx_pkt *pkt) if (TRACE_LEVEL) rte_hexdump(stdout, NULL, &pkt->pkt[pkt->offset], pkt->length); - m->data_len = (uint16_t)(pkt->length + m->data_len - m->pkt_len); - m->pkt_len = pkt->length; + m->data_len = (uint16_t)(pkt_length + m->data_len - m->pkt_len); + m->pkt_len = pkt_length; m->data_off = (uint16_t)pkt->offset; rte_pktmbuf_refcnt_update(m, 1); - p->stats.n_pkts++; - p->stats.n_bytes += pkt->length; - p->stats.n_pkts_clone++; + p->pkts[n_pkts++] = m; + p->n_pkts = n_pkts; + p->n_bytes = n_bytes + pkt_length; + p->stats.n_pkts_clone = n_pkts_clone + 1; - p->pkts[p->n_pkts++] = m; - if (p->n_pkts == (int)p->params.burst_size) + if (n_pkts == (int)p->params.burst_size) __writer_flush(p); } @@ -297,7 +326,11 @@ static void writer_pkt_clone_tx(void *port, struct rte_swx_pkt *pkt, uint32_t truncation_length) { struct writer *p = port; + int n_pkts = p->n_pkts; + uint32_t n_bytes = p->n_bytes; + uint64_t n_pkts_clone = p->stats.n_pkts_clone; struct rte_mbuf *m = pkt->handle, *m_clone; + uint32_t pkt_length = pkt->length; TRACE("[Ring %s] Pkt %d (%u bytes at offset %u) (clone)\n", p->params.name, @@ -307,8 +340,8 @@ writer_pkt_clone_tx(void *port, struct rte_swx_pkt *pkt, uint32_t truncation_len if (TRACE_LEVEL) rte_hexdump(stdout, NULL, &pkt->pkt[pkt->offset], pkt->length); - m->data_len = (uint16_t)(pkt->length + m->data_len - m->pkt_len); - m->pkt_len = pkt->length; + m->data_len = (uint16_t)(pkt_length + m->data_len - m->pkt_len); + m->pkt_len = pkt_length; m->data_off = (uint16_t)pkt->offset; m_clone = rte_pktmbuf_copy(m, m->pool, 0, truncation_length); @@ -317,12 +350,12 @@ writer_pkt_clone_tx(void *port, struct rte_swx_pkt *pkt, uint32_t truncation_len return; } - p->stats.n_pkts++; - p->stats.n_bytes += pkt->length; - p->stats.n_pkts_clone++; + p->pkts[n_pkts++] = m_clone; + p->n_pkts = n_pkts; + p->n_bytes = n_bytes + pkt_length; + p->stats.n_pkts_clone = n_pkts_clone + 1; - p->pkts[p->n_pkts++] = m_clone; - if (p->n_pkts == (int)p->params.burst_size) + if (n_pkts == (int)p->params.burst_size) __writer_flush(p); }