[dpdk-dev] [PATCH v6 4/4] examples/ntb: support more functions for NTB

Xiaoyun Li xiaoyun.li at intel.com
Thu Sep 26 05:20:22 CEST 2019


Support transmitting files between two systems.
Support iofwd between one ethdev and the NTB device.
Support rxonly and txonly for the NTB device.
Support setting the forwarding mode to file-trans, txonly,
rxonly or iofwd.
Support showing/clearing port stats and throughput.

Signed-off-by: Xiaoyun Li <xiaoyun.li at intel.com>
---
 doc/guides/sample_app_ug/ntb.rst |   59 +-
 examples/ntb/meson.build         |    3 +
 examples/ntb/ntb_fwd.c           | 1298 +++++++++++++++++++++++++++---
 3 files changed, 1232 insertions(+), 128 deletions(-)

diff --git a/doc/guides/sample_app_ug/ntb.rst b/doc/guides/sample_app_ug/ntb.rst
index 079242175..df16af86c 100644
--- a/doc/guides/sample_app_ug/ntb.rst
+++ b/doc/guides/sample_app_ug/ntb.rst
@@ -5,8 +5,17 @@ NTB Sample Application
 ======================
 
 The ntb sample application shows how to use ntb rawdev driver.
-This sample provides interactive mode to transmit file between
-two hosts.
+This sample provides interactive mode to do packet based processing
+between two systems.
+
+This sample supports 4 types of packet forwarding mode.
+
+* ``file-trans``: transmit files between two systems. The sample polls
+  for files sent by the peer and saves each one as
+  ``ntb_recv_file[N]``, where [N] is the index of the received file.
+* ``rxonly``: NTB receives packets but doesn't transmit them.
+* ``txonly``: NTB generates and transmits packets without receiving any.
+* ``iofwd``: forward packets between the NTB device and an ethdev.
 
 Compiling the Application
 -------------------------
@@ -29,6 +38,40 @@ Refer to the *DPDK Getting Started Guide* for general information on
 running applications and the Environment Abstraction Layer (EAL)
 options.
 
+Command-line Options
+--------------------
+
+The application supports the following command-line options.
+
+* ``--buf-size=N``
+
+  Set the data size of the mbufs used to N bytes, where N < 65536.
+  The default value is 2048.
+
+* ``--fwd-mode=mode``
+
+  Set the packet forwarding mode as ``file-trans``, ``txonly``,
+  ``rxonly`` or ``iofwd``.
+
+* ``--nb-desc=N``
+
+  Set the number of descriptors per queue (the queue size) to N,
+  where 64 <= N <= 1024. The default value is 1024.
+
+* ``--txfreet=N``
+
+  Set the transmit free threshold of TX rings to N, where 0 <= N <=
+  the value of ``--nb-desc``. The default value is 256.
+
+* ``--burst=N``
+
+  Set the number of packets per burst to N, where 1 <= N <= 32.
+  The default value is 32.
+
+* ``--qp=N``
+
+  Set the number of queues to N, where N > 0. The default value is 1.
+
 Using the application
 ---------------------
 
@@ -41,7 +84,11 @@ The application is console-driven using the cmdline DPDK interface:
 From this interface the available commands and descriptions of what
 they do as as follows:
 
-* ``send [filepath]``: Send file to the peer host.
-* ``receive [filepath]``: Receive file to [filepath]. Need the peer
-  to send file successfully first.
-* ``quit``: Exit program
+* ``send [filepath]``: Send a file to the peer host. The application
+  must be in file-trans forwarding mode first.
+* ``start``: Start transmission.
+* ``stop``: Stop transmission.
+* ``show/clear port stats``: Show/Clear port stats and throughput.
+* ``set fwd file-trans/rxonly/txonly/iofwd``: Set packet forwarding
+  mode.
+* ``quit``: Exit program.
diff --git a/examples/ntb/meson.build b/examples/ntb/meson.build
index 9a6288f4f..f5435fe12 100644
--- a/examples/ntb/meson.build
+++ b/examples/ntb/meson.build
@@ -14,3 +14,6 @@ cflags += ['-D_FILE_OFFSET_BITS=64']
 sources = files(
 	'ntb_fwd.c'
 )
+if dpdk_conf.has('RTE_LIBRTE_PMD_NTB_RAWDEV')
+	deps += 'rawdev_ntb'
+endif
diff --git a/examples/ntb/ntb_fwd.c b/examples/ntb/ntb_fwd.c
index f8c970cdb..b1ea71c8f 100644
--- a/examples/ntb/ntb_fwd.c
+++ b/examples/ntb/ntb_fwd.c
@@ -14,21 +14,103 @@
 #include <cmdline.h>
 #include <rte_common.h>
 #include <rte_rawdev.h>
+#include <rte_ethdev.h>
+#include <rte_malloc.h>
 #include <rte_lcore.h>
+#include <rte_cycles.h>
+#include <rte_pmd_ntb.h>
 
-#define NTB_DRV_NAME_LEN	7
-static uint64_t max_file_size = 0x400000;
+/* Per-port statistics struct */
+struct ntb_port_statistics {
+	uint64_t tx;
+	uint64_t rx;
+} __rte_cache_aligned;
+/* Port 0: NTB dev, Port 1: ethdev when iofwd. */
+struct ntb_port_statistics ntb_port_stats[2];
+
+struct ntb_fwd_stream {
+	uint16_t tx_port;
+	uint16_t rx_port;
+	uint16_t qp_id;
+	uint8_t tx_ntb;  /* If ntb device is tx port. */
+};
+
+struct ntb_fwd_lcore_conf {
+	uint16_t stream_id;
+	uint16_t nb_stream;
+	uint8_t stopped;
+};
+
+enum ntb_fwd_mode {
+	FILE_TRANS = 0,
+	RXONLY,
+	TXONLY,
+	IOFWD,
+	MAX_FWD_MODE,
+};
+static const char *const fwd_mode_s[] = {
+	"file-trans",
+	"rxonly",
+	"txonly",
+	"iofwd",
+	NULL,
+};
+static enum ntb_fwd_mode fwd_mode = MAX_FWD_MODE;
+
+static struct ntb_fwd_lcore_conf fwd_lcore_conf[RTE_MAX_LCORE];
+static struct ntb_fwd_stream *fwd_streams;
+
+static struct rte_mempool *mbuf_pool;
+
+#define NTB_DRV_NAME_LEN 7
+#define MEMPOOL_CACHE_SIZE 256
+
+static uint8_t in_test;
 static uint8_t interactive = 1;
+static uint16_t eth_port_id = RTE_MAX_ETHPORTS;
 static uint16_t dev_id;
 
+/* Number of queues, default set as 1 */
+static uint16_t num_queues = 1;
+static uint16_t ntb_buf_size = RTE_MBUF_DEFAULT_BUF_SIZE;
+
+/* Configurable number of descriptors */
+#define NTB_DEFAULT_NUM_DESCS 1024
+static uint16_t nb_desc = NTB_DEFAULT_NUM_DESCS;
+
+static uint16_t tx_free_thresh;
+
+#define NTB_MAX_PKT_BURST 32
+#define NTB_DFLT_PKT_BURST 32
+static uint16_t pkt_burst = NTB_DFLT_PKT_BURST;
+
+#define BURST_TX_RETRIES 64
+
+static struct rte_eth_conf eth_port_conf = {
+	.rxmode = {
+		.mq_mode = ETH_MQ_RX_RSS,
+		.split_hdr_size = 0,
+	},
+	.rx_adv_conf = {
+		.rss_conf = {
+			.rss_key = NULL,
+			.rss_hf = ETH_RSS_IP,
+		},
+	},
+	.txmode = {
+		.mq_mode = ETH_MQ_TX_NONE,
+	},
+};
+
 /* *** Help command with introduction. *** */
 struct cmd_help_result {
 	cmdline_fixed_string_t help;
 };
 
