1. Fix a race condition between umtx lock and unlock; heavy testing
on SMP can expose the bug.
2. Let umtx_wake return the number of threads that have been woken.
David Xu 2004-12-24 11:30:55 +00:00
parent 8ba85e7741
commit a08c214a72
Notes: svn2git 2020-12-20 02:59:44 +00:00
svn path=/head/; revision=139257
2 changed files with 107 additions and 137 deletions
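
The fix serializes the lock and unlock paths with a per-chain busy flag (UCF_BUSY/UCF_WANT in the diff below): unlock marks the chain busy while it counts waiters, performs the userland CAS, and signals, so lockers cannot insert or remove queue entries in between. A minimal userspace sketch of the same flag-under-a-mutex protocol, with pthreads standing in for the kernel's msleep()/wakeup(); all names here are hypothetical, not part of the commit:

#include <pthread.h>

/* Userspace analogue of one umtxq chain's busy flag. */
struct chain {
	pthread_mutex_t lock;	/* plays the role of uc_lock */
	pthread_cond_t	cv;	/* stands in for msleep()/wakeup() */
	int		busy;	/* UCF_BUSY */
	int		want;	/* UCF_WANT */
};

/* Called with c->lock held, like umtxq_busy(). */
static void
chain_busy(struct chain *c)
{
	while (c->busy) {		/* re-check after every wakeup */
		c->want = 1;
		pthread_cond_wait(&c->cv, &c->lock);
	}
	c->busy = 1;
}

/* Called with c->lock held, like umtxq_unbusy(). */
static void
chain_unbusy(struct chain *c)
{
	c->busy = 0;
	if (c->want) {
		c->want = 0;
		pthread_cond_broadcast(&c->cv);
	}
}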

sys/kern/kern_umtx.c
@@ -74,7 +74,7 @@ struct umtx_key {
struct umtx_q {
LIST_ENTRY(umtx_q) uq_next; /* Linked list for the hash. */
struct umtx_key uq_key; /* Umtx key. */
struct thread *uq_thread; /* The thread waits on */
struct thread *uq_thread; /* The thread waits on. */
LIST_ENTRY(umtx_q) uq_rqnext; /* Linked list for requeuing. */
vm_offset_t uq_addr; /* Umtx's virtual address. */
};
@@ -83,6 +83,9 @@ LIST_HEAD(umtx_head, umtx_q);
struct umtxq_chain {
struct mtx uc_lock; /* Lock for this chain. */
struct umtx_head uc_queue; /* List of sleep queues. */
#define UCF_BUSY 0x01
#define UCF_WANT 0x02
int uc_flags;
};
#define GOLDEN_RATIO_PRIME 2654404609U
@@ -99,13 +102,14 @@ static int umtxq_hash(struct umtx_key *key);
static struct mtx *umtxq_mtx(int chain);
static void umtxq_lock(struct umtx_key *key);
static void umtxq_unlock(struct umtx_key *key);
static void umtxq_busy(struct umtx_key *key);
static void umtxq_unbusy(struct umtx_key *key);
static void umtxq_insert(struct umtx_q *uq);
static void umtxq_remove(struct umtx_q *uq);
static int umtxq_sleep(struct thread *td, struct umtx_key *key,
int prio, const char *wmesg, int timo);
static int umtxq_count(struct umtx_key *key);
static void umtxq_signal(struct umtx_key *key);
static void umtxq_broadcast(struct umtx_key *key);
static int umtxq_count(struct umtx_key *key);
static int umtxq_signal(struct umtx_key *key, int nr_wakeup);
#ifdef UMTX_DYNAMIC_SHARED
static void fork_handler(void *arg, struct proc *p1, struct proc *p2,
int flags);
@@ -126,6 +130,7 @@ umtxq_init_chains(void *arg __unused)
mtx_init(&umtxq_chains[i].uc_lock, "umtxq_lock", NULL,
MTX_DEF | MTX_DUPOK);
LIST_INIT(&umtxq_chains[i].uc_queue);
umtxq_chains[i].uc_flags = 0;
}
#ifdef UMTX_DYNAMIC_SHARED
EVENTHANDLER_REGISTER(process_fork, fork_handler, 0, 10000);
@@ -153,6 +158,34 @@ umtxq_mtx(int chain)
return (&umtxq_chains[chain].uc_lock);
}
static inline void
umtxq_busy(struct umtx_key *key)
{
int chain = umtxq_hash(key);
mtx_assert(umtxq_mtx(chain), MA_OWNED);
while (umtxq_chains[chain].uc_flags & UCF_BUSY) {
umtxq_chains[chain].uc_flags |= UCF_WANT;
msleep(&umtxq_chains[chain], umtxq_mtx(chain),
curthread->td_priority, "umtxq_busy", 0);
}
umtxq_chains[chain].uc_flags |= UCF_BUSY;
}
static inline void
umtxq_unbusy(struct umtx_key *key)
{
int chain = umtxq_hash(key);
mtx_assert(umtxq_mtx(chain), MA_OWNED);
KASSERT(umtxq_chains[chain].uc_flags & UCF_BUSY, ("not busy"));
umtxq_chains[chain].uc_flags &= ~UCF_BUSY;
if (umtxq_chains[chain].uc_flags & UCF_WANT) {
umtxq_chains[chain].uc_flags &= ~UCF_WANT;
wakeup(&umtxq_chains[chain]);
}
}
static inline void
umtxq_lock(struct umtx_key *key)
{
@@ -176,6 +209,7 @@ umtxq_insert(struct umtx_q *uq)
struct umtx_head *head;
int chain = umtxq_hash(&uq->uq_key);
mtx_assert(umtxq_mtx(chain), MA_OWNED);
head = &umtxq_chains[chain].uc_queue;
LIST_INSERT_HEAD(head, uq, uq_next);
uq->uq_thread->td_umtxq = uq;
@@ -190,6 +224,7 @@ umtxq_insert(struct umtx_q *uq)
static inline void
umtxq_remove(struct umtx_q *uq)
{
mtx_assert(umtxq_mtx(umtxq_hash(&uq->uq_key)), MA_OWNED);
if (uq->uq_thread->td_flags & TDF_UMTXQ) {
LIST_REMOVE(uq, uq_next);
uq->uq_thread->td_umtxq = NULL;
@@ -208,7 +243,7 @@ umtxq_count(struct umtx_key *key)
int chain, count = 0;
chain = umtxq_hash(key);
umtxq_lock(key);
mtx_assert(umtxq_mtx(chain), MA_OWNED);
head = &umtxq_chains[chain].uc_queue;
LIST_FOREACH(uq, head, uq_next) {
if (umtx_key_match(&uq->uq_key, key)) {
@@ -216,65 +251,41 @@ umtxq_count(struct umtx_key *key)
break;
}
}
umtxq_unlock(key);
return (count);
}
static void
umtxq_signal(struct umtx_key *key)
{
struct umtx_q *uq;
struct umtx_head *head;
struct thread *blocked = NULL;
int chain;
chain = umtxq_hash(key);
umtxq_lock(key);
head = &umtxq_chains[chain].uc_queue;
LIST_FOREACH(uq, head, uq_next) {
if (umtx_key_match(&uq->uq_key, key)) {
blocked = uq->uq_thread;
umtxq_remove(uq);
break;
}
}
umtxq_unlock(key);
if (blocked != NULL)
wakeup(blocked);
}
static void
umtxq_broadcast(struct umtx_key *key)
static int
umtxq_signal(struct umtx_key *key, int n_wake)
{
struct umtx_q *uq, *next;
struct umtx_head *head;
struct thread *blocked;
int chain;
struct thread *blocked = NULL;
int chain, ret;
ret = 0;
chain = umtxq_hash(key);
umtxq_lock(key);
mtx_assert(umtxq_mtx(chain), MA_OWNED);
head = &umtxq_chains[chain].uc_queue;
for (uq = LIST_FIRST(head); uq != NULL; uq = next) {
for (uq = LIST_FIRST(head); uq; uq = next) {
next = LIST_NEXT(uq, uq_next);
if (umtx_key_match(&uq->uq_key, key)) {
blocked = uq->uq_thread;
umtxq_remove(uq);
wakeup(blocked);
if (++ret >= n_wake)
break;
}
uq = next;
}
umtxq_unlock(key);
return (ret);
}
static inline int
umtxq_sleep(struct thread *td, struct umtx_key *key, int priority,
const char *wmesg, int timo)
{
int error;
int chain = umtxq_hash(key);
error = msleep(td, umtxq_mtx(chain), priority, wmesg, timo);
return (error);
return (msleep(td, umtxq_mtx(chain), priority, wmesg, timo));
}
static int
@@ -344,7 +355,10 @@ umtxq_queue_me(struct thread *td, struct umtx *umtx, struct umtx_q *uq)
uq->uq_addr = (vm_offset_t)umtx;
uq->uq_thread = td;
umtxq_lock(&uq->uq_key);
/* hmm, for a condition variable, we don't need the busy flag. */
umtxq_busy(&uq->uq_key);
umtxq_insert(uq);
umtxq_unbusy(&uq->uq_key);
umtxq_unlock(&uq->uq_key);
return (0);
}
@@ -394,17 +408,21 @@ fork_handler(void *arg, struct proc *p1, struct proc *p2, int flags)
}
umtxq_lock(&uq->uq_key);
umtxq_busy(&uq->uq_key);
if (uq->uq_thread->td_flags & TDF_UMTXQ) {
umtxq_remove(uq);
onq = 1;
} else
onq = 0;
umtxq_unbusy(&uq->uq_key);
umtxq_unlock(&uq->uq_key);
if (onq) {
vm_object_deallocate(uq->uq_key.info.shared.object);
uq->uq_key = key;
umtxq_lock(&uq->uq_key);
umtxq_busy(&uq->uq_key);
umtxq_insert(uq);
umtxq_unbusy(&uq->uq_key);
umtxq_unlock(&uq->uq_key);
vm_object_reference(uq->uq_key.info.shared.object);
}
@@ -476,7 +494,9 @@ _do_lock(struct thread *td, struct umtx *umtx, long id, int timo)
/* The address was invalid. */
if (old == -1) {
umtxq_lock(&uq.uq_key);
umtxq_busy(&uq.uq_key);
umtxq_remove(&uq);
umtxq_unbusy(&uq.uq_key);
umtxq_unlock(&uq.uq_key);
umtx_key_release(&uq.uq_key);
return (EFAULT);
@@ -490,18 +510,13 @@ _do_lock(struct thread *td, struct umtx *umtx, long id, int timo)
umtxq_lock(&uq.uq_key);
if (old == owner && (td->td_flags & TDF_UMTXQ)) {
error = umtxq_sleep(td, &uq.uq_key,
td->td_priority | PCATCH | PDROP,
td->td_priority | PCATCH,
"umtx", timo);
if (td->td_flags & TDF_UMTXQ) {
umtxq_lock(&uq.uq_key);
umtxq_remove(&uq);
umtxq_unlock(&uq.uq_key);
}
} else {
umtxq_remove(&uq);
umtxq_unlock(&uq.uq_key);
error = 0;
}
umtxq_busy(&uq.uq_key);
umtxq_remove(&uq);
umtxq_unbusy(&uq.uq_key);
umtxq_unlock(&uq.uq_key);
umtx_key_release(&uq.uq_key);
}
@@ -546,7 +561,8 @@ do_unlock(struct thread *td, struct umtx *umtx, long id)
struct umtx_key key;
intptr_t owner;
intptr_t old;
int count, error;
int error;
int count;
/*
* Make sure we own this mtx.
@@ -564,65 +580,30 @@ do_unlock(struct thread *td, struct umtx *umtx, long id)
if ((owner & UMTX_CONTESTED) == 0)
return (EINVAL);
if ((error = umtx_key_get(td, umtx, &key)) != 0)
return (error);
umtxq_lock(&key);
umtxq_busy(&key);
count = umtxq_count(&key);
umtxq_unlock(&key);
/*
* When unlocking the umtx, it must be marked as unowned if
* zero or one thread is waiting for it.
* Otherwise, it must be marked as contested.
*/
old = casuptr((intptr_t *)&umtx->u_owner, owner, UMTX_UNOWNED);
old = casuptr((intptr_t *)&umtx->u_owner, owner,
count <= 1 ? UMTX_UNOWNED : UMTX_CONTESTED);
umtxq_lock(&key);
umtxq_signal(&key, 0);
umtxq_unbusy(&key);
umtxq_unlock(&key);
umtx_key_release(&key);
if (old == -1)
return (EFAULT);
if (old != owner)
return (EINVAL);
if ((error = umtx_key_get(td, umtx, &key)) != 0)
return (error);
/*
* At this point, a new thread can lock the umtx before we
* reach here, so the contested bit will not be set; if there
* are two or more threads on the wait queue, we should set
* the contested bit for them.
*/
count = umtxq_count(&key);
if (count <= 0) {
umtx_key_release(&key);
return (0);
}
/*
* If there is a second thread waiting on the umtx, set the
* contested bit; if they are resumed before we reach here,
* it is harmless, just a bit inefficient.
*/
if (count > 1) {
owner = UMTX_UNOWNED;
for (;;) {
old = casuptr((intptr_t *)&umtx->u_owner, owner,
owner | UMTX_CONTESTED);
if (old == owner)
break;
if (old == -1) {
umtx_key_release(&key);
return (EFAULT);
}
owner = old;
}
/*
* Another thread locked the umtx before us, so don't bother
* to wake more threads; that thread will do it when it unlocks
* the umtx.
*/
if ((owner & ~UMTX_CONTESTED) != 0) {
umtx_key_release(&key);
return (0);
}
}
/* Wake blocked thread. */
umtxq_signal(&key);
umtx_key_release(&key);
return (0);
}
@@ -678,64 +659,54 @@ do_unlock_and_wait(struct thread *td, struct umtx *umtx, long id, void *uaddr,
if (td->td_flags & TDF_UMTXQ)
error = umtxq_sleep(td, &uq.uq_key,
td->td_priority | PCATCH, "ucond", 0);
umtxq_remove(&uq);
if (!(td->td_flags & TDF_UMTXQ))
error = 0;
else
umtxq_remove(&uq);
umtxq_unlock(&uq.uq_key);
if (error == ERESTART)
error = EINTR;
} else {
for (;;) {
ts1 = *abstime;
getnanotime(&ts2);
timespecsub(&ts1, &ts2);
TIMESPEC_TO_TIMEVAL(&tv, &ts1);
umtxq_lock(&uq.uq_key);
if (tv.tv_sec < 0) {
error = EWOULDBLOCK;
break;
}
timo = tvtohz(&tv);
umtxq_lock(&uq.uq_key);
if (td->td_flags & TDF_UMTXQ) {
if (td->td_flags & TDF_UMTXQ)
error = umtxq_sleep(td, &uq.uq_key,
td->td_priority | PCATCH | PDROP,
td->td_priority | PCATCH,
"ucond", timo);
if (!(td->td_flags & TDF_UMTXQ)) {
error = 0;
break;
}
if (error != 0 && error != EWOULDBLOCK) {
if (error == ERESTART)
error = EINTR;
break;
}
} else {
umtxq_unlock(&uq.uq_key);
error = 0;
if (!(td->td_flags & TDF_UMTXQ))
break;
}
}
if (td->td_flags & TDF_UMTXQ) {
umtxq_lock(&uq.uq_key);
umtxq_remove(&uq);
umtxq_unlock(&uq.uq_key);
}
if (!(td->td_flags & TDF_UMTXQ))
error = 0;
else
umtxq_remove(&uq);
umtxq_unlock(&uq.uq_key);
}
umtx_key_release(&uq.uq_key);
if (error == ERESTART)
error = EINTR;
return (error);
}
static int
do_wake(struct thread *td, void *uaddr, int broadcast)
do_wake(struct thread *td, void *uaddr, int n_wake)
{
struct umtx_key key;
int error;
int ret;
if ((error = umtx_key_get(td, uaddr, &key)) != 0)
return (error);
if (!broadcast)
umtxq_signal(&key);
else
umtxq_broadcast(&key);
umtx_key_release(&key);
if ((ret = umtx_key_get(td, uaddr, &key)) != 0)
return (ret);
ret = umtxq_signal(&key, n_wake);
umtx_key_release(&key);
td->td_retval[0] = ret;
return (0);
}
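
The rewritten umtxq_signal() above unlinks at most n_wake matching waiters and returns how many it woke, which is what do_wake() now hands back to userland. The save-next-before-remove traversal it depends on works standalone; a userspace sketch using <sys/queue.h>, with hypothetical names and the kernel's wakeup() reduced to a comment:

#include <sys/queue.h>
#include <stddef.h>

struct waiter {
	int key;			/* stands in for umtx_key_match() */
	LIST_ENTRY(waiter) link;
};
LIST_HEAD(waitq, waiter);

/*
 * Remove up to n_wake matching entries and return the count. The next
 * pointer is saved before LIST_REMOVE() so unlinking the current entry
 * cannot break the traversal. Note that, as in the kernel code, n_wake
 * == 0 still removes one entry: the ++ret >= n_wake check runs only
 * after the first removal.
 */
static int
waitq_signal(struct waitq *q, int key, int n_wake)
{
	struct waiter *w, *next;
	int ret = 0;

	for (w = LIST_FIRST(q); w != NULL; w = next) {
		next = LIST_NEXT(w, link);
		if (w->key == key) {
			LIST_REMOVE(w, link);
			/* the kernel would wakeup() the thread here */
			if (++ret >= n_wake)
				break;
		}
	}
	return (ret);
}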

sys/sys/umtx.h
@@ -122,11 +122,10 @@ umtx_timedwait(struct umtx *umtx, long id, void *uaddr,
/* Wake threads waiting on a user address. */
static __inline int
umtx_wake(void *uaddr, int broadcast)
umtx_wake(void *uaddr, int nr_wakeup)
{
if (_umtx_op(0, UMTX_OP_WAKE, broadcast, uaddr, 0) == -1)
return (errno);
return (0);
/* return how many threads were woken up, -1 if error */
return _umtx_op(0, UMTX_OP_WAKE, nr_wakeup, uaddr, 0);
}
#endif /* !_KERNEL */
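
With the count coming back from the kernel, a caller can observe how many waiters were actually resumed, and a broadcast becomes just a large count rather than a separate flag. A hedged usage sketch, assuming the post-commit wrapper above; the helper names are hypothetical:

#include <sys/types.h>
#include <sys/umtx.h>	/* the post-commit header shown above */
#include <limits.h>
#include <stdio.h>

/* Wake at most one waiter and report what happened. */
static void
wake_one(void *uaddr)
{
	int woken = umtx_wake(uaddr, 1);

	if (woken == -1)
		perror("umtx_wake");
	else
		printf("resumed %d thread(s)\n", woken);
}

/* Approximate a broadcast by passing a very large count. */
static void
wake_all(void *uaddr)
{
	(void)umtx_wake(uaddr, INT_MAX);
}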