Patch for better scheduling

This commit is contained in:
Oscar Zhao 2020-03-31 02:31:03 -04:00
parent 4a263e37fa
commit cb10d21976
4 changed files with 334 additions and 109 deletions

View File

@ -154,6 +154,7 @@ MTX_SYSINIT(kq_global, &kq_global, "kqueue order", MTX_DEF);
} while (0)
TASKQUEUE_DEFINE_THREAD(kqueue_ctx);
//TASKQUEUE_DEFINE_THREAD(kqueue_tmr);
extern struct cpu_group *cpu_top;
@ -164,6 +165,11 @@ calc_overtime_avg(uint64_t prev, uint64_t cur, uint32_t prev_pct)
return (prev * prev_pct + cur * (100 - prev_pct)) / 100;
}
static int
kevq_dbg_count_knotes(struct kevq *kevq);
static void
kevq_dbg_chk_knotes(struct kevq *kevq);
static void kevq_rel_proc_kn(struct kevq *kevq);
static struct kevq * kevqlist_find(struct kevqlist *kevq_list, struct kqueue *kq);
static void kevq_thred_init(struct kevq_thred *kevq_th);
static void kevq_thred_destroy(struct kevq_thred *kevq_th);
@ -181,6 +187,7 @@ static struct knote * kevq_peek_knote(struct kevq *kevq);
static inline void kevq_delete_knote(struct kevq *kevq, struct knote *kn);
static void kevq_insert_knote(struct kevq *kevq, struct knote *kn);
static int kevq_total_knote(struct kevq *kevq);
static int kevq_avail_knote(struct kevq *kevq);
static void kevq_insert_head_knote(struct kevq *kevq, struct knote *kn);
static void knote_enqueue_head(struct knote *kn, struct kevq *kevq);
@ -246,6 +253,8 @@ static void knote_activate(struct knote *kn);
static int knote_attach(struct knote *kn, struct kqueue *kq);
static void knote_drop(struct knote *kn, struct thread *td);
static void knote_drop_detached(struct knote *kn, struct thread *td);
static void knote_proc_enqueue(struct knote *kn, struct kevq *kevq);
static void knote_proc_dequeue(struct knote *kn);
static void knote_enqueue(struct knote *kn, struct kevq *kevq);
static void knote_dequeue(struct knote *kn);
static void knote_init(void);
@ -567,6 +576,9 @@ static void
knote_enter_flux(struct knote *kn)
{
CTR2(KTR_KQ, "knote_enter_flux: %p flux: %d", kn, kn->kn_influx);
if (kn->kn_influx != 0) {
panic("knote %p flux error", kn);
}
KN_FLUX_OWNED(kn);
MPASS(kn->kn_influx < INT_MAX);
kn->kn_influx++;
@ -719,9 +731,9 @@ filt_kqueue(struct knote *kn, long hint)
if (kevq == NULL) {
return 0;
} else {
kn->kn_data = kevq_total_knote(kevq);
kn->kn_data = kevq_avail_knote(kevq);
return (kn->kn_data > 0);
}
}
}
/* XXX - move to kern_proc.c? */
@ -1028,10 +1040,29 @@ filt_timerexpire(void *knx)
kn = knx;
kn->kn_data++;
knote_enter_flux_ul(kn);
// busy wait, shouldn't be a big problem
CTR1(KTR_KQ, "timerexpire: for kn %p start\n", kn);
//retry:
KN_FLUX_LOCK(kn);
if (kn->kn_drop) {
KN_FLUX_UNLOCK(kn);
CTR1(KTR_KQ, "timerexpire: kn %p receive dropped\n", kn);
goto skip;
}
if (kn_in_flux(kn)) {
KN_FLUX_UNLOCK(kn);
//CTR1(KTR_KQ, "timerexpire: kn %p retrying\n", kn);
//XXX: shouldn't skip in this case
goto skip;
}
knote_enter_flux(kn);
KN_FLUX_UNLOCK(kn);
knote_activate(kn);
knote_leave_flux_ul(kn);
skip:
if ((kn->kn_flags & EV_ONESHOT) != 0)
return;
kc = kn->kn_ptr.p_v;
@ -1040,6 +1071,8 @@ filt_timerexpire(void *knx)
kc->next += kc->to;
callout_reset_sbt_on(&kc->c, kc->next, 0, filt_timerexpire, kn,
PCPU_GET(cpuid), C_ABSOLUTE);
CTR1(KTR_KQ, "timerexpire: for kn %p end\n", kn);
}
/*
@ -1124,8 +1157,14 @@ filt_timerdetach(struct knote *kn)
struct kq_timer_cb_data *kc;
unsigned int old __unused;
KN_FLUX_LOCK(kn);
kn->kn_drop = 1;
CTR1(KTR_KQ, "timerdetach: kn %p set dropped\n", kn);
KN_FLUX_UNLOCK(kn);
kc = kn->kn_ptr.p_v;
callout_drain(&kc->c);
CTR1(KTR_KQ, "timerdetach: kn %p callout drained\n", kn);
free(kc, M_KQUEUE);
old = atomic_fetchadd_int(&kq_ncallouts, -1);
KASSERT(old > 0, ("Number of callouts cannot become negative"));
@ -2111,6 +2150,7 @@ kevq_init(struct kevq *kevq) {
mtx_init(&kevq->lock, "kevq", NULL, MTX_DEF | MTX_DUPOK);
TAILQ_INIT(&kevq->kn_head);
TAILQ_INIT(&kevq->kn_rt_head);
TAILQ_INIT(&kevq->kn_proc_head);
kevq->kn_marker = knote_alloc(M_WAITOK);
kevq->kn_marker_rt = knote_alloc(M_WAITOK);
@ -2798,14 +2838,14 @@ kqueue_task(void *arg, int pending)
static inline int
knote_stealable(struct knote *kn)
{
return (kn->kn_status & (KN_ACTIVE | KN_DISABLED | KN_PROCESSING | KN_WS | KN_MARKER)) == KN_ACTIVE;
return (kn->kn_status & (KN_ACTIVE | KN_DISABLED | KN_WS | KN_MARKER | KN_PROCESSING)) == KN_ACTIVE;
}
static inline int
kevq_stealable(struct kevq *kevq)
{
//CTR3(KTR_KQ, "kevq_stealable: AVAIL: %d, kn_cnt: %d, WS: %d", kevq_avail(kevq), kevq->kn_count, kevq->kevq_state & KEVQ_WS);
return kevq_avail(kevq) && kevq_total_knote(kevq) > 0 && !(kevq->kevq_state & KEVQ_WS);
return kevq_avail(kevq) && kevq_avail_knote(kevq) > 0 && (kevq->kevq_state & (KEVQ_SCAN | KEVQ_WS)) == 0;
}
static void
@ -2952,6 +2992,48 @@ kevq_activate(struct kevq *kevq, struct thread *td)
}
}
static void
kevq_rel_proc_kn(struct kevq *kevq)
{
struct knote *kn;
KEVQ_OWNED(kevq);
while ((kn = TAILQ_FIRST(&kevq->kn_proc_head)) != NULL) {
KN_FLUX_LOCK(kn);
if (kn_in_flux(kn)) {
kn->kn_fluxwait = 1;
KEVQ_UNLOCK(kevq);
msleep(kn, &kn->kn_fluxlock, PSOCK | PDROP,
"kevqflxwt10", 0);
KEVQ_LOCK(kevq);
continue;
}
KASSERT(kn->kn_status & KN_PROCESSING, ("releasing non-processing knote"));
CTR2(KTR_KQ, "kevq_rel_proc_kn: kevq %p dequeuing kn %p", kevq, kn);
// release the knote
knote_proc_dequeue(kn);
if (!(kn->kn_status & KN_QUEUED) && !(kn->kn_flags & EV_CLEAR)) {
// this dude didn't go thru the scheduler (event not
// triggered)
// we just queue to the front of our own queue, except for
// dawgs with EV_CLEAR if it is not valid - will be checked
// and released if it's valid - will be returned to userspace
CTR1(KTR_KQ, "kevq_rel_proc_kn: requeuing kn %p", kn);
knote_enqueue_head(kn, kevq);
}
KN_FLUX_UNLOCK(kn);
}
}
/*
* Scan, update kn_data (if not ONESHOT), and copyout triggered events.
* We treat KN_MARKER knotes as if they are in flux.
@ -2962,7 +3044,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
{
struct kqueue *kq;
struct kevent *kevp;
struct knote *kn, *marker, *rtmarker;
struct knote *kn, *marker, *rtmarker, *nextkn;
struct knlist *knl;
sbintime_t asbt, rsbt, fsbt;
int count, error, haskqglobal, influx, nkev, touch, fevent;
@ -2970,20 +3052,31 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
struct ktailq *kntq;
int *kncnt;
int rtlimit, curr, rdrained, pri;
int rtlimit, curr, rdrained;
curr = 0;
rdrained = 0;
count = 0;
kn = NULL;
kq = kevq->kq;
nkev = 0;
error = 0;
haskqglobal = 0;
kn = NULL;
KEVQ_LOCK(kevq);
/* release processing knotes first */
kevq_rel_proc_kn(kevq);
KEVQ_UNLOCK(kevq);
// it's important that this is done before activate
if (maxevents == 0)
goto done_nl;
if ((kevq->kevq_state & KEVQ_ACTIVE) == 0) {
/* activate kq if not already activated */
kevq_activate(kevq, td);
}
/* adjust max events according to the target frequency */
if ((kq->kq_flags & KQ_FLAG_MULTI) && kq->kq_tfreq > 0 && kevq->kevq_avg_lat > 0) {
/* expected events per syscall
@ -3048,11 +3141,6 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
asbt = fsbt;
if ((kevq->kevq_state & KEVQ_ACTIVE) == 0) {
/* activate kq if not already activated */
kevq_activate(kevq, td);
}
if (kq->kq_flags & KQ_FLAG_MULTI) {
marker = kevq->kn_marker;
rtmarker = kevq->kn_marker_rt;
@ -3090,16 +3178,17 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
retry:
KEVQ_OWNED(kevq);
if (kevq_total_knote(kevq) == 0 && (KQSCHED_GET_FEAT(kq) & KQ_SCHED_FEAT_WS)) {
if (kevq_avail_knote(kevq) == 0 && (KQSCHED_GET_FEAT(kq) & KQ_SCHED_FEAT_WS)) {
kevq_worksteal(kevq);
}
KEVQ_OWNED(kevq);
kevp = keva;
CTR4(KTR_KQ, "kqueue_scan: td %d on kevq %p has %d events, max_ev %d", td->td_tid, kevq, kevq_total_knote(kevq), maxevents);
CTR5(KTR_KQ, "kqueue_scan: td %d on kevq %p has %d avail events, %d total, max_ev %d", td->td_tid, kevq, kevq_avail_knote(kevq), kevq_total_knote(kevq), maxevents);
if (kevq_total_knote(kevq) == 0) {
if (kevq_avail_knote(kevq) == 0) {
kevq_dbg_chk_knotes(kevq);
if (fsbt == -1) {
error = EWOULDBLOCK;
} else {
@ -3169,6 +3258,8 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
}
KEVQ_OWNED(kevq);
kevq->kevq_state |= KEVQ_SCAN;
// if (kevq_total_knote(kevq) > 0) {
// KASSERT(!(TAILQ_FIRST(&kevq->kn_rt_head) == NULL && TAILQ_FIRST(&kevq->kn_head) == NULL), ("NULL > 0?"));
// }
@ -3182,9 +3273,9 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
TAILQ_INSERT_TAIL(&kevq->kn_head, marker, kn_tqe);
influx = 0;
/* XXX : performance: we should only traverse all knotes in SKQ mode. */
while (1) {
kn = NULL;
nextkn = NULL;
while (count < maxevents) {
KEVQ_OWNED(kevq);
/* fullfill the limit first */
@ -3192,19 +3283,21 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
if (curr < rtlimit) {
kntq = &kevq->kn_rt_head;
kncnt = &kevq->kn_rt_count;
kn = TAILQ_FIRST(kntq);
pri = 1;
} else {
/* we've reached the limit, dequeue the realtime marker knote */
kn = rtmarker;
nextkn = rtmarker;
}
} else {
kntq = &kevq->kn_head;
kncnt = &kevq->kn_count;
pri = 0;
kn = TAILQ_FIRST(kntq);
}
if (nextkn == NULL) {
nextkn = TAILQ_FIRST(kntq);
}
kn = nextkn;
KASSERT(kn != NULL, ("kqueue_scan dequeued NULL"));
KN_FLUX_LOCK(kn);
@ -3227,19 +3320,29 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
continue;
}
CTR5(KTR_KQ, "kqueue_scan: td %d on kevq %p dequeued knote %p, fflags: %d, curr %d", td->td_tid, kevq, kn, kn->kn_fflags, curr);
nextkn = TAILQ_NEXT(kn, kn_tqe);
if ((kn->kn_status & KN_PROCESSING) == KN_PROCESSING) {
// ignore knotes being processed
KN_FLUX_UNLOCK(kn);
continue;
}
// now this kn is going to be always dequeued from the kevq
TAILQ_REMOVE(kntq, kn, kn_tqe);
CTR5(KTR_KQ, "kqueue_scan: td %d on kevq %p dequeued knote %p, fflags: %d, curr %d", td->td_tid, kevq, kn, kn->kn_fflags, curr);
/* check marker first (exit condition) */
if (kn == marker || kn == rtmarker) {
/* We are dequeuing our marker, wakeup threads waiting on it */
knote_flux_wakeup(kn);
KN_FLUX_UNLOCK(kn);
CTR3(KTR_KQ, "kqueue_scan: td %d MARKER WAKEUP %p PRI %d", td->td_tid, kn, pri);
CTR3(KTR_KQ, "kqueue_scan: td %d MARKER WAKEUP %p PRI %d", td->td_tid, kn, !rdrained);
if (kn == rtmarker) {
rdrained = 1;
nextkn = NULL;
continue;
}
@ -3250,9 +3353,10 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
goto done;
}
kn->kn_status &= ~(KN_QUEUED | KN_WS);
(*kncnt)--;
if ((kn->kn_status & KN_DISABLED) == KN_DISABLED) {
kn->kn_status &= ~(KN_QUEUED | KN_PROCESSING | KN_WS);
*kncnt -= 1;
KN_FLUX_UNLOCK(kn);
continue;
}
@ -3261,9 +3365,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
("knote %p is unexpectedly in flux", kn));
if ((kn->kn_flags & EV_DROP) == EV_DROP) {
kn->kn_status &= ~(KN_QUEUED | KN_PROCESSING | KN_WS);
knote_enter_flux(kn);
*kncnt -= 1;
KN_FLUX_UNLOCK(kn);
KEVQ_UNLOCK(kevq);
/*
@ -3274,9 +3376,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
KEVQ_LOCK(kevq);
continue;
} else if ((kn->kn_flags & EV_ONESHOT) == EV_ONESHOT) {
kn->kn_status &= ~(KN_QUEUED | KN_PROCESSING | KN_WS);
knote_enter_flux(kn);
*kncnt -= 1;
KN_FLUX_UNLOCK(kn);
KEVQ_UNLOCK(kevq);
/*
@ -3288,7 +3388,6 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
KEVQ_LOCK(kevq);
kn = NULL;
} else {
kn->kn_status |= KN_SCAN;
knote_enter_flux(kn);
KN_FLUX_UNLOCK(kn);
KEVQ_UNLOCK(kevq);
@ -3300,52 +3399,17 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
*/
KQ_GLOBAL_LOCK(&kq_global, haskqglobal);
}
knl = kn_list_lock(kn);
kn->kn_status |= KN_SCAN;
fevent = kn->kn_fop->f_event(kn, 0);
/* resched knotes that were processed last time - only for SKQ mode */
if (KQSCHED_GET_SCHED(kq) && (kn->kn_status & KN_PROCESSING) == KN_PROCESSING) {
KQ_GLOBAL_UNLOCK(&kq_global, haskqglobal);
kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE | KN_SCAN | KN_PROCESSING | KN_WS);
if (fevent) {
knote_activate(kn);
}
KEVQ_LOCK(kevq);
*kncnt -= 1;
CTR3(KTR_KQ, "kqueue_scan: td %d, kevq %p resched processed knote %p", td->td_tid, kevq, kn);
knote_leave_flux_ul(kn);
kn_list_unlock(knl);
influx = 1;
continue;
}
if (count >= maxevents) {
KQ_GLOBAL_UNLOCK(&kq_global, haskqglobal);
kn->kn_status &= ~KN_SCAN;
KEVQ_LOCK(kevq);
/* don't continue processing events when we don't have enough space for it, re-insert at the end */
TAILQ_INSERT_TAIL(kntq, kn, kn_tqe);
CTR3(KTR_KQ, "kqueue_scan: td %d, kevq %p ignored overflow knote %p", td->td_tid, kevq, kn);
knote_leave_flux_ul(kn);
kn_list_unlock(knl);
influx = 1;
continue;
}
if (fevent == 0) {
KEVQ_LOCK(kevq);
KQ_GLOBAL_UNLOCK(&kq_global, haskqglobal);
kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE | KN_SCAN | KN_PROCESSING | KN_WS);
KEVQ_LOCK(kevq);
*kncnt -= 1;
kn->kn_status &= ~(KN_ACTIVE | KN_SCAN);
CTR4(KTR_KQ, "kqueue_scan: kn %p, ident: %d not asserted anymore for kevq %p, td %d", kn, kn->kn_id, kevq, td->td_tid);
@ -3359,7 +3423,6 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
if (touch)
kn->kn_fop->f_touch(kn, kevp, EVENT_PROCESS);
else {
CTR4(KTR_KQ, "kqueue_scan: td %d, kevq %p WRITTEN to KEVP knote %p, fflags: %d", td->td_tid, kevq, kn, kn->kn_fflags);
*kevp = kn->kn_kevent;
}
@ -3377,18 +3440,13 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
if (kn->kn_flags & EV_DISPATCH)
kn->kn_status |= KN_DISABLED;
kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE | KN_PROCESSING | KN_WS);
*kncnt -= 1;
} else {
/* this flag is here to prevent a subtle workstealing race where one thread gets an fd
and returns, before it can process the event, another thread steals the knote and
processes the same fd, resulting in the first thread having no data available.
Work stealing will avoid stealing knotes with this flag set */
kn->kn_status |= KN_PROCESSING;
CTR3(KTR_KQ, "kqueue_scan: requeued kn %p, ident: %d to kevq %p", kn, kn->kn_id,kevq);
TAILQ_INSERT_TAIL(kntq, kn, kn_tqe);
kn->kn_status &= ~KN_ACTIVE;
}
/* insert the kn to the kn_proc_tq */
knote_proc_enqueue(kn, kevq);
/* dequeue officially from our queue */
kn->kn_status &= ~KN_SCAN;
knote_leave_flux_ul(kn);
kn_list_unlock(knl);
@ -3404,7 +3462,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
nkev++;
count++;
if (pri) {
if (!rdrained) {
curr++;
kevq->kevq_tot_realtime++;
}
@ -3440,6 +3498,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
knote_flux_wakeup_ul(marker);
knote_flux_wakeup_ul(rtmarker);
kevq->kevq_state &= ~KEVQ_SCAN;
KEVQ_UNLOCK(kevq);
CTR2(KTR_KQ, "kqueue_scan: knote_free marker %p td %d", marker, td->td_tid);
@ -3473,6 +3532,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
td->td_retval[0] = count;
CTR2(KTR_KQ, "Kqueue_scan RETURNED for tid %d, total %d events!\n", td->td_tid, count);
return (error);
}
@ -3483,7 +3543,7 @@ kqueue_dump(struct kqueue *kq, struct sbuf *buf)
kq->kq_ssched, kq->kq_ssargs, kq->kq_sfeat,
kq->kq_sfargs,kq->kq_rtshare, kq->kq_tfreq, kq->kq_total_sched_time);
sbuf_printf(buf, "\n%*c<kevq_dump>\n", 1 * DUMP_INDENT, ' ');
if (KQSCHED_GET_SCHED(kq)) {
if ((kq->kq_flags & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) {
// SKQ dump
KVLST_RLOCK(kq);
for(int i = 0; i < veclist_size(&kq->kevq_vlist); i++) {
@ -3649,7 +3709,7 @@ kqueue_poll(struct file *fp, int events, struct ucred *active_cred,
revents = 0;
} else {
if (events & (POLLIN | POLLRDNORM)) {
if (kevq_total_knote(kevq)) {
if (kevq_avail_knote(kevq)) {
revents |= events & (POLLIN | POLLRDNORM);
} else {
selrecord(td, &kq->kq_sel);
@ -3727,7 +3787,7 @@ kevq_drain(struct kevq *kevq, struct thread *td)
KASSERT(kevq->kevq_refcnt == 1, ("other refs of kevq are out there!"));
KEVQ_UNLOCK(kevq);
/* remove from queues first */
/* remove the kevq from queues first */
if ((kq->kq_flags & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) {
KQ_LOCK(kq);
KEVQ_TH_LOCK(kevq->kevq_th);
@ -3760,6 +3820,8 @@ kevq_drain(struct kevq *kevq, struct thread *td)
}
KEVQ_LOCK(kevq);
/* release all kntoes processed by this dude */
kevq_rel_proc_kn(kevq);
/* drain all knotes on the kevq */
while ((kn = kevq_peek_knote(kevq)) != NULL) {
KEVQ_OWNED(kevq);
@ -3805,7 +3867,7 @@ kevq_drain(struct kevq *kevq, struct thread *td)
knote_leave_flux_ul(kn);
}
KASSERT(kevq_total_knote(kevq) == 0, ("some knotes are left"));
KASSERT(kevq_total_knote(kevq) == 0 && kevq_avail_knote(kevq) == 0, ("some knotes are left"));
KEVQ_OWNED(kevq);
KEVQ_UNLOCK(kevq);
@ -4063,7 +4125,7 @@ knote(struct knlist *list, long hint, int lockflags)
SLIST_FOREACH_SAFE(kn, &list->kl_list, kn_selnext, tkn) {
CTR1(KTR_KQ, "knote() scanning kn %p", kn);
KN_FLUX_LOCK(kn);
if (kn_in_flux(kn) && ((kn->kn_status & KN_SCAN) == 0)) {
if (kn_in_flux(kn)) { //&& ((kn->kn_status & KN_SCAN) == 0)) {
/*
* Do not process the influx notes, except for
* the influx coming from the kq unlock in the
@ -4543,6 +4605,13 @@ knote_drop_detached(struct knote *kn, struct thread *td)
KEVQ_UNLOCK(kevq);
}
if (kn->kn_status & KN_PROCESSING) {
kevq = kn->kn_proc_kevq;
KEVQ_LOCK(kevq);
knote_proc_dequeue(kn);
KEVQ_UNLOCK(kevq);
}
knote_leave_flux_ul(kn);
KQ_UNLOCK(kq);
@ -4809,6 +4878,45 @@ knote_sched(struct knote *kn)
}
}
static int
kevq_dbg_count_knotes(struct kevq *kevq)
{
int cnt = 0;
struct knote *kn;
KEVQ_OWNED(kevq);
kn = TAILQ_FIRST(&kevq->kn_rt_head);
while(kn != NULL) {
KN_FLUX_LOCK(kn);
if (!(kn->kn_status & (KN_PROCESSING | KN_MARKER)))
cnt++;
KN_FLUX_UNLOCK(kn);
kn = TAILQ_NEXT(kn, kn_tqe);
}
kn = TAILQ_FIRST(&kevq->kn_head);
while(kn != NULL) {
KN_FLUX_LOCK(kn);
if (!(kn->kn_status & (KN_PROCESSING | KN_MARKER)))
cnt++;
KN_FLUX_UNLOCK(kn);
kn = TAILQ_NEXT(kn, kn_tqe);
}
return cnt;
}
static void
kevq_dbg_chk_knotes(struct kevq *kevq)
{
//int cnt1, cnt2;
//cnt1 = kevq_dbg_count_knotes(kevq);
//cnt2 = kevq_total_knote(kevq);
//if (cnt1 != cnt2) {
// panic("knote counts are not equal %d != %d for kevq %p!", cnt1, cnt2, kevq);
//}
}
/* Here comes kevq priority queue - like operations */
static int
kevq_total_knote(struct kevq *kevq)
@ -4816,19 +4924,35 @@ kevq_total_knote(struct kevq *kevq)
return (kevq->kn_count + kevq->kn_rt_count);
}
static int
kevq_avail_knote(struct kevq *kevq)
{
int ret = kevq_total_knote(kevq) - kevq->kn_proc_count;
// if (mtx_owned(&kevq->lock)) {
// if (ret > 0) {
// KASSERT(!(TAILQ_EMPTY(&kevq->kn_head) && TAILQ_EMPTY(&kevq->kn_rt_head)), ("kevq %p avail but no knotes ", kevq));
// }
// }
return ret;
}
static struct knote *
kevq_peek_knote(struct kevq *kevq)
{
struct knote *kn;
kn = NULL;
KEVQ_OWNED(kevq);
if (kevq->kn_rt_count > 0) {
kn = TAILQ_FIRST(&kevq->kn_rt_head);
KASSERT((kn->kn_flags & EV_REALTIME), ("knote in the wrong queue"));
} else if (kevq->kn_count > 0) {
if (kn != NULL) {
KASSERT((kn->kn_flags & EV_REALTIME), ("batch knote in the wrong queue"));
} else {
kn = TAILQ_FIRST(&kevq->kn_head);
KASSERT(!(kn->kn_flags & EV_REALTIME), ("knote in the wrong queue"));
if (kn != NULL) {
KASSERT(!(kn->kn_flags & EV_REALTIME), ("rt knote in the wrong queue"));
}
}
return kn;
}
@ -4840,10 +4964,18 @@ kevq_delete_knote(struct kevq *kevq, struct knote *kn)
if (kn->kn_flags & EV_REALTIME) {
TAILQ_REMOVE(&kevq->kn_rt_head, kn, kn_tqe);
if (kn->kn_status & KN_PROCESSING) {
kevq->kn_proc_count--;
}
kevq->kn_rt_count--;
CTR3(KTR_KQ, "KN_CNT: delete kevq %p <R> dec 1, new cnt = %d, proc = %d", kevq, kevq->kn_rt_count, kevq->kn_proc_count);
} else {
TAILQ_REMOVE(&kevq->kn_head, kn, kn_tqe);
if (kn->kn_status & KN_PROCESSING) {
kevq->kn_proc_count--;
}
kevq->kn_count--;
CTR3(KTR_KQ, "KN_CNT: delete kevq %p <N> dec 1, new cnt = %d, proc = %d", kevq, kevq->kn_count, kevq->kn_proc_count);
}
}
@ -4854,11 +4986,21 @@ kevq_insert_knote(struct kevq *kevq, struct knote *kn)
if (kn->kn_flags & EV_REALTIME) {
TAILQ_INSERT_TAIL(&kevq->kn_rt_head, kn, kn_tqe);
if ((kn->kn_status & KN_PROCESSING)) {
kevq->kn_proc_count++;
}
kevq->kn_rt_count++;
CTR3(KTR_KQ, "KN_CNT: insert kevq %p <R> inc 1, new cnt = %d, proc = %d", kevq, kevq->kn_rt_count, kevq->kn_proc_count);
} else {
TAILQ_INSERT_TAIL(&kevq->kn_head, kn, kn_tqe);
kevq->kn_count++;
if ((kn->kn_status & KN_PROCESSING)) {
kevq->kn_proc_count++;
}
kevq->kn_count++;
CTR3(KTR_KQ, "KN_CNT: insert kevq %p <N> inc 1, new cnt = %d, proc = %d", kevq, kevq->kn_count, kevq->kn_proc_count);
}
kevq_wakeup(kevq);
}
static void
@ -4868,11 +5010,21 @@ kevq_insert_head_knote(struct kevq *kevq, struct knote *kn)
if (kn->kn_flags & EV_REALTIME) {
TAILQ_INSERT_HEAD(&kevq->kn_rt_head, kn, kn_tqe);
if ((kn->kn_status & KN_PROCESSING)) {
kevq->kn_proc_count++;
}
kevq->kn_rt_count++;
CTR3(KTR_KQ, "KN_CNT: insert kevq %p <R> inc 1, new cnt = %d, proc = %d", kevq, kevq->kn_rt_count, kevq->kn_proc_count);
} else {
TAILQ_INSERT_HEAD(&kevq->kn_head, kn, kn_tqe);
kevq->kn_count++;
if ((kn->kn_status & KN_PROCESSING)) {
kevq->kn_proc_count++;
}
kevq->kn_count++;
CTR3(KTR_KQ, "KN_CNT: insert kevq %p <N> inc 1, new cnt = %d, proc = %d", kevq, kevq->kn_count, kevq->kn_proc_count);
}
kevq_wakeup(kevq);
}
/* END Priority Queue */
@ -4883,11 +5035,11 @@ knote_enqueue_head(struct knote *kn, struct kevq *kevq)
struct kqueue *kq;
kq = kn->kn_kq;
CTR2(KTR_KQ, "knote_enqueue: kn %p to kevq %p", kn, kevq);
CTR2(KTR_KQ, "knote_enqueue_head: kn %p to kevq %p", kn, kevq);
KEVQ_OWNED(kevq);
KASSERT(kn_in_flux(kn), ("enqueuing a knote that's not in flux"));
//KASSERT(kn_in_flux(kn) || KN_FLUX_OWNED(kn), ("enqueuing a knote that's not in flux nor locked"));
KASSERT((kn->kn_status & KN_QUEUED) == 0, ("knote already queued"));
/* Queuing to a closing kevq is fine.
@ -4900,7 +5052,7 @@ knote_enqueue_head(struct knote *kn, struct kevq *kevq)
kevq_insert_head_knote(kevq, kn);
kevq_wakeup(kevq);
kevq_dbg_chk_knotes(kevq);
}
static void
@ -4913,7 +5065,7 @@ knote_enqueue(struct knote *kn, struct kevq *kevq)
KEVQ_OWNED(kevq);
KASSERT(kn_in_flux(kn), ("enqueuing a knote that's not in flux"));
//KASSERT(kn_in_flux(kn) || KN_FLUX_OWNED(kn), ("enqueuing a knote that's not in flux nor locked"));
KASSERT((kn->kn_status & KN_QUEUED) == 0, ("knote already queued"));
/* Queuing to a closing kevq is fine.
@ -4926,7 +5078,68 @@ knote_enqueue(struct knote *kn, struct kevq *kevq)
kevq_insert_knote(kevq, kn);
kevq_wakeup(kevq);
kevq_dbg_chk_knotes(kevq);
}
static void
knote_proc_enqueue(struct knote *kn, struct kevq *kevq)
{
KEVQ_OWNED(kevq);
KASSERT(kn_in_flux(kn), ("enqueuing a knote that's not in flux"));
KASSERT((kn->kn_status & KN_PROCESSING) == 0, ("knote already processing or queued"));
CTR2(KTR_KQ, "knote_proc_enqueue: kn %p to kevq %p", kn, kevq);
TAILQ_INSERT_TAIL(&kevq->kn_proc_head, kn, kn_pqe);
kn->kn_proc_kevq = kevq;
kn->kn_status |= KN_PROCESSING;
}
static void
knote_proc_dequeue(struct knote *kn)
{
struct kevq *kevq, *other_kevq;
kevq = kn->kn_proc_kevq;
KEVQ_OWNED(kevq);
KASSERT(kn->kn_status & KN_PROCESSING, ("knote not being processed"));
CTR3(KTR_KQ, "knote_proc_dequeue: kn %p from kevq %p flag: 0x%x", kn, kevq, kn->kn_status);
TAILQ_REMOVE(&kevq->kn_proc_head, kn, kn_pqe);
kn->kn_status &= ~KN_PROCESSING;
kn->kn_proc_kevq = NULL;
// if the knote is queued, we need to increment the count of the target kevq
if (kn->kn_status & KN_QUEUED) {
other_kevq = kn->kn_kevq;
if (other_kevq != kevq) {
// if queued, we need to update the other kevq
knote_enter_flux(kn);
KN_FLUX_UNLOCK(kn);
KEVQ_UNLOCK(kevq);
KEVQ_LOCK(other_kevq);
}
// XXX: we did all those locking for this one liner, wtf
// why not use atomic instead?
other_kevq->kn_proc_count--;
kevq_dbg_chk_knotes(other_kevq);
if (other_kevq != kevq) {
// update count
kevq_wakeup(other_kevq);
KEVQ_UNLOCK(other_kevq);
KEVQ_LOCK(kevq);
KN_FLUX_LOCK(kn);
knote_leave_flux(kn);
}
}
}
static void
@ -4944,6 +5157,9 @@ knote_dequeue(struct knote *kn)
kevq_delete_knote(kevq, kn);
kn->kn_status &= ~KN_QUEUED;
kn->kn_kevq = NULL;
kevq_dbg_chk_knotes(kevq);
}
static void
@ -4961,6 +5177,7 @@ knote_alloc(int mflag)
struct knote *ret = uma_zalloc(knote_zone, mflag | M_ZERO);
/* CTR1(KTR_KQ, "knote_alloc: allocating knote %p", ret); */
mtx_init(&ret->kn_fluxlock, "kn_fluxlock", NULL, MTX_DEF | MTX_DUPOK);
// TASK_INIT(&ret->kn_timer_task, 0, &task_timer_expire, ret);
kqueue_srandom(&ret->kn_rand_seed, (u_long)ret);
return ret;
}

View File

@ -290,10 +290,14 @@ struct knote {
SLIST_ENTRY(knote) kn_link; /* for kq */
SLIST_ENTRY(knote) kn_selnext; /* for struct selinfo */
struct knlist *kn_knlist; /* f_attach populated */
// struct task kn_timer_task; /* timer task for kn */
// int kn_timer_task_queued;
int kn_drop;
TAILQ_ENTRY(knote) kn_tqe;
TAILQ_ENTRY(knote) kn_wse; /* for work stealing queue */
TAILQ_ENTRY(knote) kn_pqe; /* knote for the processing queue */
struct kqueue *kn_kq; /* which kqueue we are on */
struct kevq *kn_kevq; /* the kevq the knote is on, only valid if KN_QUEUED */
struct kevq *kn_proc_kevq; /* the kevq that's processing the knote, only valid if KN_PROCESSING */
/* used by the scheduler */
struct kevq *kn_org_kevq; /* the kevq that registered the knote */
struct kqdom *kn_kqd; /* the kqdomain the knote belongs to */

View File

@ -31,6 +31,7 @@
#ifndef _SYS_EVENTVAR_H_
#define _SYS_EVENTVAR_H_
#include <sys/queue.h>
#ifndef _KERNEL
#error "no user-serviceable parts inside"
#endif
@ -66,7 +67,9 @@ struct kevq {
#define KEVQ_CLOSING 0x02
#define KEVQ_ACTIVE 0x04
#define KEVQ_WS 0x08 /* the kevq is work stealing */
#define KEVQ_SCAN 0x10 /* the kevq is being scanned */
int kevq_state;
int kn_proc_count; /* number of processing knotes */
int kn_count; /* number of pending knotes */
int kn_rt_count; /* number of runtime knotes */
/* end 1st cache line */
@ -83,6 +86,7 @@ struct kevq {
struct knote *kn_marker;
struct ktailq kn_rt_head; /* list of pending knotes with runtime priority */
struct knote *kn_marker_rt;
struct ktailq kn_proc_head; /* list of pending knotes being processed */
int kevq_refcnt;
/* TODO: maybe these should be in kqdomain or global */

View File

@ -803,7 +803,7 @@ test_socket_check_rt(int kqfd, int kev_sz, int rtcnt)
int nev = kevent_get_n(kqfd, kev, kev_sz);
if (nev != kev_sz) {
err(1, "too few events");
err(1, "too few events: expected %d, recvd %d", kev_sz, nev);
}
for (int i = 0; i < rtcnt; i++) {