[dpdk-dev] [PATCH v2 06/15] examples/eventdev: add non burst mode generic worker

Pavan Nikhilesh pbhagavatula at caviumnetworks.com
Wed Jan 10 12:10:04 CET 2018


Currently, the worker uses burst dequeue and burst enqueue to forward events.
Add a non-burst mode that is selected based on the event device capabilities.

Signed-off-by: Pavan Nikhilesh <pbhagavatula at caviumnetworks.com>
---
 examples/eventdev_pipeline_sw_pmd/main.c           |  12 +-
 .../pipeline_worker_generic.c                      | 126 ++++++++++++++++++++-
 2 files changed, 133 insertions(+), 5 deletions(-)

diff --git a/examples/eventdev_pipeline_sw_pmd/main.c b/examples/eventdev_pipeline_sw_pmd/main.c
index 9e6061643..947c5f786 100644
--- a/examples/eventdev_pipeline_sw_pmd/main.c
+++ b/examples/eventdev_pipeline_sw_pmd/main.c
@@ -382,8 +382,16 @@ static void
 do_capability_setup(uint16_t nb_ethdev, uint8_t eventdev_id)
 {
 	RTE_SET_USED(nb_ethdev);
-	RTE_SET_USED(eventdev_id);
-	set_worker_generic_setup_data(&fdata->cap, 1);
+	uint8_t burst = 0;
+	/* Set to 1 below only when the device has the BURST_MODE flag. */
+	struct rte_event_dev_info eventdev_info;
+	memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
+	/* Fetch the device info; event_dev_cap is tested right after. */
+	rte_event_dev_info_get(eventdev_id, &eventdev_info);
+	burst = eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_BURST_MODE ? 1 :
+		0;
+	/* Hand the resulting flag to the generic worker setup. */
+	set_worker_generic_setup_data(&fdata->cap, burst);
 }
 
 static void
diff --git a/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c
index d1b0e1db1..f4523902b 100644
--- a/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c
+++ b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c
@@ -6,6 +6,59 @@
 
 #include "pipeline_common.h"
 
+static __rte_always_inline int
+worker_generic(void *arg)
+{
+	struct rte_event ev;
+	/* Non-burst worker: handles exactly one event per iteration. */
+	struct worker_data *data = (struct worker_data *)arg;
+	uint8_t dev_id = data->dev_id;
+	uint8_t port_id = data->port_id;
+	size_t sent = 0, received = 0;
+	unsigned int lcore_id = rte_lcore_id();
+
+	while (!fdata->done) {
+		/* Run the optional per-lcore scheduler hook first. */
+		if (fdata->cap.scheduler)
+			fdata->cap.scheduler(lcore_id);
+		/* Idle while worker_core[] is clear for this lcore. */
+		if (!fdata->worker_core[lcore_id]) {
+			rte_pause();
+			continue;
+		}
+		/* Dequeue at most one event. */
+		const uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
+				&ev, 1, 0);
+
+		if (nb_rx == 0) {
+			rte_pause();
+			continue;
+		}
+		received++;
+
+		/* The first worker stage does classification */
+		if (ev.queue_id == cdata.qid[0])
+			ev.flow_id = ev.mbuf->hash.rss
+						% cdata.num_fids;
+		/* Retarget at next_qid[] and mark the event FORWARD. */
+		ev.queue_id = cdata.next_qid[ev.queue_id];
+		ev.op = RTE_EVENT_OP_FORWARD;
+		ev.sched_type = cdata.queue_type;
+
+		work(ev.mbuf);
+		/* Retry until the device accepts the single event. */
+		while (rte_event_enqueue_burst(dev_id, port_id, &ev, 1) != 1)
+			rte_pause();
+		sent++;
+	}
+
+	if (!cdata.quiet)
+		printf("  worker %u thread done. RX=%zu TX=%zu\n",
+				rte_lcore_id(), received, sent);
+
+	return 0;
+}
+
 static int
 worker_generic_burst(void *arg)
 {
@@ -66,6 +119,69 @@ worker_generic_burst(void *arg)
 	return 0;
 }
 
+static __rte_always_inline int
+consumer(void)
+{
+	const uint64_t freq_khz = rte_get_timer_hz() / 1000;
+	struct rte_event packet;
+	/* Non-burst consumer: one event dequeued and buffered per pass. */
+	static uint64_t received;
+	static uint64_t last_pkts;
+	static uint64_t last_time;
+	static uint64_t start_time;
+	int i;
+	uint8_t dev_id = cons_data.dev_id;
+	uint8_t port_id = cons_data.port_id;
+
+	do {
+		uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
+				&packet, 1, 0);
+		/* Nothing dequeued: flush each port's Tx buffer, return. */
+		if (n == 0) {
+			for (i = 0; i < rte_eth_dev_count(); i++)
+				rte_eth_tx_buffer_flush(i, 0, fdata->tx_buf[i]);
+			return 0;
+		}
+		if (start_time == 0)
+			last_time = start_time = rte_get_timer_cycles();
+
+		received++;
+		uint8_t outport = packet.mbuf->port;
+		/* Buffer the mbuf on Tx queue 0 of its own mbuf->port. */
+		rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
+				packet.mbuf);
+		/* NOTE(review): op is never set to RELEASE here - confirm. */
+		if (cons_data.release)
+			rte_event_enqueue_burst(dev_id, port_id,
+								&packet, n);
+
+		/* Print out mpps every 1<<22 packets */
+		if (!cdata.quiet && received >= last_pkts + (1<<22)) {
+			const uint64_t now = rte_get_timer_cycles();
+			const uint64_t total_ms = (now - start_time) / freq_khz;
+			const uint64_t delta_ms = (now - last_time) / freq_khz;
+			uint64_t delta_pkts = received - last_pkts;
+
+			printf("# %s RX=%"PRIu64", time %"PRIu64 "ms, "
+					"avg %.3f mpps [current %.3f mpps]\n",
+					__func__,
+					received,
+					total_ms,
+					received / (total_ms * 1000.0),
+					delta_pkts / (delta_ms * 1000.0));
+			last_pkts = received;
+			last_time = now;
+		}
+		/* Count down num_packets; raise done when it hits zero. */
+		cdata.num_packets--;
+		if (cdata.num_packets <= 0)
+			fdata->done = 1;
+	/* Keep looping only while tx_single is set and done is clear. */
+	} while (!fdata->done && fdata->tx_single);
+
+	return 0;
+}
+
 static __rte_always_inline int
 consumer_burst(void)
 {
@@ -430,9 +546,13 @@ generic_opt_check(void)
 void
 set_worker_generic_setup_data(struct setup_data *caps, bool burst)
 {
-	RTE_SET_USED(burst);
-	caps->consumer = consumer_burst;
-	caps->worker = worker_generic_burst;
+	if (burst) {
+		caps->consumer = consumer_burst;
+		caps->worker = worker_generic_burst;
+	} else {
+		caps->consumer = consumer;
+		caps->worker = worker_generic;
+	}
 
 	caps->adptr_setup = init_rx_adapter;
 	caps->scheduler = schedule_devices;
-- 
2.15.1



More information about the dev mailing list