[dpdk-dev] [PATCH v3 6/8] stack: add C11 atomic implementation

Honnappa Nagarahalli Honnappa.Nagarahalli at arm.com
Fri Mar 29 00:27:05 CET 2019


> 
> This commit adds an implementation of the lock-free stack push, pop, and
> length functions that use __atomic builtins, for systems that benefit from the
> finer-grained memory ordering control.
> 
> Signed-off-by: Gage Eads <gage.eads at intel.com>
> ---
>  lib/librte_stack/Makefile            |   3 +-
>  lib/librte_stack/meson.build         |   3 +-
>  lib/librte_stack/rte_stack.h         |   4 +
>  lib/librte_stack/rte_stack_c11_mem.h | 175 +++++++++++++++++++++++++++++++++++
>  4 files changed, 183 insertions(+), 2 deletions(-)
>  create mode 100644 lib/librte_stack/rte_stack_c11_mem.h
> 
> diff --git a/lib/librte_stack/Makefile b/lib/librte_stack/Makefile
> index 3ecddf033..94a7c1476 100644
> --- a/lib/librte_stack/Makefile
> +++ b/lib/librte_stack/Makefile
> @@ -19,6 +19,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_STACK) := rte_stack.c
> 
>  # install includes
>  SYMLINK-$(CONFIG_RTE_LIBRTE_STACK)-include := rte_stack.h \
> -					      rte_stack_generic.h
> +					      rte_stack_generic.h \
> +					      rte_stack_c11_mem.h
> 
>  include $(RTE_SDK)/mk/rte.lib.mk
> diff --git a/lib/librte_stack/meson.build b/lib/librte_stack/meson.build
> index 99d7f9ec5..7e2d1dbb8 100644
> --- a/lib/librte_stack/meson.build
> +++ b/lib/librte_stack/meson.build
> @@ -6,4 +6,5 @@ allow_experimental_apis = true
>  version = 1
>  sources = files('rte_stack.c')
>  headers = files('rte_stack.h',
> -		'rte_stack_generic.h')
> +		'rte_stack_generic.h',
> +		'rte_stack_c11_mem.h')
> diff --git a/lib/librte_stack/rte_stack.h b/lib/librte_stack/rte_stack.h
> index b484313bb..de16f8fff 100644
> --- a/lib/librte_stack/rte_stack.h
> +++ b/lib/librte_stack/rte_stack.h
> @@ -91,7 +91,11 @@ struct rte_stack {
>   */
>  #define RTE_STACK_F_LF 0x0001
> 
> +#ifdef RTE_USE_C11_MEM_MODEL
> +#include "rte_stack_c11_mem.h"
> +#else
>  #include "rte_stack_generic.h"
> +#endif
> 
>  /**
>   * @internal Push several objects on the lock-free stack (MT-safe).
> diff --git a/lib/librte_stack/rte_stack_c11_mem.h b/lib/librte_stack/rte_stack_c11_mem.h
> new file mode 100644
> index 000000000..44f9ece6e
> --- /dev/null
> +++ b/lib/librte_stack/rte_stack_c11_mem.h
> @@ -0,0 +1,175 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2019 Intel Corporation
> + */
> +
> +#ifndef _RTE_STACK_C11_MEM_H_
> +#define _RTE_STACK_C11_MEM_H_
> +
> +#include <rte_branch_prediction.h>
> +#include <rte_prefetch.h>
> +
> +static __rte_always_inline unsigned int
> +rte_stack_lf_len(struct rte_stack *s)
> +{
> +	/* stack_lf_push() and stack_lf_pop() do not update the list's contents
> +	 * and stack_lf->len atomically, which can cause the list to appear
> +	 * shorter than it actually is if this function is called while other
> +	 * threads are modifying the list.
> +	 *
> +	 * However, given the inherently approximate nature of the get_count
> +	 * callback -- even if the list and its size were updated atomically,
> +	 * the size could change between when get_count executes and when the
> +	 * value is returned to the caller -- this is acceptable.
> +	 *
> +	 * The stack_lf->len updates are placed such that the list may appear to
> +	 * have fewer elements than it does, but will never appear to have more
> +	 * elements. If the mempool is near-empty to the point that this is a
> +	 * concern, the user should consider increasing the mempool size.
> +	 */
> +	return (unsigned int)__atomic_load_n(&s->stack_lf.used.len.cnt,
> +					     __ATOMIC_RELAXED);
> +}
> +
> +static __rte_always_inline void
> +__rte_stack_lf_push(struct rte_stack_lf_list *list,
> +		    struct rte_stack_lf_elem *first,
> +		    struct rte_stack_lf_elem *last,
> +		    unsigned int num)
> +{
> +#ifndef RTE_ARCH_X86_64
> +	RTE_SET_USED(first);
> +	RTE_SET_USED(last);
> +	RTE_SET_USED(list);
> +	RTE_SET_USED(num);
> +#else
> +	struct rte_stack_lf_head old_head;
> +	int success;
> +
> +	old_head = list->head;
This can be a torn read (the same issue you mention in __rte_stack_lf_pop). I suggest we use an acquire thread fence here as well (please see the comments in __rte_stack_lf_pop).

> +
> +	do {
> +		struct rte_stack_lf_head new_head;
> +
We can add __atomic_thread_fence(__ATOMIC_ACQUIRE) here (please see the comments in __rte_stack_lf_pop).

> +		/* Swing the top pointer to the first element in the list and
> +		 * make the last element point to the old top.
> +		 */
> +		new_head.top = first;
> +		new_head.cnt = old_head.cnt + 1;
> +
> +		last->next = old_head.top;
> +
> +		/* Use the release memmodel to ensure the writes to the LF LIFO
> +		 * elements are visible before the head pointer write.
> +		 */
> +		success = rte_atomic128_cmp_exchange(
> +				(rte_int128_t *)&list->head,
> +				(rte_int128_t *)&old_head,
> +				(rte_int128_t *)&new_head,
> +				1, __ATOMIC_RELEASE,
> +				__ATOMIC_RELAXED);
The success memory order can be RELAXED, as the store to list->len.cnt is RELEASE. A sketch of the push loop with these changes is included after the end of this function.

> +	} while (success == 0);
> +
> +	/* Ensure the stack modifications are not reordered with respect
> +	 * to the LIFO len update.
> +	 */
> +	__atomic_add_fetch(&list->len.cnt, num, __ATOMIC_RELEASE);
> +#endif
> +}
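
Putting the torn-read, fence, and memory-order comments together, the push path I have in mind would look roughly like this (untested sketch on top of this patch, using the same structures and the rte_atomic128_cmp_exchange() prototype quoted above):

	struct rte_stack_lf_head old_head;
	int success;

	/* Relaxed, possibly torn read; a stale or torn value is corrected
	 * when the CAS below fails and refreshes old_head.
	 */
	old_head = list->head;

	do {
		struct rte_stack_lf_head new_head;

		/* Ensure the accesses below are not reordered before the
		 * earlier read of list->head (plain read above, or the
		 * failed CAS that refreshed old_head).
		 */
		__atomic_thread_fence(__ATOMIC_ACQUIRE);

		/* Swing the top pointer to the first element in the list and
		 * make the last element point to the old top.
		 */
		new_head.top = first;
		new_head.cnt = old_head.cnt + 1;

		last->next = old_head.top;

		/* Both orders relaxed; the release ordering for the element
		 * writes is provided by the list->len.cnt update below.
		 */
		success = rte_atomic128_cmp_exchange(
				(rte_int128_t *)&list->head,
				(rte_int128_t *)&old_head,
				(rte_int128_t *)&new_head,
				1, __ATOMIC_RELAXED,
				__ATOMIC_RELAXED);
	} while (success == 0);

	__atomic_add_fetch(&list->len.cnt, num, __ATOMIC_RELEASE);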
> +
> +static __rte_always_inline struct rte_stack_lf_elem *
> +__rte_stack_lf_pop(struct rte_stack_lf_list *list,
> +		   unsigned int num,
> +		   void **obj_table,
> +		   struct rte_stack_lf_elem **last)
> +{
> +#ifndef RTE_ARCH_X86_64
> +	RTE_SET_USED(obj_table);
> +	RTE_SET_USED(last);
> +	RTE_SET_USED(list);
> +	RTE_SET_USED(num);
> +
> +	return NULL;
> +#else
> +	struct rte_stack_lf_head old_head;
> +	int success;
> +
> +	/* Reserve num elements, if available */
> +	while (1) {
> +		uint64_t len = __atomic_load_n(&list->len.cnt,
> +					       __ATOMIC_ACQUIRE);
This load can be moved outside the loop; on failure, __atomic_compare_exchange_n() updates 'len' with the current value of list->len.cnt, so there is no need to re-load it on every iteration.
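
Something like this (hypothetical sketch, keeping the memory orders as in the patch):

	/* Reserve num elements, if available */
	uint64_t len = __atomic_load_n(&list->len.cnt, __ATOMIC_ACQUIRE);

	while (1) {
		/* Does the list contain enough elements? */
		if (unlikely(len < num))
			return NULL;

		/* On failure, 'len' is refreshed with the current value of
		 * list->len.cnt, so no re-load is needed inside the loop.
		 */
		if (__atomic_compare_exchange_n(&list->len.cnt,
						&len, len - num,
						0, __ATOMIC_RELAXED,
						__ATOMIC_RELAXED))
			break;
	}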

> +
> +		/* Does the list contain enough elements? */
> +		if (unlikely(len < num))
> +			return NULL;
> +
> +		if (__atomic_compare_exchange_n(&list->len.cnt,
> +						&len, len - num,
> +						0, __ATOMIC_RELAXED,
> +						__ATOMIC_RELAXED))
> +			break;
> +	}
> +
> +#ifndef RTE_ARCH_X86_64
> +	/* Use the acquire memmodel to ensure the reads to the LF LIFO elements
> +	 * are properly ordered with respect to the head pointer read.
> +	 *
> +	 * Note that for aarch64, GCC's implementation of __atomic_load_16 in
> +	 * libatomic uses locks, and so this function should be replaced by
> +	 * a new function (e.g. "rte_atomic128_load()").
aarch64 does not have 'pure' atomic 128b load instructions. They have to be implemented using load/store exclusives.

> +	 */
> +	__atomic_load((volatile __int128 *)&list->head,
> +		      &old_head,
> +		      __ATOMIC_ACQUIRE);
Since we know that x86/aarch64 (power?) cannot implement pure atomic 128b loads, should we just use relaxed reads and accept the possibility of torn reads on all architectures? Then we can use an acquire fence to prevent the reordering (see below).
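
The arch-specific #ifndef/#else around the head load could then collapse to something like this (sketch only):

	/* A plain, possibly torn read is sufficient on every architecture:
	 * if a torn value is observed, the CAS below fails and old_head is
	 * refreshed with the correct/latest value. Ordering relative to the
	 * element reads is provided by the acquire fence inside the loop.
	 */
	old_head = list->head;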

> +#else
> +	/* x86-64 does not require an atomic load here; if a torn read occurs,
IMO, we should not make architecture-specific distinctions here, as this algorithm is based on the C11 memory model.

> +	 * the CAS will fail and set old_head to the correct/latest value.
> +	 */
> +	old_head = list->head;
> +#endif
> +
> +	/* Pop num elements */
> +	do {
> +		struct rte_stack_lf_head new_head;
> +		struct rte_stack_lf_elem *tmp;
> +		unsigned int i;
> +
We can add __atomic_thread_fence(__ATOMIC_ACQUIRE) here.

> +		rte_prefetch0(old_head.top);
> +
> +		tmp = old_head.top;
> +
> +		/* Traverse the list to find the new head. A next pointer will
> +		 * either point to another element or NULL; if a thread
> +		 * encounters a pointer that has already been popped, the CAS
> +		 * will fail.
> +		 */
> +		for (i = 0; i < num && tmp != NULL; i++) {
> +			rte_prefetch0(tmp->next);
> +			if (obj_table)
> +				obj_table[i] = tmp->data;
> +			if (last)
> +				*last = tmp;
> +			tmp = tmp->next;
> +		}
> +
> +		/* If NULL was encountered, the list was modified while
> +		 * traversing it. Retry.
> +		 */
> +		if (i != num)
> +			continue;
> +
> +		new_head.top = tmp;
> +		new_head.cnt = old_head.cnt + 1;
> +
> +		success = rte_atomic128_cmp_exchange(
> +				(rte_int128_t *)&list->head,
> +				(rte_int128_t *)&old_head,
> +				(rte_int128_t *)&new_head,
> +				1, __ATOMIC_ACQUIRE,
> +				__ATOMIC_ACQUIRE);
The success order should be __ATOMIC_RELEASE, as the write to list->len.cnt is relaxed.
The failure order can be __ATOMIC_RELAXED if the thread fence is added. A sketch of the resulting pop loop is at the end of this mail.

> +	} while (success == 0);
> +
> +	return old_head.top;
> +#endif
> +}
> +
> +#endif /* _RTE_STACK_C11_MEM_H_ */
> --
> 2.13.6
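
Putting the pop-side comments together (relaxed head read, acquire fence in the loop, RELEASE/RELAXED CAS orders), the CAS loop I have in mind would look roughly like this untested sketch, reusing the patch's declarations of old_head, success, obj_table, last and num:

	/* Relaxed, possibly torn read; corrected by the CAS retry below. */
	old_head = list->head;

	/* Pop num elements */
	do {
		struct rte_stack_lf_head new_head;
		struct rte_stack_lf_elem *tmp;
		unsigned int i;

		/* Ensure the element reads below are not reordered before the
		 * relaxed read of list->head (or the failed CAS that refreshed
		 * old_head).
		 */
		__atomic_thread_fence(__ATOMIC_ACQUIRE);

		rte_prefetch0(old_head.top);

		tmp = old_head.top;

		/* Traverse the list to find the new head. A next pointer will
		 * either point to another element or NULL; if a thread
		 * encounters a pointer that has already been popped, the CAS
		 * will fail.
		 */
		for (i = 0; i < num && tmp != NULL; i++) {
			rte_prefetch0(tmp->next);
			if (obj_table)
				obj_table[i] = tmp->data;
			if (last)
				*last = tmp;
			tmp = tmp->next;
		}

		/* If NULL was encountered, the list was modified while
		 * traversing it. Retry.
		 */
		if (i != num)
			continue;

		new_head.top = tmp;
		new_head.cnt = old_head.cnt + 1;

		/* RELEASE on success because the earlier list->len.cnt update
		 * is relaxed; RELAXED on failure because the acquire fence at
		 * the top of the loop provides the ordering for the retry.
		 */
		success = rte_atomic128_cmp_exchange(
				(rte_int128_t *)&list->head,
				(rte_int128_t *)&old_head,
				(rte_int128_t *)&new_head,
				1, __ATOMIC_RELEASE,
				__ATOMIC_RELAXED);
	} while (success == 0);

	return old_head.top;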


