diff --git a/lib/libthr/pthread.map b/lib/libthr/pthread.map index a14341e557ff..ec7017d65c60 100644 --- a/lib/libthr/pthread.map +++ b/lib/libthr/pthread.map @@ -2,12 +2,14 @@ LIBTHREAD_1_0 { global: ___creat; + ___mq_close; __accept; __close; __connect; __error; __fcntl; __fsync; + __mq_notify; __msync; __nanosleep; __open; @@ -32,6 +34,7 @@ global: _aio_suspend; _execve; _fork; + _mq_notify; _nanosleep; _pause; _pselect; @@ -193,6 +196,7 @@ global: fcntl; fork; fsync; + mq_notify; msync; nanosleep; open; diff --git a/lib/libthr/thread/thr_timer.c b/lib/libthr/thread/thr_timer.c index 80e4bd0039e8..c5f204e2bd00 100644 --- a/lib/libthr/thread/thr_timer.c +++ b/lib/libthr/thread/thr_timer.c @@ -27,72 +27,178 @@ * */ +#include +#include + +#include #include #include #include #include -#include #include "thr_private.h" struct thread_node { - struct pthread_attr attr; - TAILQ_ENTRY(thread_node) link; - pthread_t thread; - int refcount; - int exit; - jmp_buf jbuf; - struct timer *curtmr; + struct pthread_attr tn_attr; + TAILQ_ENTRY(thread_node)tn_link; + pthread_t tn_thread; + int tn_refcount; + int tn_exit; + jmp_buf tn_jbuf; + struct rtobj_node * tn_curobj; }; -struct timer { - union sigval value; - void (*function)(union sigval, int); - int timerid; - long flags; - int gen; - struct thread_node *tn; +struct rtobj_node { + LIST_ENTRY(rtobj_node) rt_link; + int rt_type; + union sigval rt_value; + void * rt_func; + umtx_t rt_flags; + union { + int _timerid; + int _mqd; + } _rt_id; + int rt_gen; + struct thread_node * rt_tn; }; -static struct timer **timer_list; -static int timer_gen; -static int timer_max; -static umtx_t timer_list_lock; -static TAILQ_HEAD(,thread_node) timer_threads; -static umtx_t timer_threads_lock; +#define rt_timerid _rt_id._timerid +#define rt_mqd _rt_id._mqd -static void *service_loop(void *); -static int register_timer(struct timer *); -static struct thread_node *create_timer_thread(pthread_attr_t); -static void release_timer_thread(struct thread_node *); +LIST_HEAD(rtobj_hash_head, rtobj_node); +#define HASH_QUEUES 17 +#define HASH(t, id) ((((id) << 3) + (t)) % HASH_QUEUES) +static struct rtobj_hash_head rtobj_hash[HASH_QUEUES]; +static int generation; +static umtx_t hash_lock; +static TAILQ_HEAD(,thread_node) threads; +static umtx_t threads_lock; + +static struct thread_node *thread_create(pthread_attr_t *); +static void thread_release(struct thread_node *); +static struct rtobj_node *rtobj_find(int, int); +static int rtobj_register(struct rtobj_node *); +static int rtobj_delete(int, int); +static int rtobj_delete_obj(struct rtobj_node *); +static void * service_loop(void *); +static void timer_dispatch(struct rtobj_node *p, int); extern int __sys_timer_create(clockid_t, struct sigevent *, timer_t *); extern int __sys_timer_delete(timer_t); +extern int __sys_mq_notify(mqd_t mqdes, const struct sigevent *); +extern int __mq_close(mqd_t mqd); __weak_reference(__timer_create, timer_create); __weak_reference(__timer_create, _timer_create); __weak_reference(__timer_delete, timer_delete); __weak_reference(__timer_delete, _timer_delete); +__weak_reference(__mq_notify, mq_notify); +__weak_reference(__mq_notify, _mq_notify); +__weak_reference(___mq_close, mq_close); +__weak_reference(___mq_close, _mq_close); -#define SIGTIMER SIGCANCEL /* Reuse SIGCANCEL */ +#define SIGSERVICE SIGCANCEL /* Reuse SIGCANCEL */ -#define WORKING 0x01 -#define WANTED 0x02 +#define RT_WORKING 0x01 +#define RT_WANTED 0x02 -#define TIMERS_LOCK(t) THR_UMTX_LOCK((t), &timer_list_lock) -#define TIMERS_UNLOCK(t) THR_UMTX_UNLOCK((t), &timer_list_lock) +#define HASH_LOCK(t) THR_LOCK_ACQUIRE((t), &hash_lock) +#define HASH_UNLOCK(t) THR_LOCK_RELEASE((t), &hash_lock) -#define THREADS_LOCK(t) THR_UMTX_LOCK((t), &timer_threads_lock) -#define THREADS_UNLOCK(t) THR_UMTX_UNLOCK((t), &timer_threads_lock) +#define THREADS_LOCK(t) THR_LOCK_ACQUIRE((t), &threads_lock) +#define THREADS_UNLOCK(t) THR_LOCK_RELEASE((t), &threads_lock) void _thr_timer_init(void) { - _thr_umtx_init(&timer_list_lock); - _thr_umtx_init(&timer_threads_lock); - TAILQ_INIT(&timer_threads); - timer_list = NULL; - timer_max = 0; + int i; + + _thr_umtx_init(&hash_lock); + _thr_umtx_init(&threads_lock); + for (i = 0; i < HASH_QUEUES; ++i) + LIST_INIT(&rtobj_hash[i]); + TAILQ_INIT(&threads); +} + +static struct rtobj_node * +rtobj_alloc(int type, const struct sigevent *evp) +{ + struct rtobj_node *obj; + + obj = calloc(1, sizeof(*obj)); + if (obj != NULL) { + obj->rt_value = evp->sigev_value; + obj->rt_func = evp->sigev_notify_function; + obj->rt_gen = atomic_fetchadd_int(&generation, 1); + obj->rt_type = type; + } + + return (obj); +} + +static __inline void +rtobj_free(struct rtobj_node *obj) +{ + free(obj); +} + +static struct rtobj_node * +rtobj_find(int type, int id) +{ + struct rtobj_node *obj; + int chain = HASH(type, id); + + LIST_FOREACH(obj, &rtobj_hash[chain], rt_link) { + if (obj->rt_type == type && obj->rt_mqd == id) + break; + } + return (obj); +} + +static int +rtobj_register(struct rtobj_node *obj) +{ + int chain = HASH(obj->rt_type, obj->rt_mqd); + + LIST_INSERT_HEAD(&rtobj_hash[chain], obj, rt_link); + return (0); +} + +static int +rtobj_delete(int type, int id) +{ + struct rtobj_node *obj; + + obj = rtobj_find(type, id); + if (obj != NULL) + return (rtobj_delete_obj(obj)); + return (0); +} + +static int +rtobj_delete_obj(struct rtobj_node *obj) +{ + struct pthread *curthread = _get_curthread(); + umtx_t flags; + + LIST_REMOVE(obj, rt_link); + /* If the timer is servicing, allow it to complete. */ + while ((flags = obj->rt_flags) & RT_WORKING) { + obj->rt_flags |= RT_WANTED; + HASH_UNLOCK(curthread); + _thr_umtx_wait(&obj->rt_flags, flags, NULL); + HASH_LOCK(curthread); + } + HASH_UNLOCK(curthread); + /* + * Drop reference count of servicing thread, + * may free the thread. + */ + thread_release(obj->rt_tn); + rtobj_free(obj); + + HASH_LOCK(curthread); + return (0); } /* @@ -102,61 +208,49 @@ _thr_timer_init(void) int __timer_create(clockid_t clockid, struct sigevent *evp, timer_t *timerid) { - pthread_attr_t attr; struct sigevent ev; - struct timer *tmr; - int ret; + struct rtobj_node *obj; + struct pthread *curthread; + int ret, err; /* Call syscall directly if it is not SIGEV_THREAD */ if (evp == NULL || evp->sigev_notify != SIGEV_THREAD) return (__sys_timer_create(clockid, evp, timerid)); /* Otherwise, do all magical things. */ - tmr = malloc(sizeof(*tmr)); - if (__predict_false(tmr == NULL)) { + obj = rtobj_alloc(SI_TIMER, evp); + if (obj == NULL) { errno = EAGAIN; return (-1); } - tmr->value = evp->sigev_value; - /* XXX - * Here we pass second parameter an overrun count, this is - * not required by POSIX. - */ - tmr->function = (void (*)(union sigval, int)) - evp->sigev_notify_function; - tmr->flags = 0; - tmr->timerid = -1; - pthread_attr_init(&attr); - if (evp->sigev_notify_attributes != NULL) { - *attr = **(pthread_attr_t *)(evp->sigev_notify_attributes); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - } - - tmr->gen = atomic_fetchadd_int(&timer_gen, 1); - tmr->tn = create_timer_thread(attr); - if (__predict_false(tmr->tn == NULL)) { - free(tmr); + obj->rt_tn = thread_create(evp->sigev_notify_attributes); + if (obj->rt_tn == NULL) { + rtobj_free(obj); errno = EAGAIN; return (-1); } /* - * Build a new sigevent, and tell kernel to deliver - * SIGTIMER signal to the new thread. + * Build a new sigevent, and tell kernel to deliver SIGSERVICE + * signal to the new thread. */ ev.sigev_notify = SIGEV_THREAD_ID; - ev.sigev_signo = SIGTIMER; - ev.sigev_notify_thread_id = (lwpid_t)tmr->tn->thread->tid; - ev.sigev_value.sival_int = tmr->gen; - ret = __sys_timer_create(clockid, &ev, &tmr->timerid); - if (__predict_false(ret != 0 || register_timer(tmr) != 0)) { - ret = errno; - release_timer_thread(tmr->tn); - free(tmr); - errno = ret; + ev.sigev_signo = SIGSERVICE; + ev.sigev_notify_thread_id = (lwpid_t)obj->rt_tn->tn_thread->tid; + ev.sigev_value.sival_int = obj->rt_gen; + ret = __sys_timer_create(clockid, &ev, &obj->rt_timerid); + if (ret != 0) { + err = errno; + thread_release(obj->rt_tn); + rtobj_free(obj); + errno = err; return (-1); } - *timerid = tmr->timerid; + curthread = _get_curthread(); + HASH_LOCK(curthread); + rtobj_register(obj); + HASH_UNLOCK(curthread); + *timerid = obj->rt_timerid; return (0); } @@ -164,209 +258,256 @@ int __timer_delete(timer_t timerid) { struct pthread *curthread = _get_curthread(); - struct timer *tmr = NULL; - long flags; - TIMERS_LOCK(curthread); - /* - * Check if this is a SIGEV_THREAD timer by looking up - * it in the registered list. - */ - if (timerid >= 0 && timerid < timer_max && - (tmr = timer_list[timerid]) != NULL) { - /* Take it from timer list */ - timer_list[timerid] = NULL; - /* If the timer is servicing, allow it to complete. */ - while ((flags = tmr->flags) & WORKING) { - tmr->flags |= WANTED; - TIMERS_UNLOCK(curthread); - _thr_umtx_wait(&tmr->flags, flags, NULL); - TIMERS_LOCK(curthread); - } - TIMERS_UNLOCK(curthread); - /* - * Drop reference count of servicing thread, - * may free the thread. - */ - release_timer_thread(tmr->tn); - } else - TIMERS_UNLOCK(curthread); - if (tmr != NULL) - free(tmr); + HASH_LOCK(curthread); + rtobj_delete(SI_TIMER, timerid); + HASH_UNLOCK(curthread); return (__sys_timer_delete(timerid)); } -static struct thread_node * -create_timer_thread(pthread_attr_t attr) +typedef void (*timer_func)(union sigval val, int timerid, int overrun); + +static void +timer_dispatch(struct rtobj_node *obj, int overrun) { - struct pthread *curthread = _get_curthread(); + timer_func f = obj->rt_func; + + f(obj->rt_value, obj->rt_timerid, overrun); +} + +int +___mq_close(mqd_t mqd) +{ + struct pthread *curthread; + int ret; + + curthread = _get_curthread(); + HASH_LOCK(curthread); + rtobj_delete(SI_MESGQ, mqd); + ret = __mq_close(mqd); + HASH_UNLOCK(curthread); + return (ret); +} + +int +__mq_notify(mqd_t mqd, const struct sigevent *evp) +{ + struct sigevent ev; + struct rtobj_node *obj; + struct pthread *curthread; + int ret, err; + + curthread = _get_curthread(); + + HASH_LOCK(curthread); + rtobj_delete(SI_MESGQ, mqd); + if (evp == NULL || evp->sigev_notify != SIGEV_THREAD) { + ret = __sys_mq_notify(mqd, evp); + HASH_UNLOCK(curthread); + return (ret); + } + HASH_UNLOCK(curthread); + + obj = rtobj_alloc(SI_MESGQ, evp); + if (obj == NULL) { + errno = EAGAIN; + return (-1); + } + obj->rt_tn = thread_create(evp->sigev_notify_attributes); + if (obj->rt_tn == NULL) { + rtobj_free(obj); + errno = EAGAIN; + return (-1); + } + obj->rt_mqd = mqd; + /* + * Build a new sigevent, and tell kernel to deliver SIGSERVICE + * signal to the new thread. + */ + ev.sigev_notify = SIGEV_THREAD_ID; + ev.sigev_signo = SIGSERVICE; + ev.sigev_notify_thread_id = (lwpid_t)obj->rt_tn->tn_thread->tid; + ev.sigev_value.sival_int = obj->rt_gen; + HASH_LOCK(curthread); + ret = __sys_mq_notify(mqd, &ev); + if (ret != 0) { + err = errno; + HASH_UNLOCK(curthread); + thread_release(obj->rt_tn); + rtobj_free(obj); + errno = err; + return (-1); + } + rtobj_register(obj); + HASH_UNLOCK(curthread); + return (0); +} + +typedef void (*mq_func)(union sigval val, int mqd); + +static void +mq_dispatch(struct rtobj_node *obj) +{ + mq_func f = obj->rt_func; + + f(obj->rt_value, obj->rt_mqd); +} + +static struct thread_node * +thread_create(pthread_attr_t *pattr) +{ + pthread_attr_t attr; + struct pthread *curthread; struct thread_node *tn; int ret; + curthread = _get_curthread(); + pthread_attr_init(&attr); + if (pattr != NULL) { + *attr = **(pthread_attr_t *)pattr; + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + } + THREADS_LOCK(curthread); /* Search a thread matching the required pthread_attr. */ - TAILQ_FOREACH(tn, &timer_threads, link) { + TAILQ_FOREACH(tn, &threads, tn_link) { if (attr->stackaddr_attr == NULL) { - if (attr->sched_policy == tn->attr.sched_policy && - attr->sched_inherit == tn->attr.sched_inherit && - attr->prio == tn->attr.prio && - attr->stacksize_attr == tn->attr.stacksize_attr && - attr->guardsize_attr == tn->attr.guardsize_attr && + if (attr->sched_policy == tn->tn_attr.sched_policy && + attr->sched_inherit == tn->tn_attr.sched_inherit && + attr->prio == tn->tn_attr.prio && + attr->stacksize_attr == tn->tn_attr.stacksize_attr && + attr->guardsize_attr == tn->tn_attr.guardsize_attr && ((attr->flags & PTHREAD_SCOPE_SYSTEM) == - (tn->attr.flags & PTHREAD_SCOPE_SYSTEM))) + (tn->tn_attr.flags & PTHREAD_SCOPE_SYSTEM))) break; } else { /* * Reuse the thread if it has same stack address, * because two threads can not run on same stack. */ - if (attr->stackaddr_attr == tn->attr.stackaddr_attr) + if (attr->stackaddr_attr == tn->tn_attr.stackaddr_attr) break; } } if (tn != NULL) { - tn->refcount++; + tn->tn_refcount++; THREADS_UNLOCK(curthread); + pthread_attr_destroy(&attr); return (tn); } tn = malloc(sizeof(*tn)); - tn->refcount = 1; - tn->exit = 0; - tn->attr = *attr; - tn->curtmr = NULL; - _thr_signal_block(curthread); /* SIGTIMER is also blocked. */ - TAILQ_INSERT_TAIL(&timer_threads, tn, link); - ret = _pthread_create(&tn->thread, &attr, service_loop, tn); + tn->tn_refcount = 1; + tn->tn_exit = 0; + tn->tn_attr = *attr; + tn->tn_curobj = NULL; + _thr_signal_block(curthread); /* SIGSERVICE is also blocked. */ + TAILQ_INSERT_TAIL(&threads, tn, tn_link); + ret = _pthread_create(&tn->tn_thread, &attr, service_loop, tn); _thr_signal_unblock(curthread); if (ret != 0) { - TAILQ_REMOVE(&timer_threads, tn, link); + TAILQ_REMOVE(&threads, tn, tn_link); free(tn); tn = NULL; } THREADS_UNLOCK(curthread); + pthread_attr_destroy(&attr); return (tn); } static void -release_timer_thread(struct thread_node *tn) +thread_release(struct thread_node *tn) { struct pthread *curthread = _get_curthread(); - struct pthread *th; THREADS_LOCK(curthread); - if (--tn->refcount == 0) { - /* - * If I am the last user, current implement kills the - * service thread, is this allowed by POSIX ? does - * this hurt performance ? - */ - tn->exit = 1; - th = tn->thread; - _thr_send_sig(th, SIGTIMER); - pthread_join(th, NULL); - TAILQ_REMOVE(&timer_threads, tn, link); + tn->tn_refcount--; +#if 0 + if (tn->tn_refcount == 0) { + struct pthread *th; + tn->tn_exit = 1; + th = tn->tn_thread; + _thr_send_sig(th, SIGSERVICE); + TAILQ_REMOVE(&threads, tn, tn_link); } else +#endif tn = NULL; THREADS_UNLOCK(curthread); if (tn != NULL) free(tn); } -/* Register a SIGEV_THREAD timer. */ -static int -register_timer(struct timer *tmr) -{ - struct pthread *curthread = _get_curthread(); - struct timer **list; - int count; - - while ((count = timer_max) <= tmr->timerid) { - if (count < 32) - count = 32; - while (count <= tmr->timerid) - count <<= 1; - list = malloc(count * sizeof(void *)); - memset(list, 0, count * sizeof(void *)); - if (list == NULL) - return (-1); - TIMERS_LOCK(curthread); - if (timer_max >= count) { - TIMERS_UNLOCK(curthread); - free(list); - continue; - } - memcpy(timer_list, list, timer_max * sizeof(void *)); - timer_list = list; - timer_max = count; - TIMERS_UNLOCK(curthread); - } - TIMERS_LOCK(curthread); - timer_list[tmr->timerid] = tmr; - TIMERS_UNLOCK(curthread); - return (0); -} - /* * This function is called if user callback calls * pthread_exit() or pthread_cancel() for the thread. */ static void -cleanup_thread(void *arg) +thread_cleanup(void *arg) { struct pthread *curthread = _get_curthread(); struct thread_node *tn = arg; - if (tn->exit == 0) { - /* broken usercode is killing us. */ - if (tn->curtmr) { - TIMERS_LOCK(curthread); - tn->curtmr->flags &= ~WORKING; - if (tn->curtmr->flags & WANTED) - _thr_umtx_wake(&tn->curtmr->flags, INT_MAX); - TIMERS_UNLOCK(curthread); + if (tn->tn_exit == 0) { + /* broken user code is killing us. */ + if (tn->tn_curobj != NULL) { + HASH_LOCK(curthread); + tn->tn_curobj->rt_flags &= ~RT_WORKING; + if (tn->tn_curobj->rt_flags & RT_WANTED) + _thr_umtx_wake(&tn->tn_curobj->rt_flags, INT_MAX); + HASH_UNLOCK(curthread); } atomic_clear_int(&curthread->cancelflags, THR_CANCEL_EXITING); - longjmp(tn->jbuf, 1); + longjmp(tn->tn_jbuf, 1); } } static void * service_loop(void *arg) { - struct pthread *curthread = _get_curthread(); - struct thread_node *tn = arg; - struct timer *tmr; siginfo_t si; sigset_t set; + struct thread_node *tn; + struct pthread *curthread; + struct rtobj_node *obj; + tn = arg; + curthread = _get_curthread(); /* * Service thread should not be killed by callback, if user * tries to do so, the thread will be restarted. */ - setjmp(tn->jbuf); + setjmp(tn->tn_jbuf); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); sigemptyset(&set); - sigaddset(&set, SIGTIMER); - THR_CLEANUP_PUSH(curthread, cleanup_thread, tn); - while (tn->exit == 0) { + sigaddset(&set, SIGSERVICE); + THR_CLEANUP_PUSH(curthread, thread_cleanup, tn); + while (tn->tn_exit == 0) { if (__predict_false(__sys_sigwaitinfo(&set, &si) == -1 || - si.si_code != SI_TIMER)) + (si.si_code != SI_TIMER && si.si_code != SI_MESGQ))) continue; - TIMERS_LOCK(curthread); - if (si.si_timerid >= 0 && si.si_timerid < timer_max && - (tmr = timer_list[si.si_timerid]) != NULL && - si.si_value.sival_int == tmr->gen) { - tmr->flags |= WORKING; - TIMERS_UNLOCK(curthread); - tn->curtmr = tmr; - tmr->function(tmr->value, si.si_overrun); - tn->curtmr = NULL; - TIMERS_LOCK(curthread); - tmr->flags &= ~WORKING; - if (tmr->flags & WANTED) - _thr_umtx_wake(&tmr->flags, INT_MAX); + HASH_LOCK(curthread); + obj = rtobj_find(si.si_code, si.si_timerid); + if (obj && (obj->rt_gen == si.si_value.sival_int)) { + obj->rt_flags |= RT_WORKING; + HASH_UNLOCK(curthread); + tn->tn_curobj = obj; + if (si.si_code == SI_TIMER) + timer_dispatch(obj, si.si_overrun); + else if (si.si_code == SI_MESGQ) + mq_dispatch(obj); + tn->tn_curobj = NULL; + HASH_LOCK(curthread); + obj->rt_flags &= ~RT_WORKING; + if (obj->rt_flags & RT_WANTED) + _thr_umtx_wake(&obj->rt_flags, INT_MAX); + else if (si.si_code == SI_MESGQ) { + /* + * mq_notify is oneshot event, should remove + * atomatically by the thread. + */ + rtobj_delete_obj(obj); + } } - TIMERS_UNLOCK(curthread); + HASH_UNLOCK(curthread); } THR_CLEANUP_POP(curthread, 0); return (0);