[dpdk-dev] [PATCH] examples/eventdev_pipeline: add Tx adapter support

Pavan Nikhilesh pbhagavatula at caviumnetworks.com
Wed Sep 5 15:45:54 CEST 2018


Signed-off-by: Pavan Nikhilesh <pbhagavatula at caviumnetworks.com>
---
 This patch depends on the following series:
 http://patches.dpdk.org/project/dpdk/list/?series=1121

 examples/eventdev_pipeline/main.c             |  62 ++--
 examples/eventdev_pipeline/pipeline_common.h  |  31 +-
 .../pipeline_worker_generic.c                 | 273 +++++-------------
 .../eventdev_pipeline/pipeline_worker_tx.c    | 130 +++++----
 4 files changed, 186 insertions(+), 310 deletions(-)

diff --git a/examples/eventdev_pipeline/main.c b/examples/eventdev_pipeline/main.c
index 700bc696f..95531150b 100644
--- a/examples/eventdev_pipeline/main.c
+++ b/examples/eventdev_pipeline/main.c
@@ -26,20 +26,6 @@ core_in_use(unsigned int lcore_id) {
 		fdata->tx_core[lcore_id] || fdata->worker_core[lcore_id]);
 }

-static void
-eth_tx_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
-			void *userdata)
-{
-	int port_id = (uintptr_t) userdata;
-	unsigned int _sent = 0;
-
-	do {
-		/* Note: hard-coded TX queue */
-		_sent += rte_eth_tx_burst(port_id, 0, &pkts[_sent],
-					  unsent - _sent);
-	} while (_sent != unsent);
-}
-
 /*
  * Parse the coremask given as argument (hexadecimal string) and fill
  * the global configuration (core role and core count) with the parsed
@@ -263,6 +249,7 @@ parse_app_args(int argc, char **argv)
 static inline int
 port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 {
+	struct rte_eth_rxconf rx_conf;
 	static const struct rte_eth_conf port_conf_default = {
 		.rxmode = {
 			.mq_mode = ETH_MQ_RX_RSS,
@@ -291,6 +278,8 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 	if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
 		port_conf.txmode.offloads |=
 			DEV_TX_OFFLOAD_MBUF_FAST_FREE;
+	rx_conf = dev_info.default_rxconf;
+	rx_conf.offloads = port_conf.rxmode.offloads;

 	port_conf.rx_adv_conf.rss_conf.rss_hf &=
 		dev_info.flow_type_rss_offloads;
@@ -311,7 +300,8 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 	/* Allocate and set up 1 RX queue per Ethernet port. */
 	for (q = 0; q < rx_rings; q++) {
 		retval = rte_eth_rx_queue_setup(port, q, rx_ring_size,
-				rte_eth_dev_socket_id(port), NULL, mbuf_pool);
+				rte_eth_dev_socket_id(port), &rx_conf,
+				mbuf_pool);
 		if (retval < 0)
 			return retval;
 	}
@@ -350,7 +340,7 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 static int
 init_ports(uint16_t num_ports)
 {
-	uint16_t portid, i;
+	uint16_t portid;

 	if (!cdata.num_mbuf)
 		cdata.num_mbuf = 16384 * num_ports;
@@ -367,36 +357,26 @@ init_ports(uint16_t num_ports)
 			rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16 "\n",
 					portid);

-	RTE_ETH_FOREACH_DEV(i) {
-		void *userdata = (void *)(uintptr_t) i;
-		fdata->tx_buf[i] =
-			rte_malloc(NULL, RTE_ETH_TX_BUFFER_SIZE(32), 0);
-		if (fdata->tx_buf[i] == NULL)
-			rte_panic("Out of memory\n");
-		rte_eth_tx_buffer_init(fdata->tx_buf[i], 32);
-		rte_eth_tx_buffer_set_err_callback(fdata->tx_buf[i],
-						   eth_tx_buffer_retry,
-						   userdata);
-	}
-
 	return 0;
 }

 static void
 do_capability_setup(uint8_t eventdev_id)
 {
+	int ret;
 	uint16_t i;
-	uint8_t mt_unsafe = 0;
+	uint8_t generic_pipeline = 0;
 	uint8_t burst = 0;

 	RTE_ETH_FOREACH_DEV(i) {
-		struct rte_eth_dev_info dev_info;
-		memset(&dev_info, 0, sizeof(struct rte_eth_dev_info));
-
-		rte_eth_dev_info_get(i, &dev_info);
-		/* Check if it is safe ask worker to tx. */
-		mt_unsafe |= !(dev_info.tx_offload_capa &
-				DEV_TX_OFFLOAD_MT_LOCKFREE);
+		uint32_t caps = 0;
+
+		ret = rte_event_eth_tx_adapter_caps_get(eventdev_id, i, &caps);
+		if (ret)
+			rte_exit(EXIT_FAILURE,
+				"Invalid capability for Tx adptr port %d\n", i);
+		generic_pipeline |= !(caps &
+				RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT);
 	}

 	struct rte_event_dev_info eventdev_info;
@@ -406,10 +386,10 @@ do_capability_setup(uint8_t eventdev_id)
 	burst = eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_BURST_MODE ? 1 :
 		0;

-	if (mt_unsafe)
+	if (generic_pipeline)
 		set_worker_generic_setup_data(&fdata->cap, burst);
 	else
-		set_worker_tx_setup_data(&fdata->cap, burst);
+		set_worker_tx_enq_setup_data(&fdata->cap, burst);
 }

 static void
