[dpdk-dev] [PATCH v3 1/1] eventdev: add new software event timer adapter

Mattias Rönnblom mattias.ronnblom at ericsson.com
Fri Dec 14 22:15:05 CET 2018


On 2018-12-14 16:45, Erik Gabriel Carrillo wrote:
> This patch introduces a new version of the event timer adapter software
> PMD. In the original design, timer event producer lcores in the primary
> and secondary processes enqueued event timers into a ring, and a
> service core in the primary process dequeued them and processed them
> further.  To improve performance, this version does away with the ring
> and lets lcores in both primary and secondary processes insert timers
> directly into timer skiplist data structures; the service core directly
> accesses the lists as well, when looking for timers that have expired.
> 
> Signed-off-by: Erik Gabriel Carrillo <erik.g.carrillo at intel.com>
> ---
>   lib/librte_eventdev/rte_event_timer_adapter.c | 688 +++++++++++---------------
>   1 file changed, 276 insertions(+), 412 deletions(-)
> 
> diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c b/lib/librte_eventdev/rte_event_timer_adapter.c
> index 79070d4..029a45a 100644
> --- a/lib/librte_eventdev/rte_event_timer_adapter.c
> +++ b/lib/librte_eventdev/rte_event_timer_adapter.c
> @@ -19,6 +19,7 @@
>   #include <rte_timer.h>
>   #include <rte_service_component.h>
>   #include <rte_cycles.h>
> +#include <rte_random.h>

You aren't using anything from rte_random.h.

/../

> -static __rte_always_inline uint16_t
> -__sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,
> -			  struct rte_event_timer **evtims,
> -			  uint16_t nb_evtims)
> +static uint16_t
> +__swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
> +		struct rte_event_timer **evtims,
> +		uint16_t nb_evtims)
>   {
> -	uint16_t i;
> -	int ret;
> -	struct rte_event_timer_adapter_sw_data *sw_data;
> -	struct msg *msgs[nb_evtims];
> +	int i, ret;
> +	struct swtim *sw = swtim_pmd_priv(adapter);
> +	uint32_t lcore_id = rte_lcore_id();
> +	struct rte_timer *tim, *tims[nb_evtims];
> +	uint64_t cycles;
>   
>   #ifdef RTE_LIBRTE_EVENTDEV_DEBUG
>   	/* Check that the service is running. */
> @@ -1101,101 +979,104 @@ __sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,
>   	}
>   #endif
>   
> -	sw_data = adapter->data->adapter_priv;
> +	/* Adjust lcore_id if non-EAL thread. Arbitrarily pick the timer list of
> +	 * the highest lcore to insert such timers into
> +	 */
> +	if (lcore_id == LCORE_ID_ANY)
> +		lcore_id = RTE_MAX_LCORE - 1;
> +
> +	/* If this is the first time we're arming an event timer on this lcore,
> +	 * mark this lcore as "in use"; this will cause the service
> +	 * function to process the timer list that corresponds to this lcore.
> +	 */
> +	if (unlikely(rte_atomic16_test_and_set(&sw->in_use[lcore_id].v))) {
> +		rte_spinlock_lock(&sw->poll_lcores_sl);
> +		EVTIM_LOG_DBG("Adding lcore id = %u to list of lcores to poll",
> +			      lcore_id);
> +		sw->poll_lcores[sw->n_poll_lcores++] = lcore_id;
> +		rte_spinlock_unlock(&sw->poll_lcores_sl);
> +	}
>   
> -	ret = rte_mempool_get_bulk(sw_data->msg_pool, (void **)msgs, nb_evtims);
> +	ret = rte_mempool_get_bulk(sw->tim_pool, (void **)tims,
> +				   nb_evtims);
>   	if (ret < 0) {
>   		rte_errno = ENOSPC;
>   		return 0;
>   	}
>   
> -	/* Let the service know we're producing messages for it to process */
> -	rte_atomic16_inc(&sw_data->message_producer_count);
> -
> -	/* If the service is managing timers, wait for it to finish */
> -	while (sw_data->service_phase == 2)
> -		rte_pause();
> -
> -	rte_smp_rmb();
> -
>   	for (i = 0; i < nb_evtims; i++) {
>   		/* Don't modify the event timer state in these cases */
>   		if (evtims[i]->state == RTE_EVENT_TIMER_ARMED) {
>   			rte_errno = EALREADY;
>   			break;
>   		} else if (!(evtims[i]->state == RTE_EVENT_TIMER_NOT_ARMED ||
> -		    evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {
> +			     evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {
>   			rte_errno = EINVAL;
>   			break;
>   		}
>   
>   		ret = check_timeout(evtims[i], adapter);
> -		if (ret == -1) {
> +		if (unlikely(ret == -1)) {
>   			evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOLATE;
>   			rte_errno = EINVAL;
>   			break;
> -		}
> -		if (ret == -2) {
> +		} else if (unlikely(ret == -2)) {
>   			evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOEARLY;
>   			rte_errno = EINVAL;
>   			break;
>   		}
>   
> -		if (check_destination_event_queue(evtims[i], adapter) < 0) {
> +		if (unlikely(check_destination_event_queue(evtims[i],
> +							   adapter) < 0)) {
>   			evtims[i]->state = RTE_EVENT_TIMER_ERROR;
>   			rte_errno = EINVAL;
>   			break;
>   		}
>   
> -		/* Checks passed, set up a message to enqueue */
> -		msgs[i]->type = MSG_TYPE_ARM;
> -		msgs[i]->evtim = evtims[i];
> +		tim = tims[i];
> +		rte_timer_init(tim);
>   
> -		/* Set the payload pointer if not set. */
> -		if (evtims[i]->ev.event_ptr == NULL)
> -			evtims[i]->ev.event_ptr = evtims[i];
> +		evtims[i]->impl_opaque[0] = (uintptr_t)tim;
> +		evtims[i]->impl_opaque[1] = (uintptr_t)adapter;
>   
> -		/* msg objects that get enqueued successfully will be freed
> -		 * either by a future cancel operation or by the timer
> -		 * expiration callback.
> -		 */
> -		if (rte_ring_enqueue(sw_data->msg_ring, msgs[i]) < 0) {
> -			rte_errno = ENOSPC;
> +		cycles = get_timeout_cycles(evtims[i], adapter);
> +		ret = rte_timer_alt_reset(sw->timer_data_id, tim, cycles,
> +					  SINGLE, lcore_id, NULL, evtims[i]);
> +		if (ret < 0) {
> +			/* tim was in RUNNING or CONFIG state */
> +			evtims[i]->state = RTE_EVENT_TIMER_ERROR;
>   			break;
>   		}
>   
> -		EVTIM_LOG_DBG("enqueued ARM message to ring");
> -
> +		rte_smp_wmb();
> +		EVTIM_LOG_DBG("armed an event timer");
>   		evtims[i]->state = RTE_EVENT_TIMER_ARMED;

This looks like you want a reader to see the impl_opaque[] stores 
before the state store, which sounds like a good idea.

However, I fail to find the corresponding read barriers on the reader 
side. Shouldn't swtim_cancel_burst() have them? It's loading state, and 
loading impl_opaque[], but there's no guarantee those memory loads 
happen in program order.


More information about the dev mailing list