[dpdk-dev] [PATCH v6 1/5] net/mlx4: add Tx bypassing Verbs

Adrien Mazarguil adrien.mazarguil at 6wind.com
Thu Oct 12 14:29:56 CEST 2017


From: Moti Haimovsky <motih at mellanox.com>

Modify PMD to send single-buffer packets directly to the device
bypassing the Verbs Tx post and poll routines.

Tx gather support: add support for transmitting packets spanning
over multiple buffers.

Take into consideration the amount of entries a packet occupies
in the TxQ when setting the report-completion flag of the chip.

Signed-off-by: Moti Haimovsky <motih at mellanox.com>
Signed-off-by: Ophir Munk <ophirmu at mellanox.com>
Acked-by: Adrien Mazarguil <adrien.mazarguil at 6wind.com>
---
 drivers/net/mlx4/mlx4_prm.h  | 120 ++++++++++++
 drivers/net/mlx4/mlx4_rxtx.c | 398 ++++++++++++++++++++++++++++----------
 drivers/net/mlx4/mlx4_rxtx.h |  30 +--
 drivers/net/mlx4/mlx4_txq.c  |  59 ++++++
 4 files changed, 490 insertions(+), 117 deletions(-)

diff --git a/drivers/net/mlx4/mlx4_prm.h b/drivers/net/mlx4/mlx4_prm.h
new file mode 100644
index 0000000..085a595
--- /dev/null
+++ b/drivers/net/mlx4/mlx4_prm.h
@@ -0,0 +1,120 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright 2017 6WIND S.A.
+ *   Copyright 2017 Mellanox
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of 6WIND S.A. nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef MLX4_PRM_H_
+#define MLX4_PRM_H_
+
+#include <rte_atomic.h>
+#include <rte_branch_prediction.h>
+#include <rte_byteorder.h>
+
+/* Verbs headers do not support -pedantic. */
+#ifdef PEDANTIC
+#pragma GCC diagnostic ignored "-Wpedantic"
+#endif
+#include <infiniband/mlx4dv.h>
+#include <infiniband/verbs.h>
+#ifdef PEDANTIC
+#pragma GCC diagnostic error "-Wpedantic"
+#endif
+
+/* ConnectX-3 Tx queue basic block. */
+#define MLX4_TXBB_SHIFT 6
+#define MLX4_TXBB_SIZE (1 << MLX4_TXBB_SHIFT)
+
+/* Typical TSO descriptor with 16 gather entries is 352 bytes. */
+#define MLX4_MAX_WQE_SIZE 512
+#define MLX4_MAX_WQE_TXBBS (MLX4_MAX_WQE_SIZE / MLX4_TXBB_SIZE)
+
+/* Send queue stamping/invalidating information. */
+#define MLX4_SQ_STAMP_STRIDE 64
+#define MLX4_SQ_STAMP_DWORDS (MLX4_SQ_STAMP_STRIDE / 4)
+#define MLX4_SQ_STAMP_SHIFT 31
+#define MLX4_SQ_STAMP_VAL 0x7fffffff
+
+/* Work queue element (WQE) flags. */
+#define MLX4_BIT_WQE_OWN 0x80000000
+
+#define MLX4_SIZE_TO_TXBBS(size) \
+	(RTE_ALIGN((size), (MLX4_TXBB_SIZE)) >> (MLX4_TXBB_SHIFT))
+
+/* Send queue information. */
+struct mlx4_sq {
+	uint8_t *buf; /**< SQ buffer. */
+	uint8_t *eob; /**< End of SQ buffer */
+	uint32_t head; /**< SQ head counter in units of TXBBS. */
+	uint32_t tail; /**< SQ tail counter in units of TXBBS. */
+	uint32_t txbb_cnt; /**< Num of WQEBB in the Q (should be ^2). */
+	uint32_t txbb_cnt_mask; /**< txbbs_cnt mask (txbb_cnt is ^2). */
+	uint32_t headroom_txbbs; /**< Num of txbbs that should be kept free. */
+	uint32_t *db; /**< Pointer to the doorbell. */
+	uint32_t doorbell_qpn; /**< qp number to write to the doorbell. */
+};
+
+#define mlx4_get_send_wqe(sq, n) ((sq)->buf + ((n) * (MLX4_TXBB_SIZE)))
+
+/* Completion queue information. */
+struct mlx4_cq {
+	uint8_t *buf; /**< Pointer to the completion queue buffer. */
+	uint32_t cqe_cnt; /**< Number of entries in the queue. */
+	uint32_t cqe_64:1; /**< CQ entry size is 64 bytes. */
+	uint32_t cons_index; /**< Last queue entry that was handled. */
+	uint32_t *set_ci_db; /**< Pointer to the completion queue doorbell. */
+};
+
+/**
+ * Retrieve a CQE entry from a CQ.
+ *
+ * cqe = cq->buf + cons_index * cqe_size + cqe_offset
+ *
+ * Where cqe_size is 32 or 64 bytes and cqe_offset is 0 or 32 (depending on
+ * cqe_size).
+ *
+ * @param cq
+ *   CQ to retrieve entry from.
+ * @param index
+ *   Entry index.
+ *
+ * @return
+ *   Pointer to CQE entry.
+ */
+static inline struct mlx4_cqe *
+mlx4_get_cqe(struct mlx4_cq *cq, uint32_t index)
+{
+	return (struct mlx4_cqe *)(cq->buf +
+				   ((index & (cq->cqe_cnt - 1)) <<
+				    (5 + cq->cqe_64)) +
+				   (cq->cqe_64 << 5));
+}
+
+#endif /* MLX4_PRM_H_ */
diff --git a/drivers/net/mlx4/mlx4_rxtx.c b/drivers/net/mlx4/mlx4_rxtx.c
index 859f1bd..38b87a0 100644
--- a/drivers/net/mlx4/mlx4_rxtx.c
+++ b/drivers/net/mlx4/mlx4_rxtx.c
@@ -52,15 +52,81 @@
 
 #include <rte_branch_prediction.h>
 #include <rte_common.h>
