[dpdk-dev] [PATCH v3 2/8] vhost: refactor virtio_dev_rx

Yuanhan Liu yuanhan.liu at linux.intel.com
Thu Mar 10 05:32:40 CET 2016


This is a simple refactor, as there isn't any twisted logic in the old
code. Here I just broke the code up and introduced two helper functions,
reserve_avail_buf() and copy_mbuf_to_desc(), to make the code more
readable.

Also, it saves nearly 1K bytes of binary code size.
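
For reference (not part of the patch), here is a small standalone sketch
of the reservation scheme behind reserve_avail_buf(): each core claims a
contiguous window [start, end) of ring entries by advancing a shared
index with compare-and-swap, retrying when another core wins the race.
C11 atomics stand in for rte_atomic16_cmpset() here, and the names are
made up for illustration.

    #include <stdatomic.h>
    #include <stdint.h>

    static _Atomic uint16_t res_idx;    /* shared reservation index */

    static uint16_t
    reserve_window(uint16_t avail_idx, uint16_t count,
                   uint16_t *start, uint16_t *end)
    {
            uint16_t old, new;

            do {
                    old = atomic_load(&res_idx);

                    /* free entries; uint16_t arithmetic wraps like the ring */
                    uint16_t free_entries = (uint16_t)(avail_idx - old);
                    if (count > free_entries)
                            count = free_entries;
                    if (count == 0)
                            return 0;

                    new = (uint16_t)(old + count);
                    /* retry if another core moved res_idx under us */
            } while (!atomic_compare_exchange_weak(&res_idx, &old, new));

            *start = old;
            *end   = new;
            return count;
    }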

Signed-off-by: Yuanhan Liu <yuanhan.liu at linux.intel.com>
---

v2: - fix NULL dereference bug found by Rich.

v3: - use while (mbuf_avail || m->next) to align with the style of
      copy_desc_to_mbuf() -- suggested by Huawei

    - use likely/unlikely when possible
---
 lib/librte_vhost/vhost_rxtx.c | 296 ++++++++++++++++++++----------------------
 1 file changed, 141 insertions(+), 155 deletions(-)
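
Likewise, a standalone sketch (hypothetical struct seg, not the
rte_mbuf/vring API) of the two-cursor copy loop that copy_mbuf_to_desc()
is built around: walk the segmented source and the chained destination
in lockstep, copy min(src_avail, dst_avail) bytes each round, move to
the next segment on whichever side runs dry, and fail if the destination
chain runs out of room.

    #include <stdint.h>
    #include <string.h>

    struct seg {                            /* one segment of a chained buffer */
            uint8_t    *data;
            uint32_t    len;
            struct seg *next;
    };

    /* Returns 0 on success, -1 if the destination chain is too short. */
    static int
    copy_chain(struct seg *src, struct seg *dst)
    {
            uint32_t src_avail = src->len, src_off = 0;
            uint32_t dst_avail = dst->len, dst_off = 0;
            uint32_t cpy;

            while (src_avail != 0 || src->next != NULL) {
                    if (src_avail == 0) {           /* next source segment */
                            src = src->next;
                            src_off = 0;
                            src_avail = src->len;
                    }
                    if (dst_avail == 0) {           /* next destination segment */
                            if (dst->next == NULL)
                                    return -1;      /* not enough room */
                            dst = dst->next;
                            dst_off = 0;
                            dst_avail = dst->len;
                    }

                    cpy = src_avail < dst_avail ? src_avail : dst_avail;
                    memcpy(dst->data + dst_off, src->data + src_off, cpy);

                    src_avail -= cpy; src_off += cpy;
                    dst_avail -= cpy; dst_off += cpy;
            }
            return 0;
    }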

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index e12e9ba..0df0612 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -129,6 +129,115 @@ virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
 	return;
 }
 
+static inline int __attribute__((always_inline))
+copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
+		  struct rte_mbuf *m, uint16_t desc_idx, uint32_t *copied)
+{
+	uint32_t desc_avail, desc_offset;
+	uint32_t mbuf_avail, mbuf_offset;
+	uint32_t cpy_len;
+	struct vring_desc *desc;
+	uint64_t desc_addr;
+	struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
+
+	desc = &vq->desc[desc_idx];
+	desc_addr = gpa_to_vva(dev, desc->addr);
+	rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+	virtio_enqueue_offload(m, &virtio_hdr.hdr);
+	rte_memcpy((void *)(uintptr_t)desc_addr,
+			(const void *)&virtio_hdr, vq->vhost_hlen);
+	vhost_log_write(dev, desc->addr, vq->vhost_hlen);
+	PRINT_PACKET(dev, (uintptr_t)desc_addr, vq->vhost_hlen, 0);
+
+	desc_offset = vq->vhost_hlen;
+	desc_avail  = desc->len - vq->vhost_hlen;
+
+	*copied = rte_pktmbuf_pkt_len(m);
+	mbuf_avail  = rte_pktmbuf_data_len(m);
+	mbuf_offset = 0;
+	while (mbuf_avail != 0 || m->next != NULL) {
+		/* done with current mbuf, fetch next */
+		if (mbuf_avail == 0) {
+			m = m->next;
+
+			mbuf_offset = 0;
+			mbuf_avail  = rte_pktmbuf_data_len(m);
+		}
+
+		/* done with current desc buf, fetch next */
+		if (desc_avail == 0) {
+			if ((desc->flags & VRING_DESC_F_NEXT) == 0) {
+				/* Room in vring buffer is not enough */
+				return -1;
+			}
+
+			desc = &vq->desc[desc->next];
+			desc_addr   = gpa_to_vva(dev, desc->addr);
+			desc_offset = 0;
+			desc_avail  = desc->len;
+		}
+
+		cpy_len = RTE_MIN(desc_avail, mbuf_avail);
+		rte_memcpy((void *)((uintptr_t)(desc_addr + desc_offset)),
+			rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
+			cpy_len);
+		vhost_log_write(dev, desc->addr + desc_offset, cpy_len);
+		PRINT_PACKET(dev, (uintptr_t)(desc_addr + desc_offset),
+			     cpy_len, 0);
+
+		mbuf_avail  -= cpy_len;
+		mbuf_offset += cpy_len;
+		desc_avail  -= cpy_len;
+		desc_offset += cpy_len;
+	}
+
+	return 0;
+}
+
+/*
+ * As many data cores may want to access available buffers,
+ * they need to be reserved.
+ */
+static inline uint32_t
+reserve_avail_buf(struct vhost_virtqueue *vq, uint32_t count,
+		  uint16_t *start, uint16_t *end)
+{
+	uint16_t res_start_idx;
+	uint16_t res_end_idx;
+	uint16_t avail_idx;
+	uint16_t free_entries;
+	int success;
+
+	count = RTE_MIN(count, (uint32_t)MAX_PKT_BURST);
+
+again:
+	res_start_idx = vq->last_used_idx_res;
+	avail_idx = *((volatile uint16_t *)&vq->avail->idx);
+
+	free_entries = avail_idx - res_start_idx;
+	count = RTE_MIN(count, free_entries);
+	if (count == 0)
+		return 0;
+
+	res_end_idx = res_start_idx + count;
+
+	/*
+	 * update vq->last_used_idx_res atomically; try again if failed.
+	 *
+	 * TODO: Allow to disable cmpset if no concurrency in application.
+	 */
+	success = rte_atomic16_cmpset(&vq->last_used_idx_res,
+				      res_start_idx, res_end_idx);
+	if (unlikely(!success))
+		goto again;
+
+	*start = res_start_idx;
+	*end   = res_end_idx;
+
+	return count;
+}
+
 /**
  * This function adds buffers to the virtio devices RX virtqueue. Buffers can
  * be received from the physical port or from another virtio device. A packet
@@ -138,21 +247,12 @@ virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
  */
 static inline uint32_t __attribute__((always_inline))
 virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
-	struct rte_mbuf **pkts, uint32_t count)
+	      struct rte_mbuf **pkts, uint32_t count)
 {
 	struct vhost_virtqueue *vq;
-	struct vring_desc *desc, *hdr_desc;
-	struct rte_mbuf *buff, *first_buff;
-	/* The virtio_hdr is initialised to 0. */
-	struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
-	uint64_t buff_addr = 0;
-	uint64_t buff_hdr_addr = 0;
-	uint32_t head[MAX_PKT_BURST];
-	uint32_t head_idx, packet_success = 0;
-	uint16_t avail_idx, res_cur_idx;
-	uint16_t res_base_idx, res_end_idx;
-	uint16_t free_entries;
-	uint8_t success = 0;
+	uint16_t res_start_idx, res_end_idx;
+	uint16_t desc_indexes[MAX_PKT_BURST];
+	uint32_t i;
 
 	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_rx()\n", dev->device_fh);
 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->virt_qp_nb))) {
@@ -166,161 +266,47 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
 	if (unlikely(vq->enabled == 0))
 		return 0;
 
-	count = (count > MAX_PKT_BURST) ? MAX_PKT_BURST : count;
-
-	/*
-	 * As many data cores may want access to available buffers,
-	 * they need to be reserved.
-	 */
-	do {
-		res_base_idx = vq->last_used_idx_res;
-		avail_idx = *((volatile uint16_t *)&vq->avail->idx);
-
-		free_entries = (avail_idx - res_base_idx);
-		/*check that we have enough buffers*/
-		if (unlikely(count > free_entries))
-			count = free_entries;
-
-		if (count == 0)
-			return 0;
-
-		res_end_idx = res_base_idx + count;
-		/* vq->last_used_idx_res is atomically updated. */
-		/* TODO: Allow to disable cmpset if no concurrency in application. */
-		success = rte_atomic16_cmpset(&vq->last_used_idx_res,
-				res_base_idx, res_end_idx);
-	} while (unlikely(success == 0));
-	res_cur_idx = res_base_idx;
-	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Current Index %d| End Index %d\n",
-			dev->device_fh, res_cur_idx, res_end_idx);
-
-	/* Prefetch available ring to retrieve indexes. */
-	rte_prefetch0(&vq->avail->ring[res_cur_idx & (vq->size - 1)]);
-
-	/* Retrieve all of the head indexes first to avoid caching issues. */
-	for (head_idx = 0; head_idx < count; head_idx++)
-		head[head_idx] = vq->avail->ring[(res_cur_idx + head_idx) &
-					(vq->size - 1)];
-
-	/*Prefetch descriptor index. */
-	rte_prefetch0(&vq->desc[head[packet_success]]);
-
-	while (res_cur_idx != res_end_idx) {
-		uint32_t offset = 0, vb_offset = 0;
-		uint32_t pkt_len, len_to_cpy, data_len, total_copied = 0;
-		uint8_t hdr = 0, uncompleted_pkt = 0;
-		uint16_t idx;
-
-		/* Get descriptor from available ring */
-		desc = &vq->desc[head[packet_success]];
-
-		buff = pkts[packet_success];
-		first_buff = buff;
-
-		/* Convert from gpa to vva (guest physical addr -> vhost virtual addr) */
-		buff_addr = gpa_to_vva(dev, desc->addr);
-		/* Prefetch buffer address. */
-		rte_prefetch0((void *)(uintptr_t)buff_addr);
-
-		/* Copy virtio_hdr to packet and increment buffer address */
-		buff_hdr_addr = buff_addr;
-		hdr_desc = desc;
-
-		/*
-		 * If the descriptors are chained the header and data are
-		 * placed in separate buffers.
-		 */
-		if ((desc->flags & VRING_DESC_F_NEXT) &&
-			(desc->len == vq->vhost_hlen)) {
-			desc = &vq->desc[desc->next];
-			/* Buffer address translation. */
-			buff_addr = gpa_to_vva(dev, desc->addr);
-		} else {
-			vb_offset += vq->vhost_hlen;
-			hdr = 1;
-		}
+	count = reserve_avail_buf(vq, count, &res_start_idx, &res_end_idx);
+	if (count == 0)
+		return 0;
 
-		pkt_len = rte_pktmbuf_pkt_len(buff);
-		data_len = rte_pktmbuf_data_len(buff);
-		len_to_cpy = RTE_MIN(data_len,
-			hdr ? desc->len - vq->vhost_hlen : desc->len);
-		while (total_copied < pkt_len) {
-			/* Copy mbuf data to buffer */
-			rte_memcpy((void *)(uintptr_t)(buff_addr + vb_offset),
-				rte_pktmbuf_mtod_offset(buff, const void *, offset),
-				len_to_cpy);
-			vhost_log_write(dev, desc->addr + vb_offset, len_to_cpy);
-			PRINT_PACKET(dev, (uintptr_t)(buff_addr + vb_offset),
-				len_to_cpy, 0);
-
-			offset += len_to_cpy;
-			vb_offset += len_to_cpy;
-			total_copied += len_to_cpy;
-
-			/* The whole packet completes */
-			if (total_copied == pkt_len)
-				break;
+	LOG_DEBUG(VHOST_DATA,
+		"(%"PRIu64") res_start_idx %d | res_end_idx %d\n",
+		dev->device_fh, res_start_idx, res_end_idx);
 
-			/* The current segment completes */
-			if (offset == data_len) {
-				buff = buff->next;
-				offset = 0;
-				data_len = rte_pktmbuf_data_len(buff);
-			}
+	/* Retrieve all of the desc indexes first to avoid caching issues. */
+	rte_prefetch0(&vq->avail->ring[res_start_idx & (vq->size - 1)]);
+	for (i = 0; i < count; i++) {
+		desc_indexes[i] = vq->avail->ring[(res_start_idx + i) &
+						  (vq->size - 1)];
+	}
 
-			/* The current vring descriptor done */
-			if (vb_offset == desc->len) {
-				if (desc->flags & VRING_DESC_F_NEXT) {
-					desc = &vq->desc[desc->next];
-					buff_addr = gpa_to_vva(dev, desc->addr);
-					vb_offset = 0;
-				} else {
-					/* Room in vring buffer is not enough */
-					uncompleted_pkt = 1;
-					break;
-				}
-			}
-			len_to_cpy = RTE_MIN(data_len - offset, desc->len - vb_offset);
-		}
+	rte_prefetch0(&vq->desc[desc_indexes[0]]);
+	for (i = 0; i < count; i++) {
+		uint16_t desc_idx = desc_indexes[i];
+		uint16_t used_idx = (res_start_idx + i) & (vq->size - 1);
+		uint32_t copied;
+		int err;
 
-		/* Update used ring with desc information */
-		idx = res_cur_idx & (vq->size - 1);
-		vq->used->ring[idx].id = head[packet_success];
+		err = copy_mbuf_to_desc(dev, vq, pkts[i], desc_idx, &copied);
 
-		/* Drop the packet if it is uncompleted */
-		if (unlikely(uncompleted_pkt == 1))
-			vq->used->ring[idx].len = vq->vhost_hlen;
+		vq->used->ring[used_idx].id = desc_idx;
+		if (unlikely(err))
+			vq->used->ring[used_idx].len = vq->vhost_hlen;
 		else
-			vq->used->ring[idx].len = pkt_len + vq->vhost_hlen;
-
+			vq->used->ring[used_idx].len = copied + vq->vhost_hlen;
 		vhost_log_used_vring(dev, vq,
-			offsetof(struct vring_used, ring[idx]),
-			sizeof(vq->used->ring[idx]));
+			offsetof(struct vring_used, ring[used_idx]),
+			sizeof(vq->used->ring[used_idx]));
 
-		res_cur_idx++;
-		packet_success++;
-
-		if (unlikely(uncompleted_pkt == 1))
-			continue;
-
-		virtio_enqueue_offload(first_buff, &virtio_hdr.hdr);
-
-		rte_memcpy((void *)(uintptr_t)buff_hdr_addr,
-			(const void *)&virtio_hdr, vq->vhost_hlen);
-		vhost_log_write(dev, hdr_desc->addr, vq->vhost_hlen);
-
-		PRINT_PACKET(dev, (uintptr_t)buff_hdr_addr, vq->vhost_hlen, 1);
-
-		if (res_cur_idx < res_end_idx) {
-			/* Prefetch descriptor index. */
-			rte_prefetch0(&vq->desc[head[packet_success]]);
-		}
+		if (i + 1 < count)
+			rte_prefetch0(&vq->desc[desc_indexes[i+1]]);
 	}
 
 	rte_compiler_barrier();
 
 	/* Wait until it's our turn to add our buffer to the used ring. */
-	while (unlikely(vq->last_used_idx != res_base_idx))
+	while (unlikely(vq->last_used_idx != res_start_idx))
 		rte_pause();
 
 	*(volatile uint16_t *)&vq->used->idx += count;
-- 
1.9.0


