[dpdk-dev] [PATCH v2 6/8] net/mlx4: mitigate Tx send entry size calculations

Matan Azrad matan at mellanox.com
Wed Dec 6 15:48:11 CET 2017


The previous code took the send queue entry size used for stamping from
the send queue entry pointed to by the completion queue entry; these two
reads were done per packet in the completion stage.

The number of packets per completion burst is managed by a fixed value
stored in the Tx queue, so each valid completion entry is known to free
exactly that fixed number of packets.
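
For example (a simplified excerpt of the patched mlx4_txq_complete()
below, using its variable names):

    uint32_t completed;

    /* Each CQE acknowledges a fixed-size burst of packets. */
    completed = (cons_index - cq->cons_index) * txq->elts_comp_cd_init;
    /* E.g. three new CQEs with elts_comp_cd_init == 8 free 24 packets. */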

The descriptor ring holds the send queue entries, so the combined size
of all entries in a completion burst can be derived by a single
calculation, avoiding per-packet computations.
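
A minimal sketch of that calculation, following the new
mlx4_txq_stamp_freed_wqe() below, where start and end are the burst
boundary TXBB pointers:

    int32_t size = (intptr_t)end - (intptr_t)start;

    if (size < 0)
        /* The burst wraps past the end of the SQ ring. */
        size += (int32_t)sq->size;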

Adjust the completion functions to free the packets of a whole
completion burst at once and avoid per-packet work queue entry reads
and calculations.

Save only the send queue entry pointers for the start of a completion
burst or of a Tx burst in the appropriate descriptor element.
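
Concretely, two pointer stores remain in mlx4_tx_burst() (excerpted
from the diff below):

    /* On each completion request, record the burst end address... */
    elt_next->eocb = (volatile uint32_t *)ctrl_next;
    /* ...and once per Tx burst, the next WQE address. */
    elt->wqe = ctrl;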

Signed-off-by: Matan Azrad <matan at mellanox.com>
Acked-by: Adrien Mazarguil <adrien.mazarguil at 6wind.com>
---
 drivers/net/mlx4/mlx4_rxtx.c | 105 +++++++++++++++++++------------------------
 drivers/net/mlx4/mlx4_rxtx.h |   5 ++-
 2 files changed, 50 insertions(+), 60 deletions(-)

diff --git a/drivers/net/mlx4/mlx4_rxtx.c b/drivers/net/mlx4/mlx4_rxtx.c
index 2467d1d..8b8d95e 100644
--- a/drivers/net/mlx4/mlx4_rxtx.c
+++ b/drivers/net/mlx4/mlx4_rxtx.c
@@ -258,55 +258,48 @@ struct pv {
 };
 
 /**
- * Stamp a WQE so it won't be reused by the HW.
+ * Stamp a TXBB burst so it won't be reused by the HW.
  *
 * Routine is used when freeing WQEs used by the chip or when a failure
 * while building a WQ entry has left partial information on the queue.
  *
  * @param sq
  *   Pointer to the SQ structure.
- * @param wqe
- *   Pointer of WQE address to stamp.
+ * @param start
+ *   Pointer to the first TXBB to stamp.
+ * @param end
+ *   Pointer to the TXBB following the last one to stamp (exclusive end).
  *
  * @return
- *   WQE size and updates WQE address to the next WQE.
+ *   Size of the stamped burst in bytes.
  */
 static uint32_t
-mlx4_txq_stamp_freed_wqe(struct mlx4_sq *sq, volatile uint32_t **wqe)
+mlx4_txq_stamp_freed_wqe(struct mlx4_sq *sq, volatile uint32_t *start,
+			 volatile uint32_t *end)
 {
 	uint32_t stamp = sq->stamp;
-	volatile uint32_t *next_txbb = *wqe;
-	/* Extract the size from the control segment of the WQE. */
-	uint32_t size = RTE_ALIGN((uint32_t)
-				  ((((volatile struct mlx4_wqe_ctrl_seg *)
-				     next_txbb)->fence_size & 0x3f) << 4),
-				  MLX4_TXBB_SIZE);
-	uint32_t size_cd = size;
+	int32_t size = (intptr_t)end - (intptr_t)start;
 
-	/* Optimize the common case when there is no wrap-around. */
-	if ((uintptr_t)next_txbb + size < (uintptr_t)sq->eob) {
-		/* Stamp the freed descriptor. */
+	assert(start != end);
+	/* Handle SQ ring wrap-around. */
+	if (size < 0) {
+		size = (int32_t)sq->size + size;
 		do {
-			*next_txbb = stamp;
-			next_txbb += MLX4_SQ_STAMP_DWORDS;
-			size_cd -= MLX4_TXBB_SIZE;
-		} while (size_cd);
-	} else {
-		/* Stamp the freed descriptor. */
-		do {
-			*next_txbb = stamp;
-			next_txbb += MLX4_SQ_STAMP_DWORDS;
-			if ((volatile uint8_t *)next_txbb >= sq->eob) {
-				next_txbb = (volatile uint32_t *)sq->buf;
-				/* Flip invalid stamping ownership. */
-				stamp ^= RTE_BE32(0x1 << MLX4_SQ_OWNER_BIT);
-				sq->stamp = stamp;
-			}
-			size_cd -= MLX4_TXBB_SIZE;
-		} while (size_cd);
+			*start = stamp;
+			start += MLX4_SQ_STAMP_DWORDS;
+		} while (start != (volatile uint32_t *)sq->eob);
+		start = (volatile uint32_t *)sq->buf;
+		/* Flip invalid stamping ownership. */
+		stamp ^= RTE_BE32(0x1 << MLX4_SQ_OWNER_BIT);
+		sq->stamp = stamp;
+		if (start == end)
+			return size;
 	}
-	*wqe = next_txbb;
-	return size;
+	do {
+		*start = stamp;
+		start += MLX4_SQ_STAMP_DWORDS;
+	} while (start != end);
+	return (uint32_t)size;
 }
 
 /**
@@ -327,14 +320,10 @@ struct pv {
 	unsigned int elts_tail = txq->elts_tail;
 	struct mlx4_cq *cq = &txq->mcq;
 	volatile struct mlx4_cqe *cqe;
+	uint32_t completed;
 	uint32_t cons_index = cq->cons_index;
-	volatile uint32_t *first_wqe;
-	volatile uint32_t *next_wqe = (volatile uint32_t *)
-			((&(*txq->elts)[elts_tail])->wqe);
-	volatile uint32_t *last_wqe;
-	uint16_t mask = (((uintptr_t)sq->eob - (uintptr_t)sq->buf) >>
-			 MLX4_TXBB_SHIFT) - 1;
-	uint32_t pkts = 0;
+	volatile uint32_t *first_txbb;
+
 	/*
 	 * Traverse over all CQ entries reported and handle each WQ entry
 	 * reported by them.
@@ -360,28 +349,23 @@ struct pv {
 			break;
 		}
 #endif /* NDEBUG */
-		/* Get WQE address buy index from the CQE. */
-		last_wqe = (volatile uint32_t *)((uintptr_t)sq->buf +
-			((rte_be_to_cpu_16(cqe->wqe_index) & mask) <<
-			 MLX4_TXBB_SHIFT));
-		do {
-			/* Free next descriptor. */
-			first_wqe = next_wqe;
-			sq->remain_size +=
-				mlx4_txq_stamp_freed_wqe(sq, &next_wqe);
-			pkts++;
-		} while (first_wqe != last_wqe);
 		cons_index++;
 	} while (1);
-	if (unlikely(pkts == 0))
+	completed = (cons_index - cq->cons_index) * txq->elts_comp_cd_init;
+	if (unlikely(!completed))
 		return;
+	/* The first address to stamp is the end address of the previous burst. */
+	first_txbb = (&(*txq->elts)[elts_tail])->eocb;
+	elts_tail += completed;
+	if (elts_tail >= elts_n)
+		elts_tail -= elts_n;
+	/* The new tail element holds the end address. */
+	sq->remain_size += mlx4_txq_stamp_freed_wqe(sq, first_txbb,
+		(&(*txq->elts)[elts_tail])->eocb);
 	/* Update CQ consumer index. */
 	cq->cons_index = cons_index;
 	*cq->set_ci_db = rte_cpu_to_be_32(cons_index & MLX4_CQ_DB_CI_MASK);
-	txq->elts_comp -= pkts;
-	elts_tail += pkts;
-	if (elts_tail >= elts_n)
-		elts_tail -= elts_n;
+	txq->elts_comp -= completed;
 	txq->elts_tail = elts_tail;
 }
 
@@ -616,7 +600,7 @@ struct pv {
 	if (max > pkts_n)
 		max = pkts_n;
 	elt = &(*txq->elts)[elts_head];
-	/* Each element saves its appropriate work queue. */
+	/* First Tx burst element saves the next WQE control segment. */
 	ctrl = elt->wqe;
 	for (i = 0; (i != max); ++i) {
 		struct rte_mbuf *buf = pkts[i];
@@ -691,6 +675,8 @@ struct pv {
 		 * that no ICRC should be calculated.
 		 */
 		if (--txq->elts_comp_cd == 0) {
+			/* Save the completion burst end address. */
+			elt_next->eocb = (volatile uint32_t *)ctrl_next;
 			txq->elts_comp_cd = txq->elts_comp_cd_init;
 			srcrb.flags = RTE_BE32(MLX4_WQE_CTRL_SOLICIT |
 					       MLX4_WQE_CTRL_CQ_UPDATE);
@@ -740,13 +726,14 @@ struct pv {
 		elt->buf = buf;
 		bytes_sent += buf->pkt_len;
 		elts_head = elts_head_next;
-		elt_next->wqe = ctrl_next;
 		ctrl = ctrl_next;
 		elt = elt_next;
 	}
 	/* Take a shortcut if nothing must be sent. */
 	if (unlikely(i == 0))
 		return 0;
+	/* Save WQE address of the next Tx burst element. */
+	elt->wqe = ctrl;
 	/* Increment send statistics counters. */
 	txq->stats.opackets += i;
 	txq->stats.obytes += bytes_sent;
diff --git a/drivers/net/mlx4/mlx4_rxtx.h b/drivers/net/mlx4/mlx4_rxtx.h
index d56e48d..36ae03a 100644
--- a/drivers/net/mlx4/mlx4_rxtx.h
+++ b/drivers/net/mlx4/mlx4_rxtx.h
@@ -105,7 +105,10 @@ struct mlx4_rss {
 /** Tx element. */
 struct txq_elt {
 	struct rte_mbuf *buf; /**< Buffer. */
-	volatile struct mlx4_wqe_ctrl_seg *wqe; /**< SQ WQE. */
+	union {
+		volatile struct mlx4_wqe_ctrl_seg *wqe; /**< SQ WQE. */
+		volatile uint32_t *eocb; /**< End of completion burst. */
+	};
 };
 
 /** Rx queue counters. */
-- 
1.8.3.1


