[PATCH v2 1/2] deque: add multi-thread unsafe double ended queue

Morten Brørup mb at smartsharesystems.com
Wed Apr 24 17:16:50 CEST 2024


[...]

> +
> +/* mask of all valid flag values to deque_create() */
> +#define __RTE_DEQUE_F_MASK (RTE_DEQUE_F_EXACT_SZ)
> +ssize_t
> +rte_deque_get_memsize_elem(unsigned int esize, unsigned int count)
> +{
> +	ssize_t sz;
> +
> +	/* Check if element size is a multiple of 4B */
> +	if (esize % 4 != 0) {
> +		rte_log(RTE_LOG_ERR, rte_deque_log_type,
> +			"%s(): element size is not a multiple of 4\n",
> +			__func__);

Double indent when continuing on the next line:

+		rte_log(RTE_LOG_ERR, rte_deque_log_type,
+				"%s(): element size is not a multiple of 4\n",
+				__func__);

Not just here, but multiple locations in the code.

> +
> +		return -EINVAL;
> +	}
> +
> +	/* count must be a power of 2 */
> +	if ((!RTE_IS_POWER_OF_2(count)) || (count > RTE_DEQUE_SZ_MASK)) {
> +		rte_log(RTE_LOG_ERR, rte_deque_log_type,
> +			"%s(): Requested number of elements is invalid,"
> +			"must be power of 2, and not exceed %u\n",
> +			__func__, RTE_DEQUE_SZ_MASK);

Please use shorter error messages, so they can fit on one line in the source code.

Note: DPDK coding style allows 100 chars source code line length, not just 80.

[...]

> +/* create the deque for a given element size */
> +struct rte_deque *
> +rte_deque_create(const char *name, unsigned int esize, unsigned int count,
> +		int socket_id, unsigned int flags)
> +{
> +	char mz_name[RTE_MEMZONE_NAMESIZE];
> +	struct rte_deque *d;
> +	const struct rte_memzone *mz;
> +	ssize_t deque_size;
> +	int mz_flags = 0;
> +	const unsigned int requested_count = count;
> +	int ret;
> +
> +	/* for an exact size deque, round up from count to a power of two */
> +	if (flags & RTE_DEQUE_F_EXACT_SZ)
> +		count = rte_align32pow2(count + 1);
> +
> +	deque_size = rte_deque_get_memsize_elem(esize, count);
> +	if (deque_size < 0) {
> +		rte_errno = -deque_size;
> +		return NULL;
> +	}
> +
> +	ret = snprintf(mz_name, sizeof(mz_name), "%s%s",
> +		RTE_DEQUE_MZ_PREFIX, name);
> +	if (ret < 0 || ret >= (int)sizeof(mz_name)) {
> +		rte_errno = ENAMETOOLONG;
> +		return NULL;
> +	}
> +
> +	/* reserve a memory zone for this deque. If we can't get rte_config or
> +	 * we are secondary process, the memzone_reserve function will set
> +	 * rte_errno for us appropriately - hence no check in this function
> +	 */
> +	mz = rte_memzone_reserve_aligned(mz_name, deque_size, socket_id,
> +					 mz_flags, alignof(struct rte_deque));
> +	if (mz != NULL) {
> +		d = mz->addr;
> +		/* no need to check return value here, we already checked the
> +		 * arguments above
> +		 */
> +		rte_deque_init(d, name, requested_count, flags);

rte_deque_init() error handling is missing here.

> +		d->memzone = mz;
> +	} else {
> +		d = NULL;
> +		rte_log(RTE_LOG_ERR, rte_deque_log_type,
> +			"%s(): Cannot reserve memory\n", __func__);
> +	}
> +	return d;
> +}

[...]

> +#define RTE_DEQUE_MZ_PREFIX "DEQUE_"
> +/** The maximum length of a deque name. */
> +#define RTE_DEQUE_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
> +			   sizeof(RTE_DEQUE_MZ_PREFIX) + 1)
> +
> +/**
> + * Double ended queue (deque) structure.
> + *
> + * The producer and the consumer have a head and a tail index. These indices
> + * are not between 0 and size(deque)-1. These indices are between 0 and
> + * 2^32 -1. Their value is masked while accessing the objects in deque.
> + * These indices are unsigned 32bits. Hence the result of the subtraction is
> + * always a modulo of 2^32 and it is between 0 and capacity.
> + */
> +struct rte_deque {
> +	alignas(RTE_CACHE_LINE_SIZE) char name[RTE_DEQUE_NAMESIZE];

Suggest alternative:
+struct __rte_cache_aligned rte_deque {
+	char name[RTE_DEQUE_NAMESIZE];

> +	/**< Name of the deque */
> +	int flags;
> +	/**< Flags supplied at creation. */
> +	const struct rte_memzone *memzone;
> +	/**< Memzone, if any, containing the rte_deque */
> +
> +	alignas(RTE_CACHE_LINE_SIZE) char pad0; /**< empty cache line */

Why the cache alignment here?

If required, omit the pad0 field and cache align the size field instead.

Alternatively, use RTE_CACHE_GUARD, if that is what you are trying to achieve.

> +
> +	uint32_t size;           /**< Size of deque. */
> +	uint32_t mask;           /**< Mask (size-1) of deque. */
> +	uint32_t capacity;       /**< Usable size of deque */
> +	/** Ring head and tail pointers. */
> +	volatile uint32_t head;
> +	volatile uint32_t tail;
> +};

[...]

> +static __rte_always_inline void
> +__rte_deque_enqueue_elems_head_128(struct rte_deque *d,
> +				const void *obj_table,
> +				unsigned int n)
> +{
> +	unsigned int i;
> +	const uint32_t size = d->size;
> +	uint32_t idx = (d->head & d->mask);
> +	rte_int128_t *deque = (rte_int128_t *)&d[1];
> +	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
> +	if (likely(idx + n <= size)) {
> +		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
> +			memcpy((void *)(deque + idx),
> +				(const void *)(obj + i), 32);

With 100 chars source code line length, this memcpy() fits on one line.
Not just here, but in all the functions.

> +		switch (n & 0x1) {
> +		case 1:
> +			memcpy((void *)(deque + idx),
> +				(const void *)(obj + i), 16);
> +		}
> +	} else {
> +		for (i = 0; idx < size; i++, idx++)
> +			memcpy((void *)(deque + idx),
> +				(const void *)(obj + i), 16);
> +		/* Start at the beginning */
> +		for (idx = 0; i < n; i++, idx++)
> +			memcpy((void *)(deque + idx),
> +				(const void *)(obj + i), 16);
> +	}
> +}



More information about the dev mailing list