[dpdk-dev] [PATCH v7 1/1] eventdev: add new software event timer adapter

Erik Gabriel Carrillo erik.g.carrillo at intel.com
Wed Jun 19 17:14:30 CEST 2019


This patch introduces a new version of the event timer adapter software
PMD. In the original design, timer event producer lcores in the primary
and secondary processes enqueued event timers into a ring, and a
service core in the primary process dequeued them and processed them
further. To improve performance, this version does away with the ring
and lets lcores insert timers directly into the timer skiplist data
structures; the service core also accesses those lists directly when
looking for timers that have expired.

To compare the burst and non-burst performance of the original and new
versions of the software event timer adapter, I ran the following
commands:

$ sudo ./build/app/dpdk-test-eventdev -c 0xFFE -s 0xC --vdev=event_sw0 \
-- --test=perf_queue --plcores=4,5,6 --wlcore=7,8,9 --stlist=p \
--prod_type_timerdev --worker_deq_depth=32

$ sudo ./build/app/dpdk-test-eventdev -c 0xFFE -s 0xC --vdev=event_sw0 \
-- --test=perf_queue --plcores=4,5,6 --wlcore=7,8,9 --stlist=p \
--prod_type_timerdev_burst --worker_deq_depth=32

With the new version, I see a 151% improvement in throughput for the
non-burst case, and a 270% improvement in throughput for the burst case.
I also see a 53% improvement in arm latency in the non-burst case and a
65% improvement in arm latency in the burst case.

Note: To perform the test, I commented out the check in the original
version that validates the adapter tick interval against a minimum value.

Signed-off-by: Erik Gabriel Carrillo <erik.g.carrillo at intel.com>
---
 lib/librte_eventdev/rte_event_timer_adapter.c | 741 +++++++++++---------------
 1 file changed, 322 insertions(+), 419 deletions(-)

diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c b/lib/librte_eventdev/rte_event_timer_adapter.c
index 2f7a760..d525cb3 100644
--- a/lib/librte_eventdev/rte_event_timer_adapter.c
+++ b/lib/librte_eventdev/rte_event_timer_adapter.c
@@ -34,7 +34,7 @@ static int evtim_buffer_logtype;
 
 static struct rte_event_timer_adapter adapters[RTE_EVENT_TIMER_ADAPTER_NUM_MAX];
 
