[dpdk-dev] Making space in mbuf data-structure

Richardson, Bruce bruce.richardson at intel.com
Fri Jul 4 01:38:58 CEST 2014


Hi all,

At this stage it's been well recognised that we need to make more space in the mbuf data structure for new fields. We in Intel have had some discussion on this, and this email is to outline our current thinking and approach on this issue, and to look for additional suggestions and feedback.

Firstly, we believe that there is no possible way that we can ever fit all the fields we need into a 64-byte mbuf, and so we need to start looking at a 128-byte mbuf instead. Examples of new fields that need to fit in include: 32-64 bits for additional offload flags, 32 bits more of filter information to support the new filters in the i40e driver, an additional 2-4 bytes for storing info on a second vlan tag, and 4 bytes for storing a sequence number to enable packet reordering in future, as well as potentially a number of other fields, or splitting out fields that are superimposed over each other right now, e.g. for the qos scheduler. We also want to allow space for use by other non-Intel NIC drivers that may be open-sourced to dpdk.org in the future too, where they support fields and offloads that our hardware doesn't.

If we accept the fact of a 2-cache-line mbuf, then the issue becomes how to rework things so that we spread our fields over the two cache lines while causing the lowest slow-down possible. The general approach that we are looking to take is to focus the first cache line on fields that are updated on RX, so that receive only deals with one cache line. The second cache line can be used for application data and information that will only be used on the TX leg. This would allow us to work on the first cache line in RX as now, and have the second cache line being prefetched in the background so that it is available when necessary. Hardware prefetchers should help us out here. We also may move rarely used or slow-path RX fields, such as those for chained mbufs with jumbo frames, to the second cache line, depending upon the performance impact and byte savings achieved.

With this approach, we still need to make space in the first cache line for information for the new or expanded receive offloads. First off the blocks is to look at moving the mempool pointer into the second cache line. This will free up 8 bytes in cache line 0 by relocating a field that is only used when cleaning up after TX. A prototype patch for this change is given below, for your review and comment. Initial quick tests with testpmd (testpmd -c 600 -n 4 -- --mbcache=250 --txqflags=0xf01 --burst=32 --txrst=32 --txfreet=32 --rxfreet=32 --total-num-mbufs=65536), and l3fwd (l3fwd -c 400 -n 4 -- -p F -P --config="(0,0,10),(1,0,10),(2,0,10),(3,0,10)") showed only a slight performance decrease with testpmd and equal or slightly better performance with l3fwd. These would both have been using the vector PMD - I haven't looked to change the fallback TX implementation yet for this change, since it's not as performance-optimized and hence not as cycle-count sensitive.

Beyond this change, I'm also investigating potentially moving the "next" pointer to the second cache line, but it's looking harder to move without serious impact, so we'll have to see there what can be done. Other fields I'll examine in due course too, and send on any findings I may have. 

Once we have freed up space, then we can start looking at what fields get to use that space and what way we shuffle the existing fields about, but that's a discussion for another day!

Please see patch below for moving pool pointer to second cache line of mbuf. All feedback welcome, naturally.

Regards,
/Bruce

