[dpdk-dev] [PATCH v4 1/2] lib/ring: apis to support configurable element size

Honnappa Nagarahalli Honnappa.Nagarahalli at arm.com
Tue Oct 15 01:56:04 CEST 2019


Hi Konstantin,
	Thank you for the feedback.

<snip>

> 
> > >
> > > Current APIs assume ring elements to be pointers. However, in many
> > > use cases, the size can be different. Add new APIs to support
> > > configurable ring element sizes.
> > >
> > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli at arm.com>
> > > Reviewed-by: Dharmik Thakkar <dharmik.thakkar at arm.com>
> > > Reviewed-by: Gavin Hu <gavin.hu at arm.com>
> > > Reviewed-by: Ruifeng Wang <ruifeng.wang at arm.com>
> > > ---
> > >  lib/librte_ring/Makefile             |   3 +-
> > >  lib/librte_ring/meson.build          |   3 +
> > >  lib/librte_ring/rte_ring.c           |  45 +-
> > >  lib/librte_ring/rte_ring.h           |   1 +
> > >  lib/librte_ring/rte_ring_elem.h      | 946 +++++++++++++++++++++++++++
> > >  lib/librte_ring/rte_ring_version.map |   2 +
> > >  6 files changed, 991 insertions(+), 9 deletions(-)  create mode
> > > 100644 lib/librte_ring/rte_ring_elem.h
> > >
> > > diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
> > > index 21a36770d..515a967bb 100644
> > > --- a/lib/librte_ring/Makefile
> > > +++ b/lib/librte_ring/Makefile
> > > @@ -6,7 +6,7 @@ include $(RTE_SDK)/mk/rte.vars.mk  # library name
> > > LIB = librte_ring.a
> > >
> > > -CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
> > > +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 -
> > > DALLOW_EXPERIMENTAL_API
> > >  LDLIBS += -lrte_eal
> > >
> > >  EXPORT_MAP := rte_ring_version.map
> > > @@ -18,6 +18,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
> > >
> > >  # install includes
> > >  SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
> > > +					rte_ring_elem.h \
> > >  					rte_ring_generic.h \
> > >  					rte_ring_c11_mem.h
> > >
> > > diff --git a/lib/librte_ring/meson.build
> > > b/lib/librte_ring/meson.build index ab8b0b469..74219840a 100644
> > > --- a/lib/librte_ring/meson.build
> > > +++ b/lib/librte_ring/meson.build
> > > @@ -6,3 +6,6 @@ sources = files('rte_ring.c')  headers = files('rte_ring.h',
> > >  		'rte_ring_c11_mem.h',
> > >  		'rte_ring_generic.h')
> > > +
> > > +# rte_ring_create_elem and rte_ring_get_memsize_elem are
> > > +experimental allow_experimental_apis = true
> > > diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
> > > index d9b308036..6fed3648b 100644
> > > --- a/lib/librte_ring/rte_ring.c
> > > +++ b/lib/librte_ring/rte_ring.c
> > > @@ -33,6 +33,7 @@
> > >  #include <rte_tailq.h>
> > >
> > >  #include "rte_ring.h"
> > > +#include "rte_ring_elem.h"
> > >
> > >  TAILQ_HEAD(rte_ring_list, rte_tailq_entry);
> > >
> > > @@ -46,23 +47,42 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
> > >
> > >  /* return the size of memory occupied by a ring */  ssize_t -
> > > rte_ring_get_memsize(unsigned count)
> > > +rte_ring_get_memsize_elem(unsigned count, unsigned esize)
> > >  {
> > >  	ssize_t sz;
> > >
> > > +	/* Supported esize values are 4/8/16.
> > > +	 * Others can be added on need basis.
> > > +	 */
> > > +	if ((esize != 4) && (esize != 8) && (esize != 16)) {
> > > +		RTE_LOG(ERR, RING,
> > > +			"Unsupported esize value. Supported values are 4, 8
> > > and 16\n");
> > > +
> > > +		return -EINVAL;
> > > +	}
> > > +
> > >  	/* count must be a power of 2 */
> > >  	if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
> > >  		RTE_LOG(ERR, RING,
> > > -			"Requested size is invalid, must be power of 2, and "
> > > -			"do not exceed the size limit %u\n",
> > > RTE_RING_SZ_MASK);
> > > +			"Requested number of elements is invalid, must be "
> > > +			"power of 2, and do not exceed the limit %u\n",
> > > +			RTE_RING_SZ_MASK);
> > > +
> > >  		return -EINVAL;
> > >  	}
> > >
> > > -	sz = sizeof(struct rte_ring) + count * sizeof(void *);
> > > +	sz = sizeof(struct rte_ring) + count * esize;
> > >  	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> > >  	return sz;
> > >  }
> > >
> > > +/* return the size of memory occupied by a ring */ ssize_t
> > > +rte_ring_get_memsize(unsigned count) {
> > > +	return rte_ring_get_memsize_elem(count, sizeof(void *)); }
> > > +
> > >  void
> > >  rte_ring_reset(struct rte_ring *r)
> > >  {
> > > @@ -114,10 +134,10 @@ rte_ring_init(struct rte_ring *r, const char
> > > *name, unsigned count,
> > >  	return 0;
> > >  }
> > >
> > > -/* create the ring */
> > > +/* create the ring for a given element size */
> > >  struct rte_ring *
> > > -rte_ring_create(const char *name, unsigned count, int socket_id,
> > > -		unsigned flags)
> > > +rte_ring_create_elem(const char *name, unsigned count, unsigned esize,
> > > +		int socket_id, unsigned flags)
> > >  {
> > >  	char mz_name[RTE_MEMZONE_NAMESIZE];
> > >  	struct rte_ring *r;
> > > @@ -135,7 +155,7 @@ rte_ring_create(const char *name, unsigned
> > > count, int socket_id,
> > >  	if (flags & RING_F_EXACT_SZ)
> > >  		count = rte_align32pow2(count + 1);
> > >
> > > -	ring_size = rte_ring_get_memsize(count);
> > > +	ring_size = rte_ring_get_memsize_elem(count, esize);
> > >  	if (ring_size < 0) {
> > >  		rte_errno = ring_size;
> > >  		return NULL;
> > > @@ -182,6 +202,15 @@ rte_ring_create(const char *name, unsigned
> > > count, int socket_id,
> > >  	return r;
> > >  }
> > >
> > > +/* create the ring */
> > > +struct rte_ring *
> > > +rte_ring_create(const char *name, unsigned count, int socket_id,
> > > +		unsigned flags)
> > > +{
> > > +	return rte_ring_create_elem(name, count, sizeof(void *), socket_id,
> > > +		flags);
> > > +}
> > > +
> > >  /* free the ring */
> > >  void
> > >  rte_ring_free(struct rte_ring *r)
> > > diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> > > index
> > > 2a9f768a1..18fc5d845 100644
> > > --- a/lib/librte_ring/rte_ring.h
> > > +++ b/lib/librte_ring/rte_ring.h
> > > @@ -216,6 +216,7 @@ int rte_ring_init(struct rte_ring *r, const char
> > > *name, unsigned count,
> > >   */
> > >  struct rte_ring *rte_ring_create(const char *name, unsigned count,
> > >  				 int socket_id, unsigned flags);
> > > +
> > >  /**
> > >   * De-allocate all memory used by the ring.
> > >   *
> > > diff --git a/lib/librte_ring/rte_ring_elem.h
> > > b/lib/librte_ring/rte_ring_elem.h new file mode 100644 index
> > > 000000000..860f059ad
> > > --- /dev/null
> > > +++ b/lib/librte_ring/rte_ring_elem.h
> > > @@ -0,0 +1,946 @@
> > > +/* SPDX-License-Identifier: BSD-3-Clause
> > > + *
> > > + * Copyright (c) 2019 Arm Limited
> > > + * Copyright (c) 2010-2017 Intel Corporation
> > > + * Copyright (c) 2007-2009 Kip Macy kmacy at freebsd.org
> > > + * All rights reserved.
> > > + * Derived from FreeBSD's bufring.h
> > > + * Used as BSD-3 Licensed with permission from Kip Macy.
> > > + */
> > > +
> > > +#ifndef _RTE_RING_ELEM_H_
> > > +#define _RTE_RING_ELEM_H_
> > > +
> > > +/**
> > > + * @file
> > > + * RTE Ring with flexible element size  */
> > > +
> > > +#ifdef __cplusplus
> > > +extern "C" {
> > > +#endif
> > > +
> > > +#include <stdio.h>
> > > +#include <stdint.h>
> > > +#include <sys/queue.h>
> > > +#include <errno.h>
> > > +#include <rte_common.h>
> > > +#include <rte_config.h>
> > > +#include <rte_memory.h>
> > > +#include <rte_lcore.h>
> > > +#include <rte_atomic.h>
> > > +#include <rte_branch_prediction.h>
> > > +#include <rte_memzone.h>
> > > +#include <rte_pause.h>
> > > +
> > > +#include "rte_ring.h"
> > > +
> > > +/**
> > > + * @warning
> > > + * @b EXPERIMENTAL: this API may change without prior notice
> > > + *
> > > + * Calculate the memory size needed for a ring with given element
> > > +size
> > > + *
> > > + * This function returns the number of bytes needed for a ring,
> > > +given
> > > + * the number of elements in it and the size of the element. This
> > > +value
> > > + * is the sum of the size of the structure rte_ring and the size of
> > > +the
> > > + * memory needed for storing the elements. The value is aligned to
> > > +a cache
> > > + * line size.
> > > + *
> > > + * @param count
> > > + *   The number of elements in the ring (must be a power of 2).
> > > + * @param esize
> > > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > > + *   Currently, sizes 4, 8 and 16 are supported.
> > > + * @return
> > > + *   - The memory size needed for the ring on success.
> > > + *   - -EINVAL if count is not a power of 2.
> > > + */
> > > +__rte_experimental
> > > +ssize_t rte_ring_get_memsize_elem(unsigned count, unsigned esize);
> > > +
> > > +/**
> > > + * @warning
> > > + * @b EXPERIMENTAL: this API may change without prior notice
> > > + *
> > > + * Create a new ring named *name* that stores elements with given size.
> > > + *
> > > + * This function uses ``memzone_reserve()`` to allocate memory.
> > > +Then it
> > > + * calls rte_ring_init() to initialize an empty ring.
> > > + *
> > > + * The new ring size is set to *count*, which must be a power of
> > > + * two. Water marking is disabled by default. The real usable ring
> > > +size
> > > + * is *count-1* instead of *count* to differentiate a free ring
> > > +from an
> > > + * empty ring.
> > > + *
> > > + * The ring is added in RTE_TAILQ_RING list.
> > > + *
> > > + * @param name
> > > + *   The name of the ring.
> > > + * @param count
> > > + *   The number of elements in the ring (must be a power of 2).
> > > + * @param esize
> > > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > > + *   Currently, sizes 4, 8 and 16 are supported.
> > > + * @param socket_id
> > > + *   The *socket_id* argument is the socket identifier in case of
> > > + *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
> > > + *   constraint for the reserved zone.
> > > + * @param flags
> > > + *   An OR of the following:
> > > + *    - RING_F_SP_ENQ: If this flag is set, the default behavior when
> > > + *      using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
> > > + *      is "single-producer". Otherwise, it is "multi-producers".
> > > + *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
> > > + *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
> > > + *      is "single-consumer". Otherwise, it is "multi-consumers".
> > > + * @return
> > > + *   On success, the pointer to the new allocated ring. NULL on error with
> > > + *    rte_errno set appropriately. Possible errno values include:
> > > + *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config
> > > structure
> > > + *    - E_RTE_SECONDARY - function was called from a secondary process
> > > instance
> > > + *    - EINVAL - count provided is not a power of 2
> > > + *    - ENOSPC - the maximum number of memzones has already been
> > > allocated
> > > + *    - EEXIST - a memzone with the same name already exists
> > > + *    - ENOMEM - no appropriate memory area found in which to create
> > > memzone
> > > + */
> > > +__rte_experimental
> > > +struct rte_ring *rte_ring_create_elem(const char *name, unsigned count,
> > > +				unsigned esize, int socket_id, unsigned flags);
> > > +
> > > +/* the actual enqueue of pointers on the ring.
> > > + * Placed here since identical code needed in both
> > > + * single and multi producer enqueue functions.
> > > + */
> > > +#define ENQUEUE_PTRS_ELEM(r, ring_start, prod_head, obj_table,
> > > +esize, n)
> > > do { \
> > > +	if (esize == 4) \
> > > +		ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n); \
> > > +	else if (esize == 8) \
> > > +		ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n); \
> > > +	else if (esize == 16) \
> > > +		ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table, n);
> \ }
> > > while
> > > +(0)
> > > +
> > > +#define ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n) do { \
> > > +	unsigned int i; \
> > > +	const uint32_t size = (r)->size; \
> > > +	uint32_t idx = prod_head & (r)->mask; \
> > > +	uint32_t *ring = (uint32_t *)ring_start; \
> > > +	uint32_t *obj = (uint32_t *)obj_table; \
> > > +	if (likely(idx + n < size)) { \
> > > +		for (i = 0; i < (n & ((~(unsigned)0x7))); i += 8, idx += 8) { \
> > > +			ring[idx] = obj[i]; \
> > > +			ring[idx + 1] = obj[i + 1]; \
> > > +			ring[idx + 2] = obj[i + 2]; \
> > > +			ring[idx + 3] = obj[i + 3]; \
> > > +			ring[idx + 4] = obj[i + 4]; \
> > > +			ring[idx + 5] = obj[i + 5]; \
> > > +			ring[idx + 6] = obj[i + 6]; \
> > > +			ring[idx + 7] = obj[i + 7]; \
> > > +		} \
> > > +		switch (n & 0x7) { \
> > > +		case 7: \
> > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > +		case 6: \
> > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > +		case 5: \
> > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > +		case 4: \
> > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > +		case 3: \
> > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > +		case 2: \
> > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > +		case 1: \
> > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > +		} \
> > > +	} else { \
> > > +		for (i = 0; idx < size; i++, idx++)\
> > > +			ring[idx] = obj[i]; \
> > > +		for (idx = 0; i < n; i++, idx++) \
> > > +			ring[idx] = obj[i]; \
> > > +	} \
> > > +} while (0)
> > > +
> > > +#define ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n) do { \
> > > +	unsigned int i; \
> > > +	const uint32_t size = (r)->size; \
> > > +	uint32_t idx = prod_head & (r)->mask; \
> > > +	uint64_t *ring = (uint64_t *)ring_start; \
> > > +	uint64_t *obj = (uint64_t *)obj_table; \
> > > +	if (likely(idx + n < size)) { \
> > > +		for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx += 4) { \
> > > +			ring[idx] = obj[i]; \
> > > +			ring[idx + 1] = obj[i + 1]; \
> > > +			ring[idx + 2] = obj[i + 2]; \
> > > +			ring[idx + 3] = obj[i + 3]; \
> > > +		} \
> > > +		switch (n & 0x3) { \
> > > +		case 3: \
> > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > +		case 2: \
> > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > +		case 1: \
> > > +			ring[idx++] = obj[i++]; \
> > > +		} \
> > > +	} else { \
> > > +		for (i = 0; idx < size; i++, idx++)\
> > > +			ring[idx] = obj[i]; \
> > > +		for (idx = 0; i < n; i++, idx++) \
> > > +			ring[idx] = obj[i]; \
> > > +	} \
> > > +} while (0)
> > > +
> > > +#define ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table, n) do
> { \
> > > +	unsigned int i; \
> > > +	const uint32_t size = (r)->size; \
> > > +	uint32_t idx = prod_head & (r)->mask; \
> > > +	__uint128_t *ring = (__uint128_t *)ring_start; \
> > > +	__uint128_t *obj = (__uint128_t *)obj_table; \
> > > +	if (likely(idx + n < size)) { \
> > > +		for (i = 0; i < (n >> 1); i += 2, idx += 2) { \
> > > +			ring[idx] = obj[i]; \
> > > +			ring[idx + 1] = obj[i + 1]; \
> > > +		} \
> > > +		switch (n & 0x1) { \
> > > +		case 1: \
> > > +			ring[idx++] = obj[i++]; \
> > > +		} \
> > > +	} else { \
> > > +		for (i = 0; idx < size; i++, idx++)\
> > > +			ring[idx] = obj[i]; \
> > > +		for (idx = 0; i < n; i++, idx++) \
> > > +			ring[idx] = obj[i]; \
> > > +	} \
> > > +} while (0)
> > > +
> > > +/* the actual copy of pointers on the ring to obj_table.
> > > + * Placed here since identical code needed in both
> > > + * single and multi consumer dequeue functions.
> > > + */
> > > +#define DEQUEUE_PTRS_ELEM(r, ring_start, cons_head, obj_table,
> > > +esize, n)
> > > do { \
> > > +	if (esize == 4) \
> > > +		DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n); \
> > > +	else if (esize == 8) \
> > > +		DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n); \
> > > +	else if (esize == 16) \
> > > +		DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table, n);
> \ }
> > > while
> > > +(0)
> > > +
> > > +#define DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n) do { \
> > > +	unsigned int i; \
> > > +	uint32_t idx = cons_head & (r)->mask; \
> > > +	const uint32_t size = (r)->size; \
> > > +	uint32_t *ring = (uint32_t *)ring_start; \
> > > +	uint32_t *obj = (uint32_t *)obj_table; \
> > > +	if (likely(idx + n < size)) { \
> > > +		for (i = 0; i < (n & (~(unsigned)0x7)); i += 8, idx += 8) {\
> > > +			obj[i] = ring[idx]; \
> > > +			obj[i + 1] = ring[idx + 1]; \
> > > +			obj[i + 2] = ring[idx + 2]; \
> > > +			obj[i + 3] = ring[idx + 3]; \
> > > +			obj[i + 4] = ring[idx + 4]; \
> > > +			obj[i + 5] = ring[idx + 5]; \
> > > +			obj[i + 6] = ring[idx + 6]; \
> > > +			obj[i + 7] = ring[idx + 7]; \
> > > +		} \
> > > +		switch (n & 0x7) { \
> > > +		case 7: \
> > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > +		case 6: \
> > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > +		case 5: \
> > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > +		case 4: \
> > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > +		case 3: \
> > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > +		case 2: \
> > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > +		case 1: \
> > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > +		} \
> > > +	} else { \
> > > +		for (i = 0; idx < size; i++, idx++) \
> > > +			obj[i] = ring[idx]; \
> > > +		for (idx = 0; i < n; i++, idx++) \
> > > +			obj[i] = ring[idx]; \
> > > +	} \
> > > +} while (0)
> > > +
> > > +#define DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n) do { \
> > > +	unsigned int i; \
> > > +	uint32_t idx = cons_head & (r)->mask; \
> > > +	const uint32_t size = (r)->size; \
> > > +	uint64_t *ring = (uint64_t *)ring_start; \
> > > +	uint64_t *obj = (uint64_t *)obj_table; \
> > > +	if (likely(idx + n < size)) { \
> > > +		for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4) {\
> > > +			obj[i] = ring[idx]; \
> > > +			obj[i + 1] = ring[idx + 1]; \
> > > +			obj[i + 2] = ring[idx + 2]; \
> > > +			obj[i + 3] = ring[idx + 3]; \
> > > +		} \
> > > +		switch (n & 0x3) { \
> > > +		case 3: \
> > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > +		case 2: \
> > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > +		case 1: \
> > > +			obj[i++] = ring[idx++]; \
> > > +		} \
> > > +	} else { \
> > > +		for (i = 0; idx < size; i++, idx++) \
> > > +			obj[i] = ring[idx]; \
> > > +		for (idx = 0; i < n; i++, idx++) \
> > > +			obj[i] = ring[idx]; \
> > > +	} \
> > > +} while (0)
> > > +
> > > +#define DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table, n) do
> { \
> > > +	unsigned int i; \
> > > +	uint32_t idx = cons_head & (r)->mask; \
> > > +	const uint32_t size = (r)->size; \
> > > +	__uint128_t *ring = (__uint128_t *)ring_start; \
> > > +	__uint128_t *obj = (__uint128_t *)obj_table; \
> > > +	if (likely(idx + n < size)) { \
> > > +		for (i = 0; i < (n >> 1); i += 2, idx += 2) { \
> > > +			obj[i] = ring[idx]; \
> > > +			obj[i + 1] = ring[idx + 1]; \
> > > +		} \
> > > +		switch (n & 0x1) { \
> > > +		case 1: \
> > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > +		} \
> > > +	} else { \
> > > +		for (i = 0; idx < size; i++, idx++) \
> > > +			obj[i] = ring[idx]; \
> > > +		for (idx = 0; i < n; i++, idx++) \
> > > +			obj[i] = ring[idx]; \
> > > +	} \
> > > +} while (0)
> > > +
> > > +/* Between load and load. there might be cpu reorder in weak model
> > > + * (powerpc/arm).
> > > + * There are 2 choices for the users
> > > + * 1.use rmb() memory barrier
> > > + * 2.use one-direction load_acquire/store_release barrier,defined
> > > +by
> > > + * CONFIG_RTE_USE_C11_MEM_MODEL=y
> > > + * It depends on performance test results.
> > > + * By default, move common functions to rte_ring_generic.h  */
> > > +#ifdef RTE_USE_C11_MEM_MODEL #include "rte_ring_c11_mem.h"
> > > +#else
> > > +#include "rte_ring_generic.h"
> > > +#endif
> > > +
> > > +/**
> > > + * @internal Enqueue several objects on the ring
> > > + *
> > > + * @param r
> > > + *   A pointer to the ring structure.
> > > + * @param obj_table
> > > + *   A pointer to a table of void * pointers (objects).
> > > + * @param esize
> > > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > > + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> > > + *   as passed while creating the ring, otherwise the results are undefined.
> > > + * @param n
> > > + *   The number of objects to add in the ring from the obj_table.
> > > + * @param behavior
> > > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a
> ring
> > > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible
> from
> > > ring
> > > + * @param is_sp
> > > + *   Indicates whether to use single producer or multi-producer head
> update
> > > + * @param free_space
> > > + *   returns the amount of space after the enqueue operation has
> finished
> > > + * @return
> > > + *   Actual number of objects enqueued.
> > > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > > + */
> > > +static __rte_always_inline unsigned int
> > > +__rte_ring_do_enqueue_elem(struct rte_ring *r, void * const obj_table,
> > > +		unsigned int esize, unsigned int n,
> > > +		enum rte_ring_queue_behavior behavior, unsigned int is_sp,
> > > +		unsigned int *free_space)
> 
> 
> I like the idea to add esize as an argument to the public API, so the compiler
> can do its job optimizing calls with constant esize.
> Though I am not very happy with the rest of the implementation:
> 1. It doesn't really provide configurable elem size - only 4/8/16B elems are
> supported.
Agree. I was thinking other sizes can be added on need basis.
However, I am wondering if we should just provide support for 4B elements, so that users can use bulk operations to construct whatever they need. It would mean extra work for the users, though.

