[dpdk-dev] [PATCH v7 02/17] lib/ring: apis to support configurable element size

Ananyev, Konstantin konstantin.ananyev at intel.com
Thu Jan 2 17:42:31 CET 2020


> diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h
> new file mode 100644
> index 000000000..fc7fe127c
> --- /dev/null
> +++ b/lib/librte_ring/rte_ring_elem.h
> @@ -0,0 +1,1002 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2019 Arm Limited
> + * Copyright (c) 2010-2017 Intel Corporation
> + * Copyright (c) 2007-2009 Kip Macy kmacy at freebsd.org
> + * All rights reserved.
> + * Derived from FreeBSD's bufring.h
> + * Used as BSD-3 Licensed with permission from Kip Macy.
> + */
> +
> +#ifndef _RTE_RING_ELEM_H_
> +#define _RTE_RING_ELEM_H_
> +
> +/**
> + * @file
> + * RTE Ring with user defined element size
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <stdio.h>
> +#include <stdint.h>
> +#include <sys/queue.h>
> +#include <errno.h>
> +#include <rte_common.h>
> +#include <rte_config.h>
> +#include <rte_memory.h>
> +#include <rte_lcore.h>
> +#include <rte_atomic.h>
> +#include <rte_branch_prediction.h>
> +#include <rte_memzone.h>
> +#include <rte_pause.h>
> +
> +#include "rte_ring.h"
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Calculate the memory size needed for a ring with a given element size.
> + *
> + * This function returns the number of bytes needed for a ring, given
> + * the number of elements in it and the size of each element. This value
> + * is the sum of the size of the structure rte_ring and the size of the
> + * memory needed for storing the elements. The value is aligned to a cache
> + * line size.
> + *
> + * @param esize
> + *   The size of a ring element, in bytes. It must be a multiple of 4.
> + * @param count
> + *   The number of elements in the ring (must be a power of 2).
> + * @return
> + *   - The memory size, in bytes, needed for the ring on success.
> + *   - -EINVAL if esize is not a multiple of 4 or count is not a
> + *     power of 2.
> + */
> +__rte_experimental
> +ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Create a new ring named *name* that stores elements with given size.
> + *
> + * This function uses ``memzone_reserve()`` to allocate memory. Then it
> + * calls rte_ring_init() to initialize an empty ring.
> + *
> + * The new ring size is set to *count*, which must be a power of
> + * two. Water marking is disabled by default. The real usable ring size
> + * is *count-1* instead of *count* to differentiate a full ring from an
> + * empty ring.
> + *
> + * The ring is added in RTE_TAILQ_RING list.
> + *
> + * @param name
> + *   The name of the ring.
> + * @param esize
> + *   The size of a ring element, in bytes. It must be a multiple of 4.
> + * @param count
> + *   The number of elements in the ring (must be a power of 2).
> + * @param socket_id
> + *   The *socket_id* argument is the socket identifier in case of
> + *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
> + *   constraint for the reserved zone.
> + * @param flags
> + *   An OR of the following:
> + *    - RING_F_SP_ENQ: If this flag is set, the default behavior when
> + *      using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
> + *      is "single-producer". Otherwise, it is "multi-producers".
> + *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
> + *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
> + *      is "single-consumer". Otherwise, it is "multi-consumers".
> + * @return
> + *   On success, the pointer to the newly allocated ring. NULL on error with
> + *    rte_errno set appropriately. Possible errno values include:
> + *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
> + *    - E_RTE_SECONDARY - function was called from a secondary process instance
> + *    - EINVAL - esize is not a multiple of 4 or count provided is not a
> + *      power of 2.
> + *    - ENOSPC - the maximum number of memzones has already been allocated
> + *    - EEXIST - a memzone with the same name already exists
> + *    - ENOMEM - no appropriate memory area found in which to create memzone
> + */
> +__rte_experimental
> +struct rte_ring *rte_ring_create_elem(const char *name, unsigned int esize,
> +			unsigned int count, int socket_id, unsigned int flags);
> +/**
> + * Copy n 32-bit words from obj_table into the ring storage, starting at
> + * word index idx.
> + *
> + * The storage is addressed as an array of uint32_t that begins right after
> + * the struct rte_ring header (&r[1]). If idx + n is below r->size the copy
> + * is done in one pass, unrolled eight words at a time with a switch for
> + * the remaining 0-7 words. Otherwise words are copied until idx reaches
> + * r->size and the rest is written starting from index 0.
> + *
> + * NOTE(review): enqueue_elems() passes idx and n already multiplied by
> + * esize / 4, while the limit used here is the unscaled r->size. For
> + * element sizes above 4 bytes the two appear to be in different units, so
> + * the wrap point looks wrong - confirm, and consider passing the scaled
> + * size in from the caller.
> + *
> + * @param r
> + *   Ring whose storage is written.
> + * @param idx
> + *   Index of the first 32-bit word to write.
> + * @param obj_table
> + *   Source buffer, read as an array of uint32_t.
> + * @param n
> + *   Number of 32-bit words to copy.
> + */
> +static __rte_always_inline void
> +enqueue_elems_32(struct rte_ring *r, uint32_t idx,
> +		const void *obj_table, uint32_t n)
> +{
> +	unsigned int i;
> +	const uint32_t size = r->size;
> +	uint32_t *ring = (uint32_t *)&r[1];
> +	const uint32_t *obj = (const uint32_t *)obj_table;
> +	if (likely(idx + n < size)) { /* run ends before the limit: one pass */
> +		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
> +			ring[idx] = obj[i];
> +			ring[idx + 1] = obj[i + 1];
> +			ring[idx + 2] = obj[i + 2];
> +			ring[idx + 3] = obj[i + 3];
> +			ring[idx + 4] = obj[i + 4];
> +			ring[idx + 5] = obj[i + 5];
> +			ring[idx + 6] = obj[i + 6];
> +			ring[idx + 7] = obj[i + 7];
> +		}
> +		switch (n & 0x7) { /* copy the 0-7 words left after the unrolled loop */
> +		case 7:
> +			ring[idx++] = obj[i++]; /* fallthrough */
> +		case 6:
> +			ring[idx++] = obj[i++]; /* fallthrough */
> +		case 5:
> +			ring[idx++] = obj[i++]; /* fallthrough */
> +		case 4:
> +			ring[idx++] = obj[i++]; /* fallthrough */
> +		case 3:
> +			ring[idx++] = obj[i++]; /* fallthrough */
> +		case 2:
> +			ring[idx++] = obj[i++]; /* fallthrough */
> +		case 1:
> +			ring[idx++] = obj[i++]; /* last case, nothing to fall into */
> +		}
> +	} else {
> +		/* Fill up to the limit first; i keeps counting the words copied */
> +		for (i = 0; idx < size; i++, idx++)
> +			ring[idx] = obj[i];
> +		/* Start at the beginning and copy whatever is left */
> +		for (idx = 0; i < n; i++, idx++)
> +			ring[idx] = obj[i];
> +	}
> +}
> +/**
> + * Copy n 64-bit elements from obj_table into the ring storage.
> + *
> + * The storage is addressed as an array of uint64_t that begins right after
> + * the struct rte_ring header (&r[1]); the first slot written is
> + * prod_head & r->mask. If the run ends below r->size it is copied in one
> + * pass, unrolled four elements at a time with a switch for the remaining
> + * 0-3 elements. Otherwise slots are filled up to r->size and the rest is
> + * written starting from slot 0.
> + *
> + * @param r
> + *   Ring whose storage is written.
> + * @param prod_head
> + *   Producer head value; masked with r->mask to get the first slot.
> + * @param obj_table
> + *   Source buffer, read as an array of uint64_t.
> + * @param n
> + *   Number of 64-bit elements to copy.
> + */
> +static __rte_always_inline void
> +enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
> +		const void *obj_table, uint32_t n)
> +{
> +	unsigned int i;
> +	const uint32_t size = r->size;
> +	uint32_t idx = prod_head & r->mask;
> +	uint64_t *ring = (uint64_t *)&r[1];
> +	const uint64_t *obj = (const uint64_t *)obj_table;
> +	if (likely(idx + n < size)) { /* run ends before the limit: one pass */
> +		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
> +			ring[idx] = obj[i];
> +			ring[idx + 1] = obj[i + 1];
> +			ring[idx + 2] = obj[i + 2];
> +			ring[idx + 3] = obj[i + 3];
> +		}
> +		switch (n & 0x3) { /* copy the 0-3 elements left after the unrolled loop */
> +		case 3:
> +			ring[idx++] = obj[i++]; /* fallthrough */
> +		case 2:
> +			ring[idx++] = obj[i++]; /* fallthrough */
> +		case 1:
> +			ring[idx++] = obj[i++];
> +		}
> +	} else {
> +		/* Fill up to the limit first; i keeps counting the elements copied */
> +		for (i = 0; idx < size; i++, idx++)
> +			ring[idx] = obj[i];
> +		/* Start at the beginning and copy whatever is left */
> +		for (idx = 0; i < n; i++, idx++)
> +			ring[idx] = obj[i];
> +	}
> +}
> +/**
> + * Copy n 128-bit elements from obj_table into the ring storage.
> + *
> + * The storage is addressed as an array of __uint128_t that begins right
> + * after the struct rte_ring header (&r[1]); the first slot written is
> + * prod_head & r->mask. If the run ends below r->size it is copied in one
> + * pass, two elements per iteration plus one trailing element when n is
> + * odd. Otherwise slots are filled up to r->size and the rest is written
> + * starting from slot 0.
> + *
> + * NOTE(review): obj_table is read through a const __uint128_t pointer, so
> + * it presumably has to be suitably aligned for that type (16 bytes on
> + * common ABIs). Nothing in this function checks it - confirm that every
> + * caller guarantees the alignment, or copy with memcpy instead.
> + *
> + * @param r
> + *   Ring whose storage is written.
> + * @param prod_head
> + *   Producer head value; masked with r->mask to get the first slot.
> + * @param obj_table
> + *   Source buffer, read as an array of __uint128_t.
> + * @param n
> + *   Number of 128-bit elements to copy.
> + */
> +static __rte_always_inline void
> +enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
> +		const void *obj_table, uint32_t n)
> +{
> +	unsigned int i;
> +	const uint32_t size = r->size;
> +	uint32_t idx = prod_head & r->mask;
> +	__uint128_t *ring = (__uint128_t *)&r[1];
> +	const __uint128_t *obj = (const __uint128_t *)obj_table;
> +	if (likely(idx + n < size)) { /* run ends before the limit: one pass */
> +		for (i = 0; i < (n & ~0x1); i += 2, idx += 2) {
> +			ring[idx] = obj[i];
> +			ring[idx + 1] = obj[i + 1];


AFAIK, that implies 16B aligned obj_table...
Would it always be the case?  

> +		}
> +		switch (n & 0x1) { /* copy the single element left when n is odd */
> +		case 1:
> +			ring[idx++] = obj[i++];
> +		}
> +	} else {
> +		/* Fill up to the limit first; i keeps counting the elements copied */
> +		for (i = 0; idx < size; i++, idx++)
> +			ring[idx] = obj[i];
> +		/* Start at the beginning and copy whatever is left */
> +		for (idx = 0; i < n; i++, idx++)
> +			ring[idx] = obj[i];
> +	}
> +}
> +
> +/* The actual enqueue of elements on the ring.
> + * Placed here since identical code is needed in both the
> + * single and multi producer enqueue functions.
> + *
> + * Dispatches on esize: 8 goes to enqueue_elems_64(), 16 goes to
> + * enqueue_elems_128(), and any other size is expressed in 32-bit words
> + * and handed to enqueue_elems_32().
> + *
> + * r is the destination ring, prod_head the producer head value used to
> + * locate the first slot, obj_table the source buffer, esize the element
> + * size in bytes and num the number of elements to copy.
> + *
> + * NOTE(review): in the generic path the start index and the count are
> + * multiplied by esize / 4, but enqueue_elems_32() still compares them
> + * against the unscaled r->size. For esize above 4 the wrap point looks
> + * wrong - confirm, and consider passing r->size * scale to the helper.
> + * NOTE(review): esize / sizeof(uint32_t) truncates, so this path assumes
> + * esize is a multiple of 4; presumably validated at ring creation.
> + */
> +static __rte_always_inline void
> +enqueue_elems(struct rte_ring *r, uint32_t prod_head, const void *obj_table,
> +		uint32_t esize, uint32_t num)
> +{
> +	uint32_t idx, nr_idx, nr_num;
> +
> +	/* 8B and 16B copies implemented individually to retain
> +	 * the current performance.
> +	 */
> +	if (esize == 8)
> +		enqueue_elems_64(r, prod_head, obj_table, num);
> +	else if (esize == 16)
> +		enqueue_elems_128(r, prod_head, obj_table, num);
> +	else {
> +		/* Normalize to uint32_t: express the start slot and the
> +		 * element count as numbers of 32-bit words.
> +		 */
> +		uint32_t scale = esize / sizeof(uint32_t);
> +		nr_num = num * scale;
> +		idx = prod_head & r->mask;
> +		nr_idx = idx * scale;
> +		enqueue_elems_32(r, nr_idx, obj_table, nr_num);
> +	}
> +}
> +


More information about the dev mailing list