[dpdk-dev] [PATCH v3 1/4] vhost: simplify async copy completion

Fu, Patrick patrick.fu at intel.com
Fri Oct 9 13:16:13 CEST 2020


> > On 9/29/20 11:29 AM, Patrick Fu wrote:
> > Current async ops allow the check_completed_copies() callback to return
> > an arbitrary number of async iov segments finished by backend async
> > devices. This design creates complexity for vhost to handle breaking
> > transfer of a single packet (i.e. transfer completes in the middle
> > of an async descriptor) and prevents application callbacks from
> > leveraging hardware capability to offload the work. Thus, this patch
> > enforces the check_completed_copies() callback to return the number
> > of async memory descriptors, which is aligned with async transfer
> > data ops callbacks. The vhost async data path is revised to work with
> > the new ops definition, which provides clean and simplified processing.
> >
> > Signed-off-by: Patrick Fu <patrick.fu at intel.com>
> > ---
> >  lib/librte_vhost/rte_vhost_async.h |  15 ++-
> >  lib/librte_vhost/vhost.c           |  20 ++--
> >  lib/librte_vhost/vhost.h           |   8 +-
> >  lib/librte_vhost/vhost_user.c      |   6 +-
> >  lib/librte_vhost/virtio_net.c      | 151 ++++++++++++-----------------
> >  5 files changed, 90 insertions(+), 110 deletions(-)
> >
> > diff --git a/lib/librte_vhost/rte_vhost_async.h
> b/lib/librte_vhost/rte_vhost_async.h
> > index cb6284539..c73bd7c99 100644
> > --- a/lib/librte_vhost/rte_vhost_async.h
> > +++ b/lib/librte_vhost/rte_vhost_async.h
> > @@ -76,13 +76,26 @@ struct rte_vhost_async_channel_ops {
> >  	 * @param max_packets
> >  	 *  max number of packets could be completed
> >  	 * @return
> > -	 *  number of iov segments completed
> > +	 *  number of async descs completed
> >  	 */
> >  	uint32_t (*check_completed_copies)(int vid, uint16_t queue_id,
> >  		struct rte_vhost_async_status *opaque_data,
> >  		uint16_t max_packets);
> >  };
> >
> > +/**
> > + * inflight async packet information
> > + */
> > +struct async_inflight_info {
> > +	union {
> > +		uint32_t info;
> > +		struct {
> > +			uint16_t descs; /* num of descs inflight */
> > +			uint16_t segs; /* iov segs inflight */
> > +		};
> > +	};
> 
> I think you can remove the union.
> 
There are some cases in the code where we want to set descs=N and segs=0. In such cases, writing info=N directly is faster than setting descs and segs separately, so I would prefer to keep the union.

> > +};
> > +
> >  /**
> >   *  dma channel feature bit definition
> >   */
> > diff --git a/lib/librte_vhost/vhost.c b/lib/librte_vhost/vhost.c
> > index 8f20a0818..eca507836 100644
> > --- a/lib/librte_vhost/vhost.c
> > +++ b/lib/librte_vhost/vhost.c
> > @@ -333,8 +333,8 @@ free_vq(struct virtio_net *dev, struct
> vhost_virtqueue *vq)
> >  		rte_free(vq->shadow_used_split);
> >  		if (vq->async_pkts_pending)
> >  			rte_free(vq->async_pkts_pending);
> > -		if (vq->async_pending_info)
> > -			rte_free(vq->async_pending_info);
> > +		if (vq->async_pkts_info)
> > +			rte_free(vq->async_pkts_info);
> >  	}
> >  	rte_free(vq->batch_copy_elems);
> >  	rte_mempool_free(vq->iotlb_pool);
> > @@ -1573,15 +1573,15 @@ int rte_vhost_async_channel_register(int vid,
> uint16_t queue_id,
> >  	vq->async_pkts_pending = rte_malloc(NULL,
> >  			vq->size * sizeof(uintptr_t),
> >  			RTE_CACHE_LINE_SIZE);
> > -	vq->async_pending_info = rte_malloc(NULL,
> > -			vq->size * sizeof(uint64_t),
> > +	vq->async_pkts_info = rte_malloc(NULL,
> > +			vq->size * sizeof(struct async_inflight_info),
> >  			RTE_CACHE_LINE_SIZE);
> > -	if (!vq->async_pkts_pending || !vq->async_pending_info) {
> > +	if (!vq->async_pkts_pending || !vq->async_pkts_info) {
> >  		if (vq->async_pkts_pending)
> >  			rte_free(vq->async_pkts_pending);
> >
> > -		if (vq->async_pending_info)
> > -			rte_free(vq->async_pending_info);
> > +		if (vq->async_pkts_info)
> > +			rte_free(vq->async_pkts_info);
> >
> >  		VHOST_LOG_CONFIG(ERR,
> >  				"async register failed: cannot allocate
> memory for vq data "
> > @@ -1635,9 +1635,9 @@ int rte_vhost_async_channel_unregister(int vid,
> uint16_t queue_id)
> >  		vq->async_pkts_pending = NULL;
> >  	}
> >
> > -	if (vq->async_pending_info) {
> > -		rte_free(vq->async_pending_info);
> > -		vq->async_pending_info = NULL;
> > +	if (vq->async_pkts_info) {
> > +		rte_free(vq->async_pkts_info);
> > +		vq->async_pkts_info = NULL;
> >  	}
> >
> >  	vq->async_ops.transfer_data = NULL;
> > diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
> > index 73a1ed889..155a832c1 100644
> > --- a/lib/librte_vhost/vhost.h
> > +++ b/lib/librte_vhost/vhost.h
> > @@ -46,8 +46,6 @@
> >
> >  #define MAX_PKT_BURST 32
> >
> > -#define ASYNC_MAX_POLL_SEG 255
> > -
> >  #define VHOST_MAX_ASYNC_IT (MAX_PKT_BURST * 2)
> >  #define VHOST_MAX_ASYNC_VEC (BUF_VECTOR_MAX * 2)
> >
> > @@ -225,12 +223,10 @@ struct vhost_virtqueue {
> >
> >  	/* async data transfer status */
> >  	uintptr_t	**async_pkts_pending;
> > -	#define		ASYNC_PENDING_INFO_N_MSK 0xFFFF
> > -	#define		ASYNC_PENDING_INFO_N_SFT 16
> > -	uint64_t	*async_pending_info;
> > +	struct async_inflight_info *async_pkts_info;
> >  	uint16_t	async_pkts_idx;
> >  	uint16_t	async_pkts_inflight_n;
> > -	uint16_t	async_last_seg_n;
> > +	uint16_t	async_last_pkts_n;
> >
> >  	/* vq async features */
> >  	bool		async_inorder;
> > diff --git a/lib/librte_vhost/vhost_user.c b/lib/librte_vhost/vhost_user.c
> > index 501218e19..67d2a2d43 100644
> > --- a/lib/librte_vhost/vhost_user.c
> > +++ b/lib/librte_vhost/vhost_user.c
> > @@ -2006,10 +2006,10 @@ vhost_user_get_vring_base(struct virtio_net
> **pdev,
> >  		vq->shadow_used_split = NULL;
> >  		if (vq->async_pkts_pending)
> >  			rte_free(vq->async_pkts_pending);
> > -		if (vq->async_pending_info)
> > -			rte_free(vq->async_pending_info);
> > +		if (vq->async_pkts_info)
> > +			rte_free(vq->async_pkts_info);
> >  		vq->async_pkts_pending = NULL;
> > -		vq->async_pending_info = NULL;
> > +		vq->async_pkts_info = NULL;
> >  	}
> >
> >  	rte_free(vq->batch_copy_elems);
> > diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
> > index bd9303c8a..68ead9a71 100644
> > --- a/lib/librte_vhost/virtio_net.c
> > +++ b/lib/librte_vhost/virtio_net.c
> > @@ -1473,37 +1473,6 @@ virtio_dev_rx_async_get_info_idx(uint16_t
> pkts_idx,
> >  		(vq_size - n_inflight + pkts_idx) & (vq_size - 1);
> >  }
> >
> > -static __rte_always_inline void
> > -virtio_dev_rx_async_submit_split_err(struct virtio_net *dev,
> > -	struct vhost_virtqueue *vq, uint16_t queue_id,
> > -	uint16_t last_idx, uint16_t shadow_idx)
> > -{
> > -	uint16_t start_idx, pkts_idx, vq_size;
> > -	uint64_t *async_pending_info;
> > -
> > -	pkts_idx = vq->async_pkts_idx;
> > -	async_pending_info = vq->async_pending_info;
> > -	vq_size = vq->size;
> > -	start_idx = virtio_dev_rx_async_get_info_idx(pkts_idx,
> > -		vq_size, vq->async_pkts_inflight_n);
> > -
> > -	while (likely((start_idx & (vq_size - 1)) != pkts_idx)) {
> > -		uint64_t n_seg =
> > -			async_pending_info[(start_idx) & (vq_size - 1)] >>
> > -			ASYNC_PENDING_INFO_N_SFT;
> > -
> > -		while (n_seg)
> > -			n_seg -= vq-
> >async_ops.check_completed_copies(dev->vid,
> > -				queue_id, 0, 1);
> > -	}
> > -
> > -	vq->async_pkts_inflight_n = 0;
> > -	vq->batch_copy_nb_elems = 0;
> > -
> > -	vq->shadow_used_idx = shadow_idx;
> > -	vq->last_avail_idx = last_idx;
> > -}
> > -
> >  static __rte_noinline uint32_t
> >  virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> >  	struct vhost_virtqueue *vq, uint16_t queue_id,
> > @@ -1512,7 +1481,7 @@ virtio_dev_rx_async_submit_split(struct
> virtio_net *dev,
> >  	uint32_t pkt_idx = 0, pkt_burst_idx = 0;
> >  	uint16_t num_buffers;
> >  	struct buf_vector buf_vec[BUF_VECTOR_MAX];
> > -	uint16_t avail_head, last_idx, shadow_idx;
> > +	uint16_t avail_head;
> >
> >  	struct rte_vhost_iov_iter *it_pool = vq->it_pool;
> >  	struct iovec *vec_pool = vq->vec_pool;
> > @@ -1522,11 +1491,11 @@ virtio_dev_rx_async_submit_split(struct
> virtio_net *dev,
> >  	struct rte_vhost_iov_iter *src_it = it_pool;
> >  	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
> >  	uint16_t n_free_slot, slot_idx;
> > +	uint16_t pkt_err = 0;
> > +	struct async_inflight_info *pkts_info = vq->async_pkts_info;
> >  	int n_pkts = 0;
> >
> >  	avail_head = __atomic_load_n(&vq->avail->idx,
> __ATOMIC_ACQUIRE);
> > -	last_idx = vq->last_avail_idx;
> > -	shadow_idx = vq->shadow_used_idx;
> >
> >  	/*
> >  	 * The ordering between avail index and
> > @@ -1565,14 +1534,14 @@ virtio_dev_rx_async_submit_split(struct
> virtio_net *dev,
> >  		if (src_it->count) {
> >  			async_fill_desc(&tdes[pkt_burst_idx], src_it, dst_it);
> >  			pkt_burst_idx++;
> > -			vq->async_pending_info[slot_idx] =
> > -				num_buffers | (src_it->nr_segs << 16);
> > +			pkts_info[slot_idx].descs = num_buffers;
> > +			pkts_info[slot_idx].segs = src_it->nr_segs;
> >  			src_iovec += src_it->nr_segs;
> >  			dst_iovec += dst_it->nr_segs;
> >  			src_it += 2;
> >  			dst_it += 2;
> >  		} else {
> > -			vq->async_pending_info[slot_idx] = num_buffers;
> > +			pkts_info[slot_idx].info = num_buffers;
> >  			vq->async_pkts_inflight_n++;
> >  		}
> >
> > @@ -1586,35 +1555,46 @@ virtio_dev_rx_async_submit_split(struct
> virtio_net *dev,
> >  			dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >>
> 1);
> >  			src_it = it_pool;
> >  			dst_it = it_pool + 1;
> > +			vq->async_pkts_inflight_n += n_pkts;
> >
> >  			if (unlikely(n_pkts < (int)pkt_burst_idx)) {
> > -				vq->async_pkts_inflight_n +=
> > -					n_pkts > 0 ? n_pkts : 0;
> > -				virtio_dev_rx_async_submit_split_err(dev,
> > -					vq, queue_id, last_idx, shadow_idx);
> > -				return 0;
> > +				/*
> > +				 * log error packets number here and do
> actual
> > +				 * error processing when applications poll
> > +				 * completion
> > +				 */
> > +				pkt_err = pkt_burst_idx - n_pkts;
> > +				pkt_burst_idx = 0;
> > +				break;
> >  			}
> >
> >  			pkt_burst_idx = 0;
> > -			vq->async_pkts_inflight_n += n_pkts;
> >  		}
> >  	}
> >
> >  	if (pkt_burst_idx) {
> >  		n_pkts = vq->async_ops.transfer_data(dev->vid,
> >  				queue_id, tdes, 0, pkt_burst_idx);
> > -		if (unlikely(n_pkts < (int)pkt_burst_idx)) {
> > -			vq->async_pkts_inflight_n += n_pkts > 0 ? n_pkts : 0;
> > -			virtio_dev_rx_async_submit_split_err(dev, vq,
> queue_id,
> > -				last_idx, shadow_idx);
> > -			return 0;
> > -		}
> > -
> >  		vq->async_pkts_inflight_n += n_pkts;
> > +
> > +		if (unlikely(n_pkts < (int)pkt_burst_idx))
> > +			pkt_err = pkt_burst_idx - n_pkts;
> >  	}
> >
> >  	do_data_copy_enqueue(dev, vq);
> >
> > +	if (unlikely(pkt_err)) {
> > +		do {
> > +			if (pkts_info[slot_idx].segs)
> > +				pkt_err--;
> > +			vq->last_avail_idx -= pkts_info[slot_idx].descs;
> > +			vq->shadow_used_idx -= pkts_info[slot_idx].descs;
> > +			vq->async_pkts_inflight_n--;
> > +			slot_idx--;
> > +			pkt_idx--;
> > +		} while (pkt_err);
> 
> That does not sound really robust.
> Maybe you could exit also on slot_idx < 0 to avoid out of range
> accesses?
> 
> } while (pkt_err && slot_idx >= 0)
>
slot_idx is an unsigned type and is designed to work like a ring index. But I do need to send a new patch to handle the case where "slot_idx == 0".
For robustness, I can add a check: "pkt_idx > 0".

Thanks,

Patrick



More information about the dev mailing list