[dpdk-dev] [RFC 1/1] lib/ring: add scatter gather and serial dequeue APIs

Honnappa Nagarahalli honnappa.nagarahalli at arm.com
Mon Feb 24 21:39:31 CET 2020


Add scatter gather APIs that expose pointers into the ring, so that
applications can copy objects to/from the ring without an intermediate
memcpy. Serial dequeue APIs are added to allow a consumer to access
ring elements before committing the actual dequeue.
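
For illustration, a minimal sketch of the intended enqueue flow
(a sketch only: 'r' is assumed to be a ring created with
rte_ring_create_elem() for 8B elements, and error handling is elided):

	unsigned int old_head, new_head, free_space;
	void *dst;
	uint64_t obj = 0xbeef;

	/* Reserve room for one 8B element; dst points into the ring */
	if (rte_ring_mp_enqueue_elem_reserve(r, sizeof(uint64_t),
			&old_head, &new_head, &free_space, &dst) == 0) {
		/* Write the object in place, avoiding the intermediate copy */
		*(uint64_t *)dst = obj;
		/* Make the element visible to consumers */
		rte_ring_mp_enqueue_elem_commit(r, old_head, new_head);
	}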
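
Similarly, a sketch of the serial dequeue flow, where the consumer
inspects an element before deciding to consume or discard it
('keep()' stands for a hypothetical application predicate):

	unsigned int old_head, new_head, available;
	void *src;

	/* Peek at one element; waits while prior reservations are outstanding */
	if (rte_ring_dequeue_elem_reserve_serial(r, sizeof(uint64_t),
			&old_head, &new_head, &available, &src) == 0) {
		uint64_t obj = *(uint64_t *)src;
		if (keep(obj))
			/* Remove the element from the ring */
			rte_ring_dequeue_elem_commit(r, old_head, new_head);
		else
			/* Leave the element on the ring */
			rte_ring_dequeue_elem_revert_serial(r);
	}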

Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli at arm.com>
Reviewed-by: Gavin Hu <gavin.hu at arm.com>
Reviewed-by: Ola Liljedahl <ola.liljedahl at arm.com>
---
 lib/librte_ring/Makefile           |   1 +
 lib/librte_ring/meson.build        |   1 +
 lib/librte_ring/rte_ring_c11_mem.h |  98 +++++++
 lib/librte_ring/rte_ring_elem_sg.h | 417 +++++++++++++++++++++++++++++
 lib/librte_ring/rte_ring_generic.h |  93 +++++++
 5 files changed, 610 insertions(+)
 create mode 100644 lib/librte_ring/rte_ring_elem_sg.h

diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
index 917c560ad..824e4a9bb 100644
--- a/lib/librte_ring/Makefile
+++ b/lib/librte_ring/Makefile
@@ -17,6 +17,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
 # install includes
 SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
 					rte_ring_elem.h \
+					rte_ring_elem_sg.h \
 					rte_ring_generic.h \
 					rte_ring_c11_mem.h
 
diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build
index f2f3ccc88..30115ad7c 100644
--- a/lib/librte_ring/meson.build
+++ b/lib/librte_ring/meson.build
@@ -4,6 +4,7 @@
 sources = files('rte_ring.c')
 headers = files('rte_ring.h',
 		'rte_ring_elem.h',
+		'rte_ring_elem_sg.h',
 		'rte_ring_c11_mem.h',
 		'rte_ring_generic.h')
 
diff --git a/lib/librte_ring/rte_ring_c11_mem.h b/lib/librte_ring/rte_ring_c11_mem.h
index 0fb73a337..dcae8bcc0 100644
--- a/lib/librte_ring/rte_ring_c11_mem.h
+++ b/lib/librte_ring/rte_ring_c11_mem.h
@@ -178,4 +178,102 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
 	return n;
 }
 
+/**
+ * @internal This function updates the consumer head, waiting until
+ * there are no prior reserved elements on the ring.
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sc
+ *   Indicates whether multi-consumer path is needed or not
+ * @param n
+ *   The number of elements to dequeue, i.e. how far should the head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where dequeue starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where dequeue finishes
+ * @param entries
+ *   Returns the number of entries in the ring BEFORE head was moved
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_cons_head_serial(struct rte_ring *r, int is_sc,
+		unsigned int n, enum rte_ring_queue_behavior behavior,
+		uint32_t *old_head, uint32_t *new_head,
+		uint32_t *entries)
+{
+	unsigned int max = n;
+	uint32_t prod_tail;
+	uint32_t cons_tail;
+	int success;
+
+	/* move cons.head atomically */
+	*old_head = __atomic_load_n(&r->cons.head, __ATOMIC_RELAXED);
+	do {
+		/* Restore n as it may change every loop */
+		n = max;
+
+		/* Load the cons.tail and ensure that it is the
+		 * same as cons.head. The load-acquire synchronizes
+		 * with the store-release in update_tail.
+		 */
+		cons_tail = __atomic_load_n(&r->cons.tail, __ATOMIC_ACQUIRE);
+		if (*old_head != cons_tail) {
+			rte_pause();
+			*old_head = __atomic_load_n(&r->cons.head,
+							__ATOMIC_RELAXED);
+			success = 0;
+			continue;
+		}
+
+	/* This load-acquire synchronizes with the store-release of
+	 * ht->tail in update_tail.
+	 */
+		prod_tail = __atomic_load_n(&r->prod.tail,
+					__ATOMIC_ACQUIRE);
+
+	/* The subtraction is done between two unsigned 32-bit values
+	 * (the result is always modulo 32 bits even if we have
+	 * cons_head > prod_tail). So 'entries' is always between 0
+	 * and size(ring)-1.
+	 */
+		*entries = (prod_tail - *old_head);
+
+		/* Set the actual entries for dequeue */
+		if (n > *entries)
+			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+		if (unlikely(n == 0))
+			return 0;
+
+		*new_head = *old_head + n;
+		if (is_sc)
+			r->cons.head = *new_head, success = 1;
+		else
+			/* on failure, *old_head will be updated */
+			success = __atomic_compare_exchange_n(&r->cons.head,
+							old_head, *new_head,
+							0, __ATOMIC_RELAXED,
+							__ATOMIC_RELAXED);
+	} while (unlikely(success == 0));
+	return n;
+}
+
+/**
+ * @internal Discard reserved ring elements
+ *
+ * @param ht
+ *   A pointer to the ring's head-tail structure
+ */
+static __rte_always_inline void
+__rte_ring_revert_head(struct rte_ring_headtail *ht)
+{
+	__atomic_store_n(&ht->head, ht->tail, __ATOMIC_RELAXED);
+}
+
 #endif /* _RTE_RING_C11_MEM_H_ */
diff --git a/lib/librte_ring/rte_ring_elem_sg.h b/lib/librte_ring/rte_ring_elem_sg.h
new file mode 100644
index 000000000..a73f4fbfe
--- /dev/null
+++ b/lib/librte_ring/rte_ring_elem_sg.h
@@ -0,0 +1,417 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2020 Arm Limited
+ * Copyright (c) 2010-2017 Intel Corporation
+ * Copyright (c) 2007-2009 Kip Macy kmacy at freebsd.org
+ * All rights reserved.
+ * Derived from FreeBSD's bufring.h
+ * Used as BSD-3 Licensed with permission from Kip Macy.
+ */
+
+#ifndef _RTE_RING_ELEM_SG_H_
+#define _RTE_RING_ELEM_SG_H_
+
+/**
+ * @file
+ * RTE Ring with
+ * 1) user-defined element size
+ * 2) scatter-gather feature to copy objects to/from the ring
+ * 3) ability to reserve, then consume or discard, elements in the ring
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdio.h>
+#include <stdint.h>
+#include <string.h>
+#include <sys/queue.h>
+#include <errno.h>
+#include <rte_common.h>
+#include <rte_config.h>
+#include <rte_memory.h>
+#include <rte_lcore.h>
+#include <rte_atomic.h>
+#include <rte_branch_prediction.h>
+#include <rte_memzone.h>
+#include <rte_pause.h>
+
+#include "rte_ring.h"
+#include "rte_ring_elem.h"
+
+/* Between two loads the CPU may reorder on weak memory models
+ * (PowerPC/Arm).
+ * There are 2 choices for the users:
+ * 1. use the rmb() memory barrier
+ * 2. use one-direction load_acquire/store_release barriers, defined by
+ *    CONFIG_RTE_USE_C11_MEM_MODEL=y
+ * Which is faster depends on performance test results.
+ * By default, the common functions from rte_ring_generic.h are used.
+ */
+#ifdef RTE_USE_C11_MEM_MODEL
+#include "rte_ring_c11_mem.h"
+#else
+#include "rte_ring_generic.h"
+#endif
+
+static __rte_always_inline void
+__rte_ring_get_elem_addr_64(struct rte_ring *r, uint32_t head,
+	uint32_t num, void **dst1, uint32_t *n1, void **dst2)
+{
+	uint32_t idx = head & r->mask;
+	uint64_t *ring = (uint64_t *)&r[1];
+
+	*dst1 = ring + idx;
+	*n1 = num;
+
+	if (idx + num > r->size) {
+		*n1 = r->size - idx;
+		*dst2 = ring;
+	}
+}
+
+static __rte_always_inline void
+__rte_ring_get_elem_addr_128(struct rte_ring *r, uint32_t head,
+	uint32_t num, void **dst1, uint32_t *n1, void **dst2)
+{
+	uint32_t idx = head & r->mask;
+	rte_int128_t *ring = (rte_int128_t *)&r[1];
+
+	*dst1 = ring + idx;
+	*n1 = num;
+
+	if (idx + num > r->size) {
+		*n1 = r->size - idx;
+		*dst2 = ring;
+	}
+}
+
+static __rte_always_inline void
+__rte_ring_get_elem_addr(struct rte_ring *r, uint32_t head,
+	uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void **dst2)
+{
+	if (esize == 8)
+		return __rte_ring_get_elem_addr_64(r, head,
+						num, dst1, n1, dst2);
+	else if (esize == 16)
+		return __rte_ring_get_elem_addr_128(r, head,
+						num, dst1, n1, dst2);
+	else {
+		uint32_t idx, scale, nr_idx;
+		uint32_t *ring = (uint32_t *)&r[1];
+
+		/* Normalize to uint32_t */
+		scale = esize / sizeof(uint32_t);
+		idx = head & r->mask;
+		nr_idx = idx * scale;
+
+		*dst1 = ring + nr_idx;
+		*n1 = num;
+
+		if (idx + num > r->size) {
+			*n1 = r->size - idx;
+			*dst2 = ring;
+		}
+	}
+}
+
+/**
+ * @internal Reserve ring elements to enqueue several objects on the ring
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of elements to reserve in the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Reserve a fixed number of elements from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Reserve as many elements as possible from ring
+ * @param is_sp
+ *   Indicates whether to use single producer or multi-producer reserve
+ * @param old_head
+ *   Producer's head index before reservation.
+ * @param new_head
+ *   Producer's head index after reservation.
+ * @param free_space
+ *   Returns the amount of space after the reserve operation has finished.
+ *   It is not updated if the number of reserved elements is zero.
+ * @param dst1
+ *   Pointer to location in the ring to copy the data.
+ * @param n1
+ *   Number of elements to copy at dst1
+ * @param dst2
+ *   In case of ring wrap around, this pointer provides the location to
+ *   copy the remaining elements. The number of elements to copy at this
+ *   location is equal to (number of elements reserved - n1)
+ * @return
+ *   Actual number of elements reserved.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_enqueue_elem_reserve(struct rte_ring *r, unsigned int esize,
+		unsigned int n, enum rte_ring_queue_behavior behavior,
+		unsigned int is_sp, unsigned int *old_head,
+		unsigned int *new_head, unsigned int *free_space,
+		void **dst1, unsigned int *n1, void **dst2)
+{
+	uint32_t free_entries;
+
+	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
+			old_head, new_head, &free_entries);
+
+	if (n == 0)
+		goto end;
+
+	__rte_ring_get_elem_addr(r, *old_head, esize, n, dst1, n1, dst2);
+
+	if (free_space != NULL)
+		*free_space = free_entries - n;
+
+end:
+	return n;
+}
+
+/**
+ * @internal Commit previously reserved ring elements (for enqueue)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param old_head
+ *   Producer's head index before reservation.
+ * @param new_head
+ *   Producer's head index after reservation.
+ * @param is_sp
+ *   Indicates whether to use single producer or multi-producer head update
+ */
+static __rte_always_inline void
+__rte_ring_do_enqueue_elem_commit(struct rte_ring *r,
+		unsigned int old_head, unsigned int new_head,
+		unsigned int is_sp)
+{
+	update_tail(&r->prod, old_head, new_head, is_sp, 1);
+}
+
+/**
+ * Reserve one element for enqueuing one object on a ring
+ * (multi-producers safe). Application must call
+ * 'rte_ring_mp_enqueue_elem_commit' to complete the enqueue operation.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param old_head
+ *   Producer's head index before reservation. The same should be passed to
+ *   'rte_ring_mp_enqueue_elem_commit' function.
+ * @param new_head
+ *   Producer's head index after reservation. The same should be passed to
+ *   'rte_ring_mp_enqueue_elem_commit' function.
+ * @param free_space
+ *   Returns the amount of space after the reservation operation has finished.
+ *   It is not updated if the number of reserved elements is zero.
+ * @param dst
+ *   Pointer to location in the ring to copy the data.
+ * @return
+ *   - 0: Success; one element is reserved.
+ *   - -ENOBUFS: Not enough room in the ring; no element is reserved.
+ */
+static __rte_always_inline int
+rte_ring_mp_enqueue_elem_reserve(struct rte_ring *r, unsigned int esize,
+		unsigned int *old_head, unsigned int *new_head,
+		unsigned int *free_space, void **dst)
+{
+	unsigned int n;
+
+	return __rte_ring_do_enqueue_elem_reserve(r, esize, 1,
+			RTE_RING_QUEUE_FIXED, 0, old_head, new_head,
+			free_space, dst, &n, NULL) ? 0 : -ENOBUFS;
+}
+
+/**
+ * Commit previously reserved elements (for enqueue) in a ring
+ * (multi-producers safe). This API completes the enqueue operation.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param old_head
+ *   Producer's head index before reservation. This value was returned
+ *   when the API 'rte_ring_mp_enqueue_elem_reserve' was called.
+ * @param new_head
+ *   Producer's head index after reservation. This value was returned
+ *   when the API 'rte_ring_mp_enqueue_elem_reserve' was called.
+ */
+static __rte_always_inline void
+rte_ring_mp_enqueue_elem_commit(struct rte_ring *r, unsigned int old_head,
+		unsigned int new_head)
+{
+	__rte_ring_do_enqueue_elem_commit(r, old_head, new_head, 0);
+}
+
+/**
+ * @internal Reserve elements to dequeue several objects from the ring.
+ * This function spin-waits while prior reservations are outstanding.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to reserve in the ring
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Reserve fixed number of elements in a ring
+ *   RTE_RING_QUEUE_VARIABLE: Reserve as many elements as possible in a ring
+ * @param is_sc
+ *   Indicates whether to use single consumer or multi-consumer head update
+ * @param old_head
+ *   Consumer's head index before reservation.
+ * @param new_head
+ *   Consumer's head index after reservation.
+ * @param available
+ *   Returns the number of remaining ring elements after the reservation.
+ *   It is not updated if the number of reserved elements is zero.
+ * @param src1
+ *   Pointer to location in the ring to copy the data from.
+ * @param n1
+ *   Number of elements to copy from src1
+ * @param src2
+ *   In case of wrap around in the ring, this pointer provides the location
+ *   to copy the remaining elements from. The number of elements to copy from
+ *   this pointer is equal to (number of elements reserved - n1)
+ * @return
+ *   Actual number of elements reserved.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_dequeue_elem_reserve_serial(struct rte_ring *r,
+		unsigned int esize, unsigned int n,
+		enum rte_ring_queue_behavior behavior, unsigned int is_sc,
+		unsigned int *old_head, unsigned int *new_head,
+		unsigned int *available, void **src1, unsigned int *n1,
+		void **src2)
+{
+	uint32_t entries;
+
+	n = __rte_ring_move_cons_head_serial(r, is_sc, n, behavior,
+			old_head, new_head, &entries);
+
+	if (n == 0)
+		goto end;
+
+	__rte_ring_get_elem_addr(r, *old_head, esize, n, src1, n1, src2);
+
+	if (available != NULL)
+		*available = entries - n;
+
+end:
+	return n;
+}
+
+/**
+ * @internal Consume previously reserved ring elements (for dequeue)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param old_head
+ *   Consumer's head index before reservation.
+ * @param new_head
+ *   Consumer's head index after reservation.
+ * @param is_sc
+ *   Indicates whether to use single consumer or multi-consumer head update
+ */
+static __rte_always_inline void
+__rte_ring_do_dequeue_elem_commit(struct rte_ring *r,
+		unsigned int old_head, unsigned int new_head,
+		unsigned int is_sc)
+{
+	update_tail(&r->cons, old_head, new_head, is_sc, 1);
+}
+
+/**
+ * Reserve one element on a ring for dequeue. This function spin-waits
+ * while prior reservations are outstanding. Application must call
+ * 'rte_ring_dequeue_elem_commit' or
+ * 'rte_ring_dequeue_elem_revert_serial' to complete the dequeue operation.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param old_head
+ *   Consumer's head index before reservation. The same should be passed to
+ *   'rte_ring_dequeue_elem_commit' function.
+ * @param new_head
+ *   Consumer's head index after reservation. The same should be passed to
+ *   'rte_ring_dequeue_elem_commit' function.
+ * @param available
+ *   Returns the number of remaining ring elements after the reservation.
+ *   It is not updated if the number of reserved elements is zero.
+ * @param src
+ *   Pointer to location in the ring to copy the data from.
+ * @return
+ *   - 0: Success; one element is reserved.
+ *   - -ENOBUFS: Not enough entries in the ring; no element is reserved.
+ */
+static __rte_always_inline int
+rte_ring_dequeue_elem_reserve_serial(struct rte_ring *r, unsigned int esize,
+		unsigned int *old_head, unsigned int *new_head,
+		unsigned int *available, void **src)
+{
+	unsigned int n;
+
+	return __rte_ring_do_dequeue_elem_reserve_serial(r, esize, 1,
+			RTE_RING_QUEUE_FIXED, r->cons.single, old_head,
+			new_head, available, src, &n, NULL) ? 0 : -ENOBUFS;
+}
+
+/**
+ * Consume previously reserved elements (for dequeue) in a ring
+ * (multi-consumer safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param old_head
+ *   Consumer's head index before reservation. This value was returned
+ *   when the API 'rte_ring_dequeue_elem_reserve_serial' was called.
+ * @param new_head
+ *   Consumer's head index after reservation. This value was returned
+ *   when the API 'rte_ring_dequeue_elem_reserve_serial' was called.
+ */
+static __rte_always_inline void
+rte_ring_dequeue_elem_commit(struct rte_ring *r, unsigned int old_head,
+		unsigned int new_head)
+{
+	__rte_ring_do_dequeue_elem_commit(r, old_head, new_head,
+						r->cons.single);
+}
+
+/**
+ * Discard previously reserved elements (for dequeue) in a ring.
+ *
+ * @warning
+ * This API can be called only if the ring elements were reserved
+ * using the 'rte_ring_dequeue_elem_reserve_serial' API.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ */
+static __rte_always_inline void
+rte_ring_dequeue_elem_revert_serial(struct rte_ring *r)
+{
+	__rte_ring_revert_head(&r->cons);
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_RING_ELEM_SG_H_ */
diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h
index 953cdbbd5..8d7a7ffcc 100644
--- a/lib/librte_ring/rte_ring_generic.h
+++ b/lib/librte_ring/rte_ring_generic.h
@@ -170,4 +170,97 @@ __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
 	return n;
 }
 
+/**
+ * @internal This function updates the consumer head, waiting until
+ * there are no prior reserved elements on the ring.
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sc
+ *   Indicates whether multi-consumer path is needed or not
+ * @param n
+ *   The number of elements to dequeue, i.e. how far should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where dequeue starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where dequeue finishes
+ * @param entries
+ *   Returns the number of entries in the ring BEFORE head was moved
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_cons_head_serial(struct rte_ring *r, unsigned int is_sc,
+		unsigned int n, enum rte_ring_queue_behavior behavior,
+		uint32_t *old_head, uint32_t *new_head,
+		uint32_t *entries)
+{
+	unsigned int max = n;
+	int success;
+
+	/* move cons.head atomically */
+	do {
+		/* Restore n as it may change every loop */
+		n = max;
+
+		*old_head = r->cons.head;
+
+		/* Add an rmb barrier to avoid load/load reordering on weak
+		 * memory models. It is a no-op on x86.
+		 */
+		rte_smp_rmb();
+
+		/* Ensure that cons.tail and cons.head are the same */
+		if (*old_head != r->cons.tail) {
+			rte_pause();
+
+			success = 0;
+			continue;
+		}
+
+		/* The subtraction is done between two unsigned 32-bit values
+		 * (the result is always modulo 32 bits even if we have
+		 * cons_head > prod_tail). So 'entries' is always between 0
+		 * and size(ring)-1.
+		 */
+		*entries = (r->prod.tail - *old_head);
+
+		/* Set the actual entries for dequeue */
+		if (n > *entries)
+			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+		if (unlikely(n == 0))
+			return 0;
+
+		*new_head = *old_head + n;
+		if (is_sc) {
+			r->cons.head = *new_head;
+			rte_smp_rmb();
+			success = 1;
+		} else {
+			success = rte_atomic32_cmpset(&r->cons.head, *old_head,
+					*new_head);
+		}
+	} while (unlikely(success == 0));
+	return n;
+}
+
+/**
+ * @internal This function updates the head to match the tail
+ *
+ * @param ht
+ *   A pointer to the ring's head-tail structure
+ */
+static __rte_always_inline void
+__rte_ring_revert_head(struct rte_ring_headtail *ht)
+{
+	/* Discard the reserved ring elements. */
+	ht->head = ht->tail;
+}
+
 #endif /* _RTE_RING_GENERIC_H_ */
-- 
2.17.1


