[dpdk-dev] [PATCH v2 9/9] ring: add C11 memory model for new sync modes

Konstantin Ananyev konstantin.ananyev at intel.com
Fri Apr 3 00:09:59 CEST 2020


Add C11 atomics based implementation for RTS and HTS
head/tail update primitivies.

Signed-off-by: Konstantin Ananyev <konstantin.ananyev at intel.com>
---
 lib/librte_ring/Makefile               |   4 +-
 lib/librte_ring/meson.build            |   2 +
 lib/librte_ring/rte_ring_hts.h         |   4 +
 lib/librte_ring/rte_ring_hts_c11_mem.h | 222 +++++++++++++++++++++++++
 lib/librte_ring/rte_ring_hts_elem.h    |   4 +
 lib/librte_ring/rte_ring_rts.h         |   4 +
 lib/librte_ring/rte_ring_rts_c11_mem.h | 198 ++++++++++++++++++++++
 lib/librte_ring/rte_ring_rts_elem.h    |   4 +
 8 files changed, 441 insertions(+), 1 deletion(-)
 create mode 100644 lib/librte_ring/rte_ring_hts_c11_mem.h
 create mode 100644 lib/librte_ring/rte_ring_rts_c11_mem.h

diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
index 5f8662737..927d105bf 100644
--- a/lib/librte_ring/Makefile
+++ b/lib/librte_ring/Makefile
@@ -22,9 +22,11 @@ SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
 					rte_ring_hts.h \
 					rte_ring_hts_elem.h \
 					rte_ring_hts_generic.h \
+					rte_ring_hts_c11_mem.h \
 					rte_ring_peek.h \
 					rte_ring_rts.h \
 					rte_ring_rts_elem.h \
-					rte_ring_rts_generic.h
+					rte_ring_rts_generic.h \
+					rte_ring_rts_c11_mem.h
 
 include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build
index f5f84dc6e..f2e37a8e4 100644
--- a/lib/librte_ring/meson.build
+++ b/lib/librte_ring/meson.build
@@ -7,10 +7,12 @@ headers = files('rte_ring.h',
 		'rte_ring_c11_mem.h',
 		'rte_ring_generic.h',
 		'rte_ring_hts.h',
+		'rte_ring_hts_c11_mem.h',
 		'rte_ring_hts_elem.h',
 		'rte_ring_hts_generic.h',
 		'rte_ring_peek.h',
 		'rte_ring_rts.h',
+		'rte_ring_rts_c11_mem.h',
 		'rte_ring_rts_elem.h',
 		'rte_ring_rts_generic.h')
 
