[dpdk-dev] [PATCH v3 5/5] app/test-flow-perf: add packet forwarding	support
    Wisam Jaddo 
    wisamm at mellanox.com
       
    Thu Apr 30 11:32:49 CEST 2020
    
    
  
Introduce packet forwarding support to the app in order to do
some performance measurements.
The measurements are reported in packets per second.
Forwarding starts after the insertion/deletion operations
have finished.
Both single-core and multi-core performance measurements are supported.
Signed-off-by: Wisam Jaddo <wisamm at mellanox.com>
---
 app/test-flow-perf/main.c      | 300 +++++++++++++++++++++++++++++++++
 doc/guides/tools/flow-perf.rst |   6 +
 2 files changed, 306 insertions(+)
diff --git a/app/test-flow-perf/main.c b/app/test-flow-perf/main.c
index 95435910de..2596d05dc2 100644
--- a/app/test-flow-perf/main.c
+++ b/app/test-flow-perf/main.c
@@ -60,14 +60,45 @@ static uint8_t flow_group;
 static uint16_t flow_items;
 static uint16_t flow_actions;
 static uint8_t flow_attrs;
+
 static volatile bool force_quit;
 static volatile bool dump_iterations;
 static volatile bool dump_socket_mem_flag;
 static volatile bool delete_flag;
+static volatile bool enable_fwd;
+
 static struct rte_mempool *mbuf_mp;
 static uint32_t nb_lcores;
 static uint32_t flows_count;
 static uint32_t iterations_number;
+static uint32_t nb_lcores;
+
+/* Maximum number of packets requested from one rx burst call. */
+#define MAX_PKT_BURST 32
+/* lcore_info.mode value: the lcore runs the packet forwarding loop. */
+#define LCORE_MODE_PKT 1
+/* lcore_info.mode value: the lcore runs the per-second stats printer. */
+#define LCORE_MODE_STATS 2
+/* Number of stream slots available in each lcore_info. */
+#define MAX_STREAMS 64
+/* Number of entries in the lcore_infos table. */
+#define MAX_LCORES 64
+
+/*
+ * One forwarding stream: packets received on (rx_port, rx_queue) are
+ * transmitted on (tx_port, tx_queue). A port value of -1 marks the
+ * slot as unused.
+ */
+struct stream {
+	int tx_port;
+	int tx_queue;
+	int rx_port;
+	int rx_queue;
+};
+
+/* Per-lcore forwarding state and packet counters. */
+struct lcore_info {
+	/* 0 (lcore does nothing), LCORE_MODE_PKT or LCORE_MODE_STATS */
+	int mode;
+	/* number of streams assigned to this lcore */
+	int streams_nb;
+	struct stream streams[MAX_STREAMS];
+	/* stats */
+	uint64_t tx_pkts;  /* packets accepted by the tx burst call */
+	uint64_t tx_drops; /* received packets that could not be sent */
+	uint64_t rx_pkts;  /* packets returned by the rx burst call */
+	/* scratch array holding the packets of the burst in flight */
+	struct rte_mbuf *pkts[MAX_PKT_BURST];
+} __attribute__((__aligned__(64))); /* let it be cacheline aligned */
+
+
+/* Forwarding state of every lcore, indexed by lcore id. */
+static struct lcore_info lcore_infos[MAX_LCORES];
 
 static void usage(char *progname)
 {
@@ -80,6 +111,8 @@ static void usage(char *progname)
 	printf("  --deletion-rate: Enable deletion rate"
 		" calculations\n");
 	printf("  --dump-socket-mem: to dump all socket memory\n");
+	printf("  --enable-fwd: to enable packets forwarding"
+		" after insertion\n");
 
 	printf("To set flow attributes:\n");
 	printf("  --ingress: set ingress attribute in flows\n");
@@ -130,6 +163,7 @@ args_parse(int argc, char **argv)
 		{ "dump-iterations",            0, 0, 0 },
 		{ "deletion-rate",              0, 0, 0 },
 		{ "dump-socket-mem",            0, 0, 0 },
+		{ "enable-fwd",                 0, 0, 0 },
 		/* Attributes */
 		{ "ingress",                    0, 0, 0 },
 		{ "egress",                     0, 0, 0 },
@@ -315,6 +349,8 @@ args_parse(int argc, char **argv)
 				delete_flag = true;
 			if (!strcmp(lgopts[opt_idx].name, "dump-socket-mem"))
 				dump_socket_mem_flag = true;
+			if (!strcmp(lgopts[opt_idx].name, "enable-fwd"))
+				enable_fwd = true;
 			break;
 		default:
 			usage(argv[0]);
@@ -582,6 +618,265 @@ signal_handler(int signum)
 	}
 }
 
