Update the LinuxKPI RCU and SRCU wrappers for the concurrency kit, CK.

- Optimise the RCU implementation to not allocate and free
ck_epoch_records during runtime. Instead allocate two sets of
ck_epoch_records per CPU for general purpose use. The first set is
only used for reader locks and the second set is only used for
synchronization and barriers and is protected with a regular mutex to
prevent simultaneous issues.

- Move the task structure away from the rcu_head structure and into
the per-CPU structures. This allows the size of the rcu_head structure
to be reduced down to the size of two pointers.

- Fix a bug where the linux_rcu_barrier() function only waited for one
per-CPU epoch record to be completed instead of all.

- Use a critical section or a mutex to protect ck_epoch_begin() and
ck_epoch_end() depending on RCU or SRCU type. All the ck_epoch_xxx()
functions, except ck_epoch_register(), ck_epoch_unregister() and
ck_epoch_recycle() are not re-entrant and needs a critical section or
a mutex to operate in the LinuxKPI, after inspecting the CK
implementation of the above mentioned functions. The simultaneous
issues arise from per-CPU epoch records being shared between multiple
threads depending on the amount of taskswitching and how many threads
are involved with the RCU and SRCU operations.

- Properly free all epoch records by using safe list traversal at
LinuxKPI module unload. It turns out the ck_epoch_recycle() always
have the records on an internal list and use a flag in the epoch
record to track allocated and free entries. This would lead to use
after free during module unload.

- Remove redundant synchronize_rcu() call from the
linux_compat_uninit() function. Let the linux_rcu_runtime_uninit()
function do the final rcu_barrier() instead.

MFC after:		1 week
Sponsored by:		Mellanox Technologies
This commit is contained in:
Hans Petter Selasky 2017-03-03 16:28:03 +00:00
parent 1df22ae01f
commit 1f827dab9e
4 changed files with 204 additions and 90 deletions

View File

@ -29,9 +29,9 @@
#ifndef _LINUX_SRCU_H_
#define _LINUX_SRCU_H_
struct ck_epoch_record;
struct srcu_epoch_record;
struct srcu_struct {
struct ck_epoch_record *ss_epoch_record;
struct srcu_epoch_record *ss_epoch_record;
};
#define srcu_dereference(ptr,srcu) ((__typeof(*(ptr)) *)(ptr))
@ -41,6 +41,7 @@ struct srcu_struct {
extern int srcu_read_lock(struct srcu_struct *);
extern void srcu_read_unlock(struct srcu_struct *, int index);
extern void synchronize_srcu(struct srcu_struct *);
extern void srcu_barrier(struct srcu_struct *);
extern int init_srcu_struct(struct srcu_struct *);
extern void cleanup_srcu_struct(struct srcu_struct *);
extern void srcu_barrier(struct srcu_struct *);

View File

@ -2,7 +2,7 @@
* Copyright (c) 2010 Isilon Systems, Inc.
* Copyright (c) 2010 iX Systems, Inc.
* Copyright (c) 2010 Panasas, Inc.
* Copyright (c) 2013, 2014 Mellanox Technologies, Ltd.
* Copyright (c) 2013-2017 Mellanox Technologies, Ltd.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -65,7 +65,7 @@ typedef u64 phys_addr_t;
unsigned long n[howmany(bits, sizeof(long) * 8)]
struct rcu_head {
void *raw[8];
void *raw[2];
} __aligned(sizeof(void *));
typedef void (*rcu_callback_t)(struct rcu_head *head);

View File

@ -69,7 +69,6 @@ __FBSDID("$FreeBSD$");
#include <linux/netdevice.h>
#include <linux/timer.h>
#include <linux/workqueue.h>
#include <linux/rcupdate.h>
#include <linux/interrupt.h>
#include <linux/uaccess.h>
#include <linux/kernel.h>
@ -1503,8 +1502,6 @@ linux_compat_uninit(void *arg)
linux_kobject_kfree_name(&linux_class_root);
linux_kobject_kfree_name(&linux_root_device.kobj);
linux_kobject_kfree_name(&linux_class_misc.kobj);
synchronize_rcu();
}
SYSUNINIT(linux_compat, SI_SUB_DRIVERS, SI_ORDER_SECOND, linux_compat_uninit, NULL);

View File

