[dpdk-dev] [PATCH 2/2] vhost: enhance async enqueue for small	packets
    Jiayu Hu 
    jiayu.hu at intel.com
       
    Fri Dec 11 10:21:40 CET 2020
    
    
  
Async enqueue offloads large copies to DMA devices, and small copies
are still performed by the CPU. However, it requires users to get
enqueue completed packets by rte_vhost_poll_enqueue_completed(), even
if they are completed by the CPU when rte_vhost_submit_enqueue_burst()
returns. This design incurs extra overheads of tracking completed
pktmbufs and function calls, thus degrading performance on small packets.
This patch enhances async enqueue for small packets by enabling
rte_vhost_submit_enqueue_burst() to return completed packets.
Signed-off-by: Jiayu Hu <jiayu.hu at intel.com>
---
 lib/librte_vhost/rte_vhost_async.h |  18 +--
 lib/librte_vhost/vhost.c           |  14 +--
 lib/librte_vhost/vhost.h           |   7 +-
 lib/librte_vhost/vhost_user.c      |   7 +-
 lib/librte_vhost/virtio_net.c      | 240 +++++++++++++++++++++----------------
 5 files changed, 160 insertions(+), 126 deletions(-)
diff --git a/lib/librte_vhost/rte_vhost_async.h b/lib/librte_vhost/rte_vhost_async.h
index 3be4ee4..55c5d1c 100644
--- a/lib/librte_vhost/rte_vhost_async.h
+++ b/lib/librte_vhost/rte_vhost_async.h
@@ -87,13 +87,8 @@ struct rte_vhost_async_channel_ops {
  * inflight async packet information
  */
 struct async_inflight_info {
-	union {
-		uint32_t info;
-		struct {
-			uint16_t descs; /* num of descs inflight */
-			uint16_t segs; /* iov segs inflight */
-		};
-	};
+	struct rte_mbuf *mbuf;
+	uint16_t descs; /* num of descs inflight */
 };
 
 /**
@@ -159,12 +154,17 @@ int rte_vhost_async_channel_unregister(int vid, uint16_t queue_id);
  *  array of packets to be enqueued
  * @param count
  *  packets num to be enqueued
+ * @param completed_pkts
+ *  array of transfer completed packets
+ * @param num_completed
+ *  num of transfer completed packets
  * @return
- *  num of packets enqueued
+ *  num of packets enqueued, including in-flight and transfer completed
  */
 __rte_experimental
 uint16_t rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
-		struct rte_mbuf **pkts, uint16_t count);
+		struct rte_mbuf **pkts, uint16_t count,
+		struct rte_mbuf **completed_pkts, uint32_t *num_completed);
 
 /**
  * This function checks async completion status for a specific vhost
diff --git a/lib/librte_vhost/vhost.c b/lib/librte_vhost/vhost.c
index b83cf63..47e378b 100644
--- a/lib/librte_vhost/vhost.c
+++ b/lib/librte_vhost/vhost.c
@@ -327,17 +327,17 @@ cleanup_device(struct virtio_net *dev, int destroy)
 static void
 vhost_free_async_mem(struct vhost_virtqueue *vq)
 {
-	if (vq->async_pkts_pending)
-		rte_free(vq->async_pkts_pending);
 	if (vq->async_pkts_info)
 		rte_free(vq->async_pkts_info);
+	if (vq->async_descs_split)
+		rte_free(vq->async_descs_split);
 	if (vq->it_pool)
 		rte_free(vq->it_pool);
 	if (vq->vec_pool)
 		rte_free(vq->vec_pool);
 
-	vq->async_pkts_pending = NULL;
 	vq->async_pkts_info = NULL;
+	vq->async_descs_split = NULL;
 	vq->it_pool = NULL;
 	vq->vec_pool = NULL;
 }
@@ -1628,9 +1628,6 @@ int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
 	node = SOCKET_ID_ANY;
 #endif
 
-	vq->async_pkts_pending = rte_malloc_socket(NULL,
-			vq->size * sizeof(uintptr_t),
-			RTE_CACHE_LINE_SIZE, node);
 	vq->async_pkts_info = rte_malloc_socket(NULL,
 			vq->size * sizeof(struct async_inflight_info),
 			RTE_CACHE_LINE_SIZE, node);
@@ -1640,7 +1637,10 @@ int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
 	vq->vec_pool = rte_malloc_socket(NULL,
 			VHOST_MAX_ASYNC_VEC * sizeof(struct iovec),
 			RTE_CACHE_LINE_SIZE, node);
-	if (!vq->async_pkts_pending || !vq->async_pkts_info ||
+	vq->async_descs_split = rte_malloc_socket(NULL,
+			vq->size * sizeof(struct vring_used_elem),
+			RTE_CACHE_LINE_SIZE, node);
+	if (!vq->async_descs_split || !vq->async_pkts_info ||
 		!vq->it_pool || !vq->vec_pool) {
 		vhost_free_async_mem(vq);
 		VHOST_LOG_CONFIG(ERR,
diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
index 361c9f7..d2076b4 100644
--- a/lib/librte_vhost/vhost.h
+++ b/lib/librte_vhost/vhost.h
@@ -202,11 +202,13 @@ struct vhost_virtqueue {
 	struct iovec *vec_pool;
 
 	/* async data transfer status */
