channel: Make spdk_for_each_channel support async operations

While iterating, allow the user to perform asynchronous
operations. To continue iteration, the user is expected
to call spdk_for_each_channel_continue.

Change-Id: Ifd7d03d5fbf17cf13843704274b036d49ca0484a
Signed-off-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.gerrithub.io/391309
Tested-by: SPDK Automated Test System <sys_sgsw@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Reviewed-by: Daniel Verkamp <daniel.verkamp@intel.com>
Reviewed-by: Dariusz Stojaczyk <dariuszx.stojaczyk@intel.com>
This commit is contained in:
Ben Walker 2017-12-11 15:14:19 -07:00 committed by Jim Harris
parent 250f8e07bc
commit 5165aee686
9 changed files with 197 additions and 141 deletions

View File

@ -22,6 +22,10 @@ framework (include/spdk/event.h) to the I/O channel library
(include/spdk/io_channel.h). This allows code that doesn't depend on the event
framework to request registration and unregistration of pollers.
spdk_for_each_channel() now allows asynchronous operations during iteration.
Instead of immediately continuing the iteration upon returning from the iteration
callback, the user must call spdk_for_each_channel_continue() to resume iteration.
### Block Device Abstraction Layer (bdev)
The poller abstraction was removed from the bdev layer. There is now a general purpose

View File

