[dpdk-dev] [PATCH v6 06/11] net/virtio: implement transmit path for packed queues
    Jens Freimann 
    jfreimann at redhat.com
       
    Fri Sep 21 12:33:03 CEST 2018
    
    
  
Implement the transmit path for devices that support packed
virtqueues. Add the feature bit and the code that adds buffers
to the vring and marks descriptors as available.
Signed-off-by: Jens Freimann <jfreimann at redhat.com>
---
 drivers/net/virtio/virtio_ethdev.c |   8 +-
 drivers/net/virtio/virtio_ethdev.h |   2 +
 drivers/net/virtio/virtio_ring.h   |  15 +-
 drivers/net/virtio/virtio_rxtx.c   | 243 +++++++++++++++++++++++++++++
 drivers/net/virtio/virtqueue.h     |  17 +-
 5 files changed, 280 insertions(+), 5 deletions(-)
diff --git a/drivers/net/virtio/virtio_ethdev.c b/drivers/net/virtio/virtio_ethdev.c
index 29f3e1043..5c28af282 100644
--- a/drivers/net/virtio/virtio_ethdev.c
+++ b/drivers/net/virtio/virtio_ethdev.c
@@ -384,6 +384,8 @@ virtio_init_queue(struct rte_eth_dev *dev, uint16_t vtpci_queue_idx)
 	vq->hw = hw;
 	vq->vq_queue_index = vtpci_queue_idx;
 	vq->vq_nentries = vq_size;
+	if (vtpci_packed_queue(hw))
+		vq->vq_ring.avail_wrap_counter = 1;
 
 	/*
 	 * Reserve a memzone for vring elements
@@ -1338,7 +1340,11 @@ set_rxtx_funcs(struct rte_eth_dev *eth_dev)
 		eth_dev->rx_pkt_burst = &virtio_recv_pkts;
 	}
 
-	if (hw->use_inorder_tx) {
+	if (vtpci_packed_queue(hw)) {
+		PMD_INIT_LOG(INFO, "virtio: using virtio 1.1 Tx path on port %u",
+			eth_dev->data->port_id);
+		eth_dev->tx_pkt_burst = virtio_xmit_pkts_packed;
+	} else if (hw->use_inorder_tx) {
 		PMD_INIT_LOG(INFO, "virtio: using inorder Tx path on port %u",
 			eth_dev->data->port_id);
 		eth_dev->tx_pkt_burst = virtio_xmit_pkts_inorder;
diff --git a/drivers/net/virtio/virtio_ethdev.h b/drivers/net/virtio/virtio_ethdev.h
index b726ad108..04161b461 100644
--- a/drivers/net/virtio/virtio_ethdev.h
+++ b/drivers/net/virtio/virtio_ethdev.h
@@ -79,6 +79,8 @@ uint16_t virtio_recv_mergeable_pkts_inorder(void *rx_queue,
 
 uint16_t virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts,
 		uint16_t nb_pkts);
+uint16_t virtio_xmit_pkts_packed(void *tx_queue, struct rte_mbuf **tx_pkts,
+		uint16_t nb_pkts);
 
 uint16_t virtio_xmit_pkts_inorder(void *tx_queue, struct rte_mbuf **tx_pkts,
 		uint16_t nb_pkts);
diff --git a/drivers/net/virtio/virtio_ring.h b/drivers/net/virtio/virtio_ring.h
index b9e63d4d4..dbffd4dcd 100644
--- a/drivers/net/virtio/virtio_ring.h
+++ b/drivers/net/virtio/virtio_ring.h
@@ -108,14 +108,25 @@ set_desc_avail(struct vring *vr, struct vring_desc_packed *desc)
 }
 
 static inline int
-desc_is_used(struct vring_desc_packed *desc, struct vring *vr)
+_desc_is_used(struct vring_desc_packed *desc)
 {
 	uint16_t used, avail;
 
 	used = !!(desc->flags & VRING_DESC_F_USED(1));
 	avail = !!(desc->flags & VRING_DESC_F_AVAIL(1));
 
-	return used == avail && used == vr->used_wrap_counter;
+	return used == avail;
+
+}
+
+static inline int
+desc_is_used(struct vring_desc_packed *desc, struct vring *vr)
+{
+	uint16_t used;
+
+	used = !!(desc->flags & VRING_DESC_F_USED(1));
+
+	return _desc_is_used(desc) && used == vr->used_wrap_counter;
 }
 
 /* The standard layout for the ring is a continuous chunk of memory which
diff --git a/drivers/net/virtio/virtio_rxtx.c b/drivers/net/virtio/virtio_rxtx.c
index eb891433e..ea6300563 100644
--- a/drivers/net/virtio/virtio_rxtx.c
+++ b/drivers/net/virtio/virtio_rxtx.c
@@ -38,6 +38,7 @@
 #define  VIRTIO_DUMP_PACKET(m, len) do { } while (0)
 #endif
 
+
 int
 virtio_dev_rx_queue_done(void *rxq, uint16_t offset)
 {
@@ -165,6 +166,31 @@ virtqueue_dequeue_rx_inorder(struct virtqueue *vq,
 #endif
 
 /* Cleanup from completed transmits. */
