[dpdk-dev] [PATCH v5 3/4] vhost: support async dequeue for split ring

Maxime Coquelin maxime.coquelin at redhat.com
Tue Jul 13 16:30:11 CEST 2021



On 7/5/21 8:11 PM, Wenwu Ma wrote:
> From: Yuan Wang <yuanx.wang at intel.com>
> 
> This patch implements the asynchronous dequeue data path for split ring.
> A new asynchronous dequeue function is introduced. With this function,
> the application can try to receive packets from the guest while
> offloading large copies to the DMA engine, thus saving precious CPU
> cycles.
> 
> Signed-off-by: Yuan Wang <yuanx.wang at intel.com>
> Signed-off-by: Jiayu Hu <jiayu.hu at intel.com>
> Signed-off-by: Wenwu Ma <wenwux.ma at intel.com>
> ---
>  doc/guides/prog_guide/vhost_lib.rst |  10 +
>  lib/vhost/rte_vhost_async.h         |  44 +-
>  lib/vhost/version.map               |   3 +
>  lib/vhost/virtio_net.c              | 601 ++++++++++++++++++++++++++++
>  4 files changed, 655 insertions(+), 3 deletions(-)
> 
> diff --git a/doc/guides/prog_guide/vhost_lib.rst b/doc/guides/prog_guide/vhost_lib.rst
> index d18fb98910..05c42c9b11 100644
> --- a/doc/guides/prog_guide/vhost_lib.rst
> +++ b/doc/guides/prog_guide/vhost_lib.rst
> @@ -281,6 +281,16 @@ The following is an overview of some key Vhost API functions:
>    Poll enqueue completion status from async data path. Completed packets
>    are returned to applications through ``pkts``.
>  
> +* ``rte_vhost_async_try_dequeue_burst(vid, queue_id, mbuf_pool, pkts, count, nr_inflight)``
> +
> +  Try to receive packets from the guest while offloading large packets
> +  to the DMA engine. Successfully dequeued packets have completed their
> +  transfer and are returned in ``pkts``. Packets that have been sent by
> +  the guest but are still being transferred by the DMA engine are called
> +  in-flight packets; this function only returns them once the DMA engine
> +  has finished the transfer. The current number of in-flight packets is
> +  returned in ``nr_inflight``.
> +
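As a usage illustration, the application side would look something like the
untested sketch below (it assumes an async channel is already registered on
the virtqueue, and that BURST_SZ, vid, queue_id and mbuf_pool come from the
application):

	#define BURST_SZ 32

	struct rte_mbuf *pkts[BURST_SZ];
	int nr_inflight;
	uint16_t i, nr_rx;

	/* Completed copies are returned in pkts[]; packets still being
	 * copied by the DMA engine are only counted in nr_inflight and
	 * will be returned by a later call, once their copies complete.
	 */
	nr_rx = rte_vhost_async_try_dequeue_burst(vid, queue_id, mbuf_pool,
			pkts, BURST_SZ, &nr_inflight);

	for (i = 0; i < nr_rx; i++)
		rte_pktmbuf_free(pkts[i]);	/* or forward the mbuf */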
>  Vhost-user Implementations
>  --------------------------
>  
> diff --git a/lib/vhost/rte_vhost_async.h b/lib/vhost/rte_vhost_async.h
> index 6faa31f5ad..58019408f1 100644
> --- a/lib/vhost/rte_vhost_async.h
> +++ b/lib/vhost/rte_vhost_async.h
> @@ -84,13 +84,21 @@ struct rte_vhost_async_channel_ops {
>  };
>  
>  /**
> - * inflight async packet information
> + * in-flight async packet information
>   */
> +struct async_nethdr {

Please prefix with rte_vhost_

> +	struct virtio_net_hdr hdr;
> +	bool valid;
> +};
> +
>  struct async_inflight_info {
>  	struct rte_mbuf *mbuf;
> -	uint16_t descs; /* num of descs inflight */
> +	union {
> +		uint16_t descs; /* num of descs in-flight */
> +		struct async_nethdr nethdr;
> +	};
>  	uint16_t nr_buffers; /* num of buffers inflight for packed ring */
> -};
> +} __rte_cache_aligned;

Does it really need to be cache aligned?

>  
>  /**
>   *  dma channel feature bit definition
> @@ -193,4 +201,34 @@ __rte_experimental
>  uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
>  		struct rte_mbuf **pkts, uint16_t count);
>  
> +/**
> + * This function tries to receive packets from the guest while offloading
> + * large copies to the DMA engine. Successfully dequeued packets have
> + * completed their transfer, either by the CPU or the DMA engine, and are
> + * returned in "pkts". Packets that have been sent by the guest but are
> + * still being transferred by the DMA engine are called in-flight packets.
> + * The current number of in-flight packets is returned in "nr_inflight".
> + * This function only returns in-flight packets after the DMA engine has
> + * finished transferring them.

I am not sure I understand that comment. Is a packet still "in-flight"
once its DMA transfer has completed?

Does this way of working ensure that packets are not reordered?

> + *
> + * @param vid
> + *  id of the vhost device to dequeue data from
> + * @param queue_id
> + *  id of the virtqueue to dequeue data from
> + * @param pkts
> + *  empty array to store successfully dequeued packets
> + * @param count
> + *  size of the packet array
> + * @param nr_inflight
> + *  the number of in-flight packets at this point. If an error occurred,
> + *  its value is set to -1.
> + * @return
> + *  number of successfully dequeued packets
> + */
> +__rte_experimental
> +uint16_t
> +rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
> +	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
> +	int *nr_inflight);
> +
>  #endif /* _RTE_VHOST_ASYNC_H_ */
> diff --git a/lib/vhost/version.map b/lib/vhost/version.map
> index 9103a23cd4..a320f889cd 100644
> --- a/lib/vhost/version.map
> +++ b/lib/vhost/version.map
> @@ -79,4 +79,7 @@ EXPERIMENTAL {
>  
>  	# added in 21.05
>  	rte_vhost_get_negotiated_protocol_features;
> +
> +	# added in 21.08
> +	rte_vhost_async_try_dequeue_burst;
>  };
> diff --git a/lib/vhost/virtio_net.c b/lib/vhost/virtio_net.c
> index b93482587c..52237e8600 100644
> --- a/lib/vhost/virtio_net.c
> +++ b/lib/vhost/virtio_net.c
> @@ -2673,6 +2673,32 @@ virtio_dev_pktmbuf_prep(struct virtio_net *dev, struct rte_mbuf *pkt,
>  	return -1;
>  }
>  
> +/*
> + * Allocate a host-supported pktmbuf.
> + */
> +static __rte_always_inline struct rte_mbuf *
> +virtio_dev_pktmbuf_alloc(struct virtio_net *dev, struct rte_mempool *mp,
> +			 uint32_t data_len)
> +{
> +	struct rte_mbuf *pkt = rte_pktmbuf_alloc(mp);
> +
> +	if (unlikely(pkt == NULL)) {
> +		VHOST_LOG_DATA(ERR,
> +			"Failed to allocate memory for mbuf.\n");
> +		return NULL;
> +	}
> +
> +	if (virtio_dev_pktmbuf_prep(dev, pkt, data_len)) {
> +		/* Data doesn't fit into the buffer and the host supports
> +		 * only linear buffers
> +		 */
> +		rte_pktmbuf_free(pkt);
> +		return NULL;
> +	}
> +
> +	return pkt;
> +}
> +

I think you should be able to use rte_pktmbuf_alloc_bulk and
virtio_dev_pktmbuf_prep instead of re-introducing the function that was
removed by Balazs. It should help perf a bit.
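Something along these lines, for instance (an untested sketch only, reusing
count, buf_len, dev and mbuf_pool from the existing dequeue loop, and keeping
error handling minimal):

	struct rte_mbuf *pkts_prealloc[MAX_PKT_BURST];
	uint16_t i;

	/* bulk-allocate the mbufs up front instead of one by one */
	if (unlikely(rte_pktmbuf_alloc_bulk(mbuf_pool, pkts_prealloc, count)))
		goto out;

	for (i = 0; i < count; i++) {
		struct rte_mbuf *pkt = pkts_prealloc[i];

		/* only prepare/resize the mbuf here, with buf_len coming
		 * from fill_vec_buf_split() as in the current loop
		 */
		if (unlikely(virtio_dev_pktmbuf_prep(dev, pkt, buf_len)))
			break;

		/* descriptor-to-mbuf copy as in the current loop */
	}

	/* free the pre-allocated mbufs that were not consumed */
	while (i < count)
		rte_pktmbuf_free(pkts_prealloc[i++]);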

>  __rte_always_inline
>  static uint16_t
>  virtio_dev_tx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
> @@ -3147,3 +3173,578 @@ rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
>  
>  	return count;
>  }
> +
> +static __rte_always_inline int
> +async_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
> +		  struct buf_vector *buf_vec, uint16_t nr_vec,
> +		  struct rte_mbuf *m, struct rte_mempool *mbuf_pool,
> +		  struct iovec *src_iovec, struct iovec *dst_iovec,
> +		  struct rte_vhost_iov_iter *src_it,
> +		  struct rte_vhost_iov_iter *dst_it,
> +		  struct async_nethdr *nethdr,
> +		  bool legacy_ol_flags)
> +{
> +	uint64_t buf_addr, buf_iova;
> +	uint64_t mapped_len;
> +	uint32_t tlen = 0;
> +	uint32_t buf_avail, buf_offset, buf_len;
> +	uint32_t mbuf_avail, mbuf_offset;
> +	uint32_t cpy_len, cpy_threshold;
> +	/* A counter to avoid a dead loop in the desc chain */
> +	uint16_t vec_idx = 0;
> +	int tvec_idx = 0;
> +	struct rte_mbuf *cur = m, *prev = m;
> +	struct virtio_net_hdr tmp_hdr;
> +	struct virtio_net_hdr *hdr = NULL;
> +	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
> +
> +	buf_addr = buf_vec[vec_idx].buf_addr;
> +	buf_len = buf_vec[vec_idx].buf_len;
> +	buf_iova = buf_vec[vec_idx].buf_iova;
> +
> +	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1))
> +		return -1;
> +
> +	cpy_threshold = vq->async_threshold;
> +
> +	if (virtio_net_with_host_offload(dev)) {
> +		if (unlikely(buf_len < sizeof(struct virtio_net_hdr))) {
> +			/*
> +			 * No luck, the virtio-net header doesn't fit
> +			 * in a contiguous virtual area.
> +			 */
> +			copy_vnet_hdr_from_desc(&tmp_hdr, buf_vec);
> +			hdr = &tmp_hdr;
> +		} else {
> +			hdr = (struct virtio_net_hdr *)((uintptr_t)buf_addr);
> +		}
> +	}
> +
> +	/*
> +	 * A virtio driver normally uses at least 2 desc buffers
> +	 * for Tx: the first for storing the header, and others
> +	 * for storing the data.
> +	 */
> +	if (unlikely(buf_len < dev->vhost_hlen)) {
> +		buf_offset = dev->vhost_hlen - buf_len;
> +		vec_idx++;
> +		buf_addr = buf_vec[vec_idx].buf_addr;
> +		buf_len = buf_vec[vec_idx].buf_len;
> +		buf_avail  = buf_len - buf_offset;
> +	} else if (buf_len == dev->vhost_hlen) {
> +		if (unlikely(++vec_idx >= nr_vec))
> +			return -1;
> +		buf_addr = buf_vec[vec_idx].buf_addr;
> +		buf_len = buf_vec[vec_idx].buf_len;
> +
> +		buf_offset = 0;
> +		buf_avail = buf_len;
> +	} else {
> +		buf_offset = dev->vhost_hlen;
> +		buf_avail = buf_vec[vec_idx].buf_len - dev->vhost_hlen;
> +	}
> +
> +	PRINT_PACKET(dev, (uintptr_t)(buf_addr + buf_offset),
> +			(uint32_t)buf_avail, 0);
> +
> +	mbuf_offset = 0;
> +	mbuf_avail  = m->buf_len - RTE_PKTMBUF_HEADROOM;
> +	while (1) {
> +		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
> +
> +		while (cpy_len && cpy_len >= cpy_threshold) {
> +			void *hpa = (void *)(uintptr_t)gpa_to_first_hpa(dev,
> +						buf_iova + buf_offset, cpy_len,
> +						&mapped_len);
> +
> +			if (unlikely(!hpa || mapped_len < cpy_threshold))
> +				break;
> +
> +			async_fill_vec(src_iovec + tvec_idx, hpa,
> +				(size_t)mapped_len);
> +			async_fill_vec(dst_iovec + tvec_idx,
> +				(void *)(uintptr_t)rte_pktmbuf_iova_offset(cur,
> +							mbuf_offset),
> +				(size_t)mapped_len);
> +
> +			tvec_idx++;
> +			tlen += (uint32_t)mapped_len;
> +			cpy_len -= (uint32_t)mapped_len;
> +			mbuf_avail -= (uint32_t)mapped_len;
> +			mbuf_offset += (uint32_t)mapped_len;
> +			buf_avail -= (uint32_t)mapped_len;
> +			buf_offset += (uint32_t)mapped_len;
> +		}
> +
> +		if (cpy_len) {
> +			if (vq->batch_copy_nb_elems >= vq->size ||
> +				(hdr && cur == m)) {
> +				rte_memcpy(
> +					rte_pktmbuf_mtod_offset(cur, void *,
> +							mbuf_offset),
> +					(void *)((uintptr_t)(buf_addr +
> +								buf_offset)),
> +					cpy_len);
> +			} else {
> +				batch_copy[vq->batch_copy_nb_elems].dst =
> +					rte_pktmbuf_mtod_offset(cur, void *,
> +							mbuf_offset);
> +				batch_copy[vq->batch_copy_nb_elems].src =
> +					(void *)((uintptr_t)(buf_addr +
> +								buf_offset));
> +				batch_copy[vq->batch_copy_nb_elems].len =
> +					cpy_len;
> +				vq->batch_copy_nb_elems++;
> +			}
> +
> +			mbuf_avail  -= cpy_len;
> +			mbuf_offset += cpy_len;
> +			buf_avail -= cpy_len;
> +			buf_offset += cpy_len;
> +		}
> +
> +		/* This buf has reached its end, get the next one */
> +		if (buf_avail == 0) {
> +			if (++vec_idx >= nr_vec)
> +				break;
> +
> +			buf_addr = buf_vec[vec_idx].buf_addr;
> +			buf_len = buf_vec[vec_idx].buf_len;
> +
> +			buf_offset = 0;
> +			buf_avail = buf_len;
> +
> +			PRINT_PACKET(dev, (uintptr_t)buf_addr,
> +					(uint32_t)buf_avail, 0);
> +		}
> +
> +		/*
> +		 * This mbuf has reached its end, get a new one
> +		 * to hold more data.
> +		 */
> +		if (mbuf_avail == 0) {
> +			cur = rte_pktmbuf_alloc(mbuf_pool);
> +			if (unlikely(cur == NULL)) {
> +				VHOST_LOG_DATA(ERR, "Failed to "
> +					"allocate memory for mbuf.\n");
> +				return -1;
> +			}
> +
> +			prev->next = cur;
> +			prev->data_len = mbuf_offset;
> +			m->nb_segs += 1;
> +			m->pkt_len += mbuf_offset;
> +			prev = cur;
> +
> +			mbuf_offset = 0;
> +			mbuf_avail = cur->buf_len - RTE_PKTMBUF_HEADROOM;
> +		}
> +	}
> +
> +	prev->data_len = mbuf_offset;
> +	m->pkt_len += mbuf_offset;
> +
> +	if (hdr && tlen) {
> +		nethdr->valid = true;
> +		nethdr->hdr = *hdr;
> +	} else if (hdr)
> +		vhost_dequeue_offload(hdr, m, legacy_ol_flags);
> +
> +	if (tlen) {
> +		async_fill_iter(src_it, tlen, src_iovec, tvec_idx);
> +		async_fill_iter(dst_it, tlen, dst_iovec, tvec_idx);
> +	} else
> +		src_it->count = 0;
> +
> +	return 0;
> +}
> +
> +static __rte_always_inline uint16_t
> +async_poll_dequeue_completed_split(struct virtio_net *dev,
> +		struct vhost_virtqueue *vq, uint16_t queue_id,
> +		struct rte_mbuf **pkts, uint16_t count, bool legacy_ol_flags)
> +{
> +	uint16_t n_pkts_cpl = 0, n_pkts_put = 0;
> +	uint16_t start_idx, pkt_idx, from;
> +	struct async_inflight_info *pkts_info;
> +
> +	pkt_idx = vq->async_pkts_idx & (vq->size - 1);
> +	pkts_info = vq->async_pkts_info;
> +	start_idx = virtio_dev_rx_async_get_info_idx(pkt_idx, vq->size,
> +			vq->async_pkts_inflight_n);
> +
> +	if (count > vq->async_last_pkts_n) {
> +		n_pkts_cpl = vq->async_ops.check_completed_copies(dev->vid,
> +			queue_id, 0, count - vq->async_last_pkts_n);
> +	}
> +
> +	n_pkts_cpl += vq->async_last_pkts_n;
> +	if (unlikely(n_pkts_cpl == 0))
> +		return 0;
> +
> +	n_pkts_put = RTE_MIN(count, n_pkts_cpl);
> +
> +	for (pkt_idx = 0; pkt_idx < n_pkts_put; pkt_idx++) {
> +		from = (start_idx + pkt_idx) & (vq->size - 1);
> +		pkts[pkt_idx] = pkts_info[from].mbuf;
> +
> +		if (pkts_info[from].nethdr.valid) {
> +			vhost_dequeue_offload(&pkts_info[from].nethdr.hdr,
> +					pkts[pkt_idx], legacy_ol_flags);
> +		}
> +	}
> +	vq->async_last_pkts_n = n_pkts_cpl - n_pkts_put;
> +
> +	if (n_pkts_put) {
> +		/* write back completed descs to used ring */
> +		write_back_completed_descs_split(vq, n_pkts_put);
> +		/* update used ring */
> +		__atomic_add_fetch(&vq->used->idx,
> +				n_pkts_put, __ATOMIC_RELEASE);
> +
> +		vq->async_pkts_inflight_n -= n_pkts_put;
> +	}
> +
> +	return n_pkts_put;
> +}
> +
> +static __rte_always_inline uint16_t
> +virtio_dev_tx_async_split(struct virtio_net *dev,
> +		struct vhost_virtqueue *vq, uint16_t queue_id,
> +		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
> +		uint16_t count, bool legacy_ol_flags)
> +{
> +	static bool allocerr_warned;
> +	uint16_t pkt_idx;
> +	uint16_t free_entries;
> +	uint16_t slot_idx = 0;
> +	uint16_t segs_await = 0;
> +	uint16_t nr_done_pkts = 0, nr_async_pkts = 0, nr_async_cmpl_pkts = 0;
> +	uint16_t nr_async_burst = 0;
> +	uint16_t pkt_err = 0;
> +	uint16_t iovec_idx = 0, it_idx = 0;
> +
> +	struct rte_vhost_iov_iter *it_pool = vq->it_pool;
> +	struct iovec *vec_pool = vq->vec_pool;
> +	struct iovec *src_iovec = vec_pool;
> +	struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
> +	struct rte_vhost_async_desc tdes[MAX_PKT_BURST];
> +	struct async_inflight_info *pkts_info = vq->async_pkts_info;
> +
> +	struct async_pkt_index {
> +		uint16_t last_avail_idx;
> +	} async_pkts_log[MAX_PKT_BURST];
> +
> +	/**
> +	 * The ordering between avail index and
> +	 * desc reads needs to be enforced.
> +	 */
> +	free_entries = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE) -
> +			vq->last_avail_idx;
> +	if (free_entries == 0)
> +		goto out;
> +
> +	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
> +
> +	count = RTE_MIN(count, MAX_PKT_BURST);
> +	count = RTE_MIN(count, free_entries);
> +	VHOST_LOG_DATA(DEBUG, "(%d) about to dequeue %u buffers\n",
> +			dev->vid, count);
> +
> +	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
> +		uint16_t head_idx = 0;
> +		uint16_t nr_vec = 0;
> +		uint32_t buf_len;
> +		int err;
> +		struct buf_vector buf_vec[BUF_VECTOR_MAX];
> +		struct rte_mbuf *pkt;
> +
> +		if (unlikely(fill_vec_buf_split(dev, vq, vq->last_avail_idx,
> +						&nr_vec, buf_vec,
> +						&head_idx, &buf_len,
> +						VHOST_ACCESS_RO) < 0))
> +			break;
> +
> +		pkt = virtio_dev_pktmbuf_alloc(dev, mbuf_pool, buf_len);
> +		if (unlikely(pkt == NULL)) {
> +			/**
> +			 * mbuf allocation fails for jumbo packets when external
> +			 * buffer allocation is not allowed and linear buffer
> +			 * is required. Drop this packet.
> +			 */
> +			if (!allocerr_warned) {
> +				VHOST_LOG_DATA(ERR,
> +					"Failed mbuf alloc of size %d from %s on %s.\n",
> +					buf_len, mbuf_pool->name, dev->ifname);
> +				allocerr_warned = true;
> +			}
> +			break;
> +		}
> +
> +		slot_idx = (vq->async_pkts_idx + nr_async_pkts) &
> +				(vq->size - 1);
> +		err = async_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkt,
> +				mbuf_pool, &src_iovec[iovec_idx],
> +				&dst_iovec[iovec_idx], &it_pool[it_idx],
> +				&it_pool[it_idx + 1],
> +				&pkts_info[slot_idx].nethdr, legacy_ol_flags);
> +		if (unlikely(err)) {
> +			rte_pktmbuf_free(pkt);
> +			if (!allocerr_warned) {
> +				VHOST_LOG_DATA(ERR,
> +					"Failed to copy desc to mbuf on %s.\n",
> +					dev->ifname);
> +				allocerr_warned = true;
> +			}
> +			break;
> +		}
> +
> +		if (it_pool[it_idx].count) {
> +			uint16_t to = vq->async_desc_idx_split & (vq->size - 1);
> +
> +			async_fill_desc(&tdes[nr_async_burst], &it_pool[it_idx],
> +				&it_pool[it_idx + 1]);
> +			pkts_info[slot_idx].mbuf = pkt;
> +			async_pkts_log[nr_async_pkts++].last_avail_idx =
> +				vq->last_avail_idx;
> +			nr_async_burst++;
> +			iovec_idx += it_pool[it_idx].nr_segs;
> +			it_idx += 2;
> +			segs_await += it_pool[it_idx].nr_segs;
> +
> +			/* keep used desc */
> +			vq->async_descs_split[to].id = head_idx;
> +			vq->async_descs_split[to].len = 0;
> +			vq->async_desc_idx_split++;
> +		} else {
> +			update_shadow_used_ring_split(vq, head_idx, 0);
> +			pkts[nr_done_pkts++] = pkt;
> +		}
> +
> +		vq->last_avail_idx++;
> +
> +		if (unlikely((nr_async_burst >= VHOST_ASYNC_BATCH_THRESHOLD) ||
> +					((VHOST_MAX_ASYNC_VEC >> 1) -
> +					 segs_await < BUF_VECTOR_MAX))) {
> +			uint16_t nr_pkts;
> +
> +			nr_pkts = vq->async_ops.transfer_data(dev->vid,
> +					queue_id, tdes, 0, nr_async_burst);
> +			src_iovec = vec_pool;
> +			dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
> +			it_idx = 0;
> +			segs_await = 0;
> +			vq->async_pkts_inflight_n += nr_pkts;
> +
> +			if (unlikely(nr_pkts < nr_async_burst)) {
> +				pkt_err = nr_async_burst - nr_pkts;
> +				nr_async_burst = 0;
> +				break;
> +			}
> +			nr_async_burst = 0;
> +		}
> +	}
> +
> +	if (nr_async_burst) {
> +		uint32_t nr_pkts;
> +
> +		nr_pkts = vq->async_ops.transfer_data(dev->vid, queue_id,
> +				tdes, 0, nr_async_burst);
> +		vq->async_pkts_inflight_n += nr_pkts;
> +
> +		if (unlikely(nr_pkts < nr_async_burst))
> +			pkt_err = nr_async_burst - nr_pkts;
> +	}
> +
> +	do_data_copy_dequeue(vq);
> +
> +	if (unlikely(pkt_err)) {
> +		uint16_t nr_err_dma = pkt_err;
> +		uint16_t nr_err_sw;
> +
> +		nr_async_pkts -= nr_err_dma;
> +
> +		/**
> +		 * revert shadow used ring and free pktmbufs for
> +		 * CPU-copied pkts after the first DMA-error pkt.
> +		 */
> +		nr_err_sw = vq->last_avail_idx -
> +			async_pkts_log[nr_async_pkts].last_avail_idx -
> +			nr_err_dma;
> +		vq->shadow_used_idx -= nr_err_sw;
> +		while (nr_err_sw-- > 0)
> +			rte_pktmbuf_free(pkts[--nr_done_pkts]);
> +
> +		/**
> +		 * recover DMA-copy related structures and free pktmbufs
> +		 * for DMA-error pkts.
> +		 */
> +		vq->async_desc_idx_split -= nr_err_dma;
> +		while (nr_err_dma-- > 0) {
> +			rte_pktmbuf_free(
> +				pkts_info[slot_idx & (vq->size - 1)].mbuf);
> +			slot_idx--;
> +		}
> +
> +		/* recover available ring */
> +		vq->last_avail_idx =
> +			async_pkts_log[nr_async_pkts].last_avail_idx;
> +	}
> +
> +	vq->async_pkts_idx += nr_async_pkts;
> +
> +	if (likely(vq->shadow_used_idx))
> +		flush_shadow_used_ring_split(dev, vq);
> +
> +out:
> +	if (nr_done_pkts < count && vq->async_pkts_inflight_n > 0) {
> +		nr_async_cmpl_pkts = async_poll_dequeue_completed_split(dev, vq,
> +					queue_id, &pkts[nr_done_pkts],
> +					count - nr_done_pkts,
> +					legacy_ol_flags);
> +		nr_done_pkts += nr_async_cmpl_pkts;
> +	}
> +	if (likely(nr_done_pkts))
> +		vhost_vring_call_split(dev, vq);
> +
> +	return nr_done_pkts;
> +}
> +
> +__rte_noinline
> +static uint16_t
> +virtio_dev_tx_async_split_legacy(struct virtio_net *dev,
> +		struct vhost_virtqueue *vq, uint16_t queue_id,
> +		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
> +		uint16_t count)
> +{
> +	return virtio_dev_tx_async_split(dev, vq, queue_id, mbuf_pool,
> +				pkts, count, true);

I think we don't need to support legacy offload.
It may be better to have the Vhost example support the compliant way
instead, what do you think?

> +}
> +
> +__rte_noinline
> +static uint16_t
> +virtio_dev_tx_async_split_compliant(struct virtio_net *dev,
> +		struct vhost_virtqueue *vq, uint16_t queue_id,
> +		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
> +		uint16_t count)
> +{
> +	return virtio_dev_tx_async_split(dev, vq, queue_id, mbuf_pool,
> +				pkts, count, false);
> +}
> +
> +uint16_t
> +rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
> +	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
> +	int *nr_inflight)
> +{
> +	struct virtio_net *dev;
> +	struct rte_mbuf *rarp_mbuf = NULL;
> +	struct vhost_virtqueue *vq;
> +	int16_t success = 1;
> +
> +	*nr_inflight = -1;
> +
> +	dev = get_device(vid);
> +	if (!dev)
> +		return 0;
> +
> +	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
> +		VHOST_LOG_DATA(ERR,
> +			"(%d) %s: built-in vhost net backend is disabled.\n",
> +			dev->vid, __func__);
> +		return 0;
> +	}
> +
> +	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
> +		VHOST_LOG_DATA(ERR,
> +			"(%d) %s: invalid virtqueue idx %d.\n",
> +			dev->vid, __func__, queue_id);
> +		return 0;
> +	}
> +
> +	vq = dev->virtqueue[queue_id];
> +
> +	if (unlikely(rte_spinlock_trylock(&vq->access_lock) == 0))
> +		return 0;
> +
> +	if (unlikely(vq->enabled == 0)) {
> +		count = 0;
> +		goto out_access_unlock;
> +	}
> +
> +	if (unlikely(!vq->async_registered)) {
> +		VHOST_LOG_DATA(ERR, "(%d) %s: async not registered for queue id %d.\n",
> +			dev->vid, __func__, queue_id);
> +		count = 0;
> +		goto out_access_unlock;
> +	}
> +
> +	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
> +		vhost_user_iotlb_rd_lock(vq);
> +
> +	if (unlikely(vq->access_ok == 0))
> +		if (unlikely(vring_translate(dev, vq) < 0)) {
> +			count = 0;
> +			goto out_access_unlock;
> +		}
> +
> +	/*
> +	 * Construct a RARP broadcast packet and inject it into the "pkts"
> +	 * array, to make it look like the guest actually sent such a packet.
> +	 *
> +	 * Check user_send_rarp() for more information.
> +	 *
> +	 * broadcast_rarp shares a cacheline in the virtio_net structure
> +	 * with some fields that are accessed during enqueue and
> +	 * __atomic_compare_exchange_n causes a write if it performs the compare
> +	 * and exchange. This could result in false sharing between enqueue
> +	 * and dequeue.
> +	 *
> +	 * Prevent unnecessary false sharing by reading broadcast_rarp first
> +	 * and only performing compare and exchange if the read indicates it
> +	 * is likely to be set.
> +	 */
> +	if (unlikely(__atomic_load_n(&dev->broadcast_rarp, __ATOMIC_ACQUIRE) &&
> +			__atomic_compare_exchange_n(&dev->broadcast_rarp,
> +			&success, 0, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED))) {
> +
> +		rarp_mbuf = rte_net_make_rarp_packet(mbuf_pool, &dev->mac);
> +		if (rarp_mbuf == NULL) {
> +			VHOST_LOG_DATA(ERR, "Failed to make RARP packet.\n");
> +			count = 0;
> +			goto out;
> +		}
> +		count -= 1;
> +	}
> +
> +	if (unlikely(vq_is_packed(dev)))
> +		return 0;
> +
> +	if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
> +		count = virtio_dev_tx_async_split_legacy(dev, vq, queue_id,
> +				mbuf_pool, pkts, count);
> +	else
> +		count = virtio_dev_tx_async_split_compliant(dev, vq, queue_id,
> +				mbuf_pool, pkts, count);
> +
> +out:
> +	*nr_inflight = vq->async_pkts_inflight_n;
> +
> +	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
> +		vhost_user_iotlb_rd_unlock(vq);
> +
> +out_access_unlock:
> +	rte_spinlock_unlock(&vq->access_lock);
> +
> +	if (unlikely(rarp_mbuf != NULL)) {
> +		/*
> +		 * Inject it at the head of the "pkts" array, so that the
> +		 * switch's MAC learning table gets updated first.
> +		 */
> +		memmove(&pkts[1], pkts, count * sizeof(struct rte_mbuf *));
> +		pkts[0] = rarp_mbuf;
> +		count += 1;
> +	}
> +
> +	return count;
> +}
> 


