Add soreceive_stream(), an optimized version of soreceive() for

stream (TCP) sockets.

It is functionally identical to generic soreceive() but has a
number of stream-specific optimizations:
o does only one sockbuf unlock/lock per receive independent of
  the length of data to be moved into the uio compared to
  soreceive() which unlocks/locks per *mbuf*.
o uses m_mbuftouio() instead of its own copy(out) variant.
o much more compact code flow as a large number of special
  cases is removed.
o much improved readability.

It offers significantly reduced CPU usage and lock contention
when receiving fast TCP streams.  Additional gains are obtained
when the receiving application is using SO_RCVLOWAT to batch up
some data before a read (and wakeup) is done.

This function was written by "reverse engineering" and is not
just a stripped down variant of soreceive().

It is not yet enabled by default on TCP sockets.  Instead it is
commented out in the protocol initialization in tcp_usrreq.c
until more widespread testing has been done.

Testers, especially with 10GigE gear, are welcome.

MFP4:	r164817 //depot/user/andre/soreceive_stream/
This commit is contained in:
Andre Oppermann 2009-06-22 23:08:05 +00:00
parent 412501cdfd
commit ef760e6ad2
3 changed files with 205 additions and 0 deletions

View File

@ -1856,6 +1856,202 @@ soreceive_generic(struct socket *so, struct sockaddr **psa, struct uio *uio,
return (error);
}
/*
* Optimized version of soreceive() for stream (TCP) sockets.
*/
int
soreceive_stream(struct socket *so, struct sockaddr **psa, struct uio *uio,
struct mbuf **mp0, struct mbuf **controlp, int *flagsp)
{
int len = 0, error = 0, flags, oresid;
struct sockbuf *sb;
struct mbuf *m, *n = NULL;
/* We only do stream sockets. */
if (so->so_type != SOCK_STREAM)
return (EINVAL);
if (psa != NULL)
*psa = NULL;
if (controlp != NULL)
return (EINVAL);
if (flagsp != NULL)
flags = *flagsp &~ MSG_EOR;
else
flags = 0;
if (flags & MSG_OOB)
return (soreceive_rcvoob(so, uio, flags));
if (mp0 != NULL)
*mp0 = NULL;
sb = &so->so_rcv;
/* Prevent other readers from entering the socket. */
error = sblock(sb, SBLOCKWAIT(flags));
if (error)
goto out;
SOCKBUF_LOCK(sb);
/* Easy one, no space to copyout anything. */
if (uio->uio_resid == 0) {
error = EINVAL;
goto out;
}
oresid = uio->uio_resid;
/* We will never ever get anything unless we are connected. */
if (!(so->so_state & (SS_ISCONNECTED|SS_ISDISCONNECTED))) {
/* When disconnecting there may be still some data left. */
if (sb->sb_cc > 0)
goto deliver;
if (!(so->so_state & SS_ISDISCONNECTED))
error = ENOTCONN;
goto out;
}
/* Socket buffer is empty and we shall not block. */
if (sb->sb_cc == 0 &&
((sb->sb_flags & SS_NBIO) || (flags & (MSG_DONTWAIT|MSG_NBIO)))) {
error = EAGAIN;
goto out;
}
restart:
SOCKBUF_LOCK_ASSERT(&so->so_rcv);
/* Abort if socket has reported problems. */
if (so->so_error) {
if (sb->sb_cc > 0)
goto deliver;
if (oresid > uio->uio_resid)
goto out;
error = so->so_error;
if (!(flags & MSG_PEEK))
so->so_error = 0;
goto out;
}
/* Door is closed. Deliver what is left, if any. */
if (sb->sb_state & SBS_CANTRCVMORE) {
if (sb->sb_cc > 0)
goto deliver;
else
goto out;
}
/* Socket buffer got some data that we shall deliver now. */
if (sb->sb_cc > 0 && !(flags & MSG_WAITALL) &&
((sb->sb_flags & SS_NBIO) ||
(flags & (MSG_DONTWAIT|MSG_NBIO)) ||
sb->sb_cc >= sb->sb_lowat ||
sb->sb_cc >= uio->uio_resid ||
sb->sb_cc >= sb->sb_hiwat) ) {
goto deliver;
}
/* On MSG_WAITALL we must wait until all data or error arrives. */
if ((flags & MSG_WAITALL) &&
(sb->sb_cc >= uio->uio_resid || sb->sb_cc >= sb->sb_lowat))
goto deliver;
/*
* Wait and block until (more) data comes in.
* NB: Drops the sockbuf lock during wait.
*/
error = sbwait(sb);
if (error)
goto out;
goto restart;
deliver:
SOCKBUF_LOCK_ASSERT(&so->so_rcv);
KASSERT(sb->sb_cc > 0, ("%s: sockbuf empty", __func__));
KASSERT(sb->sb_mb != NULL, ("%s: sb_mb == NULL", __func__));
/* Statistics. */
if (uio->uio_td)
uio->uio_td->td_ru.ru_msgrcv++;
/* Fill uio until full or current end of socket buffer is reached. */
len = min(uio->uio_resid, sb->sb_cc);
if (mp0 != NULL) {
/* Dequeue as many mbufs as possible. */
if (!(flags & MSG_PEEK) && len >= sb->sb_mb->m_len) {
for (*mp0 = m = sb->sb_mb;
m != NULL && m->m_len <= len;
m = m->m_next) {
len -= m->m_len;
uio->uio_resid -= m->m_len;
sbfree(sb, m);
n = m;
}
sb->sb_mb = m;
if (sb->sb_mb == NULL)
SB_EMPTY_FIXUP(sb);
n->m_next = NULL;
}
/* Copy the remainder. */
if (len > 0) {
KASSERT(sb->sb_mb != NULL,
("%s: len > 0 && sb->sb_mb empty", __func__));
m = m_copym(sb->sb_mb, 0, len, M_DONTWAIT);
if (m == NULL)
len = 0; /* Don't flush data from sockbuf. */
else
uio->uio_resid -= m->m_len;
if (*mp0 != NULL)
n->m_next = m;
else
*mp0 = m;
if (*mp0 == NULL) {
error = ENOBUFS;
goto out;
}
}
} else {
/* NB: Must unlock socket buffer as uiomove may sleep. */
SOCKBUF_UNLOCK(sb);
error = m_mbuftouio(uio, sb->sb_mb, len);
SOCKBUF_LOCK(sb);
if (error)
goto out;
}
SBLASTRECORDCHK(sb);
SBLASTMBUFCHK(sb);
/*
* Remove the delivered data from the socket buffer unless we
* were only peeking.
*/
if (!(flags & MSG_PEEK)) {
if (len > 0)
sbdrop_locked(sb, len);
/* Notify protocol that we drained some data. */
if ((so->so_proto->pr_flags & PR_WANTRCVD) &&
(((flags & MSG_WAITALL) && uio->uio_resid > 0) ||
!(flags & MSG_SOCALLBCK))) {
SOCKBUF_UNLOCK(sb);
(*so->so_proto->pr_usrreqs->pru_rcvd)(so, flags);
SOCKBUF_LOCK(sb);
}
}
/*
* For MSG_WAITALL we may have to loop again and wait for
* more data to come in.
*/
if ((flags & MSG_WAITALL) && uio->uio_resid > 0)
goto restart;
out:
SOCKBUF_LOCK_ASSERT(sb);
SBLASTRECORDCHK(sb);
SBLASTMBUFCHK(sb);
SOCKBUF_UNLOCK(sb);
sbunlock(sb);
return (error);
}
/*
* Optimized version of soreceive() for simple datagram cases from userspace.
* Unlike in the stream case, we're able to drop a datagram if copyout()

View File

@ -1032,6 +1032,9 @@ struct pr_usrreqs tcp_usrreqs = {
.pru_send = tcp_usr_send,
.pru_shutdown = tcp_usr_shutdown,
.pru_sockaddr = in_getsockaddr,
#if 0
.pru_soreceive = soreceive_stream,
#endif
.pru_sosetlabel = in_pcbsosetlabel,
.pru_close = tcp_usr_close,
};
@ -1053,6 +1056,9 @@ struct pr_usrreqs tcp6_usrreqs = {
.pru_send = tcp_usr_send,
.pru_shutdown = tcp_usr_shutdown,
.pru_sockaddr = in6_mapped_sockaddr,
#if 0
.pru_soreceive = soreceive_stream,
#endif
.pru_sosetlabel = in_pcbsosetlabel,
.pru_close = tcp_usr_close,
};

View File

@ -345,6 +345,9 @@ int sopoll_generic(struct socket *so, int events,
struct ucred *active_cred, struct thread *td);
int soreceive(struct socket *so, struct sockaddr **paddr, struct uio *uio,
struct mbuf **mp0, struct mbuf **controlp, int *flagsp);
/*
 * Receive routine for SOCK_STREAM sockets; takes the same arguments as
 * soreceive().  Returns EINVAL for other socket types or when controlp
 * is non-NULL.
 */
int soreceive_stream(struct socket *so, struct sockaddr **paddr,
struct uio *uio, struct mbuf **mp0, struct mbuf **controlp,
int *flagsp);
int soreceive_dgram(struct socket *so, struct sockaddr **paddr,
struct uio *uio, struct mbuf **mp0, struct mbuf **controlp,
int *flagsp);