First rev at making pipe(2) pipe's MPsafe.

Both ends of the pipe share a pool_mutex, this makes allocation
and deadlock avoidance easy.

Remove some un-needed FILE_LOCK ops while I'm here.

There are some issues wrt to select and the f{s,g}etown code that
we'll have to deal with, I think we may also need to move the calls
to vfs_timestamp outside of the sections covered by PIPE_LOCK.
This commit is contained in:
Alfred Perlstein 2002-02-27 07:35:59 +00:00
parent df38f87be1
commit f81b04d96c
Notes: svn2git 2020-12-20 02:59:44 +00:00
svn path=/head/; revision=91362
2 changed files with 76 additions and 23 deletions

View File

@ -114,6 +114,17 @@ static struct filterops pipe_rfiltops =
static struct filterops pipe_wfiltops =
{ 1, NULL, filt_pipedetach, filt_pipewrite };
#define PIPE_GET_GIANT(pipe) \
do { \
PIPE_UNLOCK(wpipe); \
mtx_lock(&Giant); \
} while (0)
#define PIPE_DROP_GIANT(pipe) \
do { \
mtx_unlock(&Giant); \
PIPE_LOCK(wpipe); \
} while (0)
/*
* Default pipe buffer size(s), this can be kind-of large now because pipe
@ -176,7 +187,8 @@ pipe(td, uap)
struct file *rf, *wf;
struct pipe *rpipe, *wpipe;
int fd, error;
/* XXX: SYSINIT this! */
if (pipe_zone == NULL)
pipe_zone = zinit("PIPE", sizeof(struct pipe), 0, 0, 4);
@ -234,6 +246,7 @@ pipe(td, uap)
td->td_retval[1] = fd;
rpipe->pipe_peer = wpipe;
wpipe->pipe_peer = rpipe;
rpipe->pipe_mtxp = wpipe->pipe_mtxp = mtx_pool_alloc();
fdrop(rf, td);
return (0);
@ -354,14 +367,16 @@ pipelock(cpipe, catch)
{
int error;
while (cpipe->pipe_state & PIPE_LOCK) {
PIPE_LOCK_ASSERT(cpipe, MA_OWNED);
while (cpipe->pipe_state & PIPE_LOCKFL) {
cpipe->pipe_state |= PIPE_LWANT;
error = tsleep(cpipe, catch ? (PRIBIO | PCATCH) : PRIBIO,
error = msleep(cpipe, PIPE_MTX(cpipe),
catch ? (PRIBIO | PCATCH) : PRIBIO,
"pipelk", 0);
if (error != 0)
return (error);
}
cpipe->pipe_state |= PIPE_LOCK;
cpipe->pipe_state |= PIPE_LOCKFL;
return (0);
}
@ -373,7 +388,8 @@ pipeunlock(cpipe)
struct pipe *cpipe;
{
cpipe->pipe_state &= ~PIPE_LOCK;
PIPE_LOCK_ASSERT(cpipe, MA_OWNED);
cpipe->pipe_state &= ~PIPE_LOCKFL;
if (cpipe->pipe_state & PIPE_LWANT) {
cpipe->pipe_state &= ~PIPE_LWANT;
wakeup(cpipe);
@ -408,6 +424,7 @@ pipe_read(fp, uio, cred, flags, td)
int nread = 0;
u_int size;
PIPE_LOCK(rpipe);
++rpipe->pipe_busy;
error = pipelock(rpipe, 1);
if (error)
@ -424,8 +441,10 @@ pipe_read(fp, uio, cred, flags, td)
if (size > (u_int) uio->uio_resid)
size = (u_int) uio->uio_resid;
PIPE_UNLOCK(rpipe);
error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
size, uio);
PIPE_LOCK(rpipe);
if (error)
break;
@ -457,7 +476,9 @@ pipe_read(fp, uio, cred, flags, td)
va = (caddr_t) rpipe->pipe_map.kva +
rpipe->pipe_map.pos;
PIPE_UNLOCK(rpipe);
error = uiomove(va, size, uio);
PIPE_LOCK(rpipe);
if (error)
break;
nread += size;
@ -501,14 +522,12 @@ pipe_read(fp, uio, cred, flags, td)
* Handle non-blocking mode operation or
* wait for more data.
*/
FILE_LOCK(fp);
if (fp->f_flag & FNONBLOCK) {
FILE_UNLOCK(fp);
error = EAGAIN;
} else {
FILE_UNLOCK(fp);
rpipe->pipe_state |= PIPE_WANTR;
if ((error = tsleep(rpipe, PRIBIO | PCATCH,
if ((error = msleep(rpipe, PIPE_MTX(rpipe),
PRIBIO | PCATCH,
"piperd", 0)) == 0)
error = pipelock(rpipe, 1);
}
@ -518,6 +537,7 @@ pipe_read(fp, uio, cred, flags, td)
}
pipeunlock(rpipe);
/* XXX: should probably do this before getting any locks. */
if (error == 0)
vfs_timestamp(&rpipe->pipe_atime);
unlocked_error:
@ -542,6 +562,7 @@ pipe_read(fp, uio, cred, flags, td)
if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
pipeselwakeup(rpipe);
PIPE_UNLOCK(rpipe);
return (error);
}
@ -658,6 +679,7 @@ pipe_clone_write_buffer(wpipe)
int size;
int pos;
PIPE_LOCK_ASSERT(wpipe, MA_OWNED);
size = wpipe->pipe_map.cnt;
pos = wpipe->pipe_map.pos;
bcopy((caddr_t) wpipe->pipe_map.kva + pos,
@ -686,13 +708,15 @@ pipe_direct_write(wpipe, uio)
int error;
retry:
PIPE_LOCK_ASSERT(wpipe, MA_OWNED);
while (wpipe->pipe_state & PIPE_DIRECTW) {
if (wpipe->pipe_state & PIPE_WANTR) {
wpipe->pipe_state &= ~PIPE_WANTR;
wakeup(wpipe);
}
wpipe->pipe_state |= PIPE_WANTW;
error = tsleep(wpipe, PRIBIO | PCATCH, "pipdww", 0);
error = msleep(wpipe, PIPE_MTX(wpipe),
PRIBIO | PCATCH, "pipdww", 0);
if (error)
goto error1;
if (wpipe->pipe_state & PIPE_EOF) {
@ -708,7 +732,8 @@ pipe_direct_write(wpipe, uio)
}
wpipe->pipe_state |= PIPE_WANTW;
error = tsleep(wpipe, PRIBIO | PCATCH, "pipdwc", 0);
error = msleep(wpipe, PIPE_MTX(wpipe),
PRIBIO | PCATCH, "pipdwc", 0);
if (error)
goto error1;
if (wpipe->pipe_state & PIPE_EOF) {
@ -720,7 +745,9 @@ pipe_direct_write(wpipe, uio)
wpipe->pipe_state |= PIPE_DIRECTW;
PIPE_GET_GIANT(wpipe);
error = pipe_build_write_buffer(wpipe, uio);
PIPE_DROP_GIANT(wpipe);
if (error) {
wpipe->pipe_state &= ~PIPE_DIRECTW;
goto error1;
@ -730,7 +757,9 @@ pipe_direct_write(wpipe, uio)
while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
if (wpipe->pipe_state & PIPE_EOF) {
pipelock(wpipe, 0);
PIPE_GET_GIANT(wpipe);
pipe_destroy_write_buffer(wpipe);
PIPE_DROP_GIANT(wpipe);
pipeunlock(wpipe);
pipeselwakeup(wpipe);
error = EPIPE;
@ -741,7 +770,8 @@ pipe_direct_write(wpipe, uio)
wakeup(wpipe);
}
pipeselwakeup(wpipe);
error = tsleep(wpipe, PRIBIO | PCATCH, "pipdwt", 0);
error = msleep(wpipe, PIPE_MTX(wpipe), PRIBIO | PCATCH,
"pipdwt", 0);
}
pipelock(wpipe,0);
@ -778,10 +808,12 @@ pipe_write(fp, uio, cred, flags, td)
rpipe = (struct pipe *) fp->f_data;
wpipe = rpipe->pipe_peer;
PIPE_LOCK(wpipe);
/*
* detect loss of pipe read side, issue SIGPIPE if lost.
*/
if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
PIPE_UNLOCK(wpipe);
return (EPIPE);
}
++wpipe->pipe_busy;
@ -797,8 +829,10 @@ pipe_write(fp, uio, cred, flags, td)
(wpipe->pipe_buffer.cnt == 0)) {
if ((error = pipelock(wpipe,1)) == 0) {
PIPE_GET_GIANT(wpipe);
if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
nbigpipe++;
PIPE_DROP_GIANT(wpipe);
pipeunlock(wpipe);
}
}
@ -814,6 +848,7 @@ pipe_write(fp, uio, cred, flags, td)
wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
wakeup(wpipe);
}
PIPE_UNLOCK(wpipe);
return(error);
}
@ -834,18 +869,15 @@ pipe_write(fp, uio, cred, flags, td)
* The direct write mechanism will detect the reader going
* away on us.
*/
FILE_LOCK(fp);
if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
(fp->f_flag & FNONBLOCK) == 0 &&
(wpipe->pipe_map.kva || (amountpipekva < LIMITPIPEKVA)) &&
(uio->uio_iov->iov_len >= PIPE_MINDIRECT)) {
FILE_UNLOCK(fp);
error = pipe_direct_write( wpipe, uio);
if (error)
break;
continue;
} else
FILE_UNLOCK(fp);
}
#endif
/*
@ -861,7 +893,8 @@ pipe_write(fp, uio, cred, flags, td)
wpipe->pipe_state &= ~PIPE_WANTR;
wakeup(wpipe);
}
error = tsleep(wpipe, PRIBIO | PCATCH, "pipbww", 0);
error = msleep(wpipe, PIPE_MTX(wpipe), PRIBIO | PCATCH,
"pipbww", 0);
if (wpipe->pipe_state & PIPE_EOF)
break;
if (error)
@ -926,8 +959,10 @@ pipe_write(fp, uio, cred, flags, td)
/* Transfer first segment */
PIPE_UNLOCK(wpipe);
error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
segsize, uio);
PIPE_LOCK(wpipe);
if (error == 0 && segsize < size) {
/*
@ -939,8 +974,10 @@ pipe_write(fp, uio, cred, flags, td)
wpipe->pipe_buffer.size)
panic("Expected pipe buffer wraparound disappeared");
PIPE_UNLOCK(wpipe);
error = uiomove(&wpipe->pipe_buffer.buffer[0],
size - segsize, uio);
PIPE_LOCK(wpipe);
}
if (error == 0) {
wpipe->pipe_buffer.in += size;
@ -973,13 +1010,10 @@ pipe_write(fp, uio, cred, flags, td)
/*
* don't block on non-blocking I/O
*/
FILE_LOCK(fp);
if (fp->f_flag & FNONBLOCK) {
FILE_UNLOCK(fp);
error = EAGAIN;
break;
}
FILE_UNLOCK(fp);
/*
* We have no more space and have something to offer,
@ -988,7 +1022,8 @@ pipe_write(fp, uio, cred, flags, td)
pipeselwakeup(wpipe);
wpipe->pipe_state |= PIPE_WANTW;
error = tsleep(wpipe, PRIBIO | PCATCH, "pipewr", 0);
error = msleep(wpipe, PIPE_MTX(wpipe),
PRIBIO | PCATCH, "pipewr", 0);
if (error != 0)
break;
/*
@ -1037,6 +1072,7 @@ pipe_write(fp, uio, cred, flags, td)
if (wpipe->pipe_buffer.cnt)
pipeselwakeup(wpipe);
PIPE_UNLOCK(wpipe);
return (error);
}
@ -1058,18 +1094,22 @@ pipe_ioctl(fp, cmd, data, td)
return (0);
case FIOASYNC:
PIPE_LOCK(mpipe);
if (*(int *)data) {
mpipe->pipe_state |= PIPE_ASYNC;
} else {
mpipe->pipe_state &= ~PIPE_ASYNC;
}
PIPE_UNLOCK(mpipe);
return (0);
case FIONREAD:
PIPE_LOCK(mpipe);
if (mpipe->pipe_state & PIPE_DIRECTW)
*(int *)data = mpipe->pipe_map.cnt;
else
*(int *)data = mpipe->pipe_buffer.cnt;
PIPE_UNLOCK(mpipe);
return (0);
case FIOSETOWN:
@ -1104,6 +1144,7 @@ pipe_poll(fp, events, cred, td)
int revents = 0;
wpipe = rpipe->pipe_peer;
PIPE_LOCK(rpipe);
if (events & (POLLIN | POLLRDNORM))
if ((rpipe->pipe_state & PIPE_DIRECTW) ||
(rpipe->pipe_buffer.cnt > 0) ||
@ -1132,6 +1173,7 @@ pipe_poll(fp, events, cred, td)
wpipe->pipe_state |= PIPE_SEL;
}
}
PIPE_UNLOCK(rpipe);
return (revents);
}
@ -1215,6 +1257,7 @@ pipeclose(cpipe)
struct pipe *ppipe;
if (cpipe) {
PIPE_LOCK(cpipe);
pipeselwakeup(cpipe);
@ -1225,7 +1268,7 @@ pipeclose(cpipe)
while (cpipe->pipe_busy) {
wakeup(cpipe);
cpipe->pipe_state |= PIPE_WANT | PIPE_EOF;
tsleep(cpipe, PRIBIO, "pipecl", 0);
msleep(cpipe, PIPE_MTX(cpipe), PRIBIO, "pipecl", 0);
}
/*
@ -1242,8 +1285,11 @@ pipeclose(cpipe)
/*
* free resources
*/
PIPE_UNLOCK(cpipe);
mtx_lock(&Giant);
pipe_free_kmem(cpipe);
zfree(pipe_zone, cpipe);
mtx_unlock(&Giant);
}
}

