[dpdk-dev] [PATCH v2 1/1] eventdev: add new software event timer adapter
Mattias Rönnblom
mattias.ronnblom at ericsson.com
Sun Dec 9 20:17:22 CET 2018
On 2018-12-07 21:34, Erik Gabriel Carrillo wrote:
> This patch introduces a new version of the event timer adapter software
> PMD. In the original design, timer event producer lcores in the primary
> and secondary processes enqueued event timers into a ring, and a
> service core in the primary process dequeued them and processed them
> further. To improve performance, this version does away with the ring
> and lets lcores in both primary and secondary processes insert timers
> directly into timer skiplist data structures; the service core directly
> accesses the lists as well, when looking for timers that have expired.
>
> Signed-off-by: Erik Gabriel Carrillo <erik.g.carrillo at intel.com>
> ---
> lib/librte_eventdev/rte_event_timer_adapter.c | 687 +++++++++++---------------
> 1 file changed, 275 insertions(+), 412 deletions(-)
>
> diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c b/lib/librte_eventdev/rte_event_timer_adapter.c
> index 79070d4..9c528cb 100644
> --- a/lib/librte_eventdev/rte_event_timer_adapter.c
> +++ b/lib/librte_eventdev/rte_event_timer_adapter.c
> @@ -7,6 +7,7 @@
> #include <inttypes.h>
> #include <stdbool.h>
> #include <sys/queue.h>
> +#include <assert.h>
>
You have no assert() calls, from what I can see, so <assert.h> isn't needed.
Include <rte_debug.h> for the RTE_ASSERT() calls instead.
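I.e. something like this, as a sketch only:

-#include <assert.h>
+#include <rte_debug.h>

(If I recall correctly, RTE_ASSERT() expands to nothing unless
RTE_ENABLE_ASSERT is set, so it costs nothing in the default build.)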
> #include <rte_memzone.h>
> #include <rte_memory.h>
> @@ -19,6 +20,7 @@
> #include <rte_timer.h>
> #include <rte_service_component.h>
> #include <rte_cycles.h>
> +#include <rte_random.h>
>
> #include "rte_eventdev.h"
> #include "rte_eventdev_pmd.h"
> @@ -34,7 +36,7 @@ static int evtim_buffer_logtype;
>
> static struct rte_event_timer_adapter adapters[RTE_EVENT_TIMER_ADAPTER_NUM_MAX];
>
> -static const struct rte_event_timer_adapter_ops sw_event_adapter_timer_ops;
> +static const struct rte_event_timer_adapter_ops swtim_ops;
>
> #define EVTIM_LOG(level, logtype, ...) \
> rte_log(RTE_LOG_ ## level, logtype, \
> @@ -211,7 +213,7 @@ rte_event_timer_adapter_create_ext(
> * implementation.
> */
> if (adapter->ops == NULL)
> - adapter->ops = &sw_event_adapter_timer_ops;
> + adapter->ops = &swtim_ops;
>
> /* Allow driver to do some setup */
> FUNC_PTR_OR_NULL_RET_WITH_ERRNO(adapter->ops->init, -ENOTSUP);
> @@ -334,7 +336,7 @@ rte_event_timer_adapter_lookup(uint16_t adapter_id)
> * implementation.
> */
> if (adapter->ops == NULL)
> - adapter->ops = &sw_event_adapter_timer_ops;
> + adapter->ops = &swtim_ops;
>
> /* Set fast-path function pointers */
> adapter->arm_burst = adapter->ops->arm_burst;
> @@ -491,6 +493,7 @@ event_buffer_flush(struct event_buffer *bufp, uint8_t dev_id, uint8_t port_id,
> }
>
> *nb_events_inv = 0;
> +
> *nb_events_flushed = rte_event_enqueue_burst(dev_id, port_id,
> &events[tail_idx], n);
> if (*nb_events_flushed != n && rte_errno == -EINVAL) {
> @@ -498,137 +501,123 @@ event_buffer_flush(struct event_buffer *bufp, uint8_t dev_id, uint8_t port_id,
> (*nb_events_inv)++;
> }
>
> + if (*nb_events_flushed > 0)
> + EVTIM_BUF_LOG_DBG("enqueued %"PRIu16" timer events to event "
> + "device", *nb_events_flushed);
> +
> bufp->tail = bufp->tail + *nb_events_flushed + *nb_events_inv;
> }
>
> /*
> * Software event timer adapter implementation
> */
> -
> -struct rte_event_timer_adapter_sw_data {
> - /* List of messages for outstanding timers */
> - TAILQ_HEAD(, msg) msgs_tailq_head;
> - /* Lock to guard tailq and armed count */
> - rte_spinlock_t msgs_tailq_sl;
> +struct swtim {
> /* Identifier of service executing timer management logic. */
> uint32_t service_id;
> /* The cycle count at which the adapter should next tick */
> uint64_t next_tick_cycles;
> - /* Incremented as the service moves through phases of an iteration */
> - volatile int service_phase;
> /* The tick resolution used by adapter instance. May have been
> * adjusted from what user requested
> */
> uint64_t timer_tick_ns;
> /* Maximum timeout in nanoseconds allowed by adapter instance. */
> uint64_t max_tmo_ns;
> - /* Ring containing messages to arm or cancel event timers */
> - struct rte_ring *msg_ring;
> - /* Mempool containing msg objects */
> - struct rte_mempool *msg_pool;
> /* Buffered timer expiry events to be enqueued to an event device. */
> struct event_buffer buffer;
> /* Statistics */
> struct rte_event_timer_adapter_stats stats;
> - /* The number of threads currently adding to the message ring */
> - rte_atomic16_t message_producer_count;
> + /* Mempool of timer objects */
> + struct rte_mempool *tim_pool;
> + /* Back pointer for convenience */
> + struct rte_event_timer_adapter *adapter;
> + /* Identifier of timer data instance */
> + uint32_t timer_data_id;
> + /* Track which cores have actually armed a timer */
> + rte_atomic16_t in_use[RTE_MAX_LCORE];
> + /* Track which cores' timer lists should be polled */
> + unsigned int poll_lcores[RTE_MAX_LCORE];
> + /* The number of lists that should be polled */
> + int n_poll_lcores;
> + /* Lock to atomically access the above two variables */
> + rte_spinlock_t poll_lcores_sl;
> };
>
> -enum msg_type {MSG_TYPE_ARM, MSG_TYPE_CANCEL};
> -
> -struct msg {
> - enum msg_type type;
> - struct rte_event_timer *evtim;
> - struct rte_timer tim;
> - TAILQ_ENTRY(msg) msgs;
> -};
> +static inline struct swtim *
> +swtim_pmd_priv(const struct rte_event_timer_adapter *adapter)
> +{
> + return adapter->data->adapter_priv;
> +}
>
> static void
> -sw_event_timer_cb(struct rte_timer *tim, void *arg)
> +swtim_callback(void *arg)
> {
> - int ret;
> + struct rte_timer *tim = arg;
> + struct rte_event_timer *evtim = tim->arg;
> + struct rte_event_timer_adapter *adapter;
> + struct swtim *sw;
> uint16_t nb_evs_flushed = 0;
> uint16_t nb_evs_invalid = 0;
> uint64_t opaque;
> - struct rte_event_timer *evtim;
> - struct rte_event_timer_adapter *adapter;
> - struct rte_event_timer_adapter_sw_data *sw_data;
> + int ret;
>
> - evtim = arg;
> opaque = evtim->impl_opaque[1];
> adapter = (struct rte_event_timer_adapter *)(uintptr_t)opaque;
> - sw_data = adapter->data->adapter_priv;
> + sw = swtim_pmd_priv(adapter);
>
> - ret = event_buffer_add(&sw_data->buffer, &evtim->ev);
> + ret = event_buffer_add(&sw->buffer, &evtim->ev);
> if (ret < 0) {
> /* If event buffer is full, put timer back in list with
> * immediate expiry value, so that we process it again on the
> * next iteration.
> */
> - rte_timer_reset_sync(tim, 0, SINGLE, rte_lcore_id(),
> - sw_event_timer_cb, evtim);
> + rte_timer_alt_reset(sw->timer_data_id, tim, 0, SINGLE,
> + rte_lcore_id(), NULL, evtim);
> +
> + sw->stats.evtim_retry_count++;
>
> - sw_data->stats.evtim_retry_count++;
> EVTIM_LOG_DBG("event buffer full, resetting rte_timer with "
> "immediate expiry value");
> } else {
> - struct msg *m = container_of(tim, struct msg, tim);
> - TAILQ_REMOVE(&sw_data->msgs_tailq_head, m, msgs);
> EVTIM_BUF_LOG_DBG("buffered an event timer expiry event");
> - evtim->state = RTE_EVENT_TIMER_NOT_ARMED;
> + rte_mempool_put(sw->tim_pool, tim);
> + sw->stats.evtim_exp_count++;
>
> - /* Free the msg object containing the rte_timer now that
> - * we've buffered its event successfully.
> - */
> - rte_mempool_put(sw_data->msg_pool, m);
> -
> - /* Bump the count when we successfully add an expiry event to
> - * the buffer.
> - */
> - sw_data->stats.evtim_exp_count++;
> + evtim->state = RTE_EVENT_TIMER_NOT_ARMED;
> }
>
> - if (event_buffer_batch_ready(&sw_data->buffer)) {
> - event_buffer_flush(&sw_data->buffer,
> + if (event_buffer_batch_ready(&sw->buffer)) {
> + event_buffer_flush(&sw->buffer,
> adapter->data->event_dev_id,
> adapter->data->event_port_id,
> &nb_evs_flushed,
> &nb_evs_invalid);
>
> - sw_data->stats.ev_enq_count += nb_evs_flushed;
> - sw_data->stats.ev_inv_count += nb_evs_invalid;
> + sw->stats.ev_enq_count += nb_evs_flushed;
> + sw->stats.ev_inv_count += nb_evs_invalid;
> }
> }
>
> static __rte_always_inline uint64_t
> get_timeout_cycles(struct rte_event_timer *evtim,
> - struct rte_event_timer_adapter *adapter)
> + const struct rte_event_timer_adapter *adapter)
> {
> - uint64_t timeout_ns;
> - struct rte_event_timer_adapter_sw_data *sw_data;
> -
> - sw_data = adapter->data->adapter_priv;
> - timeout_ns = evtim->timeout_ticks * sw_data->timer_tick_ns;
> + struct swtim *sw = swtim_pmd_priv(adapter);
> + uint64_t timeout_ns = evtim->timeout_ticks * sw->timer_tick_ns;
> return timeout_ns * rte_get_timer_hz() / NSECPERSEC;
> -
> }
>
> /* This function returns true if one or more (adapter) ticks have occurred since
> * the last time it was called.
> */
> static inline bool
> -adapter_did_tick(struct rte_event_timer_adapter *adapter)
> +swtim_did_tick(struct swtim *sw)
> {
> uint64_t cycles_per_adapter_tick, start_cycles;
> uint64_t *next_tick_cyclesp;
> - struct rte_event_timer_adapter_sw_data *sw_data;
> -
> - sw_data = adapter->data->adapter_priv;
> - next_tick_cyclesp = &sw_data->next_tick_cycles;
>
> - cycles_per_adapter_tick = sw_data->timer_tick_ns *
> + next_tick_cyclesp = &sw->next_tick_cycles;
> + cycles_per_adapter_tick = sw->timer_tick_ns *
> (rte_get_timer_hz() / NSECPERSEC);
> -
> start_cycles = rte_get_timer_cycles();
>
> /* Note: initially, *next_tick_cyclesp == 0, so the clause below will
> @@ -640,7 +629,6 @@ adapter_did_tick(struct rte_event_timer_adapter *adapter)
> * boundary.
> */
> start_cycles -= start_cycles % cycles_per_adapter_tick;
> -
> *next_tick_cyclesp = start_cycles + cycles_per_adapter_tick;
>
> return true;
> @@ -655,15 +643,12 @@ check_timeout(struct rte_event_timer *evtim,
> const struct rte_event_timer_adapter *adapter)
> {
> uint64_t tmo_nsec;
> - struct rte_event_timer_adapter_sw_data *sw_data;
> -
> - sw_data = adapter->data->adapter_priv;
> - tmo_nsec = evtim->timeout_ticks * sw_data->timer_tick_ns;
> + struct swtim *sw = swtim_pmd_priv(adapter);
>
> - if (tmo_nsec > sw_data->max_tmo_ns)
> + tmo_nsec = evtim->timeout_ticks * sw->timer_tick_ns;
> + if (tmo_nsec > sw->max_tmo_ns)
> return -1;
> -
> - if (tmo_nsec < sw_data->timer_tick_ns)
> + if (tmo_nsec < sw->timer_tick_ns)
> return -2;
>
> return 0;
> @@ -691,110 +676,34 @@ check_destination_event_queue(struct rte_event_timer *evtim,
> return 0;
> }
>
> -#define NB_OBJS 32
> static int
> -sw_event_timer_adapter_service_func(void *arg)
> +swtim_service_func(void *arg)
> {
> - int i, num_msgs;
> - uint64_t cycles, opaque;
> + struct rte_event_timer_adapter *adapter = arg;
> + struct swtim *sw = swtim_pmd_priv(adapter);
> uint16_t nb_evs_flushed = 0;
> uint16_t nb_evs_invalid = 0;
> - struct rte_event_timer_adapter *adapter;
> - struct rte_event_timer_adapter_sw_data *sw_data;
> - struct rte_event_timer *evtim = NULL;
> - struct rte_timer *tim = NULL;
> - struct msg *msg, *msgs[NB_OBJS];
> -
> - adapter = arg;
> - sw_data = adapter->data->adapter_priv;
> -
> - sw_data->service_phase = 1;
> - rte_smp_wmb();
> -
> - while (rte_atomic16_read(&sw_data->message_producer_count) > 0 ||
> - !rte_ring_empty(sw_data->msg_ring)) {
> -
> - num_msgs = rte_ring_dequeue_burst(sw_data->msg_ring,
> - (void **)msgs, NB_OBJS, NULL);
> -
> - for (i = 0; i < num_msgs; i++) {
> - int ret = 0;
> -
> - RTE_SET_USED(ret);
> -
> - msg = msgs[i];
> - evtim = msg->evtim;
> -
> - switch (msg->type) {
> - case MSG_TYPE_ARM:
> - EVTIM_SVC_LOG_DBG("dequeued ARM message from "
> - "ring");
> - tim = &msg->tim;
> - rte_timer_init(tim);
> - cycles = get_timeout_cycles(evtim,
> - adapter);
> - ret = rte_timer_reset(tim, cycles, SINGLE,
> - rte_lcore_id(),
> - sw_event_timer_cb,
> - evtim);
> - RTE_ASSERT(ret == 0);
> -
> - evtim->impl_opaque[0] = (uintptr_t)tim;
> - evtim->impl_opaque[1] = (uintptr_t)adapter;
> -
> - TAILQ_INSERT_TAIL(&sw_data->msgs_tailq_head,
> - msg,
> - msgs);
> - break;
> - case MSG_TYPE_CANCEL:
> - EVTIM_SVC_LOG_DBG("dequeued CANCEL message "
> - "from ring");
> - opaque = evtim->impl_opaque[0];
> - tim = (struct rte_timer *)(uintptr_t)opaque;
> - RTE_ASSERT(tim != NULL);
> -
> - ret = rte_timer_stop(tim);
> - RTE_ASSERT(ret == 0);
> -
> - /* Free the msg object for the original arm
> - * request.
> - */
> - struct msg *m;
> - m = container_of(tim, struct msg, tim);
> - TAILQ_REMOVE(&sw_data->msgs_tailq_head, m,
> - msgs);
> - rte_mempool_put(sw_data->msg_pool, m);
> -
> - /* Free the msg object for the current msg */
> - rte_mempool_put(sw_data->msg_pool, msg);
> -
> - evtim->impl_opaque[0] = 0;
> - evtim->impl_opaque[1] = 0;
> -
> - break;
> - }
> - }
> - }
> -
> - sw_data->service_phase = 2;
> - rte_smp_wmb();
>
> - if (adapter_did_tick(adapter)) {
> - rte_timer_manage();
> + if (swtim_did_tick(sw)) {
> + /* This lock is seldom acquired on the arm side */
> + rte_spinlock_lock(&sw->poll_lcores_sl);
> + rte_timer_alt_manage(sw->timer_data_id,
> + sw->poll_lcores,
> + sw->n_poll_lcores,
> + swtim_callback);
> + rte_spinlock_unlock(&sw->poll_lcores_sl);
>
> - event_buffer_flush(&sw_data->buffer,
> + event_buffer_flush(&sw->buffer,
> adapter->data->event_dev_id,
> adapter->data->event_port_id,
> - &nb_evs_flushed, &nb_evs_invalid);
> + &nb_evs_flushed,
> + &nb_evs_invalid);
>
> - sw_data->stats.ev_enq_count += nb_evs_flushed;
> - sw_data->stats.ev_inv_count += nb_evs_invalid;
> - sw_data->stats.adapter_tick_count++;
> + sw->stats.ev_enq_count += nb_evs_flushed;
> + sw->stats.ev_inv_count += nb_evs_invalid;
> + sw->stats.adapter_tick_count++;
> }
>
> - sw_data->service_phase = 0;
> - rte_smp_wmb();
> -
> return 0;
> }
>
> @@ -828,168 +737,145 @@ compute_msg_mempool_cache_size(uint64_t nb_requested, uint64_t nb_actual)
> return cache_size;
> }
>
> -#define SW_MIN_INTERVAL 1E5
> -
> static int
> -sw_event_timer_adapter_init(struct rte_event_timer_adapter *adapter)
> +swtim_init(struct rte_event_timer_adapter *adapter)
> {
> - int ret;
> - struct rte_event_timer_adapter_sw_data *sw_data;
> - uint64_t nb_timers;
> + int i, ret;
> + struct swtim *sw;
> unsigned int flags;
> struct rte_service_spec service;
> - static bool timer_subsystem_inited; // static initialized to false
>
> - /* Allocate storage for SW implementation data */
> - char priv_data_name[RTE_RING_NAMESIZE];
> - snprintf(priv_data_name, RTE_RING_NAMESIZE, "sw_evtim_adap_priv_%"PRIu8,
> - adapter->data->id);
> - adapter->data->adapter_priv = rte_zmalloc_socket(
> - priv_data_name,
> - sizeof(struct rte_event_timer_adapter_sw_data),
> - RTE_CACHE_LINE_SIZE,
> - adapter->data->socket_id);
> - if (adapter->data->adapter_priv == NULL) {
> + /* Allocate storage for private data area */
> +#define SWTIM_NAMESIZE 32
> + char swtim_name[SWTIM_NAMESIZE];
> + snprintf(swtim_name, SWTIM_NAMESIZE, "swtim_%"PRIu8,
> + adapter->data->id);
> + sw = rte_zmalloc_socket(swtim_name, sizeof(*sw), RTE_CACHE_LINE_SIZE,
> + adapter->data->socket_id);
> + if (sw == NULL) {
> EVTIM_LOG_ERR("failed to allocate space for private data");
> rte_errno = ENOMEM;
> return -1;
> }
>
> - if (adapter->data->conf.timer_tick_ns < SW_MIN_INTERVAL) {
> - EVTIM_LOG_ERR("failed to create adapter with requested tick "
> - "interval");
> - rte_errno = EINVAL;
> - return -1;
> - }
> -
> - sw_data = adapter->data->adapter_priv;
> -
> - sw_data->timer_tick_ns = adapter->data->conf.timer_tick_ns;
> - sw_data->max_tmo_ns = adapter->data->conf.max_tmo_ns;
> + /* Connect storage to adapter instance */
> + adapter->data->adapter_priv = sw;
> + sw->adapter = adapter;
>
> - TAILQ_INIT(&sw_data->msgs_tailq_head);
> - rte_spinlock_init(&sw_data->msgs_tailq_sl);
> - rte_atomic16_init(&sw_data->message_producer_count);
> -
> - /* Rings require power of 2, so round up to next such value */
> - nb_timers = rte_align64pow2(adapter->data->conf.nb_timers);
> -
> - char msg_ring_name[RTE_RING_NAMESIZE];
> - snprintf(msg_ring_name, RTE_RING_NAMESIZE,
> - "sw_evtim_adap_msg_ring_%"PRIu8, adapter->data->id);
> - flags = adapter->data->conf.flags & RTE_EVENT_TIMER_ADAPTER_F_SP_PUT ?
> - RING_F_SP_ENQ | RING_F_SC_DEQ :
> - RING_F_SC_DEQ;
> - sw_data->msg_ring = rte_ring_create(msg_ring_name, nb_timers,
> - adapter->data->socket_id, flags);
> - if (sw_data->msg_ring == NULL) {
> - EVTIM_LOG_ERR("failed to create message ring");
> - rte_errno = ENOMEM;
> - goto free_priv_data;
> - }
> + sw->timer_tick_ns = adapter->data->conf.timer_tick_ns;
> + sw->max_tmo_ns = adapter->data->conf.max_tmo_ns;
>
> - char pool_name[RTE_RING_NAMESIZE];
> - snprintf(pool_name, RTE_RING_NAMESIZE, "sw_evtim_adap_msg_pool_%"PRIu8,
> + /* Create a timer pool */
> + char pool_name[SWTIM_NAMESIZE];
> + snprintf(pool_name, SWTIM_NAMESIZE, "swtim_pool_%"PRIu8,
> adapter->data->id);
> -
> - /* Both the arming/canceling thread and the service thread will do puts
> - * to the mempool, but if the SP_PUT flag is enabled, we can specify
> - * single-consumer get for the mempool.
> - */
> - flags = adapter->data->conf.flags & RTE_EVENT_TIMER_ADAPTER_F_SP_PUT ?
> - MEMPOOL_F_SC_GET : 0;
> -
> - /* The usable size of a ring is count - 1, so subtract one here to
> - * make the counts agree.
> - */
> + /* Optimal mempool size is a power of 2 minus one */
> + uint64_t nb_timers = rte_align64pow2(adapter->data->conf.nb_timers);
> int pool_size = nb_timers - 1;
> int cache_size = compute_msg_mempool_cache_size(
> adapter->data->conf.nb_timers, nb_timers);
> - sw_data->msg_pool = rte_mempool_create(pool_name, pool_size,
> - sizeof(struct msg), cache_size,
> - 0, NULL, NULL, NULL, NULL,
> - adapter->data->socket_id, flags);
> - if (sw_data->msg_pool == NULL) {
> - EVTIM_LOG_ERR("failed to create message object mempool");
> + flags = 0; /* pool is multi-producer, multi-consumer */
> + sw->tim_pool = rte_mempool_create(pool_name, pool_size,
> + sizeof(struct rte_timer), cache_size, 0, NULL, NULL,
> + NULL, NULL, adapter->data->socket_id, flags);
> + if (sw->tim_pool == NULL) {
> + EVTIM_LOG_ERR("failed to create timer object mempool");
> rte_errno = ENOMEM;
> - goto free_msg_ring;
> + goto free_alloc;
> + }
> +
> + /* Initialize the variables that track in-use timer lists */
> + rte_spinlock_init(&sw->poll_lcores_sl);
> + for (i = 0; i < RTE_MAX_LCORE; i++)
> + rte_atomic16_init(&sw->in_use[i]);
> +
> + /* Initialize the timer subsystem and allocate timer data instance */
> + ret = rte_timer_subsystem_init();
> + if (ret < 0) {
> + if (ret != -EALREADY) {
> + EVTIM_LOG_ERR("failed to initialize timer subsystem");
> + rte_errno = ret;
> + goto free_mempool;
> + }
> + }
> +
> + ret = rte_timer_data_alloc(&sw->timer_data_id);
> + if (ret < 0) {
> + EVTIM_LOG_ERR("failed to allocate timer data instance");
> + rte_errno = ret;
> + goto free_mempool;
> }
>
> - event_buffer_init(&sw_data->buffer);
> + /* Initialize timer event buffer */
> + event_buffer_init(&sw->buffer);
> +
> + sw->adapter = adapter;
>
> /* Register a service component to run adapter logic */
> memset(&service, 0, sizeof(service));
> snprintf(service.name, RTE_SERVICE_NAME_MAX,
> - "sw_evimer_adap_svc_%"PRIu8, adapter->data->id);
> + "swtim_svc_%"PRIu8, adapter->data->id);
> service.socket_id = adapter->data->socket_id;
> - service.callback = sw_event_timer_adapter_service_func;
> + service.callback = swtim_service_func;
> service.callback_userdata = adapter;
> service.capabilities &= ~(RTE_SERVICE_CAP_MT_SAFE);
> - ret = rte_service_component_register(&service, &sw_data->service_id);
> + ret = rte_service_component_register(&service, &sw->service_id);
> if (ret < 0) {
> EVTIM_LOG_ERR("failed to register service %s with id %"PRIu32
> - ": err = %d", service.name, sw_data->service_id,
> + ": err = %d", service.name, sw->service_id,
> ret);
>
> rte_errno = ENOSPC;
> - goto free_msg_pool;
> + goto free_mempool;
> }
>
> EVTIM_LOG_DBG("registered service %s with id %"PRIu32, service.name,
> - sw_data->service_id);
> + sw->service_id);
>
> - adapter->data->service_id = sw_data->service_id;
> + adapter->data->service_id = sw->service_id;
> adapter->data->service_inited = 1;
>
> - if (!timer_subsystem_inited) {
> - rte_timer_subsystem_init();
> - timer_subsystem_inited = true;
> - }
> -
> return 0;
> -
> -free_msg_pool:
> - rte_mempool_free(sw_data->msg_pool);
> -free_msg_ring:
> - rte_ring_free(sw_data->msg_ring);
> -free_priv_data:
> - rte_free(sw_data);
> +free_mempool:
> + rte_mempool_free(sw->tim_pool);
> +free_alloc:
> + rte_free(sw);
> return -1;
> }
>
> -static int
> -sw_event_timer_adapter_uninit(struct rte_event_timer_adapter *adapter)
> +static void
> +swtim_free_tim(struct rte_timer *tim, void *arg)
> {
> - int ret;
> - struct msg *m1, *m2;
> - struct rte_event_timer_adapter_sw_data *sw_data =
> - adapter->data->adapter_priv;
> + struct swtim *sw = arg;
>
> - rte_spinlock_lock(&sw_data->msgs_tailq_sl);
> -
> - /* Cancel outstanding rte_timers and free msg objects */
> - m1 = TAILQ_FIRST(&sw_data->msgs_tailq_head);
> - while (m1 != NULL) {
> - EVTIM_LOG_DBG("freeing outstanding timer");
> - m2 = TAILQ_NEXT(m1, msgs);
> -
> - rte_timer_stop_sync(&m1->tim);
> - rte_mempool_put(sw_data->msg_pool, m1);
> + rte_mempool_put(sw->tim_pool, (void *)tim);
> +}
No cast required; rte_mempool_put() takes the object as a plain void *, so
the pointer converts implicitly.
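I.e. the same function, with the cast dropped:

static void
swtim_free_tim(struct rte_timer *tim, void *arg)
{
	struct swtim *sw = arg;

	/* No explicit cast needed on 'tim' */
	rte_mempool_put(sw->tim_pool, tim);
}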
>
> - m1 = m2;
> - }
> +/* Traverse the list of outstanding timers and put them back in the mempool
> + * before freeing the adapter to avoid leaking the memory.
> + */
> +static int
> +swtim_uninit(struct rte_event_timer_adapter *adapter)
> +{
> + int ret;
> + struct swtim *sw = swtim_pmd_priv(adapter);
>
> - rte_spinlock_unlock(&sw_data->msgs_tailq_sl);
> + /* Free outstanding timers */
> + rte_timer_stop_all(sw->timer_data_id,
> + sw->poll_lcores,
> + sw->n_poll_lcores,
> + swtim_free_tim,
> + sw);
>
> - ret = rte_service_component_unregister(sw_data->service_id);
> + ret = rte_service_component_unregister(sw->service_id);
> if (ret < 0) {
> EVTIM_LOG_ERR("failed to unregister service component");
> return ret;
> }
>
> - rte_ring_free(sw_data->msg_ring);
> - rte_mempool_free(sw_data->msg_pool);
> - rte_free(adapter->data->adapter_priv);
> + rte_mempool_free(sw->tim_pool);
> + rte_free(sw);
> + adapter->data->adapter_priv = NULL;
>
> return 0;
> }
> @@ -1010,88 +896,79 @@ get_mapped_count_for_service(uint32_t service_id)
> }
>
> static int
> -sw_event_timer_adapter_start(const struct rte_event_timer_adapter *adapter)
> +swtim_start(const struct rte_event_timer_adapter *adapter)
> {
> int mapped_count;
> - struct rte_event_timer_adapter_sw_data *sw_data;
> -
> - sw_data = adapter->data->adapter_priv;
> + struct swtim *sw = swtim_pmd_priv(adapter);
>
> /* Mapping the service to more than one service core can introduce
> * delays while one thread is waiting to acquire a lock, so only allow
> * one core to be mapped to the service.
> + *
> + * Note: the service could be modified such that it spreads cores to
> + * poll over multiple service instances.
> */
> - mapped_count = get_mapped_count_for_service(sw_data->service_id);
> + mapped_count = get_mapped_count_for_service(sw->service_id);
>
> - if (mapped_count == 1)
> - return rte_service_component_runstate_set(sw_data->service_id,
> - 1);
> + if (mapped_count != 1)
> + return mapped_count < 1 ? -ENOENT : -ENOTSUP;
>
> - return mapped_count < 1 ? -ENOENT : -ENOTSUP;
> + return rte_service_component_runstate_set(sw->service_id, 1);
> }
>
> static int
> -sw_event_timer_adapter_stop(const struct rte_event_timer_adapter *adapter)
> +swtim_stop(const struct rte_event_timer_adapter *adapter)
> {
> int ret;
> - struct rte_event_timer_adapter_sw_data *sw_data =
> - adapter->data->adapter_priv;
> + struct swtim *sw = swtim_pmd_priv(adapter);
>
> - ret = rte_service_component_runstate_set(sw_data->service_id, 0);
> + ret = rte_service_component_runstate_set(sw->service_id, 0);
> if (ret < 0)
> return ret;
>
> - /* Wait for the service to complete its final iteration before
> - * stopping.
> - */
> - while (sw_data->service_phase != 0)
> + /* Wait for the service to complete its final iteration */
> + while (rte_service_may_be_active(sw->service_id))
> rte_pause();
>
> - rte_smp_rmb();
> -
> return 0;
> }
>
> static void
> -sw_event_timer_adapter_get_info(const struct rte_event_timer_adapter *adapter,
> +swtim_get_info(const struct rte_event_timer_adapter *adapter,
> struct rte_event_timer_adapter_info *adapter_info)
> {
> - struct rte_event_timer_adapter_sw_data *sw_data;
> - sw_data = adapter->data->adapter_priv;
> -
> - adapter_info->min_resolution_ns = sw_data->timer_tick_ns;
> - adapter_info->max_tmo_ns = sw_data->max_tmo_ns;
> + struct swtim *sw = swtim_pmd_priv(adapter);
> + adapter_info->min_resolution_ns = sw->timer_tick_ns;
> + adapter_info->max_tmo_ns = sw->max_tmo_ns;
> }
>
> static int
> -sw_event_timer_adapter_stats_get(const struct rte_event_timer_adapter *adapter,
> - struct rte_event_timer_adapter_stats *stats)
> +swtim_stats_get(const struct rte_event_timer_adapter *adapter,
> + struct rte_event_timer_adapter_stats *stats)
> {
> - struct rte_event_timer_adapter_sw_data *sw_data;
> - sw_data = adapter->data->adapter_priv;
> - *stats = sw_data->stats;
> + struct swtim *sw = swtim_pmd_priv(adapter);
> + *stats = sw->stats; /* structure copy */
> return 0;
> }
>
> static int
> -sw_event_timer_adapter_stats_reset(
> - const struct rte_event_timer_adapter *adapter)
> +swtim_stats_reset(const struct rte_event_timer_adapter *adapter)
> {
> - struct rte_event_timer_adapter_sw_data *sw_data;
> - sw_data = adapter->data->adapter_priv;
> - memset(&sw_data->stats, 0, sizeof(sw_data->stats));
> + struct swtim *sw = swtim_pmd_priv(adapter);
> + memset(&sw->stats, 0, sizeof(sw->stats));
> return 0;
> }
>
> -static __rte_always_inline uint16_t
> -__sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,
> - struct rte_event_timer **evtims,
> - uint16_t nb_evtims)
> +static uint16_t
> +__swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
> + struct rte_event_timer **evtims,
> + uint16_t nb_evtims)
> {
> - uint16_t i;
> - int ret;
> - struct rte_event_timer_adapter_sw_data *sw_data;
> - struct msg *msgs[nb_evtims];
> + int i, ret;
> + struct swtim *sw = swtim_pmd_priv(adapter);
> + uint32_t lcore_id = rte_lcore_id();
> + struct rte_timer *tim, *tims[nb_evtims];
> + uint64_t cycles;
>
> #ifdef RTE_LIBRTE_EVENTDEV_DEBUG
> /* Check that the service is running. */
> @@ -1101,101 +978,104 @@ __sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,
> }
> #endif
>
> - sw_data = adapter->data->adapter_priv;
> + /* Adjust lcore_id if non-EAL thread. Arbitrarily pick the timer list of
> + * the highest lcore to insert such timers into
> + */
> + if (lcore_id == LCORE_ID_ANY)
> + lcore_id = RTE_MAX_LCORE - 1;
> +
> + /* If this is the first time we're arming an event timer on this lcore,
> + * mark this lcore as "in use"; this will cause the service
> + * function to process the timer list that corresponds to this lcore.
> + */
> + if (unlikely(rte_atomic16_test_and_set(&sw->in_use[lcore_id]))) {
I suspect we have a performance-critical false sharing issue above: many (or
all) of these in_use flags will end up on the same cache line.
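One way around that (a sketch only; the struct and helper names below are
made up) would be to give each flag its own cache line:

#include <rte_atomic.h>
#include <rte_lcore.h>
#include <rte_memory.h>	/* __rte_cache_aligned */

/* Hypothetical layout: one in-use flag per cache line, so that
 * rte_atomic16_test_and_set() calls from different lcores don't
 * bounce the same line between cores.
 */
struct swtim_in_use {
	rte_atomic16_t v;
} __rte_cache_aligned;

static struct swtim_in_use in_use[RTE_MAX_LCORE];

/* Returns 1 only for the first arm on a given lcore. */
static inline int
mark_lcore_in_use(unsigned int lcore_id)
{
	return rte_atomic16_test_and_set(&in_use[lcore_id].v);
}

With a plain rte_atomic16_t array, as in the patch, something like 32 of the
flags share one 64-byte cache line.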
> + rte_spinlock_lock(&sw->poll_lcores_sl);
> + EVTIM_LOG_DBG("Adding lcore id = %u to list of lcores to poll",
> + lcore_id);
> + sw->poll_lcores[sw->n_poll_lcores++] = lcore_id;
> + rte_spinlock_unlock(&sw->poll_lcores_sl);
> + }
>
> - ret = rte_mempool_get_bulk(sw_data->msg_pool, (void **)msgs, nb_evtims);
> + ret = rte_mempool_get_bulk(sw->tim_pool, (void **)tims,
> + nb_evtims);
> if (ret < 0) {
> rte_errno = ENOSPC;
> return 0;
> }
>
> - /* Let the service know we're producing messages for it to process */
> - rte_atomic16_inc(&sw_data->message_producer_count);
> -
> - /* If the service is managing timers, wait for it to finish */
> - while (sw_data->service_phase == 2)
> - rte_pause();
> -
> - rte_smp_rmb();
> -
> for (i = 0; i < nb_evtims; i++) {
> /* Don't modify the event timer state in these cases */
> if (evtims[i]->state == RTE_EVENT_TIMER_ARMED) {
> rte_errno = EALREADY;
> break;
> } else if (!(evtims[i]->state == RTE_EVENT_TIMER_NOT_ARMED ||
> - evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {
> + evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {
> rte_errno = EINVAL;
> break;
> }
>
> ret = check_timeout(evtims[i], adapter);
> - if (ret == -1) {
> + if (unlikely(ret == -1)) {
> evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOLATE;
> rte_errno = EINVAL;
> break;
> - }
> - if (ret == -2) {
> + } else if (unlikely(ret == -2)) {
> evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOEARLY;
> rte_errno = EINVAL;
> break;
> }
>
> - if (check_destination_event_queue(evtims[i], adapter) < 0) {
> + if (unlikely(check_destination_event_queue(evtims[i],
> + adapter) < 0)) {
> evtims[i]->state = RTE_EVENT_TIMER_ERROR;
> rte_errno = EINVAL;
> break;
> }
>
> - /* Checks passed, set up a message to enqueue */
> - msgs[i]->type = MSG_TYPE_ARM;
> - msgs[i]->evtim = evtims[i];
> + tim = tims[i];
> + rte_timer_init(tim);
>
> - /* Set the payload pointer if not set. */
> - if (evtims[i]->ev.event_ptr == NULL)
> - evtims[i]->ev.event_ptr = evtims[i];
> + evtims[i]->impl_opaque[0] = (uintptr_t)tim;
> + evtims[i]->impl_opaque[1] = (uintptr_t)adapter;
>
> - /* msg objects that get enqueued successfully will be freed
> - * either by a future cancel operation or by the timer
> - * expiration callback.
> - */
> - if (rte_ring_enqueue(sw_data->msg_ring, msgs[i]) < 0) {
> - rte_errno = ENOSPC;
> + cycles = get_timeout_cycles(evtims[i], adapter);
> + ret = rte_timer_alt_reset(sw->timer_data_id, tim, cycles,
> + SINGLE, lcore_id, NULL, evtims[i]);
> + if (ret < 0) {
> + /* tim was in RUNNING or CONFIG state */
> + evtims[i]->state = RTE_EVENT_TIMER_ERROR;
> break;
> }
>
> - EVTIM_LOG_DBG("enqueued ARM message to ring");
> -
> + rte_smp_wmb();
> + EVTIM_LOG_DBG("armed an event timer");
> evtims[i]->state = RTE_EVENT_TIMER_ARMED;
> }
>
> - /* Let the service know we're done producing messages */
> - rte_atomic16_dec(&sw_data->message_producer_count);
> -
> if (i < nb_evtims)
> - rte_mempool_put_bulk(sw_data->msg_pool, (void **)&msgs[i],
> - nb_evtims - i);
> + rte_mempool_put_bulk(sw->tim_pool,
> + (void **)&tims[i], nb_evtims - i);
>
> return i;
> }
>
> static uint16_t
> -sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,
> - struct rte_event_timer **evtims,
> - uint16_t nb_evtims)
> +swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
> + struct rte_event_timer **evtims,
> + uint16_t nb_evtims)
> {
> - return __sw_event_timer_arm_burst(adapter, evtims, nb_evtims);
> + return __swtim_arm_burst(adapter, evtims, nb_evtims);
> }
>
> static uint16_t
> -sw_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter,
> - struct rte_event_timer **evtims,
> - uint16_t nb_evtims)
> +swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
> + struct rte_event_timer **evtims,
> + uint16_t nb_evtims)
> {
> - uint16_t i;
> - int ret;
> - struct rte_event_timer_adapter_sw_data *sw_data;
> - struct msg *msgs[nb_evtims];
> + int i, ret;
> + struct rte_timer *timp;
> + uint64_t opaque;
> + struct swtim *sw = swtim_pmd_priv(adapter);
>
> #ifdef RTE_LIBRTE_EVENTDEV_DEBUG
> /* Check that the service is running. */
> @@ -1205,23 +1085,6 @@ sw_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter,
> }
> #endif
>
> - sw_data = adapter->data->adapter_priv;
> -
> - ret = rte_mempool_get_bulk(sw_data->msg_pool, (void **)msgs, nb_evtims);
> - if (ret < 0) {
> - rte_errno = ENOSPC;
> - return 0;
> - }
> -
> - /* Let the service know we're producing messages for it to process */
> - rte_atomic16_inc(&sw_data->message_producer_count);
> -
> - /* If the service could be modifying event timer states, wait */
> - while (sw_data->service_phase == 2)
> - rte_pause();
> -
> - rte_smp_rmb();
> -
> for (i = 0; i < nb_evtims; i++) {
> /* Don't modify the event timer state in these cases */
> if (evtims[i]->state == RTE_EVENT_TIMER_CANCELED) {
> @@ -1232,54 +1095,54 @@ sw_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter,
> break;
> }
>
> - msgs[i]->type = MSG_TYPE_CANCEL;
> - msgs[i]->evtim = evtims[i];
> + opaque = evtims[i]->impl_opaque[0];
> + timp = (struct rte_timer *)(uintptr_t)opaque;
> + RTE_ASSERT(timp != NULL);
>
> - if (rte_ring_enqueue(sw_data->msg_ring, msgs[i]) < 0) {
> - rte_errno = ENOSPC;
> + ret = rte_timer_alt_stop(sw->timer_data_id, timp);
> + if (ret < 0) {
> + /* Timer is running or being configured */
> + rte_errno = EAGAIN;
> break;
> }
>
> - EVTIM_LOG_DBG("enqueued CANCEL message to ring");
> + rte_mempool_put(sw->tim_pool, (void **)timp);
>
> evtims[i]->state = RTE_EVENT_TIMER_CANCELED;
> - }
> + evtims[i]->impl_opaque[0] = 0;
> + evtims[i]->impl_opaque[1] = 0;
>
> - /* Let the service know we're done producing messages */
> - rte_atomic16_dec(&sw_data->message_producer_count);
> -
> - if (i < nb_evtims)
> - rte_mempool_put_bulk(sw_data->msg_pool, (void **)&msgs[i],
> - nb_evtims - i);
> + rte_smp_wmb();
> + }
>
> return i;
> }
>
> static uint16_t
> -sw_event_timer_arm_tmo_tick_burst(const struct rte_event_timer_adapter *adapter,
> - struct rte_event_timer **evtims,
> - uint64_t timeout_ticks,
> - uint16_t nb_evtims)
> +swtim_arm_tmo_tick_burst(const struct rte_event_timer_adapter *adapter,
> + struct rte_event_timer **evtims,
> + uint64_t timeout_ticks,
> + uint16_t nb_evtims)
> {
> int i;
>
> for (i = 0; i < nb_evtims; i++)
> evtims[i]->timeout_ticks = timeout_ticks;
>
> - return __sw_event_timer_arm_burst(adapter, evtims, nb_evtims);
> + return __swtim_arm_burst(adapter, evtims, nb_evtims);
> }
>
> -static const struct rte_event_timer_adapter_ops sw_event_adapter_timer_ops = {
> - .init = sw_event_timer_adapter_init,
> - .uninit = sw_event_timer_adapter_uninit,
> - .start = sw_event_timer_adapter_start,
> - .stop = sw_event_timer_adapter_stop,
> - .get_info = sw_event_timer_adapter_get_info,
> - .stats_get = sw_event_timer_adapter_stats_get,
> - .stats_reset = sw_event_timer_adapter_stats_reset,
> - .arm_burst = sw_event_timer_arm_burst,
> - .arm_tmo_tick_burst = sw_event_timer_arm_tmo_tick_burst,
> - .cancel_burst = sw_event_timer_cancel_burst,
> +static const struct rte_event_timer_adapter_ops swtim_ops = {
> + .init = swtim_init,
> + .uninit = swtim_uninit,
> + .start = swtim_start,
> + .stop = swtim_stop,
> + .get_info = swtim_get_info,
> + .stats_get = swtim_stats_get,
> + .stats_reset = swtim_stats_reset,
> + .arm_burst = swtim_arm_burst,
> + .arm_tmo_tick_burst = swtim_arm_tmo_tick_burst,
> + .cancel_burst = swtim_cancel_burst,
> };
>
> RTE_INIT(event_timer_adapter_init_log)
>