-	uintptr_t	**async_pkts_pending;
 	struct async_inflight_info *async_pkts_info;
 	uint16_t	async_pkts_idx;
 	uint16_t	async_pkts_inflight_n;
 	uint16_t	async_last_pkts_n;
+	struct vring_used_elem  *async_descs_split;
+	uint16_t async_desc_idx;
+	uint16_t last_async_desc_idx;
 
 	/* vq async features */
 	bool		async_inorder;
@@ -733,8 +735,7 @@ vhost_vring_call_split(struct virtio_net *dev, struct vhost_virtqueue *vq)
 	/* Don't kick guest if we don't reach index specified by guest. */
 	if (dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX)) {
 		uint16_t old = vq->signalled_used;
-		uint16_t new = vq->async_pkts_inflight_n ?
-					vq->used->idx:vq->last_used_idx;
+		uint16_t new = vq->last_used_idx;
 		bool signalled_used_valid = vq->signalled_used_valid;
 
 		vq->signalled_used = new;
diff --git a/lib/librte_vhost/vhost_user.c b/lib/librte_vhost/vhost_user.c
index 45c8ac0..2b00249 100644
--- a/lib/librte_vhost/vhost_user.c
+++ b/lib/librte_vhost/vhost_user.c
@@ -1967,12 +1967,13 @@ vhost_user_get_vring_base(struct virtio_net **pdev,
 	} else {
 		rte_free(vq->shadow_used_split);
 		vq->shadow_used_split = NULL;
-		if (vq->async_pkts_pending)
-			rte_free(vq->async_pkts_pending);
+
 		if (vq->async_pkts_info)
 			rte_free(vq->async_pkts_info);
-		vq->async_pkts_pending = NULL;
+		if (vq->async_descs_split)
+			rte_free(vq->async_descs_split);
 		vq->async_pkts_info = NULL;
+		vq->async_descs_split = NULL;
 	}
 
 	rte_free(vq->batch_copy_elems);
diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index fc654be..93f3d93 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -118,31 +118,6 @@ flush_shadow_used_ring_split(struct virtio_net *dev, struct vhost_virtqueue *vq)
 }
 
 static __rte_always_inline void
-async_flush_shadow_used_ring_split(struct virtio_net *dev,
-	struct vhost_virtqueue *vq)
-{
-	uint16_t used_idx = vq->last_used_idx & (vq->size - 1);
-
-	if (used_idx + vq->shadow_used_idx <= vq->size) {
-		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0,
-					  vq->shadow_used_idx);
-	} else {
-		uint16_t size;
-
-		/* update used ring interval [used_idx, vq->size] */
-		size = vq->size - used_idx;
-		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0, size);
-
-		/* update the left half used ring interval [0, left_size] */
-		do_flush_shadow_used_ring_split(dev, vq, 0, size,
-					  vq->shadow_used_idx - size);
-	}
-
-	vq->last_used_idx += vq->shadow_used_idx;
-	vq->shadow_used_idx = 0;
-}
-
-static __rte_always_inline void
 update_shadow_used_ring_split(struct vhost_virtqueue *vq,
 			 uint16_t desc_idx, uint32_t len)
 {
@@ -1480,7 +1455,8 @@ virtio_dev_rx_async_get_info_idx(uint16_t pkts_idx,
 static __rte_noinline uint32_t
 virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	struct vhost_virtqueue *vq, uint16_t queue_id,
-	struct rte_mbuf **pkts, uint32_t count)
+	struct rte_mbuf **pkts, uint32_t count,
+	struct rte_mbuf **completed_pkts, uint32_t *num_completed)
 {
 	uint32_t pkt_idx = 0, pkt_burst_idx = 0;
 	uint16_t num_buffers;
@@ -1494,10 +1470,15 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
 	struct rte_vhost_iov_iter *src_it = it_pool;
 	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
-	uint16_t n_free_slot, slot_idx = 0;
+	uint16_t slot_idx = 0;
 	uint16_t segs_await = 0;
 	struct async_inflight_info *pkts_info = vq->async_pkts_info;
 	uint32_t n_pkts = 0, pkt_err = 0;
+	uint32_t num_async_pkts = 0, num_done_pkts = 0;
+	struct async_pkt_index {
+		uint16_t pkt_idx;
+		uint16_t last_avail_idx;
+	} async_pkts_log[MAX_PKT_BURST];
 
 	avail_head = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE);
 
