[dpdk-dev] [PATCH v5 12/13] vhost: optimize packed ring dequeue

Marvin Liu yong.liu at intel.com
Tue Oct 15 16:30:13 CEST 2019


Optimize the vhost device packed ring dequeue function by splitting it
into batch and single functions. Non-chained, direct descriptors are
handled by the batch function; all others are handled by the single
function, as before.

Signed-off-by: Marvin Liu <yong.liu at intel.com>

diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index 01d1603e3..85ccc02da 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -201,69 +201,6 @@ vhost_flush_enqueue_batch_packed(struct virtio_net *dev,
 	vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
 }
 
-static __rte_always_inline void
-flush_shadow_used_ring_packed(struct virtio_net *dev,
-			struct vhost_virtqueue *vq)
-{
-	int i;
-	uint16_t used_idx = vq->last_used_idx;
-	uint16_t head_idx = vq->last_used_idx;
-	uint16_t head_flags = 0;
-
-	/* Split loop in two to save memory barriers */
-	for (i = 0; i < vq->shadow_used_idx; i++) {
-		vq->desc_packed[used_idx].id = vq->shadow_used_packed[i].id;
-		vq->desc_packed[used_idx].len = vq->shadow_used_packed[i].len;
-
-		used_idx += vq->shadow_used_packed[i].count;
-		if (used_idx >= vq->size)
-			used_idx -= vq->size;
-	}
-
-	rte_smp_wmb();
-
-	for (i = 0; i < vq->shadow_used_idx; i++) {
-		uint16_t flags;
-
-		if (vq->shadow_used_packed[i].len)
-			flags = VRING_DESC_F_WRITE;
-		else
-			flags = 0;
-
-		if (vq->used_wrap_counter) {
-			flags |= VRING_DESC_F_USED;
-			flags |= VRING_DESC_F_AVAIL;
-		} else {
-			flags &= ~VRING_DESC_F_USED;
-			flags &= ~VRING_DESC_F_AVAIL;
-		}
-
-		if (i > 0) {
-			vq->desc_packed[vq->last_used_idx].flags = flags;
-
-			vhost_log_cache_used_vring(dev, vq,
-					vq->last_used_idx *
-					sizeof(struct vring_packed_desc),
-					sizeof(struct vring_packed_desc));
-		} else {
-			head_idx = vq->last_used_idx;
-			head_flags = flags;
-		}
-
-		vq_inc_last_used_packed(vq, vq->shadow_used_packed[i].count);
-	}
-
-	vq->desc_packed[head_idx].flags = head_flags;
-
-	vhost_log_cache_used_vring(dev, vq,
-				head_idx *
-				sizeof(struct vring_packed_desc),
-				sizeof(struct vring_packed_desc));
-
-	vq->shadow_used_idx = 0;
-	vhost_log_cache_sync(dev, vq);
-}
-
 static __rte_always_inline void
 vhost_shadow_dequeue_batch_packed(struct virtio_net *dev,
 				  struct vhost_virtqueue *vq,
@@ -328,17 +265,6 @@ vhost_shadow_dequeue_single_packed(struct vhost_virtqueue *vq,
 	vq_inc_last_used_packed(vq, count);
 }
 
-static __rte_always_inline void
-update_shadow_used_ring_packed(struct vhost_virtqueue *vq,
-			 uint16_t desc_idx, uint32_t len, uint16_t count)
-{
-	uint16_t i = vq->shadow_used_idx++;
-
-	vq->shadow_used_packed[i].id  = desc_idx;
-	vq->shadow_used_packed[i].len = len;
-	vq->shadow_used_packed[i].count = count;
-}
-
 static inline void
 do_data_copy_enqueue(struct virtio_net *dev, struct vhost_virtqueue *vq)
 {
@@ -395,7 +321,7 @@ vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
 	}
 }
 
