Brutal test

This commit is contained in:
Oscar Zhao 2019-03-14 03:49:26 -04:00
parent 0d1463d912
commit eb525254bc
5 changed files with 368 additions and 252 deletions

View File

@ -71,6 +71,7 @@ __FBSDID("$FreeBSD$");
#include <sys/taskqueue.h>
#include <sys/uio.h>
#include <sys/user.h>
#include <sys/ktr.h>
#ifdef KTRACE
#include <sys/ktrace.h>
#endif
@ -232,7 +233,7 @@ static unsigned int kq_calloutmax = 4 * 1024;
SYSCTL_UINT(_kern, OID_AUTO, kq_calloutmax, CTLFLAG_RW,
&kq_calloutmax, 0, "Maximum number of callouts allocated for kqueue");
#define KQ_DEBUG
#define KTR_KQ (KTR_SPARE5)
#define KQ_LOCK(kq) do { \
mtx_lock(&(kq)->kq_lock); \
@ -328,9 +329,7 @@ knote_enter_flux_ul(struct knote *kn)
static void
knote_enter_flux(struct knote *kn)
{
#ifdef KQ_DEBUG
printf("KQUEUE: knote_enter_flux: %p\n", kn);
#endif
CTR2(KTR_KQ, "knote_enter_flux: %p flux: %d", kn, kn->kn_influx);
KN_FLUX_OWNED(kn);
MPASS(kn->kn_influx < INT_MAX);
kn->kn_influx++;
@ -351,9 +350,7 @@ knote_leave_flux_ul(struct knote *kn)
static bool
knote_leave_flux(struct knote *kn)
{
#ifdef KQ_DEBUG
printf("KQUEUE: knote_leave_flux: %p\n", kn);
#endif
CTR2(KTR_KQ, "knote_leave_flux: %p flux: %d", kn, kn->kn_influx);
KN_FLUX_OWNED(kn);
MPASS(kn->kn_influx > 0);
kn->kn_influx--;
@ -444,9 +441,7 @@ filt_fileattach(struct knote *kn)
static int
kqueue_kqfilter(struct file *fp, struct knote *kn)
{
#ifdef KQ_DEBUG
printf("KQUEUE: kqueue_kqfilter called for kn %p\n", kn);
#endif
CTR1(KTR_KQ, "kqueue_kqfilter called for kn %p", kn);
struct kqueue *kq = kn->kn_fp->f_data;
@ -473,18 +468,23 @@ static int
filt_kqueue(struct knote *kn, long hint)
{
struct kqueue *kq = kn->kn_fp->f_data;
struct kevq *kevq;
#ifdef KQ_DEBUG
printf("KQUEUE: filt_kqueue called for kn %p\n", kn);
#endif
CTR1(KTR_KQ, "filt_kqueue called for kn %p", kn);
if ( (kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) {
return 0;
}
kn->kn_data = kq->kq_kevq->kn_count;
kevq = TAILQ_FIRST(&kq->kq_kevqlist);
if (kevq == NULL) {
return 0;
} else {
kn->kn_data = kevq->kn_count;
return (kn->kn_data > 0);
}
}
/* XXX - move to kern_proc.c? */
static int
@ -571,9 +571,7 @@ filt_proc(struct knote *kn, long hint)
struct proc *p;
u_int event;
#ifdef KQ_DEBUG
printf("KQUEUE: filt_proc called for kn %p\n", kn);
#endif
CTR1(KTR_KQ, "KQUEUE: filt_proc called for kn %p", kn);
p = kn->kn_ptr.p_proc;
if (p == NULL) /* already activated, from attach filter */
@ -635,12 +633,6 @@ knote_fork(struct knlist *list, struct thread *td, int pid)
continue;
}
/*
* The NOTE_TRACK case. In addition to the activation
* of the event, we need to register new events to
* track the child. Drop the locks in preparation for
* the call to kqueue_register().
*/
knote_enter_flux(kn);
KN_FLUX_UNLOCK(kn);
@ -648,24 +640,21 @@ knote_fork(struct knlist *list, struct thread *td, int pid)
* The same as knote(), activate the event.
*/
if ((kn->kn_sfflags & NOTE_TRACK) == 0) {
error = kn->kn_fop->f_event(kn, NOTE_FORK);
KQ_UNLOCK(kq);
if(error)
if(kn->kn_fop->f_event(kn, NOTE_FORK))
knote_activate(kn);
KN_LEAVE_FLUX_WAKEUP(kn);
KQ_UNLOCK(kq);
continue;
}
KQ_UNLOCK(kq);
/*
* The NOTE_TRACK case. In addition to the activation
* of the event, we need to register new events to
* track the child. Drop the locks in preparation for
* the call to kqueue_register().
*/
KQ_UNLOCK(kq);
list->kl_unlock(list->kl_lockarg);
/*
@ -967,9 +956,7 @@ filt_timertouch(struct knote *kn, struct kevent *kev, u_long type)
/*
 * Timer filter check: returns non-zero when kn->kn_data is non-zero.
 * The hint argument is not consulted.  Before returning, the knote pointer
 * is logged through printf (only when KQ_DEBUG is defined) and through a
 * KTR_KQ trace record.
 */
static int
filt_timer(struct knote *kn, long hint)
{
#ifdef KQ_DEBUG
printf("KQUEUE: filt_timer called for kn %p\n", kn);
#endif
CTR1(KTR_KQ, "filt_timer called for kn %p", kn);
return (kn->kn_data != 0);
}
@ -1000,9 +987,7 @@ filt_userdetach(__unused struct knote *kn)
/*
 * User filter check: returns the knote's kn_hookid value unchanged.
 * The hint argument is unused.  The knote pointer is logged through printf
 * (only when KQ_DEBUG is defined) and through a KTR_KQ trace record.
 */
static int
filt_user(struct knote *kn, __unused long hint)
{
#ifdef KQ_DEBUG
printf("KQUEUE: filt_user called for kn %p\n", kn);
#endif
CTR1(KTR_KQ, "KQUEUE: filt_user called for kn %p", kn);
return (kn->kn_hookid);
}
@ -1420,7 +1405,7 @@ kern_kevent_anonymous(struct thread *td, int nevents,
kqueue_init(&kq);
kevq_init(&kevq);
kq.kq_kevq = &kevq;
TAILQ_INSERT_HEAD(&kq.kq_kevqlist, &kevq, kq_e);
kevq.kq = &kq;
kq.kq_refcnt = 1;
kevq.kevq_refcnt = 1;
@ -1438,9 +1423,7 @@ kqueue_add_filteropts(int filt, struct filterops *filtops)
error = 0;
if (filt > 0 || filt + EVFILT_SYSCOUNT < 0) {
printf(
"trying to add a filterop that is out of range: %d is beyond %d\n",
~filt, EVFILT_SYSCOUNT);
CTR2(KTR_KQ, "trying to add a filterop that is out of range: %d is beyond %d", ~filt, EVFILT_SYSCOUNT);
return EINVAL;
}
mtx_lock(&filterops_lock);
@ -1529,9 +1512,8 @@ kqueue_register(struct kqueue *kq, struct kevq *kevq, struct kevent *kev, struct
struct knlist *knl;
int error, filt, event;
int haskqglobal, filedesc_unlock;
#ifdef KQ_DEBUG
printf("KQUEUE: kqueue_register: kq %p, kevq %p, ident: %d, filter: %d, flags: 0x%X\n", kq, kevq, (int)kev->ident, kev->filter, kev->flags);
#endif
CTR5(KTR_KQ, "kqueue_register: kq %p, kevq %p, ident: %d, filter: %d, flags: 0x%X", kq, kevq, (int)kev->ident, kev->filter, kev->flags);
if ((kev->flags & (EV_ENABLE | EV_DISABLE)) == (EV_ENABLE | EV_DISABLE))
return (EINVAL);
@ -1751,8 +1733,8 @@ kqueue_register(struct kqueue *kq, struct kevq *kevq, struct kevent *kev, struct
kn->kn_status |= KN_DISABLED;
/*
* The user may changkhe initial EV_ADD,
* but doing so will kas already been
* The user may change some filter values after the initial EV_ADD,
* but doing so will not reset any filter which has already been
* triggered.
*/
kn->kn_status |= KN_SCAN;
@ -1785,7 +1767,11 @@ kqueue_register(struct kqueue *kq, struct kevq *kevq, struct kevent *kev, struct
event = 0;
KQ_LOCK(kq);
if (event)
kn->kn_status |= KN_ACTIVE;
if ((kn->kn_status & (KN_ACTIVE | KN_DISABLED | KN_QUEUED)) ==
KN_ACTIVE)
knote_activate(kn);
kn->kn_status &= ~KN_SCAN;
@ -1822,14 +1808,26 @@ kevq_thred_destroy(struct kevq_thred *kevq_th) {
void
kevq_thred_drain(struct kevq_thred *kevq_th) {
/* [Solved] Race here on kevq. Consider:
* Thread 1: Grabs KEVQ ptr for thread 2 from kevq_list in knote_sched
* Thread 2: crashes and calls kevq_drain, destroys kevq
* Thread 1: KEVQ_ACQUIRE((already freed))
* Maybe require holding the KQ lock when deleting to make sure nothing queries
* the kevq_list during deletion
* this is solved by holding the KQ lock while scheduling
* thread 1 either accesses kevq_list (which requires KQ lock) before or after thread 2's kevq is deleted from the KQ
* if before, then thread 2's kevq cannot be freed because kevq_drain deletes from kevq_list then frees kevq
* if after, then thread 2's kevq cannot be found in kevq_list.
*/
struct kevq *kevq;
int error;
error = 0;
while((kevq = TAILQ_FIRST(&kevq_th->kevq_tq)) != NULL) {
// multithreaded mode, acquire kevq ref cnt
error = kevq_acquire(kevq);
if (!error) {
CTR1(KTR_KQ, "kevq_thred_drain: draining kevq %p\n", kevq);
kevq_drain(kevq);
}
}
@ -1846,13 +1844,11 @@ kevq_init(struct kevq *kevq) {
static void
kevq_release(struct kevq* kevq, int locked)
{
#ifdef KQ_DEBUG
printf("KQUEUE: Releasing kevq %p (refcnt = %d)\n", kevq, kevq->kevq_refcnt);
#endif
if (locked)
KEVQ_OWNED(kevq);
else
KEVQ_LOCK(kevq);
CTR2(KTR_KQ, "releasing kevq %p (refcnt = %d)", kevq, kevq->kevq_refcnt);
kevq->kevq_refcnt--;
if (kevq->kevq_refcnt == 1)
wakeup(&kevq->kevq_refcnt);
@ -1863,13 +1859,11 @@ kevq_release(struct kevq* kevq, int locked)
static int
kevq_acquire(struct kevq *kevq)
{
#ifdef KQ_DEBUG
printf("KQUEUE: Referencing kevq %p (refcnt = %d)\n", kevq, kevq->kevq_refcnt);
#endif
KEVQ_NOTOWNED(kevq);
int error;
error = 0;
KEVQ_LOCK(kevq);
CTR2(KTR_KQ, "referencing kevq %p (refcnt = %d)", kevq, kevq->kevq_refcnt);
if ((kevq->kevq_state & KEVQ_CLOSING) == KEVQ_CLOSING) {
error = EINVAL;
} else {
@ -1909,9 +1903,7 @@ kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp)
thread_lock(td);
if (td->td_kevq_thred == NULL) {
td->td_kevq_thred = kevq_th;
#ifdef KQ_DEBUG
printf("KQUEUE: kevq_acquire_kq(M): allocated kevq_th %p for thread %d\n", kevq_th, td->td_tid);
#endif
CTR2(KTR_KQ, "kevq_acquire_kq(M): allocated kevq_th %p for thread %d", kevq_th, td->td_tid);
} else {
to_free = kevq_th;
kevq_th = td->td_kevq_thred;
@ -1941,13 +1933,12 @@ kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp)
alloc_kevq->kq = kq;
alloc_kevq->kevq_th = kevq_th;
#ifdef KQ_DEBUG
printf("KQUEUE: kevq_acquire_kq(M): allocated kevq %p for thread %d\n", alloc_kevq, td->td_tid);
#endif
CTR2(KTR_KQ, "kevq_acquire_kq(M): allocated kevq %p for thread %d", alloc_kevq, td->td_tid);
KEVQ_TH_LOCK(kevq_th);
kevq = kevqlist_find(kevq_list, kq);
/* TODO: probably don't need to re-check unless a thread can asynchronously call
* kevent (signal handler?) */
if (kevq == NULL) {
kevq = alloc_kevq;
// insert kevq to the kevq_th hash table
@ -1972,27 +1963,28 @@ kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp)
KASSERT(kevq != NULL, ("kevq isn't allocated."));
} else {
if (kq->kq_kevq == NULL) {
kevq = malloc(sizeof(struct kevq), M_KQUEUE, M_WAITOK | M_ZERO);
#ifdef KQ_DEBUG
printf("KQUEUE: kevq_acquire_kq(S): allocated kevq %p for kq %p\n", kevq, kq);
#endif
kevq_init(kevq);
kevq->kq = kq;
// prob don't need this lock depending on whether TAILQ_FIRST is atomic
KQ_LOCK(kq);
kevq = TAILQ_FIRST(&kq->kq_kevqlist);
KQ_UNLOCK(kq);
if (kevq == NULL) {
alloc_kevq = malloc(sizeof(struct kevq), M_KQUEUE, M_WAITOK | M_ZERO);
CTR2(KTR_KQ, "kevq_acquire_kq(S): allocated kevq %p for kq %p", alloc_kevq, kq);
kevq_init(alloc_kevq);
alloc_kevq->kq = kq;
KQ_LOCK(kq);
if (kq->kq_kevq == NULL) {
kq->kq_kevq = kevq;
if ((kevq = TAILQ_FIRST(&kq->kq_kevqlist)) == NULL) {
TAILQ_INSERT_HEAD(&kq->kq_kevqlist, alloc_kevq, kq_e);
kevq = alloc_kevq;
} else {
to_free = kevq;
to_free = alloc_kevq;
}
KQ_UNLOCK(kq);
if (to_free != NULL) {
free(to_free, M_KQUEUE);
}
} else {
kevq = kq->kq_kevq;
}
}
@ -2016,10 +2008,10 @@ kqueue_acquire(struct file *fp, struct kqueue **kqp)
*kqp = kq;
KQ_LOCK(kq);
if (((kq->kq_state) & KQ_CLOSING) != 0) {
return (EBADF);
}
if ((kq->kq_state & KQ_FLAG_INIT) == 0) {
/* Mark the kqueue as initialized
TODO: Why not make some locks separate field so we
don't have short critical sections like this */
kq->kq_state |= KQ_FLAG_INIT;
}
kq->kq_refcnt++;
@ -2220,6 +2212,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
} else
asbt = 0;
marker = knote_alloc(M_WAITOK);
CTR2(KTR_KQ, "kqueue_scan: td %d allocated marker %p", td->td_tid, marker);
knote_xinit(marker);
marker->kn_status = KN_MARKER;
KEVQ_LOCK(kevq);
@ -2231,22 +2224,16 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
retry:
kevp = keva;
#ifdef KQ_DEBUG
printf("KQUEUE: kqueue_scan: kevq %p has %d events\n", kevq, kevq->kn_count);
#endif
CTR3(KTR_KQ, "kqueue_scan: td %d on kevq %p has %d events", td->td_tid, kevq, kevq->kn_count);
if (kevq->kn_count == 0) {
if (asbt == -1) {
error = EWOULDBLOCK;
} else {
kevq->kevq_state |= KEVQ_SLEEP;
#ifdef KQ_DEBUG
printf("KQUEUE: kqueue_scan: thread %d waiting on kevq %p for events\n", td->td_tid, kevq);
#endif
CTR2(KTR_KQ, "kqueue_scan: td %d waiting on kevq %p for events", td->td_tid, kevq);
error = msleep_sbt(kevq, &kevq->lock, PSOCK | PCATCH,
"kqread", asbt, rsbt, C_ABSOLUTE);
#ifdef KQ_DEBUG
printf("KQUEUE: kqueue_scan: thread %d wokeup on kevq %p for events\n", td->td_tid, kevq);
#endif
CTR2(KTR_KQ, "kqueue_scan: td %d wokeup on kevq %p for events", td->td_tid, kevq);
}
if (error == 0)
goto retry;
@ -2275,16 +2262,17 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
}
kn->kn_fluxwait = 1;
KN_FLUX_UNLOCK(kn);
CTR3(KTR_KQ, "kqueue_scan: td %d fluxwait on kn %p marker %p", td->td_tid, kn, marker);
error = msleep(kn, &kevq->lock, PSOCK,
"kqflxwt", 0);
"kevqflxwt3", 0);
CTR3(KTR_KQ, "kqueue_scan: td %d fluxwait WAKEUP kn %p marker %p", td->td_tid, kn, marker);
continue;
}
/* Now we have exclusive access to kn */
TAILQ_REMOVE(&kevq->kn_head, kn, kn_tqe);
#ifdef KQ_DEBUG
printf("KQUEUE: kqueue_scan: thread %d on kevq %p dequeued knote %p\n", td->td_tid, kevq, kn);
#endif
CTR3(KTR_KQ, "kqueue_scan: td %d on kevq %p dequeued knote %p", td->td_tid, kevq, kn);
if ((kn->kn_status & KN_DISABLED) == KN_DISABLED) {
kn->kn_status &= ~KN_QUEUED;
kevq->kn_count--;
@ -2295,6 +2283,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
/* We are dequeuing our marker, wakeup threads waiting on it */
knote_flux_wakeup(kn);
KN_FLUX_UNLOCK(kn);
CTR2(KTR_KQ, "kqueue_scan: td %d MARKER WAKEUP %p", td->td_tid, kn);
if (count == maxevents) {
goto retry;
}
@ -2350,9 +2339,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
kevq->kn_count--;
kn_list_unlock(knl);
influx = 1;
#ifdef KQ_DEBUG
printf("KQUEUE: kqueue_scan: kn %p not valid anymore for kevq %p\n", kn, kevq);
#endif
CTR3(KTR_KQ, "kqueue_scan: kn %p not valid anymore for kevq %p, td %d", kn, kevq, td->td_tid);
continue;
}
touch = (!kn->kn_fop->f_isfd && kn->kn_fop->f_touch != NULL);
@ -2376,9 +2363,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE);
kevq->kn_count--;
} else {
#ifdef KQ_DEBUG
printf("KQUEUE: kqueue_scan: requeued kn %p to kevq %p\n", kn, kevq);
#endif
CTR2(KTR_KQ, "kqueue_scan: requeued kn %p to kevq %p", kn, kevq);
TAILQ_INSERT_TAIL(&kevq->kn_head, kn, kn_tqe);
}
@ -2394,10 +2379,9 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
count--;
if (nkev == KQ_NEVENTS) {
KEVQ_UNLOCK(kevq);
influx = 0;
knote_flux_wakeup_ul(kn);
KEVQ_UNLOCK(kevq);
error = k_ops->k_copyout(k_ops->arg, keva, nkev);
nkev = 0;
@ -2410,12 +2394,17 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
TAILQ_REMOVE(&kevq->kn_head, marker, kn_tqe);
done:
KEVQ_OWNED(kevq);
KEVQ_UNLOCK(kevq);
if (kn != NULL) {
knote_flux_wakeup_ul(kn);
}
if (marker != NULL) {
knote_flux_wakeup_ul(marker);
}
KEVQ_UNLOCK(kevq);
CTR2(KTR_KQ, "kqueue_scan: knote_free marker %p td %d", marker, td->td_tid);
knote_free(marker);
done_nl:
KEVQ_NOTOWNED(kevq);
@ -2473,17 +2462,13 @@ kqueue_ioctl(struct file *fp, u_long cmd, void *data,
int error = 0;
kq = fp->f_data;
#ifdef KQ_DEBUG
printf("KQUEUE: ioctl: received: kq %p cmd: 0x%lx", kq, cmd);
#endif
CTR2(KTR_KQ, "kqueue_ioctl: received: kq %p cmd: 0x%lx", kq, cmd);
switch (cmd) {
case FKQMULTI:
if ((kq->kq_state & KQ_FLAG_INIT) == KQ_FLAG_INIT) {
error = (EINVAL);
} else {
#ifdef KQ_DEBUG
printf("KQUEUE: ioctl: multi flag set for kq %p", kq);
#endif
CTR1(KTR_KQ, "kqueue_ioctl: multi flag set for kq %p", kq);
KQ_LOCK(kq);
kq->kq_state |= (KQ_FLAG_INIT | KQ_FLAG_MULTI);
KQ_UNLOCK(kq);
@ -2509,27 +2494,24 @@ kqueue_poll(struct file *fp, int events, struct ucred *active_cred,
if ((error = kqueue_acquire_both(fp, td, &kq, &kevq)))
return POLLERR;
KQ_LOCK(kq);
if ((kq->kq_state & KQ_FLAG_MULTI) != KQ_FLAG_MULTI ) {
// TODO: support select? really need it in multithreaded mode?
KQ_LOCK(kq);
if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) {
revents = 0;
} else {
if (events & (POLLIN | POLLRDNORM)) {
if (kq->kq_kevq->kn_count) {
revents |= events & (POLLIN | POLLRDNORM);
} else {
selrecord(td, &kq->kq_sel);
if (SEL_WAITING(&kq->kq_sel))
kq->kq_state |= KQ_SEL;
}
revents = 0;
} else {
if (events & (POLLIN | POLLRDNORM)) {
if (kevq->kn_count) {
revents |= events & (POLLIN | POLLRDNORM);
} else {
selrecord(td, &kq->kq_sel);
if (SEL_WAITING(&kq->kq_sel))
kq->kq_state |= KQ_SEL;
}
}
kqueue_release(kq, 1);
KQ_UNLOCK(kq);
kevq_release(kevq, 0);
}
kqueue_release(kq, 1);
KQ_UNLOCK(kq);
kevq_release(kevq, 0);
return (revents);
}
@ -2554,9 +2536,7 @@ kqueue_stat(struct file *fp, struct stat *st, struct ucred *active_cred,
/*
 * Release the memory of a kevq: logs the pointer (printf under KQ_DEBUG and
 * a KTR_KQ trace record) and frees it with the M_KQUEUE malloc type.
 * No locking or list manipulation happens here.
 * NOTE(review): presumably the caller has already unlinked the kevq and
 * dropped all other references -- confirm against kevq_drain().
 */
static void
kevq_destroy(struct kevq *kevq)
{
#ifdef KQ_DEBUG
printf("KQUEUE: kevq_destroy for %p \n", kevq);
#endif
CTR1(KTR_KQ, "kevq_destroy for %p", kevq);
free(kevq, M_KQUEUE);
}
@ -2569,9 +2549,7 @@ kevq_drain(struct kevq *kevq)
struct kqueue *kq;
struct knote *kn;
struct kevqlist *kevq_list;
#ifdef KQ_DEBUG
printf("KQUEUE: kevq_drain for %p (refcnt = %d) with %d knotes\n", kevq, kevq->kevq_refcnt, kevq->kn_count);
#endif
CTR3(KTR_KQ, "kevq_drain for %p (refcnt = %d) with %d knotes", kevq, kevq->kevq_refcnt, kevq->kn_count);
kq = kevq->kq;
KQ_NOTOWNED(kq);
@ -2594,19 +2572,21 @@ kevq_drain(struct kevq *kevq)
KASSERT(kevq->kevq_refcnt == 1, ("other refs of kevq are out there!"));
/* drain all knotes on the kevq */
TAILQ_FOREACH(kn, &kevq->kn_head, kn_tqe) {
while ((kn = TAILQ_FIRST(&kevq->kn_head)) != NULL) {
retry:
KEVQ_OWNED(kevq);
KN_FLUX_LOCK(kn);
/* Wait for kn to stabilize */
if (kn_in_flux(kn)) {
kn->kn_fluxwait = 1;
CTR2(KTR_KQ, "kevq_drain %p fluxwait knote %p", kevq, kn);
KN_FLUX_UNLOCK(kn);
msleep(kn, &kevq->lock, PSOCK, "kevqclose2", 0);
goto retry;
}
CTR2(KTR_KQ, "kevq_drain %p draining knote %p", kevq, kn);
KN_FLUX_OWNED(kn);
KASSERT(!kn_in_flux(kn), ("knote is still influx"));
@ -2614,7 +2594,7 @@ kevq_drain(struct kevq *kevq)
KN_FLUX_UNLOCK(kn);
knote_dequeue(kn);
if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI && (kq->kq_state & KQ_CLOSING) != KQ_CLOSING) {
if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI && (kq->kq_state & KQ_CLOSING) != KQ_CLOSING && (kn->kn_status & KN_MARKER) == 0) {
KEVQ_UNLOCK(kevq);
knote_activate_ul(kn);
KEVQ_LOCK(kevq);
@ -2627,6 +2607,9 @@ kevq_drain(struct kevq *kevq)
KEVQ_UNLOCK(kevq);
KQ_LOCK(kq);
if (kq->kq_ckevq == kevq) {
kq->kq_ckevq = TAILQ_NEXT(kevq, kq_e);
}
TAILQ_REMOVE(&kq->kq_kevqlist, kevq, kq_e);
KQ_UNLOCK(kq);
@ -2653,9 +2636,7 @@ kqueue_drain(struct kqueue *kq, struct kevq *kevq, struct thread *td)
int i;
int error;
#ifdef KQ_DEBUG
printf("KQUEUE: kqueue_drain on %p. args kevq %p\n", kq, kevq);
#endif
CTR2(KTR_KQ, "kqueue_drain on %p. args kevq %p", kq, kevq);
KQ_LOCK(kq);
@ -2800,9 +2781,7 @@ kqueue_close(struct file *fp, struct thread *td)
crfree(kq->kq_cred);
free(kq, M_KQUEUE);
fp->f_data = NULL;
#ifdef KQ_DEBUG
printf("KQUEUE: kqueue_closed for %p.\n", kq);
#endif
CTR1(KTR_KQ, "kqueue_close: %p.", kq);
return (0);
}
@ -2820,8 +2799,8 @@ kevq_wakeup(struct kevq* kevq)
KEVQ_OWNED(kevq);
if ((kevq->kevq_state & KEVQ_SLEEP) == KEVQ_SLEEP) {
kevq->kevq_state &= ~KEVQ_SLEEP;
wakeup(kevq);
}
wakeup(kevq);
}
static void
@ -2871,9 +2850,7 @@ knote(struct knlist *list, long hint, int lockflags)
* or other threads could remove events.
*/
SLIST_FOREACH_SAFE(kn, &list->kl_list, kn_selnext, tkn) {
#ifdef KQ_DEBUG
printf("KNOTE: knote() scanning kn %p\n", kn);
#endif
CTR1(KTR_KQ, "knote() scanning kn %p", kn);
KN_FLUX_LOCK(kn);
if (kn_in_flux(kn) && (kn->kn_status & KN_SCAN) == 0) {
/*
@ -2923,10 +2900,8 @@ static void
knote_flux_wakeup(struct knote *kn)
{
KN_FLUX_OWNED(kn);
if ((kn)->kn_fluxwait) {
(kn)->kn_fluxwait = 0;
wakeup((kn));
}
CTR1(KTR_KQ, "waking up kn %p", kn);
wakeup(kn);
}
static void
@ -2948,9 +2923,7 @@ knote_activate(struct knote *kn)
struct kqueue *kq;
kq = kn->kn_kq;
#ifdef KQ_DEBUG
printf("KQUEUE: knote_activate: kn %p\n", kn);
#endif
CTR1(KTR_KQ, "knote_activate: kn %p", kn);
KQ_OWNED(kq);
KN_FLUX_NOTOWNED(kn);
KASSERT(kn_in_flux(kn), ("knote %p not in flux", kn));
@ -2970,9 +2943,7 @@ knote_activate(struct knote *kn)
void
knlist_add(struct knlist *knl, struct knote *kn, int islocked)
{
#ifdef KQ_DEBUG
printf("KNLIST: knlist_add kn %p\n", kn);
#endif
CTR1(KTR_KQ, "knlist_add kn %p", kn);
KNL_ASSERT_LOCK(knl, islocked);
KQ_NOTOWNED(kn->kn_kq);
KASSERT(kn_in_flux(kn), ("knote %p not in flux", kn));
@ -3270,13 +3241,12 @@ knote_fdclose(struct thread *td, int fd)
knote_flux_wakeup(kn);
kn->kn_fluxwait = 1;
KN_FLUX_UNLOCK(kn);
msleep(kn, &kq->kq_lock, PSOCK, "kqflxwt", 0);
msleep(kn, &kq->kq_lock, PSOCK, "kqflxwt4", 0);
goto again;
}
knote_enter_flux(kn);
KN_FLUX_UNLOCK(kn);
KQ_UNLOCK(kq);
/* WTF ? influx should be 0? */
influx = 1;
knote_drop(kn, td);
KQ_LOCK(kq);
@ -3344,14 +3314,15 @@ knote_drop_detached(struct knote *kn, struct thread *td)
if (!SLIST_EMPTY(list))
SLIST_REMOVE(list, kn, knote, kn_link);
KQ_UNLOCK(kq);
if (kn->kn_status & KN_QUEUED) {
KEVQ_LOCK(kevq);
knote_dequeue(kn);
KEVQ_UNLOCK(kevq);
}
KN_LEAVE_FLUX_WAKEUP(kn);
KQ_UNLOCK(kq);
if (kn->kn_fop->f_isfd) {
fdrop(kn->kn_fp, td);
kn->kn_fp = NULL;
@ -3373,13 +3344,11 @@ knote_sched(struct knote *kn)
KASSERT(kn_in_flux(kn), ("kn not in flux"));
// reschedule knotes to available threads
if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) {
if ((kn->kn_flags & EV_AFFINITY) == EV_AFFINITY) {
#ifdef KQ_DEBUG
printf("KQUEUE: knote_sched(M) affinity set: kn %p \n", kn);
#endif
CTR1(KTR_KQ, "knote_sched: affinity set: kn %p", kn);
if ((kn->kn_org_kevq->kevq_state & KEVQ_RDY) != 0) {
next_kevq = kn->kn_org_kevq;
KEVQ_LOCK(next_kevq);
} else {
next_kevq = NULL;
}
@ -3389,9 +3358,7 @@ knote_sched(struct knote *kn)
if (next_kevq == NULL) {
next_kevq = TAILQ_FIRST(&kq->kq_kevqlist);
if (next_kevq == NULL) {
#ifdef KQ_DEBUG
printf("KQUEUE: knote_sched(M) no kevqs exist for queueing kn %p, discarding... \n", kn);
#endif
CTR1(KTR_KQ, "knote_sched: no kevqs exist for queueing kn %p, discarding...", kn);
break;
}
} else {
@ -3409,28 +3376,20 @@ knote_sched(struct knote *kn)
break;
}
KEVQ_UNLOCK(next_kevq);
if (next_kevq == kq->kq_ckevq) {
// if the previous "if" didn't break
// we have traversed the list once and the current kevq is closing
// we have no queue to queue the knote
#ifdef KQ_DEBUG
printf("KQUEUE: knote_sched(M) no open kevqs for queueing kn %p, discarding... \n", kn);
#endif
CTR1(KTR_KQ, "knote_sched: no open kevqs for queueing kn %p, discarding...", kn);
next_kevq = NULL;
break;
}
}
}
} else {
#ifdef KQ_DEBUG
printf("KQUEUE: knote_sched(S): kn %p to kevq %p\n", kn, kq->kq_kevq);
#endif
next_kevq = kq->kq_kevq;
}
#ifdef KQ_DEBUG
printf("KQUEUE: knote_sched(M) next kevq %p for kn %p \n", next_kevq, kn);
#endif
CTR2(KTR_KQ, "knote_sched: next kevq %p for kn %p", next_kevq, kn);
if (next_kevq != NULL) {
KEVQ_OWNED(next_kevq);
@ -3445,9 +3404,7 @@ knote_enqueue(struct knote *kn, struct kevq *kevq)
struct kqueue *kq;
kq = kn->kn_kq;
#ifdef KQ_DEBUG
printf("KQUEUE: knote_enqueue: kn %p to kevq %p\n", kn, kevq);
#endif
CTR2(KTR_KQ, "knote_enqueue: kn %p to kevq %p", kn, kevq);
KEVQ_OWNED(kevq);
@ -3476,9 +3433,7 @@ knote_dequeue(struct knote *kn)
KEVQ_OWNED(kevq);
#ifdef KQ_DEBUG
printf("KQUEUE: knote_dequeue: kn %p from kevq %p\n", kn, kevq);
#endif
CTR2(KTR_KQ, "knote_dequeue: kn %p from kevq %p", kn, kevq);
KASSERT(kn->kn_status & KN_QUEUED, ("knote not queued"));
TAILQ_REMOVE(&kevq->kn_head, kn, kn_tqe);
@ -3499,16 +3454,15 @@ SYSINIT(knote, SI_SUB_PSEUDO, SI_ORDER_ANY, knote_init, NULL);
/*
 * Allocate a zero-filled knote from knote_zone and record the allocation
 * in the KTR_KQ trace buffer.
 *
 * mflag: wait flags handed to uma_zalloc(); M_ZERO is always OR-ed in.
 * Returns the pointer produced by uma_zalloc().
 * NOTE(review): this may be NULL when the caller passes a non-sleeping
 * flag -- confirm against uma(9).
 */
static struct knote *
knote_alloc(int mflag)
{
	struct knote *ret;

	/*
	 * A leftover early "return (uma_zalloc(...))" used to sit here and
	 * made the trace point below unreachable; allocate exactly once and
	 * trace the result instead.
	 */
	ret = uma_zalloc(knote_zone, mflag | M_ZERO);
	CTR1(KTR_KQ, "knote_alloc: allocating knote %p", ret);
	return (ret);
}
/*
 * Return a knote to knote_zone via uma_zfree().  The pointer is logged
 * first through printf (only when KQ_DEBUG is defined) and through a
 * KTR_KQ trace record.
 */
static void
knote_free(struct knote *kn)
{
#ifdef KQ_DEBUG
printf("KQUEUE: knote_free: kn %p\n", kn);
#endif
CTR1(KTR_KQ, "knote_free: kn %p", kn);
uma_zfree(knote_zone, kn);
}

View File

@ -331,6 +331,12 @@ kern_thr_exit(struct thread *td)
p = td->td_proc;
/*
* Release the event queues
*/
if (td->td_kevq_thred != NULL)
kevq_thred_drain(td->td_kevq_thred);
/*
* If all of the threads in a process call this routine to
* exit (e.g. all threads call pthread_exit()), exactly one

View File

@ -68,6 +68,10 @@ __FBSDID("$FreeBSD$");
#include <vm/uma.h>
#include <sys/eventhandler.h>
#ifdef SMP
extern struct cpu_group *cpu_top; /* CPU topology */
#endif
/*
* Asserts below verify the stability of struct thread and struct proc
* layout, as exposed by KBI to modules. On head, the KBI is allowed
@ -445,9 +449,6 @@ thread_free(struct thread *td)
cpu_thread_free(td);
if (td->td_kstack != 0)
vm_thread_dispose(td);
if (td->td_kevq_thred != NULL) {
kevq_thred_drain(td->td_kevq_thred);
}
callout_drain(&td->td_slpcallout);
uma_zfree(thread_zone, td);
}

View File

@ -75,12 +75,8 @@ struct kqueue {
struct klist *kq_knlist; /* list of knotes */
u_long kq_knhashmask; /* size of knhash */
struct klist *kq_knhash; /* hash table for knotes */
/* only-set: in multithreaded mode */
TAILQ_HEAD(, kevq) kq_kevqlist; /* list of kevqs interested in the kqueue */
struct kevq *kq_ckevq; /* current kevq for multithreaded kqueue */
/* only-set: in single threaded mode */
struct kevq *kq_kevq;
/* End only-set */
struct task kq_task;
struct ucred *kq_cred;
};

View File

@ -22,31 +22,37 @@
#include <semaphore.h>
#include <pthread.h>
#define THREAD_CNT (100)
#define PACKET_CNT (5000)
#define FKQMULTI _IO('f', 89)
#define TEST_DEBUG
/* Per-worker state shared between a test thread and the test driver. */
struct thread_info {
pthread_t thrd; /* handle filled in by pthread_create() and later joined by the driver */
int can_crash; /* non-zero: the brutal worker may fake a crash; read and written under lock */
pthread_mutex_t lock; /* taken around can_crash accesses in the brutal test */
int evcnt; /* incremented by the worker for each non-'e' byte it pops */
int tid; /* worker index, printed in debug output */
};
/*
* Read test
*/
#define THREAD_CNT (32)
#define PACKET_CNT (1000)
int g_kqfd;
int g_sockfd[2];
int g_end;
struct thread_info g_thrd_info[THREAD_CNT];
/* Test threads signals this upon receiving events */
sem_t g_sem_driver;
static int
check_sched(void)
static void
check_sched(struct thread_info *info, int size)
{
int max = 0, min = 999999;
for(int i = 0; i < THREAD_CNT; i++) {
int cur = g_thrd_info[i].evcnt;
for(int i = 0; i < size; i++) {
int cur = info[i].evcnt;
if (cur > max) {
max = cur;
}
@ -55,33 +61,36 @@ check_sched(void)
}
}
if ((max - min) > 1) {
#ifdef TEST_DEBUG
printf("READ_M: check_sched: max difference is %d\n", max - min);
#endif
return (max - min) <= 1;
abort();
}
}
static void
socket_pop()
static char
socket_pop(int sockfd)
{
char buf[1];
char buf;
/* Drain the read buffer, then make sure there are no more events. */
#ifdef TEST_DEBUG
printf("READ_M: popping the read buffer\n");
#endif
if (read(g_sockfd[0], &buf[0], 1) < 1)
if (read(sockfd, &buf, 1) < 1)
err(1, "read(2)");
return buf;
}
static void
socket_push()
socket_push(int sockfd, char ch)
{
#ifdef TEST_DEBUG
printf("READ_M: pushing to socket\n");
printf("READ_M: pushing to socket %d\n", sockfd);
#endif
if (write(g_sockfd[1], ".", 1) < 1) {
if (write(sockfd, &ch, 1) < 1) {
#ifdef TEST_DEBUG
printf("READ_M: write failed with %d\n", errno);
#endif
@ -94,32 +103,30 @@ static void*
test_socket_read_thrd(void* args)
{
struct thread_info *info = (struct thread_info *) args;
struct kevent kev;
EV_SET(&kev, g_sockfd[0], EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, &g_sockfd[0]);
kev.data = 1;
char dat;
struct kevent *ret;
while (1) {
#ifdef TEST_DEBUG
printf("READ_M: thread %d waiting for events\n", info->tid);
#endif
kevent_cmp(&kev, kevent_get(g_kqfd));
ret = kevent_get(g_kqfd);
#ifdef TEST_DEBUG
printf("READ_M: thread %d woke up\n", info->tid);
#endif
socket_pop();
info->evcnt++;
dat = socket_pop(ret->ident);
free(ret);
if (dat == 'e')
break;
info->evcnt++;
/* signal the driver */
sem_post(&g_sem_driver);
if (g_end) {
#ifdef TEST_DEBUG
printf("READ_M: thread %d exiting...\n", info->tid);
#endif
break;
}
}
pthread_exit(0);
}
@ -127,14 +134,16 @@ static void
test_socket_read(void)
{
int error = 0;
const char *test_id = "[Multi]kevent(EVFILT_READ)";
test_begin(test_id);
if (socketpair(AF_UNIX, SOCK_STREAM, 0, &g_sockfd[0]) < 0)
err(1, "kevent_read socket");
struct kevent kev;
EV_SET(&kev, g_sockfd[0], EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, &g_sockfd[0]);
const char *test_id = "[Multi]kevent(EVFILT_READ)";
sem_init(&g_sem_driver, 0, 0);
g_end = 0;
test_begin(test_id);
error = kevent(g_kqfd, &kev, 1, NULL, 0, NULL);
@ -162,61 +171,211 @@ test_socket_read(void)
for(int i = 0; i < PACKET_CNT; i++) {
#ifdef TEST_DEBUG
printf("READ_M: processing packet %d\n", i);
printf("READ_M: processing packet %d\n", i);
#endif
socket_push();
socket_push(g_sockfd[1], '.');
/* wait for thread events */
sem_wait(&g_sem_driver);
if (!check_sched()) {
#ifdef TEST_DEBUG
printf("READ_M: check_sched failed...\n");
#endif
g_end = 1;
error = 2;
break;
}
if ((i + THREAD_CNT) == PACKET_CNT - 1) {
g_end = 1;
}
check_sched(g_thrd_info, THREAD_CNT);
}
/* shutdown the systems */
#ifdef TEST_DEBUG
printf("READ_M: finished testing, system shutting down...\n");
#endif
#endif
for(int i = 0; i < PACKET_CNT; i++) {
socket_push(g_sockfd[1], 'e');
}
for (int i = 0; i < THREAD_CNT; i++) {
pthread_join(g_thrd_info[i].thrd, NULL);
}
if (!error)
success();
else
err(error, "kevent");
EV_SET(&kev, g_sockfd[0], EVFILT_READ, EV_DELETE, 0, 0, &g_sockfd[0]);
error = kevent(g_kqfd, &kev, 1, NULL, 0, NULL);
if (error == -1) {
#ifdef TEST_DEBUG
printf("READ_M: kevent delete failed with %d\n", errno);
#endif
err(1, "kevent_delete");
}
close(g_sockfd[0]);
close(g_sockfd[1]);
success();
}
/*
* Brutal test
*/
#define THREAD_BRUTE_CNT (32)
#define SOCK_BRUTE_CNT (64)
#define PACKET_BRUTE_CNT (10000)
#define THREAD_EXIT_PROB (50)
#define RAND_SLEEP (29)
#define RAND_SEND_SLEEP (13)
int brute_sockfd[SOCK_BRUTE_CNT][2];
struct thread_info brute_threadinfo[THREAD_BRUTE_CNT];
/*
 * Worker thread for the brutal test.
 *
 * Loops waiting for an event on the shared kqueue (g_kqfd).  After each
 * wakeup, with THREAD_EXIT_PROB percent probability and only while
 * info->can_crash is set, the worker fakes a crash: it starts a replacement
 * worker on the same thread_info slot and exits without reading from the
 * socket named by the event.  Otherwise it pops one byte from that socket;
 * 'e' ends the worker, any other byte is counted in info->evcnt.
 *
 * args: pointer to this worker's struct thread_info slot.
 * Returns NULL (only reached through the 'e' shutdown path).
 */
static void*
test_socket_brutal_worker(void* args)
{
    struct thread_info *info = (struct thread_info *) args;
    char dat;
    struct kevent *ret;

    while (1) {
#ifdef TEST_DEBUG
        printf("READ_M: thread %d waiting for events\n", info->tid);
#endif
        ret = kevent_get(g_kqfd);
#ifdef TEST_DEBUG
        printf("READ_M: thread %d woke up\n", info->tid);
#endif
        if ((rand() % 100) < THREAD_EXIT_PROB) {
            pthread_mutex_lock(&info->lock);
#ifdef TEST_DEBUG
            printf("READ_M: thread %d trying to fake crash. Can crash: %d\n", info->tid, info->can_crash);
#endif
            if (info->can_crash) {
                /*
                 * pthread_create() overwrites info->thrd, which is the only
                 * handle to this thread.  Remember it so the dying thread can
                 * be detached; otherwise it is never joined and its
                 * resources leak.
                 */
                pthread_t self = pthread_self();

                if (pthread_create(&info->thrd, NULL, test_socket_brutal_worker, info) == 0) {
                    pthread_detach(self);
                    pthread_mutex_unlock(&info->lock);
                    free(ret);
                    pthread_exit(0);
                }
                /*
                 * No replacement was started: keep running in this thread and
                 * restore the handle the driver will join on (its contents
                 * are unspecified after a failed pthread_create()).
                 */
                info->thrd = self;
            }
            pthread_mutex_unlock(&info->lock);
        }

        dat = socket_pop(ret->ident);
        free(ret);

        if (dat == 'e')
            break;

        /* The driver polls evcnt from another thread; update it under the lock. */
        pthread_mutex_lock(&info->lock);
        info->evcnt++;
        pthread_mutex_unlock(&info->lock);

        usleep(rand() % RAND_SLEEP);
    }
#ifdef TEST_DEBUG
    printf("READ_M: thread %d exiting...\n", info->tid);
#endif
    /* Returning from the start routine is equivalent to pthread_exit(NULL). */
    return NULL;
}
static void
test_socket_brutal()
{
struct kevent kev;
const char *test_id = "[Multi]kevent(brutal)";
test_begin(test_id);
for (int i = 0; i < SOCK_BRUTE_CNT; i++) {
/* Create a connected pair of full-duplex sockets for testing socket events */
if (socketpair(AF_UNIX, SOCK_STREAM, 0, &brute_sockfd[i][0]) < 0) {
err(1, "kevent_socket");
}
EV_SET(&kev, brute_sockfd[i][0], EVFILT_READ, EV_ADD, 0, 0, &brute_sockfd[i][0]);
if (kevent(g_kqfd, &kev, 1, NULL, 0, NULL) == -1) {
err(1, "kevent_brutal_add");
}
}
srand(time(NULL));
#ifdef TEST_DEBUG
printf("READ_M: creating %d threads...\n", THREAD_BRUTE_CNT);
#endif
for (int i = 0; i < THREAD_BRUTE_CNT; i++) {
brute_threadinfo[i].tid = i;
brute_threadinfo[i].evcnt = 0;
brute_threadinfo[i].can_crash = ((i % 10) != 0);
pthread_create(&brute_threadinfo[i].thrd, NULL, test_socket_brutal_worker, &brute_threadinfo[i]);
}
sleep(3);
for(int i = 0; i < PACKET_BRUTE_CNT; i++) {
#ifdef TEST_DEBUG
printf("READ_M: processing packet %d\n", i);
#endif
socket_push(brute_sockfd[rand() % SOCK_BRUTE_CNT][1], '.');
usleep(rand() % RAND_SEND_SLEEP);
}
while (1) {
int sum = 0;
for (int i = 0; i < THREAD_BRUTE_CNT; i++) {
sum += brute_threadinfo[i].evcnt;
}
if (sum == PACKET_BRUTE_CNT) {
break;
}
#ifdef TEST_DEBUG
printf("READ_M: waiting for all packets to finish processing. Cur: %d Tgt: %d\n", sum, PACKET_BRUTE_CNT);
#endif
sleep(1);
}
/* shutdown the systems */
#ifdef TEST_DEBUG
printf("READ_M: finished testing, system shutting down...\n");
#endif
for (int i = 0; i < THREAD_BRUTE_CNT; i++) {
pthread_mutex_lock(&brute_threadinfo[i].lock);
brute_threadinfo[i].can_crash = 0;
pthread_mutex_unlock(&brute_threadinfo[i].lock);
}
for(int i = 0; i < THREAD_BRUTE_CNT; i++) {
socket_push(brute_sockfd[rand() % SOCK_BRUTE_CNT][1], 'e');
}
for (int i = 0; i < THREAD_BRUTE_CNT; i++) {
pthread_join(brute_threadinfo[i].thrd, NULL);
}
for (int i = 0; i < SOCK_BRUTE_CNT; i++) {
EV_SET(&kev, brute_sockfd[i][0], EVFILT_READ, EV_DELETE, 0, 0, &brute_sockfd[i][0]);
if (kevent(g_kqfd, &kev, 1, NULL, 0, NULL) == -1) {
err(1, "kevent_brutal_delete");
}
}
success();
}
void
test_evfilt_read_m()
{
/* Create a connected pair of full-duplex sockets for testing socket events */
if (socketpair(AF_UNIX, SOCK_STREAM, 0, &g_sockfd[0]) < 0)
abort();
g_kqfd = kqueue();
int error = ioctl(g_kqfd, FKQMULTI);
if (error == -1) {
#ifdef TEST_DEBUG
printf("READ_M: ioctl failed with %d\n", errno);
#endif
err(1, "ioctl");
}
test_socket_read();
test_socket_brutal();
close(g_kqfd);
close(g_sockfd[0]);
close(g_sockfd[1]);
}
}