[dpdk-dev] [PATCH v3 4/6] raw/octeontx2_ep: add enqueue operation

Gavin Hu Gavin.Hu at arm.com
Tue Jan 7 06:55:32 CET 2020


Hi Mahipal,

> -----Original Message-----
> From: Mahipal Challa <mchalla at marvell.com>
> Sent: Monday, January 6, 2020 8:07 PM
> To: dev at dpdk.org
> Cc: jerinj at marvell.com; pathreya at marvell.com; snilla at marvell.com;
> venkatn at marvell.com; Gavin Hu <Gavin.Hu at arm.com>
> Subject: [dpdk-dev] [PATCH v3 4/6] raw/octeontx2_ep: add enqueue
> operation
> 
> Add rawdev enqueue operation for SDP VF devices.
> 
> Signed-off-by: Mahipal Challa <mchalla at marvell.com>
> ---
>  doc/guides/rawdevs/octeontx2_ep.rst       |   6 +
>  drivers/raw/octeontx2_ep/otx2_ep_enqdeq.c | 242
> ++++++++++++++++++++++++++++++
>  drivers/raw/octeontx2_ep/otx2_ep_enqdeq.h |  39 +++++
>  drivers/raw/octeontx2_ep/otx2_ep_rawdev.c |   1 +
>  drivers/raw/octeontx2_ep/otx2_ep_rawdev.h |  20 +++
>  drivers/raw/octeontx2_ep/otx2_ep_vf.c     |  24 +++
>  6 files changed, 332 insertions(+)
> 
> diff --git a/doc/guides/rawdevs/octeontx2_ep.rst
> b/doc/guides/rawdevs/octeontx2_ep.rst
> index 2507fcf..39a7c29 100644
> --- a/doc/guides/rawdevs/octeontx2_ep.rst
> +++ b/doc/guides/rawdevs/octeontx2_ep.rst
> @@ -68,3 +68,9 @@ The following code shows how the device is configured
> 
>     rte_rawdev_configure(dev_id, (rte_rawdev_obj_t)&rdev_info);
> 
> +Performing Data Transfer
> +------------------------
> +
> +To perform data transfer using SDP VF EP rawdev devices use standard
> +``rte_rawdev_enqueue_buffers()`` and ``rte_rawdev_dequeue_buffers()``
> APIs.
> +
> diff --git a/drivers/raw/octeontx2_ep/otx2_ep_enqdeq.c
> b/drivers/raw/octeontx2_ep/otx2_ep_enqdeq.c
> index 584b818..6910f08 100644
> --- a/drivers/raw/octeontx2_ep/otx2_ep_enqdeq.c
> +++ b/drivers/raw/octeontx2_ep/otx2_ep_enqdeq.c
> @@ -403,3 +403,245 @@
>  	return -ENOMEM;
>  }
> 
> +static inline void
> +sdp_iqreq_delete(struct sdp_device *sdpvf,
> +		struct sdp_instr_queue *iq, uint32_t idx)
> +{
> +	uint32_t reqtype;
> +	void *buf;
> +
> +	buf     = iq->req_list[idx].buf;
> +	reqtype = iq->req_list[idx].reqtype;
> +
> +	switch (reqtype) {
> +	case SDP_REQTYPE_NORESP:
> +		rte_mempool_put(sdpvf->enqdeq_mpool, buf);
> +		otx2_sdp_dbg("IQ buffer freed at idx[%d]", idx);
> +		break;
> +
> +	case SDP_REQTYPE_NORESP_GATHER:
> +	case SDP_REQTYPE_NONE:
> +	default:
> +		otx2_info("This iqreq mode is not supported:%d", reqtype);
> +
> +	}
> +
> +	/* Reset the request list at this index */
> +	iq->req_list[idx].buf = NULL;
> +	iq->req_list[idx].reqtype = 0;
> +}
> +
> +static inline void
> +sdp_iqreq_add(struct sdp_instr_queue *iq, void *buf,
> +		uint32_t reqtype)
> +{
> +	iq->req_list[iq->host_write_index].buf = buf;
> +	iq->req_list[iq->host_write_index].reqtype = reqtype;
> +
> +	otx2_sdp_dbg("IQ buffer added at idx[%d]", iq->host_write_index);
> +
> +}
> +
> +static void
> +sdp_flush_iq(struct sdp_device *sdpvf,
> +		struct sdp_instr_queue *iq,
> +		uint32_t pending_thresh __rte_unused)
> +{
> +	uint32_t instr_processed = 0;
> +
> +	rte_spinlock_lock(&iq->lock);
> +
> +	iq->otx_read_index = sdpvf->fn_list.update_iq_read_idx(iq);
> +	while (iq->flush_index != iq->otx_read_index) {
> +		/* Free the IQ data buffer to the pool */
> +		sdp_iqreq_delete(sdpvf, iq, iq->flush_index);
> +		iq->flush_index =
> +			sdp_incr_index(iq->flush_index, 1, iq->nb_desc);
> +
> +		instr_processed++;
> +	}
> +
> +	iq->stats.instr_processed = instr_processed;
> +	rte_atomic64_sub(&iq->instr_pending, instr_processed);
> +
> +	rte_spinlock_unlock(&iq->lock);
> +}
> +
> +static inline void
> +sdp_ring_doorbell(struct sdp_device *sdpvf __rte_unused,
> +		struct sdp_instr_queue *iq)
> +{
> +	otx2_write64(iq->fill_cnt, iq->doorbell_reg);
> +
> +	/* Make sure doorbell write goes through */
> +	rte_cio_wmb();
I think I commented this in v2.
The barrier should be placed between the two memory accesses for which you want a definite ordering, as observed by the CPU, the NIC, or both.
It does not guarantee "going through" or completeness (maybe "dsb" can, as it stalls execution until the preceding memory access instructions complete, but that is from the point of view of the CPU, not the memory. The early-acknowledge mechanism will lead the CPU to think the access is done, but in reality it does not mean the write has arrived at the memory, especially for IO device registers).

Anyway, I am not against comments mentioning "completeness" for "dsb"-based barriers, but the io and cio barriers are about observed ordering, not about "completeness".
> +	iq->fill_cnt = 0;
> +
> +}
> +
> +static inline int
> +post_iqcmd(struct sdp_instr_queue *iq, uint8_t *iqcmd)
> +{
> +	uint8_t *iqptr, cmdsize;
> +
> +	/* This ensures that the read index does not wrap around to
> +	 * the same position if queue gets full before OCTEON TX2 could
> +	 * fetch any instr.
> +	 */
> +	if (rte_atomic64_read(&iq->instr_pending) >=
> +			      (int32_t)(iq->nb_desc - 1)) {
> +		otx2_err("IQ is full, pending:%ld",
> +			 (long)rte_atomic64_read(&iq->instr_pending));
> +
> +		return SDP_IQ_SEND_FAILED;
> +	}
> +
> +	/* Copy cmd into iq */
> +	cmdsize = ((iq->iqcmd_64B) ? 64 : 32);
> +	iqptr   = iq->base_addr + (cmdsize * iq->host_write_index);
> +
> +	rte_memcpy(iqptr, iqcmd, cmdsize);
> +
> +	otx2_sdp_dbg("IQ cmd posted @ index:%d", iq->host_write_index);
> +
> +	/* Increment the host write index */
> +	iq->host_write_index =
> +		sdp_incr_index(iq->host_write_index, 1, iq->nb_desc);
> +
> +	iq->fill_cnt++;
> +
> +	/* Flush the command into memory. We need to be sure the data
> +	 * is in memory before indicating that the instruction is
> +	 * pending.
> +	 */
> +	rte_io_wmb();
If both memory accesses are in normal memory, then rte_smp_wmb should be used.
> +	rte_atomic64_inc(&iq->instr_pending);
> +
> +	/* SDP_IQ_SEND_SUCCESS */
> +	return 0;
> +}
> +
> +
> +static int
> +sdp_send_data(struct sdp_device *sdpvf,
> +	      struct sdp_instr_queue *iq, void *cmd)
> +{
> +	uint32_t ret;
> +
> +	/* Lock this IQ command queue before posting instruction */
> +	rte_spinlock_lock(&iq->post_lock);
> +
> +	/* Submit IQ command */
> +	ret = post_iqcmd(iq, cmd);
> +
> +	if (ret == SDP_IQ_SEND_SUCCESS) {
> +		sdp_ring_doorbell(sdpvf, iq);
> +
> +		iq->stats.instr_posted++;
> +		otx2_sdp_dbg("Instr submit success posted: %ld\n",
> +			     (long)iq->stats.instr_posted);
> +
> +	} else {
> +		iq->stats.instr_dropped++;
> +		otx2_err("Instr submit failled, dropped: %ld\n",
> +			 (long)iq->stats.instr_dropped);
> +
> +	}
> +
> +	rte_spinlock_unlock(&iq->post_lock);
> +
> +	return ret;
> +}
> +
> +
> +/* Enqueue requests/packets to SDP IQ queue.
> + * returns number of requests enqueued successfully
> + */
> +int
> +sdp_rawdev_enqueue(struct rte_rawdev *rawdev,
> +		   struct rte_rawdev_buf **buffers __rte_unused,
> +		   unsigned int count, rte_rawdev_obj_t context)
> +{
> +	struct sdp_instr_64B *iqcmd;
> +	struct sdp_instr_queue *iq;
> +	struct sdp_soft_instr *si;
> +	struct sdp_device *sdpvf;
> +
> +	struct sdp_instr_ih ihx;
> +
> +	sdpvf = (struct sdp_device *)rawdev->dev_private;
> +	si = (struct sdp_soft_instr *)context;
> +
> +	iq = sdpvf->instr_queue[si->q_no];
> +
> +	if ((count > 1) || (count < 1)) {
> +		otx2_err("This mode not supported: req[%d]", count);
> +		goto enq_fail;
> +	}
> +
> +	memset(&ihx, 0, sizeof(struct sdp_instr_ih));
> +
> +	iqcmd = &si->command;
> +	memset(iqcmd, 0, sizeof(struct sdp_instr_64B));
> +
> +	iqcmd->dptr = (uint64_t)si->dptr;
> +
> +	/* Populate SDP IH */
> +	ihx.pkind  = sdpvf->pkind;
> +	ihx.fsz    = si->ih.fsz + 8; /* 8B for NIX IH */
> +	ihx.gather = si->ih.gather;
> +
> +	/* Direct data instruction */
> +	ihx.tlen   = si->ih.tlen + ihx.fsz;
> +
> +	switch (ihx.gather) {
> +	case 0: /* Direct data instr */
> +		ihx.tlen = si->ih.tlen + ihx.fsz;
> +		break;
> +
> +	default: /* Gather */
> +		switch (si->ih.gsz) {
> +		case 0: /* Direct gather instr */
> +			otx2_err("Direct Gather instr : not supported");
> +			goto enq_fail;
> +
> +		default: /* Indirect gather instr */
> +			otx2_err("Indirect Gather instr : not supported");
> +			goto enq_fail;
> +		}
> +	}
> +
> +	rte_memcpy(&iqcmd->ih, &ihx, sizeof(uint64_t));
> +	iqcmd->rptr = (uint64_t)si->rptr;
> +	rte_memcpy(&iqcmd->irh, &si->irh, sizeof(uint64_t));
> +
> +	/* Swap FSZ(front data) here, to avoid swapping on OCTEON TX2 side
> */
> +	sdp_swap_8B_data(&iqcmd->rptr, 1);
> +	sdp_swap_8B_data(&iqcmd->irh, 1);
> +
> +	otx2_sdp_dbg("After swapping");
> +	otx2_sdp_dbg("Word0 [dptr]: 0x%016lx", (unsigned long)iqcmd-
> >dptr);
> +	otx2_sdp_dbg("Word1 [ihtx]: 0x%016lx", (unsigned long)iqcmd->ih);
> +	otx2_sdp_dbg("Word2 [rptr]: 0x%016lx", (unsigned long)iqcmd->rptr);
> +	otx2_sdp_dbg("Word3 [irh]: 0x%016lx", (unsigned long)iqcmd->irh);
> +	otx2_sdp_dbg("Word4 [exhdr[0]]: 0x%016lx",
> +			(unsigned long)iqcmd->exhdr[0]);
> +
> +	sdp_iqreq_add(iq, si->dptr, si->reqtype);
> +
> +	if (sdp_send_data(sdpvf, iq, iqcmd)) {
> +		otx2_err("Data send failled :");
> +		sdp_iqreq_delete(sdpvf, iq, iq->host_write_index);
> +		goto enq_fail;
> +	}
> +
> +	if (rte_atomic64_read(&iq->instr_pending) >= 1)
> +		sdp_flush_iq(sdpvf, iq, 1 /*(iq->nb_desc / 2)*/);
> +
> +	/* Return no# of instructions posted successfully. */
> +	return count;
> +
> +enq_fail:
> +	return SDP_IQ_SEND_FAILED;
> +}
> +
> diff --git a/drivers/raw/octeontx2_ep/otx2_ep_enqdeq.h
> b/drivers/raw/octeontx2_ep/otx2_ep_enqdeq.h
> index 4c28283..b9b7c0b 100644
> --- a/drivers/raw/octeontx2_ep/otx2_ep_enqdeq.h
> +++ b/drivers/raw/octeontx2_ep/otx2_ep_enqdeq.h
> @@ -8,4 +8,43 @@
>  #include <rte_byteorder.h>
>  #include "otx2_ep_rawdev.h"
> 
> +#define SDP_IQ_SEND_FAILED      (-1)
> +#define SDP_IQ_SEND_SUCCESS     (0)
> +
> +
> +static inline uint64_t
> +sdp_endian_swap_8B(uint64_t _d)
> +{
> +	return ((((((uint64_t)(_d)) >>  0) & (uint64_t)0xff) << 56) |
> +		(((((uint64_t)(_d)) >>  8) & (uint64_t)0xff) << 48) |
> +		(((((uint64_t)(_d)) >> 16) & (uint64_t)0xff) << 40) |
> +		(((((uint64_t)(_d)) >> 24) & (uint64_t)0xff) << 32) |
> +		(((((uint64_t)(_d)) >> 32) & (uint64_t)0xff) << 24) |
> +		(((((uint64_t)(_d)) >> 40) & (uint64_t)0xff) << 16) |
> +		(((((uint64_t)(_d)) >> 48) & (uint64_t)0xff) <<  8) |
> +		(((((uint64_t)(_d)) >> 56) & (uint64_t)0xff) <<  0));
> +}
> +
> +static inline void
> +sdp_swap_8B_data(uint64_t *data, uint32_t blocks)
> +{
> +	/* Swap 8B blocks */
> +	while (blocks) {
> +		*data = sdp_endian_swap_8B(*data);
> +		blocks--;
> +		data++;
> +	}
> +}
> +
> +static inline uint32_t
> +sdp_incr_index(uint32_t index, uint32_t count, uint32_t max)
> +{
> +	if ((index + count) >= max)
> +		index = index + count - max;
> +	else
> +		index += count;
> +
> +	return index;
> +}
> +
>  #endif /* _OTX2_EP_ENQDEQ_H_ */
> diff --git a/drivers/raw/octeontx2_ep/otx2_ep_rawdev.c
> b/drivers/raw/octeontx2_ep/otx2_ep_rawdev.c
> index 3db5a74..22a6beb 100644
> --- a/drivers/raw/octeontx2_ep/otx2_ep_rawdev.c
> +++ b/drivers/raw/octeontx2_ep/otx2_ep_rawdev.c
> @@ -251,6 +251,7 @@
>  	.dev_start      = sdp_rawdev_start,
>  	.dev_stop       = sdp_rawdev_stop,
>  	.dev_close      = sdp_rawdev_close,
> +	.enqueue_bufs   = sdp_rawdev_enqueue,
>  };
> 
>  static int
> diff --git a/drivers/raw/octeontx2_ep/otx2_ep_rawdev.h
> b/drivers/raw/octeontx2_ep/otx2_ep_rawdev.h
> index a01f48d..8fd06fb 100644
> --- a/drivers/raw/octeontx2_ep/otx2_ep_rawdev.h
> +++ b/drivers/raw/octeontx2_ep/otx2_ep_rawdev.h
> @@ -8,6 +8,10 @@
>  #include <rte_byteorder.h>
>  #include <rte_spinlock.h>
> 
> +/* IQ instruction req types */
> +#define SDP_REQTYPE_NONE             (0)
> +#define SDP_REQTYPE_NORESP           (1)
> +#define SDP_REQTYPE_NORESP_GATHER    (2)
> 
>  /* Input Request Header format */
>  struct sdp_instr_irh {
> @@ -128,6 +132,13 @@ struct sdp_instr_list {
>  };
>  #define SDP_IQREQ_LIST_SIZE	(sizeof(struct sdp_instr_list))
> 
> +/* Input Queue statistics. Each input queue has four stats fields. */
> +struct sdp_iq_stats {
> +	uint64_t instr_posted; /* Instructions posted to this queue. */
> +	uint64_t instr_processed; /* Instructions processed in this queue. */
> +	uint64_t instr_dropped; /* Instructions that could not be processed */
> +};
> +
>  /* Structure to define the configuration attributes for each Input queue. */
>  struct sdp_iq_config {
>  	/* Max number of IQs available */
> @@ -195,6 +206,9 @@ struct sdp_instr_queue {
>  	/* Number of instructions pending to be posted to OCTEON TX2. */
>  	uint32_t fill_cnt;
> 
> +	/* Statistics for this input queue. */
> +	struct sdp_iq_stats stats;
> +
>  	/* DMA mapped base address of the input descriptor ring. */
>  	uint64_t base_addr_dma;
> 
> @@ -380,6 +394,8 @@ struct sdp_fn_list {
>  	void (*setup_oq_regs)(struct sdp_device *sdpvf, uint32_t q_no);
> 
>  	int (*setup_device_regs)(struct sdp_device *sdpvf);
> +	uint32_t (*update_iq_read_idx)(struct sdp_instr_queue *iq);
> +
>  	void (*enable_io_queues)(struct sdp_device *sdpvf);
>  	void (*disable_io_queues)(struct sdp_device *sdpvf);
> 
> @@ -458,4 +474,8 @@ struct sdp_device {
>  int sdp_setup_oqs(struct sdp_device *sdpvf, uint32_t oq_no);
>  int sdp_delete_oqs(struct sdp_device *sdpvf, uint32_t oq_no);
> 
> +int sdp_rawdev_enqueue(struct rte_rawdev *dev, struct rte_rawdev_buf
> **buffers,
> +		       unsigned int count, rte_rawdev_obj_t context);
> +
> +
>  #endif /* _OTX2_EP_RAWDEV_H_ */
> diff --git a/drivers/raw/octeontx2_ep/otx2_ep_vf.c
> b/drivers/raw/octeontx2_ep/otx2_ep_vf.c
> index 8e79fe8..c5c0bc3 100644
> --- a/drivers/raw/octeontx2_ep/otx2_ep_vf.c
> +++ b/drivers/raw/octeontx2_ep/otx2_ep_vf.c
> @@ -409,6 +409,28 @@
>  		sdp_vf_disable_oq(sdpvf, q_no);
>  }
> 
> +static uint32_t
> +sdp_vf_update_read_index(struct sdp_instr_queue *iq)
> +{
> +	uint32_t new_idx = rte_read32(iq->inst_cnt_reg);
> +
> +	/* The new instr cnt reg is a 32-bit counter that can roll over.
> +	 * We have noted the counter's initial value at init time into
> +	 * reset_instr_cnt
> +	 */
> +	if (iq->reset_instr_cnt < new_idx)
> +		new_idx -= iq->reset_instr_cnt;
> +	else
> +		new_idx += (0xffffffff - iq->reset_instr_cnt) + 1;
> +
> +	/* Modulo of the new index with the IQ size will give us
> +	 * the new index.
> +	 */
> +	new_idx %= iq->nb_desc;
> +
> +	return new_idx;
> +}
> +
>  int
>  sdp_vf_setup_device(struct sdp_device *sdpvf)
>  {
> @@ -436,6 +458,8 @@
>  	sdpvf->fn_list.setup_oq_regs       = sdp_vf_setup_oq_regs;
> 
>  	sdpvf->fn_list.setup_device_regs   = sdp_vf_setup_device_regs;
> +	sdpvf->fn_list.update_iq_read_idx  = sdp_vf_update_read_index;
> +
>  	sdpvf->fn_list.enable_io_queues    = sdp_vf_enable_io_queues;
>  	sdpvf->fn_list.disable_io_queues   = sdp_vf_disable_io_queues;
> 
> --
> 1.8.3.1



More information about the dev mailing list