@@ -499,7 +479,7 @@ main(int argc, char **argv)
 	if (worker_data == NULL)
 		rte_panic("rte_calloc failed\n");

-	int dev_id = fdata->cap.evdev_setup(&cons_data, worker_data);
+	int dev_id = fdata->cap.evdev_setup(worker_data);
 	if (dev_id < 0)
 		rte_exit(EXIT_FAILURE, "Error setting up eventdev\n");

@@ -524,8 +504,8 @@ main(int argc, char **argv)

 		if (fdata->tx_core[lcore_id])
 			printf(
-				"[%s()] lcore %d executing NIC Tx, and using eventdev port %u\n",
-				__func__, lcore_id, cons_data.port_id);
+				"[%s()] lcore %d executing NIC Tx\n",
+				__func__, lcore_id);

 		if (fdata->sched_core[lcore_id])
 			printf("[%s()] lcore %d executing scheduler\n",
diff --git a/examples/eventdev_pipeline/pipeline_common.h b/examples/eventdev_pipeline/pipeline_common.h
index 9703396f8..a6cc912fb 100644
--- a/examples/eventdev_pipeline/pipeline_common.h
+++ b/examples/eventdev_pipeline/pipeline_common.h
@@ -16,6 +16,7 @@
 #include <rte_ethdev.h>
 #include <rte_eventdev.h>
 #include <rte_event_eth_rx_adapter.h>
+#include <rte_event_eth_tx_adapter.h>
 #include <rte_service.h>
 #include <rte_service_component.h>

@@ -23,38 +24,30 @@
 #define BATCH_SIZE 16
 #define MAX_NUM_CORE 64

-struct cons_data {
-	uint8_t dev_id;
-	uint8_t port_id;
-	uint8_t release;
-} __rte_cache_aligned;
-
 struct worker_data {
 	uint8_t dev_id;
 	uint8_t port_id;
 } __rte_cache_aligned;

 typedef int (*worker_loop)(void *);
-typedef int (*consumer_loop)(void);
 typedef void (*schedule_loop)(unsigned int);
-typedef int (*eventdev_setup)(struct cons_data *, struct worker_data *);
-typedef void (*rx_adapter_setup)(uint16_t nb_ports);
+typedef int (*eventdev_setup)(struct worker_data *);
+typedef void (*adapter_setup)(uint16_t nb_ports);
 typedef void (*opt_check)(void);

 struct setup_data {
 	worker_loop worker;
-	consumer_loop consumer;
 	schedule_loop scheduler;
 	eventdev_setup evdev_setup;
-	rx_adapter_setup adptr_setup;
+	adapter_setup adptr_setup;
 	opt_check check_opt;
 };

 struct fastpath_data {
 	volatile int done;
-	uint32_t tx_lock;
 	uint32_t evdev_service_id;
 	uint32_t rxadptr_service_id;
+	uint32_t txadptr_service_id;
 	bool rx_single;
 	bool tx_single;
 	bool sched_single;
@@ -62,7 +55,6 @@ struct fastpath_data {
 	unsigned int tx_core[MAX_NUM_CORE];
 	unsigned int sched_core[MAX_NUM_CORE];
 	unsigned int worker_core[MAX_NUM_CORE];
-	struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS];
 	struct setup_data cap;
 } __rte_cache_aligned;

@@ -88,6 +80,8 @@ struct config_data {
 	int16_t next_qid[MAX_NUM_STAGES+2];
 	int16_t qid[MAX_NUM_STAGES];
 	uint8_t rx_adapter_id;
+	uint8_t tx_adapter_id;
+	uint8_t tx_queue_id;
 	uint64_t worker_lcore_mask;
 	uint64_t rx_lcore_mask;
 	uint64_t tx_lcore_mask;
@@ -99,8 +93,6 @@ struct port_link {
 	uint8_t priority;
 };

-struct cons_data cons_data;
-
 struct fastpath_data *fdata;
 struct config_data cdata;

@@ -142,12 +134,11 @@ schedule_devices(unsigned int lcore_id)
 		}
 	}

