[dpdk-dev] [PATCH v2 17/34] app/testeventdev: order queue: add worker functions

Jerin Jacob jerin.jacob at caviumnetworks.com
Mon Jul 3 21:13:45 CEST 2017


Signed-off-by: Jerin Jacob <jerin.jacob at caviumnetworks.com>
Acked-by: Harry van Haaren <harry.van.haaren at intel.com>
---
 app/test-eventdev/test_order_common.h |  47 ++++++++++++++++
 app/test-eventdev/test_order_queue.c  | 100 ++++++++++++++++++++++++++++++++++
 2 files changed, 147 insertions(+)

diff --git a/app/test-eventdev/test_order_common.h b/app/test-eventdev/test_order_common.h
index a760b94bd..88cb2acd9 100644
--- a/app/test-eventdev/test_order_common.h
+++ b/app/test-eventdev/test_order_common.h
@@ -90,6 +90,53 @@ order_nb_event_ports(struct evt_options *opt)
 	return evt_nr_active_lcores(opt->wlcores) + 1 /* producer */;
 }
 
+static inline __attribute__((always_inline)) void
+order_process_stage_1(struct test_order *const t,
+		struct rte_event *const ev, const uint32_t nb_flows,
+		uint32_t *const expected_flow_seq,
+		rte_atomic64_t *const outstand_pkts)
+{ /* verify per-flow seqn order for one event, then free its mbuf */
+	const uint32_t flow = (uintptr_t)ev->mbuf % nb_flows;
+	/* Check the mbuf's seqn against the next value expected for its flow. */
+	if (ev->mbuf->seqn != expected_flow_seq[flow]) {
+		evt_err("flow=%x seqn mismatch got=%x expected=%x",
+			flow, ev->mbuf->seqn, expected_flow_seq[flow]);
+		t->err = true; /* worker loops run while t->err == false */
+		rte_smp_wmb(); /* NOTE(review): presumably orders the store */
+	}
+	/*
+	 * Events from an atomic flow of an event queue can be scheduled only to
+	 * a single port at a time. The port is guaranteed to have exclusive
+	 * (atomic) access to a given atomic flow, so expected_flow_seq does not
+	 * need to be updated inside a critical section.
+	 */
+	expected_flow_seq[flow]++; /* advanced even after a mismatch */
+	rte_pktmbuf_free(ev->mbuf); /* ev->mbuf must not be used afterwards */
+	rte_atomic64_sub(outstand_pkts, 1); /* idle workers exit at <= 0 */
+}
+
+static inline __attribute__((always_inline)) void
+order_process_stage_invalid(struct test_order *const t,
+			struct rte_event *const ev)
+{ /* report an event whose queue_id the caller did not recognise */
+	evt_err("invalid queue %d", ev->queue_id);
+	t->err = true; /* worker loops run while t->err == false */
+	rte_smp_wmb(); /* NOTE(review): presumably orders the t->err store */
+}
+
+#define ORDER_WORKER_INIT\
+	struct worker_data *w  = arg; /* expects 'arg' in the caller's scope */\
+	struct test_order *t = w->t;\
+	struct evt_options *opt = t->opt;\
+	const uint8_t dev_id = w->dev_id;\
+	const uint8_t port = w->port_id;\
+	const uint32_t nb_flows = t->nb_flows;\
+	uint32_t *expected_flow_seq = t->expected_flow_seq;\
+	rte_atomic64_t *outstand_pkts = &t->outstand_pkts;\
+	if (opt->verbose_level > 1) /* trace the worker's lcore/dev/port */\
+		printf("%s(): lcore %d dev_id %d port=%d\n",\
+			__func__, rte_lcore_id(), dev_id, port)
+
 int order_test_result(struct evt_test *test, struct evt_options *opt);
 int order_opt_check(struct evt_options *opt);
 int order_test_setup(struct evt_test *test, struct evt_options *opt);
diff --git a/app/test-eventdev/test_order_queue.c b/app/test-eventdev/test_order_queue.c
index c4003efd2..232dcf26d 100644
--- a/app/test-eventdev/test_order_queue.c
+++ b/app/test-eventdev/test_order_queue.c
@@ -37,6 +37,105 @@
 
 /* See http://dpdk.org/doc/guides/tools/testeventdev.html for test details */
 
