[dpdk-dev] [PATCH v11 6/9] net/virtio: implement receive path for packed queues

Jens Freimann jfreimann at redhat.com
Wed Dec 5 13:19:21 CET 2018


On Wed, Dec 05, 2018 at 12:28:27PM +0100, Maxime Coquelin wrote:
>
>
>On 12/3/18 3:15 PM, Jens Freimann wrote:
>>Implement the receive part.
>>
>>Signed-off-by: Jens Freimann <jfreimann at redhat.com>
>>Signed-off-by: Tiwei Bie <tiwei.bie at intel.com>
>>---
>>  drivers/net/virtio/virtio_ethdev.c |  61 +++--
>>  drivers/net/virtio/virtio_ethdev.h |   5 +
>>  drivers/net/virtio/virtio_rxtx.c   | 369 ++++++++++++++++++++++++++++-
>>  drivers/net/virtio/virtqueue.c     |  22 ++
>>  drivers/net/virtio/virtqueue.h     |   2 +-
>>  5 files changed, 428 insertions(+), 31 deletions(-)
>>
>>diff --git a/drivers/net/virtio/virtio_ethdev.c b/drivers/net/virtio/virtio_ethdev.c
>>index bdcc9f365..e86300b58 100644
>>--- a/drivers/net/virtio/virtio_ethdev.c
>>+++ b/drivers/net/virtio/virtio_ethdev.c
>>@@ -1359,27 +1359,39 @@ set_rxtx_funcs(struct rte_eth_dev *eth_dev)
>>  		}
>>  	}
>>-	if (hw->use_simple_rx) {
>>-		PMD_INIT_LOG(INFO, "virtio: using simple Rx path on port %u",
>>-			eth_dev->data->port_id);
>>-		eth_dev->rx_pkt_burst = virtio_recv_pkts_vec;
>>-	} else if (hw->use_inorder_rx) {
>>-		PMD_INIT_LOG(INFO,
>>-			"virtio: using inorder mergeable buffer Rx path on port %u",
>>-			eth_dev->data->port_id);
>>-		eth_dev->rx_pkt_burst = &virtio_recv_mergeable_pkts_inorder;
>>-	} else if (vtpci_with_feature(hw, VIRTIO_NET_F_MRG_RXBUF)) {
>>-		PMD_INIT_LOG(INFO,
>>-			"virtio: using mergeable buffer Rx path on port %u",
>>-			eth_dev->data->port_id);
>>-		eth_dev->rx_pkt_burst = &virtio_recv_mergeable_pkts;
>>-	} else {
>>-		PMD_INIT_LOG(INFO, "virtio: using standard Rx path on port %u",
>>-			eth_dev->data->port_id);
>>-		eth_dev->rx_pkt_burst = &virtio_recv_pkts;
>>+	if (vtpci_packed_queue(hw)) {
>>+		if (vtpci_with_feature(hw, VIRTIO_NET_F_MRG_RXBUF)) {
>>+			PMD_INIT_LOG(INFO,
>>+				"virtio: using packed ring mergeable buffer Rx path on port %u",
>>+				eth_dev->data->port_id);
>>+			eth_dev->rx_pkt_burst = &virtio_recv_mergeable_pkts_packed;
>>+		} else {
>>+			PMD_INIT_LOG(INFO,
>>+				"virtio: using packed ring standard Rx path on port %u",
>>+				eth_dev->data->port_id);
>>+			eth_dev->rx_pkt_burst = &virtio_recv_pkts_packed;
>>+		}
>>+	} else {
>>+		if (hw->use_simple_rx) {
>>+			PMD_INIT_LOG(INFO, "virtio: using simple Rx path on port %u",
>>+				eth_dev->data->port_id);
>>+			eth_dev->rx_pkt_burst = virtio_recv_pkts_vec;
>>+		} else if (hw->use_inorder_rx) {
>>+			PMD_INIT_LOG(INFO,
>>+				"virtio: using inorder mergeable buffer Rx path on port %u",
>>+				eth_dev->data->port_id);
>>+			eth_dev->rx_pkt_burst = &virtio_recv_mergeable_pkts_inorder;
>>+		} else if (vtpci_with_feature(hw, VIRTIO_NET_F_MRG_RXBUF)) {
>>+			PMD_INIT_LOG(INFO,
>>+				"virtio: using mergeable buffer Rx path on port %u",
>>+				eth_dev->data->port_id);
>>+			eth_dev->rx_pkt_burst = &virtio_recv_mergeable_pkts;
>>+		} else {
>>+			PMD_INIT_LOG(INFO, "virtio: using standard Rx path on port %u",
>>+				eth_dev->data->port_id);
>>+			eth_dev->rx_pkt_burst = &virtio_recv_pkts;
>>+		}
>>  	}
>>-
>>-
>>  }
>>  /* Only support 1:1 queue/interrupt mapping so far.
>>@@ -1511,7 +1523,8 @@ virtio_init_device(struct rte_eth_dev *eth_dev, uint64_t req_features)
>>  	/* Setting up rx_header size for the device */
>>  	if (vtpci_with_feature(hw, VIRTIO_NET_F_MRG_RXBUF) ||
>>-	    vtpci_with_feature(hw, VIRTIO_F_VERSION_1))
>>+	    vtpci_with_feature(hw, VIRTIO_F_VERSION_1) ||
>>+	    vtpci_with_feature(hw, VIRTIO_F_RING_PACKED))
>>  		hw->vtnet_hdr_size = sizeof(struct virtio_net_hdr_mrg_rxbuf);
>>  	else
>>  		hw->vtnet_hdr_size = sizeof(struct virtio_net_hdr);
>>@@ -1939,6 +1952,12 @@ virtio_dev_configure(struct rte_eth_dev *dev)
>>  		}
>>  	}
>>+	if (vtpci_packed_queue(hw)) {
>>+		hw->use_simple_rx = 0;
>>+		hw->use_inorder_rx = 0;
>>+		hw->use_inorder_tx = 0;
>>+	}
>>+
>>  #if defined RTE_ARCH_ARM64 || defined RTE_ARCH_ARM
>>  	if (!rte_cpu_get_flag_enabled(RTE_CPUFLAG_NEON)) {
>>  		hw->use_simple_rx = 0;
>>diff --git a/drivers/net/virtio/virtio_ethdev.h b/drivers/net/virtio/virtio_ethdev.h
>>index 05d355180..5cf295418 100644
>>--- a/drivers/net/virtio/virtio_ethdev.h
>>+++ b/drivers/net/virtio/virtio_ethdev.h
>>@@ -73,10 +73,15 @@ int virtio_dev_tx_queue_setup_finish(struct rte_eth_dev *dev,
>>  uint16_t virtio_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
>>  		uint16_t nb_pkts);
>>+uint16_t virtio_recv_pkts_packed(void *rx_queue, struct rte_mbuf **rx_pkts,
>>+		uint16_t nb_pkts);
>>  uint16_t virtio_recv_mergeable_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
>>  		uint16_t nb_pkts);
>>+uint16_t virtio_recv_mergeable_pkts_packed(void *rx_queue, struct rte_mbuf **rx_pkts,
>>+		uint16_t nb_pkts);
>>+
>>  uint16_t virtio_recv_mergeable_pkts_inorder(void *rx_queue,
>>  		struct rte_mbuf **rx_pkts, uint16_t nb_pkts);
>>diff --git a/drivers/net/virtio/virtio_rxtx.c b/drivers/net/virtio/virtio_rxtx.c
>>index 1fcc9cef7..f73498602 100644
>>--- a/drivers/net/virtio/virtio_rxtx.c
>>+++ b/drivers/net/virtio/virtio_rxtx.c
>>@@ -31,6 +31,7 @@
>>  #include "virtqueue.h"
>>  #include "virtio_rxtx.h"
>>  #include "virtio_rxtx_simple.h"
>>+#include "virtio_ring.h"
>>  #ifdef RTE_LIBRTE_VIRTIO_DEBUG_DUMP
>>  #define VIRTIO_DUMP_PACKET(m, len) rte_pktmbuf_dump(stdout, m, len)
>>@@ -105,6 +106,47 @@ vq_ring_free_id_packed(struct virtqueue *vq, uint16_t id)
>>  	dxp->next = VQ_RING_DESC_CHAIN_END;
>>  }
>>+static uint16_t
>
>I think it should be inlined.

