[dpdk-dev] [PATCH v5 3/6] ring: add a lock-free implementation

Gage Eads gage.eads at intel.com
Tue Mar 5 18:40:16 CET 2019


This commit adds support for lock-free circular ring enqueue and dequeue
functions. The ring is supported on 32- and 64-bit architectures, however
it uses a 128-bit compare-and-swap instruction when run on a 64-bit
architecture, and thus is currently limited to x86_64.

The algorithm is based on Ola Liljedahl's lfring, modified to fit within
the rte ring API. With no contention, an enqueue of n pointers uses (1 + n)
CAS operations and a dequeue of n pointers uses 1. This algorithm has worse
average-case performance than the regular rte ring (particularly a
highly-contended ring with large bulk accesses), however:
- For applications with preemptible pthreads, the regular rte ring's
  worst-case performance (i.e. one thread being preempted in the
  update_tail() critical section) is much worse than the lock-free ring's.
- Software caching can mitigate the average case performance for ring-based
  algorithms. For example, a lock-free ring based mempool (a likely use
  case for this ring) with per-thread caching.

To avoid the ABA problem, each ring entry contains a modification counter.
On a 64-bit architecture, the chance of ABA occurring are effectively zero;
a 64-bit counter will take many years to wrap at current CPU frequencies.
On a 32-bit architectures, a lock-free ring must be at least 1024-entries
deep; assuming 100 cycles per ring entry access, this guarantees the ring's
modification counters will wrap on the order of days.

The lock-free ring is enabled via a new flag, RING_F_LF. Because the ring's
memsize is now a function of its flags (the lock-free ring requires 128b
for each entry), this commit adds a new argument ('flags') to
rte_ring_get_memsize(). An API deprecation notice will be sent in a
separate commit.

For ease-of-use, existing ring enqueue and dequeue functions work on both
regular and lock-free rings. This introduces an additional branch in the
datapath, but this should be a highly predictable branch.
ring_perf_autotest shows a negligible performance impact; it's hard to
distinguish a real difference versus system noise.

                                  | ring_perf_autotest cycles with branch -
             Test                 |   ring_perf_autotest cycles without
------------------------------------------------------------------
SP/SC single enq/dequeue          | 0.33
MP/MC single enq/dequeue          | -4.00
SP/SC burst enq/dequeue (size 8)  | 0.00
MP/MC burst enq/dequeue (size 8)  | 0.00
SP/SC burst enq/dequeue (size 32) | 0.00
MP/MC burst enq/dequeue (size 32) | 0.00
SC empty dequeue                  | 1.00
MC empty dequeue                  | 0.00

Single lcore:
SP/SC bulk enq/dequeue (size 8)   | 0.49
MP/MC bulk enq/dequeue (size 8)   | 0.08
SP/SC bulk enq/dequeue (size 32)  | 0.07
MP/MC bulk enq/dequeue (size 32)  | 0.09

Two physical cores:
SP/SC bulk enq/dequeue (size 8)   | 0.19
MP/MC bulk enq/dequeue (size 8)   | -0.37
SP/SC bulk enq/dequeue (size 32)  | 0.09
MP/MC bulk enq/dequeue (size 32)  | -0.05

Two NUMA nodes:
SP/SC bulk enq/dequeue (size 8)   | -1.96
MP/MC bulk enq/dequeue (size 8)   | 0.88
SP/SC bulk enq/dequeue (size 32)  | 0.10
MP/MC bulk enq/dequeue (size 32)  | 0.46

Test setup: x86_64 build with default config, dual-socket Xeon E5-2699 v4,
running on isolcpus cores with a tickless scheduler. Each test run three
times and the results averaged.

Signed-off-by: Gage Eads <gage.eads at intel.com>
---
 lib/librte_ring/rte_ring.c           |  92 +++++++--
 lib/librte_ring/rte_ring.h           | 308 ++++++++++++++++++++++++++---
 lib/librte_ring/rte_ring_c11_mem.h   | 366 ++++++++++++++++++++++++++++++++++-
 lib/librte_ring/rte_ring_generic.h   | 354 +++++++++++++++++++++++++++++++++
 lib/librte_ring/rte_ring_version.map |   7 +
 5 files changed, 1080 insertions(+), 47 deletions(-)

diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
index d215acecc..d4a176f57 100644
--- a/lib/librte_ring/rte_ring.c
+++ b/lib/librte_ring/rte_ring.c
@@ -45,9 +45,9 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
 
 /* return the size of memory occupied by a ring */
 ssize_t
-rte_ring_get_memsize(unsigned count)
+rte_ring_get_memsize_v1905(unsigned int count, unsigned int flags)
 {
-	ssize_t sz;
+	ssize_t sz, elt_sz;
 
 	/* count must be a power of 2 */
 	if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
@@ -57,10 +57,23 @@ rte_ring_get_memsize(unsigned count)
 		return -EINVAL;
 	}
 
-	sz = sizeof(struct rte_ring) + count * sizeof(void *);
+	elt_sz = (flags & RING_F_LF) ? 2 * sizeof(void *) : sizeof(void *);
+
+	sz = sizeof(struct rte_ring) + count * elt_sz;
 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
 	return sz;
 }
+BIND_DEFAULT_SYMBOL(rte_ring_get_memsize, _v1905, 19.05);
+MAP_STATIC_SYMBOL(ssize_t rte_ring_get_memsize(unsigned int count,
+					       unsigned int flags),
+		  rte_ring_get_memsize_v1905);
+
+ssize_t
+rte_ring_get_memsize_v20(unsigned int count)
+{
+	return rte_ring_get_memsize_v1905(count, 0);
+}
+VERSION_SYMBOL(rte_ring_get_memsize, _v20, 2.0);
 
 int
 rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
@@ -75,6 +88,8 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
 			  RTE_CACHE_LINE_MASK) != 0);
 	RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) &
 			  RTE_CACHE_LINE_MASK) != 0);
+	RTE_BUILD_BUG_ON(sizeof(struct rte_ring_lf_entry) !=
+			 2 * sizeof(void *));
 
 	/* init the ring structure */
 	memset(r, 0, sizeof(*r));