-	if (fdata->tx_core[lcore_id] && (fdata->tx_single ||
-			 rte_atomic32_cmpset(&(fdata->tx_lock), 0, 1))) {
-		fdata->cap.consumer();
-		rte_atomic32_clear((rte_atomic32_t *)&(fdata->tx_lock));
+	if (fdata->tx_core[lcore_id]) {
+		rte_service_run_iter_on_app_lcore(fdata->txadptr_service_id,
+				!fdata->tx_single);
 	}
 }

 void set_worker_generic_setup_data(struct setup_data *caps, bool burst);
-void set_worker_tx_setup_data(struct setup_data *caps, bool burst);
+void set_worker_tx_enq_setup_data(struct setup_data *caps, bool burst);
diff --git a/examples/eventdev_pipeline/pipeline_worker_generic.c b/examples/eventdev_pipeline/pipeline_worker_generic.c
index 2215e9ebe..a355c23a1 100644
--- a/examples/eventdev_pipeline/pipeline_worker_generic.c
+++ b/examples/eventdev_pipeline/pipeline_worker_generic.c
@@ -119,153 +119,13 @@ worker_generic_burst(void *arg)
 	return 0;
 }

-static __rte_always_inline int
-consumer(void)
-{
-	const uint64_t freq_khz = rte_get_timer_hz() / 1000;
-	struct rte_event packet;
-
-	static uint64_t received;
-	static uint64_t last_pkts;
-	static uint64_t last_time;
-	static uint64_t start_time;
-	int i;
-	uint8_t dev_id = cons_data.dev_id;
-	uint8_t port_id = cons_data.port_id;
-
-	do {
-		uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
-				&packet, 1, 0);
-
-		if (n == 0) {
-			RTE_ETH_FOREACH_DEV(i)
-				rte_eth_tx_buffer_flush(i, 0, fdata->tx_buf[i]);
-			return 0;
-		}
-		if (start_time == 0)
-			last_time = start_time = rte_get_timer_cycles();
-
-		received++;
-		uint8_t outport = packet.mbuf->port;
-
-		exchange_mac(packet.mbuf);
-		rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
-				packet.mbuf);
-
-		if (cons_data.release)
-			rte_event_enqueue_burst(dev_id, port_id,
-								&packet, n);
-
-		/* Print out mpps every 1<22 packets */
-		if (!cdata.quiet && received >= last_pkts + (1<<22)) {
-			const uint64_t now = rte_get_timer_cycles();
-			const uint64_t total_ms = (now - start_time) / freq_khz;
-			const uint64_t delta_ms = (now - last_time) / freq_khz;
-			uint64_t delta_pkts = received - last_pkts;
-
-			printf("# %s RX=%"PRIu64", time %"PRIu64 "ms, "
-					"avg %.3f mpps [current %.3f mpps]\n",
-					__func__,
-					received,
-					total_ms,
-					received / (total_ms * 1000.0),
-					delta_pkts / (delta_ms * 1000.0));
-			last_pkts = received;
-			last_time = now;
-		}
-
-		cdata.num_packets--;
-		if (cdata.num_packets <= 0)
-			fdata->done = 1;
-	/* Be stuck in this loop if single. */
-	} while (!fdata->done && fdata->tx_single);
-
-	return 0;
-}
-
-static __rte_always_inline int
-consumer_burst(void)
-{
-	const uint64_t freq_khz = rte_get_timer_hz() / 1000;
-	struct rte_event packets[BATCH_SIZE];
-
-	static uint64_t received;
-	static uint64_t last_pkts;
-	static uint64_t last_time;
-	static uint64_t start_time;
-	unsigned int i, j;
-	uint8_t dev_id = cons_data.dev_id;
-	uint8_t port_id = cons_data.port_id;
-
-	do {
-		uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
-				packets, RTE_DIM(packets), 0);
-
-		if (n == 0) {
-			RTE_ETH_FOREACH_DEV(j)
-				rte_eth_tx_buffer_flush(j, 0, fdata->tx_buf[j]);
-			return 0;
-		}
-		if (start_time == 0)
-			last_time = start_time = rte_get_timer_cycles();
-
-		received += n;
-		for (i = 0; i < n; i++) {
-			uint8_t outport = packets[i].mbuf->port;
-
-			exchange_mac(packets[i].mbuf);
-			rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
-					packets[i].mbuf);
-
-			packets[i].op = RTE_EVENT_OP_RELEASE;
-		}
-
-		if (cons_data.release) {
-			uint16_t nb_tx;
-
-			nb_tx = rte_event_enqueue_burst(dev_id, port_id,
-								packets, n);
-			while (nb_tx < n)
-				nb_tx += rte_event_enqueue_burst(dev_id,
-						port_id, packets + nb_tx,
-						n - nb_tx);
-		}
-
-		/* Print out mpps every 1<22 packets */
-		if (!cdata.quiet && received >= last_pkts + (1<<22)) {
-			const uint64_t now = rte_get_timer_cycles();
-			const uint64_t total_ms = (now - start_time) / freq_khz;
-			const uint64_t delta_ms = (now - last_time) / freq_khz;
-			uint64_t delta_pkts = received - last_pkts;
-
-			printf("# consumer RX=%"PRIu64", time %"PRIu64 "ms, "
-					"avg %.3f mpps [current %.3f mpps]\n",
-					received,
-					total_ms,
-					received / (total_ms * 1000.0),
-					delta_pkts / (delta_ms * 1000.0));
-			last_pkts = received;
-			last_time = now;
-		}
-
-		cdata.num_packets -= n;
-		if (cdata.num_packets <= 0)
-			fdata->done = 1;
-	/* Be stuck in this loop if single. */
-	} while (!fdata->done && fdata->tx_single);
-
-	return 0;
-}
-
 static int
