Prevent live-lock and access of destroyed data in taskqueue_drain_all().
Phabric: https://reviews.freebsd.org/D1247 Reviewed by: jhb, avg Sponsored by: Spectra Logic Corporation sys/kern_subr_taskqueue.c: Modify taskqueue_drain_all() processing to use a temporary "barrier task", rather than rely on a user task that may be destroyed during taskqueue_drain_all()'s execution. The barrier task is queued behind all previously queued tasks and then has its priority elevated so that future tasks cannot pass it in the queue. Use a similar barrier scheme to drain threads processing current tasks. This requires taskqueue_run_locked() to insert and remove the taskqueue_busy object for the running thread for every task processed. share/man/man9/taskqueue.9: Remove warning about live-lock issues with taskqueue_drain_all() and indicate that it does not wait for tasks queued after it begins processing.
This commit is contained in:
parent
aad37c8145
commit
5b326a32ce
@ -28,7 +28,7 @@
|
||||
.\"
|
||||
.\" $FreeBSD$
|
||||
.\"
|
||||
.Dd May 24, 2014
|
||||
.Dd January 4, 2015
|
||||
.Dt TASKQUEUE 9
|
||||
.Os
|
||||
.Sh NAME
|
||||
@ -285,10 +285,16 @@ The
|
||||
.Fn taskqueue_drain_all
|
||||
function is used to wait for all pending and running tasks that
|
||||
are enqueued on the taskqueue to finish.
|
||||
The caller must arrange that the tasks are not re-enqueued.
|
||||
Note that
|
||||
Tasks posted to the taskqueue after
|
||||
.Fn taskqueue_drain_all
|
||||
currently does not handle tasks with delayed enqueueing.
|
||||
begins processing,
|
||||
including pending enqueues scheduled by a previous call to
|
||||
.Fn taskqueue_enqueue_timeout ,
|
||||
do not extend the wait time of
|
||||
.Fn taskqueue_drain_all
|
||||
and may complete after
|
||||
.Fn taskqueue_drain_all
|
||||
returns.
|
||||
.Pp
|
||||
The
|
||||
.Fn taskqueue_block
|
||||
|
@ -56,6 +56,8 @@ struct taskqueue_busy {
|
||||
TAILQ_ENTRY(taskqueue_busy) tb_link;
|
||||
};
|
||||
|
||||
struct task * const TB_DRAIN_WAITER = (struct task *)0x1;
|
||||
|
||||
struct taskqueue {
|
||||
STAILQ_HEAD(, task) tq_queue;
|
||||
taskqueue_enqueue_fn tq_enqueue;
|
||||
@ -241,6 +243,7 @@ taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
|
||||
/* Return with lock released. */
|
||||
return (0);
|
||||
}
|
||||
|
||||
int
|
||||
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
|
||||
{
|
||||
@ -302,12 +305,82 @@ taskqueue_enqueue_timeout(struct taskqueue *queue,
|
||||
}
|
||||
|
||||
static void
|
||||
taskqueue_drain_running(struct taskqueue *queue)
|
||||
taskqueue_task_nop_fn(void *context, int pending)
|
||||
{
|
||||
}
|
||||
|
||||
while (!TAILQ_EMPTY(&queue->tq_active))
|
||||
TQ_SLEEP(queue, &queue->tq_active, &queue->tq_mutex,
|
||||
PWAIT, "-", 0);
|
||||
/*
|
||||
* Block until all currently queued tasks in this taskqueue
|
||||
* have begun execution. Tasks queued during execution of
|
||||
* this function are ignored.
|
||||
*/
|
||||
static void
|
||||
taskqueue_drain_tq_queue(struct taskqueue *queue)
|
||||
{
|
||||
struct task t_barrier;
|
||||
|
||||
if (STAILQ_EMPTY(&queue->tq_queue))
|
||||
return;
|
||||
|
||||
/*
|
||||
* Enqueue our barrier with the lowest possible priority
|
||||
* so we are inserted after all current tasks.
|
||||
*/
|
||||
TASK_INIT(&t_barrier, 0, taskqueue_task_nop_fn, &t_barrier);
|
||||
taskqueue_enqueue_locked(queue, &t_barrier);
|
||||
|
||||
/*
|
||||
* Raise the barrier's priority so newly queued tasks cannot
|
||||
* pass it.
|
||||
*/
|
||||
t_barrier.ta_priority = USHRT_MAX;
|
||||
|
||||
/*
|
||||
* Once the barrier has executed, all previously queued tasks
|
||||
* have completed or are currently executing.
|
||||
*/
|
||||
while (t_barrier.ta_pending != 0)
|
||||
TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0);
|
||||
}
|
||||
|
||||
/*
|
||||
* Block until all currently executing tasks for this taskqueue
|
||||
* complete. Tasks that begin execution during the execution
|
||||
* of this function are ignored.
|
||||
*/
|
||||
static void
|
||||
taskqueue_drain_tq_active(struct taskqueue *queue)
|
||||
{
|
||||
struct taskqueue_busy tb_marker, *tb_first;
|
||||
|
||||
if (TAILQ_EMPTY(&queue->tq_active))
|
||||
return;
|
||||
|
||||
/* Block taskq_terminate().*/
|
||||
queue->tq_callouts++;
|
||||
|
||||
/*
|
||||
* Wait for all currently executing taskqueue threads
|
||||
* to go idle.
|
||||
*/
|
||||
tb_marker.tb_running = TB_DRAIN_WAITER;
|
||||
TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link);
|
||||
while (TAILQ_FIRST(&queue->tq_active) != &tb_marker)
|
||||
TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0);
|
||||
TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link);
|
||||
|
||||
/*
|
||||
* Wakeup any other drain waiter that happened to queue up
|
||||
* without any intervening active thread.
|
||||
*/
|
||||
tb_first = TAILQ_FIRST(&queue->tq_active);
|
||||
if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER)
|
||||
wakeup(tb_first);
|
||||
|
||||
/* Release taskqueue_terminate(). */
|
||||
queue->tq_callouts--;
|
||||
if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
|
||||
wakeup_one(queue->tq_threads);
|
||||
}
|
||||
|
||||
void
|
||||
@ -334,14 +407,16 @@ static void
|
||||
taskqueue_run_locked(struct taskqueue *queue)
|
||||
{
|
||||
struct taskqueue_busy tb;
|
||||
struct taskqueue_busy *tb_first;
|
||||
struct task *task;
|
||||
int pending;
|
||||
|
||||
TQ_ASSERT_LOCKED(queue);
|
||||
tb.tb_running = NULL;
|
||||
TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);
|
||||
|
||||
while (STAILQ_FIRST(&queue->tq_queue)) {
|
||||
TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);
|
||||
|
||||
/*
|
||||
* Carefully remove the first task from the queue and
|
||||
* zero its pending count.
|
||||
@ -358,10 +433,13 @@ taskqueue_run_locked(struct taskqueue *queue)
|
||||
TQ_LOCK(queue);
|
||||
tb.tb_running = NULL;
|
||||
wakeup(task);
|
||||
|
||||
TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
|
||||
tb_first = TAILQ_FIRST(&queue->tq_active);
|
||||
if (tb_first != NULL &&
|
||||
tb_first->tb_running == TB_DRAIN_WAITER)
|
||||
wakeup(tb_first);
|
||||
}
|
||||
TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
|
||||
if (TAILQ_EMPTY(&queue->tq_active))
|
||||
wakeup(&queue->tq_active);
|
||||
}
|
||||
|
||||
void
|
||||
@ -448,19 +526,13 @@ taskqueue_drain(struct taskqueue *queue, struct task *task)
|
||||
void
|
||||
taskqueue_drain_all(struct taskqueue *queue)
|
||||
{
|
||||
struct task *task;
|
||||
|
||||
if (!queue->tq_spin)
|
||||
WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
|
||||
|
||||
TQ_LOCK(queue);
|
||||
task = STAILQ_LAST(&queue->tq_queue, task, ta_link);
|
||||
if (task != NULL)
|
||||
while (task->ta_pending != 0)
|
||||
TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0);
|
||||
taskqueue_drain_running(queue);
|
||||
KASSERT(STAILQ_EMPTY(&queue->tq_queue),
|
||||
("taskqueue queue is not empty after draining"));
|
||||
taskqueue_drain_tq_queue(queue);
|
||||
taskqueue_drain_tq_active(queue);
|
||||
TQ_UNLOCK(queue);
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user