rw: decrease writer starvation

Writers waiting for readers to drain can set the RW_LOCK_WRITE_SPINNER
bit, which keeps most new readers from taking the lock. However, the last
reader to unlock also clears the bit, which means new readers can sneak
in and the cycle starts over.

Change the code to keep the bit set after the last reader unlocks.
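
For illustration only, here is a minimal sketch of the resulting unlock fast
path (the committed code is __rw_runlock_try in the diff below; the function
name here is made up):

	/*
	 * Sketch of the unlock fast path after this change. The fcmpset only
	 * subtracts one reader, so RW_LOCK_WRITE_SPINNER survives even when
	 * the last reader drops the lock; only blocked waiters force the
	 * slow path.
	 */
	static bool
	runlock_fast_sketch(struct rwlock *rw, uintptr_t *vp)
	{

		for (;;) {
			/* Blocked waiters still need the slow path. */
			if (RW_READERS(*vp) <= 1 && (*vp & RW_LOCK_WAITERS) != 0)
				return (false);
			/* Drop one reader; a spinner bit, if any, is left intact. */
			if (atomic_fcmpset_rel_ptr(&rw->rw_lock, vp,
			    *vp - RW_ONE_READER))
				return (true);
		}
	}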

Note that the starvation potential is still there: no matter how many write
spinners there are, there is only one bit. After the writer unlocks, the lock
is free to get raided by readers again. This is good enough for the time
being.

The real fix would include counting writers.

This runs into a caveat: the writer which set the bit may now be preempted.
To get rid of the problem, all attempts to set the bit are preceded by
critical_enter.

The bit gets cleared when the thread which set it goes to sleep. This
maintains the invariant that if the bit is set, someone is actively spinning
and will grab the lock soon. In particular, readers which find the lock in
this transient state can safely spin until the lock gets an owner (i.e. they
don't need to block, nor guess how long to spin).
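
Roughly, the writer side now follows the pattern sketched below. The helpers
marked hypothetical are placeholders for the logic in __rw_wlock_hard shown
in the diff, not real kernel functions:

	/*
	 * Sketch of the writer spinning protocol described above. The bit is
	 * only ever set inside a critical section, and the section (and, on
	 * the blocking path, the bit itself) is given up before the thread
	 * can sleep, so a set bit always implies an active spinner.
	 */
	static void
	write_spinner_sketch(struct rwlock *rw, uintptr_t v)
	{

		critical_enter();		/* no preemption while the bit is ours */
		if (!atomic_fcmpset_ptr(&rw->rw_lock, &v,
		    v | RW_LOCK_WRITE_SPINNER)) {
			critical_exit();	/* lost the race; caller retries */
			return;
		}
		spin_while_readers_drain(rw, &v);	/* hypothetical spin loop */
		if (!got_the_lock(v)) {			/* hypothetical: readers did not drain */
			/*
			 * Hand off: turn the spinner bit into a blocked-waiter
			 * bit (failure handling elided in this sketch).
			 */
			atomic_fcmpset_ptr(&rw->rw_lock, &v,
			    (v & ~RW_LOCK_WRITE_SPINNER) | RW_LOCK_WRITE_WAITERS);
			critical_exit();	/* bit no longer implies a spinner; ok to sleep */
			block_on_turnstile(rw);	/* hypothetical */
			return;
		}
		critical_exit();
	}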

Tested by:	pho
Mateusz Guzik 2018-05-22 07:16:39 +00:00
parent c76af09019
commit 9feec7ef69
Notes: svn2git 2020-12-20 02:59:44 +00:00
svn path=/head/; revision=334023


@@ -510,25 +510,38 @@ __rw_rlock_hard(struct rwlock *rw, struct thread *td, uintptr_t v
sched_tdname(curthread), "running");
continue;
}
} else if (spintries < rowner_retries) {
spintries++;
KTR_STATE1(KTR_SCHED, "thread", sched_tdname(curthread),
"spinning", "lockname:\"%s\"",
rw->lock_object.lo_name);
for (i = 0; i < rowner_loops; i += n) {
n = RW_READERS(v);
lock_delay_spin(n);
} else {
if ((v & RW_LOCK_WRITE_SPINNER) && RW_READERS(v) == 0) {
MPASS(!__rw_can_read(td, v, false));
lock_delay_spin(2);
v = RW_READ_VALUE(rw);
if ((v & RW_LOCK_READ) == 0 || __rw_can_read(td, v, false))
break;
}
#ifdef KDTRACE_HOOKS
lda.spin_cnt += rowner_loops - i;
#endif
KTR_STATE0(KTR_SCHED, "thread", sched_tdname(curthread),
"running");
if (i < rowner_loops)
continue;
}
if (spintries < rowner_retries) {
spintries++;
KTR_STATE1(KTR_SCHED, "thread", sched_tdname(curthread),
"spinning", "lockname:\"%s\"",
rw->lock_object.lo_name);
n = RW_READERS(v);
for (i = 0; i < rowner_loops; i += n) {
lock_delay_spin(n);
v = RW_READ_VALUE(rw);
if (!(v & RW_LOCK_READ))
break;
n = RW_READERS(v);
if (n == 0)
break;
if (__rw_can_read(td, v, false))
break;
}
#ifdef KDTRACE_HOOKS
lda.spin_cnt += rowner_loops - i;
#endif
KTR_STATE0(KTR_SCHED, "thread", sched_tdname(curthread),
"running");
if (i < rowner_loops)
continue;
}
}
#endif
@@ -546,7 +559,8 @@ __rw_rlock_hard(struct rwlock *rw, struct thread *td, uintptr_t v
*/
v = RW_READ_VALUE(rw);
retry_ts:
if (__rw_can_read(td, v, false)) {
if (((v & RW_LOCK_WRITE_SPINNER) && RW_READERS(v) == 0) ||
__rw_can_read(td, v, false)) {
turnstile_cancel(ts);
continue;
}
@@ -726,11 +740,7 @@ __rw_runlock_try(struct rwlock *rw, struct thread *td, uintptr_t *vp)
{
for (;;) {
/*
* See if there is more than one read lock held. If so,
* just drop one and return.
*/
if (RW_READERS(*vp) > 1) {
if (RW_READERS(*vp) > 1 || !(*vp & RW_LOCK_WAITERS)) {
if (atomic_fcmpset_rel_ptr(&rw->rw_lock, vp,
*vp - RW_ONE_READER)) {
if (LOCK_LOG_TEST(&rw->lock_object, 0))
@@ -743,23 +753,6 @@ __rw_runlock_try(struct rwlock *rw, struct thread *td, uintptr_t *vp)
}
continue;
}
/*
* If there aren't any waiters for a write lock, then try
* to drop it quickly.
*/
if (!(*vp & RW_LOCK_WAITERS)) {
MPASS((*vp & ~RW_LOCK_WRITE_SPINNER) ==
RW_READERS_LOCK(1));
if (atomic_fcmpset_rel_ptr(&rw->rw_lock, vp,
RW_UNLOCKED)) {
if (LOCK_LOG_TEST(&rw->lock_object, 0))
CTR2(KTR_LOCK, "%s: %p last succeeded",
__func__, rw);
td->td_rw_rlocks--;
return (true);
}
continue;
}
break;
}
return (false);
@@ -788,7 +781,6 @@ __rw_runlock_hard(struct rwlock *rw, struct thread *td, uintptr_t v
if (__rw_runlock_try(rw, td, &v))
break;
v &= (RW_LOCK_WAITERS | RW_LOCK_WRITE_SPINNER);
MPASS(v & RW_LOCK_WAITERS);
/*
@@ -813,7 +805,7 @@ __rw_runlock_hard(struct rwlock *rw, struct thread *td, uintptr_t v
queue = TS_EXCLUSIVE_QUEUE;
setv |= (v & RW_LOCK_READ_WAITERS);
}
v |= RW_READERS_LOCK(1);
setv |= (v & RW_LOCK_WRITE_SPINNER);
if (!atomic_fcmpset_rel_ptr(&rw->rw_lock, &v, setv))
continue;
if (LOCK_LOG_TEST(&rw->lock_object, 0))
@@ -872,6 +864,23 @@ _rw_runlock_cookie(volatile uintptr_t *c, const char *file, int line)
_rw_runlock_cookie_int(rw LOCK_FILE_LINE_ARG);
}
#ifdef ADAPTIVE_RWLOCKS
static inline void
rw_drop_critical(uintptr_t v, bool *in_critical, int *extra_work)
{
if (v & RW_LOCK_WRITE_SPINNER)
return;
if (*in_critical) {
critical_exit();
*in_critical = false;
(*extra_work)--;
}
}
#else
#define rw_drop_critical(v, in_critical, extra_work) do { } while (0)
#endif
/*
* This function is called when we are unable to obtain a write lock on the
* first try. This means that at least one other thread holds either a
@@ -888,8 +897,9 @@ __rw_wlock_hard(volatile uintptr_t *c, uintptr_t v LOCK_FILE_LINE_ARG_DEF)
int spintries = 0;
int i, n;
enum { READERS, WRITER } sleep_reason = READERS;
bool in_critical = false;
#endif
uintptr_t x;
uintptr_t setv;
#ifdef LOCK_PROFILING
uint64_t waittime = 0;
int contested = 0;
@@ -906,6 +916,7 @@ __rw_wlock_hard(volatile uintptr_t *c, uintptr_t v LOCK_FILE_LINE_ARG_DEF)
uintptr_t state;
int doing_lockprof = 0;
#endif
int extra_work = 0;
tid = (uintptr_t)curthread;
rw = rwlock2rw(c);
@@ -916,12 +927,14 @@ __rw_wlock_hard(volatile uintptr_t *c, uintptr_t v LOCK_FILE_LINE_ARG_DEF)
if (_rw_write_lock_fetch(rw, &v, tid))
goto out_lockstat;
}
extra_work = 1;
doing_lockprof = 1;
all_time -= lockstat_nsecs(&rw->lock_object);
state = v;
}
#endif
#ifdef LOCK_PROFILING
extra_work = 1;
doing_lockprof = 1;
state = v;
#endif
@@ -969,12 +982,19 @@ __rw_wlock_hard(volatile uintptr_t *c, uintptr_t v LOCK_FILE_LINE_ARG_DEF)
#endif
#ifdef ADAPTIVE_RWLOCKS
if (v == (RW_LOCK_READ | RW_LOCK_WRITE_SPINNER)) {
if (atomic_fcmpset_acq_ptr(&rw->rw_lock, &v, tid))
break;
continue;
}
/*
* If the lock is write locked and the owner is
* running on another CPU, spin until the owner stops
* running or the state of the lock changes.
*/
if (!(v & RW_LOCK_READ)) {
rw_drop_critical(v, &in_critical, &extra_work);
sleep_reason = WRITER;
owner = lv_rw_wowner(v);
if (!TD_IS_RUNNING(owner))
@@ -998,8 +1018,16 @@ __rw_wlock_hard(volatile uintptr_t *c, uintptr_t v LOCK_FILE_LINE_ARG_DEF)
if (spintries == rowner_retries)
goto ts;
if (!(v & RW_LOCK_WRITE_SPINNER)) {
if (!in_critical) {
critical_enter();
in_critical = true;
extra_work++;
}
if (!atomic_fcmpset_ptr(&rw->rw_lock, &v,
v | RW_LOCK_WRITE_SPINNER)) {
critical_exit();
in_critical = false;
extra_work--;
continue;
}
}
@@ -1007,11 +1035,16 @@ __rw_wlock_hard(volatile uintptr_t *c, uintptr_t v LOCK_FILE_LINE_ARG_DEF)
KTR_STATE1(KTR_SCHED, "thread", sched_tdname(curthread),
"spinning", "lockname:\"%s\"",
rw->lock_object.lo_name);
n = RW_READERS(v);
for (i = 0; i < rowner_loops; i += n) {
n = RW_READERS(v);
lock_delay_spin(n);
v = RW_READ_VALUE(rw);
if ((v & RW_LOCK_WRITE_SPINNER) == 0)
if (!(v & RW_LOCK_WRITE_SPINNER))
break;
if (!(v & RW_LOCK_READ))
break;
n = RW_READERS(v);
if (n == 0)
break;
}
#ifdef KDTRACE_HOOKS
@@ -1040,10 +1073,12 @@ __rw_wlock_hard(volatile uintptr_t *c, uintptr_t v LOCK_FILE_LINE_ARG_DEF)
if (owner != NULL) {
if (TD_IS_RUNNING(owner)) {
turnstile_cancel(ts);
rw_drop_critical(v, &in_critical, &extra_work);
continue;
}
} else if (RW_READERS(v) > 0 && sleep_reason == WRITER) {
turnstile_cancel(ts);
rw_drop_critical(v, &in_critical, &extra_work);
continue;
}
#endif
@@ -1054,11 +1089,11 @@ __rw_wlock_hard(volatile uintptr_t *c, uintptr_t v LOCK_FILE_LINE_ARG_DEF)
* If a pending waiters queue is present, claim the lock
* ownership and maintain the pending queue.
*/
x = v & (RW_LOCK_WAITERS | RW_LOCK_WRITE_SPINNER);
if ((v & ~x) == RW_UNLOCKED) {
x &= ~RW_LOCK_WRITE_SPINNER;
if (atomic_fcmpset_acq_ptr(&rw->rw_lock, &v, tid | x)) {
if (x)
setv = v & (RW_LOCK_WAITERS | RW_LOCK_WRITE_SPINNER);
if ((v & ~setv) == RW_UNLOCKED) {
setv &= ~RW_LOCK_WRITE_SPINNER;
if (atomic_fcmpset_acq_ptr(&rw->rw_lock, &v, tid | setv)) {
if (setv)
turnstile_claim(ts);
else
turnstile_cancel(ts);
@@ -1066,19 +1101,37 @@ __rw_wlock_hard(volatile uintptr_t *c, uintptr_t v LOCK_FILE_LINE_ARG_DEF)
}
goto retry_ts;
}
/*
* If the RW_LOCK_WRITE_WAITERS flag isn't set, then try to
* set it. If we fail to set it, then loop back and try
* again.
*/
if (!(v & RW_LOCK_WRITE_WAITERS)) {
if (!atomic_fcmpset_ptr(&rw->rw_lock, &v,
v | RW_LOCK_WRITE_WAITERS))
goto retry_ts;
if (LOCK_LOG_TEST(&rw->lock_object, 0))
CTR2(KTR_LOCK, "%s: %p set write waiters flag",
__func__, rw);
#ifdef ADAPTIVE_RWLOCKS
if (in_critical) {
if ((v & RW_LOCK_WRITE_SPINNER) ||
!((v & RW_LOCK_WRITE_WAITERS))) {
setv = v & ~RW_LOCK_WRITE_SPINNER;
setv |= RW_LOCK_WRITE_WAITERS;
if (!atomic_fcmpset_ptr(&rw->rw_lock, &v, setv))
goto retry_ts;
}
critical_exit();
in_critical = false;
extra_work--;
} else {
#endif
/*
* If the RW_LOCK_WRITE_WAITERS flag isn't set, then try to
* set it. If we fail to set it, then loop back and try
* again.
*/
if (!(v & RW_LOCK_WRITE_WAITERS)) {
if (!atomic_fcmpset_ptr(&rw->rw_lock, &v,
v | RW_LOCK_WRITE_WAITERS))
goto retry_ts;
if (LOCK_LOG_TEST(&rw->lock_object, 0))
CTR2(KTR_LOCK, "%s: %p set write waiters flag",
__func__, rw);
}
#ifdef ADAPTIVE_RWLOCKS
}
#endif
/*
* We were unable to acquire the lock and the write waiters
* flag is set, so we must block on the turnstile.
@@ -1103,6 +1156,12 @@ __rw_wlock_hard(volatile uintptr_t *c, uintptr_t v LOCK_FILE_LINE_ARG_DEF)
#endif
v = RW_READ_VALUE(rw);
}
if (__predict_true(!extra_work))
return;
#ifdef ADAPTIVE_RWLOCKS
if (in_critical)
critical_exit();
#endif
#if defined(KDTRACE_HOOKS) || defined(LOCK_PROFILING)
if (__predict_true(!doing_lockprof))
return;