Compare commits

...

27 Commits
master ... kq

Author SHA1 Message Date
Oscar Zhao
49b76286b6 support thread migration and refactor kqdom locks 2021-05-21 14:38:08 -04:00
Oscar Zhao
0aa6e4d988 add last_nkev 2020-05-20 04:25:02 -04:00
Oscar Zhao
2e383660dd add kn_proc_count to best of two 2020-05-20 04:11:34 -04:00
Oscar Zhao
3ce2173be6 add cap to workstealing controlled by sysctl and its stat dumps 2020-05-09 20:52:39 -04:00
Oscar Zhao
96330609b7 1. get rid of the usage of next_kn in kqueue_scan. The latter is hell to maintain and lead to countless bugs.
2. slightly reduce lock contention in the knote_drop path.
2020-05-06 03:42:29 -04:00
Oscar Zhao
61067bc214 release processing KN only when kq_scan has >0 max events. Because calling kevent with =0 max events doesn't mean all events are processed. This results in a subtle race of pre-releasing unprocessed knotes. 2020-05-05 04:14:30 -04:00
Charlie Root
d535232e92 fix workstealing > 1 and possible divide by 0 2020-05-05 01:22:57 -04:00
Oscar Zhao
29df71b972 merge with head 2020-04-30 02:15:26 -04:00
Oscar Zhao
cb10d21976 Patch for better scheduling 2020-03-31 02:31:03 -04:00
Oscar Tsalapatis
4a263e37fa adding support for single kqueue dump 2019-12-16 20:39:29 -05:00
Charlie Root
2043d894bb Fix best of 2. The wakeup bug in knote_flux. Fix work stolen knotes not being processed 2019-12-09 18:17:02 -05:00
Oscar Tsalapatis
28ef953b8c Fixed random race and hardcode CPU HZ 2019-09-17 19:34:23 -04:00
Charlie Root
7f31542099 fix infinite loop when rtshare=0; fix a crash in Queue/CPU + best2; Added stat counter for sched and priority 2019-09-15 04:57:34 -04:00
Charlie Root
638e096379 change some sysctl defaults and change WS to insert to the head 2019-09-13 16:49:10 -04:00
Oscar Zhao
98f588928f priority queue. Fixed last_nkev. 2019-09-06 16:52:22 -04:00
Oscar Zhao
d74786ec34 optimized work stealing. IOCTL dump. cache_penalty and ws_int SYSCTL and SYSINIT. 2019-09-03 21:01:51 -04:00
Charlie Root
21aa3325fc add mismatch and fallback counters 2019-08-31 14:54:51 -04:00
Oscar Zhao
caba92060d Sched flag refactoring. Implemented wqCPUX, QUEUEX. RW lock refactoring for KQDOM and KVLST 2019-08-29 20:31:11 -04:00
Oscar Zhao
f085e08d18 random kevq changed to rw lock 2019-08-26 17:54:58 -04:00
Charlie Root
8e73e73197 improve best of 2 (still slow) 2019-08-23 15:30:30 -04:00
Oscar Tsalapatis
b4d0670a1f Move KQ scheduler flags to Non kernel 2019-08-02 16:34:03 -04:00
Oscar Zhao
2fdea6945b remove kq refcnt 2019-07-29 23:49:15 -04:00
Oscar Zhao
cb4c673500 WS + BON 2019-04-19 15:33:07 -04:00
Oscar Zhao
cb22931bab fix race in sleep 2019-04-05 15:23:07 -04:00
Charlie Root
b2e5289a2d kq domain + queue cache 2019-04-01 21:35:09 -04:00
Oscar Zhao
eb525254bc Brutal test 2019-03-14 03:49:26 -04:00
BuildTools
0d1463d912 inital commit 2019-03-05 01:55:45 -05:00
15 changed files with 5339 additions and 1206 deletions

File diff suppressed because it is too large Load Diff

View File

