[dpdk-dev] [PATCH v9 2/6] lib/ring: apis to support configurable element size

Ananyev, Konstantin konstantin.ananyev at intel.com
Sat Jan 18 13:32:23 CET 2020


> > On Wed, Jan 15, 2020 at 11:25:07PM -0600, Honnappa Nagarahalli wrote:
> > > Current APIs assume ring elements to be pointers. However, in many use
> > > cases, the size can be different. Add new APIs to support configurable
> > > ring element sizes.
> > >
> > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli at arm.com>
> > > Reviewed-by: Dharmik Thakkar <dharmik.thakkar at arm.com>
> > > Reviewed-by: Gavin Hu <gavin.hu at arm.com>
> > > Reviewed-by: Ruifeng Wang <ruifeng.wang at arm.com>
> > > ---
> > >  lib/librte_ring/Makefile             |    3 +-
> > >  lib/librte_ring/meson.build          |    4 +
> > >  lib/librte_ring/rte_ring.c           |   41 +-
> > >  lib/librte_ring/rte_ring.h           |    1 +
> > >  lib/librte_ring/rte_ring_elem.h      | 1003 ++++++++++++++++++++++++++
> > >  lib/librte_ring/rte_ring_version.map |    2 +
> > >  6 files changed, 1045 insertions(+), 9 deletions(-)
> > >  create mode 100644 lib/librte_ring/rte_ring_elem.h
> > >
> >
> > [...]
> >
> > > +static __rte_always_inline void
> > > +enqueue_elems_32(struct rte_ring *r, const uint32_t size, uint32_t idx,
> > > +		const void *obj_table, uint32_t n)
> > > +{
> > > +	unsigned int i;
> > > +	uint32_t *ring = (uint32_t *)&r[1];
> > > +	const uint32_t *obj = (const uint32_t *)obj_table;
> > > +	if (likely(idx + n < size)) {
> > > +		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
> > > +			ring[idx] = obj[i];
> > > +			ring[idx + 1] = obj[i + 1];
> > > +			ring[idx + 2] = obj[i + 2];
> > > +			ring[idx + 3] = obj[i + 3];
> > > +			ring[idx + 4] = obj[i + 4];
> > > +			ring[idx + 5] = obj[i + 5];
> > > +			ring[idx + 6] = obj[i + 6];
> > > +			ring[idx + 7] = obj[i + 7];
> > > +		}
> > > +		switch (n & 0x7) {
> > > +		case 7:
> > > +			ring[idx++] = obj[i++]; /* fallthrough */
> > > +		case 6:
> > > +			ring[idx++] = obj[i++]; /* fallthrough */
> > > +		case 5:
> > > +			ring[idx++] = obj[i++]; /* fallthrough */
> > > +		case 4:
> > > +			ring[idx++] = obj[i++]; /* fallthrough */
> > > +		case 3:
> > > +			ring[idx++] = obj[i++]; /* fallthrough */
> > > +		case 2:
> > > +			ring[idx++] = obj[i++]; /* fallthrough */
> > > +		case 1:
> > > +			ring[idx++] = obj[i++]; /* fallthrough */
> > > +		}
> > > +	} else {
> > > +		for (i = 0; idx < size; i++, idx++)
> > > +			ring[idx] = obj[i];
> > > +		/* Start at the beginning */
> > > +		for (idx = 0; i < n; i++, idx++)
> > > +			ring[idx] = obj[i];
> > > +	}
> > > +}
> > > +
> > > +static __rte_always_inline void
> > > +enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
> > > +		const void *obj_table, uint32_t n)
> > > +{
> > > +	unsigned int i;
> > > +	const uint32_t size = r->size;
> > > +	uint32_t idx = prod_head & r->mask;
> > > +	uint64_t *ring = (uint64_t *)&r[1];
> > > +	const uint64_t *obj = (const uint64_t *)obj_table;
> > > +	if (likely(idx + n < size)) {
> > > +		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
> > > +			ring[idx] = obj[i];
> > > +			ring[idx + 1] = obj[i + 1];
> > > +			ring[idx + 2] = obj[i + 2];
> > > +			ring[idx + 3] = obj[i + 3];
> > > +		}
> > > +		switch (n & 0x3) {
> > > +		case 3:
> > > +			ring[idx++] = obj[i++]; /* fallthrough */
> > > +		case 2:
> > > +			ring[idx++] = obj[i++]; /* fallthrough */
> > > +		case 1:
> > > +			ring[idx++] = obj[i++];
> > > +		}
> > > +	} else {
> > > +		for (i = 0; idx < size; i++, idx++)
> > > +			ring[idx] = obj[i];
> > > +		/* Start at the beginning */
> > > +		for (idx = 0; i < n; i++, idx++)
> > > +			ring[idx] = obj[i];
> > > +	}
> > > +}
> > > +
> > > +static __rte_always_inline void
> > > +enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
> > > +		const void *obj_table, uint32_t n)
> > > +{
> > > +	unsigned int i;
> > > +	const uint32_t size = r->size;
> > > +	uint32_t idx = prod_head & r->mask;
> > > +	rte_int128_t *ring = (rte_int128_t *)&r[1];
> > > +	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
> > > +	if (likely(idx + n < size)) {
> > > +		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
> > > +			memcpy((void *)(ring + idx),
> > > +				(const void *)(obj + i), 32);
> > > +		switch (n & 0x1) {
> > > +		case 1:
> > > +			memcpy((void *)(ring + idx),
> > > +				(const void *)(obj + i), 16);
> > > +		}
> > > +	} else {
> > > +		for (i = 0; idx < size; i++, idx++)
> > > +			memcpy((void *)(ring + idx),
> > > +				(const void *)(obj + i), 16);
> > > +		/* Start at the beginning */
> > > +		for (idx = 0; i < n; i++, idx++)
> > > +			memcpy((void *)(ring + idx),
> > > +				(const void *)(obj + i), 16);
> > > +	}
> > > +}
> > > +
> > > +/* the actual enqueue of elements on the ring.
> > > + * Placed here since identical code needed in both
> > > + * single and multi producer enqueue functions.
> > > + */
> > > +static __rte_always_inline void
> > > +enqueue_elems(struct rte_ring *r, uint32_t prod_head, const void *obj_table,
> > > +		uint32_t esize, uint32_t num)
> > > +{
> > > +	/* 8B and 16B copies implemented individually to retain
> > > +	 * the current performance.
> > > +	 */
> > > +	if (esize == 8)
> > > +		enqueue_elems_64(r, prod_head, obj_table, num);
> > > +	else if (esize == 16)
> > > +		enqueue_elems_128(r, prod_head, obj_table, num);
> > > +	else {
> > > +		uint32_t idx, scale, nr_idx, nr_num, nr_size;
> > > +
> > > +		/* Normalize to uint32_t */
> > > +		scale = esize / sizeof(uint32_t);
> > > +		nr_num = num * scale;
> > > +		idx = prod_head & r->mask;
> > > +		nr_idx = idx * scale;
> > > +		nr_size = r->size * scale;
> > > +		enqueue_elems_32(r, nr_size, nr_idx, obj_table, nr_num);
> > > +	}
> > > +}
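
Just to illustrate the normalization above with concrete numbers, a tiny
standalone sketch (not part of the patch): a ring created with esize = 24
and count = 1024 is addressed as 6 uint32_t slots per element.

#include <stdint.h>
#include <stdio.h>

int
main(void)
{
	uint32_t esize = 24, count = 1024, mask = count - 1;
	uint32_t prod_head = 1020, num = 10;

	uint32_t scale = esize / sizeof(uint32_t);    /* 6 uint32_t slots per element */
	uint32_t nr_num = num * scale;                /* 60 uint32_t values to copy   */
	uint32_t nr_idx = (prod_head & mask) * scale; /* 6120: first uint32_t slot    */
	uint32_t nr_size = count * scale;             /* 6144: ring size in slots     */

	/* enqueue_elems_32() copies 60 uint32_t starting at slot 6120; since
	 * 6120 + 60 >= 6144 it takes the wrap-around branch: 24 slots
	 * (4 whole elements) up to the end, then the remaining 36 from slot 0.
	 */
	printf("%u %u %u\n", nr_num, nr_idx, nr_size);
	return 0;
}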
> >
> > Following Konstantin's comment on v7, enqueue_elems_128() was modified to
> > ensure it won't crash if the object is unaligned. Are we sure that this same
> > problem cannot also occur with 64b copies on all supported architectures? (I
> > mean a 64b access that is only aligned on 32b.)
> Konstantin mentioned that the 64b load/store instructions on x86 can handle unaligned access.

Yep, I think we are ok here for IA and IA-32.
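
For the unaligned case in general, the portable way to express a single
element copy is a fixed-size memcpy(), which the compiler lowers to one
(possibly unaligned) 8-byte load/store on architectures that allow it.
A minimal sketch of the idiom, just for illustration (not taken from the
patch):

#include <stdint.h>
#include <string.h>

/* Copy one 64b element from a source that may only be 32b aligned.
 * The fixed-size memcpy() is compiled down to a single 8-byte
 * load/store where the ISA supports unaligned access, so there is no
 * function-call overhead in practice.
 */
static inline void
copy_elem_64(uint64_t *dst, const void *src)
{
	uint64_t tmp;

	memcpy(&tmp, src, sizeof(tmp)); /* no alignment assumption on src */
	*dst = tmp;
}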

> On aarch64, the load/store instructions (the non-atomic ones, which are
> what will be used in this case) can handle unaligned access.
> 
> + David Christensen to comment for PPC

If we are in doubt here, it is probably worth adding a new test case (or cases) to the UT?
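
Something along these lines, perhaps (untested sketch; it uses the
rte_ring_create_elem()/rte_ring_*_bulk_elem() APIs from this series and
deliberately feeds them a source buffer that is only 4B aligned):

#include <stdint.h>
#include <string.h>
#include <rte_memory.h>
#include <rte_malloc.h>
#include <rte_ring_elem.h>

#define UT_NUM_ELEMS 8

/* Enqueue 64b elements from a 4B-aligned (but not 8B-aligned) buffer and
 * check they come back intact, to catch any alignment assumption in the
 * copy paths.
 */
static int
test_ring_elem_unaligned_64(void)
{
	struct rte_ring *r;
	uint8_t *buf = NULL, *src;
	uint64_t val, dst[UT_NUM_ELEMS];
	unsigned int i;
	int ret = -1;

	r = rte_ring_create_elem("ut_unaligned_64", sizeof(uint64_t), 1024,
			SOCKET_ID_ANY, 0);
	if (r == NULL)
		return -1;

	/* allocate with 8B alignment, then offset by 4B to misalign */
	buf = rte_zmalloc(NULL, UT_NUM_ELEMS * sizeof(uint64_t) + 4, 8);
	if (buf == NULL)
		goto end;
	src = buf + 4;

	for (i = 0; i < UT_NUM_ELEMS; i++) {
		val = i;
		memcpy(src + i * sizeof(uint64_t), &val, sizeof(val));
	}

	if (rte_ring_enqueue_bulk_elem(r, src, sizeof(uint64_t),
			UT_NUM_ELEMS, NULL) != UT_NUM_ELEMS)
		goto end;
	if (rte_ring_dequeue_bulk_elem(r, dst, sizeof(uint64_t),
			UT_NUM_ELEMS, NULL) != UT_NUM_ELEMS)
		goto end;

	for (i = 0; i < UT_NUM_ELEMS; i++)
		if (dst[i] != i)
			goto end;
	ret = 0;
end:
	rte_free(buf);
	rte_ring_free(r);
	return ret;
}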

> 
> >
> > Out of curiosity, would it make a big perf difference to only use
> > enqueue_elems_32()?
> Yes, it had a significant impact on 128b elements. I did not try it on 64b elements.
> I will run the perf test with the 32b copy for the 64b element size and get back.
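
FWIW, something like the below should be enough to get a cycles/element
number for that comparison (rough, untested sketch; run it once as is and
once with the esize == 8 shortcut in enqueue_elems() (and its dequeue
counterpart) commented out, so the 64b elements go through the 32b copy
path):

#include <stdio.h>
#include <stdint.h>
#include <rte_cycles.h>
#include <rte_ring_elem.h>

#define PERF_ITERATIONS 10000000
#define PERF_BURST 32

/* Average cycles per element for one enqueue + one dequeue of 8B
 * elements on a single core.  The ring 'r' is assumed to be created
 * with esize = 8 and room for at least PERF_BURST elements.
 */
static void
perf_elem_64(struct rte_ring *r)
{
	uint64_t burst[PERF_BURST] = {0};
	uint64_t start, cycles;
	unsigned int i;

	start = rte_rdtsc();
	for (i = 0; i < PERF_ITERATIONS; i++) {
		rte_ring_sp_enqueue_bulk_elem(r, burst, sizeof(uint64_t),
				PERF_BURST, NULL);
		rte_ring_sc_dequeue_bulk_elem(r, burst, sizeof(uint64_t),
				PERF_BURST, NULL);
	}
	cycles = rte_rdtsc() - start;

	printf("cycles/element: %.2f\n", (double)cycles /
			((double)PERF_ITERATIONS * PERF_BURST));
}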

