From 921bf34289c1aacfea9106776483d2dec2488435 Mon Sep 17 00:00:00 2001 From: Daniel Verkamp Date: Thu, 28 Jul 2016 08:50:50 -0700 Subject: [PATCH] event: replace poller rte_ring with TAILQ The rte_ring used for pollers is already single-producer and single-consumer, so it is not providing any thread safety guarantees. ALl modifications to the active_pollers ring are done from the core that is running the reactor (via events). This means the rte_ring can be replaced with a simpler intrusive linked list. This simplifies the removal of pollers in the middle of the list and avoids extra allocations for the ring. Change-Id: Ica149b7a1668a8af1e6ca8f741c48f2217f6f9bf Signed-off-by: Daniel Verkamp --- include/spdk/event.h | 15 ++++----------- lib/event/reactor.c | 39 +++++++++------------------------------ 2 files changed, 13 insertions(+), 41 deletions(-) diff --git a/include/spdk/event.h b/include/spdk/event.h index 2e54333198..25e22a4307 100644 --- a/include/spdk/event.h +++ b/include/spdk/event.h @@ -103,19 +103,12 @@ typedef void (*spdk_poller_fn)(void *arg); * \brief A poller is a function that is repeatedly called on an lcore. */ struct spdk_poller { - uint32_t lcore; - spdk_poller_fn fn; - void *arg; + TAILQ_ENTRY(spdk_poller) tailq; + uint32_t lcore; + spdk_poller_fn fn; + void *arg; }; -#define SPDK_POLLER_RING_SIZE 4096 - -/* - * -1 accounts for the empty slot needed to differentiate between ring empty - * and ring full. - */ -#define SPDK_MAX_POLLERS_PER_CORE (SPDK_POLLER_RING_SIZE - 1) - typedef void (*spdk_app_shutdown_cb)(void); typedef void (*spdk_sighandler_t)(int); diff --git a/lib/event/reactor.c b/lib/event/reactor.c index 481d9fa691..4cbff1cda9 100644 --- a/lib/event/reactor.c +++ b/lib/event/reactor.c @@ -76,7 +76,7 @@ struct spdk_reactor { * of the ring, executes it, then puts it back at the tail of * the ring. */ - struct rte_ring *active_pollers; + TAILQ_HEAD(, spdk_poller) active_pollers; struct rte_ring *events; }; @@ -253,8 +253,7 @@ static int _spdk_reactor_run(void *arg) { struct spdk_reactor *reactor = arg; - struct spdk_poller *poller = NULL; - int rc; + struct spdk_poller *poller; set_reactor_thread_name(); SPDK_NOTICELOG("waiting for work item to arrive...\n"); @@ -264,14 +263,11 @@ _spdk_reactor_run(void *arg) rte_timer_manage(); - if (rte_ring_dequeue(reactor->active_pollers, (void **)&poller) == 0) { + poller = TAILQ_FIRST(&reactor->active_pollers); + if (poller) { + TAILQ_REMOVE(&reactor->active_pollers, poller, tailq); poller->fn(poller->arg); - rc = rte_ring_enqueue(reactor->active_pollers, - (void *)poller); - if (rc != 0) { - SPDK_ERRLOG("poller could not be enqueued\n"); - exit(EXIT_FAILURE); - } + TAILQ_INSERT_TAIL(&reactor->active_pollers, poller, tailq); } if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) { @@ -289,10 +285,7 @@ spdk_reactor_construct(struct spdk_reactor *reactor, uint32_t lcore) reactor->lcore = lcore; - snprintf(ring_name, sizeof(ring_name), "spdk_active_pollers_%d", lcore); - reactor->active_pollers = - rte_ring_create(ring_name, SPDK_POLLER_RING_SIZE, rte_lcore_to_socket_id(lcore), - RING_F_SP_ENQ | RING_F_SC_DEQ); + TAILQ_INIT(&reactor->active_pollers); snprintf(ring_name, sizeof(ring_name) - 1, "spdk_event_queue_%u", lcore); reactor->events = @@ -530,7 +523,7 @@ _spdk_event_add_poller(spdk_event_t event) poller->lcore = reactor->lcore; - rte_ring_enqueue(reactor->active_pollers, (void *)poller); + TAILQ_INSERT_TAIL(&reactor->active_pollers, poller, tailq); if (next) { spdk_event_call(next); @@ -555,22 +548,8 @@ _spdk_event_remove_poller(spdk_event_t event) struct spdk_reactor *reactor = spdk_event_get_arg1(event); struct spdk_poller *poller = spdk_event_get_arg2(event); struct spdk_event *next = spdk_event_get_next(event); - struct spdk_poller *tmp = NULL; - uint32_t i; - int rc; - /* Loop over all pollers, without breaking early, so that - * the list of pollers stays in the same order. */ - for (i = 0; i < rte_ring_count(reactor->active_pollers); i++) { - rte_ring_dequeue(reactor->active_pollers, (void **)&tmp); - if (tmp != poller) { - rc = rte_ring_enqueue(reactor->active_pollers, (void *)tmp); - if (rc != 0) { - SPDK_ERRLOG("poller could not be enqueued\n"); - exit(EXIT_FAILURE); - } - } - } + TAILQ_REMOVE(&reactor->active_pollers, poller, tailq); if (next) { spdk_event_call(next);