diff --git a/sys/dev/cxgbe/offload.h b/sys/dev/cxgbe/offload.h index d1363c4a8c28..22612d5156a1 100644 --- a/sys/dev/cxgbe/offload.h +++ b/sys/dev/cxgbe/offload.h @@ -145,8 +145,6 @@ struct uld_info { struct tom_tunables { int sndbuf; int ddp; - int indsz; - int ddp_thres; int rx_coalesce; int tx_align; }; diff --git a/sys/dev/cxgbe/t4_main.c b/sys/dev/cxgbe/t4_main.c index 1e86aa9516a5..132d69e1c3e7 100644 --- a/sys/dev/cxgbe/t4_main.c +++ b/sys/dev/cxgbe/t4_main.c @@ -4901,15 +4901,6 @@ t4_sysctls(struct adapter *sc) SYSCTL_ADD_INT(ctx, children, OID_AUTO, "ddp", CTLFLAG_RW, &sc->tt.ddp, 0, "DDP allowed"); - sc->tt.indsz = G_INDICATESIZE(t4_read_reg(sc, A_TP_PARA_REG5)); - SYSCTL_ADD_INT(ctx, children, OID_AUTO, "indsz", CTLFLAG_RW, - &sc->tt.indsz, 0, "DDP max indicate size allowed"); - - sc->tt.ddp_thres = - G_RXCOALESCESIZE(t4_read_reg(sc, A_TP_PARA_REG2)); - SYSCTL_ADD_INT(ctx, children, OID_AUTO, "ddp_thres", CTLFLAG_RW, - &sc->tt.ddp_thres, 0, "DDP threshold"); - sc->tt.rx_coalesce = 1; SYSCTL_ADD_INT(ctx, children, OID_AUTO, "rx_coalesce", CTLFLAG_RW, &sc->tt.rx_coalesce, 0, "receive coalescing"); diff --git a/sys/dev/cxgbe/tom/t4_cpl_io.c b/sys/dev/cxgbe/tom/t4_cpl_io.c index dd6d5b52c477..47109774bdbc 100644 --- a/sys/dev/cxgbe/tom/t4_cpl_io.c +++ b/sys/dev/cxgbe/tom/t4_cpl_io.c @@ -343,7 +343,7 @@ send_rx_credits(struct adapter *sc, struct toepcb *toep, int credits) } void -t4_rcvd(struct toedev *tod, struct tcpcb *tp) +t4_rcvd_locked(struct toedev *tod, struct tcpcb *tp) { struct adapter *sc = tod->tod_softc; struct inpcb *inp = tp->t_inpcb; @@ -354,7 +354,7 @@ t4_rcvd(struct toedev *tod, struct tcpcb *tp) INP_WLOCK_ASSERT(inp); - SOCKBUF_LOCK(sb); + SOCKBUF_LOCK_ASSERT(sb); KASSERT(toep->sb_cc >= sbused(sb), ("%s: sb %p has more data (%d) than last time (%d).", __func__, sb, sbused(sb), toep->sb_cc)); @@ -372,6 +372,17 @@ t4_rcvd(struct toedev *tod, struct tcpcb *tp) tp->rcv_wnd += credits; tp->rcv_adv += credits; } +} + +void +t4_rcvd(struct toedev *tod, struct tcpcb *tp) +{ + struct inpcb *inp = tp->t_inpcb; + struct socket *so = inp->inp_socket; + struct sockbuf *sb = &so->so_rcv; + + SOCKBUF_LOCK(sb); + t4_rcvd_locked(tod, tp); SOCKBUF_UNLOCK(sb); } @@ -1042,7 +1053,6 @@ do_peer_close(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m) struct inpcb *inp = toep->inp; struct tcpcb *tp = NULL; struct socket *so; - struct sockbuf *sb; #ifdef INVARIANTS unsigned int opcode = G_CPL_OPCODE(be32toh(OPCODE_TID(cpl))); #endif @@ -1088,12 +1098,14 @@ do_peer_close(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m) tp->rcv_nxt++; /* FIN */ so = inp->inp_socket; - sb = &so->so_rcv; - SOCKBUF_LOCK(sb); - if (__predict_false(toep->ddp_flags & (DDP_BUF0_ACTIVE | DDP_BUF1_ACTIVE))) { - handle_ddp_close(toep, tp, sb, cpl->rcv_nxt); + if (toep->ulp_mode == ULP_MODE_TCPDDP) { + DDP_LOCK(toep); + if (__predict_false(toep->ddp_flags & + (DDP_BUF0_ACTIVE | DDP_BUF1_ACTIVE))) + handle_ddp_close(toep, tp, cpl->rcv_nxt); + DDP_UNLOCK(toep); } - socantrcvmore_locked(so); /* unlocks the sockbuf */ + socantrcvmore(so); if (toep->ulp_mode != ULP_MODE_RDMA) { KASSERT(tp->rcv_nxt == be32toh(cpl->rcv_nxt), @@ -1409,6 +1421,8 @@ do_rx_data(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m) tp->rcv_wnd -= len; tp->t_rcvtime = ticks; + if (toep->ulp_mode == ULP_MODE_TCPDDP) + DDP_LOCK(toep); so = inp_inpcbtosocket(inp); sb = &so->so_rcv; SOCKBUF_LOCK(sb); @@ -1418,6 +1432,8 @@ do_rx_data(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m) __func__, tid, 
len); m_freem(m); SOCKBUF_UNLOCK(sb); + if (toep->ulp_mode == ULP_MODE_TCPDDP) + DDP_UNLOCK(toep); INP_WUNLOCK(inp); INP_INFO_RLOCK(&V_tcbinfo); @@ -1446,6 +1462,10 @@ do_rx_data(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m) toep->rx_credits += newsize - hiwat; } + if (toep->ddp_waiting_count != 0 || toep->ddp_active_count != 0) + CTR3(KTR_CXGBE, "%s: tid %u, non-ddp rx (%d bytes)", __func__, + tid, len); + if (toep->ulp_mode == ULP_MODE_TCPDDP) { int changed = !(toep->ddp_flags & DDP_ON) ^ cpl->ddp_off; @@ -1458,47 +1478,22 @@ do_rx_data(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m) __func__)); /* Fell out of DDP mode */ - toep->ddp_flags &= ~(DDP_ON | DDP_BUF0_ACTIVE | - DDP_BUF1_ACTIVE); + toep->ddp_flags &= ~DDP_ON; + CTR1(KTR_CXGBE, "%s: fell out of DDP mode", + __func__); - if (ddp_placed) - insert_ddp_data(toep, ddp_placed); + insert_ddp_data(toep, ddp_placed); } } - if ((toep->ddp_flags & DDP_OK) == 0 && - time_uptime >= toep->ddp_disabled + DDP_RETRY_WAIT) { - toep->ddp_score = DDP_LOW_SCORE; - toep->ddp_flags |= DDP_OK; - CTR3(KTR_CXGBE, "%s: tid %u DDP_OK @ %u", - __func__, tid, time_uptime); - } - if (toep->ddp_flags & DDP_ON) { - /* - * CPL_RX_DATA with DDP on can only be an indicate. Ask - * soreceive to post a buffer or disable DDP. The - * payload that arrived in this indicate is appended to - * the socket buffer as usual. + * CPL_RX_DATA with DDP on can only be an indicate. + * Start posting queued AIO requests via DDP. The + * payload that arrived in this indicate is appended + * to the socket buffer as usual. */ - -#if 0 - CTR5(KTR_CXGBE, - "%s: tid %u (0x%x) DDP indicate (seq 0x%x, len %d)", - __func__, tid, toep->flags, be32toh(cpl->seq), len); -#endif - sb->sb_flags |= SB_DDP_INDICATE; - } else if ((toep->ddp_flags & (DDP_OK|DDP_SC_REQ)) == DDP_OK && - tp->rcv_wnd > DDP_RSVD_WIN && len >= sc->tt.ddp_thres) { - - /* - * DDP allowed but isn't on (and a request to switch it - * on isn't pending either), and conditions are ripe for - * it to work. Switch it on. - */ - - enable_ddp(sc, toep); + handle_ddp_indicate(toep); } } @@ -1516,8 +1511,16 @@ do_rx_data(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m) tp->rcv_wnd += credits; tp->rcv_adv += credits; } + + if (toep->ddp_waiting_count > 0 && sbavail(sb) != 0) { + CTR2(KTR_CXGBE, "%s: tid %u queueing AIO task", __func__, + tid); + ddp_queue_toep(toep); + } sorwakeup_locked(so); SOCKBUF_UNLOCK_ASSERT(sb); + if (toep->ulp_mode == ULP_MODE_TCPDDP) + DDP_UNLOCK(toep); INP_WUNLOCK(inp); CURVNET_RESTORE(); @@ -1680,6 +1683,7 @@ do_set_tcb_rpl(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m) struct adapter *sc = iq->adapter; const struct cpl_set_tcb_rpl *cpl = (const void *)(rss + 1); unsigned int tid = GET_TID(cpl); + struct toepcb *toep; #ifdef INVARIANTS unsigned int opcode = G_CPL_OPCODE(be32toh(OPCODE_TID(cpl))); #endif @@ -1691,6 +1695,12 @@ do_set_tcb_rpl(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m) if (is_ftid(sc, tid)) return (t4_filter_rpl(iq, rss, m)); /* TCB is a filter */ + toep = lookup_tid(sc, tid); + if (toep->ulp_mode == ULP_MODE_TCPDDP) { + handle_ddp_tcb_rpl(toep, cpl); + return (0); + } + /* * TOM and/or other ULPs don't request replies for CPL_SET_TCB or * CPL_SET_TCB_FIELD requests. 
This can easily change and when it does @@ -1731,6 +1741,31 @@ t4_set_tcb_field(struct adapter *sc, struct toepcb *toep, int ctrl, t4_wrq_tx(sc, wr); } +void +t4_set_tcb_field_rpl(struct adapter *sc, struct toepcb *toep, int ctrl, + uint16_t word, uint64_t mask, uint64_t val, uint8_t cookie) +{ + struct wrqe *wr; + struct cpl_set_tcb_field *req; + + KASSERT((cookie & ~M_COOKIE) == 0, ("%s: invalid cookie %#x", __func__, + cookie)); + wr = alloc_wrqe(sizeof(*req), ctrl ? toep->ctrlq : toep->ofld_txq); + if (wr == NULL) { + /* XXX */ + panic("%s: allocation failure.", __func__); + } + req = wrtod(wr); + + INIT_TP_WR_MIT_CPL(req, CPL_SET_TCB_FIELD, toep->tid); + req->reply_ctrl = htobe16(V_QUEUENO(toep->ofld_rxq->iq.abs_id)); + req->word_cookie = htobe16(V_WORD(word) | V_COOKIE(cookie)); + req->mask = htobe64(mask); + req->val = htobe64(val); + + t4_wrq_tx(sc, wr); +} + void t4_init_cpl_io_handlers(struct adapter *sc) { diff --git a/sys/dev/cxgbe/tom/t4_ddp.c b/sys/dev/cxgbe/tom/t4_ddp.c index 2aa774d0139c..8cba8ca817b6 100644 --- a/sys/dev/cxgbe/tom/t4_ddp.c +++ b/sys/dev/cxgbe/tom/t4_ddp.c @@ -31,7 +31,8 @@ __FBSDID("$FreeBSD$"); #include "opt_inet.h" #include -#include +#include +#include #include #include #include @@ -41,6 +42,7 @@ __FBSDID("$FreeBSD$"); #include #include #include +#include #include #include #include @@ -72,7 +74,10 @@ VNET_DECLARE(int, tcp_autorcvbuf_inc); VNET_DECLARE(int, tcp_autorcvbuf_max); #define V_tcp_autorcvbuf_max VNET(tcp_autorcvbuf_max) -static struct mbuf *get_ddp_mbuf(int len); +static void aio_ddp_requeue_task(void *context, int pending); +static void ddp_complete_all(struct toepcb *toep, int error); +static void t4_aio_cancel_active(struct kaiocb *job); +static void t4_aio_cancel_queued(struct kaiocb *job); #define PPOD_SZ(n) ((n) * sizeof(struct pagepod)) #define PPOD_SIZE (PPOD_SZ(1)) @@ -80,6 +85,10 @@ static struct mbuf *get_ddp_mbuf(int len); /* XXX: must match A_ULP_RX_TDDP_PSZ */ static int t4_ddp_pgsz[] = {4096, 4096 << 2, 4096 << 4, 4096 << 6}; +static TAILQ_HEAD(, pageset) ddp_orphan_pagesets; +static struct mtx ddp_orphan_pagesets_lock; +static struct task ddp_orphan_task; + #define MAX_DDP_BUFFER_SIZE (M_TCB_RX_DDP_BUF0_LEN) static int alloc_ppods(struct tom_data *td, int n, u_int *ppod_addr) @@ -112,33 +121,199 @@ pages_to_nppods(int npages, int ddp_pgsz) return (howmany(nsegs, PPOD_PAGES)); } +/* + * A page set holds information about a buffer used for DDP. The page + * set holds resources such as the VM pages backing the buffer (either + * held or wired) and the page pods associated with the buffer. + * Recently used page sets are cached to allow for efficient reuse of + * buffers (avoiding the need to re-fault in pages, hold them, etc.). + * Note that cached page sets keep the backing pages wired. The + * number of wired pages is capped by only allowing for two wired + * pagesets per connection. This is not a perfect cap, but is a + * trade-off for performance. + * + * If an application ping-pongs two buffers for a connection via + * aio_read(2) then those buffers should remain wired and expensive VM + * fault lookups should be avoided after each buffer has been used + * once. If an application uses more than two buffers then this will + * fall back to doing expensive VM fault lookups for each operation. 
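The reuse case described above is easiest to see from userland. The sketch below shows the two-buffer ping-pong this cache is tuned for, using POSIX AIO against an already-connected TCP socket; sock, BUFSZ, and consume() are stand-ins, and error handling is abbreviated.

    #include <aio.h>
    #include <errno.h>
    #include <string.h>
    #include <unistd.h>

    #define BUFSZ   (256 * 1024)

    static char buf[2][BUFSZ];

    static void
    consume(const char *p, size_t len)
    {
        /* Application-specific processing of the received data. */
        (void)p;
        (void)len;
    }

    static int
    ping_pong_read(int sock)
    {
        struct aiocb iocb[2];
        ssize_t n;
        int i;

        memset(iocb, 0, sizeof(iocb));
        for (i = 0; i < 2; i++) {
            iocb[i].aio_fildes = sock;
            iocb[i].aio_buf = buf[i];
            iocb[i].aio_nbytes = BUFSZ;
            if (aio_read(&iocb[i]) == -1)
                return (-1);
        }
        for (i = 0;; i ^= 1) {
            const struct aiocb *list[] = { &iocb[i] };

            /* Reap the oldest outstanding request. */
            while (aio_error(&iocb[i]) == EINPROGRESS)
                aio_suspend(list, 1, NULL);
            if ((n = aio_return(&iocb[i])) <= 0)
                return (n == 0 ? 0 : -1);   /* EOF or error */
            consume(buf[i], (size_t)n);
            /* Repost the same buffer; its pageset stays cached. */
            if (aio_read(&iocb[i]) == -1)
                return (-1);
        }
    }

Because the same two buffers are reposted, their pagesets stay wired and cached in the driver, and steady-state reads avoid the VM fault lookups mentioned above.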
+ */ +static void +free_pageset(struct tom_data *td, struct pageset *ps) +{ + vm_page_t p; + int i; + + if (ps->nppods > 0) + free_ppods(td, ps->ppod_addr, ps->nppods); + + if (ps->flags & PS_WIRED) { + for (i = 0; i < ps->npages; i++) { + p = ps->pages[i]; + vm_page_lock(p); + vm_page_unwire(p, PQ_INACTIVE); + vm_page_unlock(p); + } + } else + vm_page_unhold_pages(ps->pages, ps->npages); + mtx_lock(&ddp_orphan_pagesets_lock); + TAILQ_INSERT_TAIL(&ddp_orphan_pagesets, ps, link); + taskqueue_enqueue(taskqueue_thread, &ddp_orphan_task); + mtx_unlock(&ddp_orphan_pagesets_lock); +} + +static void +ddp_free_orphan_pagesets(void *context, int pending) +{ + struct pageset *ps; + + mtx_lock(&ddp_orphan_pagesets_lock); + while (!TAILQ_EMPTY(&ddp_orphan_pagesets)) { + ps = TAILQ_FIRST(&ddp_orphan_pagesets); + TAILQ_REMOVE(&ddp_orphan_pagesets, ps, link); + mtx_unlock(&ddp_orphan_pagesets_lock); + if (ps->vm) + vmspace_free(ps->vm); + free(ps, M_CXGBE); + mtx_lock(&ddp_orphan_pagesets_lock); + } + mtx_unlock(&ddp_orphan_pagesets_lock); +} + +static void +recycle_pageset(struct toepcb *toep, struct pageset *ps) +{ + + DDP_ASSERT_LOCKED(toep); + if (!(toep->ddp_flags & DDP_DEAD) && ps->flags & PS_WIRED) { + KASSERT(toep->ddp_cached_count + toep->ddp_active_count < + nitems(toep->db), ("too many wired pagesets")); + TAILQ_INSERT_HEAD(&toep->ddp_cached_pagesets, ps, link); + toep->ddp_cached_count++; + } else + free_pageset(toep->td, ps); +} + +static void +ddp_complete_one(struct kaiocb *job, int error) +{ + long copied; + + /* + * If this job had copied data out of the socket buffer before + * it was cancelled, report it as a short read rather than an + * error. + */ + copied = job->uaiocb._aiocb_private.status; + if (copied != 0 || error == 0) + aio_complete(job, copied, 0); + else + aio_complete(job, -1, error); +} + static void free_ddp_buffer(struct tom_data *td, struct ddp_buffer *db) { - if (db == NULL) - return; + if (db->job) { + /* + * XXX: If we are un-offloading the socket then we + * should requeue these on the socket somehow. If we + * got a FIN from the remote end, then this completes + * any remaining requests with an EOF read. 
+ */ + if (!aio_clear_cancel_function(db->job)) + ddp_complete_one(db->job, 0); + } - if (db->pages) - free(db->pages, M_CXGBE); + if (db->ps) + free_pageset(td, db->ps); +} - if (db->nppods > 0) - free_ppods(td, db->ppod_addr, db->nppods); +void +ddp_init_toep(struct toepcb *toep) +{ - free(db, M_CXGBE); + TAILQ_INIT(&toep->ddp_aiojobq); + TASK_INIT(&toep->ddp_requeue_task, 0, aio_ddp_requeue_task, toep); + toep->ddp_active_id = -1; + mtx_init(&toep->ddp_lock, "t4 ddp", NULL, MTX_DEF); +} + +void +ddp_uninit_toep(struct toepcb *toep) +{ + + mtx_destroy(&toep->ddp_lock); } void release_ddp_resources(struct toepcb *toep) { + struct pageset *ps; int i; + DDP_LOCK(toep); + toep->ddp_flags |= DDP_DEAD; for (i = 0; i < nitems(toep->db); i++) { - if (toep->db[i] != NULL) { - free_ddp_buffer(toep->td, toep->db[i]); - toep->db[i] = NULL; - } + free_ddp_buffer(toep->td, &toep->db[i]); } + while ((ps = TAILQ_FIRST(&toep->ddp_cached_pagesets)) != NULL) { + TAILQ_REMOVE(&toep->ddp_cached_pagesets, ps, link); + free_pageset(toep->td, ps); + } + ddp_complete_all(toep, 0); + DDP_UNLOCK(toep); +} + +#ifdef INVARIANTS +void +ddp_assert_empty(struct toepcb *toep) +{ + int i; + + MPASS(!(toep->ddp_flags & DDP_TASK_ACTIVE)); + for (i = 0; i < nitems(toep->db); i++) { + MPASS(toep->db[i].job == NULL); + MPASS(toep->db[i].ps == NULL); + } + MPASS(TAILQ_EMPTY(&toep->ddp_cached_pagesets)); + MPASS(TAILQ_EMPTY(&toep->ddp_aiojobq)); +} +#endif + +static void +complete_ddp_buffer(struct toepcb *toep, struct ddp_buffer *db, + unsigned int db_idx) +{ + unsigned int db_flag; + + toep->ddp_active_count--; + if (toep->ddp_active_id == db_idx) { + if (toep->ddp_active_count == 0) { + KASSERT(toep->db[db_idx ^ 1].job == NULL, + ("%s: active_count mismatch", __func__)); + toep->ddp_active_id = -1; + } else + toep->ddp_active_id ^= 1; + CTR2(KTR_CXGBE, "%s: ddp_active_id = %d", __func__, + toep->ddp_active_id); + } else { + KASSERT(toep->ddp_active_count != 0 && + toep->ddp_active_id != -1, + ("%s: active count mismatch", __func__)); + } + + db->cancel_pending = 0; + db->job = NULL; + recycle_pageset(toep, db->ps); + db->ps = NULL; + + db_flag = db_idx == 1 ? DDP_BUF1_ACTIVE : DDP_BUF0_ACTIVE; + KASSERT(toep->ddp_flags & db_flag, + ("%s: DDP buffer not active. 
toep %p, ddp_flags 0x%x", + __func__, toep, toep->ddp_flags)); + toep->ddp_flags &= ~db_flag; } /* XXX: handle_ddp_data code duplication */ @@ -147,28 +322,59 @@ insert_ddp_data(struct toepcb *toep, uint32_t n) { struct inpcb *inp = toep->inp; struct tcpcb *tp = intotcpcb(inp); - struct sockbuf *sb = &inp->inp_socket->so_rcv; - struct mbuf *m; + struct ddp_buffer *db; + struct kaiocb *job; + size_t placed; + long copied; + unsigned int db_flag, db_idx; INP_WLOCK_ASSERT(inp); - SOCKBUF_LOCK_ASSERT(sb); + DDP_ASSERT_LOCKED(toep); - m = get_ddp_mbuf(n); tp->rcv_nxt += n; #ifndef USE_DDP_RX_FLOW_CONTROL KASSERT(tp->rcv_wnd >= n, ("%s: negative window size", __func__)); tp->rcv_wnd -= n; #endif - - KASSERT(toep->sb_cc >= sbused(sb), - ("%s: sb %p has more data (%d) than last time (%d).", - __func__, sb, sbused(sb), toep->sb_cc)); - toep->rx_credits += toep->sb_cc - sbused(sb); -#ifdef USE_DDP_RX_FLOW_CONTROL - toep->rx_credits -= n; /* adjust for F_RX_FC_DDP */ +#ifndef USE_DDP_RX_FLOW_CONTROL + toep->rx_credits += n; #endif - sbappendstream_locked(sb, m, 0); - toep->sb_cc = sbused(sb); + CTR2(KTR_CXGBE, "%s: placed %u bytes before falling out of DDP", + __func__, n); + while (toep->ddp_active_count > 0) { + MPASS(toep->ddp_active_id != -1); + db_idx = toep->ddp_active_id; + db_flag = db_idx == 1 ? DDP_BUF1_ACTIVE : DDP_BUF0_ACTIVE; + MPASS((toep->ddp_flags & db_flag) != 0); + db = &toep->db[db_idx]; + job = db->job; + copied = job->uaiocb._aiocb_private.status; + placed = n; + if (placed > job->uaiocb.aio_nbytes - copied) + placed = job->uaiocb.aio_nbytes - copied; + if (!aio_clear_cancel_function(job)) { + /* + * Update the copied length for when + * t4_aio_cancel_active() completes this + * request. + */ + job->uaiocb._aiocb_private.status += placed; + } else if (copied + placed != 0) { + CTR4(KTR_CXGBE, + "%s: completing %p (copied %ld, placed %lu)", + __func__, job, copied, placed); + /* XXX: This always completes if there is some data. 
*/ + aio_complete(job, copied + placed, 0); + } else if (aio_set_cancel_function(job, t4_aio_cancel_queued)) { + TAILQ_INSERT_HEAD(&toep->ddp_aiojobq, job, list); + toep->ddp_waiting_count++; + } else + aio_cancel(job); + n -= placed; + complete_ddp_buffer(toep, db, db_idx); + } + + MPASS(n == 0); } /* SET_TCB_FIELD sent as a ULP command looks like this */ @@ -236,42 +442,10 @@ mk_rx_data_ack_ulp(struct ulp_txpkt *ulpmc, struct toepcb *toep) return (ulpsc); } -static inline uint64_t -select_ddp_flags(struct socket *so, int flags, int db_idx) -{ - uint64_t ddp_flags = V_TF_DDP_INDICATE_OUT(0); - int waitall = flags & MSG_WAITALL; - int nb = so->so_state & SS_NBIO || flags & (MSG_DONTWAIT | MSG_NBIO); - - KASSERT(db_idx == 0 || db_idx == 1, - ("%s: bad DDP buffer index %d", __func__, db_idx)); - - if (db_idx == 0) { - ddp_flags |= V_TF_DDP_BUF0_VALID(1) | V_TF_DDP_ACTIVE_BUF(0); - if (waitall) - ddp_flags |= V_TF_DDP_PUSH_DISABLE_0(1); - else if (nb) - ddp_flags |= V_TF_DDP_BUF0_FLUSH(1); - else - ddp_flags |= V_TF_DDP_BUF0_FLUSH(0); - } else { - ddp_flags |= V_TF_DDP_BUF1_VALID(1) | V_TF_DDP_ACTIVE_BUF(1); - if (waitall) - ddp_flags |= V_TF_DDP_PUSH_DISABLE_1(1); - else if (nb) - ddp_flags |= V_TF_DDP_BUF1_FLUSH(1); - else - ddp_flags |= V_TF_DDP_BUF1_FLUSH(0); - } - - return (ddp_flags); -} - static struct wrqe * mk_update_tcb_for_ddp(struct adapter *sc, struct toepcb *toep, int db_idx, - int offset, uint64_t ddp_flags) + struct pageset *ps, int offset, uint64_t ddp_flags, uint64_t ddp_flags_mask) { - struct ddp_buffer *db = toep->db[db_idx]; struct wrqe *wr; struct work_request_hdr *wrh; struct ulp_txpkt *ulpmc; @@ -302,7 +476,7 @@ mk_update_tcb_for_ddp(struct adapter *sc, struct toepcb *toep, int db_idx, ulpmc = mk_set_tcb_field_ulp(ulpmc, toep, W_TCB_RX_DDP_BUF0_TAG + db_idx, V_TCB_RX_DDP_BUF0_TAG(M_TCB_RX_DDP_BUF0_TAG), - V_TCB_RX_DDP_BUF0_TAG(db->tag)); + V_TCB_RX_DDP_BUF0_TAG(ps->tag)); /* Update the current offset in the DDP buffer and its total length */ if (db_idx == 0) @@ -311,21 +485,18 @@ mk_update_tcb_for_ddp(struct adapter *sc, struct toepcb *toep, int db_idx, V_TCB_RX_DDP_BUF0_OFFSET(M_TCB_RX_DDP_BUF0_OFFSET) | V_TCB_RX_DDP_BUF0_LEN(M_TCB_RX_DDP_BUF0_LEN), V_TCB_RX_DDP_BUF0_OFFSET(offset) | - V_TCB_RX_DDP_BUF0_LEN(db->len)); + V_TCB_RX_DDP_BUF0_LEN(ps->len)); else ulpmc = mk_set_tcb_field_ulp(ulpmc, toep, W_TCB_RX_DDP_BUF1_OFFSET, V_TCB_RX_DDP_BUF1_OFFSET(M_TCB_RX_DDP_BUF1_OFFSET) | V_TCB_RX_DDP_BUF1_LEN((u64)M_TCB_RX_DDP_BUF1_LEN << 32), V_TCB_RX_DDP_BUF1_OFFSET(offset) | - V_TCB_RX_DDP_BUF1_LEN((u64)db->len << 32)); + V_TCB_RX_DDP_BUF1_LEN((u64)ps->len << 32)); /* Update DDP flags */ ulpmc = mk_set_tcb_field_ulp(ulpmc, toep, W_TCB_RX_DDP_FLAGS, - V_TF_DDP_BUF0_FLUSH(1) | V_TF_DDP_BUF1_FLUSH(1) | - V_TF_DDP_PUSH_DISABLE_0(1) | V_TF_DDP_PUSH_DISABLE_1(1) | - V_TF_DDP_BUF0_VALID(1) | V_TF_DDP_BUF1_VALID(1) | - V_TF_DDP_ACTIVE_BUF(1) | V_TF_DDP_INDICATE_OUT(1), ddp_flags); + ddp_flags_mask, ddp_flags); /* Gratuitous RX_DATA_ACK with RX_MODULATE set to speed up delivery. 
*/ ulpmc = mk_rx_data_ack_ulp(ulpmc, toep); @@ -333,30 +504,20 @@ mk_update_tcb_for_ddp(struct adapter *sc, struct toepcb *toep, int db_idx, return (wr); } -static void -discourage_ddp(struct toepcb *toep) -{ - - if (toep->ddp_score && --toep->ddp_score == 0) { - toep->ddp_flags &= ~DDP_OK; - toep->ddp_disabled = time_uptime; - CTR3(KTR_CXGBE, "%s: tid %u !DDP_OK @ %u", - __func__, toep->tid, time_uptime); - } -} - static int handle_ddp_data(struct toepcb *toep, __be32 ddp_report, __be32 rcv_nxt, int len) { uint32_t report = be32toh(ddp_report); - unsigned int db_flag; + unsigned int db_idx; struct inpcb *inp = toep->inp; + struct ddp_buffer *db; struct tcpcb *tp; struct socket *so; struct sockbuf *sb; - struct mbuf *m; + struct kaiocb *job; + long copied; - db_flag = report & F_DDP_BUF_IDX ? DDP_BUF1_ACTIVE : DDP_BUF0_ACTIVE; + db_idx = report & F_DDP_BUF_IDX ? 1 : 0; if (__predict_false(!(report & F_DDP_INV))) CXGBE_UNIMPLEMENTED("DDP buffer still valid"); @@ -364,19 +525,24 @@ handle_ddp_data(struct toepcb *toep, __be32 ddp_report, __be32 rcv_nxt, int len) INP_WLOCK(inp); so = inp_inpcbtosocket(inp); sb = &so->so_rcv; + DDP_LOCK(toep); + + KASSERT(toep->ddp_active_id == db_idx, + ("completed DDP buffer (%d) != active_id (%d) for tid %d", db_idx, + toep->ddp_active_id, toep->tid)); + db = &toep->db[db_idx]; + job = db->job; + if (__predict_false(inp->inp_flags & (INP_DROPPED | INP_TIMEWAIT))) { - /* - * XXX: think a bit more. - * tcpcb probably gone, but socket should still be around - * because we always wait for DDP completion in soreceive no - * matter what. Just wake it up and let it clean up. + * This can happen due to an administrative tcpdrop(8). + * Just fail the request with ECONNRESET. */ - CTR5(KTR_CXGBE, "%s: tid %u, seq 0x%x, len %d, inp_flags 0x%x", __func__, toep->tid, be32toh(rcv_nxt), len, inp->inp_flags); - SOCKBUF_LOCK(sb); - goto wakeup; + if (aio_clear_cancel_function(job)) + ddp_complete_one(job, ECONNRESET); + goto completed; } tp = intotcpcb(inp); @@ -386,7 +552,7 @@ handle_ddp_data(struct toepcb *toep, __be32 ddp_report, __be32 rcv_nxt, int len) * sequence number of the next byte to receive. The length of * the data received for this message must be computed by * comparing the new and old values of rcv_nxt. - * + * * For RX_DATA_DDP, len might be non-zero, but it is only the * length of the most recent DMA. 
It does not include the * total length of the data received since the previous update @@ -400,15 +566,14 @@ handle_ddp_data(struct toepcb *toep, __be32 ddp_report, __be32 rcv_nxt, int len) KASSERT(tp->rcv_wnd >= len, ("%s: negative window size", __func__)); tp->rcv_wnd -= len; #endif - m = get_ddp_mbuf(len); - - SOCKBUF_LOCK(sb); - if (report & F_DDP_BUF_COMPLETE) - toep->ddp_score = DDP_HIGH_SCORE; - else - discourage_ddp(toep); +#ifdef VERBOSE_TRACES + CTR4(KTR_CXGBE, "%s: DDP[%d] placed %d bytes (%#x)", __func__, db_idx, + len, report); +#endif /* receive buffer autosize */ + CURVNET_SET(so->so_vnet); + SOCKBUF_LOCK(sb); if (sb->sb_flags & SB_AUTOSIZE && V_tcp_do_autorcvbuf && sb->sb_hiwat < V_tcp_autorcvbuf_max && @@ -422,57 +587,185 @@ handle_ddp_data(struct toepcb *toep, __be32 ddp_report, __be32 rcv_nxt, int len) else toep->rx_credits += newsize - hiwat; } + SOCKBUF_UNLOCK(sb); + CURVNET_RESTORE(); - KASSERT(toep->sb_cc >= sbused(sb), - ("%s: sb %p has more data (%d) than last time (%d).", - __func__, sb, sbused(sb), toep->sb_cc)); - toep->rx_credits += toep->sb_cc - sbused(sb); -#ifdef USE_DDP_RX_FLOW_CONTROL - toep->rx_credits -= len; /* adjust for F_RX_FC_DDP */ +#ifndef USE_DDP_RX_FLOW_CONTROL + toep->rx_credits += len; #endif - sbappendstream_locked(sb, m, 0); - toep->sb_cc = sbused(sb); -wakeup: - KASSERT(toep->ddp_flags & db_flag, - ("%s: DDP buffer not active. toep %p, ddp_flags 0x%x, report 0x%x", - __func__, toep, toep->ddp_flags, report)); - toep->ddp_flags &= ~db_flag; - sorwakeup_locked(so); - SOCKBUF_UNLOCK_ASSERT(sb); + if (db->cancel_pending) { + /* + * Update the job's length but defer completion to the + * TCB_RPL callback. + */ + job->uaiocb._aiocb_private.status += len; + goto out; + } else if (!aio_clear_cancel_function(job)) { + /* + * Update the copied length for when + * t4_aio_cancel_active() completes this request. + */ + job->uaiocb._aiocb_private.status += len; + } else { + copied = job->uaiocb._aiocb_private.status; +#ifdef VERBOSE_TRACES + CTR4(KTR_CXGBE, "%s: completing %p (copied %ld, placed %d)", + __func__, job, copied, len); +#endif + aio_complete(job, copied + len, 0); + t4_rcvd(&toep->td->tod, tp); + } + +completed: + complete_ddp_buffer(toep, db, db_idx); + if (toep->ddp_waiting_count > 0) + ddp_queue_toep(toep); +out: + DDP_UNLOCK(toep); INP_WUNLOCK(inp); + return (0); } void -handle_ddp_close(struct toepcb *toep, struct tcpcb *tp, struct sockbuf *sb, - __be32 rcv_nxt) +handle_ddp_indicate(struct toepcb *toep) { - struct mbuf *m; - int len; - SOCKBUF_LOCK_ASSERT(sb); + DDP_ASSERT_LOCKED(toep); + MPASS(toep->ddp_active_count == 0); + MPASS((toep->ddp_flags & (DDP_BUF0_ACTIVE | DDP_BUF1_ACTIVE)) == 0); + if (toep->ddp_waiting_count == 0) { + /* + * The pending requests that triggered the request for an + * indicate were cancelled. Those cancels should have + * already disabled DDP. Just ignore this as the data is + * going into the socket buffer anyway. 
+ */ + return; + } + CTR3(KTR_CXGBE, "%s: tid %d indicated (%d waiting)", __func__, + toep->tid, toep->ddp_waiting_count); + ddp_queue_toep(toep); +} + +enum { + DDP_BUF0_INVALIDATED = 0x2, + DDP_BUF1_INVALIDATED +}; + +void +handle_ddp_tcb_rpl(struct toepcb *toep, const struct cpl_set_tcb_rpl *cpl) +{ + unsigned int db_idx; + struct inpcb *inp = toep->inp; + struct ddp_buffer *db; + struct kaiocb *job; + long copied; + + if (cpl->status != CPL_ERR_NONE) + panic("XXX: tcp_rpl failed: %d", cpl->status); + + switch (cpl->cookie) { + case V_WORD(W_TCB_RX_DDP_FLAGS) | V_COOKIE(DDP_BUF0_INVALIDATED): + case V_WORD(W_TCB_RX_DDP_FLAGS) | V_COOKIE(DDP_BUF1_INVALIDATED): + /* + * XXX: This duplicates a lot of code with handle_ddp_data(). + */ + db_idx = G_COOKIE(cpl->cookie) - DDP_BUF0_INVALIDATED; + INP_WLOCK(inp); + DDP_LOCK(toep); + db = &toep->db[db_idx]; + + /* + * handle_ddp_data() should leave the job around until + * this callback runs once a cancel is pending. + */ + MPASS(db != NULL); + MPASS(db->job != NULL); + MPASS(db->cancel_pending); + + /* + * XXX: It's not clear what happens if there is data + * placed when the buffer is invalidated. I suspect we + * need to read the TCB to see how much data was placed. + * + * For now this just pretends like nothing was placed. + * + * XXX: Note that if we did check the PCB we would need to + * also take care of updating the tp, etc. + */ + job = db->job; + copied = job->uaiocb._aiocb_private.status; + if (copied == 0) { + CTR2(KTR_CXGBE, "%s: cancelling %p", __func__, job); + aio_cancel(job); + } else { + CTR3(KTR_CXGBE, "%s: completing %p (copied %ld)", + __func__, job, copied); + aio_complete(job, copied, 0); + t4_rcvd(&toep->td->tod, intotcpcb(inp)); + } + + complete_ddp_buffer(toep, db, db_idx); + if (toep->ddp_waiting_count > 0) + ddp_queue_toep(toep); + DDP_UNLOCK(toep); + INP_WUNLOCK(inp); + break; + default: + panic("XXX: unknown tcb_rpl offset %#x, cookie %#x", + G_WORD(cpl->cookie), G_COOKIE(cpl->cookie)); + } +} + +void +handle_ddp_close(struct toepcb *toep, struct tcpcb *tp, __be32 rcv_nxt) +{ + struct ddp_buffer *db; + struct kaiocb *job; + long copied; + unsigned int db_flag, db_idx; + int len, placed; + INP_WLOCK_ASSERT(toep->inp); + DDP_ASSERT_LOCKED(toep); len = be32toh(rcv_nxt) - tp->rcv_nxt; - /* Signal handle_ddp() to break out of its sleep loop. */ - toep->ddp_flags &= ~(DDP_BUF0_ACTIVE | DDP_BUF1_ACTIVE); - if (len == 0) - return; - tp->rcv_nxt += len; - KASSERT(toep->sb_cc >= sbused(sb), - ("%s: sb %p has more data (%d) than last time (%d).", - __func__, sb, sbused(sb), toep->sb_cc)); - toep->rx_credits += toep->sb_cc - sbused(sb); -#ifdef USE_DDP_RX_FLOW_CONTROL - toep->rx_credits -= len; /* adjust for F_RX_FC_DDP */ +#ifndef USE_DDP_RX_FLOW_CONTROL + toep->rx_credits += len; #endif - m = get_ddp_mbuf(len); + while (toep->ddp_active_count > 0) { + MPASS(toep->ddp_active_id != -1); + db_idx = toep->ddp_active_id; + db_flag = db_idx == 1 ? DDP_BUF1_ACTIVE : DDP_BUF0_ACTIVE; + MPASS((toep->ddp_flags & db_flag) != 0); + db = &toep->db[db_idx]; + job = db->job; + copied = job->uaiocb._aiocb_private.status; + placed = len; + if (placed > job->uaiocb.aio_nbytes - copied) + placed = job->uaiocb.aio_nbytes - copied; + if (!aio_clear_cancel_function(job)) { + /* + * Update the copied length for when + * t4_aio_cancel_active() completes this + * request. 
+ */ + job->uaiocb._aiocb_private.status += placed; + } else { + CTR4(KTR_CXGBE, "%s: tid %d completed buf %d len %d", + __func__, toep->tid, db_idx, placed); + aio_complete(job, copied + placed, 0); + } + len -= placed; + complete_ddp_buffer(toep, db, db_idx); + } - sbappendstream_locked(sb, m, 0); - toep->sb_cc = sbused(sb); + MPASS(len == 0); + ddp_complete_all(toep, 0); } #define DDP_ERR (F_DDP_PPOD_MISMATCH | F_DDP_LLIMIT_ERR | F_DDP_ULIMIT_ERR |\ @@ -529,7 +822,7 @@ do_rx_ddp_complete(struct sge_iq *iq, const struct rss_header *rss, return (0); } -void +static void enable_ddp(struct adapter *sc, struct toepcb *toep) { @@ -540,6 +833,7 @@ enable_ddp(struct adapter *sc, struct toepcb *toep) CTR3(KTR_CXGBE, "%s: tid %u (time %u)", __func__, toep->tid, time_uptime); + DDP_ASSERT_LOCKED(toep); toep->ddp_flags |= DDP_SC_REQ; t4_set_tcb_field(sc, toep, 1, W_TCB_RX_DDP_FLAGS, V_TF_DDP_OFF(1) | V_TF_DDP_INDICATE_OUT(1) | @@ -550,81 +844,6 @@ enable_ddp(struct adapter *sc, struct toepcb *toep) V_TF_RCV_COALESCE_ENABLE(1), 0); } -static inline void -disable_ddp(struct adapter *sc, struct toepcb *toep) -{ - - KASSERT((toep->ddp_flags & (DDP_ON | DDP_SC_REQ)) == DDP_ON, - ("%s: toep %p has bad ddp_flags 0x%x", - __func__, toep, toep->ddp_flags)); - - CTR3(KTR_CXGBE, "%s: tid %u (time %u)", - __func__, toep->tid, time_uptime); - - toep->ddp_flags |= DDP_SC_REQ; - t4_set_tcb_field(sc, toep, 1, W_TCB_T_FLAGS, - V_TF_RCV_COALESCE_ENABLE(1), V_TF_RCV_COALESCE_ENABLE(1)); - t4_set_tcb_field(sc, toep, 1, W_TCB_RX_DDP_FLAGS, V_TF_DDP_OFF(1), - V_TF_DDP_OFF(1)); -} - -static int -hold_uio(struct uio *uio, vm_page_t **ppages, int *pnpages) -{ - struct vm_map *map; - struct iovec *iov; - vm_offset_t start, end; - vm_page_t *pp; - int n; - - KASSERT(uio->uio_iovcnt == 1, - ("%s: uio_iovcnt %d", __func__, uio->uio_iovcnt)); - KASSERT(uio->uio_td->td_proc == curproc, - ("%s: uio proc (%p) is not curproc (%p)", - __func__, uio->uio_td->td_proc, curproc)); - - map = &curproc->p_vmspace->vm_map; - iov = &uio->uio_iov[0]; - start = trunc_page((uintptr_t)iov->iov_base); - end = round_page((vm_offset_t)iov->iov_base + iov->iov_len); - n = howmany(end - start, PAGE_SIZE); - - if (end - start > MAX_DDP_BUFFER_SIZE) - return (E2BIG); - - pp = malloc(n * sizeof(vm_page_t), M_CXGBE, M_NOWAIT); - if (pp == NULL) - return (ENOMEM); - - if (vm_fault_quick_hold_pages(map, (vm_offset_t)iov->iov_base, - iov->iov_len, VM_PROT_WRITE, pp, n) < 0) { - free(pp, M_CXGBE); - return (EFAULT); - } - - *ppages = pp; - *pnpages = n; - - return (0); -} - -static int -bufcmp(struct ddp_buffer *db, vm_page_t *pages, int npages, int offset, int len) -{ - int i; - - if (db == NULL || db->npages != npages || db->offset != offset || - db->len != len) - return (1); - - for (i = 0; i < npages; i++) { - if (pages[i]->phys_addr != db->pages[i]->phys_addr) - return (1); - } - - return (0); -} - static int calculate_hcf(int n1, int n2) { @@ -647,12 +866,13 @@ calculate_hcf(int n1, int n2) return (b); } -static struct ddp_buffer * -alloc_ddp_buffer(struct tom_data *td, vm_page_t *pages, int npages, int offset, - int len) +static int +alloc_page_pods(struct tom_data *td, struct pageset *ps) { int i, hcf, seglen, idx, ppod, nppods; - struct ddp_buffer *db; + u_int ppod_addr; + + KASSERT(ps->nppods == 0, ("%s: page pods already allocated", __func__)); /* * The DDP page size is unrelated to the VM page size. We combine @@ -662,10 +882,11 @@ alloc_ddp_buffer(struct tom_data *td, vm_page_t *pages, int npages, int offset, * the page list. 
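The page-size selection performed by alloc_page_pods() below can be exercised standalone. In this sketch ddp_pgsz[] mirrors t4_ddp_pgsz[] from this patch, hcf() mirrors calculate_hcf(), and the segment lengths are fabricated.

    #include <stdio.h>

    static const int ddp_pgsz[] = {4096, 4096 << 2, 4096 << 4, 4096 << 6};

    static int
    hcf(int n1, int n2)
    {
        int a, b, t;

        b = n1 > n2 ? n1 : n2;
        a = n1 > n2 ? n2 : n1;
        while (a != 0) {
            t = a;
            a = b % a;
            b = t;
        }
        return (b);
    }

    int
    main(void)
    {
        /* Fabricated lengths of physically contiguous runs of pages. */
        int seglen[] = { 65536, 32768, 131072 };
        int h, i, idx;

        h = seglen[0];
        for (i = 1; i < 3; i++)
            h = hcf(h, seglen[i]);
        /* Largest hardware page size that evenly divides the HCF wins. */
        for (idx = 3; idx > 0; idx--)
            if (h % ddp_pgsz[idx] == 0)
                break;
        printf("hcf %d -> DDP page size %d\n", h, ddp_pgsz[idx]);
        return (0);
    }

This prints "hcf 32768 -> DDP page size 16384": the 32KB run limits the HCF, so 16KB is the largest DDP page size that fits.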
*/ hcf = 0; - for (i = 0; i < npages; i++) { + for (i = 0; i < ps->npages; i++) { seglen = PAGE_SIZE; - while (i < npages - 1 && - pages[i]->phys_addr + PAGE_SIZE == pages[i + 1]->phys_addr) { + while (i < ps->npages - 1 && + ps->pages[i]->phys_addr + PAGE_SIZE == + ps->pages[i + 1]->phys_addr) { seglen += PAGE_SIZE; i++; } @@ -683,7 +904,7 @@ alloc_ddp_buffer(struct tom_data *td, vm_page_t *pages, int npages, int offset, ("%s: PAGE_SIZE %d, hcf %d", __func__, PAGE_SIZE, hcf)); CTR3(KTR_CXGBE, "%s: PAGE_SIZE %d, hcf %d", __func__, PAGE_SIZE, hcf); - return (NULL); + return (0); } for (idx = nitems(t4_ddp_pgsz) - 1; idx > 0; idx--) { @@ -693,40 +914,29 @@ alloc_ddp_buffer(struct tom_data *td, vm_page_t *pages, int npages, int offset, have_pgsz: MPASS(idx <= M_PPOD_PGSZ); - db = malloc(sizeof(*db), M_CXGBE, M_NOWAIT); - if (db == NULL) { - CTR1(KTR_CXGBE, "%s: malloc failed.", __func__); - return (NULL); + nppods = pages_to_nppods(ps->npages, t4_ddp_pgsz[idx]); + if (alloc_ppods(td, nppods, &ppod_addr) != 0) { + CTR4(KTR_CXGBE, "%s: no pods, nppods %d, npages %d, pgsz %d", + __func__, nppods, ps->npages, t4_ddp_pgsz[idx]); + return (0); } - nppods = pages_to_nppods(npages, t4_ddp_pgsz[idx]); - if (alloc_ppods(td, nppods, &db->ppod_addr) != 0) { - free(db, M_CXGBE); - CTR4(KTR_CXGBE, "%s: no pods, nppods %d, resid %d, pgsz %d", - __func__, nppods, len, t4_ddp_pgsz[idx]); - return (NULL); - } - ppod = (db->ppod_addr - td->ppod_start) / PPOD_SIZE; + ppod = (ppod_addr - td->ppod_start) / PPOD_SIZE; + ps->tag = V_PPOD_PGSZ(idx) | V_PPOD_TAG(ppod); + ps->ppod_addr = ppod_addr; + ps->nppods = nppods; - db->tag = V_PPOD_PGSZ(idx) | V_PPOD_TAG(ppod); - db->nppods = nppods; - db->npages = npages; - db->pages = pages; - db->offset = offset; - db->len = len; + CTR5(KTR_CXGBE, "New page pods. " + "ps %p, ddp_pgsz %d, ppod 0x%x, npages %d, nppods %d", + ps, t4_ddp_pgsz[idx], ppod, ps->npages, ps->nppods); - CTR6(KTR_CXGBE, "New DDP buffer. 
" - "ddp_pgsz %d, ppod 0x%x, npages %d, nppods %d, offset %d, len %d", - t4_ddp_pgsz[idx], ppod, db->npages, db->nppods, db->offset, - db->len); - - return (db); + return (1); } #define NUM_ULP_TX_SC_IMM_PPODS (256 / PPOD_SIZE) static int -write_page_pods(struct adapter *sc, struct toepcb *toep, struct ddp_buffer *db) +write_page_pods(struct adapter *sc, struct toepcb *toep, struct pageset *ps) { struct wrqe *wr; struct ulp_mem_io *ulpmc; @@ -736,17 +946,20 @@ write_page_pods(struct adapter *sc, struct toepcb *toep, struct ddp_buffer *db) u_int ppod_addr; uint32_t cmd; + KASSERT(!(ps->flags & PS_PPODS_WRITTEN), + ("%s: page pods already written", __func__)); + cmd = htobe32(V_ULPTX_CMD(ULP_TX_MEM_WRITE)); if (is_t4(sc)) cmd |= htobe32(F_ULP_MEMIO_ORDER); else cmd |= htobe32(F_T5_ULP_MEMIO_IMM); - ddp_pgsz = t4_ddp_pgsz[G_PPOD_PGSZ(db->tag)]; - ppod_addr = db->ppod_addr; - for (i = 0; i < db->nppods; ppod_addr += chunk) { + ddp_pgsz = t4_ddp_pgsz[G_PPOD_PGSZ(ps->tag)]; + ppod_addr = ps->ppod_addr; + for (i = 0; i < ps->nppods; ppod_addr += chunk) { /* How many page pods are we writing in this cycle */ - n = min(db->nppods - i, NUM_ULP_TX_SC_IMM_PPODS); + n = min(ps->nppods - i, NUM_ULP_TX_SC_IMM_PPODS); chunk = PPOD_SZ(n); len = roundup2(sizeof(*ulpmc) + sizeof(*ulpsc) + chunk, 16); @@ -768,15 +981,15 @@ write_page_pods(struct adapter *sc, struct toepcb *toep, struct ddp_buffer *db) ppod = (struct pagepod *)(ulpsc + 1); for (j = 0; j < n; i++, j++, ppod++) { ppod->vld_tid_pgsz_tag_color = htobe64(F_PPOD_VALID | - V_PPOD_TID(toep->tid) | db->tag); - ppod->len_offset = htobe64(V_PPOD_LEN(db->len) | - V_PPOD_OFST(db->offset)); + V_PPOD_TID(toep->tid) | ps->tag); + ppod->len_offset = htobe64(V_PPOD_LEN(ps->len) | + V_PPOD_OFST(ps->offset)); ppod->rsvd = 0; idx = i * PPOD_PAGES * (ddp_pgsz / PAGE_SIZE); for (k = 0; k < nitems(ppod->addr); k++) { - if (idx < db->npages) { + if (idx < ps->npages) { ppod->addr[k] = - htobe64(db->pages[idx]->phys_addr); + htobe64(ps->pages[idx]->phys_addr); idx += ddp_pgsz / PAGE_SIZE; } else ppod->addr[k] = 0; @@ -792,184 +1005,49 @@ write_page_pods(struct adapter *sc, struct toepcb *toep, struct ddp_buffer *db) t4_wrq_tx(sc, wr); } + ps->flags |= PS_PPODS_WRITTEN; return (0); } -/* - * Reuse, or allocate (and program the page pods for) a new DDP buffer. The - * "pages" array is handed over to this function and should not be used in any - * way by the caller after that. - */ -static int -select_ddp_buffer(struct adapter *sc, struct toepcb *toep, vm_page_t *pages, - int npages, int db_off, int db_len) -{ - struct ddp_buffer *db; - struct tom_data *td = sc->tom_softc; - int i, empty_slot = -1; - - /* Try to reuse */ - for (i = 0; i < nitems(toep->db); i++) { - if (bufcmp(toep->db[i], pages, npages, db_off, db_len) == 0) { - free(pages, M_CXGBE); - return (i); /* pages still held */ - } else if (toep->db[i] == NULL && empty_slot < 0) - empty_slot = i; - } - - /* Allocate new buffer, write its page pods. 
*/ - db = alloc_ddp_buffer(td, pages, npages, db_off, db_len); - if (db == NULL) { - vm_page_unhold_pages(pages, npages); - free(pages, M_CXGBE); - return (-1); - } - if (write_page_pods(sc, toep, db) != 0) { - vm_page_unhold_pages(pages, npages); - free_ddp_buffer(td, db); - return (-1); - } - - i = empty_slot; - if (i < 0) { - i = arc4random() % nitems(toep->db); - free_ddp_buffer(td, toep->db[i]); - } - toep->db[i] = db; - - CTR5(KTR_CXGBE, "%s: tid %d, DDP buffer[%d] = %p (tag 0x%x)", - __func__, toep->tid, i, db, db->tag); - - return (i); -} - static void -wire_ddp_buffer(struct ddp_buffer *db) +wire_pageset(struct pageset *ps) { - int i; vm_page_t p; + int i; - for (i = 0; i < db->npages; i++) { - p = db->pages[i]; + KASSERT(!(ps->flags & PS_WIRED), ("pageset already wired")); + + for (i = 0; i < ps->npages; i++) { + p = ps->pages[i]; vm_page_lock(p); vm_page_wire(p); vm_page_unhold(p); vm_page_unlock(p); } + ps->flags |= PS_WIRED; } -static void -unwire_ddp_buffer(struct ddp_buffer *db) -{ - int i; - vm_page_t p; - - for (i = 0; i < db->npages; i++) { - p = db->pages[i]; - vm_page_lock(p); - vm_page_unwire(p, PQ_INACTIVE); - vm_page_unlock(p); - } -} - +/* + * Prepare a pageset for DDP. This wires the pageset and sets up page + * pods. + */ static int -handle_ddp(struct socket *so, struct uio *uio, int flags, int error) +prep_pageset(struct adapter *sc, struct toepcb *toep, struct pageset *ps) { - struct sockbuf *sb = &so->so_rcv; - struct tcpcb *tp = so_sototcpcb(so); - struct toepcb *toep = tp->t_toe; - struct adapter *sc = td_adapter(toep->td); - vm_page_t *pages; - int npages, db_idx, rc, buf_flag; - struct ddp_buffer *db; - struct wrqe *wr; - uint64_t ddp_flags; + struct tom_data *td = sc->tom_softc; - SOCKBUF_LOCK_ASSERT(sb); - -#if 0 - if (sbused(sb) + sc->tt.ddp_thres > uio->uio_resid) { - CTR4(KTR_CXGBE, "%s: sb_cc %d, threshold %d, resid %d", - __func__, sbused(sb), sc->tt.ddp_thres, uio->uio_resid); + if (!(ps->flags & PS_WIRED)) + wire_pageset(ps); + if (ps->nppods == 0 && !alloc_page_pods(td, ps)) { + return (0); } -#endif - - /* XXX: too eager to disable DDP, could handle NBIO better than this. */ - if (sbused(sb) >= uio->uio_resid || uio->uio_resid < sc->tt.ddp_thres || - uio->uio_resid > MAX_DDP_BUFFER_SIZE || uio->uio_iovcnt > 1 || - so->so_state & SS_NBIO || flags & (MSG_DONTWAIT | MSG_NBIO) || - error || so->so_error || sb->sb_state & SBS_CANTRCVMORE) - goto no_ddp; - - /* - * Fault in and then hold the pages of the uio buffers. We'll wire them - * a bit later if everything else works out. - */ - SOCKBUF_UNLOCK(sb); - if (hold_uio(uio, &pages, &npages) != 0) { - SOCKBUF_LOCK(sb); - goto no_ddp; - } - SOCKBUF_LOCK(sb); - if (__predict_false(so->so_error || sb->sb_state & SBS_CANTRCVMORE)) { - vm_page_unhold_pages(pages, npages); - free(pages, M_CXGBE); - goto no_ddp; + if (!(ps->flags & PS_PPODS_WRITTEN) && + write_page_pods(sc, toep, ps) != 0) { + return (0); } - /* - * Figure out which one of the two DDP buffers to use this time. - */ - db_idx = select_ddp_buffer(sc, toep, pages, npages, - (uintptr_t)uio->uio_iov->iov_base & PAGE_MASK, uio->uio_resid); - pages = NULL; /* handed off to select_ddp_buffer */ - if (db_idx < 0) - goto no_ddp; - db = toep->db[db_idx]; - buf_flag = db_idx == 0 ? DDP_BUF0_ACTIVE : DDP_BUF1_ACTIVE; - - /* - * Build the compound work request that tells the chip where to DMA the - * payload. 
- */ - ddp_flags = select_ddp_flags(so, flags, db_idx); - wr = mk_update_tcb_for_ddp(sc, toep, db_idx, sbused(sb), ddp_flags); - if (wr == NULL) { - /* - * Just unhold the pages. The DDP buffer's software state is - * left as-is in the toep. The page pods were written - * successfully and we may have an opportunity to use it in the - * future. - */ - vm_page_unhold_pages(db->pages, db->npages); - goto no_ddp; - } - - /* Wire (and then unhold) the pages, and give the chip the go-ahead. */ - wire_ddp_buffer(db); - t4_wrq_tx(sc, wr); - sb->sb_flags &= ~SB_DDP_INDICATE; - toep->ddp_flags |= buf_flag; - - /* - * Wait for the DDP operation to complete and then unwire the pages. - * The return code from the sbwait will be the final return code of this - * function. But we do need to wait for DDP no matter what. - */ - rc = sbwait(sb); - while (toep->ddp_flags & buf_flag) { - /* XXXGL: shouldn't here be sbwait() call? */ - sb->sb_flags |= SB_WAIT; - msleep(&sb->sb_acc, &sb->sb_mtx, PSOCK , "sbwait", 0); - } - unwire_ddp_buffer(db); - return (rc); -no_ddp: - disable_ddp(sc, toep); - discourage_ddp(toep); - sb->sb_flags &= ~SB_DDP_INDICATE; - return (0); + return (1); } void @@ -994,287 +1072,689 @@ t4_uninit_ddp(struct adapter *sc __unused, struct tom_data *td) } } -#define VNET_SO_ASSERT(so) \ - VNET_ASSERT(curvnet != NULL, \ - ("%s:%d curvnet is NULL, so=%p", __func__, __LINE__, (so))); -#define SBLOCKWAIT(f) (((f) & MSG_DONTWAIT) ? 0 : SBL_WAIT) static int -soreceive_rcvoob(struct socket *so, struct uio *uio, int flags) +pscmp(struct pageset *ps, struct vmspace *vm, vm_offset_t start, int npages, + int pgoff, int len) { - CXGBE_UNIMPLEMENTED(__func__); + if (ps->npages != npages || ps->offset != pgoff || ps->len != len) + return (1); + + return (ps->vm != vm || ps->vm_timestamp != vm->vm_map.timestamp); } -static char ddp_magic_str[] = "nothing to see here"; - -static struct mbuf * -get_ddp_mbuf(int len) -{ - struct mbuf *m; - - m = m_get(M_NOWAIT, MT_DATA); - if (m == NULL) - CXGBE_UNIMPLEMENTED("mbuf alloc failure"); - m->m_len = len; - m->m_data = &ddp_magic_str[0]; - - return (m); -} - -static inline int -is_ddp_mbuf(struct mbuf *m) -{ - - return (m->m_data == &ddp_magic_str[0]); -} - -/* - * Copy an mbuf chain into a uio limited by len if set. - */ static int -m_mbuftouio_ddp(struct uio *uio, struct mbuf *m, int len) +hold_aio(struct toepcb *toep, struct kaiocb *job, struct pageset **pps) { - int error, length, total; - int progress = 0; + struct vmspace *vm; + vm_map_t map; + vm_offset_t start, end, pgoff; + struct pageset *ps; + int n; - if (len > 0) - total = min(uio->uio_resid, len); - else - total = uio->uio_resid; + DDP_ASSERT_LOCKED(toep); - /* Fill the uio with data from the mbufs. */ - for (; m != NULL; m = m->m_next) { - length = min(m->m_len, total - progress); + /* + * The AIO subsystem will cancel and drain all requests before + * permitting a process to exit or exec, so p_vmspace should + * be stable here. 
+ */ + vm = job->userproc->p_vmspace; + map = &vm->vm_map; + start = (uintptr_t)job->uaiocb.aio_buf; + pgoff = start & PAGE_MASK; + end = round_page(start + job->uaiocb.aio_nbytes); + start = trunc_page(start); - if (is_ddp_mbuf(m)) { - enum uio_seg segflag = uio->uio_segflg; - - uio->uio_segflg = UIO_NOCOPY; - error = uiomove(mtod(m, void *), length, uio); - uio->uio_segflg = segflag; - } else - error = uiomove(mtod(m, void *), length, uio); - if (error) - return (error); + if (end - start > MAX_DDP_BUFFER_SIZE) { + /* + * Truncate the request to a short read. + * Alternatively, we could DDP in chunks to the larger + * buffer, but that would be quite a bit more work. + * + * When truncating, round the request down to avoid + * crossing a cache line on the final transaction. + */ + end = rounddown2(start + MAX_DDP_BUFFER_SIZE, CACHE_LINE_SIZE); +#ifdef VERBOSE_TRACES + CTR4(KTR_CXGBE, "%s: tid %d, truncating size from %lu to %lu", + __func__, toep->tid, (unsigned long)job->uaiocb.aio_nbytes, + (unsigned long)(end - (start + pgoff))); +#endif + job->uaiocb.aio_nbytes = end - (start + pgoff); + end = round_page(end); + } - progress += length; + n = atop(end - start); + + /* + * Try to reuse a cached pageset. + */ + TAILQ_FOREACH(ps, &toep->ddp_cached_pagesets, link) { + if (pscmp(ps, vm, start, n, pgoff, + job->uaiocb.aio_nbytes) == 0) { + TAILQ_REMOVE(&toep->ddp_cached_pagesets, ps, link); + toep->ddp_cached_count--; + *pps = ps; + return (0); + } + } + + /* + * If there are too many cached pagesets to create a new one, + * free a pageset before creating a new one. + */ + KASSERT(toep->ddp_active_count + toep->ddp_cached_count <= + nitems(toep->db), ("%s: too many wired pagesets", __func__)); + if (toep->ddp_active_count + toep->ddp_cached_count == + nitems(toep->db)) { + KASSERT(toep->ddp_cached_count > 0, + ("no cached pageset to free")); + ps = TAILQ_LAST(&toep->ddp_cached_pagesets, pagesetq); + TAILQ_REMOVE(&toep->ddp_cached_pagesets, ps, link); + toep->ddp_cached_count--; + free_pageset(toep->td, ps); + } + DDP_UNLOCK(toep); + + /* Create a new pageset. */ + ps = malloc(sizeof(*ps) + n * sizeof(vm_page_t), M_CXGBE, M_WAITOK | + M_ZERO); + ps->pages = (vm_page_t *)(ps + 1); + ps->vm_timestamp = map->timestamp; + ps->npages = vm_fault_quick_hold_pages(map, start, end - start, + VM_PROT_WRITE, ps->pages, n); + + DDP_LOCK(toep); + if (ps->npages < 0) { + free(ps, M_CXGBE); + return (EFAULT); + } + + KASSERT(ps->npages == n, ("hold_aio: page count mismatch: %d vs %d", + ps->npages, n)); + + ps->offset = pgoff; + ps->len = job->uaiocb.aio_nbytes; + atomic_add_int(&vm->vm_refcnt, 1); + ps->vm = vm; + + CTR5(KTR_CXGBE, "%s: tid %d, new pageset %p for job %p, npages %d", + __func__, toep->tid, ps, job, ps->npages); + *pps = ps; return (0); } -/* - * Based on soreceive_stream() in uipc_socket.c - */ -int -t4_soreceive_ddp(struct socket *so, struct sockaddr **psa, struct uio *uio, - struct mbuf **mp0, struct mbuf **controlp, int *flagsp) +static void +ddp_complete_all(struct toepcb *toep, int error) { - int len = 0, error = 0, flags, oresid, ddp_handled = 0; - struct sockbuf *sb; - struct mbuf *m, *n = NULL; + struct kaiocb *job; - /* We only do stream sockets. 
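The request-clipping arithmetic in hold_aio() above can be checked in a standalone program. The macros below are local stand-ins for the kernel's, and the 256KB cap is only a placeholder for MAX_DDP_BUFFER_SIZE, which is really derived from M_TCB_RX_DDP_BUF0_LEN.

    #include <stdio.h>
    #include <stdint.h>

    #define PAGE_MASK           (4096 - 1)
    #define CACHE_LINE_SIZE     64
    #define MAX_DDP_BUF         (256 * 1024)    /* placeholder cap */
    #define trunc_page(x)       ((x) & ~(uintptr_t)PAGE_MASK)
    #define round_page(x)       (((x) + PAGE_MASK) & ~(uintptr_t)PAGE_MASK)
    #define rounddown2(x, y)    ((x) & ~((uintptr_t)(y) - 1))

    int
    main(void)
    {
        uintptr_t buf = 0x800123;       /* unaligned user buffer */
        size_t nbytes = 1024 * 1024;    /* request exceeds the cap */
        uintptr_t start, end, pgoff;

        start = buf;
        pgoff = start & PAGE_MASK;
        end = round_page(start + nbytes);
        start = trunc_page(start);
        if (end - start > MAX_DDP_BUF) {
            /* Truncate, avoiding a cache line split on the last burst. */
            end = rounddown2(start + MAX_DDP_BUF, CACHE_LINE_SIZE);
            nbytes = end - (start + pgoff);
            end = round_page(end);
        }
        printf("clipped to %zu bytes, %lu pages held\n", nbytes,
            (unsigned long)((end - start) / (PAGE_MASK + 1)));
        return (0);
    }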
*/ - if (so->so_type != SOCK_STREAM) - return (EINVAL); - if (psa != NULL) - *psa = NULL; - if (controlp != NULL) - return (EINVAL); - if (flagsp != NULL) - flags = *flagsp &~ MSG_EOR; - else - flags = 0; - if (flags & MSG_OOB) - return (soreceive_rcvoob(so, uio, flags)); - if (mp0 != NULL) - *mp0 = NULL; - - sb = &so->so_rcv; - - /* Prevent other readers from entering the socket. */ - error = sblock(sb, SBLOCKWAIT(flags)); - SOCKBUF_LOCK(sb); - if (error) - goto out; - - /* Easy one, no space to copyout anything. */ - if (uio->uio_resid == 0) { - error = EINVAL; - goto out; + DDP_ASSERT_LOCKED(toep); + while (!TAILQ_EMPTY(&toep->ddp_aiojobq)) { + job = TAILQ_FIRST(&toep->ddp_aiojobq); + TAILQ_REMOVE(&toep->ddp_aiojobq, job, list); + toep->ddp_waiting_count--; + if (aio_clear_cancel_function(job)) + ddp_complete_one(job, error); } - oresid = uio->uio_resid; - - /* We will never ever get anything unless we are or were connected. */ - if (!(so->so_state & (SS_ISCONNECTED|SS_ISDISCONNECTED))) { - error = ENOTCONN; - goto out; - } - -restart: - SOCKBUF_LOCK_ASSERT(&so->so_rcv); - - if (sb->sb_flags & SB_DDP_INDICATE && !ddp_handled) { - - /* uio should be just as it was at entry */ - KASSERT(oresid == uio->uio_resid, - ("%s: oresid = %d, uio_resid = %zd, sbavail = %d", - __func__, oresid, uio->uio_resid, sbavail(sb))); - - error = handle_ddp(so, uio, flags, 0); - ddp_handled = 1; - if (error) - goto out; - } - - /* Abort if socket has reported problems. */ - if (so->so_error) { - if (sbavail(sb)) - goto deliver; - if (oresid > uio->uio_resid) - goto out; - error = so->so_error; - if (!(flags & MSG_PEEK)) - so->so_error = 0; - goto out; - } - - /* Door is closed. Deliver what is left, if any. */ - if (sb->sb_state & SBS_CANTRCVMORE) { - if (sbavail(sb)) - goto deliver; - else - goto out; - } - - /* Socket buffer is empty and we shall not block. */ - if (sbavail(sb) == 0 && - ((so->so_state & SS_NBIO) || (flags & (MSG_DONTWAIT|MSG_NBIO)))) { - error = EAGAIN; - goto out; - } - - /* Socket buffer got some data that we shall deliver now. */ - if (sbavail(sb) > 0 && !(flags & MSG_WAITALL) && - ((so->so_state & SS_NBIO) || - (flags & (MSG_DONTWAIT|MSG_NBIO)) || - sbavail(sb) >= sb->sb_lowat || - sbavail(sb) >= uio->uio_resid || - sbavail(sb) >= sb->sb_hiwat) ) { - goto deliver; - } - - /* On MSG_WAITALL we must wait until all data or error arrives. */ - if ((flags & MSG_WAITALL) && - (sbavail(sb) >= uio->uio_resid || sbavail(sb) >= sb->sb_lowat)) - goto deliver; - - /* - * Wait and block until (more) data comes in. - * NB: Drops the sockbuf lock during wait. - */ - error = sbwait(sb); - if (error) { - if (sb->sb_flags & SB_DDP_INDICATE && !ddp_handled) { - (void) handle_ddp(so, uio, flags, 1); - ddp_handled = 1; - } - goto out; - } - goto restart; - -deliver: - SOCKBUF_LOCK_ASSERT(&so->so_rcv); - KASSERT(sbavail(sb) > 0, ("%s: sockbuf empty", __func__)); - KASSERT(sb->sb_mb != NULL, ("%s: sb_mb == NULL", __func__)); - - if (sb->sb_flags & SB_DDP_INDICATE && !ddp_handled) - goto restart; - - /* Statistics. */ - if (uio->uio_td) - uio->uio_td->td_ru.ru_msgrcv++; - - /* Fill uio until full or current end of socket buffer is reached. */ - len = min(uio->uio_resid, sbavail(sb)); - if (mp0 != NULL) { - /* Dequeue as many mbufs as possible. 
*/ - if (!(flags & MSG_PEEK) && len >= sb->sb_mb->m_len) { - for (*mp0 = m = sb->sb_mb; - m != NULL && m->m_len <= len; - m = m->m_next) { - len -= m->m_len; - uio->uio_resid -= m->m_len; - sbfree(sb, m); - n = m; - } - sb->sb_mb = m; - if (sb->sb_mb == NULL) - SB_EMPTY_FIXUP(sb); - n->m_next = NULL; - } - /* Copy the remainder. */ - if (len > 0) { - KASSERT(sb->sb_mb != NULL, - ("%s: len > 0 && sb->sb_mb empty", __func__)); - - m = m_copym(sb->sb_mb, 0, len, M_NOWAIT); - if (m == NULL) - len = 0; /* Don't flush data from sockbuf. */ - else - uio->uio_resid -= m->m_len; - if (*mp0 != NULL) - n->m_next = m; - else - *mp0 = m; - if (*mp0 == NULL) { - error = ENOBUFS; - goto out; - } - } - } else { - /* NB: Must unlock socket buffer as uiomove may sleep. */ - SOCKBUF_UNLOCK(sb); - error = m_mbuftouio_ddp(uio, sb->sb_mb, len); - SOCKBUF_LOCK(sb); - if (error) - goto out; - } - SBLASTRECORDCHK(sb); - SBLASTMBUFCHK(sb); - - /* - * Remove the delivered data from the socket buffer unless we - * were only peeking. - */ - if (!(flags & MSG_PEEK)) { - if (len > 0) - sbdrop_locked(sb, len); - - /* Notify protocol that we drained some data. */ - if ((so->so_proto->pr_flags & PR_WANTRCVD) && - (((flags & MSG_WAITALL) && uio->uio_resid > 0) || - !(flags & MSG_SOCALLBCK))) { - SOCKBUF_UNLOCK(sb); - VNET_SO_ASSERT(so); - (*so->so_proto->pr_usrreqs->pru_rcvd)(so, flags); - SOCKBUF_LOCK(sb); - } - } - - /* - * For MSG_WAITALL we may have to loop again and wait for - * more data to come in. - */ - if ((flags & MSG_WAITALL) && uio->uio_resid > 0) - goto restart; -out: - SOCKBUF_LOCK_ASSERT(sb); - SBLASTRECORDCHK(sb); - SBLASTMBUFCHK(sb); - SOCKBUF_UNLOCK(sb); - sbunlock(sb); - return (error); } +static void +aio_ddp_cancel_one(struct kaiocb *job) +{ + long copied; + + /* + * If this job had copied data out of the socket buffer before + * it was cancelled, report it as a short read rather than an + * error. + */ + copied = job->uaiocb._aiocb_private.status; + if (copied != 0) + aio_complete(job, copied, 0); + else + aio_cancel(job); +} + +/* + * Called when the main loop wants to requeue a job to retry it later. + * Deals with the race of the job being cancelled while it was being + * examined. + */ +static void +aio_ddp_requeue_one(struct toepcb *toep, struct kaiocb *job) +{ + + DDP_ASSERT_LOCKED(toep); + if (!(toep->ddp_flags & DDP_DEAD) && + aio_set_cancel_function(job, t4_aio_cancel_queued)) { + TAILQ_INSERT_HEAD(&toep->ddp_aiojobq, job, list); + toep->ddp_waiting_count++; + } else + aio_ddp_cancel_one(job); +} + +static void +aio_ddp_requeue(struct toepcb *toep) +{ + struct adapter *sc = td_adapter(toep->td); + struct socket *so; + struct sockbuf *sb; + struct inpcb *inp; + struct kaiocb *job; + struct ddp_buffer *db; + size_t copied, offset, resid; + struct pageset *ps; + struct mbuf *m; + uint64_t ddp_flags, ddp_flags_mask; + struct wrqe *wr; + int buf_flag, db_idx, error; + + DDP_ASSERT_LOCKED(toep); + +restart: + if (toep->ddp_flags & DDP_DEAD) { + MPASS(toep->ddp_waiting_count == 0); + MPASS(toep->ddp_active_count == 0); + return; + } + + if (toep->ddp_waiting_count == 0 || + toep->ddp_active_count == nitems(toep->db)) { + return; + } + + job = TAILQ_FIRST(&toep->ddp_aiojobq); + so = job->fd_file->f_data; + sb = &so->so_rcv; + SOCKBUF_LOCK(sb); + + /* We will never get anything unless we are or were connected. 
*/ + if (!(so->so_state & (SS_ISCONNECTED|SS_ISDISCONNECTED))) { + SOCKBUF_UNLOCK(sb); + ddp_complete_all(toep, ENOTCONN); + return; + } + + KASSERT(toep->ddp_active_count == 0 || sbavail(sb) == 0, + ("%s: pending sockbuf data and DDP is active", __func__)); + + /* Abort if socket has reported problems. */ + /* XXX: Wait for any queued DDP's to finish and/or flush them? */ + if (so->so_error && sbavail(sb) == 0) { + toep->ddp_waiting_count--; + TAILQ_REMOVE(&toep->ddp_aiojobq, job, list); + if (!aio_clear_cancel_function(job)) { + SOCKBUF_UNLOCK(sb); + goto restart; + } + + /* + * If this job has previously copied some data, report + * a short read and leave the error to be reported by + * a future request. + */ + copied = job->uaiocb._aiocb_private.status; + if (copied != 0) { + SOCKBUF_UNLOCK(sb); + aio_complete(job, copied, 0); + goto restart; + } + error = so->so_error; + so->so_error = 0; + SOCKBUF_UNLOCK(sb); + aio_complete(job, -1, error); + goto restart; + } + + /* + * Door is closed. If there is pending data in the socket buffer, + * deliver it. If there are pending DDP requests, wait for those + * to complete. Once they have completed, return EOF reads. + */ + if (sb->sb_state & SBS_CANTRCVMORE && sbavail(sb) == 0) { + SOCKBUF_UNLOCK(sb); + if (toep->ddp_active_count != 0) + return; + ddp_complete_all(toep, 0); + return; + } + + /* + * If DDP is not enabled and there is no pending socket buffer + * data, try to enable DDP. + */ + if (sbavail(sb) == 0 && (toep->ddp_flags & DDP_ON) == 0) { + SOCKBUF_UNLOCK(sb); + + /* + * Wait for the card to ACK that DDP is enabled before + * queueing any buffers. Currently this waits for an + * indicate to arrive. This could use a TCB_SET_FIELD_RPL + * message to know that DDP was enabled instead of waiting + * for the indicate which would avoid copying the indicate + * if no data is pending. + * + * XXX: Might want to limit the indicate size to the size + * of the first queued request. + */ + if ((toep->ddp_flags & DDP_SC_REQ) == 0) + enable_ddp(sc, toep); + return; + } + SOCKBUF_UNLOCK(sb); + + /* + * If another thread is queueing a buffer for DDP, let it + * drain any work and return. + */ + if (toep->ddp_queueing != NULL) + return; + + /* Take the next job to prep it for DDP. */ + toep->ddp_waiting_count--; + TAILQ_REMOVE(&toep->ddp_aiojobq, job, list); + if (!aio_clear_cancel_function(job)) + goto restart; + toep->ddp_queueing = job; + + /* NB: This drops DDP_LOCK while it holds the backing VM pages. */ + error = hold_aio(toep, job, &ps); + if (error != 0) { + ddp_complete_one(job, error); + toep->ddp_queueing = NULL; + goto restart; + } + + SOCKBUF_LOCK(sb); + if (so->so_error && sbavail(sb) == 0) { + copied = job->uaiocb._aiocb_private.status; + if (copied != 0) { + SOCKBUF_UNLOCK(sb); + recycle_pageset(toep, ps); + aio_complete(job, copied, 0); + toep->ddp_queueing = NULL; + goto restart; + } + + error = so->so_error; + so->so_error = 0; + SOCKBUF_UNLOCK(sb); + recycle_pageset(toep, ps); + aio_complete(job, -1, error); + toep->ddp_queueing = NULL; + goto restart; + } + + if (sb->sb_state & SBS_CANTRCVMORE && sbavail(sb) == 0) { + SOCKBUF_UNLOCK(sb); + recycle_pageset(toep, ps); + if (toep->ddp_active_count != 0) { + /* + * The door is closed, but there are still pending + * DDP buffers. Requeue. These jobs will all be + * completed once those buffers drain. 
+ */ + aio_ddp_requeue_one(toep, job); + toep->ddp_queueing = NULL; + return; + } + ddp_complete_one(job, 0); + ddp_complete_all(toep, 0); + toep->ddp_queueing = NULL; + return; + } + +sbcopy: + /* + * If the toep is dead, there shouldn't be any data in the socket + * buffer, so the above case should have handled this. + */ + MPASS(!(toep->ddp_flags & DDP_DEAD)); + + /* + * If there is pending data in the socket buffer (either + * from before the requests were queued or a DDP indicate), + * copy those mbufs out directly. + */ + copied = 0; + offset = ps->offset + job->uaiocb._aiocb_private.status; + MPASS(job->uaiocb._aiocb_private.status <= job->uaiocb.aio_nbytes); + resid = job->uaiocb.aio_nbytes - job->uaiocb._aiocb_private.status; + m = sb->sb_mb; + KASSERT(m == NULL || toep->ddp_active_count == 0, + ("%s: sockbuf data with active DDP", __func__)); + while (m != NULL && resid > 0) { + struct iovec iov[1]; + struct uio uio; + int error; + + iov[0].iov_base = mtod(m, void *); + iov[0].iov_len = m->m_len; + if (iov[0].iov_len > resid) + iov[0].iov_len = resid; + uio.uio_iov = iov; + uio.uio_iovcnt = 1; + uio.uio_offset = 0; + uio.uio_resid = iov[0].iov_len; + uio.uio_segflg = UIO_SYSSPACE; + uio.uio_rw = UIO_WRITE; + error = uiomove_fromphys(ps->pages, offset + copied, + uio.uio_resid, &uio); + MPASS(error == 0 && uio.uio_resid == 0); + copied += uio.uio_offset; + resid -= uio.uio_offset; + m = m->m_next; + } + if (copied != 0) { + sbdrop_locked(sb, copied); + job->uaiocb._aiocb_private.status += copied; + copied = job->uaiocb._aiocb_private.status; + inp = sotoinpcb(so); + if (!INP_TRY_WLOCK(inp)) { + /* + * The reference on the socket file descriptor in + * the AIO job should keep 'sb' and 'inp' stable. + * Our caller has a reference on the 'toep' that + * keeps it stable. + */ + SOCKBUF_UNLOCK(sb); + DDP_UNLOCK(toep); + INP_WLOCK(inp); + DDP_LOCK(toep); + SOCKBUF_LOCK(sb); + + /* + * If the socket has been closed, we should detect + * that and complete this request if needed on + * the next trip around the loop. + */ + } + t4_rcvd_locked(&toep->td->tod, intotcpcb(inp)); + INP_WUNLOCK(inp); + if (resid == 0 || toep->ddp_flags & DDP_DEAD) { + /* + * We filled the entire buffer with socket + * data, DDP is not being used, or the socket + * is being shut down, so complete the + * request. + */ + SOCKBUF_UNLOCK(sb); + recycle_pageset(toep, ps); + aio_complete(job, copied, 0); + toep->ddp_queueing = NULL; + goto restart; + } + + /* + * If DDP is not enabled, requeue this request and restart. + * This will either enable DDP or wait for more data to + * arrive on the socket buffer. + */ + if ((toep->ddp_flags & (DDP_ON | DDP_SC_REQ)) != DDP_ON) { + SOCKBUF_UNLOCK(sb); + recycle_pageset(toep, ps); + aio_ddp_requeue_one(toep, job); + toep->ddp_queueing = NULL; + goto restart; + } + + /* + * An indicate might have arrived and been added to + * the socket buffer while it was unlocked after the + * copy to lock the INP. If so, restart the copy. + */ + if (sbavail(sb) != 0) + goto sbcopy; + } + SOCKBUF_UNLOCK(sb); + + if (prep_pageset(sc, toep, ps) == 0) { + recycle_pageset(toep, ps); + aio_ddp_requeue_one(toep, job); + toep->ddp_queueing = NULL; + + /* + * XXX: Need to retry this later. Mostly need a trigger + * when page pods are freed up. + */ + printf("%s: prep_pageset failed\n", __func__); + return; + } + + /* Determine which DDP buffer to use. 
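The INP_TRY_WLOCK dance in the copy path above is an instance of a standard pattern: acquiring a lock that is ordered before locks already held. A generic pthreads analogue, with made-up lock names mirroring the inp -> ddp -> sockbuf order used here:

    #include <pthread.h>

    static pthread_mutex_t inp_lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_mutex_t ddp_lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_mutex_t sb_lock = PTHREAD_MUTEX_INITIALIZER;

    /* Called with ddp_lock and sb_lock held; returns with all three. */
    static void
    lock_inp_in_order(void)
    {
        if (pthread_mutex_trylock(&inp_lock) != 0) {
            /*
             * The lock order is inp -> ddp -> sb, so drop the two
             * inner locks, block on inp, and retake them in order.
             * Any state guarded by the dropped locks must be
             * revalidated afterward, just as the code above rechecks
             * the socket buffer once the locks are reacquired.
             */
            pthread_mutex_unlock(&sb_lock);
            pthread_mutex_unlock(&ddp_lock);
            pthread_mutex_lock(&inp_lock);
            pthread_mutex_lock(&ddp_lock);
            pthread_mutex_lock(&sb_lock);
        }
    }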
+	if (toep->db[0].job == NULL) {
+		db_idx = 0;
+	} else {
+		MPASS(toep->db[1].job == NULL);
+		db_idx = 1;
+	}
+
+	ddp_flags = 0;
+	ddp_flags_mask = 0;
+	if (db_idx == 0) {
+		ddp_flags |= V_TF_DDP_BUF0_VALID(1);
+		if (so->so_state & SS_NBIO)
+			ddp_flags |= V_TF_DDP_BUF0_FLUSH(1);
+		ddp_flags_mask |= V_TF_DDP_PSH_NO_INVALIDATE0(1) |
+		    V_TF_DDP_PUSH_DISABLE_0(1) | V_TF_DDP_PSHF_ENABLE_0(1) |
+		    V_TF_DDP_BUF0_FLUSH(1) | V_TF_DDP_BUF0_VALID(1);
+		buf_flag = DDP_BUF0_ACTIVE;
+	} else {
+		ddp_flags |= V_TF_DDP_BUF1_VALID(1);
+		if (so->so_state & SS_NBIO)
+			ddp_flags |= V_TF_DDP_BUF1_FLUSH(1);
+		ddp_flags_mask |= V_TF_DDP_PSH_NO_INVALIDATE1(1) |
+		    V_TF_DDP_PUSH_DISABLE_1(1) | V_TF_DDP_PSHF_ENABLE_1(1) |
+		    V_TF_DDP_BUF1_FLUSH(1) | V_TF_DDP_BUF1_VALID(1);
+		buf_flag = DDP_BUF1_ACTIVE;
+	}
+	MPASS((toep->ddp_flags & buf_flag) == 0);
+	if ((toep->ddp_flags & (DDP_BUF0_ACTIVE | DDP_BUF1_ACTIVE)) == 0) {
+		MPASS(db_idx == 0);
+		MPASS(toep->ddp_active_id == -1);
+		MPASS(toep->ddp_active_count == 0);
+		ddp_flags_mask |= V_TF_DDP_ACTIVE_BUF(1);
+	}
+
+	/*
+	 * The TID for this connection should still be valid.  If DDP_DEAD
+	 * is set, SBS_CANTRCVMORE should be set, so we shouldn't be
+	 * this far anyway.  Even if the socket is closing on the other
+	 * end, the AIO job holds a reference on this end of the socket
+	 * which will keep it open and keep the TCP PCB attached until
+	 * after the job is completed.
+	 */
+	wr = mk_update_tcb_for_ddp(sc, toep, db_idx, ps,
+	    job->uaiocb._aiocb_private.status, ddp_flags, ddp_flags_mask);
+	if (wr == NULL) {
+		recycle_pageset(toep, ps);
+		aio_ddp_requeue_one(toep, job);
+		toep->ddp_queueing = NULL;
+
+		/*
+		 * XXX: Need a way to kick a retry here.
+		 *
+		 * XXX: We know the fixed size needed and could
+		 * preallocate this using a blocking request at the
+		 * start of the task to avoid having to handle this
+		 * edge case.
+		 */
+		printf("%s: mk_update_tcb_for_ddp failed\n", __func__);
+		return;
+	}
+
+	if (!aio_set_cancel_function(job, t4_aio_cancel_active)) {
+		free_wrqe(wr);
+		recycle_pageset(toep, ps);
+		aio_ddp_cancel_one(job);
+		toep->ddp_queueing = NULL;
+		goto restart;
+	}
+
+#ifdef VERBOSE_TRACES
+	CTR5(KTR_CXGBE, "%s: scheduling %p for DDP[%d] (flags %#lx/%#lx)",
+	    __func__, job, db_idx, ddp_flags, ddp_flags_mask);
+#endif
+	/* Give the chip the go-ahead. */
+	t4_wrq_tx(sc, wr);
+	db = &toep->db[db_idx];
+	db->cancel_pending = 0;
+	db->job = job;
+	db->ps = ps;
+	toep->ddp_queueing = NULL;
+	toep->ddp_flags |= buf_flag;
+	toep->ddp_active_count++;
+	if (toep->ddp_active_count == 1) {
+		MPASS(toep->ddp_active_id == -1);
+		toep->ddp_active_id = db_idx;
+		CTR2(KTR_CXGBE, "%s: ddp_active_id = %d", __func__,
+		    toep->ddp_active_id);
+	}
+	goto restart;
+}
+
+void
+ddp_queue_toep(struct toepcb *toep)
+{
+
+	DDP_ASSERT_LOCKED(toep);
+	if (toep->ddp_flags & DDP_TASK_ACTIVE)
+		return;
+	toep->ddp_flags |= DDP_TASK_ACTIVE;
+	hold_toepcb(toep);
+	soaio_enqueue(&toep->ddp_requeue_task);
+}
+
+static void
+aio_ddp_requeue_task(void *context, int pending)
+{
+	struct toepcb *toep = context;
+
+	DDP_LOCK(toep);
+	aio_ddp_requeue(toep);
+	toep->ddp_flags &= ~DDP_TASK_ACTIVE;
+	DDP_UNLOCK(toep);
+
+	free_toepcb(toep);
+}
+
+static void
+t4_aio_cancel_active(struct kaiocb *job)
+{
+	struct socket *so = job->fd_file->f_data;
+	struct tcpcb *tp = so_sototcpcb(so);
+	struct toepcb *toep = tp->t_toe;
+	struct adapter *sc = td_adapter(toep->td);
+	uint64_t valid_flag;
+	int i;
+
+	DDP_LOCK(toep);
+	if (aio_cancel_cleared(job)) {
+		DDP_UNLOCK(toep);
+		aio_ddp_cancel_one(job);
+		return;
+	}
+
+	for (i = 0; i < nitems(toep->db); i++) {
+		if (toep->db[i].job == job) {
+			/* Should only ever get one cancel request for a job. */
+			MPASS(toep->db[i].cancel_pending == 0);
+
+			/*
+			 * Invalidate this buffer.  It will be
+			 * cancelled or partially completed once the
+			 * card ACKs the invalidate.
+			 */
+			valid_flag = i == 0 ? V_TF_DDP_BUF0_VALID(1) :
+			    V_TF_DDP_BUF1_VALID(1);
+			t4_set_tcb_field_rpl(sc, toep, 1, W_TCB_RX_DDP_FLAGS,
+			    valid_flag, 0, i + DDP_BUF0_INVALIDATED);
+			toep->db[i].cancel_pending = 1;
+			CTR2(KTR_CXGBE, "%s: request %p marked pending",
+			    __func__, job);
+			break;
+		}
+	}
+	DDP_UNLOCK(toep);
+}
+
+static void
+t4_aio_cancel_queued(struct kaiocb *job)
+{
+	struct socket *so = job->fd_file->f_data;
+	struct tcpcb *tp = so_sototcpcb(so);
+	struct toepcb *toep = tp->t_toe;
+
+	DDP_LOCK(toep);
+	if (!aio_cancel_cleared(job)) {
+		TAILQ_REMOVE(&toep->ddp_aiojobq, job, list);
+		toep->ddp_waiting_count--;
+		if (toep->ddp_waiting_count == 0)
+			ddp_queue_toep(toep);
+	}
+	CTR2(KTR_CXGBE, "%s: request %p cancelled", __func__, job);
+	DDP_UNLOCK(toep);
+
+	aio_ddp_cancel_one(job);
+}
+
+int
+t4_aio_queue_ddp(struct socket *so, struct kaiocb *job)
+{
+	struct tcpcb *tp = so_sototcpcb(so);
+	struct toepcb *toep = tp->t_toe;
+
+
+	/* Ignore writes. */
+	if (job->uaiocb.aio_lio_opcode != LIO_READ)
+		return (EOPNOTSUPP);
+
+	DDP_LOCK(toep);
+
+	/*
+	 * XXX: Think about possibly returning errors for ENOTCONN,
+	 * etc.  Perhaps the caller would only queue the request
+	 * if it failed with EOPNOTSUPP?
+	 */
+
+#ifdef VERBOSE_TRACES
+	CTR2(KTR_CXGBE, "%s: queueing %p", __func__, job);
+#endif
+	if (!aio_set_cancel_function(job, t4_aio_cancel_queued))
+		panic("new job was cancelled");
+	TAILQ_INSERT_TAIL(&toep->ddp_aiojobq, job, list);
+	job->uaiocb._aiocb_private.status = 0;
+	toep->ddp_waiting_count++;
+	toep->ddp_flags |= DDP_OK;
+
+	/*
+	 * Try to handle this request synchronously.  If this has
+	 * to block because the task is running, it will just bail
+	 * and let the task handle it instead.
+	 */
+	aio_ddp_requeue(toep);
+	DDP_UNLOCK(toep);
+	return (0);
+}
+
+int
+t4_ddp_mod_load(void)
+{
+
+	TAILQ_INIT(&ddp_orphan_pagesets);
+	mtx_init(&ddp_orphan_pagesets_lock, "ddp orphans", NULL, MTX_DEF);
+	TASK_INIT(&ddp_orphan_task, 0, ddp_free_orphan_pagesets, NULL);
+	return (0);
+}
+
+void
+t4_ddp_mod_unload(void)
+{
+
+	taskqueue_drain(taskqueue_thread, &ddp_orphan_task);
+	MPASS(TAILQ_EMPTY(&ddp_orphan_pagesets));
+	mtx_destroy(&ddp_orphan_pagesets_lock);
+}
 #endif
diff --git a/sys/dev/cxgbe/tom/t4_tom.c b/sys/dev/cxgbe/tom/t4_tom.c
index fe7ec7608a79..452c47e5de96 100644
--- a/sys/dev/cxgbe/tom/t4_tom.c
+++ b/sys/dev/cxgbe/tom/t4_tom.c
@@ -41,6 +41,7 @@ __FBSDID("$FreeBSD$");
 #include
 #include
 #include
+#include <sys/refcount.h>
 #include
 #include
 #include
@@ -152,6 +153,7 @@ alloc_toepcb(struct vi_info *vi, int txqid, int rxqid, int flags)
 	if (toep == NULL)
 		return (NULL);
 
+	refcount_init(&toep->refcount, 1);
 	toep->td = sc->tom_softc;
 	toep->vi = vi;
 	toep->tx_total = tx_credits;
@@ -165,19 +167,32 @@ alloc_toepcb(struct vi_info *vi, int txqid, int rxqid, int flags)
 	toep->txsd_avail = txsd_total;
 	toep->txsd_pidx = 0;
 	toep->txsd_cidx = 0;
+	ddp_init_toep(toep);
 
 	return (toep);
 }
 
+struct toepcb *
+hold_toepcb(struct toepcb *toep)
+{
+
+	refcount_acquire(&toep->refcount);
+	return (toep);
+}
+
 void
 free_toepcb(struct toepcb *toep)
 {
 
+	if (refcount_release(&toep->refcount) == 0)
+		return;
+
 	KASSERT(!(toep->flags & TPF_ATTACHED),
 	    ("%s: attached to an inpcb", __func__));
 	KASSERT(!(toep->flags & TPF_CPL_PENDING),
 	    ("%s: CPL pending", __func__));
 
+	ddp_uninit_toep(toep);
 	free(toep, M_CXGBE);
 }
 
@@ -259,6 +274,8 @@ undo_offload_socket(struct socket *so)
 	mtx_lock(&td->toep_list_lock);
 	TAILQ_REMOVE(&td->toep_list, toep, link);
 	mtx_unlock(&td->toep_list_lock);
+
+	free_toepcb(toep);
 }
 
 static void
@@ -283,9 +300,9 @@ release_offload_resources(struct toepcb *toep)
 	 */
 	MPASS(mbufq_len(&toep->ulp_pduq) == 0);
 	MPASS(mbufq_len(&toep->ulp_pdu_reclaimq) == 0);
-
-	if (toep->ulp_mode == ULP_MODE_TCPDDP)
-		release_ddp_resources(toep);
+#ifdef INVARIANTS
+	ddp_assert_empty(toep);
+#endif
 
 	if (toep->l2te)
 		t4_l2t_release(toep->l2te);
@@ -389,6 +406,8 @@ final_cpl_received(struct toepcb *toep)
 	CTR6(KTR_CXGBE, "%s: tid %d, toep %p (0x%x), inp %p (0x%x)",
 	    __func__, toep->tid, toep, toep->flags, inp, inp->inp_flags);
 
+	if (toep->ulp_mode == ULP_MODE_TCPDDP)
+		release_ddp_resources(toep);
 	toep->inp = NULL;
 	toep->flags &= ~TPF_CPL_PENDING;
 	mbufq_drain(&toep->ulp_pdu_reclaimq);
@@ -599,7 +618,6 @@ set_tcpddp_ulp_mode(struct toepcb *toep)
 
 	toep->ulp_mode = ULP_MODE_TCPDDP;
 	toep->ddp_flags = DDP_OK;
-	toep->ddp_score = DDP_LOW_SCORE;
 }
 
 int
@@ -1109,12 +1127,16 @@ t4_tom_mod_load(void)
 	int rc;
 	struct protosw *tcp_protosw, *tcp6_protosw;
 
+	rc = t4_ddp_mod_load();
+	if (rc != 0)
+		return (rc);
+
 	tcp_protosw = pffindproto(PF_INET, IPPROTO_TCP, SOCK_STREAM);
 	if (tcp_protosw == NULL)
 		return (ENOPROTOOPT);
 	bcopy(tcp_protosw, &ddp_protosw, sizeof(ddp_protosw));
 	bcopy(tcp_protosw->pr_usrreqs, &ddp_usrreqs, sizeof(ddp_usrreqs));
-	ddp_usrreqs.pru_soreceive = t4_soreceive_ddp;
+	ddp_usrreqs.pru_aio_queue = t4_aio_queue_ddp;
 	ddp_protosw.pr_usrreqs = &ddp_usrreqs;
 
 	tcp6_protosw = pffindproto(PF_INET6, IPPROTO_TCP, SOCK_STREAM);
@@ -1122,7 +1144,7 @@
 		return (ENOPROTOOPT);
 	bcopy(tcp6_protosw, &ddp6_protosw, sizeof(ddp6_protosw));
 	bcopy(tcp6_protosw->pr_usrreqs, &ddp6_usrreqs, sizeof(ddp6_usrreqs));
-	ddp6_usrreqs.pru_soreceive = t4_soreceive_ddp;
+	ddp6_usrreqs.pru_aio_queue = t4_aio_queue_ddp;
 	ddp6_protosw.pr_usrreqs = &ddp6_usrreqs;
 
 	TIMEOUT_TASK_INIT(taskqueue_thread, &clip_task, 0, t4_clip_task, NULL);
@@ -1162,6 +1184,8 @@ t4_tom_mod_unload(void)
 		taskqueue_cancel_timeout(taskqueue_thread, &clip_task, NULL);
 	}
 
+	t4_ddp_mod_unload();
+
 	return (0);
 }
 #endif	/* TCP_OFFLOAD */
diff --git a/sys/dev/cxgbe/tom/t4_tom.h b/sys/dev/cxgbe/tom/t4_tom.h
index f61888deb3d8..09238a49b346 100644
--- a/sys/dev/cxgbe/tom/t4_tom.h
+++ b/sys/dev/cxgbe/tom/t4_tom.h
@@ -74,6 +74,8 @@ enum {
 	DDP_ON		= (1 << 2),	/* DDP is turned on */
 	DDP_BUF0_ACTIVE	= (1 << 3),	/* buffer 0 in use (not invalidated) */
 	DDP_BUF1_ACTIVE	= (1 << 4),	/* buffer 1 in use (not invalidated) */
+	DDP_TASK_ACTIVE = (1 << 5),	/* requeue task is queued / running */
+	DDP_DEAD	= (1 << 6),	/* toepcb is shutting down */
 };
 
 struct ofld_tx_sdesc {
@@ -81,19 +83,36 @@ struct ofld_tx_sdesc {
 	uint8_t tx_credits;	/* firmware tx credits (unit is 16B) */
 };
 
-struct ddp_buffer {
-	uint32_t tag;	/* includes color, page pod addr, and DDP page size */
+struct pageset {
+	TAILQ_ENTRY(pageset) link;
+	vm_page_t *pages;
+	int npages;
+	int flags;
 	u_int ppod_addr;
 	int nppods;
-	int offset;
+	uint32_t tag;	/* includes color, page pod addr, and DDP page size */
+	int offset;	/* offset in first page */
 	int len;
-	int npages;
-	vm_page_t *pages;
+	struct vmspace *vm;
+	u_int vm_timestamp;
+};
+
+TAILQ_HEAD(pagesetq, pageset);
+
+#define	PS_WIRED	0x0001	/* Pages wired rather than held. */
+#define	PS_PPODS_WRITTEN 0x0002	/* Page pods written to the card. */
+
+struct ddp_buffer {
+	struct pageset *ps;
+
+	struct kaiocb *job;
+	int cancel_pending;
 };
 
 struct toepcb {
 	TAILQ_ENTRY(toepcb) link;	/* toep_list */
 	u_int flags;		/* miscellaneous flags */
+	int refcount;
 	struct tom_data *td;
 	struct inpcb *inp;	/* backpointer to host stack's PCB */
 	struct vi_info *vi;	/* virtual interface */
@@ -121,9 +140,16 @@ struct toepcb {
 	struct mbufq ulp_pdu_reclaimq;
 
 	u_int ddp_flags;
-	struct ddp_buffer *db[2];
-	time_t ddp_disabled;
-	uint8_t ddp_score;
+	struct ddp_buffer db[2];
+	TAILQ_HEAD(, pageset) ddp_cached_pagesets;
+	TAILQ_HEAD(, kaiocb) ddp_aiojobq;
+	u_int ddp_waiting_count;
+	u_int ddp_active_count;
+	u_int ddp_cached_count;
+	int ddp_active_id;	/* the currently active DDP buffer */
+	struct task ddp_requeue_task;
+	struct kaiocb *ddp_queueing;
+	struct mtx ddp_lock;
 
 	/* Tx software descriptor */
 	uint8_t txsd_total;
@@ -133,6 +159,10 @@ struct toepcb {
 	struct ofld_tx_sdesc txsd[];
 };
 
+#define	DDP_LOCK(toep)		mtx_lock(&(toep)->ddp_lock)
+#define	DDP_UNLOCK(toep)	mtx_unlock(&(toep)->ddp_lock)
+#define	DDP_ASSERT_LOCKED(toep)	mtx_assert(&(toep)->ddp_lock, MA_OWNED)
+
 struct flowc_tx_params {
 	uint32_t snd_nxt;
 	uint32_t rcv_nxt;
@@ -242,6 +272,7 @@ mbuf_ulp_submode(struct mbuf *m)
 
 /* t4_tom.c */
 struct toepcb *alloc_toepcb(struct vi_info *, int, int, int);
+struct toepcb *hold_toepcb(struct toepcb *);
 void free_toepcb(struct toepcb *);
 void offload_socket(struct socket *, struct toepcb *);
 void undo_offload_socket(struct socket *);
@@ -289,11 +320,14 @@
 void send_flowc_wr(struct toepcb *, struct flowc_tx_params *);
 void send_reset(struct adapter *, struct toepcb *, uint32_t);
 void make_established(struct toepcb *, uint32_t, uint32_t, uint16_t);
 void t4_rcvd(struct toedev *, struct tcpcb *);
+void t4_rcvd_locked(struct toedev *, struct tcpcb *);
 int t4_tod_output(struct toedev *, struct tcpcb *);
 int t4_send_fin(struct toedev *, struct tcpcb *);
 int t4_send_rst(struct toedev *, struct tcpcb *);
 void t4_set_tcb_field(struct adapter *, struct toepcb *, int, uint16_t,
     uint64_t, uint64_t);
+void t4_set_tcb_field_rpl(struct adapter *, struct toepcb *, int, uint16_t,
+    uint64_t, uint64_t, uint8_t);
 void t4_push_frames(struct adapter *sc, struct toepcb *toep, int drop);
 void t4_push_pdus(struct adapter *sc, struct toepcb *toep, int drop);
@@ -302,10 +336,17 @@
 void t4_init_ddp(struct adapter *, struct tom_data *);
 void t4_uninit_ddp(struct adapter *, struct tom_data *);
 int t4_soreceive_ddp(struct socket *, struct sockaddr **, struct uio *,
     struct mbuf **, struct mbuf **, int *);
-void enable_ddp(struct adapter *, struct toepcb *toep);
+int t4_aio_queue_ddp(struct socket *, struct kaiocb *);
+int t4_ddp_mod_load(void);
+void t4_ddp_mod_unload(void);
+void ddp_assert_empty(struct toepcb *);
+void ddp_init_toep(struct toepcb *);
+void ddp_uninit_toep(struct toepcb *);
+void ddp_queue_toep(struct toepcb *);
 void release_ddp_resources(struct toepcb *toep);
-void handle_ddp_close(struct toepcb *, struct tcpcb *, struct sockbuf *,
-    uint32_t);
+void handle_ddp_close(struct toepcb *, struct tcpcb *, uint32_t);
+void handle_ddp_indicate(struct toepcb *);
+void handle_ddp_tcb_rpl(struct toepcb *, const struct cpl_set_tcb_rpl *);
 void insert_ddp_data(struct toepcb *, uint32_t);
 #endif
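
The user-visible effect of rerouting pru_aio_queue to t4_aio_queue_ddp() is that an ordinary
POSIX aio_read(2) on a TOE-offloaded connection is queued on ddp_aiojobq and, when DDP engages,
completes with the adapter having written the payload directly into the caller's buffer.  A
minimal userland sketch of that usage follows; it is illustrative only and not part of the
change.  The aio_recv() helper name is hypothetical, the connected socket 's' is assumed to be
handled by the cxgbe TOE with the tom module loaded, and only standard POSIX AIO calls are used,
so the same code still works (without the zero-copy benefit) on any other socket.

#include <sys/types.h>
#include <aio.h>
#include <errno.h>
#include <string.h>

/*
 * Illustrative sketch: receive up to 'len' bytes with POSIX AIO.
 * On a cxgbe TOE connection the request is queued by
 * t4_aio_queue_ddp(); if DDP can be used, the NIC places the payload
 * directly into 'buf', otherwise pending socket-buffer data is copied
 * out as in the sbcopy path above.
 */
static ssize_t
aio_recv(int s, void *buf, size_t len)
{
	struct aiocb cb;
	const struct aiocb *list[1];

	memset(&cb, 0, sizeof(cb));
	cb.aio_fildes = s;
	cb.aio_buf = buf;
	cb.aio_nbytes = len;
	if (aio_read(&cb) == -1)
		return (-1);

	/* Block until done; a real consumer would overlap other work here. */
	list[0] = &cb;
	while (aio_error(&cb) == EINPROGRESS)
		aio_suspend(list, 1, NULL);
	return (aio_return(&cb));
}

A short return value from aio_return() here corresponds to the short-read completions that
aio_ddp_requeue() generates when a socket error or FIN arrives after some data has already
been placed; the deferred error is then reported by a subsequent request.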