diff --git a/sys/kern/kern_event.c b/sys/kern/kern_event.c index 98e0332759fa..3405f7329eef 100644 --- a/sys/kern/kern_event.c +++ b/sys/kern/kern_event.c @@ -154,6 +154,7 @@ MTX_SYSINIT(kq_global, &kq_global, "kqueue order", MTX_DEF); } while (0) TASKQUEUE_DEFINE_THREAD(kqueue_ctx); +//TASKQUEUE_DEFINE_THREAD(kqueue_tmr); extern struct cpu_group *cpu_top; @@ -164,6 +165,11 @@ calc_overtime_avg(uint64_t prev, uint64_t cur, uint32_t prev_pct) return (prev * prev_pct + cur * (100 - prev_pct)) / 100; } +static int +kevq_dbg_count_knotes(struct kevq *kevq); +static void +kevq_dbg_chk_knotes(struct kevq *kevq); +static void kevq_rel_proc_kn(struct kevq *kevq); static struct kevq * kevqlist_find(struct kevqlist *kevq_list, struct kqueue *kq); static void kevq_thred_init(struct kevq_thred *kevq_th); static void kevq_thred_destroy(struct kevq_thred *kevq_th); @@ -181,6 +187,7 @@ static struct knote * kevq_peek_knote(struct kevq *kevq); static inline void kevq_delete_knote(struct kevq *kevq, struct knote *kn); static void kevq_insert_knote(struct kevq *kevq, struct knote *kn); static int kevq_total_knote(struct kevq *kevq); +static int kevq_avail_knote(struct kevq *kevq); static void kevq_insert_head_knote(struct kevq *kevq, struct knote *kn); static void knote_enqueue_head(struct knote *kn, struct kevq *kevq); @@ -246,6 +253,8 @@ static void knote_activate(struct knote *kn); static int knote_attach(struct knote *kn, struct kqueue *kq); static void knote_drop(struct knote *kn, struct thread *td); static void knote_drop_detached(struct knote *kn, struct thread *td); +static void knote_proc_enqueue(struct knote *kn, struct kevq *kevq); +static void knote_proc_dequeue(struct knote *kn); static void knote_enqueue(struct knote *kn, struct kevq *kevq); static void knote_dequeue(struct knote *kn); static void knote_init(void); @@ -567,6 +576,9 @@ static void knote_enter_flux(struct knote *kn) { CTR2(KTR_KQ, "knote_enter_flux: %p flux: %d", kn, kn->kn_influx); + if (kn->kn_influx != 0) { + panic("knote %p flux error", kn); + } KN_FLUX_OWNED(kn); MPASS(kn->kn_influx < INT_MAX); kn->kn_influx++; @@ -719,9 +731,9 @@ filt_kqueue(struct knote *kn, long hint) if (kevq == NULL) { return 0; } else { - kn->kn_data = kevq_total_knote(kevq); - return (kn->kn_data > 0); -} + kn->kn_data = kevq_avail_knote(kevq); + return (kn->kn_data > 0); + } } /* XXX - move to kern_proc.c? */ @@ -1028,10 +1040,29 @@ filt_timerexpire(void *knx) kn = knx; kn->kn_data++; - knote_enter_flux_ul(kn); +// busy wait, shouldn't be a big problem + + CTR1(KTR_KQ, "timerexpire: for kn %p start\n", kn); +//retry: + KN_FLUX_LOCK(kn); + if (kn->kn_drop) { + KN_FLUX_UNLOCK(kn); + CTR1(KTR_KQ, "timerexpire: kn %p receive dropped\n", kn); + goto skip; + } + if (kn_in_flux(kn)) { + KN_FLUX_UNLOCK(kn); + //CTR1(KTR_KQ, "timerexpire: kn %p retrying\n", kn); + //XXX: shouldn't skip in this case + goto skip; + } + knote_enter_flux(kn); + KN_FLUX_UNLOCK(kn); + knote_activate(kn); knote_leave_flux_ul(kn); +skip: if ((kn->kn_flags & EV_ONESHOT) != 0) return; kc = kn->kn_ptr.p_v; @@ -1040,6 +1071,8 @@ filt_timerexpire(void *knx) kc->next += kc->to; callout_reset_sbt_on(&kc->c, kc->next, 0, filt_timerexpire, kn, PCPU_GET(cpuid), C_ABSOLUTE); + + CTR1(KTR_KQ, "timerexpire: for kn %p end\n", kn); } /* @@ -1124,8 +1157,14 @@ filt_timerdetach(struct knote *kn) struct kq_timer_cb_data *kc; unsigned int old __unused; + KN_FLUX_LOCK(kn); + kn->kn_drop = 1; + CTR1(KTR_KQ, "timerdetach: kn %p set dropped\n", kn); + KN_FLUX_UNLOCK(kn); + kc = kn->kn_ptr.p_v; callout_drain(&kc->c); + CTR1(KTR_KQ, "timerdetach: kn %p callout drained\n", kn); free(kc, M_KQUEUE); old = atomic_fetchadd_int(&kq_ncallouts, -1); KASSERT(old > 0, ("Number of callouts cannot become negative")); @@ -2111,6 +2150,7 @@ kevq_init(struct kevq *kevq) { mtx_init(&kevq->lock, "kevq", NULL, MTX_DEF | MTX_DUPOK); TAILQ_INIT(&kevq->kn_head); TAILQ_INIT(&kevq->kn_rt_head); + TAILQ_INIT(&kevq->kn_proc_head); kevq->kn_marker = knote_alloc(M_WAITOK); kevq->kn_marker_rt = knote_alloc(M_WAITOK); @@ -2798,14 +2838,14 @@ kqueue_task(void *arg, int pending) static inline int knote_stealable(struct knote *kn) { - return (kn->kn_status & (KN_ACTIVE | KN_DISABLED | KN_PROCESSING | KN_WS | KN_MARKER)) == KN_ACTIVE; + return (kn->kn_status & (KN_ACTIVE | KN_DISABLED | KN_WS | KN_MARKER | KN_PROCESSING)) == KN_ACTIVE; } static inline int kevq_stealable(struct kevq *kevq) { //CTR3(KTR_KQ, "kevq_stealable: AVAIL: %d, kn_cnt: %d, WS: %d", kevq_avail(kevq), kevq->kn_count, kevq->kevq_state & KEVQ_WS); - return kevq_avail(kevq) && kevq_total_knote(kevq) > 0 && !(kevq->kevq_state & KEVQ_WS); + return kevq_avail(kevq) && kevq_avail_knote(kevq) > 0 && (kevq->kevq_state & (KEVQ_SCAN | KEVQ_WS)) == 0; } static void @@ -2952,6 +2992,48 @@ kevq_activate(struct kevq *kevq, struct thread *td) } } +static void +kevq_rel_proc_kn(struct kevq *kevq) +{ + struct knote *kn; + + KEVQ_OWNED(kevq); + + while ((kn = TAILQ_FIRST(&kevq->kn_proc_head)) != NULL) { + + KN_FLUX_LOCK(kn); + + if (kn_in_flux(kn)) { + kn->kn_fluxwait = 1; + + KEVQ_UNLOCK(kevq); + msleep(kn, &kn->kn_fluxlock, PSOCK | PDROP, + "kevqflxwt10", 0); + KEVQ_LOCK(kevq); + + continue; + } + + KASSERT(kn->kn_status & KN_PROCESSING, ("releasing non-processing knote")); + CTR2(KTR_KQ, "kevq_rel_proc_kn: kevq %p dequeuing kn %p", kevq, kn); + + // release the knote + knote_proc_dequeue(kn); + + if (!(kn->kn_status & KN_QUEUED) && !(kn->kn_flags & EV_CLEAR)) { + // this dude didn't go thru the scheduler (event not + // triggered) + // we just queue to the front of our own queue, except for + // dawgs with EV_CLEAR if it is not valid - will be checked + // and released if it's valid - will be returned to userspace + CTR1(KTR_KQ, "kevq_rel_proc_kn: requeuing kn %p", kn); + knote_enqueue_head(kn, kevq); + } + + KN_FLUX_UNLOCK(kn); + } +} + /* * Scan, update kn_data (if not ONESHOT), and copyout triggered events. * We treat KN_MARKER knotes as if they are in flux. @@ -2962,7 +3044,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, { struct kqueue *kq; struct kevent *kevp; - struct knote *kn, *marker, *rtmarker; + struct knote *kn, *marker, *rtmarker, *nextkn; struct knlist *knl; sbintime_t asbt, rsbt, fsbt; int count, error, haskqglobal, influx, nkev, touch, fevent; @@ -2970,20 +3052,31 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, struct ktailq *kntq; int *kncnt; - int rtlimit, curr, rdrained, pri; + int rtlimit, curr, rdrained; curr = 0; rdrained = 0; count = 0; + kn = NULL; kq = kevq->kq; nkev = 0; error = 0; haskqglobal = 0; - kn = NULL; + KEVQ_LOCK(kevq); + /* release processing knotes first */ + kevq_rel_proc_kn(kevq); + KEVQ_UNLOCK(kevq); + + // it's important that this is done before activate if (maxevents == 0) goto done_nl; + if ((kevq->kevq_state & KEVQ_ACTIVE) == 0) { + /* activate kq if not already activated */ + kevq_activate(kevq, td); + } + /* adjust max events according to the target frequency */ if ((kq->kq_flags & KQ_FLAG_MULTI) && kq->kq_tfreq > 0 && kevq->kevq_avg_lat > 0) { /* expected events per syscall @@ -3048,11 +3141,6 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, asbt = fsbt; - if ((kevq->kevq_state & KEVQ_ACTIVE) == 0) { - /* activate kq if not already activated */ - kevq_activate(kevq, td); - } - if (kq->kq_flags & KQ_FLAG_MULTI) { marker = kevq->kn_marker; rtmarker = kevq->kn_marker_rt; @@ -3090,16 +3178,17 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, retry: KEVQ_OWNED(kevq); - if (kevq_total_knote(kevq) == 0 && (KQSCHED_GET_FEAT(kq) & KQ_SCHED_FEAT_WS)) { + if (kevq_avail_knote(kevq) == 0 && (KQSCHED_GET_FEAT(kq) & KQ_SCHED_FEAT_WS)) { kevq_worksteal(kevq); } KEVQ_OWNED(kevq); kevp = keva; - CTR4(KTR_KQ, "kqueue_scan: td %d on kevq %p has %d events, max_ev %d", td->td_tid, kevq, kevq_total_knote(kevq), maxevents); + CTR5(KTR_KQ, "kqueue_scan: td %d on kevq %p has %d avail events, %d total, max_ev %d", td->td_tid, kevq, kevq_avail_knote(kevq), kevq_total_knote(kevq), maxevents); - if (kevq_total_knote(kevq) == 0) { + if (kevq_avail_knote(kevq) == 0) { + kevq_dbg_chk_knotes(kevq); if (fsbt == -1) { error = EWOULDBLOCK; } else { @@ -3169,6 +3258,8 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, } KEVQ_OWNED(kevq); + kevq->kevq_state |= KEVQ_SCAN; + // if (kevq_total_knote(kevq) > 0) { // KASSERT(!(TAILQ_FIRST(&kevq->kn_rt_head) == NULL && TAILQ_FIRST(&kevq->kn_head) == NULL), ("NULL > 0?")); // } @@ -3182,9 +3273,9 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, TAILQ_INSERT_TAIL(&kevq->kn_head, marker, kn_tqe); influx = 0; - - /* XXX : performance: we should only traverse all knotes in SKQ mode. */ - while (1) { + kn = NULL; + nextkn = NULL; + while (count < maxevents) { KEVQ_OWNED(kevq); /* fullfill the limit first */ @@ -3192,18 +3283,20 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, if (curr < rtlimit) { kntq = &kevq->kn_rt_head; kncnt = &kevq->kn_rt_count; - kn = TAILQ_FIRST(kntq); - pri = 1; } else { /* we've reached the limit, dequeue the realtime marker knote */ - kn = rtmarker; + nextkn = rtmarker; } } else { kntq = &kevq->kn_head; kncnt = &kevq->kn_count; - pri = 0; - kn = TAILQ_FIRST(kntq); } + + if (nextkn == NULL) { + nextkn = TAILQ_FIRST(kntq); + } + + kn = nextkn; KASSERT(kn != NULL, ("kqueue_scan dequeued NULL")); @@ -3227,19 +3320,29 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, continue; } - CTR5(KTR_KQ, "kqueue_scan: td %d on kevq %p dequeued knote %p, fflags: %d, curr %d", td->td_tid, kevq, kn, kn->kn_fflags, curr); - + nextkn = TAILQ_NEXT(kn, kn_tqe); + + if ((kn->kn_status & KN_PROCESSING) == KN_PROCESSING) { + // ignore knotes being processed + KN_FLUX_UNLOCK(kn); + continue; + } + + // now this kn is going to be always dequeued from the kevq TAILQ_REMOVE(kntq, kn, kn_tqe); + CTR5(KTR_KQ, "kqueue_scan: td %d on kevq %p dequeued knote %p, fflags: %d, curr %d", td->td_tid, kevq, kn, kn->kn_fflags, curr); + /* check marker first (exit condition) */ if (kn == marker || kn == rtmarker) { /* We are dequeuing our marker, wakeup threads waiting on it */ knote_flux_wakeup(kn); KN_FLUX_UNLOCK(kn); - CTR3(KTR_KQ, "kqueue_scan: td %d MARKER WAKEUP %p PRI %d", td->td_tid, kn, pri); + CTR3(KTR_KQ, "kqueue_scan: td %d MARKER WAKEUP %p PRI %d", td->td_tid, kn, !rdrained); if (kn == rtmarker) { rdrained = 1; + nextkn = NULL; continue; } @@ -3250,9 +3353,10 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, goto done; } + kn->kn_status &= ~(KN_QUEUED | KN_WS); + (*kncnt)--; + if ((kn->kn_status & KN_DISABLED) == KN_DISABLED) { - kn->kn_status &= ~(KN_QUEUED | KN_PROCESSING | KN_WS); - *kncnt -= 1; KN_FLUX_UNLOCK(kn); continue; } @@ -3261,9 +3365,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, ("knote %p is unexpectedly in flux", kn)); if ((kn->kn_flags & EV_DROP) == EV_DROP) { - kn->kn_status &= ~(KN_QUEUED | KN_PROCESSING | KN_WS); knote_enter_flux(kn); - *kncnt -= 1; KN_FLUX_UNLOCK(kn); KEVQ_UNLOCK(kevq); /* @@ -3274,9 +3376,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, KEVQ_LOCK(kevq); continue; } else if ((kn->kn_flags & EV_ONESHOT) == EV_ONESHOT) { - kn->kn_status &= ~(KN_QUEUED | KN_PROCESSING | KN_WS); knote_enter_flux(kn); - *kncnt -= 1; KN_FLUX_UNLOCK(kn); KEVQ_UNLOCK(kevq); /* @@ -3288,7 +3388,6 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, KEVQ_LOCK(kevq); kn = NULL; } else { - kn->kn_status |= KN_SCAN; knote_enter_flux(kn); KN_FLUX_UNLOCK(kn); KEVQ_UNLOCK(kevq); @@ -3300,52 +3399,17 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, */ KQ_GLOBAL_LOCK(&kq_global, haskqglobal); } + knl = kn_list_lock(kn); + kn->kn_status |= KN_SCAN; + fevent = kn->kn_fop->f_event(kn, 0); - /* resched knotes that were processed last time - only for SKQ mode */ - if (KQSCHED_GET_SCHED(kq) && (kn->kn_status & KN_PROCESSING) == KN_PROCESSING) { - KQ_GLOBAL_UNLOCK(&kq_global, haskqglobal); - - kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE | KN_SCAN | KN_PROCESSING | KN_WS); - if (fevent) { - knote_activate(kn); - } - - KEVQ_LOCK(kevq); - *kncnt -= 1; - - CTR3(KTR_KQ, "kqueue_scan: td %d, kevq %p resched processed knote %p", td->td_tid, kevq, kn); - - knote_leave_flux_ul(kn); - kn_list_unlock(knl); - influx = 1; - continue; - } - - if (count >= maxevents) { - KQ_GLOBAL_UNLOCK(&kq_global, haskqglobal); - kn->kn_status &= ~KN_SCAN; - - KEVQ_LOCK(kevq); - /* don't continue processing events when we don't have enough space for it, re-insert at the end */ - TAILQ_INSERT_TAIL(kntq, kn, kn_tqe); - - CTR3(KTR_KQ, "kqueue_scan: td %d, kevq %p ignored overflow knote %p", td->td_tid, kevq, kn); - - knote_leave_flux_ul(kn); - kn_list_unlock(knl); - influx = 1; - continue; - } - if (fevent == 0) { + KEVQ_LOCK(kevq); KQ_GLOBAL_UNLOCK(&kq_global, haskqglobal); - kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE | KN_SCAN | KN_PROCESSING | KN_WS); - - KEVQ_LOCK(kevq); - *kncnt -= 1; + kn->kn_status &= ~(KN_ACTIVE | KN_SCAN); CTR4(KTR_KQ, "kqueue_scan: kn %p, ident: %d not asserted anymore for kevq %p, td %d", kn, kn->kn_id, kevq, td->td_tid); @@ -3359,7 +3423,6 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, if (touch) kn->kn_fop->f_touch(kn, kevp, EVENT_PROCESS); else { - CTR4(KTR_KQ, "kqueue_scan: td %d, kevq %p WRITTEN to KEVP knote %p, fflags: %d", td->td_tid, kevq, kn, kn->kn_fflags); *kevp = kn->kn_kevent; } @@ -3377,18 +3440,13 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, if (kn->kn_flags & EV_DISPATCH) kn->kn_status |= KN_DISABLED; - kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE | KN_PROCESSING | KN_WS); - *kncnt -= 1; - } else { - /* this flag is here to prevent a subtle workstealing race where one thread gets an fd - and returns, before it can process the event, another thread steals the knote and - processes the same fd, resulting in the first thread having no data available. - Work stealing will avoid stealing knotes with this flag set */ - kn->kn_status |= KN_PROCESSING; - CTR3(KTR_KQ, "kqueue_scan: requeued kn %p, ident: %d to kevq %p", kn, kn->kn_id,kevq); - TAILQ_INSERT_TAIL(kntq, kn, kn_tqe); - } + kn->kn_status &= ~KN_ACTIVE; + } + + /* insert the kn to the kn_proc_tq */ + knote_proc_enqueue(kn, kevq); + /* dequeue officially from our queue */ kn->kn_status &= ~KN_SCAN; knote_leave_flux_ul(kn); kn_list_unlock(knl); @@ -3404,7 +3462,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, nkev++; count++; - if (pri) { + if (!rdrained) { curr++; kevq->kevq_tot_realtime++; } @@ -3440,6 +3498,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, knote_flux_wakeup_ul(marker); knote_flux_wakeup_ul(rtmarker); + kevq->kevq_state &= ~KEVQ_SCAN; KEVQ_UNLOCK(kevq); CTR2(KTR_KQ, "kqueue_scan: knote_free marker %p td %d", marker, td->td_tid); @@ -3473,6 +3532,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, td->td_retval[0] = count; CTR2(KTR_KQ, "Kqueue_scan RETURNED for tid %d, total %d events!\n", td->td_tid, count); + return (error); } @@ -3483,7 +3543,7 @@ kqueue_dump(struct kqueue *kq, struct sbuf *buf) kq->kq_ssched, kq->kq_ssargs, kq->kq_sfeat, kq->kq_sfargs,kq->kq_rtshare, kq->kq_tfreq, kq->kq_total_sched_time); sbuf_printf(buf, "\n%*c\n", 1 * DUMP_INDENT, ' '); - if (KQSCHED_GET_SCHED(kq)) { + if ((kq->kq_flags & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) { // SKQ dump KVLST_RLOCK(kq); for(int i = 0; i < veclist_size(&kq->kevq_vlist); i++) { @@ -3649,7 +3709,7 @@ kqueue_poll(struct file *fp, int events, struct ucred *active_cred, revents = 0; } else { if (events & (POLLIN | POLLRDNORM)) { - if (kevq_total_knote(kevq)) { + if (kevq_avail_knote(kevq)) { revents |= events & (POLLIN | POLLRDNORM); } else { selrecord(td, &kq->kq_sel); @@ -3727,7 +3787,7 @@ kevq_drain(struct kevq *kevq, struct thread *td) KASSERT(kevq->kevq_refcnt == 1, ("other refs of kevq are out there!")); KEVQ_UNLOCK(kevq); - /* remove from queues first */ + /* remove the kevq from queues first */ if ((kq->kq_flags & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) { KQ_LOCK(kq); KEVQ_TH_LOCK(kevq->kevq_th); @@ -3760,6 +3820,8 @@ kevq_drain(struct kevq *kevq, struct thread *td) } KEVQ_LOCK(kevq); + /* release all kntoes processed by this dude */ + kevq_rel_proc_kn(kevq); /* drain all knotes on the kevq */ while ((kn = kevq_peek_knote(kevq)) != NULL) { KEVQ_OWNED(kevq); @@ -3805,7 +3867,7 @@ kevq_drain(struct kevq *kevq, struct thread *td) knote_leave_flux_ul(kn); } - KASSERT(kevq_total_knote(kevq) == 0, ("some knotes are left")); + KASSERT(kevq_total_knote(kevq) == 0 && kevq_avail_knote(kevq) == 0, ("some knotes are left")); KEVQ_OWNED(kevq); KEVQ_UNLOCK(kevq); @@ -4063,7 +4125,7 @@ knote(struct knlist *list, long hint, int lockflags) SLIST_FOREACH_SAFE(kn, &list->kl_list, kn_selnext, tkn) { CTR1(KTR_KQ, "knote() scanning kn %p", kn); KN_FLUX_LOCK(kn); - if (kn_in_flux(kn) && ((kn->kn_status & KN_SCAN) == 0)) { + if (kn_in_flux(kn)) { //&& ((kn->kn_status & KN_SCAN) == 0)) { /* * Do not process the influx notes, except for * the influx coming from the kq unlock in the @@ -4543,6 +4605,13 @@ knote_drop_detached(struct knote *kn, struct thread *td) KEVQ_UNLOCK(kevq); } + if (kn->kn_status & KN_PROCESSING) { + kevq = kn->kn_proc_kevq; + KEVQ_LOCK(kevq); + knote_proc_dequeue(kn); + KEVQ_UNLOCK(kevq); + } + knote_leave_flux_ul(kn); KQ_UNLOCK(kq); @@ -4809,6 +4878,45 @@ knote_sched(struct knote *kn) } } +static int +kevq_dbg_count_knotes(struct kevq *kevq) +{ + int cnt = 0; + struct knote *kn; + KEVQ_OWNED(kevq); + + kn = TAILQ_FIRST(&kevq->kn_rt_head); + while(kn != NULL) { + KN_FLUX_LOCK(kn); + if (!(kn->kn_status & (KN_PROCESSING | KN_MARKER))) + cnt++; + KN_FLUX_UNLOCK(kn); + kn = TAILQ_NEXT(kn, kn_tqe); + } + + kn = TAILQ_FIRST(&kevq->kn_head); + while(kn != NULL) { + KN_FLUX_LOCK(kn); + if (!(kn->kn_status & (KN_PROCESSING | KN_MARKER))) + cnt++; + KN_FLUX_UNLOCK(kn); + kn = TAILQ_NEXT(kn, kn_tqe); + } + + return cnt; +} + +static void +kevq_dbg_chk_knotes(struct kevq *kevq) +{ + //int cnt1, cnt2; + //cnt1 = kevq_dbg_count_knotes(kevq); + //cnt2 = kevq_total_knote(kevq); + //if (cnt1 != cnt2) { + // panic("knote counts are not equal %d != %d for kevq %p!", cnt1, cnt2, kevq); + //} +} + /* Here comes kevq priority queue - like operations */ static int kevq_total_knote(struct kevq *kevq) @@ -4816,19 +4924,35 @@ kevq_total_knote(struct kevq *kevq) return (kevq->kn_count + kevq->kn_rt_count); } +static int +kevq_avail_knote(struct kevq *kevq) +{ + int ret = kevq_total_knote(kevq) - kevq->kn_proc_count; + + // if (mtx_owned(&kevq->lock)) { + // if (ret > 0) { + // KASSERT(!(TAILQ_EMPTY(&kevq->kn_head) && TAILQ_EMPTY(&kevq->kn_rt_head)), ("kevq %p avail but no knotes ", kevq)); + // } + // } + return ret; +} + static struct knote * kevq_peek_knote(struct kevq *kevq) { struct knote *kn; - kn = NULL; KEVQ_OWNED(kevq); - if (kevq->kn_rt_count > 0) { - kn = TAILQ_FIRST(&kevq->kn_rt_head); - KASSERT((kn->kn_flags & EV_REALTIME), ("knote in the wrong queue")); - } else if (kevq->kn_count > 0) { + + kn = TAILQ_FIRST(&kevq->kn_rt_head); + + if (kn != NULL) { + KASSERT((kn->kn_flags & EV_REALTIME), ("batch knote in the wrong queue")); + } else { kn = TAILQ_FIRST(&kevq->kn_head); - KASSERT(!(kn->kn_flags & EV_REALTIME), ("knote in the wrong queue")); + if (kn != NULL) { + KASSERT(!(kn->kn_flags & EV_REALTIME), ("rt knote in the wrong queue")); + } } return kn; } @@ -4840,10 +4964,18 @@ kevq_delete_knote(struct kevq *kevq, struct knote *kn) if (kn->kn_flags & EV_REALTIME) { TAILQ_REMOVE(&kevq->kn_rt_head, kn, kn_tqe); + if (kn->kn_status & KN_PROCESSING) { + kevq->kn_proc_count--; + } kevq->kn_rt_count--; + CTR3(KTR_KQ, "KN_CNT: delete kevq %p dec 1, new cnt = %d, proc = %d", kevq, kevq->kn_rt_count, kevq->kn_proc_count); } else { TAILQ_REMOVE(&kevq->kn_head, kn, kn_tqe); + if (kn->kn_status & KN_PROCESSING) { + kevq->kn_proc_count--; + } kevq->kn_count--; + CTR3(KTR_KQ, "KN_CNT: delete kevq %p dec 1, new cnt = %d, proc = %d", kevq, kevq->kn_count, kevq->kn_proc_count); } } @@ -4854,11 +4986,21 @@ kevq_insert_knote(struct kevq *kevq, struct knote *kn) if (kn->kn_flags & EV_REALTIME) { TAILQ_INSERT_TAIL(&kevq->kn_rt_head, kn, kn_tqe); + if ((kn->kn_status & KN_PROCESSING)) { + kevq->kn_proc_count++; + } kevq->kn_rt_count++; + CTR3(KTR_KQ, "KN_CNT: insert kevq %p inc 1, new cnt = %d, proc = %d", kevq, kevq->kn_rt_count, kevq->kn_proc_count); } else { TAILQ_INSERT_TAIL(&kevq->kn_head, kn, kn_tqe); + if ((kn->kn_status & KN_PROCESSING)) { + kevq->kn_proc_count++; + } kevq->kn_count++; + CTR3(KTR_KQ, "KN_CNT: insert kevq %p inc 1, new cnt = %d, proc = %d", kevq, kevq->kn_count, kevq->kn_proc_count); } + + kevq_wakeup(kevq); } static void @@ -4868,11 +5010,21 @@ kevq_insert_head_knote(struct kevq *kevq, struct knote *kn) if (kn->kn_flags & EV_REALTIME) { TAILQ_INSERT_HEAD(&kevq->kn_rt_head, kn, kn_tqe); + if ((kn->kn_status & KN_PROCESSING)) { + kevq->kn_proc_count++; + } kevq->kn_rt_count++; + CTR3(KTR_KQ, "KN_CNT: insert kevq %p inc 1, new cnt = %d, proc = %d", kevq, kevq->kn_rt_count, kevq->kn_proc_count); } else { - TAILQ_INSERT_HEAD(&kevq->kn_head, kn, kn_tqe); + TAILQ_INSERT_HEAD(&kevq->kn_head, kn, kn_tqe); + if ((kn->kn_status & KN_PROCESSING)) { + kevq->kn_proc_count++; + } kevq->kn_count++; + CTR3(KTR_KQ, "KN_CNT: insert kevq %p inc 1, new cnt = %d, proc = %d", kevq, kevq->kn_count, kevq->kn_proc_count); } + + kevq_wakeup(kevq); } /* END Priority Queue */ @@ -4883,11 +5035,11 @@ knote_enqueue_head(struct knote *kn, struct kevq *kevq) struct kqueue *kq; kq = kn->kn_kq; - CTR2(KTR_KQ, "knote_enqueue: kn %p to kevq %p", kn, kevq); + CTR2(KTR_KQ, "knote_enqueue_head: kn %p to kevq %p", kn, kevq); KEVQ_OWNED(kevq); - KASSERT(kn_in_flux(kn), ("enqueuing a knote that's not in flux")); + //KASSERT(kn_in_flux(kn) || KN_FLUX_OWNED(kn), ("enqueuing a knote that's not in flux nor locked")); KASSERT((kn->kn_status & KN_QUEUED) == 0, ("knote already queued")); /* Queuing to a closing kevq is fine. @@ -4899,8 +5051,8 @@ knote_enqueue_head(struct knote *kn, struct kevq *kevq) kn->kn_status |= KN_QUEUED; kevq_insert_head_knote(kevq, kn); - - kevq_wakeup(kevq); + + kevq_dbg_chk_knotes(kevq); } static void @@ -4913,7 +5065,7 @@ knote_enqueue(struct knote *kn, struct kevq *kevq) KEVQ_OWNED(kevq); - KASSERT(kn_in_flux(kn), ("enqueuing a knote that's not in flux")); + //KASSERT(kn_in_flux(kn) || KN_FLUX_OWNED(kn), ("enqueuing a knote that's not in flux nor locked")); KASSERT((kn->kn_status & KN_QUEUED) == 0, ("knote already queued")); /* Queuing to a closing kevq is fine. @@ -4925,8 +5077,69 @@ knote_enqueue(struct knote *kn, struct kevq *kevq) kn->kn_status |= KN_QUEUED; kevq_insert_knote(kevq, kn); + + kevq_dbg_chk_knotes(kevq); +} + +static void +knote_proc_enqueue(struct knote *kn, struct kevq *kevq) +{ + KEVQ_OWNED(kevq); + KASSERT(kn_in_flux(kn), ("enqueuing a knote that's not in flux")); + KASSERT((kn->kn_status & KN_PROCESSING) == 0, ("knote already processing or queued")); + + CTR2(KTR_KQ, "knote_proc_enqueue: kn %p to kevq %p", kn, kevq); + TAILQ_INSERT_TAIL(&kevq->kn_proc_head, kn, kn_pqe); + + kn->kn_proc_kevq = kevq; + kn->kn_status |= KN_PROCESSING; +} + +static void +knote_proc_dequeue(struct knote *kn) +{ + struct kevq *kevq, *other_kevq; + + kevq = kn->kn_proc_kevq; - kevq_wakeup(kevq); + KEVQ_OWNED(kevq); + KASSERT(kn->kn_status & KN_PROCESSING, ("knote not being processed")); + + CTR3(KTR_KQ, "knote_proc_dequeue: kn %p from kevq %p flag: 0x%x", kn, kevq, kn->kn_status); + TAILQ_REMOVE(&kevq->kn_proc_head, kn, kn_pqe); + + kn->kn_status &= ~KN_PROCESSING; + kn->kn_proc_kevq = NULL; + + // if the knote is queued, we need to increment the count of the target kevq + if (kn->kn_status & KN_QUEUED) { + other_kevq = kn->kn_kevq; + + if (other_kevq != kevq) { + // if queued, we need to update the other kevq + knote_enter_flux(kn); + KN_FLUX_UNLOCK(kn); + KEVQ_UNLOCK(kevq); + + KEVQ_LOCK(other_kevq); + } + + // XXX: we did all those locking for this one liner, wtf + // why not use atomic instead? + other_kevq->kn_proc_count--; + + kevq_dbg_chk_knotes(other_kevq); + + if (other_kevq != kevq) { + // update count + kevq_wakeup(other_kevq); + KEVQ_UNLOCK(other_kevq); + + KEVQ_LOCK(kevq); + KN_FLUX_LOCK(kn); + knote_leave_flux(kn); + } + } } static void @@ -4944,6 +5157,9 @@ knote_dequeue(struct knote *kn) kevq_delete_knote(kevq, kn); kn->kn_status &= ~KN_QUEUED; + kn->kn_kevq = NULL; + + kevq_dbg_chk_knotes(kevq); } static void @@ -4961,6 +5177,7 @@ knote_alloc(int mflag) struct knote *ret = uma_zalloc(knote_zone, mflag | M_ZERO); /* CTR1(KTR_KQ, "knote_alloc: allocating knote %p", ret); */ mtx_init(&ret->kn_fluxlock, "kn_fluxlock", NULL, MTX_DEF | MTX_DUPOK); +// TASK_INIT(&ret->kn_timer_task, 0, &task_timer_expire, ret); kqueue_srandom(&ret->kn_rand_seed, (u_long)ret); return ret; } diff --git a/sys/sys/event.h b/sys/sys/event.h index 5dc4f1459250..906cce99e441 100644 --- a/sys/sys/event.h +++ b/sys/sys/event.h @@ -290,10 +290,14 @@ struct knote { SLIST_ENTRY(knote) kn_link; /* for kq */ SLIST_ENTRY(knote) kn_selnext; /* for struct selinfo */ struct knlist *kn_knlist; /* f_attach populated */ + // struct task kn_timer_task; /* timer task for kn */ + // int kn_timer_task_queued; + int kn_drop; TAILQ_ENTRY(knote) kn_tqe; - TAILQ_ENTRY(knote) kn_wse; /* for work stealing queue */ + TAILQ_ENTRY(knote) kn_pqe; /* knote for the processing queue */ struct kqueue *kn_kq; /* which kqueue we are on */ struct kevq *kn_kevq; /* the kevq the knote is on, only valid if KN_QUEUED */ + struct kevq *kn_proc_kevq; /* the kevq that's processing the knote, only valid if KN_PROCESSING */ /* used by the scheduler */ struct kevq *kn_org_kevq; /* the kevq that registered the knote */ struct kqdom *kn_kqd; /* the kqdomain the knote belongs to */ diff --git a/sys/sys/eventvar.h b/sys/sys/eventvar.h index 5d2d2e110521..cbefc1601c81 100644 --- a/sys/sys/eventvar.h +++ b/sys/sys/eventvar.h @@ -31,6 +31,7 @@ #ifndef _SYS_EVENTVAR_H_ #define _SYS_EVENTVAR_H_ +#include #ifndef _KERNEL #error "no user-serviceable parts inside" #endif @@ -66,7 +67,9 @@ struct kevq { #define KEVQ_CLOSING 0x02 #define KEVQ_ACTIVE 0x04 #define KEVQ_WS 0x08 /* the kevq is work stealing */ +#define KEVQ_SCAN 0x10 /* the kevq is being scanned */ int kevq_state; + int kn_proc_count; /* number of processing knotes */ int kn_count; /* number of pending knotes */ int kn_rt_count; /* number of runtime knotes */ /* end 1st cache line */ @@ -82,7 +85,8 @@ struct kevq { struct ktailq kn_head; /* list of pending knotes */ struct knote *kn_marker; struct ktailq kn_rt_head; /* list of pending knotes with runtime priority */ - struct knote *kn_marker_rt; + struct knote *kn_marker_rt; + struct ktailq kn_proc_head; /* list of pending knotes being processed */ int kevq_refcnt; /* TODO: maybe these should be in kqdomain or global */ diff --git a/tests/sys/kqueue/libkqueue/read_m.c b/tests/sys/kqueue/libkqueue/read_m.c index 2f2a0b0bea3e..99725442b8d2 100644 --- a/tests/sys/kqueue/libkqueue/read_m.c +++ b/tests/sys/kqueue/libkqueue/read_m.c @@ -803,7 +803,7 @@ test_socket_check_rt(int kqfd, int kev_sz, int rtcnt) int nev = kevent_get_n(kqfd, kev, kev_sz); if (nev != kev_sz) { - err(1, "too few events"); + err(1, "too few events: expected %d, recvd %d", kev_sz, nev); } for (int i = 0; i < rtcnt; i++) {