remove kq refcnt

Oscar Zhao 2019-07-29 23:49:15 -04:00
parent cb4c673500
commit 2fdea6945b
3 changed files with 69 additions and 98 deletions


@@ -141,11 +141,11 @@ static void kevq_thred_destroy(struct kevq_thred *kevq_th);
static void kevq_wakeup(struct kevq* kevq);
static void kevq_init(struct kevq *kevq);
static void kevq_release(struct kevq* kevq, int locked);
static int kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp);
static void kevq_destroy(struct kevq *kevq);
static int kevq_acquire(struct kevq *kevq, int locked);
static void kevq_worksteal(struct kevq *kevq);
void kevq_drain(struct kevq *kevq);
static int kqueue_acquire_kevq(struct file *fp, struct thread *td, struct kqueue **kqp, struct kevq **kevq);
static void knote_xinit(struct knote *kn);
@@ -153,9 +153,7 @@ static int kevent_copyout(void *arg, struct kevent *kevp, int count);
static int kevent_copyin(void *arg, struct kevent *kevp, int count);
static int kqueue_register(struct kqueue *kq, struct kevq *kevq,
struct kevent *kev, struct thread *td, int mflag);
static int kqueue_acquire(struct file *fp, struct kqueue **kqp);
static int kqueue_acquire_both(struct file *fp, struct thread *td, struct kqueue **kqp, struct kevq **kevqp);
static void kqueue_release(struct kqueue *kq, int locked);
static int kqueue_obtain_kevq(struct kqueue *kq, struct thread *td, struct kevq **kevqp);
static void kqueue_destroy(struct kqueue *kq);
static void kqueue_drain(struct kqueue *kq, struct kevq *kevq, struct thread *td);
static int kqueue_expand(struct kqueue *kq, struct filterops *fops,
@@ -1502,13 +1500,12 @@ kern_kevent_fp(struct thread *td, struct file *fp, int nchanges, int nevents,
struct kevq *kevq;
int error;
error = kqueue_acquire_both(fp, td, &kq, &kevq);
error = kqueue_acquire_kevq(fp, td, &kq, &kevq);
if (error != 0)
return (error);
error = kqueue_kevent(kq, kevq, td, nchanges, nevents, k_ops, timeout);
kqueue_release(kq, 0);
kevq_release(kevq, 0);
return (error);
}
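
With the kq refcount gone, a caller pairs a single kqueue_acquire_kevq() with a single kevq_release(); there is no kqueue_release() left to balance. A minimal sketch of the resulting caller pattern, mirroring kern_kevent_fp above (the wrapper name and the NULL timeout are illustrative, not from the source):

static int
example_kevent_caller(struct file *fp, struct thread *td, int nchanges,
    int nevents, struct kevent_copyops *k_ops)
{
	struct kqueue *kq;
	struct kevq *kevq;
	int error;

	/* one acquire: resolves kq from fp and refs this thread's kevq */
	error = kqueue_acquire_kevq(fp, td, &kq, &kevq);
	if (error != 0)
		return (error);

	error = kqueue_kevent(kq, kevq, td, nchanges, nevents, k_ops, NULL);

	/* one release: only the kevq reference is dropped now */
	kevq_release(kevq, 0);
	return (error);
}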
@@ -1529,7 +1526,6 @@ kern_kevent_anonymous(struct thread *td, int nevents,
kevq_init(&kevq);
kq.kq_kevq = &kevq;
kevq.kq = &kq;
kq.kq_refcnt = 1;
kevq.kevq_refcnt = 1;
error = kqueue_kevent(&kq, &kevq, td, nevents, nevents, k_ops, NULL);
// TODO: kevq destroy called here but memory not dynamically allocated
@@ -1621,9 +1617,7 @@ kqueue_fo_release(int filt)
mtx_unlock(&filterops_lock);
}
/*
* A ref to kq (obtained via kqueue_acquire) must be held.
*/
static int
kqueue_register(struct kqueue *kq, struct kevq *kevq, struct kevent *kev, struct thread *td,
int mflag)
@@ -1986,11 +1980,9 @@ kevq_acquire(struct kevq *kevq, int locked)
return error;
}
/* a reference to kq must be held */
static int
kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp)
kqueue_obtain_kevq(struct kqueue *kq, struct thread *td, struct kevq **kevqp)
{
int error;
void* to_free;
struct kevq_thred *kevq_th;
struct kevq *kevq, *alloc_kevq;
@@ -1998,7 +1990,6 @@ kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp)
struct kqdom *kqd;
kevq = NULL;
error = 0;
to_free = NULL;
kevq_th = NULL;
@@ -2018,7 +2009,7 @@ kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp)
thread_lock(td);
if (td->td_kevq_thred == NULL) {
td->td_kevq_thred = kevq_th;
CTR2(KTR_KQ, "kevq_acquire_kq(M): allocated kevq_th %p for thread %d", kevq_th, td->td_tid);
CTR2(KTR_KQ, "kqueue_ensure_kevq(M): allocated kevq_th %p for thread %d", kevq_th, td->td_tid);
} else {
to_free = kevq_th;
kevq_th = td->td_kevq_thred;
@@ -2052,7 +2043,7 @@ kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp)
kqd = kqdom_find(kq->kq_kqd, td->td_oncpu);
alloc_kevq->kevq_kqd = kqd;
CTR4(KTR_KQ, "kevq_acquire_kq(M): allocated kevq %p for thread %d (oncpu = %d), kqdom %d", alloc_kevq, td->td_tid, td->td_oncpu, kqd->id);
CTR4(KTR_KQ, "kqueue_ensure_kevq(M): allocated kevq %p for thread %d (oncpu = %d), kqdom %d", alloc_kevq, td->td_tid, td->td_oncpu, kqd->id);
KQ_LOCK(kq);
KQD_LOCK(kqd);
@@ -2088,14 +2079,11 @@ kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp)
free(to_free, M_KQUEUE);
}
}
KASSERT(kevq != NULL, ("kevq isn't allocated."));
} else {
kevq = kq->kq_kevq;
if (kevq == NULL) {
alloc_kevq = malloc(sizeof(struct kevq), M_KQUEUE, M_WAITOK | M_ZERO);
CTR2(KTR_KQ, "kevq_acquire_kq(S): allocated kevq %p for kq %p", alloc_kevq, kq);
CTR2(KTR_KQ, "kqueue_ensure_kevq(S): allocated kevq %p for kq %p", alloc_kevq, kq);
kevq_init(alloc_kevq);
alloc_kevq->kq = kq;
@@ -2114,36 +2102,16 @@ kevq_acquire_kq(struct kqueue *kq, struct thread *td, struct kevq **kevqp)
}
}
error = kevq_acquire(kevq, 0);
if (!error) {
KASSERT(kevq != NULL, ("kevq isn't allocated."));
*kevqp = kevq;
}
return error;
return 0;
}
static int
kqueue_acquire(struct file *fp, struct kqueue **kqp)
static void
kqueue_check_kqdom(struct kqueue *kq)
{
struct kqueue *kq;
struct kqdom *kqd;
kq = fp->f_data;
if (fp->f_type != DTYPE_KQUEUE || kq == NULL)
return (EBADF);
*kqp = kq;
KQ_LOCK(kq);
if (((kq->kq_state) & KQ_CLOSING) != 0) {
return (EBADF);
}
if ((kq->kq_flags & KQ_FLAG_INIT) == 0) {
kq->kq_flags |= KQ_FLAG_INIT;
}
kq->kq_refcnt++;
KQ_UNLOCK(kq);
struct kqdom* kqd;
if (((kq->kq_flags & KQ_FLAG_MULTI) != 0) && (kq->kq_kqd == NULL)) {
kqd = kqdom_build();
KQ_LOCK(kq);
@@ -2157,45 +2125,49 @@ kqueue_acquire(struct file *fp, struct kqueue **kqp)
kqdom_destroy(kqd);
}
}
return 0;
}
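
kqueue_check_kqdom() uses the same optimistic lazy-initialization idiom seen with the per-thread kevq_thred above: build the object without holding the kq lock, install it under the lock only if nobody raced us, and destroy the spare otherwise. A generic sketch of that idiom follows; the install-or-discard middle of the real function is not shown in this diff, so this is an assumption about its shape, not a copy of it:

static void
kqdom_lazy_init_sketch(struct kqueue *kq)
{
	struct kqdom *kqd;

	if ((kq->kq_flags & KQ_FLAG_MULTI) != 0 && kq->kq_kqd == NULL) {
		kqd = kqdom_build();		/* may sleep; done unlocked */
		KQ_LOCK(kq);
		if (kq->kq_kqd == NULL) {
			kq->kq_kqd = kqd;	/* we won the race: install */
			kqd = NULL;
		}
		KQ_UNLOCK(kq);
		if (kqd != NULL)
			kqdom_destroy(kqd);	/* lost the race: free spare */
	}
}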
static int
kqueue_acquire_both(struct file *fp, struct thread *td, struct kqueue **kqp, struct kevq **kevqp)
kqueue_acquire_kevq(struct file *fp, struct thread *td, struct kqueue **kqp, struct kevq **kevqp)
{
struct kqueue *kq;
struct kevq *kevq;
int error;
struct kqueue *tmp_kq;
struct kevq *tmp_kevq;
error = kqueue_acquire(fp, &tmp_kq);
kq = fp->f_data;
if (fp->f_type != DTYPE_KQUEUE || kq == NULL)
return (EBADF);
*kqp = kq;
if (!error) {
error = kevq_acquire_kq(tmp_kq, td, &tmp_kevq);
}
/* We already know that only one thread can be in kqueue syscall context
* when kqueue_close is called due to file descriptor limitations
*/
KASSERT((kq->kq_state & KQ_CLOSING) == 0, ("kq still in syscall context while closing"));
if (error) {
kqueue_release(tmp_kq, 0);
} else {
*kqp = tmp_kq;
*kevqp = tmp_kevq;
}
return error;
}
static void
kqueue_release(struct kqueue *kq, int locked)
{
if (locked)
KQ_OWNED(kq);
else
/* set the init flag, which blocks other threads from changing the kqueue via ioctl */
if ((kq->kq_flags & KQ_FLAG_INIT) == 0) {
KQ_LOCK(kq);
kq->kq_refcnt--;
if (kq->kq_refcnt == 1)
wakeup(&kq->kq_refcnt);
if (!locked)
kq->kq_flags |= KQ_FLAG_INIT;
KQ_UNLOCK(kq);
}
/* allocate kqdoms if not present */
kqueue_check_kqdom(kq);
error = kqueue_obtain_kevq(kq, td, &kevq);
if (error == 0) {
*kevqp = kevq;
error = kevq_acquire(kevq, 0);
}
/*
 * It is safe to obtain the kevq first and acquire a reference afterwards,
 * because the kevq can only be destroyed either:
 * 1. by our own thread exiting, or
 * 2. by whoever closes the kq, in which case nobody else should be in
 *    kqueue syscall context.
 * Either way, the kevq reference cannot become invalid here.
 */
return error;
}
static void
@@ -2701,8 +2673,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
kevp = keva;
CTR3(KTR_KQ, "kqueue_scan: td %d on kevq %p has %d events", td->td_tid, kevq, kevq->kn_count);
if ((kq->kq_flags & KQ_FLAG_MULTI) != 0 && (kq->kq_sched_flags & KQ_SCHED_WORK_STEALING) != 0 && kevq->kn_count == 0)
{
if ((kq->kq_flags & KQ_FLAG_MULTI) != 0 && (kq->kq_sched_flags & KQ_SCHED_WORK_STEALING) != 0 && kevq->kn_count == 0) {
/* try work stealing */
kevq_worksteal(kevq);
}
@@ -2851,7 +2822,7 @@ kqueue_scan(struct kevq *kevq, int maxevents, struct kevent_copyops *k_ops,
/* this flag is here to prevent a subtle workstealing race: one thread gets an identifier
   and returns; before it can process the event, another thread steals the knote and
   processes the same fd, leaving the first thread with no data available.
Work stealing will avoid stealing knotes with this flag set*/
Work stealing will avoid stealing knotes with this flag set */
kn->kn_status |= KN_REQUEUE;
CTR3(KTR_KQ, "kqueue_scan: requeued kn %p, ident: %d to kevq %p", kn, kn->kn_id,kevq);
TAILQ_INSERT_TAIL(&kevq->kn_head, kn, kn_tqe);
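
The comment above says work stealing will skip knotes carrying KN_REQUEUE. A rough sketch of the kind of scan kevq_worksteal would have to perform; the loop structure, the victim argument, and the locking discipline are assumptions (only the skip-on-KN_REQUEUE and skip-while-in-flux rules come from this diff):

/* caller is assumed to hold the victim kevq's queue lock */
static void
kevq_worksteal_sketch(struct kevq *self, struct kevq *victim)
{
	struct knote *kn, *tkn;

	TAILQ_FOREACH_SAFE(kn, &victim->kn_head, kn_tqe, tkn) {
		KN_FLUX_LOCK(kn);
		if (kn_in_flux(kn) || (kn->kn_status & KN_REQUEUE) != 0) {
			/* being processed or already claimed by a scanner */
			KN_FLUX_UNLOCK(kn);
			continue;
		}
		knote_enter_flux(kn);
		KN_FLUX_UNLOCK(kn);
		/* ... detach kn from victim->kn_head, append to self ... */
	}
}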
@@ -2988,7 +2959,7 @@ kqueue_poll(struct file *fp, int events, struct ucred *active_cred,
int revents = 0;
int error;
if ((error = kqueue_acquire_both(fp, td, &kq, &kevq)))
if ((error = kqueue_acquire_kevq(fp, td, &kq, &kevq)))
return POLLERR;
KQ_LOCK(kq);
@@ -3006,7 +2977,6 @@ kqueue_poll(struct file *fp, int events, struct ucred *active_cred,
}
}
kqueue_release(kq, 1);
KQ_UNLOCK(kq);
kevq_release(kevq, 0);
return (revents);
@@ -3179,10 +3149,6 @@ kqueue_drain(struct kqueue *kq, struct kevq *kevq, struct thread *td)
KASSERT((kq->kq_state & KQ_CLOSING) != KQ_CLOSING,
("kqueue already closing"));
kq->kq_state |= KQ_CLOSING;
if (kq->kq_refcnt > 1)
msleep(&kq->kq_refcnt, &kq->kq_lock, PSOCK, "kqclose", 0);
KASSERT(kq->kq_refcnt == 1, ("other refs are out there!"));
KASSERT(knlist_empty(&kq->kq_sel.si_note),
("kqueue's knlist not empty"));
@@ -3191,6 +3157,7 @@ kqueue_drain(struct kqueue *kq, struct kevq *kevq, struct thread *td)
for (i = 0; i < kq->kq_knlistsize; i++) {
while ((kn = SLIST_FIRST(&kq->kq_knlist[i])) != NULL) {
KQ_OWNED(kq);
KN_FLUX_LOCK(kn);
if (kn_in_flux(kn)) {
kn->kn_fluxwait = 1;
@@ -3203,6 +3170,7 @@ kqueue_drain(struct kqueue *kq, struct kevq *kevq, struct thread *td)
}
knote_enter_flux(kn);
KN_FLUX_UNLOCK(kn);
KQ_UNLOCK(kq);
knote_drop(kn, td);
KQ_LOCK(kq);
@@ -3293,13 +3261,9 @@ kqueue_close(struct file *fp, struct thread *td)
int error;
int filedesc_unlock;
if ((kq->kq_flags & KQ_FLAG_MULTI) == KQ_FLAG_MULTI) {
// only acquire the kqueue lock here
if ((error = kqueue_acquire(fp, &kq)))
return error;
} else {
// acquire both
if ((error = kqueue_acquire_both(fp, td, &kq, &kevq)))
if ((kq->kq_flags & KQ_FLAG_MULTI) == 0) {
/* acquire the kevq only when the kqueue is not in multi-threaded mode */
if ((error = kqueue_acquire_kevq(fp, td, &kq, &kevq)))
return error;
}
@@ -4060,7 +4024,7 @@ knote_next_kevq(struct knote *kn)
kqd = kqdom_find(kq->kq_kqd, PCPU_GET(cpuid));
} else {
if (kn->kn_kqd == NULL) {
/* the first time knote is queued, record the kqdom */
/* the first time a knote is queued, record the kqdom */
kn->kn_kqd = kqdom_find(kq->kq_kqd, PCPU_GET(cpuid));
KASSERT(kn->kn_kqd != NULL, ("knote scheduled on an unidentified CPU2"));
@@ -4218,11 +4182,10 @@ kqfd_register(int fd, struct kevent *kev, struct thread *td, int mflag)
error = fget(td, fd, cap_rights_init(&rights, CAP_KQUEUE_CHANGE), &fp);
if (error != 0)
return (error);
if ((error = kqueue_acquire_both(fp, td, &kq, &kevq)) != 0)
if ((error = kqueue_acquire_kevq(fp, td, &kq, &kevq)) != 0)
goto noacquire;
error = kqueue_register(kq, kevq, kev, td, mflag);
kqueue_release(kq, 0);
kevq_release(kevq, 0);
noacquire:


@@ -88,7 +88,6 @@ struct kqdom {
struct kqueue {
struct mtx kq_lock;
int kq_refcnt;
struct selinfo kq_sel;
int kq_state;
#define KQ_SEL 0x01


@@ -754,6 +754,9 @@ test_evfilt_read_m()
flags = KQ_SCHED_QUEUE;
g_kqfd = kqueue();
error = ioctl(g_kqfd, FKQMULTI, &flags);
if (error == -1) {
err(1, "ioctl");
}
test_socket_queue();
test_socket_brutal();
@@ -763,6 +766,9 @@ test_evfilt_read_m()
flags = KQ_SCHED_BEST_OF_N;
g_kqfd = kqueue();
error = ioctl(g_kqfd, FKQMULTI, &flags);
if (error == -1) {
err(1, "ioctl");
}
test_socket_brutal();
@@ -771,6 +777,9 @@ test_evfilt_read_m()
flags = KQ_SCHED_WORK_STEALING;
g_kqfd = kqueue();
error = ioctl(g_kqfd, FKQMULTI, &flags);
if (error == -1) {
err(1, "ioctl");
}
test_socket_ws();
test_socket_brutal();
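
The three new ioctl checks are identical; a small helper would keep further scheduler variants terse. This helper does not exist in the test suite, it is only a sketch and assumes the FKQMULTI ioctl and KQ_SCHED_* flags declared by the patched headers:

#include <sys/types.h>
#include <sys/event.h>
#include <sys/ioctl.h>
#include <err.h>

/* create a kqueue and switch it to multi-threaded mode, or abort the test */
static int
kqueue_multi_or_die(int flags)
{
	int kqfd = kqueue();

	if (kqfd == -1)
		err(1, "kqueue");
	if (ioctl(kqfd, FKQMULTI, &flags) == -1)
		err(1, "ioctl");
	return (kqfd);
}

Usage would then be g_kqfd = kqueue_multi_or_die(KQ_SCHED_WORK_STEALING); before the corresponding test run.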