[dpdk-dev] [PATCH v3 12/17] event/sw: add worker core functions

Harry van Haaren harry.van.haaren at intel.com
Fri Feb 17 15:54:07 CET 2017


From: Bruce Richardson <bruce.richardson at intel.com>

Add the event enqueue, dequeue and release functions to the eventdev.
These functions also track statistics, giving observability into the
load on the scheduler.
Internally, the enqueue function converts the various types of enqueue
operation (forwarding an existing event, sending a new event, or
dropping a previous event) into a series of flags, which the scheduler
code then uses to perform the actions needed for that event.

Signed-off-by: Bruce Richardson <bruce.richardson at intel.com>
Signed-off-by: Gage Eads <gage.eads at intel.com>
Signed-off-by: Harry van Haaren <harry.van.haaren at intel.com>
---
 drivers/event/sw/Makefile          |   1 +
 drivers/event/sw/sw_evdev.c        |   5 +
 drivers/event/sw/sw_evdev.h        |  34 +++++++
 drivers/event/sw/sw_evdev_worker.c | 188 +++++++++++++++++++++++++++++++++++++
 4 files changed, 228 insertions(+)
 create mode 100644 drivers/event/sw/sw_evdev_worker.c

diff --git a/drivers/event/sw/Makefile b/drivers/event/sw/Makefile
index d6836e3..b6ecd91 100644
--- a/drivers/event/sw/Makefile
+++ b/drivers/event/sw/Makefile
@@ -53,6 +53,7 @@ EXPORT_MAP := rte_pmd_evdev_sw_version.map
 
 # library source files
 SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev_worker.c
 
 # export include files
 SYMLINK-y-include +=
diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c
index b809d5d..adff729 100644
--- a/drivers/event/sw/sw_evdev.c
+++ b/drivers/event/sw/sw_evdev.c
@@ -387,6 +387,7 @@ sw_dev_configure(const struct rte_eventdev *dev)
 	sw->qid_count = conf->nb_event_queues;
 	sw->port_count = conf->nb_event_ports;
 	sw->nb_events_limit = conf->nb_events_limit;
+	rte_atomic32_set(&sw->inflights, 0);
 
 	return 0;
 }
@@ -525,6 +526,10 @@ sw_probe(const char *name, const char *params)
 		return -EFAULT;
 	}
 	dev->dev_ops = &evdev_sw_ops;
+	dev->enqueue = sw_event_enqueue;
+	dev->enqueue_burst = sw_event_enqueue_burst;
+	dev->dequeue = sw_event_dequeue;
+	dev->dequeue_burst = sw_event_dequeue_burst;
 
 	sw = dev->data->dev_private;
 	sw->data = dev->data;
diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h
index 1bedd63..ab372fd 100644
--- a/drivers/event/sw/sw_evdev.h
+++ b/drivers/event/sw/sw_evdev.h
@@ -55,12 +55,36 @@
 #define SCHED_DEQUEUE_BURST_SIZE 32
 
 #define SW_PORT_HIST_LIST (MAX_SW_PROD_Q_DEPTH) /* size of our history list */
+#define NUM_SAMPLES 64 /* number of data points used when averaging stats */
 
 #define EVENTDEV_NAME_SW_PMD event_sw
 #define SW_PMD_NAME RTE_STR(event_sw)
 
 #define SW_SCHED_TYPE_DIRECT (RTE_SCHED_TYPE_PARALLEL + 1)
 
