Standardize pipe locking, ensuring that everything is locked via

pipelock(), not via a mixture of mutexes and pipelock().  Additionally,
add a few KASSERTS, and change some statements that should have been
KASSERTS into KASSERTS.

As a result of these cleanups, some segments of code have become
significantly shorter and/or easier to read.
This commit is contained in:
Mike Silbersack 2004-08-03 02:59:15 +00:00
parent 7926672ea5
commit e10ecdea88

View File

@ -58,6 +58,16 @@
* Memory usage may be monitored through the sysctls
* kern.ipc.pipes, kern.ipc.pipekva and kern.ipc.pipekvawired.
*
*
* Locking rules: There are two locks present here: A mutex, used via
* PIPE_LOCK, and a flag, used via pipelock(). All locking is done via
* the flag, as mutexes can not persist over uiomove. The mutex
* exists only to guard access to the flag, and is not in itself a
* locking mechanism.
*
* As pipelock() may have to sleep before it can acquire the flag, it
* is important to reread all data after a call to pipelock(); everything
* in the structure may have changed.
*/
#include <sys/cdefs.h>
@ -436,11 +446,8 @@ pipespace(cpipe, size)
int size;
{
/*
* XXXRW: Seems like we should really assert PIPE_LOCKFL on the
* pipe_state here.
*/
KASSERT(cpipe->pipe_state & PIPE_LOCKFL,
("Unlocked pipe passed to pipespace"));
return (pipespace_new(cpipe, size));
}
@ -476,6 +483,8 @@ pipeunlock(cpipe)
{
PIPE_LOCK_ASSERT(cpipe, MA_OWNED);
KASSERT(cpipe->pipe_state & PIPE_LOCKFL,
("Unlocked pipe passed to pipeunlock"));
cpipe->pipe_state &= ~PIPE_LOCKFL;
if (cpipe->pipe_state & PIPE_LWANT) {
cpipe->pipe_state &= ~PIPE_LWANT;
@ -512,9 +521,9 @@ pipe_create(pipe)
* Reduce to 1/4th pipe size if we're over our global max.
*/
if (amountpipekva > maxpipekva / 2)
error = pipespace(pipe, SMALL_PIPE_SIZE);
error = pipespace_new(pipe, SMALL_PIPE_SIZE);
else
error = pipespace(pipe, PIPE_SIZE);
error = pipespace_new(pipe, PIPE_SIZE);
return (error);
}
@ -819,20 +828,26 @@ pipe_direct_write(wpipe, uio)
retry:
PIPE_LOCK_ASSERT(wpipe, MA_OWNED);
error = pipelock(wpipe, 1);
if (wpipe->pipe_state & PIPE_EOF)
error = EPIPE;
if (error) {
pipeunlock(wpipe);
goto error1;
}
while (wpipe->pipe_state & PIPE_DIRECTW) {
if (wpipe->pipe_state & PIPE_WANTR) {
wpipe->pipe_state &= ~PIPE_WANTR;
wakeup(wpipe);
}
wpipe->pipe_state |= PIPE_WANTW;
pipeunlock(wpipe);
error = msleep(wpipe, PIPE_MTX(wpipe),
PRIBIO | PCATCH, "pipdww", 0);
if (error)
goto error1;
if (wpipe->pipe_state & PIPE_EOF) {
error = EPIPE;
goto error1;
}
else
goto retry;
}
wpipe->pipe_map.cnt = 0; /* transfer not ready yet */
if (wpipe->pipe_buffer.cnt > 0) {
@ -840,39 +855,30 @@ retry:
wpipe->pipe_state &= ~PIPE_WANTR;
wakeup(wpipe);
}
wpipe->pipe_state |= PIPE_WANTW;
pipeunlock(wpipe);
error = msleep(wpipe, PIPE_MTX(wpipe),
PRIBIO | PCATCH, "pipdwc", 0);
if (error)
goto error1;
if (wpipe->pipe_state & PIPE_EOF) {
error = EPIPE;
goto error1;
}
goto retry;
else
goto retry;
}
wpipe->pipe_state |= PIPE_DIRECTW;
pipelock(wpipe, 0);
if (wpipe->pipe_state & PIPE_EOF) {
error = EPIPE;
goto error2;
}
PIPE_UNLOCK(wpipe);
error = pipe_build_write_buffer(wpipe, uio);
PIPE_LOCK(wpipe);
pipeunlock(wpipe);
if (error) {
wpipe->pipe_state &= ~PIPE_DIRECTW;
pipeunlock(wpipe);
goto error1;
}
error = 0;
while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
if (wpipe->pipe_state & PIPE_EOF) {
pipelock(wpipe, 0);
pipe_destroy_write_buffer(wpipe);
pipeselwakeup(wpipe);
pipeunlock(wpipe);
@ -884,11 +890,12 @@ retry:
wakeup(wpipe);
}
pipeselwakeup(wpipe);
pipeunlock(wpipe);
error = msleep(wpipe, PIPE_MTX(wpipe), PRIBIO | PCATCH,
"pipdwt", 0);
pipelock(wpipe, 0);
}
pipelock(wpipe,0);
if (wpipe->pipe_state & PIPE_EOF)
error = EPIPE;
if (wpipe->pipe_state & PIPE_DIRECTW) {
@ -900,7 +907,6 @@ retry:
} else {
pipe_destroy_write_buffer(wpipe);
}
error2:
pipeunlock(wpipe);
return (error);
@ -926,16 +932,23 @@ pipe_write(fp, uio, active_cred, flags, td)
wpipe = rpipe->pipe_peer;
PIPE_LOCK(rpipe);
error = pipelock(wpipe, 1);
if (error) {
PIPE_UNLOCK(rpipe);
return (error);
}
/*
* detect loss of pipe read side, issue SIGPIPE if lost.
*/
if ((!wpipe->pipe_present) || (wpipe->pipe_state & PIPE_EOF)) {
pipeunlock(wpipe);
PIPE_UNLOCK(rpipe);
return (EPIPE);
}
#ifdef MAC
error = mac_check_pipe_write(active_cred, wpipe->pipe_pair);
if (error) {
pipeunlock(wpipe);
PIPE_UNLOCK(rpipe);
return (error);
}
@ -953,39 +966,25 @@ pipe_write(fp, uio, active_cred, flags, td)
(wpipe->pipe_buffer.size <= PIPE_SIZE) &&
(wpipe->pipe_buffer.cnt == 0)) {
if ((error = pipelock(wpipe, 1)) == 0) {
if (wpipe->pipe_state & PIPE_EOF)
error = EPIPE;
else {
PIPE_UNLOCK(wpipe);
if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
atomic_add_int(&nbigpipe, 1);
PIPE_LOCK(wpipe);
}
pipeunlock(wpipe);
}
PIPE_UNLOCK(wpipe);
if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
atomic_add_int(&nbigpipe, 1);
PIPE_LOCK(wpipe);
}
/*
* If an early error occured unbusy and return, waking up any pending
* readers.
*/
if (error) {
--wpipe->pipe_busy;
if ((wpipe->pipe_busy == 0) &&
(wpipe->pipe_state & PIPE_WANT)) {
wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
wakeup(wpipe);
}
PIPE_UNLOCK(rpipe);
return(error);
}
pipeunlock(wpipe);
orig_resid = uio->uio_resid;
while (uio->uio_resid) {
int space;
pipelock(wpipe, 0);
if (wpipe->pipe_state & PIPE_EOF) {
pipeunlock(wpipe);
error = EPIPE;
break;
}
#ifndef PIPE_NODIRECT
/*
* If the transfer is large, we can gain performance if
@ -998,6 +997,7 @@ pipe_write(fp, uio, active_cred, flags, td)
*/
if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
(fp->f_flag & FNONBLOCK) == 0) {
pipeunlock(wpipe);
error = pipe_direct_write(wpipe, uio);
if (error)
break;
@ -1012,20 +1012,18 @@ pipe_write(fp, uio, active_cred, flags, td)
* pipe buffer. We break out if a signal occurs or the
* reader goes away.
*/
retrywrite:
while (wpipe->pipe_state & PIPE_DIRECTW) {
if (wpipe->pipe_state & PIPE_DIRECTW) {
if (wpipe->pipe_state & PIPE_WANTR) {
wpipe->pipe_state &= ~PIPE_WANTR;
wakeup(wpipe);
}
pipeunlock(wpipe);
error = msleep(wpipe, PIPE_MTX(rpipe), PRIBIO | PCATCH,
"pipbww", 0);
if (wpipe->pipe_state & PIPE_EOF) {
error = EPIPE;
break;
}
if (error)
break;
else
continue;
}
space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
@ -1035,102 +1033,69 @@ pipe_write(fp, uio, active_cred, flags, td)
space = 0;
if (space > 0) {
if ((error = pipelock(wpipe,1)) == 0) {
int size; /* Transfer size */
int segsize; /* first segment to transfer */
int size; /* Transfer size */
int segsize; /* first segment to transfer */
/*
* It is possible for a direct write/EOF to
* slip in on us... handle them here...
*/
if (wpipe->pipe_state & PIPE_EOF)
goto lost_wpipe;
if (wpipe->pipe_state & PIPE_DIRECTW) {
pipeunlock(wpipe);
goto retrywrite;
}
/*
* If a process blocked in uiomove, our
* value for space might be bad.
*
* XXX will we be ok if the reader has gone
* away here?
*/
if (space > wpipe->pipe_buffer.size -
wpipe->pipe_buffer.cnt) {
pipeunlock(wpipe);
goto retrywrite;
}
/*
* Transfer size is minimum of uio transfer
* and free space in pipe buffer.
*/
if (space > uio->uio_resid)
size = uio->uio_resid;
else
size = space;
/*
* First segment to transfer is minimum of
* transfer size and contiguous space in
* pipe buffer. If first segment to transfer
* is less than the transfer size, we've got
* a wraparound in the buffer.
*/
segsize = wpipe->pipe_buffer.size -
wpipe->pipe_buffer.in;
if (segsize > size)
segsize = size;
/*
* Transfer size is minimum of uio transfer
* and free space in pipe buffer.
*/
if (space > uio->uio_resid)
size = uio->uio_resid;
else
size = space;
/*
* First segment to transfer is minimum of
* transfer size and contiguous space in
* pipe buffer. If first segment to transfer
* is less than the transfer size, we've got
* a wraparound in the buffer.
*/
segsize = wpipe->pipe_buffer.size -
wpipe->pipe_buffer.in;
if (segsize > size)
segsize = size;
/* Transfer first segment */
/* Transfer first segment */
PIPE_UNLOCK(rpipe);
error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
segsize, uio);
PIPE_LOCK(rpipe);
if (error == 0 && segsize < size) {
KASSERT(wpipe->pipe_buffer.in + segsize ==
wpipe->pipe_buffer.size,
("Pipe buffer wraparound disappeared"));
/*
* Transfer remaining part now, to
* support atomic writes. Wraparound
* happened.
*/
PIPE_UNLOCK(rpipe);
error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
segsize, uio);
error = uiomove(
&wpipe->pipe_buffer.buffer[0],
size - segsize, uio);
PIPE_LOCK(rpipe);
if (error == 0 && segsize < size) {
/*
* Transfer remaining part now, to
* support atomic writes. Wraparound
* happened.
*/
if (wpipe->pipe_buffer.in + segsize !=
wpipe->pipe_buffer.size)
panic("Expected pipe buffer "
"wraparound disappeared");
PIPE_UNLOCK(rpipe);
error = uiomove(
&wpipe->pipe_buffer.buffer[0],
size - segsize, uio);
PIPE_LOCK(rpipe);
}
if (error == 0) {
wpipe->pipe_buffer.in += size;
if (wpipe->pipe_buffer.in >=
wpipe->pipe_buffer.size) {
if (wpipe->pipe_buffer.in !=
size - segsize +
wpipe->pipe_buffer.size)
panic("Expected "
"wraparound bad");
wpipe->pipe_buffer.in = size -
segsize;
}
wpipe->pipe_buffer.cnt += size;
if (wpipe->pipe_buffer.cnt >
wpipe->pipe_buffer.size)
panic("Pipe buffer overflow");
}
lost_wpipe:
pipeunlock(wpipe);
}
if (error)
break;
if (error == 0) {
wpipe->pipe_buffer.in += size;
if (wpipe->pipe_buffer.in >=
wpipe->pipe_buffer.size) {
KASSERT(wpipe->pipe_buffer.in ==
size - segsize +
wpipe->pipe_buffer.size,
("Expected wraparound bad"));
wpipe->pipe_buffer.in = size - segsize;
}
wpipe->pipe_buffer.cnt += size;
KASSERT(wpipe->pipe_buffer.cnt <=
wpipe->pipe_buffer.size,
("Pipe buffer overflow"));
}
pipeunlock(wpipe);
} else {
/*
* If the "read-side" has been blocked, wake it up now.
@ -1145,6 +1110,7 @@ lost_wpipe:
*/
if (fp->f_flag & FNONBLOCK) {
error = EAGAIN;
pipeunlock(wpipe);
break;
}
@ -1155,21 +1121,15 @@ lost_wpipe:
pipeselwakeup(wpipe);
wpipe->pipe_state |= PIPE_WANTW;
pipeunlock(wpipe);
error = msleep(wpipe, PIPE_MTX(rpipe),
PRIBIO | PCATCH, "pipewr", 0);
if (error != 0)
break;
/*
* If read side wants to go away, we just issue a signal
* to ourselves.
*/
if (wpipe->pipe_state & PIPE_EOF) {
error = EPIPE;
break;
}
}
}
pipelock(wpipe, 0);
--wpipe->pipe_busy;
if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) {
@ -1205,6 +1165,7 @@ lost_wpipe:
if (wpipe->pipe_buffer.cnt)
pipeselwakeup(wpipe);
pipeunlock(wpipe);
PIPE_UNLOCK(rpipe);
return (error);
}
@ -1435,6 +1396,7 @@ pipeclose(cpipe)
KASSERT(cpipe != NULL, ("pipeclose: cpipe == NULL"));
PIPE_LOCK(cpipe);
pipelock(cpipe, 0);
pp = cpipe->pipe_pair;
pipeselwakeup(cpipe);
@ -1447,7 +1409,9 @@ pipeclose(cpipe)
while (cpipe->pipe_busy) {
wakeup(cpipe);
cpipe->pipe_state |= PIPE_WANT;
pipeunlock(cpipe);
msleep(cpipe, PIPE_MTX(cpipe), PRIBIO, "pipecl", 0);
pipelock(cpipe, 0);
}
@ -1469,7 +1433,6 @@ pipeclose(cpipe)
* doing that, or the pipe might disappear out from under
* us.
*/
pipelock(cpipe, 0);
PIPE_UNLOCK(cpipe);
pipe_free_kmem(cpipe);
PIPE_LOCK(cpipe);