-static void cmd_help_parsed(__attribute__((unused)) void *parsed_result,
-			    struct cmdline *cl,
-			    __attribute__((unused)) void *data)
+static void
+cmd_help_parsed(__attribute__((unused)) void *parsed_result,
+		struct cmdline *cl,
+		__attribute__((unused)) void *data)
 {
 	cmdline_printf(
 		cl,
@@ -37,13 +119,17 @@ static void cmd_help_parsed(__attribute__((unused)) void *parsed_result,
 		"Control:\n"
 		"    quit                                      :"
 		" Quit the application.\n"
-		"\nFile transmit:\n"
+		"\nTransmission:\n"
 		"    send [path]                               :"
-		" Send [path] file. (No more than %"PRIu64")\n"
-		"    recv [path]                            :"
-		" Receive file to [path]. Make sure sending is done"
-		" on the other side.\n",
-		max_file_size
+		" Send [path] file. Only take effect in file-trans mode\n"
+		"    start                                     :"
+		" Start transmissions.\n"
+		"    stop                                      :"
+		" Stop transmissions.\n"
+		"    clear/show port stats                     :"
+		" Clear/show port stats.\n"
+		"    set fwd file-trans/rxonly/txonly/iofwd    :"
+		" Set packet forwarding mode.\n"
 	);
 
 }
@@ -66,13 +152,37 @@ struct cmd_quit_result {
 	cmdline_fixed_string_t quit;
 };
 
-static void cmd_quit_parsed(__attribute__((unused)) void *parsed_result,
-			    struct cmdline *cl,
-			    __attribute__((unused)) void *data)
+static void
+cmd_quit_parsed(__attribute__((unused)) void *parsed_result,
+		struct cmdline *cl,
+		__attribute__((unused)) void *data)
 {
+	struct ntb_fwd_lcore_conf *conf;
+	uint8_t lcore_id;
+
+	/* Stop transmission first. */
+	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
+		conf = &fwd_lcore_conf[lcore_id];
+
+		if (!conf->nb_stream)
+			continue;
+
+		if (conf->stopped)
+			continue;
+
+		conf->stopped = 1;
+	}
+	printf("\nWaiting for lcores to finish...\n");
+	rte_eal_mp_wait_lcore();
+	in_test = 0;
+
 	/* Stop traffic and Close port. */
 	rte_rawdev_stop(dev_id);
 	rte_rawdev_close(dev_id);
+	if (eth_port_id < RTE_MAX_ETHPORTS && fwd_mode == IOFWD) {
+		rte_eth_dev_stop(eth_port_id);
+		rte_eth_dev_close(eth_port_id);
+	}
 
 	cmdline_quit(cl);
 }
