diff --git a/sys/rpc/clnt_vc.c b/sys/rpc/clnt_vc.c index 0231c303f089..253caee912a2 100644 --- a/sys/rpc/clnt_vc.c +++ b/sys/rpc/clnt_vc.c @@ -269,6 +269,7 @@ clnt_vc_create( soupcall_set(ct->ct_socket, SO_RCV, clnt_vc_soupcall, ct); SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv); + ct->ct_raw = NULL; ct->ct_record = NULL; ct->ct_record_resid = 0; TAILQ_INIT(&ct->ct_pending); @@ -826,6 +827,8 @@ clnt_vc_destroy(CLIENT *cl) soshutdown(so, SHUT_WR); soclose(so); } + m_freem(ct->ct_record); + m_freem(ct->ct_raw); mem_free(ct, sizeof(struct ct_data)); if (cl->cl_netid && cl->cl_netid[0]) mem_free(cl->cl_netid, strlen(cl->cl_netid) +1); @@ -854,122 +857,118 @@ clnt_vc_soupcall(struct socket *so, void *arg, int waitflag) struct ct_request *cr; int error, rcvflag, foundreq; uint32_t xid_plus_direction[2], header; - bool_t do_read; SVCXPRT *xprt; struct cf_conn *cd; + u_int rawlen; - CTASSERT(sizeof(xid_plus_direction) == 2 * sizeof(uint32_t)); + /* + * If another thread is already here, it must be in + * soreceive(), so just return to avoid races with it. + * ct_upcallrefs is protected by the SOCKBUF_LOCK(), + * which is held in this function, except when + * soreceive() is called. + */ + if (ct->ct_upcallrefs > 0) + return (SU_OK); ct->ct_upcallrefs++; - uio.uio_td = curthread; - do { + + /* + * Read as much as possible off the socket and link it + * onto ct_raw. + */ + for (;;) { + uio.uio_resid = 1000000000; + uio.uio_td = curthread; + m2 = m = NULL; + rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK; + SOCKBUF_UNLOCK(&so->so_rcv); + error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag); + SOCKBUF_LOCK(&so->so_rcv); + + if (error == EWOULDBLOCK) { + /* + * We must re-test for readability after + * taking the lock to protect us in the case + * where a new packet arrives on the socket + * after our call to soreceive fails with + * EWOULDBLOCK. + */ + error = 0; + if (!soreadable(so)) + break; + continue; + } + if (error == 0 && m == NULL) { + /* + * We must have got EOF trying + * to read from the stream. + */ + error = ECONNRESET; + } + if (error != 0) + break; + + if (ct->ct_raw != NULL) + m_last(ct->ct_raw)->m_next = m; + else + ct->ct_raw = m; + } + rawlen = m_length(ct->ct_raw, NULL); + + /* Now, process as much of ct_raw as possible. */ + for (;;) { /* * If ct_record_resid is zero, we are waiting for a * record mark. */ if (ct->ct_record_resid == 0) { - - /* - * Make sure there is either a whole record - * mark in the buffer or there is some other - * error condition - */ - do_read = FALSE; - if (sbavail(&so->so_rcv) >= sizeof(uint32_t) - || (so->so_rcv.sb_state & SBS_CANTRCVMORE) - || so->so_error) - do_read = TRUE; - - if (!do_read) + if (rawlen < sizeof(uint32_t)) break; - - SOCKBUF_UNLOCK(&so->so_rcv); - uio.uio_resid = sizeof(uint32_t); - m = NULL; - rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK; - error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag); - SOCKBUF_LOCK(&so->so_rcv); - - if (error == EWOULDBLOCK) - break; - - /* - * If there was an error, wake up all pending - * requests. - */ - if (error || uio.uio_resid > 0) { - wakeup_all: - mtx_lock(&ct->ct_lock); - if (!error) { - /* - * We must have got EOF trying - * to read from the stream. - */ - error = ECONNRESET; - } - ct->ct_error.re_status = RPC_CANTRECV; - ct->ct_error.re_errno = error; - TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) { - cr->cr_error = error; - wakeup(cr); - } - mtx_unlock(&ct->ct_lock); - break; - } - m_copydata(m, 0, sizeof(uint32_t), (char *)&header); + m_copydata(ct->ct_raw, 0, sizeof(uint32_t), + (char *)&header); header = ntohl(header); - ct->ct_record = NULL; ct->ct_record_resid = header & 0x7fffffff; ct->ct_record_eor = ((header & 0x80000000) != 0); - m_freem(m); + m_adj(ct->ct_raw, sizeof(uint32_t)); + rawlen -= sizeof(uint32_t); } else { /* - * Wait until the socket has the whole record - * buffered. + * Move as much of the record as possible to + * ct_record. */ - do_read = FALSE; - if (sbavail(&so->so_rcv) >= ct->ct_record_resid - || (so->so_rcv.sb_state & SBS_CANTRCVMORE) - || so->so_error) - do_read = TRUE; - - if (!do_read) + if (rawlen == 0) break; - - /* - * We have the record mark. Read as much as - * the socket has buffered up to the end of - * this record. - */ - SOCKBUF_UNLOCK(&so->so_rcv); - uio.uio_resid = ct->ct_record_resid; - m = NULL; - rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK; - error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag); - SOCKBUF_LOCK(&so->so_rcv); - - if (error == EWOULDBLOCK) + if (rawlen <= ct->ct_record_resid) { + if (ct->ct_record != NULL) + m_last(ct->ct_record)->m_next = + ct->ct_raw; + else + ct->ct_record = ct->ct_raw; + ct->ct_raw = NULL; + ct->ct_record_resid -= rawlen; + rawlen = 0; + } else { + m = m_split(ct->ct_raw, ct->ct_record_resid, + M_NOWAIT); + if (m == NULL) + break; + if (ct->ct_record != NULL) + m_last(ct->ct_record)->m_next = + ct->ct_raw; + else + ct->ct_record = ct->ct_raw; + rawlen -= ct->ct_record_resid; + ct->ct_record_resid = 0; + ct->ct_raw = m; + } + if (ct->ct_record_resid > 0) break; - if (error || uio.uio_resid == ct->ct_record_resid) - goto wakeup_all; - - /* - * If we have part of the record already, - * chain this bit onto the end. - */ - if (ct->ct_record) - m_last(ct->ct_record)->m_next = m; - else - ct->ct_record = m; - - ct->ct_record_resid = uio.uio_resid; - /* * If we have the entire record, see if we can * match it to a request. */ - if (ct->ct_record_resid == 0 - && ct->ct_record_eor) { + if (ct->ct_record_eor) { /* * The XID is in the first uint32_t of * the reply and the message direction @@ -979,8 +978,20 @@ clnt_vc_soupcall(struct socket *so, void *arg, int waitflag) sizeof(xid_plus_direction) && m_length(ct->ct_record, NULL) < sizeof(xid_plus_direction)) { - m_freem(ct->ct_record); - break; + /* + * What to do now? + * The data in the TCP stream is + * corrupted such that there is no + * valid RPC message to parse. + * I think it best to close this + * connection and allow + * clnt_reconnect_XXX() to try + * and establish a new one. + */ + printf("clnt_vc_soupcall: " + "connection data corrupted\n"); + error = ECONNRESET; + goto wakeup_all; } m_copydata(ct->ct_record, 0, sizeof(xid_plus_direction), @@ -1057,7 +1068,26 @@ clnt_vc_soupcall(struct socket *so, void *arg, int waitflag) } } } - } while (m); + } + + if (error != 0) { + wakeup_all: + /* + * This socket is broken, so mark that it cannot + * receive and fail all RPCs waiting for a reply + * on it, so that they will be retried on a new + * TCP connection created by clnt_reconnect_X(). + */ + mtx_lock(&ct->ct_lock); + ct->ct_error.re_status = RPC_CANTRECV; + ct->ct_error.re_errno = error; + TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) { + cr->cr_error = error; + wakeup(cr); + } + mtx_unlock(&ct->ct_lock); + } + ct->ct_upcallrefs--; if (ct->ct_upcallrefs < 0) panic("rpcvc upcall refcnt"); diff --git a/sys/rpc/krpc.h b/sys/rpc/krpc.h index 43fd742db71a..978edbe8ca48 100644 --- a/sys/rpc/krpc.h +++ b/sys/rpc/krpc.h @@ -101,6 +101,7 @@ struct ct_data { struct ct_request_list ct_pending; int ct_upcallrefs; /* Ref cnt of upcalls in prog. */ SVCXPRT *ct_backchannelxprt; /* xprt for backchannel */ + struct mbuf *ct_raw; /* Raw mbufs recv'd */ }; struct cf_conn { /* kept in xprt->xp_p1 for actual connection */