[dpdk-dev] [PATCH v1 1/2] lib/ring: add enqueue-dequeue callabck

Vipin Varghese vipin.varghese at intel.com
Thu Jun 6 20:33:54 CEST 2019


Add callback event handler for enqueue dequeue operation on ring.
The pre-enqueue and post-dequeue operation on ring is selected to
invoke user callback handler.

Signed-off-by: Vipin Varghese <vipin.varghese at intel.com>
---
 config/common_base                   |   1 +
 lib/librte_ring/Makefile             |   1 +
 lib/librte_ring/meson.build          |   2 +
 lib/librte_ring/rte_ring.c           | 187 +++++++++++++++++++++++++++
 lib/librte_ring/rte_ring.h           | 117 ++++++++++++++++-
 lib/librte_ring/rte_ring_version.map |   9 ++
 6 files changed, 316 insertions(+), 1 deletion(-)

diff --git a/config/common_base b/config/common_base
index ec29455d2..022734f19 100644
--- a/config/common_base
+++ b/config/common_base
@@ -500,6 +500,7 @@ CONFIG_RTE_LIBRTE_PMD_PCAP=n
 CONFIG_RTE_LIBRTE_PMD_RING=y
 CONFIG_RTE_PMD_RING_MAX_RX_RINGS=16
 CONFIG_RTE_PMD_RING_MAX_TX_RINGS=16
+CONFIG_RTE_RING_ENQDEQ_CALLBACKS=n
 
 #
 # Compile SOFTNIC PMD
diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
index 21a36770d..4f086e687 100644
--- a/lib/librte_ring/Makefile
+++ b/lib/librte_ring/Makefile
@@ -6,6 +6,7 @@ include $(RTE_SDK)/mk/rte.vars.mk
 # library name
 LIB = librte_ring.a
 
+CFLAGS += -DALLOW_EXPERIMENTAL_API
 CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
 LDLIBS += -lrte_eal
 
diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build
index ab8b0b469..b92dcf027 100644
--- a/lib/librte_ring/meson.build
+++ b/lib/librte_ring/meson.build
@@ -2,6 +2,8 @@
 # Copyright(c) 2017 Intel Corporation
 
 version = 2
+allow_experimental_apis = true
+
 sources = files('rte_ring.c')
 headers = files('rte_ring.h',
 		'rte_ring_c11_mem.h',
diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
index b89ecf999..ee740c401 100644
--- a/lib/librte_ring/rte_ring.c
+++ b/lib/librte_ring/rte_ring.c
@@ -43,6 +43,11 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
 /* true if x is a power of 2 */
 #define POWEROF2(x) ((((x)-1) & (x)) == 0)
 
+/* spinlock for pre-enqueue callback */
+rte_spinlock_t rte_ring_preenq_cb_lock = RTE_SPINLOCK_INITIALIZER;
+/* spinlock for post-dequeue callback */
+rte_spinlock_t rte_ring_pstdeq_cb_lock = RTE_SPINLOCK_INITIALIZER;
+
 /* return the size of memory occupied by a ring */
 ssize_t
 rte_ring_get_memsize(unsigned count)
@@ -103,6 +108,9 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
 	r->prod.head = r->cons.head = 0;
 	r->prod.tail = r->cons.tail = 0;
 
+	TAILQ_INIT(&(r->enq_cbs));
+	TAILQ_INIT(&(r->deq_cbs));
+
 	return 0;
 }
 
@@ -220,6 +228,185 @@ rte_ring_free(struct rte_ring *r)
 	rte_free(te);
 }
 