+static void
+virtio_xmit_cleanup_packed(struct virtqueue *vq)
+{
+	uint16_t idx;
+	uint16_t size = vq->vq_nentries;
+	struct vring_desc_packed *desc = vq->vq_ring.desc_packed;
+	struct vq_desc_extra *dxp;
+
+	idx = vq->vq_used_cons_idx;
+	while (_desc_is_used(&desc[idx]) &&
+	       vq->vq_free_cnt < size) {
+		dxp = &vq->vq_descx[idx];
+		vq->vq_free_cnt += dxp->ndescs;
+		idx += dxp->ndescs;
+		idx = idx >= size ? idx - size : idx;
+		if (idx == 0) {
+			vq->vq_ring.used_wrap_counter ^= 1;
+		}
+		if (dxp->cookie != NULL) {
+			rte_pktmbuf_free(dxp->cookie);
+			dxp->cookie = NULL;
+		}
+	}
+}
+
 static void
 virtio_xmit_cleanup(struct virtqueue *vq, uint16_t num)
 {
@@ -456,6 +482,129 @@ virtqueue_enqueue_xmit_inorder(struct virtnet_tx *txvq,
 	vq->vq_desc_head_idx = idx & (vq->vq_nentries - 1);
 }
 
+static inline void
+virtqueue_enqueue_xmit_packed(struct virtnet_tx *txvq, struct rte_mbuf *cookie,
+			uint16_t needed, int use_indirect, int can_push,
+			int in_order)
+{
+	struct virtio_tx_region *txr = txvq->virtio_net_hdr_mz->addr;
+	struct vq_desc_extra *dxp;
+	struct virtqueue *vq = txvq->vq;
+	struct vring_desc_packed *start_dp;
+	uint16_t seg_num = cookie->nb_segs;
+	uint16_t head_idx, idx, prev;
+	uint16_t head_id;
+	uint16_t head_size = vq->hw->vtnet_hdr_size;
+	struct virtio_net_hdr *hdr;
+	int wrap_counter = vq->vq_ring.avail_wrap_counter;
+
+	head_idx = vq->vq_desc_head_idx;
+	idx = head_idx;
+	dxp = &vq->vq_descx[idx];
+	dxp->cookie = (void *)cookie;
+	dxp->ndescs = needed;
+
+	start_dp = vq->vq_ring.desc_packed;
+	head_id = start_dp[head_idx].index;
+
+	if (can_push) {
+		/* prepend cannot fail, checked by caller */
+		hdr = (struct virtio_net_hdr *)
+			rte_pktmbuf_prepend(cookie, head_size);
+		/* rte_pktmbuf_prepend() counts the hdr size to the pkt length,
+		 * which is wrong. Below subtract restores correct pkt size.
+		 */
+		cookie->pkt_len -= head_size;
+
+		/* if offload disabled, it is not zeroed below, do it now */
+		if (!vq->hw->has_tx_offload) {
+			ASSIGN_UNLESS_EQUAL(hdr->csum_start, 0);
+			ASSIGN_UNLESS_EQUAL(hdr->csum_offset, 0);
+			ASSIGN_UNLESS_EQUAL(hdr->flags, 0);
+			ASSIGN_UNLESS_EQUAL(hdr->gso_type, 0);
+			ASSIGN_UNLESS_EQUAL(hdr->gso_size, 0);
+			ASSIGN_UNLESS_EQUAL(hdr->hdr_len, 0);
+		}
+	} else if (use_indirect) {
+		/* setup tx ring slot to point to indirect
+		 * descriptor list stored in reserved region.
+		 *
+		 * the first slot in indirect ring is already preset
+		 * to point to the header in reserved region
+		 */
+		start_dp[idx].addr  = txvq->virtio_net_hdr_mem +
+			RTE_PTR_DIFF(&txr[idx].tx_indir, txr);
+		start_dp[idx].len   = (seg_num + 1) * sizeof(struct vring_desc);
+		start_dp[idx].flags = VRING_DESC_F_INDIRECT;
+		hdr = (struct virtio_net_hdr *)&txr[idx].tx_hdr;
+
+		/* loop below will fill in rest of the indirect elements */
+		start_dp = txr[idx].tx_indir_pq;
+		idx = 1;
+	} else {
+		/* setup first tx ring slot to point to header
+		 * stored in reserved region.
+		 */
+		start_dp[idx].addr  = txvq->virtio_net_hdr_mem +
+			RTE_PTR_DIFF(&txr[idx].tx_hdr, txr);
+		start_dp[idx].len   = vq->hw->vtnet_hdr_size;
+		start_dp[idx].flags = VRING_DESC_F_NEXT |
+			VRING_DESC_F_AVAIL(vq->vq_ring.avail_wrap_counter) |
+			VRING_DESC_F_USED(!vq->vq_ring.used_wrap_counter); 
+		hdr = (struct virtio_net_hdr *)&txr[idx].tx_hdr;
+
+		idx++;
+	}
+
+	virtqueue_xmit_offload(hdr, cookie, vq->hw->has_tx_offload);
+
+	do {
+		if (idx >= vq->vq_nentries) {
+			idx -= vq->vq_nentries;
+			vq->vq_ring.avail_wrap_counter ^= 1;
+			vq->vq_ring.used_wrap_counter ^= 1;
+		}
+		start_dp[idx].addr  = VIRTIO_MBUF_DATA_DMA_ADDR(cookie, vq);
+		start_dp[idx].len   = cookie->data_len;
+		start_dp[idx].flags = VRING_DESC_F_NEXT |
+			VRING_DESC_F_AVAIL(vq->vq_ring.avail_wrap_counter) |
+			VRING_DESC_F_USED(!vq->vq_ring.used_wrap_counter); 
+		idx++;
+	} while ((cookie = cookie->next) != NULL);
+
+	if (use_indirect)
+		idx = vq->vq_ring.desc_packed[head_idx].index;
+
+	if (idx >= vq->vq_nentries) {
+		idx -= vq->vq_nentries;
+		vq->vq_ring.avail_wrap_counter ^= 1;
+		vq->vq_ring.used_wrap_counter ^= 1;
+	}
+
+	vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt - needed);
+
+	if (needed > 1) {
+		prev = (idx - 1 > 0 ? idx - 1 : vq->vq_nentries) - 1;
+		start_dp[prev].index = head_id;
+		start_dp[prev].flags =
+			(VRING_DESC_F_AVAIL(wrap_counter) |
+			 VRING_DESC_F_USED(!wrap_counter));
+	}
+	start_dp[head_idx].flags =
+		(VRING_DESC_F_AVAIL(wrap_counter) |
+		 VRING_DESC_F_USED(!wrap_counter));
+	rte_smp_wmb();
+
+	vq->vq_desc_head_idx = idx;
+	vq->vq_avail_idx = idx;
+
+	if (!in_order) {
+		if (vq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END)
+			vq->vq_desc_tail_idx = idx;
+	}
+}
+
+
 static inline void
 virtqueue_enqueue_xmit(struct virtnet_tx *txvq, struct rte_mbuf *cookie,
 			uint16_t needed, int use_indirect, int can_push,
@@ -736,6 +885,9 @@ virtio_dev_tx_queue_setup_finish(struct rte_eth_dev *dev,
 	if (hw->use_inorder_tx)
 		vq->vq_ring.desc[vq->vq_nentries - 1].next = 0;
 
+	if (vtpci_packed_queue(hw))
+		vq->vq_ring.avail_wrap_counter = 1;
+
 	VIRTQUEUE_DUMP(vq);
 
 	return 0;
@@ -1346,6 +1498,97 @@ virtio_recv_mergeable_pkts(void *rx_queue,
 	return nb_rx;
 }
 
+uint16_t
+virtio_xmit_pkts_packed(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
+{
+	struct virtnet_tx *txvq = tx_queue;
+	struct virtqueue *vq = txvq->vq;
+	struct virtio_hw *hw = vq->hw;
+	uint16_t hdr_size = hw->vtnet_hdr_size;
+	uint16_t nb_tx = 0;
+	int error;
+
+	if (unlikely(hw->started == 0 && tx_pkts != hw->inject_pkts))
+		return nb_tx;
+
+	if (unlikely(nb_pkts < 1))
+		return nb_pkts;
+
+	PMD_TX_LOG(DEBUG, "%d packets to xmit", nb_pkts);
+
+	virtio_rmb();
+	if (likely(nb_pkts > vq->vq_nentries - vq->vq_free_thresh))
+		virtio_xmit_cleanup_packed(vq);
+
+	for (nb_tx = 0; nb_tx < nb_pkts; nb_tx++) {
+		struct rte_mbuf *txm = tx_pkts[nb_tx];
+		int can_push = 0, use_indirect = 0, slots, need;
+
+		/* Do VLAN tag insertion */
+		if (unlikely(txm->ol_flags & PKT_TX_VLAN_PKT)) {
+			error = rte_vlan_insert(&txm);
+			if (unlikely(error)) {
+				rte_pktmbuf_free(txm);
+				continue;
+			}
+		}
+
+		/* optimize ring usage */
+		if ((vtpci_with_feature(hw, VIRTIO_F_ANY_LAYOUT) ||
+		      vtpci_with_feature(hw, VIRTIO_F_VERSION_1)) &&
+		    rte_mbuf_refcnt_read(txm) == 1 &&
+		    RTE_MBUF_DIRECT(txm) &&
+		    txm->nb_segs == 1 &&
+		    rte_pktmbuf_headroom(txm) >= hdr_size &&
+		    rte_is_aligned(rte_pktmbuf_mtod(txm, char *),
+				   __alignof__(struct virtio_net_hdr_mrg_rxbuf)))
+			can_push = 1;
+		else if (vtpci_with_feature(hw, VIRTIO_RING_F_INDIRECT_DESC) &&
+			 txm->nb_segs < VIRTIO_MAX_TX_INDIRECT)
+			use_indirect = 1;
+
+		/* How many main ring entries are needed to this Tx?
+		 * any_layout => number of segments
+		 * indirect   => 1
+		 * default    => number of segments + 1
+		 */
+		slots = use_indirect ? 1 : (txm->nb_segs + !can_push);
+		need = slots - vq->vq_free_cnt;
+
+		/* Positive value indicates it need free vring descriptors */
+		if (unlikely(need > 0)) {
+			virtio_rmb();
+			need = RTE_MIN(need, (int)nb_pkts);
+
+			virtio_xmit_cleanup_packed(vq);
+			need = slots - vq->vq_free_cnt;
+			if (unlikely(need > 0)) {
+				PMD_TX_LOG(ERR,
+					   "No free tx descriptors to transmit");
+				break;
+			}
+		}
+
+		/* Enqueue Packet buffers */
+		virtqueue_enqueue_xmit_packed(txvq, txm, slots, use_indirect,
+			can_push, 0);
+
+		txvq->stats.bytes += txm->pkt_len;
+		virtio_update_packet_stats(&txvq->stats, txm);
+	}
+
+	txvq->stats.packets += nb_tx;
+
+	if (likely(nb_tx)) {
+		if (unlikely(virtqueue_kick_prepare_packed(vq))) {
+			virtqueue_notify(vq);
+			PMD_TX_LOG(DEBUG, "Notified backend after xmit");
+		}
+	}
+
+	return nb_tx;
+}
+
 uint16_t
 virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
 {
diff --git a/drivers/net/virtio/virtqueue.h b/drivers/net/virtio/virtqueue.h
index eb220563f..ffa2d8f92 100644
--- a/drivers/net/virtio/virtqueue.h
+++ b/drivers/net/virtio/virtqueue.h
@@ -241,8 +241,12 @@ struct virtio_net_hdr_mrg_rxbuf {
 #define VIRTIO_MAX_TX_INDIRECT 8
 struct virtio_tx_region {
 	struct virtio_net_hdr_mrg_rxbuf tx_hdr;
-	struct vring_desc tx_indir[VIRTIO_MAX_TX_INDIRECT]
-			   __attribute__((__aligned__(16)));
+	union {
+		struct vring_desc tx_indir[VIRTIO_MAX_TX_INDIRECT]
+			__attribute__((__aligned__(16)));
+		struct vring_desc_packed tx_indir_pq[VIRTIO_MAX_TX_INDIRECT]
+			__attribute__((__aligned__(16)));
+	};
 };
 
 static inline uint16_t
@@ -360,6 +364,15 @@ virtqueue_kick_prepare(struct virtqueue *vq)
 	return !(vq->vq_ring.used->flags & VRING_USED_F_NO_NOTIFY);
 }
 
+static inline int
+virtqueue_kick_prepare_packed(struct virtqueue *vq)
+{
+	uint16_t flags;
+
+	flags = vq->vq_ring.device_event->desc_event_flags & RING_EVENT_FLAGS_DESC;
+	return (flags != RING_EVENT_FLAGS_DISABLE);
+}
+
 static inline void
 virtqueue_notify(struct virtqueue *vq)
 {
-- 
2.17.1
    
    
More information about the dev
mailing list