event: Allow idle reactors to sleep

The user can now specify a maximum delay, in microseconds, that
defines the maximum amount of time a reactor will sleep for
between polling for new events. By default, the time is 0
which means the reactor will never sleep.

Change-Id: I94cddb69c832524878cad97b66673daa4bd5c721
Signed-off-by: Ben Walker <benjamin.walker@intel.com>
This commit is contained in:
Ben Walker 2016-08-16 12:51:25 -07:00
parent 4179c0acc5
commit 086346a4fc
5 changed files with 65 additions and 12 deletions

View File

@ -131,6 +131,13 @@ struct spdk_app_opts {
uint32_t dpdk_mem_channel;
uint32_t dpdk_master_core;
int dpdk_mem_size;
/* The maximum latency allowed when passing an event
* from one core to another. A value of 0
* means all cores continually poll. This is
* specified in microseconds.
*/
uint64_t max_delay_us;
};
/**
@ -213,7 +220,7 @@ void spdk_event_call(spdk_event_t event);
#define spdk_event_get_arg2(event) (event)->arg2
/* TODO: This is only used by tests and should be made private */
void spdk_event_queue_run_all(uint32_t lcore);
uint32_t spdk_event_queue_run_all(uint32_t lcore);
/**
* \brief Register a poller on the given lcore.

View File

@ -223,6 +223,7 @@ spdk_app_opts_init(struct spdk_app_opts *opts)
opts->dpdk_master_core = SPDK_APP_DPDK_DEFAULT_MASTER_CORE;
opts->dpdk_mem_channel = SPDK_APP_DPDK_DEFAULT_MEM_CHANNEL;
opts->reactor_mask = NULL;
opts->max_delay_us = 0;
}
void
@ -322,7 +323,7 @@ spdk_app_init(struct spdk_app_opts *opts)
* reactor_mask will be NULL which will enable all cores to run
* reactors.
*/
if (spdk_reactors_init(opts->reactor_mask)) {
if (spdk_reactors_init(opts->reactor_mask, opts->max_delay_us)) {
fprintf(stderr, "Invalid reactor mask.\n");
exit(EXIT_FAILURE);
}

View File

@ -59,6 +59,8 @@
#define SPDK_MAX_SOCKET 64
#define SPDK_REACTOR_SPIN_TIME_US 1
struct spdk_poller {
TAILQ_ENTRY(spdk_poller) tailq;
uint32_t lcore;
@ -94,6 +96,8 @@ struct spdk_reactor {
TAILQ_HEAD(timer_pollers_head, spdk_poller) timer_pollers;
struct rte_ring *events;
uint64_t max_delay_us;
};
static struct spdk_reactor g_reactors[RTE_MAX_LCORE];
@ -102,7 +106,8 @@ static int g_reactor_count = 0;
static enum spdk_reactor_state g_reactor_state = SPDK_REACTOR_STATE_INVALID;
static void spdk_reactor_construct(struct spdk_reactor *w, uint32_t lcore);
static void spdk_reactor_construct(struct spdk_reactor *w, uint32_t lcore,
uint64_t max_delay_us);
struct rte_mempool *g_spdk_event_mempool[SPDK_MAX_SOCKET];
@ -203,13 +208,15 @@ spdk_event_queue_run(uint32_t lcore, uint32_t count)
}
}
void
uint32_t
spdk_event_queue_run_all(uint32_t lcore)
{
uint32_t count;
count = spdk_event_queue_count(lcore);
spdk_event_queue_run(lcore, count);
return count;
}
/**
@ -293,12 +300,23 @@ _spdk_reactor_run(void *arg)
{
struct spdk_reactor *reactor = arg;
struct spdk_poller *poller;
uint32_t event_count;
uint64_t last_action, now;
uint64_t spin_cycles, sleep_cycles;
uint32_t sleep_us;
set_reactor_thread_name();
SPDK_NOTICELOG("waiting for work item to arrive...\n");
SPDK_NOTICELOG("Reactor started on core 0x%x\n", rte_lcore_id());
spin_cycles = SPDK_REACTOR_SPIN_TIME_US * rte_get_timer_hz() / 1000000ULL;
sleep_cycles = reactor->max_delay_us * rte_get_timer_hz() / 1000000ULL;
last_action = rte_get_timer_cycles();
while (1) {
spdk_event_queue_run_all(rte_lcore_id());
event_count = spdk_event_queue_run_all(rte_lcore_id());
if (event_count > 0) {
last_action = rte_get_timer_cycles();
}
rte_timer_manage();
@ -307,11 +325,12 @@ _spdk_reactor_run(void *arg)
TAILQ_REMOVE(&reactor->active_pollers, poller, tailq);
poller->fn(poller->arg);
TAILQ_INSERT_TAIL(&reactor->active_pollers, poller, tailq);
last_action = rte_get_timer_cycles();
}
poller = TAILQ_FIRST(&reactor->timer_pollers);
if (poller) {
uint64_t now = rte_get_timer_cycles();
now = rte_get_timer_cycles();
if (now >= poller->next_run_tick) {
TAILQ_REMOVE(&reactor->timer_pollers, poller, tailq);
@ -320,6 +339,31 @@ _spdk_reactor_run(void *arg)
}
}
/* Determine if the thread can sleep */
if (sleep_cycles > 0) {
now = rte_get_timer_cycles();
if (now >= (last_action + spin_cycles)) {
sleep_us = reactor->max_delay_us;
poller = TAILQ_FIRST(&reactor->timer_pollers);
if (poller) {
/* There are timers registered, so don't sleep beyond
* when the next timer should fire */
if (poller->next_run_tick < (now + sleep_cycles)) {
if (poller->next_run_tick <= now) {
sleep_us = 0;
} else {
sleep_us = ((poller->next_run_tick - now) * 1000000ULL) / rte_get_timer_hz();
}
}
}
if (sleep_us > 0) {
usleep(sleep_us);
}
}
}
if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
break;
}
@ -329,11 +373,12 @@ _spdk_reactor_run(void *arg)
}
static void
spdk_reactor_construct(struct spdk_reactor *reactor, uint32_t lcore)
spdk_reactor_construct(struct spdk_reactor *reactor, uint32_t lcore, uint64_t max_delay_us)
{
char ring_name[64];
reactor->lcore = lcore;
reactor->max_delay_us = max_delay_us;
TAILQ_INIT(&reactor->active_pollers);
TAILQ_INIT(&reactor->timer_pollers);
@ -492,7 +537,7 @@ void spdk_reactors_stop(void)
}
int
spdk_reactors_init(const char *mask)
spdk_reactors_init(const char *mask, unsigned int max_delay_us)
{
uint32_t i;
int rc;
@ -511,7 +556,7 @@ spdk_reactors_init(const char *mask)
RTE_LCORE_FOREACH(i) {
if (((1ULL << i) & spdk_app_get_core_mask())) {
reactor = spdk_reactor_get(i);
spdk_reactor_construct(reactor, i);
spdk_reactor_construct(reactor, i, max_delay_us);
g_reactor_count++;
}
}

View File

@ -34,7 +34,7 @@
#ifndef SPDK_REACTOR_H_
#define SPDK_REACTOR_H_
int spdk_reactors_init(const char *mask);
int spdk_reactors_init(const char *mask, unsigned int max_delay_us);
int spdk_reactors_fini(void);
void spdk_reactors_start(void);

View File

@ -98,6 +98,7 @@ main(int argc, char **argv)
spdk_app_opts_init(&opts);
opts.name = "reactor";
opts.max_delay_us = 1000;
g_time_in_sec = 0;
@ -121,7 +122,6 @@ main(int argc, char **argv)
opts.shutdown_cb = test_cleanup;
spdk_app_opts_init(&opts);
spdk_app_init(&opts);
spdk_app_start(test_start, NULL, NULL);