diff --git a/sys/kern/kern_event.c b/sys/kern/kern_event.c index 4178b118987a..5ffc510c7ddc 100644 --- a/sys/kern/kern_event.c +++ b/sys/kern/kern_event.c @@ -92,6 +92,10 @@ static MALLOC_DEFINE(M_KQUEUE, "kqueue", "memory for kqueue system"); static uint32_t cache_pen = 1000; SYSCTL_U32(_kern, OID_AUTO, kq_cache_pen, CTLFLAG_RW, &cache_pen, 0, "KQueue cache miss's penalty in cycles."); +/* sysctl for best of 2 latency penalty */ +static uint32_t log_threshold = 400; +SYSCTL_U32(_kern, OID_AUTO, log_threshold, CTLFLAG_RW, &log_threshold, 0, "KQueue knote log threshold."); + /* sysctl for ws_int_sbt */ static sbintime_t ws_int_sbt = 0; SYSCTL_U64(_kern, OID_AUTO, kq_ws_int_sbt, CTLFLAG_RD, &ws_int_sbt, 0, "KQueue work stealing interval in sbintime."); @@ -170,12 +174,12 @@ static int kevq_acquire(struct kevq *kevq, int locked); static void kevq_worksteal(struct kevq *kevq); static void kevq_drain(struct kevq *kevq, struct thread *td); static void kevq_activate(struct kevq *kevq, struct thread *td); -static struct kevq * kevq_vec_select_kevq(struct veclist *lst, int num_rand, u_long rand); +static struct kevq * kvlst_sel_dist_kevq(struct veclist *lst, u_long rand, struct kevq *kevq_to_skip); +static struct kevq * kvlst_sel_kevq(struct veclist *lst, int num_rand, u_long rand, long (*kevq_cmp_f)(struct kevq*, struct kevq*), struct kevq *kevq_to_skip); static struct knote * kevq_peek_knote(struct kevq *kevq); static inline void kevq_delete_knote(struct kevq *kevq, struct knote *kn); static void kevq_insert_knote(struct kevq *kevq, struct knote *kn); static int kevq_total_knote(struct kevq *kevq); -static struct knote * kevq_dequeue(struct kevq *kevq, int rt); static void kevq_insert_head_knote(struct kevq *kevq, struct knote *kn); static void knote_enqueue_head(struct knote *kn, struct kevq *kevq); @@ -195,7 +199,13 @@ static int kqueue_scan(struct kevq *kq, int maxevents, struct kevent_copyops *k_ops, const struct timespec *timeout, struct kevent *keva, struct thread *td); +static void kqueue_dump(struct kqueue *kq, struct sbuf *buf); + +/* XXX: */ +#ifdef ENABLE_SELECT static void kqueue_wakeup(struct kqueue *kq); +#endif + static struct filterops *kqueue_fo_find(int filt); static void kqueue_fo_release(int filt); struct g_kevent_args; @@ -245,7 +255,7 @@ static struct kevq * knote_next_kevq(struct knote *kn); static void kqdom_init(struct kqdom *kqd); -static void kqdom_update_lat(struct kqdom *leaf, unsigned long avg); +//static void kqdom_update_lat(struct kqdom *leaf, unsigned long avg); static void kqdom_update_parents(struct kqdom *leaf, int direction); static void kqdom_insert(struct kqdom *kqd, struct kevq *kevq); static void kqdom_remove(struct kqdom *kqd, struct kevq *kevq); @@ -335,13 +345,6 @@ SYSCTL_UINT(_kern, OID_AUTO, kq_calloutmax, CTLFLAG_RW, #define KN_FLUX_UNLOCK(kn) do { \ mtx_unlock(&(kn)->kn_fluxlock); \ } while (0) -#define KN_LEAVE_FLUX_WAKEUP(kn) do { \ - KN_FLUX_NOTOWNED((kn)); \ - KN_FLUX_LOCK((kn)); \ - knote_leave_flux((kn)); \ - knote_flux_wakeup((kn)); \ - KN_FLUX_UNLOCK((kn)); \ -} while(0) #define KEVQ_TH_UNLOCK(kevqth) do { \ mtx_unlock(&(kevqth)->lock); \ } while (0) @@ -366,7 +369,6 @@ SYSCTL_UINT(_kern, OID_AUTO, kq_calloutmax, CTLFLAG_RW, #define KEVQ_NOTOWNED(kevq) do { \ mtx_assert(&(kevq)->lock, MA_NOTOWNED); \ } while (0) - #define KQD_ROWNED(kqd) do { \ rw_assert(&(kqd)->kqd_lock, RA_RLOCKED); \ } while (0) @@ -418,8 +420,7 @@ SYSCTL_UINT(_kern, OID_AUTO, kq_calloutmax, CTLFLAG_RW, #define KQTUNE_PARSE_ARGS(sf) (((sf) >> 16) & 0xFFFF) #define 
KQTUNE_PARSE_OBJ(sf) ((sf) & 0xFFFF) -#define NSHUFF 50 - +#define NSHUFF (50) /* * Pseudo-random number generator for perturbing the profiling clock, @@ -463,7 +464,21 @@ kqueue_srandom(u_long *field, u_long seed) static inline long kevq_exp_lat(struct kevq *kevq) { - return kevq->kevq_avg_lat * (kevq_total_knote(kevq) + 1) + kevq->kevq_last_kev; + int64_t expected_kev; + int64_t last_kev; + int64_t now; + now = get_cyclecount(); + + expected_kev = kevq->kevq_last_nkev * kevq->kevq_avg_lat; + /* XXX: if a thread is interrupted by a signal then this screws up. Maybe have a separate value for signal */ + last_kev = kevq->kevq_last_kev == KEVQ_LAST_KERN ? now : kevq->kevq_last_kev; + + if (last_kev < now - expected_kev) { + last_kev = now - expected_kev; + // CTR2(KTR_KQ, "kevq_exp_lat: %p too behind. setting new last_kevq: %ld", kevq, last_kev); + } + + return kevq->kevq_avg_lat * (kevq_total_knote(kevq) + 1) + last_kev; } static inline long @@ -471,17 +486,17 @@ kevq_lat_cmp(struct kevq *kevq1, struct kevq *kevq2) { if (kevq1 == kevq2) return 0; - - return kevq_exp_lat(kevq1) - kevq_exp_lat(kevq2); + // CTR4(KTR_KQ, "kevq_lat_cmp: comparing %p: %ld with %p: %ld", kevq1, kevq_exp_lat(kevq1), kevq2, kevq_exp_lat(kevq2)); + return kevq_exp_lat(kevq2) - kevq_exp_lat(kevq1); } static inline long -kevq_lat_wcmp(struct kevq *kevq1, struct kevq *kevq2, int pct1) +kevq_lat_wcmp(struct kevq *kevq1, struct kevq *kevq2) { if (kevq1 == kevq2) return 0; - - return kevq_exp_lat(kevq1) - (cache_pen + kevq_exp_lat(kevq2)); + // CTR4(KTR_KQ, "kevq_lat_wcmp: comparing %p: %ld with %p: %ld", kevq1, kevq_exp_lat(kevq1), kevq2, kevq_exp_lat(kevq2)); + return (cache_pen + kevq_exp_lat(kevq2)) - kevq_exp_lat(kevq1); } static inline int @@ -550,7 +565,7 @@ knote_enter_flux_ul(struct knote *kn) static void knote_enter_flux(struct knote *kn) { - /* CTR2(KTR_KQ, "knote_enter_flux: %p flux: %d", kn, kn->kn_influx); */ + CTR2(KTR_KQ, "knote_enter_flux: %p flux: %d", kn, kn->kn_influx); KN_FLUX_OWNED(kn); MPASS(kn->kn_influx < INT_MAX); kn->kn_influx++; @@ -570,11 +585,13 @@ knote_leave_flux_ul(struct knote *kn) static bool knote_leave_flux(struct knote *kn) { - /* CTR2(KTR_KQ, "knote_leave_flux: %p flux: %d", kn, kn->kn_influx); */ - KN_FLUX_OWNED(kn); + CTR2(KTR_KQ, "knote_leave_flux: %p flux: %d", kn, kn->kn_influx); + KN_FLUX_OWNED(kn); MPASS(kn->kn_influx > 0); kn->kn_influx--; + knote_flux_wakeup(kn); + return (kn->kn_influx == 0); } @@ -791,7 +808,7 @@ filt_proc(struct knote *kn, long hint) struct proc *p; u_int event; - CTR1(KTR_KQ, "KQUEUE: filt_proc called for kn %p", kn); + CTR2(KTR_KQ, "filt_proc called for kn %p, hint %ld", kn, hint); p = kn->kn_ptr.p_proc; if (p == NULL) /* already activated, from attach filter */ @@ -801,8 +818,10 @@ filt_proc(struct knote *kn, long hint) event = (u_int)hint & NOTE_PCTRLMASK; /* If the user is interested in this event, record it. */ - if (kn->kn_sfflags & event) + if (kn->kn_sfflags & event) { kn->kn_fflags |= event; + CTR2(KTR_KQ, "filt_proc: set fflags for kn %p: %d", kn, kn->kn_fflags); + } /* Process is gone, so flag the event as finished.
*/ if (event == NOTE_EXIT) { @@ -836,6 +855,8 @@ knote_fork(struct knlist *list, struct thread *td, int pid) int error; int event; + CTR2(KTR_KQ, "knote_fork for pid %d, tid %d", td->td_proc->p_pid, td->td_tid); + MPASS(list != NULL); KNL_ASSERT_LOCKED(list); if (SLIST_EMPTY(&list->kl_list)) @@ -843,6 +864,8 @@ knote_fork(struct knlist *list, struct thread *td, int pid) memset(&kev, 0, sizeof(kev)); SLIST_FOREACH(kn, &list->kl_list, kn_selnext) { + CTR2(KTR_KQ, "knote_fork processing knote %p for pid %d", kn, td->td_proc->p_pid); + kq = kn->kn_kq; kevq = kn->kn_org_kevq; @@ -861,13 +884,14 @@ knote_fork(struct knlist *list, struct thread *td, int pid) * The same as knote(), activate the event. */ if ((kn->kn_sfflags & NOTE_TRACK) == 0) { + CTR2(KTR_KQ, "knote_fork activating non-track knote %p for pid %d", kn, td->td_proc->p_pid); event = kn->kn_fop->f_event(kn, NOTE_FORK); KQ_UNLOCK(kq); if (event) knote_activate(kn); - KN_LEAVE_FLUX_WAKEUP(kn); + knote_leave_flux_ul(kn); continue; } @@ -896,7 +920,7 @@ knote_fork(struct knlist *list, struct thread *td, int pid) kev.fflags = kn->kn_sfflags; kev.data = kn->kn_id; /* parent */ kev.udata = kn->kn_kevent.udata;/* preserve udata */ - error = kqueue_register(kq, kevq, &kev, NULL, M_NOWAIT); + error = kqueue_register(kq, kevq, &kev, td, M_NOWAIT); if (error) kn->kn_fflags |= NOTE_TRACKERR; @@ -910,14 +934,17 @@ knote_fork(struct knlist *list, struct thread *td, int pid) kev.fflags = kn->kn_sfflags; kev.data = kn->kn_id; /* parent */ kev.udata = kn->kn_kevent.udata;/* preserve udata */ - error = kqueue_register(kq, kevq, &kev, NULL, M_NOWAIT); + error = kqueue_register(kq, kevq, &kev, td, M_NOWAIT); if (error) kn->kn_fflags |= NOTE_TRACKERR; - if (kn->kn_fop->f_event(kn, NOTE_FORK)) + + if (kn->kn_fop->f_event(kn, NOTE_FORK)) { + CTR2(KTR_KQ, "knote_fork activating track knote %p for pid %d", kn, td->td_proc->p_pid); knote_activate(kn); + } list->kl_lock(list->kl_lockarg); - KN_LEAVE_FLUX_WAKEUP(kn); + knote_leave_flux_ul(kn); } } @@ -1002,7 +1029,7 @@ filt_timerexpire(void *knx) knote_enter_flux_ul(kn); knote_activate(kn); - KN_LEAVE_FLUX_WAKEUP(kn); + knote_leave_flux_ul(kn); if ((kn->kn_flags & EV_ONESHOT) != 0) return; @@ -1570,8 +1597,8 @@ kqueue_kevent(struct kqueue *kq, struct kevq *kevq, struct thread *td, int nchan if ((KQSCHED_GET_SCHED(kq) & KEVQ_LAT_FLAGS) && (kevq->kevq_state & KEVQ_ACTIVE)) { - /* prob don't need the lock here as these are only accessible by one thread */ - if (kevq->kevq_last_nkev != 0) + /* kevq_last_nkev might be 0 if the thread is woken up by a signal */ + if (kevq->kevq_last_nkev != KEVQ_LAST_KERN && kevq->kevq_last_nkev != 0) { /* make sure we actually processed events last time */ cur_ts = get_cyclecount(); @@ -1589,13 +1616,16 @@ kqueue_kevent(struct kqueue *kq, struct kevq *kevq, struct thread *td, int nchan avg = cur_ts / kevq->kevq_last_nkev; CTR3(KTR_KQ, "kevent: td %d nkev %d kevent (avg) %ld ns", td->td_tid, kevq->kevq_last_nkev, avg); if (kevq->kevq_avg_lat != 0) { - kevq->kevq_avg_lat = calc_overtime_avg(kevq->kevq_avg_lat, avg, 80); + kevq->kevq_avg_lat = calc_overtime_avg(kevq->kevq_avg_lat, avg, 95); } else { kevq->kevq_avg_lat = avg; } CTR3(KTR_KQ, "kevent: td %d nkev %d kevent (new avg) %ld ns", td->td_tid, kevq->kevq_last_nkev, kevq->kevq_avg_lat); + /* reset kevq->kevq_last_kev and nkev */ + kevq->kevq_last_kev = KEVQ_LAST_KERN; + kevq->kevq_last_nkev = KEVQ_LAST_KERN; //kqdom_update_lat(kevq->kevq_kqd, avg); } } @@ -1770,7 +1800,7 @@ kqueue_register(struct kqueue *kq, struct kevq *kevq, struct
kevent *kev, struct int error, filt; int haskqglobal, filedesc_unlock; - CTR5(KTR_KQ, "kqueue_register: kq %p, kevq %p, ident: %d, filter: %d, flags: 0x%X", kq, kevq, (int)kev->ident, kev->filter, kev->flags); + CTR6(KTR_KQ, "kqueue_register: td %d kq %p, kevq %p, ident: %d, filter: %d, flags: 0x%X", td->td_tid, kq, kevq, (int)kev->ident, kev->filter, kev->flags); if ((kev->flags & (EV_ENABLE | EV_DISABLE)) == (EV_ENABLE | EV_DISABLE)) return (EINVAL); @@ -2026,7 +2056,7 @@ kqueue_register(struct kqueue *kq, struct kevq *kevq, struct kevent *kev, struct knote_activate(kn); kn->kn_status &= ~KN_SCAN; - KN_LEAVE_FLUX_WAKEUP(kn); + knote_leave_flux_ul(kn); kn_list_unlock(knl); @@ -2091,6 +2121,9 @@ kevq_init(struct kevq *kevq) { kevq->kn_marker->kn_kevq = kevq; kevq->kn_marker_rt->kn_kevq = kevq; + kevq->kevq_last_nkev = KEVQ_LAST_KERN; + kevq->kevq_last_kev = KEVQ_LAST_KERN; + kqueue_srandom(&kevq->kevq_rand_seed, (u_long)kevq); } @@ -2469,7 +2502,9 @@ kevq_dump(struct sbuf *buf, struct kevq *kevq, int level) "total_mismatches=\"%ld\" " "total_worksteal=\"%ld\" " "total_realtime=\"%ld\" " - "total_sched=\"%ld\" />\n", + "total_sched=\"%ld\" " + "last_kev=\"%ld\" " + "last_nkev=\"%d\" />\n", level * DUMP_INDENT, ' ', kevq, kevq->kn_count, kevq->kn_rt_count, kevq->kevq_avg_rlimit, kevq->kevq_tot_time, @@ -2481,7 +2516,9 @@ kevq_dump(struct sbuf *buf, struct kevq *kevq, int level) kevq->kevq_tot_kqd_mismatch, kevq->kevq_tot_ws, kevq->kevq_tot_realtime, - kevq->kevq_tot_sched); + kevq->kevq_tot_sched, + kevq->kevq_last_kev, + kevq->kevq_last_nkev); } static void @@ -2568,25 +2605,25 @@ kqdom_update_parents(struct kqdom *kqd, int direction) } } -static void -kqdom_update_lat(struct kqdom *leaf, uint64_t avg) -{ - /* We don't need this function for now */ - KASSERT(0, ("kqdom_update_lat called")); +// static void +// kqdom_update_lat(struct kqdom *leaf, uint64_t avg) +// { +// /* We don't need this function for now */ +// KASSERT(0, ("kqdom_update_lat called")); - while(leaf != NULL) { - if (leaf->avg_lat != 0) { - // bit rot race here? - leaf->avg_lat = calc_overtime_avg(leaf->avg_lat, avg, 80); - } else { - leaf->avg_lat = avg; - } +// while(leaf != NULL) { +// if (leaf->avg_lat != 0) { +// // bit rot race here? 
+// leaf->avg_lat = calc_overtime_avg(leaf->avg_lat, avg, 80); +// } else { +// leaf->avg_lat = avg; +// } - CTR2(KTR_KQ, "kqdom_update_lat: updated avg lat %ld us for kqdom %d", leaf->avg_lat, leaf->id); +// CTR2(KTR_KQ, "kqdom_update_lat: updated avg lat %ld us for kqdom %d", leaf->avg_lat, leaf->id); - leaf = leaf->parent; - } -} +// leaf = leaf->parent; +// } +// } /* Mirror the cpu_group structure */ @@ -2794,9 +2831,9 @@ kevq_worksteal(struct kevq *kevq) KASSERT(tgt_count <= 8, ("too many kevq ws knotes")); KVLST_RLOCK(kq); - other_kevq = kevq_vec_select_kevq(&kq->kevq_vlist, 1, kqueue_random(&kevq->kevq_rand_seed)); + other_kevq = kvlst_sel_kevq(&kq->kevq_vlist, 1, kqueue_random(&kevq->kevq_rand_seed), NULL, kevq); /* fast fail */ - if (other_kevq != kevq && kevq_stealable(other_kevq)) { + if (other_kevq != NULL && other_kevq != kevq && kevq_stealable(other_kevq)) { if (KEVQ_TRYLOCK(other_kevq)) { if (!kevq_stealable(other_kevq)) { KEVQ_UNLOCK(other_kevq); @@ -2870,8 +2907,8 @@ kevq_worksteal(struct kevq *kevq) kevq->kevq_tot_ws += ws_count; for (int i = 0; i < ws_count; i++) { knote_enqueue_head(ws_lst[i], kevq); - KN_LEAVE_FLUX_WAKEUP(ws_lst[i]); - //CTR4(KTR_KQ, "kevq_worksteal: kevq %p stole kn %p, ident: %d from kevq %p", kevq, ws_lst[i], ws_lst[i]->kn_id, other_kevq); + knote_leave_flux_ul(ws_lst[i]); + CTR4(KTR_KQ, "kevq_worksteal: kevq %p stole kn %p, ident: %d from kevq %p", kevq, ws_lst[i], ws_lst[i]->kn_id, other_kevq); } } @@ -2982,7 +3019,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, if (kevq->kevq_avg_rlimit == 0) { kevq->kevq_avg_rlimit = rtlimit; } else { - kevq->kevq_avg_rlimit = calc_overtime_avg(kevq->kevq_avg_rlimit, rtlimit, 80); + kevq->kevq_avg_rlimit = calc_overtime_avg(kevq->kevq_avg_rlimit, rtlimit, 95); } rsbt = 0; if (tsp != NULL) { @@ -3026,6 +3063,27 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, rtmarker->kn_status = KN_MARKER; rtmarker->kn_flags = EV_REALTIME; } + + /* for detecting imbalances only */ + if (kevq_total_knote(kevq) >= log_threshold) { + struct sbuf buf; + char * rbuf; + + rbuf = malloc(1024 * 1024 * sizeof(char), M_KQUEUE, M_NOWAIT); + + if (rbuf != NULL) { + sbuf_new(&buf, rbuf, 1024 * 1024, SBUF_FIXEDLEN | SBUF_INCLUDENUL); + + kqueue_dump(kq, &buf); + + sbuf_finish(&buf); + + uprintf("%s\n", sbuf_data(&buf)); + + sbuf_delete(&buf); + free(rbuf, M_KQUEUE); + } + } KEVQ_LOCK(kevq); @@ -3125,7 +3183,8 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, influx = 0; - while (count < maxevents) { + /* XXX : performance: we should only traverse all knotes in SKQ mode. 
*/ + while (1) { KEVQ_OWNED(kevq); /* fullfill the limit first */ @@ -3145,10 +3204,11 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, pri = 0; kn = TAILQ_FIRST(kntq); } - + KASSERT(kn != NULL, ("kqueue_scan dequeued NULL")); KN_FLUX_LOCK(kn); + if ((kn->kn_status == KN_MARKER && kn != marker && kn != rtmarker) || kn_in_flux(kn)) { if (influx) { @@ -3167,17 +3227,11 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, continue; } - /* Now we have exclusive access to kn */ + CTR5(KTR_KQ, "kqueue_scan: td %d on kevq %p dequeued knote %p, fflags: %d, curr %d", td->td_tid, kevq, kn, kn->kn_fflags, curr); + TAILQ_REMOVE(kntq, kn, kn_tqe); - CTR4(KTR_KQ, "kqueue_scan: td %d on kevq %p dequeued knote %p, curr %d", td->td_tid, kevq, kn, curr); - if ((kn->kn_status & KN_DISABLED) == KN_DISABLED) { - kn->kn_status &= ~(KN_QUEUED | KN_PROCESSING | KN_WS); - *kncnt -= 1; - KN_FLUX_UNLOCK(kn); - continue; - } - + /* check marker first (exit condition) */ if (kn == marker || kn == rtmarker) { /* We are dequeuing our marker, wakeup threads waiting on it */ knote_flux_wakeup(kn); @@ -3196,6 +3250,13 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, goto done; } + if ((kn->kn_status & KN_DISABLED) == KN_DISABLED) { + kn->kn_status &= ~(KN_QUEUED | KN_PROCESSING | KN_WS); + *kncnt -= 1; + KN_FLUX_UNLOCK(kn); + continue; + } + KASSERT(!kn_in_flux(kn), ("knote %p is unexpectedly in flux", kn)); @@ -3231,6 +3292,9 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, knote_enter_flux(kn); KN_FLUX_UNLOCK(kn); KEVQ_UNLOCK(kevq); + + /* Now we dropped the kevq lock and kn is influx */ + if ((kn->kn_status & KN_KQUEUE) == KN_KQUEUE) { /* TODO: we are waiting for another kqueue */ @@ -3239,40 +3303,66 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, knl = kn_list_lock(kn); fevent = kn->kn_fop->f_event(kn, 0); - /* return ALL knotes */ - if (kn->kn_status & KN_WS) { + /* resched knotes that were processed last time - only for SKQ mode */ + if (KQSCHED_GET_SCHED(kq) && (kn->kn_status & KN_PROCESSING) == KN_PROCESSING) { KQ_GLOBAL_UNLOCK(&kq_global, haskqglobal); + kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE | KN_SCAN | KN_PROCESSING | KN_WS); if (fevent) { knote_activate(kn); } - knote_leave_flux_ul(kn); - + KEVQ_LOCK(kevq); *kncnt -= 1; + + CTR3(KTR_KQ, "kqueue_scan: td %d, kevq %p resched processed knote %p", td->td_tid, kevq, kn); + + knote_leave_flux_ul(kn); + kn_list_unlock(knl); + influx = 1; + continue; + } + + if (count >= maxevents) { + KQ_GLOBAL_UNLOCK(&kq_global, haskqglobal); + kn->kn_status &= ~KN_SCAN; + + KEVQ_LOCK(kevq); + /* don't continue processing events when we don't have enough space for it, re-insert at the end */ + TAILQ_INSERT_TAIL(kntq, kn, kn_tqe); + + CTR3(KTR_KQ, "kqueue_scan: td %d, kevq %p ignored overflow knote %p", td->td_tid, kevq, kn); + + knote_leave_flux_ul(kn); kn_list_unlock(knl); influx = 1; - CTR3(KTR_KQ, "kqueue_scan: td %d, kevq %p returned stolen knote %p", td->td_tid, kevq, kn); continue; } if (fevent == 0) { - KEVQ_LOCK(kevq); KQ_GLOBAL_UNLOCK(&kq_global, haskqglobal); + kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE | KN_SCAN | KN_PROCESSING | KN_WS); - knote_leave_flux_ul(kn); + + KEVQ_LOCK(kevq); *kncnt -= 1; + + CTR4(KTR_KQ, "kqueue_scan: kn %p, ident: %d not asserted anymore for kevq %p, td %d", kn, kn->kn_id, kevq, td->td_tid); + + knote_leave_flux_ul(kn); kn_list_unlock(knl); influx = 1; - CTR4(KTR_KQ, "kqueue_scan: kn %p, ident: %d 
not valid anymore for kevq %p, td %d", kn, kn->kn_id, kevq, td->td_tid); continue; } touch = (!kn->kn_fop->f_isfd && kn->kn_fop->f_touch != NULL); if (touch) kn->kn_fop->f_touch(kn, kevp, EVENT_PROCESS); - else + else { + CTR4(KTR_KQ, "kqueue_scan: td %d, kevq %p WRITTEN to KEVP knote %p, fflags: %d", td->td_tid, kevq, kn, kn->kn_fflags); *kevp = kn->kn_kevent; + } + KEVQ_LOCK(kevq); KQ_GLOBAL_UNLOCK(&kq_global, haskqglobal); if (kn->kn_flags & (EV_CLEAR | EV_DISPATCH)) { @@ -3303,8 +3393,12 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, knote_leave_flux_ul(kn); kn_list_unlock(knl); influx = 1; - } + CTR4(KTR_KQ, "kqueue_scan: td %d, kevq %p finished scanning knote %p, fflags: %d", td->td_tid, kevq, kn, kn->kn_fflags); + } + + KASSERT(count < maxevents, ("count >= maxevents")); + /* we are returning a copy to the user */ kevp++; nkev++; @@ -3367,7 +3461,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, if (kevq->kevq_avg_ev == 0) { kevq->kevq_avg_ev = count; } else { - kevq->kevq_avg_ev = calc_overtime_avg(kevq->kevq_avg_ev, count, 80); + kevq->kevq_avg_ev = calc_overtime_avg(kevq->kevq_avg_ev, count, 95); } } @@ -3377,9 +3471,34 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, error = k_ops->k_copyout(k_ops->arg, keva, nkev); } td->td_retval[0] = count; + + CTR2(KTR_KQ, "Kqueue_scan RETURNED for tid %d, total %d events!\n", td->td_tid, count); return (error); } +static void +kqueue_dump(struct kqueue *kq, struct sbuf *buf) +{ + sbuf_printf(buf, "\n", kq, + kq->kq_ssched, kq->kq_ssargs, kq->kq_sfeat, + kq->kq_sfargs,kq->kq_rtshare, kq->kq_tfreq); + sbuf_printf(buf, "\n%*c\n", 1 * DUMP_INDENT, ' '); + KVLST_RLOCK(kq); + for(int i = 0; i < veclist_size(&kq->kevq_vlist); i++) { + kevq_dump(buf, veclist_at(&kq->kevq_vlist, i), 2); + } + KVLST_RUNLOCK(kq); + sbuf_printf(buf, "%*c\n", 1 * DUMP_INDENT, ' '); + + /* dump kqdom if used */ + if (KQSCHED_GET_SCHED(kq) & KQDOM_FLAGS) { + sbuf_printf(buf, "\n%*c\n", 1 * DUMP_INDENT, ' '); + kqdom_dump(buf, kq->kq_kqd, 2); + sbuf_printf(buf, "%*c\n", 1 * DUMP_INDENT, ' '); + } + sbuf_printf(buf, "\n\n"); +} + /*ARGSUSED*/ static int kqueue_ioctl(struct file *fp, u_long cmd, void *data, @@ -3483,33 +3602,16 @@ kqueue_ioctl(struct file *fp, u_long cmd, void *data, KQ_UNLOCK(kq); break; case FKQMPRNT: - + rbuf = malloc(1024 * 1024 * sizeof(char), M_KQUEUE, M_WAITOK); sbuf_new(&buf, rbuf, 1024 * 1024, SBUF_FIXEDLEN | SBUF_INCLUDENUL); - + if (kq->kq_flags & KQ_FLAG_MULTI) { - sbuf_printf(&buf, "\n", kq, - kq->kq_ssched, kq->kq_ssargs, kq->kq_sfeat, - kq->kq_sfargs,kq->kq_rtshare, kq->kq_tfreq); - - sbuf_printf(&buf, "\n%*c\n", 1 * DUMP_INDENT, ' '); - KVLST_RLOCK(kq); - for(int i = 0; i < veclist_size(&kq->kevq_vlist); i++) { - kevq_dump(&buf, veclist_at(&kq->kevq_vlist, i), 2); - } - KVLST_RUNLOCK(kq); - sbuf_printf(&buf, "%*c\n", 1 * DUMP_INDENT, ' '); - - /* dump kqdom if used */ - if (KQSCHED_GET_SCHED(kq) & KQDOM_FLAGS) { - sbuf_printf(&buf, "\n%*c\n", 1 * DUMP_INDENT, ' '); - kqdom_dump(&buf, kq->kq_kqd, 2); - sbuf_printf(&buf, "%*c\n", 1 * DUMP_INDENT, ' '); - } - sbuf_printf(&buf, "\n\n"); + kqueue_dump(kq, &buf); } else { error = (EINVAL); + break; } sbuf_finish(&buf); @@ -3699,7 +3801,7 @@ kevq_drain(struct kevq *kevq, struct thread *td) KEVQ_LOCK(kevq); } - KN_LEAVE_FLUX_WAKEUP(kn); + knote_leave_flux_ul(kn); } KASSERT(kevq_total_knote(kevq) == 0, ("some knotes are left")); @@ -3989,7 +4091,7 @@ knote(struct knlist *list, long hint, int lockflags) if 
(kn_active) knote_activate(kn); - KN_LEAVE_FLUX_WAKEUP(kn); + knote_leave_flux_ul(kn); } } if ((lockflags & KNF_LISTLOCKED) == 0) @@ -4010,9 +4112,9 @@ knote_flux_wakeup(struct knote *kn) { KN_FLUX_OWNED(kn); if (kn->kn_fluxwait) { - CTR1(KTR_KQ, "waking up kn %p", kn); + CTR1(KTR_KQ, "knote_flux_wakeup: waking up %p", kn); kn->kn_fluxwait = 0; - wakeup(kn); + wakeup(kn); } } @@ -4354,7 +4456,9 @@ knote_fdclose(struct thread *td, int fd) kn->kn_fluxwait = 1; KQ_UNLOCK(kq); + CTR2(KTR_KQ, "knote_fdclose: thread %d waiting on knote %p", td->td_tid, kn); msleep(kn, &kn->kn_fluxlock, PSOCK | PDROP, "kqflxwt4", 0); + CTR2(KTR_KQ, "knote_fdclose: thread %d woke up from knote %p", td->td_tid, kn); KQ_LOCK(kq); goto again; @@ -4409,6 +4513,7 @@ knote_drop_detached(struct knote *kn, struct thread *td) struct kevq *kevq; struct klist *list; + CTR2(KTR_KQ, "knote_drop_detached: td %d dropping knote %p", td->td_tid, kn); kq = kn->kn_kq; KASSERT((kn->kn_status & KN_DETACHED) != 0, @@ -4435,7 +4540,7 @@ knote_drop_detached(struct knote *kn, struct thread *td) KEVQ_UNLOCK(kevq); } - KN_LEAVE_FLUX_WAKEUP(kn); + knote_leave_flux_ul(kn); KQ_UNLOCK(kq); if (kn->kn_fop->f_isfd) { @@ -4448,35 +4553,54 @@ knote_drop_detached(struct knote *kn, struct thread *td) } static struct kevq * -kevq_vec_select_kevq(struct veclist *lst, int num_rand, u_long rand) +kvlst_sel_dist_kevq(struct veclist *lst, u_long rand, struct kevq *kevq_to_skip) { + struct kevq *ret = NULL; + int idx; int sz; - struct kevq *cur_kevq = NULL, *next_kevq; + sz = veclist_size(lst); + + if (kevq_to_skip != NULL) { + sz--; + } + + if (sz > 0) { + idx = rand % sz; + ret = veclist_at(lst, idx); + if (ret == kevq_to_skip) { + KASSERT(idx + 1 < veclist_size(lst), ("kvlist_sel_dist_kevq overflow")); + ret = veclist_at(lst, idx + 1); + } + } + + return ret; +} + +/* + * kevq_cmp_f: kevq1, kevq2. if kevq1 is more favorable than kevq2, return > 0 + */ +static struct kevq * +kvlst_sel_kevq(struct veclist *lst, int num_rand, u_long rand, long (*kevq_cmp_f)(struct kevq*, struct kevq*), struct kevq *kevq_to_skip) +{ + struct kevq *cur_kevq = NULL; + struct kevq *next_kevq; /* XXX: hack */ KASSERT(num_rand <= 2, ("too large num_rand")); - //CTR1(KTR_KQ, "kevq_vec_select_kevq: num - %d", num_rand); - - sz = veclist_size(lst); - - if (sz > 0) { + if (veclist_size(lst) > 0) { for (int i = 0; i < num_rand; i++) { - next_kevq = veclist_at(lst, rand % sz); + next_kevq = kvlst_sel_dist_kevq(lst, rand, kevq_to_skip); - //CTR5(KTR_KQ, "kevq_vec_select_kevq: candidate kevq %p [%d] avg lat %ld #kn %d Tot: %ld", next_kevq, i, next_kevq->kevq_avg_lat, next_kevq->kn_count, next_kevq->kevq_avg_lat * next_kevq->kn_count); - - if (cur_kevq == NULL || (next_kevq != NULL && kevq_lat_cmp(cur_kevq, next_kevq) > 0)) { + if (cur_kevq == NULL || (next_kevq != NULL && kevq_cmp_f != NULL && kevq_cmp_f(next_kevq, cur_kevq) > 0)) { cur_kevq = next_kevq; + kevq_to_skip = cur_kevq; } /* XXX: hack, 256 queues max */ rand = rand >> 8; } } - - //CTR2(KTR_KQ, "kevq_vec_select_kevq: selected kevq %p Tot: %ld", cur_kevq, cur_kevq == NULL ? 
0 : cur_kevq->kevq_avg_lat * cur_kevq->kn_count); - return cur_kevq; } @@ -4590,14 +4714,16 @@ knote_next_kevq(struct knote *kn) KASSERT(kqdom_is_leaf(kqd), ("found kqdom not leaf")); KQD_RLOCK(kqd); - next_kevq = kevq_vec_select_kevq(&kqd->kqd_kevqs, 1, kqueue_random(&kn->kn_rand_seed)); + /* pick a random kevq */ + next_kevq = kvlst_sel_kevq(&kqd->kqd_kevqs, 1, kqueue_random(&kn->kn_rand_seed), NULL, NULL); if (sargs > 0) { KVLST_RLOCK(kq); - other_kevq = kevq_vec_select_kevq(&kq->kevq_vlist, sargs, kqueue_random(&kn->kn_rand_seed)); + other_kevq = kvlst_sel_kevq(&kq->kevq_vlist, sargs, kqueue_random(&kn->kn_rand_seed), kevq_lat_cmp, next_kevq); - if (next_kevq == NULL || (other_kevq != NULL && kevq_lat_wcmp(next_kevq, other_kevq, 90) > 0)) { + if (next_kevq == NULL || (other_kevq != NULL && kevq_lat_wcmp(other_kevq, next_kevq) > 0)) { next_kevq = other_kevq; + CTR1(KTR_KQ, "knote_next_kevq: [QUEUE] new selected kevq: %p", next_kevq); } } @@ -4619,7 +4745,7 @@ knote_next_kevq(struct knote *kn) case KQ_SCHED_BEST: KVLST_RLOCK(kq); - next_kevq = kevq_vec_select_kevq(&kq->kevq_vlist, sargs, kqueue_random(&kn->kn_rand_seed)); + next_kevq = kvlst_sel_kevq(&kq->kevq_vlist, sargs, kqueue_random(&kn->kn_rand_seed), kevq_lat_cmp, NULL); next_kevq = kevq_lock_check_avail(next_kevq); KVLST_RUNLOCK(kq); diff --git a/sys/sys/eventvar.h b/sys/sys/eventvar.h index 240e3d71ad7e..b14c593cc9b1 100644 --- a/sys/sys/eventvar.h +++ b/sys/sys/eventvar.h @@ -56,6 +56,10 @@ struct kevq { uint64_t kevq_avg_ev; uint64_t kevq_tot_ev; uint64_t kevq_tot_time; + /* the following two are only set when the thread is processing in userspace + * in the kernel they are set to the special value KEVQ_LAST_KERN + */ +#define KEVQ_LAST_KERN (0) uint64_t kevq_last_kev; uint32_t kevq_last_nkev; #define KEVQ_SLEEP 0x01 diff --git a/tests/sys/kqueue/libkqueue/Makefile b/tests/sys/kqueue/libkqueue/Makefile index 10672d2d1466..4178de63dde7 100644 --- a/tests/sys/kqueue/libkqueue/Makefile +++ b/tests/sys/kqueue/libkqueue/Makefile @@ -16,7 +16,9 @@ SRCS.kqtest= \ proc.c \ signal.c \ read_m.c \ - user.c + user.c \ + close_m.c + WARNS?= 2 LDADD+= -lthr .include diff --git a/tests/sys/kqueue/libkqueue/close_m.c b/tests/sys/kqueue/libkqueue/close_m.c new file mode 100644 index 000000000000..3040e75b443c --- /dev/null +++ b/tests/sys/kqueue/libkqueue/close_m.c @@ -0,0 +1,151 @@ + +#include "common.h" +#include "common_m.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +#define SOCK_CLOSE_PROB (30) +#define CLOSER_RAND_DELAY (17) +#define QUEUE_RAND_DELAY (7) +#define CLOSER_TOT_SOCK (8) +#define PACKET_CNT (CLOSER_TOT_SOCK * 1024) +#define THREAD_CNT (8) + +static int close_socks[CLOSER_TOT_SOCK][2]; +static volatile int close_stop; +static int g_kqfd; + +static void * +socket_closer(void* args) +{ + struct kevent kev; + + while(!close_stop) { + int ran = rand() % CLOSER_TOT_SOCK; + printf("closed idx %d...\n", ran); + + close(close_socks[ran][0]); + close(close_socks[ran][1]); + + /* events are supposed to clean up themselves after fd invalidates */ + + if (socketpair(AF_UNIX, SOCK_STREAM, 0, &close_socks[ran][0]) < 0) { + err(1, "kevent_socket"); + } + + EV_SET(&kev, close_socks[ran][0], EVFILT_READ, EV_ADD, 0, 0, &close_socks[ran][0]); + + if (kevent(g_kqfd, &kev, 1, NULL, 0, NULL) == -1) { + err(1, "kevent_brutal_add"); + } + + usleep(rand() % CLOSER_RAND_DELAY); + } + + return NULL; } + +static void * +socket_worker(void* args) +{ + char dat; + struct kevent *ret; + + while (1) { + ret =
kevent_get(g_kqfd); + + + printf("processing packet...\n"); + + dat = socket_pop_igerr(ret->ident); + + if (dat == 'e') + break; + + free(ret); + } + + return NULL; +} + +void +test_socket_close(char* name) +{ + pthread_t workers[THREAD_CNT]; + pthread_t closer; + + char id[256]; + struct kevent kev; + + const char *test_id = "[Multi]kevent(close) - "; + + strcpy(id, test_id); + strcat(id, name); + + test_begin(id); + + close_stop = 0; + srand(time(NULL)); + + int flags = KQSCHED_MAKE(KQ_SCHED_CPU, 2, 0, 0); + g_kqfd = kqueue(); + int error = ioctl(g_kqfd, FKQMULTI, &flags); + if (error == -1) { + err(1, "ioctl"); + } + + for (int i = 0; i < CLOSER_TOT_SOCK; i++) { + + if (socketpair(AF_UNIX, SOCK_STREAM, 0, &close_socks[i][0]) < 0) { + err(1, "kevent_socket"); + } + + EV_SET(&kev, close_socks[i][0], EVFILT_READ, EV_ADD, 0, 0, &close_socks[i][0]); + + if (kevent(g_kqfd, &kev, 1, NULL, 0, NULL) == -1) { + err(1, "kevent_brutal_add"); + } + } + + for (int i = 0; i < THREAD_CNT; i++) { + pthread_create(&workers[i], NULL, socket_worker, NULL); + } + pthread_create(&closer, NULL, socket_closer, NULL); + + + for(int i = 0; i < PACKET_CNT; i++) { + socket_push_igerr(close_socks[rand() % CLOSER_TOT_SOCK][1], '.'); + usleep(rand() % QUEUE_RAND_DELAY); + } + + printf("Stopping closer...\n"); + close_stop = 1; + pthread_join(closer, NULL); + printf("Closer stopped!\n"); + + for (int i = 0; i < THREAD_CNT; i++) { + socket_push(close_socks[rand() % CLOSER_TOT_SOCK][1], 'e'); + } + for (int i = 0; i < THREAD_CNT; i++) { + pthread_join(workers[i], NULL); + } + + for (int i = 0; i < CLOSER_TOT_SOCK; i++) { + close(close_socks[i][0]); + close(close_socks[i][1]); + } + + printf("Threads stopped!\n"); + + success(); +} + diff --git a/tests/sys/kqueue/libkqueue/common_m.h b/tests/sys/kqueue/libkqueue/common_m.h new file mode 100644 index 000000000000..bc5314b78b89 --- /dev/null +++ b/tests/sys/kqueue/libkqueue/common_m.h @@ -0,0 +1,39 @@ +#ifndef _COMMON_M_H +#define _COMMON_M_H + +#include "common.h" + +static inline char +socket_pop(int sockfd) +{ + char buf; + + if (read(sockfd, &buf, 1) < 1) + err(1, "read(2)"); + + return buf; +} + +static inline char +socket_pop_igerr(int sockfd) +{ + char buf = 0; + read(sockfd, &buf, 1); + return buf; +} + +static inline void +socket_push(int sockfd, char ch) +{ + if (write(sockfd, &ch, 1) < 1) { + err(1, "write(2)"); + } +} + +static inline void +socket_push_igerr(int sockfd, char ch) +{ + write(sockfd, &ch, 1); +} + +#endif \ No newline at end of file diff --git a/tests/sys/kqueue/libkqueue/read_m.c b/tests/sys/kqueue/libkqueue/read_m.c index 403ae4da23f8..2f2a0b0bea3e 100644 --- a/tests/sys/kqueue/libkqueue/read_m.c +++ b/tests/sys/kqueue/libkqueue/read_m.c @@ -17,13 +17,17 @@ */ #include "common.h" +#include "common_m.h" +#include #include #include #include #include #include #include +#include +#include //#define TEST_DEBUG @@ -42,7 +46,7 @@ struct thread_info { * Read test */ -#define THREAD_CNT (16) +#define THREAD_CNT (8) #define PACKET_CNT (1600) static int g_kqfd; @@ -67,35 +71,6 @@ dump_gkq() } } -static char -socket_pop(int sockfd) -{ - char buf; - - /* Drain the read buffer, then make sure there are no more events. 
*/ -#ifdef TEST_DEBUG - printf("READ_M: popping the read buffer of sock %d\n", sockfd); -#endif - if (read(sockfd, &buf, 1) < 1) - err(1, "read(2)"); - - return buf; -} - -static void -socket_push(int sockfd, char ch) -{ -#ifdef TEST_DEBUG - printf("READ_M: pushing to socket %d\n", sockfd); -#endif - if (write(sockfd, &ch, 1) < 1) { -#ifdef TEST_DEBUG - printf("READ_M: write failed with %d\n", errno); -#endif - err(1, "write(2)"); - } -} - /*************************** * Read test ***************************/ @@ -634,24 +609,22 @@ test_socket_ws_timeout() success(); } - /*************************** * Brutal test ***************************/ #define THREAD_BRUTE_CNT (8) -#define SOCK_BRUTE_CNT (512) -#define PACKET_BRUTE_CNT (100 * (SOCK_BRUTE_CNT)) +#define SOCK_BRUTE_CNT (256) +#define PACKET_BRUTE_CNT (256 * (SOCK_BRUTE_CNT)) #define THREAD_EXIT_PROB (50) #define BRUTE_REALTIME_PROB (50) #define BRUTE_MAX_FREQ (10000) #define BRUTE_MIN_FREQ (1) -#define RAND_SLEEP (29) +#define RAND_SLEEP (13) #define RAND_SEND_SLEEP (7) - -int brute_sockfd[SOCK_BRUTE_CNT][2]; -struct thread_info brute_threadinfo[THREAD_BRUTE_CNT]; +static int brute_sockfd[SOCK_BRUTE_CNT][2]; +static struct thread_info brute_threadinfo[THREAD_BRUTE_CNT]; static void* test_socket_brutal_worker(void* args) @@ -691,11 +664,12 @@ test_socket_brutal_worker(void* args) #endif dat = socket_pop(ret->ident); - free(ret); if (dat == 'e') break; + free(ret); + info->evcnt++; usleep(rand() % RAND_SLEEP); @@ -723,7 +697,6 @@ test_socket_brutal(char* name) srand(time(NULL)); - for (int i = 0; i < SOCK_BRUTE_CNT; i++) { /* Create a connected pair of full-duplex sockets for testing socket events */ @@ -740,8 +713,6 @@ test_socket_brutal(char* name) } } - srand(time(NULL)); - #ifdef TEST_DEBUG printf("READ_M: creating %d threads...\n", THREAD_BRUTE_CNT); #endif @@ -910,12 +881,18 @@ test_socket_realtime() success(); } +extern void +test_socket_close(char* name); + void test_evfilt_read_m() { int flags = 0; int error; + /* close test */ + //test_socket_close("default"); + /* Default rand */ flags = 0; g_kqfd = kqueue();
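
Note on the scheduling change: knote_next_kevq() together with kvlst_sel_kevq(), kevq_lat_cmp() and kevq_lat_wcmp() implements a two-random-choices placement policy. Each candidate kevq is scored by an expected completion time derived from its average per-event latency, its current queue depth, and its last activity timestamp, and the remote candidate is weighed against the locally preferred kevq using the cache-miss penalty (kern.kq_cache_pen) before a knote is migrated. The standalone userspace sketch below illustrates that policy in a simplified, conservative form (migrate only when the remote queue wins by more than the penalty); struct q, exp_done(), pick(), CACHE_PEN and the sample numbers are invented for the illustration and are not part of the patch.

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

/* simplified stand-in for a kevq: all fields are illustrative */
struct q {
	uint64_t avg_lat;	/* average cycles spent per event */
	uint64_t depth;		/* events currently queued */
	uint64_t last_act;	/* timestamp (cycles) of last activity */
};

#define CACHE_PEN	1000	/* stand-in for kern.kq_cache_pen */

/* expected completion time, mirroring the idea behind kevq_exp_lat() */
static uint64_t
exp_done(const struct q *q, uint64_t now)
{
	uint64_t busy = q->avg_lat * q->depth;
	uint64_t last = q->last_act;

	/* an idle or stale queue should not look busier than 'busy' cycles */
	if (now > busy && last < now - busy)
		last = now - busy;
	return (last + q->avg_lat * (q->depth + 1));
}

/* two-choice policy: keep the local pick unless a random remote pick
 * still wins after being charged the cache-miss penalty */
static struct q *
pick(struct q *local, struct q *all, int n, uint64_t now)
{
	struct q *remote = &all[rand() % n];

	if (remote != local &&
	    exp_done(remote, now) + CACHE_PEN < exp_done(local, now))
		return (remote);
	return (local);
}

int
main(void)
{
	struct q qs[4] = {
		{ 100, 50, 9000 }, { 100, 1, 0 }, { 200, 2, 0 }, { 100, 0, 0 }
	};
	uint64_t now = 10000;

	srand(time(NULL));
	printf("selected queue #%ld\n", (long)(pick(&qs[0], qs, 4, now) - qs));
	return (0);
}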