[dpdk-dev] [PATCH v1 2/2] example: distributor app modified to use burstAPI

David Hunt david.hunt at intel.com
Thu Dec 1 05:50:21 CET 2016


New stats to show details of throughput per second. Runs on a separate
core to the rx thread so as not to affect performance.
Thread for Stats, rx, tx, and distributor, all other cores in coremask
will be used for workers.
There's some "#if 0" lines in the code that allow the rx + dist to be
run on the same core, and a few more commented out lines to allow
skipping of flow matching algo, etc. Code is comment in the appropriate
location

Signed-off-by: David Hunt <david.hunt at intel.com>
---
 examples/distributor/main.c | 489 ++++++++++++++++++++++++++++++++++----------
 1 file changed, 380 insertions(+), 109 deletions(-)

diff --git a/examples/distributor/main.c b/examples/distributor/main.c
index 537cee1..fcac807 100644
--- a/examples/distributor/main.c
+++ b/examples/distributor/main.c
@@ -1,8 +1,7 @@
 /*-
  *   BSD LICENSE
  *
- *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
- *   All rights reserved.
+ *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
  *
  *   Redistribution and use in source and binary forms, with or without
  *   modification, are permitted provided that the following conditions
@@ -31,6 +30,8 @@
  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  */
 
+#define BURST_API 1
+
 #include <stdint.h>
 #include <inttypes.h>
 #include <unistd.h>
@@ -43,39 +44,87 @@
 #include <rte_malloc.h>
 #include <rte_debug.h>
 #include <rte_prefetch.h>
+#if BURST_API
+#include <rte_distributor_burst.h>
+#else
 #include <rte_distributor.h>
+#endif
 
-#define RX_RING_SIZE 256
-#define TX_RING_SIZE 512
+#define RX_QUEUE_SIZE 512
+#define TX_QUEUE_SIZE 512
 #define NUM_MBUFS ((64*1024)-1)
-#define MBUF_CACHE_SIZE 250
+#define MBUF_CACHE_SIZE 128
+#if BURST_API
+#define BURST_SIZE 64
+#define SCHED_RX_RING_SZ 8192
+#define SCHED_TX_RING_SZ 65536
+#else
 #define BURST_SIZE 32
-#define RTE_RING_SZ 1024
+#define SCHED_RX_RING_SZ 1024
+#define SCHED_TX_RING_SZ 1024
+#endif
+#define BURST_SIZE_TX 32
 
 #define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1
 
+#define ANSI_COLOR_RED     "\x1b[31m"
+#define ANSI_COLOR_RESET   "\x1b[0m"
+
 /* mask of enabled ports */
 static uint32_t enabled_port_mask;
 volatile uint8_t quit_signal;
 volatile uint8_t quit_signal_rx;
+volatile uint8_t quit_signal_dist;
+volatile uint8_t quit_signal_work;
 
 static volatile struct app_stats {
 	struct {
 		uint64_t rx_pkts;
 		uint64_t returned_pkts;
 		uint64_t enqueued_pkts;
+		uint64_t enqdrop_pkts;
 	} rx __rte_cache_aligned;
+	int pad1 __rte_cache_aligned;
+
+	struct {
+		uint64_t in_pkts;
+		uint64_t ret_pkts;
+		uint64_t sent_pkts;
+		uint64_t enqdrop_pkts;
+	} dist __rte_cache_aligned;
+	int pad2 __rte_cache_aligned;
 
 	struct {
 		uint64_t dequeue_pkts;
 		uint64_t tx_pkts;
+		uint64_t enqdrop_pkts;
 	} tx __rte_cache_aligned;
+	int pad3 __rte_cache_aligned;
+
+	uint64_t worker_pkts[64] __rte_cache_aligned;
+
+	int pad4 __rte_cache_aligned;
+
+	uint64_t worker_bursts[64][8] __rte_cache_aligned;
+
+	int pad5 __rte_cache_aligned;
+
+	uint64_t port_rx_pkts[64] __rte_cache_aligned;
+	uint64_t port_tx_pkts[64] __rte_cache_aligned;
 } app_stats;
 
