diff --git a/sys/kern/kern_event.c b/sys/kern/kern_event.c index e44bc33eeb4d..981b6ed13bed 100644 --- a/sys/kern/kern_event.c +++ b/sys/kern/kern_event.c @@ -71,6 +71,7 @@ __FBSDID("$FreeBSD$"); #include #include #include +#include #ifdef KTRACE #include #endif @@ -232,7 +233,7 @@ static unsigned int kq_calloutmax = 4 * 1024; SYSCTL_UINT(_kern, OID_AUTO, kq_calloutmax, CTLFLAG_RW, &kq_calloutmax, 0, "Maximum number of callouts allocated for kqueue"); -#define KQ_DEBUG +#define KTR_KQ (KTR_SPARE5) #define KQ_LOCK(kq) do { \ mtx_lock(&(kq)->kq_lock); \ @@ -328,9 +329,7 @@ knote_enter_flux_ul(struct knote *kn) static void knote_enter_flux(struct knote *kn) { -#ifdef KQ_DEBUG - printf("KQUEUE: knote_enter_flux: %p\n", kn); -#endif + CTR2(KTR_KQ, "knote_enter_flux: %p flux: %d", kn, kn->kn_influx); KN_FLUX_OWNED(kn); MPASS(kn->kn_influx < INT_MAX); kn->kn_influx++; @@ -351,9 +350,7 @@ knote_leave_flux_ul(struct knote *kn) static bool knote_leave_flux(struct knote *kn) { -#ifdef KQ_DEBUG - printf("KQUEUE: knote_leave_flux: %p\n", kn); -#endif + CTR2(KTR_KQ, "knote_leave_flux: %p flux: %d", kn, kn->kn_influx); KN_FLUX_OWNED(kn); MPASS(kn->kn_influx > 0); kn->kn_influx--; @@ -444,9 +441,7 @@ filt_fileattach(struct knote *kn) static int kqueue_kqfilter(struct file *fp, struct knote *kn) { -#ifdef KQ_DEBUG - printf("KQUEUE: kqueue_kqfilter called for kn %p\n", kn); -#endif + CTR1(KTR_KQ, "kqueue_kqfilter called for kn %p", kn); struct kqueue *kq = kn->kn_fp->f_data; @@ -473,18 +468,23 @@ static int filt_kqueue(struct knote *kn, long hint) { struct kqueue *kq = kn->kn_fp->f_data; + struct kevq *kevq; -#ifdef KQ_DEBUG - printf("KQUEUE: filt_kqueue called for kn %p\n", kn); -#endif + CTR1(KTR_KQ, "filt_kqueue called for kn %p", kn); if ( (kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) { return 0; } - kn->kn_data = kq->kq_kevq->kn_count; + kevq = TAILQ_FIRST(&kq->kq_kevqlist); + + if (kevq == NULL) { + return 0; + } else { + kn->kn_data = kevq->kn_count; return (kn->kn_data > 0); } +} /* XXX - move to kern_proc.c? */ static int @@ -571,9 +571,7 @@ filt_proc(struct knote *kn, long hint) struct proc *p; u_int event; -#ifdef KQ_DEBUG - printf("KQUEUE: filt_proc called for kn %p\n", kn); -#endif + CTR1(KTR_KQ, "KQUEUE: filt_proc called for kn %p", kn); p = kn->kn_ptr.p_proc; if (p == NULL) /* already activated, from attach filter */ @@ -635,12 +633,6 @@ knote_fork(struct knlist *list, struct thread *td, int pid) continue; } - /* - * The NOTE_TRACK case. In addition to the activation - * of the event, we need to register new events to - * track the child. Drop the locks in preparation for - * the call to kqueue_register(). - */ knote_enter_flux(kn); KN_FLUX_UNLOCK(kn); @@ -648,24 +640,21 @@ knote_fork(struct knlist *list, struct thread *td, int pid) * The same as knote(), activate the event. */ if ((kn->kn_sfflags & NOTE_TRACK) == 0) { - error = kn->kn_fop->f_event(kn, NOTE_FORK); - KQ_UNLOCK(kq); - - if(error) + if(kn->kn_fop->f_event(kn, NOTE_FORK)) knote_activate(kn); KN_LEAVE_FLUX_WAKEUP(kn); + KQ_UNLOCK(kq); continue; } - KQ_UNLOCK(kq); - /* * The NOTE_TRACK case. In addition to the activation * of the event, we need to register new events to * track the child. Drop the locks in preparation for * the call to kqueue_register(). */ + KQ_UNLOCK(kq); list->kl_unlock(list->kl_lockarg); /* @@ -967,9 +956,7 @@ filt_timertouch(struct knote *kn, struct kevent *kev, u_long type) static int filt_timer(struct knote *kn, long hint) { -#ifdef KQ_DEBUG - printf("KQUEUE: filt_timer called for kn %p\n", kn); -#endif + CTR1(KTR_KQ, "filt_timer called for kn %p", kn); return (kn->kn_data != 0); } @@ -1000,9 +987,7 @@ filt_userdetach(__unused struct knote *kn) static int filt_user(struct knote *kn, __unused long hint) { -#ifdef KQ_DEBUG - printf("KQUEUE: filt_user called for kn %p\n", kn); -#endif + CTR1(KTR_KQ, "KQUEUE: filt_user called for kn %p", kn); return (kn->kn_hookid); } @@ -1420,7 +1405,7 @@ kern_kevent_anonymous(struct thread *td, int nevents, kqueue_init(&kq); kevq_init(&kevq); - kq.kq_kevq = &kevq; + TAILQ_INSERT_HEAD(&kq.kq_kevqlist, &kevq, kq_e); kevq.kq = &kq; kq.kq_refcnt = 1; kevq.kevq_refcnt = 1; @@ -1438,9 +1423,7 @@ kqueue_add_filteropts(int filt, struct filterops *filtops) error = 0; if (filt > 0 || filt + EVFILT_SYSCOUNT < 0) { - printf( -"trying to add a filterop that is out of range: %d is beyond %d\n", - ~filt, EVFILT_SYSCOUNT); + CTR2(KTR_KQ, "trying to add a filterop that is out of range: %d is beyond %d", ~filt, EVFILT_SYSCOUNT); return EINVAL; } mtx_lock(&filterops_lock); @@ -1529,9 +1512,8 @@ kqueue_register(struct kqueue *kq, struct kevq *kevq, struct kevent *kev, struct struct knlist *knl; int error, filt, event; int haskqglobal, filedesc_unlock; -#ifdef KQ_DEBUG - printf("KQUEUE: kqueue_register: kq %p, kevq %p, ident: %d, filter: %d, flags: 0x%X\n", kq, kevq, (int)kev->ident, kev->filter, kev->flags); -#endif + + CTR5(KTR_KQ, "kqueue_register: kq %p, kevq %p, ident: %d, filter: %d, flags: 0x%X", kq, kevq, (int)kev->ident, kev->filter, kev->flags); if ((kev->flags & (EV_ENABLE | EV_DISABLE)) == (EV_ENABLE | EV_DISABLE)) return (EINVAL); @@ -1751,8 +1733,8 @@ kqueue_register(struct kqueue *kq, struct kevq *kevq, struct kevent *kev, struct kn->kn_status |= KN_DISABLED; /* - * The user may changkhe initial EV_ADD, - * but doing so will kas already been + * The user may change some filter values after the initial EV_ADD, + * but doing so will not reset any filter which has already been * triggered. */ kn->kn_status |= KN_SCAN; @@ -1785,7 +1767,11 @@ kqueue_register(struct kqueue *kq, struct kevq *kevq, struct kevent *kev, struct event = 0; KQ_LOCK(kq); + if (event) + kn->kn_status |= KN_ACTIVE; + if ((kn->kn_status & (KN_ACTIVE | KN_DISABLED | KN_QUEUED)) == + KN_ACTIVE) knote_activate(kn); kn->kn_status &= ~KN_SCAN; @@ -1822,14 +1808,26 @@ kevq_thred_destroy(struct kevq_thred *kevq_th) { void kevq_thred_drain(struct kevq_thred *kevq_th) { + /* [Solved] Race here on kevq. Consider: + * Thread 1: Grabs KEVQ ptr for thread 2 from kevq_list in knote_sched + * Thread 2: crashes and calls kevq_drain, destorys kevq + * Thread 1: KEVQ_ACQUIRE((already freed)) + * Maybe require holding the KQ lock when deleting to make sure nothing queries + * the kevq_list during deletion + + * this is solved by holding the KQ lock while scheduling + * thread 1 either accesses kevq_list (which requires KQ lock) before or after thread 2's kevq is deleted from the KQ + * if before, then thread 2's kevq cannot be freed because kevq_drain deletes from kevq_list then frees kevq + * if after, then thread 2's kevq cannot be found in kevq_list. + */ struct kevq *kevq; int error; error = 0; while((kevq = TAILQ_FIRST(&kevq_th->kevq_tq)) != NULL) { - // multithreaded mode, acquire kevq ref cnt error = kevq_acquire(kevq); if (!error) { + CTR1(KTR_KQ, "kevq_thred_drain: draining kevq %p\n", kevq); kevq_drain(kevq); } } @@ -1846,13 +1844,11 @@ kevq_init(struct kevq *kevq) { static void kevq_release(struct kevq* kevq, int locked) { -#ifdef KQ_DEBUG - printf("KQUEUE: Releasing kevq %p (refcnt = %d)\n", kevq, kevq->kevq_refcnt); -#endif if (locked) KEVQ_OWNED(kevq); else KEVQ_LOCK(kevq); + CTR2(KTR_KQ, "releasing kevq %p (refcnt = %d)", kevq, kevq->kevq_refcnt); kevq->kevq_refcnt--; if (kevq->kevq_refcnt == 1) wakeup(&kevq->kevq_refcnt); @@ -1863,13 +1859,11 @@ kevq_release(struct kevq* kevq, int locked) static int kevq_acquire(struct kevq *kevq) { -#ifdef KQ_DEBUG - printf("KQUEUE: Referencing kevq %p (refcnt = %d)\n", kevq, kevq->kevq_refcnt); -#endif KEVQ_NOTOWNED(kevq); int error; error = 0; KEVQ_LOCK(kevq); + CTR2(KTR_KQ, "referencing kevq %p (refcnt = %d)", kevq, kevq->kevq_refcnt); if ((kevq->kevq_state & KEVQ_CLOSING) == KEVQ_CLOSING) { error = EINVAL; } else { @@ -1909,9 +1903,7 @@ kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp) thread_lock(td); if (td->td_kevq_thred == NULL) { td->td_kevq_thred = kevq_th; -#ifdef KQ_DEBUG - printf("KQUEUE: kevq_acquire_kq(M): allocated kevq_th %p for thread %d\n", kevq_th, td->td_tid); -#endif + CTR2(KTR_KQ, "kevq_acquire_kq(M): allocated kevq_th %p for thread %d", kevq_th, td->td_tid); } else { to_free = kevq_th; kevq_th = td->td_kevq_thred; @@ -1941,13 +1933,12 @@ kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp) alloc_kevq->kq = kq; alloc_kevq->kevq_th = kevq_th; -#ifdef KQ_DEBUG - printf("KQUEUE: kevq_acquire_kq(M): allocated kevq %p for thread %d\n", alloc_kevq, td->td_tid); -#endif + CTR2(KTR_KQ, "kevq_acquire_kq(M): allocated kevq %p for thread %d", alloc_kevq, td->td_tid); KEVQ_TH_LOCK(kevq_th); kevq = kevqlist_find(kevq_list, kq); - + /* TODO: probably don't need to re-check unless a thread can asynchronously call + * kevent (signal handler?) */ if (kevq == NULL) { kevq = alloc_kevq; // insert kevq to the kevq_th hash table @@ -1972,27 +1963,28 @@ kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp) KASSERT(kevq != NULL, ("kevq isn't allocated.")); } else { - if (kq->kq_kevq == NULL) { - kevq = malloc(sizeof(struct kevq), M_KQUEUE, M_WAITOK | M_ZERO); -#ifdef KQ_DEBUG - printf("KQUEUE: kevq_acquire_kq(S): allocated kevq %p for kq %p\n", kevq, kq); -#endif - kevq_init(kevq); - kevq->kq = kq; + // prob don't need this lock depending on whether TAILQ_FIRST is atomic + KQ_LOCK(kq); + kevq = TAILQ_FIRST(&kq->kq_kevqlist); + KQ_UNLOCK(kq); + if (kevq == NULL) { + alloc_kevq = malloc(sizeof(struct kevq), M_KQUEUE, M_WAITOK | M_ZERO); + CTR2(KTR_KQ, "kevq_acquire_kq(S): allocated kevq %p for kq %p", alloc_kevq, kq); + kevq_init(alloc_kevq); + alloc_kevq->kq = kq; KQ_LOCK(kq); - if (kq->kq_kevq == NULL) { - kq->kq_kevq = kevq; + if ((kevq = TAILQ_FIRST(&kq->kq_kevqlist)) == NULL) { + TAILQ_INSERT_HEAD(&kq->kq_kevqlist, alloc_kevq, kq_e); + kevq = alloc_kevq; } else { - to_free = kevq; + to_free = alloc_kevq; } KQ_UNLOCK(kq); if (to_free != NULL) { free(to_free, M_KQUEUE); } - } else { - kevq = kq->kq_kevq; } } @@ -2016,10 +2008,10 @@ kqueue_acquire(struct file *fp, struct kqueue **kqp) *kqp = kq; KQ_LOCK(kq); + if (((kq->kq_state) & KQ_CLOSING) != 0) { + return (EBADF); + } if ((kq->kq_state & KQ_FLAG_INIT) == 0) { - /* Mark the kqueue as initialized - TODO: Why not make some locks separate field so we - don't have short critical sections like this */ kq->kq_state |= KQ_FLAG_INIT; } kq->kq_refcnt++; @@ -2220,6 +2212,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, } else asbt = 0; marker = knote_alloc(M_WAITOK); + CTR2(KTR_KQ, "kqueue_scan: td %d allocated marker %p", td->td_tid, marker); knote_xinit(marker); marker->kn_status = KN_MARKER; KEVQ_LOCK(kevq); @@ -2231,22 +2224,16 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, retry: kevp = keva; -#ifdef KQ_DEBUG - printf("KQUEUE: kqueue_scan: kevq %p has %d events\n", kevq, kevq->kn_count); -#endif + CTR3(KTR_KQ, "kqueue_scan: td %d on kevq %p has %d events", td->td_tid, kevq, kevq->kn_count); if (kevq->kn_count == 0) { if (asbt == -1) { error = EWOULDBLOCK; } else { kevq->kevq_state |= KEVQ_SLEEP; -#ifdef KQ_DEBUG - printf("KQUEUE: kqueue_scan: thread %d waiting on kevq %p for events\n", td->td_tid, kevq); -#endif + CTR2(KTR_KQ, "kqueue_scan: td %d waiting on kevq %p for events", td->td_tid, kevq); error = msleep_sbt(kevq, &kevq->lock, PSOCK | PCATCH, "kqread", asbt, rsbt, C_ABSOLUTE); -#ifdef KQ_DEBUG - printf("KQUEUE: kqueue_scan: thread %d wokeup on kevq %p for events\n", td->td_tid, kevq); -#endif + CTR2(KTR_KQ, "kqueue_scan: td %d wokeup on kevq %p for events", td->td_tid, kevq); } if (error == 0) goto retry; @@ -2275,16 +2262,17 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, } kn->kn_fluxwait = 1; KN_FLUX_UNLOCK(kn); + CTR3(KTR_KQ, "kqueue_scan: td %d fluxwait on kn %p marker %p", td->td_tid, kn, marker); error = msleep(kn, &kevq->lock, PSOCK, - "kqflxwt", 0); + "kevqflxwt3", 0); + + CTR3(KTR_KQ, "kqueue_scan: td %d fluxwait WAKEUP kn %p marker %p", td->td_tid, kn, marker); continue; } /* Now we have exclusive access to kn */ TAILQ_REMOVE(&kevq->kn_head, kn, kn_tqe); -#ifdef KQ_DEBUG - printf("KQUEUE: kqueue_scan: thread %d on kevq %p dequeued knote %p\n", td->td_tid, kevq, kn); -#endif + CTR3(KTR_KQ, "kqueue_scan: td %d on kevq %p dequeued knote %p", td->td_tid, kevq, kn); if ((kn->kn_status & KN_DISABLED) == KN_DISABLED) { kn->kn_status &= ~KN_QUEUED; kevq->kn_count--; @@ -2295,6 +2283,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, /* We are dequeuing our marker, wakeup threads waiting on it */ knote_flux_wakeup(kn); KN_FLUX_UNLOCK(kn); + CTR2(KTR_KQ, "kqueue_scan: td %d MARKER WAKEUP %p", td->td_tid, kn); if (count == maxevents) { goto retry; } @@ -2350,9 +2339,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, kevq->kn_count--; kn_list_unlock(knl); influx = 1; -#ifdef KQ_DEBUG - printf("KQUEUE: kqueue_scan: kn %p not valid anymore for kevq %p\n", kn, kevq); -#endif + CTR3(KTR_KQ, "kqueue_scan: kn %p not valid anymore for kevq %p, td %d", kn, kevq, td->td_tid); continue; } touch = (!kn->kn_fop->f_isfd && kn->kn_fop->f_touch != NULL); @@ -2376,9 +2363,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE); kevq->kn_count--; } else { -#ifdef KQ_DEBUG - printf("KQUEUE: kqueue_scan: requeued kn %p to kevq %p\n", kn, kevq); -#endif + CTR2(KTR_KQ, "kqueue_scan: requeued kn %p to kevq %p", kn, kevq); TAILQ_INSERT_TAIL(&kevq->kn_head, kn, kn_tqe); } @@ -2394,10 +2379,9 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, count--; if (nkev == KQ_NEVENTS) { - KEVQ_UNLOCK(kevq); - influx = 0; knote_flux_wakeup_ul(kn); + KEVQ_UNLOCK(kevq); error = k_ops->k_copyout(k_ops->arg, keva, nkev); nkev = 0; @@ -2410,12 +2394,17 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, TAILQ_REMOVE(&kevq->kn_head, marker, kn_tqe); done: KEVQ_OWNED(kevq); - KEVQ_UNLOCK(kevq); if (kn != NULL) { knote_flux_wakeup_ul(kn); } + if (marker != NULL) { + knote_flux_wakeup_ul(marker); + } + + KEVQ_UNLOCK(kevq); + CTR2(KTR_KQ, "kqueue_scan: knote_free marker %p td %d", marker, td->td_tid); knote_free(marker); done_nl: KEVQ_NOTOWNED(kevq); @@ -2473,17 +2462,13 @@ kqueue_ioctl(struct file *fp, u_long cmd, void *data, int error = 0; kq = fp->f_data; -#ifdef KQ_DEBUG - printf("KQUEUE: ioctl: received: kq %p cmd: 0x%lx", kq, cmd); -#endif + CTR2(KTR_KQ, "kqueue_ioctl: received: kq %p cmd: 0x%lx", kq, cmd); switch (cmd) { case FKQMULTI: if ((kq->kq_state & KQ_FLAG_INIT) == KQ_FLAG_INIT) { error = (EINVAL); } else { -#ifdef KQ_DEBUG - printf("KQUEUE: ioctl: multi flag set for kq %p", kq); -#endif + CTR1(KTR_KQ, "kqueue_ioctl: multi flag set for kq %p", kq); KQ_LOCK(kq); kq->kq_state |= (KQ_FLAG_INIT | KQ_FLAG_MULTI); KQ_UNLOCK(kq); @@ -2509,27 +2494,24 @@ kqueue_poll(struct file *fp, int events, struct ucred *active_cred, if ((error = kqueue_acquire_both(fp, td, &kq, &kevq))) return POLLERR; + KQ_LOCK(kq); if ((kq->kq_state & KQ_FLAG_MULTI) != KQ_FLAG_MULTI ) { - // TODO: support select? really need it in multithreaded mode? - KQ_LOCK(kq); - if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) { - revents = 0; - } else { - if (events & (POLLIN | POLLRDNORM)) { - if (kq->kq_kevq->kn_count) { - revents |= events & (POLLIN | POLLRDNORM); - } else { - selrecord(td, &kq->kq_sel); - if (SEL_WAITING(&kq->kq_sel)) - kq->kq_state |= KQ_SEL; - } + revents = 0; + } else { + if (events & (POLLIN | POLLRDNORM)) { + if (kevq->kn_count) { + revents |= events & (POLLIN | POLLRDNORM); + } else { + selrecord(td, &kq->kq_sel); + if (SEL_WAITING(&kq->kq_sel)) + kq->kq_state |= KQ_SEL; } } - - kqueue_release(kq, 1); - KQ_UNLOCK(kq); - kevq_release(kevq, 0); } + + kqueue_release(kq, 1); + KQ_UNLOCK(kq); + kevq_release(kevq, 0); return (revents); } @@ -2554,9 +2536,7 @@ kqueue_stat(struct file *fp, struct stat *st, struct ucred *active_cred, static void kevq_destroy(struct kevq *kevq) { -#ifdef KQ_DEBUG - printf("KQUEUE: kevq_destroy for %p \n", kevq); -#endif + CTR1(KTR_KQ, "kevq_destroy for %p", kevq); free(kevq, M_KQUEUE); } @@ -2569,9 +2549,7 @@ kevq_drain(struct kevq *kevq) struct kqueue *kq; struct knote *kn; struct kevqlist *kevq_list; -#ifdef KQ_DEBUG - printf("KQUEUE: kevq_drain for %p (refcnt = %d) with %d knotes\n", kevq, kevq->kevq_refcnt, kevq->kn_count); -#endif + CTR3(KTR_KQ, "kevq_drain for %p (refcnt = %d) with %d knotes", kevq, kevq->kevq_refcnt, kevq->kn_count); kq = kevq->kq; KQ_NOTOWNED(kq); @@ -2594,19 +2572,21 @@ kevq_drain(struct kevq *kevq) KASSERT(kevq->kevq_refcnt == 1, ("other refs of kevq are out there!")); /* drain all knotes on the kevq */ - TAILQ_FOREACH(kn, &kevq->kn_head, kn_tqe) { - + while ((kn = TAILQ_FIRST(&kevq->kn_head)) != NULL) { retry: KEVQ_OWNED(kevq); KN_FLUX_LOCK(kn); /* Wait for kn to stablize */ if (kn_in_flux(kn)) { kn->kn_fluxwait = 1; + CTR2(KTR_KQ, "kevq_drain %p fluxwait knote %p", kevq, kn); KN_FLUX_UNLOCK(kn); msleep(kn, &kevq->lock, PSOCK, "kevqclose2", 0); goto retry; } + CTR2(KTR_KQ, "kevq_drain %p draining knote %p", kevq, kn); + KN_FLUX_OWNED(kn); KASSERT(!kn_in_flux(kn), ("knote is still influx")); @@ -2614,7 +2594,7 @@ kevq_drain(struct kevq *kevq) KN_FLUX_UNLOCK(kn); knote_dequeue(kn); - if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI && (kq->kq_state & KQ_CLOSING) != KQ_CLOSING) { + if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI && (kq->kq_state & KQ_CLOSING) != KQ_CLOSING && (kn->kn_status & KN_MARKER) == 0) { KEVQ_UNLOCK(kevq); knote_activate_ul(kn); KEVQ_LOCK(kevq); @@ -2627,6 +2607,9 @@ kevq_drain(struct kevq *kevq) KEVQ_UNLOCK(kevq); KQ_LOCK(kq); + if (kq->kq_ckevq == kevq) { + kq->kq_ckevq = TAILQ_NEXT(kevq, kq_e); + } TAILQ_REMOVE(&kq->kq_kevqlist, kevq, kq_e); KQ_UNLOCK(kq); @@ -2653,9 +2636,7 @@ kqueue_drain(struct kqueue *kq, struct kevq *kevq, struct thread *td) int i; int error; -#ifdef KQ_DEBUG - printf("KQUEUE: kqueue_drain on %p. args kevq %p\n", kq, kevq); -#endif + CTR2(KTR_KQ, "kqueue_drain on %p. args kevq %p", kq, kevq); KQ_LOCK(kq); @@ -2800,9 +2781,7 @@ kqueue_close(struct file *fp, struct thread *td) crfree(kq->kq_cred); free(kq, M_KQUEUE); fp->f_data = NULL; -#ifdef KQ_DEBUG - printf("KQUEUE: kqueue_closed for %p.\n", kq); -#endif + CTR1(KTR_KQ, "kqueue_close: %p.", kq); return (0); } @@ -2820,8 +2799,8 @@ kevq_wakeup(struct kevq* kevq) KEVQ_OWNED(kevq); if ((kevq->kevq_state & KEVQ_SLEEP) == KEVQ_SLEEP) { kevq->kevq_state &= ~KEVQ_SLEEP; - wakeup(kevq); } + wakeup(kevq); } static void @@ -2871,9 +2850,7 @@ knote(struct knlist *list, long hint, int lockflags) * or other threads could remove events. */ SLIST_FOREACH_SAFE(kn, &list->kl_list, kn_selnext, tkn) { -#ifdef KQ_DEBUG - printf("KNOTE: knote() scanning kn %p\n", kn); -#endif + CTR1(KTR_KQ, "knote() scanning kn %p", kn); KN_FLUX_LOCK(kn); if (kn_in_flux(kn) && (kn->kn_status & KN_SCAN) == 0) { /* @@ -2923,10 +2900,8 @@ static void knote_flux_wakeup(struct knote *kn) { KN_FLUX_OWNED(kn); - if ((kn)->kn_fluxwait) { - (kn)->kn_fluxwait = 0; - wakeup((kn)); - } + CTR1(KTR_KQ, "waking up kn %p", kn); + wakeup(kn); } static void @@ -2948,9 +2923,7 @@ knote_activate(struct knote *kn) struct kqueue *kq; kq = kn->kn_kq; -#ifdef KQ_DEBUG - printf("KQUEUE: knote_activate: kn %p\n", kn); -#endif + CTR1(KTR_KQ, "knote_activate: kn %p", kn); KQ_OWNED(kq); KN_FLUX_NOTOWNED(kn); KASSERT(kn_in_flux(kn), ("knote %p not in flux", kn)); @@ -2970,9 +2943,7 @@ knote_activate(struct knote *kn) void knlist_add(struct knlist *knl, struct knote *kn, int islocked) { -#ifdef KQ_DEBUG - printf("KNLIST: knlist_add kn %p\n", kn); -#endif + CTR1(KTR_KQ, "knlist_add kn %p", kn); KNL_ASSERT_LOCK(knl, islocked); KQ_NOTOWNED(kn->kn_kq); KASSERT(kn_in_flux(kn), ("knote %p not in flux", kn)); @@ -3270,13 +3241,12 @@ knote_fdclose(struct thread *td, int fd) knote_flux_wakeup(kn); kn->kn_fluxwait = 1; KN_FLUX_UNLOCK(kn); - msleep(kn, &kq->kq_lock, PSOCK, "kqflxwt", 0); + msleep(kn, &kq->kq_lock, PSOCK, "kqflxwt4", 0); goto again; } knote_enter_flux(kn); KN_FLUX_UNLOCK(kn); KQ_UNLOCK(kq); - /* WTF ? influx should be 0? */ influx = 1; knote_drop(kn, td); KQ_LOCK(kq); @@ -3344,14 +3314,15 @@ knote_drop_detached(struct knote *kn, struct thread *td) if (!SLIST_EMPTY(list)) SLIST_REMOVE(list, kn, knote, kn_link); - KQ_UNLOCK(kq); - if (kn->kn_status & KN_QUEUED) { KEVQ_LOCK(kevq); knote_dequeue(kn); KEVQ_UNLOCK(kevq); } + KN_LEAVE_FLUX_WAKEUP(kn); + KQ_UNLOCK(kq); + if (kn->kn_fop->f_isfd) { fdrop(kn->kn_fp, td); kn->kn_fp = NULL; @@ -3373,13 +3344,11 @@ knote_sched(struct knote *kn) KASSERT(kn_in_flux(kn), ("kn not in flux")); // reschedule knotes to available threads - if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) { if ((kn->kn_flags & EV_AFFINITY) == EV_AFFINITY) { -#ifdef KQ_DEBUG - printf("KQUEUE: knote_sched(M) affinity set: kn %p \n", kn); -#endif + CTR1(KTR_KQ, "knote_sched: affinity set: kn %p", kn); if ((kn->kn_org_kevq->kevq_state & KEVQ_RDY) != 0) { next_kevq = kn->kn_org_kevq; + KEVQ_LOCK(next_kevq); } else { next_kevq = NULL; } @@ -3389,9 +3358,7 @@ knote_sched(struct knote *kn) if (next_kevq == NULL) { next_kevq = TAILQ_FIRST(&kq->kq_kevqlist); if (next_kevq == NULL) { -#ifdef KQ_DEBUG - printf("KQUEUE: knote_sched(M) no kevqs exist for queueing kn %p, discarding... \n", kn); -#endif + CTR1(KTR_KQ, "knote_sched: no kevqs exist for queueing kn %p, discarding...", kn); break; } } else { @@ -3409,28 +3376,20 @@ knote_sched(struct knote *kn) break; } + KEVQ_UNLOCK(next_kevq); + if (next_kevq == kq->kq_ckevq) { // if the previous "if" didn't break // we have traversed the list once and the current kevq is closing // we have no queue to queue the knote -#ifdef KQ_DEBUG - printf("KQUEUE: knote_sched(M) no open kevqs for queueing kn %p, discarding... \n", kn); -#endif + CTR1(KTR_KQ, "knote_sched: no open kevqs for queueing kn %p, discarding...", kn); next_kevq = NULL; break; } } } - } else { -#ifdef KQ_DEBUG - printf("KQUEUE: knote_sched(S): kn %p to kevq %p\n", kn, kq->kq_kevq); -#endif - next_kevq = kq->kq_kevq; - } -#ifdef KQ_DEBUG - printf("KQUEUE: knote_sched(M) next kevq %p for kn %p \n", next_kevq, kn); -#endif + CTR2(KTR_KQ, "knote_sched: next kevq %p for kn %p", next_kevq, kn); if (next_kevq != NULL) { KEVQ_OWNED(next_kevq); @@ -3445,9 +3404,7 @@ knote_enqueue(struct knote *kn, struct kevq *kevq) struct kqueue *kq; kq = kn->kn_kq; -#ifdef KQ_DEBUG - printf("KQUEUE: knote_enqueue: kn %p to kevq %p\n", kn, kevq); -#endif + CTR2(KTR_KQ, "knote_enqueue: kn %p to kevq %p", kn, kevq); KEVQ_OWNED(kevq); @@ -3476,9 +3433,7 @@ knote_dequeue(struct knote *kn) KEVQ_OWNED(kevq); -#ifdef KQ_DEBUG - printf("KQUEUE: knote_dequeue: kn %p from kevq %p\n", kn, kevq); -#endif + CTR2(KTR_KQ, "knote_dequeue: kn %p from kevq %p", kn, kevq); KASSERT(kn->kn_status & KN_QUEUED, ("knote not queued")); TAILQ_REMOVE(&kevq->kn_head, kn, kn_tqe); @@ -3499,16 +3454,15 @@ SYSINIT(knote, SI_SUB_PSEUDO, SI_ORDER_ANY, knote_init, NULL); static struct knote * knote_alloc(int mflag) { - - return (uma_zalloc(knote_zone, mflag | M_ZERO)); + struct knote *ret = uma_zalloc(knote_zone, mflag | M_ZERO); + CTR1(KTR_KQ, "knote_alloc: allocating knote %p", ret); + return ret; } static void knote_free(struct knote *kn) { -#ifdef KQ_DEBUG - printf("KQUEUE: knote_free: kn %p\n", kn); -#endif + CTR1(KTR_KQ, "knote_free: kn %p", kn); uma_zfree(knote_zone, kn); } diff --git a/sys/kern/kern_thr.c b/sys/kern/kern_thr.c index dd8e2c8d90af..a55d7cc4019f 100644 --- a/sys/kern/kern_thr.c +++ b/sys/kern/kern_thr.c @@ -331,6 +331,12 @@ kern_thr_exit(struct thread *td) p = td->td_proc; + /* + * Release the event queues + */ + if (td->td_kevq_thred != NULL) + kevq_thred_drain(td->td_kevq_thred); + /* * If all of the threads in a process call this routine to * exit (e.g. all threads call pthread_exit()), exactly one diff --git a/sys/kern/kern_thread.c b/sys/kern/kern_thread.c index 20dedb3b9518..4677d2778dc4 100644 --- a/sys/kern/kern_thread.c +++ b/sys/kern/kern_thread.c @@ -68,6 +68,10 @@ __FBSDID("$FreeBSD$"); #include #include +#ifdef SMP +extern struct cpu_group *cpu_top; /* CPU topology */ +#endif + /* * Asserts below verify the stability of struct thread and struct proc * layout, as exposed by KBI to modules. On head, the KBI is allowed @@ -445,9 +449,6 @@ thread_free(struct thread *td) cpu_thread_free(td); if (td->td_kstack != 0) vm_thread_dispose(td); - if (td->td_kevq_thred != NULL) { - kevq_thred_drain(td->td_kevq_thred); - } callout_drain(&td->td_slpcallout); uma_zfree(thread_zone, td); } diff --git a/sys/sys/eventvar.h b/sys/sys/eventvar.h index f9920417ee53..71f2573b2506 100644 --- a/sys/sys/eventvar.h +++ b/sys/sys/eventvar.h @@ -75,12 +75,8 @@ struct kqueue { struct klist *kq_knlist; /* list of knotes */ u_long kq_knhashmask; /* size of knhash */ struct klist *kq_knhash; /* hash table for knotes */ - /* only-set: in multithreaded mode */ TAILQ_HEAD(, kevq) kq_kevqlist; /* list of kevqs interested in the kqueue */ struct kevq *kq_ckevq; /* current kevq for multithreaded kqueue */ - /* only-set: in single threaded mode */ - struct kevq *kq_kevq; - /* End only-set */ struct task kq_task; struct ucred *kq_cred; }; diff --git a/tests/sys/kqueue/libkqueue/read_m.c b/tests/sys/kqueue/libkqueue/read_m.c index 7aaf0c23aa5a..b5a45c51c9db 100644 --- a/tests/sys/kqueue/libkqueue/read_m.c +++ b/tests/sys/kqueue/libkqueue/read_m.c @@ -22,31 +22,37 @@ #include #include -#define THREAD_CNT (100) -#define PACKET_CNT (5000) #define FKQMULTI _IO('f', 89) #define TEST_DEBUG struct thread_info { pthread_t thrd; + int can_crash; + pthread_mutex_t lock; int evcnt; int tid; }; +/* + * Read test + */ + +#define THREAD_CNT (32) +#define PACKET_CNT (1000) + int g_kqfd; int g_sockfd[2]; -int g_end; struct thread_info g_thrd_info[THREAD_CNT]; /* Test threads signals this upon receiving events */ sem_t g_sem_driver; -static int -check_sched(void) +static void +check_sched(struct thread_info *info, int size) { int max = 0, min = 999999; - for(int i = 0; i < THREAD_CNT; i++) { - int cur = g_thrd_info[i].evcnt; + for(int i = 0; i < size; i++) { + int cur = info[i].evcnt; if (cur > max) { max = cur; } @@ -55,33 +61,36 @@ check_sched(void) } } + if ((max - min) > 1) { #ifdef TEST_DEBUG printf("READ_M: check_sched: max difference is %d\n", max - min); #endif - - return (max - min) <= 1; + abort(); + } } -static void -socket_pop() +static char +socket_pop(int sockfd) { - char buf[1]; + char buf; /* Drain the read buffer, then make sure there are no more events. */ #ifdef TEST_DEBUG printf("READ_M: popping the read buffer\n"); #endif - if (read(g_sockfd[0], &buf[0], 1) < 1) + if (read(sockfd, &buf, 1) < 1) err(1, "read(2)"); + + return buf; } static void -socket_push() +socket_push(int sockfd, char ch) { #ifdef TEST_DEBUG - printf("READ_M: pushing to socket\n"); + printf("READ_M: pushing to socket %d\n", sockfd); #endif - if (write(g_sockfd[1], ".", 1) < 1) { + if (write(sockfd, &ch, 1) < 1) { #ifdef TEST_DEBUG printf("READ_M: write failed with %d\n", errno); #endif @@ -94,32 +103,30 @@ static void* test_socket_read_thrd(void* args) { struct thread_info *info = (struct thread_info *) args; - - struct kevent kev; - EV_SET(&kev, g_sockfd[0], EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, &g_sockfd[0]); - kev.data = 1; + char dat; + struct kevent *ret; while (1) { #ifdef TEST_DEBUG printf("READ_M: thread %d waiting for events\n", info->tid); #endif - kevent_cmp(&kev, kevent_get(g_kqfd)); + ret = kevent_get(g_kqfd); #ifdef TEST_DEBUG printf("READ_M: thread %d woke up\n", info->tid); #endif - socket_pop(); - info->evcnt++; + dat = socket_pop(ret->ident); + free(ret); + + if (dat == 'e') + break; + + info->evcnt++; + /* signal the driver */ sem_post(&g_sem_driver); - - if (g_end) { -#ifdef TEST_DEBUG - printf("READ_M: thread %d exiting...\n", info->tid); -#endif - break; - } } + pthread_exit(0); } @@ -127,14 +134,16 @@ static void test_socket_read(void) { int error = 0; + const char *test_id = "[Multi]kevent(EVFILT_READ)"; + test_begin(test_id); + + if (socketpair(AF_UNIX, SOCK_STREAM, 0, &g_sockfd[0]) < 0) + err(1, "kevent_read socket"); + struct kevent kev; EV_SET(&kev, g_sockfd[0], EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, &g_sockfd[0]); - const char *test_id = "[Multi]kevent(EVFILT_READ)"; sem_init(&g_sem_driver, 0, 0); - g_end = 0; - - test_begin(test_id); error = kevent(g_kqfd, &kev, 1, NULL, 0, NULL); @@ -162,61 +171,211 @@ test_socket_read(void) for(int i = 0; i < PACKET_CNT; i++) { #ifdef TEST_DEBUG - printf("READ_M: processing packet %d\n", i); + printf("READ_M: processing packet %d\n", i); #endif - socket_push(); + socket_push(g_sockfd[1], '.'); /* wait for thread events */ sem_wait(&g_sem_driver); - if (!check_sched()) { -#ifdef TEST_DEBUG - printf("READ_M: check_sched failed...\n"); -#endif - g_end = 1; - error = 2; - break; - } - - if ((i + THREAD_CNT) == PACKET_CNT - 1) { - g_end = 1; - } + check_sched(g_thrd_info, THREAD_CNT); } - /* shutdown the systems */ #ifdef TEST_DEBUG printf("READ_M: finished testing, system shutting down...\n"); -#endif +#endif + for(int i = 0; i < PACKET_CNT; i++) { + socket_push(g_sockfd[1], 'e'); + } + for (int i = 0; i < THREAD_CNT; i++) { pthread_join(g_thrd_info[i].thrd, NULL); } - if (!error) - success(); - else - err(error, "kevent"); + EV_SET(&kev, g_sockfd[0], EVFILT_READ, EV_DELETE, 0, 0, &g_sockfd[0]); + + error = kevent(g_kqfd, &kev, 1, NULL, 0, NULL); + + if (error == -1) { +#ifdef TEST_DEBUG + printf("READ_M: kevent delete failed with %d\n", errno); +#endif + err(1, "kevent_delete"); + } + + close(g_sockfd[0]); + close(g_sockfd[1]); + + success(); } +/* + * Brutal test + */ + +#define THREAD_BRUTE_CNT (32) +#define SOCK_BRUTE_CNT (64) +#define PACKET_BRUTE_CNT (10000) +#define THREAD_EXIT_PROB (50) + +#define RAND_SLEEP (29) +#define RAND_SEND_SLEEP (13) + + +int brute_sockfd[SOCK_BRUTE_CNT][2]; +struct thread_info brute_threadinfo[THREAD_BRUTE_CNT]; + +static void* +test_socket_brutal_worker(void* args) +{ + struct thread_info *info = (struct thread_info *) args; + char dat; + struct kevent *ret; + + while (1) { +#ifdef TEST_DEBUG + printf("READ_M: thread %d waiting for events\n", info->tid); +#endif + ret = kevent_get(g_kqfd); +#ifdef TEST_DEBUG + printf("READ_M: thread %d woke up\n", info->tid); +#endif + + if ((rand() % 100) < THREAD_EXIT_PROB) { + pthread_mutex_lock(&info->lock); +#ifdef TEST_DEBUG + printf("READ_M: thread %d trying to fake crash. Can crash: %d\n", info->tid, info->can_crash); +#endif + if (info->can_crash) { + pthread_create(&info->thrd, NULL, test_socket_brutal_worker, info); + pthread_mutex_unlock(&info->lock); + free(ret); + pthread_exit(0); + } + pthread_mutex_unlock(&info->lock); + } + + dat = socket_pop(ret->ident); + free(ret); + + if (dat == 'e') + break; + + info->evcnt++; + + usleep(rand() % RAND_SLEEP); + } +#ifdef TEST_DEBUG + printf("READ_M: thread %d exiting...\n", info->tid); +#endif + pthread_exit(0); + + return NULL; +} + +static void +test_socket_brutal() +{ + struct kevent kev; + + const char *test_id = "[Multi]kevent(brutal)"; + + test_begin(test_id); + + for (int i = 0; i < SOCK_BRUTE_CNT; i++) { + + /* Create a connected pair of full-duplex sockets for testing socket events */ + if (socketpair(AF_UNIX, SOCK_STREAM, 0, &brute_sockfd[i][0]) < 0) { + err(1, "kevent_socket"); + } + + + EV_SET(&kev, brute_sockfd[i][0], EVFILT_READ, EV_ADD, 0, 0, &brute_sockfd[i][0]); + + if (kevent(g_kqfd, &kev, 1, NULL, 0, NULL) == -1) { + err(1, "kevent_brutal_add"); + } + } + + srand(time(NULL)); + +#ifdef TEST_DEBUG + printf("READ_M: creating %d threads...\n", THREAD_BRUTE_CNT); +#endif + for (int i = 0; i < THREAD_BRUTE_CNT; i++) { + brute_threadinfo[i].tid = i; + brute_threadinfo[i].evcnt = 0; + brute_threadinfo[i].can_crash = ((i % 10) != 0); + pthread_create(&brute_threadinfo[i].thrd, NULL, test_socket_brutal_worker, &brute_threadinfo[i]); + } + + sleep(3); + + for(int i = 0; i < PACKET_BRUTE_CNT; i++) { +#ifdef TEST_DEBUG + printf("READ_M: processing packet %d\n", i); +#endif + socket_push(brute_sockfd[rand() % SOCK_BRUTE_CNT][1], '.'); + usleep(rand() % RAND_SEND_SLEEP); + } + + while (1) { + int sum = 0; + for (int i = 0; i < THREAD_BRUTE_CNT; i++) { + sum += brute_threadinfo[i].evcnt; + } + + if (sum == PACKET_BRUTE_CNT) { + break; + } + +#ifdef TEST_DEBUG + printf("READ_M: waiting for all packets to finish processing. Cur: %d Tgt: %d\n", sum, PACKET_BRUTE_CNT); +#endif + sleep(1); + } + + /* shutdown the systems */ +#ifdef TEST_DEBUG + printf("READ_M: finished testing, system shutting down...\n"); +#endif + for (int i = 0; i < THREAD_BRUTE_CNT; i++) { + pthread_mutex_lock(&brute_threadinfo[i].lock); + brute_threadinfo[i].can_crash = 0; + pthread_mutex_unlock(&brute_threadinfo[i].lock); + } + + for(int i = 0; i < THREAD_BRUTE_CNT; i++) { + socket_push(brute_sockfd[rand() % SOCK_BRUTE_CNT][1], 'e'); + } + + for (int i = 0; i < THREAD_BRUTE_CNT; i++) { + pthread_join(brute_threadinfo[i].thrd, NULL); + } + + for (int i = 0; i < SOCK_BRUTE_CNT; i++) { + EV_SET(&kev, brute_sockfd[i][0], EVFILT_READ, EV_DELETE, 0, 0, &brute_sockfd[i][0]); + + if (kevent(g_kqfd, &kev, 1, NULL, 0, NULL) == -1) { + err(1, "kevent_brutal_delete"); + } + } + + success(); +} + void test_evfilt_read_m() { - /* Create a connected pair of full-duplex sockets for testing socket events */ - if (socketpair(AF_UNIX, SOCK_STREAM, 0, &g_sockfd[0]) < 0) - abort(); - g_kqfd = kqueue(); int error = ioctl(g_kqfd, FKQMULTI); if (error == -1) { -#ifdef TEST_DEBUG - printf("READ_M: ioctl failed with %d\n", errno); -#endif err(1, "ioctl"); } test_socket_read(); + test_socket_brutal(); close(g_kqfd); - close(g_sockfd[0]); - close(g_sockfd[1]); -} +} \ No newline at end of file