[dpdk-dev] [PATCH v4 1/2] lib/ring: apis to support configurable element size

Ananyev, Konstantin konstantin.ananyev at intel.com
Thu Oct 17 13:51:41 CEST 2019


> > > > > > Current APIs assume ring elements to be pointers. However, in
> > > > > > many use cases, the size can be different. Add new APIs to
> > > > > > support configurable ring element sizes.
> > > > > >
> > > > > > Signed-off-by: Honnappa Nagarahalli
> > > > > > <honnappa.nagarahalli at arm.com>
> > > > > > Reviewed-by: Dharmik Thakkar <dharmik.thakkar at arm.com>
> > > > > > Reviewed-by: Gavin Hu <gavin.hu at arm.com>
> > > > > > Reviewed-by: Ruifeng Wang <ruifeng.wang at arm.com>
> > > > > > ---
> > > > > >  lib/librte_ring/Makefile             |   3 +-
> > > > > >  lib/librte_ring/meson.build          |   3 +
> > > > > >  lib/librte_ring/rte_ring.c           |  45 +-
> > > > > >  lib/librte_ring/rte_ring.h           |   1 +
> > > > > >  lib/librte_ring/rte_ring_elem.h      | 946 +++++++++++++++++++++++++++
> > > > > >  lib/librte_ring/rte_ring_version.map |   2 +
> > > > > >  6 files changed, 991 insertions(+), 9 deletions(-)
> > > > > >  create mode 100644 lib/librte_ring/rte_ring_elem.h
> > > > > >
> > > > > > diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
> > > > > > index 21a36770d..515a967bb 100644
> > > > > > --- a/lib/librte_ring/Makefile
> > > > > > +++ b/lib/librte_ring/Makefile
> 
> <snip>
> 
> > > > > > +
> > > > > > +# rte_ring_create_elem and rte_ring_get_memsize_elem are experimental
> > > > > > +allow_experimental_apis = true
> > > > > > diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
> > > > > > index d9b308036..6fed3648b 100644
> > > > > > --- a/lib/librte_ring/rte_ring.c
> > > > > > +++ b/lib/librte_ring/rte_ring.c
> > > > > > @@ -33,6 +33,7 @@
> > > > > >  #include <rte_tailq.h>
> > > > > >
> > > > > >  #include "rte_ring.h"
> > > > > > +#include "rte_ring_elem.h"
> > > > > >
> 
> <snip>
> 
> > > > > > diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h
> > > > > > new file mode 100644
> > > > > > index 000000000..860f059ad
> > > > > > --- /dev/null
> > > > > > +++ b/lib/librte_ring/rte_ring_elem.h
> > > > > > @@ -0,0 +1,946 @@
> > > > > > +/* SPDX-License-Identifier: BSD-3-Clause
> > > > > > + *
> > > > > > + * Copyright (c) 2019 Arm Limited
> > > > > > + * Copyright (c) 2010-2017 Intel Corporation
> > > > > > + * Copyright (c) 2007-2009 Kip Macy kmacy at freebsd.org
> > > > > > + * All rights reserved.
> > > > > > + * Derived from FreeBSD's bufring.h
> > > > > > + * Used as BSD-3 Licensed with permission from Kip Macy.
> > > > > > + */
> > > > > > +
> > > > > > +#ifndef _RTE_RING_ELEM_H_
> > > > > > +#define _RTE_RING_ELEM_H_
> > > > > > +
> 
> <snip>
> 
> > > > > > +
> > > > > > +/* the actual enqueue of pointers on the ring.
> > > > > > + * Placed here since identical code needed in both
> > > > > > + * single and multi producer enqueue functions.
> > > > > > + */
> > > > > > +#define ENQUEUE_PTRS_ELEM(r, ring_start, prod_head, obj_table, esize, n) do { \
> > > > > > +	if (esize == 4) \
> > > > > > +		ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n); \
> > > > > > +	else if (esize == 8) \
> > > > > > +		ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n); \
> > > > > > +	else if (esize == 16) \
> > > > > > +		ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table, n); \
> > > > > > +} while (0)
> > > > > > +
> > > > > > +#define ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n) do { \
> > > > > > +	unsigned int i; \
> > > > > > +	const uint32_t size = (r)->size; \
> > > > > > +	uint32_t idx = prod_head & (r)->mask; \
> > > > > > +	uint32_t *ring = (uint32_t *)ring_start; \
> > > > > > +	uint32_t *obj = (uint32_t *)obj_table; \
> > > > > > +	if (likely(idx + n < size)) { \
> > > > > > +		for (i = 0; i < (n & ((~(unsigned)0x7))); i += 8, idx += 8) { \
> > > > > > +			ring[idx] = obj[i]; \
> > > > > > +			ring[idx + 1] = obj[i + 1]; \
> > > > > > +			ring[idx + 2] = obj[i + 2]; \
> > > > > > +			ring[idx + 3] = obj[i + 3]; \
> > > > > > +			ring[idx + 4] = obj[i + 4]; \
> > > > > > +			ring[idx + 5] = obj[i + 5]; \
> > > > > > +			ring[idx + 6] = obj[i + 6]; \
> > > > > > +			ring[idx + 7] = obj[i + 7]; \
> > > > > > +		} \
> > > > > > +		switch (n & 0x7) { \
> > > > > > +		case 7: \
> > > > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > > +		case 6: \
> > > > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > > +		case 5: \
> > > > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > > +		case 4: \
> > > > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > > +		case 3: \
> > > > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > > +		case 2: \
> > > > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > > +		case 1: \
> > > > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > > +		} \
> > > > > > +	} else { \
> > > > > > +		for (i = 0; idx < size; i++, idx++)\
> > > > > > +			ring[idx] = obj[i]; \
> > > > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > > > +			ring[idx] = obj[i]; \
> > > > > > +	} \
> > > > > > +} while (0)
> > > > > > +
> > > > > > +#define ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n) do { \
> > > > > > +	unsigned int i; \
> > > > > > +	const uint32_t size = (r)->size; \
> > > > > > +	uint32_t idx = prod_head & (r)->mask; \
> > > > > > +	uint64_t *ring = (uint64_t *)ring_start; \
> > > > > > +	uint64_t *obj = (uint64_t *)obj_table; \
> > > > > > +	if (likely(idx + n < size)) { \
> > > > > > +		for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx += 4) { \
> > > > > > +			ring[idx] = obj[i]; \
> > > > > > +			ring[idx + 1] = obj[i + 1]; \
> > > > > > +			ring[idx + 2] = obj[i + 2]; \
> > > > > > +			ring[idx + 3] = obj[i + 3]; \
> > > > > > +		} \
> > > > > > +		switch (n & 0x3) { \
> > > > > > +		case 3: \
> > > > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > > +		case 2: \
> > > > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > > +		case 1: \
> > > > > > +			ring[idx++] = obj[i++]; \
> > > > > > +		} \
> > > > > > +	} else { \
> > > > > > +		for (i = 0; idx < size; i++, idx++)\
> > > > > > +			ring[idx] = obj[i]; \
> > > > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > > > +			ring[idx] = obj[i]; \
> > > > > > +	} \
> > > > > > +} while (0)
> > > > > > +
> > > > > > +#define ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table, n) do { \
> > > > > > +	unsigned int i; \
> > > > > > +	const uint32_t size = (r)->size; \
> > > > > > +	uint32_t idx = prod_head & (r)->mask; \
> > > > > > +	__uint128_t *ring = (__uint128_t *)ring_start; \
> > > > > > +	__uint128_t *obj = (__uint128_t *)obj_table; \
> > > > > > +	if (likely(idx + n < size)) { \
> > > > > > +		for (i = 0; i < (n & ((~(unsigned)0x1))); i += 2, idx += 2) { \
> > > > > > +			ring[idx] = obj[i]; \
> > > > > > +			ring[idx + 1] = obj[i + 1]; \
> > > > > > +		} \
> > > > > > +		switch (n & 0x1) { \
> > > > > > +		case 1: \
> > > > > > +			ring[idx++] = obj[i++]; \
> > > > > > +		} \
> > > > > > +	} else { \
> > > > > > +		for (i = 0; idx < size; i++, idx++)\
> > > > > > +			ring[idx] = obj[i]; \
> > > > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > > > +			ring[idx] = obj[i]; \
> > > > > > +	} \
> > > > > > +} while (0)
> > > > > > +
> > > > > > +/* the actual copy of pointers on the ring to obj_table.
> > > > > > + * Placed here since identical code needed in both
> > > > > > + * single and multi consumer dequeue functions.
> > > > > > + */
> > > > > > +#define DEQUEUE_PTRS_ELEM(r, ring_start, cons_head, obj_table, esize, n) do { \
> > > > > > +	if (esize == 4) \
> > > > > > +		DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n); \
> > > > > > +	else if (esize == 8) \
> > > > > > +		DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n); \
> > > > > > +	else if (esize == 16) \
> > > > > > +		DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table, n); \
> > > > > > +} while (0)
> > > > > > +
> > > > > > +#define DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n) do { \
> > > > > > +	unsigned int i; \
> > > > > > +	uint32_t idx = cons_head & (r)->mask; \
> > > > > > +	const uint32_t size = (r)->size; \
> > > > > > +	uint32_t *ring = (uint32_t *)ring_start; \
> > > > > > +	uint32_t *obj = (uint32_t *)obj_table; \
> > > > > > +	if (likely(idx + n < size)) { \
> > > > > > +		for (i = 0; i < (n & (~(unsigned)0x7)); i += 8, idx += 8) { \
> > > > > > +			obj[i] = ring[idx]; \
> > > > > > +			obj[i + 1] = ring[idx + 1]; \
> > > > > > +			obj[i + 2] = ring[idx + 2]; \
> > > > > > +			obj[i + 3] = ring[idx + 3]; \
> > > > > > +			obj[i + 4] = ring[idx + 4]; \
> > > > > > +			obj[i + 5] = ring[idx + 5]; \
> > > > > > +			obj[i + 6] = ring[idx + 6]; \
> > > > > > +			obj[i + 7] = ring[idx + 7]; \
> > > > > > +		} \
> > > > > > +		switch (n & 0x7) { \
> > > > > > +		case 7: \
> > > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +		case 6: \
> > > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +		case 5: \
> > > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +		case 4: \
> > > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +		case 3: \
> > > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +		case 2: \
> > > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +		case 1: \
> > > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +		} \
> > > > > > +	} else { \
> > > > > > +		for (i = 0; idx < size; i++, idx++) \
> > > > > > +			obj[i] = ring[idx]; \
> > > > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > > > +			obj[i] = ring[idx]; \
> > > > > > +	} \
> > > > > > +} while (0)
> > > > > > +
> > > > > > +#define DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n) do { \
> > > > > > +	unsigned int i; \
> > > > > > +	uint32_t idx = cons_head & (r)->mask; \
> > > > > > +	const uint32_t size = (r)->size; \
> > > > > > +	uint64_t *ring = (uint64_t *)ring_start; \
> > > > > > +	uint64_t *obj = (uint64_t *)obj_table; \
> > > > > > +	if (likely(idx + n < size)) { \
> > > > > > +		for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4) { \
> > > > > > +			obj[i] = ring[idx]; \
> > > > > > +			obj[i + 1] = ring[idx + 1]; \
> > > > > > +			obj[i + 2] = ring[idx + 2]; \
> > > > > > +			obj[i + 3] = ring[idx + 3]; \
> > > > > > +		} \
> > > > > > +		switch (n & 0x3) { \
> > > > > > +		case 3: \
> > > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +		case 2: \
> > > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +		case 1: \
> > > > > > +			obj[i++] = ring[idx++]; \
> > > > > > +		} \
> > > > > > +	} else { \
> > > > > > +		for (i = 0; idx < size; i++, idx++) \
> > > > > > +			obj[i] = ring[idx]; \
> > > > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > > > +			obj[i] = ring[idx]; \
> > > > > > +	} \
> > > > > > +} while (0)
> > > > > > +
> > > > > > +#define DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table, n) do { \
> > > > > > +	unsigned int i; \
> > > > > > +	uint32_t idx = cons_head & (r)->mask; \
> > > > > > +	const uint32_t size = (r)->size; \
> > > > > > +	__uint128_t *ring = (__uint128_t *)ring_start; \
> > > > > > +	__uint128_t *obj = (__uint128_t *)obj_table; \
> > > > > > +	if (likely(idx + n < size)) { \
> > > > > > +		for (i = 0; i < (n & ((~(unsigned)0x1))); i += 2, idx += 2) { \
> > > > > > +			obj[i] = ring[idx]; \
> > > > > > +			obj[i + 1] = ring[idx + 1]; \
> > > > > > +		} \
> > > > > > +		switch (n & 0x1) { \
> > > > > > +		case 1: \
> > > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +		} \
> > > > > > +	} else { \
> > > > > > +		for (i = 0; idx < size; i++, idx++) \
> > > > > > +			obj[i] = ring[idx]; \
> > > > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > > > +			obj[i] = ring[idx]; \
> > > > > > +	} \
> > > > > > +} while (0)
> > > > > > +
> > > > > > +/* Between load and load, there might be cpu reorder in weak model
> > > > > > + * (powerpc/arm).
> > > > > > + * There are 2 choices for the users:
> > > > > > + * 1. use rmb() memory barrier
> > > > > > + * 2. use one-direction load_acquire/store_release barrier,
> > > > > > + *    defined by CONFIG_RTE_USE_C11_MEM_MODEL=y
> > > > > > + * It depends on performance test results.
> > > > > > + * By default, move common functions to rte_ring_generic.h
> > > > > > + */
> > > > > > +#ifdef RTE_USE_C11_MEM_MODEL
> > > > > > +#include "rte_ring_c11_mem.h"
> > > > > > +#else
> > > > > > +#include "rte_ring_generic.h"
> > > > > > +#endif
> > > > > > +
> > > > > > +/**
> > > > > > + * @internal Enqueue several objects on the ring
> > > > > > + *
> > > > > > + * @param r
> > > > > > + *   A pointer to the ring structure.
> > > > > > + * @param obj_table
> > > > > > + *   A pointer to a table of void * pointers (objects).
> > > > > > + * @param esize
> > > > > > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > > > > > + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> > > > > > + *   as passed while creating the ring, otherwise the results are undefined.
> > > > > > + * @param n
> > > > > > + *   The number of objects to add in the ring from the obj_table.
> > > > > > + * @param behavior
> > > > > > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
> > > > > > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
> > > > > > + * @param is_sp
> > > > > > + *   Indicates whether to use single producer or multi-producer head update
> > > > > > + * @param free_space
> > > > > > + *   returns the amount of space after the enqueue operation has finished
> > > > > > + * @return
> > > > > > + *   Actual number of objects enqueued.
> > > > > > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > > > > > + */
> > > > > > +static __rte_always_inline unsigned int
> > > > > > +__rte_ring_do_enqueue_elem(struct rte_ring *r, void * const obj_table,
> > > > > > +		unsigned int esize, unsigned int n,
> > > > > > +		enum rte_ring_queue_behavior behavior, unsigned int is_sp,
> > > > > > +		unsigned int *free_space)
> > > >
> > > >
> > > > I like the idea to add esize as an argument to the public API, so
> > > > the compiler can do its job optimizing calls with constant esize.
> > > > Though I am not very happy with the rest of the implementation:
> > > > 1. It doesn't really provide configurable elem size - only 4/8/16B
> > > > elems are supported.
> > > Agree. I was thinking other sizes can be added on a need basis.
> > > However, I am wondering if we should just provide for 4B and then the
> > > users can use bulk operations to construct whatever they need?
> >
> > I suppose it could be plan B... if there is no agreement on the generic case.
> > And for 4B elems, I guess you do have a particular use-case?
> Yes
> 
> >
> > > It would mean extra work for the users.
> > >
> > > > 2. A lot of code duplication with these 3 copies of ENQUEUE/DEQUEUE
> > > > macros.
> > > >
> > > > Looking at the ENQUEUE/DEQUEUE macros, I can see that the main loop
> > > > always does a 32B copy per iteration.
> > > Yes, I tried to keep it the same as the existing one (originally, I
> > > guess the intention was to allow for 256b vector instructions to be
> > > generated)
> > >
> > > > So I wonder, can we make a generic function that would do a 32B copy
> > > > per iteration in a main loop, and copy the tail by 4B chunks?
> > > > That would avoid copy duplication and would allow the user to have any
> > > > elem size (multiple of 4B) he wants.
> > > > Something like this (note: I didn't test it, just a rough idea):
> > > >
> > > > static inline void
> > > > copy_elems(uint32_t du32[], const uint32_t su32[], uint32_t num,
> > > > 		uint32_t esize)
> > > > {
> > > >         uint32_t i, sz;
> > > >
> > > >         sz = (num * esize) / sizeof(uint32_t);
> > > If 'num' is a compile time constant, 'sz' will be a compile time constant.
> > > Otherwise, this will result in a multiplication operation.
> >
> > Not always.
> > If esize is a compile time constant, then for esize as a power of 2
> > (4, 8, 16, ...) it would be just one shift.
> > For other constant values it could be a 'mul', or in many cases just 2 shifts
> > plus an 'add' (if the compiler is smart enough).
> > I.e., let's say for a 24B elem it would be either num * 6 or
> > (num << 2) + (num << 1).
> With num * 15 it has to be (num << 3) + (num << 2) + (num << 1) + num
> Not sure if the compiler will do this.

For 15, it can be just (num << 4) - num
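
For example (a hypothetical helper, just to illustrate what a compile time
constant esize gives the compiler; not code from the patch):

static inline uint32_t
nb_words(uint32_t num)
{
	const uint32_t esize = 60;	/* 60B elem -> 15 words */

	/* esize is known at compile time, so gcc/clang typically lower
	 * the multiply to a couple of lea/shift instructions, no 'mul'
	 */
	return (num * esize) / sizeof(uint32_t);
}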

> 
> > I suppose for non-power-of-2 elems it might be ok to get such a small perf hit.
> Agree, should be ok not to focus on right now.
> 
> >
> > > I have tried to avoid the multiplication operation and tried to use
> > > shift and mask operations (just like the rest of the ring code does).
> > >
> > > >
> > > >         for (i = 0; i < (sz & ~7); i += 8)
> > > >                 memcpy(du32 + i, su32 + i, 8 * sizeof(uint32_t));
> > > I had used memcpy to start with (for the entire copy operation);
> > > performance is not the same for 64b elements when compared with the
> > > existing ring APIs (some cases more and some cases less).
> >
> > I remember that from one of your previous mails; that's why here I suggest
> > using memcpy() with a fixed size in a loop.
> > That way, for each iteration the compiler will replace memcpy() with
> > instructions to copy 32B in a way it thinks is optimal (same as for the
> > original macro, I think).
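
For reference, filling in the tail, the complete rough idea might look like
this (an untested sketch, assuming <stdint.h> and <string.h>; the 4B tail
loop is my guess, not code from any posted patch):

static inline void
copy_elems(uint32_t du32[], const uint32_t su32[], uint32_t num,
		uint32_t esize)
{
	uint32_t i, sz;

	/* total number of 4B words to copy */
	sz = (num * esize) / sizeof(uint32_t);

	/* main loop: fixed-size 32B memcpy() per iteration, which the
	 * compiler can inline as wide loads/stores
	 */
	for (i = 0; i < (sz & ~(uint32_t)7); i += 8)
		memcpy(du32 + i, su32 + i, 8 * sizeof(uint32_t));

	/* tail: copy the remaining words 4B at a time */
	for (; i < sz; i++)
		du32[i] = su32[i];
}
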
> I tried this. On x86 (Xeon(R) Gold 6132 CPU @ 2.60GHz), the results are as follows. The numbers in brackets are with the code on master.
> gcc (Ubuntu 7.4.0-1ubuntu1~18.04.1) 7.4.0
> 
> RTE>>ring_perf_elem_autotest
> ### Testing single element and burst enq/deq ###
> SP/SC single enq/dequeue: 5
> MP/MC single enq/dequeue: 40 (35)
> SP/SC burst enq/dequeue (size: 8): 2
> MP/MC burst enq/dequeue (size: 8): 6
> SP/SC burst enq/dequeue (size: 32): 1 (2)
> MP/MC burst enq/dequeue (size: 32): 2
> 
> ### Testing empty dequeue ###
> SC empty dequeue: 2.11
> MC empty dequeue: 1.41 (2.11)
> 
> ### Testing using a single lcore ###
> SP/SC bulk enq/dequeue (size: 8): 2.15 (2.86)
> MP/MC bulk enq/dequeue (size: 8): 6.35 (6.91)
> SP/SC bulk enq/dequeue (size: 32): 1.35 (2.06)
> MP/MC bulk enq/dequeue (size: 32): 2.38 (2.95)
> 
> ### Testing using two physical cores ###
> SP/SC bulk enq/dequeue (size: 8): 73.81 (15.33)
> MP/MC bulk enq/dequeue (size: 8): 75.10 (71.27)
> SP/SC bulk enq/dequeue (size: 32): 21.14 (9.58)
> MP/MC bulk enq/dequeue (size: 32): 25.74 (20.91)
> 
> ### Testing using two NUMA nodes ###
> SP/SC bulk enq/dequeue (size: 8): 164.32 (50.66)
> MP/MC bulk enq/dequeue (size: 8): 176.02 (173.43)
> SP/SC bulk enq/dequeue (size: 32): 50.78 (23)
> MP/MC bulk enq/dequeue (size: 32): 63.17 (46.74)
> 
> On one of the Arm platform
> MP/MC bulk enq/dequeue (size: 32): 0.37 (0.33) (~12% hit, the rest are ok)

So it shows better numbers for one core, but worse on 2, right?

> On another Arm platform, all numbers are same or slightly better.
> 
> I can post the patch with this change if you want to run some benchmarks on your platform.

Sure, please do.
I'll try to run on my boxes.

> I have not used the exact code you suggested; instead, I have used the same
> logic in a single macro with memcpy.
> 
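
To make sure we are talking about the same shape, I'd guess such a single
macro looks roughly like this (my sketch, untested, not your actual code;
indexing in 4B words is my assumption):

#define ENQUEUE_PTRS_GEN(r, ring_start, prod_head, obj_table, esize, n) do { \
	unsigned int i; \
	/* work in units of 4B words: scale index, size and count */ \
	const uint32_t scale = (esize) / sizeof(uint32_t); \
	const uint32_t wsize = (r)->size * scale; \
	uint32_t widx = ((prod_head) & (r)->mask) * scale; \
	uint32_t wn = (n) * scale; \
	uint32_t *ring = (uint32_t *)(ring_start); \
	const uint32_t *obj = (const uint32_t *)(obj_table); \
	if (likely(widx + wn < wsize)) { \
		/* fixed 32B memcpy() per iteration, 4B copies for the tail */ \
		for (i = 0; i < (wn & ~(uint32_t)7); i += 8, widx += 8) \
			memcpy(ring + widx, obj + i, 8 * sizeof(uint32_t)); \
		for (; i < wn; i++, widx++) \
			ring[widx] = obj[i]; \
	} else { \
		for (i = 0; widx < wsize; i++, widx++) \
			ring[widx] = obj[i]; \
		for (widx = 0; i < wn; i++, widx++) \
			ring[widx] = obj[i]; \
	} \
} while (0)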


