[dpdk-dev] [PATCH 2/2] lib/ethdev: fix memory ordering for call back functions
Ananyev, Konstantin
konstantin.ananyev at intel.com
Tue Oct 13 16:25:10 CEST 2020
>
> Call back functions are registered on the control plane. They
> are accessed from the data plane. Hence, correct memory orderings
> should be used to avoid race conditions.
>
> Fixes: 4dc294158cac ("ethdev: support optional Rx and Tx callbacks")
> Fixes: c8231c63ddcb ("ethdev: insert Rx callback as head of list")
> Cc: bruce.richardson at intel.com
> Cc: john.mcnamara at intel.com
> Cc: reshma.pattan at intel.com
> Cc: stable at dpdk.org
>
> Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli at arm.com>
> Reviewed-by: Ola Liljedahl <ola.liljedahl at arm.com>
> ---
> lib/librte_ethdev/rte_ethdev.c | 28 +++++++++++++++++++++------
> lib/librte_ethdev/rte_ethdev.h | 35 ++++++++++++++++++++++++++--------
> 2 files changed, 49 insertions(+), 14 deletions(-)
>
> diff --git a/lib/librte_ethdev/rte_ethdev.c b/lib/librte_ethdev/rte_ethdev.c
> index 59a41c07f..d89fcdc77 100644
> --- a/lib/librte_ethdev/rte_ethdev.c
> +++ b/lib/librte_ethdev/rte_ethdev.c
> @@ -4486,12 +4486,20 @@ rte_eth_add_rx_callback(uint16_t port_id, uint16_t queue_id,
> rte_eth_devices[port_id].post_rx_burst_cbs[queue_id];
>
> if (!tail) {
> - rte_eth_devices[port_id].post_rx_burst_cbs[queue_id] = cb;
> + /* Stores to cb->fn and cb->param should complete before
> + * cb is visible to data plane.
> + */
> + __atomic_store_n(
> + &rte_eth_devices[port_id].post_rx_burst_cbs[queue_id],
> + cb, __ATOMIC_RELEASE);
>
> } else {
> while (tail->next)
> tail = tail->next;
> - tail->next = cb;
> + /* Stores to cb->fn and cb->param should complete before
> + * cb is visible to data plane.
> + */
> + __atomic_store_n(&tail->next, cb, __ATOMIC_RELEASE);
> }
> rte_spinlock_unlock(&rte_eth_rx_cb_lock);
>
> @@ -4576,12 +4584,20 @@ rte_eth_add_tx_callback(uint16_t port_id, uint16_t queue_id,
> rte_eth_devices[port_id].pre_tx_burst_cbs[queue_id];
>
> if (!tail) {
> - rte_eth_devices[port_id].pre_tx_burst_cbs[queue_id] = cb;
> + /* Stores to cb->fn and cb->param should complete before
> + * cb is visible to data plane.
> + */
> + __atomic_store_n(
> + &rte_eth_devices[port_id].pre_tx_burst_cbs[queue_id],
> + cb, __ATOMIC_RELEASE);
>
> } else {
> while (tail->next)
> tail = tail->next;
> - tail->next = cb;
> + /* Stores to cb->fn and cb->param should complete before
> + * cb is visible to data plane.
> + */
> + __atomic_store_n(&tail->next, cb, __ATOMIC_RELEASE);
> }
> rte_spinlock_unlock(&rte_eth_tx_cb_lock);
>
> @@ -4612,7 +4628,7 @@ rte_eth_remove_rx_callback(uint16_t port_id, uint16_t queue_id,
> cb = *prev_cb;
> if (cb == user_cb) {
> /* Remove the user cb from the callback list. */
> - *prev_cb = cb->next;
> + __atomic_store_n(prev_cb, cb->next, __ATOMIC_RELAXED);
> ret = 0;
> break;
> }
> @@ -4646,7 +4662,7 @@ rte_eth_remove_tx_callback(uint16_t port_id, uint16_t queue_id,
> cb = *prev_cb;
> if (cb == user_cb) {
> /* Remove the user cb from the callback list. */
> - *prev_cb = cb->next;
> + __atomic_store_n(prev_cb, cb->next, __ATOMIC_RELAXED);
> ret = 0;
> break;
> }
> diff --git a/lib/librte_ethdev/rte_ethdev.h b/lib/librte_ethdev/rte_ethdev.h
> index 70295d7ab..d810e3e38 100644
> --- a/lib/librte_ethdev/rte_ethdev.h
> +++ b/lib/librte_ethdev/rte_ethdev.h
> @@ -3734,7 +3734,8 @@ struct rte_eth_rxtx_callback;
> * The callback function
> * @param user_param
> * A generic pointer parameter which will be passed to each invocation of the
> - * callback function on this port and queue.
> + * callback function on this port and queue. Inter-thread synchronization
> + * of any user data changes is the responsibility of the user.
> *
> * @return
> * NULL on error.
> @@ -3763,7 +3764,8 @@ rte_eth_add_rx_callback(uint16_t port_id, uint16_t queue_id,
> * The callback function
> * @param user_param
> * A generic pointer parameter which will be passed to each invocation of the
> - * callback function on this port and queue.
> + * callback function on this port and queue. Inter-thread synchronization
> + * of any user data changes is the responsibility of the user.
> *
> * @return
> * NULL on error.
> @@ -3791,7 +3793,8 @@ rte_eth_add_first_rx_callback(uint16_t port_id, uint16_t queue_id,
> * The callback function
> * @param user_param
> * A generic pointer parameter which will be passed to each invocation of the
> - * callback function on this port and queue.
> + * callback function on this port and queue. Inter-thread synchronization
> + * of any user data changes is the responsibility of the user.
> *
> * @return
> * NULL on error.
> @@ -3816,7 +3819,9 @@ rte_eth_add_tx_callback(uint16_t port_id, uint16_t queue_id,
> * on that queue.
> *
> * - After a short delay - where the delay is sufficient to allow any
> - * in-flight callbacks to complete.
> + * in-flight callbacks to complete. Alternately, the RCU mechanism can be
> + * used to detect when data plane threads have ceased referencing the
> + * callback memory.
> *
> * @param port_id
> * The port identifier of the Ethernet device.
> @@ -3849,7 +3854,9 @@ int rte_eth_remove_rx_callback(uint16_t port_id, uint16_t queue_id,
> * on that queue.
> *
> * - After a short delay - where the delay is sufficient to allow any
> - * in-flight callbacks to complete.
> + * in-flight callbacks to complete. Alternately, the RCU mechanism can be
> + * used to detect when data plane threads have ceased referencing the
> + * callback memory.
> *
> * @param port_id
> * The port identifier of the Ethernet device.
> @@ -4510,10 +4517,16 @@ rte_eth_rx_burst(uint16_t port_id, uint16_t queue_id,
> rx_pkts, nb_pkts);
>
> #ifdef RTE_ETHDEV_RXTX_CALLBACKS
> - if (unlikely(dev->post_rx_burst_cbs[queue_id] != NULL)) {
> - struct rte_eth_rxtx_callback *cb =
> - dev->post_rx_burst_cbs[queue_id];
> + /* __ATOMIC_RELEASE memory order was used when the
> + * call back was inserted into the list.
> + * Since there is a clear dependency between loading
> + * cb and cb->fn/cb->next, __ATOMIC_ACQUIRE memory order is
> + * not required.
> + */
> + struct rte_eth_rxtx_callback *cb =
> + dev->post_rx_burst_cbs[queue_id];
>
> + if (unlikely(cb != NULL)) {
> do {
> nb_rx = cb->fn.rx(port_id, queue_id, rx_pkts, nb_rx,
> nb_pkts, cb->param);
> @@ -4775,6 +4788,12 @@ rte_eth_tx_burst(uint16_t port_id, uint16_t queue_id,
> #endif
>
> #ifdef RTE_ETHDEV_RXTX_CALLBACKS
> + /* __ATOMIC_RELEASE memory order was used when the
> + * call back was inserted into the list.
> + * Since there is a clear dependency between loading
> + * cb and cb->fn/cb->next, __ATOMIC_ACQUIRE memory order is
> + * not required.
> + */
> struct rte_eth_rxtx_callback *cb = dev->pre_tx_burst_cbs[queue_id];
>
> if (unlikely(cb != NULL)) {
> --
Acked-by: Konstantin Ananyev <konstantin.ananyev at intel.com>
> 2.17.1
More information about the dev
mailing list