-setup_eventdev_generic(struct cons_data *cons_data,
-		struct worker_data *worker_data)
+setup_eventdev_generic(struct worker_data *worker_data)
 {
 	const uint8_t dev_id = 0;
 	/* +1 stages is for a SINGLE_LINK TX stage */
 	const uint8_t nb_queues = cdata.num_stages + 1;
-	/* + 1 is one port for consumer */
-	const uint8_t nb_ports = cdata.num_workers + 1;
+	const uint8_t nb_ports = cdata.num_workers;
 	struct rte_event_dev_config config = {
 			.nb_event_queues = nb_queues,
 			.nb_event_ports = nb_ports,
@@ -285,11 +145,6 @@ setup_eventdev_generic(struct cons_data *cons_data,
 			.nb_atomic_flows = 1024,
 		.nb_atomic_order_sequences = 1024,
 	};
-	struct rte_event_port_conf tx_p_conf = {
-			.dequeue_depth = 128,
-			.enqueue_depth = 128,
-			.new_event_threshold = 4096,
-	};
 	struct rte_event_queue_conf tx_q_conf = {
 			.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST,
 			.event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_LINK,
@@ -297,7 +152,6 @@ setup_eventdev_generic(struct cons_data *cons_data,

 	struct port_link worker_queues[MAX_NUM_STAGES];
 	uint8_t disable_implicit_release;
-	struct port_link tx_queue;
 	unsigned int i;

 	int ret, ndev = rte_event_dev_count();
@@ -314,7 +168,6 @@ setup_eventdev_generic(struct cons_data *cons_data,
 			RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE);

 	wkr_p_conf.disable_implicit_release = disable_implicit_release;
-	tx_p_conf.disable_implicit_release = disable_implicit_release;

 	if (dev_info.max_event_port_dequeue_depth <
 			config.nb_event_port_dequeue_depth)
@@ -372,8 +225,7 @@ setup_eventdev_generic(struct cons_data *cons_data,
 		printf("%d: error creating qid %d\n", __LINE__, i);
 		return -1;
 	}
-	tx_queue.queue_id = i;
-	tx_queue.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST;
+	cdata.tx_queue_id = i;

 	if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
 		wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
@@ -403,26 +255,6 @@ setup_eventdev_generic(struct cons_data *cons_data,
 		w->port_id = i;
 	}

-	if (tx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
-		tx_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
-	if (tx_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
-		tx_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;
-
-	/* port for consumer, linked to TX queue */
-	if (rte_event_port_setup(dev_id, i, &tx_p_conf) < 0) {
-		printf("Error setting up port %d\n", i);
-		return -1;
-	}
-	if (rte_event_port_link(dev_id, i, &tx_queue.queue_id,
-				&tx_queue.priority, 1) != 1) {
-		printf("%d: error creating link for port %d\n",
-				__LINE__, i);
-		return -1;
-	}
-	*cons_data = (struct cons_data){.dev_id = dev_id,
-					.port_id = i,
-					.release = disable_implicit_release };
-
 	ret = rte_event_dev_service_id_get(dev_id,
 				&fdata->evdev_service_id);
 	if (ret != -ESRCH && ret != 0) {
@@ -431,76 +263,107 @@ setup_eventdev_generic(struct cons_data *cons_data,
 	}
 	rte_service_runstate_set(fdata->evdev_service_id, 1);
 	rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);
-	if (rte_event_dev_start(dev_id) < 0) {
-		printf("Error starting eventdev\n");
-		return -1;
-	}

 	return dev_id;
 }

 static void
-init_rx_adapter(uint16_t nb_ports)
+init_adapters(uint16_t nb_ports)
 {
 	int i;
 	int ret;
+	uint8_t tx_port_id = 0;
 	uint8_t evdev_id = 0;
 	struct rte_event_dev_info dev_info;

 	ret = rte_event_dev_info_get(evdev_id, &dev_info);

-	struct rte_event_port_conf rx_p_conf = {
+	struct rte_event_port_conf adptr_p_conf = {
 		.dequeue_depth = 8,
 		.enqueue_depth = 8,
 		.new_event_threshold = 1200,
 	};

-	if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
-		rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth;
-	if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
-		rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth;
+	if (adptr_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
+		adptr_p_conf.dequeue_depth =
+			dev_info.max_event_port_dequeue_depth;
+	if (adptr_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
+		adptr_p_conf.enqueue_depth =
+			dev_info.max_event_port_enqueue_depth;

 	/* Create one adapter for all the ethernet ports. */
 	ret = rte_event_eth_rx_adapter_create(cdata.rx_adapter_id, evdev_id,
-			&rx_p_conf);
+			&adptr_p_conf);
 	if (ret)
 		rte_exit(EXIT_FAILURE, "failed to create rx adapter[%d]",
 				cdata.rx_adapter_id);

+	ret = rte_event_eth_tx_adapter_create(cdata.tx_adapter_id, evdev_id,
+			&adptr_p_conf);
+	if (ret)
+		rte_exit(EXIT_FAILURE, "failed to create tx adapter[%d]",
+				cdata.tx_adapter_id);
+
 	struct rte_event_eth_rx_adapter_queue_conf queue_conf;
 	memset(&queue_conf, 0, sizeof(queue_conf));
 	queue_conf.ev.sched_type = cdata.queue_type;
 	queue_conf.ev.queue_id = cdata.qid[0];

 	for (i = 0; i < nb_ports; i++) {
-		uint32_t cap;
-
-		ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
-		if (ret)
-			rte_exit(EXIT_FAILURE,
-					"failed to get event rx adapter "
-					"capabilities");
-
 		ret = rte_event_eth_rx_adapter_queue_add(cdata.rx_adapter_id, i,
 				-1, &queue_conf);
 		if (ret)
 			rte_exit(EXIT_FAILURE,
 					"Failed to add queues to Rx adapter");
+
+		ret = rte_event_eth_tx_adapter_queue_add(cdata.tx_adapter_id, i,
+				-1);
+		if (ret)
+			rte_exit(EXIT_FAILURE,
+					"Failed to add queues to Tx adapter");
 	}

+	ret = rte_event_eth_tx_adapter_event_port_get(cdata.tx_adapter_id,
+			&tx_port_id);
+	if (ret)
+		rte_exit(EXIT_FAILURE,
+				"Failed to get Tx adapter port id");
+	ret = rte_event_port_link(evdev_id, tx_port_id, &cdata.tx_queue_id,
+			NULL, 1);
+	if (ret != 1)
+		rte_exit(EXIT_FAILURE,
+				"Unable to link Tx adapter port to Tx queue");
+
 	ret = rte_event_eth_rx_adapter_service_id_get(cdata.rx_adapter_id,
 				&fdata->rxadptr_service_id);
 	if (ret != -ESRCH && ret != 0) {
 		rte_exit(EXIT_FAILURE,
-			"Error getting the service ID for sw eventdev\n");
+			"Error getting the service ID for Rx adapter\n");
 	}
 	rte_service_runstate_set(fdata->rxadptr_service_id, 1);
 	rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id, 0);

+	ret = rte_event_eth_tx_adapter_service_id_get(cdata.tx_adapter_id,
+				&fdata->txadptr_service_id);
+	if (ret != -ESRCH && ret != 0) {
+		rte_exit(EXIT_FAILURE,
+			"Error getting the service ID for Tx adapter\n");
+	}
+	rte_service_runstate_set(fdata->txadptr_service_id, 1);
+	rte_service_set_runstate_mapped_check(fdata->txadptr_service_id, 0);
+
 	ret = rte_event_eth_rx_adapter_start(cdata.rx_adapter_id);
 	if (ret)
 		rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
 				cdata.rx_adapter_id);
+
+	ret = rte_event_eth_tx_adapter_start(cdata.tx_adapter_id);
+	if (ret)
+		rte_exit(EXIT_FAILURE, "Tx adapter[%d] start failed",
+				cdata.tx_adapter_id);
+
+	if (rte_event_dev_start(evdev_id) < 0)
+		rte_exit(EXIT_FAILURE, "Error starting eventdev");
 }

 static void
@@ -510,6 +373,8 @@ generic_opt_check(void)
 	int ret;
 	uint32_t cap = 0;
 	uint8_t rx_needed = 0;
+	uint8_t tx_needed = 0;
+	uint8_t sched_needed = 0;
 	struct rte_event_dev_info eventdev_info;

 	memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
@@ -519,6 +384,8 @@ generic_opt_check(void)
 				RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES))
 		rte_exit(EXIT_FAILURE,
 				"Event dev doesn't support all type queues\n");
+	sched_needed = !(eventdev_info.event_dev_cap &
+		RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED);

 	RTE_ETH_FOREACH_DEV(i) {
 		ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap);
@@ -527,13 +394,19 @@ generic_opt_check(void)
 				"failed to get event rx adapter capabilities");
 		rx_needed |=
 			!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT);
