[dpdk-dev] [PATCH v3 3/6] net/mlx4: restore Tx gather support

Adrien Mazarguil adrien.mazarguil at 6wind.com
Wed Oct 4 20:48:55 CEST 2017


From: Moti Haimovsky <motih at mellanox.com>

This patch adds support for transmitting packets spanning multiple buffers.
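
As an illustration only, the following minimal standalone sketch models the
segment-scattering loop introduced by this patch: one WQE data segment is
filled per mbuf segment, wrapping back to the start of the send ring when the
end of the buffer is reached. The types (struct seg, struct data_seg) and the
helper fill_dsegs() are simplified stand-ins for the real rte_mbuf and
mlx4_wqe_data_seg structures, not driver code.

#include <stdint.h>
#include <stdio.h>

struct seg {			/* stand-in for one rte_mbuf segment */
	struct seg *next;
	uint32_t data_len;
};

struct data_seg {		/* stand-in for struct mlx4_wqe_data_seg */
	uint64_t addr;
	uint32_t byte_count;
};

/* Fill one data segment per packet buffer, wrapping back to the start of
 * the ring when the end-of-buffer pointer (eob) is reached. Returns the
 * number of data segments consumed. */
static int
fill_dsegs(struct data_seg *ring, struct data_seg *eob,
	   struct data_seg *dseg, const struct seg *pkt)
{
	const struct seg *buf;
	int n = 0;

	for (buf = pkt; buf != NULL; buf = buf->next, dseg++, n++) {
		if (dseg >= eob)	/* handle WQE wraparound */
			dseg = ring;
		dseg->addr = (uintptr_t)buf; /* stands in for rte_pktmbuf_mtod() */
		/* A zero-length segment is flagged as an inline segment
		 * with zero data, as in the patch. */
		dseg->byte_count = buf->data_len ? buf->data_len : 0x80000000u;
	}
	return n;
}

int main(void)
{
	struct data_seg ring[4];
	struct seg s2 = { NULL, 0 };
	struct seg s1 = { &s2, 1024 };

	/* Start near the end of the ring to exercise the wraparound path. */
	printf("used %d data segments\n",
	       fill_dsegs(ring, ring + 4, ring + 3, &s1));
	return 0;
}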

This patch also takes into account the number of entries a packet occupies
in the Tx queue when setting the chip's report-completion flag.
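
The completion-request logic can be summarized by the short standalone sketch
below; the names (model_txq, want_completion) and the TXBB counts used in
main() are made up for illustration, only the countdown arithmetic mirrors
what the patch does with elts_comp_cd before setting MLX4_WQE_CTRL_CQ_UPDATE.

#include <stdio.h>

struct model_txq {
	int elts_comp_cd;	/* countdown until the next completion request */
	int elts_comp_cd_init;	/* reload value for the countdown */
};

/* Return nonzero when the WQE of a packet occupying nr_txbbs TXBBs should
 * request a completion (i.e. carry the CQ_UPDATE flag). */
static int
want_completion(struct model_txq *txq, int nr_txbbs)
{
	txq->elts_comp_cd -= nr_txbbs;
	if (txq->elts_comp_cd <= 0) {
		txq->elts_comp_cd = txq->elts_comp_cd_init;
		return 1;
	}
	return 0;
}

int main(void)
{
	struct model_txq txq = { .elts_comp_cd = 4, .elts_comp_cd_init = 4 };
	int txbbs[] = { 1, 2, 3, 1, 1 };
	unsigned int i;

	for (i = 0; i < sizeof(txbbs) / sizeof(txbbs[0]); i++)
		printf("packet %u (%d TXBBs): %s\n", i, txbbs[i],
		       want_completion(&txq, txbbs[i]) ?
		       "request completion" : "no completion request");
	return 0;
}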

Signed-off-by: Moti Haimovsky <motih at mellanox.com>
Acked-by: Adrien Mazarguil <adrien.mazarguil at 6wind.com>
---
 drivers/net/mlx4/mlx4_rxtx.c | 197 ++++++++++++++++++++++----------------
 drivers/net/mlx4/mlx4_rxtx.h |   6 +-
 drivers/net/mlx4/mlx4_txq.c  |  12 ++-
 3 files changed, 127 insertions(+), 88 deletions(-)

diff --git a/drivers/net/mlx4/mlx4_rxtx.c b/drivers/net/mlx4/mlx4_rxtx.c
index fd8ef7b..cc0baaa 100644
--- a/drivers/net/mlx4/mlx4_rxtx.c
+++ b/drivers/net/mlx4/mlx4_rxtx.c
@@ -63,6 +63,15 @@
 #include "mlx4_utils.h"
 
 /**
+ * Pointer-value pair structure used in mlx4_post_send for saving the first
+ * DWORD (32 bits) of a TXBB.
+ */
+struct pv {
+	struct mlx4_wqe_data_seg *dseg;
+	uint32_t val;
+};
+
+/**
  * Stamp a WQE so it won't be reused by the HW.
  *
  * Routine is used when freeing WQE used by the chip or when failing
@@ -291,24 +300,28 @@ mlx4_txq_mp2mr(struct txq *txq, struct rte_mempool *mp)
  *   Target Tx queue.
  * @param pkt
  *   Packet to transmit.
- * @param send_flags
- *   @p MLX4_WQE_CTRL_CQ_UPDATE to request completion on this packet.
  *
  * @return
  *   0 on success, negative errno value otherwise and rte_errno is set.
  */
 static inline int
-mlx4_post_send(struct txq *txq, struct rte_mbuf *pkt, uint32_t send_flags)
+mlx4_post_send(struct txq *txq, struct rte_mbuf *pkt)
 {
 	struct mlx4_wqe_ctrl_seg *ctrl;
 	struct mlx4_wqe_data_seg *dseg;
 	struct mlx4_sq *sq = &txq->msq;
+	struct rte_mbuf *buf;
 	uint32_t head_idx = sq->head & sq->txbb_cnt_mask;
 	uint32_t lkey;
 	uintptr_t addr;
+	uint32_t srcrb_flags;
+	uint32_t owner_opcode = MLX4_OPCODE_SEND;
+	uint32_t byte_count;
 	int wqe_real_size;
 	int nr_txbbs;
 	int rc;
+	struct pv *pv = (struct pv *)txq->bounce_buf;
+	int pv_counter = 0;
 
 	/* Calculate the needed work queue entry size for this packet. */
 	wqe_real_size = sizeof(struct mlx4_wqe_ctrl_seg) +
@@ -324,56 +337,81 @@ mlx4_post_send(struct txq *txq, struct rte_mbuf *pkt, uint32_t send_flags)
 		rc = ENOSPC;
 		goto err;
 	}
-	/* Get the control and single-data entries of the WQE. */
+	/* Get the control and data entries of the WQE. */
 	ctrl = (struct mlx4_wqe_ctrl_seg *)mlx4_get_send_wqe(sq, head_idx);
 	dseg = (struct mlx4_wqe_data_seg *)((uintptr_t)ctrl +
 					    sizeof(struct mlx4_wqe_ctrl_seg));
-	/* Fill the data segment with buffer information. */
-	addr = rte_pktmbuf_mtod(pkt, uintptr_t);
-	rte_prefetch0((volatile void *)addr);
-	dseg->addr = rte_cpu_to_be_64(addr);
-	/* Memory region key for this memory pool. */
-	lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(pkt));
-	if (unlikely(lkey == (uint32_t)-1)) {
-		/* MR does not exist. */
-		DEBUG("%p: unable to get MP <-> MR association", (void *)txq);
+	/* Fill the data segments with buffer information. */
+	for (buf = pkt; buf != NULL; buf = buf->next, dseg++) {
+		addr = rte_pktmbuf_mtod(buf, uintptr_t);
+		rte_prefetch0((volatile void *)addr);
+		/* Handle WQE wraparound. */
+		if (unlikely(dseg >= (struct mlx4_wqe_data_seg *)sq->eob))
+			dseg = (struct mlx4_wqe_data_seg *)sq->buf;
+		dseg->addr = rte_cpu_to_be_64(addr);
+		/* Memory region key for this memory pool. */
+		lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(buf));
+		if (unlikely(lkey == (uint32_t)-1)) {
+			/* MR does not exist. */
+			DEBUG("%p: unable to get MP <-> MR association",
+			      (void *)txq);
+			/*
+			 * Restamp entry in case of failure.
+			 * Make sure that size is written correctly
+			 * Note that we give ownership to the SW, not the HW.
+			 */
+			ctrl->fence_size = (wqe_real_size >> 4) & 0x3f;
+			mlx4_txq_stamp_freed_wqe(sq, head_idx,
+				     (sq->head & sq->txbb_cnt) ? 0 : 1);
+			rc = EFAULT;
+			goto err;
+		}
+		dseg->lkey = rte_cpu_to_be_32(lkey);
+		if (likely(buf->data_len)) {
+			byte_count = rte_cpu_to_be_32(buf->data_len);
+		} else {
+			/*
+			 * Zero length segment is treated as inline segment
+			 * with zero data.
+			 */
+			byte_count = RTE_BE32(0x80000000);
+		}
 		/*
-		 * Restamp entry in case of failure, make sure that size is
-		 * written correctly.
-		 * Note that we give ownership to the SW, not the HW.
+		 * If the data segment is not at the beginning of a
+		 * Tx basic block (TXBB) then write the byte count,
+		 * else postpone the writing to just before updating the
+		 * control segment.
 		 */
-		ctrl->fence_size = (wqe_real_size >> 4) & 0x3f;
-		mlx4_txq_stamp_freed_wqe(sq, head_idx,
-					 (sq->head & sq->txbb_cnt) ? 0 : 1);
-		rc = EFAULT;
-		goto err;
+		if ((uintptr_t)dseg & (uintptr_t)(MLX4_TXBB_SIZE - 1)) {
+			/*
+			 * Need a barrier here before writing the byte_count
+			 * fields to make sure that all the data is visible
+			 * before the byte_count field is set.
+			 * Otherwise, if the segment begins a new cacheline,
+			 * the HCA prefetcher could grab the 64-byte chunk and
+			 * get a valid (!= 0xffffffff) byte count but stale
+			 * data, and end up sending the wrong data.
+			 */
+			rte_io_wmb();
+			dseg->byte_count = byte_count;
+		} else {
+			/*
+			 * This data segment starts at the beginning of a new
+			 * TXBB, so we need to postpone its byte_count writing
+			 * for later.
+			 */
+			pv[pv_counter].dseg = dseg;
+			pv[pv_counter++].val = byte_count;
+		}
 	}
-	dseg->lkey = rte_cpu_to_be_32(lkey);
-	/*
-	 * Need a barrier here before writing the byte_count field to
-	 * make sure that all the data is visible before the
-	 * byte_count field is set. Otherwise, if the segment begins
-	 * a new cache line, the HCA prefetcher could grab the 64-byte
-	 * chunk and get a valid (!= 0xffffffff) byte count but
-	 * stale data, and end up sending the wrong data.
-	 */
-	rte_io_wmb();
-	if (likely(pkt->data_len))
-		dseg->byte_count = rte_cpu_to_be_32(pkt->data_len);
-	else
-		/*
-		 * Zero length segment is treated as inline segment
-		 * with zero data.
-		 */
-		dseg->byte_count = RTE_BE32(0x80000000);
-	/*
-	 * Fill the control parameters for this packet.
-	 * For raw Ethernet, the SOLICIT flag is used to indicate that no ICRC
-	 * should be calculated.
-	 */
-	ctrl->srcrb_flags =
-		rte_cpu_to_be_32(MLX4_WQE_CTRL_SOLICIT |
-				 (send_flags & MLX4_WQE_CTRL_CQ_UPDATE));
+	/* Write the first DWORD of each TXBB saved earlier. */
+	if (pv_counter) {
+		/* Need a barrier here before writing the byte_count. */
+		rte_io_wmb();
+		for (--pv_counter; pv_counter >= 0; pv_counter--)
+			pv[pv_counter].dseg->byte_count = pv[pv_counter].val;
+	}
+	/* Fill the control parameters for this packet. */
 	ctrl->fence_size = (wqe_real_size >> 4) & 0x3f;
 	/*
 	 * The caller should prepare "imm" in advance in order to support
@@ -382,14 +420,27 @@ mlx4_post_send(struct txq *txq, struct rte_mbuf *pkt, uint32_t send_flags)
 	 */
 	ctrl->imm = 0;
 	/*
-	 * Make sure descriptor is fully written before setting ownership
-	 * bit (because HW can start executing as soon as we do).
+	 * For raw Ethernet, the SOLICIT flag is used to indicate that no ICRC
+	 * should be calculated.
+	 */
+	txq->elts_comp_cd -= nr_txbbs;
+	if (unlikely(txq->elts_comp_cd <= 0)) {
+		txq->elts_comp_cd = txq->elts_comp_cd_init;
+		srcrb_flags = RTE_BE32(MLX4_WQE_CTRL_SOLICIT |
+				       MLX4_WQE_CTRL_CQ_UPDATE);
+	} else {
+		srcrb_flags = RTE_BE32(MLX4_WQE_CTRL_SOLICIT);
+	}
+	ctrl->srcrb_flags = srcrb_flags;
+	/*
+	 * Make sure descriptor is fully written before
+	 * setting ownership bit (because HW can start
+	 * executing as soon as we do).
 	 */
 	rte_wmb();
-	ctrl->owner_opcode =
-		rte_cpu_to_be_32(MLX4_OPCODE_SEND |
-				 ((sq->head & sq->txbb_cnt) ?
-				  MLX4_BIT_WQE_OWN : 0));
+	ctrl->owner_opcode = rte_cpu_to_be_32(owner_opcode |
+					      ((sq->head & sq->txbb_cnt) ?
+					       MLX4_BIT_WQE_OWN : 0));
 	sq->head += nr_txbbs;
 	return 0;
 err:
@@ -416,14 +467,13 @@ mlx4_tx_burst(void *dpdk_txq, struct rte_mbuf **pkts, uint16_t pkts_n)
 	struct txq *txq = (struct txq *)dpdk_txq;
 	unsigned int elts_head = txq->elts_head;
 	const unsigned int elts_n = txq->elts_n;
-	unsigned int elts_comp_cd = txq->elts_comp_cd;
 	unsigned int elts_comp = 0;
 	unsigned int bytes_sent = 0;
 	unsigned int i;
 	unsigned int max;
 	int err;
 
-	assert(elts_comp_cd != 0);
+	assert(txq->elts_comp_cd != 0);
 	mlx4_txq_complete(txq);
 	max = (elts_n - (elts_head - txq->elts_tail));
 	if (max > elts_n)
@@ -442,8 +492,6 @@ mlx4_tx_burst(void *dpdk_txq, struct rte_mbuf **pkts, uint16_t pkts_n)
 			(((elts_head + 1) == elts_n) ? 0 : elts_head + 1);
 		struct txq_elt *elt_next = &(*txq->elts)[elts_head_next];
 		struct txq_elt *elt = &(*txq->elts)[elts_head];
-		unsigned int segs = buf->nb_segs;
-		uint32_t send_flags = 0;
 
 		/* Clean up old buffer. */
 		if (likely(elt->buf != NULL)) {
@@ -461,34 +509,16 @@ mlx4_tx_burst(void *dpdk_txq, struct rte_mbuf **pkts, uint16_t pkts_n)
 				tmp = next;
 			} while (tmp != NULL);
 		}
-		/* Request Tx completion. */
-		if (unlikely(--elts_comp_cd == 0)) {
-			elts_comp_cd = txq->elts_comp_cd_init;
-			++elts_comp;
-			send_flags |= MLX4_WQE_CTRL_CQ_UPDATE;
-		}
-		if (likely(segs == 1)) {
-			/* Update element. */
-			elt->buf = buf;
-			RTE_MBUF_PREFETCH_TO_FREE(elt_next->buf);
-			/* Post the packet for sending. */
-			err = mlx4_post_send(txq, buf, send_flags);
-			if (unlikely(err)) {
-				if (unlikely(send_flags &
-					     MLX4_WQE_CTRL_CQ_UPDATE)) {
-					elts_comp_cd = 1;
-					--elts_comp;
-				}
-				elt->buf = NULL;
-				goto stop;
-			}
-			elt->buf = buf;
-			bytes_sent += buf->pkt_len;
-		} else {
-			err = -EINVAL;
-			rte_errno = -err;
+		RTE_MBUF_PREFETCH_TO_FREE(elt_next->buf);
+		/* Post the packet for sending. */
+		err = mlx4_post_send(txq, buf);
+		if (unlikely(err)) {
+			elt->buf = NULL;
 			goto stop;
 		}
+		elt->buf = buf;
+		bytes_sent += buf->pkt_len;
+		++elts_comp;
 		elts_head = elts_head_next;
 	}
 stop:
@@ -504,7 +534,6 @@ mlx4_tx_burst(void *dpdk_txq, struct rte_mbuf **pkts, uint16_t pkts_n)
 	rte_write32(txq->msq.doorbell_qpn, txq->msq.db);
 	txq->elts_head = elts_head;
 	txq->elts_comp += elts_comp;
-	txq->elts_comp_cd = elts_comp_cd;
 	return i;
 }
 
diff --git a/drivers/net/mlx4/mlx4_rxtx.h b/drivers/net/mlx4/mlx4_rxtx.h
index ac84177..528e286 100644
--- a/drivers/net/mlx4/mlx4_rxtx.h
+++ b/drivers/net/mlx4/mlx4_rxtx.h
@@ -101,13 +101,15 @@ struct txq {
 	struct mlx4_cq mcq; /**< Info for directly manipulating the CQ. */
 	unsigned int elts_head; /**< Current index in (*elts)[]. */
 	unsigned int elts_tail; /**< First element awaiting completion. */
-	unsigned int elts_comp; /**< Number of completion requests. */
-	unsigned int elts_comp_cd; /**< Countdown for next completion. */
+	unsigned int elts_comp; /**< Number of packets awaiting completion. */
+	int elts_comp_cd; /**< Countdown for next completion. */
 	unsigned int elts_comp_cd_init; /**< Initial value for countdown. */
 	unsigned int elts_n; /**< (*elts)[] length. */
 	struct txq_elt (*elts)[]; /**< Tx elements. */
 	struct mlx4_txq_stats stats; /**< Tx queue counters. */
 	uint32_t max_inline; /**< Max inline send size. */
+	uint8_t *bounce_buf;
+	/**< Memory used for storing the first DWORD of data TXBBs. */
 	struct {
 		const struct rte_mempool *mp; /**< Cached memory pool. */
 		struct ibv_mr *mr; /**< Memory region (for mp). */
diff --git a/drivers/net/mlx4/mlx4_txq.c b/drivers/net/mlx4/mlx4_txq.c
index fb28ef2..7552a88 100644
--- a/drivers/net/mlx4/mlx4_txq.c
+++ b/drivers/net/mlx4/mlx4_txq.c
@@ -83,8 +83,13 @@ mlx4_txq_alloc_elts(struct txq *txq, unsigned int elts_n)
 		rte_calloc_socket("TXQ", 1, sizeof(*elts), 0, txq->socket);
 	int ret = 0;
 
-	if (elts == NULL) {
-		ERROR("%p: can't allocate packets array", (void *)txq);
+	/* Allocate bounce buffer. */
+	txq->bounce_buf = rte_zmalloc_socket("TXQ",
+					     MLX4_MAX_WQE_SIZE,
+					     RTE_CACHE_LINE_MIN_SIZE,
+					     txq->socket);
+	if (!elts || !txq->bounce_buf) {
+		ERROR("%p: can't allocate TXQ memory", (void *)txq);
 		ret = ENOMEM;
 		goto error;
 	}
@@ -110,6 +115,8 @@ mlx4_txq_alloc_elts(struct txq *txq, unsigned int elts_n)
 	assert(ret == 0);
 	return 0;
 error:
+	rte_free(txq->bounce_buf);
+	txq->bounce_buf = NULL;
 	rte_free(elts);
 	DEBUG("%p: failed, freed everything", (void *)txq);
 	assert(ret > 0);
@@ -175,6 +182,7 @@ mlx4_txq_cleanup(struct txq *txq)
 		claim_zero(ibv_destroy_qp(txq->qp));
 	if (txq->cq != NULL)
 		claim_zero(ibv_destroy_cq(txq->cq));
+	rte_free(txq->bounce_buf);
 	for (i = 0; (i != RTE_DIM(txq->mp2mr)); ++i) {
 		if (txq->mp2mr[i].mp == NULL)
 			break;
-- 
2.1.4


