[dpdk-dev] [PATCH 2/2] lib/timer: relax barrier for status update

Honnappa Nagarahalli Honnappa.Nagarahalli at arm.com
Wed Apr 8 23:16:13 CEST 2020


<snip>

> > Subject: [PATCH 2/2] lib/timer: relax barrier for status update
> >
> > Volatile has no ordering semantics. The rte_timer structure defines
> > timer status as a volatile variable and uses the rte_r/wmb barrier to
> > guarantee inter-thread visibility.
> >
> > This patch optimized the volatile operation with c11 atomic operations
> > and one-way barrier to save the performance penalty. According to the
> > timer_perf_autotest benchmarking results, this patch can uplift
> > 10%~16% timer appending performance, 3%~20% timer resetting
> > performance and 45% timer callbacks scheduling performance on aarch64
> > and no loss in performance for x86.
> >
> > Suggested-by: Honnappa Nagarahalli <honnappa.nagarahalli at arm.com>
> > Signed-off-by: Phil Yang <phil.yang at arm.com>
> > Reviewed-by: Gavin Hu <gavin.hu at arm.com>
> 
> Hi Phil,
> 
> It seems like the consensus is to generally avoid replacing rte_atomic_*
> interfaces with the GCC builtins directly.   In other areas of DPDK that are
> being patched, are the <std_atomic.h> C11 APIs going to be investigated?   It
> seems like that decision will apply here as well.
Agree. The new APIs are going to be 1 to 1 mapped with the built-in intrinsics (the memory orderings used themselves will not change). We should go ahead with the review and conclude any issues. Once the decision is made on what APIs to use, we can submit the next version using the APIs decided.