ok

>
>>+virtqueue_dequeue_burst_rx_packed(struct virtqueue *vq,
>>+				  struct rte_mbuf **rx_pkts,
>>+				  uint32_t *len,
>>+				  uint16_t num)
>>+{
>>+	struct rte_mbuf *cookie;
>>+	uint16_t used_idx;
>>+	uint16_t id;
>>+	struct vring_packed_desc *desc;
>>+	uint16_t i;
>>+
>>+	desc = vq->ring_packed.desc_packed;
>>+
>>+	for (i = 0; i < num; i++) {
>>+		used_idx = vq->vq_used_cons_idx;
>>+		if (!desc_is_used(&desc[used_idx], vq))
>>+			return i;
>>+		len[i] = desc[used_idx].len;
>>+		id = desc[used_idx].id;
>>+		cookie = (struct rte_mbuf *)vq->vq_descx[id].cookie;
>>+		if (unlikely(cookie == NULL)) {
>>+			PMD_DRV_LOG(ERR, "vring descriptor with no mbuf cookie at %u",
>>+				vq->vq_used_cons_idx);
>>+			break;
>>+		}
>>+		rte_prefetch0(cookie);
>>+		rte_packet_prefetch(rte_pktmbuf_mtod(cookie, void *));
>>+		rx_pkts[i] = cookie;
>>+
>>+		vq->vq_free_cnt++;
>>+		vq->vq_used_cons_idx++;
>>+		if (vq->vq_used_cons_idx >= vq->vq_nentries) {
>>+			vq->vq_used_cons_idx -= vq->vq_nentries;
>>+			vq->used_wrap_counter ^= 1;
>>+		}
>>+	}
>>+
>>+	return i;
>>+}
>>+
>>  static uint16_t
>>  virtqueue_dequeue_burst_rx(struct virtqueue *vq, struct rte_mbuf **rx_pkts,
>>  			   uint32_t *len, uint16_t num)
>>@@ -350,6 +392,44 @@ virtqueue_enqueue_recv_refill(struct virtqueue *vq, struct rte_mbuf *cookie)
>>  	return 0;
>>  }
>>+static inline int
>>+virtqueue_enqueue_recv_refill_packed(struct virtqueue *vq, struct rte_mbuf *cookie)
>>+{
>>+	struct vq_desc_extra *dxp;
>>+	struct virtio_hw *hw = vq->hw;
>>+	struct vring_packed_desc *dp;
>>+	uint16_t idx;
>>+	uint16_t flags;
>>+
>>+	if (unlikely(vq->vq_free_cnt < 1))
>>+		return -ENOSPC;
>>+
>>+	idx = vq->vq_avail_idx;
>>+
>>+	dxp = &vq->vq_descx[idx];
>>+	dxp->cookie = cookie;
>>+
>>+	dp = &vq->ring_packed.desc_packed[idx];
>>+	dp->addr = VIRTIO_MBUF_ADDR(cookie, vq) +
>>+			RTE_PKTMBUF_HEADROOM - hw->vtnet_hdr_size;
>>+	dp->len = cookie->buf_len - RTE_PKTMBUF_HEADROOM + hw->vtnet_hdr_size;
>>+
>>+	flags = VRING_DESC_F_WRITE;
>>+	flags |= VRING_DESC_F_AVAIL(vq->avail_wrap_counter) |
>>+		 VRING_DESC_F_USED(!vq->avail_wrap_counter);
>>+	rte_smp_wmb();
>>+	dp->flags = flags;
>>+
>>+	vq->vq_free_cnt--;
>>+	vq->vq_avail_idx++;
>>+	if (vq->vq_avail_idx >= vq->vq_nentries) {
>>+		vq->vq_avail_idx -= vq->vq_nentries;
>>+		vq->avail_wrap_counter ^= 1;
>>+	}
>>+
>>+	return 0;
>>+}
>>+
>>  /* When doing TSO, the IP length is not included in the pseudo header
>>   * checksum of the packet given to the PMD, but for virtio it is
>>   * expected.
>>@@ -801,7 +881,10 @@ virtio_dev_rx_queue_setup_finish(struct rte_eth_dev *dev, uint16_t queue_idx)
>>  				break;
>>  			/* Enqueue allocated buffers */
>>-			error = virtqueue_enqueue_recv_refill(vq, m);
>>+			if (vtpci_packed_queue(vq->hw))
>>+				error = virtqueue_enqueue_recv_refill_packed(vq, m);
>>+			else
>>+				error = virtqueue_enqueue_recv_refill(vq, m);
>>  			if (error) {
>>  				rte_pktmbuf_free(m);
>>  				break;
>>@@ -809,7 +892,8 @@ virtio_dev_rx_queue_setup_finish(struct rte_eth_dev *dev, uint16_t queue_idx)
>>  			nbufs++;
>>  		}
>>-		vq_update_avail_idx(vq);
>>+		if (!vtpci_packed_queue(vq->hw))
>>+			vq_update_avail_idx(vq);
>>  	}
>>  	PMD_INIT_LOG(DEBUG, "Allocated %d bufs", nbufs);
>>@@ -896,7 +980,10 @@ virtio_discard_rxbuf(struct virtqueue *vq, struct rte_mbuf *m)
>>  	 * Requeue the discarded mbuf. This should always be
>>  	 * successful since it was just dequeued.
>>  	 */
>>-	error = virtqueue_enqueue_recv_refill(vq, m);
>>+	if (vtpci_packed_queue(vq->hw))
>>+		error = virtqueue_enqueue_recv_refill_packed(vq, m);
>>+	else
>>+		error = virtqueue_enqueue_recv_refill(vq, m);
>>  	if (unlikely(error)) {
>>  		RTE_LOG(ERR, PMD, "cannot requeue discarded mbuf");
>>@@ -1135,6 +1222,103 @@ virtio_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts, uint16_t nb_pkts)
>>  	return nb_rx;
>>  }
>>+uint16_t
>>+virtio_recv_pkts_packed(void *rx_queue, struct rte_mbuf **rx_pkts, uint16_t nb_pkts)
>>+{
>>+	struct virtnet_rx *rxvq = rx_queue;
>>+	struct virtqueue *vq = rxvq->vq;
>>+	struct virtio_hw *hw = vq->hw;
>>+	struct rte_mbuf *rxm, *new_mbuf;
>>+	uint16_t num, nb_rx;
>>+	uint32_t len[VIRTIO_MBUF_BURST_SZ];
>>+	struct rte_mbuf *rcv_pkts[VIRTIO_MBUF_BURST_SZ];
>>+	int error;
>>+	uint32_t i, nb_enqueued;
>>+	uint32_t hdr_size;
>>+	struct virtio_net_hdr *hdr;
>>+
>>+	nb_rx = 0;
>>+	if (unlikely(hw->started == 0))
>>+		return nb_rx;
>>+
>>+	num = RTE_MIN(VIRTIO_MBUF_BURST_SZ, nb_pkts);
>>+	if (likely(num > DESC_PER_CACHELINE))
>>+		num = num - ((vq->vq_used_cons_idx + num) % DESC_PER_CACHELINE);
>>+
>>+	num = virtqueue_dequeue_burst_rx_packed(vq, rcv_pkts, len, num);
>>+	PMD_RX_LOG(DEBUG, "dequeue:%d", num);
>>+
>>+	nb_enqueued = 0;
>>+	hdr_size = hw->vtnet_hdr_size;
>>+
>>+	for (i = 0; i < num; i++) {
>>+		rxm = rcv_pkts[i];
>>+
>>+		PMD_RX_LOG(DEBUG, "packet len:%d", len[i]);
>>+
>>+		if (unlikely(len[i] < hdr_size + ETHER_HDR_LEN)) {
>>+			PMD_RX_LOG(ERR, "Packet drop");
>>+			nb_enqueued++;
>>+			virtio_discard_rxbuf(vq, rxm);
>>+			rxvq->stats.errors++;
>>+			continue;
>>+		}
>>+
>>+		rxm->port = rxvq->port_id;
>>+		rxm->data_off = RTE_PKTMBUF_HEADROOM;
>>+		rxm->ol_flags = 0;
>>+		rxm->vlan_tci = 0;
>>+
>>+		rxm->pkt_len = (uint32_t)(len[i] - hdr_size);
>>+		rxm->data_len = (uint16_t)(len[i] - hdr_size);
>>+
>>+		hdr = (struct virtio_net_hdr *)((char *)rxm->buf_addr +
>>+			RTE_PKTMBUF_HEADROOM - hdr_size);
>>+
>>+		if (hw->vlan_strip)
>>+			rte_vlan_strip(rxm);
>>+
>>+		if (hw->has_rx_offload && virtio_rx_offload(rxm, hdr) < 0) {
>>+			virtio_discard_rxbuf(vq, rxm);
>>+			rxvq->stats.errors++;
>>+			continue;
>>+		}
>>+
>>+		virtio_rx_stats_updated(rxvq, rxm);
>>+
>>+		rx_pkts[nb_rx++] = rxm;
>>+	}
>>+
>>+	rxvq->stats.packets += nb_rx;
>>+
>>+	/* Allocate new mbuf for the used descriptor */
>>+	while (likely(!virtqueue_full(vq))) {
>>+		new_mbuf = rte_mbuf_raw_alloc(rxvq->mpool);
>>+		if (unlikely(new_mbuf == NULL)) {
>>+			struct rte_eth_dev *dev
>>+				= &rte_eth_devices[rxvq->port_id];
>>+			dev->data->rx_mbuf_alloc_failed++;
>>+			break;
>>+		}
>>+		error = virtqueue_enqueue_recv_refill_packed(vq, new_mbuf);
>>+		if (unlikely(error)) {
>>+			rte_pktmbuf_free(new_mbuf);
>>+			break;
>>+		}
>>+		nb_enqueued++;
>>+	}
>>+
>>+	if (likely(nb_enqueued)) {
>>+		if (unlikely(virtqueue_kick_prepare_packed(vq))) {
>>+			virtqueue_notify(vq);
>>+			PMD_RX_LOG(DEBUG, "Notified");
>>+		}
>>+	}
>>+
>>+	return nb_rx;
>>+}
>>+
>>+
>>  uint16_t
>>  virtio_recv_mergeable_pkts_inorder(void *rx_queue,
>>  			struct rte_mbuf **rx_pkts,
>>@@ -1341,6 +1525,7 @@ virtio_recv_mergeable_pkts(void *rx_queue,
>>  	uint16_t extra_idx;
>>  	uint32_t seg_res;
>>  	uint32_t hdr_size;
>>+	uint32_t rx_num = 0;
>>  	nb_rx = 0;
>>  	if (unlikely(hw->started == 0))
>>@@ -1366,6 +1551,8 @@ virtio_recv_mergeable_pkts(void *rx_queue,
>>  			break;
>>  		num = virtqueue_dequeue_burst_rx(vq, rcv_pkts, len, 1);
>>+		if (num == 0)
>>+			return nb_rx;
>>  		if (num != 1)
>>  			continue;
>>@@ -1418,11 +1605,8 @@ virtio_recv_mergeable_pkts(void *rx_queue,
>>  			uint16_t  rcv_cnt =
>>  				RTE_MIN(seg_res, RTE_DIM(rcv_pkts));
>>  			if (likely(VIRTQUEUE_NUSED(vq) >= rcv_cnt)) {
>>-				uint32_t rx_num =
>>-					virtqueue_dequeue_burst_rx(vq,
>>-					rcv_pkts, len, rcv_cnt);
>>-				i += rx_num;
>>-				rcv_cnt = rx_num;
>>+				rx_num = virtqueue_dequeue_burst_rx(vq,
>>+					      rcv_pkts, len, rcv_cnt);
>>  			} else {
>>  				PMD_RX_LOG(ERR,
>>  					   "No enough segments for packet.");
>>@@ -1431,6 +1615,8 @@ virtio_recv_mergeable_pkts(void *rx_queue,
>>  				rxvq->stats.errors++;
>>  				break;
>>  			}
>>+			i += rx_num;
>>+			rcv_cnt = rx_num;
>>  			extra_idx = 0;
>>@@ -1483,7 +1669,6 @@ virtio_recv_mergeable_pkts(void *rx_queue,
>>  	if (likely(nb_enqueued)) {
>>  		vq_update_avail_idx(vq);
>>-
>>  		if (unlikely(virtqueue_kick_prepare(vq))) {
>>  			virtqueue_notify(vq);
>>  			PMD_RX_LOG(DEBUG, "Notified");
>>@@ -1493,6 +1678,172 @@ virtio_recv_mergeable_pkts(void *rx_queue,
>>  	return nb_rx;
>>  }
>>+uint16_t
>>+virtio_recv_mergeable_pkts_packed(void *rx_queue,
>>+			struct rte_mbuf **rx_pkts,
>>+			uint16_t nb_pkts)
>>+{
>>+	struct virtnet_rx *rxvq = rx_queue;
>>+	struct virtqueue *vq = rxvq->vq;
>>+	struct virtio_hw *hw = vq->hw;
>>+	struct rte_mbuf *rxm, *new_mbuf;
>>+	uint16_t nb_used, num, nb_rx;
>>+	uint32_t len[VIRTIO_MBUF_BURST_SZ];
>>+	struct rte_mbuf *rcv_pkts[VIRTIO_MBUF_BURST_SZ];
>>+	struct rte_mbuf *prev;
>>+	int error;
>>+	uint32_t i, nb_enqueued;
>>+	uint32_t seg_num;
>>+	uint16_t extra_idx;
>>+	uint32_t seg_res;
>>+	uint32_t hdr_size;
>>+	uint32_t rx_num = 0;
>>+
>>+	nb_rx = 0;
>>+	if (unlikely(hw->started == 0))
>>+		return nb_rx;
>>+
>>+	nb_used = VIRTIO_MBUF_BURST_SZ;
>>+
>>+	i = 0;
>>+	nb_enqueued = 0;
>>+	seg_num = 0;
>>+	extra_idx = 0;
>>+	seg_res = 0;
>>+	hdr_size = hw->vtnet_hdr_size;
>>+
>>+	while (i < nb_used) {
>>+		struct virtio_net_hdr_mrg_rxbuf *header;
>>+
>>+		if (nb_rx == nb_pkts)
>>+			break;
>>+
>>+		num = virtqueue_dequeue_burst_rx_packed(vq, rcv_pkts, len, 1);
>>+		if (num == 0)
>>+			break;
>>+		if (num != 1)
>>+			continue;
>
>I see this check also in split function, but it should just be dropped
>as the result can only be 0 or 1.
>
>I will remove it for split ring.

Yes, I tried to break this function up into a dequeue_one() that is
called from the above function. It didn't give any performance improvement
though. But I agree that we can remove it. 

>
>>+
>>+		i++;
>>+
>>+		PMD_RX_LOG(DEBUG, "dequeue:%d", num);
>>+		PMD_RX_LOG(DEBUG, "packet len:%d", len[0]);
>>+
>>+		rxm = rcv_pkts[0];
>>+
>>+		if (unlikely(len[0] < hdr_size + ETHER_HDR_LEN)) {
>>+			PMD_RX_LOG(ERR, "Packet drop");
>>+			nb_enqueued++;
>>+			virtio_discard_rxbuf(vq, rxm);
>>+			rxvq->stats.errors++;
>>+			continue;
>>+		}
>>+
>>+		header = (struct virtio_net_hdr_mrg_rxbuf *)((char *)rxm->buf_addr +
>>+			RTE_PKTMBUF_HEADROOM - hdr_size);
>>+		seg_num = header->num_buffers;
>>+
>>+		if (seg_num == 0)
>>+			seg_num = 1;
>
>I would suggest we use this function also for non-mergeable case, as I
>did for split ring, by forcing seg_num to 1 if mergeable feature not
>negotiated.
>
>Doing so, we can drop virtio_recv_pkts_packed(() function and save 100
>LoC. If you do that, rename this function to virtio_recv_pkts_packed().

Yes, as discussed offline. I'll do it for the next version. 

regards,
Jens 


More information about the dev mailing list