Implement the delayed task execution extension to the taskqueue
mechanism. The caller may specify a timeout in ticks after which the task will be scheduled. Sponsored by: The FreeBSD Foundation Reviewed by: jeff, jhb MFC after: 1 month
This commit is contained in:
parent
5bd186a65a
commit
b2ad91f26b
@ -61,12 +61,15 @@ struct taskqueue {
|
|||||||
int tq_tcount;
|
int tq_tcount;
|
||||||
int tq_spin;
|
int tq_spin;
|
||||||
int tq_flags;
|
int tq_flags;
|
||||||
|
int tq_callouts;
|
||||||
};
|
};
|
||||||
|
|
||||||
#define TQ_FLAGS_ACTIVE (1 << 0)
|
#define TQ_FLAGS_ACTIVE (1 << 0)
|
||||||
#define TQ_FLAGS_BLOCKED (1 << 1)
|
#define TQ_FLAGS_BLOCKED (1 << 1)
|
||||||
#define TQ_FLAGS_PENDING (1 << 2)
|
#define TQ_FLAGS_PENDING (1 << 2)
|
||||||
|
|
||||||
|
#define DT_CALLOUT_ARMED (1 << 0)
|
||||||
|
|
||||||
#define TQ_LOCK(tq) \
|
#define TQ_LOCK(tq) \
|
||||||
do { \
|
do { \
|
||||||
if ((tq)->tq_spin) \
|
if ((tq)->tq_spin) \
|
||||||
@ -83,6 +86,17 @@ struct taskqueue {
|
|||||||
mtx_unlock(&(tq)->tq_mutex); \
|
mtx_unlock(&(tq)->tq_mutex); \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
void
|
||||||
|
_timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
|
||||||
|
int priority, task_fn_t func, void *context)
|
||||||
|
{
|
||||||
|
|
||||||
|
TASK_INIT(&timeout_task->t, priority, func, context);
|
||||||
|
callout_init_mtx(&timeout_task->c, &queue->tq_mutex, 0);
|
||||||
|
timeout_task->q = queue;
|
||||||
|
timeout_task->f = 0;
|
||||||
|
}
|
||||||
|
|
||||||
static __inline int
|
static __inline int
|
||||||
TQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
|
TQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
|
||||||
int t)
|
int t)
|
||||||
@ -129,7 +143,7 @@ static void
|
|||||||
taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
|
taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
|
||||||
{
|
{
|
||||||
|
|
||||||
while (tq->tq_tcount > 0) {
|
while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
|
||||||
wakeup(tq);
|
wakeup(tq);
|
||||||
TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
|
TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
|
||||||
}
|
}
|
||||||
@ -143,26 +157,24 @@ taskqueue_free(struct taskqueue *queue)
|
|||||||
queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
|
queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
|
||||||
taskqueue_terminate(queue->tq_threads, queue);
|
taskqueue_terminate(queue->tq_threads, queue);
|
||||||
KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
|
KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
|
||||||
|
KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
|
||||||
mtx_destroy(&queue->tq_mutex);
|
mtx_destroy(&queue->tq_mutex);
|
||||||
free(queue->tq_threads, M_TASKQUEUE);
|
free(queue->tq_threads, M_TASKQUEUE);
|
||||||
free(queue, M_TASKQUEUE);
|
free(queue, M_TASKQUEUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
int
|
static int
|
||||||
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
|
taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
|
||||||
{
|
{
|
||||||
struct task *ins;
|
struct task *ins;
|
||||||
struct task *prev;
|
struct task *prev;
|
||||||
|
|
||||||
TQ_LOCK(queue);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Count multiple enqueues.
|
* Count multiple enqueues.
|
||||||
*/
|
*/
|
||||||
if (task->ta_pending) {
|
if (task->ta_pending) {
|
||||||
task->ta_pending++;
|
task->ta_pending++;
|
||||||
TQ_UNLOCK(queue);
|
return (0);
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -190,9 +202,60 @@ taskqueue_enqueue(struct taskqueue *queue, struct task *task)
|
|||||||
else
|
else
|
||||||
queue->tq_flags |= TQ_FLAGS_PENDING;
|
queue->tq_flags |= TQ_FLAGS_PENDING;
|
||||||
|
|
||||||
|
return (0);
|
||||||
|
}
|
||||||
|
int
|
||||||
|
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
|
||||||
|
{
|
||||||
|
int res;
|
||||||
|
|
||||||
|
TQ_LOCK(queue);
|
||||||
|
res = taskqueue_enqueue_locked(queue, task);
|
||||||
TQ_UNLOCK(queue);
|
TQ_UNLOCK(queue);
|
||||||
|
|
||||||
return 0;
|
return (res);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
taskqueue_timeout_func(void *arg)
|
||||||
|
{
|
||||||
|
struct taskqueue *queue;
|
||||||
|
struct timeout_task *timeout_task;
|
||||||
|
|
||||||
|
timeout_task = arg;
|
||||||
|
queue = timeout_task->q;
|
||||||
|
KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
|
||||||
|
timeout_task->f &= ~DT_CALLOUT_ARMED;
|
||||||
|
queue->tq_callouts--;
|
||||||
|
taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
|
||||||
|
}
|
||||||
|
|
||||||
|
int
|
||||||
|
taskqueue_enqueue_timeout(struct taskqueue *queue,
|
||||||
|
struct timeout_task *timeout_task, int ticks)
|
||||||
|
{
|
||||||
|
int res;
|
||||||
|
|
||||||
|
TQ_LOCK(queue);
|
||||||
|
KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
|
||||||
|
("Migrated queue"));
|
||||||
|
KASSERT(!queue->tq_spin, ("Timeout for spin-queue"));
|
||||||
|
timeout_task->q = queue;
|
||||||
|
res = timeout_task->t.ta_pending;
|
||||||
|
if (ticks == 0) {
|
||||||
|
taskqueue_enqueue_locked(queue, &timeout_task->t);
|
||||||
|
} else {
|
||||||
|
if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
|
||||||
|
res++;
|
||||||
|
} else {
|
||||||
|
queue->tq_callouts++;
|
||||||
|
timeout_task->f |= DT_CALLOUT_ARMED;
|
||||||
|
}
|
||||||
|
callout_reset(&timeout_task->c, ticks, taskqueue_timeout_func,
|
||||||
|
timeout_task);
|
||||||
|
}
|
||||||
|
TQ_UNLOCK(queue);
|
||||||
|
return (res);
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
@ -271,6 +334,19 @@ task_is_running(struct taskqueue *queue, struct task *task)
|
|||||||
return (0);
|
return (0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
|
||||||
|
u_int *pendp)
|
||||||
|
{
|
||||||
|
|
||||||
|
if (task->ta_pending > 0)
|
||||||
|
STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
|
||||||
|
if (pendp != NULL)
|
||||||
|
*pendp = task->ta_pending;
|
||||||
|
task->ta_pending = 0;
|
||||||
|
return (task_is_running(queue, task) ? EBUSY : 0);
|
||||||
|
}
|
||||||
|
|
||||||
int
|
int
|
||||||
taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
|
taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
|
||||||
{
|
{
|
||||||
@ -278,14 +354,31 @@ taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
|
|||||||
int error;
|
int error;
|
||||||
|
|
||||||
TQ_LOCK(queue);
|
TQ_LOCK(queue);
|
||||||
if ((pending = task->ta_pending) > 0)
|
pending = task->ta_pending;
|
||||||
STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
|
error = taskqueue_cancel_locked(queue, task, pendp);
|
||||||
task->ta_pending = 0;
|
TQ_UNLOCK(queue);
|
||||||
error = task_is_running(queue, task) ? EBUSY : 0;
|
|
||||||
|
return (error);
|
||||||
|
}
|
||||||
|
|
||||||
|
int
|
||||||
|
taskqueue_cancel_timeout(struct taskqueue *queue,
|
||||||
|
struct timeout_task *timeout_task, u_int *pendp)
|
||||||
|
{
|
||||||
|
u_int pending, pending1;
|
||||||
|
int error;
|
||||||
|
|
||||||
|
TQ_LOCK(queue);
|
||||||
|
pending = !!callout_stop(&timeout_task->c);
|
||||||
|
error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
|
||||||
|
if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
|
||||||
|
timeout_task->f &= ~DT_CALLOUT_ARMED;
|
||||||
|
queue->tq_callouts--;
|
||||||
|
}
|
||||||
TQ_UNLOCK(queue);
|
TQ_UNLOCK(queue);
|
||||||
|
|
||||||
if (pendp != NULL)
|
if (pendp != NULL)
|
||||||
*pendp = pending;
|
*pendp = pending + pending1;
|
||||||
return (error);
|
return (error);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -302,6 +395,15 @@ taskqueue_drain(struct taskqueue *queue, struct task *task)
|
|||||||
TQ_UNLOCK(queue);
|
TQ_UNLOCK(queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
taskqueue_drain_timeout(struct taskqueue *queue,
|
||||||
|
struct timeout_task *timeout_task)
|
||||||
|
{
|
||||||
|
|
||||||
|
callout_drain(&timeout_task->c);
|
||||||
|
taskqueue_drain(queue, &timeout_task->t);
|
||||||
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
taskqueue_swi_enqueue(void *context)
|
taskqueue_swi_enqueue(void *context)
|
||||||
{
|
{
|
||||||
|
61
sys/sys/_callout.h
Normal file
61
sys/sys/_callout.h
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
/*-
|
||||||
|
* Copyright (c) 1990, 1993
|
||||||
|
* The Regents of the University of California. All rights reserved.
|
||||||
|
* (c) UNIX System Laboratories, Inc.
|
||||||
|
* All or some portions of this file are derived from material licensed
|
||||||
|
* to the University of California by American Telephone and Telegraph
|
||||||
|
* Co. or Unix System Laboratories, Inc. and are reproduced herein with
|
||||||
|
* the permission of UNIX System Laboratories, Inc.
|
||||||
|
*
|
||||||
|
* Redistribution and use in source and binary forms, with or without
|
||||||
|
* modification, are permitted provided that the following conditions
|
||||||
|
* are met:
|
||||||
|
* 1. Redistributions of source code must retain the above copyright
|
||||||
|
* notice, this list of conditions and the following disclaimer.
|
||||||
|
* 2. Redistributions in binary form must reproduce the above copyright
|
||||||
|
* notice, this list of conditions and the following disclaimer in the
|
||||||
|
* documentation and/or other materials provided with the distribution.
|
||||||
|
* 4. Neither the name of the University nor the names of its contributors
|
||||||
|
* may be used to endorse or promote products derived from this software
|
||||||
|
* without specific prior written permission.
|
||||||
|
*
|
||||||
|
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
|
||||||
|
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||||
|
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
||||||
|
* ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
|
||||||
|
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||||
|
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
|
||||||
|
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
|
||||||
|
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
||||||
|
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
|
||||||
|
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
|
||||||
|
* SUCH DAMAGE.
|
||||||
|
*
|
||||||
|
* @(#)callout.h 8.2 (Berkeley) 1/21/94
|
||||||
|
* $FreeBSD$
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _SYS__CALLOUT_H
|
||||||
|
#define _SYS__CALLOUT_H
|
||||||
|
|
||||||
|
#include <sys/queue.h>
|
||||||
|
|
||||||
|
struct lock_object;
|
||||||
|
|
||||||
|
SLIST_HEAD(callout_list, callout);
|
||||||
|
TAILQ_HEAD(callout_tailq, callout);
|
||||||
|
|
||||||
|
struct callout {
|
||||||
|
union {
|
||||||
|
SLIST_ENTRY(callout) sle;
|
||||||
|
TAILQ_ENTRY(callout) tqe;
|
||||||
|
} c_links;
|
||||||
|
int c_time; /* ticks to the event */
|
||||||
|
void *c_arg; /* function argument */
|
||||||
|
void (*c_func)(void *); /* function to call */
|
||||||
|
struct lock_object *c_lock; /* lock to handle */
|
||||||
|
int c_flags; /* state of this entry */
|
||||||
|
volatile int c_cpu; /* CPU we're scheduled on */
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif
|
@ -38,25 +38,7 @@
|
|||||||
#ifndef _SYS_CALLOUT_H_
|
#ifndef _SYS_CALLOUT_H_
|
||||||
#define _SYS_CALLOUT_H_
|
#define _SYS_CALLOUT_H_
|
||||||
|
|
||||||
#include <sys/queue.h>
|
#include <sys/_callout.h>
|
||||||
|
|
||||||
struct lock_object;
|
|
||||||
|
|
||||||
SLIST_HEAD(callout_list, callout);
|
|
||||||
TAILQ_HEAD(callout_tailq, callout);
|
|
||||||
|
|
||||||
struct callout {
|
|
||||||
union {
|
|
||||||
SLIST_ENTRY(callout) sle;
|
|
||||||
TAILQ_ENTRY(callout) tqe;
|
|
||||||
} c_links;
|
|
||||||
int c_time; /* ticks to the event */
|
|
||||||
void *c_arg; /* function argument */
|
|
||||||
void (*c_func)(void *); /* function to call */
|
|
||||||
struct lock_object *c_lock; /* lock to handle */
|
|
||||||
int c_flags; /* state of this entry */
|
|
||||||
volatile int c_cpu; /* CPU we're scheduled on */
|
|
||||||
};
|
|
||||||
|
|
||||||
#define CALLOUT_LOCAL_ALLOC 0x0001 /* was allocated from callfree */
|
#define CALLOUT_LOCAL_ALLOC 0x0001 /* was allocated from callfree */
|
||||||
#define CALLOUT_ACTIVE 0x0002 /* callout is currently active */
|
#define CALLOUT_ACTIVE 0x0002 /* callout is currently active */
|
||||||
|
@ -35,10 +35,18 @@
|
|||||||
|
|
||||||
#include <sys/queue.h>
|
#include <sys/queue.h>
|
||||||
#include <sys/_task.h>
|
#include <sys/_task.h>
|
||||||
|
#include <sys/_callout.h>
|
||||||
|
|
||||||
struct taskqueue;
|
struct taskqueue;
|
||||||
struct thread;
|
struct thread;
|
||||||
|
|
||||||
|
struct timeout_task {
|
||||||
|
struct taskqueue *q;
|
||||||
|
struct task t;
|
||||||
|
struct callout c;
|
||||||
|
int f;
|
||||||
|
};
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* A notification callback function which is called from
|
* A notification callback function which is called from
|
||||||
* taskqueue_enqueue(). The context argument is given in the call to
|
* taskqueue_enqueue(). The context argument is given in the call to
|
||||||
@ -54,9 +62,15 @@ struct taskqueue *taskqueue_create(const char *name, int mflags,
|
|||||||
int taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
|
int taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
|
||||||
const char *name, ...) __printflike(4, 5);
|
const char *name, ...) __printflike(4, 5);
|
||||||
int taskqueue_enqueue(struct taskqueue *queue, struct task *task);
|
int taskqueue_enqueue(struct taskqueue *queue, struct task *task);
|
||||||
|
int taskqueue_enqueue_timeout(struct taskqueue *queue,
|
||||||
|
struct timeout_task *timeout_task, int ticks);
|
||||||
int taskqueue_cancel(struct taskqueue *queue, struct task *task,
|
int taskqueue_cancel(struct taskqueue *queue, struct task *task,
|
||||||
u_int *pendp);
|
u_int *pendp);
|
||||||
|
int taskqueue_cancel_timeout(struct taskqueue *queue,
|
||||||
|
struct timeout_task *timeout_task, u_int *pendp);
|
||||||
void taskqueue_drain(struct taskqueue *queue, struct task *task);
|
void taskqueue_drain(struct taskqueue *queue, struct task *task);
|
||||||
|
void taskqueue_drain_timeout(struct taskqueue *queue,
|
||||||
|
struct timeout_task *timeout_task);
|
||||||
void taskqueue_free(struct taskqueue *queue);
|
void taskqueue_free(struct taskqueue *queue);
|
||||||
void taskqueue_run(struct taskqueue *queue);
|
void taskqueue_run(struct taskqueue *queue);
|
||||||
void taskqueue_block(struct taskqueue *queue);
|
void taskqueue_block(struct taskqueue *queue);
|
||||||
@ -79,6 +93,12 @@ void taskqueue_thread_enqueue(void *context);
|
|||||||
(task)->ta_context = (context); \
|
(task)->ta_context = (context); \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
void _timeout_task_init(struct taskqueue *queue,
|
||||||
|
struct timeout_task *timeout_task, int priority, task_fn_t func,
|
||||||
|
void *context);
|
||||||
|
#define TIMEOUT_TASK_INIT(queue, timeout_task, priority, func, context) \
|
||||||
|
_timeout_task_init(queue, timeout_task, priority, func, context);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Declare a reference to a taskqueue.
|
* Declare a reference to a taskqueue.
|
||||||
*/
|
*/
|
||||||
|
Loading…
x
Reference in New Issue
Block a user