929 lines
23 KiB
C
929 lines
23 KiB
C
/*
|
|
* work_thread.c - threads implementation for blocking worker child.
|
|
*/
|
|
#include <config.h>
|
|
#include "ntp_workimpl.h"
|
|
|
|
#ifdef WORK_THREAD
|
|
|
|
#include <stdio.h>
|
|
#include <ctype.h>
|
|
#include <signal.h>
|
|
#ifndef SYS_WINNT
|
|
#include <pthread.h>
|
|
#endif
|
|
|
|
#include "ntp_stdlib.h"
|
|
#include "ntp_malloc.h"
|
|
#include "ntp_syslog.h"
|
|
#include "ntpd.h"
|
|
#include "ntp_io.h"
|
|
#include "ntp_assert.h"
|
|
#include "ntp_unixtime.h"
|
|
#include "timespecops.h"
|
|
#include "ntp_worker.h"
|
|
|
|
#define CHILD_EXIT_REQ ((blocking_pipe_header *)(intptr_t)-1)
|
|
#define CHILD_GONE_RESP CHILD_EXIT_REQ
|
|
/* Queue size increments:
|
|
* The request queue grows a bit faster than the response queue -- the
|
|
* daemon can push requests and pull results faster on avarage than the
|
|
* worker can process requests and push results... If this really pays
|
|
* off is debatable.
|
|
*/
|
|
#define WORKITEMS_ALLOC_INC 16
|
|
#define RESPONSES_ALLOC_INC 4
|
|
|
|
/* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
|
|
* set the maximum to 256kB. If the minimum goes below the
|
|
* system-defined minimum stack size, we have to adjust accordingly.
|
|
*/
|
|
#ifndef THREAD_MINSTACKSIZE
|
|
# define THREAD_MINSTACKSIZE (64U * 1024)
|
|
#endif
|
|
#ifndef __sun
|
|
#if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN
|
|
# undef THREAD_MINSTACKSIZE
|
|
# define THREAD_MINSTACKSIZE PTHREAD_STACK_MIN
|
|
#endif
|
|
#endif
|
|
|
|
#ifndef THREAD_MAXSTACKSIZE
|
|
# define THREAD_MAXSTACKSIZE (256U * 1024)
|
|
#endif
|
|
#if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE
|
|
# undef THREAD_MAXSTACKSIZE
|
|
# define THREAD_MAXSTACKSIZE THREAD_MINSTACKSIZE
|
|
#endif
|
|
|
|
|
|
#ifdef SYS_WINNT
|
|
|
|
# define thread_exit(c) _endthreadex(c)
|
|
# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
|
|
u_int WINAPI blocking_thread(void *);
|
|
static BOOL same_os_sema(const sem_ref obj, void * osobj);
|
|
|
|
#else
|
|
|
|
# define thread_exit(c) pthread_exit((void*)(size_t)(c))
|
|
# define tickle_sem sem_post
|
|
void * blocking_thread(void *);
|
|
static void block_thread_signals(sigset_t *);
|
|
|
|
#endif
|
|
|
|
#ifdef WORK_PIPE
|
|
addremove_io_fd_func addremove_io_fd;
|
|
#else
|
|
addremove_io_semaphore_func addremove_io_semaphore;
|
|
#endif
|
|
|
|
static void start_blocking_thread(blocking_child *);
|
|
static void start_blocking_thread_internal(blocking_child *);
|
|
static void prepare_child_sems(blocking_child *);
|
|
static int wait_for_sem(sem_ref, struct timespec *);
|
|
static int ensure_workitems_empty_slot(blocking_child *);
|
|
static int ensure_workresp_empty_slot(blocking_child *);
|
|
static int queue_req_pointer(blocking_child *, blocking_pipe_header *);
|
|
static void cleanup_after_child(blocking_child *);
|
|
|
|
static sema_type worker_mmutex;
|
|
static sem_ref worker_memlock;
|
|
|
|
/* --------------------------------------------------------------------
|
|
* locking the global worker state table (and other global stuff)
|
|
*/
|
|
void
|
|
worker_global_lock(
|
|
int inOrOut)
|
|
{
|
|
if (worker_memlock) {
|
|
if (inOrOut)
|
|
wait_for_sem(worker_memlock, NULL);
|
|
else
|
|
tickle_sem(worker_memlock);
|
|
}
|
|
}
|
|
|
|
/* --------------------------------------------------------------------
|
|
* implementation isolation wrapper
|
|
*/
|
|
void
|
|
exit_worker(
|
|
int exitcode
|
|
)
|
|
{
|
|
thread_exit(exitcode); /* see #define thread_exit */
|
|
}
|
|
|
|
/* --------------------------------------------------------------------
|
|
* sleep for a given time or until the wakup semaphore is tickled.
|
|
*/
|
|
int
|
|
worker_sleep(
|
|
blocking_child * c,
|
|
time_t seconds
|
|
)
|
|
{
|
|
struct timespec until;
|
|
int rc;
|
|
|
|
# ifdef HAVE_CLOCK_GETTIME
|
|
if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
|
|
msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
|
|
return -1;
|
|
}
|
|
# else
|
|
if (0 != getclock(TIMEOFDAY, &until)) {
|
|
msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
|
|
return -1;
|
|
}
|
|
# endif
|
|
until.tv_sec += seconds;
|
|
rc = wait_for_sem(c->wake_scheduled_sleep, &until);
|
|
if (0 == rc)
|
|
return -1;
|
|
if (-1 == rc && ETIMEDOUT == errno)
|
|
return 0;
|
|
msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
|
|
return -1;
|
|
}
|
|
|
|
|
|
/* --------------------------------------------------------------------
|
|
* Wake up a worker that takes a nap.
|
|
*/
|
|
void
|
|
interrupt_worker_sleep(void)
|
|
{
|
|
u_int idx;
|
|
blocking_child * c;
|
|
|
|
for (idx = 0; idx < blocking_children_alloc; idx++) {
|
|
c = blocking_children[idx];
|
|
if (NULL == c || NULL == c->wake_scheduled_sleep)
|
|
continue;
|
|
tickle_sem(c->wake_scheduled_sleep);
|
|
}
|
|
}
|
|
|
|
/* --------------------------------------------------------------------
|
|
* Make sure there is an empty slot at the head of the request
|
|
* queue. Tell if the queue is currently empty.
|
|
*/
|
|
static int
|
|
ensure_workitems_empty_slot(
|
|
blocking_child *c
|
|
)
|
|
{
|
|
/*
|
|
** !!! PRECONDITION: caller holds access lock!
|
|
**
|
|
** This simply tries to increase the size of the buffer if it
|
|
** becomes full. The resize operation does *not* maintain the
|
|
** order of requests, but that should be irrelevant since the
|
|
** processing is considered asynchronous anyway.
|
|
**
|
|
** Return if the buffer is currently empty.
|
|
*/
|
|
|
|
static const size_t each =
|
|
sizeof(blocking_children[0]->workitems[0]);
|
|
|
|
size_t new_alloc;
|
|
size_t slots_used;
|
|
size_t sidx;
|
|
|
|
slots_used = c->head_workitem - c->tail_workitem;
|
|
if (slots_used >= c->workitems_alloc) {
|
|
new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC;
|
|
c->workitems = erealloc(c->workitems, new_alloc * each);
|
|
for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
|
|
c->workitems[sidx] = NULL;
|
|
c->tail_workitem = 0;
|
|
c->head_workitem = c->workitems_alloc;
|
|
c->workitems_alloc = new_alloc;
|
|
}
|
|
INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
|
|
return (0 == slots_used);
|
|
}
|
|
|
|
/* --------------------------------------------------------------------
|
|
* Make sure there is an empty slot at the head of the response
|
|
* queue. Tell if the queue is currently empty.
|
|
*/
|
|
static int
|
|
ensure_workresp_empty_slot(
|
|
blocking_child *c
|
|
)
|
|
{
|
|
/*
|
|
** !!! PRECONDITION: caller holds access lock!
|
|
**
|
|
** Works like the companion function above.
|
|
*/
|
|
|
|
static const size_t each =
|
|
sizeof(blocking_children[0]->responses[0]);
|
|
|
|
size_t new_alloc;
|
|
size_t slots_used;
|
|
size_t sidx;
|
|
|
|
slots_used = c->head_response - c->tail_response;
|
|
if (slots_used >= c->responses_alloc) {
|
|
new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC;
|
|
c->responses = erealloc(c->responses, new_alloc * each);
|
|
for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
|
|
c->responses[sidx] = NULL;
|
|
c->tail_response = 0;
|
|
c->head_response = c->responses_alloc;
|
|
c->responses_alloc = new_alloc;
|
|
}
|
|
INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
|
|
return (0 == slots_used);
|
|
}
|
|
|
|
|
|
/* --------------------------------------------------------------------
|
|
* queue_req_pointer() - append a work item or idle exit request to
|
|
* blocking_workitems[]. Employ proper locking.
|
|
*/
|
|
static int
|
|
queue_req_pointer(
|
|
blocking_child * c,
|
|
blocking_pipe_header * hdr
|
|
)
|
|
{
|
|
size_t qhead;
|
|
|
|
/* >>>> ACCESS LOCKING STARTS >>>> */
|
|
wait_for_sem(c->accesslock, NULL);
|
|
ensure_workitems_empty_slot(c);
|
|
qhead = c->head_workitem;
|
|
c->workitems[qhead % c->workitems_alloc] = hdr;
|
|
c->head_workitem = 1 + qhead;
|
|
tickle_sem(c->accesslock);
|
|
/* <<<< ACCESS LOCKING ENDS <<<< */
|
|
|
|
/* queue consumer wake-up notification */
|
|
tickle_sem(c->workitems_pending);
|
|
|
|
return 0;
|
|
}
|
|
|
|
/* --------------------------------------------------------------------
|
|
* API function to make sure a worker is running, a proper private copy
|
|
* of the data is made, the data eneterd into the queue and the worker
|
|
* is signalled.
|
|
*/
|
|
int
|
|
send_blocking_req_internal(
|
|
blocking_child * c,
|
|
blocking_pipe_header * hdr,
|
|
void * data
|
|
)
|
|
{
|
|
blocking_pipe_header * threadcopy;
|
|
size_t payload_octets;
|
|
|
|
REQUIRE(hdr != NULL);
|
|
REQUIRE(data != NULL);
|
|
DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
|
|
|
|
if (hdr->octets <= sizeof(*hdr))
|
|
return 1; /* failure */
|
|
payload_octets = hdr->octets - sizeof(*hdr);
|
|
|
|
if (NULL == c->thread_ref)
|
|
start_blocking_thread(c);
|
|
threadcopy = emalloc(hdr->octets);
|
|
memcpy(threadcopy, hdr, sizeof(*hdr));
|
|
memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
|
|
|
|
return queue_req_pointer(c, threadcopy);
|
|
}
|
|
|
|
/* --------------------------------------------------------------------
|
|
* Wait for the 'incoming queue no longer empty' signal, lock the shared
|
|
* structure and dequeue an item.
|
|
*/
|
|
blocking_pipe_header *
|
|
receive_blocking_req_internal(
|
|
blocking_child * c
|
|
)
|
|
{
|
|
blocking_pipe_header * req;
|
|
size_t qhead, qtail;
|
|
|
|
req = NULL;
|
|
do {
|
|
/* wait for tickle from the producer side */
|
|
wait_for_sem(c->workitems_pending, NULL);
|
|
|
|
/* >>>> ACCESS LOCKING STARTS >>>> */
|
|
wait_for_sem(c->accesslock, NULL);
|
|
qhead = c->head_workitem;
|
|
do {
|
|
qtail = c->tail_workitem;
|
|
if (qhead == qtail)
|
|
break;
|
|
c->tail_workitem = qtail + 1;
|
|
qtail %= c->workitems_alloc;
|
|
req = c->workitems[qtail];
|
|
c->workitems[qtail] = NULL;
|
|
} while (NULL == req);
|
|
tickle_sem(c->accesslock);
|
|
/* <<<< ACCESS LOCKING ENDS <<<< */
|
|
|
|
} while (NULL == req);
|
|
|
|
INSIST(NULL != req);
|
|
if (CHILD_EXIT_REQ == req) { /* idled out */
|
|
send_blocking_resp_internal(c, CHILD_GONE_RESP);
|
|
req = NULL;
|
|
}
|
|
|
|
return req;
|
|
}
|
|
|
|
/* --------------------------------------------------------------------
|
|
* Push a response into the return queue and eventually tickle the
|
|
* receiver.
|
|
*/
|
|
int
|
|
send_blocking_resp_internal(
|
|
blocking_child * c,
|
|
blocking_pipe_header * resp
|
|
)
|
|
{
|
|
size_t qhead;
|
|
int empty;
|
|
|
|
/* >>>> ACCESS LOCKING STARTS >>>> */
|
|
wait_for_sem(c->accesslock, NULL);
|
|
empty = ensure_workresp_empty_slot(c);
|
|
qhead = c->head_response;
|
|
c->responses[qhead % c->responses_alloc] = resp;
|
|
c->head_response = 1 + qhead;
|
|
tickle_sem(c->accesslock);
|
|
/* <<<< ACCESS LOCKING ENDS <<<< */
|
|
|
|
/* queue consumer wake-up notification */
|
|
if (empty)
|
|
{
|
|
# ifdef WORK_PIPE
|
|
write(c->resp_write_pipe, "", 1);
|
|
# else
|
|
tickle_sem(c->responses_pending);
|
|
# endif
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
|
|
#ifndef WORK_PIPE
|
|
|
|
/* --------------------------------------------------------------------
|
|
* Check if a (Windows-)hanndle to a semaphore is actually the same we
|
|
* are using inside the sema wrapper.
|
|
*/
|
|
static BOOL
|
|
same_os_sema(
|
|
const sem_ref obj,
|
|
void* osh
|
|
)
|
|
{
|
|
return obj && osh && (obj->shnd == (HANDLE)osh);
|
|
}
|
|
|
|
/* --------------------------------------------------------------------
|
|
* Find the shared context that associates to an OS handle and make sure
|
|
* the data is dequeued and processed.
|
|
*/
|
|
void
|
|
handle_blocking_resp_sem(
|
|
void * context
|
|
)
|
|
{
|
|
blocking_child * c;
|
|
u_int idx;
|
|
|
|
c = NULL;
|
|
for (idx = 0; idx < blocking_children_alloc; idx++) {
|
|
c = blocking_children[idx];
|
|
if (c != NULL &&
|
|
c->thread_ref != NULL &&
|
|
same_os_sema(c->responses_pending, context))
|
|
break;
|
|
}
|
|
if (idx < blocking_children_alloc)
|
|
process_blocking_resp(c);
|
|
}
|
|
#endif /* !WORK_PIPE */
|
|
|
|
/* --------------------------------------------------------------------
|
|
* Fetch the next response from the return queue. In case of signalling
|
|
* via pipe, make sure the pipe is flushed, too.
|
|
*/
|
|
blocking_pipe_header *
|
|
receive_blocking_resp_internal(
|
|
blocking_child * c
|
|
)
|
|
{
|
|
blocking_pipe_header * removed;
|
|
size_t qhead, qtail, slot;
|
|
|
|
#ifdef WORK_PIPE
|
|
int rc;
|
|
char scratch[32];
|
|
|
|
do
|
|
rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
|
|
while (-1 == rc && EINTR == errno);
|
|
#endif
|
|
|
|
/* >>>> ACCESS LOCKING STARTS >>>> */
|
|
wait_for_sem(c->accesslock, NULL);
|
|
qhead = c->head_response;
|
|
qtail = c->tail_response;
|
|
for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
|
|
slot = qtail % c->responses_alloc;
|
|
removed = c->responses[slot];
|
|
c->responses[slot] = NULL;
|
|
}
|
|
c->tail_response = qtail;
|
|
tickle_sem(c->accesslock);
|
|
/* <<<< ACCESS LOCKING ENDS <<<< */
|
|
|
|
if (NULL != removed) {
|
|
DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
|
|
BLOCKING_RESP_MAGIC == removed->magic_sig);
|
|
}
|
|
if (CHILD_GONE_RESP == removed) {
|
|
cleanup_after_child(c);
|
|
removed = NULL;
|
|
}
|
|
|
|
return removed;
|
|
}
|
|
|
|
/* --------------------------------------------------------------------
|
|
* Light up a new worker.
|
|
*/
|
|
static void
|
|
start_blocking_thread(
|
|
blocking_child * c
|
|
)
|
|
{
|
|
|
|
DEBUG_INSIST(!c->reusable);
|
|
|
|
prepare_child_sems(c);
|
|
start_blocking_thread_internal(c);
|
|
}
|
|
|
|
/* --------------------------------------------------------------------
|
|
* Create a worker thread. There are several differences between POSIX
|
|
* and Windows, of course -- most notably the Windows thread is no
|
|
* detached thread, and we keep the handle around until we want to get
|
|
* rid of the thread. The notification scheme also differs: Windows
|
|
* makes use of semaphores in both directions, POSIX uses a pipe for
|
|
* integration with 'select()' or alike.
|
|
*/
|
|
static void
|
|
start_blocking_thread_internal(
|
|
blocking_child * c
|
|
)
|
|
#ifdef SYS_WINNT
|
|
{
|
|
BOOL resumed;
|
|
|
|
c->thread_ref = NULL;
|
|
(*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
|
|
c->thr_table[0].thnd =
|
|
(HANDLE)_beginthreadex(
|
|
NULL,
|
|
0,
|
|
&blocking_thread,
|
|
c,
|
|
CREATE_SUSPENDED,
|
|
NULL);
|
|
|
|
if (NULL == c->thr_table[0].thnd) {
|
|
msyslog(LOG_ERR, "start blocking thread failed: %m");
|
|
exit(-1);
|
|
}
|
|
/* remember the thread priority is only within the process class */
|
|
if (!SetThreadPriority(c->thr_table[0].thnd,
|
|
THREAD_PRIORITY_BELOW_NORMAL))
|
|
msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
|
|
|
|
resumed = ResumeThread(c->thr_table[0].thnd);
|
|
DEBUG_INSIST(resumed);
|
|
c->thread_ref = &c->thr_table[0];
|
|
}
|
|
#else /* pthreads start_blocking_thread_internal() follows */
|
|
{
|
|
# ifdef NEED_PTHREAD_INIT
|
|
static int pthread_init_called;
|
|
# endif
|
|
pthread_attr_t thr_attr;
|
|
int rc;
|
|
int pipe_ends[2]; /* read then write */
|
|
int is_pipe;
|
|
int flags;
|
|
size_t ostacksize;
|
|
size_t nstacksize;
|
|
sigset_t saved_sig_mask;
|
|
|
|
c->thread_ref = NULL;
|
|
|
|
# ifdef NEED_PTHREAD_INIT
|
|
/*
|
|
* from lib/isc/unix/app.c:
|
|
* BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
|
|
*/
|
|
if (!pthread_init_called) {
|
|
pthread_init();
|
|
pthread_init_called = TRUE;
|
|
}
|
|
# endif
|
|
|
|
rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
|
|
if (0 != rc) {
|
|
msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
|
|
exit(1);
|
|
}
|
|
c->resp_read_pipe = move_fd(pipe_ends[0]);
|
|
c->resp_write_pipe = move_fd(pipe_ends[1]);
|
|
c->ispipe = is_pipe;
|
|
flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
|
|
if (-1 == flags) {
|
|
msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
|
|
exit(1);
|
|
}
|
|
rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
|
|
if (-1 == rc) {
|
|
msyslog(LOG_ERR,
|
|
"start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
|
|
exit(1);
|
|
}
|
|
(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
|
|
pthread_attr_init(&thr_attr);
|
|
pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
|
|
#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
|
|
defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
|
|
rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
|
|
if (0 != rc) {
|
|
msyslog(LOG_ERR,
|
|
"start_blocking_thread: pthread_attr_getstacksize() -> %s",
|
|
strerror(rc));
|
|
} else {
|
|
if (ostacksize < THREAD_MINSTACKSIZE)
|
|
nstacksize = THREAD_MINSTACKSIZE;
|
|
else if (ostacksize > THREAD_MAXSTACKSIZE)
|
|
nstacksize = THREAD_MAXSTACKSIZE;
|
|
else
|
|
nstacksize = ostacksize;
|
|
if (nstacksize != ostacksize)
|
|
rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
|
|
if (0 != rc)
|
|
msyslog(LOG_ERR,
|
|
"start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
|
|
(u_long)ostacksize, (u_long)nstacksize,
|
|
strerror(rc));
|
|
}
|
|
#else
|
|
UNUSED_ARG(nstacksize);
|
|
UNUSED_ARG(ostacksize);
|
|
#endif
|
|
#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
|
|
pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
|
|
#endif
|
|
c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
|
|
block_thread_signals(&saved_sig_mask);
|
|
rc = pthread_create(&c->thr_table[0], &thr_attr,
|
|
&blocking_thread, c);
|
|
pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
|
|
pthread_attr_destroy(&thr_attr);
|
|
if (0 != rc) {
|
|
msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
|
|
strerror(rc));
|
|
exit(1);
|
|
}
|
|
c->thread_ref = &c->thr_table[0];
|
|
}
|
|
#endif
|
|
|
|
/* --------------------------------------------------------------------
|
|
* block_thread_signals()
|
|
*
|
|
* Temporarily block signals used by ntpd main thread, so that signal
|
|
* mask inherited by child threads leaves them blocked. Returns prior
|
|
* active signal mask via pmask, to be restored by the main thread
|
|
* after pthread_create().
|
|
*/
|
|
#ifndef SYS_WINNT
|
|
void
|
|
block_thread_signals(
|
|
sigset_t * pmask
|
|
)
|
|
{
|
|
sigset_t block;
|
|
|
|
sigemptyset(&block);
|
|
# ifdef HAVE_SIGNALED_IO
|
|
# ifdef SIGIO
|
|
sigaddset(&block, SIGIO);
|
|
# endif
|
|
# ifdef SIGPOLL
|
|
sigaddset(&block, SIGPOLL);
|
|
# endif
|
|
# endif /* HAVE_SIGNALED_IO */
|
|
sigaddset(&block, SIGALRM);
|
|
sigaddset(&block, MOREDEBUGSIG);
|
|
sigaddset(&block, LESSDEBUGSIG);
|
|
# ifdef SIGDIE1
|
|
sigaddset(&block, SIGDIE1);
|
|
# endif
|
|
# ifdef SIGDIE2
|
|
sigaddset(&block, SIGDIE2);
|
|
# endif
|
|
# ifdef SIGDIE3
|
|
sigaddset(&block, SIGDIE3);
|
|
# endif
|
|
# ifdef SIGDIE4
|
|
sigaddset(&block, SIGDIE4);
|
|
# endif
|
|
# ifdef SIGBUS
|
|
sigaddset(&block, SIGBUS);
|
|
# endif
|
|
sigemptyset(pmask);
|
|
pthread_sigmask(SIG_BLOCK, &block, pmask);
|
|
}
|
|
#endif /* !SYS_WINNT */
|
|
|
|
|
|
/* --------------------------------------------------------------------
|
|
* Create & destroy semaphores. This is sufficiently different between
|
|
* POSIX and Windows to warrant wrapper functions and close enough to
|
|
* use the concept of synchronization via semaphore for all platforms.
|
|
*/
|
|
static sem_ref
|
|
create_sema(
|
|
sema_type* semptr,
|
|
u_int inival,
|
|
u_int maxval)
|
|
{
|
|
#ifdef SYS_WINNT
|
|
|
|
long svini, svmax;
|
|
if (NULL != semptr) {
|
|
svini = (inival < LONG_MAX)
|
|
? (long)inival : LONG_MAX;
|
|
svmax = (maxval < LONG_MAX && maxval > 0)
|
|
? (long)maxval : LONG_MAX;
|
|
semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
|
|
if (NULL == semptr->shnd)
|
|
semptr = NULL;
|
|
}
|
|
|
|
#else
|
|
|
|
(void)maxval;
|
|
if (semptr && sem_init(semptr, FALSE, inival))
|
|
semptr = NULL;
|
|
|
|
#endif
|
|
|
|
return semptr;
|
|
}
|
|
|
|
/* ------------------------------------------------------------------ */
|
|
static sem_ref
|
|
delete_sema(
|
|
sem_ref obj)
|
|
{
|
|
|
|
# ifdef SYS_WINNT
|
|
|
|
if (obj) {
|
|
if (obj->shnd)
|
|
CloseHandle(obj->shnd);
|
|
obj->shnd = NULL;
|
|
}
|
|
|
|
# else
|
|
|
|
if (obj)
|
|
sem_destroy(obj);
|
|
|
|
# endif
|
|
|
|
return NULL;
|
|
}
|
|
|
|
/* --------------------------------------------------------------------
|
|
* prepare_child_sems()
|
|
*
|
|
* create sync & access semaphores
|
|
*
|
|
* All semaphores are cleared, only the access semaphore has 1 unit.
|
|
* Childs wait on 'workitems_pending', then grabs 'sema_access'
|
|
* and dequeues jobs. When done, 'sema_access' is given one unit back.
|
|
*
|
|
* The producer grabs 'sema_access', manages the queue, restores
|
|
* 'sema_access' and puts one unit into 'workitems_pending'.
|
|
*
|
|
* The story goes the same for the response queue.
|
|
*/
|
|
static void
|
|
prepare_child_sems(
|
|
blocking_child *c
|
|
)
|
|
{
|
|
if (NULL == worker_memlock)
|
|
worker_memlock = create_sema(&worker_mmutex, 1, 1);
|
|
|
|
c->accesslock = create_sema(&c->sem_table[0], 1, 1);
|
|
c->workitems_pending = create_sema(&c->sem_table[1], 0, 0);
|
|
c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
|
|
# ifndef WORK_PIPE
|
|
c->responses_pending = create_sema(&c->sem_table[3], 0, 0);
|
|
# endif
|
|
}
|
|
|
|
/* --------------------------------------------------------------------
|
|
* wait for semaphore. Where the wait can be interrupted, it will
|
|
* internally resume -- When this function returns, there is either no
|
|
* semaphore at all, a timeout occurred, or the caller could
|
|
* successfully take a token from the semaphore.
|
|
*
|
|
* For untimed wait, not checking the result of this function at all is
|
|
* definitely an option.
|
|
*/
|
|
static int
|
|
wait_for_sem(
|
|
sem_ref sem,
|
|
struct timespec * timeout /* wall-clock */
|
|
)
|
|
#ifdef SYS_WINNT
|
|
{
|
|
struct timespec now;
|
|
struct timespec delta;
|
|
DWORD msec;
|
|
DWORD rc;
|
|
|
|
if (!(sem && sem->shnd)) {
|
|
errno = EINVAL;
|
|
return -1;
|
|
}
|
|
|
|
if (NULL == timeout) {
|
|
msec = INFINITE;
|
|
} else {
|
|
getclock(TIMEOFDAY, &now);
|
|
delta = sub_tspec(*timeout, now);
|
|
if (delta.tv_sec < 0) {
|
|
msec = 0;
|
|
} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
|
|
msec = INFINITE;
|
|
} else {
|
|
msec = 1000 * (DWORD)delta.tv_sec;
|
|
msec += delta.tv_nsec / (1000 * 1000);
|
|
}
|
|
}
|
|
rc = WaitForSingleObject(sem->shnd, msec);
|
|
if (WAIT_OBJECT_0 == rc)
|
|
return 0;
|
|
if (WAIT_TIMEOUT == rc) {
|
|
errno = ETIMEDOUT;
|
|
return -1;
|
|
}
|
|
msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
|
|
errno = EFAULT;
|
|
return -1;
|
|
}
|
|
#else /* pthreads wait_for_sem() follows */
|
|
{
|
|
int rc = -1;
|
|
|
|
if (sem) do {
|
|
if (NULL == timeout)
|
|
rc = sem_wait(sem);
|
|
else
|
|
rc = sem_timedwait(sem, timeout);
|
|
} while (rc == -1 && errno == EINTR);
|
|
else
|
|
errno = EINVAL;
|
|
|
|
return rc;
|
|
}
|
|
#endif
|
|
|
|
/* --------------------------------------------------------------------
|
|
* blocking_thread - thread functions have WINAPI (aka 'stdcall')
|
|
* calling conventions under Windows and POSIX-defined signature
|
|
* otherwise.
|
|
*/
|
|
#ifdef SYS_WINNT
|
|
u_int WINAPI
|
|
#else
|
|
void *
|
|
#endif
|
|
blocking_thread(
|
|
void * ThreadArg
|
|
)
|
|
{
|
|
blocking_child *c;
|
|
|
|
c = ThreadArg;
|
|
exit_worker(blocking_child_common(c));
|
|
|
|
/* NOTREACHED */
|
|
return 0;
|
|
}
|
|
|
|
/* --------------------------------------------------------------------
|
|
* req_child_exit() runs in the parent.
|
|
*
|
|
* This function is called from from the idle timer, too, and possibly
|
|
* without a thread being there any longer. Since we have folded up our
|
|
* tent in that case and all the semaphores are already gone, we simply
|
|
* ignore this request in this case.
|
|
*
|
|
* Since the existence of the semaphores is controlled exclusively by
|
|
* the parent, there's no risk of data race here.
|
|
*/
|
|
int
|
|
req_child_exit(
|
|
blocking_child *c
|
|
)
|
|
{
|
|
return (c->accesslock)
|
|
? queue_req_pointer(c, CHILD_EXIT_REQ)
|
|
: 0;
|
|
}
|
|
|
|
/* --------------------------------------------------------------------
|
|
* cleanup_after_child() runs in parent.
|
|
*/
|
|
static void
|
|
cleanup_after_child(
|
|
blocking_child * c
|
|
)
|
|
{
|
|
DEBUG_INSIST(!c->reusable);
|
|
|
|
# ifdef SYS_WINNT
|
|
/* The thread was not created in detached state, so we better
|
|
* clean up.
|
|
*/
|
|
if (c->thread_ref && c->thread_ref->thnd) {
|
|
WaitForSingleObject(c->thread_ref->thnd, INFINITE);
|
|
INSIST(CloseHandle(c->thread_ref->thnd));
|
|
c->thread_ref->thnd = NULL;
|
|
}
|
|
# endif
|
|
c->thread_ref = NULL;
|
|
|
|
/* remove semaphores and (if signalling vi IO) pipes */
|
|
|
|
c->accesslock = delete_sema(c->accesslock);
|
|
c->workitems_pending = delete_sema(c->workitems_pending);
|
|
c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);
|
|
|
|
# ifdef WORK_PIPE
|
|
DEBUG_INSIST(-1 != c->resp_read_pipe);
|
|
DEBUG_INSIST(-1 != c->resp_write_pipe);
|
|
(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
|
|
close(c->resp_write_pipe);
|
|
close(c->resp_read_pipe);
|
|
c->resp_write_pipe = -1;
|
|
c->resp_read_pipe = -1;
|
|
# else
|
|
DEBUG_INSIST(NULL != c->responses_pending);
|
|
(*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
|
|
c->responses_pending = delete_sema(c->responses_pending);
|
|
# endif
|
|
|
|
/* Is it necessary to check if there are pending requests and
|
|
* responses? If so, and if there are, what to do with them?
|
|
*/
|
|
|
|
/* re-init buffer index sequencers */
|
|
c->head_workitem = 0;
|
|
c->tail_workitem = 0;
|
|
c->head_response = 0;
|
|
c->tail_response = 0;
|
|
|
|
c->reusable = TRUE;
|
|
}
|
|
|
|
|
|
#else /* !WORK_THREAD follows */
|
|
char work_thread_nonempty_compilation_unit;
|
|
#endif
|