[dpdk-dev] [RFC PATCH 14/14] ixgbe: Allow vector RX of scattered packets

Bruce Richardson bruce.richardson at intel.com
Mon Aug 11 22:44:50 CEST 2014


Provide a wrapper routine to enable receive of scattered packets with a
vector driver.

Signed-off-by: Bruce Richardson <bruce.richardson at intel.com>
---
 lib/librte_pmd_ixgbe/ixgbe_rxtx.c     |   8 +-
 lib/librte_pmd_ixgbe/ixgbe_rxtx.h     |   1 +
 lib/librte_pmd_ixgbe/ixgbe_rxtx_vec.c | 164 ++++++++++++++++++++++++++++++++--
 3 files changed, 160 insertions(+), 13 deletions(-)

diff --git a/lib/librte_pmd_ixgbe/ixgbe_rxtx.c b/lib/librte_pmd_ixgbe/ixgbe_rxtx.c
index cca44b8..95c736d 100644
--- a/lib/librte_pmd_ixgbe/ixgbe_rxtx.c
+++ b/lib/librte_pmd_ixgbe/ixgbe_rxtx.c
@@ -3504,12 +3504,12 @@ ixgbe_dev_rx_init(struct rte_eth_dev *dev)
 		if ((dev->data->dev_conf.rxmode.max_rx_pkt_len +
 				2 * IXGBE_VLAN_TAG_SIZE) > buf_size){
 			dev->data->scattered_rx = 1;
-			dev->rx_pkt_burst = ixgbe_recv_scattered_pkts;
+			dev->rx_pkt_burst = ixgbe_recv_scattered_pkts_vec;
 		}
 	}
 
 	if (dev->data->dev_conf.rxmode.enable_scatter) {
-		dev->rx_pkt_burst = ixgbe_recv_scattered_pkts;
+		dev->rx_pkt_burst = ixgbe_recv_scattered_pkts_vec;
 		dev->data->scattered_rx = 1;
 	}
 
@@ -3997,12 +3997,12 @@ ixgbevf_dev_rx_init(struct rte_eth_dev *dev)
 		if ((dev->data->dev_conf.rxmode.max_rx_pkt_len +
 				2 * IXGBE_VLAN_TAG_SIZE) > buf_size) {
 			dev->data->scattered_rx = 1;
-			dev->rx_pkt_burst = ixgbe_recv_scattered_pkts;
+			dev->rx_pkt_burst = ixgbe_recv_scattered_pkts_vec;
 		}
 	}
 
 	if (dev->data->dev_conf.rxmode.enable_scatter) {
-		dev->rx_pkt_burst = ixgbe_recv_scattered_pkts;
+		dev->rx_pkt_burst = ixgbe_recv_scattered_pkts_vec;
 		dev->data->scattered_rx = 1;
 	}
 
