Use a dedicated mutex to protect aio queues; the motivation is to reduce

lock contention with other parts.
This commit is contained in:
David Xu 2006-05-09 00:10:11 +00:00
parent 7a1877d4c6
commit 759ccccadb

View File

@ -197,7 +197,7 @@ typedef struct oaiocb {
* aioliojob and kaioinfo and any backends.
*
* * - need not protected
* a - locked by proc mtx
* a - locked by kaioinfo lock
* b - locked by backend lock, the backend lock can be null in some cases,
* for example, BIO belongs to this type, in this case, proc lock is
* reused.
@ -278,6 +278,7 @@ struct aioliojob {
* per process aio data structure
*/
struct kaioinfo {
struct mtx kaio_mtx; /* the lock to protect this struct */
int kaio_flags; /* (a) per process kaio flags */
int kaio_maxactive_count; /* (*) maximum number of AIOs */
int kaio_active_count; /* (c) number of currently used AIOs */
@ -297,6 +298,11 @@ struct kaioinfo {
struct task kaio_task; /* (*) task to kick aio threads */
};
#define AIO_LOCK(ki) mtx_lock(&(ki)->kaio_mtx)
#define AIO_UNLOCK(ki) mtx_unlock(&(ki)->kaio_mtx)
#define AIO_LOCK_ASSERT(ki, f) mtx_assert(&(ki)->kaio_mtx, (f))
#define AIO_MTX(ki) (&(ki)->kaio_mtx)
#define KAIO_RUNDOWN 0x1 /* process is being run down */
#define KAIO_WAKEUP 0x2 /* wakeup process when there is a significant event */
@ -497,6 +503,7 @@ aio_init_aioinfo(struct proc *p)
struct kaioinfo *ki;
ki = uma_zalloc(kaio_zone, M_WAITOK);
mtx_init(&ki->kaio_mtx, "aiomtx", NULL, MTX_DEF);
ki->kaio_flags = 0;
ki->kaio_maxactive_count = max_aio_per_proc;
ki->kaio_active_count = 0;
@ -518,6 +525,7 @@ aio_init_aioinfo(struct proc *p)
PROC_UNLOCK(p);
} else {
PROC_UNLOCK(p);
mtx_destroy(&ki->kaio_mtx);
uma_zfree(kaio_zone, ki);
}
@ -528,13 +536,16 @@ aio_init_aioinfo(struct proc *p)
static int
aio_sendsig(struct proc *p, struct sigevent *sigev, ksiginfo_t *ksi)
{
PROC_LOCK_ASSERT(p, MA_OWNED);
int ret = 0;
PROC_LOCK(p);
if (!KSI_ONQ(ksi)) {
ksi->ksi_code = SI_ASYNCIO;
ksi->ksi_flags |= KSI_EXT | KSI_INS;
return (psignal_event(p, sigev, ksi));
ret = psignal_event(p, sigev, ksi);
}
return (0);
PROC_UNLOCK(p);
return (ret);
}
/*
@ -550,14 +561,13 @@ aio_free_entry(struct aiocblist *aiocbe)
struct proc *p;
p = aiocbe->userproc;
PROC_LOCK_ASSERT(p, MA_OWNED);
MPASS(curproc == p);
MPASS(aiocbe->jobstate == JOBST_JOBFINISHED);
ki = p->p_aioinfo;
MPASS(ki != NULL);
AIO_LOCK_ASSERT(ki, MA_OWNED);
MPASS(aiocbe->jobstate == JOBST_JOBFINISHED);
atomic_subtract_int(&num_queue_count, 1);
ki->kaio_count--;
@ -575,18 +585,22 @@ aio_free_entry(struct aiocblist *aiocbe)
TAILQ_REMOVE(&ki->kaio_liojoblist, lj, lioj_list);
/* lio is going away, we need to destroy any knotes */
knlist_delete(&lj->klist, curthread, 1);
PROC_LOCK(p);
sigqueue_take(&lj->lioj_ksi);
PROC_UNLOCK(p);
uma_zfree(aiolio_zone, lj);
}
}
/* aiocbe is going away, we need to destroy any knotes */
knlist_delete(&aiocbe->klist, curthread, 1);
PROC_LOCK(p);
sigqueue_take(&aiocbe->ksi);
PROC_UNLOCK(p);
MPASS(aiocbe->bp == NULL);
aiocbe->jobstate = JOBST_NULL;
PROC_UNLOCK(p);
AIO_UNLOCK(ki);
/*
* The thread argument here is used to find the owning process
@ -611,7 +625,7 @@ aio_free_entry(struct aiocblist *aiocbe)
fdrop(aiocbe->fd_file, curthread);
crfree(aiocbe->cred);
uma_zfree(aiocb_zone, aiocbe);
PROC_LOCK(p);
AIO_LOCK(ki);
return (0);
}
@ -635,7 +649,7 @@ aio_proc_rundown(void *arg, struct proc *p)
if (ki == NULL)
return;
PROC_LOCK(p);
AIO_LOCK(ki);
ki->kaio_flags |= KAIO_RUNDOWN;
restart:
@ -675,7 +689,7 @@ restart:
if (TAILQ_FIRST(&ki->kaio_bufqueue) ||
TAILQ_FIRST(&ki->kaio_jobqueue)) {
ki->kaio_flags |= KAIO_WAKEUP;
msleep(&p->p_aioinfo, &p->p_mtx, PRIBIO, "aioprn", hz);
msleep(&p->p_aioinfo, AIO_MTX(ki), PRIBIO, "aioprn", hz);
goto restart;
}
@ -687,14 +701,16 @@ restart:
if (lj->lioj_count == 0) {
TAILQ_REMOVE(&ki->kaio_liojoblist, lj, lioj_list);
knlist_delete(&lj->klist, curthread, 1);
PROC_LOCK(p);
sigqueue_take(&lj->lioj_ksi);
PROC_UNLOCK(p);
uma_zfree(aiolio_zone, lj);
} else {
panic("LIO job not cleaned up: C:%d, FC:%d\n",
lj->lioj_count, lj->lioj_finished_count);
}
}
PROC_UNLOCK(p);
AIO_UNLOCK(ki);
taskqueue_drain(taskqueue_aiod_bio, &ki->kaio_task);
uma_zfree(kaio_zone, ki);
p->p_aioinfo = NULL;
@ -861,8 +877,8 @@ aio_bio_done_notify(struct proc *userp, struct aiocblist *aiocbe, int type)
struct aiocblist *scb, *scbn;
int lj_done;
PROC_LOCK_ASSERT(userp, MA_OWNED);
ki = userp->p_aioinfo;
AIO_LOCK_ASSERT(ki, MA_OWNED);
lj = aiocbe->lio;
lj_done = 0;
if (lj) {
@ -1034,10 +1050,10 @@ aio_daemon(void *_id)
ki->kaio_active_count--;
mtx_unlock(&aio_job_mtx);
PROC_LOCK(userp);
AIO_LOCK(ki);
TAILQ_REMOVE(&ki->kaio_jobqueue, aiocbe, plist);
aio_bio_done_notify(userp, aiocbe, DONE_QUEUE);
PROC_UNLOCK(userp);
AIO_UNLOCK(ki);
mtx_lock(&aio_job_mtx);
}
@ -1203,13 +1219,13 @@ aio_qphysio(struct proc *p, struct aiocblist *aiocbe)
bp = (struct buf *)getpbuf(NULL);
BUF_KERNPROC(bp);
PROC_LOCK(p);
AIO_LOCK(ki);
ki->kaio_count++;
ki->kaio_buffer_count++;
lj = aiocbe->lio;
if (lj)
lj->lioj_count++;
PROC_UNLOCK(p);
AIO_UNLOCK(ki);
/*
* Get a copy of the kva from the physical buffer.
@ -1234,14 +1250,14 @@ aio_qphysio(struct proc *p, struct aiocblist *aiocbe)
goto doerror;
}
PROC_LOCK(p);
AIO_LOCK(ki);
aiocbe->bp = bp;
bp->b_caller1 = (void *)aiocbe;
TAILQ_INSERT_TAIL(&ki->kaio_bufqueue, aiocbe, plist);
TAILQ_INSERT_TAIL(&ki->kaio_all, aiocbe, allist);
aiocbe->jobstate = JOBST_JOBQBUF;
cb->_aiocb_private.status = cb->aio_nbytes;
PROC_UNLOCK(p);
AIO_UNLOCK(ki);
atomic_add_int(&num_queue_count, 1);
atomic_add_int(&num_buf_aio, 1);
@ -1255,13 +1271,13 @@ aio_qphysio(struct proc *p, struct aiocblist *aiocbe)
return (0);
doerror:
PROC_LOCK(p);
AIO_LOCK(ki);
ki->kaio_count--;
ki->kaio_buffer_count--;
if (lj)
lj->lioj_count--;
aiocbe->bp = NULL;
PROC_UNLOCK(p);
AIO_UNLOCK(ki);
relpbuf(bp, NULL);
return (error);
}
@ -1341,7 +1357,7 @@ aio_aqueue(struct thread *td, struct aiocb *job, struct aioliojob *lj,
aiocbe = uma_zalloc(aiocb_zone, M_WAITOK | M_ZERO);
aiocbe->inputcharge = 0;
aiocbe->outputcharge = 0;
knlist_init(&aiocbe->klist, &p->p_mtx, NULL, NULL, NULL);
knlist_init(&aiocbe->klist, AIO_MTX(ki), NULL, NULL, NULL);
if (oldsigev) {
bzero(&aiocbe->uaiocb, sizeof(struct aiocb));
@ -1499,14 +1515,14 @@ no_kqueue:
TAILQ_INSERT_TAIL(&so->so_aiojobq, aiocbe, list);
mtx_unlock(&aio_job_mtx);
PROC_LOCK(p);
AIO_LOCK(ki);
TAILQ_INSERT_TAIL(&ki->kaio_all, aiocbe, allist);
TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, aiocbe, plist);
aiocbe->jobstate = JOBST_JOBQSOCK;
ki->kaio_count++;
if (lj)
lj->lioj_count++;
PROC_UNLOCK(p);
AIO_UNLOCK(ki);
SOCKBUF_UNLOCK(sb);
atomic_add_int(&num_queue_count, 1);
error = 0;
@ -1529,7 +1545,7 @@ queueit:
aiocbe->bp = NULL;
atomic_add_int(&num_queue_count, 1);
PROC_LOCK(p);
AIO_LOCK(ki);
ki->kaio_count++;
if (lj)
lj->lioj_count++;
@ -1555,7 +1571,7 @@ queueit:
if (aiocbe->pending != 0) {
TAILQ_INSERT_TAIL(&ki->kaio_syncqueue, aiocbe, list);
aiocbe->jobstate = JOBST_JOBQSYNC;
PROC_UNLOCK(p);
AIO_UNLOCK(ki);
goto done;
}
}
@ -1564,7 +1580,7 @@ queueit:
aiocbe->jobstate = JOBST_JOBQGLOBAL;
aio_kick_nowait(p);
mtx_unlock(&aio_job_mtx);
PROC_UNLOCK(p);
AIO_UNLOCK(ki);
error = 0;
done:
return (error);
@ -1648,7 +1664,7 @@ aio_return(struct thread *td, struct aio_return_args *uap)
if (ki == NULL)
return (EINVAL);
uaiocb = uap->aiocbp;
PROC_LOCK(p);
AIO_LOCK(ki);
TAILQ_FOREACH(cb, &ki->kaio_done, plist) {
if (cb->uuaiocb == uaiocb)
break;
@ -1667,12 +1683,12 @@ aio_return(struct thread *td, struct aio_return_args *uap)
cb->inputcharge = 0;
}
aio_free_entry(cb);
PROC_UNLOCK(p);
AIO_UNLOCK(ki);
suword(&uaiocb->_aiocb_private.error, error);
suword(&uaiocb->_aiocb_private.status, status);
} else {
error = EINVAL;
PROC_UNLOCK(p);
AIO_UNLOCK(ki);
}
return (error);
}
@ -1734,7 +1750,7 @@ aio_suspend(struct thread *td, struct aio_suspend_args *uap)
return (0);
}
PROC_LOCK(p);
AIO_LOCK(ki);
for (;;) {
cbfirst = NULL;
error = 0;
@ -1753,7 +1769,7 @@ aio_suspend(struct thread *td, struct aio_suspend_args *uap)
break;
ki->kaio_flags |= KAIO_WAKEUP;
error = msleep(&p->p_aioinfo, &p->p_mtx, PRIBIO | PCATCH,
error = msleep(&p->p_aioinfo, AIO_MTX(ki), PRIBIO | PCATCH,
"aiospn", timo);
if (error == ERESTART)
error = EINTR;
@ -1761,7 +1777,7 @@ aio_suspend(struct thread *td, struct aio_suspend_args *uap)
break;
}
RETURN:
PROC_UNLOCK(p);
AIO_UNLOCK(ki);
uma_zfree(aiol_zone, ujoblist);
return (error);
}
@ -1802,7 +1818,7 @@ aio_cancel(struct thread *td, struct aio_cancel_args *uap)
}
}
PROC_LOCK(p);
AIO_LOCK(ki);
TAILQ_FOREACH_SAFE(cbe, &ki->kaio_jobqueue, plist, cbn) {
if ((uap->fd == cbe->uaiocb.aio_fildes) &&
((uap->aiocbp == NULL) ||
@ -1837,7 +1853,7 @@ aio_cancel(struct thread *td, struct aio_cancel_args *uap)
break;
}
}
PROC_UNLOCK(p);
AIO_UNLOCK(ki);
done:
fdrop(fp, td);
@ -1883,7 +1899,7 @@ aio_error(struct thread *td, struct aio_error_args *uap)
return (0);
}
PROC_LOCK(p);
AIO_LOCK(ki);
TAILQ_FOREACH(cb, &ki->kaio_all, allist) {
if (cb->uuaiocb == uap->aiocbp) {
if (cb->jobstate == JOBST_JOBFINISHED)
@ -1891,11 +1907,11 @@ aio_error(struct thread *td, struct aio_error_args *uap)
cb->uaiocb._aiocb_private.error;
else
td->td_retval[0] = EINPROGRESS;
PROC_UNLOCK(p);
AIO_UNLOCK(ki);
return (0);
}
}
PROC_UNLOCK(p);
AIO_UNLOCK(ki);
/*
* Hack for failure of aio_aqueue.
@ -1985,7 +2001,7 @@ do_lio_listio(struct thread *td, struct lio_listio_args *uap, int oldsigev)
lj->lioj_flags = 0;
lj->lioj_count = 0;
lj->lioj_finished_count = 0;
knlist_init(&lj->klist, &p->p_mtx, NULL, NULL, NULL);
knlist_init(&lj->klist, AIO_MTX(ki), NULL, NULL, NULL);
ksiginfo_init(&lj->lioj_ksi);
/*
@ -2042,7 +2058,7 @@ do_lio_listio(struct thread *td, struct lio_listio_args *uap, int oldsigev)
}
}
PROC_LOCK(p);
AIO_LOCK(ki);
TAILQ_INSERT_TAIL(&ki->kaio_liojoblist, lj, lioj_list);
/*
* Add extra aiocb count to avoid the lio to be freed
@ -2051,7 +2067,7 @@ do_lio_listio(struct thread *td, struct lio_listio_args *uap, int oldsigev)
* all tasks.
*/
lj->lioj_count = 1;
PROC_UNLOCK(p);
AIO_UNLOCK(ki);
/*
* Get pointers to the list of I/O requests.
@ -2068,11 +2084,11 @@ do_lio_listio(struct thread *td, struct lio_listio_args *uap, int oldsigev)
}
error = 0;
PROC_LOCK(p);
AIO_LOCK(ki);
if (uap->mode == LIO_WAIT) {
while (lj->lioj_count - 1 != lj->lioj_finished_count) {
ki->kaio_flags |= KAIO_WAKEUP;
error = msleep(&p->p_aioinfo, &p->p_mtx,
error = msleep(&p->p_aioinfo, AIO_MTX(ki),
PRIBIO | PCATCH, "aiospn", 0);
if (error == ERESTART)
error = EINTR;
@ -2099,11 +2115,13 @@ do_lio_listio(struct thread *td, struct lio_listio_args *uap, int oldsigev)
if (lj->lioj_count == 0) {
TAILQ_REMOVE(&ki->kaio_liojoblist, lj, lioj_list);
knlist_delete(&lj->klist, curthread, 1);
PROC_LOCK(p);
sigqueue_take(&lj->lioj_ksi);
PROC_UNLOCK(p);
AIO_UNLOCK(ki);
uma_zfree(aiolio_zone, lj);
} else
PROC_UNLOCK(p);
AIO_UNLOCK(ki);
if (nerror)
return (EIO);
@ -2138,7 +2156,7 @@ biohelper(void *context, int pending)
bp = aiocbe->bp;
userp = aiocbe->userproc;
ki = userp->p_aioinfo;
PROC_LOCK(userp);
AIO_LOCK(ki);
aiocbe->uaiocb._aiocb_private.status -= bp->b_resid;
aiocbe->uaiocb._aiocb_private.error = 0;
if (bp->b_ioflags & BIO_ERROR)
@ -2152,7 +2170,7 @@ biohelper(void *context, int pending)
TAILQ_REMOVE(&userp->p_aioinfo->kaio_bufqueue, aiocbe, plist);
ki->kaio_buffer_count--;
aio_bio_done_notify(userp, aiocbe, DONE_BUF);
PROC_UNLOCK(userp);
AIO_UNLOCK(ki);
/* Release mapping into kernel space. */
vunmapbuf(bp);
@ -2196,10 +2214,10 @@ aio_waitcomplete(struct thread *td, struct aio_waitcomplete_args *uap)
error = 0;
cb = NULL;
PROC_LOCK(p);
AIO_LOCK(ki);
while ((cb = TAILQ_FIRST(&ki->kaio_done)) == NULL) {
ki->kaio_flags |= KAIO_WAKEUP;
error = msleep(&p->p_aioinfo, &p->p_mtx, PRIBIO | PCATCH,
error = msleep(&p->p_aioinfo, AIO_MTX(ki), PRIBIO | PCATCH,
"aiowc", timo);
if (timo && error == ERESTART)
error = EINTR;
@ -2221,12 +2239,12 @@ aio_waitcomplete(struct thread *td, struct aio_waitcomplete_args *uap)
cb->inputcharge = 0;
}
aio_free_entry(cb);
PROC_UNLOCK(p);
AIO_UNLOCK(ki);
suword(uap->aiocbp, (long)uuaiocb);
suword(&uuaiocb->_aiocb_private.error, error);
suword(&uuaiocb->_aiocb_private.status, status);
} else
PROC_UNLOCK(p);
AIO_UNLOCK(ki);
return (error);
}