[dpdk-dev] [PATCH v5 09/20] event/sw: add worker core functions

Jerin Jacob jerin.jacob at caviumnetworks.com
Mon Mar 27 15:50:50 CEST 2017


On Fri, Mar 24, 2017 at 04:53:04PM +0000, Harry van Haaren wrote:
> From: Bruce Richardson <bruce.richardson at intel.com>
> 
> Add the event enqueue, dequeue and release functions to the eventdev.
> These also include tracking of stats for observability of the load on
> the scheduler.
> Internally, in the enqueue function, the various types of enqueue
> operation (forwarding an existing event, sending a new event,
> dropping a previous event) are converted to a series of flags which
> will be used by the scheduler code to perform the needed actions for
> that event.
> 
> Signed-off-by: Bruce Richardson <bruce.richardson at intel.com>
> Signed-off-by: Gage Eads <gage.eads at intel.com>
> Signed-off-by: Harry van Haaren <harry.van.haaren at intel.com>
> ---
>  drivers/event/sw/Makefile          |   1 +
>  drivers/event/sw/sw_evdev.c        |   5 +
>  drivers/event/sw/sw_evdev.h        |  32 +++++++
>  drivers/event/sw/sw_evdev_worker.c | 188 +++++++++++++++++++++++++++++++++++++
>  4 files changed, 226 insertions(+)
>  create mode 100644 drivers/event/sw/sw_evdev_worker.c
> 
> diff --git a/drivers/event/sw/Makefile b/drivers/event/sw/Makefile
> index d6836e3..b6ecd91 100644
> --- a/drivers/event/sw/Makefile
> +++ b/drivers/event/sw/Makefile
> @@ -53,6 +53,7 @@ EXPORT_MAP := rte_pmd_evdev_sw_version.map
>  
>  # library source files
>  SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev.c
> +SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev_worker.c
>  
>  # export include files
>  SYMLINK-y-include +=
> diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c
> index 82ac3bd..9b2816d 100644
> --- a/drivers/event/sw/sw_evdev.c
> +++ b/drivers/event/sw/sw_evdev.c
> @@ -412,6 +412,7 @@ sw_dev_configure(const struct rte_eventdev *dev)
>  	sw->qid_count = conf->nb_event_queues;
>  	sw->port_count = conf->nb_event_ports;
>  	sw->nb_events_limit = conf->nb_events_limit;
> +	rte_atomic32_set(&sw->inflights, 0);
>  
>  	return 0;
>  }
> @@ -550,6 +551,10 @@ sw_probe(const char *name, const char *params)
>  		return -EFAULT;
>  	}
>  	dev->dev_ops = &evdev_sw_ops;
> +	dev->enqueue = sw_event_enqueue;
> +	dev->enqueue_burst = sw_event_enqueue_burst;
> +	dev->dequeue = sw_event_dequeue;
> +	dev->dequeue_burst = sw_event_dequeue_burst;

Is all the code in sw_probe() valid for multi-process? If not, it can
return[1] from sw_probe() right after the function pointer assignment.
Just like the other PMDs, we will support the configuration API and
fast-path API in the primary process, and the secondary process will
be limited to the fast-path functions.

[1]
	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
		return 0;
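
For reference, here is roughly where it would slot in for this PMD
(an untested sketch, assuming everything after the function pointer
assignment in sw_probe() is primary-only setup):

	dev->dev_ops = &evdev_sw_ops;
	dev->enqueue = sw_event_enqueue;
	dev->enqueue_burst = sw_event_enqueue_burst;
	dev->dequeue = sw_event_dequeue;
	dev->dequeue_burst = sw_event_dequeue_burst;

	/* secondary processes only need the fast-path function
	 * pointers; leave the rest of the setup to the primary
	 */
	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
		return 0;

	sw = dev->data->dev_private;
	sw->data = dev->data;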

>  
>  	sw = dev->data->dev_private;
>  	sw->data = dev->data;
> diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h
> index f5515e1..ab372fd 100644
> --- a/drivers/event/sw/sw_evdev.h
> +++ b/drivers/event/sw/sw_evdev.h
> @@ -55,12 +55,36 @@
>  #define SCHED_DEQUEUE_BURST_SIZE 32
>  
> +
> +static inline void
> +sw_event_release(struct sw_port *p, uint8_t index)
> +{
> +	/*
> +	 * Drops the next outstanding event in our history. Used on dequeue
> +	 * to clear any history before dequeuing more events.
> +	 */
> +	RTE_SET_USED(index);
> +
> +	/* create drop message */
> +	struct rte_event ev = {
> +		.op = sw_qe_flag_map[RTE_EVENT_OP_RELEASE],
> +	};
> +
> +	uint16_t free_count;
> +	qe_ring_enqueue_burst(p->rx_worker_ring, &ev, 1, &free_count);
> +
> +	/* each release returns one credit */
> +	p->outstanding_releases--;
> +	p->inflight_credits++;
> +}
> +
> +uint16_t
> +sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
> +{
> +	int32_t i;
> +	uint8_t new_ops[PORT_ENQUEUE_MAX_BURST_SIZE];
> +	struct sw_port *p = port;
> +	struct sw_evdev *sw = (void *)p->sw;
> +	uint32_t sw_inflights = rte_atomic32_read(&sw->inflights);
> +
> +	if (p->inflight_max < sw_inflights)
> +		return 0;

likely()/unlikely() branch hints are missing in the fast-path
functions. Worth considering using those in the worker file.
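
For example (an untested sketch; which branches are worth annotating
is your call), the early exits in sw_event_enqueue_burst() look like
candidates:

	if (unlikely(p->inflight_max < sw_inflights))
		return 0;

	if (unlikely(num > PORT_ENQUEUE_MAX_BURST_SIZE))
		num = PORT_ENQUEUE_MAX_BURST_SIZE;

likely()/unlikely() are available from rte_branch_prediction.h.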

> +	if (num > PORT_ENQUEUE_MAX_BURST_SIZE)
> +		num = PORT_ENQUEUE_MAX_BURST_SIZE;
> +
> +	if (p->inflight_credits < num) {
> +		/* Check if sending events would bring instance over the
> +		 * max events threshold
> +		 */
> +		uint32_t credit_update_quanta = sw->credit_update_quanta;
> +		if (sw_inflights + credit_update_quanta > sw->nb_events_limit)
> +			return 0;
> +
> +		rte_atomic32_add(&sw->inflights, credit_update_quanta);
> +		p->inflight_credits += (credit_update_quanta);
> +
> +		if (p->inflight_credits < num)
> +			return 0;
> +	}
> +
> +	for (i = 0; i < num; i++) {
> +		int op = ev[i].op;
> +		int outstanding = p->outstanding_releases > 0;
> +		const uint8_t invalid_qid = (ev[i].queue_id >= sw->qid_count);
> +
> +		p->inflight_credits -= (op == RTE_EVENT_OP_NEW);
> +		p->inflight_credits += (op == RTE_EVENT_OP_RELEASE) *
> +					outstanding;
> +
> +		new_ops[i] = sw_qe_flag_map[op];
> +		new_ops[i] &= ~(invalid_qid << QE_FLAG_VALID_SHIFT);
> +
> +		/* FWD and RELEASE packets will both resolve to taken (assuming
> +		 * correct usage of the API), providing very high correct
> +		 * prediction rate.
> +		 */
> +		if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding)
> +			p->outstanding_releases--;
> +		/* Branch to avoid touching p->stats except error case */
> +		if (invalid_qid)
> +			p->stats.rx_dropped++;
> +	}
> +
> +	/* returns number of events actually enqueued */
> +	uint32_t enq = qe_ring_enqueue_burst_with_ops(p->rx_worker_ring, ev, i,
> +					     new_ops);
> +	if (p->outstanding_releases == 0 && p->last_dequeue_burst_sz != 0) {
> +		uint64_t burst_ticks = rte_get_timer_cycles() -
> +				p->last_dequeue_ticks;
> +		uint64_t burst_pkt_ticks =
> +			burst_ticks / p->last_dequeue_burst_sz;
> +		p->avg_pkt_ticks -= p->avg_pkt_ticks / NUM_SAMPLES;
> +		p->avg_pkt_ticks += burst_pkt_ticks / NUM_SAMPLES;
> +		p->last_dequeue_ticks = 0;
> +	}
> +	return enq;
> +}
> +
> +uint16_t
> +sw_event_enqueue(void *port, const struct rte_event *ev)
> +{
> +	return sw_event_enqueue_burst(port, ev, 1);
> +}
> +
> +uint16_t
> +sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
> +		uint64_t wait)
> +{
> +	RTE_SET_USED(wait);
> +	struct sw_port *p = (void *)port;
> +	struct sw_evdev *sw = (void *)p->sw;
> +	struct qe_ring *ring = p->cq_worker_ring;
> +	uint32_t credit_update_quanta = sw->credit_update_quanta;
> +
> +	/* check that all previous dequeues have been released */
> +	if (!p->is_directed) {
> +		uint16_t out_rels = p->outstanding_releases;
> +		uint16_t i;
> +		for (i = 0; i < out_rels; i++)
> +			sw_event_release(p, i);
> +	}
> +
> +	/* Intel modification: may not be in final API */
> +	if (ev == 0)
> +		return 0;

Maybe we can remove this one from the fast path. Instead, we can add
it under DEBUG in the common code.
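
Something like the below (a sketch only; assuming the existing
RTE_LIBRTE_EVENTDEV_DEBUG build option is the right gate), so that
release builds do not pay for the check:

#ifdef RTE_LIBRTE_EVENTDEV_DEBUG
	/* reject a NULL event array in debug builds only */
	if (ev == NULL) {
		rte_errno = EINVAL;
		return 0;
	}
#endif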

> +
> +	/* returns number of events actually dequeued */
> +	uint16_t ndeq = qe_ring_dequeue_burst(ring, ev, num);
> +	if (ndeq == 0) {
> +		p->outstanding_releases = 0;
> +		p->zero_polls++;
> +		p->total_polls++;
> +		goto end;
> +	}
> +
> +	/* only add credits for directed ports - LB ports send RELEASEs */
> +	p->inflight_credits += ndeq * p->is_directed;
> +	p->outstanding_releases = ndeq;
> +	p->last_dequeue_burst_sz = ndeq;
> +	p->last_dequeue_ticks = rte_get_timer_cycles();
> +	p->poll_buckets[(ndeq - 1) >> SW_DEQ_STAT_BUCKET_SHIFT]++;
> +	p->total_polls++;
> +
> +end:
> +	if (p->inflight_credits >= credit_update_quanta * 2 &&
> +			p->inflight_credits > credit_update_quanta + ndeq) {
> +		rte_atomic32_sub(&sw->inflights, credit_update_quanta);
> +		p->inflight_credits -= credit_update_quanta;
> +	}
> +	return ndeq;
> +}
> +
> +uint16_t
> +sw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait)
> +{
> +	return sw_event_dequeue_burst(port, ev, 1, wait);
> +}
> -- 
> 2.7.4
> 

