[dpdk-dev] [PATCH 38/39] examples/l2fwd-event: add eventmode worker

Anoob Joseph anoobj at marvell.com
Mon Jun 3 19:32:38 CEST 2019


Adding burst no Tx internal-port eventmode worker

Signed-off-by: Anoob Joseph <anoobj at marvell.com>
Signed-off-by: Lukasz Bartosik <lbartosik at marvell.com>
---
 examples/l2fwd-event/l2fwd_worker.c | 219 +++++++++++++++++++++++++++++++++++-
 1 file changed, 218 insertions(+), 1 deletion(-)

diff --git a/examples/l2fwd-event/l2fwd_worker.c b/examples/l2fwd-event/l2fwd_worker.c
index 0a413a4..251c546 100644
--- a/examples/l2fwd-event/l2fwd_worker.c
+++ b/examples/l2fwd-event/l2fwd_worker.c
@@ -315,7 +315,7 @@ l2fwd_poll_mode_worker(void)
  */
 
 /* Workers registered */
-#define L2FWD_EVENTMODE_WORKERS		2
+#define L2FWD_EVENTMODE_WORKERS		3
 
 /*
  * Event mode worker
@@ -639,6 +639,214 @@ l2fwd_eventmode_non_burst_tx_internal_port(void *args)
 		rte_free(links);
 }
 
+/*
+ * Event mode worker
+ * Operating mode : burst no internal port (regular tx worker)
+ */
+static void
+l2fwd_eventmode_burst_no_internal_port(void *args)
+{
+	struct rte_event ev[MAX_PKT_BURST];
+	struct rte_mbuf *pkt;
+	struct rte_eventmode_helper_conf *mode_conf;
+	struct rte_eventmode_helper_event_link_info *links = NULL;
+	unsigned int lcore_nb_link = 0;
+	uint32_t lcore_id;
+	unsigned int i, j, left, nb_rx = 0;
+	unsigned int portid;
+	struct lcore_queue_conf *qconf;
+	int is_master_core;
+	struct rte_event_port_conf event_port_conf;
+	uint16_t deq_len = 0, enq_len = 0;
+	uint8_t tx_queue;
+	struct tsc_tracker tsc = {0};
+
+	/* Get core ID */
+	lcore_id = rte_lcore_id();
+
+	RTE_LOG(INFO, L2FWD,
+		"Launching event mode burst worker no internal port "
+		"(regular tx worker) on lcore %d\n", lcore_id);
+
+	/* Set the flag if master core */
+	is_master_core = (lcore_id == rte_get_master_lcore()) ? 1 : 0;
+
+	/* Get qconf for this core */
+	qconf = &lcore_queue_conf[lcore_id];
+
+	/* Set drain tsc */
+	tsc.drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) /
+			US_PER_S * BURST_TX_DRAIN_US;
+
+	/* Mode conf will be passed as args */
+	mode_conf = (struct rte_eventmode_helper_conf *)args;
+
+	/* Get the links configured for this lcore */
+	lcore_nb_link = rte_eventmode_helper_get_event_lcore_links(lcore_id,
+			mode_conf, &links);
+
+	/* Check if we have links registered for this lcore */
+	if (lcore_nb_link == 0) {
+		/* No links registered. The core could do periodic drains */
+		l2fwd_drain_loop(qconf, &tsc, is_master_core);
+		goto clean_and_exit;
+	}
+
+	/* We have valid links */
+
+	/* Reset stats before proceeding */
+	reset_eth_stats(is_master_core);
+
+	/*
+	 * There is no internal port between ethdev and eventdev. So the worker
+	 * thread needs to submit event to a designated tx queue. Internally
+	 * eth core would receive events from multiple worker threads and send
+	 * out packets on wire.
+	 */
+	tx_queue = rte_eventmode_helper_get_tx_queue(mode_conf,
+						     links[0].eventdev_id);
+
+	/* Get the burst size of the event device */
+
+	/* Get the default conf of the first link */
+	rte_event_port_default_conf_get(links[0].eventdev_id,
+			links[0].event_portid,
+			&event_port_conf);
+
+	/* Save the burst size */
+	deq_len = event_port_conf.dequeue_depth;
+	enq_len = event_port_conf.enqueue_depth;
+
+	/* Dequeue and enqueue length should not exceed MAX_PKT_BURST */
+	if (deq_len > MAX_PKT_BURST)
+		deq_len = MAX_PKT_BURST;
+	if (enq_len > MAX_PKT_BURST)
+		enq_len = MAX_PKT_BURST;
+
+	/* See if it's single link */
+	if (lcore_nb_link == 1)
+		goto single_link_loop;
+	else
+		goto multi_link_loop;
+
+single_link_loop:
+
+	RTE_LOG(INFO, L2FWD, " -- lcoreid=%u event_port_id=%u\n", lcore_id,
+		links[0].event_portid);
+
+	while (!force_quit) {
+
+		/* Do periodic operations (buffer drain & stats monitor) */
+		l2fwd_periodic_drain_stats_monitor(qconf, &tsc, is_master_core);
+
+		/* Read packet from event queues */
+		nb_rx = rte_event_dequeue_burst(links[0].eventdev_id,
+				links[0].event_portid,
+				ev,             /* events */
+				deq_len,        /* nb_events */
+				0               /* timeout_ticks */);
+
+		if (nb_rx == 0)
+			continue;
+
+		for (j = 0; j < nb_rx; j++) {
+
+			portid = ev[j].queue_id;
+			port_statistics[portid].rx++;
+			pkt = ev[j].mbuf;
+
+			rte_prefetch0(rte_pktmbuf_mtod(pkt, void *));
+
+			/* Process packet */
+			l2fwd_event_pre_forward(&(ev[j]), portid);
+
+			/*
+			 * Internal port is not available, the packet needs
+			 * to be enqueued to the designated event queue.
+			 */
+
+			/* Prepare event for submission to tx event queue */
+			l2fwd_event_switch_to_tx_queue(&(ev[j]), tx_queue);
+		}
+
+		for (j = 0, left = nb_rx;
+			j < (nb_rx + enq_len - 1)/enq_len; j++) {
+
+			/* Submit the updated events for tx stage */
+			left -= rte_event_enqueue_burst(links[0].eventdev_id,
+					links[0].event_portid,
+					&(ev[j*enq_len]), /* events */
+					left > enq_len ?
+					enq_len : left /* nb_events */);
+		}
+	}
+	goto clean_and_exit;
+
+multi_link_loop:
+
+	for (i = 0; i < lcore_nb_link; i++) {
+		RTE_LOG(INFO, L2FWD, " -- lcoreid=%u event_port_id=%u\n",
+			lcore_id, links[i].event_portid);
+	}
+
+	while (!force_quit) {
+
+		/* Do periodic operations (buffer drain & stats monitor) */
+		l2fwd_periodic_drain_stats_monitor(qconf, &tsc, is_master_core);
+
+		for (i = 0; i < lcore_nb_link; i++) {
+			/* Read packet from event queues */
+			nb_rx = rte_event_dequeue_burst(links[i].eventdev_id,
+					links[i].event_portid,
+					ev,             /* events */
+					deq_len,        /* nb_events */
+					0               /* timeout_ticks */);
+
+			if (nb_rx == 0)
+				continue;
+
+			for (j = 0; j < nb_rx; j++) {
+
+				portid = ev[j].queue_id;
+				port_statistics[portid].rx++;
+				pkt = ev[j].mbuf;
+
+				rte_prefetch0(rte_pktmbuf_mtod(pkt, void *));
+
+				/* Process packet */
+				l2fwd_event_pre_forward(&(ev[j]), portid);
+
+				/*
+				 * Internal port is not available, the packet
+				 * needs to be enqueued to the designated event
+				 * queue.
+				 */
+
+				/* Update the scheduling type for tx stage */
+				l2fwd_event_switch_to_tx_queue(&(ev[j]),
+						tx_queue);
+			}
+
+			for (j = 0, left = nb_rx;
+				j < (nb_rx + enq_len - 1)/enq_len; j++) {
+
+				/* Submit the updated events for tx stage */
+				left -= rte_event_enqueue_burst(
+					links[i].eventdev_id,
+					links[i].event_portid,
+					&(ev[j*enq_len]), /* events */
+					left > enq_len ?
+					enq_len : left /* nb_events */);
+			}
+		}
+	}
+	goto clean_and_exit;
+
+clean_and_exit:
+	if (links != NULL)
+		rte_free(links);
+}
+
 static uint8_t
 l2fwd_eventmode_populate_wrkr_params(
 		struct rte_eventmode_helper_app_worker_params *wrkrs)
@@ -665,6 +873,15 @@ l2fwd_eventmode_populate_wrkr_params(
 	wrkr->worker_thread = l2fwd_eventmode_non_burst_tx_internal_port;
 
 	nb_wrkr_param++;
+	wrkr++;
+
+	/* Burst no internal port (regular tx worker) */
+	wrkr->cap.burst = RTE_EVENTMODE_HELPER_RX_TYPE_BURST;
+	wrkr->cap.tx_internal_port =
+			RTE_EVENTMODE_HELPER_TX_TYPE_NO_INTERNAL_PORT;
+	wrkr->worker_thread = l2fwd_eventmode_burst_no_internal_port;
+
+	nb_wrkr_param++;
 	return nb_wrkr_param;
 }
 
-- 
2.7.4



More information about the dev mailing list