Oscar Zhao 2019-04-19 15:33:07 -04:00
parent cb22931bab
commit cb4c673500
4 changed files with 650 additions and 124 deletions


@ -115,17 +115,13 @@ extern struct cpu_group *cpu_top;
} while(0)
/* no need to handle overflow as long as the existing org/cur doesn't overflow */
#define CALC_OVERTIME_AVG(org, cur, out) do { \
(out)->tv_sec = (org)->tv_sec * AVG_WEIGHT_FACTOR_OLD + (cur)->tv_nsec * AVG_WEIGHT_FACTOR_NEW; \
(out)->tv_nsec = (org)->tv_nsec * AVG_WEIGHT_FACTOR_OLD + (cur)->tv_nsec * AVG_WEIGHT_FACTOR_NEW; \
TIMESPEC_DIV((out), (AVG_WEIGHT_FACTOR_OLD + AVG_WEIGHT_FACTOR_NEW), (out)); \
} while(0)
#define CALC_OVERTIME_AVG(prev, cur) (((prev) * AVG_WEIGHT_FACTOR_OLD + (cur) * AVG_WEIGHT_FACTOR_NEW) / (AVG_WEIGHT_FACTOR_OLD + AVG_WEIGHT_FACTOR_NEW))
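
A minimal userspace sketch (not part of this commit) of how the new integer moving average behaves; the weight values below are assumptions, since the real AVG_WEIGHT_FACTOR_OLD/AVG_WEIGHT_FACTOR_NEW definitions sit outside this hunk:

#include <stdio.h>

/* assumed weights; the kernel's AVG_WEIGHT_FACTOR_* values may differ */
#define AVG_WEIGHT_FACTOR_OLD 4
#define AVG_WEIGHT_FACTOR_NEW 1
#define CALC_OVERTIME_AVG(prev, cur) (((prev) * AVG_WEIGHT_FACTOR_OLD + (cur) * AVG_WEIGHT_FACTOR_NEW) / (AVG_WEIGHT_FACTOR_OLD + AVG_WEIGHT_FACTOR_NEW))

int main(void)
{
    unsigned long avg = 0;
    unsigned long samples[] = { 100, 100, 500, 100 };   /* per-event latencies in us */

    for (int i = 0; i < 4; i++) {
        /* mirrors kqueue_kevent(): the first sample seeds the average directly */
        avg = (avg != 0) ? CALC_OVERTIME_AVG(avg, samples[i]) : samples[i];
        printf("sample %lu us -> avg %lu us\n", samples[i], avg);
    }
    return 0;
}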
#define KEVQ_NEXT_AVAIL_LOCKED(out, head, element, member) do { \
(out) = (element); \
while(1) { \
(out) = (out) == NULL ? LIST_FIRST((head)) : LIST_NEXT((out), member); \
if ((out) != NULL) { \
if ((out) != NULL && KEVQ_AVAIL(out)) { \
KEVQ_LOCK((out)); \
if (KEVQ_AVAIL((out))) { \
break; \
@ -147,7 +143,8 @@ static void kevq_init(struct kevq *kevq);
static void kevq_release(struct kevq* kevq, int locked);
static int kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp);
static void kevq_destroy(struct kevq *kevq);
static int kevq_acquire(struct kevq *kevq);
static int kevq_acquire(struct kevq *kevq, int locked);
static void kevq_worksteal(struct kevq *kevq);
void kevq_drain(struct kevq *kevq);
static void knote_xinit(struct knote *kn);
@ -217,11 +214,12 @@ static void knote_sched(struct knote *kn);
static void kqdom_init(struct kqdom *kqd);
static void kqdom_update_stats(struct kqdom *leaf, struct timespec *avg);
static void kqdom_update_lat(struct kqdom *leaf, unsigned long avg);
static void kqdom_update_active(struct kqdom *leaf, int change);
static void kqdom_insert(struct kqdom *kqd, struct kevq *kevq);
static void kqdom_remove(struct kqdom *kqd, struct kevq *kevq);
static void kqdom_destroy(struct kqdom *root);
static void kqdom_update_stats(struct kqdom *leaf, struct timespec *avg);
static struct kevq * kqdom_random_kevq_locked(struct kqdom *kqd, struct kevq *last_kevq);
static void kqdom_build_internal(struct kqdom *kqd_cur, struct cpu_group *cg_cur, int *kqd_id);
static struct kqdom * kqdom_build(void);
static struct kqdom * kqdom_find(struct kqdom *root, int cpuid);
@ -246,6 +244,26 @@ static int filt_user(struct knote *kn, long hint);
static void filt_usertouch(struct knote *kn, struct kevent *kev,
u_long type);
static int kq_sched_bon_count = 2;
SYSCTL_INT(_kern, OID_AUTO, kq_sched_bon_count, CTLFLAG_RWTUN, &kq_sched_bon_count, 0, "the number of kevqs to select the best one from");
/* TODO: make this a percentage? */
static int kq_sched_ws_count = 1;
SYSCTL_INT(_kern, OID_AUTO, kq_sched_ws_count, CTLFLAG_RWTUN, &kq_sched_ws_count, 0, "the number of kevqs to steal each time");
// hacky fast random generator
static unsigned int g_seed = 0x1234;
// Used to seed the generator.
static void kqueue_fsrand(int seed) {
g_seed = seed;
}
// Compute a pseudorandom integer.
// Output value in range [0, 32767]
static int kqueue_frand(void) {
g_seed = (214013 * g_seed + 2531011);
return (g_seed>>16) & 0x7FFF;
}
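
The schedulers below only use this generator to pick a bounded random index; a hypothetical helper (kqueue_frand_idx is not in the commit) showing the usual modulo pattern:

/* hypothetical helper: pick a pseudorandom slot in [0, count); count must be > 0 */
static int kqueue_frand_idx(int count) {
    return kqueue_frand() % count;
}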
static struct filterops file_filtops = {
.f_isfd = 1,
.f_attach = filt_fileattach,
@ -547,7 +565,7 @@ filt_kqueue(struct knote *kn, long hint)
CTR1(KTR_KQ, "filt_kqueue called for kn %p", kn);
if ( (kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) {
if ( (kq->kq_flags & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) {
return 0;
}
@ -1407,8 +1425,9 @@ kqueue_kevent(struct kqueue *kq, struct kevq *kevq, struct thread *td, int nchan
struct kevent *kevp, *changes;
struct timespec cur_ts;
int i, n, nerrors, error;
unsigned long avg;
if ((kq->kq_state & KQ_FLAG_MULTI) == 0 && (kevq->kevq_state & KEVQ_RDY) == 0) {
if ((kq->kq_flags & KQ_FLAG_MULTI) == 0 && (kevq->kevq_state & KEVQ_RDY) == 0) {
/* Mark the global kevq as ready for single threaded mode to close the window between
kqueue_register and kqueue_scan.*/
KEVQ_LOCK(kevq);
@ -1421,22 +1440,24 @@ kqueue_kevent(struct kqueue *kq, struct kevq *kevq, struct thread *td, int nchan
if (kevq->kevq_last_nkev != 0)
{
/* make sure we actually processed events last time */
getnanouptime(&cur_ts);
timespecsub(&cur_ts, &kevq->kevq_last_kev, &cur_ts);
/* divide by the number of events processed */
TIMESPEC_DIV(&cur_ts, kevq->kevq_last_nkev, &cur_ts);
if (timespecisset(&kevq->kevq_avg_lat)) {
CALC_OVERTIME_AVG(&kevq->kevq_avg_lat, &kevq->kevq_avg_lat, &cur_ts);
avg = (cur_ts.tv_sec * 1000000 + cur_ts.tv_nsec / 1000) / kevq->kevq_last_nkev;
if (kevq->kevq_avg_lat != 0) {
kevq->kevq_avg_lat = CALC_OVERTIME_AVG(kevq->kevq_avg_lat, avg);
} else {
/* first time */
timespecadd(&cur_ts, &kevq->kevq_avg_lat, &kevq->kevq_avg_lat);
kevq->kevq_avg_lat = avg;
}
CTR4(KTR_KQ, "kevent: td %d spent %ld s %ld ns per event on %d events", td->td_tid, cur_ts.tv_sec, cur_ts.tv_nsec, kevq->kevq_last_nkev);
CTR3(KTR_KQ, "kevent: td %d spent %ld us per event on %d events", td->td_tid, avg, kevq->kevq_last_nkev);
/* clear parameters */
timespecclear(&kevq->kevq_last_kev);
kevq->kevq_last_nkev = 0;
kqdom_update_lat(kevq->kevq_kqd, avg);
}
KEVQ_UNLOCK(kevq);
@ -1908,7 +1929,7 @@ kevq_thred_drain(struct kevq_thred *kevq_th, struct thread* td) {
KEVQ_TH_LOCK(kevq_th);
while((kevq = LIST_FIRST(&kevq_th->kevq_list)) != NULL) {
if (kevq_acquire(kevq) == 0) {
if (kevq_acquire(kevq, 0) == 0) {
CTR2(KTR_KQ, "kevq_thred_drain: draining kevq %p on kevq_th %p", kevq, kevq_th);
KEVQ_TH_UNLOCK(kevq_th);
kevq_drain(kevq);
@ -1924,7 +1945,6 @@ static void
kevq_init(struct kevq *kevq) {
mtx_init(&kevq->lock, "kevq", NULL, MTX_DEF | MTX_DUPOK);
TAILQ_INIT(&kevq->kn_head);
timespecclear(&kevq->kevq_avg_lat);
timespecclear(&kevq->kevq_last_kev);
}
@ -1944,19 +1964,25 @@ kevq_release(struct kevq* kevq, int locked)
}
static int
kevq_acquire(struct kevq *kevq)
kevq_acquire(struct kevq *kevq, int locked)
{
KEVQ_NOTOWNED(kevq);
int error;
if (locked) {
KEVQ_OWNED(kevq);
} else {
KEVQ_LOCK(kevq);
}
error = 0;
KEVQ_LOCK(kevq);
CTR2(KTR_KQ, "referencing kevq %p (refcnt = %d)", kevq, kevq->kevq_refcnt);
if ((kevq->kevq_state & KEVQ_CLOSING) == KEVQ_CLOSING) {
error = EINVAL;
} else {
kevq->kevq_refcnt++;
}
KEVQ_UNLOCK(kevq);
if (!locked) {
KEVQ_UNLOCK(kevq);
}
return error;
}
@ -1982,7 +2008,7 @@ kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp)
return EINVAL;
}
if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) {
if ((kq->kq_flags & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) {
// allocate KEVQ_TH
if (td->td_kevq_thred == NULL) {
kevq_th = malloc(sizeof(struct kevq_thred), M_KQUEUE, M_WAITOK | M_ZERO);
@ -2043,13 +2069,20 @@ kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp)
LIST_INSERT_HEAD(&kq->kq_kevqlist, kevq, kq_e);
KEVQ_TH_UNLOCK(kevq_th);
KQ_UNLOCK(kq);
kqdom_insert(kqd, kevq);
KQD_UNLOCK(kqd);
} else {
to_free = alloc_kevq;
KEVQ_TH_UNLOCK(kevq_th);
KQD_UNLOCK(kqd);
KQ_UNLOCK(kq);
}
KEVQ_TH_UNLOCK(kevq_th);
KQD_UNLOCK(kqd);
KQ_UNLOCK(kq);
if (to_free != NULL) {
free(to_free, M_KQUEUE);
@ -2081,7 +2114,7 @@ kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp)
}
}
error = kevq_acquire(kevq);
error = kevq_acquire(kevq, 0);
if (!error) {
*kevqp = kevq;
@ -2105,13 +2138,13 @@ kqueue_acquire(struct file *fp, struct kqueue **kqp)
if (((kq->kq_state) & KQ_CLOSING) != 0) {
return (EBADF);
}
if ((kq->kq_state & KQ_FLAG_INIT) == 0) {
kq->kq_state |= KQ_FLAG_INIT;
if ((kq->kq_flags & KQ_FLAG_INIT) == 0) {
kq->kq_flags |= KQ_FLAG_INIT;
}
kq->kq_refcnt++;
KQ_UNLOCK(kq);
if (((kq->kq_state & KQ_FLAG_MULTI) != 0) && (kq->kq_kqd == NULL)) {
if (((kq->kq_flags & KQ_FLAG_MULTI) != 0) && (kq->kq_kqd == NULL)) {
kqd = kqdom_build();
KQ_LOCK(kq);
if (kq->kq_kqd == NULL) {
@ -2230,82 +2263,142 @@ static void
kqdom_init(struct kqdom *kqd)
{
mtx_init(&kqd->kqd_lock, "kqdom_lock", NULL, MTX_DEF | MTX_DUPOK);
LIST_INIT(&kqd->kqd_kevqlist);
TAILQ_INIT(&kqd->children);
}
/* inserts a kevq into a leaf kqdom's kevq list */
static void
kqdom_insert(struct kqdom *kqd, struct kevq *kevq)
{
int val;
struct kqdom* parent;
int oldcap;
struct kevq **expand;
KQD_OWNED(kqd);
KASSERT(kqd->num_children == 0, ("inserting into a non-leaf kqdom"));
LIST_INSERT_HEAD(&kqd->kqd_kevqlist, kevq, kqd_e);
/* TODO: don't hold the lock while doing the update */
parent = kqd;
while(parent != NULL) {
val = atomic_fetchadd_int(&parent->num_kevq, 1);
KASSERT(val >= 0, ("invalid num_kevq for kqdom <= 0"));
parent = parent->parent;
CTR4(KTR_KQ, "kqdom_insert: kevq: %p kqd %d: cnt: %d cap: %d", kevq, kqd->id, kqd->kqd_kevqcnt, kqd->kqd_kevqcap);
retry:
if (kqd->kqd_kevqcnt + 1 > kqd->kqd_kevqcap) {
CTR2(KTR_KQ, "kqdom_insert: expanding... kqd %d for kevq %p\n", kqd->id, kevq);
oldcap = kqd->kqd_kevqcap;
KQD_UNLOCK(kqd);
expand = malloc(sizeof(struct kqdom *) * (oldcap + KQDOM_EXTENT_FACTOR), M_KQUEUE, M_WAITOK | M_ZERO);
KQD_LOCK(kqd);
/* recheck if we need expansion, make sure old capacity didn't change */
if (kqd->kqd_kevqcap == oldcap) {
/* copy the content from the old list to this */
for(int i = 0; i < kqd->kqd_kevqcnt; i++) {
expand[i] = kqd->kqd_kevqlist[i];
}
free(kqd->kqd_kevqlist, M_KQUEUE);
kqd->kqd_kevqlist = expand;
kqd->kqd_kevqcap = oldcap + KQDOM_EXTENT_FACTOR;
} else {
/* another thread changed the capacity while we allocated memory; retry */
free(expand, M_KQUEUE);
goto retry;
}
}
KQD_OWNED(kqd);
KASSERT(kqd->kqd_kevqcnt + 1 <= kqd->kqd_kevqcap, ("kqdom didn't expand properly"));
/* insert to list */
kqd->kqd_kevqlist[kqd->kqd_kevqcnt] = kevq;
kqd->kqd_kevqcnt++;
}
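
The expansion above drops the kqdom lock across malloc() (which may sleep) and re-checks the capacity afterwards. A hedged userspace sketch of the same grow-outside-the-lock pattern, with hypothetical names and pthreads in place of the kernel mutex:

#include <pthread.h>
#include <stdlib.h>
#include <string.h>

#define EXTENT 8

struct vec {
    pthread_mutex_t lock;
    void **items;
    int cnt, cap;
};

/* ensure one more item fits; caller holds v->lock, allocation happens with it dropped */
void vec_reserve_locked(struct vec *v)
{
retry:
    if (v->cnt + 1 > v->cap) {
        int oldcap = v->cap;
        pthread_mutex_unlock(&v->lock);
        /* M_WAITOK in the kernel cannot fail; real userspace code should check for NULL */
        void **expand = calloc(oldcap + EXTENT, sizeof(void *));
        pthread_mutex_lock(&v->lock);
        if (v->cap == oldcap) {
            /* nobody resized while we slept: install the larger array */
            memcpy(expand, v->items, sizeof(void *) * v->cnt);
            free(v->items);
            v->items = expand;
            v->cap = oldcap + EXTENT;
        } else {
            /* another thread resized while we allocated; retry */
            free(expand);
            goto retry;
        }
    }
}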
/* removes a kevq from a leaf kqdom's kevq list */
static void
kqdom_remove(struct kqdom *kqd, struct kevq *kevq)
{
int val;
struct kqdom* parent;
int found;
KQD_OWNED(kqd);
KASSERT(kqd->num_children == 0, ("removing from a non-leaf kqdom"));
LIST_REMOVE(kevq, kqd_e);
/* TODO: don't hold the lock while doing the update */
parent = kqd;
while(parent != NULL) {
val = atomic_fetchadd_int(&parent->num_kevq, -1);
KASSERT(val >= 0, ("invalid num_kevq for kqdom <= 0"));
parent = parent->parent;
CTR4(KTR_KQ, "kqdom_remove: kevq: %p kqd %d: cnt: %d cap: %d", kevq, kqd->id, kqd->kqd_kevqcnt, kqd->kqd_kevqcap);
found = 0;
/* slow, but no need to optimize for delete */
for(int i = 0; i < kqd->kqd_kevqcnt; i++) {
if(kqd->kqd_kevqlist[i] == kevq) {
found = 1;
}
if(found && (i+1 < kqd->kqd_kevqcnt)) {
kqd->kqd_kevqlist[i] = kqd->kqd_kevqlist[i+1];
}
}
if (kqd->kqd_ckevq == kevq) {
kqd->kqd_ckevq = LIST_NEXT(kevq, kqd_e);
}
KASSERT(found, ("cannot find kevq from kqdom"));
kqd->kqd_kevqcnt--;
kqd->kqd_kevqlist[kqd->kqd_kevqcnt] = NULL;
if (kqd->kqd_kevqcnt != 0)
kqd->kqd_ckevq = kqd->kqd_ckevq % kqd->kqd_kevqcnt;
else
kqd->kqd_ckevq = 0;
}
static void
kqdom_destroy(struct kqdom *root)
{
struct kqdom *kqdom, *tkqd;
TAILQ_FOREACH_SAFE(kqdom, &root->children, child_e, tkqd) {
kqdom_destroy(kqdom);
for(int i = 0; i < root->num_children; i++) {
kqdom_destroy(root->children[i]);
}
CTR2(KTR_KQ, "kqdom_destroy: destroyed kqdom %p with %d child kqdoms", root, root->num_children);
KASSERT(LIST_FIRST(&root->kqd_kevqlist) == NULL, ("freeing a kqdom with kevqs"));
if (root->kqd_kevqlist != NULL) {
KASSERT(root->kqd_kevqcnt == 0, ("freeing a kqdom with kevqs"));
free(root->kqd_kevqlist, M_KQUEUE);
}
if (root->children != NULL) {
free(root->children, M_KQUEUE);
}
KASSERT(root->num_active == 0, ("freeing a kqdom with active kevqs"));
free(root, M_KQUEUE);
}
static void
kqdom_update_stats(struct kqdom *leaf, struct timespec *avg)
kqdom_update_active(struct kqdom *leaf, int change)
{
struct timespec last_avg;
last_avg.tv_sec = avg->tv_sec;
last_avg.tv_nsec = avg->tv_nsec;
int oldval, newval;
KASSERT(change != 0, ("updating active 0"));
while (leaf != NULL) {
oldval = atomic_fetchadd_int(&leaf->num_active, change);
newval = oldval + change;
KASSERT(oldval >= 0 && newval >= 0, ("invalid oldval or newval after update"));
if (oldval == 0) {
change = 1;
CTR3(KTR_KQ, "kqdom_update_active: change %d: num of active %d for kqdom %d", change, newval, leaf->id);
} else if (newval == 0) {
/* the new value is 0: this subtree just became idle, so propagate a decrement to the parent */
change = -1;
CTR3(KTR_KQ, "kqdom_update_active: change %d: num of active %d for kqdom %d", change, newval, leaf->id);
} else {
break;
}
leaf = leaf->parent;
}
}
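
A small standalone sketch (hypothetical struct node, plain ints instead of atomic_fetchadd_int) of the propagation rule above: only 0<->non-zero transitions travel up the tree:

struct node {
    struct node *parent;
    int num_active;    /* active descendants; the kernel uses atomics */
};

void node_update_active(struct node *leaf, int change)
{
    while (leaf != NULL) {
        int oldval = leaf->num_active;
        int newval = oldval + change;
        leaf->num_active = newval;
        if (oldval == 0)
            change = 1;      /* subtree just became active: tell the parent */
        else if (newval == 0)
            change = -1;     /* subtree just became idle: tell the parent */
        else
            break;           /* no edge transition, ancestors are unaffected */
        leaf = leaf->parent;
    }
}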
static void
kqdom_update_lat(struct kqdom *leaf, unsigned long avg)
{
while(leaf != NULL) {
KQD_LOCK(leaf);
if (leaf->avg_lat != 0) {
// bit rot race here?
leaf->avg_lat = CALC_OVERTIME_AVG(leaf->avg_lat, avg);
} else {
leaf->avg_lat = avg;
}
CALC_OVERTIME_AVG(&leaf->kqd_avg_lat, &last_avg, &leaf->kqd_avg_lat);
CTR3(KTR_KQ, "kqdom_update_stats: updated avg lat %ld sec %ld for kqdom %d",
leaf->kqd_avg_lat.tv_sec, leaf->kqd_avg_lat.tv_nsec, leaf->id);
last_avg.tv_sec = leaf->kqd_avg_lat.tv_sec;
last_avg.tv_nsec = leaf->kqd_avg_lat.tv_nsec;
KQD_UNLOCK(leaf);
CTR2(KTR_KQ, "kqdom_update_lat: updated avg lat %ld us for kqdom %d", leaf->avg_lat, leaf->id);
leaf = leaf->parent;
}
@ -2325,6 +2418,7 @@ kqdom_build_internal(struct kqdom *kqd_cur, struct cpu_group *cg_cur, int *kqd_i
(*kqd_id)++;
kqd_cur->num_children = cg_numchild;
CPU_COPY(&cg_cur->cg_mask, &kqd_cur->cpu_mask);
kqd_cur->children = malloc(sizeof(struct kqdom *) * cg_numchild, M_KQUEUE, M_WAITOK | M_ZERO);
for (int i = 0; i < cg_numchild; i++) {
child = malloc(sizeof(struct kqdom), M_KQUEUE, M_WAITOK | M_ZERO);
@ -2332,7 +2426,7 @@ kqdom_build_internal(struct kqdom *kqd_cur, struct cpu_group *cg_cur, int *kqd_i
child->parent = kqd_cur;
TAILQ_INSERT_TAIL(&kqd_cur->children, child, child_e);
kqd_cur->children[i] = child;
kqdom_build_internal(child, &cg_cur->cg_child[i], kqd_id);
}
}
@ -2351,16 +2445,14 @@ kqdom_build()
static struct kqdom *
kqdom_find(struct kqdom *root, int cpuid)
{
struct kqdom *child, *tchild;
if (root->num_children == 0) {
KASSERT(CPU_ISSET(cpuid, &root->cpu_mask), ("kqdom_find: cpuid and cpumask mismatch"));
return root;
}
TAILQ_FOREACH_SAFE(child, &root->children, child_e, tchild) {
if (CPU_ISSET(cpuid, &child->cpu_mask)) {
return kqdom_find(child, cpuid);
for(int i = 0; i < root->num_children; i++) {
if(CPU_ISSET(cpuid, &root->children[i]->cpu_mask)) {
return kqdom_find(root->children[i], cpuid);
}
}
@ -2463,6 +2555,89 @@ kqueue_task(void *arg, int pending)
KQ_GLOBAL_UNLOCK(&kq_global, haskqglobal);
}
static void
kevq_worksteal(struct kevq *kevq)
{
struct kevq *other_kevq;
struct kqueue *kq;
struct knote *ws_kn, *next_kn;
struct knlist *knl;
int ws_count, valid;
TAILQ_HEAD(, knote) kn_wsq;
KEVQ_OWNED(kevq);
TAILQ_INIT(&kn_wsq);
ws_count = 0;
kq = kevq->kq;
KEVQ_UNLOCK(kevq);
/* TODO: maybe steal from the current kqdom instead of from the root */
other_kevq = kqdom_random_kevq_locked(kq->kq_kqd, kevq);
if (other_kevq != NULL && other_kevq != kevq && other_kevq->kn_count > 0) {
CTR3(KTR_KQ, "kevq_worksteal: kevq %p selected kevq %p with %d knotes", kevq, other_kevq, other_kevq->kn_count);
ws_kn = TAILQ_FIRST(&other_kevq->kn_head);
while(ws_count < kq_sched_ws_count && ws_kn != NULL) {
KEVQ_OWNED(other_kevq);
next_kn = TAILQ_NEXT(ws_kn, kn_tqe);
/* don't care about markers */
if ((ws_kn->kn_status & KN_MARKER) != 0) {
goto end_loop;
}
KN_FLUX_LOCK(ws_kn);
/* skip in-flux, inactive, disabled, or requeued knotes */
if (kn_in_flux(ws_kn) || (ws_kn->kn_status & (KN_ACTIVE | KN_DISABLED | KN_REQUEUE)) != (KN_ACTIVE)) {
KN_FLUX_UNLOCK(ws_kn);
goto end_loop;
}
knote_enter_flux(ws_kn);
KN_FLUX_UNLOCK(ws_kn);
/* Remove from the old kevq first, due to lock order */
knote_dequeue(ws_kn);
KEVQ_UNLOCK(other_kevq);
/* validate event */
knl = kn_list_lock(ws_kn);
valid = ws_kn->kn_fop->f_event(ws_kn, 0);
kn_list_unlock(knl);
if (valid) {
TAILQ_INSERT_TAIL(&kn_wsq, ws_kn, kn_wse);
ws_count++;
}
KEVQ_LOCK(other_kevq);
if (!valid) {
/* if not valid, return it to the previous queue */
knote_enqueue(ws_kn, other_kevq);
KN_LEAVE_FLUX_WAKEUP(ws_kn);
}
end_loop:
ws_kn = next_kn;
}
}
if (other_kevq != NULL) {
KEVQ_UNLOCK(other_kevq);
}
KEVQ_LOCK(kevq);
while(!TAILQ_EMPTY(&kn_wsq)) {
ws_kn = TAILQ_FIRST(&kn_wsq);
TAILQ_REMOVE(&kn_wsq, ws_kn, kn_wse);
knote_enqueue(ws_kn, kevq);
KN_LEAVE_FLUX_WAKEUP(ws_kn);
CTR4(KTR_KQ, "kevq_worksteal: kevq %p stole kn %p, ident: %d from kevq %p", kevq, ws_kn, ws_kn->kn_id, other_kevq);
}
}
/*
* Scan, update kn_data (if not ONESHOT), and copyout triggered events.
* We treat KN_MARKER knotes as if they are in flux.
@ -2471,12 +2646,14 @@ static int
kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
const struct timespec *tsp, struct kevent *keva, struct thread *td)
{
struct kqueue *kq;
struct kevent *kevp;
struct knote *kn, *marker;
struct knlist *knl;
sbintime_t asbt, rsbt;
int count, error, haskqglobal, influx, nkev, touch;
kq = kevq->kq;
count = maxevents;
nkev = 0;
error = 0;
@ -2517,11 +2694,20 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
if ((kevq->kevq_state & KEVQ_RDY) == 0) {
/* Mark the kevq as ready to receive events */
kevq->kevq_state |= KEVQ_RDY;
kqdom_update_active(kevq->kevq_kqd, 1);
}
retry:
kevp = keva;
CTR3(KTR_KQ, "kqueue_scan: td %d on kevq %p has %d events", td->td_tid, kevq, kevq->kn_count);
if ((kq->kq_flags & KQ_FLAG_MULTI) != 0 && (kq->kq_sched_flags & KQ_SCHED_WORK_STEALING) != 0 && kevq->kn_count == 0)
{
/* try work stealing */
kevq_worksteal(kevq);
}
KEVQ_OWNED(kevq);
if (kevq->kn_count == 0) {
if (asbt == -1) {
error = EWOULDBLOCK;
@ -2573,7 +2759,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
TAILQ_REMOVE(&kevq->kn_head, kn, kn_tqe);
CTR3(KTR_KQ, "kqueue_scan: td %d on kevq %p dequeued knote %p", td->td_tid, kevq, kn);
if ((kn->kn_status & KN_DISABLED) == KN_DISABLED) {
kn->kn_status &= ~KN_QUEUED;
kn->kn_status &= ~(KN_QUEUED | KN_REQUEUE);
kevq->kn_count--;
KN_FLUX_UNLOCK(kn);
continue;
@ -2592,7 +2778,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
("knote %p is unexpectedly in flux", kn));
if ((kn->kn_flags & EV_DROP) == EV_DROP) {
kn->kn_status &= ~KN_QUEUED;
kn->kn_status &= ~(KN_QUEUED | KN_REQUEUE);
knote_enter_flux(kn);
kevq->kn_count--;
KN_FLUX_UNLOCK(kn);
@ -2605,7 +2791,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
KEVQ_LOCK(kevq);
continue;
} else if ((kn->kn_flags & EV_ONESHOT) == EV_ONESHOT) {
kn->kn_status &= ~KN_QUEUED;
kn->kn_status &= ~(KN_QUEUED | KN_REQUEUE);
knote_enter_flux(kn);
kevq->kn_count--;
KN_FLUX_UNLOCK(kn);
@ -2633,12 +2819,12 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
KEVQ_LOCK(kevq);
KQ_GLOBAL_UNLOCK(&kq_global, haskqglobal);
kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE |
KN_SCAN);
KN_SCAN | KN_REQUEUE);
knote_leave_flux_ul(kn);
kevq->kn_count--;
kn_list_unlock(knl);
influx = 1;
CTR3(KTR_KQ, "kqueue_scan: kn %p not valid anymore for kevq %p, td %d", kn, kevq, td->td_tid);
CTR4(KTR_KQ, "kqueue_scan: kn %p, ident: %d not valid anymore for kevq %p, td %d", kn, kn->kn_id, kevq, td->td_tid);
continue;
}
touch = (!kn->kn_fop->f_isfd && kn->kn_fop->f_touch != NULL);
@ -2659,10 +2845,15 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
}
if (kn->kn_flags & EV_DISPATCH)
kn->kn_status |= KN_DISABLED;
kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE);
kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE | KN_REQUEUE);
kevq->kn_count--;
} else {
CTR2(KTR_KQ, "kqueue_scan: requeued kn %p to kevq %p", kn, kevq);
/* This flag prevents a subtle work-stealing race: one thread gets an identifier
and returns, but before it can process the event another thread steals the
knote and processes the same fd, leaving the first thread with no data
available. Work stealing skips knotes with this flag set. */
kn->kn_status |= KN_REQUEUE;
CTR3(KTR_KQ, "kqueue_scan: requeued kn %p, ident: %d to kevq %p", kn, kn->kn_id, kevq);
TAILQ_INSERT_TAIL(&kevq->kn_head, kn, kn_tqe);
}
@ -2771,11 +2962,11 @@ kqueue_ioctl(struct file *fp, u_long cmd, void *data,
switch (cmd) {
case FKQMULTI:
KQ_LOCK(kq);
if ((kq->kq_state & KQ_FLAG_INIT) == KQ_FLAG_INIT) {
if ((kq->kq_flags & KQ_FLAG_INIT) == KQ_FLAG_INIT) {
error = (EINVAL);
} else {
CTR2(KTR_KQ, "kqueue_ioctl: multi flag set for kq %p, scheduler flags: %d", kq, *(int*)data);
kq->kq_state |= (KQ_FLAG_INIT | KQ_FLAG_MULTI);
kq->kq_flags |= (KQ_FLAG_INIT | KQ_FLAG_MULTI);
kq->kq_sched_flags = *(int*)data;
}
KQ_UNLOCK(kq);
@ -2801,7 +2992,7 @@ kqueue_poll(struct file *fp, int events, struct ucred *active_cred,
return POLLERR;
KQ_LOCK(kq);
if ((kq->kq_state & KQ_FLAG_MULTI) != KQ_FLAG_MULTI ) {
if ((kq->kq_flags & KQ_FLAG_MULTI) != KQ_FLAG_MULTI ) {
revents = 0;
} else {
if (events & (POLLIN | POLLRDNORM)) {
@ -2906,7 +3097,7 @@ kevq_drain(struct kevq *kevq)
knote_dequeue(kn);
if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI && (kq->kq_state & KQ_CLOSING) != KQ_CLOSING && (kn->kn_status & KN_MARKER) == 0) {
if ((kq->kq_flags & KQ_FLAG_MULTI) == KQ_FLAG_MULTI && (kq->kq_state & KQ_CLOSING) != KQ_CLOSING && (kn->kn_status & KN_MARKER) == 0) {
KEVQ_UNLOCK(kevq);
/* TODO: when we activate the knote, if the event has EV_CLEAR set, maybe we shouldn't activate it
* if there has been no activity on the fd
@ -2937,7 +3128,7 @@ kevq_drain(struct kevq *kevq)
// and will be dequeued later (kn->kn_kevq will be set to another valid kevq)
//
if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) {
if ((kq->kq_flags & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) {
// drop from KQ Domain
KQ_LOCK(kq);
KQD_LOCK(kqd);
@ -2949,6 +3140,9 @@ kevq_drain(struct kevq *kevq)
LIST_REMOVE(kevq, kevq_th_e);
// detach from kqdom
if((kevq->kevq_state & KEVQ_RDY) != 0) {
kqdom_update_active(kqd, -1);
}
kqdom_remove(kqd, kevq);
// detach from kqueue
@ -3038,10 +3232,10 @@ kqueue_drain(struct kqueue *kq, struct kevq *kevq, struct thread *td)
}
// destroy kqdoms and kevqs
if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) {
if ((kq->kq_flags & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) {
while((kevq = LIST_FIRST(&kq->kq_kevqlist)) != NULL) {
KQ_UNLOCK(kq);
if (kevq_acquire(kevq) == 0)
if (kevq_acquire(kevq, 0) == 0)
kevq_drain(kevq);
KQ_LOCK(kq);
}
@ -3099,7 +3293,7 @@ kqueue_close(struct file *fp, struct thread *td)
int error;
int filedesc_unlock;
if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) {
if ((kq->kq_flags & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) {
// only acquire the kqueue lock here
if ((error = kqueue_acquire(fp, &kq)))
return error;
@ -3691,6 +3885,89 @@ knote_drop_detached(struct knote *kn, struct thread *td)
knote_free(kn);
}
static struct kevq *
kqdom_random_kevq_locked(struct kqdom* kqd, struct kevq* last_kevq)
{
struct kqdom *each_child, *last_child;
struct kevq *kevq, *each_kevq;
int num_active, init_idx;
u_long random;
/* fall back to the last child in case there is a race */
last_child = NULL;
kevq = NULL;
while (kqd->num_children > 0) {
/* read once */
num_active = kqd->num_active;
if (num_active == 0) {
/* if we reached a child that no longer has any active children, return NULL;
this happens either on the first iteration or due to a racing kevq removal */
return NULL;
}
random = kqueue_frand() % num_active;
KASSERT(random < kqd->num_children, ("more active children than total children"));
for(int i = 0; i < kqd->num_children; i++) {
each_child = kqd->children[i];
if (each_child->num_active > 0) {
/* this child suits our needs */
last_child = each_child;
if (random == 0) {
kqd = each_child;
break;
}
random--;
}
if (i == kqd->num_children) {
kqd = last_child;
}
}
}
if (kqd != NULL) {
CTR3(KTR_KQ, "kqdom_random_kevq_locked: selected kqd %d, # children %p, last_kevq %p", kqd->id, kqd->kqd_kevqcnt, last_kevq);
KQD_LOCK(kqd);
if (kqd->kqd_kevqcnt != 0) {
random = kqueue_frand() % kqd->kqd_kevqcnt;
init_idx = random;
each_kevq = kqd->kqd_kevqlist[random];
while(1) {
/* fast fail */
if (KEVQ_AVAIL(each_kevq) && each_kevq != last_kevq) {
KEVQ_LOCK(each_kevq);
if (KEVQ_AVAIL(each_kevq)) {
kevq = each_kevq;
break;
}
KEVQ_UNLOCK(each_kevq);
}
random = (random + 1) % kqd->kqd_kevqcnt;
if (random == init_idx) {
break;
}
each_kevq = kqd->kqd_kevqlist[random];
}
}
KQD_UNLOCK(kqd);
}
if (kevq != NULL) {
KEVQ_OWNED(kevq);
}
CTR2(KTR_KQ, "kqdom_random_kevq_locked: selected kevq %p, last_kevq %p", kevq, last_kevq);
return kevq;
}
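
Both the scan above and the per-kqdom round-robin in knote_next_kevq below walk a circular array starting from some index and give up after one full lap; a hypothetical standalone version of that loop shape:

/* return the first index accepted by 'ok' while scanning the circular
   array of length cnt once, starting at 'start'; -1 if none qualifies */
int circular_scan(int start, int cnt, int (*ok)(int idx))
{
    if (cnt == 0)
        return -1;
    int idx = start % cnt, init_idx = start % cnt;
    while (1) {
        if (ok(idx))
            return idx;
        idx = (idx + 1) % cnt;
        if (idx == init_idx)
            return -1;
    }
}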
/* select the next kevq based on knote and scheduler flags and locks the returned kevq */
static struct kevq *
@ -3698,14 +3975,15 @@ knote_next_kevq(struct knote *kn)
{
struct kqdom *kqd;
struct kqueue *kq;
struct kevq *next_kevq;
struct kevq *next_kevq, *sel_kevq;
int cur_kevq;
next_kevq = NULL;
kq = kn->kn_kq;
CTR1(KTR_KQ, "knote_next_kevq: processing kn %p", kn);
if ((kq->kq_state & KQ_FLAG_MULTI) == 0) {
if ((kq->kq_flags & KQ_FLAG_MULTI) == 0) {
// single threaded mode, just return the current kevq
KQ_LOCK(kn->kn_kq);
if ((kq->kq_state & KQ_CLOSING) == 0)
@ -3731,27 +4009,96 @@ knote_next_kevq(struct knote *kn)
return next_kevq;
}
if ((kq->kq_sched_flags & KQ_SCHED_QUEUE) != 0) {
if (kn->kn_kqd == NULL) {
/* the first time knote is queued, record the kqdom */
kn->kn_kqd = kqdom_find(kq->kq_kqd, PCPU_GET(cpuid));
KASSERT(kn->kn_kqd != NULL, ("knote scheduled on an unidentified CPU2"));
CTR2(KTR_KQ, "knote_next_kevq: [QUEUE] knote %p attached to kqdom id %d", kn, kn->kn_kqd->id);
if ((kq->kq_sched_flags & KQ_SCHED_BEST_OF_N) != 0) {
kqd = kq->kq_kqd;
for(int i = 0; i < kq_sched_bon_count; i++) {
sel_kevq = kqdom_random_kevq_locked(kqd, next_kevq);
if (sel_kevq != NULL) {
KEVQ_OWNED(sel_kevq);
CTR2(KTR_KQ, "knote_next_kevq: [BON] selected random kevq %p for kn %p", sel_kevq, kn);
if (next_kevq == NULL && kevq_acquire(sel_kevq, 1) == 0) {
next_kevq = sel_kevq;
KEVQ_UNLOCK(sel_kevq);
} else {
// compare their avg wait time
// TODO: refactor the unlock pattern here
if (sel_kevq->kevq_avg_lat * sel_kevq->kn_count < next_kevq->kevq_avg_lat * next_kevq->kn_count) {
if (kevq_acquire(sel_kevq, 1) == 0) {
KEVQ_UNLOCK(sel_kevq);
kevq_release(next_kevq, 0);
next_kevq = sel_kevq;
} else {
KEVQ_UNLOCK(sel_kevq);
}
} else {
KEVQ_UNLOCK(sel_kevq);
}
}
CTR2(KTR_KQ, "knote_next_kevq: [BON] current best kevq %p, avg wait time: %d", next_kevq, next_kevq->kevq_avg_lat * next_kevq->kn_count);
}
}
kqd = kn->kn_kqd;
if (next_kevq != NULL) {
KEVQ_LOCK(next_kevq);
kevq_release(next_kevq, 1);
// recheck availability
if (!KEVQ_AVAIL(next_kevq)) {
KEVQ_UNLOCK(next_kevq);
next_kevq = NULL;
}
}
CTR2(KTR_KQ, "knote_next_kevq: [BON] next kevq %p for kn %p", next_kevq, kn);
}
if ((next_kevq == NULL) && (kq->kq_sched_flags & KQ_SCHED_QUEUE) != 0) {
if((kq->kq_sched_flags & KQ_SCHED_QUEUE_CPU) != 0) {
kqd = kqdom_find(kq->kq_kqd, PCPU_GET(cpuid));
} else {
if (kn->kn_kqd == NULL) {
/* the first time knote is queued, record the kqdom */
kn->kn_kqd = kqdom_find(kq->kq_kqd, PCPU_GET(cpuid));
KASSERT(kn->kn_kqd != NULL, ("knote scheduled on an unidentified CPU2"));
CTR2(KTR_KQ, "knote_next_kevq: [QUEUE] knote %p attached to kqdom id %d", kn, kn->kn_kqd->id);
}
kqd = kn->kn_kqd;
}
KQD_LOCK(kqd);
cur_kevq = kqd->kqd_ckevq;
KEVQ_NEXT_AVAIL_LOCKED(next_kevq, &kqd->kqd_kevqlist, kqd->kqd_ckevq, kqd_e);
kqd->kqd_ckevq = next_kevq;
while(1) {
if (kqd->kqd_kevqcnt == 0) {
break;
}
cur_kevq = (cur_kevq + 1) % kqd->kqd_kevqcnt;
next_kevq = kqd->kqd_kevqlist[cur_kevq];
if (KEVQ_AVAIL(next_kevq)) {
/* fast fail */
KEVQ_LOCK(next_kevq);
if (KEVQ_AVAIL(next_kevq)) {
kqd->kqd_ckevq = cur_kevq;
break;
}
KEVQ_UNLOCK(next_kevq);
}
if (cur_kevq == kqd->kqd_ckevq) {
next_kevq = NULL;
break;
}
}
KQD_UNLOCK(kqd);
CTR2(KTR_KQ, "knote_next_kevq: [QUEUE] next kevq %p for kn %p", next_kevq, kn);
}
// generic round-robin
// fall-back round-robin
if (next_kevq == NULL) {
KQ_LOCK(kq);


@ -283,7 +283,11 @@ struct filterops {
/*
* KQ scheduler flags
*/
#define KQ_SCHED_QUEUE 0x1 /* make kq affinitize the knote depending on the cpu it's scheduled */
#define KQ_SCHED_QUEUE 0x01 /* make kq affinitize the knote depending on the first cpu it's scheduled to */
#define KQ_SCHED_QUEUE_CPU 0x02 /* make kq affinitize the knote depending on the runtime cpu it's scheduled to */
#define KQ_SCHED_WORK_STEALING 0x04
#define KQ_SCHED_BEST_OF_N 0x08
#define KQ_SCHED_GREEDY 0x10
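
A hedged userspace sketch of picking one of these schedulers, mirroring the ioctl(FKQMULTI) calls in the test program at the end of this commit (header locations and the helper name are assumptions):

#include <sys/types.h>
#include <sys/event.h>
#include <sys/ioctl.h>
#include <err.h>

/* hypothetical helper: create a kqueue and switch it to multi-threaded mode
   with the given scheduler flags, e.g. KQ_SCHED_WORK_STEALING */
int make_multi_kqueue(int sched_flags)
{
    int kqfd = kqueue();

    if (kqfd < 0)
        err(1, "kqueue");
    if (ioctl(kqfd, FKQMULTI, &sched_flags) == -1)
        err(1, "ioctl(FKQMULTI)");
    return kqfd;
}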
/*
* An in-flux knote cannot be dropped from its kq while the kq is
@ -299,6 +303,7 @@ struct knote {
SLIST_ENTRY(knote) kn_selnext; /* for struct selinfo */
struct knlist *kn_knlist; /* f_attach populated */
TAILQ_ENTRY(knote) kn_tqe;
TAILQ_ENTRY(knote) kn_wse; /* for work stealing queue */
struct kqueue *kn_kq; /* which kqueue we are on */
struct kevq *kn_kevq; /* the kevq the knote is on */
/* used by the scheduler */
@ -316,6 +321,7 @@ struct knote {
#define KN_MARKER 0x20 /* ignore this knote */
#define KN_KQUEUE 0x40 /* this knote belongs to a kq */
#define KN_SCAN 0x100 /* flux set in kqueue_scan() */
#define KN_REQUEUE 0x200 /* knote has triggered and is requeued to the current queue */
int kn_fluxwait;
int kn_influx;
struct mtx kn_fluxlock;


@ -40,9 +40,10 @@
#define KQ_NEVENTS 8 /* minimize copy{in,out} calls */
#define KQEXTENT 256 /* linear growth by this amount */
#define KQDOM_EXTENT_FACTOR 8 /* linear growth by this amount */
struct kevq {
LIST_ENTRY(kevq) kevq_th_e; /* entry into kevq_thred's hashtable */
LIST_ENTRY(kevq) kqd_e; /* entry into kqdomain */
LIST_ENTRY(kevq) kq_e; /* entry into kq */
LIST_ENTRY(kevq) kevq_th_tqe; /* entry into kevq_thred's kevq_list */
struct kqueue *kq; /* the kq that the kevq belongs to */
@ -58,42 +59,50 @@ struct kevq {
int kevq_refcnt;
/* Used by the scheduler */
struct timespec kevq_avg_lat;
unsigned long kevq_avg_lat;
struct timespec kevq_last_kev;
int kevq_last_nkev;
};
/* TODO: this assumes that threads don't get rescheduled across cores */
struct kqdom {
/* static */
struct mtx kqd_lock;
TAILQ_ENTRY(kqdom) child_e;
struct kqdom *parent;
int id;
struct timespec kqd_avg_lat;
cpuset_t cpu_mask;
int num_children;
int num_kevq;
TAILQ_HEAD(, kqdom) children;
struct kevqlist kqd_kevqlist; /* list of kevqs on the kdomain, only set for leaf domains*/
struct kevq *kqd_ckevq;
struct kqdom **children;
/* statistics */
unsigned long avg_lat;
int num_active; /* total number of active children below this node */
/* dynamic members*/
struct kevq **kqd_kevqlist; /* array of kevqs on the kqdom, only set for leaf kqdoms */
int kqd_kevqcap;
int kqd_kevqcnt;
int kqd_ckevq;
};
struct kqueue {
struct mtx kq_lock;
int kq_refcnt;
int kq_refcnt;
struct selinfo kq_sel;
int kq_state;
int kq_state;
#define KQ_SEL 0x01
#define KQ_ASYNC 0x02
#define KQ_TASKSCHED 0x04 /* task scheduled */
#define KQ_TASKDRAIN 0x08 /* waiting for task to drain */
#define KQ_CLOSING 0x10
#define KQ_FLAG_INIT 0x20 /* kqueue has been initialized. this flag is set after the first kevent structure is processed */
#define KQ_FLAG_MULTI 0x40 /* Multi-threaded mode */
int kq_flags;
#define KQ_FLAG_INIT 0x01 /* kqueue has been initialized. this flag is set after the first kevent structure is processed */
#define KQ_FLAG_MULTI 0x02 /* Multi-threaded mode */
TAILQ_ENTRY(kqueue) kq_list;
struct sigio *kq_sigio;
struct filedesc *kq_fdp;
int kq_knlistsize; /* size of knlist */
int kq_knlistsize; /* size of knlist */
struct klist *kq_knlist; /* list of knotes */
u_long kq_knhashmask; /* size of knhash */
struct klist *kq_knhash; /* hash table for knotes */
@ -105,7 +114,7 @@ struct kqueue {
struct kevqlist kq_kevqlist; /* list of kevqs for fall-back round robin */
struct kqdom *kq_kqd; /* root domain */
struct kevq *kq_ckevq; /* current kevq for multithreaded kqueue, used for round robin */
int kq_sched_flags; /* Scheduler flag for the KQ */
int kq_sched_flags; /* Scheduler flag for the KQ */
};
#endif /* !_SYS_EVENTVAR_H_ */


@ -33,13 +33,18 @@
/*
* KQ scheduler flags
*/
#define KQ_SCHED_QUEUE 0x1 /* make kq affinitize the knote depending on the cpu it's scheduled */
#define KQ_SCHED_QUEUE 0x01 /* make kq affinitize the knote depending on the first cpu it's scheduled to */
#define KQ_SCHED_QUEUE_CPU 0x02 /* make kq affinitize the knote depending on the runtime cpu it's scheduled to */
#define KQ_SCHED_WORK_STEALING 0x04
#define KQ_SCHED_BEST_OF_N 0x08
#define KQ_SCHED_GREEDY 0x10
//#define TEST_DEBUG
struct thread_info {
pthread_t thrd;
int can_crash;
int ws_master;
pthread_mutex_t lock;
int group_id;
int evcnt;
@ -89,7 +94,7 @@ socket_pop(int sockfd)
/* Drain the read buffer, then make sure there are no more events. */
#ifdef TEST_DEBUG
printf("READ_M: popping the read buffer\n");
printf("READ_M: popping the read buffer of sock %d\n", sockfd);
#endif
if (read(sockfd, &buf, 1) < 1)
err(1, "read(2)");
@ -432,6 +437,139 @@ test_socket_queue(void)
success();
}
/***************************
* WS test
***************************/
#define SOCK_WS_CNT (1000)
volatile int ws_good = 0;
static void*
test_socket_ws_worker(void* args)
{
struct thread_info *info = (struct thread_info *) args;
char dat;
int ws_num = 0;
struct kevent *ret;
while (1) {
#ifdef TEST_DEBUG
printf("READ_M: thread %d waiting for events\n", info->tid);
#endif
ret = kevent_get(g_kqfd);
#ifdef TEST_DEBUG
printf("READ_M: thread %d woke up\n", info->tid);
#endif
dat = socket_pop(ret->ident);
free(ret);
if (info->ws_master == 0) {
/* if we are the master, wait for the slave to signal us */
while(!ws_good) {
usleep(500);
}
break;
} else {
ws_num++;
if (ws_num == SOCK_WS_CNT - 1) {
ws_good = 1;
break;
}
}
}
#ifdef TEST_DEBUG
printf("READ_M: thread %d exiting\n", info->tid);
#endif
pthread_exit(0);
}
int ws_sockfd[SOCK_WS_CNT][2];
static void
test_socket_ws()
{
struct kevent kev;
struct thread_info thrd_info[2];
const char *test_id = "[Multi][WS]kevent(evfilt)";
cpuset_t cpuset;
test_begin(test_id);
for (int i = 0; i < SOCK_WS_CNT; i++) {
/* Create a connected pair of full-duplex sockets for testing socket events */
if (socketpair(AF_UNIX, SOCK_STREAM, 0, &ws_sockfd[i][0]) < 0) {
err(1, "kevent_socket");
}
EV_SET(&kev, ws_sockfd[i][0], EVFILT_READ, EV_ADD, 0, 0, &ws_sockfd[i][0]);
if (kevent(g_kqfd, &kev, 1, NULL, 0, NULL) == -1) {
err(1, "kevent_ws_add");
}
}
srand(time(NULL));
#ifdef TEST_DEBUG
printf("READ_M: creating master thread...\n");
#endif
for (int i = 0; i < 1; i++) {
thrd_info[i].tid = i;
thrd_info[i].ws_master = i;
pthread_create(&thrd_info[i].thrd, NULL, test_socket_ws_worker, &thrd_info[i]);
CPU_ZERO(&cpuset);
CPU_SET(0, &cpuset);
if (pthread_setaffinity_np(thrd_info[i].thrd, sizeof(cpuset_t), &cpuset) < 0) {
err(1, "thread_affinity");
}
}
sleep(3);
for(int i = 0; i < SOCK_WS_CNT; i++) {
#ifdef TEST_DEBUG
printf("READ_M: pusing 1 packet to sock %d\n", i);
#endif
socket_push(ws_sockfd[i][1], '.');
}
sleep(1);
for(int i = 1; i < 2; i++) {
#ifdef TEST_DEBUG
printf("READ_M: creating slave thread...\n");
#endif
thrd_info[i].tid = i;
thrd_info[i].ws_master = i;
pthread_create(&thrd_info[i].thrd, NULL, test_socket_ws_worker, &thrd_info[i]);
CPU_ZERO(&cpuset);
CPU_SET(0, &cpuset);
if (pthread_setaffinity_np(thrd_info[i].thrd, sizeof(cpuset_t), &cpuset) < 0) {
err(1, "thread_affinity");
}
}
/* shut down the test */
#ifdef TEST_DEBUG
printf("READ_M: waiting for threads to exit...\n");
#endif
for (int i = 0; i < 2; i++) {
pthread_join(thrd_info[i].thrd, NULL);
}
for (int i = 0; i < SOCK_WS_CNT; i++) {
EV_SET(&kev, ws_sockfd[i][0], EVFILT_READ, EV_DELETE, 0, 0, &ws_sockfd[i][0]);
if (kevent(g_kqfd, &kev, 1, NULL, 0, NULL) == -1) {
err(1, "kevent_ws_delete");
}
}
success();
}
/***************************
* Brutal test
@ -465,6 +603,9 @@ test_socket_brutal_worker(void* args)
#endif
if ((rand() % 100) < THREAD_EXIT_PROB) {
#ifdef TEST_DEBUG
printf("READ_M: thread %d checking fake crash\n", info->tid);
#endif
pthread_mutex_lock(&info->lock);
#ifdef TEST_DEBUG
printf("READ_M: thread %d trying to fake crash. Can crash: %d\n", info->tid, info->can_crash);
@ -478,6 +619,10 @@ test_socket_brutal_worker(void* args)
pthread_mutex_unlock(&info->lock);
}
#ifdef TEST_DEBUG
printf("READ_M: thread %d ident: %ld\n", info->tid, ret->ident);
#endif
dat = socket_pop(ret->ident);
free(ret);
@ -529,6 +674,7 @@ test_socket_brutal()
brute_threadinfo[i].tid = i;
brute_threadinfo[i].evcnt = 0;
brute_threadinfo[i].can_crash = ((i % 10) != 0);
pthread_mutex_init(&brute_threadinfo[i].lock, NULL);
pthread_create(&brute_threadinfo[i].thrd, NULL, test_socket_brutal_worker, &brute_threadinfo[i]);
}
@ -574,6 +720,7 @@ test_socket_brutal()
for (int i = 0; i < THREAD_BRUTE_CNT; i++) {
pthread_join(brute_threadinfo[i].thrd, NULL);
pthread_mutex_destroy(&brute_threadinfo[i].lock);
}
for (int i = 0; i < SOCK_BRUTE_CNT; i++) {
@ -612,4 +759,21 @@ test_evfilt_read_m()
test_socket_brutal();
close(g_kqfd);
flags = KQ_SCHED_BEST_OF_N;
g_kqfd = kqueue();
error = ioctl(g_kqfd, FKQMULTI, &flags);
test_socket_brutal();
close(g_kqfd);
flags = KQ_SCHED_WORK_STEALING;
g_kqfd = kqueue();
error = ioctl(g_kqfd, FKQMULTI, &flags);
test_socket_ws();
test_socket_brutal();
close(g_kqfd);
}