@ -48,6 +48,7 @@ extern "C" {
struct spdk_thread;
struct spdk_io_channel;
struct spdk_io_channel_iter;
struct spdk_poller;
typedef void (*spdk_thread_fn)(void *ctx);
@ -66,9 +67,8 @@ typedef void (*spdk_io_channel_destroy_cb)(void *io_device, void *ctx_buf);
typedef void (*spdk_io_device_unregister_cb)(void *io_device);
typedef int (*spdk_channel_msg)(void *io_device, struct spdk_io_channel *ch,
void *ctx);
typedef void (*spdk_channel_for_each_cpl)(void *io_device, void *ctx, int status);
typedef void (*spdk_channel_msg)(struct spdk_io_channel_iter *i);
typedef void (*spdk_channel_for_each_cpl)(struct spdk_io_channel_iter *i, int status);
/**
* \brief Initializes the calling thread for I/O channel allocation.
@ -216,7 +216,8 @@ struct spdk_thread *spdk_io_channel_get_thread(struct spdk_io_channel *ch);
* asynchronously, so fn may be called after spdk_for_each_channel returns.
* 'fn' will be called on the correct thread for each channel. 'fn' will be
* called for each channel serially, such that two calls to 'fn' will not
* overlap in time.
* overlap in time. After 'fn' has been called, call
* spdk_for_each_channel_continue() to continue iterating.
*
* Once 'fn' has been called on each channel, 'cpl' will be called
* on the thread that spdk_for_each_channel was initially called from.
@ -224,6 +225,14 @@ struct spdk_thread *spdk_io_channel_get_thread(struct spdk_io_channel *ch);
void spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
spdk_channel_for_each_cpl cpl);
void *spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i);
struct spdk_io_channel *spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i);
void *spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i);
void spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status);
#ifdef __cplusplus
}
#endif

View File

@ -232,25 +232,27 @@ bdev_aio_poll(void *arg)
}
}
static int
_bdev_aio_get_io_inflight(void *io_device, struct spdk_io_channel *ch,
void *ctx)
static void
_bdev_aio_get_io_inflight(struct spdk_io_channel_iter *i)
{
struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
struct bdev_aio_io_channel *aio_ch = spdk_io_channel_get_ctx(ch);
if (aio_ch->io_inflight) {
return -1;
spdk_for_each_channel_continue(i, -1);
return;
}
return 0;
spdk_for_each_channel_continue(i, 0);
}
static void
bdev_aio_reset_retry_timer(void *arg);
static void
_bdev_aio_get_io_inflight_done(void *io_device, void *ctx, int status)
_bdev_aio_get_io_inflight_done(struct spdk_io_channel_iter *i, int status)
{
struct file_disk *fdisk = ctx;
struct file_disk *fdisk = spdk_io_channel_iter_get_ctx(i);
if (status == -1) {
fdisk->reset_retry_timer = spdk_poller_register(bdev_aio_reset_retry_timer, fdisk, 500);

View File

@ -1310,9 +1310,9 @@ spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
}
static void
_spdk_bdev_reset_dev(void *io_device, void *ctx, int status)
_spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status)
{
struct spdk_bdev_channel *ch = ctx;
struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i);
struct spdk_bdev_io *bdev_io;
bdev_io = TAILQ_FIRST(&ch->queued_resets);
@ -1320,13 +1320,14 @@ _spdk_bdev_reset_dev(void *io_device, void *ctx, int status)
spdk_bdev_io_submit_reset(bdev_io);
}
static int
_spdk_bdev_reset_freeze_channel(void *io_device, struct spdk_io_channel *ch,
void *ctx)
static void
_spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i)
{
struct spdk_io_channel *ch;
struct spdk_bdev_channel *channel;
struct spdk_bdev_mgmt_channel *mgmt_channel;
ch = spdk_io_channel_iter_get_channel(i);
channel = spdk_io_channel_get_ctx(ch);
mgmt_channel = spdk_io_channel_get_ctx(channel->mgmt_channel);
@ -1336,7 +1337,7 @@ _spdk_bdev_reset_freeze_channel(void *io_device, struct spdk_io_channel *ch,
_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
return 0;
spdk_for_each_channel_continue(i, 0);
}
static void
@ -1583,9 +1584,9 @@ _spdk_bdev_io_complete(void *ctx)
}
static void
_spdk_bdev_reset_complete(void *io_device, void *ctx, int status)
_spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status)
{
struct spdk_bdev_io *bdev_io = ctx;
struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i);
if (bdev_io->u.reset.ch_ref != NULL) {
spdk_put_io_channel(bdev_io->u.reset.ch_ref);
@ -1595,9 +1596,10 @@ _spdk_bdev_reset_complete(void *io_device, void *ctx, int status)
_spdk_bdev_io_complete(bdev_io);
}
static int
_spdk_bdev_unfreeze_channel(void *io_device, struct spdk_io_channel *_ch, void *ctx)
static void
_spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i)
{
struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
@ -1605,7 +1607,7 @@ _spdk_bdev_unfreeze_channel(void *io_device, struct spdk_io_channel *_ch, void *
_spdk_bdev_channel_start_reset(ch);
}
return 0;
spdk_for_each_channel_continue(i, 0);
}
void

View File

@ -271,35 +271,38 @@ bdev_nvme_flush(struct nvme_bdev *nbdev, struct nvme_bdev_io *bio,
}
static void
_bdev_nvme_reset_done(void *io_device, void *ctx, int status)
_bdev_nvme_reset_done(struct spdk_io_channel_iter *i, int status)
{
void *ctx = spdk_io_channel_iter_get_ctx(i);
int rc = SPDK_BDEV_IO_STATUS_SUCCESS;
if (status) {
rc = SPDK_BDEV_IO_STATUS_FAILED;
}
spdk_bdev_io_complete(spdk_bdev_io_from_ctx(ctx), rc);
}
static int
_bdev_nvme_reset_create_qpair(void *io_device, struct spdk_io_channel *ch,
void *ctx)
static void
_bdev_nvme_reset_create_qpair(struct spdk_io_channel_iter *i)
{
struct spdk_nvme_ctrlr *ctrlr = io_device;
struct nvme_io_channel *nvme_ch = spdk_io_channel_get_ctx(ch);
struct spdk_nvme_ctrlr *ctrlr = spdk_io_channel_iter_get_io_device(i);
struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
struct nvme_io_channel *nvme_ch = spdk_io_channel_get_ctx(_ch);
nvme_ch->qpair = spdk_nvme_ctrlr_alloc_io_qpair(ctrlr, NULL, 0);
if (!nvme_ch->qpair) {
return -1;
spdk_for_each_channel_continue(i, -1);
return;
}
return 0;
spdk_for_each_channel_continue(i, 0);
}
static void
_bdev_nvme_reset(void *io_device, void *ctx, int status)
_bdev_nvme_reset(struct spdk_io_channel_iter *i, int status)
{
struct spdk_nvme_ctrlr *ctrlr = io_device;
struct nvme_bdev_io *bio = ctx;
struct spdk_nvme_ctrlr *ctrlr = spdk_io_channel_iter_get_io_device(i);
struct nvme_bdev_io *bio = spdk_io_channel_iter_get_ctx(i);
int rc;
if (status) {
@ -316,16 +319,16 @@ _bdev_nvme_reset(void *io_device, void *ctx, int status)
/* Recreate all of the I/O queue pairs */
spdk_for_each_channel(ctrlr,
_bdev_nvme_reset_create_qpair,
ctx,
bio,
_bdev_nvme_reset_done);
}
static int
_bdev_nvme_reset_destroy_qpair(void *io_device, struct spdk_io_channel *ch,
void *ctx)
static void
_bdev_nvme_reset_destroy_qpair(struct spdk_io_channel_iter *i)
{
struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
struct nvme_io_channel *nvme_ch = spdk_io_channel_get_ctx(ch);
int rc;
@ -334,7 +337,7 @@ _bdev_nvme_reset_destroy_qpair(void *io_device, struct spdk_io_channel *ch,
nvme_ch->qpair = NULL;
}
return rc;
spdk_for_each_channel_continue(i, rc);
}
static int

