dma/idxd: add data path job completion

Add the data path functions for gathering completed operations.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
Signed-off-by: Kevin Laatz <kevin.laatz@intel.com>
Reviewed-by: Conor Walsh <conor.walsh@intel.com>
Kevin Laatz 2021-10-20 16:30:07 +00:00 committed by Thomas Monjalon
parent 3d36a0a1c7
commit 97aeed5637
3 changed files with 272 additions and 1 deletion


@@ -143,7 +143,37 @@ Performing Data Copies
~~~~~~~~~~~~~~~~~~~~~~~
Refer to the :ref:`Enqueue / Dequeue APIs <dmadev_enqueue_dequeue>` section of the dmadev library
documentation for details on operation enqueue, submission and completion API usage.

It is expected that, for efficiency reasons, a burst of operations will be enqueued to the
device via multiple enqueue calls between calls to the ``rte_dma_submit()`` function.

When gathering completions, ``rte_dma_completed()`` should be used, up until an error occurs
in an operation. If an error is encountered, ``rte_dma_completed_status()`` must be used to
restart the device so that it continues processing operations, and also to gather the status
of each individual operation, which is filled into the ``status`` array provided as a
parameter by the application.

The following status codes are supported by IDXD:

* ``RTE_DMA_STATUS_SUCCESSFUL``: The operation was successful.
* ``RTE_DMA_STATUS_INVALID_OPCODE``: The operation failed due to an invalid operation code.
* ``RTE_DMA_STATUS_INVALID_LENGTH``: The operation failed due to an invalid data length.
* ``RTE_DMA_STATUS_NOT_ATTEMPTED``: The operation was not attempted.
* ``RTE_DMA_STATUS_ERROR_UNKNOWN``: The operation failed due to an unspecified error.

The following code shows how to retrieve the number of successfully completed copies within
a burst and then use ``rte_dma_completed_status()`` to check which operation failed and
restart the device so that it continues processing operations:

.. code-block:: C

   enum rte_dma_status_code status[COMP_BURST_SZ];
   uint16_t count, idx, status_count;
   bool error = false;

   count = rte_dma_completed(dev_id, vchan, COMP_BURST_SZ, &idx, &error);
   if (error) {
      status_count = rte_dma_completed_status(dev_id, vchan, COMP_BURST_SZ, &idx, status);
   }
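
The snippet below is an illustrative follow-up only (a sketch, not part of the driver or dmadev
API): it walks the ``status`` array filled in by ``rte_dma_completed_status()`` and hands any
operation that did not complete successfully to an application-defined recovery routine, here a
hypothetical ``resubmit_copy()`` helper.

.. code-block:: C

   uint16_t i;

   for (i = 0; i < status_count; i++) {
      if (status[i] == RTE_DMA_STATUS_SUCCESSFUL)
         continue;
      /* Operation i of this burst was not successful (it may have failed or
       * not been attempted); recovery is application specific, for example
       * re-enqueueing the job via the hypothetical resubmit_copy() helper.
       */
      resubmit_copy(dev_id, vchan, i);
   }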


@@ -141,6 +141,240 @@ idxd_submit(void *dev_private, uint16_t qid __rte_unused)
        return 0;
}

static enum rte_dma_status_code
get_comp_status(struct idxd_completion *c)
{
        uint8_t st = c->status;
        switch (st) {
        /* successful descriptors are not written back normally */
        case IDXD_COMP_STATUS_INCOMPLETE:
        case IDXD_COMP_STATUS_SUCCESS:
                return RTE_DMA_STATUS_SUCCESSFUL;
        case IDXD_COMP_STATUS_INVALID_OPCODE:
                return RTE_DMA_STATUS_INVALID_OPCODE;
        case IDXD_COMP_STATUS_INVALID_SIZE:
                return RTE_DMA_STATUS_INVALID_LENGTH;
        case IDXD_COMP_STATUS_SKIPPED:
                return RTE_DMA_STATUS_NOT_ATTEMPTED;
        default:
                return RTE_DMA_STATUS_ERROR_UNKNOWN;
        }
}

static __rte_always_inline int
batch_ok(struct idxd_dmadev *idxd, uint16_t max_ops, enum rte_dma_status_code *status)
{
        uint16_t ret;
        uint8_t bstatus;

        if (max_ops == 0)
                return 0;

        /* first check if there are any unreturned handles from last time */
        if (idxd->ids_avail != idxd->ids_returned) {
                ret = RTE_MIN((uint16_t)(idxd->ids_avail - idxd->ids_returned), max_ops);
                idxd->ids_returned += ret;
                if (status)
                        memset(status, RTE_DMA_STATUS_SUCCESSFUL, ret * sizeof(*status));
                return ret;
        }

        if (idxd->batch_idx_read == idxd->batch_idx_write)
                return 0;

        bstatus = idxd->batch_comp_ring[idxd->batch_idx_read].status;
        /* now check if next batch is complete and successful */
        if (bstatus == IDXD_COMP_STATUS_SUCCESS) {
                /* since the batch idx ring stores the start of each batch, pre-increment to lookup
                 * start of next batch.
                 */
                if (++idxd->batch_idx_read > idxd->max_batches)
                        idxd->batch_idx_read = 0;
                idxd->ids_avail = idxd->batch_idx_ring[idxd->batch_idx_read];

                ret = RTE_MIN((uint16_t)(idxd->ids_avail - idxd->ids_returned), max_ops);
                idxd->ids_returned += ret;
                if (status)
                        memset(status, RTE_DMA_STATUS_SUCCESSFUL, ret * sizeof(*status));
                return ret;
        }
        /* check if batch is incomplete */
        else if (bstatus == IDXD_COMP_STATUS_INCOMPLETE)
                return 0;

        return -1; /* error case */
}

static inline uint16_t
batch_completed(struct idxd_dmadev *idxd, uint16_t max_ops, bool *has_error)
{
        uint16_t i;
        uint16_t b_start, b_end, next_batch;

        int ret = batch_ok(idxd, max_ops, NULL);
        if (ret >= 0)
                return ret;

        /* ERROR case, not successful, not incomplete */
        /* Get the batch size, and special case size 1.
         * once we identify the actual failure job, return other jobs, then update
         * the batch ring indexes to make it look like the first job of the batch has failed.
         * Subsequent calls here will always return zero packets, and the error must be cleared by
         * calling the completed_status() function.
         */
        next_batch = (idxd->batch_idx_read + 1);
        if (next_batch > idxd->max_batches)
                next_batch = 0;
        b_start = idxd->batch_idx_ring[idxd->batch_idx_read];
        b_end = idxd->batch_idx_ring[next_batch];

        if (b_end - b_start == 1) { /* not a batch */
                *has_error = true;
                return 0;
        }

        for (i = b_start; i < b_end; i++) {
                struct idxd_completion *c = (void *)&idxd->desc_ring[i & idxd->desc_ring_mask];
                if (c->status > IDXD_COMP_STATUS_SUCCESS) /* ignore incomplete(0) and success(1) */
                        break;
        }
        ret = RTE_MIN((uint16_t)(i - idxd->ids_returned), max_ops);
        if (ret < max_ops)
                *has_error = true; /* we got up to the point of error */
        idxd->ids_avail = idxd->ids_returned += ret;

        /* to ensure we can call twice and just return 0, set start of batch to where we finished */
        idxd->batch_comp_ring[idxd->batch_idx_read].completed_size -= ret;
        idxd->batch_idx_ring[idxd->batch_idx_read] += ret;
        if (idxd->batch_idx_ring[next_batch] - idxd->batch_idx_ring[idxd->batch_idx_read] == 1) {
                /* copy over the descriptor status to the batch ring as if no batch */
                uint16_t d_idx = idxd->batch_idx_ring[idxd->batch_idx_read] & idxd->desc_ring_mask;
                struct idxd_completion *desc_comp = (void *)&idxd->desc_ring[d_idx];
                idxd->batch_comp_ring[idxd->batch_idx_read].status = desc_comp->status;
        }

        return ret;
}

static uint16_t
batch_completed_status(struct idxd_dmadev *idxd, uint16_t max_ops, enum rte_dma_status_code *status)
{
        uint16_t next_batch;

        int ret = batch_ok(idxd, max_ops, status);
        if (ret >= 0)
                return ret;

        /* ERROR case, not successful, not incomplete */
        /* Get the batch size, and special case size 1.
         */
        next_batch = (idxd->batch_idx_read + 1);
        if (next_batch > idxd->max_batches)
                next_batch = 0;
        const uint16_t b_start = idxd->batch_idx_ring[idxd->batch_idx_read];
        const uint16_t b_end = idxd->batch_idx_ring[next_batch];
        const uint16_t b_len = b_end - b_start;
        if (b_len == 1) { /* not a batch */
                *status = get_comp_status(&idxd->batch_comp_ring[idxd->batch_idx_read]);
                idxd->ids_avail++;
                idxd->ids_returned++;
                idxd->batch_idx_read = next_batch;
                return 1;
        }

        /* not a single-element batch, need to process more.
         * Scenarios:
         * 1. max_ops >= batch_size - can fit everything, simple case
         *   - loop through completed ops and then add on any not-attempted ones
         * 2. max_ops < batch_size - can't fit everything, more complex case
         *   - loop through completed/incomplete and stop when hit max_ops
         *   - adjust the batch descriptor to update where we stopped, with appropriate bcount
         *   - if bcount is to be exactly 1, update the batch descriptor as it will be treated as
         *     non-batch next time.
         */
        const uint16_t bcount = idxd->batch_comp_ring[idxd->batch_idx_read].completed_size;
        for (ret = 0; ret < b_len && ret < max_ops; ret++) {
                struct idxd_completion *c = (void *)
                                &idxd->desc_ring[(b_start + ret) & idxd->desc_ring_mask];
                status[ret] = (ret < bcount) ? get_comp_status(c) : RTE_DMA_STATUS_NOT_ATTEMPTED;
        }
        idxd->ids_avail = idxd->ids_returned += ret;

        /* everything fit */
        if (ret == b_len) {
                idxd->batch_idx_read = next_batch;
                return ret;
        }

        /* set up for next time, update existing batch descriptor & start idx at batch_idx_read */
        idxd->batch_idx_ring[idxd->batch_idx_read] += ret;
        if (ret > bcount) {
                /* we have only incomplete ones - set batch completed size to 0 */
                struct idxd_completion *comp = &idxd->batch_comp_ring[idxd->batch_idx_read];
                comp->completed_size = 0;
                /* if there is only one descriptor left, job skipped so set flag appropriately */
                if (b_len - ret == 1)
                        comp->status = IDXD_COMP_STATUS_SKIPPED;
        } else {
                struct idxd_completion *comp = &idxd->batch_comp_ring[idxd->batch_idx_read];
                comp->completed_size -= ret;
                /* if there is only one descriptor left, copy status info straight to desc */
                if (comp->completed_size == 1) {
                        struct idxd_completion *c = (void *)
                                        &idxd->desc_ring[(b_start + ret) & idxd->desc_ring_mask];
                        comp->status = c->status;
                        /* individual descs can be ok without writeback, but not batches */
                        if (comp->status == IDXD_COMP_STATUS_INCOMPLETE)
                                comp->status = IDXD_COMP_STATUS_SUCCESS;
                } else if (bcount == b_len) {
                        /* check if we still have an error, and clear flag if not */
                        uint16_t i;
                        for (i = b_start + ret; i < b_end; i++) {
                                struct idxd_completion *c = (void *)
                                                &idxd->desc_ring[i & idxd->desc_ring_mask];
                                if (c->status > IDXD_COMP_STATUS_SUCCESS)
                                        break;
                        }
                        if (i == b_end) /* no errors */
                                comp->status = IDXD_COMP_STATUS_SUCCESS;
                }
        }
        return ret;
}

uint16_t
idxd_completed(void *dev_private, uint16_t qid __rte_unused, uint16_t max_ops,
                uint16_t *last_idx, bool *has_error)
{
        struct idxd_dmadev *idxd = dev_private;
        uint16_t batch, ret = 0;

        do {
                batch = batch_completed(idxd, max_ops - ret, has_error);
                ret += batch;
        } while (batch > 0 && *has_error == false);

        *last_idx = idxd->ids_returned - 1;
        return ret;
}

uint16_t
idxd_completed_status(void *dev_private, uint16_t qid __rte_unused, uint16_t max_ops,
                uint16_t *last_idx, enum rte_dma_status_code *status)
{
        struct idxd_dmadev *idxd = dev_private;
        uint16_t batch, ret = 0;

        do {
                batch = batch_completed_status(idxd, max_ops - ret, &status[ret]);
                ret += batch;
        } while (batch > 0);

        *last_idx = idxd->ids_returned - 1;
        return ret;
}

int
idxd_dump(const struct rte_dma_dev *dev, FILE *f)
{

@@ -273,6 +507,8 @@ idxd_dmadev_create(const char *name, struct rte_device *dev,
        dmadev->fp_obj->copy = idxd_enqueue_copy;
        dmadev->fp_obj->fill = idxd_enqueue_fill;
        dmadev->fp_obj->submit = idxd_submit;
        dmadev->fp_obj->completed = idxd_completed;
        dmadev->fp_obj->completed_status = idxd_completed_status;

        idxd = dmadev->data->dev_private;
        *idxd = *base_idxd; /* copy over the main fields already passed in */


@@ -92,5 +92,10 @@ int idxd_enqueue_copy(void *dev_private, uint16_t qid, rte_iova_t src,
int idxd_enqueue_fill(void *dev_private, uint16_t qid, uint64_t pattern,
                rte_iova_t dst, unsigned int length, uint64_t flags);
int idxd_submit(void *dev_private, uint16_t qid);
uint16_t idxd_completed(void *dev_private, uint16_t qid, uint16_t max_ops,
                uint16_t *last_idx, bool *has_error);
uint16_t idxd_completed_status(void *dev_private, uint16_t qid __rte_unused,
                uint16_t max_ops, uint16_t *last_idx,
                enum rte_dma_status_code *status);

#endif /* _IDXD_INTERNAL_H_ */