[dpdk-dev] [PATCH v3 1/3] timer: add multiple pending lists option for each lcore

Erik Gabriel Carrillo erik.g.carrillo at intel.com
Thu Sep 14 00:05:06 CEST 2017


This commit adds support for enabling multiple pending timer lists in
each lcore's priv_timer struct via a new function,
rte_timer_subsystem_set_multi_pendlists(); a single list is still
used by default. In the case that multiple lcores repeatedly install
timers on the same target lcore, this option reduces lock contention for
the target lcore's skiplists and increases performance.

Signed-off-by: Erik Gabriel Carrillo <erik.g.carrillo at intel.com>
---
v3:
* Added ability to enable multiple pending lists for a specified lcore
  with a new function, in response to feedback from Keith Wiles, Konstantin
  Ananyev, and Stephen Hemminger. This functionality is off by default,
  so that new overhead in rte_timer_manage() is minimized if the
  application doesn't want this feature.
* Updated map file to reflect addition of new function.
* Made minor updates to some variable names and logic structure.

v2:
* Address checkpatch warnings

 lib/librte_timer/rte_timer.c           | 376 ++++++++++++++++++++++-----------
 lib/librte_timer/rte_timer.h           |  29 ++-
 lib/librte_timer/rte_timer_version.map |   6 +
 3 files changed, 287 insertions(+), 124 deletions(-)

diff --git a/lib/librte_timer/rte_timer.c b/lib/librte_timer/rte_timer.c
index 5ee0840..1f4141e 100644
--- a/lib/librte_timer/rte_timer.c
+++ b/lib/librte_timer/rte_timer.c
@@ -37,6 +37,7 @@
 #include <inttypes.h>
 #include <assert.h>
 #include <sys/queue.h>
+#include <stdbool.h>
 
 #include <rte_atomic.h>
 #include <rte_common.h>
@@ -54,19 +55,24 @@
 
 #include "rte_timer.h"
 
+#define SINGLE_LIST_IDX 0 /* pending_lists[] index used in single-list mode */
+
 LIST_HEAD(rte_timer_list, rte_timer);
 
