[dpdk-dev] [PATCH 2/3] mempool/nb_stack: add non-blocking stack mempool

Gage Eads gage.eads at intel.com
Thu Jan 10 21:55:37 CET 2019


This commit adds support for a non-blocking (linked-list-based) stack mempool
handler. The stack uses a 128-bit compare-and-swap instruction, and thus is
limited to x86_64. The 128-bit CAS atomically updates the stack top pointer
and a modification counter, which protects against the ABA problem.

In mempool_perf_autotest the lock-based stack outperforms the non-blocking
handler*; however:
- For applications with preemptible pthreads, a lock-based stack's
  worst-case performance (i.e. one thread being preempted while
  holding the spinlock) is much worse than the non-blocking stack's.
- Using per-thread mempool caches will largely mitigate the performance
  difference.

*Test setup: x86_64 build with default config, dual-socket Xeon E5-2699 v4,
running on isolcpus cores with a tickless scheduler. The lock-based stack's
rate_persec was 1x-3.5x the non-blocking stack's.

Signed-off-by: Gage Eads <gage.eads at intel.com>
---
 MAINTAINERS                                        |   4 +
 config/common_base                                 |   1 +
 drivers/mempool/Makefile                           |   1 +
 drivers/mempool/nb_stack/Makefile                  |  30 +++++
 drivers/mempool/nb_stack/meson.build               |   4 +
 drivers/mempool/nb_stack/nb_lifo.h                 | 132 +++++++++++++++++++++
 drivers/mempool/nb_stack/rte_mempool_nb_stack.c    | 125 +++++++++++++++++++
 .../nb_stack/rte_mempool_nb_stack_version.map      |   4 +
 mk/rte.app.mk                                      |   1 +
 9 files changed, 302 insertions(+)
 create mode 100644 drivers/mempool/nb_stack/Makefile
 create mode 100644 drivers/mempool/nb_stack/meson.build
 create mode 100644 drivers/mempool/nb_stack/nb_lifo.h
 create mode 100644 drivers/mempool/nb_stack/rte_mempool_nb_stack.c
 create mode 100644 drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map

diff --git a/MAINTAINERS b/MAINTAINERS
index 470f36b9c..5519d3323 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -416,6 +416,10 @@ M: Artem V. Andreev <artem.andreev at oktetlabs.ru>
 M: Andrew Rybchenko <arybchenko at solarflare.com>
 F: drivers/mempool/bucket/
 
+Non-blocking stack memory pool
+M: Gage Eads <gage.eads at intel.com>
+F: drivers/mempool/nb_stack/
+
 
 Bus Drivers
 -----------
diff --git a/config/common_base b/config/common_base
index 964a6956e..40ce47312 100644
--- a/config/common_base
+++ b/config/common_base
@@ -728,6 +728,7 @@ CONFIG_RTE_DRIVER_MEMPOOL_BUCKET=y
 CONFIG_RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB=64
 CONFIG_RTE_DRIVER_MEMPOOL_RING=y
 CONFIG_RTE_DRIVER_MEMPOOL_STACK=y
+CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK=y
 
 #
 # Compile PMD for octeontx fpa mempool device
diff --git a/drivers/mempool/Makefile b/drivers/mempool/Makefile
index 28c2e8360..aeae3ac12 100644
--- a/drivers/mempool/Makefile
+++ b/drivers/mempool/Makefile
@@ -13,5 +13,6 @@ endif
 DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_RING) += ring
 DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_STACK) += stack
 DIRS-$(CONFIG_RTE_LIBRTE_OCTEONTX_MEMPOOL) += octeontx
+DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK) += nb_stack
 
 include $(RTE_SDK)/mk/rte.subdir.mk
