improve best of 2 (still slow)

This commit is contained in:
Charlie Root 2019-08-23 15:30:30 -04:00
parent b4d0670a1f
commit 8e73e73197
6 changed files with 500 additions and 337 deletions

View File

@ -73,6 +73,9 @@ __FBSDID("$FreeBSD$");
#include <sys/user.h>
#include <sys/ktr.h>
#include <sys/smp.h>
#include <sys/veclist.h>
#include <sys/stdint.h>
#include <sys/libkern.h>
#ifdef KTRACE
#include <sys/ktrace.h>
#endif
@ -135,6 +138,18 @@ extern struct cpu_group *cpu_top;
} \
} while(0)
static inline int
need_track_latency(struct kqueue *kq)
{
return (kq->kq_flags & KQ_FLAG_MULTI) != 0 && (kq->kq_sched_flags & KQ_SCHED_BEST_OF_N) != 0;
}
static inline uint64_t
timespec_to_ns(struct timespec *spec)
{
return spec->tv_nsec + spec->tv_sec * 1000000;
}
static struct kevq * kevqlist_find(struct kevqlist *kevq_list, struct kqueue *kq);
static void kevq_thred_init(struct kevq_thred *kevq_th);
static void kevq_thred_destroy(struct kevq_thred *kevq_th);
@ -144,11 +159,9 @@ static void kevq_release(struct kevq* kevq, int locked);
static void kevq_destroy(struct kevq *kevq);
static int kevq_acquire(struct kevq *kevq, int locked);
static void kevq_worksteal(struct kevq *kevq);
void kevq_drain(struct kevq *kevq);
void kevq_drain(struct kevq *kevq, struct thread *td);
static int kqueue_acquire_kevq(struct file *fp, struct thread *td, struct kqueue **kqp, struct kevq **kevq);
static void knote_xinit(struct knote *kn);
static int kevent_copyout(void *arg, struct kevent *kevp, int count);
static int kevent_copyin(void *arg, struct kevent *kevp, int count);
static int kqueue_register(struct kqueue *kq, struct kevq *kevq,
@ -213,11 +226,11 @@ static void knote_sched(struct knote *kn);
static void kqdom_init(struct kqdom *kqd);
static void kqdom_update_lat(struct kqdom *leaf, unsigned long avg);
static void kqdom_update_active(struct kqdom *leaf, int change);
static void kqdom_update_parents(struct kqdom *leaf, int direction);
static void kqdom_insert(struct kqdom *kqd, struct kevq *kevq);
static void kqdom_remove(struct kqdom *kqd, struct kevq *kevq);
static void kqdom_destroy(struct kqdom *root);
static struct kevq * kqdom_random_kevq_locked(struct kqdom *kqd, struct kevq *last_kevq);
static struct kevq * kqdom_random_kevq_locked(struct kqdom *kqd);
static void kqdom_build_internal(struct kqdom *kqd_cur, struct cpu_group *cg_cur, int *kqd_id);
static struct kqdom * kqdom_build(void);
static struct kqdom * kqdom_find(struct kqdom *root, int cpuid);
@ -249,19 +262,6 @@ SYSCTL_INT(_kern, OID_AUTO, kq_sched_bon_count, CTLFLAG_RWTUN, &kq_sched_bon_cou
static int kq_sched_ws_count = 1;
SYSCTL_INT(_kern, OID_AUTO, kq_sched_ws_count, CTLFLAG_RWTUN, &kq_sched_ws_count, 0, "the number of kevqs to steal each time");
// hacky fast random generator
static unsigned int g_seed = 0x1234;
// Used to seed the generator.
static void kqueue_fsrand(int seed) {
g_seed = seed;
}
// Compute a pseudorandom integer.
// Output value in range [0, 32767]
static int kqueue_frand(void) {
g_seed = (214013 * g_seed + 2531011);
return (g_seed>>16) & 0x7FFF;
}
static struct filterops file_filtops = {
.f_isfd = 1,
.f_attach = filt_fileattach,
@ -420,8 +420,8 @@ knote_enter_flux_ul(struct knote *kn)
static void
knote_enter_flux(struct knote *kn)
{
CTR2(KTR_KQ, "knote_enter_flux: %p flux: %d", kn, kn->kn_influx);
KN_FLUX_OWNED(kn);
/* CTR2(KTR_KQ, "knote_enter_flux: %p flux: %d", kn, kn->kn_influx); */
KN_FLUX_OWNED(kn);
MPASS(kn->kn_influx < INT_MAX);
kn->kn_influx++;
}
@ -441,13 +441,13 @@ knote_leave_flux_ul(struct knote *kn)
static bool
knote_leave_flux(struct knote *kn)
{
CTR2(KTR_KQ, "knote_leave_flux: %p flux: %d", kn, kn->kn_influx);
/* CTR2(KTR_KQ, "knote_leave_flux: %p flux: %d", kn, kn->kn_influx); */
KN_FLUX_OWNED(kn);
MPASS(kn->kn_influx > 0);
kn->kn_influx--;
return (kn->kn_influx == 0);
}
return (kn->kn_influx == 0);
}
#define KNL_ASSERT_LOCK(knl, islocked) do { \
if (islocked) \
@ -1415,6 +1415,7 @@ kevqlist_find(struct kevqlist *kevq_list, struct kqueue *kq)
return kevq_found;
}
static int
kqueue_kevent(struct kqueue *kq, struct kevq *kevq, struct thread *td, int nchanges, int nevents,
struct kevent_copyops *k_ops, const struct timespec *timeout)
@ -1423,7 +1424,7 @@ kqueue_kevent(struct kqueue *kq, struct kevq *kevq, struct thread *td, int nchan
struct kevent *kevp, *changes;
struct timespec cur_ts;
int i, n, nerrors, error;
unsigned long avg;
uint64_t avg;
if ((kq->kq_flags & KQ_FLAG_MULTI) == 0 && (kevq->kevq_state & KEVQ_RDY) == 0) {
/* Mark the global kevq as ready for single threaded mode to close the window between
@ -1433,31 +1434,36 @@ kqueue_kevent(struct kqueue *kq, struct kevq *kevq, struct thread *td, int nchan
KEVQ_UNLOCK(kevq);
}
KEVQ_LOCK(kevq);
/* prob don't need the lock here as these are only accessible by one thread */
if (kevq->kevq_last_nkev != 0)
if (need_track_latency(kq))
{
/* make sure we actually processed events last time */
getnanouptime(&cur_ts);
timespecsub(&cur_ts, &kevq->kevq_last_kev, &cur_ts);
/* only need to do track the average latency for BON */
KEVQ_LOCK(kevq);
/* divide by the number of events processed */
avg = (cur_ts.tv_sec * 1000000 + cur_ts.tv_nsec / 100) / kevq->kevq_last_nkev;
/* prob don't need the lock here as these are only accessible by one thread */
if (kevq->kevq_last_nkev != 0)
{
/* make sure we actually processed events last time */
getnanouptime(&cur_ts);
timespecsub(&cur_ts, &kevq->kevq_last_kev, &cur_ts);
if (kevq->kevq_avg_lat != 0) {
kevq->kevq_avg_lat = CALC_OVERTIME_AVG(kevq->kevq_avg_lat, avg);
} else {
kevq->kevq_avg_lat = avg;
/* divide by the number of events processed */
avg = timespec_to_ns(&cur_ts) / kevq->kevq_last_nkev;
if (kevq->kevq_avg_lat != 0) {
kevq->kevq_avg_lat = CALC_OVERTIME_AVG(kevq->kevq_avg_lat, avg);
} else {
kevq->kevq_avg_lat = avg;
}
CTR3(KTR_KQ, "kevent: td %d spent %ld us per event on %d events", td->td_tid, avg, kevq->kevq_last_nkev);
/* clear parameters */
timespecclear(&kevq->kevq_last_kev);
kevq->kevq_last_nkev = 0;
kqdom_update_lat(kevq->kevq_kqd, avg);
}
CTR3(KTR_KQ, "kevent: td %d spent %ld us per event on %d events", td->td_tid, avg, kevq->kevq_last_nkev);
/* clear parameters */
timespecclear(&kevq->kevq_last_kev);
kevq->kevq_last_nkev = 0;
kqdom_update_lat(kevq->kevq_kqd, avg);
KEVQ_UNLOCK(kevq);
}
KEVQ_UNLOCK(kevq);
nerrors = 0;
while (nchanges > 0) {
@ -1777,7 +1783,6 @@ kqueue_register(struct kqueue *kq, struct kevq *kevq, struct kevent *kev, struct
error = ENOMEM;
goto done;
}
knote_xinit(kn);
kn->kn_kevq = kevq;
// this is set later depending on the scheduled CPU
kn->kn_kqd = NULL;
@ -1910,6 +1915,7 @@ kevq_thred_init(struct kevq_thred *kevq_th) {
static void
kevq_thred_destroy(struct kevq_thred *kevq_th) {
mtx_destroy(&kevq_th->lock);
free(kevq_th->kevq_hash, M_KQUEUE);
free(kevq_th, M_KQUEUE);
CTR1(KTR_KQ, "kevq_thred_destroy: freed kevq_th %p", kevq_th);
@ -1926,7 +1932,7 @@ kevq_thred_drain(struct kevq_thred *kevq_th, struct thread* td) {
if (kevq_acquire(kevq, 0) == 0) {
CTR2(KTR_KQ, "kevq_thred_drain: draining kevq %p on kevq_th %p", kevq, kevq_th);
KEVQ_TH_UNLOCK(kevq_th);
kevq_drain(kevq);
kevq_drain(kevq, td);
KEVQ_TH_LOCK(kevq_th);
}
}
@ -1949,7 +1955,7 @@ kevq_release(struct kevq* kevq, int locked)
KEVQ_OWNED(kevq);
else
KEVQ_LOCK(kevq);
CTR2(KTR_KQ, "releasing kevq %p (refcnt = %d)", kevq, kevq->kevq_refcnt);
/* CTR2(KTR_KQ, "releasing kevq %p (refcnt = %d)", kevq, kevq->kevq_refcnt); */
kevq->kevq_refcnt--;
if (kevq->kevq_refcnt == 1)
wakeup(&kevq->kevq_refcnt);
@ -1967,7 +1973,7 @@ kevq_acquire(struct kevq *kevq, int locked)
KEVQ_LOCK(kevq);
}
error = 0;
CTR2(KTR_KQ, "referencing kevq %p (refcnt = %d)", kevq, kevq->kevq_refcnt);
/* CTR2(KTR_KQ, "referencing kevq %p (refcnt = %d)", kevq, kevq->kevq_refcnt); */
if ((kevq->kevq_state & KEVQ_CLOSING) == KEVQ_CLOSING) {
error = EINVAL;
} else {
@ -2046,11 +2052,11 @@ kqueue_obtain_kevq(struct kqueue *kq, struct thread *td, struct kevq **kevqp)
CTR4(KTR_KQ, "kqueue_ensure_kevq(M): allocated kevq %p for thread %d (oncpu = %d), kqdom %d", alloc_kevq, td->td_tid, td->td_oncpu, kqd->id);
KQ_LOCK(kq);
KQD_LOCK(kqd);
KEVQ_TH_LOCK(kevq_th);
KQD_LOCK(kqd);
kevq = kevqlist_find(kevq_list, kq);
/* TODO: probably don't need to re-check unless a thread can asynchronously call
* kevent (signal handler?) */
/* TODO: probably don't need to re-check */
if (kevq == NULL) {
kevq = alloc_kevq;
// insert kevq to the kevq_th hash table
@ -2069,8 +2075,8 @@ kqueue_obtain_kevq(struct kqueue *kq, struct thread *td, struct kevq **kevqp)
} else {
to_free = alloc_kevq;
KEVQ_TH_UNLOCK(kevq_th);
KQD_UNLOCK(kqd);
KEVQ_TH_UNLOCK(kevq_th);
KQ_UNLOCK(kq);
}
@ -2234,128 +2240,156 @@ kqdom_next_leaf(struct kqdom *kqd)
static void
kqdom_init(struct kqdom *kqd)
{
veclist_init(&kqd->children, NULL, 0);
veclist_init(&kqd->kqd_activelist, NULL, 0);
veclist_init(&kqd->kqd_kevqs, NULL, 0);
mtx_init(&kqd->kqd_lock, "kqdom_lock", NULL, MTX_DEF | MTX_DUPOK);
}
/* inserts a list*/
static int
kqdom_is_leaf(struct kqdom *kqd)
{
return veclist_size(&kqd->children) == 0;
}
/* inserts a kevq into a leaf kqdom */
static void
kqdom_insert(struct kqdom *kqd, struct kevq *kevq)
{
int oldcap;
struct kevq **expand;
KQD_OWNED(kqd);
KASSERT(kqd->num_children == 0, ("inserting into a non-leaf kqdom"));
CTR4(KTR_KQ, "kqdom_insert: kevq: %p kqd %d: cnt: %d cap: %d", kevq, kqd->id, kqd->kqd_kevqcnt, kqd->kqd_kevqcap);
int oldcap, newcap;
void **expand;
KQD_OWNED(kqd);
KASSERT(kqdom_is_leaf(kqd), ("inserting into a non-leaf kqdom"));
CTR2(KTR_KQ, "kqdom_insert: kevq: %p kqdom %d", kevq, kqd->id);
/* expand the kqdom if needed */
retry:
if (kqd->kqd_kevqcnt + 1 > kqd->kqd_kevqcap) {
if (veclist_need_exp(&kqd->kqd_kevqs)) {
CTR2(KTR_KQ, "kqdom_insert: expanding... kqd %d for kevq %p\n", kqd->id, kevq);
oldcap = kqd->kqd_kevqcap;
oldcap = veclist_cap(&kqd->kqd_kevqs);
KQD_UNLOCK(kqd);
expand = malloc(sizeof(struct kqdom *) * (oldcap + KQDOM_EXTENT_FACTOR), M_KQUEUE, M_WAITOK | M_ZERO);
newcap = oldcap + KQDOM_EXTENT;
expand = malloc(sizeof(struct kqdom *) * newcap, M_KQUEUE, M_WAITOK | M_ZERO);
KQD_LOCK(kqd);
/* recheck if we need expansion, make sure old capacity didn't change */
if (kqd->kqd_kevqcap == oldcap) {
/* copy the content from the old list to this */
for(int i = 0; i < kqd->kqd_kevqcnt; i++) {
expand[i] = kqd->kqd_kevqlist[i];
if (veclist_cap(&kqd->kqd_kevqs) == oldcap) {
expand = veclist_expand(&kqd->kqd_kevqs, expand, newcap);
if (expand != NULL) {
free(expand, M_KQUEUE);
}
free(kqd->kqd_kevqlist, M_KQUEUE);
kqd->kqd_kevqlist = expand;
kqd->kqd_kevqcap = oldcap + KQDOM_EXTENT_FACTOR;
} else {
/* some threads made changes while we allocated memory, retry */
/* some threads made changes while we were allocating memory, retry */
free(expand, M_KQUEUE);
goto retry;
}
}
KQD_OWNED(kqd);
KASSERT(kqd->kqd_kevqcnt + 1 <= kqd->kqd_kevqcap, ("kqdom didn't expand properly"));
/* insert to list */
kqd->kqd_kevqlist[kqd->kqd_kevqcnt] = kevq;
kqd->kqd_kevqcnt++;
KASSERT(!veclist_need_exp(&kqd->kqd_kevqs), ("failed to expand kqdom"));
veclist_insert_tail(&kqd->kqd_kevqs, kevq);
if (veclist_size(&kqd->kqd_kevqs) == 1) {
kqdom_update_parents(kqd, KQDIR_ACTIVE);
}
}
/* removes a list */
/* removes a kevq from a leaf kqdom */
static void
kqdom_remove(struct kqdom *kqd, struct kevq *kevq)
{
int found;
KQD_OWNED(kqd);
KASSERT(kqd->num_children == 0, ("removing from a non-leaf kqdom"));
CTR4(KTR_KQ, "kqdom_remove: kevq: %p kqd %d: cnt: %d cap: %d", kevq, kqd->id, kqd->kqd_kevqcnt, kqd->kqd_kevqcap);
found = 0;
KASSERT(kqdom_is_leaf(kqd), ("removing from a non-leaf kqdom"));
CTR2(KTR_KQ, "kqdom_remove: kevq: %p kqdom %d", kevq, kqd->id);
/* slow, but no need to optimize for delete */
for(int i = 0; i < kqd->kqd_kevqcnt; i++) {
if(kqd->kqd_kevqlist[i] == kevq) {
found = 1;
}
veclist_remove(&kqd->kqd_kevqs, kevq);
if(found && (i+1 < kqd->kqd_kevqcnt)) {
kqd->kqd_kevqlist[i] = kqd->kqd_kevqlist[i+1];
}
if (veclist_size(&kqd->kqd_kevqs) == 0) {
kqdom_update_parents(kqd, KQDIR_INACTIVE);
}
KASSERT(found, ("cannot find kevq from kqdom"));
kqd->kqd_kevqcnt--;
kqd->kqd_kevqlist[kqd->kqd_kevqcnt] = NULL;
if (kqd->kqd_kevqcnt != 0)
kqd->kqd_ckevq = kqd->kqd_ckevq % kqd->kqd_kevqcnt;
else
kqd->kqd_ckevq = 0;
}
static void
kqdom_destroy(struct kqdom *root)
{
for(int i = 0; i < root->num_children; i++) {
kqdom_destroy(root->children[i]);
void **buf;
for(int i = 0; i < veclist_size(&root->children); i++) {
kqdom_destroy(veclist_at(&root->children, i));
}
CTR2(KTR_KQ, "kqdom_destroy: destroyed kqdom %p with %d child kqdoms", root, root->num_children);
CTR2(KTR_KQ, "kqdom_destroy: destroyed kqdom %d with %d child kqdoms", root->id, veclist_size(&root->children));
if (root->kqd_kevqlist != NULL) {
KASSERT(root->kqd_kevqcnt == 0, ("freeing a kqdom with kevqs"));
free(root->kqd_kevqlist, M_KQUEUE);
buf = veclist_buf(&root->kqd_kevqs);
if (buf != NULL) {
free(buf, M_KQUEUE);
}
if (root->children != NULL) {
free(root->children, M_KQUEUE);
buf = veclist_buf(&root->kqd_activelist);
if (buf != NULL) {
free(buf, M_KQUEUE);
}
KASSERT(root->num_active == 0, ("freeing a kqdom with active kevqs"));
buf = veclist_buf(&root->children);
if (buf != NULL) {
free(buf, M_KQUEUE);
}
mtx_destroy(&root->kqd_lock);
free(root, M_KQUEUE);
}
/* Expensive if called *frequently*
*
* Updates a kqdom based on the currently active children
*/
static void
kqdom_update_active(struct kqdom *leaf, int change)
kqdom_update_parents(struct kqdom *kqd, int direction)
{
int oldval, newval;
int cont;
struct kqdom *child;
KASSERT(change != 0, ("updating active 0"));
KQD_OWNED(kqd);
while (leaf != NULL) {
oldval = atomic_fetchadd_int(&leaf->num_active, change);
newval = oldval + change;
KASSERT(oldval >= 0 && newval >= 0, ("invalid oldval or newval after update"));
if (oldval == 0) {
change = 1;
CTR3(KTR_KQ, "kqdom_update_active: change %d: num of active %d for kqdom %d", change, newval, leaf->id);
} else if (newval == 0) {
/* if new val is 0, we */
change = -1;
CTR3(KTR_KQ, "kqdom_update_active: change %d: num of active %d for kqdom %d", change, newval, leaf->id);
} else {
/* We are locking parent kqdoms while the leaf lock is acquired.
* Just a note, not a problem (so far)
*/
cont = 1;
while (cont) {
child = kqd;
kqd = kqd->parent;
if(kqd == NULL)
break;
KQD_LOCK(kqd);
CTR3(KTR_KQ, "kqdom_update_parents: %d updating kqdom %d with %d active children", direction, kqd->id, veclist_size(&kqd->kqd_activelist));
if (direction == KQDIR_INACTIVE) {
veclist_remove(&kqd->kqd_activelist, child);
/* didn't change from 1 to 0, stop */
if (veclist_size(&kqd->kqd_activelist) != 0) {
cont = 0;
}
} else {
/* kqd->kqd_activelist are preallocated with maximum children for non-leaf nodes
* Should NEVER fail
*/
KASSERT(!veclist_need_exp(&kqd->kqd_activelist), ("kqdom requires expansion"));
veclist_insert_tail(&kqd->kqd_activelist, child);
/* didn't change from 0 to 1, stop */
if (veclist_size(&kqd->kqd_activelist) != 1) {
cont = 0;
}
}
leaf = leaf->parent;
KQD_UNLOCK(kqd);
}
}
@ -2376,21 +2410,28 @@ kqdom_update_lat(struct kqdom *leaf, unsigned long avg)
}
}
/* DFS to mirror the cpu_group structure */
/* Mirror the cpu_group structure */
static void
kqdom_build_internal(struct kqdom *kqd_cur, struct cpu_group *cg_cur, int *kqd_id)
{
void **expand;
struct kqdom *child;
int cg_numchild = cg_cur->cg_children;
CTR4(KTR_KQ, "kqdom_build_internal: processing cpu_group with %d child groups, %d CPUs, shared cache level %d, kqd_id %d",
cg_numchild, cg_cur->cg_count, cg_cur->cg_level, *kqd_id);
CTR4(KTR_KQ, "kqdom_build_internal: processing cpu_group with %d child groups, %d CPUs, shared cache level %d, kqd_id %d", cg_numchild, cg_cur->cg_count, cg_cur->cg_level, *kqd_id);
// init fields for current
/* init fields for current */
kqd_cur->id = *kqd_id;
(*kqd_id)++;
kqd_cur->num_children = cg_numchild;
CPU_COPY(&cg_cur->cg_mask, &kqd_cur->cpu_mask);
kqd_cur->children = malloc(sizeof(struct kqdom *) * cg_numchild, M_KQUEUE, M_WAITOK | M_ZERO);
/* allocate children and active lists */
if (cg_numchild > 0) {
expand = malloc(sizeof(struct kqdom *) * cg_numchild, M_KQUEUE, M_WAITOK | M_ZERO);
veclist_expand(&kqd_cur->children, expand, cg_numchild);
expand = malloc(sizeof(struct kqdom *) * cg_numchild, M_KQUEUE, M_WAITOK | M_ZERO);
veclist_expand(&kqd_cur->kqd_activelist, expand, cg_numchild);
}
for (int i = 0; i < cg_numchild; i++) {
child = malloc(sizeof(struct kqdom), M_KQUEUE, M_WAITOK | M_ZERO);
@ -2398,7 +2439,7 @@ kqdom_build_internal(struct kqdom *kqd_cur, struct cpu_group *cg_cur, int *kqd_i
child->parent = kqd_cur;
kqd_cur->children[i] = child;
veclist_insert_tail(&kqd_cur->children, child);
kqdom_build_internal(child, &cg_cur->cg_child[i], kqd_id);
}
}
@ -2417,18 +2458,18 @@ kqdom_build()
static struct kqdom *
kqdom_find(struct kqdom *root, int cpuid)
{
if (root->num_children == 0) {
if (kqdom_is_leaf(root)) {
KASSERT(CPU_ISSET(cpuid, &root->cpu_mask), ("kqdom_find: cpuid and cpumask mismatch"));
return root;
}
for(int i = 0; i < root->num_children; i++) {
if(CPU_ISSET(cpuid, &root->children[i]->cpu_mask)) {
return kqdom_find(root->children[i], cpuid);
for(int i = 0; i < veclist_size(&root->children); i++) {
if(CPU_ISSET(cpuid, &((struct kqdom *)veclist_at(&root->children, i))->cpu_mask)) {
return kqdom_find((struct kqdom *)veclist_at(&root->children, i), cpuid);
}
}
KASSERT(0, ( "kqdom_find: cpu doesn't exist "));
KASSERT(0, ("kqdom_find: cpu doesn't exist "));
return NULL;
}
@ -2544,10 +2585,11 @@ kevq_worksteal(struct kevq *kevq)
KEVQ_UNLOCK(kevq);
/* todo maybe from cur kqdomain instead of from root */
other_kevq = kqdom_random_kevq_locked(kq->kq_kqd, kevq);
other_kevq = kqdom_random_kevq_locked(kq->kq_kqd);
CTR2(KTR_KQ, "kevq_worksteal: kevq %p selected kevq %p", kevq, other_kevq);
if (other_kevq != NULL && other_kevq != kevq && other_kevq->kn_count > 0) {
CTR3(KTR_KQ, "kevq_worksteal: kevq %p selected kevq %p with %d knotes", kevq, other_kevq, other_kevq->kn_count);
ws_kn = TAILQ_FIRST(&other_kevq->kn_head);
while(ws_count < kq_sched_ws_count && ws_kn != NULL) {
@ -2659,14 +2701,12 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
asbt = 0;
marker = knote_alloc(M_WAITOK);
CTR2(KTR_KQ, "kqueue_scan: td %d allocated marker %p", td->td_tid, marker);
knote_xinit(marker);
marker->kn_status = KN_MARKER;
KEVQ_LOCK(kevq);
if ((kevq->kevq_state & KEVQ_RDY) == 0) {
/* Mark the kevq as ready to receive events */
kevq->kevq_state |= KEVQ_RDY;
kqdom_update_active(kevq->kevq_kqd, 1);
}
retry:
@ -2864,7 +2904,8 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
knote_flux_wakeup_ul(marker);
}
if (nkev != 0) {
if (nkev != 0 && need_track_latency(kq)) {
/* book keep the statistics */
getnanouptime(&kevq->kevq_last_kev);
kevq->kevq_last_nkev = nkev;
@ -3004,6 +3045,7 @@ static void
kevq_destroy(struct kevq *kevq)
{
CTR1(KTR_KQ, "kevq_destroy for %p", kevq);
mtx_destroy(&kevq->lock);
free(kevq, M_KQUEUE);
}
@ -3011,7 +3053,7 @@ kevq_destroy(struct kevq *kevq)
This is also called when a thread exits/crashes (currently racing, also to make it work need to reconfigure kq->ck_evq)
* a ref cnt must be held */
void
kevq_drain(struct kevq *kevq)
kevq_drain(struct kevq *kevq, struct thread *td)
{
struct kqueue *kq;
struct knote *kn;
@ -3061,17 +3103,23 @@ kevq_drain(struct kevq *kevq)
KN_FLUX_OWNED(kn);
KASSERT(!kn_in_flux(kn), ("knote is still influx"));
knote_enter_flux(kn);
KN_FLUX_UNLOCK(kn);
/* remove knote from kevq */
knote_dequeue(kn);
if ((kq->kq_flags & KQ_FLAG_MULTI) == KQ_FLAG_MULTI && (kq->kq_state & KQ_CLOSING) != KQ_CLOSING && (kn->kn_status & KN_MARKER) == 0) {
if ((kn->kn_flags & EV_AFFINITY) == EV_AFFINITY) {
knote_drop(kn, td);
}
/* a thread cannot crash while in kernel, and there is no extra refs
* Marker KNs should should not exist
*/
KASSERT((kn->kn_status & KN_MARKER) == 0, ("Marker KN present while closing"));
if ((kq->kq_flags & KQ_FLAG_MULTI) == KQ_FLAG_MULTI && (kq->kq_state & KQ_CLOSING) != KQ_CLOSING) {
KEVQ_UNLOCK(kevq);
/* TODO: When we knote activate, if the ev has EV_CLEAR set, maybe we shouldn't activate the event
* if there hasn't been activities on the fd
*/
knote_activate(kn);
KEVQ_LOCK(kevq);
}
@ -3088,7 +3136,6 @@ kevq_drain(struct kevq *kevq)
//
// First, all knotes with kn->kn_kevq != kevq before queuing is not an issue
// because if kn->kn_kevq == NULL, scheduler will grab kevq from either kqdom (QC) or kevqlist (RR) or kn->orgkevq (EV_AFFINITY)
// EV_AFFINITY is currently broken (need to keep a list of EV_AFFINITY for each kevq and delete them atomically)
// KEVQs grabbed from QC or RR are locked with QC or RR locked, therefore they are either grabbed before kevq invalidation
// or after kevq detachment. (In between doesn't matter since kevq is already invalidated)
// In the former case, the knote would be queued to the kevq and later drained as usual.
@ -3099,10 +3146,9 @@ kevq_drain(struct kevq *kevq)
//
if ((kq->kq_flags & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) {
// drop from KQ Domain
KQ_LOCK(kq);
KQD_LOCK(kqd);
KEVQ_TH_LOCK(kevq->kevq_th);
KQD_LOCK(kqd);
// detach from kevq_th
LIST_REMOVE(kevq, kevq_th_tqe);
@ -3110,9 +3156,6 @@ kevq_drain(struct kevq *kevq)
LIST_REMOVE(kevq, kevq_th_e);
// detach from kqdom
if((kevq->kevq_state & KEVQ_RDY) != 0) {
kqdom_update_active(kqd, -1);
}
kqdom_remove(kqd, kevq);
// detach from kqueue
@ -3121,8 +3164,8 @@ kevq_drain(struct kevq *kevq)
}
LIST_REMOVE(kevq, kq_e);
KEVQ_TH_UNLOCK(kevq->kevq_th);
KQD_UNLOCK(kqd);
KEVQ_TH_UNLOCK(kevq->kevq_th);
KQ_UNLOCK(kq);
} else {
KQ_LOCK(kq);
@ -3204,7 +3247,7 @@ kqueue_drain(struct kqueue *kq, struct kevq *kevq, struct thread *td)
while((kevq = LIST_FIRST(&kq->kq_kevqlist)) != NULL) {
KQ_UNLOCK(kq);
if (kevq_acquire(kevq, 0) == 0)
kevq_drain(kevq);
kevq_drain(kevq, td);
KQ_LOCK(kq);
}
@ -3213,7 +3256,7 @@ kqueue_drain(struct kqueue *kq, struct kevq *kevq, struct thread *td)
} else {
KQ_UNLOCK(kq);
// we already have a reference for single threaded mode
kevq_drain(kq->kq_kevq);
kevq_drain(kq->kq_kevq, td);
KQ_LOCK(kq);
}
@ -3456,7 +3499,7 @@ knote_activate(struct knote *kn)
void
knlist_add(struct knlist *knl, struct knote *kn, int islocked)
{
CTR1(KTR_KQ, "knlist_add kn %p", kn);
/* CTR1(KTR_KQ, "knlist_add kn %p", kn); */
KNL_ASSERT_LOCK(knl, islocked);
KQ_NOTOWNED(kn->kn_kq);
KASSERT(kn_in_flux(kn), ("knote %p not in flux", kn));
@ -3849,75 +3892,45 @@ knote_drop_detached(struct knote *kn, struct thread *td)
knote_free(kn);
}
/* A refcnt to kevq will be held upon return */
static struct kevq *
kqdom_random_kevq_locked(struct kqdom* kqd, struct kevq* last_kevq)
kqdom_random_kevq_locked(struct kqdom *kqd)
{
struct kqdom *each_child, *last_child;
struct kevq *kevq, *each_kevq;
int num_active, init_idx;
u_long random;
struct kevq *kevq;
struct kqdom *tkqd;
int num_active;
u_long rand;
/* fall-back with the last child in case there is a race */
last_child = NULL;
kevq = NULL;
while (kqd->num_children > 0) {
/* read once */
num_active = kqd->num_active;
if (num_active == 0) {
/* if we got to a child and now it doesn't have any active children, then return NULL
this happens either on the first loop or due to a race of kevq deletion */
return NULL;
}
random = kqueue_frand() % num_active;
KASSERT(random < kqd->num_children, ("more active children than total children"));
for(int i = 0; i < kqd->num_children; i++) {
each_child = kqd->children[i];
if (each_child->num_active > 0) {
/* if the child suits our need */
last_child = each_child;
if (random == 0) {
kqd = each_child;
break;
}
random--;
}
if (i == kqd->num_children) {
kqd = last_child;
}
while (!kqdom_is_leaf(kqd)) {
rand = random();
KQD_LOCK(kqd);
/* we only select active stuff inside this, need to be EXTREMELY fast */
num_active = veclist_size(&kqd->kqd_activelist);
CTR1(KTR_KQ, "kqdom_random_kevq_locked: randomly selected leaf kqdom %d", kqd->id);
if (num_active > 0) {
tkqd = veclist_at(&kqd->kqd_activelist, rand % num_active);
} else {
tkqd = NULL;
}
KQD_UNLOCK(kqd);
kqd = tkqd;
}
if (kqd != NULL) {
CTR3(KTR_KQ, "kqdom_random_kevq_locked: selected kqd %d, # children %p, last_kevq %p", kqd->id, kqd->kqd_kevqcnt, last_kevq);
CTR1(KTR_KQ, "kqdom_random_kevq_locked: randomly selected leaf kqdom %d", kqd->id);
rand = random();
KQD_LOCK(kqd);
if (kqd->kqd_kevqcnt != 0) {
random = kqueue_frand() % kqd->kqd_kevqcnt;
init_idx = random;
num_active = veclist_size(&kqd->kqd_kevqs);
if (num_active > 0) {
kevq = veclist_at(&kqd->kqd_kevqs, rand % num_active);
KEVQ_LOCK(kevq);
each_kevq = kqd->kqd_kevqlist[random];
while(1) {
/* fast fail */
if (KEVQ_AVAIL(each_kevq) && each_kevq != last_kevq) {
KEVQ_LOCK(each_kevq);
if (KEVQ_AVAIL(each_kevq)) {
kevq = each_kevq;
break;
}
KEVQ_UNLOCK(each_kevq);
}
random = (random + 1) % kqd->kqd_kevqcnt;
if (random == init_idx) {
break;
}
each_kevq = kqd->kqd_kevqlist[random];
/* make sure kevq is available */
if (!KEVQ_AVAIL(kevq)) {
KEVQ_UNLOCK(kevq);
kevq = NULL;
}
}
KQD_UNLOCK(kqd);
@ -3927,7 +3940,7 @@ kqdom_random_kevq_locked(struct kqdom* kqd, struct kevq* last_kevq)
KEVQ_OWNED(kevq);
}
CTR2(KTR_KQ, "kqdom_random_kevq_locked: selected kevq %p, last_kevq %p", kevq, last_kevq);
CTR1(KTR_KQ, "kqdom_random_kevq_locked: randomly selected kevq %p", kevq);
return kevq;
}
@ -3939,8 +3952,7 @@ knote_next_kevq(struct knote *kn)
{
struct kqdom *kqd;
struct kqueue *kq;
struct kevq *next_kevq, *sel_kevq;
int cur_kevq;
struct kevq *next_kevq;
next_kevq = NULL;
kq = kn->kn_kq;
@ -3975,45 +3987,46 @@ knote_next_kevq(struct knote *kn)
if ((kq->kq_sched_flags & KQ_SCHED_BEST_OF_N) != 0) {
kqd = kq->kq_kqd;
for(int i = 0; i < kq_sched_bon_count; i++) {
sel_kevq = kqdom_random_kevq_locked(kqd, next_kevq);
struct kevq *sel_kevq = kqdom_random_kevq_locked(kqd);
if (sel_kevq != NULL) {
int ret;
KEVQ_OWNED(sel_kevq);
CTR2(KTR_KQ, "knote_next_kevq: [BON] selected random kevq %p for kn %p", sel_kevq, kn);
/* acquire ref don't lock */
ret = kevq_acquire(sel_kevq, 1);
if (next_kevq == NULL && kevq_acquire(sel_kevq, 1) == 0) {
KEVQ_UNLOCK(sel_kevq);
if (ret != 0) {
continue;
}
if (next_kevq == NULL) {
next_kevq = sel_kevq;
KEVQ_UNLOCK(sel_kevq);
} else {
// compare their avg wait time
// TODO: refactor the unlock pattern here
/* compare estimated wait time */
if (sel_kevq->kevq_avg_lat * sel_kevq->kn_count < next_kevq->kevq_avg_lat * next_kevq->kn_count) {
if (kevq_acquire(sel_kevq, 1) == 0) {
KEVQ_UNLOCK(sel_kevq);
kevq_release(next_kevq, 0);
next_kevq = sel_kevq;
} else {
KEVQ_UNLOCK(sel_kevq);
}
/* if the newly selected kevq is better, swap them */
kevq_release(next_kevq, 0);
next_kevq = sel_kevq;
} else {
KEVQ_UNLOCK(sel_kevq);
kevq_release(sel_kevq, 0);
}
}
CTR2(KTR_KQ, "knote_next_kevq: [BON] current best kevq %p, avg wait time: %d", next_kevq, next_kevq->kevq_avg_lat * next_kevq->kn_count);
CTR3(KTR_KQ, "knote_next_kevq: [BON] current best kevq %p, avg time: %d, wait time: %d", next_kevq, next_kevq->kevq_avg_lat, next_kevq->kevq_avg_lat * next_kevq->kn_count);
}
}
if (next_kevq != NULL) {
KEVQ_LOCK(next_kevq);
kevq_release(next_kevq, 1);
// recheck availability
if (!KEVQ_AVAIL(next_kevq)) {
KEVQ_UNLOCK(next_kevq);
next_kevq = NULL;
}
/* Here we don't recheck availability although it could change
* thread exiting is very rare, just give to the thread and
*/
}
CTR2(KTR_KQ, "knote_next_kevq: [BON] next kevq %p for kn %p", next_kevq, kn);
@ -4032,32 +4045,8 @@ knote_next_kevq(struct knote *kn)
}
kqd = kn->kn_kqd;
}
KQD_LOCK(kqd);
cur_kevq = kqd->kqd_ckevq;
while(1) {
if (kqd->kqd_kevqcnt == 0) {
break;
}
cur_kevq = (cur_kevq + 1) % kqd->kqd_kevqcnt;
next_kevq = kqd->kqd_kevqlist[cur_kevq];
if (KEVQ_AVAIL(next_kevq)) {
/* fast fail */
KEVQ_LOCK(next_kevq);
if (KEVQ_AVAIL(next_kevq)) {
kqd->kqd_ckevq = cur_kevq;
break;
}
KEVQ_UNLOCK(next_kevq);
}
if (cur_kevq == kqd->kqd_ckevq) {
next_kevq = NULL;
break;
}
}
KQD_UNLOCK(kqd);
next_kevq = kqdom_random_kevq_locked(kqd);
CTR2(KTR_KQ, "knote_next_kevq: [QUEUE] next kevq %p for kn %p", next_kevq, kn);
}
@ -4105,12 +4094,16 @@ knote_enqueue(struct knote *kn, struct kevq *kevq)
struct kqueue *kq;
kq = kn->kn_kq;
CTR2(KTR_KQ, "knote_enqueue: kn %p to kevq %p", kn, kevq);
/* CTR2(KTR_KQ, "knote_enqueue: kn %p to kevq %p", kn, kevq); */
KEVQ_OWNED(kevq);
KASSERT((kn->kn_status & KN_QUEUED) == 0, ("knote already queued"));
KASSERT((kevq->kevq_state & KEVQ_CLOSING) == 0 && (kevq->kevq_state & KEVQ_RDY) != 0, ("kevq already closing or not ready"));
/* Queuing to a clsoing kevq is fine.
* The refcnt wait in kevq drain is before knote requeuing
* so no knote will be forgotten
* KASSERT((kevq->kevq_state & KEVQ_CLOSING) == 0 && (kevq->kevq_state & KEVQ_RDY) != 0, ("kevq already closing or not ready")); */
kn->kn_kevq = kevq;
kn->kn_status |= KN_QUEUED;
@ -4121,12 +4114,6 @@ knote_enqueue(struct knote *kn, struct kevq *kevq)
kevq_wakeup(kevq);
}
static void
knote_xinit(struct knote *kn)
{
mtx_init(&kn->kn_fluxlock, "kn_fluxlock", NULL, MTX_DEF | MTX_DUPOK);
}
static void
knote_dequeue(struct knote *kn)
{
@ -4156,14 +4143,18 @@ static struct knote *
knote_alloc(int mflag)
{
struct knote *ret = uma_zalloc(knote_zone, mflag | M_ZERO);
CTR1(KTR_KQ, "knote_alloc: allocating knote %p", ret);
/* CTR1(KTR_KQ, "knote_alloc: allocating knote %p", ret); */
mtx_init(&ret->kn_fluxlock, "kn_fluxlock", NULL, MTX_DEF | MTX_DUPOK);
return ret;
}
static void
knote_free(struct knote *kn)
{
CTR1(KTR_KQ, "knote_free: kn %p", kn);
/* CTR1(KTR_KQ, "knote_free: kn %p", kn); */
if (kn != NULL) {
mtx_destroy(&kn->kn_fluxlock);
}
uma_zfree(knote_zone, kn);
}

View File

@ -36,11 +36,15 @@
#endif
#include <sys/_task.h>
#include <sys/veclist.h>
#include <sys/stdint.h>
#define KQ_NEVENTS 8 /* minimize copy{in,out} calls */
#define KQEXTENT 256 /* linear growth by this amount */
#define KQDOM_EXTENT_FACTOR 8 /* linear growth by this amount */
#define KQDOM_EXTENT 8 /* linear growth by this amount */
#define KQDIR_ACTIVE (0)
#define KQDIR_INACTIVE (1)
struct kevq {
LIST_ENTRY(kevq) kevq_th_e; /* entry into kevq_thred's hashtable */
@ -61,29 +65,25 @@ struct kevq {
/* Used by the scheduler */
unsigned long kevq_avg_lat;
struct timespec kevq_last_kev;
int kevq_last_nkev;
uint64_t kevq_last_nkev;
};
/* TODO: assumed that threads don't get rescheduled across cores */
struct kqdom {
/* static */
int id;
struct mtx kqd_lock;
struct kqdom *parent;
int id;
cpuset_t cpu_mask;
int num_children;
struct kqdom **children;
struct veclist children; /* child kqdoms */
/* statistics */
/* statistics. Atomically updated, doesn't require the lock*/
unsigned long avg_lat;
int num_active; /* total number of active children below this node */
/* dynamic members*/
struct kevq **kqd_kevqlist; /* array list of kevqs on the kdomain, only set for leaf domains */
int kqd_kevqcap;
int kqd_kevqcnt;
int kqd_ckevq;
struct veclist kqd_activelist; /* active child kqdoms */
struct veclist kqd_kevqs; /* kevqs for this kqdom */
int kqd_ckevq; /* current kevq for round robbin. XXX: Remove round robbin it has literally no benefit but maintainance nightmares */
};
struct kqueue {

161
sys/sys/veclist.h Normal file
View File

@ -0,0 +1,161 @@
/*-
* SPDX-License-Identifier: BSD-2-Clause-FreeBSD
*
* Copyright (c)2019 Reliable Computer Systems Lab, University of Waterloo
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* $FreeBSD$
*/
/* Vector list - insert/remove: O(n)
* - random access: O(1)
* - insert/remove tail: O(1)
*/
#ifndef _SYS_VECLIST_H_
#define _SYS_VECLIST_H_
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/types.h>
struct veclist {
size_t cap;
size_t size;
void **buf;
};
static inline void
veclist_init(struct veclist *lst, void **buf, int cap)
{
lst->size = 0;
lst->buf = buf;
lst->cap = cap;
}
static inline void *
veclist_remove_at(struct veclist *lst, size_t idx)
{
void *ret;
KASSERT(lst->size > idx, ("veclist_remove_at index out of bound"));
ret = lst->buf[idx];
memmove(&lst->buf[idx], &lst->buf[idx+1], (lst->size - (idx + 1)) * sizeof(void*));
lst->size--;
return ret;
}
static inline void *
veclist_remove(struct veclist *lst, void *ele)
{
int found;
for(found = 0; found < lst->size; found++) {
if(lst->buf[found] == ele) {
break;
}
}
return veclist_remove_at(lst, found);
}
/* inserts an element so that the index of the element after insertion is idx */
static inline void
veclist_insert_at(struct veclist *lst, void *ele, size_t idx)
{
KASSERT((lst->cap > lst->size) && (lst->size >= idx), ("veclist overflow"));
memmove(&lst->buf[idx+1], &lst->buf[idx], (lst->size - idx) * sizeof(void*));
lst->size++;
lst->buf[idx] = ele;
}
static inline void
veclist_insert_tail(struct veclist *lst, void *ele)
{
return veclist_insert_at(lst, ele, lst->size);
}
static inline void
veclist_insert_head(struct veclist *lst, void *ele)
{
return veclist_insert_at(lst, ele, 0);
}
static inline void *
veclist_remove_head(struct veclist *lst)
{
return veclist_remove_at(lst, 0);
}
static inline void *
veclist_remove_tail(struct veclist *lst)
{
return veclist_remove_at(lst, lst->size - 1);
}
/* returns old buffer */
static inline void**
veclist_expand(struct veclist *lst, void **new_buf, size_t new_cap)
{
void **ret;
KASSERT(new_cap > lst->cap, ("veclist expand"));
memcpy(new_buf, lst->buf, lst->size * sizeof(void*));
ret = lst->buf;
lst->buf = new_buf;
lst->cap = new_cap;
return ret;
}
static inline int
veclist_need_exp(struct veclist *lst)
{
return (lst->size == lst->cap);
}
static inline int
veclist_cap(struct veclist *lst)
{
return lst->cap;
}
static inline int
veclist_size(struct veclist *lst)
{
return lst->size;
}
static inline void *
veclist_buf(struct veclist *lst)
{
return lst->buf;
}
static inline void *
veclist_at(struct veclist *lst, size_t idx)
{
KASSERT(lst->size > idx, ("veclist_at index out of bound"));
return lst->buf[idx];
}
#endif

View File

@ -46,6 +46,7 @@ int vnode_fd;
extern char * kevent_to_str(struct kevent *);
struct kevent * kevent_get(int);
struct kevent * kevent_get_timeout(int, int);
struct kevent * kevent_get_timeout_u(int kqfd, int useconds);
void kevent_cmp(struct kevent *, struct kevent *);

View File

@ -117,6 +117,28 @@ kevent_get_timeout(int kqfd, int seconds)
return (kev);
}
/* Retrieve a single kevent, specifying a maximum time to wait for it. */
struct kevent *
kevent_get_timeout_u(int kqfd, int useconds)
{
int nfds;
struct kevent *kev;
struct timespec timeout = {0, useconds * 1000};
if ((kev = calloc(1, sizeof(*kev))) == NULL)
err(1, "out of memory");
nfds = kevent(kqfd, NULL, 0, kev, 1, &timeout);
if (nfds < 0) {
err(1, "kevent(2)");
} else if (nfds == 0) {
free(kev);
kev = NULL;
}
return (kev);
}
char *
kevent_fflags_dump(struct kevent *kev)
{

View File

@ -35,14 +35,15 @@ struct thread_info {
int group_id;
int evcnt;
int tid;
int delay;
};
/*
* Read test
*/
#define THREAD_CNT (16)
#define PACKET_CNT (1600)
#define THREAD_CNT (32)
#define PACKET_CNT (3200)
int g_kqfd;
int g_sockfd[2];
@ -51,9 +52,9 @@ struct thread_info g_thrd_info[THREAD_CNT];
sem_t g_sem_driver;
static void
check_sched(struct thread_info *info, int size)
check_sched(struct thread_info *info, int size, unsigned int max_diff)
{
int max = 0, min = 999999;
int max = 0, min = INT_MAX;
for(int i = 0; i < size; i++) {
int cur = info[i].evcnt;
@ -65,11 +66,8 @@ check_sched(struct thread_info *info, int size)
}
}
if ((max - min) > 1) {
#ifdef TEST_DEBUG
printf("READ_M: check_sched: max difference is %d\n", max - min);
#endif
abort();
if ((max - min) > max_diff) {
err(1, "READ_M: check_sched: max difference is %d\n", max - min);
}
}
@ -124,6 +122,9 @@ test_socket_read_thrd(void* args)
dat = socket_pop(ret->ident);
free(ret);
if(info->delay)
usleep(info->tid * 10);
if (dat == 'e')
break;
@ -142,10 +143,10 @@ test_socket_read_thrd(void* args)
}
static void
test_socket_read(void)
test_socket_read(int delay)
{
int error = 0;
const char *test_id = "[Multi]kevent(EVFILT_READ)";
const char *test_id = delay ? "[Multi][BON]kevent" : "[Multi]kevent(EVFILT_READ)";
test_begin(test_id);
if (socketpair(AF_UNIX, SOCK_STREAM, 0, &g_sockfd[0]) < 0)
@ -171,6 +172,7 @@ test_socket_read(void)
for (int i = 0; i < THREAD_CNT; i++) {
g_thrd_info[i].tid = i;
g_thrd_info[i].evcnt = 0;
g_thrd_info[i].delay = delay;
pthread_create(&g_thrd_info[i].thrd, NULL, test_socket_read_thrd, &g_thrd_info[i]);
}
@ -188,7 +190,8 @@ test_socket_read(void)
/* wait for thread events */
sem_wait(&g_sem_driver);
check_sched(g_thrd_info, THREAD_CNT);
if (!delay)
check_sched(g_thrd_info, THREAD_CNT, 1);
}
@ -426,41 +429,25 @@ test_socket_queue(void)
/***************************
* WS test
***************************/
#define SOCK_WS_CNT (1000)
#define SOCK_WS_CNT (100)
#define WS_TIMEOUT (10)
volatile int ws_good = 0;
static volatile int ws_num = 0;
static void*
test_socket_ws_worker(void* args)
{
struct thread_info *info = (struct thread_info *) args;
char dat;
int ws_num = 0;
struct kevent *ret;
while (1) {
#ifdef TEST_DEBUG
printf("READ_M: thread %d waiting for events\n", info->tid);
#endif
ret = kevent_get(g_kqfd);
#ifdef TEST_DEBUG
printf("READ_M: thread %d woke up\n", info->tid);
#endif
dat = socket_pop(ret->ident);
free(ret);
while (ws_num < SOCK_WS_CNT) {
if (info->ws_master == 0) {
/*if we are the master, wait for slave to signal us*/
while(!ws_good) {
usleep(500);
}
break;
} else {
ws_num++;
if (ws_num == SOCK_WS_CNT - 1) {
ws_good = 1;
break;
ret = kevent_get_timeout_u(g_kqfd, WS_TIMEOUT);
if (ret != NULL) {
dat = socket_pop(ret->ident);
free(ret);
ws_num++;
}
}
}
@ -731,7 +718,7 @@ test_evfilt_read_m()
err(1, "ioctl");
}
test_socket_read();
test_socket_read(0);
test_socket_brutal();
close(g_kqfd);
@ -744,18 +731,7 @@ test_evfilt_read_m()
err(1, "ioctl");
}
test_socket_queue();
test_socket_brutal();
close(g_kqfd);
flags = KQ_SCHED_BEST_OF_N;
g_kqfd = kqueue();
error = ioctl(g_kqfd, FKQMULTI, &flags);
if (error == -1) {
err(1, "ioctl");
}
//test_socket_queue();
test_socket_brutal();
close(g_kqfd);
@ -769,6 +745,18 @@ test_evfilt_read_m()
test_socket_ws();
test_socket_brutal();
close(g_kqfd);
flags = KQ_SCHED_BEST_OF_N;
g_kqfd = kqueue();
error = ioctl(g_kqfd, FKQMULTI, &flags);
if (error == -1) {
err(1, "ioctl");
}
test_socket_brutal();
test_socket_read(1);
close(g_kqfd);
}