[dpdk-dev] [PATCH v7 02/17] lib/ring: apis to support configurable element size
Ananyev, Konstantin
konstantin.ananyev at intel.com
Thu Jan 2 17:42:31 CET 2020
> diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h
> new file mode 100644
> index 000000000..fc7fe127c
> --- /dev/null
> +++ b/lib/librte_ring/rte_ring_elem.h
> @@ -0,0 +1,1002 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2019 Arm Limited
> + * Copyright (c) 2010-2017 Intel Corporation
> + * Copyright (c) 2007-2009 Kip Macy kmacy at freebsd.org
> + * All rights reserved.
> + * Derived from FreeBSD's bufring.h
> + * Used as BSD-3 Licensed with permission from Kip Macy.
> + */
> +
> +#ifndef _RTE_RING_ELEM_H_
> +#define _RTE_RING_ELEM_H_
> +
> +/**
> + * @file
> + * RTE Ring with user defined element size
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <stdio.h>
> +#include <stdint.h>
> +#include <sys/queue.h>
> +#include <errno.h>
> +#include <rte_common.h>
> +#include <rte_config.h>
> +#include <rte_memory.h>
> +#include <rte_lcore.h>
> +#include <rte_atomic.h>
> +#include <rte_branch_prediction.h>
> +#include <rte_memzone.h>
> +#include <rte_pause.h>
> +
> +#include "rte_ring.h"
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Calculate the memory size needed for a ring with given element size
> + *
> + * This function returns the number of bytes needed for a ring, given
> + * the number of elements in it and the size of the element. This value
> + * is the sum of the size of the structure rte_ring and the size of the
> + * memory needed for storing the elements. The value is aligned to a cache
> + * line size.
> + *
> + * @param esize
> + * The size of ring element, in bytes. It must be a multiple of 4.
> + * @param count
> + * The number of elements in the ring (must be a power of 2).
> + * @return
> + * - The memory size needed for the ring on success.
> + * - -EINVAL - esize is not a multiple of 4 or count provided is not a
> + * power of 2.
> + */
> +__rte_experimental
> +ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Create a new ring named *name* that stores elements with given size.
> + *
> + * This function uses ``memzone_reserve()`` to allocate memory. Then it
> + * calls rte_ring_init() to initialize an empty ring.
> + *
> + * The new ring size is set to *count*, which must be a power of
> + * two. Water marking is disabled by default. The real usable ring size
> + * is *count-1* instead of *count* to differentiate a free ring from an
> + * empty ring.
> + *
> + * The ring is added in RTE_TAILQ_RING list.
> + *
> + * @param name
> + * The name of the ring.
> + * @param esize
> + * The size of ring element, in bytes. It must be a multiple of 4.
> + * @param count
> + * The number of elements in the ring (must be a power of 2).
> + * @param socket_id
> + * The *socket_id* argument is the socket identifier in case of
> + * NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
> + * constraint for the reserved zone.
> + * @param flags
> + * An OR of the following:
> + * - RING_F_SP_ENQ: If this flag is set, the default behavior when
> + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
> + * is "single-producer". Otherwise, it is "multi-producers".
> + * - RING_F_SC_DEQ: If this flag is set, the default behavior when
> + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
> + * is "single-consumer". Otherwise, it is "multi-consumers".
> + * @return
> + * On success, the pointer to the new allocated ring. NULL on error with
> + * rte_errno set appropriately. Possible errno values include:
> + * - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
> + * - E_RTE_SECONDARY - function was called from a secondary process instance
> + * - EINVAL - esize is not a multiple of 4 or count provided is not a
> + * power of 2.
> + * - ENOSPC - the maximum number of memzones has already been allocated
> + * - EEXIST - a memzone with the same name already exists
> + * - ENOMEM - no appropriate memory area found in which to create memzone
> + */
> +__rte_experimental
> +struct rte_ring *rte_ring_create_elem(const char *name, unsigned int esize,
> + unsigned int count, int socket_id, unsigned int flags);
> +
> +static __rte_always_inline void
> +enqueue_elems_32(struct rte_ring *r, uint32_t idx,
> + const void *obj_table, uint32_t n)
> +{
> + unsigned int i;
> + const uint32_t size = r->size;
> + uint32_t *ring = (uint32_t *)&r[1];
> + const uint32_t *obj = (const uint32_t *)obj_table;
> + if (likely(idx + n < size)) {
> + for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
> + ring[idx] = obj[i];
> + ring[idx + 1] = obj[i + 1];
> + ring[idx + 2] = obj[i + 2];
> + ring[idx + 3] = obj[i + 3];
> + ring[idx + 4] = obj[i + 4];
> + ring[idx + 5] = obj[i + 5];
> + ring[idx + 6] = obj[i + 6];
> + ring[idx + 7] = obj[i + 7];
> + }
> + switch (n & 0x7) {
> + case 7:
> + ring[idx++] = obj[i++]; /* fallthrough */
> + case 6:
> + ring[idx++] = obj[i++]; /* fallthrough */
> + case 5:
> + ring[idx++] = obj[i++]; /* fallthrough */
> + case 4:
> + ring[idx++] = obj[i++]; /* fallthrough */
> + case 3:
> + ring[idx++] = obj[i++]; /* fallthrough */
> + case 2:
> + ring[idx++] = obj[i++]; /* fallthrough */
> + case 1:
> + ring[idx++] = obj[i++]; /* fallthrough */
> + }
> + } else {
> + for (i = 0; idx < size; i++, idx++)
> + ring[idx] = obj[i];
> + /* Start at the beginning */
> + for (idx = 0; i < n; i++, idx++)
> + ring[idx] = obj[i];
> + }
> +}
> +
> +static __rte_always_inline void
> +enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
> + const void *obj_table, uint32_t n)
> +{
> + unsigned int i;
> + const uint32_t size = r->size;
> + uint32_t idx = prod_head & r->mask;
> + uint64_t *ring = (uint64_t *)&r[1];
> + const uint64_t *obj = (const uint64_t *)obj_table;
> + if (likely(idx + n < size)) {
> + for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
> + ring[idx] = obj[i];
> + ring[idx + 1] = obj[i + 1];
> + ring[idx + 2] = obj[i + 2];
> + ring[idx + 3] = obj[i + 3];
> + }
> + switch (n & 0x3) {
> + case 3:
> + ring[idx++] = obj[i++]; /* fallthrough */
> + case 2:
> + ring[idx++] = obj[i++]; /* fallthrough */
> + case 1:
> + ring[idx++] = obj[i++];
> + }
> + } else {
> + for (i = 0; idx < size; i++, idx++)
> + ring[idx] = obj[i];
> + /* Start at the beginning */
> + for (idx = 0; i < n; i++, idx++)
> + ring[idx] = obj[i];
> + }
> +}
> +
> +static __rte_always_inline void
> +enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
> + const void *obj_table, uint32_t n)
> +{
> + unsigned int i;
> + const uint32_t size = r->size;
> + uint32_t idx = prod_head & r->mask;
> + __uint128_t *ring = (__uint128_t *)&r[1];
> + const __uint128_t *obj = (const __uint128_t *)obj_table;
> + if (likely(idx + n < size)) {
> + for (i = 0; i < (n & ~0x1); i += 2, idx += 2) {
> + ring[idx] = obj[i];
> + ring[idx + 1] = obj[i + 1];
AFAIK, accessing obj_table through a `const __uint128_t *` like this implies
that obj_table is 16-byte aligned.
Would that always be the case for callers of this API?
> + }
> + switch (n & 0x1) {
> + case 1:
> + ring[idx++] = obj[i++];
> + }
> + } else {
> + for (i = 0; idx < size; i++, idx++)
> + ring[idx] = obj[i];
> + /* Start at the beginning */
> + for (idx = 0; i < n; i++, idx++)
> + ring[idx] = obj[i];
> + }
> +}
> +
> +/* the actual enqueue of elements on the ring.
> + * Placed here since identical code needed in both
> + * single and multi producer enqueue functions.
> + */
> +static __rte_always_inline void
> +enqueue_elems(struct rte_ring *r, uint32_t prod_head, const void *obj_table,
> + uint32_t esize, uint32_t num)
> +{
> + uint32_t idx, nr_idx, nr_num;
> +
> + /* 8B and 16B copies implemented individually to retain
> + * the current performance.
> + */
> + if (esize == 8)
> + enqueue_elems_64(r, prod_head, obj_table, num);
> + else if (esize == 16)
> + enqueue_elems_128(r, prod_head, obj_table, num);
> + else {
> + /* Normalize to uint32_t */
> + uint32_t scale = esize / sizeof(uint32_t);
> + nr_num = num * scale;
> + idx = prod_head & r->mask;
> + nr_idx = idx * scale;
> + enqueue_elems_32(r, nr_idx, obj_table, nr_num);
> + }
> +}
> +
More information about the dev
mailing list