[dpdk-dev] [RFC 1/1] lib/ring: add scatter gather and serial dequeue APIs

Honnappa Nagarahalli Honnappa.Nagarahalli at arm.com
Fri Feb 28 01:18:36 CET 2020


<snip>
> 
> 
> Hi Honnappa,
Thanks Konstantin for the comments.
> 
> > Add scatter gather APIs to avoid intermediate memcpy. Serial dequeue
> > APIs are added to support access to ring elements before actual
> > dequeue.
> >
> > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli at arm.com>
> > Reviewed-by: Gavin Hu <gavin.hu at arm.com>
> > Reviewed-by: Ola Liljedahl <ola.liljedahl at arm.com>
> > ---
> >  lib/librte_ring/Makefile           |   1 +
> >  lib/librte_ring/meson.build        |   1 +
> >  lib/librte_ring/rte_ring_c11_mem.h |  98 +++++++
> > lib/librte_ring/rte_ring_elem_sg.h | 417 +++++++++++++++++++++++++++++
> > lib/librte_ring/rte_ring_generic.h |  93 +++++++
> >  5 files changed, 610 insertions(+)
> 
> As was already noticed by you this patch overlaps quite a bit with another
> one:
> http://patches.dpdk.org/patch/66006/
I took a cursory look at this. I need to take a detailed look; I plan to do so soon.

> 
> Though it seems there are few significant differences in our approaches (both
> API and implementation).
> So we probably need to come-up with some common view first, before
> moving forward with some unified version.
> To start a discussion, I produced some comments, pls see below.
> 
> I don't see changes in rte_ring.h itself, but I suppose that's just because it is an
> RFC and it would be added in later versions?
I did not plan to add them. IMO, we should not add new APIs to that list; we should encourage using the rte_ring_xxx_elem APIs going forward. They are interoperable (I mean, the application can call a mix of the APIs).

> Another similar question: there seems to be only _bulk_ (RTE_RING_QUEUE_FIXED)
> mode, I suppose _burst_ will also be added in later versions?
Here, I was trying to avoid providing APIs without a clear need (_bulk_ is enough for RCU for now). If you see the need, I can add them.

> 
> > diff --git a/lib/librte_ring/rte_ring_elem_sg.h
> > b/lib/librte_ring/rte_ring_elem_sg.h
> > new file mode 100644
> > index 000000000..a73f4fbfe
> > --- /dev/null
> > +++ b/lib/librte_ring/rte_ring_elem_sg.h
> > @@ -0,0 +1,417 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + *
> > + * Copyright (c) 2020 Arm Limited
> > + * Copyright (c) 2010-2017 Intel Corporation
> > + * Copyright (c) 2007-2009 Kip Macy kmacy at freebsd.org
> > + * All rights reserved.
> > + * Derived from FreeBSD's bufring.h
> > + * Used as BSD-3 Licensed with permission from Kip Macy.
> > + */
> > +
> > +#ifndef _RTE_RING_ELEM_SG_H_
> > +#define _RTE_RING_ELEM_SG_H_
> > +
> > +/**
> > + * @file
> > + * RTE Ring with
> > + * 1) user defined element size
> > + * 2) scatter gather feature to copy objects to/from the ring
> > + * 3) ability to reserve, consume/discard elements in the ring  */
> > +
> > +#ifdef __cplusplus
> > +extern "C" {
> > +#endif
> > +
> > +#include <stdio.h>
> > +#include <stdint.h>
> > +#include <string.h>
> > +#include <sys/queue.h>
> > +#include <errno.h>
> > +#include <rte_common.h>
> > +#include <rte_config.h>
> > +#include <rte_memory.h>
> > +#include <rte_lcore.h>
> > +#include <rte_atomic.h>
> > +#include <rte_branch_prediction.h>
> > +#include <rte_memzone.h>
> > +#include <rte_pause.h>
> > +
> > +#include "rte_ring.h"
> > +#include "rte_ring_elem.h"
> > +
> > +/* Between load and load. there might be cpu reorder in weak model
> > + * (powerpc/arm).
> > + * There are 2 choices for the users
> > + * 1.use rmb() memory barrier
> > + * 2.use one-direction load_acquire/store_release barrier,defined by
> > + * CONFIG_RTE_USE_C11_MEM_MODEL=y
> > + * It depends on performance test results.
> > + * By default, move common functions to rte_ring_generic.h
> > + */
> > +#ifdef RTE_USE_C11_MEM_MODEL
> > +#include "rte_ring_c11_mem.h"
> > +#else
> > +#include "rte_ring_generic.h"
> > +#endif
> > +
> > +static __rte_always_inline void
> > +__rte_ring_get_elem_addr_64(struct rte_ring *r, uint32_t head,
> > +	uint32_t num, void **dst1, uint32_t *n1, void **dst2) {
> > +	uint32_t idx = head & r->mask;
> > +	uint64_t *ring = (uint64_t *)&r[1];
> > +
> > +	*dst1 = ring + idx;
> > +	*n1 = num;
> > +
> > +	if (idx + num > r->size) {
> > +		*n1 = num - (r->size - idx - 1);
> > +		*dst2 = ring;
> > +	}
> > +}
> > +
> > +static __rte_always_inline void
> > +__rte_ring_get_elem_addr_128(struct rte_ring *r, uint32_t head,
> > +	uint32_t num, void **dst1, uint32_t *n1, void **dst2) {
> > +	uint32_t idx = head & r->mask;
> > +	rte_int128_t *ring = (rte_int128_t *)&r[1];
> > +
> > +	*dst1 = ring + idx;
> > +	*n1 = num;
> > +
> > +	if (idx + num > r->size) {
> > +		*n1 = num - (r->size - idx - 1);
> > +		*dst2 = ring;
> > +	}
> > +}
> > +
> > +static __rte_always_inline void
> > +__rte_ring_get_elem_addr(struct rte_ring *r, uint32_t head,
> > +	uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void
> > +**dst2) {
> > +	if (esize == 8)
> > +		return __rte_ring_get_elem_addr_64(r, head,
> > +						num, dst1, n1, dst2);
> > +	else if (esize == 16)
> > +		return __rte_ring_get_elem_addr_128(r, head,
> > +						num, dst1, n1, dst2);
> > +	else {
> > +		uint32_t idx, scale, nr_idx;
> > +		uint32_t *ring = (uint32_t *)&r[1];
> > +
> > +		/* Normalize to uint32_t */
> > +		scale = esize / sizeof(uint32_t);
> > +		idx = head & r->mask;
> > +		nr_idx = idx * scale;
> > +
> > +		*dst1 = ring + nr_idx;
> > +		*n1 = num;
> > +
> > +		if (idx + num > r->size) {
> > +			*n1 = num - (r->size - idx - 1);
> > +			*dst2 = ring;
> > +		}
> > +	}
> > +}
> > +
> > +/**
> > + * @internal Reserve ring elements to enqueue several objects on the
> > +ring
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + *   This must be the same value used while creating the ring. Otherwise
> > + *   the results are undefined.
> > + * @param n
> > + *   The number of elements to reserve in the ring.
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Reserve a fixed number of elements from a ring
> > + *   RTE_RING_QUEUE_VARIABLE: Reserve as many elements as possible from ring
> > + * @param is_sp
> > + *   Indicates whether to use single producer or multi-producer reserve
> > + * @param old_head
> > + *   Producer's head index before reservation.
> > + * @param new_head
> > + *   Producer's head index after reservation.
> > + * @param free_space
> > + *   returns the amount of space after the reserve operation has finished.
> > + *   It is not updated if the number of reserved elements is zero.
> > + * @param dst1
> > + *   Pointer to location in the ring to copy the data.
> > + * @param n1
> > + *   Number of elements to copy at dst1
> > + * @param dst2
> > + *   In case of ring wrap around, this pointer provides the location to
> > + *   copy the remaining elements. The number of elements to copy at this
> > + *   location is equal to (number of elements reserved - n1)
> > + * @return
> > + *   Actual number of elements reserved.
> > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_enqueue_elem_reserve(struct rte_ring *r, unsigned int esize,
> 
> 
> I do understand the purpose of reserve, then either commit/abort for serial
> sync mode, but what is the purpose of non-serial version of reserve/commit?
In RCU, I have the need for the scatter-gather feature, i.e. the data in the ring element comes from multiple sources (the 'token' is generated by the RCU library and the application provides additional data). If I do not provide reserve/commit, I need to introduce an intermediate memcpy to place these two pieces of data contiguously before copying them into the ring element. The sequence is 'reserve(1), memcpy1, memcpy2, commit(1)'. Hence, you do not see an abort API for enqueue.
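To make the intent concrete, here is a rough sketch of the enqueue side. The element layout, field names and variables (r, token, app_data) below are only illustrative assumptions, not part of this patch:

struct rcu_dq_elem {
	uint64_t token;		/* generated by the RCU library */
	uint8_t udata[24];	/* provided by the application */
};

unsigned int old_head, new_head;
void *dst;

/* reserve(1): get the address of the element directly inside the ring */
if (rte_ring_mp_enqueue_elem_reserve(r, sizeof(struct rcu_dq_elem),
		&old_head, &new_head, NULL, &dst) != 0)
	return -ENOBUFS;

struct rcu_dq_elem *e = (struct rcu_dq_elem *)dst;
e->token = token;				/* memcpy1: RCU-generated data */
memcpy(e->udata, app_data, sizeof(e->udata));	/* memcpy2: application data */

/* commit(1): make the element visible to consumers */
rte_ring_mp_enqueue_elem_commit(r, old_head, new_head);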
 
> In serial  MP/MC case, after _reserve_(n) you always have to do
> _commit_(n) - you can't reduce number of elements, or do _abort_.
Agree, the intention here is to provide the scatter/gather feature.

> Again you cannot avoid memcpy(n) here anyhow.
> So what is the point of these functions for non-serial case?
It avoids an intermediate memcpy when the data is coming from multiple sources.

> 
> BTW, I think it would be good to have serial version of _enqueue_ too.
If there is a good use case, they should be provided. I have not come across one yet.

> 
> > +		unsigned int n, enum rte_ring_queue_behavior behavior,
> > +		unsigned int is_sp, unsigned int *old_head,
> > +		unsigned int *new_head, unsigned int *free_space,
> > +		void **dst1, unsigned int *n1, void **dst2)
> 
> I do understand the intention to avoid memcpy(), but proposed API seems
> overcomplicated, error prone, and not very convenient for the user.
The issue is the need to handle wrap-around in the ring storage array, i.e. when space is reserved for more than one ring element, the reserved region might wrap around the end of the array. That is why the reserve APIs return two pointers (dst1/dst2).
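For example, the copy on the caller's side would look roughly like this (sketch only: dst1/n1/dst2 are the values returned by the reserve step, while src, esize and the reserved count n are assumed caller variables):

/* first chunk: from dst1 up to the end of the ring storage array */
memcpy(dst1, src, (size_t)n1 * esize);
/* if the reservation wrapped around, copy the rest to the start of the array */
if (n1 < n)
	memcpy(dst2, (const char *)src + (size_t)n1 * esize,
		(size_t)(n - n1) * esize);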

> I don't think that avoiding memcpy() will save us that many cycles here, so
This depends on the amount of data being copied.

> probably better to keep API model a bit more regular:
> 
> n = rte_ring_mp_serial_enqueue_bulk_reserve(ring, num, &free_space); ...
> /* performs actual memcpy(), m<=n */
> rte_ring_mp_serial_enqueue_bulk_commit(ring, obj,  m);
These do not take care of the wrap-around case, or else I am not able to understand your comment.

> 
> /* performs actual memcpy for num elems */
> n = rte_ring_mp_serial_dequeue_bulk_reserve(ring, obj, num, &free_space);
> ...
> /* m<=n */
> rte_ring_mp_serial_dequeue_bulk_commit(ring, obj, m);
> 
> Plus, we can have usual enqueue/dequeue API for serial sync mode:
> rte_ring_serial_(enqueue/dequeue)_(bulk/burst)
It would be good to understand the use cases. IMO, if we do not have use cases, we should not add these APIs for now. We can add them as and when the use cases are understood.

> 
> > +{
> > +	uint32_t free_entries;
> > +
> > +	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
> > +			old_head, new_head, &free_entries);
> > +
> > +	if (n == 0)
> > +		goto end;
> 
> Here and in other similar places, why not just 'return 0;'?
Yes, should be possible.

> 
> > +
> > +	__rte_ring_get_elem_addr(r, *old_head, esize, n, dst1, n1, dst2);
> > +
> > +	if (free_space != NULL)
> > +		*free_space = free_entries - n;
> > +
> > +end:
> > +	return n;
> > +}
> > +
> > +/**
> > + * @internal Consume previously reserved ring elements (for enqueue)
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param old_head
> > + *   Producer's head index before reservation.
> > + * @param new_head
> > + *   Producer's head index after reservation.
> > + * @param is_sp
> > + *   Indicates whether to use single producer or multi-producer head update
> > + */
> > +static __rte_always_inline void
> > +__rte_ring_do_enqueue_elem_commit(struct rte_ring *r,
> > +		unsigned int old_head, unsigned int new_head,
> > +		unsigned int is_sp)
> > +{
> > +	update_tail(&r->prod, old_head, new_head, is_sp, 1); }
> > +
> > +/**
> > + * Reserve one element for enqueuing one object on a ring
> > + * (multi-producers safe). Application must call
> > + * 'rte_ring_mp_enqueue_elem_commit' to complete the enqueue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + *   This must be the same value used while creating the ring. Otherwise
> > + *   the results are undefined.
> > + * @param old_head
> > + *   Producer's head index before reservation. The same should be passed to
> > + *   'rte_ring_mp_enqueue_elem_commit' function.
> > + * @param new_head
> > + *   Producer's head index after reservation. The same should be passed to
> > + *   'rte_ring_mp_enqueue_elem_commit' function.
> > + * @param free_space
> > + *   Returns the amount of space after the reservation operation has finished.
> > + *   It is not updated if the number of reserved elements is zero.
> > + * @param dst
> > + *   Pointer to location in the ring to copy the data.
> > + * @return
> > + *   - 0: Success; objects enqueued.
> > + *   - -ENOBUFS: Not enough room in the ring to reserve; no element is reserved.
> > + */
> > +static __rte_always_inline int
> > +rte_ring_mp_enqueue_elem_reserve(struct rte_ring *r, unsigned int esize,
> > +		unsigned int *old_head, unsigned int *new_head,
> > +		unsigned int *free_space, void **dst) {
> > +	unsigned int n;
> > +
> > +	return __rte_ring_do_enqueue_elem_reserve(r, esize, 1,
> > +			RTE_RING_QUEUE_FIXED, 0, old_head, new_head,
> > +			free_space, dst, &n, NULL) ? 0 : -ENOBUFS; }
> > +
> > +/**
> > + * Consume previously reserved elements (for enqueue) in a ring
> > + * (multi-producers safe). This API completes the enqueue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param old_head
> > + *   Producer's head index before reservation. This value was returned
> > + *   when the API 'rte_ring_mp_enqueue_elem_reserve' was called.
> > + * @param new_head
> > + *   Producer's head index after reservation. This value was returned
> > + *   when the API 'rte_ring_mp_enqueue_elem_reserve' was called.
> > + */
> > +static __rte_always_inline void
> > +rte_ring_mp_enqueue_elem_commit(struct rte_ring *r, unsigned int old_head,
> > +		unsigned int new_head)
> > +{
> > +	__rte_ring_do_enqueue_elem_commit(r, old_head, new_head, 0); }
> > +
> > +/**
> > + * @internal Reserve elements to dequeue several objects on the ring.
> > + * This function blocks if there are elements reserved already.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + *   This must be the same value used while creating the ring. Otherwise
> > + *   the results are undefined.
> > + * @param n
> > + *   The number of objects to reserve in the ring
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Reserve fixed number of elements in a ring
> > + *   RTE_RING_QUEUE_VARIABLE: Reserve as many elements as possible in a ring
> > + * @param is_sc
> > + *   Indicates whether to use single consumer or multi-consumer head update
> > + * @param old_head
> > + *   Consumer's head index before reservation.
> > + * @param new_head
> > + *   Consumer's head index after reservation.
> > + * @param available
> > + *   returns the number of remaining ring elements after the reservation
> > + *   It is not updated if the number of reserved elements is zero.
> > + * @param src1
> > + *   Pointer to location in the ring to copy the data from.
> > + * @param n1
> > + *   Number of elements to copy from src1
> > + * @param src2
> > + *   In case of wrap around in the ring, this pointer provides the location
> > + *   to copy the remaining elements from. The number of elements to copy from
> > + *   this pointer is equal to (number of elements reserved - n1)
> > + * @return
> > + *   Actual number of elements reserved.
> > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_dequeue_elem_reserve_serial(struct rte_ring *r,
> > +		unsigned int esize, unsigned int n,
> > +		enum rte_ring_queue_behavior behavior, unsigned int is_sc,
> > +		unsigned int *old_head, unsigned int *new_head,
> > +		unsigned int *available, void **src1, unsigned int *n1,
> > +		void **src2)
> > +{
> > +	uint32_t entries;
> > +
> > +	n = __rte_ring_move_cons_head_serial(r, is_sc, n, behavior,
> > +			old_head, new_head, &entries);
> > +
> > +	if (n == 0)
> > +		goto end;
> > +
> > +	__rte_ring_get_elem_addr(r, *old_head, esize, n, src1, n1, src2);
> > +
> > +	if (available != NULL)
> > +		*available = entries - n;
> > +
> > +end:
> > +	return n;
> > +}
> > +
> > +/**
> > + * @internal Consume previously reserved ring elements (for dequeue)
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param old_head
> > + *   Consumer's head index before reservation.
> > + * @param new_head
> > + *   Consumer's head index after reservation.
> > + * @param is_sc
> > + *   Indicates whether to use single consumer or multi-consumer head update
> > + */
> > +static __rte_always_inline void
> > +__rte_ring_do_dequeue_elem_commit(struct rte_ring *r,
> > +		unsigned int old_head, unsigned int new_head,
> > +		unsigned int is_sc)
> > +{
> 
> I think it is a bit dangerous and error-prone approach to let user specify
> old_head/new_head manually.
old_head and new_head are local to the thread in the non-serial MP/MC case. Hence, they need to be returned to the caller.

> Seems better just _commit(ring, num) - see above.
This would not work for non-serial cases.

> That way the user doesn't have to calculate the new head manually,
I do not understand the 'calculate' part. The user has to provide the same values that were returned.
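To illustrate, the dequeue usage would simply pass back the returned values (sketch only; r, esize and the 'keep' decision are assumed caller variables):

unsigned int old_head, new_head;
void *src;

if (rte_ring_dequeue_elem_reserve_serial(r, esize, &old_head, &new_head,
		NULL, &src) == 0) {
	/* examine the element at 'src' in place */
	if (keep)
		/* pass back the same values the reserve call returned */
		rte_ring_dequeue_elem_commit(r, old_head, new_head);
	else
		rte_ring_dequeue_elem_revert_serial(r);
}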

> plus we can have a check that ring.tail - ring.head >= num.
> 
> > +	update_tail(&r->cons, old_head, new_head, is_sc, 1);
> 
> I think update_tail() is not enough here.
> As in some cases we need to update  ring.head also:
> let say user reserved 2 elems, but then decided to commit only one.
This patch does not address that use case. Do you see use cases for this?

> So I think we need a special new function instead of update_tail() here.
> BTW, in HTS I use atomic 64-bit read/write to get/set both head and tail in
> one go.
> This is not really required - two 32bit ops would work too, I think.
> As usual, both ways have some pros and cons:
> using 64bit ops might be faster on 64-bit target, plus it is less error prone (no
> need to think about head/tail read/write ordering, etc.), though for 32-bit
> target it would mean some extra overhead.
> 
> > +}
> > +
> > +/**
> > + * Reserve one element on a ring for dequeue. This function blocks if
> > +there
> > + * are elements reserved already. Application must call
> > + * 'rte_ring_do_dequeue_elem_commit' or
> > + * `rte_ring_do_dequeue_elem_revert_serial' to complete the dequeue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + *   This must be the same value used while creating the ring. Otherwise
> > + *   the results are undefined.
> > + * @param old_head
> > + *   Consumer's head index before reservation. The same should be passed to
> > + *   'rte_ring_dequeue_elem_commit' function.
> > + * @param new_head
> > + *   Consumer's head index after reservation. The same should be passed to
> > + *   'rte_ring_dequeue_elem_commit' function.
> > + * @param available
> > + *   returns the number of remaining ring elements after the reservation
> > + *   It is not updated if the number of reserved elements is zero.
> > + * @param src
> > + *   Pointer to location in the ring to copy the data from.
> > + * @return
> > + *   - 0: Success; elements reserved
> > + *   - -ENOBUFS: Not enough room in the ring; no element is reserved.
> > + */
> > +static __rte_always_inline int
> > +rte_ring_dequeue_elem_reserve_serial(struct rte_ring *r, unsigned int esize,
> > +		unsigned int *old_head, unsigned int *new_head,
> > +		unsigned int *available, void **src) {
> > +	unsigned int n;
> > +
> > +	return __rte_ring_do_dequeue_elem_reserve_serial(r, esize, 1,
> > +			RTE_RING_QUEUE_FIXED, r->cons.single, old_head,
> > +			new_head, available, src, &n, NULL) ? 0 : -ENOBUFS; }
> > +
> > +/**
> > + * Consume previously reserved elements (for dequeue) in a ring
> > + * (multi-consumer safe).
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param old_head
> > + *   Consumer's head index before reservation. This value was returned
> > + *   when the API 'rte_ring_dequeue_elem_reserve_xxx' was called.
> > + * @param new_head
> > + *   Consumer's head index after reservation. This value was returned
> > + *   when the API 'rte_ring_dequeue_elem_reserve_xxx' was called.
> > + */
> > +static __rte_always_inline void
> > +rte_ring_dequeue_elem_commit(struct rte_ring *r, unsigned int old_head,
> > +		unsigned int new_head)
> > +{
> > +	__rte_ring_do_dequeue_elem_commit(r, old_head, new_head,
> > +						r->cons.single);
> > +}
> > +
> > +/**
> > + * Discard previously reserved elements (for dequeue) in a ring.
> > + *
> > + * @warning
> > + * This API can be called only if the ring elements were reserved
> > + * using 'rte_ring_dequeue_xxx_elem_reserve_serial' APIs.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + */
> > +static __rte_always_inline void
> > +rte_ring_dequeue_elem_revert_serial(struct rte_ring *r) {
> > +	__rte_ring_revert_head(&r->cons);
> > +}
> > +
> > +#ifdef __cplusplus
> > +}
> > +#endif
> > +
> > +#endif /* _RTE_RING_ELEM_SG_H_ */
> > diff --git a/lib/librte_ring/rte_ring_generic.h
> > b/lib/librte_ring/rte_ring_generic.h
> > index 953cdbbd5..8d7a7ffcc 100644
> > --- a/lib/librte_ring/rte_ring_generic.h
> > +++ b/lib/librte_ring/rte_ring_generic.h
> > @@ -170,4 +170,97 @@ __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
> >  	return n;
> >  }
> >
> > +/**
> > + * @internal This function updates the consumer head if there are no
> > + * prior reserved elements on the ring.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure
> > + * @param is_sc
> > + *   Indicates whether multi-consumer path is needed or not
> > + * @param n
> > + *   The number of elements we will want to dequeue, i.e. how far should the
> > + *   head be moved
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
> > + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
> > + * @param old_head
> > + *   Returns head value as it was before the move, i.e. where dequeue starts
> > + * @param new_head
> > + *   Returns the current/new head value i.e. where dequeue finishes
> > + * @param entries
> > + *   Returns the number of entries in the ring BEFORE head was moved
> > + * @return
> > + *   - Actual number of objects dequeued.
> > + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_move_cons_head_serial(struct rte_ring *r, unsigned int is_sc,
> > +		unsigned int n, enum rte_ring_queue_behavior behavior,
> > +		uint32_t *old_head, uint32_t *new_head,
> > +		uint32_t *entries)
> > +{
> > +	unsigned int max = n;
> > +	int success;
> > +
> > +	/* move cons.head atomically */
> > +	do {
> > +		/* Restore n as it may change every loop */
> > +		n = max;
> > +
> > +		*old_head = r->cons.head;
> > +
> > +		/* add rmb barrier to avoid load/load reorder in weak
> > +		 * memory model. It is noop on x86
> > +		 */
> > +		rte_smp_rmb();
> > +
> > +		/* Ensure that cons.tail and cons.head are the same */
> > +		if (*old_head != r->cons.tail) {
> > +			rte_pause();
> > +
> > +			success = 0;
> > +			continue;
> > +		}
> > +
> > +		/* The subtraction is done between two unsigned 32bits value
> > +		 * (the result is always modulo 32 bits even if we have
> > +		 * cons_head > prod_tail). So 'entries' is always between 0
> > +		 * and size(ring)-1.
> > +		 */
> > +		*entries = (r->prod.tail - *old_head);
> > +
> > +		/* Set the actual entries for dequeue */
> > +		if (n > *entries)
> > +			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
> > +
> > +		if (unlikely(n == 0))
> > +			return 0;
> > +
> > +		*new_head = *old_head + n;
> > +		if (is_sc) {
> > +			r->cons.head = *new_head;
> > +			rte_smp_rmb();
> > +			success = 1;
> 
> I don't think we need to worry about SC case in this function.
> For sc(/sp)_serial (if we need such mode) - we probably can use normal
> move_(cons/prod)_head().
Agree

> 
> > +		} else {
> > +			success = rte_atomic32_cmpset(&r->cons.head, *old_head,
> > +					*new_head);
> > +		}
> > +	} while (unlikely(success == 0));
> > +	return n;
> > +}
> > +
> > +/**
> > + * @internal This function updates the head to match the tail
> > + *
> > + * @param ht
> > + *   A pointer to the ring's head-tail structure
> > + */
> > +static __rte_always_inline void
> > +__rte_ring_revert_head(struct rte_ring_headtail *ht) {
> > +	/* Discard the reserved ring elements. */
> > +	ht->head = ht->tail;
> > +}
> > +
> >  #endif /* _RTE_RING_GENERIC_H_ */
> > --
> > 2.17.1


