From cb4c673500d9fe2496e5e8b1ac0c0ba72a7ee93a Mon Sep 17 00:00:00 2001 From: Oscar Zhao Date: Fri, 19 Apr 2019 15:33:07 -0400 Subject: [PATCH] WS + BON --- sys/kern/kern_event.c | 561 ++++++++++++++++++++++------ sys/sys/event.h | 8 +- sys/sys/eventvar.h | 37 +- tests/sys/kqueue/libkqueue/read_m.c | 168 ++++++++- 4 files changed, 650 insertions(+), 124 deletions(-) diff --git a/sys/kern/kern_event.c b/sys/kern/kern_event.c index cd027af21254..cc379c8c72be 100644 --- a/sys/kern/kern_event.c +++ b/sys/kern/kern_event.c @@ -115,17 +115,13 @@ extern struct cpu_group *cpu_top; } while(0) /* no need to handle overflow as long as the existing org/cur doesn't overflow */ -#define CALC_OVERTIME_AVG(org, cur, out) do { \ - (out)->tv_sec = (org)->tv_sec * AVG_WEIGHT_FACTOR_OLD + (cur)->tv_nsec * AVG_WEIGHT_FACTOR_NEW; \ - (out)->tv_nsec = (org)->tv_nsec * AVG_WEIGHT_FACTOR_OLD + (cur)->tv_nsec * AVG_WEIGHT_FACTOR_NEW; \ - TIMESPEC_DIV((out), (AVG_WEIGHT_FACTOR_OLD + AVG_WEIGHT_FACTOR_NEW), (out)); \ -} while(0) +#define CALC_OVERTIME_AVG(prev, cur) (((prev) * AVG_WEIGHT_FACTOR_OLD + (cur) * AVG_WEIGHT_FACTOR_NEW) / (AVG_WEIGHT_FACTOR_OLD + AVG_WEIGHT_FACTOR_NEW)) #define KEVQ_NEXT_AVAIL_LOCKED(out, head, element, member) do { \ (out) = (element); \ while(1) { \ (out) = (out) == NULL ? LIST_FIRST((head)) : LIST_NEXT((out), member); \ - if ((out) != NULL) { \ + if ((out) != NULL && KEVQ_AVAIL(out)) { \ KEVQ_LOCK((out)); \ if (KEVQ_AVAIL((out))) { \ break; \ @@ -147,7 +143,8 @@ static void kevq_init(struct kevq *kevq); static void kevq_release(struct kevq* kevq, int locked); static int kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp); static void kevq_destroy(struct kevq *kevq); -static int kevq_acquire(struct kevq *kevq); +static int kevq_acquire(struct kevq *kevq, int locked); +static void kevq_worksteal(struct kevq *kevq); void kevq_drain(struct kevq *kevq); static void knote_xinit(struct knote *kn); @@ -217,11 +214,12 @@ static void knote_sched(struct knote *kn); static void kqdom_init(struct kqdom *kqd); -static void kqdom_update_stats(struct kqdom *leaf, struct timespec *avg); +static void kqdom_update_lat(struct kqdom *leaf, unsigned long avg); +static void kqdom_update_active(struct kqdom *leaf, int change); static void kqdom_insert(struct kqdom *kqd, struct kevq *kevq); static void kqdom_remove(struct kqdom *kqd, struct kevq *kevq); static void kqdom_destroy(struct kqdom *root); -static void kqdom_update_stats(struct kqdom *leaf, struct timespec *avg); +static struct kevq * kqdom_random_kevq_locked(struct kqdom *kqd, struct kevq *last_kevq); static void kqdom_build_internal(struct kqdom *kqd_cur, struct cpu_group *cg_cur, int *kqd_id); static struct kqdom * kqdom_build(void); static struct kqdom * kqdom_find(struct kqdom *root, int cpuid); @@ -246,6 +244,26 @@ static int filt_user(struct knote *kn, long hint); static void filt_usertouch(struct knote *kn, struct kevent *kev, u_long type); +static int kq_sched_bon_count = 2; +SYSCTL_INT(_kern, OID_AUTO, kq_sched_bon_count, CTLFLAG_RWTUN, &kq_sched_bon_count, 0, "the number of kevqs to select the best one from"); + +/* TODO: make this a percentage? */ +static int kq_sched_ws_count = 1; +SYSCTL_INT(_kern, OID_AUTO, kq_sched_ws_count, CTLFLAG_RWTUN, &kq_sched_ws_count, 0, "the number of kevqs to steal each time"); + +// hacky fast random generator +static unsigned int g_seed = 0x1234; +// Used to seed the generator. +static void kqueue_fsrand(int seed) { + g_seed = seed; +} +// Compute a pseudorandom integer. +// Output value in range [0, 32767] +static int kqueue_frand(void) { + g_seed = (214013 * g_seed + 2531011); + return (g_seed>>16) & 0x7FFF; +} + static struct filterops file_filtops = { .f_isfd = 1, .f_attach = filt_fileattach, @@ -547,7 +565,7 @@ filt_kqueue(struct knote *kn, long hint) CTR1(KTR_KQ, "filt_kqueue called for kn %p", kn); - if ( (kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) { + if ( (kq->kq_flags & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) { return 0; } @@ -1407,8 +1425,9 @@ kqueue_kevent(struct kqueue *kq, struct kevq *kevq, struct thread *td, int nchan struct kevent *kevp, *changes; struct timespec cur_ts; int i, n, nerrors, error; + unsigned long avg; - if ((kq->kq_state & KQ_FLAG_MULTI) == 0 && (kevq->kevq_state & KEVQ_RDY) == 0) { + if ((kq->kq_flags & KQ_FLAG_MULTI) == 0 && (kevq->kevq_state & KEVQ_RDY) == 0) { /* Mark the global kevq as ready for single threaded mode to close the window between kqueue_register and kqueue_scan.*/ KEVQ_LOCK(kevq); @@ -1421,22 +1440,24 @@ kqueue_kevent(struct kqueue *kq, struct kevq *kevq, struct thread *td, int nchan if (kevq->kevq_last_nkev != 0) { /* make sure we actually processed events last time */ - getnanouptime(&cur_ts); timespecsub(&cur_ts, &kevq->kevq_last_kev, &cur_ts); + /* divide by the number of events processed */ - TIMESPEC_DIV(&cur_ts, kevq->kevq_last_nkev, &cur_ts); - if (timespecisset(&kevq->kevq_avg_lat)) { - CALC_OVERTIME_AVG(&kevq->kevq_avg_lat, &kevq->kevq_avg_lat, &cur_ts); + avg = (cur_ts.tv_sec * 1000000 + cur_ts.tv_nsec / 100) / kevq->kevq_last_nkev; + + if (kevq->kevq_avg_lat != 0) { + kevq->kevq_avg_lat = CALC_OVERTIME_AVG(kevq->kevq_avg_lat, avg); } else { - /* first time */ - timespecadd(&cur_ts, &kevq->kevq_avg_lat, &kevq->kevq_avg_lat); + kevq->kevq_avg_lat = avg; } - CTR4(KTR_KQ, "kevent: td %d spent %ld s %ld ns per event on %d events", td->td_tid, cur_ts.tv_sec, cur_ts.tv_nsec, kevq->kevq_last_nkev); + CTR3(KTR_KQ, "kevent: td %d spent %ld us per event on %d events", td->td_tid, avg, kevq->kevq_last_nkev); /* clear parameters */ timespecclear(&kevq->kevq_last_kev); kevq->kevq_last_nkev = 0; + + kqdom_update_lat(kevq->kevq_kqd, avg); } KEVQ_UNLOCK(kevq); @@ -1908,7 +1929,7 @@ kevq_thred_drain(struct kevq_thred *kevq_th, struct thread* td) { KEVQ_TH_LOCK(kevq_th); while((kevq = LIST_FIRST(&kevq_th->kevq_list)) != NULL) { - if (kevq_acquire(kevq) == 0) { + if (kevq_acquire(kevq, 0) == 0) { CTR2(KTR_KQ, "kevq_thred_drain: draining kevq %p on kevq_th %p", kevq, kevq_th); KEVQ_TH_UNLOCK(kevq_th); kevq_drain(kevq); @@ -1924,7 +1945,6 @@ static void kevq_init(struct kevq *kevq) { mtx_init(&kevq->lock, "kevq", NULL, MTX_DEF | MTX_DUPOK); TAILQ_INIT(&kevq->kn_head); - timespecclear(&kevq->kevq_avg_lat); timespecclear(&kevq->kevq_last_kev); } @@ -1944,19 +1964,25 @@ kevq_release(struct kevq* kevq, int locked) } static int -kevq_acquire(struct kevq *kevq) +kevq_acquire(struct kevq *kevq, int locked) { - KEVQ_NOTOWNED(kevq); int error; + if (locked) { + KEVQ_OWNED(kevq); + } else { + KEVQ_LOCK(kevq); + } error = 0; - KEVQ_LOCK(kevq); CTR2(KTR_KQ, "referencing kevq %p (refcnt = %d)", kevq, kevq->kevq_refcnt); if ((kevq->kevq_state & KEVQ_CLOSING) == KEVQ_CLOSING) { error = EINVAL; } else { kevq->kevq_refcnt++; } - KEVQ_UNLOCK(kevq); + + if (!locked) { + KEVQ_UNLOCK(kevq); + } return error; } @@ -1982,7 +2008,7 @@ kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp) return EINVAL; } - if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) { + if ((kq->kq_flags & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) { // allocate KEVQ_TH if (td->td_kevq_thred == NULL) { kevq_th = malloc(sizeof(struct kevq_thred), M_KQUEUE, M_WAITOK | M_ZERO); @@ -2043,13 +2069,20 @@ kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp) LIST_INSERT_HEAD(&kq->kq_kevqlist, kevq, kq_e); + + KEVQ_TH_UNLOCK(kevq_th); + KQ_UNLOCK(kq); + kqdom_insert(kqd, kevq); + KQD_UNLOCK(kqd); } else { to_free = alloc_kevq; + + KEVQ_TH_UNLOCK(kevq_th); + KQD_UNLOCK(kqd); + KQ_UNLOCK(kq); } - KEVQ_TH_UNLOCK(kevq_th); - KQD_UNLOCK(kqd); - KQ_UNLOCK(kq); + if (to_free != NULL) { free(to_free, M_KQUEUE); @@ -2081,7 +2114,7 @@ kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp) } } - error = kevq_acquire(kevq); + error = kevq_acquire(kevq, 0); if (!error) { *kevqp = kevq; @@ -2105,13 +2138,13 @@ kqueue_acquire(struct file *fp, struct kqueue **kqp) if (((kq->kq_state) & KQ_CLOSING) != 0) { return (EBADF); } - if ((kq->kq_state & KQ_FLAG_INIT) == 0) { - kq->kq_state |= KQ_FLAG_INIT; + if ((kq->kq_flags & KQ_FLAG_INIT) == 0) { + kq->kq_flags |= KQ_FLAG_INIT; } kq->kq_refcnt++; KQ_UNLOCK(kq); - if (((kq->kq_state & KQ_FLAG_MULTI) != 0) && (kq->kq_kqd == NULL)) { + if (((kq->kq_flags & KQ_FLAG_MULTI) != 0) && (kq->kq_kqd == NULL)) { kqd = kqdom_build(); KQ_LOCK(kq); if (kq->kq_kqd == NULL) { @@ -2230,82 +2263,142 @@ static void kqdom_init(struct kqdom *kqd) { mtx_init(&kqd->kqd_lock, "kqdom_lock", NULL, MTX_DEF | MTX_DUPOK); - LIST_INIT(&kqd->kqd_kevqlist); - TAILQ_INIT(&kqd->children); } /* inserts a list*/ static void kqdom_insert(struct kqdom *kqd, struct kevq *kevq) { - int val; - struct kqdom* parent; + int oldcap; + struct kevq **expand; KQD_OWNED(kqd); KASSERT(kqd->num_children == 0, ("inserting into a non-leaf kqdom")); - LIST_INSERT_HEAD(&kqd->kqd_kevqlist, kevq, kqd_e); - /* TODO: don't hold the lock while doing the update */ - parent = kqd; - while(parent != NULL) { - val = atomic_fetchadd_int(&parent->num_kevq, 1); - KASSERT(val >= 0, ("invalid num_kevq for kqdom <= 0")); - parent = parent->parent; + CTR4(KTR_KQ, "kqdom_insert: kevq: %p kqd %d: cnt: %d cap: %d", kevq, kqd->id, kqd->kqd_kevqcnt, kqd->kqd_kevqcap); + +retry: + if (kqd->kqd_kevqcnt + 1 > kqd->kqd_kevqcap) { + CTR2(KTR_KQ, "kqdom_insert: expanding... kqd %d for kevq %p\n", kqd->id, kevq); + oldcap = kqd->kqd_kevqcap; + KQD_UNLOCK(kqd); + expand = malloc(sizeof(struct kqdom *) * (oldcap + KQDOM_EXTENT_FACTOR), M_KQUEUE, M_WAITOK | M_ZERO); + + KQD_LOCK(kqd); + /* recheck if we need expansion, make sure old capacity didn't change */ + if (kqd->kqd_kevqcap == oldcap) { + /* copy the content from the old list to this */ + for(int i = 0; i < kqd->kqd_kevqcnt; i++) { + expand[i] = kqd->kqd_kevqlist[i]; + } + free(kqd->kqd_kevqlist, M_KQUEUE); + kqd->kqd_kevqlist = expand; + kqd->kqd_kevqcap = oldcap + KQDOM_EXTENT_FACTOR; + } else { + /* some threads made changes while we allocated memory, retry */ + free(expand, M_KQUEUE); + goto retry; + } } + + KQD_OWNED(kqd); + KASSERT(kqd->kqd_kevqcnt + 1 <= kqd->kqd_kevqcap, ("kqdom didn't expand properly")); + + /* insert to list */ + kqd->kqd_kevqlist[kqd->kqd_kevqcnt] = kevq; + kqd->kqd_kevqcnt++; } /* removes a list */ static void kqdom_remove(struct kqdom *kqd, struct kevq *kevq) { - int val; - struct kqdom* parent; + int found; KQD_OWNED(kqd); KASSERT(kqd->num_children == 0, ("removing from a non-leaf kqdom")); - LIST_REMOVE(kevq, kqd_e); - /* TODO: don't hold the lock while doing the update */ - parent = kqd; - while(parent != NULL) { - val = atomic_fetchadd_int(&parent->num_kevq, -1); - KASSERT(val >= 0, ("invalid num_kevq for kqdom <= 0")); - parent = parent->parent; + CTR4(KTR_KQ, "kqdom_remove: kevq: %p kqd %d: cnt: %d cap: %d", kevq, kqd->id, kqd->kqd_kevqcnt, kqd->kqd_kevqcap); + found = 0; + + /* slow, but no need to optimize for delete */ + for(int i = 0; i < kqd->kqd_kevqcnt; i++) { + if(kqd->kqd_kevqlist[i] == kevq) { + found = 1; + } + + if(found && (i+1 < kqd->kqd_kevqcnt)) { + kqd->kqd_kevqlist[i] = kqd->kqd_kevqlist[i+1]; + } } - if (kqd->kqd_ckevq == kevq) { - kqd->kqd_ckevq = LIST_NEXT(kevq, kqd_e); - } + KASSERT(found, ("cannot find kevq from kqdom")); + + kqd->kqd_kevqcnt--; + kqd->kqd_kevqlist[kqd->kqd_kevqcnt] = NULL; + + if (kqd->kqd_kevqcnt != 0) + kqd->kqd_ckevq = kqd->kqd_ckevq % kqd->kqd_kevqcnt; + else + kqd->kqd_ckevq = 0; } static void kqdom_destroy(struct kqdom *root) { - struct kqdom *kqdom, *tkqd; - - TAILQ_FOREACH_SAFE(kqdom, &root->children, child_e, tkqd) { - kqdom_destroy(kqdom); + for(int i = 0; i < root->num_children; i++) { + kqdom_destroy(root->children[i]); } CTR2(KTR_KQ, "kqdom_destroy: destroyed kqdom %p with %d child kqdoms", root, root->num_children); - KASSERT(LIST_FIRST(&root->kqd_kevqlist) == NULL, ("freeing a kqdom with kevqs")); + + if (root->kqd_kevqlist != NULL) { + KASSERT(root->kqd_kevqcnt == 0, ("freeing a kqdom with kevqs")); + free(root->kqd_kevqlist, M_KQUEUE); + } + + if (root->children != NULL) { + free(root->children, M_KQUEUE); + } + + KASSERT(root->num_active == 0, ("freeing a kqdom with active kevqs")); + free(root, M_KQUEUE); } static void -kqdom_update_stats(struct kqdom *leaf, struct timespec *avg) +kqdom_update_active(struct kqdom *leaf, int change) { - struct timespec last_avg; - last_avg.tv_sec = avg->tv_sec; - last_avg.tv_nsec = avg->tv_nsec; + int oldval, newval; + KASSERT(change != 0, ("updating active 0")); + + while (leaf != NULL) { + oldval = atomic_fetchadd_int(&leaf->num_active, change); + newval = oldval + change; + KASSERT(oldval >= 0 && newval >= 0, ("invalid oldval or newval after update")); + if (oldval == 0) { + change = 1; + CTR3(KTR_KQ, "kqdom_update_active: change %d: num of active %d for kqdom %d", change, newval, leaf->id); + } else if (newval == 0) { + /* if new val is 0, we */ + change = -1; + CTR3(KTR_KQ, "kqdom_update_active: change %d: num of active %d for kqdom %d", change, newval, leaf->id); + } else { + break; + } + leaf = leaf->parent; + } +} + +static void +kqdom_update_lat(struct kqdom *leaf, unsigned long avg) +{ while(leaf != NULL) { - KQD_LOCK(leaf); + if (leaf->avg_lat != 0) { + // bit rot race here? + leaf->avg_lat = CALC_OVERTIME_AVG(leaf->avg_lat, avg); + } else { + leaf->avg_lat = avg; + } - CALC_OVERTIME_AVG(&leaf->kqd_avg_lat, &last_avg, &leaf->kqd_avg_lat); - CTR3(KTR_KQ, "kqdom_update_stats: updated avg lat %ld sec %ld for kqdom %d", - leaf->kqd_avg_lat.tv_sec, leaf->kqd_avg_lat.tv_nsec, leaf->id); - - last_avg.tv_sec = leaf->kqd_avg_lat.tv_sec; - last_avg.tv_nsec = leaf->kqd_avg_lat.tv_nsec; - - KQD_UNLOCK(leaf); + CTR2(KTR_KQ, "kqdom_update_lat: updated avg lat %ld us for kqdom %d", leaf->avg_lat, leaf->id); leaf = leaf->parent; } @@ -2325,6 +2418,7 @@ kqdom_build_internal(struct kqdom *kqd_cur, struct cpu_group *cg_cur, int *kqd_i (*kqd_id)++; kqd_cur->num_children = cg_numchild; CPU_COPY(&cg_cur->cg_mask, &kqd_cur->cpu_mask); + kqd_cur->children = malloc(sizeof(struct kqdom *) * cg_numchild, M_KQUEUE, M_WAITOK | M_ZERO); for (int i = 0; i < cg_numchild; i++) { child = malloc(sizeof(struct kqdom), M_KQUEUE, M_WAITOK | M_ZERO); @@ -2332,7 +2426,7 @@ kqdom_build_internal(struct kqdom *kqd_cur, struct cpu_group *cg_cur, int *kqd_i child->parent = kqd_cur; - TAILQ_INSERT_TAIL(&kqd_cur->children, child, child_e); + kqd_cur->children[i] = child; kqdom_build_internal(child, &cg_cur->cg_child[i], kqd_id); } } @@ -2351,16 +2445,14 @@ kqdom_build() static struct kqdom * kqdom_find(struct kqdom *root, int cpuid) { - struct kqdom *child, *tchild; - if (root->num_children == 0) { KASSERT(CPU_ISSET(cpuid, &root->cpu_mask), ("kqdom_find: cpuid and cpumask mismatch")); return root; } - TAILQ_FOREACH_SAFE(child, &root->children, child_e, tchild) { - if (CPU_ISSET(cpuid, &child->cpu_mask)) { - return kqdom_find(child, cpuid); + for(int i = 0; i < root->num_children; i++) { + if(CPU_ISSET(cpuid, &root->children[i]->cpu_mask)) { + return kqdom_find(root->children[i], cpuid); } } @@ -2463,6 +2555,89 @@ kqueue_task(void *arg, int pending) KQ_GLOBAL_UNLOCK(&kq_global, haskqglobal); } +static void +kevq_worksteal(struct kevq *kevq) +{ + struct kevq *other_kevq; + struct kqueue *kq; + struct knote *ws_kn, *next_kn; + struct knlist *knl; + int ws_count, valid; + TAILQ_HEAD(, knote) kn_wsq; + + KEVQ_OWNED(kevq); + TAILQ_INIT(&kn_wsq); + ws_count = 0; + kq = kevq->kq; + + KEVQ_UNLOCK(kevq); + /* todo maybe from cur kqdomain instead of from root */ + other_kevq = kqdom_random_kevq_locked(kq->kq_kqd, kevq); + + if (other_kevq != NULL && other_kevq != kevq && other_kevq->kn_count > 0) { + CTR3(KTR_KQ, "kevq_worksteal: kevq %p selected kevq %p with %d knotes", kevq, other_kevq, other_kevq->kn_count); + ws_kn = TAILQ_FIRST(&other_kevq->kn_head); + + while(ws_count < kq_sched_ws_count && ws_kn != NULL) { + KEVQ_OWNED(other_kevq); + next_kn = TAILQ_NEXT(ws_kn, kn_tqe); + + /* don't care about markers */ + if ((ws_kn->kn_status & KN_MARKER) != 0) { + goto end_loop; + } + + KN_FLUX_LOCK(ws_kn); + + /* ignore influx, inactive and disabled */ + if (kn_in_flux(ws_kn) || (ws_kn->kn_status & (KN_ACTIVE | KN_DISABLED | KN_REQUEUE)) != (KN_ACTIVE)) { + KN_FLUX_UNLOCK(ws_kn); + goto end_loop; + } + + knote_enter_flux(ws_kn); + KN_FLUX_UNLOCK(ws_kn); + + /* Remove from the old kevq first, due to lock order */ + knote_dequeue(ws_kn); + KEVQ_UNLOCK(other_kevq); + + /* validate event */ + knl = kn_list_lock(ws_kn); + valid = ws_kn->kn_fop->f_event(ws_kn, 0); + kn_list_unlock(knl); + + if (valid) { + TAILQ_INSERT_TAIL(&kn_wsq, ws_kn, kn_wse); + ws_count++; + } + + KEVQ_LOCK(other_kevq); + + if (!valid) { + /* if not valid, return it to the previous queue */ + knote_enqueue(ws_kn, other_kevq); + KN_LEAVE_FLUX_WAKEUP(ws_kn); + } +end_loop: + ws_kn = next_kn; + } + } + + if (other_kevq != NULL) { + KEVQ_UNLOCK(other_kevq); + } + + KEVQ_LOCK(kevq); + while(!TAILQ_EMPTY(&kn_wsq)) { + ws_kn = TAILQ_FIRST(&kn_wsq); + TAILQ_REMOVE(&kn_wsq, ws_kn, kn_wse); + knote_enqueue(ws_kn, kevq); + KN_LEAVE_FLUX_WAKEUP(ws_kn); + CTR4(KTR_KQ, "kevq_worksteal: kevq %p stole kn %p, ident: %d from kevq %p", kevq, ws_kn, ws_kn->kn_id, other_kevq); + } +} + /* * Scan, update kn_data (if not ONESHOT), and copyout triggered events. * We treat KN_MARKER knotes as if they are in flux. @@ -2471,12 +2646,14 @@ static int kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, const struct timespec *tsp, struct kevent *keva, struct thread *td) { + struct kqueue *kq; struct kevent *kevp; struct knote *kn, *marker; struct knlist *knl; sbintime_t asbt, rsbt; int count, error, haskqglobal, influx, nkev, touch; + kq = kevq->kq; count = maxevents; nkev = 0; error = 0; @@ -2517,11 +2694,20 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, if ((kevq->kevq_state & KEVQ_RDY) == 0) { /* Mark the kevq as ready to receive events */ kevq->kevq_state |= KEVQ_RDY; + kqdom_update_active(kevq->kevq_kqd, 1); } retry: kevp = keva; CTR3(KTR_KQ, "kqueue_scan: td %d on kevq %p has %d events", td->td_tid, kevq, kevq->kn_count); + + if ((kq->kq_flags & KQ_FLAG_MULTI) != 0 && (kq->kq_sched_flags & KQ_SCHED_WORK_STEALING) != 0 && kevq->kn_count == 0) + { + /* try work stealing */ + kevq_worksteal(kevq); + } + + KEVQ_OWNED(kevq); if (kevq->kn_count == 0) { if (asbt == -1) { error = EWOULDBLOCK; @@ -2573,7 +2759,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, TAILQ_REMOVE(&kevq->kn_head, kn, kn_tqe); CTR3(KTR_KQ, "kqueue_scan: td %d on kevq %p dequeued knote %p", td->td_tid, kevq, kn); if ((kn->kn_status & KN_DISABLED) == KN_DISABLED) { - kn->kn_status &= ~KN_QUEUED; + kn->kn_status &= ~(KN_QUEUED | KN_REQUEUE); kevq->kn_count--; KN_FLUX_UNLOCK(kn); continue; @@ -2592,7 +2778,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, ("knote %p is unexpectedly in flux", kn)); if ((kn->kn_flags & EV_DROP) == EV_DROP) { - kn->kn_status &= ~KN_QUEUED; + kn->kn_status &= ~(KN_QUEUED | KN_REQUEUE); knote_enter_flux(kn); kevq->kn_count--; KN_FLUX_UNLOCK(kn); @@ -2605,7 +2791,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, KEVQ_LOCK(kevq); continue; } else if ((kn->kn_flags & EV_ONESHOT) == EV_ONESHOT) { - kn->kn_status &= ~KN_QUEUED; + kn->kn_status &= ~(KN_QUEUED | KN_REQUEUE); knote_enter_flux(kn); kevq->kn_count--; KN_FLUX_UNLOCK(kn); @@ -2633,12 +2819,12 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, KEVQ_LOCK(kevq); KQ_GLOBAL_UNLOCK(&kq_global, haskqglobal); kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE | - KN_SCAN); + KN_SCAN | KN_REQUEUE); knote_leave_flux_ul(kn); kevq->kn_count--; kn_list_unlock(knl); influx = 1; - CTR3(KTR_KQ, "kqueue_scan: kn %p not valid anymore for kevq %p, td %d", kn, kevq, td->td_tid); + CTR4(KTR_KQ, "kqueue_scan: kn %p, ident: %d not valid anymore for kevq %p, td %d", kn, kn->kn_id, kevq, td->td_tid); continue; } touch = (!kn->kn_fop->f_isfd && kn->kn_fop->f_touch != NULL); @@ -2659,10 +2845,15 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, } if (kn->kn_flags & EV_DISPATCH) kn->kn_status |= KN_DISABLED; - kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE); + kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE | KN_REQUEUE); kevq->kn_count--; } else { - CTR2(KTR_KQ, "kqueue_scan: requeued kn %p to kevq %p", kn, kevq); + /* this flag is here to prevent a subtle workstealing race where one thread gets an identifier + and returns, before it can process the event, another thread steals the knote and + processes the same fd, resulting in the first thread having no data available. + Work stealing will avoid stealing knotes with this flag set*/ + kn->kn_status |= KN_REQUEUE; + CTR3(KTR_KQ, "kqueue_scan: requeued kn %p, ident: %d to kevq %p", kn, kn->kn_id,kevq); TAILQ_INSERT_TAIL(&kevq->kn_head, kn, kn_tqe); } @@ -2771,11 +2962,11 @@ kqueue_ioctl(struct file *fp, u_long cmd, void *data, switch (cmd) { case FKQMULTI: KQ_LOCK(kq); - if ((kq->kq_state & KQ_FLAG_INIT) == KQ_FLAG_INIT) { + if ((kq->kq_flags & KQ_FLAG_INIT) == KQ_FLAG_INIT) { error = (EINVAL); } else { CTR2(KTR_KQ, "kqueue_ioctl: multi flag set for kq %p, scheduler flags: %d", kq, *(int*)data); - kq->kq_state |= (KQ_FLAG_INIT | KQ_FLAG_MULTI); + kq->kq_flags |= (KQ_FLAG_INIT | KQ_FLAG_MULTI); kq->kq_sched_flags = *(int*)data; } KQ_UNLOCK(kq); @@ -2801,7 +2992,7 @@ kqueue_poll(struct file *fp, int events, struct ucred *active_cred, return POLLERR; KQ_LOCK(kq); - if ((kq->kq_state & KQ_FLAG_MULTI) != KQ_FLAG_MULTI ) { + if ((kq->kq_flags & KQ_FLAG_MULTI) != KQ_FLAG_MULTI ) { revents = 0; } else { if (events & (POLLIN | POLLRDNORM)) { @@ -2906,7 +3097,7 @@ kevq_drain(struct kevq *kevq) knote_dequeue(kn); - if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI && (kq->kq_state & KQ_CLOSING) != KQ_CLOSING && (kn->kn_status & KN_MARKER) == 0) { + if ((kq->kq_flags & KQ_FLAG_MULTI) == KQ_FLAG_MULTI && (kq->kq_state & KQ_CLOSING) != KQ_CLOSING && (kn->kn_status & KN_MARKER) == 0) { KEVQ_UNLOCK(kevq); /* TODO: When we knote activate, if the ev has EV_CLEAR set, maybe we shouldn't activate the event * if there hasn't been activities on the fd @@ -2937,7 +3128,7 @@ kevq_drain(struct kevq *kevq) // and will be dequeued later (kn->kn_kevq will be set to another valid kevq) // - if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) { + if ((kq->kq_flags & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) { // drop from KQ Domain KQ_LOCK(kq); KQD_LOCK(kqd); @@ -2949,6 +3140,9 @@ kevq_drain(struct kevq *kevq) LIST_REMOVE(kevq, kevq_th_e); // detach from kqdom + if((kevq->kevq_state & KEVQ_RDY) != 0) { + kqdom_update_active(kqd, -1); + } kqdom_remove(kqd, kevq); // detach from kqueue @@ -3038,10 +3232,10 @@ kqueue_drain(struct kqueue *kq, struct kevq *kevq, struct thread *td) } // destroy kqdoms and kevqs - if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) { + if ((kq->kq_flags & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) { while((kevq = LIST_FIRST(&kq->kq_kevqlist)) != NULL) { KQ_UNLOCK(kq); - if (kevq_acquire(kevq) == 0) + if (kevq_acquire(kevq, 0) == 0) kevq_drain(kevq); KQ_LOCK(kq); } @@ -3099,7 +3293,7 @@ kqueue_close(struct file *fp, struct thread *td) int error; int filedesc_unlock; - if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) { + if ((kq->kq_flags & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) { // only acquire the kqueue lock here if ((error = kqueue_acquire(fp, &kq))) return error; @@ -3691,6 +3885,89 @@ knote_drop_detached(struct knote *kn, struct thread *td) knote_free(kn); } +static struct kevq * +kqdom_random_kevq_locked(struct kqdom* kqd, struct kevq* last_kevq) +{ + struct kqdom *each_child, *last_child; + struct kevq *kevq, *each_kevq; + int num_active, init_idx; + u_long random; + + /* fall-back with the last child in case there is a race */ + last_child = NULL; + kevq = NULL; + + while (kqd->num_children > 0) { + /* read once */ + num_active = kqd->num_active; + if (num_active == 0) { + /* if we got to a child and now it doesn't have any active children, then return NULL + this happens either on the first loop or due to a race of kevq deletion */ + return NULL; + } + + random = kqueue_frand() % num_active; + KASSERT(random < kqd->num_children, ("more active children than total children")); + + for(int i = 0; i < kqd->num_children; i++) { + each_child = kqd->children[i]; + + if (each_child->num_active > 0) { + /* if the child suits our need */ + last_child = each_child; + if (random == 0) { + kqd = each_child; + break; + } + + random--; + } + + if (i == kqd->num_children) { + kqd = last_child; + } + } + } + + if (kqd != NULL) { + CTR3(KTR_KQ, "kqdom_random_kevq_locked: selected kqd %d, # children %p, last_kevq %p", kqd->id, kqd->kqd_kevqcnt, last_kevq); + + KQD_LOCK(kqd); + if (kqd->kqd_kevqcnt != 0) { + random = kqueue_frand() % kqd->kqd_kevqcnt; + init_idx = random; + + each_kevq = kqd->kqd_kevqlist[random]; + while(1) { + /* fast fail */ + if (KEVQ_AVAIL(each_kevq) && each_kevq != last_kevq) { + KEVQ_LOCK(each_kevq); + if (KEVQ_AVAIL(each_kevq)) { + kevq = each_kevq; + break; + } + KEVQ_UNLOCK(each_kevq); + } + + random = (random + 1) % kqd->kqd_kevqcnt; + if (random == init_idx) { + break; + } + each_kevq = kqd->kqd_kevqlist[random]; + } + } + KQD_UNLOCK(kqd); + } + + if (kevq != NULL) { + KEVQ_OWNED(kevq); + } + + CTR2(KTR_KQ, "kqdom_random_kevq_locked: selected kevq %p, last_kevq %p", kevq, last_kevq); + + return kevq; +} + /* select the next kevq based on knote and scheduler flags and locks the returned kevq */ static struct kevq * @@ -3698,14 +3975,15 @@ knote_next_kevq(struct knote *kn) { struct kqdom *kqd; struct kqueue *kq; - struct kevq *next_kevq; + struct kevq *next_kevq, *sel_kevq; + int cur_kevq; next_kevq = NULL; kq = kn->kn_kq; CTR1(KTR_KQ, "knote_next_kevq: processing kn %p", kn); - if ((kq->kq_state & KQ_FLAG_MULTI) == 0) { + if ((kq->kq_flags & KQ_FLAG_MULTI) == 0) { // single threaded mode, just return the current kevq KQ_LOCK(kn->kn_kq); if ((kq->kq_state & KQ_CLOSING) == 0) @@ -3731,27 +4009,96 @@ knote_next_kevq(struct knote *kn) return next_kevq; } - if ((kq->kq_sched_flags & KQ_SCHED_QUEUE) != 0) { - if (kn->kn_kqd == NULL) { - /* the first time knote is queued, record the kqdom */ - kn->kn_kqd = kqdom_find(kq->kq_kqd, PCPU_GET(cpuid)); - - KASSERT(kn->kn_kqd != NULL, ("knote scheduled on an unidentified CPU2")); - CTR2(KTR_KQ, "knote_next_kevq: [QUEUE] knote %p attached to kqdom id %d", kn, kn->kn_kqd->id); + if ((kq->kq_sched_flags & KQ_SCHED_BEST_OF_N) != 0) { + kqd = kq->kq_kqd; + for(int i = 0; i < kq_sched_bon_count; i++) { + sel_kevq = kqdom_random_kevq_locked(kqd, next_kevq); + if (sel_kevq != NULL) { + KEVQ_OWNED(sel_kevq); + + CTR2(KTR_KQ, "knote_next_kevq: [BON] selected random kevq %p for kn %p", sel_kevq, kn); + + if (next_kevq == NULL && kevq_acquire(sel_kevq, 1) == 0) { + next_kevq = sel_kevq; + KEVQ_UNLOCK(sel_kevq); + } else { + // compare their avg wait time + // TODO: refactor the unlock pattern here + if (sel_kevq->kevq_avg_lat * sel_kevq->kn_count < next_kevq->kevq_avg_lat * next_kevq->kn_count) { + if (kevq_acquire(sel_kevq, 1) == 0) { + KEVQ_UNLOCK(sel_kevq); + + kevq_release(next_kevq, 0); + next_kevq = sel_kevq; + } else { + KEVQ_UNLOCK(sel_kevq); + } + } else { + KEVQ_UNLOCK(sel_kevq); + } + } + + CTR2(KTR_KQ, "knote_next_kevq: [BON] current best kevq %p, avg wait time: %d", next_kevq, next_kevq->kevq_avg_lat * next_kevq->kn_count); + } } - kqd = kn->kn_kqd; + if (next_kevq != NULL) { + KEVQ_LOCK(next_kevq); + kevq_release(next_kevq, 1); + // recheck availability + if (!KEVQ_AVAIL(next_kevq)) { + KEVQ_UNLOCK(next_kevq); + next_kevq = NULL; + } + } + + CTR2(KTR_KQ, "knote_next_kevq: [BON] next kevq %p for kn %p", next_kevq, kn); + } + + if ((next_kevq == NULL) && (kq->kq_sched_flags & KQ_SCHED_QUEUE) != 0) { + if((kq->kq_sched_flags & KQ_SCHED_QUEUE_CPU) != 0) { + kqd = kqdom_find(kq->kq_kqd, PCPU_GET(cpuid)); + } else { + if (kn->kn_kqd == NULL) { + /* the first time knote is queued, record the kqdom */ + kn->kn_kqd = kqdom_find(kq->kq_kqd, PCPU_GET(cpuid)); + + KASSERT(kn->kn_kqd != NULL, ("knote scheduled on an unidentified CPU2")); + CTR2(KTR_KQ, "knote_next_kevq: [QUEUE] knote %p attached to kqdom id %d", kn, kn->kn_kqd->id); + } + kqd = kn->kn_kqd; + } KQD_LOCK(kqd); + cur_kevq = kqd->kqd_ckevq; - KEVQ_NEXT_AVAIL_LOCKED(next_kevq, &kqd->kqd_kevqlist, kqd->kqd_ckevq, kqd_e); - kqd->kqd_ckevq = next_kevq; + while(1) { + if (kqd->kqd_kevqcnt == 0) { + break; + } + cur_kevq = (cur_kevq + 1) % kqd->kqd_kevqcnt; + next_kevq = kqd->kqd_kevqlist[cur_kevq]; + if (KEVQ_AVAIL(next_kevq)) { + /* fast fail */ + KEVQ_LOCK(next_kevq); + if (KEVQ_AVAIL(next_kevq)) { + kqd->kqd_ckevq = cur_kevq; + break; + } + KEVQ_UNLOCK(next_kevq); + } + + if (cur_kevq == kqd->kqd_ckevq) { + next_kevq = NULL; + break; + } + } KQD_UNLOCK(kqd); CTR2(KTR_KQ, "knote_next_kevq: [QUEUE] next kevq %p for kn %p", next_kevq, kn); } - // generic round-robbin + // fall-back round-robbin if (next_kevq == NULL) { KQ_LOCK(kq); diff --git a/sys/sys/event.h b/sys/sys/event.h index 640eb73cfd26..409abb121db1 100644 --- a/sys/sys/event.h +++ b/sys/sys/event.h @@ -283,7 +283,11 @@ struct filterops { /* * KQ scheduler flags */ -#define KQ_SCHED_QUEUE 0x1 /* make kq affinitize the knote depending on the cpu it's scheduled */ +#define KQ_SCHED_QUEUE 0x01 /* make kq affinitize the knote depending on the first cpu it's scheduled to */ +#define KQ_SCHED_QUEUE_CPU 0x02 /* make kq affinitize the knote depending on the runtime cpu it's scheduled to */ +#define KQ_SCHED_WORK_STEALING 0x04 +#define KQ_SCHED_BEST_OF_N 0x08 +#define KQ_SCHED_GREEDY 0x16 /* * An in-flux knote cannot be dropped from its kq while the kq is @@ -299,6 +303,7 @@ struct knote { SLIST_ENTRY(knote) kn_selnext; /* for struct selinfo */ struct knlist *kn_knlist; /* f_attach populated */ TAILQ_ENTRY(knote) kn_tqe; + TAILQ_ENTRY(knote) kn_wse; /* for work stealing queue */ struct kqueue *kn_kq; /* which kqueue we are on */ struct kevq *kn_kevq; /* the kevq the knote is on */ /* used by the scheduler */ @@ -316,6 +321,7 @@ struct knote { #define KN_MARKER 0x20 /* ignore this knote */ #define KN_KQUEUE 0x40 /* this knote belongs to a kq */ #define KN_SCAN 0x100 /* flux set in kqueue_scan() */ +#define KN_REQUEUE 0x200 /* knote has triggered and is requeued to the current queue */ int kn_fluxwait; int kn_influx; struct mtx kn_fluxlock; diff --git a/sys/sys/eventvar.h b/sys/sys/eventvar.h index 05f5e6727489..816d0b1b14fa 100644 --- a/sys/sys/eventvar.h +++ b/sys/sys/eventvar.h @@ -40,9 +40,10 @@ #define KQ_NEVENTS 8 /* minimize copy{in,out} calls */ #define KQEXTENT 256 /* linear growth by this amount */ +#define KQDOM_EXTENT_FACTOR 8 /* linear growth by this amount */ + struct kevq { LIST_ENTRY(kevq) kevq_th_e; /* entry into kevq_thred's hashtable */ - LIST_ENTRY(kevq) kqd_e; /* entry into kqdomain */ LIST_ENTRY(kevq) kq_e; /* entry into kq */ LIST_ENTRY(kevq) kevq_th_tqe; /* entry into kevq_thred's kevq_list */ struct kqueue *kq; /* the kq that the kevq belongs to */ @@ -58,42 +59,50 @@ struct kevq { int kevq_refcnt; /* Used by the scheduler */ - struct timespec kevq_avg_lat; + unsigned long kevq_avg_lat; struct timespec kevq_last_kev; int kevq_last_nkev; }; /* TODO: assumed that threads don't get rescheduled across cores */ struct kqdom { + /* static */ struct mtx kqd_lock; - TAILQ_ENTRY(kqdom) child_e; struct kqdom *parent; int id; - struct timespec kqd_avg_lat; cpuset_t cpu_mask; int num_children; - int num_kevq; - TAILQ_HEAD(, kqdom) children; - struct kevqlist kqd_kevqlist; /* list of kevqs on the kdomain, only set for leaf domains*/ - struct kevq *kqd_ckevq; + struct kqdom **children; + + /* statistics */ + unsigned long avg_lat; + int num_active; /* total number of active children below this node */ + + /* dynamic members*/ + struct kevq **kqd_kevqlist; /* array list of kevqs on the kdomain, only set for leaf domains */ + int kqd_kevqcap; + int kqd_kevqcnt; + + int kqd_ckevq; }; struct kqueue { struct mtx kq_lock; - int kq_refcnt; + int kq_refcnt; struct selinfo kq_sel; - int kq_state; + int kq_state; #define KQ_SEL 0x01 #define KQ_ASYNC 0x02 #define KQ_TASKSCHED 0x04 /* task scheduled */ #define KQ_TASKDRAIN 0x08 /* waiting for task to drain */ #define KQ_CLOSING 0x10 -#define KQ_FLAG_INIT 0x20 /* kqueue has been initialized. this flag is set after the first kevent structure is processed */ -#define KQ_FLAG_MULTI 0x40 /* Multi-threaded mode */ + int kq_flags; +#define KQ_FLAG_INIT 0x01 /* kqueue has been initialized. this flag is set after the first kevent structure is processed */ +#define KQ_FLAG_MULTI 0x02 /* Multi-threaded mode */ TAILQ_ENTRY(kqueue) kq_list; struct sigio *kq_sigio; struct filedesc *kq_fdp; - int kq_knlistsize; /* size of knlist */ + int kq_knlistsize; /* size of knlist */ struct klist *kq_knlist; /* list of knotes */ u_long kq_knhashmask; /* size of knhash */ struct klist *kq_knhash; /* hash table for knotes */ @@ -105,7 +114,7 @@ struct kqueue { struct kevqlist kq_kevqlist; /* list of kevqs for fall-back round robbin */ struct kqdom *kq_kqd; /* root domain */ struct kevq *kq_ckevq; /* current kevq for multithreaded kqueue, used for round robbin */ - int kq_sched_flags; /* Scheduler flag for the KQ */ + int kq_sched_flags; /* Scheduler flag for the KQ */ }; #endif /* !_SYS_EVENTVAR_H_ */ diff --git a/tests/sys/kqueue/libkqueue/read_m.c b/tests/sys/kqueue/libkqueue/read_m.c index 89c62684dc43..6c924770f68e 100644 --- a/tests/sys/kqueue/libkqueue/read_m.c +++ b/tests/sys/kqueue/libkqueue/read_m.c @@ -33,13 +33,18 @@ /* * KQ scheduler flags */ -#define KQ_SCHED_QUEUE 0x1 /* make kq affinitize the knote depending on the cpu it's scheduled */ +#define KQ_SCHED_QUEUE 0x01 /* make kq affinitize the knote depending on the first cpu it's scheduled to */ +#define KQ_SCHED_QUEUE_CPU 0x02 /* make kq affinitize the knote depending on the runtime cpu it's scheduled to */ +#define KQ_SCHED_WORK_STEALING 0x04 +#define KQ_SCHED_BEST_OF_N 0x08 +#define KQ_SCHED_GREEDY 0x16 //#define TEST_DEBUG struct thread_info { pthread_t thrd; int can_crash; + int ws_master; pthread_mutex_t lock; int group_id; int evcnt; @@ -89,7 +94,7 @@ socket_pop(int sockfd) /* Drain the read buffer, then make sure there are no more events. */ #ifdef TEST_DEBUG - printf("READ_M: popping the read buffer\n"); + printf("READ_M: popping the read buffer of sock %d\n", sockfd); #endif if (read(sockfd, &buf, 1) < 1) err(1, "read(2)"); @@ -432,6 +437,139 @@ test_socket_queue(void) success(); } +/*************************** + * WS test + ***************************/ +#define SOCK_WS_CNT (1000) + +volatile int ws_good = 0; + +static void* +test_socket_ws_worker(void* args) +{ + struct thread_info *info = (struct thread_info *) args; + char dat; + int ws_num = 0; + struct kevent *ret; + + while (1) { +#ifdef TEST_DEBUG + printf("READ_M: thread %d waiting for events\n", info->tid); +#endif + ret = kevent_get(g_kqfd); +#ifdef TEST_DEBUG + printf("READ_M: thread %d woke up\n", info->tid); +#endif + + dat = socket_pop(ret->ident); + free(ret); + + if (info->ws_master == 0) { + /*if we are the master, wait for slave to signal us*/ + while(!ws_good) { + usleep(500); + } + break; + } else { + ws_num++; + if (ws_num == SOCK_WS_CNT - 1) { + ws_good = 1; + break; + } + } + } + +#ifdef TEST_DEBUG + printf("READ_M: thread %d exiting\n", info->tid); +#endif + pthread_exit(0); +} + +int ws_sockfd[SOCK_WS_CNT][2]; + +static void +test_socket_ws() +{ + struct kevent kev; + struct thread_info thrd_info[2]; + const char *test_id = "[Multi][WS]kevent(evfilt)"; + cpuset_t cpuset; + test_begin(test_id); + + for (int i = 0; i < SOCK_WS_CNT; i++) { + + /* Create a connected pair of full-duplex sockets for testing socket events */ + if (socketpair(AF_UNIX, SOCK_STREAM, 0, &ws_sockfd[i][0]) < 0) { + err(1, "kevent_socket"); + } + + EV_SET(&kev, ws_sockfd[i][0], EVFILT_READ, EV_ADD, 0, 0, &ws_sockfd[i][0]); + + if (kevent(g_kqfd, &kev, 1, NULL, 0, NULL) == -1) { + err(1, "kevent_ws_add"); + } + } + + srand(time(NULL)); + +#ifdef TEST_DEBUG + printf("READ_M: creating master thread...\n"); +#endif + for (int i = 0; i < 1; i++) { + thrd_info[i].tid = i; + thrd_info[i].ws_master = i; + pthread_create(&thrd_info[i].thrd, NULL, test_socket_ws_worker, &thrd_info[i]); + CPU_ZERO(&cpuset); + CPU_SET(0, &cpuset); + if (pthread_setaffinity_np(thrd_info[i].thrd, sizeof(cpuset_t), &cpuset) < 0) { + err(1, "thread_affinity"); + } + } + + sleep(3); + + for(int i = 0; i < SOCK_WS_CNT; i++) { +#ifdef TEST_DEBUG + printf("READ_M: pusing 1 packet to sock %d\n", i); +#endif + socket_push(ws_sockfd[i][1], '.'); + } + + sleep(1); + + for(int i = 1; i < 2; i++) { +#ifdef TEST_DEBUG + printf("READ_M: creating slave thread...\n"); +#endif + thrd_info[i].tid = i; + thrd_info[i].ws_master = i; + pthread_create(&thrd_info[i].thrd, NULL, test_socket_ws_worker, &thrd_info[i]); + CPU_ZERO(&cpuset); + CPU_SET(0, &cpuset); + if (pthread_setaffinity_np(thrd_info[i].thrd, sizeof(cpuset_t), &cpuset) < 0) { + err(1, "thread_affinity"); + } + } + + /* shutdown the systems */ +#ifdef TEST_DEBUG + printf("READ_M: waiting for threads to exit...\n"); +#endif + for (int i = 0; i < 2; i++) { + pthread_join(thrd_info[i].thrd, NULL); + } + + for (int i = 0; i < SOCK_WS_CNT; i++) { + EV_SET(&kev, ws_sockfd[i][0], EVFILT_READ, EV_DELETE, 0, 0, &ws_sockfd[i][0]); + + if (kevent(g_kqfd, &kev, 1, NULL, 0, NULL) == -1) { + err(1, "kevent_ws_delete"); + } + } + + success(); +} + /*************************** * Brutal test @@ -465,6 +603,9 @@ test_socket_brutal_worker(void* args) #endif if ((rand() % 100) < THREAD_EXIT_PROB) { +#ifdef TEST_DEBUG + printf("READ_M: thread %d checking fake crash\n", info->tid); +#endif pthread_mutex_lock(&info->lock); #ifdef TEST_DEBUG printf("READ_M: thread %d trying to fake crash. Can crash: %d\n", info->tid, info->can_crash); @@ -478,6 +619,10 @@ test_socket_brutal_worker(void* args) pthread_mutex_unlock(&info->lock); } +#ifdef TEST_DEBUG + printf("READ_M: thread %d ident: %ld\n", info->tid, ret->ident); +#endif + dat = socket_pop(ret->ident); free(ret); @@ -529,6 +674,7 @@ test_socket_brutal() brute_threadinfo[i].tid = i; brute_threadinfo[i].evcnt = 0; brute_threadinfo[i].can_crash = ((i % 10) != 0); + pthread_mutex_init(&brute_threadinfo[i].lock, NULL); pthread_create(&brute_threadinfo[i].thrd, NULL, test_socket_brutal_worker, &brute_threadinfo[i]); } @@ -574,6 +720,7 @@ test_socket_brutal() for (int i = 0; i < THREAD_BRUTE_CNT; i++) { pthread_join(brute_threadinfo[i].thrd, NULL); + pthread_mutex_destroy(&brute_threadinfo[i].lock); } for (int i = 0; i < SOCK_BRUTE_CNT; i++) { @@ -612,4 +759,21 @@ test_evfilt_read_m() test_socket_brutal(); close(g_kqfd); + + flags = KQ_SCHED_BEST_OF_N; + g_kqfd = kqueue(); + error = ioctl(g_kqfd, FKQMULTI, &flags); + + test_socket_brutal(); + + close(g_kqfd); + + flags = KQ_SCHED_WORK_STEALING; + g_kqfd = kqueue(); + error = ioctl(g_kqfd, FKQMULTI, &flags); + + test_socket_ws(); + test_socket_brutal(); + + close(g_kqfd); } \ No newline at end of file