Coalesce socket reads in software iSCSI.

Instead of 2-4 socket reads per PDU this can do as low as one read
per megabyte, dramatically reducing TCP overhead and lock contention.

With this on iSCSI target I can write more than 4GB/s through a
single connection.

MFC after:	1 month
This commit is contained in:
Alexander Motin 2021-02-22 12:23:35 -05:00
parent c1b554c868
commit 6895f89fe5

View File

@ -165,68 +165,6 @@ icl_conn_fail(struct icl_conn *ic)
(ic->ic_error)(ic); (ic->ic_error)(ic);
} }
static struct mbuf *
icl_conn_receive(struct icl_conn *ic, size_t len)
{
struct uio uio;
struct socket *so;
struct mbuf *m;
int error, flags;
so = ic->ic_socket;
memset(&uio, 0, sizeof(uio));
uio.uio_resid = len;
flags = MSG_DONTWAIT;
error = soreceive(so, NULL, &uio, &m, NULL, &flags);
if (error != 0) {
ICL_DEBUG("soreceive error %d", error);
return (NULL);
}
if (uio.uio_resid != 0) {
m_freem(m);
ICL_DEBUG("short read");
return (NULL);
}
return (m);
}
static int
icl_conn_receive_buf(struct icl_conn *ic, void *buf, size_t len)
{
struct iovec iov[1];
struct uio uio;
struct socket *so;
int error, flags;
so = ic->ic_socket;
memset(&uio, 0, sizeof(uio));
iov[0].iov_base = buf;
iov[0].iov_len = len;
uio.uio_iov = iov;
uio.uio_iovcnt = 1;
uio.uio_offset = 0;
uio.uio_resid = len;
uio.uio_segflg = UIO_SYSSPACE;
uio.uio_rw = UIO_READ;
flags = MSG_DONTWAIT;
error = soreceive(so, NULL, &uio, NULL, NULL, &flags);
if (error != 0) {
ICL_DEBUG("soreceive error %d", error);
return (-1);
}
if (uio.uio_resid != 0) {
ICL_DEBUG("short read");
return (-1);
}
return (0);
}
static void static void
icl_soft_conn_pdu_free(struct icl_conn *ic, struct icl_pdu *ip) icl_soft_conn_pdu_free(struct icl_conn *ic, struct icl_pdu *ip)
{ {
@ -384,37 +322,28 @@ icl_pdu_size(const struct icl_pdu *response)
return (len); return (len);
} }
static int static void
icl_pdu_receive_bhs(struct icl_pdu *request, size_t *availablep) icl_soft_receive_buf(struct mbuf **r, size_t *rs, void *buf, size_t s)
{ {
if (icl_conn_receive_buf(request->ip_conn, m_copydata(*r, 0, s, buf);
request->ip_bhs, sizeof(struct iscsi_bhs))) { m_adj(*r, s);
ICL_DEBUG("failed to receive BHS"); while ((*r) != NULL && (*r)->m_len == 0)
return (-1); *r = m_free(*r);
} *rs -= s;
*availablep -= sizeof(struct iscsi_bhs);
return (0);
} }
static int static void
icl_pdu_receive_ahs(struct icl_pdu *request, size_t *availablep) icl_pdu_receive_ahs(struct icl_pdu *request, struct mbuf **r, size_t *rs)
{ {
request->ip_ahs_len = icl_pdu_ahs_length(request); request->ip_ahs_len = icl_pdu_ahs_length(request);
if (request->ip_ahs_len == 0) if (request->ip_ahs_len == 0)
return (0); return;
request->ip_ahs_mbuf = icl_conn_receive(request->ip_conn, request->ip_ahs_mbuf = *r;
request->ip_ahs_len); *r = m_split(request->ip_ahs_mbuf, request->ip_ahs_len, M_WAITOK);
if (request->ip_ahs_mbuf == NULL) { *rs -= request->ip_ahs_len;
ICL_DEBUG("failed to receive AHS");
return (-1);
}
*availablep -= request->ip_ahs_len;
return (0);
} }
static uint32_t static uint32_t
@ -433,7 +362,7 @@ icl_mbuf_to_crc32c(const struct mbuf *m0)
} }
static int static int
icl_pdu_check_header_digest(struct icl_pdu *request, size_t *availablep) icl_pdu_check_header_digest(struct icl_pdu *request, struct mbuf **r, size_t *rs)
{ {
uint32_t received_digest, valid_digest; uint32_t received_digest, valid_digest;
@ -441,12 +370,7 @@ icl_pdu_check_header_digest(struct icl_pdu *request, size_t *availablep)
return (0); return (0);
CTASSERT(sizeof(received_digest) == ISCSI_HEADER_DIGEST_SIZE); CTASSERT(sizeof(received_digest) == ISCSI_HEADER_DIGEST_SIZE);
if (icl_conn_receive_buf(request->ip_conn, icl_soft_receive_buf(r, rs, &received_digest, ISCSI_HEADER_DIGEST_SIZE);
&received_digest, ISCSI_HEADER_DIGEST_SIZE)) {
ICL_DEBUG("failed to receive header digest");
return (-1);
}
*availablep -= ISCSI_HEADER_DIGEST_SIZE;
/* Temporary attach AHS to BHS to calculate header digest. */ /* Temporary attach AHS to BHS to calculate header digest. */
request->ip_bhs_mbuf->m_next = request->ip_ahs_mbuf; request->ip_bhs_mbuf->m_next = request->ip_ahs_mbuf;
@ -514,8 +438,8 @@ icl_pdu_data_segment_receive_len(const struct icl_pdu *request)
} }
static int static int
icl_pdu_receive_data_segment(struct icl_pdu *request, icl_pdu_receive_data_segment(struct icl_pdu *request, struct mbuf **r,
size_t *availablep, bool *more_neededp) size_t *rs, bool *more_neededp)
{ {
struct icl_conn *ic; struct icl_conn *ic;
size_t len, padding = 0; size_t len, padding = 0;
@ -539,7 +463,7 @@ icl_pdu_receive_data_segment(struct icl_pdu *request,
KASSERT(len > request->ip_data_len, ("len <= request->ip_data_len")); KASSERT(len > request->ip_data_len, ("len <= request->ip_data_len"));
len -= request->ip_data_len; len -= request->ip_data_len;
if (len + padding > *availablep) { if (len + padding > *rs) {
/* /*
* Not enough data in the socket buffer. Receive as much * Not enough data in the socket buffer. Receive as much
* as we can. Don't receive padding, since, obviously, it's * as we can. Don't receive padding, since, obviously, it's
@ -547,9 +471,9 @@ icl_pdu_receive_data_segment(struct icl_pdu *request,
*/ */
#if 0 #if 0
ICL_DEBUG("limited from %zd to %zd", ICL_DEBUG("limited from %zd to %zd",
len + padding, *availablep - padding)); len + padding, *rs - padding));
#endif #endif
len = *availablep - padding; len = *rs - padding;
*more_neededp = true; *more_neededp = true;
padding = 0; padding = 0;
} }
@ -559,11 +483,9 @@ icl_pdu_receive_data_segment(struct icl_pdu *request,
* of actual data segment. * of actual data segment.
*/ */
if (len > 0) { if (len > 0) {
m = icl_conn_receive(request->ip_conn, len + padding); m = *r;
if (m == NULL) { *r = m_split(m, len + padding, M_WAITOK);
ICL_DEBUG("failed to receive data segment"); *rs -= len + padding;
return (-1);
}
if (request->ip_data_mbuf == NULL) if (request->ip_data_mbuf == NULL)
request->ip_data_mbuf = m; request->ip_data_mbuf = m;
@ -571,7 +493,6 @@ icl_pdu_receive_data_segment(struct icl_pdu *request,
m_cat(request->ip_data_mbuf, m); m_cat(request->ip_data_mbuf, m);
request->ip_data_len += len; request->ip_data_len += len;
*availablep -= len + padding;
} else } else
ICL_DEBUG("len 0"); ICL_DEBUG("len 0");
@ -583,7 +504,7 @@ icl_pdu_receive_data_segment(struct icl_pdu *request,
} }
static int static int
icl_pdu_check_data_digest(struct icl_pdu *request, size_t *availablep) icl_pdu_check_data_digest(struct icl_pdu *request, struct mbuf **r, size_t *rs)
{ {
uint32_t received_digest, valid_digest; uint32_t received_digest, valid_digest;
@ -594,12 +515,7 @@ icl_pdu_check_data_digest(struct icl_pdu *request, size_t *availablep)
return (0); return (0);
CTASSERT(sizeof(received_digest) == ISCSI_DATA_DIGEST_SIZE); CTASSERT(sizeof(received_digest) == ISCSI_DATA_DIGEST_SIZE);
if (icl_conn_receive_buf(request->ip_conn, icl_soft_receive_buf(r, rs, &received_digest, ISCSI_DATA_DIGEST_SIZE);
&received_digest, ISCSI_DATA_DIGEST_SIZE)) {
ICL_DEBUG("failed to receive data digest");
return (-1);
}
*availablep -= ISCSI_DATA_DIGEST_SIZE;
/* /*
* Note that ip_data_mbuf also contains padding; since digest * Note that ip_data_mbuf also contains padding; since digest
@ -621,16 +537,13 @@ icl_pdu_check_data_digest(struct icl_pdu *request, size_t *availablep)
* "part" of PDU at a time; call it repeatedly until it returns non-NULL. * "part" of PDU at a time; call it repeatedly until it returns non-NULL.
*/ */
static struct icl_pdu * static struct icl_pdu *
icl_conn_receive_pdu(struct icl_conn *ic, size_t *availablep) icl_conn_receive_pdu(struct icl_conn *ic, struct mbuf **r, size_t *rs)
{ {
struct icl_pdu *request; struct icl_pdu *request;
struct socket *so;
size_t len; size_t len;
int error; int error = 0;
bool more_needed; bool more_needed;
so = ic->ic_socket;
if (ic->ic_receive_state == ICL_CONN_STATE_BHS) { if (ic->ic_receive_state == ICL_CONN_STATE_BHS) {
KASSERT(ic->ic_receive_pdu == NULL, KASSERT(ic->ic_receive_pdu == NULL,
("ic->ic_receive_pdu != NULL")); ("ic->ic_receive_pdu != NULL"));
@ -648,23 +561,11 @@ icl_conn_receive_pdu(struct icl_conn *ic, size_t *availablep)
request = ic->ic_receive_pdu; request = ic->ic_receive_pdu;
} }
if (*availablep < ic->ic_receive_len) {
#if 0
ICL_DEBUG("not enough data; need %zd, "
"have %zd", ic->ic_receive_len, *availablep);
#endif
return (NULL);
}
switch (ic->ic_receive_state) { switch (ic->ic_receive_state) {
case ICL_CONN_STATE_BHS: case ICL_CONN_STATE_BHS:
//ICL_DEBUG("receiving BHS"); //ICL_DEBUG("receiving BHS");
error = icl_pdu_receive_bhs(request, availablep); icl_soft_receive_buf(r, rs, request->ip_bhs,
if (error != 0) { sizeof(struct iscsi_bhs));
ICL_DEBUG("failed to receive BHS; "
"dropping connection");
break;
}
/* /*
* We don't enforce any limit for AHS length; * We don't enforce any limit for AHS length;
@ -686,12 +587,7 @@ icl_conn_receive_pdu(struct icl_conn *ic, size_t *availablep)
case ICL_CONN_STATE_AHS: case ICL_CONN_STATE_AHS:
//ICL_DEBUG("receiving AHS"); //ICL_DEBUG("receiving AHS");
error = icl_pdu_receive_ahs(request, availablep); icl_pdu_receive_ahs(request, r, rs);
if (error != 0) {
ICL_DEBUG("failed to receive AHS; "
"dropping connection");
break;
}
ic->ic_receive_state = ICL_CONN_STATE_HEADER_DIGEST; ic->ic_receive_state = ICL_CONN_STATE_HEADER_DIGEST;
if (ic->ic_header_crc32c == false) if (ic->ic_header_crc32c == false)
ic->ic_receive_len = 0; ic->ic_receive_len = 0;
@ -701,7 +597,7 @@ icl_conn_receive_pdu(struct icl_conn *ic, size_t *availablep)
case ICL_CONN_STATE_HEADER_DIGEST: case ICL_CONN_STATE_HEADER_DIGEST:
//ICL_DEBUG("receiving header digest"); //ICL_DEBUG("receiving header digest");
error = icl_pdu_check_header_digest(request, availablep); error = icl_pdu_check_header_digest(request, r, rs);
if (error != 0) { if (error != 0) {
ICL_DEBUG("header digest failed; " ICL_DEBUG("header digest failed; "
"dropping connection"); "dropping connection");
@ -715,7 +611,7 @@ icl_conn_receive_pdu(struct icl_conn *ic, size_t *availablep)
case ICL_CONN_STATE_DATA: case ICL_CONN_STATE_DATA:
//ICL_DEBUG("receiving data segment"); //ICL_DEBUG("receiving data segment");
error = icl_pdu_receive_data_segment(request, availablep, error = icl_pdu_receive_data_segment(request, r, rs,
&more_needed); &more_needed);
if (error != 0) { if (error != 0) {
ICL_DEBUG("failed to receive data segment;" ICL_DEBUG("failed to receive data segment;"
@ -735,7 +631,7 @@ icl_conn_receive_pdu(struct icl_conn *ic, size_t *availablep)
case ICL_CONN_STATE_DATA_DIGEST: case ICL_CONN_STATE_DATA_DIGEST:
//ICL_DEBUG("receiving data digest"); //ICL_DEBUG("receiving data digest");
error = icl_pdu_check_data_digest(request, availablep); error = icl_pdu_check_data_digest(request, r, rs);
if (error != 0) { if (error != 0) {
ICL_DEBUG("data digest failed; " ICL_DEBUG("data digest failed; "
"dropping connection"); "dropping connection");
@ -767,44 +663,27 @@ icl_conn_receive_pdu(struct icl_conn *ic, size_t *availablep)
} }
static void static void
icl_conn_receive_pdus(struct icl_conn *ic, size_t available) icl_conn_receive_pdus(struct icl_conn *ic, struct mbuf **r, size_t *rs)
{ {
struct icl_pdu *response; struct icl_pdu *response;
struct socket *so;
so = ic->ic_socket;
/*
* This can never happen; we're careful to only mess with ic->ic_socket
* pointer when the send/receive threads are not running.
*/
KASSERT(so != NULL, ("NULL socket"));
for (;;) { for (;;) {
if (ic->ic_disconnecting) if (ic->ic_disconnecting)
return; return;
if (so->so_error != 0) {
ICL_DEBUG("connection error %d; "
"dropping connection", so->so_error);
icl_conn_fail(ic);
return;
}
/* /*
* Loop until we have a complete PDU or there is not enough * Loop until we have a complete PDU or there is not enough
* data in the socket buffer. * data in the socket buffer.
*/ */
if (available < ic->ic_receive_len) { if (*rs < ic->ic_receive_len) {
#if 0 #if 0
ICL_DEBUG("not enough data; have %zd, " ICL_DEBUG("not enough data; have %zd, need %zd",
"need %zd", available, *rs, ic->ic_receive_len);
ic->ic_receive_len);
#endif #endif
return; return;
} }
response = icl_conn_receive_pdu(ic, &available); response = icl_conn_receive_pdu(ic, r, rs);
if (response == NULL) if (response == NULL)
continue; continue;
@ -825,15 +704,19 @@ static void
icl_receive_thread(void *arg) icl_receive_thread(void *arg)
{ {
struct icl_conn *ic; struct icl_conn *ic;
size_t available; size_t available, read = 0;
struct socket *so; struct socket *so;
struct mbuf *m, *r = NULL;
struct uio uio;
int error, flags;
ic = arg; ic = arg;
so = ic->ic_socket; so = ic->ic_socket;
for (;;) { for (;;) {
SOCKBUF_LOCK(&so->so_rcv);
if (ic->ic_disconnecting) { if (ic->ic_disconnecting) {
//ICL_DEBUG("terminating"); SOCKBUF_UNLOCK(&so->so_rcv);
break; break;
} }
@ -843,18 +726,50 @@ icl_receive_thread(void *arg)
* to avoid unnecessary wakeups until there * to avoid unnecessary wakeups until there
* is enough data received to read the PDU. * is enough data received to read the PDU.
*/ */
SOCKBUF_LOCK(&so->so_rcv);
available = sbavail(&so->so_rcv); available = sbavail(&so->so_rcv);
if (available < ic->ic_receive_len) { if (read + available < ic->ic_receive_len) {
so->so_rcv.sb_lowat = ic->ic_receive_len; so->so_rcv.sb_lowat = ic->ic_receive_len - read;
cv_wait(&ic->ic_receive_cv, &so->so_rcv.sb_mtx); cv_wait(&ic->ic_receive_cv, &so->so_rcv.sb_mtx);
} else
so->so_rcv.sb_lowat = so->so_rcv.sb_hiwat + 1; so->so_rcv.sb_lowat = so->so_rcv.sb_hiwat + 1;
available = sbavail(&so->so_rcv);
}
SOCKBUF_UNLOCK(&so->so_rcv); SOCKBUF_UNLOCK(&so->so_rcv);
icl_conn_receive_pdus(ic, available); if (available == 0) {
if (so->so_error != 0) {
ICL_DEBUG("connection error %d; "
"dropping connection", so->so_error);
icl_conn_fail(ic);
break;
}
continue;
}
memset(&uio, 0, sizeof(uio));
uio.uio_resid = available;
flags = MSG_DONTWAIT;
error = soreceive(so, NULL, &uio, &m, NULL, &flags);
if (error != 0) {
ICL_DEBUG("soreceive error %d", error);
break;
}
if (uio.uio_resid != 0) {
m_freem(m);
ICL_DEBUG("short read");
break;
}
if (r)
m_cat(r, m);
else
r = m;
read += available;
icl_conn_receive_pdus(ic, &r, &read);
} }
if (r)
m_freem(r);
ICL_CONN_LOCK(ic); ICL_CONN_LOCK(ic);
ic->ic_receive_running = false; ic->ic_receive_running = false;
cv_signal(&ic->ic_send_cv); cv_signal(&ic->ic_send_cv);
@ -1440,12 +1355,17 @@ icl_soft_conn_close(struct icl_conn *ic)
struct icl_pdu *pdu; struct icl_pdu *pdu;
struct socket *so; struct socket *so;
ICL_CONN_LOCK(ic);
/* /*
* Wake up the threads, so they can properly terminate. * Wake up the threads, so they can properly terminate.
* Receive thread sleeps on so->so_rcv lock, send on ic->ic_lock.
*/ */
ic->ic_disconnecting = true; ICL_CONN_LOCK(ic);
if (!ic->ic_disconnecting) {
so = ic->ic_socket;
SOCKBUF_LOCK(&so->so_rcv);
ic->ic_disconnecting = true;
SOCKBUF_UNLOCK(&so->so_rcv);
}
while (ic->ic_receive_running || ic->ic_send_running) { while (ic->ic_receive_running || ic->ic_send_running) {
cv_signal(&ic->ic_receive_cv); cv_signal(&ic->ic_receive_cv);
cv_signal(&ic->ic_send_cv); cv_signal(&ic->ic_send_cv);