priority queue. Fixed last_nkev.

Oscar Zhao 2019-09-06 16:52:22 -04:00
parent d74786ec34
commit 98f588928f
6 changed files with 455 additions and 120 deletions

View File

@ -126,6 +126,7 @@ sysctl_ws_int(SYSCTL_HANDLER_ARGS)
SYSCTL_PROC(_kern, OID_AUTO, kq_ws_int, CTLTYPE_U32 | CTLFLAG_RW, 0, 0, sysctl_ws_int, "IU", "KQueue work stealing interval in microseconds.");
#define KQ_RTSHARE_DEFAULT (100)
#define KQDOM_FLAGS (KQ_SCHED_CPU | KQ_SCHED_QUEUE)
#define KEVQ_LAT_FLAGS ((uint64_t)-1) //(KQ_SCHED_CPU | KQ_SCHED_QUEUE | KQ_SCHED_BEST)
@ -169,7 +170,12 @@ static int kevq_acquire(struct kevq *kevq, int locked);
static void kevq_worksteal(struct kevq *kevq);
static void kevq_drain(struct kevq *kevq, struct thread *td);
static void kevq_activate(struct kevq *kevq, struct thread *td);
static struct kevq * kevq_vec_select_kevq(struct veclist *lst, int num_rand);
static struct knote * kevq_peek_knote(struct kevq *kevq);
static inline void kevq_delete_knote(struct kevq *kevq, struct knote *kn);
static void kevq_insert_knote(struct kevq *kevq, struct knote *kn);
static int kevq_total_knote(struct kevq *kevq);
static struct knote * kevq_dequeue(struct kevq *kevq, int rt);
static int kqueue_acquire_kevq(struct file *fp, struct thread *td, struct kqueue **kqp, struct kevq **kevq);
static void kqueue_ensure_kqdom(struct kqueue *kq);
@ -247,8 +253,6 @@ static void kqdom_build_internal(struct kqdom *kqd_cur, struct cpu_group *cg_cur
static struct kqdom * kqdom_build(void);
static struct kqdom * kqdom_find(struct kqdom *root, int cpuid);
static struct kevq *kevq_vec_select_kevq(struct veclist *lst, int num_rand);
static void filt_kqdetach(struct knote *kn);
static int filt_kqueue(struct knote *kn, long hint);
static int filt_procattach(struct knote *kn);
@ -402,17 +406,20 @@ SYSCTL_UINT(_kern, OID_AUTO, kq_calloutmax, CTLFLAG_RW,
#define KQSCHED_PARSE_SCHED(sf) ((sf) & 0xFF)
#define KQSCHED_PARSE_SARGS(sf) (((sf) >> 8) & 0xFF)
#define KQSCHED_PARSE_FARGS(sf) (((sf) >> 24) & 0xFF)
#define KQSCHED_PARSE_FLAGS(sf) (((sf) >> 16) & 0xFF)
#define KQSCHED_PARSE_FEAT(sf) (((sf) >> 16) & 0xFF)
#define KQSCHED_GET_SCHED(kq) (kq->kq_ssched)
#define KQSCHED_GET_SARGS(kq) (kq->kq_ssargs)
#define KQSCHED_GET_FARGS(kq) (kq->kq_sfargs)
#define KQSCHED_GET_FLAGS(kq) (kq->kq_sflags)
#define KQSCHED_GET_FEAT(kq) (kq->kq_sfeat)
#define KQTUNE_PARSE_ARGS(sf) (((sf) >> 16) & 0xFFFF)
#define KQTUNE_PARSE_OBJ(sf) ((sf) & 0xFFFF)
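/* Illustrative reading of kevq_exp_lat() below: expected cycles until this
 * kevq drains = (avg cycles per event) * (queued knotes), minus the cycles
 * already elapsed since the last kevent return (the kevq_last_kev -
 * get_cyclecount() term is negative); a smaller value presumably marks a
 * less loaded kevq. */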
static inline long
kevq_exp_lat(struct kevq *kevq)
{
return kevq->kevq_avg_lat * kevq->kn_count + kevq->kevq_last_kev - get_cyclecount();
return kevq->kevq_avg_lat * kevq_total_knote(kevq) + kevq->kevq_last_kev - get_cyclecount();
}
static inline long
@ -649,7 +656,7 @@ filt_kqueue(struct knote *kn, long hint)
if (kevq == NULL) {
return 0;
} else {
kn->kn_data = kevq->kn_count;
kn->kn_data = kevq_total_knote(kevq);
return (kn->kn_data > 0);
}
}
@ -1236,6 +1243,8 @@ kqueue_init(struct kqueue *kq)
veclist_init(&kq->kevq_vlist, 0, M_KQUEUE);
rw_init(&kq->kevq_vlist_lk, "kevq_vlist_lk");
kqueue_ensure_kqdom(kq);
kq->kq_rtshare = KQ_RTSHARE_DEFAULT;
}
int
@ -2026,7 +2035,15 @@ static void
kevq_init(struct kevq *kevq) {
mtx_init(&kevq->lock, "kevq", NULL, MTX_DEF | MTX_DUPOK);
TAILQ_INIT(&kevq->kn_head);
kevq->kevq_last_kev = 0;
TAILQ_INIT(&kevq->kn_rt_head);
kevq->kn_marker.kn_status = KN_MARKER;
kevq->kn_marker_rt.kn_status = KN_MARKER;
kevq->kn_marker_rt.kn_flags = EV_REALTIME;
kevq->kn_marker.kn_kevq = kevq;
kevq->kn_marker_rt.kn_kevq = kevq;
mtx_init(&kevq->kn_marker.kn_fluxlock, "kn_fluxlock", NULL, MTX_DEF | MTX_DUPOK);
mtx_init(&kevq->kn_marker_rt.kn_fluxlock, "kn_fluxlock", NULL, MTX_DEF | MTX_DUPOK);
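/* These embedded markers let kqueue_scan() skip a knote allocation per scan
 * in multithreaded mode (KQ_FLAG_MULTI); single-threaded scans still
 * allocate their markers on the fly. */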
}
static void
@ -2391,16 +2408,18 @@ kqdom_destroy(struct kqdom *root)
static void
kevq_dump(struct sbuf *buf, struct kevq *kevq, int level)
{
sbuf_printf(buf, "%*c<kevq: %p knotes: %d "
"total_time: %ld "
"total_syscall: %ld "
"total_events: %ld "
"avg_latency: %ld "
"avg_events: %ld "
"total_fallbacks: %ld "
"total_mismatches: %ld "
"total_worksteal: %ld/>\n",
level * DUMP_INDENT, ' ', kevq, kevq->kn_count,
sbuf_printf(buf, "%*c<kevq ptr=\"%p\" "
"knotes=\"%d\" "
"rt_knotes=\"%d\" "
"total_time=\"%ld\" "
"total_syscall=\"%ld\" "
"total_events=\"%ld\" "
"avg_latency=\"%ld\" "
"avg_events=\"%ld\" "
"total_fallbacks=\"%ld\" "
"total_mismatches=\"%ld\" "
"total_worksteal=\"%ld\" />\n",
level * DUMP_INDENT, ' ', kevq, kevq->kn_count, kevq->kn_rt_count,
kevq->kevq_tot_time,
kevq->kevq_tot_syscall,
kevq->kevq_tot_ev,
@ -2416,7 +2435,7 @@ kqdom_dump(struct sbuf *buf, struct kqdom *kqd, int level)
{
/* XXX: No potential race between this and kqdom_build() for now.
* If we move kqdom_build() out of kqueue() syscall then there is a potential race */
sbuf_printf(buf, "%*c<kqdom id: %d level: %d cpu_mask:0x%lx #children: %d #active: %d leaf: %d #kevq: %d>\n", level * DUMP_INDENT, ' ',
sbuf_printf(buf, "%*c<kqdom id=\"%d\" level=\"%d\" cpu_mask=\"0x%lx\" num_children=\"%d\" num_active=\"%d\" leaf=\"%d\" num_kevq=\"%d\">\n", level * DUMP_INDENT, ' ',
kqd->id,
level,
kqd->cpu_mask.__bits[0],
@ -2692,10 +2711,10 @@ knote_stealable(struct knote *kn)
}
static inline int
kevq_stealable(struct kevq* kevq)
kevq_stealable(struct kevq *kevq)
{
//CTR3(KTR_KQ, "kevq_stealable: AVAIL: %d, kn_cnt: %d, WS: %d", kevq_avail(kevq), kevq->kn_count, kevq->kevq_state & KEVQ_WS);
return kevq_avail(kevq) && kevq->kn_count > 0 && !(kevq->kevq_state & KEVQ_WS);
return kevq_avail(kevq) && kevq_total_knote(kevq) > 0 && !(kevq->kevq_state & KEVQ_WS);
}
static void
@ -2742,7 +2761,7 @@ kevq_worksteal(struct kevq *kevq)
if (other_kevq != NULL) {
KEVQ_OWNED(other_kevq);
/* steal from the first because it arrived late */
ws_kn = TAILQ_FIRST(&other_kevq->kn_head);
ws_kn = kevq_peek_knote(other_kevq);
/* TODO: maybe limit the number of knotes to go through */
while((ws_count < tgt_count) && (ws_kn != NULL)) {
@ -2853,15 +2872,20 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
{
struct kqueue *kq;
struct kevent *kevp;
struct knote *kn, *marker;
struct knote *kn, *marker, *rtmarker;
struct knlist *knl;
sbintime_t asbt, rsbt, fsbt;
int wait;
int count, error, haskqglobal, influx, nkev, touch, fevent;
int evlimit;
wait = 0;
struct ktailq *kntq;
int *kncnt;
int rtlimit, curr, rdrained, pri;
curr = 0;
rdrained = 0;
count = 0;
kq = kevq->kq;
count = maxevents;
nkev = 0;
error = 0;
haskqglobal = 0;
@ -2870,6 +2894,31 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
if (maxevents == 0)
goto done_nl;
/* adjust max events according to the target frequency */
if ((kq->kq_flags & KQ_FLAG_MULTI) && kq->kq_tfreq > 0 && kevq->kevq_avg_lat > 0) {
/* expected events per syscall
* = (expected seconds per syscall) / (seconds per event)
* = .............................. / (avg cycles per event / cycles per second)
* = (1 / kq->kq_tfreq) / (kevq->kevq_avg_lat / hz)
* = (hz / (kevq->kevq_avg_lat * kq->kq_tfreq))
*/
evlimit = hz / (kevq->kevq_avg_lat * kq->kq_tfreq);
if (evlimit == 0) {
evlimit = 1;
}
if (evlimit < maxevents) {
maxevents = evlimit;
}
}
/* adjust rtlimit according to the target share
* = ceil(maxevents * kq->kq_rtshare%)
*/
rtlimit = (maxevents * kq->kq_rtshare + 99) / 100;
KASSERT(rtlimit >= 0, ("the math above is fundamentally broken"));
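/* Worked example (illustrative numbers, taking hz as cycles per second per
 * the comment above): kq_tfreq = 1000 calls/s and kevq_avg_lat = 20000
 * cycles/event on a 2 GHz counter give evlimit = 2000000000 / (20000 * 1000)
 * = 100 events per call; maxevents = 100 with kq_rtshare = 75 then gives
 * rtlimit = (100 * 75 + 99) / 100 = 75 realtime slots. The +99 makes the
 * division round up, e.g. (10 * 25 + 99) / 100 = 3 = ceil(2.5). */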
rsbt = 0;
if (tsp != NULL) {
if (tsp->tv_sec < 0 || tsp->tv_nsec < 0 ||
@ -2902,66 +2951,67 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
kevq_activate(kevq, td);
}
marker = knote_alloc(M_WAITOK);
CTR2(KTR_KQ, "kqueue_scan: td %d allocated marker %p", td->td_tid, marker);
marker->kn_status = KN_MARKER;
if (kq->kq_flags & KQ_FLAG_MULTI) {
marker = &kevq->kn_marker;
rtmarker = &kevq->kn_marker_rt;
} else {
marker = knote_alloc(M_WAITOK);
rtmarker = knote_alloc(M_WAITOK);
marker->kn_status = KN_MARKER;
rtmarker->kn_status = KN_MARKER;
rtmarker->kn_flags = EV_REALTIME;
}
KEVQ_LOCK(kevq);
retry:
KEVQ_OWNED(kevq);
if (kevq->kn_count == 0 && (KQSCHED_GET_FLAGS(kq) & KQ_SCHED_FLAG_WS)) {
if (kevq_total_knote(kevq) == 0 && (KQSCHED_GET_FEAT(kq) & KQ_SCHED_FEAT_WS)) {
kevq_worksteal(kevq);
}
KEVQ_OWNED(kevq);
kevp = keva;
//CTR3(KTR_KQ, "kqueue_scan: td %d on kevq %p has %d events", td->td_tid, kevq, kevq->kn_count);
CTR3(KTR_KQ, "kqueue_scan: td %d on kevq %p has %d events", td->td_tid, kevq, kevq_total_knote(kevq));
if (kevq->kn_count == 0) {
if (kevq_total_knote(kevq) == 0) {
if (fsbt == -1) {
error = EWOULDBLOCK;
} else {
if (KQSCHED_GET_FLAGS(kq) & KQ_SCHED_FLAG_WS) {
//CTR3(KTR_KQ, "kqueue_scan: td %d, kevq %p target wait sbt: %ld", td->td_tid, kevq, fsbt);
//printf("kqueue_scan: td %d, kevq %p target wait sbt: %ld\n", td->td_tid, kevq, fsbt);
if (KQSCHED_GET_FEAT(kq) & KQ_SCHED_FEAT_WS) {
CTR3(KTR_KQ, "kqueue_scan: td %d, kevq %p target wait sbt: %ld", td->td_tid, kevq, fsbt);
/* calculate rsbt */
if (fsbt == 0) {
/* if wait indefinitely, sleep for ws_interval */
rsbt = ws_int_sbt;
//CTR2(KTR_KQ, "kqueue_scan: td %d, kevq %p indefinite wait...", td->td_tid, kevq);
//printf("kqueue_scan: td %d, kevq %p indefinite wait...\n", td->td_tid, kevq);
CTR2(KTR_KQ, "kqueue_scan: td %d, kevq %p indefinite wait...", td->td_tid, kevq);
} else {
/* get the current asbt */
if (TIMESEL(&asbt, ws_int_sbt)) {
asbt += tc_tick_sbt;
}
//CTR3(KTR_KQ, "kqueue_scan: td %d, kevq %p current sbt: %ld", td->td_tid, kevq, asbt);
//printf("kqueue_scan: td %d, kevq %p current sbt: %ld\n", td->td_tid, kevq, asbt);
CTR3(KTR_KQ, "kqueue_scan: td %d, kevq %p current sbt: %ld", td->td_tid, kevq, asbt);
/* calc the difference */
rsbt = fsbt - asbt;
if (rsbt <= 0) {
//CTR2(KTR_KQ, "kqueue_scan: td %d, kevq %p target sbt reached.", td->td_tid, kevq);
//printf("kqueue_scan: td %d, kevq %p target sbt reached.\n", td->td_tid, kevq);
CTR2(KTR_KQ, "kqueue_scan: td %d, kevq %p target sbt reached.", td->td_tid, kevq);
/* we are already overdue */
error = 0;
goto done;
} else {
//CTR3(KTR_KQ, "kqueue_scan: td %d, kevq %p target difference: %ld", td->td_tid, kevq, rsbt);
//printf("kqueue_scan: td %d, kevq %p target difference: %ld\n", td->td_tid, kevq, rsbt);
CTR3(KTR_KQ, "kqueue_scan: td %d, kevq %p target difference: %ld", td->td_tid, kevq, rsbt);
if (rsbt > ws_int_sbt) {
rsbt = ws_int_sbt;
} else {
/* if it's the last time waiting, we set fsbt = -1, which causes us to return no matter what next time */
fsbt = -1;
//CTR2(KTR_KQ, "kqueue_scan: td %d, kevq %p sleeping for the last time, setting fsbt to -1", td->td_tid, kevq);
//printf("kqueue_scan: td %d, kevq %p sleeping for the last time, setting fsbt to -1\n", td->td_tid, kevq);
CTR2(KTR_KQ, "kqueue_scan: td %d, kevq %p sleeping for the last time, setting fsbt to -1", td->td_tid, kevq);
}
//CTR3(KTR_KQ, "kqueue_scan: td %d, kevq %p sleeping for %ld", td->td_tid, kevq, rsbt);
//printf("kqueue_scan: td %d, kevq %p sleeping for %ld\n", td->td_tid, kevq, rsbt);
CTR3(KTR_KQ, "kqueue_scan: td %d, kevq %p sleeping for %ld", td->td_tid, kevq, rsbt);
}
}
@ -2971,15 +3021,14 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
else
asbt = fsbt; /* wait till fsbt, shouldn't happen */
//CTR3(KTR_KQ, "kqueue_scan: td %d, kevq %p next wakeup sbt: %ld", td->td_tid, kevq, asbt);
//printf("kqueue_scan: td %d, kevq %p next wakeup sbt: %ld\n", td->td_tid, kevq, asbt);
CTR3(KTR_KQ, "kqueue_scan: td %d, kevq %p next wakeup sbt: %ld", td->td_tid, kevq, asbt);
}
kevq->kevq_state |= KEVQ_SLEEP;
//CTR2(KTR_KQ, "kqueue_scan: td %d waiting on kevq %p for events", td->td_tid, kevq);
CTR2(KTR_KQ, "kqueue_scan: td %d waiting on kevq %p for events", td->td_tid, kevq);
error = msleep_sbt(kevq, &kevq->lock, PSOCK | PCATCH,
"kqread", asbt, rsbt >> tc_precexp, C_ABSOLUTE);
//CTR2(KTR_KQ, "kqueue_scan: td %d wokeup on kevq %p for events", td->td_tid, kevq);
CTR2(KTR_KQ, "kqueue_scan: td %d wokeup from kevq %p for events", td->td_tid, kevq);
}
if (error == 0)
@ -2988,7 +3037,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
if (error == ERESTART)
error = EINTR;
else if (error == EWOULDBLOCK) {
if (KQSCHED_GET_FLAGS(kq) & KQ_SCHED_FLAG_WS && fsbt != -1) {
if (KQSCHED_GET_FEAT(kq) & KQ_SCHED_FEAT_WS && fsbt != -1) {
goto retry;
}
error = 0;
@ -2997,15 +3046,43 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
}
KEVQ_OWNED(kevq);
/* check whether any realtime budget remains before queueing the markers */
if (curr < rtlimit) {
rdrained = 0;
TAILQ_INSERT_TAIL(&kevq->kn_rt_head, rtmarker, kn_tqe);
} else {
rdrained = 1;
}
TAILQ_INSERT_TAIL(&kevq->kn_head, marker, kn_tqe);
influx = 0;
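/* Loop discipline (summary): knotes are taken from kn_rt_head until curr
 * reaches rtlimit; dequeuing rtmarker then sets rdrained and the rest of
 * the budget is filled from the normal kn_head queue. */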
while (count) {
while (count < maxevents) {
KEVQ_OWNED(kevq);
kn = TAILQ_FIRST(&kevq->kn_head);
/* fulfill the realtime limit first */
if (!rdrained) {
if (curr < rtlimit) {
kntq = &kevq->kn_rt_head;
kncnt = &kevq->kn_rt_count;
kn = TAILQ_FIRST(kntq);
pri = 1;
} else {
/* we've reached the limit, dequeue the realtime marker knote */
kn = rtmarker;
}
} else {
kntq = &kevq->kn_head;
kncnt = &kevq->kn_count;
pri = 0;
kn = TAILQ_FIRST(kntq);
}
KASSERT(kn != NULL, ("kqueue_scan dequeued NULL"));
KN_FLUX_LOCK(kn);
if ((kn->kn_status == KN_MARKER && kn != marker) ||
if ((kn->kn_status == KN_MARKER && kn != marker && kn != rtmarker) ||
kn_in_flux(kn)) {
if (influx) {
influx = 0;
@ -3024,31 +3101,41 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
}
/* Now we have exclusive access to kn */
TAILQ_REMOVE(&kevq->kn_head, kn, kn_tqe);
TAILQ_REMOVE(kntq, kn, kn_tqe);
CTR3(KTR_KQ, "kqueue_scan: td %d on kevq %p dequeued knote %p", td->td_tid, kevq, kn);
if ((kn->kn_status & KN_DISABLED) == KN_DISABLED) {
kn->kn_status &= ~(KN_QUEUED | KN_PROCESSING | KN_WS);
kevq->kn_count--;
*kncnt -= 1;
KN_FLUX_UNLOCK(kn);
continue;
}
if (kn == marker) {
if (kn == marker || kn == rtmarker) {
/* We are dequeuing our marker, wake up threads waiting on it */
knote_flux_wakeup(kn);
KN_FLUX_UNLOCK(kn);
CTR2(KTR_KQ, "kqueue_scan: td %d MARKER WAKEUP %p", td->td_tid, kn);
if (count == maxevents) {
if (kn == rtmarker) {
rdrained = 1;
continue;
}
if (count == 0) {
goto retry;
}
goto done;
}
KASSERT(!kn_in_flux(kn),
("knote %p is unexpectedly in flux", kn));
if ((kn->kn_flags & EV_DROP) == EV_DROP) {
kn->kn_status &= ~(KN_QUEUED | KN_PROCESSING | KN_WS);
knote_enter_flux(kn);
kevq->kn_count--;
*kncnt -= 1;
KN_FLUX_UNLOCK(kn);
KEVQ_UNLOCK(kevq);
/*
@ -3061,7 +3148,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
} else if ((kn->kn_flags & EV_ONESHOT) == EV_ONESHOT) {
kn->kn_status &= ~(KN_QUEUED | KN_PROCESSING | KN_WS);
knote_enter_flux(kn);
kevq->kn_count--;
*kncnt -= 1;
KN_FLUX_UNLOCK(kn);
KEVQ_UNLOCK(kevq);
/*
@ -3095,7 +3182,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
knote_leave_flux_ul(kn);
KEVQ_LOCK(kevq);
kevq->kn_count--;
*kncnt -= 1;
kn_list_unlock(knl);
influx = 1;
CTR3(KTR_KQ, "kqueue_scan: td %d, kevq %p returned stolen knote %p", td->td_tid, kevq, kn);
@ -3107,7 +3194,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
KQ_GLOBAL_UNLOCK(&kq_global, haskqglobal);
kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE | KN_SCAN | KN_PROCESSING | KN_WS);
knote_leave_flux_ul(kn);
kevq->kn_count--;
*kncnt -= 1;
kn_list_unlock(knl);
influx = 1;
CTR4(KTR_KQ, "kqueue_scan: kn %p, ident: %d not valid anymore for kevq %p, td %d", kn, kn->kn_id, kevq, td->td_tid);
@ -3134,7 +3221,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
kn->kn_status |= KN_DISABLED;
kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE | KN_PROCESSING | KN_WS);
kevq->kn_count--;
*kncnt -= 1;
} else {
/* this flag is here to prevent a subtle workstealing race where one thread gets an fd
and returns; before it can process the event, another thread steals the knote and
@ -3142,7 +3229,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
Work stealing will avoid stealing knotes with this flag set */
kn->kn_status |= KN_PROCESSING;
CTR3(KTR_KQ, "kqueue_scan: requeued kn %p, ident: %d to kevq %p", kn, kn->kn_id,kevq);
TAILQ_INSERT_TAIL(&kevq->kn_head, kn, kn_tqe);
TAILQ_INSERT_TAIL(kntq, kn, kn_tqe);
}
kn->kn_status &= ~KN_SCAN;
@ -3154,7 +3241,11 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
/* we are returning a copy to the user */
kevp++;
nkev++;
count--;
count++;
if (pri) {
curr++;
}
if (nkev == KQ_NEVENTS) {
influx = 0;
@ -3169,7 +3260,14 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
break;
}
}
/* getting here means more events than the return buffer */
if (!rdrained) {
TAILQ_REMOVE(&kevq->kn_rt_head, rtmarker, kn_tqe);
}
TAILQ_REMOVE(&kevq->kn_head, marker, kn_tqe);
done:
KEVQ_OWNED(kevq);
@ -3177,35 +3275,40 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
knote_flux_wakeup_ul(kn);
}
if (marker != NULL) {
knote_flux_wakeup_ul(marker);
}
knote_flux_wakeup_ul(marker);
knote_flux_wakeup_ul(rtmarker);
KEVQ_UNLOCK(kevq);
if (nkev > 0 && (KQSCHED_GET_SCHED(kq) & KEVQ_LAT_FLAGS)) {
CTR2(KTR_KQ, "kqueue_scan: knote_free marker %p td %d", marker, td->td_tid);
if (!(kq->kq_flags & KQ_FLAG_MULTI)) {
knote_free(marker);
knote_free(rtmarker);
}
if (KQSCHED_GET_SCHED(kq) & KEVQ_LAT_FLAGS) {
/* book keep the statistics */
kevq->kevq_last_kev = get_cyclecount();
kevq->kevq_last_nkev = nkev;
kevq->kevq_last_nkev = count;
CTR3(KTR_KQ, "kevent: td %d nkev %d kevent (exit) %ld ns", td->td_tid, kevq->kevq_last_nkev, kevq->kevq_last_kev);
/* update total ev */
kevq->kevq_tot_ev += nkev;
kevq->kevq_tot_ev += count;
kevq->kevq_tot_syscall++;
if (kevq->kevq_avg_ev == 0) {
kevq->kevq_avg_ev = nkev;
kevq->kevq_avg_ev = count;
} else {
kevq->kevq_avg_ev = calc_overtime_avg(kevq->kevq_avg_ev, nkev, 80);
kevq->kevq_avg_ev = calc_overtime_avg(kevq->kevq_avg_ev, count, 80);
}
}
CTR2(KTR_KQ, "kqueue_scan: knote_free marker %p td %d", marker, td->td_tid);
knote_free(marker);
done_nl:
KEVQ_NOTOWNED(kevq);
if (nkev != 0)
if (nkev != 0) {
error = k_ops->k_copyout(k_ops->arg, keva, nkev);
td->td_retval[0] = maxevents - count;
}
td->td_retval[0] = count;
return (error);
}
@ -3258,9 +3361,10 @@ kqueue_ioctl(struct file *fp, u_long cmd, void *data,
int sched;
struct sbuf buf;
char *rbuf;
int tune;
kq = fp->f_data;
CTR2(KTR_KQ, "kqueue_ioctl: received: kq %p cmd: 0x%lx", kq, cmd);
CTR2(KTR_KQ, "kqueue_ioctl: received: kq %p cmd: 0x%lx", kq, cmd);
switch (cmd) {
case FKQMULTI:
KQ_LOCK(kq);
@ -3270,15 +3374,43 @@ kqueue_ioctl(struct file *fp, u_long cmd, void *data,
kq->kq_flags |= (KQ_FLAG_INIT | KQ_FLAG_MULTI);
sched = *(int*)data;
kq->kq_sflags = KQSCHED_PARSE_FLAGS(sched);
kq->kq_sfeat = KQSCHED_PARSE_FEAT(sched);
kq->kq_ssargs = KQSCHED_PARSE_SARGS(sched);
kq->kq_ssched = KQSCHED_PARSE_SCHED(sched);
kq->kq_sfargs = KQSCHED_PARSE_FARGS(sched);
CTR5(KTR_KQ, "kqueue_ioctl: multi kq %p, sched: %d sargs: %d flags: %d fargs: %d", kq,
KQSCHED_GET_SCHED(kq),
KQSCHED_GET_SARGS(kq),
KQSCHED_GET_FLAGS(kq),
KQSCHED_GET_FARGS(kq));
}
if (!error) {
CTR5(KTR_KQ, "kqueue_ioctl: multi kq %p, sched: %d sargs: %d feat: %d fargs: %d", kq, KQSCHED_GET_SCHED(kq), KQSCHED_GET_SARGS(kq),
KQSCHED_GET_FEAT(kq), KQSCHED_GET_FARGS(kq));
}
KQ_UNLOCK(kq);
break;
case FKQTUNE:
KQ_LOCK(kq);
tune = *(int*)data;
switch (KQTUNE_PARSE_OBJ(tune)) {
case KQTUNE_RTSHARE:
tune = KQTUNE_PARSE_ARGS(tune);
if (tune >= 0 && tune <= 100)
kq->kq_rtshare = tune;
else
error = (EINVAL);
break;
case KQTUNE_FREQ:
tune = KQTUNE_PARSE_ARGS(tune);
if (tune >= 0)
kq->kq_tfreq = tune;
else
error = (EINVAL);
break;
default:
error = (EINVAL);
}
if (!error) {
CTR3(KTR_KQ, "kqueue_ioctl: tune kq %p, rtshare: %d tfreq: %d", kq, kq->kq_rtshare, kq->kq_tfreq);
}
KQ_UNLOCK(kq);
break;
@ -3289,20 +3421,25 @@ kqueue_ioctl(struct file *fp, u_long cmd, void *data,
sbuf_new(&buf, rbuf, 1024 * 1024, SBUF_FIXEDLEN | SBUF_INCLUDENUL);
if (kq->kq_flags & KQ_FLAG_MULTI) {
sbuf_printf(&buf, "KQ 0x%p dump:\n\nActive KEVQs:\n", kq);
sbuf_printf(&buf, "<kq_dump ptr=\"0x%p\" sched=\"0x%x\" sargs=\"0x%x\" feat=\"0x%x\" fargs=\"0x%x\" rtshare=\"%d\" tfreq=\"%d\" >\n", kq,
kq->kq_ssched, kq->kq_ssargs, kq->kq_sfeat,
kq->kq_sfargs, kq->kq_rtshare, kq->kq_tfreq);
sbuf_printf(&buf, "\n%*c<kevq_dump>\n", 1 * DUMP_INDENT, ' ');
KVLST_RLOCK(kq);
for(int i = 0; i < veclist_size(&kq->kevq_vlist); i++) {
kevq_dump(&buf, veclist_at(&kq->kevq_vlist, i), 0);
kevq_dump(&buf, veclist_at(&kq->kevq_vlist, i), 2);
}
KVLST_RUNLOCK(kq);
sbuf_printf(&buf, "%*c</kevq_dump>\n", 1 * DUMP_INDENT, ' ');
/* dump kqdom if used */
if (KQSCHED_GET_SCHED(kq) & KQDOM_FLAGS) {
sbuf_printf(&buf, "\nKQDOM:\n");
kqdom_dump(&buf, kq->kq_kqd, 0);
sbuf_printf(&buf, "\n%*c<kqdom_dump>\n", 1 * DUMP_INDENT, ' ');
kqdom_dump(&buf, kq->kq_kqd, 2);
sbuf_printf(&buf, "%*c</kqdom_dump>\n", 1 * DUMP_INDENT, ' ');
}
sbuf_printf(&buf, "\n</kq_dump>\n");
} else {
error = (EINVAL);
}
@ -3310,7 +3447,7 @@ kqueue_ioctl(struct file *fp, u_long cmd, void *data,
sbuf_finish(&buf);
/* XXX: severe hack */
memcpy((void*)*(uintptr_t*)data, sbuf_data(&buf), sbuf_len(&buf) > 1024 * 1024 ? 1024 * 1024 : sbuf_len(&buf));
copyout(sbuf_data(&buf), (void*)*(uintptr_t*)data, sbuf_len(&buf) > 1024 * 1024 ? 1024 * 1024 : sbuf_len(&buf));
sbuf_delete(&buf);
free(rbuf, M_KQUEUE);
@ -3341,7 +3478,7 @@ kqueue_poll(struct file *fp, int events, struct ucred *active_cred,
revents = 0;
} else {
if (events & (POLLIN | POLLRDNORM)) {
if (kevq->kn_count) {
if (kevq_total_knote(kevq)) {
revents |= events & (POLLIN | POLLRDNORM);
} else {
selrecord(td, &kq->kq_sel);
@ -3392,7 +3529,7 @@ kevq_drain(struct kevq *kevq, struct thread *td)
struct knote *kn;
struct kqdom *kqd;
struct kevqlist *kevq_list;
CTR3(KTR_KQ, "kevq_drain for %p (refcnt = %d) with %d knotes", kevq, kevq->kevq_refcnt, kevq->kn_count);
CTR3(KTR_KQ, "kevq_drain for %p (refcnt = %d) with %d knotes", kevq, kevq->kevq_refcnt, kevq_total_knote(kevq));
kq = kevq->kq;
kqd = kevq->kevq_kqd;
@ -3451,7 +3588,7 @@ kevq_drain(struct kevq *kevq, struct thread *td)
KEVQ_LOCK(kevq);
/* drain all knotes on the kevq */
while ((kn = TAILQ_FIRST(&kevq->kn_head)) != NULL) {
while ((kn = kevq_peek_knote(kevq)) != NULL) {
KEVQ_OWNED(kevq);
KN_FLUX_LOCK(kn);
/* Wait for kn to stabilize */
@ -3495,7 +3632,7 @@ kevq_drain(struct kevq *kevq, struct thread *td)
KN_LEAVE_FLUX_WAKEUP(kn);
}
KASSERT(kevq->kn_count == 0, ("some knotes are left"));
KASSERT(kevq_total_knote(kevq) == 0, ("some knotes are left"));
KEVQ_OWNED(kevq);
KEVQ_UNLOCK(kevq);
@ -4203,7 +4340,6 @@ knote_drop_detached(struct knote *kn, struct thread *td)
struct klist *list;
kq = kn->kn_kq;
kevq = kn->kn_kevq;
KASSERT((kn->kn_status & KN_DETACHED) != 0,
("knote %p still attached", kn));
@ -4223,6 +4359,7 @@ knote_drop_detached(struct knote *kn, struct thread *td)
SLIST_REMOVE(list, kn, knote, kn_link);
if (kn->kn_status & KN_QUEUED) {
kevq = kn->kn_kevq;
KEVQ_LOCK(kevq);
knote_dequeue(kn);
KEVQ_UNLOCK(kevq);
@ -4470,6 +4607,60 @@ knote_sched(struct knote *kn)
}
}
/* kevq priority-queue-like operations */
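/* Two-level FIFO: EV_REALTIME knotes sit on kn_rt_head and are always
 * peeked before normal knotes on kn_head; within each list arrival order
 * is preserved. */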
static int
kevq_total_knote(struct kevq *kevq)
{
return (kevq->kn_count + kevq->kn_rt_count);
}
static struct knote *
kevq_peek_knote(struct kevq *kevq)
{
struct knote *kn;
kn = NULL;
KEVQ_OWNED(kevq);
if (kevq->kn_rt_count > 0) {
kn = TAILQ_FIRST(&kevq->kn_rt_head);
KASSERT((kn->kn_flags & EV_REALTIME), ("knote in the wrong queue"));
} else if (kevq->kn_count > 0) {
kn = TAILQ_FIRST(&kevq->kn_head);
KASSERT(!(kn->kn_flags & EV_REALTIME), ("knote in the wrong queue"));
}
return kn;
}
static inline void
kevq_delete_knote(struct kevq *kevq, struct knote *kn)
{
KEVQ_OWNED(kevq);
if (kn->kn_flags & EV_REALTIME) {
TAILQ_REMOVE(&kevq->kn_rt_head, kn, kn_tqe);
kevq->kn_rt_count--;
} else {
TAILQ_REMOVE(&kevq->kn_head, kn, kn_tqe);
kevq->kn_count--;
}
}
static void
kevq_insert_knote(struct kevq *kevq, struct knote *kn)
{
KEVQ_OWNED(kevq);
if (kn->kn_flags & EV_REALTIME) {
TAILQ_INSERT_TAIL(&kevq->kn_rt_head, kn, kn_tqe);
kevq->kn_rt_count++;
} else {
TAILQ_INSERT_TAIL(&kevq->kn_head, kn, kn_tqe);
kevq->kn_count++;
}
}
/* END Priority Queue */
static void
knote_enqueue(struct knote *kn, struct kevq *kevq)
{
@ -4480,6 +4671,7 @@ knote_enqueue(struct knote *kn, struct kevq *kevq)
KEVQ_OWNED(kevq);
KASSERT(kn_in_flux(kn), ("enqueuing a knote that's not in flux"));
KASSERT((kn->kn_status & KN_QUEUED) == 0, ("knote already queued"));
/* Queuing to a closing kevq is fine.
@ -4489,9 +4681,8 @@ knote_enqueue(struct knote *kn, struct kevq *kevq)
kn->kn_kevq = kevq;
kn->kn_status |= KN_QUEUED;
TAILQ_INSERT_TAIL(&kevq->kn_head, kn, kn_tqe);
kevq->kn_count++;
kevq_insert_knote(kevq, kn);
kevq_wakeup(kevq);
}
@ -4499,17 +4690,18 @@ knote_enqueue(struct knote *kn, struct kevq *kevq)
static void
knote_dequeue(struct knote *kn)
{
struct kevq *kevq = kn->kn_kevq;
struct kevq *kevq;
KASSERT(kn->kn_status & KN_QUEUED, ("knote not queued"));
kevq = kn->kn_kevq;
KEVQ_OWNED(kevq);
CTR3(KTR_KQ, "knote_dequeue: kn %p from kevq %p flag: 0x%x", kn, kevq, kn->kn_status);
KASSERT(kn->kn_status & KN_QUEUED, ("knote not queued"));
kevq_delete_knote(kevq, kn);
TAILQ_REMOVE(&kevq->kn_head, kn, kn_tqe);
kn->kn_status &= ~KN_QUEUED;
kn->kn_kevq = NULL;
kevq->kn_count--;
}
static void

View File

@ -146,6 +146,7 @@ struct kevent32_freebsd11 {
#define EV_RECEIPT 0x0040 /* force EV_ERROR on success, data=0 */
#define EV_DISPATCH 0x0080 /* disable event after reporting */
#define EV_AFFINITY 0x0200 /* in multithreaded mode, this event has hard affinity for the registering thread */
#define EV_REALTIME 0x0400 /* this knote has REALTIME priority */
#define EV_SYSFLAGS 0xF000 /* reserved by system */
#define EV_DROP 0x1000 /* note should be dropped */
@ -221,6 +222,7 @@ struct kevent32_freebsd11 {
struct knote;
SLIST_HEAD(klist, knote);
TAILQ_HEAD(ktailq, knote);
struct kqueue;
TAILQ_HEAD(kqlist, kqueue);
struct kevq;
@ -291,7 +293,7 @@ struct knote {
TAILQ_ENTRY(knote) kn_tqe;
TAILQ_ENTRY(knote) kn_wse; /* for work stealing queue */
struct kqueue *kn_kq; /* which kqueue we are on */
struct kevq *kn_kevq; /* the kevq the knote is on */
struct kevq *kn_kevq; /* the kevq the knote is on, only valid if KN_QUEUED */
/* used by the scheduler */
struct kevq *kn_org_kevq; /* the kevq that registered the knote */
struct kqdom *kn_kqd; /* the kqdomain the knote belongs to */
@ -396,7 +398,8 @@ __END_DECLS
* The ioctl to set multithreaded mode
*/
#define FKQMULTI _IOW('f', 89, int)
#define FKQMPRNT _IOW('f', 90, uintptr_t)
#define FKQTUNE _IOW('f', 90, int)
#define FKQMPRNT _IOW('f', 91, uintptr_t)
/*
* KQ sched
@ -406,15 +409,22 @@ __END_DECLS
#define KQ_SCHED_BEST 0x04 /* Best of N, sarg = N */
/*
* KQ sched flags
* KQ sched features
*/
#define KQ_SCHED_FLAG_WS 0x01 /* work stealing, farg = # of knotes to steal */
#define KQ_SCHED_FEAT_WS 0x01 /* work stealing, farg = # of knotes to steal */
/*
* KQ tunables
*/
#define KQTUNE_FREQ 0x01 /* target kevent call frequency; default 0 means unlimited */
#define KQTUNE_RTSHARE 0x02 /* percent of each batch reserved for realtime events; default 100 means realtime events are always handed back first */
/*
* 0 - 7: sched
* 8 - 15: sargs
* 16 - 23: flags
* 16 - 23: features
* 24 - 31: fargs
*/
#define KQSCHED_MAKE(sched, sargs, flags, fargs) (((sched) & 0xFF) | (((sargs) & 0xFF) << 8) | (((flags) & 0xFF) << 16) | (((fargs) & 0xFF) << 24))
#define KQSCHED_MAKE(sched, sargs, feat, fargs) (((sched) & 0xFF) | (((sargs) & 0xFF) << 8) | (((feat) & 0xFF) << 16) | (((fargs) & 0xFF) << 24))
#define KQTUNE_MAKE(obj, val) ((obj & 0xFFFF) | (val & 0xFFFF) << 16)
#endif /* !_SYS_EVENT_H_ */
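A minimal userspace sketch of the interface defined above (illustrative only; the error-handling style and the specific share/frequency values are assumptions, mirroring the tests further below):

#include <sys/types.h>
#include <sys/event.h>
#include <sys/ioctl.h>
#include <sys/time.h>
#include <err.h>
#include <unistd.h>

int
main(void)
{
    int kqfd, sched, tune;

    if ((kqfd = kqueue()) == -1)
        err(1, "kqueue");
    /* multithreaded mode with work stealing, stealing 1 knote at a time */
    sched = KQSCHED_MAKE(0, 0, KQ_SCHED_FEAT_WS, 1);
    if (ioctl(kqfd, FKQMULTI, &sched) == -1)
        err(1, "FKQMULTI");
    /* reserve 80% of each returned batch for EV_REALTIME knotes */
    tune = KQTUNE_MAKE(KQTUNE_RTSHARE, 80);
    if (ioctl(kqfd, FKQTUNE, &tune) == -1)
        err(1, "FKQTUNE");
    /* target roughly 1000 kevent(2) calls per second (0 = unlimited) */
    tune = KQTUNE_MAKE(KQTUNE_FREQ, 1000);
    if (ioctl(kqfd, FKQTUNE, &tune) == -1)
        err(1, "FKQTUNE");
    close(kqfd);
    return (0);
}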

View File

@ -57,8 +57,13 @@ struct kevq {
/* XXX: Make kevq contain a struct thread ptr instead of this dude */
struct kevq_thred *kevq_th; /* the thread that the kevq belongs to */
struct mtx lock; /* the lock for the kevq */
TAILQ_HEAD(, knote) kn_head; /* list of pending knotes */
struct ktailq kn_head; /* list of pending knotes */
struct knote kn_marker;
struct ktailq kn_rt_head; /* list of pending knotes with realtime priority */
struct knote kn_marker_rt;
int kn_count; /* number of pending knotes */
int kn_rt_count; /* number of pending realtime knotes */
#define KEVQ_SLEEP 0x01
#define KEVQ_CLOSING 0x02
#define KEVQ_ACTIVE 0x04
@ -123,10 +128,14 @@ struct kqueue {
struct kevqlist kq_kevqlist; /* list of kevqs */
/* scheduler flags for the KQ, set by IOCTL */
int kq_sflags;
int kq_sfeat;
int kq_ssargs;
int kq_ssched;
int kq_sfargs;
/* tuneables for the KQ, set by IOCTL */
int kq_tfreq;
int kq_rtshare;
/* Default */
struct rwlock kevq_vlist_lk;

View File

@ -47,6 +47,7 @@ extern char * kevent_to_str(struct kevent *);
struct kevent * kevent_get(int);
struct kevent * kevent_get_timeout(int, int);
struct kevent * kevent_get_timeout_u(int kqfd, uint64_t useconds);
int kevent_get_n(int kqfd, struct kevent *kev, int n);
void kevent_cmp(struct kevent *, struct kevent *);

View File

@ -78,6 +78,19 @@ test_no_kevents_quietly(void)
}
}
/* Retrieve up to n kevents */
int
kevent_get_n(int kqfd, struct kevent *kev, int n)
{
int nfds;
nfds = kevent(kqfd, NULL, 0, kev, n, NULL);
if (nfds < 1)
err(1, "kevent(2)");
return nfds;
}
/* Retrieve a single kevent */
struct kevent *
kevent_get(int kqfd)

View File

@ -56,7 +56,7 @@ static inline void
dump_gkq()
{
int error;
uintptr_t para = (uintptr_t)&dmpbuf;
uintptr_t para = (uintptr_t)dmpbuf;
/* dump KQ */
memset(dmpbuf, 0, 1024 * 1024 + 1);
error = ioctl(g_kqfd, FKQMPRNT, &para);
@ -581,7 +581,7 @@ test_socket_ws_timeout()
const char *test_id = "[Multi][WS]kevent_timeout(evfilt)";
test_begin(test_id);
int flags = KQSCHED_MAKE(0,0,KQ_SCHED_FLAG_WS,1);
int flags = KQSCHED_MAKE(0,0,KQ_SCHED_FEAT_WS,1);
int tkqfd = kqueue();
int error = ioctl(tkqfd, FKQMULTI, &flags);
if (error == -1) {
@ -640,6 +640,9 @@ test_socket_ws_timeout()
#define SOCK_BRUTE_CNT (128)
#define PACKET_BRUTE_CNT (50 * (SOCK_BRUTE_CNT))
#define THREAD_EXIT_PROB (67)
#define BRUTE_REALTIME_PROB (50)
#define BRUTE_MAX_FREQ (10000)
#define BRUTE_MIN_FREQ (1)
#define RAND_SLEEP (29)
#define RAND_SEND_SLEEP (7)
@ -716,6 +719,9 @@ test_socket_brutal(char* name)
test_begin(id);
srand(time(NULL));
for (int i = 0; i < SOCK_BRUTE_CNT; i++) {
/* Create a connected pair of full-duplex sockets for testing socket events */
@ -723,7 +729,9 @@ test_socket_brutal(char* name)
err(1, "kevent_socket");
}
EV_SET(&kev, brute_sockfd[i][0], EVFILT_READ, EV_ADD, 0, 0, &brute_sockfd[i][0]);
int evflag = (rand() % 100 < BRUTE_REALTIME_PROB) ? EV_REALTIME : 0;
EV_SET(&kev, brute_sockfd[i][0], EVFILT_READ, EV_ADD | evflag, 0, 0, &brute_sockfd[i][0]);
if (kevent(g_kqfd, &kev, 1, NULL, 0, NULL) == -1) {
err(1, "kevent_brutal_add");
@ -742,9 +750,7 @@ test_socket_brutal(char* name)
pthread_mutex_init(&brute_threadinfo[i].lock, NULL);
pthread_create(&brute_threadinfo[i].thrd, NULL, test_socket_brutal_worker, &brute_threadinfo[i]);
}
sleep(3);
for(int i = 0; i < PACKET_BRUTE_CNT; i++) {
#ifdef TEST_DEBUG
printf("READ_M: processing packet %d\n", i);
@ -766,7 +772,23 @@ test_socket_brutal(char* name)
#ifdef TEST_DEBUG
printf("READ_M: waiting for all packets to finish processing. Cur: %d Tgt: %d\n", sum, PACKET_BRUTE_CNT);
#endif
sleep(1);
/* randomize the freq and share */
int error;
int val;
val = KQTUNE_MAKE(KQTUNE_RTSHARE, rand() % 100);
error = ioctl(g_kqfd, FKQTUNE, &val);
if (error == -1) {
err(1, "ioctl TUNE");
}
val = KQTUNE_MAKE(KQTUNE_FREQ, rand() % (BRUTE_MAX_FREQ - BRUTE_MIN_FREQ) + BRUTE_MIN_FREQ);
error = ioctl(g_kqfd, FKQTUNE, &val);
if (error == -1) {
err(1, "ioctl TUNE");
}
usleep(1000);
}
/* shutdown the systems */
@ -799,6 +821,93 @@ test_socket_brutal(char* name)
success();
}
/* realtime test */
static void
test_socket_check_rt(int kqfd, int kev_sz, int rtcnt)
{
struct kevent *kev = malloc(sizeof(struct kevent) * kev_sz);
int nev = kevent_get_n(kqfd, kev, kev_sz);
if (nev != kev_sz) {
err(1, "too few events");
}
for (int i = 0; i < rtcnt; i++) {
if (!(kev[i].flags & EV_REALTIME)) {
err(1, "expected realtime");
}
}
for (int i = rtcnt; i < kev_sz; i++) {
if (kev[i].flags & EV_REALTIME) {
err(1, "expected !realtime");
}
}
free(kev);
}
static void
test_socket_rt_share(int kqfd, int kev_sz, int share)
{
if (share < 0 || share > 100) {
err(1, "INVAL");
}
int flag = KQTUNE_MAKE(KQTUNE_RTSHARE, share);
int error = ioctl(kqfd, FKQTUNE, &flag);
if (error == -1) {
err(1, "ioctl KQTUNE");
}
test_socket_check_rt(kqfd, kev_sz, (kev_sz * share + 99) / 100);
}
static void
test_socket_realtime()
{
/* create 8 sockets, 4 realtime and 4 normal,
* and test how kq hands events back to us at different realtime shares
*/
test_begin("kevent(realtime)");
int kqfd = kqueue();
struct kevent kev;
int socks[8][2];
for (int i = 0; i < 8; i++) {
if (socketpair(AF_UNIX, SOCK_STREAM, 0, &socks[i][0]) < 0) {
err(1, "kevent_socket");
}
EV_SET(&kev, socks[i][0], EVFILT_READ, EV_ADD | (i >= 4 ? EV_REALTIME : 0), 0, 0, NULL);
if (kevent(kqfd, &kev, 1, NULL, 0, NULL) == -1) {
err(1, "kevent_brutal_add");
}
}
/* push packets to the socket */
for (int i = 0; i < 8; i++) {
socket_push(socks[i][1], '.');
}
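/* Sweep the share from 0 to 100: each call returns 4 events, and a share
 * of s should yield ceil(4 * s / 100) EV_REALTIME events first, e.g.
 * s = 50 gives 2 realtime followed by 2 normal events. */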
for (int i = 0; i <= 100; i++) {
test_socket_rt_share(kqfd, 4, i);
}
for (int i = 0; i < 8; i++) {
close(socks[i][0]);
close(socks[i][1]);
}
close(kqfd);
success();
}
void
test_evfilt_read_m()
{
@ -813,6 +922,7 @@ test_evfilt_read_m()
err(1, "ioctl");
}
test_socket_read(0);
test_socket_realtime();
test_socket_brutal("rand");
close(g_kqfd);
@ -870,7 +980,7 @@ test_evfilt_read_m()
close(g_kqfd);
/* WS */
flags = KQSCHED_MAKE(0,0,KQ_SCHED_FLAG_WS,1);
flags = KQSCHED_MAKE(0,0,KQ_SCHED_FEAT_WS,1);
g_kqfd = kqueue();
error = ioctl(g_kqfd, FKQMULTI, &flags);
if (error == -1) {