+struct app_stats prev_app_stats;
+
 static const struct rte_eth_conf port_conf_default = {
 	.rxmode = {
 		.mq_mode = ETH_MQ_RX_RSS,
 		.max_rx_pkt_len = ETHER_MAX_LEN,
+		.split_hdr_size = 0,
+		.header_split   = 0, /**< Header Split disabled */
+		.hw_ip_checksum = 1, /**< IP checksum offload enabled */
+		.hw_vlan_filter = 0, /**< VLAN filtering disabled */
+		.jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
+		.hw_strip_crc   = 0, /**< CRC stripped by hardware */
 	},
 	.txmode = {
 		.mq_mode = ETH_MQ_TX_NONE,
@@ -93,6 +142,8 @@ struct output_buffer {
 	struct rte_mbuf *mbufs[BURST_SIZE];
 };
 
+static void print_stats(void);
+
 /*
  * Initialises a given port using global settings and with the rx buffers
  * coming from the mbuf_pool passed as parameter
@@ -101,9 +152,13 @@ static inline int
 port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 {
 	struct rte_eth_conf port_conf = port_conf_default;
-	const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1;
-	int retval;
+	const uint16_t rxRings = 1;
+	uint16_t txRings = rte_lcore_count() - 1;
 	uint16_t q;
+	int retval;
+
+	if (txRings > RTE_MAX_ETHPORTS)
+		txRings = RTE_MAX_ETHPORTS;
 
 	if (port >= rte_eth_dev_count())
 		return -1;
@@ -113,7 +168,7 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 		return retval;
 
 	for (q = 0; q < rxRings; q++) {
-		retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE,
+		retval = rte_eth_rx_queue_setup(port, q, RX_QUEUE_SIZE,
 						rte_eth_dev_socket_id(port),
 						NULL, mbuf_pool);
 		if (retval < 0)
@@ -121,7 +176,7 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 	}
 
 	for (q = 0; q < txRings; q++) {
-		retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE,
+		retval = rte_eth_tx_queue_setup(port, q, TX_QUEUE_SIZE,
 						rte_eth_dev_socket_id(port),
 						NULL);
 		if (retval < 0)
@@ -134,7 +189,8 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 
 	struct rte_eth_link link;
 	rte_eth_link_get_nowait(port, &link);
-	if (!link.link_status) {
+	while (!link.link_status) {
+		printf("Waiting for Link up on port %"PRIu8"\n", port);
 		sleep(1);
 		rte_eth_link_get_nowait(port, &link);
 	}
@@ -161,40 +217,51 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 struct lcore_params {
 	unsigned worker_id;
 	struct rte_distributor *d;
-	struct rte_ring *r;
+	struct rte_ring *rx_dist_ring;
+	struct rte_ring *dist_tx_ring;
 	struct rte_mempool *mem_pool;
 };
 
-static int
-quit_workers(struct rte_distributor *d, struct rte_mempool *p)
+static inline void
+flush_one_port(struct output_buffer *outbuf, uint8_t outp)
 {
-	const unsigned num_workers = rte_lcore_count() - 2;
-	unsigned i;
-	struct rte_mbuf *bufs[num_workers];
+	unsigned int nb_tx = rte_eth_tx_burst(outp, 0,
+			outbuf->mbufs, outbuf->count);
+	app_stats.tx.tx_pkts += outbuf->count;
 
-	if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) {
-		printf("line %d: Error getting mbufs from pool\n", __LINE__);
-		return -1;
+	if (unlikely(nb_tx < outbuf->count)) {
+		app_stats.tx.enqdrop_pkts +=  outbuf->count - nb_tx;
+		do {
+			rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
+		} while (++nb_tx < outbuf->count);
 	}
+	outbuf->count = 0;
+}
 
-	for (i = 0; i < num_workers; i++)
-		bufs[i]->hash.rss = i << 1;
+static inline void
+flush_all_ports(struct output_buffer *tx_buffers, uint8_t nb_ports)
+{
+	uint8_t outp;
 
-	rte_distributor_process(d, bufs, num_workers);
-	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
+	for (outp = 0; outp < nb_ports; outp++) {
+		/* skip ports that are not enabled */
+		if ((enabled_port_mask & (1 << outp)) == 0)
+			continue;
 
-	return 0;
+		if (tx_buffers[outp].count == 0)
+			continue;
+
+		flush_one_port(&tx_buffers[outp], outp);
+	}
 }
 
 static int
 lcore_rx(struct lcore_params *p)
 {
-	struct rte_distributor *d = p->d;
-	struct rte_mempool *mem_pool = p->mem_pool;
-	struct rte_ring *r = p->r;
 	const uint8_t nb_ports = rte_eth_dev_count();
 	const int socket_id = rte_socket_id();
 	uint8_t port;
+	struct rte_mbuf *bufs[BURST_SIZE*2];
 
 	for (port = 0; port < nb_ports; port++) {
 		/* skip ports that are not enabled */
@@ -210,6 +277,7 @@ lcore_rx(struct lcore_params *p)
 
 	printf("\nCore %u doing packet RX.\n", rte_lcore_id());
 	port = 0;
+
 	while (!quit_signal_rx) {
 
 		/* skip ports that are not enabled */
@@ -218,7 +286,7 @@ lcore_rx(struct lcore_params *p)
 				port = 0;
 			continue;
 		}
-		struct rte_mbuf *bufs[BURST_SIZE*2];
+
 		const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
 				BURST_SIZE);
 		if (unlikely(nb_rx == 0)) {
@@ -228,19 +296,46 @@ lcore_rx(struct lcore_params *p)
 		}
 		app_stats.rx.rx_pkts += nb_rx;
 
+/*
+ * You can run the distributor on the rx core with this code. Returned
+ * packets are then send straight to the tx core.
+ */
+#if 0
+
+#if BURST_API
+		rte_distributor_process_burst(d, bufs, nb_rx);
+		const uint16_t nb_ret = rte_distributor_returned_pkts_burst(d,
+				bufs, BURST_SIZE*2);
+#else
 		rte_distributor_process(d, bufs, nb_rx);
 		const uint16_t nb_ret = rte_distributor_returned_pkts(d,
 				bufs, BURST_SIZE*2);
+#endif
+
 		app_stats.rx.returned_pkts += nb_ret;
 		if (unlikely(nb_ret == 0)) {
 			if (++port == nb_ports)
 				port = 0;
 			continue;
 		}
-
-		uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs, nb_ret);
+		struct rte_ring *tx_ring = p->dist_tx_ring;
+		uint16_t sent = rte_ring_enqueue_burst(tx_ring,
+				(void *)bufs, nb_ret);
+#else
+		uint16_t nb_ret = nb_rx;
+		/*
+		 * Swap the following two lines if you want the rx traffic
+		 * to go directly to tx, no distribution.
+		 */
+		struct rte_ring *out_ring = p->rx_dist_ring;
+		//struct rte_ring *out_ring = p->dist_tx_ring;
+
+		uint16_t sent = rte_ring_enqueue_burst(out_ring,
+				(void *)bufs, nb_ret);
+#endif
 		app_stats.rx.enqueued_pkts += sent;
 		if (unlikely(sent < nb_ret)) {
+			app_stats.rx.enqdrop_pkts +=  nb_ret - sent;
 			RTE_LOG(DEBUG, DISTRAPP,
 				"%s:Packet loss due to full ring\n", __func__);
 			while (sent < nb_ret)
@@ -249,54 +344,86 @@ lcore_rx(struct lcore_params *p)
 		if (++port == nb_ports)
 			port = 0;
 	}
