[dpdk-dev] [PATCH v2 1/7] vhost: refactor rte_vhost_dequeue_burst

Yuanhan Liu <yuanhan.liu@linux.intel.com>
Thu Feb 18 14:49:06 CET 2016


The current rte_vhost_dequeue_burst() implementation is a bit messy
and its logic is twisted. Repeated code shows up here and there: it
invokes rte_pktmbuf_alloc() three times, at three different places!

However, rte_vhost_dequeue_burst() actually does a simple job: copy
the packet data from the vring descriptors to mbufs. What's tricky
here is:

- A desc buffer could be chained (via the desc->next field), so you
  need to fetch the next one once the current one is wholly drained.

- One mbuf might not be big enough to hold all the desc buffers, so
  you need to chain mbufs as well, via the mbuf->next field.

Even so, the logic can be simple. Here is the pseudo code:

	while (this_desc_is_not_drained_totally || has_next_desc) {
		if (this_desc_has_drained_totally) {
			this_desc = next_desc();
		}

		if (mbuf_has_no_room) {
			mbuf = allocate_a_new_mbuf();
		}

		COPY(mbuf, desc);
	}

And this is how I refactored rte_vhost_dequeue_burst().
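
For illustration only (not part of the patch), here is a minimal,
self-contained C sketch of that loop, with a hypothetical struct seg
standing in for both the vring desc chain and the mbuf chain; the real
code additionally tracks pkt_len/nb_segs and frees the whole chain on
allocation failure:

	#include <stdint.h>
	#include <stdlib.h>
	#include <string.h>

	/* Hypothetical stand-in for a chained buffer; not a DPDK type. */
	struct seg {
		uint8_t     data[64];
		uint32_t    len;
		struct seg *next;
	};

	/*
	 * Copy a source chain into freshly allocated destination segments
	 * of "room" bytes each (room <= 64), following the pseudo code.
	 */
	static struct seg *
	copy_chain(const struct seg *src, uint32_t room)
	{
		uint32_t src_avail = src->len, src_off = 0;
		uint32_t dst_avail = 0, dst_off = 0;
		uint32_t cpy_len;
		struct seg *head = NULL, *cur = NULL, *prev = NULL;

		while (src_avail != 0 || src->next != NULL) {
			/* This src seg is drained totally; get the next one */
			if (src_avail == 0) {
				src = src->next;
				src_off   = 0;
				src_avail = src->len;
			}

			/* This dst seg has no room left; get a new one */
			if (dst_avail == 0) {
				cur = calloc(1, sizeof(*cur));
				if (cur == NULL)
					return head; /* real code frees and returns NULL */
				if (head == NULL)
					head = cur;
				else
					prev->next = cur;
				if (prev != NULL)
					prev->len = dst_off;
				prev = cur;
				dst_off   = 0;
				dst_avail = room;
			}

			cpy_len = src_avail < dst_avail ? src_avail : dst_avail;
			memcpy(cur->data + dst_off, src->data + src_off, cpy_len);
			src_off += cpy_len; src_avail -= cpy_len;
			dst_off += cpy_len; dst_avail -= cpy_len;
		}
		if (prev != NULL)
			prev->len = dst_off;

		return head;
	}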

Note that the old code did special handling to skip the virtio net
header. That can be done simply by adjusting the desc_avail and
desc_offset vars:

	desc_avail  = desc->len - vq->vhost_hlen;
	desc_offset = vq->vhost_hlen;
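
A side note (my reading of the refactored code, not spelled out in the
patch): these two assignments also cover the layout where the guest
puts the virtio header in a descriptor of its own. In that case
desc->len equals vq->vhost_hlen, so desc_avail starts at 0 and the
first loop iteration immediately steps past the header descriptor:

	desc_avail  = desc->len - vq->vhost_hlen;	/* 0 for a header-only desc */
	desc_offset = vq->vhost_hlen;

	while (desc_avail || (desc->flags & VRING_DESC_F_NEXT) != 0) {
		if (desc_avail == 0) {
			/* first iteration: advance past the header desc */
			desc = &vq->desc[desc->next];
			...
		}
		...
	}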

This refactoring makes the code much more readable (IMO), and it also
reduces the binary code size by nearly 2K.

Signed-off-by: Yuanhan Liu <yuanhan.liu@linux.intel.com>
---

v2: - fix potential NULL dereference bug of var "prev" and "head"
---
 lib/librte_vhost/vhost_rxtx.c | 297 +++++++++++++++++-------------------------
 1 file changed, 116 insertions(+), 181 deletions(-)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index 5e7e5b1..d5cd0fa 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -702,21 +702,104 @@ vhost_dequeue_offload(struct virtio_net_hdr *hdr, struct rte_mbuf *m)
 	}
 }
 
+static inline struct rte_mbuf *
+copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
+		  uint16_t desc_idx, struct rte_mempool *mbuf_pool)
+{
+	struct vring_desc *desc;
+	uint64_t desc_addr;
+	uint32_t desc_avail, desc_offset;
+	uint32_t mbuf_avail, mbuf_offset;
+	uint32_t cpy_len;
+	struct rte_mbuf *head = NULL;
+	struct rte_mbuf *cur = NULL, *prev = NULL;
+	struct virtio_net_hdr *hdr;
+
+	desc = &vq->desc[desc_idx];
+	desc_addr = gpa_to_vva(dev, desc->addr);
+	rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+	/* Retrieve virtio net header */
+	hdr = (struct virtio_net_hdr *)((uintptr_t)desc_addr);
+	desc_avail  = desc->len - vq->vhost_hlen;
+	desc_offset = vq->vhost_hlen;
+
+	mbuf_avail  = 0;
+	mbuf_offset = 0;
+	while (desc_avail || (desc->flags & VRING_DESC_F_NEXT) != 0) {
+		/* This desc reaches its end; get the next one */
+		if (desc_avail == 0) {
+			desc = &vq->desc[desc->next];
+
+			desc_addr = gpa_to_vva(dev, desc->addr);
+			rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+			desc_offset = 0;
+			desc_avail  = desc->len;
+
+			PRINT_PACKET(dev, (uintptr_t)desc_addr, desc->len, 0);
+		}
+
+		/*
+		 * This mbuf reaches its end; get a new one
+		 * to hold more data.
+		 */
+		if (mbuf_avail == 0) {
+			cur = rte_pktmbuf_alloc(mbuf_pool);
+			if (unlikely(!cur)) {
+				RTE_LOG(ERR, VHOST_DATA, "Failed to "
+					"allocate memory for mbuf.\n");
+				if (head)
+					rte_pktmbuf_free(head);
+				return NULL;
+			}
+			if (!head) {
+				head = cur;
+			} else {
+				prev->next = cur;
+				prev->data_len = mbuf_offset;
+				head->nb_segs += 1;
+			}
+			head->pkt_len += mbuf_offset;
+			prev = cur;
+
+			mbuf_offset = 0;
+			mbuf_avail  = cur->buf_len - RTE_PKTMBUF_HEADROOM;
+		}
+
+		cpy_len = RTE_MIN(desc_avail, mbuf_avail);
+		rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *, mbuf_offset),
+			(void *)((uintptr_t)(desc_addr + desc_offset)),
+			cpy_len);
+
+		mbuf_avail  -= cpy_len;
+		mbuf_offset += cpy_len;
+		desc_avail  -= cpy_len;
+		desc_offset += cpy_len;
+	}
+
+	if (prev) {
+		prev->data_len = mbuf_offset;
+		head->pkt_len += mbuf_offset;
+
+		if (hdr->flags != 0 || hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE)
+			vhost_dequeue_offload(hdr, head);
+	}
+
+	return head;
+}
+
 uint16_t
 rte_vhost_dequeue_burst(struct virtio_net *dev, uint16_t queue_id,
 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
 {
-	struct rte_mbuf *m, *prev;
 	struct vhost_virtqueue *vq;
-	struct vring_desc *desc;
-	uint64_t vb_addr = 0;
-	uint64_t vb_net_hdr_addr = 0;
-	uint32_t head[MAX_PKT_BURST];
+	uint32_t desc_indexes[MAX_PKT_BURST];
 	uint32_t used_idx;
 	uint32_t i;
-	uint16_t free_entries, entry_success = 0;
+	uint16_t free_entries;
 	uint16_t avail_idx;
-	struct virtio_net_hdr *hdr = NULL;
+	struct rte_mbuf *m;
 
 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->virt_qp_nb))) {
 		RTE_LOG(ERR, VHOST_DATA,
@@ -730,197 +813,49 @@ rte_vhost_dequeue_burst(struct virtio_net *dev, uint16_t queue_id,
 		return 0;
 
 	avail_idx =  *((volatile uint16_t *)&vq->avail->idx);
-
-	/* If there are no available buffers then return. */
-	if (vq->last_used_idx == avail_idx)
+	free_entries = avail_idx - vq->last_used_idx;
+	if (free_entries == 0)
 		return 0;
 
-	LOG_DEBUG(VHOST_DATA, "%s (%"PRIu64")\n", __func__,
-		dev->device_fh);
+	LOG_DEBUG(VHOST_DATA, "%s (%"PRIu64")\n", __func__, dev->device_fh);
 
-	/* Prefetch available ring to retrieve head indexes. */
-	rte_prefetch0(&vq->avail->ring[vq->last_used_idx & (vq->size - 1)]);
+	used_idx = vq->last_used_idx & (vq->size - 1);
 
-	/*get the number of free entries in the ring*/
-	free_entries = (avail_idx - vq->last_used_idx);
+	/* Prefetch available ring to retrieve head indexes. */
+	rte_prefetch0(&vq->avail->ring[used_idx]);
 
-	free_entries = RTE_MIN(free_entries, count);
-	/* Limit to MAX_PKT_BURST. */
-	free_entries = RTE_MIN(free_entries, MAX_PKT_BURST);
+	count = RTE_MIN(count, MAX_PKT_BURST);
+	count = RTE_MIN(count, free_entries);
+	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") about to dequeue %u buffers\n",
+			dev->device_fh, count);
 
-	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Buffers available %d\n",
-			dev->device_fh, free_entries);
 	/* Retrieve all of the head indexes first to avoid caching issues. */
-	for (i = 0; i < free_entries; i++)
-		head[i] = vq->avail->ring[(vq->last_used_idx + i) & (vq->size - 1)];
+	for (i = 0; i < count; i++) {
+		desc_indexes[i] = vq->avail->ring[(vq->last_used_idx + i) &
+					(vq->size - 1)];
+	}
 
 	/* Prefetch descriptor index. */
-	rte_prefetch0(&vq->desc[head[entry_success]]);
+	rte_prefetch0(&vq->desc[desc_indexes[0]]);
 	rte_prefetch0(&vq->used->ring[vq->last_used_idx & (vq->size - 1)]);
 
-	while (entry_success < free_entries) {
-		uint32_t vb_avail, vb_offset;
-		uint32_t seg_avail, seg_offset;
-		uint32_t cpy_len;
-		uint32_t seg_num = 0;
-		struct rte_mbuf *cur;
-		uint8_t alloc_err = 0;
-
-		desc = &vq->desc[head[entry_success]];
-
-		vb_net_hdr_addr = gpa_to_vva(dev, desc->addr);
-		hdr = (struct virtio_net_hdr *)((uintptr_t)vb_net_hdr_addr);
-
-		/* Discard first buffer as it is the virtio header */
-		if (desc->flags & VRING_DESC_F_NEXT) {
-			desc = &vq->desc[desc->next];
-			vb_offset = 0;
-			vb_avail = desc->len;
-		} else {
-			vb_offset = vq->vhost_hlen;
-			vb_avail = desc->len - vb_offset;
-		}
-
-		/* Buffer address translation. */
-		vb_addr = gpa_to_vva(dev, desc->addr);
-		/* Prefetch buffer address. */
-		rte_prefetch0((void *)(uintptr_t)vb_addr);
-
-		used_idx = vq->last_used_idx & (vq->size - 1);
-
-		if (entry_success < (free_entries - 1)) {
-			/* Prefetch descriptor index. */
-			rte_prefetch0(&vq->desc[head[entry_success+1]]);
-			rte_prefetch0(&vq->used->ring[(used_idx + 1) & (vq->size - 1)]);
-		}
-
-		/* Update used index buffer information. */
-		vq->used->ring[used_idx].id = head[entry_success];
-		vq->used->ring[used_idx].len = 0;
-
-		/* Allocate an mbuf and populate the structure. */
-		m = rte_pktmbuf_alloc(mbuf_pool);
-		if (unlikely(m == NULL)) {
-			RTE_LOG(ERR, VHOST_DATA,
-				"Failed to allocate memory for mbuf.\n");
-			break;
-		}
-		seg_offset = 0;
-		seg_avail = m->buf_len - RTE_PKTMBUF_HEADROOM;
-		cpy_len = RTE_MIN(vb_avail, seg_avail);
-
-		PRINT_PACKET(dev, (uintptr_t)vb_addr, desc->len, 0);
-
-		seg_num++;
-		cur = m;
-		prev = m;
-		while (cpy_len != 0) {
-			rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *, seg_offset),
-				(void *)((uintptr_t)(vb_addr + vb_offset)),
-				cpy_len);
-
-			seg_offset += cpy_len;
-			vb_offset += cpy_len;
-			vb_avail -= cpy_len;
-			seg_avail -= cpy_len;
-
-			if (vb_avail != 0) {
-				/*
-				 * The segment reachs to its end,
-				 * while the virtio buffer in TX vring has
-				 * more data to be copied.
-				 */
-				cur->data_len = seg_offset;
-				m->pkt_len += seg_offset;
-				/* Allocate mbuf and populate the structure. */
-				cur = rte_pktmbuf_alloc(mbuf_pool);
-				if (unlikely(cur == NULL)) {
-					RTE_LOG(ERR, VHOST_DATA, "Failed to "
-						"allocate memory for mbuf.\n");
-					rte_pktmbuf_free(m);
-					alloc_err = 1;
-					break;
-				}
-
-				seg_num++;
-				prev->next = cur;
-				prev = cur;
-				seg_offset = 0;
-				seg_avail = cur->buf_len - RTE_PKTMBUF_HEADROOM;
-			} else {
-				if (desc->flags & VRING_DESC_F_NEXT) {
-					/*
-					 * There are more virtio buffers in
-					 * same vring entry need to be copied.
-					 */
-					if (seg_avail == 0) {
-						/*
-						 * The current segment hasn't
-						 * room to accomodate more
-						 * data.
-						 */
-						cur->data_len = seg_offset;
-						m->pkt_len += seg_offset;
-						/*
-						 * Allocate an mbuf and
-						 * populate the structure.
-						 */
-						cur = rte_pktmbuf_alloc(mbuf_pool);
-						if (unlikely(cur == NULL)) {
-							RTE_LOG(ERR,
-								VHOST_DATA,
-								"Failed to "
-								"allocate memory "
-								"for mbuf\n");
-							rte_pktmbuf_free(m);
-							alloc_err = 1;
-							break;
-						}
-						seg_num++;
-						prev->next = cur;
-						prev = cur;
-						seg_offset = 0;
-						seg_avail = cur->buf_len - RTE_PKTMBUF_HEADROOM;
-					}
-
-					desc = &vq->desc[desc->next];
-
-					/* Buffer address translation. */
-					vb_addr = gpa_to_vva(dev, desc->addr);
-					/* Prefetch buffer address. */
-					rte_prefetch0((void *)(uintptr_t)vb_addr);
-					vb_offset = 0;
-					vb_avail = desc->len;
-
-					PRINT_PACKET(dev, (uintptr_t)vb_addr,
-						desc->len, 0);
-				} else {
-					/* The whole packet completes. */
-					cur->data_len = seg_offset;
-					m->pkt_len += seg_offset;
-					vb_avail = 0;
-				}
-			}
-
-			cpy_len = RTE_MIN(vb_avail, seg_avail);
-		}
-
-		if (unlikely(alloc_err == 1))
+	for (i = 0; i < count; i++) {
+		m = copy_desc_to_mbuf(dev, vq, desc_indexes[i], mbuf_pool);
+		if (m == NULL)
 			break;
+		pkts[i] = m;
 
-		m->nb_segs = seg_num;
-		if ((hdr->flags != 0) || (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE))
-			vhost_dequeue_offload(hdr, m);
-
-		pkts[entry_success] = m;
-		vq->last_used_idx++;
-		entry_success++;
+		used_idx = vq->last_used_idx++ & (vq->size - 1);
+		vq->used->ring[used_idx].id  = desc_indexes[i];
+		vq->used->ring[used_idx].len = 0;
 	}
 
 	rte_compiler_barrier();
-	vq->used->idx += entry_success;
+	vq->used->idx += i;
+
 	/* Kick guest if required. */
 	if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT))
 		eventfd_write(vq->callfd, (eventfd_t)1);
-	return entry_success;
+
+	return i;
 }
-- 
1.9.0