+struct skiplist { /* one pending-timer skiplist with its own lock */
+	struct rte_timer head;  /**< dummy timer instance to head up list */
+	rte_spinlock_t lock;	/**< lock to protect list access */
+	unsigned int depth;	/**< track the current depth of the skiplist */
+} __rte_cache_aligned;
+
 struct priv_timer {
-	struct rte_timer pending_head;  /**< dummy timer instance to head up list */
-	rte_spinlock_t list_lock;       /**< lock to protect list access */
+	/** one pending list per enabled lcore */
+	struct skiplist pending_lists[RTE_MAX_LCORE];
+	bool multi_pendlists;
 
 	/** per-core variable that true if a timer was updated on this
 	 *  core since last reset of the variable */
 	int updated;
-
-	/** track the current depth of the skiplist */
-	unsigned curr_skiplist_depth;
-
 	unsigned prev_lcore;              /**< used for lcore round robin */
 
 	/** running timer on this lcore now */
@@ -81,6 +87,10 @@ struct priv_timer {
 /** per-lcore private info for timers */
 static struct priv_timer priv_timer[RTE_MAX_LCORE];
 
+/** cache of IDs of enabled lcores, filled by rte_timer_subsystem_init() */
+static unsigned int enabled_lcores[RTE_MAX_LCORE];
+static int n_enabled_lcores; /**< number of valid enabled_lcores[] entries */
+
 /* when debug is enabled, store some statistics */
 #ifdef RTE_LIBRTE_TIMER_DEBUG
 #define __TIMER_STAT_ADD(name, n) do {					\
@@ -96,17 +106,60 @@ static struct priv_timer priv_timer[RTE_MAX_LCORE];
 void
 rte_timer_subsystem_init(void)
 {
-	unsigned lcore_id;
+	unsigned int lcore_id, pending_lists_idx;
+	struct skiplist *list;
+	struct priv_timer *priv_tim;
+	int i, j;
+
+	/* restart the cache so that a repeated init cannot overrun it */
+	n_enabled_lcores = 0;
+	RTE_LCORE_FOREACH(lcore_id)
+		enabled_lcores[n_enabled_lcores++] = lcore_id;
 
 	/* since priv_timer is static, it's zeroed by default, so only init some
 	 * fields.
 	 */
-	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id ++) {
-		rte_spinlock_init(&priv_timer[lcore_id].list_lock);
-		priv_timer[lcore_id].prev_lcore = lcore_id;
+	for (i = 0; i < n_enabled_lcores; i++) {
+		lcore_id = enabled_lcores[i];
+		priv_tim = &priv_timer[lcore_id];
+		priv_tim->prev_lcore = lcore_id;
+		priv_tim->multi_pendlists = false; /* single list by default */
+
+		for (j = 0; j < n_enabled_lcores; j++) {
+			pending_lists_idx = enabled_lcores[j];
+			list = &priv_tim->pending_lists[pending_lists_idx];
+			rte_spinlock_init(&list->lock);
+		}
+
+		/* Init the list at SINGLE_LIST_IDX, in case we didn't get to
+		 * it above. This will be the skiplist we use unless the
+		 * multi-pendlists option is later enabled for this lcore.
+		 */
+		list = &priv_tim->pending_lists[SINGLE_LIST_IDX];
+		rte_spinlock_init(&list->lock);
 	}
 }
 
+int
+rte_timer_subsystem_set_multi_pendlists(unsigned int lcore_id)
+{
+	struct skiplist *list;
+	int ret = -1; /* fails unless the mode switch below happens */
+
+	/* an out-of-range ID would index past the end of priv_timer[] */
+	if (lcore_id >= RTE_MAX_LCORE)
+		return ret;
+
+	list = &priv_timer[lcore_id].pending_lists[SINGLE_LIST_IDX];
+	rte_spinlock_lock(&list->lock);
+	if (list->head.sl_next[0] == NULL) { /* default list still empty */
+		priv_timer[lcore_id].multi_pendlists = true;
+		ret = 0;
+	}
+	rte_spinlock_unlock(&list->lock);
+	return ret;
+}
+
 /* Initialize the timer handle tim for use */
 void
 rte_timer_init(struct rte_timer *tim)
@@ -114,7 +167,8 @@ rte_timer_init(struct rte_timer *tim)
 	union rte_timer_status status;
 
 	status.state = RTE_TIMER_STOP;
-	status.owner = RTE_TIMER_NO_OWNER;
+	status.index = RTE_TIMER_NO_LCORE;
+	status.owner = RTE_TIMER_NO_LCORE;
 	tim->status.u32 = status.u32;
 }
 
@@ -142,7 +196,7 @@ timer_set_config_state(struct rte_timer *tim,
 		 * or ready to run on local core, exit
 		 */
 		if (prev_status.state == RTE_TIMER_RUNNING &&
-		    (prev_status.owner != (uint16_t)lcore_id ||
+		    (prev_status.owner != (int)lcore_id ||
 		     tim != priv_timer[lcore_id].running_tim))
 			return -1;
 
@@ -153,7 +207,8 @@ timer_set_config_state(struct rte_timer *tim,
 		/* here, we know that timer is stopped or pending,
 		 * mark it atomically as being configured */
 		status.state = RTE_TIMER_CONFIG;
-		status.owner = (int16_t)lcore_id;
+		status.index = RTE_TIMER_NO_LCORE;
+		status.owner = lcore_id;
 		success = rte_atomic32_cmpset(&tim->status.u32,
 					      prev_status.u32,
 					      status.u32);
@@ -185,7 +240,8 @@ timer_set_running_state(struct rte_timer *tim)
 		/* here, we know that timer is stopped or pending,
 		 * mark it atomically as being configured */
 		status.state = RTE_TIMER_RUNNING;
-		status.owner = (int16_t)lcore_id;
+		status.index = prev_status.index;
+		status.owner = lcore_id;
 		success = rte_atomic32_cmpset(&tim->status.u32,
 					      prev_status.u32,
 					      status.u32);
@@ -236,11 +292,11 @@ timer_get_skiplist_level(unsigned curr_depth)
  * are <= that time value.
  */
 static void
-timer_get_prev_entries(uint64_t time_val, unsigned tim_lcore,
+timer_get_prev_entries(uint64_t time_val, struct skiplist *list,
 		struct rte_timer **prev)
 {
-	unsigned lvl = priv_timer[tim_lcore].curr_skiplist_depth;
-	prev[lvl] = &priv_timer[tim_lcore].pending_head;
+	unsigned int lvl = list->depth;
+	prev[lvl] = &list->head;
 	while(lvl != 0) {
 		lvl--;
 		prev[lvl] = prev[lvl+1];
@@ -255,15 +311,15 @@ timer_get_prev_entries(uint64_t time_val, unsigned tim_lcore,
  * all skiplist levels.
  */
 static void
-timer_get_prev_entries_for_node(struct rte_timer *tim, unsigned tim_lcore,
+timer_get_prev_entries_for_node(struct rte_timer *tim, struct skiplist *list,
 		struct rte_timer **prev)
 {
 	int i;
 	/* to get a specific entry in the list, look for just lower than the time
 	 * values, and then increment on each level individually if necessary
 	 */
-	timer_get_prev_entries(tim->expire - 1, tim_lcore, prev);
-	for (i = priv_timer[tim_lcore].curr_skiplist_depth - 1; i >= 0; i--) {
+	timer_get_prev_entries(tim->expire - 1, list, prev);
+	for (i = list->depth - 1; i >= 0; i--) {
 		while (prev[i]->sl_next[i] != NULL &&
 				prev[i]->sl_next[i] != tim &&
 				prev[i]->sl_next[i]->expire <= tim->expire)
@@ -277,30 +333,39 @@ timer_get_prev_entries_for_node(struct rte_timer *tim, unsigned tim_lcore,
  * timer must not be in a list
  */
 static void
-timer_add(struct rte_timer *tim, unsigned tim_lcore, int local_is_locked)
+timer_add(struct rte_timer *tim, unsigned int tim_lcore, int local_is_locked,
+	  int *pending_lists_idx)
 {
 	unsigned lcore_id = rte_lcore_id();
 	unsigned lvl;
 	struct rte_timer *prev[MAX_SKIPLIST_DEPTH+1];
+	struct skiplist *list;
+	struct priv_timer *priv_tim = &priv_timer[tim_lcore];
+
+	if (priv_tim->multi_pendlists)
+		*pending_lists_idx = lcore_id;
+	else
+		*pending_lists_idx = SINGLE_LIST_IDX;
+
+	list = &priv_tim->pending_lists[*pending_lists_idx];
 
 	/* if timer needs to be scheduled on another core, we need to
 	 * lock the list; if it is on local core, we need to lock if
 	 * we are not called from rte_timer_manage() */
 	if (tim_lcore != lcore_id || !local_is_locked)
-		rte_spinlock_lock(&priv_timer[tim_lcore].list_lock);
+		rte_spinlock_lock(&list->lock);
 
 	/* find where exactly this element goes in the list of elements
 	 * for each depth. */
-	timer_get_prev_entries(tim->expire, tim_lcore, prev);
+	timer_get_prev_entries(tim->expire, list, prev);
 
 	/* now assign it a new level and add at that level */
-	const unsigned tim_level = timer_get_skiplist_level(
-			priv_timer[tim_lcore].curr_skiplist_depth);
-	if (tim_level == priv_timer[tim_lcore].curr_skiplist_depth)
-		priv_timer[tim_lcore].curr_skiplist_depth++;
+	const unsigned int tim_level = timer_get_skiplist_level(list->depth);
+	if (tim_level == list->depth)
+		list->depth++;
 
 	lvl = tim_level;
-	while (lvl > 0) {
+	while (lvl > 0 && lvl < MAX_SKIPLIST_DEPTH + 1) {
 		tim->sl_next[lvl] = prev[lvl]->sl_next[lvl];
 		prev[lvl]->sl_next[lvl] = tim;
 		lvl--;
@@ -310,11 +375,10 @@ timer_add(struct rte_timer *tim, unsigned tim_lcore, int local_is_locked)
 
 	/* save the lowest list entry into the expire field of the dummy hdr
 	 * NOTE: this is not atomic on 32-bit*/
-	priv_timer[tim_lcore].pending_head.expire = priv_timer[tim_lcore].\
-			pending_head.sl_next[0]->expire;
+	list->head.expire = list->head.sl_next[0]->expire;
 
 	if (tim_lcore != lcore_id || !local_is_locked)
-		rte_spinlock_unlock(&priv_timer[tim_lcore].list_lock);
+		rte_spinlock_unlock(&list->lock);
 }
 
 /*
@@ -330,35 +394,38 @@ timer_del(struct rte_timer *tim, union rte_timer_status prev_status,
 	unsigned prev_owner = prev_status.owner;
 	int i;
 	struct rte_timer *prev[MAX_SKIPLIST_DEPTH+1];
+	struct skiplist *list;
+
+	list = &priv_timer[prev_owner].pending_lists[prev_status.index];
 
 	/* if timer needs is pending another core, we need to lock the
 	 * list; if it is on local core, we need to lock if we are not
 	 * called from rte_timer_manage() */
 	if (prev_owner != lcore_id || !local_is_locked)
-		rte_spinlock_lock(&priv_timer[prev_owner].list_lock);
+		rte_spinlock_lock(&list->lock);
 
 	/* save the lowest list entry into the expire field of the dummy hdr.
 	 * NOTE: this is not atomic on 32-bit */
-	if (tim == priv_timer[prev_owner].pending_head.sl_next[0])
-		priv_timer[prev_owner].pending_head.expire =
-				((tim->sl_next[0] == NULL) ? 0 : tim->sl_next[0]->expire);
+	if (tim == list->head.sl_next[0])
+		list->head.expire = ((tim->sl_next[0] == NULL) ?
+				     0 : tim->sl_next[0]->expire);
 
 	/* adjust pointers from previous entries to point past this */
-	timer_get_prev_entries_for_node(tim, prev_owner, prev);
-	for (i = priv_timer[prev_owner].curr_skiplist_depth - 1; i >= 0; i--) {
+	timer_get_prev_entries_for_node(tim, list, prev);
+	for (i = list->depth - 1; i >= 0; i--) {
 		if (prev[i]->sl_next[i] == tim)
 			prev[i]->sl_next[i] = tim->sl_next[i];
 	}
 
 	/* in case we deleted last entry at a level, adjust down max level */
-	for (i = priv_timer[prev_owner].curr_skiplist_depth - 1; i >= 0; i--)
-		if (priv_timer[prev_owner].pending_head.sl_next[i] == NULL)
-			priv_timer[prev_owner].curr_skiplist_depth --;
+	for (i = list->depth - 1; i >= 0; i--)
+		if (list->head.sl_next[i] == NULL)
+			list->depth--;
 		else
 			break;
 
 	if (prev_owner != lcore_id || !local_is_locked)
-		rte_spinlock_unlock(&priv_timer[prev_owner].list_lock);
+		rte_spinlock_unlock(&list->lock);
 }
 
 /* Reset and start the timer associated with the timer handle (private func) */
@@ -371,15 +438,16 @@ __rte_timer_reset(struct rte_timer *tim, uint64_t expire,
 	union rte_timer_status prev_status, status;
 	int ret;
 	unsigned lcore_id = rte_lcore_id();
+	struct priv_timer *priv_tim = &priv_timer[lcore_id];
+	int pending_lists_index;
 
 	/* round robin for tim_lcore */
 	if (tim_lcore == (unsigned)LCORE_ID_ANY) {
 		if (lcore_id < RTE_MAX_LCORE) {
 			/* EAL thread with valid lcore_id */
-			tim_lcore = rte_get_next_lcore(
-				priv_timer[lcore_id].prev_lcore,
-				0, 1);
-			priv_timer[lcore_id].prev_lcore = tim_lcore;
+			tim_lcore = rte_get_next_lcore(priv_tim->prev_lcore, 0,
+						       1);
+			priv_tim->prev_lcore = tim_lcore;
 		} else
 			/* non-EAL thread do not run rte_timer_manage(),
 			 * so schedule the timer on the first enabled lcore. */
@@ -395,7 +463,7 @@ __rte_timer_reset(struct rte_timer *tim, uint64_t expire,
 	__TIMER_STAT_ADD(reset, 1);
 	if (prev_status.state == RTE_TIMER_RUNNING &&
 	    lcore_id < RTE_MAX_LCORE) {
-		priv_timer[lcore_id].updated = 1;
+		priv_tim->updated = 1;
 	}
 
 	/* remove it from list */
@@ -410,13 +478,14 @@ __rte_timer_reset(struct rte_timer *tim, uint64_t expire,
 	tim->arg = arg;
 
 	__TIMER_STAT_ADD(pending, 1);
-	timer_add(tim, tim_lcore, local_is_locked);
+	timer_add(tim, tim_lcore, local_is_locked, &pending_lists_index);
 
-	/* update state: as we are in CONFIG state, only us can modify
+	/* update state: as we are in CONFIG state, only we can modify
 	 * the state so we don't need to use cmpset() here */
 	rte_wmb();
 	status.state = RTE_TIMER_PENDING;
-	status.owner = (int16_t)tim_lcore;
+	status.index = pending_lists_index;
+	status.owner = tim_lcore;
 	tim->status.u32 = status.u32;
 
 	return 0;
@@ -484,7 +553,8 @@ rte_timer_stop(struct rte_timer *tim)
 	/* mark timer as stopped */
 	rte_wmb();
 	status.state = RTE_TIMER_STOP;
-	status.owner = RTE_TIMER_NO_OWNER;
+	status.index = RTE_TIMER_NO_LCORE;
+	status.owner = RTE_TIMER_NO_LCORE;
 	tim->status.u32 = status.u32;
 
 	return 0;
@@ -506,119 +576,185 @@ rte_timer_pending(struct rte_timer *tim)
 }
 
 /* must be called periodically, run all timer that expired */
-void rte_timer_manage(void)
+void
+rte_timer_manage(void)
 {
 	union rte_timer_status status;
-	struct rte_timer *tim, *next_tim;
-	struct rte_timer *run_first_tim, **pprev;
-	unsigned lcore_id = rte_lcore_id();
+	struct rte_timer *tim, *next_tim, **pprev;
+	struct rte_timer *run_first_tims[RTE_MAX_LCORE + 1];
 	struct rte_timer *prev[MAX_SKIPLIST_DEPTH + 1];
-	uint64_t cur_time;
-	int i, ret;
+	struct priv_timer *priv_tim;
+	unsigned int lcore_id = rte_lcore_id();
+	uint64_t cur_time, min_expire;
+	int i, j, min_idx, ret;
+	int npendlists, nrunlists = 0; /* lists to scan; run lists built */
+	int local_is_locked = 0;
+	struct skiplist *dest_list, *list = NULL;
+	bool done;
+	int idx;
 
 	/* timer manager only runs on EAL thread with valid lcore_id */
 	assert(lcore_id < RTE_MAX_LCORE);
 
+	priv_tim = &priv_timer[lcore_id];
+
 	__TIMER_STAT_ADD(manage, 1);
-	/* optimize for the case where per-cpu list is empty */
-	if (priv_timer[lcore_id].pending_head.sl_next[0] == NULL)
-		return;
-	cur_time = rte_get_timer_cycles();
+	npendlists = (priv_tim->multi_pendlists) ? n_enabled_lcores : 1;
+
+	for (i = 0; i < npendlists; i++) {
+		idx = (priv_tim->multi_pendlists) ? enabled_lcores[i] :
+		      SINGLE_LIST_IDX;
+		list = &priv_tim->pending_lists[idx];
+
+		/* optimize for the case where list is empty */
+		if (list->head.sl_next[0] == NULL)
+			continue;
+		cur_time = rte_get_timer_cycles();
 
 #ifdef RTE_ARCH_X86_64
-	/* on 64-bit the value cached in the pending_head.expired will be
-	 * updated atomically, so we can consult that for a quick check here
-	 * outside the lock */
-	if (likely(priv_timer[lcore_id].pending_head.expire > cur_time))
-		return;
+		/* on 64-bit the value cached in list->head.expire will be
+		 * updated atomically, so we can consult that for a quick check
+		 * here outside the lock
+		 */
+		if (likely(list->head.expire > cur_time))
+			continue;
 #endif
 
-	/* browse ordered list, add expired timers in 'expired' list */
-	rte_spinlock_lock(&priv_timer[lcore_id].list_lock);
+		/* browse ordered list, move expired timers to a run list */
+		rte_spinlock_lock(&list->lock);
 
-	/* if nothing to do just unlock and return */
-	if (priv_timer[lcore_id].pending_head.sl_next[0] == NULL ||
-	    priv_timer[lcore_id].pending_head.sl_next[0]->expire > cur_time) {
-		rte_spinlock_unlock(&priv_timer[lcore_id].list_lock);
-		return;
-	}
+		/* if nothing to do just unlock and continue */
+		if (list->head.sl_next[0] == NULL ||
+		    list->head.sl_next[0]->expire > cur_time) {
+			rte_spinlock_unlock(&list->lock);
+			continue;
+		}
 
-	/* save start of list of expired timers */
-	tim = priv_timer[lcore_id].pending_head.sl_next[0];
+		/* save start of list of expired timers */
+		tim = list->head.sl_next[0];
+
+		/* break the existing list at current time point */
+		timer_get_prev_entries(cur_time, list, prev);
+		for (j = list->depth - 1; j >= 0; j--) {
+			if (prev[j] == &list->head)
+				continue;
+			list->head.sl_next[j] =
+			    prev[j]->sl_next[j];
+			if (prev[j]->sl_next[j] == NULL)
+				list->depth--;
+			prev[j]->sl_next[j] = NULL;
+		}
 
-	/* break the existing list at current time point */
-	timer_get_prev_entries(cur_time, lcore_id, prev);
-	for (i = priv_timer[lcore_id].curr_skiplist_depth -1; i >= 0; i--) {
-		if (prev[i] == &priv_timer[lcore_id].pending_head)
-			continue;
-		priv_timer[lcore_id].pending_head.sl_next[i] =
-		    prev[i]->sl_next[i];
-		if (prev[i]->sl_next[i] == NULL)
-			priv_timer[lcore_id].curr_skiplist_depth--;
-		prev[i] ->sl_next[i] = NULL;
-	}
+		/* start a new run list; move its timers PENDING -> RUNNING */
+		run_first_tims[nrunlists] = tim;
+		pprev = &run_first_tims[nrunlists];
+		nrunlists++;
+
+		for (; tim != NULL; tim = next_tim) {
+			next_tim = tim->sl_next[0];
+
+			ret = timer_set_running_state(tim);
+			if (likely(ret == 0)) {
+				pprev = &tim->sl_next[0];
+			} else {
+				/* another core is trying to re-config this one,
+				 * remove it from local expired list
+				 */
+				*pprev = next_tim;
+			}
+		}
 
-	/* transition run-list from PENDING to RUNNING */
-	run_first_tim = tim;
-	pprev = &run_first_tim;
+		/* update the next to expire timer value */
+		list->head.expire = (list->head.sl_next[0] == NULL) ?
+				    0 : list->head.sl_next[0]->expire;
 
-	for ( ; tim != NULL; tim = next_tim) {
-		next_tim = tim->sl_next[0];
+		rte_spinlock_unlock(&list->lock);
+	}
 
-		ret = timer_set_running_state(tim);
-		if (likely(ret == 0)) {
-			pprev = &tim->sl_next[0];
-		} else {
-			/* another core is trying to re-config this one,
-			 * remove it from local expired list
-			 */
-			*pprev = next_tim;
+	/* Now process the run lists, earliest expire time first */
+	while (1) {
+		done = true;
+		min_expire = UINT64_MAX;
+		min_idx = 0;
+
+		/* Find the run list whose head timer expires earliest */
+		for (i = 0; i < nrunlists; i++) {
+			tim = run_first_tims[i];
+
+			if (tim != NULL && tim->expire < min_expire) {
+				min_expire = tim->expire;
+				min_idx = i;
+				done = false;
+			}
 		}
-	}
 
-	/* update the next to expire timer value */
-	priv_timer[lcore_id].pending_head.expire =
-	    (priv_timer[lcore_id].pending_head.sl_next[0] == NULL) ? 0 :
-		priv_timer[lcore_id].pending_head.sl_next[0]->expire;
+		if (done)
+			break;
+
+		tim = run_first_tims[min_idx];
 
-	rte_spinlock_unlock(&priv_timer[lcore_id].list_lock);
+		/* Move down the runlist from which we picked a timer to
+		 * execute
+		 */
+		run_first_tims[min_idx] = run_first_tims[min_idx]->sl_next[0];
 
-	/* now scan expired list and call callbacks */
-	for (tim = run_first_tim; tim != NULL; tim = next_tim) {
-		next_tim = tim->sl_next[0];
-		priv_timer[lcore_id].updated = 0;
-		priv_timer[lcore_id].running_tim = tim;
+		priv_tim->updated = 0;
+		priv_tim->running_tim = tim;
 
 		/* execute callback function with list unlocked */
 		tim->f(tim, tim->arg);
 
 		__TIMER_STAT_ADD(pending, -1);
 		/* the timer was stopped or reloaded by the callback
-		 * function, we have nothing to do here */
-		if (priv_timer[lcore_id].updated == 1)
+		 * function, we have nothing to do here
+		 */
+		if (priv_tim->updated == 1)
 			continue;
 
 		if (tim->period == 0) {
 			/* remove from done list and mark timer as stopped */
 			status.state = RTE_TIMER_STOP;
-			status.owner = RTE_TIMER_NO_OWNER;
+			status.index = RTE_TIMER_NO_LCORE;
+			status.owner = RTE_TIMER_NO_LCORE;
 			rte_wmb();
 			tim->status.u32 = status.u32;
 		}
 		else {
-			/* keep it in list and mark timer as pending */
-			rte_spinlock_lock(&priv_timer[lcore_id].list_lock);
+			idx = (priv_tim->multi_pendlists) ? lcore_id :
+			      SINGLE_LIST_IDX;
+			dest_list = &priv_tim->pending_lists[idx];
+
+			/* 'list' is the last list the scan loop visited; if
+			 * it is also the destination list, lock it here and
+			 * hold it across removal and insertion of the timer.
+			 */
+			if (list == dest_list) {
+				rte_spinlock_lock(&list->lock);
+				local_is_locked = 1;
+			}
+
+			/* set timer state back to PENDING (keeping its old
+			 * list index) and reinsert it in a pending list
+			 */
 			status.state = RTE_TIMER_PENDING;
 			__TIMER_STAT_ADD(pending, 1);
-			status.owner = (int16_t)lcore_id;
+			status.index = tim->status.index;
+			status.owner = lcore_id;
 			rte_wmb();
 			tim->status.u32 = status.u32;
-			__rte_timer_reset(tim, tim->expire + tim->period,
-				tim->period, lcore_id, tim->f, tim->arg, 1);
-			rte_spinlock_unlock(&priv_timer[lcore_id].list_lock);
+
+			__rte_timer_reset(tim,
+					  tim->expire + tim->period,
+					  tim->period, lcore_id,
+					  tim->f, tim->arg, local_is_locked);
+
+			if (local_is_locked) {
+				rte_spinlock_unlock(&list->lock);
+				local_is_locked = 0;
+			}
 		}
 	}
-	priv_timer[lcore_id].running_tim = NULL;
+	priv_tim->running_tim = NULL;
 }
 
 /* dump statistics about timers */
diff --git a/lib/librte_timer/rte_timer.h b/lib/librte_timer/rte_timer.h
index a276a73..5870b4a 100644
--- a/lib/librte_timer/rte_timer.h
+++ b/lib/librte_timer/rte_timer.h
@@ -77,7 +77,7 @@ extern "C" {
 #define RTE_TIMER_RUNNING 2 /**< State: timer function is running. */
 #define RTE_TIMER_CONFIG  3 /**< State: timer is being configured. */
 
-#define RTE_TIMER_NO_OWNER -2 /**< Timer has no owner. */
+#define RTE_TIMER_NO_LCORE -2 /**< No owner lcore / no pending-list index. */
 
 /**
  * Timer type: Periodic or single (one-shot).
@@ -94,10 +94,11 @@ enum rte_timer_type {
 union rte_timer_status {
 	RTE_STD_C11
 	struct {
-		uint16_t state;  /**< Stop, pending, running, config. */
-		int16_t owner;   /**< The lcore that owns the timer. */
+		unsigned state	: 2;  /**< Stop, pending, running, config. */
+		int index	: 15; /**< Owner's pending list holding it. */
+		int owner	: 15; /**< The lcore that owns the timer. */
 	};
-	uint32_t u32;            /**< To atomic-set status + owner. */
+	uint32_t u32;            /**< To atomic-set status, index, owner */
 };
 
 #ifdef RTE_LIBRTE_TIMER_DEBUG
@@ -168,6 +169,26 @@ struct rte_timer
 void rte_timer_subsystem_init(void);
 
 /**
+ * Enable multiple pending lists for specified lcore ID.
+ *
+ * This function enables the use of per-installer pending lists for a
+ * target lcore. When timers are being installed from multiple
+ * lcores simultaneously, this option can increase performance by
+ * reducing contention for the same pending list.
+ *
+ * This function should be called after the timer subsystem has been
+ * initialized, and before any timers have been started on the desired
+ * lcore.
+ *
+ * @param lcore_id
+ *  The lcore on which to enable multiple pending lists
+ * @return
+ *   - 0: Success
+ *   - (-1): Failure; pending timers already exist on the default list.
+ */
+int rte_timer_subsystem_set_multi_pendlists(unsigned int lcore_id);
+
+/**
  * Initialize a timer handle.
  *
  * The rte_timer_init() function initializes the timer handle *tim*
diff --git a/lib/librte_timer/rte_timer_version.map b/lib/librte_timer/rte_timer_version.map
index 9b2e4b8..7d4cbe8 100644
--- a/lib/librte_timer/rte_timer_version.map
+++ b/lib/librte_timer/rte_timer_version.map
@@ -13,3 +13,9 @@ DPDK_2.0 {
 
 	local: *;
 };
+
+DPDK_17.11 {
+	global:
+
+	rte_timer_subsystem_set_multi_pendlists;
+} DPDK_2.0;
-- 
2.6.4



More information about the dev mailing list