-	rte_distributor_process(d, NULL, 0);
-	/* flush distributor to bring to known state */
-	rte_distributor_flush(d);
 	/* set worker & tx threads quit flag */
+	printf("\nCore %u exiting rx task.\n", rte_lcore_id());
 	quit_signal = 1;
-	/*
-	 * worker threads may hang in get packet as
-	 * distributor process is not running, just make sure workers
-	 * get packets till quit_signal is actually been
-	 * received and they gracefully shutdown
-	 */
-	if (quit_workers(d, mem_pool) != 0)
-		return -1;
-	/* rx thread should quit at last */
 	return 0;
 }
 
-static inline void
-flush_one_port(struct output_buffer *outbuf, uint8_t outp)
+static int
+lcore_distributor(struct lcore_params *p)
 {
-	unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs,
-			outbuf->count);
-	app_stats.tx.tx_pkts += nb_tx;
+	struct rte_ring *in_r = p->rx_dist_ring;
+	struct rte_ring *out_r = p->dist_tx_ring;
+	struct rte_mbuf *bufs[BURST_SIZE * 4];
+	struct rte_distributor *d = p->d;
 
-	if (unlikely(nb_tx < outbuf->count)) {
-		RTE_LOG(DEBUG, DISTRAPP,
-			"%s:Packet loss with tx_burst\n", __func__);
-		do {
-			rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
-		} while (++nb_tx < outbuf->count);
-	}
-	outbuf->count = 0;
-}
+	printf("\nCore %u acting as distributor core.\n", rte_lcore_id());
+	while (!quit_signal_dist) {
 
-static inline void
-flush_all_ports(struct output_buffer *tx_buffers, uint8_t nb_ports)
-{
-	uint8_t outp;
-	for (outp = 0; outp < nb_ports; outp++) {
-		/* skip ports that are not enabled */
-		if ((enabled_port_mask & (1 << outp)) == 0)
-			continue;
+		const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
+				(void *)bufs, BURST_SIZE*1);
 
-		if (tx_buffers[outp].count == 0)
-			continue;
+		if (nb_rx) {
+			app_stats.dist.in_pkts += nb_rx;
+/*
+ * This #if allows you to bypass the distributor. Incoming packets may be
+ * sent straight to the tx ring.
+ */
+#if 1
+
+#if BURST_API
+			/* Distribute the packets */
+			rte_distributor_process_burst(d, bufs, nb_rx);
+			/* Handle Returns */
+			const uint16_t nb_ret =
+				rte_distributor_returned_pkts_burst(d,
+					bufs, BURST_SIZE*2);
+#else
+			/* Distribute the packets */
+			rte_distributor_process(d, bufs, nb_rx);
+			/* Handle Returns */
+			const uint16_t nb_ret =
+				rte_distributor_returned_pkts(d,
+					bufs, BURST_SIZE*2);
+#endif
+
+#else
+			/* Bypass the distributor */
+			const unsigned int xor_val = (rte_eth_dev_count() > 1);
+			/* Touch the mbuf by xor'ing the port */
+			for (unsigned int i = 0; i < nb_rx; i++)
+				bufs[i]->port ^= xor_val;
+
+			const uint16_t nb_ret = nb_rx;
+#endif
+			if (unlikely(nb_ret == 0))
+				continue;
 
-		flush_one_port(&tx_buffers[outp], outp);
+			app_stats.dist.ret_pkts += nb_ret;
+
+			uint16_t sent = rte_ring_enqueue_burst(out_r,
+					(void *)bufs, nb_ret);
+			app_stats.dist.sent_pkts += sent;
+			if (unlikely(sent < nb_ret)) {
+				app_stats.dist.enqdrop_pkts += nb_ret - sent;
+				RTE_LOG(DEBUG, DISTRAPP,
+					"%s:Packet loss due to full out ring\n",
+					__func__);
+				while (sent < nb_ret)
+					rte_pktmbuf_free(bufs[sent++]);
+			}
+		}
 	}
+	printf("\nCore %u exiting distributor task.\n", rte_lcore_id());
+	quit_signal_work = 1;
+
+#if BURST_API
+	/* Unblock any returns so workers can exit */
+	rte_distributor_clear_returns_burst(d);
+#endif
+	quit_signal_rx = 1;
+	return 0;
 }
 
 static int
