[dpdk-dev] [PATCH v4 1/2] lib/ring: apis to support configurable element size

Ananyev, Konstantin konstantin.ananyev at intel.com
Tue Oct 15 11:34:02 CEST 2019


Hi Honnappa,
 
> > > >
> > > > Current APIs assume ring elements to be pointers. However, in many
> > > > use cases, the size can be different. Add new APIs to support
> > > > configurable ring element sizes.
> > > >
> > > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli at arm.com>
> > > > Reviewed-by: Dharmik Thakkar <dharmik.thakkar at arm.com>
> > > > Reviewed-by: Gavin Hu <gavin.hu at arm.com>
> > > > Reviewed-by: Ruifeng Wang <ruifeng.wang at arm.com>
> > > > ---
> > > >  lib/librte_ring/Makefile             |   3 +-
> > > >  lib/librte_ring/meson.build          |   3 +
> > > >  lib/librte_ring/rte_ring.c           |  45 +-
> > > >  lib/librte_ring/rte_ring.h           |   1 +
> > > >  lib/librte_ring/rte_ring_elem.h      | 946 +++++++++++++++++++++++++++
> > > >  lib/librte_ring/rte_ring_version.map |   2 +
> > > >  6 files changed, 991 insertions(+), 9 deletions(-)  create mode
> > > > 100644 lib/librte_ring/rte_ring_elem.h
> > > >
> > > > diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
> > > > index 21a36770d..515a967bb 100644
> > > > --- a/lib/librte_ring/Makefile
> > > > +++ b/lib/librte_ring/Makefile
> > > > @@ -6,7 +6,7 @@ include $(RTE_SDK)/mk/rte.vars.mk  # library name
> > > > LIB = librte_ring.a
> > > >
> > > > -CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
> > > > +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 -DALLOW_EXPERIMENTAL_API
> > > >  LDLIBS += -lrte_eal
> > > >
> > > >  EXPORT_MAP := rte_ring_version.map
> > > > @@ -18,6 +18,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
> > > >
> > > >  # install includes
> > > >  SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
> > > > +					rte_ring_elem.h \
> > > >  					rte_ring_generic.h \
> > > >  					rte_ring_c11_mem.h
> > > >
> > > > diff --git a/lib/librte_ring/meson.build
> > > > b/lib/librte_ring/meson.build index ab8b0b469..74219840a 100644
> > > > --- a/lib/librte_ring/meson.build
> > > > +++ b/lib/librte_ring/meson.build
> > > > @@ -6,3 +6,6 @@ sources = files('rte_ring.c')  headers = files('rte_ring.h',
> > > >  		'rte_ring_c11_mem.h',
> > > >  		'rte_ring_generic.h')
> > > > +
> > > > +# rte_ring_create_elem and rte_ring_get_memsize_elem are experimental
> > > > +allow_experimental_apis = true
> > > > diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
> > > > index d9b308036..6fed3648b 100644
> > > > --- a/lib/librte_ring/rte_ring.c
> > > > +++ b/lib/librte_ring/rte_ring.c
> > > > @@ -33,6 +33,7 @@
> > > >  #include <rte_tailq.h>
> > > >
> > > >  #include "rte_ring.h"
> > > > +#include "rte_ring_elem.h"
> > > >
> > > >  TAILQ_HEAD(rte_ring_list, rte_tailq_entry);
> > > >
> > > > @@ -46,23 +47,42 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
> > > >
> > > >  /* return the size of memory occupied by a ring */  ssize_t -
> > > > rte_ring_get_memsize(unsigned count)
> > > > +rte_ring_get_memsize_elem(unsigned count, unsigned esize)
> > > >  {
> > > >  	ssize_t sz;
> > > >
> > > > +	/* Supported esize values are 4/8/16.
> > > > +	 * Others can be added on need basis.
> > > > +	 */
> > > > +	if ((esize != 4) && (esize != 8) && (esize != 16)) {
> > > > +		RTE_LOG(ERR, RING,
> > > > +			"Unsupported esize value. Supported values are 4, 8
> > > > and 16\n");
> > > > +
> > > > +		return -EINVAL;
> > > > +	}
> > > > +
> > > >  	/* count must be a power of 2 */
> > > >  	if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
> > > >  		RTE_LOG(ERR, RING,
> > > > -			"Requested size is invalid, must be power of 2, and "
> > > > -			"do not exceed the size limit %u\n",
> > > > RTE_RING_SZ_MASK);
> > > > +			"Requested number of elements is invalid, must be "
> > > > +			"power of 2, and do not exceed the limit %u\n",
> > > > +			RTE_RING_SZ_MASK);
> > > > +
> > > >  		return -EINVAL;
> > > >  	}
> > > >
> > > > -	sz = sizeof(struct rte_ring) + count * sizeof(void *);
> > > > +	sz = sizeof(struct rte_ring) + count * esize;
> > > >  	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> > > >  	return sz;
> > > >  }
> > > >
> > > > +/* return the size of memory occupied by a ring */ ssize_t
> > > > +rte_ring_get_memsize(unsigned count) {
> > > > +	return rte_ring_get_memsize_elem(count, sizeof(void *)); }
> > > > +
> > > >  void
> > > >  rte_ring_reset(struct rte_ring *r)
> > > >  {
> > > > @@ -114,10 +134,10 @@ rte_ring_init(struct rte_ring *r, const char
> > > > *name, unsigned count,
> > > >  	return 0;
> > > >  }
> > > >
> > > > -/* create the ring */
> > > > +/* create the ring for a given element size */
> > > >  struct rte_ring *
> > > > -rte_ring_create(const char *name, unsigned count, int socket_id,
> > > > -		unsigned flags)
> > > > +rte_ring_create_elem(const char *name, unsigned count, unsigned esize,
> > > > +		int socket_id, unsigned flags)
> > > >  {
> > > >  	char mz_name[RTE_MEMZONE_NAMESIZE];
> > > >  	struct rte_ring *r;
> > > > @@ -135,7 +155,7 @@ rte_ring_create(const char *name, unsigned
> > > > count, int socket_id,
> > > >  	if (flags & RING_F_EXACT_SZ)
> > > >  		count = rte_align32pow2(count + 1);
> > > >
> > > > -	ring_size = rte_ring_get_memsize(count);
> > > > +	ring_size = rte_ring_get_memsize_elem(count, esize);
> > > >  	if (ring_size < 0) {
> > > >  		rte_errno = ring_size;
> > > >  		return NULL;
> > > > @@ -182,6 +202,15 @@ rte_ring_create(const char *name, unsigned
> > > > count, int socket_id,
> > > >  	return r;
> > > >  }
> > > >
> > > > +/* create the ring */
> > > > +struct rte_ring *
> > > > +rte_ring_create(const char *name, unsigned count, int socket_id,
> > > > +		unsigned flags)
> > > > +{
> > > > +	return rte_ring_create_elem(name, count, sizeof(void *), socket_id,
> > > > +		flags);
> > > > +}
> > > > +
> > > >  /* free the ring */
> > > >  void
> > > >  rte_ring_free(struct rte_ring *r)
> > > > diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> > > > index
> > > > 2a9f768a1..18fc5d845 100644
> > > > --- a/lib/librte_ring/rte_ring.h
> > > > +++ b/lib/librte_ring/rte_ring.h
> > > > @@ -216,6 +216,7 @@ int rte_ring_init(struct rte_ring *r, const char
> > > > *name, unsigned count,
> > > >   */
> > > >  struct rte_ring *rte_ring_create(const char *name, unsigned count,
> > > >  				 int socket_id, unsigned flags);
> > > > +
> > > >  /**
> > > >   * De-allocate all memory used by the ring.
> > > >   *
> > > > diff --git a/lib/librte_ring/rte_ring_elem.h
> > > > b/lib/librte_ring/rte_ring_elem.h new file mode 100644 index
> > > > 000000000..860f059ad
> > > > --- /dev/null
> > > > +++ b/lib/librte_ring/rte_ring_elem.h
> > > > @@ -0,0 +1,946 @@
> > > > +/* SPDX-License-Identifier: BSD-3-Clause
> > > > + *
> > > > + * Copyright (c) 2019 Arm Limited
> > > > + * Copyright (c) 2010-2017 Intel Corporation
> > > > + * Copyright (c) 2007-2009 Kip Macy kmacy at freebsd.org
> > > > + * All rights reserved.
> > > > + * Derived from FreeBSD's bufring.h
> > > > + * Used as BSD-3 Licensed with permission from Kip Macy.
> > > > + */
> > > > +
> > > > +#ifndef _RTE_RING_ELEM_H_
> > > > +#define _RTE_RING_ELEM_H_
> > > > +
> > > > +/**
> > > > + * @file
> > > > + * RTE Ring with flexible element size  */
> > > > +
> > > > +#ifdef __cplusplus
> > > > +extern "C" {
> > > > +#endif
> > > > +
> > > > +#include <stdio.h>
> > > > +#include <stdint.h>
> > > > +#include <sys/queue.h>
> > > > +#include <errno.h>
> > > > +#include <rte_common.h>
> > > > +#include <rte_config.h>
> > > > +#include <rte_memory.h>
> > > > +#include <rte_lcore.h>
> > > > +#include <rte_atomic.h>
> > > > +#include <rte_branch_prediction.h>
> > > > +#include <rte_memzone.h>
> > > > +#include <rte_pause.h>
> > > > +
> > > > +#include "rte_ring.h"
> > > > +
> > > > +/**
> > > > + * @warning
> > > > + * @b EXPERIMENTAL: this API may change without prior notice
> > > > + *
> > > > + * Calculate the memory size needed for a ring with given element
> > > > +size
> > > > + *
> > > > + * This function returns the number of bytes needed for a ring,
> > > > +given
> > > > + * the number of elements in it and the size of the element. This
> > > > +value
> > > > + * is the sum of the size of the structure rte_ring and the size of
> > > > +the
> > > > + * memory needed for storing the elements. The value is aligned to
> > > > +a cache
> > > > + * line size.
> > > > + *
> > > > + * @param count
> > > > + *   The number of elements in the ring (must be a power of 2).
> > > > + * @param esize
> > > > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > > > + *   Currently, sizes 4, 8 and 16 are supported.
> > > > + * @return
> > > > + *   - The memory size needed for the ring on success.
> > > > + *   - -EINVAL if count is not a power of 2.
> > > > + */
> > > > +__rte_experimental
> > > > +ssize_t rte_ring_get_memsize_elem(unsigned count, unsigned esize);
> > > > +
> > > > +/**
> > > > + * @warning
> > > > + * @b EXPERIMENTAL: this API may change without prior notice
> > > > + *
> > > > + * Create a new ring named *name* that stores elements with given size.
> > > > + *
> > > > + * This function uses ``memzone_reserve()`` to allocate memory.
> > > > +Then it
> > > > + * calls rte_ring_init() to initialize an empty ring.
> > > > + *
> > > > + * The new ring size is set to *count*, which must be a power of
> > > > + * two. Water marking is disabled by default. The real usable ring
> > > > +size
> > > > + * is *count-1* instead of *count* to differentiate a free ring
> > > > +from an
> > > > + * empty ring.
> > > > + *
> > > > + * The ring is added in RTE_TAILQ_RING list.
> > > > + *
> > > > + * @param name
> > > > + *   The name of the ring.
> > > > + * @param count
> > > > + *   The number of elements in the ring (must be a power of 2).
> > > > + * @param esize
> > > > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > > > + *   Currently, sizes 4, 8 and 16 are supported.
> > > > + * @param socket_id
> > > > + *   The *socket_id* argument is the socket identifier in case of
> > > > + *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
> > > > + *   constraint for the reserved zone.
> > > > + * @param flags
> > > > + *   An OR of the following:
> > > > + *    - RING_F_SP_ENQ: If this flag is set, the default behavior when
> > > > + *      using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
> > > > + *      is "single-producer". Otherwise, it is "multi-producers".
> > > > + *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
> > > > + *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
> > > > + *      is "single-consumer". Otherwise, it is "multi-consumers".
> > > > + * @return
> > > > + *   On success, the pointer to the new allocated ring. NULL on error with
> > > > + *    rte_errno set appropriately. Possible errno values include:
> > > > + *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config
> > > > structure
> > > > + *    - E_RTE_SECONDARY - function was called from a secondary process
> > > > instance
> > > > + *    - EINVAL - count provided is not a power of 2
> > > > + *    - ENOSPC - the maximum number of memzones has already been
> > > > allocated
> > > > + *    - EEXIST - a memzone with the same name already exists
> > > > + *    - ENOMEM - no appropriate memory area found in which to create
> > > > memzone
> > > > + */
> > > > +__rte_experimental
> > > > +struct rte_ring *rte_ring_create_elem(const char *name, unsigned count,
> > > > +				unsigned esize, int socket_id, unsigned flags);
> > > > +
> > > > +/* the actual enqueue of pointers on the ring.
> > > > + * Placed here since identical code needed in both
> > > > + * single and multi producer enqueue functions.
> > > > + */
> > > > +#define ENQUEUE_PTRS_ELEM(r, ring_start, prod_head, obj_table,
> > > > +esize, n)
> > > > do { \
> > > > +	if (esize == 4) \
> > > > +		ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n); \
> > > > +	else if (esize == 8) \
> > > > +		ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n); \
> > > > +	else if (esize == 16) \
> > > > +		ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table, n);
> > \ }
> > > > while
> > > > +(0)
> > > > +
> > > > +#define ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n) do { \
> > > > +	unsigned int i; \
> > > > +	const uint32_t size = (r)->size; \
> > > > +	uint32_t idx = prod_head & (r)->mask; \
> > > > +	uint32_t *ring = (uint32_t *)ring_start; \
> > > > +	uint32_t *obj = (uint32_t *)obj_table; \
> > > > +	if (likely(idx + n < size)) { \
> > > > +		for (i = 0; i < (n & ((~(unsigned)0x7))); i += 8, idx += 8) { \
> > > > +			ring[idx] = obj[i]; \
> > > > +			ring[idx + 1] = obj[i + 1]; \
> > > > +			ring[idx + 2] = obj[i + 2]; \
> > > > +			ring[idx + 3] = obj[i + 3]; \
> > > > +			ring[idx + 4] = obj[i + 4]; \
> > > > +			ring[idx + 5] = obj[i + 5]; \
> > > > +			ring[idx + 6] = obj[i + 6]; \
> > > > +			ring[idx + 7] = obj[i + 7]; \
> > > > +		} \
> > > > +		switch (n & 0x7) { \
> > > > +		case 7: \
> > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > +		case 6: \
> > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > +		case 5: \
> > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > +		case 4: \
> > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > +		case 3: \
> > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > +		case 2: \
> > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > +		case 1: \
> > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > +		} \
> > > > +	} else { \
> > > > +		for (i = 0; idx < size; i++, idx++)\
> > > > +			ring[idx] = obj[i]; \
> > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > +			ring[idx] = obj[i]; \
> > > > +	} \
> > > > +} while (0)
> > > > +
> > > > +#define ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n) do { \
> > > > +	unsigned int i; \
> > > > +	const uint32_t size = (r)->size; \
> > > > +	uint32_t idx = prod_head & (r)->mask; \
> > > > +	uint64_t *ring = (uint64_t *)ring_start; \
> > > > +	uint64_t *obj = (uint64_t *)obj_table; \
> > > > +	if (likely(idx + n < size)) { \
> > > > +		for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx += 4) { \
> > > > +			ring[idx] = obj[i]; \
> > > > +			ring[idx + 1] = obj[i + 1]; \
> > > > +			ring[idx + 2] = obj[i + 2]; \
> > > > +			ring[idx + 3] = obj[i + 3]; \
> > > > +		} \
> > > > +		switch (n & 0x3) { \
> > > > +		case 3: \
> > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > +		case 2: \
> > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > +		case 1: \
> > > > +			ring[idx++] = obj[i++]; \
> > > > +		} \
> > > > +	} else { \
> > > > +		for (i = 0; idx < size; i++, idx++)\
> > > > +			ring[idx] = obj[i]; \
> > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > +			ring[idx] = obj[i]; \
> > > > +	} \
> > > > +} while (0)
> > > > +
> > > > +#define ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table, n) do
> > { \
> > > > +	unsigned int i; \
> > > > +	const uint32_t size = (r)->size; \
> > > > +	uint32_t idx = prod_head & (r)->mask; \
> > > > +	__uint128_t *ring = (__uint128_t *)ring_start; \
> > > > +	__uint128_t *obj = (__uint128_t *)obj_table; \
> > > > +	if (likely(idx + n < size)) { \
> > > > +		for (i = 0; i < (n >> 1); i += 2, idx += 2) { \
> > > > +			ring[idx] = obj[i]; \
> > > > +			ring[idx + 1] = obj[i + 1]; \
> > > > +		} \
> > > > +		switch (n & 0x1) { \
> > > > +		case 1: \
> > > > +			ring[idx++] = obj[i++]; \
> > > > +		} \
> > > > +	} else { \
> > > > +		for (i = 0; idx < size; i++, idx++)\
> > > > +			ring[idx] = obj[i]; \
> > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > +			ring[idx] = obj[i]; \
> > > > +	} \
> > > > +} while (0)
> > > > +
> > > > +/* the actual copy of pointers on the ring to obj_table.
> > > > + * Placed here since identical code needed in both
> > > > + * single and multi consumer dequeue functions.
> > > > + */
> > > > +#define DEQUEUE_PTRS_ELEM(r, ring_start, cons_head, obj_table,
> > > > +esize, n)
> > > > do { \
> > > > +	if (esize == 4) \
> > > > +		DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n); \
> > > > +	else if (esize == 8) \
> > > > +		DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n); \
> > > > +	else if (esize == 16) \
> > > > +		DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table, n);
> > \ }
> > > > while
> > > > +(0)
> > > > +
> > > > +#define DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n) do { \
> > > > +	unsigned int i; \
> > > > +	uint32_t idx = cons_head & (r)->mask; \
> > > > +	const uint32_t size = (r)->size; \
> > > > +	uint32_t *ring = (uint32_t *)ring_start; \
> > > > +	uint32_t *obj = (uint32_t *)obj_table; \
> > > > +	if (likely(idx + n < size)) { \
> > > > +		for (i = 0; i < (n & (~(unsigned)0x7)); i += 8, idx += 8) {\
> > > > +			obj[i] = ring[idx]; \
> > > > +			obj[i + 1] = ring[idx + 1]; \
> > > > +			obj[i + 2] = ring[idx + 2]; \
> > > > +			obj[i + 3] = ring[idx + 3]; \
> > > > +			obj[i + 4] = ring[idx + 4]; \
> > > > +			obj[i + 5] = ring[idx + 5]; \
> > > > +			obj[i + 6] = ring[idx + 6]; \
> > > > +			obj[i + 7] = ring[idx + 7]; \
> > > > +		} \
> > > > +		switch (n & 0x7) { \
> > > > +		case 7: \
> > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > +		case 6: \
> > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > +		case 5: \
> > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > +		case 4: \
> > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > +		case 3: \
> > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > +		case 2: \
> > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > +		case 1: \
> > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > +		} \
> > > > +	} else { \
> > > > +		for (i = 0; idx < size; i++, idx++) \
> > > > +			obj[i] = ring[idx]; \
> > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > +			obj[i] = ring[idx]; \
> > > > +	} \
> > > > +} while (0)
> > > > +
> > > > +#define DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n) do { \
> > > > +	unsigned int i; \
> > > > +	uint32_t idx = cons_head & (r)->mask; \
> > > > +	const uint32_t size = (r)->size; \
> > > > +	uint64_t *ring = (uint64_t *)ring_start; \
> > > > +	uint64_t *obj = (uint64_t *)obj_table; \
> > > > +	if (likely(idx + n < size)) { \
> > > > +		for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4) {\
> > > > +			obj[i] = ring[idx]; \
> > > > +			obj[i + 1] = ring[idx + 1]; \
> > > > +			obj[i + 2] = ring[idx + 2]; \
> > > > +			obj[i + 3] = ring[idx + 3]; \
> > > > +		} \
> > > > +		switch (n & 0x3) { \
> > > > +		case 3: \
> > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > +		case 2: \
> > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > +		case 1: \
> > > > +			obj[i++] = ring[idx++]; \
> > > > +		} \
> > > > +	} else { \
> > > > +		for (i = 0; idx < size; i++, idx++) \
> > > > +			obj[i] = ring[idx]; \
> > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > +			obj[i] = ring[idx]; \
> > > > +	} \
> > > > +} while (0)
> > > > +
> > > > +#define DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table, n) do
> > { \
> > > > +	unsigned int i; \
> > > > +	uint32_t idx = cons_head & (r)->mask; \
> > > > +	const uint32_t size = (r)->size; \
> > > > +	__uint128_t *ring = (__uint128_t *)ring_start; \
> > > > +	__uint128_t *obj = (__uint128_t *)obj_table; \
> > > > +	if (likely(idx + n < size)) { \
> > > > +		for (i = 0; i < (n >> 1); i += 2, idx += 2) { \
> > > > +			obj[i] = ring[idx]; \
> > > > +			obj[i + 1] = ring[idx + 1]; \
> > > > +		} \
> > > > +		switch (n & 0x1) { \
> > > > +		case 1: \
> > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > +		} \
> > > > +	} else { \
> > > > +		for (i = 0; idx < size; i++, idx++) \
> > > > +			obj[i] = ring[idx]; \
> > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > +			obj[i] = ring[idx]; \
> > > > +	} \
> > > > +} while (0)
> > > > +
> > > > +/* Between load and load. there might be cpu reorder in weak model
> > > > + * (powerpc/arm).
> > > > + * There are 2 choices for the users
> > > > + * 1.use rmb() memory barrier
> > > > + * 2.use one-direction load_acquire/store_release barrier,defined
> > > > +by
> > > > + * CONFIG_RTE_USE_C11_MEM_MODEL=y
> > > > + * It depends on performance test results.
> > > > + * By default, move common functions to rte_ring_generic.h
> > > > + */
> > > > +#ifdef RTE_USE_C11_MEM_MODEL
> > > > +#include "rte_ring_c11_mem.h"
> > > > +#else
> > > > +#include "rte_ring_generic.h"
> > > > +#endif
> > > > +
> > > > +/**
> > > > + * @internal Enqueue several objects on the ring
> > > > + *
> > > > + * @param r
> > > > + *   A pointer to the ring structure.
> > > > + * @param obj_table
> > > > + *   A pointer to a table of void * pointers (objects).
> > > > + * @param esize
> > > > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > > > + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> > > > + *   as passed while creating the ring, otherwise the results are undefined.
> > > > + * @param n
> > > > + *   The number of objects to add in the ring from the obj_table.
> > > > + * @param behavior
> > > > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a
> > ring
> > > > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible
> > from
> > > > ring
> > > > + * @param is_sp
> > > > + *   Indicates whether to use single producer or multi-producer head
> > update
> > > > + * @param free_space
> > > > + *   returns the amount of space after the enqueue operation has
> > finished
> > > > + * @return
> > > > + *   Actual number of objects enqueued.
> > > > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > > > + */
> > > > +static __rte_always_inline unsigned int
> > > > +__rte_ring_do_enqueue_elem(struct rte_ring *r, void * const obj_table,
> > > > +		unsigned int esize, unsigned int n,
> > > > +		enum rte_ring_queue_behavior behavior, unsigned int is_sp,
> > > > +		unsigned int *free_space)
> >
> >
> > I like the idea to add esize as an argument to the public API, so the compiler
> > can do its job optimizing calls with constant esize.
> > Though I am not very happy with the rest of implementation:
> > 1. It doesn't really provide configurable elem size - only 4/8/16B elems are
> > supported.
> Agree. I was thinking other sizes can be added on need basis.
> However, I am wondering if we should just provide for 4B and then the users can use bulk operations to construct whatever they need?

