[dpdk-dev] [PATCH v3 2/5] ring: add a non-blocking implementation

Ola Liljedahl Ola.Liljedahl at arm.com
Tue Jan 22 15:49:39 CET 2019


(resending without the confidential footer, think I figured it out, ignore the
previous email from me in this thread)

-- Ola

On Fri, 2019-01-18 at 09:23 -0600, Gage Eads wrote:
> This commit adds support for non-blocking circular ring enqueue and dequeue
> functions. The ring uses a 128-bit compare-and-swap instruction, and thus
> is currently limited to x86_64.
> 
> The algorithm is based on the original rte ring (derived from FreeBSD's
> bufring.h) and inspired by Michael and Scott's non-blocking concurrent
> queue. Importantly, it adds a modification counter to each ring entry to
> ensure only one thread can write to an unused entry.
> 
> -----
> Algorithm:
> 
> Multi-producer non-blocking enqueue:
> 1. Move the producer head index 'n' locations forward, effectively
>    reserving 'n' locations.
> 2. For each pointer:
>  a. Read the producer tail index, then ring[tail]. If ring[tail]'s
>     modification counter isn't 'tail', retry.
>  b. Construct the new entry: {pointer, tail + ring size}
>  c. Compare-and-swap the old entry with the new. If unsuccessful, the
>     next loop iteration will try to enqueue this pointer again.
>  d. Compare-and-swap the tail index with 'tail + 1', whether or not step 2c
>     succeeded. This guarantees threads can make forward progress.
> 
> Multi-consumer non-blocking dequeue:
> 1. Move the consumer head index 'n' locations forward, effectively
>    reserving 'n' pointers to be dequeued.
> 2. Copy 'n' pointers into the caller's object table (ignoring the
>    modification counter), starting from ring[tail], then compare-and-swap
>    the tail index with 'tail + n'.  If unsuccessful, repeat step 2.
> 
> -----
> Discussion:
> 
> There are two cases where the ABA problem is mitigated:
> 1. Enqueueing a pointer to the ring: without a modification counter
>    tied to the tail index, the index could become stale by the time the
>    enqueue happens, causing it to overwrite valid data. Tying the
>    counter to the tail index gives us an expected value (as opposed to,
>    say, a monotonically incrementing counter).
> 
>    Since the counter will eventually wrap, there is potential for the ABA
>    problem. However, using a 64-bit counter makes this likelihood
>    effectively zero.
> 
> 2. Updating a tail index: the ABA problem can occur if the thread is
>    preempted and the tail index wraps around. However, using 64-bit indexes
>    makes this likelihood effectively zero.
> 
> With no contention, an enqueue of n pointers uses (1 + 2n) CAS operations
> and a dequeue of n pointers uses 2. This algorithm has worse average-case
> performance than the regular rte ring (particularly a highly-contended ring
> with large bulk accesses), however:
> - For applications with preemptible pthreads, the regular rte ring's
>   worst-case performance (i.e. one thread being preempted in the
>   update_tail() critical section) is much worse than the non-blocking
>   ring's.
> - Software caching can mitigate the average case performance for ring-based
>   algorithms. For example, a non-blocking ring based mempool (a likely use
>   case for this ring) with per-thread caching.
> 
> The non-blocking ring is enabled via a new flag, RING_F_NB. Because the
> ring's memsize is now a function of its flags (the non-blocking ring
> requires 128b for each entry), this commit adds a new argument ('flags') to
> rte_ring_get_memsize(). An API deprecation notice will be sent in a
> separate commit.
> 
> For ease-of-use, existing ring enqueue and dequeue functions work on both
> regular and non-blocking rings. This introduces an additional branch in
> the datapath, but this should be a highly predictable branch.
> ring_perf_autotest shows a negligible performance impact; it's hard to
> distinguish a real difference versus system noise.
> 
>                                   | ring_perf_autotest cycles with branch -
>              Test                 |   ring_perf_autotest cycles without
> ------------------------------------------------------------------
> SP/SC single enq/dequeue          | 0.33
> MP/MC single enq/dequeue          | -4.00
> SP/SC burst enq/dequeue (size 8)  | 0.00
> MP/MC burst enq/dequeue (size 8)  | 0.00
> SP/SC burst enq/dequeue (size 32) | 0.00
> MP/MC burst enq/dequeue (size 32) | 0.00
> SC empty dequeue                  | 1.00
> MC empty dequeue                  | 0.00
> 
> Single lcore:
> SP/SC bulk enq/dequeue (size 8)   | 0.49
> MP/MC bulk enq/dequeue (size 8)   | 0.08
> SP/SC bulk enq/dequeue (size 32)  | 0.07
> MP/MC bulk enq/dequeue (size 32)  | 0.09
> 
> Two physical cores:
> SP/SC bulk enq/dequeue (size 8)   | 0.19
> MP/MC bulk enq/dequeue (size 8)   | -0.37
> SP/SC bulk enq/dequeue (size 32)  | 0.09
> MP/MC bulk enq/dequeue (size 32)  | -0.05
> 
> Two NUMA nodes:
> SP/SC bulk enq/dequeue (size 8)   | -1.96
> MP/MC bulk enq/dequeue (size 8)   | 0.88
> SP/SC bulk enq/dequeue (size 32)  | 0.10
> MP/MC bulk enq/dequeue (size 32)  | 0.46
> 
> Test setup: x86_64 build with default config, dual-socket Xeon E5-2699 v4,
> running on isolcpus cores with a tickless scheduler. Each test run three
> times and the results averaged.
> 
> Signed-off-by: Gage Eads <gage.eads at intel.com>
> ---
>  lib/librte_ring/rte_ring.c           |  72 ++++-
>  lib/librte_ring/rte_ring.h           | 550 +++++++++++++++++++++++++++++++++-
> -
>  lib/librte_ring/rte_ring_version.map |   7 +
>  3 files changed, 587 insertions(+), 42 deletions(-)
> 
> diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
> index d215acecc..f3378dccd 100644
> --- a/lib/librte_ring/rte_ring.c
> +++ b/lib/librte_ring/rte_ring.c
> @@ -45,9 +45,9 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
>  
>  /* return the size of memory occupied by a ring */
>  ssize_t
> -rte_ring_get_memsize(unsigned count)
> +rte_ring_get_memsize_v1905(unsigned int count, unsigned int flags)
>  {
> -	ssize_t sz;
> +	ssize_t sz, elt_sz;
>  
>  	/* count must be a power of 2 */
>  	if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
> @@ -57,10 +57,23 @@ rte_ring_get_memsize(unsigned count)
>  		return -EINVAL;
>  	}
>  
> -	sz = sizeof(struct rte_ring) + count * sizeof(void *);
> +	elt_sz = (flags & RING_F_NB) ? 2 * sizeof(void *) : sizeof(void *);
> +
> +	sz = sizeof(struct rte_ring) + count * elt_sz;
>  	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
>  	return sz;
>  }
> +BIND_DEFAULT_SYMBOL(rte_ring_get_memsize, _v1905, 19.05);
> +MAP_STATIC_SYMBOL(ssize_t rte_ring_get_memsize(unsigned int count,
> +					       unsigned int flags),
> +		  rte_ring_get_memsize_v1905);
> +
> +ssize_t
> +rte_ring_get_memsize_v20(unsigned int count)
> +{
> +	return rte_ring_get_memsize_v1905(count, 0);
> +}
> +VERSION_SYMBOL(rte_ring_get_memsize, _v20, 2.0);
>  
>  int
>  rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
> @@ -82,8 +95,6 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned
> count,
>  	if (ret < 0 || ret >= (int)sizeof(r->name))
>  		return -ENAMETOOLONG;
>  	r->flags = flags;
> -	r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
> -	r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
>  
>  	if (flags & RING_F_EXACT_SZ) {
>  		r->size = rte_align32pow2(count + 1);
> @@ -100,8 +111,30 @@ rte_ring_init(struct rte_ring *r, const char *name,
> unsigned count,
>  		r->mask = count - 1;
>  		r->capacity = r->mask;
>  	}
> -	r->prod.head = r->cons.head = 0;
> -	r->prod.tail = r->cons.tail = 0;
> +
> +	if (flags & RING_F_NB) {
> +		uint64_t i;
> +
> +		r->prod_64.single = (flags & RING_F_SP_ENQ) ? __IS_SP :
> __IS_MP;
> +		r->cons_64.single = (flags & RING_F_SC_DEQ) ? __IS_SC :
> __IS_MC;
> +		r->prod_64.head = r->cons_64.head = 0;
> +		r->prod_64.tail = r->cons_64.tail = 0;
> +
> +		for (i = 0; i < r->size; i++) {
> +			struct nb_ring_entry *ring_ptr, *base;
> +
> +			base = ((struct nb_ring_entry *)&r[1]);
> +
> +			ring_ptr = &base[i & r->mask];
> +
> +			ring_ptr->cnt = i;
> +		}
> +	} else {
> +		r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
> +		r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
> +		r->prod.head = r->cons.head = 0;
> +		r->prod.tail = r->cons.tail = 0;
> +	}
>  
>  	return 0;
>  }
> @@ -123,11 +156,19 @@ rte_ring_create(const char *name, unsigned count, int
> socket_id,
>  
>  	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
>  
> +#if !defined(RTE_ARCH_X86_64)
> +	if (flags & RING_F_NB) {
> +		printf("RING_F_NB is only supported on x86-64 platforms\n");
> +		rte_errno = EINVAL;
> +		return NULL;
> +	}
> +#endif
> +
>  	/* for an exact size ring, round up from count to a power of two */
>  	if (flags & RING_F_EXACT_SZ)
>  		count = rte_align32pow2(count + 1);
>  
> -	ring_size = rte_ring_get_memsize(count);
> +	ring_size = rte_ring_get_memsize(count, flags);
>  	if (ring_size < 0) {
>  		rte_errno = ring_size;
>  		return NULL;
> @@ -227,10 +268,17 @@ rte_ring_dump(FILE *f, const struct rte_ring *r)
>  	fprintf(f, "  flags=%x\n", r->flags);
>  	fprintf(f, "  size=%"PRIu32"\n", r->size);
>  	fprintf(f, "  capacity=%"PRIu32"\n", r->capacity);
> -	fprintf(f, "  ct=%"PRIu32"\n", r->cons.tail);
> -	fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
> -	fprintf(f, "  pt=%"PRIu32"\n", r->prod.tail);
> -	fprintf(f, "  ph=%"PRIu32"\n", r->prod.head);
> +	if (r->flags & RING_F_NB) {
> +		fprintf(f, "  ct=%"PRIu64"\n", r->cons_64.tail);
> +		fprintf(f, "  ch=%"PRIu64"\n", r->cons_64.head);
> +		fprintf(f, "  pt=%"PRIu64"\n", r->prod_64.tail);
> +		fprintf(f, "  ph=%"PRIu64"\n", r->prod_64.head);
> +	} else {
> +		fprintf(f, "  ct=%"PRIu32"\n", r->cons.tail);
> +		fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
> +		fprintf(f, "  pt=%"PRIu32"\n", r->prod.tail);
> +		fprintf(f, "  ph=%"PRIu32"\n", r->prod.head);
> +	}
>  	fprintf(f, "  used=%u\n", rte_ring_count(r));
>  	fprintf(f, "  avail=%u\n", rte_ring_free_count(r));
>  }
> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> index b270a4746..08c9de6a6 100644
> --- a/lib/librte_ring/rte_ring.h
> +++ b/lib/librte_ring/rte_ring.h
> @@ -134,6 +134,18 @@ struct rte_ring {
>   */
>  #define RING_F_EXACT_SZ 0x0004
>  #define RTE_RING_SZ_MASK  (0x7fffffffU) /**< Ring size mask */
> +/**
> + * The ring uses non-blocking enqueue and dequeue functions. These functions
> + * do not have the "non-preemptive" constraint of a regular rte ring, and
> thus
> + * are suited for applications using preemptible pthreads. However, the
> + * non-blocking functions have worse average-case performance than their
> + * regular rte ring counterparts. When used as the handler for a mempool,
> + * per-thread caching can mitigate the performance difference by reducing the
> + * number (and contention) of ring accesses.
> + *
> + * This flag is only supported on x86_64 platforms.
> + */
> +#define RING_F_NB 0x0008
>  
>  /* @internal defines for passing to the enqueue dequeue worker functions */
>  #define __IS_SP 1
> @@ -151,11 +163,15 @@ struct rte_ring {
>   *
>   * @param count
>   *   The number of elements in the ring (must be a power of 2).
> + * @param flags
> + *   The flags the ring will be created with.
>   * @return
>   *   - The memory size needed for the ring on success.
>   *   - -EINVAL if count is not a power of 2.
>   */
> -ssize_t rte_ring_get_memsize(unsigned count);
> +ssize_t rte_ring_get_memsize(unsigned int count, unsigned int flags);
> +ssize_t rte_ring_get_memsize_v20(unsigned int count);
> +ssize_t rte_ring_get_memsize_v1905(unsigned int count, unsigned int flags);
>  
>  /**
>   * Initialize a ring structure.
> @@ -188,6 +204,10 @@ ssize_t rte_ring_get_memsize(unsigned count);
>   *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
>   *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
>   *      is "single-consumer". Otherwise, it is "multi-consumers".
> + *    - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2
> + *      number, but up to half the ring space may be wasted.
> + *    - RING_F_NB: (x86_64 only) If this flag is set, the ring uses
> + *      non-blocking variants of the dequeue and enqueue functions.
>   * @return
>   *   0 on success, or a negative value on error.
>   */
> @@ -223,12 +243,17 @@ int rte_ring_init(struct rte_ring *r, const char *name,
> unsigned count,
>   *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
>   *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
>   *      is "single-consumer". Otherwise, it is "multi-consumers".
> + *    - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2
> + *      number, but up to half the ring space may be wasted.
> + *    - RING_F_NB: (x86_64 only) If this flag is set, the ring uses
> + *      non-blocking variants of the dequeue and enqueue functions.
>   * @return
>   *   On success, the pointer to the new allocated ring. NULL on error with
>   *    rte_errno set appropriately. Possible errno values include:
>   *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config
> structure
>   *    - E_RTE_SECONDARY - function was called from a secondary process
> instance
> - *    - EINVAL - count provided is not a power of 2
> + *    - EINVAL - count provided is not a power of 2, or RING_F_NB is used on
> an
> + *      unsupported platform
>   *    - ENOSPC - the maximum number of memzones has already been allocated
>   *    - EEXIST - a memzone with the same name already exists
>   *    - ENOMEM - no appropriate memory area found in which to create memzone
> @@ -284,6 +309,50 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
>  	} \
>  } while (0)
>  
> +/* The actual enqueue of pointers on the ring.
> + * Used only by the single-producer non-blocking enqueue function, but
> + * out-lined here for code readability.
> + */
> +#define ENQUEUE_PTRS_NB(r, ring_start, prod_head, obj_table, n) do { \
> +	unsigned int i; \
> +	const uint32_t size = (r)->size; \
> +	size_t idx = prod_head & (r)->mask; \
> +	size_t new_cnt = prod_head + size; \
> +	struct nb_ring_entry *ring = (struct nb_ring_entry *)ring_start; \
> +	if (likely(idx + n < size)) { \
> +		for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx += 4) {
> \
> +			ring[idx].ptr = obj_table[i]; \
> +			ring[idx].cnt = new_cnt + i;  \
> +			ring[idx + 1].ptr = obj_table[i + 1]; \
> +			ring[idx + 1].cnt = new_cnt + i + 1;  \
> +			ring[idx + 2].ptr = obj_table[i + 2]; \
> +			ring[idx + 2].cnt = new_cnt + i + 2;  \
> +			ring[idx + 3].ptr = obj_table[i + 3]; \
> +			ring[idx + 3].cnt = new_cnt + i + 3;  \
> +		} \
> +		switch (n & 0x3) { \
> +		case 3: \
> +			ring[idx].cnt = new_cnt + i; \
> +			ring[idx++].ptr = obj_table[i++]; /* fallthrough */ \
> +		case 2: \
> +			ring[idx].cnt = new_cnt + i; \
> +			ring[idx++].ptr = obj_table[i++]; /* fallthrough */ \
> +		case 1: \
> +			ring[idx].cnt = new_cnt + i; \
> +			ring[idx++].ptr = obj_table[i++]; \
> +		} \
> +	} else { \
> +		for (i = 0; idx < size; i++, idx++) { \
> +			ring[idx].cnt = new_cnt + i;  \
> +			ring[idx].ptr = obj_table[i]; \
> +		} \
> +		for (idx = 0; i < n; i++, idx++) {    \
> +			ring[idx].cnt = new_cnt + i;  \
> +			ring[idx].ptr = obj_table[i]; \
> +		} \
> +	} \
> +} while (0)
> +
>  /* the actual copy of pointers on the ring to obj_table.
>   * Placed here since identical code needed in both
>   * single and multi consumer dequeue functions */
> @@ -315,6 +384,39 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
>  	} \
>  } while (0)
>  
> +/* The actual copy of pointers on the ring to obj_table.
> + * Placed here since identical code needed in both
> + * single and multi consumer non-blocking dequeue functions.
> + */
> +#define DEQUEUE_PTRS_NB(r, ring_start, cons_head, obj_table, n) do { \
> +	unsigned int i; \
> +	size_t idx = cons_head & (r)->mask; \
> +	const uint32_t size = (r)->size; \
> +	struct nb_ring_entry *ring = (struct nb_ring_entry *)ring_start; \
> +	if (likely(idx + n < size)) { \
> +		for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4) {\
> +			obj_table[i] = ring[idx].ptr; \
> +			obj_table[i + 1] = ring[idx + 1].ptr; \
> +			obj_table[i + 2] = ring[idx + 2].ptr; \
> +			obj_table[i + 3] = ring[idx + 3].ptr; \
> +		} \
> +		switch (n & 0x3) { \
> +		case 3: \
> +			obj_table[i++] = ring[idx++].ptr; /* fallthrough */ \
> +		case 2: \
> +			obj_table[i++] = ring[idx++].ptr; /* fallthrough */ \
> +		case 1: \
> +			obj_table[i++] = ring[idx++].ptr; \
> +		} \
> +	} else { \
> +		for (i = 0; idx < size; i++, idx++) \
> +			obj_table[i] = ring[idx].ptr; \
> +		for (idx = 0; i < n; i++, idx++) \
> +			obj_table[i] = ring[idx].ptr; \
> +	} \
> +} while (0)
> +
> +
>  /* Between load and load. there might be cpu reorder in weak model
>   * (powerpc/arm).
>   * There are 2 choices for the users
> @@ -331,6 +433,319 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
>  #endif
>  #include "rte_ring_generic_64.h"
>  
> +/* @internal 128-bit structure used by the non-blocking ring */
> +struct nb_ring_entry {
> +	void *ptr; /**< Data pointer */
> +	uint64_t cnt; /**< Modification counter */
Why not make 'cnt' uintptr_t? This way 32-bit architectures will also
be supported. I think there are some claims that DPDK still supports e.g. ARMv7a
and possibly also 32-bit x86?

> +};
> +
> +/* The non-blocking ring algorithm is based on the original rte ring (derived
> + * from FreeBSD's bufring.h) and inspired by Michael and Scott's non-blocking
> + * concurrent queue.
> + */
> +
> +/**
> + * @internal
> + *   Enqueue several objects on the non-blocking ring (single-producer only)
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
> + * @param free_space
> + *   returns the amount of space after the enqueue operation has finished
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_enqueue_sp(struct rte_ring *r, void * const *obj_table,
> +			    unsigned int n,
> +			    enum rte_ring_queue_behavior behavior,
> +			    unsigned int *free_space)
> +{
> +	uint32_t free_entries;
> +	size_t head, next;
> +
> +	n = __rte_ring_move_prod_head_64(r, 1, n, behavior,
> +					 &head, &next, &free_entries);
> +	if (n == 0)
> +		goto end;
> +
> +	ENQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
> +
> +	r->prod_64.tail += n;
Don't we need release order when (or smp_wmb between) writing of the ring
pointers and the update of tail? By updating the tail pointer, we are
synchronising with a consumer.

I prefer using __atomic operations even for load and store. You can see which
parts of the code that synchronise with each other, e.g. store-release to some
location synchronises with load-acquire from the same location. If you don't
know how different threads synchronise with each other, you are very likely to
make mistakes.

> +
> +end:
> +	if (free_space != NULL)
> +		*free_space = free_entries - n;
> +	return n;
> +}
> +
> +/**
> + * @internal
> + *   Enqueue several objects on the non-blocking ring (multi-producer safe)
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
> + * @param free_space
> + *   returns the amount of space after the enqueue operation has finished
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_enqueue_mp(struct rte_ring *r, void * const *obj_table,
> +			    unsigned int n,
> +			    enum rte_ring_queue_behavior behavior,
> +			    unsigned int *free_space)
> +{
> +#if !defined(RTE_ARCH_X86_64) || !defined(ALLOW_EXPERIMENTAL_API)
> +	RTE_SET_USED(r);
> +	RTE_SET_USED(obj_table);
> +	RTE_SET_USED(n);
> +	RTE_SET_USED(behavior);
> +	RTE_SET_USED(free_space);
> +#ifndef ALLOW_EXPERIMENTAL_API
> +	printf("[%s()] RING_F_NB requires an experimental API."
> +	       " Recompile with ALLOW_EXPERIMENTAL_API to use it.\n"
> +	       , __func__);
> +#endif
> +	return 0;
> +#endif
> +#if defined(RTE_ARCH_X86_64) && defined(ALLOW_EXPERIMENTAL_API)
> +	size_t head, next, tail;
> +	uint32_t free_entries;
> +	unsigned int i;
> +
> +	n = __rte_ring_move_prod_head_64(r, 0, n, behavior,
> +					 &head, &next, &free_entries);
> +	if (n == 0)
> +		goto end;
> +
> +	for (i = 0; i < n; /* i incremented if enqueue succeeds */) {
> +		struct nb_ring_entry old_value, new_value;
> +		struct nb_ring_entry *ring_ptr;
> +
> +		/* Enqueue to the tail entry. If another thread wins the
> race,
> +		 * retry with the new tail.
> +		 */
> +		tail = r->prod_64.tail;
> +
> +		ring_ptr = &((struct nb_ring_entry *)&r[1])[tail & r->mask];
This is an ugly expression and cast. Also I think it is unnecessary. What's
preventing this from being written without a cast? Perhaps the ring array needs
to be a union of "void *" and struct nb_ring_entry?

> +
> +		old_value = *ring_ptr;
> +
> +		/* If the tail entry's modification counter doesn't match the
> +		 * producer tail index, it's already been updated.
> +		 */
> +		if (old_value.cnt != tail)
> +			continue;
Continue restarts the loop at the condition test in the for statement,
'i' and 'n' are unchanged. Then we re-read 'prod_64.tail' and
'ring[tail & mask]'. If some other thread never updates 'prod_64.tail', the
test here (ring[tail].cnt != tail) will still be true and we will spin
forever.

Waiting for other threads <=> blocking behaviour so this is not a non-
blocking design.

> +
> +		/* Prepare the new entry. The cnt field mitigates the ABA
> +		 * problem on the ring write.
> +		 */
> +		new_value.ptr = obj_table[i];
> +		new_value.cnt = tail + r->size;
> +
> +		if (rte_atomic128_cmpset((volatile rte_int128_t *)ring_ptr,
> +					 (rte_int128_t *)&old_value,
> +					 (rte_int128_t *)&new_value))
> +			i++;
> +
> +		/* Every thread attempts the cmpset, so they don't have to
> wait
> +		 * for the thread that successfully enqueued to the ring.
> +		 * Using a 64-bit tail mitigates the ABA problem here.
> +		 *
> +		 * Built-in used to handle variable-sized tail index.
> +		 */
But prod_64.tail is 64 bits so not really variable size?

> +		__sync_bool_compare_and_swap(&r->prod_64.tail, tail, tail +
> 1);
What memory order is required here? Why not use
__atomic_compare_exchange() with explicit memory order parameters?

> +	}
> +
> +end:
> +	if (free_space != NULL)
> +		*free_space = free_entries - n;
> +	return n;
> +#endif
> +}
> +
> +/**
> + * @internal Enqueue several objects on the non-blocking ring
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
> + * @param is_sp
> + *   Indicates whether to use single producer or multi-producer head update
> + * @param free_space
> + *   returns the amount of space after the enqueue operation has finished
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_enqueue(struct rte_ring *r, void * const *obj_table,
> +			 unsigned int n, enum rte_ring_queue_behavior
> behavior,
> +			 unsigned int is_sp, unsigned int *free_space)
> +{
> +	if (is_sp)
> +		return __rte_ring_do_nb_enqueue_sp(r, obj_table, n,
> +						   behavior, free_space);
> +	else
> +		return __rte_ring_do_nb_enqueue_mp(r, obj_table, n,
> +						   behavior, free_space);
> +}
> +
> +/**
> + * @internal
> + *   Dequeue several objects from the non-blocking ring (single-consumer
> only)
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to pull from the ring.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
> + * @param available
> + *   returns the number of remaining ring entries after the dequeue has
> finished
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_dequeue_sc(struct rte_ring *r, void **obj_table,
> +			    unsigned int n,
> +			    enum rte_ring_queue_behavior behavior,
> +			    unsigned int *available)
> +{
> +	size_t head, next;
> +	uint32_t entries;
> +
> +	n = __rte_ring_move_cons_head_64(r, 1, n, behavior,
> +					 &head, &next, &entries);
> +	if (n == 0)
> +		goto end;
> +
> +	DEQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
> +
> +	r->cons_64.tail += n;
Memory ordering? Consumer synchronises with producer.

> +
> +end:
> +	if (available != NULL)
> +		*available = entries - n;
> +	return n;
> +}
> +
> +/**
> + * @internal
> + *   Dequeue several objects from the non-blocking ring (multi-consumer safe)
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to pull from the ring.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
> + * @param available
> + *   returns the number of remaining ring entries after the dequeue has
> finished
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_dequeue_mc(struct rte_ring *r, void **obj_table,
> +			    unsigned int n,
> +			    enum rte_ring_queue_behavior behavior,
> +			    unsigned int *available)
> +{
> +	size_t head, next;
> +	uint32_t entries;
> +
> +	n = __rte_ring_move_cons_head_64(r, 0, n, behavior,
> +					 &head, &next, &entries);
> +	if (n == 0)
> +		goto end;
> +
> +	while (1) {
> +		size_t tail = r->cons_64.tail;
> +
> +		/* Dequeue from the cons tail onwards. If multiple threads
> read
> +		 * the same pointers, the thread that successfully performs
> the
> +		 * CAS will keep them and the other(s) will retry.
> +		 */
> +		DEQUEUE_PTRS_NB(r, &r[1], tail, obj_table, n);
> +
> +		next = tail + n;
> +
> +		/* Built-in used to handle variable-sized tail index. */
> +		if (__sync_bool_compare_and_swap(&r->cons_64.tail, tail,
> next))
> +			/* There is potential for the ABA problem here, but
> +			 * that is mitigated by the large (64-bit) tail.
> +			 */
> +			break;
> +	}
> +
> +end:
> +	if (available != NULL)
> +		*available = entries - n;
> +	return n;
> +}
> +
> +/**
> + * @internal Dequeue several objects from the non-blocking ring
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to pull from the ring.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
> + * @param available
> + *   returns the number of remaining ring entries after the dequeue has
> finished
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_dequeue(struct rte_ring *r, void **obj_table,
> +		 unsigned int n, enum rte_ring_queue_behavior behavior,
> +		 unsigned int is_sc, unsigned int *available)
> +{
> +	if (is_sc)
> +		return __rte_ring_do_nb_dequeue_sc(r, obj_table, n,
> +						   behavior, available);
> +	else
> +		return __rte_ring_do_nb_dequeue_mc(r, obj_table, n,
> +						   behavior, available);
> +}
> +
>  /**
>   * @internal Enqueue several objects on the ring
>   *
> @@ -438,8 +853,14 @@ static __rte_always_inline unsigned int
>  rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
>  			 unsigned int n, unsigned int *free_space)
>  {
> -	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> -			__IS_MP, free_space);
> +	if (r->flags & RING_F_NB)
> +		return __rte_ring_do_nb_enqueue(r, obj_table, n,
> +						RTE_RING_QUEUE_FIXED,
> __IS_MP,
> +						free_space);
> +	else
> +		return __rte_ring_do_enqueue(r, obj_table, n,
> +					     RTE_RING_QUEUE_FIXED, __IS_MP,
> +					     free_space);
>  }
>  
>  /**
> @@ -461,8 +882,14 @@ static __rte_always_inline unsigned int
>  rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
>  			 unsigned int n, unsigned int *free_space)
>  {
> -	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> -			__IS_SP, free_space);
> +	if (r->flags & RING_F_NB)
> +		return __rte_ring_do_nb_enqueue(r, obj_table, n,
> +						RTE_RING_QUEUE_FIXED,
> __IS_SP,
> +						free_space);
> +	else
> +		return __rte_ring_do_enqueue(r, obj_table, n,
> +					     RTE_RING_QUEUE_FIXED, __IS_SP,
> +					     free_space);
>  }
>  
>  /**
> @@ -488,8 +915,14 @@ static __rte_always_inline unsigned int
>  rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
>  		      unsigned int n, unsigned int *free_space)
>  {
> -	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> -			r->prod.single, free_space);
> +	if (r->flags & RING_F_NB)
> +		return __rte_ring_do_nb_enqueue(r, obj_table, n,
> +						RTE_RING_QUEUE_FIXED,
> +						r->prod_64.single,
> free_space);
> +	else
> +		return __rte_ring_do_enqueue(r, obj_table, n,
> +					     RTE_RING_QUEUE_FIXED,
> +					     r->prod.single, free_space);
>  }
>  
>  /**
> @@ -572,8 +1005,14 @@ static __rte_always_inline unsigned int
>  rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
>  		unsigned int n, unsigned int *available)
>  {
> -	return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> -			__IS_MC, available);
> +	if (r->flags & RING_F_NB)
> +		return __rte_ring_do_nb_dequeue(r, obj_table, n,
> +						RTE_RING_QUEUE_FIXED,
> __IS_MC,
> +						available);
> +	else
> +		return __rte_ring_do_dequeue(r, obj_table, n,
> +					     RTE_RING_QUEUE_FIXED, __IS_MC,
> +					     available);
>  }
>  
>  /**
> @@ -596,8 +1035,14 @@ static __rte_always_inline unsigned int
>  rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
>  		unsigned int n, unsigned int *available)
>  {
> -	return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> -			__IS_SC, available);
> +	if (r->flags & RING_F_NB)
> +		return __rte_ring_do_nb_dequeue(r, obj_table, n,
> +						RTE_RING_QUEUE_FIXED,
> __IS_SC,
> +						available);
> +	else
> +		return __rte_ring_do_dequeue(r, obj_table, n,
> +					     RTE_RING_QUEUE_FIXED, __IS_SC,
> +					     available);
>  }
>  
>  /**
> @@ -623,8 +1068,14 @@ static __rte_always_inline unsigned int
>  rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
>  		unsigned int *available)
>  {
> -	return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> -				r->cons.single, available);
> +	if (r->flags & RING_F_NB)
> +		return __rte_ring_do_nb_dequeue(r, obj_table, n,
> +						RTE_RING_QUEUE_FIXED,
> +						r->cons_64.single,
> available);
> +	else
> +		return __rte_ring_do_dequeue(r, obj_table, n,
> +					     RTE_RING_QUEUE_FIXED,
> +					     r->cons.single, available);
>  }
>  
>  /**
> @@ -699,9 +1150,13 @@ rte_ring_dequeue(struct rte_ring *r, void **obj_p)
>  static inline unsigned
>  rte_ring_count(const struct rte_ring *r)
>  {
> -	uint32_t prod_tail = r->prod.tail;
> -	uint32_t cons_tail = r->cons.tail;
> -	uint32_t count = (prod_tail - cons_tail) & r->mask;
> +	uint32_t count;
> +
> +	if (r->flags & RING_F_NB)
> +		count = (r->prod_64.tail - r->cons_64.tail) & r->mask;
> +	else
> +		count = (r->prod.tail - r->cons.tail) & r->mask;
> +
>  	return (count > r->capacity) ? r->capacity : count;
>  }
>  
> @@ -821,8 +1276,14 @@ static __rte_always_inline unsigned
>  rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
>  			 unsigned int n, unsigned int *free_space)
>  {
> -	return __rte_ring_do_enqueue(r, obj_table, n,
> -			RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
> +	if (r->flags & RING_F_NB)
> +		return __rte_ring_do_nb_enqueue(r, obj_table, n,
> +						RTE_RING_QUEUE_VARIABLE,
> +						__IS_MP, free_space);
> +	else
> +		return __rte_ring_do_enqueue(r, obj_table, n,
> +					     RTE_RING_QUEUE_VARIABLE,
> +					     __IS_MP, free_space);
>  }
>  
>  /**
> @@ -844,8 +1305,14 @@ static __rte_always_inline unsigned
>  rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
>  			 unsigned int n, unsigned int *free_space)
>  {
> -	return __rte_ring_do_enqueue(r, obj_table, n,
> -			RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
> +	if (r->flags & RING_F_NB)
> +		return __rte_ring_do_nb_enqueue(r, obj_table, n,
> +						RTE_RING_QUEUE_VARIABLE,
> +						__IS_SP, free_space);
> +	else
> +		return __rte_ring_do_enqueue(r, obj_table, n,
> +					     RTE_RING_QUEUE_VARIABLE,
> +					     __IS_SP, free_space);
>  }
>  
>  /**
> @@ -871,8 +1338,14 @@ static __rte_always_inline unsigned
>  rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
>  		      unsigned int n, unsigned int *free_space)
>  {
> -	return __rte_ring_do_enqueue(r, obj_table, n,
> RTE_RING_QUEUE_VARIABLE,
> -			r->prod.single, free_space);
> +	if (r->flags & RING_F_NB)
> +		return __rte_ring_do_nb_enqueue(r, obj_table, n,
> +						RTE_RING_QUEUE_VARIABLE,
> +						r->prod_64.single,
> free_space);
> +	else
> +		return __rte_ring_do_enqueue(r, obj_table, n,
> +					     RTE_RING_QUEUE_VARIABLE,
> +					     r->prod.single, free_space);
>  }
>  
>  /**
> @@ -899,8 +1372,14 @@ static __rte_always_inline unsigned
>  rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
>  		unsigned int n, unsigned int *available)
>  {
> -	return __rte_ring_do_dequeue(r, obj_table, n,
> -			RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
> +	if (r->flags & RING_F_NB)
> +		return __rte_ring_do_nb_dequeue(r, obj_table, n,
> +						RTE_RING_QUEUE_VARIABLE,
> +						__IS_MC, available);
> +	else
> +		return __rte_ring_do_dequeue(r, obj_table, n,
> +					     RTE_RING_QUEUE_VARIABLE,
> +					     __IS_MC, available);
>  }
>  
>  /**
> @@ -924,8 +1403,14 @@ static __rte_always_inline unsigned
>  rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
>  		unsigned int n, unsigned int *available)
>  {
> -	return __rte_ring_do_dequeue(r, obj_table, n,
> -			RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
> +	if (r->flags & RING_F_NB)
> +		return __rte_ring_do_nb_dequeue(r, obj_table, n,
> +						RTE_RING_QUEUE_VARIABLE,
> +						__IS_SC, available);
> +	else
> +		return __rte_ring_do_dequeue(r, obj_table, n,
> +					     RTE_RING_QUEUE_VARIABLE,
> +					     __IS_SC, available);
>  }
>  
>  /**
> @@ -951,9 +1436,14 @@ static __rte_always_inline unsigned
>  rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
>  		unsigned int n, unsigned int *available)
>  {
> -	return __rte_ring_do_dequeue(r, obj_table, n,
> -				RTE_RING_QUEUE_VARIABLE,
> -				r->cons.single, available);
> +	if (r->flags & RING_F_NB)
> +		return __rte_ring_do_nb_dequeue(r, obj_table, n,
> +						RTE_RING_QUEUE_VARIABLE,
> +						r->cons_64.single,
> available);
> +	else
> +		return __rte_ring_do_dequeue(r, obj_table, n,
> +					     RTE_RING_QUEUE_VARIABLE,
> +					     r->cons.single, available);
>  }
>  
>  #ifdef __cplusplus
> diff --git a/lib/librte_ring/rte_ring_version.map
> b/lib/librte_ring/rte_ring_version.map
> index d935efd0d..8969467af 100644
> --- a/lib/librte_ring/rte_ring_version.map
> +++ b/lib/librte_ring/rte_ring_version.map
> @@ -17,3 +17,10 @@ DPDK_2.2 {
>  	rte_ring_free;
>  
>  } DPDK_2.0;
> +
> +DPDK_19.05 {
> +	global:
> +
> +	rte_ring_get_memsize;
> +
> +} DPDK_2.2;
-- 
Ola Liljedahl, Networking System Architect, Arm
Phone +46706866373, Skype ola.liljedahl



More information about the dev mailing list