cxgbei: Dispatch sent PDUs to the NIC asynchronously.

Previously the driver sent PDUs to the NIC synchronously from the
icl_conn_pdu_queue_cb callback.  However, this performed a fair bit
of work while holding the icl connection lock.  Instead, change the
callback to add sent PDUs to a STAILQ and defer dispatching of PDUs
to the NIC to a helper thread, similar to the scheme used in the TCP
iSCSI backend.

- Replace the rx_flags int and its sole flag (RXF_ACTIVE) with a
  simple rx_active bool.

- Add a pool of transmit worker threads for cxgbei.

- Rework worker thread exit to depend on the wakeup performed by
  kthread_exit(), fixing a race with module unload.

Reported by:	mav
Sponsored by:	Chelsio Communications
Author: John Baldwin
Date:   2022-02-07 16:20:06 -08:00
commit fd8f61d6e9 (parent e85af89fa7)
3 changed files with 268 additions and 128 deletions
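
The hand-off the diff below implements is a classic producer/consumer
pattern.  As a rough guide to the shape of the change, here is a minimal
userspace sketch of it, with pthreads standing in for the kernel's
mtx(9)/cv(9) and kthread primitives and FreeBSD's <sys/queue.h> providing
STAILQ.  The names (pdu, conn, pdu_queue, tx_worker, dispatch_to_nic) are
illustrative only, not the driver's: the cheap callback merely links the
PDU onto a list and wakes an idle worker, and the worker swaps the whole
batch out under the lock and does the expensive dispatch unlocked.

#include <sys/queue.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

struct pdu {
    int id;
    STAILQ_ENTRY(pdu) link;
};

struct conn {
    pthread_mutex_t lock;           /* stands in for the icl conn lock */
    pthread_cond_t cv;
    bool tx_active;                 /* worker already has work queued */
    bool stopping;
    STAILQ_HEAD(, pdu) sent_pdus;   /* protected by lock */
};

/* Stands in for handing the finalized mbuf chain to the NIC. */
static void
dispatch_to_nic(struct pdu *p)
{
    printf("dispatching PDU %d\n", p->id);
}

/* The cheap part: link the PDU and wake the worker only when idle. */
static void
pdu_queue(struct conn *c, struct pdu *p)
{
    pthread_mutex_lock(&c->lock);
    STAILQ_INSERT_TAIL(&c->sent_pdus, p, link);
    if (!c->tx_active) {
        c->tx_active = true;
        pthread_cond_signal(&c->cv);
    }
    pthread_mutex_unlock(&c->lock);
}

/* The expensive part, done by a helper thread with the lock dropped. */
static void *
tx_worker(void *arg)
{
    struct conn *c = arg;
    STAILQ_HEAD(, pdu) batch = STAILQ_HEAD_INITIALIZER(batch);
    struct pdu *p;

    pthread_mutex_lock(&c->lock);
    for (;;) {
        while (!c->tx_active && !c->stopping)
            pthread_cond_wait(&c->cv, &c->lock);
        if (c->stopping)
            break;
        /* Grab the whole batch, then dispatch unlocked. */
        STAILQ_SWAP(&c->sent_pdus, &batch, pdu);
        pthread_mutex_unlock(&c->lock);
        while ((p = STAILQ_FIRST(&batch)) != NULL) {
            STAILQ_REMOVE_HEAD(&batch, link);
            dispatch_to_nic(p);
        }
        pthread_mutex_lock(&c->lock);
        /* Sleep only if nothing arrived while we were dispatching. */
        if (STAILQ_EMPTY(&c->sent_pdus))
            c->tx_active = false;
    }
    pthread_mutex_unlock(&c->lock);
    return (NULL);
}

int
main(void)
{
    struct conn c = { .lock = PTHREAD_MUTEX_INITIALIZER,
        .cv = PTHREAD_COND_INITIALIZER,
        .sent_pdus = STAILQ_HEAD_INITIALIZER(c.sent_pdus) };
    struct pdu pdus[4];
    pthread_t td;

    pthread_create(&td, NULL, tx_worker, &c);
    for (int i = 0; i < 4; i++) {
        pdus[i].id = i;
        pdu_queue(&c, &pdus[i]);
    }
    sleep(1);               /* let the worker drain the queue */
    pthread_mutex_lock(&c.lock);
    c.stopping = true;
    pthread_cond_signal(&c.cv);
    pthread_mutex_unlock(&c.lock);
    pthread_join(td, NULL);
    return (0);
}

The real driver hashes each connection onto one of a pool of worker
threads (cwt_queue_for_tx()) rather than dedicating a thread per
connection, but the tx_active flag and the batch swap work the same way
as in this sketch.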

cxgbei.c

@@ -95,8 +95,9 @@ __FBSDID("$FreeBSD$");
 #include "cxgbei.h"
 static int worker_thread_count;
-static struct cxgbei_worker_thread_softc *cwt_softc;
-static struct proc *cxgbei_proc;
+static struct cxgbei_worker_thread *cwt_rx_threads, *cwt_tx_threads;
+static void cwt_queue_for_rx(struct icl_cxgbei_conn *icc);
 static void
 read_pdu_limits(struct adapter *sc, uint32_t *max_tx_data_len,
@@ -585,17 +586,9 @@ do_rx_iscsi_ddp(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
     icl_cxgbei_new_pdu_set_conn(ip, ic);
     STAILQ_INSERT_TAIL(&icc->rcvd_pdus, ip, ip_next);
-    if ((icc->rx_flags & RXF_ACTIVE) == 0) {
-        struct cxgbei_worker_thread_softc *cwt = &cwt_softc[icc->cwt];
-        mtx_lock(&cwt->cwt_lock);
-        icc->rx_flags |= RXF_ACTIVE;
-        TAILQ_INSERT_TAIL(&cwt->rx_head, icc, rx_link);
-        if (cwt->cwt_state == CWT_SLEEPING) {
-            cwt->cwt_state = CWT_RUNNING;
-            cv_signal(&cwt->cwt_cv);
-        }
-        mtx_unlock(&cwt->cwt_lock);
+    if (!icc->rx_active) {
+        icc->rx_active = true;
+        cwt_queue_for_rx(icc);
     }
     SOCKBUF_UNLOCK(sb);
     INP_WUNLOCK(inp);
@@ -836,17 +829,9 @@ do_rx_iscsi_cmp(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
     /* Enqueue the PDU to the received pdus queue. */
     STAILQ_INSERT_TAIL(&icc->rcvd_pdus, ip, ip_next);
-    if ((icc->rx_flags & RXF_ACTIVE) == 0) {
-        struct cxgbei_worker_thread_softc *cwt = &cwt_softc[icc->cwt];
-        mtx_lock(&cwt->cwt_lock);
-        icc->rx_flags |= RXF_ACTIVE;
-        TAILQ_INSERT_TAIL(&cwt->rx_head, icc, rx_link);
-        if (cwt->cwt_state == CWT_SLEEPING) {
-            cwt->cwt_state = CWT_RUNNING;
-            cv_signal(&cwt->cwt_cv);
-        }
-        mtx_unlock(&cwt->cwt_lock);
+    if (!icc->rx_active) {
+        icc->rx_active = true;
+        cwt_queue_for_rx(icc);
     }
     SOCKBUF_UNLOCK(sb);
     INP_WUNLOCK(inp);
@@ -944,9 +929,9 @@ static struct uld_info cxgbei_uld_info = {
 };
 static void
-cwt_main(void *arg)
+cwt_rx_main(void *arg)
 {
-    struct cxgbei_worker_thread_softc *cwt = arg;
+    struct cxgbei_worker_thread *cwt = arg;
     struct icl_cxgbei_conn *icc = NULL;
     struct icl_conn *ic;
     struct icl_pdu *ip;
@@ -962,8 +947,8 @@ cwt_main(void *arg)
     while (__predict_true(cwt->cwt_state != CWT_STOP)) {
         cwt->cwt_state = CWT_RUNNING;
-        while ((icc = TAILQ_FIRST(&cwt->rx_head)) != NULL) {
-            TAILQ_REMOVE(&cwt->rx_head, icc, rx_link);
+        while ((icc = TAILQ_FIRST(&cwt->icc_head)) != NULL) {
+            TAILQ_REMOVE(&cwt->icc_head, icc, rx_link);
             mtx_unlock(&cwt->cwt_lock);
             ic = &icc->ic;
@@ -979,7 +964,7 @@ cwt_main(void *arg)
                  */
                 parse_pdus(icc, sb);
             }
-            MPASS(icc->rx_flags & RXF_ACTIVE);
+            MPASS(icc->rx_active);
             if (__predict_true(!(sb->sb_state & SBS_CANTRCVMORE))) {
                 MPASS(STAILQ_EMPTY(&rx_pdus));
                 STAILQ_SWAP(&icc->rcvd_pdus, &rx_pdus, icl_pdu);
@@ -994,11 +979,16 @@ cwt_main(void *arg)
                 SOCKBUF_LOCK(sb);
                 MPASS(STAILQ_EMPTY(&rx_pdus));
             }
-            MPASS(icc->rx_flags & RXF_ACTIVE);
+            MPASS(icc->rx_active);
             if (STAILQ_EMPTY(&icc->rcvd_pdus) ||
                 __predict_false(sb->sb_state & SBS_CANTRCVMORE)) {
-                icc->rx_flags &= ~RXF_ACTIVE;
+                icc->rx_active = false;
+                SOCKBUF_UNLOCK(sb);
+                mtx_lock(&cwt->cwt_lock);
             } else {
+                SOCKBUF_UNLOCK(sb);
                 /*
                  * More PDUs were received while we were busy
                  * handing over the previous batch to ICL.
@@ -1006,13 +996,9 @@ cwt_main(void *arg)
                  * queue.
                  */
                 mtx_lock(&cwt->cwt_lock);
-                TAILQ_INSERT_TAIL(&cwt->rx_head, icc,
+                TAILQ_INSERT_TAIL(&cwt->icc_head, icc,
                     rx_link);
-                mtx_unlock(&cwt->cwt_lock);
             }
-            SOCKBUF_UNLOCK(sb);
-            mtx_lock(&cwt->cwt_lock);
         }
@@ -1022,84 +1008,121 @@ cwt_main(void *arg)
         cv_wait(&cwt->cwt_cv, &cwt->cwt_lock);
     }
-    MPASS(TAILQ_FIRST(&cwt->rx_head) == NULL);
-    mtx_assert(&cwt->cwt_lock, MA_OWNED);
-    cwt->cwt_state = CWT_STOPPED;
-    cv_signal(&cwt->cwt_cv);
+    MPASS(TAILQ_FIRST(&cwt->icc_head) == NULL);
     mtx_unlock(&cwt->cwt_lock);
     kthread_exit();
 }
+static void
+cwt_queue_for_rx(struct icl_cxgbei_conn *icc)
+{
+    struct cxgbei_worker_thread *cwt = &cwt_rx_threads[icc->cwt];
+    mtx_lock(&cwt->cwt_lock);
+    TAILQ_INSERT_TAIL(&cwt->icc_head, icc, rx_link);
+    if (cwt->cwt_state == CWT_SLEEPING) {
+        cwt->cwt_state = CWT_RUNNING;
+        cv_signal(&cwt->cwt_cv);
+    }
+    mtx_unlock(&cwt->cwt_lock);
+}
+void
+cwt_queue_for_tx(struct icl_cxgbei_conn *icc)
+{
+    struct cxgbei_worker_thread *cwt = &cwt_tx_threads[icc->cwt];
+    mtx_lock(&cwt->cwt_lock);
+    TAILQ_INSERT_TAIL(&cwt->icc_head, icc, tx_link);
+    if (cwt->cwt_state == CWT_SLEEPING) {
+        cwt->cwt_state = CWT_RUNNING;
+        cv_signal(&cwt->cwt_cv);
+    }
+    mtx_unlock(&cwt->cwt_lock);
+}
 static int
 start_worker_threads(void)
 {
+    struct proc *cxgbei_proc;
     int i, rc;
-    struct cxgbei_worker_thread_softc *cwt;
+    struct cxgbei_worker_thread *cwt;
     worker_thread_count = min(mp_ncpus, 32);
-    cwt_softc = malloc(worker_thread_count * sizeof(*cwt), M_CXGBE,
+    cwt_rx_threads = malloc(worker_thread_count * sizeof(*cwt), M_CXGBE,
         M_WAITOK | M_ZERO);
+    cwt_tx_threads = malloc(worker_thread_count * sizeof(*cwt), M_CXGBE,
+        M_WAITOK | M_ZERO);
-    MPASS(cxgbei_proc == NULL);
-    for (i = 0, cwt = &cwt_softc[0]; i < worker_thread_count; i++, cwt++) {
+    for (i = 0, cwt = &cwt_rx_threads[0]; i < worker_thread_count;
+        i++, cwt++) {
         mtx_init(&cwt->cwt_lock, "cwt lock", NULL, MTX_DEF);
         cv_init(&cwt->cwt_cv, "cwt cv");
-        TAILQ_INIT(&cwt->rx_head);
-        rc = kproc_kthread_add(cwt_main, cwt, &cxgbei_proc, NULL, 0, 0,
-            "cxgbei", "%d", i);
-        if (rc != 0) {
-            printf("cxgbei: failed to start thread #%d/%d (%d)\n",
-                i + 1, worker_thread_count, rc);
-            mtx_destroy(&cwt->cwt_lock);
-            cv_destroy(&cwt->cwt_cv);
-            bzero(cwt, sizeof(*cwt));
-            if (i == 0) {
-                free(cwt_softc, M_CXGBE);
-                worker_thread_count = 0;
-                return (rc);
-            }
-            /* Not fatal, carry on with fewer threads. */
-            worker_thread_count = i;
-            rc = 0;
-            break;
-        }
-        /* Wait for thread to start before moving on to the next one. */
-        mtx_lock(&cwt->cwt_lock);
-        while (cwt->cwt_state == 0)
-            cv_wait(&cwt->cwt_cv, &cwt->cwt_lock);
-        mtx_unlock(&cwt->cwt_lock);
+        TAILQ_INIT(&cwt->icc_head);
     }
+    for (i = 0, cwt = &cwt_tx_threads[0]; i < worker_thread_count;
+        i++, cwt++) {
+        mtx_init(&cwt->cwt_lock, "cwt lock", NULL, MTX_DEF);
+        cv_init(&cwt->cwt_cv, "cwt cv");
+        TAILQ_INIT(&cwt->icc_head);
+    }
+    cxgbei_proc = NULL;
+    for (i = 0, cwt = &cwt_rx_threads[0]; i < worker_thread_count;
+        i++, cwt++) {
+        rc = kproc_kthread_add(cwt_rx_main, cwt, &cxgbei_proc,
+            &cwt->cwt_td, 0, 0, "cxgbei", "rx %d", i);
+        if (rc != 0) {
+            printf("cxgbei: failed to start rx thread #%d/%d (%d)\n",
+                i + 1, worker_thread_count, rc);
+            return (rc);
+        }
+    }
+    for (i = 0, cwt = &cwt_tx_threads[0]; i < worker_thread_count;
+        i++, cwt++) {
+        rc = kproc_kthread_add(cwt_tx_main, cwt, &cxgbei_proc,
+            &cwt->cwt_td, 0, 0, "cxgbei", "tx %d", i);
+        if (rc != 0) {
+            printf("cxgbei: failed to start tx thread #%d/%d (%d)\n",
+                i + 1, worker_thread_count, rc);
+            return (rc);
+        }
+    }
-    MPASS(cwt_softc != NULL);
-    MPASS(worker_thread_count > 0);
     return (0);
 }
+static void
+stop_worker_threads1(struct cxgbei_worker_thread *threads)
+{
+    struct cxgbei_worker_thread *cwt;
+    int i;
+    for (i = 0, cwt = &threads[0]; i < worker_thread_count; i++, cwt++) {
+        mtx_lock(&cwt->cwt_lock);
+        if (cwt->cwt_td != NULL) {
+            MPASS(cwt->cwt_state == CWT_RUNNING ||
+                cwt->cwt_state == CWT_SLEEPING);
+            cwt->cwt_state = CWT_STOP;
+            cv_signal(&cwt->cwt_cv);
+            mtx_sleep(cwt->cwt_td, &cwt->cwt_lock, 0, "cwtstop", 0);
+        }
+        mtx_unlock(&cwt->cwt_lock);
+        mtx_destroy(&cwt->cwt_lock);
+        cv_destroy(&cwt->cwt_cv);
+    }
+    free(threads, M_CXGBE);
+}
 static void
 stop_worker_threads(void)
 {
-    int i;
-    struct cxgbei_worker_thread_softc *cwt = &cwt_softc[0];
-    MPASS(worker_thread_count >= 0);
-    for (i = 0, cwt = &cwt_softc[0]; i < worker_thread_count; i++, cwt++) {
-        mtx_lock(&cwt->cwt_lock);
-        MPASS(cwt->cwt_state == CWT_RUNNING ||
-            cwt->cwt_state == CWT_SLEEPING);
-        cwt->cwt_state = CWT_STOP;
-        cv_signal(&cwt->cwt_cv);
-        do {
-            cv_wait(&cwt->cwt_cv, &cwt->cwt_lock);
-        } while (cwt->cwt_state != CWT_STOPPED);
-        mtx_unlock(&cwt->cwt_lock);
-        mtx_destroy(&cwt->cwt_lock);
-        cv_destroy(&cwt->cwt_cv);
-    }
-    free(cwt_softc, M_CXGBE);
+    stop_worker_threads1(cwt_rx_threads);
+    stop_worker_threads1(cwt_tx_threads);
 }
 /* Select a worker thread for a connection. */

cxgbei.h

@@ -36,23 +36,19 @@ enum {
     CWT_SLEEPING = 1,
     CWT_RUNNING = 2,
     CWT_STOP = 3,
-    CWT_STOPPED = 4,
 };
-struct cxgbei_worker_thread_softc {
+struct cxgbei_worker_thread {
     struct mtx cwt_lock;
     struct cv cwt_cv;
     volatile int cwt_state;
+    struct thread *cwt_td;
-    TAILQ_HEAD(, icl_cxgbei_conn) rx_head;
+    TAILQ_HEAD(, icl_cxgbei_conn) icc_head;
 } __aligned(CACHE_LINE_SIZE);
 #define CXGBEI_CONN_SIGNATURE 0x56788765
-enum {
-    RXF_ACTIVE = 1 << 0, /* In the worker thread's queue */
-};
 struct cxgbei_cmp {
     LIST_ENTRY(cxgbei_cmp) link;
@@ -71,16 +67,21 @@ struct icl_cxgbei_conn {
     int ulp_submode;
     struct adapter *sc;
     struct toepcb *toep;
-    u_int cwt;
     /* Receive related. */
-    u_int rx_flags;     /* protected by so_rcv lock */
+    u_int cwt;
+    bool rx_active;     /* protected by so_rcv lock */
     STAILQ_HEAD(, icl_pdu) rcvd_pdus;   /* protected by so_rcv lock */
     TAILQ_ENTRY(icl_cxgbei_conn) rx_link;   /* protected by cwt lock */
     struct cxgbei_cmp_head *cmp_table;  /* protected by cmp_lock */
     struct mtx cmp_lock;
     unsigned long cmp_hash_mask;
+    /* Transmit related. */
+    bool tx_active;     /* protected by ic lock */
+    STAILQ_HEAD(, icl_pdu) sent_pdus;   /* protected by ic lock */
+    TAILQ_ENTRY(icl_cxgbei_conn) tx_link;   /* protected by cwt lock */
 };
 static inline struct icl_cxgbei_conn *
@@ -134,8 +135,10 @@ struct cxgbei_data {
 /* cxgbei.c */
 u_int cxgbei_select_worker_thread(struct icl_cxgbei_conn *);
+void cwt_queue_for_tx(struct icl_cxgbei_conn *);
 /* icl_cxgbei.c */
+void cwt_tx_main(void *);
 int icl_cxgbei_mod_load(void);
 int icl_cxgbei_mod_unload(void);
 struct icl_pdu *icl_cxgbei_new_pdu(int);

icl_cxgbei.c

@@ -421,6 +421,128 @@ finalize_pdu(struct icl_cxgbei_conn *icc, struct icl_cxgbei_pdu *icp)
     return (m);
 }
+static void
+cwt_push_pdus(struct icl_cxgbei_conn *icc, struct socket *so, struct mbufq *mq)
+{
+    struct epoch_tracker et;
+    struct icl_conn *ic = &icc->ic;
+    struct toepcb *toep = icc->toep;
+    struct inpcb *inp;
+    /*
+     * Do not get inp from toep->inp as the toepcb might have
+     * detached already.
+     */
+    inp = sotoinpcb(so);
+    CURVNET_SET(toep->vnet);
+    NET_EPOCH_ENTER(et);
+    INP_WLOCK(inp);
+    ICL_CONN_UNLOCK(ic);
+    if (__predict_false(inp->inp_flags & (INP_DROPPED | INP_TIMEWAIT)) ||
+        __predict_false((toep->flags & TPF_ATTACHED) == 0)) {
+        mbufq_drain(mq);
+    } else {
+        mbufq_concat(&toep->ulp_pduq, mq);
+        t4_push_pdus(icc->sc, toep, 0);
+    }
+    INP_WUNLOCK(inp);
+    NET_EPOCH_EXIT(et);
+    CURVNET_RESTORE();
+    ICL_CONN_LOCK(ic);
+}
+void
+cwt_tx_main(void *arg)
+{
+    struct cxgbei_worker_thread *cwt = arg;
+    struct icl_cxgbei_conn *icc;
+    struct icl_conn *ic;
+    struct icl_pdu *ip;
+    struct socket *so;
+    struct mbuf *m;
+    struct mbufq mq;
+    STAILQ_HEAD(, icl_pdu) tx_pdus = STAILQ_HEAD_INITIALIZER(tx_pdus);
+    MPASS(cwt != NULL);
+    mtx_lock(&cwt->cwt_lock);
+    MPASS(cwt->cwt_state == 0);
+    cwt->cwt_state = CWT_RUNNING;
+    cv_signal(&cwt->cwt_cv);
+    mbufq_init(&mq, INT_MAX);
+    while (__predict_true(cwt->cwt_state != CWT_STOP)) {
+        cwt->cwt_state = CWT_RUNNING;
+        while ((icc = TAILQ_FIRST(&cwt->icc_head)) != NULL) {
+            TAILQ_REMOVE(&cwt->icc_head, icc, tx_link);
+            mtx_unlock(&cwt->cwt_lock);
+            ic = &icc->ic;
+            ICL_CONN_LOCK(ic);
+            MPASS(icc->tx_active);
+            STAILQ_SWAP(&icc->sent_pdus, &tx_pdus, icl_pdu);
+            ICL_CONN_UNLOCK(ic);
+            while ((ip = STAILQ_FIRST(&tx_pdus)) != NULL) {
+                STAILQ_REMOVE_HEAD(&tx_pdus, ip_next);
+                m = finalize_pdu(icc, ip_to_icp(ip));
+                M_ASSERTPKTHDR(m);
+                MPASS((m->m_pkthdr.len & 3) == 0);
+                mbufq_enqueue(&mq, m);
+            }
+            ICL_CONN_LOCK(ic);
+            so = ic->ic_socket;
+            if (__predict_false(ic->ic_disconnecting) ||
+                __predict_false(so == NULL)) {
+                mbufq_drain(&mq);
+                icc->tx_active = false;
+                ICL_CONN_UNLOCK(ic);
+                mtx_lock(&cwt->cwt_lock);
+                continue;
+            }
+            cwt_push_pdus(icc, so, &mq);
+            MPASS(icc->tx_active);
+            if (STAILQ_EMPTY(&icc->sent_pdus)) {
+                icc->tx_active = false;
+                ICL_CONN_UNLOCK(ic);
+                mtx_lock(&cwt->cwt_lock);
+            } else {
+                ICL_CONN_UNLOCK(ic);
+                /*
+                 * More PDUs were queued while we were
+                 * busy sending the previous batch.
+                 * Re-add this connection to the end
+                 * of the queue.
+                 */
+                mtx_lock(&cwt->cwt_lock);
+                TAILQ_INSERT_TAIL(&cwt->icc_head, icc,
+                    tx_link);
+            }
+        }
+        /* Inner loop doesn't check for CWT_STOP, do that first. */
+        if (__predict_false(cwt->cwt_state == CWT_STOP))
+            break;
+        cwt->cwt_state = CWT_SLEEPING;
+        cv_wait(&cwt->cwt_cv, &cwt->cwt_lock);
+    }
+    MPASS(TAILQ_FIRST(&cwt->icc_head) == NULL);
+    mtx_unlock(&cwt->cwt_lock);
+    kthread_exit();
+}
 int
 icl_cxgbei_conn_pdu_append_data(struct icl_conn *ic, struct icl_pdu *ip,
     const void *addr, size_t len, int flags)
@@ -534,13 +656,9 @@ void
 icl_cxgbei_conn_pdu_queue_cb(struct icl_conn *ic, struct icl_pdu *ip,
     icl_pdu_cb cb)
 {
-    struct epoch_tracker et;
     struct icl_cxgbei_conn *icc = ic_to_icc(ic);
     struct icl_cxgbei_pdu *icp = ip_to_icp(ip);
     struct socket *so = ic->ic_socket;
-    struct toepcb *toep = icc->toep;
-    struct inpcb *inp;
-    struct mbuf *m;
     MPASS(ic == ip->ip_conn);
     MPASS(ip->ip_bhs_mbuf != NULL);
@@ -557,28 +675,11 @@ icl_cxgbei_conn_pdu_queue_cb(struct icl_conn *ic, struct icl_pdu *ip,
         return;
     }
-    m = finalize_pdu(icc, icp);
-    M_ASSERTPKTHDR(m);
-    MPASS((m->m_pkthdr.len & 3) == 0);
-    /*
-     * Do not get inp from toep->inp as the toepcb might have detached
-     * already.
-     */
-    inp = sotoinpcb(so);
-    CURVNET_SET(toep->vnet);
-    NET_EPOCH_ENTER(et);
-    INP_WLOCK(inp);
-    if (__predict_false(inp->inp_flags & (INP_DROPPED | INP_TIMEWAIT)) ||
-        __predict_false((toep->flags & TPF_ATTACHED) == 0))
-        m_freem(m);
-    else {
-        mbufq_enqueue(&toep->ulp_pduq, m);
-        t4_push_pdus(icc->sc, toep, 0);
+    STAILQ_INSERT_TAIL(&icc->sent_pdus, ip, ip_next);
+    if (!icc->tx_active) {
+        icc->tx_active = true;
+        cwt_queue_for_tx(icc);
     }
-    INP_WUNLOCK(inp);
-    NET_EPOCH_EXIT(et);
-    CURVNET_RESTORE();
 }
 static struct icl_conn *
@@ -593,6 +694,7 @@ icl_cxgbei_new_conn(const char *name, struct mtx *lock)
         M_WAITOK | M_ZERO);
     icc->icc_signature = CXGBEI_CONN_SIGNATURE;
     STAILQ_INIT(&icc->rcvd_pdus);
+    STAILQ_INIT(&icc->sent_pdus);
     icc->cmp_table = hashinit(64, M_CXGBEI, &icc->cmp_hash_mask);
     mtx_init(&icc->cmp_lock, "cxgbei_cmp", NULL, MTX_DEF);
@@ -935,21 +1037,33 @@ icl_cxgbei_conn_close(struct icl_conn *ic)
     if (toep != NULL) { /* NULL if connection was never offloaded. */
         toep->ulpcb = NULL;
+        /*
+         * Wait for the cwt threads to stop processing this
+         * connection for transmit.
+         */
+        while (icc->tx_active)
+            rw_sleep(inp, &inp->inp_lock, 0, "conclo", 1);
+        /* Discard PDUs queued for TX. */
+        while (!STAILQ_EMPTY(&icc->sent_pdus)) {
+            ip = STAILQ_FIRST(&icc->sent_pdus);
+            STAILQ_REMOVE_HEAD(&icc->sent_pdus, ip_next);
+            icl_cxgbei_pdu_done(ip, ENOTCONN);
+        }
         mbufq_drain(&toep->ulp_pduq);
         /*
          * Wait for the cwt threads to stop processing this
-         * connection.
+         * connection for receive.
          */
         SOCKBUF_LOCK(sb);
-        if (icc->rx_flags & RXF_ACTIVE) {
-            volatile u_int *p = &icc->rx_flags;
+        if (icc->rx_active) {
+            volatile bool *p = &icc->rx_active;
             SOCKBUF_UNLOCK(sb);
             INP_WUNLOCK(inp);
-            while (*p & RXF_ACTIVE)
+            while (*p)
                 pause("conclo", 1);
             INP_WLOCK(inp);