[dpdk-dev] [PATCH 7/7] examples/eventdev_pipeline: adding example

Harry van Haaren harry.van.haaren at intel.com
Wed Nov 16 19:00:07 CET 2016


This patch adds a sample app to the examples/ directory, which can be used
as a reference application and for general testing. The application requires
two ethdev ports and expects traffic to be flowing. The application must be
run with the --vdev flag as follows to indicate to EAL that a virtual
eventdev device called "evdev_sw0" is available to be used:

    ./build/eventdev_pipeline --vdev evdev_sw0

The general flow of the traffic is as follows:

    RX core => atomic queue => 4 worker cores => TX core

A scheduler core is required to do the packet scheduling, so this
configuration requires 7 cores (RX, TX, scheduler, and 4 workers). Finally,
a master core brings the total core count to 8. The
application can be configured for various numbers of flows and worker
cores. Run the application with -h for details.
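
For example, a shorter run of ~1M packets on 8 worker cores could be
launched as follows, passing the application options (listed by -h) after
the standard EAL "--" separator:

    ./build/eventdev_pipeline --vdev evdev_sw0 -- -n 1048576 -w 8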

Signed-off-by: Gage Eads <gage.eads at intel.com>
Signed-off-by: Bruce Richardson <bruce.richardson at intel.com>
Signed-off-by: Harry van Haaren <harry.van.haaren at intel.com>
---
 examples/eventdev_pipeline/Makefile |  49 +++
 examples/eventdev_pipeline/main.c   | 718 ++++++++++++++++++++++++++++++++++++
 2 files changed, 767 insertions(+)
 create mode 100644 examples/eventdev_pipeline/Makefile
 create mode 100644 examples/eventdev_pipeline/main.c

diff --git a/examples/eventdev_pipeline/Makefile b/examples/eventdev_pipeline/Makefile
new file mode 100644
index 0000000..bab8916
--- /dev/null
+++ b/examples/eventdev_pipeline/Makefile
@@ -0,0 +1,49 @@
+#   BSD LICENSE
+#
+#   Copyright(c) 2016 Intel Corporation. All rights reserved.
+#
+#   Redistribution and use in source and binary forms, with or without
+#   modification, are permitted provided that the following conditions
+#   are met:
+#
+#     * Redistributions of source code must retain the above copyright
+#       notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above copyright
+#       notice, this list of conditions and the following disclaimer in
+#       the documentation and/or other materials provided with the
+#       distribution.
+#     * Neither the name of Intel Corporation nor the names of its
+#       contributors may be used to endorse or promote products derived
+#       from this software without specific prior written permission.
+#
+#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+#   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+#   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+#   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+#   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+#   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+#   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+#   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+#   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+ifeq ($(RTE_SDK),)
+$(error "Please define RTE_SDK environment variable")
+endif
+
+# Default target, can be overridden by command line or environment
+RTE_TARGET ?= x86_64-native-linuxapp-gcc
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# binary name
+APP = eventdev_pipeline
+
+# all sources are stored in SRCS-y
+SRCS-y := main.c
+
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS)
+
+include $(RTE_SDK)/mk/rte.extapp.mk
diff --git a/examples/eventdev_pipeline/main.c b/examples/eventdev_pipeline/main.c
new file mode 100644
index 0000000..6a8052c
--- /dev/null
+++ b/examples/eventdev_pipeline/main.c
@@ -0,0 +1,718 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <getopt.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <signal.h>
+
+#include <rte_eal.h>
+#include <rte_mempool.h>
+#include <rte_mbuf.h>
+#include <rte_launch.h>
+#include <rte_malloc.h>
+#include <rte_cycles.h>
+#include <rte_ethdev.h>
+#include <rte_eventdev.h>
+
+#define BATCH_SIZE 32 /* events/mbufs handled per dequeue/enqueue burst */
+
+static unsigned int num_workers = 4;
+static unsigned long num_packets = (1L << 25); /* do ~32M packets */
+static unsigned int num_fids = 16;
+static unsigned int num_priorities = 1;
+static int sched_type = RTE_SCHED_TYPE_ATOMIC;
+
+struct prod_data {
+	uint8_t event_dev_id;
+	uint8_t event_port_id;
+	int32_t qid;
+	unsigned num_ports;
+};
+
+struct cons_data {
+	uint8_t event_dev_id;
+	uint8_t event_port_id;
+};
+
+struct worker_data {
+	uint8_t event_dev_id;
+	int event_port_id;
+	int32_t qid;
+};
+
+static volatile int done = 0;
+static int quiet = 0;
+static struct rte_mempool *mp;
+
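+/*
+ * Worker loop: dequeue a burst of events from the scheduled queue, swap
+ * the MAC addresses of each packet (with a short busy-loop to simulate
+ * real work), then forward the events on to the consumer's directed queue.
+ */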
+static int
+worker(void *arg)
+{
+	struct rte_event rcv_events[BATCH_SIZE];
+
+	struct worker_data *data = (struct worker_data *)arg;
+	uint8_t event_dev_id = data->event_dev_id;
+	uint8_t event_port_id = data->event_port_id;
+	int32_t qid = data->qid;
+	size_t sent = 0, received = 0;
+
+	while (!done) {
+		uint16_t i;
+
+		uint16_t n = rte_event_dequeue_burst(event_dev_id,
+						     event_port_id,
+						     rcv_events,
+						     RTE_DIM(rcv_events),
+						     false);
+		if (n == 0) {
+			rte_pause();
+			/* Flush any buffered events */
+			rte_event_dequeue(event_dev_id,
+					  event_port_id,
+					  NULL,
+					  false);
+			continue;
+		}
+		received += n;
+
+		for (i = 0; i < n; i++) {
+			struct ether_hdr *eth;
+			struct ether_addr addr;
+			struct rte_event *ev = &rcv_events[i];
+
+			ev->queue_id = qid;
+			ev->flow_id = 0;
+			ev->priority = 0;
+			ev->sched_type = RTE_SCHED_TYPE_ATOMIC;
+			ev->operation = RTE_EVENT_OP_FORWARD;
+
+			/* spin for ~750 cycles to simulate per-packet work */
+			uint64_t now = rte_rdtsc();
+			while (now + 750 > rte_rdtsc())
+				;
+
+			/* change mac addresses on packet */
+			eth = rte_pktmbuf_mtod(ev->mbuf, struct ether_hdr *);
+			ether_addr_copy(&eth->d_addr, &addr);
+			ether_addr_copy(&eth->s_addr, &eth->d_addr);
+			ether_addr_copy(&addr, &eth->s_addr);
+		}
+		int ret = rte_event_enqueue_burst(event_dev_id, event_port_id,
+					rcv_events, n, false);
+		if (ret != n)
+			rte_panic("worker %u thread failed to enqueue event\n",
+				rte_lcore_id());
+		sent += n;
+	}
+
+	/* Flush the buffered events */
+	rte_event_dequeue(event_dev_id, event_port_id, NULL, false);
+
+	if (!quiet)
+		printf("  worker %u thread done. RX=%zu TX=%zu\n",
+				rte_lcore_id(), received, sent);
+
+	return 0;
+}
+
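+/*
+ * Scheduler loop: a dedicated core repeatedly calls rte_event_schedule()
+ * to perform the event scheduling for the device (assumed to be dev ID 0).
+ */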
+static int
+scheduler(void *arg)
+{
+	RTE_SET_USED(arg);
+	size_t loops = 0;
+
+	while (!done) {
+		/* Assumes an event dev ID of 0 */
+		rte_event_schedule(0);
+		loops++;
+	}
+
+	printf("  scheduler thread done. loops=%zu\n", loops);
+
+	return 0;
+}
+
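+/*
+ * Consumer loop: dequeue events from the single-consumer TX queue and
+ * buffer each packet for transmission on the ethdev port it arrived on,
+ * setting "done" once num_packets have been received.
+ */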
+static int
+consumer(void *arg)
+{
+	struct rte_event events[BATCH_SIZE];
+
+	struct cons_data *data = (struct cons_data *)arg;
+	uint8_t event_dev_id = data->event_dev_id;
+	uint8_t event_port_id = data->event_port_id;
+	struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS];
+	size_t npackets = num_packets;
+	size_t received = 0;
+	/* tracks when we last printed the receive count */
+	size_t received_printed = 0;
+	uint64_t start_time = 0;
+	uint64_t freq_khz = rte_get_timer_hz() / 1000;
+	uint64_t dropped = 0;
+	unsigned i;
+
+	for (i = 0; i < rte_eth_dev_count(); i++) {
+		tx_buf[i] = rte_malloc(NULL, RTE_ETH_TX_BUFFER_SIZE(32), 0);
+		if (tx_buf[i] == NULL)
+			rte_panic("Out of memory\n");
+		rte_eth_tx_buffer_init(tx_buf[i], 32);
+		rte_eth_tx_buffer_set_err_callback(tx_buf[i],
+				rte_eth_tx_buffer_count_callback, &dropped);
+	}
+
+	while (!done) {
+		uint16_t i;
+		uint16_t n = rte_event_dequeue_burst(event_dev_id,
+						     event_port_id,
+						     events,
+						     RTE_DIM(events),
+						     false);
+
+		if (n == 0) {
+			rte_pause();
+			continue;
+		}
+		if (start_time == 0)
+			start_time = rte_get_timer_cycles();
+
+		received += n;
+		for (i = 0; i < n; i++) {
+			uint8_t outport = events[i].mbuf->port;
+			rte_eth_tx_buffer(outport, 0, tx_buf[outport],
+					events[i].mbuf);
+		}
+
+		if (!quiet && received >= received_printed + (1<<22)) {
+			printf("# consumer RX=%zu, time %"PRIu64"ms\n",
+					received,
+					(rte_get_timer_cycles() - start_time) / freq_khz);
+			received_printed = received;
+		}
+
+		if (num_packets > 0 && npackets > 0) {
+			npackets -= n;
+			if (npackets == 0)
+				done = 1;
+		}
+	}
+
+	for (i = 0; i < rte_eth_dev_count(); i++)
+		rte_eth_tx_buffer_flush(i, 0, tx_buf[i]);
+
+	printf("  consumer done! RX=%zu, time %"PRIu64"ms\n",
+			received,
+			(rte_get_timer_cycles() - start_time) / freq_khz);
+
+	return 0;
+}
+
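+/*
+ * Producer loop: poll the ethdev ports round-robin for packet bursts,
+ * tag each mbuf with a flow ID and sequence number, and inject it into
+ * the eventdev as a NEW event on the scheduled queue.
+ */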
+static int
+producer(void *arg)
+{
+	struct prod_data *data = (struct prod_data *)arg;
+	size_t npackets = num_packets;
+	unsigned i;
+	uint64_t mbuf_seqno = 0;
+	size_t sent = 0;
+	uint8_t eth_port = 0;
+	uint8_t event_dev_id = data->event_dev_id;
+	uint8_t event_port_id = data->event_port_id;
+	int fid_counter = 0;
+
+	while (!done) {
+		int ret;
+		unsigned num_ports = data->num_ports;
+		int32_t qid = data->qid;
+		struct rte_event events[BATCH_SIZE];
+		struct rte_mbuf *mbufs[BATCH_SIZE];
+
+		uint16_t nb_rx = rte_eth_rx_burst(eth_port, 0, mbufs, BATCH_SIZE);
+		if (++eth_port == num_ports)
+			eth_port = 0;
+		if (nb_rx == 0) {
+			rte_pause();
+			/* Flush any buffered events */
+			rte_event_dequeue(event_dev_id,
+					  event_port_id,
+					  NULL,
+					  false);
+			continue;
+		}
+
+		for (i = 0; i < nb_rx; i++) {
+			struct rte_mbuf *m = mbufs[i];
+			struct rte_event *ev = &events[i];
+
+			ev->queue_id = qid;
+			ev->flow_id = fid_counter++ % num_fids;
+			ev->priority = 0;
+			m->udata64 = mbuf_seqno++;
+			ev->mbuf = m;
+			ev->sched_type = sched_type;
+			ev->operation = RTE_EVENT_OP_NEW;
+		}
+
+		do {
+			ret = rte_event_enqueue_burst(event_dev_id,
+							event_port_id,
+							events,
+							nb_rx,
+							false);
+		} while (ret == -ENOSPC);
+		if (ret != nb_rx)
+			rte_panic("producer thread failed to enqueue *all* events\n");
+
+		sent += nb_rx;
+
+		if (num_packets > 0 && npackets > 0) {
+			npackets -= nb_rx;
+			if (npackets == 0)
+				break;
+		}
+	}
+
+	/* Flush any buffered events until the consumer signals completion */
+	while (!done)
+		rte_event_dequeue(event_dev_id, event_port_id, NULL, false);
+
+	printf("  prod thread done! TX=%zu across %u flows\n", sent, num_fids);
+
+	return 0;
+}
+
+static struct option long_options[] = {
+	{"workers", required_argument, 0, 'w'},
+	{"packets", required_argument, 0, 'n'},
+	{"atomic-flows", required_argument, 0, 'f'},
+	{"priority", required_argument, 0, 'p'},
+	{"ordered", no_argument, 0, 'o'},
+	{"quiet", no_argument, 0, 'q'},
+	{0, 0, 0, 0}
+};
+
+static void
+usage(void)
+{
+	const char *usage_str =
+		"  Usage: eventdev_pipeline [options]\n"
+		"  Options:\n"
+		"  -w, --workers=N       Use N workers (default 4)\n"
+		"  -n, --packets=N       Send N packets (default ~32M), 0 implies no limit\n"
+		"  -f, --atomic-flows=N  Use N atomic flows (default 16)\n"
+		"  -p, --priority=N      Use N number of priorities (default 1)\n"
+		"  -o, --ordered         Use ordered scheduling\n"
+		"  -q, --quiet           Minimize printed output\n"
+		"\n";
+
+	fprintf(stderr, "%s", usage_str);
+	exit(1);
+}
+
+static void
+parse_app_args(int argc, char** argv)
+{
+	/* Parse cli options */
+	int option_index;
+	int c;
+	opterr = 0;
+
+	for (;;) {
+		c = getopt_long(argc, argv, "w:n:f:p:oq", long_options,
+				&option_index);
+		if (c == -1)
+			break;
+
+		switch (c) {
+		case 'w':
+			num_workers = (unsigned int)atoi(optarg);
+			break;
+		case 'n':
+			num_packets = (unsigned long)atol(optarg);
+			break;
+		case 'f':
+			num_fids = (unsigned int)atoi(optarg);
+			break;
+		case 'p':
+			num_priorities = (unsigned int)atoi(optarg);
+			break;
+		case 'o':
+			sched_type = RTE_SCHED_TYPE_ORDERED;
+			break;
+		case 'q':
+			quiet = 1;
+			break;
+		default:
+			usage();
+		}
+	}
+	if (num_workers == 0)
+		usage();
+}
+
+/*
+ * Initializes a given port using global settings and with the RX buffers
+ * coming from the mbuf_pool passed as a parameter.
+ */
+static inline int
+port_init(uint8_t port, struct rte_mempool *mbuf_pool)
+{
+	static const struct rte_eth_conf port_conf_default = {
+		.rxmode = { .max_rx_pkt_len = ETHER_MAX_LEN }
+	};
+	const uint16_t rx_rings = 1, tx_rings = 1;
+	const uint16_t rx_ring_size = 512, tx_ring_size = 512;
+	struct rte_eth_conf port_conf = port_conf_default;
+	int retval;
+	uint16_t q;
+
+	if (port >= rte_eth_dev_count())
+		return -1;
+
+	/* Configure the Ethernet device. */
+	retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
+	if (retval != 0)
+		return retval;
+
+	/* Allocate and set up 1 RX queue per Ethernet port. */
+	for (q = 0; q < rx_rings; q++) {
+		retval = rte_eth_rx_queue_setup(port, q, rx_ring_size,
+				rte_eth_dev_socket_id(port), NULL, mbuf_pool);
+		if (retval < 0)
+			return retval;
+	}
+
+	/* Allocate and set up 1 TX queue per Ethernet port. */
+	for (q = 0; q < tx_rings; q++) {
+		retval = rte_eth_tx_queue_setup(port, q, tx_ring_size,
+				rte_eth_dev_socket_id(port), NULL);
+		if (retval < 0)
+			return retval;
+	}
+
+	/* Start the Ethernet port. */
+	retval = rte_eth_dev_start(port);
+	if (retval < 0)
+		return retval;
+
+	/* Display the port MAC address. */
+	struct ether_addr addr;
+	rte_eth_macaddr_get(port, &addr);
+	printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
+			   " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
+			(unsigned)port,
+			addr.addr_bytes[0], addr.addr_bytes[1],
+			addr.addr_bytes[2], addr.addr_bytes[3],
+			addr.addr_bytes[4], addr.addr_bytes[5]);
+
+	/* Enable RX in promiscuous mode for the Ethernet device. */
+	rte_eth_promiscuous_enable(port);
+
+	return 0;
+}
+
+static int
+init_ports(unsigned num_ports)
+{
+	uint8_t portid;
+
+	mp = rte_pktmbuf_pool_create("packet_pool",
+			/* mbufs */ 16384 * num_ports,
+			/* cache_size */ 512,
+			/* priv_size */ 0,
+			/* data_room_size */ RTE_MBUF_DEFAULT_BUF_SIZE,
+			rte_socket_id());
+	if (mp == NULL)
+		rte_panic("Cannot create mbuf pool\n");
+
+	for (portid = 0; portid < num_ports; portid++)
+		if (port_init(portid, mp) != 0)
+			rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu8 "\n",
+					portid);
+	return 0;
+}
+
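+/*
+ * Configure the eventdev with 3 queues and num_workers + 2 ports:
+ *   qid0     - scheduled queue, fed by the producer port and serviced
+ *              by the worker ports
+ *   cons_qid - single-consumer queue, linked to the consumer's port
+ *   prod_qid - single-consumer queue, linked to the producer's port
+ * Note that qid0 is currently created atomic-only, even when ordered
+ * scheduling is requested with -o.
+ */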
+static uint8_t
+setup_event_dev(struct prod_data *prod_data,
+		struct cons_data *cons_data,
+		struct worker_data *worker_data)
+{
+	struct rte_event_dev_config config;
+	struct rte_event_queue_conf queue_config;
+	struct rte_event_port_conf port_config;
+	struct rte_event_queue_link link;
+	int prod_port;
+	int cons_port;
+	int qid0;
+	int cons_qid;
+	int prod_qid;
+	unsigned i;
+	int ret;
+	int8_t id;
+
+	const char *dev_name = "evdev_sw0";
+	id = rte_event_dev_get_dev_id(dev_name);
+	if (id < 0)
+		rte_panic("Failed to get %s device ID\n", dev_name);
+
+	config.nb_event_queues = 3;
+	config.nb_event_ports = num_workers + 2;
+	config.nb_events_limit = 256;
+	config.dequeue_wait_ns = 0;
+
+	ret = rte_event_dev_configure(id, &config);
+	if (ret)
+		rte_panic("Failed to configure the event dev\n");
+
+	/* Create queues */
+	queue_config.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY;
+	queue_config.priority = 0;
+
+	qid0 = 0;
+	ret = rte_event_queue_setup(id, qid0, &queue_config);
+	if (ret < 0)
+		rte_panic("Failed to create the scheduled QID\n");
+
+	queue_config.event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_CONSUMER;
+	queue_config.priority = 0;
+
+	cons_qid = 1;
+	ret = rte_event_queue_setup(id, cons_qid, &queue_config);
+	if (ret < 0)
+		rte_panic("Failed to create the cons directed QID\n");
+
+	queue_config.event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_CONSUMER;
+	queue_config.priority = 0;
+
+	prod_qid = 2;
+	ret = rte_event_queue_setup(id, prod_qid, &queue_config);
+	if (ret < 0)
+		rte_panic("Failed to create the prod directed QID\n");
+
+	/* Create ports */
+#define LB_PORT_DEPTH 16
+#define DIR_PORT_DEPTH 32
+	port_config.enqueue_queue_depth = LB_PORT_DEPTH;
+	port_config.dequeue_queue_depth = LB_PORT_DEPTH;
+	port_config.new_event_threshold = 255;
+
+	prod_port = 0;
+	ret = rte_event_port_setup(id, prod_port, &port_config);
+	if (ret < 0)
+		rte_panic("Failed to create the producer port\n");
+
+	cons_port = 1;
+	port_config.enqueue_queue_depth = DIR_PORT_DEPTH;
+	port_config.dequeue_queue_depth = DIR_PORT_DEPTH;
+	ret = rte_event_port_setup(id, cons_port, &port_config);
+	if (ret < 0)
+		rte_panic("Failed to create the consumer port\n");
+
+	port_config.enqueue_queue_depth = LB_PORT_DEPTH;
+	port_config.dequeue_queue_depth = LB_PORT_DEPTH;
+	for (i = 0; i < num_workers; i++) {
+		worker_data[i].event_port_id = i + 2;
+		ret = rte_event_port_setup(id, worker_data[i].event_port_id,
+				&port_config);
+		if (ret < 0)
+			rte_panic("Failed to create worker port #%u\n", i);
+	}
+
+	/* Map ports/qids */
+	for (i = 0; i < num_workers; i++) {
+		link.queue_id = qid0;
+		link.priority = 0;
+
+		ret = rte_event_port_link(id, worker_data[i].event_port_id,
+				&link, 1);
+		if (ret != 1)
+			rte_panic("Failed to map worker%u port to qid0\n", i);
+	}
+
+	/* Link consumer port to its QID */
+	link.queue_id = cons_qid;
+	link.priority = 0;
+
+	ret = rte_event_port_link(id, cons_port, &link, 1);
+	if (ret != 1)
+		rte_panic("Failed to map consumer port to cons_qid\n");
+
+	/* Link producer port to its QID */
+	link.queue_id = prod_qid;
+	link.priority = 0;
+
+	ret = rte_event_port_link(id, prod_port, &link, 1);
+	if (ret != 1)
+		rte_panic("Failed to map producer port to prod_qid\n");
+
+	/* Dispatch to slaves */
+	*prod_data = (struct prod_data){.event_dev_id = id,
+					.event_port_id = prod_port,
+					.qid = qid0};
+	*cons_data = (struct cons_data){.event_dev_id = id,
+					.event_port_id = cons_port};
+
+	for (i = 0; i < num_workers; i++) {
+		struct worker_data *w = &worker_data[i];
+		w->event_dev_id = id;
+		w->qid = cons_qid;
+	}
+
+	if (rte_event_dev_start(id) < 0)
+		rte_panic("Failed to start the event device\n");
+
+	return (uint8_t) id;
+}
+
+static void
+sighndlr(int sig)
+{
+	/* Ctrl-Z to dump stats */
+	if (sig == SIGTSTP) {
+		rte_mempool_dump(stdout, mp);
+		rte_event_dev_dump(stdout, 0);
+	}
+	/* Ctrl-C to exit */
+	if (sig == SIGINT)
+		rte_exit(0, "sigint arrived, quitting\n");
+}
+
+int
+main(int argc, char **argv)
+{
+	signal(SIGINT, sighndlr);
+	signal(SIGTSTP, sighndlr);
+
+	struct prod_data prod_data = {0};
+	struct cons_data cons_data = {0};
+	struct worker_data *worker_data;
+	unsigned nworkers = 0;
+	unsigned num_ports;
+	int lcore_id;
+	int err;
+	int has_prod = 0;
+	int has_cons = 0;
+	int has_scheduler = 0;
+
+	err = rte_eal_init(argc, argv);
+	if (err < 0)
+		rte_panic("Invalid EAL arguments\n");
+
+	argc -= err;
+	argv += err;
+
+	/* Parse cli options */
+	parse_app_args(argc, argv);
+
+	num_ports = rte_eth_dev_count();
+	if (num_ports == 0)
+		rte_panic("No ethernet ports found\n");
+
+	if (!quiet) {
+		printf("  Config:\n");
+		printf("\tports: %u\n", num_ports);
+		printf("\tworkers: %u\n", num_workers);
+		printf("\tpackets: %lu\n", num_packets);
+		printf("\tflows: %u\n", num_fids);
+		printf("\tpriorities: %u\n", num_priorities);
+		if (sched_type == RTE_SCHED_TYPE_ORDERED)
+			printf("\tqid0 type: ordered\n");
+		if (sched_type == RTE_SCHED_TYPE_ATOMIC)
+			printf("\tqid0 type: atomic\n");
+		printf("\n");
+	}
+
+	const unsigned cores_needed = num_workers +
+			/*main*/1 +
+			/*sched*/1 +
+			/*TX*/1 +
+			/*RX*/1;
+
+	if (!quiet) {
+		printf("Number of cores available: %u\n", rte_lcore_count());
+		printf("Number of cores to be used: %u\n", cores_needed);
+	}
+
+	if (rte_lcore_count() < cores_needed)
+		rte_panic("Too few cores\n");
+
+	const uint8_t ndevs = rte_event_dev_count();
+	if (ndevs == 0)
+		rte_panic("No event devs found. Do you need to pass in a --vdev flag?\n");
+	if (ndevs > 1)
+		fprintf(stderr,
+			"Warning: More than one event dev, using idx 0\n");
+
+	worker_data = rte_calloc(NULL, num_workers, sizeof(worker_data[0]), 0);
+	if (worker_data == NULL)
+		rte_panic("rte_calloc failed\n");
+
+	uint8_t id = setup_event_dev(&prod_data, &cons_data, worker_data);
+	RTE_SET_USED(id);
+
+	prod_data.num_ports = num_ports;
+	init_ports(num_ports);
+
+	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
+		if (has_prod && has_cons && has_scheduler &&
+				nworkers == num_workers)
+			break;
+
+		if (!has_scheduler) {
+			err = rte_eal_remote_launch(scheduler, NULL, lcore_id);
+			if (err)
+				rte_panic("Failed to launch scheduler\n");
+
+			has_scheduler = 1;
+			continue;
+		}
+
+		if (nworkers < num_workers) {
+			err = rte_eal_remote_launch(worker,
+					&worker_data[nworkers], lcore_id);
+			if (err)
+				rte_panic("Failed to launch worker%u\n",
+						nworkers);
+			nworkers++;
+			continue;
+		}
+
+		if (!has_cons) {
+			err = rte_eal_remote_launch(consumer, &cons_data, lcore_id);
+			if (err)
+				rte_panic("Failed to launch consumer\n");
+			has_cons = 1;
+			continue;
+		}
+
+		if (!has_prod) {
+			err = rte_eal_remote_launch(producer, &prod_data, lcore_id);
+			if (err)
+				rte_panic("Failed to launch producer\n");
+			has_prod = 1;
+			continue;
+		}
+	}
+
+	rte_eal_mp_wait_lcore();
+
+	/* Cleanup done automatically by kernel on app exit */
+
+	return 0;
+}
-- 
2.7.4