[dpdk-dev] [PATCH v5 1/4] vhost: abstract and reorganize async split ring code

Jiang, Cheng1 cheng1.jiang at intel.com
Tue Apr 13 11:06:29 CEST 2021


Hi Maxime,

> -----Original Message-----
> From: Maxime Coquelin <maxime.coquelin at redhat.com>
> Sent: Tuesday, April 13, 2021 3:12 PM
> To: Jiang, Cheng1 <cheng1.jiang at intel.com>; Xia, Chenbo
> <chenbo.xia at intel.com>
> Cc: dev at dpdk.org; Hu, Jiayu <jiayu.hu at intel.com>; Yang, YvonneX
> <yvonnex.yang at intel.com>; Wang, Yinan <yinan.wang at intel.com>; Liu,
> Yong <yong.liu at intel.com>
> Subject: Re: [PATCH v5 1/4] vhost: abstract and reorganize async split ring
> code
> 
> Hi Cheng,
> 
> On 4/12/21 1:34 PM, Cheng Jiang wrote:
> > In order to improve code efficiency and readability when async packed
> > ring support is enabled. This patch abstract some functions like
> > shadow_ring_store and write_back_completed_descs_split. And improve
> > the efficiency of some pointer offset calculation.
> >
> > Signed-off-by: Cheng Jiang <Cheng1.jiang at intel.com>
> > ---
> >  lib/librte_vhost/virtio_net.c | 146
> > +++++++++++++++++++---------------
> >  1 file changed, 84 insertions(+), 62 deletions(-)
> >
> > diff --git a/lib/librte_vhost/virtio_net.c
> > b/lib/librte_vhost/virtio_net.c index ff3987860..c43ab0093 100644
> > --- a/lib/librte_vhost/virtio_net.c
> > +++ b/lib/librte_vhost/virtio_net.c
> > @@ -1458,6 +1458,29 @@ virtio_dev_rx_async_get_info_idx(uint16_t
> pkts_idx,
> >  		(vq_size - n_inflight + pkts_idx) & (vq_size - 1);  }
> >
> > +static __rte_always_inline void
> > +shadow_ring_store(struct vhost_virtqueue *vq,  void *shadow_ring, void
> *d_ring,
> > +		uint16_t s_idx, uint16_t d_idx,
> > +		uint16_t count, uint16_t elem_size) {
> > +	if (d_idx + count <= vq->size) {
> > +		rte_memcpy((void *)((uintptr_t)d_ring + d_idx * elem_size),
> > +			(void *)((uintptr_t)shadow_ring + s_idx * elem_size),
> > +			count * elem_size);
> > +	} else {
> > +		uint16_t size = vq->size - d_idx;
> > +
> > +		rte_memcpy((void *)((uintptr_t)d_ring + d_idx * elem_size),
> > +			(void *)((uintptr_t)shadow_ring + s_idx * elem_size),
> > +			size * elem_size);
> > +
> > +		rte_memcpy((void *)((uintptr_t)d_ring),
> > +			(void *)((uintptr_t)shadow_ring +
> > +				(s_idx + size) * elem_size),
> > +			(count - size) * elem_size);
> > +	}
> > +}
> > +
> >  static __rte_noinline uint32_t
> >  virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> >  	struct vhost_virtqueue *vq, uint16_t queue_id, @@ -1478,6 +1501,7
> @@
> > virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> >  	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
> >  	uint16_t slot_idx = 0;
> >  	uint16_t segs_await = 0;
> > +	uint16_t iovec_idx = 0, it_idx = 0;
> >  	struct async_inflight_info *pkts_info = vq->async_pkts_info;
> >  	uint32_t n_pkts = 0, pkt_err = 0;
> >  	uint32_t num_async_pkts = 0, num_done_pkts = 0; @@ -1513,27
> +1537,32
> > @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> >
> >  		if (async_mbuf_to_desc(dev, vq, pkts[pkt_idx],
> >  				buf_vec, nr_vec, num_buffers,
> > -				src_iovec, dst_iovec, src_it, dst_it) < 0) {
> > +				&src_iovec[iovec_idx],
> > +				&dst_iovec[iovec_idx],
> > +				&src_it[it_idx],
> > +				&dst_it[it_idx]) < 0) {
> >  			vq->shadow_used_idx -= num_buffers;
> >  			break;
> >  		}
> >
> >  		slot_idx = (vq->async_pkts_idx + num_async_pkts) &
> >  			(vq->size - 1);
> > -		if (src_it->count) {
> > +		if (src_it[it_idx].count) {
> >  			uint16_t from, to;
> >
> > -			async_fill_desc(&tdes[pkt_burst_idx++], src_it,
> dst_it);
> > +			async_fill_desc(&tdes[pkt_burst_idx++],
> > +				&src_it[it_idx],
> > +				&dst_it[it_idx]);
> >  			pkts_info[slot_idx].descs = num_buffers;
> >  			pkts_info[slot_idx].mbuf = pkts[pkt_idx];
> >  			async_pkts_log[num_async_pkts].pkt_idx = pkt_idx;
> >  			async_pkts_log[num_async_pkts++].last_avail_idx =
> >  				vq->last_avail_idx;
> > -			src_iovec += src_it->nr_segs;
> > -			dst_iovec += dst_it->nr_segs;
> > -			src_it += 2;
> > -			dst_it += 2;
> > -			segs_await += src_it->nr_segs;
> > +
> > +			iovec_idx += src_it[it_idx].nr_segs;
> > +			it_idx += 2;
> > +
> > +			segs_await += src_it[it_idx].nr_segs;
> >
> >  			/**
> >  			 * recover shadow used ring and keep DMA-occupied
> @@ -1541,23
> > +1570,12 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> >  			 */
> >  			from = vq->shadow_used_idx - num_buffers;
> >  			to = vq->async_desc_idx & (vq->size - 1);
> > -			if (num_buffers + to <= vq->size) {
> > -				rte_memcpy(&vq->async_descs_split[to],
> > -						&vq-
> >shadow_used_split[from],
> > -						num_buffers *
> > -						sizeof(struct
> vring_used_elem));
> > -			} else {
> > -				int size = vq->size - to;
> > -
> > -				rte_memcpy(&vq->async_descs_split[to],
> > -						&vq-
> >shadow_used_split[from],
> > -						size *
> > -						sizeof(struct
> vring_used_elem));
> > -				rte_memcpy(vq->async_descs_split,
> > -						&vq-
> >shadow_used_split[from +
> > -						size], (num_buffers - size) *
> > -					   sizeof(struct vring_used_elem));
> > -			}
> > +
> > +			shadow_ring_store(vq, vq->shadow_used_split,
> > +					vq->async_descs_split,
> > +					from, to, num_buffers,
> > +					sizeof(struct vring_used_elem));
> > +
> 
> I'm not convinced with this rework.
> 
> I think it is good to create a dedicated function for this to simplify this huge
> virtio_dev_rx_async_submit_split() function. But we should have a
> dedicated version for split ring. Having a single function for both split and
> packed ring does not improve readability, and unlikely improve performance.

Sure, I'm agree with you. I will use two functions for split and packed separately in the next version.

> 
> >  			vq->async_desc_idx += num_buffers;
> >  			vq->shadow_used_idx -= num_buffers;
> >  		} else
> > @@ -1575,10 +1593,9 @@ virtio_dev_rx_async_submit_split(struct
> virtio_net *dev,
> >  			BUF_VECTOR_MAX))) {
> >  			n_pkts = vq->async_ops.transfer_data(dev->vid,
> >  					queue_id, tdes, 0, pkt_burst_idx);
> > -			src_iovec = vec_pool;
> > -			dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >>
> 1);
> > -			src_it = it_pool;
> > -			dst_it = it_pool + 1;
> > +			iovec_idx = 0;
> > +			it_idx = 0;
> > +
> >  			segs_await = 0;
> >  			vq->async_pkts_inflight_n += n_pkts;
> >
> > @@ -1639,6 +1656,43 @@ virtio_dev_rx_async_submit_split(struct
> virtio_net *dev,
> >  	return pkt_idx;
> >  }
> >
> > +static __rte_always_inline void
> > +write_back_completed_descs_split(struct vhost_virtqueue *vq, uint16_t
> > +n_descs) {
> > +	uint16_t nr_left = n_descs;
> > +	uint16_t nr_copy;
> > +	uint16_t to, from;
> > +
> > +	do {
> > +		from = vq->last_async_desc_idx & (vq->size - 1);
> > +		nr_copy = nr_left + from <= vq->size ? nr_left :
> > +			vq->size - from;
> > +		to = vq->last_used_idx & (vq->size - 1);
> > +
> > +		if (to + nr_copy <= vq->size) {
> > +			rte_memcpy(&vq->used->ring[to],
> > +					&vq->async_descs_split[from],
> > +					nr_copy *
> > +					sizeof(struct vring_used_elem));
> > +		} else {
> > +			uint16_t size = vq->size - to;
> > +
> > +			rte_memcpy(&vq->used->ring[to],
> > +					&vq->async_descs_split[from],
> > +					size *
> > +					sizeof(struct vring_used_elem));
> > +			rte_memcpy(vq->used->ring,
> &vq->used->ring[0] for consistency

It will be fixed in the next version.

> > +					&vq->async_descs_split[from +
> > +					size], (nr_copy - size) *
> > +					sizeof(struct vring_used_elem));
> 
> Lines can now be up to 100 chars.
> Please take the opportunity to indent properly not to have parts of each args
> being put on the same line. It will help readability.

Ok, glad to know. I will fix them in the next version.

Thanks a lot.
Cheng

> 
> > +		}
> > +
> > +		vq->last_async_desc_idx += nr_copy;
> > +		vq->last_used_idx += nr_copy;
> > +		nr_left -= nr_copy;
> > +	} while (nr_left > 0);
> > +}
> > +
> >  uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
> >  		struct rte_mbuf **pkts, uint16_t count)  { @@ -1695,39
> +1749,7 @@
> > uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
> >  	vq->async_pkts_inflight_n -= n_pkts_put;
> >
> >  	if (likely(vq->enabled && vq->access_ok)) {
> > -		uint16_t nr_left = n_descs;
> > -		uint16_t nr_copy;
> > -		uint16_t to;
> > -
> > -		/* write back completed descriptors to used ring */
> > -		do {
> > -			from = vq->last_async_desc_idx & (vq->size - 1);
> > -			nr_copy = nr_left + from <= vq->size ? nr_left :
> > -				vq->size - from;
> > -			to = vq->last_used_idx & (vq->size - 1);
> > -
> > -			if (to + nr_copy <= vq->size) {
> > -				rte_memcpy(&vq->used->ring[to],
> > -						&vq-
> >async_descs_split[from],
> > -						nr_copy *
> > -						sizeof(struct
> vring_used_elem));
> > -			} else {
> > -				uint16_t size = vq->size - to;
> > -
> > -				rte_memcpy(&vq->used->ring[to],
> > -						&vq-
> >async_descs_split[from],
> > -						size *
> > -						sizeof(struct
> vring_used_elem));
> > -				rte_memcpy(vq->used->ring,
> > -						&vq->async_descs_split[from
> +
> > -						size], (nr_copy - size) *
> > -						sizeof(struct
> vring_used_elem));
> > -			}
> > -
> > -			vq->last_async_desc_idx += nr_copy;
> > -			vq->last_used_idx += nr_copy;
> > -			nr_left -= nr_copy;
> > -		} while (nr_left > 0);
> > +		write_back_completed_descs_split(vq, n_descs);
> >
> >  		__atomic_add_fetch(&vq->used->idx, n_descs,
> __ATOMIC_RELEASE);
> >  		vhost_vring_call_split(dev, vq);
> >



More information about the dev mailing list