From b2e5289a2db833bf82026941f1c9d5e3b3d7055d Mon Sep 17 00:00:00 2001 From: Charlie Root Date: Mon, 1 Apr 2019 21:35:09 -0400 Subject: [PATCH] kq domain + queue cache --- sys/kern/kern_event.c | 715 +++++++++++++++++++++------- sys/kern/kern_thr.c | 2 +- sys/sys/event.h | 21 +- sys/sys/eventvar.h | 39 +- tests/sys/kqueue/libkqueue/read_m.c | 256 +++++++++- 5 files changed, 838 insertions(+), 195 deletions(-) diff --git a/sys/kern/kern_event.c b/sys/kern/kern_event.c index 981b6ed13bed..015acae63fbc 100644 --- a/sys/kern/kern_event.c +++ b/sys/kern/kern_event.c @@ -72,6 +72,7 @@ __FBSDID("$FreeBSD$"); #include #include #include +#include #ifdef KTRACE #include #endif @@ -100,6 +101,44 @@ MTX_SYSINIT(kq_global, &kq_global, "kqueue order", MTX_DEF); TASKQUEUE_DEFINE_THREAD(kqueue_ctx); +// TODO: only use it in SMP +extern struct cpu_group *cpu_top; +#define AVG_WEIGHT_FACTOR_OLD (4) +#define AVG_WEIGHT_FACTOR_NEW (1) +#define TIMESPEC_DIV(org, val, out) do { \ (out)->tv_nsec = ((org)->tv_sec * 1000000000L + (org)->tv_nsec) / (val); \ (out)->tv_sec = 0; \ while((out)->tv_nsec >= 1000000000L) { \ (out)->tv_nsec -= 1000000000L; \ (out)->tv_sec++; \ } \ +} while(0) + +/* no need to handle overflow as long as the existing org/cur doesn't overflow */ +#define CALC_OVERTIME_AVG(org, cur, out) do { \ (out)->tv_sec = (org)->tv_sec * AVG_WEIGHT_FACTOR_OLD + (cur)->tv_sec * AVG_WEIGHT_FACTOR_NEW; \ (out)->tv_nsec = (org)->tv_nsec * AVG_WEIGHT_FACTOR_OLD + (cur)->tv_nsec * AVG_WEIGHT_FACTOR_NEW; \ TIMESPEC_DIV((out), (AVG_WEIGHT_FACTOR_OLD + AVG_WEIGHT_FACTOR_NEW), (out)); \ +} while(0) + +#define KEVQ_NEXT_AVAIL_LOCKED(out, head, element, member) do { \ (out) = (element); \ while(1) { \ (out) = (out) == NULL ? LIST_FIRST((head)) : LIST_NEXT((out), member); \ if ((out) != NULL) { \ KEVQ_LOCK((out)); \ if (KEVQ_AVAIL((out))) { \ break; \ } \ KEVQ_UNLOCK((out)); \ } \ if ((out) == (element)) { \ (out) = NULL; \ break; \ } \ } \ +} while(0) + static struct kevq * kevqlist_find(struct kevqlist *kevq_list, struct kqueue *kq); static void kevq_thred_init(struct kevq_thred *kevq_th); static void kevq_thred_destroy(struct kevq_thred *kevq_th); @@ -165,7 +204,6 @@ static void knote_enter_flux(struct knote *kn); static void knote_enter_flux_ul(struct knote *kn); static void knote_flux_wakeup_ul(struct knote *kn); static void knote_flux_wakeup(struct knote *kn); -static void knote_activate_ul(struct knote *kn); static void knote_activate(struct knote *kn); static int knote_attach(struct knote *kn, struct kqueue *kq); static void knote_drop(struct knote *kn, struct thread *td); @@ -177,6 +215,17 @@ static struct knote *knote_alloc(int mflag); static void knote_free(struct knote *kn); static void knote_sched(struct knote *kn); + +static void kqdom_init(struct kqdom *kqd); +static void kqdom_update_stats(struct kqdom *leaf, struct timespec *avg); +static void kqdom_insert(struct kqdom *kqd, struct kevq *kevq); +static void kqdom_remove(struct kqdom *kqd, struct kevq *kevq); +static void kqdom_destroy(struct kqdom *root); +static void kqdom_update_stats(struct kqdom *leaf, struct timespec *avg); +static void kqdom_build_internal(struct kqdom *kqd_cur, struct cpu_group *cg_cur, int *kqd_id); +static struct kqdom * kqdom_build(void); +static struct kqdom * kqdom_find(struct kqdom *root, int cpuid); + static void filt_kqdetach(struct knote *kn); static int filt_kqueue(struct knote *kn, long hint); static int filt_procattach(struct knote *kn); @@ -233,6 +282,8 @@ static
unsigned int kq_calloutmax = 4 * 1024; SYSCTL_UINT(_kern, OID_AUTO, kq_calloutmax, CTLFLAG_RW, &kq_calloutmax, 0, "Maximum number of callouts allocated for kqueue"); +//#define ENABLE_SELECT + #define KTR_KQ (KTR_SPARE5) #define KQ_LOCK(kq) do { \ @@ -244,6 +295,18 @@ SYSCTL_UINT(_kern, OID_AUTO, kq_calloutmax, CTLFLAG_RW, #define KEVQ_TH_LOCK(kevqth) do { \ mtx_lock(&(kevqth)->lock); \ } while (0) +#define KQD_LOCK(kqd) do { \ + mtx_lock(&(kqd)->kqd_lock); \ +} while (0) +#define KQD_LOCK_BOTH(kqd1, kqd2) do { \ + if (kqd1->id < kqd2->id) { \ + KQD_LOCK(kqd1); \ + KQD_LOCK(kqd2); \ + } else { \ + KQD_LOCK(kqd2); \ + KQD_LOCK(kqd1); \ + } \ +} while(0) #define KEVQ_LOCK(kevq) do { \ mtx_lock(&(kevq)->lock); \ } while (0) @@ -263,6 +326,13 @@ SYSCTL_UINT(_kern, OID_AUTO, kq_calloutmax, CTLFLAG_RW, #define KEVQ_TH_UNLOCK(kevqth) do { \ mtx_unlock(&(kevqth)->lock); \ } while (0) +#define KQD_UNLOCK(kqd) do { \ + mtx_unlock(&(kqd)->kqd_lock); \ +} while (0) +#define KQD_UNLOCK_BOTH(kqd1, kqd2) do { \ + KQD_UNLOCK(kqd1); \ + KQD_UNLOCK(kqd2); \ +} while (0) #define KEVQ_UNLOCK(kevq) do { \ mtx_unlock(&(kevq)->lock); \ } while (0) @@ -281,9 +351,14 @@ SYSCTL_UINT(_kern, OID_AUTO, kq_calloutmax, CTLFLAG_RW, #define KEVQ_OWNED(kevq) do { \ mtx_assert(&(kevq)->lock, MA_OWNED); \ } while (0) +#define KQD_OWNED(kqd) do { \ + mtx_assert(&(kqd)->kqd_lock, MA_OWNED); \ +} while (0) #define KEVQ_NOTOWNED(kevq) do { \ mtx_assert(&(kevq)->lock, MA_NOTOWNED); \ } while (0) +#define KEVQ_AVAIL(kevq) ((((kevq)->kevq_state & KEVQ_CLOSING) == 0) && (((kevq)->kevq_state & KEVQ_RDY) != 0)) + static struct knlist * kn_list_lock(struct knote *kn) { @@ -378,7 +453,7 @@ knote_leave_flux(struct knote *kn) #ifndef KN_HASHSIZE #define KN_HASHSIZE 64 /* XXX should be tunable */ -#define KEVQ_HASHSIZE 64 +#define KEVQ_HASHSIZE 128 #endif #define KN_HASH(val, mask) (((val) ^ (val >> 8)) & (mask)) @@ -476,7 +551,7 @@ filt_kqueue(struct knote *kn, long hint) return 0; } - kevq = TAILQ_FIRST(&kq->kq_kevqlist); + kevq = kq->kq_kevq; if (kevq == NULL) { return 0; @@ -540,7 +615,7 @@ filt_procattach(struct knote *kn) * is registered. */ if (immediate || (exiting && filt_proc(kn, NOTE_EXIT))) - knote_activate_ul(kn); + knote_activate(kn); PROC_UNLOCK(p); @@ -614,6 +689,7 @@ knote_fork(struct knlist *list, struct thread *td, int pid) struct kevq *kevq; struct kevent kev; int error; + int event; MPASS(list != NULL); KNL_ASSERT_LOCKED(list); @@ -640,11 +716,13 @@ knote_fork(struct knlist *list, struct thread *td, int pid) * The same as knote(), activate the event. 
*/ if ((kn->kn_sfflags & NOTE_TRACK) == 0) { - if(kn->kn_fop->f_event(kn, NOTE_FORK)) + event = kn->kn_fop->f_event(kn, NOTE_FORK); + KQ_UNLOCK(kq); + + if (event) knote_activate(kn); KN_LEAVE_FLUX_WAKEUP(kn); - KQ_UNLOCK(kq); continue; } @@ -691,7 +769,7 @@ knote_fork(struct knlist *list, struct thread *td, int pid) if (error) kn->kn_fflags |= NOTE_TRACKERR; if (kn->kn_fop->f_event(kn, NOTE_FORK)) - knote_activate_ul(kn); + knote_activate(kn); list->kl_lock(list->kl_lockarg); KN_LEAVE_FLUX_WAKEUP(kn); @@ -778,7 +856,7 @@ filt_timerexpire(void *knx) kn->kn_data++; knote_enter_flux_ul(kn); - knote_activate_ul(kn); + knote_activate(kn); KN_LEAVE_FLUX_WAKEUP(kn); if ((kn->kn_flags & EV_ONESHOT) != 0) @@ -1307,16 +1385,14 @@ kern_kevent(struct thread *td, int fd, int nchanges, int nevents, static struct kevq * kevqlist_find(struct kevqlist *kevq_list, struct kqueue *kq) { - struct kevq *kevq_found, *kevq_each; + struct kevq *kevq_found, *kevq_each, *tkevq; kevq_found = NULL; - if (!SLIST_EMPTY(kevq_list)) { - SLIST_FOREACH(kevq_each, kevq_list, kevq_th_e) { - if (kevq_each->kq == kq) { - kevq_found = kevq_each; - break; - } + LIST_FOREACH_SAFE(kevq_each, kevq_list, kevq_th_e, tkevq) { + if (kevq_each->kq == kq) { + kevq_found = kevq_each; + break; } } @@ -1329,6 +1405,7 @@ kqueue_kevent(struct kqueue *kq, struct kevq *kevq, struct thread *td, int nchan { struct kevent keva[KQ_NEVENTS]; struct kevent *kevp, *changes; + struct timespec cur_ts; int i, n, nerrors, error; if ((kq->kq_state & KQ_FLAG_MULTI) == 0 && (kevq->kevq_state & KEVQ_RDY) == 0) { @@ -1339,6 +1416,30 @@ kqueue_kevent(struct kqueue *kq, struct kevq *kevq, struct thread *td, int nchan KEVQ_UNLOCK(kevq); } + KEVQ_LOCK(kevq); + /* prob don't need the lock here as these are only accessible by one thread */ + if (kevq->kevq_last_nkev != 0) + { + /* make sure we actually processed events last time */ + + getnanouptime(&cur_ts); + timespecsub(&cur_ts, &kevq->kevq_last_kev, &cur_ts); + /* divide by the number of events processed */ + TIMESPEC_DIV(&cur_ts, kevq->kevq_last_nkev, &cur_ts); + if (timespecisset(&kevq->kevq_avg_lat)) { + CALC_OVERTIME_AVG(&kevq->kevq_avg_lat, &cur_ts, &kevq->kevq_avg_lat); + } else { + /* first time */ + timespecadd(&cur_ts, &kevq->kevq_avg_lat, &kevq->kevq_avg_lat); + } + + CTR4(KTR_KQ, "kevent: td %d spent %ld s %ld ns per event on %d events", td->td_tid, cur_ts.tv_sec, cur_ts.tv_nsec, kevq->kevq_last_nkev); + /* clear parameters */ + timespecclear(&kevq->kevq_last_kev); + kevq->kevq_last_nkev = 0; + } + KEVQ_UNLOCK(kevq); + nerrors = 0; while (nchanges > 0) { n = nchanges > KQ_NEVENTS ?
KQ_NEVENTS : nchanges; @@ -1405,7 +1506,7 @@ kern_kevent_anonymous(struct thread *td, int nevents, kqueue_init(&kq); kevq_init(&kevq); - TAILQ_INSERT_HEAD(&kq.kq_kevqlist, &kevq, kq_e); + kq.kq_kevq = &kevq; kevq.kq = &kq; kq.kq_refcnt = 1; kevq.kevq_refcnt = 1; @@ -1510,7 +1611,7 @@ kqueue_register(struct kqueue *kq, struct kevq *kevq, struct kevent *kev, struct struct file *fp; struct knote *kn, *tkn; struct knlist *knl; - int error, filt, event; + int error, filt; int haskqglobal, filedesc_unlock; CTR5(KTR_KQ, "kqueue_register: kq %p, kevq %p, ident: %d, filter: %d, flags: 0x%X", kq, kevq, (int)kev->ident, kev->filter, kev->flags); @@ -1663,6 +1764,8 @@ kqueue_register(struct kqueue *kq, struct kevq *kevq, struct kevent *kev, struct } knote_xinit(kn); kn->kn_kevq = kevq; + // this is set later depending on the scheduled CPU + kn->kn_kqd = NULL; kn->kn_fp = fp; kn->kn_kq = kq; kn->kn_fop = fops; @@ -1720,10 +1823,12 @@ kqueue_register(struct kqueue *kq, struct kevq *kevq, struct kevent *kev, struct /* We have the exclusive lock */ knote_enter_flux(kn); KN_FLUX_UNLOCK(kn); + KQ_UNLOCK(kq); // we have kq lock and knote influx if (kev->flags & EV_FORCEONESHOT) { kn->kn_flags |= EV_ONESHOT; + knote_activate(kn); } @@ -1738,8 +1843,6 @@ kqueue_register(struct kqueue *kq, struct kevq *kevq, struct kevent *kev, struct * triggered. */ kn->kn_status |= KN_SCAN; - - KQ_UNLOCK(kq); knl = kn_list_lock(kn); kn->kn_kevent.udata = kev->udata; @@ -1761,25 +1864,15 @@ kqueue_register(struct kqueue *kq, struct kevq *kevq, struct kevent *kev, struct * KN_DISABLED will be stable while the knote is in flux, so the * unlocked read will not race with an update. */ - if ((kn->kn_status & KN_DISABLED) == 0) - event = kn->kn_fop->f_event(kn, 0); - else - event = 0; + if ((kn->kn_status & KN_DISABLED) == 0 && kn->kn_fop->f_event(kn, 0)) + kn->kn_status |= KN_ACTIVE; - KQ_LOCK(kq); - - if (event) - kn->kn_status |= KN_ACTIVE; - if ((kn->kn_status & (KN_ACTIVE | KN_DISABLED | KN_QUEUED)) == - KN_ACTIVE) + if ((kn->kn_status & (KN_ACTIVE | KN_DISABLED | KN_QUEUED)) == KN_ACTIVE) knote_activate(kn); kn->kn_status &= ~KN_SCAN; - KN_LEAVE_FLUX_WAKEUP(kn); - KQ_UNLOCK(kq); - kn_list_unlock(knl); done: @@ -1797,40 +1890,32 @@ kqueue_register(struct kqueue *kq, struct kevq *kevq, struct kevent *kev, struct static void kevq_thred_init(struct kevq_thred *kevq_th) { mtx_init(&kevq_th->lock, "kevq_th", NULL, MTX_DEF | MTX_DUPOK); - TAILQ_INIT(&kevq_th->kevq_tq); + LIST_INIT(&kevq_th->kevq_list); } static void kevq_thred_destroy(struct kevq_thred *kevq_th) { free(kevq_th->kevq_hash, M_KQUEUE); free(kevq_th, M_KQUEUE); + CTR1(KTR_KQ, "kevq_thred_destroy: freed kevq_th %p", kevq_th); } void -kevq_thred_drain(struct kevq_thred *kevq_th) { - /* [Solved] Race here on kevq. Consider: - * Thread 1: Grabs KEVQ ptr for thread 2 from kevq_list in knote_sched - * Thread 2: crashes and calls kevq_drain, destorys kevq - * Thread 1: KEVQ_ACQUIRE((already freed)) - * Maybe require holding the KQ lock when deleting to make sure nothing queries - * the kevq_list during deletion - - * this is solved by holding the KQ lock while scheduling - * thread 1 either accesses kevq_list (which requires KQ lock) before or after thread 2's kevq is deleted from the KQ - * if before, then thread 2's kevq cannot be freed because kevq_drain deletes from kevq_list then frees kevq - * if after, then thread 2's kevq cannot be found in kevq_list. 
- */ +kevq_thred_drain(struct kevq_thred *kevq_th, struct thread* td) { struct kevq *kevq; - int error; - - error = 0; - while((kevq = TAILQ_FIRST(&kevq_th->kevq_tq)) != NULL) { - error = kevq_acquire(kevq); - if (!error) { - CTR1(KTR_KQ, "kevq_thred_drain: draining kevq %p\n", kevq); + + CTR2(KTR_KQ, "kevq_thred_drain: draining kevq_th %p on thread %d", kevq_th, td->td_tid); + + KEVQ_TH_LOCK(kevq_th); + while((kevq = LIST_FIRST(&kevq_th->kevq_list)) != NULL) { + if (kevq_acquire(kevq) == 0) { + CTR2(KTR_KQ, "kevq_thred_drain: draining kevq %p on kevq_th %p", kevq, kevq_th); + KEVQ_TH_UNLOCK(kevq_th); kevq_drain(kevq); + KEVQ_TH_LOCK(kevq_th); } } + KEVQ_TH_UNLOCK(kevq_th); kevq_thred_destroy(kevq_th); } @@ -1839,6 +1924,8 @@ static void kevq_init(struct kevq *kevq) { mtx_init(&kevq->lock, "kevq", NULL, MTX_DEF | MTX_DUPOK); TAILQ_INIT(&kevq->kn_head); + timespecclear(&kevq->kevq_avg_lat); + timespecclear(&kevq->kevq_last_kev); } static void @@ -1873,7 +1960,7 @@ kevq_acquire(struct kevq *kevq) return error; } -/* a reference to kq should be held */ +/* a reference to kq must be held */ static int kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp) { @@ -1882,6 +1969,7 @@ kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp) struct kevq_thred *kevq_th; struct kevq *kevq, *alloc_kevq; struct kevqlist *kevq_list; + struct kqdom *kqd; kevq = NULL; error = 0; @@ -1895,6 +1983,7 @@ kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp) } if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) { + // allocate KEVQ_TH if (td->td_kevq_thred == NULL) { kevq_th = malloc(sizeof(struct kevq_thred), M_KQUEUE, M_WAITOK | M_ZERO); kevq_thred_init(kevq_th); @@ -1919,22 +2008,28 @@ kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp) KASSERT(kevq_th != NULL && kevq_th->kevq_hashmask != 0, ("unallocated kevq")); - // fast fail KEVQ_TH_LOCK(kevq_th); kevq_list = &kevq_th->kevq_hash[KEVQ_HASH((unsigned long long)kq, kevq_th->kevq_hashmask)]; kevq = kevqlist_find(kevq_list, kq); KEVQ_TH_UNLOCK(kevq_th); + // allocate kevq if (kevq == NULL) { - // allocate kevq to_free = NULL; alloc_kevq = malloc(sizeof(struct kevq), M_KQUEUE, M_WAITOK | M_ZERO); kevq_init(alloc_kevq); alloc_kevq->kq = kq; alloc_kevq->kevq_th = kevq_th; - CTR2(KTR_KQ, "kevq_acquire_kq(M): allocated kevq %p for thread %d", alloc_kevq, td->td_tid); + // assign the proper kqdomain + KASSERT(kq->kq_kqd != NULL, ("kqdom doesn't exist after referecing kq")); + kqd = kqdom_find(kq->kq_kqd, td->td_oncpu); + alloc_kevq->kevq_kqd = kqd; + CTR4(KTR_KQ, "kevq_acquire_kq(M): allocated kevq %p for thread %d (oncpu = %d), kqdom %d", alloc_kevq, td->td_tid, td->td_oncpu, kqd->id); + + KQ_LOCK(kq); + KQD_LOCK(kqd); KEVQ_TH_LOCK(kevq_th); kevq = kevqlist_find(kevq_list, kq); /* TODO: probably don't need to re-check unless a thread can asynchronously call @@ -1942,18 +2037,19 @@ kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp) if (kevq == NULL) { kevq = alloc_kevq; // insert kevq to the kevq_th hash table - SLIST_INSERT_HEAD(kevq_list, kevq, kevq_th_e); + LIST_INSERT_HEAD(kevq_list, kevq, kevq_th_e); // insert kevq to the kevq_th list, the list is used to drain kevq - TAILQ_INSERT_HEAD(&kevq_th->kevq_tq, kevq, kevq_th_tqe); + LIST_INSERT_HEAD(&kevq_th->kevq_list, kevq, kevq_th_tqe); - KQ_LOCK(kq); - // insert to kq's kevq list - TAILQ_INSERT_HEAD(&kq->kq_kevqlist, kevq, kq_e); - KQ_UNLOCK(kq); + LIST_INSERT_HEAD(&kq->kq_kevqlist, kevq, kq_e); + + 
kqdom_insert(kqd, kevq); } else { to_free = alloc_kevq; } KEVQ_TH_UNLOCK(kevq_th); + KQD_UNLOCK(kqd); + KQ_UNLOCK(kq); if (to_free != NULL) { free(to_free, M_KQUEUE); @@ -1963,10 +2059,7 @@ kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp) KASSERT(kevq != NULL, ("kevq isn't allocated.")); } else { - // prob don't need this lock depending on whether TAILQ_FIRST is atomic - KQ_LOCK(kq); - kevq = TAILQ_FIRST(&kq->kq_kevqlist); - KQ_UNLOCK(kq); + kevq = kq->kq_kevq; if (kevq == NULL) { alloc_kevq = malloc(sizeof(struct kevq), M_KQUEUE, M_WAITOK | M_ZERO); CTR2(KTR_KQ, "kevq_acquire_kq(S): allocated kevq %p for kq %p", alloc_kevq, kq); @@ -1974,8 +2067,8 @@ kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp) alloc_kevq->kq = kq; KQ_LOCK(kq); - if ((kevq = TAILQ_FIRST(&kq->kq_kevqlist)) == NULL) { - TAILQ_INSERT_HEAD(&kq->kq_kevqlist, alloc_kevq, kq_e); + if ((kevq = kq->kq_kevq) == NULL) { + kq->kq_kevq = alloc_kevq; kevq = alloc_kevq; } else { to_free = alloc_kevq; @@ -2001,6 +2094,7 @@ static int kqueue_acquire(struct file *fp, struct kqueue **kqp) { struct kqueue *kq; + struct kqdom *kqd; kq = fp->f_data; if (fp->f_type != DTYPE_KQUEUE || kq == NULL) @@ -2017,6 +2111,20 @@ kqueue_acquire(struct file *fp, struct kqueue **kqp) kq->kq_refcnt++; KQ_UNLOCK(kq); + if (((kq->kq_state & KQ_FLAG_MULTI) != 0) && (kq->kq_kqd == NULL)) { + kqd = kqdom_build(); + KQ_LOCK(kq); + if (kq->kq_kqd == NULL) { + kq->kq_kqd = kqd; + kqd = NULL; + } + KQ_UNLOCK(kq); + + if (kqd != NULL) { + kqdom_destroy(kqd); + } + } + return 0; } @@ -2071,6 +2179,195 @@ kqueue_schedtask(struct kqueue *kq) } } +/* +// not guaranteed to have a children +static struct kqdom * +kqdom_next_leaf(struct kqdom *kqd) +{ + struct kqdom *parent; + struct kqdom *cur; + struct kqdom *next; + struct kqdom *each_child; + + cur = kqd; + parent = cur->parent; + next = NULL; + + // move right once + while (cur != NULL) { + next = TAILQ_NEXT(cur, child_e); + + if (next != NULL && next->num_kevq > 0) + break; + + cur = parent; + parent = cur->parent; + } + + // if the selected kqdom isn't leaf, return a leaf + while (next != NULL && next->num_children > 0) { + TAILQ_FOREACH(each_child, &next->children, child_e) { + if (each_child->num_kevq > 0) { + if (each_child->num_children == 0) { + // return if we have a valid leaf node + break; + } + // we have a non-leaf node, set next to child and try again + next = each_child; + continue; + } + } + // we traversed all children and nobody has >0 kevqs, return NULL + next = NULL; + break; + } + + return next; +} +*/ + +static void +kqdom_init(struct kqdom *kqd) +{ + mtx_init(&kqd->kqd_lock, "kqdom_lock", NULL, MTX_DEF | MTX_DUPOK); + LIST_INIT(&kqd->kqd_kevqlist); + TAILQ_INIT(&kqd->children); +} + +/* inserts a list*/ +static void +kqdom_insert(struct kqdom *kqd, struct kevq *kevq) +{ + int val; + struct kqdom* parent; + KQD_OWNED(kqd); + KASSERT(kqd->num_children == 0, ("inserting into a non-leaf kqdom")); + LIST_INSERT_HEAD(&kqd->kqd_kevqlist, kevq, kqd_e); + /* TODO: don't hold the lock while doing the update */ + parent = kqd; + while(parent != NULL) { + val = atomic_fetchadd_int(&parent->num_kevq, 1); + KASSERT(val >= 0, ("invalid num_kevq for kqdom <= 0")); + parent = parent->parent; + } +} + +/* removes a list */ +static void +kqdom_remove(struct kqdom *kqd, struct kevq *kevq) +{ + int val; + struct kqdom* parent; + KQD_OWNED(kqd); + KASSERT(kqd->num_children == 0, ("removing from a non-leaf kqdom")); + LIST_REMOVE(kevq, kqd_e); + /* TODO: don't hold 
the lock while doing the update */ + parent = kqd; + while(parent != NULL) { + val = atomic_fetchadd_int(&parent->num_kevq, -1); + KASSERT(val >= 0, ("invalid num_kevq for kqdom <= 0")); + parent = parent->parent; + } + + if (kqd->kqd_ckevq == kevq) { + kqd->kqd_ckevq = LIST_NEXT(kevq, kqd_e); + } +} + +static void +kqdom_destroy(struct kqdom *root) +{ + struct kqdom *kqdom, *tkqd; + + TAILQ_FOREACH_SAFE(kqdom, &root->children, child_e, tkqd) { + kqdom_destroy(kqdom); + } + + CTR2(KTR_KQ, "kqdom_destroy: destroyed kqdom %p with %d child kqdoms", root, root->num_children); + KASSERT(LIST_FIRST(&root->kqd_kevqlist) == NULL, ("freeing a kqdom with kevqs")); + free(root, M_KQUEUE); +} + +static void +kqdom_update_stats(struct kqdom *leaf, struct timespec *avg) +{ + struct timespec last_avg; + last_avg.tv_sec = avg->tv_sec; + last_avg.tv_nsec = avg->tv_nsec; + + while(leaf != NULL) { + KQD_LOCK(leaf); + + CALC_OVERTIME_AVG(&leaf->kqd_avg_lat, &last_avg, &leaf->kqd_avg_lat); + CTR3(KTR_KQ, "kqdom_update_stats: updated avg lat %ld sec %ld for kqdom %d", + leaf->kqd_avg_lat.tv_sec, leaf->kqd_avg_lat.tv_nsec, leaf->id); + + last_avg.tv_sec = leaf->kqd_avg_lat.tv_sec; + last_avg.tv_nsec = leaf->kqd_avg_lat.tv_nsec; + + KQD_UNLOCK(leaf); + + leaf = leaf->parent; + } +} + +/* DFS to mirror the cpu_group structure */ +static void +kqdom_build_internal(struct kqdom *kqd_cur, struct cpu_group *cg_cur, int *kqd_id) +{ + struct kqdom *child; + int cg_numchild = cg_cur->cg_children; + CTR4(KTR_KQ, "kqdom_build_internal: processing cpu_group with %d child groups, %d CPUs, shared cache level %d, kqd_id %d", + cg_numchild, cg_cur->cg_count, cg_cur->cg_level, *kqd_id); + + // init fields for current + kqd_cur->id = *kqd_id; + (*kqd_id)++; + kqd_cur->num_children = cg_numchild; + CPU_COPY(&cg_cur->cg_mask, &kqd_cur->cpu_mask); + + for (int i = 0; i < cg_numchild; i++) { + child = malloc(sizeof(struct kqdom), M_KQUEUE, M_WAITOK | M_ZERO); + kqdom_init(child); + + child->parent = kqd_cur; + + TAILQ_INSERT_TAIL(&kqd_cur->children, child, child_e); + kqdom_build_internal(child, &cg_cur->cg_child[i], kqd_id); + } +} + +static struct kqdom * +kqdom_build() +{ + int kqd_id = 0; + CTR0(KTR_KQ, "kqueue_build_sched: mirroring cpu_group..."); + struct kqdom* kqd_root = malloc(sizeof(struct kqdom), M_KQUEUE, M_WAITOK | M_ZERO); + kqdom_init(kqd_root); + kqdom_build_internal(kqd_root, cpu_top, &kqd_id); + return kqd_root; +} + +static struct kqdom * +kqdom_find(struct kqdom *root, int cpuid) +{ + struct kqdom *child, *tchild; + + if (root->num_children == 0) { + KASSERT(CPU_ISSET(cpuid, &root->cpu_mask), ("kqdom_find: cpuid and cpumask mismatch")); + return root; + } + + TAILQ_FOREACH_SAFE(child, &root->children, child_e, tchild) { + if (CPU_ISSET(cpuid, &child->cpu_mask)) { + return kqdom_find(child, cpuid); + } + } + + KASSERT(0, ( "kqdom_find: cpu doesn't exist ")); + return NULL; +} + /* * Expand the kq to make sure we have storage for fops/ident pair. 
* @@ -2403,6 +2700,12 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, knote_flux_wakeup_ul(marker); } + if (nkev != 0) { + /* book keep the statistics */ + getnanouptime(&kevq->kevq_last_kev); + kevq->kevq_last_nkev = nkev; + } + KEVQ_UNLOCK(kevq); CTR2(KTR_KQ, "kqueue_scan: knote_free marker %p td %d", marker, td->td_tid); knote_free(marker); @@ -2465,14 +2768,15 @@ kqueue_ioctl(struct file *fp, u_long cmd, void *data, CTR2(KTR_KQ, "kqueue_ioctl: received: kq %p cmd: 0x%lx", kq, cmd); switch (cmd) { case FKQMULTI: + KQ_LOCK(kq); if ((kq->kq_state & KQ_FLAG_INIT) == KQ_FLAG_INIT) { error = (EINVAL); } else { - CTR1(KTR_KQ, "kqueue_ioctl: multi flag set for kq %p", kq); - KQ_LOCK(kq); + CTR2(KTR_KQ, "kqueue_ioctl: multi flag set for kq %p, scheduler flags: %d", kq, *(int*)data); kq->kq_state |= (KQ_FLAG_INIT | KQ_FLAG_MULTI); - KQ_UNLOCK(kq); + kq->kq_sched_flags = *(int*)data; } + KQ_UNLOCK(kq); break; default: error = (ENOTTY); @@ -2548,9 +2852,11 @@ kevq_drain(struct kevq *kevq) { struct kqueue *kq; struct knote *kn; + struct kqdom *kqd; struct kevqlist *kevq_list; CTR3(KTR_KQ, "kevq_drain for %p (refcnt = %d) with %d knotes", kevq, kevq->kevq_refcnt, kevq->kn_count); kq = kevq->kq; + kqd = kevq->kevq_kqd; KQ_NOTOWNED(kq); KEVQ_NOTOWNED(kevq); @@ -2565,6 +2871,7 @@ kevq_drain(struct kevq *kevq) kevq->kevq_state |= KEVQ_CLOSING; } + // Wait for extra references to the kevq if (kevq->kevq_refcnt > 1) msleep(&kevq->kevq_refcnt, &kevq->lock, PSOCK, "kevqclose1", 0); @@ -2573,7 +2880,6 @@ kevq_drain(struct kevq *kevq) /* drain all knotes on the kevq */ while ((kn = TAILQ_FIRST(&kevq->kn_head)) != NULL) { -retry: KEVQ_OWNED(kevq); KN_FLUX_LOCK(kn); /* Wait for kn to stablize */ @@ -2582,7 +2888,7 @@ kevq_drain(struct kevq *kevq) CTR2(KTR_KQ, "kevq_drain %p fluxwait knote %p", kevq, kn); KN_FLUX_UNLOCK(kn); msleep(kn, &kevq->lock, PSOCK, "kevqclose2", 0); - goto retry; + continue; } CTR2(KTR_KQ, "kevq_drain %p draining knote %p", kevq, kn); @@ -2592,11 +2898,15 @@ kevq_drain(struct kevq *kevq) knote_enter_flux(kn); KN_FLUX_UNLOCK(kn); + knote_dequeue(kn); if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI && (kq->kq_state & KQ_CLOSING) != KQ_CLOSING && (kn->kn_status & KN_MARKER) == 0) { KEVQ_UNLOCK(kevq); - knote_activate_ul(kn); + /* TODO: When we knote activate, if the ev has EV_CLEAR set, maybe we shouldn't activate the event + * if there hasn't been activities on the fd + */ + knote_activate(kn); KEVQ_LOCK(kevq); } @@ -2604,24 +2914,52 @@ kevq_drain(struct kevq *kevq) } KASSERT(kevq->kn_count == 0, ("some knotes are left")); + KEVQ_OWNED(kevq); KEVQ_UNLOCK(kevq); - KQ_LOCK(kq); - if (kq->kq_ckevq == kevq) { - kq->kq_ckevq = TAILQ_NEXT(kevq, kq_e); - } - TAILQ_REMOVE(&kq->kq_kevqlist, kevq, kq_e); - KQ_UNLOCK(kq); + // + // Here it's guaranteed that no knotes contain a pointer to the kevq + // + // First, all knotes with kn->kn_kevq != kevq before queuing is not an issue + // because if kn->kn_kevq == NULL, scheduler will grab kevq from either kqdom (QC) or kevqlist (RR) or kn->orgkevq (EV_AFFINITY) + // EV_AFFINITY is currently broken (need to keep a list of EV_AFFINITY for each kevq and delete them atomically) + // KEVQs grabbed from QC or RR are locked with QC or RR locked, therefore they are either grabbed before kevq invalidation + // or after kevq detachment. (In between doesn't matter since kevq is already invalidated) + // In the former case, the knote would be queued to the kevq and later drained as usual. 
+ // In the latter case, the kevq would not be found at all because it's already removed from QC or RR. + // + // Second, for all knotes with kn->kn_kevq == kevq. They would be already queued to kevq + // and will be dequeued later (kn->kn_kevq will be set to another valid kevq) + // if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) { - // drop from kq if in multithreaded mode + // drop from KQ Domain + KQ_LOCK(kq); + KQD_LOCK(kqd); KEVQ_TH_LOCK(kevq->kevq_th); - TAILQ_REMOVE(&kevq->kevq_th->kevq_tq, kevq, kevq_th_tqe); - kevq_list = &kevq->kevq_th->kevq_hash[KEVQ_HASH((unsigned long long)kq, kevq->kevq_th->kevq_hashmask)]; - SLIST_REMOVE(kevq_list, kevq, kevq, kevq_th_e); - KEVQ_TH_UNLOCK(kevq->kevq_th); - } + // detach from kevq_th + LIST_REMOVE(kevq, kevq_th_tqe); + kevq_list = &kevq->kevq_th->kevq_hash[KEVQ_HASH((unsigned long long)kq, kevq->kevq_th->kevq_hashmask)]; + LIST_REMOVE(kevq, kevq_th_e); + + // detach from kqdom + kqdom_remove(kqd, kevq); + + // detach from kqueue + if (kq->kq_ckevq == kevq) { + kq->kq_ckevq = LIST_NEXT(kevq, kq_e); + } + LIST_REMOVE(kevq, kq_e); + + KEVQ_TH_UNLOCK(kevq->kevq_th); + KQD_UNLOCK(kqd); + KQ_UNLOCK(kq); + } else { + KQ_LOCK(kq); + kq->kq_kevq = NULL; + KQ_UNLOCK(kq); + } /* delete the kevq */ kevq_destroy(kevq); @@ -2634,7 +2972,6 @@ kqueue_drain(struct kqueue *kq, struct kevq *kevq, struct thread *td) { struct knote *kn; int i; - int error; CTR2(KTR_KQ, "kqueue_drain on %p. args kevq %p", kq, kevq); @@ -2651,25 +2988,7 @@ kqueue_drain(struct kqueue *kq, struct kevq *kevq, struct thread *td) KASSERT(knlist_empty(&kq->kq_sel.si_note), ("kqueue's knlist not empty")); - - KQ_UNLOCK(kq); - - error = 0; - if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) { - /* drain all kevqs belonging to the kq */ - while ((kevq = TAILQ_FIRST(&kq->kq_kevqlist)) != NULL) { - error = kevq_acquire(kevq); - if (!error) { - kevq_drain(kevq); - } - } - } else { - if (kevq != NULL) { - kevq_drain(kevq); - } - } - KQ_LOCK(kq); - + // destroy knotes first for (i = 0; i < kq->kq_knlistsize; i++) { while ((kn = SLIST_FIRST(&kq->kq_knlist[i])) != NULL) { KN_FLUX_LOCK(kn); @@ -2705,6 +3024,26 @@ kqueue_drain(struct kqueue *kq, struct kevq *kevq, struct thread *td) } } + // destroy kqdoms and kevqs + if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) { + while((kevq = LIST_FIRST(&kq->kq_kevqlist)) != NULL) { + KQ_UNLOCK(kq); + if (kevq_acquire(kevq) == 0) + kevq_drain(kevq); + KQ_LOCK(kq); + } + + KQ_OWNED(kq); + kqdom_destroy(kq->kq_kqd); + } else { + KQ_UNLOCK(kq); + // we already have a reference for single threaded mode + kevq_drain(kq->kq_kevq); + KQ_LOCK(kq); + } + + KQ_OWNED(kq); + if ((kq->kq_state & KQ_TASKSCHED) == KQ_TASKSCHED) { kq->kq_state |= KQ_TASKDRAIN; msleep(&kq->kq_state, &kq->kq_lock, PSOCK, "kqtqdr", 0); @@ -2832,7 +3171,7 @@ knote(struct knlist *list, long hint, int lockflags) { struct kqueue *kq; struct knote *kn, *tkn; - int require_kqlock; + int require_kqlock, kn_active; if (list == NULL) return; @@ -2852,7 +3191,7 @@ knote(struct knlist *list, long hint, int lockflags) SLIST_FOREACH_SAFE(kn, &list->kl_list, kn_selnext, tkn) { CTR1(KTR_KQ, "knote() scanning kn %p", kn); KN_FLUX_LOCK(kn); - if (kn_in_flux(kn) && (kn->kn_status & KN_SCAN) == 0) { + if (kn_in_flux(kn) && ((kn->kn_status & KN_SCAN) == 0)) { /* * Do not process the influx notes, except for * the influx coming from the kq unlock in the @@ -2863,6 +3202,7 @@ knote(struct knlist *list, long hint, int lockflags) */ KN_FLUX_UNLOCK(kn); } else { + // either not influx or being scanned kq 
= kn->kn_kq; knote_enter_flux(kn); KN_FLUX_UNLOCK(kn); @@ -2872,14 +3212,14 @@ knote(struct knlist *list, long hint, int lockflags) if (require_kqlock) KQ_LOCK(kq); - if (kn->kn_fop->f_event(kn, hint)){ - // TODO: WTH is this? - require_kqlock ? knote_activate(kn) : knote_activate_ul(kn); - } + kn_active = kn->kn_fop->f_event(kn, hint); if (require_kqlock) KQ_UNLOCK(kq); + if (kn_active) + knote_activate(kn); + KN_LEAVE_FLUX_WAKEUP(kn); } } @@ -2900,16 +3240,11 @@ static void knote_flux_wakeup(struct knote *kn) { KN_FLUX_OWNED(kn); + if (kn->kn_fluxwait) { CTR1(KTR_KQ, "waking up kn %p", kn); + kn->kn_fluxwait = 0; wakeup(kn); -} - -static void -knote_activate_ul(struct knote *kn) -{ - KQ_LOCK(kn->kn_kq); - knote_activate(kn); - KQ_UNLOCK(kn->kn_kq); + } } /* @@ -2923,8 +3258,9 @@ knote_activate(struct knote *kn) struct kqueue *kq; kq = kn->kn_kq; - CTR1(KTR_KQ, "knote_activate: kn %p", kn); - KQ_OWNED(kq); + KQ_NOTOWNED(kq); + + CTR2(KTR_KQ, "knote_activate: kn %p, flags %d", kn, kn->kn_status); KN_FLUX_NOTOWNED(kn); KASSERT(kn_in_flux(kn), ("knote %p not in flux", kn)); @@ -2932,9 +3268,15 @@ knote_activate(struct knote *kn) if (((kn)->kn_status & (KN_QUEUED | KN_DISABLED)) == 0) { knote_sched(kn); + } else { + CTR2(KTR_KQ, "knote_activate: kn %p, flags %d not sched", kn, kn->kn_status); } +#ifdef ENABLE_SELECT + KQ_LOCK(kq); kqueue_wakeup(kq); + KQ_UNLOCK(kq); +#endif } /* @@ -3333,61 +3675,92 @@ knote_drop_detached(struct knote *kn, struct thread *td) } +/* select the next kevq based on knote and scheduler flags and locks the returned kevq */ +static struct kevq * +knote_next_kevq(struct knote *kn) +{ + struct kqdom *kqd; + struct kqueue *kq; + struct kevq *next_kevq; + + next_kevq = NULL; + kq = kn->kn_kq; + + CTR1(KTR_KQ, "knote_next_kevq: processing kn %p", kn); + + if ((kq->kq_state & KQ_FLAG_MULTI) == 0) { + // single threaded mode, just return the current kevq + KQ_LOCK(kn->kn_kq); + if ((kq->kq_state & KQ_CLOSING) == 0) + next_kevq = kn->kn_kq->kq_kevq; + KQ_UNLOCK(kn->kn_kq); + + if (next_kevq != NULL) + KEVQ_LOCK(next_kevq); + + CTR2(KTR_KQ, "knote_next_kevq: [LEGACY] next kevq %p for kn %p", next_kevq, kn); + return next_kevq; + } + + if ((kn->kn_flags & EV_AFFINITY) == EV_AFFINITY) { + next_kevq = kn->kn_org_kevq; + KEVQ_LOCK(next_kevq); + if (!KEVQ_AVAIL(next_kevq)) { + KEVQ_UNLOCK(next_kevq); + next_kevq = NULL; + } + + CTR2(KTR_KQ, "knote_next_kevq: [AFFIN] next kevq %p for kn %p", kn, next_kevq); + return next_kevq; + } + + if ((kq->kq_sched_flags & KQ_SCHED_QUEUE) != 0) { + if (kn->kn_kqd == NULL) { + /* the first time knote is queued, record the kqdom */ + kn->kn_kqd = kqdom_find(kq->kq_kqd, PCPU_GET(cpuid)); + + KASSERT(kn->kn_kqd != NULL, ("knote scheduled on an unidentified CPU2")); + CTR2(KTR_KQ, "knote_next_kevq: [QUEUE] knote %p attached to kqdom id %d", kn, kn->kn_kqd->id); + } + + kqd = kn->kn_kqd; + KQD_LOCK(kqd); + + KEVQ_NEXT_AVAIL_LOCKED(next_kevq, &kqd->kqd_kevqlist, kqd->kqd_ckevq, kqd_e); + kqd->kqd_ckevq = next_kevq; + + KQD_UNLOCK(kqd); + + CTR2(KTR_KQ, "knote_next_kevq: [QUEUE] next kevq %p for kn %p", next_kevq, kn); + } + + // generic round-robbin + if (next_kevq == NULL) { + KQ_LOCK(kq); + + KEVQ_NEXT_AVAIL_LOCKED(next_kevq, &kq->kq_kevqlist, kq->kq_ckevq, kq_e); + kq->kq_ckevq = next_kevq; + + KQ_UNLOCK(kq); + CTR2(KTR_KQ, "knote_next_kevq: [RR] next kevq %p for kn %p", next_kevq, kn); + } + + if (next_kevq != NULL) + KEVQ_OWNED(next_kevq); + + return next_kevq; +} + // if no kevqs are available for queueing, returns NULL static void 
knote_sched(struct knote *kn) { - struct kqueue *kq = kn->kn_kq; struct kevq *next_kevq; - - KQ_OWNED(kq); + KASSERT(kn_in_flux(kn), ("kn not in flux")); - // reschedule knotes to available threads - if ((kn->kn_flags & EV_AFFINITY) == EV_AFFINITY) { - CTR1(KTR_KQ, "knote_sched: affinity set: kn %p", kn); - if ((kn->kn_org_kevq->kevq_state & KEVQ_RDY) != 0) { - next_kevq = kn->kn_org_kevq; - KEVQ_LOCK(next_kevq); - } else { - next_kevq = NULL; - } - } else { - next_kevq = kq->kq_ckevq; - while(1) { - if (next_kevq == NULL) { - next_kevq = TAILQ_FIRST(&kq->kq_kevqlist); - if (next_kevq == NULL) { - CTR1(KTR_KQ, "knote_sched: no kevqs exist for queueing kn %p, discarding...", kn); - break; - } - } else { - next_kevq = TAILQ_NEXT(next_kevq, kq_e); - if (next_kevq == NULL) { - continue; - } - } - - KASSERT(next_kevq != NULL, ("picked a null kevq")); - KEVQ_LOCK(next_kevq); - - if ((next_kevq->kevq_state & KEVQ_CLOSING) == 0 && (next_kevq->kevq_state & KEVQ_RDY) != 0) { - kq->kq_ckevq = next_kevq; - break; - } - - KEVQ_UNLOCK(next_kevq); - - if (next_kevq == kq->kq_ckevq) { - // if the previous "if" didn't break - // we have traversed the list once and the current kevq is closing - // we have no queue to queue the knote - CTR1(KTR_KQ, "knote_sched: no open kevqs for queueing kn %p, discarding...", kn); - next_kevq = NULL; - break; - } - } - } + // note that kevq will be locked after this + next_kevq = knote_next_kevq(kn); CTR2(KTR_KQ, "knote_sched: next kevq %p for kn %p", next_kevq, kn); diff --git a/sys/kern/kern_thr.c b/sys/kern/kern_thr.c index a55d7cc4019f..97259f0eed94 100644 --- a/sys/kern/kern_thr.c +++ b/sys/kern/kern_thr.c @@ -335,7 +335,7 @@ kern_thr_exit(struct thread *td) * Release the event queues */ if (td->td_kevq_thred != NULL) - kevq_thred_drain(td->td_kevq_thred); + kevq_thred_drain(td->td_kevq_thred, td); /* * If all of the threads in a process call this routine to diff --git a/sys/sys/event.h b/sys/sys/event.h index 5fcf30cb964b..640eb73cfd26 100644 --- a/sys/sys/event.h +++ b/sys/sys/event.h @@ -224,7 +224,7 @@ SLIST_HEAD(klist, knote); struct kqueue; TAILQ_HEAD(kqlist, kqueue); struct kevq; -SLIST_HEAD(kevqlist, kevq); +LIST_HEAD(kevqlist, kevq); struct knlist { struct klist kl_list; @@ -275,9 +275,15 @@ struct filterops { void (*f_touch)(struct knote *kn, struct kevent *kev, u_long type); }; -/* The ioctl to set multithreaded mode +/* + * The ioctl to set multithreaded mode */ -#define FKQMULTI _IO('f', 89) +#define FKQMULTI _IOW('f', 89, int) + +/* + * KQ scheduler flags + */ +#define KQ_SCHED_QUEUE 0x1 /* make kq affinitize the knote depending on the cpu it's scheduled */ /* * An in-flux knote cannot be dropped from its kq while the kq is @@ -294,8 +300,11 @@ struct knote { struct knlist *kn_knlist; /* f_attach populated */ TAILQ_ENTRY(knote) kn_tqe; struct kqueue *kn_kq; /* which kqueue we are on */ - struct kevq *kn_org_kevq; /* the kevq that registered the knote */ struct kevq *kn_kevq; /* the kevq the knote is on */ + /* used by the scheduler */ + struct kevq *kn_org_kevq; /* the kevq that registered the knote */ + struct kqdom *kn_kqd; /* the kqdomain the knote belongs to */ + /* end scheduler */ struct kevent kn_kevent; void *kn_hook; int kn_hookid; @@ -338,7 +347,7 @@ struct kevent_copyops { struct kevq_thred { u_long kevq_hashmask; /* hash mask for kevqs */ struct kevqlist *kevq_hash; /* hash table for kevqs */ - TAILQ_HEAD(, kevq) kevq_tq; + struct kevqlist kevq_list; struct mtx lock; /* the lock for the kevq*/ }; @@ -374,7 +383,7 @@ int kqfd_register(int fd, 
struct kevent *kev, struct thread *p, int kqueue_add_filteropts(int filt, struct filterops *filtops); int kqueue_del_filteropts(int filt); -void kevq_thred_drain(struct kevq_thred *kevq_th); +void kevq_thred_drain(struct kevq_thred *kevq_th, struct thread *td); #else /* !_KERNEL */ #include diff --git a/sys/sys/eventvar.h b/sys/sys/eventvar.h index 71f2573b2506..05f5e6727489 100644 --- a/sys/sys/eventvar.h +++ b/sys/sys/eventvar.h @@ -41,10 +41,12 @@ #define KQEXTENT 256 /* linear growth by this amount */ struct kevq { - SLIST_ENTRY(kevq) kevq_th_e; /* entry into kevq_thred's hashtable */ - TAILQ_ENTRY(kevq) kq_e; /* entry into kqueue's list */ - TAILQ_ENTRY(kevq) kevq_th_tqe; /* entry into kevq_thred's TAILQ */ - struct kqueue *kq; /* the kq that the kevq belongs to */ + LIST_ENTRY(kevq) kevq_th_e; /* entry into kevq_thred's hashtable */ + LIST_ENTRY(kevq) kqd_e; /* entry into kqdomain */ + LIST_ENTRY(kevq) kq_e; /* entry into kq */ + LIST_ENTRY(kevq) kevq_th_tqe; /* entry into kevq_thred's kevq_list */ + struct kqueue *kq; /* the kq that the kevq belongs to */ + struct kqdom *kevq_kqd; /* the kq domain the kevq is on */ struct kevq_thred *kevq_th; /* the thread that the kevq belongs to */ struct mtx lock; /* the lock for the kevq */ TAILQ_HEAD(, knote) kn_head; /* list of pending knotes */ @@ -54,6 +56,26 @@ struct kevq { #define KEVQ_RDY 0x04 int kevq_state; int kevq_refcnt; + + /* Used by the scheduler */ + struct timespec kevq_avg_lat; + struct timespec kevq_last_kev; + int kevq_last_nkev; +}; + +/* TODO: assumed that threads don't get rescheduled across cores */ +struct kqdom { + struct mtx kqd_lock; + TAILQ_ENTRY(kqdom) child_e; + struct kqdom *parent; + int id; + struct timespec kqd_avg_lat; + cpuset_t cpu_mask; + int num_children; + int num_kevq; + TAILQ_HEAD(, kqdom) children; + struct kevqlist kqd_kevqlist; /* list of kevqs on the kdomain, only set for leaf domains*/ + struct kevq *kqd_ckevq; }; struct kqueue { @@ -75,10 +97,15 @@ struct kqueue { struct klist *kq_knlist; /* list of knotes */ u_long kq_knhashmask; /* size of knhash */ struct klist *kq_knhash; /* hash table for knotes */ - TAILQ_HEAD(, kevq) kq_kevqlist; /* list of kevqs interested in the kqueue */ - struct kevq *kq_ckevq; /* current kevq for multithreaded kqueue */ + struct kevq *kq_kevq; /* the kevq for kq, always created, act as buffer queue in multithreaded mode */ struct task kq_task; struct ucred *kq_cred; + + /* scheduling stuff */ + struct kevqlist kq_kevqlist; /* list of kevqs for fall-back round robbin */ + struct kqdom *kq_kqd; /* root domain */ + struct kevq *kq_ckevq; /* current kevq for multithreaded kqueue, used for round robbin */ + int kq_sched_flags; /* Scheduler flag for the KQ */ }; #endif /* !_SYS_EVENTVAR_H_ */ diff --git a/tests/sys/kqueue/libkqueue/read_m.c b/tests/sys/kqueue/libkqueue/read_m.c index b5a45c51c9db..89c62684dc43 100644 --- a/tests/sys/kqueue/libkqueue/read_m.c +++ b/tests/sys/kqueue/libkqueue/read_m.c @@ -21,14 +21,27 @@ #include #include #include +#include +#include +#include -#define FKQMULTI _IO('f', 89) -#define TEST_DEBUG +/* + * The ioctl to set multithreaded mode + */ +#define FKQMULTI _IOW('f', 89, int) + +/* + * KQ scheduler flags + */ +#define KQ_SCHED_QUEUE 0x1 /* make kq affinitize the knote depending on the cpu it's scheduled */ + +//#define TEST_DEBUG struct thread_info { pthread_t thrd; int can_crash; pthread_mutex_t lock; + int group_id; int evcnt; int tid; }; @@ -37,8 +50,8 @@ struct thread_info { * Read test */ -#define THREAD_CNT (32) -#define PACKET_CNT 
(1000) +#define THREAD_CNT (16) +#define PACKET_CNT (1600) int g_kqfd; int g_sockfd[2]; @@ -98,7 +111,9 @@ socket_push(int sockfd, char ch) } } -/* for multi threaded read */ +/*************************** + * Read test + ***************************/ static void* test_socket_read_thrd(void* args) { @@ -127,6 +142,11 @@ test_socket_read_thrd(void* args) sem_post(&g_sem_driver); } +#ifdef TEST_DEBUG + printf("READ_M: thread %d exiting\n", info->tid); +#endif + + sem_post(&g_sem_driver); pthread_exit(0); } @@ -184,14 +204,19 @@ test_socket_read(void) #ifdef TEST_DEBUG printf("READ_M: finished testing, system shutting down...\n"); #endif - for(int i = 0; i < PACKET_CNT; i++) { + for(int i = 0; i < THREAD_CNT; i++) { socket_push(g_sockfd[1], 'e'); + + sem_wait(&g_sem_driver); } for (int i = 0; i < THREAD_CNT; i++) { pthread_join(g_thrd_info[i].thrd, NULL); } +#ifdef TEST_DEBUG + printf("READ_M: clearing kevent...\n"); +#endif EV_SET(&kev, g_sockfd[0], EVFILT_READ, EV_DELETE, 0, 0, &g_sockfd[0]); error = kevent(g_kqfd, &kev, 1, NULL, 0, NULL); @@ -202,7 +227,205 @@ test_socket_read(void) #endif err(1, "kevent_delete"); } +#ifdef TEST_DEBUG + printf("READ_M: closing sockets...\n"); +#endif + close(g_sockfd[0]); + close(g_sockfd[1]); + + success(); +} + +/*************************** + * Queue test + ***************************/ + +#define THREAD_QUEUE_CNT (4) +#define PACKET_QUEUE_CNT (1000) + +static int +get_ncpu() +{ + int mib[4]; + int numcpu; + size_t len = sizeof(numcpu); + + mib[0] = CTL_HW; + mib[1] = HW_NCPU; + + sysctl(mib, 2, &numcpu, &len, NULL, 0); + + if (numcpu < 1) + { + err(1, "< 1 cpu detected"); + } + + return numcpu; +} + +static void* +test_socket_queue_thrd(void* args) +{ + struct thread_info *info = (struct thread_info *) args; + char dat; + struct kevent *ret; + + while (1) { +#ifdef TEST_DEBUG + printf("READ_M: thread %d waiting for events\n", info->tid); +#endif + ret = kevent_get(g_kqfd); +#ifdef TEST_DEBUG + printf("READ_M: thread %d woke up\n", info->tid); +#endif + + dat = socket_pop(ret->ident); + free(ret); + + if (dat == 'e') + break; + + info->evcnt++; + + /* signal the driver */ + sem_post(&g_sem_driver); + } +#ifdef TEST_DEBUG + printf("READ_M: thread %d exiting\n", info->tid); +#endif + + sem_post(&g_sem_driver); + pthread_exit(0); +} + +static void +test_socket_queue(void) +{ + int error = 0; + const char *test_id = "[Multi][Queue]kevent(EVFILT_READ)"; + + test_begin(test_id); + + if (socketpair(AF_UNIX, SOCK_STREAM, 0, &g_sockfd[0]) < 0) + err(1, "kevent_read socket"); + + struct kevent kev; + EV_SET(&kev, g_sockfd[0], EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, &g_sockfd[0]); + + sem_init(&g_sem_driver, 0, 0); + + error = kevent(g_kqfd, &kev, 1, NULL, 0, NULL); + + if (error == -1) { +#ifdef TEST_DEBUG + printf("READ_M: kevent add failed with %d\n", errno); +#endif + err(1, "kevent_add"); + } + + cpuset_t cpuset; + int ncpu = get_ncpu(); + int tid = 0; +#ifdef TEST_DEBUG + printf("READ_M: detected %d cores...\n", ncpu); +#endif + + struct thread_info **group = malloc(sizeof(struct thread_info*) * ncpu); + for (int i = 0; i < ncpu; i++) { + group[i] = malloc(sizeof(struct thread_info) * THREAD_QUEUE_CNT); + for (int j = 0; j < THREAD_QUEUE_CNT; j++) { + group[i][j].tid = tid; + tid++; + group[i][j].evcnt = 0; + group[i][j].group_id = i; + pthread_create(&group[i][j].thrd, NULL, test_socket_queue_thrd, &group[i][j]); + CPU_ZERO(&cpuset); + CPU_SET(i, &cpuset); + if (pthread_setaffinity_np(group[i][j].thrd, sizeof(cpuset_t), &cpuset) < 0) { + err(1, 
"thread_affinity"); + } +#ifdef TEST_DEBUG + printf("READ_M: created and affinitized thread %d to core group %d\n", group[i][j].tid, i); +#endif + } + } + +#ifdef TEST_DEBUG + printf("READ_M: waiting for threads to wait on KQ...\n"); +#endif + + sleep(3); + + int affinity_group = -1; + for(int k = 1; k <= PACKET_QUEUE_CNT; k++) { +#ifdef TEST_DEBUG + printf("READ_M: processing packet %d\n", k); +#endif + socket_push(g_sockfd[1], '.'); + /* wait for thread events */ + sem_wait(&g_sem_driver); + + /* basically only one group should get events, do this for now, ideally we can have a table that remembers each knote's affinity*/ + for(int i = 0; i < ncpu; i++) { + int sum = 0; + for (int j = 0; j < THREAD_QUEUE_CNT; j++) { + sum += group[i][j].evcnt; + } + if (sum != 0 && affinity_group == -1) { + affinity_group = i; + } + +#ifdef TEST_DEBUG + printf("READ_M: group %d sum %d, affinity group: %d\n", i, sum, affinity_group); +#endif + + if (i == affinity_group) { + if (sum != k) { + err(1, "affinity group sum != 1"); + } + } else { + if (sum != 0) { + err(1, "non-affinity group sum != 0"); + } + } + } + } + + +#ifdef TEST_DEBUG + printf("READ_M: finished testing, system shutting down...\n"); +#endif + for(int i = 0; i < THREAD_QUEUE_CNT * ncpu; i++) { + socket_push(g_sockfd[1], 'e'); + + sem_wait(&g_sem_driver); + } + + for (int i = 0; i < ncpu; i++) { + for (int j = 0; j < THREAD_QUEUE_CNT; j++) { + pthread_join(group[i][j].thrd, NULL); + } + free(group[i]); + } + free(group); + +#ifdef TEST_DEBUG + printf("READ_M: clearing kevent...\n"); +#endif + EV_SET(&kev, g_sockfd[0], EVFILT_READ, EV_DELETE, 0, 0, &g_sockfd[0]); + + error = kevent(g_kqfd, &kev, 1, NULL, 0, NULL); + + if (error == -1) { +#ifdef TEST_DEBUG + printf("READ_M: kevent delete failed with %d\n", errno); +#endif + err(1, "kevent_delete"); + } +#ifdef TEST_DEBUG + printf("READ_M: closing sockets...\n"); +#endif close(g_sockfd[0]); close(g_sockfd[1]); @@ -210,17 +433,16 @@ test_socket_read(void) } -/* +/*************************** * Brutal test - */ - + ***************************/ #define THREAD_BRUTE_CNT (32) #define SOCK_BRUTE_CNT (64) #define PACKET_BRUTE_CNT (10000) #define THREAD_EXIT_PROB (50) #define RAND_SLEEP (29) -#define RAND_SEND_SLEEP (13) +#define RAND_SEND_SLEEP (7) int brute_sockfd[SOCK_BRUTE_CNT][2]; @@ -365,11 +587,13 @@ test_socket_brutal() success(); } + void test_evfilt_read_m() { + int flags = 0; g_kqfd = kqueue(); - int error = ioctl(g_kqfd, FKQMULTI); + int error = ioctl(g_kqfd, FKQMULTI, &flags); if (error == -1) { err(1, "ioctl"); } @@ -378,4 +602,14 @@ test_evfilt_read_m() test_socket_brutal(); close(g_kqfd); + + /* test scheduler */ + flags = KQ_SCHED_QUEUE; + g_kqfd = kqueue(); + error = ioctl(g_kqfd, FKQMULTI, &flags); + + test_socket_queue(); + test_socket_brutal(); + + close(g_kqfd); } \ No newline at end of file