+enum {
+	QE_FLAG_VALID_SHIFT = 0, /* bit position of QE_FLAG_VALID */
+	QE_FLAG_COMPLETE_SHIFT,  /* bit position of QE_FLAG_COMPLETE */
+	QE_FLAG_NOT_EOP_SHIFT,   /* bit position of QE_FLAG_NOT_EOP */
+	_QE_FLAG_COUNT           /* number of flag bits listed above */
+};
+
+#define QE_FLAG_VALID    (1 << QE_FLAG_VALID_SHIFT)    /* for NEW, FWD, FRAG */
+#define QE_FLAG_COMPLETE (1 << QE_FLAG_COMPLETE_SHIFT) /* for FWD, RELEASE   */
+#define QE_FLAG_NOT_EOP  (1 << QE_FLAG_NOT_EOP_SHIFT)  /* for FRAG only      */
+
+static const uint8_t sw_qe_flag_map[] = { /* indexed by event op */
+		QE_FLAG_VALID /* NEW Event */,
+		QE_FLAG_VALID | QE_FLAG_COMPLETE /* FWD Event */,
+		QE_FLAG_COMPLETE /* RELEASE Event */,
+
+		/* Value which can be used for future support of partial
+		 * events, i.e. where one event comes back to the scheduler
+		 * as several events which need to be tracked together
+		 */
+		QE_FLAG_VALID | QE_FLAG_COMPLETE | QE_FLAG_NOT_EOP,
+};
+
 #ifdef RTE_LIBRTE_PMD_EVDEV_SW_DEBUG
 #define SW_LOG_INFO(fmt, args...) \
 	RTE_LOG(INFO, EVENTDEV, "[%s] %s() line %u: " fmt "\n", \
@@ -210,6 +234,8 @@ struct sw_evdev {
 	/* Contains all ports - load balanced and directed */
 	struct sw_port ports[SW_PORTS_MAX] __rte_cache_aligned;
 
+	rte_atomic32_t inflights __rte_cache_aligned;
+
 	/*
 	 * max events in this instance. Cached here for performance.
 	 * (also available in data->conf.nb_events_limit)
@@ -239,4 +265,12 @@ sw_pmd_priv_const(const struct rte_eventdev *eventdev)
 	return eventdev->data->dev_private;
 }
 
+uint16_t sw_event_enqueue(void *port, const struct rte_event *ev);
+uint16_t sw_event_enqueue_burst(void *port, const struct rte_event ev[],
+		uint16_t num);
+
+uint16_t sw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait);
+uint16_t sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
+			uint64_t wait);
+
 #endif /* _SW_EVDEV_H_ */
diff --git a/drivers/event/sw/sw_evdev_worker.c b/drivers/event/sw/sw_evdev_worker.c
new file mode 100644
index 0000000..aed1597
--- /dev/null
+++ b/drivers/event/sw/sw_evdev_worker.c
@@ -0,0 +1,188 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <rte_atomic.h>
+#include <rte_cycles.h>
+
+#include "sw_evdev.h"
+#include "event_ring.h"
+
+#define PORT_ENQUEUE_MAX_BURST_SIZE 64
+
+static inline void
+sw_event_release(struct sw_port *p, uint8_t index)
+{
+	/*
+	 * Releases the next outstanding event in this port's history. Used on
+	 * dequeue to clear any history before dequeuing more events.
+	 */
+	RTE_SET_USED(index);
+
+	/* create the release (drop) message; only .op is set explicitly */
+	struct rte_event ev = {
+		.op = sw_qe_flag_map[RTE_EVENT_OP_RELEASE],
+	};
+
+	uint16_t free_count; /* passed to the enqueue call; never read here */
+	qe_ring_enqueue_burst(p->rx_worker_ring, &ev, 1, &free_count);
+
+	/* each release returns one credit (enqueue result is not checked) */
+	p->outstanding_releases--;
+	p->inflight_credits++;
+}
+
+uint16_t
+sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
+{
+	int32_t i;
+	uint8_t new_ops[PORT_ENQUEUE_MAX_BURST_SIZE]; /* per-event QE flags */
+	struct sw_port *p = port;
+	struct sw_evdev *sw = (void *)p->sw;
+	uint32_t sw_inflights = rte_atomic32_read(&sw->inflights);
+
+	if (p->inflight_max < sw_inflights)
+		return 0;
+	if (num > PORT_ENQUEUE_MAX_BURST_SIZE)
+		num = PORT_ENQUEUE_MAX_BURST_SIZE;
+
+	if (p->inflight_credits < num) {
+		/* Not enough local credits: take credit_update_quanta more
+		 * from the device, unless that would exceed nb_events_limit
+		 */
+		uint32_t credit_update_quanta = sw->credit_update_quanta;
+		if (sw_inflights + credit_update_quanta > sw->nb_events_limit)
+			return 0;
+
+		rte_atomic32_add(&sw->inflights, credit_update_quanta);
+		p->inflight_credits += (credit_update_quanta);
+
+		if (p->inflight_credits < num)
+			return 0;
+	}
+
+	for (i = 0; i < num; i++) {
+		int op = ev[i].op;
+		int outstanding = p->outstanding_releases > 0;
+		const uint8_t invalid_qid = (ev[i].queue_id >= sw->qid_count);
+
+		p->inflight_credits -= (op == RTE_EVENT_OP_NEW);
+		p->inflight_credits += (op == RTE_EVENT_OP_RELEASE) *
+					outstanding;
+
+		new_ops[i] = sw_qe_flag_map[op];
+		new_ops[i] &= ~(invalid_qid << QE_FLAG_VALID_SHIFT);
+
+		/* FWD and RELEASE events both take this branch (assuming
+		 * correct usage of the API), so the branch is predicted
+		 * correctly at a very high rate.
+		 */
+		if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding)
+			p->outstanding_releases--;
+		/* Branch so that p->stats is only written in the error case */
+		if (invalid_qid)
+			p->stats.rx_dropped++;
+	}
+
+	/* the ring call returns the number of events actually enqueued */
+	uint32_t enq = qe_ring_enqueue_burst_with_ops(p->rx_worker_ring, ev, i,
+					     new_ops);
+	if (p->outstanding_releases == 0 && p->last_dequeue_burst_sz != 0) {
+		uint64_t burst_ticks = rte_get_timer_cycles() -
+				p->last_dequeue_ticks;
+		uint64_t burst_pkt_ticks =
+			burst_ticks / p->last_dequeue_burst_sz;
+		p->avg_pkt_ticks -= p->avg_pkt_ticks / NUM_SAMPLES;
+		p->avg_pkt_ticks += burst_pkt_ticks / NUM_SAMPLES;
+		p->last_dequeue_ticks = 0;
+	}
+	return enq;
+}
+
+uint16_t
+sw_event_enqueue(void *port, const struct rte_event *ev)
+{
+	return sw_event_enqueue_burst(port, ev, 1); /* burst of one */
+}
+
+uint16_t
+sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
+		uint64_t wait)
+{
+	RTE_SET_USED(wait); /* the wait argument is otherwise ignored */
+	struct sw_port *p = (void *)port;
+	struct sw_evdev *sw = (void *)p->sw;
+	struct qe_ring *ring = p->cq_worker_ring;
+	uint32_t credit_update_quanta = sw->credit_update_quanta;
+
+	/* non-directed port: release all events still outstanding */
+	if (!p->is_directed) {
+		uint16_t out_rels = p->outstanding_releases;
+		uint16_t i;
+		for (i = 0; i < out_rels; i++)
+			sw_event_release(p, i);
+	}
+
+	/* NULL ev means release-only; Intel addition, may leave final API */
+	if (ev == 0)
+		return 0;
+
+	/* ndeq: number of events actually dequeued from cq_worker_ring */
+	uint16_t ndeq = qe_ring_dequeue_burst(ring, ev, num);
+	if (ndeq == 0) {
+		p->outstanding_releases = 0;
+		p->zero_polls++;
+		p->total_polls++;
+		goto end;
+	}
+
+	/* only add credits for directed ports - LB ports send RELEASEs */
+	p->inflight_credits += ndeq * p->is_directed;
+	p->outstanding_releases = ndeq;
+	p->last_dequeue_burst_sz = ndeq;
+	p->last_dequeue_ticks = rte_get_timer_cycles();
+	p->poll_buckets[(ndeq - 1) >> SW_DEQ_STAT_BUCKET_SHIFT]++;
+	p->total_polls++;
+
+end: /* hand surplus credits back to the device-wide inflights count */
+	if (p->inflight_credits >= credit_update_quanta * 2 &&
+			p->inflight_credits > credit_update_quanta + ndeq) {
+		rte_atomic32_sub(&sw->inflights, credit_update_quanta);
+		p->inflight_credits -= credit_update_quanta;
+	}
+	return ndeq;
+}
+
+uint16_t
+sw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait)
+{
+	return sw_event_dequeue_burst(port, ev, 1, wait); /* burst of one */
+}
-- 
2.7.4



More information about the dev mailing list