@@ -82,8 +97,6 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
 	if (ret < 0 || ret >= (int)sizeof(r->name))
 		return -ENAMETOOLONG;
 	r->flags = flags;
-	r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
-	r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
 
 	if (flags & RING_F_EXACT_SZ) {
 		r->size = rte_align32pow2(count + 1);
@@ -100,12 +113,46 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
 		r->mask = count - 1;
 		r->capacity = r->mask;
 	}
-	r->prod.head = r->cons.head = 0;
-	r->prod.tail = r->cons.tail = 0;
+
+	r->log2_size = rte_log2_u64(r->size);
+
+	if (flags & RING_F_LF) {
+		uint32_t i;
+
+		r->prod_ptr.single =
+			(flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
+		r->cons_ptr.single =
+			(flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
+		r->prod_ptr.head = r->cons_ptr.head = 0;
+		r->prod_ptr.tail = r->cons_ptr.tail = 0;
+
+		for (i = 0; i < r->size; i++) {
+			struct rte_ring_lf_entry *ring_ptr, *base;
+
+			base = (struct rte_ring_lf_entry *)&r->ring;
+
+			ring_ptr = &base[i & r->mask];
+
+			ring_ptr->cnt = 0;
+		}
+	} else {
+		r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
+		r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
+		r->prod.head = r->cons.head = 0;
+		r->prod.tail = r->cons.tail = 0;
+	}
 
 	return 0;
 }
 
+/* If a ring entry is written on average every M cycles, then a ring entry is
+ * reused every M*count cycles, and a ring entry's counter repeats every
+ * M*count*2^32 cycles. If M=100 on a 2GHz system, then a 1024-entry ring's
+ * counters would repeat every 2.37 days. The likelihood of ABA occurring is
+ * considered sufficiently low for 1024-entry and larger rings.
+ */
+#define MIN_32_BIT_LF_RING_SIZE 1024
+
 /* create the ring */
 struct rte_ring *
 rte_ring_create(const char *name, unsigned count, int socket_id,
@@ -123,11 +170,25 @@ rte_ring_create(const char *name, unsigned count, int socket_id,
 
 	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
 
+#ifdef RTE_ARCH_64
+#if !defined(RTE_ARCH_X86_64)
+	printf("This platform does not support the atomic operation required for RING_F_LF\n");
+	rte_errno = EINVAL;
+	return NULL;
+#endif
+#else
+	if ((flags & RING_F_LF) && count < MIN_32_BIT_LF_RING_SIZE) {
+		printf("RING_F_LF is only supported on 32-bit platforms for rings with at least 1024 entries.\n");
+		rte_errno = EINVAL;
+		return NULL;
+	}
+#endif
+
 	/* for an exact size ring, round up from count to a power of two */
 	if (flags & RING_F_EXACT_SZ)
 		count = rte_align32pow2(count + 1);
 
-	ring_size = rte_ring_get_memsize(count);
+	ring_size = rte_ring_get_memsize(count, flags);
 	if (ring_size < 0) {
 		rte_errno = ring_size;
 		return NULL;
@@ -227,10 +288,17 @@ rte_ring_dump(FILE *f, const struct rte_ring *r)
 	fprintf(f, "  flags=%x\n", r->flags);
 	fprintf(f, "  size=%"PRIu32"\n", r->size);
 	fprintf(f, "  capacity=%"PRIu32"\n", r->capacity);
-	fprintf(f, "  ct=%"PRIu32"\n", r->cons.tail);
-	fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
-	fprintf(f, "  pt=%"PRIu32"\n", r->prod.tail);
-	fprintf(f, "  ph=%"PRIu32"\n", r->prod.head);
+	if (r->flags & RING_F_LF) {
+		fprintf(f, "  ct=%"PRIuPTR"\n", r->cons_ptr.tail);
+		fprintf(f, "  ch=%"PRIuPTR"\n", r->cons_ptr.head);
+		fprintf(f, "  pt=%"PRIuPTR"\n", r->prod_ptr.tail);
+		fprintf(f, "  ph=%"PRIuPTR"\n", r->prod_ptr.head);
+	} else {
+		fprintf(f, "  ct=%"PRIu32"\n", r->cons.tail);
+		fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
+		fprintf(f, "  pt=%"PRIu32"\n", r->prod.tail);
+		fprintf(f, "  ph=%"PRIu32"\n", r->prod.head);
+	}
 	fprintf(f, "  used=%u\n", rte_ring_count(r));
 	fprintf(f, "  avail=%u\n", rte_ring_free_count(r));
 }
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index f16d77b8a..200d7b2a0 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -20,7 +20,7 @@
  *
  * - FIFO (First In First Out)
  * - Maximum size is fixed; the pointers are stored in a table.
- * - Lockless implementation.
+ * - Lockless (and optionally, non-blocking/lock-free) implementation.
  * - Multi- or single-consumer dequeue.
  * - Multi- or single-producer enqueue.
  * - Bulk dequeue.
@@ -98,6 +98,7 @@ struct rte_ring {
 	const struct rte_memzone *memzone;
 			/**< Memzone, if any, containing the rte_ring */
 	uint32_t size;           /**< Size of ring. */
+	uint32_t log2_size;      /**< log2(size of ring) */
 	uint32_t mask;           /**< Mask (size-1) of ring. */
 	uint32_t capacity;       /**< Usable size of ring */
 
@@ -133,6 +134,18 @@ struct rte_ring {
  */
 #define RING_F_EXACT_SZ 0x0004
 #define RTE_RING_SZ_MASK  (0x7fffffffU) /**< Ring size mask */
+/**
+ * The ring uses lock-free enqueue and dequeue functions. These functions
+ * do not have the "non-preemptive" constraint of a regular rte ring, and thus
+ * are suited for applications using preemptible pthreads. However, the
+ * lock-free functions have worse average-case performance than their regular
+ * rte ring counterparts. When used as the handler for a mempool, per-thread
+ * caching can mitigate the performance difference by reducing the number (and
+ * contention) of ring accesses.
+ *
+ * This flag is only supported on 32-bit and x86_64 platforms.
+ */
+#define RING_F_LF 0x0008
 
 /* @internal defines for passing to the enqueue dequeue worker functions */
 #define __IS_SP 1
@@ -150,11 +163,15 @@ struct rte_ring {
  *
  * @param count
  *   The number of elements in the ring (must be a power of 2).
+ * @param flags
+ *   The flags the ring will be created with.
  * @return
  *   - The memory size needed for the ring on success.
  *   - -EINVAL if count is not a power of 2.
  */
-ssize_t rte_ring_get_memsize(unsigned count);
+ssize_t rte_ring_get_memsize(unsigned int count, unsigned int flags);
+ssize_t rte_ring_get_memsize_v20(unsigned int count);
+ssize_t rte_ring_get_memsize_v1905(unsigned int count, unsigned int flags);
 
 /**
  * Initialize a ring structure.
@@ -187,6 +204,10 @@ ssize_t rte_ring_get_memsize(unsigned count);
  *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
  *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
  *      is "single-consumer". Otherwise, it is "multi-consumers".
+ *    - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2
+ *      number, but up to half the ring space may be wasted.
+ *    - RING_F_LF: If this flag is set, the ring uses lock-free variants of the
+ *      dequeue and enqueue functions.
  * @return
  *   0 on success, or a negative value on error.
  */
@@ -222,12 +243,17 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
  *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
  *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
  *      is "single-consumer". Otherwise, it is "multi-consumers".
+ *    - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2
+ *      number, but up to half the ring space may be wasted.
+ *    - RING_F_LF: If this flag is set, the ring uses lock-free variants of the
+ *      dequeue and enqueue functions.
  * @return
  *   On success, the pointer to the new allocated ring. NULL on error with
  *    rte_errno set appropriately. Possible errno values include:
  *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
  *    - E_RTE_SECONDARY - function was called from a secondary process instance
- *    - EINVAL - count provided is not a power of 2
+ *    - EINVAL - count provided is not a power of 2, or RING_F_LF is used on an
+ *      unsupported platform
  *    - ENOSPC - the maximum number of memzones has already been allocated
  *    - EEXIST - a memzone with the same name already exists
  *    - ENOMEM - no appropriate memory area found in which to create memzone
@@ -283,6 +309,50 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
 	} \
 } while (0)
 
+/* The actual enqueue of pointers on the lock-free ring, used by the
+ * single-producer lock-free enqueue function.
+ */
+#define ENQUEUE_PTRS_LF(r, base, prod_head, obj_table, n) do { \
+	unsigned int i; \
+	const uint32_t size = (r)->size; \
+	size_t idx = prod_head & (r)->mask; \
+	size_t new_cnt = prod_head + size; \
+	struct rte_ring_lf_entry *ring = (struct rte_ring_lf_entry *)base; \
+	unsigned int mask = ~0x3; \
+	if (likely(idx + n < size)) { \
+		for (i = 0; i < (n & mask); i += 4, idx += 4) { \
+			ring[idx].ptr = obj_table[i]; \
+			ring[idx].cnt = (new_cnt + i) >> r->log2_size; \
+			ring[idx + 1].ptr = obj_table[i + 1]; \
+			ring[idx + 1].cnt = (new_cnt + i + 1) >> r->log2_size; \
+			ring[idx + 2].ptr = obj_table[i + 2]; \
+			ring[idx + 2].cnt = (new_cnt + i + 2) >> r->log2_size; \
+			ring[idx + 3].ptr = obj_table[i + 3]; \
+			ring[idx + 3].cnt = (new_cnt + i + 3) >> r->log2_size; \
+		} \
+		switch (n & 0x3) { \
+		case 3: \
+			ring[idx].cnt = (new_cnt + i) >> r->log2_size; \
+			ring[idx++].ptr = obj_table[i++]; /* fallthrough */ \
+		case 2: \
+			ring[idx].cnt = (new_cnt + i) >> r->log2_size; \
+			ring[idx++].ptr = obj_table[i++]; /* fallthrough */ \
+		case 1: \
+			ring[idx].cnt = (new_cnt + i) >> r->log2_size; \
+			ring[idx++].ptr = obj_table[i++]; \
+		} \
+	} else { \
+		for (i = 0; idx < size; i++, idx++) { \
+			ring[idx].cnt = (new_cnt + i) >> r->log2_size; \
+			ring[idx].ptr = obj_table[i]; \
+		} \
+		for (idx = 0; i < n; i++, idx++) {    \
+			ring[idx].cnt = (new_cnt + i) >> r->log2_size; \
+			ring[idx].ptr = obj_table[i]; \
+		} \
+	} \
+} while (0)
+
 /* the actual copy of pointers on the ring to obj_table.
  * Placed here since identical code needed in both
  * single and multi consumer dequeue functions */
@@ -314,6 +384,43 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
 	} \
 } while (0)
 
+/* The actual copy of pointers on the lock-free ring to obj_table. */
+#define DEQUEUE_PTRS_LF(r, base, cons_head, obj_table, n) do { \
+	unsigned int i; \
+	size_t idx = cons_head & (r)->mask; \
+	const uint32_t size = (r)->size; \
+	struct rte_ring_lf_entry *ring = (struct rte_ring_lf_entry *)base; \
+	unsigned int mask = ~0x3; \
+	if (likely(idx + n < size)) { \
+		for (i = 0; i < (n & mask); i += 4, idx += 4) {\
+			obj_table[i] = ring[idx].ptr; \
+			obj_table[i + 1] = ring[idx + 1].ptr; \
+			obj_table[i + 2] = ring[idx + 2].ptr; \
+			obj_table[i + 3] = ring[idx + 3].ptr; \
+		} \
+		switch (n & 0x3) { \
+		case 3: \
+			obj_table[i++] = ring[idx++].ptr; /* fallthrough */ \
+		case 2: \
+			obj_table[i++] = ring[idx++].ptr; /* fallthrough */ \
+		case 1: \
+			obj_table[i++] = ring[idx++].ptr; \
+		} \
+	} else { \
+		for (i = 0; idx < size; i++, idx++) \
+			obj_table[i] = ring[idx].ptr; \
+		for (idx = 0; i < n; i++, idx++) \
+			obj_table[i] = ring[idx].ptr; \
+	} \
+} while (0)
+
+
+/* @internal 128-bit structure used by the lock-free ring */
+struct rte_ring_lf_entry {
+	void *ptr; /**< Data pointer */
+	uintptr_t cnt; /**< Modification counter */
+};
+
 /* Between load and load. there might be cpu reorder in weak model
  * (powerpc/arm).
  * There are 2 choices for the users
@@ -330,6 +437,70 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
 #endif
 
 /**
+ * @internal Enqueue several objects on the lock-free ring
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param is_sp
+ *   Indicates whether to use single producer or multi-producer head update
+ * @param free_space
+ *   returns the amount of space after the enqueue operation has finished
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_enqueue(struct rte_ring *r, void * const *obj_table,
+			 unsigned int n, enum rte_ring_queue_behavior behavior,
+			 unsigned int is_sp, unsigned int *free_space)
+{
+	if (is_sp)
+		return __rte_ring_do_lf_enqueue_sp(r, obj_table, n,
+						   behavior, free_space);
+	else
+		return __rte_ring_do_lf_enqueue_mp(r, obj_table, n,
+						   behavior, free_space);
+}
+
+/**
+ * @internal Dequeue several objects from the lock-free ring
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to pull from the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ *   returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_dequeue(struct rte_ring *r, void **obj_table,
+		 unsigned int n, enum rte_ring_queue_behavior behavior,
+		 unsigned int is_sc, unsigned int *available)
+{
+	if (is_sc)
+		return __rte_ring_do_lf_dequeue_sc(r, obj_table, n,
+						   behavior, available);
+	else
+		return __rte_ring_do_lf_dequeue_mc(r, obj_table, n,
+						   behavior, available);
+}
+
+/**
  * @internal Enqueue several objects on the ring
  *
   * @param r
@@ -436,8 +607,14 @@ static __rte_always_inline unsigned int
 rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 			 unsigned int n, unsigned int *free_space)
 {
-	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-			__IS_MP, free_space);
+	if (r->flags & RING_F_LF)
+		return __rte_ring_do_lf_enqueue(r, obj_table, n,
+						RTE_RING_QUEUE_FIXED, __IS_MP,
+						free_space);
+	else
+		return __rte_ring_do_enqueue(r, obj_table, n,
+					     RTE_RING_QUEUE_FIXED, __IS_MP,
+					     free_space);
 }
 
 /**
@@ -459,8 +636,14 @@ static __rte_always_inline unsigned int
 rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 			 unsigned int n, unsigned int *free_space)
 {
-	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-			__IS_SP, free_space);
+	if (r->flags & RING_F_LF)
+		return __rte_ring_do_lf_enqueue(r, obj_table, n,
+						RTE_RING_QUEUE_FIXED, __IS_SP,
+						free_space);
+	else
+		return __rte_ring_do_enqueue(r, obj_table, n,
+					     RTE_RING_QUEUE_FIXED, __IS_SP,
+					     free_space);
 }
 
 /**
@@ -486,8 +669,14 @@ static __rte_always_inline unsigned int
 rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 		      unsigned int n, unsigned int *free_space)
 {
-	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-			r->prod.single, free_space);
+	if (r->flags & RING_F_LF)
+		return __rte_ring_do_lf_enqueue(r, obj_table, n,
+						RTE_RING_QUEUE_FIXED,
+						r->prod_ptr.single, free_space);
+	else
+		return __rte_ring_do_enqueue(r, obj_table, n,
+					     RTE_RING_QUEUE_FIXED,
+					     r->prod.single, free_space);
 }
 
 /**
@@ -570,8 +759,14 @@ static __rte_always_inline unsigned int
 rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
-	return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-			__IS_MC, available);
+	if (r->flags & RING_F_LF)
+		return __rte_ring_do_lf_dequeue(r, obj_table, n,
+						RTE_RING_QUEUE_FIXED, __IS_MC,
+						available);
+	else
+		return __rte_ring_do_dequeue(r, obj_table, n,
+					     RTE_RING_QUEUE_FIXED, __IS_MC,
+					     available);
 }
 
 /**
@@ -594,8 +789,14 @@ static __rte_always_inline unsigned int
 rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
-	return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-			__IS_SC, available);
+	if (r->flags & RING_F_LF)
+		return __rte_ring_do_lf_dequeue(r, obj_table, n,
+						RTE_RING_QUEUE_FIXED, __IS_SC,
+						available);
+	else
+		return __rte_ring_do_dequeue(r, obj_table, n,
+					     RTE_RING_QUEUE_FIXED, __IS_SC,
+					     available);
 }
 
 /**
@@ -621,8 +822,14 @@ static __rte_always_inline unsigned int
 rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
 		unsigned int *available)
 {
-	return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-				r->cons.single, available);
+	if (r->flags & RING_F_LF)
+		return __rte_ring_do_lf_dequeue(r, obj_table, n,
+						RTE_RING_QUEUE_FIXED,
+						r->cons_ptr.single, available);
+	else
+		return __rte_ring_do_dequeue(r, obj_table, n,
+					     RTE_RING_QUEUE_FIXED,
+					     r->cons.single, available);
 }
 
 /**
@@ -697,9 +904,13 @@ rte_ring_dequeue(struct rte_ring *r, void **obj_p)
 static inline unsigned
 rte_ring_count(const struct rte_ring *r)
 {
-	uint32_t prod_tail = r->prod.tail;
-	uint32_t cons_tail = r->cons.tail;
-	uint32_t count = (prod_tail - cons_tail) & r->mask;
+	uint32_t count;
+
+	if (r->flags & RING_F_LF)
+		count = (r->prod_ptr.tail - r->cons_ptr.tail) & r->mask;
+	else
+		count = (r->prod.tail - r->cons.tail) & r->mask;
+
 	return (count > r->capacity) ? r->capacity : count;
 }
 
@@ -819,8 +1030,14 @@ static __rte_always_inline unsigned
 rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 			 unsigned int n, unsigned int *free_space)
 {
-	return __rte_ring_do_enqueue(r, obj_table, n,
-			RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
+	if (r->flags & RING_F_LF)
+		return __rte_ring_do_lf_enqueue(r, obj_table, n,
+						RTE_RING_QUEUE_VARIABLE,
+						__IS_MP, free_space);
+	else
+		return __rte_ring_do_enqueue(r, obj_table, n,
+					     RTE_RING_QUEUE_VARIABLE,
+					     __IS_MP, free_space);
 }
 
 /**
@@ -842,8 +1059,14 @@ static __rte_always_inline unsigned
 rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 			 unsigned int n, unsigned int *free_space)
 {
-	return __rte_ring_do_enqueue(r, obj_table, n,
-			RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
+	if (r->flags & RING_F_LF)
+		return __rte_ring_do_lf_enqueue(r, obj_table, n,
+						RTE_RING_QUEUE_VARIABLE,
+						__IS_SP, free_space);
+	else
+		return __rte_ring_do_enqueue(r, obj_table, n,
+					     RTE_RING_QUEUE_VARIABLE,
+					     __IS_SP, free_space);
 }
 
 /**
@@ -869,8 +1092,14 @@ static __rte_always_inline unsigned
 rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 		      unsigned int n, unsigned int *free_space)
 {
-	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE,
-			r->prod.single, free_space);
+	if (r->flags & RING_F_LF)
+		return __rte_ring_do_lf_enqueue(r, obj_table, n,
+						RTE_RING_QUEUE_VARIABLE,
+						r->prod_ptr.single, free_space);
+	else
+		return __rte_ring_do_enqueue(r, obj_table, n,
+					     RTE_RING_QUEUE_VARIABLE,
+					     r->prod.single, free_space);
 }
 
 /**
@@ -897,8 +1126,14 @@ static __rte_always_inline unsigned
 rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
-	return __rte_ring_do_dequeue(r, obj_table, n,
-			RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
+	if (r->flags & RING_F_LF)
+		return __rte_ring_do_lf_dequeue(r, obj_table, n,
+						RTE_RING_QUEUE_VARIABLE,
+						__IS_MC, available);
+	else
+		return __rte_ring_do_dequeue(r, obj_table, n,
+					     RTE_RING_QUEUE_VARIABLE,
+					     __IS_MC, available);
 }
 
 /**
@@ -922,8 +1157,14 @@ static __rte_always_inline unsigned
 rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
-	return __rte_ring_do_dequeue(r, obj_table, n,
-			RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
+	if (r->flags & RING_F_LF)
+		return __rte_ring_do_lf_dequeue(r, obj_table, n,
+						RTE_RING_QUEUE_VARIABLE,
+						__IS_SC, available);
+	else
+		return __rte_ring_do_dequeue(r, obj_table, n,
+					     RTE_RING_QUEUE_VARIABLE,
+					     __IS_SC, available);
 }
 
 /**
@@ -949,9 +1190,14 @@ static __rte_always_inline unsigned
 rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
-	return __rte_ring_do_dequeue(r, obj_table, n,
-				RTE_RING_QUEUE_VARIABLE,
-				r->cons.single, available);
+	if (r->flags & RING_F_LF)
+		return __rte_ring_do_lf_dequeue(r, obj_table, n,
+						RTE_RING_QUEUE_VARIABLE,
+						r->cons_ptr.single, available);
+	else
+		return __rte_ring_do_dequeue(r, obj_table, n,
+					     RTE_RING_QUEUE_VARIABLE,
+					     r->cons.single, available);
 }
 
 #ifdef __cplusplus
diff --git a/lib/librte_ring/rte_ring_c11_mem.h b/lib/librte_ring/rte_ring_c11_mem.h
index 545caf257..a672d161e 100644
--- a/lib/librte_ring/rte_ring_c11_mem.h
+++ b/lib/librte_ring/rte_ring_c11_mem.h
@@ -221,8 +221,8 @@ __rte_ring_move_prod_head_ptr(struct rte_ring *r, unsigned int is_sp,
 		/* Ensure the head is read before tail */
 		__atomic_thread_fence(__ATOMIC_ACQUIRE);
 