+
+		ret = rte_event_eth_tx_adapter_caps_get(0, i, &cap);
+		if (ret)
+			rte_exit(EXIT_FAILURE,
+				"failed to get event tx adapter capabilities");
+		tx_needed |=
+			!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT);
 	}

 	if (cdata.worker_lcore_mask == 0 ||
 			(rx_needed && cdata.rx_lcore_mask == 0) ||
-			cdata.tx_lcore_mask == 0 || (cdata.sched_lcore_mask == 0
-				&& !(eventdev_info.event_dev_cap &
-					RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))) {
+			(tx_needed && cdata.tx_lcore_mask == 0) ||
+			(sched_needed && cdata.sched_lcore_mask == 0)) {
 		printf("Core part of pipeline was not assigned any cores. "
 			"This will stall the pipeline, please check core masks "
 			"(use -h for details on setting core masks):\n"
@@ -545,23 +418,27 @@ generic_opt_check(void)
 		rte_exit(-1, "Fix core masks\n");
 	}

-	if (eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)
+	if (!sched_needed)
 		memset(fdata->sched_core, 0,
 				sizeof(unsigned int) * MAX_NUM_CORE);
+	if (!rx_needed)
+		memset(fdata->rx_core, 0,
+				sizeof(unsigned int) * MAX_NUM_CORE);
+	if (!tx_needed)
+		memset(fdata->tx_core, 0,
+				sizeof(unsigned int) * MAX_NUM_CORE);
 }

 void
 set_worker_generic_setup_data(struct setup_data *caps, bool burst)
 {
 	if (burst) {
-		caps->consumer = consumer_burst;
 		caps->worker = worker_generic_burst;
 	} else {
-		caps->consumer = consumer;
 		caps->worker = worker_generic;
 	}

-	caps->adptr_setup = init_rx_adapter;
+	caps->adptr_setup = init_adapters;
 	caps->scheduler = schedule_devices;
 	caps->evdev_setup = setup_eventdev_generic;
 	caps->check_opt = generic_opt_check;
diff --git a/examples/eventdev_pipeline/pipeline_worker_tx.c b/examples/eventdev_pipeline/pipeline_worker_tx.c
index 3dbde92df..7cd516cd7 100644
--- a/examples/eventdev_pipeline/pipeline_worker_tx.c
+++ b/examples/eventdev_pipeline/pipeline_worker_tx.c
@@ -36,10 +36,11 @@ worker_event_enqueue_burst(const uint8_t dev, const uint8_t port,
 }

 static __rte_always_inline void
-worker_tx_pkt(struct rte_mbuf *mbuf)
+worker_tx_pkt(const uint8_t dev, const uint8_t port, struct rte_event *ev)
 {
-	exchange_mac(mbuf);
-	while (rte_eth_tx_burst(mbuf->port, 0, &mbuf, 1) != 1)
+	exchange_mac(ev->mbuf);
+	rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0);
+	while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1))
 		rte_pause();
 }

