1. Add SIGEV_THREAD notification for mq_notify.
2. Reuse current timer code and abstract some common code to to support both timer and mqueue.
This commit is contained in:
parent
a4ba4c8368
commit
45180066f9
@ -2,12 +2,14 @@
|
||||
LIBTHREAD_1_0 {
|
||||
global:
|
||||
___creat;
|
||||
___mq_close;
|
||||
__accept;
|
||||
__close;
|
||||
__connect;
|
||||
__error;
|
||||
__fcntl;
|
||||
__fsync;
|
||||
__mq_notify;
|
||||
__msync;
|
||||
__nanosleep;
|
||||
__open;
|
||||
@ -32,6 +34,7 @@ global:
|
||||
_aio_suspend;
|
||||
_execve;
|
||||
_fork;
|
||||
_mq_notify;
|
||||
_nanosleep;
|
||||
_pause;
|
||||
_pselect;
|
||||
@ -193,6 +196,7 @@ global:
|
||||
fcntl;
|
||||
fork;
|
||||
fsync;
|
||||
mq_notify;
|
||||
msync;
|
||||
nanosleep;
|
||||
open;
|
||||
|
@ -27,72 +27,178 @@
|
||||
*
|
||||
*/
|
||||
|
||||
#include <sys/cdefs.h>
|
||||
#include <sys/types.h>
|
||||
|
||||
#include <mqueue.h>
|
||||
#include <time.h>
|
||||
#include <setjmp.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <sys/types.h>
|
||||
|
||||
#include "thr_private.h"
|
||||
|
||||
struct thread_node {
|
||||
struct pthread_attr attr;
|
||||
TAILQ_ENTRY(thread_node) link;
|
||||
pthread_t thread;
|
||||
int refcount;
|
||||
int exit;
|
||||
jmp_buf jbuf;
|
||||
struct timer *curtmr;
|
||||
struct pthread_attr tn_attr;
|
||||
TAILQ_ENTRY(thread_node)tn_link;
|
||||
pthread_t tn_thread;
|
||||
int tn_refcount;
|
||||
int tn_exit;
|
||||
jmp_buf tn_jbuf;
|
||||
struct rtobj_node * tn_curobj;
|
||||
};
|
||||
|
||||
struct timer {
|
||||
union sigval value;
|
||||
void (*function)(union sigval, int);
|
||||
int timerid;
|
||||
long flags;
|
||||
int gen;
|
||||
struct thread_node *tn;
|
||||
struct rtobj_node {
|
||||
LIST_ENTRY(rtobj_node) rt_link;
|
||||
int rt_type;
|
||||
union sigval rt_value;
|
||||
void * rt_func;
|
||||
umtx_t rt_flags;
|
||||
union {
|
||||
int _timerid;
|
||||
int _mqd;
|
||||
} _rt_id;
|
||||
int rt_gen;
|
||||
struct thread_node * rt_tn;
|
||||
};
|
||||
|
||||
static struct timer **timer_list;
|
||||
static int timer_gen;
|
||||
static int timer_max;
|
||||
static umtx_t timer_list_lock;
|
||||
static TAILQ_HEAD(,thread_node) timer_threads;
|
||||
static umtx_t timer_threads_lock;
|
||||
#define rt_timerid _rt_id._timerid
|
||||
#define rt_mqd _rt_id._mqd
|
||||
|
||||
static void *service_loop(void *);
|
||||
static int register_timer(struct timer *);
|
||||
static struct thread_node *create_timer_thread(pthread_attr_t);
|
||||
static void release_timer_thread(struct thread_node *);
|
||||
LIST_HEAD(rtobj_hash_head, rtobj_node);
|
||||
#define HASH_QUEUES 17
|
||||
#define HASH(t, id) ((((id) << 3) + (t)) % HASH_QUEUES)
|
||||
static struct rtobj_hash_head rtobj_hash[HASH_QUEUES];
|
||||
static int generation;
|
||||
static umtx_t hash_lock;
|
||||
static TAILQ_HEAD(,thread_node) threads;
|
||||
static umtx_t threads_lock;
|
||||
|
||||
static struct thread_node *thread_create(pthread_attr_t *);
|
||||
static void thread_release(struct thread_node *);
|
||||
static struct rtobj_node *rtobj_find(int, int);
|
||||
static int rtobj_register(struct rtobj_node *);
|
||||
static int rtobj_delete(int, int);
|
||||
static int rtobj_delete_obj(struct rtobj_node *);
|
||||
static void * service_loop(void *);
|
||||
static void timer_dispatch(struct rtobj_node *p, int);
|
||||
|
||||
extern int __sys_timer_create(clockid_t, struct sigevent *, timer_t *);
|
||||
extern int __sys_timer_delete(timer_t);
|
||||
extern int __sys_mq_notify(mqd_t mqdes, const struct sigevent *);
|
||||
extern int __mq_close(mqd_t mqd);
|
||||
|
||||
__weak_reference(__timer_create, timer_create);
|
||||
__weak_reference(__timer_create, _timer_create);
|
||||
__weak_reference(__timer_delete, timer_delete);
|
||||
__weak_reference(__timer_delete, _timer_delete);
|
||||
__weak_reference(__mq_notify, mq_notify);
|
||||
__weak_reference(__mq_notify, _mq_notify);
|
||||
__weak_reference(___mq_close, mq_close);
|
||||
__weak_reference(___mq_close, _mq_close);
|
||||
|
||||
#define SIGTIMER SIGCANCEL /* Reuse SIGCANCEL */
|
||||
#define SIGSERVICE SIGCANCEL /* Reuse SIGCANCEL */
|
||||
|
||||
#define WORKING 0x01
|
||||
#define WANTED 0x02
|
||||
#define RT_WORKING 0x01
|
||||
#define RT_WANTED 0x02
|
||||
|
||||
#define TIMERS_LOCK(t) THR_UMTX_LOCK((t), &timer_list_lock)
|
||||
#define TIMERS_UNLOCK(t) THR_UMTX_UNLOCK((t), &timer_list_lock)
|
||||
#define HASH_LOCK(t) THR_LOCK_ACQUIRE((t), &hash_lock)
|
||||
#define HASH_UNLOCK(t) THR_LOCK_RELEASE((t), &hash_lock)
|
||||
|
||||
#define THREADS_LOCK(t) THR_UMTX_LOCK((t), &timer_threads_lock)
|
||||
#define THREADS_UNLOCK(t) THR_UMTX_UNLOCK((t), &timer_threads_lock)
|
||||
#define THREADS_LOCK(t) THR_LOCK_ACQUIRE((t), &threads_lock)
|
||||
#define THREADS_UNLOCK(t) THR_LOCK_RELEASE((t), &threads_lock)
|
||||
|
||||
void
|
||||
_thr_timer_init(void)
|
||||
{
|
||||
_thr_umtx_init(&timer_list_lock);
|
||||
_thr_umtx_init(&timer_threads_lock);
|
||||
TAILQ_INIT(&timer_threads);
|
||||
timer_list = NULL;
|
||||
timer_max = 0;
|
||||
int i;
|
||||
|
||||
_thr_umtx_init(&hash_lock);
|
||||
_thr_umtx_init(&threads_lock);
|
||||
for (i = 0; i < HASH_QUEUES; ++i)
|
||||
LIST_INIT(&rtobj_hash[i]);
|
||||
TAILQ_INIT(&threads);
|
||||
}
|
||||
|
||||
static struct rtobj_node *
|
||||
rtobj_alloc(int type, const struct sigevent *evp)
|
||||
{
|
||||
struct rtobj_node *obj;
|
||||
|
||||
obj = calloc(1, sizeof(*obj));
|
||||
if (obj != NULL) {
|
||||
obj->rt_value = evp->sigev_value;
|
||||
obj->rt_func = evp->sigev_notify_function;
|
||||
obj->rt_gen = atomic_fetchadd_int(&generation, 1);
|
||||
obj->rt_type = type;
|
||||
}
|
||||
|
||||
return (obj);
|
||||
}
|
||||
|
||||
static __inline void
|
||||
rtobj_free(struct rtobj_node *obj)
|
||||
{
|
||||
free(obj);
|
||||
}
|
||||
|
||||
static struct rtobj_node *
|
||||
rtobj_find(int type, int id)
|
||||
{
|
||||
struct rtobj_node *obj;
|
||||
int chain = HASH(type, id);
|
||||
|
||||
LIST_FOREACH(obj, &rtobj_hash[chain], rt_link) {
|
||||
if (obj->rt_type == type && obj->rt_mqd == id)
|
||||
break;
|
||||
}
|
||||
return (obj);
|
||||
}
|
||||
|
||||
static int
|
||||
rtobj_register(struct rtobj_node *obj)
|
||||
{
|
||||
int chain = HASH(obj->rt_type, obj->rt_mqd);
|
||||
|
||||
LIST_INSERT_HEAD(&rtobj_hash[chain], obj, rt_link);
|
||||
return (0);
|
||||
}
|
||||
|
||||
static int
|
||||
rtobj_delete(int type, int id)
|
||||
{
|
||||
struct rtobj_node *obj;
|
||||
|
||||
obj = rtobj_find(type, id);
|
||||
if (obj != NULL)
|
||||
return (rtobj_delete_obj(obj));
|
||||
return (0);
|
||||
}
|
||||
|
||||
static int
|
||||
rtobj_delete_obj(struct rtobj_node *obj)
|
||||
{
|
||||
struct pthread *curthread = _get_curthread();
|
||||
umtx_t flags;
|
||||
|
||||
LIST_REMOVE(obj, rt_link);
|
||||
/* If the timer is servicing, allow it to complete. */
|
||||
while ((flags = obj->rt_flags) & RT_WORKING) {
|
||||
obj->rt_flags |= RT_WANTED;
|
||||
HASH_UNLOCK(curthread);
|
||||
_thr_umtx_wait(&obj->rt_flags, flags, NULL);
|
||||
HASH_LOCK(curthread);
|
||||
}
|
||||
HASH_UNLOCK(curthread);
|
||||
/*
|
||||
* Drop reference count of servicing thread,
|
||||
* may free the thread.
|
||||
*/
|
||||
thread_release(obj->rt_tn);
|
||||
rtobj_free(obj);
|
||||
|
||||
HASH_LOCK(curthread);
|
||||
return (0);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -102,61 +208,49 @@ _thr_timer_init(void)
|
||||
int
|
||||
__timer_create(clockid_t clockid, struct sigevent *evp, timer_t *timerid)
|
||||
{
|
||||
pthread_attr_t attr;
|
||||
struct sigevent ev;
|
||||
struct timer *tmr;
|
||||
int ret;
|
||||
struct rtobj_node *obj;
|
||||
struct pthread *curthread;
|
||||
int ret, err;
|
||||
|
||||
/* Call syscall directly if it is not SIGEV_THREAD */
|
||||
if (evp == NULL || evp->sigev_notify != SIGEV_THREAD)
|
||||
return (__sys_timer_create(clockid, evp, timerid));
|
||||
|
||||
/* Otherwise, do all magical things. */
|
||||
tmr = malloc(sizeof(*tmr));
|
||||
if (__predict_false(tmr == NULL)) {
|
||||
obj = rtobj_alloc(SI_TIMER, evp);
|
||||
if (obj == NULL) {
|
||||
errno = EAGAIN;
|
||||
return (-1);
|
||||
}
|
||||
tmr->value = evp->sigev_value;
|
||||
/* XXX
|
||||
* Here we pass second parameter an overrun count, this is
|
||||
* not required by POSIX.
|
||||
*/
|
||||
tmr->function = (void (*)(union sigval, int))
|
||||
evp->sigev_notify_function;
|
||||
tmr->flags = 0;
|
||||
tmr->timerid = -1;
|
||||
pthread_attr_init(&attr);
|
||||
if (evp->sigev_notify_attributes != NULL) {
|
||||
*attr = **(pthread_attr_t *)(evp->sigev_notify_attributes);
|
||||
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
|
||||
}
|
||||
|
||||
tmr->gen = atomic_fetchadd_int(&timer_gen, 1);
|
||||
tmr->tn = create_timer_thread(attr);
|
||||
if (__predict_false(tmr->tn == NULL)) {
|
||||
free(tmr);
|
||||
obj->rt_tn = thread_create(evp->sigev_notify_attributes);
|
||||
if (obj->rt_tn == NULL) {
|
||||
rtobj_free(obj);
|
||||
errno = EAGAIN;
|
||||
return (-1);
|
||||
}
|
||||
|
||||
/*
|
||||
* Build a new sigevent, and tell kernel to deliver
|
||||
* SIGTIMER signal to the new thread.
|
||||
* Build a new sigevent, and tell kernel to deliver SIGSERVICE
|
||||
* signal to the new thread.
|
||||
*/
|
||||
ev.sigev_notify = SIGEV_THREAD_ID;
|
||||
ev.sigev_signo = SIGTIMER;
|
||||
ev.sigev_notify_thread_id = (lwpid_t)tmr->tn->thread->tid;
|
||||
ev.sigev_value.sival_int = tmr->gen;
|
||||
ret = __sys_timer_create(clockid, &ev, &tmr->timerid);
|
||||
if (__predict_false(ret != 0 || register_timer(tmr) != 0)) {
|
||||
ret = errno;
|
||||
release_timer_thread(tmr->tn);
|
||||
free(tmr);
|
||||
errno = ret;
|
||||
ev.sigev_signo = SIGSERVICE;
|
||||
ev.sigev_notify_thread_id = (lwpid_t)obj->rt_tn->tn_thread->tid;
|
||||
ev.sigev_value.sival_int = obj->rt_gen;
|
||||
ret = __sys_timer_create(clockid, &ev, &obj->rt_timerid);
|
||||
if (ret != 0) {
|
||||
err = errno;
|
||||
thread_release(obj->rt_tn);
|
||||
rtobj_free(obj);
|
||||
errno = err;
|
||||
return (-1);
|
||||
}
|
||||
*timerid = tmr->timerid;
|
||||
curthread = _get_curthread();
|
||||
HASH_LOCK(curthread);
|
||||
rtobj_register(obj);
|
||||
HASH_UNLOCK(curthread);
|
||||
*timerid = obj->rt_timerid;
|
||||
return (0);
|
||||
}
|
||||
|
||||
@ -164,209 +258,256 @@ int
|
||||
__timer_delete(timer_t timerid)
|
||||
{
|
||||
struct pthread *curthread = _get_curthread();
|
||||
struct timer *tmr = NULL;
|
||||
long flags;
|
||||
|
||||
TIMERS_LOCK(curthread);
|
||||
/*
|
||||
* Check if this is a SIGEV_THREAD timer by looking up
|
||||
* it in the registered list.
|
||||
*/
|
||||
if (timerid >= 0 && timerid < timer_max &&
|
||||
(tmr = timer_list[timerid]) != NULL) {
|
||||
/* Take it from timer list */
|
||||
timer_list[timerid] = NULL;
|
||||
/* If the timer is servicing, allow it to complete. */
|
||||
while ((flags = tmr->flags) & WORKING) {
|
||||
tmr->flags |= WANTED;
|
||||
TIMERS_UNLOCK(curthread);
|
||||
_thr_umtx_wait(&tmr->flags, flags, NULL);
|
||||
TIMERS_LOCK(curthread);
|
||||
}
|
||||
TIMERS_UNLOCK(curthread);
|
||||
/*
|
||||
* Drop reference count of servicing thread,
|
||||
* may free the thread.
|
||||
*/
|
||||
release_timer_thread(tmr->tn);
|
||||
} else
|
||||
TIMERS_UNLOCK(curthread);
|
||||
if (tmr != NULL)
|
||||
free(tmr);
|
||||
HASH_LOCK(curthread);
|
||||
rtobj_delete(SI_TIMER, timerid);
|
||||
HASH_UNLOCK(curthread);
|
||||
return (__sys_timer_delete(timerid));
|
||||
}
|
||||
|
||||
static struct thread_node *
|
||||
create_timer_thread(pthread_attr_t attr)
|
||||
typedef void (*timer_func)(union sigval val, int timerid, int overrun);
|
||||
|
||||
static void
|
||||
timer_dispatch(struct rtobj_node *obj, int overrun)
|
||||
{
|
||||
struct pthread *curthread = _get_curthread();
|
||||
timer_func f = obj->rt_func;
|
||||
|
||||
f(obj->rt_value, obj->rt_timerid, overrun);
|
||||
}
|
||||
|
||||
int
|
||||
___mq_close(mqd_t mqd)
|
||||
{
|
||||
struct pthread *curthread;
|
||||
int ret;
|
||||
|
||||
curthread = _get_curthread();
|
||||
HASH_LOCK(curthread);
|
||||
rtobj_delete(SI_MESGQ, mqd);
|
||||
ret = __mq_close(mqd);
|
||||
HASH_UNLOCK(curthread);
|
||||
return (ret);
|
||||
}
|
||||
|
||||
int
|
||||
__mq_notify(mqd_t mqd, const struct sigevent *evp)
|
||||
{
|
||||
struct sigevent ev;
|
||||
struct rtobj_node *obj;
|
||||
struct pthread *curthread;
|
||||
int ret, err;
|
||||
|
||||
curthread = _get_curthread();
|
||||
|
||||
HASH_LOCK(curthread);
|
||||
rtobj_delete(SI_MESGQ, mqd);
|
||||
if (evp == NULL || evp->sigev_notify != SIGEV_THREAD) {
|
||||
ret = __sys_mq_notify(mqd, evp);
|
||||
HASH_UNLOCK(curthread);
|
||||
return (ret);
|
||||
}
|
||||
HASH_UNLOCK(curthread);
|
||||
|
||||
obj = rtobj_alloc(SI_MESGQ, evp);
|
||||
if (obj == NULL) {
|
||||
errno = EAGAIN;
|
||||
return (-1);
|
||||
}
|
||||
obj->rt_tn = thread_create(evp->sigev_notify_attributes);
|
||||
if (obj->rt_tn == NULL) {
|
||||
rtobj_free(obj);
|
||||
errno = EAGAIN;
|
||||
return (-1);
|
||||
}
|
||||
obj->rt_mqd = mqd;
|
||||
/*
|
||||
* Build a new sigevent, and tell kernel to deliver SIGSERVICE
|
||||
* signal to the new thread.
|
||||
*/
|
||||
ev.sigev_notify = SIGEV_THREAD_ID;
|
||||
ev.sigev_signo = SIGSERVICE;
|
||||
ev.sigev_notify_thread_id = (lwpid_t)obj->rt_tn->tn_thread->tid;
|
||||
ev.sigev_value.sival_int = obj->rt_gen;
|
||||
HASH_LOCK(curthread);
|
||||
ret = __sys_mq_notify(mqd, &ev);
|
||||
if (ret != 0) {
|
||||
err = errno;
|
||||
HASH_UNLOCK(curthread);
|
||||
thread_release(obj->rt_tn);
|
||||
rtobj_free(obj);
|
||||
errno = err;
|
||||
return (-1);
|
||||
}
|
||||
rtobj_register(obj);
|
||||
HASH_UNLOCK(curthread);
|
||||
return (0);
|
||||
}
|
||||
|
||||
typedef void (*mq_func)(union sigval val, int mqd);
|
||||
|
||||
static void
|
||||
mq_dispatch(struct rtobj_node *obj)
|
||||
{
|
||||
mq_func f = obj->rt_func;
|
||||
|
||||
f(obj->rt_value, obj->rt_mqd);
|
||||
}
|
||||
|
||||
static struct thread_node *
|
||||
thread_create(pthread_attr_t *pattr)
|
||||
{
|
||||
pthread_attr_t attr;
|
||||
struct pthread *curthread;
|
||||
struct thread_node *tn;
|
||||
int ret;
|
||||
|
||||
curthread = _get_curthread();
|
||||
pthread_attr_init(&attr);
|
||||
if (pattr != NULL) {
|
||||
*attr = **(pthread_attr_t *)pattr;
|
||||
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
|
||||
}
|
||||
|
||||
THREADS_LOCK(curthread);
|
||||
/* Search a thread matching the required pthread_attr. */
|
||||
TAILQ_FOREACH(tn, &timer_threads, link) {
|
||||
TAILQ_FOREACH(tn, &threads, tn_link) {
|
||||
if (attr->stackaddr_attr == NULL) {
|
||||
if (attr->sched_policy == tn->attr.sched_policy &&
|
||||
attr->sched_inherit == tn->attr.sched_inherit &&
|
||||
attr->prio == tn->attr.prio &&
|
||||
attr->stacksize_attr == tn->attr.stacksize_attr &&
|
||||
attr->guardsize_attr == tn->attr.guardsize_attr &&
|
||||
if (attr->sched_policy == tn->tn_attr.sched_policy &&
|
||||
attr->sched_inherit == tn->tn_attr.sched_inherit &&
|
||||
attr->prio == tn->tn_attr.prio &&
|
||||
attr->stacksize_attr == tn->tn_attr.stacksize_attr &&
|
||||
attr->guardsize_attr == tn->tn_attr.guardsize_attr &&
|
||||
((attr->flags & PTHREAD_SCOPE_SYSTEM) ==
|
||||
(tn->attr.flags & PTHREAD_SCOPE_SYSTEM)))
|
||||
(tn->tn_attr.flags & PTHREAD_SCOPE_SYSTEM)))
|
||||
break;
|
||||
} else {
|
||||
/*
|
||||
* Reuse the thread if it has same stack address,
|
||||
* because two threads can not run on same stack.
|
||||
*/
|
||||
if (attr->stackaddr_attr == tn->attr.stackaddr_attr)
|
||||
if (attr->stackaddr_attr == tn->tn_attr.stackaddr_attr)
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (tn != NULL) {
|
||||
tn->refcount++;
|
||||
tn->tn_refcount++;
|
||||
THREADS_UNLOCK(curthread);
|
||||
pthread_attr_destroy(&attr);
|
||||
return (tn);
|
||||
}
|
||||
tn = malloc(sizeof(*tn));
|
||||
tn->refcount = 1;
|
||||
tn->exit = 0;
|
||||
tn->attr = *attr;
|
||||
tn->curtmr = NULL;
|
||||
_thr_signal_block(curthread); /* SIGTIMER is also blocked. */
|
||||
TAILQ_INSERT_TAIL(&timer_threads, tn, link);
|
||||
ret = _pthread_create(&tn->thread, &attr, service_loop, tn);
|
||||
tn->tn_refcount = 1;
|
||||
tn->tn_exit = 0;
|
||||
tn->tn_attr = *attr;
|
||||
tn->tn_curobj = NULL;
|
||||
_thr_signal_block(curthread); /* SIGSERVICE is also blocked. */
|
||||
TAILQ_INSERT_TAIL(&threads, tn, tn_link);
|
||||
ret = _pthread_create(&tn->tn_thread, &attr, service_loop, tn);
|
||||
_thr_signal_unblock(curthread);
|
||||
if (ret != 0) {
|
||||
TAILQ_REMOVE(&timer_threads, tn, link);
|
||||
TAILQ_REMOVE(&threads, tn, tn_link);
|
||||
free(tn);
|
||||
tn = NULL;
|
||||
}
|
||||
THREADS_UNLOCK(curthread);
|
||||
pthread_attr_destroy(&attr);
|
||||
return (tn);
|
||||
}
|
||||
|
||||
static void
|
||||
release_timer_thread(struct thread_node *tn)
|
||||
thread_release(struct thread_node *tn)
|
||||
{
|
||||
struct pthread *curthread = _get_curthread();
|
||||
struct pthread *th;
|
||||
|
||||
THREADS_LOCK(curthread);
|
||||
if (--tn->refcount == 0) {
|
||||
/*
|
||||
* If I am the last user, current implement kills the
|
||||
* service thread, is this allowed by POSIX ? does
|
||||
* this hurt performance ?
|
||||
*/
|
||||
tn->exit = 1;
|
||||
th = tn->thread;
|
||||
_thr_send_sig(th, SIGTIMER);
|
||||
pthread_join(th, NULL);
|
||||
TAILQ_REMOVE(&timer_threads, tn, link);
|
||||
tn->tn_refcount--;
|
||||
#if 0
|
||||
if (tn->tn_refcount == 0) {
|
||||
struct pthread *th;
|
||||
tn->tn_exit = 1;
|
||||
th = tn->tn_thread;
|
||||
_thr_send_sig(th, SIGSERVICE);
|
||||
TAILQ_REMOVE(&threads, tn, tn_link);
|
||||
} else
|
||||
#endif
|
||||
tn = NULL;
|
||||
THREADS_UNLOCK(curthread);
|
||||
if (tn != NULL)
|
||||
free(tn);
|
||||
}
|
||||
|
||||
/* Register a SIGEV_THREAD timer. */
|
||||
static int
|
||||
register_timer(struct timer *tmr)
|
||||
{
|
||||
struct pthread *curthread = _get_curthread();
|
||||
struct timer **list;
|
||||
int count;
|
||||
|
||||
while ((count = timer_max) <= tmr->timerid) {
|
||||
if (count < 32)
|
||||
count = 32;
|
||||
while (count <= tmr->timerid)
|
||||
count <<= 1;
|
||||
list = malloc(count * sizeof(void *));
|
||||
memset(list, 0, count * sizeof(void *));
|
||||
if (list == NULL)
|
||||
return (-1);
|
||||
TIMERS_LOCK(curthread);
|
||||
if (timer_max >= count) {
|
||||
TIMERS_UNLOCK(curthread);
|
||||
free(list);
|
||||
continue;
|
||||
}
|
||||
memcpy(timer_list, list, timer_max * sizeof(void *));
|
||||
timer_list = list;
|
||||
timer_max = count;
|
||||
TIMERS_UNLOCK(curthread);
|
||||
}
|
||||
TIMERS_LOCK(curthread);
|
||||
timer_list[tmr->timerid] = tmr;
|
||||
TIMERS_UNLOCK(curthread);
|
||||
return (0);
|
||||
}
|
||||
|
||||
/*
|
||||
* This function is called if user callback calls
|
||||
* pthread_exit() or pthread_cancel() for the thread.
|
||||
*/
|
||||
static void
|
||||
cleanup_thread(void *arg)
|
||||
thread_cleanup(void *arg)
|
||||
{
|
||||
struct pthread *curthread = _get_curthread();
|
||||
struct thread_node *tn = arg;
|
||||
|
||||
if (tn->exit == 0) {
|
||||
/* broken usercode is killing us. */
|
||||
if (tn->curtmr) {
|
||||
TIMERS_LOCK(curthread);
|
||||
tn->curtmr->flags &= ~WORKING;
|
||||
if (tn->curtmr->flags & WANTED)
|
||||
_thr_umtx_wake(&tn->curtmr->flags, INT_MAX);
|
||||
TIMERS_UNLOCK(curthread);
|
||||
if (tn->tn_exit == 0) {
|
||||
/* broken user code is killing us. */
|
||||
if (tn->tn_curobj != NULL) {
|
||||
HASH_LOCK(curthread);
|
||||
tn->tn_curobj->rt_flags &= ~RT_WORKING;
|
||||
if (tn->tn_curobj->rt_flags & RT_WANTED)
|
||||
_thr_umtx_wake(&tn->tn_curobj->rt_flags, INT_MAX);
|
||||
HASH_UNLOCK(curthread);
|
||||
}
|
||||
atomic_clear_int(&curthread->cancelflags, THR_CANCEL_EXITING);
|
||||
longjmp(tn->jbuf, 1);
|
||||
longjmp(tn->tn_jbuf, 1);
|
||||
}
|
||||
}
|
||||
|
||||
static void *
|
||||
service_loop(void *arg)
|
||||
{
|
||||
struct pthread *curthread = _get_curthread();
|
||||
struct thread_node *tn = arg;
|
||||
struct timer *tmr;
|
||||
siginfo_t si;
|
||||
sigset_t set;
|
||||
struct thread_node *tn;
|
||||
struct pthread *curthread;
|
||||
struct rtobj_node *obj;
|
||||
|
||||
tn = arg;
|
||||
curthread = _get_curthread();
|
||||
/*
|
||||
* Service thread should not be killed by callback, if user
|
||||
* tries to do so, the thread will be restarted.
|
||||
*/
|
||||
setjmp(tn->jbuf);
|
||||
setjmp(tn->tn_jbuf);
|
||||
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
|
||||
sigemptyset(&set);
|
||||
sigaddset(&set, SIGTIMER);
|
||||
THR_CLEANUP_PUSH(curthread, cleanup_thread, tn);
|
||||
while (tn->exit == 0) {
|
||||
sigaddset(&set, SIGSERVICE);
|
||||
THR_CLEANUP_PUSH(curthread, thread_cleanup, tn);
|
||||
while (tn->tn_exit == 0) {
|
||||
if (__predict_false(__sys_sigwaitinfo(&set, &si) == -1 ||
|
||||
si.si_code != SI_TIMER))
|
||||
(si.si_code != SI_TIMER && si.si_code != SI_MESGQ)))
|
||||
continue;
|
||||
TIMERS_LOCK(curthread);
|
||||
if (si.si_timerid >= 0 && si.si_timerid < timer_max &&
|
||||
(tmr = timer_list[si.si_timerid]) != NULL &&
|
||||
si.si_value.sival_int == tmr->gen) {
|
||||
tmr->flags |= WORKING;
|
||||
TIMERS_UNLOCK(curthread);
|
||||
tn->curtmr = tmr;
|
||||
tmr->function(tmr->value, si.si_overrun);
|
||||
tn->curtmr = NULL;
|
||||
TIMERS_LOCK(curthread);
|
||||
tmr->flags &= ~WORKING;
|
||||
if (tmr->flags & WANTED)
|
||||
_thr_umtx_wake(&tmr->flags, INT_MAX);
|
||||
HASH_LOCK(curthread);
|
||||
obj = rtobj_find(si.si_code, si.si_timerid);
|
||||
if (obj && (obj->rt_gen == si.si_value.sival_int)) {
|
||||
obj->rt_flags |= RT_WORKING;
|
||||
HASH_UNLOCK(curthread);
|
||||
tn->tn_curobj = obj;
|
||||
if (si.si_code == SI_TIMER)
|
||||
timer_dispatch(obj, si.si_overrun);
|
||||
else if (si.si_code == SI_MESGQ)
|
||||
mq_dispatch(obj);
|
||||
tn->tn_curobj = NULL;
|
||||
HASH_LOCK(curthread);
|
||||
obj->rt_flags &= ~RT_WORKING;
|
||||
if (obj->rt_flags & RT_WANTED)
|
||||
_thr_umtx_wake(&obj->rt_flags, INT_MAX);
|
||||
else if (si.si_code == SI_MESGQ) {
|
||||
/*
|
||||
* mq_notify is oneshot event, should remove
|
||||
* atomatically by the thread.
|
||||
*/
|
||||
rtobj_delete_obj(obj);
|
||||
}
|
||||
}
|
||||
TIMERS_UNLOCK(curthread);
|
||||
HASH_UNLOCK(curthread);
|
||||
}
|
||||
THR_CLEANUP_POP(curthread, 0);
|
||||
return (0);
|
||||
|
Loading…
Reference in New Issue
Block a user