[dpdk-dev] [PATCH v1 1/2] lib/ring: add enqueue-dequeue callback
Vipin Varghese
vipin.varghese at intel.com
Thu Jun 6 20:33:54 CEST 2019
Add callback event handlers for the enqueue and dequeue operations on a ring.
The pre-enqueue and post-dequeue points are the ones chosen for invoking
the registered user callback handlers.
Signed-off-by: Vipin Varghese <vipin.varghese at intel.com>
---
config/common_base | 1 +
lib/librte_ring/Makefile | 1 +
lib/librte_ring/meson.build | 2 +
lib/librte_ring/rte_ring.c | 187 +++++++++++++++++++++++++++
lib/librte_ring/rte_ring.h | 117 ++++++++++++++++-
lib/librte_ring/rte_ring_version.map | 9 ++
6 files changed, 316 insertions(+), 1 deletion(-)
diff --git a/config/common_base b/config/common_base
index ec29455d2..022734f19 100644
--- a/config/common_base
+++ b/config/common_base
@@ -500,6 +500,7 @@ CONFIG_RTE_LIBRTE_PMD_PCAP=n
CONFIG_RTE_LIBRTE_PMD_RING=y
CONFIG_RTE_PMD_RING_MAX_RX_RINGS=16
CONFIG_RTE_PMD_RING_MAX_TX_RINGS=16
+CONFIG_RTE_RING_ENQDEQ_CALLBACKS=n
#
# Compile SOFTNIC PMD
diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
index 21a36770d..4f086e687 100644
--- a/lib/librte_ring/Makefile
+++ b/lib/librte_ring/Makefile
@@ -6,6 +6,7 @@ include $(RTE_SDK)/mk/rte.vars.mk
# library name
LIB = librte_ring.a
+CFLAGS += -DALLOW_EXPERIMENTAL_API
CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
LDLIBS += -lrte_eal
diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build
index ab8b0b469..b92dcf027 100644
--- a/lib/librte_ring/meson.build
+++ b/lib/librte_ring/meson.build
@@ -2,6 +2,8 @@
# Copyright(c) 2017 Intel Corporation
version = 2
+allow_experimental_apis = true
+
sources = files('rte_ring.c')
headers = files('rte_ring.h',
'rte_ring_c11_mem.h',
diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
index b89ecf999..ee740c401 100644
--- a/lib/librte_ring/rte_ring.c
+++ b/lib/librte_ring/rte_ring.c
@@ -43,6 +43,11 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
/* true if x is a power of 2 */
#define POWEROF2(x) ((((x)-1) & (x)) == 0)
+/*
+ * Serializes registration and unregistration of pre-enqueue callbacks.
+ * One lock is shared by every ring; it is private to this file.
+ */
+static rte_spinlock_t rte_ring_preenq_cb_lock = RTE_SPINLOCK_INITIALIZER;
+/*
+ * Serializes registration and unregistration of post-dequeue callbacks.
+ * One lock is shared by every ring; it is private to this file.
+ */
+static rte_spinlock_t rte_ring_pstdeq_cb_lock = RTE_SPINLOCK_INITIALIZER;
+
/* return the size of memory occupied by a ring */
ssize_t
rte_ring_get_memsize(unsigned count)
@@ -103,6 +108,9 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
r->prod.head = r->cons.head = 0;
r->prod.tail = r->cons.tail = 0;
+ TAILQ_INIT(&(r->enq_cbs));
+ TAILQ_INIT(&(r->deq_cbs));
+
return 0;
}
@@ -220,6 +228,185 @@ rte_ring_free(struct rte_ring *r)
rte_free(te);
}
+/*
+ * Register a pre-enqueue callback on a ring.
+ *
+ * The (cb_fn, cb_arg) pair is appended to r->enq_cbs unless an identical
+ * pair is already on the list, in which case the call succeeds without
+ * adding a duplicate.
+ *
+ * Returns 0 on success, -ENOTSUP (with rte_errno set) when the library is
+ * built without RTE_RING_ENQDEQ_CALLBACKS, -EINVAL for a NULL callback or
+ * an invalid ring, and -ENOMEM when the list node cannot be allocated.
+ */
+int __rte_experimental
+rte_ring_preenq_callback_register(struct rte_ring *r,
+		rte_ring_cb_fn cb_fn, void *cb_arg)
+{
+#ifndef RTE_RING_ENQDEQ_CALLBACKS
+	rte_errno = ENOTSUP;
+	return -ENOTSUP;
+#endif
+
+	struct rte_ring_callback *user_cb;
+	rte_spinlock_t *lock = &rte_ring_preenq_cb_lock;
+
+	if (cb_fn == NULL)
+		return -EINVAL;
+
+	/* Reject a NULL ring before rte_ring_get_capacity() reads it. */
+	if (r == NULL || !rte_ring_get_capacity(r)) {
+		RTE_LOG(ERR, RING, "Invalid ring=%p\n", r);
+		return -EINVAL;
+	}
+
+	rte_spinlock_lock(lock);
+
+	/* Look for an existing registration of the same pair. */
+	TAILQ_FOREACH(user_cb, &(r->enq_cbs), next) {
+		if (user_cb->cb_fn == cb_fn && user_cb->cb_arg == cb_arg)
+			break;
+	}
+
+	/* Not found: create a new callback node. */
+	if (user_cb == NULL) {
+		user_cb = rte_zmalloc("RING_USER_CALLBACK",
+				sizeof(*user_cb), 0);
+		if (user_cb != NULL) {
+			user_cb->cb_fn = cb_fn;
+			user_cb->cb_arg = cb_arg;
+			TAILQ_INSERT_TAIL(&(r->enq_cbs), user_cb, next);
+		}
+	}
+
+	rte_spinlock_unlock(lock);
+
+	/* user_cb is NULL here only when the allocation failed. */
+	return (user_cb == NULL) ? -ENOMEM : 0;
+}
+
+/*
+ * Register a post-dequeue callback on a ring.
+ *
+ * The (cb_fn, cb_arg) pair is appended to r->deq_cbs unless an identical
+ * pair is already on the list, in which case the call succeeds without
+ * adding a duplicate.
+ *
+ * Returns 0 on success, -ENOTSUP (with rte_errno set) when the library is
+ * built without RTE_RING_ENQDEQ_CALLBACKS, -EINVAL for a NULL callback or
+ * an invalid ring, and -ENOMEM when the list node cannot be allocated.
+ */
+int __rte_experimental
+rte_ring_pstdeq_callback_register(struct rte_ring *r,
+		rte_ring_cb_fn cb_fn, void *cb_arg)
+{
+#ifndef RTE_RING_ENQDEQ_CALLBACKS
+	rte_errno = ENOTSUP;
+	return -ENOTSUP;
+#endif
+
+	struct rte_ring_callback *user_cb;
+	rte_spinlock_t *lock = &rte_ring_pstdeq_cb_lock;
+
+	if (cb_fn == NULL)
+		return -EINVAL;
+
+	/* Reject a NULL ring before rte_ring_get_capacity() reads it. */
+	if (r == NULL || !rte_ring_get_capacity(r)) {
+		RTE_LOG(ERR, RING, "Invalid ring=%p\n", r);
+		return -EINVAL;
+	}
+
+	rte_spinlock_lock(lock);
+
+	/* Look for an existing registration of the same pair. */
+	TAILQ_FOREACH(user_cb, &(r->deq_cbs), next) {
+		if (user_cb->cb_fn == cb_fn && user_cb->cb_arg == cb_arg)
+			break;
+	}
+
+	/* Not found: create a new callback node. */
+	if (user_cb == NULL) {
+		user_cb = rte_zmalloc("RING_USER_CALLBACK",
+				sizeof(*user_cb), 0);
+		if (user_cb != NULL) {
+			user_cb->cb_fn = cb_fn;
+			user_cb->cb_arg = cb_arg;
+			TAILQ_INSERT_TAIL(&(r->deq_cbs), user_cb, next);
+		}
+	}
+
+	rte_spinlock_unlock(lock);
+
+	/* user_cb is NULL here only when the allocation failed. */
+	return (user_cb == NULL) ? -ENOMEM : 0;
+}
+
+/*
+ * Unregister a pre-enqueue callback from a ring.
+ *
+ * Every node on r->enq_cbs matching (cb_fn, cb_arg) is removed and freed,
+ * unless its 'active' flag is set, in which case it is left in place.
+ *
+ * Returns 0 when the callback was removed, -EAGAIN when it is currently
+ * executing, -EINVAL for a NULL callback, an invalid ring or when no
+ * matching registration exists, and -ENOTSUP (with rte_errno set) when the
+ * library is built without RTE_RING_ENQDEQ_CALLBACKS.
+ */
+int __rte_experimental
+rte_ring_preenq_callback_unregister(struct rte_ring *r,
+		rte_ring_cb_fn cb_fn, void *cb_arg)
+{
+#ifndef RTE_RING_ENQDEQ_CALLBACKS
+	rte_errno = ENOTSUP;
+	return -ENOTSUP;
+#endif
+
+	/* Stays -EINVAL when no matching callback is found. */
+	int ret = -EINVAL;
+	struct rte_ring_callback *cb, *next;
+	rte_spinlock_t *lock = &rte_ring_preenq_cb_lock;
+
+	if (cb_fn == NULL)
+		return -EINVAL;
+
+	/* Reject a NULL ring before rte_ring_get_capacity() reads it. */
+	if (r == NULL || !rte_ring_get_capacity(r)) {
+		RTE_LOG(ERR, RING, "Invalid ring=%p\n", r);
+		return -EINVAL;
+	}
+
+	rte_spinlock_lock(lock);
+
+	/* Fetch the successor first: cb may be freed inside the loop. */
+	for (cb = TAILQ_FIRST(&r->enq_cbs); cb != NULL; cb = next) {
+		next = TAILQ_NEXT(cb, next);
+
+		if (cb->cb_fn != cb_fn || cb->cb_arg != cb_arg)
+			continue;
+
+		if (cb->active == 0) {
+			TAILQ_REMOVE(&(r->enq_cbs), cb, next);
+			rte_free(cb);
+			ret = 0;
+		} else {
+			/* Callback is running; the caller has to retry. */
+			ret = -EAGAIN;
+		}
+	}
+
+	rte_spinlock_unlock(lock);
+
+	return ret;
+}
+
+/*
+ * Unregister a post-dequeue callback from a ring.
+ *
+ * Every node on r->deq_cbs matching (cb_fn, cb_arg) is removed and freed,
+ * unless its 'active' flag is set, in which case it is left in place.
+ *
+ * Returns 0 when the callback was removed, -EAGAIN when it is currently
+ * executing, -EINVAL for a NULL callback, an invalid ring or when no
+ * matching registration exists, and -ENOTSUP (with rte_errno set) when the
+ * library is built without RTE_RING_ENQDEQ_CALLBACKS.
+ */
+int __rte_experimental
+rte_ring_pstdeq_callback_unregister(struct rte_ring *r,
+		rte_ring_cb_fn cb_fn, void *cb_arg)
+{
+#ifndef RTE_RING_ENQDEQ_CALLBACKS
+	rte_errno = ENOTSUP;
+	return -ENOTSUP;
+#endif
+
+	/* Stays -EINVAL when no matching callback is found. */
+	int ret = -EINVAL;
+	struct rte_ring_callback *cb, *next;
+	rte_spinlock_t *lock = &rte_ring_pstdeq_cb_lock;
+
+	if (cb_fn == NULL)
+		return -EINVAL;
+
+	/* Reject a NULL ring before rte_ring_get_capacity() reads it. */
+	if (r == NULL || !rte_ring_get_capacity(r)) {
+		RTE_LOG(ERR, RING, "Invalid ring=%p\n", r);
+		return -EINVAL;
+	}
+
+	rte_spinlock_lock(lock);
+
+	/* Fetch the successor first: cb may be freed inside the loop. */
+	for (cb = TAILQ_FIRST(&r->deq_cbs); cb != NULL; cb = next) {
+		next = TAILQ_NEXT(cb, next);
+
+		if (cb->cb_fn != cb_fn || cb->cb_arg != cb_arg)
+			continue;
+
+		if (cb->active == 0) {
+			TAILQ_REMOVE(&(r->deq_cbs), cb, next);
+			rte_free(cb);
+			ret = 0;
+		} else {
+			/* Callback is running; the caller has to retry. */
+			ret = -EAGAIN;
+		}
+	}
+
+	rte_spinlock_unlock(lock);
+
+	return ret;
+}
+
+
/* dump the status of the ring on the console */
void
rte_ring_dump(FILE *f, const struct rte_ring *r)
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index e265e9479..fb0f3efb5 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -63,6 +63,11 @@ enum rte_ring_queue_behavior {
struct rte_memzone; /* forward declaration, so as not to require memzone.h */
+struct rte_ring_callback;
+
+TAILQ_HEAD(rte_ring_enq_cb_list, rte_ring_callback);
+TAILQ_HEAD(rte_ring_deq_cb_list, rte_ring_callback);
+
/* structure to hold a pair of head/tail values and other metadata */
struct rte_ring_headtail {
volatile uint32_t head; /**< Prod/consumer head. */
@@ -103,6 +108,20 @@ struct rte_ring {
/** Ring consumer status. */
struct rte_ring_headtail cons __rte_cache_aligned;
char pad2 __rte_cache_aligned; /**< empty cache line */
+
+ struct rte_ring_enq_cb_list enq_cbs;
+ struct rte_ring_deq_cb_list deq_cbs;
+};
+
+/**
+ * Prototype of a user callback attached to a ring.
+ *
+ * The burst functions that invoke it pass the ring, the caller's object
+ * table, the current object count and the cb_arg given at registration.
+ * The value returned replaces the object count used by the caller for the
+ * rest of the operation.
+ */
+typedef unsigned int (*rte_ring_cb_fn)(struct rte_ring *r,
+ void *obj_table, unsigned int n,
+ void *cb_arg);
+
+/**
+ * One registered callback: a node of a ring's enq_cbs or deq_cbs list,
+ * identified by its (cb_fn, cb_arg) pair.
+ */
+struct rte_ring_callback {
+ TAILQ_ENTRY(rte_ring_callback) next; /* Linkage in the ring's callback list */
+ rte_ring_cb_fn cb_fn; /* Callback function address */
+ void *cb_arg; /* Opaque argument handed back to cb_fn */
+ uint32_t active; /* Set to 1 while cb_fn runs, 0 otherwise */
};
#define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
@@ -850,9 +869,23 @@ rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
* - n: Actual number of objects enqueued.
*/
static __rte_always_inline unsigned
-rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
+rte_ring_enqueue_burst(struct rte_ring *r, void **obj_table,
unsigned int n, unsigned int *free_space)
{
+#ifdef RTE_RING_ENQDEQ_CALLBACKS
+ struct rte_ring_callback *cb = NULL;
+
+ /*
+  * Run the pre-enqueue callbacks in list order. Each callback's return
+  * value becomes the count passed to the next callback and, finally,
+  * to the enqueue below.
+  *
+  * NOTE(review): the list is walked without taking the registration
+  * spinlock, and 'active' is a plain store - confirm this is safe
+  * against a concurrent unregister and with multiple producers.
+  * NOTE(review): obj_table lost its const qualifier in this signature,
+  * presumably so it can be handed to the callback - confirm that
+  * existing callers still build.
+  */
+ TAILQ_FOREACH(cb, &(r->enq_cbs), next) {
+ if (cb->cb_fn == NULL)
+ continue;
+
+ cb->active = 1;
+ n = cb->cb_fn(r, obj_table, n, cb->cb_arg);
+ cb->active = 0;
+ }
+
+#endif
+
return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE,
r->prod.single, free_space);
}
@@ -881,6 +914,20 @@ static __rte_always_inline unsigned
rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
unsigned int n, unsigned int *available)
{
+#ifdef RTE_RING_ENQDEQ_CALLBACKS
+	struct rte_ring_callback *cb;
+
+	/*
+	 * Dequeue first: these are post-dequeue hooks, so obj_table must
+	 * already hold the dequeued objects, and n the number actually
+	 * dequeued, when a callback looks at them.
+	 */
+	n = __rte_ring_do_dequeue(r, obj_table, n,
+			RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
+
+	/*
+	 * Run the callbacks in list order; each return value becomes the
+	 * count passed to the next callback and finally to the caller.
+	 */
+	TAILQ_FOREACH(cb, &(r->deq_cbs), next) {
+		if (cb->cb_fn == NULL)
+			continue;
+
+		cb->active = 1;
+		n = cb->cb_fn(r, obj_table, n, cb->cb_arg);
+		cb->active = 0;
+	}
+
+	return n;
+#else
return __rte_ring_do_dequeue(r, obj_table, n,
RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
+#endif
}
@@ -938,6 +985,74 @@ rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
r->cons.single, available);
}
+/**
+ * Register a user callback, together with its argument, to be invoked
+ * before objects are enqueued to the ring.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param cb_fn
+ * User callback function to be invoked.
+ * @param cb_arg
+ * User argument passed back to the callback on every invocation.
+ * @return
+ * 0 on success (including when the same callback and argument are
+ * already registered), or a negative errno value on error.
+ */
+int __rte_experimental
+rte_ring_preenq_callback_register(struct rte_ring *r,
+ rte_ring_cb_fn cb_fn, void *cb_arg);
+
+
+/**
+ * Register a user callback, together with its argument, for the
+ * post-dequeue hook of the ring.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param cb_fn
+ * User callback function to be invoked.
+ * @param cb_arg
+ * User argument passed back to the callback on every invocation.
+ * @return
+ * 0 on success (including when the same callback and argument are
+ * already registered), or a negative errno value on error.
+ */
+int __rte_experimental
+rte_ring_pstdeq_callback_register(struct rte_ring *r,
+ rte_ring_cb_fn cb_fn, void *cb_arg);
+
+
+/**
+ * Unregister a pre-enqueue user callback, identified by its function and
+ * argument, from the ring.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param cb_fn
+ * User callback function to be removed.
+ * @param cb_arg
+ * User argument the callback was registered with.
+ * @return
+ * 0 on success, -EAGAIN if the callback is currently executing, or
+ * another negative errno value on error.
+ */
+int __rte_experimental
+rte_ring_preenq_callback_unregister(struct rte_ring *r,
+ rte_ring_cb_fn cb_fn, void *cb_arg);
+
+
+/**
+ * Unregister a post-dequeue user callback, identified by its function and
+ * argument, from the ring.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param cb_fn
+ * User callback function to be removed.
+ * @param cb_arg
+ * User argument the callback was registered with.
+ * @return
+ * 0 on success, -EAGAIN if the callback is currently executing, or
+ * another negative errno value on error.
+ */
+int __rte_experimental
+rte_ring_pstdeq_callback_unregister(struct rte_ring *r,
+ rte_ring_cb_fn cb_fn, void *cb_arg);
+
#ifdef __cplusplus
}
#endif
diff --git a/lib/librte_ring/rte_ring_version.map b/lib/librte_ring/rte_ring_version.map
index d935efd0d..101814fad 100644
--- a/lib/librte_ring/rte_ring_version.map
+++ b/lib/librte_ring/rte_ring_version.map
@@ -17,3 +17,12 @@ DPDK_2.2 {
rte_ring_free;
} DPDK_2.0;
+
+EXPERIMENTAL {
+ global:
+
+ rte_ring_preenq_callback_register;
+ rte_ring_preenq_callback_unregister;
+ rte_ring_pstdeq_callback_register;
+ rte_ring_pstdeq_callback_unregister;
+};
--
2.17.1
More information about the dev
mailing list