diff --git a/sys/kern/kern_event.c b/sys/kern/kern_event.c
index c8d9c9611f11..de7f70d2b2be 100644
--- a/sys/kern/kern_event.c
+++ b/sys/kern/kern_event.c
@@ -1349,6 +1349,14 @@ kqueue_kevent(struct kqueue *kq, struct kevq *kevq, struct thread *td, int nchan
 	struct kevent *kevp, *changes;
 	int i, n, nerrors, error;
 
+	if ((kq->kq_state & KQ_FLAG_MULTI) == 0 && (kevq->kevq_state & KEVQ_RDY) == 0) {
+		/* Mark the global kevq as ready for single threaded mode to close the window between
+		   kqueue_register and kqueue_scan.*/
+		KEVQ_LOCK(kevq);
+		kevq->kevq_state |= KEVQ_RDY;
+		KEVQ_UNLOCK(kevq);
+	}
+
 	nerrors = 0;
 	while (nchanges > 0) {
 		n = nchanges > KQ_NEVENTS ? KQ_NEVENTS : nchanges;
@@ -1389,7 +1397,7 @@ kern_kevent_fp(struct thread *td, struct file *fp, int nchanges, int nevents,
 	struct kqueue *kq;
 	struct kevq *kevq;
 	int error;
-
+
 	error = kqueue_acquire_both(fp, td, &kq, &kevq);
 	if (error != 0)
@@ -1776,8 +1784,6 @@ kqueue_register(struct kqueue *kq, struct kevq *kevq, struct kevent *kev, struct
 
 	KQ_LOCK(kq);
 	if (event)
-		kn->kn_status |= KN_ACTIVE;
-	if ((kn->kn_status & (KN_ACTIVE | KN_DISABLED | KN_QUEUED)) == KN_ACTIVE)
 		knote_activate(kn, 1);
 	kn->kn_status &= ~KN_SCAN;
 
@@ -1840,6 +1846,9 @@ kevq_init(struct kevq *kevq) {
 static void
 kevq_release(struct kevq* kevq, int locked)
 {
+#ifdef KQ_DEBUG
+	printf("KQUEUE: Releasing kevq %p (refcnt = %d)\n", kevq, kevq->kevq_refcnt);
+#endif
 	if (locked)
 		KEVQ_OWNED(kevq);
 	else
@@ -1854,8 +1863,10 @@ kevq_release(struct kevq* kevq, int locked)
 static int
 kevq_acquire(struct kevq *kevq)
 {
+#ifdef KQ_DEBUG
+	printf("KQUEUE: Referencing kevq %p (refcnt = %d)\n", kevq, kevq->kevq_refcnt);
+#endif
 	KEVQ_NOTOWNED(kevq);
-
 	int error;
 	error = 0;
 	KEVQ_LOCK(kevq);
@@ -1875,7 +1886,7 @@ kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp)
 	int error;
 	void* to_free;
 	struct kevq_thred *kevq_th;
-	struct kevq *kevq;
+	struct kevq *kevq, *alloc_kevq;
 	struct kevqlist *kevq_list;
 
 	kevq = NULL;
@@ -1916,30 +1927,50 @@ kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp)
 
 		KASSERT(kevq_th != NULL && kevq_th->kevq_hashmask != 0, ("unallocated kevq"));
 
+		// fast fail
 		KEVQ_TH_LOCK(kevq_th);
 		kevq_list = &kevq_th->kevq_hash[KEVQ_HASH((unsigned long long)kq, kevq_th->kevq_hashmask)];
 		kevq = kevqlist_find(kevq_list, kq);
+		KEVQ_TH_UNLOCK(kevq_th);
+
 		if (kevq == NULL) {
-			// allocate kevq and add to hash table
-			// TODO: very bad sleep while holding the lock
-			kevq = malloc(sizeof(struct kevq), M_KQUEUE, M_WAITOK | M_ZERO);
-			kevq_init(kevq);
-			kevq->kq = kq;
-			kevq->kevq_th = kevq_th;
-			SLIST_INSERT_HEAD(kevq_list, kevq, kevq_th_e);
-			TAILQ_INSERT_HEAD(&kevq_th->kevq_tq, kevq, kevq_th_tqe);
+			// allocate kevq
+			to_free = NULL;
+			alloc_kevq = malloc(sizeof(struct kevq), M_KQUEUE, M_WAITOK | M_ZERO);
+			kevq_init(alloc_kevq);
+			alloc_kevq->kq = kq;
+			alloc_kevq->kevq_th = kevq_th;
+
+#ifdef KQ_DEBUG
+			printf("KQUEUE: kevq_acquire_kq(M): allocated kevq %p for thread %d\n", alloc_kevq, td->td_tid);
+#endif
+
+			KEVQ_TH_LOCK(kevq_th);
+			kevq = kevqlist_find(kevq_list, kq);
+
+			if (kevq == NULL) {
+				kevq = alloc_kevq;
+				// insert kevq to the kevq_th hash table
+				SLIST_INSERT_HEAD(kevq_list, kevq, kevq_th_e);
+				// insert kevq to the kevq_th list, the list is used to drain kevq
+				TAILQ_INSERT_HEAD(&kevq_th->kevq_tq, kevq, kevq_th_tqe);
+
+				KQ_LOCK(kq);
+				// insert to kq's kevq list
+				TAILQ_INSERT_HEAD(&kq->kq_kevqlist, kevq, kq_e);
+				KQ_UNLOCK(kq);
+			} else {
+				to_free = alloc_kevq;
+			}
 			KEVQ_TH_UNLOCK(kevq_th);
-			KQ_LOCK(kq);
-			TAILQ_INSERT_HEAD(&kq->kq_kevqlist, kevq, kq_e);
-			KQ_UNLOCK(kq);
-#ifdef KQ_DEBUG
-			printf("KQUEUE: kevq_acquire_kq(M): allocated kevq %p for thread %d\n", kevq, td->td_tid);
-#endif
-		} else {
-			KEVQ_TH_UNLOCK(kevq_th);
+
+			if (to_free != NULL) {
+				free(to_free, M_KQUEUE);
+			}
 		}
+		KASSERT(kevq != NULL, ("kevq isn't allocated."));
+
 	} else {
 		if (kq->kq_kevq == NULL) {
 			kevq = malloc(sizeof(struct kevq), M_KQUEUE, M_WAITOK | M_ZERO);
@@ -1985,8 +2016,10 @@ kqueue_acquire(struct file *fp, struct kqueue **kqp)
 	*kqp = kq;
 
 	KQ_LOCK(kq);
-	/* Mark the kqueue as initialized */
 	if ((kq->kq_state & KQ_FLAG_INIT) == 0) {
+		/* Mark the kqueue as initialized
+		   TODO: Why not make some locks separate field so we
+		   don't have short critical sections like this */
 		kq->kq_state |= KQ_FLAG_INIT;
 	}
 	kq->kq_refcnt++;
@@ -2190,6 +2223,12 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
 	knote_xinit(marker);
 	marker->kn_status = KN_MARKER;
 	KEVQ_LOCK(kevq);
+
+	if ((kevq->kevq_state & KEVQ_RDY) == 0) {
+		/* Mark the kevq as ready to receive events */
+		kevq->kevq_state |= KEVQ_RDY;
+	}
+
 retry:
 	kevp = keva;
 #ifdef KQ_DEBUG
@@ -2311,6 +2350,9 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
 			kevq->kn_count--;
 			kn_list_unlock(knl);
 			influx = 1;
+#ifdef KQ_DEBUG
+			printf("KQUEUE: kqueue_scan: kn %p not valid anymore for kevq %p\n", kn, kevq);
+#endif
 			continue;
 		}
 		touch = (!kn->kn_fop->f_isfd && kn->kn_fop->f_touch != NULL);
@@ -2333,8 +2375,12 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
 				kn->kn_status |= KN_DISABLED;
 				kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE);
 				kevq->kn_count--;
-			} else
+			} else {
+#ifdef KQ_DEBUG
+				printf("KQUEUE: kqueue_scan: requeued kn %p to kevq %p\n", kn, kevq);
+#endif
 				TAILQ_INSERT_TAIL(&kevq->kn_head, kn, kn_tqe);
+			}
 
 			kn->kn_status &= ~KN_SCAN;
 			kn_leave_flux(kn, 0);
@@ -2415,27 +2461,37 @@ kqueue_ioctl(struct file *fp, u_long cmd, void *data,
 	case FIOSETOWN:
 		return (fsetown(*(int *)data, &kq->kq_sigio));
-
+
 	case FIOGETOWN:
 		*(int *)data = fgetown(&kq->kq_sigio);
 		return (0);
 	}
 #endif
 
 	struct kqueue *kq;
+	int error = 0;
 	kq = fp->f_data;
+#ifdef KQ_DEBUG
+	printf("KQUEUE: ioctl: received: kq %p cmd: 0x%lx", kq, cmd);
+#endif
 	switch (cmd) {
-	case FKQMULTI:
-		if ((kq->kq_state & KQ_FLAG_INIT) == KQ_FLAG_INIT) {
-			return (EINVAL);
-		} else {
-			KQ_LOCK(kq);
-			kq->kq_state |= (KQ_FLAG_INIT | KQ_FLAG_MULTI);
-			KQ_UNLOCK(kq);
-		}
+	case FKQMULTI:
+		if ((kq->kq_state & KQ_FLAG_INIT) == KQ_FLAG_INIT) {
+			error = (EINVAL);
+		} else {
+#ifdef KQ_DEBUG
+			printf("KQUEUE: ioctl: multi flag set for kq %p", kq);
+#endif
+			KQ_LOCK(kq);
+			kq->kq_state |= (KQ_FLAG_INIT | KQ_FLAG_MULTI);
+			KQ_UNLOCK(kq);
+		}
+		break;
+	default:
+		error = (ENOTTY);
 	}
-	return (ENOTTY);
+
+	return error;
 }
 
 /*ARGSUSED*/
@@ -2496,6 +2552,9 @@ kqueue_stat(struct file *fp, struct stat *st, struct ucred *active_cred,
 static void
 kevq_destroy(struct kevq *kevq)
 {
+#ifdef KQ_DEBUG
+	printf("KQUEUE: kevq_destroy for %p \n", kevq);
+#endif
 	free(kevq, M_KQUEUE);
 }
 
@@ -2507,10 +2566,9 @@ kevq_drain(struct kevq *kevq) {
 	struct kqueue *kq;
 	struct knote *kn;
-	struct kevq *new_kevq;
 	struct kevqlist *kevq_list;
 
 #ifdef KQ_DEBUG
-	printf("KQUEUE: kevq_drain for %p with %d knotes\n", kevq, kevq->kn_count);
+	printf("KQUEUE: kevq_drain for %p (refcnt = %d) with %d knotes\n", kevq, kevq->kevq_refcnt, kevq->kn_count);
 #endif
 	kq = kevq->kq;
@@ -2557,16 +2615,7 @@ kevq_drain(struct kevq *kevq)
 
 		if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI && (kq->kq_state & KQ_CLOSING) != KQ_CLOSING) {
 			KEVQ_UNLOCK(kevq);
-
-			// reschedule knote iff the outer kq is not closing (caused by thread exit or crash) and KQ is in multicore mode
-			KQ_LOCK(kq);
-			new_kevq = knote_sched(kn);
-			KEVQ_LOCK(new_kevq);
-			knote_enqueue(kn, new_kevq);
-			KEVQ_UNLOCK(new_kevq);
-			KQ_UNLOCK(kq);
-
-			/* relock the current kevq */
+			knote_activate(kn, 0);
 			KEVQ_LOCK(kevq);
 		}
 
@@ -2628,7 +2677,7 @@ kqueue_drain(struct kqueue *kq, struct kevq *kevq, struct thread *td)
 	error = 0;
 	if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) {
 		/* drain all kevqs belonging to the kq */
-		TAILQ_FOREACH(kevq, &kq->kq_kevqlist, kq_e) {
+		while ((kevq = TAILQ_FIRST(&kq->kq_kevqlist)) != NULL) {
 			error = kevq_acquire(kevq);
 			if (!error) {
 				kevq_drain(kevq);
@@ -2718,7 +2767,7 @@ kqueue_close(struct file *fp, struct thread *td)
 	int error;
 	int filedesc_unlock;
 
-	if ((kq->kq_state | KQ_FLAG_MULTI) == KQ_FLAG_MULTI) {
+	if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) {
 		// only acquire the kqueue lock here
 		if ((error = kqueue_acquire(fp, &kq)))
 			return error;
@@ -2752,7 +2801,9 @@ kqueue_close(struct file *fp, struct thread *td)
 	crfree(kq->kq_cred);
 	free(kq, M_KQUEUE);
 	fp->f_data = NULL;
-
+#ifdef KQ_DEBUG
+	printf("KQUEUE: kqueue_closed for %p.\n", kq);
+#endif
 	return (0);
 }
 
@@ -2821,6 +2872,9 @@ knote(struct knlist *list, long hint, int lockflags)
 	 * or other threads could remove events.
 	 */
 	SLIST_FOREACH_SAFE(kn, &list->kl_list, kn_selnext, tkn) {
+#ifdef KQ_DEBUG
+		printf("KNOTE: knote() scanning kn %p\n", kn);
+#endif
 		KN_FLUX_LOCK(kn);
 		if (kn_in_flux(kn) && (kn->kn_status & KN_SCAN) == 0) {
 			/*
@@ -2881,18 +2935,26 @@ knote_activate(struct knote *kn, int haskqlock)
 
 	KN_FLUX_NOTOWNED(kn);
 	KASSERT(kn_in_flux(kn), ("knote %p not in flux", kn));
-
-	kevq = knote_sched(kn);
-
-	(kn)->kn_status |= KN_ACTIVE;
-
+
+	kn->kn_status |= KN_ACTIVE;
+
+retry:
 	if (((kn)->kn_status & (KN_QUEUED | KN_DISABLED)) == 0) {
-#ifdef KQ_DEBUG
-		printf("KQUEUE: knote_activate: kn %p queued to kevq %p\n", kn, kevq);
-#endif
-		KEVQ_LOCK(kevq);
-		knote_enqueue(kn, kevq);
-		KEVQ_UNLOCK(kevq);
+		kevq = knote_sched(kn);
+
+		if (kevq != NULL) {
+			// if we have a queue to queue the knote
+			KEVQ_LOCK(kevq);
+
+			if ((kevq->kevq_state & KEVQ_CLOSING) != 0) {
+				KEVQ_UNLOCK(kevq);
+				goto retry;
+			}
+
+			knote_enqueue(kn, kevq);
+
+			KEVQ_UNLOCK(kevq);
+		}
 	}
 
 	if (!haskqlock) {
@@ -2906,7 +2968,9 @@ knote_activate(struct knote *kn, int haskqlock)
 void
 knlist_add(struct knlist *knl, struct knote *kn, int islocked)
 {
-
+#ifdef KQ_DEBUG
+	printf("KNLIST: knlist_add kn %p\n", kn);
+#endif
 	KNL_ASSERT_LOCK(knl, islocked);
 	KQ_NOTOWNED(kn->kn_kq);
 	KASSERT(kn_in_flux(kn), ("knote %p not in flux", kn));
@@ -3295,6 +3359,8 @@ knote_drop_detached(struct knote *kn, struct thread *td)
 	knote_free(kn);
 }
 
+
+// if no kevqs are available for queueing, returns NULL
 static struct kevq*
 knote_sched(struct knote *kn)
 {
@@ -3304,30 +3370,63 @@ knote_sched(struct knote *kn)
 	KQ_OWNED(kq);
 	KASSERT(kn_in_flux(kn), ("kn not in flux"));
 
+	// reschedule knotes to available threads
 	if ((kq->kq_state & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) {
 		if ((kn->kn_flags & EV_AFFINITY) == EV_AFFINITY) {
-			return kn->kn_org_kevq;
+#ifdef KQ_DEBUG
+			printf("KQUEUE: knote_sched(M) affinity set: kn %p \n", kn);
+#endif
+			if ((kn->kn_org_kevq->kevq_state & KEVQ_RDY) != 0) {
+				return kn->kn_org_kevq;
+			} else {
+				return NULL;
+			}
 		} else {
 			each_kevq = kq->kq_ckevq;
 			while(1) {
 				if (each_kevq == NULL) {
 					each_kevq = TAILQ_FIRST(&kq->kq_kevqlist);
+					if (each_kevq == NULL) {
+#ifdef KQ_DEBUG
+						printf("KQUEUE: knote_sched(M) no kevqs exist for queueing kn %p, discarding... \n", kn);
+#endif
+						break;
+					}
 				} else {
 					each_kevq = TAILQ_NEXT(each_kevq, kq_e);
+					if (each_kevq == NULL) {
+						continue;
+					}
 				}
-				if ((each_kevq->kevq_state & KEVQ_CLOSING) == KEVQ_CLOSING) {
-					// TODO: could be an infinitely loop if all kevqs are closing
-					kq->kq_ckevq = each_kevq;
-					return each_kevq;
+
+				if ((each_kevq->kevq_state & KEVQ_CLOSING) == 0 && (each_kevq->kevq_state & KEVQ_RDY) != 0) {
+					break;
+				}
+
+				if (each_kevq == kq->kq_ckevq) {
+					// if the previous "if" didn't break
+					// we have traversed the list once and the current kevq is closing
+					// we have no queue to queue the knote
+#ifdef KQ_DEBUG
+					printf("KQUEUE: knote_sched(M) no open kevqs for queueing kn %p, discarding... \n", kn);
+#endif
+					each_kevq = NULL;
+					break;
 				}
 			}
+
+#ifdef KQ_DEBUG
+			printf("KQUEUE: knote_sched(M) next kevq %p for kn %p \n", each_kevq, kn);
+#endif
+			kq->kq_ckevq = each_kevq;
+			return each_kevq;
 		}
 	} else {
+#ifdef KQ_DEBUG
+		printf("KQUEUE: knote_sched(S): kn %p to kevq %p\n", kn, kq->kq_kevq);
+#endif
 		return kq->kq_kevq;
 	}
-
-	KASSERT(0, ("Shouldn't get here"));
-	return NULL;
 }
 
 static void
@@ -3345,7 +3444,7 @@ knote_enqueue(struct knote *kn, struct kevq *kevq)
 	KEVQ_OWNED(kevq);
 
 	KASSERT((kn->kn_status & KN_QUEUED) == 0, ("knote already queued"));
-	KASSERT((kevq->kevq_state & KEVQ_CLOSING) == 0, ("kevq already closing"));
+	KASSERT((kevq->kevq_state & KEVQ_CLOSING) == 0 && (kevq->kevq_state & KEVQ_RDY) != 0, ("kevq already closing or not ready"));
 
 	kn->kn_kevq = kevq;
 	kn->kn_status |= KN_QUEUED;
@@ -3368,7 +3467,7 @@ knote_dequeue(struct knote *kn)
 {
 	struct kevq *kevq = kn->kn_kevq;
 
-	KEVQ_OWNED(kn->kn_kevq);
+	KEVQ_OWNED(kevq);
 #ifdef KQ_DEBUG
 	printf("KQUEUE: knote_dequeue: kn %p from kevq %p\n", kn, kevq);
 #endif
@@ -3401,7 +3500,7 @@ static void
 knote_free(struct knote *kn)
 {
 #ifdef KQ_DEBUG
-	printf("KQUEUE: knote_free: kn %p\n", kn, kevq);
+	printf("KQUEUE: knote_free: kn %p\n", kn);
 #endif
 	uma_zfree(knote_zone, kn);
 }
diff --git a/sys/kern/kern_thread.c b/sys/kern/kern_thread.c
index c176c43590de..20dedb3b9518 100644
--- a/sys/kern/kern_thread.c
+++ b/sys/kern/kern_thread.c
@@ -83,9 +83,9 @@ _Static_assert(offsetof(struct thread, td_flags) == 0xfc,
     "struct thread KBI td_flags");
 _Static_assert(offsetof(struct thread, td_pflags) == 0x104,
     "struct thread KBI td_pflags");
-_Static_assert(offsetof(struct thread, td_frame) == 0x478,
+_Static_assert(offsetof(struct thread, td_frame) == 0x478 + 0x8,
     "struct thread KBI td_frame");
-_Static_assert(offsetof(struct thread, td_emuldata) == 0x530,
+_Static_assert(offsetof(struct thread, td_emuldata) == 0x530 + 0x8,
     "struct thread KBI td_emuldata");
 _Static_assert(offsetof(struct proc, p_flag) == 0xb0,
     "struct proc KBI p_flag");
diff --git a/sys/sys/event.h b/sys/sys/event.h
index 37e09d8fdfc1..5fcf30cb964b 100644
--- a/sys/sys/event.h
+++ b/sys/sys/event.h
@@ -145,10 +145,7 @@ struct kevent32_freebsd11 {
 #define EV_CLEAR	0x0020		/* clear event state after reporting */
 #define EV_RECEIPT	0x0040		/* force EV_ERROR on success, data=0 */
 #define EV_DISPATCH	0x0080		/* disable event after reporting */
-#define EV_MULTI	0x0200		/* enable multithreaded mode, only works for the first kevent passed in
-					 * This flag of subsequent kevents is ignored
-					 */
-#define EV_AFFINITY	0x0400		/* in multithreaded mode, this event has hard affinity for the registering thread */
+#define EV_AFFINITY	0x0200		/* in multithreaded mode, this event has hard affinity for the registering thread */
 
 #define EV_SYSFLAGS	0xF000		/* reserved by system */
 #define EV_DROP		0x1000		/* note should be dropped */
@@ -280,7 +277,7 @@ struct filterops {
 
 /* The ioctl to set multithreaded mode */
-#define FKQMULTI	_IOW('f', 89, int)
+#define FKQMULTI	_IO('f', 89)
 
 /*
  * An in-flux knote cannot be dropped from its kq while the kq is
diff --git a/sys/sys/eventvar.h b/sys/sys/eventvar.h
index 2b93c612e3f5..f9920417ee53 100644
--- a/sys/sys/eventvar.h
+++ b/sys/sys/eventvar.h
@@ -51,6 +51,7 @@ struct kevq {
 	int		kn_count;		/* number of pending knotes */
 #define KEVQ_SLEEP	0x01
 #define KEVQ_CLOSING	0x02
+#define KEVQ_RDY	0x04
 	int		kevq_state;
 	int		kevq_refcnt;
 };
diff --git a/sys/sys/proc.h b/sys/sys/proc.h
index fd2c9871274e..bb4b4fd51617 100644
--- a/sys/sys/proc.h
+++ b/sys/sys/proc.h
@@ -302,6 +302,7 @@ struct thread {
 	int		td_rtcgen;	/* (s) rtc_generation of abs. sleep */
 	size_t		td_vslock_sz;	/* (k) amount of vslock-ed space */
 	struct kcov_info *td_kcov_info;	/* (*) Kernel code coverage data */
+	struct kevq_thred *td_kevq_thred;
 #define	td_endzero td_sigmask
 
 /* Copied during fork1() or create_thread(). */
@@ -365,7 +366,6 @@ struct thread {
 	void		*td_lkpi_task;	/* LinuxKPI task struct pointer */
 	struct epoch_tracker *td_et;	/* (k) compat KPI spare tracker */
 	int		td_pmcpend;
-	struct kevq_thred *td_kevq_thred;
 };
 
 struct thread0_storage {
diff --git a/tests/sys/kqueue/libkqueue/Makefile b/tests/sys/kqueue/libkqueue/Makefile
index 94d198bf9900..10672d2d1466 100644
--- a/tests/sys/kqueue/libkqueue/Makefile
+++ b/tests/sys/kqueue/libkqueue/Makefile
@@ -15,7 +15,8 @@ SRCS.kqtest= \
 	vnode.c \
 	proc.c \
 	signal.c \
+	read_m.c \
 	user.c
 WARNS?=	2
-
+LDADD+=	-lthr
 .include <bsd.test.mk>
diff --git a/tests/sys/kqueue/libkqueue/main.c b/tests/sys/kqueue/libkqueue/main.c
index d2e45c40d894..64a6cb465b25 100644
--- a/tests/sys/kqueue/libkqueue/main.c
+++ b/tests/sys/kqueue/libkqueue/main.c
@@ -29,6 +29,7 @@ extern void test_evfilt_read();
 extern void test_evfilt_signal();
 extern void test_evfilt_vnode();
 extern void test_evfilt_timer();
+extern void test_evfilt_read_m();
 extern void test_evfilt_proc();
 #if HAVE_EVFILT_USER
 extern void test_evfilt_user();
@@ -322,6 +323,7 @@ main(int argc, char **argv)
     int test_signal = 1;
     int test_vnode = 1;
     int test_timer = 1;
+    int test_socket_m = 1;
 #ifdef __FreeBSD__
     int test_user = 1;
 #else
@@ -342,6 +344,8 @@ main(int argc, char **argv)
             test_vnode = 0;
         if (strcmp(argv[0], "--no-user") == 0)
            test_user = 0;
+        if (strcmp(argv[0], "--no-socket_m") == 0)
+            test_socket_m = 0;
         argv++;
         argc--;
     }
@@ -360,6 +364,8 @@ main(int argc, char **argv)
     if (test_socket)
         test_evfilt_read();
+    if (test_socket_m)
+        test_evfilt_read_m();
     if (test_signal)
         test_evfilt_signal();
     if (test_vnode)
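For context, the sketch below shows how a userspace consumer is expected to drive the multi-threaded mode this patch introduces: create a kqueue, flip it into multi-threaded mode with the new FKQMULTI ioctl before the first kevent() call, register events as usual, and let several threads block in kevent() so the kernel can distribute knotes across their per-thread kevqs (read_m.c in the test suite exercises the same path). This is an illustrative sketch, not code from the patch: NWORKERS, worker(), and the use of stdin as the watched descriptor are made-up placeholders, and the program must be linked with the threading library (cf. the LDADD+= -lthr Makefile change).

/*
 * Illustrative sketch only -- not part of the patch.  FKQMULTI and
 * EV_AFFINITY come from the sys/event.h changes above; everything else
 * here is hypothetical.  Runs until interrupted.
 */
#include <sys/types.h>
#include <sys/event.h>
#include <sys/ioctl.h>
#include <sys/time.h>

#include <err.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

#define NWORKERS 4

static int g_kq;

static void *
worker(void *arg)
{
	struct kevent ev;

	(void)arg;
	for (;;) {
		/* Each thread blocks on its own per-thread kevq in the kernel. */
		if (kevent(g_kq, NULL, 0, &ev, 1, NULL) == -1)
			err(1, "kevent(scan)");
		printf("worker got EVFILT_READ on fd %d\n", (int)ev.ident);
	}
	return (NULL);
}

int
main(void)
{
	pthread_t th[NWORKERS];
	struct kevent ev;
	int i;

	if ((g_kq = kqueue()) == -1)
		err(1, "kqueue");

	/* Switch the kqueue into multi-threaded mode before it is first used. */
	if (ioctl(g_kq, FKQMULTI) == -1)
		err(1, "ioctl(FKQMULTI)");

	/*
	 * Register interest.  Adding EV_AFFINITY to the flags would pin the
	 * event to the registering thread's queue instead of letting
	 * knote_sched() pick any ready kevq.
	 */
	EV_SET(&ev, STDIN_FILENO, EVFILT_READ, EV_ADD, 0, 0, NULL);
	if (kevent(g_kq, &ev, 1, NULL, 0, NULL) == -1)
		err(1, "kevent(register)");

	for (i = 0; i < NWORKERS; i++)
		if (pthread_create(&th[i], NULL, worker, NULL) != 0)
			err(1, "pthread_create");
	for (i = 0; i < NWORKERS; i++)
		pthread_join(th[i], NULL);
	return (0);
}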