freebsd-nq/contrib/ntp/libntp/work_thread.c
2018-02-28 07:59:55 +00:00

929 lines
23 KiB
C

/*
* work_thread.c - threads implementation for blocking worker child.
*/
#include <config.h>
#include "ntp_workimpl.h"
#ifdef WORK_THREAD
#include <stdio.h>
#include <ctype.h>
#include <signal.h>
#ifndef SYS_WINNT
#include <pthread.h>
#endif
#include "ntp_stdlib.h"
#include "ntp_malloc.h"
#include "ntp_syslog.h"
#include "ntpd.h"
#include "ntp_io.h"
#include "ntp_assert.h"
#include "ntp_unixtime.h"
#include "timespecops.h"
#include "ntp_worker.h"
#define CHILD_EXIT_REQ ((blocking_pipe_header *)(intptr_t)-1)
#define CHILD_GONE_RESP CHILD_EXIT_REQ
/* Queue size increments:
* The request queue grows a bit faster than the response queue -- the
* daemon can push requests and pull results faster on avarage than the
* worker can process requests and push results... If this really pays
* off is debatable.
*/
#define WORKITEMS_ALLOC_INC 16
#define RESPONSES_ALLOC_INC 4
/* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
* set the maximum to 256kB. If the minimum goes below the
* system-defined minimum stack size, we have to adjust accordingly.
*/
#ifndef THREAD_MINSTACKSIZE
# define THREAD_MINSTACKSIZE (64U * 1024)
#endif
#ifndef __sun
#if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN
# undef THREAD_MINSTACKSIZE
# define THREAD_MINSTACKSIZE PTHREAD_STACK_MIN
#endif
#endif
#ifndef THREAD_MAXSTACKSIZE
# define THREAD_MAXSTACKSIZE (256U * 1024)
#endif
#if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE
# undef THREAD_MAXSTACKSIZE
# define THREAD_MAXSTACKSIZE THREAD_MINSTACKSIZE
#endif
#ifdef SYS_WINNT
# define thread_exit(c) _endthreadex(c)
# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
u_int WINAPI blocking_thread(void *);
static BOOL same_os_sema(const sem_ref obj, void * osobj);
#else
# define thread_exit(c) pthread_exit((void*)(size_t)(c))
# define tickle_sem sem_post
void * blocking_thread(void *);
static void block_thread_signals(sigset_t *);
#endif
#ifdef WORK_PIPE
addremove_io_fd_func addremove_io_fd;
#else
addremove_io_semaphore_func addremove_io_semaphore;
#endif
static void start_blocking_thread(blocking_child *);
static void start_blocking_thread_internal(blocking_child *);
static void prepare_child_sems(blocking_child *);
static int wait_for_sem(sem_ref, struct timespec *);
static int ensure_workitems_empty_slot(blocking_child *);
static int ensure_workresp_empty_slot(blocking_child *);
static int queue_req_pointer(blocking_child *, blocking_pipe_header *);
static void cleanup_after_child(blocking_child *);
static sema_type worker_mmutex;
static sem_ref worker_memlock;
/* --------------------------------------------------------------------
* locking the global worker state table (and other global stuff)
*/
void
worker_global_lock(
int inOrOut)
{
if (worker_memlock) {
if (inOrOut)
wait_for_sem(worker_memlock, NULL);
else
tickle_sem(worker_memlock);
}
}
/* --------------------------------------------------------------------
* implementation isolation wrapper
*/
void
exit_worker(
int exitcode
)
{
thread_exit(exitcode); /* see #define thread_exit */
}
/* --------------------------------------------------------------------
* sleep for a given time or until the wakup semaphore is tickled.
*/
int
worker_sleep(
blocking_child * c,
time_t seconds
)
{
struct timespec until;
int rc;
# ifdef HAVE_CLOCK_GETTIME
if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
return -1;
}
# else
if (0 != getclock(TIMEOFDAY, &until)) {
msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
return -1;
}
# endif
until.tv_sec += seconds;
rc = wait_for_sem(c->wake_scheduled_sleep, &until);
if (0 == rc)
return -1;
if (-1 == rc && ETIMEDOUT == errno)
return 0;
msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
return -1;
}
/* --------------------------------------------------------------------
* Wake up a worker that takes a nap.
*/
void
interrupt_worker_sleep(void)
{
u_int idx;
blocking_child * c;
for (idx = 0; idx < blocking_children_alloc; idx++) {
c = blocking_children[idx];
if (NULL == c || NULL == c->wake_scheduled_sleep)
continue;
tickle_sem(c->wake_scheduled_sleep);
}
}
/* --------------------------------------------------------------------
* Make sure there is an empty slot at the head of the request
* queue. Tell if the queue is currently empty.
*/
static int
ensure_workitems_empty_slot(
blocking_child *c
)
{
/*
** !!! PRECONDITION: caller holds access lock!
**
** This simply tries to increase the size of the buffer if it
** becomes full. The resize operation does *not* maintain the
** order of requests, but that should be irrelevant since the
** processing is considered asynchronous anyway.
**
** Return if the buffer is currently empty.
*/
static const size_t each =
sizeof(blocking_children[0]->workitems[0]);
size_t new_alloc;
size_t slots_used;
size_t sidx;
slots_used = c->head_workitem - c->tail_workitem;
if (slots_used >= c->workitems_alloc) {
new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC;
c->workitems = erealloc(c->workitems, new_alloc * each);
for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
c->workitems[sidx] = NULL;
c->tail_workitem = 0;
c->head_workitem = c->workitems_alloc;
c->workitems_alloc = new_alloc;
}
INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
return (0 == slots_used);
}
/* --------------------------------------------------------------------
* Make sure there is an empty slot at the head of the response
* queue. Tell if the queue is currently empty.
*/
static int
ensure_workresp_empty_slot(
blocking_child *c
)
{
/*
** !!! PRECONDITION: caller holds access lock!
**
** Works like the companion function above.
*/
static const size_t each =
sizeof(blocking_children[0]->responses[0]);
size_t new_alloc;
size_t slots_used;
size_t sidx;
slots_used = c->head_response - c->tail_response;
if (slots_used >= c->responses_alloc) {
new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC;
c->responses = erealloc(c->responses, new_alloc * each);
for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
c->responses[sidx] = NULL;
c->tail_response = 0;
c->head_response = c->responses_alloc;
c->responses_alloc = new_alloc;
}
INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
return (0 == slots_used);
}
/* --------------------------------------------------------------------
* queue_req_pointer() - append a work item or idle exit request to
* blocking_workitems[]. Employ proper locking.
*/
static int
queue_req_pointer(
blocking_child * c,
blocking_pipe_header * hdr
)
{
size_t qhead;
/* >>>> ACCESS LOCKING STARTS >>>> */
wait_for_sem(c->accesslock, NULL);
ensure_workitems_empty_slot(c);
qhead = c->head_workitem;
c->workitems[qhead % c->workitems_alloc] = hdr;
c->head_workitem = 1 + qhead;
tickle_sem(c->accesslock);
/* <<<< ACCESS LOCKING ENDS <<<< */
/* queue consumer wake-up notification */
tickle_sem(c->workitems_pending);
return 0;
}
/* --------------------------------------------------------------------
* API function to make sure a worker is running, a proper private copy
* of the data is made, the data eneterd into the queue and the worker
* is signalled.
*/
int
send_blocking_req_internal(
blocking_child * c,
blocking_pipe_header * hdr,
void * data
)
{
blocking_pipe_header * threadcopy;
size_t payload_octets;
REQUIRE(hdr != NULL);
REQUIRE(data != NULL);
DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
if (hdr->octets <= sizeof(*hdr))
return 1; /* failure */
payload_octets = hdr->octets - sizeof(*hdr);
if (NULL == c->thread_ref)
start_blocking_thread(c);
threadcopy = emalloc(hdr->octets);
memcpy(threadcopy, hdr, sizeof(*hdr));
memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
return queue_req_pointer(c, threadcopy);
}
/* --------------------------------------------------------------------
* Wait for the 'incoming queue no longer empty' signal, lock the shared
* structure and dequeue an item.
*/
blocking_pipe_header *
receive_blocking_req_internal(
blocking_child * c
)
{
blocking_pipe_header * req;
size_t qhead, qtail;
req = NULL;
do {
/* wait for tickle from the producer side */
wait_for_sem(c->workitems_pending, NULL);
/* >>>> ACCESS LOCKING STARTS >>>> */
wait_for_sem(c->accesslock, NULL);
qhead = c->head_workitem;
do {
qtail = c->tail_workitem;
if (qhead == qtail)
break;
c->tail_workitem = qtail + 1;
qtail %= c->workitems_alloc;
req = c->workitems[qtail];
c->workitems[qtail] = NULL;
} while (NULL == req);
tickle_sem(c->accesslock);
/* <<<< ACCESS LOCKING ENDS <<<< */
} while (NULL == req);
INSIST(NULL != req);
if (CHILD_EXIT_REQ == req) { /* idled out */
send_blocking_resp_internal(c, CHILD_GONE_RESP);
req = NULL;
}
return req;
}
/* --------------------------------------------------------------------
* Push a response into the return queue and eventually tickle the
* receiver.
*/
int
send_blocking_resp_internal(
blocking_child * c,
blocking_pipe_header * resp
)
{
size_t qhead;
int empty;
/* >>>> ACCESS LOCKING STARTS >>>> */
wait_for_sem(c->accesslock, NULL);
empty = ensure_workresp_empty_slot(c);
qhead = c->head_response;
c->responses[qhead % c->responses_alloc] = resp;
c->head_response = 1 + qhead;
tickle_sem(c->accesslock);
/* <<<< ACCESS LOCKING ENDS <<<< */
/* queue consumer wake-up notification */
if (empty)
{
# ifdef WORK_PIPE
write(c->resp_write_pipe, "", 1);
# else
tickle_sem(c->responses_pending);
# endif
}
return 0;
}
#ifndef WORK_PIPE
/* --------------------------------------------------------------------
* Check if a (Windows-)hanndle to a semaphore is actually the same we
* are using inside the sema wrapper.
*/
static BOOL
same_os_sema(
const sem_ref obj,
void* osh
)
{
return obj && osh && (obj->shnd == (HANDLE)osh);
}
/* --------------------------------------------------------------------
* Find the shared context that associates to an OS handle and make sure
* the data is dequeued and processed.
*/
void
handle_blocking_resp_sem(
void * context
)
{
blocking_child * c;
u_int idx;
c = NULL;
for (idx = 0; idx < blocking_children_alloc; idx++) {
c = blocking_children[idx];
if (c != NULL &&
c->thread_ref != NULL &&
same_os_sema(c->responses_pending, context))
break;
}
if (idx < blocking_children_alloc)
process_blocking_resp(c);
}
#endif /* !WORK_PIPE */
/* --------------------------------------------------------------------
* Fetch the next response from the return queue. In case of signalling
* via pipe, make sure the pipe is flushed, too.
*/
blocking_pipe_header *
receive_blocking_resp_internal(
blocking_child * c
)
{
blocking_pipe_header * removed;
size_t qhead, qtail, slot;
#ifdef WORK_PIPE
int rc;
char scratch[32];
do
rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
while (-1 == rc && EINTR == errno);
#endif
/* >>>> ACCESS LOCKING STARTS >>>> */
wait_for_sem(c->accesslock, NULL);
qhead = c->head_response;
qtail = c->tail_response;
for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
slot = qtail % c->responses_alloc;
removed = c->responses[slot];
c->responses[slot] = NULL;
}
c->tail_response = qtail;
tickle_sem(c->accesslock);
/* <<<< ACCESS LOCKING ENDS <<<< */
if (NULL != removed) {
DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
BLOCKING_RESP_MAGIC == removed->magic_sig);
}
if (CHILD_GONE_RESP == removed) {
cleanup_after_child(c);
removed = NULL;
}
return removed;
}
/* --------------------------------------------------------------------
* Light up a new worker.
*/
static void
start_blocking_thread(
blocking_child * c
)
{
DEBUG_INSIST(!c->reusable);
prepare_child_sems(c);
start_blocking_thread_internal(c);
}
/* --------------------------------------------------------------------
* Create a worker thread. There are several differences between POSIX
* and Windows, of course -- most notably the Windows thread is no
* detached thread, and we keep the handle around until we want to get
* rid of the thread. The notification scheme also differs: Windows
* makes use of semaphores in both directions, POSIX uses a pipe for
* integration with 'select()' or alike.
*/
static void
start_blocking_thread_internal(
blocking_child * c
)
#ifdef SYS_WINNT
{
BOOL resumed;
c->thread_ref = NULL;
(*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
c->thr_table[0].thnd =
(HANDLE)_beginthreadex(
NULL,
0,
&blocking_thread,
c,
CREATE_SUSPENDED,
NULL);
if (NULL == c->thr_table[0].thnd) {
msyslog(LOG_ERR, "start blocking thread failed: %m");
exit(-1);
}
/* remember the thread priority is only within the process class */
if (!SetThreadPriority(c->thr_table[0].thnd,
THREAD_PRIORITY_BELOW_NORMAL))
msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
resumed = ResumeThread(c->thr_table[0].thnd);
DEBUG_INSIST(resumed);
c->thread_ref = &c->thr_table[0];
}
#else /* pthreads start_blocking_thread_internal() follows */
{
# ifdef NEED_PTHREAD_INIT
static int pthread_init_called;
# endif
pthread_attr_t thr_attr;
int rc;
int pipe_ends[2]; /* read then write */
int is_pipe;
int flags;
size_t ostacksize;
size_t nstacksize;
sigset_t saved_sig_mask;
c->thread_ref = NULL;
# ifdef NEED_PTHREAD_INIT
/*
* from lib/isc/unix/app.c:
* BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
*/
if (!pthread_init_called) {
pthread_init();
pthread_init_called = TRUE;
}
# endif
rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
if (0 != rc) {
msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
exit(1);
}
c->resp_read_pipe = move_fd(pipe_ends[0]);
c->resp_write_pipe = move_fd(pipe_ends[1]);
c->ispipe = is_pipe;
flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
if (-1 == flags) {
msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
exit(1);
}
rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
if (-1 == rc) {
msyslog(LOG_ERR,
"start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
exit(1);
}
(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
pthread_attr_init(&thr_attr);
pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
if (0 != rc) {
msyslog(LOG_ERR,
"start_blocking_thread: pthread_attr_getstacksize() -> %s",
strerror(rc));
} else {
if (ostacksize < THREAD_MINSTACKSIZE)
nstacksize = THREAD_MINSTACKSIZE;
else if (ostacksize > THREAD_MAXSTACKSIZE)
nstacksize = THREAD_MAXSTACKSIZE;
else
nstacksize = ostacksize;
if (nstacksize != ostacksize)
rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
if (0 != rc)
msyslog(LOG_ERR,
"start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
(u_long)ostacksize, (u_long)nstacksize,
strerror(rc));
}
#else
UNUSED_ARG(nstacksize);
UNUSED_ARG(ostacksize);
#endif
#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
#endif
c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
block_thread_signals(&saved_sig_mask);
rc = pthread_create(&c->thr_table[0], &thr_attr,
&blocking_thread, c);
pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
pthread_attr_destroy(&thr_attr);
if (0 != rc) {
msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
strerror(rc));
exit(1);
}
c->thread_ref = &c->thr_table[0];
}
#endif
/* --------------------------------------------------------------------
* block_thread_signals()
*
* Temporarily block signals used by ntpd main thread, so that signal
* mask inherited by child threads leaves them blocked. Returns prior
* active signal mask via pmask, to be restored by the main thread
* after pthread_create().
*/
#ifndef SYS_WINNT
void
block_thread_signals(
sigset_t * pmask
)
{
sigset_t block;
sigemptyset(&block);
# ifdef HAVE_SIGNALED_IO
# ifdef SIGIO
sigaddset(&block, SIGIO);
# endif
# ifdef SIGPOLL
sigaddset(&block, SIGPOLL);
# endif
# endif /* HAVE_SIGNALED_IO */
sigaddset(&block, SIGALRM);
sigaddset(&block, MOREDEBUGSIG);
sigaddset(&block, LESSDEBUGSIG);
# ifdef SIGDIE1
sigaddset(&block, SIGDIE1);
# endif
# ifdef SIGDIE2
sigaddset(&block, SIGDIE2);
# endif
# ifdef SIGDIE3
sigaddset(&block, SIGDIE3);
# endif
# ifdef SIGDIE4
sigaddset(&block, SIGDIE4);
# endif
# ifdef SIGBUS
sigaddset(&block, SIGBUS);
# endif
sigemptyset(pmask);
pthread_sigmask(SIG_BLOCK, &block, pmask);
}
#endif /* !SYS_WINNT */
/* --------------------------------------------------------------------
* Create & destroy semaphores. This is sufficiently different between
* POSIX and Windows to warrant wrapper functions and close enough to
* use the concept of synchronization via semaphore for all platforms.
*/
static sem_ref
create_sema(
sema_type* semptr,
u_int inival,
u_int maxval)
{
#ifdef SYS_WINNT
long svini, svmax;
if (NULL != semptr) {
svini = (inival < LONG_MAX)
? (long)inival : LONG_MAX;
svmax = (maxval < LONG_MAX && maxval > 0)
? (long)maxval : LONG_MAX;
semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
if (NULL == semptr->shnd)
semptr = NULL;
}
#else
(void)maxval;
if (semptr && sem_init(semptr, FALSE, inival))
semptr = NULL;
#endif
return semptr;
}
/* ------------------------------------------------------------------ */
static sem_ref
delete_sema(
sem_ref obj)
{
# ifdef SYS_WINNT
if (obj) {
if (obj->shnd)
CloseHandle(obj->shnd);
obj->shnd = NULL;
}
# else
if (obj)
sem_destroy(obj);
# endif
return NULL;
}
/* --------------------------------------------------------------------
* prepare_child_sems()
*
* create sync & access semaphores
*
* All semaphores are cleared, only the access semaphore has 1 unit.
* Childs wait on 'workitems_pending', then grabs 'sema_access'
* and dequeues jobs. When done, 'sema_access' is given one unit back.
*
* The producer grabs 'sema_access', manages the queue, restores
* 'sema_access' and puts one unit into 'workitems_pending'.
*
* The story goes the same for the response queue.
*/
static void
prepare_child_sems(
blocking_child *c
)
{
if (NULL == worker_memlock)
worker_memlock = create_sema(&worker_mmutex, 1, 1);
c->accesslock = create_sema(&c->sem_table[0], 1, 1);
c->workitems_pending = create_sema(&c->sem_table[1], 0, 0);
c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
# ifndef WORK_PIPE
c->responses_pending = create_sema(&c->sem_table[3], 0, 0);
# endif
}
/* --------------------------------------------------------------------
* wait for semaphore. Where the wait can be interrupted, it will
* internally resume -- When this function returns, there is either no
* semaphore at all, a timeout occurred, or the caller could
* successfully take a token from the semaphore.
*
* For untimed wait, not checking the result of this function at all is
* definitely an option.
*/
static int
wait_for_sem(
sem_ref sem,
struct timespec * timeout /* wall-clock */
)
#ifdef SYS_WINNT
{
struct timespec now;
struct timespec delta;
DWORD msec;
DWORD rc;
if (!(sem && sem->shnd)) {
errno = EINVAL;
return -1;
}
if (NULL == timeout) {
msec = INFINITE;
} else {
getclock(TIMEOFDAY, &now);
delta = sub_tspec(*timeout, now);
if (delta.tv_sec < 0) {
msec = 0;
} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
msec = INFINITE;
} else {
msec = 1000 * (DWORD)delta.tv_sec;
msec += delta.tv_nsec / (1000 * 1000);
}
}
rc = WaitForSingleObject(sem->shnd, msec);
if (WAIT_OBJECT_0 == rc)
return 0;
if (WAIT_TIMEOUT == rc) {
errno = ETIMEDOUT;
return -1;
}
msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
errno = EFAULT;
return -1;
}
#else /* pthreads wait_for_sem() follows */
{
int rc = -1;
if (sem) do {
if (NULL == timeout)
rc = sem_wait(sem);
else
rc = sem_timedwait(sem, timeout);
} while (rc == -1 && errno == EINTR);
else
errno = EINVAL;
return rc;
}
#endif
/* --------------------------------------------------------------------
* blocking_thread - thread functions have WINAPI (aka 'stdcall')
* calling conventions under Windows and POSIX-defined signature
* otherwise.
*/
#ifdef SYS_WINNT
u_int WINAPI
#else
void *
#endif
blocking_thread(
void * ThreadArg
)
{
blocking_child *c;
c = ThreadArg;
exit_worker(blocking_child_common(c));
/* NOTREACHED */
return 0;
}
/* --------------------------------------------------------------------
* req_child_exit() runs in the parent.
*
* This function is called from from the idle timer, too, and possibly
* without a thread being there any longer. Since we have folded up our
* tent in that case and all the semaphores are already gone, we simply
* ignore this request in this case.
*
* Since the existence of the semaphores is controlled exclusively by
* the parent, there's no risk of data race here.
*/
int
req_child_exit(
blocking_child *c
)
{
return (c->accesslock)
? queue_req_pointer(c, CHILD_EXIT_REQ)
: 0;
}
/* --------------------------------------------------------------------
* cleanup_after_child() runs in parent.
*/
static void
cleanup_after_child(
blocking_child * c
)
{
DEBUG_INSIST(!c->reusable);
# ifdef SYS_WINNT
/* The thread was not created in detached state, so we better
* clean up.
*/
if (c->thread_ref && c->thread_ref->thnd) {
WaitForSingleObject(c->thread_ref->thnd, INFINITE);
INSIST(CloseHandle(c->thread_ref->thnd));
c->thread_ref->thnd = NULL;
}
# endif
c->thread_ref = NULL;
/* remove semaphores and (if signalling vi IO) pipes */
c->accesslock = delete_sema(c->accesslock);
c->workitems_pending = delete_sema(c->workitems_pending);
c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);
# ifdef WORK_PIPE
DEBUG_INSIST(-1 != c->resp_read_pipe);
DEBUG_INSIST(-1 != c->resp_write_pipe);
(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
close(c->resp_write_pipe);
close(c->resp_read_pipe);
c->resp_write_pipe = -1;
c->resp_read_pipe = -1;
# else
DEBUG_INSIST(NULL != c->responses_pending);
(*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
c->responses_pending = delete_sema(c->responses_pending);
# endif
/* Is it necessary to check if there are pending requests and
* responses? If so, and if there are, what to do with them?
*/
/* re-init buffer index sequencers */
c->head_workitem = 0;
c->tail_workitem = 0;
c->head_response = 0;
c->tail_response = 0;
c->reusable = TRUE;
}
#else /* !WORK_THREAD follows */
char work_thread_nonempty_compilation_unit;
#endif