@@ -1534,21 +1515,50 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 			break;
 		}
 
-		slot_idx = (vq->async_pkts_idx + pkt_idx) & (vq->size - 1);
+		slot_idx = (vq->async_pkts_idx + num_async_pkts) &
+			(vq->size - 1);
 		if (src_it->count) {
-			async_fill_desc(&tdes[pkt_burst_idx], src_it, dst_it);
-			pkt_burst_idx++;
+			uint16_t from, to;
+
+			async_fill_desc(&tdes[pkt_burst_idx++], src_it, dst_it);
 			pkts_info[slot_idx].descs = num_buffers;
-			pkts_info[slot_idx].segs = src_it->nr_segs;
+			pkts_info[slot_idx].mbuf = pkts[pkt_idx];
+			async_pkts_log[num_async_pkts].pkt_idx = pkt_idx;
+			async_pkts_log[num_async_pkts++].last_avail_idx =
+				vq->last_avail_idx;
 			src_iovec += src_it->nr_segs;
 			dst_iovec += dst_it->nr_segs;
 			src_it += 2;
 			dst_it += 2;
 			segs_await += src_it->nr_segs;
-		} else {
-			pkts_info[slot_idx].info = num_buffers;
-			vq->async_pkts_inflight_n++;
-		}
+
+			/**
+			 * recover shadow used ring and keep DMA-occupied
+			 * descriptors.
+			 */
+			from = vq->shadow_used_idx - num_buffers;
+			to = vq->async_desc_idx & (vq->size - 1);
+			if (num_buffers + to <= vq->size) {
+				rte_memcpy(&vq->async_descs_split[to],
+						&vq->shadow_used_split[from],
+						num_buffers *
+						sizeof(struct vring_used_elem));
+			} else {
+				int size = vq->size - to;
+
+				rte_memcpy(&vq->async_descs_split[to],
+						&vq->shadow_used_split[from],
+						size *
+						sizeof(struct vring_used_elem));
+				rte_memcpy(vq->async_descs_split,
+						&vq->shadow_used_split[from +
+						size], (num_buffers - size) *
+					   sizeof(struct vring_used_elem));
+			}
+			vq->async_desc_idx += num_buffers;
+			vq->shadow_used_idx -= num_buffers;
+		} else
+			completed_pkts[num_done_pkts++] = pkts[pkt_idx];
 
 		vq->last_avail_idx += num_buffers;
 
@@ -1557,9 +1567,9 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 		 * - buffered packet number reaches transfer threshold
 		 * - unused async iov number is less than max vhost vector
 		 */
