[dpdk-dev] [PATCH 11/13] dma/idxd: add data-path job completion functions

Kevin Laatz kevin.laatz at intel.com
Fri Aug 27 19:20:46 CEST 2021


Add the data path functions for gathering completed operations.

Signed-off-by: Bruce Richardson <bruce.richardson at intel.com>
Signed-off-by: Kevin Laatz <kevin.laatz at intel.com>
---
 doc/guides/dmadevs/idxd.rst      |  41 ++++++
 drivers/dma/idxd/idxd_common.c   | 246 +++++++++++++++++++++++++++++++
 drivers/dma/idxd/idxd_internal.h |   5 +
 3 files changed, 292 insertions(+)

diff --git a/doc/guides/dmadevs/idxd.rst b/doc/guides/dmadevs/idxd.rst
index 0c4c105e0f..8bf99ef453 100644
--- a/doc/guides/dmadevs/idxd.rst
+++ b/doc/guides/dmadevs/idxd.rst
@@ -209,6 +209,47 @@ device and start the hardware processing of them:
    }
    rte_dmadev_submit(dev_id, vchan);
 
+To retrieve information about completed copies, the ``rte_dmadev_completed()``
+and ``rte_dmadev_completed_status()`` APIs should be used. ``rte_dmadev_completed()``
+returns the number of completed operations, along with the index of the last
+successfully completed operation and whether or not an error was encountered.
+If an error was encountered, ``rte_dmadev_completed_status()`` must be used to
+allow the device to continue processing operations, and also to gather the
+status of each individual operation, which is written into the ``status``
+array provided as a parameter by the application.
+
+The following code shows how to retrieve the number of successfully completed
+copies within a burst, and then use ``rte_dmadev_completed_status()`` to check
+which operations failed and allow the device to continue processing operations:
+
+.. code-block:: C
+
+   enum rte_dma_status_code status[COMP_BURST_SZ];
+   uint16_t count, idx, status_count;
+   bool error = false;
+
+   count = rte_dmadev_completed(dev_id, vchan, COMP_BURST_SZ, &idx, &error);
+
+   if (error) {
+      status_count = rte_dmadev_completed_status(dev_id, vchan, COMP_BURST_SZ, &idx, status);
+   }
+
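+Once the status codes have been retrieved, the application can, for example,
+scan the ``status`` array to identify which operations failed. The handling
+shown below (printing the index of each failed operation) is illustrative
+only; real error handling is application-specific:
+
+.. code-block:: C
+
+   if (error) {
+      unsigned int i;
+
+      for (i = 0; i < status_count; i++) {
+         if (status[i] != RTE_DMA_STATUS_SUCCESSFUL)
+            printf("operation %u failed, status code %d\n", i, status[i]);
+      }
+   }
+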
 Filling an Area of Memory
 ~~~~~~~~~~~~~~~~~~~~~~~~~~
 
diff --git a/drivers/dma/idxd/idxd_common.c b/drivers/dma/idxd/idxd_common.c
index e2ef7b3b95..50b205d92f 100644
--- a/drivers/dma/idxd/idxd_common.c
+++ b/drivers/dma/idxd/idxd_common.c
@@ -144,6 +144,250 @@ idxd_submit(struct rte_dmadev *dev, uint16_t qid __rte_unused)
 	return 0;
 }
 
+static enum rte_dma_status_code
+get_comp_status(struct idxd_completion *c)
+{
+	uint8_t st = c->status;
+	switch (st) {
+	/* successful descriptors are not written back normally */
+	case IDXD_COMP_STATUS_INCOMPLETE:
+	case IDXD_COMP_STATUS_SUCCESS:
+		return RTE_DMA_STATUS_SUCCESSFUL;
+	case IDXD_COMP_STATUS_INVALID_SIZE:
+		return RTE_DMA_STATUS_INVALID_LENGTH;
+	case IDXD_COMP_STATUS_SKIPPED:
+		return RTE_DMA_STATUS_NOT_ATTEMPTED;
+	default: /* TODO - get more detail */
+		return RTE_DMA_STATUS_ERROR_UNKNOWN;
+	}
+}
+
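+/* Return ops known to have completed successfully: first, any handles not
+ * yet handed back from a previous call; then the ops of the next batch, if
+ * that batch completed without error. Fills "status", when non-NULL, with
+ * SUCCESSFUL. Returns 0 if nothing is ready, -1 if the next batch failed.
+ */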
+static __rte_always_inline int
+batch_ok(struct idxd_dmadev *idxd, uint8_t max_ops, enum rte_dma_status_code *status)
+{
+	uint16_t ret;
+	uint8_t bstatus;
+
+	if (max_ops == 0)
+		return 0;
+
+	/* first check if there are any unreturned handles from last time */
+	if (idxd->ids_avail != idxd->ids_returned) {
+		ret = RTE_MIN((uint16_t)(idxd->ids_avail - idxd->ids_returned), max_ops);
+		idxd->ids_returned += ret;
+		if (status)
+			memset(status, RTE_DMA_STATUS_SUCCESSFUL, ret * sizeof(*status));
+		return ret;
+	}
+
+	if (idxd->batch_idx_read == idxd->batch_idx_write)
+		return 0;
+
+	bstatus = idxd->batch_comp_ring[idxd->batch_idx_read].status;
+	/* now check if next batch is complete and successful */
+	if (bstatus == IDXD_COMP_STATUS_SUCCESS) {
+		/* since the batch idx ring stores the start of each batch, pre-increment
+		 * to look up the start of the next batch.
+		 */
+		if (++idxd->batch_idx_read > idxd->max_batches)
+			idxd->batch_idx_read = 0;
+		idxd->ids_avail = idxd->batch_idx_ring[idxd->batch_idx_read];
+
+		ret = RTE_MIN((uint16_t)(idxd->ids_avail - idxd->ids_returned), max_ops);
+		idxd->ids_returned += ret;
+		if (status)
+			memset(status, RTE_DMA_STATUS_SUCCESSFUL, ret * sizeof(*status));
+		return ret;
+	}
+	/* check if batch is incomplete */
+	else if (bstatus == IDXD_COMP_STATUS_INCOMPLETE)
+		return 0;
+
+	return -1; /* error case */
+}
+
+static inline uint16_t
+batch_completed(struct idxd_dmadev *idxd, uint8_t max_ops, bool *has_error)
+{
+	uint16_t i;
+	uint16_t b_start, b_end, next_batch;
+
+	int ret = batch_ok(idxd, max_ops, NULL);
+	if (ret >= 0)
+		return ret;
+
+	/* ERROR case, not successful, not incomplete */
+	/* Get the batch size, and special-case batches of size 1.
+	 * Once we identify the actual failed job, return the preceding jobs, then
+	 * update the batch ring indexes to make it look like the first job of the
+	 * batch has failed. Subsequent calls here will always return zero ops, and
+	 * the error must be cleared by calling the completed_status() function.
+	 */
+	next_batch = (idxd->batch_idx_read + 1);
+	if (next_batch > idxd->max_batches)
+		next_batch = 0;
+	b_start = idxd->batch_idx_ring[idxd->batch_idx_read];
+	b_end = idxd->batch_idx_ring[next_batch];
+
+	if (b_end - b_start == 1) { /* not a batch */
+		*has_error = true;
+		return 0;
+	}
+
+	for (i = b_start; i < b_end; i++) {
+		struct idxd_completion *c = (void *)&idxd->desc_ring[i & idxd->desc_ring_mask];
+		if (c->status > IDXD_COMP_STATUS_SUCCESS) /* ignore incomplete(0) and success(1) */
+			break;
+	}
+	ret = RTE_MIN((uint16_t)(i - idxd->ids_returned), max_ops);
+	if (ret < max_ops)
+		*has_error = true; /* we got up to the point of error */
+	idxd->ids_avail = idxd->ids_returned += ret;
+
+	/* to ensure we can call twice and just return 0, set start of batch to where we finished */
+	idxd->batch_comp_ring[idxd->batch_idx_read].completed_size -= ret;
+	idxd->batch_idx_ring[idxd->batch_idx_read] += ret;
+	if (idxd->batch_idx_ring[next_batch] - idxd->batch_idx_ring[idxd->batch_idx_read] == 1) {
+		/* copy over the descriptor status to the batch ring as if no batch */
+		uint16_t d_idx = idxd->batch_idx_ring[idxd->batch_idx_read] & idxd->desc_ring_mask;
+		struct idxd_completion *desc_comp = (void *)&idxd->desc_ring[d_idx];
+		idxd->batch_comp_ring[idxd->batch_idx_read].status = desc_comp->status;
+	}
+
+	return ret;
+}
+
+static uint16_t
+batch_completed_status(struct idxd_dmadev *idxd, uint16_t max_ops, enum rte_dma_status_code *status)
+{
+	uint16_t next_batch;
+
+	int ret = batch_ok(idxd, max_ops, status);
+	if (ret >= 0)
+		return ret;
+
+	/* ERROR case, not successful, not incomplete */
+	/* Get the batch size, and special case size 1.
+	 */
+	next_batch = (idxd->batch_idx_read + 1);
+	if (next_batch > idxd->max_batches)
+		next_batch = 0;
+	const uint16_t b_start = idxd->batch_idx_ring[idxd->batch_idx_read];
+	const uint16_t b_end = idxd->batch_idx_ring[next_batch];
+	const uint16_t b_len = b_end - b_start;
+	if (b_len == 1) { /* not a batch */
+		*status = get_comp_status(&idxd->batch_comp_ring[idxd->batch_idx_read]);
+		idxd->ids_avail++;
+		idxd->ids_returned++;
+		idxd->batch_idx_read = next_batch;
+		return 1;
+	}
+
+	/* not a single-element batch, need to process more.
+	 * Scenarios:
+	 * 1. max_ops >= batch_size - can fit everything, simple case
+	 *   - loop through completed ops and then add on any not-attempted ones
+	 * 2. max_ops < batch_size - can't fit everything, more complex case
+	 *   - loop through completed/incomplete and stop when hit max_ops
+	 *   - adjust the batch descriptor to update where we stopped, with appropriate bcount
+	 *   - if bcount is to be exactly 1, update the batch descriptor as it will be treated as
+	 *     non-batch next time.
+	 */
+	const uint16_t bcount = idxd->batch_comp_ring[idxd->batch_idx_read].completed_size;
+	for (ret = 0; ret < b_len && ret < max_ops; ret++) {
+		struct idxd_completion *c = (void *)
+				&idxd->desc_ring[(b_start + ret) & idxd->desc_ring_mask];
+		status[ret] = (ret < bcount) ? get_comp_status(c) : RTE_DMA_STATUS_NOT_ATTEMPTED;
+	}
+	idxd->ids_avail = idxd->ids_returned += ret;
+
+	/* everything fit */
+	if (ret == b_len) {
+		idxd->batch_idx_read = next_batch;
+		return ret;
+	}
+
+	/* set up for next time, update existing batch descriptor & start idx at batch_idx_read */
+	idxd->batch_idx_ring[idxd->batch_idx_read] += ret;
+	if (ret > bcount) {
+		/* we have only incomplete ones - set batch completed size to 0 */
+		struct idxd_completion *comp = &idxd->batch_comp_ring[idxd->batch_idx_read];
+		comp->completed_size = 0;
+		/* if there is only one descriptor left, job skipped so set flag appropriately */
+		if (b_len - ret == 1)
+			comp->status = IDXD_COMP_STATUS_SKIPPED;
+	} else {
+		struct idxd_completion *comp = &idxd->batch_comp_ring[idxd->batch_idx_read];
+		comp->completed_size -= ret;
+		/* if there is only one descriptor left, copy status info straight to desc */
+		if (comp->completed_size == 1) {
+			struct idxd_completion *c = (void *)
+					&idxd->desc_ring[(b_start + ret) & idxd->desc_ring_mask];
+			comp->status = c->status;
+			/* individual descs can be ok without writeback, but not batches */
+			if (comp->status == IDXD_COMP_STATUS_INCOMPLETE)
+				comp->status = IDXD_COMP_STATUS_SUCCESS;
+		} else if (bcount == b_len) {
+			/* check if we still have an error, and clear flag if not */
+			uint16_t i;
+			for (i = b_start + ret; i < b_end; i++) {
+				struct idxd_completion *c = (void *)
+						&idxd->desc_ring[i & idxd->desc_ring_mask];
+				if (c->status > IDXD_COMP_STATUS_SUCCESS)
+					break;
+			}
+			if (i == b_end) /* no errors */
+				comp->status = IDXD_COMP_STATUS_SUCCESS;
+		}
+	}
+
+	return ret;
+}
+
+uint16_t
+idxd_completed(struct rte_dmadev *dev, uint16_t qid __rte_unused, uint16_t max_ops,
+		uint16_t *last_idx, bool *has_error)
+{
+	struct idxd_dmadev *idxd = dev->dev_private;
+	uint16_t batch, ret = 0;
+
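+	/* gather batches of completed ops until we reach max_ops, run out of
+	 * completed batches, or hit a batch containing an error
+	 */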
+	do {
+		batch = batch_completed(idxd, max_ops - ret, has_error);
+		ret += batch;
+	} while (batch > 0 && *has_error == false);
+
+	*last_idx = idxd->ids_returned - 1;
+	return ret;
+}
+
+uint16_t
+idxd_completed_status(struct rte_dmadev *dev, uint16_t qid __rte_unused, uint16_t max_ops,
+		uint16_t *last_idx, enum rte_dma_status_code *status)
+{
+	struct idxd_dmadev *idxd = dev->dev_private;
+
+	uint16_t batch, ret = 0;
+
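+	/* drain completed batches, writing one status code per gathered op
+	 * into the caller-provided status array
+	 */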
+	do {
+		batch = batch_completed_status(idxd, max_ops - ret, &status[ret]);
+		ret += batch;
+	} while (batch > 0);
+
+	*last_idx = idxd->ids_returned - 1;
+	return ret;
+}
+
 int
 idxd_dump(const struct rte_dmadev *dev, FILE *f)
 {
@@ -272,6 +505,8 @@ idxd_dmadev_create(const char *name, struct rte_device *dev,
 	dmadev->copy = idxd_enqueue_copy;
 	dmadev->fill = idxd_enqueue_fill;
 	dmadev->submit = idxd_submit;
+	dmadev->completed = idxd_completed;
+	dmadev->completed_status = idxd_completed_status;
 
 	idxd = rte_malloc_socket(NULL, sizeof(struct idxd_dmadev), 0, dev->numa_node);
 	if (idxd == NULL) {
diff --git a/drivers/dma/idxd/idxd_internal.h b/drivers/dma/idxd/idxd_internal.h
index 6a6c69fd61..4bcfe5372b 100644
--- a/drivers/dma/idxd/idxd_internal.h
+++ b/drivers/dma/idxd/idxd_internal.h
@@ -90,5 +90,10 @@ int idxd_enqueue_copy(struct rte_dmadev *dev, uint16_t qid, rte_iova_t src,
 int idxd_enqueue_fill(struct rte_dmadev *dev, uint16_t qid, uint64_t pattern,
 		rte_iova_t dst, unsigned int length, uint64_t flags);
 int idxd_submit(struct rte_dmadev *dev, uint16_t qid);
+uint16_t idxd_completed(struct rte_dmadev *dev, uint16_t qid, uint16_t max_ops,
+		uint16_t *last_idx, bool *has_error);
+uint16_t idxd_completed_status(struct rte_dmadev *dev, uint16_t qid,
+		uint16_t max_ops, uint16_t *last_idx,
+		enum rte_dma_status_code *status);
 
 #endif /* _IDXD_INTERNAL_H_ */
-- 
2.30.2
