diff --git a/sys/kern/kern_event.c b/sys/kern/kern_event.c index 5a9d516df898..e44bc33eeb4d 100644 --- a/sys/kern/kern_event.c +++ b/sys/kern/kern_event.c @@ -99,18 +99,32 @@ MTX_SYSINIT(kq_global, &kq_global, "kqueue order", MTX_DEF); TASKQUEUE_DEFINE_THREAD(kqueue_ctx); +static struct kevq * kevqlist_find(struct kevqlist *kevq_list, struct kqueue *kq); +static void kevq_thred_init(struct kevq_thred *kevq_th); +static void kevq_thred_destroy(struct kevq_thred *kevq_th); +static void kevq_wakeup(struct kevq* kevq); +static void kevq_init(struct kevq *kevq); +static void kevq_release(struct kevq* kevq, int locked); +static int kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp); +static void kevq_destroy(struct kevq *kevq); +static int kevq_acquire(struct kevq *kevq); +void kevq_drain(struct kevq *kevq); + +static void knote_xinit(struct knote *kn); + static int kevent_copyout(void *arg, struct kevent *kevp, int count); static int kevent_copyin(void *arg, struct kevent *kevp, int count); -static int kqueue_register(struct kqueue *kq, struct kevent *kev, - struct thread *td, int mflag); +static int kqueue_register(struct kqueue *kq, struct kevq *kevq, + struct kevent *kev, struct thread *td, int mflag); static int kqueue_acquire(struct file *fp, struct kqueue **kqp); +static int kqueue_acquire_both(struct file *fp, struct thread *td, struct kqueue **kqp, struct kevq **kevqp); static void kqueue_release(struct kqueue *kq, int locked); static void kqueue_destroy(struct kqueue *kq); -static void kqueue_drain(struct kqueue *kq, struct thread *td); +static void kqueue_drain(struct kqueue *kq, struct kevq *kevq, struct thread *td); static int kqueue_expand(struct kqueue *kq, struct filterops *fops, uintptr_t ident, int mflag); static void kqueue_task(void *arg, int pending); -static int kqueue_scan(struct kqueue *kq, int maxevents, +static int kqueue_scan(struct kevq *kq, int maxevents, struct kevent_copyops *k_ops, const struct timespec *timeout, struct kevent *keva, struct thread *td); @@ -144,14 +158,23 @@ static struct fileops kqueueops = { .fo_fill_kinfo = kqueue_fill_kinfo, }; +static bool knote_leave_flux_ul(struct knote *kn); +static bool knote_leave_flux(struct knote *kn); +static void knote_enter_flux(struct knote *kn); +static void knote_enter_flux_ul(struct knote *kn); +static void knote_flux_wakeup_ul(struct knote *kn); +static void knote_flux_wakeup(struct knote *kn); +static void knote_activate_ul(struct knote *kn); +static void knote_activate(struct knote *kn); static int knote_attach(struct knote *kn, struct kqueue *kq); static void knote_drop(struct knote *kn, struct thread *td); static void knote_drop_detached(struct knote *kn, struct thread *td); -static void knote_enqueue(struct knote *kn); +static void knote_enqueue(struct knote *kn, struct kevq *kevq); static void knote_dequeue(struct knote *kn); static void knote_init(void); static struct knote *knote_alloc(int mflag); static void knote_free(struct knote *kn); +static void knote_sched(struct knote *kn); static void filt_kqdetach(struct knote *kn); static int filt_kqueue(struct knote *kn, long hint); @@ -209,41 +232,57 @@ static unsigned int kq_calloutmax = 4 * 1024; SYSCTL_UINT(_kern, OID_AUTO, kq_calloutmax, CTLFLAG_RW, &kq_calloutmax, 0, "Maximum number of callouts allocated for kqueue"); -/* XXX - ensure not influx ? 
*/ -#define KNOTE_ACTIVATE(kn, islock) do { \ - if ((islock)) \ - mtx_assert(&(kn)->kn_kq->kq_lock, MA_OWNED); \ - else \ - KQ_LOCK((kn)->kn_kq); \ - (kn)->kn_status |= KN_ACTIVE; \ - if (((kn)->kn_status & (KN_QUEUED | KN_DISABLED)) == 0) \ - knote_enqueue((kn)); \ - if (!(islock)) \ - KQ_UNLOCK((kn)->kn_kq); \ -} while(0) +#define KQ_DEBUG + #define KQ_LOCK(kq) do { \ mtx_lock(&(kq)->kq_lock); \ } while (0) -#define KQ_FLUX_WAKEUP(kq) do { \ - if (((kq)->kq_state & KQ_FLUXWAIT) == KQ_FLUXWAIT) { \ - (kq)->kq_state &= ~KQ_FLUXWAIT; \ - wakeup((kq)); \ - } \ +#define KN_FLUX_LOCK(kn) do { \ + mtx_lock(&(kn)->kn_fluxlock); \ } while (0) -#define KQ_UNLOCK_FLUX(kq) do { \ - KQ_FLUX_WAKEUP(kq); \ - mtx_unlock(&(kq)->kq_lock); \ +#define KEVQ_TH_LOCK(kevqth) do { \ + mtx_lock(&(kevqth)->lock); \ +} while (0) +#define KEVQ_LOCK(kevq) do { \ + mtx_lock(&(kevq)->lock); \ } while (0) #define KQ_UNLOCK(kq) do { \ mtx_unlock(&(kq)->kq_lock); \ } while (0) +#define KN_FLUX_UNLOCK(kn) do { \ + mtx_unlock(&(kn)->kn_fluxlock); \ +} while (0) +#define KN_LEAVE_FLUX_WAKEUP(kn) do { \ + KN_FLUX_NOTOWNED((kn)); \ + KN_FLUX_LOCK((kn)); \ + knote_leave_flux((kn)); \ + knote_flux_wakeup((kn)); \ + KN_FLUX_UNLOCK((kn)); \ +} while(0) +#define KEVQ_TH_UNLOCK(kevqth) do { \ + mtx_unlock(&(kevqth)->lock); \ +} while (0) +#define KEVQ_UNLOCK(kevq) do { \ + mtx_unlock(&(kevq)->lock); \ +} while (0) #define KQ_OWNED(kq) do { \ mtx_assert(&(kq)->kq_lock, MA_OWNED); \ } while (0) #define KQ_NOTOWNED(kq) do { \ mtx_assert(&(kq)->kq_lock, MA_NOTOWNED); \ } while (0) - +#define KN_FLUX_OWNED(kn) do { \ + mtx_assert(&(kn)->kn_fluxlock, MA_OWNED); \ +} while (0) +#define KN_FLUX_NOTOWNED(kn) do { \ + mtx_assert(&(kn)->kn_fluxlock, MA_NOTOWNED); \ +} while (0) +#define KEVQ_OWNED(kevq) do { \ + mtx_assert(&(kevq)->lock, MA_OWNED); \ +} while (0) +#define KEVQ_NOTOWNED(kevq) do { \ + mtx_assert(&(kevq)->lock, MA_NOTOWNED); \ +} while (0) static struct knlist * kn_list_lock(struct knote *kn) { @@ -278,23 +317,49 @@ kn_in_flux(struct knote *kn) } static void -kn_enter_flux(struct knote *kn) +knote_enter_flux_ul(struct knote *kn) { + KN_FLUX_NOTOWNED(kn); + KN_FLUX_LOCK(kn); + knote_enter_flux(kn); + KN_FLUX_UNLOCK(kn); +} - KQ_OWNED(kn->kn_kq); +static void +knote_enter_flux(struct knote *kn) +{ +#ifdef KQ_DEBUG + printf("KQUEUE: knote_enter_flux: %p\n", kn); +#endif + KN_FLUX_OWNED(kn); MPASS(kn->kn_influx < INT_MAX); kn->kn_influx++; } +/* TODO: change *_ul functions to macros? 
*/ static bool -kn_leave_flux(struct knote *kn) +knote_leave_flux_ul(struct knote *kn) { + bool ret; + KN_FLUX_NOTOWNED(kn); + KN_FLUX_LOCK(kn); + ret = knote_leave_flux(kn); + KN_FLUX_UNLOCK(kn); + return ret; +} - KQ_OWNED(kn->kn_kq); +static bool +knote_leave_flux(struct knote *kn) +{ +#ifdef KQ_DEBUG + printf("KQUEUE: knote_leave_flux: %p\n", kn); +#endif + KN_FLUX_OWNED(kn); MPASS(kn->kn_influx > 0); kn->kn_influx--; - return (kn->kn_influx == 0); -} + + return (kn->kn_influx == 0); + } #define KNL_ASSERT_LOCK(knl, islocked) do { \ if (islocked) \ @@ -316,14 +381,16 @@ kn_leave_flux(struct knote *kn) #ifndef KN_HASHSIZE #define KN_HASHSIZE 64 /* XXX should be tunable */ +#define KEVQ_HASHSIZE 64 #endif #define KN_HASH(val, mask) (((val) ^ (val >> 8)) & (mask)) +#define KEVQ_HASH(val, mask) KN_HASH((val), (mask)) static int filt_nullattach(struct knote *kn) { - + return (ENXIO); }; @@ -377,6 +444,10 @@ filt_fileattach(struct knote *kn) static int kqueue_kqfilter(struct file *fp, struct knote *kn) { +#ifdef KQ_DEBUG + printf("KQUEUE: kqueue_kqfilter called for kn %p\n", kn); +#endif + struct kqueue *kq = kn->kn_fp->f_data; if (kn->kn_filter != EVFILT_READ) @@ -403,7 +474,15 @@ filt_kqueue(struct knote *kn, long hint) { struct kqueue *kq = kn->kn_fp->f_data; - kn->kn_data = kq->kq_count; +#ifdef KQ_DEBUG + printf("KQUEUE: filt_kqueue called for kn %p\n", kn); +#endif + + if ( (kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) { + return 0; + } + + kn->kn_data = kq->kq_kevq->kn_count; return (kn->kn_data > 0); } @@ -461,7 +540,7 @@ filt_procattach(struct knote *kn) * is registered. */ if (immediate || (exiting && filt_proc(kn, NOTE_EXIT))) - KNOTE_ACTIVATE(kn, 0); + knote_activate_ul(kn); PROC_UNLOCK(p); @@ -492,6 +571,10 @@ filt_proc(struct knote *kn, long hint) struct proc *p; u_int event; +#ifdef KQ_DEBUG + printf("KQUEUE: filt_proc called for kn %p\n", kn); +#endif + p = kn->kn_ptr.p_proc; if (p == NULL) /* already activated, from attach filter */ return (0); @@ -526,10 +609,11 @@ filt_proc(struct knote *kn, long hint) * child's pid. */ void -knote_fork(struct knlist *list, int pid) +knote_fork(struct knlist *list, struct thread *td, int pid) { struct kqueue *kq; struct knote *kn; + struct kevq *kevq; struct kevent kev; int error; @@ -541,18 +625,12 @@ knote_fork(struct knlist *list, int pid) memset(&kev, 0, sizeof(kev)); SLIST_FOREACH(kn, &list->kl_list, kn_selnext) { kq = kn->kn_kq; - KQ_LOCK(kq); - if (kn_in_flux(kn) && (kn->kn_status & KN_SCAN) == 0) { - KQ_UNLOCK(kq); - continue; - } + kevq = kn->kn_org_kevq; - /* - * The same as knote(), activate the event. - */ - if ((kn->kn_sfflags & NOTE_TRACK) == 0) { - if (kn->kn_fop->f_event(kn, NOTE_FORK)) - KNOTE_ACTIVATE(kn, 1); + KQ_LOCK(kq); + KN_FLUX_LOCK(kn); + if (kn_in_flux(kn) && (kn->kn_status & KN_SCAN) == 0) { + KN_FLUX_UNLOCK(kn); KQ_UNLOCK(kq); continue; } @@ -563,8 +641,31 @@ knote_fork(struct knlist *list, int pid) * track the child. Drop the locks in preparation for * the call to kqueue_register(). */ - kn_enter_flux(kn); + knote_enter_flux(kn); + KN_FLUX_UNLOCK(kn); + + /* + * The same as knote(), activate the event. + */ + if ((kn->kn_sfflags & NOTE_TRACK) == 0) { + error = kn->kn_fop->f_event(kn, NOTE_FORK); + KQ_UNLOCK(kq); + + if(error) + knote_activate(kn); + + KN_LEAVE_FLUX_WAKEUP(kn); + continue; + } + KQ_UNLOCK(kq); + + /* + * The NOTE_TRACK case. In addition to the activation + * of the event, we need to register new events to + * track the child. 
Drop the locks in preparation for + * the call to kqueue_register(). + */ list->kl_unlock(list->kl_lockarg); /* @@ -583,7 +684,7 @@ knote_fork(struct knlist *list, int pid) kev.fflags = kn->kn_sfflags; kev.data = kn->kn_id; /* parent */ kev.udata = kn->kn_kevent.udata;/* preserve udata */ - error = kqueue_register(kq, &kev, NULL, M_NOWAIT); + error = kqueue_register(kq, kevq, &kev, NULL, M_NOWAIT); if (error) kn->kn_fflags |= NOTE_TRACKERR; @@ -597,15 +698,14 @@ knote_fork(struct knlist *list, int pid) kev.fflags = kn->kn_sfflags; kev.data = kn->kn_id; /* parent */ kev.udata = kn->kn_kevent.udata;/* preserve udata */ - error = kqueue_register(kq, &kev, NULL, M_NOWAIT); + error = kqueue_register(kq, kevq, &kev, NULL, M_NOWAIT); if (error) kn->kn_fflags |= NOTE_TRACKERR; if (kn->kn_fop->f_event(kn, NOTE_FORK)) - KNOTE_ACTIVATE(kn, 0); + knote_activate_ul(kn); list->kl_lock(list->kl_lockarg); - KQ_LOCK(kq); - kn_leave_flux(kn); - KQ_UNLOCK_FLUX(kq); + + KN_LEAVE_FLUX_WAKEUP(kn); } } @@ -687,7 +787,10 @@ filt_timerexpire(void *knx) kn = knx; kn->kn_data++; - KNOTE_ACTIVATE(kn, 0); /* XXX - handle locking */ + + knote_enter_flux_ul(kn); + knote_activate_ul(kn); + KN_LEAVE_FLUX_WAKEUP(kn); if ((kn->kn_flags & EV_ONESHOT) != 0) return; @@ -794,6 +897,7 @@ filt_timertouch(struct knote *kn, struct kevent *kev, u_long type) { struct kq_timer_cb_data *kc; struct kqueue *kq; + struct kevq *kevq; sbintime_t to; int error; @@ -822,9 +926,13 @@ filt_timertouch(struct knote *kn, struct kevent *kev, u_long type) * - clear the count of expiration events */ kq = kn->kn_kq; + kevq = kn->kn_kevq; KQ_LOCK(kq); - if (kn->kn_status & KN_QUEUED) + if (kn->kn_status & KN_QUEUED) { + KEVQ_LOCK(kevq); knote_dequeue(kn); + KEVQ_UNLOCK(kevq); + } kn->kn_status &= ~KN_ACTIVE; kn->kn_data = 0; @@ -859,14 +967,16 @@ filt_timertouch(struct knote *kn, struct kevent *kev, u_long type) static int filt_timer(struct knote *kn, long hint) { - +#ifdef KQ_DEBUG + printf("KQUEUE: filt_timer called for kn %p\n", kn); +#endif return (kn->kn_data != 0); } static int filt_userattach(struct knote *kn) { - + /* * EVFILT_USER knotes are not attached to anything in the kernel. 
*/ @@ -890,7 +1000,9 @@ filt_userdetach(__unused struct knote *kn) static int filt_user(struct knote *kn, __unused long hint) { - +#ifdef KQ_DEBUG + printf("KQUEUE: filt_user called for kn %p\n", kn); +#endif return (kn->kn_hookid); } @@ -961,9 +1073,7 @@ sys_kqueue(struct thread *td, struct kqueue_args *uap) static void kqueue_init(struct kqueue *kq) { - mtx_init(&kq->kq_lock, "kqueue", NULL, MTX_DEF | MTX_DUPOK); - TAILQ_INIT(&kq->kq_head); knlist_init_mtx(&kq->kq_sel.si_note, &kq->kq_lock); TASK_INIT(&kq->kq_task, 0, kqueue_task, kq); } @@ -1209,14 +1319,41 @@ kern_kevent(struct thread *td, int fd, int nchanges, int nevents, return (error); } +static struct kevq * +kevqlist_find(struct kevqlist *kevq_list, struct kqueue *kq) +{ + struct kevq *kevq_found, *kevq_each; + + kevq_found = NULL; + + if (!SLIST_EMPTY(kevq_list)) { + SLIST_FOREACH(kevq_each, kevq_list, kevq_th_e) { + if (kevq_each->kq == kq) { + kevq_found = kevq_each; + break; + } + } + } + + return kevq_found; +} + static int -kqueue_kevent(struct kqueue *kq, struct thread *td, int nchanges, int nevents, +kqueue_kevent(struct kqueue *kq, struct kevq *kevq, struct thread *td, int nchanges, int nevents, struct kevent_copyops *k_ops, const struct timespec *timeout) { struct kevent keva[KQ_NEVENTS]; struct kevent *kevp, *changes; int i, n, nerrors, error; + if ((kq->kq_state & KQ_FLAG_MULTI) == 0 && (kevq->kevq_state & KEVQ_RDY) == 0) { + /* Mark the global kevq as ready for single threaded mode to close the window between + kqueue_register and kqueue_scan.*/ + KEVQ_LOCK(kevq); + kevq->kevq_state |= KEVQ_RDY; + KEVQ_UNLOCK(kevq); + } + nerrors = 0; while (nchanges > 0) { n = nchanges > KQ_NEVENTS ? KQ_NEVENTS : nchanges; @@ -1229,7 +1366,7 @@ kqueue_kevent(struct kqueue *kq, struct thread *td, int nchanges, int nevents, if (!kevp->filter) continue; kevp->flags &= ~EV_SYSFLAGS; - error = kqueue_register(kq, kevp, td, M_WAITOK); + error = kqueue_register(kq, kevq, kevp, td, M_WAITOK); if (error || (kevp->flags & EV_RECEIPT)) { if (nevents == 0) return (error); @@ -1247,7 +1384,7 @@ kqueue_kevent(struct kqueue *kq, struct thread *td, int nchanges, int nevents, return (0); } - return (kqueue_scan(kq, nevents, k_ops, timeout, keva, td)); + return (kqueue_scan(kevq, nevents, k_ops, timeout, keva, td)); } int @@ -1255,13 +1392,17 @@ kern_kevent_fp(struct thread *td, struct file *fp, int nchanges, int nevents, struct kevent_copyops *k_ops, const struct timespec *timeout) { struct kqueue *kq; + struct kevq *kevq; int error; - error = kqueue_acquire(fp, &kq); + error = kqueue_acquire_both(fp, td, &kq, &kevq); + if (error != 0) return (error); - error = kqueue_kevent(kq, td, nchanges, nevents, k_ops, timeout); + + error = kqueue_kevent(kq, kevq, td, nchanges, nevents, k_ops, timeout); kqueue_release(kq, 0); + kevq_release(kevq, 0); return (error); } @@ -1274,12 +1415,18 @@ kern_kevent_anonymous(struct thread *td, int nevents, struct kevent_copyops *k_ops) { struct kqueue kq = {}; + struct kevq kevq = {}; int error; kqueue_init(&kq); + kevq_init(&kevq); + kq.kq_kevq = &kevq; + kevq.kq = &kq; kq.kq_refcnt = 1; - error = kqueue_kevent(&kq, td, nevents, nevents, k_ops, NULL); - kqueue_drain(&kq, td); + kevq.kevq_refcnt = 1; + error = kqueue_kevent(&kq, &kevq, td, nevents, nevents, k_ops, NULL); + // TODO: kevq destroy called here but memory not dynamically allocated + kqueue_drain(&kq, &kevq, td); kqueue_destroy(&kq); return (error); } @@ -1373,7 +1520,7 @@ kqueue_fo_release(int filt) * A ref to kq (obtained via kqueue_acquire) must be held. 
*/ static int -kqueue_register(struct kqueue *kq, struct kevent *kev, struct thread *td, +kqueue_register(struct kqueue *kq, struct kevq *kevq, struct kevent *kev, struct thread *td, int mflag) { struct filterops *fops; @@ -1382,6 +1529,9 @@ kqueue_register(struct kqueue *kq, struct kevent *kev, struct thread *td, struct knlist *knl; int error, filt, event; int haskqglobal, filedesc_unlock; +#ifdef KQ_DEBUG + printf("KQUEUE: kqueue_register: kq %p, kevq %p, ident: %d, filter: %d, flags: 0x%X\n", kq, kevq, (int)kev->ident, kev->filter, kev->flags); +#endif if ((kev->flags & (EV_ENABLE | EV_DISABLE)) == (EV_ENABLE | EV_DISABLE)) return (EINVAL); @@ -1453,6 +1603,7 @@ kqueue_register(struct kqueue *kq, struct kevent *kev, struct thread *td, KQ_GLOBAL_LOCK(&kq_global, haskqglobal); } + /* lock the kq lock for accessing kq_knhash table */ KQ_LOCK(kq); if (kev->ident < kq->kq_knlistsize) { SLIST_FOREACH(kn, &kq->kq_knlist[kev->ident], kn_link) @@ -1466,6 +1617,7 @@ kqueue_register(struct kqueue *kq, struct kevent *kev, struct thread *td, goto done; } + /* lock the kq lock for accessing kq_knhash table */ KQ_LOCK(kq); /* @@ -1490,21 +1642,30 @@ kqueue_register(struct kqueue *kq, struct kevent *kev, struct thread *td, } } + /* We need the kq lock because attaching to KQ requires KQ Lock */ + KQ_OWNED(kq); + /* knote is in the process of changing, wait for it to stabilize. */ - if (kn != NULL && kn_in_flux(kn)) { - KQ_GLOBAL_UNLOCK(&kq_global, haskqglobal); - if (filedesc_unlock) { - FILEDESC_XUNLOCK(td->td_proc->p_fd); - filedesc_unlock = 0; + if (kn != NULL) { + KN_FLUX_LOCK(kn); + if (kn_in_flux(kn)) { + KQ_GLOBAL_UNLOCK(&kq_global, haskqglobal); + if (filedesc_unlock) { + FILEDESC_XUNLOCK(td->td_proc->p_fd); + filedesc_unlock = 0; + } + kn->kn_fluxwait = 1; + KN_FLUX_UNLOCK(kn); + msleep(kn, &kq->kq_lock, PSOCK | PDROP, "kqflxwt", 0); + + if (fp != NULL) { + fdrop(fp, td); + fp = NULL; + } + goto findkn; } - kq->kq_state |= KQ_FLUXWAIT; - msleep(kq, &kq->kq_lock, PSOCK | PDROP, "kqflxwt", 0); - if (fp != NULL) { - fdrop(fp, td); - fp = NULL; - } - goto findkn; } + /* We now have exclusive access to the knote with flux lock and kq lock */ /* * kn now contains the matching knote, or NULL if no match @@ -1518,6 +1679,8 @@ kqueue_register(struct kqueue *kq, struct kevent *kev, struct thread *td, error = ENOMEM; goto done; } + knote_xinit(kn); + kn->kn_kevq = kevq; kn->kn_fp = fp; kn->kn_kq = kq; kn->kn_fop = fops; @@ -1538,7 +1701,7 @@ kqueue_register(struct kqueue *kq, struct kevent *kev, struct thread *td, kn->kn_status = KN_DETACHED; if ((kev->flags & EV_DISABLE) != 0) kn->kn_status |= KN_DISABLED; - kn_enter_flux(kn); + knote_enter_flux_ul(kn); error = knote_attach(kn, kq); KQ_UNLOCK(kq); @@ -1562,15 +1725,24 @@ kqueue_register(struct kqueue *kq, struct kevent *kev, struct thread *td, } if (kev->flags & EV_DELETE) { - kn_enter_flux(kn); + /* We have the exclusive flux lock here */ + knote_enter_flux(kn); + + KN_FLUX_UNLOCK(kn); KQ_UNLOCK(kq); + knote_drop(kn, td); goto done; } + /* We have the exclusive lock */ + knote_enter_flux(kn); + KN_FLUX_UNLOCK(kn); + + // we have kq lock and knote influx if (kev->flags & EV_FORCEONESHOT) { kn->kn_flags |= EV_ONESHOT; - KNOTE_ACTIVATE(kn, 1); + knote_activate(kn); } if ((kev->flags & EV_ENABLE) != 0) @@ -1579,13 +1751,14 @@ kqueue_register(struct kqueue *kq, struct kevent *kev, struct thread *td, kn->kn_status |= KN_DISABLED; /* - * The user may change some filter values after the initial EV_ADD, - * but doing so will not reset any filter which has 
already been + * The user may changkhe initial EV_ADD, + * but doing so will kas already been * triggered. */ kn->kn_status |= KN_SCAN; - kn_enter_flux(kn); + KQ_UNLOCK(kq); + knl = kn_list_lock(kn); kn->kn_kevent.udata = kev->udata; if (!fops->f_isfd && fops->f_touch != NULL) { @@ -1613,14 +1786,15 @@ kqueue_register(struct kqueue *kq, struct kevent *kev, struct thread *td, KQ_LOCK(kq); if (event) - kn->kn_status |= KN_ACTIVE; - if ((kn->kn_status & (KN_ACTIVE | KN_DISABLED | KN_QUEUED)) == - KN_ACTIVE) - knote_enqueue(kn); + knote_activate(kn); + kn->kn_status &= ~KN_SCAN; - kn_leave_flux(kn); + + KN_LEAVE_FLUX_WAKEUP(kn); + + KQ_UNLOCK(kq); + kn_list_unlock(knl); - KQ_UNLOCK_FLUX(kq); done: KQ_GLOBAL_UNLOCK(&kq_global, haskqglobal); @@ -1634,26 +1808,246 @@ kqueue_register(struct kqueue *kq, struct kevent *kev, struct thread *td, return (error); } +static void +kevq_thred_init(struct kevq_thred *kevq_th) { + mtx_init(&kevq_th->lock, "kevq_th", NULL, MTX_DEF | MTX_DUPOK); + TAILQ_INIT(&kevq_th->kevq_tq); +} + +static void +kevq_thred_destroy(struct kevq_thred *kevq_th) { + free(kevq_th->kevq_hash, M_KQUEUE); + free(kevq_th, M_KQUEUE); +} + +void +kevq_thred_drain(struct kevq_thred *kevq_th) { + struct kevq *kevq; + int error; + + error = 0; + while((kevq = TAILQ_FIRST(&kevq_th->kevq_tq)) != NULL) { + // multithreaded mode, acquire kevq ref cnt + error = kevq_acquire(kevq); + if (!error) { + kevq_drain(kevq); + } + } + + kevq_thred_destroy(kevq_th); +} + +static void +kevq_init(struct kevq *kevq) { + mtx_init(&kevq->lock, "kevq", NULL, MTX_DEF | MTX_DUPOK); + TAILQ_INIT(&kevq->kn_head); +} + +static void +kevq_release(struct kevq* kevq, int locked) +{ +#ifdef KQ_DEBUG + printf("KQUEUE: Releasing kevq %p (refcnt = %d)\n", kevq, kevq->kevq_refcnt); +#endif + if (locked) + KEVQ_OWNED(kevq); + else + KEVQ_LOCK(kevq); + kevq->kevq_refcnt--; + if (kevq->kevq_refcnt == 1) + wakeup(&kevq->kevq_refcnt); + if (!locked) + KEVQ_UNLOCK(kevq); +} + +static int +kevq_acquire(struct kevq *kevq) +{ +#ifdef KQ_DEBUG + printf("KQUEUE: Referencing kevq %p (refcnt = %d)\n", kevq, kevq->kevq_refcnt); +#endif + KEVQ_NOTOWNED(kevq); + int error; + error = 0; + KEVQ_LOCK(kevq); + if ((kevq->kevq_state & KEVQ_CLOSING) == KEVQ_CLOSING) { + error = EINVAL; + } else { + kevq->kevq_refcnt++; + } + KEVQ_UNLOCK(kevq); + return error; +} + +/* a reference to kq should be held */ +static int +kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp) +{ + int error; + void* to_free; + struct kevq_thred *kevq_th; + struct kevq *kevq, *alloc_kevq; + struct kevqlist *kevq_list; + + kevq = NULL; + error = 0; + to_free = NULL; + kevq_th = NULL; + + KQ_NOTOWNED(kq); + + if ((kq->kq_state & KQ_CLOSING) == KQ_CLOSING) { + return EINVAL; + } + + if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) { + if (td->td_kevq_thred == NULL) { + kevq_th = malloc(sizeof(struct kevq_thred), M_KQUEUE, M_WAITOK | M_ZERO); + kevq_thred_init(kevq_th); + kevq_th->kevq_hash = hashinit_flags(KEVQ_HASHSIZE, M_KQUEUE, &kevq_th->kevq_hashmask , HASH_WAITOK); + + thread_lock(td); + if (td->td_kevq_thred == NULL) { + td->td_kevq_thred = kevq_th; +#ifdef KQ_DEBUG + printf("KQUEUE: kevq_acquire_kq(M): allocated kevq_th %p for thread %d\n", kevq_th, td->td_tid); +#endif + } else { + to_free = kevq_th; + kevq_th = td->td_kevq_thred; + } + thread_unlock(td); + if (to_free != NULL) { + free(((struct kevq_thred*)to_free)->kevq_hash, M_KQUEUE); + free(to_free, M_KQUEUE); + } + } else { + kevq_th = td->td_kevq_thred; + } + + KASSERT(kevq_th != 
NULL && kevq_th->kevq_hashmask != 0, ("unallocated kevq")); + + // fast fail + KEVQ_TH_LOCK(kevq_th); + kevq_list = &kevq_th->kevq_hash[KEVQ_HASH((unsigned long long)kq, kevq_th->kevq_hashmask)]; + kevq = kevqlist_find(kevq_list, kq); + KEVQ_TH_UNLOCK(kevq_th); + + if (kevq == NULL) { + // allocate kevq + to_free = NULL; + alloc_kevq = malloc(sizeof(struct kevq), M_KQUEUE, M_WAITOK | M_ZERO); + kevq_init(alloc_kevq); + alloc_kevq->kq = kq; + alloc_kevq->kevq_th = kevq_th; + +#ifdef KQ_DEBUG + printf("KQUEUE: kevq_acquire_kq(M): allocated kevq %p for thread %d\n", alloc_kevq, td->td_tid); +#endif + + KEVQ_TH_LOCK(kevq_th); + kevq = kevqlist_find(kevq_list, kq); + + if (kevq == NULL) { + kevq = alloc_kevq; + // insert kevq to the kevq_th hash table + SLIST_INSERT_HEAD(kevq_list, kevq, kevq_th_e); + // insert kevq to the kevq_th list, the list is used to drain kevq + TAILQ_INSERT_HEAD(&kevq_th->kevq_tq, kevq, kevq_th_tqe); + + KQ_LOCK(kq); + // insert to kq's kevq list + TAILQ_INSERT_HEAD(&kq->kq_kevqlist, kevq, kq_e); + KQ_UNLOCK(kq); + } else { + to_free = alloc_kevq; + } + KEVQ_TH_UNLOCK(kevq_th); + + if (to_free != NULL) { + free(to_free, M_KQUEUE); + } + } + + KASSERT(kevq != NULL, ("kevq isn't allocated.")); + + } else { + if (kq->kq_kevq == NULL) { + kevq = malloc(sizeof(struct kevq), M_KQUEUE, M_WAITOK | M_ZERO); +#ifdef KQ_DEBUG + printf("KQUEUE: kevq_acquire_kq(S): allocated kevq %p for kq %p\n", kevq, kq); +#endif + kevq_init(kevq); + kevq->kq = kq; + + KQ_LOCK(kq); + if (kq->kq_kevq == NULL) { + kq->kq_kevq = kevq; + } else { + to_free = kevq; + } + KQ_UNLOCK(kq); + + if (to_free != NULL) { + free(to_free, M_KQUEUE); + } + } else { + kevq = kq->kq_kevq; + } + } + + error = kevq_acquire(kevq); + + if (!error) { + *kevqp = kevq; + } + + return error; +} + static int kqueue_acquire(struct file *fp, struct kqueue **kqp) { - int error; struct kqueue *kq; - error = 0; - kq = fp->f_data; if (fp->f_type != DTYPE_KQUEUE || kq == NULL) return (EBADF); *kqp = kq; + KQ_LOCK(kq); - if ((kq->kq_state & KQ_CLOSING) == KQ_CLOSING) { - KQ_UNLOCK(kq); - return (EBADF); + if ((kq->kq_state & KQ_FLAG_INIT) == 0) { + /* Mark the kqueue as initialized + TODO: Why not make some locks separate field so we + don't have short critical sections like this */ + kq->kq_state |= KQ_FLAG_INIT; } kq->kq_refcnt++; KQ_UNLOCK(kq); + return 0; +} + +static int +kqueue_acquire_both(struct file *fp, struct thread *td, struct kqueue **kqp, struct kevq **kevqp) +{ + int error; + struct kqueue *tmp_kq; + struct kevq *tmp_kevq; + + error = kqueue_acquire(fp, &tmp_kq); + + if (!error) { + error = kevq_acquire_kq(tmp_kq, td, &tmp_kevq); + } + + if (error) { + kqueue_release(tmp_kq, 0); + } else { + *kqp = tmp_kq; + *kevqp = tmp_kevq; + } + return error; } @@ -1785,7 +2179,7 @@ kqueue_task(void *arg, int pending) * We treat KN_MARKER knotes as if they are in flux. 
*/ static int -kqueue_scan(struct kqueue *kq, int maxevents, struct kevent_copyops *k_ops, +kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops, const struct timespec *tsp, struct kevent *keva, struct thread *td) { struct kevent *kevp; @@ -1826,18 +2220,33 @@ kqueue_scan(struct kqueue *kq, int maxevents, struct kevent_copyops *k_ops, } else asbt = 0; marker = knote_alloc(M_WAITOK); + knote_xinit(marker); marker->kn_status = KN_MARKER; - KQ_LOCK(kq); + KEVQ_LOCK(kevq); + if ((kevq->kevq_state & KEVQ_RDY) == 0) { + /* Mark the kevq as ready to receive events */ + kevq->kevq_state |= KEVQ_RDY; + } + retry: kevp = keva; - if (kq->kq_count == 0) { +#ifdef KQ_DEBUG + printf("KQUEUE: kqueue_scan: kevq %p has %d events\n", kevq, kevq->kn_count); +#endif + if (kevq->kn_count == 0) { if (asbt == -1) { error = EWOULDBLOCK; } else { - kq->kq_state |= KQ_SLEEP; - error = msleep_sbt(kq, &kq->kq_lock, PSOCK | PCATCH, + kevq->kevq_state |= KEVQ_SLEEP; +#ifdef KQ_DEBUG + printf("KQUEUE: kqueue_scan: thread %d waiting on kevq %p for events\n", td->td_tid, kevq); +#endif + error = msleep_sbt(kevq, &kevq->lock, PSOCK | PCATCH, "kqread", asbt, rsbt, C_ABSOLUTE); +#ifdef KQ_DEBUG + printf("KQUEUE: kqueue_scan: thread %d wokeup on kevq %p for events\n", td->td_tid, kevq); +#endif } if (error == 0) goto retry; @@ -1849,34 +2258,46 @@ kqueue_scan(struct kqueue *kq, int maxevents, struct kevent_copyops *k_ops, goto done; } - TAILQ_INSERT_TAIL(&kq->kq_head, marker, kn_tqe); + KEVQ_OWNED(kevq); + TAILQ_INSERT_TAIL(&kevq->kn_head, marker, kn_tqe); influx = 0; - while (count) { - KQ_OWNED(kq); - kn = TAILQ_FIRST(&kq->kq_head); + while (count) { + KEVQ_OWNED(kevq); + kn = TAILQ_FIRST(&kevq->kn_head); + + KN_FLUX_LOCK(kn); if ((kn->kn_status == KN_MARKER && kn != marker) || kn_in_flux(kn)) { if (influx) { influx = 0; - KQ_FLUX_WAKEUP(kq); + knote_flux_wakeup(kn); } - kq->kq_state |= KQ_FLUXWAIT; - error = msleep(kq, &kq->kq_lock, PSOCK, + kn->kn_fluxwait = 1; + KN_FLUX_UNLOCK(kn); + error = msleep(kn, &kevq->lock, PSOCK, "kqflxwt", 0); continue; } - TAILQ_REMOVE(&kq->kq_head, kn, kn_tqe); + /* Now we have exclusive access to kn */ + TAILQ_REMOVE(&kevq->kn_head, kn, kn_tqe); +#ifdef KQ_DEBUG + printf("KQUEUE: kqueue_scan: thread %d on kevq %p dequeued knote %p\n", td->td_tid, kevq, kn); +#endif if ((kn->kn_status & KN_DISABLED) == KN_DISABLED) { kn->kn_status &= ~KN_QUEUED; - kq->kq_count--; + kevq->kn_count--; + KN_FLUX_UNLOCK(kn); continue; } if (kn == marker) { - KQ_FLUX_WAKEUP(kq); - if (count == maxevents) + /* We are dequeuing our marker, wakeup threads waiting on it */ + knote_flux_wakeup(kn); + KN_FLUX_UNLOCK(kn); + if (count == maxevents) { goto retry; + } goto done; } KASSERT(!kn_in_flux(kn), @@ -1884,54 +2305,62 @@ kqueue_scan(struct kqueue *kq, int maxevents, struct kevent_copyops *k_ops, if ((kn->kn_flags & EV_DROP) == EV_DROP) { kn->kn_status &= ~KN_QUEUED; - kn_enter_flux(kn); - kq->kq_count--; - KQ_UNLOCK(kq); + knote_enter_flux(kn); + kevq->kn_count--; + KN_FLUX_UNLOCK(kn); + KEVQ_UNLOCK(kevq); /* * We don't need to lock the list since we've * marked it as in flux. */ knote_drop(kn, td); - KQ_LOCK(kq); + KEVQ_LOCK(kevq); continue; } else if ((kn->kn_flags & EV_ONESHOT) == EV_ONESHOT) { kn->kn_status &= ~KN_QUEUED; - kn_enter_flux(kn); - kq->kq_count--; - KQ_UNLOCK(kq); + knote_enter_flux(kn); + kevq->kn_count--; + KN_FLUX_UNLOCK(kn); + KEVQ_UNLOCK(kevq); /* * We don't need to lock the list since we've * marked the knote as being in flux. 
*/ *kevp = kn->kn_kevent; knote_drop(kn, td); - KQ_LOCK(kq); + KEVQ_LOCK(kevq); kn = NULL; } else { kn->kn_status |= KN_SCAN; - kn_enter_flux(kn); - KQ_UNLOCK(kq); - if ((kn->kn_status & KN_KQUEUE) == KN_KQUEUE) + knote_enter_flux(kn); + KN_FLUX_UNLOCK(kn); + KEVQ_UNLOCK(kevq); + if ((kn->kn_status & KN_KQUEUE) == KN_KQUEUE) { + /* TODO: we are waiting for another kqueue + */ KQ_GLOBAL_LOCK(&kq_global, haskqglobal); + } knl = kn_list_lock(kn); if (kn->kn_fop->f_event(kn, 0) == 0) { - KQ_LOCK(kq); + KEVQ_LOCK(kevq); KQ_GLOBAL_UNLOCK(&kq_global, haskqglobal); kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE | KN_SCAN); - kn_leave_flux(kn); - kq->kq_count--; + knote_leave_flux_ul(kn); + kevq->kn_count--; kn_list_unlock(knl); influx = 1; +#ifdef KQ_DEBUG + printf("KQUEUE: kqueue_scan: kn %p not valid anymore for kevq %p\n", kn, kevq); +#endif continue; } - touch = (!kn->kn_fop->f_isfd && - kn->kn_fop->f_touch != NULL); + touch = (!kn->kn_fop->f_isfd && kn->kn_fop->f_touch != NULL); if (touch) kn->kn_fop->f_touch(kn, kevp, EVENT_PROCESS); else *kevp = kn->kn_kevent; - KQ_LOCK(kq); + KEVQ_LOCK(kevq); KQ_GLOBAL_UNLOCK(&kq_global, haskqglobal); if (kn->kn_flags & (EV_CLEAR | EV_DISPATCH)) { /* @@ -1945,12 +2374,16 @@ kqueue_scan(struct kqueue *kq, int maxevents, struct kevent_copyops *k_ops, if (kn->kn_flags & EV_DISPATCH) kn->kn_status |= KN_DISABLED; kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE); - kq->kq_count--; - } else - TAILQ_INSERT_TAIL(&kq->kq_head, kn, kn_tqe); + kevq->kn_count--; + } else { +#ifdef KQ_DEBUG + printf("KQUEUE: kqueue_scan: requeued kn %p to kevq %p\n", kn, kevq); +#endif + TAILQ_INSERT_TAIL(&kevq->kn_head, kn, kn_tqe); + } kn->kn_status &= ~KN_SCAN; - kn_leave_flux(kn); + knote_leave_flux_ul(kn); kn_list_unlock(knl); influx = 1; } @@ -1961,23 +2394,31 @@ kqueue_scan(struct kqueue *kq, int maxevents, struct kevent_copyops *k_ops, count--; if (nkev == KQ_NEVENTS) { + KEVQ_UNLOCK(kevq); + influx = 0; - KQ_UNLOCK_FLUX(kq); + knote_flux_wakeup_ul(kn); + error = k_ops->k_copyout(k_ops->arg, keva, nkev); nkev = 0; kevp = keva; - KQ_LOCK(kq); + KEVQ_LOCK(kevq); if (error) break; } } - TAILQ_REMOVE(&kq->kq_head, marker, kn_tqe); + TAILQ_REMOVE(&kevq->kn_head, marker, kn_tqe); done: - KQ_OWNED(kq); - KQ_UNLOCK_FLUX(kq); + KEVQ_OWNED(kevq); + KEVQ_UNLOCK(kevq); + + if (kn != NULL) { + knote_flux_wakeup_ul(kn); + } + knote_free(marker); done_nl: - KQ_NOTOWNED(kq); + KEVQ_NOTOWNED(kevq); if (nkev != 0) error = k_ops->k_copyout(k_ops->arg, keva, nkev); td->td_retval[0] = maxevents - count; @@ -2022,14 +2463,37 @@ kqueue_ioctl(struct file *fp, u_long cmd, void *data, case FIOSETOWN: return (fsetown(*(int *)data, &kq->kq_sigio)); - + case FIOGETOWN: *(int *)data = fgetown(&kq->kq_sigio); return (0); } #endif + struct kqueue *kq; + int error = 0; - return (ENOTTY); + kq = fp->f_data; +#ifdef KQ_DEBUG + printf("KQUEUE: ioctl: received: kq %p cmd: 0x%lx", kq, cmd); +#endif + switch (cmd) { + case FKQMULTI: + if ((kq->kq_state & KQ_FLAG_INIT) == KQ_FLAG_INIT) { + error = (EINVAL); + } else { +#ifdef KQ_DEBUG + printf("KQUEUE: ioctl: multi flag set for kq %p", kq); +#endif + KQ_LOCK(kq); + kq->kq_state |= (KQ_FLAG_INIT | KQ_FLAG_MULTI); + KQ_UNLOCK(kq); + } + break; + default: + error = (ENOTTY); + } + + return error; } /*ARGSUSED*/ @@ -2038,24 +2502,34 @@ kqueue_poll(struct file *fp, int events, struct ucred *active_cred, struct thread *td) { struct kqueue *kq; + struct kevq *kevq; int revents = 0; int error; - if ((error = kqueue_acquire(fp, &kq))) + if ((error = kqueue_acquire_both(fp, td, &kq, 
&kevq))) return POLLERR; - KQ_LOCK(kq); - if (events & (POLLIN | POLLRDNORM)) { - if (kq->kq_count) { - revents |= events & (POLLIN | POLLRDNORM); + if ((kq->kq_state & KQ_FLAG_MULTI) != KQ_FLAG_MULTI ) { + // TODO: support select? really need it in multithreaded mode? + KQ_LOCK(kq); + if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) { + revents = 0; } else { - selrecord(td, &kq->kq_sel); - if (SEL_WAITING(&kq->kq_sel)) - kq->kq_state |= KQ_SEL; + if (events & (POLLIN | POLLRDNORM)) { + if (kq->kq_kevq->kn_count) { + revents |= events & (POLLIN | POLLRDNORM); + } else { + selrecord(td, &kq->kq_sel); + if (SEL_WAITING(&kq->kq_sel)) + kq->kq_state |= KQ_SEL; + } + } } + + kqueue_release(kq, 1); + KQ_UNLOCK(kq); + kevq_release(kevq, 0); } - kqueue_release(kq, 1); - KQ_UNLOCK(kq); return (revents); } @@ -2078,10 +2552,110 @@ kqueue_stat(struct file *fp, struct stat *st, struct ucred *active_cred, } static void -kqueue_drain(struct kqueue *kq, struct thread *td) +kevq_destroy(struct kevq *kevq) +{ +#ifdef KQ_DEBUG + printf("KQUEUE: kevq_destroy for %p \n", kevq); +#endif + free(kevq, M_KQUEUE); +} + +/* This is called on every kevq when kqueue exits + This is also called when a thread exits/crashes (currently racing, also to make it work need to reconfigure kq->ck_evq) + * a ref cnt must be held */ +void +kevq_drain(struct kevq *kevq) +{ + struct kqueue *kq; + struct knote *kn; + struct kevqlist *kevq_list; +#ifdef KQ_DEBUG + printf("KQUEUE: kevq_drain for %p (refcnt = %d) with %d knotes\n", kevq, kevq->kevq_refcnt, kevq->kn_count); +#endif + kq = kevq->kq; + + KQ_NOTOWNED(kq); + KEVQ_NOTOWNED(kevq); + + KEVQ_LOCK(kevq); + if(kevq->kevq_state == KEVQ_CLOSING) { + // already closing, dereference + kevq_release(kevq, 1); + KEVQ_UNLOCK(kevq); + return; + } else { + kevq->kevq_state |= KEVQ_CLOSING; + } + + if (kevq->kevq_refcnt > 1) + msleep(&kevq->kevq_refcnt, &kevq->lock, PSOCK, "kevqclose1", 0); + + KEVQ_OWNED(kevq); + KASSERT(kevq->kevq_refcnt == 1, ("other refs of kevq are out there!")); + + /* drain all knotes on the kevq */ + TAILQ_FOREACH(kn, &kevq->kn_head, kn_tqe) { + +retry: + KEVQ_OWNED(kevq); + KN_FLUX_LOCK(kn); + /* Wait for kn to stablize */ + if (kn_in_flux(kn)) { + kn->kn_fluxwait = 1; + KN_FLUX_UNLOCK(kn); + msleep(kn, &kevq->lock, PSOCK, "kevqclose2", 0); + goto retry; + } + + KN_FLUX_OWNED(kn); + KASSERT(!kn_in_flux(kn), ("knote is still influx")); + + knote_enter_flux(kn); + KN_FLUX_UNLOCK(kn); + knote_dequeue(kn); + + if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI && (kq->kq_state & KQ_CLOSING) != KQ_CLOSING) { + KEVQ_UNLOCK(kevq); + knote_activate_ul(kn); + KEVQ_LOCK(kevq); + } + + KN_LEAVE_FLUX_WAKEUP(kn); + } + + KASSERT(kevq->kn_count == 0, ("some knotes are left")); + KEVQ_UNLOCK(kevq); + + KQ_LOCK(kq); + TAILQ_REMOVE(&kq->kq_kevqlist, kevq, kq_e); + KQ_UNLOCK(kq); + + if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) { + // drop from kq if in multithreaded mode + KEVQ_TH_LOCK(kevq->kevq_th); + TAILQ_REMOVE(&kevq->kevq_th->kevq_tq, kevq, kevq_th_tqe); + kevq_list = &kevq->kevq_th->kevq_hash[KEVQ_HASH((unsigned long long)kq, kevq->kevq_th->kevq_hashmask)]; + SLIST_REMOVE(kevq_list, kevq, kevq, kevq_th_e); + KEVQ_TH_UNLOCK(kevq->kevq_th); + } + + + /* delete the kevq */ + kevq_destroy(kevq); +} + +/* kevq is only used when kq is in single mode + in this case kevq has been referenced by the caller */ +static void +kqueue_drain(struct kqueue *kq, struct kevq *kevq, struct thread *td) { struct knote *kn; int i; + int error; + +#ifdef KQ_DEBUG + printf("KQUEUE: 
kqueue_drain on %p. args kevq %p\n", kq, kevq); +#endif KQ_LOCK(kq); @@ -2096,14 +2670,36 @@ kqueue_drain(struct kqueue *kq, struct thread *td) KASSERT(knlist_empty(&kq->kq_sel.si_note), ("kqueue's knlist not empty")); + + KQ_UNLOCK(kq); + + error = 0; + if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) { + /* drain all kevqs belonging to the kq */ + while ((kevq = TAILQ_FIRST(&kq->kq_kevqlist)) != NULL) { + error = kevq_acquire(kevq); + if (!error) { + kevq_drain(kevq); + } + } + } else { + if (kevq != NULL) { + kevq_drain(kevq); + } + } + KQ_LOCK(kq); + for (i = 0; i < kq->kq_knlistsize; i++) { while ((kn = SLIST_FIRST(&kq->kq_knlist[i])) != NULL) { + KN_FLUX_LOCK(kn); if (kn_in_flux(kn)) { - kq->kq_state |= KQ_FLUXWAIT; - msleep(kq, &kq->kq_lock, PSOCK, "kqclo1", 0); + kn->kn_fluxwait = 1; + KN_FLUX_UNLOCK(kn); + msleep(kn, &kq->kq_lock, PSOCK, "kqclo1", 0); continue; } - kn_enter_flux(kn); + knote_enter_flux(kn); + KN_FLUX_UNLOCK(kn); KQ_UNLOCK(kq); knote_drop(kn, td); KQ_LOCK(kq); @@ -2112,13 +2708,15 @@ kqueue_drain(struct kqueue *kq, struct thread *td) if (kq->kq_knhashmask != 0) { for (i = 0; i <= kq->kq_knhashmask; i++) { while ((kn = SLIST_FIRST(&kq->kq_knhash[i])) != NULL) { + KN_FLUX_LOCK(kn); if (kn_in_flux(kn)) { - kq->kq_state |= KQ_FLUXWAIT; - msleep(kq, &kq->kq_lock, PSOCK, - "kqclo2", 0); + kn->kn_fluxwait = 1; + KN_FLUX_UNLOCK(kn); + msleep(kn, &kq->kq_lock, PSOCK, "kqclo2", 0); continue; } - kn_enter_flux(kn); + knote_enter_flux(kn); + KN_FLUX_UNLOCK(kn); KQ_UNLOCK(kq); knote_drop(kn, td); KQ_LOCK(kq); @@ -2163,13 +2761,22 @@ static int kqueue_close(struct file *fp, struct thread *td) { struct kqueue *kq = fp->f_data; + struct kevq *kevq = NULL; struct filedesc *fdp; int error; int filedesc_unlock; - if ((error = kqueue_acquire(fp, &kq))) - return error; - kqueue_drain(kq, td); + if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) { + // only acquire the kqueue lock here + if ((error = kqueue_acquire(fp, &kq))) + return error; + } else { + // acquire both + if ((error = kqueue_acquire_both(fp, td, &kq, &kevq))) + return error; + } + + kqueue_drain(kq, kevq, td); /* * We could be called due to the knote_drop() doing fdrop(), @@ -2193,7 +2800,9 @@ kqueue_close(struct file *fp, struct thread *td) crfree(kq->kq_cred); free(kq, M_KQUEUE); fp->f_data = NULL; - +#ifdef KQ_DEBUG + printf("KQUEUE: kqueue_closed for %p.\n", kq); +#endif return (0); } @@ -2205,15 +2814,20 @@ kqueue_fill_kinfo(struct file *fp, struct kinfo_file *kif, struct filedesc *fdp) return (0); } +static void +kevq_wakeup(struct kevq* kevq) +{ + KEVQ_OWNED(kevq); + if ((kevq->kevq_state & KEVQ_SLEEP) == KEVQ_SLEEP) { + kevq->kevq_state &= ~KEVQ_SLEEP; + wakeup(kevq); + } +} + static void kqueue_wakeup(struct kqueue *kq) { KQ_OWNED(kq); - - if ((kq->kq_state & KQ_SLEEP) == KQ_SLEEP) { - kq->kq_state &= ~KQ_SLEEP; - wakeup(kq); - } if ((kq->kq_state & KQ_SEL) == KQ_SEL) { selwakeuppri(&kq->kq_sel, PSOCK); if (!SEL_WAITING(&kq->kq_sel)) @@ -2239,7 +2853,7 @@ knote(struct knlist *list, long hint, int lockflags) { struct kqueue *kq; struct knote *kn, *tkn; - int error; + int require_kqlock; if (list == NULL) return; @@ -2257,8 +2871,10 @@ knote(struct knlist *list, long hint, int lockflags) * or other threads could remove events. 
*/ SLIST_FOREACH_SAFE(kn, &list->kl_list, kn_selnext, tkn) { - kq = kn->kn_kq; - KQ_LOCK(kq); +#ifdef KQ_DEBUG + printf("KNOTE: knote() scanning kn %p\n", kn); +#endif + KN_FLUX_LOCK(kn); if (kn_in_flux(kn) && (kn->kn_status & KN_SCAN) == 0) { /* * Do not process the influx notes, except for @@ -2268,33 +2884,95 @@ knote(struct knlist *list, long hint, int lockflags) * fragment in kqueue_scan() locks the knlist, * and cannot proceed until we finished. */ - KQ_UNLOCK(kq); - } else if ((lockflags & KNF_NOKQLOCK) != 0) { - kn_enter_flux(kn); - KQ_UNLOCK(kq); - error = kn->kn_fop->f_event(kn, hint); - KQ_LOCK(kq); - kn_leave_flux(kn); - if (error) - KNOTE_ACTIVATE(kn, 1); - KQ_UNLOCK_FLUX(kq); + KN_FLUX_UNLOCK(kn); } else { - if (kn->kn_fop->f_event(kn, hint)) - KNOTE_ACTIVATE(kn, 1); - KQ_UNLOCK(kq); + kq = kn->kn_kq; + knote_enter_flux(kn); + KN_FLUX_UNLOCK(kn); + + require_kqlock = ((lockflags & KNF_NOKQLOCK) == 0); + + if (require_kqlock) + KQ_LOCK(kq); + + if (kn->kn_fop->f_event(kn, hint)){ + // TODO: WTH is this? + require_kqlock ? knote_activate(kn) : knote_activate_ul(kn); + } + + if (require_kqlock) + KQ_UNLOCK(kq); + + KN_LEAVE_FLUX_WAKEUP(kn); } } if ((lockflags & KNF_LISTLOCKED) == 0) list->kl_unlock(list->kl_lockarg); } +static void +knote_flux_wakeup_ul(struct knote *kn) +{ + KN_FLUX_NOTOWNED(kn); + KN_FLUX_LOCK(kn); + knote_flux_wakeup(kn); + KN_FLUX_UNLOCK(kn); +} + +static void +knote_flux_wakeup(struct knote *kn) +{ + KN_FLUX_OWNED(kn); + if ((kn)->kn_fluxwait) { + (kn)->kn_fluxwait = 0; + wakeup((kn)); + } +} + +static void +knote_activate_ul(struct knote *kn) +{ + KQ_LOCK(kn->kn_kq); + knote_activate(kn); + KQ_UNLOCK(kn->kn_kq); +} + +/* + * activate a knote + * the knote should be marked in flux and the knote flux lock should not be owned + * none of the other locks should be held + */ +static void +knote_activate(struct knote *kn) +{ + struct kqueue *kq; + kq = kn->kn_kq; + +#ifdef KQ_DEBUG + printf("KQUEUE: knote_activate: kn %p\n", kn); +#endif + KQ_OWNED(kq); + KN_FLUX_NOTOWNED(kn); + KASSERT(kn_in_flux(kn), ("knote %p not in flux", kn)); + + kn->kn_status |= KN_ACTIVE; + + if (((kn)->kn_status & (KN_QUEUED | KN_DISABLED)) == 0) { + knote_sched(kn); + } + + kqueue_wakeup(kq); +} + /* * add a knote to a knlist */ void knlist_add(struct knlist *knl, struct knote *kn, int islocked) { - +#ifdef KQ_DEBUG + printf("KNLIST: knlist_add kn %p\n", kn); +#endif KNL_ASSERT_LOCK(knl, islocked); KQ_NOTOWNED(kn->kn_kq); KASSERT(kn_in_flux(kn), ("knote %p not in flux", kn)); @@ -2514,18 +3192,22 @@ knlist_cleardel(struct knlist *knl, struct thread *td, int islocked, int killkn) SLIST_FOREACH_SAFE(kn, &knl->kl_list, kn_selnext, kn2) { kq = kn->kn_kq; KQ_LOCK(kq); + KN_FLUX_LOCK(kn); if (kn_in_flux(kn)) { + KN_FLUX_UNLOCK(kn); KQ_UNLOCK(kq); continue; } knlist_remove_kq(knl, kn, 1, 1); if (killkn) { - kn_enter_flux(kn); + knote_enter_flux(kn); + KN_FLUX_UNLOCK(kn); KQ_UNLOCK(kq); knote_drop_detached(kn, td); } else { /* Make sure cleared knotes disappear soon */ kn->kn_flags |= EV_EOF | EV_ONESHOT; + KN_FLUX_UNLOCK(kn); KQ_UNLOCK(kq); } kq = NULL; @@ -2536,10 +3218,12 @@ knlist_cleardel(struct knlist *knl, struct thread *td, int islocked, int killkn) kn = SLIST_FIRST(&knl->kl_list); kq = kn->kn_kq; KQ_LOCK(kq); + KN_FLUX_LOCK(kn); KASSERT(kn_in_flux(kn), ("knote removed w/o list lock")); knl->kl_unlock(knl->kl_lockarg); - kq->kq_state |= KQ_FLUXWAIT; - msleep(kq, &kq->kq_lock, PSOCK | PDROP, "kqkclr", 0); + kn->kn_fluxwait = 1; + KN_FLUX_UNLOCK(kn); + msleep(kn, &kq->kq_lock, PSOCK | 
PDROP, "kqkclr", 0); kq = NULL; goto again; } @@ -2578,21 +3262,26 @@ knote_fdclose(struct thread *td, int fd) influx = 0; while (kq->kq_knlistsize > fd && (kn = SLIST_FIRST(&kq->kq_knlist[fd])) != NULL) { + KQ_OWNED(kq); + KN_FLUX_LOCK(kn); if (kn_in_flux(kn)) { /* someone else might be waiting on our knote */ if (influx) - wakeup(kq); - kq->kq_state |= KQ_FLUXWAIT; - msleep(kq, &kq->kq_lock, PSOCK, "kqflxwt", 0); + knote_flux_wakeup(kn); + kn->kn_fluxwait = 1; + KN_FLUX_UNLOCK(kn); + msleep(kn, &kq->kq_lock, PSOCK, "kqflxwt", 0); goto again; } - kn_enter_flux(kn); + knote_enter_flux(kn); + KN_FLUX_UNLOCK(kn); KQ_UNLOCK(kq); + /* WTF ? influx should be 0? */ influx = 1; knote_drop(kn, td); KQ_LOCK(kq); } - KQ_UNLOCK_FLUX(kq); + KQ_UNLOCK(kq); } } @@ -2632,15 +3321,18 @@ static void knote_drop_detached(struct knote *kn, struct thread *td) { struct kqueue *kq; + struct kevq *kevq; struct klist *list; kq = kn->kn_kq; + kevq = kn->kn_kevq; KASSERT((kn->kn_status & KN_DETACHED) != 0, ("knote %p still attached", kn)); KQ_NOTOWNED(kq); KQ_LOCK(kq); + KASSERT(kn->kn_influx == 1, ("knote_drop called on %p with influx %d", kn, kn->kn_influx)); @@ -2651,9 +3343,14 @@ knote_drop_detached(struct knote *kn, struct thread *td) if (!SLIST_EMPTY(list)) SLIST_REMOVE(list, kn, knote, kn_link); - if (kn->kn_status & KN_QUEUED) + + KQ_UNLOCK(kq); + + if (kn->kn_status & KN_QUEUED) { + KEVQ_LOCK(kevq); knote_dequeue(kn); - KQ_UNLOCK_FLUX(kq); + KEVQ_UNLOCK(kevq); + } if (kn->kn_fop->f_isfd) { fdrop(kn->kn_fp, td); @@ -2664,31 +3361,130 @@ knote_drop_detached(struct knote *kn, struct thread *td) knote_free(kn); } + +// if no kevqs are available for queueing, returns NULL static void -knote_enqueue(struct knote *kn) +knote_sched(struct knote *kn) { struct kqueue *kq = kn->kn_kq; + struct kevq *next_kevq; + + KQ_OWNED(kq); + KASSERT(kn_in_flux(kn), ("kn not in flux")); + + // reschedule knotes to available threads + if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) { + if ((kn->kn_flags & EV_AFFINITY) == EV_AFFINITY) { +#ifdef KQ_DEBUG + printf("KQUEUE: knote_sched(M) affinity set: kn %p \n", kn); +#endif + if ((kn->kn_org_kevq->kevq_state & KEVQ_RDY) != 0) { + next_kevq = kn->kn_org_kevq; + } else { + next_kevq = NULL; + } + } else { + next_kevq = kq->kq_ckevq; + while(1) { + if (next_kevq == NULL) { + next_kevq = TAILQ_FIRST(&kq->kq_kevqlist); + if (next_kevq == NULL) { +#ifdef KQ_DEBUG + printf("KQUEUE: knote_sched(M) no kevqs exist for queueing kn %p, discarding... \n", kn); +#endif + break; + } + } else { + next_kevq = TAILQ_NEXT(next_kevq, kq_e); + if (next_kevq == NULL) { + continue; + } + } + + KASSERT(next_kevq != NULL, ("picked a null kevq")); + KEVQ_LOCK(next_kevq); + + if ((next_kevq->kevq_state & KEVQ_CLOSING) == 0 && (next_kevq->kevq_state & KEVQ_RDY) != 0) { + kq->kq_ckevq = next_kevq; + break; + } + + if (next_kevq == kq->kq_ckevq) { + // if the previous "if" didn't break + // we have traversed the list once and the current kevq is closing + // we have no queue to queue the knote +#ifdef KQ_DEBUG + printf("KQUEUE: knote_sched(M) no open kevqs for queueing kn %p, discarding... 
\n", kn); +#endif + next_kevq = NULL; + break; + } + } + } + } else { +#ifdef KQ_DEBUG + printf("KQUEUE: knote_sched(S): kn %p to kevq %p\n", kn, kq->kq_kevq); +#endif + next_kevq = kq->kq_kevq; + } + +#ifdef KQ_DEBUG + printf("KQUEUE: knote_sched(M) next kevq %p for kn %p \n", next_kevq, kn); +#endif + + if (next_kevq != NULL) { + KEVQ_OWNED(next_kevq); + knote_enqueue(kn, next_kevq); + KEVQ_UNLOCK(next_kevq); + } +} + +static void +knote_enqueue(struct knote *kn, struct kevq *kevq) +{ + struct kqueue *kq; + kq = kn->kn_kq; + +#ifdef KQ_DEBUG + printf("KQUEUE: knote_enqueue: kn %p to kevq %p\n", kn, kevq); +#endif + + KEVQ_OWNED(kevq); - KQ_OWNED(kn->kn_kq); KASSERT((kn->kn_status & KN_QUEUED) == 0, ("knote already queued")); + KASSERT((kevq->kevq_state & KEVQ_CLOSING) == 0 && (kevq->kevq_state & KEVQ_RDY) != 0, ("kevq already closing or not ready")); - TAILQ_INSERT_TAIL(&kq->kq_head, kn, kn_tqe); + kn->kn_kevq = kevq; kn->kn_status |= KN_QUEUED; - kq->kq_count++; - kqueue_wakeup(kq); + + TAILQ_INSERT_TAIL(&kevq->kn_head, kn, kn_tqe); + kevq->kn_count++; + + kevq_wakeup(kevq); +} + +static void +knote_xinit(struct knote *kn) +{ + mtx_init(&kn->kn_fluxlock, "kn_fluxlock", NULL, MTX_DEF | MTX_DUPOK); } static void knote_dequeue(struct knote *kn) { - struct kqueue *kq = kn->kn_kq; + struct kevq *kevq = kn->kn_kevq; - KQ_OWNED(kn->kn_kq); + KEVQ_OWNED(kevq); + +#ifdef KQ_DEBUG + printf("KQUEUE: knote_dequeue: kn %p from kevq %p\n", kn, kevq); +#endif KASSERT(kn->kn_status & KN_QUEUED, ("knote not queued")); - TAILQ_REMOVE(&kq->kq_head, kn, kn_tqe); + TAILQ_REMOVE(&kevq->kn_head, kn, kn_tqe); kn->kn_status &= ~KN_QUEUED; - kq->kq_count--; + kn->kn_kevq = NULL; + kevq->kn_count--; } static void @@ -2710,7 +3506,9 @@ knote_alloc(int mflag) static void knote_free(struct knote *kn) { - +#ifdef KQ_DEBUG + printf("KQUEUE: knote_free: kn %p\n", kn); +#endif uma_zfree(knote_zone, kn); } @@ -2721,6 +3519,7 @@ int kqfd_register(int fd, struct kevent *kev, struct thread *td, int mflag) { struct kqueue *kq; + struct kevq *kevq; struct file *fp; cap_rights_t rights; int error; @@ -2728,11 +3527,12 @@ kqfd_register(int fd, struct kevent *kev, struct thread *td, int mflag) error = fget(td, fd, cap_rights_init(&rights, CAP_KQUEUE_CHANGE), &fp); if (error != 0) return (error); - if ((error = kqueue_acquire(fp, &kq)) != 0) + if ((error = kqueue_acquire_both(fp, td, &kq, &kevq)) != 0) goto noacquire; - error = kqueue_register(kq, kev, td, mflag); + error = kqueue_register(kq, kevq, kev, td, mflag); kqueue_release(kq, 0); + kevq_release(kevq, 0); noacquire: fdrop(fp, td); diff --git a/sys/kern/kern_fork.c b/sys/kern/kern_fork.c index 39307a573bb3..5a886d9cfaaa 100644 --- a/sys/kern/kern_fork.c +++ b/sys/kern/kern_fork.c @@ -690,7 +690,7 @@ do_fork(struct thread *td, struct fork_req *fr, struct proc *p2, struct thread * /* * Tell any interested parties about the new process. */ - knote_fork(p1->p_klist, p2->p_pid); + knote_fork(p1->p_klist, td, p2->p_pid); /* * Now can be swapped. 
diff --git a/sys/kern/kern_thread.c b/sys/kern/kern_thread.c index 69635f58f67e..20dedb3b9518 100644 --- a/sys/kern/kern_thread.c +++ b/sys/kern/kern_thread.c @@ -56,6 +56,7 @@ __FBSDID("$FreeBSD$"); #include #include #include +#include #ifdef HWPMC_HOOKS #include #endif @@ -82,9 +83,9 @@ _Static_assert(offsetof(struct thread, td_flags) == 0xfc, "struct thread KBI td_flags"); _Static_assert(offsetof(struct thread, td_pflags) == 0x104, "struct thread KBI td_pflags"); -_Static_assert(offsetof(struct thread, td_frame) == 0x478, +_Static_assert(offsetof(struct thread, td_frame) == 0x478 + 0x8, "struct thread KBI td_frame"); -_Static_assert(offsetof(struct thread, td_emuldata) == 0x530, +_Static_assert(offsetof(struct thread, td_emuldata) == 0x530 + 0x8, "struct thread KBI td_emuldata"); _Static_assert(offsetof(struct proc, p_flag) == 0xb0, "struct proc KBI p_flag"); @@ -444,6 +445,9 @@ thread_free(struct thread *td) cpu_thread_free(td); if (td->td_kstack != 0) vm_thread_dispose(td); + if (td->td_kevq_thred != NULL) { + kevq_thred_drain(td->td_kevq_thred); + } callout_drain(&td->td_slpcallout); uma_zfree(thread_zone, td); } diff --git a/sys/sys/event.h b/sys/sys/event.h index e2e80f37946e..5fcf30cb964b 100644 --- a/sys/sys/event.h +++ b/sys/sys/event.h @@ -33,6 +33,8 @@ #include #include +#include +#include #define EVFILT_READ (-1) #define EVFILT_WRITE (-2) @@ -143,6 +145,7 @@ struct kevent32_freebsd11 { #define EV_CLEAR 0x0020 /* clear event state after reporting */ #define EV_RECEIPT 0x0040 /* force EV_ERROR on success, data=0 */ #define EV_DISPATCH 0x0080 /* disable event after reporting */ +#define EV_AFFINITY 0x0200 /* in multithreaded mode, this event has hard affinity for the registering thread */ #define EV_SYSFLAGS 0xF000 /* reserved by system */ #define EV_DROP 0x1000 /* note should be dropped */ @@ -220,6 +223,9 @@ struct knote; SLIST_HEAD(klist, knote); struct kqueue; TAILQ_HEAD(kqlist, kqueue); +struct kevq; +SLIST_HEAD(kevqlist, kevq); + struct knlist { struct klist kl_list; void (*kl_lock)(void *); /* lock function */ @@ -269,6 +275,10 @@ struct filterops { void (*f_touch)(struct knote *kn, struct kevent *kev, u_long type); }; +/* The ioctl to set multithreaded mode + */ +#define FKQMULTI _IO('f', 89) + /* * An in-flux knote cannot be dropped from its kq while the kq is * unlocked. 
If the KN_SCAN flag is not set, a thread can only set @@ -283,7 +293,9 @@ struct knote { SLIST_ENTRY(knote) kn_selnext; /* for struct selinfo */ struct knlist *kn_knlist; /* f_attach populated */ TAILQ_ENTRY(knote) kn_tqe; - struct kqueue *kn_kq; /* which queue we are on */ + struct kqueue *kn_kq; /* which kqueue we are on */ + struct kevq *kn_org_kevq; /* the kevq that registered the knote */ + struct kevq *kn_kevq; /* the kevq the knote is on */ struct kevent kn_kevent; void *kn_hook; int kn_hookid; @@ -295,7 +307,9 @@ struct knote { #define KN_MARKER 0x20 /* ignore this knote */ #define KN_KQUEUE 0x40 /* this knote belongs to a kq */ #define KN_SCAN 0x100 /* flux set in kqueue_scan() */ + int kn_fluxwait; int kn_influx; + struct mtx kn_fluxlock; int kn_sfflags; /* saved filter flags */ int64_t kn_sdata; /* saved data field */ union { @@ -321,6 +335,14 @@ struct kevent_copyops { size_t kevent_size; }; +struct kevq_thred { + u_long kevq_hashmask; /* hash mask for kevqs */ + struct kevqlist *kevq_hash; /* hash table for kevqs */ + TAILQ_HEAD(, kevq) kevq_tq; + struct mtx lock; /* the lock for the kevq*/ +}; + + struct thread; struct proc; struct knlist; @@ -328,7 +350,7 @@ struct mtx; struct rwlock; void knote(struct knlist *list, long hint, int lockflags); -void knote_fork(struct knlist *list, int pid); +void knote_fork(struct knlist *list, struct thread *td, int pid); struct knlist *knlist_alloc(struct mtx *lock); void knlist_detach(struct knlist *knl); void knlist_add(struct knlist *knl, struct knote *kn, int islocked); @@ -352,6 +374,7 @@ int kqfd_register(int fd, struct kevent *kev, struct thread *p, int kqueue_add_filteropts(int filt, struct filterops *filtops); int kqueue_del_filteropts(int filt); +void kevq_thred_drain(struct kevq_thred *kevq_th); #else /* !_KERNEL */ #include diff --git a/sys/sys/eventvar.h b/sys/sys/eventvar.h index 1ed6e9fc4b6b..f9920417ee53 100644 --- a/sys/sys/eventvar.h +++ b/sys/sys/eventvar.h @@ -40,27 +40,47 @@ #define KQ_NEVENTS 8 /* minimize copy{in,out} calls */ #define KQEXTENT 256 /* linear growth by this amount */ +struct kevq { + SLIST_ENTRY(kevq) kevq_th_e; /* entry into kevq_thred's hashtable */ + TAILQ_ENTRY(kevq) kq_e; /* entry into kqueue's list */ + TAILQ_ENTRY(kevq) kevq_th_tqe; /* entry into kevq_thred's TAILQ */ + struct kqueue *kq; /* the kq that the kevq belongs to */ + struct kevq_thred *kevq_th; /* the thread that the kevq belongs to */ + struct mtx lock; /* the lock for the kevq */ + TAILQ_HEAD(, knote) kn_head; /* list of pending knotes */ + int kn_count; /* number of pending knotes */ +#define KEVQ_SLEEP 0x01 +#define KEVQ_CLOSING 0x02 +#define KEVQ_RDY 0x04 + int kevq_state; + int kevq_refcnt; +}; + struct kqueue { struct mtx kq_lock; int kq_refcnt; - TAILQ_ENTRY(kqueue) kq_list; - TAILQ_HEAD(, knote) kq_head; /* list of pending event */ - int kq_count; /* number of pending events */ struct selinfo kq_sel; - struct sigio *kq_sigio; - struct filedesc *kq_fdp; int kq_state; #define KQ_SEL 0x01 -#define KQ_SLEEP 0x02 -#define KQ_FLUXWAIT 0x04 /* waiting for a in flux kn */ -#define KQ_ASYNC 0x08 +#define KQ_ASYNC 0x02 +#define KQ_TASKSCHED 0x04 /* task scheduled */ +#define KQ_TASKDRAIN 0x08 /* waiting for task to drain */ #define KQ_CLOSING 0x10 -#define KQ_TASKSCHED 0x20 /* task scheduled */ -#define KQ_TASKDRAIN 0x40 /* waiting for task to drain */ +#define KQ_FLAG_INIT 0x20 /* kqueue has been initialized. 
this flag is set after the first kevent structure is processed */ +#define KQ_FLAG_MULTI 0x40 /* Multi-threaded mode */ + TAILQ_ENTRY(kqueue) kq_list; + struct sigio *kq_sigio; + struct filedesc *kq_fdp; int kq_knlistsize; /* size of knlist */ struct klist *kq_knlist; /* list of knotes */ u_long kq_knhashmask; /* size of knhash */ struct klist *kq_knhash; /* hash table for knotes */ + /* only-set: in multithreaded mode */ + TAILQ_HEAD(, kevq) kq_kevqlist; /* list of kevqs interested in the kqueue */ + struct kevq *kq_ckevq; /* current kevq for multithreaded kqueue */ + /* only-set: in single threaded mode */ + struct kevq *kq_kevq; + /* End only-set */ struct task kq_task; struct ucred *kq_cred; }; diff --git a/sys/sys/proc.h b/sys/sys/proc.h index 7e67ec48e0ac..bb4b4fd51617 100644 --- a/sys/sys/proc.h +++ b/sys/sys/proc.h @@ -302,6 +302,7 @@ struct thread { int td_rtcgen; /* (s) rtc_generation of abs. sleep */ size_t td_vslock_sz; /* (k) amount of vslock-ed space */ struct kcov_info *td_kcov_info; /* (*) Kernel code coverage data */ + struct kevq_thred *td_kevq_thred; #define td_endzero td_sigmask /* Copied during fork1() or create_thread(). */ diff --git a/tests/sys/kqueue/libkqueue/Makefile b/tests/sys/kqueue/libkqueue/Makefile index 94d198bf9900..10672d2d1466 100644 --- a/tests/sys/kqueue/libkqueue/Makefile +++ b/tests/sys/kqueue/libkqueue/Makefile @@ -15,7 +15,8 @@ SRCS.kqtest= \ vnode.c \ proc.c \ signal.c \ + read_m.c \ user.c WARNS?= 2 - +LDADD+= -lthr .include diff --git a/tests/sys/kqueue/libkqueue/main.c b/tests/sys/kqueue/libkqueue/main.c index d2e45c40d894..64a6cb465b25 100644 --- a/tests/sys/kqueue/libkqueue/main.c +++ b/tests/sys/kqueue/libkqueue/main.c @@ -29,6 +29,7 @@ extern void test_evfilt_read(); extern void test_evfilt_signal(); extern void test_evfilt_vnode(); extern void test_evfilt_timer(); +extern void test_evfilt_read_m(); extern void test_evfilt_proc(); #if HAVE_EVFILT_USER extern void test_evfilt_user(); @@ -322,6 +323,7 @@ main(int argc, char **argv) int test_signal = 1; int test_vnode = 1; int test_timer = 1; + int test_socket_m = 1; #ifdef __FreeBSD__ int test_user = 1; #else @@ -342,6 +344,8 @@ main(int argc, char **argv) test_vnode = 0; if (strcmp(argv[0], "--no-user") == 0) test_user = 0; + if (strcmp(argv[0], "--no-socket_m") == 0) + test_socket_m = 0; argv++; argc--; } @@ -360,6 +364,8 @@ main(int argc, char **argv) if (test_socket) test_evfilt_read(); + if (test_socket_m) + test_evfilt_read_m(); if (test_signal) test_evfilt_signal(); if (test_vnode) diff --git a/tests/sys/kqueue/libkqueue/read_m.c b/tests/sys/kqueue/libkqueue/read_m.c new file mode 100644 index 000000000000..7aaf0c23aa5a --- /dev/null +++ b/tests/sys/kqueue/libkqueue/read_m.c @@ -0,0 +1,222 @@ +/* + * Copyright (c) 2009 Mark Heily + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
+ * + * $FreeBSD$ + */ + +#include "common.h" + +#include +#include +#include + +#define THREAD_CNT (100) +#define PACKET_CNT (5000) +#define FKQMULTI _IO('f', 89) +#define TEST_DEBUG + +struct thread_info { + pthread_t thrd; + int evcnt; + int tid; +}; + +int g_kqfd; +int g_sockfd[2]; +int g_end; +struct thread_info g_thrd_info[THREAD_CNT]; +/* Test threads signals this upon receiving events */ +sem_t g_sem_driver; + +static int +check_sched(void) +{ + int max = 0, min = 999999; + + for(int i = 0; i < THREAD_CNT; i++) { + int cur = g_thrd_info[i].evcnt; + if (cur > max) { + max = cur; + } + if (cur < min) { + min = cur; + } + } + +#ifdef TEST_DEBUG + printf("READ_M: check_sched: max difference is %d\n", max - min); +#endif + + return (max - min) <= 1; +} + +static void +socket_pop() +{ + char buf[1]; + + /* Drain the read buffer, then make sure there are no more events. */ +#ifdef TEST_DEBUG + printf("READ_M: popping the read buffer\n"); +#endif + if (read(g_sockfd[0], &buf[0], 1) < 1) + err(1, "read(2)"); +} + +static void +socket_push() +{ +#ifdef TEST_DEBUG + printf("READ_M: pushing to socket\n"); +#endif + if (write(g_sockfd[1], ".", 1) < 1) { +#ifdef TEST_DEBUG + printf("READ_M: write failed with %d\n", errno); +#endif + err(1, "write(2)"); + } +} + +/* for multi threaded read */ +static void* +test_socket_read_thrd(void* args) +{ + struct thread_info *info = (struct thread_info *) args; + + struct kevent kev; + EV_SET(&kev, g_sockfd[0], EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, &g_sockfd[0]); + kev.data = 1; + + while (1) { +#ifdef TEST_DEBUG + printf("READ_M: thread %d waiting for events\n", info->tid); +#endif + kevent_cmp(&kev, kevent_get(g_kqfd)); +#ifdef TEST_DEBUG + printf("READ_M: thread %d woke up\n", info->tid); +#endif + socket_pop(); + info->evcnt++; + + /* signal the driver */ + sem_post(&g_sem_driver); + + if (g_end) { +#ifdef TEST_DEBUG + printf("READ_M: thread %d exiting...\n", info->tid); +#endif + break; + } + } + pthread_exit(0); +} + +static void +test_socket_read(void) +{ + int error = 0; + struct kevent kev; + EV_SET(&kev, g_sockfd[0], EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, &g_sockfd[0]); + + const char *test_id = "[Multi]kevent(EVFILT_READ)"; + sem_init(&g_sem_driver, 0, 0); + g_end = 0; + + test_begin(test_id); + + error = kevent(g_kqfd, &kev, 1, NULL, 0, NULL); + + if (error == -1) { +#ifdef TEST_DEBUG + printf("READ_M: kevent add failed with %d\n", errno); +#endif + err(1, "kevent_add"); + } + +#ifdef TEST_DEBUG + printf("READ_M: creating %d threads...\n", THREAD_CNT); +#endif + for (int i = 0; i < THREAD_CNT; i++) { + g_thrd_info[i].tid = i; + g_thrd_info[i].evcnt = 0; + pthread_create(&g_thrd_info[i].thrd, NULL, test_socket_read_thrd, &g_thrd_info[i]); + } + +#ifdef TEST_DEBUG + printf("READ_M: waiting for threads to wait on KQ...\n"); +#endif + + sleep(3); + + for(int i = 0; i < PACKET_CNT; i++) { +#ifdef TEST_DEBUG + printf("READ_M: processing packet %d\n", i); +#endif + socket_push(); + /* wait for thread events */ + sem_wait(&g_sem_driver); + + if (!check_sched()) { +#ifdef TEST_DEBUG + printf("READ_M: check_sched failed...\n"); +#endif + g_end = 1; + error = 2; + break; + } + + if ((i + THREAD_CNT) == PACKET_CNT - 1) { + g_end = 1; + } + } + + /* shutdown the systems */ + +#ifdef TEST_DEBUG + printf("READ_M: finished testing, system shutting down...\n"); +#endif + for (int i = 0; i < THREAD_CNT; i++) { + pthread_join(g_thrd_info[i].thrd, NULL); + } + + if (!error) + success(); + else + err(error, "kevent"); +} + + +void +test_evfilt_read_m() +{ + /* Create 
a connected pair of full-duplex sockets for testing socket events */ + if (socketpair(AF_UNIX, SOCK_STREAM, 0, &g_sockfd[0]) < 0) + abort(); + + g_kqfd = kqueue(); + int error = ioctl(g_kqfd, FKQMULTI); + if (error == -1) { +#ifdef TEST_DEBUG + printf("READ_M: ioctl failed with %d\n", errno); +#endif + err(1, "ioctl"); + } + + test_socket_read(); + + close(g_kqfd); + close(g_sockfd[0]); + close(g_sockfd[1]); +}
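
A minimal userspace sketch of how a consumer would drive the interface added by this patch, assuming FKQMULTI and EV_AFFINITY land as defined in the sys/sys/event.h hunk above; the helper names and file descriptors here are illustrative only and are not part of the change:

#include <sys/types.h>
#include <sys/event.h>
#include <sys/ioctl.h>
#include <err.h>
#include <unistd.h>

#ifndef FKQMULTI
#define FKQMULTI _IO('f', 89)   /* mirrors the definition this patch adds to sys/sys/event.h */
#endif

static int
make_multi_kq(void)
{
    int kq = kqueue();

    if (kq < 0)
        err(1, "kqueue");
    /*
     * Switch the kqueue into multi-threaded mode. Per the kqueue_ioctl()
     * change above, this must happen before the first kevent() call on the
     * descriptor; once KQ_FLAG_INIT is set the ioctl returns EINVAL.
     */
    if (ioctl(kq, FKQMULTI) == -1)
        err(1, "ioctl(FKQMULTI)");
    return (kq);
}

static void
add_read_event(int kq, int fd, int pin_to_this_thread)
{
    struct kevent kev;
    unsigned short flags = EV_ADD | EV_CLEAR;

    /*
     * EV_AFFINITY (added by this patch) asks knote_sched() to deliver the
     * event only to the kevq of the registering thread instead of
     * round-robining it across all threads sleeping in kevent().
     */
    if (pin_to_this_thread)
        flags |= EV_AFFINITY;
    EV_SET(&kev, fd, EVFILT_READ, flags, 0, 0, NULL);
    if (kevent(kq, &kev, 1, NULL, 0, NULL) == -1)
        err(1, "kevent(EV_ADD)");
}

After this setup, each worker thread simply calls kevent(kq, NULL, 0, &kev, 1, NULL) in a loop, as the read_m.c test above does; every thread gets its own per-thread kevq on first use, and pending knotes are scheduled across the ready kevqs.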