[PATCH v3 1/3] lib: introduce dispatcher library
    Naga Harish K, S V 
    s.v.naga.harish.k at intel.com
       
    Sun Sep 17 18:46:40 CEST 2023
    
    
  
> -----Original Message-----
> From: Mattias Rönnblom <mattias.ronnblom at ericsson.com>
> Sent: Monday, September 4, 2023 6:33 PM
> To: dev at dpdk.org
> Cc: Jerin Jacob <jerinj at marvell.com>; techboard at dpdk.org; Van Haaren,
> Harry <harry.van.haaren at intel.com>; hofors at lysator.liu.se; Nilsson, Peter
> <peter.j.nilsson at ericsson.com>; Heng Wang <heng.wang at ericsson.com>;
> Naga Harish K, S V <s.v.naga.harish.k at intel.com>; Pavan Nikhilesh
> <pbhagavatula at marvell.com>; Gujjar, Abhinandan S
> <abhinandan.gujjar at intel.com>; Carrillo, Erik G <Erik.G.Carrillo at intel.com>;
> Shijith Thotton <sthotton at marvell.com>; Hemant Agrawal
> <hemant.agrawal at nxp.com>; Sachin Saxena <sachin.saxena at oss.nxp.com>;
> Liang Ma <liangma at liangbit.com>; Mccarthy, Peter
> <Peter.Mccarthy at intel.com>; Yan, Zhirun <Zhirun.Yan at intel.com>;
> mattias.ronnblom <mattias.ronnblom at ericsson.com>
> Subject: [PATCH v3 1/3] lib: introduce dispatcher library
> 
> The purpose of the dispatcher library is to help reduce coupling in an
> Eventdev-based DPDK application.
> 
> In addition, the dispatcher also provides a convenient and flexible way for the
> application to use service cores for application-level processing.
> 
> Signed-off-by: Mattias Rönnblom <mattias.ronnblom at ericsson.com>
> Tested-by: Peter Nilsson <peter.j.nilsson at ericsson.com>
> Reviewed-by: Heng Wang <heng.wang at ericsson.com>
> 
> --
> 
> PATCH v3:
>  o To underline its optional character and since it does not provide
>    hardware abstraction, the event dispatcher is now a separate
>    library.
>  o Change name from rte_event_dispatcher -> rte_dispatcher, to make it
>    shorter and to avoid the rte_event_* namespace.
> 
rte_dispatcher essentially dispatches events, but the name does not convey that.
Also, like any other adapter service, it can reside within the eventdev directory.
I have seen some discussion in earlier threads about the placement of the dispatcher library.
It is an optional eventdev application service; it does not force this programming model on the application.
The documentation may need to be updated to mention that it is optional.
If hardware with a dispatcher feature ever appears, this library may need to be moved into the eventdev library later.
So it makes sense to keep this service in the eventdev folder as an optional feature.
> PATCH v2:
>  o Add dequeue batch count statistic.
>  o Add statistics reset function to API.
>  o Clarify MT safety guarantees (or lack thereof) in the API documentation.
>  o Change loop variable type in evd_lcore_get_handler_by_id() to uint16_t,
>    to be consistent with similar loops elsewhere in the dispatcher.
>  o Fix variable names in finalizer unregister function.
> 
> PATCH:
>  o Change prefix from RED to EVD, to avoid confusion with random
>    early detection.
> 
> RFC v4:
>  o Move handlers to per-lcore data structures.
>  o Introduce mechanism which rearranges handlers so that often-used
>    handlers tend to be tried first.
>  o Terminate dispatch loop in case all events are delivered.
>  o To avoid the dispatcher's service function hogging the CPU, process
>    only one batch per call.
>  o Have service function return -EAGAIN if no work is performed.
>  o Events delivered in the process function is no longer marked 'const',
>    since modifying them may be useful for the application and cause
>    no difficulties for the dispatcher.
>  o Various minor API documentation improvements.
> 
> RFC v3:
>  o Add stats_get() function to the version.map file.
> ---
>  MAINTAINERS                     |   3 +
>  lib/dispatcher/meson.build      |  17 +
>  lib/dispatcher/rte_dispatcher.c | 791 ++++++++++++++++++++++++++++++++
>  lib/dispatcher/rte_dispatcher.h | 480 +++++++++++++++++++
>  lib/dispatcher/version.map      |  20 +
>  lib/meson.build                 |   2 +
>  6 files changed, 1313 insertions(+)
>  create mode 100644 lib/dispatcher/meson.build
>  create mode 100644 lib/dispatcher/rte_dispatcher.c
>  create mode 100644 lib/dispatcher/rte_dispatcher.h
>  create mode 100644 lib/dispatcher/version.map
> 
> diff --git a/MAINTAINERS b/MAINTAINERS
> index a926155f26..6704cd5b2c 100644
> --- a/MAINTAINERS
> +++ b/MAINTAINERS
> @@ -1726,6 +1726,9 @@ M: Nithin Dabilpuram <ndabilpuram at marvell.com>
>  M: Pavan Nikhilesh <pbhagavatula at marvell.com>
>  F: lib/node/
> 
> +Dispatcher - EXPERIMENTAL
> +M: Mattias Rönnblom <mattias.ronnblom at ericsson.com>
> +F: lib/dispatcher/
> 
>  Test Applications
>  -----------------
> diff --git a/lib/dispatcher/meson.build b/lib/dispatcher/meson.build
> new file mode 100644
> index 0000000000..c6054a3a5d
> --- /dev/null
> +++ b/lib/dispatcher/meson.build
> @@ -0,0 +1,17 @@
> +# SPDX-License-Identifier: BSD-3-Clause
> +# Copyright(c) 2023 Ericsson AB
> +
> +if is_windows
> +    build = false
> +    reason = 'not supported on Windows'
> +    subdir_done()
> +endif
> +
> +sources = files(
> +        'rte_dispatcher.c',
> +)
> +headers = files(
> +        'rte_dispatcher.h',
> +)
> +
> +deps += ['eventdev']
> diff --git a/lib/dispatcher/rte_dispatcher.c b/lib/dispatcher/rte_dispatcher.c
> new file mode 100644
> index 0000000000..3319fe09f2
> --- /dev/null
> +++ b/lib/dispatcher/rte_dispatcher.c
> @@ -0,0 +1,791 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2023 Ericsson AB
> + */
> +
> +#include <stdbool.h>
> +#include <stdint.h>
> +
> +#include <rte_branch_prediction.h>
> +#include <rte_common.h>
> +#include <rte_lcore.h>
> +#include <rte_random.h>
> +#include <rte_service_component.h>
> +
> +#include "eventdev_pmd.h"
> +
> +#include <rte_dispatcher.h>
> +
> +#define EVD_MAX_PORTS_PER_LCORE 4
> +#define EVD_MAX_HANDLERS 32
> +#define EVD_MAX_FINALIZERS 16
> +#define EVD_AVG_PRIO_INTERVAL 2000
> +
> +struct rte_dispatcher_lcore_port {
> +	uint8_t port_id;
> +	uint16_t batch_size;
> +	uint64_t timeout;
> +};
> +
> +struct rte_dispatcher_handler {
> +	int id;
> +	rte_dispatcher_match_t match_fun;
> +	void *match_data;
> +	rte_dispatcher_process_t process_fun;
> +	void *process_data;
> +};
> +
> +struct rte_dispatcher_finalizer {
> +	int id;
> +	rte_dispatcher_finalize_t finalize_fun;
> +	void *finalize_data;
> +};
> +
> +struct rte_dispatcher_lcore {
> +	uint8_t num_ports;
> +	uint16_t num_handlers;
> +	int32_t prio_count;
> +	struct rte_dispatcher_lcore_port ports[EVD_MAX_PORTS_PER_LCORE];
> +	struct rte_dispatcher_handler handlers[EVD_MAX_HANDLERS];
> +	struct rte_dispatcher_stats stats;
> +} __rte_cache_aligned;
> +
> +struct rte_dispatcher {
> +	uint8_t id;
> +	uint8_t event_dev_id;
> +	int socket_id;
> +	uint32_t service_id;
> +	struct rte_dispatcher_lcore lcores[RTE_MAX_LCORE];
> +	uint16_t num_finalizers;
> +	struct rte_dispatcher_finalizer finalizers[EVD_MAX_FINALIZERS];
> +};
> +
> +static struct rte_dispatcher *dispatchers[UINT8_MAX];
> +
> +static bool
> +evd_has_dispatcher(uint8_t id)
> +{
> +	return dispatchers[id] != NULL;
> +}
> +
> +static struct rte_dispatcher *
> +evd_get_dispatcher(uint8_t id)
> +{
> +	return dispatchers[id];
> +}
> +
> +static void
> +evd_set_dispatcher(uint8_t id, struct rte_dispatcher *dispatcher) {
> +	dispatchers[id] = dispatcher;
> +}
> +
> +#define EVD_VALID_ID_OR_RET_EINVAL(id)					\
> +	do {								\
> +		if (unlikely(!evd_has_dispatcher(id))) {		\
> +			RTE_EDEV_LOG_ERR("Invalid dispatcher id %d\n", id); \
> +			return -EINVAL;					\
> +		}							\
> +	} while (0)
> +
> +static int
> +evd_lookup_handler_idx(struct rte_dispatcher_lcore *lcore,
> +		       const struct rte_event *event) {
> +	uint16_t i;
> +
> +	for (i = 0; i < lcore->num_handlers; i++) {
> +		struct rte_dispatcher_handler *handler =
> +			&lcore->handlers[i];
> +
> +		if (handler->match_fun(event, handler->match_data))
> +			return i;
> +	}
> +
> +	return -1;
> +}
> +
> +static void
> +evd_prioritize_handler(struct rte_dispatcher_lcore *lcore,
> +		       int handler_idx)
> +{
> +	struct rte_dispatcher_handler tmp;
> +
> +	if (handler_idx == 0)
> +		return;
> +
> +	/* Let the lucky handler "bubble" up the list */
> +
> +	tmp = lcore->handlers[handler_idx - 1];
> +
> +	lcore->handlers[handler_idx - 1] = lcore->handlers[handler_idx];
> +
> +	lcore->handlers[handler_idx] = tmp;
> +}
> +
> +static inline void
> +evd_consider_prioritize_handler(struct rte_dispatcher_lcore *lcore,
> +				int handler_idx, uint16_t handler_events) {
> +	lcore->prio_count -= handler_events;
> +
> +	if (unlikely(lcore->prio_count <= 0)) {
> +		evd_prioritize_handler(lcore, handler_idx);
> +
> +		/*
> +		 * Randomize the interval in the unlikely case
> +		 * the traffic follow some very strict pattern.
> +		 */
> +		lcore->prio_count =
> +			rte_rand_max(EVD_AVG_PRIO_INTERVAL) +
> +			EVD_AVG_PRIO_INTERVAL / 2;
> +	}
> +}
> +
> +static inline void
> +evd_dispatch_events(struct rte_dispatcher *dispatcher,
> +		    struct rte_dispatcher_lcore *lcore,
> +		    struct rte_dispatcher_lcore_port *port,
> +		    struct rte_event *events, uint16_t num_events) {
> +	int i;
> +	struct rte_event bursts[EVD_MAX_HANDLERS][num_events];
> +	uint16_t burst_lens[EVD_MAX_HANDLERS] = { 0 };
> +	uint16_t drop_count = 0;
> +	uint16_t dispatch_count;
> +	uint16_t dispatched = 0;
> +
> +	for (i = 0; i < num_events; i++) {
> +		struct rte_event *event = &events[i];
> +		int handler_idx;
> +
> +		handler_idx = evd_lookup_handler_idx(lcore, event);
> +
> +		if (unlikely(handler_idx < 0)) {
> +			drop_count++;
> +			continue;
> +		}
> +
> +		bursts[handler_idx][burst_lens[handler_idx]] = *event;
> +		burst_lens[handler_idx]++;
> +	}
> +
> +	dispatch_count = num_events - drop_count;
> +
> +	for (i = 0; i < lcore->num_handlers &&
> +		 dispatched < dispatch_count; i++) {
> +		struct rte_dispatcher_handler *handler =
> +			&lcore->handlers[i];
> +		uint16_t len = burst_lens[i];
> +
> +		if (len == 0)
> +			continue;
> +
> +		handler->process_fun(dispatcher->event_dev_id, port->port_id,
> +				     bursts[i], len, handler->process_data);
> +
> +		dispatched += len;
> +
> +		/*
> +		 * Safe, since any reshuffling will only involve
> +		 * already-processed handlers.
> +		 */
> +		evd_consider_prioritize_handler(lcore, i, len);
> +	}
> +
> +	lcore->stats.ev_batch_count++;
> +	lcore->stats.ev_dispatch_count += dispatch_count;
> +	lcore->stats.ev_drop_count += drop_count;
> +
> +	for (i = 0; i < dispatcher->num_finalizers; i++) {
> +		struct rte_dispatcher_finalizer *finalizer =
> +			&dispatcher->finalizers[i];
> +
> +		finalizer->finalize_fun(dispatcher->event_dev_id,
> +					port->port_id,
> +					finalizer->finalize_data);
> +	}
> +}
> +
> +static __rte_always_inline uint16_t
> +evd_port_dequeue(struct rte_dispatcher *dispatcher,
> +		 struct rte_dispatcher_lcore *lcore,
> +		 struct rte_dispatcher_lcore_port *port) {
> +	uint16_t batch_size = port->batch_size;
> +	struct rte_event events[batch_size];
> +	uint16_t n;
> +
> +	n = rte_event_dequeue_burst(dispatcher->event_dev_id, port->port_id,
> +				    events, batch_size, port->timeout);
> +
> +	if (likely(n > 0))
> +		evd_dispatch_events(dispatcher, lcore, port, events, n);
> +
> +	lcore->stats.poll_count++;
> +
> +	return n;
> +}
> +
> +static __rte_always_inline uint16_t
> +evd_lcore_process(struct rte_dispatcher *dispatcher,
> +		  struct rte_dispatcher_lcore *lcore) {
> +	uint16_t i;
> +	uint16_t event_count = 0;
> +
> +	for (i = 0; i < lcore->num_ports; i++) {
> +		struct rte_dispatcher_lcore_port *port =
> +			&lcore->ports[i];
> +
> +		event_count += evd_port_dequeue(dispatcher, lcore, port);
> +	}
> +
> +	return event_count;
> +}
> +
> +static int32_t
> +evd_process(void *userdata)
> +{
> +	struct rte_dispatcher *dispatcher = userdata;
> +	unsigned int lcore_id = rte_lcore_id();
> +	struct rte_dispatcher_lcore *lcore =
> +		&dispatcher->lcores[lcore_id];
> +	uint64_t event_count;
> +
> +	event_count = evd_lcore_process(dispatcher, lcore);
> +
> +	if (unlikely(event_count == 0))
> +		return -EAGAIN;
> +
> +	return 0;
> +}
> +
> +static int
> +evd_service_register(struct rte_dispatcher *dispatcher) {
> +	struct rte_service_spec service = {
> +		.callback = evd_process,
> +		.callback_userdata = dispatcher,
> +		.capabilities = RTE_SERVICE_CAP_MT_SAFE,
> +		.socket_id = dispatcher->socket_id
> +	};
> +	int rc;
> +
> +	snprintf(service.name, RTE_SERVICE_NAME_MAX - 1, "evd_%d",
> +		 dispatcher->id);
> +
> +	rc = rte_service_component_register(&service, &dispatcher->service_id);
> +
> +	if (rc)
> +		RTE_EDEV_LOG_ERR("Registration of dispatcher service "
> +				 "%s failed with error code %d\n",
> +				 service.name, rc);
> +
> +	return rc;
> +}
> +
> +static int
> +evd_service_unregister(struct rte_dispatcher *dispatcher) {
> +	int rc;
> +
> +	rc = rte_service_component_unregister(dispatcher->service_id);
> +
> +	if (rc)
> +		RTE_EDEV_LOG_ERR("Unregistration of dispatcher service "
> +				 "failed with error code %d\n", rc);
> +
> +	return rc;
> +}
> +
> +int
> +rte_dispatcher_create(uint8_t id, uint8_t event_dev_id) {
> +	int socket_id;
> +	struct rte_dispatcher *dispatcher;
> +	int rc;
> +
> +	if (evd_has_dispatcher(id)) {
> +		RTE_EDEV_LOG_ERR("Dispatcher with id %d already exists\n",
> +				 id);
> +		return -EEXIST;
> +	}
> +
> +	socket_id = rte_event_dev_socket_id(event_dev_id);
> +
> +	dispatcher =
> +		rte_malloc_socket("dispatcher", sizeof(struct rte_dispatcher),
> +				  RTE_CACHE_LINE_SIZE, socket_id);
> +
> +	if (dispatcher == NULL) {
> +		RTE_EDEV_LOG_ERR("Unable to allocate memory for dispatcher\n");
> +		return -ENOMEM;
> +	}
> +
> +	*dispatcher = (struct rte_dispatcher) {
> +		.id = id,
> +		.event_dev_id = event_dev_id,
> +		.socket_id = socket_id
> +	};
> +
> +	rc = evd_service_register(dispatcher);
> +
> +	if (rc < 0) {
> +		rte_free(dispatcher);
> +		return rc;
> +	}
> +
> +	evd_set_dispatcher(id, dispatcher);
> +
> +	return 0;
> +}
> +
> +int
> +rte_dispatcher_free(uint8_t id)
> +{
> +	struct rte_dispatcher *dispatcher;
> +	int rc;
> +
> +	EVD_VALID_ID_OR_RET_EINVAL(id);
> +	dispatcher = evd_get_dispatcher(id);
> +
> +	rc = evd_service_unregister(dispatcher);
> +
> +	if (rc)
> +		return rc;
> +
> +	evd_set_dispatcher(id, NULL);
> +
> +	rte_free(dispatcher);
> +
> +	return 0;
> +}
> +
> +int
> +rte_dispatcher_service_id_get(uint8_t id, uint32_t *service_id) {
> +	struct rte_dispatcher *dispatcher;
> +
> +	EVD_VALID_ID_OR_RET_EINVAL(id);
> +	dispatcher = evd_get_dispatcher(id);
> +
The service_id pointer needs to be checked for NULL before it is dereferenced.
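As a minimal standalone sketch of the kind of check I mean (the struct and function names below are hypothetical stand-ins, not the patch's real types, and returning -EINVAL for a NULL output pointer is my assumption about the preferred error code):

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for struct rte_dispatcher. */
struct sketch_dispatcher {
	uint32_t service_id;
};

static int
sketch_service_id_get(const struct sketch_dispatcher *dispatcher,
		      uint32_t *service_id)
{
	/* Internal invariant: the dispatcher id was validated earlier. */
	assert(dispatcher != NULL);

	/* Caller-supplied output pointer: reject NULL instead of crashing. */
	if (service_id == NULL)
		return -EINVAL;

	*service_id = dispatcher->service_id;

	return 0;
}
```

The same pattern would apply to any other API function that writes through a caller-supplied pointer.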
> +	*service_id = dispatcher->service_id;
> +
> +	return 0;
> +}
> +
> +static int
> +lcore_port_index(struct rte_dispatcher_lcore *lcore,
> +		 uint8_t event_port_id)
> +{
> +	uint16_t i;
> +
> +	for (i = 0; i < lcore->num_ports; i++) {
> +		struct rte_dispatcher_lcore_port *port =
> +			&lcore->ports[i];
> +
> +		if (port->port_id == event_port_id)
> +			return i;
> +	}
> +
> +	return -1;
> +}
> +
> +int
> +rte_dispatcher_bind_port_to_lcore(uint8_t id, uint8_t event_port_id,
> +					uint16_t batch_size, uint64_t timeout,
> +					unsigned int lcore_id)
> +{
> +	struct rte_dispatcher *dispatcher;
> +	struct rte_dispatcher_lcore *lcore;
> +	struct rte_dispatcher_lcore_port *port;
> +
> +	EVD_VALID_ID_OR_RET_EINVAL(id);
> +	dispatcher = evd_get_dispatcher(id);
> +
> +	lcore =	&dispatcher->lcores[lcore_id];
> +
> +	if (lcore->num_ports == EVD_MAX_PORTS_PER_LCORE)
> +		return -ENOMEM;
> +
> +	if (lcore_port_index(lcore, event_port_id) >= 0)
> +		return -EEXIST;
> +
> +	port = &lcore->ports[lcore->num_ports];
> +
> +	*port = (struct rte_dispatcher_lcore_port) {
> +		.port_id = event_port_id,
> +		.batch_size = batch_size,
> +		.timeout = timeout
> +	};
> +
> +	lcore->num_ports++;
> +
> +	return 0;
> +}
> +
> +int
> +rte_dispatcher_unbind_port_from_lcore(uint8_t id, uint8_t event_port_id,
> +					    unsigned int lcore_id)
> +{
> +	struct rte_dispatcher *dispatcher;
> +	struct rte_dispatcher_lcore *lcore;
> +	int port_idx;
> +	struct rte_dispatcher_lcore_port *port;
> +	struct rte_dispatcher_lcore_port *last;
> +
> +	EVD_VALID_ID_OR_RET_EINVAL(id);
> +	dispatcher = evd_get_dispatcher(id);
> +
> +	lcore =	&dispatcher->lcores[lcore_id];
> +
> +	port_idx = lcore_port_index(lcore, event_port_id);
> +
> +	if (port_idx < 0)
> +		return -ENOENT;
> +
> +	port = &lcore->ports[port_idx];
> +	last = &lcore->ports[lcore->num_ports - 1];
> +
> +	if (port != last)
> +		*port = *last;
> +
> +	lcore->num_ports--;
> +
> +	return 0;
> +}
> +
> +static struct rte_dispatcher_handler*
> +evd_lcore_get_handler_by_id(struct rte_dispatcher_lcore *lcore,
> +			    int handler_id)
> +{
> +	uint16_t i;
> +
> +	for (i = 0; i < lcore->num_handlers; i++) {
> +		struct rte_dispatcher_handler *handler =
> +			&lcore->handlers[i];
> +
> +		if (handler->id == handler_id)
> +			return handler;
> +	}
> +
> +	return NULL;
> +}
> +
> +static int
> +evd_alloc_handler_id(struct rte_dispatcher *dispatcher) {
> +	int handler_id = 0;
> +	struct rte_dispatcher_lcore *reference_lcore =
> +		&dispatcher->lcores[0];
> +
> +	if (reference_lcore->num_handlers == EVD_MAX_HANDLERS)
> +		return -1;
> +
> +	while (evd_lcore_get_handler_by_id(reference_lcore, handler_id) != NULL)
> +		handler_id++;
> +
> +	return handler_id;
> +}
> +
> +static void
> +evd_lcore_install_handler(struct rte_dispatcher_lcore *lcore,
> +		    const struct rte_dispatcher_handler *handler) {
> +	int handler_idx = lcore->num_handlers;
> +
> +	lcore->handlers[handler_idx] = *handler;
> +	lcore->num_handlers++;
> +}
> +
> +static void
> +evd_install_handler(struct rte_dispatcher *dispatcher,
> +		    const struct rte_dispatcher_handler *handler) {
> +	int i;
> +
> +	for (i = 0; i < RTE_MAX_LCORE; i++) {
> +		struct rte_dispatcher_lcore *lcore =
> +			&dispatcher->lcores[i];
> +		evd_lcore_install_handler(lcore, handler);
> +	}
> +}
> +
> +int
> +rte_dispatcher_register(uint8_t id,
> +			      rte_dispatcher_match_t match_fun,
> +			      void *match_data,
> +			      rte_dispatcher_process_t process_fun,
> +			      void *process_data)
> +{
> +	struct rte_dispatcher *dispatcher;
> +	struct rte_dispatcher_handler handler = {
> +		.match_fun = match_fun,
> +		.match_data = match_data,
We could provide a default match function that uses the queue_id as the matching data.
That would spare the application from having to supply two callbacks, one for matching and one for processing the event.
The application could pass NULL for "match_fun", in which case the default function pointer would be used here.
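A rough standalone sketch of the idea, to make the suggestion concrete. Everything here is illustrative: sketch_event is a stand-in that models only the queue_id field of struct rte_event, and the sketch_* names and the convention that match_data points to a uint8_t queue id are my assumptions, not part of the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for struct rte_event; only queue_id is modeled. */
struct sketch_event {
	uint8_t queue_id;
};

typedef bool (*sketch_match_t)(const struct sketch_event *event,
			       void *cb_data);

/* Default matcher: cb_data points to the queue id to match against. */
static bool
sketch_match_queue_id(const struct sketch_event *event, void *cb_data)
{
	const uint8_t *queue_id = cb_data;

	assert(event != NULL && queue_id != NULL);

	return event->queue_id == *queue_id;
}

/* Fall back to the default matcher when the application passes NULL. */
static sketch_match_t
sketch_resolve_match(sketch_match_t match_fun)
{
	return match_fun != NULL ? match_fun : sketch_match_queue_id;
}
```

With something along these lines, the register function would store the resolved pointer in the handler, so the dispatch loop itself would not need a NULL check per event.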
> +		.process_fun = process_fun,
> +		.process_data = process_data
> +	};
> +
> +	EVD_VALID_ID_OR_RET_EINVAL(id);
> +	dispatcher = evd_get_dispatcher(id);
> +
> +	handler.id = evd_alloc_handler_id(dispatcher);
> +
> +	if (handler.id < 0)
> +		return -ENOMEM;
> +
> +	evd_install_handler(dispatcher, &handler);
> +
> +	return handler.id;
> +}
> +
> +static int
> +evd_lcore_uninstall_handler(struct rte_dispatcher_lcore *lcore,
> +			    int handler_id)
> +{
> +	struct rte_dispatcher_handler *unreg_handler;
> +	int handler_idx;
> +	uint16_t last_idx;
> +
> +	unreg_handler = evd_lcore_get_handler_by_id(lcore, handler_id);
> +
> +	if (unreg_handler == NULL)
> +		return -EINVAL;
> +
Shouldn't the logic be "handler_idx = unreg_handler - &lcore->handlers[0];"?
unreg_handler is always at an address higher than or equal to the handler base address (&lcore->handlers[0]), so the subtraction as written yields zero or a negative index.
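To illustrate, here is a minimal standalone sketch of deriving an array index from an element pointer. The names sketch_handler and sketch_handler_index are made up for this example and do not exist in the patch.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for struct rte_dispatcher_handler. */
struct sketch_handler {
	int id;
};

/*
 * Return the index of 'elem' within the array starting at 'base'.
 * The element pointer is the minuend: elem - base is >= 0 for any
 * element of the array, while base - elem would be <= 0.
 */
static ptrdiff_t
sketch_handler_index(const struct sketch_handler *base,
		     const struct sketch_handler *elem)
{
	assert(elem >= base);

	return elem - base;
}
```

With the operands reversed, any handler other than the first gets a negative index, and the count passed to memmove() later in this function ends up too large.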
> +	handler_idx = &lcore->handlers[0] - unreg_handler;
> +
> +	last_idx = lcore->num_handlers - 1;
> +
> +	if (handler_idx != last_idx) {
> +		/* move all handlers to maintain handler order */
> +		int n = last_idx - handler_idx;
> +		memmove(unreg_handler, unreg_handler + 1,
> +			sizeof(struct rte_dispatcher_handler) * n);
> +	}
> +
> +	lcore->num_handlers--;
> +
> +	return 0;
> +}
> +
> +static int
> +evd_uninstall_handler(struct rte_dispatcher *dispatcher,
> +		      int handler_id)
> +{
> +	unsigned int lcore_id;
> +
> +	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
> +		struct rte_dispatcher_lcore *lcore =
> +			&dispatcher->lcores[lcore_id];
> +		int rc;
> +
> +		rc = evd_lcore_uninstall_handler(lcore, handler_id);
> +
> +		if (rc < 0)
> +			return rc;
> +	}
> +
> +	return 0;
> +}
> +
> +int
> +rte_dispatcher_unregister(uint8_t id, int handler_id) {
> +	struct rte_dispatcher *dispatcher;
> +	int rc;
> +
> +	EVD_VALID_ID_OR_RET_EINVAL(id);
> +	dispatcher = evd_get_dispatcher(id);
> +
> +	rc = evd_uninstall_handler(dispatcher, handler_id);
> +
> +	return rc;
> +}
> +
> +static struct rte_dispatcher_finalizer*
> +evd_get_finalizer_by_id(struct rte_dispatcher *dispatcher,
> +		       int handler_id)
> +{
> +	int i;
> +
> +	for (i = 0; i < dispatcher->num_finalizers; i++) {
> +		struct rte_dispatcher_finalizer *finalizer =
> +			&dispatcher->finalizers[i];
> +
> +		if (finalizer->id == handler_id)
> +			return finalizer;
> +	}
> +
> +	return NULL;
> +}
> +
> +static int
> +evd_alloc_finalizer_id(struct rte_dispatcher *dispatcher) {
> +	int finalizer_id = 0;
> +
> +	while (evd_get_finalizer_by_id(dispatcher, finalizer_id) != NULL)
> +		finalizer_id++;
> +
> +	return finalizer_id;
> +}
> +
> +static struct rte_dispatcher_finalizer *
> +evd_alloc_finalizer(struct rte_dispatcher *dispatcher)
> +{
> +	int finalizer_idx;
> +	struct rte_dispatcher_finalizer *finalizer;
> +
> +	if (dispatcher->num_finalizers == EVD_MAX_FINALIZERS)
> +		return NULL;
> +
> +	finalizer_idx = dispatcher->num_finalizers;
> +	finalizer = &dispatcher->finalizers[finalizer_idx];
> +
> +	finalizer->id = evd_alloc_finalizer_id(dispatcher);
> +
> +	dispatcher->num_finalizers++;
> +
> +	return finalizer;
> +}
> +
> +int
> +rte_dispatcher_finalize_register(uint8_t id,
> +			      rte_dispatcher_finalize_t finalize_fun,
> +			      void *finalize_data)
> +{
> +	struct rte_dispatcher *dispatcher;
> +	struct rte_dispatcher_finalizer *finalizer;
> +
> +	EVD_VALID_ID_OR_RET_EINVAL(id);
> +	dispatcher = evd_get_dispatcher(id);
> +
> +	finalizer = evd_alloc_finalizer(dispatcher);
> +
> +	if (finalizer == NULL)
> +		return -ENOMEM;
> +
> +	finalizer->finalize_fun = finalize_fun;
> +	finalizer->finalize_data = finalize_data;
> +
> +	return finalizer->id;
> +}
> +
> +int
> +rte_dispatcher_finalize_unregister(uint8_t id, int handler_id) {
> +	struct rte_dispatcher *dispatcher;
> +	struct rte_dispatcher_finalizer *unreg_finalizer;
> +	int finalizer_idx;
> +	uint16_t last_idx;
> +
> +	EVD_VALID_ID_OR_RET_EINVAL(id);
> +	dispatcher = evd_get_dispatcher(id);
> +
> +	unreg_finalizer = evd_get_finalizer_by_id(dispatcher, handler_id);
> +
> +	if (unreg_finalizer == NULL)
> +		return -EINVAL;
> +
Same as the comment above on the handler unregistration path (evd_lcore_uninstall_handler()): the base address needs to be subtracted from unreg_finalizer, not the other way around.
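For reference, a standalone sketch of an order-preserving removal with the index computed as element minus base. The sketch_* names are hypothetical and the struct is a simplified stand-in for struct rte_dispatcher_finalizer.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for struct rte_dispatcher_finalizer. */
struct sketch_finalizer {
	int id;
};

/* Remove 'elem' from the array at 'base', keeping the remaining order. */
static void
sketch_finalizer_remove(struct sketch_finalizer *base, uint16_t *num,
			struct sketch_finalizer *elem)
{
	ptrdiff_t idx = elem - base;
	ptrdiff_t last_idx = (ptrdiff_t)*num - 1;

	assert(idx >= 0 && idx <= last_idx);

	if (idx != last_idx) {
		/* Shift the tail down by one slot over the removed entry. */
		size_t n = (size_t)(last_idx - idx);

		memmove(elem, elem + 1, sizeof(*elem) * n);
	}

	(*num)--;
}
```

Removing the second of four entries moves two elements, which matches last_idx - idx = 3 - 1.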
> +	finalizer_idx = &dispatcher->finalizers[0] - unreg_finalizer;
> +
> +	last_idx = dispatcher->num_finalizers - 1;
> +
> +	if (finalizer_idx != last_idx) {
> +		/* move all finalizers to maintain order */
> +		int n = last_idx - finalizer_idx;
> +		memmove(unreg_finalizer, unreg_finalizer + 1,
> +			sizeof(struct rte_dispatcher_finalizer) * n);
> +	}
> +
> +	dispatcher->num_finalizers--;
> +
> +	return 0;
> +}
> +
> +static int
> +evd_set_service_runstate(uint8_t id, int state) {
> +	struct rte_dispatcher *dispatcher;
> +	int rc;
> +
> +	EVD_VALID_ID_OR_RET_EINVAL(id);
> +	dispatcher = evd_get_dispatcher(id);
> +
> +	rc = rte_service_component_runstate_set(dispatcher->service_id,
> +						state);
> +
> +	if (rc != 0) {
> +		RTE_EDEV_LOG_ERR("Unexpected error %d occurred while setting "
> +				 "service component run state to %d\n", rc,
> +				 state);
> +		RTE_ASSERT(0);
> +	}
> +
> +	return 0;
> +}
> +
> +int
> +rte_dispatcher_start(uint8_t id)
> +{
> +	return evd_set_service_runstate(id, 1);
> +}
> +
> +int
> +rte_dispatcher_stop(uint8_t id)
> +{
> +	return evd_set_service_runstate(id, 0);
> +}
> +
> +static void
> +evd_aggregate_stats(struct rte_dispatcher_stats *result,
> +		    const struct rte_dispatcher_stats *part) {
> +	result->poll_count += part->poll_count;
> +	result->ev_batch_count += part->ev_batch_count;
> +	result->ev_dispatch_count += part->ev_dispatch_count;
> +	result->ev_drop_count += part->ev_drop_count;
> +}
> +
> +int
> +rte_dispatcher_stats_get(uint8_t id,
> +			       struct rte_dispatcher_stats *stats) {
> +	struct rte_dispatcher *dispatcher;
> +	unsigned int lcore_id;
> +
> +	EVD_VALID_ID_OR_RET_EINVAL(id);
> +	dispatcher = evd_get_dispatcher(id);
> +
The stats pointer needs to be checked for NULL before it is dereferenced.
> +	*stats = (struct rte_dispatcher_stats) {};
> +
> +	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
> +		struct rte_dispatcher_lcore *lcore =
> +			&dispatcher->lcores[lcore_id];
> +
> +		evd_aggregate_stats(stats, &lcore->stats);
> +	}
> +
> +	return 0;
> +}
> +
> +int
> +rte_dispatcher_stats_reset(uint8_t id)
> +{
> +	struct rte_dispatcher *dispatcher;
> +	unsigned int lcore_id;
> +
> +	EVD_VALID_ID_OR_RET_EINVAL(id);
> +	dispatcher = evd_get_dispatcher(id);
> +
> +
> +	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
> +		struct rte_dispatcher_lcore *lcore =
> +			&dispatcher->lcores[lcore_id];
> +
> +		lcore->stats = (struct rte_dispatcher_stats) {};
> +	}
> +
> +	return 0;
> +
> +}
> diff --git a/lib/dispatcher/rte_dispatcher.h b/lib/dispatcher/rte_dispatcher.h
> new file mode 100644 index 0000000000..6712687a08
> --- /dev/null
> +++ b/lib/dispatcher/rte_dispatcher.h
> @@ -0,0 +1,480 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2023 Ericsson AB
> + */
> +
> +#ifndef __RTE_DISPATCHER_H__
> +#define __RTE_DISPATCHER_H__
> +
> +/**
> + * @file
> + *
> + * RTE Dispatcher
> + *
> + * The purpose of the dispatcher is to help decouple different parts
> + * of an application (e.g., modules), sharing the same underlying
> + * event device.
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <rte_eventdev.h>
> +
> +/**
> + * Function prototype for match callbacks.
> + *
> + * Match callbacks are used by an application to decide how the
> + * dispatcher distributes events to different parts of the
> + * application.
> + *
> + * The application is not expected to process the event at the point
> + * of the match call. Such matters should be deferred to the process
> + * callback invocation.
> + *
> + * The match callback may be used as an opportunity to prefetch data.
> + *
> + * @param event
> + *  Pointer to event
> + *
> + * @param cb_data
> + *  The pointer supplied by the application in
> + *  rte_dispatcher_register().
> + *
> + * @return
> + *   Returns true in case this events should be delivered (via
> + *   the process callback), and false otherwise.
> + */
> +typedef bool
> +(*rte_dispatcher_match_t)(const struct rte_event *event, void *cb_data);
> +
> +/**
> + * Function prototype for process callbacks.
> + *
> + * The process callbacks are used by the dispatcher to deliver
> + * events for processing.
> + *
> + * @param event_dev_id
> + *  The originating event device id.
> + *
> + * @param event_port_id
> + *  The originating event port.
> + *
> + * @param events
> + *  Pointer to an array of events.
> + *
> + * @param num
> + *  The number of events in the @p events array.
> + *
> + * @param cb_data
> + *  The pointer supplied by the application in
> + *  rte_dispatcher_register().
> + */
> +
> +typedef void
> +(*rte_dispatcher_process_t)(uint8_t event_dev_id, uint8_t event_port_id,
> +				  struct rte_event *events, uint16_t num,
> +				  void *cb_data);
> +
> +/**
> + * Function prototype for finalize callbacks.
> + *
> + * The finalize callbacks are used by the dispatcher to notify the
> + * application it has delivered all events from a particular batch
> + * dequeued from the event device.
> + *
> + * @param event_dev_id
> + *  The originating event device id.
> + *
> + * @param event_port_id
> + *  The originating event port.
> + *
> + * @param cb_data
> + *  The pointer supplied by the application in
> + *  rte_dispatcher_finalize_register().
> + */
> +
> +typedef void
> +(*rte_dispatcher_finalize_t)(uint8_t event_dev_id, uint8_t event_port_id,
> +				   void *cb_data);
> +
> +/**
> + * Dispatcher statistics
> + */
> +struct rte_dispatcher_stats {
> +	uint64_t poll_count;
> +	/**< Number of event dequeue calls made toward the event device. */
> +	uint64_t ev_batch_count;
> +	/**< Number of non-empty event batches dequeued from event device.*/
> +	uint64_t ev_dispatch_count;
> +	/**< Number of events dispatched to a handler.*/
> +	uint64_t ev_drop_count;
> +	/**< Number of events dropped because no handler was found. */
> +};
> +
> +/**
> + * Create a dispatcher with the specified id.
> + *
> + * @param id
> + *  An application-specified, unique (across all dispatcher
> + *  instances) identifier.
> + *
> + * @param event_dev_id
> + *  The identifier of the event device from which this dispatcher
> + *  will dequeue events.
> + *
> + * @return
> + *   - 0: Success
> + *   - <0: Error code on failure
> + */
> +__rte_experimental
> +int
> +rte_dispatcher_create(uint8_t id, uint8_t event_dev_id);
> +
> +/**
> + * Free a dispatcher.
> + *
> + * @param id
> + *  The dispatcher identifier.
> + *
> + * @return
> + *  - 0: Success
> + *  - <0: Error code on failure
> + */
> +__rte_experimental
> +int
> +rte_dispatcher_free(uint8_t id);
> +
> +/**
> + * Retrieve the service identifier of a dispatcher.
> + *
> + * @param id
> + *  The dispatcher identifier.
> + *
> + * @param [out] service_id
> + *  A pointer to a caller-supplied buffer where the dispatcher's
> + *  service id will be stored.
> + *
> + * @return
> + *  - 0: Success
> + *  - <0: Error code on failure.
> + */
> +__rte_experimental
> +int
> +rte_dispatcher_service_id_get(uint8_t id, uint32_t *service_id);
> +
> +/**
> + * Binds an event device port to a specific lcore on the specified
> + * dispatcher.
> + *
> + * This function configures the event port id to be used by the event
> + * dispatcher service, if run on the specified lcore.
> + *
> + * Multiple event device ports may be bound to the same lcore. A
> + * particular port must not be bound to more than one lcore.
> + *
> + * If the dispatcher service is mapped (with rte_service_map_lcore_set())
> + * to a lcore to which no ports are bound, the service function will be a
> + * no-operation.
> + *
> + * This function may be called by any thread (including unregistered
> + * non-EAL threads), but not while the dispatcher is running on lcore
> + * specified by @c lcore_id.
> + *
> + * @param id
> + *  The dispatcher identifier.
> + *
> + * @param event_port_id
> + *  The event device port identifier.
> + *
> + * @param batch_size
> + *  The batch size to use in rte_event_dequeue_burst(), for the
> + *  configured event device port and lcore.
> + *
> + * @param timeout
> + *  The timeout parameter to use in rte_event_dequeue_burst(), for the
> + *  configured event device port and lcore.
> + *
> + * @param lcore_id
> + *  The lcore by which this event port will be used.
> + *
> + * @return
> + *  - 0: Success
> + *  - -ENOMEM: Unable to allocate sufficient resources.
> + *  - -EEXISTS: Event port is already configured.
> + *  - -EINVAL: Invalid arguments.
> + */
> +__rte_experimental
> +int
> +rte_dispatcher_bind_port_to_lcore(uint8_t id, uint8_t event_port_id,
> +					uint16_t batch_size, uint64_t timeout,
> +					unsigned int lcore_id);
> +
> +/**
> + * Unbind an event device port from a specific lcore.
> + *
> + * This function may be called by any thread (including unregistered
> + * non-EAL threads), but not while the dispatcher is running on
> + * lcore specified by @c lcore_id.
> + *
> + * @param id
> + *  The dispatcher identifier.
> + *
> + * @param event_port_id
> + *  The event device port identifier.
> + *
> + * @param lcore_id
> + *  The lcore which was using this event port.
> + *
> + * @return
> + *  - 0: Success
> + *  - -EINVAL: Invalid @c id.
> + *  - -ENOENT: Event port id not bound to this @c lcore_id.
> + */
> +__rte_experimental
> +int
> +rte_dispatcher_unbind_port_from_lcore(uint8_t id, uint8_t event_port_id,
> +					    unsigned int lcore_id);
> +
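
To check my reading of the bind/unbind rules above, here is a minimal, self-contained sketch of the documented behaviour. It is not the patch's implementation: every sketch_* name and both limits are hypothetical, and rejecting a second bind of the same port with -EEXIST is only my interpretation of "a particular port must not be bound to more than one lcore". The header does not say whether the library enforces this or leaves it to the application.

```c
#include <errno.h>
#include <stdint.h>

/* Hypothetical limits, chosen only for this sketch. */
#define SKETCH_MAX_PORTS 8
#define SKETCH_MAX_LCORES 4
#define SKETCH_UNBOUND (-1)

/* One slot per event port: the lcore it is bound to, or SKETCH_UNBOUND. */
struct sketch_bindings {
	int lcore_of_port[SKETCH_MAX_PORTS];
};

static void
sketch_bindings_init(struct sketch_bindings *b)
{
	int i;

	for (i = 0; i < SKETCH_MAX_PORTS; i++)
		b->lcore_of_port[i] = SKETCH_UNBOUND;
}

/* Many ports may share an lcore, but a port has at most one lcore. */
static int
sketch_bind(struct sketch_bindings *b, uint8_t port, unsigned int lcore)
{
	if (port >= SKETCH_MAX_PORTS || lcore >= SKETCH_MAX_LCORES)
		return -EINVAL;
	if (b->lcore_of_port[port] != SKETCH_UNBOUND)
		return -EEXIST;

	b->lcore_of_port[port] = (int)lcore;
	return 0;
}

/* Unbinding only succeeds for the lcore the port is actually bound to. */
static int
sketch_unbind(struct sketch_bindings *b, uint8_t port, unsigned int lcore)
{
	if (port >= SKETCH_MAX_PORTS || lcore >= SKETCH_MAX_LCORES)
		return -EINVAL;
	if (b->lcore_of_port[port] != (int)lcore)
		return -ENOENT;

	b->lcore_of_port[port] = SKETCH_UNBOUND;
	return 0;
}
```

In this model, binding ports 0 and 1 to lcore 1 both succeed, binding port 0 to lcore 2 afterwards fails with -EEXIST, and unbinding port 0 from lcore 2 fails with -ENOENT.
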
> +/**
> + * Register an event handler.
> + *
> + * The match callback function is used to select if a particular event
> + * should be delivered, using the corresponding process callback
> + * function.
> + *
> + * The reason for having two distinct steps is to allow the dispatcher
> + * to deliver all events as a batch. This in turn causes events of a
> + * particular kind to be processed back-to-back, improving cache
> + * locality.
> + *
> + * The list of handler callback functions is shared among all lcores,
> + * but will only be executed on lcores which have an eventdev port
> + * bound to them, and which are running the dispatcher service.
> + *
> + * An event is delivered to at most one handler. Events where no
> + * handler is found are dropped.
> + *
> + * The application must not depend on the order in which the match
> + * functions are invoked.
> + *
> + * Ordering of events is not guaranteed to be maintained between
> + * different process callbacks. For example, suppose there are two
> + * callbacks registered, matching different subsets of events arriving
> + * on an atomic queue. A batch of events [ev0, ev1, ev2] is dequeued
> + * on a particular port, all pertaining to the same flow. The match
> + * callback for registration A returns true for ev0 and ev2, and the
> + * match callback for registration B returns true for ev1. In that
> + * scenario, the dispatcher may choose to deliver first [ev0, ev2]
> + * using A's process function, and then [ev1] to B - or vice versa.
> + *
> + * rte_dispatcher_register() may be called by any thread
> + * (including unregistered non-EAL threads), but not while the event
> + * dispatcher is running on any service lcore.
> + *
> + * @param id
> + *  The dispatcher identifier.
> + *
> + * @param match_fun
> + *  The match callback function.
> + *
> + * @param match_cb_data
> + *  A pointer to some application-specific opaque data (or NULL),
> + *  which is supplied back to the application when match_fun is
> + *  called.
> + *
> + * @param process_fun
> + *  The process callback function.
> + *
> + * @param process_cb_data
> + *  A pointer to some application-specific opaque data (or NULL),
> + *  which is supplied back to the application when process_fun is
> + *  called.
> + *
> + * @return
> + *  - >= 0: The identifier for this registration.
> + *  - -ENOMEM: Unable to allocate sufficient resources.
> + */
> +__rte_experimental
> +int
> +rte_dispatcher_register(uint8_t id,
> +			      rte_dispatcher_match_t match_fun,
> +			      void *match_cb_data,
> +			      rte_dispatcher_process_t process_fun,
> +			      void *process_cb_data);
> +
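
For readers following the thread, the two-step match/process scheme described above can be illustrated with a small, self-contained model. This is a sketch under stated assumptions, not the code in this patch: struct sketch_event stands in for struct rte_event, the callback typedefs are simplified guesses rather than the real rte_dispatcher_match_t and rte_dispatcher_process_t signatures, and the dispatch loop walks handlers in registration order, which the documentation explicitly says applications must not rely on.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define SKETCH_MAX_BURST 32

/* Hypothetical stand-in for struct rte_event. */
struct sketch_event {
	uint8_t type;
	uint32_t seq;
};

/* Simplified, assumed callback shapes (not the library's typedefs). */
typedef bool (*sketch_match_t)(const struct sketch_event *ev, void *cb_data);
typedef void (*sketch_process_t)(const struct sketch_event *evs,
				 uint16_t num, void *cb_data);

struct sketch_handler {
	sketch_match_t match_fun;
	void *match_cb_data;
	sketch_process_t process_fun;
	void *process_cb_data;
};

/*
 * Deliver each event to at most one handler, one batch per handler.
 * Returns the number of events that no handler matched (dropped).
 */
static uint16_t
sketch_dispatch(const struct sketch_handler *handlers, size_t num_handlers,
		const struct sketch_event *batch, uint16_t num)
{
	bool taken[SKETCH_MAX_BURST] = { false };
	uint16_t dropped = num;
	uint16_t i;
	size_t h;

	/* Sketch only: oversized batches are rejected and counted as dropped. */
	if (num > SKETCH_MAX_BURST)
		return num;

	for (h = 0; h < num_handlers; h++) {
		struct sketch_event matched[SKETCH_MAX_BURST];
		uint16_t num_matched = 0;

		/* Step 1: collect every not-yet-claimed event this handler matches. */
		for (i = 0; i < num; i++) {
			if (!taken[i] && handlers[h].match_fun(&batch[i],
					handlers[h].match_cb_data)) {
				taken[i] = true;
				matched[num_matched++] = batch[i];
			}
		}

		/* Step 2: hand the whole group over in a single process call. */
		if (num_matched > 0) {
			handlers[h].process_fun(matched, num_matched,
						handlers[h].process_cb_data);
			dropped -= num_matched;
		}
	}

	return dropped;
}

/* Example match callback: cb_data points to the event type to accept. */
static bool
sketch_match_type(const struct sketch_event *ev, void *cb_data)
{
	return ev->type == *(const uint8_t *)cb_data;
}

struct sketch_counter {
	uint16_t calls;
	uint16_t events;
};

/* Example process callback: record how many calls and events were seen. */
static void
sketch_count(const struct sketch_event *evs, uint16_t num, void *cb_data)
{
	struct sketch_counter *c = cb_data;

	(void)evs;
	c->calls++;
	c->events += num;
}
```

With two handlers matching types 0 and 1 and a batch of types [0, 1, 0, 7], the first handler's process callback is invoked once with two events, the second once with one event, and the type-7 event is dropped. This also shows why ordering between ev0/ev2 and ev1 is not preserved across handlers.
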
> +/**
> + * Unregister an event handler.
> + *
> + * This function may be called by any thread (including unregistered
> + * non-EAL threads), but not while the dispatcher is running on
> + * any service lcore.
> + *
> + * @param id
> + *  The dispatcher identifier.
> + *
> + * @param handler_id
> + *  The handler registration id returned by the original
> + *  rte_dispatcher_register() call.
> + *
> + * @return
> + *  - 0: Success
> + *  - -EINVAL: The @c id and/or the @c handler_id parameter was invalid.
> + */
> +__rte_experimental
> +int
> +rte_dispatcher_unregister(uint8_t id, int handler_id);
> +
> +/**
> + * Register a finalize callback function.
> + *
> + * An application may optionally install one or more finalize
> + * callbacks.
> + *
> + * All finalize callbacks are invoked by the dispatcher when a
> + * complete batch of events (retrieved using rte_event_dequeue_burst())
> + * has been delivered to the application (or has been dropped).
> + *
> + * The finalize callback is not tied to any particular handler.
> + *
> + * The finalize callback provides an opportunity for the application
> + * to do per-batch processing. One case where this may be useful is if
> + * an event output buffer is used, and is shared among several
> + * handlers. In such a case, proper output buffer flushing may be
> + * assured using a finalize callback.
> + *
> + * rte_dispatcher_finalize_register() may be called by any thread
> + * (including unregistered non-EAL threads), but not while the
> + * dispatcher is running on any service lcore.
> + *
> + * @param id
> + *  The dispatcher identifier.
> + *
> + * @param finalize_fun
> + *  The function called after completing the processing of a
> + *  dequeue batch.
> + *
> + * @param finalize_data
> + *  A pointer to some application-specific opaque data (or NULL),
> + *  which is supplied back to the application when @c finalize_fun is
> + *  called.
> + *
> + * @return
> + *  - >= 0: The identifier for this registration.
> + *  - -ENOMEM: Unable to allocate sufficient resources.
> + */
> +__rte_experimental
> +int
> +rte_dispatcher_finalize_register(uint8_t id,
> +				 rte_dispatcher_finalize_t finalize_fun,
> +				 void *finalize_data);
> +
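
The shared-output-buffer use case mentioned above could look roughly like the sketch below. It is an illustration only: the buffer type, its capacity, and all sketch_* functions are hypothetical, events are modelled as plain uint32_t values, and the finalize callback takes just the opaque data pointer, which is a simplification and not necessarily the real rte_dispatcher_finalize_t signature.

```c
#include <stdint.h>

#define SKETCH_BUF_CAPACITY 64

/* Hypothetical output buffer shared by several handlers on one lcore. */
struct sketch_out_buffer {
	uint32_t events[SKETCH_BUF_CAPACITY];
	uint16_t count;
	uint32_t flushed;     /* total number of events flushed so far */
	uint32_t flush_calls; /* number of non-empty flushes */
};

static void
sketch_flush(struct sketch_out_buffer *buf)
{
	if (buf->count == 0)
		return;

	/* A real application would enqueue the buffered events here. */
	buf->flushed += buf->count;
	buf->flush_calls++;
	buf->count = 0;
}

static void
sketch_buffer_add(struct sketch_out_buffer *buf, uint32_t ev)
{
	/* Flush early only when the buffer is full. */
	if (buf->count == SKETCH_BUF_CAPACITY)
		sketch_flush(buf);

	buf->events[buf->count++] = ev;
}

/* Process callback used by more than one handler: buffer, do not flush. */
static void
sketch_process_forward(const uint32_t *evs, uint16_t num, void *cb_data)
{
	struct sketch_out_buffer *buf = cb_data;
	uint16_t i;

	for (i = 0; i < num; i++)
		sketch_buffer_add(buf, evs[i]);
}

/* Finalize callback: runs once per dequeued batch, after all handlers. */
static void
sketch_finalize(void *finalize_data)
{
	sketch_flush(finalize_data);
}
```

The point of the design is that individual handlers never need to know whether they are the last one to run for a batch. Each one only appends to the buffer, and the single finalize call guarantees nothing is left unflushed once the batch is done.
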
> +/**
> + * Unregister a finalize callback.
> + *
> + * This function may be called by any thread (including unregistered
> + * non-EAL threads), but not while the dispatcher is running on
> + * any service lcore.
> + *
> + * @param id
> + *  The dispatcher identifier.
> + *
> + * @param reg_id
> + *  The finalize registration id returned by the original
> + *  rte_dispatcher_finalize_register() call.
> + *
> + * @return
> + *  - 0: Success
> + *  - -EINVAL: The @c id and/or the @c reg_id parameter was invalid.
> + */
> +__rte_experimental
> +int
> +rte_dispatcher_finalize_unregister(uint8_t id, int reg_id);
> +
> +/**
> + * Start a dispatcher instance.
> + *
> + * Enables the dispatcher service.
> + *
> + * The underlying event device must have been started prior to calling
> + * rte_dispatcher_start().
> + *
> + * For the dispatcher to actually perform work (i.e., dispatch
> + * events), its service must have been mapped to one or more service
> + * lcores, and its service run state set to '1'. A dispatcher's
> + * service is retrieved using rte_dispatcher_service_id_get().
> + *
> + * Each service lcore to which the dispatcher is mapped should
> + * have at least one event port configured. Such configuration is
> + * performed by calling rte_dispatcher_bind_port_to_lcore(), prior to
> + * starting the dispatcher.
> + *
> + * @param id
> + *  The dispatcher identifier.
> + *
> + * @return
> + *  - 0: Success
> + *  - -EINVAL: Invalid @c id.
> + */
> +__rte_experimental
> +int
> +rte_dispatcher_start(uint8_t id);
> +
> +/**
> + * Stop a running dispatcher instance.
> + *
> + * Disables the dispatcher service.
> + *
> + * @param id
> + *  The dispatcher identifier.
> + *
> + * @return
> + *  - 0: Success
> + *  - -EINVAL: Invalid @c id.
> + */
> +__rte_experimental
> +int
> +rte_dispatcher_stop(uint8_t id);
> +
> +/**
> + * Retrieve statistics for a dispatcher instance.
> + *
> + * This function is MT safe and may be called by any thread
> + * (including unregistered non-EAL threads).
> + *
> + * @param id
> + *  The dispatcher identifier.
> + * @param[out] stats
> + *   A pointer to a structure to fill with statistics.
> + * @return
> + *  - 0: Success
> + *  - -EINVAL: The @c id parameter was invalid.
> + */
> +__rte_experimental
> +int
> +rte_dispatcher_stats_get(uint8_t id,
> +			       struct rte_dispatcher_stats *stats);
> +
> +/**
> + * Reset statistics for a dispatcher instance.
> + *
> + * This function may be called by any thread (including unregistered
> + * non-EAL threads), but may not produce the correct result if the
> + * dispatcher is running on any service lcore.
> + *
> + * @param id
> + *  The dispatcher identifier.
> + *
> + * @return
> + *  - 0: Success
> + *  - -EINVAL: The @c id parameter was invalid.
> + */
> +__rte_experimental
> +int
> +rte_dispatcher_stats_reset(uint8_t id);
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* __RTE_DISPATCHER__ */
> diff --git a/lib/dispatcher/version.map b/lib/dispatcher/version.map
> new file mode 100644
> index 0000000000..8f9ad96522
> --- /dev/null
> +++ b/lib/dispatcher/version.map
> @@ -0,0 +1,20 @@
> +EXPERIMENTAL {
> +	global:
> +
> +	# added in 23.11
> +	rte_dispatcher_create;
> +	rte_dispatcher_free;
> +	rte_dispatcher_service_id_get;
> +	rte_dispatcher_bind_port_to_lcore;
> +	rte_dispatcher_unbind_port_from_lcore;
> +	rte_dispatcher_register;
> +	rte_dispatcher_unregister;
> +	rte_dispatcher_finalize_register;
> +	rte_dispatcher_finalize_unregister;
> +	rte_dispatcher_start;
> +	rte_dispatcher_stop;
> +	rte_dispatcher_stats_get;
> +	rte_dispatcher_stats_reset;
> +
> +	local: *;
> +};
> diff --git a/lib/meson.build b/lib/meson.build
> index 099b0ed18a..3093b338d2 100644
> --- a/lib/meson.build
> +++ b/lib/meson.build
> @@ -35,6 +35,7 @@ libraries = [
>          'distributor',
>          'efd',
>          'eventdev',
> +        'dispatcher', # dispatcher depends on eventdev
>          'gpudev',
>          'gro',
>          'gso',
> @@ -81,6 +82,7 @@ optional_libs = [
>          'cfgfile',
>          'compressdev',
>          'cryptodev',
> +        'dispatcher',
>          'distributor',
>          'dmadev',
>          'efd',
> --
> 2.34.1
    
    