-static __rte_unused void
+static __rte_always_inline void
 vhost_flush_dequeue_packed(struct virtio_net *dev,
 			   struct vhost_virtqueue *vq)
 {
@@ -1799,7 +1725,7 @@ vhost_reserve_avail_batch_packed(struct virtio_net *dev,
 	return -1;
 }
 
-static __rte_unused int
+static __rte_always_inline int
 virtio_dev_tx_batch_packed(struct virtio_net *dev,
 			   struct vhost_virtqueue *vq,
 			   struct rte_mempool *mbuf_pool,
@@ -1866,7 +1792,7 @@ vhost_dequeue_single_packed(struct virtio_net *dev,
 	return 0;
 }
 
-static __rte_unused int
+static __rte_always_inline int
 virtio_dev_tx_single_packed(struct virtio_net *dev,
 			    struct vhost_virtqueue *vq,
 			    struct rte_mempool *mbuf_pool,
@@ -1886,7 +1812,7 @@ virtio_dev_tx_single_packed(struct virtio_net *dev,
 	return 0;
 }
 
-static __rte_unused int
+static __rte_always_inline int
 virtio_dev_tx_batch_packed_zmbuf(struct virtio_net *dev,
 				 struct vhost_virtqueue *vq,
 				 struct rte_mempool *mbuf_pool,
@@ -1935,7 +1861,7 @@ virtio_dev_tx_batch_packed_zmbuf(struct virtio_net *dev,
 	return -1;
 }
 
-static __rte_unused int
+static __rte_always_inline int
 virtio_dev_tx_single_packed_zmbuf(struct virtio_net *dev,
 				  struct vhost_virtqueue *vq,
 				  struct rte_mempool *mbuf_pool,
@@ -2003,114 +1929,78 @@ free_zmbuf(struct vhost_virtqueue *vq)
 }
 
 static __rte_noinline uint16_t
-virtio_dev_tx_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
-	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
+virtio_dev_tx_packed_zmbuf(struct virtio_net *dev,
+			   struct vhost_virtqueue *vq,
+			   struct rte_mempool *mbuf_pool,
+			   struct rte_mbuf **pkts,
+			   uint32_t count)
 {
-	uint16_t i;
-
-	if (unlikely(dev->dequeue_zero_copy)) {
-		struct zcopy_mbuf *zmbuf, *next;
+	uint32_t pkt_idx = 0;
+	uint32_t remained = count;
 
-		for (zmbuf = TAILQ_FIRST(&vq->zmbuf_list);
-		     zmbuf != NULL; zmbuf = next) {
-			next = TAILQ_NEXT(zmbuf, next);
+	free_zmbuf(vq);
 
-			if (mbuf_is_consumed(zmbuf->mbuf)) {
-				update_shadow_used_ring_packed(vq,
-						zmbuf->desc_idx,
-						0,
-						zmbuf->desc_count);
-
-				TAILQ_REMOVE(&vq->zmbuf_list, zmbuf, next);
-				restore_mbuf(zmbuf->mbuf);
-				rte_pktmbuf_free(zmbuf->mbuf);
-				put_zmbuf(zmbuf);
-				vq->nr_zmbuf -= 1;
+	do {
+		if (remained >= PACKED_BATCH_SIZE) {
+			/* Zero return means the batch was dequeued. */
+			if (!virtio_dev_tx_batch_packed_zmbuf(dev, vq,
+				mbuf_pool, &pkts[pkt_idx])) {
+				pkt_idx += PACKED_BATCH_SIZE;
+				remained -= PACKED_BATCH_SIZE;
+				continue;
 			}
 		}
 
-		if (likely(vq->shadow_used_idx)) {
-			flush_shadow_used_ring_packed(dev, vq);
-			vhost_vring_call_packed(dev, vq);
-		}
-	}
+		if (virtio_dev_tx_single_packed_zmbuf(dev, vq, mbuf_pool,
+						      &pkts[pkt_idx]))
+			break; /* non-zero return ends the burst */
+		pkt_idx++;
+		remained--;
 
-	VHOST_LOG_DEBUG(VHOST_DATA, "(%d) %s\n", dev->vid, __func__);
+	} while (remained);
 
-	count = RTE_MIN(count, MAX_PKT_BURST);
-	VHOST_LOG_DEBUG(VHOST_DATA, "(%d) about to dequeue %u buffers\n",
-			dev->vid, count);
+	if (pkt_idx) /* at least one packet was dequeued */
+		vhost_vring_call_packed(dev, vq);
 
-	for (i = 0; i < count; i++) {
-		struct buf_vector buf_vec[BUF_VECTOR_MAX];
-		uint16_t buf_id;
-		uint32_t dummy_len;
-		uint16_t desc_count, nr_vec = 0;
-		int err;
+	return pkt_idx; /* count of packets dequeued */
+}
 
-		if (unlikely(fill_vec_buf_packed(dev, vq,
-						vq->last_avail_idx, &desc_count,
-						buf_vec, &nr_vec,
-						&buf_id, &dummy_len,
-						VHOST_ACCESS_RO) < 0))
-			break;
+static __rte_noinline uint16_t
+virtio_dev_tx_packed(struct virtio_net *dev,
+		     struct vhost_virtqueue *vq,
+		     struct rte_mempool *mbuf_pool,
+		     struct rte_mbuf **pkts,
+		     uint32_t count)
+{
+	uint32_t pkt_idx = 0;
+	uint32_t remained = count;
 
-		if (likely(dev->dequeue_zero_copy == 0))
-			update_shadow_used_ring_packed(vq, buf_id, 0,
-					desc_count);
+	do { /* one iteration per batch or per single packet */
+		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
 
-		pkts[i] = rte_pktmbuf_alloc(mbuf_pool);
-		if (unlikely(pkts[i] == NULL)) {
-			RTE_LOG(ERR, VHOST_DATA,
-				"Failed to allocate memory for mbuf.\n");
-			break;
+		if (remained >= PACKED_BATCH_SIZE) { /* batch path first */
+			if (!virtio_dev_tx_batch_packed(dev, vq, mbuf_pool,
+							&pkts[pkt_idx])) {
+				vhost_flush_dequeue_packed(dev, vq);
+				pkt_idx += PACKED_BATCH_SIZE;
+				remained -= PACKED_BATCH_SIZE;
+				continue;
+			}
 		}
 
-		err = copy_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts[i],
-				mbuf_pool);
-		if (unlikely(err)) {
-			rte_pktmbuf_free(pkts[i]);
+		if (virtio_dev_tx_single_packed(dev, vq, mbuf_pool,
+						&pkts[pkt_idx]))
 			break;
-		}
-
-		if (unlikely(dev->dequeue_zero_copy)) {
-			struct zcopy_mbuf *zmbuf;
-
-			zmbuf = get_zmbuf(vq);
-			if (!zmbuf) {
-				rte_pktmbuf_free(pkts[i]);
-				break;
-			}
-			zmbuf->mbuf = pkts[i];
-			zmbuf->desc_idx = buf_id;
-			zmbuf->desc_count = desc_count;
-
-			/*
-			 * Pin lock the mbuf; we will check later to see
-			 * whether the mbuf is freed (when we are the last
-			 * user) or not. If that's the case, we then could
-			 * update the used ring safely.
-			 */
-			rte_mbuf_refcnt_update(pkts[i], 1);
-
-			vq->nr_zmbuf += 1;
-			TAILQ_INSERT_TAIL(&vq->zmbuf_list, zmbuf, next);
-		}
+		vhost_flush_dequeue_packed(dev, vq);
+		pkt_idx++;
+		remained--;
 
-		vq_inc_last_avail_packed(vq, desc_count);
-	}
+	} while (remained);
 
-	if (likely(dev->dequeue_zero_copy == 0)) {
+	if (vq->shadow_used_idx) /* shadow used entries still pending */
 		do_data_copy_dequeue(vq);
-		if (unlikely(i < count))
-			vq->shadow_used_idx = i;
-		if (likely(vq->shadow_used_idx)) {
-			flush_shadow_used_ring_packed(dev, vq);
-			vhost_vring_call_packed(dev, vq);
-		}
-	}
 
-	return i;
+	return pkt_idx; /* count of packets dequeued */
 }
 
 uint16_t
@@ -2186,9 +2076,14 @@ rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
 		count -= 1;
 	}
 
-	if (vq_is_packed(dev))
-		count = virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count);
-	else
+	if (vq_is_packed(dev)) {
+		if (unlikely(dev->dequeue_zero_copy))
+			count = virtio_dev_tx_packed_zmbuf(dev, vq, mbuf_pool,
+							   pkts, count);
+		else
+			count = virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts,
+						     count);
+	} else
 		count = virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count);
 
 out:
-- 
2.17.1



More information about the dev mailing list