[dpdk-dev] [RFC v2 1/1] lib/ring: add scatter gather APIs

Honnappa Nagarahalli Honnappa.Nagarahalli at arm.com
Tue Oct 13 00:31:25 CEST 2020


Hi Konstantin,
	Appreciate your feedback.

<snip>

> 
> 
> > Add scatter gather APIs to avoid intermediate memcpy. Use cases that
> > involve copying large amount of data to/from the ring can benefit from
> > these APIs.
> >
> > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli at arm.com>
> > ---
> >  lib/librte_ring/meson.build        |   3 +-
> >  lib/librte_ring/rte_ring_elem.h    |   1 +
> >  lib/librte_ring/rte_ring_peek_sg.h | 552
> > +++++++++++++++++++++++++++++
> >  3 files changed, 555 insertions(+), 1 deletion(-)  create mode 100644
> > lib/librte_ring/rte_ring_peek_sg.h
> 
> As a generic one - need to update ring UT both func and perf to
> test/measure this new API.
Yes, will add.

> 
> >
> > diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build
> > index 31c0b4649..377694713 100644
> > --- a/lib/librte_ring/meson.build
> > +++ b/lib/librte_ring/meson.build
> > @@ -12,4 +12,5 @@ headers = files('rte_ring.h',
> >  		'rte_ring_peek.h',
> >  		'rte_ring_peek_c11_mem.h',
> >  		'rte_ring_rts.h',
> > -		'rte_ring_rts_c11_mem.h')
> > +		'rte_ring_rts_c11_mem.h',
> > +		'rte_ring_peek_sg.h')
> > diff --git a/lib/librte_ring/rte_ring_elem.h
> > b/lib/librte_ring/rte_ring_elem.h index 938b398fc..7d3933f15 100644
> > --- a/lib/librte_ring/rte_ring_elem.h
> > +++ b/lib/librte_ring/rte_ring_elem.h
> > @@ -1079,6 +1079,7 @@ rte_ring_dequeue_burst_elem(struct rte_ring *r,
> > void *obj_table,
> >
> >  #ifdef ALLOW_EXPERIMENTAL_API
> >  #include <rte_ring_peek.h>
> > +#include <rte_ring_peek_sg.h>
> >  #endif
> >
> >  #include <rte_ring.h>
> > diff --git a/lib/librte_ring/rte_ring_peek_sg.h
> > b/lib/librte_ring/rte_ring_peek_sg.h
> > new file mode 100644
> > index 000000000..97d5764a6
> > --- /dev/null
> > +++ b/lib/librte_ring/rte_ring_peek_sg.h
> > @@ -0,0 +1,552 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + *
> > + * Copyright (c) 2020 Arm
> > + * Copyright (c) 2007-2009 Kip Macy kmacy at freebsd.org
> > + * All rights reserved.
> > + * Derived from FreeBSD's bufring.h
> > + * Used as BSD-3 Licensed with permission from Kip Macy.
> > + */
> > +
> > +#ifndef _RTE_RING_PEEK_SG_H_
> > +#define _RTE_RING_PEEK_SG_H_
> > +
> > +/**
> > + * @file
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + * It is not recommended to include this file directly.
> > + * Please include <rte_ring_elem.h> instead.
> > + *
> > + * Ring Peek Scatter Gather APIs
> > + * Introduction of rte_ring with scatter gather serialized
> > +producer/consumer
> > + * (HTS sync mode) makes it possible to split public enqueue/dequeue
> > +API
> > + * into 3 phases:
> > + * - enqueue/dequeue start
> > + * - copy data to/from the ring
> > + * - enqueue/dequeue finish
> > + * Along with the advantages of the peek APIs, these APIs provide the
> > +ability
> > + * to avoid copying of the data to temporary area.
> > + *
> > + * Note that right now this new API is available only for two sync modes:
> > + * 1) Single Producer/Single Consumer (RTE_RING_SYNC_ST)
> > + * 2) Serialized Producer/Serialized Consumer (RTE_RING_SYNC_MT_HTS).
> > + * It is a user responsibility to create/init ring with appropriate
> > +sync
> > + * modes selected.
> > + *
> > + * Example usage:
> > + * // read 1 elem from the ring:
> > + * n = rte_ring_enqueue_sg_bulk_start(ring, 32, &sgd, NULL);
> > + * if (n != 0) {
> > + *	//Copy objects in the ring
> > + *	memcpy (sgd->ptr1, obj, sgd->n1 * sizeof(uintptr_t));
> > + *	if (n != sgd->n1)
> > + *		//Second memcpy because of wrapround
> > + *		n2 = n - sgd->n1;
> > + *		memcpy (sgd->ptr2, obj[n2], n2 * sizeof(uintptr_t));
> > + *	rte_ring_dequeue_sg_finish(ring, n);
> 
> It is not clear from the example above why do you need SG(ZC) API.
> Existing peek API would be able to handle such situation (just copy will be
> done internally). Probably better to use examples you provided in your last
> reply to Olivier.
Agree, not a good example, will change it.

> 
> > + * }
> > + *
> > + * Note that between _start_ and _finish_ none other thread can
> > + proceed
> > + * with enqueue(/dequeue) operation till _finish_ completes.
> > + */
> > +
> > +#ifdef __cplusplus
> > +extern "C" {
> > +#endif
> > +
> > +#include <rte_ring_peek_c11_mem.h>
> > +
> > +/* Rock that needs to be passed between reserve and commit APIs */
> > +struct rte_ring_sg_data {
> > +	/* Pointer to the first space in the ring */
> > +	void **ptr1;
> > +	/* Pointer to the second space in the ring if there is wrap-around */
> > +	void **ptr2;
> > +	/* Number of elements in the first pointer. If this is equal to
> > +	 * the number of elements requested, then ptr2 is NULL.
> > +	 * Otherwise, subtracting n1 from number of elements requested
> > +	 * will give the number of elements available at ptr2.
> > +	 */
> > +	unsigned int n1;
> > +};
> 
> I wonder what is the primary goal of that API?
> The reason I am asking: from what I understand with this patch ZC API will
> work only for ST and HTS modes (same as peek API).
> Though, I think it is possible to make it work for any sync model, by changing
Agree, the functionality can be extended to other modes as well. I added these 2 modes as I found the use cases for these.

> API a bit: instead of returning sg_data to the user, force him to provide
> function to read/write elems from/to the ring.
> Just a schematic one, to illustrate the idea:
> 
> typedef void (*write_ring_func_t)(void *elem, /*pointer to first elem to
> update inside the ring*/
> 				uint32_t num, /* number of elems to update
> */
> 				uint32_t esize,
> 				void *udata  /* caller provide data */);
> 
> rte_ring_enqueue_zc_bulk_elem(struct rte_ring *r, unsigned int esize,
> 	unsigned int n, unsigned int *free_space, write_ring_func_t wf, void
> *udata) {
> 	struct rte_ring_sg_data sgd;
> 	.....
> 	n = move_head_tail(r, ...);
> 
> 	/* get sgd data based on n */
> 	get_elem_addr(r, ..., &sgd);
> 
> 	/* call user defined function to fill reserved elems */
> 	wf(sgd.p1, sgd.n1, esize, udata);
> 	if (n != n1)
> 		wf(sgd.p2, sgd.n2, esize, udata);
> 
> 	....
> 	return n;
> }
> 
I think the call back function makes it difficult to use the API. The call back function would be a wrapper around another function or API which will have its own arguments. Now all those parameters have to passed using the 'udata'. For ex: in the 2nd example that I provided earlier, the user has to create a wrapper around 'rte_eth_rx_burst' API and then provide the parameters to 'rte_eth_rx_burst' through 'udata'. 'udata' would need a structure definition as well.

> If we want ZC peek API also - some extra work need to be done with
> introducing return value for write_ring_func() and checking it properly, but I
> don't see any big problems here too.
> That way ZC API can support all sync models, plus we don't need to expose
> sg_data to the user directly.
Other modes can be supported with the method used in this patch as well. If you see a need, I can add them.
IMO, only issue with exposing sg_data is ABI compatibility in the future. I think, we can align the 'struct rte_ring_sg_data' to cache line boundary and it should provide ability to extend it in the future without affecting the ABI compatibility.

> Also, in future, we probably can de-duplicate the code by making our non-ZC
> API to use that one internally (pass ring_enqueue_elems()/ob_table as a
> parameters).
> 
> > +
> > +static __rte_always_inline void
> > +__rte_ring_get_elem_addr_64(struct rte_ring *r, uint32_t head,
> > +	uint32_t num, void **dst1, uint32_t *n1, void **dst2) {
> > +	uint32_t idx = head & r->mask;
> > +	uint64_t *ring = (uint64_t *)&r[1];
> > +
> > +	*dst1 = ring + idx;
> > +	*n1 = num;
> > +
> > +	if (idx + num > r->size) {
> > +		*n1 = num - (r->size - idx - 1);
> > +		*dst2 = ring;
> > +	}
> > +}
> > +
> > +static __rte_always_inline void
> > +__rte_ring_get_elem_addr_128(struct rte_ring *r, uint32_t head,
> > +	uint32_t num, void **dst1, uint32_t *n1, void **dst2) {
> > +	uint32_t idx = head & r->mask;
> > +	rte_int128_t *ring = (rte_int128_t *)&r[1];
> > +
> > +	*dst1 = ring + idx;
> > +	*n1 = num;
> > +
> > +	if (idx + num > r->size) {
> > +		*n1 = num - (r->size - idx - 1);
> > +		*dst2 = ring;
> > +	}
> > +}
> > +
> > +static __rte_always_inline void
> > +__rte_ring_get_elem_addr(struct rte_ring *r, uint32_t head,
> > +	uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void
> > +**dst2) {
> > +	if (esize == 8)
> > +		__rte_ring_get_elem_addr_64(r, head,
> > +						num, dst1, n1, dst2);
> > +	else if (esize == 16)
> > +		__rte_ring_get_elem_addr_128(r, head,
> > +						num, dst1, n1, dst2);
> 
> 
> I don't think we need that special handling for 8/16B sizes.
> In all functions esize is an input parameter.
> If user will specify is as a constant - compiler will be able to convert multiply
> to shift and add ops.
Ok, I will check this out.

> 
> > +	else {
> > +		uint32_t idx, scale, nr_idx;
> > +		uint32_t *ring = (uint32_t *)&r[1];
> > +
> > +		/* Normalize to uint32_t */
> > +		scale = esize / sizeof(uint32_t);
> > +		idx = head & r->mask;
> > +		nr_idx = idx * scale;
> > +
> > +		*dst1 = ring + nr_idx;
> > +		*n1 = num;
> > +
> > +		if (idx + num > r->size) {
> > +			*n1 = num - (r->size - idx - 1);
> > +			*dst2 = ring;
> > +		}
> > +	}
> > +}
> > +
> > +/**
> > + * @internal This function moves prod head value.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_enqueue_sg_elem_start(struct rte_ring *r, unsigned int
> esize,
> > +		uint32_t n, enum rte_ring_queue_behavior behavior,
> > +		struct rte_ring_sg_data *sgd, unsigned int *free_space) {
> > +	uint32_t free, head, next;
> > +
> > +	switch (r->prod.sync_type) {
> > +	case RTE_RING_SYNC_ST:
> > +		n = __rte_ring_move_prod_head(r, RTE_RING_SYNC_ST, n,
> > +			behavior, &head, &next, &free);
> > +		__rte_ring_get_elem_addr(r, head, esize, n, (void **)&sgd-
> >ptr1,
> > +			&sgd->n1, (void **)&sgd->ptr2);
> > +		break;
> > +	case RTE_RING_SYNC_MT_HTS:
> > +		n = __rte_ring_hts_move_prod_head(r, n, behavior, &head,
> &free);
> > +		__rte_ring_get_elem_addr(r, head, esize, n, (void **)&sgd-
> >ptr1,
> > +			&sgd->n1, (void **)&sgd->ptr2);
> > +		break;
> > +	case RTE_RING_SYNC_MT:
> > +	case RTE_RING_SYNC_MT_RTS:
> > +	default:
> > +		/* unsupported mode, shouldn't be here */
> > +		RTE_ASSERT(0);
> > +		n = 0;
> > +		free = 0;
> > +	}
> > +
> > +	if (free_space != NULL)
> > +		*free_space = free - n;
> > +	return n;
> > +}
> > +
> > +/**
> > + * Start to enqueue several objects on the ring.
> > + * Note that no actual objects are put in the queue by this function,
> > + * it just reserves space for the user on the ring.
> > + * User has to copy objects into the queue using the returned pointers.
> > + * User should call rte_ring_enqueue_sg_bulk_elem_finish to complete
> > +the
> > + * enqueue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + * @param n
> > + *   The number of objects to add in the ring.
> > + * @param sgd
> > + *   The scatter-gather data containing pointers for copying data.
> > + * @param free_space
> > + *   if non-NULL, returns the amount of space in the ring after the
> > + *   reservation operation has finished.
> > + * @return
> > + *   The number of objects that can be enqueued, either 0 or n
> > + */
> > +__rte_experimental
> > +static __rte_always_inline unsigned int
> > +rte_ring_enqueue_sg_bulk_elem_start(struct rte_ring *r, unsigned int
> esize,
> > +	unsigned int n, struct rte_ring_sg_data *sgd, unsigned int
> > +*free_space) {
> > +	return __rte_ring_do_enqueue_sg_elem_start(r, esize, n,
> > +			RTE_RING_QUEUE_FIXED, sgd, free_space); }
> > +
> > +/**
> > + * Start to enqueue several pointers to objects on the ring.
> > + * Note that no actual pointers are put in the queue by this
> > +function,
> > + * it just reserves space for the user on the ring.
> > + * User has to copy pointers to objects into the queue using the
> > + * returned pointers.
> > + * User should call rte_ring_enqueue_sg_bulk_finish to complete the
> > + * enqueue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param n
> > + *   The number of objects to add in the ring.
> > + * @param sgd
> > + *   The scatter-gather data containing pointers for copying data.
> > + * @param free_space
> > + *   if non-NULL, returns the amount of space in the ring after the
> > + *   reservation operation has finished.
> > + * @return
> > + *   The number of objects that can be enqueued, either 0 or n
> > + */
> > +__rte_experimental
> > +static __rte_always_inline unsigned int
> > +rte_ring_enqueue_sg_bulk_start(struct rte_ring *r, unsigned int n,
> > +	struct rte_ring_sg_data *sgd, unsigned int *free_space) {
> > +	return rte_ring_enqueue_sg_bulk_elem_start(r, sizeof(uintptr_t), n,
> > +							sgd, free_space);
> > +}
> > +/**
> > + * Start to enqueue several objects on the ring.
> > + * Note that no actual objects are put in the queue by this function,
> > + * it just reserves space for the user on the ring.
> > + * User has to copy objects into the queue using the returned pointers.
> > + * User should call rte_ring_enqueue_sg_bulk_elem_finish to complete
> > +the
> > + * enqueue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + * @param n
> > + *   The number of objects to add in the ring.
> > + * @param sgd
> > + *   The scatter-gather data containing pointers for copying data.
> > + * @param free_space
> > + *   if non-NULL, returns the amount of space in the ring after the
> > + *   reservation operation has finished.
> > + * @return
> > + *   The number of objects that can be enqueued, either 0 or n
> > + */
> > +__rte_experimental
> > +static __rte_always_inline unsigned int
> > +rte_ring_enqueue_sg_burst_elem_start(struct rte_ring *r, unsigned int
> esize,
> > +	unsigned int n, struct rte_ring_sg_data *sgd, unsigned int
> > +*free_space) {
> > +	return __rte_ring_do_enqueue_sg_elem_start(r, esize, n,
> > +			RTE_RING_QUEUE_VARIABLE, sgd, free_space); }
> > +
> > +/**
> > + * Start to enqueue several pointers to objects on the ring.
> > + * Note that no actual pointers are put in the queue by this
> > +function,
> > + * it just reserves space for the user on the ring.
> > + * User has to copy pointers to objects into the queue using the
> > + * returned pointers.
> > + * User should call rte_ring_enqueue_sg_bulk_finish to complete the
> > + * enqueue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param n
> > + *   The number of objects to add in the ring.
> > + * @param sgd
> > + *   The scatter-gather data containing pointers for copying data.
> > + * @param free_space
> > + *   if non-NULL, returns the amount of space in the ring after the
> > + *   reservation operation has finished.
> > + * @return
> > + *   The number of objects that can be enqueued, either 0 or n
> > + */
> > +__rte_experimental
> > +static __rte_always_inline unsigned int
> > +rte_ring_enqueue_sg_burst_start(struct rte_ring *r, unsigned int n,
> > +	struct rte_ring_sg_data *sgd, unsigned int *free_space) {
> > +	return rte_ring_enqueue_sg_burst_elem_start(r, sizeof(uintptr_t),
> n,
> > +							sgd, free_space);
> > +}
> > +
> > +/**
> > + * Complete enqueuing several objects on the ring.
> > + * Note that number of objects to enqueue should not exceed previous
> > + * enqueue_start return value.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param n
> > + *   The number of objects to add to the ring.
> > + */
> > +__rte_experimental
> > +static __rte_always_inline void
> > +rte_ring_enqueue_sg_elem_finish(struct rte_ring *r, unsigned int n) {
> > +	uint32_t tail;
> > +
> > +	switch (r->prod.sync_type) {
> > +	case RTE_RING_SYNC_ST:
> > +		n = __rte_ring_st_get_tail(&r->prod, &tail, n);
> > +		__rte_ring_st_set_head_tail(&r->prod, tail, n, 1);
> > +		break;
> > +	case RTE_RING_SYNC_MT_HTS:
> > +		n = __rte_ring_hts_get_tail(&r->hts_prod, &tail, n);
> > +		__rte_ring_hts_set_head_tail(&r->hts_prod, tail, n, 1);
> > +		break;
> > +	case RTE_RING_SYNC_MT:
> > +	case RTE_RING_SYNC_MT_RTS:
> > +	default:
> > +		/* unsupported mode, shouldn't be here */
> > +		RTE_ASSERT(0);
> > +	}
> > +}
> > +
> > +/**
> > + * Complete enqueuing several pointers to objects on the ring.
> > + * Note that number of objects to enqueue should not exceed previous
> > + * enqueue_start return value.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param n
> > + *   The number of pointers to objects to add to the ring.
> > + */
> > +__rte_experimental
> > +static __rte_always_inline void
> > +rte_ring_enqueue_sg_finish(struct rte_ring *r, unsigned int n) {
> > +	rte_ring_enqueue_sg_elem_finish(r, n); }
> > +
> > +/**
> > + * @internal This function moves cons head value and copies up to *n*
> > + * objects from the ring to the user provided obj_table.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_dequeue_sg_elem_start(struct rte_ring *r,
> > +	uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
> > +	struct rte_ring_sg_data *sgd, unsigned int *available) {
> > +	uint32_t avail, head, next;
> > +
> > +	switch (r->cons.sync_type) {
> > +	case RTE_RING_SYNC_ST:
> > +		n = __rte_ring_move_cons_head(r, RTE_RING_SYNC_ST, n,
> > +			behavior, &head, &next, &avail);
> > +		__rte_ring_get_elem_addr(r, head, esize, n,
> > +					sgd->ptr1, &sgd->n1, sgd->ptr2);
> > +		break;
> > +	case RTE_RING_SYNC_MT_HTS:
> > +		n = __rte_ring_hts_move_cons_head(r, n, behavior,
> > +			&head, &avail);
> > +		__rte_ring_get_elem_addr(r, head, esize, n,
> > +					sgd->ptr1, &sgd->n1, sgd->ptr2);
> > +		break;
> > +	case RTE_RING_SYNC_MT:
> > +	case RTE_RING_SYNC_MT_RTS:
> > +	default:
> > +		/* unsupported mode, shouldn't be here */
> > +		RTE_ASSERT(0);
> > +		n = 0;
> > +		avail = 0;
> > +	}
> > +
> > +	if (available != NULL)
> > +		*available = avail - n;
> > +	return n;
> > +}
> > +
> > +/**
> > + * Start to dequeue several objects from the ring.
> > + * Note that no actual objects are copied from the queue by this function.
> > + * User has to copy objects from the queue using the returned pointers.
> > + * User should call rte_ring_dequeue_sg_bulk_elem_finish to complete
> > +the
> > + * dequeue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + * @param n
> > + *   The number of objects to remove from the ring.
> > + * @param sgd
> > + *   The scatter-gather data containing pointers for copying data.
> > + * @param available
> > + *   If non-NULL, returns the number of remaining ring entries after the
> > + *   dequeue has finished.
> > + * @return
> > + *   The number of objects that can be dequeued, either 0 or n
> > + */
> > +__rte_experimental
> > +static __rte_always_inline unsigned int
> > +rte_ring_dequeue_sg_bulk_elem_start(struct rte_ring *r, unsigned int
> esize,
> > +	unsigned int n, struct rte_ring_sg_data *sgd, unsigned int
> > +*available) {
> > +	return __rte_ring_do_dequeue_sg_elem_start(r, esize, n,
> > +			RTE_RING_QUEUE_FIXED, sgd, available); }
> > +
> > +/**
> > + * Start to dequeue several pointers to objects from the ring.
> > + * Note that no actual pointers are removed from the queue by this
> function.
> > + * User has to copy pointers to objects from the queue using the
> > + * returned pointers.
> > + * User should call rte_ring_dequeue_sg_bulk_finish to complete the
> > + * dequeue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param n
> > + *   The number of objects to remove from the ring.
> > + * @param sgd
> > + *   The scatter-gather data containing pointers for copying data.
> > + * @param available
> > + *   If non-NULL, returns the number of remaining ring entries after the
> > + *   dequeue has finished.
> > + * @return
> > + *   The number of objects that can be dequeued, either 0 or n
> > + */
> > +__rte_experimental
> > +static __rte_always_inline unsigned int
> > +rte_ring_dequeue_sg_bulk_start(struct rte_ring *r, unsigned int n,
> > +	struct rte_ring_sg_data *sgd, unsigned int *available) {
> > +	return rte_ring_dequeue_sg_bulk_elem_start(r, sizeof(uintptr_t),
> > +		n, sgd, available);
> > +}
> > +
> > +/**
> > + * Start to dequeue several objects from the ring.
> > + * Note that no actual objects are copied from the queue by this function.
> > + * User has to copy objects from the queue using the returned pointers.
> > + * User should call rte_ring_dequeue_sg_burst_elem_finish to complete
> > +the
> > + * dequeue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + *   This must be the same value used while creating the ring. Otherwise
> > + *   the results are undefined.
> > + * @param n
> > + *   The number of objects to dequeue from the ring.
> > + * @param sgd
> > + *   The scatter-gather data containing pointers for copying data.
> > + * @param available
> > + *   If non-NULL, returns the number of remaining ring entries after the
> > + *   dequeue has finished.
> > + * @return
> > + *   The number of objects that can be dequeued, either 0 or n
> > + */
> > +__rte_experimental
> > +static __rte_always_inline unsigned int
> > +rte_ring_dequeue_sg_burst_elem_start(struct rte_ring *r, unsigned int
> esize,
> > +	unsigned int n, struct rte_ring_sg_data *sgd, unsigned int
> > +*available) {
> > +	return __rte_ring_do_dequeue_sg_elem_start(r, esize, n,
> > +			RTE_RING_QUEUE_VARIABLE, sgd, available); }
> > +
> > +/**
> > + * Start to dequeue several pointers to objects from the ring.
> > + * Note that no actual pointers are removed from the queue by this
> function.
> > + * User has to copy pointers to objects from the queue using the
> > + * returned pointers.
> > + * User should call rte_ring_dequeue_sg_burst_finish to complete the
> > + * dequeue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param n
> > + *   The number of objects to remove from the ring.
> > + * @param sgd
> > + *   The scatter-gather data containing pointers for copying data.
> > + * @param available
> > + *   If non-NULL, returns the number of remaining ring entries after the
> > + *   dequeue has finished.
> > + * @return
> > + *   The number of objects that can be dequeued, either 0 or n
> > + */
> > +__rte_experimental
> > +static __rte_always_inline unsigned int
> > +rte_ring_dequeue_sg_burst_start(struct rte_ring *r, unsigned int n,
> > +		struct rte_ring_sg_data *sgd, unsigned int *available) {
> > +	return rte_ring_dequeue_sg_burst_elem_start(r, sizeof(uintptr_t),
> n,
> > +			sgd, available);
> > +}
> > +
> > +/**
> > + * Complete dequeuing several objects from the ring.
> > + * Note that number of objects to dequeued should not exceed previous
> > + * dequeue_start return value.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param n
> > + *   The number of objects to remove from the ring.
> > + */
> > +__rte_experimental
> > +static __rte_always_inline void
> > +rte_ring_dequeue_sg_elem_finish(struct rte_ring *r, unsigned int n) {
> > +	uint32_t tail;
> > +
> > +	switch (r->cons.sync_type) {
> > +	case RTE_RING_SYNC_ST:
> > +		n = __rte_ring_st_get_tail(&r->cons, &tail, n);
> > +		__rte_ring_st_set_head_tail(&r->cons, tail, n, 0);
> > +		break;
> > +	case RTE_RING_SYNC_MT_HTS:
> > +		n = __rte_ring_hts_get_tail(&r->hts_cons, &tail, n);
> > +		__rte_ring_hts_set_head_tail(&r->hts_cons, tail, n, 0);
> > +		break;
> > +	case RTE_RING_SYNC_MT:
> > +	case RTE_RING_SYNC_MT_RTS:
> > +	default:
> > +		/* unsupported mode, shouldn't be here */
> > +		RTE_ASSERT(0);
> > +	}
> > +}
> > +
> > +/**
> > + * Complete dequeuing several objects from the ring.
> > + * Note that number of objects to dequeued should not exceed previous
> > + * dequeue_start return value.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param n
> > + *   The number of objects to remove from the ring.
> > + */
> > +__rte_experimental
> > +static __rte_always_inline void
> > +rte_ring_dequeue_sg_finish(struct rte_ring *r, unsigned int n) {
> > +	rte_ring_dequeue_elem_finish(r, n);
> > +}
> > +
> > +#ifdef __cplusplus
> > +}
> > +#endif
> > +
> > +#endif /* _RTE_RING_PEEK_SG_H_ */
> > --
> > 2.17.1



More information about the dev mailing list