lib/event: remove scheduler dependency on lw_thread
Removing dependency on schedulers to directly modify lw_thread field structures will help making schedulers truly plugable. Instead of using lw_thread, new structure is created that holds copy of stats and refer to the thread by spdk_thread id. As an added benefit of not changing lw_thread directly, we won't run into issue of balancing function changing it while other reactor removes and frees it. In the future an API will be added for scheduler to call in order to move the thread directly. Rather than for event framework to rely on modified core_info/thread_info structure. Signed-off-by: Tomasz Zawadzki <tomasz.zawadzki@intel.com> Change-Id: I8f85bb8dc080fd13b78b07ee9ef8e8be7051659b Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/8411 Community-CI: Broadcom CI <spdk-ci.pdl@broadcom.com> Community-CI: Mellanox Build Bot Reviewed-by: Konrad Sztyber <konrad.sztyber@intel.com> Reviewed-by: Jim Harris <james.r.harris@intel.com> Reviewed-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com> Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
This commit is contained in:
parent
012b233426
commit
b74b6133fa
@ -255,6 +255,18 @@ struct spdk_governor *_spdk_governor_get(void);
|
||||
_spdk_governor_list_add(governor); \
|
||||
} \
|
||||
|
||||
/**
|
||||
* Structure representing thread used for scheduling.
|
||||
*/
|
||||
struct spdk_scheduler_thread_info {
|
||||
uint32_t lcore;
|
||||
uint64_t thread_id;
|
||||
/* stats over a lifetime of a thread */
|
||||
struct spdk_thread_stats total_stats;
|
||||
/* stats during the last scheduling period */
|
||||
struct spdk_thread_stats current_stats;
|
||||
};
|
||||
|
||||
/**
|
||||
* A list of cores and threads which is used for scheduling.
|
||||
*/
|
||||
@ -269,7 +281,7 @@ struct spdk_scheduler_core_info {
|
||||
uint32_t lcore;
|
||||
uint32_t threads_count;
|
||||
bool interrupt_mode;
|
||||
struct spdk_lw_thread **threads;
|
||||
struct spdk_scheduler_thread_info *thread_infos;
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -333,7 +333,7 @@ spdk_reactors_fini(void)
|
||||
reactor_interrupt_fini(reactor);
|
||||
|
||||
if (g_core_infos != NULL) {
|
||||
free(g_core_infos[i].threads);
|
||||
free(g_core_infos[i].thread_infos);
|
||||
}
|
||||
}
|
||||
|
||||
@ -687,19 +687,37 @@ _init_thread_stats(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_threa
|
||||
lw_thread->current_stats.idle_tsc = lw_thread->total_stats.idle_tsc - prev_total_stats.idle_tsc;
|
||||
}
|
||||
|
||||
static void
|
||||
_threads_reschedule_thread(struct spdk_scheduler_thread_info *thread_info)
|
||||
{
|
||||
struct spdk_lw_thread *lw_thread;
|
||||
struct spdk_thread *thread;
|
||||
|
||||
thread = spdk_thread_get_by_id(thread_info->thread_id);
|
||||
if (thread == NULL) {
|
||||
/* Thread no longer exists. */
|
||||
return;
|
||||
}
|
||||
lw_thread = spdk_thread_get_ctx(thread);
|
||||
assert(lw_thread != NULL);
|
||||
|
||||
lw_thread->lcore = thread_info->lcore;
|
||||
lw_thread->resched = true;
|
||||
}
|
||||
|
||||
static void
|
||||
_threads_reschedule(struct spdk_scheduler_core_info *cores_info)
|
||||
{
|
||||
struct spdk_scheduler_core_info *core;
|
||||
struct spdk_lw_thread *lw_thread;
|
||||
struct spdk_scheduler_thread_info *thread_info;
|
||||
uint32_t i, j;
|
||||
|
||||
SPDK_ENV_FOREACH_CORE(i) {
|
||||
core = &cores_info[i];
|
||||
for (j = 0; j < core->threads_count; j++) {
|
||||
lw_thread = core->threads[j];
|
||||
if (lw_thread->lcore != i) {
|
||||
lw_thread->resched = true;
|
||||
thread_info = &core->thread_infos[j];
|
||||
if (thread_info->lcore != i) {
|
||||
_threads_reschedule_thread(thread_info);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -782,6 +800,7 @@ _reactors_scheduler_gather_metrics(void *arg1, void *arg2)
|
||||
{
|
||||
struct spdk_scheduler_core_info *core_info;
|
||||
struct spdk_lw_thread *lw_thread;
|
||||
struct spdk_thread *thread;
|
||||
struct spdk_reactor *reactor;
|
||||
struct spdk_event *evt;
|
||||
uint32_t next_core;
|
||||
@ -800,8 +819,8 @@ _reactors_scheduler_gather_metrics(void *arg1, void *arg2)
|
||||
|
||||
SPDK_DEBUGLOG(reactor, "Gathering metrics on %u\n", reactor->lcore);
|
||||
|
||||
free(core_info->threads);
|
||||
core_info->threads = NULL;
|
||||
free(core_info->thread_infos);
|
||||
core_info->thread_infos = NULL;
|
||||
|
||||
i = 0;
|
||||
|
||||
@ -813,8 +832,8 @@ _reactors_scheduler_gather_metrics(void *arg1, void *arg2)
|
||||
core_info->threads_count = i;
|
||||
|
||||
if (core_info->threads_count > 0) {
|
||||
core_info->threads = calloc(core_info->threads_count, sizeof(struct spdk_lw_thread *));
|
||||
if (core_info->threads == NULL) {
|
||||
core_info->thread_infos = calloc(core_info->threads_count, sizeof(*core_info->thread_infos));
|
||||
if (core_info->thread_infos == NULL) {
|
||||
SPDK_ERRLOG("Failed to allocate memory when gathering metrics on %u\n", reactor->lcore);
|
||||
|
||||
/* Cancel this round of schedule work */
|
||||
@ -825,7 +844,11 @@ _reactors_scheduler_gather_metrics(void *arg1, void *arg2)
|
||||
|
||||
i = 0;
|
||||
TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
|
||||
core_info->threads[i] = lw_thread;
|
||||
core_info->thread_infos[i].lcore = lw_thread->lcore;
|
||||
thread = spdk_thread_get_from_ctx(lw_thread);
|
||||
core_info->thread_infos[i].thread_id = spdk_thread_get_id(thread);
|
||||
core_info->thread_infos[i].total_stats = lw_thread->total_stats;
|
||||
core_info->thread_infos[i].current_stats = lw_thread->current_stats;
|
||||
i++;
|
||||
}
|
||||
}
|
||||
|
@ -56,12 +56,12 @@ static struct core_stats *g_cores;
|
||||
#define SCHEDULER_CORE_LIMIT 95
|
||||
|
||||
static uint8_t
|
||||
_get_thread_load(struct spdk_lw_thread *lw_thread)
|
||||
_get_thread_load(struct spdk_scheduler_thread_info *thread_info)
|
||||
{
|
||||
uint64_t busy, idle;
|
||||
|
||||
busy = lw_thread->current_stats.busy_tsc;
|
||||
idle = lw_thread->current_stats.idle_tsc;
|
||||
busy = thread_info->current_stats.busy_tsc;
|
||||
idle = thread_info->current_stats.idle_tsc;
|
||||
|
||||
if (busy == 0) {
|
||||
/* No work was done, exit before possible division by 0. */
|
||||
@ -71,7 +71,7 @@ _get_thread_load(struct spdk_lw_thread *lw_thread)
|
||||
return busy * 100 / (busy + idle);
|
||||
}
|
||||
|
||||
typedef void (*_foreach_fn)(struct spdk_lw_thread *lw_thread);
|
||||
typedef void (*_foreach_fn)(struct spdk_scheduler_thread_info *thread_info);
|
||||
|
||||
static void
|
||||
_foreach_thread(struct spdk_scheduler_core_info *cores_info, _foreach_fn fn)
|
||||
@ -82,18 +82,18 @@ _foreach_thread(struct spdk_scheduler_core_info *cores_info, _foreach_fn fn)
|
||||
SPDK_ENV_FOREACH_CORE(i) {
|
||||
core = &cores_info[i];
|
||||
for (j = 0; j < core->threads_count; j++) {
|
||||
fn(core->threads[j]);
|
||||
fn(&core->thread_infos[j]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
_move_thread(struct spdk_lw_thread *lw_thread, uint32_t dst_core)
|
||||
_move_thread(struct spdk_scheduler_thread_info *thread_info, uint32_t dst_core)
|
||||
{
|
||||
struct core_stats *dst = &g_cores[dst_core];
|
||||
struct core_stats *src = &g_cores[lw_thread->lcore];
|
||||
uint64_t busy_tsc = lw_thread->current_stats.busy_tsc;
|
||||
uint64_t idle_tsc = lw_thread->current_stats.idle_tsc;
|
||||
struct core_stats *src = &g_cores[thread_info->lcore];
|
||||
uint64_t busy_tsc = thread_info->current_stats.busy_tsc;
|
||||
uint64_t idle_tsc = thread_info->current_stats.idle_tsc;
|
||||
|
||||
if (src == dst) {
|
||||
/* Don't modify stats if thread is already on that core. */
|
||||
@ -111,7 +111,7 @@ _move_thread(struct spdk_lw_thread *lw_thread, uint32_t dst_core)
|
||||
assert(src->thread_count > 0);
|
||||
src->thread_count--;
|
||||
|
||||
lw_thread->lcore = dst_core;
|
||||
thread_info->lcore = dst_core;
|
||||
}
|
||||
|
||||
static bool
|
||||
@ -142,12 +142,12 @@ _is_core_over_limit(uint32_t core_id)
|
||||
}
|
||||
|
||||
static bool
|
||||
_can_core_fit_thread(struct spdk_lw_thread *lw_thread, uint32_t dst_core)
|
||||
_can_core_fit_thread(struct spdk_scheduler_thread_info *thread_info, uint32_t dst_core)
|
||||
{
|
||||
struct core_stats *dst = &g_cores[dst_core];
|
||||
|
||||
/* Thread can always fit on the core it's currently on. */
|
||||
if (lw_thread->lcore == dst_core) {
|
||||
if (thread_info->lcore == dst_core) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -162,22 +162,28 @@ _can_core_fit_thread(struct spdk_lw_thread *lw_thread, uint32_t dst_core)
|
||||
return true;
|
||||
}
|
||||
|
||||
if (lw_thread->current_stats.busy_tsc <= dst->idle) {
|
||||
if (thread_info->current_stats.busy_tsc <= dst->idle) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static uint32_t
|
||||
_find_optimal_core(struct spdk_lw_thread *lw_thread)
|
||||
_find_optimal_core(struct spdk_scheduler_thread_info *thread_info)
|
||||
{
|
||||
uint32_t i;
|
||||
uint32_t current_lcore = lw_thread->lcore;
|
||||
uint32_t least_busy_lcore = lw_thread->lcore;
|
||||
struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
|
||||
struct spdk_cpuset *cpumask = spdk_thread_get_cpumask(thread);
|
||||
uint32_t current_lcore = thread_info->lcore;
|
||||
uint32_t least_busy_lcore = thread_info->lcore;
|
||||
struct spdk_thread *thread;
|
||||
struct spdk_cpuset *cpumask;
|
||||
bool core_over_limit = _is_core_over_limit(current_lcore);
|
||||
|
||||
thread = spdk_thread_get_by_id(thread_info->thread_id);
|
||||
if (thread == NULL) {
|
||||
return current_lcore;
|
||||
}
|
||||
cpumask = spdk_thread_get_cpumask(thread);
|
||||
|
||||
/* Find a core that can fit the thread. */
|
||||
SPDK_ENV_FOREACH_CORE(i) {
|
||||
/* Ignore cores outside cpumask. */
|
||||
@ -191,7 +197,7 @@ _find_optimal_core(struct spdk_lw_thread *lw_thread)
|
||||
}
|
||||
|
||||
/* Skip cores that cannot fit the thread and current one. */
|
||||
if (!_can_core_fit_thread(lw_thread, i) || i == current_lcore) {
|
||||
if (!_can_core_fit_thread(thread_info, i) || i == current_lcore) {
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -263,27 +269,27 @@ deinit(struct spdk_governor *governor)
|
||||
}
|
||||
|
||||
static void
|
||||
_balance_idle(struct spdk_lw_thread *lw_thread)
|
||||
_balance_idle(struct spdk_scheduler_thread_info *thread_info)
|
||||
{
|
||||
if (_get_thread_load(lw_thread) >= SCHEDULER_LOAD_LIMIT) {
|
||||
if (_get_thread_load(thread_info) >= SCHEDULER_LOAD_LIMIT) {
|
||||
return;
|
||||
}
|
||||
/* This thread is idle, move it to the main core. */
|
||||
_move_thread(lw_thread, g_main_lcore);
|
||||
_move_thread(thread_info, g_main_lcore);
|
||||
}
|
||||
|
||||
static void
|
||||
_balance_active(struct spdk_lw_thread *lw_thread)
|
||||
_balance_active(struct spdk_scheduler_thread_info *thread_info)
|
||||
{
|
||||
uint32_t target_lcore;
|
||||
|
||||
if (_get_thread_load(lw_thread) < SCHEDULER_LOAD_LIMIT) {
|
||||
if (_get_thread_load(thread_info) < SCHEDULER_LOAD_LIMIT) {
|
||||
return;
|
||||
}
|
||||
|
||||
/* This thread is active. */
|
||||
target_lcore = _find_optimal_core(lw_thread);
|
||||
_move_thread(lw_thread, target_lcore);
|
||||
target_lcore = _find_optimal_core(thread_info);
|
||||
_move_thread(thread_info, target_lcore);
|
||||
}
|
||||
|
||||
static void
|
||||
|
Loading…
Reference in New Issue
Block a user