[dpdk-dev] [PATCH v2 1/1] eventdev: add new software event timer adapter

Erik Gabriel Carrillo erik.g.carrillo at intel.com
Fri Dec 7 21:34:45 CET 2018


This patch introduces a new version of the event timer adapter software
PMD. In the original design, timer event producer lcores in the primary
and secondary processes enqueued event timers into a ring, and a
service core in the primary process dequeued them and processed them
further.  To improve performance, this version does away with the ring
and lets lcores in both primary and secondary processes insert timers
directly into timer skiplist data structures; the service core directly
accesses the lists as well, when looking for timers that have expired.

Signed-off-by: Erik Gabriel Carrillo <erik.g.carrillo at intel.com>
---
 lib/librte_eventdev/rte_event_timer_adapter.c | 687 +++++++++++---------------
 1 file changed, 275 insertions(+), 412 deletions(-)

diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c b/lib/librte_eventdev/rte_event_timer_adapter.c
index 79070d4..9c528cb 100644
--- a/lib/librte_eventdev/rte_event_timer_adapter.c
+++ b/lib/librte_eventdev/rte_event_timer_adapter.c
@@ -7,6 +7,7 @@
 #include <inttypes.h>
 #include <stdbool.h>
 #include <sys/queue.h>
+#include <assert.h>
 
 #include <rte_memzone.h>
 #include <rte_memory.h>
@@ -19,6 +20,7 @@
 #include <rte_timer.h>
 #include <rte_service_component.h>
 #include <rte_cycles.h>
+#include <rte_random.h>
 
 #include "rte_eventdev.h"
 #include "rte_eventdev_pmd.h"
@@ -34,7 +36,7 @@ static int evtim_buffer_logtype;
 
 static struct rte_event_timer_adapter adapters[RTE_EVENT_TIMER_ADAPTER_NUM_MAX];
 
