optimized work stealing. IOCTL dump. cache_penalty and ws_int SYSCTL and SYSINIT.

This commit is contained in:
Oscar Zhao 2019-09-03 21:01:51 -04:00
parent 21aa3325fc
commit d74786ec34
7 changed files with 461 additions and 184 deletions

View File

@ -88,6 +88,45 @@ __FBSDID("$FreeBSD$");
static MALLOC_DEFINE(M_KQUEUE, "kqueue", "memory for kqueue system");
/* sysctl for best of 2 latency penalty */
static uint32_t cache_pen = 10000;
SYSCTL_U32(_kern, OID_AUTO, kq_cache_pen, CTLFLAG_RW, &cache_pen, 0, "KQueue cache miss's penalty in cycles.");
/* sysctl for ws_int_sbt */
static sbintime_t ws_int_sbt = 0;
SYSCTL_U64(_kern, OID_AUTO, kq_ws_int_sbt, CTLFLAG_RD, &ws_int_sbt, 0, "KQueue work stealing interval in sbintime.");
/* sysctl for ws_int */
static uint32_t ws_int = 1000;
static inline void
update_ws_int_sbt()
{
ws_int_sbt = nstosbt(1000 * ws_int);
}
static inline int
sysctl_ws_int(SYSCTL_HANDLER_ARGS)
{
uint32_t new_int;
new_int = ws_int;
int error = sysctl_handle_int(oidp, &new_int, sizeof(uint32_t), req);
if (error || req->newptr == NULL) {
return error;
}
ws_int = new_int;
update_ws_int_sbt();
return error;
}
SYSCTL_PROC(_kern, OID_AUTO, kq_ws_int, CTLTYPE_U32 | CTLFLAG_RW, 0, 0, sysctl_ws_int, "IU", "KQueue work stealing interval in microseconds.");
#define KQDOM_FLAGS (KQ_SCHED_CPU | KQ_SCHED_QUEUE)
#define KEVQ_LAT_FLAGS ((uint64_t)-1) //(KQ_SCHED_CPU | KQ_SCHED_QUEUE | KQ_SCHED_BEST)
#define DUMP_INDENT (4)
@ -110,7 +149,6 @@ MTX_SYSINIT(kq_global, &kq_global, "kqueue order", MTX_DEF);
TASKQUEUE_DEFINE_THREAD(kqueue_ctx);
// TODO: only use it in SMP
extern struct cpu_group *cpu_top;
static inline uint64_t
@ -208,7 +246,6 @@ static struct kevq * kqdom_random_kevq_locked(struct kqdom *kqd);
static void kqdom_build_internal(struct kqdom *kqd_cur, struct cpu_group *cg_cur, int *kqd_id);
static struct kqdom * kqdom_build(void);
static struct kqdom * kqdom_find(struct kqdom *root, int cpuid);
static void kqdom_dump(struct kqdom *kqd, int level, struct sbuf *buf);
static struct kevq *kevq_vec_select_kevq(struct veclist *lst, int num_rand);
@ -278,12 +315,14 @@ SYSCTL_UINT(_kern, OID_AUTO, kq_calloutmax, CTLFLAG_RW,
#define KN_FLUX_LOCK(kn) do { \
mtx_lock(&(kn)->kn_fluxlock); \
} while (0)
#define KN_FLUX_TRYLOCK(kn) (mtx_trylock(&(kn)->kn_fluxlock))
#define KEVQ_TH_LOCK(kevqth) do { \
mtx_lock(&(kevqth)->lock); \
} while (0)
#define KEVQ_LOCK(kevq) do { \
mtx_lock(&(kevq)->lock); \
} while (0)
#define KEVQ_TRYLOCK(kevq) (mtx_trylock(&(kevq)->lock))
#define KQ_UNLOCK(kq) do { \
mtx_unlock(&(kq)->kq_lock); \
} while (0)
@ -370,22 +409,28 @@ SYSCTL_UINT(_kern, OID_AUTO, kq_calloutmax, CTLFLAG_RW,
#define KQSCHED_GET_FARGS(kq) (kq->kq_sfargs)
#define KQSCHED_GET_FLAGS(kq) (kq->kq_sflags)
static inline int
static inline long
kevq_exp_lat(struct kevq *kevq)
{
return kevq->kevq_avg_lat * kevq->kn_count + kevq->kevq_last_kev - get_cyclecount();
}
static inline long
kevq_lat_cmp(struct kevq *kevq1, struct kevq *kevq2)
{
if (kevq1 == kevq2)
return 0;
return kevq1->kevq_avg_lat * kevq1->kn_count - kevq2->kevq_avg_lat * kevq2->kn_count;
return kevq_exp_lat(kevq1) - kevq_exp_lat(kevq2);
}
static inline int
static inline long
kevq_lat_wcmp(struct kevq *kevq1, struct kevq *kevq2, int pct1)
{
if (kevq1 == kevq2)
return 0;
return kevq1->kevq_avg_lat * kevq1->kn_count * pct1 / 100 - kevq2->kevq_avg_lat * kevq2->kn_count;
return kevq_exp_lat(kevq1) - (cache_pen + kevq_exp_lat(kevq2));
}
static inline int
@ -1451,6 +1496,13 @@ kevqlist_find(struct kevqlist *kevq_list, struct kqueue *kq)
return kevq_found;
}
static void
kqueue_sysinit(const void* dummy)
{
update_ws_int_sbt();
}
SYSINIT(KQUEUE, SI_SUB_KQUEUE, SI_ORDER_ANY, kqueue_sysinit, NULL);
static int
kqueue_kevent(struct kqueue *kq, struct kevq *kevq, struct thread *td, int nchanges, int nevents,
@ -1489,9 +1541,6 @@ kqueue_kevent(struct kqueue *kq, struct kevq *kevq, struct thread *td, int nchan
}
CTR3(KTR_KQ, "kevent: td %d nkev %d kevent (new avg) %ld ns", td->td_tid, kevq->kevq_last_nkev, kevq->kevq_avg_lat);
/* clear parameters */
kevq->kevq_last_kev = 0;
kevq->kevq_last_nkev = 0;
//kqdom_update_lat(kevq->kevq_kqd, avg);
}
@ -2340,18 +2389,17 @@ kqdom_destroy(struct kqdom *root)
}
static void
kevq_dump(struct kevq *kevq, int level, struct sbuf *buf)
kevq_dump(struct sbuf *buf, struct kevq *kevq, int level)
{
int error;
error = sbuf_printf(buf, "%*c<kevq: %p knotes: %d "
sbuf_printf(buf, "%*c<kevq: %p knotes: %d "
"total_time: %ld "
"total_syscall: %ld "
"total_events: %ld "
"avg_latency: %ld "
"avg_events: %ld "
"total_fallbacks: %ld "
"total_mismatches: %ld/>\n",
"total_mismatches: %ld "
"total_worksteal: %ld/>\n",
level * DUMP_INDENT, ' ', kevq, kevq->kn_count,
kevq->kevq_tot_time,
kevq->kevq_tot_syscall,
@ -2359,18 +2407,16 @@ kevq_dump(struct kevq *kevq, int level, struct sbuf *buf)
kevq->kevq_avg_lat,
kevq->kevq_avg_ev,
kevq->kevq_tot_fallback,
kevq->kevq_tot_kqd_mismatch);
KASSERT(error == 0, ("error writing sbuf"));
kevq->kevq_tot_kqd_mismatch,
kevq->kevq_tot_ws);
}
static void
kqdom_dump(struct kqdom *kqd, int level, struct sbuf *buf)
kqdom_dump(struct sbuf *buf, struct kqdom *kqd, int level)
{
/* XXX: No potential race between this and kqdom_build() for now.
* If we move kqdom_build() out of kqueue() syscall then there is a potential race */
int error;
error = sbuf_printf(buf, "%*c<kqdom id: %d level: %d cpu_mask:0x%lx #children: %d #active: %d leaf: %d #kevq: %d>\n", level * DUMP_INDENT, ' ',
sbuf_printf(buf, "%*c<kqdom id: %d level: %d cpu_mask:0x%lx #children: %d #active: %d leaf: %d #kevq: %d>\n", level * DUMP_INDENT, ' ',
kqd->id,
level,
kqd->cpu_mask.__bits[0],
@ -2378,22 +2424,20 @@ kqdom_dump(struct kqdom *kqd, int level, struct sbuf *buf)
veclist_size(&kqd->kqd_activelist),
kqdom_is_leaf(kqd),
veclist_size(&kqd->kqd_kevqs));
KASSERT(error == 0, ("error writing sbuf"));
if (kqdom_is_leaf(kqd)) {
KQD_RLOCK(kqd);
/* print all kevqs */
for (int i = 0; i < veclist_size(&kqd->kqd_kevqs); i++) {
kevq_dump(veclist_at(&kqd->kqd_kevqs, i), level + 1, buf);
kevq_dump(buf, veclist_at(&kqd->kqd_kevqs, i), level + 1);
}
KQD_RUNLOCK(kqd);
} else {
for(int i = 0; i < veclist_size(&kqd->children); i++) {
kqdom_dump(veclist_at(&kqd->children, i), level + 1, buf);
kqdom_dump(buf, veclist_at(&kqd->children, i), level + 1);
}
}
error = sbuf_printf(buf, "%*c</kqdom>\n", level * DUMP_INDENT, ' ');
KASSERT(error == 0, ("error writing sbuf"));
sbuf_printf(buf, "%*c</kqdom>\n", level * DUMP_INDENT, ' ');
}
@ -2641,48 +2685,81 @@ kqueue_task(void *arg, int pending)
KQ_GLOBAL_UNLOCK(&kq_global, haskqglobal);
}
static inline int
knote_stealable(struct knote *kn)
{
return (kn->kn_status & (KN_ACTIVE | KN_DISABLED | KN_PROCESSING | KN_WS | KN_MARKER)) == KN_ACTIVE;
}
static inline int
kevq_stealable(struct kevq* kevq)
{
//CTR3(KTR_KQ, "kevq_stealable: AVAIL: %d, kn_cnt: %d, WS: %d", kevq_avail(kevq), kevq->kn_count, kevq->kevq_state & KEVQ_WS);
return kevq_avail(kevq) && kevq->kn_count > 0 && !(kevq->kevq_state & KEVQ_WS);
}
static void
kevq_worksteal(struct kevq *kevq)
{
struct kevq *other_kevq;
struct kqueue *kq;
struct knote *ws_kn, *next_kn;
struct knlist *knl;
int ws_count, valid, tgt_count;
struct knote *ws_kn;
//struct knlist *knl;
struct knote *ws_lst[8];
int ws_count;
int tgt_count;
KEVQ_OWNED(kevq);
kevq->kevq_state |= KEVQ_WS;
KEVQ_UNLOCK(kevq);
kq = kevq->kq;
ws_count = 0;
tgt_count = KQSCHED_GET_FARGS(kq);
TAILQ_HEAD(, knote) kn_wsq;
TAILQ_INIT(&kn_wsq);
/* XXX: hack */
KASSERT(tgt_count < 8, ("too many kevq ws knotes"));
KVLST_RLOCK(kq);
other_kevq = kevq_vec_select_kevq(&kq->kevq_vlist, 1);
other_kevq = kevq_lock_check_avail(other_kevq);
/* fast fail */
if (other_kevq != kevq && kevq_stealable(other_kevq)) {
if (KEVQ_TRYLOCK(other_kevq)) {
if (!kevq_stealable(other_kevq)) {
KEVQ_UNLOCK(other_kevq);
other_kevq = NULL;
}
} else {
other_kevq = NULL;
}
} else {
other_kevq = NULL;
}
KVLST_RUNLOCK(kq);
CTR2(KTR_KQ, "kevq_worksteal: kevq %p selected kevq %p", kevq, other_kevq);
//CTR2(KTR_KQ, "kevq_worksteal: kevq %p selected kevq %p", kevq, other_kevq);
if (other_kevq != NULL && other_kevq != kevq && other_kevq->kn_count > 0) {
if (other_kevq != NULL) {
KEVQ_OWNED(other_kevq);
/* steal from the first because it arrived late */
ws_kn = TAILQ_FIRST(&other_kevq->kn_head);
while(ws_count < tgt_count && ws_kn != NULL) {
KEVQ_OWNED(other_kevq);
next_kn = TAILQ_NEXT(ws_kn, kn_tqe);
/* TODO: maybe limit the number of knotes to go through */
while((ws_count < tgt_count) && (ws_kn != NULL)) {
/* don't care about markers */
if ((ws_kn->kn_status & KN_MARKER) != 0) {
/* fast fail */
if (!knote_stealable(ws_kn)) {
goto end_loop;
}
KN_FLUX_LOCK(ws_kn);
if (!KN_FLUX_TRYLOCK(ws_kn)) {
goto end_loop;
}
KN_FLUX_OWNED(ws_kn);
/* ignore influx, inactive and disabled */
if (kn_in_flux(ws_kn) || (ws_kn->kn_status & (KN_ACTIVE | KN_DISABLED | KN_REQUEUE)) != (KN_ACTIVE)) {
if (kn_in_flux(ws_kn) || !knote_stealable(ws_kn)) {
KN_FLUX_UNLOCK(ws_kn);
goto end_loop;
}
@ -2692,41 +2769,37 @@ kevq_worksteal(struct kevq *kevq)
/* Remove from the old kevq first, due to lock order */
knote_dequeue(ws_kn);
KEVQ_UNLOCK(other_kevq);
ws_kn->kn_status |= KN_WS;
/* validate event */
knl = kn_list_lock(ws_kn);
valid = ws_kn->kn_fop->f_event(ws_kn, 0);
kn_list_unlock(knl);
//knl = kn_list_lock(ws_kn);
//valid = ws_kn->kn_fop->f_event(ws_kn, 0);
//kn_list_unlock(knl);
if (valid) {
TAILQ_INSERT_TAIL(&kn_wsq, ws_kn, kn_wse);
//if (valid) {
//TAILQ_INSERT_TAIL(&kn_wsq, ws_kn, kn_wse);
ws_lst[ws_count] = ws_kn;
ws_count++;
}
//}
KEVQ_LOCK(other_kevq);
if (!valid) {
/* if not valid, return it to the previous queue */
knote_enqueue(ws_kn, other_kevq);
KN_LEAVE_FLUX_WAKEUP(ws_kn);
}
// if (!valid) {
// /* if not valid, return it to the previous queue */
// knote_enqueue(ws_kn, other_kevq);
// KN_LEAVE_FLUX_WAKEUP(ws_kn);
// }
end_loop:
ws_kn = next_kn;
}
ws_kn = TAILQ_NEXT(ws_kn, kn_tqe);
}
if (other_kevq != NULL) {
KEVQ_UNLOCK(other_kevq);
}
KEVQ_LOCK(kevq);
while(!TAILQ_EMPTY(&kn_wsq)) {
ws_kn = TAILQ_FIRST(&kn_wsq);
TAILQ_REMOVE(&kn_wsq, ws_kn, kn_wse);
knote_enqueue(ws_kn, kevq);
KN_LEAVE_FLUX_WAKEUP(ws_kn);
CTR4(KTR_KQ, "kevq_worksteal: kevq %p stole kn %p, ident: %d from kevq %p", kevq, ws_kn, ws_kn->kn_id, other_kevq);
kevq->kevq_state &= ~KEVQ_WS;
kevq->kevq_tot_ws += ws_count;
for (int i = 0; i < ws_count; i++) {
knote_enqueue(ws_lst[i], kevq);
KN_LEAVE_FLUX_WAKEUP(ws_lst[i]);
//CTR4(KTR_KQ, "kevq_worksteal: kevq %p stole kn %p, ident: %d from kevq %p", kevq, ws_lst[i], ws_lst[i]->kn_id, other_kevq);
}
}
@ -2782,14 +2855,17 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
struct kevent *kevp;
struct knote *kn, *marker;
struct knlist *knl;
sbintime_t asbt, rsbt;
int count, error, haskqglobal, influx, nkev, touch;
sbintime_t asbt, rsbt, fsbt;
int wait;
int count, error, haskqglobal, influx, nkev, touch, fevent;
wait = 0;
kq = kevq->kq;
count = maxevents;
nkev = 0;
error = 0;
haskqglobal = 0;
kn = NULL;
if (maxevents == 0)
goto done_nl;
@ -2804,19 +2880,22 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
if (timespecisset(tsp)) {
if (tsp->tv_sec <= INT32_MAX) {
rsbt = tstosbt(*tsp);
if (TIMESEL(&asbt, rsbt))
asbt += tc_tick_sbt;
if (asbt <= SBT_MAX - rsbt)
asbt += rsbt;
if(TIMESEL(&fsbt, rsbt))
fsbt += tc_tick_sbt;
if (fsbt <= SBT_MAX - rsbt)
fsbt += rsbt;
else
asbt = 0;
rsbt >>= tc_precexp;
fsbt = 0; /* wait indefinitely */
} else
asbt = 0;
fsbt = 0;
} else
asbt = -1;
fsbt = -1; /* return immediately */
} else
asbt = 0;
fsbt = 0; /* wait indefinitely */
asbt = fsbt;
if ((kevq->kevq_state & KEVQ_ACTIVE) == 0) {
/* activate kq if not already activated */
@ -2829,32 +2908,91 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
KEVQ_LOCK(kevq);
retry:
kevp = keva;
CTR3(KTR_KQ, "kqueue_scan: td %d on kevq %p has %d events", td->td_tid, kevq, kevq->kn_count);
KEVQ_OWNED(kevq);
if ((KQSCHED_GET_FLAGS(kq) == KQ_SCHED_FLAG_WS) && kevq->kn_count == 0) {
/* try work stealing */
if (kevq->kn_count == 0 && (KQSCHED_GET_FLAGS(kq) & KQ_SCHED_FLAG_WS)) {
kevq_worksteal(kevq);
}
KEVQ_OWNED(kevq);
kevp = keva;
//CTR3(KTR_KQ, "kqueue_scan: td %d on kevq %p has %d events", td->td_tid, kevq, kevq->kn_count);
if (kevq->kn_count == 0) {
if (asbt == -1) {
if (fsbt == -1) {
error = EWOULDBLOCK;
} else {
kevq->kevq_state |= KEVQ_SLEEP;
CTR2(KTR_KQ, "kqueue_scan: td %d waiting on kevq %p for events", td->td_tid, kevq);
error = msleep_sbt(kevq, &kevq->lock, PSOCK | PCATCH,
"kqread", asbt, rsbt, C_ABSOLUTE);
CTR2(KTR_KQ, "kqueue_scan: td %d wokeup on kevq %p for events", td->td_tid, kevq);
if (KQSCHED_GET_FLAGS(kq) & KQ_SCHED_FLAG_WS) {
//CTR3(KTR_KQ, "kqueue_scan: td %d, kevq %p target wait sbt: %ld", td->td_tid, kevq, fsbt);
//printf("kqueue_scan: td %d, kevq %p target wait sbt: %ld\n", td->td_tid, kevq, fsbt);
/* calculate rsbt */
if (fsbt == 0) {
/* if wait indefinitely, sleep for ws_interval */
rsbt = ws_int_sbt;
//CTR2(KTR_KQ, "kqueue_scan: td %d, kevq %p indefinite wait...", td->td_tid, kevq);
//printf("kqueue_scan: td %d, kevq %p indefinite wait...\n", td->td_tid, kevq);
} else {
/* get the current asbt */
if (TIMESEL(&asbt, ws_int_sbt)) {
asbt += tc_tick_sbt;
}
//CTR3(KTR_KQ, "kqueue_scan: td %d, kevq %p current sbt: %ld", td->td_tid, kevq, asbt);
//printf("kqueue_scan: td %d, kevq %p current sbt: %ld\n", td->td_tid, kevq, asbt);
/* calc the difference */
rsbt = fsbt - asbt;
if (rsbt <= 0) {
//CTR2(KTR_KQ, "kqueue_scan: td %d, kevq %p target sbt reached.", td->td_tid, kevq);
//printf("kqueue_scan: td %d, kevq %p target sbt reached.\n", td->td_tid, kevq);
/* we are already overdue */
error = 0;
goto done;
} else {
//CTR3(KTR_KQ, "kqueue_scan: td %d, kevq %p target difference: %ld", td->td_tid, kevq, rsbt);
//printf("kqueue_scan: td %d, kevq %p target difference: %ld\n", td->td_tid, kevq, rsbt);
if (rsbt > ws_int_sbt) {
rsbt = ws_int_sbt;
} else {
/* if it's the last time waiting, we set fsbt = -1, which causes us to return no matter what next time */
fsbt = -1;
//CTR2(KTR_KQ, "kqueue_scan: td %d, kevq %p sleeping for the last time, setting fsbt to -1", td->td_tid, kevq);
//printf("kqueue_scan: td %d, kevq %p sleeping for the last time, setting fsbt to -1\n", td->td_tid, kevq);
}
//CTR3(KTR_KQ, "kqueue_scan: td %d, kevq %p sleeping for %ld", td->td_tid, kevq, rsbt);
//printf("kqueue_scan: td %d, kevq %p sleeping for %ld\n", td->td_tid, kevq, rsbt);
}
}
/* set the target asbt */
if (asbt <= SBT_MAX - rsbt)
asbt += rsbt;
else
asbt = fsbt; /* wait till fsbt, shouldn't happen */
//CTR3(KTR_KQ, "kqueue_scan: td %d, kevq %p next wakeup sbt: %ld", td->td_tid, kevq, asbt);
//printf("kqueue_scan: td %d, kevq %p next wakeup sbt: %ld\n", td->td_tid, kevq, asbt);
}
kevq->kevq_state |= KEVQ_SLEEP;
//CTR2(KTR_KQ, "kqueue_scan: td %d waiting on kevq %p for events", td->td_tid, kevq);
error = msleep_sbt(kevq, &kevq->lock, PSOCK | PCATCH,
"kqread", asbt, rsbt >> tc_precexp, C_ABSOLUTE);
//CTR2(KTR_KQ, "kqueue_scan: td %d wokeup on kevq %p for events", td->td_tid, kevq);
}
if (error == 0)
goto retry;
/* don't restart after signals... */
if (error == ERESTART)
error = EINTR;
else if (error == EWOULDBLOCK)
else if (error == EWOULDBLOCK) {
if (KQSCHED_GET_FLAGS(kq) & KQ_SCHED_FLAG_WS && fsbt != -1) {
goto retry;
}
error = 0;
}
goto done;
}
@ -2889,7 +3027,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
TAILQ_REMOVE(&kevq->kn_head, kn, kn_tqe);
CTR3(KTR_KQ, "kqueue_scan: td %d on kevq %p dequeued knote %p", td->td_tid, kevq, kn);
if ((kn->kn_status & KN_DISABLED) == KN_DISABLED) {
kn->kn_status &= ~(KN_QUEUED | KN_REQUEUE);
kn->kn_status &= ~(KN_QUEUED | KN_PROCESSING | KN_WS);
kevq->kn_count--;
KN_FLUX_UNLOCK(kn);
continue;
@ -2908,7 +3046,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
("knote %p is unexpectedly in flux", kn));
if ((kn->kn_flags & EV_DROP) == EV_DROP) {
kn->kn_status &= ~(KN_QUEUED | KN_REQUEUE);
kn->kn_status &= ~(KN_QUEUED | KN_PROCESSING | KN_WS);
knote_enter_flux(kn);
kevq->kn_count--;
KN_FLUX_UNLOCK(kn);
@ -2921,7 +3059,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
KEVQ_LOCK(kevq);
continue;
} else if ((kn->kn_flags & EV_ONESHOT) == EV_ONESHOT) {
kn->kn_status &= ~(KN_QUEUED | KN_REQUEUE);
kn->kn_status &= ~(KN_QUEUED | KN_PROCESSING | KN_WS);
knote_enter_flux(kn);
kevq->kn_count--;
KN_FLUX_UNLOCK(kn);
@ -2945,11 +3083,29 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
KQ_GLOBAL_LOCK(&kq_global, haskqglobal);
}
knl = kn_list_lock(kn);
if (kn->kn_fop->f_event(kn, 0) == 0) {
fevent = kn->kn_fop->f_event(kn, 0);
/* return stolen knotes */
if (kn->kn_status & KN_WS) {
KQ_GLOBAL_UNLOCK(&kq_global, haskqglobal);
kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE | KN_SCAN | KN_PROCESSING | KN_WS);
if (fevent) {
knote_activate(kn);
}
knote_leave_flux_ul(kn);
KEVQ_LOCK(kevq);
kevq->kn_count--;
kn_list_unlock(knl);
influx = 1;
CTR3(KTR_KQ, "kqueue_scan: td %d, kevq %p returned stolen knote %p", td->td_tid, kevq, kn);
continue;
}
if (fevent == 0) {
KEVQ_LOCK(kevq);
KQ_GLOBAL_UNLOCK(&kq_global, haskqglobal);
kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE |
KN_SCAN | KN_REQUEUE);
kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE | KN_SCAN | KN_PROCESSING | KN_WS);
knote_leave_flux_ul(kn);
kevq->kn_count--;
kn_list_unlock(knl);
@ -2957,6 +3113,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
CTR4(KTR_KQ, "kqueue_scan: kn %p, ident: %d not valid anymore for kevq %p, td %d", kn, kn->kn_id, kevq, td->td_tid);
continue;
}
touch = (!kn->kn_fop->f_isfd && kn->kn_fop->f_touch != NULL);
if (touch)
kn->kn_fop->f_touch(kn, kevp, EVENT_PROCESS);
@ -2975,14 +3132,15 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
}
if (kn->kn_flags & EV_DISPATCH)
kn->kn_status |= KN_DISABLED;
kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE | KN_REQUEUE);
kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE | KN_PROCESSING | KN_WS);
kevq->kn_count--;
} else {
/* this flag is here to prevent a subtle workstealing race where one thread gets an identifier
/* this flag is here to prevent a subtle workstealing race where one thread gets an fd
and returns, before it can process the event, another thread steals the knote and
processes the same fd, resulting in the first thread having no data available.
Work stealing will avoid stealing knotes with this flag set */
kn->kn_status |= KN_REQUEUE;
kn->kn_status |= KN_PROCESSING;
CTR3(KTR_KQ, "kqueue_scan: requeued kn %p, ident: %d to kevq %p", kn, kn->kn_id,kevq);
TAILQ_INSERT_TAIL(&kevq->kn_head, kn, kn_tqe);
}
@ -3095,11 +3253,11 @@ kqueue_ioctl(struct file *fp, u_long cmd, void *data,
return (0);
}
#endif
struct sbuf buf;
struct kqueue *kq;
char *dat;
int error = 0;
int sched;
struct sbuf buf;
char *rbuf;
kq = fp->f_data;
CTR2(KTR_KQ, "kqueue_ioctl: received: kq %p cmd: 0x%lx", kq, cmd);
@ -3125,36 +3283,38 @@ kqueue_ioctl(struct file *fp, u_long cmd, void *data,
KQ_UNLOCK(kq);
break;
case FKQMPRNT:
if (kq->kq_flags & KQ_FLAG_MULTI) {
/* 4MB buffer */
dat = malloc(sizeof(char) * 4096 * 1024, M_KQUEUE, M_WAITOK | M_ZERO);
sbuf_new(&buf, dat, 4096 * 1024 * sizeof(char), SBUF_INCLUDENUL);
/* dump kvlst */
error = sbuf_printf(&buf, "KQ 0x%p dump:\n\nActive KEVQ:\n", kq);
KASSERT(error == 0, ("error writing sbuf"));
rbuf = malloc(1024 * 1024 * sizeof(char), M_KQUEUE, M_WAITOK);
sbuf_new(&buf, rbuf, 1024 * 1024, SBUF_FIXEDLEN | SBUF_INCLUDENUL);
if (kq->kq_flags & KQ_FLAG_MULTI) {
sbuf_printf(&buf, "KQ 0x%p dump:\n\nActive KEVQs:\n", kq);
KVLST_RLOCK(kq);
for(int i = 0; i < veclist_size(&kq->kevq_vlist); i++) {
kevq_dump(veclist_at(&kq->kevq_vlist, i), 0, &buf);
kevq_dump(&buf, veclist_at(&kq->kevq_vlist, i), 0);
}
KVLST_RUNLOCK(kq);
/* dump kqdom if used */
if (KQSCHED_GET_SCHED(kq) & KQDOM_FLAGS) {
error = sbuf_printf(&buf, "\nKQDOM:\n");
KASSERT(error == 0, ("error writing sbuf"));
kqdom_dump(kq->kq_kqd, 0, &buf);
sbuf_printf(&buf, "\nKQDOM:\n");
kqdom_dump(&buf, kq->kq_kqd, 0);
}
error = sbuf_finish(&buf);
KASSERT(error == 0, ("error sbuf_finish"));
uprintf("%s", sbuf_data(&buf));
sbuf_delete(&buf);
free(dat, M_KQUEUE);
} else {
error = (EINVAL);
}
sbuf_finish(&buf);
/* XXX: severe hack */
memcpy((void*)*(uintptr_t*)data, sbuf_data(&buf), sbuf_len(&buf) > 1024 * 1024 ? 1024 * 1024 : sbuf_len(&buf));
sbuf_delete(&buf);
free(rbuf, M_KQUEUE);
break;
default:
error = (ENOTTY);
@ -3317,6 +3477,7 @@ kevq_drain(struct kevq *kevq, struct thread *td)
knote_dequeue(kn);
if ((kn->kn_flags & EV_AFFINITY) == EV_AFFINITY) {
/* XXX: segfault here */
knote_drop(kn, td);
}
@ -4089,7 +4250,7 @@ kevq_vec_select_kevq(struct veclist *lst, int num_rand)
/* XXX: hack */
KASSERT(num_rand < 8, ("too much num_rand"));
CTR1(KTR_KQ, "kevq_vec_select_kevq: num - %d", num_rand);
//CTR1(KTR_KQ, "kevq_vec_select_kevq: num - %d", num_rand);
sz = veclist_size(lst);
@ -4099,7 +4260,7 @@ kevq_vec_select_kevq(struct veclist *lst, int num_rand)
for (int i = 0; i < num_rand; i++) {
next_kevq = veclist_at(lst, rand % sz);
CTR5(KTR_KQ, "kevq_vec_select_kevq: candidate kevq %p [%d] avg lat %ld #kn %d Tot: %ld", next_kevq, i, next_kevq->kevq_avg_lat, next_kevq->kn_count, next_kevq->kevq_avg_lat * next_kevq->kn_count);
//CTR5(KTR_KQ, "kevq_vec_select_kevq: candidate kevq %p [%d] avg lat %ld #kn %d Tot: %ld", next_kevq, i, next_kevq->kevq_avg_lat, next_kevq->kn_count, next_kevq->kevq_avg_lat * next_kevq->kn_count);
if (cur_kevq == NULL || (next_kevq != NULL && kevq_lat_cmp(cur_kevq, next_kevq) > 0)) {
cur_kevq = next_kevq;
@ -4110,7 +4271,7 @@ kevq_vec_select_kevq(struct veclist *lst, int num_rand)
}
}
CTR2(KTR_KQ, "kevq_vec_select_kevq: selected kevq %p Tot: %ld", cur_kevq, cur_kevq == NULL ? 0 : cur_kevq->kevq_avg_lat * cur_kevq->kn_count);
//CTR2(KTR_KQ, "kevq_vec_select_kevq: selected kevq %p Tot: %ld", cur_kevq, cur_kevq == NULL ? 0 : cur_kevq->kevq_avg_lat * cur_kevq->kn_count);
return cur_kevq;
}
@ -4232,7 +4393,7 @@ knote_next_kevq(struct knote *kn)
other_kevq = kevq_vec_select_kevq(&kq->kevq_vlist, sargs);
KVLST_RUNLOCK(kq);
if (next_kevq == NULL || (other_kevq != NULL && kevq_lat_wcmp(next_kevq, other_kevq, 80) > 0)) {
if (next_kevq == NULL || (other_kevq != NULL && kevq_lat_wcmp(next_kevq, other_kevq, 90) > 0)) {
next_kevq = other_kevq;
}
}
@ -4315,13 +4476,13 @@ knote_enqueue(struct knote *kn, struct kevq *kevq)
struct kqueue *kq;
kq = kn->kn_kq;
/* CTR2(KTR_KQ, "knote_enqueue: kn %p to kevq %p", kn, kevq); */
CTR2(KTR_KQ, "knote_enqueue: kn %p to kevq %p", kn, kevq);
KEVQ_OWNED(kevq);
KASSERT((kn->kn_status & KN_QUEUED) == 0, ("knote already queued"));
/* Queuing to a clsoing kevq is fine.
/* Queuing to a closing kevq is fine.
* The refcnt wait in kevq drain is before knote requeuing
* so no knote will be forgotten
* KASSERT((kevq->kevq_state & KEVQ_CLOSING) == 0 && (kevq->kevq_state & KEVQ_ACTIVE) != 0, ("kevq already closing or not ready")); */
@ -4342,7 +4503,7 @@ knote_dequeue(struct knote *kn)
KEVQ_OWNED(kevq);
CTR2(KTR_KQ, "knote_dequeue: kn %p from kevq %p", kn, kevq);
CTR3(KTR_KQ, "knote_dequeue: kn %p from kevq %p flag: 0x%x", kn, kevq, kn->kn_status);
KASSERT(kn->kn_status & KN_QUEUED, ("knote not queued"));
TAILQ_REMOVE(&kevq->kn_head, kn, kn_tqe);

View File

@ -304,10 +304,11 @@ struct knote {
#define KN_QUEUED 0x02 /* event is on queue */
#define KN_DISABLED 0x04 /* event is disabled */
#define KN_DETACHED 0x08 /* knote is detached */
#define KN_MARKER 0x20 /* ignore this knote */
#define KN_KQUEUE 0x40 /* this knote belongs to a kq */
#define KN_SCAN 0x100 /* flux set in kqueue_scan() */
#define KN_REQUEUE 0x200 /* knote has triggered and is requeued to the current queue */
#define KN_MARKER 0x10 /* ignore this knote */
#define KN_KQUEUE 0x20 /* this knote belongs to a kq */
#define KN_SCAN 0x40 /* flux set in kqueue_scan() */
#define KN_PROCESSING 0x80 /* the knote on the kevq is undergoing userspace processing */
#define KN_WS 0x100 /* the knote is stolen from another kevq */
int kn_fluxwait;
int kn_influx;
struct mtx kn_fluxlock;
@ -395,7 +396,7 @@ __END_DECLS
* The ioctl to set multithreaded mode
*/
#define FKQMULTI _IOW('f', 89, int)
#define FKQMPRNT _IO('f', 90)
#define FKQMPRNT _IOW('f', 90, uintptr_t)
/*
* KQ sched

View File

@ -62,6 +62,7 @@ struct kevq {
#define KEVQ_SLEEP 0x01
#define KEVQ_CLOSING 0x02
#define KEVQ_ACTIVE 0x04
#define KEVQ_WS 0x08 /* the kevq is work stealing */
int kevq_state;
int kevq_refcnt;
@ -73,6 +74,7 @@ struct kevq {
uint64_t kevq_tot_ev;
uint64_t kevq_tot_time;
uint64_t kevq_tot_syscall;
uint64_t kevq_tot_ws;
/* TODO: maybe these should be in kqdomain or global */
uint64_t kevq_tot_fallback;

View File

@ -170,6 +170,7 @@ enum sysinit_sub_id {
SI_SUB_SMP = 0xf000000, /* start the APs*/
#endif
SI_SUB_RACCTD = 0xf100000, /* start racctd*/
SI_SUB_KQUEUE = 0xf200000, /* initialize kqueue */
SI_SUB_LAST = 0xfffffff /* final initialization */
};

View File

@ -46,7 +46,7 @@ int vnode_fd;
extern char * kevent_to_str(struct kevent *);
struct kevent * kevent_get(int);
struct kevent * kevent_get_timeout(int, int);
struct kevent * kevent_get_timeout_u(int kqfd, int useconds);
struct kevent * kevent_get_timeout_u(int kqfd, uint64_t useconds);
void kevent_cmp(struct kevent *, struct kevent *);

View File

@ -119,12 +119,13 @@ kevent_get_timeout(int kqfd, int seconds)
/* Retrieve a single kevent, specifying a maximum time to wait for it. */
struct kevent *
kevent_get_timeout_u(int kqfd, int useconds)
kevent_get_timeout_u(int kqfd, uint64_t useconds)
{
int nfds;
struct kevent *kev;
struct timespec timeout = {0, useconds * 1000};
uint64_t nsec = useconds * 1000;
struct timespec timeout = {nsec / 1000000000, nsec % 1000000000};
printf("timeout: %ld sec, %ld nsec\n", timeout.tv_sec, timeout.tv_nsec);
if ((kev = calloc(1, sizeof(*kev))) == NULL)
err(1, "out of memory");

View File

@ -42,14 +42,30 @@ struct thread_info {
* Read test
*/
#define THREAD_CNT (32)
#define PACKET_CNT (3200)
#define THREAD_CNT (16)
#define PACKET_CNT (1600)
int g_kqfd;
int g_sockfd[2];
struct thread_info g_thrd_info[THREAD_CNT];
static int g_kqfd;
static int g_sockfd[2];
static struct thread_info g_thrd_info[THREAD_CNT];
/* Test threads signals this upon receiving events */
sem_t g_sem_driver;
static sem_t g_sem_driver;
static char dmpbuf[1024 * 1024 + 1];
static inline void
dump_gkq()
{
int error;
uintptr_t para = (uintptr_t)&dmpbuf;
/* dump KQ */
memset(dmpbuf, 0, 1024 * 1024 + 1);
error = ioctl(g_kqfd, FKQMPRNT, &para);
if (error == -1) {
err(1, "dump ioctl failed");
} else {
printf("%s\n", dmpbuf);
}
}
static char
socket_pop(int sockfd)
@ -362,11 +378,7 @@ test_socket_queue(void)
}
}
/* dump KQ */
error = ioctl(g_kqfd, FKQMPRNT);
if (error == -1) {
err(1, "dump ioctl failed");
}
dump_gkq();
#ifdef TEST_DEBUG
printf("READ_M: finished testing, system shutting down...\n");
@ -410,10 +422,11 @@ test_socket_queue(void)
/***************************
* WS test
***************************/
#define SOCK_WS_CNT (100)
#define WS_TIMEOUT (10)
#define SOCK_WS_CNT (127)
#define SOCK_WS_TOT (SOCK_WS_CNT * 50)
static volatile int ws_num = 0;
static volatile int ws_ok = 0;
static void*
test_socket_ws_worker(void* args)
@ -422,16 +435,27 @@ test_socket_ws_worker(void* args)
char dat;
struct kevent *ret;
while (ws_num < SOCK_WS_CNT) {
while (ws_num < SOCK_WS_TOT) {
ret = kevent_get(g_kqfd);
if (info->ws_master == 0) {
ret = kevent_get_timeout_u(g_kqfd, WS_TIMEOUT);
if (ret != NULL) {
free(ret);
}
break;
}
if (ret != NULL) {
dat = socket_pop(ret->ident);
#ifdef TEST_DEBUG
printf("READ_M: thread %d wokeup for event: ws_num: %d\n", info->tid, ws_num);
#endif
free(ret);
ws_num++;
}
}
}
/* the master does nothing */
while(!ws_ok) {
};
#ifdef TEST_DEBUG
printf("READ_M: thread %d exiting\n", info->tid);
@ -439,7 +463,7 @@ test_socket_ws_worker(void* args)
pthread_exit(0);
}
int ws_sockfd[SOCK_WS_CNT][2];
int ws_sockfd[SOCK_WS_CNT + 1][2];
static void
test_socket_ws()
@ -447,10 +471,9 @@ test_socket_ws()
struct kevent kev;
struct thread_info thrd_info[2];
const char *test_id = "[Multi][WS]kevent(evfilt)";
cpuset_t cpuset;
test_begin(test_id);
for (int i = 0; i < SOCK_WS_CNT; i++) {
for (int i = 0; i < SOCK_WS_CNT + 1; i++) {
/* Create a connected pair of full-duplex sockets for testing socket events */
if (socketpair(AF_UNIX, SOCK_STREAM, 0, &ws_sockfd[i][0]) < 0) {
@ -473,21 +496,12 @@ test_socket_ws()
thrd_info[i].tid = i;
thrd_info[i].ws_master = i;
pthread_create(&thrd_info[i].thrd, NULL, test_socket_ws_worker, &thrd_info[i]);
CPU_ZERO(&cpuset);
CPU_SET(0, &cpuset);
if (pthread_setaffinity_np(thrd_info[i].thrd, sizeof(cpuset_t), &cpuset) < 0) {
err(1, "thread_affinity");
}
}
sleep(3);
sleep(1);
for(int i = 0; i < SOCK_WS_CNT; i++) {
#ifdef TEST_DEBUG
printf("READ_M: pusing 1 packet to sock %d\n", i);
#endif
socket_push(ws_sockfd[i][1], '.');
}
/* push 1 packet to the last socket*/
socket_push(ws_sockfd[SOCK_WS_CNT][1], '.');
sleep(1);
@ -498,13 +512,21 @@ test_socket_ws()
thrd_info[i].tid = i;
thrd_info[i].ws_master = i;
pthread_create(&thrd_info[i].thrd, NULL, test_socket_ws_worker, &thrd_info[i]);
CPU_ZERO(&cpuset);
CPU_SET(0, &cpuset);
if (pthread_setaffinity_np(thrd_info[i].thrd, sizeof(cpuset_t), &cpuset) < 0) {
err(1, "thread_affinity");
}
sleep(1);
for(int i = 0; i < SOCK_WS_TOT; i++) {
socket_push(ws_sockfd[i % SOCK_WS_CNT][1], '.');
}
while(ws_num < SOCK_WS_TOT) {
};
dump_gkq();
ws_ok = 1;
/* shutdown the systems */
#ifdef TEST_DEBUG
printf("READ_M: waiting for threads to exit...\n");
@ -513,7 +535,7 @@ test_socket_ws()
pthread_join(thrd_info[i].thrd, NULL);
}
for (int i = 0; i < SOCK_WS_CNT; i++) {
for (int i = 0; i < SOCK_WS_CNT + 1; i++) {
EV_SET(&kev, ws_sockfd[i][0], EVFILT_READ, EV_DELETE, 0, 0, &ws_sockfd[i][0]);
if (kevent(g_kqfd, &kev, 1, NULL, 0, NULL) == -1) {
@ -524,14 +546,100 @@ test_socket_ws()
success();
}
static uint64_t get_utime()
{
struct timespec spec;
clock_gettime(CLOCK_REALTIME_PRECISE, &spec);
return spec.tv_nsec / 1000 + spec.tv_sec * 1000 * 1000;
}
#define TIMEOUT_THRESHOLD (1)
static void
test_socket_ws_check_timeout(uint64_t utimeout)
{
struct kevent *kev;
uint64_t start = get_utime();
kev = kevent_get_timeout_u(g_kqfd, utimeout);
uint64_t end = get_utime();
int pct = (end - start) * 100 / utimeout;
if (kev != NULL) {
err(1, "ws timeout kev != NULL");
}
if (pct > TIMEOUT_THRESHOLD) {
err(1, "ws timeout error too large: %d", pct);
}
}
static void
test_socket_ws_timeout()
{
struct kevent kev, *ret;
const char *test_id = "[Multi][WS]kevent_timeout(evfilt)";
test_begin(test_id);
int flags = KQSCHED_MAKE(0,0,KQ_SCHED_FLAG_WS,1);
int tkqfd = kqueue();
int error = ioctl(tkqfd, FKQMULTI, &flags);
if (error == -1) {
err(1, "ioctl");
}
if (socketpair(AF_UNIX, SOCK_STREAM, 0, &g_sockfd[0]) < 0)
err(1, "kevent_read socket");
EV_SET(&kev, g_sockfd[1], EVFILT_READ, EV_ADD, 0, 0, NULL);
if (kevent(tkqfd, &kev, 1, NULL, 0, NULL) == -1) {
err(1, "kevent_ws_timeout_add");
}
/* 1s */
printf("1s. \n");
ret = kevent_get_timeout_u(tkqfd, 1000 * 1000);
/* 100 ms */
printf("100ms. \n");
ret = kevent_get_timeout_u(tkqfd, 1000 * 100);
/* 10 ms */
printf("10ms. \n");
ret = kevent_get_timeout_u(tkqfd, 1000 * 10);
/* 1 ms */
printf("1ms. \n");
ret = kevent_get_timeout_u(tkqfd, 1000);
/* 100 us */
printf("100u. \n");
ret = kevent_get_timeout_u(tkqfd, 100);
/* 10 us */
printf("10us. \n");
ret = kevent_get_timeout_u(tkqfd, 10);
/* 1 us */
printf("1us. \n");
ret = kevent_get_timeout_u(tkqfd, 1);
EV_SET(&kev, g_sockfd[1], EVFILT_READ, EV_DELETE, 0, 0, NULL);
if (kevent(tkqfd, &kev, 1, NULL, 0, NULL) == -1) {
err(1, "kevent_ws_timeout_delete");
}
close(g_sockfd[0]);
close(g_sockfd[1]);
close(tkqfd);
success();
}
/***************************
* Brutal test
***************************/
#define THREAD_BRUTE_CNT (32)
#define SOCK_BRUTE_CNT (64)
#define PACKET_BRUTE_CNT (10000)
#define THREAD_EXIT_PROB (50)
#define THREAD_BRUTE_CNT (16)
#define SOCK_BRUTE_CNT (128)
#define PACKET_BRUTE_CNT (50 * (SOCK_BRUTE_CNT))
#define THREAD_EXIT_PROB (67)
#define RAND_SLEEP (29)
#define RAND_SEND_SLEEP (7)
@ -615,7 +723,6 @@ test_socket_brutal(char* name)
err(1, "kevent_socket");
}
EV_SET(&kev, brute_sockfd[i][0], EVFILT_READ, EV_ADD, 0, 0, &brute_sockfd[i][0]);
if (kevent(g_kqfd, &kev, 1, NULL, 0, NULL) == -1) {
@ -692,15 +799,16 @@ test_socket_brutal(char* name)
success();
}
void
test_evfilt_read_m()
{
int flags = 0;
g_kqfd = kqueue();
int error;
/* Default rand */
int error = ioctl(g_kqfd, FKQMULTI, &flags);
flags = 0;
g_kqfd = kqueue();
error = ioctl(g_kqfd, FKQMULTI, &flags);
if (error == -1) {
err(1, "ioctl");
}
@ -756,18 +864,21 @@ test_evfilt_read_m()
if (error == -1) {
err(1, "ioctl");
}
test_socket_brutal("best2");
test_socket_read(1);
test_socket_brutal("best2");
close(g_kqfd);
/* WS */
flags = KQSCHED_MAKE(0,0,KQ_SCHED_FLAG_WS,1);;
flags = KQSCHED_MAKE(0,0,KQ_SCHED_FLAG_WS,1);
g_kqfd = kqueue();
error = ioctl(g_kqfd, FKQMULTI, &flags);
if (error == -1) {
err(1, "ioctl");
}
test_socket_ws();
test_socket_ws_timeout();
test_socket_brutal("ws1");
close(g_kqfd);
}