[dpdk-dev] [PATCH v5 09/20] event/sw: add worker core functions
Jerin Jacob
jerin.jacob at caviumnetworks.com
Mon Mar 27 15:50:50 CEST 2017
On Fri, Mar 24, 2017 at 04:53:04PM +0000, Harry van Haaren wrote:
> From: Bruce Richardson <bruce.richardson at intel.com>
>
> Add the event enqueue, dequeue and release functions to the eventdev.
> These also include tracking of stats, for observability into the load
> on the scheduler.
> Internally in the enqueue function, the various types of enqueue
> operation (forwarding an existing event, sending a new event, dropping
> a previous event) are converted to a series of flags, which the
> scheduler code then uses to perform the needed actions for that
> event.
>
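The op-to-flag conversion described above is a per-op table lookup; a
minimal sketch of the idea (flag values illustrative, not necessarily
the patch's actual definitions):

	static const uint8_t sw_qe_flag_map[] = {
		QE_FLAG_VALID,                    /* RTE_EVENT_OP_NEW */
		QE_FLAG_VALID | QE_FLAG_COMPLETE, /* RTE_EVENT_OP_FORWARD */
		QE_FLAG_COMPLETE,                 /* RTE_EVENT_OP_RELEASE */
	};

sw_event_enqueue_burst() below indexes this map with each event's op
field.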
> Signed-off-by: Bruce Richardson <bruce.richardson at intel.com>
> Signed-off-by: Gage Eads <gage.eads at intel.com>
> Signed-off-by: Harry van Haaren <harry.van.haaren at intel.com>
> ---
> drivers/event/sw/Makefile | 1 +
> drivers/event/sw/sw_evdev.c | 5 +
> drivers/event/sw/sw_evdev.h | 32 +++++++
> drivers/event/sw/sw_evdev_worker.c | 188 +++++++++++++++++++++++++++++++++++++
> 4 files changed, 226 insertions(+)
> create mode 100644 drivers/event/sw/sw_evdev_worker.c
>
> diff --git a/drivers/event/sw/Makefile b/drivers/event/sw/Makefile
> index d6836e3..b6ecd91 100644
> --- a/drivers/event/sw/Makefile
> +++ b/drivers/event/sw/Makefile
> @@ -53,6 +53,7 @@ EXPORT_MAP := rte_pmd_evdev_sw_version.map
>
> # library source files
> SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev.c
> +SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev_worker.c
>
> # export include files
> SYMLINK-y-include +=
> diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c
> index 82ac3bd..9b2816d 100644
> --- a/drivers/event/sw/sw_evdev.c
> +++ b/drivers/event/sw/sw_evdev.c
> @@ -412,6 +412,7 @@ sw_dev_configure(const struct rte_eventdev *dev)
> sw->qid_count = conf->nb_event_queues;
> sw->port_count = conf->nb_event_ports;
> sw->nb_events_limit = conf->nb_events_limit;
> + rte_atomic32_set(&sw->inflights, 0);
>
> return 0;
> }
> @@ -550,6 +551,10 @@ sw_probe(const char *name, const char *params)
> return -EFAULT;
> }
> dev->dev_ops = &evdev_sw_ops;
> + dev->enqueue = sw_event_enqueue;
> + dev->enqueue_burst = sw_event_enqueue_burst;
> + dev->dequeue = sw_event_dequeue;
> + dev->dequeue_burst = sw_event_dequeue_burst;
Is all the code in sw_probe() valid for multi-process? If not, it can
return[1] from sw_probe() right after the function pointer assignment.
Just like the other PMDs, we would support the configuration API and
fast-path API in the primary process, while the secondary process is
limited to the fast-path functions.
[1]
	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
		return 0;
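That is, a minimal sketch of the suggested placement in sw_probe()
(surrounding lines taken from the patch above):

	dev->dev_ops = &evdev_sw_ops;
	dev->enqueue = sw_event_enqueue;
	dev->enqueue_burst = sw_event_enqueue_burst;
	dev->dequeue = sw_event_dequeue;
	dev->dequeue_burst = sw_event_dequeue_burst;

	/* secondary processes only get the fast-path functions */
	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
		return 0;

	sw = dev->data->dev_private;
	sw->data = dev->data;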
>
> sw = dev->data->dev_private;
> sw->data = dev->data;
> diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h
> index f5515e1..ab372fd 100644
> --- a/drivers/event/sw/sw_evdev.h
> +++ b/drivers/event/sw/sw_evdev.h
> @@ -55,12 +55,36 @@
> #define SCHED_DEQUEUE_BURST_SIZE 32
>
> +
> +static inline void
> +sw_event_release(struct sw_port *p, uint8_t index)
> +{
> + /*
> + * Drops the next outstanding event in our history. Used on dequeue
> + * to clear any history before dequeuing more events.
> + */
> + RTE_SET_USED(index);
> +
> + /* create drop message */
> + struct rte_event ev = {
> + .op = sw_qe_flag_map[RTE_EVENT_OP_RELEASE],
> + };
> +
> + uint16_t free_count;
> + qe_ring_enqueue_burst(p->rx_worker_ring, &ev, 1, &free_count);
> +
> + /* each release returns one credit */
> + p->outstanding_releases--;
> + p->inflight_credits++;
> +}
> +
> +uint16_t
> +sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
> +{
> + int32_t i;
> + uint8_t new_ops[PORT_ENQUEUE_MAX_BURST_SIZE];
> + struct sw_port *p = port;
> + struct sw_evdev *sw = (void *)p->sw;
> + uint32_t sw_inflights = rte_atomic32_read(&sw->inflights);
> +
> + if (p->inflight_max < sw_inflights)
> + return 0;
The likely() and unlikely() hints are missing in the fast-path
functions. Worth considering using them in the worker file.
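For instance, a sketch of the hints on the early-exit branches above
(likely()/unlikely() come from rte_branch_prediction.h; whether each
branch is really rare is a judgment call):

	/* enqueue should rarely be throttled or over-sized in a
	 * well-behaved application, so hint these as cold paths */
	if (unlikely(p->inflight_max < sw_inflights))
		return 0;

	if (unlikely(num > PORT_ENQUEUE_MAX_BURST_SIZE))
		num = PORT_ENQUEUE_MAX_BURST_SIZE;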
> + if (num > PORT_ENQUEUE_MAX_BURST_SIZE)
> + num = PORT_ENQUEUE_MAX_BURST_SIZE;
> +
> + if (p->inflight_credits < num) {
> + /* Check if sending events would bring instance over the
> + * max events threshold
> + */
> + uint32_t credit_update_quanta = sw->credit_update_quanta;
> + if (sw_inflights + credit_update_quanta > sw->nb_events_limit)
> + return 0;
> +
> + rte_atomic32_add(&sw->inflights, credit_update_quanta);
> + p->inflight_credits += (credit_update_quanta);
> +
> + if (p->inflight_credits < num)
> + return 0;
> + }
> +
> + for (i = 0; i < num; i++) {
> + int op = ev[i].op;
> + int outstanding = p->outstanding_releases > 0;
> + const uint8_t invalid_qid = (ev[i].queue_id >= sw->qid_count);
> +
> + p->inflight_credits -= (op == RTE_EVENT_OP_NEW);
> + p->inflight_credits += (op == RTE_EVENT_OP_RELEASE) *
> + outstanding;
> +
> + new_ops[i] = sw_qe_flag_map[op];
> + new_ops[i] &= ~(invalid_qid << QE_FLAG_VALID_SHIFT);
> +
> + /* FWD and RELEASE packets will both resolve to taken (assuming
> + * correct usage of the API), providing very high correct
> + * prediction rate.
> + */
> + if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding)
> + p->outstanding_releases--;
> + /* Branch to avoid touching p->stats except error case */
> + if (invalid_qid)
> + p->stats.rx_dropped++;
> + }
> +
> + /* returns number of events actually enqueued */
> + uint32_t enq = qe_ring_enqueue_burst_with_ops(p->rx_worker_ring, ev, i,
> + new_ops);
> + if (p->outstanding_releases == 0 && p->last_dequeue_burst_sz != 0) {
> + uint64_t burst_ticks = rte_get_timer_cycles() -
> + p->last_dequeue_ticks;
> + uint64_t burst_pkt_ticks =
> + burst_ticks / p->last_dequeue_burst_sz;
> + p->avg_pkt_ticks -= p->avg_pkt_ticks / NUM_SAMPLES;
> + p->avg_pkt_ticks += burst_pkt_ticks / NUM_SAMPLES;
> + p->last_dequeue_ticks = 0;
> + }
> + return enq;
> +}
> +
> +uint16_t
> +sw_event_enqueue(void *port, const struct rte_event *ev)
> +{
> + return sw_event_enqueue_burst(port, ev, 1);
> +}
> +
> +uint16_t
> +sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
> + uint64_t wait)
> +{
> + RTE_SET_USED(wait);
> + struct sw_port *p = (void *)port;
> + struct sw_evdev *sw = (void *)p->sw;
> + struct qe_ring *ring = p->cq_worker_ring;
> + uint32_t credit_update_quanta = sw->credit_update_quanta;
> +
> + /* check that all previous dequeues have been released */
> + if (!p->is_directed) {
> + uint16_t out_rels = p->outstanding_releases;
> + uint16_t i;
> + for (i = 0; i < out_rels; i++)
> + sw_event_release(p, i);
> + }
> +
> + /* Intel modification: may not be in final API */
> + if (ev == 0)
> + return 0;
Maybe we can remove this one from the fast path, and instead add it
under DEBUG in the common code.
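Something along these lines, as a sketch (the exact debug macro name
is an assumption):

#ifdef RTE_LIBRTE_EVENTDEV_DEBUG
	/* NULL-check the output array only in debug builds */
	if (ev == NULL)
		return 0;
#endif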
> +
> + /* returns number of events actually dequeued */
> + uint16_t ndeq = qe_ring_dequeue_burst(ring, ev, num);
> + if (ndeq == 0) {
> + p->outstanding_releases = 0;
> + p->zero_polls++;
> + p->total_polls++;
> + goto end;
> + }
> +
> + /* only add credits for directed ports - LB ports send RELEASEs */
> + p->inflight_credits += ndeq * p->is_directed;
> + p->outstanding_releases = ndeq;
> + p->last_dequeue_burst_sz = ndeq;
> + p->last_dequeue_ticks = rte_get_timer_cycles();
> + p->poll_buckets[(ndeq - 1) >> SW_DEQ_STAT_BUCKET_SHIFT]++;
> + p->total_polls++;
> +
> +end:
> + if (p->inflight_credits >= credit_update_quanta * 2 &&
> + p->inflight_credits > credit_update_quanta + ndeq) {
> + rte_atomic32_sub(&sw->inflights, credit_update_quanta);
> + p->inflight_credits -= credit_update_quanta;
> + }
> + return ndeq;
> +}
> +
> +uint16_t
> +sw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait)
> +{
> + return sw_event_dequeue_burst(port, ev, 1, wait);
> +}
> --
> 2.7.4
>