Integrate accept locking from rwatson_netperf, introducing a new
global mutex, accept_mtx, which serializes access to the following
fields across all sockets:

          so_qlen          so_incqlen         so_qstate
          so_comp          so_incomp          so_list
          so_head

While providing only coarse granularity, this approach avoids lock
order issues between sockets by avoiding ownership of the fields
by a specific socket and its per-socket mutexes.
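
For reference, the new lock is an ordinary mutex with thin wrapper
macros; the declaration, macros, and initialization added by this
change (see the socketvar.h and socket-layer diffs below) amount to:

        extern struct mtx accept_mtx;
        #define ACCEPT_LOCK()   mtx_lock(&accept_mtx)
        #define ACCEPT_UNLOCK() mtx_unlock(&accept_mtx)

        struct mtx accept_mtx;
        MTX_SYSINIT(accept_mtx, &accept_mtx, "accept", MTX_DEF);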

While here, rewrite soclose(), sofree(), soaccept(), and
sonewconn() to add assertions, close additional races, and address
lock order concerns.  In particular:

- Reorganize the optimistic concurrency behavior in accept1() to
  always allocate a file descriptor with falloc() up front, so that
  once we do find a socket we never hit the "Oh, there wasn't a
  socket" race that could occur when falloc() slept in the old
  code.  That race broke inbound accept() ordering, and backing out
  the socket state changes raced with the protocol layer.  We may
  want to add a lockless read of the queue state if polling of
  empty queues proves to be important to optimize.
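
  Condensed from the accept1() hunk below (error handling and the
  sockaddr copyout omitted), the new ordering is roughly:

        error = falloc(td, &nfp, &fd);  /* allocate the fd up front */
        if (error)
                goto done;
        ACCEPT_LOCK();
        while (TAILQ_EMPTY(&head->so_comp) && head->so_error == 0) {
                /* ... SS_CANTRCVMORE / SS_NBIO checks elided ... */
                error = msleep(&head->so_timeo, &accept_mtx,
                    PSOCK | PCATCH, "accept", 0);
        }
        so = TAILQ_FIRST(&head->so_comp);
        soref(so);                      /* file descriptor reference */
        TAILQ_REMOVE(&head->so_comp, so, so_list);
        head->so_qlen--;
        so->so_qstate &= ~SQ_COMP;
        so->so_head = NULL;
        ACCEPT_UNLOCK();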

- In accept1(), soref() the socket while holding the accept lock
  so that the socket cannot be freed in a race with the protocol
  layer.  Do likewise in the netgraph equivalents of the accept1()
  code.
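
  The netgraph side follows the same pattern; for example,
  ng_ksocket_finish_accept() in the diff below now dequeues and
  references the socket entirely under the accept lock:

        ACCEPT_LOCK();
        so = TAILQ_FIRST(&head->so_comp);
        if (so == NULL) {               /* Should never happen */
                ACCEPT_UNLOCK();
                return;
        }
        TAILQ_REMOVE(&head->so_comp, so, so_list);
        head->so_qlen--;
        so->so_qstate &= ~SQ_COMP;
        so->so_head = NULL;
        soref(so);              /* hold a reference across the unlock */
        so->so_state |= SS_NBIO;
        ACCEPT_UNLOCK();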

- In sonewconn(), once we've committed to inserting the new socket,
  loop waiting for the queue to be small enough to insert it;
  otherwise, races can occur that cause the incomplete socket queue
  to overfill.  In the previous implementation it was sufficient to
  test once, since calling soabort() didn't release synchronization
  that would permit another thread to insert a socket while we
  discarded a previous one.
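
  Condensed from the sonewconn() hunk below, the overflow handling
  now re-checks the limit each time the accept mutex is re-acquired
  around soabort():

        ACCEPT_LOCK();
        while (head->so_incqlen > head->so_qlimit) {
                struct socket *sp;

                sp = TAILQ_FIRST(&head->so_incomp);
                TAILQ_REMOVE(&head->so_incomp, sp, so_list);
                head->so_incqlen--;
                sp->so_qstate &= ~SQ_INCOMP;
                sp->so_head = NULL;
                ACCEPT_UNLOCK();   /* soabort() needs the lock dropped */
                (void) soabort(sp);
                ACCEPT_LOCK();     /* others may have queued meanwhile */
        }
        TAILQ_INSERT_TAIL(&head->so_incomp, so, so_list);
        so->so_qstate |= SQ_INCOMP;
        head->so_incqlen++;
        ACCEPT_UNLOCK();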

- In soclose()/sofree()/et al, it is the responsibility of the
  caller to remove a socket from the incomplete connection queue
  before calling soabort(), which prevents soabort() from having
  to walk into the accept socket to release the socket from its
  queue, and avoids races when releasing the accept mutex to enter
  soabort(), permitting soabort() to avoid lock ordering issues
  with the caller.
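
  In soclose(), for instance, the incomplete-connection queue is now
  drained this way (condensed from the diff below), with each socket
  fully unlinked and its so_head cleared before soabort() runs:

        ACCEPT_LOCK();
        while ((sp = TAILQ_FIRST(&so->so_incomp)) != NULL) {
                TAILQ_REMOVE(&so->so_incomp, sp, so_list);
                so->so_incqlen--;
                sp->so_qstate &= ~SQ_INCOMP;
                sp->so_head = NULL;     /* detach before aborting */
                ACCEPT_UNLOCK();
                (void) soabort(sp);
                ACCEPT_LOCK();
        }
        ACCEPT_UNLOCK();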

- Generally cluster accept queue related operations together
  throughout these functions in order to facilitate locking.

Annotate new locking in socketvar.h.
Robert Watson 2004-06-02 04:15:39 +00:00
parent 33e1041767
commit 2658b3bb8e
Notes: svn2git 2020-12-20 02:59:44 +00:00
svn path=/head/; revision=129979
7 changed files with 197 additions and 126 deletions

View File

@@ -113,32 +113,38 @@ void
soisconnected(so)
struct socket *so;
{
struct socket *head = so->so_head;
struct socket *head;
so->so_state &= ~(SS_ISCONNECTING|SS_ISDISCONNECTING|SS_ISCONFIRMING);
so->so_state |= SS_ISCONNECTED;
if (head && (so->so_qstate & SQ_INCOMP)) {
if ((so->so_options & SO_ACCEPTFILTER) != 0) {
so->so_upcall = head->so_accf->so_accept_filter->accf_callback;
ACCEPT_LOCK();
head = so->so_head;
if (head != NULL && (so->so_qstate & SQ_INCOMP)) {
if ((so->so_options & SO_ACCEPTFILTER) == 0) {
TAILQ_REMOVE(&head->so_incomp, so, so_list);
head->so_incqlen--;
so->so_qstate &= ~SQ_INCOMP;
TAILQ_INSERT_TAIL(&head->so_comp, so, so_list);
head->so_qlen++;
so->so_qstate |= SQ_COMP;
ACCEPT_UNLOCK();
sorwakeup(head);
wakeup_one(&head->so_timeo);
} else {
ACCEPT_UNLOCK();
so->so_upcall =
head->so_accf->so_accept_filter->accf_callback;
so->so_upcallarg = head->so_accf->so_accept_filter_arg;
so->so_rcv.sb_flags |= SB_UPCALL;
so->so_options &= ~SO_ACCEPTFILTER;
so->so_upcall(so, so->so_upcallarg, M_TRYWAIT);
return;
}
TAILQ_REMOVE(&head->so_incomp, so, so_list);
head->so_incqlen--;
so->so_qstate &= ~SQ_INCOMP;
TAILQ_INSERT_TAIL(&head->so_comp, so, so_list);
head->so_qlen++;
so->so_qstate |= SQ_COMP;
sorwakeup(head);
wakeup_one(&head->so_timeo);
} else {
wakeup(&so->so_timeo);
sorwakeup(so);
sowwakeup(so);
return;
}
ACCEPT_UNLOCK();
wakeup(&so->so_timeo);
sorwakeup(so);
sowwakeup(so);
}
void
@@ -182,8 +188,12 @@ sonewconn(head, connstatus)
int connstatus;
{
register struct socket *so;
int over;
if (head->so_qlen > 3 * head->so_qlimit / 2)
ACCEPT_LOCK();
over = (head->so_qlen > 3 * head->so_qlimit / 2);
ACCEPT_UNLOCK();
if (over)
return ((struct socket *)0);
so = soalloc(M_NOWAIT);
if (so == NULL)
@@ -206,25 +216,39 @@ sonewconn(head, connstatus)
sodealloc(so);
return ((struct socket *)0);
}
ACCEPT_LOCK();
if (connstatus) {
TAILQ_INSERT_TAIL(&head->so_comp, so, so_list);
so->so_qstate |= SQ_COMP;
head->so_qlen++;
} else {
if (head->so_incqlen > head->so_qlimit) {
/*
* XXXRW: Keep removing sockets from the head until there's
* room for us to insert on the tail. In pre-locking
* revisions, this was a simple if(), but as we could be
* racing with other threads and soabort() requires dropping
* locks, we must loop waiting for the condition to be true.
*/
while (head->so_incqlen > head->so_qlimit) {
struct socket *sp;
sp = TAILQ_FIRST(&head->so_incomp);
TAILQ_REMOVE(&so->so_incomp, sp, so_list);
head->so_incqlen--;
sp->so_qstate &= ~SQ_INCOMP;
sp->so_head = NULL;
ACCEPT_UNLOCK();
(void) soabort(sp);
ACCEPT_LOCK();
}
TAILQ_INSERT_TAIL(&head->so_incomp, so, so_list);
so->so_qstate |= SQ_INCOMP;
head->so_incqlen++;
}
ACCEPT_UNLOCK();
if (connstatus) {
so->so_state |= connstatus;
sorwakeup(head);
wakeup_one(&head->so_timeo);
so->so_state |= connstatus;
}
return (so);
}

View File

@@ -106,6 +106,9 @@ SYSCTL_INT(_kern_ipc_zero_copy, OID_AUTO, send, CTLFLAG_RW,
&so_zero_copy_send, 0, "Enable zero copy send");
#endif /* ZERO_COPY_SOCKETS */
struct mtx accept_mtx;
MTX_SYSINIT(accept_mtx, &accept_mtx, "accept", MTX_DEF);
/*
* Socket operation routines.
@@ -266,11 +269,13 @@ solisten(so, backlog, td)
splx(s);
return (error);
}
ACCEPT_LOCK();
if (TAILQ_EMPTY(&so->so_comp))
so->so_options |= SO_ACCEPTCONN;
if (backlog < 0 || backlog > somaxconn)
backlog = somaxconn;
so->so_qlimit = backlog;
ACCEPT_UNLOCK();
splx(s);
return (0);
}
@@ -286,25 +291,42 @@ sofree(so)
if (so->so_pcb != NULL || (so->so_state & SS_NOFDREF) == 0)
return;
if (so->so_head != NULL) {
head = so->so_head;
if (so->so_qstate & SQ_INCOMP) {
TAILQ_REMOVE(&head->so_incomp, so, so_list);
head->so_incqlen--;
} else if (so->so_qstate & SQ_COMP) {
/*
* We must not decommission a socket that's
* on the accept(2) queue. If we do, then
* accept(2) may hang after select(2) indicated
* that the listening socket was ready.
*/
ACCEPT_LOCK();
head = so->so_head;
if (head != NULL) {
KASSERT((so->so_qstate & SQ_COMP) != 0 ||
(so->so_qstate & SQ_INCOMP) != 0,
("sofree: so_head != NULL, but neither SQ_COMP nor "
"SQ_INCOMP"));
KASSERT((so->so_qstate & SQ_COMP) == 0 ||
(so->so_qstate & SQ_INCOMP) == 0,
("sofree: so->so_qstate is SQ_COMP and also SQ_INCOMP"));
/*
* accept(2) is responsible for draining the completed
* connection queue and freeing those sockets, so
* we just return here if this socket is currently
* on the completed connection queue. Otherwise,
* accept(2) may hang after select(2) has indicated
* that a listening socket was ready. If it's an
* incomplete connection, we remove it from the queue
* and free it; otherwise, it won't be released until
* the listening socket is closed.
*/
if ((so->so_qstate & SQ_COMP) != 0) {
ACCEPT_UNLOCK();
return;
} else {
panic("sofree: not queued");
}
TAILQ_REMOVE(&head->so_incomp, so, so_list);
head->so_incqlen--;
so->so_qstate &= ~SQ_INCOMP;
so->so_head = NULL;
}
KASSERT((so->so_qstate & SQ_COMP) == 0 &&
(so->so_qstate & SQ_INCOMP) == 0,
("sofree: so_head == NULL, but still SQ_COMP(%d) or SQ_INCOMP(%d)",
so->so_qstate & SQ_COMP, so->so_qstate & SQ_INCOMP));
ACCEPT_UNLOCK();
so->so_snd.sb_flags |= SB_NOINTR;
(void)sblock(&so->so_snd, M_WAITOK);
s = splimp();
@@ -334,22 +356,27 @@ soclose(so)
funsetown(&so->so_sigio);
if (so->so_options & SO_ACCEPTCONN) {
struct socket *sp, *sonext;
sp = TAILQ_FIRST(&so->so_incomp);
for (; sp != NULL; sp = sonext) {
sonext = TAILQ_NEXT(sp, so_list);
struct socket *sp;
ACCEPT_LOCK();
while ((sp = TAILQ_FIRST(&so->so_incomp)) != NULL) {
TAILQ_REMOVE(&so->so_incomp, sp, so_list);
so->so_incqlen--;
sp->so_qstate &= ~SQ_INCOMP;
sp->so_head = NULL;
ACCEPT_UNLOCK();
(void) soabort(sp);
ACCEPT_LOCK();
}
for (sp = TAILQ_FIRST(&so->so_comp); sp != NULL; sp = sonext) {
sonext = TAILQ_NEXT(sp, so_list);
/* Dequeue from so_comp since sofree() won't do it */
while ((sp = TAILQ_FIRST(&so->so_comp)) != NULL) {
TAILQ_REMOVE(&so->so_comp, sp, so_list);
so->so_qlen--;
sp->so_qstate &= ~SQ_COMP;
sp->so_head = NULL;
ACCEPT_UNLOCK();
(void) soabort(sp);
ACCEPT_LOCK();
}
ACCEPT_UNLOCK();
}
if (so->so_pcb == NULL)
goto discard;

View File

@@ -255,7 +255,7 @@ accept1(td, uap, compat)
struct file *nfp = NULL;
struct sockaddr *sa;
socklen_t namelen;
int error, s;
int error;
struct socket *head, *so;
int fd;
u_int fflag;
@@ -266,84 +266,69 @@ accept1(td, uap, compat)
if (uap->name) {
error = copyin(uap->anamelen, &namelen, sizeof (namelen));
if(error)
goto done3;
if (namelen < 0) {
error = EINVAL;
goto done3;
}
return (error);
if (namelen < 0)
return (EINVAL);
}
NET_LOCK_GIANT();
error = fgetsock(td, uap->s, &head, &fflag);
if (error)
goto done2;
s = splnet();
if ((head->so_options & SO_ACCEPTCONN) == 0) {
splx(s);
error = EINVAL;
goto done;
}
error = falloc(td, &nfp, &fd);
if (error)
goto done;
ACCEPT_LOCK();
if ((head->so_state & SS_NBIO) && TAILQ_EMPTY(&head->so_comp)) {
ACCEPT_UNLOCK();
error = EWOULDBLOCK;
goto done;
}
while (TAILQ_EMPTY(&head->so_comp) && head->so_error == 0) {
if (head->so_state & SS_CANTRCVMORE) {
head->so_error = ECONNABORTED;
break;
}
if ((head->so_state & SS_NBIO) != 0) {
head->so_error = EWOULDBLOCK;
break;
}
error = tsleep(&head->so_timeo, PSOCK | PCATCH,
error = msleep(&head->so_timeo, &accept_mtx, PSOCK | PCATCH,
"accept", 0);
if (error) {
splx(s);
ACCEPT_UNLOCK();
goto done;
}
}
if (head->so_error) {
error = head->so_error;
head->so_error = 0;
splx(s);
ACCEPT_UNLOCK();
goto done;
}
/*
* At this point we know that there is at least one connection
* ready to be accepted. Remove it from the queue prior to
* allocating the file descriptor for it since falloc() may
* block allowing another process to accept the connection
* instead.
*/
so = TAILQ_FIRST(&head->so_comp);
KASSERT(!(so->so_qstate & SQ_INCOMP), ("accept1: so SQ_INCOMP"));
KASSERT(so->so_qstate & SQ_COMP, ("accept1: so not SQ_COMP"));
soref(so); /* file descriptor reference */
TAILQ_REMOVE(&head->so_comp, so, so_list);
head->so_qlen--;
so->so_qstate &= ~SQ_COMP;
so->so_head = NULL;
ACCEPT_UNLOCK();
error = falloc(td, &nfp, &fd);
if (error) {
/*
* Probably ran out of file descriptors. Put the
* unaccepted connection back onto the queue and
* do another wakeup so some other process might
* have a chance at it.
*/
TAILQ_INSERT_HEAD(&head->so_comp, so, so_list);
head->so_qlen++;
wakeup_one(&head->so_timeo);
splx(s);
goto done;
}
/* An extra reference on `nfp' has been held for us by falloc(). */
td->td_retval[0] = fd;
/* connection has been removed from the listen queue */
KNOTE(&head->so_rcv.sb_sel.si_note, 0);
so->so_qstate &= ~SQ_COMP;
so->so_head = NULL;
pgid = fgetown(&head->so_sigio);
if (pgid != 0)
fsetown(pgid, &so->so_sigio);
FILE_LOCK(nfp);
soref(so); /* file descriptor reference */
nfp->f_data = so; /* nfp has ref count from falloc */
nfp->f_flag = fflag;
nfp->f_ops = &socketops;
@@ -372,7 +357,6 @@ accept1(td, uap, compat)
namelen = 0;
if (uap->name)
goto gotnoname;
splx(s);
error = 0;
goto done;
}
@@ -410,7 +394,6 @@ accept1(td, uap, compat)
FILEDESC_UNLOCK(fdp);
}
}
splx(s);
/*
* Release explicitly held references before returning.
@@ -421,7 +404,6 @@ accept1(td, uap, compat)
fputsock(head);
done2:
NET_UNLOCK_GIANT();
done3:
return (error);
}

View File

@@ -1353,10 +1353,11 @@ ng_btsocket_rfcomm_session_accept(ng_btsocket_rfcomm_session_p s0)
return (error);
}
ACCEPT_LOCK();
if (TAILQ_EMPTY(&s0->l2so->so_comp)) {
ACCEPT_UNLOCK();
if (s0->l2so->so_state & SS_CANTRCVMORE)
return (ECONNABORTED);
return (EWOULDBLOCK);
}
@@ -1367,11 +1368,11 @@ ng_btsocket_rfcomm_session_accept(ng_btsocket_rfcomm_session_p s0)
TAILQ_REMOVE(&s0->l2so->so_comp, l2so, so_list);
s0->l2so->so_qlen --;
soref(l2so);
l2so->so_qstate &= ~SQ_COMP;
l2so->so_state |= SS_NBIO;
l2so->so_head = NULL;
soref(l2so);
l2so->so_state |= SS_NBIO;
ACCEPT_UNLOCK();
error = soaccept(l2so, (struct sockaddr **) &l2sa);
if (error != 0) {

View File

@@ -1169,6 +1169,7 @@ ng_ksocket_check_accept(priv_p priv)
head->so_error = 0;
return error;
}
/* Unlocked read. */
if (TAILQ_EMPTY(&head->so_comp)) {
if (head->so_state & SS_CANTRCVMORE)
return ECONNABORTED;
@@ -1194,20 +1195,22 @@ ng_ksocket_finish_accept(priv_p priv)
int len;
int error;
ACCEPT_LOCK();
so = TAILQ_FIRST(&head->so_comp);
if (so == NULL) /* Should never happen */
if (so == NULL) { /* Should never happen */
ACCEPT_UNLOCK();
return;
}
TAILQ_REMOVE(&head->so_comp, so, so_list);
head->so_qlen--;
so->so_qstate &= ~SQ_COMP;
so->so_head = NULL;
soref(so);
so->so_state |= SS_NBIO;
ACCEPT_UNLOCK();
/* XXX KNOTE(&head->so_rcv.sb_sel.si_note, 0); */
soref(so);
so->so_qstate &= ~SQ_COMP;
so->so_state |= SS_NBIO;
so->so_head = NULL;
soaccept(so, &sa);
len = OFFSETOF(struct ng_ksocket_accept, addr);

View File

@@ -60,7 +60,7 @@ struct socket {
short so_options; /* from socket call, see socket.h */
short so_linger; /* time to linger while closing */
short so_state; /* internal state flags SS_* */
int so_qstate; /* internal state flags SQ_* */
int so_qstate; /* (e) internal state flags SQ_* */
void *so_pcb; /* protocol control block */
struct protosw *so_proto; /* (a) protocol handle */
/*
@@ -74,14 +74,14 @@ struct socket {
* We allow connections to queue up based on current queue lengths
* and limit on number of queued connections for this socket.
*/
struct socket *so_head; /* back pointer to accept socket */
TAILQ_HEAD(, socket) so_incomp; /* queue of partial unaccepted connections */
TAILQ_HEAD(, socket) so_comp; /* queue of complete unaccepted connections */
TAILQ_ENTRY(socket) so_list; /* list of unaccepted connections */
short so_qlen; /* number of unaccepted connections */
short so_incqlen; /* number of unaccepted incomplete
struct socket *so_head; /* (e) back pointer to accept socket */
TAILQ_HEAD(, socket) so_incomp; /* (e) queue of partial unaccepted connections */
TAILQ_HEAD(, socket) so_comp; /* (e) queue of complete unaccepted connections */
TAILQ_ENTRY(socket) so_list; /* (e) list of unaccepted connections */
short so_qlen; /* (e) number of unaccepted connections */
short so_incqlen; /* (e) number of unaccepted incomplete
connections */
short so_qlimit; /* max number queued connections */
short so_qlimit; /* (e) max number queued connections */
short so_timeo; /* connection timeout */
u_short so_error; /* error affecting connection */
struct sigio *so_sigio; /* [sg] information for async I/O or
@@ -140,6 +140,16 @@ struct socket {
} \
} while (/*CONSTCOND*/0)
/*
* Global accept mutex to serialize access to accept queues and
* fields associated with multiple sockets. This allows us to
* avoid defining a lock order between listen and accept sockets
* until such time as it proves to be a good idea.
*/
extern struct mtx accept_mtx;
#define ACCEPT_LOCK() mtx_lock(&accept_mtx)
#define ACCEPT_UNLOCK() mtx_unlock(&accept_mtx)
/*
* Socket state bits.
*/