[dpdk-dev] [PATCH v3 13/30] net/mlx5: add reference counter on DPDK Rx queues

Nelio Laranjeiro nelio.laranjeiro at 6wind.com
Mon Oct 9 16:44:49 CEST 2017


Use the same design for DPDK queue as for Verbs queue for symmetry, this
also helps in fixing some issues like the DPDK release queue API which is
not expected to fail.  With such design, the queue is released when the
reference counters reaches 0.

Signed-off-by: Nelio Laranjeiro <nelio.laranjeiro at 6wind.com>
Acked-by: Yongseok Koh <yskoh at mellanox.com>
---
 drivers/net/mlx5/mlx5.c         |  16 +-
 drivers/net/mlx5/mlx5.h         |   1 +
 drivers/net/mlx5/mlx5_rxq.c     | 488 +++++++++++++++++++++-------------------
 drivers/net/mlx5/mlx5_rxtx.h    |  10 +
 drivers/net/mlx5/mlx5_trigger.c |  47 +++-
 5 files changed, 321 insertions(+), 241 deletions(-)

diff --git a/drivers/net/mlx5/mlx5.c b/drivers/net/mlx5/mlx5.c
index 276401d..c2c3d1b 100644
--- a/drivers/net/mlx5/mlx5.c
+++ b/drivers/net/mlx5/mlx5.c
@@ -208,17 +208,8 @@ mlx5_dev_close(struct rte_eth_dev *dev)
 	if (priv->rxqs != NULL) {
 		/* XXX race condition if mlx5_rx_burst() is still running. */
 		usleep(1000);
-		for (i = 0; (i != priv->rxqs_n); ++i) {
-			struct mlx5_rxq_data *rxq = (*priv->rxqs)[i];
-			struct mlx5_rxq_ctrl *rxq_ctrl;
-
-			if (rxq == NULL)
-				continue;
-			rxq_ctrl = container_of(rxq, struct mlx5_rxq_ctrl, rxq);
-			(*priv->rxqs)[i] = NULL;
-			mlx5_rxq_cleanup(rxq_ctrl);
-			rte_free(rxq_ctrl);
-		}
+		for (i = 0; (i != priv->rxqs_n); ++i)
+			mlx5_priv_rxq_release(priv, i);
 		priv->rxqs_n = 0;
 		priv->rxqs = NULL;
 	}
@@ -247,6 +238,9 @@ mlx5_dev_close(struct rte_eth_dev *dev)
 	ret = mlx5_priv_rxq_ibv_verify(priv);
 	if (ret)
 		WARN("%p: some Verbs Rx queue still remain", (void *)priv);
+	ret = mlx5_priv_rxq_verify(priv);
+	if (ret)
+		WARN("%p: some Rx Queues still remain", (void *)priv);
 	ret = mlx5_priv_txq_ibv_verify(priv);
 	if (ret)
 		WARN("%p: some Verbs Tx queue still remain", (void *)priv);
