Use DDP to implement zero-copy TCP receive with aio_read().

Chelsio's TCP offload engine supports direct DMA of received TCP payload
into wired user buffers.  This feature is known as Direct Data Placement
(DDP).
However, to scale well the adapter needs to prepare buffers for DDP
before data arrives.  aio_read() is more amenable to this requirement than
read() as applications often call read() only after data is available in
the socket buffer.

When DDP is enabled, TOE sockets use the recently added pru_aio_queue
protocol hook to claim aio_read(2) requests instead of letting them use
the default AIO socket logic.  The DDP feature supports scheduling DMA
to two buffers at a time so that the second buffer is ready for use
after the first buffer is filled.  The aio/DDP code optimizes the case
of an application ping-ponging between two buffers (similar to the
zero-copy bpf(4) code) by keeping the two most recently used AIO buffers
wired.  If a buffer is reused, the aio/DDP code is able to reuse the
vm_page_t array as well as page pod mappings (a kind of MMU mapping the
Chelsio NIC uses to describe user buffers).  The generation of the
vmspace of the calling process is used in conjunction with the user
buffer's address and length to determine if a user buffer matches a
previously used buffer.  If an application queues a buffer for AIO that
does not match a previously used buffer then the least recently used
buffer is unwired before the new buffer is wired.  This ensures that no
more than two user buffers per socket are ever wired.

Note that this feature is best suited to applications receiving a steady
stream of data rather than short bursts of traffic.

Discussed with:	np
Relnotes:	yes
Sponsored by:	Chelsio Communications
This commit is contained in:
John Baldwin 2016-05-07 00:33:35 +00:00
parent 826c2372c5
commit dc9643853d
Notes: svn2git 2020-12-20 02:59:44 +00:00
svn path=/head/; revision=299210
6 changed files with 1308 additions and 739 deletions

View File

@ -145,8 +145,6 @@ struct uld_info {
struct tom_tunables {
int sndbuf;
int ddp;
int indsz;
int ddp_thres;
int rx_coalesce;
int tx_align;
};

View File

@ -4901,15 +4901,6 @@ t4_sysctls(struct adapter *sc)
SYSCTL_ADD_INT(ctx, children, OID_AUTO, "ddp", CTLFLAG_RW,
&sc->tt.ddp, 0, "DDP allowed");
sc->tt.indsz = G_INDICATESIZE(t4_read_reg(sc, A_TP_PARA_REG5));
SYSCTL_ADD_INT(ctx, children, OID_AUTO, "indsz", CTLFLAG_RW,
&sc->tt.indsz, 0, "DDP max indicate size allowed");
sc->tt.ddp_thres =
G_RXCOALESCESIZE(t4_read_reg(sc, A_TP_PARA_REG2));
SYSCTL_ADD_INT(ctx, children, OID_AUTO, "ddp_thres", CTLFLAG_RW,
&sc->tt.ddp_thres, 0, "DDP threshold");
sc->tt.rx_coalesce = 1;
SYSCTL_ADD_INT(ctx, children, OID_AUTO, "rx_coalesce",
CTLFLAG_RW, &sc->tt.rx_coalesce, 0, "receive coalescing");

View File

@ -343,7 +343,7 @@ send_rx_credits(struct adapter *sc, struct toepcb *toep, int credits)
}
void
t4_rcvd(struct toedev *tod, struct tcpcb *tp)
t4_rcvd_locked(struct toedev *tod, struct tcpcb *tp)
{
struct adapter *sc = tod->tod_softc;
struct inpcb *inp = tp->t_inpcb;
@ -354,7 +354,7 @@ t4_rcvd(struct toedev *tod, struct tcpcb *tp)
INP_WLOCK_ASSERT(inp);
SOCKBUF_LOCK(sb);
SOCKBUF_LOCK_ASSERT(sb);
KASSERT(toep->sb_cc >= sbused(sb),
("%s: sb %p has more data (%d) than last time (%d).",
__func__, sb, sbused(sb), toep->sb_cc));
@ -372,6 +372,17 @@ t4_rcvd(struct toedev *tod, struct tcpcb *tp)
tp->rcv_wnd += credits;
tp->rcv_adv += credits;
}
}
/*
 * Wrapper around t4_rcvd_locked(): take the receive sockbuf lock of the
 * socket behind tp's inpcb, run the locked variant, then drop the lock.
 */
