[dpdk-dev] [PATCH v2 09/15] examples/eventdev: add burst for thread safe pipeline

Pavan Nikhilesh pbhagavatula at caviumnetworks.com
Wed Jan 10 12:10:07 CET 2018


Add a burst-mode worker pipeline for use when Tx is multi-thread safe.

Signed-off-by: Pavan Nikhilesh <pbhagavatula at caviumnetworks.com>
---
 .../eventdev_pipeline_sw_pmd/pipeline_worker_tx.c  | 74 +++++++++++++++++++++-
 1 file changed, 72 insertions(+), 2 deletions(-)

diff --git a/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c
index 397b1013f..419d8d410 100644
--- a/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c
+++ b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c
@@ -22,6 +22,19 @@ worker_event_enqueue(const uint8_t dev, const uint8_t port,
 		rte_pause();
 }

+static __rte_always_inline void
+worker_event_enqueue_burst(const uint8_t dev, const uint8_t port,
+		struct rte_event *ev, const uint16_t nb_rx)
+{
+	uint16_t enq; /* running total returned by the enqueue calls */
+
+	enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
+	while (enq < nb_rx) { /* resubmit ev[enq..nb_rx) until all are taken */
+		enq += rte_event_enqueue_burst(dev, port,
+						ev + enq, nb_rx - enq);
+	} /* busy-retry: no rte_pause() between attempts */
+}
+
 static __rte_always_inline void
 worker_tx_pkt(struct rte_mbuf *mbuf)
 {
@@ -81,6 +94,61 @@ worker_do_tx(void *arg)
 	return 0;
 }

+static int
+worker_do_tx_burst(void *arg)
+{
+	struct rte_event ev[BATCH_SIZE]; /* dequeue/enqueue burst buffer */
+	/* Burst worker: dequeue, route or Tx each event, enqueue the batch. */
+	struct worker_data *data = (struct worker_data *)arg;
+	uint8_t dev = data->dev_id;
+	uint8_t port = data->port_id;
+	uint8_t lst_qid = cdata.num_stages - 1; /* index of the last stage */
+	size_t fwd = 0, received = 0, tx = 0;
+
+	while (!fdata->done) { /* run until the done flag is set */
+		uint16_t i;
+		const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
+				ev, BATCH_SIZE, 0); /* timeout_ticks = 0 */
+
+		if (nb_rx == 0) { /* nothing dequeued: pause and poll again */
+			rte_pause();
+			continue;
+		}
+		received += nb_rx;
+		/* Update events in place; all are enqueued after the loop. */
+		for (i = 0; i < nb_rx; i++) {
+			const uint8_t cq_id = ev[i].queue_id % cdata.num_stages;
+
+			if (cq_id >= lst_qid) { /* i.e. last stage */
+				if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
+					worker_tx_pkt(ev[i].mbuf);
+					tx++;
+					ev[i].op = RTE_EVENT_OP_RELEASE;
+					continue; /* skips work() */
+				}
+				ev[i].queue_id = (cq_id == lst_qid) ?
+					cdata.next_qid[ev[i].queue_id] :
+					ev[i].queue_id;
+				/* Non-ATOMIC: presumably re-sent as ATOMIC. */
+				worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
+			} else { /* earlier stage: advance to the next queue */
+				ev[i].queue_id = cdata.next_qid[ev[i].queue_id];
+				worker_fwd_event(&ev[i], cdata.queue_type);
+			}
+			work();
+		}
+		worker_event_enqueue_burst(dev, port, ev, nb_rx);
+		/* fwd counts the whole burst, released events included. */
+		fwd += nb_rx;
+	}
+
+	if (!cdata.quiet) /* per-worker counters, printed unless quiet */
+		printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+				rte_lcore_id(), received, fwd, tx);
+
+	return 0;
+}
+
 static int
 setup_eventdev_worker_tx(struct cons_data *cons_data,
 		struct worker_data *worker_data)
@@ -412,8 +480,10 @@ worker_tx_opt_check(void)
 void
 set_worker_tx_setup_data(struct setup_data *caps, bool burst)
 {
-	RTE_SET_USED(burst);
-	caps->worker = worker_do_tx;
+	if (burst)
+		caps->worker = worker_do_tx_burst;
+	else
+		caps->worker = worker_do_tx;

 	memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);

--
2.15.1



More information about the dev mailing list