MFC r259632:
Rework flow control for the connection-oriented (TCP) RPC server.

When processing the receive buffer, write the amount of data expected for the current request record into the socket's so_rcv.sb_lowat to make the stack aware of our needs. When processing subsequent upcalls, ignore them until the socket has collected enough data to be read and processed in one pass. This change reduces the number of context switches and other operations in the RPC stack during large NFS writes (especially over non-Jumbo networks) by an order of magnitude.

After processing the current packet, take another look into the pending buffer to find out whether the next packet has already been received. If not, deactivate this port right there instead of making the RPC code push the port to another thread just to find that there is nothing to do. If the next packet has been received only partially, also deactivate the port, and additionally update the socket's so_rcv.sb_lowat so we are not woken up prematurely. This change further reduces the number of context switches per NFS request by about half.
parent 4202d51e16
commit 01e3debc7e

sys/rpc/svc_vc.c | 241 lines changed
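
RPC over TCP frames each message with a 4-byte record mark: the high bit flags the final fragment of a record (end-of-record, EOR) and the low 31 bits carry the fragment length. The sketch below (userspace C; the helper names are hypothetical, only the bit layout and the watermark arithmetic are taken from the patch) shows the two computations this commit builds on:

    #include <arpa/inet.h>   /* ntohl() */
    #include <stdint.h>
    #include <string.h>

    /*
     * Hypothetical helper mirroring the record-mark parsing done in
     * svc_vc_process_pending() below: the top bit of the 4-byte header
     * is the end-of-record flag, the low 31 bits the fragment length.
     */
    struct rec_mark {
            int      eor;    /* last fragment of this RPC record? */
            uint32_t resid;  /* payload bytes expected after the mark */
    };

    static struct rec_mark
    parse_rec_mark(const unsigned char hdr[4])
    {
            struct rec_mark rm;
            uint32_t header;

            memcpy(&header, hdr, sizeof(header));
            header = ntohl(header);
            rm.eor = (header & 0x80000000) != 0;
            rm.resid = header & 0x7fffffff;
            return (rm);
    }

    /*
     * Low-watermark choice from the patch: do not wake the server until
     * the whole fragment (capped at half the buffer limit) has arrived,
     * but never set the watermark below one byte.
     */
    static uint32_t
    rcv_lowat(uint32_t resid, uint32_t hiwat)
    {
            uint32_t want = (resid < hiwat / 2) ? resid : hiwat / 2;

            return (want > 1 ? want : 1);
    }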
@@ -381,15 +381,11 @@ svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg,
 		 * We must re-test for new connections after taking
 		 * the lock to protect us in the case where a new
 		 * connection arrives after our call to accept fails
-		 * with EWOULDBLOCK. The pool lock protects us from
-		 * racing the upcall after our TAILQ_EMPTY() call
-		 * returns false.
+		 * with EWOULDBLOCK.
 		 */
 		ACCEPT_LOCK();
-		mtx_lock(&xprt->xp_pool->sp_lock);
 		if (TAILQ_EMPTY(&xprt->xp_socket->so_comp))
-			xprt_inactive_locked(xprt);
-		mtx_unlock(&xprt->xp_pool->sp_lock);
+			xprt_inactive(xprt);
 		ACCEPT_UNLOCK();
 		sx_xunlock(&xprt->xp_lock);
 		return (FALSE);
@@ -526,35 +522,14 @@ static enum xprt_stat
 svc_vc_stat(SVCXPRT *xprt)
 {
 	struct cf_conn *cd;
-	struct mbuf *m;
-	size_t n;
 
 	cd = (struct cf_conn *)(xprt->xp_p1);
 
 	if (cd->strm_stat == XPRT_DIED)
 		return (XPRT_DIED);
 
-	/*
-	 * Return XPRT_MOREREQS if we have buffered data and we are
-	 * mid-record or if we have enough data for a record
-	 * marker. Since this is only a hint, we read mpending and
-	 * resid outside the lock. We do need to take the lock if we
-	 * have to traverse the mbuf chain.
-	 */
-	if (cd->mpending) {
-		if (cd->resid)
-			return (XPRT_MOREREQS);
-		n = 0;
-		sx_xlock(&xprt->xp_lock);
-		m = cd->mpending;
-		while (m && n < sizeof(uint32_t)) {
-			n += m->m_len;
-			m = m->m_next;
-		}
-		sx_xunlock(&xprt->xp_lock);
-		if (n >= sizeof(uint32_t))
-			return (XPRT_MOREREQS);
-	}
+	if (cd->mreq != NULL && cd->resid == 0 && cd->eor)
+		return (XPRT_MOREREQS);
 
 	if (soreadable(xprt->xp_socket))
 		return (XPRT_MOREREQS);
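
The simplified svc_vc_stat() above works because svc_vc_process_pending(), added in the next hunk, maintains all the parsing state: cd->mreq accumulates the record, cd->resid counts the bytes still owed, and cd->eor holds the last seen end-of-record flag. A minimal restatement of the readiness predicate, using an illustrative stand-in struct since the real struct cf_conn lives in svc_vc.c:

    #include <stddef.h>
    #include <stdint.h>

    /* Stand-in for the three struct cf_conn fields used by the test. */
    struct cf_conn_view {
            void    *mreq;   /* partially assembled request */
            uint32_t resid;  /* bytes still missing from the fragment */
            int      eor;    /* end-of-record flag from the last mark */
    };

    /* TRUE exactly when a complete request is buffered and decodable. */
    static int
    request_ready(const struct cf_conn_view *cd)
    {
            return (cd->mreq != NULL && cd->resid == 0 && cd->eor);
    }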
@@ -575,6 +550,78 @@ svc_vc_backchannel_stat(SVCXPRT *xprt)
 	return (XPRT_IDLE);
 }
 
+/*
+ * If we have an mbuf chain in cd->mpending, try to parse a record from it,
+ * leaving the result in cd->mreq. If we don't have a complete record, leave
+ * the partial result in cd->mreq and try to read more from the socket.
+ */
+static void
+svc_vc_process_pending(SVCXPRT *xprt)
+{
+	struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1;
+	struct socket *so = xprt->xp_socket;
+	struct mbuf *m;
+
+	/*
+	 * If cd->resid is non-zero, we have part of the
+	 * record already, otherwise we are expecting a record
+	 * marker.
+	 */
+	if (!cd->resid && cd->mpending) {
+		/*
+		 * See if there is enough data buffered to
+		 * make up a record marker. Make sure we can
+		 * handle the case where the record marker is
+		 * split across more than one mbuf.
+		 */
+		size_t n = 0;
+		uint32_t header;
+
+		m = cd->mpending;
+		while (n < sizeof(uint32_t) && m) {
+			n += m->m_len;
+			m = m->m_next;
+		}
+		if (n < sizeof(uint32_t)) {
+			so->so_rcv.sb_lowat = sizeof(uint32_t) - n;
+			return;
+		}
+		m_copydata(cd->mpending, 0, sizeof(header),
+		    (char *)&header);
+		header = ntohl(header);
+		cd->eor = (header & 0x80000000) != 0;
+		cd->resid = header & 0x7fffffff;
+		m_adj(cd->mpending, sizeof(uint32_t));
+	}
+
+	/*
+	 * Start pulling off mbufs from cd->mpending
+	 * until we either have a complete record or
+	 * we run out of data. We use m_split to pull
+	 * data - it will pull as much as possible and
+	 * split the last mbuf if necessary.
+	 */
+	while (cd->mpending && cd->resid) {
+		m = cd->mpending;
+		if (cd->mpending->m_next
+		    || cd->mpending->m_len > cd->resid)
+			cd->mpending = m_split(cd->mpending,
+			    cd->resid, M_WAITOK);
+		else
+			cd->mpending = NULL;
+		if (cd->mreq)
+			m_last(cd->mreq)->m_next = m;
+		else
+			cd->mreq = m;
+		while (m) {
+			cd->resid -= m->m_len;
+			m = m->m_next;
+		}
+	}
+
+	so->so_rcv.sb_lowat = imax(1, imin(cd->resid, so->so_rcv.sb_hiwat / 2));
+}
+
 static bool_t
 svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg,
     struct sockaddr **addrp, struct mbuf **mp)
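
The last line of the new function is the heart of the flow control: it asks the stack not to wake the server until the remaining cd->resid bytes have arrived, clamped to half of sb_hiwat so that a fragment larger than the socket buffer can still make progress (a watermark the buffer could never reach would silence the upcall forever). A small worked example of the clamp (the values are illustrative, not from the commit):

    #include <stdio.h>

    /* Userspace stand-ins for the kernel's imax()/imin(). */
    static int imax_(int a, int b) { return (a > b ? a : b); }
    static int imin_(int a, int b) { return (a < b ? a : b); }

    int
    main(void)
    {
            int hiwat = 65536;                  /* assumed sb_hiwat */
            int resid[] = { 0, 100, 40000, 200000 };

            for (int i = 0; i < 4; i++)
                    printf("resid=%6d -> sb_lowat=%5d\n", resid[i],
                        imax_(1, imin_(resid[i], hiwat / 2)));
            /*
             * Prints:
             *   resid=     0 -> sb_lowat=    1   (waiting for a record mark)
             *   resid=   100 -> sb_lowat=  100   (whole fragment)
             *   resid= 40000 -> sb_lowat=32768   (clamped to hiwat/2)
             *   resid=200000 -> sb_lowat=32768   (fragment exceeds buffer)
             */
            return (0);
    }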
@@ -582,6 +629,7 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg,
 	struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1;
 	struct uio uio;
 	struct mbuf *m;
+	struct socket* so = xprt->xp_socket;
 	XDR xdrs;
 	int error, rcvflag;
 
@@ -592,99 +640,40 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg,
 	sx_xlock(&xprt->xp_lock);
 
 	for (;;) {
-		/*
-		 * If we have an mbuf chain in cd->mpending, try to parse a
-		 * record from it, leaving the result in cd->mreq. If we don't
-		 * have a complete record, leave the partial result in
-		 * cd->mreq and try to read more from the socket.
-		 */
-		if (cd->mpending) {
-			/*
-			 * If cd->resid is non-zero, we have part of the
-			 * record already, otherwise we are expecting a record
-			 * marker.
-			 */
-			if (!cd->resid) {
-				/*
-				 * See if there is enough data buffered to
-				 * make up a record marker. Make sure we can
-				 * handle the case where the record marker is
-				 * split across more than one mbuf.
-				 */
-				size_t n = 0;
-				uint32_t header;
+		/* If we have no request ready, check pending queue. */
+		while (cd->mpending &&
+		    (cd->mreq == NULL || cd->resid != 0 || !cd->eor))
+			svc_vc_process_pending(xprt);
 
-				m = cd->mpending;
-				while (n < sizeof(uint32_t) && m) {
-					n += m->m_len;
-					m = m->m_next;
-				}
-				if (n < sizeof(uint32_t))
-					goto readmore;
-				m_copydata(cd->mpending, 0, sizeof(header),
-				    (char *)&header);
-				header = ntohl(header);
-				cd->eor = (header & 0x80000000) != 0;
-				cd->resid = header & 0x7fffffff;
-				m_adj(cd->mpending, sizeof(uint32_t));
-			}
+		/* Process and return complete request in cd->mreq. */
+		if (cd->mreq != NULL && cd->resid == 0 && cd->eor) {
+
+			xdrmbuf_create(&xdrs, cd->mreq, XDR_DECODE);
+			cd->mreq = NULL;
+
+			/* Check for next request in a pending queue. */
+			svc_vc_process_pending(xprt);
+			if (cd->mreq == NULL || cd->resid != 0) {
+				SOCKBUF_LOCK(&so->so_rcv);
+				if (!soreadable(so))
+					xprt_inactive(xprt);
+				SOCKBUF_UNLOCK(&so->so_rcv);
+			}
 
-			/*
-			 * Start pulling off mbufs from cd->mpending
-			 * until we either have a complete record or
-			 * we run out of data. We use m_split to pull
-			 * data - it will pull as much as possible and
-			 * split the last mbuf if necessary.
-			 */
-			while (cd->mpending && cd->resid) {
-				m = cd->mpending;
-				if (cd->mpending->m_next
-				    || cd->mpending->m_len > cd->resid)
-					cd->mpending = m_split(cd->mpending,
-					    cd->resid, M_WAITOK);
-				else
-					cd->mpending = NULL;
-				if (cd->mreq)
-					m_last(cd->mreq)->m_next = m;
-				else
-					cd->mreq = m;
-				while (m) {
-					cd->resid -= m->m_len;
-					m = m->m_next;
-				}
-			}
-
-			/*
-			 * If cd->resid is zero now, we have managed to
-			 * receive a record fragment from the stream. Check
-			 * for the end-of-record mark to see if we need more.
-			 */
-			if (cd->resid == 0) {
-				if (!cd->eor)
-					continue;
-
-				/*
-				 * Success - we have a complete record in
-				 * cd->mreq.
-				 */
-				xdrmbuf_create(&xdrs, cd->mreq, XDR_DECODE);
-				cd->mreq = NULL;
-				sx_xunlock(&xprt->xp_lock);
-
-				if (! xdr_callmsg(&xdrs, msg)) {
-					XDR_DESTROY(&xdrs);
-					return (FALSE);
-				}
-
-				*addrp = NULL;
-				*mp = xdrmbuf_getall(&xdrs);
-				XDR_DESTROY(&xdrs);
-
-				return (TRUE);
-			}
-		}
+			sx_xunlock(&xprt->xp_lock);
+
+			if (! xdr_callmsg(&xdrs, msg)) {
+				XDR_DESTROY(&xdrs);
+				return (FALSE);
+			}
+
+			*addrp = NULL;
+			*mp = xdrmbuf_getall(&xdrs);
+			XDR_DESTROY(&xdrs);
+
+			return (TRUE);
+		}
 
-	readmore:
 		/*
 		 * The socket upcall calls xprt_active() which will eventually
 		 * cause the server to call us here. We attempt to
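
Taken together, the rewritten receive path first drains everything already buffered via svc_vc_process_pending(), returns a completed request immediately, and peeks ahead so the port can be deactivated when no further complete request is ready. The toy program below reassembles record-marked fragments from a flat byte buffer to illustrate the same framing the kernel applies to mbuf chains (the buffer handling is invented for the example; only the marker format matches the patch):

    #include <arpa/inet.h>
    #include <stdint.h>
    #include <stdio.h>
    #include <string.h>

    int
    main(void)
    {
            /* Two fragments: "hello " (more follows) + "world" (EOR). */
            unsigned char stream[64];
            size_t len = 0;
            uint32_t m1 = htonl(6), m2 = htonl(0x80000000u | 5);

            memcpy(stream + len, &m1, 4); len += 4;
            memcpy(stream + len, "hello ", 6); len += 6;
            memcpy(stream + len, &m2, 4); len += 4;
            memcpy(stream + len, "world", 5); len += 5;

            char record[32];
            size_t off = 0, rlen = 0;

            while (off + 4 <= len) {        /* enough for a marker? */
                    uint32_t header;
                    memcpy(&header, stream + off, 4);
                    header = ntohl(header);
                    int eor = (header & 0x80000000u) != 0;
                    uint32_t resid = header & 0x7fffffffu;
                    off += 4;

                    if (off + resid > len)  /* partial fragment: would */
                            break;          /* have to read more here  */
                    memcpy(record + rlen, stream + off, resid);
                    rlen += resid;
                    off += resid;

                    if (eor) {              /* complete record ready */
                            printf("record: %.*s\n", (int)rlen, record);
                            rlen = 0;
                    }
            }
            return (0);     /* prints: record: hello world */
    }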
@@ -697,8 +686,7 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg,
 		uio.uio_td = curthread;
 		m = NULL;
 		rcvflag = MSG_DONTWAIT;
-		error = soreceive(xprt->xp_socket, NULL, &uio, &m, NULL,
-		    &rcvflag);
+		error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
 
 		if (error == EWOULDBLOCK) {
 			/*
@@ -706,25 +694,23 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg,
 			 * taking the lock to protect us in the case
 			 * where a new packet arrives on the socket
 			 * after our call to soreceive fails with
-			 * EWOULDBLOCK. The pool lock protects us from
-			 * racing the upcall after our soreadable()
-			 * call returns false.
+			 * EWOULDBLOCK.
 			 */
-			mtx_lock(&xprt->xp_pool->sp_lock);
-			if (!soreadable(xprt->xp_socket))
-				xprt_inactive_locked(xprt);
-			mtx_unlock(&xprt->xp_pool->sp_lock);
+			SOCKBUF_LOCK(&so->so_rcv);
+			if (!soreadable(so))
+				xprt_inactive(xprt);
+			SOCKBUF_UNLOCK(&so->so_rcv);
 			sx_xunlock(&xprt->xp_lock);
 			return (FALSE);
 		}
 
 		if (error) {
-			SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
+			SOCKBUF_LOCK(&so->so_rcv);
 			if (xprt->xp_upcallset) {
 				xprt->xp_upcallset = 0;
-				soupcall_clear(xprt->xp_socket, SO_RCV);
+				soupcall_clear(so, SO_RCV);
 			}
-			SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
+			SOCKBUF_UNLOCK(&so->so_rcv);
 			xprt_inactive(xprt);
 			cd->strm_stat = XPRT_DIED;
 			sx_xunlock(&xprt->xp_lock);
@@ -908,7 +894,8 @@ svc_vc_soupcall(struct socket *so, void *arg, int waitflag)
 {
 	SVCXPRT *xprt = (SVCXPRT *) arg;
 
-	xprt_active(xprt);
+	if (soreadable(xprt->xp_socket))
+		xprt_active(xprt);
 	return (SU_OK);
 }
 
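
The upcall change closes the loop: svc_vc_soupcall() now consults soreadable(), which compares the buffered data against sb_lowat, so wakeups suppressed by the raised watermark never reach the RPC thread pool at all. Roughly (this is an approximation of the soreadable() macro from sys/socketvar.h with renamed fields, not its exact text):

    /* Approximation of the kernel's readability test. */
    struct rcvbuf_view {
            long sb_cc;       /* bytes currently queued */
            long sb_lowat;    /* watermark set by svc_vc_process_pending() */
            int  cantrcvmore; /* SBS_CANTRCVMORE: peer shut down */
            int  so_error;    /* pending error, e.g. ECONNRESET */
    };

    static int
    readable(const struct rcvbuf_view *rb)
    {
            /* Below the watermark: stay quiet and let data accumulate. */
            return (rb->sb_cc >= rb->sb_lowat ||
                rb->cantrcvmore || rb->so_error);
    }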