View File

@ -216,22 +216,23 @@ struct spdk_nvmf_tgt_listen_ctx {
};
static void
spdk_nvmf_tgt_listen_done(void *io_device, void *c, int status)
spdk_nvmf_tgt_listen_done(struct spdk_io_channel_iter *i, int status)
{
free(c);
void *ctx = spdk_io_channel_iter_get_ctx(i);
free(ctx);
}
static int
spdk_nvmf_tgt_listen_add_transport(void *io_device,
struct spdk_io_channel *ch,
void *c)
static void
spdk_nvmf_tgt_listen_add_transport(struct spdk_io_channel_iter *i)
{
struct spdk_nvmf_tgt_listen_ctx *ctx = c;
struct spdk_nvmf_poll_group *group;
struct spdk_nvmf_tgt_listen_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
struct spdk_nvmf_poll_group *group = spdk_io_channel_get_ctx(ch);
int rc;
group = spdk_io_channel_get_ctx(ch);
return spdk_nvmf_poll_group_add_transport(group, ctx->transport);
rc = spdk_nvmf_poll_group_add_transport(group, ctx->transport);
spdk_for_each_channel_continue(i, rc);
}
int

View File

@ -71,21 +71,20 @@ spdk_nvmf_valid_nqn(const char *nqn)
}
static void
spdk_nvmf_subsystem_create_done(void *io_device, void *ctx, int status)
spdk_nvmf_subsystem_create_done(struct spdk_io_channel_iter *i, int status)
{
}
static int
spdk_nvmf_subsystem_add_to_poll_group(void *io_device,
struct spdk_io_channel *ch,
void *ctx)
static void
spdk_nvmf_subsystem_add_to_poll_group(struct spdk_io_channel_iter *i)
{
struct spdk_nvmf_poll_group *group;
struct spdk_nvmf_subsystem *subsystem = ctx;
struct spdk_nvmf_subsystem *subsystem = spdk_io_channel_iter_get_ctx(i);
struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
struct spdk_nvmf_poll_group *group = spdk_io_channel_get_ctx(ch);
int rc;
group = spdk_io_channel_get_ctx(ch);
return spdk_nvmf_poll_group_add_subsystem(group, subsystem);
rc = spdk_nvmf_poll_group_add_subsystem(group, subsystem);
spdk_for_each_channel_continue(i, rc);
}
struct spdk_nvmf_subsystem *
@ -167,10 +166,10 @@ spdk_nvmf_create_subsystem(struct spdk_nvmf_tgt *tgt,
}
static void
spdk_nvmf_subsystem_delete_done(void *io_device, void *ctx, int status)
spdk_nvmf_subsystem_delete_done(struct spdk_io_channel_iter *i, int status)
{
struct spdk_nvmf_tgt *tgt = io_device;
struct spdk_nvmf_subsystem *subsystem = ctx;
struct spdk_nvmf_tgt *tgt = spdk_io_channel_iter_get_io_device(i);
struct spdk_nvmf_subsystem *subsystem = spdk_io_channel_iter_get_ctx(i);
struct spdk_nvmf_ns *ns;
for (ns = spdk_nvmf_subsystem_get_first_ns(subsystem); ns != NULL;
@ -189,17 +188,17 @@ spdk_nvmf_subsystem_delete_done(void *io_device, void *ctx, int status)
free(subsystem);
}
static int
spdk_nvmf_subsystem_remove_from_poll_group(void *io_device,
struct spdk_io_channel *ch,
void *ctx)
static void
spdk_nvmf_subsystem_remove_from_poll_group(struct spdk_io_channel_iter *i)
{
struct spdk_nvmf_poll_group *group;
struct spdk_nvmf_subsystem *subsystem = ctx;
struct spdk_nvmf_subsystem *subsystem = spdk_io_channel_iter_get_ctx(i);
struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
struct spdk_nvmf_poll_group *group = spdk_io_channel_get_ctx(ch);
int rc;
group = spdk_io_channel_get_ctx(ch);
rc = spdk_nvmf_poll_group_remove_subsystem(group, subsystem);
return spdk_nvmf_poll_group_remove_subsystem(group, subsystem);
spdk_for_each_channel_continue(i, rc);
}
void
@ -433,22 +432,24 @@ struct spdk_nvmf_subsystem_add_ns_ctx {
};
static void
spdk_nvmf_subsystem_add_ns_done(void *io_device, void *ctx, int status)
spdk_nvmf_subsystem_add_ns_done(struct spdk_io_channel_iter *i, int status)
{
void *ctx = spdk_io_channel_iter_get_ctx(i);
free(ctx);
}
static int
spdk_nvmf_subsystem_ns_update_poll_group(void *io_device,
struct spdk_io_channel *ch,
void *c)
static void
spdk_nvmf_subsystem_ns_update_poll_group(struct spdk_io_channel_iter *i)
{
struct spdk_nvmf_poll_group *group;
struct spdk_nvmf_subsystem_add_ns_ctx *ctx = c;
struct spdk_nvmf_subsystem_add_ns_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
struct spdk_nvmf_poll_group *group = spdk_io_channel_get_ctx(ch);
int rc;
group = spdk_io_channel_get_ctx(ch);
rc = spdk_nvmf_poll_group_add_ns(group, ctx->subsystem, ctx->ns);
return spdk_nvmf_poll_group_add_ns(group, ctx->subsystem, ctx->ns);
spdk_for_each_channel_continue(i, rc);
}
uint32_t

View File

@ -514,12 +514,13 @@ spdk_io_channel_get_thread(struct spdk_io_channel *ch)
return ch->thread;
}
struct call_channel {
struct spdk_io_channel_iter {
void *io_device;
struct io_device *dev;
spdk_channel_msg fn;
int status;
void *ctx;
struct spdk_io_channel *ch;
struct spdk_thread *cur_thread;
@ -527,64 +528,59 @@ struct call_channel {
spdk_channel_for_each_cpl cpl;
};
/* Accessor: return the io_device pointer the iteration was started on. */
void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
return i->io_device;
}
/* Accessor: return the channel currently being visited by the iteration. */
struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
return i->ch;
}
/* Accessor: return the user context passed to spdk_for_each_channel(). */
void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
return i->ctx;
}
static void
_call_completion(void *ctx)
{
struct call_channel *ch_ctx = ctx;
struct spdk_io_channel_iter *i = ctx;
if (ch_ctx->cpl != NULL) {
ch_ctx->cpl(ch_ctx->io_device, ch_ctx->ctx, ch_ctx->status);
if (i->cpl != NULL) {
i->cpl(i, i->status);
}
free(ch_ctx);
free(i);
}
static void
_call_channel(void *ctx)
{
struct call_channel *ch_ctx = ctx;
struct spdk_thread *thread;
struct spdk_io_channel_iter *i = ctx;
struct spdk_io_channel *ch;
thread = ch_ctx->cur_thread;
pthread_mutex_lock(&g_devlist_mutex);
TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
if (ch->dev->io_device == ch_ctx->io_device) {
break;
}
}
pthread_mutex_unlock(&g_devlist_mutex);
/*
* It is possible that the channel was deleted before this
* message had a chance to execute. If so, skip calling
* the fn() on this thread.
*/
if (ch != NULL) {
ch_ctx->status = ch_ctx->fn(ch_ctx->io_device, ch, ch_ctx->ctx);
}
pthread_mutex_lock(&g_devlist_mutex);
if (ch_ctx->status) {
goto end;
}
thread = TAILQ_NEXT(thread, tailq);
while (thread) {
TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
if (ch->dev->io_device == ch_ctx->io_device) {
ch_ctx->cur_thread = thread;
pthread_mutex_unlock(&g_devlist_mutex);
spdk_thread_send_msg(thread, _call_channel, ch_ctx);
return;
}
TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
if (ch->dev->io_device == i->io_device) {
break;
}
thread = TAILQ_NEXT(thread, tailq);
}
end:
ch_ctx->dev->for_each_count--;
pthread_mutex_unlock(&g_devlist_mutex);
spdk_thread_send_msg(ch_ctx->orig_thread, _call_completion, ch_ctx);
if (ch) {
i->fn(i);
} else {
spdk_for_each_channel_continue(i, 0);
}
}
void
@ -593,38 +589,75 @@ spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
{
struct spdk_thread *thread;
struct spdk_io_channel *ch;
struct call_channel *ch_ctx;
struct spdk_io_channel_iter *i;
ch_ctx = calloc(1, sizeof(*ch_ctx));
if (!ch_ctx) {
SPDK_ERRLOG("Unable to allocate context\n");
i = calloc(1, sizeof(*i));
if (!i) {
SPDK_ERRLOG("Unable to allocate iterator\n");
return;
}
ch_ctx->io_device = io_device;
ch_ctx->fn = fn;
ch_ctx->ctx = ctx;
ch_ctx->cpl = cpl;
i->io_device = io_device;
i->fn = fn;
i->ctx = ctx;
i->cpl = cpl;
pthread_mutex_lock(&g_devlist_mutex);
ch_ctx->orig_thread = _get_thread();
i->orig_thread = _get_thread();
TAILQ_FOREACH(thread, &g_threads, tailq) {
TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
if (ch->dev->io_device == io_device) {
ch->dev->for_each_count++;
ch_ctx->dev = ch->dev;
ch_ctx->cur_thread = thread;
i->dev = ch->dev;
i->cur_thread = thread;
i->ch = ch;
pthread_mutex_unlock(&g_devlist_mutex);
spdk_thread_send_msg(thread, _call_channel, ch_ctx);
spdk_thread_send_msg(thread, _call_channel, i);
return;
}
}
}
free(ch_ctx);
pthread_mutex_unlock(&g_devlist_mutex);
cpl(io_device, ctx, 0);
cpl(i, 0);
free(i);
}
/*
 * Resume a channel iteration previously started with spdk_for_each_channel().
 * Must be called from the thread that owns the channel currently being
 * visited (asserted below). A non-zero status aborts the iteration; the
 * final status is delivered to the completion callback on the original
 * thread.
 */
