LinuxKPI: Implement kthread_worker related functions

Kthread worker is a single thread workqueue which can be used in cases
where specific kthread association is necessary, for example, when it
should have RT priority or be assigned to certain cgroup.

This change implements the Linux v4.9 interface, which mostly hides
kthread internals from users, thus allowing use of the ordinary
taskqueue(9) KPI. As the kthread worker prohibits enqueueing of already
pending or canceling tasks, some minimal changes to taskqueue(9) were done.
taskqueue_enqueue_flags() was added to taskqueue KPI which accepts extra
flags parameter. It contains one or more of the following flags:

TASKQUEUE_FAIL_IF_PENDING - taskqueue_enqueue_flags() fails if the task
    is already scheduled to execution. EEXIST is returned and the
    ta_pending counter value remains unchanged.
TASKQUEUE_FAIL_IF_CANCELING - taskqueue_enqueue_flags() fails if the
    task is in the canceling state and ECANCELED is returned.

Required by:	drm-kmod 5.10

MFC after:	1 week
Reviewed by:	hselasky, Pau Amma (docs)
Differential Revision:	https://reviews.freebsd.org/D35051
This commit is contained in:
Vladimir Kondratyev 2022-05-17 15:10:20 +03:00
parent 0093bc3cd1
commit b6f87b78b5
5 changed files with 196 additions and 23 deletions

View File

@ -28,7 +28,7 @@
.\"
.\" $FreeBSD$
.\"
.Dd September 1, 2021
.Dd April 25, 2022
.Dt TASKQUEUE 9
.Os
.Sh NAME
@ -85,6 +85,8 @@ struct timeout_task;
.Ft int
.Fn taskqueue_enqueue "struct taskqueue *queue" "struct task *task"
.Ft int
.Fn taskqueue_enqueue_flags "struct taskqueue *queue" "struct task *task" "int flags"
.Ft int
.Fn taskqueue_enqueue_timeout "struct taskqueue *queue" "struct timeout_task *timeout_task" "int ticks"
.Ft int
.Fn taskqueue_enqueue_timeout_sbt "struct taskqueue *queue" "struct timeout_task *timeout_task" "sbintime_t sbt" "sbintime_t pr" "int flags"
@ -225,6 +227,28 @@ is called on the task pointer passed to
.Fn taskqueue_enqueue .
.Pp
The
.Fn taskqueue_enqueue_flags
function accepts an extra
.Va flags
parameter which specifies a set of optional flags to alter the behavior of
.Fn taskqueue_enqueue .
It contains one or more of the following flags:
.Bl -tag -width TASKQUEUE_FAIL_IF_CANCELING
.It Dv TASKQUEUE_FAIL_IF_PENDING
.Fn taskqueue_enqueue_flags
fails if the task is already scheduled for execution.
.Er EEXIST
is returned and the
.Va ta_pending
counter value remains unchanged.
.It Dv TASKQUEUE_FAIL_IF_CANCELING
.Fn taskqueue_enqueue_flags
fails if the task is in the canceling state and
.Er ECANCELED
is returned.
.El
.Pp
The
.Fn taskqueue_enqueue_timeout
function is used to schedule the enqueue after the specified number of
.Va ticks .

View File

@ -33,8 +33,29 @@
#include <linux/sched.h>
#include <sys/unistd.h>
#include <sys/param.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/malloc.h>
#include <sys/queue.h>
#include <sys/taskqueue.h>
#include <sys/unistd.h>
struct task_struct;
struct kthread_work;
typedef void (*kthread_work_func_t)(struct kthread_work *work);
struct kthread_worker {
struct task_struct *task;
struct taskqueue *tq;
};
struct kthread_work {
struct taskqueue *tq;
struct task task;
kthread_work_func_t func;
};
#define kthread_run(fn, data, fmt, ...) ({ \
struct task_struct *__task; \
@ -70,4 +91,78 @@ int linux_in_atomic(void);
#define in_atomic() linux_in_atomic()
/* Only kthread_(create|destroy)_worker interface is allowed */
#define kthread_init_worker(worker) \
_Static_assert(false, "pre-4.9 worker interface is not supported");
task_fn_t lkpi_kthread_work_fn;
task_fn_t lkpi_kthread_worker_init_fn;
/*
 * Create a kthread_worker: a dedicated single-threaded taskqueue(9) plus
 * the kernel thread that services it.  A temporary bootstrap task is
 * enqueued and immediately drained so that lkpi_kthread_worker_init_fn()
 * runs on the taskqueue thread and records that thread in __w->task.
 * NOTE(review): the "flags" argument is accepted for Linux API
 * compatibility but is not used by this implementation — confirm no
 * caller depends on it.
 */