@@ -327,9 +454,9 @@ lcore_tx(struct rte_ring *in_r)
 			if ((enabled_port_mask & (1 << port)) == 0)
 				continue;
 
-			struct rte_mbuf *bufs[BURST_SIZE];
+			struct rte_mbuf *bufs[BURST_SIZE_TX];
 			const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
-					(void *)bufs, BURST_SIZE);
+					(void *)bufs, BURST_SIZE_TX);
 			app_stats.tx.dequeue_pkts += nb_rx;
 
 			/* if we get no traffic, flush anything we have */
@@ -358,11 +485,12 @@ lcore_tx(struct rte_ring *in_r)
 
 				outbuf = &tx_buffers[outp];
 				outbuf->mbufs[outbuf->count++] = bufs[i];
-				if (outbuf->count == BURST_SIZE)
+				if (outbuf->count == BURST_SIZE_TX)
 					flush_one_port(outbuf, outp);
 			}
 		}
 	}
+	printf("\nCore %u exiting tx task.\n", rte_lcore_id());
 	return 0;
 }
 
@@ -371,7 +499,7 @@ int_handler(int sig_num)
 {
 	printf("Exiting on signal %d\n", sig_num);
 	/* set quit flag for rx thread to exit */
-	quit_signal_rx = 1;
+	quit_signal_dist = 1;
 }
 
 static void
