[dpdk-dev] [PATCH v7 7/9] net/virtio: add vectorized packed ring Tx path

Marvin Liu yong.liu at intel.com
Wed Apr 22 08:16:22 CEST 2020


Optimize packed ring Tx path alike Rx path. Split Tx path into batch and
single Tx functions. Batch function is further optimized by vector
instructions.

Signed-off-by: Marvin Liu <yong.liu at intel.com>

diff --git a/drivers/net/virtio/virtio_ethdev.h b/drivers/net/virtio/virtio_ethdev.h
index 5c112cac7..b7d52d497 100644
--- a/drivers/net/virtio/virtio_ethdev.h
+++ b/drivers/net/virtio/virtio_ethdev.h
@@ -108,6 +108,9 @@ uint16_t virtio_recv_pkts_vec(void *rx_queue, struct rte_mbuf **rx_pkts,
 uint16_t virtio_recv_pkts_packed_vec(void *rx_queue, struct rte_mbuf **rx_pkts,
 		uint16_t nb_pkts);
 
+uint16_t virtio_xmit_pkts_packed_vec(void *tx_queue, struct rte_mbuf **tx_pkts,
+		uint16_t nb_pkts);
+
 int eth_virtio_dev_init(struct rte_eth_dev *eth_dev);
 
 void virtio_interrupt_handler(void *param);
diff --git a/drivers/net/virtio/virtio_rxtx.c b/drivers/net/virtio/virtio_rxtx.c
index cf18fe564..f82fe8d64 100644
--- a/drivers/net/virtio/virtio_rxtx.c
+++ b/drivers/net/virtio/virtio_rxtx.c
@@ -2175,3 +2175,11 @@ virtio_recv_pkts_packed_vec(void *rx_queue __rte_unused,
 {
 	return 0;
 }
+
+__rte_weak uint16_t
+virtio_xmit_pkts_packed_vec(void *tx_queue __rte_unused,
+			    struct rte_mbuf **tx_pkts __rte_unused,
+			    uint16_t nb_pkts __rte_unused)
+{
+	return 0;
+}
diff --git a/drivers/net/virtio/virtio_rxtx_packed_avx.c b/drivers/net/virtio/virtio_rxtx_packed_avx.c
index d02ba9ba6..60d03b6d8 100644
--- a/drivers/net/virtio/virtio_rxtx_packed_avx.c
+++ b/drivers/net/virtio/virtio_rxtx_packed_avx.c
@@ -23,6 +23,24 @@
 #define PACKED_FLAGS_MASK ((0ULL | VRING_PACKED_DESC_F_AVAIL_USED) << \
 	FLAGS_BITS_OFFSET)
 
+/* reference count offset in mbuf rearm data */
+#define REFCNT_BITS_OFFSET ((offsetof(struct rte_mbuf, refcnt) - \
+	offsetof(struct rte_mbuf, rearm_data)) * BYTE_SIZE)
+/* segment number offset in mbuf rearm data */
+#define SEG_NUM_BITS_OFFSET ((offsetof(struct rte_mbuf, nb_segs) - \
+	offsetof(struct rte_mbuf, rearm_data)) * BYTE_SIZE)
+
+/* default rearm data */
+#define DEFAULT_REARM_DATA (1ULL << SEG_NUM_BITS_OFFSET | \
+	1ULL << REFCNT_BITS_OFFSET)
+
+/* id bits offset in packed ring desc higher 64bits */
+#define ID_BITS_OFFSET ((offsetof(struct vring_packed_desc, id) - \
+	offsetof(struct vring_packed_desc, len)) * BYTE_SIZE)
+
+/* net hdr short size mask */
+#define NET_HDR_MASK 0x3F
+
 #define PACKED_BATCH_SIZE (RTE_CACHE_LINE_SIZE / \
 	sizeof(struct vring_packed_desc))
 #define PACKED_BATCH_MASK (PACKED_BATCH_SIZE - 1)
@@ -47,6 +65,47 @@
 	for (iter = val; iter < num; iter++)
 #endif
 