#define kthread_create_worker(flags, fmt, ...) ({			\
	struct kthread_worker *__w;					\
	struct task __task;						\
									\
	__w = malloc(sizeof(*__w), M_KMALLOC, M_WAITOK | M_ZERO);	\
	__w->tq = taskqueue_create("lkpi kthread taskq", M_WAITOK,	\
	    taskqueue_thread_enqueue, &__w->tq);			\
	taskqueue_start_threads(&__w->tq, 1, PWAIT, fmt, ##__VA_ARGS__);\
	TASK_INIT(&__task, 0, lkpi_kthread_worker_init_fn, __w);	\
	taskqueue_enqueue(__w->tq, &__task);				\
	taskqueue_drain(__w->tq, &__task);				\
	__w;								\
})
/*
 * Tear down a worker created by kthread_create_worker(): wait for all
 * queued work to finish, then release the backing taskqueue and the
 * worker structure itself.
 */
static inline void
kthread_destroy_worker(struct kthread_worker *worker)
{
	struct taskqueue *tq;

	tq = worker->tq;
	/* Let every already-enqueued task run to completion first. */
	taskqueue_drain_all(tq);
	taskqueue_free(tq);
	free(worker, M_KMALLOC);
}
/*
 * Initialize a kthread_work item with its callback.  The work is not
 * bound to any worker until kthread_queue_work() succeeds, so tq starts
 * out NULL.
 */
static inline void
kthread_init_work(struct kthread_work *work, kthread_work_func_t func)
{
	TASK_INIT(&work->task, 0, lkpi_kthread_work_fn, work);
	work->func = func;
	work->tq = NULL;
}
/*
 * Queue a work item on the worker's taskqueue.  Mirrors Linux semantics:
 * the enqueue fails if the task is already pending or is being canceled.
 * On success the work remembers its queue for later cancel/flush calls.
 * Returns true iff the work was newly enqueued.
 */
static inline bool
kthread_queue_work(struct kthread_worker *worker, struct kthread_work *work)
{
	bool queued;

	queued = (taskqueue_enqueue_flags(worker->tq, &work->task,
	    TASKQUEUE_FAIL_IF_CANCELING | TASKQUEUE_FAIL_IF_PENDING) == 0);
	if (queued)
		work->tq = worker->tq;
	return (queued);
}
static inline bool
kthread_cancel_work_sync(struct kthread_work *work)
{
u_int pending = 0;
if (work->tq != NULL &&
taskqueue_cancel(work->tq, &work->task, &pending) != 0)
taskqueue_drain(work->tq, &work->task);
return (pending != 0);
}
/*
 * Wait until this work item is neither pending nor running.  A work that
 * was never queued (tq == NULL) needs no waiting.
 */
static inline void
kthread_flush_work(struct kthread_work *work)
{
	struct taskqueue *tq;

	tq = work->tq;
	if (tq == NULL)
		return;
	taskqueue_drain(tq, &work->task);
}
/*
 * Linux kthread_flush_worker(): wait for all tasks currently queued on
 * the worker's taskqueue to complete execution.
 */
static inline void
kthread_flush_worker(struct kthread_worker *worker)
{
	taskqueue_drain_all(worker->tq);
}
#endif /* _LINUXKPI_LINUX_KTHREAD_H_ */

View File

@ -165,3 +165,19 @@ linux_kthread_fn(void *arg __unused)
}
kthread_exit();
}
/*
 * taskqueue(9) trampoline for kthread_work items: recover the work from
 * the task context and invoke its Linux-style callback with the work
 * itself as the argument.
 */
void
lkpi_kthread_work_fn(void *context, int pending __unused)
{
	struct kthread_work *kw;

	kw = context;
	kw->func(kw);
}
/*
 * Bootstrap task run once on the worker's taskqueue thread: record that
 * thread's task_struct (LinuxKPI "current") in the worker so Linux code
 * can address the backing kthread.
 */
void
lkpi_kthread_worker_init_fn(void *context, int pending __unused)
{
	struct kthread_worker *kw = context;

	kw->task = current;
}

View File

