diff --git a/include/spdk/event.h b/include/spdk/event.h index 08b9682de5..af3e3d13b4 100644 --- a/include/spdk/event.h +++ b/include/spdk/event.h @@ -131,6 +131,13 @@ struct spdk_app_opts { uint32_t dpdk_mem_channel; uint32_t dpdk_master_core; int dpdk_mem_size; + + /* The maximum latency allowed when passing an event + * from one core to another. A value of 0 + * means all cores continually poll. This is + * specified in microseconds. + */ + uint64_t max_delay_us; }; /** @@ -213,7 +220,7 @@ void spdk_event_call(spdk_event_t event); #define spdk_event_get_arg2(event) (event)->arg2 /* TODO: This is only used by tests and should be made private */ -void spdk_event_queue_run_all(uint32_t lcore); +uint32_t spdk_event_queue_run_all(uint32_t lcore); /** * \brief Register a poller on the given lcore. diff --git a/lib/event/app.c b/lib/event/app.c index 758f49a67d..3c3ab151c9 100644 --- a/lib/event/app.c +++ b/lib/event/app.c @@ -223,6 +223,7 @@ spdk_app_opts_init(struct spdk_app_opts *opts) opts->dpdk_master_core = SPDK_APP_DPDK_DEFAULT_MASTER_CORE; opts->dpdk_mem_channel = SPDK_APP_DPDK_DEFAULT_MEM_CHANNEL; opts->reactor_mask = NULL; + opts->max_delay_us = 0; } void @@ -322,7 +323,7 @@ spdk_app_init(struct spdk_app_opts *opts) * reactor_mask will be NULL which will enable all cores to run * reactors. */ - if (spdk_reactors_init(opts->reactor_mask)) { + if (spdk_reactors_init(opts->reactor_mask, opts->max_delay_us)) { fprintf(stderr, "Invalid reactor mask.\n"); exit(EXIT_FAILURE); } diff --git a/lib/event/reactor.c b/lib/event/reactor.c index 4e9a1fb40b..47f8111e1a 100644 --- a/lib/event/reactor.c +++ b/lib/event/reactor.c @@ -59,6 +59,8 @@ #define SPDK_MAX_SOCKET 64 +#define SPDK_REACTOR_SPIN_TIME_US 1 + struct spdk_poller { TAILQ_ENTRY(spdk_poller) tailq; uint32_t lcore; @@ -94,6 +96,8 @@ struct spdk_reactor { TAILQ_HEAD(timer_pollers_head, spdk_poller) timer_pollers; struct rte_ring *events; + + uint64_t max_delay_us; }; static struct spdk_reactor g_reactors[RTE_MAX_LCORE]; @@ -102,7 +106,8 @@ static int g_reactor_count = 0; static enum spdk_reactor_state g_reactor_state = SPDK_REACTOR_STATE_INVALID; -static void spdk_reactor_construct(struct spdk_reactor *w, uint32_t lcore); +static void spdk_reactor_construct(struct spdk_reactor *w, uint32_t lcore, + uint64_t max_delay_us); struct rte_mempool *g_spdk_event_mempool[SPDK_MAX_SOCKET]; @@ -203,13 +208,15 @@ spdk_event_queue_run(uint32_t lcore, uint32_t count) } } -void +uint32_t spdk_event_queue_run_all(uint32_t lcore) { uint32_t count; count = spdk_event_queue_count(lcore); spdk_event_queue_run(lcore, count); + + return count; } /** @@ -293,12 +300,23 @@ _spdk_reactor_run(void *arg) { struct spdk_reactor *reactor = arg; struct spdk_poller *poller; + uint32_t event_count; + uint64_t last_action, now; + uint64_t spin_cycles, sleep_cycles; + uint32_t sleep_us; set_reactor_thread_name(); - SPDK_NOTICELOG("waiting for work item to arrive...\n"); + SPDK_NOTICELOG("Reactor started on core 0x%x\n", rte_lcore_id()); + + spin_cycles = SPDK_REACTOR_SPIN_TIME_US * rte_get_timer_hz() / 1000000ULL; + sleep_cycles = reactor->max_delay_us * rte_get_timer_hz() / 1000000ULL; + last_action = rte_get_timer_cycles(); while (1) { - spdk_event_queue_run_all(rte_lcore_id()); + event_count = spdk_event_queue_run_all(rte_lcore_id()); + if (event_count > 0) { + last_action = rte_get_timer_cycles(); + } rte_timer_manage(); @@ -307,11 +325,12 @@ _spdk_reactor_run(void *arg) TAILQ_REMOVE(&reactor->active_pollers, poller, tailq); poller->fn(poller->arg); TAILQ_INSERT_TAIL(&reactor->active_pollers, poller, tailq); + last_action = rte_get_timer_cycles(); } poller = TAILQ_FIRST(&reactor->timer_pollers); if (poller) { - uint64_t now = rte_get_timer_cycles(); + now = rte_get_timer_cycles(); if (now >= poller->next_run_tick) { TAILQ_REMOVE(&reactor->timer_pollers, poller, tailq); @@ -320,6 +339,31 @@ _spdk_reactor_run(void *arg) } } + /* Determine if the thread can sleep */ + if (sleep_cycles > 0) { + now = rte_get_timer_cycles(); + if (now >= (last_action + spin_cycles)) { + sleep_us = reactor->max_delay_us; + + poller = TAILQ_FIRST(&reactor->timer_pollers); + if (poller) { + /* There are timers registered, so don't sleep beyond + * when the next timer should fire */ + if (poller->next_run_tick < (now + sleep_cycles)) { + if (poller->next_run_tick <= now) { + sleep_us = 0; + } else { + sleep_us = ((poller->next_run_tick - now) * 1000000ULL) / rte_get_timer_hz(); + } + } + } + + if (sleep_us > 0) { + usleep(sleep_us); + } + } + } + if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) { break; } @@ -329,11 +373,12 @@ _spdk_reactor_run(void *arg) } static void -spdk_reactor_construct(struct spdk_reactor *reactor, uint32_t lcore) +spdk_reactor_construct(struct spdk_reactor *reactor, uint32_t lcore, uint64_t max_delay_us) { char ring_name[64]; reactor->lcore = lcore; + reactor->max_delay_us = max_delay_us; TAILQ_INIT(&reactor->active_pollers); TAILQ_INIT(&reactor->timer_pollers); @@ -492,7 +537,7 @@ void spdk_reactors_stop(void) } int -spdk_reactors_init(const char *mask) +spdk_reactors_init(const char *mask, unsigned int max_delay_us) { uint32_t i; int rc; @@ -511,7 +556,7 @@ spdk_reactors_init(const char *mask) RTE_LCORE_FOREACH(i) { if (((1ULL << i) & spdk_app_get_core_mask())) { reactor = spdk_reactor_get(i); - spdk_reactor_construct(reactor, i); + spdk_reactor_construct(reactor, i, max_delay_us); g_reactor_count++; } } diff --git a/lib/event/reactor.h b/lib/event/reactor.h index cfbcaa3215..94fe160167 100644 --- a/lib/event/reactor.h +++ b/lib/event/reactor.h @@ -34,7 +34,7 @@ #ifndef SPDK_REACTOR_H_ #define SPDK_REACTOR_H_ -int spdk_reactors_init(const char *mask); +int spdk_reactors_init(const char *mask, unsigned int max_delay_us); int spdk_reactors_fini(void); void spdk_reactors_start(void); diff --git a/test/lib/event/reactor/reactor.c b/test/lib/event/reactor/reactor.c index b531e0dede..be3b295b72 100644 --- a/test/lib/event/reactor/reactor.c +++ b/test/lib/event/reactor/reactor.c @@ -98,6 +98,7 @@ main(int argc, char **argv) spdk_app_opts_init(&opts); opts.name = "reactor"; + opts.max_delay_us = 1000; g_time_in_sec = 0; @@ -121,7 +122,6 @@ main(int argc, char **argv) opts.shutdown_cb = test_cleanup; - spdk_app_opts_init(&opts); spdk_app_init(&opts); spdk_app_start(test_start, NULL, NULL);