@@ -64,7 +65,7 @@ worker_do_tx_single(void *arg)
 		received++;

 		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
-			worker_tx_pkt(ev.mbuf);
+			worker_tx_pkt(dev, port, &ev);
 			tx++;
 			continue;
 		}
@@ -100,7 +101,7 @@ worker_do_tx_single_atq(void *arg)
 		received++;

 		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
-			worker_tx_pkt(ev.mbuf);
+			worker_tx_pkt(dev, port, &ev);
 			tx++;
 			continue;
 		}
@@ -141,7 +142,7 @@ worker_do_tx_single_burst(void *arg)
 			rte_prefetch0(ev[i + 1].mbuf);
 			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {

-				worker_tx_pkt(ev[i].mbuf);
+				worker_tx_pkt(dev, port, &ev[i]);
 				ev[i].op = RTE_EVENT_OP_RELEASE;
 				tx++;

@@ -188,7 +189,7 @@ worker_do_tx_single_burst_atq(void *arg)
 			rte_prefetch0(ev[i + 1].mbuf);
 			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {

-				worker_tx_pkt(ev[i].mbuf);
+				worker_tx_pkt(dev, port, &ev[i]);
 				ev[i].op = RTE_EVENT_OP_RELEASE;
 				tx++;
 			} else
@@ -232,7 +233,7 @@ worker_do_tx(void *arg)

 		if (cq_id >= lst_qid) {
 			if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
-				worker_tx_pkt(ev.mbuf);
+				worker_tx_pkt(dev, port, &ev);
 				tx++;
 				continue;
 			}
@@ -280,7 +281,7 @@ worker_do_tx_atq(void *arg)

 		if (cq_id == lst_qid) {
 			if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
-				worker_tx_pkt(ev.mbuf);
+				worker_tx_pkt(dev, port, &ev);
 				tx++;
 				continue;
 			}
@@ -330,7 +331,7 @@ worker_do_tx_burst(void *arg)

 			if (cq_id >= lst_qid) {
 				if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
-					worker_tx_pkt(ev[i].mbuf);
+					worker_tx_pkt(dev, port, &ev[i]);
 					tx++;
 					ev[i].op = RTE_EVENT_OP_RELEASE;
 					continue;
@@ -387,7 +388,7 @@ worker_do_tx_burst_atq(void *arg)

 			if (cq_id == lst_qid) {
 				if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
-					worker_tx_pkt(ev[i].mbuf);
+					worker_tx_pkt(dev, port, &ev[i]);
 					tx++;
 					ev[i].op = RTE_EVENT_OP_RELEASE;
 					continue;
@@ -413,10 +414,8 @@ worker_do_tx_burst_atq(void *arg)
 }

 static int
-setup_eventdev_worker_tx(struct cons_data *cons_data,
-		struct worker_data *worker_data)
+setup_eventdev_worker_tx_enq(struct worker_data *worker_data)
 {
-	RTE_SET_USED(cons_data);
 	uint8_t i;
 	const uint8_t atq = cdata.all_type_queues ? 1 : 0;
 	const uint8_t dev_id = 0;
@@ -575,10 +574,9 @@ setup_eventdev_worker_tx(struct cons_data *cons_data,
 	}
 	rte_service_runstate_set(fdata->evdev_service_id, 1);
 	rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);
-	if (rte_event_dev_start(dev_id) < 0) {
-		printf("Error starting eventdev\n");
-		return -1;
-	}
+
+	if (rte_event_dev_start(dev_id) < 0)
+		rte_exit(EXIT_FAILURE, "Error starting eventdev");

 	return dev_id;
 }
@@ -602,7 +600,7 @@ service_rx_adapter(void *arg)
 }

 static void