diff --git a/lib/librte_ring/rte_ring_hts.h b/lib/librte_ring/rte_ring_hts.h
index 062d7be6c..ddaa47ff1 100644
--- a/lib/librte_ring/rte_ring_hts.h
+++ b/lib/librte_ring/rte_ring_hts.h
@@ -29,7 +29,11 @@
 extern "C" {
 #endif
 
+#ifdef RTE_USE_C11_MEM_MODEL
+#include <rte_ring_hts_c11_mem.h>
+#else
 #include <rte_ring_hts_generic.h>
+#endif
 
 /**
  * @internal Enqueue several objects on the HTS ring.
diff --git a/lib/librte_ring/rte_ring_hts_c11_mem.h b/lib/librte_ring/rte_ring_hts_c11_mem.h
new file mode 100644
index 000000000..ce0f15f8f
--- /dev/null
+++ b/lib/librte_ring/rte_ring_hts_c11_mem.h
@@ -0,0 +1,222 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2010-2020 Intel Corporation
+ * Copyright (c) 2007-2009 Kip Macy kmacy at freebsd.org
+ * All rights reserved.
+ * Derived from FreeBSD's bufring.h
+ * Used as BSD-3 Licensed with permission from Kip Macy.
+ */
+
+#ifndef _RTE_RING_HTS_C11_MEM_H_
+#define _RTE_RING_HTS_C11_MEM_H_
+
+/**
+ * @file rte_ring_hts_c11_mem.h
+ * It is not recommended to include this file directly,
+ * include <rte_ring.h> instead.
+ * Contains internal helper functions for head/tail sync (HTS) ring mode.
+ * For more information please refer to <rte_ring_hts.h>.
+ */
+
+/**
+ * @internal get current tail value.
+ * Check that user didn't request to move tail above the head.
+ * In that situation:
+ * - return zero, that will cause abort any pending changes and
+ *   return head to its previous position.
+ * - throw an assert in debug mode.
+ */
+static __rte_always_inline uint32_t
+__rte_ring_hts_get_tail(struct rte_ring_hts_headtail *ht, uint32_t *tail,
+	uint32_t num)
+{
+	uint32_t n;
+	union rte_ring_ht_pos p;
+
+	p.raw = __atomic_load_n(&ht->ht.raw, __ATOMIC_RELAXED);
+	n = p.pos.head - p.pos.tail;
+
+	RTE_ASSERT(n >= num);
+	num = (n >= num) ? num : 0;
+
+	*tail = p.pos.tail;
+	return num;
+}
+
+/**
+ * @internal set new values for head and tail as one atomic 64 bit operation.
+ * Should be used only in conjunction with __rte_ring_hts_get_tail.
+ */
+static __rte_always_inline void
+__rte_ring_hts_set_head_tail(struct rte_ring_hts_headtail *ht, uint32_t tail,
+	uint32_t num, uint32_t enqueue)
+{
+	union rte_ring_ht_pos p;
+
+	RTE_SET_USED(enqueue);
+
+	p.pos.head = tail + num;
+	p.pos.tail = p.pos.head;
+
+	__atomic_store_n(&ht->ht.raw, p.raw, __ATOMIC_RELEASE);
+}
+
+static __rte_always_inline void
+__rte_ring_hts_update_tail(struct rte_ring_hts_headtail *ht, uint32_t num,
+	uint32_t enqueue)
+{
+	uint32_t tail;
+
+	num = __rte_ring_hts_get_tail(ht, &tail, num);
+	__rte_ring_hts_set_head_tail(ht, tail, num, enqueue);
+}
+
+/**
+ * @internal waits till tail will become equal to head.
+ * Means no writer/reader is active for that ring.
+ * Suppose to work as serialization point.
+ */
+static __rte_always_inline void
+__rte_ring_hts_head_wait(const struct rte_ring_hts_headtail *ht,
+		union rte_ring_ht_pos *p)
+{
+	p->raw = __atomic_load_n(&ht->ht.raw, __ATOMIC_ACQUIRE);
+
+	while (p->pos.head != p->pos.tail) {
+		rte_pause();
+		p->raw = __atomic_load_n(&ht->ht.raw, __ATOMIC_ACQUIRE);
+	}
+}
+
+/**
+ * @internal This function updates the producer head for enqueue
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sp
+ *   Indicates whether multi-producer path is needed or not
+ * @param n
+ *   The number of elements we will want to enqueue, i.e. how far should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where enqueue starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where enqueue finishes
+ * @param free_entries
+ *   Returns the amount of free space in the ring BEFORE head was moved
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
+	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
+	uint32_t *free_entries)
+{
+	uint32_t n;
+	union rte_ring_ht_pos np, op;
+
+	const uint32_t capacity = r->capacity;
+
+	do {
+		/* Reset n to the initial burst count */
+		n = num;
+
+		/* wait for tail to be equal to head, , acquire point */
+		__rte_ring_hts_head_wait(&r->hts_prod, &op);
+
+		/*
+		 *  The subtraction is done between two unsigned 32bits value
+		 * (the result is always modulo 32 bits even if we have
+		 * *old_head > cons_tail). So 'free_entries' is always between 0
+		 * and capacity (which is < size).
+		 */
+		*free_entries = capacity + r->cons.tail - op.pos.head;
+
+		/* check that we have enough room in ring */
+		if (unlikely(n > *free_entries))
+			n = (behavior == RTE_RING_QUEUE_FIXED) ?
+					0 : *free_entries;
+
+		if (n == 0)
+			return 0;
+
+		np.pos.tail = op.pos.tail;
+		np.pos.head = op.pos.head + n;
+
+	} while (__atomic_compare_exchange_n(&r->hts_prod.ht.raw,
+			&op.raw, np.raw,
+			0, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);
+
+	*old_head = op.pos.head;
+	return n;
+}
+
+/**
+ * @internal This function updates the consumer head for dequeue
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sc
+ *   Indicates whether multi-consumer path is needed or not
+ * @param n
+ *   The number of elements we will want to enqueue, i.e. how far should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where dequeue starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where dequeue finishes
+ * @param entries
+ *   Returns the number of entries in the ring BEFORE head was moved
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num,
+	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
+	uint32_t *entries)
+{
+	uint32_t n;
+	union rte_ring_ht_pos np, op;
+
+	/* move cons.head atomically */
+	do {
+		/* Restore n as it may change every loop */
+		n = num;
+
+		/* wait for tail to be equal to head */
+		__rte_ring_hts_head_wait(&r->hts_cons, &op);
+
+		/* The subtraction is done between two unsigned 32bits value
+		 * (the result is always modulo 32 bits even if we have
+		 * cons_head > prod_tail). So 'entries' is always between 0
+		 * and size(ring)-1.
+		 */
+		*entries = r->prod.tail - op.pos.head;
+
+		/* Set the actual entries for dequeue */
+		if (n > *entries)
+			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+		if (unlikely(n == 0))
+			return 0;
+
+		np.pos.tail = op.pos.tail;
+		np.pos.head = op.pos.head + n;
+
+	} while (__atomic_compare_exchange_n(&r->hts_cons.ht.raw,
+			&op.raw, np.raw,
+			0, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);
+
+	*old_head = op.pos.head;
+	return n;
+}
+
+#endif /* _RTE_RING_HTS_C11_MEM_H_ */
diff --git a/lib/librte_ring/rte_ring_hts_elem.h b/lib/librte_ring/rte_ring_hts_elem.h
index 34f0d121d..1e9a49c7a 100644
--- a/lib/librte_ring/rte_ring_hts_elem.h
+++ b/lib/librte_ring/rte_ring_hts_elem.h
@@ -24,7 +24,11 @@
 extern "C" {
 #endif
 
+#ifdef RTE_USE_C11_MEM_MODEL
+#include <rte_ring_hts_c11_mem.h>
+#else
 #include <rte_ring_hts_generic.h>
+#endif
 
 /**
  * @internal Enqueue several objects on the HTS ring.
diff --git a/lib/librte_ring/rte_ring_rts.h b/lib/librte_ring/rte_ring_rts.h
index 18404fe48..28b2d25f5 100644
--- a/lib/librte_ring/rte_ring_rts.h
+++ b/lib/librte_ring/rte_ring_rts.h
@@ -55,7 +55,11 @@
 extern "C" {
 #endif
 
+#ifdef RTE_USE_C11_MEM_MODEL
+#include <rte_ring_rts_c11_mem.h>
+#else
 #include <rte_ring_rts_generic.h>
+#endif
 
 /**
  * @internal Enqueue several objects on the RTS ring.
diff --git a/lib/librte_ring/rte_ring_rts_c11_mem.h b/lib/librte_ring/rte_ring_rts_c11_mem.h
new file mode 100644
index 000000000..19d3ea288
--- /dev/null
+++ b/lib/librte_ring/rte_ring_rts_c11_mem.h
@@ -0,0 +1,198 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2010-2017 Intel Corporation
+ * Copyright (c) 2007-2009 Kip Macy kmacy at freebsd.org
+ * All rights reserved.
+ * Derived from FreeBSD's bufring.h
+ * Used as BSD-3 Licensed with permission from Kip Macy.
+ */
+
+#ifndef _RTE_RING_RTS_C11_MEM_H_
+#define _RTE_RING_RTS_C11_MEM_H_
+
+/**
+ * @file rte_ring_rts_c11_mem.h
+ * It is not recommended to include this file directly,
+ * include <rte_ring.h> instead.
+ * Contains internal helper functions for Relaxed Tail Sync (RTS) ring mode.
+ * For more information please refer to <rte_ring_rts.h>.
+ */
+
+/**
+ * @internal This function updates tail values.
+ */
+static __rte_always_inline void
+__rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht)
+{
+	union rte_ring_ht_poscnt h, ot, nt;
+
+	/*
+	 * If there are other enqueues/dequeues in progress that
+	 * might preceded us, then don't update tail with new value.
+	 */
+
+	ot.raw = __atomic_load_n(&ht->tail.raw, __ATOMIC_ACQUIRE);
+
+	do {
+		/* on 32-bit systems we have to do atomic read here */
+		h.raw = __atomic_load_n(&ht->head.raw, __ATOMIC_RELAXED);
+
+		nt.raw = ot.raw;
+		if (++nt.val.cnt == h.val.cnt)
+			nt.val.pos = h.val.pos;
+
+	} while (__atomic_compare_exchange_n(&ht->tail.raw, &ot.raw, nt.raw,
+			0, __ATOMIC_RELEASE, __ATOMIC_ACQUIRE) == 0);
+}
+
+/**
+ * @internal This function waits till head/tail distance wouldn't
+ * exceed pre-defined max value.
+ */
+static __rte_always_inline void
+__rte_ring_rts_head_wait(const struct rte_ring_rts_headtail *ht,
+	union rte_ring_ht_poscnt *h)
+{
+	uint32_t max;
+
+	max = ht->htd_max;
+	h->raw = __atomic_load_n(&ht->head.raw, __ATOMIC_ACQUIRE);
+
+	while (h->val.pos - ht->tail.val.pos > max) {
+		rte_pause();
+		h->raw = __atomic_load_n(&ht->head.raw, __ATOMIC_ACQUIRE);
+	}
+}
+
+/**
+ * @internal This function updates the producer head for enqueue.
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sp
+ *   Indicates whether multi-producer path is needed or not
+ * @param n
+ *   The number of elements we will want to enqueue, i.e. how far should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where enqueue starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where enqueue finishes
+ * @param free_entries
+ *   Returns the amount of free space in the ring BEFORE head was moved
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline uint32_t
+__rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
+	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
+	uint32_t *free_entries)
+{
+	uint32_t n;
+	union rte_ring_ht_poscnt nh, oh;
+
+	const uint32_t capacity = r->capacity;
+
+	do {
+		/* Reset n to the initial burst count */
+		n = num;
+
+		/* read prod head (may spin on prod tail, acquire point) */
+		__rte_ring_rts_head_wait(&r->rts_prod, &oh);
+
+		/*
+		 *  The subtraction is done between two unsigned 32bits value
+		 * (the result is always modulo 32 bits even if we have
+		 * *old_head > cons_tail). So 'free_entries' is always between 0
+		 * and capacity (which is < size).
+		 */
+		*free_entries = capacity + r->cons.tail - oh.val.pos;
+
+		/* check that we have enough room in ring */
+		if (unlikely(n > *free_entries))
+			n = (behavior == RTE_RING_QUEUE_FIXED) ?
+					0 : *free_entries;
+
+		if (n == 0)
+			return 0;
+
+		nh.val.pos = oh.val.pos + n;
+		nh.val.cnt = oh.val.cnt + 1;
+
+	} while (__atomic_compare_exchange_n(&r->rts_prod.head.raw,
+			&oh.raw, nh.raw,
+			0, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);
+
+	*old_head = oh.val.pos;
+	return n;
+}
+
+/**
+ * @internal This function updates the consumer head for dequeue
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sc
+ *   Indicates whether multi-consumer path is needed or not
+ * @param n
+ *   The number of elements we will want to enqueue, i.e. how far should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where dequeue starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where dequeue finishes
+ * @param entries
+ *   Returns the number of entries in the ring BEFORE head was moved
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num,
+	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
+	uint32_t *entries)
+{
+	uint32_t n;
+	union rte_ring_ht_poscnt nh, oh;
+
+	/* move cons.head atomically */
+	do {
+		/* Restore n as it may change every loop */
+		n = num;
+
+		/* read cons head (may spin on cons tail, acquire point) */
+		__rte_ring_rts_head_wait(&r->rts_cons, &oh);
+
+		/* The subtraction is done between two unsigned 32bits value
+		 * (the result is always modulo 32 bits even if we have
+		 * cons_head > prod_tail). So 'entries' is always between 0
+		 * and size(ring)-1.
+		 */
+		*entries = r->prod.tail - oh.val.pos;
+
+		/* Set the actual entries for dequeue */
+		if (n > *entries)
+			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+		if (unlikely(n == 0))
+			return 0;
+
+		nh.val.pos = oh.val.pos + n;
+		nh.val.cnt = oh.val.cnt + 1;
+
+	} while (__atomic_compare_exchange_n(&r->rts_cons.head.raw,
+			&oh.raw, nh.raw,
+			1, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);
+
+	*old_head = oh.val.pos;
+	return n;
+}
+
+#endif /* _RTE_RING_RTS_C11_MEM_H_ */
diff --git a/lib/librte_ring/rte_ring_rts_elem.h b/lib/librte_ring/rte_ring_rts_elem.h
index 71a331b23..23d8aeec7 100644
--- a/lib/librte_ring/rte_ring_rts_elem.h
+++ b/lib/librte_ring/rte_ring_rts_elem.h
@@ -24,7 +24,11 @@
 extern "C" {
 #endif
 
+#ifdef RTE_USE_C11_MEM_MODEL
+#include <rte_ring_rts_c11_mem.h>
+#else
 #include <rte_ring_rts_generic.h>
+#endif
 
 /**
  * @internal Enqueue several objects on the RTS ring.
-- 
2.17.1



More information about the dev mailing list