diff --git a/lib/librte_pmd_ixgbe/ixgbe_rxtx.h b/lib/librte_pmd_ixgbe/ixgbe_rxtx.h
index 76db3ca..718133c 100644
--- a/lib/librte_pmd_ixgbe/ixgbe_rxtx.h
+++ b/lib/librte_pmd_ixgbe/ixgbe_rxtx.h
@@ -225,6 +225,7 @@ struct ixgbe_txq_ops {
 
 #ifdef RTE_IXGBE_INC_VECTOR
 uint16_t ixgbe_recv_pkts_vec(void *rx_queue, struct rte_mbuf **rx_pkts, uint16_t nb_pkts);
+uint16_t ixgbe_recv_scattered_pkts_vec(void *rx_queue, struct rte_mbuf **rx_pkts, uint16_t nb_pkts);
 uint16_t ixgbe_xmit_pkts_vec(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts);
 int ixgbe_txq_vec_setup(struct igb_tx_queue *txq);
 int ixgbe_rx_vec_condition_check(struct rte_eth_dev *dev);
diff --git a/lib/librte_pmd_ixgbe/ixgbe_rxtx_vec.c b/lib/librte_pmd_ixgbe/ixgbe_rxtx_vec.c
index ba9d050..badcdb6 100644
--- a/lib/librte_pmd_ixgbe/ixgbe_rxtx_vec.c
+++ b/lib/librte_pmd_ixgbe/ixgbe_rxtx_vec.c
@@ -38,7 +38,7 @@
 #include "ixgbe_ethdev.h"
 #include "ixgbe_rxtx.h"
 
-#include <nmmintrin.h>
+#include <tmmintrin.h>
 
 #ifndef __INTEL_COMPILER
 #pragma GCC diagnostic ignored "-Wcast-qual"
@@ -165,12 +165,11 @@ desc_to_olflags_v(__m128i descs[4], struct rte_mbuf **rx_pkts)
  *   numbers of DD bit
  * - don't support ol_flags for rss and csum err
  */
-uint16_t
-ixgbe_recv_pkts_vec(void *rx_queue, struct rte_mbuf **rx_pkts,
-		uint16_t nb_pkts)
+static inline uint16_t
+_recv_raw_pkts_vec(struct igb_rx_queue *rxq, struct rte_mbuf **rx_pkts,
+		uint16_t nb_pkts, uint8_t *split_packet)
 {
 	volatile union ixgbe_adv_rx_desc *rxdp;
-	struct igb_rx_queue *rxq = rx_queue;
 	struct igb_rx_entry *sw_ring;
 	uint16_t nb_pkts_recd;
 	int pos;
@@ -183,7 +182,7 @@ ixgbe_recv_pkts_vec(void *rx_queue, struct rte_mbuf **rx_pkts,
 				-rxq->crc_len, /* sub crc on data_len */
 				0            /* ignore pkt_type field */
 			);
-	__m128i dd_check;
+	__m128i dd_check, eop_check;
 
 	if (unlikely(nb_pkts < RTE_IXGBE_VPMD_RX_BURST))
 		return 0;
@@ -208,6 +207,9 @@ ixgbe_recv_pkts_vec(void *rx_queue, struct rte_mbuf **rx_pkts,
 	/* 4 packets DD mask */
 	dd_check = _mm_set_epi64x(0x0000000100000001LL, 0x0000000100000001LL);
 
+	/* 4 packets EOP mask */
+	eop_check = _mm_set_epi64x(0x0000000200000002LL, 0x0000000200000002LL);
+
 	/* mask to shuffle from desc. to mbuf */
 	shuf_msk = _mm_set_epi8(
 		7, 6, 5, 4,  /* octet 4~7, 32bits rss */
@@ -219,7 +221,6 @@ ixgbe_recv_pkts_vec(void *rx_queue, struct rte_mbuf **rx_pkts,
 		0xFF, 0xFF   /* skip pkt_type field */
 		);
 
-
 	/* Cache is empty -> need to scan the buffer rings, but first move
 	 * the next 'n' mbufs into the cache */
 	sw_ring = &rxq->sw_ring[rxq->rx_tail];
@@ -228,6 +229,7 @@ ixgbe_recv_pkts_vec(void *rx_queue, struct rte_mbuf **rx_pkts,
 	 * A. load 4 packet in one loop
 	 * B. copy 4 mbuf point from swring to rx_pkts
 	 * C. calc the number of DD bits among the 4 packets
+	 * [C*. extract the end-of-packet bit, if requested]
 	 * D. fill info. from desc to mbuf
 	 */
 	for (pos = 0, nb_pkts_recd = 0; pos < RTE_IXGBE_VPMD_RX_BURST;
@@ -296,7 +298,30 @@ ixgbe_recv_pkts_vec(void *rx_queue, struct rte_mbuf **rx_pkts,
 		pkt_mb2 = _mm_add_epi16(pkt_mb2, crc_adjust);
 		pkt_mb1 = _mm_add_epi16(pkt_mb1, crc_adjust);
 
-		/* C.3 calc avaialbe number of desc */
+		/* C* extract and record EOP bit */
+		if (split_packet != NULL){
+			__m128i eop_shuf_mask = _mm_set_epi8(
+					0xFF, 0xFF, 0xFF, 0xFF,
+					0xFF, 0xFF, 0xFF, 0xFF,
+					0xFF, 0xFF, 0xFF, 0xFF,
+					0x04, 0x0C, 0x00, 0x08
+					);
+
+			/* and with mask to extract bits, flipping 1-0 */
+			__m128i eop_bits = _mm_andnot_si128(staterr, eop_check);
+			/* convert from 0/2 value to a 0/1 value */
+			//eop_bits = _mm_srli_epi32(eop_bits, 1);
+			/* the staterr values are not in order, as the
+			 * count of dd bits doesn't care. However, for end of
+			 * packet tracking, we do care, so shuffle. This also
+			 * compresses the 32-bit values to 8-bit */
+			eop_bits = _mm_shuffle_epi8(eop_bits, eop_shuf_mask);
+			/* store the resulting 32-bit value */
+			*(int *)split_packet = _mm_cvtsi128_si32(eop_bits);
+			split_packet += RTE_IXGBE_DESCS_PER_LOOP;
+		}
+
+		/* C.3 calc available number of desc */
 		staterr = _mm_and_si128(staterr, dd_check);
 		staterr = _mm_packs_epi32(staterr, zero);
 
@@ -307,7 +332,7 @@ ixgbe_recv_pkts_vec(void *rx_queue, struct rte_mbuf **rx_pkts,
 				pkt_mb1);
 
 		/* C.4 calc avaialbe number of desc */
-		var = _mm_popcnt_u64(_mm_cvtsi128_si64(staterr));
+		var = __builtin_popcountll(_mm_cvtsi128_si64(staterr));
 		nb_pkts_recd += var;
 		if (likely(var != RTE_IXGBE_DESCS_PER_LOOP))
 			break;
@@ -320,6 +345,127 @@ ixgbe_recv_pkts_vec(void *rx_queue, struct rte_mbuf **rx_pkts,
 
 	return nb_pkts_recd;
 }
+
+/*
+ * vPMD receive routine for unscattered traffic (no EOP tracking).
+ *
+ * Notice:
+ * - nb_pkts below RTE_IXGBE_VPMD_RX_BURST: no packet is returned
+ * - nb_pkts above it: only RTE_IXGBE_VPMD_RX_BURST DD bits are scanned
+ * - don't support ol_flags for rss and csum err
+ */
+uint16_t
+ixgbe_recv_pkts_vec(void *rx_queue, struct rte_mbuf **rx_pkts,
+		uint16_t nb_pkts)
+{
+	struct igb_rx_queue *rxq = rx_queue;
+
+	return _recv_raw_pkts_vec(rxq, rx_pkts, nb_pkts, NULL);
+}
+
+/* Chain rx_bufs[] into whole packets (split_flags[i] != 0: more segments
+ * follow). Whole packets move to the front of rx_bufs[] and their count is
+ * returned; an unfinished chain is kept in rxq for the next call. */
+static inline uint16_t
+reassemble_packets(struct igb_rx_queue *rxq, struct rte_mbuf **rx_bufs,
+		uint16_t nb_bufs, uint8_t *split_flags)
+{
+	struct rte_mbuf *pkts[RTE_IXGBE_VPMD_RX_BURST]; /*finished pkts*/
+	struct rte_mbuf *start = rxq->pkt_first_seg;
+	struct rte_mbuf *end =  rxq->pkt_last_seg;
+	unsigned pkt_idx = 0, buf_idx;
+
+	for (buf_idx = 0; buf_idx < nb_bufs; buf_idx++) {
+		if (end != NULL) {
+			/* processing a split packet */
+			end->next = rx_bufs[buf_idx];
+			rx_bufs[buf_idx]->data_len += rxq->crc_len;
+			start->nb_segs++;
+			start->pkt_len += rx_bufs[buf_idx]->data_len;
+			end = end->next;
+			if (!split_flags[buf_idx]) {
+				/* it's the last packet of the set */
+				start->filters = end->filters;
+				start->ol_flags = end->ol_flags;
+				/* we need to strip crc for the whole packet */
+				start->pkt_len -= rxq->crc_len;
+				if (end->data_len > rxq->crc_len)
+					end->data_len -= rxq->crc_len;
+				else {
+					/* last mbuf is crc only: drop it, trim
+					 * leftover crc off the previous one */
+					struct rte_mbuf *secondlast = start;
+					start->nb_segs--;
+					while (secondlast->next != end)
+						secondlast = secondlast->next;
+					secondlast->data_len -= (rxq->crc_len -
+							end->data_len);
+					secondlast->next = NULL;
+					rte_pktmbuf_free(end);
+				}
+				pkts[pkt_idx++] = start;
+				start = end = NULL;
+			}
+		} else {
+			/* not processing a split packet */
+			if (!split_flags[buf_idx]) {
+				/* whole packet: save it and move on */
+				pkts[pkt_idx++] = rx_bufs[buf_idx];
+				continue;
+			}
+			end = start = rx_bufs[buf_idx];
+			rx_bufs[buf_idx]->data_len += rxq->crc_len;
+			rx_bufs[buf_idx]->pkt_len += rxq->crc_len;
+		}
+	}
+
+	/* save the partial packet for next time */
+	rxq->pkt_first_seg = start;
+	rxq->pkt_last_seg = end;
+	memcpy(rx_bufs, pkts, pkt_idx * (sizeof(*pkts)));
+	return pkt_idx;
+}
+
+/*
+ * vPMD receive routine that reassembles scattered packets
+ *
+ * Fetches raw buffers with _recv_raw_pkts_vec() and chains multi-buffer
+ * packets with reassemble_packets(). Returns the number of whole packets
+ * stored in rx_pkts; an incomplete packet is kept in the queue for later.
+ *
+ * Notice:
+ * - don't support ol_flags for rss and csum err
+ * - now only accept (nb_pkts == RTE_IXGBE_VPMD_RX_BURST)
+ */
+uint16_t
+ixgbe_recv_scattered_pkts_vec(void *rx_queue, struct rte_mbuf **rx_pkts,
+		uint16_t nb_pkts)
+{
+	struct igb_rx_queue *rxq = rx_queue;
+	uint8_t split_flags[RTE_IXGBE_VPMD_RX_BURST] = {0};
+	unsigned i = 0;
+
+	/* get some new buffers */
+	uint16_t nb_bufs = _recv_raw_pkts_vec(rxq, rx_pkts, nb_pkts,
+			split_flags);
+	if (nb_bufs == 0)
+		return 0;
+
+	/* With no chain pending, the leading unsplit buffers are already
+	 * whole packets; skip them. Test the index before reading the flag,
+	 * and scan every received flag rather than a fixed 16-byte window,
+	 * so a split late in the burst is never missed. */
+	if (rxq->pkt_first_seg == NULL) {
+		while (i < nb_bufs && !split_flags[i])
+			i++;
+		/* happy day case: no packets to be joined */
+		if (i == nb_bufs)
+			return nb_bufs;
+	}
+	return i + reassemble_packets(rxq, &rx_pkts[i], nb_bufs - i,
+		&split_flags[i]);
+}
+
 static inline void
 vtx1(volatile union ixgbe_adv_tx_desc *txdp,
 		struct rte_mbuf *pkt, uint64_t flags)
-- 
1.9.3



More information about the dev mailing list