+#include <rte_io.h>
 #include <rte_mbuf.h>
 #include <rte_mempool.h>
 #include <rte_prefetch.h>
 
 #include "mlx4.h"
+#include "mlx4_prm.h"
 #include "mlx4_rxtx.h"
 #include "mlx4_utils.h"
 
 /**
+ * Pointer-value pair structure used in tx_post_send for saving the first
+ * DWORD (32 byte) of a TXBB.
+ */
+struct pv {
+	struct mlx4_wqe_data_seg *dseg;
+	uint32_t val;
+};
+
+/**
+ * Stamp a WQE so it won't be reused by the HW.
+ *
+ * Routine is used when freeing WQE used by the chip or when failing
+ * building an WQ entry has failed leaving partial information on the queue.
+ *
+ * @param sq
+ *   Pointer to the SQ structure.
+ * @param index
+ *   Index of the freed WQE.
+ * @param num_txbbs
+ *   Number of blocks to stamp.
+ *   If < 0 the routine will use the size written in the WQ entry.
+ * @param owner
+ *   The value of the WQE owner bit to use in the stamp.
+ *
+ * @return
+ *   The number of Tx basic blocs (TXBB) the WQE contained.
+ */
+static int
+mlx4_txq_stamp_freed_wqe(struct mlx4_sq *sq, uint16_t index, uint8_t owner)
+{
+	uint32_t stamp = rte_cpu_to_be_32(MLX4_SQ_STAMP_VAL |
+					  (!!owner << MLX4_SQ_STAMP_SHIFT));
+	uint8_t *wqe = mlx4_get_send_wqe(sq, (index & sq->txbb_cnt_mask));
+	uint32_t *ptr = (uint32_t *)wqe;
+	int i;
+	int txbbs_size;
+	int num_txbbs;
+
+	/* Extract the size from the control segment of the WQE. */
+	num_txbbs = MLX4_SIZE_TO_TXBBS((((struct mlx4_wqe_ctrl_seg *)
+					 wqe)->fence_size & 0x3f) << 4);
+	txbbs_size = num_txbbs * MLX4_TXBB_SIZE;
+	/* Optimize the common case when there is no wrap-around. */
+	if (wqe + txbbs_size <= sq->eob) {
+		/* Stamp the freed descriptor. */
+		for (i = 0; i < txbbs_size; i += MLX4_SQ_STAMP_STRIDE) {
+			*ptr = stamp;
+			ptr += MLX4_SQ_STAMP_DWORDS;
+		}
+	} else {
+		/* Stamp the freed descriptor. */
+		for (i = 0; i < txbbs_size; i += MLX4_SQ_STAMP_STRIDE) {
+			*ptr = stamp;
+			ptr += MLX4_SQ_STAMP_DWORDS;
+			if ((uint8_t *)ptr >= sq->eob) {
+				ptr = (uint32_t *)sq->buf;
+				stamp ^= RTE_BE32(0x80000000);
+			}
+		}
+	}
+	return num_txbbs;
+}
+
+/**
  * Manage Tx completions.
  *
  * When sending a burst, mlx4_tx_burst() posts several WRs.
@@ -80,26 +146,71 @@ mlx4_txq_complete(struct txq *txq)
 	unsigned int elts_comp = txq->elts_comp;
 	unsigned int elts_tail = txq->elts_tail;
 	const unsigned int elts_n = txq->elts_n;
-	struct ibv_wc wcs[elts_comp];
-	int wcs_n;
+	struct mlx4_cq *cq = &txq->mcq;
+	struct mlx4_sq *sq = &txq->msq;
+	struct mlx4_cqe *cqe;
+	uint32_t cons_index = cq->cons_index;
+	uint16_t new_index;
+	uint16_t nr_txbbs = 0;
+	int pkts = 0;
 
 	if (unlikely(elts_comp == 0))
 		return 0;
-	wcs_n = ibv_poll_cq(txq->cq, elts_comp, wcs);
-	if (unlikely(wcs_n == 0))
+	/*
+	 * Traverse over all CQ entries reported and handle each WQ entry
+	 * reported by them.
+	 */
+	do {
+		cqe = (struct mlx4_cqe *)mlx4_get_cqe(cq, cons_index);
+		if (unlikely(!!(cqe->owner_sr_opcode & MLX4_CQE_OWNER_MASK) ^
+		    !!(cons_index & cq->cqe_cnt)))
+			break;
+		/*
+		 * Make sure we read the CQE after we read the ownership bit.
+		 */
+		rte_rmb();
+		if (unlikely((cqe->owner_sr_opcode & MLX4_CQE_OPCODE_MASK) ==
+			     MLX4_CQE_OPCODE_ERROR)) {
+			struct mlx4_err_cqe *cqe_err =
+				(struct mlx4_err_cqe *)cqe;
+			ERROR("%p CQE error - vendor syndrome: 0x%x"
+			      " syndrome: 0x%x\n",
+			      (void *)txq, cqe_err->vendor_err,
+			      cqe_err->syndrome);
+		}
+		/* Get WQE index reported in the CQE. */
+		new_index =
+			rte_be_to_cpu_16(cqe->wqe_index) & sq->txbb_cnt_mask;
+		do {
+			/* Free next descriptor. */
+			nr_txbbs +=
+				mlx4_txq_stamp_freed_wqe(sq,
+				     (sq->tail + nr_txbbs) & sq->txbb_cnt_mask,
+				     !!((sq->tail + nr_txbbs) & sq->txbb_cnt));
+			pkts++;
+		} while (((sq->tail + nr_txbbs) & sq->txbb_cnt_mask) !=
+			 new_index);
+		cons_index++;
+	} while (1);
+	if (unlikely(pkts == 0))
 		return 0;