@@ -379,24 +507,88 @@ print_stats(void)
 {
 	struct rte_eth_stats eth_stats;
 	unsigned i;
-
-	printf("\nRX thread stats:\n");
-	printf(" - Received:    %"PRIu64"\n", app_stats.rx.rx_pkts);
-	printf(" - Processed:   %"PRIu64"\n", app_stats.rx.returned_pkts);
-	printf(" - Enqueued:    %"PRIu64"\n", app_stats.rx.enqueued_pkts);
-
-	printf("\nTX thread stats:\n");
-	printf(" - Dequeued:    %"PRIu64"\n", app_stats.tx.dequeue_pkts);
-	printf(" - Transmitted: %"PRIu64"\n", app_stats.tx.tx_pkts);
+	const unsigned int num_workers = rte_lcore_count() - 4;
 
 	for (i = 0; i < rte_eth_dev_count(); i++) {
 		rte_eth_stats_get(i, &eth_stats);
-		printf("\nPort %u stats:\n", i);
-		printf(" - Pkts in:   %"PRIu64"\n", eth_stats.ipackets);
-		printf(" - Pkts out:  %"PRIu64"\n", eth_stats.opackets);
-		printf(" - In Errs:   %"PRIu64"\n", eth_stats.ierrors);
-		printf(" - Out Errs:  %"PRIu64"\n", eth_stats.oerrors);
-		printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf);
+		app_stats.port_rx_pkts[i] = eth_stats.ipackets;
+		app_stats.port_tx_pkts[i] = eth_stats.opackets;
+	}
+
+	printf("\n\nRX Thread:\n");
+	for (i = 0; i < rte_eth_dev_count(); i++) {
+		printf("Port %u Pktsin : %5.2f\n", i,
+				(app_stats.port_rx_pkts[i] -
+				prev_app_stats.port_rx_pkts[i])/1000000.0);
+		prev_app_stats.port_rx_pkts[i] = app_stats.port_rx_pkts[i];
+	}
+	printf(" - Received:    %5.2f\n",
+			(app_stats.rx.rx_pkts -
+			prev_app_stats.rx.rx_pkts)/1000000.0);
+	printf(" - Returned:    %5.2f\n",
+			(app_stats.rx.returned_pkts -
+			prev_app_stats.rx.returned_pkts)/1000000.0);
+	printf(" - Enqueued:    %5.2f\n",
+			(app_stats.rx.enqueued_pkts -
+			prev_app_stats.rx.enqueued_pkts)/1000000.0);
+	printf(" - Dropped:     %s%5.2f%s\n", ANSI_COLOR_RED,
+			(app_stats.rx.enqdrop_pkts -
+			prev_app_stats.rx.enqdrop_pkts)/1000000.0,
+			ANSI_COLOR_RESET);
+
+	printf("Distributor thread:\n");
+	printf(" - In:          %5.2f\n",
+			(app_stats.dist.in_pkts -
+			prev_app_stats.dist.in_pkts)/1000000.0);
+	printf(" - Returned:    %5.2f\n",
+			(app_stats.dist.ret_pkts -
+			prev_app_stats.dist.ret_pkts)/1000000.0);
+	printf(" - Sent:        %5.2f\n",
+			(app_stats.dist.sent_pkts -
+			prev_app_stats.dist.sent_pkts)/1000000.0);
+	printf(" - Dropped      %s%5.2f%s\n", ANSI_COLOR_RED,
+			(app_stats.dist.enqdrop_pkts -
+			prev_app_stats.dist.enqdrop_pkts)/1000000.0,
+			ANSI_COLOR_RESET);
+
+	printf("TX thread:\n");
+	printf(" - Dequeued:    %5.2f\n",
+			(app_stats.tx.dequeue_pkts -
+			prev_app_stats.tx.dequeue_pkts)/1000000.0);
+	for (i = 0; i < rte_eth_dev_count(); i++) {
+		printf("Port %u Pktsout: %5.2f\n",
+				i, (app_stats.port_tx_pkts[i] -
+				prev_app_stats.port_tx_pkts[i])/1000000.0);
+		prev_app_stats.port_tx_pkts[i] = app_stats.port_tx_pkts[i];
+	}
+	printf(" - Transmitted: %5.2f\n",
+			(app_stats.tx.tx_pkts -
+			prev_app_stats.tx.tx_pkts)/1000000.0);
+	printf(" - Dropped:     %s%5.2f%s\n", ANSI_COLOR_RED,
+			(app_stats.tx.enqdrop_pkts -
+			prev_app_stats.tx.enqdrop_pkts)/1000000.0,
+			ANSI_COLOR_RESET);
+
+	prev_app_stats.rx.rx_pkts = app_stats.rx.rx_pkts;
+	prev_app_stats.rx.returned_pkts = app_stats.rx.returned_pkts;
+	prev_app_stats.rx.enqueued_pkts = app_stats.rx.enqueued_pkts;
+	prev_app_stats.rx.enqdrop_pkts = app_stats.rx.enqdrop_pkts;
+	prev_app_stats.dist.in_pkts = app_stats.dist.in_pkts;
+	prev_app_stats.dist.ret_pkts = app_stats.dist.ret_pkts;
+	prev_app_stats.dist.sent_pkts = app_stats.dist.sent_pkts;
+	prev_app_stats.dist.enqdrop_pkts = app_stats.dist.enqdrop_pkts;
+	prev_app_stats.tx.dequeue_pkts = app_stats.tx.dequeue_pkts;
+	prev_app_stats.tx.tx_pkts = app_stats.tx.tx_pkts;
+	prev_app_stats.tx.enqdrop_pkts = app_stats.tx.enqdrop_pkts;
+
+	for (i = 0; i < num_workers; i++) {
+		printf("Worker %02u Pkts: %5.2f. Bursts(1-8): ", i,
+				(app_stats.worker_pkts[i] -
+				prev_app_stats.worker_pkts[i])/1000000.0);
+		for (int j = 0; j < 8; j++)
+			printf("%ld ", app_stats.worker_bursts[i][j]);
+		printf("\n");
+		prev_app_stats.worker_pkts[i] = app_stats.worker_pkts[i];
 	}
 }
 
