[PATCH v4 03/27] ring: unify memory model on C11, remove atomic32

Stephen Hemminger stephen at networkplumber.org
Wed May 27 01:23:53 CEST 2026


Remove the RTE_USE_C11_MEM_MODEL build switch; C11 atomics are now
the default for all platforms. Unifies __rte_ring_update_tail into
the C11 form (atomic_store_release replaces the older rte_smp_wmb +
plain store on the generic path) and renames rte_ring_generic_pvt.h
to rte_ring_x86_pvt.h to reflect its new scope.

Also splits the head-move helper into separate ST and MT variants,
removing the runtime is_st branch from the MT retry loop.
This gives a small performance boost and scopes the following exception
more tightly.

Exception: on x86 with GCC, atomic_compare_exchange on the head CAS
regresses MP/MC contended throughput by ~20% compared to the existing
hand-written cmpxchg. As a workaround, GCC-on-x86 builds use the older
__sync_bool_compare_and_swap builtin, which generates code equivalent
to the original asm. This can be reverted if/when GCC is fixed; a
similar issue was observed in the Linux kernel.

Signed-off-by: Stephen Hemminger <stephen at networkplumber.org>
---
 lib/ring/meson.build                          |   2 +-
 lib/ring/rte_ring_c11_pvt.h                   |  75 +++--------
 lib/ring/rte_ring_elem_pvt.h                  | 125 ++++++++++++++++--
 ..._ring_generic_pvt.h => rte_ring_x86_pvt.h} |  61 ++-------
 lib/ring/soring.c                             |  15 ++-
 5 files changed, 158 insertions(+), 120 deletions(-)
 rename lib/ring/{rte_ring_generic_pvt.h => rte_ring_x86_pvt.h} (60%)

diff --git a/lib/ring/meson.build b/lib/ring/meson.build
index 21f2c12989..b178c963b8 100644
--- a/lib/ring/meson.build
+++ b/lib/ring/meson.build
@@ -9,7 +9,7 @@ indirect_headers += files (
         'rte_ring_elem.h',
         'rte_ring_elem_pvt.h',
         'rte_ring_c11_pvt.h',
-        'rte_ring_generic_pvt.h',
+        'rte_ring_x86_pvt.h',
         'rte_ring_hts.h',
         'rte_ring_hts_elem_pvt.h',
         'rte_ring_peek.h',
diff --git a/lib/ring/rte_ring_c11_pvt.h b/lib/ring/rte_ring_c11_pvt.h
index 07b6efc416..3efe011f08 100644
--- a/lib/ring/rte_ring_c11_pvt.h
+++ b/lib/ring/rte_ring_c11_pvt.h
@@ -15,35 +15,10 @@
  * @file rte_ring_c11_pvt.h
  * It is not recommended to include this file directly,
  * include <rte_ring.h> instead.
- * Contains internal helper functions for MP/SP and MC/SC ring modes.
+ * Contains internal helper functions for MP and MC ring modes.
  * For more information please refer to <rte_ring.h>.
  */
 
-/**
- * @internal This function updates tail values.
- */
-static __rte_always_inline void
-__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
-		uint32_t new_val, uint32_t single, uint32_t enqueue)
-{
-	RTE_SET_USED(enqueue);
-
-	/*
-	 * If there are other enqueues/dequeues in progress that preceded us,
-	 * we need to wait for them to complete
-	 */
-	if (!single)
-		rte_wait_until_equal_32((uint32_t *)(uintptr_t)&ht->tail, old_val,
-			rte_memory_order_relaxed);
-
-	/*
-	 * R0: Establishes a synchronizing edge with load-acquire of tail at A1.
-	 * Ensures that memory effects by this thread on ring elements array
-	 * is observed by a different thread of the other type.
-	 */
-	rte_atomic_store_explicit(&ht->tail, new_val, rte_memory_order_release);
-}
-
 /**
  * @internal This is a helper function that moves the producer/consumer head
  *
@@ -72,14 +47,11 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
  *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
  */
 static __rte_always_inline unsigned int
