[dpdk-dev] [PATCH v9 2/6] lib/ring: apis to support configurable element size

Honnappa Nagarahalli Honnappa.Nagarahalli at arm.com
Fri Jan 17 17:45:17 CET 2020


<snip>

> 
> Hi Honnappa,
Thanks Olivier for your review, appreciate your feedback.

> 
> On Wed, Jan 15, 2020 at 11:25:07PM -0600, Honnappa Nagarahalli wrote:
> > Current APIs assume ring elements to be pointers. However, in many use
> > cases, the size can be different. Add new APIs to support configurable
> > ring element sizes.
> >
> > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli at arm.com>
> > Reviewed-by: Dharmik Thakkar <dharmik.thakkar at arm.com>
> > Reviewed-by: Gavin Hu <gavin.hu at arm.com>
> > Reviewed-by: Ruifeng Wang <ruifeng.wang at arm.com>
> > ---
> >  lib/librte_ring/Makefile             |    3 +-
> >  lib/librte_ring/meson.build          |    4 +
> >  lib/librte_ring/rte_ring.c           |   41 +-
> >  lib/librte_ring/rte_ring.h           |    1 +
> >  lib/librte_ring/rte_ring_elem.h      | 1003 ++++++++++++++++++++++++++
> >  lib/librte_ring/rte_ring_version.map |    2 +
> >  6 files changed, 1045 insertions(+), 9 deletions(-)
> >  create mode 100644 lib/librte_ring/rte_ring_elem.h
> >
> 
> [...]
> 
> > +static __rte_always_inline void
> > +enqueue_elems_32(struct rte_ring *r, const uint32_t size, uint32_t idx,
> > +		const void *obj_table, uint32_t n)
> > +{
> > +	unsigned int i;
> > +	uint32_t *ring = (uint32_t *)&r[1];
> > +	const uint32_t *obj = (const uint32_t *)obj_table;
> > +	if (likely(idx + n < size)) {
> > +		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
> > +			ring[idx] = obj[i];
> > +			ring[idx + 1] = obj[i + 1];
> > +			ring[idx + 2] = obj[i + 2];
> > +			ring[idx + 3] = obj[i + 3];
> > +			ring[idx + 4] = obj[i + 4];
> > +			ring[idx + 5] = obj[i + 5];
> > +			ring[idx + 6] = obj[i + 6];
> > +			ring[idx + 7] = obj[i + 7];
> > +		}
> > +		switch (n & 0x7) {
> > +		case 7:
> > +			ring[idx++] = obj[i++]; /* fallthrough */
> > +		case 6:
> > +			ring[idx++] = obj[i++]; /* fallthrough */
> > +		case 5:
> > +			ring[idx++] = obj[i++]; /* fallthrough */
> > +		case 4:
> > +			ring[idx++] = obj[i++]; /* fallthrough */
> > +		case 3:
> > +			ring[idx++] = obj[i++]; /* fallthrough */
> > +		case 2:
> > +			ring[idx++] = obj[i++]; /* fallthrough */
> > +		case 1:
> > +			ring[idx++] = obj[i++]; /* fallthrough */
> > +		}
> > +	} else {
> > +		for (i = 0; idx < size; i++, idx++)
> > +			ring[idx] = obj[i];
> > +		/* Start at the beginning */
> > +		for (idx = 0; i < n; i++, idx++)
> > +			ring[idx] = obj[i];
> > +	}
> > +}
> > +
> > +static __rte_always_inline void
> > +enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
> > +		const void *obj_table, uint32_t n)
> > +{
> > +	unsigned int i;
> > +	const uint32_t size = r->size;
> > +	uint32_t idx = prod_head & r->mask;
> > +	uint64_t *ring = (uint64_t *)&r[1];
> > +	const uint64_t *obj = (const uint64_t *)obj_table;
> > +	if (likely(idx + n < size)) {
> > +		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
> > +			ring[idx] = obj[i];
> > +			ring[idx + 1] = obj[i + 1];
> > +			ring[idx + 2] = obj[i + 2];
> > +			ring[idx + 3] = obj[i + 3];
> > +		}
> > +		switch (n & 0x3) {
> > +		case 3:
> > +			ring[idx++] = obj[i++]; /* fallthrough */
> > +		case 2:
> > +			ring[idx++] = obj[i++]; /* fallthrough */
> > +		case 1:
> > +			ring[idx++] = obj[i++];
> > +		}
> > +	} else {
> > +		for (i = 0; idx < size; i++, idx++)
> > +			ring[idx] = obj[i];
> > +		/* Start at the beginning */
> > +		for (idx = 0; i < n; i++, idx++)
> > +			ring[idx] = obj[i];
> > +	}
> > +}
> > +
> > +static __rte_always_inline void
> > +enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
> > +		const void *obj_table, uint32_t n)
> > +{
> > +	unsigned int i;
> > +	const uint32_t size = r->size;
> > +	uint32_t idx = prod_head & r->mask;
> > +	rte_int128_t *ring = (rte_int128_t *)&r[1];
> > +	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
> > +	if (likely(idx + n < size)) {
> > +		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
> > +			memcpy((void *)(ring + idx),
> > +				(const void *)(obj + i), 32);
> > +		switch (n & 0x1) {
> > +		case 1:
> > +			memcpy((void *)(ring + idx),
> > +				(const void *)(obj + i), 16);
> > +		}
> > +	} else {
> > +		for (i = 0; idx < size; i++, idx++)
> > +			memcpy((void *)(ring + idx),
> > +				(const void *)(obj + i), 16);
> > +		/* Start at the beginning */
> > +		for (idx = 0; i < n; i++, idx++)
> > +			memcpy((void *)(ring + idx),
> > +				(const void *)(obj + i), 16);
> > +	}
> > +}
> > +
> > +/* the actual enqueue of elements on the ring.
> > + * Placed here since identical code needed in both
> > + * single and multi producer enqueue functions.
> > + */
> > +static __rte_always_inline void
> > +enqueue_elems(struct rte_ring *r, uint32_t prod_head, const void *obj_table,
> > +		uint32_t esize, uint32_t num)
> > +{
> > +	/* 8B and 16B copies implemented individually to retain
> > +	 * the current performance.
> > +	 */
> > +	if (esize == 8)
> > +		enqueue_elems_64(r, prod_head, obj_table, num);
> > +	else if (esize == 16)
> > +		enqueue_elems_128(r, prod_head, obj_table, num);
> > +	else {
> > +		uint32_t idx, scale, nr_idx, nr_num, nr_size;
> > +
> > +		/* Normalize to uint32_t */
> > +		scale = esize / sizeof(uint32_t);
> > +		nr_num = num * scale;
> > +		idx = prod_head & r->mask;
> > +		nr_idx = idx * scale;
> > +		nr_size = r->size * scale;
> > +		enqueue_elems_32(r, nr_size, nr_idx, obj_table, nr_num);
> > +	}
> > +}
> 
> Following Konstantin's comment on v7, enqueue_elems_128() was modified to
> ensure it won't crash if the object is unaligned. Are we sure that this same
> problem cannot also occur with 64b copies on all supported architectures? (I
> mean a 64b access that is only aligned on 32b)
Konstantin mentioned that the 64b load/store instructions on x86 can
handle unaligned access. On aarch64, the normal (non-atomic) load/store
instructions, which are what this code uses, also handle unaligned
access.
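
For illustration only (this is not part of the patch): the usual
alignment-safe fallback is a memcpy-based access, which compilers lower
to a single unaligned load/store on ISAs that allow one:

	#include <stdint.h>
	#include <string.h>

	/* Alignment-safe 64b load: memcpy makes no alignment
	 * assumption; on x86 and aarch64 it compiles down to one
	 * plain (unaligned-capable) load instruction.
	 */
	static inline uint64_t
	load64_unaligned(const void *p)
	{
		uint64_t v;

		memcpy(&v, p, sizeof(v));
		return v;
	}

	/* Direct dereference: the compiler may assume the pointer
	 * is 8B aligned, which can trap on architectures that
	 * enforce natural alignment.
	 */
	static inline uint64_t
	load64_direct(const void *p)
	{
		return *(const uint64_t *)p;
	}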

+ David Christensen to comment for PPC

> 
> Out of curiosity, would it make a big perf difference to only use
> enqueue_elems_32()?
Yes, using the 32b copy loop had a significant performance impact for
128b elements; I did not try it for 64b elements.
I will run the perf test with the 32b copy for the 64b element size and
get back. A sketch of the comparison I have in mind is below.
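
Illustrative only (a hypothetical harness, not the actual
test_ring_perf code; ITERATIONS and BULK_SIZE are made-up values),
assuming the *_elem API as posted in this series:

	#include <rte_cycles.h>
	#include <rte_ring_elem.h>

	#define ITERATIONS 4096
	#define BULK_SIZE 32

	/* Average cycles per 8B element for one bulk
	 * enqueue/dequeue round trip on ring r.
	 */
	static uint64_t
	cycles_per_elem_64b(struct rte_ring *r)
	{
		uint64_t objs[BULK_SIZE];
		uint64_t start, end;
		unsigned int i;

		start = rte_rdtsc();
		for (i = 0; i < ITERATIONS; i++) {
			rte_ring_enqueue_bulk_elem(r, objs,
				sizeof(objs[0]), BULK_SIZE, NULL);
			rte_ring_dequeue_bulk_elem(r, objs,
				sizeof(objs[0]), BULK_SIZE, NULL);
		}
		end = rte_rdtsc();
		return (end - start) / ((uint64_t)ITERATIONS * BULK_SIZE);
	}

Running this once with the current dispatch and once with the
esize == 8 special case removed from enqueue_elems() (so 8B elements
go through enqueue_elems_32()) would show the cost of the 32b loop.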