diff --git a/drivers/net/mlx5/mlx5.h b/drivers/net/mlx5/mlx5.h
index b20c39c..d0ef21a 100644
--- a/drivers/net/mlx5/mlx5.h
+++ b/drivers/net/mlx5/mlx5.h
@@ -147,6 +147,7 @@ struct priv {
 	struct rte_flow_drop *flow_drop_queue; /* Flow drop queue. */
 	TAILQ_HEAD(mlx5_flows, rte_flow) flows; /* RTE Flow rules. */
 	LIST_HEAD(mr, mlx5_mr) mr; /* Memory region. */
+	LIST_HEAD(rxq, mlx5_rxq_ctrl) rxqsctrl; /* DPDK Rx queues. */
 	LIST_HEAD(rxqibv, mlx5_rxq_ibv) rxqsibv; /* Verbs Rx queues. */
 	LIST_HEAD(txq, mlx5_txq_ctrl) txqsctrl; /* DPDK Tx queues. */
 	LIST_HEAD(txqibv, mlx5_txq_ibv) txqsibv; /* Verbs Tx queues. */
diff --git a/drivers/net/mlx5/mlx5_rxq.c b/drivers/net/mlx5/mlx5_rxq.c
index 89c2cdb..87efeed 100644
--- a/drivers/net/mlx5/mlx5_rxq.c
+++ b/drivers/net/mlx5/mlx5_rxq.c
@@ -37,6 +37,7 @@
 #include <string.h>
 #include <stdint.h>
 #include <fcntl.h>
+#include <sys/queue.h>
 
 /* Verbs header. */
 /* ISO C doesn't support unnamed structs/unions, disabling -pedantic. */
@@ -629,16 +630,15 @@ priv_rehash_flows(struct priv *priv)
  *
  * @param rxq_ctrl
  *   Pointer to RX queue structure.
- * @param elts_n
- *   Number of elements to allocate.
  *
  * @return
  *   0 on success, errno value on failure.
  */
-static int
-rxq_alloc_elts(struct mlx5_rxq_ctrl *rxq_ctrl, unsigned int elts_n)
+int
+rxq_alloc_elts(struct mlx5_rxq_ctrl *rxq_ctrl)
 {
 	const unsigned int sges_n = 1 << rxq_ctrl->rxq.sges_n;
+	unsigned int elts_n = 1 << rxq_ctrl->rxq.elts_n;
 	unsigned int i;
 	int ret = 0;
 
@@ -667,9 +667,11 @@ rxq_alloc_elts(struct mlx5_rxq_ctrl *rxq_ctrl, unsigned int elts_n)
 		NB_SEGS(buf) = 1;
 		(*rxq_ctrl->rxq.elts)[i] = buf;
 	}
+	/* If Rx vector is activated. */
 	if (rxq_check_vec_support(&rxq_ctrl->rxq) > 0) {
 		struct mlx5_rxq_data *rxq = &rxq_ctrl->rxq;
 		struct rte_mbuf *mbuf_init = &rxq->fake_mbuf;
+		int j;
 
 		/* Initialize default rearm_data for vPMD. */
 		mbuf_init->data_off = RTE_PKTMBUF_HEADROOM;
@@ -681,10 +683,11 @@ rxq_alloc_elts(struct mlx5_rxq_ctrl *rxq_ctrl, unsigned int elts_n)
 		 * rearm_data covers previous fields.
 		 */
 		rte_compiler_barrier();
-		rxq->mbuf_initializer = *(uint64_t *)&mbuf_init->rearm_data;
+		rxq->mbuf_initializer =
+			*(uint64_t *)&mbuf_init->rearm_data;
 		/* Padding with a fake mbuf for vectorized Rx. */
-		for (i = 0; i < MLX5_VPMD_DESCS_PER_LOOP; ++i)
-			(*rxq->elts)[elts_n + i] = &rxq->fake_mbuf;
+		for (j = 0; j < MLX5_VPMD_DESCS_PER_LOOP; ++j)
+			(*rxq->elts)[elts_n + j] = &rxq->fake_mbuf;
 	}
 	DEBUG("%p: allocated and configured %u segments (max %u packets)",
 	      (void *)rxq_ctrl, elts_n, elts_n / (1 << rxq_ctrl->rxq.sges_n));
@@ -754,170 +757,6 @@ mlx5_rxq_cleanup(struct mlx5_rxq_ctrl *rxq_ctrl)
 }
 
 /**
- * Configure a RX queue.
- *
- * @param dev
- *   Pointer to Ethernet device structure.
- * @param rxq_ctrl
- *   Pointer to RX queue structure.
- * @param desc
- *   Number of descriptors to configure in queue.
- * @param socket
- *   NUMA socket on which memory must be allocated.
- * @param[in] conf
- *   Thresholds parameters.
- * @param mp
- *   Memory pool for buffer allocations.
- *
- * @return
- *   0 on success, errno value on failure.
- */
-static int
-rxq_ctrl_setup(struct rte_eth_dev *dev, struct mlx5_rxq_ctrl *rxq_ctrl,
-	       uint16_t desc, unsigned int socket,
-	       const struct rte_eth_rxconf *conf, struct rte_mempool *mp)
-{
-	struct priv *priv = dev->data->dev_private;
-	const uint16_t desc_n =
-		desc + priv->rx_vec_en * MLX5_VPMD_DESCS_PER_LOOP;
-	struct mlx5_rxq_ctrl tmpl = {
-		.priv = priv,
-		.socket = socket,
-		.rxq = {
-			.elts = rte_calloc_socket("RXQ", 1,
-						  desc_n *
-						  sizeof(struct rte_mbuf *), 0,
-						  socket),
-			.elts_n = log2above(desc),
-			.mp = mp,
-			.rss_hash = priv->rxqs_n > 1,
-		},
-	};
-	unsigned int mb_len = rte_pktmbuf_data_room_size(mp);
-	struct rte_mbuf *(*elts)[desc_n] = NULL;
-	int ret = 0;
-
-	(void)conf; /* Thresholds configuration (ignored). */
-	if (dev->data->dev_conf.intr_conf.rxq)
-		tmpl.irq = 1;
-	/* Enable scattered packets support for this queue if necessary. */
-	assert(mb_len >= RTE_PKTMBUF_HEADROOM);
-	if (dev->data->dev_conf.rxmode.max_rx_pkt_len <=
-	    (mb_len - RTE_PKTMBUF_HEADROOM)) {
-		tmpl.rxq.sges_n = 0;
-	} else if (dev->data->dev_conf.rxmode.enable_scatter) {
-		unsigned int size =
-			RTE_PKTMBUF_HEADROOM +
-			dev->data->dev_conf.rxmode.max_rx_pkt_len;
-		unsigned int sges_n;
-
-		/*
-		 * Determine the number of SGEs needed for a full packet
-		 * and round it to the next power of two.
-		 */
-		sges_n = log2above((size / mb_len) + !!(size % mb_len));
-		tmpl.rxq.sges_n = sges_n;
-		/* Make sure rxq.sges_n did not overflow. */
-		size = mb_len * (1 << tmpl.rxq.sges_n);
-		size -= RTE_PKTMBUF_HEADROOM;
-		if (size < dev->data->dev_conf.rxmode.max_rx_pkt_len) {
-			ERROR("%p: too many SGEs (%u) needed to handle"
-			      " requested maximum packet size %u",
-			      (void *)dev,
-			      1 << sges_n,
-			      dev->data->dev_conf.rxmode.max_rx_pkt_len);
-			return EOVERFLOW;
-		}
-	} else {
-		WARN("%p: the requested maximum Rx packet size (%u) is"
-		     " larger than a single mbuf (%u) and scattered"
-		     " mode has not been requested",
-		     (void *)dev,
-		     dev->data->dev_conf.rxmode.max_rx_pkt_len,
-		     mb_len - RTE_PKTMBUF_HEADROOM);
-	}
-	DEBUG("%p: maximum number of segments per packet: %u",
-	      (void *)dev, 1 << tmpl.rxq.sges_n);
-	if (desc % (1 << tmpl.rxq.sges_n)) {
-		ERROR("%p: number of RX queue descriptors (%u) is not a"
-		      " multiple of SGEs per packet (%u)",
-		      (void *)dev,
-		      desc,
-		      1 << tmpl.rxq.sges_n);
-		return EINVAL;
-	}
-	/* Toggle RX checksum offload if hardware supports it. */
-	if (priv->hw_csum)
-		tmpl.rxq.csum = !!dev->data->dev_conf.rxmode.hw_ip_checksum;
-	if (priv->hw_csum_l2tun)
-		tmpl.rxq.csum_l2tun =
-			!!dev->data->dev_conf.rxmode.hw_ip_checksum;
-	/* Configure VLAN stripping. */
-	tmpl.rxq.vlan_strip = (priv->hw_vlan_strip &&
-			       !!dev->data->dev_conf.rxmode.hw_vlan_strip);
-	/* By default, FCS (CRC) is stripped by hardware. */
-	if (dev->data->dev_conf.rxmode.hw_strip_crc) {
-		tmpl.rxq.crc_present = 0;
-	} else if (priv->hw_fcs_strip) {
-		tmpl.rxq.crc_present = 1;
-	} else {
-		WARN("%p: CRC stripping has been disabled but will still"
-		     " be performed by hardware, make sure MLNX_OFED and"
-		     " firmware are up to date",
-		     (void *)dev);
-		tmpl.rxq.crc_present = 0;
-	}
-	DEBUG("%p: CRC stripping is %s, %u bytes will be subtracted from"
-	      " incoming frames to hide it",
-	      (void *)dev,
-	      tmpl.rxq.crc_present ? "disabled" : "enabled",
-	      tmpl.rxq.crc_present << 2);
-#ifdef HAVE_IBV_WQ_FLAG_RX_END_PADDING
-	if (!mlx5_getenv_int("MLX5_PMD_ENABLE_PADDING")) {
-		; /* Nothing else to do. */
-	} else if (priv->hw_padding) {
-		INFO("%p: enabling packet padding on queue %p",
-		     (void *)dev, (void *)rxq_ctrl);
-	} else {
-		WARN("%p: packet padding has been requested but is not"
-		     " supported, make sure MLNX_OFED and firmware are"
-		     " up to date",
-		     (void *)dev);
-	}
-#endif
-	/* Save port ID. */
-	tmpl.rxq.port_id = dev->data->port_id;
-	DEBUG("%p: RTE port ID: %u", (void *)rxq_ctrl, tmpl.rxq.port_id);
-	ret = rxq_alloc_elts(&tmpl, desc);
-	if (ret) {
-		ERROR("%p: RXQ allocation failed: %s",
-		      (void *)dev, strerror(ret));
-		goto error;
-	}
-	/* Clean up rxq in case we're reinitializing it. */
-	DEBUG("%p: cleaning-up old rxq just in case", (void *)rxq_ctrl);
-	mlx5_rxq_cleanup(rxq_ctrl);
-	/* Move mbuf pointers to dedicated storage area in RX queue. */
-	elts = (void *)(rxq_ctrl + 1);
-	rte_memcpy(elts, tmpl.rxq.elts, sizeof(*elts));
-#ifndef NDEBUG
-	memset(tmpl.rxq.elts, 0x55, sizeof(*elts));
-#endif
-	rte_free(tmpl.rxq.elts);
-	tmpl.rxq.elts = elts;
-	*rxq_ctrl = tmpl;
-	DEBUG("%p: rxq updated with %p", (void *)rxq_ctrl, (void *)&tmpl);
-	assert(ret == 0);
-	return 0;
-error:
-	rte_free(tmpl.rxq.elts);
-	mlx5_rxq_cleanup(&tmpl);
-	assert(ret > 0);
-	return ret;
-}
-
-/**
- * DPDK callback to configure a RX queue.
  *
  * @param dev
  *   Pointer to Ethernet device structure.
@@ -944,13 +783,11 @@ mlx5_rx_queue_setup(struct rte_eth_dev *dev, uint16_t idx, uint16_t desc,
 	struct mlx5_rxq_data *rxq = (*priv->rxqs)[idx];
 	struct mlx5_rxq_ctrl *rxq_ctrl =
 		container_of(rxq, struct mlx5_rxq_ctrl, rxq);
-	const uint16_t desc_n =
-		desc + priv->rx_vec_en * MLX5_VPMD_DESCS_PER_LOOP;
-	int ret;
+	int ret = 0;
 
+	(void)conf;
 	if (mlx5_is_secondary())
 		return -E_RTE_SECONDARY;
-
 	priv_lock(priv);
 	if (!rte_is_power_of_2(desc)) {
 		desc = 1 << log2above(desc);
@@ -966,54 +803,23 @@ mlx5_rx_queue_setup(struct rte_eth_dev *dev, uint16_t idx, uint16_t desc,
 		priv_unlock(priv);
 		return -EOVERFLOW;
 	}
-	if (rxq != NULL) {
-		DEBUG("%p: reusing already allocated queue index %u (%p)",
-		      (void *)dev, idx, (void *)rxq);
-		if (dev->data->dev_started) {
-			priv_unlock(priv);
-			return -EEXIST;
-		}
-		(*priv->rxqs)[idx] = NULL;
-		mlx5_rxq_cleanup(rxq_ctrl);
-		/* Resize if rxq size is changed. */
-		if (rxq_ctrl->rxq.elts_n != log2above(desc)) {
-			rxq_ctrl = rte_realloc(rxq_ctrl,
-					       sizeof(*rxq_ctrl) + desc_n *
-					       sizeof(struct rte_mbuf *),
-					       RTE_CACHE_LINE_SIZE);
-			if (!rxq_ctrl) {
-				ERROR("%p: unable to reallocate queue index %u",
-					(void *)dev, idx);
-				priv_unlock(priv);
-				return -ENOMEM;
-			}
-		}
-	} else {
-		rxq_ctrl = rte_calloc_socket("RXQ", 1, sizeof(*rxq_ctrl) +
-					     desc_n *
-					     sizeof(struct rte_mbuf *),
-					     0, socket);
-		if (rxq_ctrl == NULL) {
-			ERROR("%p: unable to allocate queue index %u",
-			      (void *)dev, idx);
-			priv_unlock(priv);
-			return -ENOMEM;
-		}
+	if (!mlx5_priv_rxq_releasable(priv, idx)) {
+		ret = EBUSY;
+		ERROR("%p: unable to release queue index %u",
+		      (void *)dev, idx);
+		goto out;
 	}
-	ret = rxq_ctrl_setup(dev, rxq_ctrl, desc, socket, conf, mp);
-	if (ret) {
-		rte_free(rxq_ctrl);
+	mlx5_priv_rxq_release(priv, idx);
+	rxq_ctrl = mlx5_priv_rxq_new(priv, idx, desc, socket, mp);
+	if (!rxq_ctrl) {
+		ERROR("%p: unable to allocate queue index %u",
+		      (void *)dev, idx);
+		ret = ENOMEM;
 		goto out;
 	}
-	rxq_ctrl->rxq.stats.idx = idx;
 	DEBUG("%p: adding RX queue %p to list",
 	      (void *)dev, (void *)rxq_ctrl);
 	(*priv->rxqs)[idx] = &rxq_ctrl->rxq;
-	rxq_ctrl->ibv = mlx5_priv_rxq_ibv_new(priv, idx);
-	if (!rxq_ctrl->ibv) {
-		ret = EAGAIN;
-		goto out;
-	}
 out:
 	priv_unlock(priv);
 	return -ret;
@@ -1031,7 +837,6 @@ mlx5_rx_queue_release(void *dpdk_rxq)
 	struct mlx5_rxq_data *rxq = (struct mlx5_rxq_data *)dpdk_rxq;
 	struct mlx5_rxq_ctrl *rxq_ctrl;
 	struct priv *priv;
-	unsigned int i;
 
 	if (mlx5_is_secondary())
 		return;
@@ -1041,18 +846,10 @@ mlx5_rx_queue_release(void *dpdk_rxq)
 	rxq_ctrl = container_of(rxq, struct mlx5_rxq_ctrl, rxq);
 	priv = rxq_ctrl->priv;
 	priv_lock(priv);
-	if (!mlx5_priv_rxq_ibv_releasable(priv, rxq_ctrl->ibv))
+	if (!mlx5_priv_rxq_releasable(priv, rxq_ctrl->rxq.stats.idx))
 		rte_panic("Rx queue %p is still used by a flow and cannot be"
 			  " removed\n", (void *)rxq_ctrl);
-	for (i = 0; (i != priv->rxqs_n); ++i)
-		if ((*priv->rxqs)[i] == rxq) {
-			DEBUG("%p: removing RX queue %p from list",
-			      (void *)priv->dev, (void *)rxq_ctrl);
-			(*priv->rxqs)[i] = NULL;
-			break;
-		}
-	mlx5_rxq_cleanup(rxq_ctrl);
-	rte_free(rxq_ctrl);
+	mlx5_priv_rxq_release(priv, rxq_ctrl->rxq.stats.idx);
 	priv_unlock(priv);
 }
 
@@ -1590,3 +1387,238 @@ mlx5_priv_rxq_ibv_releasable(struct priv *priv, struct mlx5_rxq_ibv *rxq_ibv)
 	assert(rxq_ibv);
 	return (rte_atomic32_read(&rxq_ibv->refcnt) == 1);
 }