diff --git a/app/test/test_mbuf.c b/app/test/test_mbuf.c
index 2b87521..e0cf30c 100644
--- a/app/test/test_mbuf.c
+++ b/app/test/test_mbuf.c
@@ -832,7 +832,7 @@ test_failing_mbuf_sanity_check(void)
 int
 test_mbuf(void)
 {
-	RTE_BUILD_BUG_ON(sizeof(struct rte_mbuf) != 64);
+	RTE_BUILD_BUG_ON(sizeof(struct rte_mbuf) != CACHE_LINE_SIZE*2);
 
 	/* create pktmbuf pool if it does not exist */
 	if (pktmbuf_pool == NULL) {
diff --git a/lib/librte_mbuf/rte_mbuf.h b/lib/librte_mbuf/rte_mbuf.h
index 2735f37..aa3ec32 100644
--- a/lib/librte_mbuf/rte_mbuf.h
+++ b/lib/librte_mbuf/rte_mbuf.h
@@ -181,7 +181,8 @@ enum rte_mbuf_type {
  * The generic rte_mbuf, containing a packet mbuf or a control mbuf.
  */
 struct rte_mbuf {
-	struct rte_mempool *pool; /**< Pool from which mbuf was allocated. */
+	uint64_t this_space_for_rent; // 8-bytes free for reuse.
+
 	void *buf_addr;           /**< Virtual address of segment buffer. */
 	phys_addr_t buf_physaddr; /**< Physical address of segment buffer. */
 	uint16_t buf_len;         /**< Length of segment buffer. */
@@ -210,12 +211,16 @@ struct rte_mbuf {
 		struct rte_pktmbuf pkt;
 	};
 
+// second cache line starts here
+	struct rte_mempool *pool __rte_cache_aligned;
+			/**< Pool from which mbuf was allocated. */
+
 	union {
 		uint8_t metadata[0];
 		uint16_t metadata16[0];
 		uint32_t metadata32[0];
 		uint64_t metadata64[0];
-	};
+	} __rte_cache_aligned;
 } __rte_cache_aligned;
 
 #define RTE_MBUF_METADATA_UINT8(mbuf, offset)              \
diff --git a/lib/librte_pmd_ixgbe/ixgbe_rxtx.h b/lib/librte_pmd_ixgbe/ixgbe_rxtx.h
index 64c0695..7374e0d 100644
--- a/lib/librte_pmd_ixgbe/ixgbe_rxtx.h
+++ b/lib/librte_pmd_ixgbe/ixgbe_rxtx.h
@@ -171,10 +171,6 @@ struct igb_tx_queue {
 	volatile union ixgbe_adv_tx_desc *tx_ring;
 	uint64_t            tx_ring_phys_addr; /**< TX ring DMA address. */
 	struct igb_tx_entry *sw_ring;      /**< virtual address of SW ring. */
-#ifdef RTE_IXGBE_INC_VECTOR
-	/** continuous tx entry sequence within the same mempool */
-	struct igb_tx_entry_seq *sw_ring_seq;
-#endif
 	volatile uint32_t   *tdt_reg_addr; /**< Address of TDT register. */
 	uint16_t            nb_tx_desc;    /**< number of TX descriptors. */
 	uint16_t            tx_tail;       /**< current value of TDT reg. */
diff --git a/lib/librte_pmd_ixgbe/ixgbe_rxtx_vec.c b/lib/librte_pmd_ixgbe/ixgbe_rxtx_vec.c
index 09e19a3..e9fd739 100644
--- a/lib/librte_pmd_ixgbe/ixgbe_rxtx_vec.c
+++ b/lib/librte_pmd_ixgbe/ixgbe_rxtx_vec.c
@@ -401,9 +401,8 @@ static inline int __attribute__((always_inline))
 ixgbe_tx_free_bufs(struct igb_tx_queue *txq)
 {
 	struct igb_tx_entry_v *txep;
-	struct igb_tx_entry_seq *txsp;
 	uint32_t status;
-	uint32_t n, k;
+	uint32_t n;
 #ifdef RTE_MBUF_SCATTER_GATHER
 	uint32_t i;
 	int nb_free = 0;
@@ -423,25 +422,36 @@ ixgbe_tx_free_bufs(struct igb_tx_queue *txq)
 	 */
 	txep = &((struct igb_tx_entry_v *)txq->sw_ring)[txq->tx_next_dd -
 			(n - 1)];
-	txsp = &txq->sw_ring_seq[txq->tx_next_dd - (n - 1)];
 
-	while (n > 0) {
-		k = RTE_MIN(n, txsp[n-1].same_pool);
 #ifdef RTE_MBUF_SCATTER_GATHER
-		for (i = 0; i < k; i++) {
-			m = __rte_pktmbuf_prefree_seg((txep+n-k+i)->mbuf);
+	m = __rte_pktmbuf_prefree_seg(txep[0].mbuf);
+	if (likely(m != NULL)) {
+		free[0] = m;
+		nb_free = 1;
+		for (i = 1; i < n; i++) {
+			m = __rte_pktmbuf_prefree_seg(txep[i].mbuf);
+			if (likely(m != NULL)) {
+				if (likely(m->pool == free[0]->pool))
+					free[nb_free++] = m;
+				else
+					rte_mempool_put(m->pool, m);
+			}
+		}
+		rte_mempool_put_bulk(free[0]->pool, (void **)free, nb_free);
+	}
+	else {
+		for (i = 1; i < n; i++) {
+			m = __rte_pktmbuf_prefree_seg(txep[i].mbuf);
 			if (m != NULL)
-				free[nb_free++] = m;
+				rte_mempool_put(m->pool, m);
 		}
-		rte_mempool_put_bulk((void *)txsp[n-1].pool,
-				(void **)free, nb_free);
-#else
-		rte_mempool_put_bulk((void *)txsp[n-1].pool,
-				(void **)(txep+n-k), k);
-#endif
-		n -= k;
 	}
 
+#else /* no scatter_gather */
+	for (i = 0; i < n; i++)
+		rte_mempool_put(m->pool, txep[i].mbuf);
+#endif /* scatter_gather */
+
 	/* buffers were freed, update counters */
 	txq->nb_tx_free = (uint16_t)(txq->nb_tx_free + txq->tx_rs_thresh);
 	txq->tx_next_dd = (uint16_t)(txq->tx_next_dd + txq->tx_rs_thresh);
@@ -453,19 +463,11 @@ ixgbe_tx_free_bufs(struct igb_tx_queue *txq)
 
 static inline void __attribute__((always_inline))
 tx_backlog_entry(struct igb_tx_entry_v *txep,
-		 struct igb_tx_entry_seq *txsp,
 		 struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
 {
 	int i;
-	for (i = 0; i < (int)nb_pkts; ++i) {
+	for (i = 0; i < (int)nb_pkts; ++i)
 		txep[i].mbuf = tx_pkts[i];
-		/* check and update sequence number */
-		txsp[i].pool = tx_pkts[i]->pool;
-		if (txsp[i-1].pool == tx_pkts[i]->pool)
-			txsp[i].same_pool = txsp[i-1].same_pool + 1;
-		else
-			txsp[i].same_pool = 1;
-	}
 }
 
 uint16_t
@@ -475,7 +477,6 @@ ixgbe_xmit_pkts_vec(void *tx_queue, struct rte_mbuf **tx_pkts,
 	struct igb_tx_queue *txq = (struct igb_tx_queue *)tx_queue;
 	volatile union ixgbe_adv_tx_desc *txdp;
 	struct igb_tx_entry_v *txep;
-	struct igb_tx_entry_seq *txsp;
 	uint16_t n, nb_commit, tx_id;
 	__m128i flags = _mm_set_epi32(DCMD_DTYP_FLAGS, 0, 0, 0);
 	__m128i rs = _mm_set_epi32(IXGBE_ADVTXD_DCMD_RS|DCMD_DTYP_FLAGS,
@@ -495,14 +496,13 @@ ixgbe_xmit_pkts_vec(void *tx_queue, struct rte_mbuf **tx_pkts,
 	tx_id = txq->tx_tail;
 	txdp = &txq->tx_ring[tx_id];
 	txep = &((struct igb_tx_entry_v *)txq->sw_ring)[tx_id];
-	txsp = &txq->sw_ring_seq[tx_id];
 
 	txq->nb_tx_free = (uint16_t)(txq->nb_tx_free - nb_pkts);
 
 	n = (uint16_t)(txq->nb_tx_desc - tx_id);
 	if (nb_commit >= n) {
 
-		tx_backlog_entry(txep, txsp, tx_pkts, n);
+		tx_backlog_entry(txep, tx_pkts, n);
 
 		for (i = 0; i < n - 1; ++i, ++tx_pkts, ++txdp)
 			vtx1(txdp, *tx_pkts, flags);
@@ -517,10 +517,9 @@ ixgbe_xmit_pkts_vec(void *tx_queue, struct rte_mbuf **tx_pkts,
 		/* avoid reach the end of ring */
 		txdp = &(txq->tx_ring[tx_id]);
 		txep = &(((struct igb_tx_entry_v *)txq->sw_ring)[tx_id]);
-		txsp = &(txq->sw_ring_seq[tx_id]);
 	}
 
-	tx_backlog_entry(txep, txsp, tx_pkts, nb_commit);
+	tx_backlog_entry(txep, tx_pkts, nb_commit);
 
 	vtx(txdp, tx_pkts, nb_commit, flags);
 
@@ -544,7 +543,6 @@ ixgbe_tx_queue_release_mbufs(struct igb_tx_queue *txq)
 {
 	unsigned i;
 	struct igb_tx_entry_v *txe;
-	struct igb_tx_entry_seq *txs;
 	uint16_t nb_free, max_desc;
 
 	if (txq->sw_ring != NULL) {
@@ -562,10 +560,6 @@ ixgbe_tx_queue_release_mbufs(struct igb_tx_queue *txq)
 		for (i = 0; i < txq->nb_tx_desc; i++) {
 			txe = (struct igb_tx_entry_v *)&txq->sw_ring[i];
 			txe->mbuf = NULL;
-
-			txs = &txq->sw_ring_seq[i];
-			txs->pool = NULL;
-			txs->same_pool = 0;
 		}
 	}
 }
@@ -580,11 +574,6 @@ ixgbe_tx_free_swring(struct igb_tx_queue *txq)
 		rte_free((struct igb_rx_entry *)txq->sw_ring - 1);
 		txq->sw_ring = NULL;
 	}
-
-	if (txq->sw_ring_seq != NULL) {
-		rte_free(txq->sw_ring_seq - 1);
-		txq->sw_ring_seq = NULL;
-	}
 }
 
 static void
@@ -593,7 +582,6 @@ ixgbe_reset_tx_queue(struct igb_tx_queue *txq)
 	static const union ixgbe_adv_tx_desc zeroed_desc = { .read = {
 			.buffer_addr = 0} };
 	struct igb_tx_entry_v *txe = (struct igb_tx_entry_v *)txq->sw_ring;
-	struct igb_tx_entry_seq *txs = txq->sw_ring_seq;
 	uint16_t i;
 
 	/* Zero out HW ring memory */
@@ -605,8 +593,6 @@ ixgbe_reset_tx_queue(struct igb_tx_queue *txq)
 		volatile union ixgbe_adv_tx_desc *txd = &txq->tx_ring[i];
 		txd->wb.status = IXGBE_TXD_STAT_DD;
 		txe[i].mbuf = NULL;
-		txs[i].pool = NULL;
-		txs[i].same_pool = 0;
 	}
 
 	txq->tx_next_dd = (uint16_t)(txq->tx_rs_thresh - 1);
@@ -632,27 +618,14 @@ static struct ixgbe_txq_ops vec_txq_ops = {
 };
 
 int ixgbe_txq_vec_setup(struct igb_tx_queue *txq,
-			unsigned int socket_id)
+			__rte_unused unsigned int socket_id)
 {
-	uint16_t nb_desc;
-
 	if (txq->sw_ring == NULL)
 		return -1;
 
-	/* request addtional one entry for continous sequence check */
-	nb_desc = (uint16_t)(txq->nb_tx_desc + 1);
-
-	txq->sw_ring_seq = rte_zmalloc_socket("txq->sw_ring_seq",
-				sizeof(struct igb_tx_entry_seq) * nb_desc,
-				CACHE_LINE_SIZE, socket_id);
-	if (txq->sw_ring_seq == NULL)
-		return -1;
-
-
 	/* leave the first one for overflow */
 	txq->sw_ring = (struct igb_tx_entry *)
 		((struct igb_tx_entry_v *)txq->sw_ring + 1);
-	txq->sw_ring_seq += 1;
 	txq->ops = &vec_txq_ops;
 
 	return 0;


More information about the dev mailing list