View File

@ -86,7 +86,7 @@ struct pipemapping {
#define PIPE_WANT 0x020 /* Pipe is wanted to be run-down. */
#define PIPE_SEL 0x040 /* Pipe has a select active. */
#define PIPE_EOF 0x080 /* Pipe is in EOF condition. */
#define PIPE_LOCK 0x100 /* Process has exclusive access to pointers/data. */
#define PIPE_LOCKFL 0x100 /* Process has exclusive access to pointers/data. */
#define PIPE_LWANT 0x200 /* Process wants exclusive access to pointers/data. */
#define PIPE_DIRECTW 0x400 /* Pipe direct write active. */
#define PIPE_DIRECTOK 0x800 /* Direct mode ok. */
@ -106,6 +106,13 @@ struct pipe {
struct pipe *pipe_peer; /* link with other direction */
u_int pipe_state; /* pipe status info */
int pipe_busy; /* busy flag, mostly to handle rundown sanely */
struct mtx *pipe_mtxp; /* shared mutex between both pipes */
};
#define PIPE_MTX(pipe) (pipe)->pipe_mtxp
#define PIPE_LOCK(pipe) mtx_lock(PIPE_MTX(pipe))
#define PIPE_UNLOCK(pipe) mtx_unlock(PIPE_MTX(pipe))
#define PIPE_LOCK_ASSERT(pipe, type) mtx_assert(PIPE_MTX(pipe), (type))
#endif /* !_SYS_PIPE_H_ */