Extend taskqueue(9) to enable per-taskqueue callbacks.

The scope of these callbacks is primarily to support actions that affect the
taskqueue's thread environments.  They are entirely optional, and
consequently are introduced as a new API: taskqueue_set_callback().

This interface allows the caller to specify that a taskqueue requires a
callback and optional context pointer for a given callback type.

The callback types included in this commit can be used to register a
constructor and destructor for thread-local storage using osd(9).  This
allows a particular taskqueue to define that its threads require a specific
type of TLS, without the need for a specially-orchestrated task-based
mechanism for startup and shutdown in order to accomplish it.

Two callback types are supported at this point:

- TASKQUEUE_CALLBACK_TYPE_INIT, called by every thread when it starts, prior
  to processing any tasks.
- TASKQUEUE_CALLBACK_TYPE_SHUTDOWN, called by every thread when it exits,
  after it has processed its last task but before the taskqueue is
  reclaimed.

While I'm here:

- Add two new macros, TQ_ASSERT_LOCKED and TQ_ASSERT_UNLOCKED, and use them
  in appropriate locations.
- Fix taskqueue.9 to mention taskqueue_start_threads(), which is a required
  interface for all consumers of taskqueue(9).

Reviewed by:	kib (all), eadler (taskqueue.9), brd (taskqueue.9)
Approved by:	ken (mentor)
Sponsored by:	Spectra Logic
MFC after:	1 month
This commit is contained in:
will 2013-03-23 15:11:53 +00:00
parent a4c39d4efd
commit 5d3a27c743
3 changed files with 87 additions and 3 deletions

View File

@ -53,12 +53,23 @@ struct task {
void *ta_context; /* argument for handler */
};
enum taskqueue_callback_type {
TASKQUEUE_CALLBACK_TYPE_INIT,
TASKQUEUE_CALLBACK_TYPE_SHUTDOWN,
};
typedef void (*taskqueue_callback_fn)(void *context);
struct timeout_task;
.Ed
.Ft struct taskqueue *
.Fn taskqueue_create "const char *name" "int mflags" "taskqueue_enqueue_fn enqueue" "void *context"
.Ft struct taskqueue *
.Fn taskqueue_create_fast "const char *name" "int mflags" "taskqueue_enqueue_fn enqueue" "void *context"
.Ft int
.Fn taskqueue_start_threads "struct taskqueue **tqp" "int count" "int pri" "const char *name" "..."
.Ft void
.Fn taskqueue_set_callback "struct taskqueue *queue" "enum taskqueue_callback_type cb_type" "taskqueue_callback_fn callback" "void *context"
.Ft void
.Fn taskqueue_free "struct taskqueue *queue"
.Ft int
@ -127,6 +138,23 @@ should be used to free the memory used by the queue.
Any tasks that are on the queue will be executed at this time after
which the thread servicing the queue will be signaled that it should exit.
.Pp
Once a taskqueue has been created, its threads should be started using
.Fn taskqueue_start_threads .
Callbacks may optionally be registered using
.Fn taskqueue_set_callback .
Currently, callbacks may be registered for the following purposes:
.Bl -tag -width TASKQUEUE_CALLBACK_TYPE_SHUTDOWN
.It Dv TASKQUEUE_CALLBACK_TYPE_INIT
This callback is called by every thread in the taskqueue, before it executes
any tasks.
This callback must be set before the taskqueue's threads are started.
.It Dv TASKQUEUE_CALLBACK_TYPE_SHUTDOWN
This callback is called by every thread in the taskqueue, after it executes
its last task.
This callback will always be called before the taskqueue structure is
reclaimed.
.El
.Pp
To add a task to the list of tasks queued on a taskqueue, call
.Fn taskqueue_enqueue
with pointers to the queue and task.

View File