-__rte_ring_headtail_move_head(struct rte_ring_headtail *d,
+__rte_ring_headtail_move_head_mt(struct rte_ring_headtail *d,
 		const struct rte_ring_headtail *s, uint32_t capacity,
-		unsigned int is_st, unsigned int n,
-		enum rte_ring_queue_behavior behavior,
+		unsigned int n,	enum rte_ring_queue_behavior behavior,
 		uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
 {
-	uint32_t stail;
-	int success;
 	unsigned int max = n;
 
 	/*
@@ -89,8 +61,7 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
 	 * d->head.
 	 * If not, an unsafe partial order may ensue.
 	 */
-	*old_head = rte_atomic_load_explicit(&d->head,
-			rte_memory_order_acquire);
+	*old_head = rte_atomic_load_explicit(&d->head, rte_memory_order_acquire);
 	do {
 		/* Reset n to the initial burst count */
 		n = max;
@@ -101,15 +72,14 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
 		 * ring elements array is observed by the time
 		 * this thread observes its tail update.
 		 */
-		stail = rte_atomic_load_explicit(&s->tail,
-					rte_memory_order_acquire);
+		uint32_t stail = rte_atomic_load_explicit(&s->tail, rte_memory_order_acquire);
 
 		/* The subtraction is done between two unsigned 32bits value
 		 * (the result is always modulo 32 bits even if we have
 		 * *old_head > s->tail). So 'entries' is always between 0
 		 * and capacity (which is < size).
 		 */
-		*entries = (capacity + stail - *old_head);
+		*entries = capacity + stail - *old_head;
 
 		/* check that we have enough room in ring */
 		if (unlikely(n > *entries))
@@ -120,25 +90,20 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
 			return 0;
 
 		*new_head = *old_head + n;
-		if (is_st) {
-			d->head = *new_head;
-			success = 1;
-		} else
-			/* on failure, *old_head is updated */
-			/*
-			 * R1/A2.
-			 * R1: Establishes a synchronizing edge with A0 of a
-			 * different thread.
-			 * A2: Establishes a synchronizing edge with R1 of a
-			 * different thread to observe same value for stail
-			 * observed by that thread on CAS failure (to retry
-			 * with an updated *old_head).
-			 */
-			success = rte_atomic_compare_exchange_strong_explicit(
-					&d->head, old_head, *new_head,
-					rte_memory_order_release,
-					rte_memory_order_acquire);
-	} while (unlikely(success == 0));
+
+		/* on failure, *old_head is updated */
+		/*
+		 * R1/A2.
+		 * R1: Establishes a synchronizing edge with A0 of a
+		 * different thread.
+		 * A2: Establishes a synchronizing edge with R1 of a
+		 * different thread to observe same value for stail
+		 * observed by that thread on CAS failure (to retry
+		 * with an updated *old_head).
+		 */
+	} while (unlikely(!rte_atomic_compare_exchange_strong_explicit(
+				  &d->head, old_head, *new_head,
+				  rte_memory_order_release, rte_memory_order_acquire)));
 	return n;
 }
 
diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h
index 6eafae121f..9d1da12a92 100644
--- a/lib/ring/rte_ring_elem_pvt.h
+++ b/lib/ring/rte_ring_elem_pvt.h
@@ -299,17 +299,108 @@ __rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head,
 			cons_head & r->mask, esize, num);
 }
 
-/* Between load and load. there might be cpu reorder in weak model
- * (powerpc/arm).
- * There are 2 choices for the users
- * 1.use rmb() memory barrier
- * 2.use one-direction load_acquire/store_release barrier
- * It depends on performance test results.
+/**
+ * @internal This function updates tail values.
  */