-init_rx_adapter(uint16_t nb_ports)
+init_adapters(uint16_t nb_ports)
 {
 	int i;
 	int ret;
@@ -613,17 +611,18 @@ init_rx_adapter(uint16_t nb_ports)
 	ret = rte_event_dev_info_get(evdev_id, &dev_info);
 	adptr_services = rte_zmalloc(NULL, sizeof(struct rx_adptr_services), 0);

-	struct rte_event_port_conf rx_p_conf = {
+	struct rte_event_port_conf adptr_p_conf = {
 		.dequeue_depth = 8,
 		.enqueue_depth = 8,
 		.new_event_threshold = 1200,
 	};

-	if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
-		rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth;
-	if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
-		rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth;
-
+	if (adptr_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
+		adptr_p_conf.dequeue_depth =
+			dev_info.max_event_port_dequeue_depth;
+	if (adptr_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
+		adptr_p_conf.enqueue_depth =
+			dev_info.max_event_port_enqueue_depth;

 	struct rte_event_eth_rx_adapter_queue_conf queue_conf;
 	memset(&queue_conf, 0, sizeof(queue_conf));
@@ -633,11 +632,11 @@ init_rx_adapter(uint16_t nb_ports)
 		uint32_t cap;
 		uint32_t service_id;

-		ret = rte_event_eth_rx_adapter_create(i, evdev_id, &rx_p_conf);
+		ret = rte_event_eth_rx_adapter_create(i, evdev_id,
+				&adptr_p_conf);
 		if (ret)
 			rte_exit(EXIT_FAILURE,
-					"failed to create rx adapter[%d]",
-					cdata.rx_adapter_id);
+					"failed to create rx adapter[%d]", i);

 		ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
 		if (ret)
@@ -654,7 +653,6 @@ init_rx_adapter(uint16_t nb_ports)
 			rte_exit(EXIT_FAILURE,
 					"Failed to add queues to Rx adapter");

-
 		/* Producer needs to be scheduled. */
 		if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) {
 			ret = rte_event_eth_rx_adapter_service_id_get(i,
@@ -680,9 +678,29 @@ init_rx_adapter(uint16_t nb_ports)
 		ret = rte_event_eth_rx_adapter_start(i);
 		if (ret)
 			rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
-					cdata.rx_adapter_id);
+					i);
+	}
+
+	/* We already know that Tx adapter has INTERNAL port cap*/
+	ret = rte_event_eth_tx_adapter_create(cdata.tx_adapter_id, evdev_id,
+			&adptr_p_conf);
+	if (ret)
+		rte_exit(EXIT_FAILURE, "failed to create tx adapter[%d]",
+				cdata.tx_adapter_id);
+
+	for (i = 0; i < nb_ports; i++) {
+		ret = rte_event_eth_tx_adapter_queue_add(cdata.tx_adapter_id, i,
+				-1);
+		if (ret)
+			rte_exit(EXIT_FAILURE,
+					"Failed to add queues to Tx adapter");
 	}

+	ret = rte_event_eth_tx_adapter_start(cdata.tx_adapter_id);
+	if (ret)
+		rte_exit(EXIT_FAILURE, "Tx adapter[%d] start failed",
+				cdata.tx_adapter_id);
+
 	if (adptr_services->nb_rx_adptrs) {
 		struct rte_service_spec service;

@@ -695,8 +713,7 @@ init_rx_adapter(uint16_t nb_ports)
 				&fdata->rxadptr_service_id);
 		if (ret)
 			rte_exit(EXIT_FAILURE,
-				"Rx adapter[%d] service register failed",
-				cdata.rx_adapter_id);
+				"Rx adapter service register failed");

 		rte_service_runstate_set(fdata->rxadptr_service_id, 1);
 		rte_service_component_runstate_set(fdata->rxadptr_service_id,
@@ -708,23 +725,20 @@ init_rx_adapter(uint16_t nb_ports)
 		rte_free(adptr_services);
 	}

