[dpdk-dev] [PATCH v3 1/1] eventdev: add new software event timer adapter
Mattias Rönnblom
mattias.ronnblom at ericsson.com
Fri Dec 14 22:15:05 CET 2018
On 2018-12-14 16:45, Erik Gabriel Carrillo wrote:
> This patch introduces a new version of the event timer adapter software
> PMD. In the original design, timer event producer lcores in the primary
> and secondary processes enqueued event timers into a ring, and a
> service core in the primary process dequeued them and processed them
> further. To improve performance, this version does away with the ring
> and lets lcores in both primary and secondary processes insert timers
> directly into timer skiplist data structures; the service core directly
> accesses the lists as well, when looking for timers that have expired.
>
> Signed-off-by: Erik Gabriel Carrillo <erik.g.carrillo at intel.com>
> ---
> lib/librte_eventdev/rte_event_timer_adapter.c | 688 +++++++++++---------------
> 1 file changed, 276 insertions(+), 412 deletions(-)
>
> diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c b/lib/librte_eventdev/rte_event_timer_adapter.c
> index 79070d4..029a45a 100644
> --- a/lib/librte_eventdev/rte_event_timer_adapter.c
> +++ b/lib/librte_eventdev/rte_event_timer_adapter.c
> @@ -19,6 +19,7 @@
> #include <rte_timer.h>
> #include <rte_service_component.h>
> #include <rte_cycles.h>
> +#include <rte_random.h>
You aren't using anything from rte_random.h.
/../
> -static __rte_always_inline uint16_t
> -__sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,
> - struct rte_event_timer **evtims,
> - uint16_t nb_evtims)
> +static uint16_t
> +__swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
> + struct rte_event_timer **evtims,
> + uint16_t nb_evtims)
> {
> - uint16_t i;
> - int ret;
> - struct rte_event_timer_adapter_sw_data *sw_data;
> - struct msg *msgs[nb_evtims];
> + int i, ret;
> + struct swtim *sw = swtim_pmd_priv(adapter);
> + uint32_t lcore_id = rte_lcore_id();
> + struct rte_timer *tim, *tims[nb_evtims];
> + uint64_t cycles;
>
> #ifdef RTE_LIBRTE_EVENTDEV_DEBUG
> /* Check that the service is running. */
> @@ -1101,101 +979,104 @@ __sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,
> }
> #endif
>
> - sw_data = adapter->data->adapter_priv;
> + /* Adjust lcore_id if non-EAL thread. Arbitrarily pick the timer list of
> + * the highest lcore to insert such timers into
> + */
> + if (lcore_id == LCORE_ID_ANY)
> + lcore_id = RTE_MAX_LCORE - 1;
> +
> + /* If this is the first time we're arming an event timer on this lcore,
> + * mark this lcore as "in use"; this will cause the service
> + * function to process the timer list that corresponds to this lcore.
> + */
> + if (unlikely(rte_atomic16_test_and_set(&sw->in_use[lcore_id].v))) {
> + rte_spinlock_lock(&sw->poll_lcores_sl);
> + EVTIM_LOG_DBG("Adding lcore id = %u to list of lcores to poll",
> + lcore_id);
> + sw->poll_lcores[sw->n_poll_lcores++] = lcore_id;
> + rte_spinlock_unlock(&sw->poll_lcores_sl);
> + }
>
> - ret = rte_mempool_get_bulk(sw_data->msg_pool, (void **)msgs, nb_evtims);
> + ret = rte_mempool_get_bulk(sw->tim_pool, (void **)tims,
> + nb_evtims);
> if (ret < 0) {
> rte_errno = ENOSPC;
> return 0;
> }
>
> - /* Let the service know we're producing messages for it to process */
> - rte_atomic16_inc(&sw_data->message_producer_count);
> -
> - /* If the service is managing timers, wait for it to finish */
> - while (sw_data->service_phase == 2)
> - rte_pause();
> -
> - rte_smp_rmb();
> -
> for (i = 0; i < nb_evtims; i++) {
> /* Don't modify the event timer state in these cases */
> if (evtims[i]->state == RTE_EVENT_TIMER_ARMED) {
> rte_errno = EALREADY;
> break;
> } else if (!(evtims[i]->state == RTE_EVENT_TIMER_NOT_ARMED ||
> - evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {
> + evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {
> rte_errno = EINVAL;
> break;
> }
>
> ret = check_timeout(evtims[i], adapter);
> - if (ret == -1) {
> + if (unlikely(ret == -1)) {
> evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOLATE;
> rte_errno = EINVAL;
> break;
> - }
> - if (ret == -2) {
> + } else if (unlikely(ret == -2)) {
> evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOEARLY;
> rte_errno = EINVAL;
> break;
> }
>
> - if (check_destination_event_queue(evtims[i], adapter) < 0) {
> + if (unlikely(check_destination_event_queue(evtims[i],
> + adapter) < 0)) {
> evtims[i]->state = RTE_EVENT_TIMER_ERROR;
> rte_errno = EINVAL;
> break;
> }
>
> - /* Checks passed, set up a message to enqueue */
> - msgs[i]->type = MSG_TYPE_ARM;
> - msgs[i]->evtim = evtims[i];
> + tim = tims[i];
> + rte_timer_init(tim);
>
> - /* Set the payload pointer if not set. */
> - if (evtims[i]->ev.event_ptr == NULL)
> - evtims[i]->ev.event_ptr = evtims[i];
> + evtims[i]->impl_opaque[0] = (uintptr_t)tim;
> + evtims[i]->impl_opaque[1] = (uintptr_t)adapter;
>
> - /* msg objects that get enqueued successfully will be freed
> - * either by a future cancel operation or by the timer
> - * expiration callback.
> - */
> - if (rte_ring_enqueue(sw_data->msg_ring, msgs[i]) < 0) {
> - rte_errno = ENOSPC;
> + cycles = get_timeout_cycles(evtims[i], adapter);
> + ret = rte_timer_alt_reset(sw->timer_data_id, tim, cycles,
> + SINGLE, lcore_id, NULL, evtims[i]);
> + if (ret < 0) {
> + /* tim was in RUNNING or CONFIG state */
> + evtims[i]->state = RTE_EVENT_TIMER_ERROR;
> break;
> }
>
> - EVTIM_LOG_DBG("enqueued ARM message to ring");
> -
> + rte_smp_wmb();
> + EVTIM_LOG_DBG("armed an event timer");
> evtims[i]->state = RTE_EVENT_TIMER_ARMED;
This looks like you want a reader to see the impl_opaque[] stores
before the state store, which sounds like a good idea.
However, I fail to find the corresponding read barriers on the reader
side. Shouldn't swtim_cancel_burst() have one? It loads state and
then loads impl_opaque[], but there's no guarantee that those memory
loads happen in program order.
More information about the dev
mailing list