-#ifdef RTE_USE_C11_MEM_MODEL
-#include "rte_ring_c11_pvt.h"
+static __rte_always_inline void
+__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
+		uint32_t new_val, uint32_t single, uint32_t enqueue)
+{
+	RTE_SET_USED(enqueue);
+
+	/*
+	 * If there are other enqueues/dequeues in progress that preceded us,
+	 * we need to wait for them to complete
+	 */
+	if (!single)
+		rte_wait_until_equal_32((uint32_t *)(uintptr_t)&ht->tail, old_val,
+			rte_memory_order_relaxed);
+
+	/*
+	 * R0: Establishes a synchronizing edge with load-acquire of tail at A1.
+	 * Ensures that memory effects by this thread on ring elements array
+	 * is observed by a different thread of the other type.
+	 */
+	rte_atomic_store_explicit(&ht->tail, new_val, rte_memory_order_release);
+}
+
+/**
+ * @internal This is a helper function that moves the producer/consumer head
+ *
+ *
+ * This is the optimized version for the single-threaded case.
+ *
+ * @param d
+ *   A pointer to the headtail structure with head value to be moved
+ * @param s
+ *   A pointer to the counter-part headtail structure. Note that this
+ *   function only reads tail value from it
+ * @param capacity
+ *   Either ring capacity value (for producer), or zero (for consumer)
+ * @param n
+ *   The number of elements we want to move head value on
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Move on a fixed number of items
+ *   RTE_RING_QUEUE_VARIABLE: Move on as many items as possible
+ * @param old_head
+ *   Returns head value as it was before the move
+ * @param new_head
+ *   Returns the new head value
+ * @param entries
+ *   Returns the number of ring entries available BEFORE head was moved
+ * @return
+ *   Actual number of objects the head was moved on
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
+ */
+static __rte_always_inline unsigned int
+__rte_ring_headtail_move_head_st(struct rte_ring_headtail *d,
+		const struct rte_ring_headtail *s, uint32_t capacity,
+		unsigned int n, enum rte_ring_queue_behavior behavior,
+		uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
+{
+	uint32_t stail;
+
+	/*
+	 * A0: Establishes a synchronizing edge with R1.
+	 * Ensure that this thread observes same values
+	 * to stail observed by the thread that updated
+	 * d->head.
+	 * If not, an unsafe partial order may ensue.
+	 */
+	*old_head = rte_atomic_load_explicit(&d->head, rte_memory_order_acquire);
+
+	/*
+	 * A1: Establishes a synchronizing edge with R0.
+	 * Ensures that other thread's memory effects on
+	 * ring elements array is observed by the time
+	 * this thread observes its tail update.
+	 */
+	stail = rte_atomic_load_explicit(&s->tail, rte_memory_order_acquire);
+
+	/* The subtraction is done between two unsigned 32bits value
+	 * (the result is always modulo 32 bits even if we have
+	 * *old_head > s->tail). So 'entries' is always between 0
+	 * and capacity (which is < size).
+	 */
+	*entries = capacity + stail - *old_head;
+
+	/* check that we have enough room in ring */
+	if (unlikely(n > *entries))
+		n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+	if (n > 0) {
+		*new_head = *old_head + n;
+		d->head = *new_head;
+	}
+
+	return n;
+}
+
+/* There are two choices because GCC optimizer does poorly on atomic_compare_exchange */
+#if defined(RTE_TOOLCHAIN_GCC) && defined(RTE_ARCH_X86)
+#include "rte_ring_x86_pvt.h"
 #else
-#include "rte_ring_generic_pvt.h"
+#include "rte_ring_c11_pvt.h"
 #endif
 
 /**
@@ -341,8 +432,12 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
 		uint32_t *old_head, uint32_t *new_head,
 		uint32_t *free_entries)
 {
-	return __rte_ring_headtail_move_head(&r->prod, &r->cons, r->capacity,
-			is_sp, n, behavior, old_head, new_head, free_entries);
+	if (is_sp)
+		return __rte_ring_headtail_move_head_st(&r->prod, &r->cons, r->capacity,
+				n, behavior, old_head, new_head, free_entries);
+	else
+		return __rte_ring_headtail_move_head_mt(&r->prod, &r->cons, r->capacity,
+				n, behavior, old_head, new_head, free_entries);
 }
 
 /**
@@ -374,8 +469,12 @@ __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
 		uint32_t *old_head, uint32_t *new_head,
 		uint32_t *entries)
 {
-	return __rte_ring_headtail_move_head(&r->cons, &r->prod, 0,
-			is_sc, n, behavior, old_head, new_head, entries);
+	if (is_sc)
+		return __rte_ring_headtail_move_head_st(&r->cons, &r->prod, 0,
+				n, behavior, old_head, new_head, entries);
+	else
+		return __rte_ring_headtail_move_head_mt(&r->cons, &r->prod, 0,
+				n, behavior, old_head, new_head, entries);
 }
 
 /**
diff --git a/lib/ring/rte_ring_generic_pvt.h b/lib/ring/rte_ring_x86_pvt.h
similarity index 60%
rename from lib/ring/rte_ring_generic_pvt.h
rename to lib/ring/rte_ring_x86_pvt.h
index affd2d5ba7..c8de108bbd 100644
--- a/lib/ring/rte_ring_generic_pvt.h
+++ b/lib/ring/rte_ring_x86_pvt.h
@@ -7,39 +7,19 @@
  * Used as BSD-3 Licensed with permission from Kip Macy.
  */
 
-#ifndef _RTE_RING_GENERIC_PVT_H_
-#define _RTE_RING_GENERIC_PVT_H_
+#ifndef _RTE_RING_X86_PVT_H_
+#define _RTE_RING_X86_PVT_H_
 
 /**
- * @file rte_ring_generic_pvt.h
+ * @file rte_ring_x86_pvt.h
  * It is not recommended to include this file directly,
  * include <rte_ring.h> instead.
- * Contains internal helper functions for MP/SP and MC/SC ring modes.
- * For more information please refer to <rte_ring.h>.
+ *
+ * Contains internal helper functions for MP and MC ring modes.
+ * It is GCC specific to work around poor optimizer handling of C11 atomic
+ * compare_exchange.
  */
 
-/**
- * @internal This function updates tail values.
- */
-static __rte_always_inline void
-__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
-		uint32_t new_val, uint32_t single, uint32_t enqueue)
-{
-	if (enqueue)
-		rte_smp_wmb();
-	else
-		rte_smp_rmb();
-	/*
-	 * If there are other enqueues/dequeues in progress that preceded us,
-	 * we need to wait for them to complete
-	 */
-	if (!single)
-		rte_wait_until_equal_32((volatile uint32_t *)(uintptr_t)&ht->tail, old_val,
-			rte_memory_order_relaxed);
-
-	ht->tail = new_val;
-}
-
 /**
  * @internal This is a helper function that moves the producer/consumer head
  *
@@ -50,8 +30,6 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
  *   function only reads tail value from it
  * @param capacity
  *   Either ring capacity value (for producer), or zero (for consumer)
- * @param is_st
- *   Indicates whether multi-thread safe path is needed or not
  * @param n
  *   The number of elements we want to move head value on
  * @param behavior
@@ -68,14 +46,13 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
  *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
  */
 static __rte_always_inline unsigned int