void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
struct spdk_thread *thread;
struct spdk_io_channel *ch;
/* Only the thread currently being iterated may continue the iteration. */
assert(i->cur_thread == spdk_get_thread());
/* Record the per-channel status; non-zero short-circuits to completion. */
i->status = status;
pthread_mutex_lock(&g_devlist_mutex);
if (status) {
goto end;
}
/* Scan subsequent threads for the next channel of this io_device. */
thread = TAILQ_NEXT(i->cur_thread, tailq);
while (thread) {
TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
if (ch->dev->io_device == i->io_device) {
i->cur_thread = thread;
i->ch = ch;
pthread_mutex_unlock(&g_devlist_mutex);
/* Hop to that thread and invoke the user's fn there. */
spdk_thread_send_msg(thread, _call_channel, i);
return;
}
}
thread = TAILQ_NEXT(thread, tailq);
}
end:
/* No more channels (or error): finish on the originating thread. */
i->dev->for_each_count--;
i->ch = NULL;
pthread_mutex_unlock(&g_devlist_mutex);
spdk_thread_send_msg(i->orig_thread, _call_completion, i);
}

View File

@ -146,18 +146,19 @@ channel_destroy(void *io_device, void *ctx_buf)
{
}
static int
channel_msg(void *io_device, struct spdk_io_channel *ch, void *ctx)
static void
channel_msg(struct spdk_io_channel_iter *i)
{
struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
int *count = spdk_io_channel_get_ctx(ch);
(*count)++;
return 0;
spdk_for_each_channel_continue(i, 0);
}
static void
channel_cpl(void *io_device, void *ctx, int status)
channel_cpl(struct spdk_io_channel_iter *i, int status)
{
}
@ -215,20 +216,20 @@ struct unreg_ctx {
bool foreach_done;
};
static int
unreg_ch_done(void *io_device, struct spdk_io_channel *_ch, void *_ctx)
static void
unreg_ch_done(struct spdk_io_channel_iter *i)
{
struct unreg_ctx *ctx = _ctx;
struct unreg_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
ctx->ch_done = true;
return 0;
spdk_for_each_channel_continue(i, 0);
}
static void
unreg_foreach_done(void *io_device, void *_ctx, int status)
unreg_foreach_done(struct spdk_io_channel_iter *i, int status)
{
struct unreg_ctx *ctx = _ctx;
struct unreg_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
ctx->foreach_done = true;
}