Simplify lazy advance with a 64-bit atomic cmpset.

This makes it possible to force a lazy (tick-based) SMR to advance
when there are blocking waiters, because the wr_seq value is now
decoupled from the ticks value.

Add some missing compiler barriers.
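
A minimal userspace sketch of the new advance path (illustrative only,
not the kernel code): wr_seq and the tick of the last advance share one
64-bit word, so a single compare-and-set updates both and the sequence
value no longer has to equal ticks. The names lazy_advance, wr_pair,
SEQ_INCR and LAZY_GRACE are stand-ins for smr_lazy_advance(), s_wr,
SMR_SEQ_INCR and SMR_LAZY_GRACE.

#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

#define	SEQ_INCR	2		/* stand-in for SMR_SEQ_INCR */
#define	LAZY_GRACE	2		/* stand-in for SMR_LAZY_GRACE */

/* Sequence number and tick of last update packed into one 64-bit word. */
union wr {
	struct {
		uint32_t	seq;
		int32_t		ticks;
	};
	uint64_t	pair;
};

static _Atomic uint64_t wr_pair;

/*
 * Advance the lazy clock by at most LAZY_GRACE ticks worth of sequence
 * numbers and return the goal: the current sequence plus the grace window.
 */
static uint32_t
lazy_advance(int now)
{
	union wr old, upd;
	int d;

	old.pair = upd.pair = atomic_load_explicit(&wr_pair,
	    memory_order_acquire);
	d = now - upd.ticks;
	if (d != 0) {
		/* Cap the advance and handle long idle periods. */
		if (d > LAZY_GRACE || d < 0)
			d = LAZY_GRACE;
		upd.ticks = now;
		upd.seq += d * SEQ_INCR;
		/* If this fails, a racing caller already advanced. */
		atomic_compare_exchange_strong(&wr_pair, &old.pair, upd.pair);
	}
	return (upd.seq + LAZY_GRACE * SEQ_INCR);
}

int
main(void)
{
	printf("%u\n", lazy_advance(1));	/* 6 */
	printf("%u\n", lazy_advance(1));	/* 6: same tick, no advance */
	printf("%u\n", lazy_advance(3));	/* 10: advanced two ticks */
	return (0);
}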

Reviewed by:	rlibby
Differential Revision:	https://reviews.freebsd.org/D23825
Author:	jeff
Date:	2020-02-27 19:05:26 +00:00
Commit:	afe550b4a3
Parent:	d425254c28

2 changed files with 41 additions and 60 deletions


@@ -184,12 +184,9 @@ static uma_zone_t smr_zone;
  * that will flush the store buffer or prevent access to the section protected
  * data. For example, an idle processor, or an system management interrupt,
  * or a vm exit.
- *
- * We must wait one additional tick if we are around the wrap condition
- * because the write seq will move forward by two with one interrupt.
  */
 #define	SMR_LAZY_GRACE		2
-#define	SMR_LAZY_GRACE_MAX	(SMR_LAZY_GRACE + 1)
+#define	SMR_LAZY_INCR		(SMR_LAZY_GRACE * SMR_SEQ_INCR)
 
 /*
  * The maximum sequence number ahead of wr_seq that may still be valid. The
@@ -197,7 +194,7 @@ static uma_zone_t smr_zone;
  * case poll needs to attempt to forward the sequence number if the goal is
  * within wr_seq + SMR_SEQ_ADVANCE.
  */
-#define	SMR_SEQ_ADVANCE		MAX(SMR_SEQ_INCR, SMR_LAZY_GRACE_MAX)
+#define	SMR_SEQ_ADVANCE		SMR_LAZY_INCR
 
 static SYSCTL_NODE(_debug, OID_AUTO, smr, CTLFLAG_RW | CTLFLAG_MPSAFE, NULL,
     "SMR Stats");
@@ -214,66 +211,45 @@ SYSCTL_COUNTER_U64(_debug_smr, OID_AUTO, poll_fail, CTLFLAG_RW, &poll_fail, "");
 /*
  * Advance a lazy write sequence number. These move forward at the rate of
- * ticks. Grace is two ticks in the future. lazy write sequence numbers can
- * be even but not SMR_SEQ_INVALID so we pause time for a tick when we wrap.
+ * ticks. Grace is SMR_LAZY_INCR (2 ticks) in the future.
  *
- * This returns the _current_ write sequence number. The lazy goal sequence
- * number is SMR_LAZY_GRACE ticks ahead.
+ * This returns the goal write sequence number.
  */
 static smr_seq_t
 smr_lazy_advance(smr_t smr, smr_shared_t s)
 {
-	smr_seq_t s_rd_seq, s_wr_seq, goal;
-	int t;
+	union s_wr s_wr, old;
+	int t, d;
 
 	CRITICAL_ASSERT(curthread);
 
 	/*
-	 * Load s_wr_seq prior to ticks to ensure that the thread that
-	 * observes the largest value wins.
+	 * Load the stored ticks value before the current one. This way the
+	 * current value can only be the same or larger.
 	 */
-	s_wr_seq = atomic_load_acq_int(&s->s_wr_seq);
-
-	/*
-	 * We must not allow a zero tick value. We go back in time one tick
-	 * and advance the grace period forward one tick around zero.
-	 */
+	old._pair = s_wr._pair = atomic_load_acq_64(&s->s_wr._pair);
 	t = ticks;
-	if (t == SMR_SEQ_INVALID)
-		t--;
 
 	/*
 	 * The most probable condition that the update already took place.
 	 */
-	if (__predict_true(t == s_wr_seq))
+	d = t - s_wr.ticks;
+	if (__predict_true(d == 0))
 		goto out;
+	/* Cap the rate of advancement and handle long idle periods. */
+	if (d > SMR_LAZY_GRACE || d < 0)
+		d = SMR_LAZY_GRACE;
+	s_wr.ticks = t;
+	s_wr.seq += d * SMR_SEQ_INCR;
 
 	/*
-	 * After long idle periods the read sequence may fall too far
-	 * behind write. Prevent poll from ever seeing this condition
-	 * by updating the stale rd_seq. This assumes that there can
-	 * be no valid section 2bn ticks old. The rd_seq update must
-	 * be visible before wr_seq to avoid races with other advance
-	 * callers.
+	 * This can only fail if another thread races to call advance().
+	 * Strong cmpset semantics mean we are guaranteed that the update
+	 * happened.
 	 */
-	s_rd_seq = atomic_load_int(&s->s_rd_seq);
-	if (SMR_SEQ_GT(s_rd_seq, t))
-		atomic_cmpset_rel_int(&s->s_rd_seq, s_rd_seq, t);
-
-	/*
-	 * Release to synchronize with the wr_seq load above. Ignore
-	 * cmpset failures from simultaneous updates.
-	 */
-	atomic_cmpset_rel_int(&s->s_wr_seq, s_wr_seq, t);
+	atomic_cmpset_64(&s->s_wr._pair, old._pair, s_wr._pair);
 	counter_u64_add(advance, 1);
-	/* If we lost either update race another thread did it. */
-	s_wr_seq = t;
 out:
-	goal = s_wr_seq + SMR_LAZY_GRACE;
-	/* Skip over the SMR_SEQ_INVALID tick. */
-	if (goal < SMR_LAZY_GRACE)
-		goal++;
-	return (goal);
+	return (s_wr.seq + SMR_LAZY_INCR);
 }
 
 /*
@@ -285,7 +261,7 @@ static smr_seq_t
 smr_shared_advance(smr_shared_t s)
 {
 
-	return (atomic_fetchadd_int(&s->s_wr_seq, SMR_SEQ_INCR) + SMR_SEQ_INCR);
+	return (atomic_fetchadd_int(&s->s_wr.seq, SMR_SEQ_INCR) + SMR_SEQ_INCR);
 }
 
 /*
@@ -346,8 +322,8 @@ smr_deferred_advance(smr_t smr, smr_shared_t s, smr_t self)
  * This function may busy loop if the readers are roughly 1 billion
  * sequence numbers behind the writers.
  *
- * Lazy SMRs will not busy loop and the wrap happens every 49.6 days
- * at 1khz and 119 hours at 10khz. Readers can block for no longer
+ * Lazy SMRs will not busy loop and the wrap happens every 25 days
+ * at 1khz and 60 hours at 10khz. Readers can block for no longer
  * than half of this for SMR_SEQ_ macros to continue working.
  */
 smr_seq_t
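
Where the new wrap figures come from, as a quick back-of-envelope check
(plain userspace C, not kernel code): wr_seq is a 32-bit counter that now
moves SMR_SEQ_INCR (2) per tick instead of 1, so the sequence space wraps
after 2^32 / 2 ticks, half the previous period.

#include <stdint.h>
#include <stdio.h>

int
main(void)
{
	double ticks_to_wrap = (double)(UINT64_C(1) << 32) / 2;

	printf("1khz:  %.1f days\n", ticks_to_wrap / 1000 / 86400);	/* ~24.9 */
	printf("10khz: %.1f hours\n", ticks_to_wrap / 10000 / 3600);	/* ~59.7 */
	return (0);
}
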
@@ -478,7 +454,7 @@ smr_poll_scan(smr_t smr, smr_shared_t s, smr_seq_t s_rd_seq,
 	 * Advance the rd_seq as long as we observed a more recent value.
 	 */
 	s_rd_seq = atomic_load_int(&s->s_rd_seq);
-	if (SMR_SEQ_GEQ(rd_seq, s_rd_seq)) {
+	if (SMR_SEQ_GT(rd_seq, s_rd_seq)) {
 		atomic_cmpset_int(&s->s_rd_seq, s_rd_seq, rd_seq);
 		s_rd_seq = rd_seq;
 	}
@@ -530,7 +506,7 @@ smr_poll(smr_t smr, smr_seq_t goal, bool wait)
 	/*
 	 * Conditionally advance the lazy write clock on any writer
-	 * activity. This may reset s_rd_seq.
+	 * activity.
 	 */
 	if ((flags & SMR_LAZY) != 0)
 		smr_lazy_advance(smr, s);
@@ -552,7 +528,7 @@ smr_poll(smr_t smr, smr_seq_t goal, bool wait)
 	 * wr_seq must be loaded prior to any c_seq value so that a
 	 * stale c_seq can only reference time after this wr_seq.
 	 */
-	s_wr_seq = atomic_load_acq_int(&s->s_wr_seq);
+	s_wr_seq = atomic_load_acq_int(&s->s_wr.seq);
 
 	/*
 	 * This is the distance from s_wr_seq to goal. Positive values
@@ -567,7 +543,7 @@ smr_poll(smr_t smr, smr_seq_t goal, bool wait)
 	 * smr. If we are not blocking we can not succeed but the
 	 * sequence number is valid.
 	 */
-	if (delta > 0 && delta <= SMR_SEQ_MAX_ADVANCE &&
+	if (delta > 0 && delta <= SMR_SEQ_ADVANCE &&
 	    (flags & (SMR_LAZY | SMR_DEFERRED)) != 0) {
 		if (!wait) {
 			success = false;
@@ -617,10 +593,8 @@ smr_create(const char *name, int limit, int flags)
 	smr = uma_zalloc_pcpu(smr_zone, M_WAITOK);
 	s->s_name = name;
-	if ((flags & SMR_LAZY) == 0)
-		s->s_rd_seq = s->s_wr_seq = SMR_SEQ_INIT;
-	else
-		s->s_rd_seq = s->s_wr_seq = ticks;
+	s->s_rd_seq = s->s_wr.seq = SMR_SEQ_INIT;
+	s->s_wr.ticks = ticks;
 
 	/* Initialize all CPUS, not just those running. */
 	for (i = 0; i <= mp_maxid; i++) {


@@ -56,9 +56,16 @@
 #define	SMR_SEQ_INVALID		0
 
 /* Shared SMR state. */
+union s_wr {
+	struct {
+		smr_seq_t	seq;	/* Current write sequence #. */
+		int		ticks;	/* tick of last update (LAZY) */
+	};
+	uint64_t	_pair;
+};
 struct smr_shared {
 	const char	*s_name;	/* Name for debugging/reporting. */
-	smr_seq_t	s_wr_seq;	/* Current write sequence #. */
+	union s_wr	s_wr;		/* Write sequence */
 	smr_seq_t	s_rd_seq;	/* Minimum observed read sequence. */
 };
 typedef struct smr_shared *smr_shared_t;
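
The 64-bit cmpset in smr_lazy_advance() relies on seq and ticks overlaying
_pair exactly. A compile-time check in the same spirit (the typedef and
_Static_assert below are illustrative additions, not part of the commit):

#include <stdint.h>

typedef uint32_t smr_seq_t;

union s_wr {
	struct {
		smr_seq_t	seq;	/* Current write sequence #. */
		int		ticks;	/* tick of last update (LAZY) */
	};
	uint64_t	_pair;
};

_Static_assert(sizeof(union s_wr) == sizeof(uint64_t),
    "seq and ticks must pack exactly into _pair for atomic_cmpset_64()");
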
@@ -189,7 +196,7 @@ static inline smr_seq_t
 smr_shared_current(smr_shared_t s)
 {
 
-	return (atomic_load_int(&s->s_wr_seq));
+	return (atomic_load_int(&s->s_wr.seq));
 }
 
 static inline smr_seq_t
@@ -281,7 +288,7 @@ smr_lazy_enter(smr_t smr)
 	 * If we assign a stale wr_seq value due to interrupt we use the
 	 * same algorithm that renders smr_enter() safe.
 	 */
-	smr->c_seq = smr_shared_current(smr->c_shared);
+	atomic_store_int(&smr->c_seq, smr_shared_current(smr->c_shared));
 }
 
 /*
/* /*
@ -306,7 +313,7 @@ smr_lazy_exit(smr_t smr)
* time and wait 1 tick longer. * time and wait 1 tick longer.
*/ */
atomic_thread_fence_rel(); atomic_thread_fence_rel();
smr->c_seq = SMR_SEQ_INVALID; atomic_store_int(&smr->c_seq, SMR_SEQ_INVALID);
critical_exit(); critical_exit();
} }
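
The smr_lazy_enter()/smr_lazy_exit() hunks are presumably the "missing
compiler barriers" from the commit message: atomic_store_int() forces a
single, untorn store that the compiler cannot split or elide, while the
release fence in the exit path still provides the ordering. A minimal
sketch of the same pattern with illustrative names (struct reader and
lazy_enter/lazy_exit are not kernel identifiers; 0 stands in for
SMR_SEQ_INVALID):

#include <stdatomic.h>
#include <stdint.h>

struct reader {
	_Atomic uint32_t c_seq;		/* per-CPU read sequence */
};

void
lazy_enter(struct reader *r, uint32_t wr_seq)
{
	/* A relaxed atomic store: no hardware fence, but guaranteed to be
	 * emitted as one full-width store. */
	atomic_store_explicit(&r->c_seq, wr_seq, memory_order_relaxed);
}

void
lazy_exit(struct reader *r)
{
	/* Release fence first so protected loads cannot be reordered past
	 * the point where the section is marked closed. */
	atomic_thread_fence(memory_order_release);
	atomic_store_explicit(&r->c_seq, 0, memory_order_relaxed);
}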