@ -46,11 +46,24 @@ __FBSDID("$FreeBSD$");
#include <linux/slab.h>
#include <linux/kernel.h>
struct callback_head {
ck_epoch_entry_t epoch_entry;
rcu_callback_t func;
ck_epoch_record_t *epoch_record;
struct callback_head;
struct writer_epoch_record {
ck_epoch_record_t epoch_record;
struct mtx head_lock;
struct mtx sync_lock;
struct task task;
STAILQ_HEAD(, callback_head) head;
} __aligned(CACHE_LINE_SIZE);
struct callback_head {
STAILQ_ENTRY(callback_head) entry;
rcu_callback_t func;
};
struct srcu_epoch_record {
ck_epoch_record_t epoch_record;
struct mtx read_lock;
struct mtx sync_lock;
};
/*
@ -61,33 +74,55 @@ struct callback_head {
*/
CTASSERT(sizeof(struct rcu_head) >= sizeof(struct callback_head));
/*
* Verify that "epoch_record" is at beginning of "struct
* writer_epoch_record":
*/
CTASSERT(offsetof(struct writer_epoch_record, epoch_record) == 0);
/*
* Verify that "epoch_record" is at beginning of "struct
* srcu_epoch_record":
*/
CTASSERT(offsetof(struct srcu_epoch_record, epoch_record) == 0);
static ck_epoch_t linux_epoch;
static MALLOC_DEFINE(M_LRCU, "lrcu", "Linux RCU");
static DPCPU_DEFINE(ck_epoch_record_t *, epoch_record);
static MALLOC_DEFINE(M_LRCU, "lrcu", "Linux RCU");
static DPCPU_DEFINE(ck_epoch_record_t *, linux_reader_epoch_record);
static DPCPU_DEFINE(struct writer_epoch_record *, linux_writer_epoch_record);
static void linux_rcu_cleaner_func(void *, int);
static void
linux_rcu_runtime_init(void *arg __unused)
{
ck_epoch_record_t **pcpu_record;
ck_epoch_record_t *record;
int i;
ck_epoch_init(&linux_epoch);
/* setup reader records */
CPU_FOREACH(i) {
ck_epoch_record_t *record;
record = malloc(sizeof(*record), M_LRCU, M_WAITOK | M_ZERO);
ck_epoch_register(&linux_epoch, record);
pcpu_record = DPCPU_ID_PTR(i, epoch_record);
*pcpu_record = record;
DPCPU_ID_SET(i, linux_reader_epoch_record, record);
}
/*
* Populate the epoch with 5 * ncpus # of records
*/
for (i = 0; i < 5 * mp_ncpus; i++) {
/* setup writer records */
CPU_FOREACH(i) {
struct writer_epoch_record *record;
record = malloc(sizeof(*record), M_LRCU, M_WAITOK | M_ZERO);
ck_epoch_register(&linux_epoch, record);
ck_epoch_unregister(record);
ck_epoch_register(&linux_epoch, &record->epoch_record);
mtx_init(&record->head_lock, "LRCU-HEAD", NULL, MTX_DEF);
mtx_init(&record->sync_lock, "LRCU-SYNC", NULL, MTX_DEF);
TASK_INIT(&record->task, 0, linux_rcu_cleaner_func, record);
STAILQ_INIT(&record->head);
DPCPU_ID_SET(i, linux_writer_epoch_record, record);
}
}
SYSINIT(linux_rcu_runtime, SI_SUB_LOCK, SI_ORDER_SECOND, linux_rcu_runtime_init, NULL);
@ -95,66 +130,99 @@ SYSINIT(linux_rcu_runtime, SI_SUB_LOCK, SI_ORDER_SECOND, linux_rcu_runtime_init,
static void
linux_rcu_runtime_uninit(void *arg __unused)
{
ck_epoch_record_t **pcpu_record;
ck_epoch_record_t *record;
ck_stack_entry_t *cursor;
ck_stack_entry_t *next;
int i;
while ((record = ck_epoch_recycle(&linux_epoch)) != NULL)
free(record, M_LRCU);
/* make sure all callbacks have been called */
linux_rcu_barrier();
/* destroy all writer record mutexes */
CPU_FOREACH(i) {
pcpu_record = DPCPU_ID_PTR(i, epoch_record);
record = *pcpu_record;
*pcpu_record = NULL;
struct writer_epoch_record *record;
record = DPCPU_ID_GET(i, linux_writer_epoch_record);
mtx_destroy(&record->head_lock);
mtx_destroy(&record->sync_lock);
}
/* free all registered reader and writer records */
CK_STACK_FOREACH_SAFE(&linux_epoch.records, cursor, next) {
ck_epoch_record_t *record;
record = container_of(cursor,
struct ck_epoch_record, record_next);
free(record, M_LRCU);
}
}
SYSUNINIT(linux_rcu_runtime, SI_SUB_LOCK, SI_ORDER_SECOND, linux_rcu_runtime_uninit, NULL);
static ck_epoch_record_t *
linux_rcu_get_record(int canblock)
static inline struct srcu_epoch_record *
linux_srcu_get_record(void)
{
ck_epoch_record_t *record;
struct srcu_epoch_record *record;
if (__predict_true((record = ck_epoch_recycle(&linux_epoch)) != NULL))
WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL,
"linux_srcu_get_record() might sleep");
/*
* NOTE: The only records that are unregistered and can be
* recycled are srcu_epoch_records.
*/
record = (struct srcu_epoch_record *)ck_epoch_recycle(&linux_epoch);
if (__predict_true(record != NULL))
return (record);
if ((record = malloc(sizeof(*record), M_LRCU, M_NOWAIT | M_ZERO)) != NULL) {
ck_epoch_register(&linux_epoch, record);
return (record);
} else if (!canblock)
return (NULL);
record = malloc(sizeof(*record), M_LRCU, M_WAITOK | M_ZERO);
ck_epoch_register(&linux_epoch, record);
mtx_init(&record->read_lock, "SRCU-READ", NULL, MTX_DEF | MTX_NOWITNESS);
mtx_init(&record->sync_lock, "SRCU-SYNC", NULL, MTX_DEF | MTX_NOWITNESS);
ck_epoch_register(&linux_epoch, &record->epoch_record);
return (record);
}
static void
linux_rcu_destroy_object(ck_epoch_entry_t *e)
static inline void
linux_rcu_synchronize_sub(struct writer_epoch_record *record)
{
struct callback_head *rcu;
uintptr_t offset;
rcu = container_of(e, struct callback_head, epoch_entry);
offset = (uintptr_t)rcu->func;
MPASS(rcu->task.ta_pending == 0);
if (offset < LINUX_KFREE_RCU_OFFSET_MAX)
kfree((char *)rcu - offset);
else
rcu->func((struct rcu_head *)rcu);
/* protect access to epoch_record */
mtx_lock(&record->sync_lock);
ck_epoch_synchronize(&record->epoch_record);
mtx_unlock(&record->sync_lock);
}
static void
linux_rcu_cleaner_func(void *context, int pending __unused)
{
struct callback_head *rcu = context;
ck_epoch_record_t *record = rcu->epoch_record;
struct writer_epoch_record *record;
struct callback_head *rcu;
STAILQ_HEAD(, callback_head) head;
ck_epoch_barrier(record);
ck_epoch_unregister(record);
record = context;
/* move current callbacks into own queue */
mtx_lock(&record->head_lock);
STAILQ_INIT(&head);
STAILQ_CONCAT(&head, &record->head);
mtx_unlock(&record->head_lock);
/* synchronize */
linux_rcu_synchronize_sub(record);
/* dispatch all callbacks, if any */
while ((rcu = STAILQ_FIRST(&head)) != NULL) {
uintptr_t offset;
STAILQ_REMOVE_HEAD(&head, entry);
offset = (uintptr_t)rcu->func;
if (offset < LINUX_KFREE_RCU_OFFSET_MAX)
kfree((char *)rcu - offset);
else
rcu->func((struct rcu_head *)rcu);
}
}
void
@ -162,11 +230,21 @@ linux_rcu_read_lock(void)
{
ck_epoch_record_t *record;
/*
* Pin thread to current CPU so that the unlock code gets the
* same per-CPU reader epoch record:
*/
sched_pin();
record = DPCPU_GET(epoch_record);
MPASS(record != NULL);
record = DPCPU_GET(linux_reader_epoch_record);
/*
* Use a critical section to prevent recursion inside
* ck_epoch_begin(). Else this function supports recursion.
*/
critical_enter();
ck_epoch_begin(record, NULL);
critical_exit();
}
void
@ -174,57 +252,63 @@ linux_rcu_read_unlock(void)
{
ck_epoch_record_t *record;
record = DPCPU_GET(epoch_record);
record = DPCPU_GET(linux_reader_epoch_record);
/*
* Use a critical section to prevent recursion inside
* ck_epoch_end(). Else this function supports recursion.
*/
critical_enter();
ck_epoch_end(record, NULL);
critical_exit();
sched_unpin();
}
void
linux_synchronize_rcu(void)
{
ck_epoch_record_t *record;
sched_pin();
record = DPCPU_GET(epoch_record);
MPASS(record != NULL);
ck_epoch_synchronize(record);
sched_unpin();
linux_rcu_synchronize_sub(DPCPU_GET(linux_writer_epoch_record));
}
void
linux_rcu_barrier(void)
{
ck_epoch_record_t *record;
int i;
record = linux_rcu_get_record(0);
ck_epoch_barrier(record);
ck_epoch_unregister(record);
CPU_FOREACH(i) {
struct writer_epoch_record *record;
record = DPCPU_ID_GET(i, linux_writer_epoch_record);
linux_rcu_synchronize_sub(record);
/* wait for callbacks to complete */
taskqueue_drain(taskqueue_fast, &record->task);
}
}
void
linux_call_rcu(struct rcu_head *context, rcu_callback_t func)
{
struct callback_head *ptr = (struct callback_head *)context;
ck_epoch_record_t *record;
struct callback_head *rcu = (struct callback_head *)context;
struct writer_epoch_record *record;
record = linux_rcu_get_record(0);
record = DPCPU_GET(linux_writer_epoch_record);
sched_pin();
MPASS(record != NULL);
ptr->func = func;
ptr->epoch_record = record;
ck_epoch_call(record, &ptr->epoch_entry, linux_rcu_destroy_object);
TASK_INIT(&ptr->task, 0, linux_rcu_cleaner_func, ptr);
taskqueue_enqueue(taskqueue_fast, &ptr->task);
sched_unpin();
mtx_lock(&record->head_lock);
rcu->func = func;
STAILQ_INSERT_TAIL(&record->head, rcu, entry);
taskqueue_enqueue(taskqueue_fast, &record->task);
mtx_unlock(&record->head_lock);
}
int
init_srcu_struct(struct srcu_struct *srcu)
{
ck_epoch_record_t *record;
struct srcu_epoch_record *record;
record = linux_rcu_get_record(0);
record = linux_srcu_get_record();
srcu->ss_epoch_record = record;
return (0);
}
@ -232,28 +316,60 @@ init_srcu_struct(struct srcu_struct *srcu)
void
cleanup_srcu_struct(struct srcu_struct *srcu)
{
ck_epoch_record_t *record;
struct srcu_epoch_record *record;
record = srcu->ss_epoch_record;
srcu->ss_epoch_record = NULL;
ck_epoch_unregister(record);
ck_epoch_unregister(&record->epoch_record);
}
int
srcu_read_lock(struct srcu_struct *srcu)
{
ck_epoch_begin(srcu->ss_epoch_record, NULL);
struct srcu_epoch_record *record;
record = srcu->ss_epoch_record;
mtx_lock(&record->read_lock);
ck_epoch_begin(&record->epoch_record, NULL);
mtx_unlock(&record->read_lock);
return (0);
}
void
srcu_read_unlock(struct srcu_struct *srcu, int key __unused)
{
ck_epoch_end(srcu->ss_epoch_record, NULL);
struct srcu_epoch_record *record;
record = srcu->ss_epoch_record;
mtx_lock(&record->read_lock);
ck_epoch_end(&record->epoch_record, NULL);
mtx_unlock(&record->read_lock);
}
void
synchronize_srcu(struct srcu_struct *srcu)
{
ck_epoch_synchronize(srcu->ss_epoch_record);
struct srcu_epoch_record *record;
record = srcu->ss_epoch_record;
mtx_lock(&record->sync_lock);
ck_epoch_synchronize(&record->epoch_record);
mtx_unlock(&record->sync_lock);
}
void
srcu_barrier(struct srcu_struct *srcu)
{
struct srcu_epoch_record *record;
record = srcu->ss_epoch_record;
mtx_lock(&record->sync_lock);
ck_epoch_barrier(&record->epoch_record);
mtx_unlock(&record->sync_lock);
}