+int __rte_experimental
+rte_ring_preenq_callback_register(struct rte_ring *r,
+		rte_ring_cb_fn cb_fn, void *cb_arg)
+{
+#ifndef RTE_RING_ENQDEQ_CALLBACKS
+	rte_errno = ENOTSUP;
+	return -ENOTSUP;
+#endif
+
+	struct rte_ring_callback *user_cb;
+	rte_spinlock_t *lock = &rte_ring_preenq_cb_lock;
+
+	if (!cb_fn)
+		return -EINVAL;
+
+	if (!rte_ring_get_capacity(r)) {
+		RTE_LOG(ERR, RING, "Invalid ring=%p", r);
+		return -EINVAL;
+	}
+
+	rte_spinlock_lock(lock);
+
+	TAILQ_FOREACH(user_cb, &(r->enq_cbs), next) {
+		if ((void *) user_cb->cb_fn == (void *) cb_fn &&
+				user_cb->cb_arg == cb_arg)
+			break;
+	}
+
+	/* create a new callback. */
+	if (user_cb == NULL) {
+		user_cb = rte_zmalloc("RING_USER_CALLBACK",
+				sizeof(struct rte_ring_callback), 0);
+		if (user_cb != NULL) {
+			user_cb->cb_fn = cb_fn;
+			user_cb->cb_arg = cb_arg;
+			TAILQ_INSERT_TAIL(&(r->enq_cbs), user_cb, next);
+		}
+	}
+
+	rte_spinlock_unlock(lock);
+
+	return (user_cb == NULL) ? -ENOMEM : 0;
+}
+
+int __rte_experimental
+rte_ring_pstdeq_callback_register(struct rte_ring *r,
+		rte_ring_cb_fn cb_fn, void *cb_arg)
+{
+#ifndef RTE_RING_ENQDEQ_CALLBACKS
+	rte_errno = ENOTSUP;
+	return -ENOTSUP;
+#endif
+
+	struct rte_ring_callback *user_cb;
+	rte_spinlock_t *lock = &rte_ring_pstdeq_cb_lock;
+
+	if (!cb_fn)
+		return -EINVAL;
+
+	if (!rte_ring_get_capacity(r)) {
+		RTE_LOG(ERR, RING, "Invalid ring=%p", r);
+		return -EINVAL;
+	}
+
+	rte_spinlock_lock(lock);
+
+	TAILQ_FOREACH(user_cb, &(r->deq_cbs), next) {
+		if ((void *) user_cb->cb_fn == (void *) cb_fn &&
+				user_cb->cb_arg == cb_arg)
+			break;
+	}
+
+	/* create a new callback. */
+	if (user_cb == NULL) {
+		user_cb = rte_zmalloc("RING_USER_CALLBACK",
+				sizeof(struct rte_ring_callback), 0);
+		if (user_cb != NULL) {
+			user_cb->cb_fn = cb_fn;
+			user_cb->cb_arg = cb_arg;
+			TAILQ_INSERT_TAIL(&(r->deq_cbs), user_cb, next);
+		}
+	}
+
+	rte_spinlock_unlock(lock);
+
+	return (user_cb == NULL) ? -ENOMEM : 0;
+}
+
+int __rte_experimental
+rte_ring_preenq_callback_unregister(struct rte_ring *r,
+		rte_ring_cb_fn cb_fn, void *cb_arg)
+{
+#ifndef RTE_RING_ENQDEQ_CALLBACKS
+	rte_errno = ENOTSUP;
+	return -ENOTSUP;
+#endif
+
+	int ret = 0;
+	struct rte_ring_callback *cb, *next;
+	rte_spinlock_t *lock = &rte_ring_preenq_cb_lock;
+
+	if (!cb_fn)
+		return -EINVAL;
+
+	if (!rte_ring_get_capacity(r)) {
+		RTE_LOG(ERR, RING, "Invalid ring=%p", r);
+		return -EINVAL;
+	}
+
+	rte_spinlock_lock(lock);
+
+	ret = -EINVAL;
+	for (cb = TAILQ_FIRST(&r->enq_cbs); cb != NULL; cb = next) {
+		next = TAILQ_NEXT(cb, next);
+
+		if (cb->cb_fn != cb_fn || cb->cb_arg != cb_arg)
+			continue;
+
+		if (cb->active == 0) {
+			TAILQ_REMOVE(&(r->enq_cbs), cb, next);
+			rte_free(cb);
+			ret = 0;
+		} else {
+			ret = -EAGAIN;
+		}
+	}
+
+	rte_spinlock_unlock(lock);
+
+	return ret;
+}
+
+int __rte_experimental
+rte_ring_pstdeq_callback_unregister(struct rte_ring *r,
+		rte_ring_cb_fn cb_fn, void *cb_arg)
+{
+#ifndef RTE_RING_ENQDEQ_CALLBACKS
+	rte_errno = ENOTSUP;
+	return -ENOTSUP;
+#endif
+
+	int ret = 0;
+	struct rte_ring_callback *cb, *next;
+	rte_spinlock_t *lock = &rte_ring_pstdeq_cb_lock;
+
+	if (!cb_fn)
+		return -EINVAL;
+
+	if (!rte_ring_get_capacity(r)) {
+		RTE_LOG(ERR, RING, "Invalid ring=%p", r);
+		return -EINVAL;
+	}
+
+	rte_spinlock_lock(lock);
+
+	ret = -EINVAL;
+	for (cb = TAILQ_FIRST(&r->deq_cbs); cb != NULL; cb = next) {
+		next = TAILQ_NEXT(cb, next);
+
+		if (cb->cb_fn != cb_fn || cb->cb_arg != cb_arg)
+			continue;
+
+		if (cb->active == 0) {
+			TAILQ_REMOVE(&(r->deq_cbs), cb, next);
+			rte_free(cb);
+			ret = 0;
+		} else {
+			ret = -EAGAIN;
+		}
+	}
+
+	rte_spinlock_unlock(lock);
+
+	return ret;
+
+	return 0;
+}
+
+
 /* dump the status of the ring on the console */
 void
 rte_ring_dump(FILE *f, const struct rte_ring *r)
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index e265e9479..fb0f3efb5 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -63,6 +63,11 @@ enum rte_ring_queue_behavior {
 
 struct rte_memzone; /* forward declaration, so as not to require memzone.h */
 
+struct rte_ring_callback;
+
+TAILQ_HEAD(rte_ring_enq_cb_list, rte_ring_callback);
+TAILQ_HEAD(rte_ring_deq_cb_list, rte_ring_callback);
+
 /* structure to hold a pair of head/tail values and other metadata */
 struct rte_ring_headtail {
 	volatile uint32_t head;  /**< Prod/consumer head. */
@@ -103,6 +108,20 @@ struct rte_ring {
 	/** Ring consumer status. */
 	struct rte_ring_headtail cons __rte_cache_aligned;
 	char pad2 __rte_cache_aligned; /**< empty cache line */
+
+	struct rte_ring_enq_cb_list enq_cbs;
+	struct rte_ring_deq_cb_list deq_cbs;
+};
+
+typedef unsigned int (*rte_ring_cb_fn)(struct rte_ring *r,
+			void *obj_table, unsigned int n,
+			void *cb_arg);
+
+struct rte_ring_callback {
+	TAILQ_ENTRY(rte_ring_callback) next; /* Callbacks list */
+	rte_ring_cb_fn cb_fn; /* Callback address */
+	void *cb_arg; /* Parameter for callback */
+	uint32_t active; /* Callback is executing */
 };
 
 #define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
@@ -850,9 +869,23 @@ rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
  *   - n: Actual number of objects enqueued.
  */
 static __rte_always_inline unsigned
-rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
+rte_ring_enqueue_burst(struct rte_ring *r, void **obj_table,
 		      unsigned int n, unsigned int *free_space)
 {
+#ifdef RTE_RING_ENQDEQ_CALLBACKS
+	struct rte_ring_callback *cb = NULL;
+
+	TAILQ_FOREACH(cb, &(r->enq_cbs), next) {
+		if (cb->cb_fn == NULL)
+			continue;
+
+		cb->active = 1;
+		n = cb->cb_fn(r, obj_table, n, cb->cb_arg);
+		cb->active = 0;
+	}
+
+#endif
+
 	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE,
 			r->prod.single, free_space);
 }
@@ -881,6 +914,20 @@ static __rte_always_inline unsigned
 rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
+#ifdef RTE_RING_ENQDEQ_CALLBACKS
+	struct rte_ring_callback *cb = NULL;
+
+	TAILQ_FOREACH(cb, &(r->deq_cbs), next) {
+		if (cb->cb_fn == NULL)
+			continue;
+
+		cb->active = 1;
+		n = cb->cb_fn(r, obj_table, n, cb->cb_arg);
+		cb->active = 0;
+	}
+
+#endif
+
 	return __rte_ring_do_dequeue(r, obj_table, n,
 			RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
 }
@@ -938,6 +985,74 @@ rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
 				r->cons.single, available);
 }
 