+static inline void
+virtio_xmit_cleanup_packed_vec(struct virtqueue *vq)
+{
+	struct vring_packed_desc *desc = vq->vq_packed.ring.desc;
+	struct vq_desc_extra *dxp;
+	uint16_t used_idx, id, curr_id, free_cnt = 0;
+	uint16_t size = vq->vq_nentries;
+	struct rte_mbuf *mbufs[size];
+	uint16_t nb_mbuf = 0, i;
+
+	used_idx = vq->vq_used_cons_idx;
+
+	if (!desc_is_used(&desc[used_idx], vq))
+		return;
+
+	id = desc[used_idx].id;
+
+	do {
+		curr_id = used_idx;
+		dxp = &vq->vq_descx[used_idx];
+		used_idx += dxp->ndescs;
+		free_cnt += dxp->ndescs;
+
+		if (dxp->cookie != NULL) {
+			mbufs[nb_mbuf] = dxp->cookie;
+			dxp->cookie = NULL;
+			nb_mbuf++;
+		}
+
+		if (used_idx >= size) {
+			used_idx -= size;
+			vq->vq_packed.used_wrap_counter ^= 1;
+		}
+	} while (curr_id != id);
+
+	for (i = 0; i < nb_mbuf; i++)
+		rte_pktmbuf_free(mbufs[i]);
+
+	vq->vq_used_cons_idx = used_idx;
+	vq->vq_free_cnt += free_cnt;
+}
 
 static inline void
 virtio_update_batch_stats(struct virtnet_stats *stats,
@@ -60,6 +119,236 @@ virtio_update_batch_stats(struct virtnet_stats *stats,
 	stats->bytes += pkt_len3;
 	stats->bytes += pkt_len4;
 }
+
+static inline int
+virtqueue_enqueue_batch_packed_vec(struct virtnet_tx *txvq,
+				   struct rte_mbuf **tx_pkts)
+{
+	struct virtqueue *vq = txvq->vq;
+	uint16_t head_size = vq->hw->vtnet_hdr_size;
+	uint16_t idx = vq->vq_avail_idx;
+	struct virtio_net_hdr *hdr;
+	uint16_t i, cmp;
+
+	if (vq->vq_avail_idx & PACKED_BATCH_MASK)
+		return -1;
+
+	if (unlikely((idx + PACKED_BATCH_SIZE) > vq->vq_nentries))
+		return -1;
+
+	/* Load four mbufs rearm data */
+	RTE_BUILD_BUG_ON(REFCNT_BITS_OFFSET >= 64);
+	RTE_BUILD_BUG_ON(SEG_NUM_BITS_OFFSET >= 64);
+	__m256i mbufs = _mm256_set_epi64x(*tx_pkts[3]->rearm_data,
+					  *tx_pkts[2]->rearm_data,
+					  *tx_pkts[1]->rearm_data,
+					  *tx_pkts[0]->rearm_data);
+
+	/* refcnt=1 and nb_segs=1 */
+	__m256i mbuf_ref = _mm256_set1_epi64x(DEFAULT_REARM_DATA);
+	__m256i head_rooms = _mm256_set1_epi16(head_size);
+
+	/* Check refcnt and nb_segs */
+	cmp = _mm256_mask_cmpneq_epu16_mask(0x6666, mbufs, mbuf_ref);
+	if (unlikely(cmp))
+		return -1;
+
+	/* Check headroom is enough */
+	RTE_BUILD_BUG_ON(offsetof(struct rte_mbuf, data_off) !=
+		offsetof(struct rte_mbuf, rearm_data));
+	cmp = _mm256_mask_cmplt_epu16_mask(0x1111, mbufs, head_rooms);
+	if (unlikely(cmp))
+		return -1;
+
+	__m512i v_descx = _mm512_set_epi64(0x1, (uint64_t)tx_pkts[3],
+					   0x1, (uint64_t)tx_pkts[2],
+					   0x1, (uint64_t)tx_pkts[1],
+					   0x1, (uint64_t)tx_pkts[0]);
+
+	_mm512_storeu_si512((void *)&vq->vq_descx[idx], v_descx);
+
+	virtio_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
+		tx_pkts[i]->data_off -= head_size;
+		tx_pkts[i]->data_len += head_size;
+	}
+
+#ifdef RTE_VIRTIO_USER
+	__m512i descs_base = _mm512_set_epi64(tx_pkts[3]->data_len,
+			(uint64_t)(*(uintptr_t *)((uintptr_t)tx_pkts[3])),
+			tx_pkts[2]->data_len,
+			(uint64_t)(*(uintptr_t *)((uintptr_t)tx_pkts[2])),
+			tx_pkts[1]->data_len,
+			(uint64_t)(*(uintptr_t *)((uintptr_t)tx_pkts[1])),
+			tx_pkts[0]->data_len,
+			(uint64_t)(*(uintptr_t *)((uintptr_t)tx_pkts[0])));
+#else
+	__m512i descs_base = _mm512_set_epi64(tx_pkts[3]->data_len,
+					      tx_pkts[3]->buf_iova,
+					      tx_pkts[2]->data_len,
+					      tx_pkts[2]->buf_iova,
+					      tx_pkts[1]->data_len,
+					      tx_pkts[1]->buf_iova,
+					      tx_pkts[0]->data_len,
+					      tx_pkts[0]->buf_iova);
+#endif
+
+	/* id offset and data offset */
+	__m512i data_offsets = _mm512_set_epi64((uint64_t)3 << ID_BITS_OFFSET,
+						tx_pkts[3]->data_off,
+						(uint64_t)2 << ID_BITS_OFFSET,
+						tx_pkts[2]->data_off,
+						(uint64_t)1 << ID_BITS_OFFSET,
+						tx_pkts[1]->data_off,
+						0, tx_pkts[0]->data_off);
+
+	__m512i new_descs = _mm512_add_epi64(descs_base, data_offsets);
+
+	uint64_t flags_temp = (uint64_t)idx << ID_BITS_OFFSET |
+		(uint64_t)vq->vq_packed.cached_flags << FLAGS_BITS_OFFSET;
+
+	/* flags offset and guest virtual address offset */
+#ifdef RTE_VIRTIO_USER
+	__m128i flag_offset = _mm_set_epi64x(flags_temp, (uint64_t)vq->offset);
+#else
+	__m128i flag_offset = _mm_set_epi64x(flags_temp, 0);
+#endif
+	__m512i v_offset = _mm512_broadcast_i32x4(flag_offset);
+
+	__m512i v_desc = _mm512_add_epi64(new_descs, v_offset);
+
+	if (!vq->hw->has_tx_offload) {
+		__m128i mask = _mm_set1_epi16(0xFFFF);
+		virtio_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
+			hdr = rte_pktmbuf_mtod_offset(tx_pkts[i],
+					struct virtio_net_hdr *, -head_size);
+			__m128i v_hdr = _mm_loadu_si128((void *)hdr);
+			if (unlikely(_mm_mask_test_epi16_mask(NET_HDR_MASK,
+							v_hdr, mask))) {
+				__m128i all_zero = _mm_setzero_si128();
+				_mm_mask_storeu_epi16((void *)hdr,
+						NET_HDR_MASK, all_zero);
+			}
+		}
+	} else {
+		virtio_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
+			hdr = rte_pktmbuf_mtod_offset(tx_pkts[i],
+					struct virtio_net_hdr *, -head_size);
+			virtqueue_xmit_offload(hdr, tx_pkts[i], true);
+		}
+	}
+
+	/* Enqueue Packet buffers */
+	_mm512_storeu_si512((void *)&vq->vq_packed.ring.desc[idx], v_desc);
+
+	virtio_update_batch_stats(&txvq->stats, tx_pkts[0]->pkt_len,
+			tx_pkts[1]->pkt_len, tx_pkts[2]->pkt_len,
+			tx_pkts[3]->pkt_len);
+
+	vq->vq_avail_idx += PACKED_BATCH_SIZE;
+	vq->vq_free_cnt -= PACKED_BATCH_SIZE;
+
+	if (vq->vq_avail_idx >= vq->vq_nentries) {
+		vq->vq_avail_idx -= vq->vq_nentries;
+		vq->vq_packed.cached_flags ^=
+			VRING_PACKED_DESC_F_AVAIL_USED;
+	}
+
+	return 0;
+}
+
+static inline int
+virtqueue_enqueue_single_packed_vec(struct virtnet_tx *txvq,
+				    struct rte_mbuf *txm)
+{
+	struct virtqueue *vq = txvq->vq;
+	struct virtio_hw *hw = vq->hw;
+	uint16_t hdr_size = hw->vtnet_hdr_size;
+	uint16_t slots, can_push;
+	int16_t need;
+
+	/* How many main ring entries are needed to this Tx?
+	 * any_layout => number of segments
+	 * default    => number of segments + 1
+	 */
+	can_push = rte_mbuf_refcnt_read(txm) == 1 &&
+		   RTE_MBUF_DIRECT(txm) &&
+		   txm->nb_segs == 1 &&
+		   rte_pktmbuf_headroom(txm) >= hdr_size;
+
+	slots = txm->nb_segs + !can_push;
+	need = slots - vq->vq_free_cnt;
+
+	/* Positive value indicates it need free vring descriptors */
+	if (unlikely(need > 0)) {
+		virtio_xmit_cleanup_packed_vec(vq);
+		need = slots - vq->vq_free_cnt;
+		if (unlikely(need > 0)) {
+			PMD_TX_LOG(ERR,
+				   "No free tx descriptors to transmit");
+			return -1;
+		}
+	}
+
+	/* Enqueue Packet buffers */
+	virtqueue_enqueue_xmit_packed(txvq, txm, slots, can_push, 1);
+
+	txvq->stats.bytes += txm->pkt_len;
+	return 0;
+}
+
+uint16_t
+virtio_xmit_pkts_packed_vec(void *tx_queue, struct rte_mbuf **tx_pkts,
+			uint16_t nb_pkts)
+{
+	struct virtnet_tx *txvq = tx_queue;
+	struct virtqueue *vq = txvq->vq;
+	struct virtio_hw *hw = vq->hw;
+	uint16_t nb_tx = 0;
+	uint16_t remained;
+
+	if (unlikely(hw->started == 0 && tx_pkts != hw->inject_pkts))
+		return nb_tx;
+
+	if (unlikely(nb_pkts < 1))
+		return nb_pkts;
+
+	PMD_TX_LOG(DEBUG, "%d packets to xmit", nb_pkts);
+
+	if (vq->vq_free_cnt <= vq->vq_nentries - vq->vq_free_thresh)
+		virtio_xmit_cleanup_packed_vec(vq);
+
+	remained = RTE_MIN(nb_pkts, vq->vq_free_cnt);
+
+	while (remained) {
+		if (remained >= PACKED_BATCH_SIZE) {
+			if (!virtqueue_enqueue_batch_packed_vec(txvq,
+						&tx_pkts[nb_tx])) {
+				nb_tx += PACKED_BATCH_SIZE;
+				remained -= PACKED_BATCH_SIZE;
+				continue;
+			}
+		}
+		if (!virtqueue_enqueue_single_packed_vec(txvq,
+					tx_pkts[nb_tx])) {
+			nb_tx++;
+			remained--;
+			continue;
+		}
+		break;
+	};
+
+	txvq->stats.packets += nb_tx;
+
+	if (likely(nb_tx)) {
+		if (unlikely(virtqueue_kick_prepare_packed(vq))) {
+			virtqueue_notify(vq);
+			PMD_TX_LOG(DEBUG, "Notified backend after xmit");
+		}
+	}
+
+	return nb_tx;
+}
+
 /* Optionally fill offload information in structure */
 static inline int
 virtio_vec_rx_offload(struct rte_mbuf *m, struct virtio_net_hdr *hdr)
-- 
2.17.1



More information about the dev mailing list