[dpdk-dev] [RFC] lfring: lock-free ring buffer

Ola Liljedahl Ola.Liljedahl at arm.com
Mon Jan 28 13:28:46 CET 2019


Lock-free MP/MC ring buffer with SP/SC options.
The ring buffer metadata only maintains one set of head and tail
pointers. Tail is updated on enqueue, head is updated on dequeue.
The ring slots between head and tail always contains valid (unconsumed)
slots.
Each ring slot consists of a struct of data pointer ("ptr") and ring
index ("idx"). Slot.idx is only updated on enqueue with the new ring
index. The slot index is used to ensure that slots are written
sequentially (important for FIFO ordering). It is also used to
synchronise multiple producers.

MP enqueue scans the ring from the tail until head+size, looking for an
empty slot as indicated by slot.idx equaling the ring index of
the previous lap (index - ring size). The empty slot is written (.ptr =
data, .idx = index) using CAS. If CAS fails, some other thread wrote
the slot and the thread continues with the next slot.

When all elements have been enqueued or if the ring buffer is full, the
thread updates the ring buffer tail pointer (unless this has not already
been updated by some other thread). Thus this thread will help other
threads that have written ring slots but not yet updated the tail pointer.

If a slot with an index different from current or previous lap is found,
it is assumed that the thread has fallen behind (e.g. been preempted)
and the local tail pointer is (conditionally) updated from the ring buffer
metadata in order to quickly catch up.

MP dequeue reads (up to) N slots from the head index and attempts to
commit the operation using CAS on the head pointer.

Enqueue N elements: N+1 CAS operations (but the last CAS operation might
not be necessary during contention as producers will help each other)
Dequeue N elements: 1 CAS operation

As the lock-free ring supports both MT-safe (MP/MC) and single-threaded
(SP/SC) operations, there is a question whether the modes should be
chosen only when creating a ring buffer or if the application can mix
MT-safe and single-threaded enqueue (or dequeue) calls. To follow the
pattern set by rte_ring, specific functions for MP and SP enqueue and
MC and SC dequeue are made available. The wisdom of this ability can
be questioned. The lock-free design allows threads to be blocked for
long periods of time without interfering with the operation of the ring
buffer but mixing (lock-free) MT-safe and single-threaded calls requires
that there are on such threads that wake up at the wrong moment (when a
single-threaded operation is in progress).

128-bit lock-free atomic compare exchange operations for AArch64
(ARMv8.0) and x86-64 are provided in lockfree16.h. I expect these
functions to find a different home and possibly a different API.
Note that a 'frail' version atomic_compare_exchange is implemented,
this means that in the case of failure, the old value returned is
not guaranteed to be atomically read (as this is not possible on ARMv8.0
without also writing to the location). Atomically read values are
often not necessary, it is a successful compare and exchange operation
which provides atomicity.

Signed-off-by: Ola Liljedahl <ola.liljedahl at arm.com>
---
 config/common_base                       |   5 +
 lib/Makefile                             |   2 +
 lib/librte_lfring/Makefile               |  22 ++
 lib/librte_lfring/lockfree16.h           | 144 ++++++++
 lib/librte_lfring/meson.build            |   6 +
 lib/librte_lfring/rte_lfring.c           | 261 ++++++++++++++
 lib/librte_lfring/rte_lfring.h           | 567 +++++++++++++++++++++++++++++++
 lib/librte_lfring/rte_lfring_version.map |  19 ++
 8 files changed, 1026 insertions(+)
 create mode 100644 lib/librte_lfring/Makefile
 create mode 100644 lib/librte_lfring/lockfree16.h
 create mode 100644 lib/librte_lfring/meson.build
 create mode 100644 lib/librte_lfring/rte_lfring.c
 create mode 100644 lib/librte_lfring/rte_lfring.h
 create mode 100644 lib/librte_lfring/rte_lfring_version.map

diff --git a/config/common_base b/config/common_base
index 7c6da51..375f9c3 100644
--- a/config/common_base
+++ b/config/common_base
@@ -719,6 +719,11 @@ CONFIG_RTE_LIBRTE_PMD_IFPGA_RAWDEV=y
 CONFIG_RTE_LIBRTE_RING=y
 
 #
+# Compile librte_lfring
+#
+CONFIG_RTE_LIBRTE_LFRING=y
+
+#
 # Compile librte_mempool
 #
 CONFIG_RTE_LIBRTE_MEMPOOL=y
diff --git a/lib/Makefile b/lib/Makefile
index d6239d2..cd46339 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -12,6 +12,8 @@ DIRS-$(CONFIG_RTE_LIBRTE_PCI) += librte_pci
 DEPDIRS-librte_pci := librte_eal
 DIRS-$(CONFIG_RTE_LIBRTE_RING) += librte_ring
 DEPDIRS-librte_ring := librte_eal
