sblock() implements a sleep lock by interlocking SB_WANT and SB_LOCK flags
on each socket buffer with the socket buffer's mutex.  This sleep lock is
used to serialize I/O on sockets in order to prevent I/O interlacing.

This change replaces the custom sleep lock with an sx(9) lock, which
results in marginally better performance, better handling of contention
during simultaneous socket I/O across multiple threads, and a cleaner
separation between the different layers of locking in socket buffers.
Specifically, the socket buffer mutex is now solely responsible for
serializing simultaneous operation on the socket buffer data structure,
and not for I/O serialization.

While here, fix two historic bugs:

(1) a bug allowing I/O to be occasionally interlaced during long I/O
    operations (discovered by Isilon).

(2) a bug in which failed non-blocking acquisition of the socket buffer
    I/O serialization lock might be ignored (discovered by sam).

SCTP portion of this patch submitted by rrs.
This commit is contained in:
Robert Watson 2007-05-03 14:42:42 +00:00
parent 61f31ed6cf
commit 7abab91135
Notes: svn2git 2020-12-20 02:59:44 +00:00
svn path=/head/; revision=169236
6 changed files with 98 additions and 129 deletions

View File

@ -340,14 +340,6 @@ db_print_sbflags(short sb_flags)
int comma;
comma = 0;
if (sb_flags & SB_LOCK) {
db_printf("%sSB_LOCK", comma ? ", " : "");
comma = 1;
}
if (sb_flags & SB_WANT) {
db_printf("%sSB_WANT", comma ? ", " : "");
comma = 1;
}
if (sb_flags & SB_WAIT) {
db_printf("%sSB_WAIT", comma ? ", " : "");
comma = 1;

View File

@ -46,6 +46,7 @@ __FBSDID("$FreeBSD$");
#include <sys/signalvar.h>
#include <sys/socket.h>
#include <sys/socketvar.h>
#include <sys/sx.h>
#include <sys/sysctl.h>
/*
@ -132,27 +133,25 @@ sbwait(struct sockbuf *sb)
sb->sb_timeo));
}
/*
* Lock a sockbuf already known to be locked; return any error returned from
* sleep (EINTR).
*/
int
sb_lock(struct sockbuf *sb)
sblock(struct sockbuf *sb, int flags)
{
int error;
SOCKBUF_LOCK_ASSERT(sb);
while (sb->sb_flags & SB_LOCK) {
sb->sb_flags |= SB_WANT;
error = msleep(&sb->sb_flags, &sb->sb_mtx,
(sb->sb_flags & SB_NOINTR) ? PSOCK : PSOCK|PCATCH,
"sblock", 0);
if (error)
return (error);
if (flags == M_WAITOK) {
sx_xlock(&sb->sb_sx);
return (0);
} else {
if (sx_try_xlock(&sb->sb_sx) == 0)
return (EWOULDBLOCK);
return (0);
}
sb->sb_flags |= SB_LOCK;
return (0);
}
void
sbunlock(struct sockbuf *sb)
{
sx_xunlock(&sb->sb_sx);
}
/*
@ -797,8 +796,6 @@ static void
sbflush_internal(struct sockbuf *sb)
{
if (sb->sb_flags & SB_LOCK)
panic("sbflush_internal: locked");
while (sb->sb_mbcnt) {
/*
* Don't call sbdrop(sb, 0) if the leading mbuf is non-empty:

View File

@ -2,7 +2,7 @@
* Copyright (c) 1982, 1986, 1988, 1990, 1993
* The Regents of the University of California.
* Copyright (c) 2004 The FreeBSD Foundation
* Copyright (c) 2004-2006 Robert N. M. Watson
* Copyright (c) 2004-2007 Robert N. M. Watson
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -124,6 +124,7 @@ __FBSDID("$FreeBSD$");
#include <sys/resourcevar.h>
#include <sys/signalvar.h>
#include <sys/stat.h>
#include <sys/sx.h>
#include <sys/sysctl.h>
#include <sys/uio.h>
#include <sys/jail.h>
@ -273,6 +274,8 @@ soalloc(void)
#endif
SOCKBUF_LOCK_INIT(&so->so_snd, "so_snd");
SOCKBUF_LOCK_INIT(&so->so_rcv, "so_rcv");
sx_init(&so->so_snd.sb_sx, "so_snd_sx");
sx_init(&so->so_rcv.sb_sx, "so_rcv_sx");
TAILQ_INIT(&so->so_aiojobq);
mtx_lock(&so_global_mtx);
so->so_gencnt = ++so_gencnt;
@ -312,6 +315,8 @@ sodealloc(struct socket *so)
mac_destroy_socket(so);
#endif
crfree(so->so_cred);
sx_destroy(&so->so_snd.sb_sx);
sx_destroy(&so->so_rcv.sb_sx);
SOCKBUF_LOCK_DESTROY(&so->so_snd);
SOCKBUF_LOCK_DESTROY(&so->so_rcv);
uma_zfree(socket_zone, so);
@ -624,11 +629,8 @@ sofree(so)
*
* Notice that the socket buffer and kqueue state are torn down
* before calling pru_detach. This means that protocols should not
* assume they can perform socket wakeups, etc, in their detach
* code.
* assume they can perform socket wakeups, etc, in their detach code.
*/
KASSERT((so->so_snd.sb_flags & SB_LOCK) == 0, ("sofree: snd sblock"));
KASSERT((so->so_rcv.sb_flags & SB_LOCK) == 0, ("sofree: rcv sblock"));
sbdestroy(&so->so_snd, so);
sbdestroy(&so->so_rcv, so);
knlist_destroy(&so->so_rcv.sb_sel.si_note);
@ -1123,7 +1125,6 @@ sosend_dgram(so, addr, uio, top, control, flags, td)
* counts if EINTR/ERESTART are returned. Data and control buffers are freed
* on return.
*/
#define snderr(errno) { error = (errno); goto release; }
int
sosend_generic(so, addr, uio, top, control, flags, td)
struct socket *so;
@ -1165,19 +1166,22 @@ sosend_generic(so, addr, uio, top, control, flags, td)
if (control != NULL)
clen = control->m_len;
SOCKBUF_LOCK(&so->so_snd);
restart:
SOCKBUF_LOCK_ASSERT(&so->so_snd);
error = sblock(&so->so_snd, SBLOCKWAIT(flags));
if (error)
goto out_locked;
goto out;
restart:
do {
SOCKBUF_LOCK_ASSERT(&so->so_snd);
if (so->so_snd.sb_state & SBS_CANTSENDMORE)
snderr(EPIPE);
SOCKBUF_LOCK(&so->so_snd);
if (so->so_snd.sb_state & SBS_CANTSENDMORE) {
SOCKBUF_UNLOCK(&so->so_snd);
error = EPIPE;
goto release;
}
if (so->so_error) {
error = so->so_error;
so->so_error = 0;
SOCKBUF_UNLOCK(&so->so_snd);
goto release;
}
if ((so->so_state & SS_ISCONNECTED) == 0) {
@ -1190,26 +1194,40 @@ sosend_generic(so, addr, uio, top, control, flags, td)
if ((so->so_proto->pr_flags & PR_CONNREQUIRED) &&
(so->so_proto->pr_flags & PR_IMPLOPCL) == 0) {
if ((so->so_state & SS_ISCONFIRMING) == 0 &&
!(resid == 0 && clen != 0))
snderr(ENOTCONN);
} else if (addr == NULL)
snderr(so->so_proto->pr_flags & PR_CONNREQUIRED ?
ENOTCONN : EDESTADDRREQ);
!(resid == 0 && clen != 0)) {
SOCKBUF_UNLOCK(&so->so_snd);
error = ENOTCONN;
goto release;
}
} else if (addr == NULL) {
SOCKBUF_UNLOCK(&so->so_snd);
if (so->so_proto->pr_flags & PR_CONNREQUIRED)
error = ENOTCONN;
else
error = EDESTADDRREQ;
goto release;
}
}
space = sbspace(&so->so_snd);
if (flags & MSG_OOB)
space += 1024;
if ((atomic && resid > so->so_snd.sb_hiwat) ||
clen > so->so_snd.sb_hiwat)
snderr(EMSGSIZE);
clen > so->so_snd.sb_hiwat) {
SOCKBUF_UNLOCK(&so->so_snd);
error = EMSGSIZE;
goto release;
}
if (space < resid + clen &&
(atomic || space < so->so_snd.sb_lowat || space < clen)) {
if ((so->so_state & SS_NBIO) || (flags & MSG_NBIO))
snderr(EWOULDBLOCK);
sbunlock(&so->so_snd);
if ((so->so_state & SS_NBIO) || (flags & MSG_NBIO)) {
SOCKBUF_UNLOCK(&so->so_snd);
error = EWOULDBLOCK;
goto release;
}
error = sbwait(&so->so_snd);
if (error)
goto out_locked;
goto release;
SOCKBUF_UNLOCK(&so->so_snd);
goto restart;
}
SOCKBUF_UNLOCK(&so->so_snd);
@ -1223,10 +1241,8 @@ sosend_generic(so, addr, uio, top, control, flags, td)
#ifdef ZERO_COPY_SOCKETS
error = sosend_copyin(uio, &top, atomic,
&space, flags);
if (error != 0) {
SOCKBUF_LOCK(&so->so_snd);
if (error != 0)
goto release;
}
#else
/*
* Copy the data from userland into a mbuf
@ -1238,7 +1254,6 @@ sosend_generic(so, addr, uio, top, control, flags, td)
(atomic ? M_PKTHDR : 0) |
((flags & MSG_EOR) ? M_EOR : 0));
if (top == NULL) {
SOCKBUF_LOCK(&so->so_snd);
error = EFAULT; /* only possible error */
goto release;
}
@ -1283,20 +1298,13 @@ sosend_generic(so, addr, uio, top, control, flags, td)
clen = 0;
control = NULL;
top = NULL;
if (error) {
SOCKBUF_LOCK(&so->so_snd);
if (error)
goto release;
}
} while (resid && space > 0);
SOCKBUF_LOCK(&so->so_snd);
} while (resid);
release:
SOCKBUF_LOCK_ASSERT(&so->so_snd);
sbunlock(&so->so_snd);
out_locked:
SOCKBUF_LOCK_ASSERT(&so->so_snd);
SOCKBUF_UNLOCK(&so->so_snd);
out:
if (top != NULL)
m_freem(top);
@ -1304,7 +1312,6 @@ sosend_generic(so, addr, uio, top, control, flags, td)
m_freem(control);
return (error);
}
#undef snderr
int
sosend(so, addr, uio, top, control, flags, td)
@ -1462,13 +1469,12 @@ soreceive_generic(so, psa, uio, mp0, controlp, flagsp)
&& uio->uio_resid)
(*pr->pr_usrreqs->pru_rcvd)(so, 0);
SOCKBUF_LOCK(&so->so_rcv);
restart:
SOCKBUF_LOCK_ASSERT(&so->so_rcv);
error = sblock(&so->so_rcv, SBLOCKWAIT(flags));
if (error)
goto out;
return (error);
restart:
SOCKBUF_LOCK(&so->so_rcv);
m = so->so_rcv.sb_mb;
/*
* If we have less data than requested, block awaiting more (subject
@ -1495,14 +1501,16 @@ soreceive_generic(so, psa, uio, mp0, controlp, flagsp)
error = so->so_error;
if ((flags & MSG_PEEK) == 0)
so->so_error = 0;
SOCKBUF_UNLOCK(&so->so_rcv);
goto release;
}
SOCKBUF_LOCK_ASSERT(&so->so_rcv);
if (so->so_rcv.sb_state & SBS_CANTRCVMORE) {
if (m)
goto dontblock;
else
if (m == NULL) {
SOCKBUF_UNLOCK(&so->so_rcv);
goto release;
} else
goto dontblock;
}
for (; m != NULL; m = m->m_next)
if (m->m_type == MT_OOBDATA || (m->m_flags & M_EOR)) {
@ -1511,22 +1519,26 @@ soreceive_generic(so, psa, uio, mp0, controlp, flagsp)
}
if ((so->so_state & (SS_ISCONNECTED|SS_ISCONNECTING)) == 0 &&
(so->so_proto->pr_flags & PR_CONNREQUIRED)) {
SOCKBUF_UNLOCK(&so->so_rcv);
error = ENOTCONN;
goto release;
}
if (uio->uio_resid == 0)
if (uio->uio_resid == 0) {
SOCKBUF_UNLOCK(&so->so_rcv);
goto release;
}
if ((so->so_state & SS_NBIO) ||
(flags & (MSG_DONTWAIT|MSG_NBIO))) {
SOCKBUF_UNLOCK(&so->so_rcv);
error = EWOULDBLOCK;
goto release;
}
SBLASTRECORDCHK(&so->so_rcv);
SBLASTMBUFCHK(&so->so_rcv);
sbunlock(&so->so_rcv);
error = sbwait(&so->so_rcv);
SOCKBUF_UNLOCK(&so->so_rcv);
if (error)
goto out;
goto release;
goto restart;
}
dontblock:
@ -1721,6 +1733,7 @@ soreceive_generic(so, psa, uio, mp0, controlp, flagsp)
if (m && pr->pr_flags & PR_ATOMIC &&
((flags & MSG_PEEK) == 0))
(void)sbdroprecord_locked(&so->so_rcv);
SOCKBUF_UNLOCK(&so->so_rcv);
goto release;
}
} else
@ -1822,8 +1835,10 @@ soreceive_generic(so, psa, uio, mp0, controlp, flagsp)
SBLASTRECORDCHK(&so->so_rcv);
SBLASTMBUFCHK(&so->so_rcv);
error = sbwait(&so->so_rcv);
if (error)
if (error) {
SOCKBUF_UNLOCK(&so->so_rcv);
goto release;
}
m = so->so_rcv.sb_mb;
if (m != NULL)
nextrecord = m->m_nextpkt;
@ -1867,18 +1882,15 @@ soreceive_generic(so, psa, uio, mp0, controlp, flagsp)
SOCKBUF_LOCK_ASSERT(&so->so_rcv);
if (orig_resid == uio->uio_resid && orig_resid &&
(flags & MSG_EOR) == 0 && (so->so_rcv.sb_state & SBS_CANTRCVMORE) == 0) {
sbunlock(&so->so_rcv);
SOCKBUF_UNLOCK(&so->so_rcv);
goto restart;
}
SOCKBUF_UNLOCK(&so->so_rcv);
if (flagsp != NULL)
*flagsp |= flags;
release:
SOCKBUF_LOCK_ASSERT(&so->so_rcv);
sbunlock(&so->so_rcv);
out:
SOCKBUF_LOCK_ASSERT(&so->so_rcv);
SOCKBUF_UNLOCK(&so->so_rcv);
return (error);
}
@ -1933,27 +1945,22 @@ sorflush(so)
* however, we have to initialize and destroy the mutex in the copy
* so that dom_dispose() and sbrelease() can lock it as needed.
*/
(void) sblock(sb, M_WAITOK);
SOCKBUF_LOCK(sb);
sb->sb_flags |= SB_NOINTR;
(void) sblock(sb, M_WAITOK);
/*
* socantrcvmore_locked() drops the socket buffer mutex so that it
* can safely perform wakeups. Re-acquire the mutex before
* continuing.
*/
socantrcvmore_locked(so);
SOCKBUF_LOCK(sb);
sbunlock(sb);
/*
* Invalidate/clear most of the sockbuf structure, but leave selinfo
* and mutex data unchanged.
*/
SOCKBUF_LOCK(sb);
bzero(&asb, offsetof(struct sockbuf, sb_startzero));
bcopy(&sb->sb_startzero, &asb.sb_startzero,
sizeof(*sb) - offsetof(struct sockbuf, sb_startzero));
bzero(&sb->sb_startzero,
sizeof(*sb) - offsetof(struct sockbuf, sb_startzero));
SOCKBUF_UNLOCK(sb);
sbunlock(sb);
SOCKBUF_LOCK_INIT(&asb, "so_rcv");
if (pr->pr_flags & PR_RIGHTS && pr->pr_domain->dom_dispose != NULL)

