[dpdk-dev] [RFC PATCH 2/3] ethdev: make it safe to remove rx/tx callback at runtime

Konstantin Ananyev konstantin.ananyev at intel.com
Fri Dec 1 15:48:01 CET 2017


Right now it is not possible for the application to know when it is safe
to free a removed callback handle and the resources associated with it
(unless the queue is stopped).
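This patch changes that behavior:
- if rte_eth_remove_(rx|tx)_callback() completes successfully,
then it automatically frees the callback handle, and the user can safely
release any resources associated with the removed callback.
To achieve that, a usage counter is added to each queue's callback list:
rte_eth_(rx|tx)_burst() increments it before and after walking the list,
and the remove functions wait for an in-progress walk to finish before
freeing the callback.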

Signed-off-by: Konstantin Ananyev <konstantin.ananyev at intel.com>
---
 lib/librte_ether/rte_ethdev.c | 180 +++++++++++++++++++++++-------------------
 lib/librte_ether/rte_ethdev.h | 127 ++++++++++++++++++++++-------
 2 files changed, 194 insertions(+), 113 deletions(-)

diff --git a/lib/librte_ether/rte_ethdev.c b/lib/librte_ether/rte_ethdev.c
index ff8571d60..30b23b9b2 100644
--- a/lib/librte_ether/rte_ethdev.c
+++ b/lib/librte_ether/rte_ethdev.c
@@ -482,7 +482,7 @@ free_rxtx_cbs(struct rte_eth_rxtx_callback *cbs)
 static void
 reset_queue_local(struct rte_eth_queue_local *ql)
 {
-	free_rxtx_cbs(ql->cbs);
+	free_rxtx_cbs(ql->cbs.head);
 	memset(ql, 0, sizeof(*ql));
 }
 
@@ -3172,6 +3172,90 @@ rte_eth_dev_filter_ctrl(uint16_t port_id, enum rte_filter_type filter_type,
 	return (*dev->dev_ops->filter_ctrl)(dev, filter_type, filter_op, arg);
 }
 
+/*
+ * RX/TX queue callback list helpers.
+ * Intentionally not wrapped in #ifdef RTE_ETHDEV_RXTX_CALLBACKS:
+ * the public add/remove functions call them unconditionally (their
+ * early -ENOTSUP return does not remove the call), so hiding them
+ * would break the build when RX/TX callbacks are disabled.
+ */
+
+/*
+ * Helper routine - contains common code to add RX/TX queue callbacks
+ * to the list: at its head if first != 0, at its tail otherwise.
+ */
+static struct rte_eth_rxtx_callback *
+add_rxtx_callback(struct rte_eth_rxtx_cbs *cbs, int32_t first,
+	struct rte_eth_rxtx_callback *cb, rte_spinlock_t *lock)
+{
+	struct rte_eth_rxtx_callback **pt;
+
+	rte_spinlock_lock(lock);
+
+	pt = &cbs->head;
+	if (first != 0) {
+		/* Add callback to the head of the list. */
+		cb->next = cbs->head;
+	} else {
+		/* Add callback to the tail of the list. */
+		while (*pt != NULL)
+			pt = &(*pt)->next;
+	}
+
+	/*
+	 * RX/TX burst walks the list without taking the lock: make sure
+	 * all fields of cb are visible before cb itself becomes reachable.
+	 */
+	rte_smp_wmb();
+	*pt = cb;
+
+	rte_spinlock_unlock(lock);
+	return cb;
+}
+
+/*
+ * Helper routine - contains common code to delete RX/TX queue callbacks
+ * from the list.
+ * On success the callback is unlinked, then this function waits until
+ * RX/TX burst is no longer walking the list, and frees the callback.
+ * Because of that busy wait it must not be called from within an RX/TX
+ * callback of the same queue: the wait would never finish.
+ */
+static int
+del_rxtx_callback(struct rte_eth_rxtx_cbs *cbs,
+	struct rte_eth_rxtx_callback *user_cb, rte_spinlock_t *lock)
+{
+	int32_t ret;
+	struct rte_eth_rxtx_callback *cb, **prev_cb;
+
+	ret = -EINVAL;
+	cb = NULL;
+	rte_spinlock_lock(lock);
+
+	for (prev_cb = &cbs->head; *prev_cb != NULL; prev_cb = &cb->next) {
+		cb = *prev_cb;
+		if (cb == user_cb) {
+			/* Remove the user cb from the callback list. */
+			*prev_cb = cb->next;
+			ret = 0;
+			break;
+		}
+	}
+
+	rte_spinlock_unlock(lock);
+
+	/*
+	 * First make sure the datapath doesn't use the removed callback
+	 * anymore, then free the callback structure.
+	 */
+	if (ret == 0) {
+		__rte_eth_rxtx_cbs_wait(cbs);
+		rte_free(cb);
+	}
+
+	return ret;
+}
+
 void *
 rte_eth_add_rx_callback(uint16_t port_id, uint16_t queue_id,
 		rte_rx_callback_fn fn, void *user_param)
@@ -3180,14 +3253,16 @@ rte_eth_add_rx_callback(uint16_t port_id, uint16_t queue_id,
 	rte_errno = ENOTSUP;
 	return NULL;
 #endif
+	struct rte_eth_rxtx_callback *cb;
+
 	/* check input parameters */
 	if (!rte_eth_dev_is_valid_port(port_id) || fn == NULL ||
 		    queue_id >= rte_eth_devices[port_id].data->nb_rx_queues) {
 		rte_errno = EINVAL;
 		return NULL;
 	}
-	struct rte_eth_rxtx_callback *cb = rte_zmalloc(NULL, sizeof(*cb), 0);
 
+	cb = rte_zmalloc(NULL, sizeof(*cb), 0);
 	if (cb == NULL) {
 		rte_errno = ENOMEM;
 		return NULL;
@@ -3196,22 +3271,8 @@ rte_eth_add_rx_callback(uint16_t port_id, uint16_t queue_id,
 	cb->fn.rx = fn;
 	cb->param = user_param;
 
-	rte_spinlock_lock(&rte_eth_rx_cb_lock);
-	/* Add the callbacks in fifo order. */
-	struct rte_eth_rxtx_callback *tail =
-		rte_eth_devices[port_id].rx_ql[queue_id].cbs;
-
-	if (!tail) {
-		rte_eth_devices[port_id].rx_ql[queue_id].cbs = cb;
-
-	} else {
-		while (tail->next)
-			tail = tail->next;
-		tail->next = cb;
-	}
-	rte_spinlock_unlock(&rte_eth_rx_cb_lock);
-
-	return cb;
+	return add_rxtx_callback(&rte_eth_devices[port_id].rx_ql[queue_id].cbs,
+		0, cb, &rte_eth_rx_cb_lock);
 }
 
 void *
@@ -3222,6 +3283,8 @@ rte_eth_add_first_rx_callback(uint16_t port_id, uint16_t queue_id,
 	rte_errno = ENOTSUP;
 	return NULL;
 #endif
+	struct rte_eth_rxtx_callback *cb;
+
 	/* check input parameters */
 	if (!rte_eth_dev_is_valid_port(port_id) || fn == NULL ||
 		queue_id >= rte_eth_devices[port_id].data->nb_rx_queues) {
@@ -3229,7 +3292,7 @@ rte_eth_add_first_rx_callback(uint16_t port_id, uint16_t queue_id,
 		return NULL;
 	}
 
-	struct rte_eth_rxtx_callback *cb = rte_zmalloc(NULL, sizeof(*cb), 0);
+	cb = rte_zmalloc(NULL, sizeof(*cb), 0);
 
 	if (cb == NULL) {
 		rte_errno = ENOMEM;
@@ -3239,14 +3302,8 @@ rte_eth_add_first_rx_callback(uint16_t port_id, uint16_t queue_id,
 	cb->fn.rx = fn;
 	cb->param = user_param;
 
-	rte_spinlock_lock(&rte_eth_rx_cb_lock);
-	/* Add the callbacks at fisrt position*/
-	cb->next = rte_eth_devices[port_id].rx_ql[queue_id].cbs;
-	rte_smp_wmb();
-	rte_eth_devices[port_id].rx_ql[queue_id].cbs = cb;
-	rte_spinlock_unlock(&rte_eth_rx_cb_lock);
-
-	return cb;
+	return add_rxtx_callback(&rte_eth_devices[port_id].rx_ql[queue_id].cbs,
+		1, cb, &rte_eth_rx_cb_lock);
 }
 
 void *
@@ -3257,6 +3314,8 @@ rte_eth_add_tx_callback(uint16_t port_id, uint16_t queue_id,
 	rte_errno = ENOTSUP;
 	return NULL;
 #endif
+	struct rte_eth_rxtx_callback *cb;
+
 	/* check input parameters */
 	if (!rte_eth_dev_is_valid_port(port_id) || fn == NULL ||
 		    queue_id >= rte_eth_devices[port_id].data->nb_tx_queues) {
@@ -3264,8 +3323,7 @@ rte_eth_add_tx_callback(uint16_t port_id, uint16_t queue_id,
 		return NULL;
 	}
 
-	struct rte_eth_rxtx_callback *cb = rte_zmalloc(NULL, sizeof(*cb), 0);
-
+	cb = rte_zmalloc(NULL, sizeof(*cb), 0);
 	if (cb == NULL) {
 		rte_errno = ENOMEM;
 		return NULL;
@@ -3274,22 +3332,8 @@ rte_eth_add_tx_callback(uint16_t port_id, uint16_t queue_id,
 	cb->fn.tx = fn;
 	cb->param = user_param;
 
-	rte_spinlock_lock(&rte_eth_tx_cb_lock);
-	/* Add the callbacks in fifo order. */
-	struct rte_eth_rxtx_callback *tail =
-		rte_eth_devices[port_id].tx_ql[queue_id].cbs;
-
-	if (!tail) {
-		rte_eth_devices[port_id].tx_ql[queue_id].cbs = cb;
-
-	} else {
-		while (tail->next)
-			tail = tail->next;
-		tail->next = cb;
-	}
-	rte_spinlock_unlock(&rte_eth_tx_cb_lock);
-
-	return cb;
+	return add_rxtx_callback(&rte_eth_devices[port_id].tx_ql[queue_id].cbs,
+		0, cb, &rte_eth_tx_cb_lock);
 }
 
 int
@@ -3299,31 +3343,19 @@ rte_eth_remove_rx_callback(uint16_t port_id, uint16_t queue_id,
 #ifndef RTE_ETHDEV_RXTX_CALLBACKS
 	return -ENOTSUP;
 #endif
+	struct rte_eth_dev *dev;
+
 	/* Check input parameters. */
 	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -EINVAL);
 	if (user_cb == NULL ||
 			queue_id >= rte_eth_devices[port_id].data->nb_rx_queues)
 		return -EINVAL;
 
-	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
-	struct rte_eth_rxtx_callback *cb;
-	struct rte_eth_rxtx_callback **prev_cb;
-	int ret = -EINVAL;
-
-	rte_spinlock_lock(&rte_eth_rx_cb_lock);
-	prev_cb = &dev->rx_ql[queue_id].cbs;
-	for (; *prev_cb != NULL; prev_cb = &cb->next) {
-		cb = *prev_cb;
-		if (cb == user_cb) {
-			/* Remove the user cb from the callback list. */
-			*prev_cb = cb->next;
-			ret = 0;
-			break;
-		}
-	}
-	rte_spinlock_unlock(&rte_eth_rx_cb_lock);
-
-	return ret;
+	/* Form the pointer only after port_id has been validated above. */
+	dev = &rte_eth_devices[port_id];
+	/* Unlinks user_cb, waits for RX burst to stop using it, frees it. */
+	return del_rxtx_callback(&dev->rx_ql[queue_id].cbs, user_cb,
+		&rte_eth_rx_cb_lock);
 }
 
 int
@@ -3333,31 +3362,19 @@ rte_eth_remove_tx_callback(uint16_t port_id, uint16_t queue_id,
 #ifndef RTE_ETHDEV_RXTX_CALLBACKS
 	return -ENOTSUP;
 #endif
+	struct rte_eth_dev *dev;
+
 	/* Check input parameters. */
 	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -EINVAL);
 	if (user_cb == NULL ||
 			queue_id >= rte_eth_devices[port_id].data->nb_tx_queues)
 		return -EINVAL;
 
-	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
-	int ret = -EINVAL;
-	struct rte_eth_rxtx_callback *cb;
-	struct rte_eth_rxtx_callback **prev_cb;
-
-	rte_spinlock_lock(&rte_eth_tx_cb_lock);
-	prev_cb = &dev->tx_ql[queue_id].cbs;
-	for (; *prev_cb != NULL; prev_cb = &cb->next) {
-		cb = *prev_cb;
-		if (cb == user_cb) {
-			/* Remove the user cb from the callback list. */
-			*prev_cb = cb->next;
-			ret = 0;
-			break;
-		}
-	}
-	rte_spinlock_unlock(&rte_eth_tx_cb_lock);
-
-	return ret;
+	/* Form the pointer only after port_id has been validated above. */
+	dev = &rte_eth_devices[port_id];
+	/* Unlinks user_cb, waits for TX burst to stop using it, frees it. */
+	return del_rxtx_callback(&dev->tx_ql[queue_id].cbs, user_cb,
+		&rte_eth_tx_cb_lock);
 }
 
 int
diff --git a/lib/librte_ether/rte_ethdev.h b/lib/librte_ether/rte_ethdev.h
index d62e1bcc3..2957f620e 100644
--- a/lib/librte_ether/rte_ethdev.h
+++ b/lib/librte_ether/rte_ethdev.h
@@ -1709,6 +1709,23 @@ struct rte_eth_rxtx_callback {
 };
 
 /**
+ * @internal
+ * Structure used to hold list of RX/TX callbacks, plus usage counter.
+ * The usage counter is incremented both when (rx|tx)_burst starts and
+ * when it stops using the callback list, so it is odd while in use.
+ */
+struct rte_eth_rxtx_cbs {
+	struct rte_eth_rxtx_callback *head; /**< head of callbacks list */
+	uint32_t use; /**< usage counter, odd while in use by datapath */
+};
+
+/*
+ * Bit of the usage counter that is set (odd value) while the callback
+ * list is used by datapath (RX/TX), and clear (even value) otherwise.
+ */
+#define	RTE_ETH_RXTX_CBS_INUSE	1
+
+/**
  * A set of values to describe the possible states of an eth device.
  */
 enum rte_eth_dev_state {
@@ -1731,7 +1748,7 @@ struct rte_eth_queue_local {
 	eth_tx_burst_t tx_pkt_burst; /**< transmit function pointer. */
 	eth_tx_prep_t tx_pkt_prepare; /**< transmit prepare function pointer. */
 
-	struct rte_eth_rxtx_callback *cbs;
+	struct rte_eth_rxtx_cbs cbs;
 	/**< list of user supplied callbacks */
 } __rte_cache_aligned;
 
@@ -2814,6 +2831,75 @@ int rte_eth_dev_get_vlan_offload(uint16_t port_id);
 int rte_eth_dev_set_vlan_pvid(uint16_t port_id, uint16_t pvid, int on);
 
 /**
+ * @internal
+ * Marks given callback list as used by datapath (RX/TX).
+ * Together with __rte_eth_rxtx_cbs_unuse() it keeps the usage counter
+ * odd for as long as the datapath is walking the list.
+ * @param cbs
+ *  Pointer to the callback list structure.
+ */
+static __rte_always_inline void
+__rte_eth_rxtx_cbs_inuse(struct rte_eth_rxtx_cbs *cbs)
+{
+	cbs->use++;
+	/*
+	 * Full barrier: the counter update must be visible before the list
+	 * head is re-read, so either a concurrent remove sees the list as
+	 * in use, or this thread sees the list with the callback unlinked.
+	 */
+	rte_smp_mb();
+}
+
+/**
+ * @internal
+ * Marks given callback list as not used by datapath (RX/TX).
+ * @param cbs
+ *  Pointer to the callback list structure.
+ */
+static __rte_always_inline void
+__rte_eth_rxtx_cbs_unuse(struct rte_eth_rxtx_cbs *cbs)
+{
+	/*
+	 * Make sure all previous loads AND stores (including any stores the
+	 * callbacks did into user resources) are completed before the
+	 * counter update: once a waiter observes it, the callback and its
+	 * resources may be released. rte_smp_rmb() only orders loads.
+	 */
+	rte_smp_mb();
+	cbs->use++;
+}
+
+/**
+ * @internal
+ * Waits until the datapath (RX/TX) has finished the iteration (if any)
+ * that is currently using the given callback list.
+ * This is a busy wait: it must not be invoked from within a callback
+ * running on the same list, as the usage counter could never change.
+ * @param cbs
+ *  Pointer to the callback list structure.
+ */
+static inline void
+__rte_eth_rxtx_cbs_wait(const struct rte_eth_rxtx_cbs *cbs)
+{
+	uint32_t nuse, puse;
+
+	/* make sure all previous loads and stores are completed */
+	rte_smp_mb();
+
+	puse = cbs->use;
+
+	/* in use, busy wait till current RX/TX iteration is finished */
+	if ((puse & RTE_ETH_RXTX_CBS_INUSE) != 0) {
+		do {
+			rte_pause();
+			/* force cbs->use to be re-read on each iteration */
+			rte_compiler_barrier();
+			nuse = cbs->use;
+		} while (nuse == puse);
+	}
+}
+
+/**
  *
  * Retrieve a burst of input packets from a receive queue of an Ethernet
  * device. The retrieved packets are stored in *rte_mbuf* structures whose
@@ -2911,6 +2982,7 @@ rte_eth_rx_burst(uint16_t port_id, uint16_t queue_id,
 		return 0;
 	}
 #endif
+
 	nb_rx = (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id],
 			rx_pkts, nb_pkts);
 
@@ -2920,15 +2992,15 @@ rte_eth_rx_burst(uint16_t port_id, uint16_t queue_id,
 		struct rte_eth_rxtx_callback *cb;
 
 		ql = dev->rx_ql + queue_id;
-		cb = ql->cbs;
-
-		if (unlikely(cb != NULL)) {
-			do {
-				nb_rx = cb->fn.rx(port_id, queue_id,
-						rx_pkts, nb_rx,
-						nb_pkts, cb->param);
-				cb = cb->next;
-			} while (cb != NULL);
+		if (unlikely(ql->cbs.head != NULL)) {
+			/* make a concurrent callback removal wait for us */
+			__rte_eth_rxtx_cbs_inuse(&ql->cbs);
+			/* re-read head after marking the list used */
+			for (cb = ql->cbs.head; cb != NULL; cb = cb->next)
+				nb_rx = cb->fn.rx(port_id, queue_id, rx_pkts,
+					nb_rx, nb_pkts, cb->param);
+
+			__rte_eth_rxtx_cbs_unuse(&ql->cbs);
 		}
 	}
 #endif
@@ -3187,20 +3259,21 @@ rte_eth_tx_burst(uint16_t port_id, uint16_t queue_id,
 		struct rte_eth_rxtx_callback *cb;
 
 		ql = dev->tx_ql + queue_id;
-		cb = ql->cbs;
-
-		if (unlikely(cb != NULL)) {
-			do {
-				nb_pkts = cb->fn.tx(port_id, queue_id,
-						tx_pkts, nb_pkts,
-						cb->param);
-				cb = cb->next;
-			} while (cb != NULL);
+		if (unlikely(ql->cbs.head != NULL)) {
+			/* make a concurrent callback removal wait for us */
+			__rte_eth_rxtx_cbs_inuse(&ql->cbs);
+			/* re-read head after marking the list used */
+			for (cb = ql->cbs.head; cb != NULL; cb = cb->next)
+				nb_pkts = cb->fn.tx(port_id, queue_id, tx_pkts,
+					nb_pkts, cb->param);
+
+			__rte_eth_rxtx_cbs_unuse(&ql->cbs);
 		}
 	}
 #endif
 
-	return (*dev->tx_pkt_burst)(dev->data->tx_queues[queue_id], tx_pkts, nb_pkts);
+	return (*dev->tx_pkt_burst)(dev->data->tx_queues[queue_id], tx_pkts,
+		nb_pkts);
 }
 
 /**
@@ -4199,16 +4272,14 @@ void *rte_eth_add_tx_callback(uint16_t port_id, uint16_t queue_id,
  * This function is used to removed callbacks that were added to a NIC port
  * queue using rte_eth_add_rx_callback().
  *
- * Note: the callback is removed from the callback list but it isn't freed
- * since the it may still be in use. The memory for the callback can be
- * subsequently freed back by the application by calling rte_free():
- *
- * - Immediately - if the port is stopped, or the user knows that no
- *   callbacks are in flight e.g. if called from the thread doing RX/TX
- *   on that queue.
- *
- * - After a short delay - where the delay is sufficient to allow any
- *   in-flight callbacks to complete.
+ * Note: once the callback is removed from the callback list, the callback
+ * handle is freed, so the user must not refer to it anymore.
+ * After successful completion of this function the user can safely release
+ * any resources associated with that callback.
+ * To guarantee that, this function busy-waits until RX burst on that queue
+ * is no longer walking the callback list. Therefore it must not be called
+ * from within an RX callback installed on the same queue, as it would
+ * never return.
  *
  * @param port_id
  *   The port identifier of the Ethernet device.
-- 
2.13.5



More information about the dev mailing list