[dpdk-dev] [PATCH v1 1/2] lib/ring: add enqueue-dequeue callback

Honnappa Nagarahalli Honnappa.Nagarahalli at arm.com
Fri Jun 7 08:03:34 CEST 2019


> 
> Add callback event handler for enqueue dequeue operation on ring.
> The pre-enqueue and post-dequeue operation on ring is selected to invoke
> user callback handler.
Can you provide a use case for this, so that the need for it is easier to understand?

> 
> Signed-off-by: Vipin Varghese <vipin.varghese at intel.com>
> ---
>  config/common_base                   |   1 +
>  lib/librte_ring/Makefile             |   1 +
>  lib/librte_ring/meson.build          |   2 +
>  lib/librte_ring/rte_ring.c           | 187 +++++++++++++++++++++++++++
>  lib/librte_ring/rte_ring.h           | 117 ++++++++++++++++-
>  lib/librte_ring/rte_ring_version.map |   9 ++
>  6 files changed, 316 insertions(+), 1 deletion(-)
> 
> diff --git a/config/common_base b/config/common_base index
> ec29455d2..022734f19 100644
> --- a/config/common_base
> +++ b/config/common_base
> @@ -500,6 +500,7 @@ CONFIG_RTE_LIBRTE_PMD_PCAP=n
> CONFIG_RTE_LIBRTE_PMD_RING=y
>  CONFIG_RTE_PMD_RING_MAX_RX_RINGS=16
>  CONFIG_RTE_PMD_RING_MAX_TX_RINGS=16
> +CONFIG_RTE_RING_ENQDEQ_CALLBACKS=n
> 
>  #
>  # Compile SOFTNIC PMD
> diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile index
> 21a36770d..4f086e687 100644
> --- a/lib/librte_ring/Makefile
> +++ b/lib/librte_ring/Makefile
> @@ -6,6 +6,7 @@ include $(RTE_SDK)/mk/rte.vars.mk  # library name  LIB =
> librte_ring.a
> 
> +CFLAGS += -DALLOW_EXPERIMENTAL_API
>  CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3  LDLIBS += -lrte_eal
> 
> diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build index
> ab8b0b469..b92dcf027 100644
> --- a/lib/librte_ring/meson.build
> +++ b/lib/librte_ring/meson.build
> @@ -2,6 +2,8 @@
>  # Copyright(c) 2017 Intel Corporation
> 
>  version = 2
> +allow_experimental_apis = true
> +
>  sources = files('rte_ring.c')
>  headers = files('rte_ring.h',
>  		'rte_ring_c11_mem.h',
> diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c index
> b89ecf999..ee740c401 100644
> --- a/lib/librte_ring/rte_ring.c
> +++ b/lib/librte_ring/rte_ring.c
> @@ -43,6 +43,11 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
>  /* true if x is a power of 2 */
>  #define POWEROF2(x) ((((x)-1) & (x)) == 0)
> 
> +/* spinlock for pre-enqueue callback */ rte_spinlock_t
> +rte_ring_preenq_cb_lock = RTE_SPINLOCK_INITIALIZER;
> +/* spinlock for post-dequeue callback */ rte_spinlock_t
> +rte_ring_pstdeq_cb_lock = RTE_SPINLOCK_INITIALIZER;
> +
>  /* return the size of memory occupied by a ring */  ssize_t
> rte_ring_get_memsize(unsigned count) @@ -103,6 +108,9 @@
> rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
>  	r->prod.head = r->cons.head = 0;
>  	r->prod.tail = r->cons.tail = 0;
> 
> +	TAILQ_INIT(&(r->enq_cbs));
> +	TAILQ_INIT(&(r->deq_cbs));
> +
>  	return 0;
>  }
> 
> @@ -220,6 +228,185 @@ rte_ring_free(struct rte_ring *r)
>  	rte_free(te);
>  }
> 
> +int __rte_experimental
> +rte_ring_preenq_callback_register(struct rte_ring *r,
> +		rte_ring_cb_fn cb_fn, void *cb_arg)
> +{
> +#ifndef RTE_RING_ENQDEQ_CALLBACKS
> +	rte_errno = ENOTSUP;
> +	return -ENOTSUP;
> +#endif
> +
> +	struct rte_ring_callback *user_cb;
> +	rte_spinlock_t *lock = &rte_ring_preenq_cb_lock;
> +
> +	if (!cb_fn)
> +		return -EINVAL;
> +
> +	if (!rte_ring_get_capacity(r)) {
> +		RTE_LOG(ERR, RING, "Invalid ring=%p", r);
> +		return -EINVAL;
> +	}
> +
> +	rte_spinlock_lock(lock);
> +
> +	TAILQ_FOREACH(user_cb, &(r->enq_cbs), next) {
> +		if ((void *) user_cb->cb_fn == (void *) cb_fn &&
> +				user_cb->cb_arg == cb_arg)
> +			break;
> +	}
> +
> +	/* create a new callback. */
> +	if (user_cb == NULL) {
> +		user_cb = rte_zmalloc("RING_USER_CALLBACK",
> +				sizeof(struct rte_ring_callback), 0);
> +		if (user_cb != NULL) {
> +			user_cb->cb_fn = cb_fn;
> +			user_cb->cb_arg = cb_arg;
> +			TAILQ_INSERT_TAIL(&(r->enq_cbs), user_cb, next);
> +		}
> +	}
> +
> +	rte_spinlock_unlock(lock);
> +
> +	return (user_cb == NULL) ? -ENOMEM : 0; }
> +
> +int __rte_experimental
> +rte_ring_pstdeq_callback_register(struct rte_ring *r,
> +		rte_ring_cb_fn cb_fn, void *cb_arg)
> +{
> +#ifndef RTE_RING_ENQDEQ_CALLBACKS
> +	rte_errno = ENOTSUP;
> +	return -ENOTSUP;
> +#endif
> +
> +	struct rte_ring_callback *user_cb;
> +	rte_spinlock_t *lock = &rte_ring_pstdeq_cb_lock;
> +
> +	if (!cb_fn)
> +		return -EINVAL;
> +
> +	if (!rte_ring_get_capacity(r)) {
> +		RTE_LOG(ERR, RING, "Invalid ring=%p", r);
> +		return -EINVAL;
> +	}
> +
> +	rte_spinlock_lock(lock);
> +
> +	TAILQ_FOREACH(user_cb, &(r->deq_cbs), next) {
> +		if ((void *) user_cb->cb_fn == (void *) cb_fn &&
> +				user_cb->cb_arg == cb_arg)
> +			break;
> +	}
> +
> +	/* create a new callback. */
> +	if (user_cb == NULL) {
> +		user_cb = rte_zmalloc("RING_USER_CALLBACK",
> +				sizeof(struct rte_ring_callback), 0);
> +		if (user_cb != NULL) {
> +			user_cb->cb_fn = cb_fn;
> +			user_cb->cb_arg = cb_arg;
> +			TAILQ_INSERT_TAIL(&(r->deq_cbs), user_cb, next);
> +		}
> +	}
> +
> +	rte_spinlock_unlock(lock);
> +
> +	return (user_cb == NULL) ? -ENOMEM : 0; }
> +
> +int __rte_experimental
> +rte_ring_preenq_callback_unregister(struct rte_ring *r,
> +		rte_ring_cb_fn cb_fn, void *cb_arg)
> +{
> +#ifndef RTE_RING_ENQDEQ_CALLBACKS
> +	rte_errno = ENOTSUP;
> +	return -ENOTSUP;
> +#endif
> +
> +	int ret = 0;
> +	struct rte_ring_callback *cb, *next;
> +	rte_spinlock_t *lock = &rte_ring_preenq_cb_lock;
> +
> +	if (!cb_fn)
> +		return -EINVAL;
> +
> +	if (!rte_ring_get_capacity(r)) {
> +		RTE_LOG(ERR, RING, "Invalid ring=%p", r);
> +		return -EINVAL;
> +	}
> +
> +	rte_spinlock_lock(lock);
> +
> +	ret = -EINVAL;
> +	for (cb = TAILQ_FIRST(&r->enq_cbs); cb != NULL; cb = next) {
> +		next = TAILQ_NEXT(cb, next);
> +
> +		if (cb->cb_fn != cb_fn || cb->cb_arg != cb_arg)
> +			continue;
> +
> +		if (cb->active == 0) {
> +			TAILQ_REMOVE(&(r->enq_cbs), cb, next);
> +			rte_free(cb);
> +			ret = 0;
> +		} else {
> +			ret = -EAGAIN;
> +		}
> +	}
> +
> +	rte_spinlock_unlock(lock);
> +
> +	return ret;
> +}
> +
> +int __rte_experimental
> +rte_ring_pstdeq_callback_unregister(struct rte_ring *r,
> +		rte_ring_cb_fn cb_fn, void *cb_arg)
> +{
> +#ifndef RTE_RING_ENQDEQ_CALLBACKS
> +	rte_errno = ENOTSUP;
> +	return -ENOTSUP;
> +#endif
> +
> +	int ret = 0;
> +	struct rte_ring_callback *cb, *next;
> +	rte_spinlock_t *lock = &rte_ring_pstdeq_cb_lock;
> +
> +	if (!cb_fn)
> +		return -EINVAL;
> +
> +	if (!rte_ring_get_capacity(r)) {
> +		RTE_LOG(ERR, RING, "Invalid ring=%p", r);
> +		return -EINVAL;
> +	}
> +
> +	rte_spinlock_lock(lock);
> +
> +	ret = -EINVAL;
> +	for (cb = TAILQ_FIRST(&r->deq_cbs); cb != NULL; cb = next) {
> +		next = TAILQ_NEXT(cb, next);
> +
> +		if (cb->cb_fn != cb_fn || cb->cb_arg != cb_arg)
> +			continue;
> +
> +		if (cb->active == 0) {
> +			TAILQ_REMOVE(&(r->deq_cbs), cb, next);
> +			rte_free(cb);
> +			ret = 0;
> +		} else {
> +			ret = -EAGAIN;
> +		}
> +	}
> +
> +	rte_spinlock_unlock(lock);
> +
> +	return ret;
> +
> +	return 0;
> +}
> +
> +
>  /* dump the status of the ring on the console */  void  rte_ring_dump(FILE *f,
> const struct rte_ring *r) diff --git a/lib/librte_ring/rte_ring.h
> b/lib/librte_ring/rte_ring.h index e265e9479..fb0f3efb5 100644
> --- a/lib/librte_ring/rte_ring.h
> +++ b/lib/librte_ring/rte_ring.h
> @@ -63,6 +63,11 @@ enum rte_ring_queue_behavior {
> 
>  struct rte_memzone; /* forward declaration, so as not to require memzone.h
> */
> 
> +struct rte_ring_callback;
> +
> +TAILQ_HEAD(rte_ring_enq_cb_list, rte_ring_callback);
> +TAILQ_HEAD(rte_ring_deq_cb_list, rte_ring_callback);
> +
>  /* structure to hold a pair of head/tail values and other metadata */  struct
> rte_ring_headtail {
>  	volatile uint32_t head;  /**< Prod/consumer head. */ @@ -103,6
> +108,20 @@ struct rte_ring {
>  	/** Ring consumer status. */
>  	struct rte_ring_headtail cons __rte_cache_aligned;
>  	char pad2 __rte_cache_aligned; /**< empty cache line */
> +
> +	struct rte_ring_enq_cb_list enq_cbs;
> +	struct rte_ring_deq_cb_list deq_cbs;
> +};
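This breaks ABI compatibility: adding the two callback list heads changes the size of 'struct rte_ring', which is defined in the public header.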

> +
> +typedef unsigned int (*rte_ring_cb_fn)(struct rte_ring *r,
> +			void *obj_table, unsigned int n,
> +			void *cb_arg);
> +
> +struct rte_ring_callback {
> +	TAILQ_ENTRY(rte_ring_callback) next; /* Callbacks list */
> +	rte_ring_cb_fn cb_fn; /* Callback address */
> +	void *cb_arg; /* Parameter for callback */
> +	uint32_t active; /* Callback is executing */
>  };
> 
>  #define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-
> producer". */ @@ -850,9 +869,23 @@ rte_ring_sp_enqueue_burst(struct
> rte_ring *r, void * const *obj_table,
>   *   - n: Actual number of objects enqueued.
>   */
>  static __rte_always_inline unsigned
> -rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
> +rte_ring_enqueue_burst(struct rte_ring *r, void **obj_table,
>  		      unsigned int n, unsigned int *free_space)  {
> +#ifdef RTE_RING_ENQDEQ_CALLBACKS
> +	struct rte_ring_callback *cb = NULL;
> +
> +	TAILQ_FOREACH(cb, &(r->enq_cbs), next) {
The TAILQ lock needs to be taken before this traversal. For example, what happens if an unregister is called concurrently and frees an entry while the list is being walked here?
Also, traversing a linked list on every enqueue call would be too costly. Understanding the use case may help here.

> +		if (cb->cb_fn == NULL)
> +			continue;
> +
> +		cb->active = 1;
> +		n = cb->cb_fn(r, obj_table, n, cb->cb_arg);
> +		cb->active = 0;
> +	}
> +
> +#endif
> +
>  	return __rte_ring_do_enqueue(r, obj_table, n,
> RTE_RING_QUEUE_VARIABLE,
>  			r->prod.single, free_space);
>  }

<snip>


More information about the dev mailing list