-	if (unlikely(wcs_n < 0)) {
-		DEBUG("%p: ibv_poll_cq() failed (wcs_n=%d)",
-		      (void *)txq, wcs_n);
-		return -1;
-	}
-	elts_comp -= wcs_n;
+	/*
+	 * Update CQ.
+	 * To prevent CQ overflow we first update CQ consumer and only then
+	 * the ring consumer.
+	 */
+	cq->cons_index = cons_index;
+	*cq->set_ci_db = rte_cpu_to_be_32(cq->cons_index & 0xffffff);
+	rte_wmb();
+	sq->tail = sq->tail + nr_txbbs;
+	/* Update the list of packets posted for transmission. */
+	elts_comp -= pkts;
 	assert(elts_comp <= txq->elts_comp);
 	/*
-	 * Assume WC status is successful as nothing can be done about it
-	 * anyway.
+	 * Assume completion status is successful as nothing can be done about
+	 * it anyway.
 	 */
-	elts_tail += wcs_n * txq->elts_comp_cd_init;
+	elts_tail += pkts;
 	if (elts_tail >= elts_n)
 		elts_tail -= elts_n;
 	txq->elts_tail = elts_tail;
@@ -183,6 +294,161 @@ mlx4_txq_mp2mr(struct txq *txq, struct rte_mempool *mp)
 }
 
 /**
+ * Posts a single work request to a send queue.
+ *
+ * @param txq
+ *   Target Tx queue.
+ * @param pkt
+ *   Packet to transmit.
+ *
+ * @return
+ *   0 on success, negative errno value otherwise and rte_errno is set.
+ */
+static inline int
+mlx4_post_send(struct txq *txq, struct rte_mbuf *pkt)
+{
+	struct mlx4_wqe_ctrl_seg *ctrl;
+	struct mlx4_wqe_data_seg *dseg;
+	struct mlx4_sq *sq = &txq->msq;
+	struct rte_mbuf *buf;
+	uint32_t head_idx = sq->head & sq->txbb_cnt_mask;
+	uint32_t lkey;
+	uintptr_t addr;
+	uint32_t srcrb_flags;
+	uint32_t owner_opcode = MLX4_OPCODE_SEND;
+	uint32_t byte_count;
+	int wqe_real_size;
+	int nr_txbbs;
+	int rc;
+	struct pv *pv = (struct pv *)txq->bounce_buf;
+	int pv_counter = 0;
+
+	/* Calculate the needed work queue entry size for this packet. */
+	wqe_real_size = sizeof(struct mlx4_wqe_ctrl_seg) +
+			pkt->nb_segs * sizeof(struct mlx4_wqe_data_seg);
+	nr_txbbs = MLX4_SIZE_TO_TXBBS(wqe_real_size);
+	/*
+	 * Check that there is room for this WQE in the send queue and that
+	 * the WQE size is legal.
+	 */
+	if (((sq->head - sq->tail) + nr_txbbs +
+	     sq->headroom_txbbs) >= sq->txbb_cnt ||
+	    nr_txbbs > MLX4_MAX_WQE_TXBBS) {
+		rc = ENOSPC;
+		goto err;
+	}
+	/* Get the control and data entries of the WQE. */
+	ctrl = (struct mlx4_wqe_ctrl_seg *)mlx4_get_send_wqe(sq, head_idx);
+	dseg = (struct mlx4_wqe_data_seg *)((uintptr_t)ctrl +
+					    sizeof(struct mlx4_wqe_ctrl_seg));
+	/* Fill the data segments with buffer information. */
+	for (buf = pkt; buf != NULL; buf = buf->next, dseg++) {
+		addr = rte_pktmbuf_mtod(buf, uintptr_t);
+		rte_prefetch0((volatile void *)addr);
+		/* Handle WQE wraparound. */
+		if (unlikely(dseg >= (struct mlx4_wqe_data_seg *)sq->eob))
+			dseg = (struct mlx4_wqe_data_seg *)sq->buf;
+		dseg->addr = rte_cpu_to_be_64(addr);
+		/* Memory region key for this memory pool. */
+		lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(buf));
+		if (unlikely(lkey == (uint32_t)-1)) {
+			/* MR does not exist. */
+			DEBUG("%p: unable to get MP <-> MR association",
+			      (void *)txq);
+			/*
+			 * Restamp entry in case of failure.
+			 * Make sure that size is written correctly
+			 * Note that we give ownership to the SW, not the HW.
+			 */
+			ctrl->fence_size = (wqe_real_size >> 4) & 0x3f;
+			mlx4_txq_stamp_freed_wqe(sq, head_idx,
+				     (sq->head & sq->txbb_cnt) ? 0 : 1);
+			rc = EFAULT;
+			goto err;
+		}
+		dseg->lkey = rte_cpu_to_be_32(lkey);
+		if (likely(buf->data_len)) {
+			byte_count = rte_cpu_to_be_32(buf->data_len);
+		} else {
+			/*
+			 * Zero length segment is treated as inline segment
+			 * with zero data.
+			 */
+			byte_count = RTE_BE32(0x80000000);
+		}
+		/*
+		 * If the data segment is not at the beginning of a
+		 * Tx basic block (TXBB) then write the byte count,
+		 * else postpone the writing to just before updating the
+		 * control segment.
+		 */
+		if ((uintptr_t)dseg & (uintptr_t)(MLX4_TXBB_SIZE - 1)) {
+			/*
+			 * Need a barrier here before writing the byte_count
+			 * fields to make sure that all the data is visible
+			 * before the byte_count field is set.
+			 * Otherwise, if the segment begins a new cacheline,
+			 * the HCA prefetcher could grab the 64-byte chunk and
+			 * get a valid (!= 0xffffffff) byte count but stale
+			 * data, and end up sending the wrong data.
+			 */
+			rte_io_wmb();
+			dseg->byte_count = byte_count;
+		} else {
+			/*
+			 * This data segment starts at the beginning of a new
+			 * TXBB, so we need to postpone its byte_count writing
+			 * for later.
+			 */
+			pv[pv_counter].dseg = dseg;
+			pv[pv_counter++].val = byte_count;
+		}
+	}
+	/* Write the first DWORD of each TXBB save earlier. */
+	if (pv_counter) {
+		/* Need a barrier here before writing the byte_count. */
+		rte_io_wmb();
+		for (--pv_counter; pv_counter  >= 0; pv_counter--)
+			pv[pv_counter].dseg->byte_count = pv[pv_counter].val;
+	}
+	/* Fill the control parameters for this packet. */
+	ctrl->fence_size = (wqe_real_size >> 4) & 0x3f;
+	/*
+	 * The caller should prepare "imm" in advance in order to support
+	 * VF to VF communication (when the device is a virtual-function
+	 * device (VF)).
+	 */
+	ctrl->imm = 0;
+	/*
+	 * For raw Ethernet, the SOLICIT flag is used to indicate that no ICRC
+	 * should be calculated.
+	 */
+	txq->elts_comp_cd -= nr_txbbs;
+	if (unlikely(txq->elts_comp_cd <= 0)) {
+		txq->elts_comp_cd = txq->elts_comp_cd_init;
+		srcrb_flags = RTE_BE32(MLX4_WQE_CTRL_SOLICIT |
+				       MLX4_WQE_CTRL_CQ_UPDATE);
+	} else {
+		srcrb_flags = RTE_BE32(MLX4_WQE_CTRL_SOLICIT);
+	}
+	ctrl->srcrb_flags = srcrb_flags;
+	/*
+	 * Make sure descriptor is fully written before
+	 * setting ownership bit (because HW can start
+	 * executing as soon as we do).
+	 */
+	rte_wmb();
+	ctrl->owner_opcode = rte_cpu_to_be_32(owner_opcode |
+					      ((sq->head & sq->txbb_cnt) ?
+					       MLX4_BIT_WQE_OWN : 0));
+	sq->head += nr_txbbs;
+	return 0;
+err:
+	rte_errno = rc;
+	return -rc;
+}
+
+/**
  * DPDK callback for Tx.
  *
  * @param dpdk_txq
@@ -199,18 +465,15 @@ uint16_t
 mlx4_tx_burst(void *dpdk_txq, struct rte_mbuf **pkts, uint16_t pkts_n)
 {
 	struct txq *txq = (struct txq *)dpdk_txq;
-	struct ibv_send_wr *wr_head = NULL;
-	struct ibv_send_wr **wr_next = &wr_head;
-	struct ibv_send_wr *wr_bad = NULL;
 	unsigned int elts_head = txq->elts_head;
 	const unsigned int elts_n = txq->elts_n;
-	unsigned int elts_comp_cd = txq->elts_comp_cd;
 	unsigned int elts_comp = 0;
+	unsigned int bytes_sent = 0;
 	unsigned int i;
 	unsigned int max;
 	int err;
 
-	assert(elts_comp_cd != 0);
+	assert(txq->elts_comp_cd != 0);
 	mlx4_txq_complete(txq);
 	max = (elts_n - (elts_head - txq->elts_tail));
 	if (max > elts_n)
@@ -229,10 +492,6 @@ mlx4_tx_burst(void *dpdk_txq, struct rte_mbuf **pkts, uint16_t pkts_n)
 			(((elts_head + 1) == elts_n) ? 0 : elts_head + 1);
 		struct txq_elt *elt_next = &(*txq->elts)[elts_head_next];
 		struct txq_elt *elt = &(*txq->elts)[elts_head];
-		struct ibv_send_wr *wr = &elt->wr;
-		unsigned int segs = buf->nb_segs;
-		unsigned int sent_size = 0;
-		uint32_t send_flags = 0;
 
 		/* Clean up old buffer. */
 		if (likely(elt->buf != NULL)) {
@@ -250,100 +509,31 @@ mlx4_tx_burst(void *dpdk_txq, struct rte_mbuf **pkts, uint16_t pkts_n)
 				tmp = next;
 			} while (tmp != NULL);
 		}
