Modify the way the client side krpc does soreceive() for TCP.
Without this patch, clnt_vc_soupcall() first does a soreceive() for 4 bytes (the Sun RPC over TCP record mark) and then one or more soreceive() calls for the RPC message. This first soreceive() almost always results in an mbuf allocation, since it is unlikely that the 4-byte record mark sits in a separate mbuf in the socket's receive queue. This is somewhat inefficient and rather odd. It also will not work for KTLS receive, since the latter returns a TLS record for each soreceive(). This patch replaces the above with code similar to what the server side of the krpc does for TCP, where it does a soreceive() for as much data as possible and then parses RPC messages out of the received data. A new field of the TCP socket structure, called ct_raw, is the list of received mbufs that the RPC message(s) are parsed from. I think this results in cleaner code, and it is needed for support of NFS-over-TLS. It also fixes the code for the case where a server sends an RPC message in multiple RPC message fragments. Although this is allowed by RFC 5531, no extant NFS server does this. However, it is probably good to fix this in case some future NFS server does do this.
This commit is contained in:
parent
cf2dc6a9cc
commit
ab0b2f3a9e
@ -269,6 +269,7 @@ clnt_vc_create(
|
||||
soupcall_set(ct->ct_socket, SO_RCV, clnt_vc_soupcall, ct);
|
||||
SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
|
||||
|
||||
ct->ct_raw = NULL;
|
||||
ct->ct_record = NULL;
|
||||
ct->ct_record_resid = 0;
|
||||
TAILQ_INIT(&ct->ct_pending);
|
||||
@ -826,6 +827,8 @@ clnt_vc_destroy(CLIENT *cl)
|
||||
soshutdown(so, SHUT_WR);
|
||||
soclose(so);
|
||||
}
|
||||
m_freem(ct->ct_record);
|
||||
m_freem(ct->ct_raw);
|
||||
mem_free(ct, sizeof(struct ct_data));
|
||||
if (cl->cl_netid && cl->cl_netid[0])
|
||||
mem_free(cl->cl_netid, strlen(cl->cl_netid) +1);
|
||||
@ -854,122 +857,118 @@ clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
|
||||
struct ct_request *cr;
|
||||
int error, rcvflag, foundreq;
|
||||
uint32_t xid_plus_direction[2], header;
|
||||
bool_t do_read;
|
||||
SVCXPRT *xprt;
|
||||
struct cf_conn *cd;
|
||||
u_int rawlen;
|
||||
|
||||
CTASSERT(sizeof(xid_plus_direction) == 2 * sizeof(uint32_t));
|
||||
/*
|
||||
* If another thread is already here, it must be in
|
||||
* soreceive(), so just return to avoid races with it.
|
||||
* ct_upcallrefs is protected by the SOCKBUF_LOCK(),
|
||||
* which is held in this function, except when
|
||||
* soreceive() is called.
|
||||
*/
|
||||
if (ct->ct_upcallrefs > 0)
|
||||
return (SU_OK);
|
||||
ct->ct_upcallrefs++;
|
||||
uio.uio_td = curthread;
|
||||
do {
|
||||
|
||||
/*
|
||||
* Read as much as possible off the socket and link it
|
||||
* onto ct_raw.
|
||||
*/
|
||||
for (;;) {
|
||||
uio.uio_resid = 1000000000;
|
||||
uio.uio_td = curthread;
|
||||
m2 = m = NULL;
|
||||
rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
|
||||
SOCKBUF_UNLOCK(&so->so_rcv);
|
||||
error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
|
||||
SOCKBUF_LOCK(&so->so_rcv);
|
||||
|
||||
if (error == EWOULDBLOCK) {
|
||||
/*
|
||||
* We must re-test for readability after
|
||||
* taking the lock to protect us in the case
|
||||
* where a new packet arrives on the socket
|
||||
* after our call to soreceive fails with
|
||||
* EWOULDBLOCK.
|
||||
*/
|
||||
error = 0;
|
||||
if (!soreadable(so))
|
||||
break;
|
||||
continue;
|
||||
}
|
||||
if (error == 0 && m == NULL) {
|
||||
/*
|
||||
* We must have got EOF trying
|
||||
* to read from the stream.
|
||||
*/
|
||||
error = ECONNRESET;
|
||||
}
|
||||
if (error != 0)
|
||||
break;
|
||||
|
||||
if (ct->ct_raw != NULL)
|
||||
m_last(ct->ct_raw)->m_next = m;
|
||||
else
|
||||
ct->ct_raw = m;
|
||||
}
|
||||
rawlen = m_length(ct->ct_raw, NULL);
|
||||
|
||||
/* Now, process as much of ct_raw as possible. */
|
||||
for (;;) {
|
||||
/*
|
||||
* If ct_record_resid is zero, we are waiting for a
|
||||
* record mark.
|
||||
*/
|
||||
if (ct->ct_record_resid == 0) {
|
||||
|
||||
/*
|
||||
* Make sure there is either a whole record
|
||||
* mark in the buffer or there is some other
|
||||
* error condition
|
||||
*/
|
||||
do_read = FALSE;
|
||||
if (sbavail(&so->so_rcv) >= sizeof(uint32_t)
|
||||
|| (so->so_rcv.sb_state & SBS_CANTRCVMORE)
|
||||
|| so->so_error)
|
||||
do_read = TRUE;
|
||||
|
||||
if (!do_read)
|
||||
if (rawlen < sizeof(uint32_t))
|
||||
break;
|
||||
|
||||
SOCKBUF_UNLOCK(&so->so_rcv);
|
||||
uio.uio_resid = sizeof(uint32_t);
|
||||
m = NULL;
|
||||
rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
|
||||
error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
|
||||
SOCKBUF_LOCK(&so->so_rcv);
|
||||
|
||||
if (error == EWOULDBLOCK)
|
||||
break;
|
||||
|
||||
/*
|
||||
* If there was an error, wake up all pending
|
||||
* requests.
|
||||
*/
|
||||
if (error || uio.uio_resid > 0) {
|
||||
wakeup_all:
|
||||
mtx_lock(&ct->ct_lock);
|
||||
if (!error) {
|
||||
/*
|
||||
* We must have got EOF trying
|
||||
* to read from the stream.
|
||||
*/
|
||||
error = ECONNRESET;
|
||||
}
|
||||
ct->ct_error.re_status = RPC_CANTRECV;
|
||||
ct->ct_error.re_errno = error;
|
||||
TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
|
||||
cr->cr_error = error;
|
||||
wakeup(cr);
|
||||
}
|
||||
mtx_unlock(&ct->ct_lock);
|
||||
break;
|
||||
}
|
||||
m_copydata(m, 0, sizeof(uint32_t), (char *)&header);
|
||||
m_copydata(ct->ct_raw, 0, sizeof(uint32_t),
|
||||
(char *)&header);
|
||||
header = ntohl(header);
|
||||
ct->ct_record = NULL;
|
||||
ct->ct_record_resid = header & 0x7fffffff;
|
||||
ct->ct_record_eor = ((header & 0x80000000) != 0);
|
||||
m_freem(m);
|
||||
m_adj(ct->ct_raw, sizeof(uint32_t));
|
||||
rawlen -= sizeof(uint32_t);
|
||||
} else {
|
||||
/*
|
||||
* Wait until the socket has the whole record
|
||||
* buffered.
|
||||
* Move as much of the record as possible to
|
||||
* ct_record.
|
||||
*/
|
||||
do_read = FALSE;
|
||||
if (sbavail(&so->so_rcv) >= ct->ct_record_resid
|
||||
|| (so->so_rcv.sb_state & SBS_CANTRCVMORE)
|
||||
|| so->so_error)
|
||||
do_read = TRUE;
|
||||
|
||||
if (!do_read)
|
||||
if (rawlen == 0)
|
||||
break;
|
||||
|
||||
/*
|
||||
* We have the record mark. Read as much as
|
||||
* the socket has buffered up to the end of
|
||||
* this record.
|
||||
*/
|
||||
SOCKBUF_UNLOCK(&so->so_rcv);
|
||||
uio.uio_resid = ct->ct_record_resid;
|
||||
m = NULL;
|
||||
rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
|
||||
error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
|
||||
SOCKBUF_LOCK(&so->so_rcv);
|
||||
|
||||
if (error == EWOULDBLOCK)
|
||||
if (rawlen <= ct->ct_record_resid) {
|
||||
if (ct->ct_record != NULL)
|
||||
m_last(ct->ct_record)->m_next =
|
||||
ct->ct_raw;
|
||||
else
|
||||
ct->ct_record = ct->ct_raw;
|
||||
ct->ct_raw = NULL;
|
||||
ct->ct_record_resid -= rawlen;
|
||||
rawlen = 0;
|
||||
} else {
|
||||
m = m_split(ct->ct_raw, ct->ct_record_resid,
|
||||
M_NOWAIT);
|
||||
if (m == NULL)
|
||||
break;
|
||||
if (ct->ct_record != NULL)
|
||||
m_last(ct->ct_record)->m_next =
|
||||
ct->ct_raw;
|
||||
else
|
||||
ct->ct_record = ct->ct_raw;
|
||||
rawlen -= ct->ct_record_resid;
|
||||
ct->ct_record_resid = 0;
|
||||
ct->ct_raw = m;
|
||||
}
|
||||
if (ct->ct_record_resid > 0)
|
||||
break;
|
||||
|
||||
if (error || uio.uio_resid == ct->ct_record_resid)
|
||||
goto wakeup_all;
|
||||
|
||||
/*
|
||||
* If we have part of the record already,
|
||||
* chain this bit onto the end.
|
||||
*/
|
||||
if (ct->ct_record)
|
||||
m_last(ct->ct_record)->m_next = m;
|
||||
else
|
||||
ct->ct_record = m;
|
||||
|
||||
ct->ct_record_resid = uio.uio_resid;
|
||||
|
||||
/*
|
||||
* If we have the entire record, see if we can
|
||||
* match it to a request.
|
||||
*/
|
||||
if (ct->ct_record_resid == 0
|
||||
&& ct->ct_record_eor) {
|
||||
if (ct->ct_record_eor) {
|
||||
/*
|
||||
* The XID is in the first uint32_t of
|
||||
* the reply and the message direction
|
||||
@ -979,8 +978,20 @@ clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
|
||||
sizeof(xid_plus_direction) &&
|
||||
m_length(ct->ct_record, NULL) <
|
||||
sizeof(xid_plus_direction)) {
|
||||
m_freem(ct->ct_record);
|
||||
break;
|
||||
/*
|
||||
* What to do now?
|
||||
* The data in the TCP stream is
|
||||
* corrupted such that there is no
|
||||
* valid RPC message to parse.
|
||||
* I think it best to close this
|
||||
* connection and allow
|
||||
* clnt_reconnect_XXX() to try
|
||||
* and establish a new one.
|
||||
*/
|
||||
printf("clnt_vc_soupcall: "
|
||||
"connection data corrupted\n");
|
||||
error = ECONNRESET;
|
||||
goto wakeup_all;
|
||||
}
|
||||
m_copydata(ct->ct_record, 0,
|
||||
sizeof(xid_plus_direction),
|
||||
@ -1057,7 +1068,26 @@ clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
|
||||
}
|
||||
}
|
||||
}
|
||||
} while (m);
|
||||
}
|
||||
|
||||
if (error != 0) {
|
||||
wakeup_all:
|
||||
/*
|
||||
* This socket is broken, so mark that it cannot
|
||||
* receive and fail all RPCs waiting for a reply
|
||||
* on it, so that they will be retried on a new
|
||||
* TCP connection created by clnt_reconnect_X().
|
||||
*/
|
||||
mtx_lock(&ct->ct_lock);
|
||||
ct->ct_error.re_status = RPC_CANTRECV;
|
||||
ct->ct_error.re_errno = error;
|
||||
TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
|
||||
cr->cr_error = error;
|
||||
wakeup(cr);
|
||||
}
|
||||
mtx_unlock(&ct->ct_lock);
|
||||
}
|
||||
|
||||
ct->ct_upcallrefs--;
|
||||
if (ct->ct_upcallrefs < 0)
|
||||
panic("rpcvc upcall refcnt");
|
||||
|
@ -101,6 +101,7 @@ struct ct_data {
|
||||
struct ct_request_list ct_pending;
|
||||
int ct_upcallrefs; /* Ref cnt of upcalls in prog. */
|
||||
SVCXPRT *ct_backchannelxprt; /* xprt for backchannel */
|
||||
struct mbuf *ct_raw; /* Raw mbufs recv'd */
|
||||
};
|
||||
|
||||
struct cf_conn { /* kept in xprt->xp_p1 for actual connection */
|
||||
|
Loading…
Reference in New Issue
Block a user