if_epair: implement fanout

Allow multiple cores to be used to process if_epair traffic. We do this
(if RSS is enabled) based on the RSS hash of the incoming packet. This
allows us to distribute the load over multiple cores, rather than
sending everything to the same one.

We also switch from swi_sched() to taskqueues, which also contributes to
better throughput.

Benchmark results:
With net.isr.maxthreads=-1

Setup A: (cc0 - bridge0 - epair0a) (epair0b - bridge1 - cc1)

Before          627 Kpps
After (no RSS)  1.198 Mpps
After (RSS)     3.148 Mpps

Setup B: (cc0 - bridge0 - epaira0) (epair0b - vnet jail - epair1a) (epair1b - bridge1 - cc1)

Before          7.705 Kpps
After (no RSS)  1.017 Mpps
After (RSS)     2.083 Mpps

MFC after:	3 weeks
Sponsored by:	Orange Business Services
Differential Revision:	https://reviews.freebsd.org/D33731
This commit is contained in:
Kristof Provost 2021-12-09 14:24:13 +01:00
parent 31566b98b8
commit 24f0bfbad5

View File

@ -40,6 +40,8 @@
#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");
#include "opt_rss.h"
#include <sys/param.h>
#include <sys/hash.h>
#include <sys/jail.h>
@ -50,10 +52,11 @@ __FBSDID("$FreeBSD$");
#include <sys/module.h>
#include <sys/proc.h>
#include <sys/queue.h>
#include <sys/sched.h>
#include <sys/smp.h>
#include <sys/socket.h>
#include <sys/sockio.h>
#include <sys/sysctl.h>
#include <sys/taskqueue.h>
#include <sys/types.h>
#include <sys/buf_ring.h>
#include <sys/bus.h>
@ -68,6 +71,11 @@ __FBSDID("$FreeBSD$");
#include <net/if_var.h>
#include <net/if_types.h>
#include <net/netisr.h>
#ifdef RSS
#include <net/rss_config.h>
#include <netinet/in_rss.h>
#include <netinet6/in6_rss.h>
#endif
#include <net/vnet.h>
static int epair_clone_match(struct if_clone *, const char *);
@ -90,21 +98,32 @@ static unsigned int next_index = 0;
#define EPAIR_LOCK() mtx_lock(&epair_n_index_mtx)
#define EPAIR_UNLOCK() mtx_unlock(&epair_n_index_mtx)
static void *swi_cookie[MAXCPU]; /* swi(9). */
static STAILQ_HEAD(, epair_softc) swi_sc[MAXCPU];
struct epair_softc;
struct epair_queue {
int id;
struct buf_ring *rxring[2];
volatile int ridx; /* 0 || 1 */
struct task tx_task;
struct epair_softc *sc;
};
static struct mtx epair_n_index_mtx;
struct epair_softc {
struct ifnet *ifp; /* This ifp. */
struct ifnet *oifp; /* other ifp of pair. */
void *swi_cookie; /* swi(9). */
struct buf_ring *rxring[2];
volatile int ridx; /* 0 || 1 */
struct ifmedia media; /* Media config (fake). */
uint32_t cpuidx;
struct ifnet *ifp; /* This ifp. */
struct ifnet *oifp; /* other ifp of pair. */
int num_queues;
struct epair_queue *queues;
struct ifmedia media; /* Media config (fake). */
STAILQ_ENTRY(epair_softc) entry;
};
struct epair_tasks_t {
int tasks;
struct taskqueue *tq[MAXCPU];
};
static struct epair_tasks_t epair_tasks;
static void
epair_clear_mbuf(struct mbuf *m)
{
@ -119,61 +138,45 @@ epair_clear_mbuf(struct mbuf *m)
}
static void
epair_if_input(struct epair_softc *sc, int ridx)
epair_if_input(struct epair_softc *sc, struct epair_queue *q, int ridx)
{
struct epoch_tracker et;
struct ifnet *ifp;
struct mbuf *m;
ifp = sc->ifp;
NET_EPOCH_ENTER(et);
do {
m = buf_ring_dequeue_sc(sc->rxring[ridx]);
CURVNET_SET(ifp->if_vnet);
while (! buf_ring_empty(q->rxring[ridx])) {
m = buf_ring_dequeue_mc(q->rxring[ridx]);
if (m == NULL)
break;
continue;
MPASS((m->m_pkthdr.csum_flags & CSUM_SND_TAG) == 0);
(*ifp->if_input)(ifp, m);
} while (1);
NET_EPOCH_EXIT(et);
}
CURVNET_RESTORE();
}
static void
epair_sintr(struct epair_softc *sc)
epair_tx_start_deferred(void *arg, int pending)
{
struct epair_queue *q = (struct epair_queue *)arg;
struct epair_softc *sc = q->sc;
int ridx, nidx;
if_ref(sc->ifp);
ridx = atomic_load_int(&q->ridx);
do {
ridx = sc->ridx;
nidx = (ridx == 0) ? 1 : 0;
} while (!atomic_cmpset_int(&sc->ridx, ridx, nidx));
epair_if_input(sc, ridx);
} while (!atomic_fcmpset_int(&q->ridx, &ridx, nidx));
epair_if_input(sc, q, ridx);
if (! buf_ring_empty(q->rxring[nidx]))
taskqueue_enqueue(epair_tasks.tq[q->id], &q->tx_task);
if_rele(sc->ifp);
}
static void
epair_intr(void *arg)
{
struct epair_softc *sc;
uint32_t cpuidx;
cpuidx = (uintptr_t)arg;
/* If this is a problem, this is a read-mostly situation. */
EPAIR_LOCK();
STAILQ_FOREACH(sc, &swi_sc[cpuidx], entry) {
/* Do this lockless. */
if (buf_ring_empty(sc->rxring[sc->ridx]))
continue;
epair_sintr(sc);
}
EPAIR_UNLOCK();
return;
}
static int
epair_menq(struct mbuf *m, struct epair_softc *osc)
{
@ -181,7 +184,12 @@ epair_menq(struct mbuf *m, struct epair_softc *osc)
int len, ret;
int ridx;
short mflags;
struct epair_queue *q = NULL;
uint32_t bucket;
bool was_empty;
#ifdef RSS
struct ether_header *eh;
#endif
/*
* I know this looks weird. We pass the "other sc" as we need that one
@ -202,13 +210,38 @@ epair_menq(struct mbuf *m, struct epair_softc *osc)
MPASS(m->m_nextpkt == NULL);
MPASS((m->m_pkthdr.csum_flags & CSUM_SND_TAG) == 0);
ridx = atomic_load_int(&osc->ridx);
was_empty = buf_ring_empty(osc->rxring[ridx]);
ret = buf_ring_enqueue(osc->rxring[ridx], m);
#ifdef RSS
ret = rss_m2bucket(m, &bucket);
if (ret) {
/* Actually hash the packet. */
eh = mtod(m, struct ether_header *);
switch (ntohs(eh->ether_type)) {
case ETHERTYPE_IP:
rss_soft_m2cpuid_v4(m, 0, &bucket);
break;
case ETHERTYPE_IPV6:
rss_soft_m2cpuid_v6(m, 0, &bucket);
break;
default:
bucket = 0;
break;
}
}
bucket %= osc->num_queues;
#else
bucket = 0;
#endif
q = &osc->queues[bucket];
ridx = atomic_load_int(&q->ridx);
was_empty = buf_ring_empty(q->rxring[ridx]);
ret = buf_ring_enqueue(q->rxring[ridx], m);
if (ret != 0) {
/* Ring is full. */
if_inc_counter(ifp, IFCOUNTER_OQDROPS, 1);
m_freem(m);
return (0);
goto done;
}
if_inc_counter(ifp, IFCOUNTER_OPACKETS, 1);
@ -223,9 +256,9 @@ epair_menq(struct mbuf *m, struct epair_softc *osc)
/* Someone else received the packet. */
if_inc_counter(oifp, IFCOUNTER_IPACKETS, 1);
/* Kick the interrupt handler for the first packet. */
if (was_empty && osc->swi_cookie != NULL)
swi_sched(osc->swi_cookie, 0);
done:
if (was_empty)
taskqueue_enqueue(epair_tasks.tq[bucket], &q->tx_task);
return (0);
}
@ -495,16 +528,27 @@ epair_clone_create(struct if_clone *ifc, char *name, size_t len, caddr_t params)
/* Allocate memory for both [ab] interfaces */
sca = malloc(sizeof(struct epair_softc), M_EPAIR, M_WAITOK | M_ZERO);
sca->ifp = if_alloc(IFT_ETHER);
sca->num_queues = epair_tasks.tasks;
if (sca->ifp == NULL) {
free(sca, M_EPAIR);
ifc_free_unit(ifc, unit);
return (ENOSPC);
}
sca->rxring[0] = buf_ring_alloc(RXRSIZE, M_EPAIR, M_WAITOK,NULL);
sca->rxring[1] = buf_ring_alloc(RXRSIZE, M_EPAIR, M_WAITOK, NULL);
sca->queues = mallocarray(sca->num_queues, sizeof(struct epair_queue),
M_EPAIR, M_WAITOK);
for (int i = 0; i < sca->num_queues; i++) {
struct epair_queue *q = &sca->queues[i];
q->id = i;
q->rxring[0] = buf_ring_alloc(RXRSIZE, M_EPAIR, M_WAITOK, NULL);
q->rxring[1] = buf_ring_alloc(RXRSIZE, M_EPAIR, M_WAITOK, NULL);
q->ridx = 0;
q->sc = sca;
NET_TASK_INIT(&q->tx_task, 0, epair_tx_start_deferred, q);
}
scb = malloc(sizeof(struct epair_softc), M_EPAIR, M_WAITOK | M_ZERO);
scb->ifp = if_alloc(IFT_ETHER);
scb->num_queues = epair_tasks.tasks;
if (scb->ifp == NULL) {
free(scb, M_EPAIR);
if_free(sca->ifp);
@ -512,8 +556,17 @@ epair_clone_create(struct if_clone *ifc, char *name, size_t len, caddr_t params)
ifc_free_unit(ifc, unit);
return (ENOSPC);
}
scb->rxring[0] = buf_ring_alloc(RXRSIZE, M_EPAIR, M_WAITOK, NULL);
scb->rxring[1] = buf_ring_alloc(RXRSIZE, M_EPAIR, M_WAITOK, NULL);
scb->queues = mallocarray(scb->num_queues, sizeof(struct epair_queue),
M_EPAIR, M_WAITOK);
for (int i = 0; i < scb->num_queues; i++) {
struct epair_queue *q = &scb->queues[i];
q->id = i;
q->rxring[0] = buf_ring_alloc(RXRSIZE, M_EPAIR, M_WAITOK, NULL);
q->rxring[1] = buf_ring_alloc(RXRSIZE, M_EPAIR, M_WAITOK, NULL);
q->ridx = 0;
q->sc = scb;
NET_TASK_INIT(&q->tx_task, 0, epair_tx_start_deferred, q);
}
/*
* Cross-reference the interfaces so we will be able to free both.
@ -528,41 +581,6 @@ epair_clone_create(struct if_clone *ifc, char *name, size_t len, caddr_t params)
#else
hash = 0;
#endif
if (swi_cookie[hash] == NULL) {
void *cookie;
EPAIR_UNLOCK();
error = swi_add(NULL, epairname,
epair_intr, (void *)(uintptr_t)hash,
SWI_NET, INTR_MPSAFE, &cookie);
if (error) {
buf_ring_free(scb->rxring[0], M_EPAIR);
buf_ring_free(scb->rxring[1], M_EPAIR);
if_free(scb->ifp);
free(scb, M_EPAIR);
buf_ring_free(sca->rxring[0], M_EPAIR);
buf_ring_free(sca->rxring[1], M_EPAIR);
if_free(sca->ifp);
free(sca, M_EPAIR);
ifc_free_unit(ifc, unit);
return (ENOSPC);
}
EPAIR_LOCK();
/* Recheck under lock even though a race is very unlikely. */
if (swi_cookie[hash] == NULL) {
swi_cookie[hash] = cookie;
} else {
EPAIR_UNLOCK();
(void) swi_remove(cookie);
EPAIR_LOCK();
}
}
sca->cpuidx = hash;
STAILQ_INSERT_TAIL(&swi_sc[hash], sca, entry);
sca->swi_cookie = swi_cookie[hash];
scb->cpuidx = hash;
STAILQ_INSERT_TAIL(&swi_sc[hash], scb, entry);
scb->swi_cookie = swi_cookie[hash];
EPAIR_UNLOCK();
/* Initialise pseudo media types. */
@ -669,12 +687,15 @@ epair_drain_rings(struct epair_softc *sc)
struct mbuf *m;
for (ridx = 0; ridx < 2; ridx++) {
do {
m = buf_ring_dequeue_sc(sc->rxring[ridx]);
if (m == NULL)
break;
m_freem(m);
} while (1);
for (int i = 0; i < sc->num_queues; i++) {
struct epair_queue *q = &sc->queues[i];
do {
m = buf_ring_dequeue_sc(q->rxring[ridx]);
if (m == NULL)
break;
m_freem(m);
} while (1);
}
}
}
@ -707,14 +728,6 @@ epair_clone_destroy(struct if_clone *ifc, struct ifnet *ifp)
ether_ifdetach(ifp);
ether_ifdetach(oifp);
/* Second stop interrupt handler. */
EPAIR_LOCK();
STAILQ_REMOVE(&swi_sc[sca->cpuidx], sca, epair_softc, entry);
STAILQ_REMOVE(&swi_sc[scb->cpuidx], scb, epair_softc, entry);
EPAIR_UNLOCK();
sca->swi_cookie = NULL;
scb->swi_cookie = NULL;
/* Third free any queued packets and all the resources. */
CURVNET_SET_QUIET(oifp->if_vnet);
epair_drain_rings(scb);
@ -725,16 +738,24 @@ epair_clone_destroy(struct if_clone *ifc, struct ifnet *ifp)
__func__, error);
if_free(oifp);
ifmedia_removeall(&scb->media);
buf_ring_free(scb->rxring[0], M_EPAIR);
buf_ring_free(scb->rxring[1], M_EPAIR);
for (int i = 0; i < scb->num_queues; i++) {
struct epair_queue *q = &scb->queues[i];
buf_ring_free(q->rxring[0], M_EPAIR);
buf_ring_free(q->rxring[1], M_EPAIR);
}
free(scb->queues, M_EPAIR);
free(scb, M_EPAIR);
CURVNET_RESTORE();
epair_drain_rings(sca);
if_free(ifp);
ifmedia_removeall(&sca->media);
buf_ring_free(sca->rxring[0], M_EPAIR);
buf_ring_free(sca->rxring[1], M_EPAIR);
for (int i = 0; i < sca->num_queues; i++) {
struct epair_queue *q = &sca->queues[i];
buf_ring_free(q->rxring[0], M_EPAIR);
buf_ring_free(q->rxring[1], M_EPAIR);
}
free(sca->queues, M_EPAIR);
free(sca, M_EPAIR);
/* Last free the cloner unit. */
@ -762,34 +783,76 @@ vnet_epair_uninit(const void *unused __unused)
VNET_SYSUNINIT(vnet_epair_uninit, SI_SUB_INIT_IF, SI_ORDER_ANY,
vnet_epair_uninit, NULL);
static int
epair_mod_init()
{
char name[32];
epair_tasks.tasks = 0;
#ifdef RSS
struct pcpu *pcpu;
int cpu;
CPU_FOREACH(cpu) {
cpuset_t cpu_mask;
/* Pin to this CPU so we get appropriate NUMA allocations. */
pcpu = pcpu_find(cpu);
thread_lock(curthread);
sched_bind(curthread, cpu);
thread_unlock(curthread);
snprintf(name, sizeof(name), "epair_task_%d", cpu);
epair_tasks.tq[cpu] = taskqueue_create(name, M_WAITOK,
taskqueue_thread_enqueue,
&epair_tasks.tq[cpu]);
CPU_SETOF(cpu, &cpu_mask);
taskqueue_start_threads_cpuset(&epair_tasks.tq[cpu], 1, PI_NET,
&cpu_mask, "%s", name);
epair_tasks.tasks++;
}
#else
snprintf(name, sizeof(name), "epair_task");
epair_tasks.tq[0] = taskqueue_create(name, M_WAITOK,
taskqueue_thread_enqueue,
&epair_tasks.tq[0]);
taskqueue_start_threads(&epair_tasks.tq[0], 1, PI_NET, "%s", name);
epair_tasks.tasks = 1;
#endif
return (0);
}
static void
epair_mod_cleanup()
{
for (int i = 0; i < epair_tasks.tasks; i++) {
taskqueue_drain_all(epair_tasks.tq[i]);
taskqueue_free(epair_tasks.tq[i]);
}
}
static int
epair_modevent(module_t mod, int type, void *data)
{
int i;
int ret;
switch (type) {
case MOD_LOAD:
for (i = 0; i < MAXCPU; i++) {
swi_cookie[i] = NULL;
STAILQ_INIT(&swi_sc[i]);
}
EPAIR_LOCK_INIT();
ret = epair_mod_init();
if (ret != 0)
return (ret);
if (bootverbose)
printf("%s: %s initialized.\n", __func__, epairname);
break;
case MOD_UNLOAD:
EPAIR_LOCK();
for (i = 0; i < MAXCPU; i++) {
if (!STAILQ_EMPTY(&swi_sc[i])) {
printf("%s: swi_sc[%d] active\n", __func__, i);
EPAIR_UNLOCK();
return (EBUSY);
}
}
EPAIR_UNLOCK();
for (i = 0; i < MAXCPU; i++)
if (swi_cookie[i] != NULL)
(void) swi_remove(swi_cookie[i]);
epair_mod_cleanup();
EPAIR_LOCK_DESTROY();
if (bootverbose)
printf("%s: %s unloaded.\n", __func__, epairname);