-static const struct rte_event_timer_adapter_ops sw_event_adapter_timer_ops;
+static const struct rte_event_timer_adapter_ops swtim_ops;
 
 #define EVTIM_LOG(level, logtype, ...) \
 	rte_log(RTE_LOG_ ## level, logtype, \
@@ -211,7 +211,7 @@ rte_event_timer_adapter_create_ext(
 	 * implementation.
 	 */
 	if (adapter->ops == NULL)
-		adapter->ops = &sw_event_adapter_timer_ops;
+		adapter->ops = &swtim_ops;
 
 	/* Allow driver to do some setup */
 	FUNC_PTR_OR_NULL_RET_WITH_ERRNO(adapter->ops->init, -ENOTSUP);
@@ -340,7 +340,7 @@ rte_event_timer_adapter_lookup(uint16_t adapter_id)
 	 * implementation.
 	 */
 	if (adapter->ops == NULL)
-		adapter->ops = &sw_event_adapter_timer_ops;
+		adapter->ops = &swtim_ops;
 
 	/* Set fast-path function pointers */
 	adapter->arm_burst = adapter->ops->arm_burst;
@@ -426,10 +426,11 @@ rte_event_timer_adapter_stats_reset(struct rte_event_timer_adapter *adapter)
 #define EVENT_BUFFER_SZ 4096
 #define EVENT_BUFFER_BATCHSZ 32
 #define EVENT_BUFFER_MASK (EVENT_BUFFER_SZ - 1)
+#define EXP_TIM_BUFFER_SZ 128
 
 struct event_buffer {
-	uint16_t head;
-	uint16_t tail;
+	size_t head;
+	size_t tail;
 	struct rte_event events[EVENT_BUFFER_SZ];
 } __rte_cache_aligned;
 
@@ -455,7 +456,7 @@ event_buffer_init(struct event_buffer *bufp)
 static int
 event_buffer_add(struct event_buffer *bufp, struct rte_event *eventp)
 {
-	uint16_t head_idx;
+	size_t head_idx;
 	struct rte_event *buf_eventp;
 
 	if (event_buffer_full(bufp))
@@ -477,13 +478,16 @@ event_buffer_flush(struct event_buffer *bufp, uint8_t dev_id, uint8_t port_id,
 		   uint16_t *nb_events_flushed,
 		   uint16_t *nb_events_inv)
 {
-	uint16_t head_idx, tail_idx, n = 0;
 	struct rte_event *events = bufp->events;
+	size_t head_idx, tail_idx;
+	uint16_t n = 0;
 
 	/* Instead of modulus, bitwise AND with mask to get index. */
 	head_idx = bufp->head & EVENT_BUFFER_MASK;
 	tail_idx = bufp->tail & EVENT_BUFFER_MASK;
 
+	RTE_ASSERT(head_idx < EVENT_BUFFER_SZ && tail_idx < EVENT_BUFFER_SZ);
+
 	/* Determine the largest contigous run we can attempt to enqueue to the
 	 * event device.
 	 */
@@ -491,150 +495,166 @@ event_buffer_flush(struct event_buffer *bufp, uint8_t dev_id, uint8_t port_id,
 		n = head_idx - tail_idx;
 	else if (head_idx < tail_idx)
 		n = EVENT_BUFFER_SZ - tail_idx;
+	else if (event_buffer_full(bufp))
+		n = EVENT_BUFFER_SZ - tail_idx;
 	else {
 		*nb_events_flushed = 0;
 		return;
 	}
 
+	n = RTE_MIN(EVENT_BUFFER_BATCHSZ, n);
 	*nb_events_inv = 0;
+
 	*nb_events_flushed = rte_event_enqueue_burst(dev_id, port_id,
 						     &events[tail_idx], n);
-	if (*nb_events_flushed != n && rte_errno == -EINVAL) {
-		EVTIM_LOG_ERR("failed to enqueue invalid event - dropping it");
-		(*nb_events_inv)++;
+	if (*nb_events_flushed != n) {
+		if (rte_errno == -EINVAL) {
+			EVTIM_LOG_ERR("failed to enqueue invalid event - "
+				      "dropping it");
+			(*nb_events_inv)++;
+		} else if (rte_errno == -ENOSPC)
+			rte_pause();
 	}
 
+	if (*nb_events_flushed > 0)
+		EVTIM_BUF_LOG_DBG("enqueued %"PRIu16" timer events to event "
+				  "device", *nb_events_flushed);
+
 	bufp->tail = bufp->tail + *nb_events_flushed + *nb_events_inv;
 }
 
 /*
  * Software event timer adapter implementation
  */
-
-struct rte_event_timer_adapter_sw_data {
-	/* List of messages for outstanding timers */
-	TAILQ_HEAD(, msg) msgs_tailq_head;
-	/* Lock to guard tailq and armed count */
-	rte_spinlock_t msgs_tailq_sl;
+struct swtim {
 	/* Identifier of service executing timer management logic. */
 	uint32_t service_id;
 	/* The cycle count at which the adapter should next tick */
 	uint64_t next_tick_cycles;
-	/* Incremented as the service moves through phases of an iteration */
-	volatile int service_phase;
 	/* The tick resolution used by adapter instance. May have been
 	 * adjusted from what user requested
 	 */
 	uint64_t timer_tick_ns;
 	/* Maximum timeout in nanoseconds allowed by adapter instance. */
 	uint64_t max_tmo_ns;
-	/* Ring containing messages to arm or cancel event timers */
-	struct rte_ring *msg_ring;
-	/* Mempool containing msg objects */
-	struct rte_mempool *msg_pool;
 	/* Buffered timer expiry events to be enqueued to an event device. */
 	struct event_buffer buffer;
 	/* Statistics */
 	struct rte_event_timer_adapter_stats stats;
-	/* The number of threads currently adding to the message ring */
-	rte_atomic16_t message_producer_count;
+	/* Mempool of timer objects */
+	struct rte_mempool *tim_pool;
+	/* Back pointer for convenience */
+	struct rte_event_timer_adapter *adapter;
+	/* Identifier of timer data instance */
+	uint32_t timer_data_id;
+	/* Track which cores have actually armed a timer */
+	struct {
+		rte_atomic16_t v;
+	} __rte_cache_aligned in_use[RTE_MAX_LCORE];
+	/* Track which cores' timer lists should be polled */
+	unsigned int poll_lcores[RTE_MAX_LCORE];
+	/* The number of lists that should be polled */
+	int n_poll_lcores;
+	/* Lock to atomically access the above two variables */
+	rte_spinlock_t poll_lcores_sl;
+
+	struct rte_timer *expired_timers[EXP_TIM_BUFFER_SZ];
+	size_t expired_timers_idx;
 };
 
-enum msg_type {MSG_TYPE_ARM, MSG_TYPE_CANCEL};
-
-struct msg {
-	enum msg_type type;
-	struct rte_event_timer *evtim;
-	struct rte_timer tim;
-	TAILQ_ENTRY(msg) msgs;
-};
+static inline struct swtim *
+swtim_pmd_priv(const struct rte_event_timer_adapter *adapter)
+{
+	return adapter->data->adapter_priv;
+}
 
 static void
-sw_event_timer_cb(struct rte_timer *tim, void *arg)
+swtim_callback(struct rte_timer *tim)
 {
-	int ret;
+	struct rte_event_timer *evtim = tim->arg;
+	struct rte_event_timer_adapter *adapter;
+	unsigned int lcore = rte_lcore_id();
+	struct swtim *sw;
 	uint16_t nb_evs_flushed = 0;
 	uint16_t nb_evs_invalid = 0;
 	uint64_t opaque;
-	struct rte_event_timer *evtim;
-	struct rte_event_timer_adapter *adapter;
-	struct rte_event_timer_adapter_sw_data *sw_data;
+	int ret;
 
-	evtim = arg;
 	opaque = evtim->impl_opaque[1];
 	adapter = (struct rte_event_timer_adapter *)(uintptr_t)opaque;
-	sw_data = adapter->data->adapter_priv;
+	sw = swtim_pmd_priv(adapter);
 
-	ret = event_buffer_add(&sw_data->buffer, &evtim->ev);
+	ret = event_buffer_add(&sw->buffer, &evtim->ev);
 	if (ret < 0) {
 		/* If event buffer is full, put timer back in list with
 		 * immediate expiry value, so that we process it again on the
 		 * next iteration.
 		 */
-		rte_timer_reset_sync(tim, 0, SINGLE, rte_lcore_id(),
-				     sw_event_timer_cb, evtim);
+		ret = rte_timer_alt_reset(sw->timer_data_id, tim, 0, SINGLE,
+					  lcore, NULL, evtim);
+		if (ret < 0) {
+			EVTIM_LOG_DBG("event buffer full, failed to reset "
+				      "timer with immediate expiry value");
+		} else {
+			sw->stats.evtim_retry_count++;
+			EVTIM_LOG_DBG("event buffer full, resetting rte_timer "
+				      "with immediate expiry value");
+		}
 
-		sw_data->stats.evtim_retry_count++;
-		EVTIM_LOG_DBG("event buffer full, resetting rte_timer with "
-			      "immediate expiry value");
+		if (unlikely(rte_atomic16_test_and_set(&sw->in_use[lcore].v)))
+			sw->poll_lcores[sw->n_poll_lcores++] = lcore;
 	} else {
-		struct msg *m = container_of(tim, struct msg, tim);
-		TAILQ_REMOVE(&sw_data->msgs_tailq_head, m, msgs);
 		EVTIM_BUF_LOG_DBG("buffered an event timer expiry event");
-		evtim->state = RTE_EVENT_TIMER_NOT_ARMED;
 
-		/* Free the msg object containing the rte_timer now that
-		 * we've buffered its event successfully.
+		/* Empty the buffer here, if necessary, to free older expired
+		 * timers only
 		 */
-		rte_mempool_put(sw_data->msg_pool, m);
+		if (unlikely(sw->expired_timers_idx == EXP_TIM_BUFFER_SZ)) {
+			rte_mempool_put_bulk(sw->tim_pool,
+					     (void **)sw->expired_timers,
+					     sw->expired_timers_idx);
+			sw->expired_timers_idx = 0;
+		}
 
-		/* Bump the count when we successfully add an expiry event to
-		 * the buffer.
-		 */
-		sw_data->stats.evtim_exp_count++;
+		sw->expired_timers[sw->expired_timers_idx++] = tim;
+		sw->stats.evtim_exp_count++;
+
+		evtim->state = RTE_EVENT_TIMER_NOT_ARMED;
 	}
 
-	if (event_buffer_batch_ready(&sw_data->buffer)) {
-		event_buffer_flush(&sw_data->buffer,
+	if (event_buffer_batch_ready(&sw->buffer)) {
+		event_buffer_flush(&sw->buffer,
 				   adapter->data->event_dev_id,
 				   adapter->data->event_port_id,
 				   &nb_evs_flushed,
 				   &nb_evs_invalid);
 
-		sw_data->stats.ev_enq_count += nb_evs_flushed;
-		sw_data->stats.ev_inv_count += nb_evs_invalid;
+		sw->stats.ev_enq_count += nb_evs_flushed;
+		sw->stats.ev_inv_count += nb_evs_invalid;
 	}
 }
 
 static __rte_always_inline uint64_t
 get_timeout_cycles(struct rte_event_timer *evtim,
-		   struct rte_event_timer_adapter *adapter)
+		   const struct rte_event_timer_adapter *adapter)
 {
-	uint64_t timeout_ns;
-	struct rte_event_timer_adapter_sw_data *sw_data;
-
-	sw_data = adapter->data->adapter_priv;
-	timeout_ns = evtim->timeout_ticks * sw_data->timer_tick_ns;
+	struct swtim *sw = swtim_pmd_priv(adapter);
+	uint64_t timeout_ns = evtim->timeout_ticks * sw->timer_tick_ns;
 	return timeout_ns * rte_get_timer_hz() / NSECPERSEC;
-
 }
 
 /* This function returns true if one or more (adapter) ticks have occurred since
  * the last time it was called.
  */
 static inline bool
-adapter_did_tick(struct rte_event_timer_adapter *adapter)
+swtim_did_tick(struct swtim *sw)
 {
 	uint64_t cycles_per_adapter_tick, start_cycles;
 	uint64_t *next_tick_cyclesp;
-	struct rte_event_timer_adapter_sw_data *sw_data;
 
-	sw_data = adapter->data->adapter_priv;
-	next_tick_cyclesp = &sw_data->next_tick_cycles;
-
-	cycles_per_adapter_tick = sw_data->timer_tick_ns *
+	next_tick_cyclesp = &sw->next_tick_cycles;
+	cycles_per_adapter_tick = sw->timer_tick_ns *
 			(rte_get_timer_hz() / NSECPERSEC);
-
 	start_cycles = rte_get_timer_cycles();
 
 	/* Note: initially, *next_tick_cyclesp == 0, so the clause below will
@@ -646,7 +666,6 @@ adapter_did_tick(struct rte_event_timer_adapter *adapter)
 		 * boundary.
 		 */
 		start_cycles -= start_cycles % cycles_per_adapter_tick;
-
 		*next_tick_cyclesp = start_cycles + cycles_per_adapter_tick;
 
 		return true;
@@ -661,15 +680,12 @@ check_timeout(struct rte_event_timer *evtim,
 	      const struct rte_event_timer_adapter *adapter)
 {
 	uint64_t tmo_nsec;
-	struct rte_event_timer_adapter_sw_data *sw_data;
-
-	sw_data = adapter->data->adapter_priv;
-	tmo_nsec = evtim->timeout_ticks * sw_data->timer_tick_ns;
+	struct swtim *sw = swtim_pmd_priv(adapter);
 
-	if (tmo_nsec > sw_data->max_tmo_ns)
+	tmo_nsec = evtim->timeout_ticks * sw->timer_tick_ns;
+	if (tmo_nsec > sw->max_tmo_ns)
 		return -1;
-
-	if (tmo_nsec < sw_data->timer_tick_ns)
+	if (tmo_nsec < sw->timer_tick_ns)
 		return -2;
 
 	return 0;
@@ -697,110 +713,41 @@ check_destination_event_queue(struct rte_event_timer *evtim,
 	return 0;
 }
 
-#define NB_OBJS 32
 static int
-sw_event_timer_adapter_service_func(void *arg)
+swtim_service_func(void *arg)
 {
-	int i, num_msgs;
-	uint64_t cycles, opaque;
+	struct rte_event_timer_adapter *adapter = arg;
+	struct swtim *sw = swtim_pmd_priv(adapter);
 	uint16_t nb_evs_flushed = 0;
 	uint16_t nb_evs_invalid = 0;
-	struct rte_event_timer_adapter *adapter;
-	struct rte_event_timer_adapter_sw_data *sw_data;
-	struct rte_event_timer *evtim = NULL;
-	struct rte_timer *tim = NULL;
-	struct msg *msg, *msgs[NB_OBJS];
-
-	adapter = arg;
-	sw_data = adapter->data->adapter_priv;
-
-	sw_data->service_phase = 1;
-	rte_smp_wmb();
-
-	while (rte_atomic16_read(&sw_data->message_producer_count) > 0 ||
-	       !rte_ring_empty(sw_data->msg_ring)) {
-
-		num_msgs = rte_ring_dequeue_burst(sw_data->msg_ring,
-						  (void **)msgs, NB_OBJS, NULL);
-
-		for (i = 0; i < num_msgs; i++) {
-			int ret = 0;
-
-			RTE_SET_USED(ret);
-
-			msg = msgs[i];
-			evtim = msg->evtim;
-
-			switch (msg->type) {
-			case MSG_TYPE_ARM:
-				EVTIM_SVC_LOG_DBG("dequeued ARM message from "
-						  "ring");
-				tim = &msg->tim;
-				rte_timer_init(tim);
-				cycles = get_timeout_cycles(evtim,
-							    adapter);
-				ret = rte_timer_reset(tim, cycles, SINGLE,
-						      rte_lcore_id(),
-						      sw_event_timer_cb,
-						      evtim);
-				RTE_ASSERT(ret == 0);
-
-				evtim->impl_opaque[0] = (uintptr_t)tim;
-				evtim->impl_opaque[1] = (uintptr_t)adapter;
-
-				TAILQ_INSERT_TAIL(&sw_data->msgs_tailq_head,
-						  msg,
-						  msgs);
-				break;
-			case MSG_TYPE_CANCEL:
-				EVTIM_SVC_LOG_DBG("dequeued CANCEL message "
-						  "from ring");
-				opaque = evtim->impl_opaque[0];
-				tim = (struct rte_timer *)(uintptr_t)opaque;
-				RTE_ASSERT(tim != NULL);
-
-				ret = rte_timer_stop(tim);
-				RTE_ASSERT(ret == 0);
-
-				/* Free the msg object for the original arm
-				 * request.
-				 */
-				struct msg *m;
-				m = container_of(tim, struct msg, tim);
-				TAILQ_REMOVE(&sw_data->msgs_tailq_head, m,
-					     msgs);
-				rte_mempool_put(sw_data->msg_pool, m);
-
-				/* Free the msg object for the current msg */
-				rte_mempool_put(sw_data->msg_pool, msg);
-
-				evtim->impl_opaque[0] = 0;
-				evtim->impl_opaque[1] = 0;
-
-				break;
-			}
-		}
-	}
 
-	sw_data->service_phase = 2;
-	rte_smp_wmb();
+	if (swtim_did_tick(sw)) {
+		/* This lock is seldom acquired on the arm side */
+		rte_spinlock_lock(&sw->poll_lcores_sl);
+
+		rte_timer_alt_manage(sw->timer_data_id,
+				     sw->poll_lcores,
+				     sw->n_poll_lcores,
+				     swtim_callback);
 
-	if (adapter_did_tick(adapter)) {
-		rte_timer_manage();
+		rte_spinlock_unlock(&sw->poll_lcores_sl);
 
-		event_buffer_flush(&sw_data->buffer,
+		/* Return expired timer objects back to mempool */
+		rte_mempool_put_bulk(sw->tim_pool, (void **)sw->expired_timers,
+				     sw->expired_timers_idx);
+		sw->expired_timers_idx = 0;
+
+		event_buffer_flush(&sw->buffer,
 				   adapter->data->event_dev_id,
 				   adapter->data->event_port_id,
-				   &nb_evs_flushed, &nb_evs_invalid);
+				   &nb_evs_flushed,
+				   &nb_evs_invalid);
 
-		sw_data->stats.ev_enq_count += nb_evs_flushed;
-		sw_data->stats.ev_inv_count += nb_evs_invalid;
-		sw_data->stats.adapter_tick_count++;
+		sw->stats.ev_enq_count += nb_evs_flushed;
+		sw->stats.ev_inv_count += nb_evs_invalid;
+		sw->stats.adapter_tick_count++;
 	}
 
-	sw_data->service_phase = 0;
-	rte_smp_wmb();
-
 	return 0;
 }
 
@@ -820,7 +767,7 @@ compute_msg_mempool_cache_size(uint64_t nb_requested, uint64_t nb_actual)
 	int size;
 	int cache_size = 0;
 
-	for (i = 0; ; i++) {
+	for (i = 0;; i++) {
 		size = 1 << i;
 
 		if (RTE_MAX_LCORE * size < (int)(nb_actual - nb_requested) &&
@@ -834,168 +781,145 @@ compute_msg_mempool_cache_size(uint64_t nb_requested, uint64_t nb_actual)
 	return cache_size;
 }
 
-#define SW_MIN_INTERVAL 1E5
-
 static int
-sw_event_timer_adapter_init(struct rte_event_timer_adapter *adapter)
+swtim_init(struct rte_event_timer_adapter *adapter)
 {
-	int ret;
-	struct rte_event_timer_adapter_sw_data *sw_data;
-	uint64_t nb_timers;
+	int i, ret;
+	struct swtim *sw;
 	unsigned int flags;
 	struct rte_service_spec service;
-	static bool timer_subsystem_inited; // static initialized to false
 
-	/* Allocate storage for SW implementation data */
-	char priv_data_name[RTE_RING_NAMESIZE];
-	snprintf(priv_data_name, RTE_RING_NAMESIZE, "sw_evtim_adap_priv_%"PRIu8,
-		 adapter->data->id);
-	adapter->data->adapter_priv = rte_zmalloc_socket(
-				priv_data_name,
-				sizeof(struct rte_event_timer_adapter_sw_data),
-				RTE_CACHE_LINE_SIZE,
-				adapter->data->socket_id);
-	if (adapter->data->adapter_priv == NULL) {
+	/* Allocate storage for private data area */
+#define SWTIM_NAMESIZE 32
+	char swtim_name[SWTIM_NAMESIZE];
+	snprintf(swtim_name, SWTIM_NAMESIZE, "swtim_%"PRIu8,
+			adapter->data->id);
+	sw = rte_zmalloc_socket(swtim_name, sizeof(*sw), RTE_CACHE_LINE_SIZE,
+			adapter->data->socket_id);
+	if (sw == NULL) {
 		EVTIM_LOG_ERR("failed to allocate space for private data");
 		rte_errno = ENOMEM;
 		return -1;
 	}
 
-	if (adapter->data->conf.timer_tick_ns < SW_MIN_INTERVAL) {
-		EVTIM_LOG_ERR("failed to create adapter with requested tick "
-			      "interval");
-		rte_errno = EINVAL;
-		return -1;
-	}
-
-	sw_data = adapter->data->adapter_priv;
-
-	sw_data->timer_tick_ns = adapter->data->conf.timer_tick_ns;
-	sw_data->max_tmo_ns = adapter->data->conf.max_tmo_ns;
-
-	TAILQ_INIT(&sw_data->msgs_tailq_head);
-	rte_spinlock_init(&sw_data->msgs_tailq_sl);
-	rte_atomic16_init(&sw_data->message_producer_count);
+	/* Connect storage to adapter instance */
+	adapter->data->adapter_priv = sw;
+	sw->adapter = adapter;
 
-	/* Rings require power of 2, so round up to next such value */
-	nb_timers = rte_align64pow2(adapter->data->conf.nb_timers);
+	sw->timer_tick_ns = adapter->data->conf.timer_tick_ns;
+	sw->max_tmo_ns = adapter->data->conf.max_tmo_ns;
 
-	char msg_ring_name[RTE_RING_NAMESIZE];
-	snprintf(msg_ring_name, RTE_RING_NAMESIZE,
-		 "sw_evtim_adap_msg_ring_%"PRIu8, adapter->data->id);
-	flags = adapter->data->conf.flags & RTE_EVENT_TIMER_ADAPTER_F_SP_PUT ?
-		RING_F_SP_ENQ | RING_F_SC_DEQ :
-		RING_F_SC_DEQ;
-	sw_data->msg_ring = rte_ring_create(msg_ring_name, nb_timers,
-					    adapter->data->socket_id, flags);
-	if (sw_data->msg_ring == NULL) {
-		EVTIM_LOG_ERR("failed to create message ring");
-		rte_errno = ENOMEM;
-		goto free_priv_data;
-	}
-
-	char pool_name[RTE_RING_NAMESIZE];
-	snprintf(pool_name, RTE_RING_NAMESIZE, "sw_evtim_adap_msg_pool_%"PRIu8,
+	/* Create a timer pool */
+	char pool_name[SWTIM_NAMESIZE];
+	snprintf(pool_name, SWTIM_NAMESIZE, "swtim_pool_%"PRIu8,
 		 adapter->data->id);
-
-	/* Both the arming/canceling thread and the service thread will do puts
-	 * to the mempool, but if the SP_PUT flag is enabled, we can specify
-	 * single-consumer get for the mempool.
-	 */
-	flags = adapter->data->conf.flags & RTE_EVENT_TIMER_ADAPTER_F_SP_PUT ?
-		MEMPOOL_F_SC_GET : 0;
-
-	/* The usable size of a ring is count - 1, so subtract one here to
-	 * make the counts agree.
-	 */
+	/* Optimal mempool size is a power of 2 minus one */
+	uint64_t nb_timers = rte_align64pow2(adapter->data->conf.nb_timers);
 	int pool_size = nb_timers - 1;
 	int cache_size = compute_msg_mempool_cache_size(
 				adapter->data->conf.nb_timers, nb_timers);
-	sw_data->msg_pool = rte_mempool_create(pool_name, pool_size,
-					       sizeof(struct msg), cache_size,
-					       0, NULL, NULL, NULL, NULL,
-					       adapter->data->socket_id, flags);
-	if (sw_data->msg_pool == NULL) {
-		EVTIM_LOG_ERR("failed to create message object mempool");
+	flags = 0; /* pool is multi-producer, multi-consumer */
+	sw->tim_pool = rte_mempool_create(pool_name, pool_size,
+			sizeof(struct rte_timer), cache_size, 0, NULL, NULL,
+			NULL, NULL, adapter->data->socket_id, flags);
+	if (sw->tim_pool == NULL) {
+		EVTIM_LOG_ERR("failed to create timer object mempool");
 		rte_errno = ENOMEM;
-		goto free_msg_ring;
+		goto free_alloc;
 	}
 
-	event_buffer_init(&sw_data->buffer);
+	/* Initialize the variables that track in-use timer lists */
+	rte_spinlock_init(&sw->poll_lcores_sl);
+	for (i = 0; i < RTE_MAX_LCORE; i++)
+		rte_atomic16_init(&sw->in_use[i].v);
+
+	/* Initialize the timer subsystem and allocate timer data instance */
+	ret = rte_timer_subsystem_init();
+	if (ret < 0) {
+		if (ret != -EALREADY) {
+			EVTIM_LOG_ERR("failed to initialize timer subsystem");
+			rte_errno = ret;
+			goto free_mempool;
+		}
+	}
+
+	ret = rte_timer_data_alloc(&sw->timer_data_id);
+	if (ret < 0) {
+		EVTIM_LOG_ERR("failed to allocate timer data instance");
+		rte_errno = ret;
+		goto free_mempool;
+	}
+
+	/* Initialize timer event buffer */
+	event_buffer_init(&sw->buffer);
+
+	sw->adapter = adapter;
 
 	/* Register a service component to run adapter logic */
 	memset(&service, 0, sizeof(service));
 	snprintf(service.name, RTE_SERVICE_NAME_MAX,
-		 "sw_evimer_adap_svc_%"PRIu8, adapter->data->id);
+		 "swtim_svc_%"PRIu8, adapter->data->id);
 	service.socket_id = adapter->data->socket_id;
-	service.callback = sw_event_timer_adapter_service_func;
+	service.callback = swtim_service_func;
 	service.callback_userdata = adapter;
 	service.capabilities &= ~(RTE_SERVICE_CAP_MT_SAFE);
-	ret = rte_service_component_register(&service, &sw_data->service_id);
+	ret = rte_service_component_register(&service, &sw->service_id);
 	if (ret < 0) {
 		EVTIM_LOG_ERR("failed to register service %s with id %"PRIu32
-			      ": err = %d", service.name, sw_data->service_id,
+			      ": err = %d", service.name, sw->service_id,
 			      ret);
 
 		rte_errno = ENOSPC;
-		goto free_msg_pool;
+		goto free_mempool;
 	}
 
 	EVTIM_LOG_DBG("registered service %s with id %"PRIu32, service.name,
-		      sw_data->service_id);
+		      sw->service_id);
 
-	adapter->data->service_id = sw_data->service_id;
+	adapter->data->service_id = sw->service_id;
 	adapter->data->service_inited = 1;
 
-	if (!timer_subsystem_inited) {
-		rte_timer_subsystem_init();
-		timer_subsystem_inited = true;
-	}
-
 	return 0;
-
-free_msg_pool:
-	rte_mempool_free(sw_data->msg_pool);
-free_msg_ring:
-	rte_ring_free(sw_data->msg_ring);
-free_priv_data:
-	rte_free(sw_data);
+free_mempool:
+	rte_mempool_free(sw->tim_pool);
+free_alloc:
+	rte_free(sw);
 	return -1;
 }
 
-static int
-sw_event_timer_adapter_uninit(struct rte_event_timer_adapter *adapter)
+static void
+swtim_free_tim(struct rte_timer *tim, void *arg)
 {
-	int ret;
-	struct msg *m1, *m2;
-	struct rte_event_timer_adapter_sw_data *sw_data =
-						adapter->data->adapter_priv;
-
-	rte_spinlock_lock(&sw_data->msgs_tailq_sl);
+	struct swtim *sw = arg;
 
-	/* Cancel outstanding rte_timers and free msg objects */
-	m1 = TAILQ_FIRST(&sw_data->msgs_tailq_head);
-	while (m1 != NULL) {
-		EVTIM_LOG_DBG("freeing outstanding timer");
-		m2 = TAILQ_NEXT(m1, msgs);
-
-		rte_timer_stop_sync(&m1->tim);
-		rte_mempool_put(sw_data->msg_pool, m1);
+	rte_mempool_put(sw->tim_pool, tim);
+}
 
-		m1 = m2;
-	}
+/* Traverse the list of outstanding timers and put them back in the mempool
+ * before freeing the adapter to avoid leaking the memory.
+ */
+static int
+swtim_uninit(struct rte_event_timer_adapter *adapter)
+{
+	int ret;
+	struct swtim *sw = swtim_pmd_priv(adapter);
 
-	rte_spinlock_unlock(&sw_data->msgs_tailq_sl);
+	/* Free outstanding timers */
+	rte_timer_stop_all(sw->timer_data_id,
+			   sw->poll_lcores,
+			   sw->n_poll_lcores,
+			   swtim_free_tim,
+			   sw);
 
-	ret = rte_service_component_unregister(sw_data->service_id);
+	ret = rte_service_component_unregister(sw->service_id);
 	if (ret < 0) {
 		EVTIM_LOG_ERR("failed to unregister service component");
 		return ret;
 	}
 
-	rte_ring_free(sw_data->msg_ring);
-	rte_mempool_free(sw_data->msg_pool);
-	rte_free(adapter->data->adapter_priv);
+	rte_mempool_free(sw->tim_pool);
+	rte_free(sw);
+	adapter->data->adapter_priv = NULL;
 
 	return 0;
 }
@@ -1016,88 +940,79 @@ get_mapped_count_for_service(uint32_t service_id)
 }
 
 static int
-sw_event_timer_adapter_start(const struct rte_event_timer_adapter *adapter)
+swtim_start(const struct rte_event_timer_adapter *adapter)
 {
 	int mapped_count;
-	struct rte_event_timer_adapter_sw_data *sw_data;
-
-	sw_data = adapter->data->adapter_priv;
+	struct swtim *sw = swtim_pmd_priv(adapter);
 
 	/* Mapping the service to more than one service core can introduce
 	 * delays while one thread is waiting to acquire a lock, so only allow
 	 * one core to be mapped to the service.
+	 *
+	 * Note: the service could be modified such that it spreads cores to
+	 * poll over multiple service instances.
 	 */
-	mapped_count = get_mapped_count_for_service(sw_data->service_id);
+	mapped_count = get_mapped_count_for_service(sw->service_id);
 
-	if (mapped_count == 1)
-		return rte_service_component_runstate_set(sw_data->service_id,
-							  1);
+	if (mapped_count != 1)
+		return mapped_count < 1 ? -ENOENT : -ENOTSUP;
 
-	return mapped_count < 1 ? -ENOENT : -ENOTSUP;
+	return rte_service_component_runstate_set(sw->service_id, 1);
 }
 
 static int
-sw_event_timer_adapter_stop(const struct rte_event_timer_adapter *adapter)
+swtim_stop(const struct rte_event_timer_adapter *adapter)
 {
 	int ret;
-	struct rte_event_timer_adapter_sw_data *sw_data =
-						adapter->data->adapter_priv;
+	struct swtim *sw = swtim_pmd_priv(adapter);
 
-	ret = rte_service_component_runstate_set(sw_data->service_id, 0);
+	ret = rte_service_component_runstate_set(sw->service_id, 0);
 	if (ret < 0)
 		return ret;
 
-	/* Wait for the service to complete its final iteration before
-	 * stopping.
-	 */
-	while (sw_data->service_phase != 0)
+	/* Wait for the service to complete its final iteration */
+	while (rte_service_may_be_active(sw->service_id))
 		rte_pause();
 
-	rte_smp_rmb();
-
 	return 0;
 }
 
 static void
-sw_event_timer_adapter_get_info(const struct rte_event_timer_adapter *adapter,
+swtim_get_info(const struct rte_event_timer_adapter *adapter,
 		struct rte_event_timer_adapter_info *adapter_info)
 {
-	struct rte_event_timer_adapter_sw_data *sw_data;
-	sw_data = adapter->data->adapter_priv;
-
-	adapter_info->min_resolution_ns = sw_data->timer_tick_ns;
-	adapter_info->max_tmo_ns = sw_data->max_tmo_ns;
+	struct swtim *sw = swtim_pmd_priv(adapter);
+	adapter_info->min_resolution_ns = sw->timer_tick_ns;
+	adapter_info->max_tmo_ns = sw->max_tmo_ns;
 }
 
 static int
-sw_event_timer_adapter_stats_get(const struct rte_event_timer_adapter *adapter,
-				 struct rte_event_timer_adapter_stats *stats)
+swtim_stats_get(const struct rte_event_timer_adapter *adapter,
+		struct rte_event_timer_adapter_stats *stats)
 {
-	struct rte_event_timer_adapter_sw_data *sw_data;
-	sw_data = adapter->data->adapter_priv;
-	*stats = sw_data->stats;
+	struct swtim *sw = swtim_pmd_priv(adapter);
+	*stats = sw->stats; /* structure copy */
 	return 0;
 }
 
 static int
-sw_event_timer_adapter_stats_reset(
-				const struct rte_event_timer_adapter *adapter)
+swtim_stats_reset(const struct rte_event_timer_adapter *adapter)
 {
-	struct rte_event_timer_adapter_sw_data *sw_data;
-	sw_data = adapter->data->adapter_priv;
-	memset(&sw_data->stats, 0, sizeof(sw_data->stats));
+	struct swtim *sw = swtim_pmd_priv(adapter);
+	memset(&sw->stats, 0, sizeof(sw->stats));
 	return 0;
 }
 
-static __rte_always_inline uint16_t
-__sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,
-			  struct rte_event_timer **evtims,
-			  uint16_t nb_evtims)
+static uint16_t
+__swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
+		struct rte_event_timer **evtims,
+		uint16_t nb_evtims)
 {
-	uint16_t i;
-	int ret;
-	struct rte_event_timer_adapter_sw_data *sw_data;
-	struct msg *msgs[nb_evtims];
+	int i, ret;
+	struct swtim *sw = swtim_pmd_priv(adapter);
+	uint32_t lcore_id = rte_lcore_id();
+	struct rte_timer *tim, *tims[nb_evtims];
+	uint64_t cycles;
 
 #ifdef RTE_LIBRTE_EVENTDEV_DEBUG
 	/* Check that the service is running. */
@@ -1107,101 +1022,104 @@ __sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,
 	}
 #endif
 
-	sw_data = adapter->data->adapter_priv;
+	/* Adjust lcore_id if non-EAL thread. Arbitrarily pick the timer list of
+	 * the highest lcore to insert such timers into
+	 */
+	if (lcore_id == LCORE_ID_ANY)
+		lcore_id = RTE_MAX_LCORE - 1;
+
+	/* If this is the first time we're arming an event timer on this lcore,
+	 * mark this lcore as "in use"; this will cause the service
+	 * function to process the timer list that corresponds to this lcore.
+	 */
+	if (unlikely(rte_atomic16_test_and_set(&sw->in_use[lcore_id].v))) {
+		rte_spinlock_lock(&sw->poll_lcores_sl);
+		EVTIM_LOG_DBG("Adding lcore id = %u to list of lcores to poll",
+			      lcore_id);
+		sw->poll_lcores[sw->n_poll_lcores++] = lcore_id;
+		rte_spinlock_unlock(&sw->poll_lcores_sl);
+	}
 
-	ret = rte_mempool_get_bulk(sw_data->msg_pool, (void **)msgs, nb_evtims);
+	ret = rte_mempool_get_bulk(sw->tim_pool, (void **)tims,
+				   nb_evtims);
 	if (ret < 0) {
 		rte_errno = ENOSPC;
 		return 0;
 	}
 
-	/* Let the service know we're producing messages for it to process */
-	rte_atomic16_inc(&sw_data->message_producer_count);
-
-	/* If the service is managing timers, wait for it to finish */
-	while (sw_data->service_phase == 2)
-		rte_pause();
-
-	rte_smp_rmb();
-
 	for (i = 0; i < nb_evtims; i++) {
 		/* Don't modify the event timer state in these cases */
 		if (evtims[i]->state == RTE_EVENT_TIMER_ARMED) {
 			rte_errno = EALREADY;
 			break;
 		} else if (!(evtims[i]->state == RTE_EVENT_TIMER_NOT_ARMED ||
-		    evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {
+			     evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {
 			rte_errno = EINVAL;
 			break;
 		}
 
 		ret = check_timeout(evtims[i], adapter);
-		if (ret == -1) {
+		if (unlikely(ret == -1)) {
 			evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOLATE;
 			rte_errno = EINVAL;
 			break;
-		}
-		if (ret == -2) {
+		} else if (unlikely(ret == -2)) {
 			evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOEARLY;
 			rte_errno = EINVAL;
 			break;
 		}
 
-		if (check_destination_event_queue(evtims[i], adapter) < 0) {
+		if (unlikely(check_destination_event_queue(evtims[i],
+							   adapter) < 0)) {
 			evtims[i]->state = RTE_EVENT_TIMER_ERROR;
 			rte_errno = EINVAL;
 			break;
 		}
 
-		/* Checks passed, set up a message to enqueue */
-		msgs[i]->type = MSG_TYPE_ARM;
-		msgs[i]->evtim = evtims[i];
+		tim = tims[i];
+		rte_timer_init(tim);
 
-		/* Set the payload pointer if not set. */
-		if (evtims[i]->ev.event_ptr == NULL)
-			evtims[i]->ev.event_ptr = evtims[i];
+		evtims[i]->impl_opaque[0] = (uintptr_t)tim;
+		evtims[i]->impl_opaque[1] = (uintptr_t)adapter;
 
-		/* msg objects that get enqueued successfully will be freed
-		 * either by a future cancel operation or by the timer
-		 * expiration callback.
-		 */
-		if (rte_ring_enqueue(sw_data->msg_ring, msgs[i]) < 0) {
-			rte_errno = ENOSPC;
+		cycles = get_timeout_cycles(evtims[i], adapter);
+		ret = rte_timer_alt_reset(sw->timer_data_id, tim, cycles,
+					  SINGLE, lcore_id, NULL, evtims[i]);
+		if (ret < 0) {
+			/* tim was in RUNNING or CONFIG state */
+			evtims[i]->state = RTE_EVENT_TIMER_ERROR;
 			break;
 		}
 
-		EVTIM_LOG_DBG("enqueued ARM message to ring");
-
+		rte_smp_wmb();
+		EVTIM_LOG_DBG("armed an event timer");
 		evtims[i]->state = RTE_EVENT_TIMER_ARMED;
 	}
 
-	/* Let the service know we're done producing messages */
-	rte_atomic16_dec(&sw_data->message_producer_count);
-
 	if (i < nb_evtims)
-		rte_mempool_put_bulk(sw_data->msg_pool, (void **)&msgs[i],
-				     nb_evtims - i);
+		rte_mempool_put_bulk(sw->tim_pool,
+				     (void **)&tims[i], nb_evtims - i);
 
 	return i;
 }
 
 static uint16_t
-sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,
-			 struct rte_event_timer **evtims,
-			 uint16_t nb_evtims)
+swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
+		struct rte_event_timer **evtims,
+		uint16_t nb_evtims)
 {
-	return __sw_event_timer_arm_burst(adapter, evtims, nb_evtims);
+	return __swtim_arm_burst(adapter, evtims, nb_evtims);
 }
 
 static uint16_t
-sw_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter,
-			    struct rte_event_timer **evtims,
-			    uint16_t nb_evtims)
+swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
+		   struct rte_event_timer **evtims,
+		   uint16_t nb_evtims)
 {
-	uint16_t i;
-	int ret;
-	struct rte_event_timer_adapter_sw_data *sw_data;
-	struct msg *msgs[nb_evtims];
+	int i, ret;
+	struct rte_timer *timp;
+	uint64_t opaque;
+	struct swtim *sw = swtim_pmd_priv(adapter);
 
 #ifdef RTE_LIBRTE_EVENTDEV_DEBUG
 	/* Check that the service is running. */
@@ -1211,23 +1129,6 @@ sw_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter,
 	}
 #endif
 
-	sw_data = adapter->data->adapter_priv;
-
-	ret = rte_mempool_get_bulk(sw_data->msg_pool, (void **)msgs, nb_evtims);
-	if (ret < 0) {
-		rte_errno = ENOSPC;
-		return 0;
-	}
-
-	/* Let the service know we're producing messages for it to process */
-	rte_atomic16_inc(&sw_data->message_producer_count);
-
-	/* If the service could be modifying event timer states, wait */
-	while (sw_data->service_phase == 2)
-		rte_pause();
-
-	rte_smp_rmb();
-
 	for (i = 0; i < nb_evtims; i++) {
 		/* Don't modify the event timer state in these cases */
 		if (evtims[i]->state == RTE_EVENT_TIMER_CANCELED) {
@@ -1238,54 +1139,56 @@ sw_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter,
 			break;
 		}
 
-		msgs[i]->type = MSG_TYPE_CANCEL;
-		msgs[i]->evtim = evtims[i];
+		rte_smp_rmb();
+
+		opaque = evtims[i]->impl_opaque[0];
+		timp = (struct rte_timer *)(uintptr_t)opaque;
+		RTE_ASSERT(timp != NULL);
 
-		if (rte_ring_enqueue(sw_data->msg_ring, msgs[i]) < 0) {
-			rte_errno = ENOSPC;
+		ret = rte_timer_alt_stop(sw->timer_data_id, timp);
+		if (ret < 0) {
+			/* Timer is running or being configured */
+			rte_errno = EAGAIN;
 			break;
 		}
 
-		EVTIM_LOG_DBG("enqueued CANCEL message to ring");
+		rte_mempool_put(sw->tim_pool, (void **)timp);
 
 		evtims[i]->state = RTE_EVENT_TIMER_CANCELED;
-	}
+		evtims[i]->impl_opaque[0] = 0;
+		evtims[i]->impl_opaque[1] = 0;
 
-	/* Let the service know we're done producing messages */
-	rte_atomic16_dec(&sw_data->message_producer_count);
-
-	if (i < nb_evtims)
-		rte_mempool_put_bulk(sw_data->msg_pool, (void **)&msgs[i],
-				     nb_evtims - i);
+		rte_smp_wmb();
+	}
 
 	return i;
 }
 
 static uint16_t
-sw_event_timer_arm_tmo_tick_burst(const struct rte_event_timer_adapter *adapter,
-				  struct rte_event_timer **evtims,
-				  uint64_t timeout_ticks,
-				  uint16_t nb_evtims)
+swtim_arm_tmo_tick_burst(const struct rte_event_timer_adapter *adapter,
+			 struct rte_event_timer **evtims,
+			 uint64_t timeout_ticks,
+			 uint16_t nb_evtims)
 {
 	int i;
 
 	for (i = 0; i < nb_evtims; i++)
 		evtims[i]->timeout_ticks = timeout_ticks;
 
-	return __sw_event_timer_arm_burst(adapter, evtims, nb_evtims);
+	return __swtim_arm_burst(adapter, evtims, nb_evtims);
 }
 
-static const struct rte_event_timer_adapter_ops sw_event_adapter_timer_ops = {
-	.init = sw_event_timer_adapter_init,
-	.uninit = sw_event_timer_adapter_uninit,
-	.start = sw_event_timer_adapter_start,
-	.stop = sw_event_timer_adapter_stop,
-	.get_info = sw_event_timer_adapter_get_info,
-	.stats_get = sw_event_timer_adapter_stats_get,
-	.stats_reset = sw_event_timer_adapter_stats_reset,
-	.arm_burst = sw_event_timer_arm_burst,
-	.arm_tmo_tick_burst = sw_event_timer_arm_tmo_tick_burst,
-	.cancel_burst = sw_event_timer_cancel_burst,
+static const struct rte_event_timer_adapter_ops swtim_ops = {
+	.init			= swtim_init,
+	.uninit			= swtim_uninit,
+	.start			= swtim_start,
+	.stop			= swtim_stop,
+	.get_info		= swtim_get_info,
+	.stats_get		= swtim_stats_get,
+	.stats_reset		= swtim_stats_reset,
+	.arm_burst		= swtim_arm_burst,
+	.arm_tmo_tick_burst	= swtim_arm_tmo_tick_burst,
+	.cancel_burst		= swtim_cancel_burst,
 };
 
 RTE_INIT(event_timer_adapter_init_log)
-- 
2.6.4



More information about the dev mailing list