epair: Simplify the transmit path and address lost wakeups

epairs currently shuttle all transmitted packets through a single global
taskqueue thread. To hand packets over to the taskqueue thread, each epair
maintains a pair of ring buffers and a lockless scheme for notifying the
thread of pending work. The implementation can lead to lost wakeups, causing
to-be-transmitted packets to end up stuck in the queue. Rather than extending
the existing scheme, simply replace it with a linked list protected by a
mutex, and use the mutex to synchronize wakeups of the taskqueue thread.
This appears to give equivalent or better throughput with >= 16 producer
threads and eliminates the lost wakeups.

Reviewed by:	kp
MFC after:	1 week
Sponsored by:	Klara, Inc.
Sponsored by:	Modirum MDPay
Differential Revision:	https://reviews.freebsd.org/D38843
commit 5d36c4fd28
parent ce9bc02975
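
The heart of the change is the handoff pattern visible in the diff below: a
mutex-protected packet queue plus a three-state flag, where the producer
schedules the consumer task only on the IDLE -> WAKING transition and the
consumer re-checks the queue under the same mutex before going idle. What
follows is a minimal userspace sketch of that pattern, for illustration only:
pthreads stand in for kernel primitives, all names and types are invented for
the sketch, and the queue length limit that mbufq enforces is omitted. None
of this code is from the commit.

	#include <pthread.h>
	#include <stdbool.h>
	#include <stddef.h>

	enum q_state { Q_IDLE, Q_WAKING, Q_RUNNING };

	struct pkt {
		struct pkt *next;
	};

	struct queue {
		pthread_mutex_t  mtx;
		struct pkt      *head;
		struct pkt     **tail;
		enum q_state     state;
	};

	/*
	 * Stand-in for taskqueue_enqueue(): schedule the consumer.  In the
	 * kernel the consumer runs asynchronously on a taskqueue thread;
	 * this stub only keeps the sketch compilable.
	 */
	static void
	wake_consumer(struct queue *q)
	{
		(void)q;
	}

	static void
	producer_enqueue(struct queue *q, struct pkt *p)
	{
		pthread_mutex_lock(&q->mtx);
		/*
		 * The wakeup decision and the enqueue form one critical
		 * section, so the consumer cannot miss a packet queued
		 * after its final emptiness check.
		 */
		if (q->state == Q_IDLE) {
			q->state = Q_WAKING;
			wake_consumer(q);
		}
		p->next = NULL;
		*q->tail = p;
		q->tail = &p->next;
		pthread_mutex_unlock(&q->mtx);
	}

	static void
	consumer_run(struct queue *q)
	{
		struct pkt *p, *n;
		bool resched;

		/* Take the whole batch at once and mark ourselves running. */
		pthread_mutex_lock(&q->mtx);
		p = q->head;
		q->head = NULL;
		q->tail = &q->head;
		q->state = Q_RUNNING;
		pthread_mutex_unlock(&q->mtx);

		for (; p != NULL; p = n) {
			n = p->next;
			/* ... hand the packet to the peer interface ... */
		}

		/*
		 * Re-check under the mutex: anything enqueued while we were
		 * draining forces exactly one reschedule, which bounds the
		 * work done per task run.
		 */
		pthread_mutex_lock(&q->mtx);
		resched = (q->head != NULL);
		q->state = resched ? Q_WAKING : Q_IDLE;
		pthread_mutex_unlock(&q->mtx);

		if (resched)
			wake_consumer(q);
	}

	int
	main(void)
	{
		struct queue q = { .tail = &q.head };
		struct pkt p = { NULL };

		pthread_mutex_init(&q.mtx, NULL);
		producer_enqueue(&q, &p);
		consumer_run(&q);	/* normally runs on another thread */
		pthread_mutex_destroy(&q.mtx);
		return (0);
	}

Because the producer's enqueue shares a critical section with its wakeup
decision, and the consumer's final emptiness check takes the same mutex, a
packet can never sit in the queue while the state reads IDLE; the lost-wakeup
window of the old two-bit scheme cannot occur here.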
--- a/sys/net/if_epair.c
+++ b/sys/net/if_epair.c
@@ -101,15 +101,16 @@ static unsigned int next_index = 0;
 #define	EPAIR_LOCK()		mtx_lock(&epair_n_index_mtx)
 #define	EPAIR_UNLOCK()		mtx_unlock(&epair_n_index_mtx)
 
-#define	BIT_QUEUE_TASK		0
-#define	BIT_MBUF_QUEUED		1
-
 struct epair_softc;
 struct epair_queue {
+	struct mtx		 mtx;
+	struct mbufq		 q;
 	int			 id;
-	struct buf_ring		*rxring[2];
-	volatile int		 ridx;		/* 0 || 1 */
-	volatile long		 state;		/* taskqueue coordination */
+	enum {
+		EPAIR_QUEUE_IDLE,
+		EPAIR_QUEUE_WAKING,
+		EPAIR_QUEUE_RUNNING,
+	} state;
 	struct task		 tx_task;
 	struct epair_softc	*sc;
 };
@@ -144,45 +145,50 @@ epair_clear_mbuf(struct mbuf *m)
 	m_tag_delete_nonpersistent(m);
 }
 
-static void
-epair_if_input(struct epair_softc *sc, struct epair_queue *q, int ridx)
-{
-	struct ifnet *ifp;
-	struct mbuf *m;
-
-	ifp = sc->ifp;
-	CURVNET_SET(ifp->if_vnet);
-	while (! buf_ring_empty(q->rxring[ridx])) {
-		m = buf_ring_dequeue_mc(q->rxring[ridx]);
-		if (m == NULL)
-			continue;
-
-		MPASS((m->m_pkthdr.csum_flags & CSUM_SND_TAG) == 0);
-		(*ifp->if_input)(ifp, m);
-
-	}
-	CURVNET_RESTORE();
-}
-
 static void
 epair_tx_start_deferred(void *arg, int pending)
 {
 	struct epair_queue *q = (struct epair_queue *)arg;
-	struct epair_softc *sc = q->sc;
-	int ridx, nidx;
+	if_t ifp;
+	struct mbuf *m, *n;
+	bool resched;
 
-	if_ref(sc->ifp);
-	ridx = atomic_load_int(&q->ridx);
-	do {
-		nidx = (ridx == 0) ? 1 : 0;
-	} while (!atomic_fcmpset_int(&q->ridx, &ridx, nidx));
-	epair_if_input(sc, q, ridx);
+	ifp = q->sc->ifp;
 
-	atomic_clear_long(&q->state, (1 << BIT_QUEUE_TASK));
-	if (atomic_testandclear_long(&q->state, BIT_MBUF_QUEUED))
-		taskqueue_enqueue(epair_tasks.tq[q->id], &q->tx_task);
+	if_ref(ifp);
+	CURVNET_SET(ifp->if_vnet);
 
-	if_rele(sc->ifp);
+	mtx_lock(&q->mtx);
+	m = mbufq_flush(&q->q);
+	q->state = EPAIR_QUEUE_RUNNING;
+	mtx_unlock(&q->mtx);
+
+	while (m != NULL) {
+		n = STAILQ_NEXT(m, m_stailqpkt);
+		m->m_nextpkt = NULL;
+		if_input(ifp, m);
+		m = n;
+	}
+
+	/*
+	 * Avoid flushing the queue more than once per task.  We can otherwise
+	 * end up starving ourselves in a multi-epair routing configuration.
+	 */
+	mtx_lock(&q->mtx);
+	if (mbufq_len(&q->q) > 0) {
+		resched = true;
+		q->state = EPAIR_QUEUE_WAKING;
+	} else {
+		resched = false;
+		q->state = EPAIR_QUEUE_IDLE;
+	}
+	mtx_unlock(&q->mtx);
+
+	if (resched)
+		taskqueue_enqueue(epair_tasks.tq[q->id], &q->tx_task);
+
+	CURVNET_RESTORE();
+	if_rele(ifp);
 }
 
 static struct epair_queue *
@@ -236,9 +242,9 @@ epair_prepare_mbuf(struct mbuf *m, struct ifnet *src_ifp)
 static void
 epair_menq(struct mbuf *m, struct epair_softc *osc)
 {
+	struct epair_queue *q;
 	struct ifnet *ifp, *oifp;
-	int len, ret;
-	int ridx;
+	int error, len;
 	bool mcast;
 
 	/*
@@ -254,32 +260,26 @@ epair_menq(struct mbuf *m, struct epair_softc *osc)
 	len = m->m_pkthdr.len;
 	mcast = (m->m_flags & (M_BCAST | M_MCAST)) != 0;
 
-	struct epair_queue *q = epair_select_queue(osc, m);
+	q = epair_select_queue(osc, m);
 
-	atomic_set_long(&q->state, (1 << BIT_MBUF_QUEUED));
-	ridx = atomic_load_int(&q->ridx);
-	ret = buf_ring_enqueue(q->rxring[ridx], m);
-	if (ret != 0) {
-		/* Ring is full. */
-		if_inc_counter(ifp, IFCOUNTER_OQDROPS, 1);
-		m_freem(m);
-		return;
+	mtx_lock(&q->mtx);
+	if (q->state == EPAIR_QUEUE_IDLE) {
+		q->state = EPAIR_QUEUE_WAKING;
+		taskqueue_enqueue(epair_tasks.tq[q->id], &q->tx_task);
 	}
+	error = mbufq_enqueue(&q->q, m);
+	mtx_unlock(&q->mtx);
 
-	if_inc_counter(ifp, IFCOUNTER_OPACKETS, 1);
-	/*
-	 * IFQ_HANDOFF_ADJ/ip_handoff() update statistics,
-	 * but as we bypass all this we have to duplicate
-	 * the logic another time.
-	 */
-	if_inc_counter(ifp, IFCOUNTER_OBYTES, len);
-	if (mcast)
-		if_inc_counter(ifp, IFCOUNTER_OMCASTS, 1);
-	/* Someone else received the packet. */
-	if_inc_counter(oifp, IFCOUNTER_IPACKETS, 1);
-
-	if (!atomic_testandset_long(&q->state, BIT_QUEUE_TASK))
-		taskqueue_enqueue(epair_tasks.tq[q->id], &q->tx_task);
+	if (error != 0) {
+		m_freem(m);
+		if_inc_counter(ifp, IFCOUNTER_OQDROPS, 1);
+	} else {
+		if_inc_counter(ifp, IFCOUNTER_OPACKETS, 1);
+		if_inc_counter(ifp, IFCOUNTER_OBYTES, len);
+		if (mcast)
+			if_inc_counter(ifp, IFCOUNTER_OMCASTS, 1);
+		if_inc_counter(oifp, IFCOUNTER_IPACKETS, 1);
+	}
 }
 
 static void
@@ -514,10 +514,9 @@ epair_alloc_sc(struct if_clone *ifc)
 	for (int i = 0; i < sc->num_queues; i++) {
 		struct epair_queue *q = &sc->queues[i];
 		q->id = i;
-		q->rxring[0] = buf_ring_alloc(RXRSIZE, M_EPAIR, M_WAITOK, NULL);
-		q->rxring[1] = buf_ring_alloc(RXRSIZE, M_EPAIR, M_WAITOK, NULL);
-		q->ridx = 0;
-		q->state = 0;
+		q->state = EPAIR_QUEUE_IDLE;
+		mtx_init(&q->mtx, "epairq", NULL, MTX_DEF | MTX_NEW);
+		mbufq_init(&q->q, RXRSIZE);
 		q->sc = sc;
 		NET_TASK_INIT(&q->tx_task, 0, epair_tx_start_deferred, q);
 	}
@@ -610,8 +609,7 @@ epair_free_sc(struct epair_softc *sc)
 	ifmedia_removeall(&sc->media);
 	for (int i = 0; i < sc->num_queues; i++) {
 		struct epair_queue *q = &sc->queues[i];
-		buf_ring_free(q->rxring[0], M_EPAIR);
-		buf_ring_free(q->rxring[1], M_EPAIR);
+		mtx_destroy(&q->mtx);
 	}
 	free(sc->queues, M_EPAIR);
 	free(sc, M_EPAIR);
@@ -756,18 +754,18 @@ epair_clone_create(struct if_clone *ifc, char *name, size_t len,
 static void
 epair_drain_rings(struct epair_softc *sc)
 {
-	int ridx;
-	struct mbuf *m;
+	for (int i = 0; i < sc->num_queues; i++) {
+		struct epair_queue *q;
+		struct mbuf *m, *n;
 
-	for (ridx = 0; ridx < 2; ridx++) {
-		for (int i = 0; i < sc->num_queues; i++) {
-			struct epair_queue *q = &sc->queues[i];
-			do {
-				m = buf_ring_dequeue_sc(q->rxring[ridx]);
-				if (m == NULL)
-					break;
-				m_freem(m);
-			} while (1);
+		q = &sc->queues[i];
+		mtx_lock(&q->mtx);
+		m = mbufq_flush(&q->q);
+		mtx_unlock(&q->mtx);
+
+		for (; m != NULL; m = n) {
+			n = m->m_nextpkt;
+			m_freem(m);
 		}
 	}
 }