[RFC 3/7] ring: use C11 atomic operations for MP/SP head/tail

Stephen Hemminger stephen at networkplumber.org
Thu May 21 06:17:03 CEST 2026


This is the last caller of rte_atomic32_cmpset() in lib/, and it is
blocking deprecation of the rte_atomicNN_*() family.

Replace cmpset with rte_atomic_compare_exchange_weak_explicit(),
and convert head/tail loads/stores from implicit seq_cst to explicit
acquire/release. Matches the HTS/RTS pattern.

Acquire-load of d->head orders the subsequent load of s->tail (was
rte_smp_rmb()). Acquire-load of s->tail pairs with the release-store
of the counterpart tail in __rte_ring_update_tail(), which subsumes
the previous wmb/rmb barriers.

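A weak CAS avoids the hidden inner retry loop that the strong variant
wraps around the LL/SC pair on arm64; the outer do-while already retries
on failure. The CAS uses relaxed ordering on both success and failure
because the head reservation itself publishes no data.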

The 'enqueue' parameter of __rte_ring_update_tail() is now unused. It is
kept in the signature and marked __rte_unused, so call sites are unchanged.

Signed-off-by: Stephen Hemminger <stephen at networkplumber.org>
---
 lib/ring/rte_ring_generic_pvt.h | 64 +++++++++++++++++++++++----------
 1 file changed, 45 insertions(+), 19 deletions(-)

diff --git a/lib/ring/rte_ring_generic_pvt.h b/lib/ring/rte_ring_generic_pvt.h
index affd2d5ba7..9497f6737b 100644
--- a/lib/ring/rte_ring_generic_pvt.h
+++ b/lib/ring/rte_ring_generic_pvt.h
@@ -23,21 +23,25 @@
  */
 static __rte_always_inline void
 __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
-		uint32_t new_val, uint32_t single, uint32_t enqueue)
+		uint32_t new_val, uint32_t single,
+		uint32_t enqueue __rte_unused)
 {
-	if (enqueue)
-		rte_smp_wmb();
-	else
-		rte_smp_rmb();
 	/*
 	 * If there are other enqueues/dequeues in progress that preceded us,
 	 * we need to wait for them to complete
 	 */
 	if (!single)
-		rte_wait_until_equal_32((volatile uint32_t *)(uintptr_t)&ht->tail, old_val,
-			rte_memory_order_relaxed);
+		rte_wait_until_equal_32((volatile uint32_t *)(uintptr_t)&ht->tail,
+			old_val, rte_memory_order_relaxed);
 
-	ht->tail = new_val;
+	/*
+	 * Release ordering on the tail store ensures that this thread's
+	 * slot reads (dequeue) have completed, and its slot writes (enqueue)
+	 * are visible to the other side, before the new tail is observed.
+	 * Pairs with the acquire load of the counterpart's tail in
+	 * __rte_ring_headtail_move_head().
+	 */
+	rte_atomic_store_explicit(&ht->tail, new_val, rte_memory_order_release);
 }
 
 /**
@@ -76,25 +80,35 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
 {
 	unsigned int max = n;
 	int success;
+	uint32_t tail;
 
 	do {
 		/* Reset n to the initial burst count */
 		n = max;
 
-		*old_head = d->head;
+		/*
+		 * Acquire load: orders this load before the load of s->tail
+		 * below (replaces rte_smp_rmb() in the previous version) and
+		 * re-establishes ordering after a failed CAS on retry.
+		 */
+		*old_head = rte_atomic_load_explicit(&d->head,
+				rte_memory_order_acquire);
 
-		/* add rmb barrier to avoid load/load reorder in weak
-		 * memory model. It is noop on x86
+		/*
+		 * Acquire load on the counterpart's tail pairs with the
+		 * release store in __rte_ring_update_tail() on the other
+		 * side, ensuring slot operations performed there are visible
+		 * before the caller accesses the reserved slots.
 		 */
-		rte_smp_rmb();
+		tail = rte_atomic_load_explicit(&s->tail, rte_memory_order_acquire);
 
 		/*
 		 *  The subtraction is done between two unsigned 32bits value
 		 * (the result is always modulo 32 bits even if we have
-		 * *old_head > s->tail). So 'entries' is always between 0
+		 * *old_head > tail). So 'entries' is always between 0
 		 * and capacity (which is < size).
 		 */
-		*entries = (capacity + s->tail - *old_head);
+		*entries = (capacity + tail - *old_head);
 
 		/* check that we have enough room in ring */
 		if (unlikely(n > *entries))
@@ -106,12 +120,24 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
 
 		*new_head = *old_head + n;
 		if (is_st) {
-			d->head = *new_head;
+			rte_atomic_store_explicit(&d->head, *new_head, rte_memory_order_relaxed);
 			success = 1;
-		} else
-			success = rte_atomic32_cmpset(
-					(uint32_t *)(uintptr_t)&d->head,
-					*old_head, *new_head);
+		} else {
+			/*
+			 * Weak CAS: the outer do-while handles spurious
+			 * failures, so we avoid the strong variant's
+			 * internal retry (which on arm64 wraps the LL/SC
+			 * pair in a hidden inner loop).
+			 *
+			 * Relaxed on both success and failure: this CAS
+			 * does not publish data. Slot data visibility is
+			 * provided by the acquire loads above and the
+			 * release store of tail in __rte_ring_update_tail().
+			 */
+			success = rte_atomic_compare_exchange_weak_explicit(
+				&d->head, old_head, *new_head,
+				rte_memory_order_relaxed, rte_memory_order_relaxed);
+		}
 	} while (unlikely(success == 0));
 	return n;
 }
-- 
2.53.0



More information about the dev mailing list