@@ -405,18 +597,48 @@ lcore_worker(struct lcore_params *p)
 {
 	struct rte_distributor *d = p->d;
 	const unsigned id = p->worker_id;
+	unsigned int num = 0;
+
 	/*
 	 * for single port, xor_val will be zero so we won't modify the output
 	 * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa
 	 */
 	const unsigned xor_val = (rte_eth_dev_count() > 1);
-	struct rte_mbuf *buf = NULL;
+	struct rte_mbuf *buf[8] __rte_cache_aligned;
+
+	for (int i = 0; i < 8; i++)
+		buf[i] = NULL;
+
+	app_stats.worker_pkts[p->worker_id] = 1;
+
 
 	printf("\nCore %u acting as worker core.\n", rte_lcore_id());
-	while (!quit_signal) {
-		buf = rte_distributor_get_pkt(d, id, buf);
-		buf->port ^= xor_val;
+	while (!quit_signal_work) {
+
+#if BURST_API
+		num = rte_distributor_get_pkt_burst(d, id, buf, buf, num);
+		/* Do a little bit of work for each packet */
+		for (unsigned int i = 0; i < num; i++) {
+			uint64_t t = __rdtsc()+100;
+
+			while (__rdtsc() < t)
+				rte_pause();
+			buf[i]->port ^= xor_val;
+		}
+#else
+		buf[0] = rte_distributor_get_pkt(d, id, buf[0]);
+		uint64_t t = __rdtsc() + 10;
+
+		while (__rdtsc() < t)
+			rte_pause();
+		buf[0]->port ^= xor_val;
+#endif
+
+		app_stats.worker_pkts[p->worker_id] += num;
+		if (num > 0)
+			app_stats.worker_bursts[p->worker_id][num-1]++;
 	}
+	printf("\nCore %u exiting worker task.\n", rte_lcore_id());
 	return 0;
 }
 
@@ -497,11 +719,13 @@ main(int argc, char *argv[])
 {
 	struct rte_mempool *mbuf_pool;
 	struct rte_distributor *d;
-	struct rte_ring *output_ring;
+	struct rte_ring *dist_tx_ring;
+	struct rte_ring *rx_dist_ring;
 	unsigned lcore_id, worker_id = 0;
 	unsigned nb_ports;
 	uint8_t portid;
 	uint8_t nb_ports_available;
+	uint64_t t, freq;
 
 	/* catch ctrl-c so we can print on exit */
 	signal(SIGINT, int_handler);
@@ -518,10 +742,12 @@ main(int argc, char *argv[])
 	if (ret < 0)
 		rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n");
 
-	if (rte_lcore_count() < 3)
+	if (rte_lcore_count() < 5)
 		rte_exit(EXIT_FAILURE, "Error, This application needs at "
-				"least 3 logical cores to run:\n"
-				"1 lcore for packet RX and distribution\n"
+				"least 5 logical cores to run:\n"
+				"1 lcore for stats (can be core 0)\n"
+				"1 lcore for packet RX\n"
+				"1 lcore for distribution\n"
 				"1 lcore for packet TX\n"
 				"and at least 1 lcore for worker threads\n");
 
@@ -560,41 +786,86 @@ main(int argc, char *argv[])
 				"All available ports are disabled. Please set portmask.\n");
 	}
 
