Use unmapped (M_NOMAP) mbufs for zero-copy AIO writes via TOE.

Previously, the TOE code used its own custom unmapped mbufs tagged
with EXT_FLAG_VENDOR1.  The old version always wired the entire AIO
request buffer for the duration of the AIO operation and constructed
multiple mbufs that used the wired buffer as an external buffer.

The new version determines how much room is available in the socket
buffer and wires only the pages needed for that room, building chains
of M_NOMAP mbufs.  As a result, a large AIO write now limits the
amount of wired memory it uses to the size of the socket buffer.
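
A minimal sketch of the new flow, pieced together from the patch
below (sb is the socket's send buffer, sc the adapter softc, and job
the AIO request; locking and error handling omitted):

	len = sbspace(sb);		/* room in the socket buffer */
	if (len > job->uaiocb.aio_nbytes - job->aio_sent)
		len = job->uaiocb.aio_nbytes - job->aio_sent;
	if (len > sc->tt.sndbuf)
		len = sc->tt.sndbuf;	/* one sndbuf at a time */

	/*
	 * Wire only the pages backing these 'len' bytes and build a
	 * chain of M_NOMAP mbufs that reference them.
	 */
	m = alloc_aiotx_mbuf(job, len);
	job->aio_sent += m_length(m, NULL);
	sbappendstream(sb, m, 0);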

Reviewed by:	gallatin, np
Sponsored by:	Chelsio Communications
Differential Revision:	https://reviews.freebsd.org/D20839
John Baldwin  2019-07-03 16:06:11 +00:00
parent 8996977a89
commit 7b17c92129
Notes: svn2git 2020-12-20 02:59:44 +00:00
       svn path=/head/; revision=349649
3 changed files with 140 additions and 144 deletions


@@ -76,28 +76,6 @@ __FBSDID("$FreeBSD$");
static void t4_aiotx_cancel(struct kaiocb *job);
static void t4_aiotx_queue_toep(struct socket *so, struct toepcb *toep);
static size_t
aiotx_mbuf_pgoff(struct mbuf *m)
{
struct aiotx_buffer *ab;
MPASS(IS_AIOTX_MBUF(m));
ab = m->m_ext.ext_arg1;
return ((ab->ps.offset + (uintptr_t)m->m_ext.ext_arg2) % PAGE_SIZE);
}
static vm_page_t *
aiotx_mbuf_pages(struct mbuf *m)
{
struct aiotx_buffer *ab;
int npages;
MPASS(IS_AIOTX_MBUF(m));
ab = m->m_ext.ext_arg1;
npages = (ab->ps.offset + (uintptr_t)m->m_ext.ext_arg2) / PAGE_SIZE;
return (ab->ps.pages + npages);
}
void
send_flowc_wr(struct toepcb *toep, struct flowc_tx_params *ftxp)
{
@@ -647,10 +625,7 @@ write_tx_sgl(void *dst, struct mbuf *start, struct mbuf *stop, int nsegs, int n)
i = -1;
for (m = start; m != stop; m = m->m_next) {
if (IS_AIOTX_MBUF(m))
rc = sglist_append_vmpages(&sg, aiotx_mbuf_pages(m),
aiotx_mbuf_pgoff(m), m->m_len);
else if (m->m_flags & M_NOMAP)
if (m->m_flags & M_NOMAP)
rc = sglist_append_mb_ext_pgs(&sg, m);
else
rc = sglist_append(&sg, mtod(m, void *), m->m_len);
@@ -713,7 +688,7 @@ t4_push_frames(struct adapter *sc, struct toepcb *toep, int drop)
struct sockbuf *sb = &so->so_snd;
int tx_credits, shove, compl, sowwakeup;
struct ofld_tx_sdesc *txsd;
bool aiotx_mbuf_seen;
bool nomap_mbuf_seen;
INP_WLOCK_ASSERT(inp);
KASSERT(toep->flags & TPF_FLOWC_WR_SENT,
@@ -766,14 +741,11 @@ t4_push_frames(struct adapter *sc, struct toepcb *toep, int drop)
plen = 0;
nsegs = 0;
max_nsegs_1mbuf = 0; /* max # of SGL segments in any one mbuf */
aiotx_mbuf_seen = false;
nomap_mbuf_seen = false;
for (m = sndptr; m != NULL; m = m->m_next) {
int n;
if (IS_AIOTX_MBUF(m))
n = sglist_count_vmpages(aiotx_mbuf_pages(m),
aiotx_mbuf_pgoff(m), m->m_len);
else if (m->m_flags & M_NOMAP)
if (m->m_flags & M_NOMAP)
n = sglist_count_mb_ext_pgs(m);
else
n = sglist_count(mtod(m, void *), m->m_len);
@@ -802,8 +774,8 @@ t4_push_frames(struct adapter *sc, struct toepcb *toep, int drop)
break;
}
if (IS_AIOTX_MBUF(m))
aiotx_mbuf_seen = true;
if (m->m_flags & M_NOMAP)
nomap_mbuf_seen = true;
if (max_nsegs_1mbuf < n)
max_nsegs_1mbuf = n;
sb_sndptr = m; /* new sb->sb_sndptr if all goes well */
@ -852,7 +824,7 @@ t4_push_frames(struct adapter *sc, struct toepcb *toep, int drop)
panic("%s: excess tx.", __func__);
shove = m == NULL && !(tp->t_flags & TF_MORETOCOME);
if (plen <= max_imm && !aiotx_mbuf_seen) {
if (plen <= max_imm && !nomap_mbuf_seen) {
/* Immediate data tx */
@@ -1910,71 +1882,94 @@ t4_uninit_cpl_io_handlers(void)
}
/*
* Use the 'backend3' field in AIO jobs to store the amount of data
* sent by the AIO job so far and the 'backend4' field to hold an
* error that should be reported when the job is completed.
* Use the 'backend1' field in AIO jobs to hold an error that should
* be reported when the job is completed, the 'backend3' field to
* store the amount of data sent by the AIO job so far, and the
* 'backend4' field to hold a reference count on the job.
*
* Each unmapped mbuf holds a reference on the job as does the queue
* so long as the job is queued.
*/
#define aio_error backend1
#define aio_sent backend3
#define aio_error backend4
#define aio_refs backend4
#define jobtotid(job) \
(((struct toepcb *)(so_sototcpcb((job)->fd_file->f_data)->t_toe))->tid)
static void
free_aiotx_buffer(struct aiotx_buffer *ab)
aiotx_free_job(struct kaiocb *job)
{
struct kaiocb *job;
long status;
int error;
if (refcount_release(&ab->refcount) == 0)
if (refcount_release(&job->aio_refs) == 0)
return;
job = ab->job;
error = job->aio_error;
error = (intptr_t)job->aio_error;
status = job->aio_sent;
vm_page_unhold_pages(ab->ps.pages, ab->ps.npages);
free(ab, M_CXGBE);
#ifdef VERBOSE_TRACES
CTR5(KTR_CXGBE, "%s: tid %d completed %p len %ld, error %d", __func__,
jobtotid(job), job, status, error);
#endif
if (error == ECANCELED && status != 0)
if (error != 0 && status != 0)
error = 0;
if (error == ECANCELED)
aio_cancel(job);
else if (error)
aio_complete(job, -1, error);
else
else {
job->msgsnd = 1;
aio_complete(job, status, 0);
}
}
static void
t4_aiotx_mbuf_free(struct mbuf *m)
aiotx_free_pgs(struct mbuf *m)
{
struct aiotx_buffer *ab = m->m_ext.ext_arg1;
struct mbuf_ext_pgs *ext_pgs;
struct kaiocb *job;
struct mtx *mtx;
vm_page_t pg;
MBUF_EXT_PGS_ASSERT(m);
ext_pgs = m->m_ext.ext_pgs;
job = m->m_ext.ext_arg1;
#ifdef VERBOSE_TRACES
CTR3(KTR_CXGBE, "%s: completed %d bytes for tid %d", __func__,
m->m_len, jobtotid(ab->job));
m->m_len, jobtotid(job));
#endif
free_aiotx_buffer(ab);
mtx = NULL;
for (int i = 0; i < ext_pgs->npgs; i++) {
pg = PHYS_TO_VM_PAGE(ext_pgs->pa[i]);
vm_page_change_lock(pg, &mtx);
vm_page_unhold(pg);
}
if (mtx != NULL)
mtx_unlock(mtx);
aiotx_free_job(job);
}
/*
* Hold the buffer backing an AIO request and return an AIO transmit
* buffer.
* Allocate a chain of unmapped mbufs describing the next 'len' bytes
* of an AIO job.
*/
static int
hold_aio(struct kaiocb *job)
static struct mbuf *
alloc_aiotx_mbuf(struct kaiocb *job, int len)
{
struct aiotx_buffer *ab;
struct vmspace *vm;
vm_page_t pgs[MBUF_PEXT_MAX_PGS];
struct mbuf *m, *top, *last;
struct mbuf_ext_pgs *ext_pgs;
vm_map_t map;
vm_offset_t start, end, pgoff;
int n;
vm_offset_t start;
int i, mlen, npages, pgoff;
MPASS(job->backend1 == NULL);
KASSERT(job->aio_sent + len <= job->uaiocb.aio_nbytes,
("%s(%p, %d): request to send beyond end of buffer", __func__,
job, len));
/*
* The AIO subsystem will cancel and drain all requests before
@@ -1983,35 +1978,65 @@ hold_aio(struct kaiocb *job)
*/
vm = job->userproc->p_vmspace;
map = &vm->vm_map;
start = (uintptr_t)job->uaiocb.aio_buf;
start = (uintptr_t)job->uaiocb.aio_buf + job->aio_sent;
pgoff = start & PAGE_MASK;
end = round_page(start + job->uaiocb.aio_nbytes);
start = trunc_page(start);
n = atop(end - start);
ab = malloc(sizeof(*ab) + n * sizeof(vm_page_t), M_CXGBE, M_WAITOK |
M_ZERO);
refcount_init(&ab->refcount, 1);
ab->ps.pages = (vm_page_t *)(ab + 1);
ab->ps.npages = vm_fault_quick_hold_pages(map, start, end - start,
VM_PROT_WRITE, ab->ps.pages, n);
if (ab->ps.npages < 0) {
free(ab, M_CXGBE);
return (EFAULT);
top = NULL;
last = NULL;
while (len > 0) {
mlen = imin(len, MBUF_PEXT_MAX_PGS * PAGE_SIZE - pgoff);
KASSERT(mlen == len || ((start + mlen) & PAGE_MASK) == 0,
("%s: next start (%#jx + %#x) is not page aligned",
__func__, (uintmax_t)start, mlen));
npages = vm_fault_quick_hold_pages(map, start, mlen,
VM_PROT_WRITE, pgs, nitems(pgs));
if (npages < 0)
break;
m = mb_alloc_ext_pgs(M_WAITOK, false, aiotx_free_pgs);
if (m == NULL) {
vm_page_unhold_pages(pgs, npages);
break;
}
ext_pgs = m->m_ext.ext_pgs;
ext_pgs->first_pg_off = pgoff;
ext_pgs->npgs = npages;
if (npages == 1) {
KASSERT(mlen + pgoff <= PAGE_SIZE,
("%s: single page is too large (off %d len %d)",
__func__, pgoff, mlen));
ext_pgs->last_pg_len = mlen;
} else {
ext_pgs->last_pg_len = mlen - (PAGE_SIZE - pgoff) -
(npages - 2) * PAGE_SIZE;
}
for (i = 0; i < npages; i++)
ext_pgs->pa[i] = VM_PAGE_TO_PHYS(pgs[i]);
m->m_len = mlen;
m->m_ext.ext_size = npages * PAGE_SIZE;
m->m_ext.ext_arg1 = job;
refcount_acquire(&job->aio_refs);
#ifdef VERBOSE_TRACES
CTR5(KTR_CXGBE, "%s: tid %d, new mbuf %p for job %p, npages %d",
__func__, jobtotid(job), m, job, npages);
#endif
if (top == NULL)
top = m;
else
last->m_next = m;
last = m;
len -= mlen;
start += mlen;
pgoff = 0;
}
KASSERT(ab->ps.npages == n,
("hold_aio: page count mismatch: %d vs %d", ab->ps.npages, n));
ab->ps.offset = pgoff;
ab->ps.len = job->uaiocb.aio_nbytes;
ab->job = job;
job->backend1 = ab;
#ifdef VERBOSE_TRACES
CTR5(KTR_CXGBE, "%s: tid %d, new pageset %p for job %p, npages %d",
__func__, jobtotid(job), &ab->ps, job, ab->ps.npages);
#endif
return (0);
return (top);
}
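
To make the last_pg_len computation above concrete, a worked example
(assuming PAGE_SIZE is 4096):

	/*
	 * pgoff = 512, mlen = 9000  =>  npages = 3
	 * last_pg_len = 9000 - (4096 - 512) - (3 - 2) * 4096 = 1320
	 * i.e. 3584 bytes in the first page, 4096 in the middle page,
	 * and 1320 in the last page.
	 */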
static void
@@ -2020,18 +2045,16 @@ t4_aiotx_process_job(struct toepcb *toep, struct socket *so, struct kaiocb *job)
struct adapter *sc;
struct sockbuf *sb;
struct file *fp;
struct aiotx_buffer *ab;
struct inpcb *inp;
struct tcpcb *tp;
struct mbuf *m;
int error;
int error, len;
bool moretocome, sendmore;
sc = td_adapter(toep->td);
sb = &so->so_snd;
SOCKBUF_UNLOCK(sb);
fp = job->fd_file;
ab = job->backend1;
m = NULL;
#ifdef MAC
@@ -2040,23 +2063,12 @@ t4_aiotx_process_job(struct toepcb *toep, struct socket *so, struct kaiocb *job)
goto out;
#endif
if (ab == NULL) {
error = hold_aio(job);
if (error != 0)
goto out;
ab = job->backend1;
}
/* Inline sosend_generic(). */
job->msgsnd = 1;
error = sblock(sb, SBL_WAIT);
MPASS(error == 0);
sendanother:
m = m_get(M_WAITOK, MT_DATA);
SOCKBUF_LOCK(sb);
if (so->so_snd.sb_state & SBS_CANTSENDMORE) {
SOCKBUF_UNLOCK(sb);
@@ -2105,14 +2117,14 @@ t4_aiotx_process_job(struct toepcb *toep, struct socket *so, struct kaiocb *job)
* Write as much data as the socket permits, but no more than a
* single sndbuf at a time.
*/
m->m_len = sbspace(sb);
if (m->m_len > ab->ps.len - job->aio_sent) {
m->m_len = ab->ps.len - job->aio_sent;
len = sbspace(sb);
if (len > job->uaiocb.aio_nbytes - job->aio_sent) {
len = job->uaiocb.aio_nbytes - job->aio_sent;
moretocome = false;
} else
moretocome = true;
if (m->m_len > sc->tt.sndbuf) {
m->m_len = sc->tt.sndbuf;
if (len > sc->tt.sndbuf) {
len = sc->tt.sndbuf;
sendmore = true;
} else
sendmore = false;
@@ -2120,7 +2132,14 @@ t4_aiotx_process_job(struct toepcb *toep, struct socket *so, struct kaiocb *job)
if (!TAILQ_EMPTY(&toep->aiotx_jobq))
moretocome = true;
SOCKBUF_UNLOCK(sb);
MPASS(m->m_len != 0);
MPASS(len != 0);
m = alloc_aiotx_mbuf(job, len);
if (m == NULL) {
sbunlock(sb);
error = EFAULT;
goto out;
}
/* Inlined tcp_usr_send(). */
@@ -2133,12 +2152,8 @@ t4_aiotx_process_job(struct toepcb *toep, struct socket *so, struct kaiocb *job)
goto out;
}
refcount_acquire(&ab->refcount);
m_extadd(m, NULL, ab->ps.len, t4_aiotx_mbuf_free, ab,
(void *)(uintptr_t)job->aio_sent, 0, EXT_NET_DRV);
m->m_ext.ext_flags |= EXT_FLAG_AIOTX;
job->aio_sent += m->m_len;
job->aio_sent += m_length(m, NULL);
sbappendstream(sb, m, 0);
m = NULL;
@@ -2160,8 +2175,8 @@ t4_aiotx_process_job(struct toepcb *toep, struct socket *so, struct kaiocb *job)
goto out;
/*
* If this is a non-blocking socket and the request has not
* been fully completed, requeue it until the socket is ready
* If this is a blocking socket and the request has not been
* fully completed, requeue it until the socket is ready
* again.
*/
if (job->aio_sent < job->uaiocb.aio_nbytes &&
@@ -2177,22 +2192,18 @@ t4_aiotx_process_job(struct toepcb *toep, struct socket *so, struct kaiocb *job)
}
/*
* If the request will not be requeued, drop a reference on
* the aiotx buffer. Any mbufs in flight should still
* contain a reference, but this drops the reference that the
* job owns while it is waiting to queue mbufs to the socket.
* If the request will not be requeued, drop the queue's
* reference to the job. Any mbufs in flight should still
* hold a reference, but this drops the reference that the
* queue owns while it is waiting to queue mbufs to the
* socket.
*/
free_aiotx_buffer(ab);
aiotx_free_job(job);
out:
if (error) {
if (ab != NULL) {
job->aio_error = error;
free_aiotx_buffer(ab);
} else {
MPASS(job->aio_sent == 0);
aio_complete(job, -1, error);
}
job->aio_error = (void *)(intptr_t)error;
aiotx_free_job(job);
}
if (m != NULL)
m_free(m);
@@ -2246,7 +2257,6 @@ t4_aiotx_queue_toep(struct socket *so, struct toepcb *toep)
static void
t4_aiotx_cancel(struct kaiocb *job)
{
struct aiotx_buffer *ab;
struct socket *so;
struct sockbuf *sb;
struct tcpcb *tp;
@@ -2263,11 +2273,8 @@ t4_aiotx_cancel(struct kaiocb *job)
TAILQ_REMOVE(&toep->aiotx_jobq, job, list);
SOCKBUF_UNLOCK(sb);
ab = job->backend1;
if (ab != NULL)
free_aiotx_buffer(ab);
else
aio_cancel(job);
job->aio_error = (void *)(intptr_t)ECANCELED;
aiotx_free_job(job);
}
int
@@ -2293,6 +2300,7 @@ t4_aio_queue_aiotx(struct socket *so, struct kaiocb *job)
#endif
if (!aio_set_cancel_function(job, t4_aiotx_cancel))
panic("new job was cancelled");
refcount_init(&job->aio_refs, 1);
TAILQ_INSERT_TAIL(&toep->aiotx_jobq, job, list);
if (sowriteable(so))
t4_aiotx_queue_toep(so, toep);
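
Putting the pieces together, the reference counting on a job now
works as follows (a recap of the patch above, not additional code):

	/*
	 * t4_aio_queue_aiotx():  refcount_init(&job->aio_refs, 1)
	 *                        establishes the queue's reference.
	 * alloc_aiotx_mbuf():    refcount_acquire() once per mbuf built.
	 * aiotx_free_pgs():      an mbuf is freed -> aiotx_free_job()
	 *                        drops that mbuf's reference.
	 * t4_aiotx_process_job() and t4_aiotx_cancel():
	 *                        drop the queue's reference when the job
	 *                        will not be requeued.
	 *
	 * aiotx_free_job() completes or cancels the job only when the
	 * last reference is released.
	 */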


@@ -1193,7 +1193,6 @@ t4_push_tls_records(struct adapter *sc, struct toepcb *toep, int drop)
/* Read the header of the next TLS record. */
sndptr = sbsndmbuf(sb, tls_ofld->sb_off, &sndptroff);
MPASS(!IS_AIOTX_MBUF(sndptr));
m_copydata(sndptr, sndptroff, sizeof(thdr), (caddr_t)&thdr);
tls_size = htons(thdr.length);
plen = TLS_HEADER_LENGTH + tls_size;


@@ -127,11 +127,6 @@ TAILQ_HEAD(pagesetq, pageset);
#define PS_WIRED 0x0001 /* Pages wired rather than held. */
#define PS_PPODS_WRITTEN 0x0002 /* Page pods written to the card. */
#define EXT_FLAG_AIOTX EXT_FLAG_VENDOR1
#define IS_AIOTX_MBUF(m) \
((m)->m_flags & M_EXT && (m)->m_ext.ext_flags & EXT_FLAG_AIOTX)
struct ddp_buffer {
struct pageset *ps;
@@ -153,12 +148,6 @@ struct ddp_pcb {
struct mtx lock;
};
struct aiotx_buffer {
struct pageset ps;
struct kaiocb *job;
int refcount;
};
struct toepcb {
TAILQ_ENTRY(toepcb) link; /* toep_list */
u_int flags; /* miscellaneous flags */