+DIRS-$(CONFIG_RTE_LIBRTE_RING) += librte_lfring
+DEPDIRS-librte_lfring := librte_eal
 DIRS-$(CONFIG_RTE_LIBRTE_MEMPOOL) += librte_mempool
 DEPDIRS-librte_mempool := librte_eal librte_ring
 DIRS-$(CONFIG_RTE_LIBRTE_MBUF) += librte_mbuf
diff --git a/lib/librte_lfring/Makefile b/lib/librte_lfring/Makefile
new file mode 100644
index 0000000..c04d2e9
--- /dev/null
+++ b/lib/librte_lfring/Makefile
@@ -0,0 +1,22 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2010-2014 Intel Corporation
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# library name
+LIB = librte_lfring.a
+
+CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
+LDLIBS += -lrte_eal
+
+EXPORT_MAP := rte_lfring_version.map
+
+LIBABIVER := 1
+
+# all source are stored in SRCS-y
+SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_lfring.c
+
+# install includes
+SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_lfring.h lockfree16.h
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/lib/librte_lfring/lockfree16.h b/lib/librte_lfring/lockfree16.h
new file mode 100644
index 0000000..2550a35
--- /dev/null
+++ b/lib/librte_lfring/lockfree16.h
@@ -0,0 +1,144 @@
+//Copyright (c) 2018-2019, ARM Limited. All rights reserved.
+//
+//SPDX-License-Identifier:        BSD-3-Clause
+
+#ifndef _LOCKFREE16_H
+#define _LOCKFREE16_H
+
+#include <stdint.h>
+#include <stdbool.h>
+
+#define HAS_ACQ(mo) ((mo) != __ATOMIC_RELAXED && (mo) != __ATOMIC_RELEASE)
+#define HAS_RLS(mo) ((mo) == __ATOMIC_RELEASE || (mo) == __ATOMIC_ACQ_REL || (mo) == __ATOMIC_SEQ_CST)
+
+#define MO_LOAD(mo) (HAS_ACQ((mo)) ? __ATOMIC_ACQUIRE : __ATOMIC_RELAXED)
+#define MO_STORE(mo) (HAS_RLS((mo)) ? __ATOMIC_RELEASE : __ATOMIC_RELAXED)
+
+#if defined __aarch64__
+
+static inline __int128 ldx128(const __int128 *var, int mm)
+{
+	__int128 old;
+	if (mm == __ATOMIC_ACQUIRE)
+		__asm volatile("ldaxp %0, %H0, [%1]"
+				: "=&r" (old)
+				: "r" (var)
+				: "memory");
+	else if (mm == __ATOMIC_RELAXED)
+		__asm volatile("ldxp %0, %H0, [%1]"
+				: "=&r" (old)
+				: "r" (var)
+				: );
+	else
+		__builtin_trap();
+	return old;
+}
+
+//Return 0 on success, 1 on failure
+static inline uint32_t stx128(__int128 *var, __int128 neu, int mm)
+{
+	uint32_t ret;
+	if (mm == __ATOMIC_RELEASE)
+		__asm volatile("stlxp %w0, %1, %H1, [%2]"
+				: "=&r" (ret)
+				: "r" (neu), "r" (var)
+				: "memory");
+	else if (mm == __ATOMIC_RELAXED)
+		__asm volatile("stxp %w0, %1, %H1, [%2]"
+				: "=&r" (ret)
+				: "r" (neu), "r" (var)
+				: );
+	else
+		__builtin_trap();
+	return ret;
+}
+
+//Weak compare-exchange and non-atomic load on failure
+static inline bool
+lockfree_compare_exchange_16_frail(register __int128 *var,
+				   __int128 *exp,
+				   register __int128 neu,
+				   int mo_success,
+				   int mo_failure)
+{
+	(void)mo_failure;//Ignore memory ordering for failure, memory order for
+	//success must be stronger or equal
+	int ldx_mo = MO_LOAD(mo_success);
+	int stx_mo = MO_STORE(mo_success);
+	register __int128 expected = *exp;
+	__asm __volatile("" ::: "memory");
+	//Atomicity of LDXP is not guaranteed
+	register __int128 old = ldx128(var, ldx_mo);
+	if (old == expected && !stx128(var, neu, stx_mo)) {
+		//Right value and STX succeeded
+		__asm __volatile("" ::: "memory");
+		return 1;
+	}
+	__asm __volatile("" ::: "memory");
+	//Wrong value or STX failed
+	*exp = old;//Old possibly torn value (OK for 'frail' flavour)
+	return 0;//Failure, *exp updated
+}
+
+#elif defined __x86_64__
+
+union u128
+{
+	struct {
+		uint64_t lo, hi;
+	} s;
+	__int128 ui;
+};
+
+static inline bool
+cmpxchg16b(__int128 *src, union u128 *cmp, union u128 with)
+{
+	bool result;
+	__asm__ __volatile__
+		("lock cmpxchg16b %1\n\tsetz %0"
+		 : "=q" (result), "+m" (*src), "+d" (cmp->s.hi), "+a" (cmp->s.lo)
+		 : "c" (with.s.hi), "b" (with.s.lo)
+		 : "cc", "memory"
+		);
+	return result;
+}
+
+static inline bool
+lockfree_compare_exchange_16(register __int128 *var,
+			     __int128 *exp,
+			     register __int128 neu,
+			     bool weak,
+			     int mo_success,
+			     int mo_failure)
+{
+	(void)weak;
+	(void)mo_success;
+	(void)mo_failure;
+	union u128 cmp, with;
+	cmp.ui = *exp;
+	with.ui = neu;
+	if (cmpxchg16b(var, &cmp, with)) {
+		return true;
+	} else {
+		*exp = cmp.ui;
+		return false;
+	}
+}
+
+static inline bool
+lockfree_compare_exchange_16_frail(register __int128 *var,
+				   __int128 *exp,
+				   register __int128 neu,
+				   int mo_s,
+				   int mo_f)
+{
+	return lockfree_compare_exchange_16(var, exp, neu, true, mo_s, mo_f);
+}
+
+#else
+
+#error Unsupported architecture
+
+#endif
+
+#endif
diff --git a/lib/librte_lfring/meson.build b/lib/librte_lfring/meson.build
new file mode 100644
index 0000000..c8b90cb
--- /dev/null
+++ b/lib/librte_lfring/meson.build
@@ -0,0 +1,6 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2017 Intel Corporation
+
+version = 1
+sources = files('rte_lfring.c')
+headers = files('rte_lfring.h','lockfree16.h')
diff --git a/lib/librte_lfring/rte_lfring.c b/lib/librte_lfring/rte_lfring.c
new file mode 100644
index 0000000..2e3f1c8
--- /dev/null
+++ b/lib/librte_lfring/rte_lfring.c
@@ -0,0 +1,261 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2010-2015 Intel Corporation
+ * Copyright (c) 2019 Arm Ltd
+ * All rights reserved.
+ */
+
+#include <stdio.h>
+#include <stdarg.h>
+#include <string.h>
+#include <stdint.h>
+#include <inttypes.h>
+#include <errno.h>
+#include <sys/queue.h>
+
+#include <rte_common.h>
+#include <rte_log.h>
+#include <rte_memory.h>
+#include <rte_memzone.h>
+#include <rte_malloc.h>
+#include <rte_launch.h>
+#include <rte_eal.h>
+#include <rte_eal_memconfig.h>
+#include <rte_atomic.h>
+#include <rte_per_lcore.h>
+#include <rte_lcore.h>
+#include <rte_branch_prediction.h>
+#include <rte_errno.h>
+#include <rte_string_fns.h>
+#include <rte_spinlock.h>
+
+#include "rte_lfring.h"
+
+TAILQ_HEAD(rte_lfring_list, rte_tailq_entry);
+
+static struct rte_tailq_elem rte_lfring_tailq = {
+	.name = RTE_TAILQ_LFRING_NAME,
+};
+EAL_REGISTER_TAILQ(rte_lfring_tailq)
+
+/* true if x is a power of 2 */
+#define POWEROF2(x) ((((x)-1) & (x)) == 0)
+
+/* return the size of memory occupied by a ring */
+ssize_t
+rte_lfring_get_memsize(unsigned count)
+{
+	ssize_t sz;
+
+	/* count must be a power of 2 */
+	if ((!POWEROF2(count)) || (count > 0x80000000U )) {
+		RTE_LOG(ERR, RING,
+			"Requested size is invalid, must be power of 2, and "
+			"do not exceed the size limit %u\n", 0x80000000U);
+		return -EINVAL;
+	}
+
+	sz = sizeof(struct rte_lfring) + count * sizeof(struct rte_lfr_element);
+	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
+	return sz;
+}
+
+int
+rte_lfring_init(struct rte_lfring *r, const char *name, unsigned count,
+		unsigned flags)
+{
+	int ret;
+
+	/* compilation-time checks */
+	RTE_BUILD_BUG_ON((sizeof(struct rte_lfring) &
+			  RTE_CACHE_LINE_MASK) != 0);
+
+	/* init the ring structure */
+	memset(r, 0, sizeof(*r));
+	ret = snprintf(r->name, sizeof(r->name), "%s", name);
+	if (ret < 0 || ret >= (int)sizeof(r->name))
+		return -ENAMETOOLONG;
+	r->flags = flags;
+
+	r->size = rte_align32pow2(count);
+	r->mask = r->size - 1;
+	r->head = 0;
+	r->tail = 0;
+	for (uint64_t i = 0; i < r->size; i++)
+	{
+	    r->ring[i].ptr = NULL;
+	    r->ring[i].idx = i - r->size;
+	}
+
+	return 0;
+}
+
+/* create the ring */
+struct rte_lfring *
+rte_lfring_create(const char *name, unsigned count, int socket_id,
+		  unsigned flags)
+{
+	char mz_name[RTE_MEMZONE_NAMESIZE];
+	struct rte_lfring *r;
+	struct rte_tailq_entry *te;
+	const struct rte_memzone *mz;
+	ssize_t ring_size;
+	int mz_flags = 0;
+	struct rte_lfring_list* ring_list = NULL;
+	const unsigned int requested_count = count;
+	int ret;
+
+	ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
+
+	count = rte_align32pow2(count);
+
+	ring_size = rte_lfring_get_memsize(count);
+	if (ring_size < 0) {
+		rte_errno = ring_size;
+		return NULL;
+	}
+
+	ret = snprintf(mz_name, sizeof(mz_name), "%s%s",
+		RTE_LFRING_MZ_PREFIX, name);
+	if (ret < 0 || ret >= (int)sizeof(mz_name)) {
+		rte_errno = ENAMETOOLONG;
+		return NULL;
+	}
+
+	te = rte_zmalloc("LFRING_TAILQ_ENTRY", sizeof(*te), 0);
+	if (te == NULL) {
+		RTE_LOG(ERR, RING, "Cannot reserve memory for tailq\n");
+		rte_errno = ENOMEM;
+		return NULL;
+	}
+
+	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
+
+	/* reserve a memory zone for this ring. If we can't get rte_config or
+	 * we are secondary process, the memzone_reserve function will set
+	 * rte_errno for us appropriately - hence no check in this this function */
+	mz = rte_memzone_reserve_aligned(mz_name, ring_size, socket_id,
+					 mz_flags, __alignof__(*r));
+	if (mz != NULL) {
+		r = mz->addr;
+		/* no need to check return value here, we already checked the
+		 * arguments above */
+		rte_lfring_init(r, name, requested_count, flags);
+
+		te->data = (void *) r;
+		r->memzone = mz;
+
+		TAILQ_INSERT_TAIL(ring_list, te, next);
+	} else {
+		r = NULL;
+		RTE_LOG(ERR, RING, "Cannot reserve memory\n");
+		rte_free(te);
+	}
+	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+	return r;
+}
+
+/* free the ring */
+void
+rte_lfring_free(struct rte_lfring *r)
+{
+	struct rte_lfring_list *ring_list = NULL;
+	struct rte_tailq_entry *te;
+
+	if (r == NULL)
+		return;
+
+	/*
+	 * Ring was not created with rte_lfring_create,
+	 * therefore, there is no memzone to free.
+	 */
+	if (r->memzone == NULL) {
+		RTE_LOG(ERR, RING, "Cannot free ring (not created with rte_lfring_create()");
+		return;
+	}
+
+	if (rte_memzone_free(r->memzone) != 0) {
+		RTE_LOG(ERR, RING, "Cannot free memory\n");
+		return;
+	}
+
+	ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
+	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
+
+	/* find out tailq entry */
+	TAILQ_FOREACH(te, ring_list, next) {
+		if (te->data == (void *) r)
+			break;
+	}
+
+	if (te == NULL) {
+		rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+		return;
+	}
+
+	TAILQ_REMOVE(ring_list, te, next);
+
+	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+	rte_free(te);
+}
+
+/* dump the status of the ring on the console */
+void
+rte_lfring_dump(FILE *f, const struct rte_lfring *r)
+{
+	fprintf(f, "ring <%s>@%p\n", r->name, r);
+	fprintf(f, "  flags=%x\n", r->flags);
+	fprintf(f, "  size=%"PRIu64"\n", r->size);
+	fprintf(f, "  tail=%"PRIu64"\n", r->tail);
+	fprintf(f, "  head=%"PRIu64"\n", r->head);
+	fprintf(f, "  used=%u\n", rte_lfring_count(r));
+	fprintf(f, "  avail=%u\n", rte_lfring_free_count(r));
+}
+
+/* dump the status of all rings on the console */
+void
+rte_lfring_list_dump(FILE *f)
+{
+	const struct rte_tailq_entry *te;
+	struct rte_lfring_list *ring_list;
+
+	ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
+
+	rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK);
+
+	TAILQ_FOREACH(te, ring_list, next) {
+		rte_lfring_dump(f, (struct rte_lfring *) te->data);
+	}
+
+	rte_rwlock_read_unlock(RTE_EAL_TAILQ_RWLOCK);
+}
+
+/* search a ring from its name */
+struct rte_lfring *
+rte_lfring_lookup(const char *name)
+{
+	struct rte_tailq_entry *te;
+	struct rte_lfring *r = NULL;
+	struct rte_lfring_list *ring_list;
+
+	ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
+
+	rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK);
+
+	TAILQ_FOREACH(te, ring_list, next) {
+		r = (struct rte_lfring *) te->data;
+		if (strncmp(name, r->name, RTE_LFRING_NAMESIZE) == 0)
+			break;
+	}
+
+	rte_rwlock_read_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+	if (te == NULL) {
+		rte_errno = ENOENT;
+		return NULL;
+	}
+
+	return r;
+}
diff --git a/lib/librte_lfring/rte_lfring.h b/lib/librte_lfring/rte_lfring.h
new file mode 100644
index 0000000..f33bad6
--- /dev/null
+++ b/lib/librte_lfring/rte_lfring.h
@@ -0,0 +1,567 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2010-2017 Intel Corporation
+ * Copyright (c) 2019 Arm Ltd
+ * All rights reserved.
+ */
+
+#ifndef _RTE_LFRING_H_
+#define _RTE_LFRING_H_
+
+/**
+ * @file
+ * RTE Lfring
+ *
+ * The Lfring Manager is a fixed-size queue, implemented as a table of
+ * (pointers + counters).
+ *
+ * - FIFO (First In First Out)
+ * - Maximum size is fixed; the pointers are stored in a table.
+ * - Lock-free implementation.
+ * - Multi- or single-consumer dequeue.
+ * - Multi- or single-producer enqueue.
+ *
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdio.h>
+#include <stdint.h>
+#include <sys/queue.h>
+#include <errno.h>
+#include <rte_common.h>
+#include <rte_config.h>
+#include <rte_memory.h>
+#include <rte_lcore.h>
+#include <rte_atomic.h>
+#include <rte_branch_prediction.h>
+#include <rte_memzone.h>
+#include <rte_pause.h>
+
+#include "lockfree16.h"
+
+#define RTE_TAILQ_LFRING_NAME "RTE_LFRING"
+
+#define RTE_LFRING_MZ_PREFIX "RG_"
+/**< The maximum length of an lfring name. */
+#define RTE_LFRING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
+			   sizeof(RTE_LFRING_MZ_PREFIX) + 1)
+
+struct rte_memzone; /* forward declaration, so as not to require memzone.h */
+
+struct rte_lfr_element
+{
+    void *ptr;
+    uintptr_t idx;
+};
+
+/**
+ * An RTE lfring structure.
+ *
+ */
+struct rte_lfring {
+	char name[RTE_LFRING_NAMESIZE] __rte_cache_aligned; /**< Name of the lfring. */
+	const struct rte_memzone *memzone;
+			/**< Memzone, if any, containing the rte_lfring */
+
+	char pad0 __rte_cache_aligned; /**< empty cache line */
+
+	/** Modified by producer. */
+	uint64_t tail __rte_cache_aligned;
+	uint64_t size;	   /**< Size of ring. */
+	uint64_t mask;	   /**< Mask (size-1) of lfring. */
+	uint32_t flags;	   /**< Flags supplied at creation. */
+	char pad1 __rte_cache_aligned; /**< empty cache line */
+
+	/** Modified by consumer. */
+	uint64_t head __rte_cache_aligned;
+	char pad2 __rte_cache_aligned; /**< empty cache line */
+
+	struct rte_lfr_element ring[] __rte_cache_aligned;
+};
+
+#define LFRING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
+#define LFRING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */
+
+/**
+ * Calculate the memory size needed for a lfring
+ *
+ * This function returns the number of bytes needed for a lfring, given
+ * the number of elements in it. This value is the sum of the size of
+ * the structure rte_lfring and the size of the memory needed by the
+ * objects pointers. The value is aligned to a cache line size.
+ *
+ * @param count
+ *   The number of elements in the lfring (must be a power of 2).
+ * @return
+ *   - The memory size needed for the lfring on success.
+ *   - -EINVAL if count is not a power of 2.
+ */
+ssize_t rte_lfring_get_memsize(unsigned count);
+
+/**
+ * Initialize a lfring structure.
+ *
+ * Initialize a lfring structure in memory pointed by "r". The size of the
+ * memory area must be large enough to store the lfring structure and the
+ * object table. It is advised to use rte_lfring_get_memsize() to get the
+ * appropriate size.
+ *
+ * The lfring size is set to *count*, which must be a power of two. Water
+ * marking is disabled by default. The real usable lfring size is
+ * *count-1* instead of *count* to differentiate a free lfring from an
+ * empty lfring.
+ *
+ * The lfring is not added in RTE_TAILQ_LFRING global list. Indeed, the
+ * memory given by the caller may not be shareable among dpdk
+ * processes.
+ *
+ * @param r
+ *   The pointer to the lfring structure followed by the objects table.
+ * @param name
+ *   The name of the lfring.
+ * @param count
+ *   The number of elements in the lfring (must be a power of 2).
+ * @param flags
+ * @return
+ *   0 on success, or a negative value on error.
+ */
+int rte_lfring_init(struct rte_lfring *r, const char *name, unsigned count,
+		    unsigned flags);
+
+/**
+ * Create a new lfring named *name* in memory.
+ *
+ * This function uses ``memzone_reserve()`` to allocate memory. Then it
+ * calls rte_lfring_init() to initialize an empty lfring.
+ *
+ * The new lfring size is set to *count*, which must be a power of
+ * two. Water marking is disabled by default. The real usable lfring size
+ * is *count-1* instead of *count* to differentiate a free lfring from an
+ * empty lfring.
+ *
+ * The lfring is added in RTE_TAILQ_LFRING list.
+ *
+ * @param name
+ *   The name of the lfring.
+ * @param count
+ *   The size of the lfring (must be a power of 2).
+ * @param socket_id
+ *   The *socket_id* argument is the socket identifier in case of
+ *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
+ *   constraint for the reserved zone.
+ * @param flags
+ *   An OR of the following:
+ *    - LFRING_F_SP_ENQ: If this flag is set, the default behavior when
+ *      using ``rte_lfring_enqueue()`` or ``rte_lfring_enqueue_bulk()``
+ *      is "single-producer". Otherwise, it is "multi-producers".
+ *    - LFRING_F_SC_DEQ: If this flag is set, the default behavior when
+ *      using ``rte_lfring_dequeue()`` or ``rte_lfring_dequeue_bulk()``
+ *      is "single-consumer". Otherwise, it is "multi-consumers".
+ * @return
+ *   On success, the pointer to the new allocated lfring. NULL on error with
+ *    rte_errno set appropriately. Possible errno values include:
+ *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
+ *    - E_RTE_SECONDARY - function was called from a secondary process instance
+ *    - EINVAL - count provided is not a power of 2
+ *    - ENOSPC - the maximum number of memzones has already been allocated
+ *    - EEXIST - a memzone with the same name already exists
+ *    - ENOMEM - no appropriate memory area found in which to create memzone
+ */
+struct rte_lfring *rte_lfring_create(const char *name, unsigned count,
+				     int socket_id, unsigned flags);
+/**
+ * De-allocate all memory used by the lfring.
+ *
+ * @param r
+ *   Lfring to free
+ */
+void rte_lfring_free(struct rte_lfring *r);
+
+/**
+ * Dump the status of the lfring to a file.
+ *
+ * @param f
+ *   A pointer to a file for output
+ * @param r
+ *   A pointer to the lfring structure.
+ */
+void rte_lfring_dump(FILE *f, const struct rte_lfring *r);
+
+/* True if 'a' is before 'b' ('a' < 'b') in serial number arithmetic */
+static __rte_always_inline bool
+rte_lfring_before(uint64_t a, uint64_t b)
+{
+    return (int64_t)(a - b) < 0;
+}
+
+static __rte_always_inline uint64_t
+rte_lfring_update(uint64_t *loc, uint64_t neu)
+{
+	uint64_t old = __atomic_load_n(loc, __ATOMIC_RELAXED);
+	do {
+		if (rte_lfring_before(neu, old)) {
+			/* neu < old */
+			return old;
+		}
+		/* Else neu > old, need to update *loc */
+	} while (!__atomic_compare_exchange_n(loc,
+					      &old,
+					      neu,
+					      /*weak=*/true,
+					      __ATOMIC_RELEASE,
+					      __ATOMIC_RELAXED));
+	return neu;
+}
+
+static __rte_always_inline uint64_t
+rte_lfring_reload(const uint64_t *loc, uint64_t idx)
+{
+	uint64_t fresh = __atomic_load_n(loc, __ATOMIC_RELAXED);
+	if (rte_lfring_before(idx, fresh)) {
+		/* fresh is after idx, use this instead */
+		idx = fresh;
+	} else {
+		/* Continue with next slot */
+		idx++;
+	}
+	return idx;
+}
+
+static __rte_always_inline void
+rte_lfring_enq_elems(void *const *elems,
+		     struct rte_lfr_element *ring,
+		     uint64_t mask,
+		     uint64_t idx,
+		     uint64_t num)
+{
+	uint64_t seg0 = RTE_MIN(mask + 1 - (idx & mask), num);
+	struct rte_lfr_element *ring0 = &ring[idx & mask];
+	for (uint64_t i = 0; i < seg0; i++) {
+		ring0[i].ptr = *elems++;
+		ring0[i].idx = idx++;
+	}
+	if (num > seg0) {
+		uint64_t seg1 = num - seg0;
+		for (uint64_t i = 0; i < seg1; i++) {
+			ring[i].ptr = *elems++;
+			ring[i].idx = idx++;
+		}
+	}
+}
+
+static __rte_always_inline unsigned int
+rte_lfring_enqueue_bulk_sp(struct rte_lfring *r,
+			   void * const *elems,
+			   unsigned int nelems,
+			   unsigned int *free_space)
+{
+	/* Single-producer */
+	uint64_t mask = r->mask;
+	uint64_t size = mask + 1;
+	uint64_t tail = __atomic_load_n(&r->tail, __ATOMIC_RELAXED);
+	uint64_t head = __atomic_load_n(&r->head, __ATOMIC_ACQUIRE);
+	int64_t actual = RTE_MIN((int64_t)(head + size - tail), (int64_t)nelems);
+	if (actual <= 0)
+		return 0;
+	rte_lfring_enq_elems(elems, r->ring, mask, tail, actual);
+	__atomic_store_n(&r->tail, tail + actual, __ATOMIC_RELEASE);
+	if (free_space != NULL)
+		*free_space = head + size - (tail + actual);
+	return actual;
+}
+
+static __rte_always_inline unsigned int
+rte_lfring_enqueue_bulk_mp(struct rte_lfring *r,
+			   void * const *elems,
+			   unsigned int nelems,
+			   unsigned int *free_space)
+{
+	/* Lock-free multi-producer */
+	uint64_t mask = r->mask;
+	uint64_t size = mask + 1;
+	uint64_t tail = __atomic_load_n(&r->tail, __ATOMIC_RELAXED);
+	uint64_t actual = 0;
+restart:
+	while (actual < nelems &&
+	       rte_lfring_before(tail,
+			__atomic_load_n(&r->head, __ATOMIC_ACQUIRE)) + size) {
+		union {
+			struct rte_lfr_element e;
+			__int128 ui;
+		} old, neu;
+		struct rte_lfr_element *slot = &r->ring[tail & mask];
+		old.e.ptr = __atomic_load_n(&slot->ptr, __ATOMIC_RELAXED);
+		old.e.idx = __atomic_load_n(&slot->idx, __ATOMIC_RELAXED);
+		do {
+			if (old.e.idx != tail - size) {
+				if (old.e.idx != tail) {
+					/* We are far behind,
+					   restart with fresh index */
+					tail = rte_lfring_reload(&r->tail, tail);
+					goto restart;
+				} else {
+					/* Slot already enqueued,
+					   try next slot */
+					tail++;
+					goto restart;
+				}
+			}
+			/* Found slot that was used one lap back,
+			   try to enqueue next element */
+			neu.e.ptr = elems[actual];
+			neu.e.idx = tail;/* Set idx on enqueue */
+		} while (!lockfree_compare_exchange_16_frail((__int128 *)slot,
+							     &old.ui,
+							     neu.ui,
+							     __ATOMIC_RELEASE,
+							     __ATOMIC_RELAXED));
+		/* Enqueue succeeded */
+		actual++;
+		/* Continue with next slot */
+		tail++;
+	}
+	tail = rte_lfring_update(&r->tail, tail);
+	if (free_space != NULL)
+		*free_space = __atomic_load_n(&r->head, __ATOMIC_RELAXED) + size - tail;
+	return actual;
+}
+
+/**
+ * Enqueue several objects on an lfring.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to enqueue on the lfring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the lfring after the
+ *   enqueue operation has finished.
+ * @return
+ *   The number of objects enqueued, between 0 and n (inclusive)
+ */
+static __rte_always_inline unsigned int
+rte_lfring_enqueue_bulk(struct rte_lfring *r,
+			void * const *elems,
+			unsigned int nelems,
+			unsigned int *free_space)
+{
+	if (r->flags & LFRING_F_SP_ENQ)
+		return rte_lfring_enqueue_bulk_sp(r, elems, nelems, free_space);
+	else
+		return rte_lfring_enqueue_bulk_mp(r, elems, nelems, free_space);
+}
+
+static __rte_always_inline void
+rte_lfring_deq_elems(void **elems,
+		     const struct rte_lfr_element *ring,
+		     uint64_t mask,
+		     uint64_t idx,
+		     uint64_t num)
+{
+	uint64_t seg0 = RTE_MIN(mask + 1 - (idx & mask), num);
+	const struct rte_lfr_element *ring0 = &ring[idx & mask];
+	for (uint64_t i = 0; i < seg0; i++)
+		*elems++ = ring0[i].ptr;
+	if (num > seg0) {
+		uint64_t seg1 = num - seg0;
+		for (uint64_t i = 0; i < seg1; i++)
+			*elems++ = ring[i].ptr;
+	}
+}
+
+static __rte_always_inline unsigned int
+rte_lfring_dequeue_bulk_sc(struct rte_lfring *r,
+			   void **elems,
+			   unsigned int nelems,
+			   unsigned int *available)
+{
+	/* Single-consumer */
+	int64_t actual;
+	uint64_t mask = r->mask;
+	uint64_t head = __atomic_load_n(&r->head, __ATOMIC_RELAXED);
+	uint64_t tail = __atomic_load_n(&r->tail, __ATOMIC_ACQUIRE);
+	actual = RTE_MIN((int64_t)(tail - head), (int64_t)nelems);
+	if (unlikely(actual <= 0))
+		return 0;
+	rte_lfring_deq_elems(elems, r->ring, mask, head, actual);
+	__atomic_store_n(&r->head, head + actual, __ATOMIC_RELEASE);
+	if (available != NULL)
+		*available = tail - (head + actual);
+	return (unsigned int)actual;
+}
+
+static __rte_always_inline unsigned int
+rte_lfring_dequeue_bulk_mc(struct rte_lfring *r,
+			   void **elems,
+			   unsigned int nelems,
+			   unsigned int *available)
+{
+	/* Lock-free multi-consumer */
+	int64_t actual;
+	uint64_t mask = r->mask;
+	uint64_t head = __atomic_load_n(&r->head, __ATOMIC_RELAXED);
+	uint64_t tail = __atomic_load_n(&r->tail, __ATOMIC_ACQUIRE);
+	do {
+		actual = RTE_MIN((int64_t)(tail - head), (int64_t)nelems);
+		if (unlikely(actual <= 0))
+			return 0;
+		rte_lfring_deq_elems(elems, r->ring, mask, head, actual);
+	} while (!__atomic_compare_exchange_n(&r->head,
+					      &head,
+					      head + actual,
+					      /*weak*/false,
+					      __ATOMIC_RELEASE,
+					      __ATOMIC_RELAXED));
+	if (available != NULL)
+		*available = tail - (head + actual);
+	return (unsigned int)actual;
+}
+
+/**
+ * Dequeue several objects from an lfring.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to dequeue from the lfring to the obj_table.
+ * @param available
+ *   Returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ *   The number of objects dequeued, from 0 to n (inclusive)
+ */
+static __rte_always_inline unsigned int
+rte_lfring_dequeue_bulk(struct rte_lfring *r,
+			void **elems,
+			unsigned int nelems,
+			unsigned int *available)
+{
+	if (r->flags & LFRING_F_SC_DEQ)
+		return rte_lfring_dequeue_bulk_sc(r, elems, nelems, available);
+	else
+		return rte_lfring_dequeue_bulk_mc(r, elems, nelems, available);
+}
+
+/**
+ * Return the number of entries in a lfring.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @return
+ *   The number of entries in the lfring.
+ */
+static inline unsigned
+rte_lfring_count(const struct rte_lfring *r)
+{
+	uint64_t head = __atomic_load_n(&r->head, __ATOMIC_RELAXED);
+	uint64_t tail = __atomic_load_n(&r->tail, __ATOMIC_RELAXED);
+	uint64_t avail = tail - head;
+	return (int64_t)avail > 0 ? avail : 0;
+}
+
+/**
+ * Return the number of free entries in a lfring.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @return
+ *   The number of free entries in the lfring.
+ */
+static inline unsigned
+rte_lfring_free_count(const struct rte_lfring *r)
+{
+	return r->size - rte_lfring_count(r);
+}
+
+/**
+ * Test if a lfring is full.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @return
+ *   - 1: The lfring is full.
+ *   - 0: The lfring is not full.
+ */
+static inline int
+rte_lfring_full(const struct rte_lfring *r)
+{
+	return rte_lfring_free_count(r) == 0;
+}
+
+/**
+ * Test if a lfring is empty.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @return
+ *   - 1: The lfring is empty.
+ *   - 0: The lfring is not empty.
+ */
+static inline int
+rte_lfring_empty(const struct rte_lfring *r)
+{
+	return rte_lfring_count(r) == 0;
+}
+
+/**
+ * Return the size of the lfring.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @return
+ *   The size of the data store used by the lfring.
+ *   NOTE: this is not the same as the usable space in the lfring. To query that
+ *   use ``rte_lfring_get_capacity()``.
+ */
+static inline unsigned int
+rte_lfring_get_size(const struct rte_lfring *r)
+{
+	return r->size;
+}
+
+/**
+ * Return the number of elements which can be stored in the lfring.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @return
+ *   The usable size of the lfring.
+ */
+static inline unsigned int
+rte_lfring_get_capacity(const struct rte_lfring *r)
+{
+	return r->size;
+}
+
+/**
+ * Dump the status of all lfrings on the console
+ *
+ * @param f
+ *   A pointer to a file for output
+ */
+void rte_lfring_list_dump(FILE *f);
+
+/**
+ * Search a lfring from its name
+ *
+ * @param name
+ *   The name of the lfring.
+ * @return
+ *   The pointer to the lfring matching the name, or NULL if not found,
+ *   with rte_errno set appropriately. Possible rte_errno values include:
+ *    - ENOENT - required entry not available to return.
+ */
+struct rte_lfring *rte_lfring_lookup(const char *name);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_LFRING_H_ */
diff --git a/lib/librte_lfring/rte_lfring_version.map b/lib/librte_lfring/rte_lfring_version.map
new file mode 100644
index 0000000..d935efd
--- /dev/null
+++ b/lib/librte_lfring/rte_lfring_version.map
@@ -0,0 +1,19 @@
+DPDK_2.0 {
+	global:
+
+	rte_ring_create;
+	rte_ring_dump;
+	rte_ring_get_memsize;
+	rte_ring_init;
+	rte_ring_list_dump;
+	rte_ring_lookup;
+
+	local: *;
+};
+
+DPDK_2.2 {
+	global:
+
+	rte_ring_free;
+
+} DPDK_2.0;
-- 
2.7.4



More information about the dev mailing list