-__rte_ring_headtail_move_head(struct rte_ring_headtail *d,
+__rte_ring_headtail_move_head_mt(struct rte_ring_headtail *d,
 		const struct rte_ring_headtail *s, uint32_t capacity,
-		unsigned int is_st, unsigned int n,
+		unsigned int n,
 		enum rte_ring_queue_behavior behavior,
 		uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
 {
 	unsigned int max = n;
-	int success;
 
 	do {
 		/* Reset n to the initial burst count */
@@ -83,18 +60,13 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
 
 		*old_head = d->head;
 
-		/* add rmb barrier to avoid load/load reorder in weak
-		 * memory model. It is noop on x86
-		 */
-		rte_smp_rmb();
-
 		/*
 		 *  The subtraction is done between two unsigned 32bits value
 		 * (the result is always modulo 32 bits even if we have
 		 * *old_head > s->tail). So 'entries' is always between 0
 		 * and capacity (which is < size).
 		 */
-		*entries = (capacity + s->tail - *old_head);
+		*entries = capacity + s->tail - *old_head;
 
 		/* check that we have enough room in ring */
 		if (unlikely(n > *entries))
@@ -105,15 +77,10 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
 			return 0;
 
 		*new_head = *old_head + n;
-		if (is_st) {
-			d->head = *new_head;
-			success = 1;
-		} else
-			success = rte_atomic32_cmpset(
-					(uint32_t *)(uintptr_t)&d->head,
-					*old_head, *new_head);
-	} while (unlikely(success == 0));
+	} while (unlikely(!__sync_bool_compare_and_swap(
+				  (uint32_t *)(uintptr_t)&d->head,
+				  *old_head, *new_head)));
 	return n;
 }
 
-#endif /* _RTE_RING_GENERIC_PVT_H_ */
+#endif /* _RTE_RING_X86_PVT_H_ */
diff --git a/lib/ring/soring.c b/lib/ring/soring.c
index 3b90521bdb..0e8bbc03c1 100644
--- a/lib/ring/soring.c
+++ b/lib/ring/soring.c
@@ -135,9 +135,12 @@ __rte_soring_move_prod_head(struct rte_soring *r, uint32_t num,
 
 	switch (st) {
 	case RTE_RING_SYNC_ST:
+		n = __rte_ring_headtail_move_head_st(&r->prod.ht, &r->cons.ht,
+				r->capacity, num, behavior, head, next, free);
+		break;
 	case RTE_RING_SYNC_MT:
-		n = __rte_ring_headtail_move_head(&r->prod.ht, &r->cons.ht,
-			r->capacity, st, num, behavior, head, next, free);
+		n = __rte_ring_headtail_move_head_mt(&r->prod.ht, &r->cons.ht,
+				r->capacity, num, behavior, head, next, free);
 		break;
 	case RTE_RING_SYNC_MT_RTS:
 		n = __rte_ring_rts_move_head(&r->prod.rts, &r->cons.ht,
@@ -168,9 +171,13 @@ __rte_soring_move_cons_head(struct rte_soring *r, uint32_t stage, uint32_t num,
 
 	switch (st) {
 	case RTE_RING_SYNC_ST:
+		n = __rte_ring_headtail_move_head_st(&r->cons.ht,
+			&r->stage[stage].ht, 0, num, behavior,
+			head, next, avail);
+		break;
 	case RTE_RING_SYNC_MT:
-		n = __rte_ring_headtail_move_head(&r->cons.ht,
-			&r->stage[stage].ht, 0, st, num, behavior,
+		n = __rte_ring_headtail_move_head_mt(&r->cons.ht,
+			&r->stage[stage].ht, 0, num, behavior,
 			head, next, avail);
 		break;
 	case RTE_RING_SYNC_MT_RTS:
-- 
2.53.0



More information about the dev mailing list