[dpdk-dev] [PATCH v3 2/5] ring: add a non-blocking implementation

Ola Liljedahl Ola.Liljedahl at arm.com
Tue Jan 22 11:12:36 CET 2019


On Fri, 2019-01-18 at 09:23 -0600, Gage Eads wrote:
> This commit adds support for non-blocking circular ring enqueue and
> dequeue functions. The ring uses a 128-bit compare-and-swap
> instruction, and thus is currently limited to x86_64.
>
> The algorithm is based on the original rte ring (derived from
> FreeBSD's bufring.h) and inspired by Michael and Scott's non-blocking
> concurrent queue. Importantly, it adds a modification counter to each
> ring entry to ensure only one thread can write to an unused entry.
> -----
> Algorithm:
>
> Multi-producer non-blocking enqueue:
> 1. Move the producer head index 'n' locations forward, effectively
>    reserving 'n' locations.
> 2. For each pointer:
>  a. Read the producer tail index, then ring[tail]. If ring[tail]'s
>     modification counter isn't 'tail', retry.
>  b. Construct the new entry: {pointer, tail + ring size}
>  c. Compare-and-swap the old entry with the new. If unsuccessful, the
>     next loop iteration will try to enqueue this pointer again.
>  d. Compare-and-swap the tail index with 'tail + 1', whether or not
>     step 2c succeeded. This guarantees threads can make forward
>     progress.
>
> Multi-consumer non-blocking dequeue:
> 1. Move the consumer head index 'n' locations forward, effectively
>    reserving 'n' pointers to be dequeued.
> 2. Copy 'n' pointers into the caller's object table (ignoring the
>    modification counter), starting from ring[tail], then
>    compare-and-swap the tail index with 'tail + n'. If unsuccessful,
>    repeat step 2.
>
> -----
> Discussion:
>
> There are two cases where the ABA problem is mitigated:
> 1. Enqueueing a pointer to the ring: without a modification counter
>    tied to the tail index, the index could become stale by the time
>    the enqueue happens, causing it to overwrite valid data. Tying the
>    counter to the tail index gives us an expected value (as opposed
>    to, say, a monotonically incrementing counter).
>
>    Since the counter will eventually wrap, there is potential for the
>    ABA problem. However, using a 64-bit counter makes this likelihood
>    effectively zero.
>
> 2. Updating a tail index: the ABA problem can occur if the thread is
>    preempted and the tail index wraps around. However, using 64-bit
>    indexes makes this likelihood effectively zero.
>
> With no contention, an enqueue of n pointers uses (1 + 2n) CAS
> operations and a dequeue of n pointers uses 2 CAS operations. This
> algorithm has worse average-case performance than the regular rte
> ring (particularly for a highly contended ring with large bulk
> accesses), however:
> - For applications with preemptible pthreads, the regular rte ring's
>   worst-case performance (i.e. one thread being preempted in the
>   update_tail() critical section) is much worse than the non-blocking
>   ring's.
> - Software caching can mitigate the average-case performance penalty
>   for ring-based algorithms, for example a non-blocking ring based
>   mempool (a likely use case for this ring) with per-thread caching.
>
> The non-blocking ring is enabled via a new flag, RING_F_NB. Because
> the ring's memsize is now a function of its flags (the non-blocking
> ring requires 128b for each entry), this commit adds a new argument
> ('flags') to rte_ring_get_memsize(). An API deprecation notice will
> be sent in a separate commit.
>
> For ease of use, the existing ring enqueue and dequeue functions work
> on both regular and non-blocking rings. This introduces an additional
> branch in the datapath, but this should be a highly predictable
> branch. ring_perf_autotest shows a negligible performance impact;
> it's hard to distinguish a real difference from system noise.
>
>                                   | ring_perf_autotest cycles with branch -
>              Test                 |   ring_perf_autotest cycles without
> ------------------------------------------------------------------
> SP/SC single enq/dequeue          | 0.33
> MP/MC single enq/dequeue          | -4.00
> SP/SC burst enq/dequeue (size 8)  | 0.00
> MP/MC burst enq/dequeue (size 8)  | 0.00
> SP/SC burst enq/dequeue (size 32) | 0.00
> MP/MC burst enq/dequeue (size 32) | 0.00
> SC empty dequeue                  | 1.00
> MC empty dequeue                  | 0.00
>
> Single lcore:
> SP/SC bulk enq/dequeue (size 8)   | 0.49
> MP/MC bulk enq/dequeue (size 8)   | 0.08
> SP/SC bulk enq/dequeue (size 32)  | 0.07
> MP/MC bulk enq/dequeue (size 32)  | 0.09
>
> Two physical cores:
> SP/SC bulk enq/dequeue (size 8)   | 0.19
> MP/MC bulk enq/dequeue (size 8)   | -0.37
> SP/SC bulk enq/dequeue (size 32)  | 0.09
> MP/MC bulk enq/dequeue (size 32)  | -0.05
>
> Two NUMA nodes:
> SP/SC bulk enq/dequeue (size 8)   | -1.96
> MP/MC bulk enq/dequeue (size 8)   | 0.88
> SP/SC bulk enq/dequeue (size 32)  | 0.10
> MP/MC bulk enq/dequeue (size 32)  | 0.46
>
> Test setup: x86_64 build with default config, dual-socket Xeon
> E5-2699 v4, running on isolcpus cores with a tickless scheduler. Each
> test was run three times and the results averaged.
>
> Signed-off-by: Gage Eads <gage.eads at intel.com>
> ---
>  lib/librte_ring/rte_ring.c           |  72 ++++-
>  lib/librte_ring/rte_ring.h           | 550 +++++++++++++++++++++++++++++++++--
>  lib/librte_ring/rte_ring_version.map |   7 +
>  3 files changed, 587 insertions(+), 42 deletions(-)
>
> diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
> index d215acecc..f3378dccd 100644
> --- a/lib/librte_ring/rte_ring.c
> +++ b/lib/librte_ring/rte_ring.c
> @@ -45,9 +45,9 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
>
>  /* return the size of memory occupied by a ring */
>  ssize_t
> -rte_ring_get_memsize(unsigned count)
> +rte_ring_get_memsize_v1905(unsigned int count, unsigned int flags)
>  {
> -ssize_t sz;
> +ssize_t sz, elt_sz;
>
>  /* count must be a power of 2 */
>  if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
> @@ -57,10 +57,23 @@ rte_ring_get_memsize(unsigned count)
>  return -EINVAL;
>  }
>
> -sz = sizeof(struct rte_ring) + count * sizeof(void *);
> +elt_sz = (flags & RING_F_NB) ? 2 * sizeof(void *) :
> sizeof(void *);
> +
> +sz = sizeof(struct rte_ring) + count * elt_sz;
>  sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
>  return sz;
>  }
> +BIND_DEFAULT_SYMBOL(rte_ring_get_memsize, _v1905, 19.05);
> +MAP_STATIC_SYMBOL(ssize_t rte_ring_get_memsize(unsigned int count,
> +       unsigned int flags),
> +  rte_ring_get_memsize_v1905);
> +
> +ssize_t
> +rte_ring_get_memsize_v20(unsigned int count)
> +{
> +return rte_ring_get_memsize_v1905(count, 0);
> +}
> +VERSION_SYMBOL(rte_ring_get_memsize, _v20, 2.0);
>
>  int
>  rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
> @@ -82,8 +95,6 @@ rte_ring_init(struct rte_ring *r, const char *name,
> unsigned count,
>  if (ret < 0 || ret >= (int)sizeof(r->name))
>  return -ENAMETOOLONG;
>  r->flags = flags;
> -r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP :
> __IS_MP;
> -r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC :
> __IS_MC;
>
>  if (flags & RING_F_EXACT_SZ) {
>  r->size = rte_align32pow2(count + 1);
> @@ -100,8 +111,30 @@ rte_ring_init(struct rte_ring *r, const char
> *name, unsigned count,
>  r->mask = count - 1;
>  r->capacity = r->mask;
>  }
> -r->prod.head = r->cons.head = 0;
> -r->prod.tail = r->cons.tail = 0;
> +
> +if (flags & RING_F_NB) {
> +uint64_t i;
> +
> +r->prod_64.single = (flags & RING_F_SP_ENQ) ?
> __IS_SP : __IS_MP;
> +r->cons_64.single = (flags & RING_F_SC_DEQ) ?
> __IS_SC : __IS_MC;
> +r->prod_64.head = r->cons_64.head = 0;
> +r->prod_64.tail = r->cons_64.tail = 0;
> +
> +for (i = 0; i < r->size; i++) {
> +struct nb_ring_entry *ring_ptr, *base;
> +
> +base = ((struct nb_ring_entry *)&r[1]);
> +
> +ring_ptr = &base[i & r->mask];
> +
> +ring_ptr->cnt = i;
> +}
> +} else {
> +r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP :
> __IS_MP;
> +r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC :
> __IS_MC;
> +r->prod.head = r->cons.head = 0;
> +r->prod.tail = r->cons.tail = 0;
> +}
>
>  return 0;
>  }
> @@ -123,11 +156,19 @@ rte_ring_create(const char *name, unsigned
> count, int socket_id,
>
>  ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head,
> rte_ring_list);
>
> +#if !defined(RTE_ARCH_X86_64)
> +if (flags & RING_F_NB) {
> +printf("RING_F_NB is only supported on x86-64 platforms\n");
> +rte_errno = EINVAL;
> +return NULL;
> +}
> +#endif
> +
>  /* for an exact size ring, round up from count to a power of
> two */
>  if (flags & RING_F_EXACT_SZ)
>  count = rte_align32pow2(count + 1);
>
> -ring_size = rte_ring_get_memsize(count);
> +ring_size = rte_ring_get_memsize(count, flags);
>  if (ring_size < 0) {
>  rte_errno = ring_size;
>  return NULL;
> @@ -227,10 +268,17 @@ rte_ring_dump(FILE *f, const struct rte_ring
> *r)
>  fprintf(f, "  flags=%x\n", r->flags);
>  fprintf(f, "  size=%"PRIu32"\n", r->size);
>  fprintf(f, "  capacity=%"PRIu32"\n", r->capacity);
> -fprintf(f, "  ct=%"PRIu32"\n", r->cons.tail);
> -fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
> -fprintf(f, "  pt=%"PRIu32"\n", r->prod.tail);
> -fprintf(f, "  ph=%"PRIu32"\n", r->prod.head);
> +if (r->flags & RING_F_NB) {
> +fprintf(f, "  ct=%"PRIu64"\n", r->cons_64.tail);
> +fprintf(f, "  ch=%"PRIu64"\n", r->cons_64.head);
> +fprintf(f, "  pt=%"PRIu64"\n", r->prod_64.tail);
> +fprintf(f, "  ph=%"PRIu64"\n", r->prod_64.head);
> +} else {
> +fprintf(f, "  ct=%"PRIu32"\n", r->cons.tail);
> +fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
> +fprintf(f, "  pt=%"PRIu32"\n", r->prod.tail);
> +fprintf(f, "  ph=%"PRIu32"\n", r->prod.head);
> +}
>  fprintf(f, "  used=%u\n", rte_ring_count(r));
>  fprintf(f, "  avail=%u\n", rte_ring_free_count(r));
>  }
> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> index b270a4746..08c9de6a6 100644
> --- a/lib/librte_ring/rte_ring.h
> +++ b/lib/librte_ring/rte_ring.h
> @@ -134,6 +134,18 @@ struct rte_ring {
>   */
>  #define RING_F_EXACT_SZ 0x0004
>  #define RTE_RING_SZ_MASK  (0x7fffffffU) /**< Ring size mask */
> +/**
> + * The ring uses non-blocking enqueue and dequeue functions. These functions
> + * do not have the "non-preemptive" constraint of a regular rte ring, and thus
> + * are suited for applications using preemptible pthreads. However, the
> + * non-blocking functions have worse average-case performance than their
> + * regular rte ring counterparts. When used as the handler for a mempool,
> + * per-thread caching can mitigate the performance difference by reducing the
> + * number (and contention) of ring accesses.
> + *
> + * This flag is only supported on x86_64 platforms.
> + */
> +#define RING_F_NB 0x0008
>
>  /* @internal defines for passing to the enqueue dequeue worker
> functions */
>  #define __IS_SP 1
> @@ -151,11 +163,15 @@ struct rte_ring {
>   *
>   * @param count
>   *   The number of elements in the ring (must be a power of 2).
> + * @param flags
> + *   The flags the ring will be created with.
>   * @return
>   *   - The memory size needed for the ring on success.
>   *   - -EINVAL if count is not a power of 2.
>   */
> -ssize_t rte_ring_get_memsize(unsigned count);
> +ssize_t rte_ring_get_memsize(unsigned int count, unsigned int
> flags);
> +ssize_t rte_ring_get_memsize_v20(unsigned int count);
> +ssize_t rte_ring_get_memsize_v1905(unsigned int count, unsigned int
> flags);
>
>  /**
>   * Initialize a ring structure.
> @@ -188,6 +204,10 @@ ssize_t rte_ring_get_memsize(unsigned count);
>   *    - RING_F_SC_DEQ: If this flag is set, the default behavior
> when
>   *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
>   *      is "single-consumer". Otherwise, it is "multi-consumers".
> + *    - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2
> + *      number, but up to half the ring space may be wasted.
> + *    - RING_F_NB: (x86_64 only) If this flag is set, the ring uses
> + *      non-blocking variants of the dequeue and enqueue functions.
>   * @return
>   *   0 on success, or a negative value on error.
>   */
> @@ -223,12 +243,17 @@ int rte_ring_init(struct rte_ring *r, const
> char *name, unsigned count,
>   *    - RING_F_SC_DEQ: If this flag is set, the default behavior
> when
>   *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
>   *      is "single-consumer". Otherwise, it is "multi-consumers".
> + *    - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2
> + *      number, but up to half the ring space may be wasted.
> + *    - RING_F_NB: (x86_64 only) If this flag is set, the ring uses
> + *      non-blocking variants of the dequeue and enqueue functions.
>   * @return
>   *   On success, the pointer to the new allocated ring. NULL on
> error with
>   *    rte_errno set appropriately. Possible errno values include:
>   *    - E_RTE_NO_CONFIG - function could not get pointer to
> rte_config structure
>   *    - E_RTE_SECONDARY - function was called from a secondary
> process instance
> - *    - EINVAL - count provided is not a power of 2
> + *    - EINVAL - count provided is not a power of 2, or RING_F_NB is
> used on an
> + *      unsupported platform
>   *    - ENOSPC - the maximum number of memzones has already been
> allocated
>   *    - EEXIST - a memzone with the same name already exists
>   *    - ENOMEM - no appropriate memory area found in which to create
> memzone
> @@ -284,6 +309,50 @@ void rte_ring_dump(FILE *f, const struct
> rte_ring *r);
>  } \
>  } while (0)
>
> +/* The actual enqueue of pointers on the ring.
> + * Used only by the single-producer non-blocking enqueue function,
> but
> + * out-lined here for code readability.
> + */
> +#define ENQUEUE_PTRS_NB(r, ring_start, prod_head, obj_table, n) do {
> \
> +unsigned int i; \
> +const uint32_t size = (r)->size; \
> +size_t idx = prod_head & (r)->mask; \
> +size_t new_cnt = prod_head + size; \
> +struct nb_ring_entry *ring = (struct nb_ring_entry *)ring_start; \
> +if (likely(idx + n < size)) { \
> +for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx += 4) { \
> +ring[idx].ptr = obj_table[i]; \
> +ring[idx].cnt = new_cnt + i;  \
> +ring[idx + 1].ptr = obj_table[i + 1]; \
> +ring[idx + 1].cnt = new_cnt + i + 1;  \
> +ring[idx + 2].ptr = obj_table[i + 2]; \
> +ring[idx + 2].cnt = new_cnt + i + 2;  \
> +ring[idx + 3].ptr = obj_table[i + 3]; \
> +ring[idx + 3].cnt = new_cnt + i + 3;  \
> +} \
> +switch (n & 0x3) { \
> +case 3: \
> +ring[idx].cnt = new_cnt + i; \
> +ring[idx++].ptr = obj_table[i++]; /* fallthrough */ \
> +case 2: \
> +ring[idx].cnt = new_cnt + i; \
> +ring[idx++].ptr = obj_table[i++]; /* fallthrough */ \
> +case 1: \
> +ring[idx].cnt = new_cnt + i; \
> +ring[idx++].ptr = obj_table[i++]; \
> +} \
> +} else { \
> +for (i = 0; idx < size; i++, idx++) { \
> +ring[idx].cnt = new_cnt + i;  \
> +ring[idx].ptr = obj_table[i]; \
> +} \
> +for (idx = 0; i < n; i++, idx++) {    \
> +ring[idx].cnt = new_cnt + i;  \
> +ring[idx].ptr = obj_table[i]; \
> +} \
> +} \
> +} while (0)
> +
>  /* the actual copy of pointers on the ring to obj_table.
>   * Placed here since identical code needed in both
>   * single and multi consumer dequeue functions */
> @@ -315,6 +384,39 @@ void rte_ring_dump(FILE *f, const struct
> rte_ring *r);
>  } \
>  } while (0)
>
> +/* The actual copy of pointers on the ring to obj_table.
> + * Placed here since identical code needed in both
> + * single and multi consumer non-blocking dequeue functions.
> + */
> +#define DEQUEUE_PTRS_NB(r, ring_start, cons_head, obj_table, n) do {
> \
> +unsigned int i; \
> +size_t idx = cons_head & (r)->mask; \
> +const uint32_t size = (r)->size; \
> +struct nb_ring_entry *ring = (struct nb_ring_entry *)ring_start; \
> +if (likely(idx + n < size)) { \
> +for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4) {\
> +obj_table[i] = ring[idx].ptr; \
> +obj_table[i + 1] = ring[idx + 1].ptr; \
> +obj_table[i + 2] = ring[idx + 2].ptr; \
> +obj_table[i + 3] = ring[idx + 3].ptr; \
> +} \
> +switch (n & 0x3) { \
> +case 3: \
> +obj_table[i++] = ring[idx++].ptr; /* fallthrough */ \
> +case 2: \
> +obj_table[i++] = ring[idx++].ptr; /* fallthrough */ \
> +case 1: \
> +obj_table[i++] = ring[idx++].ptr; \
> +} \
> +} else { \
> +for (i = 0; idx < size; i++, idx++) \
> +obj_table[i] = ring[idx].ptr; \
> +for (idx = 0; i < n; i++, idx++) \
> +obj_table[i] = ring[idx].ptr; \
> +} \
> +} while (0)
> +
> +
>  /* Between load and load. there might be cpu reorder in weak model
>   * (powerpc/arm).
>   * There are 2 choices for the users
> @@ -331,6 +433,319 @@ void rte_ring_dump(FILE *f, const struct
> rte_ring *r);
>  #endif
>  #include "rte_ring_generic_64.h"
>
> +/* @internal 128-bit structure used by the non-blocking ring */
> +struct nb_ring_entry {
> +void *ptr; /**< Data pointer */
> +uint64_t cnt; /**< Modification counter */
Why not make 'cnt' uintptr_t? This way 32-bit architectures will also
be supported.
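Something like this (untested sketch, keeping your field names) would
keep the entry at 2 * sizeof(void *) on both 32-bit and 64-bit targets,
so a double-width CAS could be used on either:

    struct nb_ring_entry {
        void *ptr;     /**< Data pointer */
        uintptr_t cnt; /**< Modification counter, pointer-sized */
    };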

> +};
> +
> +/* The non-blocking ring algorithm is based on the original rte ring
> (derived
> + * from FreeBSD's bufring.h) and inspired by Michael and Scott's
> non-blocking
> + * concurrent queue.
> + */
> +
> +/**
> + * @internal
> + *   Enqueue several objects on the non-blocking ring (single-
> producer only)
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the
> ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to
> the ring
> + * @param free_space
> + *   returns the amount of space after the enqueue operation has
> finished
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_enqueue_sp(struct rte_ring *r, void * const
> *obj_table,
> +    unsigned int n,
> +    enum rte_ring_queue_behavior behavior,
> +    unsigned int *free_space)
> +{
> +uint32_t free_entries;
> +size_t head, next;
> +
> +n = __rte_ring_move_prod_head_64(r, 1, n, behavior,
> + &head, &next,
> &free_entries);
> +if (n == 0)
> +goto end;
> +
> +ENQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
> +
> +r->prod_64.tail += n;
Don't we need release order (or smp_wmb) between writing of the ring
pointers and the update of tail? By updating the tail pointer, we are
synchronising with a consumer.
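E.g. something like this (untested sketch) instead of the plain "+=":

    /* Order the ring[] writes before publishing the new tail to
     * consumers.
     */
    __atomic_store_n(&r->prod_64.tail, r->prod_64.tail + n,
                     __ATOMIC_RELEASE);

    /* or keep the plain store and put an explicit barrier before it:
     * rte_smp_wmb();
     * r->prod_64.tail += n;
     */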

> +
> +end:
> +if (free_space != NULL)
> +*free_space = free_entries - n;
> +return n;
> +}
> +
> +/**
> + * @internal
> + *   Enqueue several objects on the non-blocking ring (multi-
> producer safe)
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the
> ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to
> the ring
> + * @param free_space
> + *   returns the amount of space after the enqueue operation has
> finished
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_enqueue_mp(struct rte_ring *r, void * const
> *obj_table,
> +    unsigned int n,
> +    enum rte_ring_queue_behavior behavior,
> +    unsigned int *free_space)
> +{
> +#if !defined(RTE_ARCH_X86_64) || !defined(ALLOW_EXPERIMENTAL_API)
> +RTE_SET_USED(r);
> +RTE_SET_USED(obj_table);
> +RTE_SET_USED(n);
> +RTE_SET_USED(behavior);
> +RTE_SET_USED(free_space);
> +#ifndef ALLOW_EXPERIMENTAL_API
> +printf("[%s()] RING_F_NB requires an experimental API."
> +       " Recompile with ALLOW_EXPERIMENTAL_API to use it.\n"
> +       , __func__);
> +#endif
> +return 0;
> +#endif
> +#if defined(RTE_ARCH_X86_64) && defined(ALLOW_EXPERIMENTAL_API)
> +size_t head, next, tail;
> +uint32_t free_entries;
> +unsigned int i;
> +
> +n = __rte_ring_move_prod_head_64(r, 0, n, behavior,
> + &head, &next,
> &free_entries);
> +if (n == 0)
> +goto end;
> +
> +for (i = 0; i < n; /* i incremented if enqueue succeeds */)
> {
> +struct nb_ring_entry old_value, new_value;
> +struct nb_ring_entry *ring_ptr;
> +
> +/* Enqueue to the tail entry. If another thread wins
> the race,
> + * retry with the new tail.
> + */
> +tail = r->prod_64.tail;
> +
> +ring_ptr = &((struct nb_ring_entry *)&r[1])[tail & r->mask];
This is a very ugly cast. Also I think it is unnecessary. What's
preventing this from being written without a cast? Perhaps the ring
array needs to be a union of "void *" and struct nb_ring_entry?
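Rough sketch of what I mean (the union type and the 'slots' member are
only illustration, they do not exist in the current struct):

    union nb_ring_slot {
        void *ptr;               /* regular ring entry */
        struct nb_ring_entry nb; /* non-blocking ring entry */
    };

    /* With a typed (e.g. flexible array) member at the end of struct
     * rte_ring, the lookup would need no cast:
     *
     *     ring_ptr = &r->slots[tail & r->mask].nb;
     */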

> +
> +old_value = *ring_ptr;
> +
> +/* If the tail entry's modification counter doesn't
> match the
> + * producer tail index, it's already been updated.
> + */
> +if (old_value.cnt != tail)
> +continue;
'continue' restarts the loop at the condition test in the for statement;
'i' and 'n' are unchanged. Then we re-read 'prod_64.tail' and
'ring[tail]'. If some other thread never updates 'prod_64.tail', the
test here (ring[tail].cnt != tail) will still be true and we will spin
forever.
Waiting for other threads is blocking behaviour, so this is not a
non-blocking design.
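In that situation the loop effectively reduces to this (sketch):

    /* Spins until some other thread advances prod_64.tail */
    do {
        tail = r->prod_64.tail;
        old_value = ((struct nb_ring_entry *)&r[1])[tail & r->mask];
    } while (old_value.cnt != tail);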

> +
> +/* Prepare the new entry. The cnt field mitigates
> the ABA
> + * problem on the ring write.
> + */
> +new_value.ptr = obj_table[i];
> +new_value.cnt = tail + r->size;
> +
> +if (rte_atomic128_cmpset((volatile rte_int128_t *)ring_ptr,
> + (rte_int128_t *)&old_value,
> + (rte_int128_t *)&new_value))
> +i++;
> +
> +/* Every thread attempts the cmpset, so they don't
> have to wait
> + * for the thread that successfully enqueued to the
> ring.
> + * Using a 64-bit tail mitigates the ABA problem
> here.
> + *
> + * Built-in used to handle variable-sized tail
> index.
> + */
But prod_64.tail is 64 bits, so it is not really variable-sized?

> +__sync_bool_compare_and_swap(&r->prod_64.tail, tail, tail + 1);
What memory order is required here? Why not use
__atomic_compare_exchange() with explicit memory order parameters?
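E.g. (untested sketch, exact orderings to be decided):

    uint64_t expected = tail;
    __atomic_compare_exchange_n(&r->prod_64.tail, &expected, tail + 1,
                                0 /* strong */, __ATOMIC_RELEASE,
                                __ATOMIC_RELAXED);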

> +}
> +
> +end:
> +if (free_space != NULL)
> +*free_space = free_entries - n;
> +return n;
> +#endif
> +}
> +
> +/**
> + * @internal Enqueue several objects on the non-blocking ring
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the
> ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to
> the ring
> + * @param is_sp
> + *   Indicates whether to use single producer or multi-producer head
> update
> + * @param free_space
> + *   returns the amount of space after the enqueue operation has
> finished
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_enqueue(struct rte_ring *r, void * const
> *obj_table,
> + unsigned int n, enum
> rte_ring_queue_behavior behavior,
> + unsigned int is_sp, unsigned int
> *free_space)
> +{
> +if (is_sp)
> +return __rte_ring_do_nb_enqueue_sp(r, obj_table, n,
> +   behavior,
> free_space);
> +else
> +return __rte_ring_do_nb_enqueue_mp(r, obj_table, n,
> +   behavior,
> free_space);
> +}
> +
> +/**
> + * @internal
> + *   Dequeue several objects from the non-blocking ring (single-
> consumer only)
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to pull from the ring.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from
> the ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from
> the ring
> + * @param available
> + *   returns the number of remaining ring entries after the dequeue
> has finished
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n
> only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_dequeue_sc(struct rte_ring *r, void **obj_table,
> +    unsigned int n,
> +    enum rte_ring_queue_behavior behavior,
> +    unsigned int *available)
> +{
> +size_t head, next;
> +uint32_t entries;
> +
> +n = __rte_ring_move_cons_head_64(r, 1, n, behavior,
> + &head, &next, &entries);
> +if (n == 0)
> +goto end;
> +
> +DEQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
> +
> +r->cons_64.tail += n;
Memory ordering? Consumer synchronises with producer.
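Same concern as on the enqueue side; e.g. (untested sketch):

    /* Make sure the ring[] reads complete before the slots are released
     * to producers.
     */
    __atomic_store_n(&r->cons_64.tail, r->cons_64.tail + n,
                     __ATOMIC_RELEASE);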

> +
> +end:
> +if (available != NULL)
> +*available = entries - n;
> +return n;
> +}
> +
> +/**
> + * @internal
> + *   Dequeue several objects from the non-blocking ring (multi-
> consumer safe)
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to pull from the ring.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from
> the ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from
> the ring
> + * @param available
> + *   returns the number of remaining ring entries after the dequeue
> has finished
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n
> only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_dequeue_mc(struct rte_ring *r, void **obj_table,
> +    unsigned int n,
> +    enum rte_ring_queue_behavior behavior,
> +    unsigned int *available)
> +{
> +size_t head, next;
> +uint32_t entries;
> +
> +n = __rte_ring_move_cons_head_64(r, 0, n, behavior,
> + &head, &next, &entries);
> +if (n == 0)
> +goto end;
> +
> +while (1) {
> +size_t tail = r->cons_64.tail;
> +
> +/* Dequeue from the cons tail onwards. If multiple
> threads read
> + * the same pointers, the thread that successfully
> performs the
> + * CAS will keep them and the other(s) will retry.
> + */
> +DEQUEUE_PTRS_NB(r, &r[1], tail, obj_table, n);
> +
> +next = tail + n;
> +
> +/* Built-in used to handle variable-sized tail
> index. */
> +if (__sync_bool_compare_and_swap(&r->cons_64.tail, tail, next))
> +/* There is potential for the ABA problem
> here, but
> + * that is mitigated by the large (64-bit)
> tail.
> + */
> +break;
> +}
> +
> +end:
> +if (available != NULL)
> +*available = entries - n;
> +return n;
> +}
> +
> +/**
> + * @internal Dequeue several objects from the non-blocking ring
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to pull from the ring.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from
> the ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from
> the ring
> + * @param available
> + *   returns the number of remaining ring entries after the dequeue
> has finished
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n
> only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_dequeue(struct rte_ring *r, void **obj_table,
> + unsigned int n, enum rte_ring_queue_behavior
> behavior,
> + unsigned int is_sc, unsigned int *available)
> +{
> +if (is_sc)
> +return __rte_ring_do_nb_dequeue_sc(r, obj_table, n,
> +   behavior,
> available);
> +else
> +return __rte_ring_do_nb_dequeue_mc(r, obj_table, n,
> +   behavior,
> available);
> +}
> +
>  /**
>   * @internal Enqueue several objects on the ring
>   *
> @@ -438,8 +853,14 @@ static __rte_always_inline unsigned int
>  rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
>   unsigned int n, unsigned int *free_space)
>  {
> -return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> -__IS_MP, free_space);
> +if (r->flags & RING_F_NB)
> +return __rte_ring_do_nb_enqueue(r, obj_table, n,
> +RTE_RING_QUEUE_FIXED, __IS_MP,
> +free_space);
> +else
> +return __rte_ring_do_enqueue(r, obj_table, n,
> +     RTE_RING_QUEUE_FIXED, __IS_MP,
> +     free_space);
>  }
>
>  /**
> @@ -461,8 +882,14 @@ static __rte_always_inline unsigned int
>  rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
>   unsigned int n, unsigned int *free_space)
>  {
> -return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> -__IS_SP, free_space);
> +if (r->flags & RING_F_NB)
> +return __rte_ring_do_nb_enqueue(r, obj_table, n,
> +RTE_RING_QUEUE_FIXED, __IS_SP,
> +free_space);
> +else
> +return __rte_ring_do_enqueue(r, obj_table, n,
> +     RTE_RING_QUEUE_FIXED, __IS_SP,
> +     free_space);
>  }
>
>  /**
> @@ -488,8 +915,14 @@ static __rte_always_inline unsigned int
>  rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
>        unsigned int n, unsigned int *free_space)
>  {
> -return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> -r->prod.single, free_space);
> +if (r->flags & RING_F_NB)
> +return __rte_ring_do_nb_enqueue(r, obj_table, n,
> +RTE_RING_QUEUE_FIXED,
> +r->prod_64.single, free_space);
> +else
> +return __rte_ring_do_enqueue(r, obj_table, n,
> +     RTE_RING_QUEUE_FIXED,
> +     r->prod.single, free_space);
>  }
>
>  /**
> @@ -572,8 +1005,14 @@ static __rte_always_inline unsigned int
>  rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
>  unsigned int n, unsigned int *available)
>  {
> -return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> -__IS_MC, available);
> +if (r->flags & RING_F_NB)
> +return __rte_ring_do_nb_dequeue(r, obj_table, n,
> +RTE_RING_QUEUE_FIXED, __IS_MC,
> +available);
> +else
> +return __rte_ring_do_dequeue(r, obj_table, n,
> +     RTE_RING_QUEUE_FIXED, __IS_MC,
> +     available);
>  }
>
>  /**
> @@ -596,8 +1035,14 @@ static __rte_always_inline unsigned int
>  rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
>  unsigned int n, unsigned int *available)
>  {
> -return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> -__IS_SC, available);
> +if (r->flags & RING_F_NB)
> +return __rte_ring_do_nb_dequeue(r, obj_table, n,
> +RTE_RING_QUEUE_FIXED, __IS_SC,
> +available);
> +else
> +return __rte_ring_do_dequeue(r, obj_table, n,
> +     RTE_RING_QUEUE_FIXED, __IS_SC,
> +     available);
>  }
>
>  /**
> @@ -623,8 +1068,14 @@ static __rte_always_inline unsigned int
>  rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned
> int n,
>  unsigned int *available)
>  {
> -return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> -r->cons.single, available);
> +if (r->flags & RING_F_NB)
> +return __rte_ring_do_nb_dequeue(r, obj_table, n,
> +RTE_RING_QUEUE_FIXED,
> +r->cons_64.single, available);
> +else
> +return __rte_ring_do_dequeue(r, obj_table, n,
> +     RTE_RING_QUEUE_FIXED,
> +     r->cons.single, available);
>  }
>
>  /**
> @@ -699,9 +1150,13 @@ rte_ring_dequeue(struct rte_ring *r, void
> **obj_p)
>  static inline unsigned
>  rte_ring_count(const struct rte_ring *r)
>  {
> -uint32_t prod_tail = r->prod.tail;
> -uint32_t cons_tail = r->cons.tail;
> -uint32_t count = (prod_tail - cons_tail) & r->mask;
> +uint32_t count;
> +
> +if (r->flags & RING_F_NB)
> +count = (r->prod_64.tail - r->cons_64.tail) & r->mask;
> +else
> +count = (r->prod.tail - r->cons.tail) & r->mask;
> +
>  return (count > r->capacity) ? r->capacity : count;
>  }
>
> @@ -821,8 +1276,14 @@ static __rte_always_inline unsigned
>  rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
>   unsigned int n, unsigned int *free_space)
>  {
> -return __rte_ring_do_enqueue(r, obj_table, n,
> -RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
> +if (r->flags & RING_F_NB)
> +return __rte_ring_do_nb_enqueue(r, obj_table, n,
> +RTE_RING_QUEUE_VARIABLE,
> +__IS_MP, free_space);
> +else
> +return __rte_ring_do_enqueue(r, obj_table, n,
> +     RTE_RING_QUEUE_VARIABLE,
> +     __IS_MP, free_space);
>  }
>
>  /**
> @@ -844,8 +1305,14 @@ static __rte_always_inline unsigned
>  rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
>   unsigned int n, unsigned int *free_space)
>  {
> -return __rte_ring_do_enqueue(r, obj_table, n,
> -RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
> +if (r->flags & RING_F_NB)
> +return __rte_ring_do_nb_enqueue(r, obj_table, n,
> +RTE_RING_QUEUE_VARIABLE,
> +__IS_SP, free_space);
> +else
> +return __rte_ring_do_enqueue(r, obj_table, n,
> +     RTE_RING_QUEUE_VARIABLE,
> +     __IS_SP, free_space);
>  }
>
>  /**
> @@ -871,8 +1338,14 @@ static __rte_always_inline unsigned
>  rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
>        unsigned int n, unsigned int *free_space)
>  {
> -return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE,
> -r->prod.single, free_space);
> +if (r->flags & RING_F_NB)
> +return __rte_ring_do_nb_enqueue(r, obj_table, n,
> +RTE_RING_QUEUE_VARIABLE,
> +r->prod_64.single, free_space);
> +else
> +return __rte_ring_do_enqueue(r, obj_table, n,
> +     RTE_RING_QUEUE_VARIABLE,
> +     r->prod.single, free_space);
>  }
>
>  /**
> @@ -899,8 +1372,14 @@ static __rte_always_inline unsigned
>  rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
>  unsigned int n, unsigned int *available)
>  {
> -return __rte_ring_do_dequeue(r, obj_table, n,
> -RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
> +if (r->flags & RING_F_NB)
> +return __rte_ring_do_nb_dequeue(r, obj_table, n,
> +RTE_RING_QUEUE_VARIABLE,
> +__IS_MC, available);
> +else
> +return __rte_ring_do_dequeue(r, obj_table, n,
> +     RTE_RING_QUEUE_VARIABLE,
> +     __IS_MC, available);
>  }
>
>  /**
> @@ -924,8 +1403,14 @@ static __rte_always_inline unsigned
>  rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
>  unsigned int n, unsigned int *available)
>  {
> -return __rte_ring_do_dequeue(r, obj_table, n,
> -RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
> +if (r->flags & RING_F_NB)
> +return __rte_ring_do_nb_dequeue(r, obj_table, n,
> +RTE_RING_QUEUE_VARIABLE,
> +__IS_SC, available);
> +else
> +return __rte_ring_do_dequeue(r, obj_table, n,
> +     RTE_RING_QUEUE_VARIABLE,
> +     __IS_SC, available);
>  }
>
>  /**
> @@ -951,9 +1436,14 @@ static __rte_always_inline unsigned
>  rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
>  unsigned int n, unsigned int *available)
>  {
> -return __rte_ring_do_dequeue(r, obj_table, n,
> -RTE_RING_QUEUE_VARIABLE,
> -r->cons.single, available);
> +if (r->flags & RING_F_NB)
> +return __rte_ring_do_nb_dequeue(r, obj_table, n,
> +RTE_RING_QUEUE_VARIABLE,
> +r->cons_64.single, available);
> +else
> +return __rte_ring_do_dequeue(r, obj_table, n,
> +     RTE_RING_QUEUE_VARIABLE,
> +     r->cons.single, available);
>  }
>
>  #ifdef __cplusplus
> diff --git a/lib/librte_ring/rte_ring_version.map
> b/lib/librte_ring/rte_ring_version.map
> index d935efd0d..8969467af 100644
> --- a/lib/librte_ring/rte_ring_version.map
> +++ b/lib/librte_ring/rte_ring_version.map
> @@ -17,3 +17,10 @@ DPDK_2.2 {
>  rte_ring_free;
>
>  } DPDK_2.0;
> +
> +DPDK_19.05 {
> +global:
> +
> +rte_ring_get_memsize;
> +
> +} DPDK_2.2;