event: hide struct spdk_poller internals

This leaves more flexibility for future changes to the poller
representation without requiring API changes (after this one).

It also prevents the user from accidentally using poller fields in a
non-thread-safe way, since they can't be accessed directly anymore.

Change-Id: I7677d5b93668665d29ae39c5e0ba74333ad3f878
Signed-off-by: Daniel Verkamp <daniel.verkamp@intel.com>
This commit is contained in:
Daniel Verkamp 2016-08-11 16:00:45 -07:00
parent 5c9d560b5a
commit 5d8c94536a
14 changed files with 110 additions and 121 deletions

View File

@ -142,8 +142,11 @@ struct spdk_bdev {
/** Whether the poller is registered with the reactor */
bool is_running;
/** Which lcore the poller is running on */
uint32_t lcore;
/** Poller to submit IO and check completion */
struct spdk_poller poller;
struct spdk_poller *poller;
/** True if another blockdev or a LUN is using this device */
bool claimed;

View File

@ -102,14 +102,7 @@ typedef void (*spdk_poller_fn)(void *arg);
/**
* \brief A poller is a function that is repeatedly called on an lcore.
*/
struct spdk_poller {
TAILQ_ENTRY(spdk_poller) tailq;
uint32_t lcore;
uint64_t period_ticks;
uint64_t next_run_tick;
spdk_poller_fn fn;
void *arg;
};
struct spdk_poller;
typedef void (*spdk_app_shutdown_cb)(void);
typedef void (*spdk_sighandler_t)(int);
@ -225,7 +218,9 @@ void spdk_event_queue_run_all(uint32_t lcore);
/**
* \brief Register a poller on the given lcore.
*/
void spdk_poller_register(struct spdk_poller *poller,
void spdk_poller_register(struct spdk_poller **ppoller,
spdk_poller_fn fn,
void *arg,
uint32_t lcore,
struct spdk_event *complete,
uint64_t period_microseconds);
@ -233,15 +228,9 @@ void spdk_poller_register(struct spdk_poller *poller,
/**
* \brief Unregister a poller on the given lcore.
*/
void spdk_poller_unregister(struct spdk_poller *poller,
void spdk_poller_unregister(struct spdk_poller **ppoller,
struct spdk_event *complete);
/**
* \brief Move a poller from its current lcore to a new lcore.
*/
void spdk_poller_migrate(struct spdk_poller *poller, int new_lcore,
struct spdk_event *complete);
struct spdk_subsystem {
const char *name;
int (*init)(void);

View File

@ -440,7 +440,7 @@ spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
{
struct spdk_bdev *bdev = bdev_io->bdev;
struct spdk_event *event, *cb_event = NULL;
uint32_t lcore = bdev->poller.lcore;
uint32_t lcore = bdev->lcore;
/* start the poller when first IO comes */
if (!bdev->is_running) {
@ -448,7 +448,8 @@ spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
if (lcore == 0) {
lcore = rte_lcore_id();
}
spdk_poller_register(&bdev->poller, lcore, NULL, 0);
bdev->lcore = lcore;
spdk_poller_register(&bdev->poller, spdk_bdev_do_work, bdev, lcore, NULL, 0);
}
if (bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING) {
@ -797,8 +798,6 @@ spdk_bdev_register(struct spdk_bdev *bdev)
/* initialize the reset generation value to zero */
bdev->gencnt = 0;
bdev->is_running = false;
bdev->poller.fn = spdk_bdev_do_work;
bdev->poller.arg = bdev;
SPDK_TRACELOG(SPDK_TRACE_DEBUG, "Inserting bdev %s into list\n", bdev->name);
TAILQ_INSERT_TAIL(&spdk_bdev_list, bdev, link);

View File

@ -59,6 +59,15 @@
#define SPDK_MAX_SOCKET 64
struct spdk_poller {
TAILQ_ENTRY(spdk_poller) tailq;
uint32_t lcore;
uint64_t period_ticks;
uint64_t next_run_tick;
spdk_poller_fn fn;
void *arg;
};
enum spdk_reactor_state {
SPDK_REACTOR_STATE_INVALID = 0,
SPDK_REACTOR_STATE_INITIALIZED = 1,
@ -569,8 +578,6 @@ _spdk_event_add_poller(spdk_event_t event)
struct spdk_poller *poller = spdk_event_get_arg2(event);
struct spdk_event *next = spdk_event_get_next(event);
poller->lcore = reactor->lcore;
if (poller->period_ticks) {
spdk_poller_insert_timer(reactor, poller, rte_get_timer_cycles());
} else {
@ -582,36 +589,47 @@ _spdk_event_add_poller(spdk_event_t event)
}
}
static void
_spdk_poller_register(struct spdk_poller *poller, uint32_t lcore,
struct spdk_event *complete)
void
spdk_poller_register(struct spdk_poller **ppoller, spdk_poller_fn fn, void *arg,
uint32_t lcore, struct spdk_event *complete, uint64_t period_microseconds)
{
struct spdk_poller *poller;
struct spdk_reactor *reactor;
struct spdk_event *event;
reactor = spdk_reactor_get(lcore);
event = spdk_event_allocate(lcore, _spdk_event_add_poller, reactor, poller, complete);
spdk_event_call(event);
}
poller = calloc(1, sizeof(*poller));
if (poller == NULL) {
SPDK_ERRLOG("Poller memory allocation failed\n");
abort();
}
poller->lcore = lcore;
poller->fn = fn;
poller->arg = arg;
void
spdk_poller_register(struct spdk_poller *poller,
uint32_t lcore, struct spdk_event *complete, uint64_t period_microseconds)
{
if (period_microseconds) {
poller->period_ticks = (rte_get_timer_hz() * period_microseconds) / 1000000ULL;
} else {
poller->period_ticks = 0;
}
_spdk_poller_register(poller, lcore, complete);
if (*ppoller != NULL) {
SPDK_ERRLOG("Attempted reuse of poller pointer\n");
abort();
}
*ppoller = poller;
reactor = spdk_reactor_get(lcore);
event = spdk_event_allocate(lcore, _spdk_event_add_poller, reactor, poller, complete);
spdk_event_call(event);
}
static void
_spdk_event_remove_poller(spdk_event_t event)
{
struct spdk_reactor *reactor = spdk_event_get_arg1(event);
struct spdk_poller *poller = spdk_event_get_arg2(event);
struct spdk_poller *poller = spdk_event_get_arg1(event);
struct spdk_reactor *reactor = spdk_reactor_get(poller->lcore);
struct spdk_event *next = spdk_event_get_next(event);
if (poller->period_ticks) {
@ -620,47 +638,30 @@ _spdk_event_remove_poller(spdk_event_t event)
TAILQ_REMOVE(&reactor->active_pollers, poller, tailq);
}
free(poller);
if (next) {
spdk_event_call(next);
}
}
void
spdk_poller_unregister(struct spdk_poller *poller,
spdk_poller_unregister(struct spdk_poller **ppoller,
struct spdk_event *complete)
{
struct spdk_reactor *reactor;
struct spdk_event *event;
struct spdk_poller *poller;
reactor = spdk_reactor_get(poller->lcore);
event = spdk_event_allocate(poller->lcore, _spdk_event_remove_poller, reactor, poller, complete);
poller = *ppoller;
spdk_event_call(event);
}
static void
_spdk_poller_migrate(spdk_event_t event)
{
struct spdk_poller *poller = spdk_event_get_arg1(event);
struct spdk_event *next = spdk_event_get_next(event);
/* Register the poller on the current lcore. This works
* because we already set this event up so that it is called
* on the new_lcore.
*/
_spdk_poller_register(poller, rte_lcore_id(), next);
}
void
spdk_poller_migrate(struct spdk_poller *poller, int new_lcore,
struct spdk_event *complete)
{
struct spdk_event *event;
RTE_VERIFY(spdk_app_get_core_mask() & (1ULL << new_lcore));
RTE_VERIFY(poller != NULL);
event = spdk_event_allocate(new_lcore, _spdk_poller_migrate, poller, NULL, complete);
spdk_poller_unregister(poller, event);
*ppoller = NULL;
if (poller == NULL) {
if (complete) {
spdk_event_call(complete);
}
return;
}
spdk_event_call(spdk_event_allocate(poller->lcore, _spdk_event_remove_poller, poller, NULL,
complete));
}

View File

@ -47,7 +47,7 @@
#define ACCEPT_TIMEOUT_US 1000 /* 1ms */
static struct spdk_poller g_acceptor_poller;
static struct spdk_poller *g_acceptor_poller;
static void
spdk_iscsi_portal_accept(struct spdk_iscsi_portal *portal)
@ -94,9 +94,8 @@ spdk_acceptor(void *arg)
void
spdk_iscsi_acceptor_start(void)
{
g_acceptor_poller.fn = spdk_acceptor;
g_acceptor_poller.arg = &g_spdk_iscsi;
spdk_poller_register(&g_acceptor_poller, spdk_app_get_current_core(), NULL, ACCEPT_TIMEOUT_US);
spdk_poller_register(&g_acceptor_poller, spdk_acceptor, &g_spdk_iscsi, spdk_app_get_current_core(),
NULL, ACCEPT_TIMEOUT_US);
}
void

View File

@ -82,7 +82,7 @@ static void __add_idle_conn(spdk_event_t event);
/** Global variables used for managing idle connections. */
static int g_epoll_fd = 0;
static struct spdk_poller g_idle_conn_poller;
static struct spdk_poller *g_idle_conn_poller;
static STAILQ_HEAD(idle_list, spdk_iscsi_conn) g_idle_conn_list_head;
void spdk_iscsi_conn_login_do_work(void *arg);
@ -276,9 +276,8 @@ int spdk_initialize_iscsi_conns(void)
return -1;
}
g_idle_conn_poller.fn = spdk_iscsi_conn_idle_do_work;
g_idle_conn_poller.arg = NULL;
spdk_poller_register(&g_idle_conn_poller, rte_get_master_lcore(), NULL, 0);
spdk_poller_register(&g_idle_conn_poller, spdk_iscsi_conn_idle_do_work, NULL,
rte_get_master_lcore(), NULL, 0);
return 0;
}
@ -388,8 +387,6 @@ error_return:
rte_timer_init(&conn->logout_timer);
rte_timer_init(&conn->shutdown_timer);
SPDK_NOTICELOG("Launching connection on acceptor thread\n");
conn->poller.fn = spdk_iscsi_conn_login_do_work;
conn->poller.arg = conn;
conn->last_activity_tsc = rte_get_timer_cycles();
conn->pending_task_cnt = 0;
conn->pending_activate_event = false;
@ -401,7 +398,8 @@ error_return:
*/
spdk_net_framework_clear_socket_association(conn->sock);
rte_atomic32_inc(&g_num_connections[spdk_app_get_current_core()]);
spdk_poller_register(&conn->poller, spdk_app_get_current_core(), NULL, 0);
spdk_poller_register(&conn->poller, spdk_iscsi_conn_login_do_work, conn,
spdk_app_get_current_core(), NULL, 0);
return 0;
}
@ -639,7 +637,8 @@ void spdk_shutdown_iscsi_conns(void)
*/
STAILQ_FOREACH_SAFE(conn, &g_idle_conn_list_head, link, tmp) {
STAILQ_REMOVE(&g_idle_conn_list_head, conn, spdk_iscsi_conn, link);
spdk_poller_register(&conn->poller, rte_get_master_lcore(), NULL, 0);
spdk_poller_register(&conn->poller, spdk_iscsi_conn_full_feature_do_work, conn,
rte_get_master_lcore(), NULL, 0);
conn->is_idle = 0;
del_idle_conn(conn);
}
@ -1182,12 +1181,23 @@ conn_exit:
return 0;
}
static void
spdk_iscsi_conn_full_feature_migrate(struct spdk_event *event)
{
struct spdk_iscsi_conn *conn = spdk_event_get_arg1(event);
/* The poller has been unregistered, so now we can re-register it on the new core. */
spdk_poller_register(&conn->poller, spdk_iscsi_conn_full_feature_do_work, conn,
spdk_app_get_current_core(), NULL, 0);
}
void
spdk_iscsi_conn_login_do_work(void *arg)
{
struct spdk_iscsi_conn *conn = arg;
int lcore;
int rc;
struct spdk_event *event;
/* General connection processing */
rc = spdk_iscsi_conn_execute(conn);
@ -1199,11 +1209,11 @@ spdk_iscsi_conn_login_do_work(void *arg)
*/
if (conn->login_phase == ISCSI_FULL_FEATURE_PHASE) {
lcore = spdk_iscsi_conn_allocate_reactor(conn->portal->cpumask);
conn->poller.fn = spdk_iscsi_conn_full_feature_do_work;
event = spdk_event_allocate(lcore, spdk_iscsi_conn_full_feature_migrate, conn, NULL, NULL);
rte_atomic32_dec(&g_num_connections[spdk_app_get_current_core()]);
rte_atomic32_inc(&g_num_connections[lcore]);
spdk_net_framework_clear_socket_association(conn->sock);
spdk_poller_migrate(&conn->poller, lcore, NULL);
spdk_poller_unregister(&conn->poller, event);
}
}
@ -1278,7 +1288,7 @@ void spdk_iscsi_conn_idle_do_work(void *arg)
lcore = spdk_iscsi_conn_allocate_reactor(tconn->portal->cpumask);
rte_atomic32_inc(&g_num_connections[lcore]);
spdk_net_framework_clear_socket_association(tconn->sock);
spdk_poller_register(&tconn->poller, lcore, NULL, 0);
spdk_poller_register(&tconn->poller, spdk_iscsi_conn_full_feature_do_work, tconn, lcore, NULL, 0);
SPDK_TRACELOG(SPDK_TRACE_DEBUG, "add conn id = %d, cid = %d poller = %p to lcore = %d active\n",
tconn->id, tconn->cid, &tconn->poller, lcore);
}
@ -1298,14 +1308,15 @@ __add_idle_conn(spdk_event_t e)
* process.
*/
if (conn->state == ISCSI_CONN_STATE_EXITING) {
spdk_poller_register(&conn->poller, rte_get_master_lcore(), NULL, 0);
spdk_poller_register(&conn->poller, spdk_iscsi_conn_full_feature_do_work, conn,
rte_get_master_lcore(), NULL, 0);
return;
}
rc = add_idle_conn(conn);
if (rc == 0) {
SPDK_TRACELOG(SPDK_TRACE_DEBUG, "add conn id = %d, cid = %d poller = %p to lcore = %d idle\n",
conn->id, conn->cid, &conn->poller, conn->poller.lcore);
SPDK_TRACELOG(SPDK_TRACE_DEBUG, "add conn id = %d, cid = %d poller = %p to idle\n",
conn->id, conn->cid, conn->poller);
conn->is_idle = 1;
STAILQ_INSERT_TAIL(&g_idle_conn_list_head, conn, link);
} else {

View File

@ -157,7 +157,7 @@ struct spdk_iscsi_conn {
char *partial_text_parameter;
STAILQ_ENTRY(spdk_iscsi_conn) link;
struct spdk_poller poller;
struct spdk_poller *poller;
TAILQ_HEAD(queued_r2t_tasks, spdk_iscsi_task) queued_r2t_tasks;
TAILQ_HEAD(active_r2t_tasks, spdk_iscsi_task) active_r2t_tasks;
TAILQ_HEAD(queued_datain_tasks, spdk_iscsi_task) queued_datain_tasks;

View File

@ -908,9 +908,6 @@ spdk_rpc_get_iscsi_connections(struct spdk_jsonrpc_server_conn *conn,
spdk_json_write_name(w, "is_idle");
spdk_json_write_int32(w, c->is_idle);
spdk_json_write_name(w, "lcore_id");
spdk_json_write_int32(w, c->poller.lcore);
spdk_json_write_name(w, "initiator_addr");
spdk_json_write_string(w, c->initiator_addr);

View File

@ -140,7 +140,7 @@ struct spdk_nvmf_rdma_session {
};
struct spdk_nvmf_rdma {
struct spdk_poller acceptor_poller;
struct spdk_poller *acceptor_poller;
struct rdma_event_channel *acceptor_event_channel;
struct rdma_cm_id *acceptor_listen_id;
@ -769,7 +769,7 @@ nvmf_rdma_disconnect(struct rdma_cm_event *evt)
}
/* Pass an event to the core that owns this connection */
event = spdk_event_allocate(session->subsys->poller.lcore,
event = spdk_event_allocate(session->subsys->lcore,
spdk_nvmf_handle_disconnect,
session, conn, NULL);
spdk_event_call(event);
@ -1018,9 +1018,8 @@ spdk_nvmf_rdma_acceptor_start(void)
sin_port = ntohs(rdma_get_src_port(g_rdma.acceptor_listen_id));
SPDK_NOTICELOG("*** NVMf Target Listening on port %d ***\n", sin_port);
g_rdma.acceptor_poller.fn = nvmf_rdma_accept;
g_rdma.acceptor_poller.arg = NULL;
spdk_poller_register(&g_rdma.acceptor_poller, rte_lcore_id(), NULL, ACCEPT_TIMEOUT_US);
spdk_poller_register(&g_rdma.acceptor_poller, nvmf_rdma_accept, NULL, rte_lcore_id(), NULL,
ACCEPT_TIMEOUT_US);
return rc;
listen_error:

View File

@ -202,7 +202,7 @@ nvmf_process_connect(struct spdk_nvmf_request *req)
}
/* Pass an event to the lcore that owns this subsystem */
event = spdk_event_allocate(subsystem->poller.lcore, nvmf_handle_connect, req, NULL, NULL);
event = spdk_event_allocate(subsystem->lcore, nvmf_handle_connect, req, NULL, NULL);
spdk_event_call(event);
return SPDK_NVMF_REQUEST_EXEC_STATUS_ASYNCHRONOUS;

View File

@ -112,9 +112,8 @@ nvmf_create_subsystem(int num, const char *name,
TAILQ_INIT(&subsystem->listen_addrs);
TAILQ_INIT(&subsystem->hosts);
subsystem->poller.fn = spdk_nvmf_subsystem_poller;
subsystem->poller.arg = subsystem;
spdk_poller_register(&subsystem->poller, lcore, NULL, 0);
subsystem->lcore = lcore;
spdk_poller_register(&subsystem->poller, spdk_nvmf_subsystem_poller, subsystem, lcore, NULL, 0);
TAILQ_INSERT_HEAD(&g_subsystems, subsystem, entries);

View File

@ -119,13 +119,14 @@ struct spdk_nvmf_controller {
*/
struct spdk_nvmf_subsystem {
uint16_t num;
uint32_t lcore;
char subnqn[MAX_NQN_SIZE];
enum spdk_nvmf_subsystem_mode mode;
enum spdk_nvmf_subtype subtype;
struct nvmf_session *session;
struct spdk_nvmf_controller ctrlr;
struct spdk_poller poller;
struct spdk_poller *poller;
TAILQ_HEAD(, spdk_nvmf_listen_addr) listen_addrs;
uint32_t num_listen_addrs;

View File

@ -39,10 +39,10 @@
#include "spdk/event.h"
static int g_time_in_sec;
static struct spdk_poller test_end_poller;
static struct spdk_poller poller_100ms;
static struct spdk_poller poller_250ms;
static struct spdk_poller poller_500ms;
static struct spdk_poller *test_end_poller;
static struct spdk_poller *poller_100ms;
static struct spdk_poller *poller_250ms;
static struct spdk_poller *poller_500ms;
static void
test_end(void *arg)
@ -65,20 +65,11 @@ test_start(spdk_event_t evt)
printf("test_start\n");
/* Register a poller that will stop the test after the time has elapsed. */
test_end_poller.fn = test_end;
spdk_poller_register(&test_end_poller, 0, NULL, g_time_in_sec * 1000000ULL);
spdk_poller_register(&test_end_poller, test_end, NULL, 0, NULL, g_time_in_sec * 1000000ULL);
poller_100ms.fn = tick;
poller_100ms.arg = (void *)100;
spdk_poller_register(&poller_100ms, 0, NULL, 100000);
poller_250ms.fn = tick;
poller_250ms.arg = (void *)250;
spdk_poller_register(&poller_250ms, 0, NULL, 250000);
poller_500ms.fn = tick;
poller_500ms.arg = (void *)500;
spdk_poller_register(&poller_500ms, 0, NULL, 500000);
spdk_poller_register(&poller_100ms, tick, (void *)100, 0, NULL, 100000);
spdk_poller_register(&poller_250ms, tick, (void *)250, 0, NULL, 250000);
spdk_poller_register(&poller_500ms, tick, (void *)500, 0, NULL, 500000);
}
static void

View File

@ -48,8 +48,8 @@ SPDK_LOG_REGISTER_TRACE_FLAG("nvmf", SPDK_TRACE_NVMF)
struct spdk_nvmf_globals g_nvmf_tgt;
void
spdk_poller_register(struct spdk_poller *poller, uint32_t lcore, struct spdk_event *complete,
uint64_t period_microseconds)
spdk_poller_register(struct spdk_poller **ppoller, spdk_poller_fn fn, void *arg, uint32_t lcore,
struct spdk_event *complete, uint64_t period_microseconds)
{
}