+#if BURST_API
+	d = rte_distributor_create_burst("PKT_DIST", rte_socket_id(),
+			rte_lcore_count() - 4);
+#else
 	d = rte_distributor_create("PKT_DIST", rte_socket_id(),
-			rte_lcore_count() - 2);
+			rte_lcore_count() - 4);
+#endif
 	if (d == NULL)
 		rte_exit(EXIT_FAILURE, "Cannot create distributor\n");
 
 	/*
-	 * scheduler ring is read only by the transmitter core, but written to
-	 * by multiple threads
+	 * scheduler ring is read by the transmitter core, and written to
+	 * by scheduler core
 	 */
-	output_ring = rte_ring_create("Output_ring", RTE_RING_SZ,
-			rte_socket_id(), RING_F_SC_DEQ);
-	if (output_ring == NULL)
+	dist_tx_ring = rte_ring_create("Output_ring", SCHED_TX_RING_SZ,
+			rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
+	if (dist_tx_ring == NULL)
+		rte_exit(EXIT_FAILURE, "Cannot create output ring\n");
+
+	rx_dist_ring = rte_ring_create("Input_ring", SCHED_RX_RING_SZ,
+			rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
+	if (rx_dist_ring == NULL)
 		rte_exit(EXIT_FAILURE, "Cannot create output ring\n");
 
 	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
-		if (worker_id == rte_lcore_count() - 2)
+		if (worker_id == rte_lcore_count() - 3) {
+			printf("Starting distributor on lcore_id %d\n",
+					lcore_id);
+			/* distributor core */
+			struct lcore_params *p =
+					rte_malloc(NULL, sizeof(*p), 0);
+			if (!p)
+				rte_panic("malloc failure\n");
+			*p = (struct lcore_params){worker_id, d,
+					rx_dist_ring, dist_tx_ring, mbuf_pool};
+			rte_eal_remote_launch(
+					(lcore_function_t *)lcore_distributor,
+					p, lcore_id);
+		} else if (worker_id == rte_lcore_count() - 4) {
+			printf("Starting tx  on worker_id %d, lcore_id %d\n",
+					worker_id, lcore_id);
+			/* tx core */
 			rte_eal_remote_launch((lcore_function_t *)lcore_tx,
-					output_ring, lcore_id);
-		else {
+					dist_tx_ring, lcore_id);
+		} else if (worker_id == rte_lcore_count() - 2) {
+			printf("Starting rx on worker_id %d, lcore_id %d\n",
+					worker_id, lcore_id);
+			/* rx core */
+			struct lcore_params *p =
+					rte_malloc(NULL, sizeof(*p), 0);
+			if (!p)
+				rte_panic("malloc failure\n");
+			*p = (struct lcore_params){worker_id, d, rx_dist_ring,
+					dist_tx_ring, mbuf_pool};
+			rte_eal_remote_launch((lcore_function_t *)lcore_rx,
+					p, lcore_id);
+		} else {
+			printf("Starting worker on worker_id %d, lcore_id %d\n",
+					worker_id, lcore_id);
 			struct lcore_params *p =
 					rte_malloc(NULL, sizeof(*p), 0);
 			if (!p)
 				rte_panic("malloc failure\n");
-			*p = (struct lcore_params){worker_id, d, output_ring, mbuf_pool};
+			*p = (struct lcore_params){worker_id, d, rx_dist_ring,
+					dist_tx_ring, mbuf_pool};
 
 			rte_eal_remote_launch((lcore_function_t *)lcore_worker,
 					p, lcore_id);
 		}
 		worker_id++;
 	}
-	/* call lcore_main on master core only */
-	struct lcore_params p = { 0, d, output_ring, mbuf_pool};
 
-	if (lcore_rx(&p) != 0)
-		return -1;
+	freq = rte_get_timer_hz();
+	t = __rdtsc() + freq;
+	while (!quit_signal_dist) {
+		if (t < __rdtsc()) {
+			print_stats();
+			t = _rdtsc() + freq;
+		}
+	}
 
 	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
 		if (rte_eal_wait_lcore(lcore_id) < 0)
-- 
2.7.4



More information about the dev mailing list