-	if (!adptr_services->nb_rx_adptrs && fdata->cap.consumer == NULL &&
-			(dev_info.event_dev_cap &
+	if (!adptr_services->nb_rx_adptrs && (dev_info.event_dev_cap &
 			 RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))
 		fdata->cap.scheduler = NULL;
-
-	if (dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)
-		memset(fdata->sched_core, 0,
-				sizeof(unsigned int) * MAX_NUM_CORE);
 }

 static void
-worker_tx_opt_check(void)
+worker_tx_enq_opt_check(void)
 {
 	int i;
 	int ret;
 	uint32_t cap = 0;
 	uint8_t rx_needed = 0;
+	uint8_t tx_needed = 0;
+	uint8_t sched_needed = 0;
 	struct rte_event_dev_info eventdev_info;

 	memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
@@ -734,22 +748,29 @@ worker_tx_opt_check(void)
 				RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES))
 		rte_exit(EXIT_FAILURE,
 				"Event dev doesn't support all type queues\n");
+	sched_needed = !(eventdev_info.event_dev_cap &
+		RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED);

 	RTE_ETH_FOREACH_DEV(i) {
 		ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap);
 		if (ret)
 			rte_exit(EXIT_FAILURE,
-					"failed to get event rx adapter "
-					"capabilities");
+				"failed to get event rx adapter capabilities");
 		rx_needed |=
 			!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT);
+
+		ret = rte_event_eth_tx_adapter_caps_get(0, i, &cap);
+		if (ret)
+			rte_exit(EXIT_FAILURE,
+				"failed to get event tx adapter capabilities");
+		tx_needed |=
+			!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT);
 	}

 	if (cdata.worker_lcore_mask == 0 ||
 			(rx_needed && cdata.rx_lcore_mask == 0) ||
-			(cdata.sched_lcore_mask == 0 &&
-			 !(eventdev_info.event_dev_cap &
-				 RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))) {
+			(tx_needed && cdata.tx_lcore_mask == 0) ||
+			(sched_needed && cdata.sched_lcore_mask == 0)) {
 		printf("Core part of pipeline was not assigned any cores. "
 			"This will stall the pipeline, please check core masks "
 			"(use -h for details on setting core masks):\n"
@@ -760,6 +781,16 @@ worker_tx_opt_check(void)
 			cdata.worker_lcore_mask);
 		rte_exit(-1, "Fix core masks\n");
 	}
+
+	if (!sched_needed)
+		memset(fdata->sched_core, 0,
+				sizeof(unsigned int) * MAX_NUM_CORE);
+	if (!rx_needed)
+		memset(fdata->rx_core, 0,
+				sizeof(unsigned int) * MAX_NUM_CORE);
+	if (!tx_needed)
+		memset(fdata->tx_core, 0,
+				sizeof(unsigned int) * MAX_NUM_CORE);
 }

 static worker_loop
@@ -821,18 +852,15 @@ get_worker_multi_stage(bool burst)
 }

 void
-set_worker_tx_setup_data(struct setup_data *caps, bool burst)
+set_worker_tx_enq_setup_data(struct setup_data *caps, bool burst)
 {
 	if (cdata.num_stages == 1)
 		caps->worker = get_worker_single_stage(burst);
 	else
 		caps->worker = get_worker_multi_stage(burst);

-	memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);
-
-	caps->check_opt = worker_tx_opt_check;
-	caps->consumer = NULL;
+	caps->check_opt = worker_tx_enq_opt_check;
 	caps->scheduler = schedule_devices;
-	caps->evdev_setup = setup_eventdev_worker_tx;
-	caps->adptr_setup = init_rx_adapter;
+	caps->evdev_setup = setup_eventdev_worker_tx_enq;
+	caps->adptr_setup = init_adapters;
 }
--
2.18.0



More information about the dev mailing list