Add locking annotation and comments about socket, pipe, fifo problem.
Temporarily fix a locking problem for socket I/O.
This commit is contained in:
parent
0ba6836a1e
commit
1f5105d8a7
@ -184,24 +184,50 @@ typedef struct oaiocb {
|
||||
struct __aiocb_private _aiocb_private;
|
||||
} oaiocb_t;
|
||||
|
||||
/*
|
||||
* Below is a key of locks used to protect each member of struct aiocblist
|
||||
* aioliojob and kaioinfo and any backends.
|
||||
*
|
||||
* * - need not protected
|
||||
* a - locked by proc mtx
|
||||
* b - locked by backend lock, the backend lock can be null in some cases,
|
||||
* for example, BIO belongs to this type, in this case, proc lock is
|
||||
* reused.
|
||||
* c - locked by aio_job_mtx, the lock for the generic file I/O backend.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Current, there is only two backends: BIO and generic file I/O.
|
||||
* socket I/O is served by generic file I/O, this is not a good idea, since
|
||||
* disk file I/O and any other types without O_NONBLOCK flag can block daemon
|
||||
* threads, if there is no thread to serve socket I/O, the socket I/O will be
|
||||
* delayed too long or starved, we should create some threads dedicated to
|
||||
* sockets to do non-blocking I/O, same for pipe and fifo, for these I/O
|
||||
* systems we really need non-blocking interface, fiddling O_NONBLOCK in file
|
||||
* structure is not safe because there is race between userland and aio
|
||||
* daemons.
|
||||
*/
|
||||
|
||||
struct aiocblist {
|
||||
TAILQ_ENTRY(aiocblist) list; /* List of jobs */
|
||||
TAILQ_ENTRY(aiocblist) plist; /* List of jobs for proc */
|
||||
TAILQ_ENTRY(aiocblist) allist;
|
||||
int jobflags;
|
||||
int jobstate;
|
||||
int inputcharge;
|
||||
int outputcharge;
|
||||
struct buf *bp; /* Buffer pointer */
|
||||
struct proc *userproc; /* User process */
|
||||
struct ucred *cred; /* Active credential when created */
|
||||
struct file *fd_file; /* Pointer to file structure */
|
||||
struct aioliojob *lio; /* Optional lio job */
|
||||
struct aiocb *uuaiocb; /* Pointer in userspace of aiocb */
|
||||
struct knlist klist; /* list of knotes */
|
||||
struct aiocb uaiocb; /* Kernel I/O control block */
|
||||
ksiginfo_t ksi; /* Realtime signal info */
|
||||
struct task biotask;
|
||||
TAILQ_ENTRY(aiocblist) list; /* (b) internal list of for backend */
|
||||
TAILQ_ENTRY(aiocblist) plist; /* (a) list of jobs for each backend */
|
||||
TAILQ_ENTRY(aiocblist) allist; /* (a) list of all jobs in proc */
|
||||
int jobflags; /* (a) job flags */
|
||||
int jobstate; /* (b) job state */
|
||||
int inputcharge; /* (*) input blockes */
|
||||
int outputcharge; /* (*) output blockes */
|
||||
struct buf *bp; /* (*) private to BIO backend,
|
||||
* buffer pointer
|
||||
*/
|
||||
struct proc *userproc; /* (*) user process */
|
||||
struct ucred *cred; /* (*) active credential when created */
|
||||
struct file *fd_file; /* (*) pointer to file structure */
|
||||
struct aioliojob *lio; /* (*) optional lio job */
|
||||
struct aiocb *uuaiocb; /* (*) pointer in userspace of aiocb */
|
||||
struct knlist klist; /* (a) list of knotes */
|
||||
struct aiocb uaiocb; /* (*) kernel I/O control block */
|
||||
ksiginfo_t ksi; /* (a) realtime signal info */
|
||||
struct task biotask; /* (*) private to BIO backend */
|
||||
};
|
||||
|
||||
/* jobflags */
|
||||
@ -215,22 +241,22 @@ struct aiocblist {
|
||||
#define AIOP_FREE 0x1 /* proc on free queue */
|
||||
|
||||
struct aiothreadlist {
|
||||
int aiothreadflags; /* AIO proc flags */
|
||||
TAILQ_ENTRY(aiothreadlist) list; /* List of processes */
|
||||
struct thread *aiothread; /* The AIO thread */
|
||||
int aiothreadflags; /* (c) AIO proc flags */
|
||||
TAILQ_ENTRY(aiothreadlist) list; /* (c) list of processes */
|
||||
struct thread *aiothread; /* (*) the AIO thread */
|
||||
};
|
||||
|
||||
/*
|
||||
* data-structure for lio signal management
|
||||
*/
|
||||
struct aioliojob {
|
||||
int lioj_flags;
|
||||
int lioj_count;
|
||||
int lioj_finished_count;
|
||||
struct sigevent lioj_signal; /* signal on all I/O done */
|
||||
TAILQ_ENTRY(aioliojob) lioj_list;
|
||||
struct knlist klist; /* list of knotes */
|
||||
ksiginfo_t lioj_ksi; /* Realtime signal info */
|
||||
int lioj_flags; /* (a) listio flags */
|
||||
int lioj_count; /* (a) listio flags */
|
||||
int lioj_finished_count; /* (a) listio flags */
|
||||
struct sigevent lioj_signal; /* (a) signal on all I/O done */
|
||||
TAILQ_ENTRY(aioliojob) lioj_list; /* (a) lio list */
|
||||
struct knlist klist; /* (a) list of knotes */
|
||||
ksiginfo_t lioj_ksi; /* (a) Realtime signal info */
|
||||
};
|
||||
|
||||
#define LIOJ_SIGNAL 0x1 /* signal on all done (lio) */
|
||||
@ -241,29 +267,31 @@ struct aioliojob {
|
||||
* per process aio data structure
|
||||
*/
|
||||
struct kaioinfo {
|
||||
int kaio_flags; /* per process kaio flags */
|
||||
int kaio_maxactive_count; /* maximum number of AIOs */
|
||||
int kaio_active_count; /* number of currently used AIOs */
|
||||
int kaio_qallowed_count; /* maxiumu size of AIO queue */
|
||||
int kaio_count; /* size of AIO queue */
|
||||
int kaio_ballowed_count; /* maximum number of buffers */
|
||||
int kaio_buffer_count; /* number of physio buffers */
|
||||
TAILQ_HEAD(,aiocblist) kaio_all; /* all AIOs in the process */
|
||||
TAILQ_HEAD(,aiocblist) kaio_done; /* done queue for process */
|
||||
TAILQ_HEAD(,aioliojob) kaio_liojoblist; /* list of lio jobs */
|
||||
TAILQ_HEAD(,aiocblist) kaio_jobqueue; /* job queue for process */
|
||||
TAILQ_HEAD(,aiocblist) kaio_bufqueue; /* buffer job queue for process */
|
||||
TAILQ_HEAD(,aiocblist) kaio_sockqueue; /* queue for aios waiting on sockets */
|
||||
int kaio_flags; /* (a) per process kaio flags */
|
||||
int kaio_maxactive_count; /* (*) maximum number of AIOs */
|
||||
int kaio_active_count; /* (c) number of currently used AIOs */
|
||||
int kaio_qallowed_count; /* (*) maxiumu size of AIO queue */
|
||||
int kaio_count; /* (a) size of AIO queue */
|
||||
int kaio_ballowed_count; /* (*) maximum number of buffers */
|
||||
int kaio_buffer_count; /* (a) number of physio buffers */
|
||||
TAILQ_HEAD(,aiocblist) kaio_all; /* (a) all AIOs in the process */
|
||||
TAILQ_HEAD(,aiocblist) kaio_done; /* (a) done queue for process */
|
||||
TAILQ_HEAD(,aioliojob) kaio_liojoblist; /* (a) list of lio jobs */
|
||||
TAILQ_HEAD(,aiocblist) kaio_jobqueue; /* (a) job queue for process */
|
||||
TAILQ_HEAD(,aiocblist) kaio_bufqueue; /* (a) buffer job queue for process */
|
||||
TAILQ_HEAD(,aiocblist) kaio_sockqueue; /* (a) queue for aios waiting on sockets,
|
||||
* not used yet.
|
||||
*/
|
||||
};
|
||||
|
||||
#define KAIO_RUNDOWN 0x1 /* process is being run down */
|
||||
#define KAIO_WAKEUP 0x2 /* wakeup process when there is a significant event */
|
||||
|
||||
static TAILQ_HEAD(,aiothreadlist) aio_freeproc; /* Idle daemons */
|
||||
static TAILQ_HEAD(,aiothreadlist) aio_freeproc; /* (c) Idle daemons */
|
||||
static struct sema aio_newproc_sem;
|
||||
static struct mtx aio_job_mtx;
|
||||
static struct mtx aio_sock_mtx;
|
||||
static TAILQ_HEAD(,aiocblist) aio_jobs; /* Async job list */
|
||||
static TAILQ_HEAD(,aiocblist) aio_jobs; /* (c) Async job list */
|
||||
static struct unrhdr *aiod_unr;
|
||||
|
||||
static void aio_init_aioinfo(struct proc *p);
|
||||
@ -587,6 +615,7 @@ aio_proc_rundown(void *arg, struct proc *p)
|
||||
struct aiocblist *cbe, *cbn;
|
||||
struct file *fp;
|
||||
struct socket *so;
|
||||
int remove;
|
||||
|
||||
KASSERT(curthread->td_proc == p,
|
||||
("%s: called on non-curproc", __func__));
|
||||
@ -603,35 +632,30 @@ restart:
|
||||
* Try to cancel all pending requests. This code simulates
|
||||
* aio_cancel on all pending I/O requests.
|
||||
*/
|
||||
while ((cbe = TAILQ_FIRST(&ki->kaio_sockqueue))) {
|
||||
fp = cbe->fd_file;
|
||||
so = fp->f_data;
|
||||
mtx_lock(&aio_sock_mtx);
|
||||
TAILQ_REMOVE(&so->so_aiojobq, cbe, list);
|
||||
mtx_unlock(&aio_sock_mtx);
|
||||
TAILQ_REMOVE(&ki->kaio_sockqueue, cbe, plist);
|
||||
TAILQ_INSERT_HEAD(&ki->kaio_jobqueue, cbe, plist);
|
||||
cbe->jobstate = JOBST_JOBQGLOBAL;
|
||||
}
|
||||
|
||||
TAILQ_FOREACH_SAFE(cbe, &ki->kaio_jobqueue, plist, cbn) {
|
||||
remove = 0;
|
||||
mtx_lock(&aio_job_mtx);
|
||||
if (cbe->jobstate == JOBST_JOBQGLOBAL) {
|
||||
TAILQ_REMOVE(&aio_jobs, cbe, list);
|
||||
mtx_unlock(&aio_job_mtx);
|
||||
remove = 1;
|
||||
} else if (cbe->jobstate == JOBST_JOBQSOCK) {
|
||||
fp = cbe->fd_file;
|
||||
MPASS(fp->f_type == DTYPE_SOCKET);
|
||||
so = fp->f_data;
|
||||
TAILQ_REMOVE(&so->so_aiojobq, cbe, list);
|
||||
remove = 1;
|
||||
}
|
||||
mtx_unlock(&aio_job_mtx);
|
||||
|
||||
if (remove) {
|
||||
cbe->jobstate = JOBST_JOBFINISHED;
|
||||
cbe->uaiocb._aiocb_private.status = -1;
|
||||
cbe->uaiocb._aiocb_private.error = ECANCELED;
|
||||
TAILQ_REMOVE(&ki->kaio_jobqueue, cbe, plist);
|
||||
aio_bio_done_notify(p, cbe, DONE_QUEUE);
|
||||
} else {
|
||||
mtx_unlock(&aio_job_mtx);
|
||||
}
|
||||
}
|
||||
|
||||
if (TAILQ_FIRST(&ki->kaio_sockqueue))
|
||||
goto restart;
|
||||
|
||||
/* Wait for all running I/O to be finished */
|
||||
if (TAILQ_FIRST(&ki->kaio_bufqueue) ||
|
||||
TAILQ_FIRST(&ki->kaio_jobqueue)) {
|
||||
@ -693,14 +717,7 @@ aio_selectjob(struct aiothreadlist *aiop)
|
||||
* and this code should work in all instances for every type of file, including
|
||||
* pipes, sockets, fifos, and regular files.
|
||||
*
|
||||
* XXX I don't think these code work well with pipes, sockets and fifo, the
|
||||
* problem is the aiod threads can be blocked if there is not data or no
|
||||
* buffer space, and file was not opened with O_NONBLOCK, all aiod threads
|
||||
* will be blocked if there is couple of such processes. We need a FOF_OFFSET
|
||||
* like flag to override f_flag to tell low level system to do non-blocking
|
||||
* I/O, we can not muck O_NONBLOCK because there is full of race between
|
||||
* userland and aiod threads, although there is a trigger mechanism for socket,
|
||||
* but it also does not work well if userland is misbehaviored.
|
||||
* XXX I don't think it works well for socket, pipe, and fifo.
|
||||
*/
|
||||
static void
|
||||
aio_process(struct aiocblist *aiocbe)
|
||||
@ -1186,47 +1203,33 @@ static void
|
||||
aio_swake_cb(struct socket *so, struct sockbuf *sb)
|
||||
{
|
||||
struct aiocblist *cb, *cbn;
|
||||
struct proc *p;
|
||||
struct kaioinfo *ki = NULL;
|
||||
int opcode, wakecount = 0;
|
||||
struct aiothreadlist *aiop;
|
||||
|
||||
if (sb == &so->so_snd) {
|
||||
if (sb == &so->so_snd)
|
||||
opcode = LIO_WRITE;
|
||||
SOCKBUF_LOCK(&so->so_snd);
|
||||
so->so_snd.sb_flags &= ~SB_AIO;
|
||||
SOCKBUF_UNLOCK(&so->so_snd);
|
||||
} else {
|
||||
else
|
||||
opcode = LIO_READ;
|
||||
SOCKBUF_LOCK(&so->so_rcv);
|
||||
so->so_rcv.sb_flags &= ~SB_AIO;
|
||||
SOCKBUF_UNLOCK(&so->so_rcv);
|
||||
}
|
||||
|
||||
mtx_lock(&aio_sock_mtx);
|
||||
SOCKBUF_LOCK(sb);
|
||||
sb->sb_flags &= ~SB_AIO;
|
||||
mtx_lock(&aio_job_mtx);
|
||||
TAILQ_FOREACH_SAFE(cb, &so->so_aiojobq, list, cbn) {
|
||||
if (opcode == cb->uaiocb.aio_lio_opcode) {
|
||||
if (cb->jobstate != JOBST_JOBQSOCK)
|
||||
panic("invalid queue value");
|
||||
p = cb->userproc;
|
||||
ki = p->p_aioinfo;
|
||||
TAILQ_REMOVE(&so->so_aiojobq, cb, list);
|
||||
PROC_LOCK(p);
|
||||
TAILQ_REMOVE(&ki->kaio_sockqueue, cb, plist);
|
||||
/*
|
||||
* XXX check AIO_RUNDOWN, and don't put on
|
||||
* jobqueue if it was set.
|
||||
/* XXX
|
||||
* We don't have actual sockets backend yet,
|
||||
* so we simply move the requests to the generic
|
||||
* file I/O backend.
|
||||
*/
|
||||
TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, cb, plist);
|
||||
cb->jobstate = JOBST_JOBQGLOBAL;
|
||||
mtx_lock(&aio_job_mtx);
|
||||
TAILQ_REMOVE(&so->so_aiojobq, cb, list);
|
||||
TAILQ_INSERT_TAIL(&aio_jobs, cb, list);
|
||||
mtx_unlock(&aio_job_mtx);
|
||||
PROC_UNLOCK(p);
|
||||
wakecount++;
|
||||
}
|
||||
}
|
||||
mtx_unlock(&aio_sock_mtx);
|
||||
mtx_unlock(&aio_job_mtx);
|
||||
SOCKBUF_UNLOCK(sb);
|
||||
|
||||
while (wakecount--) {
|
||||
mtx_lock(&aio_job_mtx);
|
||||
@ -1426,14 +1429,15 @@ no_kqueue:
|
||||
SOCKBUF_LOCK(sb);
|
||||
if (((opcode == LIO_READ) && (!soreadable(so))) || ((opcode ==
|
||||
LIO_WRITE) && (!sowriteable(so)))) {
|
||||
mtx_lock(&aio_sock_mtx);
|
||||
TAILQ_INSERT_TAIL(&so->so_aiojobq, aiocbe, list);
|
||||
mtx_unlock(&aio_sock_mtx);
|
||||
|
||||
sb->sb_flags |= SB_AIO;
|
||||
|
||||
mtx_lock(&aio_job_mtx);
|
||||
TAILQ_INSERT_TAIL(&so->so_aiojobq, aiocbe, list);
|
||||
mtx_unlock(&aio_job_mtx);
|
||||
|
||||
PROC_LOCK(p);
|
||||
TAILQ_INSERT_TAIL(&ki->kaio_sockqueue, aiocbe, plist);
|
||||
TAILQ_INSERT_TAIL(&ki->kaio_all, aiocbe, allist);
|
||||
TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, aiocbe, plist);
|
||||
aiocbe->jobstate = JOBST_JOBQSOCK;
|
||||
ki->kaio_count++;
|
||||
if (lj)
|
||||
@ -1651,6 +1655,7 @@ aio_cancel(struct thread *td, struct aio_cancel_args *uap)
|
||||
struct file *fp;
|
||||
struct socket *so;
|
||||
int error;
|
||||
int remove;
|
||||
int cancelled = 0;
|
||||
int notcancelled = 0;
|
||||
struct vnode *vp;
|
||||
@ -1671,32 +1676,6 @@ aio_cancel(struct thread *td, struct aio_cancel_args *uap)
|
||||
td->td_retval[0] = AIO_NOTCANCELED;
|
||||
return (0);
|
||||
}
|
||||
} else if (fp->f_type == DTYPE_SOCKET) {
|
||||
so = fp->f_data;
|
||||
mtx_lock(&aio_sock_mtx);
|
||||
TAILQ_FOREACH_SAFE(cbe, &so->so_aiojobq, list, cbn) {
|
||||
if (cbe->userproc == p &&
|
||||
(uap->aiocbp == NULL ||
|
||||
uap->aiocbp == cbe->uuaiocb)) {
|
||||
TAILQ_REMOVE(&so->so_aiojobq, cbe, list);
|
||||
PROC_LOCK(p);
|
||||
TAILQ_REMOVE(&ki->kaio_sockqueue, cbe, plist);
|
||||
cbe->jobstate = JOBST_JOBRUNNING;
|
||||
cbe->uaiocb._aiocb_private.status = -1;
|
||||
cbe->uaiocb._aiocb_private.error = ECANCELED;
|
||||
aio_bio_done_notify(p, cbe, DONE_QUEUE);
|
||||
PROC_UNLOCK(p);
|
||||
cancelled++;
|
||||
if (uap->aiocbp != NULL)
|
||||
break;
|
||||
}
|
||||
}
|
||||
mtx_unlock(&aio_sock_mtx);
|
||||
if (cancelled && uap->aiocbp != NULL) {
|
||||
fdrop(fp, td);
|
||||
td->td_retval[0] = AIO_CANCELED;
|
||||
return (0);
|
||||
}
|
||||
}
|
||||
|
||||
PROC_LOCK(p);
|
||||
@ -1704,33 +1683,55 @@ aio_cancel(struct thread *td, struct aio_cancel_args *uap)
|
||||
if ((uap->fd == cbe->uaiocb.aio_fildes) &&
|
||||
((uap->aiocbp == NULL) ||
|
||||
(uap->aiocbp == cbe->uuaiocb))) {
|
||||
remove = 0;
|
||||
|
||||
mtx_lock(&aio_job_mtx);
|
||||
if (cbe->jobstate == JOBST_JOBQGLOBAL) {
|
||||
TAILQ_REMOVE(&aio_jobs, cbe, list);
|
||||
mtx_unlock(&aio_job_mtx);
|
||||
remove = 1;
|
||||
} else if (cbe->jobstate == JOBST_JOBQSOCK) {
|
||||
MPASS(fp->f_type == DTYPE_SOCKET);
|
||||
so = fp->f_data;
|
||||
TAILQ_REMOVE(&so->so_aiojobq, cbe, list);
|
||||
remove = 1;
|
||||
}
|
||||
mtx_unlock(&aio_job_mtx);
|
||||
|
||||
if (remove) {
|
||||
TAILQ_REMOVE(&ki->kaio_jobqueue, cbe, plist);
|
||||
cbe->uaiocb._aiocb_private.status = -1;
|
||||
cbe->uaiocb._aiocb_private.error = ECANCELED;
|
||||
aio_bio_done_notify(p, cbe, DONE_QUEUE);
|
||||
cancelled++;
|
||||
} else {
|
||||
mtx_unlock(&aio_job_mtx);
|
||||
notcancelled++;
|
||||
}
|
||||
if (uap->aiocbp != NULL)
|
||||
break;
|
||||
}
|
||||
}
|
||||
PROC_UNLOCK(p);
|
||||
|
||||
done:
|
||||
fdrop(fp, td);
|
||||
|
||||
if (uap->aiocbp != NULL) {
|
||||
if (cancelled) {
|
||||
td->td_retval[0] = AIO_CANCELED;
|
||||
return (0);
|
||||
}
|
||||
}
|
||||
|
||||
if (notcancelled) {
|
||||
td->td_retval[0] = AIO_NOTCANCELED;
|
||||
return (0);
|
||||
}
|
||||
|
||||
if (cancelled) {
|
||||
td->td_retval[0] = AIO_CANCELED;
|
||||
return (0);
|
||||
}
|
||||
|
||||
td->td_retval[0] = AIO_ALLDONE;
|
||||
|
||||
return (0);
|
||||
|
Loading…
x
Reference in New Issue
Block a user