@ -59,6 +59,7 @@ static void taskqueue_swi_giant_enqueue(void *);
struct taskqueue_busy {
struct task *tb_running;
u_int tb_seq;
bool tb_canceling;
LIST_ENTRY(taskqueue_busy) tb_link;
};
@ -125,6 +126,19 @@ TQ_SLEEP(struct taskqueue *tq, void *p, const char *wm)
return (msleep(p, &tq->tq_mutex, 0, wm, 0));
}
static struct taskqueue_busy *
task_get_busy(struct taskqueue *queue, struct task *task)
{
struct taskqueue_busy *tb;
TQ_ASSERT_LOCKED(queue);
LIST_FOREACH(tb, &queue->tq_active, tb_link) {
if (tb->tb_running == task)
return (tb);
}
return (NULL);
}
static struct taskqueue *
_taskqueue_create(const char *name, int mflags,
taskqueue_enqueue_fn enqueue, void *context,
@ -217,16 +231,32 @@ taskqueue_free(struct taskqueue *queue)
}
static int
taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task, int flags)
{
struct task *ins;
struct task *prev;
struct taskqueue_busy *tb;
KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func"));
/*
* Ignore canceling task if requested.
*/
if (__predict_false((flags & TASKQUEUE_FAIL_IF_CANCELING) != 0)) {
tb = task_get_busy(queue, task);
if (tb != NULL && tb->tb_canceling) {
TQ_UNLOCK(queue);
return (ECANCELED);
}
}
/*
* Count multiple enqueues.
*/
if (task->ta_pending) {
if (__predict_false((flags & TASKQUEUE_FAIL_IF_PENDING) != 0)) {
TQ_UNLOCK(queue);
return (EEXIST);
}
if (task->ta_pending < USHRT_MAX)
task->ta_pending++;
TQ_UNLOCK(queue);
@ -274,17 +304,23 @@ taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
}
int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
taskqueue_enqueue_flags(struct taskqueue *queue, struct task *task, int flags)
{
int res;
TQ_LOCK(queue);
res = taskqueue_enqueue_locked(queue, task);
res = taskqueue_enqueue_locked(queue, task, flags);
/* The lock is released inside. */
return (res);
}
int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
{
return (taskqueue_enqueue_flags(queue, task, 0));
}
static void
taskqueue_timeout_func(void *arg)
{
@ -296,7 +332,7 @@ taskqueue_timeout_func(void *arg)
KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
timeout_task->f &= ~DT_CALLOUT_ARMED;
queue->tq_callouts--;
taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t, 0);
/* The lock is released inside. */
}
@ -316,7 +352,7 @@ taskqueue_enqueue_timeout_sbt(struct taskqueue *queue,
TQ_UNLOCK(queue);
res = -1;
} else if (sbt == 0) {
taskqueue_enqueue_locked(queue, &timeout_task->t);
taskqueue_enqueue_locked(queue, &timeout_task->t, 0);
/* The lock is released inside. */
} else {
if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
@ -464,6 +500,7 @@ taskqueue_run_locked(struct taskqueue *queue)
task->ta_pending = 0;
tb.tb_running = task;
tb.tb_seq = ++queue->tq_seq;
tb.tb_canceling = false;
TQ_UNLOCK(queue);
KASSERT(task->ta_func != NULL, ("task->ta_func is NULL"));
@ -493,19 +530,6 @@ taskqueue_run(struct taskqueue *queue)
TQ_UNLOCK(queue);
}
static int
task_is_running(struct taskqueue *queue, struct task *task)
{
struct taskqueue_busy *tb;
TQ_ASSERT_LOCKED(queue);
LIST_FOREACH(tb, &queue->tq_active, tb_link) {
if (tb->tb_running == task)
return (1);
}
return (0);
}
/*
* Only use this function in single threaded contexts. It returns
* non-zero if the given task is either pending or running. Else the
@ -517,7 +541,7 @@ taskqueue_poll_is_busy(struct taskqueue *queue, struct task *task)
int retval;
TQ_LOCK(queue);
retval = task->ta_pending > 0 || task_is_running(queue, task);
retval = task->ta_pending > 0 || task_get_busy(queue, task) != NULL;
TQ_UNLOCK(queue);
return (retval);
@ -527,6 +551,8 @@ static int
taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
u_int *pendp)
{
struct taskqueue_busy *tb;
int retval = 0;
if (task->ta_pending > 0) {
STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
@ -536,7 +562,13 @@ taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
if (pendp != NULL)
*pendp = task->ta_pending;
task->ta_pending = 0;
return (task_is_running(queue, task) ? EBUSY : 0);
tb = task_get_busy(queue, task);
if (tb != NULL) {
tb->tb_canceling = true;
retval = EBUSY;
}
return (retval);
}
int
@ -580,7 +612,7 @@ taskqueue_drain(struct taskqueue *queue, struct task *task)
WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
TQ_LOCK(queue);
while (task->ta_pending != 0 || task_is_running(queue, task))
while (task->ta_pending != 0 || task_get_busy(queue, task) != NULL)
TQ_SLEEP(queue, task, "tq_drain");
TQ_UNLOCK(queue);
}

View File

@ -61,6 +61,10 @@ enum taskqueue_callback_type {
#define TASKQUEUE_NUM_CALLBACKS TASKQUEUE_CALLBACK_TYPE_MAX + 1
#define TASKQUEUE_NAMELEN 32
/* taskqueue_enqueue flags */
#define TASKQUEUE_FAIL_IF_PENDING (1 << 0)
#define TASKQUEUE_FAIL_IF_CANCELING (1 << 1)
typedef void (*taskqueue_callback_fn)(void *context);
/*
@ -82,6 +86,8 @@ int taskqueue_start_threads_in_proc(struct taskqueue **tqp, int count,
int taskqueue_start_threads_cpuset(struct taskqueue **tqp, int count,
int pri, cpuset_t *mask, const char *name, ...) __printflike(5, 6);
int taskqueue_enqueue(struct taskqueue *queue, struct task *task);
int taskqueue_enqueue_flags(struct taskqueue *queue, struct task *task,
int flags);
int taskqueue_enqueue_timeout(struct taskqueue *queue,
struct timeout_task *timeout_task, int ticks);
int taskqueue_enqueue_timeout_sbt(struct taskqueue *queue,