@ -63,6 +63,8 @@ struct taskqueue {
int tq_spin;
int tq_flags;
int tq_callouts;
taskqueue_callback_fn tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
void *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
};
#define TQ_FLAGS_ACTIVE (1 << 0)
@ -78,6 +80,7 @@ struct taskqueue {
else \
mtx_lock(&(tq)->tq_mutex); \
} while (0)
#define TQ_ASSERT_LOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_OWNED)
#define TQ_UNLOCK(tq) \
do { \
@ -86,6 +89,7 @@ struct taskqueue {
else \
mtx_unlock(&(tq)->tq_mutex); \
} while (0)
#define TQ_ASSERT_UNLOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)
void
_timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
@ -137,6 +141,23 @@ taskqueue_create(const char *name, int mflags,
MTX_DEF, "taskqueue");
}
void
taskqueue_set_callback(struct taskqueue *queue,
enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback,
void *context)
{
KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) &&
(cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)),
("Callback type %d not valid, must be %d-%d", cb_type,
TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX));
KASSERT((queue->tq_callbacks[cb_type] == NULL),
("Re-initialization of taskqueue callback?"));
queue->tq_callbacks[cb_type] = callback;
queue->tq_cb_contexts[cb_type] = context;
}
/*
* Signal a taskqueue thread to terminate.
*/
@ -293,7 +314,7 @@ taskqueue_run_locked(struct taskqueue *queue)
struct task *task;
int pending;
mtx_assert(&queue->tq_mutex, MA_OWNED);
TQ_ASSERT_LOCKED(queue);
tb.tb_running = NULL;
TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);
@ -332,7 +353,7 @@ task_is_running(struct taskqueue *queue, struct task *task)
{
struct taskqueue_busy *tb;
mtx_assert(&queue->tq_mutex, MA_OWNED);
TQ_ASSERT_LOCKED(queue);
TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
if (tb->tb_running == task)
return (1);
@ -489,6 +510,18 @@ taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
return (0);
}
static inline void
taskqueue_run_callback(struct taskqueue *tq,
enum taskqueue_callback_type cb_type)
{
taskqueue_callback_fn tq_callback;
TQ_ASSERT_UNLOCKED(tq);
tq_callback = tq->tq_callbacks[cb_type];
if (tq_callback != NULL)
tq_callback(tq->tq_cb_contexts[cb_type]);
}
void
taskqueue_thread_loop(void *arg)
{
@ -496,6 +529,7 @@ taskqueue_thread_loop(void *arg)
tqp = arg;
tq = *tqp;
taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
TQ_LOCK(tq);
while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
taskqueue_run_locked(tq);
@ -510,6 +544,15 @@ taskqueue_thread_loop(void *arg)
}
taskqueue_run_locked(tq);
/*
* This thread is on its way out, so just drop the lock temporarily
* in order to call the shutdown callback. This allows the callback
* to look at the taskqueue, even just before it dies.
*/
TQ_UNLOCK(tq);
taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
TQ_LOCK(tq);
/* rendezvous with thread that asked us to terminate */
tq->tq_tcount--;
wakeup_one(tq->tq_threads);
@ -525,7 +568,7 @@ taskqueue_thread_enqueue(void *context)
tqp = context;
tq = *tqp;
mtx_assert(&tq->tq_mutex, MA_OWNED);
TQ_ASSERT_LOCKED(tq);
wakeup_one(tq);
}

View File

@ -47,6 +47,16 @@ struct timeout_task {
int f;
};
enum taskqueue_callback_type {
TASKQUEUE_CALLBACK_TYPE_INIT,
TASKQUEUE_CALLBACK_TYPE_SHUTDOWN,
};
#define TASKQUEUE_CALLBACK_TYPE_MIN TASKQUEUE_CALLBACK_TYPE_INIT
#define TASKQUEUE_CALLBACK_TYPE_MAX TASKQUEUE_CALLBACK_TYPE_SHUTDOWN
#define TASKQUEUE_NUM_CALLBACKS TASKQUEUE_CALLBACK_TYPE_MAX + 1
typedef void (*taskqueue_callback_fn)(void *context);
/*
* A notification callback function which is called from
* taskqueue_enqueue(). The context argument is given in the call to
@ -76,6 +86,9 @@ void taskqueue_run(struct taskqueue *queue);
void taskqueue_block(struct taskqueue *queue);
void taskqueue_unblock(struct taskqueue *queue);
int taskqueue_member(struct taskqueue *queue, struct thread *td);
void taskqueue_set_callback(struct taskqueue *queue,
enum taskqueue_callback_type cb_type,
taskqueue_callback_fn callback, void *context);
#define TASK_INITIALIZER(priority, func, context) \
{ .ta_pending = 0, \