+/*
+ * Receive one burst of at most MAX_PKT_BURST packets from the given
+ * port/queue into li->pkts and add them to the lcore's rx counter.
+ * Returns the number of packets received.
+ */
+static inline uint16_t
+do_rx(struct lcore_info *li, uint16_t rx_port, uint16_t rx_queue)
+{
+	const uint16_t nb_rx = rte_eth_rx_burst(rx_port, rx_queue,
+						li->pkts, MAX_PKT_BURST);
+
+	li->rx_pkts += nb_rx;
+
+	return nb_rx;
+}
+
+/*
+ * Transmit the first cnt packets of li->pkts on the given port/queue.
+ * Packets the tx burst did not take are counted as drops and freed.
+ */
+static inline void
+do_tx(struct lcore_info *li, uint16_t cnt, uint16_t tx_port,
+			uint16_t tx_queue)
+{
+	const uint16_t sent = rte_eth_tx_burst(tx_port, tx_queue,
+						li->pkts, cnt);
+	uint16_t idx = sent;
+
+	li->tx_pkts += sent;
+	li->tx_drops += cnt - sent;
+
+	/* Release every mbuf that was left behind by the tx burst. */
+	while (idx < cnt)
+		rte_pktmbuf_free(li->pkts[idx++]);
+}
+
+/*
+ * Method to convert numbers into pretty numbers that are easy
+ * to read. A comma is inserted between each group of three
+ * digits, counted from the right, and the result is stored in buf.
+ *
+ * For example if n = 1799321, the output will be
+ * 1,799,321 after this method which is easier to read.
+ *
+ * buf must have room for at least 27 bytes: the largest 64-bit
+ * value has 20 digits and 6 separators, plus the terminating NUL.
+ *
+ * Returns buf.
+ */
+static char *
+pretty_number(uint64_t n, char *buf)
+{
+	char digits[21]; /* 20 digits of UINT64_MAX + NUL */
+	char *out = buf;
+	int len;
+	int i;
+
+	len = snprintf(digits, sizeof(digits), "%llu",
+			(unsigned long long)n);
+
+	for (i = 0; i < len; i++) {
+		/*
+		 * A separator goes in front of every digit that starts a
+		 * full group of three, except at the very beginning.
+		 */
+		if (i > 0 && (len - i) % 3 == 0)
+			*out++ = ',';
+		*out++ = digits[i];
+	}
+	*out = '\0';
+
+	return buf;
+}
+
+/*
+ * Print, once per second and until force_quit is set, a table with the
+ * tx, tx-drop and rx packet counts of every lcore in LCORE_MODE_PKT.
+ *
+ * Each value is the difference between the live counters in lcore_infos
+ * and a snapshot taken at the end of the previous iteration, i.e. the
+ * number of packets handled since the last print. A "total" row is
+ * added when more than one forwarding lcore is reported. From the
+ * second iteration on, the cursor is moved back up so the table is
+ * redrawn in place.
+ *
+ * Exits the application if the snapshot buffer cannot be allocated.
+ */
+static void
+packet_per_second_stats(void)
+{
+	struct lcore_info *old;
+	struct lcore_info *li, *oli;
+	int nr_lines = 0;
+	int i;
+
+	old = rte_zmalloc("old",
+		sizeof(struct lcore_info) * MAX_LCORES, 0);
+	if (old == NULL)
+		rte_exit(EXIT_FAILURE, "No Memory available!");
+
+	memcpy(old, lcore_infos,
+		sizeof(struct lcore_info) * MAX_LCORES);
+
+	while (!force_quit) {
+		uint64_t total_tx_pkts = 0;
+		uint64_t total_rx_pkts = 0;
+		uint64_t total_tx_drops = 0;
+		uint64_t tx_delta, rx_delta, drops_delta;
+		char buf[3][32];
+		int nr_valid_core = 0;
+
+		sleep(1);
+
+		/* Move the cursor up over the table printed last time. */
+		if (nr_lines) {
+			char go_up_nr_lines[16];
+
+			snprintf(go_up_nr_lines, sizeof(go_up_nr_lines),
+				"%c[%dA\r", 27, nr_lines);
+			printf("%s\r", go_up_nr_lines);
+		}
+
+		printf("\n%6s %16s %16s %16s\n", "core", "tx", "tx drops", "rx");
+		printf("%6s %16s %16s %16s\n", "------", "----------------",
+			"----------------", "----------------");
+		/* Blank line + header + separator printed so far. */
+		nr_lines = 3;
+		for (i = 0; i < MAX_LCORES; i++) {
+			li  = &lcore_infos[i];
+			oli = &old[i];
+			if (li->mode != LCORE_MODE_PKT)
+				continue;
+
+			tx_delta    = li->tx_pkts  - oli->tx_pkts;
+			rx_delta    = li->rx_pkts  - oli->rx_pkts;
+			drops_delta = li->tx_drops - oli->tx_drops;
+			printf("%6d %16s %16s %16s\n", i,
+				pretty_number(tx_delta,    buf[0]),
+				pretty_number(drops_delta, buf[1]),
+				pretty_number(rx_delta,    buf[2]));
+
+			total_tx_pkts  += tx_delta;
+			total_rx_pkts  += rx_delta;
+			total_tx_drops += drops_delta;
+
+			nr_valid_core++;
+			nr_lines += 1;
+		}
+
+		if (nr_valid_core > 1) {
+			printf("%6s %16s %16s %16s\n", "total",
+				pretty_number(total_tx_pkts,  buf[0]),
+				pretty_number(total_tx_drops, buf[1]),
+				pretty_number(total_rx_pkts,  buf[2]));
+			nr_lines += 1;
+		}
+
+		/* Current counters become the baseline of the next round. */
+		memcpy(old, lcore_infos,
+			sizeof(struct lcore_info) * MAX_LCORES);
+	}
+
+	/* The snapshot is only needed while the loop is running. */
+	rte_free(old);
+}
+
+/*
+ * Per-lcore entry point launched on every lcore.
+ *
+ * Depending on the mode stored in this lcore's lcore_infos entry the
+ * lcore returns immediately (mode 0), prints the per-second statistics
+ * (LCORE_MODE_STATS), or forwards packets on all of its configured
+ * streams until force_quit is set.
+ *
+ * Always returns 0.
+ */
+static int
+start_forwarding(void *data __rte_unused)
+{
+	unsigned int lcore = rte_lcore_id();
+	int stream_id;
+	uint16_t cnt;
+	struct lcore_info *li;
+
+	/* An lcore id outside the table has no lcore_infos entry to use. */
+	if (lcore >= MAX_LCORES)
+		return 0;
+
+	li = &lcore_infos[lcore];
+	if (!li->mode)
+		return 0;
+
+	if (li->mode == LCORE_MODE_STATS) {
+		printf(":: started stats on lcore %u\n", lcore);
+		packet_per_second_stats();
+		return 0;
+	}
+
+	while (!force_quit)
+		for (stream_id = 0; stream_id < MAX_STREAMS; stream_id++) {
+			/* rx_port == -1 marks an unused stream slot. */
+			if (li->streams[stream_id].rx_port == -1)
+				continue;
+
+			cnt = do_rx(li,
+					li->streams[stream_id].rx_port,
+					li->streams[stream_id].rx_queue);
+			if (cnt)
+				do_tx(li, cnt,
+					li->streams[stream_id].tx_port,
+					li->streams[stream_id].tx_queue);
+		}
+	return 0;
+}
+
+/*
+ * Return the lcore that follows 'lcore' as reported by
+ * rte_get_next_lcore(lcore, 0, 0). Exits the application when the
+ * returned id has no entry in lcore_infos, so callers can index the
+ * table with the result.
+ */
+static unsigned int
+next_lcore_checked(unsigned int lcore)
+{
+	lcore = rte_get_next_lcore(lcore, 0, 0);
+	if (lcore >= MAX_LCORES)
+		rte_exit(EXIT_FAILURE,
+			"lcore id %u is out of the supported range (max %d)\n",
+			lcore, MAX_LCORES - 1);
+	return lcore;
+}
+
+/*
+ * Fill lcore_infos before forwarding starts:
+ *  - the first lcore is put in LCORE_MODE_STATS,
+ *  - every stream slot of every lcore is marked unused (-1),
+ *  - the nr_port * RXQs streams are spread over the remaining lcores,
+ *    which are put in LCORE_MODE_PKT once they own a stream,
+ *  - the resulting stream to lcore mapping is printed.
+ *
+ * Exits the application when fewer than two lcores are available, when
+ * an lcore id does not fit in lcore_infos, or when one lcore would get
+ * more than MAX_STREAMS streams.
+ */
+static void
+init_lcore_info(void)
+{
+	int i, j;
+	unsigned int lcore;
+	uint16_t nr_port;
+	uint16_t queue;
+	int port;
+	int stream_id = 0;
+	int streams_per_core;
+	int unassigned_streams;
+	int nb_fwd_streams;
+	int nb_fwd_lcores;
+
+	nr_port = rte_eth_dev_count_avail();
+
+	/*
+	 * One lcore only prints stats, so at least one more is needed to
+	 * forward; this also keeps the divisions below away from zero.
+	 */
+	if (nb_lcores < 2)
+		rte_exit(EXIT_FAILURE,
+			"Packet forwarding needs at least two lcores\n");
+	nb_fwd_lcores = (int)(nb_lcores - 1);
+
+	/* First logical core is reserved for stats printing */
+	lcore = next_lcore_checked(-1);
+	lcore_infos[lcore].mode = LCORE_MODE_STATS;
+
+	/*
+	 * Initialize all cores
+	 * All cores at first must have -1 value in all streams
+	 * This means that this stream is not used, or not set
+	 * yet.
+	 */
+	for (i = 0; i < MAX_LCORES; i++) {
+		lcore_infos[i].streams_nb = 0;
+		for (j = 0; j < MAX_STREAMS; j++) {
+			lcore_infos[i].streams[j].tx_port = -1;
+			lcore_infos[i].streams[j].rx_port = -1;
+			lcore_infos[i].streams[j].tx_queue = -1;
+			lcore_infos[i].streams[j].rx_queue = -1;
+		}
+	}
+
+	/*
+	 * Calculate the total streams count.
+	 * Also distribute those streams count between the available
+	 * logical cores except first core, since it's reserved for
+	 * stats prints.
+	 */
+	nb_fwd_streams = nr_port * RXQs;
+	if (nb_fwd_lcores >= nb_fwd_streams)
+		for (i = 0; i < nb_fwd_lcores; i++) {
+			lcore = next_lcore_checked(lcore);
+			lcore_infos[lcore].streams_nb = 1;
+		}
+	else {
+		streams_per_core = nb_fwd_streams / nb_fwd_lcores;
+		unassigned_streams = nb_fwd_streams % nb_fwd_lcores;
+		/* The first lcores take one extra stream from the remainder. */
+		if (streams_per_core + (unassigned_streams ? 1 : 0) >
+				MAX_STREAMS)
+			rte_exit(EXIT_FAILURE,
+				"Too many streams per lcore (max %d)\n",
+				MAX_STREAMS);
+		for (i = 0; i < nb_fwd_lcores; i++) {
+			lcore = next_lcore_checked(lcore);
+			lcore_infos[lcore].streams_nb = streams_per_core;
+			if (unassigned_streams) {
+				lcore_infos[lcore].streams_nb++;
+				unassigned_streams--;
+			}
+		}
+	}
+
+	/*
+	 * Set the streams for the cores according to each logical
+	 * core stream count.
+	 * The streams is built on the design of what received should
+	 * forward as well, this means that if you received packets on
+	 * port 0 queue 0 then the same queue should forward the
+	 * packets, using the same logical core.
+	 */
+	lcore = next_lcore_checked(-1);
+	for (port = 0; port < nr_port; port++) {
+		/** Create FWD stream **/
+		for (queue = 0; queue < RXQs; queue++) {
+			/*
+			 * Move to the next lcore when the current one has no
+			 * quota (the stats lcore) or its quota is used up.
+			 */
+			if (!lcore_infos[lcore].streams_nb ||
+				!(stream_id % lcore_infos[lcore].streams_nb)) {
+				lcore = next_lcore_checked(lcore);
+				lcore_infos[lcore].mode = LCORE_MODE_PKT;
+				stream_id = 0;
+			}
+			lcore_infos[lcore].streams[stream_id].rx_queue = queue;
+			lcore_infos[lcore].streams[stream_id].tx_queue = queue;
+			lcore_infos[lcore].streams[stream_id].rx_port = port;
+			lcore_infos[lcore].streams[stream_id].tx_port = port;
+			stream_id++;
+		}
+	}
+
+	/** Print all streams **/
+	printf(":: Stream -> core id[N]: (rx_port, rx_queue)->(tx_port, tx_queue)\n");
+	for (i = 0; i < MAX_LCORES; i++)
+		for (j = 0; j < MAX_STREAMS; j++) {
+			/** No streams for this core **/
+			if (lcore_infos[i].streams[j].tx_port == -1)
+				break;
+			printf("Stream -> core id[%d]: (%d,%d)->(%d,%d)\n",
+				i,
+				lcore_infos[i].streams[j].rx_port,
+				lcore_infos[i].streams[j].rx_queue,
+				lcore_infos[i].streams[j].tx_port,
+				lcore_infos[i].streams[j].tx_queue);
+		}
+}
+
 static void
 init_port(void)
 {
@@ -757,6 +1052,11 @@ main(int argc, char **argv)
 		fprintf(stdout, ":: Memory allocation change(M): %.6lf\n",
 		(alloc - last_alloc) / 1.0e6);
 
+	if (enable_fwd) {
+		init_lcore_info();
+		rte_eal_mp_remote_launch(start_forwarding, NULL, CALL_MASTER);
+	}
+
 	RTE_LCORE_FOREACH_SLAVE(lcore_id)
 
 	if (rte_eal_wait_lcore(lcore_id) < 0)
diff --git a/doc/guides/tools/flow-perf.rst b/doc/guides/tools/flow-perf.rst
index 28d452fd06..ecd760de81 100644
--- a/doc/guides/tools/flow-perf.rst
+++ b/doc/guides/tools/flow-perf.rst
@@ -21,6 +21,8 @@ have a multi core insertion rate measurement support in the app.
 The application also provide the ability to measure rte flow deletion rate,
 in addition to memory consumption before and after the flows creation.
 
+The app supports single and multi core performance measurements.
+
 
 Compiling the Application
 =========================
@@ -98,6 +100,10 @@ The command line options are:
 *	``--dump-socket-mem``
 	Dump the memory stats for each socket before the insertion and after.
 
+*	``--enable-fwd``
+	Enable packets forwarding after insertion/deletion operations.
+
+
 Attributes:
 
 *	``--ingress``
-- 
2.17.1
    
    
More information about the dev
mailing list