-		/* Request Tx completion. */
-		if (unlikely(--elts_comp_cd == 0)) {
-			elts_comp_cd = txq->elts_comp_cd_init;
-			++elts_comp;
-			send_flags |= IBV_SEND_SIGNALED;
-		}
-		if (likely(segs == 1)) {
-			struct ibv_sge *sge = &elt->sge;
-			uintptr_t addr;
-			uint32_t length;
-			uint32_t lkey;
-
-			/* Retrieve buffer information. */
-			addr = rte_pktmbuf_mtod(buf, uintptr_t);
-			length = buf->data_len;
-			/* Retrieve memory region key for this memory pool. */
-			lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(buf));
-			if (unlikely(lkey == (uint32_t)-1)) {
-				/* MR does not exist. */
-				DEBUG("%p: unable to get MP <-> MR"
-				      " association", (void *)txq);
-				/* Clean up Tx element. */
-				elt->buf = NULL;
-				goto stop;
-			}
-			/* Update element. */
-			elt->buf = buf;
-			if (txq->priv->vf)
-				rte_prefetch0((volatile void *)
-					      (uintptr_t)addr);
-			RTE_MBUF_PREFETCH_TO_FREE(elt_next->buf);
-			sge->addr = addr;
-			sge->length = length;
-			sge->lkey = lkey;
-			sent_size += length;
-		} else {
-			err = -1;
+		RTE_MBUF_PREFETCH_TO_FREE(elt_next->buf);
+		/* Post the packet for sending. */
+		err = mlx4_post_send(txq, buf);
+		if (unlikely(err)) {
+			elt->buf = NULL;
 			goto stop;
 		}
-		if (sent_size <= txq->max_inline)
-			send_flags |= IBV_SEND_INLINE;
+		elt->buf = buf;
+		bytes_sent += buf->pkt_len;
+		++elts_comp;
 		elts_head = elts_head_next;
-		/* Increment sent bytes counter. */
-		txq->stats.obytes += sent_size;
-		/* Set up WR. */
-		wr->sg_list = &elt->sge;
-		wr->num_sge = segs;
-		wr->opcode = IBV_WR_SEND;
-		wr->send_flags = send_flags;
-		*wr_next = wr;
-		wr_next = &wr->next;
 	}
 stop:
 	/* Take a shortcut if nothing must be sent. */
 	if (unlikely(i == 0))
 		return 0;