diff --git a/drivers/mempool/nb_stack/Makefile b/drivers/mempool/nb_stack/Makefile
new file mode 100644
index 000000000..38b45f4f5
--- /dev/null
+++ b/drivers/mempool/nb_stack/Makefile
@@ -0,0 +1,30 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2019 Intel Corporation
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# The non-blocking stack uses a 128-bit compare-and-swap instruction, and thus
+# is limited to x86_64.
+ifeq ($(CONFIG_RTE_ARCH_X86_64),y)
+
+#
+# library name
+#
+LIB = librte_mempool_nb_stack.a
+
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS)
+
+# Headers
+CFLAGS += -I$(RTE_SDK)/lib/librte_mempool
+LDLIBS += -lrte_eal -lrte_mempool
+
+EXPORT_MAP := rte_mempool_nb_stack_version.map
+
+LIBABIVER := 1
+
+SRCS-$(CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK) += rte_mempool_nb_stack.c
+
+endif
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/drivers/mempool/nb_stack/meson.build b/drivers/mempool/nb_stack/meson.build
new file mode 100644
index 000000000..66d64a9ba
--- /dev/null
+++ b/drivers/mempool/nb_stack/meson.build
@@ -0,0 +1,4 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2019 Intel Corporation
+
+sources = files('rte_mempool_nb_stack.c')
diff --git a/drivers/mempool/nb_stack/nb_lifo.h b/drivers/mempool/nb_stack/nb_lifo.h
new file mode 100644
index 000000000..701d75e37
--- /dev/null
+++ b/drivers/mempool/nb_stack/nb_lifo.h
@@ -0,0 +1,132 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2019 Intel Corporation
+ */
+
+#ifndef _NB_LIFO_H_
+#define _NB_LIFO_H_
+
+struct nb_lifo_elem {
+	void *data; /**< Payload pointer carried by this element */
+	struct nb_lifo_elem *next; /**< Next element down the stack, or NULL */
+};
+
+struct nb_lifo_head {
+	struct nb_lifo_elem *top; /**< Stack top (NULL when empty) */
+	uint64_t cnt; /**< Modification counter, +1 on every push and pop */
+};
+
+struct nb_lifo {
+	volatile struct nb_lifo_head head __rte_aligned(16); /* 128-bit CAS */
+	rte_atomic64_t len; /* Elements available; pops reserve from it first */
+} __rte_cache_aligned;
+
+static __rte_always_inline void
+nb_lifo_init(struct nb_lifo *lifo)
+{
+	memset(lifo, 0, sizeof(*lifo)); /* empty stack: NULL top, cnt = 0 */
+	rte_atomic64_set(&lifo->len, 0); /* explicit zero length */
+}
+
+static __rte_always_inline unsigned int
+nb_lifo_len(struct nb_lifo *lifo)
+{
+	return (unsigned int) rte_atomic64_read(&lifo->len); /* narrowed snapshot */
+}
+
+static __rte_always_inline void
+nb_lifo_push(struct nb_lifo *lifo,
+	     struct nb_lifo_elem *first, /* first element of the chain */
+	     struct nb_lifo_elem *last, /* last element of the chain */
+	     unsigned int num) /* amount added to len after the push */
+{
+	while (1) { /* retry until the CAS succeeds */
+		struct nb_lifo_head old_head, new_head;
+
+		old_head = lifo->head; /* snapshot; validated by the CAS below */
+
+		/* Swing the top pointer to the first element in the chain and
+		 * link the last element to the old top; cnt is bumped as well.
+		 */
+		new_head.top = first;
+		new_head.cnt = old_head.cnt + 1;
+
+		last->next = old_head.top;
+
+		if (rte_atomic128_cmpset((volatile uint64_t *) &lifo->head,
+					 (uint64_t *)&old_head,
+					 (uint64_t *)&new_head))
+			break;
+	}
+
+	rte_atomic64_add(&lifo->len, num); /* counted only once linked in */
+}
+
+static __rte_always_inline void
+nb_lifo_push_single(struct nb_lifo *lifo, struct nb_lifo_elem *elem)
+{
+	nb_lifo_push(lifo, elem, elem, 1); /* one-element chain: first == last */
+}
+
+static __rte_always_inline struct nb_lifo_elem *
+nb_lifo_pop(struct nb_lifo *lifo,
+	    unsigned int num, /* number of elements to pop */
+	    void **obj_table, /* optional: receives each element's data */
+	    struct nb_lifo_elem **last) /* optional: receives the last one */
+{
+	struct nb_lifo_head old_head;
+
+	/* Reserve num elements by subtracting them from len, if available */
+	while (1) {
+		uint64_t len = rte_atomic64_read(&lifo->len);
+
+		/* Too few elements: fail without touching the list */
+		if (len < num)
+			return NULL;
+
+		if (rte_atomic64_cmpset((volatile uint64_t *)&lifo->len,
+					len, len - num))
+			break;
+	}
+
+	/* Unlink the num elements reserved above */
+	while (1) {
+		struct nb_lifo_head new_head;
+		struct nb_lifo_elem *tmp;
+		unsigned int i;
+
+		old_head = lifo->head; /* snapshot; validated by the CAS below */
+
+		tmp = old_head.top;
+
+		/* Walk num elements to find the new head, recording data and
+		 * the last element on the way (a retry simply overwrites
+		 * them). A next pointer is either another element or NULL; if
+		 * an element seen here was already popped, the CAS will fail.
+		 */
+		for (i = 0; i < num && tmp != NULL; i++) {
+			if (obj_table)
+				obj_table[i] = tmp->data;
+			if (last)
+				*last = tmp;
+			tmp = tmp->next;
+		}
+
+		/* If NULL was encountered, the list was modified while
+		 * traversing it. Retry.
+		 */
+		if (i != num)
+			continue;
+
+		new_head.top = tmp;
+		new_head.cnt = old_head.cnt + 1;
+
+		if (rte_atomic128_cmpset((volatile uint64_t *) &lifo->head,
+					 (uint64_t *)&old_head,
+					 (uint64_t *)&new_head))
+			break;
+	}
+
+	return old_head.top; /* first popped element */
+}
+
+#endif /* _NB_LIFO_H_ */
diff --git a/drivers/mempool/nb_stack/rte_mempool_nb_stack.c b/drivers/mempool/nb_stack/rte_mempool_nb_stack.c
new file mode 100644
index 000000000..1b30775f7
--- /dev/null
+++ b/drivers/mempool/nb_stack/rte_mempool_nb_stack.c
@@ -0,0 +1,125 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2019 Intel Corporation
+ */
+
+#include <stdio.h>
+#include <rte_mempool.h>
+#include <rte_malloc.h>
+
+#include "nb_lifo.h"
+
+struct rte_mempool_nb_stack {
+	uint64_t size; /**< Element count, copied from mp->size at alloc */
+	struct nb_lifo used_lifo; /**< LIFO containing mempool pointers */
+	struct nb_lifo free_lifo; /**< LIFO containing unused LIFO elements */
+};
+
+/* .alloc op: build the handler and seed its free list; 0 or -ENOMEM. */
+static int
+nb_stack_alloc(struct rte_mempool *mp)
+{
+	struct rte_mempool_nb_stack *s;
+	struct nb_lifo_elem *elems;
+	unsigned int n = mp->size;
+	unsigned int i;
+	size_t size; /* unsigned int would truncate once n nears 2^28 */
+
+	/* The handler and its n LIFO elements share one allocation */
+	size = sizeof(*s) + n * sizeof(struct nb_lifo_elem);
+	s = rte_zmalloc_socket("mempool-nb_stack",
+			       size,
+			       RTE_CACHE_LINE_SIZE,
+			       mp->socket_id);
+	if (s == NULL) {
+		RTE_LOG(ERR, MEMPOOL, "Cannot allocate nb_stack!\n");
+		return -ENOMEM;
+	}
+
+	s->size = n;
+	nb_lifo_init(&s->used_lifo);
+	nb_lifo_init(&s->free_lifo);
+
+	elems = (struct nb_lifo_elem *) &s[1]; /* elements follow *s */
+	for (i = 0; i < n; i++) /* every element starts out unused */
+		nb_lifo_push_single(&s->free_lifo, &elems[i]);
+
+	mp->pool_data = s;
+
+	return 0;
+}
+
+static int
+nb_stack_enqueue(struct rte_mempool *mp, void * const *obj_table,
+		 unsigned int n)
+{
+	struct rte_mempool_nb_stack *s = mp->pool_data;
+	struct nb_lifo_elem *first, *last, *tmp;
+	unsigned int i;
+
+	if (unlikely(n == 0)) /* nothing to do; loop below would not set last */
+		return 0;
+
+	/* Take n unused elements off the free list to carry the objects */
+	first = nb_lifo_pop(&s->free_lifo, n, NULL, NULL);
+	if (unlikely(!first)) /* fewer than n free elements available */
+		return -ENOBUFS;
+
+	/* Store one object pointer per element; remember the chain's tail */
+	tmp = first;
+	for (i = 0; i < n; i++) {
+		tmp->data = obj_table[i];
+		last = tmp;
+		tmp = tmp->next;
+	}
+
+	/* Push the filled chain onto the used list in a single operation */
+	nb_lifo_push(&s->used_lifo, first, last, n);
+
+	return 0;
+}
+
+static int
+nb_stack_dequeue(struct rte_mempool *mp, void **obj_table,
+		 unsigned int n)
+{
+	struct rte_mempool_nb_stack *s = mp->pool_data;
+	struct nb_lifo_elem *first, *last;
+
+	if (unlikely(n == 0)) /* nothing requested */
+		return 0;
+
+	/* Pop n used elements; their data pointers are copied to obj_table */
+	first = nb_lifo_pop(&s->used_lifo, n, obj_table, &last);
+	if (unlikely(!first)) /* fewer than n objects in the pool */
+		return -ENOENT;
+
+	/* Return the now-unused chain of elements to the free list */
+	nb_lifo_push(&s->free_lifo, first, last, n);
+
+	return 0;
+}
+
+static unsigned
+nb_stack_get_count(const struct rte_mempool *mp)
+{
+	struct rte_mempool_nb_stack *s = mp->pool_data;
+
+	return nb_lifo_len(&s->used_lifo); /* length of the used list */
+}
+
+static void
+nb_stack_free(struct rte_mempool *mp)
+{
+	rte_free((void *)(mp->pool_data)); /* handler + elements: one block */
+}
+
+static struct rte_mempool_ops ops_nb_stack = {
+	.name = "nb_stack", /* name of this handler */
+	.alloc = nb_stack_alloc,
+	.free = nb_stack_free,
+	.enqueue = nb_stack_enqueue,
+	.dequeue = nb_stack_dequeue,
+	.get_count = nb_stack_get_count
+};
+
+MEMPOOL_REGISTER_OPS(ops_nb_stack); /* hand the table to the mempool lib */
diff --git a/drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map b/drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map
new file mode 100644
index 000000000..fc8c95e91
--- /dev/null
+++ b/drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map
@@ -0,0 +1,4 @@
+DPDK_19.05 {
+
+	local: *;
+};
diff --git a/mk/rte.app.mk b/mk/rte.app.mk
index 02e8b6f05..0b11d9417 100644
--- a/mk/rte.app.mk
+++ b/mk/rte.app.mk
@@ -133,6 +133,7 @@ ifeq ($(CONFIG_RTE_BUILD_SHARED_LIB),n)
 
 _LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_BUCKET) += -lrte_mempool_bucket
 _LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_STACK)  += -lrte_mempool_stack
+_LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK)  += -lrte_mempool_nb_stack
 ifeq ($(CONFIG_RTE_LIBRTE_DPAA_BUS),y)
 _LDLIBS-$(CONFIG_RTE_LIBRTE_DPAA_MEMPOOL)   += -lrte_mempool_dpaa
 endif
-- 
2.13.6



More information about the dev mailing list