+static inline __attribute__((always_inline)) void
+order_queue_process_stage_0(struct rte_event *const ev)
+{ /* rewrite ev in place so it can be forwarded to queue 1 */
+	ev->queue_id = 1; /* destination: q1, the atomic queue */
+	ev->op = RTE_EVENT_OP_FORWARD; /* the callers enqueue ev afterwards */
+	ev->sched_type = RTE_SCHED_TYPE_ATOMIC;
+	ev->event_type = RTE_EVENT_TYPE_CPU;
+}
+
+static int
+order_queue_worker(void *arg)
+{ /* worker loop that dequeues and handles one event at a time */
+	ORDER_WORKER_INIT;
+	struct rte_event ev;
+
+	while (t->err == false) { /* stop once any error has been flagged */
+		uint16_t event = rte_event_dequeue_burst(dev_id, port,
+					&ev, 1, 0);
+		if (!event) { /* nothing was dequeued */
+			if (rte_atomic64_read(outstand_pkts) <= 0)
+				break; /* no outstanding packets left */
+			rte_pause();
+			continue;
+		}
+
+		if (ev.queue_id == 0) { /* from ordered queue */
+			order_queue_process_stage_0(&ev);
+			while (rte_event_enqueue_burst(dev_id, port, &ev, 1)
+					!= 1)
+				rte_pause(); /* retry until accepted */
+		} else if (ev.queue_id == 1) { /* from atomic queue */
+			order_process_stage_1(t, &ev, nb_flows,
+					expected_flow_seq, outstand_pkts);
+		} else {
+			order_process_stage_invalid(t, &ev); /* sets t->err */
+		}
+	}
+	return 0; /* always 0; failures are reported through t->err */
+}
+
+static int
+order_queue_worker_burst(void *arg)
+{ /* worker loop that handles up to BURST_SIZE events per dequeue */
+	ORDER_WORKER_INIT;
+	struct rte_event ev[BURST_SIZE];
+	uint16_t i;
+
+	while (t->err == false) { /* stop once any error has been flagged */
+		uint16_t const nb_rx = rte_event_dequeue_burst(dev_id, port, ev,
+				BURST_SIZE, 0);
+
+		if (nb_rx == 0) { /* nothing was dequeued */
+			if (rte_atomic64_read(outstand_pkts) <= 0)
+				break; /* no outstanding packets left */
+			rte_pause();
+			continue;
+		}
+
+		for (i = 0; i < nb_rx; i++) {
+			if (ev[i].queue_id == 0) { /* from ordered queue */
+				order_queue_process_stage_0(&ev[i]);
+			} else if (ev[i].queue_id == 1) {/* from atomic queue */
+				order_process_stage_1(t, &ev[i], nb_flows,
+					expected_flow_seq, outstand_pkts);
+				ev[i].op = RTE_EVENT_OP_RELEASE; /* mbuf freed */
+			} else {
+				/* sets t->err; ev[i] is still enqueued below */
+				order_process_stage_invalid(t, &ev[i]);
+			}
+		}
+
+		uint16_t enq; /* number of events accepted so far */
+
+		enq = rte_event_enqueue_burst(dev_id, port, ev, nb_rx);
+		while (enq < nb_rx) { /* retry the tail that was not accepted */
+			enq += rte_event_enqueue_burst(dev_id, port,
+							ev + enq, nb_rx - enq);
+		}
+	}
+	return 0; /* always 0; failures are reported through t->err */
+}
+
+static int
+worker_wrapper(void *arg)
+{ /* worker entry point passed to order_launch_lcores() */
+	struct worker_data *w  = arg;
+	const bool burst = evt_has_burst_mode(w->dev_id);
+
+	if (burst) /* choose the variant by evt_has_burst_mode() result */
+		return order_queue_worker_burst(arg);
+	else
+		return order_queue_worker(arg);
+}
+
+static int
+order_queue_launch_lcores(struct evt_test *test, struct evt_options *opt)
+{ /* .launch_lcores callback of the order_queue test ops */
+	return order_launch_lcores(test, opt, worker_wrapper);
+}
+
 #define NB_QUEUES 2
 static int
 order_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt)
@@ -133,6 +232,7 @@ static const struct evt_test_ops order_queue =  {
 	.test_setup         = order_test_setup,
 	.mempool_setup      = order_mempool_setup,
 	.eventdev_setup     = order_queue_eventdev_setup,
+	.launch_lcores      = order_queue_launch_lcores,
 	.eventdev_destroy   = order_eventdev_destroy,
 	.mempool_destroy    = order_mempool_destroy,
 	.test_result        = order_test_result,
-- 
2.13.2



More information about the dev mailing list