diff --git a/sys/kern/kern_event.c b/sys/kern/kern_event.c
index 5a9d516df898..604f7e42de29 100644
--- a/sys/kern/kern_event.c
+++ b/sys/kern/kern_event.c
@@ -99,18 +99,32 @@ MTX_SYSINIT(kq_global, &kq_global, "kqueue order", MTX_DEF);
 
 TASKQUEUE_DEFINE_THREAD(kqueue_ctx);
 
+static struct kevq *kevqlist_find(struct kevqlist *kevq_list, struct kqueue *kq);
+static void	kevq_wakeup(struct kevq* kevq);
+static void	kevq_thred_init(struct kevq_thred *kevq_th);
+static void	kevq_thred_destroy(struct kevq_thred *kevq_th);
+static void	kevq_init(struct kevq *kevq);
+static void	kevq_release(struct kevq* kevq, int locked);
+static int	kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp);
+static void	kevq_destroy(struct kevq *kevq);
+static int	kevq_acquire(struct kevq *kevq);
+void	kevq_drain(struct kevq *kevq);
+
+static void	knote_xinit(struct knote *kn);
+
 static int	kevent_copyout(void *arg, struct kevent *kevp, int count);
 static int	kevent_copyin(void *arg, struct kevent *kevp, int count);
-static int	kqueue_register(struct kqueue *kq, struct kevent *kev,
-		    struct thread *td, int mflag);
+static int	kqueue_register(struct kqueue *kq, struct kevq *kevq,
+		    struct kevent *kev, struct thread *td, int mflag);
 static int	kqueue_acquire(struct file *fp, struct kqueue **kqp);
+static int	kqueue_acquire_both(struct file *fp, struct thread *td, struct kqueue **kqp, struct kevq **kevqp);
 static void	kqueue_release(struct kqueue *kq, int locked);
 static void	kqueue_destroy(struct kqueue *kq);
-static void	kqueue_drain(struct kqueue *kq, struct thread *td);
+static void	kqueue_drain(struct kqueue *kq, struct kevq *kevq, struct thread *td);
 static int	kqueue_expand(struct kqueue *kq, struct filterops *fops,
 		    uintptr_t ident, int mflag);
 static void	kqueue_task(void *arg, int pending);
-static int	kqueue_scan(struct kqueue *kq, int maxevents,
+static int	kqueue_scan(struct kevq *kq, int maxevents,
 		    struct kevent_copyops *k_ops, const struct timespec *timeout,
 		    struct kevent *keva, struct thread *td);
 
@@ -144,14 +158,16 @@ static struct fileops kqueueops = {
 	.fo_fill_kinfo = kqueue_fill_kinfo,
 };
 
+static void	knote_activate(struct knote *kn, int haskqlock);
 static int	knote_attach(struct knote *kn, struct kqueue *kq);
 static void	knote_drop(struct knote *kn, struct thread *td);
 static void	knote_drop_detached(struct knote *kn, struct thread *td);
-static void	knote_enqueue(struct knote *kn);
+static void	knote_enqueue(struct knote *kn, struct kevq *kevq);
 static void	knote_dequeue(struct knote *kn);
 static void	knote_init(void);
 static struct	knote *knote_alloc(int mflag);
 static void	knote_free(struct knote *kn);
+static struct	kevq* knote_sched(struct knote *kn);
 
 static void	filt_kqdetach(struct knote *kn);
 static int	filt_kqueue(struct knote *kn, long hint);
@@ -209,41 +225,60 @@ static unsigned int kq_calloutmax = 4 * 1024;
 SYSCTL_UINT(_kern, OID_AUTO, kq_calloutmax, CTLFLAG_RW,
     &kq_calloutmax, 0, "Maximum number of callouts allocated for kqueue");
 
-/* XXX - ensure not influx ? */
-#define KNOTE_ACTIVATE(kn, islock) do { 				\
-	if ((islock))							\
-		mtx_assert(&(kn)->kn_kq->kq_lock, MA_OWNED);		\
-	else								\
-		KQ_LOCK((kn)->kn_kq);					\
-	(kn)->kn_status |= KN_ACTIVE;					\
-	if (((kn)->kn_status & (KN_QUEUED | KN_DISABLED)) == 0)	\
-		knote_enqueue((kn));					\
-	if (!(islock))							\
-		KQ_UNLOCK((kn)->kn_kq);					\
-} while(0)
 #define KQ_LOCK(kq) do {						\
 	mtx_lock(&(kq)->kq_lock);					\
 } while (0)
-#define KQ_FLUX_WAKEUP(kq) do {					\
-	if (((kq)->kq_state & KQ_FLUXWAIT) == KQ_FLUXWAIT) {		\
-		(kq)->kq_state &= ~KQ_FLUXWAIT;				\
-		wakeup((kq));						\
-	}								\
+#define KN_FLUX_LOCK(kn) do {						\
+	mtx_lock(&(kn)->kn_fluxlock);					\
 } while (0)
-#define KQ_UNLOCK_FLUX(kq) do {					\
-	KQ_FLUX_WAKEUP(kq);						\
-	mtx_unlock(&(kq)->kq_lock);					\
+#define KEVQ_TH_LOCK(kevqth) do {					\
+	mtx_lock(&(kevqth)->lock);					\
+} while (0)
+#define KEVQ_LOCK(kevq) do {						\
+	mtx_lock(&(kevq)->lock);					\
+} while (0)
+#define KN_FLUX_WAKEUP(kn, haslock) do {				\
+	if ((haslock))							\
+		KN_FLUX_OWNED((kn));					\
+	else								\
+		KN_FLUX_LOCK((kn));					\
+	if ((kn)->kn_fluxwait) {					\
+		(kn)->kn_fluxwait = 0;					\
+		wakeup((kn));						\
+	}								\
+	if (!(haslock))							\
+		KN_FLUX_UNLOCK((kn));					\
 } while (0)
 #define KQ_UNLOCK(kq) do {						\
 	mtx_unlock(&(kq)->kq_lock);					\
 } while (0)
+#define KN_FLUX_UNLOCK(kn) do {					\
+	mtx_unlock(&(kn)->kn_fluxlock);				\
+} while (0)
+#define KEVQ_TH_UNLOCK(kevqth) do {					\
+	mtx_unlock(&(kevqth)->lock);					\
+} while (0)
+#define KEVQ_UNLOCK(kevq) do {						\
+	mtx_unlock(&(kevq)->lock);					\
+} while (0)
 #define KQ_OWNED(kq) do {						\
 	mtx_assert(&(kq)->kq_lock, MA_OWNED);				\
 } while (0)
 #define KQ_NOTOWNED(kq) do {						\
 	mtx_assert(&(kq)->kq_lock, MA_NOTOWNED);			\
 } while (0)
-
+#define KN_FLUX_OWNED(kn) do {						\
+	mtx_assert(&(kn)->kn_fluxlock, MA_OWNED);			\
+} while (0)
+#define KN_FLUX_NOTOWNED(kn) do {					\
+	mtx_assert(&(kn)->kn_fluxlock, MA_NOTOWNED);			\
+} while (0)
+#define KEVQ_OWNED(kevq) do {						\
+	mtx_assert(&(kevq)->lock, MA_OWNED);				\
+} while (0)
+#define KEVQ_NOTOWNED(kevq) do {					\
+	mtx_assert(&(kevq)->lock, MA_NOTOWNED);			\
+} while (0)
 static struct knlist *
 kn_list_lock(struct knote *kn)
 {
@@ -278,22 +313,45 @@ kn_in_flux(struct knote *kn)
 }
 
 static void
-kn_enter_flux(struct knote *kn)
+kn_enter_flux(struct knote *kn, int haslock)
 {
-
-	KQ_OWNED(kn->kn_kq);
+#ifdef KQ_DEBUG
+	printf("KQUEUE: kn_enter_flux: %p\n", kn);
+#endif
+	if (haslock) {
+		KN_FLUX_OWNED(kn);
+	} else {
+		KN_FLUX_LOCK(kn);
+	}
 	MPASS(kn->kn_influx < INT_MAX);
 	kn->kn_influx++;
+	if (!haslock) {
+		KN_FLUX_UNLOCK(kn);
+	}
 }
 
 static bool
-kn_leave_flux(struct knote *kn)
+kn_leave_flux(struct knote *kn, int haslock)
 {
-
-	KQ_OWNED(kn->kn_kq);
+#ifdef KQ_DEBUG
+	printf("KQUEUE: kn_leave_flux: %p\n", kn);
+#endif
+	if (haslock) {
+		KN_FLUX_OWNED(kn);
+	} else {
+		KN_FLUX_LOCK(kn);
+	}
 	MPASS(kn->kn_influx > 0);
 	kn->kn_influx--;
-	return (kn->kn_influx == 0);
+
+	if (!haslock) {
+		KN_FLUX_UNLOCK(kn);
+		/*
+		 * Return false when the flux lock is not held: the count
+		 * may change again immediately, so the result would be
+		 * racy.  Callers that need the result must pass haslock.
+		 */
+		return false;
+	} else {
+		return (kn->kn_influx == 0);
+	}
 }
 
 #define	KNL_ASSERT_LOCK(knl, islocked) do {				\
@@ -316,14 +374,16 @@ kn_leave_flux(struct knote *kn)
 
 #ifndef	KN_HASHSIZE
 #define	KN_HASHSIZE	64		/* XXX should be tunable */
+#define	KEVQ_HASHSIZE	64
 #endif
 
 #define KN_HASH(val, mask)	(((val) ^ (val >> 8)) & (mask))
+#define KEVQ_HASH(val, mask)	KN_HASH((val), (mask))
 
 static int
 filt_nullattach(struct knote *kn)
 {
-
+	
 	return (ENXIO);
 };
 
@@ -377,6 +437,10 @@ filt_fileattach(struct knote *kn)
 static int
 kqueue_kqfilter(struct file *fp, struct knote *kn)
 {
+#ifdef KQ_DEBUG
+	printf("KQUEUE: kqueue_kqfilter called for kn %p\n", kn);
+#endif
+
 	struct kqueue *kq = kn->kn_fp->f_data;
 
 	if (kn->kn_filter != EVFILT_READ)
@@ -403,7 +467,15 @@ filt_kqueue(struct knote *kn, long hint)
 {
 	struct kqueue *kq = kn->kn_fp->f_data;
 
-	kn->kn_data = kq->kq_count;
+#ifdef KQ_DEBUG
+	printf("KQUEUE: filt_kqueue called for kn %p\n", kn);
+#endif
+
+	if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) {
+		return 0;
+	}
+
+	kn->kn_data = kq->kq_kevq->kn_count;
 	return (kn->kn_data > 0);
 }
 
@@ -461,7 +533,7 @@ filt_procattach(struct knote *kn)
 	 * is registered.
 	 */
 	if (immediate || (exiting && filt_proc(kn, NOTE_EXIT)))
-		KNOTE_ACTIVATE(kn, 0);
+		knote_activate(kn, 0);
 
 	PROC_UNLOCK(p);
 
@@ -492,6 +564,10 @@ filt_proc(struct knote *kn, long hint)
 	struct proc *p;
 	u_int event;
 
+#ifdef KQ_DEBUG
+	printf("KQUEUE: filt_proc called for kn %p\n", kn);
+#endif
+
 	p = kn->kn_ptr.p_proc;
 	if (p == NULL) /* already activated, from attach filter */
 		return (0);
@@ -526,10 +602,11 @@ filt_proc(struct knote *kn, long hint)
  * child's pid.
 */
 void
-knote_fork(struct knlist *list, int pid)
+knote_fork(struct knlist *list, struct thread *td, int pid)
 {
 	struct kqueue *kq;
 	struct knote *kn;
+	struct kevq *kevq;
 	struct kevent kev;
 	int error;
 
@@ -541,18 +618,12 @@ knote_fork(struct knlist *list, int pid)
 	memset(&kev, 0, sizeof(kev));
 	SLIST_FOREACH(kn, &list->kl_list, kn_selnext) {
 		kq = kn->kn_kq;
-		KQ_LOCK(kq);
-		if (kn_in_flux(kn) && (kn->kn_status & KN_SCAN) == 0) {
-			KQ_UNLOCK(kq);
-			continue;
-		}
+		kevq = kn->kn_org_kevq;
 
-		/*
-		 * The same as knote(), activate the event.
-		 */
-		if ((kn->kn_sfflags & NOTE_TRACK) == 0) {
-			if (kn->kn_fop->f_event(kn, NOTE_FORK))
-				KNOTE_ACTIVATE(kn, 1);
+		KQ_LOCK(kq);
+		KN_FLUX_LOCK(kn);
+		if (kn_in_flux(kn) && (kn->kn_status & KN_SCAN) == 0) {
+			KN_FLUX_UNLOCK(kn);
 			KQ_UNLOCK(kq);
 			continue;
 		}
@@ -563,8 +634,32 @@
 		 * track the child. Drop the locks in preparation for
 		 * the call to kqueue_register().
 		 */
-		kn_enter_flux(kn);
+		kn_enter_flux(kn, 1);
+		KN_FLUX_UNLOCK(kn);
+
+		/*
+		 * The same as knote(), activate the event.
+		 */
+		if ((kn->kn_sfflags & NOTE_TRACK) == 0) {
+			if (kn->kn_fop->f_event(kn, NOTE_FORK))
+				knote_activate(kn, 1);
+
+			KN_FLUX_LOCK(kn);
+			kn_leave_flux(kn, 1);
+			KN_FLUX_WAKEUP(kn, 1);
+			KN_FLUX_UNLOCK(kn);
+			KQ_UNLOCK(kq);
+			continue;
+		}
+		KQ_UNLOCK(kq);
+
+		/*
+		 * The NOTE_TRACK case. In addition to the activation
+		 * of the event, we need to register new events to
+		 * track the child. Drop the locks in preparation for
+		 * the call to kqueue_register().
+		 */
 		list->kl_unlock(list->kl_lockarg);
 
 		/*
@@ -583,7 +678,7 @@
 		kev.fflags = kn->kn_sfflags;
 		kev.data = kn->kn_id;		/* parent */
 		kev.udata = kn->kn_kevent.udata;/* preserve udata */
-		error = kqueue_register(kq, &kev, NULL, M_NOWAIT);
+		error = kqueue_register(kq, kevq, &kev, NULL, M_NOWAIT);
 		if (error)
 			kn->kn_fflags |= NOTE_TRACKERR;
 
@@ -597,15 +692,17 @@
 		kev.fflags = kn->kn_sfflags;
 		kev.data = kn->kn_id;		/* parent */
 		kev.udata = kn->kn_kevent.udata;/* preserve udata */
-		error = kqueue_register(kq, &kev, NULL, M_NOWAIT);
+		error = kqueue_register(kq, kevq, &kev, NULL, M_NOWAIT);
 		if (error)
 			kn->kn_fflags |= NOTE_TRACKERR;
 		if (kn->kn_fop->f_event(kn, NOTE_FORK))
-			KNOTE_ACTIVATE(kn, 0);
+			knote_activate(kn, 0);
 		list->kl_lock(list->kl_lockarg);
-		KQ_LOCK(kq);
-		kn_leave_flux(kn);
-		KQ_UNLOCK_FLUX(kq);
+
+		KN_FLUX_LOCK(kn);
+		kn_leave_flux(kn, 1);
+		KN_FLUX_WAKEUP(kn, 1);
+		KN_FLUX_UNLOCK(kn);
 	}
 }
 
@@ -687,7 +784,14 @@ filt_timerexpire(void *knx)
 
 	kn = knx;
 	kn->kn_data++;
-	KNOTE_ACTIVATE(kn, 0);	/* XXX - handle locking */
+
+	kn_enter_flux(kn, 0);
+
+	knote_activate(kn, 0);
+
+	/* TODO: lock? */
+	kn_leave_flux(kn, 0);
+	KN_FLUX_WAKEUP(kn, 0);
 
 	if ((kn->kn_flags & EV_ONESHOT) != 0)
 		return;
@@ -794,6 +898,7 @@ filt_timertouch(struct knote *kn, struct kevent *kev, u_long type)
 {
 	struct kq_timer_cb_data *kc;
 	struct kqueue *kq;
+	struct kevq *kevq;
 	sbintime_t to;
 	int error;
 
@@ -822,9 +927,13 @@
 		 *   - clear the count of expiration events
 		 */
 		kq = kn->kn_kq;
+		kevq = kn->kn_kevq;
 		KQ_LOCK(kq);
-		if (kn->kn_status & KN_QUEUED)
+		if (kn->kn_status & KN_QUEUED) {
+			KEVQ_LOCK(kevq);
 			knote_dequeue(kn);
+			KEVQ_UNLOCK(kevq);
+		}
 
 		kn->kn_status &= ~KN_ACTIVE;
 		kn->kn_data = 0;
@@ -859,14 +968,16 @@
 static int
 filt_timer(struct knote *kn, long hint)
 {
-
+#ifdef KQ_DEBUG
+	printf("KQUEUE: filt_timer called for kn %p\n", kn);
+#endif
 	return (kn->kn_data != 0);
 }
 
 static int
 filt_userattach(struct knote *kn)
 {
-
+	
 	/*
 	 * EVFILT_USER knotes are not attached to anything in the kernel.
 	 */
@@ -890,7 +1001,9 @@ filt_userdetach(__unused struct knote *kn)
 static int
 filt_user(struct knote *kn, __unused long hint)
 {
-
+#ifdef KQ_DEBUG
+	printf("KQUEUE: filt_user called for kn %p\n", kn);
+#endif
 	return (kn->kn_hookid);
 }
 
@@ -961,9 +1074,7 @@ sys_kqueue(struct thread *td, struct kqueue_args *uap)
 static void
 kqueue_init(struct kqueue *kq)
 {
-
 	mtx_init(&kq->kq_lock, "kqueue", NULL, MTX_DEF | MTX_DUPOK);
-	TAILQ_INIT(&kq->kq_head);
 	knlist_init_mtx(&kq->kq_sel.si_note, &kq->kq_lock);
 	TASK_INIT(&kq->kq_task, 0, kqueue_task, kq);
 }
@@ -1209,8 +1320,27 @@ kern_kevent(struct thread *td, int fd, int nchanges, int nevents,
 	return (error);
 }
 
+static struct kevq *
+kevqlist_find(struct kevqlist *kevq_list, struct kqueue *kq)
+{
+	struct kevq *kevq_found, *kevq_each;
+
+	kevq_found = NULL;
+
+	if (!SLIST_EMPTY(kevq_list)) {
+		SLIST_FOREACH(kevq_each, kevq_list, kevq_th_e) {
+			if (kevq_each->kq == kq) {
+				kevq_found = kevq_each;
+				break;
+			}
+		}
+	}
+
+	return kevq_found;
+}
+
 static int
-kqueue_kevent(struct kqueue *kq, struct thread *td, int nchanges, int nevents,
+kqueue_kevent(struct kqueue *kq, struct kevq *kevq, struct thread *td, int nchanges, int nevents,
     struct kevent_copyops *k_ops, const struct timespec *timeout)
 {
 	struct kevent keva[KQ_NEVENTS];
@@ -1229,7 +1359,7 @@ kqueue_kevent(struct kqueue *kq, struct thread *td, int nchanges, int nevents,
 			if (!kevp->filter)
 				continue;
 			kevp->flags &= ~EV_SYSFLAGS;
-			error = kqueue_register(kq, kevp, td, M_WAITOK);
+			error = kqueue_register(kq, kevq, kevp, td, M_WAITOK);
 			if (error || (kevp->flags & EV_RECEIPT)) {
 				if (nevents == 0)
 					return (error);
@@ -1247,7 +1377,7 @@ kqueue_kevent(struct kqueue *kq, struct thread *td, int nchanges, int nevents,
 		return (0);
 	}
 
-	return (kqueue_scan(kq, nevents, k_ops, timeout, keva, td));
+	return (kqueue_scan(kevq, nevents, k_ops, timeout, keva, td));
 }
 
 int
@@ -1255,13 +1385,17 @@ kern_kevent_fp(struct thread *td, struct file *fp, int nchanges, int nevents,
     struct kevent_copyops *k_ops, const struct timespec *timeout)
 {
 	struct kqueue *kq;
+	struct kevq *kevq;
 	int error;
+
+	error = kqueue_acquire_both(fp, td, &kq, &kevq);
 
-	error = kqueue_acquire(fp, &kq);
 	if (error != 0)
 		return (error);
-	error = kqueue_kevent(kq, td, nchanges, nevents, k_ops, timeout);
+
+	error = kqueue_kevent(kq, kevq, td, nchanges, nevents, k_ops, timeout);
 	kqueue_release(kq, 0);
+	kevq_release(kevq, 0);
 	return (error);
 }
 
@@ -1274,12 +1408,18 @@ kern_kevent_anonymous(struct thread *td, int nevents,
     struct kevent_copyops *k_ops)
 {
 	struct kqueue kq = {};
+	struct kevq kevq = {};
 	int error;
 
 	kqueue_init(&kq);
+	kevq_init(&kevq);
+	kq.kq_kevq = &kevq;
+	kevq.kq = &kq;
 	kq.kq_refcnt = 1;
-	error = kqueue_kevent(&kq, td, nevents, nevents, k_ops, NULL);
-	kqueue_drain(&kq, td);
+	kevq.kevq_refcnt = 1;
+	error = kqueue_kevent(&kq, &kevq, td, nevents, nevents, k_ops, NULL);
+	/* TODO: kevq_destroy() gets called on this kevq, but its memory was not dynamically allocated */
+	kqueue_drain(&kq, &kevq, td);
 	kqueue_destroy(&kq);
 	return (error);
 }
@@ -1373,7 +1513,7 @@ kqueue_fo_release(int filt)
 
 /*
 * A ref to kq (obtained via kqueue_acquire) must be held.
 */
 static int
-kqueue_register(struct kqueue *kq, struct kevent *kev, struct thread *td,
+kqueue_register(struct kqueue *kq, struct kevq *kevq, struct kevent *kev, struct thread *td,
     int mflag)
 {
 	struct filterops *fops;
@@ -1382,6 +1522,9 @@ kqueue_register(struct kqueue *kq, struct kevent *kev, struct thread *td,
 	struct knlist *knl;
 	int error, filt, event;
 	int haskqglobal, filedesc_unlock;
+#ifdef KQ_DEBUG
+	printf("KQUEUE: kqueue_register: kq %p, kevq %p, ident: %d, filter: %d, flags: 0x%X\n", kq, kevq, (int)kev->ident, kev->filter, kev->flags);
+#endif
 
 	if ((kev->flags & (EV_ENABLE | EV_DISABLE)) == (EV_ENABLE | EV_DISABLE))
 		return (EINVAL);
@@ -1491,20 +1634,26 @@
 	}
 
 	/* knote is in the process of changing, wait for it to stabilize. */
-	if (kn != NULL && kn_in_flux(kn)) {
-		KQ_GLOBAL_UNLOCK(&kq_global, haskqglobal);
-		if (filedesc_unlock) {
-			FILEDESC_XUNLOCK(td->td_proc->p_fd);
-			filedesc_unlock = 0;
+	if (kn != NULL) {
+		KN_FLUX_LOCK(kn);
+		if (kn_in_flux(kn)) {
+			KQ_GLOBAL_UNLOCK(&kq_global, haskqglobal);
+			if (filedesc_unlock) {
+				FILEDESC_XUNLOCK(td->td_proc->p_fd);
+				filedesc_unlock = 0;
+			}
+			kn->kn_fluxwait = 1;
+			KN_FLUX_UNLOCK(kn);
+			msleep(kn, &kq->kq_lock, PSOCK | PDROP, "kqflxwt", 0);
+
+			if (fp != NULL) {
+				fdrop(fp, td);
+				fp = NULL;
+			}
+			goto findkn;
 		}
-		kq->kq_state |= KQ_FLUXWAIT;
-		msleep(kq, &kq->kq_lock, PSOCK | PDROP, "kqflxwt", 0);
-		if (fp != NULL) {
-			fdrop(fp, td);
-			fp = NULL;
-		}
-		goto findkn;
 	}
+	/* We now have exclusive access to the knote with the flux lock */
 
 	/*
 	 * kn now contains the matching knote, or NULL if no match
@@ -1518,6 +1667,8 @@
 			error = ENOMEM;
 			goto done;
 		}
+		knote_xinit(kn);
+		kn->kn_kevq = kevq;
 		kn->kn_fp = fp;
 		kn->kn_kq = kq;
 		kn->kn_fop = fops;
@@ -1538,7 +1689,7 @@
 		kn->kn_status = KN_DETACHED;
 		if ((kev->flags & EV_DISABLE) != 0)
 			kn->kn_status |= KN_DISABLED;
-		kn_enter_flux(kn);
+		kn_enter_flux(kn, 0);
 
 		error = knote_attach(kn, kq);
 		KQ_UNLOCK(kq);
@@ -1562,15 +1713,24 @@
 	}
 
 	if (kev->flags & EV_DELETE) {
-		kn_enter_flux(kn);
+		/* We have the exclusive flux lock here */
+		kn_enter_flux(kn, 1);
+
+		KN_FLUX_UNLOCK(kn);
 		KQ_UNLOCK(kq);
+
 		knote_drop(kn, td);
 		goto done;
 	}
 
+	/* We have the exclusive lock */
+	kn_enter_flux(kn, 1);
+	KN_FLUX_UNLOCK(kn);
+
+	/* we hold the kq lock and the knote is in flux */
 	if (kev->flags & EV_FORCEONESHOT) {
 		kn->kn_flags |= EV_ONESHOT;
-		KNOTE_ACTIVATE(kn, 1);
+		knote_activate(kn, 1);
 	}
 
 	if ((kev->flags & EV_ENABLE) != 0)
@@ -1579,13 +1739,14 @@
 		kn->kn_status |= KN_DISABLED;
 
 	/*
 	 * The user may change some filter values after the initial EV_ADD,
 	 * but doing so will not reset any filter which has already been
 	 * triggered.
 	 */
 	kn->kn_status |= KN_SCAN;
-	kn_enter_flux(kn);
+
 	KQ_UNLOCK(kq);
+
 	knl = kn_list_lock(kn);
 	kn->kn_kevent.udata = kev->udata;
 	if (!fops->f_isfd && fops->f_touch != NULL) {
@@ -1614,13 +1775,18 @@
 	KQ_LOCK(kq);
 	if (event)
 		kn->kn_status |= KN_ACTIVE;
-	if ((kn->kn_status & (KN_ACTIVE | KN_DISABLED | KN_QUEUED)) ==
-	    KN_ACTIVE)
-		knote_enqueue(kn);
+	if ((kn->kn_status & (KN_ACTIVE | KN_DISABLED | KN_QUEUED)) == KN_ACTIVE)
+		knote_activate(kn, 1);
+
 	kn->kn_status &= ~KN_SCAN;
-	kn_leave_flux(kn);
+
+	KN_FLUX_LOCK(kn);
+	kn_leave_flux(kn, 1);
+	KN_FLUX_WAKEUP(kn, 1);
+	KN_FLUX_UNLOCK(kn);
+
+	KQ_UNLOCK(kq);
 	kn_list_unlock(knl);
-	KQ_UNLOCK_FLUX(kq);
 
 done:
 	KQ_GLOBAL_UNLOCK(&kq_global, haskqglobal);
@@ -1634,26 +1800,219 @@ kqueue_register(struct kqueue *kq, struct kevent *kev, struct thread *td,
 	return (error);
 }
 
+static void
+kevq_thred_init(struct kevq_thred *kevq_th) {
+	mtx_init(&kevq_th->lock, "kevq_th", NULL, MTX_DEF | MTX_DUPOK);
+	TAILQ_INIT(&kevq_th->kevq_tq);
+}
+
+static void
+kevq_thred_destroy(struct kevq_thred *kevq_th) {
+	free(kevq_th->kevq_hash, M_KQUEUE);
+	free(kevq_th, M_KQUEUE);
+}
+
+void
+kevq_thred_drain(struct kevq_thred *kevq_th) {
+	struct kevq *kevq;
+	int error;
+
+	error = 0;
+	while ((kevq = TAILQ_FIRST(&kevq_th->kevq_tq)) != NULL) {
+		/* multithreaded mode; acquire a kevq reference */
+		error = kevq_acquire(kevq);
+		if (!error) {
+			kevq_drain(kevq);
+		}
+	}
+
+	kevq_thred_destroy(kevq_th);
+}
+
+static void
+kevq_init(struct kevq *kevq) {
+	mtx_init(&kevq->lock, "kevq", NULL, MTX_DEF | MTX_DUPOK);
+	TAILQ_INIT(&kevq->kn_head);
+}
+
+static void
+kevq_release(struct kevq* kevq, int locked)
+{
+	if (locked)
+		KEVQ_OWNED(kevq);
+	else
+		KEVQ_LOCK(kevq);
+	kevq->kevq_refcnt--;
+	if (kevq->kevq_refcnt == 1)
+		wakeup(&kevq->kevq_refcnt);
+	if (!locked)
+		KEVQ_UNLOCK(kevq);
+}
+
+static int
+kevq_acquire(struct kevq *kevq)
+{
+	KEVQ_NOTOWNED(kevq);
+
+	int error;
+	error = 0;
+	KEVQ_LOCK(kevq);
+	if ((kevq->kevq_state & KEVQ_CLOSING) == KEVQ_CLOSING) {
+		error = EINVAL;
+	} else {
+		kevq->kevq_refcnt++;
+	}
+	KEVQ_UNLOCK(kevq);
+	return error;
+}
+
+/* a reference to kq should be held */
+static int
+kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp)
+{
+	int error;
+	void* to_free;
+	struct kevq_thred *kevq_th;
+	struct kevq *kevq;
+	struct kevqlist *kevq_list;
+
+	kevq = NULL;
+	error = 0;
+	to_free = NULL;
+	kevq_th = NULL;
+
+	KQ_NOTOWNED(kq);
+
+	if ((kq->kq_state & KQ_CLOSING) == KQ_CLOSING) {
+		return EINVAL;
+	}
+
+	if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) {
+		if (td->td_kevq_thred == NULL) {
+			kevq_th = malloc(sizeof(struct kevq_thred), M_KQUEUE, M_WAITOK | M_ZERO);
+			kevq_thred_init(kevq_th);
+			kevq_th->kevq_hash = hashinit_flags(KEVQ_HASHSIZE, M_KQUEUE, &kevq_th->kevq_hashmask, HASH_WAITOK);
+
+			thread_lock(td);
+			if (td->td_kevq_thred == NULL) {
+				td->td_kevq_thred = kevq_th;
+#ifdef KQ_DEBUG
+				printf("KQUEUE: kevq_acquire_kq(M): allocated kevq_th %p for thread %d\n", kevq_th, td->td_tid);
+#endif
+			} else {
+				to_free = kevq_th;
+				kevq_th = td->td_kevq_thred;
+			}
+			thread_unlock(td);
+			if (to_free != NULL) {
+				free(((struct kevq_thred*)to_free)->kevq_hash, M_KQUEUE);
+				free(to_free, M_KQUEUE);
+			}
+		} else {
+			kevq_th = td->td_kevq_thred;
+		}
+
+		KASSERT(kevq_th != NULL && kevq_th->kevq_hashmask != 0, ("unallocated kevq"));
+
+		KEVQ_TH_LOCK(kevq_th);
+		kevq_list = &kevq_th->kevq_hash[KEVQ_HASH((unsigned long long)kq, kevq_th->kevq_hashmask)];
+		kevq = kevqlist_find(kevq_list, kq);
+		if (kevq == NULL) {
+			/* allocate a kevq and add it to the hash table */
+			/* TODO: sleeping (M_WAITOK) while holding the kevq_th lock is very bad */
+			kevq = malloc(sizeof(struct kevq), M_KQUEUE, M_WAITOK | M_ZERO);
+			kevq_init(kevq);
+			kevq->kq = kq;
+			kevq->kevq_th = kevq_th;
+			SLIST_INSERT_HEAD(kevq_list, kevq, kevq_th_e);
+			TAILQ_INSERT_HEAD(&kevq_th->kevq_tq, kevq, kevq_th_tqe);
+			KEVQ_TH_UNLOCK(kevq_th);
+
+			KQ_LOCK(kq);
+			TAILQ_INSERT_HEAD(&kq->kq_kevqlist, kevq, kq_e);
+			KQ_UNLOCK(kq);
+#ifdef KQ_DEBUG
+			printf("KQUEUE: kevq_acquire_kq(M): allocated kevq %p for thread %d\n", kevq, td->td_tid);
+#endif
+		} else {
+			KEVQ_TH_UNLOCK(kevq_th);
+		}
+
+	} else {
+		if (kq->kq_kevq == NULL) {
+			kevq = malloc(sizeof(struct kevq), M_KQUEUE, M_WAITOK | M_ZERO);
+#ifdef KQ_DEBUG
+			printf("KQUEUE: kevq_acquire_kq(S): allocated kevq %p for kq %p\n", kevq, kq);
+#endif
+			kevq_init(kevq);
+			kevq->kq = kq;
+
+			KQ_LOCK(kq);
+			if (kq->kq_kevq == NULL) {
+				kq->kq_kevq = kevq;
+			} else {
+				to_free = kevq;
+			}
+			KQ_UNLOCK(kq);
+
+			if (to_free != NULL) {
+				free(to_free, M_KQUEUE);
+			}
+		} else {
+			kevq = kq->kq_kevq;
+		}
+	}
+
+	error = kevq_acquire(kevq);
+
+	if (!error) {
+		*kevqp = kevq;
+	}
+
+	return error;
+}
+
 static int
 kqueue_acquire(struct file *fp, struct kqueue **kqp)
 {
-	int error;
 	struct kqueue *kq;
 
-	error = 0;
-
 	kq = fp->f_data;
 	if (fp->f_type != DTYPE_KQUEUE || kq == NULL)
 		return (EBADF);
 	*kqp = kq;
+
 	KQ_LOCK(kq);
-	if ((kq->kq_state & KQ_CLOSING) == KQ_CLOSING) {
-		KQ_UNLOCK(kq);
-		return (EBADF);
+	/* Mark the kqueue as initialized */
+	if ((kq->kq_state & KQ_FLAG_INIT) == 0) {
+		kq->kq_state |= KQ_FLAG_INIT;
 	}
 	kq->kq_refcnt++;
 	KQ_UNLOCK(kq);
+
+	return 0;
+}
+
+static int
+kqueue_acquire_both(struct file *fp, struct thread *td, struct kqueue **kqp, struct kevq **kevqp)
+{
+	int error;
+	struct kqueue *tmp_kq;
+	struct kevq *tmp_kevq;
+
+	error = kqueue_acquire(fp, &tmp_kq);
+
+	if (!error) {
+		error = kevq_acquire_kq(tmp_kq, td, &tmp_kevq);
+	}
+
+	if (error) {
+		kqueue_release(tmp_kq, 0);
+	} else {
+		*kqp = tmp_kq;
+		*kevqp = tmp_kevq;
+	}
+
 	return error;
 }
 
@@ -1785,7 +2144,7 @@ kqueue_task(void *arg, int pending)
 
 /*
 * We treat KN_MARKER knotes as if they are in flux.
 */
 static int
-kqueue_scan(struct kqueue *kq, int maxevents, struct kevent_copyops *k_ops,
+kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
     const struct timespec *tsp, struct kevent *keva, struct thread *td)
 {
 	struct kevent *kevp;
@@ -1826,18 +2185,27 @@
 	} else
 		asbt = 0;
 	marker = knote_alloc(M_WAITOK);
+	knote_xinit(marker);
 	marker->kn_status = KN_MARKER;
-	KQ_LOCK(kq);
-
+	KEVQ_LOCK(kevq);
 retry:
 	kevp = keva;
-	if (kq->kq_count == 0) {
+#ifdef KQ_DEBUG
+	printf("KQUEUE: kqueue_scan: kevq %p has %d events\n", kevq, kevq->kn_count);
+#endif
+	if (kevq->kn_count == 0) {
 		if (asbt == -1) {
 			error = EWOULDBLOCK;
 		} else {
-			kq->kq_state |= KQ_SLEEP;
-			error = msleep_sbt(kq, &kq->kq_lock, PSOCK | PCATCH,
+			kevq->kevq_state |= KEVQ_SLEEP;
+#ifdef KQ_DEBUG
+			printf("KQUEUE: kqueue_scan: thread %d waiting on kevq %p for events\n", td->td_tid, kevq);
+#endif
+			error = msleep_sbt(kevq, &kevq->lock, PSOCK | PCATCH,
 			    "kqread", asbt, rsbt, C_ABSOLUTE);
+#ifdef KQ_DEBUG
+			printf("KQUEUE: kqueue_scan: thread %d woke up on kevq %p for events\n", td->td_tid, kevq);
+#endif
 		}
 		if (error == 0)
 			goto retry;
@@ -1849,34 +2217,46 @@
 		goto done;
 	}
 
-	TAILQ_INSERT_TAIL(&kq->kq_head, marker, kn_tqe);
+	KEVQ_OWNED(kevq);
+	TAILQ_INSERT_TAIL(&kevq->kn_head, marker, kn_tqe);
 	influx = 0;
-	while (count) {
-		KQ_OWNED(kq);
-		kn = TAILQ_FIRST(&kq->kq_head);
+	while (count) {
+		KEVQ_OWNED(kevq);
+		kn = TAILQ_FIRST(&kevq->kn_head);
+
+		KN_FLUX_LOCK(kn);
 		if ((kn->kn_status == KN_MARKER && kn != marker) ||
 		    kn_in_flux(kn)) {
 			if (influx) {
 				influx = 0;
-				KQ_FLUX_WAKEUP(kq);
+				KN_FLUX_WAKEUP(kn, 1);
 			}
-			kq->kq_state |= KQ_FLUXWAIT;
-			error = msleep(kq, &kq->kq_lock, PSOCK,
+			kn->kn_fluxwait = 1;
+			KN_FLUX_UNLOCK(kn);
+			error = msleep(kn, &kevq->lock, PSOCK,
 			    "kqflxwt", 0);
 			continue;
 		}
 
-		TAILQ_REMOVE(&kq->kq_head, kn, kn_tqe);
+		/* Now we have exclusive access to kn */
+		TAILQ_REMOVE(&kevq->kn_head, kn, kn_tqe);
+#ifdef KQ_DEBUG
+		printf("KQUEUE: kqueue_scan: thread %d on kevq %p dequeued knote %p\n", td->td_tid, kevq, kn);
+#endif
 		if ((kn->kn_status & KN_DISABLED) == KN_DISABLED) {
 			kn->kn_status &= ~KN_QUEUED;
-			kq->kq_count--;
+			kevq->kn_count--;
+			KN_FLUX_UNLOCK(kn);
 			continue;
 		}
 		if (kn == marker) {
-			KQ_FLUX_WAKEUP(kq);
-			if (count == maxevents)
+			/* We are dequeuing our marker; wake up threads waiting on it */
+			KN_FLUX_WAKEUP(kn, 1);
+			KN_FLUX_UNLOCK(kn);
+			if (count == maxevents) {
 				goto retry;
+			}
 			goto done;
 		}
 		KASSERT(!kn_in_flux(kn),
@@ -1884,54 +2264,59 @@
 
 		if ((kn->kn_flags & EV_DROP) == EV_DROP) {
 			kn->kn_status &= ~KN_QUEUED;
-			kn_enter_flux(kn);
-			kq->kq_count--;
-			KQ_UNLOCK(kq);
+			kn_enter_flux(kn, 1);
+			kevq->kn_count--;
+			KN_FLUX_UNLOCK(kn);
+			KEVQ_UNLOCK(kevq);
 			/*
 			 * We don't need to lock the list since we've
 			 * marked it as in flux.
 			 */
 			knote_drop(kn, td);
-			KQ_LOCK(kq);
+			KEVQ_LOCK(kevq);
 			continue;
 		} else if ((kn->kn_flags & EV_ONESHOT) == EV_ONESHOT) {
 			kn->kn_status &= ~KN_QUEUED;
-			kn_enter_flux(kn);
-			kq->kq_count--;
-			KQ_UNLOCK(kq);
+			kn_enter_flux(kn, 1);
+			kevq->kn_count--;
+			KN_FLUX_UNLOCK(kn);
+			KEVQ_UNLOCK(kevq);
 			/*
 			 * We don't need to lock the list since we've
 			 * marked the knote as being in flux.
 			 */
 			*kevp = kn->kn_kevent;
 			knote_drop(kn, td);
-			KQ_LOCK(kq);
+			KEVQ_LOCK(kevq);
 			kn = NULL;
 		} else {
 			kn->kn_status |= KN_SCAN;
-			kn_enter_flux(kn);
-			KQ_UNLOCK(kq);
-			if ((kn->kn_status & KN_KQUEUE) == KN_KQUEUE)
+			kn_enter_flux(kn, 1);
+			KN_FLUX_UNLOCK(kn);
+			KEVQ_UNLOCK(kevq);
+			if ((kn->kn_status & KN_KQUEUE) == KN_KQUEUE) {
+				/* TODO: we are waiting for another kqueue */
 				KQ_GLOBAL_LOCK(&kq_global, haskqglobal);
+			}
 			knl = kn_list_lock(kn);
 			if (kn->kn_fop->f_event(kn, 0) == 0) {
-				KQ_LOCK(kq);
+				KEVQ_LOCK(kevq);
 				KQ_GLOBAL_UNLOCK(&kq_global, haskqglobal);
 				kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE |
 				    KN_SCAN);
-				kn_leave_flux(kn);
-				kq->kq_count--;
+				kn_leave_flux(kn, 0);
+				kevq->kn_count--;
 				kn_list_unlock(knl);
 				influx = 1;
 				continue;
 			}
-			touch = (!kn->kn_fop->f_isfd &&
-			    kn->kn_fop->f_touch != NULL);
+			touch = (!kn->kn_fop->f_isfd && kn->kn_fop->f_touch != NULL);
 			if (touch)
 				kn->kn_fop->f_touch(kn, kevp, EVENT_PROCESS);
 			else
 				*kevp = kn->kn_kevent;
-			KQ_LOCK(kq);
+			KEVQ_LOCK(kevq);
 			KQ_GLOBAL_UNLOCK(&kq_global, haskqglobal);
 			if (kn->kn_flags & (EV_CLEAR | EV_DISPATCH)) {
 				/*
@@ -1945,12 +2330,12 @@
 				if (kn->kn_flags & EV_DISPATCH)
 					kn->kn_status |= KN_DISABLED;
 				kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE);
-				kq->kq_count--;
+				kevq->kn_count--;
 			} else
-				TAILQ_INSERT_TAIL(&kq->kq_head, kn, kn_tqe);
+				TAILQ_INSERT_TAIL(&kevq->kn_head, kn, kn_tqe);
 
 			kn->kn_status &= ~KN_SCAN;
-			kn_leave_flux(kn);
+			kn_leave_flux(kn, 0);
 			kn_list_unlock(knl);
 			influx = 1;
 		}
@@ -1962,22 +2347,28 @@
 
 		if (nkev == KQ_NEVENTS) {
 			influx = 0;
-			KQ_UNLOCK_FLUX(kq);
+			KN_FLUX_WAKEUP(kn, 0);
+			KEVQ_UNLOCK(kevq);
 			error = k_ops->k_copyout(k_ops->arg, keva, nkev);
 			nkev = 0;
 			kevp = keva;
-			KQ_LOCK(kq);
+			KEVQ_LOCK(kevq);
 			if (error)
 				break;
 		}
 	}
-	TAILQ_REMOVE(&kq->kq_head, marker, kn_tqe);
+	TAILQ_REMOVE(&kevq->kn_head, marker, kn_tqe);
done:
-	KQ_OWNED(kq);
-	KQ_UNLOCK_FLUX(kq);
+	KEVQ_OWNED(kevq);
+	KEVQ_UNLOCK(kevq);
+
+	if (kn != NULL) {
+		KN_FLUX_WAKEUP(kn, 0);
+	}
+
 	knote_free(marker);
done_nl:
-	KQ_NOTOWNED(kq);
+	KEVQ_NOTOWNED(kevq);
 	if (nkev != 0)
 		error = k_ops->k_copyout(k_ops->arg, keva, nkev);
 	td->td_retval[0] = maxevents - count;
@@ -2028,6 +2419,19 @@ kqueue_ioctl(struct file *fp, u_long cmd, void *data,
 		return (0);
 	}
 #endif
+	struct kqueue *kq;
+
+	kq = fp->f_data;
+	switch (cmd) {
+	case FKQMULTI:
+		if ((kq->kq_state & KQ_FLAG_INIT) == KQ_FLAG_INIT) {
+			return (EINVAL);
+		} else {
+			KQ_LOCK(kq);
+			kq->kq_state |= (KQ_FLAG_INIT | KQ_FLAG_MULTI);
+			KQ_UNLOCK(kq);
+			return (0);
+		}
+	}
 
 	return (ENOTTY);
 }
 
@@ -2038,24 +2442,34 @@ kqueue_poll(struct file *fp, int events, struct ucred *active_cred,
     struct thread *td)
 {
 	struct kqueue *kq;
+	struct kevq *kevq;
 	int revents = 0;
 	int error;
 
-	if ((error = kqueue_acquire(fp, &kq)))
+	if ((error = kqueue_acquire_both(fp, td, &kq, &kevq)))
 		return POLLERR;
 
-	KQ_LOCK(kq);
-	if (events & (POLLIN | POLLRDNORM)) {
-		if (kq->kq_count) {
-			revents |= events & (POLLIN | POLLRDNORM);
+	if ((kq->kq_state & KQ_FLAG_MULTI) != KQ_FLAG_MULTI) {
+		/* TODO: support select()? do we really need it in multithreaded mode? */
+		KQ_LOCK(kq);
+		if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) {
+			revents = 0;
 		} else {
-			selrecord(td, &kq->kq_sel);
-			if (SEL_WAITING(&kq->kq_sel))
-				kq->kq_state |= KQ_SEL;
+			if (events & (POLLIN | POLLRDNORM)) {
+				if (kq->kq_kevq->kn_count) {
+					revents |= events & (POLLIN | POLLRDNORM);
+				} else {
+					selrecord(td, &kq->kq_sel);
+					if (SEL_WAITING(&kq->kq_sel))
+						kq->kq_state |= KQ_SEL;
+				}
+			}
 		}
+
+		kqueue_release(kq, 1);
 		KQ_UNLOCK(kq);
+		kevq_release(kevq, 0);
 	}
-	kqueue_release(kq, 1);
-	KQ_UNLOCK(kq);
 	return (revents);
 }
 
@@ -2078,10 +2492,120 @@ kqueue_stat(struct file *fp, struct stat *st, struct ucred *active_cred,
 }
 
 static void
-kqueue_drain(struct kqueue *kq, struct thread *td)
+kevq_destroy(struct kevq *kevq)
+{
+	free(kevq, M_KQUEUE);
+}
+
+/*
+ * Called on every kevq when the kqueue exits, and also when a thread
+ * exits or crashes.  A reference count must be held.
+ */
+void
+kevq_drain(struct kevq *kevq)
+{
+	struct kqueue *kq;
+	struct knote *kn;
+	struct kevq *new_kevq;
+	struct kevqlist *kevq_list;
+#ifdef KQ_DEBUG
+	printf("KQUEUE: kevq_drain for %p with %d knotes\n", kevq, kevq->kn_count);
+#endif
+	kq = kevq->kq;
+
+	KQ_NOTOWNED(kq);
+	KEVQ_NOTOWNED(kevq);
+
+	KEVQ_LOCK(kevq);
+	/* now the refcnt is 1 */
+	if (kevq->kevq_state == KEVQ_CLOSING) {
+		/* already closing; drop our reference */
+		kevq_release(kevq, 1);
+		KEVQ_UNLOCK(kevq);
+		return;
+	} else {
+		kevq->kevq_state |= KEVQ_CLOSING;
+	}
+
+	if (kevq->kevq_refcnt > 1)
+		msleep(&kevq->kevq_refcnt, &kevq->lock, PSOCK, "kevqclose1", 0);
+
+	KEVQ_OWNED(kevq);
+	KASSERT(kevq->kevq_refcnt == 1, ("other refs of kevq are out there!"));
+
+	/* drain all knotes on the kevq */
+	TAILQ_FOREACH(kn, &kevq->kn_head, kn_tqe) {
+
retry:
+		KEVQ_OWNED(kevq);
+		KN_FLUX_LOCK(kn);
+		/* Wait for kn to stabilize */
+		if (kn_in_flux(kn)) {
+			kn->kn_fluxwait = 1;
+			KN_FLUX_UNLOCK(kn);
+			msleep(kn, &kevq->lock, PSOCK, "kevqclo1", 0);
+			goto retry;
+		}
+
+		KN_FLUX_OWNED(kn);
+		KASSERT(!kn_in_flux(kn), ("knote is still in flux"));
+
+		kn_enter_flux(kn, 1);
+		KN_FLUX_UNLOCK(kn);
+		knote_dequeue(kn);
+
+		if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI && (kq->kq_state & KQ_CLOSING) != KQ_CLOSING) {
+			KEVQ_UNLOCK(kevq);
+
+			/*
+			 * Reschedule the knote, but only if the kq is in
+			 * multithreaded mode and the outer kq is not closing
+			 * (i.e. this drain was caused by thread exit/crash).
+			 */
+			KQ_LOCK(kq);
+			new_kevq = knote_sched(kn);
+			KEVQ_LOCK(new_kevq);
+			knote_enqueue(kn, new_kevq);
+			KEVQ_UNLOCK(new_kevq);
+			KQ_UNLOCK(kq);
+
+			/* relock the current kevq */
+			KEVQ_LOCK(kevq);
+		}
+
+		KN_FLUX_LOCK(kn);
+		kn_leave_flux(kn, 1);
+		KN_FLUX_WAKEUP(kn, 1);
+		KN_FLUX_UNLOCK(kn);
+	}
+
+	KASSERT(kevq->kn_count == 0, ("some knotes are left"));
+	KEVQ_UNLOCK(kevq);
+
+	if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) {
+		/* drop from the kq if in multithreaded mode */
+		KQ_LOCK(kq);
+		TAILQ_REMOVE(&kq->kq_kevqlist, kevq, kq_e);
+		KQ_UNLOCK(kq);
+
+		KEVQ_TH_LOCK(kevq->kevq_th);
+		TAILQ_REMOVE(&kevq->kevq_th->kevq_tq, kevq, kevq_th_tqe);
+		kevq_list = &kevq->kevq_th->kevq_hash[KEVQ_HASH((unsigned long long)kq, kevq->kevq_th->kevq_hashmask)];
+		SLIST_REMOVE(kevq_list, kevq, kevq, kevq_th_e);
+		KEVQ_TH_UNLOCK(kevq->kevq_th);
+	}
+
+	/* delete the kevq */
+	kevq_destroy(kevq);
+}
+
+/*
+ * The kevq argument is only used when the kq is in single-threaded mode;
+ * in that case the kevq has already been referenced by the caller.
+ */
+static void
+kqueue_drain(struct kqueue *kq, struct kevq *kevq, struct thread *td)
 {
 	struct knote *kn;
 	int i;
+	int error;
+
+#ifdef KQ_DEBUG
+	printf("KQUEUE: kqueue_drain on %p. args kevq %p\n", kq, kevq);
+#endif
 
 	KQ_LOCK(kq);
 
@@ -2096,14 +2620,36 @@
 	KASSERT(knlist_empty(&kq->kq_sel.si_note),
 	    ("kqueue's knlist not empty"));
 
+	KQ_UNLOCK(kq);
+
+	error = 0;
+	if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) {
+		/* drain all kevqs belonging to the kq */
+		TAILQ_FOREACH(kevq, &kq->kq_kevqlist, kq_e) {
+			error = kevq_acquire(kevq);
+			if (!error) {
+				kevq_drain(kevq);
+			}
+		}
+	} else {
+		if (kevq != NULL) {
+			kevq_drain(kevq);
+		}
+	}
+	KQ_LOCK(kq);
+
 	for (i = 0; i < kq->kq_knlistsize; i++) {
 		while ((kn = SLIST_FIRST(&kq->kq_knlist[i])) != NULL) {
+			KN_FLUX_LOCK(kn);
 			if (kn_in_flux(kn)) {
-				kq->kq_state |= KQ_FLUXWAIT;
-				msleep(kq, &kq->kq_lock, PSOCK, "kqclo1", 0);
+				kn->kn_fluxwait = 1;
+				KN_FLUX_UNLOCK(kn);
+				msleep(kn, &kq->kq_lock, PSOCK, "kqclo1", 0);
 				continue;
 			}
-			kn_enter_flux(kn);
+			kn_enter_flux(kn, 1);
+			KN_FLUX_UNLOCK(kn);
 			KQ_UNLOCK(kq);
 			knote_drop(kn, td);
 			KQ_LOCK(kq);
@@ -2112,13 +2658,15 @@
 	if (kq->kq_knhashmask != 0) {
 		for (i = 0; i <= kq->kq_knhashmask; i++) {
 			while ((kn = SLIST_FIRST(&kq->kq_knhash[i])) != NULL) {
+				KN_FLUX_LOCK(kn);
 				if (kn_in_flux(kn)) {
-					kq->kq_state |= KQ_FLUXWAIT;
-					msleep(kq, &kq->kq_lock, PSOCK,
-					    "kqclo2", 0);
+					kn->kn_fluxwait = 1;
+					KN_FLUX_UNLOCK(kn);
+					msleep(kn, &kq->kq_lock, PSOCK, "kqclo2", 0);
 					continue;
 				}
-				kn_enter_flux(kn);
+				kn_enter_flux(kn, 1);
+				KN_FLUX_UNLOCK(kn);
 				KQ_UNLOCK(kq);
 				knote_drop(kn, td);
 				KQ_LOCK(kq);
@@ -2163,13 +2711,22 @@ static int
 kqueue_close(struct file *fp, struct thread *td)
 {
 	struct kqueue *kq = fp->f_data;
+	struct kevq *kevq = NULL;
 	struct filedesc *fdp;
 	int error;
 	int filedesc_unlock;
 
-	if ((error = kqueue_acquire(fp, &kq)))
-		return error;
-	kqueue_drain(kq, td);
+	if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) {
+		/* only acquire the kqueue lock here */
+		if ((error = kqueue_acquire(fp, &kq)))
+			return error;
+	} else {
+		/* acquire both */
+		if ((error = kqueue_acquire_both(fp, td, &kq, &kevq)))
+			return error;
+	}
+
+	kqueue_drain(kq, kevq, td);
 
 	/*
 	 * We could be called due to the knote_drop() doing fdrop(),
@@ -2205,15 +2762,20 @@ kqueue_fill_kinfo(struct file *fp, struct kinfo_file *kif,
 	struct filedesc *fdp)
 	return (0);
 }
 
+static void
+kevq_wakeup(struct kevq* kevq)
+{
+	KEVQ_OWNED(kevq);
+	if ((kevq->kevq_state & KEVQ_SLEEP) == KEVQ_SLEEP) {
+		kevq->kevq_state &= ~KEVQ_SLEEP;
+		wakeup(kevq);
+	}
+}
+
 static void
 kqueue_wakeup(struct kqueue *kq)
 {
 	KQ_OWNED(kq);
-
-	if ((kq->kq_state & KQ_SLEEP) == KQ_SLEEP) {
-		kq->kq_state &= ~KQ_SLEEP;
-		wakeup(kq);
-	}
 	if ((kq->kq_state & KQ_SEL) == KQ_SEL) {
 		selwakeuppri(&kq->kq_sel, PSOCK);
 		if (!SEL_WAITING(&kq->kq_sel))
@@ -2239,7 +2801,7 @@ knote(struct knlist *list, long hint, int lockflags)
 {
 	struct kqueue *kq;
 	struct knote *kn, *tkn;
-	int error;
+	int require_kqlock;
 
 	if (list == NULL)
 		return;
@@ -2257,8 +2819,7 @@ knote(struct knlist *list, long hint, int lockflags)
 	 * or other threads could remove events.
 	 */
 	SLIST_FOREACH_SAFE(kn, &list->kl_list, kn_selnext, tkn) {
-		kq = kn->kn_kq;
-		KQ_LOCK(kq);
+		KN_FLUX_LOCK(kn);
 		if (kn_in_flux(kn) && (kn->kn_status & KN_SCAN) == 0) {
 			/*
 			 * Do not process the influx notes, except for
 			 * the influx coming from the kq unlock in the
 			 * kqueue_scan().  In the later case, the other
 			 * fragment in kqueue_scan() locks the knlist,
 			 * and cannot proceed until we finished.
 			 */
-			KQ_UNLOCK(kq);
-		} else if ((lockflags & KNF_NOKQLOCK) != 0) {
-			kn_enter_flux(kn);
-			KQ_UNLOCK(kq);
-			error = kn->kn_fop->f_event(kn, hint);
-			KQ_LOCK(kq);
-			kn_leave_flux(kn);
-			if (error)
-				KNOTE_ACTIVATE(kn, 1);
-			KQ_UNLOCK_FLUX(kq);
+			KN_FLUX_UNLOCK(kn);
 		} else {
+			kq = kn->kn_kq;
+			kn_enter_flux(kn, 1);
+			KN_FLUX_UNLOCK(kn);
+
+			require_kqlock = ((lockflags & KNF_NOKQLOCK) == 0);
+
+			if (require_kqlock)
+				KQ_LOCK(kq);
+
 			if (kn->kn_fop->f_event(kn, hint))
-				KNOTE_ACTIVATE(kn, 1);
-			KQ_UNLOCK(kq);
+				knote_activate(kn, require_kqlock);
+
+			if (require_kqlock)
+				KQ_UNLOCK(kq);
+
+			KN_FLUX_LOCK(kn);
+			kn_leave_flux(kn, 1);
+			KN_FLUX_WAKEUP(kn, 1);
+			KN_FLUX_UNLOCK(kn);
 		}
 	}
 	if ((lockflags & KNF_LISTLOCKED) == 0)
 		list->kl_unlock(list->kl_lockarg);
 }
 
+/*
+ * Activate a knote.
+ * The knote should be marked in flux and its flux lock should not be owned.
+ */
+static void
+knote_activate(struct knote *kn, int haskqlock)
+{
+	struct kevq *kevq;
+	struct kqueue *kq;
+
+#ifdef KQ_DEBUG
+	printf("KQUEUE: knote_activate: kn %p\n", kn);
+#endif
+	kq = kn->kn_kq;
+
+	if (haskqlock) {
+		KQ_OWNED(kq);
+	} else {
+		KQ_LOCK(kq);
+	}
+
+	KN_FLUX_NOTOWNED(kn);
+	KASSERT(kn_in_flux(kn), ("knote %p not in flux", kn));
+
+	kevq = knote_sched(kn);
+
+	(kn)->kn_status |= KN_ACTIVE;
+
+	if (((kn)->kn_status & (KN_QUEUED | KN_DISABLED)) == 0) {
+#ifdef KQ_DEBUG
+		printf("KQUEUE: knote_activate: kn %p queued to kevq %p\n", kn, kevq);
+#endif
+		KEVQ_LOCK(kevq);
+		knote_enqueue(kn, kevq);
+		KEVQ_UNLOCK(kevq);
+	}
+
+	if (!haskqlock) {
+		KQ_UNLOCK(kq);
+	}
+}
+
 /*
  * add a knote to a knlist
  */
@@ -2514,18 +3124,22 @@ knlist_cleardel(struct knlist *knl, struct thread *td, int islocked, int killkn)
 	SLIST_FOREACH_SAFE(kn, &knl->kl_list, kn_selnext, kn2) {
 		kq = kn->kn_kq;
 		KQ_LOCK(kq);
+		KN_FLUX_LOCK(kn);
 		if (kn_in_flux(kn)) {
+			KN_FLUX_UNLOCK(kn);
 			KQ_UNLOCK(kq);
 			continue;
 		}
 		knlist_remove_kq(knl, kn, 1, 1);
 		if (killkn) {
-			kn_enter_flux(kn);
+			kn_enter_flux(kn, 1);
+			KN_FLUX_UNLOCK(kn);
 			KQ_UNLOCK(kq);
 			knote_drop_detached(kn, td);
 		} else {
 			/* Make sure cleared knotes disappear soon */
 			kn->kn_flags |= EV_EOF | EV_ONESHOT;
+			KN_FLUX_UNLOCK(kn);
 			KQ_UNLOCK(kq);
 		}
 		kq = NULL;
@@ -2536,10 +3150,12 @@ knlist_cleardel(struct knlist *knl, struct thread *td, int islocked, int killkn)
 		kn = SLIST_FIRST(&knl->kl_list);
 		kq = kn->kn_kq;
 		KQ_LOCK(kq);
+		KN_FLUX_LOCK(kn);
 		KASSERT(kn_in_flux(kn), ("knote removed w/o list lock"));
 		knl->kl_unlock(knl->kl_lockarg);
-		kq->kq_state |= KQ_FLUXWAIT;
-		msleep(kq, &kq->kq_lock, PSOCK | PDROP, "kqkclr", 0);
+		kn->kn_fluxwait = 1;
+		KN_FLUX_UNLOCK(kn);
+		msleep(kn, &kq->kq_lock, PSOCK | PDROP, "kqkclr", 0);
 		kq = NULL;
 		goto again;
 	}
@@ -2578,21 +3194,26 @@ knote_fdclose(struct thread *td, int fd)
 		influx = 0;
 		while (kq->kq_knlistsize > fd &&
 		    (kn = SLIST_FIRST(&kq->kq_knlist[fd])) != NULL) {
+			KQ_OWNED(kq);
+			KN_FLUX_LOCK(kn);
 			if (kn_in_flux(kn)) {
 				/* someone else might be waiting on our knote */
 				if (influx)
-					wakeup(kq);
-				kq->kq_state |= KQ_FLUXWAIT;
-				msleep(kq, &kq->kq_lock, PSOCK, "kqflxwt", 0);
+					KN_FLUX_WAKEUP(kn, 1);
+				kn->kn_fluxwait = 1;
+				KN_FLUX_UNLOCK(kn);
+				msleep(kn, &kq->kq_lock, PSOCK, "kqflxwt", 0);
 				goto again;
 			}
-			kn_enter_flux(kn);
+			kn_enter_flux(kn, 1);
+			KN_FLUX_UNLOCK(kn);
 			KQ_UNLOCK(kq);
+			/* XXX: shouldn't influx be 0 here? */
 			influx = 1;
 			knote_drop(kn, td);
 			KQ_LOCK(kq);
 		}
-		KQ_UNLOCK_FLUX(kq);
+		KQ_UNLOCK(kq);
 	}
 }
 
@@ -2632,15 +3253,18 @@ static void
 knote_drop_detached(struct knote *kn, struct thread *td)
 {
 	struct kqueue *kq;
+	struct kevq *kevq;
 	struct klist *list;
 
 	kq = kn->kn_kq;
+	kevq = kn->kn_kevq;
 
 	KASSERT((kn->kn_status & KN_DETACHED) != 0,
 	    ("knote %p still attached", kn));
 	KQ_NOTOWNED(kq);
 
 	KQ_LOCK(kq);
+
 	KASSERT(kn->kn_influx == 1,
 	    ("knote_drop called on %p with influx %d", kn, kn->kn_influx));
 
@@ -2651,9 +3275,14 @@ knote_drop_detached(struct knote *kn, struct thread *td)
 	if (!SLIST_EMPTY(list))
 		SLIST_REMOVE(list, kn, knote, kn_link);
-	if (kn->kn_status & KN_QUEUED)
+
+	KQ_UNLOCK(kq);
+
+	if (kn->kn_status & KN_QUEUED) {
+		KEVQ_LOCK(kevq);
 		knote_dequeue(kn);
-	KQ_UNLOCK_FLUX(kq);
+		KEVQ_UNLOCK(kevq);
+	}
 
 	if (kn->kn_fop->f_isfd) {
 		fdrop(kn->kn_fp, td);
@@ -2664,31 +3293,90 @@ knote_drop_detached(struct knote *kn, struct thread *td)
 	knote_free(kn);
 }
 
-static void
-knote_enqueue(struct knote *kn)
+static struct kevq*
+knote_sched(struct knote *kn)
 {
 	struct kqueue *kq = kn->kn_kq;
+	struct kevq *each_kevq;
+
+	KQ_OWNED(kq);
+	KASSERT(kn_in_flux(kn), ("kn not in flux"));
+
+	if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) {
+		if ((kn->kn_flags & EV_AFFINITY) == EV_AFFINITY) {
+			return kn->kn_org_kevq;
+		} else {
+			each_kevq = kq->kq_ckevq;
+			while (1) {
+				if (each_kevq == NULL) {
+					each_kevq = TAILQ_FIRST(&kq->kq_kevqlist);
+				} else {
+					each_kevq = TAILQ_NEXT(each_kevq, kq_e);
+				}
+				if (each_kevq != NULL &&
+				    (each_kevq->kevq_state & KEVQ_CLOSING) == 0) {
+					/* TODO: this can loop forever if all kevqs are closing */
+					kq->kq_ckevq = each_kevq;
+					return each_kevq;
+				}
+			}
+		}
+	} else {
+		return kq->kq_kevq;
+	}
+
+	KASSERT(0, ("Shouldn't get here"));
+	return NULL;
+}
+
+static void
+knote_enqueue(struct knote *kn, struct kevq *kevq)
+{
+	struct kqueue *kq;
+	kq = kn->kn_kq;
+
+#ifdef KQ_DEBUG
+	printf("KQUEUE: knote_enqueue: kn %p to kevq %p\n", kn, kevq);
+#endif
+
+	/* TODO: optimize locking; we don't really need the KQ lock here except for kqueue_wakeup() */
+	KQ_OWNED(kq);
+	KEVQ_OWNED(kevq);
 
-	KQ_OWNED(kn->kn_kq);
 	KASSERT((kn->kn_status & KN_QUEUED) == 0, ("knote already queued"));
+	KASSERT((kevq->kevq_state & KEVQ_CLOSING) == 0, ("kevq already closing"));
 
-	TAILQ_INSERT_TAIL(&kq->kq_head, kn, kn_tqe);
+	kn->kn_kevq = kevq;
 	kn->kn_status |= KN_QUEUED;
-	kq->kq_count++;
+
+	TAILQ_INSERT_TAIL(&kevq->kn_head, kn, kn_tqe);
+	kevq->kn_count++;
+
 	kqueue_wakeup(kq);
+	kevq_wakeup(kevq);
+}
+
+static void
+knote_xinit(struct knote *kn)
+{
+	mtx_init(&kn->kn_fluxlock, "kn_fluxlock", NULL, MTX_DEF | MTX_DUPOK);
 }
 
 static void
 knote_dequeue(struct knote *kn)
 {
-	struct kqueue *kq = kn->kn_kq;
+	struct kevq *kevq = kn->kn_kevq;
 
-	KQ_OWNED(kn->kn_kq);
+	KEVQ_OWNED(kn->kn_kevq);
+
+#ifdef KQ_DEBUG
+	printf("KQUEUE: knote_dequeue: kn %p from kevq %p\n", kn, kevq);
+#endif
 	KASSERT(kn->kn_status & KN_QUEUED, ("knote not queued"));
 
-	TAILQ_REMOVE(&kq->kq_head, kn, kn_tqe);
+	TAILQ_REMOVE(&kevq->kn_head, kn, kn_tqe);
 	kn->kn_status &= ~KN_QUEUED;
-	kq->kq_count--;
+	kn->kn_kevq = NULL;
+	kevq->kn_count--;
 }
 
 static void
@@ -2710,7 +3398,9 @@ knote_alloc(int mflag)
 static void
 knote_free(struct knote *kn)
 {
-
+#ifdef KQ_DEBUG
+	printf("KQUEUE: knote_free: kn %p\n", kn);
+#endif
 	uma_zfree(knote_zone, kn);
 }
 
@@ -2721,6 +3411,7 @@ int
 kqfd_register(int fd, struct kevent *kev, struct thread *td, int mflag)
 {
 	struct kqueue *kq;
+	struct kevq *kevq;
 	struct file *fp;
 	cap_rights_t rights;
 	int error;
 
@@ -2728,11 +3419,12 @@ kqfd_register(int fd, struct kevent *kev, struct thread *td, int mflag)
 	error = fget(td, fd, cap_rights_init(&rights, CAP_KQUEUE_CHANGE), &fp);
 	if (error != 0)
 		return (error);
-	if ((error = kqueue_acquire(fp, &kq)) != 0)
+	if ((error = kqueue_acquire_both(fp, td, &kq, &kevq)) != 0)
 		goto noacquire;
 
-	error = kqueue_register(kq, kev, td, mflag);
+	error = kqueue_register(kq, kevq, kev, td, mflag);
 	kqueue_release(kq, 0);
+	kevq_release(kevq, 0);
 
 noacquire:
 	fdrop(fp, td);
diff --git a/sys/kern/kern_fork.c b/sys/kern/kern_fork.c
index 39307a573bb3..5a886d9cfaaa 100644
--- a/sys/kern/kern_fork.c
+++ b/sys/kern/kern_fork.c
@@ -690,7 +690,7 @@ do_fork(struct thread *td, struct fork_req *fr, struct proc *p2, struct thread *
 	/*
 	 * Tell any interested parties about the new process.
 	 */
-	knote_fork(p1->p_klist, p2->p_pid);
+	knote_fork(p1->p_klist, td, p2->p_pid);
 
 	/*
 	 * Now can be swapped.
diff --git a/sys/kern/kern_thread.c b/sys/kern/kern_thread.c
index 69635f58f67e..c176c43590de 100644
--- a/sys/kern/kern_thread.c
+++ b/sys/kern/kern_thread.c
@@ -56,6 +56,7 @@ __FBSDID("$FreeBSD$");
 #include 
 #include 
 #include 
+#include 
 #ifdef HWPMC_HOOKS
 #include 
 #endif
@@ -444,6 +445,9 @@ thread_free(struct thread *td)
 	cpu_thread_free(td);
 	if (td->td_kstack != 0)
 		vm_thread_dispose(td);
+	if (td->td_kevq_thred != NULL) {
+		kevq_thred_drain(td->td_kevq_thred);
+	}
 	callout_drain(&td->td_slpcallout);
 	uma_zfree(thread_zone, td);
 }
diff --git a/sys/sys/event.h b/sys/sys/event.h
index e2e80f37946e..37e09d8fdfc1 100644
--- a/sys/sys/event.h
+++ b/sys/sys/event.h
@@ -33,6 +33,8 @@
 
 #include 
 #include 
+#include 
+#include 
 
 #define EVFILT_READ		(-1)
 #define EVFILT_WRITE		(-2)
@@ -143,6 +145,10 @@ struct kevent32_freebsd11 {
 #define EV_CLEAR	0x0020		/* clear event state after reporting */
 #define EV_RECEIPT	0x0040		/* force EV_ERROR on success, data=0 */
 #define EV_DISPATCH	0x0080		/* disable event after reporting */
+#define EV_MULTI	0x0200		/* enable multithreaded mode; only
+					 * honored on the first kevent passed
+					 * in, ignored on subsequent kevents */
+#define EV_AFFINITY	0x0400		/* in multithreaded mode, this event has hard affinity for the registering thread */
 
 #define EV_SYSFLAGS	0xF000		/* reserved by system */
 #define	EV_DROP		0x1000		/* note should be dropped */
@@ -220,6 +226,9 @@ struct knote;
 SLIST_HEAD(klist, knote);
 struct kqueue;
 TAILQ_HEAD(kqlist, kqueue);
+struct kevq;
+SLIST_HEAD(kevqlist, kevq);
+
 struct knlist {
 	struct	klist kl_list;
 	void	(*kl_lock)(void *);	/* lock function */
@@ -269,6 +278,10 @@ struct filterops {
 	void	(*f_touch)(struct knote *kn, struct kevent *kev, u_long type);
 };
 
+/* The ioctl to set multithreaded mode */
+#define FKQMULTI	_IOW('f', 89, int)
+
 /*
 * An in-flux knote cannot be dropped from its kq while the kq is
 * unlocked.  If the KN_SCAN flag is not set, a thread can only set
@@ -283,7 +296,9 @@ struct knote {
 	SLIST_ENTRY(knote)	kn_selnext;	/* for struct selinfo */
 	struct	knlist *kn_knlist;	/* f_attach populated */
 	TAILQ_ENTRY(knote)	kn_tqe;
-	struct	kqueue *kn_kq;	/* which queue we are on */
+	struct	kqueue *kn_kq;	/* which kqueue we are on */
+	struct	kevq *kn_org_kevq;	/* the kevq that registered the knote */
+	struct	kevq *kn_kevq;	/* the kevq the knote is on */
 	struct	kevent kn_kevent;
 	void	*kn_hook;
 	int	kn_hookid;
@@ -295,7 +310,9 @@ struct knote {
 #define	KN_MARKER	0x20			/* ignore this knote */
 #define	KN_KQUEUE	0x40			/* this knote belongs to a kq */
 #define	KN_SCAN		0x100			/* flux set in kqueue_scan() */
+	int	kn_fluxwait;
 	int	kn_influx;
+	struct mtx	kn_fluxlock;
 	int	kn_sfflags;	/* saved filter flags */
 	int64_t	kn_sdata;	/* saved data field */
 	union {
@@ -321,6 +338,14 @@ struct kevent_copyops {
 	size_t	kevent_size;
 };
 
+struct kevq_thred {
+	u_long		kevq_hashmask;	/* hash mask for kevqs */
+	struct kevqlist	*kevq_hash;	/* hash table for kevqs */
+	TAILQ_HEAD(, kevq) kevq_tq;
+	struct mtx	lock;		/* the lock for the kevq_thred */
+};
+
+
 struct thread;
 struct proc;
 struct knlist;
@@ -328,7 +353,7 @@ struct mtx;
 struct rwlock;
 
 void	knote(struct knlist *list, long hint, int lockflags);
-void	knote_fork(struct knlist *list, int pid);
+void	knote_fork(struct knlist *list, struct thread *td, int pid);
 struct	knlist *knlist_alloc(struct mtx *lock);
 void	knlist_detach(struct knlist *knl);
 void	knlist_add(struct knlist *knl, struct knote *kn, int islocked);
@@ -352,6 +377,7 @@ int	kqfd_register(int fd, struct kevent *kev, struct thread *p,
 int	kqueue_add_filteropts(int filt, struct filterops *filtops);
 int	kqueue_del_filteropts(int filt);
 
+void	kevq_thred_drain(struct kevq_thred *kevq_th);
 #else	/* !_KERNEL */
 
 #include 
diff --git a/sys/sys/eventvar.h b/sys/sys/eventvar.h
index 1ed6e9fc4b6b..2b93c612e3f5 100644
--- a/sys/sys/eventvar.h
+++ b/sys/sys/eventvar.h
@@ -40,27 +40,46 @@
 #define KQ_NEVENTS	8		/* minimize copy{in,out} calls */
 #define KQEXTENT	256		/* linear growth by this amount */
 
+struct kevq {
+	SLIST_ENTRY(kevq)	kevq_th_e;	/* entry into kevq_thred's hashtable */
+	TAILQ_ENTRY(kevq)	kq_e;		/* entry into kqueue's list */
+	TAILQ_ENTRY(kevq)	kevq_th_tqe;	/* entry into kevq_thred's TAILQ */
+	struct kqueue		*kq;		/* the kq that the kevq belongs to */
+	struct kevq_thred	*kevq_th;	/* the thread that the kevq belongs to */
+	struct mtx		lock;		/* the lock for the kevq */
+	TAILQ_HEAD(, knote)	kn_head;	/* list of pending knotes */
+	int			kn_count;	/* number of pending knotes */
+#define KEVQ_SLEEP	0x01
+#define KEVQ_CLOSING	0x02
+	int			kevq_state;
+	int			kevq_refcnt;
+};
+
 struct kqueue {
 	struct		mtx kq_lock;
 	int		kq_refcnt;
-	TAILQ_ENTRY(kqueue)	kq_list;
-	TAILQ_HEAD(, knote)	kq_head;	/* list of pending event */
-	int		kq_count;		/* number of pending events */
 	struct		selinfo kq_sel;
-	struct		sigio *kq_sigio;
-	struct		filedesc *kq_fdp;
 	int		kq_state;
 #define KQ_SEL		0x01
-#define KQ_SLEEP	0x02
-#define KQ_FLUXWAIT	0x04			/* waiting for a in flux kn */
-#define KQ_ASYNC	0x08
+#define KQ_ASYNC	0x02
+#define KQ_TASKSCHED	0x04			/* task scheduled */
+#define KQ_TASKDRAIN	0x08			/* waiting for task to drain */
 #define KQ_CLOSING	0x10
-#define	KQ_TASKSCHED	0x20			/* task scheduled */
-#define	KQ_TASKDRAIN	0x40			/* waiting for task to drain */
+#define KQ_FLAG_INIT	0x20			/* kqueue has been initialized;
+						 * set after the first kevent
+						 * structure is processed */
+#define KQ_FLAG_MULTI	0x40			/* multithreaded mode */
+	TAILQ_ENTRY(kqueue)	kq_list;
+	struct		sigio *kq_sigio;
+	struct		filedesc *kq_fdp;
 	int		kq_knlistsize;		/* size of knlist */
 	struct		klist *kq_knlist;	/* list of knotes */
 	u_long		kq_knhashmask;		/* size of knhash */
 	struct		klist *kq_knhash;	/* hash table for knotes */
+	/* only set in multithreaded mode */
+	TAILQ_HEAD(, kevq)	kq_kevqlist;	/* list of kevqs interested in the kqueue */
+	struct kevq	*kq_ckevq;		/* current kevq for multithreaded kqueue */
+	/* only set in single-threaded mode */
+	struct kevq	*kq_kevq;
+	/* end only-set */
 	struct		task kq_task;
 	struct		ucred *kq_cred;
 };
diff --git a/sys/sys/proc.h b/sys/sys/proc.h
index 7e67ec48e0ac..fd2c9871274e 100644
--- a/sys/sys/proc.h
+++ b/sys/sys/proc.h
@@ -365,6 +365,7 @@ struct thread {
 	void		*td_lkpi_task;	/* LinuxKPI task struct pointer */
 	struct epoch_tracker *td_et;	/* (k) compat KPI spare tracker */
 	int		td_pmcpend;
+	struct kevq_thred *td_kevq_thred;
 };
 
 struct thread0_storage {
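
Reviewer note, not part of the patch: a minimal userspace sketch of how the new interface is meant to be driven, assuming the FKQMULTI ioctl and the EV_AFFINITY flag behave as added to sys/sys/event.h above. The variable names and error handling are illustrative only.

	#include <sys/types.h>
	#include <sys/event.h>
	#include <sys/ioctl.h>
	#include <err.h>
	#include <unistd.h>

	int
	main(void)
	{
		struct kevent kev;
		int kq, on = 1;

		if ((kq = kqueue()) < 0)
			err(1, "kqueue");

		/*
		 * Switch the kqueue into multithreaded mode. This must happen
		 * before the kqueue is first used: once KQ_FLAG_INIT is set,
		 * the ioctl fails with EINVAL.
		 */
		if (ioctl(kq, FKQMULTI, &on) == -1)
			err(1, "ioctl(FKQMULTI)");

		/*
		 * EV_AFFINITY pins delivery of this event to the kevq of the
		 * registering thread; without it, knote_sched() round-robins
		 * activated knotes across the per-thread kevqs.
		 */
		EV_SET(&kev, STDIN_FILENO, EVFILT_READ, EV_ADD | EV_AFFINITY,
		    0, 0, NULL);
		if (kevent(kq, &kev, 1, NULL, 0, NULL) == -1)
			err(1, "kevent(EV_ADD)");

		/*
		 * Each worker thread would now call kevent(kq, ...) and block
		 * on its own kevq; a thread's kevq is created on its first
		 * kevent() call via kevq_acquire_kq().
		 */
		close(kq);
		return (0);
	}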