View File

@ -1920,9 +1920,7 @@ kern_sendfile(struct thread *td, struct sendfile_args *uap,
}
/* Protect against multiple writers to the socket. */
SOCKBUF_LOCK(&so->so_snd);
(void) sblock(&so->so_snd, M_WAITOK);
SOCKBUF_UNLOCK(&so->so_snd);
/*
* Loop through the pages of the file, starting with the requested
@ -2196,9 +2194,7 @@ kern_sendfile(struct thread *td, struct sendfile_args *uap,
}
done:
SOCKBUF_LOCK(&so->so_snd);
sbunlock(&so->so_snd);
SOCKBUF_UNLOCK(&so->so_snd);
out:
/*
* If there was no error we have to clear td->td_retval[0]

View File

@ -3862,9 +3862,7 @@ sctp_pull_off_control_to_new_inp(struct sctp_inpcb *old_inp,
old_so = old_inp->sctp_socket;
new_so = new_inp->sctp_socket;
TAILQ_INIT(&tmp_queue);
SOCKBUF_LOCK(&(old_so->so_rcv));
error = sblock(&old_so->so_rcv, waitflags);
SOCKBUF_UNLOCK(&(old_so->so_rcv));
if (error) {
/*
* Gak, can't get sblock, we have a problem. data will be
@ -3903,10 +3901,8 @@ sctp_pull_off_control_to_new_inp(struct sctp_inpcb *old_inp,
}
SCTP_INP_READ_UNLOCK(old_inp);
/* Remove the sb-lock on the old socket */
SOCKBUF_LOCK(&(old_so->so_rcv));
sbunlock(&old_so->so_rcv);
SOCKBUF_UNLOCK(&(old_so->so_rcv));
/* Now we move them over to the new socket buffer */
control = TAILQ_FIRST(&tmp_queue);
SCTP_INP_READ_LOCK(new_inp);
@ -4521,6 +4517,7 @@ sctp_sorecvmsg(struct socket *so,
int alen = 0;
int slen = 0;
int held_length = 0;
int sockbuf_lock = 0;
if (msg_flags) {
in_flags = *msg_flags;
@ -4558,8 +4555,6 @@ sctp_sorecvmsg(struct socket *so,
sctp_misc_ints(SCTP_SORECV_ENTER,
rwnd_req, in_eeor_mode, so->so_rcv.sb_cc, uio->uio_resid);
#endif
SOCKBUF_LOCK(&so->so_rcv);
hold_sblock = 1;
#ifdef SCTP_RECV_RWND_LOGGING
sctp_misc_ints(SCTP_SORECV_ENTERPL,
rwnd_req, block_allowed, so->so_rcv.sb_cc, uio->uio_resid);
@ -4567,15 +4562,12 @@ sctp_sorecvmsg(struct socket *so,
error = sblock(&so->so_rcv, (block_allowed ? M_WAITOK : 0));
sockbuf_lock = 1;
if (error) {
goto release_unlocked;
}
restart:
if (hold_sblock == 0) {
SOCKBUF_LOCK(&so->so_rcv);
hold_sblock = 1;
}
sbunlock(&so->so_rcv);
restart_nosblocks:
if (hold_sblock == 0) {
@ -4704,7 +4696,6 @@ sctp_sorecvmsg(struct socket *so,
SOCKBUF_UNLOCK(&so->so_rcv);
hold_sblock = 0;
}
error = sblock(&so->so_rcv, (block_allowed ? M_WAITOK : 0));
/* we possibly have data we can read */
control = TAILQ_FIRST(&inp->read_queue);
if (control == NULL) {
@ -5529,11 +5520,12 @@ sctp_sorecvmsg(struct socket *so,
SCTP_INP_READ_UNLOCK(inp);
hold_rlock = 0;
}
if (hold_sblock == 0) {
SOCKBUF_LOCK(&so->so_rcv);
hold_sblock = 1;
if (hold_sblock == 1) {
SOCKBUF_UNLOCK(&so->so_rcv);
hold_sblock = 0;
}
sbunlock(&so->so_rcv);
sockbuf_lock = 0;
release_unlocked:
if (hold_sblock) {
@ -5566,6 +5558,9 @@ sctp_sorecvmsg(struct socket *so,
SOCKBUF_UNLOCK(&so->so_rcv);
hold_sblock = 0;
}
if (sockbuf_lock) {
sbunlock(&so->so_rcv);
}
if (freecnt_applied) {
/*
* The lock on the socket buffer protects us so the free

View File

@ -37,6 +37,7 @@
#include <sys/selinfo.h> /* for struct selinfo */
#include <sys/_lock.h>
#include <sys/_mutex.h>
#include <sys/_sx.h>
/*
* Kernel structure per socket.
@ -97,6 +98,7 @@ struct socket {
struct sockbuf {
struct selinfo sb_sel; /* process selecting read/write */
struct mtx sb_mtx; /* sockbuf lock */
struct sx sb_sx; /* prevent I/O interlacing */
short sb_state; /* (c/d) socket state on sockbuf */
#define sb_startzero sb_mb
struct mbuf *sb_mb; /* (c/d) the mbuf chain */
@ -121,8 +123,6 @@ struct socket {
/*
* Constants for sb_flags field of struct sockbuf.
*/
#define SB_LOCK 0x01 /* lock on data queue */
#define SB_WANT 0x02 /* someone is waiting to lock */
#define SB_WAIT 0x04 /* someone is waiting for data/space */
#define SB_SEL 0x08 /* someone is selecting */
#define SB_ASYNC 0x10 /* ASYNC I/O, need signals */
@ -331,25 +331,6 @@ struct xsocket {
(sb)->sb_sndptroff -= (m)->m_len; \
}
/*
* Set lock on sockbuf sb; sleep if lock is already held.
* Unless SB_NOINTR is set on sockbuf, sleep is interruptible.
* Returns error without lock if sleep is interrupted.
*/
#define sblock(sb, wf) ((sb)->sb_flags & SB_LOCK ? \
(((wf) == M_WAITOK) ? sb_lock(sb) : EWOULDBLOCK) : \
((sb)->sb_flags |= SB_LOCK), 0)
/* release lock on sockbuf sb */
#define sbunlock(sb) do { \
SOCKBUF_LOCK_ASSERT(sb); \
(sb)->sb_flags &= ~SB_LOCK; \
if ((sb)->sb_flags & SB_WANT) { \
(sb)->sb_flags &= ~SB_WANT; \
wakeup(&(sb)->sb_flags); \
} \
} while (0)
/*
* soref()/sorele() ref-count the socket structure. Note that you must
* still explicitly close the socket, but the last ref count will free
@ -503,7 +484,8 @@ struct mbuf *
sbsndptr(struct sockbuf *sb, u_int off, u_int len, u_int *moff);
void sbtoxsockbuf(struct sockbuf *sb, struct xsockbuf *xsb);
int sbwait(struct sockbuf *sb);
int sb_lock(struct sockbuf *sb);
int sblock(struct sockbuf *sb, int flags);
void sbunlock(struct sockbuf *sb);
void soabort(struct socket *so);
int soaccept(struct socket *so, struct sockaddr **nam);
int socheckuid(struct socket *so, uid_t uid);