/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed
 * with this work for additional information regarding copyright
 * ownership.  The ASF licenses this file to you under the Apache
 * License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License.  You may obtain a copy of
 * the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */

#include <assert.h>
#include "apr_thread_pool.h"
#include "apr_ring.h"
#include "apr_thread_cond.h"
#include "apr_portable.h"

#if APR_HAS_THREADS

#define TASK_PRIORITY_SEGS 4
#define TASK_PRIORITY_SEG(x) (((x)->dispatch.priority & 0xFF) / 64)
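
/*
 * How the segment index works: TASK_PRIORITY_SEG() divides the low byte of
 * a task's priority by 64, so the 0..255 priority range maps onto the
 * TASK_PRIORITY_SEGS = 4 buckets:
 *
 *     priority   0.. 63  ->  segment 0
 *     priority  64..127  ->  segment 1
 *     priority 128..191  ->  segment 2
 *     priority 192..255  ->  segment 3
 *
 * task_idx[] in struct apr_thread_pool below caches the first task of each
 * bucket, so insertion into the priority-ordered task ring can skip ahead
 * instead of walking the whole ring.
 */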

typedef struct apr_thread_pool_task
{
    APR_RING_ENTRY(apr_thread_pool_task) link;
    apr_thread_start_t func;
    void *param;
    void *owner;
    union
    {
        apr_byte_t priority;
        apr_time_t time;
    } dispatch;
} apr_thread_pool_task_t;

APR_RING_HEAD(apr_thread_pool_tasks, apr_thread_pool_task);

struct apr_thread_list_elt
{
    APR_RING_ENTRY(apr_thread_list_elt) link;
    apr_thread_t *thd;
    volatile void *current_owner;
    volatile enum { TH_RUN, TH_STOP, TH_PROBATION } state;
};

APR_RING_HEAD(apr_thread_list, apr_thread_list_elt);

struct apr_thread_pool
{
    apr_pool_t *pool;
    volatile apr_size_t thd_max;
    volatile apr_size_t idle_max;
    volatile apr_interval_time_t idle_wait;
    volatile apr_size_t thd_cnt;
    volatile apr_size_t idle_cnt;
    volatile apr_size_t task_cnt;
    volatile apr_size_t scheduled_task_cnt;
    volatile apr_size_t threshold;
    volatile apr_size_t tasks_run;
    volatile apr_size_t tasks_high;
    volatile apr_size_t thd_high;
    volatile apr_size_t thd_timed_out;
    struct apr_thread_pool_tasks *tasks;
    struct apr_thread_pool_tasks *scheduled_tasks;
    struct apr_thread_list *busy_thds;
    struct apr_thread_list *idle_thds;
    apr_thread_mutex_t *lock;
    apr_thread_cond_t *cond;
    volatile int terminated;
    struct apr_thread_pool_tasks *recycled_tasks;
    struct apr_thread_list *recycled_thds;
    apr_thread_pool_task_t *task_idx[TASK_PRIORITY_SEGS];
};
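
/*
 * Locking note: the task and thread rings above are manipulated only while
 * ->lock is held; the counters are declared volatile because the statistics
 * getters (e.g. apr_thread_pool_tasks_count()) read them without the lock.
 */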

static apr_status_t thread_pool_construct(apr_thread_pool_t * me,
                                          apr_size_t init_threads,
                                          apr_size_t max_threads)
{
    apr_status_t rv;
    int i;

    me->thd_max = max_threads;
    me->idle_max = init_threads;
    me->threshold = init_threads / 2;
    rv = apr_thread_mutex_create(&me->lock, APR_THREAD_MUTEX_NESTED,
                                 me->pool);
    if (APR_SUCCESS != rv) {
        return rv;
    }
    rv = apr_thread_cond_create(&me->cond, me->pool);
    if (APR_SUCCESS != rv) {
        apr_thread_mutex_destroy(me->lock);
        return rv;
    }
    me->tasks = apr_palloc(me->pool, sizeof(*me->tasks));
    if (!me->tasks) {
        goto CATCH_ENOMEM;
    }
    APR_RING_INIT(me->tasks, apr_thread_pool_task, link);
    me->scheduled_tasks = apr_palloc(me->pool, sizeof(*me->scheduled_tasks));
    if (!me->scheduled_tasks) {
        goto CATCH_ENOMEM;
    }
    APR_RING_INIT(me->scheduled_tasks, apr_thread_pool_task, link);
    me->recycled_tasks = apr_palloc(me->pool, sizeof(*me->recycled_tasks));
    if (!me->recycled_tasks) {
        goto CATCH_ENOMEM;
    }
    APR_RING_INIT(me->recycled_tasks, apr_thread_pool_task, link);
    me->busy_thds = apr_palloc(me->pool, sizeof(*me->busy_thds));
    if (!me->busy_thds) {
        goto CATCH_ENOMEM;
    }
    APR_RING_INIT(me->busy_thds, apr_thread_list_elt, link);
    me->idle_thds = apr_palloc(me->pool, sizeof(*me->idle_thds));
    if (!me->idle_thds) {
        goto CATCH_ENOMEM;
    }
    APR_RING_INIT(me->idle_thds, apr_thread_list_elt, link);
    me->recycled_thds = apr_palloc(me->pool, sizeof(*me->recycled_thds));
    if (!me->recycled_thds) {
        goto CATCH_ENOMEM;
    }
    APR_RING_INIT(me->recycled_thds, apr_thread_list_elt, link);
    me->thd_cnt = me->idle_cnt = me->task_cnt = me->scheduled_task_cnt = 0;
    me->tasks_run = me->tasks_high = me->thd_high = me->thd_timed_out = 0;
    me->idle_wait = 0;
    me->terminated = 0;
    for (i = 0; i < TASK_PRIORITY_SEGS; i++) {
        me->task_idx[i] = NULL;
    }
    goto FINAL_EXIT;
  CATCH_ENOMEM:
    rv = APR_ENOMEM;
    apr_thread_mutex_destroy(me->lock);
    apr_thread_cond_destroy(me->cond);
  FINAL_EXIT:
    return rv;
}

/*
 * NOTE: This function is not thread safe by itself. Caller should hold the lock
 */
static apr_thread_pool_task_t *pop_task(apr_thread_pool_t * me)
{
    apr_thread_pool_task_t *task = NULL;
    int seg;

    /* check for scheduled tasks */
    if (me->scheduled_task_cnt > 0) {
        task = APR_RING_FIRST(me->scheduled_tasks);
        assert(task != NULL);
        assert(task !=
               APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
                                 link));
        /* if it's time */
        if (task->dispatch.time <= apr_time_now()) {
            --me->scheduled_task_cnt;
            APR_RING_REMOVE(task, link);
            return task;
        }
    }
    /* check for normal tasks if we're not returning a scheduled task */
    if (me->task_cnt == 0) {
        return NULL;
    }

    task = APR_RING_FIRST(me->tasks);
    assert(task != NULL);
    assert(task != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link));
    --me->task_cnt;
    seg = TASK_PRIORITY_SEG(task);
    if (task == me->task_idx[seg]) {
        me->task_idx[seg] = APR_RING_NEXT(task, link);
        if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
                                                   apr_thread_pool_task, link)
            || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
            me->task_idx[seg] = NULL;
        }
    }
    APR_RING_REMOVE(task, link);
    return task;
}
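
/*
 * Dispatch order, as implemented by pop_task() above: a scheduled task whose
 * deadline has already passed is always preferred; otherwise the head of the
 * priority-ordered task ring (the highest-priority task) is taken.
 */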

static apr_interval_time_t waiting_time(apr_thread_pool_t * me)
{
    apr_thread_pool_task_t *task = NULL;

    task = APR_RING_FIRST(me->scheduled_tasks);
    assert(task != NULL);
    assert(task !=
           APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
                             link));
    return task->dispatch.time - apr_time_now();
}

/*
 * NOTE: This function is not thread safe by itself. Caller should hold the lock
 */
static struct apr_thread_list_elt *elt_new(apr_thread_pool_t * me,
                                           apr_thread_t * t)
{
    struct apr_thread_list_elt *elt;

    if (APR_RING_EMPTY(me->recycled_thds, apr_thread_list_elt, link)) {
        elt = apr_pcalloc(me->pool, sizeof(*elt));
        if (NULL == elt) {
            return NULL;
        }
    }
    else {
        elt = APR_RING_FIRST(me->recycled_thds);
        APR_RING_REMOVE(elt, link);
    }

    APR_RING_ELEM_INIT(elt, link);
    elt->thd = t;
    elt->current_owner = NULL;
    elt->state = TH_RUN;
    return elt;
}

/*
 * The worker thread function. Takes a task from the queue and performs it if
 * there is one. Otherwise, puts itself onto the idle thread list and waits
 * for a signal to wake up.
 * The thread terminates directly, by detaching and exiting, when it is asked
 * to stop after finishing a task. Otherwise, the thread should be on the idle
 * thread list and should be joined.
 */
static void *APR_THREAD_FUNC thread_pool_func(apr_thread_t * t, void *param)
{
    apr_thread_pool_t *me = param;
    apr_thread_pool_task_t *task = NULL;
    apr_interval_time_t wait;
    struct apr_thread_list_elt *elt;

    apr_thread_mutex_lock(me->lock);
    elt = elt_new(me, t);
    if (!elt) {
        apr_thread_mutex_unlock(me->lock);
        apr_thread_exit(t, APR_ENOMEM);
    }

    while (!me->terminated && elt->state != TH_STOP) {
        /* If this is not a new element, it was awakened from idle */
        if (APR_RING_NEXT(elt, link) != elt) {
            --me->idle_cnt;
            APR_RING_REMOVE(elt, link);
        }

        APR_RING_INSERT_TAIL(me->busy_thds, elt, apr_thread_list_elt, link);
        task = pop_task(me);
        while (NULL != task && !me->terminated) {
            ++me->tasks_run;
            elt->current_owner = task->owner;
            apr_thread_mutex_unlock(me->lock);
            apr_thread_data_set(task, "apr_thread_pool_task", NULL, t);
            task->func(t, task->param);
            apr_thread_mutex_lock(me->lock);
            APR_RING_INSERT_TAIL(me->recycled_tasks, task,
                                 apr_thread_pool_task, link);
            elt->current_owner = NULL;
            if (TH_STOP == elt->state) {
                break;
            }
            task = pop_task(me);
        }
        assert(NULL == elt->current_owner);
        if (TH_STOP != elt->state)
            APR_RING_REMOVE(elt, link);

        /* Check whether a busy thread has been asked to stop;
         * such a thread is not joinable */
        if ((me->idle_cnt >= me->idle_max
             && !(me->scheduled_task_cnt && 0 >= me->idle_max)
             && !me->idle_wait)
            || me->terminated || elt->state != TH_RUN) {
            --me->thd_cnt;
            if ((TH_PROBATION == elt->state) && me->idle_wait)
                ++me->thd_timed_out;
            APR_RING_INSERT_TAIL(me->recycled_thds, elt,
                                 apr_thread_list_elt, link);
            apr_thread_mutex_unlock(me->lock);
            apr_thread_detach(t);
            apr_thread_exit(t, APR_SUCCESS);
            return NULL;        /* should not be here, safety net */
        }

        /* busy thread becomes idle */
        ++me->idle_cnt;
        APR_RING_INSERT_TAIL(me->idle_thds, elt, apr_thread_list_elt, link);

        /*
         * If there is a scheduled task, always wake up in time to perform it,
         * since there is no guarantee that a currently idle thread will be
         * awakened for the next scheduled task.
         */
        if (me->scheduled_task_cnt)
            wait = waiting_time(me);
        else if (me->idle_cnt > me->idle_max) {
            wait = me->idle_wait;
            elt->state = TH_PROBATION;
        }
        else
            wait = -1;

        if (wait >= 0) {
            apr_thread_cond_timedwait(me->cond, me->lock, wait);
        }
        else {
            apr_thread_cond_wait(me->cond, me->lock);
        }
    }

    /* idle thread has been asked to stop; it will be joined */
    --me->thd_cnt;
    apr_thread_mutex_unlock(me->lock);
    apr_thread_exit(t, APR_SUCCESS);
    return NULL;                /* should not be here, safety net */
}
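
/*
 * Idle-timeout lifecycle: when more than idle_max threads are idle and an
 * idle_wait is set, a surplus thread waits in TH_PROBATION state. If it is
 * still surplus when it wakes up, it exits via the detach path above and
 * thd_timed_out is incremented.
 */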

static apr_status_t thread_pool_cleanup(void *me)
{
    apr_thread_pool_t *_myself = me;

    _myself->terminated = 1;
    apr_thread_pool_idle_max_set(_myself, 0);
    while (_myself->thd_cnt) {
        apr_sleep(20 * 1000);   /* poll for thread exit, 20 ms at a time */
    }
    apr_thread_mutex_destroy(_myself->lock);
    apr_thread_cond_destroy(_myself->cond);
    return APR_SUCCESS;
}

APU_DECLARE(apr_status_t) apr_thread_pool_create(apr_thread_pool_t ** me,
                                                 apr_size_t init_threads,
                                                 apr_size_t max_threads,
                                                 apr_pool_t * pool)
{
    apr_thread_t *t;
    apr_status_t rv = APR_SUCCESS;
    apr_thread_pool_t *tp;

    *me = NULL;
    tp = apr_pcalloc(pool, sizeof(apr_thread_pool_t));

    /*
     * This pool will be used by different threads. As we cannot ensure that
     * our caller won't use the pool without acquiring the mutex, we must
     * create a new sub pool.
     */
    rv = apr_pool_create(&tp->pool, pool);
    if (APR_SUCCESS != rv)
        return rv;
    rv = thread_pool_construct(tp, init_threads, max_threads);
    if (APR_SUCCESS != rv)
        return rv;
    apr_pool_pre_cleanup_register(tp->pool, tp, thread_pool_cleanup);

    while (init_threads) {
        /* Grab the mutex as apr_thread_create() and thread_pool_func() will
         * allocate from tp->pool. This is dangerous if there are multiple
         * initial threads to create.
         */
        apr_thread_mutex_lock(tp->lock);
        rv = apr_thread_create(&t, NULL, thread_pool_func, tp, tp->pool);
        apr_thread_mutex_unlock(tp->lock);
        if (APR_SUCCESS != rv) {
            break;
        }
        tp->thd_cnt++;
        if (tp->thd_cnt > tp->thd_high) {
            tp->thd_high = tp->thd_cnt;
        }
        --init_threads;
    }

    if (rv == APR_SUCCESS) {
        *me = tp;
    }

    return rv;
}
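
/*
 * Minimal usage sketch (illustrative only; my_task and my_ctx are
 * hypothetical names): create a pool with 2 initial and at most 8 threads
 * from an existing apr_pool_t, queue work, and tear everything down.
 *
 *     static void *APR_THREAD_FUNC my_task(apr_thread_t *thd, void *ctx)
 *     {
 *         do_some_work(ctx);       (hypothetical worker)
 *         return NULL;
 *     }
 *
 *     apr_thread_pool_t *tp;
 *     if (apr_thread_pool_create(&tp, 2, 8, pool) == APR_SUCCESS) {
 *         apr_thread_pool_push(tp, my_task, my_ctx, 0, NULL);
 *         ...
 *         apr_thread_pool_destroy(tp);  triggers thread_pool_cleanup()
 *     }
 */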

APU_DECLARE(apr_status_t) apr_thread_pool_destroy(apr_thread_pool_t * me)
{
    apr_pool_destroy(me->pool);
    return APR_SUCCESS;
}

/*
 * NOTE: This function is not thread safe by itself. Caller should hold the lock
 */
static apr_thread_pool_task_t *task_new(apr_thread_pool_t * me,
                                        apr_thread_start_t func,
                                        void *param, apr_byte_t priority,
                                        void *owner, apr_time_t time)
{
    apr_thread_pool_task_t *t;

    if (APR_RING_EMPTY(me->recycled_tasks, apr_thread_pool_task, link)) {
        t = apr_pcalloc(me->pool, sizeof(*t));
        if (NULL == t) {
            return NULL;
        }
    }
    else {
        t = APR_RING_FIRST(me->recycled_tasks);
        APR_RING_REMOVE(t, link);
    }

    APR_RING_ELEM_INIT(t, link);
    t->func = func;
    t->param = param;
    t->owner = owner;
    if (time > 0) {
        t->dispatch.time = apr_time_now() + time;
    }
    else {
        t->dispatch.priority = priority;
    }
    return t;
}
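
/*
 * Note on the dispatch union: task_new() stores an absolute deadline
 * (apr_time_now() + time) when a positive time is given, and a priority
 * otherwise, so a task is either scheduled or prioritized, never both.
 */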

/*
 * Test if the task is the only one within its priority segment.
 * If it is not, return the first element with the same or lower priority.
 * Otherwise, add the task into the queue and return NULL.
 *
 * NOTE: This function is not thread safe by itself. Caller should hold the lock
 */
static apr_thread_pool_task_t *add_if_empty(apr_thread_pool_t * me,
                                            apr_thread_pool_task_t * const t)
{
    int seg;
    int next;
    apr_thread_pool_task_t *t_next;

    seg = TASK_PRIORITY_SEG(t);
    if (me->task_idx[seg]) {
        assert(APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
               me->task_idx[seg]);
        t_next = me->task_idx[seg];
        while (t_next->dispatch.priority > t->dispatch.priority) {
            t_next = APR_RING_NEXT(t_next, link);
            if (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) ==
                t_next) {
                return t_next;
            }
        }
        return t_next;
    }

    for (next = seg - 1; next >= 0; next--) {
        if (me->task_idx[next]) {
            APR_RING_INSERT_BEFORE(me->task_idx[next], t, link);
            break;
        }
    }
    if (0 > next) {
        APR_RING_INSERT_TAIL(me->tasks, t, apr_thread_pool_task, link);
    }
    me->task_idx[seg] = t;
    return NULL;
}

/*
 * Schedule a task to run in "time" microseconds. Find the spot in the ring
 * where the time fits. Adjust the wait time so a thread wakes up when the
 * time is reached.
 */
static apr_status_t schedule_task(apr_thread_pool_t *me,
                                  apr_thread_start_t func, void *param,
                                  void *owner, apr_interval_time_t time)
{
    apr_thread_pool_task_t *t;
    apr_thread_pool_task_t *t_loc;
    apr_thread_t *thd;
    apr_status_t rv = APR_SUCCESS;
    apr_thread_mutex_lock(me->lock);

    t = task_new(me, func, param, 0, owner, time);
    if (NULL == t) {
        apr_thread_mutex_unlock(me->lock);
        return APR_ENOMEM;
    }
    t_loc = APR_RING_FIRST(me->scheduled_tasks);
    while (NULL != t_loc) {
        /* if the time is less than the entry's, insert ahead of it */
        if (t->dispatch.time < t_loc->dispatch.time) {
            ++me->scheduled_task_cnt;
            APR_RING_INSERT_BEFORE(t_loc, t, link);
            break;
        }
        else {
            t_loc = APR_RING_NEXT(t_loc, link);
            if (t_loc ==
                APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
                                  link)) {
                ++me->scheduled_task_cnt;
                APR_RING_INSERT_TAIL(me->scheduled_tasks, t,
                                     apr_thread_pool_task, link);
                break;
            }
        }
    }
    /* there should be at least one thread for scheduled tasks */
    if (0 == me->thd_cnt) {
        rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
        if (APR_SUCCESS == rv) {
            ++me->thd_cnt;
            if (me->thd_cnt > me->thd_high)
                me->thd_high = me->thd_cnt;
        }
    }
    apr_thread_cond_signal(me->cond);
    apr_thread_mutex_unlock(me->lock);
    return rv;
}

static apr_status_t add_task(apr_thread_pool_t *me, apr_thread_start_t func,
                             void *param, apr_byte_t priority, int push,
                             void *owner)
{
    apr_thread_pool_task_t *t;
    apr_thread_pool_task_t *t_loc;
    apr_thread_t *thd;
    apr_status_t rv = APR_SUCCESS;

    apr_thread_mutex_lock(me->lock);

    t = task_new(me, func, param, priority, owner, 0);
    if (NULL == t) {
        apr_thread_mutex_unlock(me->lock);
        return APR_ENOMEM;
    }

    t_loc = add_if_empty(me, t);
    if (NULL == t_loc) {
        goto FINAL_EXIT;
    }

    if (push) {
        while (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
               t_loc && t_loc->dispatch.priority >= t->dispatch.priority) {
            t_loc = APR_RING_NEXT(t_loc, link);
        }
    }
    APR_RING_INSERT_BEFORE(t_loc, t, link);
    if (!push) {
        if (t_loc == me->task_idx[TASK_PRIORITY_SEG(t)]) {
            me->task_idx[TASK_PRIORITY_SEG(t)] = t;
        }
    }

  FINAL_EXIT:
    me->task_cnt++;
    if (me->task_cnt > me->tasks_high)
        me->tasks_high = me->task_cnt;
    if (0 == me->thd_cnt || (0 == me->idle_cnt && me->thd_cnt < me->thd_max &&
                             me->task_cnt > me->threshold)) {
        rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
        if (APR_SUCCESS == rv) {
            ++me->thd_cnt;
            if (me->thd_cnt > me->thd_high)
                me->thd_high = me->thd_cnt;
        }
    }

    apr_thread_cond_signal(me->cond);
    apr_thread_mutex_unlock(me->lock);

    return rv;
}

APU_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t *me,
                                               apr_thread_start_t func,
                                               void *param,
                                               apr_byte_t priority,
                                               void *owner)
{
    return add_task(me, func, param, priority, 1, owner);
}

APU_DECLARE(apr_status_t) apr_thread_pool_schedule(apr_thread_pool_t *me,
                                                   apr_thread_start_t func,
                                                   void *param,
                                                   apr_interval_time_t time,
                                                   void *owner)
{
    return schedule_task(me, func, param, owner, time);
}

APU_DECLARE(apr_status_t) apr_thread_pool_top(apr_thread_pool_t *me,
                                              apr_thread_start_t func,
                                              void *param,
                                              apr_byte_t priority,
                                              void *owner)
{
    return add_task(me, func, param, priority, 0, owner);
}
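
/*
 * Queueing semantics of the three public entry points above, as implemented
 * by add_task() and schedule_task():
 *
 *   - apr_thread_pool_push():     enqueue after existing tasks of the same
 *                                 priority (FIFO within a priority);
 *   - apr_thread_pool_top():      enqueue ahead of existing tasks of the
 *                                 same priority (LIFO within a priority);
 *   - apr_thread_pool_schedule(): run once, "time" microseconds from now,
 *                                 ignoring priority.
 *
 * For example (hypothetical owner cookie "conn"):
 *
 *     apr_thread_pool_push(tp, fn, arg, APR_THREAD_TASK_PRIORITY_HIGH, conn);
 *     apr_thread_pool_schedule(tp, fn, arg, apr_time_from_sec(5), conn);
 */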

static apr_status_t remove_scheduled_tasks(apr_thread_pool_t *me,
                                           void *owner)
{
    apr_thread_pool_task_t *t_loc;
    apr_thread_pool_task_t *next;

    t_loc = APR_RING_FIRST(me->scheduled_tasks);
    while (t_loc !=
           APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
                             link)) {
        next = APR_RING_NEXT(t_loc, link);
        /* if this task belongs to the owner, remove it */
        if (t_loc->owner == owner) {
            --me->scheduled_task_cnt;
            APR_RING_REMOVE(t_loc, link);
        }
        t_loc = next;
    }
    return APR_SUCCESS;
}

static apr_status_t remove_tasks(apr_thread_pool_t *me, void *owner)
{
    apr_thread_pool_task_t *t_loc;
    apr_thread_pool_task_t *next;
    int seg;

    t_loc = APR_RING_FIRST(me->tasks);
    while (t_loc != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)) {
        next = APR_RING_NEXT(t_loc, link);
        if (t_loc->owner == owner) {
            --me->task_cnt;
            seg = TASK_PRIORITY_SEG(t_loc);
            if (t_loc == me->task_idx[seg]) {
                me->task_idx[seg] = APR_RING_NEXT(t_loc, link);
                if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
                                                           apr_thread_pool_task,
                                                           link)
                    || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
                    me->task_idx[seg] = NULL;
                }
            }
            APR_RING_REMOVE(t_loc, link);
        }
        t_loc = next;
    }
    return APR_SUCCESS;
}

static void wait_on_busy_threads(apr_thread_pool_t *me, void *owner)
{
#ifndef NDEBUG
    apr_os_thread_t *os_thread;
#endif
    struct apr_thread_list_elt *elt;
    apr_thread_mutex_lock(me->lock);
    elt = APR_RING_FIRST(me->busy_thds);
    while (elt != APR_RING_SENTINEL(me->busy_thds, apr_thread_list_elt, link)) {
        if (elt->current_owner != owner) {
            elt = APR_RING_NEXT(elt, link);
            continue;
        }
#ifndef NDEBUG
        /* make sure the thread is not the one calling tasks_cancel */
        apr_os_thread_get(&os_thread, elt->thd);
#ifdef WIN32
        /* hack for apr win32 bug */
        assert(!apr_os_thread_equal(apr_os_thread_current(), os_thread));
#else
        assert(!apr_os_thread_equal(apr_os_thread_current(), *os_thread));
#endif
#endif
        while (elt->current_owner == owner) {
            apr_thread_mutex_unlock(me->lock);
            apr_sleep(200 * 1000);
            apr_thread_mutex_lock(me->lock);
        }
        elt = APR_RING_FIRST(me->busy_thds);
    }
    apr_thread_mutex_unlock(me->lock);
    return;
}

APU_DECLARE(apr_status_t) apr_thread_pool_tasks_cancel(apr_thread_pool_t *me,
                                                       void *owner)
{
    apr_status_t rv = APR_SUCCESS;

    apr_thread_mutex_lock(me->lock);
    if (me->task_cnt > 0) {
        rv = remove_tasks(me, owner);
    }
    if (me->scheduled_task_cnt > 0) {
        rv = remove_scheduled_tasks(me, owner);
    }
    apr_thread_mutex_unlock(me->lock);
    wait_on_busy_threads(me, owner);

    return rv;
}
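
/*
 * Note that cancellation is synchronous with respect to running work: after
 * removing the owner's queued and scheduled tasks, tasks_cancel() polls the
 * busy list (200 ms at a time, in wait_on_busy_threads()) until no thread is
 * still executing a task for this owner. Consequently it must not be called
 * from within one of the owner's own tasks.
 */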

APU_DECLARE(apr_size_t) apr_thread_pool_tasks_count(apr_thread_pool_t *me)
{
    return me->task_cnt;
}

APU_DECLARE(apr_size_t)
apr_thread_pool_scheduled_tasks_count(apr_thread_pool_t *me)
{
    return me->scheduled_task_cnt;
}

APU_DECLARE(apr_size_t) apr_thread_pool_threads_count(apr_thread_pool_t *me)
{
    return me->thd_cnt;
}

APU_DECLARE(apr_size_t) apr_thread_pool_busy_count(apr_thread_pool_t *me)
{
    return me->thd_cnt - me->idle_cnt;
}

APU_DECLARE(apr_size_t) apr_thread_pool_idle_count(apr_thread_pool_t *me)
{
    return me->idle_cnt;
}

APU_DECLARE(apr_size_t)
apr_thread_pool_tasks_run_count(apr_thread_pool_t * me)
{
    return me->tasks_run;
}

APU_DECLARE(apr_size_t)
apr_thread_pool_tasks_high_count(apr_thread_pool_t * me)
{
    return me->tasks_high;
}

APU_DECLARE(apr_size_t)
apr_thread_pool_threads_high_count(apr_thread_pool_t * me)
{
    return me->thd_high;
}

APU_DECLARE(apr_size_t)
apr_thread_pool_threads_idle_timeout_count(apr_thread_pool_t * me)
{
    return me->thd_timed_out;
}

APU_DECLARE(apr_size_t) apr_thread_pool_idle_max_get(apr_thread_pool_t *me)
{
    return me->idle_max;
}

APU_DECLARE(apr_interval_time_t)
apr_thread_pool_idle_wait_get(apr_thread_pool_t * me)
{
    return me->idle_wait;
}

/*
 * This function stops surplus threads, down to *cnt of them.
 * On return, *cnt holds the number of threads stopped.
 * NOTE: Busy threads can become idle while this function runs
 */
static struct apr_thread_list_elt *trim_threads(apr_thread_pool_t *me,
                                                apr_size_t *cnt, int idle)
{
    struct apr_thread_list *thds;
    apr_size_t n, n_dbg, i;
    struct apr_thread_list_elt *head, *tail, *elt;

    apr_thread_mutex_lock(me->lock);
    if (idle) {
        thds = me->idle_thds;
        n = me->idle_cnt;
    }
    else {
        thds = me->busy_thds;
        n = me->thd_cnt - me->idle_cnt;
    }
    if (n <= *cnt) {
        apr_thread_mutex_unlock(me->lock);
        *cnt = 0;
        return NULL;
    }
    n -= *cnt;

    head = APR_RING_FIRST(thds);
    for (i = 0; i < *cnt; i++) {
        head = APR_RING_NEXT(head, link);
    }
    tail = APR_RING_LAST(thds);
    if (idle) {
        APR_RING_UNSPLICE(head, tail, link);
        me->idle_cnt = *cnt;
    }

    n_dbg = 0;
    for (elt = head; elt != tail; elt = APR_RING_NEXT(elt, link)) {
        elt->state = TH_STOP;
        n_dbg++;
    }
    elt->state = TH_STOP;
    n_dbg++;
    assert(n == n_dbg);
    *cnt = n;

    apr_thread_mutex_unlock(me->lock);

    APR_RING_PREV(head, link) = NULL;
    APR_RING_NEXT(tail, link) = NULL;
    return head;
}

static apr_size_t trim_idle_threads(apr_thread_pool_t *me, apr_size_t cnt)
{
    apr_size_t n_dbg;
    struct apr_thread_list_elt *elt, *head, *tail;
    apr_status_t rv;

    elt = trim_threads(me, &cnt, 1);

    apr_thread_mutex_lock(me->lock);
    apr_thread_cond_broadcast(me->cond);
    apr_thread_mutex_unlock(me->lock);

    n_dbg = 0;
    if (NULL != (head = elt)) {
        while (elt) {
            tail = elt;
            apr_thread_join(&rv, elt->thd);
            elt = APR_RING_NEXT(elt, link);
            ++n_dbg;
        }
        apr_thread_mutex_lock(me->lock);
        APR_RING_SPLICE_TAIL(me->recycled_thds, head, tail,
                             apr_thread_list_elt, link);
        apr_thread_mutex_unlock(me->lock);
    }
    assert(cnt == n_dbg);

    return cnt;
}

/* don't join busy threads, for performance reasons: there is no telling
 * how long the task will take to perform
 */
static apr_size_t trim_busy_threads(apr_thread_pool_t *me, apr_size_t cnt)
{
    trim_threads(me, &cnt, 0);
    return cnt;
}

APU_DECLARE(apr_size_t) apr_thread_pool_idle_max_set(apr_thread_pool_t *me,
                                                     apr_size_t cnt)
{
    me->idle_max = cnt;
    cnt = trim_idle_threads(me, cnt);
    return cnt;
}

APU_DECLARE(apr_interval_time_t)
apr_thread_pool_idle_wait_set(apr_thread_pool_t * me,
                              apr_interval_time_t timeout)
{
    apr_interval_time_t oldtime;

    oldtime = me->idle_wait;
    me->idle_wait = timeout;

    return oldtime;
}

APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_get(apr_thread_pool_t *me)
{
    return me->thd_max;
}

/*
 * This function stops surplus working threads, down to the new limit.
 * NOTE: Busy threads can become idle while this function runs
 */
APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_set(apr_thread_pool_t *me,
                                                       apr_size_t cnt)
{
    unsigned int n;

    me->thd_max = cnt;
    if (0 == cnt || me->thd_cnt <= cnt) {
        return 0;
    }

    n = me->thd_cnt - cnt;
    if (n >= me->idle_cnt) {
        trim_busy_threads(me, n - me->idle_cnt);
        trim_idle_threads(me, 0);
    }
    else {
        trim_idle_threads(me, me->idle_cnt - n);
    }
    return n;
}

APU_DECLARE(apr_size_t) apr_thread_pool_threshold_get(apr_thread_pool_t *me)
{
    return me->threshold;
}

APU_DECLARE(apr_size_t) apr_thread_pool_threshold_set(apr_thread_pool_t *me,
                                                      apr_size_t val)
{
    apr_size_t ov;

    ov = me->threshold;
    me->threshold = val;
    return ov;
}

APU_DECLARE(apr_status_t) apr_thread_pool_task_owner_get(apr_thread_t *thd,
                                                         void **owner)
{
    apr_status_t rv;
    apr_thread_pool_task_t *task;
    void *data;

    rv = apr_thread_data_get(&data, "apr_thread_pool_task", thd);
    if (rv != APR_SUCCESS) {
        return rv;
    }

    task = data;
    if (!task) {
        *owner = NULL;
        return APR_BADARG;
    }

    *owner = task->owner;
    return APR_SUCCESS;
}

#endif /* APR_HAS_THREADS */

/* vim: set ts=4 sw=4 et cin tw=80: */