+/**
+ * Register user callback function with argument for pre-enqueue
+ * to the ring.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param cb_fn
+ *   User callback function to be invoked.
+ * @param cb_arg
+ *   user callback arguments to shared in callback process.
+ * @return
+ *   0 on success, or a negative value on error.
+ */
+int __rte_experimental
+rte_ring_preenq_callback_register(struct rte_ring *r,
+			rte_ring_cb_fn cb_fn, void *cb_arg);
+
+/**
+ * Register user callback function with argument for post-dequeue
+ * to the ring.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param cb_fn
+ *   User callback function to be invoked.
+ * @param cb_arg
+ *   user callback arguments to shared in callback process.
+ * @return
+ *   0 on success, or a negative value on error.
+ */
+int __rte_experimental
+rte_ring_pstdeq_callback_register(struct rte_ring *r,
+			rte_ring_cb_fn cb_fn, void *cb_arg);
+
+/**
+ * Unregister user callback function with argument for pre-enqueue
+ * to the ring.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param cb_fn
+ *   User callback function to be invoked.
+ * @param cb_arg
+ *   user callback arguments to shared in callback process.
+ * @return
+ *   0 on success, or a negative value on error.
+ */
+int __rte_experimental
+rte_ring_preenq_callback_unregister(struct rte_ring *r,
+			rte_ring_cb_fn cb_fn, void *cb_arg);
+
+/**
+ * Unregister user callback function with argument for post-dequeue
+ * to the ring.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param cb_fn
+ *   User callback function to be invoked.
+ * @param cb_arg
+ *   user callback arguments to shared in callback process.
+ * @return
+ *   0 on success, or a negative value on error.
+ */
+int __rte_experimental
+rte_ring_pstdeq_callback_unregister(struct rte_ring *r,
+			rte_ring_cb_fn cb_fn, void *cb_arg);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/lib/librte_ring/rte_ring_version.map b/lib/librte_ring/rte_ring_version.map
index d935efd0d..101814fad 100644
--- a/lib/librte_ring/rte_ring_version.map
+++ b/lib/librte_ring/rte_ring_version.map
@@ -17,3 +17,12 @@ DPDK_2.2 {
 	rte_ring_free;
 
 } DPDK_2.0;
+
+EXPERIMENTAL {
+	global:
+
+	rte_ring_preenq_callback_register;
+	rte_ring_preenq_callback_unregister;
+	rte_ring_pstdeq_callback_register;
+	rte_ring_pstdeq_callback_unregister;
+};
-- 
2.17.1



More information about the dev mailing list