[dpdk-dev] [PATCH v5 3/9] ring: introduce RTS ring mode

Honnappa Nagarahalli Honnappa.Nagarahalli at arm.com
Sun Apr 19 04:31:14 CEST 2020


<snip>

> 
> Introduce relaxed tail sync (RTS) mode for MT ring synchronization.
> Aim to reduce stall times in case when ring is used on overcommitted CPUs
> (multiple active threads on the same cpu).
> The main difference from original MP/MC algorithm is that tail value is
> increased not by every thread that finished enqueue/dequeue, but only by the
> last one.
> That allows threads to avoid spinning on ring tail value, leaving actual tail
> value change to the last thread in the update queue.
> 
> Signed-off-by: Konstantin Ananyev <konstantin.ananyev at intel.com>
A few nits; otherwise:
Acked-by: Honnappa Nagarahalli <honnappa.nagarahalli at arm.com>

> ---
> 
> check-abi.sh reports what I believe is a false-positive about ring cons/prod
> changes. As a workaround, devtools/libabigail.abignore is updated to
> suppress *struct ring* related errors.
> 
>  devtools/libabigail.abignore           |   7 +
>  lib/librte_ring/Makefile               |   4 +-
>  lib/librte_ring/meson.build            |   7 +-
>  lib/librte_ring/rte_ring.c             | 100 +++++-
>  lib/librte_ring/rte_ring.h             |  70 +++-
>  lib/librte_ring/rte_ring_core.h        |  36 +-
>  lib/librte_ring/rte_ring_elem.h        |  90 ++++-
>  lib/librte_ring/rte_ring_rts.h         | 439 +++++++++++++++++++++++++
>  lib/librte_ring/rte_ring_rts_c11_mem.h | 179 ++++++++++
>  9 files changed, 902 insertions(+), 30 deletions(-)  create mode 100644
> lib/librte_ring/rte_ring_rts.h  create mode 100644
> lib/librte_ring/rte_ring_rts_c11_mem.h
> 
> diff --git a/devtools/libabigail.abignore b/devtools/libabigail.abignore index
> a59df8f13..cd86d89ca 100644
> --- a/devtools/libabigail.abignore
> +++ b/devtools/libabigail.abignore
> @@ -11,3 +11,10 @@
>          type_kind = enum
>          name = rte_crypto_asym_xform_type
>          changed_enumerators = RTE_CRYPTO_ASYM_XFORM_TYPE_LIST_END
> +; Ignore updates of ring prod/cons
> +[suppress_type]
> +        type_kind = struct
> +        name = rte_ring
> +[suppress_type]
> +        type_kind = struct
> +        name = rte_event_ring
> diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile index
> 6572768c9..04e446e37 100644
> --- a/lib/librte_ring/Makefile
> +++ b/lib/librte_ring/Makefile
> @@ -19,6 +19,8 @@ SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include :=
> rte_ring.h \
>  					rte_ring_core.h \
>  					rte_ring_elem.h \
>  					rte_ring_generic.h \
> -					rte_ring_c11_mem.h
> +					rte_ring_c11_mem.h \
> +					rte_ring_rts.h \
> +					rte_ring_rts_c11_mem.h
> 
>  include $(RTE_SDK)/mk/rte.lib.mk
> diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build index
> c656781da..a95598032 100644
> --- a/lib/librte_ring/meson.build
> +++ b/lib/librte_ring/meson.build
> @@ -6,4 +6,9 @@ headers = files('rte_ring.h',
>  		'rte_ring_core.h',
>  		'rte_ring_elem.h',
>  		'rte_ring_c11_mem.h',
> -		'rte_ring_generic.h')
> +		'rte_ring_generic.h',
> +		'rte_ring_rts.h',
> +		'rte_ring_rts_c11_mem.h')
> +
> +# rte_ring_create_elem and rte_ring_get_memsize_elem are experimental
> +allow_experimental_apis = true
> diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c index
> fa5733907..222eec0fb 100644
> --- a/lib/librte_ring/rte_ring.c
> +++ b/lib/librte_ring/rte_ring.c
> @@ -45,6 +45,9 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
>  /* true if x is a power of 2 */
>  #define POWEROF2(x) ((((x)-1) & (x)) == 0)
> 
> +/* by default set head/tail distance as 1/8 of ring capacity */
> +#define HTD_MAX_DEF	8
> +
>  /* return the size of memory occupied by a ring */  ssize_t
> rte_ring_get_memsize_elem(unsigned int esize, unsigned int count) @@ -
> 79,11 +82,84 @@ rte_ring_get_memsize(unsigned int count)
>  	return rte_ring_get_memsize_elem(sizeof(void *), count);  }
> 
> +/*
> + * internal helper function to reset prod/cons head-tail values.
> + */
> +static void
> +reset_headtail(void *p)
Internal functions in the ring library use the __rte prefix. I think we should follow the same convention here.

> +{
> +	struct rte_ring_headtail *ht;
> +	struct rte_ring_rts_headtail *ht_rts;
> +
> +	ht = p;
> +	ht_rts = p;
> +
> +	switch (ht->sync_type) {
> +	case RTE_RING_SYNC_MT:
> +	case RTE_RING_SYNC_ST:
> +		ht->head = 0;
> +		ht->tail = 0;
> +		break;
> +	case RTE_RING_SYNC_MT_RTS:
> +		ht_rts->head.raw = 0;
> +		ht_rts->tail.raw = 0;
> +		break;
> +	default:
> +		/* unknown sync mode */
> +		RTE_ASSERT(0);
> +	}
> +}
> +
>  void
>  rte_ring_reset(struct rte_ring *r)
>  {
> -	r->prod.head = r->cons.head = 0;
> -	r->prod.tail = r->cons.tail = 0;
> +	reset_headtail(&r->prod);
> +	reset_headtail(&r->cons);
> +}
> +
> +/*
> + * helper function, calculates sync_type values for prod and cons
> + * based on input flags. Returns zero at success or negative
> + * errno value otherwise.
> + */
> +static int
> +get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st,
> +	enum rte_ring_sync_type *cons_st)
Internal functions in the ring library use the __rte prefix. I think we should follow the same convention here.
Also, using the prefix will help avoid symbol clashes.

