[dpdk-dev] [RFC,v2] lfring: lock-free ring buffer

Ola Liljedahl Ola.Liljedahl at arm.com
Wed Jan 30 17:36:17 CET 2019


Lock-free MP/MC ring buffer with SP/SC options.
The ring buffer metadata only maintains one set of head and tail
indexes. Tail is updated on enqueue, head is updated on dequeue.
The ring slots between head and tail always contain valid (unconsumed)
elements.
Each ring slot consists of a struct of data pointer ("ptr") and lap
counter ("lap"). Slot.lap is only updated on enqueue with the new lap
counter (ring index / ring size). The slot lap counter is used to ensure
that slots are written sequentially (important for FIFO ordering). It is
also used to synchronise multiple producers.
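
As an illustration of the lap counter, here is a minimal sketch (not the
patch code itself). The names struct slot, lap_of(), slots_init() and
slot_is_empty() are hypothetical; the ring size is assumed to be a power
of two (1 << log2), as in the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical slot layout: data pointer plus lap counter */
struct slot {
	void *ptr;
	uintptr_t lap;
};

/* Lap of a free-running ring index (ring size is 1 << log2) */
static inline uint64_t lap_of(uint64_t idx, uint32_t log2)
{
	return idx >> log2;
}

/* Initialise every slot as if it had been written one lap before lap 0 */
static inline void slots_init(struct slot *ring, uint32_t log2)
{
	assert(log2 < 64);
	uint64_t size = UINT64_C(1) << log2;
	for (uint64_t i = 0; i < size; i++) {
		ring[i].ptr = NULL;
		ring[i].lap = (uintptr_t)lap_of(i - size, log2);
	}
}

/* A slot is empty for index idx if it still carries the previous lap */
static inline bool
slot_is_empty(const struct slot *ring, uint64_t idx, uint32_t log2)
{
	uint64_t size = UINT64_C(1) << log2;
	const struct slot *s = &ring[idx & (size - 1)];
	return s->lap == (uintptr_t)lap_of(idx - size, log2);
}
```

With a ring of 8 slots, index 17 belongs to lap 2, and slot 17 & 7 = 1 is
considered empty for that index only while it still holds lap 1.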

MP enqueue scans the ring from the tail until head+size, looking for an
empty slot as indicated by slot.lap equaling the lap counter of
the previous lap ((index - ring size) / ring size). The empty slot is
written (.ptr = data, .lap = index / ring size) using CAS. If CAS fails,
some other thread wrote the slot and the thread continues with the next slot.
There is some logic to detect that the thread has fallen behind and should
reload the shared tail index instead of just trying the next slot.

When all elements have been enqueued or if the ring buffer is full, the
thread updates the ring buffer tail index (unless this has already
been updated by some other thread). The test is using "sequence number
arithmetic". Thus this thread will help other threads that have written
ring slots but not yet updated the shared tail index.
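
The "sequence number arithmetic" test and the conditional tail update can
be sketched as follows. This is a simplified stand-alone version modelled
on the helpers in rte_lfring.h; seq_before() and advance_index() are
hypothetical names, and the GCC/Clang __atomic builtins are assumed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* True if 'a' is before 'b', also when the 64-bit index has wrapped */
static inline bool seq_before(uint64_t a, uint64_t b)
{
	return (int64_t)(a - b) < 0;
}

/* Advance *loc to neu unless it is already at or past neu.
 * Returns the value of *loc as seen by this thread afterwards.
 */
static inline uint64_t advance_index(uint64_t *loc, uint64_t neu)
{
	assert(loc != NULL);
	uint64_t old = __atomic_load_n(loc, __ATOMIC_RELAXED);
	do {
		if (!seq_before(old, neu))
			return old; /* Some other thread already helped */
		/* On CAS failure 'old' is refreshed and re-checked */
	} while (!__atomic_compare_exchange_n(loc, &old, neu,
					      /*weak=*/true,
					      __ATOMIC_RELEASE,
					      __ATOMIC_RELAXED));
	return neu;
}
```

A thread that lost the race simply observes the newer index and returns
without writing, which is how producers help each other.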

If a slot with a lap counter different from current or previous lap is
found, it is assumed that the thread has fallen behind (e.g. been preempted)
and the local tail index is (conditionally) updated from the ring buffer
metadata in order to quickly catch up.

MP dequeue ensures at least N elements are available and reads the slots
from the shared head index and attempts to commit the operation using
CAS on the shared head index.
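
A reduced sketch of this dequeue scheme is shown below. It is not the
patch code: struct demo_ring, DEMO_RING_SIZE and demo_mc_dequeue_bulk()
are hypothetical, slots hold only a pointer, and the commit uses a
CAS-release for simplicity where the patch uses a read barrier followed
by a relaxed CAS. GCC/Clang __atomic builtins are assumed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define DEMO_RING_SIZE 8u /* Hypothetical size, must be a power of two */

struct demo_ring {
	uint64_t head; /* Advanced by consumers */
	uint64_t tail; /* Advanced by producers */
	void *slots[DEMO_RING_SIZE];
};

/* Dequeue exactly n elements or none; returns the number dequeued */
static unsigned int
demo_mc_dequeue_bulk(struct demo_ring *r, void **elems, unsigned int n)
{
	assert(n <= DEMO_RING_SIZE);
	uint64_t head = __atomic_load_n(&r->head, __ATOMIC_RELAXED);
	do {
		/* Reload tail on every attempt to see new elements */
		uint64_t tail = __atomic_load_n(&r->tail, __ATOMIC_ACQUIRE);
		if ((int64_t)(tail - head) < (int64_t)n)
			return 0; /* Fewer than n elements available */
		/* Speculatively read the slots starting at head */
		for (unsigned int i = 0; i < n; i++) {
			uint64_t idx = (head + i) & (DEMO_RING_SIZE - 1);
			elems[i] = __atomic_load_n(&r->slots[idx],
						   __ATOMIC_RELAXED);
		}
		/* Commit: one CAS; on failure 'head' is refreshed and the
		 * slots are read again
		 */
	} while (!__atomic_compare_exchange_n(&r->head, &head, head + n,
					      /*weak=*/false,
					      __ATOMIC_RELEASE,
					      __ATOMIC_RELAXED));
	return n;
}
```

The slot reads only become visible to the caller if the CAS succeeds, so
a consumer that loses the race discards what it read and retries.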

Enqueue N elements: N+1 CAS operations (but the last CAS operation might
not be necessary during contention as producers will help each other)
Dequeue N elements: 1 CAS operation

Single-threaded (SP/SC) enqueue and dequeue operations will inter-operate
with MT-safe (MP/MC) operations but it is the responsibility of the user
to ensure single-threaded operations are safe to use.

128-bit lock-free atomic compare exchange operations for AArch64
(ARMv8.0) and x86-64 are provided in lockfree16.h. I expect these
functions to find a different home and possibly a different API.
Note that a 'frail' version of atomic_compare_exchange is implemented.
This means that in the case of failure, the old value returned is
not guaranteed to be atomically read (as this is not possible on ARMv8.0
without also writing to the location). Atomically read values are
often not necessary; it is a successful compare and exchange operation
which provides atomicity.

Changes from v1:
* Fix check-patch warnings
* Use lap counter instead of ring index in ring slots
* Support for 32-bit architectures
* Prepend '_' to internal functions in rte_lfring.h
* Change name of public MP/MC/SP/SC bulk enqueue/dequeue functions to
  match rte_ring pattern
* Perform only a fixed number of retries in enqueue before reloading
  shared ring buffer tail index to (hopefully) catch up more quickly
* Reload shared ring buffer tail on every iteration in dequeue in order
  to avoid spurious queue empty situations
* Dequeue requires nelems elements to be available in order to do bulk
  dequeue.
* Use rte_smp_rmb() + atomic store (or cas)-relaxed instead of store
  (or cas)-release in dequeue functions as we only read shared data.
  This avoids ordering stores to private data.

Signed-off-by: Ola Liljedahl <ola.liljedahl at arm.com>
---
 config/common_base                       |   5 +
 lib/Makefile                             |   2 +
 lib/librte_lfring/Makefile               |  22 ++
 lib/librte_lfring/lockfree16.h           | 143 +++++++
 lib/librte_lfring/meson.build            |   6 +
 lib/librte_lfring/rte_lfring.c           | 264 +++++++++++++
 lib/librte_lfring/rte_lfring.h           | 621 +++++++++++++++++++++++++++++++
 lib/librte_lfring/rte_lfring_version.map |  19 +
 8 files changed, 1082 insertions(+)
 create mode 100644 lib/librte_lfring/Makefile
 create mode 100644 lib/librte_lfring/lockfree16.h
 create mode 100644 lib/librte_lfring/meson.build
 create mode 100644 lib/librte_lfring/rte_lfring.c
 create mode 100644 lib/librte_lfring/rte_lfring.h
 create mode 100644 lib/librte_lfring/rte_lfring_version.map

diff --git a/config/common_base b/config/common_base
index 7c6da51..375f9c3 100644
--- a/config/common_base
+++ b/config/common_base
@@ -719,6 +719,11 @@ CONFIG_RTE_LIBRTE_PMD_IFPGA_RAWDEV=y
 CONFIG_RTE_LIBRTE_RING=y
 
 #
+# Compile librte_lfring
+#
+CONFIG_RTE_LIBRTE_LFRING=y
+
+#
 # Compile librte_mempool
 #
 CONFIG_RTE_LIBRTE_MEMPOOL=y
diff --git a/lib/Makefile b/lib/Makefile
index d6239d2..cd46339 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -12,6 +12,8 @@ DIRS-$(CONFIG_RTE_LIBRTE_PCI) += librte_pci
 DEPDIRS-librte_pci := librte_eal
 DIRS-$(CONFIG_RTE_LIBRTE_RING) += librte_ring
 DEPDIRS-librte_ring := librte_eal
+DIRS-$(CONFIG_RTE_LIBRTE_LFRING) += librte_lfring
+DEPDIRS-librte_lfring := librte_eal
 DIRS-$(CONFIG_RTE_LIBRTE_MEMPOOL) += librte_mempool
 DEPDIRS-librte_mempool := librte_eal librte_ring
 DIRS-$(CONFIG_RTE_LIBRTE_MBUF) += librte_mbuf
diff --git a/lib/librte_lfring/Makefile b/lib/librte_lfring/Makefile
new file mode 100644
index 0000000..c04d2e9
--- /dev/null
+++ b/lib/librte_lfring/Makefile
@@ -0,0 +1,22 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2010-2014 Intel Corporation
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# library name
+LIB = librte_lfring.a
+
+CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
+LDLIBS += -lrte_eal
+
+EXPORT_MAP := rte_lfring_version.map
+
+LIBABIVER := 1
+
+# all source are stored in SRCS-y
+SRCS-$(CONFIG_RTE_LIBRTE_LFRING) := rte_lfring.c
+
+# install includes
+SYMLINK-$(CONFIG_RTE_LIBRTE_LFRING)-include := rte_lfring.h lockfree16.h
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/lib/librte_lfring/lockfree16.h b/lib/librte_lfring/lockfree16.h
new file mode 100644
index 0000000..199add9
--- /dev/null
+++ b/lib/librte_lfring/lockfree16.h
@@ -0,0 +1,143 @@
+//Copyright (c) 2018-2019, ARM Limited. All rights reserved.
+//
+//SPDX-License-Identifier:        BSD-3-Clause
+
+#ifndef _LOCKFREE16_H
+#define _LOCKFREE16_H
+
+#include <stdint.h>
+#include <stdbool.h>
+
+#define HAS_ACQ(mo) ((mo) != __ATOMIC_RELAXED && (mo) != __ATOMIC_RELEASE)
+#define HAS_RLS(mo) ((mo) == __ATOMIC_RELEASE || \
+		     (mo) == __ATOMIC_ACQ_REL || \
+		     (mo) == __ATOMIC_SEQ_CST)
+
+#define MO_LOAD(mo) (HAS_ACQ((mo)) ? __ATOMIC_ACQUIRE : __ATOMIC_RELAXED)
+#define MO_STORE(mo) (HAS_RLS((mo)) ? __ATOMIC_RELEASE : __ATOMIC_RELAXED)
+
+#if defined __aarch64__
+
+static inline __int128 ldx128(const __int128 *var, int mm)
+{
+	__int128 old;
+	if (mm == __ATOMIC_ACQUIRE)
+		__asm volatile("ldaxp %0, %H0, [%1]"
+				: "=&r" (old)
+				: "r" (var)
+				: "memory");
+	else if (mm == __ATOMIC_RELAXED)
+		__asm volatile("ldxp %0, %H0, [%1]"
+				: "=&r" (old)
+				: "r" (var)
+				: );
+	else
+		__builtin_trap();
+	return old;
+}
+
+//Return 0 on success, 1 on failure
+static inline uint32_t stx128(__int128 *var, __int128 neu, int mm)
+{
+	uint32_t ret;
+	if (mm == __ATOMIC_RELEASE)
+		__asm volatile("stlxp %w0, %1, %H1, [%2]"
+				: "=&r" (ret)
+				: "r" (neu), "r" (var)
+				: "memory");
+	else if (mm == __ATOMIC_RELAXED)
+		__asm volatile("stxp %w0, %1, %H1, [%2]"
+				: "=&r" (ret)
+				: "r" (neu), "r" (var)
+				: );
+	else
+		__builtin_trap();
+	return ret;
+}
+
+//Weak compare-exchange and non-atomic load on failure
+static inline bool
+lockfree_compare_exchange_16_frail(register __int128 *var,
+				   __int128 *exp,
+				   register __int128 neu,
+				   int mo_success,
+				   int mo_failure)
+{
+	(void)mo_failure;//Ignore memory ordering for failure, memory order for
+	//success must be stronger or equal
+	int ldx_mo = MO_LOAD(mo_success);
+	int stx_mo = MO_STORE(mo_success);
+	register __int128 expected = *exp;
+	__asm __volatile("" ::: "memory");
+	//Atomicity of LDXP is not guaranteed
+	register __int128 old = ldx128(var, ldx_mo);
+	if (old == expected && !stx128(var, neu, stx_mo)) {
+		//Right value and STX succeeded
+		__asm __volatile("" ::: "memory");
+		return 1;
+	}
+	__asm __volatile("" ::: "memory");
+	//Wrong value or STX failed
+	*exp = old;//Old possibly torn value (OK for 'frail' flavour)
+	return 0;//Failure, *exp updated
+}
+
+#elif defined __x86_64__
+
+union u128 {
+	struct {
+		uint64_t lo, hi;
+	} s;
+	__int128 ui;
+};
+
+static inline bool
+cmpxchg16b(__int128 *src, union u128 *cmp, union u128 with)
+{
+	bool result;
+	__asm__ __volatile__
+		("lock cmpxchg16b %1\n\tsetz %0"
+		 : "=q" (result), "+m" (*src), "+d" (cmp->s.hi), "+a" (cmp->s.lo)
+		 : "c" (with.s.hi), "b" (with.s.lo)
+		 : "cc", "memory"
+		);
+	return result;
+}
+
+static inline bool
+lockfree_compare_exchange_16(register __int128 *var,
+			     __int128 *exp,
+			     register __int128 neu,
+			     bool weak,
+			     int mo_success,
+			     int mo_failure)
+{
+	(void)weak;
+	(void)mo_success;
+	(void)mo_failure;
+	union u128 cmp, with;
+	cmp.ui = *exp;
+	with.ui = neu;
+	if (cmpxchg16b(var, &cmp, with))
+		return true;
+	*exp = cmp.ui;
+	return false;
+}
+
+static inline bool
+lockfree_compare_exchange_16_frail(register __int128 *var,
+				   __int128 *exp,
+				   register __int128 neu,
+				   int mo_s,
+				   int mo_f)
+{
+	return lockfree_compare_exchange_16(var, exp, neu, true, mo_s, mo_f);
+}
+
+#else
+
+#error Unsupported architecture
+
+#endif
+
+#endif
diff --git a/lib/librte_lfring/meson.build b/lib/librte_lfring/meson.build
new file mode 100644
index 0000000..c8b90cb
--- /dev/null
+++ b/lib/librte_lfring/meson.build
@@ -0,0 +1,6 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2017 Intel Corporation
+
+version = 1
+sources = files('rte_lfring.c')
+headers = files('rte_lfring.h','lockfree16.h')
diff --git a/lib/librte_lfring/rte_lfring.c b/lib/librte_lfring/rte_lfring.c
new file mode 100644
index 0000000..e130f71
--- /dev/null
+++ b/lib/librte_lfring/rte_lfring.c
@@ -0,0 +1,264 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2010-2015 Intel Corporation
+ * Copyright (c) 2019 Arm Ltd
+ * All rights reserved.
+ */
+
+#include <stdio.h>
+#include <stdarg.h>
+#include <string.h>
+#include <stdint.h>
+#include <inttypes.h>
+#include <errno.h>
+#include <sys/queue.h>
+
+#include <rte_common.h>
+#include <rte_log.h>
+#include <rte_memory.h>
+#include <rte_memzone.h>
+#include <rte_malloc.h>
+#include <rte_launch.h>
+#include <rte_eal.h>
+#include <rte_eal_memconfig.h>
+#include <rte_atomic.h>
+#include <rte_per_lcore.h>
+#include <rte_lcore.h>
+#include <rte_branch_prediction.h>
+#include <rte_errno.h>
+#include <rte_string_fns.h>
+#include <rte_spinlock.h>
+
+#include "rte_lfring.h"
+
+TAILQ_HEAD(rte_lfring_list, rte_tailq_entry);
+
+static struct rte_tailq_elem rte_lfring_tailq = {
+	.name = RTE_TAILQ_LFRING_NAME,
+};
+EAL_REGISTER_TAILQ(rte_lfring_tailq)
+
+/* true if x is a power of 2 */
+#define POWEROF2(x) ((((x)-1) & (x)) == 0)
+
+/* return the size of memory occupied by a ring */
+ssize_t
+rte_lfring_get_memsize(unsigned int count)
+{
+	ssize_t sz;
+
+	/* count must be a power of 2 */
+	if ((!POWEROF2(count)) || (count > 0x80000000U)) {
+		RTE_LOG(ERR, RING,
+			"Requested size is invalid, must be power of 2, and "
+			"do not exceed the size limit %u\n", 0x80000000U);
+		return -EINVAL;
+	}
+
+	sz = sizeof(struct rte_lfring) + count * sizeof(struct rte_lfr_element);
+	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
+	return sz;
+}
+
+int
+rte_lfring_init(struct rte_lfring *r, const char *name, unsigned int count,
+		unsigned int flags)
+{
+	int ret;
+
+	/* compilation-time checks */
+	RTE_BUILD_BUG_ON((sizeof(struct rte_lfring) &
+			  RTE_CACHE_LINE_MASK) != 0);
+
+	/* init the ring structure */
+	memset(r, 0, sizeof(*r));
+	ret = snprintf(r->name, sizeof(r->name), "%s", name);
+	if (ret < 0 || ret >= (int)sizeof(r->name))
+		return -ENAMETOOLONG;
+	r->flags = flags;
+
+	r->size = rte_align32pow2(count);
+	r->mask = r->size - 1;
+	r->log2 = rte_log2_u64(r->size);
+	r->head = 0;
+	r->tail = 0;
+	for (uint64_t i = 0; i < r->size; i++) {
+		r->ring[i].ptr = NULL;
+		r->ring[i].lap = (i - r->size) >> r->log2;
+	}
+
+	return 0;
+}
+
+/* create the ring */
+struct rte_lfring *
+rte_lfring_create(const char *name, unsigned int count, int socket_id,
+		  unsigned int flags)
+{
+	char mz_name[RTE_MEMZONE_NAMESIZE];
+	struct rte_lfring *r;
+	struct rte_tailq_entry *te;
+	const struct rte_memzone *mz;
+	ssize_t ring_size;
+	int mz_flags = 0;
+	struct rte_lfring_list *ring_list = NULL;
+	const unsigned int requested_count = count;
+	int ret;
+
+	ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
+
+	count = rte_align32pow2(count);
+
+	ring_size = rte_lfring_get_memsize(count);
+	if (ring_size < 0) {
+		rte_errno = -ring_size;
+		return NULL;
+	}
+
+	ret = snprintf(mz_name, sizeof(mz_name), "%s%s",
+		RTE_LFRING_MZ_PREFIX, name);
+	if (ret < 0 || ret >= (int)sizeof(mz_name)) {
+		rte_errno = ENAMETOOLONG;
+		return NULL;
+	}
+
+	te = rte_zmalloc("LFRING_TAILQ_ENTRY", sizeof(*te), 0);
+	if (te == NULL) {
+		RTE_LOG(ERR, RING, "Cannot reserve memory for tailq\n");
+		rte_errno = ENOMEM;
+		return NULL;
+	}
+
+	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
+
+	/* reserve a memory zone for this ring. If we can't get rte_config or
+	 * we are secondary process, the memzone_reserve function will set
+	 * rte_errno for us appropriately - hence no check in this
+	 * function
+	 */
+	mz = rte_memzone_reserve_aligned(mz_name, ring_size, socket_id,
+					 mz_flags, __alignof__(*r));
+	if (mz != NULL) {
+		r = mz->addr;
+		/* no need to check return value here, we already checked the
+		 * arguments above
+		 */
+		rte_lfring_init(r, name, requested_count, flags);
+
+		te->data = (void *) r;
+		r->memzone = mz;
+
+		TAILQ_INSERT_TAIL(ring_list, te, next);
+	} else {
+		r = NULL;
+		RTE_LOG(ERR, RING, "Cannot reserve memory\n");
+		rte_free(te);
+	}
+	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+	return r;
+}
+
+/* free the ring */
+void
+rte_lfring_free(struct rte_lfring *r)
+{
+	struct rte_lfring_list *ring_list = NULL;
+	struct rte_tailq_entry *te;
+
+	if (r == NULL)
+		return;
+
+	/*
+	 * Ring was not created with rte_lfring_create,
+	 * therefore, there is no memzone to free.
+	 */
+	if (r->memzone == NULL) {
+		RTE_LOG(ERR, RING, "Cannot free ring (not created with rte_lfring_create())\n");
+		return;
+	}
+
+	if (rte_memzone_free(r->memzone) != 0) {
+		RTE_LOG(ERR, RING, "Cannot free memory\n");
+		return;
+	}
+
+	ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
+	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
+
+	/* find out tailq entry */
+	TAILQ_FOREACH(te, ring_list, next) {
+		if (te->data == (void *) r)
+			break;
+	}
+
+	if (te == NULL) {
+		rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+		return;
+	}
+
+	TAILQ_REMOVE(ring_list, te, next);
+
+	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+	rte_free(te);
+}
+
+/* dump the status of the ring on the console */
+void
+rte_lfring_dump(FILE *f, const struct rte_lfring *r)
+{
+	fprintf(f, "ring <%s>@%p\n", r->name, r);
+	fprintf(f, "  flags=%x\n", r->flags);
+	fprintf(f, "  size=%"PRIu64"\n", r->size);
+	fprintf(f, "  tail=%"PRIu64"\n", r->tail);
+	fprintf(f, "  head=%"PRIu64"\n", r->head);
+	fprintf(f, "  used=%u\n", rte_lfring_count(r));
+	fprintf(f, "  avail=%u\n", rte_lfring_free_count(r));
+}
+
+/* dump the status of all rings on the console */
+void
+rte_lfring_list_dump(FILE *f)
+{
+	const struct rte_tailq_entry *te;
+	struct rte_lfring_list *ring_list;
+
+	ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
+
+	rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK);
+
+	TAILQ_FOREACH(te, ring_list, next) {
+		rte_lfring_dump(f, (struct rte_lfring *) te->data);
+	}
+
+	rte_rwlock_read_unlock(RTE_EAL_TAILQ_RWLOCK);
+}
+
+/* search a ring from its name */
+struct rte_lfring *
+rte_lfring_lookup(const char *name)
+{
+	struct rte_tailq_entry *te;
+	struct rte_lfring *r = NULL;
+	struct rte_lfring_list *ring_list;
+
+	ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
+
+	rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK);
+
+	TAILQ_FOREACH(te, ring_list, next) {
+		r = (struct rte_lfring *) te->data;
+		if (strncmp(name, r->name, RTE_LFRING_NAMESIZE) == 0)
+			break;
+	}
+
+	rte_rwlock_read_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+	if (te == NULL) {
+		rte_errno = ENOENT;
+		return NULL;
+	}
+
+	return r;
+}
diff --git a/lib/librte_lfring/rte_lfring.h b/lib/librte_lfring/rte_lfring.h
new file mode 100644
index 0000000..22d3e1c
--- /dev/null
+++ b/lib/librte_lfring/rte_lfring.h
@@ -0,0 +1,621 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2010-2017 Intel Corporation
+ * Copyright (c) 2019 Arm Ltd
+ * All rights reserved.
+ */
+
+#ifndef _RTE_LFRING_H_
+#define _RTE_LFRING_H_
+
+/**
+ * @file
+ * RTE Lfring
+ *
+ * The Lfring Manager is a fixed-size queue, implemented as a table of
+ * (pointers + counters).
+ *
+ * - FIFO (First In First Out)
+ * - Maximum size is fixed; the pointers are stored in a table.
+ * - Lock-free implementation.
+ * - Multi- or single-consumer dequeue.
+ * - Multi- or single-producer enqueue.
+ *
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdio.h>
+#include <stdint.h>
+#include <sys/queue.h>
+#include <errno.h>
+#include <rte_common.h>
+#include <rte_config.h>
+#include <rte_memory.h>
+#include <rte_lcore.h>
+#include <rte_atomic.h>
+#include <rte_branch_prediction.h>
+#include <rte_memzone.h>
+#include <rte_pause.h>
+
+#include "lockfree16.h"
+
+#define RTE_TAILQ_LFRING_NAME "RTE_LFRING"
+
+#define RTE_LFRING_MZ_PREFIX "RG_"
+/**< The maximum length of an lfring name. */
+#define RTE_LFRING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
+			   sizeof(RTE_LFRING_MZ_PREFIX) + 1)
+
+struct rte_memzone; /* forward declaration, so as not to require memzone.h */
+
+struct rte_lfr_element {
+	void *ptr;
+	uintptr_t lap;
+};
+
+#if __SIZEOF_POINTER__ == 8
+typedef __int128 dintptr_t;
+#elif __SIZEOF_POINTER__ == 4
+typedef uint64_t dintptr_t;
+#else
+#error
+#endif
+
+/**
+ * An RTE lfring structure.
+ *
+ */
+struct rte_lfring {
+	char name[RTE_LFRING_NAMESIZE] __rte_cache_aligned; /**< Name of the lfring. */
+	const struct rte_memzone *memzone;
+			/**< Memzone, if any, containing the rte_lfring */
+
+	char pad0 __rte_cache_aligned; /**< empty cache line */
+
+	/** Modified by producer. */
+	uint64_t tail __rte_cache_aligned;
+	uint64_t size;	   /**< Size of ring. */
+	uint64_t mask;	   /**< Mask (size-1) of lfring. */
+	uint32_t log2;	   /**< log2 size. */
+	uint32_t flags;	   /**< Flags supplied at creation. */
+	char pad1 __rte_cache_aligned; /**< empty cache line */
+
+	/** Modified by consumer. */
+	uint64_t head __rte_cache_aligned;
+	char pad2 __rte_cache_aligned; /**< empty cache line */
+
+	struct rte_lfr_element ring[] __rte_cache_aligned;
+};
+
+#define LFRING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
+#define LFRING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */
+
+/**
+ * Calculate the memory size needed for a lfring
+ *
+ * This function returns the number of bytes needed for a lfring, given
+ * the number of elements in it. This value is the sum of the size of
+ * the structure rte_lfring and the size of the memory needed by the
+ * objects pointers. The value is aligned to a cache line size.
+ *
+ * @param count
+ *   The number of elements in the lfring (must be a power of 2).
+ * @return
+ *   - The memory size needed for the lfring on success.
+ *   - -EINVAL if count is not a power of 2.
+ */
+ssize_t rte_lfring_get_memsize(unsigned int count);
+
+/**
+ * Initialize a lfring structure.
+ *
+ * Initialize a lfring structure in memory pointed by "r". The size of the
+ * memory area must be large enough to store the lfring structure and the
+ * object table. It is advised to use rte_lfring_get_memsize() to get the
+ * appropriate size.
+ *
+ * The lfring size is set to *count*, which must be a power of two. All
+ * *count* slots are usable: head and tail are free-running 64-bit
+ * indexes, so a full lfring (tail - head == count) is distinguishable
+ * from an empty one (tail == head).
+ *
+ * The lfring is not added in RTE_TAILQ_LFRING global list. Indeed, the
+ * memory given by the caller may not be shareable among dpdk
+ * processes.
+ *
+ * @param r
+ *   The pointer to the lfring structure followed by the objects table.
+ * @param name
+ *   The name of the lfring.
+ * @param count
+ *   The number of elements in the lfring (must be a power of 2).
+ * @param flags
+ * @return
+ *   0 on success, or a negative value on error.
+ */
+int rte_lfring_init(struct rte_lfring *r, const char *name, unsigned int count,
+		    unsigned int flags);
+
+/**
+ * Create a new lfring named *name* in memory.
+ *
+ * This function uses ``memzone_reserve()`` to allocate memory. Then it
+ * calls rte_lfring_init() to initialize an empty lfring.
+ *
+ * The new lfring size is set to *count*, which must be a power of
+ * two. All *count* slots are usable: head and tail are free-running
+ * 64-bit indexes, so a full lfring (tail - head == count) is
+ * distinguishable from an empty one (tail == head).
+ *
+ * The lfring is added in RTE_TAILQ_LFRING list.
+ *
+ * @param name
+ *   The name of the lfring.
+ * @param count
+ *   The size of the lfring (must be a power of 2).
+ * @param socket_id
+ *   The *socket_id* argument is the socket identifier in case of
+ *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
+ *   constraint for the reserved zone.
+ * @param flags
+ *   An OR of the following:
+ *    - LFRING_F_SP_ENQ: If this flag is set, the default behavior when
+ *      using ``rte_lfring_enqueue()`` or ``rte_lfring_enqueue_bulk()``
+ *      is "single-producer". Otherwise, it is "multi-producers".
+ *    - LFRING_F_SC_DEQ: If this flag is set, the default behavior when
+ *      using ``rte_lfring_dequeue()`` or ``rte_lfring_dequeue_bulk()``
+ *      is "single-consumer". Otherwise, it is "multi-consumers".
+ * @return
+ *   On success, the pointer to the new allocated lfring. NULL on error with
+ *    rte_errno set appropriately. Possible errno values include:
+ *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
+ *    - E_RTE_SECONDARY - function was called from a secondary process instance
+ *    - EINVAL - count provided is not a power of 2
+ *    - ENOSPC - the maximum number of memzones has already been allocated
+ *    - EEXIST - a memzone with the same name already exists
+ *    - ENOMEM - no appropriate memory area found in which to create memzone
+ */
+struct rte_lfring *rte_lfring_create(const char *name, unsigned int count,
+				     int socket_id, unsigned int flags);
+/**
+ * De-allocate all memory used by the lfring.
+ *
+ * @param r
+ *   Lfring to free
+ */
+void rte_lfring_free(struct rte_lfring *r);
+
+/**
+ * Dump the status of the lfring to a file.
+ *
+ * @param f
+ *   A pointer to a file for output
+ * @param r
+ *   A pointer to the lfring structure.
+ */
+void rte_lfring_dump(FILE *f, const struct rte_lfring *r);
+
+/* True if 'a' is before 'b' ('a' < 'b') in serial number arithmetic */
+static __rte_always_inline bool
+_rte_lfring_before(uint64_t a, uint64_t b)
+{
+	return (int64_t)(a - b) < 0;
+}
+
+static __rte_always_inline uint64_t
+_rte_lfring_update(uint64_t *loc, uint64_t neu)
+{
+	uint64_t old = __atomic_load_n(loc, __ATOMIC_RELAXED);
+	do {
+		if (_rte_lfring_before(neu, old)) {
+			/* neu < old */
+			return old;
+		}
+		/* Else neu > old, need to update *loc */
+	} while (!__atomic_compare_exchange_n(loc,
+					      &old,
+					      neu,
+					      /*weak=*/true,
+					      __ATOMIC_RELEASE,
+					      __ATOMIC_RELAXED));
+	return neu;
+}
+
+static __rte_always_inline uint64_t
+_rte_lfring_reload(const uint64_t *loc, uint64_t idx)
+{
+	uint64_t fresh = __atomic_load_n(loc, __ATOMIC_RELAXED);
+	if (_rte_lfring_before(idx, fresh)) {
+		/* fresh is after idx, use this instead */
+		idx = fresh;
+	} else {
+		/* Continue with next slot */
+		idx++;
+	}
+	return idx;
+}
+
+static __rte_always_inline void
+_rte_lfring_enq_elems(void *const *elems,
+		      struct rte_lfr_element *ring,
+		      uint64_t mask,
+		      uint32_t log2,
+		      uint64_t idx,
+		      uint64_t num)
+{
+	uint64_t seg0 = RTE_MIN(mask + 1 - (idx & mask), num);
+	struct rte_lfr_element *ring0 = &ring[idx & mask];
+	for (uint64_t i = 0; i < seg0; i++) {
+		ring0[i].ptr = *elems++;
+		ring0[i].lap = (idx++) >> log2;
+	}
+	if (num > seg0) {
+		uint64_t seg1 = num - seg0;
+		for (uint64_t i = 0; i < seg1; i++) {
+			ring[i].ptr = *elems++;
+			ring[i].lap = (idx++) >> log2;
+		}
+	}
+}
+
+static __rte_always_inline unsigned int
+rte_lfring_sp_enqueue_bulk(struct rte_lfring *r,
+			   void * const *elems,
+			   unsigned int nelems,
+			   unsigned int *free_space)
+{
+	/* Single-producer */
+	uint64_t size = r->mask + 1;
+	uint64_t tail = __atomic_load_n(&r->tail, __ATOMIC_RELAXED);
+	uint64_t head = __atomic_load_n(&r->head, __ATOMIC_ACQUIRE);
+	int64_t actual = RTE_MIN((int64_t)(head + size - tail), (int64_t)nelems);
+	if (actual <= 0)
+		return 0;
+	_rte_lfring_enq_elems(elems, r->ring, r->mask, r->log2, tail, actual);
+	__atomic_store_n(&r->tail, tail + actual, __ATOMIC_RELEASE);
+	if (free_space != NULL)
+		*free_space = head + size - (tail + actual);
+	return actual;
+}
+
+static inline bool
+lockfree_compare_exchange_frail(dintptr_t *loc,
+				dintptr_t *old,
+				dintptr_t neu,
+				int mo_success,
+				int mo_failure)
+{
+#if __SIZEOF_POINTER__ == 8
+	return lockfree_compare_exchange_16_frail(loc,
+						  old,
+						  neu,
+						  mo_success,
+						  mo_failure);
+#elif __SIZEOF_POINTER__ == 4
+	return lockfree_compare_exchange_8_frail(loc,
+						 old,
+						 neu,
+						 mo_success,
+						 mo_failure);
+#else
+#error
+#endif
+}
+
+#define ENQ_RETRY_LIMIT 32
+
+static __rte_always_inline unsigned int
+rte_lfring_mp_enqueue_bulk(struct rte_lfring *r,
+			   void * const *elems,
+			   unsigned int nelems,
+			   unsigned int *free_space)
+{
+	/* Lock-free multi-producer */
+	uint64_t mask = r->mask;
+	uint64_t size = mask + 1;
+	uint64_t tail = __atomic_load_n(&r->tail, __ATOMIC_RELAXED);
+	uint64_t actual = 0;
+	uint32_t retries = 0;
+restart:
+	while (actual < nelems &&
+	       _rte_lfring_before(tail,
+			__atomic_load_n(&r->head, __ATOMIC_ACQUIRE) + size)) {
+		union {
+			struct rte_lfr_element e;
+			dintptr_t ui;
+		} old, neu;
+		struct rte_lfr_element *slot = &r->ring[tail & mask];
+		old.e.ptr = __atomic_load_n(&slot->ptr, __ATOMIC_RELAXED);
+		old.e.lap = __atomic_load_n(&slot->lap, __ATOMIC_RELAXED);
+		do {
+			if (old.e.lap != (tail - size) >> r->log2) {
+				if (old.e.lap != tail >> r->log2 ||
+					++retries == ENQ_RETRY_LIMIT) {
+					/* We are far behind,
+					 * restart with fresh index
+					 */
+					tail = _rte_lfring_reload(&r->tail,
+								  tail);
+					retries = 0;
+					goto restart;
+				} else {
+					/* Slot already enqueued,
+					 * try next slot
+					 */
+					tail++;
+					goto restart;
+				}
+			}
+			/* Found slot that was used one lap back,
+			 * try to enqueue next element
+			 */
+			neu.e.ptr = elems[actual];
+			neu.e.lap = tail >> r->log2;/* Set lap on enqueue */
+		} while (!lockfree_compare_exchange_frail((dintptr_t *)slot,
+							  &old.ui,
+							  neu.ui,
+							  __ATOMIC_RELEASE,
+							  __ATOMIC_RELAXED));
+		/* Enqueue succeeded */
+		actual++;
+		/* Continue with next slot */
+		tail++;
+		retries = 0;
+	}
+	tail = _rte_lfring_update(&r->tail, tail);
+	if (free_space != NULL)
+		*free_space = __atomic_load_n(&r->head,
+					      __ATOMIC_RELAXED) + size - tail;
+	return actual;
+}
+
+/**
+ * Enqueue several objects on an lfring.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @param elems
+ *   A pointer to a table of void * pointers (objects).
+ * @param nelems
+ *   The number of objects to enqueue on the lfring from the elems table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the lfring after the
+ *   enqueue operation has finished.
+ * @return
+ *   The number of objects enqueued, between 0 and nelems (inclusive)
+ */
+static __rte_always_inline unsigned int
+rte_lfring_enqueue_bulk(struct rte_lfring *r,
+			void * const *elems,
+			unsigned int nelems,
+			unsigned int *free_space)
+{
+	if (r->flags & LFRING_F_SP_ENQ)
+		return rte_lfring_sp_enqueue_bulk(r, elems, nelems, free_space);
+	else
+		return rte_lfring_mp_enqueue_bulk(r, elems, nelems, free_space);
+}
+
+static __rte_always_inline void
+_rte_lfring_deq_elems(void **elems,
+		      const struct rte_lfr_element *ring,
+		      uint64_t mask,
+		      uint64_t idx,
+		      uint64_t num)
+{
+	uint64_t seg0 = RTE_MIN(mask + 1 - (idx & mask), num);
+	const struct rte_lfr_element *ring0 = &ring[idx & mask];
+	for (uint64_t i = 0; i < seg0; i++)
+		*elems++ = ring0[i].ptr;
+	if (num > seg0) {
+		uint64_t seg1 = num - seg0;
+		for (uint64_t i = 0; i < seg1; i++)
+			*elems++ = ring[i].ptr;
+	}
+}
+
+static __rte_always_inline unsigned int
+rte_lfring_sc_dequeue_bulk(struct rte_lfring *r,
+			   void **elems,
+			   unsigned int nelems,
+			   unsigned int *available)
+{
+	/* Single-consumer */
+	int64_t actual;
+	uint64_t mask = r->mask;
+	uint64_t head = __atomic_load_n(&r->head, __ATOMIC_RELAXED);
+	uint64_t tail = __atomic_load_n(&r->tail, __ATOMIC_ACQUIRE);
+	actual = RTE_MIN((int64_t)(tail - head), (int64_t)nelems);
+	if (unlikely(actual < nelems))
+		return 0;
+	_rte_lfring_deq_elems(elems, r->ring, mask, head, actual);
+	/* Use RMB + STORE-RELAXED instead of STORE-RELEASE as we only
+	 * read shared data, no need to order writes.
+	 */
+	rte_smp_rmb();
+	__atomic_store_n(&r->head, head + actual, __ATOMIC_RELAXED);
+	if (available != NULL)
+		*available = tail - (head + actual);
+	return (unsigned int)actual;
+}
+
+static __rte_always_inline unsigned int
+rte_lfring_mc_dequeue_bulk(struct rte_lfring *r,
+			   void **elems,
+			   unsigned int nelems,
+			   unsigned int *available)
+{
+	/* Lock-free multi-consumer */
+	int64_t actual;
+	uint64_t mask = r->mask;
+	uint64_t head = __atomic_load_n(&r->head, __ATOMIC_RELAXED);
+	uint64_t tail;
+	do {
+		/* Load tail on every iteration to avoid spurious queue
+		 * empty situations
+		 */
+		tail = __atomic_load_n(&r->tail, __ATOMIC_ACQUIRE);
+		actual = RTE_MIN((int64_t)(tail - head), (int64_t)nelems);
+		if (unlikely(actual < nelems))
+			return 0;
+		_rte_lfring_deq_elems(elems, r->ring, mask, head, actual);
+		/* Use RMB + CAS-RELAXED instead of CAS-RELEASE as we only
+		 * read shared data, no need to order writes. */
+		rte_smp_rmb();
+	} while (!__atomic_compare_exchange_n(&r->head,
+					      &head,
+					      head + actual,
+					      /*weak*/false,
+					      __ATOMIC_RELAXED,
+					      __ATOMIC_RELAXED));
+	if (available != NULL)
+		*available = tail - (head + actual);
+	return (unsigned int)actual;
+}
+
+/**
+ * Dequeue several objects from an lfring.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @param elems
+ *   A pointer to a table of void * pointers (objects).
+ * @param nelems
+ *   The number of objects to dequeue from the lfring to the elems table.
+ * @param available
+ *   If non-NULL, returns the number of ring entries remaining after the dequeue
+ * @return
+ *   The number of objects dequeued, either 0 or nelems
+ */
+static __rte_always_inline unsigned int
+rte_lfring_dequeue_bulk(struct rte_lfring *r,
+			void **elems,
+			unsigned int nelems,
+			unsigned int *available)
+{
+	if (r->flags & LFRING_F_SC_DEQ)
+		return rte_lfring_sc_dequeue_bulk(r, elems, nelems, available);
+	else
+		return rte_lfring_mc_dequeue_bulk(r, elems, nelems, available);
+}
+
+/**
+ * Return the number of entries in a lfring.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @return
+ *   The number of entries in the lfring.
+ */
+static inline unsigned
+rte_lfring_count(const struct rte_lfring *r)
+{
+	uint64_t head = __atomic_load_n(&r->head, __ATOMIC_RELAXED);
+	uint64_t tail = __atomic_load_n(&r->tail, __ATOMIC_RELAXED);
+	uint64_t avail = tail - head;
+	return (int64_t)avail > 0 ? avail : 0;
+}
+
+/**
+ * Return the number of free entries in a lfring.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @return
+ *   The number of free entries in the lfring.
+ */
+static inline unsigned
+rte_lfring_free_count(const struct rte_lfring *r)
+{
+	return r->size - rte_lfring_count(r);
+}
+
+/**
+ * Test if a lfring is full.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @return
+ *   - 1: The lfring is full.
+ *   - 0: The lfring is not full.
+ */
+static inline int
+rte_lfring_full(const struct rte_lfring *r)
+{
+	return rte_lfring_free_count(r) == 0;
+}
+
+/**
+ * Test if a lfring is empty.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @return
+ *   - 1: The lfring is empty.
+ *   - 0: The lfring is not empty.
+ */
+static inline int
+rte_lfring_empty(const struct rte_lfring *r)
+{
+	return rte_lfring_count(r) == 0;
+}
+
+/**
+ * Return the size of the lfring.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @return
+ *   The size of the data store used by the lfring.
+ *   NOTE: this is not the same as the usable space in the lfring. To query that
+ *   use ``rte_lfring_get_capacity()``.
+ */
+static inline unsigned int
+rte_lfring_get_size(const struct rte_lfring *r)
+{
+	return r->size;
+}
+
+/**
+ * Return the number of elements which can be stored in the lfring.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @return
+ *   The usable size of the lfring.
+ */
+static inline unsigned int
+rte_lfring_get_capacity(const struct rte_lfring *r)
+{
+	return r->size;
+}
+
+/**
+ * Dump the status of all lfrings on the console
+ *
+ * @param f
+ *   A pointer to a file for output
+ */
+void rte_lfring_list_dump(FILE *f);
+
+/**
+ * Search a lfring from its name
+ *
+ * @param name
+ *   The name of the lfring.
+ * @return
+ *   The pointer to the lfring matching the name, or NULL if not found,
+ *   with rte_errno set appropriately. Possible rte_errno values include:
+ *    - ENOENT - required entry not available to return.
+ */
+struct rte_lfring *rte_lfring_lookup(const char *name);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_LFRING_H_ */
diff --git a/lib/librte_lfring/rte_lfring_version.map b/lib/librte_lfring/rte_lfring_version.map
new file mode 100644
index 0000000..d935efd
--- /dev/null
+++ b/lib/librte_lfring/rte_lfring_version.map
@@ -0,0 +1,19 @@
+DPDK_2.0 {
+	global:
+
+	rte_lfring_create;
+	rte_lfring_dump;
+	rte_lfring_get_memsize;
+	rte_lfring_init;
+	rte_lfring_list_dump;
+	rte_lfring_lookup;
+
+	local: *;
+};
+
+DPDK_2.2 {
+	global:
+
+	rte_lfring_free;
+
+} DPDK_2.0;
-- 
2.7.4