-		/* load-acquire synchronize with store-release of ht->tail
-		 * in update_tail.
+		/* load-acquire synchronize with store-release of tail in
+		 * __rte_ring_do_lf_dequeue_{sc, mc}.
 		 */
 		cons_tail = __atomic_load_n(&r->cons_ptr.tail,
 					__ATOMIC_ACQUIRE);
@@ -247,6 +247,7 @@ __rte_ring_move_prod_head_ptr(struct rte_ring *r, unsigned int is_sp,
 					0, __ATOMIC_RELAXED,
 					__ATOMIC_RELAXED);
 	} while (unlikely(success == 0));
+
 	return n;
 }
 
@@ -293,8 +294,8 @@ __rte_ring_move_cons_head_ptr(struct rte_ring *r, unsigned int is_sc,
 		/* Ensure the head is read before tail */
 		__atomic_thread_fence(__ATOMIC_ACQUIRE);
 
-		/* this load-acquire synchronize with store-release of ht->tail
-		 * in update_tail.
+		/* load-acquire synchronize with store-release of tail in
+		 * __rte_ring_do_lf_enqueue_{sp, mp}.
 		 */
 		prod_tail = __atomic_load_n(&r->prod_ptr.tail,
 					__ATOMIC_ACQUIRE);
@@ -318,6 +319,363 @@ __rte_ring_move_cons_head_ptr(struct rte_ring *r, unsigned int is_sc,
 							0, __ATOMIC_RELAXED,
 							__ATOMIC_RELAXED);
 	} while (unlikely(success == 0));