-static const struct rte_event_timer_adapter_ops sw_event_adapter_timer_ops;
+static const struct rte_event_timer_adapter_ops swtim_ops;
 
 #define EVTIM_LOG(level, logtype, ...) \
 	rte_log(RTE_LOG_ ## level, logtype, \
@@ -211,7 +213,7 @@ rte_event_timer_adapter_create_ext(
 	 * implementation.
 	 */
 	if (adapter->ops == NULL)
-		adapter->ops = &sw_event_adapter_timer_ops;
+		adapter->ops = &swtim_ops;
 
 	/* Allow driver to do some setup */
 	FUNC_PTR_OR_NULL_RET_WITH_ERRNO(adapter->ops->init, -ENOTSUP);
@@ -334,7 +336,7 @@ rte_event_timer_adapter_lookup(uint16_t adapter_id)
 	 * implementation.
 	 */
 	if (adapter->ops == NULL)
-		adapter->ops = &sw_event_adapter_timer_ops;
+		adapter->ops = &swtim_ops;
 
 	/* Set fast-path function pointers */
 	adapter->arm_burst = adapter->ops->arm_burst;
@@ -491,6 +493,7 @@ event_buffer_flush(struct event_buffer *bufp, uint8_t dev_id, uint8_t port_id,
 	}
 
 	*nb_events_inv = 0;
+
 	*nb_events_flushed = rte_event_enqueue_burst(dev_id, port_id,
 						     &events[tail_idx], n);
 	if (*nb_events_flushed != n && rte_errno == -EINVAL) {
@@ -498,137 +501,123 @@ event_buffer_flush(struct event_buffer *bufp, uint8_t dev_id, uint8_t port_id,
 		(*nb_events_inv)++;
 	}
 
+	if (*nb_events_flushed > 0)
+		EVTIM_BUF_LOG_DBG("enqueued %"PRIu16" timer events to event "
+				  "device", *nb_events_flushed);
+
 	bufp->tail = bufp->tail + *nb_events_flushed + *nb_events_inv;
 }
 
 /*
  * Software event timer adapter implementation
  */
-
-struct rte_event_timer_adapter_sw_data {
-	/* List of messages for outstanding timers */
-	TAILQ_HEAD(, msg) msgs_tailq_head;
-	/* Lock to guard tailq and armed count */
-	rte_spinlock_t msgs_tailq_sl;
+struct swtim {
 	/* Identifier of service executing timer management logic. */
 	uint32_t service_id;
 	/* The cycle count at which the adapter should next tick */
 	uint64_t next_tick_cycles;
-	/* Incremented as the service moves through phases of an iteration */
-	volatile int service_phase;
 	/* The tick resolution used by adapter instance. May have been
 	 * adjusted from what user requested
 	 */
 	uint64_t timer_tick_ns;
 	/* Maximum timeout in nanoseconds allowed by adapter instance. */
 	uint64_t max_tmo_ns;
-	/* Ring containing messages to arm or cancel event timers */
-	struct rte_ring *msg_ring;
-	/* Mempool containing msg objects */
-	struct rte_mempool *msg_pool;
 	/* Buffered timer expiry events to be enqueued to an event device. */
 	struct event_buffer buffer;
 	/* Statistics */
 	struct rte_event_timer_adapter_stats stats;
-	/* The number of threads currently adding to the message ring */
-	rte_atomic16_t message_producer_count;
+	/* Mempool of timer objects */
+	struct rte_mempool *tim_pool;
+	/* Back pointer for convenience */
+	struct rte_event_timer_adapter *adapter;
+	/* Identifier of timer data instance */
+	uint32_t timer_data_id;
+	/* Track which cores have actually armed a timer */
+	rte_atomic16_t in_use[RTE_MAX_LCORE];
+	/* Track which cores' timer lists should be polled */
+	unsigned int poll_lcores[RTE_MAX_LCORE];
+	/* The number of lists that should be polled */
+	int n_poll_lcores;
+	/* Lock to atomically access the above two variables */
+	rte_spinlock_t poll_lcores_sl;
 };
 
-enum msg_type {MSG_TYPE_ARM, MSG_TYPE_CANCEL};
-
-struct msg {
-	enum msg_type type;
-	struct rte_event_timer *evtim;
-	struct rte_timer tim;
-	TAILQ_ENTRY(msg) msgs;
-};
+static inline struct swtim *
+swtim_pmd_priv(const struct rte_event_timer_adapter *adapter)
+{
+	return adapter->data->adapter_priv;
+}
 
 static void
-sw_event_timer_cb(struct rte_timer *tim, void *arg)
+swtim_callback(void *arg)
 {
-	int ret;
+	struct rte_timer *tim = arg;
+	struct rte_event_timer *evtim = tim->arg;
+	struct rte_event_timer_adapter *adapter;
+	struct swtim *sw;
 	uint16_t nb_evs_flushed = 0;
 	uint16_t nb_evs_invalid = 0;
 	uint64_t opaque;
-	struct rte_event_timer *evtim;
-	struct rte_event_timer_adapter *adapter;
-	struct rte_event_timer_adapter_sw_data *sw_data;
+	int ret;
 
-	evtim = arg;
 	opaque = evtim->impl_opaque[1];
 	adapter = (struct rte_event_timer_adapter *)(uintptr_t)opaque;
-	sw_data = adapter->data->adapter_priv;
+	sw = swtim_pmd_priv(adapter);
 
-	ret = event_buffer_add(&sw_data->buffer, &evtim->ev);
+	ret = event_buffer_add(&sw->buffer, &evtim->ev);
 	if (ret < 0) {
 		/* If event buffer is full, put timer back in list with
 		 * immediate expiry value, so that we process it again on the
 		 * next iteration.
 		 */
-		rte_timer_reset_sync(tim, 0, SINGLE, rte_lcore_id(),
-				     sw_event_timer_cb, evtim);
+		rte_timer_alt_reset(sw->timer_data_id, tim, 0, SINGLE,
+				    rte_lcore_id(), NULL, evtim);
+
+		sw->stats.evtim_retry_count++;
 
-		sw_data->stats.evtim_retry_count++;
 		EVTIM_LOG_DBG("event buffer full, resetting rte_timer with "
 			      "immediate expiry value");
 	} else {
-		struct msg *m = container_of(tim, struct msg, tim);
-		TAILQ_REMOVE(&sw_data->msgs_tailq_head, m, msgs);
 		EVTIM_BUF_LOG_DBG("buffered an event timer expiry event");
-		evtim->state = RTE_EVENT_TIMER_NOT_ARMED;
+		rte_mempool_put(sw->tim_pool, tim);
+		sw->stats.evtim_exp_count++;
 
-		/* Free the msg object containing the rte_timer now that
-		 * we've buffered its event successfully.
-		 */
-		rte_mempool_put(sw_data->msg_pool, m);
-
-		/* Bump the count when we successfully add an expiry event to
-		 * the buffer.
-		 */
-		sw_data->stats.evtim_exp_count++;
+		evtim->state = RTE_EVENT_TIMER_NOT_ARMED;
 	}
 
-	if (event_buffer_batch_ready(&sw_data->buffer)) {
-		event_buffer_flush(&sw_data->buffer,
+	if (event_buffer_batch_ready(&sw->buffer)) {
+		event_buffer_flush(&sw->buffer,
 				   adapter->data->event_dev_id,
 				   adapter->data->event_port_id,
 				   &nb_evs_flushed,
 				   &nb_evs_invalid);
 
-		sw_data->stats.ev_enq_count += nb_evs_flushed;
-		sw_data->stats.ev_inv_count += nb_evs_invalid;
+		sw->stats.ev_enq_count += nb_evs_flushed;
+		sw->stats.ev_inv_count += nb_evs_invalid;
 	}
 }
 
 static __rte_always_inline uint64_t
 get_timeout_cycles(struct rte_event_timer *evtim,
-		   struct rte_event_timer_adapter *adapter)
+		   const struct rte_event_timer_adapter *adapter)
 {
-	uint64_t timeout_ns;
-	struct rte_event_timer_adapter_sw_data *sw_data;
-
-	sw_data = adapter->data->adapter_priv;
-	timeout_ns = evtim->timeout_ticks * sw_data->timer_tick_ns;
+	struct swtim *sw = swtim_pmd_priv(adapter);
+	uint64_t timeout_ns = evtim->timeout_ticks * sw->timer_tick_ns;
 	return timeout_ns * rte_get_timer_hz() / NSECPERSEC;
-
 }
 
 /* This function returns true if one or more (adapter) ticks have occurred since
  * the last time it was called.
  */
 static inline bool
-adapter_did_tick(struct rte_event_timer_adapter *adapter)
+swtim_did_tick(struct swtim *sw)
 {
 	uint64_t cycles_per_adapter_tick, start_cycles;
 	uint64_t *next_tick_cyclesp;
-	struct rte_event_timer_adapter_sw_data *sw_data;
-
-	sw_data = adapter->data->adapter_priv;
-	next_tick_cyclesp = &sw_data->next_tick_cycles;
 
-	cycles_per_adapter_tick = sw_data->timer_tick_ns *
+	next_tick_cyclesp = &sw->next_tick_cycles;
+	cycles_per_adapter_tick = sw->timer_tick_ns *
 			(rte_get_timer_hz() / NSECPERSEC);
-
 	start_cycles = rte_get_timer_cycles();
 
 	/* Note: initially, *next_tick_cyclesp == 0, so the clause below will
@@ -640,7 +629,6 @@ adapter_did_tick(struct rte_event_timer_adapter *adapter)
 		 * boundary.
 		 */
 		start_cycles -= start_cycles % cycles_per_adapter_tick;
-
 		*next_tick_cyclesp = start_cycles + cycles_per_adapter_tick;
 
 		return true;
@@ -655,15 +643,12 @@ check_timeout(struct rte_event_timer *evtim,
 	      const struct rte_event_timer_adapter *adapter)
 {
 	uint64_t tmo_nsec;
-	struct rte_event_timer_adapter_sw_data *sw_data;
-
-	sw_data = adapter->data->adapter_priv;
-	tmo_nsec = evtim->timeout_ticks * sw_data->timer_tick_ns;
+	struct swtim *sw = swtim_pmd_priv(adapter);
 
-	if (tmo_nsec > sw_data->max_tmo_ns)
+	tmo_nsec = evtim->timeout_ticks * sw->timer_tick_ns;
+	if (tmo_nsec > sw->max_tmo_ns)
 		return -1;
-
-	if (tmo_nsec < sw_data->timer_tick_ns)
+	if (tmo_nsec < sw->timer_tick_ns)
 		return -2;
 
 	return 0;
@@ -691,110 +676,34 @@ check_destination_event_queue(struct rte_event_timer *evtim,
 	return 0;
 }
 
-#define NB_OBJS 32
 static int
-sw_event_timer_adapter_service_func(void *arg)
+swtim_service_func(void *arg)
 {
-	int i, num_msgs;
-	uint64_t cycles, opaque;
+	struct rte_event_timer_adapter *adapter = arg;
+	struct swtim *sw = swtim_pmd_priv(adapter);
 	uint16_t nb_evs_flushed = 0;
 	uint16_t nb_evs_invalid = 0;
-	struct rte_event_timer_adapter *adapter;
-	struct rte_event_timer_adapter_sw_data *sw_data;
-	struct rte_event_timer *evtim = NULL;
-	struct rte_timer *tim = NULL;
-	struct msg *msg, *msgs[NB_OBJS];
-
-	adapter = arg;
-	sw_data = adapter->data->adapter_priv;
-
-	sw_data->service_phase = 1;
-	rte_smp_wmb();
-
-	while (rte_atomic16_read(&sw_data->message_producer_count) > 0 ||
-	       !rte_ring_empty(sw_data->msg_ring)) {
-
-		num_msgs = rte_ring_dequeue_burst(sw_data->msg_ring,
-						  (void **)msgs, NB_OBJS, NULL);
-
-		for (i = 0; i < num_msgs; i++) {
-			int ret = 0;
-
-			RTE_SET_USED(ret);
-
-			msg = msgs[i];
-			evtim = msg->evtim;
-
-			switch (msg->type) {
-			case MSG_TYPE_ARM:
-				EVTIM_SVC_LOG_DBG("dequeued ARM message from "
-						  "ring");
-				tim = &msg->tim;
-				rte_timer_init(tim);
-				cycles = get_timeout_cycles(evtim,
-							    adapter);
-				ret = rte_timer_reset(tim, cycles, SINGLE,
-						      rte_lcore_id(),
-						      sw_event_timer_cb,
-						      evtim);
-				RTE_ASSERT(ret == 0);
-
-				evtim->impl_opaque[0] = (uintptr_t)tim;
-				evtim->impl_opaque[1] = (uintptr_t)adapter;
-
-				TAILQ_INSERT_TAIL(&sw_data->msgs_tailq_head,
-						  msg,
-						  msgs);
-				break;
-			case MSG_TYPE_CANCEL:
-				EVTIM_SVC_LOG_DBG("dequeued CANCEL message "
-						  "from ring");
-				opaque = evtim->impl_opaque[0];
-				tim = (struct rte_timer *)(uintptr_t)opaque;
-				RTE_ASSERT(tim != NULL);
-
-				ret = rte_timer_stop(tim);
-				RTE_ASSERT(ret == 0);
-
-				/* Free the msg object for the original arm
-				 * request.
-				 */
-				struct msg *m;
-				m = container_of(tim, struct msg, tim);
-				TAILQ_REMOVE(&sw_data->msgs_tailq_head, m,
-					     msgs);
-				rte_mempool_put(sw_data->msg_pool, m);
-
-				/* Free the msg object for the current msg */
-				rte_mempool_put(sw_data->msg_pool, msg);
-
-				evtim->impl_opaque[0] = 0;
-				evtim->impl_opaque[1] = 0;
-
-				break;
-			}
-		}
-	}
-
-	sw_data->service_phase = 2;
-	rte_smp_wmb();
 
-	if (adapter_did_tick(adapter)) {
-		rte_timer_manage();
+	if (swtim_did_tick(sw)) {
+		/* This lock is seldom acquired on the arm side */
+		rte_spinlock_lock(&sw->poll_lcores_sl);
+		rte_timer_alt_manage(sw->timer_data_id,
+				     sw->poll_lcores,
+				     sw->n_poll_lcores,
+				     swtim_callback);
+		rte_spinlock_unlock(&sw->poll_lcores_sl);
 
-		event_buffer_flush(&sw_data->buffer,
+		event_buffer_flush(&sw->buffer,
 				   adapter->data->event_dev_id,
 				   adapter->data->event_port_id,
-				   &nb_evs_flushed, &nb_evs_invalid);
+				   &nb_evs_flushed,
+				   &nb_evs_invalid);
 
-		sw_data->stats.ev_enq_count += nb_evs_flushed;
-		sw_data->stats.ev_inv_count += nb_evs_invalid;
-		sw_data->stats.adapter_tick_count++;
+		sw->stats.ev_enq_count += nb_evs_flushed;
+		sw->stats.ev_inv_count += nb_evs_invalid;
+		sw->stats.adapter_tick_count++;
 	}
 
-	sw_data->service_phase = 0;
-	rte_smp_wmb();
-
 	return 0;
 }
 
@@ -828,168 +737,145 @@ compute_msg_mempool_cache_size(uint64_t nb_requested, uint64_t nb_actual)
 	return cache_size;
 }
 
-#define SW_MIN_INTERVAL 1E5
-
 static int
-sw_event_timer_adapter_init(struct rte_event_timer_adapter *adapter)
+swtim_init(struct rte_event_timer_adapter *adapter)
 {
-	int ret;
-	struct rte_event_timer_adapter_sw_data *sw_data;
-	uint64_t nb_timers;
+	int i, ret;
+	struct swtim *sw;
 	unsigned int flags;
 	struct rte_service_spec service;
-	static bool timer_subsystem_inited; // static initialized to false
 
-	/* Allocate storage for SW implementation data */
-	char priv_data_name[RTE_RING_NAMESIZE];
-	snprintf(priv_data_name, RTE_RING_NAMESIZE, "sw_evtim_adap_priv_%"PRIu8,
-		 adapter->data->id);
-	adapter->data->adapter_priv = rte_zmalloc_socket(
-				priv_data_name,
-				sizeof(struct rte_event_timer_adapter_sw_data),
-				RTE_CACHE_LINE_SIZE,
-				adapter->data->socket_id);
-	if (adapter->data->adapter_priv == NULL) {
+	/* Allocate storage for private data area */
+#define SWTIM_NAMESIZE 32
+	char swtim_name[SWTIM_NAMESIZE];
+	snprintf(swtim_name, SWTIM_NAMESIZE, "swtim_%"PRIu8,
+			adapter->data->id);
+	sw = rte_zmalloc_socket(swtim_name, sizeof(*sw), RTE_CACHE_LINE_SIZE,
+			adapter->data->socket_id);
+	if (sw == NULL) {
 		EVTIM_LOG_ERR("failed to allocate space for private data");
 		rte_errno = ENOMEM;
 		return -1;
 	}
 
-	if (adapter->data->conf.timer_tick_ns < SW_MIN_INTERVAL) {
-		EVTIM_LOG_ERR("failed to create adapter with requested tick "
-			      "interval");
-		rte_errno = EINVAL;
-		return -1;
-	}
-
-	sw_data = adapter->data->adapter_priv;
-
-	sw_data->timer_tick_ns = adapter->data->conf.timer_tick_ns;
-	sw_data->max_tmo_ns = adapter->data->conf.max_tmo_ns;
+	/* Connect storage to adapter instance */
+	adapter->data->adapter_priv = sw;
+	sw->adapter = adapter;
 
-	TAILQ_INIT(&sw_data->msgs_tailq_head);
-	rte_spinlock_init(&sw_data->msgs_tailq_sl);
-	rte_atomic16_init(&sw_data->message_producer_count);
-
-	/* Rings require power of 2, so round up to next such value */
-	nb_timers = rte_align64pow2(adapter->data->conf.nb_timers);
-
-	char msg_ring_name[RTE_RING_NAMESIZE];
-	snprintf(msg_ring_name, RTE_RING_NAMESIZE,
-		 "sw_evtim_adap_msg_ring_%"PRIu8, adapter->data->id);
-	flags = adapter->data->conf.flags & RTE_EVENT_TIMER_ADAPTER_F_SP_PUT ?
-		RING_F_SP_ENQ | RING_F_SC_DEQ :
-		RING_F_SC_DEQ;
-	sw_data->msg_ring = rte_ring_create(msg_ring_name, nb_timers,
-					    adapter->data->socket_id, flags);
-	if (sw_data->msg_ring == NULL) {
-		EVTIM_LOG_ERR("failed to create message ring");
-		rte_errno = ENOMEM;
-		goto free_priv_data;
-	}
+	sw->timer_tick_ns = adapter->data->conf.timer_tick_ns;
+	sw->max_tmo_ns = adapter->data->conf.max_tmo_ns;
 
-	char pool_name[RTE_RING_NAMESIZE];
-	snprintf(pool_name, RTE_RING_NAMESIZE, "sw_evtim_adap_msg_pool_%"PRIu8,
+	/* Create a timer pool */
+	char pool_name[SWTIM_NAMESIZE];
+	snprintf(pool_name, SWTIM_NAMESIZE, "swtim_pool_%"PRIu8,
 		 adapter->data->id);
-
-	/* Both the arming/canceling thread and the service thread will do puts
-	 * to the mempool, but if the SP_PUT flag is enabled, we can specify
-	 * single-consumer get for the mempool.
-	 */
-	flags = adapter->data->conf.flags & RTE_EVENT_TIMER_ADAPTER_F_SP_PUT ?
-		MEMPOOL_F_SC_GET : 0;
-
-	/* The usable size of a ring is count - 1, so subtract one here to
-	 * make the counts agree.
-	 */
+	/* Optimal mempool size is a power of 2 minus one */
+	uint64_t nb_timers = rte_align64pow2(adapter->data->conf.nb_timers);
 	int pool_size = nb_timers - 1;
 	int cache_size = compute_msg_mempool_cache_size(
 				adapter->data->conf.nb_timers, nb_timers);
-	sw_data->msg_pool = rte_mempool_create(pool_name, pool_size,
-					       sizeof(struct msg), cache_size,
-					       0, NULL, NULL, NULL, NULL,
-					       adapter->data->socket_id, flags);
-	if (sw_data->msg_pool == NULL) {
-		EVTIM_LOG_ERR("failed to create message object mempool");
+	flags = 0; /* pool is multi-producer, multi-consumer */
+	sw->tim_pool = rte_mempool_create(pool_name, pool_size,
+			sizeof(struct rte_timer), cache_size, 0, NULL, NULL,
+			NULL, NULL, adapter->data->socket_id, flags);
+	if (sw->tim_pool == NULL) {
+		EVTIM_LOG_ERR("failed to create timer object mempool");
 		rte_errno = ENOMEM;
-		goto free_msg_ring;
+		goto free_alloc;
+	}
+
+	/* Initialize the variables that track in-use timer lists */
+	rte_spinlock_init(&sw->poll_lcores_sl);
+	for (i = 0; i < RTE_MAX_LCORE; i++)
+		rte_atomic16_init(&sw->in_use[i]);
+
+	/* Initialize the timer subsystem and allocate timer data instance */
+	ret = rte_timer_subsystem_init();
+	if (ret < 0) {
+		if (ret != -EALREADY) {
+			EVTIM_LOG_ERR("failed to initialize timer subsystem");
+			rte_errno = ret;
+			goto free_mempool;
+		}
+	}
+
+	ret = rte_timer_data_alloc(&sw->timer_data_id);
+	if (ret < 0) {
+		EVTIM_LOG_ERR("failed to allocate timer data instance");
+		rte_errno = ret;
+		goto free_mempool;
 	}
 
-	event_buffer_init(&sw_data->buffer);
+	/* Initialize timer event buffer */
+	event_buffer_init(&sw->buffer);
+
+	sw->adapter = adapter;
 
 	/* Register a service component to run adapter logic */
 	memset(&service, 0, sizeof(service));
 	snprintf(service.name, RTE_SERVICE_NAME_MAX,
-		 "sw_evimer_adap_svc_%"PRIu8, adapter->data->id);
+		 "swtim_svc_%"PRIu8, adapter->data->id);
 	service.socket_id = adapter->data->socket_id;
-	service.callback = sw_event_timer_adapter_service_func;
+	service.callback = swtim_service_func;
 	service.callback_userdata = adapter;
 	service.capabilities &= ~(RTE_SERVICE_CAP_MT_SAFE);
-	ret = rte_service_component_register(&service, &sw_data->service_id);
+	ret = rte_service_component_register(&service, &sw->service_id);
 	if (ret < 0) {
 		EVTIM_LOG_ERR("failed to register service %s with id %"PRIu32
-			      ": err = %d", service.name, sw_data->service_id,
+			      ": err = %d", service.name, sw->service_id,
 			      ret);
 
 		rte_errno = ENOSPC;
-		goto free_msg_pool;
+		goto free_mempool;
 	}
 
 	EVTIM_LOG_DBG("registered service %s with id %"PRIu32, service.name,
-		      sw_data->service_id);
+		      sw->service_id);
 
-	adapter->data->service_id = sw_data->service_id;
+	adapter->data->service_id = sw->service_id;
 	adapter->data->service_inited = 1;
 
-	if (!timer_subsystem_inited) {
-		rte_timer_subsystem_init();
-		timer_subsystem_inited = true;
-	}
-
 	return 0;
-
-free_msg_pool:
-	rte_mempool_free(sw_data->msg_pool);
-free_msg_ring:
-	rte_ring_free(sw_data->msg_ring);
-free_priv_data:
-	rte_free(sw_data);
+free_mempool:
+	rte_mempool_free(sw->tim_pool);
+free_alloc:
+	rte_free(sw);
 	return -1;
 }
 
-static int
-sw_event_timer_adapter_uninit(struct rte_event_timer_adapter *adapter)
+static void
+swtim_free_tim(struct rte_timer *tim, void *arg)
 {
-	int ret;
-	struct msg *m1, *m2;
-	struct rte_event_timer_adapter_sw_data *sw_data =
-						adapter->data->adapter_priv;
+	struct swtim *sw = arg;
 
-	rte_spinlock_lock(&sw_data->msgs_tailq_sl);
-
-	/* Cancel outstanding rte_timers and free msg objects */
-	m1 = TAILQ_FIRST(&sw_data->msgs_tailq_head);
-	while (m1 != NULL) {
-		EVTIM_LOG_DBG("freeing outstanding timer");
-		m2 = TAILQ_NEXT(m1, msgs);
-
-		rte_timer_stop_sync(&m1->tim);
-		rte_mempool_put(sw_data->msg_pool, m1);
+	rte_mempool_put(sw->tim_pool, (void *)tim);
+}
 
-		m1 = m2;
-	}
+/* Traverse the list of outstanding timers and put them back in the mempool
+ * before freeing the adapter to avoid leaking the memory.
+ */
+static int
+swtim_uninit(struct rte_event_timer_adapter *adapter)
+{
+	int ret;
+	struct swtim *sw = swtim_pmd_priv(adapter);
 
-	rte_spinlock_unlock(&sw_data->msgs_tailq_sl);
+	/* Free outstanding timers */
+	rte_timer_stop_all(sw->timer_data_id,
+			   sw->poll_lcores,
+			   sw->n_poll_lcores,
+			   swtim_free_tim,
+			   sw);
 
-	ret = rte_service_component_unregister(sw_data->service_id);
+	ret = rte_service_component_unregister(sw->service_id);
 	if (ret < 0) {
 		EVTIM_LOG_ERR("failed to unregister service component");
 		return ret;
 	}
 
-	rte_ring_free(sw_data->msg_ring);
-	rte_mempool_free(sw_data->msg_pool);
-	rte_free(adapter->data->adapter_priv);
+	rte_mempool_free(sw->tim_pool);
+	rte_free(sw);
+	adapter->data->adapter_priv = NULL;
 
 	return 0;
 }
@@ -1010,88 +896,79 @@ get_mapped_count_for_service(uint32_t service_id)
 }
 
 static int
-sw_event_timer_adapter_start(const struct rte_event_timer_adapter *adapter)
+swtim_start(const struct rte_event_timer_adapter *adapter)
 {
 	int mapped_count;
-	struct rte_event_timer_adapter_sw_data *sw_data;
-
-	sw_data = adapter->data->adapter_priv;
+	struct swtim *sw = swtim_pmd_priv(adapter);
 
 	/* Mapping the service to more than one service core can introduce
 	 * delays while one thread is waiting to acquire a lock, so only allow
 	 * one core to be mapped to the service.
+	 *
+	 * Note: the service could be modified such that it spreads cores to
+	 * poll over multiple service instances.
 	 */
-	mapped_count = get_mapped_count_for_service(sw_data->service_id);
+	mapped_count = get_mapped_count_for_service(sw->service_id);
 
-	if (mapped_count == 1)
-		return rte_service_component_runstate_set(sw_data->service_id,
-							  1);
+	if (mapped_count != 1)
+		return mapped_count < 1 ? -ENOENT : -ENOTSUP;
 
-	return mapped_count < 1 ? -ENOENT : -ENOTSUP;
+	return rte_service_component_runstate_set(sw->service_id, 1);
 }
 
 static int
-sw_event_timer_adapter_stop(const struct rte_event_timer_adapter *adapter)
+swtim_stop(const struct rte_event_timer_adapter *adapter)
 {
 	int ret;
-	struct rte_event_timer_adapter_sw_data *sw_data =
-						adapter->data->adapter_priv;
+	struct swtim *sw = swtim_pmd_priv(adapter);
 
-	ret = rte_service_component_runstate_set(sw_data->service_id, 0);
+	ret = rte_service_component_runstate_set(sw->service_id, 0);
 	if (ret < 0)
 		return ret;
 
-	/* Wait for the service to complete its final iteration before
-	 * stopping.
-	 */
-	while (sw_data->service_phase != 0)
+	/* Wait for the service to complete its final iteration */
+	while (rte_service_may_be_active(sw->service_id))
 		rte_pause();
 
-	rte_smp_rmb();
-
 	return 0;
 }
 
 static void
-sw_event_timer_adapter_get_info(const struct rte_event_timer_adapter *adapter,
+swtim_get_info(const struct rte_event_timer_adapter *adapter,
 		struct rte_event_timer_adapter_info *adapter_info)
 {
-	struct rte_event_timer_adapter_sw_data *sw_data;
-	sw_data = adapter->data->adapter_priv;
-
-	adapter_info->min_resolution_ns = sw_data->timer_tick_ns;
-	adapter_info->max_tmo_ns = sw_data->max_tmo_ns;
+	struct swtim *sw = swtim_pmd_priv(adapter);
+	adapter_info->min_resolution_ns = sw->timer_tick_ns;
+	adapter_info->max_tmo_ns = sw->max_tmo_ns;
 }
 
 static int
-sw_event_timer_adapter_stats_get(const struct rte_event_timer_adapter *adapter,
-				 struct rte_event_timer_adapter_stats *stats)
+swtim_stats_get(const struct rte_event_timer_adapter *adapter,
+		struct rte_event_timer_adapter_stats *stats)
 {
-	struct rte_event_timer_adapter_sw_data *sw_data;
-	sw_data = adapter->data->adapter_priv;
-	*stats = sw_data->stats;
+	struct swtim *sw = swtim_pmd_priv(adapter);
+	*stats = sw->stats; /* structure copy */
 	return 0;
 }
 
 static int
-sw_event_timer_adapter_stats_reset(
-				const struct rte_event_timer_adapter *adapter)
+swtim_stats_reset(const struct rte_event_timer_adapter *adapter)
 {
-	struct rte_event_timer_adapter_sw_data *sw_data;
-	sw_data = adapter->data->adapter_priv;
-	memset(&sw_data->stats, 0, sizeof(sw_data->stats));
+	struct swtim *sw = swtim_pmd_priv(adapter);
+	memset(&sw->stats, 0, sizeof(sw->stats));
 	return 0;
 }
 
-static __rte_always_inline uint16_t
-__sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,
-			  struct rte_event_timer **evtims,
-			  uint16_t nb_evtims)
+static uint16_t
+__swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
+		struct rte_event_timer **evtims,
+		uint16_t nb_evtims)
 {
-	uint16_t i;
-	int ret;
-	struct rte_event_timer_adapter_sw_data *sw_data;
-	struct msg *msgs[nb_evtims];
+	int i, ret;
+	struct swtim *sw = swtim_pmd_priv(adapter);
+	uint32_t lcore_id = rte_lcore_id();
+	struct rte_timer *tim, *tims[nb_evtims];
+	uint64_t cycles;
 
 #ifdef RTE_LIBRTE_EVENTDEV_DEBUG
 	/* Check that the service is running. */
@@ -1101,101 +978,104 @@ __sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,
 	}
 #endif
 
-	sw_data = adapter->data->adapter_priv;
+	/* Adjust lcore_id if non-EAL thread. Arbitrarily pick the timer list of
+	 * the highest lcore to insert such timers into
+	 */
+	if (lcore_id == LCORE_ID_ANY)
+		lcore_id = RTE_MAX_LCORE - 1;
+
+	/* If this is the first time we're arming an event timer on this lcore,
+	 * mark this lcore as "in use"; this will cause the service
+	 * function to process the timer list that corresponds to this lcore.
+	 */
+	if (unlikely(rte_atomic16_test_and_set(&sw->in_use[lcore_id]))) {
+		rte_spinlock_lock(&sw->poll_lcores_sl);
+		EVTIM_LOG_DBG("Adding lcore id = %u to list of lcores to poll",
+			      lcore_id);
+		sw->poll_lcores[sw->n_poll_lcores++] = lcore_id;
+		rte_spinlock_unlock(&sw->poll_lcores_sl);
+	}
 
-	ret = rte_mempool_get_bulk(sw_data->msg_pool, (void **)msgs, nb_evtims);
+	ret = rte_mempool_get_bulk(sw->tim_pool, (void **)tims,
+				   nb_evtims);
 	if (ret < 0) {
 		rte_errno = ENOSPC;
 		return 0;
 	}
 
-	/* Let the service know we're producing messages for it to process */
-	rte_atomic16_inc(&sw_data->message_producer_count);
-
-	/* If the service is managing timers, wait for it to finish */
-	while (sw_data->service_phase == 2)
-		rte_pause();
-
-	rte_smp_rmb();
-
 	for (i = 0; i < nb_evtims; i++) {
 		/* Don't modify the event timer state in these cases */
 		if (evtims[i]->state == RTE_EVENT_TIMER_ARMED) {
 			rte_errno = EALREADY;
 			break;
 		} else if (!(evtims[i]->state == RTE_EVENT_TIMER_NOT_ARMED ||
-		    evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {
+			     evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {
 			rte_errno = EINVAL;
 			break;
 		}
 
 		ret = check_timeout(evtims[i], adapter);
-		if (ret == -1) {
+		if (unlikely(ret == -1)) {
 			evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOLATE;
 			rte_errno = EINVAL;
 			break;
-		}
-		if (ret == -2) {
+		} else if (unlikely(ret == -2)) {
 			evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOEARLY;
 			rte_errno = EINVAL;
 			break;
 		}
 
-		if (check_destination_event_queue(evtims[i], adapter) < 0) {
+		if (unlikely(check_destination_event_queue(evtims[i],
+							   adapter) < 0)) {
 			evtims[i]->state = RTE_EVENT_TIMER_ERROR;
 			rte_errno = EINVAL;
 			break;
 		}
 
-		/* Checks passed, set up a message to enqueue */
-		msgs[i]->type = MSG_TYPE_ARM;
-		msgs[i]->evtim = evtims[i];
+		tim = tims[i];
+		rte_timer_init(tim);
 
-		/* Set the payload pointer if not set. */
-		if (evtims[i]->ev.event_ptr == NULL)
-			evtims[i]->ev.event_ptr = evtims[i];
+		evtims[i]->impl_opaque[0] = (uintptr_t)tim;
+		evtims[i]->impl_opaque[1] = (uintptr_t)adapter;
 
-		/* msg objects that get enqueued successfully will be freed
-		 * either by a future cancel operation or by the timer
-		 * expiration callback.
-		 */
-		if (rte_ring_enqueue(sw_data->msg_ring, msgs[i]) < 0) {
-			rte_errno = ENOSPC;
+		cycles = get_timeout_cycles(evtims[i], adapter);
+		ret = rte_timer_alt_reset(sw->timer_data_id, tim, cycles,
+					  SINGLE, lcore_id, NULL, evtims[i]);
+		if (ret < 0) {
+			/* tim was in RUNNING or CONFIG state */
+			evtims[i]->state = RTE_EVENT_TIMER_ERROR;
 			break;
 		}
 
-		EVTIM_LOG_DBG("enqueued ARM message to ring");
-
+		rte_smp_wmb();
+		EVTIM_LOG_DBG("armed an event timer");
 		evtims[i]->state = RTE_EVENT_TIMER_ARMED;
 	}
 
-	/* Let the service know we're done producing messages */
-	rte_atomic16_dec(&sw_data->message_producer_count);
-
 	if (i < nb_evtims)
-		rte_mempool_put_bulk(sw_data->msg_pool, (void **)&msgs[i],
-				     nb_evtims - i);
+		rte_mempool_put_bulk(sw->tim_pool,
+				     (void **)&tims[i], nb_evtims - i);
 
 	return i;
 }
 
 static uint16_t
-sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,
-			 struct rte_event_timer **evtims,
-			 uint16_t nb_evtims)
+swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
+		struct rte_event_timer **evtims,
+		uint16_t nb_evtims)
 {
-	return __sw_event_timer_arm_burst(adapter, evtims, nb_evtims);
+	return __swtim_arm_burst(adapter, evtims, nb_evtims);
 }
 
 static uint16_t
-sw_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter,
-			    struct rte_event_timer **evtims,
-			    uint16_t nb_evtims)
+swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
+		   struct rte_event_timer **evtims,
+		   uint16_t nb_evtims)
 {
-	uint16_t i;
-	int ret;
-	struct rte_event_timer_adapter_sw_data *sw_data;
-	struct msg *msgs[nb_evtims];
+	int i, ret;
+	struct rte_timer *timp;
+	uint64_t opaque;
+	struct swtim *sw = swtim_pmd_priv(adapter);
 
 #ifdef RTE_LIBRTE_EVENTDEV_DEBUG
 	/* Check that the service is running. */
@@ -1205,23 +1085,6 @@ sw_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter,
 	}
 #endif
 
-	sw_data = adapter->data->adapter_priv;
-
-	ret = rte_mempool_get_bulk(sw_data->msg_pool, (void **)msgs, nb_evtims);
-	if (ret < 0) {
-		rte_errno = ENOSPC;
-		return 0;
-	}
-
-	/* Let the service know we're producing messages for it to process */
-	rte_atomic16_inc(&sw_data->message_producer_count);
-
-	/* If the service could be modifying event timer states, wait */
-	while (sw_data->service_phase == 2)
-		rte_pause();
-
-	rte_smp_rmb();
-
 	for (i = 0; i < nb_evtims; i++) {
 		/* Don't modify the event timer state in these cases */
 		if (evtims[i]->state == RTE_EVENT_TIMER_CANCELED) {
@@ -1232,54 +1095,54 @@ sw_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter,
 			break;
 		}
 
-		msgs[i]->type = MSG_TYPE_CANCEL;
-		msgs[i]->evtim = evtims[i];
+		opaque = evtims[i]->impl_opaque[0];
+		timp = (struct rte_timer *)(uintptr_t)opaque;
+		RTE_ASSERT(timp != NULL);
 
-		if (rte_ring_enqueue(sw_data->msg_ring, msgs[i]) < 0) {
-			rte_errno = ENOSPC;
+		ret = rte_timer_alt_stop(sw->timer_data_id, timp);
+		if (ret < 0) {
+			/* Timer is running or being configured */
+			rte_errno = EAGAIN;
 			break;
 		}
 
-		EVTIM_LOG_DBG("enqueued CANCEL message to ring");
+		rte_mempool_put(sw->tim_pool, (void **)timp);
 
 		evtims[i]->state = RTE_EVENT_TIMER_CANCELED;
-	}
+		evtims[i]->impl_opaque[0] = 0;
+		evtims[i]->impl_opaque[1] = 0;
 
-	/* Let the service know we're done producing messages */
-	rte_atomic16_dec(&sw_data->message_producer_count);
-
-	if (i < nb_evtims)
-		rte_mempool_put_bulk(sw_data->msg_pool, (void **)&msgs[i],
-				     nb_evtims - i);
+		rte_smp_wmb();
+	}
 
 	return i;
 }
 
 static uint16_t
-sw_event_timer_arm_tmo_tick_burst(const struct rte_event_timer_adapter *adapter,
-				  struct rte_event_timer **evtims,
-				  uint64_t timeout_ticks,
-				  uint16_t nb_evtims)
+swtim_arm_tmo_tick_burst(const struct rte_event_timer_adapter *adapter,
+			 struct rte_event_timer **evtims,
+			 uint64_t timeout_ticks,
+			 uint16_t nb_evtims)
 {
 	int i;
 
 	for (i = 0; i < nb_evtims; i++)
 		evtims[i]->timeout_ticks = timeout_ticks;
 
-	return __sw_event_timer_arm_burst(adapter, evtims, nb_evtims);
+	return __swtim_arm_burst(adapter, evtims, nb_evtims);
 }
 
-static const struct rte_event_timer_adapter_ops sw_event_adapter_timer_ops = {
-	.init = sw_event_timer_adapter_init,
-	.uninit = sw_event_timer_adapter_uninit,
-	.start = sw_event_timer_adapter_start,
-	.stop = sw_event_timer_adapter_stop,
-	.get_info = sw_event_timer_adapter_get_info,
-	.stats_get = sw_event_timer_adapter_stats_get,
-	.stats_reset = sw_event_timer_adapter_stats_reset,
-	.arm_burst = sw_event_timer_arm_burst,
-	.arm_tmo_tick_burst = sw_event_timer_arm_tmo_tick_burst,
-	.cancel_burst = sw_event_timer_cancel_burst,
+static const struct rte_event_timer_adapter_ops swtim_ops = {
+	.init			= swtim_init,
+	.uninit			= swtim_uninit,
+	.start			= swtim_start,
+	.stop			= swtim_stop,
+	.get_info		= swtim_get_info,
+	.stats_get		= swtim_stats_get,
+	.stats_reset		= swtim_stats_reset,
+	.arm_burst		= swtim_arm_burst,
+	.arm_tmo_tick_burst	= swtim_arm_tmo_tick_burst,
+	.cancel_burst		= swtim_cancel_burst,
 };
 
 RTE_INIT(event_timer_adapter_init_log)
-- 
2.6.4



More information about the dev mailing list