Add locking annotations and comments about the socket, pipe, and fifo problem.

Temporarily fix a locking problem for socket I/O.
David Xu 2006-01-24 07:24:24 +00:00
parent fc3c1bc471
commit 1aa4c324ee
Notes: svn2git 2020-12-20 02:59:44 +00:00
svn path=/head/; revision=154765


@@ -184,24 +184,50 @@ typedef struct oaiocb {
struct __aiocb_private _aiocb_private;
} oaiocb_t;
/*
* Below is a key of the locks used to protect each member of struct
* aiocblist, aioliojob, and kaioinfo, and any backends.
*
* * - need not be protected
* a - locked by proc mtx
* b - locked by the backend lock; the backend lock can be null in some
*     cases, for example BIO belongs to this type, in which case the
*     proc lock is reused.
* c - locked by aio_job_mtx, the lock for the generic file I/O backend.
*/
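As a reading aid for the key above, here is a hedged sketch (assuming the headers already included by this file) of how the annotations would translate into runtime assertions; both helper functions are hypothetical and not part of this commit:

/*
 * Illustrative sketch only: how the lock key maps to assertions.
 * Neither helper exists in this commit.
 */
static void
aiocb_set_jobstate(struct aiocblist *cbe, int state)
{
	/* jobstate is (b); for the generic file I/O backend the
	 * backend lock is aio_job_mtx. */
	mtx_assert(&aio_job_mtx, MA_OWNED);
	cbe->jobstate = state;
}

static void
kaio_job_enqueue(struct proc *p, struct kaioinfo *ki, struct aiocblist *cbe)
{
	/* kaio_jobqueue and kaio_count are (a); the proc mutex
	 * must be held. */
	PROC_LOCK_ASSERT(p, MA_OWNED);
	TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, cbe, plist);
	ki->kaio_count++;
}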
/*
* Currently, there are only two backends: BIO and generic file I/O.
* Socket I/O is served by the generic file I/O backend; this is not a
* good idea, since disk file I/O and any other type opened without the
* O_NONBLOCK flag can block the daemon threads.  If no thread is left to
* serve socket I/O, the socket I/O will be delayed too long or starved.
* We should create some threads dedicated to sockets to do non-blocking
* I/O, and the same for pipes and FIFOs; for these I/O systems we really
* need a non-blocking interface.  Fiddling with O_NONBLOCK in the file
* structure is not safe, because there is a race between userland and
* the aio daemons.
*/
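To make the warning in the last sentence concrete, the sketch below shows the unsafe pattern being cautioned against; aio_read_unsafe() is a hypothetical helper (it does not exist in this file), and fo_read() is the standard fileops read entry point:

/*
 * Illustrative sketch only: toggling the non-blocking flag in the
 * shared struct file races with userland threads using the same
 * descriptor.
 */
static int
aio_read_unsafe(struct file *fp, struct uio *uio, struct ucred *cred,
    struct thread *td)
{
	int error;

	fp->f_flag |= FNONBLOCK;	/* visible to userland threads */
	error = fo_read(fp, uio, cred, 0, td);
	fp->f_flag &= ~FNONBLOCK;	/* may clobber a concurrent
					 * fcntl(F_SETFL) from userland */
	return (error);
}

A per-call flag passed down to fo_read(), analogous to FOF_OFFSET, would avoid mutating the shared state entirely.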
struct aiocblist {
TAILQ_ENTRY(aiocblist) list; /* List of jobs */
TAILQ_ENTRY(aiocblist) plist; /* List of jobs for proc */
TAILQ_ENTRY(aiocblist) allist;
int jobflags;
int jobstate;
int inputcharge;
int outputcharge;
struct buf *bp; /* Buffer pointer */
struct proc *userproc; /* User process */
struct ucred *cred; /* Active credential when created */
struct file *fd_file; /* Pointer to file structure */
struct aioliojob *lio; /* Optional lio job */
struct aiocb *uuaiocb; /* Pointer in userspace of aiocb */
struct knlist klist; /* list of knotes */
struct aiocb uaiocb; /* Kernel I/O control block */
ksiginfo_t ksi; /* Realtime signal info */
struct task biotask;
TAILQ_ENTRY(aiocblist) list; /* (b) internal list for backend */
TAILQ_ENTRY(aiocblist) plist; /* (a) list of jobs for each backend */
TAILQ_ENTRY(aiocblist) allist; /* (a) list of all jobs in proc */
int jobflags; /* (a) job flags */
int jobstate; /* (b) job state */
int inputcharge; /* (*) input blocks */
int outputcharge; /* (*) output blocks */
struct buf *bp; /* (*) private to BIO backend,
* buffer pointer
*/
struct proc *userproc; /* (*) user process */
struct ucred *cred; /* (*) active credential when created */
struct file *fd_file; /* (*) pointer to file structure */
struct aioliojob *lio; /* (*) optional lio job */
struct aiocb *uuaiocb; /* (*) pointer in userspace of aiocb */
struct knlist klist; /* (a) list of knotes */
struct aiocb uaiocb; /* (*) kernel I/O control block */
ksiginfo_t ksi; /* (a) realtime signal info */
struct task biotask; /* (*) private to BIO backend */
};
/* jobflags */
@@ -215,22 +241,22 @@ struct aiocblist {
#define AIOP_FREE 0x1 /* proc on free queue */
struct aiothreadlist {
int aiothreadflags; /* AIO proc flags */
TAILQ_ENTRY(aiothreadlist) list; /* List of processes */
struct thread *aiothread; /* The AIO thread */
int aiothreadflags; /* (c) AIO proc flags */
TAILQ_ENTRY(aiothreadlist) list; /* (c) list of processes */
struct thread *aiothread; /* (*) the AIO thread */
};
/*
* data-structure for lio signal management
*/
struct aioliojob {
int lioj_flags;
int lioj_count;
int lioj_finished_count;
struct sigevent lioj_signal; /* signal on all I/O done */
TAILQ_ENTRY(aioliojob) lioj_list;
struct knlist klist; /* list of knotes */
ksiginfo_t lioj_ksi; /* Realtime signal info */
int lioj_flags; /* (a) listio flags */
int lioj_count; /* (a) count of listio jobs */
int lioj_finished_count; /* (a) count of finished listio jobs */
struct sigevent lioj_signal; /* (a) signal on all I/O done */
TAILQ_ENTRY(aioliojob) lioj_list; /* (a) lio list */
struct knlist klist; /* (a) list of knotes */
ksiginfo_t lioj_ksi; /* (a) realtime signal info */
};
#define LIOJ_SIGNAL 0x1 /* signal on all done (lio) */
@@ -241,29 +267,31 @@ struct aioliojob {
* per process aio data structure
*/
struct kaioinfo {
int kaio_flags; /* per process kaio flags */
int kaio_maxactive_count; /* maximum number of AIOs */
int kaio_active_count; /* number of currently used AIOs */
int kaio_qallowed_count; /* maximum size of AIO queue */
int kaio_count; /* size of AIO queue */
int kaio_ballowed_count; /* maximum number of buffers */
int kaio_buffer_count; /* number of physio buffers */
TAILQ_HEAD(,aiocblist) kaio_all; /* all AIOs in the process */
TAILQ_HEAD(,aiocblist) kaio_done; /* done queue for process */
TAILQ_HEAD(,aioliojob) kaio_liojoblist; /* list of lio jobs */
TAILQ_HEAD(,aiocblist) kaio_jobqueue; /* job queue for process */
TAILQ_HEAD(,aiocblist) kaio_bufqueue; /* buffer job queue for process */
TAILQ_HEAD(,aiocblist) kaio_sockqueue; /* queue for aios waiting on sockets */
int kaio_flags; /* (a) per process kaio flags */
int kaio_maxactive_count; /* (*) maximum number of AIOs */
int kaio_active_count; /* (c) number of currently used AIOs */
int kaio_qallowed_count; /* (*) maximum size of AIO queue */
int kaio_count; /* (a) size of AIO queue */
int kaio_ballowed_count; /* (*) maximum number of buffers */
int kaio_buffer_count; /* (a) number of physio buffers */
TAILQ_HEAD(,aiocblist) kaio_all; /* (a) all AIOs in the process */
TAILQ_HEAD(,aiocblist) kaio_done; /* (a) done queue for process */
TAILQ_HEAD(,aioliojob) kaio_liojoblist; /* (a) list of lio jobs */
TAILQ_HEAD(,aiocblist) kaio_jobqueue; /* (a) job queue for process */
TAILQ_HEAD(,aiocblist) kaio_bufqueue; /* (a) buffer job queue for process */
TAILQ_HEAD(,aiocblist) kaio_sockqueue; /* (a) queue for AIOs waiting on sockets,
* not used yet.
*/
};
#define KAIO_RUNDOWN 0x1 /* process is being run down */
#define KAIO_WAKEUP 0x2 /* wakeup process when there is a significant event */
static TAILQ_HEAD(,aiothreadlist) aio_freeproc; /* Idle daemons */
static TAILQ_HEAD(,aiothreadlist) aio_freeproc; /* (c) Idle daemons */
static struct sema aio_newproc_sem;
static struct mtx aio_job_mtx;
static struct mtx aio_sock_mtx;
static TAILQ_HEAD(,aiocblist) aio_jobs; /* Async job list */
static TAILQ_HEAD(,aiocblist) aio_jobs; /* (c) Async job list */
static struct unrhdr *aiod_unr;
static void aio_init_aioinfo(struct proc *p);
@@ -587,6 +615,7 @@ aio_proc_rundown(void *arg, struct proc *p)
struct aiocblist *cbe, *cbn;
struct file *fp;
struct socket *so;
int remove;
KASSERT(curthread->td_proc == p,
("%s: called on non-curproc", __func__));
@@ -603,35 +632,30 @@ aio_proc_rundown(void *arg, struct proc *p)
* Try to cancel all pending requests. This code simulates
* aio_cancel on all pending I/O requests.
*/
while ((cbe = TAILQ_FIRST(&ki->kaio_sockqueue))) {
fp = cbe->fd_file;
so = fp->f_data;
mtx_lock(&aio_sock_mtx);
TAILQ_REMOVE(&so->so_aiojobq, cbe, list);
mtx_unlock(&aio_sock_mtx);
TAILQ_REMOVE(&ki->kaio_sockqueue, cbe, plist);
TAILQ_INSERT_HEAD(&ki->kaio_jobqueue, cbe, plist);
cbe->jobstate = JOBST_JOBQGLOBAL;
}
TAILQ_FOREACH_SAFE(cbe, &ki->kaio_jobqueue, plist, cbn) {
remove = 0;
mtx_lock(&aio_job_mtx);
if (cbe->jobstate == JOBST_JOBQGLOBAL) {
TAILQ_REMOVE(&aio_jobs, cbe, list);
mtx_unlock(&aio_job_mtx);
remove = 1;
} else if (cbe->jobstate == JOBST_JOBQSOCK) {
fp = cbe->fd_file;
MPASS(fp->f_type == DTYPE_SOCKET);
so = fp->f_data;
TAILQ_REMOVE(&so->so_aiojobq, cbe, list);
remove = 1;
}
mtx_unlock(&aio_job_mtx);
if (remove) {
cbe->jobstate = JOBST_JOBFINISHED;
cbe->uaiocb._aiocb_private.status = -1;
cbe->uaiocb._aiocb_private.error = ECANCELED;
TAILQ_REMOVE(&ki->kaio_jobqueue, cbe, plist);
aio_bio_done_notify(p, cbe, DONE_QUEUE);
} else {
mtx_unlock(&aio_job_mtx);
}
}
if (TAILQ_FIRST(&ki->kaio_sockqueue))
goto restart;
/* Wait for all running I/O to be finished */
if (TAILQ_FIRST(&ki->kaio_bufqueue) ||
TAILQ_FIRST(&ki->kaio_jobqueue)) {
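The cancellation loop above relies on a dequeue-to-own idiom: whichever thread removes a job from its queue while holding aio_job_mtx becomes responsible for completing it, and everyone else must leave the job alone. A hedged sketch of the idiom, with a hypothetical helper name:

/*
 * Illustrative sketch only: the dequeue-to-own idiom.  aio_try_own()
 * does not exist in this commit.
 */
static int
aio_try_own(struct aiocblist *cbe)
{
	int owned = 0;

	mtx_lock(&aio_job_mtx);
	if (cbe->jobstate == JOBST_JOBQGLOBAL) {
		TAILQ_REMOVE(&aio_jobs, cbe, list);
		owned = 1;	/* we dequeued it, so we finish it */
	}
	mtx_unlock(&aio_job_mtx);
	return (owned);		/* 0: an aiod already owns the job */
}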
@@ -693,14 +717,7 @@ aio_selectjob(struct aiothreadlist *aiop)
* and this code should work in all instances for every type of file, including
* pipes, sockets, fifos, and regular files.
*
* XXX I don't think this code works well with pipes, sockets, and FIFOs:
* the problem is that the aiod threads can block if there is no data or
* no buffer space and the file was not opened with O_NONBLOCK, so all
* aiod threads can end up blocked if there are a couple of such
* processes.  We need a FOF_OFFSET-like flag to override f_flag and tell
* the low-level system to do non-blocking I/O; we cannot muck with
* O_NONBLOCK because of races between userland and the aiod threads.
* There is a trigger mechanism for sockets, but it does not work well
* either if userland misbehaves.
* XXX I don't think it works well for sockets, pipes, and FIFOs.
*/
static void
aio_process(struct aiocblist *aiocbe)
@@ -1186,47 +1203,33 @@ static void
aio_swake_cb(struct socket *so, struct sockbuf *sb)
{
struct aiocblist *cb, *cbn;
struct proc *p;
struct kaioinfo *ki = NULL;
int opcode, wakecount = 0;
struct aiothreadlist *aiop;
if (sb == &so->so_snd) {
if (sb == &so->so_snd)
opcode = LIO_WRITE;
SOCKBUF_LOCK(&so->so_snd);
so->so_snd.sb_flags &= ~SB_AIO;
SOCKBUF_UNLOCK(&so->so_snd);
} else {
else
opcode = LIO_READ;
SOCKBUF_LOCK(&so->so_rcv);
so->so_rcv.sb_flags &= ~SB_AIO;
SOCKBUF_UNLOCK(&so->so_rcv);
}
mtx_lock(&aio_sock_mtx);
SOCKBUF_LOCK(sb);
sb->sb_flags &= ~SB_AIO;
mtx_lock(&aio_job_mtx);
TAILQ_FOREACH_SAFE(cb, &so->so_aiojobq, list, cbn) {
if (opcode == cb->uaiocb.aio_lio_opcode) {
if (cb->jobstate != JOBST_JOBQSOCK)
panic("invalid queue value");
p = cb->userproc;
ki = p->p_aioinfo;
TAILQ_REMOVE(&so->so_aiojobq, cb, list);
PROC_LOCK(p);
TAILQ_REMOVE(&ki->kaio_sockqueue, cb, plist);
/*
* XXX check AIO_RUNDOWN, and don't put on
* jobqueue if it was set.
/* XXX
* We don't have an actual socket backend yet,
* so we simply move the requests to the generic
* file I/O backend.
*/
TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, cb, plist);
cb->jobstate = JOBST_JOBQGLOBAL;
mtx_lock(&aio_job_mtx);
TAILQ_REMOVE(&so->so_aiojobq, cb, list);
TAILQ_INSERT_TAIL(&aio_jobs, cb, list);
mtx_unlock(&aio_job_mtx);
PROC_UNLOCK(p);
wakecount++;
}
}
mtx_unlock(&aio_sock_mtx);
mtx_unlock(&aio_job_mtx);
SOCKBUF_UNLOCK(sb);
while (wakecount--) {
mtx_lock(&aio_job_mtx);
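Note the lock order the rewritten aio_swake_cb() establishes: the socket buffer lock is taken before aio_job_mtx, and aio_aqueue() in the next hunk follows the same order. A hedged sketch of that ordering (the helper name is hypothetical; the body mirrors the requeue done above):

/*
 * Illustrative sketch only: sockbuf lock before aio_job_mtx.
 * Acquiring the two locks in the opposite order elsewhere would
 * risk deadlock.
 */
static void
aio_sock_requeue(struct socket *so, struct sockbuf *sb,
    struct aiocblist *cb)
{
	SOCKBUF_LOCK(sb);		/* sockbuf lock first... */
	mtx_lock(&aio_job_mtx);		/* ...then aio_job_mtx */
	TAILQ_REMOVE(&so->so_aiojobq, cb, list);
	TAILQ_INSERT_TAIL(&aio_jobs, cb, list);
	mtx_unlock(&aio_job_mtx);
	SOCKBUF_UNLOCK(sb);
}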
@@ -1426,14 +1429,15 @@ aio_aqueue(struct thread *td, struct aiocb *job, struct aioliojob *lj,
SOCKBUF_LOCK(sb);
if (((opcode == LIO_READ) && (!soreadable(so))) || ((opcode ==
LIO_WRITE) && (!sowriteable(so)))) {
mtx_lock(&aio_sock_mtx);
TAILQ_INSERT_TAIL(&so->so_aiojobq, aiocbe, list);
mtx_unlock(&aio_sock_mtx);
sb->sb_flags |= SB_AIO;
mtx_lock(&aio_job_mtx);
TAILQ_INSERT_TAIL(&so->so_aiojobq, aiocbe, list);
mtx_unlock(&aio_job_mtx);
PROC_LOCK(p);
TAILQ_INSERT_TAIL(&ki->kaio_sockqueue, aiocbe, plist);
TAILQ_INSERT_TAIL(&ki->kaio_all, aiocbe, allist);
TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, aiocbe, plist);
aiocbe->jobstate = JOBST_JOBQSOCK;
ki->kaio_count++;
if (lj)
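From userland, the path above is what a plain aio_read() on a not-yet-readable socket takes: the job parks on so_aiojobq with SB_AIO set until aio_swake_cb() moves it to the generic backend. A minimal userland sketch (function name hypothetical, error handling elided):

#include <aio.h>
#include <string.h>

/* Queue an async read on a socket; with no data pending, the kernel
 * parks the job until the socket becomes readable. */
int
queue_socket_read(int sock, char *buf, size_t len, struct aiocb *cb)
{
	memset(cb, 0, sizeof(*cb));
	cb->aio_fildes = sock;
	cb->aio_buf = buf;
	cb->aio_nbytes = len;
	cb->aio_sigevent.sigev_notify = SIGEV_NONE;
	return (aio_read(cb));	/* 0 on queue; completion is async */
}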
@@ -1651,6 +1655,7 @@ aio_cancel(struct thread *td, struct aio_cancel_args *uap)
struct file *fp;
struct socket *so;
int error;
int remove;
int cancelled = 0;
int notcancelled = 0;
struct vnode *vp;
@@ -1671,32 +1676,6 @@ aio_cancel(struct thread *td, struct aio_cancel_args *uap)
td->td_retval[0] = AIO_NOTCANCELED;
return (0);
}
} else if (fp->f_type == DTYPE_SOCKET) {
so = fp->f_data;
mtx_lock(&aio_sock_mtx);
TAILQ_FOREACH_SAFE(cbe, &so->so_aiojobq, list, cbn) {
if (cbe->userproc == p &&
(uap->aiocbp == NULL ||
uap->aiocbp == cbe->uuaiocb)) {
TAILQ_REMOVE(&so->so_aiojobq, cbe, list);
PROC_LOCK(p);
TAILQ_REMOVE(&ki->kaio_sockqueue, cbe, plist);
cbe->jobstate = JOBST_JOBRUNNING;
cbe->uaiocb._aiocb_private.status = -1;
cbe->uaiocb._aiocb_private.error = ECANCELED;
aio_bio_done_notify(p, cbe, DONE_QUEUE);
PROC_UNLOCK(p);
cancelled++;
if (uap->aiocbp != NULL)
break;
}
}
mtx_unlock(&aio_sock_mtx);
if (cancelled && uap->aiocbp != NULL) {
fdrop(fp, td);
td->td_retval[0] = AIO_CANCELED;
return (0);
}
}
PROC_LOCK(p);
@@ -1704,33 +1683,55 @@ aio_cancel(struct thread *td, struct aio_cancel_args *uap)
if ((uap->fd == cbe->uaiocb.aio_fildes) &&
((uap->aiocbp == NULL) ||
(uap->aiocbp == cbe->uuaiocb))) {
remove = 0;
mtx_lock(&aio_job_mtx);
if (cbe->jobstate == JOBST_JOBQGLOBAL) {
TAILQ_REMOVE(&aio_jobs, cbe, list);
mtx_unlock(&aio_job_mtx);
remove = 1;
} else if (cbe->jobstate == JOBST_JOBQSOCK) {
MPASS(fp->f_type == DTYPE_SOCKET);
so = fp->f_data;
TAILQ_REMOVE(&so->so_aiojobq, cbe, list);
remove = 1;
}
mtx_unlock(&aio_job_mtx);
if (remove) {
TAILQ_REMOVE(&ki->kaio_jobqueue, cbe, plist);
cbe->uaiocb._aiocb_private.status = -1;
cbe->uaiocb._aiocb_private.error = ECANCELED;
aio_bio_done_notify(p, cbe, DONE_QUEUE);
cancelled++;
} else {
mtx_unlock(&aio_job_mtx);
notcancelled++;
}
if (uap->aiocbp != NULL)
break;
}
}
PROC_UNLOCK(p);
done:
fdrop(fp, td);
if (uap->aiocbp != NULL) {
if (cancelled) {
td->td_retval[0] = AIO_CANCELED;
return (0);
}
}
if (notcancelled) {
td->td_retval[0] = AIO_NOTCANCELED;
return (0);
}
if (cancelled) {
td->td_retval[0] = AIO_CANCELED;
return (0);
}
td->td_retval[0] = AIO_ALLDONE;
return (0);
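The retval logic just rewritten implements the aio_cancel(2) contract: AIO_CANCELED when every matching job was dequeued, AIO_NOTCANCELED when at least one was already running, and AIO_ALLDONE when nothing was left to cancel. A minimal userland sketch of a caller consuming these values (function name hypothetical):

#include <aio.h>
#include <stdio.h>

void
cancel_and_report(int fd, struct aiocb *cb)
{
	switch (aio_cancel(fd, cb)) {
	case AIO_CANCELED:
		/* Dequeued before running; aio_error() now reports
		 * ECANCELED, matching the kernel code above. */
		printf("canceled\n");
		break;
	case AIO_NOTCANCELED:
		/* At least one matching job is running; poll it
		 * with aio_error(). */
		printf("not canceled\n");
		break;
	case AIO_ALLDONE:
		printf("already done\n");
		break;
	default:
		perror("aio_cancel");
	}
}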