[PATCH v2 1/2] deque: add multi-thread unsafe double ended queue
Morten Brørup
mb at smartsharesystems.com
Wed Apr 24 17:16:50 CEST 2024
[...]
> +
> +/* mask of all valid flag values to deque_create() */
> +#define __RTE_DEQUE_F_MASK (RTE_DEQUE_F_EXACT_SZ)
> +ssize_t
> +rte_deque_get_memsize_elem(unsigned int esize, unsigned int count)
> +{
> +	ssize_t sz;
> +
> +	/* Check if element size is a multiple of 4B */
> +	if (esize % 4 != 0) {
> +		rte_log(RTE_LOG_ERR, rte_deque_log_type,
> +			"%s(): element size is not a multiple of 4\n",
> +			__func__);
Double indent when continuing on the next line:
+		rte_log(RTE_LOG_ERR, rte_deque_log_type,
+				"%s(): element size is not a multiple of 4\n",
+				__func__);
Not just here, but multiple locations in the code.
> +
> +		return -EINVAL;
> +	}
> +
> +	/* count must be a power of 2 */
> +	if ((!RTE_IS_POWER_OF_2(count)) || (count > RTE_DEQUE_SZ_MASK)) {
> +		rte_log(RTE_LOG_ERR, rte_deque_log_type,
> +			"%s(): Requested number of elements is invalid,"
> +			"must be power of 2, and not exceed %u\n",
> +			__func__, RTE_DEQUE_SZ_MASK);
Please use shorter error messages, so they can fit on one line in the source code.
Note: DPDK coding style allows 100 chars source code line length, not just 80.
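For example, a shorter message keeps the whole format string in a single string literal on one line (the wording here is only illustrative):
+		rte_log(RTE_LOG_ERR, rte_deque_log_type,
+				"%s(): count must be power of 2, and not exceed %u\n",
+				__func__, RTE_DEQUE_SZ_MASK);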
[...]
> +/* create the deque for a given element size */
> +struct rte_deque *
> +rte_deque_create(const char *name, unsigned int esize, unsigned int count,
> +	int socket_id, unsigned int flags)
> +{
> +	char mz_name[RTE_MEMZONE_NAMESIZE];
> +	struct rte_deque *d;
> +	const struct rte_memzone *mz;
> +	ssize_t deque_size;
> +	int mz_flags = 0;
> +	const unsigned int requested_count = count;
> +	int ret;
> +
> +	/* for an exact size deque, round up from count to a power of two */
> +	if (flags & RTE_DEQUE_F_EXACT_SZ)
> +		count = rte_align32pow2(count + 1);
> +
> +	deque_size = rte_deque_get_memsize_elem(esize, count);
> +	if (deque_size < 0) {
> +		rte_errno = -deque_size;
> +		return NULL;
> +	}
> +
> +	ret = snprintf(mz_name, sizeof(mz_name), "%s%s",
> +		RTE_DEQUE_MZ_PREFIX, name);
> +	if (ret < 0 || ret >= (int)sizeof(mz_name)) {
> +		rte_errno = ENAMETOOLONG;
> +		return NULL;
> +	}
> +
> +	/* reserve a memory zone for this deque. If we can't get rte_config or
> +	 * we are secondary process, the memzone_reserve function will set
> +	 * rte_errno for us appropriately - hence no check in this function
> +	 */
> +	mz = rte_memzone_reserve_aligned(mz_name, deque_size, socket_id,
> +		mz_flags, alignof(struct rte_deque));
> +	if (mz != NULL) {
> +		d = mz->addr;
> +		/* no need to check return value here, we already checked the
> +		 * arguments above
> +		 */
> +		rte_deque_init(d, name, requested_count, flags);
rte_deque_init() error handling is missing here; a sketch follows the quoted function below.
> +		d->memzone = mz;
> +	} else {
> +		d = NULL;
> +		rte_log(RTE_LOG_ERR, rte_deque_log_type,
> +			"%s(): Cannot reserve memory\n", __func__);
> +	}
> +	return d;
> +}
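As a sketch of what the missing error handling could look like, assuming rte_deque_init() returns 0 on success and a negative errno value on failure (as rte_ring_init() does):

		ret = rte_deque_init(d, name, requested_count, flags);
		if (ret != 0) {
			rte_errno = -ret;
			rte_memzone_free(mz);
			return NULL;
		}
		d->memzone = mz;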
[...]
> +#define RTE_DEQUE_MZ_PREFIX "DEQUE_"
> +/** The maximum length of a deque name. */
> +#define RTE_DEQUE_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
> +	sizeof(RTE_DEQUE_MZ_PREFIX) + 1)
> +
> +/**
> + * Double ended queue (deque) structure.
> + *
> + * The producer and the consumer have a head and a tail index. These indices
> + * are not between 0 and size(deque)-1. These indices are between 0 and
> + * 2^32 -1. Their value is masked while accessing the objects in deque.
> + * These indices are unsigned 32bits. Hence the result of the subtraction is
> + * always a modulo of 2^32 and it is between 0 and capacity.
> + */
> +struct rte_deque {
> +	alignas(RTE_CACHE_LINE_SIZE) char name[RTE_DEQUE_NAMESIZE];
Suggest alternative:
+struct __rte_cache_aligned rte_deque {
+	char name[RTE_DEQUE_NAMESIZE];
> +	/**< Name of the deque */
> +	int flags;
> +	/**< Flags supplied at creation. */
> +	const struct rte_memzone *memzone;
> +	/**< Memzone, if any, containing the rte_deque */
> +
> +	alignas(RTE_CACHE_LINE_SIZE) char pad0; /**< empty cache line */
Why the cache alignment here?
If required, omit the pad0 field and cache align the size field instead.
Alternatively, use RTE_CACHE_GUARD, if that is what you are trying to achieve.
(A sketch of both suggestions follows the quoted struct below.)
> +
> +	uint32_t size; /**< Size of deque. */
> +	uint32_t mask; /**< Mask (size-1) of deque. */
> +	uint32_t capacity; /**< Usable size of deque */
> +	/** Ring head and tail pointers. */
> +	volatile uint32_t head;
> +	volatile uint32_t tail;
> +};
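To make the two suggestions above concrete, a sketch of how the struct could look with __rte_cache_aligned on the struct and RTE_CACHE_GUARD in place of the manually padded field (only if such separation really is intended; field set and comments as in the patch):

struct __rte_cache_aligned rte_deque {
	char name[RTE_DEQUE_NAMESIZE]; /**< Name of the deque. */
	int flags; /**< Flags supplied at creation. */
	const struct rte_memzone *memzone; /**< Memzone, if any, containing the rte_deque. */

	RTE_CACHE_GUARD; /* guard cache line(s) between the field groups, if really needed */

	uint32_t size; /**< Size of deque. */
	uint32_t mask; /**< Mask (size-1) of deque. */
	uint32_t capacity; /**< Usable size of deque. */
	volatile uint32_t head; /**< Deque head. */
	volatile uint32_t tail; /**< Deque tail. */
};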
[...]
> +static __rte_always_inline void
> +__rte_deque_enqueue_elems_head_128(struct rte_deque *d,
> +	const void *obj_table,
> +	unsigned int n)
> +{
> +	unsigned int i;
> +	const uint32_t size = d->size;
> +	uint32_t idx = (d->head & d->mask);
> +	rte_int128_t *deque = (rte_int128_t *)&d[1];
> +	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
> +	if (likely(idx + n <= size)) {
> +		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
> +			memcpy((void *)(deque + idx),
> +				(const void *)(obj + i), 32);
With 100 chars source code line length, this memcpy() fits on one line.
Not just here, but in all the functions.
(See the example after the quoted function below.)
> +		switch (n & 0x1) {
> +		case 1:
> +			memcpy((void *)(deque + idx),
> +				(const void *)(obj + i), 16);
> +		}
> +	} else {
> +		for (i = 0; idx < size; i++, idx++)
> +			memcpy((void *)(deque + idx),
> +				(const void *)(obj + i), 16);
> +		/* Start at the beginning */
> +		for (idx = 0; i < n; i++, idx++)
> +			memcpy((void *)(deque + idx),
> +				(const void *)(obj + i), 16);
> +	}
> +}
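For reference, with 100 characters available the wrapped copy above is simply (same statement, unwrapped):

			memcpy((void *)(deque + idx), (const void *)(obj + i), 32);

and likewise for the 16-byte variants.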