if_epair: fix race condition on multi-core systems

As an unwanted side effect of the performance improvements in
24f0bfbad5, epair interfaces stop forwarding traffic on higher
load levels when running on multi-core systems.

This happens due to a race condition in the logic that decides when to
place work in the task queue(s) responsible for processing the content
of ring buffers.

In order to fix this, a field named state is added to the epair_queue
structure. This field is used by the affected functions to signal each
other that something happened in the underlying ring buffers that might
require work to be scheduled in task queue(s), replacing the existing
logic, which relied on checking if ring buffers are empty or not.

epair_menq() does:
  - set BIT_MBUF_QUEUED
  - queue mbuf
  - if testandset BIT_QUEUE_TASK:
      enqueue task

epair_tx_start_deferred() does:
  - swap ring buffers
  - process mbufs
  - clear BIT_QUEUE_TASK
  - if testandclear BIT_MBUF_QUEUED
      enqueue task

PR:		262571
Reported by:	Johan Hendriks <joh.hendriks@gmail.com>
MFC after:	3 days
Differential Revision:	https://reviews.freebsd.org/D34569
This commit is contained in:
Michael Gmelin 2022-03-16 23:08:55 +01:00 committed by Kristof Provost
parent ba46c6c4b7
commit 66acf7685b

View File

@ -104,11 +104,15 @@ static unsigned int next_index = 0;
#define EPAIR_LOCK() mtx_lock(&epair_n_index_mtx)
#define EPAIR_UNLOCK() mtx_unlock(&epair_n_index_mtx)
#define BIT_QUEUE_TASK 0
#define BIT_MBUF_QUEUED 1
struct epair_softc;
struct epair_queue {
int id;
struct buf_ring *rxring[2];
volatile int ridx; /* 0 || 1 */
volatile int state; /* taskqueue coordination */
struct task tx_task;
struct epair_softc *sc;
};
@ -177,7 +181,8 @@ epair_tx_start_deferred(void *arg, int pending)
} while (!atomic_fcmpset_int(&q->ridx, &ridx, nidx));
epair_if_input(sc, q, ridx);
if (! buf_ring_empty(q->rxring[nidx]))
atomic_clear_int(&q->state, (1 << BIT_QUEUE_TASK));
if (atomic_testandclear_int(&q->state, BIT_MBUF_QUEUED))
taskqueue_enqueue(epair_tasks.tq[q->id], &q->tx_task);
if_rele(sc->ifp);
@ -192,7 +197,6 @@ epair_menq(struct mbuf *m, struct epair_softc *osc)
short mflags;
struct epair_queue *q = NULL;
uint32_t bucket;
bool was_empty;
#ifdef RSS
struct ether_header *eh;
#endif
@ -244,14 +248,14 @@ epair_menq(struct mbuf *m, struct epair_softc *osc)
#endif
q = &osc->queues[bucket];
atomic_set_int(&q->state, (1 << BIT_MBUF_QUEUED));
ridx = atomic_load_int(&q->ridx);
was_empty = buf_ring_empty(q->rxring[ridx]);
ret = buf_ring_enqueue(q->rxring[ridx], m);
if (ret != 0) {
/* Ring is full. */
if_inc_counter(ifp, IFCOUNTER_OQDROPS, 1);
m_freem(m);
goto done;
return (0);
}
if_inc_counter(ifp, IFCOUNTER_OPACKETS, 1);
@ -266,8 +270,7 @@ epair_menq(struct mbuf *m, struct epair_softc *osc)
/* Someone else received the packet. */
if_inc_counter(oifp, IFCOUNTER_IPACKETS, 1);
done:
if (was_empty)
if (!atomic_testandset_int(&q->state, BIT_QUEUE_TASK))
taskqueue_enqueue(epair_tasks.tq[bucket], &q->tx_task);
return (0);
@ -552,6 +555,7 @@ epair_clone_create(struct if_clone *ifc, char *name, size_t len, caddr_t params)
q->rxring[0] = buf_ring_alloc(RXRSIZE, M_EPAIR, M_WAITOK, NULL);
q->rxring[1] = buf_ring_alloc(RXRSIZE, M_EPAIR, M_WAITOK, NULL);
q->ridx = 0;
q->state = 0;
q->sc = sca;
NET_TASK_INIT(&q->tx_task, 0, epair_tx_start_deferred, q);
}
@ -574,6 +578,7 @@ epair_clone_create(struct if_clone *ifc, char *name, size_t len, caddr_t params)
q->rxring[0] = buf_ring_alloc(RXRSIZE, M_EPAIR, M_WAITOK, NULL);
q->rxring[1] = buf_ring_alloc(RXRSIZE, M_EPAIR, M_WAITOK, NULL);
q->ridx = 0;
q->state = 0;
q->sc = scb;
NET_TASK_INIT(&q->tx_task, 0, epair_tx_start_deferred, q);
}