Simplify the adaptive spinning algorithm in rwlock and mutex:

currently, before to spin the turnstile spinlock is acquired and the
waiters flag is set.
This is not strictly necessary, so just spin before to acquire the
spinlock and to set the flags.
This will simplify a lot other functions too, as now we have the waiters
flag set only if there are actually waiters.
This should make wakeup/sleeping couplet faster under intensive mutex
workload.
This also fixes a bug in rw_try_upgrade() in the adaptive case, where
turnstile_lookup() will recurse on the ts_lock lock that will never be
really released [1].

[1] Reported by: jeff with Nokia help
Tested by: pho, kris (earlier, bugged version of rwlock part)
Discussed with: jhb [2], jeff
MFC after: 1 week

[2] John had a similar patch about 6.x and/or 7.x about mutexes probabilly
This commit is contained in:
attilio 2007-11-26 22:37:35 +00:00
parent 315c303950
commit 6fcea2407c
2 changed files with 114 additions and 142 deletions

View File

@ -335,6 +335,31 @@ _mtx_lock_sleep(struct mtx *m, uintptr_t tid, int opts, const char *file,
m->lock_object.lo_name, (void *)m->mtx_lock, file, line);
while (!_obtain_lock(m, tid)) {
#ifdef ADAPTIVE_MUTEXES
/*
* If the owner is running on another CPU, spin until the
* owner stops running or the state of the lock changes.
*/
v = m->mtx_lock;
if (v != MTX_UNOWNED) {
owner = (struct thread *)(v & ~MTX_FLAGMASK);
#ifdef ADAPTIVE_GIANT
if (TD_IS_RUNNING(owner)) {
#else
if (m != &Giant && TD_IS_RUNNING(owner)) {
#endif
if (LOCK_LOG_TEST(&m->lock_object, 0))
CTR3(KTR_LOCK,
"%s: spinning on %p held by %p",
__func__, m, owner);
while (mtx_owner(m) == owner &&
TD_IS_RUNNING(owner))
cpu_spinwait();
continue;
}
}
#endif
ts = turnstile_trywait(&m->lock_object);
v = m->mtx_lock;
@ -350,6 +375,23 @@ _mtx_lock_sleep(struct mtx *m, uintptr_t tid, int opts, const char *file,
MPASS(v != MTX_CONTESTED);
#ifdef ADAPTIVE_MUTEXES
/*
* If the current owner of the lock is executing on another
* CPU quit the hard path and try to spin.
*/
owner = (struct thread *)(v & ~MTX_FLAGMASK);
#ifdef ADAPTIVE_GIANT
if (TD_IS_RUNNING(owner)) {
#else
if (m != &Giant && TD_IS_RUNNING(owner)) {
#endif
turnstile_cancel(ts);
cpu_spinwait();
continue;
}
#endif
/*
* If the mutex isn't already contested and a failure occurs
* setting the contested bit, the mutex was either released
@ -362,26 +404,6 @@ _mtx_lock_sleep(struct mtx *m, uintptr_t tid, int opts, const char *file,
continue;
}
#ifdef ADAPTIVE_MUTEXES
/*
* If the current owner of the lock is executing on another
* CPU, spin instead of blocking.
*/
owner = (struct thread *)(v & ~MTX_FLAGMASK);
#ifdef ADAPTIVE_GIANT
if (TD_IS_RUNNING(owner))
#else
if (m != &Giant && TD_IS_RUNNING(owner))
#endif
{
turnstile_cancel(ts);
while (mtx_owner(m) == owner && TD_IS_RUNNING(owner)) {
cpu_spinwait();
}
continue;
}
#endif /* ADAPTIVE_MUTEXES */
/*
* We definitely must sleep for this lock.
*/
@ -589,17 +611,7 @@ _mtx_unlock_sleep(struct mtx *m, int opts, const char *file, int line)
if (LOCK_LOG_TEST(&m->lock_object, opts))
CTR1(KTR_LOCK, "_mtx_unlock_sleep: %p contested", m);
#ifdef ADAPTIVE_MUTEXES
if (ts == NULL) {
_release_lock_quick(m);
if (LOCK_LOG_TEST(&m->lock_object, opts))
CTR1(KTR_LOCK, "_mtx_unlock_sleep: %p no sleepers", m);
turnstile_chain_unlock(&m->lock_object);
return;
}
#else
MPASS(ts != NULL);
#endif
turnstile_broadcast(ts, TS_EXCLUSIVE_QUEUE);
_release_lock_quick(m);
/*

View File

@ -290,6 +290,28 @@ _rw_rlock(struct rwlock *rw, const char *file, int line)
continue;
}
#ifdef ADAPTIVE_RWLOCKS
/*
* If the owner is running on another CPU, spin until
* the owner stops running or the state of the lock
* changes.
*/
owner = (struct thread *)RW_OWNER(x);
if (TD_IS_RUNNING(owner)) {
if (LOCK_LOG_TEST(&rw->lock_object, 0))
CTR3(KTR_LOCK, "%s: spinning on %p held by %p",
__func__, rw, owner);
#ifdef LOCK_PROFILING_SHARED
lock_profile_obtain_lock_failed(&rw->lock_object,
&contested, &waittime);
#endif
while ((struct thread*)RW_OWNER(rw->rw_lock) == owner &&
TD_IS_RUNNING(owner))
cpu_spinwait();
continue;
}
#endif
/*
* Okay, now it's the hard case. Some other thread already
* has a write lock, so acquire the turnstile lock so we can
@ -309,6 +331,19 @@ _rw_rlock(struct rwlock *rw, const char *file, int line)
continue;
}
#ifdef ADAPTIVE_RWLOCKS
/*
* If the current owner of the lock is executing on another
* CPU quit the hard path and try to spin.
*/
owner = (struct thread *)RW_OWNER(x);
if (TD_IS_RUNNING(owner)) {
turnstile_cancel(ts);
cpu_spinwait();
continue;
}
#endif
/*
* Ok, it's still a write lock. If the RW_LOCK_READ_WAITERS
* flag is already set, then we can go ahead and block. If
@ -327,29 +362,6 @@ _rw_rlock(struct rwlock *rw, const char *file, int line)
__func__, rw);
}
#ifdef ADAPTIVE_RWLOCKS
/*
* If the owner is running on another CPU, spin until
* the owner stops running or the state of the lock
* changes.
*/
owner = (struct thread *)RW_OWNER(x);
if (TD_IS_RUNNING(owner)) {
turnstile_cancel(ts);
if (LOCK_LOG_TEST(&rw->lock_object, 0))
CTR3(KTR_LOCK, "%s: spinning on %p held by %p",
__func__, rw, owner);
#ifdef LOCK_PROFILING_SHARED
lock_profile_obtain_lock_failed(&rw->lock_object,
&contested, &waittime);
#endif
while ((struct thread*)RW_OWNER(rw->rw_lock)== owner &&
TD_IS_RUNNING(owner))
cpu_spinwait();
continue;
}
#endif
/*
* We were unable to acquire the lock and the read waiters
* flag is set, so we must block on the turnstile.
@ -532,6 +544,27 @@ _rw_wlock_hard(struct rwlock *rw, uintptr_t tid, const char *file, int line)
rw->lock_object.lo_name, (void *)rw->rw_lock, file, line);
while (!_rw_write_lock(rw, tid)) {
#ifdef ADAPTIVE_RWLOCKS
/*
* If the lock is write locked and the owner is
* running on another CPU, spin until the owner stops
* running or the state of the lock changes.
*/
v = rw->rw_lock;
owner = (struct thread *)RW_OWNER(v);
if (!(v & RW_LOCK_READ) && TD_IS_RUNNING(owner)) {
if (LOCK_LOG_TEST(&rw->lock_object, 0))
CTR3(KTR_LOCK, "%s: spinning on %p held by %p",
__func__, rw, owner);
lock_profile_obtain_lock_failed(&rw->lock_object,
&contested, &waittime);
while ((struct thread*)RW_OWNER(rw->rw_lock) == owner &&
TD_IS_RUNNING(owner))
cpu_spinwait();
continue;
}
#endif
ts = turnstile_trywait(&rw->lock_object);
v = rw->rw_lock;
@ -545,6 +578,21 @@ _rw_wlock_hard(struct rwlock *rw, uintptr_t tid, const char *file, int line)
continue;
}
#ifdef ADAPTIVE_RWLOCKS
/*
* If the current owner of the lock is executing on another
* CPU quit the hard path and try to spin.
*/
if (!(v & RW_LOCK_READ)) {
owner = (struct thread *)RW_OWNER(v);
if (TD_IS_RUNNING(owner)) {
turnstile_cancel(ts);
cpu_spinwait();
continue;
}
}
#endif
/*
* If the lock was released by a writer with both readers
* and writers waiting and a reader hasn't woken up and
@ -586,27 +634,6 @@ _rw_wlock_hard(struct rwlock *rw, uintptr_t tid, const char *file, int line)
__func__, rw);
}
#ifdef ADAPTIVE_RWLOCKS
/*
* If the lock is write locked and the owner is
* running on another CPU, spin until the owner stops
* running or the state of the lock changes.
*/
owner = (struct thread *)RW_OWNER(v);
if (!(v & RW_LOCK_READ) && TD_IS_RUNNING(owner)) {
turnstile_cancel(ts);
if (LOCK_LOG_TEST(&rw->lock_object, 0))
CTR3(KTR_LOCK, "%s: spinning on %p held by %p",
__func__, rw, owner);
lock_profile_obtain_lock_failed(&rw->lock_object,
&contested, &waittime);
while ((struct thread*)RW_OWNER(rw->rw_lock)== owner &&
TD_IS_RUNNING(owner))
cpu_spinwait();
continue;
}
#endif
/*
* We were unable to acquire the lock and the write waiters
* flag is set, so we must block on the turnstile.
@ -654,22 +681,7 @@ _rw_wunlock_hard(struct rwlock *rw, uintptr_t tid, const char *file, int line)
turnstile_chain_lock(&rw->lock_object);
ts = turnstile_lookup(&rw->lock_object);
#ifdef ADAPTIVE_RWLOCKS
/*
* There might not be a turnstile for this lock if all of
* the waiters are adaptively spinning. In that case, just
* reset the lock to the unlocked state and return.
*/
if (ts == NULL) {
atomic_store_rel_ptr(&rw->rw_lock, RW_UNLOCKED);
if (LOCK_LOG_TEST(&rw->lock_object, 0))
CTR2(KTR_LOCK, "%s: %p no sleepers", __func__, rw);
turnstile_chain_unlock(&rw->lock_object);
return;
}
#else
MPASS(ts != NULL);
#endif
/*
* Use the same algo as sx locks for now. Prefer waking up shared
@ -686,42 +698,14 @@ _rw_wunlock_hard(struct rwlock *rw, uintptr_t tid, const char *file, int line)
* above. There is probably a potential priority inversion in
* there that could be worked around either by waking both queues
* of waiters or doing some complicated lock handoff gymnastics.
*
* Note that in the ADAPTIVE_RWLOCKS case, if both flags are
* set, there might not be any actual writers on the turnstile
* as they might all be spinning. In that case, we don't want
* to preserve the RW_LOCK_WRITE_WAITERS flag as the turnstile
* is going to go away once we wakeup all the readers.
*/
v = RW_UNLOCKED;
if (rw->rw_lock & RW_LOCK_READ_WAITERS) {
queue = TS_SHARED_QUEUE;
#ifdef ADAPTIVE_RWLOCKS
if (rw->rw_lock & RW_LOCK_WRITE_WAITERS &&
!turnstile_empty(ts, TS_EXCLUSIVE_QUEUE))
v |= RW_LOCK_WRITE_WAITERS;
#else
v |= (rw->rw_lock & RW_LOCK_WRITE_WAITERS);
#endif
} else
queue = TS_EXCLUSIVE_QUEUE;
#ifdef ADAPTIVE_RWLOCKS
/*
* We have to make sure that we actually have waiters to
* wakeup. If they are all spinning, then we just need to
* disown the turnstile and return.
*/
if (turnstile_empty(ts, queue)) {
if (LOCK_LOG_TEST(&rw->lock_object, 0))
CTR2(KTR_LOCK, "%s: %p no sleepers 2", __func__, rw);
atomic_store_rel_ptr(&rw->rw_lock, v);
turnstile_disown(ts);
turnstile_chain_unlock(&rw->lock_object);
return;
}
#endif
/* Wake up all waiters for the specific queue. */
if (LOCK_LOG_TEST(&rw->lock_object, 0))
CTR3(KTR_LOCK, "%s: %p waking up %s waiters", __func__, rw,
@ -772,19 +756,12 @@ _rw_try_upgrade(struct rwlock *rw, const char *file, int line)
* Try to switch from one reader to a writer again. This time
* we honor the current state of the RW_LOCK_WRITE_WAITERS
* flag. If we obtain the lock with the flag set, then claim
* ownership of the turnstile. In the ADAPTIVE_RWLOCKS case
* it is possible for there to not be an associated turnstile
* even though there are waiters if all of the waiters are
* spinning.
* ownership of the turnstile.
*/
v = rw->rw_lock & RW_LOCK_WRITE_WAITERS;
success = atomic_cmpset_ptr(&rw->rw_lock, RW_READERS_LOCK(1) | v,
tid | v);
#ifdef ADAPTIVE_RWLOCKS
if (success && v && turnstile_lookup(&rw->lock_object) != NULL)
#else
if (success && v)
#endif
turnstile_claim(ts);
else
turnstile_cancel(ts);
@ -837,26 +814,9 @@ _rw_downgrade(struct rwlock *rw, const char *file, int line)
* Downgrade from a write lock while preserving
* RW_LOCK_WRITE_WAITERS and give up ownership of the
* turnstile. If there are any read waiters, wake them up.
*
* For ADAPTIVE_RWLOCKS, we have to allow for the fact that
* all of the read waiters might be spinning. In that case,
* act as if RW_LOCK_READ_WAITERS is not set. Also, only
* preserve the RW_LOCK_WRITE_WAITERS flag if at least one
* writer is blocked on the turnstile.
*/
ts = turnstile_lookup(&rw->lock_object);
#ifdef ADAPTIVE_RWLOCKS
if (ts == NULL)
v &= ~(RW_LOCK_READ_WAITERS | RW_LOCK_WRITE_WAITERS);
else if (v & RW_LOCK_READ_WAITERS &&
turnstile_empty(ts, TS_SHARED_QUEUE))
v &= ~RW_LOCK_READ_WAITERS;
else if (v & RW_LOCK_WRITE_WAITERS &&
turnstile_empty(ts, TS_EXCLUSIVE_QUEUE))
v &= ~RW_LOCK_WRITE_WAITERS;
#else
MPASS(ts != NULL);
#endif
if (v & RW_LOCK_READ_WAITERS)
turnstile_broadcast(ts, TS_SHARED_QUEUE);
atomic_store_rel_ptr(&rw->rw_lock, RW_READERS_LOCK(1) |