Add kqueue support to LIO event notification and fix how notifications
are handled when LIO operations complete.  These were the problems
with LIO completion notification:
      -	Move all LIO/AIO event notification into one general function
	so we don't have bugs in different data paths.  This unification
	got rid of several notification bugs, one of which could deliver
	a SIGILL to the process when kqueue was used.
      -	Change the LIO event accounting to count all AIO requests in a
	list, including lists split across the fast path and daemon
	mode.  The prior accounting only tracked the AIO ops in one of
	those modes, not the entire list of operations, so a bogus LIO
	completion notification could be delivered once all of the
	fast-path AIO ops finished even though AIO ops queued for the
	daemon were still outstanding.

Suggestions from:	alc
Doug Ambrisko 2005-10-12 17:51:31 +00:00
parent 7fb8511e34
commit 69cd28dacb
Notes: svn2git 2020-12-20 02:59:44 +00:00
svn path=/head/; revision=151260
3 changed files with 206 additions and 125 deletions
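For context, a userland consumer of the new EVFILT_LIO notification could look like the sketch below.  This is an editorial illustration, not part of the commit: the sigevent field usage mirrors the lio_listio() code in sys/kern/vfs_aio.c further down, while the file path, buffer sizes, and error handling are arbitrary assumptions.

#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <aio.h>
#include <err.h>
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int
main(void)
{
	static char buf0[512], buf1[512];
	struct aiocb cb0, cb1, *list[2] = { &cb0, &cb1 };
	struct sigevent sev;
	struct kevent ev;
	int fd, kq;

	if ((fd = open("/tmp/lio_test", O_RDWR | O_CREAT, 0644)) == -1)
		err(1, "open");
	if ((kq = kqueue()) == -1)
		err(1, "kqueue");

	memset(&cb0, 0, sizeof(cb0));
	cb0.aio_fildes = fd;
	cb0.aio_buf = buf0;
	cb0.aio_nbytes = sizeof(buf0);
	cb0.aio_offset = 0;
	cb0.aio_lio_opcode = LIO_WRITE;
	cb1 = cb0;
	cb1.aio_buf = buf1;
	cb1.aio_offset = sizeof(buf0);

	/* Request a single kevent when the entire list completes. */
	memset(&sev, 0, sizeof(sev));
	sev.sigev_notify = SIGEV_KEVENT;
	sev.sigev_notify_kqueue = kq;		/* kqueue to post to */
	sev.sigev_value.sigval_ptr = list;	/* comes back in ev.udata */

	if (lio_listio(LIO_NOWAIT, list, 2, &sev) == -1)
		err(1, "lio_listio");

	/* Fires once both ops are done, fast path or daemon. */
	if (kevent(kq, NULL, 0, &ev, 1, NULL) == 1)
		printf("list complete, udata %p\n", ev.udata);

	aio_return(list[0]);
	aio_return(list[1]);
	close(kq);
	close(fd);
	return (0);
}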

sys/kern/kern_event.c

@@ -247,6 +247,7 @@ static struct {
{ &timer_filtops }, /* EVFILT_TIMER */
{ &file_filtops }, /* EVFILT_NETDEV */
{ &fs_filtops }, /* EVFILT_FS */
{ &null_filtops }, /* EVFILT_LIO */
};
/*
@@ -633,6 +634,8 @@ kern_kevent(struct thread *td, int fd, int nchanges, int nevents,
changes = keva;
for (i = 0; i < n; i++) {
kevp = &changes[i];
if (!kevp->filter)
continue;
kevp->flags &= ~EV_SYSFLAGS;
error = kqueue_register(kq, kevp, td, 1);
if (error) {
@@ -1828,7 +1831,7 @@ knote_attach(struct knote *kn, struct kqueue *kq)
}
/*
* knote must already have been detatched using the f_detach method.
* knote must already have been detached using the f_detach method.
* no lock need be held; it is assumed that the KN_INFLUX flag is set
* to prevent other removal.
*/
@@ -1850,7 +1853,8 @@ knote_drop(struct knote *kn, struct thread *td)
else
list = &kq->kq_knhash[KN_HASH(kn->kn_id, kq->kq_knhashmask)];
SLIST_REMOVE(list, kn, knote, kn_link);
if (!SLIST_EMPTY(list))
SLIST_REMOVE(list, kn, knote, kn_link);
if (kn->kn_status & KN_QUEUED)
knote_dequeue(kn);
KQ_UNLOCK_FLUX(kq);

sys/kern/vfs_aio.c

@@ -211,11 +211,14 @@ struct aio_liojob {
int lioj_buffer_finished_count;
int lioj_queue_count;
int lioj_queue_finished_count;
int lioj_total_count;
struct sigevent lioj_signal; /* signal on all I/O done */
TAILQ_ENTRY(aio_liojob) lioj_list;
struct knlist klist; /* list of knotes */
};
#define LIOJ_SIGNAL 0x1 /* signal on all done (lio) */
#define LIOJ_SIGNAL_POSTED 0x2 /* signal has been posted */
#define LIOJ_KEVENT_POSTED 0x4 /* kevent triggered */
/*
* per process aio data structure
@@ -262,6 +265,12 @@ static int aio_unload(void);
static int filt_aioattach(struct knote *kn);
static void filt_aiodetach(struct knote *kn);
static int filt_aio(struct knote *kn, long hint);
static int filt_lioattach(struct knote *kn);
static void filt_liodetach(struct knote *kn);
static int filt_lio(struct knote *kn, long hint);
#define DONE_BUF 1
#define DONE_QUEUE 2
static void aio_bio_done_notify(struct proc *userp, struct aiocblist *aiocbe, int type);
/*
* Zones for:
@@ -276,6 +285,8 @@ static uma_zone_t kaio_zone, aiop_zone, aiocb_zone, aiol_zone, aiolio_zone;
/* kqueue filters for aio */
static struct filterops aio_filtops =
{ 0, filt_aioattach, filt_aiodetach, filt_aio };
static struct filterops lio_filtops =
{ 0, filt_lioattach, filt_liodetach, filt_lio };
static eventhandler_tag exit_tag, exec_tag;
@@ -336,6 +347,7 @@ aio_onceonly(void)
exec_tag = EVENTHANDLER_REGISTER(process_exec, aio_proc_rundown, NULL,
EVENTHANDLER_PRI_ANY);
kqueue_add_filteropts(EVFILT_AIO, &aio_filtops);
kqueue_add_filteropts(EVFILT_LIO, &lio_filtops);
TAILQ_INIT(&aio_freeproc);
mtx_init(&aio_freeproc_mtx, "aio_freeproc", NULL, MTX_DEF);
TAILQ_INIT(&aio_jobs);
@@ -486,6 +498,8 @@ aio_free_entry(struct aiocblist *aiocbe)
* OWNING thread? (or maybe the running thread?)
* There is a semantic problem here...
*/
if (lj)
knlist_delete(&lj->klist, FIRST_THREAD_IN_PROC(p), 0); /* XXXKSE */
knlist_delete(&aiocbe->klist, FIRST_THREAD_IN_PROC(p), 0); /* XXXKSE */
if ((ki->kaio_flags & KAIO_WAKEUP) || ((ki->kaio_flags & KAIO_RUNDOWN)
@@ -757,6 +771,67 @@ aio_process(struct aiocblist *aiocbe)
td->td_ucred = td_savedcred;
}
static void
aio_bio_done_notify(struct proc *userp, struct aiocblist *aiocbe, int type)
{
int lj_done;
struct aio_liojob *lj;
struct kaioinfo *ki;
ki = userp->p_aioinfo;
lj = aiocbe->lio;
lj_done = 0;
if (lj) {
if (type == DONE_QUEUE)
lj->lioj_queue_finished_count++;
else
lj->lioj_buffer_finished_count++;
if (lj->lioj_queue_finished_count +
lj->lioj_buffer_finished_count ==
lj->lioj_total_count)
lj_done = 1;
}
if (ki) {
if (type == DONE_QUEUE) {
ki->kaio_queue_finished_count++;
TAILQ_REMOVE(&ki->kaio_jobqueue, aiocbe, plist);
TAILQ_INSERT_TAIL(&ki->kaio_jobdone, aiocbe, plist);
} else {
ki->kaio_buffer_finished_count++;
TAILQ_REMOVE(&ki->kaio_bufqueue, aiocbe, plist);
TAILQ_INSERT_TAIL(&ki->kaio_bufdone, aiocbe, plist);
}
if (lj_done) {
if (!knlist_empty(&lj->klist)
&& lj->lioj_signal.sigev_notify ==
SIGEV_KEVENT) {
lj->lioj_flags |= LIOJ_KEVENT_POSTED;
KNOTE_UNLOCKED(&lj->klist, 0);
}
if ((lj->lioj_flags &
(LIOJ_SIGNAL|LIOJ_SIGNAL_POSTED))
== LIOJ_SIGNAL
&& lj->lioj_signal.sigev_notify == SIGEV_SIGNAL) {
PROC_LOCK(userp);
psignal(userp, lj->lioj_signal.sigev_signo);
PROC_UNLOCK(userp);
lj->lioj_flags |= LIOJ_SIGNAL_POSTED;
}
}
KNOTE_UNLOCKED(&aiocbe->klist, 0);
if (ki->kaio_flags & (KAIO_RUNDOWN|KAIO_WAKEUP)) {
ki->kaio_flags &= ~KAIO_WAKEUP;
wakeup(userp);
}
}
if (aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
PROC_LOCK(userp);
psignal(userp, aiocbe->uaiocb.aio_sigevent.sigev_signo);
PROC_UNLOCK(userp);
}
}
/*
* The AIO daemon, most of the actual work is done in aio_process,
* but the setup (and address space mgmt) is done in this routine.
@@ -765,7 +840,6 @@ static void
aio_daemon(void *uproc)
{
int s;
struct aio_liojob *lj;
struct aiocb *cb;
struct aiocblist *aiocbe;
struct aiothreadlist *aiop;
@@ -883,7 +957,6 @@ aio_daemon(void *uproc)
}
ki = userp->p_aioinfo;
lj = aiocbe->lio;
/* Account for currently active jobs. */
ki->kaio_active_count++;
@@ -891,57 +964,17 @@ aio_daemon(void *uproc)
/* Do the I/O function. */
aio_process(aiocbe);
s = splbio();
/* Decrement the active job count. */
ki->kaio_active_count--;
/*
* Increment the completion count for wakeup/signal
* comparisons.
*/
aiocbe->jobflags |= AIOCBLIST_DONE;
ki->kaio_queue_finished_count++;
if (lj)
lj->lioj_queue_finished_count++;
if ((ki->kaio_flags & KAIO_WAKEUP) || ((ki->kaio_flags
& KAIO_RUNDOWN) && (ki->kaio_active_count == 0))) {
ki->kaio_flags &= ~KAIO_WAKEUP;
wakeup(userp);
}
s = splbio();
if (lj && (lj->lioj_flags &
(LIOJ_SIGNAL|LIOJ_SIGNAL_POSTED)) == LIOJ_SIGNAL) {
if ((lj->lioj_queue_finished_count ==
lj->lioj_queue_count) &&
(lj->lioj_buffer_finished_count ==
lj->lioj_buffer_count)) {
PROC_LOCK(userp);
psignal(userp,
lj->lioj_signal.sigev_signo);
PROC_UNLOCK(userp);
lj->lioj_flags |= LIOJ_SIGNAL_POSTED;
}
}
splx(s);
aiocbe->jobstate = JOBST_JOBFINISHED;
s = splnet();
TAILQ_REMOVE(&ki->kaio_jobqueue, aiocbe, plist);
TAILQ_INSERT_TAIL(&ki->kaio_jobdone, aiocbe, plist);
splx(s);
KNOTE_UNLOCKED(&aiocbe->klist, 0);
aio_bio_done_notify(userp, aiocbe, DONE_QUEUE);
if (aiocbe->jobflags & AIOCBLIST_RUNDOWN) {
wakeup(aiocbe);
aiocbe->jobflags &= ~AIOCBLIST_RUNDOWN;
}
if (cb->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
PROC_LOCK(userp);
psignal(userp, cb->aio_sigevent.sigev_signo);
PROC_UNLOCK(userp);
}
}
/*
@@ -1049,7 +1082,7 @@ aio_qphysio(struct proc *p, struct aiocblist *aiocbe)
struct vnode *vp;
struct kaioinfo *ki;
struct aio_liojob *lj;
int s;
int s, lj_done = 0;
int notify;
cb = &aiocbe->uaiocb;
@@ -1075,6 +1108,9 @@ aio_qphysio(struct proc *p, struct aiocblist *aiocbe)
if (cb->aio_nbytes % vp->v_bufobj.bo_bsize)
return (-1);
if (cb->aio_nbytes > vp->v_rdev->si_iosize_max)
return (-1);
if (cb->aio_nbytes >
MAXPHYS - (((vm_offset_t) cb->aio_buf) & PAGE_MASK))
return (-1);
@@ -1148,6 +1184,14 @@ aio_qphysio(struct proc *p, struct aiocblist *aiocbe)
aiocbe->uaiocb._aiocb_private.error = bp->b_error;
suword(&job->_aiocb_private.error, bp->b_error);
if (lj) {
lj->lioj_buffer_finished_count++;
if (lj->lioj_queue_finished_count +
lj->lioj_buffer_finished_count ==
lj->lioj_total_count)
lj_done = 1;
}
ki->kaio_buffer_finished_count++;
if (aiocbe->jobstate != JOBST_JOBBFINISHED) {
@@ -1159,8 +1203,19 @@ aio_qphysio(struct proc *p, struct aiocblist *aiocbe)
}
}
splx(s);
if (notify)
if (notify) {
if (lj && !knlist_empty(&lj->klist)) {
lj->lioj_flags |= LIOJ_KEVENT_POSTED;
KNOTE_UNLOCKED(&lj->klist, 0);
}
KNOTE_UNLOCKED(&aiocbe->klist, 0);
}
if (cb->aio_lio_opcode == LIO_WRITE) {
aiocbe->outputcharge += btodb(cb->aio_nbytes);
} else if (cb->aio_lio_opcode == LIO_READ) {
aiocbe->inputcharge += btodb(cb->aio_nbytes);
}
return (0);
doerror:
@@ -1546,19 +1601,12 @@ aio_return(struct thread *td, struct aio_return_args *uap)
PROC_LOCK(p);
TAILQ_FOREACH(cb, &ki->kaio_jobdone, plist) {
if (((intptr_t) cb->uaiocb._aiocb_private.kernelinfo) ==
jobref) {
if (cb->uaiocb.aio_lio_opcode == LIO_WRITE) {
p->p_stats->p_ru.ru_oublock +=
cb->outputcharge;
cb->outputcharge = 0;
} else if (cb->uaiocb.aio_lio_opcode == LIO_READ) {
p->p_stats->p_ru.ru_inblock += cb->inputcharge;
cb->inputcharge = 0;
}
jobref)
goto done;
}
}
s = splbio();
/* aio_physwakeup */
for (cb = TAILQ_FIRST(&ki->kaio_bufdone); cb; cb = ncb) {
ncb = TAILQ_NEXT(cb, plist);
if (((intptr_t) cb->uaiocb._aiocb_private.kernelinfo)
@@ -1575,6 +1623,14 @@ aio_return(struct thread *td, struct aio_return_args *uap)
cb->uaiocb._aiocb_private.status;
} else
td->td_retval[0] = EFAULT;
if (cb->uaiocb.aio_lio_opcode == LIO_WRITE) {
p->p_stats->p_ru.ru_oublock +=
cb->outputcharge;
cb->outputcharge = 0;
} else if (cb->uaiocb.aio_lio_opcode == LIO_READ) {
p->p_stats->p_ru.ru_inblock += cb->inputcharge;
cb->inputcharge = 0;
}
aio_free_entry(cb);
return (0);
}
@@ -1781,21 +1837,11 @@ aio_cancel(struct thread *td, struct aio_cancel_args *uap)
if (cbe->jobstate == JOBST_JOBQGLOBAL) {
TAILQ_REMOVE(&aio_jobs, cbe, list);
TAILQ_REMOVE(&ki->kaio_jobqueue, cbe, plist);
TAILQ_INSERT_TAIL(&ki->kaio_jobdone, cbe,
plist);
cancelled++;
ki->kaio_queue_finished_count++;
cbe->jobstate = JOBST_JOBFINISHED;
cancelled++;
cbe->uaiocb._aiocb_private.status = -1;
cbe->uaiocb._aiocb_private.error = ECANCELED;
/* XXX cancelled, knote? */
if (cbe->uaiocb.aio_sigevent.sigev_notify ==
SIGEV_SIGNAL) {
PROC_LOCK(cbe->userproc);
psignal(cbe->userproc, cbe->uaiocb.aio_sigevent.sigev_signo);
PROC_UNLOCK(cbe->userproc);
}
aio_bio_done_notify(cbe->userproc, cbe, DONE_QUEUE);
} else {
notcancelled++;
}
@@ -1935,6 +1981,9 @@ lio_listio(struct thread *td, struct lio_listio_args *uap)
struct aiocblist *cb;
struct kaioinfo *ki;
struct aio_liojob *lj;
struct kevent kev;
struct kqueue *kq;
struct file *kq_fp;
int error, runningcode;
int nerror;
int i;
@@ -1966,6 +2015,10 @@ lio_listio(struct thread *td, struct lio_listio_args *uap)
lj->lioj_buffer_finished_count = 0;
lj->lioj_queue_count = 0;
lj->lioj_queue_finished_count = 0;
lj->lioj_total_count = nent;
knlist_init(&lj->klist, NULL, NULL, NULL, NULL);
kev.ident = 0;
/*
* Setup signal.
@@ -1977,12 +2030,41 @@ lio_listio(struct thread *td, struct lio_listio_args *uap)
uma_zfree(aiolio_zone, lj);
return (error);
}
if (!_SIG_VALID(lj->lioj_signal.sigev_signo)) {
if (lj->lioj_signal.sigev_notify == SIGEV_KEVENT) {
/* Assume only new style KEVENT */
kev.ident = lj->lioj_signal.sigev_notify_kqueue;
kev.udata = lj->lioj_signal.sigev_value.sigval_ptr;
if ((u_int)kev.ident >= p->p_fd->fd_nfiles ||
(kq_fp = p->p_fd->fd_ofiles[kev.ident]) == NULL ||
(kq_fp->f_type != DTYPE_KQUEUE)) {
uma_zfree(aiolio_zone, lj);
splx(s);
return (EBADF);
}
kq = (struct kqueue *)kq_fp->f_data;
kev.filter = EVFILT_LIO;
kev.flags = EV_ADD | EV_ENABLE | EV_FLAG1;
kev.ident = (uintptr_t)lj; /* something unique */
kev.data = (intptr_t)lj;
error = kqueue_register(kq, &kev, td, 1);
if (error) {
uma_zfree(aiolio_zone, lj);
splx(s);
return (error);
}
} else if (!_SIG_VALID(lj->lioj_signal.sigev_signo)) {
uma_zfree(aiolio_zone, lj);
return (EINVAL);
splx(s);
return EINVAL;
} else {
lj->lioj_flags |= LIOJ_SIGNAL;
lj->lioj_flags &= ~LIOJ_SIGNAL_POSTED;
}
lj->lioj_flags |= LIOJ_SIGNAL;
}
} else
lj->lioj_flags &= ~LIOJ_SIGNAL;
TAILQ_INSERT_TAIL(&ki->kaio_liojoblist, lj, lioj_list);
/*
* Get pointers to the list of I/O requests.
@@ -2102,9 +2184,7 @@ static void
aio_physwakeup(struct buf *bp)
{
struct aiocblist *aiocbe;
struct proc *p;
struct kaioinfo *ki;
struct aio_liojob *lj;
struct proc *userp;
mtx_lock(&Giant);
bp->b_flags |= B_DONE;
@@ -2112,7 +2192,7 @@ aio_physwakeup(struct buf *bp)
aiocbe = (struct aiocblist *)bp->b_caller1;
if (aiocbe) {
p = aiocbe->userproc;
userp = aiocbe->userproc;
aiocbe->jobstate = JOBST_JOBBFINISHED;
aiocbe->uaiocb._aiocb_private.status -= bp->b_resid;
@@ -2122,50 +2202,7 @@ aio_physwakeup(struct buf *bp)
if (bp->b_ioflags & BIO_ERROR)
aiocbe->uaiocb._aiocb_private.error = bp->b_error;
lj = aiocbe->lio;
if (lj) {
lj->lioj_buffer_finished_count++;
/*
* wakeup/signal if all of the interrupt jobs are done.
*/
if (lj->lioj_buffer_finished_count ==
lj->lioj_buffer_count &&
lj->lioj_queue_finished_count ==
lj->lioj_queue_count) {
/*
* Post a signal if it is called for.
*/
if ((lj->lioj_flags &
(LIOJ_SIGNAL|LIOJ_SIGNAL_POSTED)) ==
LIOJ_SIGNAL) {
PROC_LOCK(p);
psignal(p, lj->lioj_signal.sigev_signo);
PROC_UNLOCK(p);
lj->lioj_flags |= LIOJ_SIGNAL_POSTED;
}
}
}
ki = p->p_aioinfo;
if (ki) {
ki->kaio_buffer_finished_count++;
TAILQ_REMOVE(&ki->kaio_bufqueue, aiocbe, plist);
TAILQ_INSERT_TAIL(&ki->kaio_bufdone, aiocbe, plist);
KNOTE_UNLOCKED(&aiocbe->klist, 0);
/* Do the wakeup. */
if (ki->kaio_flags & (KAIO_RUNDOWN|KAIO_WAKEUP)) {
ki->kaio_flags &= ~KAIO_WAKEUP;
wakeup(p);
}
}
if (aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
PROC_LOCK(p);
psignal(p, aiocbe->uaiocb.aio_sigevent.sigev_signo);
PROC_UNLOCK(p);
}
aio_bio_done_notify(userp, aiocbe, DONE_BUF);
}
mtx_unlock(&Giant);
}
@@ -2275,7 +2312,8 @@ filt_aiodetach(struct knote *kn)
{
struct aiocblist *aiocbe = (struct aiocblist *)kn->kn_sdata;
knlist_remove(&aiocbe->klist, kn, 0);
if (!knlist_empty(&aiocbe->klist))
knlist_remove(&aiocbe->klist, kn, 0);
}
/* kqueue filter function */
@@ -2292,3 +2330,42 @@ filt_aio(struct knote *kn, long hint)
kn->kn_flags |= EV_EOF;
return (1);
}
/* kqueue attach function */
static int
filt_lioattach(struct knote *kn)
{
struct aio_liojob *lj = (struct aio_liojob *)kn->kn_sdata;
/*
* The aio_liojob pointer must be validated before using it, so
* registration is restricted to the kernel; the user cannot
* set EV_FLAG1.
*/
if ((kn->kn_flags & EV_FLAG1) == 0)
return (EPERM);
kn->kn_flags &= ~EV_FLAG1;
knlist_add(&lj->klist, kn, 0);
return (0);
}
/* kqueue detach function */
static void
filt_liodetach(struct knote *kn)
{
struct aio_liojob *lj = (struct aio_liojob *)kn->kn_sdata;
if (!knlist_empty(&lj->klist))
knlist_remove(&lj->klist, kn, 0);
}
/* kqueue filter function */
/*ARGSUSED*/
static int
filt_lio(struct knote *kn, long hint)
{
struct aio_liojob *lj = (struct aio_liojob *)kn->kn_sdata;
return (lj->lioj_flags & LIOJ_KEVENT_POSTED);
}
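Because filt_lioattach() rejects any knote without EV_FLAG1, and kern_kevent() masks EV_FLAG1 out of user-supplied flags (it is part of EV_SYSFLAGS), EVFILT_LIO knotes can only be attached by the kernel-internal kqueue_register() call in lio_listio().  A minimal sketch of the userland-visible consequence, assuming the EPERM return from filt_lioattach() above propagates out of kevent():

#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <err.h>
#include <unistd.h>

int
main(void)
{
	struct kevent kev;
	int kq;

	if ((kq = kqueue()) == -1)
		err(1, "kqueue");
	/*
	 * Userland cannot set EV_FLAG1, so filt_lioattach() should
	 * refuse this direct registration with EPERM.
	 */
	EV_SET(&kev, 1, EVFILT_LIO, EV_ADD | EV_ENABLE, 0, 0, NULL);
	if (kevent(kq, &kev, 1, NULL, 0, NULL) == -1)
		warn("kevent");	/* expected failure: EPERM */
	close(kq);
	return (0);
}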

sys/sys/event.h

@@ -38,8 +38,8 @@
#define EVFILT_TIMER (-7) /* timers */
#define EVFILT_NETDEV (-8) /* network devices */
#define EVFILT_FS (-9) /* filesystem events */
#define EVFILT_SYSCOUNT 9
#define EVFILT_LIO (-10) /* attached to lio requests */
#define EVFILT_SYSCOUNT 10
#define EV_SET(kevp_, a, b, c, d, e, f) do { \
struct kevent *kevp = (kevp_); \