+
+/**
+ * Create a DPDK Rx queue.
+ *
+ * @param priv
+ *   Pointer to private structure.
+ * @param idx
+ *   TX queue index.
+ * @param desc
+ *   Number of descriptors to configure in queue.
+ * @param socket
+ *   NUMA socket on which memory must be allocated.
+ *
+ * @return
+ *   A DPDK queue object on success.
+ */
+struct mlx5_rxq_ctrl*
+mlx5_priv_rxq_new(struct priv *priv, uint16_t idx, uint16_t desc,
+		  unsigned int socket, struct rte_mempool *mp)
+{
+	struct rte_eth_dev *dev = priv->dev;
+	struct mlx5_rxq_ctrl *tmpl;
+	const uint16_t desc_n =
+		desc + priv->rx_vec_en * MLX5_VPMD_DESCS_PER_LOOP;
+	unsigned int mb_len = rte_pktmbuf_data_room_size(mp);
+
+	tmpl = rte_calloc_socket("RXQ", 1,
+				 sizeof(*tmpl) +
+				 desc_n * sizeof(struct rte_mbuf *),
+				 0, socket);
+	if (!tmpl)
+		return NULL;
+	if (priv->dev->data->dev_conf.intr_conf.rxq)
+		tmpl->irq = 1;
+	/* Enable scattered packets support for this queue if necessary. */
+	assert(mb_len >= RTE_PKTMBUF_HEADROOM);
+	if (dev->data->dev_conf.rxmode.max_rx_pkt_len <=
+	    (mb_len - RTE_PKTMBUF_HEADROOM)) {
+		tmpl->rxq.sges_n = 0;
+	} else if (dev->data->dev_conf.rxmode.enable_scatter) {
+		unsigned int size =
+			RTE_PKTMBUF_HEADROOM +
+			dev->data->dev_conf.rxmode.max_rx_pkt_len;
+		unsigned int sges_n;
+
+		/*
+		 * Determine the number of SGEs needed for a full packet
+		 * and round it to the next power of two.
+		 */
+		sges_n = log2above((size / mb_len) + !!(size % mb_len));
+		tmpl->rxq.sges_n = sges_n;
+		/* Make sure rxq.sges_n did not overflow. */
+		size = mb_len * (1 << tmpl->rxq.sges_n);
+		size -= RTE_PKTMBUF_HEADROOM;
+		if (size < dev->data->dev_conf.rxmode.max_rx_pkt_len) {
+			ERROR("%p: too many SGEs (%u) needed to handle"
+			      " requested maximum packet size %u",
+			      (void *)dev,
+			      1 << sges_n,
+			      dev->data->dev_conf.rxmode.max_rx_pkt_len);
+			goto error;
+		}
+	} else {
+		WARN("%p: the requested maximum Rx packet size (%u) is"
+		     " larger than a single mbuf (%u) and scattered"
+		     " mode has not been requested",
+		     (void *)dev,
+		     dev->data->dev_conf.rxmode.max_rx_pkt_len,
+		     mb_len - RTE_PKTMBUF_HEADROOM);
+	}
+	DEBUG("%p: maximum number of segments per packet: %u",
+	      (void *)dev, 1 << tmpl->rxq.sges_n);
+	if (desc % (1 << tmpl->rxq.sges_n)) {
+		ERROR("%p: number of RX queue descriptors (%u) is not a"
+		      " multiple of SGEs per packet (%u)",
+		      (void *)dev,
+		      desc,
+		      1 << tmpl->rxq.sges_n);
+		goto error;
+	}
+	/* Toggle RX checksum offload if hardware supports it. */
+	if (priv->hw_csum)
+		tmpl->rxq.csum = !!dev->data->dev_conf.rxmode.hw_ip_checksum;
+	if (priv->hw_csum_l2tun)
+		tmpl->rxq.csum_l2tun =
+			!!dev->data->dev_conf.rxmode.hw_ip_checksum;
+	/* Configure VLAN stripping. */
+	tmpl->rxq.vlan_strip = (priv->hw_vlan_strip &&
+			       !!dev->data->dev_conf.rxmode.hw_vlan_strip);
+	/* By default, FCS (CRC) is stripped by hardware. */
+	if (dev->data->dev_conf.rxmode.hw_strip_crc) {
+		tmpl->rxq.crc_present = 0;
+	} else if (priv->hw_fcs_strip) {
+		tmpl->rxq.crc_present = 1;
+	} else {
+		WARN("%p: CRC stripping has been disabled but will still"
+		     " be performed by hardware, make sure MLNX_OFED and"
+		     " firmware are up to date",
+		     (void *)dev);
+		tmpl->rxq.crc_present = 0;
+	}
+	DEBUG("%p: CRC stripping is %s, %u bytes will be subtracted from"
+	      " incoming frames to hide it",
+	      (void *)dev,
+	      tmpl->rxq.crc_present ? "disabled" : "enabled",
+	      tmpl->rxq.crc_present << 2);
+	/* Save port ID. */
+	tmpl->rxq.rss_hash = priv->rxqs_n > 1;
+	tmpl->rxq.port_id = dev->data->port_id;
+	tmpl->priv = priv;
+	tmpl->rxq.mp = mp;
+	tmpl->rxq.stats.idx = idx;
+	tmpl->rxq.elts_n = log2above(desc);
+	tmpl->rxq.elts =
+		(struct rte_mbuf *(*)[1 << tmpl->rxq.elts_n])(tmpl + 1);
+	rte_atomic32_inc(&tmpl->refcnt);
+	DEBUG("%p: Rx queue %p: refcnt %d", (void *)priv,
+	      (void *)tmpl, rte_atomic32_read(&tmpl->refcnt));
+	LIST_INSERT_HEAD(&priv->rxqsctrl, tmpl, next);
+	return tmpl;
+error:
+	rte_free(tmpl);
+	return NULL;
+}
+
+/**
+ * Get a Rx queue.
+ *
+ * @param priv
+ *   Pointer to private structure.
+ * @param idx
+ *   TX queue index.
+ *
+ * @return
+ *   A pointer to the queue if it exists.
+ */
+struct mlx5_rxq_ctrl*
+mlx5_priv_rxq_get(struct priv *priv, uint16_t idx)
+{
+	struct mlx5_rxq_ctrl *rxq_ctrl = NULL;
+
+	if ((*priv->rxqs)[idx]) {
+		rxq_ctrl = container_of((*priv->rxqs)[idx],
+					struct mlx5_rxq_ctrl,
+					rxq);
+
+		mlx5_priv_rxq_ibv_get(priv, idx);
+		rte_atomic32_inc(&rxq_ctrl->refcnt);
+		DEBUG("%p: Rx queue %p: refcnt %d", (void *)priv,
+		      (void *)rxq_ctrl, rte_atomic32_read(&rxq_ctrl->refcnt));
+	}
+	return rxq_ctrl;
+}
+
+/**
+ * Release a Rx queue.
+ *
+ * @param priv
+ *   Pointer to private structure.
+ * @param idx
+ *   TX queue index.
+ *
+ * @return
+ *   0 on success, errno value on failure.
+ */
+int
+mlx5_priv_rxq_release(struct priv *priv, uint16_t idx)
+{
+	struct mlx5_rxq_ctrl *rxq_ctrl;
+
+	if (!(*priv->rxqs)[idx])
+		return 0;
+	rxq_ctrl = container_of((*priv->rxqs)[idx], struct mlx5_rxq_ctrl, rxq);
+	assert(rxq_ctrl->priv);
+	if (rxq_ctrl->ibv) {
+		int ret;
+
+		ret = mlx5_priv_rxq_ibv_release(rxq_ctrl->priv, rxq_ctrl->ibv);
+		if (!ret)
+			rxq_ctrl->ibv = NULL;
+	}
+	DEBUG("%p: Rx queue %p: refcnt %d", (void *)priv,
+	      (void *)rxq_ctrl, rte_atomic32_read(&rxq_ctrl->refcnt));
+	if (rte_atomic32_dec_and_test(&rxq_ctrl->refcnt)) {
+		LIST_REMOVE(rxq_ctrl, next);
+		rte_free(rxq_ctrl);
+		(*priv->rxqs)[idx] = NULL;
+		return 0;
+	}
+	return EBUSY;
+}
+
+/**
+ * Verify if the queue can be released.
+ *
+ * @param priv
+ *   Pointer to private structure.
+ * @param idx
+ *   TX queue index.
+ *
+ * @return
+ *   1 if the queue can be released.
+ */
+int
+mlx5_priv_rxq_releasable(struct priv *priv, uint16_t idx)
+{
+	struct mlx5_rxq_ctrl *rxq_ctrl;
+
+	if (!(*priv->rxqs)[idx])
+		return -1;
+	rxq_ctrl = container_of((*priv->rxqs)[idx], struct mlx5_rxq_ctrl, rxq);
+	return (rte_atomic32_read(&rxq_ctrl->refcnt) == 1);
+}
+
+/**
+ * Verify the Rx Queue list is empty
+ *
+ * @param priv
+ *  Pointer to private structure.
+ *
+ * @return the number of object not released.
+ */
+int
+mlx5_priv_rxq_verify(struct priv *priv)
+{
+	struct mlx5_rxq_ctrl *rxq_ctrl;
+	int ret = 0;
+
+	LIST_FOREACH(rxq_ctrl, &priv->rxqsctrl, next) {
+		DEBUG("%p: Rx Queue %p still referenced", (void *)priv,
+		      (void *)rxq_ctrl);
+		++ret;
+	}
+	return ret;
+}
diff --git a/drivers/net/mlx5/mlx5_rxtx.h b/drivers/net/mlx5/mlx5_rxtx.h
index 69344f6..44cfef5 100644
--- a/drivers/net/mlx5/mlx5_rxtx.h
+++ b/drivers/net/mlx5/mlx5_rxtx.h
@@ -147,6 +147,8 @@ struct mlx5_rxq_ibv {
 
 /* RX queue control descriptor. */
 struct mlx5_rxq_ctrl {
+	LIST_ENTRY(mlx5_rxq_ctrl) next; /* Pointer to the next element. */
+	rte_atomic32_t refcnt; /* Reference counter. */
 	struct priv *priv; /* Back pointer to private data. */
 	struct mlx5_rxq_ibv *ibv; /* Verbs elements. */
 	struct mlx5_rxq_data rxq; /* Data path structure. */
@@ -335,6 +337,14 @@ struct mlx5_rxq_ibv *mlx5_priv_rxq_ibv_get(struct priv *, uint16_t);
 int mlx5_priv_rxq_ibv_release(struct priv *, struct mlx5_rxq_ibv *);
 int mlx5_priv_rxq_ibv_releasable(struct priv *, struct mlx5_rxq_ibv *);
 int mlx5_priv_rxq_ibv_verify(struct priv *);
+struct mlx5_rxq_ctrl *mlx5_priv_rxq_new(struct priv *, uint16_t,
+					uint16_t, unsigned int,
+					struct rte_mempool *);
+struct mlx5_rxq_ctrl *mlx5_priv_rxq_get(struct priv *, uint16_t);
+int mlx5_priv_rxq_release(struct priv *, uint16_t);
+int mlx5_priv_rxq_releasable(struct priv *, uint16_t);
+int mlx5_priv_rxq_verify(struct priv *);
+int rxq_alloc_elts(struct mlx5_rxq_ctrl *);
 
 /* mlx5_txq.c */
 
diff --git a/drivers/net/mlx5/mlx5_trigger.c b/drivers/net/mlx5/mlx5_trigger.c
index 7a12768..a311499 100644
--- a/drivers/net/mlx5/mlx5_trigger.c
+++ b/drivers/net/mlx5/mlx5_trigger.c
@@ -79,6 +79,41 @@ priv_txq_start(struct priv *priv)
 	return -ret;
 }
 
+static void
+priv_rxq_stop(struct priv *priv)
+{
+	unsigned int i;
+
+	for (i = 0; i != priv->rxqs_n; ++i)
+		mlx5_priv_rxq_release(priv, i);
+}
+
+static int
+priv_rxq_start(struct priv *priv)
+{
+	unsigned int i;
+	int ret = 0;
+
+	for (i = 0; i != priv->rxqs_n; ++i) {
+		struct mlx5_rxq_ctrl *rxq_ctrl = mlx5_priv_rxq_get(priv, i);
+
+		if (!rxq_ctrl)
+			continue;
+		ret = rxq_alloc_elts(rxq_ctrl);
+		if (ret)
+			goto error;
+		rxq_ctrl->ibv = mlx5_priv_rxq_ibv_new(priv, i);
+		if (!rxq_ctrl->ibv) {
+			ret = ENOMEM;
+			goto error;
+		}
+	}
+	return -ret;
+error:
+	priv_rxq_stop(priv);
+	return -ret;
+}
+
 /**
  * DPDK callback to start the device.
  *
@@ -101,8 +136,6 @@ mlx5_dev_start(struct rte_eth_dev *dev)
 		return -E_RTE_SECONDARY;
 
 	priv_lock(priv);
-	/* Update Rx/Tx callback. */
-	priv_dev_select_rx_function(priv, dev);
 	DEBUG("%p: allocating and configuring hash RX queues", (void *)dev);
 	rte_mempool_walk(mlx5_mp2mr_iter, priv);
 	err = priv_txq_start(priv);
@@ -113,6 +146,14 @@ mlx5_dev_start(struct rte_eth_dev *dev)
 	}
 	/* Update send callback. */
 	priv_dev_select_tx_function(priv, dev);
+	err = priv_rxq_start(priv);
+	if (err) {
+		ERROR("%p: RXQ allocation failed: %s",
+		      (void *)dev, strerror(err));
+		goto error;
+	}
+	/* Update receive callback. */
+	priv_dev_select_rx_function(priv, dev);
 	err = priv_create_hash_rxqs(priv);
 	if (!err)
 		err = priv_rehash_flows(priv);
@@ -147,6 +188,7 @@ mlx5_dev_start(struct rte_eth_dev *dev)
 	priv_mac_addrs_disable(priv);
 	priv_destroy_hash_rxqs(priv);
 	priv_flow_stop(priv);
+	priv_rxq_stop(priv);
 	priv_txq_stop(priv);
 	priv_unlock(priv);
 	return -err;
@@ -183,6 +225,7 @@ mlx5_dev_stop(struct rte_eth_dev *dev)
 	priv_flow_stop(priv);
 	priv_rx_intr_vec_disable(priv);
 	priv_txq_stop(priv);
+	priv_rxq_stop(priv);
 	LIST_FOREACH(mr, &priv->mr, next) {
 		priv_mr_release(priv, mr);
 	}
-- 
2.1.4



More information about the dev mailing list