[dpdk-dev] [PATCH v3 6/8] stack: add C11 atomic implementation
Eads, Gage
gage.eads at intel.com
Fri Mar 29 20:24:45 CET 2019
[snip]
> > +static __rte_always_inline void
> > +__rte_stack_lf_push(struct rte_stack_lf_list *list,
> > + struct rte_stack_lf_elem *first,
> > + struct rte_stack_lf_elem *last,
> > + unsigned int num)
> > +{
> > +#ifndef RTE_ARCH_X86_64
> > + RTE_SET_USED(first);
> > + RTE_SET_USED(last);
> > + RTE_SET_USED(list);
> > + RTE_SET_USED(num);
> > +#else
> > + struct rte_stack_lf_head old_head;
> > + int success;
> > +
> > + old_head = list->head;
> This can be a torn read (same as you have mentioned in
> __rte_stack_lf_pop). I suggest we use acquire thread fence here as well
> (please see the comments in __rte_stack_lf_pop).
Agreed. I'll add the acquire fence.
> > +
> > + do {
> > + struct rte_stack_lf_head new_head;
> > +
> We can add __atomic_thread_fence(__ATOMIC_ACQUIRE) here (please see
> the comments in __rte_stack_lf_pop).
Will add the fence here.
> > + /* Swing the top pointer to the first element in the list and
> > + * make the last element point to the old top.
> > + */
> > + new_head.top = first;
> > + new_head.cnt = old_head.cnt + 1;
> > +
> > + last->next = old_head.top;
> > +
> > + /* Use the release memmodel to ensure the writes to the LF LIFO
> > + * elements are visible before the head pointer write.
> > + */
> > + success = rte_atomic128_cmp_exchange(
> > + (rte_int128_t *)&list->head,
> > + (rte_int128_t *)&old_head,
> > + (rte_int128_t *)&new_head,
> > + 1, __ATOMIC_RELEASE,
> > + __ATOMIC_RELAXED);
> Success memory order can be RELAXED as the store to list->len.cnt is
> RELEASE.
The RELEASE success order here ensures that the store to 'last->next' is visible before the head update. The RELEASE in the list->len.cnt store only guarantees that the stores preceding it are visible before the list->len.cnt store; it doesn't order the 'last->next' store with respect to the head update, so we can't rely on it.
> > + } while (success == 0);
> > +
> > + /* Ensure the stack modifications are not reordered with respect
> > + * to the LIFO len update.
> > + */
> > + __atomic_add_fetch(&list->len.cnt, num, __ATOMIC_RELEASE);
> > +#endif
> > +}
> > +
> > +static __rte_always_inline struct rte_stack_lf_elem *
> > +__rte_stack_lf_pop(struct rte_stack_lf_list *list,
> > + unsigned int num,
> > + void **obj_table,
> > + struct rte_stack_lf_elem **last)
> > +{
> > +#ifndef RTE_ARCH_X86_64
> > + RTE_SET_USED(obj_table);
> > + RTE_SET_USED(last);
> > + RTE_SET_USED(list);
> > + RTE_SET_USED(num);
> > +
> > + return NULL;
> > +#else
> > + struct rte_stack_lf_head old_head;
> > + int success;
> > +
> > + /* Reserve num elements, if available */
> > + while (1) {
> > + uint64_t len = __atomic_load_n(&list->len.cnt,
> > + __ATOMIC_ACQUIRE);
> This can be outside the loop.
Good idea. I'll move this out of the loop and change __atomic_compare_exchange_n's failure memory order to ACQUIRE.
> > +
> > + /* Does the list contain enough elements? */
> > + if (unlikely(len < num))
> > + return NULL;
> > +
> > + if (__atomic_compare_exchange_n(&list->len.cnt,
> > + &len, len - num,
> > + 0, __ATOMIC_RELAXED,
> > + __ATOMIC_RELAXED))
> > + break;
> > + }
> > +
> > +#ifndef RTE_ARCH_X86_64
> > + /* Use the acquire memmodel to ensure the reads to the LF LIFO elements
> > + * are properly ordered with respect to the head pointer read.
> > + *
> > + * Note that for aarch64, GCC's implementation of __atomic_load_16 in
> > + * libatomic uses locks, and so this function should be replaced by
> > + * a new function (e.g. "rte_atomic128_load()").
> aarch64 does not have 'pure' atomic 128b load instructions. They have to be
> implemented using load/store exclusives.
>
> > + */
> > + __atomic_load((volatile __int128 *)&list->head,
> > + &old_head,
> > + __ATOMIC_ACQUIRE);
> Since, we know of x86/aarch64 (power?) that cannot implement pure atomic
> 128b loads, should we just use relaxed reads and assume the possibility of
> torn reads for all architectures? Then, we can use acquire fence to prevent
> the reordering (see below)
That's a cleaner solution. I'll implement that, dropping the architecture distinction.
> > +#else
> > + /* x86-64 does not require an atomic load here; if a torn read occurs,
> IMO, we should not make architecture-specific distinctions as this algorithm is
> based on C11 memory model.
>
> > + * the CAS will fail and set old_head to the correct/latest value.
> > + */
> > + old_head = list->head;
> > +#endif
> > +
> > + /* Pop num elements */
> > + do {
> > + struct rte_stack_lf_head new_head;
> > + struct rte_stack_lf_elem *tmp;
> > + unsigned int i;
> > +
> We can add __atomic_thread_fence(__ATOMIC_ACQUIRE) here.
Will do.
> > + rte_prefetch0(old_head.top);
> > +
> > + tmp = old_head.top;
> > +
> > + /* Traverse the list to find the new head. A next pointer will
> > + * either point to another element or NULL; if a thread
> > + * encounters a pointer that has already been popped, the CAS
> > + * will fail.
> > + */
> > + for (i = 0; i < num && tmp != NULL; i++) {
> > + rte_prefetch0(tmp->next);
> > + if (obj_table)
> > + obj_table[i] = tmp->data;
> > + if (last)
> > + *last = tmp;
> > + tmp = tmp->next;
> > + }
> > +
> > + /* If NULL was encountered, the list was modified while
> > + * traversing it. Retry.
> > + */
> > + if (i != num)
> > + continue;
> > +
> > + new_head.top = tmp;
> > + new_head.cnt = old_head.cnt + 1;
> > +
> > + success = rte_atomic128_cmp_exchange(
> > + (rte_int128_t *)&list->head,
> > + (rte_int128_t *)&old_head,
> > + (rte_int128_t *)&new_head,
> > + 1, __ATOMIC_ACQUIRE,
> > + __ATOMIC_ACQUIRE);
> The success order should be __ATOMIC_RELEASE as the write to list->len.cnt
> is relaxed.
> The failure order can be __ATOMIC_RELAXED if the thread fence is added.
Agreed on both counts. Will address.
> > + } while (success == 0);
> > +
> > + return old_head.top;
> > +#endif
> > +}
> > +
> > +#endif /* _RTE_STACK_C11_MEM_H_ */
> > --
> > 2.13.6