I suppose it could be plan B... if there would be no agreement on generic case.
And for 4B elems, I guess you do have a particular use-case?

> It
> would mean extra work for the users.
> 
> > 2. A lot of code duplication with these 3 copies of ENQUEUE/DEQUEUE
> > macros.
> >
> > Looking at ENQUEUE/DEQUEUE macros, I can see that main loop always does
> > 32B copy per iteration.
> Yes, I tried to keep it the same as the existing one (originally, I guess the intention was to allow for 256b vector instructions to be
> generated)
> 
> > So wonder can we make a generic function that would do 32B copy per
> > iteration in a main loop, and copy tail  by 4B chunks?
> > That would avoid copy duplication and will allow user to have any elem size
> > (multiple of 4B) he wants.
> > Something like that (note didn't test it, just a rough idea):
> >
> >  static inline void
> > copy_elems(uint32_t du32[], const uint32_t su32[], uint32_t num, uint32_t
> > esize) {
> >         uint32_t i, sz;
> >
> >         sz = (num * esize) / sizeof(uint32_t);
> If 'num' is a compile time constant, 'sz' will be a compile time constant. Otherwise, this will result in a multiplication operation. 

Not always.
If esize is compile time constant, then for esize as power of 2 (4,8,16,...), it would be just one shift.
For other constant values it could be a 'mul' or in many cases just 2 shifts plus 'add' (if compiler is smart enough).
I.e., let's say for a 24B elem it would be either num * 6 or (num << 2) + (num << 1).
I suppose for non-power-of-2 elems it might be OK to take such a small perf hit.

>I have tried 
> to avoid the multiplication operation and try to use shift and mask operations (just like how the rest of the ring code does).
> 
> >
> >         for (i = 0; i < (sz & ~7); i += 8)
> >                 memcpy(du32 + i, su32 + i, 8 * sizeof(uint32_t));
> I had used memcpy to start with (for the entire copy operation), performance is not the same for 64b elements when compared with the
> existing ring APIs (some cases more and some cases less).

I remember that from one of your previous mails; that's why here I suggest using memcpy() with a fixed size inside the loop.
That way, for each iteration, the compiler will replace memcpy() with instructions to copy 32B in whatever way it thinks is optimal
(same as for the original macro, I think).

> 
> IMO, we have to keep the performance of the 64b and 128b the same as what we get with the existing ring and event-ring APIs. That would
> allow us to replace them with these new APIs. I suggest that we keep the macros in this patch for 64b and 128b.

I still think we probably can achieve that without duplicating macros, while still supporting arbitrary elem size.
See above.

> For the rest of the sizes, we could put a for loop around 32b macro (this would allow for all sizes as well).
> 
> >
> >         switch (sz & 7) {
> >         case 7: du32[sz - 7] = su32[sz - 7]; /* fallthrough */
> >         case 6: du32[sz - 6] = su32[sz - 6]; /* fallthrough */
> >         case 5: du32[sz - 5] = su32[sz - 5]; /* fallthrough */
> >         case 4: du32[sz - 4] = su32[sz - 4]; /* fallthrough */
> >         case 3: du32[sz - 3] = su32[sz - 3]; /* fallthrough */
> >         case 2: du32[sz - 2] = su32[sz - 2]; /* fallthrough */
> >         case 1: du32[sz - 1] = su32[sz - 1]; /* fallthrough */
> >         }
> > }
> >
> > static inline void
> > enqueue_elems(struct rte_ring *r, void *ring_start, uint32_t prod_head,
> >                 void *obj_table, uint32_t num, uint32_t esize) {
> >         uint32_t idx, n;
> >         uint32_t *du32;
> >
> >         const uint32_t size = r->size;
> >
> >         idx = prod_head & (r)->mask;
> >
> >         du32 = ring_start + idx * sizeof(uint32_t);
> >
> >         if (idx + num < size)
> >                 copy_elems(du32, obj_table, num, esize);
> >         else {
> >                 n = size - idx;
> >                 copy_elems(du32, obj_table, n, esize);
> >                 copy_elems(ring_start, obj_table + n * sizeof(uint32_t),
> >                         num - n, esize);
> >         }
> > }
> >
> > And then, in that function, instead of ENQUEUE_PTRS_ELEM(), just:
> >
> > enqueue_elems(r, &r[1], prod_head, obj_table, n, esize);
> >
> >
> > > > +{
> > > > +	uint32_t prod_head, prod_next;
> > > > +	uint32_t free_entries;
> > > > +
> > > > +	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
> > > > +			&prod_head, &prod_next, &free_entries);
> > > > +	if (n == 0)
> > > > +		goto end;
> > > > +
> > > > +	ENQUEUE_PTRS_ELEM(r, &r[1], prod_head, obj_table, esize, n);
> > > > +
> > > > +	update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
> > > > +end:
> > > > +	if (free_space != NULL)
> > > > +		*free_space = free_entries - n;
> > > > +	return n;
> > > > +}
> > > > +


More information about the dev mailing list