[dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing

Jia He hejianet at gmail.com
Fri Nov 3 02:46:40 CET 2017


Hi Jerin


On 11/3/2017 1:23 AM, Jerin Jacob Wrote:
> -----Original Message-----
>> Date: Thu,  2 Nov 2017 08:43:30 +0000
>> From: Jia He <hejianet at gmail.com>
>> To: jerin.jacob at caviumnetworks.com, dev at dpdk.org, olivier.matz at 6wind.com
>> Cc: konstantin.ananyev at intel.com, bruce.richardson at intel.com,
>>   jianbo.liu at arm.com, hemant.agrawal at nxp.com, Jia He <hejianet at gmail.com>,
>>   jie2.liu at hxt-semitech.com, bing.zhao at hxt-semitech.com,
>>   jia.he at hxt-semitech.com
>> Subject: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
>> X-Mailer: git-send-email 2.7.4
>>
>> We watched a rte panic of mbuf_autotest in our qualcomm arm64 server.
>> As for the possible race condition, please refer to [1].
> Hi Jia,
>
> In addition to Konstantin comments. Please find below some review
> comments.
>> Furthermore, there are 2 options as suggested by Jerin:
>> 1. use rte_smp_rmb
>> 2. use load_acquire/store_release(refer to [2]).
>> CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER is provided, and by
> I think, The better name would be CONFIG_RTE_RING_USE_C11_MEM_MODEL
> or something like that.
Ok, but how to distinguish following 2 options?

CONFIG_RTE_RING_USE_C11_MEM_MODEL doesn't seem to be enough

1. use rte_smp_rmb
2. use load_acquire/store_release(refer to [2]).

>> default it is n;
>>
>> ---
>> Changelog:
>> V2: let users choose whether using load_acquire/store_release
>> V1: rte_smp_rmb() between 2 loads
>>
>> Signed-off-by: Jia He <hejianet at gmail.com>
>> Signed-off-by: jie2.liu at hxt-semitech.com
>> Signed-off-by: bing.zhao at hxt-semitech.com
>> Signed-off-by: jia.he at hxt-semitech.com
>> Suggested-by: jerin.jacob at caviumnetworks.com
>> ---
>>   lib/librte_ring/Makefile           |  4 +++-
>>   lib/librte_ring/rte_ring.h         | 38 ++++++++++++++++++++++++------
>>   lib/librte_ring/rte_ring_arm64.h   | 48 ++++++++++++++++++++++++++++++++++++++
>>   lib/librte_ring/rte_ring_generic.h | 45 +++++++++++++++++++++++++++++++++++
>>   4 files changed, 127 insertions(+), 8 deletions(-)
>>   create mode 100644 lib/librte_ring/rte_ring_arm64.h
>>   create mode 100644 lib/librte_ring/rte_ring_generic.h
>>
>> diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
>> index e34d9d9..fa57a86 100644
>> --- a/lib/librte_ring/Makefile
>> +++ b/lib/librte_ring/Makefile
>> @@ -45,6 +45,8 @@ LIBABIVER := 1
>>   SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
>>   
>>   # install includes
>> -SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h
>> +SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
>> +					rte_ring_arm64.h \
> It is really not specific to arm64. We could rename it to rte_ring_c11_mem.h or
> something like that to reflect the implementation based on c11 memory
> model.
>
>
>> +					rte_ring_generic.h
>>   
>>   include $(RTE_SDK)/mk/rte.lib.mk
>> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
>> index 5e9b3b7..943b1f9 100644
>> --- a/lib/librte_ring/rte_ring.h
>> +++ b/lib/librte_ring/rte_ring.h
>> @@ -103,6 +103,18 @@ extern "C" {
>>   #include <rte_memzone.h>
>>   #include <rte_pause.h>
>>   
>> +/* In those strong memory models (e.g. x86), there is no need to add rmb()
>> + * between load and load.
>> + * In those weak models(powerpc/arm), there are 2 choices for the users
>> + * 1.use rmb() memory barrier
>> + * 2.use one-direcion load_acquire/store_release barrier
>> + * It depends on performance test results. */
>> +#ifdef RTE_ARCH_ARM64
> s/RTE_ARCH_ARM64/RTE_RING_USE_C11_MEM_MODEL and update the generic arm64 config.
> By that way it can used by another architecture like ppc if they choose to do so.
>
>
>> +#include "rte_ring_arm64.h"
>> +#else
>> +#include "rte_ring_generic.h"
>> +#endif
>> +
>>   #define RTE_TAILQ_RING_NAME "RTE_RING"
>>   
>>   enum rte_ring_queue_behavior {
>> @@ -368,7 +380,7 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
>>   		while (unlikely(ht->tail != old_val))
>>   			rte_pause();
>>   
>> -	ht->tail = new_val;
>> +	arch_rte_atomic_store(&ht->tail, new_val, __ATOMIC_RELEASE);
>>   }
>>   
>>   /**
>> @@ -408,7 +420,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
>>   		/* Reset n to the initial burst count */
>>   		n = max;
>>   
>> -		*old_head = r->prod.head;
>> +		*old_head = arch_rte_atomic_load(&r->prod.head,
>> +						__ATOMIC_ACQUIRE);
> Same as Konstantin comments, i.e move to this function to c11 memory model
> header file
>
>>   		const uint32_t cons_tail = r->cons.tail;
>>   		/*
>>   		 *  The subtraction is done between two unsigned 32bits value
>> @@ -430,8 +443,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
>>   		if (is_sp)
>>   			r->prod.head = *new_head, success = 1;
>>   		else
>> -			success = rte_atomic32_cmpset(&r->prod.head,
>> -					*old_head, *new_head);
>> +			success = arch_rte_atomic32_cmpset(&r->prod.head,
>> +					old_head, *new_head,
>> +					0, __ATOMIC_ACQUIRE,
>> +					__ATOMIC_RELAXED);
>>   	} while (unlikely(success == 0));
>>   	return n;
>>   }
>> @@ -470,7 +485,10 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
>>   		goto end;
>>   
>>   	ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
>> +
>> +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER
> This #define will be removed if the function moves.
>
>>   	rte_smp_wmb();
>> +#endif
>>   
>>   	update_tail(&r->prod, prod_head, prod_next, is_sp);
>>   end:
>> @@ -516,7 +534,8 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
>>   		/* Restore n as it may change every loop */
>>   		n = max;
>>   
>> -		*old_head = r->cons.head;
>> +		*old_head = arch_rte_atomic_load(&r->cons.head,
>> +						__ATOMIC_ACQUIRE);
>>   		const uint32_t prod_tail = r->prod.tail;
>>   		/* The subtraction is done between two unsigned 32bits value
>>   		 * (the result is always modulo 32 bits even if we have
>> @@ -535,8 +554,10 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
>>   		if (is_sc)
>>   			r->cons.head = *new_head, success = 1;
>>   		else
>> -			success = rte_atomic32_cmpset(&r->cons.head, *old_head,
>> -					*new_head);
>> +			success = arch_rte_atomic32_cmpset(&r->cons.head,
>> +					old_head, *new_head,
>> +					0, __ATOMIC_ACQUIRE,
>> +					__ATOMIC_RELAXED);
>>   	} while (unlikely(success == 0));
>>   	return n;
>>   }
>> @@ -575,7 +596,10 @@ __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
>>   		goto end;
>>   
>>   	DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
>> +
>> +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER
> This #define will be removed if the function moves.
>
>>   	rte_smp_rmb();
>> +#endif

-- 
Cheers,
Jia



More information about the dev mailing list