[dpdk-dev] [PATCH v3 2/5] ring: add a non-blocking implementation
Ola Liljedahl
Ola.Liljedahl at arm.com
Tue Jan 22 15:49:39 CET 2019
(Resending without the confidential footer; I think I figured it out. Please
ignore my previous email in this thread.)
-- Ola
On Fri, 2019-01-18 at 09:23 -0600, Gage Eads wrote:
> This commit adds support for non-blocking circular ring enqueue and dequeue
> functions. The ring uses a 128-bit compare-and-swap instruction, and thus
> is currently limited to x86_64.
>
> The algorithm is based on the original rte ring (derived from FreeBSD's
> bufring.h) and inspired by Michael and Scott's non-blocking concurrent
> queue. Importantly, it adds a modification counter to each ring entry to
> ensure only one thread can write to an unused entry.
>
> -----
> Algorithm:
>
> Multi-producer non-blocking enqueue:
> 1. Move the producer head index 'n' locations forward, effectively
> reserving 'n' locations.
> 2. For each pointer:
> a. Read the producer tail index, then ring[tail]. If ring[tail]'s
> modification counter isn't 'tail', retry.
> b. Construct the new entry: {pointer, tail + ring size}
> c. Compare-and-swap the old entry with the new. If unsuccessful, the
> next loop iteration will try to enqueue this pointer again.
> d. Compare-and-swap the tail index with 'tail + 1', whether or not step 2c
> succeeded. This guarantees threads can make forward progress.
>
> Multi-consumer non-blocking dequeue:
> 1. Move the consumer head index 'n' locations forward, effectively
> reserving 'n' pointers to be dequeued.
> 2. Copy 'n' pointers into the caller's object table (ignoring the
> modification counter), starting from ring[tail], then compare-and-swap
> the tail index with 'tail + n'. If unsuccessful, repeat step 2.
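For readers following along, the enqueue steps above can be sketched in plain
C. This is a single-threaded model only (no actual CAS; the names
`nb_enqueue_one` and `RING_SIZE` are mine, not from the patch); it just shows
how the per-entry modification counter gates writes:

```c
#include <stdint.h>
#include <stddef.h>

#define RING_SIZE 8 /* must be a power of 2; arbitrary for this sketch */

struct nb_entry {
	void *ptr;    /* data pointer */
	uint64_t cnt; /* modification counter */
};

static struct nb_entry ring[RING_SIZE];
static uint64_t prod_tail;

static void ring_init(void)
{
	uint64_t i;

	/* Entry i expects counter value i before its first write (step 2a). */
	for (i = 0; i < RING_SIZE; i++)
		ring[i].cnt = i;
	prod_tail = 0;
}

/* Single-threaded model of steps 2a-2d; the real code would perform 2c and
 * 2d with a 128-bit and a 64-bit compare-and-swap, respectively.
 */
static int nb_enqueue_one(void *obj)
{
	uint64_t tail = prod_tail;
	struct nb_entry *e = &ring[tail & (RING_SIZE - 1)];

	if (e->cnt != tail)	/* step 2a: stale entry, a CAS loop retries */
		return 0;
	e->ptr = obj;		/* steps 2b-2c: install {pointer, tail + size} */
	e->cnt = tail + RING_SIZE;
	prod_tail = tail + 1;	/* step 2d: advance the tail */
	return 1;
}
```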
>
> -----
> Discussion:
>
> There are two cases where the ABA problem is mitigated:
> 1. Enqueueing a pointer to the ring: without a modification counter
> tied to the tail index, the index could become stale by the time the
> enqueue happens, causing it to overwrite valid data. Tying the
> counter to the tail index gives us an expected value (as opposed to,
> say, a monotonically incrementing counter).
>
> Since the counter will eventually wrap, there is potential for the ABA
> problem. However, using a 64-bit counter makes this likelihood
> effectively zero.
>
> 2. Updating a tail index: the ABA problem can occur if the thread is
> preempted and the tail index wraps around. However, using 64-bit indexes
> makes this likelihood effectively zero.
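For a sense of scale on "effectively zero": at a billion tail updates per
second, a 64-bit counter still takes centuries to wrap. A back-of-the-envelope
calculation (mine, not from the patch):

```c
/* Time for 2^64 counter increments at the given rate, in years. */
static double wrap_years(double ops_per_sec)
{
	const double two_pow_64 = 18446744073709551616.0; /* 2^64 */
	const double secs_per_year = 3600.0 * 24.0 * 365.0;

	return two_pow_64 / ops_per_sec / secs_per_year;
}
```

At 10^9 updates/second this works out to roughly 585 years, so a wrap during
one preemption window is not a practical concern.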
>
> With no contention, an enqueue of n pointers uses (1 + 2n) CAS operations
> and a dequeue of n pointers uses 2. This algorithm has worse average-case
> performance than the regular rte ring (particularly a highly-contended ring
> with large bulk accesses), however:
> - For applications with preemptible pthreads, the regular rte ring's
> worst-case performance (i.e. one thread being preempted in the
> update_tail() critical section) is much worse than the non-blocking
> ring's.
> - Software caching can mitigate the average case performance for ring-based
> algorithms. For example, a non-blocking ring based mempool (a likely use
> case for this ring) with per-thread caching.
>
> The non-blocking ring is enabled via a new flag, RING_F_NB. Because the
> ring's memsize is now a function of its flags (the non-blocking ring
> requires 128b for each entry), this commit adds a new argument ('flags') to
> rte_ring_get_memsize(). An API deprecation notice will be sent in a
> separate commit.
>
> For ease-of-use, existing ring enqueue and dequeue functions work on both
> regular and non-blocking rings. This introduces an additional branch in
> the datapath, but this should be a highly predictable branch.
> ring_perf_autotest shows a negligible performance impact; it's hard to
> distinguish a real difference versus system noise.
>
> Test                               | ring_perf_autotest cycles (with branch - without branch)
> ------------------------------------------------------------------
> SP/SC single enq/dequeue | 0.33
> MP/MC single enq/dequeue | -4.00
> SP/SC burst enq/dequeue (size 8) | 0.00
> MP/MC burst enq/dequeue (size 8) | 0.00
> SP/SC burst enq/dequeue (size 32) | 0.00
> MP/MC burst enq/dequeue (size 32) | 0.00
> SC empty dequeue | 1.00
> MC empty dequeue | 0.00
>
> Single lcore:
> SP/SC bulk enq/dequeue (size 8) | 0.49
> MP/MC bulk enq/dequeue (size 8) | 0.08
> SP/SC bulk enq/dequeue (size 32) | 0.07
> MP/MC bulk enq/dequeue (size 32) | 0.09
>
> Two physical cores:
> SP/SC bulk enq/dequeue (size 8) | 0.19
> MP/MC bulk enq/dequeue (size 8) | -0.37
> SP/SC bulk enq/dequeue (size 32) | 0.09
> MP/MC bulk enq/dequeue (size 32) | -0.05
>
> Two NUMA nodes:
> SP/SC bulk enq/dequeue (size 8) | -1.96
> MP/MC bulk enq/dequeue (size 8) | 0.88
> SP/SC bulk enq/dequeue (size 32) | 0.10
> MP/MC bulk enq/dequeue (size 32) | 0.46
>
> Test setup: x86_64 build with default config, dual-socket Xeon E5-2699 v4,
> running on isolcpus cores with a tickless scheduler. Each test was run three
> times and the results averaged.
>
> Signed-off-by: Gage Eads <gage.eads at intel.com>
> ---
> lib/librte_ring/rte_ring.c | 72 ++++-
> lib/librte_ring/rte_ring.h | 550 +++++++++++++++++++++++++++++++++++++--
> lib/librte_ring/rte_ring_version.map | 7 +
> 3 files changed, 587 insertions(+), 42 deletions(-)
>
> diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
> index d215acecc..f3378dccd 100644
> --- a/lib/librte_ring/rte_ring.c
> +++ b/lib/librte_ring/rte_ring.c
> @@ -45,9 +45,9 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
>
> /* return the size of memory occupied by a ring */
> ssize_t
> -rte_ring_get_memsize(unsigned count)
> +rte_ring_get_memsize_v1905(unsigned int count, unsigned int flags)
> {
> - ssize_t sz;
> + ssize_t sz, elt_sz;
>
> /* count must be a power of 2 */
> if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
> @@ -57,10 +57,23 @@ rte_ring_get_memsize(unsigned count)
> return -EINVAL;
> }
>
> - sz = sizeof(struct rte_ring) + count * sizeof(void *);
> + elt_sz = (flags & RING_F_NB) ? 2 * sizeof(void *) : sizeof(void *);
> +
> + sz = sizeof(struct rte_ring) + count * elt_sz;
> sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> return sz;
> }
> +BIND_DEFAULT_SYMBOL(rte_ring_get_memsize, _v1905, 19.05);
> +MAP_STATIC_SYMBOL(ssize_t rte_ring_get_memsize(unsigned int count,
> + unsigned int flags),
> + rte_ring_get_memsize_v1905);
> +
> +ssize_t
> +rte_ring_get_memsize_v20(unsigned int count)
> +{
> + return rte_ring_get_memsize_v1905(count, 0);
> +}
> +VERSION_SYMBOL(rte_ring_get_memsize, _v20, 2.0);
>
> int
> rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
> @@ -82,8 +95,6 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
> if (ret < 0 || ret >= (int)sizeof(r->name))
> return -ENAMETOOLONG;
> r->flags = flags;
> - r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
> - r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
>
> if (flags & RING_F_EXACT_SZ) {
> r->size = rte_align32pow2(count + 1);
> @@ -100,8 +111,30 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
> r->mask = count - 1;
> r->capacity = r->mask;
> }
> - r->prod.head = r->cons.head = 0;
> - r->prod.tail = r->cons.tail = 0;
> +
> + if (flags & RING_F_NB) {
> + uint64_t i;
> +
> + r->prod_64.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
> + r->cons_64.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
> + r->prod_64.head = r->cons_64.head = 0;
> + r->prod_64.tail = r->cons_64.tail = 0;
> +
> + for (i = 0; i < r->size; i++) {
> + struct nb_ring_entry *ring_ptr, *base;
> +
> + base = ((struct nb_ring_entry *)&r[1]);
> +
> + ring_ptr = &base[i & r->mask];
> +
> + ring_ptr->cnt = i;
> + }
> + } else {
> + r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
> + r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
> + r->prod.head = r->cons.head = 0;
> + r->prod.tail = r->cons.tail = 0;
> + }
>
> return 0;
> }
> @@ -123,11 +156,19 @@ rte_ring_create(const char *name, unsigned count, int socket_id,
>
> ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
>
> +#if !defined(RTE_ARCH_X86_64)
> + if (flags & RING_F_NB) {
> + printf("RING_F_NB is only supported on x86-64 platforms\n");
> + rte_errno = EINVAL;
> + return NULL;
> + }
> +#endif
> +
> /* for an exact size ring, round up from count to a power of two */
> if (flags & RING_F_EXACT_SZ)
> count = rte_align32pow2(count + 1);
>
> - ring_size = rte_ring_get_memsize(count);
> + ring_size = rte_ring_get_memsize(count, flags);
> if (ring_size < 0) {
> rte_errno = ring_size;
> return NULL;
> @@ -227,10 +268,17 @@ rte_ring_dump(FILE *f, const struct rte_ring *r)
> fprintf(f, " flags=%x\n", r->flags);
> fprintf(f, " size=%"PRIu32"\n", r->size);
> fprintf(f, " capacity=%"PRIu32"\n", r->capacity);
> - fprintf(f, " ct=%"PRIu32"\n", r->cons.tail);
> - fprintf(f, " ch=%"PRIu32"\n", r->cons.head);
> - fprintf(f, " pt=%"PRIu32"\n", r->prod.tail);
> - fprintf(f, " ph=%"PRIu32"\n", r->prod.head);
> + if (r->flags & RING_F_NB) {
> + fprintf(f, " ct=%"PRIu64"\n", r->cons_64.tail);
> + fprintf(f, " ch=%"PRIu64"\n", r->cons_64.head);
> + fprintf(f, " pt=%"PRIu64"\n", r->prod_64.tail);
> + fprintf(f, " ph=%"PRIu64"\n", r->prod_64.head);
> + } else {
> + fprintf(f, " ct=%"PRIu32"\n", r->cons.tail);
> + fprintf(f, " ch=%"PRIu32"\n", r->cons.head);
> + fprintf(f, " pt=%"PRIu32"\n", r->prod.tail);
> + fprintf(f, " ph=%"PRIu32"\n", r->prod.head);
> + }
> fprintf(f, " used=%u\n", rte_ring_count(r));
> fprintf(f, " avail=%u\n", rte_ring_free_count(r));
> }
> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> index b270a4746..08c9de6a6 100644
> --- a/lib/librte_ring/rte_ring.h
> +++ b/lib/librte_ring/rte_ring.h
> @@ -134,6 +134,18 @@ struct rte_ring {
> */
> #define RING_F_EXACT_SZ 0x0004
> #define RTE_RING_SZ_MASK (0x7fffffffU) /**< Ring size mask */
> +/**
> + * The ring uses non-blocking enqueue and dequeue functions. These functions
> + * do not have the "non-preemptive" constraint of a regular rte ring, and thus
> + * are suited for applications using preemptible pthreads. However, the
> + * non-blocking functions have worse average-case performance than their
> + * regular rte ring counterparts. When used as the handler for a mempool,
> + * per-thread caching can mitigate the performance difference by reducing the
> + * number (and contention) of ring accesses.
> + *
> + * This flag is only supported on x86_64 platforms.
> + */
> +#define RING_F_NB 0x0008
>
> /* @internal defines for passing to the enqueue dequeue worker functions */
> #define __IS_SP 1
> @@ -151,11 +163,15 @@ struct rte_ring {
> *
> * @param count
> * The number of elements in the ring (must be a power of 2).
> + * @param flags
> + * The flags the ring will be created with.
> * @return
> * - The memory size needed for the ring on success.
> * - -EINVAL if count is not a power of 2.
> */
> -ssize_t rte_ring_get_memsize(unsigned count);
> +ssize_t rte_ring_get_memsize(unsigned int count, unsigned int flags);
> +ssize_t rte_ring_get_memsize_v20(unsigned int count);
> +ssize_t rte_ring_get_memsize_v1905(unsigned int count, unsigned int flags);
>
> /**
> * Initialize a ring structure.
> @@ -188,6 +204,10 @@ ssize_t rte_ring_get_memsize(unsigned count);
> * - RING_F_SC_DEQ: If this flag is set, the default behavior when
> * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
> * is "single-consumer". Otherwise, it is "multi-consumers".
> + * - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2
> + * number, but up to half the ring space may be wasted.
> + * - RING_F_NB: (x86_64 only) If this flag is set, the ring uses
> + * non-blocking variants of the dequeue and enqueue functions.
> * @return
> * 0 on success, or a negative value on error.
> */
> @@ -223,12 +243,17 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
> * - RING_F_SC_DEQ: If this flag is set, the default behavior when
> * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
> * is "single-consumer". Otherwise, it is "multi-consumers".
> + * - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2
> + * number, but up to half the ring space may be wasted.
> + * - RING_F_NB: (x86_64 only) If this flag is set, the ring uses
> + * non-blocking variants of the dequeue and enqueue functions.
> * @return
> * On success, the pointer to the new allocated ring. NULL on error with
> * rte_errno set appropriately. Possible errno values include:
> * - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
> * - E_RTE_SECONDARY - function was called from a secondary process instance
> - * - EINVAL - count provided is not a power of 2
> + * - EINVAL - count provided is not a power of 2, or RING_F_NB is used on an
> + * unsupported platform
> * - ENOSPC - the maximum number of memzones has already been allocated
> * - EEXIST - a memzone with the same name already exists
> * - ENOMEM - no appropriate memory area found in which to create memzone
> @@ -284,6 +309,50 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
> } \
> } while (0)
>
> +/* The actual enqueue of pointers on the ring.
> + * Used only by the single-producer non-blocking enqueue function, but
> + * out-lined here for code readability.
> + */
> +#define ENQUEUE_PTRS_NB(r, ring_start, prod_head, obj_table, n) do { \
> + unsigned int i; \
> + const uint32_t size = (r)->size; \
> + size_t idx = prod_head & (r)->mask; \
> + size_t new_cnt = prod_head + size; \
> + struct nb_ring_entry *ring = (struct nb_ring_entry *)ring_start; \
> + if (likely(idx + n < size)) { \
> + for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx += 4) { \
> + ring[idx].ptr = obj_table[i]; \
> + ring[idx].cnt = new_cnt + i; \
> + ring[idx + 1].ptr = obj_table[i + 1]; \
> + ring[idx + 1].cnt = new_cnt + i + 1; \
> + ring[idx + 2].ptr = obj_table[i + 2]; \
> + ring[idx + 2].cnt = new_cnt + i + 2; \
> + ring[idx + 3].ptr = obj_table[i + 3]; \
> + ring[idx + 3].cnt = new_cnt + i + 3; \
> + } \
> + switch (n & 0x3) { \
> + case 3: \
> + ring[idx].cnt = new_cnt + i; \
> + ring[idx++].ptr = obj_table[i++]; /* fallthrough */ \
> + case 2: \
> + ring[idx].cnt = new_cnt + i; \
> + ring[idx++].ptr = obj_table[i++]; /* fallthrough */ \
> + case 1: \
> + ring[idx].cnt = new_cnt + i; \
> + ring[idx++].ptr = obj_table[i++]; \
> + } \
> + } else { \
> + for (i = 0; idx < size; i++, idx++) { \
> + ring[idx].cnt = new_cnt + i; \
> + ring[idx].ptr = obj_table[i]; \
> + } \
> + for (idx = 0; i < n; i++, idx++) { \
> + ring[idx].cnt = new_cnt + i; \
> + ring[idx].ptr = obj_table[i]; \
> + } \
> + } \
> +} while (0)
> +
> /* the actual copy of pointers on the ring to obj_table.
> * Placed here since identical code needed in both
> * single and multi consumer dequeue functions */
> @@ -315,6 +384,39 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
> } \
> } while (0)
>
> +/* The actual copy of pointers on the ring to obj_table.
> + * Placed here since identical code needed in both
> + * single and multi consumer non-blocking dequeue functions.
> + */
> +#define DEQUEUE_PTRS_NB(r, ring_start, cons_head, obj_table, n) do { \
> + unsigned int i; \
> + size_t idx = cons_head & (r)->mask; \
> + const uint32_t size = (r)->size; \
> + struct nb_ring_entry *ring = (struct nb_ring_entry *)ring_start; \
> + if (likely(idx + n < size)) { \
> + for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4) {\
> + obj_table[i] = ring[idx].ptr; \
> + obj_table[i + 1] = ring[idx + 1].ptr; \
> + obj_table[i + 2] = ring[idx + 2].ptr; \
> + obj_table[i + 3] = ring[idx + 3].ptr; \
> + } \
> + switch (n & 0x3) { \
> + case 3: \
> + obj_table[i++] = ring[idx++].ptr; /* fallthrough */ \
> + case 2: \
> + obj_table[i++] = ring[idx++].ptr; /* fallthrough */ \
> + case 1: \
> + obj_table[i++] = ring[idx++].ptr; \
> + } \
> + } else { \
> + for (i = 0; idx < size; i++, idx++) \
> + obj_table[i] = ring[idx].ptr; \
> + for (idx = 0; i < n; i++, idx++) \
> + obj_table[i] = ring[idx].ptr; \
> + } \
> +} while (0)
> +
> +
> /* Between load and load. there might be cpu reorder in weak model
> * (powerpc/arm).
> * There are 2 choices for the users
> @@ -331,6 +433,319 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
> #endif
> #include "rte_ring_generic_64.h"
>
> +/* @internal 128-bit structure used by the non-blocking ring */
> +struct nb_ring_entry {
> + void *ptr; /**< Data pointer */
> + uint64_t cnt; /**< Modification counter */
Why not make 'cnt' a uintptr_t? That way 32-bit architectures would also be
supported. I think there are some claims that DPDK still supports e.g. ARMv7-A
and possibly also 32-bit x86?
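Something like this (hypothetical, untested): with uintptr_t the entry is two
pointer widths on every target, so a plain 64-bit CAS would suffice on 32-bit
while 64-bit targets keep using the 128-bit CAS:

```c
#include <stdint.h>

/* Hypothetical portable entry layout; on common ABIs it packs to exactly
 * two pointer widths, i.e. 8 bytes on 32-bit and 16 bytes on 64-bit.
 */
struct nb_ring_entry_portable {
	void *ptr;     /* data pointer */
	uintptr_t cnt; /* modification counter, pointer-sized */
};
```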
> +};
> +
> +/* The non-blocking ring algorithm is based on the original rte ring (derived
> + * from FreeBSD's bufring.h) and inspired by Michael and Scott's non-blocking
> + * concurrent queue.
> + */
> +
> +/**
> + * @internal
> + * Enqueue several objects on the non-blocking ring (single-producer only)
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @param obj_table
> + * A pointer to a table of void * pointers (objects).
> + * @param n
> + * The number of objects to add in the ring from the obj_table.
> + * @param behavior
> + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring
> + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
> + * @param free_space
> + * returns the amount of space after the enqueue operation has finished
> + * @return
> + * Actual number of objects enqueued.
> + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_enqueue_sp(struct rte_ring *r, void * const *obj_table,
> + unsigned int n,
> + enum rte_ring_queue_behavior behavior,
> + unsigned int *free_space)
> +{
> + uint32_t free_entries;
> + size_t head, next;
> +
> + n = __rte_ring_move_prod_head_64(r, 1, n, behavior,
> + &head, &next, &free_entries);
> + if (n == 0)
> + goto end;
> +
> + ENQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
> +
> + r->prod_64.tail += n;
Don't we need release ordering (or an smp_wmb) between writing the ring
pointers and updating the tail? By updating the tail pointer, we are
synchronising with a consumer.
I prefer using __atomic operations even for loads and stores. Then you can see
which parts of the code synchronise with each other, e.g. a store-release to
some location synchronises with a load-acquire from the same location. If you
don't know how different threads synchronise with each other, you are very
likely to make mistakes.
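Concretely, I mean something along these lines (GCC __atomic builtins; the
function names are mine, just for illustration):

```c
#include <stdint.h>

static uint64_t prod_tail;

/* Producer: publish the new tail only after the entry writes are done.
 * The store-release orders the preceding ring[] stores before the tail
 * update becomes visible.
 */
static void publish_tail(uint64_t new_tail)
{
	__atomic_store_n(&prod_tail, new_tail, __ATOMIC_RELEASE);
}

/* Consumer: the load-acquire pairs with the store-release above, so every
 * entry covered by the observed tail is guaranteed visible after this load.
 */
static uint64_t observe_tail(void)
{
	return __atomic_load_n(&prod_tail, __ATOMIC_ACQUIRE);
}
```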
> +
> +end:
> + if (free_space != NULL)
> + *free_space = free_entries - n;
> + return n;
> +}
> +
> +/**
> + * @internal
> + * Enqueue several objects on the non-blocking ring (multi-producer safe)
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @param obj_table
> + * A pointer to a table of void * pointers (objects).
> + * @param n
> + * The number of objects to add in the ring from the obj_table.
> + * @param behavior
> + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring
> + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
> + * @param free_space
> + * returns the amount of space after the enqueue operation has finished
> + * @return
> + * Actual number of objects enqueued.
> + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_enqueue_mp(struct rte_ring *r, void * const *obj_table,
> + unsigned int n,
> + enum rte_ring_queue_behavior behavior,
> + unsigned int *free_space)
> +{
> +#if !defined(RTE_ARCH_X86_64) || !defined(ALLOW_EXPERIMENTAL_API)
> + RTE_SET_USED(r);
> + RTE_SET_USED(obj_table);
> + RTE_SET_USED(n);
> + RTE_SET_USED(behavior);
> + RTE_SET_USED(free_space);
> +#ifndef ALLOW_EXPERIMENTAL_API
> + printf("[%s()] RING_F_NB requires an experimental API."
> + " Recompile with ALLOW_EXPERIMENTAL_API to use it.\n"
> + , __func__);
> +#endif
> + return 0;
> +#endif
> +#if defined(RTE_ARCH_X86_64) && defined(ALLOW_EXPERIMENTAL_API)
> + size_t head, next, tail;
> + uint32_t free_entries;
> + unsigned int i;
> +
> + n = __rte_ring_move_prod_head_64(r, 0, n, behavior,
> + &head, &next, &free_entries);
> + if (n == 0)
> + goto end;
> +
> + for (i = 0; i < n; /* i incremented if enqueue succeeds */) {
> + struct nb_ring_entry old_value, new_value;
> + struct nb_ring_entry *ring_ptr;
> +
> + /* Enqueue to the tail entry. If another thread wins the race,
> + * retry with the new tail.
> + */
> + tail = r->prod_64.tail;
> +
> + ring_ptr = &((struct nb_ring_entry *)&r[1])[tail & r->mask];
This is an ugly expression and cast, and I think it is unnecessary. What's
preventing this from being written without a cast? Perhaps the ring array needs
to be a union of "void *" and struct nb_ring_entry?
> +
> + old_value = *ring_ptr;
> +
> + /* If the tail entry's modification counter doesn't match the
> + * producer tail index, it's already been updated.
> + */
> + if (old_value.cnt != tail)
> + continue;
'continue' restarts the loop at the condition test in the for statement, with
'i' and 'n' unchanged. We then re-read 'prod_64.tail' and
'ring[tail & mask]'. If some other thread never updates 'prod_64.tail', the
test here (ring[tail].cnt != tail) will still be true and we will spin
forever. Waiting for other threads <=> blocking behaviour, so this is not a
non-blocking design.
> +
> + /* Prepare the new entry. The cnt field mitigates the ABA
> + * problem on the ring write.
> + */
> + new_value.ptr = obj_table[i];
> + new_value.cnt = tail + r->size;
> +
> + if (rte_atomic128_cmpset((volatile rte_int128_t *)ring_ptr,
> + (rte_int128_t *)&old_value,
> + (rte_int128_t *)&new_value))
> + i++;
> +
> + /* Every thread attempts the cmpset, so they don't have to wait
> + * for the thread that successfully enqueued to the ring.
> + * Using a 64-bit tail mitigates the ABA problem here.
> + *
> + * Built-in used to handle variable-sized tail index.
> + */
But prod_64.tail is 64 bits, so it is not really variable-sized?
> + __sync_bool_compare_and_swap(&r->prod_64.tail, tail, tail + 1);
What memory order is required here? Why not use
__atomic_compare_exchange() with explicit memory-order parameters?
> + }
> +
> +end:
> + if (free_space != NULL)
> + *free_space = free_entries - n;
> + return n;
> +#endif
> +}
> +
> +/**
> + * @internal Enqueue several objects on the non-blocking ring
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @param obj_table
> + * A pointer to a table of void * pointers (objects).
> + * @param n
> + * The number of objects to add in the ring from the obj_table.
> + * @param behavior
> + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring
> + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
> + * @param is_sp
> + * Indicates whether to use single producer or multi-producer head update
> + * @param free_space
> + * returns the amount of space after the enqueue operation has finished
> + * @return
> + * Actual number of objects enqueued.
> + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_enqueue(struct rte_ring *r, void * const *obj_table,
> + unsigned int n, enum rte_ring_queue_behavior behavior,
> + unsigned int is_sp, unsigned int *free_space)
> +{
> + if (is_sp)
> + return __rte_ring_do_nb_enqueue_sp(r, obj_table, n,
> + behavior, free_space);
> + else
> + return __rte_ring_do_nb_enqueue_mp(r, obj_table, n,
> + behavior, free_space);
> +}
> +
> +/**
> + * @internal
> + * Dequeue several objects from the non-blocking ring (single-consumer only)
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @param obj_table
> + * A pointer to a table of void * pointers (objects).
> + * @param n
> + * The number of objects to pull from the ring.
> + * @param behavior
> + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring
> + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
> + * @param available
> + * returns the number of remaining ring entries after the dequeue has finished
> + * @return
> + * - Actual number of objects dequeued.
> + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_dequeue_sc(struct rte_ring *r, void **obj_table,
> + unsigned int n,
> + enum rte_ring_queue_behavior behavior,
> + unsigned int *available)
> +{
> + size_t head, next;
> + uint32_t entries;
> +
> + n = __rte_ring_move_cons_head_64(r, 1, n, behavior,
> + &head, &next, &entries);
> + if (n == 0)
> + goto end;
> +
> + DEQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
> +
> + r->cons_64.tail += n;
Memory ordering? Consumer synchronises with producer.
> +
> +end:
> + if (available != NULL)
> + *available = entries - n;
> + return n;
> +}
> +
> +/**
> + * @internal
> + * Dequeue several objects from the non-blocking ring (multi-consumer safe)
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @param obj_table
> + * A pointer to a table of void * pointers (objects).
> + * @param n
> + * The number of objects to pull from the ring.
> + * @param behavior
> + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring
> + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
> + * @param available
> + * returns the number of remaining ring entries after the dequeue has finished
> + * @return
> + * - Actual number of objects dequeued.
> + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_dequeue_mc(struct rte_ring *r, void **obj_table,
> + unsigned int n,
> + enum rte_ring_queue_behavior behavior,
> + unsigned int *available)
> +{
> + size_t head, next;
> + uint32_t entries;
> +
> + n = __rte_ring_move_cons_head_64(r, 0, n, behavior,
> + &head, &next, &entries);
> + if (n == 0)
> + goto end;
> +
> + while (1) {
> + size_t tail = r->cons_64.tail;
> +
> + /* Dequeue from the cons tail onwards. If multiple threads read
> + * the same pointers, the thread that successfully performs the
> + * CAS will keep them and the other(s) will retry.
> + */
> + DEQUEUE_PTRS_NB(r, &r[1], tail, obj_table, n);
> +
> + next = tail + n;
> +
> + /* Built-in used to handle variable-sized tail index. */
> + if (__sync_bool_compare_and_swap(&r->cons_64.tail, tail, next))
> + /* There is potential for the ABA problem here, but
> + * that is mitigated by the large (64-bit) tail.
> + */
> + break;
> + }
> +
> +end:
> + if (available != NULL)
> + *available = entries - n;
> + return n;
> +}
> +
> +/**
> + * @internal Dequeue several objects from the non-blocking ring
> + *
> + * @param r
> + * A pointer to the ring structure.
> + * @param obj_table
> + * A pointer to a table of void * pointers (objects).
> + * @param n
> + * The number of objects to pull from the ring.
> + * @param behavior
> + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring
> + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
> + * @param available
> + * returns the number of remaining ring entries after the dequeue has finished
> + * @return
> + * - Actual number of objects dequeued.
> + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_dequeue(struct rte_ring *r, void **obj_table,
> + unsigned int n, enum rte_ring_queue_behavior behavior,
> + unsigned int is_sc, unsigned int *available)
> +{
> + if (is_sc)
> + return __rte_ring_do_nb_dequeue_sc(r, obj_table, n,
> + behavior, available);
> + else
> + return __rte_ring_do_nb_dequeue_mc(r, obj_table, n,
> + behavior, available);
> +}
> +
> /**
> * @internal Enqueue several objects on the ring
> *
> @@ -438,8 +853,14 @@ static __rte_always_inline unsigned int
> rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
> unsigned int n, unsigned int *free_space)
> {
> - return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> - __IS_MP, free_space);
> + if (r->flags & RING_F_NB)
> + return __rte_ring_do_nb_enqueue(r, obj_table, n,
> + RTE_RING_QUEUE_FIXED, __IS_MP,
> + free_space);
> + else
> + return __rte_ring_do_enqueue(r, obj_table, n,
> + RTE_RING_QUEUE_FIXED, __IS_MP,
> + free_space);
> }
>
> /**
> @@ -461,8 +882,14 @@ static __rte_always_inline unsigned int
> rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
> unsigned int n, unsigned int *free_space)
> {
> - return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> - __IS_SP, free_space);
> + if (r->flags & RING_F_NB)
> + return __rte_ring_do_nb_enqueue(r, obj_table, n,
> + RTE_RING_QUEUE_FIXED, __IS_SP,
> + free_space);
> + else
> + return __rte_ring_do_enqueue(r, obj_table, n,
> + RTE_RING_QUEUE_FIXED, __IS_SP,
> + free_space);
> }
>
> /**
> @@ -488,8 +915,14 @@ static __rte_always_inline unsigned int
> rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
> unsigned int n, unsigned int *free_space)
> {
> - return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> - r->prod.single, free_space);
> + if (r->flags & RING_F_NB)
> + return __rte_ring_do_nb_enqueue(r, obj_table, n,
> + RTE_RING_QUEUE_FIXED,
> + r->prod_64.single, free_space);
> + else
> + return __rte_ring_do_enqueue(r, obj_table, n,
> + RTE_RING_QUEUE_FIXED,
> + r->prod.single, free_space);
> }
>
> /**
> @@ -572,8 +1005,14 @@ static __rte_always_inline unsigned int
> rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
> unsigned int n, unsigned int *available)
> {
> - return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> - __IS_MC, available);
> + if (r->flags & RING_F_NB)
> + return __rte_ring_do_nb_dequeue(r, obj_table, n,
> + RTE_RING_QUEUE_FIXED, __IS_MC,
> + available);
> + else
> + return __rte_ring_do_dequeue(r, obj_table, n,
> + RTE_RING_QUEUE_FIXED, __IS_MC,
> + available);
> }
>
> /**
> @@ -596,8 +1035,14 @@ static __rte_always_inline unsigned int
> rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
> unsigned int n, unsigned int *available)
> {
> - return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> - __IS_SC, available);
> + if (r->flags & RING_F_NB)
> + return __rte_ring_do_nb_dequeue(r, obj_table, n,
> + RTE_RING_QUEUE_FIXED, __IS_SC,
> + available);
> + else
> + return __rte_ring_do_dequeue(r, obj_table, n,
> + RTE_RING_QUEUE_FIXED, __IS_SC,
> + available);
> }
>
> /**
> @@ -623,8 +1068,14 @@ static __rte_always_inline unsigned int
> rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
> unsigned int *available)
> {
> - return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> - r->cons.single, available);
> + if (r->flags & RING_F_NB)
> + return __rte_ring_do_nb_dequeue(r, obj_table, n,
> + RTE_RING_QUEUE_FIXED,
> + r->cons_64.single, available);
> + else
> + return __rte_ring_do_dequeue(r, obj_table, n,
> + RTE_RING_QUEUE_FIXED,
> + r->cons.single, available);
> }
>
> /**
> @@ -699,9 +1150,13 @@ rte_ring_dequeue(struct rte_ring *r, void **obj_p)
> static inline unsigned
> rte_ring_count(const struct rte_ring *r)
> {
> - uint32_t prod_tail = r->prod.tail;
> - uint32_t cons_tail = r->cons.tail;
> - uint32_t count = (prod_tail - cons_tail) & r->mask;
> + uint32_t count;
> +
> + if (r->flags & RING_F_NB)
> + count = (r->prod_64.tail - r->cons_64.tail) & r->mask;
> + else
> + count = (r->prod.tail - r->cons.tail) & r->mask;
> +
> return (count > r->capacity) ? r->capacity : count;
> }
>
> @@ -821,8 +1276,14 @@ static __rte_always_inline unsigned
> rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
> unsigned int n, unsigned int *free_space)
> {
> - return __rte_ring_do_enqueue(r, obj_table, n,
> - RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
> + if (r->flags & RING_F_NB)
> + return __rte_ring_do_nb_enqueue(r, obj_table, n,
> + RTE_RING_QUEUE_VARIABLE,
> + __IS_MP, free_space);
> + else
> + return __rte_ring_do_enqueue(r, obj_table, n,
> + RTE_RING_QUEUE_VARIABLE,
> + __IS_MP, free_space);
> }
>
> /**
> @@ -844,8 +1305,14 @@ static __rte_always_inline unsigned
> rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
> unsigned int n, unsigned int *free_space)
> {
> - return __rte_ring_do_enqueue(r, obj_table, n,
> - RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
> + if (r->flags & RING_F_NB)
> + return __rte_ring_do_nb_enqueue(r, obj_table, n,
> + RTE_RING_QUEUE_VARIABLE,
> + __IS_SP, free_space);
> + else
> + return __rte_ring_do_enqueue(r, obj_table, n,
> + RTE_RING_QUEUE_VARIABLE,
> + __IS_SP, free_space);
> }
>
> /**
> @@ -871,8 +1338,14 @@ static __rte_always_inline unsigned
> rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
> unsigned int n, unsigned int *free_space)
> {
> - return __rte_ring_do_enqueue(r, obj_table, n,
> RTE_RING_QUEUE_VARIABLE,
> - r->prod.single, free_space);
> + if (r->flags & RING_F_NB)
> + return __rte_ring_do_nb_enqueue(r, obj_table, n,
> + RTE_RING_QUEUE_VARIABLE,
> + r->prod_64.single, free_space);
> + else
> + return __rte_ring_do_enqueue(r, obj_table, n,
> + RTE_RING_QUEUE_VARIABLE,
> + r->prod.single, free_space);
> }
>
> /**
> @@ -899,8 +1372,14 @@ static __rte_always_inline unsigned
> rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
> unsigned int n, unsigned int *available)
> {
> - return __rte_ring_do_dequeue(r, obj_table, n,
> - RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
> + if (r->flags & RING_F_NB)
> + return __rte_ring_do_nb_dequeue(r, obj_table, n,
> + RTE_RING_QUEUE_VARIABLE,
> + __IS_MC, available);
> + else
> + return __rte_ring_do_dequeue(r, obj_table, n,
> + RTE_RING_QUEUE_VARIABLE,
> + __IS_MC, available);
> }
>
> /**
> @@ -924,8 +1403,14 @@ static __rte_always_inline unsigned
> rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
> unsigned int n, unsigned int *available)
> {
> - return __rte_ring_do_dequeue(r, obj_table, n,
> - RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
> + if (r->flags & RING_F_NB)
> + return __rte_ring_do_nb_dequeue(r, obj_table, n,
> + RTE_RING_QUEUE_VARIABLE,
> + __IS_SC, available);
> + else
> + return __rte_ring_do_dequeue(r, obj_table, n,
> + RTE_RING_QUEUE_VARIABLE,
> + __IS_SC, available);
> }
>
> /**
> @@ -951,9 +1436,14 @@ static __rte_always_inline unsigned
> rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
> unsigned int n, unsigned int *available)
> {
> - return __rte_ring_do_dequeue(r, obj_table, n,
> - RTE_RING_QUEUE_VARIABLE,
> - r->cons.single, available);
> + if (r->flags & RING_F_NB)
> + return __rte_ring_do_nb_dequeue(r, obj_table, n,
> + RTE_RING_QUEUE_VARIABLE,
> + r->cons_64.single, available);
> + else
> + return __rte_ring_do_dequeue(r, obj_table, n,
> + RTE_RING_QUEUE_VARIABLE,
> + r->cons.single, available);
> }
>
> #ifdef __cplusplus
> diff --git a/lib/librte_ring/rte_ring_version.map b/lib/librte_ring/rte_ring_version.map
> index d935efd0d..8969467af 100644
> --- a/lib/librte_ring/rte_ring_version.map
> +++ b/lib/librte_ring/rte_ring_version.map
> @@ -17,3 +17,10 @@ DPDK_2.2 {
> rte_ring_free;
>
> } DPDK_2.0;
> +
> +DPDK_19.05 {
> + global:
> +
> + rte_ring_get_memsize;
> +
> +} DPDK_2.2;
--
Ola Liljedahl, Networking System Architect, Arm
Phone +46706866373, Skype ola.liljedahl