[PATCH v2 19/19] ring: use rte optional stdatomic API
Konstantin Ananyev
konstantin.ananyev at huawei.com
Wed Oct 25 12:06:23 CEST 2023
>
> On Tue, Oct 24, 2023 at 09:43:13AM +0100, Konstantin Ananyev wrote:
> > 17.10.2023 21:31, Tyler Retzlaff пишет:
> > >Replace the use of gcc builtin __atomic_xxx intrinsics with
> > >corresponding rte_atomic_xxx optional stdatomic API
> > >
> > >Signed-off-by: Tyler Retzlaff <roretzla at linux.microsoft.com>
> > >---
> > > drivers/net/mlx5/mlx5_hws_cnt.h | 2 +-
> > > lib/ring/rte_ring_c11_pvt.h | 33 +++++++++++++++++----------------
> > > lib/ring/rte_ring_core.h | 10 +++++-----
> > > lib/ring/rte_ring_generic_pvt.h | 3 ++-
> > > lib/ring/rte_ring_hts_elem_pvt.h | 22 ++++++++++++----------
> > > lib/ring/rte_ring_peek_elem_pvt.h | 6 +++---
> > > lib/ring/rte_ring_rts_elem_pvt.h | 27 ++++++++++++++-------------
> > > 7 files changed, 54 insertions(+), 49 deletions(-)
> > >
> > >diff --git a/drivers/net/mlx5/mlx5_hws_cnt.h b/drivers/net/mlx5/mlx5_hws_cnt.h
> > >index f462665..cc9ac10 100644
> > >--- a/drivers/net/mlx5/mlx5_hws_cnt.h
> > >+++ b/drivers/net/mlx5/mlx5_hws_cnt.h
> > >@@ -394,7 +394,7 @@ struct mlx5_hws_age_param {
> > > __rte_ring_get_elem_addr(r, revert2head, sizeof(cnt_id_t), n,
> > > &zcd->ptr1, &zcd->n1, &zcd->ptr2);
> > > /* Update tail */
> > >- __atomic_store_n(&r->prod.tail, revert2head, __ATOMIC_RELEASE);
> > >+ rte_atomic_store_explicit(&r->prod.tail, revert2head, rte_memory_order_release);
> > > return n;
> > > }
> > >diff --git a/lib/ring/rte_ring_c11_pvt.h b/lib/ring/rte_ring_c11_pvt.h
> > >index f895950..f8be538 100644
> > >--- a/lib/ring/rte_ring_c11_pvt.h
> > >+++ b/lib/ring/rte_ring_c11_pvt.h
> > >@@ -22,9 +22,10 @@
> > > * we need to wait for them to complete
> > > */
> > > if (!single)
> > >- rte_wait_until_equal_32(&ht->tail, old_val, __ATOMIC_RELAXED);
> > >+ rte_wait_until_equal_32((volatile uint32_t *)(uintptr_t)&ht->tail, old_val,
> > >+ rte_memory_order_relaxed);
> > >- __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
> > >+ rte_atomic_store_explicit(&ht->tail, new_val, rte_memory_order_release);
> > > }
> > > /**
> > >@@ -61,19 +62,19 @@
> > > unsigned int max = n;
> > > int success;
> > >- *old_head = __atomic_load_n(&r->prod.head, __ATOMIC_RELAXED);
> > >+ *old_head = rte_atomic_load_explicit(&r->prod.head, rte_memory_order_relaxed);
> > > do {
> > > /* Reset n to the initial burst count */
> > > n = max;
> > > /* Ensure the head is read before tail */
> > >- __atomic_thread_fence(__ATOMIC_ACQUIRE);
> > >+ __atomic_thread_fence(rte_memory_order_acquire);
> > > /* load-acquire synchronize with store-release of ht->tail
> > > * in update_tail.
> > > */
> > >- cons_tail = __atomic_load_n(&r->cons.tail,
> > >- __ATOMIC_ACQUIRE);
> > >+ cons_tail = rte_atomic_load_explicit(&r->cons.tail,
> > >+ rte_memory_order_acquire);
> > > /* The subtraction is done between two unsigned 32bits value
> > > * (the result is always modulo 32 bits even if we have
> > >@@ -95,10 +96,10 @@
> > > r->prod.head = *new_head, success = 1;
> > > else
> > > /* on failure, *old_head is updated */
> > >- success = __atomic_compare_exchange_n(&r->prod.head,
> > >+ success = rte_atomic_compare_exchange_strong_explicit(&r->prod.head,
> > > old_head, *new_head,
> > >- 0, __ATOMIC_RELAXED,
> > >- __ATOMIC_RELAXED);
> > >+ rte_memory_order_relaxed,
> > >+ rte_memory_order_relaxed);
> > > } while (unlikely(success == 0));
> > > return n;
> > > }
> > >@@ -137,19 +138,19 @@
> > > int success;
> > > /* move cons.head atomically */
> > >- *old_head = __atomic_load_n(&r->cons.head, __ATOMIC_RELAXED);
> > >+ *old_head = rte_atomic_load_explicit(&r->cons.head, rte_memory_order_relaxed);
> > > do {
> > > /* Restore n as it may change every loop */
> > > n = max;
> > > /* Ensure the head is read before tail */
> > >- __atomic_thread_fence(__ATOMIC_ACQUIRE);
> > >+ __atomic_thread_fence(rte_memory_order_acquire);
> > > /* this load-acquire synchronize with store-release of ht->tail
> > > * in update_tail.
> > > */
> > >- prod_tail = __atomic_load_n(&r->prod.tail,
> > >- __ATOMIC_ACQUIRE);
> > >+ prod_tail = rte_atomic_load_explicit(&r->prod.tail,
> > >+ rte_memory_order_acquire);
> > > /* The subtraction is done between two unsigned 32bits value
> > > * (the result is always modulo 32 bits even if we have
> > >@@ -170,10 +171,10 @@
> > > r->cons.head = *new_head, success = 1;
> > > else
> > > /* on failure, *old_head will be updated */
> > >- success = __atomic_compare_exchange_n(&r->cons.head,
> > >+ success = rte_atomic_compare_exchange_strong_explicit(&r->cons.head,
> > > old_head, *new_head,
> > >- 0, __ATOMIC_RELAXED,
> > >- __ATOMIC_RELAXED);
> > >+ rte_memory_order_relaxed,
> > >+ rte_memory_order_relaxed);
> > > } while (unlikely(success == 0));
> > > return n;
> > > }
> > >diff --git a/lib/ring/rte_ring_core.h b/lib/ring/rte_ring_core.h
> > >index 327fdcf..7a2b577 100644
> > >--- a/lib/ring/rte_ring_core.h
> > >+++ b/lib/ring/rte_ring_core.h
> > >@@ -67,7 +67,7 @@ enum rte_ring_sync_type {
> > > */
> > > struct rte_ring_headtail {
> > > volatile uint32_t head; /**< prod/consumer head. */
> > >- volatile uint32_t tail; /**< prod/consumer tail. */
> > >+ volatile RTE_ATOMIC(uint32_t) tail; /**< prod/consumer tail. */
> >
> > Probably a stupid question:
> > why do we need RTE_ATOMIC() around tail only?
> > Why is head not affected?
>
> you have a good eye and this is a slightly common issue that i've seen
> and there appear to be some interesting things showing up.
>
> the field being qualified has atomic operations performed on it in the
> implementation; the other field does not. it may be an indication of a bug
> in the existing code or it may be intentional.
Hmm... but as I can see, we are doing similar operations on both head and tail.
For head it would be: atomic_load(), then either atomic_store() or atomic_cas().
For tail it would be: atomic_load(), then atomic_store().
Or is that because we missed atomic_store(&r->prod.head, ..., RELAXED) here:
static __rte_always_inline unsigned int
__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
unsigned int n, enum rte_ring_queue_behavior behavior,
uint32_t *old_head, uint32_t *new_head,
uint32_t *free_entries)
{
....
if (is_sp)
r->prod.head = *new_head, success = 1;
?
>
> case 1. atomics should be used but they aren't.
>
> there are fields in structures and variables that were accessed in a
> 'mixed' manner. that is in some instances __atomic_op_xxx was being used
> on them and in other instances not. sometimes it is the initialization
> case so it is probably okay, sometimes maybe not...
>
> case 2. broader scope atomic operation, or we don't care if narrower
> access is atomic.
>
> e.g.
> union {
> struct {
> uint32_t head;
> RTE_ATOMIC(uint32_t) tail;
> }
> RTE_ATOMIC(uint64_t) combined;
> }
>
> again, this could be an indication of a missing use of atomics: often the
> operations on the `combined' field consistently use atomics but the
> operations on one of the head/tail fields do not. on purpose? maybe if we
> are just doing an == comparison?
>
> my approach in this series prioritized no functional change. as a result,
> if any of the above are real bugs, they stay real bugs, because i have not
> changed the way the variables are accessed. if i were to change the code
> and start atomic-specifying these fields, there would be a risk of
> performance regression (for cases where it isn't a bug), because specifying
> would result in the compiler generating code for the strongest ordering,
> seq_cst, for accesses that do not use the atomic generic functions that
> specify ordering.
>
> there is another case which comes up half a dozen times or so that is
> also concerning to me, but i would need the maintainers of the code to
> adapt the code to be correct or maybe it is okay...
>
>
> case 3. qualification discard .. is the existing code really okay?
>
> e.g.
>
> atomic_compare_exchange(*object, *expected, desired, ...)
>
> the issue is with the specification of the memory aliased by expected.
> gcc doesn't complain or enforce discarding of qualification when using
> builtin intrinsics. the result is that if expected is an atomic type it
> may be accessed in a non-atomic manner by the code generated for the
> atomic operation.
>
> again, i have chosen to maintain existing behavior by casting away the
> qualification if present on the expected argument.
>
> i feel that in terms of mutating the source tree it is best to separate
> conversion to atomic specified/qualified types into this separate series
> and then follow up with additional changes that may have
> functional/performance impact, if for no other reason than that it
> narrows where you have to look if there is a change. certainly conversion
> to atomics has made these cases far easier to spot in the code.
>
> finally, for most of the toolchains/targets all of this is pretty
> moot because most of them default to enable_stdatomic=false, so
> most likely if there are problems they will manifest only on windows
> built with msvc.
>
> thoughts?
>
> >
> > > union {
> > > /** sync type of prod/cons */
> > > enum rte_ring_sync_type sync_type;
> > >@@ -78,7 +78,7 @@ struct rte_ring_headtail {
> > > union __rte_ring_rts_poscnt {
> > > /** raw 8B value to read/write *cnt* and *pos* as one atomic op */
> > >- uint64_t raw __rte_aligned(8);
> > >+ RTE_ATOMIC(uint64_t) raw __rte_aligned(8);
> > > struct {
> > > uint32_t cnt; /**< head/tail reference counter */
> > > uint32_t pos; /**< head/tail position */
> > >@@ -94,10 +94,10 @@ struct rte_ring_rts_headtail {
> > > union __rte_ring_hts_pos {
> > > /** raw 8B value to read/write *head* and *tail* as one atomic op */
> > >- uint64_t raw __rte_aligned(8);
> > >+ RTE_ATOMIC(uint64_t) raw __rte_aligned(8);
> > > struct {
> > >- uint32_t head; /**< head position */
> > >- uint32_t tail; /**< tail position */
> > >+ RTE_ATOMIC(uint32_t) head; /**< head position */
> > >+ RTE_ATOMIC(uint32_t) tail; /**< tail position */
> > > } pos;
> > > };
> > >diff --git a/lib/ring/rte_ring_generic_pvt.h b/lib/ring/rte_ring_generic_pvt.h
> > >index 5acb6e5..ffb3654 100644
> > >--- a/lib/ring/rte_ring_generic_pvt.h
> > >+++ b/lib/ring/rte_ring_generic_pvt.h
> > >@@ -23,7 +23,8 @@
> > > * we need to wait for them to complete
> > > */
> > > if (!single)
> > >- rte_wait_until_equal_32(&ht->tail, old_val, __ATOMIC_RELAXED);
> > >+ rte_wait_until_equal_32((volatile uint32_t *)(uintptr_t)&ht->tail, old_val,
> >
> > I suppose we need that double type conversion only for atomic
> > types, right?
> >
> > >+ rte_memory_order_relaxed);
> > > ht->tail = new_val;
> > > }
> > >diff --git a/lib/ring/rte_ring_hts_elem_pvt.h b/lib/ring/rte_ring_hts_elem_pvt.h
> > >index a8678d3..91f5eec 100644
> > >--- a/lib/ring/rte_ring_hts_elem_pvt.h
> > >+++ b/lib/ring/rte_ring_hts_elem_pvt.h
> > >@@ -10,6 +10,8 @@
> > > #ifndef _RTE_RING_HTS_ELEM_PVT_H_
> > > #define _RTE_RING_HTS_ELEM_PVT_H_
> > >+#include <rte_stdatomic.h>
> > >+
> > > /**
> > > * @file rte_ring_hts_elem_pvt.h
> > > * It is not recommended to include this file directly,
> > >@@ -30,7 +32,7 @@
> > > RTE_SET_USED(enqueue);
> > > tail = old_tail + num;
> > >- __atomic_store_n(&ht->ht.pos.tail, tail, __ATOMIC_RELEASE);
> > >+ rte_atomic_store_explicit(&ht->ht.pos.tail, tail, rte_memory_order_release);
> > > }
> > > /**
> > >@@ -44,7 +46,7 @@
> > > {
> > > while (p->pos.head != p->pos.tail) {
> > > rte_pause();
> > >- p->raw = __atomic_load_n(&ht->ht.raw, __ATOMIC_ACQUIRE);
> > >+ p->raw = rte_atomic_load_explicit(&ht->ht.raw, rte_memory_order_acquire);
> > > }
> > > }
> > >@@ -61,7 +63,7 @@
> > > const uint32_t capacity = r->capacity;
> > >- op.raw = __atomic_load_n(&r->hts_prod.ht.raw, __ATOMIC_ACQUIRE);
> > >+ op.raw = rte_atomic_load_explicit(&r->hts_prod.ht.raw, rte_memory_order_acquire);
> > > do {
> > > /* Reset n to the initial burst count */
> > >@@ -98,9 +100,9 @@
> > > * - OOO reads of cons tail value
> > > * - OOO copy of elems from the ring
> > > */
> > >- } while (__atomic_compare_exchange_n(&r->hts_prod.ht.raw,
> > >- &op.raw, np.raw,
> > >- 0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0);
> > >+ } while (rte_atomic_compare_exchange_strong_explicit(&r->hts_prod.ht.raw,
> > >+ (uint64_t *)(uintptr_t)&op.raw, np.raw,
> > >+ rte_memory_order_acquire, rte_memory_order_acquire) == 0);
> > > *old_head = op.pos.head;
> > > return n;
> > >@@ -117,7 +119,7 @@
> > > uint32_t n;
> > > union __rte_ring_hts_pos np, op;
> > >- op.raw = __atomic_load_n(&r->hts_cons.ht.raw, __ATOMIC_ACQUIRE);
> > >+ op.raw = rte_atomic_load_explicit(&r->hts_cons.ht.raw, rte_memory_order_acquire);
> > > /* move cons.head atomically */
> > > do {
> > >@@ -153,9 +155,9 @@
> > > * - OOO reads of prod tail value
> > > * - OOO copy of elems from the ring
> > > */
> > >- } while (__atomic_compare_exchange_n(&r->hts_cons.ht.raw,
> > >- &op.raw, np.raw,
> > >- 0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0);
> > >+ } while (rte_atomic_compare_exchange_strong_explicit(&r->hts_cons.ht.raw,
> > >+ (uint64_t *)(uintptr_t)&op.raw, np.raw,
> > >+ rte_memory_order_acquire, rte_memory_order_acquire) == 0);
> > > *old_head = op.pos.head;
> > > return n;
> > >diff --git a/lib/ring/rte_ring_peek_elem_pvt.h b/lib/ring/rte_ring_peek_elem_pvt.h
> > >index bb0a7d5..b5f0822 100644
> > >--- a/lib/ring/rte_ring_peek_elem_pvt.h
> > >+++ b/lib/ring/rte_ring_peek_elem_pvt.h
> > >@@ -59,7 +59,7 @@
> > > pos = tail + num;
> > > ht->head = pos;
> > >- __atomic_store_n(&ht->tail, pos, __ATOMIC_RELEASE);
> > >+ rte_atomic_store_explicit(&ht->tail, pos, rte_memory_order_release);
> > > }
> > > /**
> > >@@ -78,7 +78,7 @@
> > > uint32_t n;
> > > union __rte_ring_hts_pos p;
> > >- p.raw = __atomic_load_n(&ht->ht.raw, __ATOMIC_RELAXED);
> > >+ p.raw = rte_atomic_load_explicit(&ht->ht.raw, rte_memory_order_relaxed);
> > > n = p.pos.head - p.pos.tail;
> > > RTE_ASSERT(n >= num);
> > >@@ -104,7 +104,7 @@
> > > p.pos.head = tail + num;
> > > p.pos.tail = p.pos.head;
> > >- __atomic_store_n(&ht->ht.raw, p.raw, __ATOMIC_RELEASE);
> > >+ rte_atomic_store_explicit(&ht->ht.raw, p.raw, rte_memory_order_release);
> > > }
> > > /**
> > >diff --git a/lib/ring/rte_ring_rts_elem_pvt.h b/lib/ring/rte_ring_rts_elem_pvt.h
> > >index 7164213..1226503 100644
> > >--- a/lib/ring/rte_ring_rts_elem_pvt.h
> > >+++ b/lib/ring/rte_ring_rts_elem_pvt.h
> > >@@ -31,18 +31,19 @@
> > > * might preceded us, then don't update tail with new value.
> > > */
> > >- ot.raw = __atomic_load_n(&ht->tail.raw, __ATOMIC_ACQUIRE);
> > >+ ot.raw = rte_atomic_load_explicit(&ht->tail.raw, rte_memory_order_acquire);
> > > do {
> > > /* on 32-bit systems we have to do atomic read here */
> > >- h.raw = __atomic_load_n(&ht->head.raw, __ATOMIC_RELAXED);
> > >+ h.raw = rte_atomic_load_explicit(&ht->head.raw, rte_memory_order_relaxed);
> > > nt.raw = ot.raw;
> > > if (++nt.val.cnt == h.val.cnt)
> > > nt.val.pos = h.val.pos;
> > >- } while (__atomic_compare_exchange_n(&ht->tail.raw, &ot.raw, nt.raw,
> > >- 0, __ATOMIC_RELEASE, __ATOMIC_ACQUIRE) == 0);
> > >+ } while (rte_atomic_compare_exchange_strong_explicit(&ht->tail.raw,
> > >+ (uint64_t *)(uintptr_t)&ot.raw, nt.raw,
> > >+ rte_memory_order_release, rte_memory_order_acquire) == 0);
> > > }
> > > /**
> > >@@ -59,7 +60,7 @@
> > > while (h->val.pos - ht->tail.val.pos > max) {
> > > rte_pause();
> > >- h->raw = __atomic_load_n(&ht->head.raw, __ATOMIC_ACQUIRE);
> > >+ h->raw = rte_atomic_load_explicit(&ht->head.raw, rte_memory_order_acquire);
> > > }
> > > }
> > >@@ -76,7 +77,7 @@
> > > const uint32_t capacity = r->capacity;
> > >- oh.raw = __atomic_load_n(&r->rts_prod.head.raw, __ATOMIC_ACQUIRE);
> > >+ oh.raw = rte_atomic_load_explicit(&r->rts_prod.head.raw, rte_memory_order_acquire);
> > > do {
> > > /* Reset n to the initial burst count */
> > >@@ -113,9 +114,9 @@
> > > * - OOO reads of cons tail value
> > > * - OOO copy of elems to the ring
> > > */
> > >- } while (__atomic_compare_exchange_n(&r->rts_prod.head.raw,
> > >- &oh.raw, nh.raw,
> > >- 0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0);
> > >+ } while (rte_atomic_compare_exchange_strong_explicit(&r->rts_prod.head.raw,
> > >+ (uint64_t *)(uintptr_t)&oh.raw, nh.raw,
> > >+ rte_memory_order_acquire, rte_memory_order_acquire) == 0);
> > > *old_head = oh.val.pos;
> > > return n;
> > >@@ -132,7 +133,7 @@
> > > uint32_t n;
> > > union __rte_ring_rts_poscnt nh, oh;
> > >- oh.raw = __atomic_load_n(&r->rts_cons.head.raw, __ATOMIC_ACQUIRE);
> > >+ oh.raw = rte_atomic_load_explicit(&r->rts_cons.head.raw, rte_memory_order_acquire);
> > > /* move cons.head atomically */
> > > do {
> > >@@ -168,9 +169,9 @@
> > > * - OOO reads of prod tail value
> > > * - OOO copy of elems from the ring
> > > */
> > >- } while (__atomic_compare_exchange_n(&r->rts_cons.head.raw,
> > >- &oh.raw, nh.raw,
> > >- 0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0);
> > >+ } while (rte_atomic_compare_exchange_strong_explicit(&r->rts_cons.head.raw,
> > >+ (uint64_t *)(uintptr_t)&oh.raw, nh.raw,
> > >+ rte_memory_order_acquire, rte_memory_order_acquire) == 0);
> > > *old_head = oh.val.pos;
> > > return n;
More information about the dev
mailing list