+
+	return n;
+}
+
+/**
+ * @internal
+ *   Enqueue several objects on the lock-free ring (single-producer only)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param free_space
+ *   returns the amount of space after the enqueue operation has finished
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_enqueue_sp(struct rte_ring *r, void * const *obj_table,
+			    unsigned int n,
+			    enum rte_ring_queue_behavior behavior,
+			    unsigned int *free_space)
+{
+	uint32_t free_entries;
+	uintptr_t head, next;
+
+	n = __rte_ring_move_prod_head_ptr(r, 1, n, behavior,
+					  &head, &next, &free_entries);
+	if (n == 0)
+		goto end;
+
+	ENQUEUE_PTRS_LF(r, &r->ring, head, obj_table, n);
+
+	__atomic_store_n(&r->prod_ptr.tail,
+			 r->prod_ptr.tail + n,
+			 __ATOMIC_RELEASE);
+end:
+	if (free_space != NULL)
+		*free_space = free_entries - n;
+	return n;
+}
+
+/* This macro defines the number of times an enqueueing thread can fail to find
+ * a free ring slot before reloading its producer tail index.
+ */
+#define ENQ_RETRY_LIMIT 32
+
+/**
+ * @internal
+ *   Get the next producer tail index.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param idx
+ *   The local tail index
+ * @return
+ *   If the ring's tail is ahead of the local tail, return the shared tail.
+ *   Else, return tail + 1.
+ */
+static __rte_always_inline uintptr_t
+__rte_ring_reload_tail(struct rte_ring *r, uintptr_t idx)
+{
+	uintptr_t fresh = __atomic_load_n(&r->prod_ptr.tail, __ATOMIC_RELAXED);
+
+	if ((intptr_t)(idx - fresh) < 0)
+		idx = fresh; /* fresh is after idx, use it instead */
+	else
+		idx++; /* Continue with next slot */
+
+	return idx;
+}
+
+/**
+ * @internal
+ *   Update the ring's producer tail index. If another thread already updated
+ *   the index beyond the caller's tail value, do nothing.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param idx
+ *   The local tail index
+ * @return
+ *   If the shared tail is ahead of the local tail, return the shared tail.
+ *   Else, return tail + 1.
+ */
+static __rte_always_inline uintptr_t
+__rte_ring_lf_update_tail(struct rte_ring *r, uintptr_t val)
+{
+	volatile uintptr_t *loc = &r->prod_ptr.tail;
+	uintptr_t old = __atomic_load_n(loc, __ATOMIC_RELAXED);
+
+	do {
+		/* Check if the tail has already been updated. */
+		if ((intptr_t)(val - old) < 0)
+			return old;
+
+		/* Else val >= old, need to update *loc */
+	} while (!__atomic_compare_exchange_n(loc, &old, val,
+					      1, __ATOMIC_RELEASE,
+					      __ATOMIC_RELAXED));
+
+	return val;
+}
+
+/**
+ * @internal
+ *   Enqueue several objects on the lock-free ring (multi-producer safe)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param free_space
+ *   returns the amount of space after the enqueue operation has finished
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_enqueue_mp(struct rte_ring *r, void * const *obj_table,
+			    unsigned int n,
+			    enum rte_ring_queue_behavior behavior,
+			    unsigned int *free_space)
+{
+#if !defined(ALLOW_EXPERIMENTAL_API)
+	RTE_SET_USED(r);
+	RTE_SET_USED(obj_table);
+	RTE_SET_USED(n);
+	RTE_SET_USED(behavior);
+	RTE_SET_USED(free_space);
+	printf("[%s()] RING_F_LF requires an experimental API."
+	       " Recompile with ALLOW_EXPERIMENTAL_API to use it.\n"
+	       , __func__);
+	return 0;
+#else
+	struct rte_ring_lf_entry *base;
+	uintptr_t head, next, tail;
+	unsigned int i;
+	uint32_t avail;
+
+	/* Atomically update the prod head to reserve n slots. The prod tail
+	 * is modified at the end of the function.
+	 */
+	n = __rte_ring_move_prod_head_ptr(r, 0, n, behavior,
+					  &head, &next, &avail);
+
+	tail = __atomic_load_n(&r->prod_ptr.tail, __ATOMIC_RELAXED);
+	head = __atomic_load_n(&r->cons_ptr.tail, __ATOMIC_ACQUIRE);
+
+	if (unlikely(n == 0))
+		goto end;
+
+	base = (struct rte_ring_lf_entry *)&r->ring;
+
+	for (i = 0; i < n; i++) {
+		unsigned int retries = 0;
+		int success = 0;
+
+		/* Enqueue to the tail entry. If another thread wins the race,
+		 * retry with the new tail.
+		 */
+		do {
+			struct rte_ring_lf_entry old_value, new_value;
+			struct rte_ring_lf_entry *ring_ptr;
+
+			ring_ptr = &base[tail & r->mask];
+
+			old_value = *ring_ptr;
+
+			if (old_value.cnt != (tail >> r->log2_size)) {
+				/* This slot has already been used. Depending
+				 * on how far behind this thread is, either go
+				 * to the next slot or reload the tail.
+				 */
+				uintptr_t prev_tail;
+
+				prev_tail = (tail + r->size) >> r->log2_size;
+
+				if (old_value.cnt != prev_tail ||
+				    ++retries == ENQ_RETRY_LIMIT) {
+					/* This thread either fell 2+ laps
+					 * behind or hit the retry limit, so
+					 * reload the tail index.
+					 */
+					tail = __rte_ring_reload_tail(r, tail);
+					retries = 0;
+				} else {
+					/* Slot already used, try the next. */
+					tail++;
+
+				}
+
+				continue;
+			}
+
+			/* Found a free slot, try to enqueue next element. */
+			new_value.ptr = obj_table[i];
+			new_value.cnt = (tail + r->size) >> r->log2_size;
+
+#ifdef RTE_ARCH_64
+			success = rte_atomic128_cmp_exchange(
+					(rte_int128_t *)ring_ptr,
+					(rte_int128_t *)&old_value,
+					(rte_int128_t *)&new_value,
+					1, __ATOMIC_RELEASE,
+					__ATOMIC_RELAXED);
+#else
+			success = __atomic_compare_exchange(
+					(uint64_t *)ring_ptr,
+					&old_value,
+					&new_value,
+					1, __ATOMIC_RELEASE,
+					__ATOMIC_RELAXED);
+#endif
+		} while (success == 0);
+
+		/* Only increment tail if the CAS succeeds, since it can
+		 * spuriously fail on some architectures.
+		 */
+		tail++;
+	}
+
+end:
+	tail = __rte_ring_lf_update_tail(r, tail);
+
+	if (free_space != NULL)
+		*free_space = avail - n;
+	return n;
+#endif
+}
+
+/**
+ * @internal
+ *   Dequeue several objects from the lock-free ring (single-consumer only)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to pull from the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ *   returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_dequeue_sc(struct rte_ring *r, void **obj_table,
+			    unsigned int n,
+			    enum rte_ring_queue_behavior behavior,
+			    unsigned int *available)
+{
+	uintptr_t cons_tail, prod_tail, avail;
+
+	cons_tail = __atomic_load_n(&r->cons_ptr.tail, __ATOMIC_RELAXED);
+	prod_tail = __atomic_load_n(&r->prod_ptr.tail, __ATOMIC_ACQUIRE);
+
+	avail = prod_tail - cons_tail;
+
+	/* Set the actual entries for dequeue */
+	if (unlikely(avail < n))
+		n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : avail;
+
+	if (unlikely(n == 0))
+		goto end;
+
+	DEQUEUE_PTRS_LF(r, &r->ring, cons_tail, obj_table, n);
+
+	/* Use a read barrier and store-relaxed so we don't unnecessarily order
+	 * writes.
+	 */
+	rte_smp_rmb();
+
+	__atomic_store_n(&r->cons_ptr.tail, cons_tail + n, __ATOMIC_RELAXED);
+end:
+	if (available != NULL)
+		*available = avail - n;
+
+	return n;
+}
+
+/**
+ * @internal
+ *   Dequeue several objects from the lock-free ring (multi-consumer safe)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to pull from the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ *   returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_dequeue_mc(struct rte_ring *r, void **obj_table,
+			    unsigned int n,
+			    enum rte_ring_queue_behavior behavior,
+			    unsigned int *available)
+{
+	uintptr_t cons_tail, prod_tail, avail;
+
+	cons_tail = __atomic_load_n(&r->cons_ptr.tail, __ATOMIC_RELAXED);
+
+	do {
+		/* Load tail on every iteration to avoid spurious queue empty
+		 * situations.
+		 */
+		prod_tail = __atomic_load_n(&r->prod_ptr.tail,
+					    __ATOMIC_ACQUIRE);
+
+		avail = prod_tail - cons_tail;
+
+		/* Set the actual entries for dequeue */
+		if (unlikely(avail < n))
+			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : avail;
+
+		if (unlikely(n == 0))
+			goto end;
+
+		DEQUEUE_PTRS_LF(r, &r->ring, cons_tail, obj_table, n);
+
+		/* Use a read barrier and store-relaxed so we don't
+		 * unnecessarily order writes.
+		 */
+		rte_smp_rmb();
+
+	} while (!__atomic_compare_exchange_n(&r->cons_ptr.tail,
+					      &cons_tail, cons_tail + n,
+					      0, __ATOMIC_RELAXED,
+					      __ATOMIC_RELAXED));
+
+end:
+	if (available != NULL)
+		*available = avail - n;
+
 	return n;
 }
 
diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h
index 6a0e1bbfb..944b353f4 100644
--- a/lib/librte_ring/rte_ring_generic.h
+++ b/lib/librte_ring/rte_ring_generic.h
@@ -297,4 +297,358 @@ __rte_ring_move_cons_head_ptr(struct rte_ring *r, unsigned int is_sc,
 	return n;
 }
 
+/**
+ * @internal
+ *   Enqueue several objects on the lock-free ring (single-producer only)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param free_space
+ *   returns the amount of space after the enqueue operation has finished
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_enqueue_sp(struct rte_ring *r, void * const *obj_table,
+			    unsigned int n,
+			    enum rte_ring_queue_behavior behavior,
+			    unsigned int *free_space)
+{
+	uint32_t free_entries;
+	uintptr_t head, next;
+
+	n = __rte_ring_move_prod_head_ptr(r, 1, n, behavior,
+					  &head, &next, &free_entries);
+	if (n == 0)
+		goto end;
+
+	ENQUEUE_PTRS_LF(r, &r->ring, head, obj_table, n);
+
+	rte_smp_wmb();
+
+	r->prod_ptr.tail += n;
+end:
+	if (free_space != NULL)
+		*free_space = free_entries - n;
+	return n;
+}
+
+/* This macro defines the number of times an enqueueing thread can fail to find
+ * a free ring slot before reloading its producer tail index.
+ */
+#define ENQ_RETRY_LIMIT 32
+
+/**
+ * @internal
+ *   Get the next producer tail index.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param idx
+ *   The local tail index
+ * @return
+ *   If the ring's tail is ahead of the local tail, return the shared tail.
+ *   Else, return tail + 1.
+ */
+static __rte_always_inline uintptr_t
+__rte_ring_reload_tail(struct rte_ring *r, uintptr_t idx)
+{
+	uintptr_t fresh = r->prod_ptr.tail;
+
+	if ((intptr_t)(idx - fresh) < 0)
+		/* fresh is after idx, use it instead */
+		idx = fresh;
+	else
+		/* Continue with next slot */
+		idx++;
+
+	return idx;
+}
+
+/**
+ * @internal
+ *   Update the ring's producer tail index. If another thread already updated
+ *   the index beyond the caller's tail value, do nothing.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param idx
+ *   The local tail index
+ * @return
+ *   If the shared tail is ahead of the local tail, return the shared tail.
+ *   Else, return tail + 1.
+ */
+static __rte_always_inline uintptr_t
+__rte_ring_lf_update_tail(struct rte_ring *r, uintptr_t val)
+{
+	volatile uintptr_t *loc = &r->prod_ptr.tail;
+	uintptr_t old = *loc;
+
+	do {
+		/* Check if the tail has already been updated. */
+		if ((intptr_t)(val - old) < 0)
+			return old;
+
+		/* Else val >= old, need to update *loc */
+	} while (!__sync_bool_compare_and_swap(loc, old, val));
+
+	return val;
+}
+
+/**
+ * @internal
+ *   Enqueue several objects on the lock-free ring (multi-producer safe)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param free_space
+ *   returns the amount of space after the enqueue operation has finished
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_enqueue_mp(struct rte_ring *r, void * const *obj_table,
+			    unsigned int n,
+			    enum rte_ring_queue_behavior behavior,
+			    unsigned int *free_space)
+{
+#if !defined(ALLOW_EXPERIMENTAL_API)
+	RTE_SET_USED(r);
+	RTE_SET_USED(obj_table);
+	RTE_SET_USED(n);
+	RTE_SET_USED(behavior);
+	RTE_SET_USED(free_space);
+	printf("[%s()] RING_F_LF requires an experimental API."
+	       " Recompile with ALLOW_EXPERIMENTAL_API to use it.\n"
+	       , __func__);
+	return 0;
+#else
+	struct rte_ring_lf_entry *base;
+	uintptr_t head, next, tail;
+	unsigned int i;
+	uint32_t avail;
+
+	/* Atomically update the prod head to reserve n slots. The prod tail
+	 * is modified at the end of the function.
+	 */
+	n = __rte_ring_move_prod_head_ptr(r, 0, n, behavior,
+					  &head, &next, &avail);
+
+	tail = r->prod_ptr.tail;
+
+	rte_smp_rmb();
+
+	head = r->cons_ptr.tail;
+
+	if (unlikely(n == 0))
+		goto end;
+
+	base = (struct rte_ring_lf_entry *)&r->ring;
+
+	for (i = 0; i < n; i++) {
+		unsigned int retries = 0;
+		int success = 0;
+
+		/* Enqueue to the tail entry. If another thread wins the race,
+		 * retry with the new tail.
+		 */
+		do {
+			struct rte_ring_lf_entry old_value, new_value;
+			struct rte_ring_lf_entry *ring_ptr;
+
+			ring_ptr = &base[tail & r->mask];
+
+			old_value = *ring_ptr;
+
+			if (old_value.cnt != (tail >> r->log2_size)) {
+				/* This slot has already been used. Depending
+				 * on how far behind this thread is, either go
+				 * to the next slot or reload the tail.
+				 */
+				uintptr_t prev_tail;
+
+				prev_tail = (tail + r->size) >> r->log2_size;
+
+				if (old_value.cnt != prev_tail ||
+				    ++retries == ENQ_RETRY_LIMIT) {
+					/* This thread either fell 2+ laps
+					 * behind or hit the retry limit, so
+					 * reload the tail index.
+					 */
+					tail = __rte_ring_reload_tail(r, tail);
+					retries = 0;
+				} else {
+					/* Slot already used, try the next. */
+					tail++;
+
+				}
+
+				continue;
+			}
+
+			/* Found a free slot, try to enqueue next element. */
+			new_value.ptr = obj_table[i];
+			new_value.cnt = (tail + r->size) >> r->log2_size;
+
+#ifdef RTE_ARCH_64
+			success = rte_atomic128_cmp_exchange(
+					(rte_int128_t *)ring_ptr,
+					(rte_int128_t *)&old_value,
+					(rte_int128_t *)&new_value,
+					1, __ATOMIC_RELEASE,
+					__ATOMIC_RELAXED);
+#else
+			uint64_t *old_ptr = (uint64_t *)&old_value;
+			uint64_t *new_ptr = (uint64_t *)&new_value;
+
+			success = rte_atomic64_cmpset(
+					(volatile uint64_t *)ring_ptr,
+					*old_ptr, *new_ptr);
+#endif
+		} while (success == 0);
+
+		/* Only increment tail if the CAS succeeds, since it can
+		 * spuriously fail on some architectures.
+		 */
+		tail++;
+	}
+
+end:
+
+	tail = __rte_ring_lf_update_tail(r, tail);
+
+	if (free_space != NULL)
+		*free_space = avail - n;
+	return n;
+#endif
+}
+
+/**
+ * @internal
+ *   Dequeue several objects from the lock-free ring (single-consumer only)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to pull from the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ *   returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_dequeue_sc(struct rte_ring *r, void **obj_table,
+			    unsigned int n,
+			    enum rte_ring_queue_behavior behavior,
+			    unsigned int *available)
+{
+	uintptr_t cons_tail, prod_tail, avail;
+
+	cons_tail = r->cons_ptr.tail;
+
+	rte_smp_rmb();
+
+	prod_tail = r->prod_ptr.tail;
+
+	avail = prod_tail - cons_tail;
+
+	/* Set the actual entries for dequeue */
+	if (unlikely(avail < n))
+		n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : avail;
+
+	if (unlikely(n == 0))
+		goto end;
+
+	DEQUEUE_PTRS_LF(r, &r->ring, cons_tail, obj_table, n);
+
+	rte_smp_rmb();
+
+	r->cons_ptr.tail += n;
+end:
+	if (available != NULL)
+		*available = avail - n;
+
+	return n;
+}
+
+/**
+ * @internal
+ *   Dequeue several objects from the lock-free ring (multi-consumer safe)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to pull from the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ *   returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_dequeue_mc(struct rte_ring *r, void **obj_table,
+			    unsigned int n,
+			    enum rte_ring_queue_behavior behavior,
+			    unsigned int *available)
+{
+	uintptr_t cons_tail, prod_tail, avail;
+
+	cons_tail = r->cons_ptr.tail;
+
+	do {
+		rte_smp_rmb();
+
+		/* Load tail on every iteration to avoid spurious queue empty
+		 * situations.
+		 */
+		prod_tail = r->prod_ptr.tail;
+
+		avail = prod_tail - cons_tail;
+
+		/* Set the actual entries for dequeue */
+		if (unlikely(avail < n))
+			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : avail;
+
+		if (unlikely(n == 0))
+			goto end;
+
+		DEQUEUE_PTRS_LF(r, &r->ring, cons_tail, obj_table, n);
+
+	} while (!__sync_bool_compare_and_swap(&r->cons_ptr.tail,
+					       cons_tail, cons_tail + n));
+
+end:
+	if (available != NULL)
+		*available = avail - n;
+
+	return n;
+}
+
 #endif /* _RTE_RING_GENERIC_H_ */
diff --git a/lib/librte_ring/rte_ring_version.map b/lib/librte_ring/rte_ring_version.map
index d935efd0d..8969467af 100644
--- a/lib/librte_ring/rte_ring_version.map
+++ b/lib/librte_ring/rte_ring_version.map
@@ -17,3 +17,10 @@ DPDK_2.2 {
 	rte_ring_free;
 
 } DPDK_2.0;
+
+DPDK_19.05 {
+	global:
+
+	rte_ring_get_memsize;
+
+} DPDK_2.2;
-- 
2.13.6



More information about the dev mailing list