[dpdk-dev] [PATCH] ring: fix c11 memory ordering issue

Gavin Hu gavin.hu at arm.com
Mon Aug 6 03:18:05 CEST 2018


1) In update_tail, read ht->tail using __atomic_load.
2) In __rte_ring_move_prod_head, move the __atomic_load_n up and out of
   the do {} while loop as upon failure the old_head will be updated,
   another load is not necessary.
3) Synchronize the load-acquires of prod.tail and cons.tail with store-
   releases of update_tail which releases all ring updates up to the
   value of ht->tail.
4) When calling __atomic_compare_exchange_n, relaxed ordering for both
   success and failure, as multiple threads can work independently on
   the same end of the ring (either enqueue or dequeue) without
   synchronization, not as operating on tail, which has to be finished
   in sequence.

Fixes: 39368ebfc6 ("ring: introduce C11 memory model barrier option")
Cc: stable at dpdk.org

Reviewed-by: Honnappa Nagarahalli <Honnappa.Nagarahalli at arm.com>
Reviewed-by: Steve Capper <steve.capper at arm.com>
Reviewed-by: Ola Liljedahl <Ola.Liljedahl at arm.com>
Signed-off-by: Gavin Hu <gavin.hu at arm.com>
---
 lib/librte_ring/rte_ring_c11_mem.h | 38 +++++++++++++++++++++++++-------------
 1 file changed, 25 insertions(+), 13 deletions(-)

diff --git a/lib/librte_ring/rte_ring_c11_mem.h b/lib/librte_ring/rte_ring_c11_mem.h
index 94df3c4a6..cfa3be4a7 100644
--- a/lib/librte_ring/rte_ring_c11_mem.h
+++ b/lib/librte_ring/rte_ring_c11_mem.h
@@ -21,7 +21,8 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
 	 * we need to wait for them to complete
 	 */
 	if (!single)
-		while (unlikely(ht->tail != old_val))
+		while (unlikely(old_val != __atomic_load_n(&ht->tail,
+						__ATOMIC_RELAXED)))
 			rte_pause();
 
 	__atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
@@ -60,20 +61,24 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
 	unsigned int max = n;
 	int success;
 
+	*old_head = __atomic_load_n(&r->prod.head, __ATOMIC_RELAXED);
 	do {
 		/* Reset n to the initial burst count */
 		n = max;
 
-		*old_head = __atomic_load_n(&r->prod.head,
-					__ATOMIC_ACQUIRE);
 
-		/*
-		 *  The subtraction is done between two unsigned 32bits value
+		/* load-acquire synchronize with store-release of ht->tail
+		 * in update_tail.
+		 */
+		const uint32_t cons_tail = __atomic_load_n(&r->cons.tail,
+							__ATOMIC_ACQUIRE);
+
+		/* The subtraction is done between two unsigned 32bits value
 		 * (the result is always modulo 32 bits even if we have
 		 * *old_head > cons_tail). So 'free_entries' is always between 0
 		 * and capacity (which is < size).
 		 */
-		*free_entries = (capacity + r->cons.tail - *old_head);
+		*free_entries = (capacity + cons_tail - *old_head);
 
 		/* check that we have enough room in ring */
 		if (unlikely(n > *free_entries))
@@ -87,9 +92,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
 		if (is_sp)
 			r->prod.head = *new_head, success = 1;
 		else
+			/* on failure, *old_head is updated */
 			success = __atomic_compare_exchange_n(&r->prod.head,
 					old_head, *new_head,
-					0, __ATOMIC_ACQUIRE,
+					/*weak=*/0, __ATOMIC_RELAXED,
 					__ATOMIC_RELAXED);
 	} while (unlikely(success == 0));
 	return n;
@@ -128,18 +134,23 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
 	int success;
 
 	/* move cons.head atomically */
+	*old_head = __atomic_load_n(&r->cons.head, __ATOMIC_RELAXED);
 	do {
 		/* Restore n as it may change every loop */
 		n = max;
-		*old_head = __atomic_load_n(&r->cons.head,
-					__ATOMIC_ACQUIRE);
+
+		/* this load-acquire synchronize with store-release of ht->tail
+		 * in update_tail.
+		 */
+		const uint32_t prod_tail = __atomic_load_n(&r->prod.tail,
+							__ATOMIC_ACQUIRE);
 
 		/* The subtraction is done between two unsigned 32bits value
 		 * (the result is always modulo 32 bits even if we have
 		 * cons_head > prod_tail). So 'entries' is always between 0
 		 * and size(ring)-1.
 		 */
-		*entries = (r->prod.tail - *old_head);
+		*entries = (prod_tail - *old_head);
 
 		/* Set the actual entries for dequeue */
 		if (n > *entries)
@@ -152,10 +163,11 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
 		if (is_sc)
 			r->cons.head = *new_head, success = 1;
 		else
+			/* on failure, *old_head will be updated */
 			success = __atomic_compare_exchange_n(&r->cons.head,
-							old_head, *new_head,
-							0, __ATOMIC_ACQUIRE,
-							__ATOMIC_RELAXED);
+						old_head, *new_head,
+						/*weak=*/0, __ATOMIC_RELAXED,
+						__ATOMIC_RELAXED);
 	} while (unlikely(success == 0));
 	return n;
 }
-- 
2.11.0



More information about the dev mailing list