void
t4_rcvd(struct toedev *tod, struct tcpcb *tp)
{
	struct sockbuf *rcv = &tp->t_inpcb->inp_socket->so_rcv;

	SOCKBUF_LOCK(rcv);
	t4_rcvd_locked(tod, tp);
	SOCKBUF_UNLOCK(rcv);
}
@ -1042,7 +1053,6 @@ do_peer_close(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
struct inpcb *inp = toep->inp;
struct tcpcb *tp = NULL;
struct socket *so;
struct sockbuf *sb;
#ifdef INVARIANTS
unsigned int opcode = G_CPL_OPCODE(be32toh(OPCODE_TID(cpl)));
#endif
@ -1088,12 +1098,14 @@ do_peer_close(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
tp->rcv_nxt++; /* FIN */
so = inp->inp_socket;
sb = &so->so_rcv;
SOCKBUF_LOCK(sb);
if (__predict_false(toep->ddp_flags & (DDP_BUF0_ACTIVE | DDP_BUF1_ACTIVE))) {
handle_ddp_close(toep, tp, sb, cpl->rcv_nxt);
if (toep->ulp_mode == ULP_MODE_TCPDDP) {
DDP_LOCK(toep);
if (__predict_false(toep->ddp_flags &
(DDP_BUF0_ACTIVE | DDP_BUF1_ACTIVE)))
handle_ddp_close(toep, tp, cpl->rcv_nxt);
DDP_UNLOCK(toep);
}
socantrcvmore_locked(so); /* unlocks the sockbuf */
socantrcvmore(so);
if (toep->ulp_mode != ULP_MODE_RDMA) {
KASSERT(tp->rcv_nxt == be32toh(cpl->rcv_nxt),
@ -1409,6 +1421,8 @@ do_rx_data(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
tp->rcv_wnd -= len;
tp->t_rcvtime = ticks;
if (toep->ulp_mode == ULP_MODE_TCPDDP)
DDP_LOCK(toep);
so = inp_inpcbtosocket(inp);
sb = &so->so_rcv;
SOCKBUF_LOCK(sb);
@ -1418,6 +1432,8 @@ do_rx_data(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
__func__, tid, len);
m_freem(m);
SOCKBUF_UNLOCK(sb);
if (toep->ulp_mode == ULP_MODE_TCPDDP)
DDP_UNLOCK(toep);
INP_WUNLOCK(inp);
INP_INFO_RLOCK(&V_tcbinfo);
@ -1446,6 +1462,10 @@ do_rx_data(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
toep->rx_credits += newsize - hiwat;
}
if (toep->ddp_waiting_count != 0 || toep->ddp_active_count != 0)
CTR3(KTR_CXGBE, "%s: tid %u, non-ddp rx (%d bytes)", __func__,
tid, len);
if (toep->ulp_mode == ULP_MODE_TCPDDP) {
int changed = !(toep->ddp_flags & DDP_ON) ^ cpl->ddp_off;
@ -1458,47 +1478,22 @@ do_rx_data(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
__func__));
/* Fell out of DDP mode */
toep->ddp_flags &= ~(DDP_ON | DDP_BUF0_ACTIVE |
DDP_BUF1_ACTIVE);
toep->ddp_flags &= ~DDP_ON;
CTR1(KTR_CXGBE, "%s: fell out of DDP mode",
__func__);
if (ddp_placed)
insert_ddp_data(toep, ddp_placed);
insert_ddp_data(toep, ddp_placed);
}
}
if ((toep->ddp_flags & DDP_OK) == 0 &&
time_uptime >= toep->ddp_disabled + DDP_RETRY_WAIT) {
toep->ddp_score = DDP_LOW_SCORE;
toep->ddp_flags |= DDP_OK;
CTR3(KTR_CXGBE, "%s: tid %u DDP_OK @ %u",
__func__, tid, time_uptime);
}
if (toep->ddp_flags & DDP_ON) {
/*
* CPL_RX_DATA with DDP on can only be an indicate. Ask
* soreceive to post a buffer or disable DDP. The
* payload that arrived in this indicate is appended to
* the socket buffer as usual.
* CPL_RX_DATA with DDP on can only be an indicate.
* Start posting queued AIO requests via DDP. The
* payload that arrived in this indicate is appended
* to the socket buffer as usual.
*/
#if 0
CTR5(KTR_CXGBE,
"%s: tid %u (0x%x) DDP indicate (seq 0x%x, len %d)",
__func__, tid, toep->flags, be32toh(cpl->seq), len);
#endif
sb->sb_flags |= SB_DDP_INDICATE;
} else if ((toep->ddp_flags & (DDP_OK|DDP_SC_REQ)) == DDP_OK &&
tp->rcv_wnd > DDP_RSVD_WIN && len >= sc->tt.ddp_thres) {
/*
* DDP allowed but isn't on (and a request to switch it
* on isn't pending either), and conditions are ripe for
* it to work. Switch it on.
*/
enable_ddp(sc, toep);
handle_ddp_indicate(toep);
}
}
@ -1516,8 +1511,16 @@ do_rx_data(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
tp->rcv_wnd += credits;
tp->rcv_adv += credits;
}
if (toep->ddp_waiting_count > 0 && sbavail(sb) != 0) {
CTR2(KTR_CXGBE, "%s: tid %u queueing AIO task", __func__,
tid);
ddp_queue_toep(toep);
}
sorwakeup_locked(so);
SOCKBUF_UNLOCK_ASSERT(sb);
if (toep->ulp_mode == ULP_MODE_TCPDDP)
DDP_UNLOCK(toep);
INP_WUNLOCK(inp);
CURVNET_RESTORE();
@ -1680,6 +1683,7 @@ do_set_tcb_rpl(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
struct adapter *sc = iq->adapter;
const struct cpl_set_tcb_rpl *cpl = (const void *)(rss + 1);
unsigned int tid = GET_TID(cpl);
struct toepcb *toep;
#ifdef INVARIANTS
unsigned int opcode = G_CPL_OPCODE(be32toh(OPCODE_TID(cpl)));
#endif
@ -1691,6 +1695,12 @@ do_set_tcb_rpl(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
if (is_ftid(sc, tid))
return (t4_filter_rpl(iq, rss, m)); /* TCB is a filter */
toep = lookup_tid(sc, tid);
if (toep->ulp_mode == ULP_MODE_TCPDDP) {
handle_ddp_tcb_rpl(toep, cpl);
return (0);
}
/*
* TOM and/or other ULPs don't request replies for CPL_SET_TCB or
* CPL_SET_TCB_FIELD requests. This can easily change and when it does
@ -1731,6 +1741,31 @@ t4_set_tcb_field(struct adapter *sc, struct toepcb *toep, int ctrl,
t4_wrq_tx(sc, wr);
}
/*
 * Build and transmit a CPL_SET_TCB_FIELD request for toep's tid that
 * carries a cookie and names toep's offload rx queue in reply_ctrl
 * (presumably the queue on which the reply is delivered).
 *
 * ctrl   - non-zero: allocate the request on toep->ctrlq;
 *          zero: allocate it on toep->ofld_txq.
 * word   - TCB word, packed together with the cookie into word_cookie.
 * mask   - stored big-endian in the request.
 * val    - stored big-endian in the request.
 * cookie - must fit within M_COOKIE (asserted).
 *
 * Panics if the work request cannot be allocated.
 */
void
t4_set_tcb_field_rpl(struct adapter *sc, struct toepcb *toep, int ctrl,
    uint16_t word, uint64_t mask, uint64_t val, uint8_t cookie)
{
	struct cpl_set_tcb_field *cpl;
	struct wrqe *wqe;

	KASSERT((cookie & ~M_COOKIE) == 0, ("%s: invalid cookie %#x", __func__,
	    cookie));

	/* Pick the queue the request goes out on. */
	if (ctrl)
		wqe = alloc_wrqe(sizeof(*cpl), toep->ctrlq);
	else
		wqe = alloc_wrqe(sizeof(*cpl), toep->ofld_txq);
	if (wqe == NULL) {
		/* XXX */
		panic("%s: allocation failure.", __func__);
	}

	cpl = wrtod(wqe);
	INIT_TP_WR_MIT_CPL(cpl, CPL_SET_TCB_FIELD, toep->tid);
	cpl->reply_ctrl = htobe16(V_QUEUENO(toep->ofld_rxq->iq.abs_id));
	cpl->word_cookie = htobe16(V_WORD(word) | V_COOKIE(cookie));
	cpl->mask = htobe64(mask);
	cpl->val = htobe64(val);

	t4_wrq_tx(sc, wqe);
}
void
t4_init_cpl_io_handlers(struct adapter *sc)
{

File diff suppressed because it is too large Load Diff

View File

@ -41,6 +41,7 @@ __FBSDID("$FreeBSD$");
#include <sys/module.h>
#include <sys/protosw.h>
#include <sys/domain.h>
#include <sys/refcount.h>
#include <sys/rmlock.h>
#include <sys/socket.h>
#include <sys/socketvar.h>
@ -152,6 +153,7 @@ alloc_toepcb(struct vi_info *vi, int txqid, int rxqid, int flags)
if (toep == NULL)
return (NULL);
refcount_init(&toep->refcount, 1);
toep->td = sc->tom_softc;
toep->vi = vi;
toep->tx_total = tx_credits;
@ -165,19 +167,32 @@ alloc_toepcb(struct vi_info *vi, int txqid, int rxqid, int flags)
toep->txsd_avail = txsd_total;
toep->txsd_pidx = 0;
toep->txsd_cidx = 0;
ddp_init_toep(toep);
return (toep);
}
/*
 * Acquire one more reference on toep and return the same pointer, so the
 * call can be used inline where the toepcb is handed off.
 */
struct toepcb *
hold_toepcb(struct toepcb *toep)
{

	refcount_acquire(&toep->refcount);
	return (toep);
}
/*
 * Release one reference on toep.  Teardown happens only when
 * refcount_release() returns non-zero: the toepcb must then be neither
 * attached to an inpcb nor waiting on a CPL (both asserted), its DDP state
 * is uninitialized, and the memory is returned to M_CXGBE.
 */
void
free_toepcb(struct toepcb *toep)
{

	if (refcount_release(&toep->refcount)) {
		KASSERT(!(toep->flags & TPF_ATTACHED),
		    ("%s: attached to an inpcb", __func__));
		KASSERT(!(toep->flags & TPF_CPL_PENDING),
		    ("%s: CPL pending", __func__));

		ddp_uninit_toep(toep);
		free(toep, M_CXGBE);
	}
}
@ -259,6 +274,8 @@ undo_offload_socket(struct socket *so)
mtx_lock(&td->toep_list_lock);
TAILQ_REMOVE(&td->toep_list, toep, link);
mtx_unlock(&td->toep_list_lock);
free_toepcb(toep);
}
static void
@ -283,9 +300,9 @@ release_offload_resources(struct toepcb *toep)
*/
MPASS(mbufq_len(&toep->ulp_pduq) == 0);
MPASS(mbufq_len(&toep->ulp_pdu_reclaimq) == 0);
if (toep->ulp_mode == ULP_MODE_TCPDDP)
release_ddp_resources(toep);
#ifdef INVARIANTS
ddp_assert_empty(toep);
#endif
if (toep->l2te)
t4_l2t_release(toep->l2te);
@ -389,6 +406,8 @@ final_cpl_received(struct toepcb *toep)
CTR6(KTR_CXGBE, "%s: tid %d, toep %p (0x%x), inp %p (0x%x)",
__func__, toep->tid, toep, toep->flags, inp, inp->inp_flags);
if (toep->ulp_mode == ULP_MODE_TCPDDP)
release_ddp_resources(toep);
toep->inp = NULL;
toep->flags &= ~TPF_CPL_PENDING;
mbufq_drain(&toep->ulp_pdu_reclaimq);
@ -599,7 +618,6 @@ set_tcpddp_ulp_mode(struct toepcb *toep)
toep->ulp_mode = ULP_MODE_TCPDDP;
toep->ddp_flags = DDP_OK;
toep->ddp_score = DDP_LOW_SCORE;
}
int
@ -1109,12 +1127,16 @@ t4_tom_mod_load(void)
int rc;
struct protosw *tcp_protosw, *tcp6_protosw;
rc = t4_ddp_mod_load();
if (rc != 0)
return (rc);
tcp_protosw = pffindproto(PF_INET, IPPROTO_TCP, SOCK_STREAM);
if (tcp_protosw == NULL)
return (ENOPROTOOPT);
bcopy(tcp_protosw, &ddp_protosw, sizeof(ddp_protosw));
bcopy(tcp_protosw->pr_usrreqs, &ddp_usrreqs, sizeof(ddp_usrreqs));
ddp_usrreqs.pru_soreceive = t4_soreceive_ddp;
ddp_usrreqs.pru_aio_queue = t4_aio_queue_ddp;
ddp_protosw.pr_usrreqs = &ddp_usrreqs;
tcp6_protosw = pffindproto(PF_INET6, IPPROTO_TCP, SOCK_STREAM);
@ -1122,7 +1144,7 @@ t4_tom_mod_load(void)
return (ENOPROTOOPT);
bcopy(tcp6_protosw, &ddp6_protosw, sizeof(ddp6_protosw));
bcopy(tcp6_protosw->pr_usrreqs, &ddp6_usrreqs, sizeof(ddp6_usrreqs));
ddp6_usrreqs.pru_soreceive = t4_soreceive_ddp;
ddp6_usrreqs.pru_aio_queue = t4_aio_queue_ddp;
ddp6_protosw.pr_usrreqs = &ddp6_usrreqs;
TIMEOUT_TASK_INIT(taskqueue_thread, &clip_task, 0, t4_clip_task, NULL);
@ -1162,6 +1184,8 @@ t4_tom_mod_unload(void)
taskqueue_cancel_timeout(taskqueue_thread, &clip_task, NULL);
}
t4_ddp_mod_unload();
return (0);
}
#endif /* TCP_OFFLOAD */

View File

@ -74,6 +74,8 @@ enum {
DDP_ON = (1 << 2), /* DDP is turned on */
DDP_BUF0_ACTIVE = (1 << 3), /* buffer 0 in use (not invalidated) */
DDP_BUF1_ACTIVE = (1 << 4), /* buffer 1 in use (not invalidated) */
DDP_TASK_ACTIVE = (1 << 5), /* requeue task is queued / running */
DDP_DEAD = (1 << 6), /* toepcb is shutting down */
};
struct ofld_tx_sdesc {
@ -81,19 +83,36 @@ struct ofld_tx_sdesc {
uint8_t tx_credits; /* firmware tx credits (unit is 16B) */
};
struct ddp_buffer {
uint32_t tag; /* includes color, page pod addr, and DDP page size */
struct pageset {
TAILQ_ENTRY(pageset) link;
vm_page_t *pages;
int npages;
int flags;
u_int ppod_addr;
int nppods;
int offset;
uint32_t tag; /* includes color, page pod addr, and DDP page size */
int offset; /* offset in first page */
int len;
int npages;
vm_page_t *pages;
struct vmspace *vm;
u_int vm_timestamp;
};
TAILQ_HEAD(pagesetq, pageset);
#define PS_WIRED 0x0001 /* Pages wired rather than held. */
#define PS_PPODS_WRITTEN 0x0002 /* Page pods written to the card. */
/*
 * State for one DDP buffer slot: a page set and an AIO request.
 */
struct ddp_buffer {
struct pageset *ps;	/* page set associated with this slot */
struct kaiocb *job;	/* AIO request associated with this slot */
int cancel_pending;	/* NOTE(review): presumably non-zero while a cancel of job is outstanding -- confirm in the DDP code */
};
struct toepcb {
TAILQ_ENTRY(toepcb) link; /* toep_list */
u_int flags; /* miscellaneous flags */
int refcount;
struct tom_data *td;
struct inpcb *inp; /* backpointer to host stack's PCB */
struct vi_info *vi; /* virtual interface */
@ -121,9 +140,16 @@ struct toepcb {
struct mbufq ulp_pdu_reclaimq;
u_int ddp_flags;
struct ddp_buffer *db[2];
time_t ddp_disabled;
uint8_t ddp_score;
struct ddp_buffer db[2];
TAILQ_HEAD(, pageset) ddp_cached_pagesets;
TAILQ_HEAD(, kaiocb) ddp_aiojobq;
u_int ddp_waiting_count;
u_int ddp_active_count;
u_int ddp_cached_count;
int ddp_active_id; /* the currently active DDP buffer */
struct task ddp_requeue_task;
struct kaiocb *ddp_queueing;
struct mtx ddp_lock;
/* Tx software descriptor */
uint8_t txsd_total;
@ -133,6 +159,10 @@ struct toepcb {
struct ofld_tx_sdesc txsd[];
};
/* Acquire, release, and assert ownership of a toepcb's ddp_lock mutex. */
#define DDP_LOCK(toep) mtx_lock(&(toep)->ddp_lock)
#define DDP_UNLOCK(toep) mtx_unlock(&(toep)->ddp_lock)
#define DDP_ASSERT_LOCKED(toep) mtx_assert(&(toep)->ddp_lock, MA_OWNED)
struct flowc_tx_params {
uint32_t snd_nxt;
uint32_t rcv_nxt;
@ -242,6 +272,7 @@ mbuf_ulp_submode(struct mbuf *m)
/* t4_tom.c */
struct toepcb *alloc_toepcb(struct vi_info *, int, int, int);
struct toepcb *hold_toepcb(struct toepcb *);
void free_toepcb(struct toepcb *);
void offload_socket(struct socket *, struct toepcb *);
void undo_offload_socket(struct socket *);
@ -289,11 +320,14 @@ void send_flowc_wr(struct toepcb *, struct flowc_tx_params *);
void send_reset(struct adapter *, struct toepcb *, uint32_t);
void make_established(struct toepcb *, uint32_t, uint32_t, uint16_t);
void t4_rcvd(struct toedev *, struct tcpcb *);
void t4_rcvd_locked(struct toedev *, struct tcpcb *);
int t4_tod_output(struct toedev *, struct tcpcb *);
int t4_send_fin(struct toedev *, struct tcpcb *);
int t4_send_rst(struct toedev *, struct tcpcb *);
void t4_set_tcb_field(struct adapter *, struct toepcb *, int, uint16_t,
uint64_t, uint64_t);
void t4_set_tcb_field_rpl(struct adapter *, struct toepcb *, int, uint16_t,
uint64_t, uint64_t, uint8_t);
void t4_push_frames(struct adapter *sc, struct toepcb *toep, int drop);
void t4_push_pdus(struct adapter *sc, struct toepcb *toep, int drop);
@ -302,10 +336,17 @@ void t4_init_ddp(struct adapter *, struct tom_data *);
void t4_uninit_ddp(struct adapter *, struct tom_data *);
int t4_soreceive_ddp(struct socket *, struct sockaddr **, struct uio *,
struct mbuf **, struct mbuf **, int *);
void enable_ddp(struct adapter *, struct toepcb *toep);
int t4_aio_queue_ddp(struct socket *, struct kaiocb *);
int t4_ddp_mod_load(void);
void t4_ddp_mod_unload(void);
void ddp_assert_empty(struct toepcb *);
void ddp_init_toep(struct toepcb *);
void ddp_uninit_toep(struct toepcb *);
void ddp_queue_toep(struct toepcb *);
void release_ddp_resources(struct toepcb *toep);
void handle_ddp_close(struct toepcb *, struct tcpcb *, struct sockbuf *,
uint32_t);
void handle_ddp_close(struct toepcb *, struct tcpcb *, uint32_t);
void handle_ddp_indicate(struct toepcb *);
void handle_ddp_tcb_rpl(struct toepcb *, const struct cpl_set_tcb_rpl *);
void insert_ddp_data(struct toepcb *, uint32_t);
#endif