idxd: refactor flow control for idxd engine

Recent work identified race conditions in the dynamic flow control
mechanism for the idxd engine. To both address the issue and simplify
the code, a new scheme is now in place: essentially, every DSA device
will accommodate up to 8 channels and each channel will get a fixed
1/8 of the device's work queue entries, regardless of how many
channels are currently in use. Assignment of channels to devices is
round robin, and if/when no more channels can be accommodated the get
channel request will fail.
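
For illustration, a minimal sketch of the resulting arithmetic (the
macro mirrors CHAN_PER_DEVICE() from the patch below; the helper is a
hypothetical stand-in for spdk_idxd_chan_get_max_operations()):

        /* Channels allowed to share one device: 8 for a full-size
         * (>= 128 entry) work queue, 4 otherwise. */
        #define CHAN_PER_DEVICE(total_wq_size) ((total_wq_size >= 128) ? 8 : 4)

        /* Fixed per-channel budget, independent of how many channels
         * are currently open on the device. */
        static int
        chan_max_operations(uint32_t total_wq_size)
        {
                return total_wq_size / CHAN_PER_DEVICE(total_wq_size);
        }

        /* e.g. total_wq_size == 128 -> 8 channels x 16 operations each */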

The performance tests also revealed another issue that was masked
before; it is a one-line fix, so it is included in this patch for
convenience. In the idxd poller we now limit the number of completions
processed in one run so that the poller thread does not starve other
threads, since operations that complete on this thread are immediately
replaced with new submissions, up to the limit for the channel.
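
A rough, hypothetical sketch of the poller side (the real module
registers its own poller; spdk_idxd_process_events() enforces the
MAX_COMPLETIONS_PER_POLL cap internally, so each pass is bounded and
control returns to the thread):

        static int
        idxd_poll(void *arg)
        {
                struct idxd_io_channel *chan = arg;
                int rc;

                /* Reaps at most MAX_COMPLETIONS_PER_POLL completions per
                 * call, even while callers keep refilling the ring. */
                rc = spdk_idxd_process_events(chan->chan);

                return rc > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
        }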

Signed-off-by: paul luse <paul.e.luse@intel.com>
Change-Id: I913e809a934b562feb495815a9b9c605d622285c
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/8171
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
paul luse 2021-06-03 13:29:01 -04:00 committed by Jim Harris
parent 570c8bb483
commit 10808e45d4
7 changed files with 95 additions and 268 deletions

View File

@ -69,14 +69,6 @@ struct idxd_batch;
*/
int spdk_idxd_configure_chan(struct spdk_idxd_io_channel *chan);
/**
* Reconfigures this channel based on how many current channels there are.
*
* \param chan IDXD channel to be set.
* \return 0 on success, negative errno on failure.
*/
int spdk_idxd_reconfigure_chan(struct spdk_idxd_io_channel *chan);
/**
* Signature for callback function invoked when a request is completed.
*
@ -431,7 +423,6 @@ int spdk_idxd_process_events(struct spdk_idxd_io_channel *chan);
* Returns an IDXD channel for a given IDXD device.
*
* \param idxd IDXD device to get a channel for.
*
* \return pointer to an IDXD channel.
*/
struct spdk_idxd_io_channel *spdk_idxd_get_channel(struct spdk_idxd_device *idxd);
@ -440,17 +431,16 @@ struct spdk_idxd_io_channel *spdk_idxd_get_channel(struct spdk_idxd_device *idxd
* Free an IDXD channel.
*
* \param chan IDXD channel to free.
* \return true if the underlying device needs a rebalance
*/
bool spdk_idxd_put_channel(struct spdk_idxd_io_channel *chan);
void spdk_idxd_put_channel(struct spdk_idxd_io_channel *chan);
/**
* Determine if the idxd device needs rebalancing.
* Get the max number of outstanding operations supported by this channel.
*
* \param idxd IDXD device.
* \return true if rebalance is needed, false if not.
* \param chan IDXD channel to communicate on.
* \return max number of operations supported.
*/
bool spdk_idxd_device_needs_rebalance(struct spdk_idxd_device *idxd);
int spdk_idxd_chan_get_max_operations(struct spdk_idxd_io_channel *chan);
#ifdef __cplusplus
}
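
Taken together, a hedged sketch of how a consumer drives the revised
public API (error handling trimmed; "idxd" is assumed to be an attached
struct spdk_idxd_device pointer):

        struct spdk_idxd_io_channel *ch;
        int max_ops;

        ch = spdk_idxd_get_channel(idxd);   /* NULL once the device is fully shared */
        if (ch == NULL) {
                return -EINVAL;
        }
        if (spdk_idxd_configure_chan(ch) != 0) {
                spdk_idxd_put_channel(ch);
                return -EINVAL;
        }
        /* Fixed per-channel budget; no rebalancing on later get/put. */
        max_ops = spdk_idxd_chan_get_max_operations(ch);

        /* ... submit up to max_ops operations, process events ... */

        spdk_idxd_put_channel(ch);          /* now returns void */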

View File

@ -46,6 +46,12 @@
#define ALIGN_4K 0x1000
#define USERSPACE_DRIVER_NAME "user"
#define CHAN_PER_DEVICE(total_wq_size) ((total_wq_size >= 128) ? 8 : 4)
/*
* Need to limit how many completions we reap in one poller to avoid starving
* other threads as callers can submit new operations on the polling thread.
*/
#define MAX_COMPLETIONS_PER_POLL 16
static STAILQ_HEAD(, spdk_idxd_impl) g_idxd_impls = STAILQ_HEAD_INITIALIZER(g_idxd_impls);
static struct spdk_idxd_impl *g_idxd_impl;
@ -75,12 +81,6 @@ struct device_config g_dev_cfg1 = {
.total_engines = 4,
};
bool
spdk_idxd_device_needs_rebalance(struct spdk_idxd_device *idxd)
{
return idxd->needs_rebalance;
}
static uint64_t
idxd_read_8(struct spdk_idxd_device *idxd, void *portal, uint32_t offset)
{
@ -99,11 +99,6 @@ spdk_idxd_get_channel(struct spdk_idxd_device *idxd)
SPDK_ERRLOG("Failed to allocate idxd chan\n");
return NULL;
}
chan->idxd = idxd;
TAILQ_INIT(&chan->batches);
TAILQ_INIT(&chan->batch_pool);
TAILQ_INIT(&chan->comp_ctx_oustanding);
chan->batch_base = calloc(NUM_BATCHES_PER_CHANNEL, sizeof(struct idxd_batch));
if (chan->batch_base == NULL) {
@ -112,36 +107,39 @@ spdk_idxd_get_channel(struct spdk_idxd_device *idxd)
return NULL;
}
pthread_mutex_lock(&idxd->num_channels_lock);
if (idxd->num_channels == CHAN_PER_DEVICE(idxd->total_wq_size)) {
/* too many channels sharing this device */
pthread_mutex_unlock(&idxd->num_channels_lock);
free(chan->batch_base);
free(chan);
return NULL;
}
idxd->num_channels++;
pthread_mutex_unlock(&idxd->num_channels_lock);
chan->idxd = idxd;
TAILQ_INIT(&chan->batches);
TAILQ_INIT(&chan->batch_pool);
TAILQ_INIT(&chan->comp_ctx_oustanding);
batch = chan->batch_base;
for (i = 0 ; i < NUM_BATCHES_PER_CHANNEL ; i++) {
TAILQ_INSERT_TAIL(&chan->batch_pool, batch, link);
batch++;
}
pthread_mutex_lock(&chan->idxd->num_channels_lock);
chan->idxd->num_channels++;
if (chan->idxd->num_channels > 1) {
chan->idxd->needs_rebalance = true;
} else {
chan->idxd->needs_rebalance = false;
}
pthread_mutex_unlock(&chan->idxd->num_channels_lock);
return chan;
}
bool
void
spdk_idxd_put_channel(struct spdk_idxd_io_channel *chan)
{
struct idxd_batch *batch;
bool rebalance = false;
pthread_mutex_lock(&chan->idxd->num_channels_lock);
assert(chan->idxd->num_channels > 0);
chan->idxd->num_channels--;
if (chan->idxd->num_channels > 0) {
rebalance = true;
}
pthread_mutex_unlock(&chan->idxd->num_channels_lock);
spdk_free(chan->completions);
@ -154,8 +152,13 @@ spdk_idxd_put_channel(struct spdk_idxd_io_channel *chan)
}
free(chan->batch_base);
free(chan);
}
return rebalance;
/* returns the total max operations for channel. */
int
spdk_idxd_chan_get_max_operations(struct spdk_idxd_io_channel *chan)
{
return chan->idxd->total_wq_size / CHAN_PER_DEVICE(chan->idxd->total_wq_size);
}
int
@ -170,9 +173,8 @@ spdk_idxd_configure_chan(struct spdk_idxd_io_channel *chan)
chan->idxd->wq_id = 0;
}
pthread_mutex_lock(&chan->idxd->num_channels_lock);
num_ring_slots = chan->idxd->queues[chan->idxd->wq_id].wqcfg.wq_size / chan->idxd->num_channels;
pthread_mutex_unlock(&chan->idxd->num_channels_lock);
num_ring_slots = chan->idxd->queues[chan->idxd->wq_id].wqcfg.wq_size / CHAN_PER_DEVICE(
chan->idxd->total_wq_size);
chan->ring_slots = spdk_bit_array_create(num_ring_slots);
if (chan->ring_slots == NULL) {
@ -180,13 +182,7 @@ spdk_idxd_configure_chan(struct spdk_idxd_io_channel *chan)
return -ENOMEM;
}
/*
* max ring slots can change as channels come and go but we
* start off getting all of the slots for this work queue.
*/
chan->max_ring_slots = num_ring_slots;
/* Store the original size of the ring. */
/* Store the size of the ring. */
chan->ring_size = num_ring_slots;
chan->desc = spdk_zmalloc(num_ring_slots * sizeof(struct idxd_hw_desc),
@ -250,61 +246,6 @@ err_desc:
return rc;
}
static void
_idxd_drain(struct spdk_idxd_io_channel *chan)
{
uint32_t index;
int set = 0;
do {
spdk_idxd_process_events(chan);
set = 0;
for (index = 0; index < chan->max_ring_slots; index++) {
set |= spdk_bit_array_get(chan->ring_slots, index);
}
} while (set);
}
int
spdk_idxd_reconfigure_chan(struct spdk_idxd_io_channel *chan)
{
uint32_t num_ring_slots;
int rc;
_idxd_drain(chan);
assert(spdk_bit_array_count_set(chan->ring_slots) == 0);
pthread_mutex_lock(&chan->idxd->num_channels_lock);
assert(chan->idxd->num_channels > 0);
num_ring_slots = chan->ring_size / chan->idxd->num_channels;
/* If no change (ie this was a call from another thread doing its for_each_channel,
* then we can just bail now.
*/
if (num_ring_slots == chan->max_ring_slots) {
pthread_mutex_unlock(&chan->idxd->num_channels_lock);
return 0;
}
pthread_mutex_unlock(&chan->idxd->num_channels_lock);
/* re-allocate our descriptor ring for hw flow control. */
rc = spdk_bit_array_resize(&chan->ring_slots, num_ring_slots);
if (rc < 0) {
SPDK_ERRLOG("Unable to resize channel bit array\n");
return -ENOMEM;
}
chan->max_ring_slots = num_ring_slots;
/*
* Note: The batch descriptor ring does not change with the
* number of channels as descriptors on this ring do not
* "count" for flow control.
*/
return rc;
}
static inline struct spdk_idxd_impl *
idxd_get_impl_by_name(const char *impl_name)
{
@ -1118,6 +1059,10 @@ spdk_idxd_process_events(struct spdk_idxd_io_channel *chan)
int rc = 0;
TAILQ_FOREACH_SAFE(comp_ctx, &chan->comp_ctx_oustanding, link, tmp) {
if (rc == MAX_COMPLETIONS_PER_POLL) {
break;
}
if (IDXD_COMPLETION(comp_ctx->hw.status)) {
TAILQ_REMOVE(&chan->comp_ctx_oustanding, comp_ctx, link);

View File

@ -119,15 +119,8 @@ struct spdk_idxd_io_channel {
* We use one bit array to track ring slots for both
* desc and completions.
*
* TODO: We can get rid of the bit array and just use a uint
* to manage flow control as the current implementation saves
* enough info in comp_ctx that it doesn't need the index. Keeping
* the bit arrays for now as (a) they provide some extra debug benefit
* until we have silicon and (b) they may still be needed depending on
* polling implementation experiments that we need to run with real silicon.
*/
struct spdk_bit_array *ring_slots;
uint32_t max_ring_slots;
/* Lists of batches, free and in use. */
TAILQ_HEAD(, idxd_batch) batch_pool;
@ -197,7 +190,7 @@ struct spdk_idxd_device {
void *portals;
int wq_id;
uint32_t num_channels;
bool needs_rebalance;
uint32_t total_wq_size;
pthread_mutex_t num_channels_lock;
struct idxd_group *groups;

View File

@ -52,9 +52,9 @@ struct spdk_user_idxd_device {
void *reg_base;
uint32_t wqcfg_offset;
uint32_t grpcfg_offset;
uint32_t ims_offset;
uint32_t msix_perm_offset;
uint32_t perfmon_offset;
uint32_t ims_offset;
uint32_t msix_perm_offset;
uint32_t perfmon_offset;
};
typedef bool (*spdk_idxd_probe_cb)(void *cb_ctx, struct spdk_pci_device *pci_dev);
@ -279,6 +279,7 @@ idxd_wq_config(struct spdk_user_idxd_device *user_idxd)
assert(LOG2_WQ_MAX_BATCH <= user_idxd->registers.gencap.max_batch_shift);
assert(LOG2_WQ_MAX_XFER <= user_idxd->registers.gencap.max_xfer_shift);
idxd->total_wq_size = user_idxd->registers.wqcap.total_wq_size;
idxd->queues = calloc(1, user_idxd->registers.wqcap.num_wqs * sizeof(struct idxd_wq));
if (idxd->queues == NULL) {
SPDK_ERRLOG("Failed to allocate queue memory\n");

View File

@ -2,9 +2,8 @@
global:
# public functions
spdk_idxd_device_needs_rebalance;
spdk_idxd_chan_get_max_operations;
spdk_idxd_configure_chan;
spdk_idxd_reconfigure_chan;
spdk_idxd_probe;
spdk_idxd_detach;
spdk_idxd_batch_prep_copy;

View File

@ -64,36 +64,64 @@ struct idxd_device {
};
static TAILQ_HEAD(, idxd_device) g_idxd_devices = TAILQ_HEAD_INITIALIZER(g_idxd_devices);
static struct idxd_device *g_next_dev = NULL;
static uint32_t g_num_devices = 0;
struct idxd_io_channel {
struct spdk_idxd_io_channel *chan;
struct idxd_device *dev;
enum channel_state state;
struct spdk_poller *poller;
uint32_t num_outstanding;
uint32_t max_outstanding;
TAILQ_HEAD(, spdk_accel_task) queued_tasks;
};
static struct spdk_io_channel *idxd_get_io_channel(void);
static struct idxd_device *
idxd_select_device(void)
idxd_select_device(struct idxd_io_channel *chan)
{
uint32_t count = 0;
/*
* We allow channels to share underlying devices,
* selection is round-robin based.
* selection is round-robin based with a limitation
* on how many channels can share one device.
*/
do {
/* select next device */
g_next_dev = TAILQ_NEXT(g_next_dev, tailq);
if (g_next_dev == NULL) {
g_next_dev = TAILQ_FIRST(&g_idxd_devices);
}
g_next_dev = TAILQ_NEXT(g_next_dev, tailq);
if (g_next_dev == NULL) {
g_next_dev = TAILQ_FIRST(&g_idxd_devices);
}
return g_next_dev;
/*
* Now see if a channel is available on this one. We only
* allow a specific number of channels to share a device
* to limit outstanding IO for flow control purposes.
*/
chan->chan = spdk_idxd_get_channel(g_next_dev->idxd);
if (chan->chan != NULL) {
chan->max_outstanding = spdk_idxd_chan_get_max_operations(chan->chan);
return g_next_dev;
}
} while (count++ < g_num_devices);
/* we are out of available channels and devices. */
SPDK_ERRLOG("No more DSA devices available!\n");
return NULL;
}
static void
idxd_done(void *cb_arg, int status)
{
struct spdk_accel_task *accel_task = cb_arg;
struct idxd_io_channel *chan = spdk_io_channel_get_ctx(accel_task->accel_ch->engine_ch);
assert(chan->num_outstanding > 0);
if (chan->num_outstanding-- == chan->max_outstanding) {
chan->state = IDXD_CHANNEL_ACTIVE;
}
spdk_accel_task_complete(accel_task, status);
}
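
The submit and completion paths now form a simple per-channel credit
scheme. A short annotated sketch (field and state names follow the
module code in this file; note the post-decrement, which compares the
value before the decrement, so the channel flips back to ACTIVE exactly
when it first drops below its cap):

        /* Submit side (see _process_single_task below). */
        if (chan->num_outstanding == chan->max_outstanding) {
                chan->state = IDXD_CHANNEL_PAUSED;  /* out of credits, task is queued */
                return -EBUSY;
        }
        chan->num_outstanding++;

        /* Completion side (idxd_done above). */
        if (chan->num_outstanding-- == chan->max_outstanding) {
                chan->state = IDXD_CHANNEL_ACTIVE;  /* a credit freed up, resume submissions */
        }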
@ -106,6 +134,12 @@ _process_single_task(struct spdk_io_channel *ch, struct spdk_accel_task *task)
uint8_t fill_pattern = (uint8_t)task->fill_pattern;
void *src;
if (chan->num_outstanding == chan->max_outstanding) {
chan->state = IDXD_CHANNEL_PAUSED;
return -EBUSY;
}
chan->num_outstanding++;
switch (task->op_code) {
case ACCEL_OPCODE_MEMMOVE:
rc = spdk_idxd_submit_copy(chan->chan, task->dst, task->src, task->nbytes, idxd_done, task);
@ -239,70 +273,6 @@ static struct spdk_accel_engine idxd_accel_engine = {
.submit_tasks = idxd_submit_tasks,
};
/*
* Configure the max number of descriptors that a channel is
* allowed to use based on the total number of current channels.
* This is to allow for dynamic load balancing for hw flow control.
*/
static void
_config_max_desc(struct spdk_io_channel_iter *i)
{
struct idxd_io_channel *chan;
struct spdk_io_channel *ch;
struct spdk_idxd_device *idxd;
int rc;
ch = spdk_io_channel_iter_get_channel(i);
chan = spdk_io_channel_get_ctx(ch);
idxd = spdk_io_channel_iter_get_ctx(i);
/* reconfigure channel only if this channel is on the same idxd
* device that initiated the rebalance.
*/
if (chan->dev->idxd == idxd) {
rc = spdk_idxd_reconfigure_chan(chan->chan);
if (rc == 0) {
chan->state = IDXD_CHANNEL_ACTIVE;
} else {
chan->state = IDXD_CHANNEL_ERROR;
}
}
spdk_for_each_channel_continue(i, 0);
}
/* Pauses a channel so that it can be re-configured. */
static void
_pause_chan(struct spdk_io_channel_iter *i)
{
struct idxd_io_channel *chan;
struct spdk_io_channel *ch;
struct spdk_idxd_device *idxd;
ch = spdk_io_channel_iter_get_channel(i);
chan = spdk_io_channel_get_ctx(ch);
idxd = spdk_io_channel_iter_get_ctx(i);
/* start queueing up new requests if this channel is on the same idxd
* device that initiated the rebalance.
*/
if (chan->dev->idxd == idxd) {
chan->state = IDXD_CHANNEL_PAUSED;
}
spdk_for_each_channel_continue(i, 0);
}
static void
_pause_chan_done(struct spdk_io_channel_iter *i, int status)
{
struct spdk_idxd_device *idxd;
idxd = spdk_io_channel_iter_get_ctx(i);
spdk_for_each_channel(&idxd_accel_engine, _config_max_desc, idxd, NULL);
}
static int
idxd_create_cb(void *io_device, void *ctx_buf)
{
@ -310,28 +280,16 @@ idxd_create_cb(void *io_device, void *ctx_buf)
struct idxd_device *dev;
int rc;
dev = idxd_select_device();
dev = idxd_select_device(chan);
if (dev == NULL) {
SPDK_ERRLOG("Failed to allocate idxd_device\n");
SPDK_ERRLOG("Failed to get an idxd channel\n");
return -EINVAL;
}
chan->chan = spdk_idxd_get_channel(dev->idxd);
if (chan->chan == NULL) {
return -ENOMEM;
}
chan->dev = dev;
chan->poller = SPDK_POLLER_REGISTER(idxd_poll, chan, 0);
TAILQ_INIT(&chan->queued_tasks);
/*
* Configure the channel but leave paused until all others
* are paused and re-configured based on the new number of
* channels. This enables dynamic load balancing for HW
* flow control. The idxd device will tell us if rebalance is
* needed based on how many channels are using it.
*/
rc = spdk_idxd_configure_chan(chan->chan);
if (rc) {
SPDK_ERRLOG("Failed to configure new channel rc = %d\n", rc);
@ -340,52 +298,19 @@ idxd_create_cb(void *io_device, void *ctx_buf)
return rc;
}
if (spdk_idxd_device_needs_rebalance(chan->dev->idxd) == false) {
chan->state = IDXD_CHANNEL_ACTIVE;
return 0;
}
chan->state = IDXD_CHANNEL_PAUSED;
/*
* Pause all channels so that we can set proper flow control
* per channel. When all are paused, we'll update the max
* number of descriptors allowed per channel.
*/
spdk_for_each_channel(&idxd_accel_engine, _pause_chan, chan->dev->idxd,
_pause_chan_done);
chan->num_outstanding = 0;
chan->state = IDXD_CHANNEL_ACTIVE;
return 0;
}
static void
_pause_chan_destroy_done(struct spdk_io_channel_iter *i, int status)
{
struct spdk_idxd_device *idxd;
idxd = spdk_io_channel_iter_get_ctx(i);
/* Rebalance the rings with the smaller number of remaining channels, but
* pass the idxd device along so its only done on shared channels.
*/
spdk_for_each_channel(&idxd_accel_engine, _config_max_desc, idxd, NULL);
}
static void
idxd_destroy_cb(void *io_device, void *ctx_buf)
{
struct idxd_io_channel *chan = ctx_buf;
bool rebalance;
spdk_poller_unregister(&chan->poller);
rebalance = spdk_idxd_put_channel(chan->chan);
/* Only rebalance if there are still other channels on this device */
if (rebalance == true) {
/* Pause each channel then rebalance the max number of ring slots. */
spdk_for_each_channel(&idxd_accel_engine, _pause_chan, chan->dev->idxd,
_pause_chan_destroy_done);
}
spdk_idxd_put_channel(chan->chan);
}
static struct spdk_io_channel *
@ -411,6 +336,7 @@ attach_cb(void *cb_ctx, struct spdk_idxd_device *idxd)
}
TAILQ_INSERT_TAIL(&g_idxd_devices, dev, tailq);
g_num_devices++;
}
void

View File

@ -45,7 +45,7 @@ user_idxd_set_config(struct device_config *dev_cfg, uint32_t config_num)
}
static struct spdk_idxd_impl g_user_idxd_impl = {
.name = "user",
.name = "user",
.set_config = user_idxd_set_config,
};
@ -61,32 +61,6 @@ test_spdk_idxd_set_config(void)
return 0;
}
static int
test_spdk_idxd_reconfigure_chan(void)
{
struct spdk_idxd_io_channel chan = {};
struct spdk_idxd_device idxd = {};
int rc;
uint32_t test_ring_size = 8;
uint32_t num_channels = 2;
chan.ring_slots = spdk_bit_array_create(test_ring_size);
chan.ring_size = test_ring_size;
chan.completions = spdk_zmalloc(test_ring_size * sizeof(struct idxd_hw_desc), 0, NULL,
SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
SPDK_CU_ASSERT_FATAL(chan.completions != NULL);
chan.idxd = &idxd;
chan.idxd->num_channels = num_channels;
rc = spdk_idxd_reconfigure_chan(&chan);
CU_ASSERT(rc == 0);
CU_ASSERT(chan.max_ring_slots == test_ring_size / num_channels);
spdk_bit_array_free(&chan.ring_slots);
spdk_free(chan.completions);
return 0;
}
static int
test_setup(void)
{
@ -106,7 +80,6 @@ int main(int argc, char **argv)
suite = CU_add_suite("idxd", test_setup, NULL);
CU_ADD_TEST(suite, test_spdk_idxd_reconfigure_chan);
CU_ADD_TEST(suite, test_spdk_idxd_set_config);
CU_basic_set_mode(CU_BRM_VERBOSE);