> +{
> +	static const uint32_t prod_st_flags =
> +		(RING_F_SP_ENQ | RING_F_MP_RTS_ENQ);
> +	static const uint32_t cons_st_flags =
> +		(RING_F_SC_DEQ | RING_F_MC_RTS_DEQ);
> +
> +	switch (flags & prod_st_flags) {
> +	case 0:
> +		*prod_st = RTE_RING_SYNC_MT;
> +		break;
> +	case RING_F_SP_ENQ:
> +		*prod_st = RTE_RING_SYNC_ST;
> +		break;
> +	case RING_F_MP_RTS_ENQ:
> +		*prod_st = RTE_RING_SYNC_MT_RTS;
> +		break;
> +	default:
> +		return -EINVAL;
> +	}
> +
> +	switch (flags & cons_st_flags) {
> +	case 0:
> +		*cons_st = RTE_RING_SYNC_MT;
> +		break;
> +	case RING_F_SC_DEQ:
> +		*cons_st = RTE_RING_SYNC_ST;
> +		break;
> +	case RING_F_MC_RTS_DEQ:
> +		*cons_st = RTE_RING_SYNC_MT_RTS;
> +		break;
> +	default:
> +		return -EINVAL;
> +	}
> +
> +	return 0;
>  }
> 
>  int
> @@ -100,16 +176,20 @@ rte_ring_init(struct rte_ring *r, const char *name,
> unsigned count,
>  	RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) &
>  			  RTE_CACHE_LINE_MASK) != 0);
> 
> +	RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) !=
> +		offsetof(struct rte_ring_rts_headtail, sync_type));
> +	RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) !=
> +		offsetof(struct rte_ring_rts_headtail, tail.val.pos));
> +
>  	/* init the ring structure */
>  	memset(r, 0, sizeof(*r));
>  	ret = strlcpy(r->name, name, sizeof(r->name));
>  	if (ret < 0 || ret >= (int)sizeof(r->name))
>  		return -ENAMETOOLONG;
>  	r->flags = flags;
> -	r->prod.sync_type = (flags & RING_F_SP_ENQ) ?
> -		RTE_RING_SYNC_ST : RTE_RING_SYNC_MT;
> -	r->cons.sync_type = (flags & RING_F_SC_DEQ) ?
> -		RTE_RING_SYNC_ST : RTE_RING_SYNC_MT;
> +	ret = get_sync_type(flags, &r->prod.sync_type, &r->cons.sync_type);
> +	if (ret != 0)
> +		return ret;
> 
>  	if (flags & RING_F_EXACT_SZ) {
>  		r->size = rte_align32pow2(count + 1); @@ -126,8 +206,12
> @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
>  		r->mask = count - 1;
>  		r->capacity = r->mask;
>  	}
> -	r->prod.head = r->cons.head = 0;
> -	r->prod.tail = r->cons.tail = 0;
> +
> +	/* set default values for head-tail distance */
> +	if (flags & RING_F_MP_RTS_ENQ)
> +		rte_ring_set_prod_htd_max(r, r->capacity / HTD_MAX_DEF);
> +	if (flags & RING_F_MC_RTS_DEQ)
> +		rte_ring_set_cons_htd_max(r, r->capacity / HTD_MAX_DEF);
> 
>  	return 0;
>  }
> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index
> 35ee4491c..77f206ca7 100644
> --- a/lib/librte_ring/rte_ring.h
> +++ b/lib/librte_ring/rte_ring.h
> @@ -1,6 +1,6 @@
>  /* SPDX-License-Identifier: BSD-3-Clause
>   *
> - * Copyright (c) 2010-2017 Intel Corporation
> + * Copyright (c) 2010-2020 Intel Corporation
>   * Copyright (c) 2007-2009 Kip Macy kmacy at freebsd.org
>   * All rights reserved.
>   * Derived from FreeBSD's bufring.h
> @@ -389,8 +389,21 @@ static __rte_always_inline unsigned int
> rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
>  		      unsigned int n, unsigned int *free_space)  {
> -	return __rte_ring_do_enqueue(r, obj_table, n,
> RTE_RING_QUEUE_FIXED,
> -			r->prod.sync_type, free_space);
> +	switch (r->prod.sync_type) {
> +	case RTE_RING_SYNC_MT:
> +		return rte_ring_mp_enqueue_bulk(r, obj_table, n, free_space);
> +	case RTE_RING_SYNC_ST:
> +		return rte_ring_sp_enqueue_bulk(r, obj_table, n, free_space);
> #ifdef
> +ALLOW_EXPERIMENTAL_API
> +	case RTE_RING_SYNC_MT_RTS:
> +		return rte_ring_mp_rts_enqueue_bulk(r, obj_table, n,
> +			free_space);
> +#endif
> +	}
> +
> +	/* valid ring should never reach this point */
> +	RTE_ASSERT(0);
> +	return 0;
>  }
> 
>  /**
> @@ -524,8 +537,20 @@ static __rte_always_inline unsigned int
> rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
>  		unsigned int *available)
>  {
> -	return __rte_ring_do_dequeue(r, obj_table, n,
> RTE_RING_QUEUE_FIXED,
> -				r->cons.sync_type, available);
> +	switch (r->cons.sync_type) {
> +	case RTE_RING_SYNC_MT:
> +		return rte_ring_mc_dequeue_bulk(r, obj_table, n, available);
> +	case RTE_RING_SYNC_ST:
> +		return rte_ring_sc_dequeue_bulk(r, obj_table, n, available);
> #ifdef
> +ALLOW_EXPERIMENTAL_API
> +	case RTE_RING_SYNC_MT_RTS:
> +		return rte_ring_mc_rts_dequeue_bulk(r, obj_table, n,
> available);
> +#endif
> +	}
> +
> +	/* valid ring should never reach this point */
> +	RTE_ASSERT(0);
> +	return 0;
>  }
> 
>  /**
> @@ -845,8 +870,21 @@ static __rte_always_inline unsigned
> rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
>  		      unsigned int n, unsigned int *free_space)  {
> -	return __rte_ring_do_enqueue(r, obj_table, n,
> RTE_RING_QUEUE_VARIABLE,
> -			r->prod.sync_type, free_space);
> +	switch (r->prod.sync_type) {
> +	case RTE_RING_SYNC_MT:
> +		return rte_ring_mp_enqueue_burst(r, obj_table, n,
> free_space);
> +	case RTE_RING_SYNC_ST:
> +		return rte_ring_sp_enqueue_burst(r, obj_table, n, free_space);
> #ifdef
> +ALLOW_EXPERIMENTAL_API
> +	case RTE_RING_SYNC_MT_RTS:
> +		return rte_ring_mp_rts_enqueue_burst(r, obj_table, n,
> +			free_space);
> +#endif
> +	}
> +
> +	/* valid ring should never reach this point */
> +	RTE_ASSERT(0);
> +	return 0;
>  }
> 
>  /**
> @@ -925,9 +963,21 @@ static __rte_always_inline unsigned
> rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
>  		unsigned int n, unsigned int *available)  {
> -	return __rte_ring_do_dequeue(r, obj_table, n,
> -				RTE_RING_QUEUE_VARIABLE,
> -				r->cons.sync_type, available);
> +	switch (r->cons.sync_type) {
> +	case RTE_RING_SYNC_MT:
> +		return rte_ring_mc_dequeue_burst(r, obj_table, n, available);
> +	case RTE_RING_SYNC_ST:
> +		return rte_ring_sc_dequeue_burst(r, obj_table, n, available);
> #ifdef
> +ALLOW_EXPERIMENTAL_API
> +	case RTE_RING_SYNC_MT_RTS:
> +		return rte_ring_mc_rts_dequeue_burst(r, obj_table, n,
> +			available);
> +#endif
> +	}
> +
> +	/* valid ring should never reach this point */
> +	RTE_ASSERT(0);
> +	return 0;
>  }
> 
>  #ifdef __cplusplus
> diff --git a/lib/librte_ring/rte_ring_core.h b/lib/librte_ring/rte_ring_core.h
> index d9cef763f..ded0fa0b7 100644
> --- a/lib/librte_ring/rte_ring_core.h
> +++ b/lib/librte_ring/rte_ring_core.h
> @@ -57,6 +57,9 @@ enum rte_ring_queue_behavior {  enum
> rte_ring_sync_type {
>  	RTE_RING_SYNC_MT,     /**< multi-thread safe (default mode) */
>  	RTE_RING_SYNC_ST,     /**< single thread only */
> +#ifdef ALLOW_EXPERIMENTAL_API
> +	RTE_RING_SYNC_MT_RTS, /**< multi-thread relaxed tail sync */
These need to be documented in the API comments of rte_ring_init, rte_ring_create, and rte_ring_create_elem.
Also, please consider briefly updating the file description in rte_ring.h to capture the new features.

> #endif
>  };
> 
>  /**
> @@ -76,6 +79,22 @@ struct rte_ring_headtail {
>  	};
>  };
> 
> +union rte_ring_rts_poscnt {
I think this is an internal structure, so the prefix can be __rte.

> +	/** raw 8B value to read/write *cnt* and *pos* as one atomic op */
> +	uint64_t raw __rte_aligned(8);
> +	struct {
> +		uint32_t cnt; /**< head/tail reference counter */
> +		uint32_t pos; /**< head/tail position */
> +	} val;
> +};
> +
> +struct rte_ring_rts_headtail {
Same here: the prefix can be __rte.

> +	volatile union rte_ring_rts_poscnt tail;
> +	enum rte_ring_sync_type sync_type;  /**< sync type of prod/cons */
> +	uint32_t htd_max;   /**< max allowed distance between head/tail */
> +	volatile union rte_ring_rts_poscnt head; };
> +
>  /**
>   * An RTE ring structure.
>   *
> @@ -104,11 +123,21 @@ struct rte_ring {
>  	char pad0 __rte_cache_aligned; /**< empty cache line */
> 
>  	/** Ring producer status. */
> -	struct rte_ring_headtail prod __rte_cache_aligned;
> +	RTE_STD_C11
> +	union {
> +		struct rte_ring_headtail prod;
> +		struct rte_ring_rts_headtail rts_prod;
> +	}  __rte_cache_aligned;
> +
>  	char pad1 __rte_cache_aligned; /**< empty cache line */
> 
>  	/** Ring consumer status. */
> -	struct rte_ring_headtail cons __rte_cache_aligned;
> +	RTE_STD_C11
> +	union {
> +		struct rte_ring_headtail cons;
> +		struct rte_ring_rts_headtail rts_cons;
> +	}  __rte_cache_aligned;
> +
>  	char pad2 __rte_cache_aligned; /**< empty cache line */  };
> 
> @@ -125,6 +154,9 @@ struct rte_ring {
>  #define RING_F_EXACT_SZ 0x0004
>  #define RTE_RING_SZ_MASK  (0x7fffffffU) /**< Ring size mask */
> 
> +#define RING_F_MP_RTS_ENQ 0x0008 /**< The default enqueue is "MP
> RTS".
> +*/ #define RING_F_MC_RTS_DEQ 0x0010 /**< The default dequeue is "MC
> +RTS". */
> +
>  #ifdef __cplusplus
>  }
>  #endif
> diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h
> index 7406c0b0f..6da0a917b 100644
> --- a/lib/librte_ring/rte_ring_elem.h
> +++ b/lib/librte_ring/rte_ring_elem.h
> @@ -528,6 +528,10 @@ rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r,
> const void *obj_table,
>  			RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_ST,
> free_space);  }
> 
> +#ifdef ALLOW_EXPERIMENTAL_API
> +#include <rte_ring_rts.h>
> +#endif
> +
>  /**
>   * Enqueue several objects on a ring.
>   *
> @@ -557,6 +561,26 @@ rte_ring_enqueue_bulk_elem(struct rte_ring *r,
> const void *obj_table,  {
>  	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
>  			RTE_RING_QUEUE_FIXED, r->prod.sync_type,
> free_space);
> +
> +	switch (r->prod.sync_type) {
> +	case RTE_RING_SYNC_MT:
> +		return rte_ring_mp_enqueue_bulk_elem(r, obj_table, esize, n,
> +			free_space);
> +	case RTE_RING_SYNC_ST:
> +		return rte_ring_sp_enqueue_bulk_elem(r, obj_table, esize, n,
> +			free_space);
> +#ifdef ALLOW_EXPERIMENTAL_API
> +	case RTE_RING_SYNC_MT_RTS:
> +		return rte_ring_mp_rts_enqueue_bulk_elem(r, obj_table,
> esize, n,
> +			free_space);
> +#endif
> +	}
> +
> +	/* valid ring should never reach this point */
> +	RTE_ASSERT(0);
> +	if (free_space != NULL)
> +		*free_space = 0;
> +	return 0;
>  }
> 
>  /**
> @@ -661,7 +685,7 @@ rte_ring_mc_dequeue_bulk_elem(struct rte_ring *r,
> void *obj_table,
>  		unsigned int esize, unsigned int n, unsigned int *available)  {
>  	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
> -				RTE_RING_QUEUE_FIXED,
> RTE_RING_SYNC_MT, available);
> +			RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_MT,
> available);
>  }
> 
>  /**
> @@ -719,8 +743,25 @@ static __rte_always_inline unsigned int
> rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
>  		unsigned int esize, unsigned int n, unsigned int *available)  {
> -	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
> -			RTE_RING_QUEUE_FIXED, r->cons.sync_type,
> available);
> +	switch (r->cons.sync_type) {
> +	case RTE_RING_SYNC_MT:
> +		return rte_ring_mc_dequeue_bulk_elem(r, obj_table, esize, n,
> +			available);
> +	case RTE_RING_SYNC_ST:
> +		return rte_ring_sc_dequeue_bulk_elem(r, obj_table, esize, n,
> +			available);
> +#ifdef ALLOW_EXPERIMENTAL_API
> +	case RTE_RING_SYNC_MT_RTS:
> +		return rte_ring_mc_rts_dequeue_bulk_elem(r, obj_table,
> esize,
> +			n, available);
> +#endif
> +	}
> +
> +	/* valid ring should never reach this point */
> +	RTE_ASSERT(0);
> +	if (available != NULL)
> +		*available = 0;
> +	return 0;
>  }
> 
>  /**
> @@ -887,8 +928,25 @@ static __rte_always_inline unsigned
> rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
>  		unsigned int esize, unsigned int n, unsigned int *free_space)  {
> -	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
> -			RTE_RING_QUEUE_VARIABLE, r->prod.sync_type,
> free_space);
> +	switch (r->prod.sync_type) {
> +	case RTE_RING_SYNC_MT:
> +		return rte_ring_mp_enqueue_burst_elem(r, obj_table, esize,
> n,
> +			free_space);
> +	case RTE_RING_SYNC_ST:
> +		return rte_ring_sp_enqueue_burst_elem(r, obj_table, esize, n,
> +			free_space);
> +#ifdef ALLOW_EXPERIMENTAL_API
> +	case RTE_RING_SYNC_MT_RTS:
> +		return rte_ring_mp_rts_enqueue_burst_elem(r, obj_table,
> esize,
> +			n, free_space);
> +#endif
> +	}
> +
> +	/* valid ring should never reach this point */
> +	RTE_ASSERT(0);
> +	if (free_space != NULL)
> +		*free_space = 0;
> +	return 0;
>  }
> 
>  /**
> @@ -979,9 +1037,25 @@ static __rte_always_inline unsigned int
> rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
>  		unsigned int esize, unsigned int n, unsigned int *available)  {
> -	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
> -				RTE_RING_QUEUE_VARIABLE,
> -				r->cons.sync_type, available);
> +	switch (r->cons.sync_type) {
> +	case RTE_RING_SYNC_MT:
> +		return rte_ring_mc_dequeue_burst_elem(r, obj_table, esize, n,
> +			available);
> +	case RTE_RING_SYNC_ST:
> +		return rte_ring_sc_dequeue_burst_elem(r, obj_table, esize, n,
> +			available);
> +#ifdef ALLOW_EXPERIMENTAL_API
> +	case RTE_RING_SYNC_MT_RTS:
> +		return rte_ring_mc_rts_dequeue_burst_elem(r, obj_table,
> esize,
> +			n, available);
> +#endif
> +	}
> +
> +	/* valid ring should never reach this point */
> +	RTE_ASSERT(0);
> +	if (available != NULL)
> +		*available = 0;
> +	return 0;
>  }
> 
>  #include <rte_ring.h>
> diff --git a/lib/librte_ring/rte_ring_rts.h b/lib/librte_ring/rte_ring_rts.h new
> file mode 100644 index 000000000..8ced07096
> --- /dev/null
> +++ b/lib/librte_ring/rte_ring_rts.h
> @@ -0,0 +1,439 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2010-2020 Intel Corporation
> + * Copyright (c) 2007-2009 Kip Macy kmacy at freebsd.org
> + * All rights reserved.
> + * Derived from FreeBSD's bufring.h
> + * Used as BSD-3 Licensed with permission from Kip Macy.
> + */
> +
> +#ifndef _RTE_RING_RTS_H_
> +#define _RTE_RING_RTS_H_
> +
> +/**
> + * @file rte_ring_rts.h
> + * @b EXPERIMENTAL: this API may change without prior notice
> + * It is not recommended to include this file directly.
> + * Please include <rte_ring.h> instead.
> + *
> + * Contains functions for Relaxed Tail Sync (RTS) ring mode.
> + * The main idea remains the same as for our original MP/MC
> +synchronization
> + * mechanism.
> + * The main difference is that tail value is increased not
> + * by every thread that finished enqueue/dequeue,
> + * but only by the current last one doing enqueue/dequeue.
> + * That allows threads to skip spinning on tail value,
> + * leaving actual tail value change to last thread at a given instance.
> + * RTS requires 2 64-bit CAS for each enqueue(/dequeue) operation:
> + * one for head update, second for tail update.
> + * As a gain it allows thread to avoid spinning/waiting on tail value.
> + * In comparison, the original MP/MC algorithm requires one 32-bit CAS
> + * for head update and waiting/spinning on tail value.
> + *
> + * Brief outline:
> + *  - introduce update counter (cnt) for both head and tail.
> + *  - increment head.cnt for each head.value update
> + *  - write head.value and head.cnt atomically (64-bit CAS)
> + *  - move tail.value ahead only when tail.cnt + 1 == head.cnt
> + *    (indicating that this is the last thread updating the tail)
> + *  - increment tail.cnt when each enqueue/dequeue op finishes
> + *    (no matter if tail.value going to change or not)
> + *  - write tail.value and tail.cnt atomically (64-bit CAS)
> + *
> + * To avoid producer/consumer starvation:
> + *  - limit max allowed distance between head and tail value (HTD_MAX).
> + *    I.E. thread is allowed to proceed with changing head.value,
> + *    only when:  head.value - tail.value <= HTD_MAX
> + * HTD_MAX is an optional parameter.
> + * With HTD_MAX == 0 we'll have fully serialized ring -
> + * i.e. only one thread at a time will be able to enqueue/dequeue
> + * to/from the ring.
> + * With HTD_MAX >= ring.capacity - no limitation.
> + * By default HTD_MAX == ring.capacity / 8.
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <rte_ring_rts_c11_mem.h>
> +
> +/**
> + * @internal Enqueue several objects on the RTS ring.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of objects.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the ring. Otherwise
> + *   the results are undefined.
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from
> ring
> + * @param free_space
> + *   returns the amount of space after the enqueue operation has finished
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_rts_enqueue_elem(struct rte_ring *r, const void *obj_table,
> +	uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
> +	uint32_t *free_space)
> +{
> +	uint32_t free, head;
> +
> +	n =  __rte_ring_rts_move_prod_head(r, n, behavior, &head, &free);
> +
> +	if (n != 0) {
> +		__rte_ring_enqueue_elems(r, head, obj_table, esize, n);
> +		__rte_ring_rts_update_tail(&r->rts_prod);
> +	}
> +
> +	if (free_space != NULL)
> +		*free_space = free - n;
> +	return n;
> +}
> +
> +/**
> + * @internal Dequeue several objects from the RTS ring.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of objects.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the ring. Otherwise
> + *   the results are undefined.
> + * @param n
> + *   The number of objects to pull from the ring.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a
> ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from
> ring
> + * @param available
> + *   returns the number of remaining ring entries after the dequeue has
> finished
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_rts_dequeue_elem(struct rte_ring *r, void *obj_table,
> +	uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
> +	uint32_t *available)
> +{
> +	uint32_t entries, head;
> +
> +	n = __rte_ring_rts_move_cons_head(r, n, behavior, &head, &entries);
> +
> +	if (n != 0) {
> +		__rte_ring_dequeue_elems(r, head, obj_table, esize, n);
> +		__rte_ring_rts_update_tail(&r->rts_cons);
> +	}
> +
> +	if (available != NULL)
> +		*available = entries - n;
> +	return n;
> +}
> +
> +/**
> + * Enqueue several objects on the RTS ring (multi-producers safe).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of objects.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the ring. Otherwise
> + *   the results are undefined.
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   enqueue operation has finished.
> + * @return
> + *   The number of objects enqueued, either 0 or n
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_ring_mp_rts_enqueue_bulk_elem(struct rte_ring *r, const void
> *obj_table,
> +	unsigned int esize, unsigned int n, unsigned int *free_space) {
> +	return __rte_ring_do_rts_enqueue_elem(r, obj_table, esize, n,
> +			RTE_RING_QUEUE_FIXED, free_space);
> +}
> +
> +/**
> + * Dequeue several objects from an RTS ring (multi-consumers safe).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of objects that will be filled.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the ring. Otherwise
> + *   the results are undefined.
> + * @param n
> + *   The number of objects to dequeue from the ring to the obj_table.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   The number of objects dequeued, either 0 or n
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_ring_mc_rts_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
> +	unsigned int esize, unsigned int n, unsigned int *available) {
> +	return __rte_ring_do_rts_dequeue_elem(r, obj_table, esize, n,
> +			RTE_RING_QUEUE_FIXED, available);
> +}
> +
> +/**
> + * Enqueue several objects on the RTS ring (multi-producers safe).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of objects.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the ring. Otherwise
> + *   the results are undefined.
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   enqueue operation has finished.
> + * @return
> + *   - n: Actual number of objects enqueued.
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned
> +rte_ring_mp_rts_enqueue_burst_elem(struct rte_ring *r, const void
> *obj_table,
> +	unsigned int esize, unsigned int n, unsigned int *free_space) {
> +	return __rte_ring_do_rts_enqueue_elem(r, obj_table, esize, n,
> +			RTE_RING_QUEUE_VARIABLE, free_space); }
> +
> +/**
> + * Dequeue several objects from an RTS  ring (multi-consumers safe).
> + * When the requested objects are more than the available objects,
> + * only dequeue the actual number of objects.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of objects that will be filled.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the ring. Otherwise
> + *   the results are undefined.
> + * @param n
> + *   The number of objects to dequeue from the ring to the obj_table.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   - n: Actual number of objects dequeued, 0 if ring is empty
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned
> +rte_ring_mc_rts_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
> +	unsigned int esize, unsigned int n, unsigned int *available) {
> +	return __rte_ring_do_rts_dequeue_elem(r, obj_table, esize, n,
> +			RTE_RING_QUEUE_VARIABLE, available); }
> +
> +/**
> + * Enqueue several objects on the RTS ring (multi-producers safe).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   enqueue operation has finished.
> + * @return
> + *   The number of objects enqueued, either 0 or n
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_ring_mp_rts_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
> +			 unsigned int n, unsigned int *free_space) {
> +	return rte_ring_mp_rts_enqueue_bulk_elem(r, obj_table,
> +			sizeof(uintptr_t), n, free_space);
> +}
> +
> +/**
> + * Dequeue several objects from an RTS ring (multi-consumers safe).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects) that will be filled.
> + * @param n
> + *   The number of objects to dequeue from the ring to the obj_table.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   The number of objects dequeued, either 0 or n
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_ring_mc_rts_dequeue_bulk(struct rte_ring *r, void **obj_table,
> +		unsigned int n, unsigned int *available) {
> +	return rte_ring_mc_rts_dequeue_bulk_elem(r, obj_table,
> +			sizeof(uintptr_t), n, available);
> +}
> +
> +/**
> + * Enqueue several objects on the RTS ring (multi-producers safe).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   enqueue operation has finished.
> + * @return
> + *   - n: Actual number of objects enqueued.
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned
> +rte_ring_mp_rts_enqueue_burst(struct rte_ring *r, void * const *obj_table,
> +			 unsigned int n, unsigned int *free_space) {
> +	return rte_ring_mp_rts_enqueue_burst_elem(r, obj_table,
> +			sizeof(uintptr_t), n, free_space);
> +}
> +
> +/**
> + * Dequeue several objects from an RTS  ring (multi-consumers safe).
> + * When the requested objects are more than the available objects,
> + * only dequeue the actual number of objects.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects) that will be filled.
> + * @param n
> + *   The number of objects to dequeue from the ring to the obj_table.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   - n: Actual number of objects dequeued, 0 if ring is empty
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned
> +rte_ring_mc_rts_dequeue_burst(struct rte_ring *r, void **obj_table,
> +		unsigned int n, unsigned int *available) {
> +	return rte_ring_mc_rts_dequeue_burst_elem(r, obj_table,
> +			sizeof(uintptr_t), n, available);
> +}
> +
> +/**
> + * Return producer max Head-Tail-Distance (HTD).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @return
> + *   Producer HTD value, if producer is set in appropriate sync mode,
> + *   or UINT32_MAX otherwise.
> + */
> +__rte_experimental
> +static inline uint32_t
> +rte_ring_get_prod_htd_max(const struct rte_ring *r) {
> +	if (r->prod.sync_type == RTE_RING_SYNC_MT_RTS)
> +		return r->rts_prod.htd_max;
> +	return UINT32_MAX;
> +}
> +
> +/**
> + * Set producer max Head-Tail-Distance (HTD).
> + * Note that producer has to use appropriate sync mode (RTS).
> + *
> + * The new value is written to r->rts_prod.htd_max with a plain assignment;
> + * it is stored as given, without any range validation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param v
> + *   new HTD value to setup.
> + * @return
> + *   Zero on success, or -ENOTSUP if the producer sync type is not
> + *   RTE_RING_SYNC_MT_RTS (in which case the ring is left unmodified).
> + */
> +__rte_experimental
> +static inline int
> +rte_ring_set_prod_htd_max(struct rte_ring *r, uint32_t v) {
> +	if (r->prod.sync_type != RTE_RING_SYNC_MT_RTS)
> +		return -ENOTSUP;
> +
> +	r->rts_prod.htd_max = v;
> +	return 0;
> +}
> +
> +/**
> + * Return consumer max Head-Tail-Distance (HTD).
> + *
> + * The value is read from r->rts_cons.htd_max with a plain (non-atomic)
> + * load, and only when the consumer sync type is RTE_RING_SYNC_MT_RTS.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @return
> + *   Consumer HTD value, if the consumer uses the RTS sync mode
> + *   (r->cons.sync_type == RTE_RING_SYNC_MT_RTS), or UINT32_MAX otherwise.
> + *   Note that UINT32_MAX is therefore ambiguous: it is also what an RTS
> + *   consumer whose limit was set to UINT32_MAX would report.
> + */
> +__rte_experimental
> +static inline uint32_t
> +rte_ring_get_cons_htd_max(const struct rte_ring *r) {
> +	if (r->cons.sync_type == RTE_RING_SYNC_MT_RTS)
> +		return r->rts_cons.htd_max;
> +	return UINT32_MAX;
> +}
> +
> +/**
> + * Set consumer max Head-Tail-Distance (HTD).
> + * Note that consumer has to use appropriate sync mode (RTS).
> + *
> + * The new value is written to r->rts_cons.htd_max with a plain assignment;
> + * it is stored as given, without any range validation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param v
> + *   new HTD value to setup.
> + * @return
> + *   Zero on success, or -ENOTSUP if the consumer sync type is not
> + *   RTE_RING_SYNC_MT_RTS (in which case the ring is left unmodified).
> + */
> +__rte_experimental
> +static inline int
> +rte_ring_set_cons_htd_max(struct rte_ring *r, uint32_t v) {
> +	if (r->cons.sync_type != RTE_RING_SYNC_MT_RTS)
> +		return -ENOTSUP;
> +
> +	r->rts_cons.htd_max = v;
> +	return 0;
> +}
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _RTE_RING_RTS_H_ */
> diff --git a/lib/librte_ring/rte_ring_rts_c11_mem.h
> b/lib/librte_ring/rte_ring_rts_c11_mem.h
> new file mode 100644
> index 000000000..9f26817c0
> --- /dev/null
> +++ b/lib/librte_ring/rte_ring_rts_c11_mem.h
> @@ -0,0 +1,179 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2010-2020 Intel Corporation
> + * Copyright (c) 2007-2009 Kip Macy kmacy at freebsd.org
> + * All rights reserved.
> + * Derived from FreeBSD's bufring.h
> + * Used as BSD-3 Licensed with permission from Kip Macy.
> + */
> +
> +#ifndef _RTE_RING_RTS_C11_MEM_H_
> +#define _RTE_RING_RTS_C11_MEM_H_
> +
> +/**
> + * @file rte_ring_rts_c11_mem.h
> + * It is not recommended to include this file directly,
> + * include <rte_ring.h> instead.
> + * Contains internal helper functions for Relaxed Tail Sync (RTS) ring mode.
> + * For more information please refer to <rte_ring_rts.h>.
> + */
> +
> +/**
> + * @internal This function updates tail values.
> + *
> + * Every call increments the tail counter (tail.val.cnt) by one. The tail
> + * position (tail.val.pos) is set to the current head position only when
> + * the incremented tail counter equals the head counter; otherwise the
> + * position is left as it was and only the counter advances.
> + * The resulting {pos, cnt} pair is written back with one strong
> + * compare-and-swap on tail.raw (RELEASE on success, ACQUIRE on failure).
> + * On failure the builtin refreshes 'ot' with the value currently stored in
> + * the tail, and the loop recomputes the new tail from it.
> + *
> + * @param ht
> + *   RTS head/tail structure whose tail is to be updated.
> + */
> +static __rte_always_inline void
> +__rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht) {
> +	union rte_ring_rts_poscnt h, ot, nt;
> +
> +	/*
> +	 * If there are other enqueues/dequeues in progress that
> +	 * might have preceded us, then don't update the tail position with
> +	 * a new value (only the tail counter is incremented in that case).
> +	 */
> +
> +	ot.raw = __atomic_load_n(&ht->tail.raw, __ATOMIC_ACQUIRE);
> +
> +	do {
> +		/*
> +		 * on 32-bit systems we have to do atomic read here;
> +		 * head is re-read on every retry of the loop.
> +		 */
> +		h.raw = __atomic_load_n(&ht->head.raw,
> __ATOMIC_RELAXED);
> +
> +		nt.raw = ot.raw;
> +		if (++nt.val.cnt == h.val.cnt)
> +			nt.val.pos = h.val.pos;
> +
> +	} while (__atomic_compare_exchange_n(&ht->tail.raw, &ot.raw,
> nt.raw,
> +			0, __ATOMIC_RELEASE, __ATOMIC_ACQUIRE) == 0); }
> +
> +/**
> + * @internal This function waits till head/tail distance wouldn't
> + * exceed pre-defined max value.
> + *
> + * The limit (ht->htd_max) is read once on entry. While the difference
> + * between h->val.pos and ht->tail.val.pos is greater than that limit,
> + * the function calls rte_pause(), reloads ht->head.raw (ACQUIRE) into the
> + * caller's copy and evaluates the condition again. The tail position is
> + * read through a plain member access on each evaluation.
> + *
> + * @param ht
> + *   RTS head/tail structure to check.
> + * @param h
> + *   In/out: head value previously read by the caller. On return it holds
> + *   the most recently loaded head value, i.e. the one for which the
> + *   distance check passed.
> + */
> +static __rte_always_inline void
> +__rte_ring_rts_head_wait(const struct rte_ring_rts_headtail *ht,
> +	union rte_ring_rts_poscnt *h)
> +{
> +	uint32_t max;
> +
> +	max = ht->htd_max;
> +
> +	while (h->val.pos - ht->tail.val.pos > max) {
> +		rte_pause();
> +		h->raw = __atomic_load_n(&ht->head.raw,
> __ATOMIC_ACQUIRE);
> +	}
> +}
> +
> +/**
> + * @internal This function updates the producer head for enqueue.
> + *
> + * The producer head is advanced by a compare-and-swap loop on
> + * r->rts_prod.head.raw: the new head has its position increased by the
> + * number of reserved entries and its counter increased by one. If no entry
> + * can be reserved the loop is left without modifying the head.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param num
> + *   The number of entries the caller wants to reserve.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED: reserve exactly num entries or none at all.
> + *   Any other value: reserve as many entries as are free, up to num.
> + * @param old_head
> + *   Output: producer head position observed before the move (written even
> + *   when nothing was reserved).
> + * @param free_entries
> + *   Output: number of free entries computed on the last loop iteration,
> + *   before the reservation is taken into account.
> + * @return
> + *   Number of entries actually reserved: num, a smaller value limited by
> + *   the free space, or 0.
> + */
> +static __rte_always_inline uint32_t
> +__rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
> +	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
> +	uint32_t *free_entries)
> +{
> +	uint32_t n;
> +	union rte_ring_rts_poscnt nh, oh;
> +
> +	const uint32_t capacity = r->capacity;
> +
> +	oh.raw = __atomic_load_n(&r->rts_prod.head.raw,
> __ATOMIC_ACQUIRE);
> +
> +	do {
> +		/* Reset n to the initial burst count */
> +		n = num;
> +
> +		/*
> +		 * wait for prod head/tail distance,
> +		 * make sure that we read prod head *before*
> +		 * reading cons tail.
> +		 */
> +		__rte_ring_rts_head_wait(&r->rts_prod, &oh);
> +
> +		/*
> +		 * The subtraction is done between two unsigned 32-bit values
> +		 * (the result is always modulo 32 bits even if we have
> +		 * *old_head > cons_tail). So 'free_entries' is always
> +		 * between 0 and capacity (which is < size).
> +		 */
> +		*free_entries = capacity + r->cons.tail - oh.val.pos;
> +
> +		/* check that we have enough room in ring */
> +		if (unlikely(n > *free_entries))
> +			n = (behavior == RTE_RING_QUEUE_FIXED) ?
> +					0 : *free_entries;
> +
> +		if (n == 0)
> +			break;
> +
> +		nh.val.pos = oh.val.pos + n;
> +		nh.val.cnt = oh.val.cnt + 1;
> +
> +	/*
> +	 * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
> +	 *  - OOO reads of cons tail value
> +	 *  - OOO copy of elems to the ring
> +	 * On failure 'oh' is refreshed with the current head value and the
> +	 * whole computation above is repeated.
> +	 */
> +	} while (__atomic_compare_exchange_n(&r->rts_prod.head.raw,
> +			&oh.raw, nh.raw,
> +			0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0);
> +
> +	*old_head = oh.val.pos;
> +	return n;
> +}
> +
> +/**
> + * @internal This function updates the consumer head for dequeue.
> + *
> + * The consumer head is advanced by a compare-and-swap loop on
> + * r->rts_cons.head.raw: the new head has its position increased by the
> + * number of reserved entries and its counter increased by one. If no entry
> + * can be reserved the loop is left without modifying the head.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param num
> + *   The number of entries the caller wants to dequeue.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED: reserve exactly num entries or none at all.
> + *   Any other value: reserve as many entries as are available, up to num.
> + * @param old_head
> + *   Output: consumer head position observed before the move (written even
> + *   when nothing was reserved).
> + * @param entries
> + *   Output: number of filled entries computed on the last loop iteration,
> + *   before the reservation is taken into account.
> + * @return
> + *   Number of entries actually reserved: num, a smaller value limited by
> + *   the available entries, or 0.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num,
> +	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
> +	uint32_t *entries)
> +{
> +	uint32_t n;
> +	union rte_ring_rts_poscnt nh, oh;
> +
> +	oh.raw = __atomic_load_n(&r->rts_cons.head.raw,
> __ATOMIC_ACQUIRE);
> +
> +	/* move cons.head atomically */
> +	do {
> +		/* Restore n as it may change every loop */
> +		n = num;
> +
> +		/*
> +		 * wait for cons head/tail distance,
> +		 * make sure that we read cons head *before*
> +		 * reading prod tail.
> +		 */
> +		__rte_ring_rts_head_wait(&r->rts_cons, &oh);
> +
> +		/* The subtraction is done between two unsigned 32-bit values
> +		 * (the result is always modulo 32 bits even if we have
> +		 * cons_head > prod_tail). So 'entries' is always between 0
> +		 * and size(ring)-1.
> +		 */
> +		*entries = r->prod.tail - oh.val.pos;
> +
> +		/* Set the actual entries for dequeue */
> +		if (n > *entries)
> +			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 :
> *entries;
> +
> +		if (unlikely(n == 0))
> +			break;
> +
> +		nh.val.pos = oh.val.pos + n;
> +		nh.val.cnt = oh.val.cnt + 1;
> +
> +	/*
> +	 * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
> +	 *  - OOO reads of prod tail value
> +	 *  - OOO copy of elems from the ring
> +	 * On failure 'oh' is refreshed with the current head value and the
> +	 * whole computation above is repeated.
> +	 */
> +	} while (__atomic_compare_exchange_n(&r->rts_cons.head.raw,
> +			&oh.raw, nh.raw,
> +			0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0);
> +
> +	*old_head = oh.val.pos;
> +	return n;
> +}
> +
> +#endif /* _RTE_RING_RTS_C11_MEM_H_ */
> --
> 2.17.1



More information about the dev mailing list