> 2. A lot of code duplication with these 3 copies of ENQUEUE/DEQUEUE
> macros.
> 
> Looking at ENQUEUE/DEQUEUE macros, I can see that main loop always does
> 32B copy per iteration.
Yes, I tried to keep it the same as the existing one (originally, I guess the intention was to allow 256b vector instructions to be generated).

> So I wonder whether we can make a generic function that would do a 32B copy per
> iteration in the main loop, and copy the tail in 4B chunks?
> That would avoid copy duplication and will allow user to have any elem size
> (multiple of 4B) he wants.
> Something like that (note didn't test it, just a rough idea):
> 
>  static inline void
> copy_elems(uint32_t du32[], const uint32_t su32[], uint32_t num, uint32_t
> esize) {
>         uint32_t i, sz;
> 
>         sz = (num * esize) / sizeof(uint32_t);
If 'num' is a compile time constant, 'sz' will be a compile time constant. Otherwise, this will result in a multiplication operation. I have tried to avoid the multiplication operation and to use shift and mask operations instead (just as the rest of the ring code does).

> 
>         for (i = 0; i < (sz & ~7); i += 8)
>                 memcpy(du32 + i, su32 + i, 8 * sizeof(uint32_t));
I had used memcpy to start with (for the entire copy operation), but the performance is not the same for 64b elements when compared with the existing ring APIs (higher in some cases and lower in others).

IMO, we have to keep the performance of the 64b and 128b element sizes the same as what we get with the existing ring and event-ring APIs. That would allow us to replace them with these new APIs. I suggest that we keep the macros in this patch for 64b and 128b.

For the rest of the sizes, we could put a for loop around the 32b macro (this would allow for all sizes as well).

> 
>         switch (sz & 7) {
>         case 7: du32[sz - 7] = su32[sz - 7]; /* fallthrough */
>         case 6: du32[sz - 6] = su32[sz - 6]; /* fallthrough */
>         case 5: du32[sz - 5] = su32[sz - 5]; /* fallthrough */
>         case 4: du32[sz - 4] = su32[sz - 4]; /* fallthrough */
>         case 3: du32[sz - 3] = su32[sz - 3]; /* fallthrough */
>         case 2: du32[sz - 2] = su32[sz - 2]; /* fallthrough */
>         case 1: du32[sz - 1] = su32[sz - 1]; /* fallthrough */
>         }
> }
> 
> static inline void
> enqueue_elems(struct rte_ring *r, void *ring_start, uint32_t prod_head,
>                 void *obj_table, uint32_t num, uint32_t esize) {
>         uint32_t idx, n;
>         uint32_t *du32;
> 
>         const uint32_t size = r->size;
> 
>         idx = prod_head & (r)->mask;
> 
>         du32 = ring_start + idx * sizeof(uint32_t);
> 
>         if (idx + num < size)
>                 copy_elems(du32, obj_table, num, esize);
>         else {
>                 n = size - idx;
>                 copy_elems(du32, obj_table, n, esize);
>                 copy_elems(ring_start, obj_table + n * sizeof(uint32_t),
>                         num - n, esize);
>         }
> }
> 
> And then, in that function, instead of ENQUEUE_PTRS_ELEM(), just:
> 
> enqueue_elems(r, &r[1], prod_head, obj_table, n, esize);
> 
> 
> > > +{
> > > +	uint32_t prod_head, prod_next;
> > > +	uint32_t free_entries;
> > > +
> > > +	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
> > > +			&prod_head, &prod_next, &free_entries);
> > > +	if (n == 0)
> > > +		goto end;
> > > +
> > > +	ENQUEUE_PTRS_ELEM(r, &r[1], prod_head, obj_table, esize, n);
> > > +
> > > +	update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
> > > +end:
> > > +	if (free_space != NULL)
> > > +		*free_space = free_entries - n;
> > > +	return n;
> > > +}
> > > +


More information about the dev mailing list