@@ -102,21 +212,19 @@ cmd_sendfile_parsed(void *parsed_result,
 		    __attribute__((unused)) void *data)
 {
 	struct cmd_sendfile_result *res = parsed_result;
-	struct rte_rawdev_buf *pkts_send[1];
-	uint64_t rsize, size, link;
-	uint8_t *buff;
+	struct rte_rawdev_buf *pkts_send[NTB_MAX_PKT_BURST];
+	struct rte_mbuf *mbuf_send[NTB_MAX_PKT_BURST];
+	uint64_t size, count, i, nb_burst;
+	uint16_t nb_tx, buf_size;
+	unsigned int nb_pkt;
+	size_t queue_id = 0;
+	uint16_t retry = 0;
 	uint32_t val;
 	FILE *file;
 
-	if (!rte_rawdevs[dev_id].started) {
-		printf("Device needs to be up first. Try later.\n");
-		return;
-	}
-
-	rte_rawdev_get_attr(dev_id, "link_status", &link);
-	if (!link) {
-		printf("Link is not up, cannot send file.\n");
-		return;
+	if (num_queues != 1) {
+		printf("File transmission only supports 1 queue.\n");
+		num_queues = 1;
 	}
 
 	file = fopen(res->filepath, "r");
@@ -127,30 +235,13 @@ cmd_sendfile_parsed(void *parsed_result,
 
 	if (fseek(file, 0, SEEK_END) < 0) {
 		printf("Fail to get file size.\n");
+		fclose(file);
 		return;
 	}
 	size = ftell(file);
 	if (fseek(file, 0, SEEK_SET) < 0) {
 		printf("Fail to get file size.\n");
-		return;
-	}
-
-	/**
-	 * No FIFO now. Only test memory. Limit sending file
-	 * size <= max_file_size.
-	 */
-	if (size > max_file_size) {
-		printf("Warning: The file is too large. Only send first"
-		       " %"PRIu64" bits.\n", max_file_size);
-		size = max_file_size;
-	}
-
-	buff = (uint8_t *)malloc(size);
-	rsize = fread(buff, size, 1, file);
-	if (rsize != 1) {
-		printf("Fail to read file.\n");
 		fclose(file);
-		free(buff);
 		return;
 	}
 
@@ -159,22 +250,63 @@ cmd_sendfile_parsed(void *parsed_result,
 	rte_rawdev_set_attr(dev_id, "spad_user_0", val);
 	val = size;
 	rte_rawdev_set_attr(dev_id, "spad_user_1", val);
+	printf("Sending file, size is %"PRIu64"\n", size);
+
+	for (i = 0; i < NTB_MAX_PKT_BURST; i++)
+		pkts_send[i] = (struct rte_rawdev_buf *)
+				malloc(sizeof(struct rte_rawdev_buf));
+
+	buf_size = ntb_buf_size - RTE_PKTMBUF_HEADROOM;
+	count = (size + buf_size - 1) / buf_size;
+	nb_burst = (count + pkt_burst - 1) / pkt_burst;
 
-	pkts_send[0] = (struct rte_rawdev_buf *)malloc
-			(sizeof(struct rte_rawdev_buf));
-	pkts_send[0]->buf_addr = buff;
+	for (i = 0; i < nb_burst; i++) {
+		val = RTE_MIN(count, pkt_burst);
+		if (rte_mempool_get_bulk(mbuf_pool, (void **)mbuf_send,
+					val) == 0) {
+			for (nb_pkt = 0; nb_pkt < val; nb_pkt++) {
+				mbuf_send[nb_pkt]->port = dev_id;
+				mbuf_send[nb_pkt]->data_len =
+				fread(rte_pktmbuf_mtod(mbuf_send[nb_pkt],
+					void *), 1, buf_size, file);
+				mbuf_send[nb_pkt]->pkt_len =
+					mbuf_send[nb_pkt]->data_len;
+				pkts_send[nb_pkt]->buf_addr = mbuf_send[nb_pkt];
+			}
+		} else {
+			for (nb_pkt = 0; nb_pkt < val; nb_pkt++) {
+				mbuf_send[nb_pkt] =
+					rte_mbuf_raw_alloc(mbuf_pool);
+				if (mbuf_send[nb_pkt] == NULL)
+					break;
+				mbuf_send[nb_pkt]->port = dev_id;
+				mbuf_send[nb_pkt]->data_len =
+				fread(rte_pktmbuf_mtod(mbuf_send[nb_pkt],
+					void *), 1, buf_size, file);
+				mbuf_send[nb_pkt]->pkt_len =
+					mbuf_send[nb_pkt]->data_len;
+				pkts_send[nb_pkt]->buf_addr = mbuf_send[nb_pkt];
+			}
+		}
 
-	if (rte_rawdev_enqueue_buffers(dev_id, pkts_send, 1,
-				       (void *)(size_t)size)) {
-		printf("Fail to enqueue.\n");
-		goto clean;
+		nb_tx = rte_rawdev_enqueue_buffers(dev_id, pkts_send, nb_pkt,
+						   (void *)queue_id);
+		while (nb_tx != nb_pkt && retry < BURST_TX_RETRIES) {
+			rte_delay_us(1);
+			nb_tx += rte_rawdev_enqueue_buffers(dev_id,
+				&pkts_send[nb_tx], nb_pkt - nb_tx,
+				(void *)queue_id);
+		}
+		count -= nb_pkt;
 	}
+	/* Clear register after file sending done. */
+	rte_rawdev_set_attr(dev_id, "spad_user_0", 0);
+	rte_rawdev_set_attr(dev_id, "spad_user_1", 0);
 	printf("Done sending file.\n");
 
-clean:
+	for (i = 0; i < NTB_MAX_PKT_BURST; i++)
+		free(pkts_send[i]);
 	fclose(file);
-	free(buff);
-	free(pkts_send[0]);
 }
 
 cmdline_parse_token_string_t cmd_send_file_send =
@@ -195,79 +327,680 @@ cmdline_parse_inst_t cmd_send_file = {
 	},
 };
 
-/* *** RECEIVE FILE PARAMETERS *** */
-struct cmd_recvfile_result {
-	cmdline_fixed_string_t recv_string;
-	char filepath[];
-};
+#define RECV_FILE_LEN 30
+static int
+start_polling_recv_file(void *param)
+{
+	struct rte_rawdev_buf *pkts_recv[NTB_MAX_PKT_BURST];
+	struct ntb_fwd_lcore_conf *conf = param;
+	struct rte_mbuf *mbuf;
+	char filepath[RECV_FILE_LEN];
+	uint64_t val, size, file_len;
+	uint16_t nb_rx, i, file_no;
+	size_t queue_id = 0;
+	FILE *file;
+
+	for (i = 0; i < NTB_MAX_PKT_BURST; i++)
+		pkts_recv[i] = (struct rte_rawdev_buf *)
+				malloc(sizeof(struct rte_rawdev_buf));
+
+	file_no = 0;
+	while (!conf->stopped) {
+		snprintf(filepath, RECV_FILE_LEN, "ntb_recv_file%d", file_no);
+		file = fopen(filepath, "w");
+		if (file == NULL) {
+			printf("Fail to open the file.\n");
+			return -EINVAL;
+		}
+
+		rte_rawdev_get_attr(dev_id, "spad_user_0", &val);
+		size = val << 32;
+		rte_rawdev_get_attr(dev_id, "spad_user_1", &val);
+		size |= val;
+
+		if (!size) {
+			fclose(file);
+			continue;
+		}
+
+		file_len = 0;
+		nb_rx = NTB_MAX_PKT_BURST;
+		while (file_len < size && !conf->stopped) {
+			nb_rx = rte_rawdev_dequeue_buffers(dev_id, pkts_recv,
+						pkt_burst, (void *)queue_id);
+			ntb_port_stats[0].rx += nb_rx;
+			for (i = 0; i < nb_rx; i++) {
+				mbuf = pkts_recv[i]->buf_addr;
+				fwrite(rte_pktmbuf_mtod(mbuf, void *), 1,
+					mbuf->data_len, file);
+				file_len += mbuf->data_len;
+				rte_pktmbuf_free(mbuf);
+				pkts_recv[i]->buf_addr = NULL;
+			}
+		}
+
+		printf("Received file (size: %" PRIu64 ") from peer to %s.\n",
+			size, filepath);
+		fclose(file);
+		file_no++;
+	}
+
+	for (i = 0; i < NTB_MAX_PKT_BURST; i++)
+		free(pkts_recv[i]);
+	return 0;
+}
+
+static int
+start_iofwd_per_lcore(void *param)
+{
+	struct rte_rawdev_buf *ntb_buf[NTB_MAX_PKT_BURST];
+	struct rte_mbuf *pkts_burst[NTB_MAX_PKT_BURST];
+	struct ntb_fwd_lcore_conf *conf = param;
+	struct ntb_fwd_stream fs;
+	uint16_t nb_rx, nb_tx;
+	int i, j;
+
+	for (i = 0; i < NTB_MAX_PKT_BURST; i++)
+		ntb_buf[i] = (struct rte_rawdev_buf *)
+			     malloc(sizeof(struct rte_rawdev_buf));
+
+	while (!conf->stopped) {
+		for (i = 0; i < conf->nb_stream; i++) {
+			fs = fwd_streams[conf->stream_id + i];
+			if (fs.tx_ntb) {
+				nb_rx = rte_eth_rx_burst(fs.rx_port,
+						fs.qp_id, pkts_burst,
+						pkt_burst);
+				if (unlikely(nb_rx == 0))
+					continue;
+				for (j = 0; j < nb_rx; j++)
+					ntb_buf[j]->buf_addr = pkts_burst[j];
+				nb_tx =
+				rte_rawdev_enqueue_buffers(fs.tx_port,
+						ntb_buf, nb_rx,
+						(void *)(size_t)fs.qp_id);
+				ntb_port_stats[0].tx += nb_tx;
+				ntb_port_stats[1].rx += nb_rx;
+			} else {
+				nb_rx =
+				rte_rawdev_dequeue_buffers(fs.rx_port,
+						ntb_buf, pkt_burst,
+						(void *)(size_t)fs.qp_id);
+				if (unlikely(nb_rx == 0))
+					continue;
+				for (j = 0; j < nb_rx; j++)
+					pkts_burst[j] = ntb_buf[j]->buf_addr;
+				nb_tx = rte_eth_tx_burst(fs.tx_port,
+					fs.qp_id, pkts_burst, nb_rx);
+				ntb_port_stats[1].tx += nb_tx;
+				ntb_port_stats[0].rx += nb_rx;
+			}
+			if (unlikely(nb_tx < nb_rx)) {
+				do {
+					rte_pktmbuf_free(pkts_burst[nb_tx]);
+				} while (++nb_tx < nb_rx);
+			}
+		}
+	}
+
+	for (i = 0; i < NTB_MAX_PKT_BURST; i++)
+		free(ntb_buf[i]);
+
+	return 0;
+}
+
+static int
+start_rxonly_per_lcore(void *param)
+{
+	struct rte_rawdev_buf *ntb_buf[NTB_MAX_PKT_BURST];
+	struct ntb_fwd_lcore_conf *conf = param;
+	struct ntb_fwd_stream fs;
+	uint16_t nb_rx;
+	int i, j;
+
+	for (i = 0; i < NTB_MAX_PKT_BURST; i++)
+		ntb_buf[i] = (struct rte_rawdev_buf *)
+			     malloc(sizeof(struct rte_rawdev_buf));
+
+	while (!conf->stopped) {
+		for (i = 0; i < conf->nb_stream; i++) {
+			fs = fwd_streams[conf->stream_id + i];
+			nb_rx = rte_rawdev_dequeue_buffers(fs.rx_port,
+				ntb_buf, pkt_burst, (void *)(size_t)fs.qp_id);
+			if (unlikely(nb_rx == 0))
+				continue;
+			ntb_port_stats[0].rx += nb_rx;
+
+			for (j = 0; j < nb_rx; j++)
+				rte_pktmbuf_free(ntb_buf[j]->buf_addr);
+		}
+	}
+
+	for (i = 0; i < NTB_MAX_PKT_BURST; i++)
+		free(ntb_buf[i]);
+
+	return 0;
+}
+
+
+static int
+start_txonly_per_lcore(void *param)
+{
+	struct rte_rawdev_buf *ntb_buf[NTB_MAX_PKT_BURST];
+	struct rte_mbuf *pkts_burst[NTB_MAX_PKT_BURST];
+	struct ntb_fwd_lcore_conf *conf = param;
+	struct ntb_fwd_stream fs;
+	uint16_t nb_pkt, nb_tx;
+	int i;
+
+	for (i = 0; i < NTB_MAX_PKT_BURST; i++)
+		ntb_buf[i] = (struct rte_rawdev_buf *)
+			     malloc(sizeof(struct rte_rawdev_buf));
+
+	while (!conf->stopped) {
+		for (i = 0; i < conf->nb_stream; i++) {
+			fs = fwd_streams[conf->stream_id + i];
+			if (rte_mempool_get_bulk(mbuf_pool, (void **)pkts_burst,
+				  pkt_burst) == 0) {
+				for (nb_pkt = 0; nb_pkt < pkt_burst; nb_pkt++) {
+					pkts_burst[nb_pkt]->port = dev_id;
+					pkts_burst[nb_pkt]->data_len =
+						pkts_burst[nb_pkt]->buf_len -
+						RTE_PKTMBUF_HEADROOM;
+					pkts_burst[nb_pkt]->pkt_len =
+						pkts_burst[nb_pkt]->data_len;
+					ntb_buf[nb_pkt]->buf_addr =
+						pkts_burst[nb_pkt];
+				}
+			} else {
+				for (nb_pkt = 0; nb_pkt < pkt_burst; nb_pkt++) {
+					pkts_burst[nb_pkt] =
+						rte_pktmbuf_alloc(mbuf_pool);
+					if (pkts_burst[nb_pkt] == NULL)
+						break;
+					pkts_burst[nb_pkt]->port = dev_id;
+					pkts_burst[nb_pkt]->data_len =
+						pkts_burst[nb_pkt]->buf_len -
+						RTE_PKTMBUF_HEADROOM;
+					pkts_burst[nb_pkt]->pkt_len =
+						pkts_burst[nb_pkt]->data_len;
+					ntb_buf[nb_pkt]->buf_addr =
+						pkts_burst[nb_pkt];
+				}
+			}
+			nb_tx = rte_rawdev_enqueue_buffers(fs.tx_port,
+				ntb_buf, nb_pkt, (void *)(size_t)fs.qp_id);
+			ntb_port_stats[0].tx += nb_tx;
+			if (unlikely(nb_tx < nb_pkt)) {
+				do {
+					rte_pktmbuf_free(
+						ntb_buf[nb_tx]->buf_addr);
+				} while (++nb_tx < nb_pkt);
+			}
+		}
+	}
+
+	for (i = 0; i < NTB_MAX_PKT_BURST; i++)
+		free(ntb_buf[i]);
+
+	return 0;
+}
+
+static int
+ntb_fwd_config_setup(void)
+{
+	uint16_t i;
+
+	/* Make sure iofwd has valid ethdev. */
+	if (fwd_mode == IOFWD && eth_port_id >= RTE_MAX_ETHPORTS) {
+		printf("No ethdev, cannot be in iofwd mode.");
+		return -EINVAL;
+	}
+
+	if (fwd_mode == IOFWD) {
+		fwd_streams = rte_zmalloc("ntb_fwd: fwd_streams",
+			sizeof(struct ntb_fwd_stream) * num_queues * 2,
+			RTE_CACHE_LINE_SIZE);
+		for (i = 0; i < num_queues; i++) {
+			fwd_streams[i * 2].qp_id = i;
+			fwd_streams[i * 2].tx_port = dev_id;
+			fwd_streams[i * 2].rx_port = eth_port_id;
+			fwd_streams[i * 2].tx_ntb = 1;
+
+			fwd_streams[i * 2 + 1].qp_id = i;
+			fwd_streams[i * 2 + 1].tx_port = eth_port_id;
+			fwd_streams[i * 2 + 1].rx_port = dev_id;
+			fwd_streams[i * 2 + 1].tx_ntb = 0;
+		}
+		return 0;
+	}
+
+	if (fwd_mode == RXONLY || fwd_mode == FILE_TRANS) {
+		/* Use only 1 queue in file-trans to keep data in order. */
+		if (fwd_mode == FILE_TRANS)
+			num_queues = 1;
+
+		fwd_streams = rte_zmalloc("ntb_fwd: fwd_streams",
+				sizeof(struct ntb_fwd_stream) * num_queues,
+				RTE_CACHE_LINE_SIZE);
+		for (i = 0; i < num_queues; i++) {
+			fwd_streams[i].qp_id = i;
+			fwd_streams[i].tx_port = RTE_MAX_ETHPORTS;
+			fwd_streams[i].rx_port = dev_id;
+			fwd_streams[i].tx_ntb = 0;
+		}
+		return 0;
+	}
+
+	if (fwd_mode == TXONLY) {
+		fwd_streams = rte_zmalloc("ntb_fwd: fwd_streams",
+				sizeof(struct ntb_fwd_stream) * num_queues,
+				RTE_CACHE_LINE_SIZE);
+		for (i = 0; i < num_queues; i++) {
+			fwd_streams[i].qp_id = i;
+			fwd_streams[i].tx_port = dev_id;
+			fwd_streams[i].rx_port = RTE_MAX_ETHPORTS;
+			fwd_streams[i].tx_ntb = 1;
+		}
+	}
+	return 0;
+}
 
 static void
-cmd_recvfile_parsed(void *parsed_result,
-		    __attribute__((unused)) struct cmdline *cl,
-		    __attribute__((unused)) void *data)
+assign_stream_to_lcores(void)
 {
-	struct cmd_sendfile_result *res = parsed_result;
-	struct rte_rawdev_buf *pkts_recv[1];
-	uint8_t *buff;
-	uint64_t val;
-	size_t size;
-	FILE *file;
+	struct ntb_fwd_lcore_conf *conf;
+	struct ntb_fwd_stream *fs;
+	uint16_t nb_streams, sm_per_lcore, sm_id, i;
+	uint8_t lcore_id, lcore_num, nb_extra;
 
-	if (!rte_rawdevs[dev_id].started) {
-		printf("Device needs to be up first. Try later.\n");
-		return;
+	lcore_num = rte_lcore_count();
+	/* Exclude master core */
+	lcore_num--;
+
+	nb_streams = (fwd_mode == IOFWD) ? num_queues * 2 : num_queues;
+
+	sm_per_lcore = nb_streams / lcore_num;
+	nb_extra = nb_streams % lcore_num;
+	sm_id = 0;
+	i = 0;
+
+	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
+		conf = &fwd_lcore_conf[lcore_id];
+
+		if (i < nb_extra) {
+			conf->nb_stream = sm_per_lcore + 1;
+			conf->stream_id = sm_id;
+			sm_id = sm_id + sm_per_lcore + 1;
+		} else {
+			conf->nb_stream = sm_per_lcore;
+			conf->stream_id = sm_id;
+			sm_id = sm_id + sm_per_lcore;
+		}
+
+		i++;
+		if (sm_id >= nb_streams)
+			break;
+	}
+
+	/* Print packet forwarding config. */
+	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
+		conf = &fwd_lcore_conf[lcore_id];
+
+		if (!conf->nb_stream)
+			continue;
+
+		printf("Streams on Lcore %u :\n", lcore_id);
+		for (i = 0; i < conf->nb_stream; i++) {
+			fs = &fwd_streams[conf->stream_id + i];
+			if (fwd_mode == IOFWD)
+				printf(" + Stream %u : %s%u RX -> %s%u TX,"
+					" Q=%u\n", conf->stream_id + i,
+					fs->tx_ntb ? "Eth" : "NTB", fs->rx_port,
+					fs->tx_ntb ? "NTB" : "Eth", fs->tx_port,
+					fs->qp_id);
+			if (fwd_mode == FILE_TRANS || fwd_mode == RXONLY)
+				printf(" + Stream %u : %s%u RX only\n",
+					conf->stream_id, "NTB", fs->rx_port);
+			if (fwd_mode == TXONLY)
+				printf(" + Stream %u : %s%u TX only\n",
+					conf->stream_id, "NTB", fs->tx_port);
+		}
 	}
+}
 
-	rte_rawdev_get_attr(dev_id, "link_status", &val);
-	if (!val) {
-		printf("Link is not up, cannot receive file.\n");
+static void
+start_pkt_fwd(void)
+{
+	struct ntb_fwd_lcore_conf *conf;
+	struct rte_eth_link eth_link;
+	uint8_t lcore_id;
+	int ret, i;
+
+	ret = ntb_fwd_config_setup();
+	if (ret < 0) {
+		printf("Cannot start traffic. Please reset fwd mode.\n");
 		return;
 	}
 
-	file = fopen(res->filepath, "w");
-	if (file == NULL) {
-		printf("Fail to open the file.\n");
+	/* If using iofwd, checking ethdev link status first. */
+	if (fwd_mode == IOFWD) {
+		printf("Checking eth link status...\n");
+		/* Wait for eth link up at most 100 times. */
+		for (i = 0; i < 100; i++) {
+			rte_eth_link_get(eth_port_id, &eth_link);
+			if (eth_link.link_status) {
+				printf("Eth%u Link Up. Speed %u Mbps - %s\n",
+					eth_port_id, eth_link.link_speed,
+					(eth_link.link_duplex ==
+					 ETH_LINK_FULL_DUPLEX) ?
+					("full-duplex") : ("half-duplex"));
+				break;
+			}
+		}
+		if (!eth_link.link_status) {
+			printf("Eth%u link down. Cannot start traffic.\n",
+				eth_port_id);
+			return;
+		}
+	}
+
+	assign_stream_to_lcores();
+	in_test = 1;
+
+	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
+		conf = &fwd_lcore_conf[lcore_id];
+
+		if (!conf->nb_stream)
+			continue;
+
+		conf->stopped = 0;
+		if (fwd_mode == FILE_TRANS)
+			rte_eal_remote_launch(start_polling_recv_file,
+					      conf, lcore_id);
+		else if (fwd_mode == IOFWD)
+			rte_eal_remote_launch(start_iofwd_per_lcore,
+					      conf, lcore_id);
+		else if (fwd_mode == RXONLY)
+			rte_eal_remote_launch(start_rxonly_per_lcore,
+					      conf, lcore_id);
+		else if (fwd_mode == TXONLY)
+			rte_eal_remote_launch(start_txonly_per_lcore,
+					      conf, lcore_id);
+	}
+}
+
+/* *** START FWD PARAMETERS *** */
+struct cmd_start_result {
+	cmdline_fixed_string_t start;
+};
+
+static void
+cmd_start_parsed(__attribute__((unused)) void *parsed_result,
+			    __attribute__((unused)) struct cmdline *cl,
+			    __attribute__((unused)) void *data)
+{
+	start_pkt_fwd();
+}
+
+cmdline_parse_token_string_t cmd_start_start =
+		TOKEN_STRING_INITIALIZER(struct cmd_start_result, start, "start");
+
+cmdline_parse_inst_t cmd_start = {
+	.f = cmd_start_parsed,
+	.data = NULL,
+	.help_str = "start pkt fwd between ntb and ethdev",
+	.tokens = {
+		(void *)&cmd_start_start,
+		NULL,
+	},
+};
+
+/* *** STOP *** */
+struct cmd_stop_result {
+	cmdline_fixed_string_t stop;
+};
+
+static void
+cmd_stop_parsed(__attribute__((unused)) void *parsed_result,
+		__attribute__((unused)) struct cmdline *cl,
+		__attribute__((unused)) void *data)
+{
+	struct ntb_fwd_lcore_conf *conf;
+	uint8_t lcore_id;
+
+	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
+		conf = &fwd_lcore_conf[lcore_id];
+
+		if (!conf->nb_stream)
+			continue;
+
+		if (conf->stopped)
+			continue;
+
+		conf->stopped = 1;
+	}
+	printf("\nWaiting for lcores to finish...\n");
+	rte_eal_mp_wait_lcore();
+	in_test = 0;
+	printf("\nDone.\n");
+}
+
+cmdline_parse_token_string_t cmd_stop_stop =
+		TOKEN_STRING_INITIALIZER(struct cmd_stop_result, stop, "stop");
+
+cmdline_parse_inst_t cmd_stop = {
+	.f = cmd_stop_parsed,
+	.data = NULL,
+	.help_str = "stop: Stop packet forwarding",
+	.tokens = {
+		(void *)&cmd_stop_stop,
+		NULL,
+	},
+};
+
+static void
+ntb_stats_clear(void)
+{
+	int nb_ids, i;
+	uint32_t *ids;
+
+	/* Clear NTB dev stats */
+	nb_ids = rte_rawdev_xstats_names_get(dev_id, NULL, 0);
+	if (nb_ids  < 0) {
+		printf("Error: Cannot get count of xstats\n");
 		return;
 	}
+	ids = malloc(sizeof(uint32_t) * nb_ids);
+	for (i = 0; i < nb_ids; i++)
+		ids[i] = i;
+	rte_rawdev_xstats_reset(dev_id, ids, nb_ids);
+	printf("\n  statistics for NTB port %d cleared\n", dev_id);
+
+	/* Clear ethdev stats, if any */
+	if (fwd_mode == IOFWD && eth_port_id != RTE_MAX_ETHPORTS) {
+		rte_eth_stats_reset(eth_port_id);
+		printf("\n  statistics for ETH port %d cleared\n", eth_port_id);
+	}
+}
+
+static inline void
+ntb_calculate_throughput(uint16_t port) {
+	uint64_t diff_pkts_rx, diff_pkts_tx, diff_cycles;
+	uint64_t mpps_rx, mpps_tx;
+	static uint64_t prev_pkts_rx[2];
+	static uint64_t prev_pkts_tx[2];
+	static uint64_t prev_cycles[2];
+
+	diff_cycles = prev_cycles[port];
+	prev_cycles[port] = rte_rdtsc();
+	if (diff_cycles > 0)
+		diff_cycles = prev_cycles[port] - diff_cycles;
+	diff_pkts_rx = (ntb_port_stats[port].rx > prev_pkts_rx[port]) ?
+		(ntb_port_stats[port].rx - prev_pkts_rx[port]) : 0;
+	diff_pkts_tx = (ntb_port_stats[port].tx > prev_pkts_tx[port]) ?
+		(ntb_port_stats[port].tx - prev_pkts_tx[port]) : 0;
+	prev_pkts_rx[port] = ntb_port_stats[port].rx;
+	prev_pkts_tx[port] = ntb_port_stats[port].tx;
+	mpps_rx = diff_cycles > 0 ?
+		diff_pkts_rx * rte_get_tsc_hz() / diff_cycles : 0;
+	mpps_tx = diff_cycles > 0 ?
+		diff_pkts_tx * rte_get_tsc_hz() / diff_cycles : 0;
+	printf("  Throughput (since last show)\n");
+	printf("  Rx-pps: %12"PRIu64"\n  Tx-pps: %12"PRIu64"\n",
+			mpps_rx, mpps_tx);
+
+}
+
+static void
+ntb_stats_display(void)
+{
+	struct rte_rawdev_xstats_name *xstats_names;
+	struct rte_eth_stats stats;
+	uint64_t *values;
+	uint32_t *ids;
+	int nb_ids, i;
 
-	rte_rawdev_get_attr(dev_id, "spad_user_0", &val);
-	size = val << 32;
-	rte_rawdev_get_attr(dev_id, "spad_user_1", &val);
-	size |= val;
+	printf("###### statistics for NTB port %d #######\n", dev_id);
 
-	buff = (uint8_t *)malloc(size);
-	pkts_recv[0] = (struct rte_rawdev_buf *)malloc
-			(sizeof(struct rte_rawdev_buf));
-	pkts_recv[0]->buf_addr = buff;
+	/* Get NTB dev stats and stats names */
+	nb_ids = rte_rawdev_xstats_names_get(dev_id, NULL, 0);
+	if (nb_ids  < 0) {
+		printf("Error: Cannot get count of xstats\n");
+		return;
+	}
+	xstats_names = malloc(sizeof(struct rte_rawdev_xstats_name) * nb_ids);
+	if (xstats_names == NULL) {
+		printf("Cannot allocate memory for xstats lookup\n");
+		return;
+	}
+	if (nb_ids != rte_rawdev_xstats_names_get(
+			dev_id, xstats_names, nb_ids)) {
+		printf("Error: Cannot get xstats lookup\n");
+		free(xstats_names);
+		return;
+	}
+	ids = malloc(sizeof(uint32_t) * nb_ids);
+	for (i = 0; i < nb_ids; i++)
+		ids[i] = i;
+	values = malloc(sizeof(uint64_t) * nb_ids);
+	if (nb_ids != rte_rawdev_xstats_get(dev_id, ids, values, nb_ids)) {
+		printf("Error: Unable to get xstats\n");
+		free(xstats_names);
+		free(values);
+		free(ids);
+		return;
+	}
+
+	/* Display NTB dev stats */
+	for (i = 0; i < nb_ids; i++)
+		printf("  %s: %"PRIu64"\n", xstats_names[i].name, values[i]);
+	ntb_calculate_throughput(0);
 
-	if (rte_rawdev_dequeue_buffers(dev_id, pkts_recv, 1, (void *)size)) {
-		printf("Fail to dequeue.\n");
-		goto clean;
+	/* Get ethdev stats, if any */
+	if (fwd_mode == IOFWD && eth_port_id != RTE_MAX_ETHPORTS) {
+		printf("###### statistics for ETH port %d ######\n",
+			eth_port_id);
+		rte_eth_stats_get(eth_port_id, &stats);
+		printf("  RX-packets: %"PRIu64"\n", stats.ipackets);
+		printf("  RX-bytes: %"PRIu64"\n", stats.ibytes);
+		printf("  RX-errors: %"PRIu64"\n", stats.ierrors);
+		printf("  RX-missed: %"PRIu64"\n", stats.imissed);
+		printf("  TX-packets: %"PRIu64"\n", stats.opackets);
+		printf("  TX-bytes: %"PRIu64"\n", stats.obytes);
+		printf("  TX-errors: %"PRIu64"\n", stats.oerrors);
+		ntb_calculate_throughput(1);
 	}
 
-	fwrite(buff, size, 1, file);
-	printf("Done receiving to file.\n");
+	free(xstats_names);
+	free(values);
+	free(ids);
+}
 
-clean:
-	fclose(file);
-	free(buff);
-	free(pkts_recv[0]);
+/* *** SHOW/CLEAR PORT STATS *** */
+struct cmd_stats_result {
+	cmdline_fixed_string_t show;
+	cmdline_fixed_string_t port;
+	cmdline_fixed_string_t stats;
+};
+
+static void
+cmd_stats_parsed(void *parsed_result,
+		 __attribute__((unused)) struct cmdline *cl,
+		 __attribute__((unused)) void *data)
+{
+	struct cmd_stats_result *res = parsed_result;
+	if (!strcmp(res->show, "clear"))
+		ntb_stats_clear();
+	else
+		ntb_stats_display();
 }
 
-cmdline_parse_token_string_t cmd_recv_file_recv =
-	TOKEN_STRING_INITIALIZER(struct cmd_recvfile_result, recv_string,
-				 "recv");
-cmdline_parse_token_string_t cmd_recv_file_filepath =
-	TOKEN_STRING_INITIALIZER(struct cmd_recvfile_result, filepath, NULL);
+cmdline_parse_token_string_t cmd_stats_show =
+	TOKEN_STRING_INITIALIZER(struct cmd_stats_result, show, "show#clear");
+cmdline_parse_token_string_t cmd_stats_port =
+	TOKEN_STRING_INITIALIZER(struct cmd_stats_result, port, "port");
+cmdline_parse_token_string_t cmd_stats_stats =
+	TOKEN_STRING_INITIALIZER(struct cmd_stats_result, stats, "stats");
 
 
-cmdline_parse_inst_t cmd_recv_file = {
-	.f = cmd_recvfile_parsed,
+cmdline_parse_inst_t cmd_stats = {
+	.f = cmd_stats_parsed,
 	.data = NULL,
-	.help_str = "recv <file_path>",
+	.help_str = "show|clear port stats",
 	.tokens = {
-		(void *)&cmd_recv_file_recv,
-		(void *)&cmd_recv_file_filepath,
+		(void *)&cmd_stats_show,
+		(void *)&cmd_stats_port,
+		(void *)&cmd_stats_stats,
+		NULL,
+	},
+};
+
+/* *** SET FORWARDING MODE *** */
+struct cmd_set_fwd_mode_result {
+	cmdline_fixed_string_t set;
+	cmdline_fixed_string_t fwd;
+	cmdline_fixed_string_t mode;
+};
+
+static void
+cmd_set_fwd_mode_parsed(__attribute__((unused)) void *parsed_result,
+			__attribute__((unused)) struct cmdline *cl,
+			__attribute__((unused)) void *data)
+{
+	struct cmd_set_fwd_mode_result *res = parsed_result;
+	int i;
+
+	if (in_test) {
+		printf("Please stop traffic first.\n");
+		return;
+	}
+
+	for (i = 0; i < MAX_FWD_MODE; i++) {
+		if (!strcmp(res->mode, fwd_mode_s[i])) {
+			fwd_mode = i;
+			return;
+		}
+	}
+	printf("Invalid %s packet forwarding mode.\n", res->mode);
+}
+
+cmdline_parse_token_string_t cmd_setfwd_set =
+	TOKEN_STRING_INITIALIZER(struct cmd_set_fwd_mode_result, set, "set");
+cmdline_parse_token_string_t cmd_setfwd_fwd =
+	TOKEN_STRING_INITIALIZER(struct cmd_set_fwd_mode_result, fwd, "fwd");
+cmdline_parse_token_string_t cmd_setfwd_mode =
+	TOKEN_STRING_INITIALIZER(struct cmd_set_fwd_mode_result, mode,
+				"file-trans#iofwd#txonly#rxonly");
+
+cmdline_parse_inst_t cmd_set_fwd_mode = {
+	.f = cmd_set_fwd_mode_parsed,
+	.data = NULL,
+	.help_str = "set forwarding mode as file-trans|rxonly|txonly|iofwd",
+	.tokens = {
+		(void *)&cmd_setfwd_set,
+		(void *)&cmd_setfwd_fwd,
+		(void *)&cmd_setfwd_mode,
 		NULL,
 	},
 };
@@ -276,7 +1009,10 @@ cmdline_parse_inst_t cmd_recv_file = {
 cmdline_parse_ctx_t main_ctx[] = {
 	(cmdline_parse_inst_t *)&cmd_help,
 	(cmdline_parse_inst_t *)&cmd_send_file,
-	(cmdline_parse_inst_t *)&cmd_recv_file,
+	(cmdline_parse_inst_t *)&cmd_start,
+	(cmdline_parse_inst_t *)&cmd_stop,
+	(cmdline_parse_inst_t *)&cmd_stats,
+	(cmdline_parse_inst_t *)&cmd_set_fwd_mode,
 	(cmdline_parse_inst_t *)&cmd_quit,
 	NULL,
 };
@@ -305,45 +1041,257 @@ signal_handler(int signum)
 	}
 }
 
+#define OPT_BUF_SIZE         "buf-size"
+#define OPT_FWD_MODE         "fwd-mode"
+#define OPT_NB_DESC          "nb-desc"
+#define OPT_TXFREET          "txfreet"
+#define OPT_BURST            "burst"
+#define OPT_QP               "qp"
+
+enum {
+	/* values getopt_long() returns for the matching long option */
+	OPT_NO_ZERO_COPY_NUM = 1, /* NOTE(review): no entry in lgopts */
+	OPT_BUF_SIZE_NUM,
+	OPT_FWD_MODE_NUM,
+	OPT_NB_DESC_NUM,
+	OPT_TXFREET_NUM,
+	OPT_BURST_NUM,
+	OPT_QP_NUM,
+};
+
+static const char short_options[] =
+	"i" /* interactive mode */
+	;
+
+static const struct option lgopts[] = {
+	{OPT_BUF_SIZE,     required_argument, NULL, OPT_BUF_SIZE_NUM     },
+	{OPT_FWD_MODE,     required_argument, NULL, OPT_FWD_MODE_NUM     },
+	{OPT_NB_DESC,      required_argument, NULL, OPT_NB_DESC_NUM      },
+	{OPT_TXFREET,      required_argument, NULL, OPT_TXFREET_NUM      },
+	{OPT_BURST,        required_argument, NULL, OPT_BURST_NUM        },
+	{OPT_QP,           required_argument, NULL, OPT_QP_NUM           },
+	{0,                0, NULL, 0                    }
+};
+
 static void
 ntb_usage(const char *prgname)
 {
 	printf("%s [EAL options] -- [options]\n"
-	       "-i : run in interactive mode (default value is 1)\n",
-	       prgname);
+	       "-i: run in interactive mode.\n"
+	       "--qp=N: set number of queues as N (N > 0, default: 1).\n"
+	       "--fwd-mode=N: set fwd mode (N: file-trans | rxonly | "
+	       "txonly | iofwd, default: file-trans)\n"
+	       "--buf-size=N: set mbuf dataroom size as N (%u < N < 65536,"
+	       " default: 2048).\n"
+	       "--nb-desc=N: set number of descriptors as N (%u <= N <= %u,"
+	       " default: 1024).\n"
+	       "--txfreet=N: set tx free thresh for NTB driver as N. (N >= 0)\n"
+	       "--burst=N: set pkt burst as N (0 < N <= %u default: 32).\n",
+	       prgname, RTE_PKTMBUF_HEADROOM, NTB_MIN_DESC_SIZE,
+	       NTB_MAX_DESC_SIZE, NTB_MAX_PKT_BURST);
 }
 
-static int
-parse_args(int argc, char **argv)
+/* Parse app options; rte_exit() on an unknown option or bad value. */
+static void
+ntb_parse_args(int argc, char **argv)
 {
 	char *prgname = argv[0], **argvopt = argv;
-	int opt, ret;
+	int opt, opt_idx, n, i;
 
-	/* Only support interactive mode to send/recv file first. */
-	while ((opt = getopt(argc, argvopt, "i")) != EOF) {
+	while ((opt = getopt_long(argc, argvopt, short_options,
+				lgopts, &opt_idx)) != EOF) {
 		switch (opt) {
 		case 'i':
-			printf("Interactive-mode selected\n");
+			printf("Interactive-mode selected.\n");
 			interactive = 1;
 			break;
+		case OPT_QP_NUM:
+			n = atoi(optarg);
+			if (n > 0)
+				num_queues = n;
+			else
+				rte_exit(EXIT_FAILURE, "qp must be > 0.\n");
+			break;
+		case OPT_BUF_SIZE_NUM:
+			n = atoi(optarg);
+			if (n > RTE_PKTMBUF_HEADROOM && n <= 0xFFFF)
+				ntb_buf_size = n;
+			else
+				rte_exit(EXIT_FAILURE, "buf-size must be > "
+					"%u and < 65536.\n",
+					RTE_PKTMBUF_HEADROOM);
+			break;
+		case OPT_FWD_MODE_NUM:
+			for (i = 0; i < MAX_FWD_MODE; i++) {
+				if (!strcmp(optarg, fwd_mode_s[i])) {
+					fwd_mode = i;
+					break;
+				}
+			}
+			if (i == MAX_FWD_MODE)
+				rte_exit(EXIT_FAILURE, "Unsupported mode. "
+				"(Should be: file-trans | rxonly | txonly "
+				"| iofwd)\n");
+			break;
+		case OPT_NB_DESC_NUM:
+			n = atoi(optarg);
+			if (n >= NTB_MIN_DESC_SIZE && n <= NTB_MAX_DESC_SIZE)
+				nb_desc = n;
+			else
+				rte_exit(EXIT_FAILURE, "nb-desc must be within"
+					" [%u, %u].\n", NTB_MIN_DESC_SIZE,
+					NTB_MAX_DESC_SIZE);
+			break;
+		case OPT_TXFREET_NUM:
+			n = atoi(optarg);
+			if (n >= 0)
+				tx_free_thresh = n;
+			else
+				rte_exit(EXIT_FAILURE, "txfreet must be"
+					" >= 0\n");
+			break;
+		case OPT_BURST_NUM:
+			n = atoi(optarg);
+			if (n > 0 && n <= NTB_MAX_PKT_BURST)
+				pkt_burst = n;
+			else
+				rte_exit(EXIT_FAILURE, "burst must be within "
+					"(0, %u].\n", NTB_MAX_PKT_BURST);
+			break;
+
 		default:
 			ntb_usage(prgname);
-			return -1;
+			rte_exit(EXIT_FAILURE,
+				 "Command line is incomplete or incorrect.\n");
 		}
 	}
+}
 
-	if (optind >= 0)
-		argv[optind-1] = prgname;
+static void
+ntb_mempool_mz_free(__rte_unused struct rte_mempool_memhdr *memhdr,
+		void *opaque)
+{
+	const struct rte_memzone *mz = opaque;
+	rte_memzone_free(mz);
+}
 
-	ret = optind-1;
-	optind = 1; /* reset getopt lib */
-	return ret;
+/* Build the mbuf pool from memzones that each fit one ntb memory window. */
+static struct rte_mempool *
+ntb_mbuf_pool_create(uint16_t mbuf_seg_size, uint32_t nb_mbuf,
+		     struct ntb_dev_info ntb_info,
+		     struct ntb_dev_config *ntb_conf,
+		     unsigned int socket_id)
+{
+	size_t mz_len, total_elt_sz, max_mz_len, left_sz;
+	struct rte_pktmbuf_pool_private mbp_priv;
+	char pool_name[RTE_MEMPOOL_NAMESIZE];
+	char mz_name[RTE_MEMZONE_NAMESIZE];
+	const struct rte_memzone *mz;
+	struct rte_mempool *mp;
+	uint64_t align;
+	uint32_t mz_id;
+	int ret;
+
+	snprintf(pool_name, sizeof(pool_name), "ntb_mbuf_pool_%u", socket_id);
+	mp = rte_mempool_create_empty(pool_name, nb_mbuf,
+				      (mbuf_seg_size + sizeof(struct rte_mbuf)),
+				      MEMPOOL_CACHE_SIZE,
+				      sizeof(struct rte_pktmbuf_pool_private),
+				      socket_id, 0);
+	if (mp == NULL)
+		return NULL;
+
+	mbp_priv.mbuf_data_room_size = mbuf_seg_size;
+	mbp_priv.mbuf_priv_size = 0;
+	rte_pktmbuf_pool_init(mp, &mbp_priv);
+
+	ntb_conf->mz_list = rte_zmalloc("ntb_memzone_list",
+				sizeof(struct rte_memzone *) *
+				ntb_info.mw_cnt, 0);
+	if (ntb_conf->mz_list == NULL)
+		goto fail;
+
+	/* Put ntb header on mw0. */
+	if (ntb_info.mw_size[0] < ntb_info.ntb_hdr_size) {
+		printf("mw0 (size: %" PRIu64 ") is not enough for ntb hdr"
+		       " (size: %u)\n", ntb_info.mw_size[0],
+		       ntb_info.ntb_hdr_size);
+		goto fail;
+	}
+
+	total_elt_sz = mp->header_size + mp->elt_size + mp->trailer_size;
+	left_sz = total_elt_sz * nb_mbuf;
+	/* If populated mbuf is enough, no need to reserve extra mz. */
+	for (mz_id = 0; mz_id < ntb_info.mw_cnt && left_sz; mz_id++) {
+		snprintf(mz_name, sizeof(mz_name), "ntb_mw_%u", mz_id);
+		align = ntb_info.mw_size_align ? ntb_info.mw_size[mz_id] :
+			RTE_CACHE_LINE_SIZE;
+		/* Reserve ntb header space on memzone 0. */
+		max_mz_len = mz_id ? ntb_info.mw_size[mz_id] :
+			     ntb_info.mw_size[mz_id] - ntb_info.ntb_hdr_size;
+		mz_len = left_sz <= max_mz_len ? left_sz :
+			(max_mz_len / total_elt_sz * total_elt_sz);
+		if (!mz_len)
+			continue;
+		/* mz_len excludes the ntb header that memzone 0 also holds. */
+		mz = rte_memzone_reserve_aligned(mz_name,
+				mz_id ? mz_len : mz_len + ntb_info.ntb_hdr_size,
+				socket_id, RTE_MEMZONE_IOVA_CONTIG, align);
+		if (mz == NULL) {
+			printf("Cannot allocate %" PRIu64 " aligned memzone"
+				" %u\n", align, mz_id);
+			goto fail;
+		}
+		left_sz -= mz_len;
+
+		/* Objects on memzone 0 start right after the ntb header. */
+		if (mz_id)
+			ret = rte_mempool_populate_iova(mp, mz->addr, mz->iova,
+					mz->len, ntb_mempool_mz_free,
+					(void *)(uintptr_t)mz);
+		else
+			ret = rte_mempool_populate_iova(mp,
+					(void *)((size_t)mz->addr +
+					ntb_info.ntb_hdr_size),
+					mz->iova + ntb_info.ntb_hdr_size,
+					mz->len - ntb_info.ntb_hdr_size,
+					ntb_mempool_mz_free,
+					(void *)(uintptr_t)mz);
+		if (ret < 0) {
+			rte_memzone_free(mz);
+			goto fail;
+		}
+
+		ntb_conf->mz_list[mz_id] = mz;
+	}
+	if (left_sz) {
+		printf("mw space is not enough for mempool.\n");
+		goto fail;
+	}
+
+	ntb_conf->mz_num = mz_id;
+	rte_mempool_obj_iter(mp, rte_pktmbuf_init, NULL);
+
+	return mp;
+fail:
+	rte_mempool_free(mp);
+	return NULL;
 }
 
 int
 main(int argc, char **argv)
 {
+	struct rte_eth_conf eth_pconf = eth_port_conf;
+	struct rte_rawdev_info ntb_rawdev_conf;
+	struct rte_rawdev_info ntb_rawdev_info;
+	struct rte_eth_dev_info ethdev_info;
+	struct rte_eth_rxconf eth_rx_conf;
+	struct rte_eth_txconf eth_tx_conf;
+	struct ntb_queue_conf ntb_q_conf;
+	struct ntb_dev_config ntb_conf;
+	struct ntb_dev_info ntb_info;
+	uint64_t ntb_link_status;
+	uint32_t nb_mbuf;
 	int ret, i;
 
 	signal(SIGINT, signal_handler);
@@ -353,6 +1301,9 @@ main(int argc, char **argv)
 	if (ret < 0)
 		rte_exit(EXIT_FAILURE, "Error with EAL initialization.\n");
 
+	if (rte_lcore_count() < 2)
+		rte_exit(EXIT_FAILURE, "Need at least 2 cores\n");
+
 	/* Find 1st ntb rawdev. */
 	for (i = 0; i < RTE_RAWDEV_MAX_DEVS; i++)
 		if (rte_rawdevs[i].driver_name &&
@@ -368,15 +1319,118 @@ main(int argc, char **argv)
 	argc -= ret;
 	argv += ret;
 
-	ret = parse_args(argc, argv);
+	ntb_parse_args(argc, argv);
+
+	rte_rawdev_set_attr(dev_id, NTB_QUEUE_SZ_NAME, nb_desc);
+	printf("Set queue size as %u.\n", nb_desc);
+	rte_rawdev_set_attr(dev_id, NTB_QUEUE_NUM_NAME, num_queues);
+	printf("Set queue number as %u.\n", num_queues);
+	ntb_rawdev_info.dev_private = (rte_rawdev_obj_t)(&ntb_info);
+	rte_rawdev_info_get(dev_id, &ntb_rawdev_info);
+
+	nb_mbuf = nb_desc * num_queues * 2 * 2 + rte_lcore_count() *
+		  MEMPOOL_CACHE_SIZE;
+	mbuf_pool = ntb_mbuf_pool_create(ntb_buf_size, nb_mbuf, ntb_info,
+					 &ntb_conf, rte_socket_id());
+	if (mbuf_pool == NULL)
+		rte_exit(EXIT_FAILURE, "Cannot create mbuf pool.\n");
+
+	ntb_conf.num_queues = num_queues;
+	ntb_conf.queue_size = nb_desc;
+	ntb_rawdev_conf.dev_private = (rte_rawdev_obj_t)(&ntb_conf);
+	ret = rte_rawdev_configure(dev_id, &ntb_rawdev_conf);
+	if (ret)
+		rte_exit(EXIT_FAILURE, "Can't config ntb dev: err=%d, "
+			"port=%u\n", ret, dev_id);
+
+	ntb_q_conf.tx_free_thresh = tx_free_thresh;
+	ntb_q_conf.nb_desc = nb_desc;
+	ntb_q_conf.rx_mp = mbuf_pool;
+	for (i = 0; i < num_queues; i++) {
+		/* Setup rawdev queue */
+		ret = rte_rawdev_queue_setup(dev_id, i, &ntb_q_conf);
+		if (ret < 0)
+			rte_exit(EXIT_FAILURE,
+				"Failed to setup ntb queue %u.\n", i);
+	}
+
+	/* Waiting for peer dev up at most 100s.*/
+	printf("Checking ntb link status...\n");
+	for (i = 0; i < 1000; i++) {
+		rte_rawdev_get_attr(dev_id, NTB_LINK_STATUS_NAME,
+				    &ntb_link_status);
+		if (ntb_link_status) {
+			printf("Peer dev ready, ntb link up.\n");
+			break;
+		}
+		rte_delay_ms(100);
+	}
+	rte_rawdev_get_attr(dev_id, NTB_LINK_STATUS_NAME, &ntb_link_status);
+	if (ntb_link_status == 0)
+		printf("Expire 100s. Link is not up. Please restart app.\n");
+
+	ret = rte_rawdev_start(dev_id);
 	if (ret < 0)
-		rte_exit(EXIT_FAILURE, "Invalid arguments\n");
+		rte_exit(EXIT_FAILURE, "rte_rawdev_start: err=%d, port=%u\n",
+			ret, dev_id);
+
+	/* Find 1st ethdev */
+	eth_port_id = rte_eth_find_next(0);
 
-	rte_rawdev_start(dev_id);
+	if (eth_port_id < RTE_MAX_ETHPORTS) {
+		rte_eth_dev_info_get(eth_port_id, &ethdev_info);
+		eth_pconf.rx_adv_conf.rss_conf.rss_hf &=
+				ethdev_info.flow_type_rss_offloads;
+		ret = rte_eth_dev_configure(eth_port_id, num_queues,
+					    num_queues, &eth_pconf);
+		if (ret)
+			rte_exit(EXIT_FAILURE, "Can't config ethdev: err=%d, "
+				"port=%u\n", ret, eth_port_id);
+		eth_rx_conf = ethdev_info.default_rxconf;
+		eth_rx_conf.offloads = eth_pconf.rxmode.offloads;
+		eth_tx_conf = ethdev_info.default_txconf;
+		eth_tx_conf.offloads = eth_pconf.txmode.offloads;
+
+		/* Setup ethdev queue if ethdev exists */
+		for (i = 0; i < num_queues; i++) {
+			ret = rte_eth_rx_queue_setup(eth_port_id, i, nb_desc,
+					rte_eth_dev_socket_id(eth_port_id),
+					&eth_rx_conf, mbuf_pool);
+			if (ret < 0)
+				rte_exit(EXIT_FAILURE,
+					"Failed to setup eth rxq %u.\n", i);
+			ret = rte_eth_tx_queue_setup(eth_port_id, i, nb_desc,
+					rte_eth_dev_socket_id(eth_port_id),
+					&eth_tx_conf);
+			if (ret < 0)
+				rte_exit(EXIT_FAILURE,
+					"Failed to setup eth txq %u.\n", i);
+		}
+
+		ret = rte_eth_dev_start(eth_port_id);
+		if (ret < 0)
+			rte_exit(EXIT_FAILURE, "rte_eth_dev_start: err=%d, "
+				"port=%u\n", ret, eth_port_id);
+	}
+
+	/* initialize port stats */
+	memset(&ntb_port_stats, 0, sizeof(ntb_port_stats));
+
+	/* Set default fwd mode if user doesn't set it. */
+	if (fwd_mode == MAX_FWD_MODE && eth_port_id < RTE_MAX_ETHPORTS) {
+		printf("Set default fwd mode as iofwd.\n");
+		fwd_mode = IOFWD;
+	}
+	if (fwd_mode == MAX_FWD_MODE) {
+		printf("Set default fwd mode as file-trans.\n");
+		fwd_mode = FILE_TRANS;
+	}
 
 	if (interactive) {
 		sleep(1);
 		prompt();
+	} else {
+		start_pkt_fwd();
 	}
 
 	return 0;
-- 
2.17.1



More information about the dev mailing list