lib/idxd: change how we complete batch desc

Previously we used a counter of our own to make sure all batch
elements plus the batch itself were done before we freed the batch.
This was due to some observations early on that the batch desc
could complete before the individual elements and a lack of clarity
as to whether this was due to the simulator or the fact that
we poll on completions and could therefore "see" completions in
a different order at that time (we were using bit arrays to poll).

Now we use an ordered (in time) list of locations to poll, so if we
instead put the elements on the list first and then the batch desc
itself, we are assured to always "see" them in order, provided the
underlying device meets the spec — and there is no reason to assume
it does not.

This simplifies things a bit at the same time, and still assures
that we call list callbacks in order and then the batch callback,
without "special" handling.

Signed-off-by: paul luse <paul.e.luse@intel.com>
Change-Id: I4d9e3997786f2116ce6515682b8117799c645f51
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/8397
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Community-CI: Broadcom CI <spdk-ci.pdl@broadcom.com>
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
This commit is contained in:
paul luse 2021-06-16 20:31:00 -04:00 committed by Jim Harris
parent 91cab3414c
commit f95bd3b6c7
2 changed files with 39 additions and 42 deletions

View File

@ -646,7 +646,7 @@ spdk_idxd_batch_create(struct spdk_idxd_io_channel *chan)
if (!TAILQ_EMPTY(&chan->batch_pool)) {
batch = TAILQ_FIRST(&chan->batch_pool);
batch->index = batch->remaining = 0;
batch->index = 0;
TAILQ_REMOVE(&chan->batch_pool, batch, link);
TAILQ_INSERT_TAIL(&chan->batches, batch, link);
} else {
@ -677,7 +677,6 @@ static void
_free_batch(struct idxd_batch *batch, struct spdk_idxd_io_channel *chan)
{
SPDK_DEBUGLOG(idxd, "Free batch %p\n", batch);
assert(batch->remaining == 0);
TAILQ_REMOVE(&chan->batches, batch, link);
TAILQ_INSERT_TAIL(&chan->batch_pool, batch, link);
}
@ -690,7 +689,7 @@ spdk_idxd_batch_cancel(struct spdk_idxd_io_channel *chan, struct idxd_batch *bat
return -EINVAL;
}
if (batch->remaining > 0) {
if (batch->index > 0) {
SPDK_ERRLOG("Cannot cancel batch, already submitted to HW.\n");
return -EINVAL;
}
@ -723,40 +722,42 @@ spdk_idxd_batch_submit(struct spdk_idxd_io_channel *chan, struct idxd_batch *bat
}
}
/* Common prep. */
rc = _idxd_prep_command(chan, cb_fn, cb_arg, &desc, &comp);
if (rc) {
return rc;
}
rc = _vtophys(batch->user_desc, &desc_addr, batch->remaining * sizeof(struct idxd_hw_desc));
if (rc) {
return -EINVAL;
}
/* Command specific. */
desc->opcode = IDXD_OPCODE_BATCH;
desc->desc_list_addr = desc_addr;
desc->desc_count = batch->remaining = batch->index;
comp->batch = batch;
assert(batch->index <= DESC_PER_BATCH);
/* Add the batch elements completion contexts to the outstanding list to be polled. */
for (i = 0 ; i < batch->index; i++) {
TAILQ_INSERT_TAIL(&chan->comp_ctx_oustanding, (struct idxd_comp *)&batch->user_completions[i],
link);
}
/* Add one for the batch desc itself, we use this to determine when
* to free the batch.
*/
batch->remaining++;
/* Common prep. */
rc = _idxd_prep_command(chan, cb_fn, cb_arg, &desc, &comp);
if (rc) {
goto error;
}
rc = _vtophys(batch->user_desc, &desc_addr, batch->index * sizeof(struct idxd_hw_desc));
if (rc) {
rc = -EINVAL;
goto error;
}
/* Command specific. */
desc->opcode = IDXD_OPCODE_BATCH;
desc->desc_list_addr = desc_addr;
desc->desc_count = batch->index;
comp->batch = batch;
assert(batch->index <= DESC_PER_BATCH);
/* Submit operation. */
_submit_to_hw(chan, desc);
SPDK_DEBUGLOG(idxd, "Submitted batch %p\n", batch);
return 0;
error:
for (i = 0 ; i < batch->index; i++) {
comp = TAILQ_LAST(&chan->comp_ctx_oustanding, comp_head);
TAILQ_REMOVE(&chan->comp_ctx_oustanding, comp, link);
}
return rc;
}
static int
@ -1100,11 +1101,8 @@ spdk_idxd_process_events(struct spdk_idxd_io_channel *chan)
spdk_bit_array_clear(chan->ring_slots, comp_ctx->index);
}
if (comp_ctx->batch) {
assert(comp_ctx->batch->remaining > 0);
if (--comp_ctx->batch->remaining == 0) {
_free_batch(comp_ctx->batch, chan);
}
if (comp_ctx->desc->opcode == IDXD_OPCODE_BATCH) {
_free_batch(comp_ctx->batch, chan);
}
} else {
/*

View File

@ -84,7 +84,6 @@ static inline void movdir64b(void *dst, const void *src)
struct idxd_batch {
struct idxd_hw_desc *user_desc;
struct idxd_comp *user_completions;
uint32_t remaining;
uint8_t index;
TAILQ_ENTRY(idxd_batch) link;
};
@ -99,35 +98,35 @@ struct device_config {
struct idxd_comp ;
struct spdk_idxd_io_channel {
struct spdk_idxd_device *idxd;
struct spdk_idxd_device *idxd;
/* The portal is the address that we write descriptors to for submission. */
void *portal;
uint32_t portal_offset;
uint16_t ring_size;
void *portal;
uint32_t portal_offset;
uint16_t ring_size;
/*
* Descriptors and completions share the same index. User descriptors
* (those included in a batch) are managed independently from data descriptors
* and are located in the batch structure.
*/
struct idxd_hw_desc *desc;
struct idxd_comp *completions;
struct idxd_hw_desc *desc;
struct idxd_comp *completions;
/* Current list of oustanding completion addresses to poll. */
TAILQ_HEAD(, idxd_comp) comp_ctx_oustanding;
TAILQ_HEAD(comp_head, idxd_comp) comp_ctx_oustanding;
/*
* We use one bit array to track ring slots for both
* desc and completions.
*
*/
struct spdk_bit_array *ring_slots;
struct spdk_bit_array *ring_slots;
/* Lists of batches, free and in use. */
TAILQ_HEAD(, idxd_batch) batch_pool;
TAILQ_HEAD(, idxd_batch) batches;
TAILQ_HEAD(, idxd_batch) batch_pool;
TAILQ_HEAD(, idxd_batch) batches;
void *batch_base;
void *batch_base;
};
struct pci_dev_id {