[PATCH v4 03/27] ring: unify memory model on C11, remove atomic32
Stephen Hemminger
stephen at networkplumber.org
Wed May 27 01:23:53 CEST 2026
Remove the RTE_USE_C11_MEM_MODEL build switch; C11 atomics are now
the default for all platforms. Unifies __rte_ring_update_tail into
the C11 form (atomic_store_release replaces the older rte_smp_wmb +
plain store on the generic path) and renames rte_ring_generic_pvt.h
to rte_ring_x86_pvt.h to reflect its new scope.
Also splits the head-move helper into separate ST and MT variants,
removing the runtime is_st branch from the MT retry loop.
This gets small boost and scopes the following exception
more tightly.
Exception: on x86 with GCC, atomic_compare_exchange on the head CAS
regresses MP/MC contended throughput by ~20% existing hand-written
cmpxchg. As a workaround, GCC-on-x86 builds use the older
__sync_bool_compare_and_swap builtin, which generates equivalent
code to the original asm. Can be reverted if/when GCC gets
fixed; similar issue was observed in Linux kernel.
Signed-off-by: Stephen Hemminger <stephen at networkplumber.org>
---
lib/ring/meson.build | 2 +-
lib/ring/rte_ring_c11_pvt.h | 75 +++--------
lib/ring/rte_ring_elem_pvt.h | 125 ++++++++++++++++--
..._ring_generic_pvt.h => rte_ring_x86_pvt.h} | 61 ++-------
lib/ring/soring.c | 15 ++-
5 files changed, 158 insertions(+), 120 deletions(-)
rename lib/ring/{rte_ring_generic_pvt.h => rte_ring_x86_pvt.h} (60%)
diff --git a/lib/ring/meson.build b/lib/ring/meson.build
index 21f2c12989..b178c963b8 100644
--- a/lib/ring/meson.build
+++ b/lib/ring/meson.build
@@ -9,7 +9,7 @@ indirect_headers += files (
'rte_ring_elem.h',
'rte_ring_elem_pvt.h',
'rte_ring_c11_pvt.h',
- 'rte_ring_generic_pvt.h',
+ 'rte_ring_x86_pvt.h',
'rte_ring_hts.h',
'rte_ring_hts_elem_pvt.h',
'rte_ring_peek.h',
diff --git a/lib/ring/rte_ring_c11_pvt.h b/lib/ring/rte_ring_c11_pvt.h
index 07b6efc416..3efe011f08 100644
--- a/lib/ring/rte_ring_c11_pvt.h
+++ b/lib/ring/rte_ring_c11_pvt.h
@@ -15,35 +15,10 @@
* @file rte_ring_c11_pvt.h
* It is not recommended to include this file directly,
* include <rte_ring.h> instead.
- * Contains internal helper functions for MP/SP and MC/SC ring modes.
+ * Contains internal helper functions for MP and MC ring modes.
* For more information please refer to <rte_ring.h>.
*/
-/**
- * @internal This function updates tail values.
- */
-static __rte_always_inline void
-__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
- uint32_t new_val, uint32_t single, uint32_t enqueue)
-{
- RTE_SET_USED(enqueue);
-
- /*
- * If there are other enqueues/dequeues in progress that preceded us,
- * we need to wait for them to complete
- */
- if (!single)
- rte_wait_until_equal_32((uint32_t *)(uintptr_t)&ht->tail, old_val,
- rte_memory_order_relaxed);
-
- /*
- * R0: Establishes a synchronizing edge with load-acquire of tail at A1.
- * Ensures that memory effects by this thread on ring elements array
- * is observed by a different thread of the other type.
- */
- rte_atomic_store_explicit(&ht->tail, new_val, rte_memory_order_release);
-}
-
/**
* @internal This is a helper function that moves the producer/consumer head
*
@@ -72,14 +47,11 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
* If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
*/
static __rte_always_inline unsigned int
-__rte_ring_headtail_move_head(struct rte_ring_headtail *d,
+__rte_ring_headtail_move_head_mt(struct rte_ring_headtail *d,
const struct rte_ring_headtail *s, uint32_t capacity,
- unsigned int is_st, unsigned int n,
- enum rte_ring_queue_behavior behavior,
+ unsigned int n, enum rte_ring_queue_behavior behavior,
uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
{
- uint32_t stail;
- int success;
unsigned int max = n;
/*
@@ -89,8 +61,7 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
* d->head.
* If not, an unsafe partial order may ensue.
*/
- *old_head = rte_atomic_load_explicit(&d->head,
- rte_memory_order_acquire);
+ *old_head = rte_atomic_load_explicit(&d->head, rte_memory_order_acquire);
do {
/* Reset n to the initial burst count */
n = max;
@@ -101,15 +72,14 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
* ring elements array is observed by the time
* this thread observes its tail update.
*/
- stail = rte_atomic_load_explicit(&s->tail,
- rte_memory_order_acquire);
+ uint32_t stail = rte_atomic_load_explicit(&s->tail, rte_memory_order_acquire);
/* The subtraction is done between two unsigned 32bits value
* (the result is always modulo 32 bits even if we have
* *old_head > s->tail). So 'entries' is always between 0
* and capacity (which is < size).
*/
- *entries = (capacity + stail - *old_head);
+ *entries = capacity + stail - *old_head;
/* check that we have enough room in ring */
if (unlikely(n > *entries))
@@ -120,25 +90,20 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
return 0;
*new_head = *old_head + n;
- if (is_st) {
- d->head = *new_head;
- success = 1;
- } else
- /* on failure, *old_head is updated */
- /*
- * R1/A2.
- * R1: Establishes a synchronizing edge with A0 of a
- * different thread.
- * A2: Establishes a synchronizing edge with R1 of a
- * different thread to observe same value for stail
- * observed by that thread on CAS failure (to retry
- * with an updated *old_head).
- */
- success = rte_atomic_compare_exchange_strong_explicit(
- &d->head, old_head, *new_head,
- rte_memory_order_release,
- rte_memory_order_acquire);
- } while (unlikely(success == 0));
+
+ /* on failure, *old_head is updated */
+ /*
+ * R1/A2.
+ * R1: Establishes a synchronizing edge with A0 of a
+ * different thread.
+ * A2: Establishes a synchronizing edge with R1 of a
+ * different thread to observe same value for stail
+ * observed by that thread on CAS failure (to retry
+ * with an updated *old_head).
+ */
+ } while (unlikely(!rte_atomic_compare_exchange_strong_explicit(
+ &d->head, old_head, *new_head,
+ rte_memory_order_release, rte_memory_order_acquire)));
return n;
}
diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h
index 6eafae121f..9d1da12a92 100644
--- a/lib/ring/rte_ring_elem_pvt.h
+++ b/lib/ring/rte_ring_elem_pvt.h
@@ -299,17 +299,108 @@ __rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head,
cons_head & r->mask, esize, num);
}
-/* Between load and load. there might be cpu reorder in weak model
- * (powerpc/arm).
- * There are 2 choices for the users
- * 1.use rmb() memory barrier
- * 2.use one-direction load_acquire/store_release barrier
- * It depends on performance test results.
+/**
+ * @internal This function updates tail values.
*/
-#ifdef RTE_USE_C11_MEM_MODEL
-#include "rte_ring_c11_pvt.h"
+static __rte_always_inline void
+__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
+ uint32_t new_val, uint32_t single, uint32_t enqueue)
+{
+ RTE_SET_USED(enqueue);
+
+ /*
+ * If there are other enqueues/dequeues in progress that preceded us,
+ * we need to wait for them to complete
+ */
+ if (!single)
+ rte_wait_until_equal_32((uint32_t *)(uintptr_t)&ht->tail, old_val,
+ rte_memory_order_relaxed);
+
+ /*
+ * R0: Establishes a synchronizing edge with load-acquire of tail at A1.
+ * Ensures that memory effects by this thread on ring elements array
+ * is observed by a different thread of the other type.
+ */
+ rte_atomic_store_explicit(&ht->tail, new_val, rte_memory_order_release);
+}
+
+/**
+ * @internal This is a helper function that moves the producer/consumer head
+ *
+ *
+ * This optimized version for single threaded case.
+ *
+ * @param d
+ * A pointer to the headtail structure with head value to be moved
+ * @param s
+ * A pointer to the counter-part headtail structure. Note that this
+ * function only reads tail value from it
+ * @param capacity
+ * Either ring capacity value (for producer), or zero (for consumer)
+ * @param n
+ * The number of elements we want to move head value on
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Move on a fixed number of items
+ * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible
+ * @param old_head
+ * Returns head value as it was before the move
+ * @param new_head
+ * Returns the new head value
+ * @param entries
+ * Returns the number of ring entries available BEFORE head was moved
+ * @return
+ * Actual number of objects the head was moved on
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
+ */
+static __rte_always_inline unsigned int
+__rte_ring_headtail_move_head_st(struct rte_ring_headtail *d,
+ const struct rte_ring_headtail *s, uint32_t capacity,
+ unsigned int n, enum rte_ring_queue_behavior behavior,
+ uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
+{
+ uint32_t stail;
+
+ /*
+ * A0: Establishes a synchronizing edge with R1.
+ * Ensure that this thread observes same values
+ * to stail observed by the thread that updated
+ * d->head.
+ * If not, an unsafe partial order may ensue.
+ */
+ *old_head = rte_atomic_load_explicit(&d->head, rte_memory_order_acquire);
+
+ /*
+ * A1: Establishes a synchronizing edge with R0.
+ * Ensures that other thread's memory effects on
+ * ring elements array is observed by the time
+ * this thread observes its tail update.
+ */
+ stail = rte_atomic_load_explicit(&s->tail, rte_memory_order_acquire);
+
+ /* The subtraction is done between two unsigned 32bits value
+ * (the result is always modulo 32 bits even if we have
+ * *old_head > s->tail). So 'entries' is always between 0
+ * and capacity (which is < size).
+ */
+ *entries = capacity + stail - *old_head;
+
+ /* check that we have enough room in ring */
+ if (unlikely(n > *entries))
+ n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+ if (n > 0) {
+ *new_head = *old_head + n;
+ d->head = *new_head;
+ }
+
+ return n;
+}
+
+/* There are two choices because GCC optimizer does poorly on atomic_compare_exchange */
+#if defined(RTE_TOOLCHAIN_GCC) && defined(RTE_ARCH_X86)
+#include "rte_ring_x86_pvt.h"
#else
-#include "rte_ring_generic_pvt.h"
+#include "rte_ring_c11_pvt.h"
#endif
/**
@@ -341,8 +432,12 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
uint32_t *old_head, uint32_t *new_head,
uint32_t *free_entries)
{
- return __rte_ring_headtail_move_head(&r->prod, &r->cons, r->capacity,
- is_sp, n, behavior, old_head, new_head, free_entries);
+ if (is_sp)
+ return __rte_ring_headtail_move_head_st(&r->prod, &r->cons, r->capacity,
+ n, behavior, old_head, new_head, free_entries);
+ else
+ return __rte_ring_headtail_move_head_mt(&r->prod, &r->cons, r->capacity,
+ n, behavior, old_head, new_head, free_entries);
}
/**
@@ -374,8 +469,12 @@ __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
uint32_t *old_head, uint32_t *new_head,
uint32_t *entries)
{
- return __rte_ring_headtail_move_head(&r->cons, &r->prod, 0,
- is_sc, n, behavior, old_head, new_head, entries);
+ if (is_sc)
+ return __rte_ring_headtail_move_head_st(&r->cons, &r->prod, 0,
+ n, behavior, old_head, new_head, entries);
+ else
+ return __rte_ring_headtail_move_head_mt(&r->cons, &r->prod, 0,
+ n, behavior, old_head, new_head, entries);
}
/**
diff --git a/lib/ring/rte_ring_generic_pvt.h b/lib/ring/rte_ring_x86_pvt.h
similarity index 60%
rename from lib/ring/rte_ring_generic_pvt.h
rename to lib/ring/rte_ring_x86_pvt.h
index affd2d5ba7..c8de108bbd 100644
--- a/lib/ring/rte_ring_generic_pvt.h
+++ b/lib/ring/rte_ring_x86_pvt.h
@@ -7,39 +7,19 @@
* Used as BSD-3 Licensed with permission from Kip Macy.
*/
-#ifndef _RTE_RING_GENERIC_PVT_H_
-#define _RTE_RING_GENERIC_PVT_H_
+#ifndef _RTE_RING_X86_PVT_H_
+#define _RTE_RING_X86_PVT_H_
/**
- * @file rte_ring_generic_pvt.h
+ * @file rte_ring_x86_pvt.h
* It is not recommended to include this file directly,
* include <rte_ring.h> instead.
- * Contains internal helper functions for MP/SP and MC/SC ring modes.
- * For more information please refer to <rte_ring.h>.
+ *
+ * Contains internal helper functions for MP and MC ring modes.
+ * It is GCC specific to workaround poor optimizer handling of C11 atomic
+ * compare_exchange.
*/
-/**
- * @internal This function updates tail values.
- */
-static __rte_always_inline void
-__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
- uint32_t new_val, uint32_t single, uint32_t enqueue)
-{
- if (enqueue)
- rte_smp_wmb();
- else
- rte_smp_rmb();
- /*
- * If there are other enqueues/dequeues in progress that preceded us,
- * we need to wait for them to complete
- */
- if (!single)
- rte_wait_until_equal_32((volatile uint32_t *)(uintptr_t)&ht->tail, old_val,
- rte_memory_order_relaxed);
-
- ht->tail = new_val;
-}
-
/**
* @internal This is a helper function that moves the producer/consumer head
*
@@ -50,8 +30,6 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
* function only reads tail value from it
* @param capacity
* Either ring capacity value (for producer), or zero (for consumer)
- * @param is_st
- * Indicates whether multi-thread safe path is needed or not
* @param n
* The number of elements we want to move head value on
* @param behavior
@@ -68,14 +46,13 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
* If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
*/
static __rte_always_inline unsigned int
-__rte_ring_headtail_move_head(struct rte_ring_headtail *d,
+__rte_ring_headtail_move_head_mt(struct rte_ring_headtail *d,
const struct rte_ring_headtail *s, uint32_t capacity,
- unsigned int is_st, unsigned int n,
+ unsigned int n,
enum rte_ring_queue_behavior behavior,
uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
{
unsigned int max = n;
- int success;
do {
/* Reset n to the initial burst count */
@@ -83,18 +60,13 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
*old_head = d->head;
- /* add rmb barrier to avoid load/load reorder in weak
- * memory model. It is noop on x86
- */
- rte_smp_rmb();
-
/*
* The subtraction is done between two unsigned 32bits value
* (the result is always modulo 32 bits even if we have
* *old_head > s->tail). So 'entries' is always between 0
* and capacity (which is < size).
*/
- *entries = (capacity + s->tail - *old_head);
+ *entries = capacity + s->tail - *old_head;
/* check that we have enough room in ring */
if (unlikely(n > *entries))
@@ -105,15 +77,10 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
return 0;
*new_head = *old_head + n;
- if (is_st) {
- d->head = *new_head;
- success = 1;
- } else
- success = rte_atomic32_cmpset(
- (uint32_t *)(uintptr_t)&d->head,
- *old_head, *new_head);
- } while (unlikely(success == 0));
+ } while (unlikely(!__sync_bool_compare_and_swap(
+ (uint32_t *)(uintptr_t)&d->head,
+ *old_head, *new_head)));
return n;
}
-#endif /* _RTE_RING_GENERIC_PVT_H_ */
+#endif /* _RTE_RING_X86_PVT_H_ */
diff --git a/lib/ring/soring.c b/lib/ring/soring.c
index 3b90521bdb..0e8bbc03c1 100644
--- a/lib/ring/soring.c
+++ b/lib/ring/soring.c
@@ -135,9 +135,12 @@ __rte_soring_move_prod_head(struct rte_soring *r, uint32_t num,
switch (st) {
case RTE_RING_SYNC_ST:
+ n = __rte_ring_headtail_move_head_st(&r->prod.ht, &r->cons.ht,
+ r->capacity, num, behavior, head, next, free);
+ break;
case RTE_RING_SYNC_MT:
- n = __rte_ring_headtail_move_head(&r->prod.ht, &r->cons.ht,
- r->capacity, st, num, behavior, head, next, free);
+ n = __rte_ring_headtail_move_head_mt(&r->prod.ht, &r->cons.ht,
+ r->capacity, num, behavior, head, next, free);
break;
case RTE_RING_SYNC_MT_RTS:
n = __rte_ring_rts_move_head(&r->prod.rts, &r->cons.ht,
@@ -168,9 +171,13 @@ __rte_soring_move_cons_head(struct rte_soring *r, uint32_t stage, uint32_t num,
switch (st) {
case RTE_RING_SYNC_ST:
+ n = __rte_ring_headtail_move_head_st(&r->cons.ht,
+ &r->stage[stage].ht, 0, num, behavior,
+ head, next, avail);
+ break;
case RTE_RING_SYNC_MT:
- n = __rte_ring_headtail_move_head(&r->cons.ht,
- &r->stage[stage].ht, 0, st, num, behavior,
+ n = __rte_ring_headtail_move_head_mt(&r->cons.ht,
+ &r->stage[stage].ht, 0, num, behavior,
head, next, avail);
break;
case RTE_RING_SYNC_MT_RTS:
--
2.53.0
More information about the dev
mailing list