-		if (pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
-			(VHOST_MAX_ASYNC_VEC / 2 - segs_await <
-			BUF_VECTOR_MAX)) {
+		if (unlikely(pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
+			((VHOST_MAX_ASYNC_VEC >> 1) - segs_await <
+			BUF_VECTOR_MAX))) {
 			n_pkts = vq->async_ops.transfer_data(dev->vid,
 					queue_id, tdes, 0, pkt_burst_idx);
 			src_iovec = vec_pool;
@@ -1567,7 +1577,7 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 			src_it = it_pool;
 			dst_it = it_pool + 1;
 			segs_await = 0;
-			vq->async_pkts_inflight_n += pkt_burst_idx;
+			vq->async_pkts_inflight_n += n_pkts;
 
 			if (unlikely(n_pkts < pkt_burst_idx)) {
 				/*
@@ -1587,7 +1597,7 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	if (pkt_burst_idx) {
 		n_pkts = vq->async_ops.transfer_data(dev->vid,
 				queue_id, tdes, 0, pkt_burst_idx);
-		vq->async_pkts_inflight_n += pkt_burst_idx;
+		vq->async_pkts_inflight_n += n_pkts;
 
 		if (unlikely(n_pkts < pkt_burst_idx))
 			pkt_err = pkt_burst_idx - n_pkts;
@@ -1595,32 +1605,32 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 
 	do_data_copy_enqueue(dev, vq);
 
-	while (unlikely(pkt_err && pkt_idx)) {
-		if (pkts_info[slot_idx].segs)
-			pkt_err--;
-		vq->last_avail_idx -= pkts_info[slot_idx].descs;
-		vq->shadow_used_idx -= pkts_info[slot_idx].descs;
-		vq->async_pkts_inflight_n--;
-		slot_idx = (slot_idx - 1) & (vq->size - 1);
-		pkt_idx--;
-	}
-
-	n_free_slot = vq->size - vq->async_pkts_idx;
-	if (n_free_slot > pkt_idx) {
-		rte_memcpy(&vq->async_pkts_pending[vq->async_pkts_idx],
-			pkts, pkt_idx * sizeof(uintptr_t));
-		vq->async_pkts_idx += pkt_idx;
-	} else {
-		rte_memcpy(&vq->async_pkts_pending[vq->async_pkts_idx],
-			pkts, n_free_slot * sizeof(uintptr_t));
-		rte_memcpy(&vq->async_pkts_pending[0],
-			&pkts[n_free_slot],
-			(pkt_idx - n_free_slot) * sizeof(uintptr_t));
-		vq->async_pkts_idx = pkt_idx - n_free_slot;
+	if (unlikely(pkt_err)) {
+		uint16_t num_descs = 0;
+
+		num_async_pkts -= pkt_err;
+		/* calculate the sum fo descriptors of DMA-error packets. */
+		while (pkt_err-- > 0) {
+			num_descs += pkts_info[slot_idx & (vq->size - 1)].descs;
+			slot_idx--;
+		}
+		vq->async_desc_idx -= num_descs;
+		/* recover shadow used ring and available ring */
+		vq->shadow_used_idx -= (vq->last_avail_idx -
+				async_pkts_log[num_async_pkts].last_avail_idx -
+				num_descs);
+		vq->last_avail_idx =
+			async_pkts_log[num_async_pkts].last_avail_idx;
+		pkt_idx = async_pkts_log[num_async_pkts].pkt_idx;
 	}
 
-	if (likely(vq->shadow_used_idx))
-		async_flush_shadow_used_ring_split(dev, vq);
+	vq->async_pkts_idx += num_async_pkts;
+	*num_completed = num_done_pkts;
+
+	if (likely(vq->shadow_used_idx)) {
+		flush_shadow_used_ring_split(dev, vq);
+		vhost_vring_call_split(dev, vq);
+	}
 
 	return pkt_idx;
 }
@@ -1632,8 +1642,8 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 	struct vhost_virtqueue *vq;
 	uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0;
 	uint16_t start_idx, pkts_idx, vq_size;
-	uint16_t n_inflight;
 	struct async_inflight_info *pkts_info;
+	uint16_t from, i;
 
 	if (!dev)
 		return 0;
@@ -1655,8 +1665,7 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 
 	rte_spinlock_lock(&vq->access_lock);
 
-	n_inflight = vq->async_pkts_inflight_n;
-	pkts_idx = vq->async_pkts_idx;
+	pkts_idx = vq->async_pkts_idx & (vq->size - 1);
 	pkts_info = vq->async_pkts_info;
 	vq_size = vq->size;
 	start_idx = virtio_dev_rx_async_get_info_idx(pkts_idx,
@@ -1667,42 +1676,61 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 			queue_id, 0, count - vq->async_last_pkts_n);
 	n_pkts_cpl += vq->async_last_pkts_n;
 
-	rte_smp_wmb();
-
-	while (likely((n_pkts_put < count) && n_inflight)) {
-		uint16_t info_idx = (start_idx + n_pkts_put) & (vq_size - 1);
-		if (n_pkts_cpl && pkts_info[info_idx].segs)
-			n_pkts_cpl--;
-		else if (!n_pkts_cpl && pkts_info[info_idx].segs)
-			break;
-		n_pkts_put++;
-		n_inflight--;
-		n_descs += pkts_info[info_idx].descs;
-	}
-
-	vq->async_last_pkts_n = n_pkts_cpl;
+	n_pkts_put = RTE_MIN(count, n_pkts_cpl);
+	if (unlikely(n_pkts_put == 0)) {
+		vq->async_last_pkts_n = n_pkts_cpl;
+		goto done;
+	}
+
+	for (i = 0; i < n_pkts_put; i++) {
+		from = (start_idx + i) & (vq_size - 1);
+		n_descs += pkts_info[from].descs;
+		pkts[i] = pkts_info[from].mbuf;
+	}
+	vq->async_last_pkts_n = n_pkts_cpl - n_pkts_put;
+	vq->async_pkts_inflight_n -= n_pkts_put;
+
+	if (likely(vq->enabled && vq->access_ok)) {
+		uint16_t nr_left = n_descs;
+		uint16_t nr_copy;
+		uint16_t to;
+
+		/* write back completed descriptors to used ring */
+		do {
+			from = vq->last_async_desc_idx & (vq->size - 1);
+			nr_copy = nr_left + from <= vq->size ? nr_left :
+				vq->size - from;
+			to = vq->last_used_idx & (vq->size - 1);
+
+			if (to + nr_copy <= vq->size) {
+				rte_memcpy(&vq->used->ring[to],
+						&vq->async_descs_split[from],
+						nr_copy *
+						sizeof(struct vring_used_elem));
+			} else {
+				uint16_t size = vq->size - to;
+
+				rte_memcpy(&vq->used->ring[to],
+						&vq->async_descs_split[from],
+						size *
+						sizeof(struct vring_used_elem));
+				rte_memcpy(vq->used->ring,
+						&vq->async_descs_split[from +
+						size], (nr_copy - size) *
+						sizeof(struct vring_used_elem));
+			}
 
-	if (n_pkts_put) {
-		vq->async_pkts_inflight_n = n_inflight;
-		if (likely(vq->enabled && vq->access_ok)) {
-			__atomic_add_fetch(&vq->used->idx,
-					n_descs, __ATOMIC_RELEASE);
-			vhost_vring_call_split(dev, vq);
-		}
+			vq->last_async_desc_idx += nr_copy;
+			vq->last_used_idx += nr_copy;
+			nr_left -= nr_copy;
+		} while (nr_left > 0);
 
-		if (start_idx + n_pkts_put <= vq_size) {
-			rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
-				n_pkts_put * sizeof(uintptr_t));
-		} else {
-			rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
-				(vq_size - start_idx) * sizeof(uintptr_t));
-			rte_memcpy(&pkts[vq_size - start_idx],
-				vq->async_pkts_pending,
-				(n_pkts_put + start_idx - vq_size) *
-				sizeof(uintptr_t));
-		}
-	}
+		__atomic_add_fetch(&vq->used->idx, n_descs, __ATOMIC_RELEASE);
+		vhost_vring_call_split(dev, vq);
+	} else
+		vq->last_async_desc_idx += n_descs;
 
