Add soreceive_dgram(9), an optimized socket receive function for use by

datagram-only protocols, such as UDP.  This version removes use of
sblock(), which is not required due to an inability to interlace data
improperly with datagrams, as well as avoiding some of the larger loops
and state management that don't apply on datagram sockets.

This is experimental code, so hook it up only for UDPv4 for testing; if
there are problems we may need to revise it or turn it off by default,
but it offers *significant* performance improvements for threaded UDP
applications such as BIND9, nsd, and memcached using UDP.

Tested by:	kris, ps
This commit is contained in:
Robert Watson 2008-07-02 23:23:27 +00:00
parent 0a2fe17365
commit 5df3e83946
Notes: svn2git 2020-12-20 02:59:44 +00:00
svn path=/head/; revision=180198
3 changed files with 238 additions and 0 deletions

View File

@ -1845,6 +1845,240 @@ soreceive_generic(struct socket *so, struct sockaddr **psa, struct uio *uio,
return (error);
}
/*
* Optimized version of soreceive() for simple datagram cases from userspace;
* this is experimental, and while heavily tested, may contain errors.
*/
int
soreceive_dgram(struct socket *so, struct sockaddr **psa, struct uio *uio,
struct mbuf **mp0, struct mbuf **controlp, int *flagsp)
{
struct mbuf *m, *m2;
int flags, len, error, offset;
struct protosw *pr = so->so_proto;
struct mbuf *nextrecord;
int orig_resid = uio->uio_resid;
if (psa != NULL)
*psa = NULL;
if (controlp != NULL)
*controlp = NULL;
if (flagsp != NULL)
flags = *flagsp &~ MSG_EOR;
else
flags = 0;
/*
* For any complicated cases, fall back to the full
* soreceive_generic().
*/
if (mp0 != NULL || (flags & MSG_PEEK) || (flags & MSG_OOB))
return (soreceive_generic(so, psa, uio, mp0, controlp,
flagsp));
/*
* Enforce restrictions on use.
*/
KASSERT((pr->pr_flags & PR_WANTRCVD) == 0,
("soreceive_dgram: wantrcvd"));
KASSERT(pr->pr_flags & PR_ATOMIC, ("soreceive_dgram: !atomic"));
KASSERT((so->so_rcv.sb_state & SBS_RCVATMARK) == 0,
("soreceive_dgram: SBS_RCVATMARK"));
KASSERT((so->so_proto->pr_flags & PR_CONNREQUIRED) == 0,
("soreceive_dgram: P_CONNREQUIRED"));
restart:
SOCKBUF_LOCK(&so->so_rcv);
m = so->so_rcv.sb_mb;
/*
* If we have less data than requested, block awaiting more (subject
* to any timeout) if:
* 1. the current count is less than the low water mark, or
* 2. MSG_WAITALL is set, and it is possible to do the entire
* receive operation at once if we block (resid <= hiwat).
* 3. MSG_DONTWAIT is not set
* If MSG_WAITALL is set but resid is larger than the receive buffer,
* we have to do the receive in sections, and thus risk returning a
* short count if a timeout or signal occurs after we start.
*/
if (m == NULL) {
KASSERT(m != NULL || !so->so_rcv.sb_cc,
("receive: m == %p so->so_rcv.sb_cc == %u",
m, so->so_rcv.sb_cc));
if (so->so_error) {
if (m != NULL)
goto dontblock;
error = so->so_error;
so->so_error = 0;
SOCKBUF_UNLOCK(&so->so_rcv);
return (error);
}
SOCKBUF_LOCK_ASSERT(&so->so_rcv);
if (so->so_rcv.sb_state & SBS_CANTRCVMORE) {
if (m == NULL) {
SOCKBUF_UNLOCK(&so->so_rcv);
return (0);
} else
goto dontblock;
}
if (uio->uio_resid == 0) {
SOCKBUF_UNLOCK(&so->so_rcv);
return (0);
}
if ((so->so_state & SS_NBIO) ||
(flags & (MSG_DONTWAIT|MSG_NBIO))) {
SOCKBUF_UNLOCK(&so->so_rcv);
error = EWOULDBLOCK;
return (error);
}
SBLASTRECORDCHK(&so->so_rcv);
SBLASTMBUFCHK(&so->so_rcv);
/* XXXRW: sbwait() may not be as happy without sblock(). */
error = sbwait(&so->so_rcv);
SOCKBUF_UNLOCK(&so->so_rcv);
if (error)
return (error);
goto restart;
}
dontblock:
/*
* From this point onward, we maintain 'nextrecord' as a cache of the
* pointer to the next record in the socket buffer. We must keep the
* various socket buffer pointers and local stack versions of the
* pointers in sync, pushing out modifications before dropping the
* socket buffer mutex, and re-reading them when picking it up.
*
* Otherwise, we will race with the network stack appending new data
* or records onto the socket buffer by using inconsistent/stale
* versions of the field, possibly resulting in socket buffer
* corruption.
*
* By holding the high-level sblock(), we prevent simultaneous
* readers from pulling off the front of the socket buffer.
*/
SOCKBUF_LOCK_ASSERT(&so->so_rcv);
if (uio->uio_td)
uio->uio_td->td_ru.ru_msgrcv++;
KASSERT(m == so->so_rcv.sb_mb, ("soreceive: m != so->so_rcv.sb_mb"));
SBLASTRECORDCHK(&so->so_rcv);
SBLASTMBUFCHK(&so->so_rcv);
nextrecord = m->m_nextpkt;
if (pr->pr_flags & PR_ADDR) {
KASSERT(m->m_type == MT_SONAME,
("m->m_type == %d", m->m_type));
orig_resid = 0;
if (psa != NULL)
*psa = sodupsockaddr(mtod(m, struct sockaddr *),
M_NOWAIT);
sbfree(&so->so_rcv, m);
so->so_rcv.sb_mb = m_free(m);
m = so->so_rcv.sb_mb;
sockbuf_pushsync(&so->so_rcv, nextrecord);
}
if (m == NULL) {
/* XXXRW: Can this happen? */
SOCKBUF_UNLOCK(&so->so_rcv);
return (0);
}
KASSERT(m->m_nextpkt == nextrecord,
("soreceive: post-control, nextrecord !sync"));
if (nextrecord == NULL) {
KASSERT(so->so_rcv.sb_mb == m,
("soreceive: post-control, sb_mb!=m"));
KASSERT(so->so_rcv.sb_lastrecord == m,
("soreceive: post-control, lastrecord!=m"));
}
SOCKBUF_LOCK_ASSERT(&so->so_rcv);
SBLASTRECORDCHK(&so->so_rcv);
SBLASTMBUFCHK(&so->so_rcv);
KASSERT(m == so->so_rcv.sb_mb, ("soreceive_dgram: m not sb_mb"));
KASSERT(so->so_rcv.sb_mb->m_nextpkt == nextrecord,
("soreceive_dgram: m_nextpkt != nextrecord"));
/*
* Pull 'm' and its chain off the front of the packet queue.
*/
so->so_rcv.sb_mb = NULL;
sockbuf_pushsync(&so->so_rcv, nextrecord);
/*
* Walk 'm's chain and free that many bytes from the socket buffer.
*/
for (m2 = m; m2 != NULL; m2 = m2->m_next)
sbfree(&so->so_rcv, m2);
/*
* Do a few last checks before we let go of the lock.
*/
SBLASTRECORDCHK(&so->so_rcv);
SBLASTMBUFCHK(&so->so_rcv);
SOCKBUF_UNLOCK(&so->so_rcv);
/*
* Packet to copyout() is now in 'm' and it is disconnected from the
* queue.
*
* Process one or more MT_CONTROL mbufs present before any data mbufs
* in the first mbuf chain on the socket buffer. If MSG_PEEK, we
* just copy the data; if !MSG_PEEK, we call into the protocol to
* perform externalization (or freeing if controlp == NULL).
*/
if (m->m_type == MT_CONTROL) {
struct mbuf *cm = NULL, *cmn;
struct mbuf **cme = &cm;
do {
m2 = m->m_next;
m->m_next = NULL;
*cme = m;
cme = &(*cme)->m_next;
m = m2;
} while (m != NULL && m->m_type == MT_CONTROL);
while (cm != NULL) {
cmn = cm->m_next;
cm->m_next = NULL;
if (pr->pr_domain->dom_externalize != NULL) {
error = (*pr->pr_domain->dom_externalize)
(cm, controlp);
} else if (controlp != NULL)
*controlp = cm;
else
m_freem(cm);
if (controlp != NULL) {
orig_resid = 0;
while (*controlp != NULL)
controlp = &(*controlp)->m_next;
}
cm = cmn;
}
orig_resid = 0; /* XXXRW: why this? */
}
KASSERT(m->m_type == MT_DATA, ("soreceive_dgram: !data"));
offset = 0;
while (m != NULL && uio->uio_resid > 0) {
len = uio->uio_resid;
if (len > m->m_len)
len = m->m_len;
error = uiomove(mtod(m, char *), (int)len, uio);
if (error) {
m_freem(m);
return (error);
}
m = m_free(m);
}
if (m != NULL && pr->pr_flags & PR_ATOMIC)
flags |= MSG_TRUNC;
m_freem(m);
if (flagsp != NULL)
*flagsp |= flags;
return (0);
}
int
soreceive(struct socket *so, struct sockaddr **psa, struct uio *uio,
struct mbuf **mp0, struct mbuf **controlp, int *flagsp)

View File

@ -1164,6 +1164,7 @@ struct pr_usrreqs udp_usrreqs = {
.pru_disconnect = udp_disconnect,
.pru_peeraddr = in_getpeeraddr,
.pru_send = udp_send,
.pru_soreceive = soreceive_dgram,
.pru_sosend = sosend_dgram,
.pru_shutdown = udp_shutdown,
.pru_sockaddr = in_getsockaddr,

View File

@ -548,6 +548,9 @@ int sopoll_generic(struct socket *so, int events,
struct ucred *active_cred, struct thread *td);
int soreceive(struct socket *so, struct sockaddr **paddr, struct uio *uio,
struct mbuf **mp0, struct mbuf **controlp, int *flagsp);
int soreceive_dgram(struct socket *so, struct sockaddr **paddr,
struct uio *uio, struct mbuf **mp0, struct mbuf **controlp,
int *flagsp);
int soreceive_generic(struct socket *so, struct sockaddr **paddr,
struct uio *uio, struct mbuf **mp0, struct mbuf **controlp,
int *flagsp);