1. Move code for scanning pending I/O from aio_fsync to aio_aqueue,

it has less overhead.
2. Avoid scheduling task if maximum number of I/O threads is reached.
This commit is contained in:
davidxu 2006-03-24 00:50:06 +00:00
parent c5cf5122a1
commit 60f31ebe10

View File

@ -313,7 +313,7 @@ static int aio_free_entry(struct aiocblist *aiocbe);
static void aio_process(struct aiocblist *aiocbe); static void aio_process(struct aiocblist *aiocbe);
static int aio_newproc(int *); static int aio_newproc(int *);
static int aio_aqueue(struct thread *td, struct aiocb *job, static int aio_aqueue(struct thread *td, struct aiocb *job,
struct aioliojob *lio, int type, int osigev, uint64_t *jseqno); struct aioliojob *lio, int type, int osigev);
static void aio_physwakeup(struct buf *bp); static void aio_physwakeup(struct buf *bp);
static void aio_proc_rundown(void *arg, struct proc *p); static void aio_proc_rundown(void *arg, struct proc *p);
static int aio_qphysio(struct proc *p, struct aiocblist *iocb); static int aio_qphysio(struct proc *p, struct aiocblist *iocb);
@ -325,7 +325,7 @@ static void aio_bio_done_notify(struct proc *userp, struct aiocblist *aiocbe, in
#define DONE_BUF 1 #define DONE_BUF 1
#define DONE_QUEUE 2 #define DONE_QUEUE 2
static int do_lio_listio(struct thread *td, struct lio_listio_args *uap, int oldsigev); static int do_lio_listio(struct thread *td, struct lio_listio_args *uap, int oldsigev);
static void aio_kick(struct proc *userp); static int aio_kick(struct proc *userp);
static void aio_kick_nowait(struct proc *userp); static void aio_kick_nowait(struct proc *userp);
static void aio_kick_helper(void *context, int pending); static void aio_kick_helper(void *context, int pending);
static int filt_aioattach(struct knote *kn); static int filt_aioattach(struct knote *kn);
@ -904,8 +904,7 @@ aio_bio_done_notify(struct proc *userp, struct aiocblist *aiocbe, int type)
notification_done: notification_done:
if (aiocbe->jobflags & AIOCBLIST_CHECKSYNC) { if (aiocbe->jobflags & AIOCBLIST_CHECKSYNC) {
TAILQ_FOREACH_SAFE(scb, &ki->kaio_syncqueue, list, scbn) { TAILQ_FOREACH_SAFE(scb, &ki->kaio_syncqueue, list, scbn) {
if (scb->pending != -1 && if (aiocbe->fd_file == scb->fd_file &&
aiocbe->fd_file == scb->fd_file &&
aiocbe->seqno < scb->seqno) { aiocbe->seqno < scb->seqno) {
if (--scb->pending == 0) { if (--scb->pending == 0) {
mtx_lock(&aio_job_mtx); mtx_lock(&aio_job_mtx);
@ -1308,12 +1307,12 @@ aio_swake_cb(struct socket *so, struct sockbuf *sb)
*/ */
static int static int
aio_aqueue(struct thread *td, struct aiocb *job, struct aioliojob *lj, aio_aqueue(struct thread *td, struct aiocb *job, struct aioliojob *lj,
int type, int oldsigev, uint64_t *jseqno) int type, int oldsigev)
{ {
struct proc *p = td->td_proc; struct proc *p = td->td_proc;
struct file *fp; struct file *fp;
struct socket *so; struct socket *so;
struct aiocblist *aiocbe; struct aiocblist *aiocbe, *cb;
struct kaioinfo *ki; struct kaioinfo *ki;
struct kevent kev; struct kevent kev;
struct kqueue *kq; struct kqueue *kq;
@ -1342,7 +1341,6 @@ aio_aqueue(struct thread *td, struct aiocb *job, struct aioliojob *lj,
aiocbe = uma_zalloc(aiocb_zone, M_WAITOK | M_ZERO); aiocbe = uma_zalloc(aiocb_zone, M_WAITOK | M_ZERO);
aiocbe->inputcharge = 0; aiocbe->inputcharge = 0;
aiocbe->outputcharge = 0; aiocbe->outputcharge = 0;
aiocbe->pending = -1;
knlist_init(&aiocbe->klist, &p->p_mtx, NULL, NULL, NULL); knlist_init(&aiocbe->klist, &p->p_mtx, NULL, NULL, NULL);
if (oldsigev) { if (oldsigev) {
@ -1408,7 +1406,7 @@ aio_aqueue(struct thread *td, struct aiocb *job, struct aioliojob *lj,
goto aqueue_fail; goto aqueue_fail;
} }
if (aiocbe->uaiocb.aio_offset == -1LL) { if (opcode != LIO_SYNC && aiocbe->uaiocb.aio_offset == -1LL) {
error = EINVAL; error = EINVAL;
goto aqueue_fail; goto aqueue_fail;
} }
@ -1418,8 +1416,6 @@ aio_aqueue(struct thread *td, struct aiocb *job, struct aioliojob *lj,
mtx_lock(&aio_job_mtx); mtx_lock(&aio_job_mtx);
jid = jobrefid++; jid = jobrefid++;
aiocbe->seqno = jobseqno++; aiocbe->seqno = jobseqno++;
if (jseqno)
*jseqno = aiocbe->seqno;
mtx_unlock(&aio_job_mtx); mtx_unlock(&aio_job_mtx);
error = suword(&job->_aiocb_private.kernelinfo, jid); error = suword(&job->_aiocb_private.kernelinfo, jid);
if (error) { if (error) {
@ -1540,15 +1536,34 @@ queueit:
TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, aiocbe, plist); TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, aiocbe, plist);
TAILQ_INSERT_TAIL(&ki->kaio_all, aiocbe, allist); TAILQ_INSERT_TAIL(&ki->kaio_all, aiocbe, allist);
if (opcode == LIO_SYNC) { if (opcode == LIO_SYNC) {
TAILQ_INSERT_TAIL(&ki->kaio_syncqueue, aiocbe, list); TAILQ_FOREACH(cb, &ki->kaio_jobqueue, plist) {
aiocbe->jobstate = JOBST_JOBQSYNC; if (cb->fd_file == aiocbe->fd_file &&
} else { cb->uaiocb.aio_lio_opcode != LIO_SYNC &&
mtx_lock(&aio_job_mtx); cb->seqno < aiocbe->seqno) {
TAILQ_INSERT_TAIL(&aio_jobs, aiocbe, list); cb->jobflags |= AIOCBLIST_CHECKSYNC;
aiocbe->jobstate = JOBST_JOBQGLOBAL; aiocbe->pending++;
aio_kick_nowait(p); }
mtx_unlock(&aio_job_mtx); }
TAILQ_FOREACH(cb, &ki->kaio_bufqueue, plist) {
if (cb->fd_file == aiocbe->fd_file &&
cb->uaiocb.aio_lio_opcode != LIO_SYNC &&
cb->seqno < aiocbe->seqno) {
cb->jobflags |= AIOCBLIST_CHECKSYNC;
aiocbe->pending++;
}
}
if (aiocbe->pending != 0) {
TAILQ_INSERT_TAIL(&ki->kaio_syncqueue, aiocbe, list);
aiocbe->jobstate = JOBST_JOBQSYNC;
PROC_UNLOCK(p);
goto done;
}
} }
mtx_lock(&aio_job_mtx);
TAILQ_INSERT_TAIL(&aio_jobs, aiocbe, list);
aiocbe->jobstate = JOBST_JOBQGLOBAL;
aio_kick_nowait(p);
mtx_unlock(&aio_job_mtx);
PROC_UNLOCK(p); PROC_UNLOCK(p);
error = 0; error = 0;
done: done:
@ -1566,17 +1581,19 @@ aio_kick_nowait(struct proc *userp)
TAILQ_REMOVE(&aio_freeproc, aiop, list); TAILQ_REMOVE(&aio_freeproc, aiop, list);
aiop->aiothreadflags &= ~AIOP_FREE; aiop->aiothreadflags &= ~AIOP_FREE;
wakeup(aiop->aiothread); wakeup(aiop->aiothread);
} else { } else if (((num_aio_resv_start + num_aio_procs) < max_aio_procs) &&
((ki->kaio_active_count + num_aio_resv_start) <
ki->kaio_maxactive_count)) {
taskqueue_enqueue(taskqueue_aiod_bio, &ki->kaio_task); taskqueue_enqueue(taskqueue_aiod_bio, &ki->kaio_task);
} }
} }
static void static int
aio_kick(struct proc *userp) aio_kick(struct proc *userp)
{ {
struct kaioinfo *ki = userp->p_aioinfo; struct kaioinfo *ki = userp->p_aioinfo;
struct aiothreadlist *aiop; struct aiothreadlist *aiop;
int error; int error, ret = 0;
mtx_assert(&aio_job_mtx, MA_OWNED); mtx_assert(&aio_job_mtx, MA_OWNED);
retryproc: retryproc:
@ -1595,7 +1612,10 @@ retryproc:
num_aio_resv_start--; num_aio_resv_start--;
goto retryproc; goto retryproc;
} }
} else {
ret = -1;
} }
return (ret);
} }
static void static void
@ -1604,8 +1624,10 @@ aio_kick_helper(void *context, int pending)
struct proc *userp = context; struct proc *userp = context;
mtx_lock(&aio_job_mtx); mtx_lock(&aio_job_mtx);
while (--pending >= 0) while (--pending >= 0) {
aio_kick(userp); if (aio_kick(userp))
break;
}
mtx_unlock(&aio_job_mtx); mtx_unlock(&aio_job_mtx);
} }
@ -1893,14 +1915,14 @@ int
oaio_read(struct thread *td, struct oaio_read_args *uap) oaio_read(struct thread *td, struct oaio_read_args *uap)
{ {
return aio_aqueue(td, (struct aiocb *)uap->aiocbp, NULL, LIO_READ, 1, NULL); return aio_aqueue(td, (struct aiocb *)uap->aiocbp, NULL, LIO_READ, 1);
} }
int int
aio_read(struct thread *td, struct aio_read_args *uap) aio_read(struct thread *td, struct aio_read_args *uap)
{ {
return aio_aqueue(td, uap->aiocbp, NULL, LIO_READ, 0, NULL); return aio_aqueue(td, uap->aiocbp, NULL, LIO_READ, 0);
} }
/* syscall - asynchronous write to a file (REALTIME) */ /* syscall - asynchronous write to a file (REALTIME) */
@ -1908,14 +1930,14 @@ int
oaio_write(struct thread *td, struct oaio_write_args *uap) oaio_write(struct thread *td, struct oaio_write_args *uap)
{ {
return aio_aqueue(td, (struct aiocb *)uap->aiocbp, NULL, LIO_WRITE, 1, NULL); return aio_aqueue(td, (struct aiocb *)uap->aiocbp, NULL, LIO_WRITE, 1);
} }
int int
aio_write(struct thread *td, struct aio_write_args *uap) aio_write(struct thread *td, struct aio_write_args *uap)
{ {
return aio_aqueue(td, uap->aiocbp, NULL, LIO_WRITE, 0, NULL); return aio_aqueue(td, uap->aiocbp, NULL, LIO_WRITE, 0);
} }
/* syscall - list directed I/O (REALTIME) */ /* syscall - list directed I/O (REALTIME) */
@ -2039,7 +2061,7 @@ do_lio_listio(struct thread *td, struct lio_listio_args *uap, int oldsigev)
for (i = 0; i < uap->nent; i++) { for (i = 0; i < uap->nent; i++) {
iocb = (struct aiocb *)(intptr_t)fuword(&cbptr[i]); iocb = (struct aiocb *)(intptr_t)fuword(&cbptr[i]);
if (((intptr_t)iocb != -1) && ((intptr_t)iocb != 0)) { if (((intptr_t)iocb != -1) && ((intptr_t)iocb != 0)) {
error = aio_aqueue(td, iocb, lj, LIO_NOP, oldsigev, NULL); error = aio_aqueue(td, iocb, lj, LIO_NOP, oldsigev);
if (error != 0) if (error != 0)
nerror++; nerror++;
} }
@ -2213,62 +2235,14 @@ int
aio_fsync(struct thread *td, struct aio_fsync_args *uap) aio_fsync(struct thread *td, struct aio_fsync_args *uap)
{ {
struct proc *p = td->td_proc; struct proc *p = td->td_proc;
struct aiocblist *cb, *scb;
struct kaioinfo *ki; struct kaioinfo *ki;
struct aiocb uaiocb;
int error;
uint64_t jseqno;
if (uap->op != O_SYNC) /* XXX lack of O_DSYNC */ if (uap->op != O_SYNC) /* XXX lack of O_DSYNC */
return (EINVAL); return (EINVAL);
error = copyin(uap->aiocbp, &uaiocb, sizeof(struct aiocb));
if (error)
return (error);
ki = p->p_aioinfo; ki = p->p_aioinfo;
if (ki == NULL) if (ki == NULL)
aio_init_aioinfo(p); aio_init_aioinfo(p);
ki = p->p_aioinfo; return aio_aqueue(td, uap->aiocbp, NULL, LIO_SYNC, 0);
error = aio_aqueue(td, uap->aiocbp, NULL, LIO_SYNC, 0, &jseqno);
if (error)
return (error);
PROC_LOCK(p);
TAILQ_FOREACH(scb, &ki->kaio_syncqueue, plist) {
if (scb->seqno == jseqno)
break;
}
if (scb == NULL) {
PROC_UNLOCK(p);
return (0);
}
scb->pending = 0;
TAILQ_FOREACH(cb, &ki->kaio_jobqueue, plist) {
if (cb->fd_file == scb->fd_file &&
cb->uaiocb.aio_lio_opcode != LIO_SYNC &&
cb->seqno < scb->seqno) {
cb->jobflags |= AIOCBLIST_CHECKSYNC;
scb->pending++;
}
}
TAILQ_FOREACH(cb, &ki->kaio_bufqueue, plist) {
if (cb->fd_file == scb->fd_file &&
cb->uaiocb.aio_lio_opcode != LIO_SYNC &&
cb->seqno < scb->seqno) {
cb->jobflags |= AIOCBLIST_CHECKSYNC;
scb->pending++;
}
}
if (scb->pending == 0) {
mtx_lock(&aio_job_mtx);
if (scb->jobstate == JOBST_JOBQSYNC) {
scb->jobstate = JOBST_JOBQGLOBAL;
TAILQ_REMOVE(&ki->kaio_syncqueue, scb, list);
TAILQ_INSERT_TAIL(&aio_jobs, scb, list);
aio_kick_nowait(p);
}
mtx_unlock(&aio_job_mtx);
}
PROC_UNLOCK(p);
return (0);
} }
/* kqueue attach function */ /* kqueue attach function */