@ -690,7 +690,7 @@ do_fork(struct thread *td, struct fork_req *fr, struct proc *p2, struct thread *
/*
* Tell any interested parties about the new process.
*/
knote_fork(p1->p_klist, p2->p_pid);
knote_fork(p1->p_klist, td, p2->p_pid);
/*
* Now can be swapped.

View File

@ -328,6 +328,12 @@ kern_thr_exit(struct thread *td)
p = td->td_proc;
/*
* Release the event queues
*/
if (td->td_kevq_thred != NULL)
kevq_thred_drain(td->td_kevq_thred, td);
/*
* If all of the threads in a process call this routine to
* exit (e.g. all threads call pthread_exit()), exactly one

View File

@ -56,6 +56,7 @@ __FBSDID("$FreeBSD$");
#include <sys/umtx.h>
#include <sys/vmmeter.h>
#include <sys/cpuset.h>
#include <sys/event.h>
#ifdef HWPMC_HOOKS
#include <sys/pmckern.h>
#endif
@ -67,6 +68,10 @@ __FBSDID("$FreeBSD$");
#include <vm/uma.h>
#include <sys/eventhandler.h>
#ifdef SMP
extern struct cpu_group *cpu_top; /* CPU topology */
#endif
/*
* Asserts below verify the stability of struct thread and struct proc
* layout, as exposed by KBI to modules. On head, the KBI is allowed
@ -82,9 +87,9 @@ _Static_assert(offsetof(struct thread, td_flags) == 0xfc,
"struct thread KBI td_flags");
_Static_assert(offsetof(struct thread, td_pflags) == 0x104,
"struct thread KBI td_pflags");
_Static_assert(offsetof(struct thread, td_frame) == 0x498,
_Static_assert(offsetof(struct thread, td_frame) == 0x498 + 0x8,
"struct thread KBI td_frame");
_Static_assert(offsetof(struct thread, td_emuldata) == 0x6a0,
_Static_assert(offsetof(struct thread, td_emuldata) == 0x6a0 + 0x10,
"struct thread KBI td_emuldata");
_Static_assert(offsetof(struct proc, p_flag) == 0xb0,
"struct proc KBI p_flag");

View File

@ -33,6 +33,8 @@
#include <sys/_types.h>
#include <sys/queue.h>
#include <sys/lock.h>
#include <sys/mutex.h>
#define EVFILT_READ (-1)
#define EVFILT_WRITE (-2)
@ -143,6 +145,8 @@ struct kevent32_freebsd11 {
#define EV_CLEAR 0x0020 /* clear event state after reporting */
#define EV_RECEIPT 0x0040 /* force EV_ERROR on success, data=0 */
#define EV_DISPATCH 0x0080 /* disable event after reporting */
#define EV_AFFINITY 0x0200 /* in multithreaded mode, this event has hard affinity for the registering thread */
#define EV_REALTIME 0x0400 /* this knote has REALTIME priority */
#define EV_SYSFLAGS 0xF000 /* reserved by system */
#define EV_DROP 0x1000 /* note should be dropped */
@ -218,8 +222,12 @@ struct kevent32_freebsd11 {
struct knote;
SLIST_HEAD(klist, knote);
TAILQ_HEAD(ktailq, knote);
struct kqueue;
TAILQ_HEAD(kqlist, kqueue);
struct kevq;
LIST_HEAD(kevqlist, kevq);
struct knlist {
struct klist kl_list;
void (*kl_lock)(void *); /* lock function */
@ -282,8 +290,18 @@ struct knote {
SLIST_ENTRY(knote) kn_link; /* for kq */
SLIST_ENTRY(knote) kn_selnext; /* for struct selinfo */
struct knlist *kn_knlist; /* f_attach populated */
// struct task kn_timer_task; /* timer task for kn */
// int kn_timer_task_queued;
int kn_drop;
TAILQ_ENTRY(knote) kn_tqe;
struct kqueue *kn_kq; /* which queue we are on */
TAILQ_ENTRY(knote) kn_pqe; /* knote for the processing queue */
struct kqueue *kn_kq; /* which kqueue we are on */
struct kevq *kn_kevq; /* the kevq the knote is on, only valid if KN_QUEUED */
struct kevq *kn_proc_kevq; /* the kevq that's processing the knote, only valid if KN_PROCESSING */
/* used by the scheduler */
struct kevq *kn_org_kevq; /* the kevq that registered the knote */
struct kqdom *kn_kqd; /* the kqdomain the knote belongs to */
/* end scheduler */
struct kevent kn_kevent;
void *kn_hook;
int kn_hookid;
@ -292,10 +310,15 @@ struct knote {
#define KN_QUEUED 0x02 /* event is on queue */
#define KN_DISABLED 0x04 /* event is disabled */
#define KN_DETACHED 0x08 /* knote is detached */
#define KN_MARKER 0x20 /* ignore this knote */
#define KN_KQUEUE 0x40 /* this knote belongs to a kq */
#define KN_SCAN 0x100 /* flux set in kqueue_scan() */
#define KN_MARKER 0x10 /* ignore this knote */
#define KN_KQUEUE 0x20 /* this knote belongs to a kq */
#define KN_SCAN 0x40 /* flux set in kqueue_scan() */
#define KN_PROCESSING 0x80 /* the knote on the kevq is undergoing userspace processing */
#define KN_WS 0x100 /* the knote is stolen from another kevq */
int kn_fluxwait;
int kn_influx;
u_long kn_rand_seed;
struct mtx kn_fluxlock;
int kn_sfflags; /* saved filter flags */
int64_t kn_sdata; /* saved data field */
union {
@ -321,6 +344,14 @@ struct kevent_copyops {
size_t kevent_size;
};
struct kevq_thred {
u_long kevq_hashmask; /* hash mask for kevqs */
struct kevqlist *kevq_hash; /* hash table for kevqs */
struct kevqlist kevq_list;
struct mtx lock; /* the lock for the kevq*/
};
struct thread;
struct proc;
struct knlist;
@ -328,7 +359,7 @@ struct mtx;
struct rwlock;
void knote(struct knlist *list, long hint, int lockflags);
void knote_fork(struct knlist *list, int pid);
void knote_fork(struct knlist *list, struct thread *td, int pid);
struct knlist *knlist_alloc(struct mtx *lock);
void knlist_detach(struct knlist *knl);
void knlist_add(struct knlist *knl, struct knote *kn, int islocked);
@ -352,6 +383,7 @@ int kqfd_register(int fd, struct kevent *kev, struct thread *p,
int kqueue_add_filteropts(int filt, struct filterops *filtops);
int kqueue_del_filteropts(int filt);
void kevq_thred_drain(struct kevq_thred *kevq_th, struct thread *td);
#else /* !_KERNEL */
#include <sys/cdefs.h>
@ -366,4 +398,38 @@ __END_DECLS
#endif /* !_KERNEL */
/*
* The ioctl to set multithreaded mode
*/
#define FKQMULTI _IOW('f', 89, int)
#define FKQTUNE _IOW('f', 90, int)
#define FKQMPRNT _IOW('f', 91, uintptr_t)
/*
* KQ sched
*/
#define KQ_SCHED_QUEUE 0x01 /* affnitizes knotes to the current cpu, sarg = extra queues to check */
#define KQ_SCHED_CPU 0x02 /* affinitize knotes to the first cpu, sarg = extra queues to check */
#define KQ_SCHED_BEST 0x04 /* Best of N, sarg = N */
/*
* KQ sched features
*/
#define KQ_SCHED_FEAT_WS 0x01 /* work stealing, farg = # of knotes to steal */
/*
* KQ tunables
*/
#define KQTUNE_FREQ 0x01 /* the target frequency of each call, default 0 meaning unlimited */
#define KQTUNE_RTSHARE 0x02 /* the percent share of runtime events vs batch events, default 100 meaning always hand runtime events first */
/*
* 0 - 7: sched
* 8 - 15: sargs
* 16 - 23: features
* 24 - 31: fargs
*/
#define KQSCHED_MAKE(sched, sargs, feat, fargs) (((sched) & 0xFF) | (((sargs) & 0xFF) << 8) | (((feat) & 0xFF) << 16) | (((fargs) & 0xFF) << 24))
#define KQTUNE_MAKE(obj, val) ((obj & 0xFFFF) | (val & 0xFFFF) << 16)
#endif /* !_SYS_EVENT_H_ */

View File

@ -31,38 +31,136 @@
#ifndef _SYS_EVENTVAR_H_
#define _SYS_EVENTVAR_H_
#include <sys/_stdint.h>
#include <sys/queue.h>
#ifndef _KERNEL
#error "no user-serviceable parts inside"
#endif
#include <sys/_task.h>
#include <sys/veclist.h>
#include <sys/stdint.h>
#include <sys/param.h>
#include <sys/lock.h>
#include <sys/rwlock.h>
#define KQ_NEVENTS 8 /* minimize copy{in,out} calls */
#define KQEXTENT 256 /* linear growth by this amount */
#define KQDIR_ACTIVE (0)
#define KQDIR_INACTIVE (1)
struct kevq {
/* 1st cacheline */
/* Sched stats */
u_long kevq_rand_seed;
uint64_t kevq_avg_lat;
uint64_t kevq_avg_ev;
uint64_t kevq_tot_ev;
uint64_t kevq_tot_time;
/* the following two are only set when the thread is procssing in userspace
* in kernel they are set to special value KEVQ_LAST_KERN
*/
#define KEVQ_LAST_KERN (0)
uint64_t kevq_last_kev;
uint32_t kevq_last_nkev;
#define KEVQ_SLEEP 0x01
#define KEVQ_CLOSING 0x02
#define KEVQ_ACTIVE 0x04
#define KEVQ_WS 0x08 /* the kevq is work stealing */
#define KEVQ_SCAN 0x10 /* the kevq is being scanned */
int kevq_state;
int kn_proc_count; /* number of processing knotes */
int kn_count; /* number of pending knotes */
int kn_rt_count; /* number of runtime knotes */
/* end 1st cache line */
LIST_ENTRY(kevq) kevq_th_e; /* entry into kevq_thred's hashtable */
LIST_ENTRY(kevq) kq_e; /* entry into kq */
LIST_ENTRY(kevq) kevq_th_tqe; /* entry into kevq_thred's kevq_list */
struct kqueue *kq; /* the kq that the kevq belongs to */
struct kqdom *kevq_kqd; /* the kq domain the kevq is on */
/* XXX: Make kevq contain a struct thread ptr instead of this dude */
struct kevq_thred *kevq_th; /* the thread that the kevq belongs to */
struct mtx lock; /* the lock for the kevq */
struct ktailq kn_head; /* list of pending knotes */
struct knote *kn_marker;
struct ktailq kn_rt_head; /* list of pending knotes with runtime priority */
struct knote *kn_marker_rt;
struct ktailq kn_proc_head; /* list of pending knotes being processed */
int kevq_refcnt;
/* TODO: maybe these should be in kqdomain or global */
uint64_t kevq_tot_fallback;
uint64_t kevq_tot_kqd_mismatch;
uint64_t kevq_tot_sched;
uint64_t kevq_tot_realtime;
uint64_t kevq_tot_syscall;
uint64_t kevq_tot_ws;
uint64_t kevq_tot_ws_scan;
uint64_t kevq_avg_rlimit;
};
/* TODO: assumed that threads don't get rescheduled across cores */
struct kqdom {
/* static */
int id;
struct kqdom *parent;
cpuset_t cpu_mask;
struct veclist children; /* child kqdoms */
/* statistics. Atomically updated, doesn't require the lock*/
uint64_t avg_lat;
/* dynamic members*/
struct veclist kqd_activelist; /* active child kqdoms */
struct veclist kqd_kevqs; /* kevqs for this kqdom */
};
struct kqueue {
struct mtx kq_lock;
int kq_refcnt;
TAILQ_ENTRY(kqueue) kq_list;
TAILQ_HEAD(, knote) kq_head; /* list of pending event */
int kq_count; /* number of pending events */
struct selinfo kq_sel;
struct sigio *kq_sigio;
struct filedesc *kq_fdp;
int kq_state;
#define KQ_SEL 0x01
#define KQ_SLEEP 0x02
#define KQ_FLUXWAIT 0x04 /* waiting for a in flux kn */
#define KQ_ASYNC 0x08
#define KQ_ASYNC 0x02
#define KQ_TASKSCHED 0x04 /* task scheduled */
#define KQ_TASKDRAIN 0x08 /* waiting for task to drain */
#define KQ_CLOSING 0x10
#define KQ_TASKSCHED 0x20 /* task scheduled */
#define KQ_TASKDRAIN 0x40 /* waiting for task to drain */
int kq_flags;
#define KQ_FLAG_INIT 0x01 /* kqueue has been initialized. this flag is set after the first kevent structure is processed */
#define KQ_FLAG_MULTI 0x02 /* Multi-threaded mode */
TAILQ_ENTRY(kqueue) kq_list;
struct sigio *kq_sigio;
struct filedesc *kq_fdp;
int kq_knlistsize; /* size of knlist */
struct klist *kq_knlist; /* list of knotes */
u_long kq_knhashmask; /* size of knhash */
struct klist *kq_knhash; /* hash table for knotes */
struct kevq *kq_kevq; /* the kevq for kq, always created, act as buffer queue in multithreaded mode */
struct task kq_task;
struct ucred *kq_cred;
struct kevqlist kq_kevqlist; /* list of kevqs */
/* scheduler flags for the KQ, set by IOCTL */
int kq_sfeat;
int kq_ssargs;
int kq_ssched;
int kq_sfargs;
/* tuneables for the KQ, set by IOCTL */
int kq_tfreq;
int kq_rtshare;
/* statistics */
u_long kq_total_sched_time;
/* Default */
struct rwlock kevq_vlist_lk;
struct veclist kevq_vlist;
/* CPU queue */
struct rwlock kqd_lock;
struct kqdom *kq_kqd; /* root domain */
};
#endif /* !_SYS_EVENTVAR_H_ */

View File

@ -172,6 +172,7 @@ enum sysinit_sub_id {
SI_SUB_SMP = 0xf000000, /* start the APs*/
#endif
SI_SUB_RACCTD = 0xf100000, /* start racctd*/
SI_SUB_KQUEUE = 0xf200000, /* initialize kqueue */
SI_SUB_LAST = 0xfffffff /* final initialization */
};

View File

@ -305,6 +305,7 @@ struct thread {
int td_errno; /* (k) Error from last syscall. */
size_t td_vslock_sz; /* (k) amount of vslock-ed space */
struct kcov_info *td_kcov_info; /* (*) Kernel code coverage data */
struct kevq_thred *td_kevq_thred;
#define td_endzero td_sigmask
/* Copied during fork1() or create_thread(). */

171
sys/sys/veclist.h Normal file
View File

@ -0,0 +1,171 @@
/*-
* SPDX-License-Identifier: BSD-2-Clause-FreeBSD
*
* Copyright (c)2019 Reliable Computer Systems Lab, University of Waterloo
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* $FreeBSD$
*/
/* Vector list - insert/remove: O(n)
* - random access: O(1)
* - insert/remove tail: O(1)
*/
#ifndef _SYS_VECLIST_H_
#define _SYS_VECLIST_H_
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/types.h>
#include <sys/malloc.h>
#include <sys/errno.h>
struct veclist {
size_t cap;
size_t size;
struct malloc_type *mtype;
void **buf;
};
#define VECLIST_EXPAND_FACTOR (2)
#define VECLIST_INIT_SZ (8)
/* returns old buffer */
static inline int
veclist_expand(struct veclist *lst, size_t new_cap)
{
void **new_buf;
KASSERT(new_cap > lst->cap, ("veclist expand"));
new_buf = (void **)malloc(new_cap * sizeof(void*), lst->mtype, M_NOWAIT);
if (new_buf == NULL) {
return ENOMEM;
}
memcpy(new_buf, lst->buf, lst->size * sizeof(void*));
free(lst->buf, lst->mtype);
lst->buf = new_buf;
lst->cap = new_cap;
return 0;
}
static inline int
veclist_init(struct veclist *lst, size_t init_cap, struct malloc_type *mtype)
{
lst->cap = 0;
lst->buf = NULL;
lst->size = 0;
lst->mtype = mtype;
return init_cap ? veclist_expand(lst, init_cap) : 0;
}
static inline void *
veclist_remove_at(struct veclist *lst, size_t idx)
{
void *ret;
KASSERT(lst->size > idx, ("veclist_remove_at index out of bound"));
ret = lst->buf[idx];
memmove(&lst->buf[idx], &lst->buf[idx+1], (lst->size - (idx + 1)) * sizeof(void*));
lst->size--;
return ret;
}
static inline void
veclist_destroy(struct veclist *lst)
{
free(lst->buf, lst->mtype);
}
static inline void *
veclist_remove(struct veclist *lst, void *ele)
{
int found;
for(found = 0; found < lst->size; found++) {
if(lst->buf[found] == ele) {
break;
}
}
return veclist_remove_at(lst, found);
}
/* inserts an element so that the index of the element after insertion is idx */
static inline int
veclist_insert_at(struct veclist *lst, void *ele, size_t idx)
{
int err;
KASSERT(idx <= lst->size, ("veclist idx overflow"));
if (lst->size == lst->cap) {
/* needs expansion */
err = veclist_expand(lst, lst->cap == 0 ? VECLIST_INIT_SZ : lst->cap * VECLIST_EXPAND_FACTOR);
if (err) {
return err;
}
}
memmove(&lst->buf[idx+1], &lst->buf[idx], (lst->size - idx) * sizeof(void*));
lst->size++;
lst->buf[idx] = ele;
return 0;
}
static inline int
veclist_insert_tail(struct veclist *lst, void *ele)
{
return veclist_insert_at(lst, ele, lst->size);
}
static inline int
veclist_insert_head(struct veclist *lst, void *ele)
{
return veclist_insert_at(lst, ele, 0);
}
static inline void *
veclist_remove_head(struct veclist *lst)
{
return veclist_remove_at(lst, 0);
}
static inline void *
veclist_remove_tail(struct veclist *lst)
{
return veclist_remove_at(lst, lst->size - 1);
}
static inline int
veclist_size(struct veclist *lst)
{
return lst->size;
}
static inline void *
veclist_at(struct veclist *lst, size_t idx)
{
KASSERT(lst->size > idx, ("veclist_at index out of bound"));
return lst->buf[idx];
}
#endif

View File

@ -15,6 +15,10 @@ SRCS.kqtest= \
vnode.c \
proc.c \
signal.c \
read_m.c \
user.c
WARNS?= 2
LDADD+= -lthr
.include <bsd.test.mk>

View File

@ -0,0 +1,151 @@
#include "common.h"
#include "common_m.h"
#include <sys/event.h>
#include <sys/ioctl.h>
#include <semaphore.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/sysctl.h>
#include <pthread_np.h>
#include <unistd.h>
#include <stdlib.h>
#define SOCK_CLOSE_PROB (30)
#define CLOSER_RAND_DELAY (17)
#define QUEUE_RAND_DELAY (7)
#define CLOSER_TOT_SOCK (8)
#define PACKET_CNT (CLOSER_TOT_SOCK * 1024)
#define THREAD_CNT (8)
static int close_socks[CLOSER_TOT_SOCK][2];
static volatile int close_stop;
static int g_kqfd;
static void *
socket_closer(void* args)
{
struct kevent kev;
while(!close_stop) {
int ran = rand() % CLOSER_TOT_SOCK;
printf("closed idx %d...\n", ran);
close(close_socks[ran][0]);
close(close_socks[ran][1]);
/* events are supposed to clean up themselves after fd invalidates */
if (socketpair(AF_UNIX, SOCK_STREAM, 0, &close_socks[ran][0]) < 0) {
err(1, "kevent_socket");
}
EV_SET(&kev, close_socks[ran][0], EVFILT_READ, EV_ADD, 0, 0, &close_socks[ran][0]);
if (kevent(g_kqfd, &kev, 1, NULL, 0, NULL) == -1) {
err(1, "kevent_brutal_add");
}
usleep(rand() % CLOSER_RAND_DELAY);
}
return NULL;
}
static void *
socket_worker(void* args)
{
char dat;
struct kevent *ret;
while (1) {
ret = kevent_get(g_kqfd);
printf("processing packet...\n");
dat = socket_pop_igerr(ret->ident);
if (dat == 'e')
break;
free(ret);
}
return NULL;
}
void
test_socket_close(char* name)
{
pthread_t workers[THREAD_CNT];
pthread_t closer;
char id[256];
struct kevent kev;
const char *test_id = "[Multi]kevent(close) - ";
strcpy(id, test_id);
strcat(id, name);
test_begin(id);
close_stop = 0;
srand(time(NULL));
int flags = KQSCHED_MAKE(KQ_SCHED_CPU, 2, 0, 0);
g_kqfd = kqueue();
int error = ioctl(g_kqfd, FKQMULTI, &flags);
if (error == -1) {
err(1, "ioctl");
}
for (int i = 0; i < CLOSER_TOT_SOCK; i++) {
if (socketpair(AF_UNIX, SOCK_STREAM, 0, &close_socks[i][0]) < 0) {
err(1, "kevent_socket");
}
EV_SET(&kev, close_socks[i][0], EVFILT_READ, EV_ADD, 0, 0, &close_socks[i][0]);
if (kevent(g_kqfd, &kev, 1, NULL, 0, NULL) == -1) {
err(1, "kevent_brutal_add");
}
}
for (int i = 0; i < THREAD_CNT; i++) {
pthread_create(&workers[i], NULL, socket_worker, NULL);
}
pthread_create(&closer, NULL, socket_closer, NULL);
for(int i = 0; i < PACKET_CNT; i++) {
socket_push_igerr(close_socks[rand() % CLOSER_TOT_SOCK][1], '.');
usleep(rand() % QUEUE_RAND_DELAY);
}
printf("Stopping closer...\n");
close_stop = 1;
pthread_join(closer, NULL);
printf("Closer stopped!\n");
for (int i = 0; i < THREAD_CNT; i++) {
socket_push(close_socks[rand() % CLOSER_TOT_SOCK][1], 'e');
}
for (int i = 0; i < THREAD_CNT; i++) {
pthread_join(workers[i], NULL);
}
for (int i = 0; i < CLOSER_TOT_SOCK; i++) {
close(close_socks[i][0]);
close(close_socks[i][1]);
}
printf("Threads stopped!\n");
success();
}

View File

@ -46,6 +46,8 @@ extern int kqfd;
char * kevent_to_str(struct kevent *);
struct kevent * kevent_get(int);
struct kevent * kevent_get_timeout(int, int);
struct kevent * kevent_get_timeout_u(int kqfd, uint64_t useconds);
int kevent_get_n(int kqfd, struct kevent *kev, int n);
void kevent_cmp(struct kevent *, struct kevent *);

View File

@ -0,0 +1,39 @@
#ifndef _COMMON_M_H
#define _COMMON_M_H
#include "common.h"
static inline char
socket_pop(int sockfd)
{
char buf;
if (read(sockfd, &buf, 1) < 1)
err(1, "read(2)");
return buf;
}
static inline char
socket_pop_igerr(int sockfd)
{
char buf = 0;
read(sockfd, &buf, 1);
return buf;
}
static inline void
socket_push(int sockfd, char ch)
{
if (write(sockfd, &ch, 1) < 1) {
err(1, "write(2)");
}
}
static inline void
socket_push_igerr(int sockfd, char ch)
{
write(sockfd, &ch, 1);
}
#endif

View File

@ -22,6 +22,17 @@
#include "common.h"
int kqfd;
extern void test_evfilt_read();
extern void test_evfilt_signal();
extern void test_evfilt_vnode();
extern void test_evfilt_timer();
extern void test_evfilt_read_m();
extern void test_evfilt_proc();
#if HAVE_EVFILT_USER
extern void test_evfilt_user();
#endif
static char *cur_test_id = NULL;
static int testnum = 1;
@ -68,6 +79,19 @@ test_no_kevents_quietly(void)
}
}
/* Retrieve n kevents */
int
kevent_get_n(int kqfd, struct kevent *kev, int n)
{
int nfds;
nfds = kevent(kqfd, NULL, 0, kev, n, NULL);
if (nfds < 1)
err(1, "kevent(2)");
return nfds;
}
/* Retrieve a single kevent */
struct kevent *
kevent_get(int fd)
@ -107,6 +131,29 @@ kevent_get_timeout(int fd, int seconds)
return (kev);
}
/* Retrieve a single kevent, specifying a maximum time to wait for it. */
struct kevent *
kevent_get_timeout_u(int kqfd, uint64_t useconds)
{
int nfds;
struct kevent *kev;
uint64_t nsec = useconds * 1000;
struct timespec timeout = {nsec / 1000000000, nsec % 1000000000};
printf("timeout: %ld sec, %ld nsec\n", timeout.tv_sec, timeout.tv_nsec);
if ((kev = calloc(1, sizeof(*kev))) == NULL)
err(1, "out of memory");
nfds = kevent(kqfd, NULL, 0, kev, 1, &timeout);
if (nfds < 0) {
err(1, "kevent(2)");
} else if (nfds == 0) {
free(kev);
kev = NULL;
}
return (kev);
}
static char *
kevent_fflags_dump(struct kevent *kev)
{
@ -313,6 +360,7 @@ main(int argc, char **argv)
int test_signal = 1;
int test_vnode = 1;
int test_timer = 1;
int test_socket_m = 1;
#ifdef __FreeBSD__
int test_user = 1;
#else
@ -333,6 +381,8 @@ main(int argc, char **argv)
test_vnode = 0;
if (strcmp(argv[0], "--no-user") == 0)
test_user = 0;
if (strcmp(argv[0], "--no-socket_m") == 0)
test_socket_m = 0;
argv++;
argc--;
}
@ -351,6 +401,8 @@ main(int argc, char **argv)
if (test_socket)
test_evfilt_read();
if (test_socket_m)
test_evfilt_read_m();
if (test_signal)
test_evfilt_signal();
if (test_vnode)

View File

@ -0,0 +1,985 @@
/*
* Copyright (c) 2009 Mark Heily <mark@heily.com>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*
* $FreeBSD$
*/
#include "common.h"
#include "common_m.h"
#include <sys/event.h>
#include <sys/ioctl.h>
#include <semaphore.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/sysctl.h>
#include <pthread_np.h>
#include <unistd.h>
#include <stdlib.h>
//#define TEST_DEBUG
struct thread_info {
pthread_t thrd;
int can_crash;
int ws_master;
pthread_mutex_t lock;
int group_id;
int evcnt;
int tid;
int delay;
};
/*
* Read test
*/
#define THREAD_CNT (8)
#define PACKET_CNT (1600)
static int g_kqfd;
static int g_sockfd[2];
static struct thread_info g_thrd_info[THREAD_CNT];
/* Test threads signals this upon receiving events */
static sem_t g_sem_driver;
static char dmpbuf[1024 * 1024 + 1];
static inline void
dump_gkq()
{
int error;
uintptr_t para = (uintptr_t)dmpbuf;
/* dump KQ */
memset(dmpbuf, 0, 1024 * 1024 + 1);
error = ioctl(g_kqfd, FKQMPRNT, &para);
if (error == -1) {
err(1, "dump ioctl failed");
} else {
printf("%s\n", dmpbuf);
}
}
/***************************
* Read test
***************************/
static void*
test_socket_read_thrd(void* args)
{
struct thread_info *info = (struct thread_info *) args;
char dat;
struct kevent *ret;
while (1) {
#ifdef TEST_DEBUG
printf("READ_M: thread %d waiting for events\n", info->tid);
#endif
ret = kevent_get(g_kqfd);
#ifdef TEST_DEBUG
printf("READ_M: thread %d woke up\n", info->tid);
#endif
dat = socket_pop(ret->ident);
free(ret);
if(info->delay)
usleep(info->tid * 10);
if (dat == 'e')
break;
info->evcnt++;
/* signal the driver */
sem_post(&g_sem_driver);
}
#ifdef TEST_DEBUG
printf("READ_M: thread %d exiting\n", info->tid);
#endif
sem_post(&g_sem_driver);
pthread_exit(0);
}
static void
test_socket_read(int delay)
{
int error = 0;
const char *test_id = delay ? "[Multi][BON]kevent" : "[Multi]kevent(EVFILT_READ)";
test_begin(test_id);
if (socketpair(AF_UNIX, SOCK_STREAM, 0, &g_sockfd[0]) < 0)
err(1, "kevent_read socket");
struct kevent kev;
EV_SET(&kev, g_sockfd[0], EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, &g_sockfd[0]);
sem_init(&g_sem_driver, 0, 0);
error = kevent(g_kqfd, &kev, 1, NULL, 0, NULL);
if (error == -1) {
#ifdef TEST_DEBUG
printf("READ_M: kevent add failed with %d\n", errno);
#endif
err(1, "kevent_add");
}
#ifdef TEST_DEBUG
printf("READ_M: creating %d threads...\n", THREAD_CNT);
#endif
for (int i = 0; i < THREAD_CNT; i++) {
g_thrd_info[i].tid = i;
g_thrd_info[i].evcnt = 0;
g_thrd_info[i].delay = delay;
pthread_create(&g_thrd_info[i].thrd, NULL, test_socket_read_thrd, &g_thrd_info[i]);
}
#ifdef TEST_DEBUG
printf("READ_M: waiting for threads to wait on KQ...\n");
#endif
sleep(3);
for(int i = 0; i < PACKET_CNT; i++) {
#ifdef TEST_DEBUG
printf("READ_M: processing packet %d\n", i);
#endif
socket_push(g_sockfd[1], '.');
/* wait for thread events */
sem_wait(&g_sem_driver);
}
#ifdef TEST_DEBUG
printf("READ_M: finished testing, system shutting down...\n");
#endif
for(int i = 0; i < THREAD_CNT; i++) {
socket_push(g_sockfd[1], 'e');
sem_wait(&g_sem_driver);
}
for (int i = 0; i < THREAD_CNT; i++) {
pthread_join(g_thrd_info[i].thrd, NULL);
}
#ifdef TEST_DEBUG
printf("READ_M: clearing kevent...\n");
#endif
EV_SET(&kev, g_sockfd[0], EVFILT_READ, EV_DELETE, 0, 0, &g_sockfd[0]);
error = kevent(g_kqfd, &kev, 1, NULL, 0, NULL);
if (error == -1) {
#ifdef TEST_DEBUG
printf("READ_M: kevent delete failed with %d\n", errno);
#endif
err(1, "kevent_delete");
}
#ifdef TEST_DEBUG
printf("READ_M: closing sockets...\n");
#endif
close(g_sockfd[0]);
close(g_sockfd[1]);
success();
}
/***************************
* Queue test
***************************/
#define THREAD_QUEUE_CNT (4)
#define PACKET_QUEUE_CNT (1000)
static int
get_ncpu()
{
int mib[4];
int numcpu;
size_t len = sizeof(numcpu);
mib[0] = CTL_HW;
mib[1] = HW_NCPU;
sysctl(mib, 2, &numcpu, &len, NULL, 0);
if (numcpu < 1)
{
err(1, "< 1 cpu detected");
}
return numcpu;
}
static void*
test_socket_queue_thrd(void* args)
{
struct thread_info *info = (struct thread_info *) args;
char dat;
struct kevent *ret;
while (1) {
#ifdef TEST_DEBUG
printf("READ_M: thread %d waiting for events\n", info->tid);
#endif
ret = kevent_get(g_kqfd);
#ifdef TEST_DEBUG
printf("READ_M: thread %d woke up\n", info->tid);
#endif
dat = socket_pop(ret->ident);
free(ret);
if (dat == 'e')
break;
info->evcnt++;
/* signal the driver */
sem_post(&g_sem_driver);
}
#ifdef TEST_DEBUG
printf("READ_M: thread %d exiting\n", info->tid);
#endif
sem_post(&g_sem_driver);
pthread_exit(0);
}
static void
test_socket_queue(void)
{
int error = 0;
const char *test_id = "[Multi][Queue]kevent(EVFILT_READ)";
test_begin(test_id);
if (socketpair(AF_UNIX, SOCK_STREAM, 0, &g_sockfd[0]) < 0)
err(1, "kevent_read socket");
struct kevent kev;
EV_SET(&kev, g_sockfd[0], EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, &g_sockfd[0]);
sem_init(&g_sem_driver, 0, 0);
error = kevent(g_kqfd, &kev, 1, NULL, 0, NULL);
if (error == -1) {
#ifdef TEST_DEBUG
printf("READ_M: kevent add failed with %d\n", errno);
#endif
err(1, "kevent_add");
}
cpuset_t cpuset;
int ncpu = get_ncpu();
int tid = 0;
#ifdef TEST_DEBUG
printf("READ_M: detected %d cores...\n", ncpu);
#endif
struct thread_info **group = malloc(sizeof(struct thread_info*) * ncpu);
for (int i = 0; i < ncpu; i++) {
group[i] = malloc(sizeof(struct thread_info) * THREAD_QUEUE_CNT);
for (int j = 0; j < THREAD_QUEUE_CNT; j++) {
group[i][j].tid = tid;
tid++;
group[i][j].evcnt = 0;
group[i][j].group_id = i;
pthread_attr_t attr;
pthread_attr_init(&attr);
CPU_ZERO(&cpuset);
CPU_SET(i, &cpuset);
if (pthread_attr_setaffinity_np(&attr, sizeof(cpuset_t), &cpuset) < 0) {
err(1, "thread_affinity");
}
pthread_create(&group[i][j].thrd, &attr, test_socket_queue_thrd, &group[i][j]);
#ifdef TEST_DEBUG
printf("READ_M: created and affinitized thread %d to core group %d\n", group[i][j].tid, i);
#endif
}
}
#ifdef TEST_DEBUG
printf("READ_M: waiting for threads to wait on KQ...\n");
#endif
sleep(3);
int affinity_group = -1;
for(int k = 1; k <= PACKET_QUEUE_CNT; k++) {
#ifdef TEST_DEBUG
printf("READ_M: processing packet %d\n", k);
#endif
socket_push(g_sockfd[1], '.');
/* wait for thread events */
sem_wait(&g_sem_driver);
/* basically only one group should get events, do this for now, ideally we can have a table that remembers each knote's affinity*/
for(int i = 0; i < ncpu; i++) {
int sum = 0;
for (int j = 0; j < THREAD_QUEUE_CNT; j++) {
sum += group[i][j].evcnt;
}
if (sum != 0 && affinity_group == -1) {
affinity_group = i;
}
#ifdef TEST_DEBUG
printf("READ_M: group %d sum %d, affinity group: %d\n", i, sum, affinity_group);
#endif
if (i == affinity_group) {
if (sum != k) {
err(1, "affinity group sum != 1");
}
} else {
if (sum != 0) {
err(1, "non-affinity group sum != 0");
}
}
}
}
dump_gkq();
#ifdef TEST_DEBUG
printf("READ_M: finished testing, system shutting down...\n");
#endif
for(int i = 0; i < THREAD_QUEUE_CNT * ncpu; i++) {
socket_push(g_sockfd[1], 'e');
sem_wait(&g_sem_driver);
}
for (int i = 0; i < ncpu; i++) {
for (int j = 0; j < THREAD_QUEUE_CNT; j++) {
pthread_join(group[i][j].thrd, NULL);
}
free(group[i]);
}
free(group);
#ifdef TEST_DEBUG
printf("READ_M: clearing kevent...\n");
#endif
EV_SET(&kev, g_sockfd[0], EVFILT_READ, EV_DELETE, 0, 0, &g_sockfd[0]);
error = kevent(g_kqfd, &kev, 1, NULL, 0, NULL);
if (error == -1) {
#ifdef TEST_DEBUG
printf("READ_M: kevent delete failed with %d\n", errno);
#endif
err(1, "kevent_delete");
}
#ifdef TEST_DEBUG
printf("READ_M: closing sockets...\n");
#endif
close(g_sockfd[0]);
close(g_sockfd[1]);
success();
}
/***************************
* WS test
***************************/
#define SOCK_WS_CNT (127)
#define SOCK_WS_TOT (SOCK_WS_CNT * 50)
static volatile int ws_num = 0;
static volatile int ws_ok = 0;
static void*
test_socket_ws_worker(void* args)
{
struct thread_info *info = (struct thread_info *) args;
char dat;
struct kevent *ret;
while (ws_num < SOCK_WS_TOT) {
ret = kevent_get(g_kqfd);
if (info->ws_master == 0) {
if (ret != NULL) {
free(ret);
}
break;
}
if (ret != NULL) {
dat = socket_pop(ret->ident);
#ifdef TEST_DEBUG
printf("READ_M: thread %d wokeup for event: ws_num: %d\n", info->tid, ws_num);
#endif
free(ret);
ws_num++;
}
}
/* the master does nothing */
while(!ws_ok) {
};
#ifdef TEST_DEBUG
printf("READ_M: thread %d exiting\n", info->tid);
#endif
pthread_exit(0);
}
int ws_sockfd[SOCK_WS_CNT + 1][2];
static void
test_socket_ws()
{
struct kevent kev;
struct thread_info thrd_info[2];
const char *test_id = "[Multi][WS]kevent(evfilt)";
test_begin(test_id);
for (int i = 0; i < SOCK_WS_CNT + 1; i++) {
/* Create a connected pair of full-duplex sockets for testing socket events */
if (socketpair(AF_UNIX, SOCK_STREAM, 0, &ws_sockfd[i][0]) < 0) {
err(1, "kevent_socket");
}
EV_SET(&kev, ws_sockfd[i][0], EVFILT_READ, EV_ADD, 0, 0, &ws_sockfd[i][0]);
if (kevent(g_kqfd, &kev, 1, NULL, 0, NULL) == -1) {
err(1, "kevent_ws_add");
}
}
srand(time(NULL));
#ifdef TEST_DEBUG
printf("READ_M: creating master thread...\n");
#endif
for (int i = 0; i < 1; i++) {
thrd_info[i].tid = i;
thrd_info[i].ws_master = i;
pthread_create(&thrd_info[i].thrd, NULL, test_socket_ws_worker, &thrd_info[i]);
}
sleep(1);
/* push 1 packet to the last socket*/
socket_push(ws_sockfd[SOCK_WS_CNT][1], '.');
sleep(1);
for(int i = 1; i < 2; i++) {
#ifdef TEST_DEBUG
printf("READ_M: creating slave thread...\n");
#endif
thrd_info[i].tid = i;
thrd_info[i].ws_master = i;
pthread_create(&thrd_info[i].thrd, NULL, test_socket_ws_worker, &thrd_info[i]);
}
sleep(1);
for(int i = 0; i < SOCK_WS_TOT; i++) {
socket_push(ws_sockfd[i % SOCK_WS_CNT][1], '.');
}
while(ws_num < SOCK_WS_TOT) {
};
dump_gkq();
ws_ok = 1;
/* shutdown the systems */
#ifdef TEST_DEBUG
printf("READ_M: waiting for threads to exit...\n");
#endif
for (int i = 0; i < 2; i++) {
pthread_join(thrd_info[i].thrd, NULL);
}
for (int i = 0; i < SOCK_WS_CNT + 1; i++) {
EV_SET(&kev, ws_sockfd[i][0], EVFILT_READ, EV_DELETE, 0, 0, &ws_sockfd[i][0]);
if (kevent(g_kqfd, &kev, 1, NULL, 0, NULL) == -1) {
err(1, "kevent_ws_delete");
}
}
success();
}
static uint64_t get_utime()
{
struct timespec spec;
clock_gettime(CLOCK_REALTIME_PRECISE, &spec);
return spec.tv_nsec / 1000 + spec.tv_sec * 1000 * 1000;
}
#define TIMEOUT_THRESHOLD (1)
static void
test_socket_ws_check_timeout(uint64_t utimeout)
{
struct kevent *kev;
uint64_t start = get_utime();
kev = kevent_get_timeout_u(g_kqfd, utimeout);
uint64_t end = get_utime();
int pct = (end - start) * 100 / utimeout;
if (kev != NULL) {
err(1, "ws timeout kev != NULL");
}
if (pct > TIMEOUT_THRESHOLD) {
err(1, "ws timeout error too large: %d", pct);
}
}
static void
test_socket_ws_timeout()
{
struct kevent kev, *ret;
const char *test_id = "[Multi][WS]kevent_timeout(evfilt)";
test_begin(test_id);
int flags = KQSCHED_MAKE(0,0,KQ_SCHED_FEAT_WS,1);
int tkqfd = kqueue();
int error = ioctl(tkqfd, FKQMULTI, &flags);
if (error == -1) {
err(1, "ioctl");
}
if (socketpair(AF_UNIX, SOCK_STREAM, 0, &g_sockfd[0]) < 0)
err(1, "kevent_read socket");
EV_SET(&kev, g_sockfd[1], EVFILT_READ, EV_ADD, 0, 0, NULL);
if (kevent(tkqfd, &kev, 1, NULL, 0, NULL) == -1) {
err(1, "kevent_ws_timeout_add");
}
/* 1s */
printf("1s. \n");
ret = kevent_get_timeout_u(tkqfd, 1000 * 1000);
/* 100 ms */
printf("100ms. \n");
ret = kevent_get_timeout_u(tkqfd, 1000 * 100);
/* 10 ms */
printf("10ms. \n");
ret = kevent_get_timeout_u(tkqfd, 1000 * 10);
/* 1 ms */
printf("1ms. \n");
ret = kevent_get_timeout_u(tkqfd, 1000);
/* 100 us */
printf("100u. \n");
ret = kevent_get_timeout_u(tkqfd, 100);
/* 10 us */
printf("10us. \n");
ret = kevent_get_timeout_u(tkqfd, 10);
/* 1 us */
printf("1us. \n");
ret = kevent_get_timeout_u(tkqfd, 1);
EV_SET(&kev, g_sockfd[1], EVFILT_READ, EV_DELETE, 0, 0, NULL);
if (kevent(tkqfd, &kev, 1, NULL, 0, NULL) == -1) {
err(1, "kevent_ws_timeout_delete");
}
close(g_sockfd[0]);
close(g_sockfd[1]);
close(tkqfd);
success();
}
/***************************
* Brutal test
***************************/
#define THREAD_BRUTE_CNT (8)
#define SOCK_BRUTE_CNT (256)
#define PACKET_BRUTE_CNT (256 * (SOCK_BRUTE_CNT))
#define THREAD_EXIT_PROB (50)
#define BRUTE_REALTIME_PROB (50)
#define BRUTE_MAX_FREQ (10000)
#define BRUTE_MIN_FREQ (1)
#define RAND_SLEEP (13)
#define RAND_SEND_SLEEP (7)
static int brute_sockfd[SOCK_BRUTE_CNT][2];
static struct thread_info brute_threadinfo[THREAD_BRUTE_CNT];
static void*
test_socket_brutal_worker(void* args)
{
struct thread_info *info = (struct thread_info *) args;
char dat;
struct kevent *ret;
while (1) {
#ifdef TEST_DEBUG
printf("READ_M: thread %d waiting for events\n", info->tid);
#endif
ret = kevent_get(g_kqfd);
#ifdef TEST_DEBUG
printf("READ_M: thread %d woke up\n", info->tid);
#endif
if ((rand() % 100) < THREAD_EXIT_PROB) {
#ifdef TEST_DEBUG
printf("READ_M: thread %d checking fake crash\n", info->tid);
#endif
pthread_mutex_lock(&info->lock);
#ifdef TEST_DEBUG
printf("READ_M: thread %d trying to fake crash. Can crash: %d\n", info->tid, info->can_crash);
#endif
if (info->can_crash) {
pthread_create(&info->thrd, NULL, test_socket_brutal_worker, info);
pthread_mutex_unlock(&info->lock);
free(ret);
pthread_exit(0);
}
pthread_mutex_unlock(&info->lock);
}
#ifdef TEST_DEBUG
printf("READ_M: thread %d ident: %ld\n", info->tid, ret->ident);
#endif
dat = socket_pop(ret->ident);
if (dat == 'e')
break;
free(ret);
info->evcnt++;
usleep(rand() % RAND_SLEEP);
}
#ifdef TEST_DEBUG
printf("READ_M: thread %d exiting...\n", info->tid);
#endif
pthread_exit(0);
return NULL;
}
static void
test_socket_brutal(char* name)
{
char id[256];
struct kevent kev;
const char *test_id = "[Multi]kevent(brutal) - ";
strcpy(id, test_id);
strcat(id, name);
test_begin(id);
srand(time(NULL));
for (int i = 0; i < SOCK_BRUTE_CNT; i++) {
/* Create a connected pair of full-duplex sockets for testing socket events */
if (socketpair(AF_UNIX, SOCK_STREAM, 0, &brute_sockfd[i][0]) < 0) {
err(1, "kevent_socket");
}
int evflag = (rand() % 100 < BRUTE_REALTIME_PROB) ? EV_REALTIME : 0;
EV_SET(&kev, brute_sockfd[i][0], EVFILT_READ, EV_ADD | evflag, 0, 0, &brute_sockfd[i][0]);
if (kevent(g_kqfd, &kev, 1, NULL, 0, NULL) == -1) {
err(1, "kevent_brutal_add");
}
}
#ifdef TEST_DEBUG
printf("READ_M: creating %d threads...\n", THREAD_BRUTE_CNT);
#endif
for (int i = 0; i < THREAD_BRUTE_CNT; i++) {
brute_threadinfo[i].tid = i;
brute_threadinfo[i].evcnt = 0;
brute_threadinfo[i].can_crash = ((i % 10) != 0);
pthread_mutex_init(&brute_threadinfo[i].lock, NULL);
pthread_create(&brute_threadinfo[i].thrd, NULL, test_socket_brutal_worker, &brute_threadinfo[i]);
}
for(int i = 0; i < PACKET_BRUTE_CNT; i++) {
#ifdef TEST_DEBUG
printf("READ_M: processing packet %d\n", i);
#endif
socket_push(brute_sockfd[rand() % SOCK_BRUTE_CNT][1], '.');
usleep(rand() % RAND_SEND_SLEEP);
}
while (1) {
int sum = 0;
for (int i = 0; i < THREAD_BRUTE_CNT; i++) {
sum += brute_threadinfo[i].evcnt;
}
if (sum == PACKET_BRUTE_CNT) {
break;
}
#ifdef TEST_DEBUG
printf("READ_M: waiting for all packets to finish processing. Cur: %d Tgt: %d\n", sum, PACKET_BRUTE_CNT);
#endif
/* randomize the freq and share */
int error;
int val;
val = KQTUNE_MAKE(KQTUNE_RTSHARE, (rand() % 100) + 1);
error = ioctl(g_kqfd, FKQTUNE, &val);
if (error == -1) {
err(1, "ioctl TUNE");
}
val = KQTUNE_MAKE(KQTUNE_FREQ, rand() % (BRUTE_MAX_FREQ - BRUTE_MIN_FREQ) + BRUTE_MIN_FREQ);
error = ioctl(g_kqfd, FKQTUNE, &val);
if (error == -1) {
err(1, "ioctl TUNE");
}
usleep(1000);
}
/* shutdown the systems */
#ifdef TEST_DEBUG
printf("READ_M: finished testing, system shutting down...\n");
#endif
for (int i = 0; i < THREAD_BRUTE_CNT; i++) {
pthread_mutex_lock(&brute_threadinfo[i].lock);
brute_threadinfo[i].can_crash = 0;
pthread_mutex_unlock(&brute_threadinfo[i].lock);
}
for(int i = 0; i < THREAD_BRUTE_CNT; i++) {
socket_push(brute_sockfd[rand() % SOCK_BRUTE_CNT][1], 'e');
}
for (int i = 0; i < THREAD_BRUTE_CNT; i++) {
pthread_join(brute_threadinfo[i].thrd, NULL);
pthread_mutex_destroy(&brute_threadinfo[i].lock);
}
for (int i = 0; i < SOCK_BRUTE_CNT; i++) {
EV_SET(&kev, brute_sockfd[i][0], EVFILT_READ, EV_DELETE, 0, 0, &brute_sockfd[i][0]);
if (kevent(g_kqfd, &kev, 1, NULL, 0, NULL) == -1) {
err(1, "kevent_brutal_delete");
}
}
success();
}
/* realtime test */
static void
test_socket_check_rt(int kqfd, int kev_sz, int rtcnt)
{
struct kevent *kev = malloc(sizeof(struct kevent) * kev_sz);
int nev = kevent_get_n(kqfd, kev, kev_sz);
if (nev != kev_sz) {
err(1, "too few events: expected %d, recvd %d", kev_sz, nev);
}
for (int i = 0; i < rtcnt; i++) {
if (!(kev[i].flags & EV_REALTIME)) {
err(1, "expected realtime");
}
}
for (int i = rtcnt; i < kev_sz; i++) {
if (kev[i].flags & EV_REALTIME) {
err(1, "expected !realtime");
}
}
free(kev);
}
static void
test_socket_rt_share(int kqfd, int kev_sz, int share)
{
if (share < 0 || share > 100) {
err(1, "INVAL");
}
int flag = KQTUNE_MAKE(KQTUNE_RTSHARE, share);
int error = ioctl(kqfd, FKQTUNE, &flag);
if (error == -1) {
err(1, "ioctl KQTUNE");
}
test_socket_check_rt(kqfd, kev_sz, (kev_sz * share + 99) / 100);
}
static void
test_socket_realtime()
{
/* create 8 sockets, 4 realtime 4 normal
* we are gonna test how kq hands requests back to us for different shares
*/
test_begin("kevent(realtime)");
int kqfd = kqueue();
struct kevent kev;
int socks[8][2];
for (int i = 0; i < 8; i++) {
if (socketpair(AF_UNIX, SOCK_STREAM, 0, &socks[i][0]) < 0) {
err(1, "kevent_socket");
}
EV_SET(&kev, socks[i][0], EVFILT_READ, EV_ADD | (i >= 4 ? EV_REALTIME : 0), 0, 0, NULL);
if (kevent(kqfd, &kev, 1, NULL, 0, NULL) == -1) {
err(1, "kevent_brutal_add");
}
}
/* push packets to the socket */
for (int i = 0; i < 8; i++) {
socket_push(socks[i][1], '.');
}
for (int i = 1; i <= 100; i++) {
test_socket_rt_share(kqfd, 4, i);
}
for (int i = 0; i < 8; i++) {
close(socks[i][0]);
close(socks[i][1]);
}
close(kqfd);
success();
}
extern void
test_socket_close(char* name);
void
test_evfilt_read_m()
{
int flags = 0;
int error;
/* close test */
//test_socket_close("default");
/* Default rand */
flags = 0;
g_kqfd = kqueue();
error = ioctl(g_kqfd, FKQMULTI, &flags);
if (error == -1) {
err(1, "ioctl");
}
test_socket_read(0);
test_socket_realtime();
test_socket_brutal("rand");
close(g_kqfd);
/* BO2 */
flags = KQSCHED_MAKE(KQ_SCHED_BEST,2,0,0);
g_kqfd = kqueue();
error = ioctl(g_kqfd, FKQMULTI, &flags);
if (error == -1) {
err(1, "ioctl");
}
test_socket_read(1);
test_socket_brutal("best2");
close(g_kqfd);
/* Queue + bo0 */
flags = KQSCHED_MAKE(KQ_SCHED_QUEUE,0,0,0);
g_kqfd = kqueue();
error = ioctl(g_kqfd, FKQMULTI, &flags);
if (error == -1) {
err(1, "ioctl");
}
//test_socket_queue();
test_socket_brutal("queue0");
close(g_kqfd);
/* Queue + bo2*/
flags = KQSCHED_MAKE(KQ_SCHED_QUEUE,2, 0, 0);
g_kqfd = kqueue();
error = ioctl(g_kqfd, FKQMULTI, &flags);
if (error == -1) {
err(1, "ioctl");
}
//test_socket_queue();
test_socket_brutal("queue2");
close(g_kqfd);
/* Queue + bo2 + ws */
flags = KQSCHED_MAKE(KQ_SCHED_QUEUE,2, KQ_SCHED_FEAT_WS, 1);
g_kqfd = kqueue();
error = ioctl(g_kqfd, FKQMULTI, &flags);
if (error == -1) {
err(1, "ioctl");
}
//test_socket_queue();
test_socket_brutal("queue2_ws");
close(g_kqfd);
/* CPU + Bo0 */
flags = KQSCHED_MAKE(KQ_SCHED_CPU,0,0,0);;
g_kqfd = kqueue();
error = ioctl(g_kqfd, FKQMULTI, &flags);
if (error == -1) {
err(1, "ioctl");
}
test_socket_brutal("cpu0");
close(g_kqfd);
/* CPU + Bo2 */
flags = KQSCHED_MAKE(KQ_SCHED_CPU,2,0,0);
g_kqfd = kqueue();
error = ioctl(g_kqfd, FKQMULTI, &flags);
if (error == -1) {
err(1, "ioctl");
}
test_socket_brutal("cpu2");
close(g_kqfd);
/* WS */
flags = KQSCHED_MAKE(0,0,KQ_SCHED_FEAT_WS,1);
g_kqfd = kqueue();
error = ioctl(g_kqfd, FKQMULTI, &flags);
if (error == -1) {
err(1, "ioctl");
}
test_socket_ws();
test_socket_ws_timeout();
test_socket_brutal("ws1");
close(g_kqfd);
}