> 
> Thanks,
> Erik
> 
> > ---
> >  lib/librte_timer/rte_timer.c | 90 +++++++++++++++++++++++++++++++----
> > ---------
> >  lib/librte_timer/rte_timer.h |  2 +-
> >  2 files changed, 65 insertions(+), 27 deletions(-)
> >
> > diff --git a/lib/librte_timer/rte_timer.c
> > b/lib/librte_timer/rte_timer.c index 269e921..be0262d 100644
> > --- a/lib/librte_timer/rte_timer.c
> > +++ b/lib/librte_timer/rte_timer.c
> > @@ -10,7 +10,6 @@
> >  #include <assert.h>
> >  #include <sys/queue.h>
> >
> > -#include <rte_atomic.h>
> >  #include <rte_common.h>
> >  #include <rte_cycles.h>
> >  #include <rte_eal_memconfig.h>
> > @@ -218,7 +217,7 @@ rte_timer_init(struct rte_timer *tim)
> >
> >  	status.state = RTE_TIMER_STOP;
> >  	status.owner = RTE_TIMER_NO_OWNER;
> > -	tim->status.u32 = status.u32;
> > +	__atomic_store_n(&tim->status.u32, status.u32,
> > __ATOMIC_RELAXED);
> >  }
> >
> >  /*
> > @@ -239,9 +238,9 @@ timer_set_config_state(struct rte_timer *tim,
> >
> >  	/* wait that the timer is in correct status before update,
> >  	 * and mark it as being configured */
> > -	while (success == 0) {
> > -		prev_status.u32 = tim->status.u32;
> > +	prev_status.u32 = __atomic_load_n(&tim->status.u32,
> > __ATOMIC_RELAXED);
> >
> > +	while (success == 0) {
> >  		/* timer is running on another core
> >  		 * or ready to run on local core, exit
> >  		 */
> > @@ -258,9 +257,20 @@ timer_set_config_state(struct rte_timer *tim,
> >  		 * mark it atomically as being configured */
> >  		status.state = RTE_TIMER_CONFIG;
> >  		status.owner = (int16_t)lcore_id;
> > -		success = rte_atomic32_cmpset(&tim->status.u32,
> > -					      prev_status.u32,
> > -					      status.u32);
> > +		/* If status is observed as RTE_TIMER_CONFIG earlier,
> > +		 * that's not going to cause any issues because the
> > +		 * pattern is read for status then read the other members.
> > +		 * In one of the callers to timer_set_config_state
> > +		 * (the __rte_timer_reset) we set other members to the
> > +		 * structure (period, expire, f, arg) we want these
> > +		 * changes to be observed after our change to status.
> > +		 * So we need __ATOMIC_ACQUIRE here.
> > +		 */
> > +		success = __atomic_compare_exchange_n(&tim-
> > >status.u32,
> > +					      &prev_status.u32,
> > +					      status.u32, 0,
> > +					      __ATOMIC_ACQUIRE,
> > +					      __ATOMIC_RELAXED);
> >  	}
> >
> >  	ret_prev_status->u32 = prev_status.u32; @@ -279,20 +289,27 @@
> > timer_set_running_state(struct rte_timer *tim)
> >
> >  	/* wait that the timer is in correct status before update,
> >  	 * and mark it as running */
> > -	while (success == 0) {
> > -		prev_status.u32 = tim->status.u32;
> > +	prev_status.u32 = __atomic_load_n(&tim->status.u32,
> > __ATOMIC_RELAXED);
> >
> > +	while (success == 0) {
> >  		/* timer is not pending anymore */
> >  		if (prev_status.state != RTE_TIMER_PENDING)
> >  			return -1;
> >
> >  		/* here, we know that timer is stopped or pending,
> > -		 * mark it atomically as being configured */
> > +		 * mark it atomically as being running
> > +		 */
> >  		status.state = RTE_TIMER_RUNNING;
> >  		status.owner = (int16_t)lcore_id;
> > -		success = rte_atomic32_cmpset(&tim->status.u32,
> > -					      prev_status.u32,
> > -					      status.u32);
> > +		/* RUNNING states are acting as locked states. If the
> > +		 * timer is in RUNNING state, the state cannot be changed
> > +		 * by other threads. So, we should use ACQUIRE here.
> > +		 */
> > +		success = __atomic_compare_exchange_n(&tim-
> > >status.u32,
> > +					      &prev_status.u32,
> > +					      status.u32, 0,
> > +					      __ATOMIC_ACQUIRE,
> > +					      __ATOMIC_RELAXED);
> >  	}
> >
> >  	return 0;
> > @@ -520,10 +537,12 @@ __rte_timer_reset(struct rte_timer *tim,
> > uint64_t expire,
> >
> >  	/* update state: as we are in CONFIG state, only us can modify
> >  	 * the state so we don't need to use cmpset() here */
> > -	rte_wmb();
> >  	status.state = RTE_TIMER_PENDING;
> >  	status.owner = (int16_t)tim_lcore;
> > -	tim->status.u32 = status.u32;
> > +	/* The "RELEASE" ordering guarantees the memory operations above
> > +	 * the status update are observed before the update by all threads
> > +	 */
> > +	__atomic_store_n(&tim->status.u32, status.u32,
> > __ATOMIC_RELEASE);
> >
> >  	if (tim_lcore != lcore_id || !local_is_locked)
> >  		rte_spinlock_unlock(&priv_timer[tim_lcore].list_lock);
> > @@ -600,10 +619,12 @@ __rte_timer_stop(struct rte_timer *tim, int
> > local_is_locked,
> >  	}
> >
> >  	/* mark timer as stopped */
> > -	rte_wmb();
> >  	status.state = RTE_TIMER_STOP;
> >  	status.owner = RTE_TIMER_NO_OWNER;
> > -	tim->status.u32 = status.u32;
> > +	/* The "RELEASE" ordering guarantees the memory operations above
> > +	 * the status update are observed before the update by all threads
> > +	 */
> > +	__atomic_store_n(&tim->status.u32, status.u32,
> > __ATOMIC_RELEASE);
> >
> >  	return 0;
> >  }
> > @@ -637,7 +658,8 @@ rte_timer_stop_sync(struct rte_timer *tim)  int
> > rte_timer_pending(struct rte_timer *tim)  {
> > -	return tim->status.state == RTE_TIMER_PENDING;
> > +	return __atomic_load_n(&tim->status.state,
> > +				__ATOMIC_RELAXED) ==
> > RTE_TIMER_PENDING;
> >  }
> >
> >  /* must be called periodically, run all timer that expired */ @@
> > -739,8
> > +761,12 @@ __rte_timer_manage(struct rte_timer_data *timer_data)
> >  			/* remove from done list and mark timer as stopped
> */
> >  			status.state = RTE_TIMER_STOP;
> >  			status.owner = RTE_TIMER_NO_OWNER;
> > -			rte_wmb();
> > -			tim->status.u32 = status.u32;
> > +			/* The "RELEASE" ordering guarantees the memory
> > +			 * operations above the status update are observed
> > +			 * before the update by all threads
> > +			 */
> > +			__atomic_store_n(&tim->status.u32, status.u32,
> > +				__ATOMIC_RELEASE);
> >  		}
> >  		else {
> >  			/* keep it in list and mark timer as pending */ @@ -
> > 748,8 +774,12 @@ __rte_timer_manage(struct rte_timer_data *timer_data)
> >  			status.state = RTE_TIMER_PENDING;
> >  			__TIMER_STAT_ADD(priv_timer, pending, 1);
> >  			status.owner = (int16_t)lcore_id;
> > -			rte_wmb();
> > -			tim->status.u32 = status.u32;
> > +			/* The "RELEASE" ordering guarantees the memory
> > +			 * operations above the status update are observed
> > +			 * before the update by all threads
> > +			 */
> > +			__atomic_store_n(&tim->status.u32, status.u32,
> > +				__ATOMIC_RELEASE);
> >  			__rte_timer_reset(tim, tim->expire + tim->period,
> >  				tim->period, lcore_id, tim->f, tim->arg, 1,
> >  				timer_data);
> > @@ -919,8 +949,12 @@ rte_timer_alt_manage(uint32_t timer_data_id,
> >  			/* remove from done list and mark timer as stopped
> */
> >  			status.state = RTE_TIMER_STOP;
> >  			status.owner = RTE_TIMER_NO_OWNER;
> > -			rte_wmb();
> > -			tim->status.u32 = status.u32;
> > +			/* The "RELEASE" ordering guarantees the memory
> > +			 * operations above the status update are observed
> > +			 * before the update by all threads
> > +			 */
> > +			__atomic_store_n(&tim->status.u32, status.u32,
> > +				__ATOMIC_RELEASE);
> >  		} else {
> >  			/* keep it in list and mark timer as pending */
> >  			rte_spinlock_lock(
> > @@ -928,8 +962,12 @@ rte_timer_alt_manage(uint32_t timer_data_id,
> >  			status.state = RTE_TIMER_PENDING;
> >  			__TIMER_STAT_ADD(data->priv_timer, pending, 1);
> >  			status.owner = (int16_t)this_lcore;
> > -			rte_wmb();
> > -			tim->status.u32 = status.u32;
> > +			/* The "RELEASE" ordering guarantees the memory
> > +			 * operations above the status update are observed
> > +			 * before the update by all threads
> > +			 */
> > +			__atomic_store_n(&tim->status.u32, status.u32,
> > +				__ATOMIC_RELEASE);
> >  			__rte_timer_reset(tim, tim->expire + tim->period,
> >  				tim->period, this_lcore, tim->f, tim->arg, 1,
> >  				data);
> > diff --git a/lib/librte_timer/rte_timer.h
> > b/lib/librte_timer/rte_timer.h index c6b3d45..df533fa 100644
> > --- a/lib/librte_timer/rte_timer.h
> > +++ b/lib/librte_timer/rte_timer.h
> > @@ -101,7 +101,7 @@ struct rte_timer
> >  {
> >  	uint64_t expire;       /**< Time when timer expire. */
> >  	struct rte_timer *sl_next[MAX_SKIPLIST_DEPTH];
> > -	volatile union rte_timer_status status; /**< Status of timer. */
> > +	union rte_timer_status status; /**< Status of timer. */
> >  	uint64_t period;       /**< Period of timer (0 if not periodic). */
> >  	rte_timer_cb_t f;      /**< Callback function. */
> >  	void *arg;             /**< Argument to callback function. */
> > --
> > 2.7.4



More information about the dev mailing list