[dpdk-dev] [PATCH 1/3] timer: add per-installer pending lists for each lcore

Gabriel Carrillo <erik.g.carrillo@intel.com>
Wed Aug 23 16:47:22 CEST 2017


Instead of each priv_timer struct containing a single skiplist, this
commit adds one skiplist per enabled lcore to priv_timer. When
multiple lcores repeatedly install timers on the same target lcore,
this change reduces lock contention for the target lcore's skiplists
and increases performance. (A sketch of the layout change follows the
diffstat below.)

Signed-off-by: Gabriel Carrillo <erik.g.carrillo@intel.com>
---
 lib/librte_timer/rte_timer.c | 309 +++++++++++++++++++++++++++----------------
 lib/librte_timer/rte_timer.h |   9 +-
 2 files changed, 202 insertions(+), 116 deletions(-)
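
Not part of the patch: a minimal pseudo-C sketch of the layout change,
for orientation before the diff. The struct and field names mirror the
patch; the placeholder typedefs and the RTE_MAX_LCORE value are
stand-ins so the snippet compiles on its own, not the real DPDK
definitions.

typedef int rte_spinlock_t;			/* placeholder, not the DPDK type */
struct rte_timer { unsigned long expire; };	/* placeholder, details elided */
#define RTE_MAX_LCORE 128			/* build-time constant in DPDK */

/* Before: one pending skiplist per target lcore; every installing
 * lcore contends on the same list_lock.
 */
struct priv_timer_before {
	struct rte_timer pending_head;
	rte_spinlock_t list_lock;
	unsigned curr_skiplist_depth;
};

/* After: one skiplist per (target, installer) pair; an installer
 * locks only its own slot in the target lcore's array.
 */
struct skiplist {
	struct rte_timer head;
	rte_spinlock_t lock;
	unsigned depth;
};

struct priv_timer_after {
	struct skiplist pending_lists[RTE_MAX_LCORE];
};

/* timer_add() below selects the list as
 *	&priv_timer[tim_lcore].pending_lists[rte_lcore_id()]
 * while rte_timer_manage() on tim_lcore scans all of its lists.
 */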

diff --git a/lib/librte_timer/rte_timer.c b/lib/librte_timer/rte_timer.c
index 5ee0840..2db74c1 100644
--- a/lib/librte_timer/rte_timer.c
+++ b/lib/librte_timer/rte_timer.c
@@ -37,6 +37,7 @@
 #include <inttypes.h>
 #include <assert.h>
 #include <sys/queue.h>
+#include <stdbool.h>
 
 #include <rte_atomic.h>
 #include <rte_common.h>
@@ -56,17 +57,19 @@
 
 LIST_HEAD(rte_timer_list, rte_timer);
 
+struct skiplist {
+	struct rte_timer head;  /**< dummy timer instance to head up list */
+	rte_spinlock_t lock;	/**< lock to protect list access */
+	unsigned depth;		/**< track the current depth of the skiplist */
+} __rte_cache_aligned;
+
 struct priv_timer {
-	struct rte_timer pending_head;  /**< dummy timer instance to head up list */
-	rte_spinlock_t list_lock;       /**< lock to protect list access */
+	/** one pending list per enabled lcore */
+	struct skiplist pending_lists[RTE_MAX_LCORE];
 
 	/** per-core variable that is true if a timer was updated on this
 	 *  core since the last reset of the variable */
 	int updated;
-
-	/** track the current depth of the skiplist */
-	unsigned curr_skiplist_depth;
-
 	unsigned prev_lcore;              /**< used for lcore round robin */
 
 	/** running timer on this lcore now */
@@ -81,6 +84,10 @@ struct priv_timer {
 /** per-lcore private info for timers */
 static struct priv_timer priv_timer[RTE_MAX_LCORE];
 
+/** cache of IDs of enabled lcores */
+static unsigned enabled_lcores[RTE_MAX_LCORE];
+static int n_enabled_lcores;
+
 /* when debug is enabled, store some statistics */
 #ifdef RTE_LIBRTE_TIMER_DEBUG
 #define __TIMER_STAT_ADD(name, n) do {					\
@@ -96,14 +103,25 @@ static struct priv_timer priv_timer[RTE_MAX_LCORE];
 void
 rte_timer_subsystem_init(void)
 {
-	unsigned lcore_id;
+	unsigned lcore_id1, lcore_id2;
+	struct skiplist *list;
+	int i, j;
+
+	RTE_LCORE_FOREACH(lcore_id1)
+		enabled_lcores[n_enabled_lcores++] = lcore_id1;
 
 	/* since priv_timer is static, it's zeroed by default, so only init some
 	 * fields.
 	 */
-	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id ++) {
-		rte_spinlock_init(&priv_timer[lcore_id].list_lock);
-		priv_timer[lcore_id].prev_lcore = lcore_id;
+	for (i = 0; i < n_enabled_lcores; i++) {
+		lcore_id1 = enabled_lcores[i];
+		priv_timer[lcore_id1].prev_lcore = lcore_id1;
+
+		for (j = 0; j < n_enabled_lcores; j++) {
+			lcore_id2 = enabled_lcores[j];
+			list = &priv_timer[lcore_id1].pending_lists[lcore_id2];
+			rte_spinlock_init(&list->lock);
+		}
 	}
 }
 
@@ -114,7 +132,8 @@ rte_timer_init(struct rte_timer *tim)
 	union rte_timer_status status;
 
 	status.state = RTE_TIMER_STOP;
-	status.owner = RTE_TIMER_NO_OWNER;
+	status.installer = RTE_TIMER_NO_LCORE;
+	status.owner = RTE_TIMER_NO_LCORE;
 	tim->status.u32 = status.u32;
 }
 
@@ -142,7 +161,7 @@ timer_set_config_state(struct rte_timer *tim,
 		 * or ready to run on local core, exit
 		 */
 		if (prev_status.state == RTE_TIMER_RUNNING &&
-		    (prev_status.owner != (uint16_t)lcore_id ||
+		    (prev_status.owner != (int)lcore_id ||
 		     tim != priv_timer[lcore_id].running_tim))
 			return -1;
 
@@ -153,7 +172,8 @@ timer_set_config_state(struct rte_timer *tim,
 		/* here, we know that timer is stopped or pending,
 		 * mark it atomically as being configured */
 		status.state = RTE_TIMER_CONFIG;
-		status.owner = (int16_t)lcore_id;
+		status.installer = RTE_TIMER_NO_LCORE;
+		status.owner = lcore_id;
 		success = rte_atomic32_cmpset(&tim->status.u32,
 					      prev_status.u32,
 					      status.u32);
@@ -185,7 +205,8 @@ timer_set_running_state(struct rte_timer *tim)
 		/* here, we know that timer is stopped or pending,
 		 * mark it atomically as running */
 		status.state = RTE_TIMER_RUNNING;
-		status.owner = (int16_t)lcore_id;
+		status.installer = prev_status.installer;
+		status.owner = lcore_id;
 		success = rte_atomic32_cmpset(&tim->status.u32,
 					      prev_status.u32,
 					      status.u32);
@@ -236,11 +257,11 @@ timer_get_skiplist_level(unsigned curr_depth)
  * are <= that time value.
  */
 static void
-timer_get_prev_entries(uint64_t time_val, unsigned tim_lcore,
+timer_get_prev_entries(uint64_t time_val, struct skiplist *list,
 		struct rte_timer **prev)
 {
-	unsigned lvl = priv_timer[tim_lcore].curr_skiplist_depth;
-	prev[lvl] = &priv_timer[tim_lcore].pending_head;
+	unsigned lvl = list->depth;
+	prev[lvl] = &list->head;
 	while (lvl != 0) {
 		lvl--;
 		prev[lvl] = prev[lvl+1];
@@ -255,15 +276,15 @@ timer_get_prev_entries(uint64_t time_val, unsigned tim_lcore,
  * all skiplist levels.
  */
 static void
-timer_get_prev_entries_for_node(struct rte_timer *tim, unsigned tim_lcore,
+timer_get_prev_entries_for_node(struct rte_timer *tim, struct skiplist *list,
 		struct rte_timer **prev)
 {
 	int i;
 	/* to get a specific entry in the list, look for just lower than the time
 	 * values, and then increment on each level individually if necessary
 	 */
-	timer_get_prev_entries(tim->expire - 1, tim_lcore, prev);
-	for (i = priv_timer[tim_lcore].curr_skiplist_depth - 1; i >= 0; i--) {
+	timer_get_prev_entries(tim->expire - 1, list, prev);
+	for (i = list->depth - 1; i >= 0; i--) {
 		while (prev[i]->sl_next[i] != NULL &&
 				prev[i]->sl_next[i] != tim &&
 				prev[i]->sl_next[i]->expire <= tim->expire)
@@ -282,25 +303,25 @@ timer_add(struct rte_timer *tim, unsigned tim_lcore, int local_is_locked)
 	unsigned lcore_id = rte_lcore_id();
 	unsigned lvl;
 	struct rte_timer *prev[MAX_SKIPLIST_DEPTH+1];
+	struct skiplist *list = &priv_timer[tim_lcore].pending_lists[lcore_id];
 
 	/* if timer needs to be scheduled on another core, we need to
 	 * lock the list; if it is on local core, we need to lock if
 	 * we are not called from rte_timer_manage() */
 	if (tim_lcore != lcore_id || !local_is_locked)
-		rte_spinlock_lock(&priv_timer[tim_lcore].list_lock);
+		rte_spinlock_lock(&list->lock);
 
 	/* find where exactly this element goes in the list of elements
 	 * for each depth. */
-	timer_get_prev_entries(tim->expire, tim_lcore, prev);
+	timer_get_prev_entries(tim->expire, list, prev);
 
 	/* now assign it a new level and add at that level */
-	const unsigned tim_level = timer_get_skiplist_level(
-			priv_timer[tim_lcore].curr_skiplist_depth);
-	if (tim_level == priv_timer[tim_lcore].curr_skiplist_depth)
-		priv_timer[tim_lcore].curr_skiplist_depth++;
+	const unsigned tim_level = timer_get_skiplist_level(list->depth);
+	if (tim_level == list->depth)
+		list->depth++;
 
 	lvl = tim_level;
-	while (lvl > 0) {
+	while (lvl > 0 && lvl < MAX_SKIPLIST_DEPTH + 1) {
 		tim->sl_next[lvl] = prev[lvl]->sl_next[lvl];
 		prev[lvl]->sl_next[lvl] = tim;
 		lvl--;
@@ -310,11 +331,10 @@ timer_add(struct rte_timer *tim, unsigned tim_lcore, int local_is_locked)
 
 	/* save the lowest list entry into the expire field of the dummy hdr
 	 * NOTE: this is not atomic on 32-bit */
-	priv_timer[tim_lcore].pending_head.expire = priv_timer[tim_lcore].\
-			pending_head.sl_next[0]->expire;
+	list->head.expire = list->head.sl_next[0]->expire;
 
 	if (tim_lcore != lcore_id || !local_is_locked)
-		rte_spinlock_unlock(&priv_timer[tim_lcore].list_lock);
+		rte_spinlock_unlock(&list->lock);
 }
 
 /*
@@ -330,35 +350,38 @@ timer_del(struct rte_timer *tim, union rte_timer_status prev_status,
 	unsigned prev_owner = prev_status.owner;
 	int i;
 	struct rte_timer *prev[MAX_SKIPLIST_DEPTH+1];
+	struct skiplist *list;
+
+	list = &priv_timer[prev_owner].pending_lists[prev_status.installer];
 
 	/* if the timer is pending on another core, we need to lock the
 	 * list; if it is on local core, we need to lock if we are not
 	 * called from rte_timer_manage() */
 	if (prev_owner != lcore_id || !local_is_locked)
-		rte_spinlock_lock(&priv_timer[prev_owner].list_lock);
+		rte_spinlock_lock(&list->lock);
 
 	/* save the lowest list entry into the expire field of the dummy hdr.
 	 * NOTE: this is not atomic on 32-bit */
-	if (tim == priv_timer[prev_owner].pending_head.sl_next[0])
-		priv_timer[prev_owner].pending_head.expire =
-				((tim->sl_next[0] == NULL) ? 0 : tim->sl_next[0]->expire);
+	if (tim == list->head.sl_next[0])
+		list->head.expire = ((tim->sl_next[0] == NULL) ?
+				     0 : tim->sl_next[0]->expire);
 
 	/* adjust pointers from previous entries to point past this */
-	timer_get_prev_entries_for_node(tim, prev_owner, prev);
-	for (i = priv_timer[prev_owner].curr_skiplist_depth - 1; i >= 0; i--) {
+	timer_get_prev_entries_for_node(tim, list, prev);
+	for (i = list->depth - 1; i >= 0; i--) {
 		if (prev[i]->sl_next[i] == tim)
 			prev[i]->sl_next[i] = tim->sl_next[i];
 	}
 
 	/* in case we deleted last entry at a level, adjust down max level */
-	for (i = priv_timer[prev_owner].curr_skiplist_depth - 1; i >= 0; i--)
-		if (priv_timer[prev_owner].pending_head.sl_next[i] == NULL)
-			priv_timer[prev_owner].curr_skiplist_depth --;
+	for (i = list->depth - 1; i >= 0; i--)
+		if (list->head.sl_next[i] == NULL)
+			list->depth--;
 		else
 			break;
 
 	if (prev_owner != lcore_id || !local_is_locked)
-		rte_spinlock_unlock(&priv_timer[prev_owner].list_lock);
+		rte_spinlock_unlock(&list->lock);
 }
 
 /* Reset and start the timer associated with the timer handle (private func) */
@@ -416,7 +439,8 @@ __rte_timer_reset(struct rte_timer *tim, uint64_t expire,
 	 * the state so we don't need to use cmpset() here */
 	rte_wmb();
 	status.state = RTE_TIMER_PENDING;
-	status.owner = (int16_t)tim_lcore;
+	status.installer = lcore_id;
+	status.owner = tim_lcore;
 	tim->status.u32 = status.u32;
 
 	return 0;
@@ -484,7 +508,8 @@ rte_timer_stop(struct rte_timer *tim)
 	/* mark timer as stopped */
 	rte_wmb();
 	status.state = RTE_TIMER_STOP;
-	status.owner = RTE_TIMER_NO_OWNER;
+	status.installer = RTE_TIMER_NO_LCORE;
+	status.owner = RTE_TIMER_NO_LCORE;
 	tim->status.u32 = status.u32;
 
 	return 0;
@@ -506,119 +531,179 @@ rte_timer_pending(struct rte_timer *tim)
 }
 
 /* must be called periodically, run all timers that have expired */
-void rte_timer_manage(void)
+void
+rte_timer_manage(void)
 {
 	union rte_timer_status status;
-	struct rte_timer *tim, *next_tim;
-	struct rte_timer *run_first_tim, **pprev;
-	unsigned lcore_id = rte_lcore_id();
+	struct rte_timer *tim, *next_tim, **pprev;
+	struct rte_timer *run_first_tims[RTE_MAX_LCORE + 1];
 	struct rte_timer *prev[MAX_SKIPLIST_DEPTH + 1];
-	uint64_t cur_time;
-	int i, ret;
+	struct priv_timer *priv_tim;
+	unsigned installer_lcore, lcore_id = rte_lcore_id();
+	uint64_t cur_time, min_expire;
+	int i, j, min_idx, ret;
+	int nrunlists = 0;
+	int local_is_locked = 0;
+	struct skiplist *dest_list, *list = NULL;
+	bool done;
 
 	/* timer manager only runs on EAL thread with valid lcore_id */
 	assert(lcore_id < RTE_MAX_LCORE);
 
+	priv_tim = &priv_timer[lcore_id];
+
 	__TIMER_STAT_ADD(manage, 1);
-	/* optimize for the case where per-cpu list is empty */
-	if (priv_timer[lcore_id].pending_head.sl_next[0] == NULL)
-		return;
-	cur_time = rte_get_timer_cycles();
+	for (i = 0; i < n_enabled_lcores; i++) {
+		installer_lcore = enabled_lcores[i];
+		list = &priv_tim->pending_lists[installer_lcore];
+
+		/* optimize for the case where list is empty */
+		if (list->head.sl_next[0] == NULL)
+			continue;
+		cur_time = rte_get_timer_cycles();
 
 #ifdef RTE_ARCH_X86_64
-	/* on 64-bit the value cached in the pending_head.expired will be
-	 * updated atomically, so we can consult that for a quick check here
-	 * outside the lock */
-	if (likely(priv_timer[lcore_id].pending_head.expire > cur_time))
-		return;
+		/* on 64-bit, the value cached in list->head.expire will be
+		 * updated atomically, so we can consult that for a quick check
+		 * here outside the lock
+		 */
+		if (likely(list->head.expire > cur_time))
+			continue;
 #endif
 
-	/* browse ordered list, add expired timers in 'expired' list */
-	rte_spinlock_lock(&priv_timer[lcore_id].list_lock);
+		/* browse ordered list, add expired timers in 'expired' list */
+		rte_spinlock_lock(&list->lock);
 
-	/* if nothing to do just unlock and return */
-	if (priv_timer[lcore_id].pending_head.sl_next[0] == NULL ||
-	    priv_timer[lcore_id].pending_head.sl_next[0]->expire > cur_time) {
-		rte_spinlock_unlock(&priv_timer[lcore_id].list_lock);
-		return;
-	}
+		/* if nothing to do just unlock and continue */
+		if (list->head.sl_next[0] == NULL ||
+		    list->head.sl_next[0]->expire > cur_time) {
+			rte_spinlock_unlock(&list->lock);
+			continue;
+		}
 
-	/* save start of list of expired timers */
-	tim = priv_timer[lcore_id].pending_head.sl_next[0];
+		/* save start of list of expired timers */
+		tim = list->head.sl_next[0];
+
+		/* break the existing list at current time point */
+		timer_get_prev_entries(cur_time, list, prev);
+		for (j = list->depth - 1; j >= 0; j--) {
+			if (prev[j] == &list->head)
+				continue;
+			list->head.sl_next[j] =
+			    prev[j]->sl_next[j];
+			if (prev[j]->sl_next[j] == NULL)
+				list->depth--;
+			prev[j]->sl_next[j] = NULL;
+		}
 
-	/* break the existing list at current time point */
-	timer_get_prev_entries(cur_time, lcore_id, prev);
-	for (i = priv_timer[lcore_id].curr_skiplist_depth -1; i >= 0; i--) {
-		if (prev[i] == &priv_timer[lcore_id].pending_head)
-			continue;
-		priv_timer[lcore_id].pending_head.sl_next[i] =
-		    prev[i]->sl_next[i];
-		if (prev[i]->sl_next[i] == NULL)
-			priv_timer[lcore_id].curr_skiplist_depth--;
-		prev[i] ->sl_next[i] = NULL;
-	}
+		/* transition run-list from PENDING to RUNNING */
+		run_first_tims[nrunlists] = tim;
+		pprev = &run_first_tims[nrunlists];
+		nrunlists++;
+
+		for (; tim != NULL; tim = next_tim) {
+			next_tim = tim->sl_next[0];
+
+			ret = timer_set_running_state(tim);
+			if (likely(ret == 0)) {
+				pprev = &tim->sl_next[0];
+			} else {
+				/* another core is trying to re-config this one,
+				 * remove it from local expired list
+				 */
+				*pprev = next_tim;
+			}
+		}
 
-	/* transition run-list from PENDING to RUNNING */
-	run_first_tim = tim;
-	pprev = &run_first_tim;
+		/* update the next to expire timer value */
+		list->head.expire = (list->head.sl_next[0] == NULL) ?
+				    0 : list->head.sl_next[0]->expire;
 
-	for ( ; tim != NULL; tim = next_tim) {
-		next_tim = tim->sl_next[0];
+		rte_spinlock_unlock(&list->lock);
+	}
 
-		ret = timer_set_running_state(tim);
-		if (likely(ret == 0)) {
-			pprev = &tim->sl_next[0];
-		} else {
-			/* another core is trying to re-config this one,
-			 * remove it from local expired list
-			 */
-			*pprev = next_tim;
+	/* Now process the run lists */
+	while (1) {
+		done = true;
+		min_expire = UINT64_MAX;
+		min_idx = 0;
+
+		/* Find the earliest-expiring timer among the run lists */
+		for (i = 0; i < nrunlists; i++) {
+			tim = run_first_tims[i];
+
+			if (tim != NULL && tim->expire < min_expire) {
+				min_expire = tim->expire;
+				min_idx = i;
+				done = false;
+			}
 		}
-	}
 
-	/* update the next to expire timer value */
-	priv_timer[lcore_id].pending_head.expire =
-	    (priv_timer[lcore_id].pending_head.sl_next[0] == NULL) ? 0 :
-		priv_timer[lcore_id].pending_head.sl_next[0]->expire;
+		if (done)
+			break;
+
+		tim = run_first_tims[min_idx];
 
-	rte_spinlock_unlock(&priv_timer[lcore_id].list_lock);
+		/* Pop the timer we are about to execute off the run
+		 * list it came from
+		 */
+		run_first_tims[min_idx] = run_first_tims[min_idx]->sl_next[0];
 
-	/* now scan expired list and call callbacks */
-	for (tim = run_first_tim; tim != NULL; tim = next_tim) {
-		next_tim = tim->sl_next[0];
-		priv_timer[lcore_id].updated = 0;
-		priv_timer[lcore_id].running_tim = tim;
+		priv_tim->updated = 0;
+		priv_tim->running_tim = tim;
 
 		/* execute callback function with list unlocked */
 		tim->f(tim, tim->arg);
 
 		__TIMER_STAT_ADD(pending, -1);
 		/* the timer was stopped or reloaded by the callback
-		 * function, we have nothing to do here */
-		if (priv_timer[lcore_id].updated == 1)
+		 * function, we have nothing to do here
+		 */
+		if (priv_tim->updated == 1)
 			continue;
 
 		if (tim->period == 0) {
 			/* remove from done list and mark timer as stopped */
 			status.state = RTE_TIMER_STOP;
-			status.owner = RTE_TIMER_NO_OWNER;
+			status.installer = RTE_TIMER_NO_LCORE;
+			status.owner = RTE_TIMER_NO_LCORE;
 			rte_wmb();
 			tim->status.u32 = status.u32;
 		}
 		else {
-			/* keep it in list and mark timer as pending */
-			rte_spinlock_lock(&priv_timer[lcore_id].list_lock);
+			dest_list = &priv_tim->pending_lists[lcore_id];
+
+			/* If the destination list is the current list
+			 * we can acquire the lock here, and hold it
+			 * across removal and insertion of the timer.
+			 */
+			if (list == dest_list) {
+				rte_spinlock_lock(&list->lock);
+				local_is_locked = 1;
+			}
+
+			/* set timer state back to PENDING and
+			 * reinsert it in pending list
+			 */
 			status.state = RTE_TIMER_PENDING;
 			__TIMER_STAT_ADD(pending, 1);
-			status.owner = (int16_t)lcore_id;
+			status.installer = tim->status.installer;
+			status.owner = lcore_id;
 			rte_wmb();
 			tim->status.u32 = status.u32;
-			__rte_timer_reset(tim, tim->expire + tim->period,
-				tim->period, lcore_id, tim->f, tim->arg, 1);
-			rte_spinlock_unlock(&priv_timer[lcore_id].list_lock);
+
+			__rte_timer_reset(tim,
+					  tim->expire + tim->period,
+					  tim->period, lcore_id,
+					  tim->f, tim->arg, local_is_locked);
+
+			if (local_is_locked) {
+				rte_spinlock_unlock(&list->lock);
+				local_is_locked = 0;
+			}
 		}
 	}
-	priv_timer[lcore_id].running_tim = NULL;
+	priv_tim->running_tim = NULL;
 }
 
 /* dump statistics about timers */
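
Not part of the patch: the run-list processing above is effectively a
k-way merge. Each per-installer run list is already sorted by expiry,
so repeatedly consuming the list whose head has the smallest expire
value fires callbacks in global expiry order, just as the single-list
version did. A standalone sketch of that selection loop (fake_timer
and run_in_expiry_order are stand-ins, not rte_timer API):

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

struct fake_timer {
	uint64_t expire;
	struct fake_timer *next;
};

static void
run_in_expiry_order(struct fake_timer *heads[], int nlists)
{
	for (;;) {
		uint64_t min_expire = UINT64_MAX;
		int i, min_idx = -1;

		/* find the run list whose head expires soonest */
		for (i = 0; i < nlists; i++) {
			if (heads[i] != NULL &&
			    heads[i]->expire < min_expire) {
				min_expire = heads[i]->expire;
				min_idx = i;
			}
		}
		if (min_idx < 0)
			break;	/* every list is drained */

		/* pop the winner and "run" it, as the manage loop does */
		printf("run timer expiring at %" PRIu64 "\n",
		       heads[min_idx]->expire);
		heads[min_idx] = heads[min_idx]->next;
	}
}
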
diff --git a/lib/librte_timer/rte_timer.h b/lib/librte_timer/rte_timer.h
index a276a73..4cc6baf 100644
--- a/lib/librte_timer/rte_timer.h
+++ b/lib/librte_timer/rte_timer.h
@@ -77,7 +77,7 @@ extern "C" {
 #define RTE_TIMER_RUNNING 2 /**< State: timer function is running. */
 #define RTE_TIMER_CONFIG  3 /**< State: timer is being configured. */
 
-#define RTE_TIMER_NO_OWNER -2 /**< Timer has no owner. */
+#define RTE_TIMER_NO_LCORE -2 /**< Not installed on/owned by any lcore. */
 
 /**
  * Timer type: Periodic or single (one-shot).
@@ -94,10 +94,11 @@ enum rte_timer_type {
 union rte_timer_status {
 	RTE_STD_C11
 	struct {
-		uint16_t state;  /**< Stop, pending, running, config. */
-		int16_t owner;   /**< The lcore that owns the timer. */
+		unsigned state	: 2;  /**< Stop, pending, running, config. */
+		int installer	: 15; /**< The lcore that installed the timer. */
+		int owner	: 15; /**< The lcore that owns the timer. */
 	};
-	uint32_t u32;            /**< To atomic-set status + owner. */
+	uint32_t u32;            /**< To atomic-set status, installer, owner */
 };
 
 #ifdef RTE_LIBRTE_TIMER_DEBUG
-- 
2.6.4
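
Not part of the patch: the reworked status word still packs into a
single 32-bit value, so state, installer, and owner can be read and
compare-and-set together, which the rte_atomic32_cmpset() calls above
rely on. A minimal standalone illustration; the union here is a
stand-in mirroring the patched rte_timer_status (build with -std=c11
for the anonymous struct, as RTE_STD_C11 implies), and the 4-byte
packing assumption holds on common ABIs:

#include <assert.h>
#include <stdint.h>

union status {			/* stand-in for union rte_timer_status */
	struct {
		unsigned state	: 2;	/* stop/pending/running/config */
		int installer	: 15;	/* lcore that installed the timer */
		int owner	: 15;	/* lcore that owns the timer */
	};
	uint32_t u32;		/* whole triple, settable in one store */
};

int
main(void)
{
	union status s = { .u32 = 0 };

	s.state = 1;		/* e.g. RTE_TIMER_PENDING */
	s.installer = 3;	/* installed from lcore 3 */
	s.owner = 7;		/* pending on lcore 7 */

	/* 2 + 15 + 15 bits pack into one 32-bit word */
	assert(sizeof(s) == sizeof(uint32_t));

	/* RTE_TIMER_NO_LCORE (-2) round-trips through the signed
	 * 15-bit fields */
	s.installer = -2;
	assert(s.installer == -2);
	return 0;
}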