-	/* Increment sent packets counter. */
+	/* Increment send statistics counters. */
 	txq->stats.opackets += i;
+	txq->stats.obytes += bytes_sent;
+	/* Make sure that descriptors are written before doorbell record. */
+	rte_wmb();
 	/* Ring QP doorbell. */
-	*wr_next = NULL;
-	assert(wr_head);
-	err = ibv_post_send(txq->qp, wr_head, &wr_bad);
-	if (unlikely(err)) {
-		uint64_t obytes = 0;
-		uint64_t opackets = 0;
-
-		/* Rewind bad WRs. */
-		while (wr_bad != NULL) {
-			int j;
-
-			/* Force completion request if one was lost. */
-			if (wr_bad->send_flags & IBV_SEND_SIGNALED) {
-				elts_comp_cd = 1;
-				--elts_comp;
-			}
-			++opackets;
-			for (j = 0; j < wr_bad->num_sge; ++j)
-				obytes += wr_bad->sg_list[j].length;
-			elts_head = (elts_head ? elts_head : elts_n) - 1;
-			wr_bad = wr_bad->next;
-		}
-		txq->stats.opackets -= opackets;
-		txq->stats.obytes -= obytes;
-		i -= opackets;
-		DEBUG("%p: ibv_post_send() failed, %" PRIu64 " packets"
-		      " (%" PRIu64 " bytes) rejected: %s",
-		      (void *)txq,
-		      opackets,
-		      obytes,
-		      (err <= -1) ? "Internal error" : strerror(err));
-	}
+	rte_write32(txq->msq.doorbell_qpn, txq->msq.db);
 	txq->elts_head = elts_head;
 	txq->elts_comp += elts_comp;
