bdev: Move pending queues to mgmt channel

Instead of an array of pending-buffer queues per core, allocate
the queues in each management channel's per-thread context.

Change-Id: I4ace5bd13362a549a45aba62e978dabbbe1a333d
Signed-off-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.gerrithub.io/362617
Tested-by: SPDK Automated Test System <sys_sgsw@intel.com>
Reviewed-by: Daniel Verkamp <daniel.verkamp@intel.com>
Reviewed-by: John Meneghini <johnm@netapp.com>
Author:    Ben Walker <benjamin.walker@intel.com>
Date:      2017-05-10 14:42:45 -07:00
Committer: Daniel Verkamp
parent     49977bc28e
commit     40217741c0
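In short: the need_buf_small/need_buf_large queues of I/O waiting for a data
buffer stop being global arrays indexed by rte_lcore_id() and become members
of the per-thread management channel context, so their lifetime follows the
channel instead of a compile-time core count. A standalone sketch of the
before/after layout (plain C over sys/queue.h; the RTE_MAX_LCORE value and
the struct names here are illustrative, not SPDK's definitions):

#include <sys/queue.h>

struct spdk_bdev_io;                          /* forward declaration only */
typedef TAILQ_HEAD(, spdk_bdev_io) need_buf_tailq_t;

#define RTE_MAX_LCORE 128                     /* illustrative value */

/* Before: one queue pair per possible core, embedded in the global manager
 * and selected at runtime with rte_lcore_id(). */
struct bdev_mgr_queues_before {
	need_buf_tailq_t need_buf_small[RTE_MAX_LCORE];
	need_buf_tailq_t need_buf_large[RTE_MAX_LCORE];
};

/* After: one queue pair per management channel, allocated as part of the
 * channel's context and initialized when the channel is created. */
struct bdev_mgmt_channel_after {
	need_buf_tailq_t need_buf_small;
	need_buf_tailq_t need_buf_large;
};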


@@ -36,8 +36,6 @@
 #include "spdk/bdev.h"
 
-#include <rte_config.h>
-#include <rte_lcore.h>
 
 #include "spdk/env.h"
 #include "spdk/io_channel.h"
 #include "spdk/likely.h"
@@ -61,9 +59,6 @@ struct spdk_bdev_mgr {
 	struct spdk_mempool *buf_small_pool;
 	struct spdk_mempool *buf_large_pool;
 
-	need_buf_tailq_t need_buf_small[RTE_MAX_LCORE];
-	need_buf_tailq_t need_buf_large[RTE_MAX_LCORE];
-
 	TAILQ_HEAD(, spdk_bdev_module_if) bdev_modules;
 	TAILQ_HEAD(, spdk_bdev_module_if) vbdev_modules;
 
@@ -80,6 +75,8 @@ static struct spdk_bdev_module_if *g_next_bdev_module;
 static struct spdk_bdev_module_if *g_next_vbdev_module;
 
 struct spdk_bdev_mgmt_channel {
+	need_buf_tailq_t need_buf_small;
+	need_buf_tailq_t need_buf_large;
 };
 
 struct spdk_bdev_channel {
@@ -154,18 +151,21 @@ spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
 	void *buf;
 	need_buf_tailq_t *tailq;
 	uint64_t length;
+	struct spdk_bdev_mgmt_channel *ch;
 
 	assert(bdev_io->u.read.iovcnt == 1);
 
 	length = bdev_io->u.read.len;
 	buf = bdev_io->buf;
+
+	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);
 
 	if (length <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
 		pool = g_bdev_mgr.buf_small_pool;
-		tailq = &g_bdev_mgr.need_buf_small[rte_lcore_id()];
+		tailq = &ch->need_buf_small;
 	} else {
 		pool = g_bdev_mgr.buf_large_pool;
-		tailq = &g_bdev_mgr.need_buf_large[rte_lcore_id()];
+		tailq = &ch->need_buf_large;
 	}
 
 	if (TAILQ_EMPTY(tailq)) {
@@ -184,6 +184,7 @@ spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb)
 	struct spdk_mempool *pool;
 	need_buf_tailq_t *tailq;
 	void *buf = NULL;
+	struct spdk_bdev_mgmt_channel *ch;
 
 	assert(cb != NULL);
 	assert(bdev_io->u.read.iovs != NULL);
@@ -194,13 +195,15 @@ spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb)
 		return;
 	}
 
+	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);
+
 	bdev_io->get_buf_cb = cb;
 	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
 		pool = g_bdev_mgr.buf_small_pool;
-		tailq = &g_bdev_mgr.need_buf_small[rte_lcore_id()];
+		tailq = &ch->need_buf_small;
 	} else {
 		pool = g_bdev_mgr.buf_large_pool;
-		tailq = &g_bdev_mgr.need_buf_large[rte_lcore_id()];
+		tailq = &ch->need_buf_large;
 	}
 
 	buf = spdk_mempool_get(pool);
@ -253,12 +256,22 @@ spdk_bdev_config_text(FILE *fp)
static int
spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
{
struct spdk_bdev_mgmt_channel *ch = ctx_buf;
TAILQ_INIT(&ch->need_buf_small);
TAILQ_INIT(&ch->need_buf_large);
return 0;
}
static void
spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
struct spdk_bdev_mgmt_channel *ch = ctx_buf;
if (!TAILQ_EMPTY(&ch->need_buf_small) || !TAILQ_EMPTY(&ch->need_buf_large)) {
SPDK_ERRLOG("Pending I/O list wasn't empty on channel destruction\n");
}
}
void
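The create/destroy pair above runs once per thread that takes a reference on
the management channel: create initializes the empty tailqs in the context
buffer the channel framework allocates, and destroy flags any I/O still parked
waiting for a buffer. The registration tying these callbacks to the channel
machinery (in the SPDK tree of this vintage, spdk_io_device_register() on
&g_bdev_mgr) predates this commit and is outside the diff. A minimal
standalone sketch of the lifecycle contract, reusing the same
(io_device, ctx_buf) callback shape; the harness and names are hypothetical:

#include <assert.h>
#include <stdio.h>
#include <sys/queue.h>

struct waiter {
	TAILQ_ENTRY(waiter) link;
};
typedef TAILQ_HEAD(, waiter) need_buf_tailq_t;

struct mgmt_channel_ctx {
	need_buf_tailq_t need_buf_small;
	need_buf_tailq_t need_buf_large;
};

/* Mirrors the shape of spdk_bdev_mgmt_channel_create(): ctx_buf is
 * per-channel memory handed to us by the channel framework. */
static int
mgmt_channel_create(void *io_device, void *ctx_buf)
{
	struct mgmt_channel_ctx *ch = ctx_buf;

	(void)io_device;
	TAILQ_INIT(&ch->need_buf_small);
	TAILQ_INIT(&ch->need_buf_large);
	return 0;
}

/* Mirrors spdk_bdev_mgmt_channel_destroy(): tearing down a channel
 * that still has waiters queued indicates a leak in the caller. */
static void
mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct mgmt_channel_ctx *ch = ctx_buf;

	(void)io_device;
	if (!TAILQ_EMPTY(&ch->need_buf_small) || !TAILQ_EMPTY(&ch->need_buf_large)) {
		fprintf(stderr, "pending I/O list not empty on destroy\n");
	}
}

int
main(void)
{
	struct mgmt_channel_ctx ctx;

	assert(mgmt_channel_create(NULL, &ctx) == 0);
	mgmt_channel_destroy(NULL, &ctx); /* both queues empty: no warning */
	return 0;
}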
@@ -310,7 +323,7 @@ spdk_vbdev_module_init_next(int rc)
 static void
 spdk_bdev_initialize(void)
 {
-	int i, cache_size;
+	int cache_size;
 	int rc = 0;
 
 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create("blockdev_io",
@@ -326,11 +339,6 @@ spdk_bdev_initialize(void)
 		goto end;
 	}
 
-	for (i = 0; i < RTE_MAX_LCORE; i++) {
-		TAILQ_INIT(&g_bdev_mgr.need_buf_small[i]);
-		TAILQ_INIT(&g_bdev_mgr.need_buf_large[i]);
-	}
-
 	/**
 	 * Ensure no more than half of the total buffers end up local caches, by
 	 * using spdk_env_get_core_count() to determine how many local caches we need
@@ -572,9 +580,12 @@ static void
 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
 {
 	struct spdk_bdev_channel *ch = ctx_buf;
+	struct spdk_bdev_mgmt_channel *mgmt_channel;
 
-	_spdk_bdev_abort_io(&g_bdev_mgr.need_buf_small[spdk_env_get_current_core()], ch);
-	_spdk_bdev_abort_io(&g_bdev_mgr.need_buf_large[spdk_env_get_current_core()], ch);
+	mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel);
+
+	_spdk_bdev_abort_io(&mgmt_channel->need_buf_small, ch);
+	_spdk_bdev_abort_io(&mgmt_channel->need_buf_large, ch);
 
 	spdk_put_io_channel(ch->channel);
 	spdk_put_io_channel(ch->mgmt_channel);
@@ -902,11 +913,13 @@ _spdk_bdev_reset_abort_channel(void *io_device, struct spdk_io_channel *ch,
 			       void *ctx)
 {
 	struct spdk_bdev_channel *channel;
+	struct spdk_bdev_mgmt_channel *mgmt_channel;
 
 	channel = spdk_io_channel_get_ctx(ch);
+	mgmt_channel = spdk_io_channel_get_ctx(channel->mgmt_channel);
 
-	_spdk_bdev_abort_io(&g_bdev_mgr.need_buf_small[spdk_env_get_current_core()], channel);
-	_spdk_bdev_abort_io(&g_bdev_mgr.need_buf_large[spdk_env_get_current_core()], channel);
+	_spdk_bdev_abort_io(&mgmt_channel->need_buf_small, channel);
+	_spdk_bdev_abort_io(&mgmt_channel->need_buf_large, channel);
 }
 
 int
@@ -929,7 +942,7 @@ spdk_bdev_reset(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
 
 	/* First, abort all I/O queued up waiting for buffers. */
-	spdk_for_each_channel(&g_bdev_mgr,
+	spdk_for_each_channel(bdev,
 			      _spdk_bdev_reset_abort_channel,
 			      bdev_io,
 			      _spdk_bdev_reset_dev);
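The last hunk changes which channels the reset path walks: previously the
abort loop iterated the channels registered on &g_bdev_mgr, while now, with
the pending queues hanging off each bdev channel's mgmt_channel, it iterates
the channels of the bdev being reset and each callback reaches the right
queues through channel->mgmt_channel. Below is a serial stand-in for the
spdk_for_each_channel() contract (visit every channel of an io_device, then
run a completion callback); the real primitive hops to each channel's owning
thread asynchronously, and all names here are illustrative:

#include <stddef.h>
#include <stdio.h>

struct io_channel {
	int thread_id;
};

typedef void (*channel_msg_fn)(void *io_device, struct io_channel *ch, void *ctx);
typedef void (*channel_cpl_fn)(void *io_device, void *ctx);

/* Serial sketch of the ordering contract: every channel is visited,
 * then the completion runs, exactly once. */
static void
for_each_channel_sketch(void *io_device, struct io_channel *chs, size_t n,
			channel_msg_fn fn, void *ctx, channel_cpl_fn cpl)
{
	for (size_t i = 0; i < n; i++) {
		fn(io_device, &chs[i], ctx);
	}
	cpl(io_device, ctx);
}

static void
abort_one_channel(void *io_device, struct io_channel *ch, void *ctx)
{
	(void)io_device; (void)ctx;
	printf("abort buffer waiters on channel owned by thread %d\n", ch->thread_id);
}

static void
reset_complete(void *io_device, void *ctx)
{
	(void)io_device; (void)ctx;
	printf("all channels drained; proceed with the reset\n");
}

int
main(void)
{
	struct io_channel chs[] = { {0}, {1}, {2} };

	for_each_channel_sketch(NULL, chs, 3, abort_one_channel, NULL, reset_complete);
	return 0;
}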