+done:
 	rte_spinlock_unlock(&vq->access_lock);
 
 	return n_pkts_put;
@@ -1710,7 +1738,8 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 
 static __rte_always_inline uint32_t
 virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
-	struct rte_mbuf **pkts, uint32_t count)
+	struct rte_mbuf **pkts, uint32_t count,
+	struct rte_mbuf **completed_pkts, uint32_t *num_completed)
 {
 	struct vhost_virtqueue *vq;
 	uint32_t nb_tx = 0;
@@ -1745,7 +1774,8 @@ virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
 		nb_tx = 0;
 	else
 		nb_tx = virtio_dev_rx_async_submit_split(dev,
-				vq, queue_id, pkts, count);
+				vq, queue_id, pkts, count,
+				completed_pkts, num_completed);
 
 out:
 	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
@@ -1759,7 +1789,8 @@ virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
 
 uint16_t
 rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
-		struct rte_mbuf **pkts, uint16_t count)
+		struct rte_mbuf **pkts, uint16_t count,
+		struct rte_mbuf **completed_pkts, uint32_t *num_completed)
 {
 	struct virtio_net *dev = get_device(vid);
 
@@ -1773,7 +1804,8 @@ rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
 		return 0;
 	}
 
-	return virtio_dev_rx_async_submit(dev, queue_id, pkts, count);
+	return virtio_dev_rx_async_submit(dev, queue_id, pkts, count,
+			completed_pkts, num_completed);
 }
 
 static inline bool
-- 
2.7.4
    
    
More information about the dev
mailing list