-	txq->elts_comp_cd = elts_comp_cd;
 	return i;
 }
 
diff --git a/drivers/net/mlx4/mlx4_rxtx.h b/drivers/net/mlx4/mlx4_rxtx.h
index eca966f..ff27126 100644
--- a/drivers/net/mlx4/mlx4_rxtx.h
+++ b/drivers/net/mlx4/mlx4_rxtx.h
@@ -41,6 +41,7 @@
 #ifdef PEDANTIC
 #pragma GCC diagnostic ignored "-Wpedantic"
 #endif
+#include <infiniband/mlx4dv.h>
 #include <infiniband/verbs.h>
 #ifdef PEDANTIC
 #pragma GCC diagnostic error "-Wpedantic"
@@ -51,6 +52,7 @@
 #include <rte_mempool.h>
 
 #include "mlx4.h"
+#include "mlx4_prm.h"
 
 /** Rx queue counters. */
 struct mlx4_rxq_stats {
@@ -101,8 +103,6 @@ struct mlx4_rss {
 
 /** Tx element. */
 struct txq_elt {
-	struct ibv_send_wr wr; /**< Work request. */
-	struct ibv_sge sge; /**< Scatter/gather element. */
 	struct rte_mbuf *buf; /**< Buffer. */
 };
 
@@ -116,24 +116,28 @@ struct mlx4_txq_stats {
 
 /** Tx queue descriptor. */
 struct txq {
-	struct priv *priv; /**< Back pointer to private data. */
+	struct mlx4_sq msq; /**< Info for directly manipulating the SQ. */
+	struct mlx4_cq mcq; /**< Info for directly manipulating the CQ. */
+	unsigned int elts_head; /**< Current index in (*elts)[]. */
+	unsigned int elts_tail; /**< First element awaiting completion. */
+	unsigned int elts_comp; /**< Number of packets awaiting completion. */
+	int elts_comp_cd; /**< Countdown for next completion. */
+	unsigned int elts_comp_cd_init; /**< Initial value for countdown. */
+	unsigned int elts_n; /**< (*elts)[] length. */
+	struct txq_elt (*elts)[]; /**< Tx elements. */
+	struct mlx4_txq_stats stats; /**< Tx queue counters. */
+	uint32_t max_inline; /**< Max inline send size. */
+	uint8_t *bounce_buf;
+	/**< Memory used for storing the first DWORD of data TXBBs. */
 	struct {
 		const struct rte_mempool *mp; /**< Cached memory pool. */
 		struct ibv_mr *mr; /**< Memory region (for mp). */
 		uint32_t lkey; /**< mr->lkey copy. */
 	} mp2mr[MLX4_PMD_TX_MP_CACHE]; /**< MP to MR translation table. */
+	struct priv *priv; /**< Back pointer to private data. */
+	unsigned int socket; /**< CPU socket ID for allocations. */
 	struct ibv_cq *cq; /**< Completion queue. */
 	struct ibv_qp *qp; /**< Queue pair. */
-	uint32_t max_inline; /**< Max inline send size. */
-	unsigned int elts_n; /**< (*elts)[] length. */
-	struct txq_elt (*elts)[]; /**< Tx elements. */
-	unsigned int elts_head; /**< Current index in (*elts)[]. */
-	unsigned int elts_tail; /**< First element awaiting completion. */
-	unsigned int elts_comp; /**< Number of completion requests. */
-	unsigned int elts_comp_cd; /**< Countdown for next completion. */
-	unsigned int elts_comp_cd_init; /**< Initial value for countdown. */
-	struct mlx4_txq_stats stats; /**< Tx queue counters. */
-	unsigned int socket; /**< CPU socket ID for allocations. */
 	uint8_t data[]; /**< Remaining queue resources. */
 };
 
diff --git a/drivers/net/mlx4/mlx4_txq.c b/drivers/net/mlx4/mlx4_txq.c
index 915f8d7..fbb028a 100644
--- a/drivers/net/mlx4/mlx4_txq.c
+++ b/drivers/net/mlx4/mlx4_txq.c
@@ -60,6 +60,7 @@
 
 #include "mlx4.h"
 #include "mlx4_autoconf.h"
+#include "mlx4_prm.h"
 #include "mlx4_rxtx.h"
 #include "mlx4_utils.h"
 
@@ -148,6 +149,41 @@ mlx4_txq_mp2mr_iter(struct rte_mempool *mp, void *arg)
 }
 
 /**
+ * Retrieves information needed in order to directly access the Tx queue.
+ *
+ * @param txq
+ *   Pointer to Tx queue structure.
+ * @param mlxdv
+ *   Pointer to device information for this Tx queue.
+ */
+static void
+mlx4_txq_fill_dv_obj_info(struct txq *txq, struct mlx4dv_obj *mlxdv)
+{
+	struct mlx4_sq *sq = &txq->msq;
+	struct mlx4_cq *cq = &txq->mcq;
+	struct mlx4dv_qp *dqp = mlxdv->qp.out;
+	struct mlx4dv_cq *dcq = mlxdv->cq.out;
+	uint32_t sq_size = (uint32_t)dqp->rq.offset - (uint32_t)dqp->sq.offset;
+
+	sq->buf = (uint8_t *)dqp->buf.buf + dqp->sq.offset;
+	/* Total length, including headroom and spare WQEs. */
+	sq->eob = sq->buf + sq_size;
+	sq->head = 0;
+	sq->tail = 0;
+	sq->txbb_cnt =
+		(dqp->sq.wqe_cnt << dqp->sq.wqe_shift) >> MLX4_TXBB_SHIFT;
+	sq->txbb_cnt_mask = sq->txbb_cnt - 1;
+	sq->db = dqp->sdb;
+	sq->doorbell_qpn = dqp->doorbell_qpn;
+	sq->headroom_txbbs =
+		(2048 + (1 << dqp->sq.wqe_shift)) >> MLX4_TXBB_SHIFT;
+	cq->buf = dcq->buf.buf;
+	cq->cqe_cnt = dcq->cqe_cnt;
+	cq->set_ci_db = dcq->set_ci_db;
+	cq->cqe_64 = (dcq->cqe_size & 64) ? 1 : 0;
+}
+
+/**
  * DPDK callback to configure a Tx queue.
  *
  * @param dev
@@ -169,9 +205,13 @@ mlx4_tx_queue_setup(struct rte_eth_dev *dev, uint16_t idx, uint16_t desc,
 		    unsigned int socket, const struct rte_eth_txconf *conf)
 {
 	struct priv *priv = dev->data->dev_private;
+	struct mlx4dv_obj mlxdv;
+	struct mlx4dv_qp dv_qp;
+	struct mlx4dv_cq dv_cq;
 	struct txq_elt (*elts)[desc];
 	struct ibv_qp_init_attr qp_init_attr;
 	struct txq *txq;
+	uint8_t *bounce_buf;
 	struct mlx4_malloc_vec vec[] = {
 		{
 			.align = RTE_CACHE_LINE_SIZE,
@@ -183,6 +223,11 @@ mlx4_tx_queue_setup(struct rte_eth_dev *dev, uint16_t idx, uint16_t desc,
 			.size = sizeof(*elts),
 			.addr = (void **)&elts,
 		},
+		{
+			.align = RTE_CACHE_LINE_SIZE,
+			.size = MLX4_MAX_WQE_SIZE,
+			.addr = (void **)&bounce_buf,
+		},
 	};
 	int ret;
 
@@ -231,6 +276,7 @@ mlx4_tx_queue_setup(struct rte_eth_dev *dev, uint16_t idx, uint16_t desc,
 			RTE_MIN(MLX4_PMD_TX_PER_COMP_REQ, desc / 4),
 		.elts_comp_cd_init =
 			RTE_MIN(MLX4_PMD_TX_PER_COMP_REQ, desc / 4),
+		.bounce_buf = bounce_buf,
 	};
 	txq->cq = ibv_create_cq(priv->ctx, desc, NULL, NULL, 0);
 	if (!txq->cq) {
@@ -297,6 +343,19 @@ mlx4_tx_queue_setup(struct rte_eth_dev *dev, uint16_t idx, uint16_t desc,
 		      (void *)dev, strerror(rte_errno));
 		goto error;
 	}
+	/* Retrieve device queue information. */
+	mlxdv.cq.in = txq->cq;
+	mlxdv.cq.out = &dv_cq;
+	mlxdv.qp.in = txq->qp;
+	mlxdv.qp.out = &dv_qp;
+	ret = mlx4dv_init_obj(&mlxdv, MLX4DV_OBJ_QP | MLX4DV_OBJ_CQ);
+	if (ret) {
+		rte_errno = EINVAL;
+		ERROR("%p: failed to obtain information needed for"
+		      " accessing the device queues", (void *)dev);
+		goto error;
+	}
+	mlx4_txq_fill_dv_obj_info(txq, &mlxdv);
 	/* Pre-register known mempools. */
 	rte_mempool_walk(mlx4_txq_mp2mr_iter, txq);
 	DEBUG("%p: adding Tx queue %p to list", (void *)dev, (void *)txq);
-- 
2.1.4



More information about the dev mailing list