[dpdk-dev] [PATCH v2 1/1] eventdev: add new software event timer adapter

Mattias Rönnblom mattias.ronnblom at ericsson.com
Sun Dec 9 20:17:22 CET 2018


On 2018-12-07 21:34, Erik Gabriel Carrillo wrote:
> This patch introduces a new version of the event timer adapter software
> PMD. In the original design, timer event producer lcores in the primary
> and secondary processes enqueued event timers into a ring, and a
> service core in the primary process dequeued them and processed them
> further.  To improve performance, this version does away with the ring
> and lets lcores in both primary and secondary processes insert timers
> directly into timer skiplist data structures; the service core directly
> accesses the lists as well, when looking for timers that have expired.
> 
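If I read the patch right, arming no longer goes through a ring at all;
a producer lcore now essentially does (paraphrasing the code below)

	rte_mempool_get_bulk(sw->tim_pool, (void **)tims, nb_evtims);
	/* ... per-timer validation ... */
	rte_timer_init(tim);
	cycles = get_timeout_cycles(evtims[i], adapter);
	rte_timer_alt_reset(sw->timer_data_id, tim, cycles, SINGLE,
			    lcore_id, NULL, evtims[i]);

and the service core periodically drains the per-lcore skiplists with

	rte_timer_alt_manage(sw->timer_data_id, sw->poll_lcores,
			     sw->n_poll_lcores, swtim_callback);
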
> Signed-off-by: Erik Gabriel Carrillo <erik.g.carrillo at intel.com>
> ---
>   lib/librte_eventdev/rte_event_timer_adapter.c | 687 +++++++++++---------------
>   1 file changed, 275 insertions(+), 412 deletions(-)
> 
> diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c b/lib/librte_eventdev/rte_event_timer_adapter.c
> index 79070d4..9c528cb 100644
> --- a/lib/librte_eventdev/rte_event_timer_adapter.c
> +++ b/lib/librte_eventdev/rte_event_timer_adapter.c
> @@ -7,6 +7,7 @@
>   #include <inttypes.h>
>   #include <stdbool.h>
>   #include <sys/queue.h>
> +#include <assert.h>
>   

You have no assert() calls, from what I can see; what this file uses is
RTE_ASSERT(), which comes from <rte_debug.h>. Include that instead.
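
I.e., something like:

	-#include <assert.h>
	+#include <rte_debug.h>	/* RTE_ASSERT() */

RTE_ASSERT() compiles to nothing unless RTE_ENABLE_ASSERT is defined,
so it's free in a normal build.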

>   #include <rte_memzone.h>
>   #include <rte_memory.h>
> @@ -19,6 +20,7 @@
>   #include <rte_timer.h>
>   #include <rte_service_component.h>
>   #include <rte_cycles.h>
> +#include <rte_random.h>
>   
>   #include "rte_eventdev.h"
>   #include "rte_eventdev_pmd.h"
> @@ -34,7 +36,7 @@ static int evtim_buffer_logtype;
>   
>   static struct rte_event_timer_adapter adapters[RTE_EVENT_TIMER_ADAPTER_NUM_MAX];
>   
> -static const struct rte_event_timer_adapter_ops sw_event_adapter_timer_ops;
> +static const struct rte_event_timer_adapter_ops swtim_ops;
>   
>   #define EVTIM_LOG(level, logtype, ...) \
>   	rte_log(RTE_LOG_ ## level, logtype, \
> @@ -211,7 +213,7 @@ rte_event_timer_adapter_create_ext(
>   	 * implementation.
>   	 */
>   	if (adapter->ops == NULL)
> -		adapter->ops = &sw_event_adapter_timer_ops;
> +		adapter->ops = &swtim_ops;
>   
>   	/* Allow driver to do some setup */
>   	FUNC_PTR_OR_NULL_RET_WITH_ERRNO(adapter->ops->init, -ENOTSUP);
> @@ -334,7 +336,7 @@ rte_event_timer_adapter_lookup(uint16_t adapter_id)
>   	 * implementation.
>   	 */
>   	if (adapter->ops == NULL)
> -		adapter->ops = &sw_event_adapter_timer_ops;
> +		adapter->ops = &swtim_ops;
>   
>   	/* Set fast-path function pointers */
>   	adapter->arm_burst = adapter->ops->arm_burst;
> @@ -491,6 +493,7 @@ event_buffer_flush(struct event_buffer *bufp, uint8_t dev_id, uint8_t port_id,
>   	}
>   
>   	*nb_events_inv = 0;
> +
>   	*nb_events_flushed = rte_event_enqueue_burst(dev_id, port_id,
>   						     &events[tail_idx], n);
>   	if (*nb_events_flushed != n && rte_errno == -EINVAL) {
> @@ -498,137 +501,123 @@ event_buffer_flush(struct event_buffer *bufp, uint8_t dev_id, uint8_t port_id,
>   		(*nb_events_inv)++;
>   	}
>   
> +	if (*nb_events_flushed > 0)
> +		EVTIM_BUF_LOG_DBG("enqueued %"PRIu16" timer events to event "
> +				  "device", *nb_events_flushed);
> +
>   	bufp->tail = bufp->tail + *nb_events_flushed + *nb_events_inv;
>   }
>   
>   /*
>    * Software event timer adapter implementation
>    */
> -
> -struct rte_event_timer_adapter_sw_data {
> -	/* List of messages for outstanding timers */
> -	TAILQ_HEAD(, msg) msgs_tailq_head;
> -	/* Lock to guard tailq and armed count */
> -	rte_spinlock_t msgs_tailq_sl;
> +struct swtim {
>   	/* Identifier of service executing timer management logic. */
>   	uint32_t service_id;
>   	/* The cycle count at which the adapter should next tick */
>   	uint64_t next_tick_cycles;
> -	/* Incremented as the service moves through phases of an iteration */
> -	volatile int service_phase;
>   	/* The tick resolution used by adapter instance. May have been
>   	 * adjusted from what user requested
>   	 */
>   	uint64_t timer_tick_ns;
>   	/* Maximum timeout in nanoseconds allowed by adapter instance. */
>   	uint64_t max_tmo_ns;
> -	/* Ring containing messages to arm or cancel event timers */
> -	struct rte_ring *msg_ring;
> -	/* Mempool containing msg objects */
> -	struct rte_mempool *msg_pool;
>   	/* Buffered timer expiry events to be enqueued to an event device. */
>   	struct event_buffer buffer;
>   	/* Statistics */
>   	struct rte_event_timer_adapter_stats stats;
> -	/* The number of threads currently adding to the message ring */
> -	rte_atomic16_t message_producer_count;
> +	/* Mempool of timer objects */
> +	struct rte_mempool *tim_pool;
> +	/* Back pointer for convenience */
> +	struct rte_event_timer_adapter *adapter;
> +	/* Identifier of timer data instance */
> +	uint32_t timer_data_id;
> +	/* Track which cores have actually armed a timer */
> +	rte_atomic16_t in_use[RTE_MAX_LCORE];
> +	/* Track which cores' timer lists should be polled */
> +	unsigned int poll_lcores[RTE_MAX_LCORE];
> +	/* The number of lists that should be polled */
> +	int n_poll_lcores;
> +	/* Lock to atomically access the above two variables */
> +	rte_spinlock_t poll_lcores_sl;
>   };
>   
> -enum msg_type {MSG_TYPE_ARM, MSG_TYPE_CANCEL};
> -
> -struct msg {
> -	enum msg_type type;
> -	struct rte_event_timer *evtim;
> -	struct rte_timer tim;
> -	TAILQ_ENTRY(msg) msgs;
> -};
> +static inline struct swtim *
> +swtim_pmd_priv(const struct rte_event_timer_adapter *adapter)
> +{
> +	return adapter->data->adapter_priv;
> +}
>   
>   static void
> -sw_event_timer_cb(struct rte_timer *tim, void *arg)
> +swtim_callback(void *arg)
>   {
> -	int ret;
> +	struct rte_timer *tim = arg;
> +	struct rte_event_timer *evtim = tim->arg;
> +	struct rte_event_timer_adapter *adapter;
> +	struct swtim *sw;
>   	uint16_t nb_evs_flushed = 0;
>   	uint16_t nb_evs_invalid = 0;
>   	uint64_t opaque;
> -	struct rte_event_timer *evtim;
> -	struct rte_event_timer_adapter *adapter;
> -	struct rte_event_timer_adapter_sw_data *sw_data;
> +	int ret;
>   
> -	evtim = arg;
>   	opaque = evtim->impl_opaque[1];
>   	adapter = (struct rte_event_timer_adapter *)(uintptr_t)opaque;
> -	sw_data = adapter->data->adapter_priv;
> +	sw = swtim_pmd_priv(adapter);
>   
> -	ret = event_buffer_add(&sw_data->buffer, &evtim->ev);
> +	ret = event_buffer_add(&sw->buffer, &evtim->ev);
>   	if (ret < 0) {
>   		/* If event buffer is full, put timer back in list with
>   		 * immediate expiry value, so that we process it again on the
>   		 * next iteration.
>   		 */
> -		rte_timer_reset_sync(tim, 0, SINGLE, rte_lcore_id(),
> -				     sw_event_timer_cb, evtim);
> +		rte_timer_alt_reset(sw->timer_data_id, tim, 0, SINGLE,
> +				    rte_lcore_id(), NULL, evtim);
> +
> +		sw->stats.evtim_retry_count++;
>   
> -		sw_data->stats.evtim_retry_count++;
>   		EVTIM_LOG_DBG("event buffer full, resetting rte_timer with "
>   			      "immediate expiry value");
>   	} else {
> -		struct msg *m = container_of(tim, struct msg, tim);
> -		TAILQ_REMOVE(&sw_data->msgs_tailq_head, m, msgs);
>   		EVTIM_BUF_LOG_DBG("buffered an event timer expiry event");
> -		evtim->state = RTE_EVENT_TIMER_NOT_ARMED;
> +		rte_mempool_put(sw->tim_pool, tim);
> +		sw->stats.evtim_exp_count++;
>   
> -		/* Free the msg object containing the rte_timer now that
> -		 * we've buffered its event successfully.
> -		 */
> -		rte_mempool_put(sw_data->msg_pool, m);
> -
> -		/* Bump the count when we successfully add an expiry event to
> -		 * the buffer.
> -		 */
> -		sw_data->stats.evtim_exp_count++;
> +		evtim->state = RTE_EVENT_TIMER_NOT_ARMED;
>   	}
>   
> -	if (event_buffer_batch_ready(&sw_data->buffer)) {
> -		event_buffer_flush(&sw_data->buffer,
> +	if (event_buffer_batch_ready(&sw->buffer)) {
> +		event_buffer_flush(&sw->buffer,
>   				   adapter->data->event_dev_id,
>   				   adapter->data->event_port_id,
>   				   &nb_evs_flushed,
>   				   &nb_evs_invalid);
>   
> -		sw_data->stats.ev_enq_count += nb_evs_flushed;
> -		sw_data->stats.ev_inv_count += nb_evs_invalid;
> +		sw->stats.ev_enq_count += nb_evs_flushed;
> +		sw->stats.ev_inv_count += nb_evs_invalid;
>   	}
>   }
>   
>   static __rte_always_inline uint64_t
>   get_timeout_cycles(struct rte_event_timer *evtim,
> -		   struct rte_event_timer_adapter *adapter)
> +		   const struct rte_event_timer_adapter *adapter)
>   {
> -	uint64_t timeout_ns;
> -	struct rte_event_timer_adapter_sw_data *sw_data;
> -
> -	sw_data = adapter->data->adapter_priv;
> -	timeout_ns = evtim->timeout_ticks * sw_data->timer_tick_ns;
> +	struct swtim *sw = swtim_pmd_priv(adapter);
> +	uint64_t timeout_ns = evtim->timeout_ticks * sw->timer_tick_ns;
>   	return timeout_ns * rte_get_timer_hz() / NSECPERSEC;
> -
>   }
>   
>   /* This function returns true if one or more (adapter) ticks have occurred since
>    * the last time it was called.
>    */
>   static inline bool
> -adapter_did_tick(struct rte_event_timer_adapter *adapter)
> +swtim_did_tick(struct swtim *sw)
>   {
>   	uint64_t cycles_per_adapter_tick, start_cycles;
>   	uint64_t *next_tick_cyclesp;
> -	struct rte_event_timer_adapter_sw_data *sw_data;
> -
> -	sw_data = adapter->data->adapter_priv;
> -	next_tick_cyclesp = &sw_data->next_tick_cycles;
>   
> -	cycles_per_adapter_tick = sw_data->timer_tick_ns *
> +	next_tick_cyclesp = &sw->next_tick_cycles;
> +	cycles_per_adapter_tick = sw->timer_tick_ns *
>   			(rte_get_timer_hz() / NSECPERSEC);
> -
>   	start_cycles = rte_get_timer_cycles();
>   
>   	/* Note: initially, *next_tick_cyclesp == 0, so the clause below will
> @@ -640,7 +629,6 @@ adapter_did_tick(struct rte_event_timer_adapter *adapter)
>   		 * boundary.
>   		 */
>   		start_cycles -= start_cycles % cycles_per_adapter_tick;
> -
>   		*next_tick_cyclesp = start_cycles + cycles_per_adapter_tick;
>   
>   		return true;
> @@ -655,15 +643,12 @@ check_timeout(struct rte_event_timer *evtim,
>   	      const struct rte_event_timer_adapter *adapter)
>   {
>   	uint64_t tmo_nsec;
> -	struct rte_event_timer_adapter_sw_data *sw_data;
> -
> -	sw_data = adapter->data->adapter_priv;
> -	tmo_nsec = evtim->timeout_ticks * sw_data->timer_tick_ns;
> +	struct swtim *sw = swtim_pmd_priv(adapter);
>   
> -	if (tmo_nsec > sw_data->max_tmo_ns)
> +	tmo_nsec = evtim->timeout_ticks * sw->timer_tick_ns;
> +	if (tmo_nsec > sw->max_tmo_ns)
>   		return -1;
> -
> -	if (tmo_nsec < sw_data->timer_tick_ns)
> +	if (tmo_nsec < sw->timer_tick_ns)
>   		return -2;
>   
>   	return 0;
> @@ -691,110 +676,34 @@ check_destination_event_queue(struct rte_event_timer *evtim,
>   	return 0;
>   }
>   
> -#define NB_OBJS 32
>   static int
> -sw_event_timer_adapter_service_func(void *arg)
> +swtim_service_func(void *arg)
>   {
> -	int i, num_msgs;
> -	uint64_t cycles, opaque;
> +	struct rte_event_timer_adapter *adapter = arg;
> +	struct swtim *sw = swtim_pmd_priv(adapter);
>   	uint16_t nb_evs_flushed = 0;
>   	uint16_t nb_evs_invalid = 0;
> -	struct rte_event_timer_adapter *adapter;
> -	struct rte_event_timer_adapter_sw_data *sw_data;
> -	struct rte_event_timer *evtim = NULL;
> -	struct rte_timer *tim = NULL;
> -	struct msg *msg, *msgs[NB_OBJS];
> -
> -	adapter = arg;
> -	sw_data = adapter->data->adapter_priv;
> -
> -	sw_data->service_phase = 1;
> -	rte_smp_wmb();
> -
> -	while (rte_atomic16_read(&sw_data->message_producer_count) > 0 ||
> -	       !rte_ring_empty(sw_data->msg_ring)) {
> -
> -		num_msgs = rte_ring_dequeue_burst(sw_data->msg_ring,
> -						  (void **)msgs, NB_OBJS, NULL);
> -
> -		for (i = 0; i < num_msgs; i++) {
> -			int ret = 0;
> -
> -			RTE_SET_USED(ret);
> -
> -			msg = msgs[i];
> -			evtim = msg->evtim;
> -
> -			switch (msg->type) {
> -			case MSG_TYPE_ARM:
> -				EVTIM_SVC_LOG_DBG("dequeued ARM message from "
> -						  "ring");
> -				tim = &msg->tim;
> -				rte_timer_init(tim);
> -				cycles = get_timeout_cycles(evtim,
> -							    adapter);
> -				ret = rte_timer_reset(tim, cycles, SINGLE,
> -						      rte_lcore_id(),
> -						      sw_event_timer_cb,
> -						      evtim);
> -				RTE_ASSERT(ret == 0);
> -
> -				evtim->impl_opaque[0] = (uintptr_t)tim;
> -				evtim->impl_opaque[1] = (uintptr_t)adapter;
> -
> -				TAILQ_INSERT_TAIL(&sw_data->msgs_tailq_head,
> -						  msg,
> -						  msgs);
> -				break;
> -			case MSG_TYPE_CANCEL:
> -				EVTIM_SVC_LOG_DBG("dequeued CANCEL message "
> -						  "from ring");
> -				opaque = evtim->impl_opaque[0];
> -				tim = (struct rte_timer *)(uintptr_t)opaque;
> -				RTE_ASSERT(tim != NULL);
> -
> -				ret = rte_timer_stop(tim);
> -				RTE_ASSERT(ret == 0);
> -
> -				/* Free the msg object for the original arm
> -				 * request.
> -				 */
> -				struct msg *m;
> -				m = container_of(tim, struct msg, tim);
> -				TAILQ_REMOVE(&sw_data->msgs_tailq_head, m,
> -					     msgs);
> -				rte_mempool_put(sw_data->msg_pool, m);
> -
> -				/* Free the msg object for the current msg */
> -				rte_mempool_put(sw_data->msg_pool, msg);
> -
> -				evtim->impl_opaque[0] = 0;
> -				evtim->impl_opaque[1] = 0;
> -
> -				break;
> -			}
> -		}
> -	}
> -
> -	sw_data->service_phase = 2;
> -	rte_smp_wmb();
>   
> -	if (adapter_did_tick(adapter)) {
> -		rte_timer_manage();
> +	if (swtim_did_tick(sw)) {
> +		/* This lock is seldom acquired on the arm side */
> +		rte_spinlock_lock(&sw->poll_lcores_sl);
> +		rte_timer_alt_manage(sw->timer_data_id,
> +				     sw->poll_lcores,
> +				     sw->n_poll_lcores,
> +				     swtim_callback);
> +		rte_spinlock_unlock(&sw->poll_lcores_sl);
>   
> -		event_buffer_flush(&sw_data->buffer,
> +		event_buffer_flush(&sw->buffer,
>   				   adapter->data->event_dev_id,
>   				   adapter->data->event_port_id,
> -				   &nb_evs_flushed, &nb_evs_invalid);
> +				   &nb_evs_flushed,
> +				   &nb_evs_invalid);
>   
> -		sw_data->stats.ev_enq_count += nb_evs_flushed;
> -		sw_data->stats.ev_inv_count += nb_evs_invalid;
> -		sw_data->stats.adapter_tick_count++;
> +		sw->stats.ev_enq_count += nb_evs_flushed;
> +		sw->stats.ev_inv_count += nb_evs_invalid;
> +		sw->stats.adapter_tick_count++;
>   	}
>   
> -	sw_data->service_phase = 0;
> -	rte_smp_wmb();
> -
>   	return 0;
>   }
>   
> @@ -828,168 +737,145 @@ compute_msg_mempool_cache_size(uint64_t nb_requested, uint64_t nb_actual)
>   	return cache_size;
>   }
>   
> -#define SW_MIN_INTERVAL 1E5
> -
>   static int
> -sw_event_timer_adapter_init(struct rte_event_timer_adapter *adapter)
> +swtim_init(struct rte_event_timer_adapter *adapter)
>   {
> -	int ret;
> -	struct rte_event_timer_adapter_sw_data *sw_data;
> -	uint64_t nb_timers;
> +	int i, ret;
> +	struct swtim *sw;
>   	unsigned int flags;
>   	struct rte_service_spec service;
> -	static bool timer_subsystem_inited; // static initialized to false
>   
> -	/* Allocate storage for SW implementation data */
> -	char priv_data_name[RTE_RING_NAMESIZE];
> -	snprintf(priv_data_name, RTE_RING_NAMESIZE, "sw_evtim_adap_priv_%"PRIu8,
> -		 adapter->data->id);
> -	adapter->data->adapter_priv = rte_zmalloc_socket(
> -				priv_data_name,
> -				sizeof(struct rte_event_timer_adapter_sw_data),
> -				RTE_CACHE_LINE_SIZE,
> -				adapter->data->socket_id);
> -	if (adapter->data->adapter_priv == NULL) {
> +	/* Allocate storage for private data area */
> +#define SWTIM_NAMESIZE 32
> +	char swtim_name[SWTIM_NAMESIZE];
> +	snprintf(swtim_name, SWTIM_NAMESIZE, "swtim_%"PRIu8,
> +			adapter->data->id);
> +	sw = rte_zmalloc_socket(swtim_name, sizeof(*sw), RTE_CACHE_LINE_SIZE,
> +			adapter->data->socket_id);
> +	if (sw == NULL) {
>   		EVTIM_LOG_ERR("failed to allocate space for private data");
>   		rte_errno = ENOMEM;
>   		return -1;
>   	}
>   
> -	if (adapter->data->conf.timer_tick_ns < SW_MIN_INTERVAL) {
> -		EVTIM_LOG_ERR("failed to create adapter with requested tick "
> -			      "interval");
> -		rte_errno = EINVAL;
> -		return -1;
> -	}
> -
> -	sw_data = adapter->data->adapter_priv;
> -
> -	sw_data->timer_tick_ns = adapter->data->conf.timer_tick_ns;
> -	sw_data->max_tmo_ns = adapter->data->conf.max_tmo_ns;
> +	/* Connect storage to adapter instance */
> +	adapter->data->adapter_priv = sw;
> +	sw->adapter = adapter;
>   
> -	TAILQ_INIT(&sw_data->msgs_tailq_head);
> -	rte_spinlock_init(&sw_data->msgs_tailq_sl);
> -	rte_atomic16_init(&sw_data->message_producer_count);
> -
> -	/* Rings require power of 2, so round up to next such value */
> -	nb_timers = rte_align64pow2(adapter->data->conf.nb_timers);
> -
> -	char msg_ring_name[RTE_RING_NAMESIZE];
> -	snprintf(msg_ring_name, RTE_RING_NAMESIZE,
> -		 "sw_evtim_adap_msg_ring_%"PRIu8, adapter->data->id);
> -	flags = adapter->data->conf.flags & RTE_EVENT_TIMER_ADAPTER_F_SP_PUT ?
> -		RING_F_SP_ENQ | RING_F_SC_DEQ :
> -		RING_F_SC_DEQ;
> -	sw_data->msg_ring = rte_ring_create(msg_ring_name, nb_timers,
> -					    adapter->data->socket_id, flags);
> -	if (sw_data->msg_ring == NULL) {
> -		EVTIM_LOG_ERR("failed to create message ring");
> -		rte_errno = ENOMEM;
> -		goto free_priv_data;
> -	}
> +	sw->timer_tick_ns = adapter->data->conf.timer_tick_ns;
> +	sw->max_tmo_ns = adapter->data->conf.max_tmo_ns;
>   
> -	char pool_name[RTE_RING_NAMESIZE];
> -	snprintf(pool_name, RTE_RING_NAMESIZE, "sw_evtim_adap_msg_pool_%"PRIu8,
> +	/* Create a timer pool */
> +	char pool_name[SWTIM_NAMESIZE];
> +	snprintf(pool_name, SWTIM_NAMESIZE, "swtim_pool_%"PRIu8,
>   		 adapter->data->id);
> -
> -	/* Both the arming/canceling thread and the service thread will do puts
> -	 * to the mempool, but if the SP_PUT flag is enabled, we can specify
> -	 * single-consumer get for the mempool.
> -	 */
> -	flags = adapter->data->conf.flags & RTE_EVENT_TIMER_ADAPTER_F_SP_PUT ?
> -		MEMPOOL_F_SC_GET : 0;
> -
> -	/* The usable size of a ring is count - 1, so subtract one here to
> -	 * make the counts agree.
> -	 */
> +	/* Optimal mempool size is a power of 2 minus one */
> +	uint64_t nb_timers = rte_align64pow2(adapter->data->conf.nb_timers);
>   	int pool_size = nb_timers - 1;
>   	int cache_size = compute_msg_mempool_cache_size(
>   				adapter->data->conf.nb_timers, nb_timers);
> -	sw_data->msg_pool = rte_mempool_create(pool_name, pool_size,
> -					       sizeof(struct msg), cache_size,
> -					       0, NULL, NULL, NULL, NULL,
> -					       adapter->data->socket_id, flags);
> -	if (sw_data->msg_pool == NULL) {
> -		EVTIM_LOG_ERR("failed to create message object mempool");
> +	flags = 0; /* pool is multi-producer, multi-consumer */
> +	sw->tim_pool = rte_mempool_create(pool_name, pool_size,
> +			sizeof(struct rte_timer), cache_size, 0, NULL, NULL,
> +			NULL, NULL, adapter->data->socket_id, flags);
> +	if (sw->tim_pool == NULL) {
> +		EVTIM_LOG_ERR("failed to create timer object mempool");
>   		rte_errno = ENOMEM;
> -		goto free_msg_ring;
> +		goto free_alloc;
> +	}
> +
> +	/* Initialize the variables that track in-use timer lists */
> +	rte_spinlock_init(&sw->poll_lcores_sl);
> +	for (i = 0; i < RTE_MAX_LCORE; i++)
> +		rte_atomic16_init(&sw->in_use[i]);
> +
> +	/* Initialize the timer subsystem and allocate timer data instance */
> +	ret = rte_timer_subsystem_init();
> +	if (ret < 0) {
> +		if (ret != -EALREADY) {
> +			EVTIM_LOG_ERR("failed to initialize timer subsystem");
> +			rte_errno = ret;
> +			goto free_mempool;
> +		}
> +	}
> +
> +	ret = rte_timer_data_alloc(&sw->timer_data_id);
> +	if (ret < 0) {
> +		EVTIM_LOG_ERR("failed to allocate timer data instance");
> +		rte_errno = ret;
> +		goto free_mempool;
>   	}
>   
> -	event_buffer_init(&sw_data->buffer);
> +	/* Initialize timer event buffer */
> +	event_buffer_init(&sw->buffer);
> +
> +	sw->adapter = adapter;
>   
>   	/* Register a service component to run adapter logic */
>   	memset(&service, 0, sizeof(service));
>   	snprintf(service.name, RTE_SERVICE_NAME_MAX,
> -		 "sw_evimer_adap_svc_%"PRIu8, adapter->data->id);
> +		 "swtim_svc_%"PRIu8, adapter->data->id);
>   	service.socket_id = adapter->data->socket_id;
> -	service.callback = sw_event_timer_adapter_service_func;
> +	service.callback = swtim_service_func;
>   	service.callback_userdata = adapter;
>   	service.capabilities &= ~(RTE_SERVICE_CAP_MT_SAFE);
> -	ret = rte_service_component_register(&service, &sw_data->service_id);
> +	ret = rte_service_component_register(&service, &sw->service_id);
>   	if (ret < 0) {
>   		EVTIM_LOG_ERR("failed to register service %s with id %"PRIu32
> -			      ": err = %d", service.name, sw_data->service_id,
> +			      ": err = %d", service.name, sw->service_id,
>   			      ret);
>   
>   		rte_errno = ENOSPC;
> -		goto free_msg_pool;
> +		goto free_mempool;
>   	}
>   
>   	EVTIM_LOG_DBG("registered service %s with id %"PRIu32, service.name,
> -		      sw_data->service_id);
> +		      sw->service_id);
>   
> -	adapter->data->service_id = sw_data->service_id;
> +	adapter->data->service_id = sw->service_id;
>   	adapter->data->service_inited = 1;
>   
> -	if (!timer_subsystem_inited) {
> -		rte_timer_subsystem_init();
> -		timer_subsystem_inited = true;
> -	}
> -
>   	return 0;
> -
> -free_msg_pool:
> -	rte_mempool_free(sw_data->msg_pool);
> -free_msg_ring:
> -	rte_ring_free(sw_data->msg_ring);
> -free_priv_data:
> -	rte_free(sw_data);
> +free_mempool:
> +	rte_mempool_free(sw->tim_pool);
> +free_alloc:
> +	rte_free(sw);
>   	return -1;
>   }
>   
> -static int
> -sw_event_timer_adapter_uninit(struct rte_event_timer_adapter *adapter)
> +static void
> +swtim_free_tim(struct rte_timer *tim, void *arg)
>   {
> -	int ret;
> -	struct msg *m1, *m2;
> -	struct rte_event_timer_adapter_sw_data *sw_data =
> -						adapter->data->adapter_priv;
> +	struct swtim *sw = arg;
>   
> -	rte_spinlock_lock(&sw_data->msgs_tailq_sl);
> -
> -	/* Cancel outstanding rte_timers and free msg objects */
> -	m1 = TAILQ_FIRST(&sw_data->msgs_tailq_head);
> -	while (m1 != NULL) {
> -		EVTIM_LOG_DBG("freeing outstanding timer");
> -		m2 = TAILQ_NEXT(m1, msgs);
> -
> -		rte_timer_stop_sync(&m1->tim);
> -		rte_mempool_put(sw_data->msg_pool, m1);
> +	rte_mempool_put(sw->tim_pool, (void *)tim);
> +}

No cast required.
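
I.e. just:

	rte_mempool_put(sw->tim_pool, tim);

rte_mempool_put() takes a void *, so the conversion is implicit.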

>   
> -		m1 = m2;
> -	}
> +/* Traverse the list of outstanding timers and put them back in the mempool
> + * before freeing the adapter to avoid leaking the memory.
> + */
> +static int
> +swtim_uninit(struct rte_event_timer_adapter *adapter)
> +{
> +	int ret;
> +	struct swtim *sw = swtim_pmd_priv(adapter);
>   
> -	rte_spinlock_unlock(&sw_data->msgs_tailq_sl);
> +	/* Free outstanding timers */
> +	rte_timer_stop_all(sw->timer_data_id,
> +			   sw->poll_lcores,
> +			   sw->n_poll_lcores,
> +			   swtim_free_tim,
> +			   sw);
>   
> -	ret = rte_service_component_unregister(sw_data->service_id);
> +	ret = rte_service_component_unregister(sw->service_id);
>   	if (ret < 0) {
>   		EVTIM_LOG_ERR("failed to unregister service component");
>   		return ret;
>   	}
>   
> -	rte_ring_free(sw_data->msg_ring);
> -	rte_mempool_free(sw_data->msg_pool);
> -	rte_free(adapter->data->adapter_priv);
> +	rte_mempool_free(sw->tim_pool);
> +	rte_free(sw);
> +	adapter->data->adapter_priv = NULL;
>   
>   	return 0;
>   }
> @@ -1010,88 +896,79 @@ get_mapped_count_for_service(uint32_t service_id)
>   }
>   
>   static int
> -sw_event_timer_adapter_start(const struct rte_event_timer_adapter *adapter)
> +swtim_start(const struct rte_event_timer_adapter *adapter)
>   {
>   	int mapped_count;
> -	struct rte_event_timer_adapter_sw_data *sw_data;
> -
> -	sw_data = adapter->data->adapter_priv;
> +	struct swtim *sw = swtim_pmd_priv(adapter);
>   
>   	/* Mapping the service to more than one service core can introduce
>   	 * delays while one thread is waiting to acquire a lock, so only allow
>   	 * one core to be mapped to the service.
> +	 *
> +	 * Note: the service could be modified such that it spreads cores to
> +	 * poll over multiple service instances.
>   	 */
> -	mapped_count = get_mapped_count_for_service(sw_data->service_id);
> +	mapped_count = get_mapped_count_for_service(sw->service_id);
>   
> -	if (mapped_count == 1)
> -		return rte_service_component_runstate_set(sw_data->service_id,
> -							  1);
> +	if (mapped_count != 1)
> +		return mapped_count < 1 ? -ENOENT : -ENOTSUP;
>   
> -	return mapped_count < 1 ? -ENOENT : -ENOTSUP;
> +	return rte_service_component_runstate_set(sw->service_id, 1);
>   }
>   
>   static int
> -sw_event_timer_adapter_stop(const struct rte_event_timer_adapter *adapter)
> +swtim_stop(const struct rte_event_timer_adapter *adapter)
>   {
>   	int ret;
> -	struct rte_event_timer_adapter_sw_data *sw_data =
> -						adapter->data->adapter_priv;
> +	struct swtim *sw = swtim_pmd_priv(adapter);
>   
> -	ret = rte_service_component_runstate_set(sw_data->service_id, 0);
> +	ret = rte_service_component_runstate_set(sw->service_id, 0);
>   	if (ret < 0)
>   		return ret;
>   
> -	/* Wait for the service to complete its final iteration before
> -	 * stopping.
> -	 */
> -	while (sw_data->service_phase != 0)
> +	/* Wait for the service to complete its final iteration */
> +	while (rte_service_may_be_active(sw->service_id))
>   		rte_pause();
>   
> -	rte_smp_rmb();
> -
>   	return 0;
>   }
>   
>   static void
> -sw_event_timer_adapter_get_info(const struct rte_event_timer_adapter *adapter,
> +swtim_get_info(const struct rte_event_timer_adapter *adapter,
>   		struct rte_event_timer_adapter_info *adapter_info)
>   {
> -	struct rte_event_timer_adapter_sw_data *sw_data;
> -	sw_data = adapter->data->adapter_priv;
> -
> -	adapter_info->min_resolution_ns = sw_data->timer_tick_ns;
> -	adapter_info->max_tmo_ns = sw_data->max_tmo_ns;
> +	struct swtim *sw = swtim_pmd_priv(adapter);
> +	adapter_info->min_resolution_ns = sw->timer_tick_ns;
> +	adapter_info->max_tmo_ns = sw->max_tmo_ns;
>   }
>   
>   static int
> -sw_event_timer_adapter_stats_get(const struct rte_event_timer_adapter *adapter,
> -				 struct rte_event_timer_adapter_stats *stats)
> +swtim_stats_get(const struct rte_event_timer_adapter *adapter,
> +		struct rte_event_timer_adapter_stats *stats)
>   {
> -	struct rte_event_timer_adapter_sw_data *sw_data;
> -	sw_data = adapter->data->adapter_priv;
> -	*stats = sw_data->stats;
> +	struct swtim *sw = swtim_pmd_priv(adapter);
> +	*stats = sw->stats; /* structure copy */
>   	return 0;
>   }
>   
>   static int
> -sw_event_timer_adapter_stats_reset(
> -				const struct rte_event_timer_adapter *adapter)
> +swtim_stats_reset(const struct rte_event_timer_adapter *adapter)
>   {
> -	struct rte_event_timer_adapter_sw_data *sw_data;
> -	sw_data = adapter->data->adapter_priv;
> -	memset(&sw_data->stats, 0, sizeof(sw_data->stats));
> +	struct swtim *sw = swtim_pmd_priv(adapter);
> +	memset(&sw->stats, 0, sizeof(sw->stats));
>   	return 0;
>   }
>   
> -static __rte_always_inline uint16_t
> -__sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,
> -			  struct rte_event_timer **evtims,
> -			  uint16_t nb_evtims)
> +static uint16_t
> +__swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
> +		struct rte_event_timer **evtims,
> +		uint16_t nb_evtims)
>   {
> -	uint16_t i;
> -	int ret;
> -	struct rte_event_timer_adapter_sw_data *sw_data;
> -	struct msg *msgs[nb_evtims];
> +	int i, ret;
> +	struct swtim *sw = swtim_pmd_priv(adapter);
> +	uint32_t lcore_id = rte_lcore_id();
> +	struct rte_timer *tim, *tims[nb_evtims];
> +	uint64_t cycles;
>   
>   #ifdef RTE_LIBRTE_EVENTDEV_DEBUG
>   	/* Check that the service is running. */
> @@ -1101,101 +978,104 @@ __sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,
>   	}
>   #endif
>   
> -	sw_data = adapter->data->adapter_priv;
> +	/* Adjust lcore_id if non-EAL thread. Arbitrarily pick the timer list of
> +	 * the highest lcore to insert such timers into
> +	 */
> +	if (lcore_id == LCORE_ID_ANY)
> +		lcore_id = RTE_MAX_LCORE - 1;
> +
> +	/* If this is the first time we're arming an event timer on this lcore,
> +	 * mark this lcore as "in use"; this will cause the service
> +	 * function to process the timer list that corresponds to this lcore.
> +	 */
> +	if (unlikely(rte_atomic16_test_and_set(&sw->in_use[lcore_id]))) {

I suspect we have a performance-critical false sharing issue above:
with a plain array of rte_atomic16_t, many (or all) of the per-lcore
flags will end up on the same cache line.
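
One way out (an untested sketch) would be to give each flag its own
cache line:

	/* One flag per cache line, to avoid false sharing between
	 * lcores arming timers concurrently.
	 */
	struct in_use_flag {
		rte_atomic16_t v;
	} __rte_cache_aligned;

	struct swtim {
		/* ... */
		struct in_use_flag in_use[RTE_MAX_LCORE];
	};

and then test-and-set sw->in_use[lcore_id].v as before, at the cost of
RTE_MAX_LCORE * RTE_CACHE_LINE_SIZE bytes of padding.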

> +		rte_spinlock_lock(&sw->poll_lcores_sl);
> +		EVTIM_LOG_DBG("Adding lcore id = %u to list of lcores to poll",
> +			      lcore_id);
> +		sw->poll_lcores[sw->n_poll_lcores++] = lcore_id;
> +		rte_spinlock_unlock(&sw->poll_lcores_sl);
> +	}
>   
> -	ret = rte_mempool_get_bulk(sw_data->msg_pool, (void **)msgs, nb_evtims);
> +	ret = rte_mempool_get_bulk(sw->tim_pool, (void **)tims,
> +				   nb_evtims);
>   	if (ret < 0) {
>   		rte_errno = ENOSPC;
>   		return 0;
>   	}
>   
> -	/* Let the service know we're producing messages for it to process */
> -	rte_atomic16_inc(&sw_data->message_producer_count);
> -
> -	/* If the service is managing timers, wait for it to finish */
> -	while (sw_data->service_phase == 2)
> -		rte_pause();
> -
> -	rte_smp_rmb();
> -
>   	for (i = 0; i < nb_evtims; i++) {
>   		/* Don't modify the event timer state in these cases */
>   		if (evtims[i]->state == RTE_EVENT_TIMER_ARMED) {
>   			rte_errno = EALREADY;
>   			break;
>   		} else if (!(evtims[i]->state == RTE_EVENT_TIMER_NOT_ARMED ||
> -		    evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {
> +			     evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {
>   			rte_errno = EINVAL;
>   			break;
>   		}
>   
>   		ret = check_timeout(evtims[i], adapter);
> -		if (ret == -1) {
> +		if (unlikely(ret == -1)) {
>   			evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOLATE;
>   			rte_errno = EINVAL;
>   			break;
> -		}
> -		if (ret == -2) {
> +		} else if (unlikely(ret == -2)) {
>   			evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOEARLY;
>   			rte_errno = EINVAL;
>   			break;
>   		}
>   
> -		if (check_destination_event_queue(evtims[i], adapter) < 0) {
> +		if (unlikely(check_destination_event_queue(evtims[i],
> +							   adapter) < 0)) {
>   			evtims[i]->state = RTE_EVENT_TIMER_ERROR;
>   			rte_errno = EINVAL;
>   			break;
>   		}
>   
> -		/* Checks passed, set up a message to enqueue */
> -		msgs[i]->type = MSG_TYPE_ARM;
> -		msgs[i]->evtim = evtims[i];
> +		tim = tims[i];
> +		rte_timer_init(tim);
>   
> -		/* Set the payload pointer if not set. */
> -		if (evtims[i]->ev.event_ptr == NULL)
> -			evtims[i]->ev.event_ptr = evtims[i];
> +		evtims[i]->impl_opaque[0] = (uintptr_t)tim;
> +		evtims[i]->impl_opaque[1] = (uintptr_t)adapter;
>   
> -		/* msg objects that get enqueued successfully will be freed
> -		 * either by a future cancel operation or by the timer
> -		 * expiration callback.
> -		 */
> -		if (rte_ring_enqueue(sw_data->msg_ring, msgs[i]) < 0) {
> -			rte_errno = ENOSPC;
> +		cycles = get_timeout_cycles(evtims[i], adapter);
> +		ret = rte_timer_alt_reset(sw->timer_data_id, tim, cycles,
> +					  SINGLE, lcore_id, NULL, evtims[i]);
> +		if (ret < 0) {
> +			/* tim was in RUNNING or CONFIG state */
> +			evtims[i]->state = RTE_EVENT_TIMER_ERROR;
>   			break;
>   		}
>   
> -		EVTIM_LOG_DBG("enqueued ARM message to ring");
> -
> +		rte_smp_wmb();
> +		EVTIM_LOG_DBG("armed an event timer");
>   		evtims[i]->state = RTE_EVENT_TIMER_ARMED;
>   	}
>   
> -	/* Let the service know we're done producing messages */
> -	rte_atomic16_dec(&sw_data->message_producer_count);
> -
>   	if (i < nb_evtims)
> -		rte_mempool_put_bulk(sw_data->msg_pool, (void **)&msgs[i],
> -				     nb_evtims - i);
> +		rte_mempool_put_bulk(sw->tim_pool,
> +				     (void **)&tims[i], nb_evtims - i);
>   
>   	return i;
>   }
>   
>   static uint16_t
> -sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,
> -			 struct rte_event_timer **evtims,
> -			 uint16_t nb_evtims)
> +swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
> +		struct rte_event_timer **evtims,
> +		uint16_t nb_evtims)
>   {
> -	return __sw_event_timer_arm_burst(adapter, evtims, nb_evtims);
> +	return __swtim_arm_burst(adapter, evtims, nb_evtims);
>   }
>   
>   static uint16_t
> -sw_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter,
> -			    struct rte_event_timer **evtims,
> -			    uint16_t nb_evtims)
> +swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
> +		   struct rte_event_timer **evtims,
> +		   uint16_t nb_evtims)
>   {
> -	uint16_t i;
> -	int ret;
> -	struct rte_event_timer_adapter_sw_data *sw_data;
> -	struct msg *msgs[nb_evtims];
> +	int i, ret;
> +	struct rte_timer *timp;
> +	uint64_t opaque;
> +	struct swtim *sw = swtim_pmd_priv(adapter);
>   
>   #ifdef RTE_LIBRTE_EVENTDEV_DEBUG
>   	/* Check that the service is running. */
> @@ -1205,23 +1085,6 @@ sw_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter,
>   	}
>   #endif
>   
> -	sw_data = adapter->data->adapter_priv;
> -
> -	ret = rte_mempool_get_bulk(sw_data->msg_pool, (void **)msgs, nb_evtims);
> -	if (ret < 0) {
> -		rte_errno = ENOSPC;
> -		return 0;
> -	}
> -
> -	/* Let the service know we're producing messages for it to process */
> -	rte_atomic16_inc(&sw_data->message_producer_count);
> -
> -	/* If the service could be modifying event timer states, wait */
> -	while (sw_data->service_phase == 2)
> -		rte_pause();
> -
> -	rte_smp_rmb();
> -
>   	for (i = 0; i < nb_evtims; i++) {
>   		/* Don't modify the event timer state in these cases */
>   		if (evtims[i]->state == RTE_EVENT_TIMER_CANCELED) {
> @@ -1232,54 +1095,54 @@ sw_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter,
>   			break;
>   		}
>   
> -		msgs[i]->type = MSG_TYPE_CANCEL;
> -		msgs[i]->evtim = evtims[i];
> +		opaque = evtims[i]->impl_opaque[0];
> +		timp = (struct rte_timer *)(uintptr_t)opaque;
> +		RTE_ASSERT(timp != NULL);
>   
> -		if (rte_ring_enqueue(sw_data->msg_ring, msgs[i]) < 0) {
> -			rte_errno = ENOSPC;
> +		ret = rte_timer_alt_stop(sw->timer_data_id, timp);
> +		if (ret < 0) {
> +			/* Timer is running or being configured */
> +			rte_errno = EAGAIN;
>   			break;
>   		}
>   
> -		EVTIM_LOG_DBG("enqueued CANCEL message to ring");
> +		rte_mempool_put(sw->tim_pool, (void **)timp);
>   
>   		evtims[i]->state = RTE_EVENT_TIMER_CANCELED;
> -	}
> +		evtims[i]->impl_opaque[0] = 0;
> +		evtims[i]->impl_opaque[1] = 0;
>   
> -	/* Let the service know we're done producing messages */
> -	rte_atomic16_dec(&sw_data->message_producer_count);
> -
> -	if (i < nb_evtims)
> -		rte_mempool_put_bulk(sw_data->msg_pool, (void **)&msgs[i],
> -				     nb_evtims - i);
> +		rte_smp_wmb();
> +	}
>   
>   	return i;
>   }
>   
>   static uint16_t
> -sw_event_timer_arm_tmo_tick_burst(const struct rte_event_timer_adapter *adapter,
> -				  struct rte_event_timer **evtims,
> -				  uint64_t timeout_ticks,
> -				  uint16_t nb_evtims)
> +swtim_arm_tmo_tick_burst(const struct rte_event_timer_adapter *adapter,
> +			 struct rte_event_timer **evtims,
> +			 uint64_t timeout_ticks,
> +			 uint16_t nb_evtims)
>   {
>   	int i;
>   
>   	for (i = 0; i < nb_evtims; i++)
>   		evtims[i]->timeout_ticks = timeout_ticks;
>   
> -	return __sw_event_timer_arm_burst(adapter, evtims, nb_evtims);
> +	return __swtim_arm_burst(adapter, evtims, nb_evtims);
>   }
>   
> -static const struct rte_event_timer_adapter_ops sw_event_adapter_timer_ops = {
> -	.init = sw_event_timer_adapter_init,
> -	.uninit = sw_event_timer_adapter_uninit,
> -	.start = sw_event_timer_adapter_start,
> -	.stop = sw_event_timer_adapter_stop,
> -	.get_info = sw_event_timer_adapter_get_info,
> -	.stats_get = sw_event_timer_adapter_stats_get,
> -	.stats_reset = sw_event_timer_adapter_stats_reset,
> -	.arm_burst = sw_event_timer_arm_burst,
> -	.arm_tmo_tick_burst = sw_event_timer_arm_tmo_tick_burst,
> -	.cancel_burst = sw_event_timer_cancel_burst,
> +static const struct rte_event_timer_adapter_ops swtim_ops = {
> +	.init			= swtim_init,
> +	.uninit			= swtim_uninit,
> +	.start			= swtim_start,
> +	.stop			= swtim_stop,
> +	.get_info		= swtim_get_info,
> +	.stats_get		= swtim_stats_get,
> +	.stats_reset		= swtim_stats_reset,
> +	.arm_burst		= swtim_arm_burst,
> +	.arm_tmo_tick_burst	= swtim_arm_tmo_tick_burst,
> +	.cancel_burst		= swtim_cancel_burst,
>   };
>   
>   RTE_INIT(event_timer_adapter_init_log)
> 

