[dpdk-dev] [PATCH v5 3/5] examples/flow_distributor: sample app to demonstrate EFD usage

Pablo de Lara pablo.de.lara.guarch at intel.com
Mon Jan 16 10:43:22 CET 2017


This new sample app, based on the client/server sample app,
shows the user an scenario using the EFD library.
It consists of:

- A front-end server which has an EFD table that stores the
  node id for each flow key, which will distribute the incoming
  packets to the different nodes

- A back-end node, which has a hash table where node checks,
  after reading packets coming from the server, whether the packet
  is meant to be used in such node, in which case it will be TXed,
  or not, in which case, packet will be dropped.

Signed-off-by: Pablo de Lara <pablo.de.lara.guarch at intel.com>
Signed-off-by: Saikrishna Edupuganti <saikrishna.edupuganti at intel.com>
Acked-by: Christian Maciocco <christian.maciocco at intel.com>
---
 MAINTAINERS                                    |   1 +
 doc/api/examples.dox                           |   4 +
 examples/Makefile                              |   1 +
 examples/flow_distributor/Makefile             |  44 +++
 examples/flow_distributor/distributor/Makefile |  57 ++++
 examples/flow_distributor/distributor/args.c   | 200 ++++++++++++
 examples/flow_distributor/distributor/args.h   |  39 +++
 examples/flow_distributor/distributor/init.c   | 371 ++++++++++++++++++++++
 examples/flow_distributor/distributor/init.h   |  76 +++++
 examples/flow_distributor/distributor/main.c   | 362 +++++++++++++++++++++
 examples/flow_distributor/node/Makefile        |  48 +++
 examples/flow_distributor/node/node.c          | 417 +++++++++++++++++++++++++
 examples/flow_distributor/shared/common.h      |  99 ++++++
 13 files changed, 1719 insertions(+)
 create mode 100644 examples/flow_distributor/Makefile
 create mode 100644 examples/flow_distributor/distributor/Makefile
 create mode 100644 examples/flow_distributor/distributor/args.c
 create mode 100644 examples/flow_distributor/distributor/args.h
 create mode 100644 examples/flow_distributor/distributor/init.c
 create mode 100644 examples/flow_distributor/distributor/init.h
 create mode 100644 examples/flow_distributor/distributor/main.c
 create mode 100644 examples/flow_distributor/node/Makefile
 create mode 100644 examples/flow_distributor/node/node.c
 create mode 100644 examples/flow_distributor/shared/common.h

diff --git a/MAINTAINERS b/MAINTAINERS
index d812962..b124f6e 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -533,6 +533,7 @@ M: Byron Marohn <byron.marohn at intel.com>
 M: Pablo de Lara Guarch <pablo.de.lara.guarch at intel.com>
 F: lib/librte_efd/
 F: app/test/test_efd*
+F: examples/flow_distributor/
 
 Hashes
 M: Bruce Richardson <bruce.richardson at intel.com>
diff --git a/doc/api/examples.dox b/doc/api/examples.dox
index 1626852..c13e574 100644
--- a/doc/api/examples.dox
+++ b/doc/api/examples.dox
@@ -52,6 +52,10 @@
 @example load_balancer/init.c
 @example load_balancer/main.c
 @example load_balancer/runtime.c
+ at example flow_distributor/distributor/args.c
+ at example flow_distributor/distributor/init.c
+ at example flow_distributor/distributor/main.c
+ at example flow_distributor/node/node.c
 @example multi_process/client_server_mp/mp_client/client.c
 @example multi_process/client_server_mp/mp_server/args.c
 @example multi_process/client_server_mp/mp_server/init.c
diff --git a/examples/Makefile b/examples/Makefile
index d49c7f2..b404982 100644
--- a/examples/Makefile
+++ b/examples/Makefile
@@ -45,6 +45,7 @@ DIRS-y += dpdk_qat
 endif
 DIRS-y += ethtool
 DIRS-y += exception_path
+DIRS-$(CONFIG_RTE_LIBRTE_EFD) += flow_distributor
 DIRS-y += helloworld
 DIRS-$(CONFIG_RTE_LIBRTE_PIPELINE) += ip_pipeline
 ifeq ($(CONFIG_RTE_LIBRTE_LPM),y)
diff --git a/examples/flow_distributor/Makefile b/examples/flow_distributor/Makefile
new file mode 100644
index 0000000..5bae706
--- /dev/null
+++ b/examples/flow_distributor/Makefile
@@ -0,0 +1,44 @@
+#   BSD LICENSE
+#
+#   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+#   All rights reserved.
+#
+#   Redistribution and use in source and binary forms, with or without
+#   modification, are permitted provided that the following conditions
+#   are met:
+#
+#     * Redistributions of source code must retain the above copyright
+#       notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above copyright
+#       notice, this list of conditions and the following disclaimer in
+#       the documentation and/or other materials provided with the
+#       distribution.
+#     * Neither the name of Intel Corporation nor the names of its
+#       contributors may be used to endorse or promote products derived
+#       from this software without specific prior written permission.
+#
+#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+#   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+#   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+#   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+#   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+#   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+#   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+#   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+#   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+ifeq ($(RTE_SDK),)
+$(error "Please define RTE_SDK environment variable")
+endif
+
+# Default target, can be overridden by command line or environment
+RTE_TARGET ?= x86_64-native-linuxapp-gcc
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+DIRS-$(CONFIG_RTE_EXEC_ENV_LINUXAPP) += distributor
+DIRS-$(CONFIG_RTE_EXEC_ENV_LINUXAPP) += node
+
+include $(RTE_SDK)/mk/rte.extsubdir.mk
diff --git a/examples/flow_distributor/distributor/Makefile b/examples/flow_distributor/distributor/Makefile
new file mode 100644
index 0000000..8714151
--- /dev/null
+++ b/examples/flow_distributor/distributor/Makefile
@@ -0,0 +1,57 @@
+#   BSD LICENSE
+#
+#   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+#   All rights reserved.
+#
+#   Redistribution and use in source and binary forms, with or without
+#   modification, are permitted provided that the following conditions
+#   are met:
+#
+#     * Redistributions of source code must retain the above copyright
+#       notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above copyright
+#       notice, this list of conditions and the following disclaimer in
+#       the documentation and/or other materials provided with the
+#       distribution.
+#     * Neither the name of Intel Corporation nor the names of its
+#       contributors may be used to endorse or promote products derived
+#       from this software without specific prior written permission.
+#
+#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+#   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+#   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+#   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+#   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+#   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+#   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+#   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+#   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+ifeq ($(RTE_SDK),)
+$(error "Please define RTE_SDK environment variable")
+endif
+
+# Default target, can be overridden by command line or environment
+RTE_TARGET ?= x86_64-native-linuxapp-gcc
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+ifneq ($(CONFIG_RTE_EXEC_ENV), "linuxapp")
+$(error This application can only operate in a linuxapp environment, \
+please change the definition of the RTE_TARGET environment variable)
+endif
+
+# binary name
+APP = distributor
+
+# all source are stored in SRCS-y
+SRCS-y := main.c init.c args.c
+
+INC := $(wildcard *.h)
+
+CFLAGS += $(WERROR_FLAGS) -O3
+CFLAGS += -I$(SRCDIR)/../shared
+
+include $(RTE_SDK)/mk/rte.extapp.mk
diff --git a/examples/flow_distributor/distributor/args.c b/examples/flow_distributor/distributor/args.c
new file mode 100644
index 0000000..ee29203
--- /dev/null
+++ b/examples/flow_distributor/distributor/args.c
@@ -0,0 +1,200 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <getopt.h>
+#include <stdarg.h>
+#include <errno.h>
+
+#include <rte_memory.h>
+#include <rte_string_fns.h>
+
+#include "common.h"
+#include "args.h"
+#include "init.h"
+
+/* 1M flows by default */
+#define DEFAULT_NUM_FLOWS    0x100000
+
+/* global var for number of nodes - extern in header */
+uint8_t num_nodes;
+/* global var for number of flows - extern in header */
+uint32_t num_flows = DEFAULT_NUM_FLOWS;
+
+static const char *progname;
+
+/**
+ * Prints out usage information to stdout
+ */
+static void
+usage(void)
+{
+	printf("%s [EAL options] -- -p PORTMASK -n NUM_NODES -f NUM_FLOWS\n"
+		" -p PORTMASK: hexadecimal bitmask of ports to use\n"
+		" -n NUM_NODES: number of node processes to use\n"
+		" -f NUM_FLOWS: number of flows to be added in the EFD table\n",
+		progname);
+}
+
+/**
+ * The ports to be used by the application are passed in
+ * the form of a bitmask. This function parses the bitmask
+ * and places the port numbers to be used into the port[]
+ * array variable
+ */
+static int
+parse_portmask(uint8_t max_ports, const char *portmask)
+{
+	char *end = NULL;
+	unsigned long pm;
+	uint8_t count = 0;
+
+	if (portmask == NULL || *portmask == '\0')
+		return -1;
+
+	/* convert parameter to a number and verify */
+	pm = strtoul(portmask, &end, 16);
+	if (end == NULL || *end != '\0' || pm == 0)
+		return -1;
+
+	/* loop through bits of the mask and mark ports */
+	while (pm != 0) {
+		if (pm & 0x01) { /* bit is set in mask, use port */
+			if (count >= max_ports)
+				printf("WARNING: requested port %u not present"
+				" - ignoring\n", (unsigned int)count);
+			else
+			    info->id[info->num_ports++] = count;
+		}
+		pm = (pm >> 1);
+		count++;
+	}
+
+	return 0;
+}
+
+/**
+ * Take the number of nodes parameter passed to the app
+ * and convert to a number to store in the num_nodes variable
+ */
+static int
+parse_num_nodes(const char *nodes)
+{
+	char *end = NULL;
+	unsigned long temp;
+
+	if (nodes == NULL || *nodes == '\0')
+		return -1;
+
+	temp = strtoul(nodes, &end, 10);
+	if (end == NULL || *end != '\0' || temp == 0)
+		return -1;
+
+	num_nodes = (uint8_t)temp;
+	return 0;
+}
+
+static int
+parse_num_flows(const char *flows)
+{
+	char *end = NULL;
+
+	/* parse hexadecimal string */
+	num_flows = strtoul(flows, &end, 16);
+	if ((flows[0] == '\0') || (end == NULL) || (*end != '\0'))
+		return -1;
+
+	if (num_flows == 0)
+		return -1;
+
+	return 0;
+}
+
+/**
+ * The application specific arguments follow the DPDK-specific
+ * arguments which are stripped by the DPDK init. This function
+ * processes these application arguments, printing usage info
+ * on error.
+ */
+int
+parse_app_args(uint8_t max_ports, int argc, char *argv[])
+{
+	int option_index, opt;
+	char **argvopt = argv;
+	static struct option lgopts[] = { /* no long options */
+		{NULL, 0, 0, 0 }
+	};
+	progname = argv[0];
+
+	while ((opt = getopt_long(argc, argvopt, "n:f:p:", lgopts,
+			&option_index)) != EOF) {
+		switch (opt) {
+		case 'p':
+			if (parse_portmask(max_ports, optarg) != 0) {
+				usage();
+				return -1;
+			}
+			break;
+		case 'n':
+			if (parse_num_nodes(optarg) != 0) {
+				usage();
+				return -1;
+			}
+			break;
+		case 'f':
+			if (parse_num_flows(optarg) != 0) {
+				usage();
+				return -1;
+			}
+			break;
+		default:
+			printf("ERROR: Unknown option '%c'\n", opt);
+			usage();
+			return -1;
+		}
+	}
+
+	if (info->num_ports == 0 || num_nodes == 0) {
+		usage();
+		return -1;
+	}
+
+	if (info->num_ports % 2 != 0) {
+		printf("ERROR: application requires an even "
+				"number of ports to use\n");
+		return -1;
+	}
+	return 0;
+}
diff --git a/examples/flow_distributor/distributor/args.h b/examples/flow_distributor/distributor/args.h
new file mode 100644
index 0000000..cacf395
--- /dev/null
+++ b/examples/flow_distributor/distributor/args.h
@@ -0,0 +1,39 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _ARGS_H_
+#define _ARGS_H_
+
+int parse_app_args(uint8_t max_ports, int argc, char *argv[]);
+
+#endif /* ifndef _ARGS_H_ */
diff --git a/examples/flow_distributor/distributor/init.c b/examples/flow_distributor/distributor/init.c
new file mode 100644
index 0000000..3b0aa85
--- /dev/null
+++ b/examples/flow_distributor/distributor/init.c
@@ -0,0 +1,371 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdint.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/queue.h>
+#include <errno.h>
+#include <stdarg.h>
+#include <inttypes.h>
+
+#include <rte_common.h>
+#include <rte_memory.h>
+#include <rte_memzone.h>
+#include <rte_eal.h>
+#include <rte_byteorder.h>
+#include <rte_atomic.h>
+#include <rte_launch.h>
+#include <rte_per_lcore.h>
+#include <rte_lcore.h>
+#include <rte_branch_prediction.h>
+#include <rte_debug.h>
+#include <rte_ring.h>
+#include <rte_log.h>
+#include <rte_mempool.h>
+#include <rte_memcpy.h>
+#include <rte_mbuf.h>
+#include <rte_interrupts.h>
+#include <rte_pci.h>
+#include <rte_ether.h>
+#include <rte_ethdev.h>
+#include <rte_malloc.h>
+#include <rte_string_fns.h>
+#include <rte_cycles.h>
+#include <rte_efd.h>
+#include <rte_hash.h>
+
+#include "common.h"
+#include "args.h"
+#include "init.h"
+
+#define MBUFS_PER_NODE 1536
+#define MBUFS_PER_PORT 1536
+#define MBUF_CACHE_SIZE 512
+
+#define RTE_MP_RX_DESC_DEFAULT 512
+#define RTE_MP_TX_DESC_DEFAULT 512
+#define NODE_QUEUE_RINGSIZE 128
+
+#define NO_FLAGS 0
+
+/* The mbuf pool for packet rx */
+struct rte_mempool *pktmbuf_pool;
+
+/* array of info/queues for nodes */
+struct node *nodes;
+
+/* Flow distributor table */
+struct rte_efd_table *efd_table;
+
+/* Shared info between distributor and nodes */
+struct shared_info *info;
+
+/**
+ * Initialise the mbuf pool for packet reception for the NIC, and any other
+ * buffer pools needed by the app - currently none.
+ */
+static int
+init_mbuf_pools(void)
+{
+	const unsigned int num_mbufs = (num_nodes * MBUFS_PER_NODE) +
+			(info->num_ports * MBUFS_PER_PORT);
+
+	/*
+	 * Don't pass single-producer/single-consumer flags to mbuf create as it
+	 * seems faster to use a cache instead
+	 */
+	printf("Creating mbuf pool '%s' [%u mbufs] ...\n",
+			PKTMBUF_POOL_NAME, num_mbufs);
+	pktmbuf_pool = rte_pktmbuf_pool_create(PKTMBUF_POOL_NAME, num_mbufs,
+		MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
+
+	return pktmbuf_pool == NULL; /* 0  on success */
+}
+
+/**
+ * Initialise an individual port:
+ * - configure number of rx and tx rings
+ * - set up each rx ring, to pull from the main mbuf pool
+ * - set up each tx ring
+ * - start the port and report its status to stdout
+ */
+static int
+init_port(uint8_t port_num)
+{
+	/* for port configuration all features are off by default */
+	const struct rte_eth_conf port_conf = {
+		.rxmode = {
+			.mq_mode = ETH_MQ_RX_RSS
+		}
+	};
+	const uint16_t rx_rings = 1, tx_rings = num_nodes;
+	const uint16_t rx_ring_size = RTE_MP_RX_DESC_DEFAULT;
+	const uint16_t tx_ring_size = RTE_MP_TX_DESC_DEFAULT;
+
+	uint16_t q;
+	int retval;
+
+	printf("Port %u init ... ", (unsigned int)port_num);
+	fflush(stdout);
+
+	/*
+	 * Standard DPDK port initialisation - config port, then set up
+	 * rx and tx rings.
+	 */
+	retval = rte_eth_dev_configure(port_num, rx_rings, tx_rings, &port_conf);
+	if (retval != 0)
+		return retval;
+
+	for (q = 0; q < rx_rings; q++) {
+		retval = rte_eth_rx_queue_setup(port_num, q, rx_ring_size,
+				rte_eth_dev_socket_id(port_num),
+				NULL, pktmbuf_pool);
+		if (retval < 0)
+			return retval;
+	}
+
+	for (q = 0; q < tx_rings; q++) {
+		retval = rte_eth_tx_queue_setup(port_num, q, tx_ring_size,
+				rte_eth_dev_socket_id(port_num),
+				NULL);
+		if (retval < 0)
+			return retval;
+	}
+
+	rte_eth_promiscuous_enable(port_num);
+
+	retval = rte_eth_dev_start(port_num);
+	if (retval < 0)
+		return retval;
+
+	printf("done:\n");
+
+	return 0;
+}
+
+/**
+ * Set up the DPDK rings which will be used to pass packets, via
+ * pointers, between the multi-process distributor and node processes.
+ * Each node needs one RX queue.
+ */
+static int
+init_shm_rings(void)
+{
+	unsigned int i;
+	unsigned int socket_id;
+	const char *q_name;
+	const unsigned int ringsize = NODE_QUEUE_RINGSIZE;
+
+	nodes = rte_malloc("node details",
+		sizeof(*nodes) * num_nodes, 0);
+	if (nodes == NULL)
+		rte_exit(EXIT_FAILURE, "Cannot allocate memory for "
+				"node program details\n");
+
+	for (i = 0; i < num_nodes; i++) {
+		/* Create an RX queue for each node */
+		socket_id = rte_socket_id();
+		q_name = get_rx_queue_name(i);
+		nodes[i].rx_q = rte_ring_create(q_name,
+				ringsize, socket_id,
+				RING_F_SP_ENQ | RING_F_SC_DEQ);
+		if (nodes[i].rx_q == NULL)
+			rte_exit(EXIT_FAILURE, "Cannot create rx ring queue "
+					"for node %u\n", i);
+	}
+	return 0;
+}
+
+/*
+ * Create flow distributor table which will contain all the flows
+ * that will be distributed among the nodes
+ */
+static void
+create_flow_distributor_table(void)
+{
+	uint8_t socket_id = rte_socket_id();
+
+	/* create table */
+	efd_table = rte_efd_create("flow table", num_flows * 2, sizeof(uint32_t),
+			1 << socket_id,	socket_id);
+
+	if (efd_table == NULL)
+		rte_exit(EXIT_FAILURE, "Problem creating the flow table\n");
+}
+
+static void
+populate_flow_distributor_table(void)
+{
+	unsigned int i;
+	int32_t ret;
+	uint32_t ip_dst;
+	uint8_t socket_id = rte_socket_id();
+	uint64_t node_id;
+
+	/* Add flows in table */
+	for (i = 0; i < num_flows; i++) {
+		node_id = i % num_nodes;
+
+		ip_dst = rte_cpu_to_be_32(i);
+		ret = rte_efd_update(efd_table, socket_id,
+				(void *)&ip_dst, (efd_value_t)node_id);
+		if (ret < 0)
+			rte_exit(EXIT_FAILURE, "Unable to add entry %u in "
+					"flow distributor table\n", i);
+	}
+
+	printf("EFD table: Adding 0x%x keys\n", num_flows);
+}
+
+/* Check the link status of all ports in up to 9s, and print them finally */
+static void
+check_all_ports_link_status(uint8_t port_num, uint32_t port_mask)
+{
+#define CHECK_INTERVAL 100 /* 100ms */
+#define MAX_CHECK_TIME 90 /* 9s (90 * 100ms) in total */
+	uint8_t portid, count, all_ports_up, print_flag = 0;
+	struct rte_eth_link link;
+
+	printf("\nChecking link status");
+	fflush(stdout);
+	for (count = 0; count <= MAX_CHECK_TIME; count++) {
+		all_ports_up = 1;
+		for (portid = 0; portid < port_num; portid++) {
+			if ((port_mask & (1 << info->id[portid])) == 0)
+				continue;
+			memset(&link, 0, sizeof(link));
+			rte_eth_link_get_nowait(info->id[portid], &link);
+			/* print link status if flag set */
+			if (print_flag == 1) {
+				if (link.link_status)
+					printf("Port %d Link Up - speed %u "
+						"Mbps - %s\n", info->id[portid],
+						(unsigned int)link.link_speed,
+				(link.link_duplex == ETH_LINK_FULL_DUPLEX) ?
+					("full-duplex") : ("half-duplex\n"));
+				else
+					printf("Port %d Link Down\n",
+						(uint8_t)info->id[portid]);
+				continue;
+			}
+			/* clear all_ports_up flag if any link down */
+			if (link.link_status == ETH_LINK_DOWN) {
+				all_ports_up = 0;
+				break;
+			}
+		}
+		/* after finally printing all link status, get out */
+		if (print_flag == 1)
+			break;
+
+		if (all_ports_up == 0) {
+			printf(".");
+			fflush(stdout);
+			rte_delay_ms(CHECK_INTERVAL);
+		}
+
+		/* set the print_flag if all ports up or timeout */
+		if (all_ports_up == 1 || count == (MAX_CHECK_TIME - 1)) {
+			print_flag = 1;
+			printf("done\n");
+		}
+	}
+}
+
+/**
+ * Main init function for the multi-process distributor app,
+ * calls subfunctions to do each stage of the initialisation.
+ */
+int
+init(int argc, char *argv[])
+{
+	int retval;
+	const struct rte_memzone *mz;
+	uint8_t i, total_ports;
+
+	/* init EAL, parsing EAL args */
+	retval = rte_eal_init(argc, argv);
+	if (retval < 0)
+		return -1;
+	argc -= retval;
+	argv += retval;
+
+	/* get total number of ports */
+	total_ports = rte_eth_dev_count();
+
+	/* set up array for port data */
+	mz = rte_memzone_reserve(MZ_SHARED_INFO, sizeof(*info),
+				rte_socket_id(), NO_FLAGS);
+	if (mz == NULL)
+		rte_exit(EXIT_FAILURE, "Cannot reserve memory zone "
+				"for port information\n");
+	memset(mz->addr, 0, sizeof(*info));
+	info = mz->addr;
+
+	/* parse additional, application arguments */
+	retval = parse_app_args(total_ports, argc, argv);
+	if (retval != 0)
+		return -1;
+
+	/* initialise mbuf pools */
+	retval = init_mbuf_pools();
+	if (retval != 0)
+		rte_exit(EXIT_FAILURE, "Cannot create needed mbuf pools\n");
+
+	/* now initialise the ports we will use */
+	for (i = 0; i < info->num_ports; i++) {
+		retval = init_port(info->id[i]);
+		if (retval != 0)
+			rte_exit(EXIT_FAILURE, "Cannot initialise port %u\n",
+					(unsigned int) i);
+	}
+
+	check_all_ports_link_status(info->num_ports, (~0x0));
+
+	/* initialise the node queues/rings for inter-eu comms */
+	init_shm_rings();
+
+	/* Create the flow distributor table */
+	create_flow_distributor_table();
+
+	/* Populate the flow distributor table */
+	populate_flow_distributor_table();
+
+	/* Share the total number of nodes */
+	info->num_nodes = num_nodes;
+
+	/* Share the total number of flows */
+	info->num_flows = num_flows;
+	return 0;
+}
diff --git a/examples/flow_distributor/distributor/init.h b/examples/flow_distributor/distributor/init.h
new file mode 100644
index 0000000..d11aacf
--- /dev/null
+++ b/examples/flow_distributor/distributor/init.h
@@ -0,0 +1,76 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _INIT_H_
+#define _INIT_H_
+
+/*
+ * #include <rte_ring.h>
+ * #include "args.h"
+ */
+
+/*
+ * Define a node structure with all needed info, including
+ * stats from the nodes.
+ */
+struct node {
+	struct rte_ring *rx_q;
+	unsigned int node_id;
+	/* these stats hold how many packets the node will actually receive,
+	 * and how many packets were dropped because the node's queue was full.
+	 * The port-info stats, in contrast, record how many packets were received
+	 * or transmitted on an actual NIC port.
+	 */
+	struct {
+		uint64_t rx;
+		uint64_t rx_drop;
+	} stats;
+};
+
+extern struct rte_efd_table *efd_table;
+extern struct node *nodes;
+
+/*
+ * shared information between distributor and nodes: number of clients,
+ * port numbers, rx and tx stats etc.
+ */
+extern struct shared_info *info;
+
+extern struct rte_mempool *pktmbuf_pool;
+extern uint8_t num_nodes;
+extern unsigned int num_sockets;
+extern uint32_t num_flows;
+
+int init(int argc, char *argv[]);
+
+#endif /* ifndef _INIT_H_ */
diff --git a/examples/flow_distributor/distributor/main.c b/examples/flow_distributor/distributor/main.c
new file mode 100644
index 0000000..f97f003
--- /dev/null
+++ b/examples/flow_distributor/distributor/main.c
@@ -0,0 +1,362 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <stdint.h>
+#include <stdarg.h>
+#include <inttypes.h>
+#include <inttypes.h>
+#include <sys/queue.h>
+#include <errno.h>
+#include <netinet/ip.h>
+
+#include <rte_common.h>
+#include <rte_memory.h>
+#include <rte_memzone.h>
+#include <rte_eal.h>
+#include <rte_byteorder.h>
+#include <rte_launch.h>
+#include <rte_per_lcore.h>
+#include <rte_lcore.h>
+#include <rte_branch_prediction.h>
+#include <rte_atomic.h>
+#include <rte_ring.h>
+#include <rte_log.h>
+#include <rte_debug.h>
+#include <rte_mempool.h>
+#include <rte_memcpy.h>
+#include <rte_mbuf.h>
+#include <rte_ether.h>
+#include <rte_interrupts.h>
+#include <rte_pci.h>
+#include <rte_ethdev.h>
+#include <rte_byteorder.h>
+#include <rte_malloc.h>
+#include <rte_string_fns.h>
+#include <rte_efd.h>
+#include <rte_ip.h>
+
+#include "common.h"
+#include "args.h"
+#include "init.h"
+
+/*
+ * When doing reads from the NIC or the node queues,
+ * use this batch size
+ */
+#define PACKET_READ_SIZE 32
+
+/*
+ * Local buffers to put packets in, used to send packets in bursts to the
+ * nodes
+ */
+struct node_rx_buf {
+	struct rte_mbuf *buffer[PACKET_READ_SIZE];
+	uint16_t count;
+};
+
+struct flow_distributor_stats {
+	uint64_t distributed;
+	uint64_t drop;
+} flow_dist_stats;
+
+/* One buffer per node rx queue - dynamically allocate array */
+static struct node_rx_buf *cl_rx_buf;
+
+static const char *
+get_printable_mac_addr(uint8_t port)
+{
+	static const char err_address[] = "00:00:00:00:00:00";
+	static char addresses[RTE_MAX_ETHPORTS][sizeof(err_address)];
+	struct ether_addr mac;
+
+	if (unlikely(port >= RTE_MAX_ETHPORTS))
+		return err_address;
+	if (unlikely(addresses[port][0] == '\0')) {
+		rte_eth_macaddr_get(port, &mac);
+		snprintf(addresses[port], sizeof(addresses[port]),
+				"%02x:%02x:%02x:%02x:%02x:%02x\n",
+				mac.addr_bytes[0], mac.addr_bytes[1],
+				mac.addr_bytes[2], mac.addr_bytes[3],
+				mac.addr_bytes[4], mac.addr_bytes[5]);
+	}
+	return addresses[port];
+}
+
+/*
+ * This function displays the recorded statistics for each port
+ * and for each node. It uses ANSI terminal codes to clear
+ * screen when called. It is called from a single non-master
+ * thread in the distributor process, when the process is run with more
+ * than one lcore enabled.
+ */
+static void
+do_stats_display(void)
+{
+	unsigned int i, j;
+	const char clr[] = {27, '[', '2', 'J', '\0'};
+	const char topLeft[] = {27, '[', '1', ';', '1', 'H', '\0'};
+	uint64_t port_tx[RTE_MAX_ETHPORTS], port_tx_drop[RTE_MAX_ETHPORTS];
+	uint64_t node_tx[MAX_NODES], node_tx_drop[MAX_NODES];
+
+	/* to get TX stats, we need to do some summing calculations */
+	memset(port_tx, 0, sizeof(port_tx));
+	memset(port_tx_drop, 0, sizeof(port_tx_drop));
+	memset(node_tx, 0, sizeof(node_tx));
+	memset(node_tx_drop, 0, sizeof(node_tx_drop));
+
+	for (i = 0; i < num_nodes; i++) {
+		const struct tx_stats *tx = &info->tx_stats[i];
+
+		for (j = 0; j < info->num_ports; j++) {
+			const uint64_t tx_val = tx->tx[info->id[j]];
+			const uint64_t drop_val = tx->tx_drop[info->id[j]];
+
+			port_tx[j] += tx_val;
+			port_tx_drop[j] += drop_val;
+			node_tx[i] += tx_val;
+			node_tx_drop[i] += drop_val;
+		}
+	}
+
+	/* Clear screen and move to top left */
+	printf("%s%s", clr, topLeft);
+
+	printf("PORTS\n");
+	printf("-----\n");
+	for (i = 0; i < info->num_ports; i++)
+		printf("Port %u: '%s'\t", (unsigned int)info->id[i],
+				get_printable_mac_addr(info->id[i]));
+	printf("\n\n");
+	for (i = 0; i < info->num_ports; i++) {
+		printf("Port %u - rx: %9"PRIu64"\t"
+				"tx: %9"PRIu64"\n",
+				(unsigned int)info->id[i], info->rx_stats.rx[i],
+				port_tx[i]);
+	}
+
+	printf("\nFLOW DISTRIBUTOR\n");
+	printf("-----\n");
+	printf("distributed: %9"PRIu64", drop: %9"PRIu64"\n",
+			flow_dist_stats.distributed, flow_dist_stats.drop);
+
+	printf("\nNODES\n");
+	printf("-------\n");
+	for (i = 0; i < num_nodes; i++) {
+		const unsigned long long rx = nodes[i].stats.rx;
+		const unsigned long long rx_drop = nodes[i].stats.rx_drop;
+		const struct filter_stats *filter = &info->filter_stats[i];
+
+		printf("Node %2u - rx: %9llu, rx_drop: %9llu\n"
+				"            tx: %9"PRIu64", tx_drop: %9"PRIu64"\n"
+				"            filter_passed: %9"PRIu64", "
+				"filter_drop: %9"PRIu64"\n",
+				i, rx, rx_drop, node_tx[i], node_tx_drop[i],
+				filter->passed, filter->drop);
+	}
+
+	printf("\n");
+}
+
+/*
+ * The function called from each non-master lcore used by the process.
+ * The test_and_set function is used to randomly pick a single lcore on which
+ * the code to display the statistics will run. Otherwise, the code just
+ * repeatedly sleeps.
+ */
+static int
+sleep_lcore(__attribute__((unused)) void *dummy)
+{
+	/* Used to pick a display thread - static, so zero-initialised */
+	static rte_atomic32_t display_stats;
+
+	/* Only one core should display stats */
+	if (rte_atomic32_test_and_set(&display_stats)) {
+		const unsigned int sleeptime = 1;
+
+		printf("Core %u displaying statistics\n", rte_lcore_id());
+
+		/* Longer initial pause so above printf is seen */
+		sleep(sleeptime * 3);
+
+		/* Loop forever: sleep always returns 0 or <= param */
+		while (sleep(sleeptime) <= sleeptime)
+			do_stats_display();
+	}
+	return 0;
+}
+
+/*
+ * Function to set all the node statistic values to zero.
+ * Called at program startup.
+ */
+static void
+clear_stats(void)
+{
+	unsigned int i;
+
+	for (i = 0; i < num_nodes; i++)
+		nodes[i].stats.rx = nodes[i].stats.rx_drop = 0;
+}
+
+/*
+ * send a burst of traffic to a node, assuming there are packets
+ * available to be sent to this node
+ */
+static void
+flush_rx_queue(uint16_t node)
+{
+	uint16_t j;
+	struct node *cl;
+
+	if (cl_rx_buf[node].count == 0)
+		return;
+
+	cl = &nodes[node];
+	if (rte_ring_enqueue_bulk(cl->rx_q, (void **)cl_rx_buf[node].buffer,
+			cl_rx_buf[node].count) != 0){
+		for (j = 0; j < cl_rx_buf[node].count; j++)
+			rte_pktmbuf_free(cl_rx_buf[node].buffer[j]);
+		cl->stats.rx_drop += cl_rx_buf[node].count;
+	} else
+		cl->stats.rx += cl_rx_buf[node].count;
+
+	cl_rx_buf[node].count = 0;
+}
+
+/*
+ * marks a packet down to be sent to a particular node process
+ */
+static inline void
+enqueue_rx_packet(uint8_t node, struct rte_mbuf *buf)
+{
+	cl_rx_buf[node].buffer[cl_rx_buf[node].count++] = buf;
+}
+
+/*
+ * This function takes a group of packets and routes them
+ * individually to the node process. Very simply round-robins the packets
+ * without checking any of the packet contents.
+ */
+static void
+process_packets(uint32_t port_num __rte_unused, struct rte_mbuf *pkts[],
+		uint16_t rx_count, unsigned int socket_id)
+{
+	uint16_t i;
+	uint8_t node;
+	efd_value_t data[RTE_EFD_BURST_MAX];
+	const void *key_ptrs[RTE_EFD_BURST_MAX];
+
+	struct ipv4_hdr *ipv4_hdr;
+	uint32_t ipv4_dst_ip[RTE_EFD_BURST_MAX];
+
+	for (i = 0; i < rx_count; i++) {
+		/* Handle IPv4 header.*/
+		ipv4_hdr = rte_pktmbuf_mtod_offset(pkts[i], struct ipv4_hdr *,
+				sizeof(struct ether_hdr));
+		ipv4_dst_ip[i] = ipv4_hdr->dst_addr;
+		key_ptrs[i] = (void *)&ipv4_dst_ip[i];
+	}
+
+	rte_efd_lookup_bulk(efd_table, socket_id, rx_count,
+				(const void **) key_ptrs, data);
+	for (i = 0; i < rx_count; i++) {
+		node = (uint8_t) ((uintptr_t)data[i]);
+
+		if (node >= num_nodes) {
+			/*
+			 * Node is out of range, which means that
+			 * flow has not been inserted
+			 */
+			flow_dist_stats.drop++;
+			rte_pktmbuf_free(pkts[i]);
+		} else {
+			flow_dist_stats.distributed++;
+			enqueue_rx_packet(node, pkts[i]);
+		}
+	}
+
+	for (i = 0; i < num_nodes; i++)
+		flush_rx_queue(i);
+}
+
+/*
+ * Function called by the master lcore of the DPDK process.
+ */
+static void
+do_packet_forwarding(void)
+{
+	unsigned int port_num = 0; /* indexes the port[] array */
+	unsigned int socket_id = rte_socket_id();
+
+	for (;;) {
+		struct rte_mbuf *buf[PACKET_READ_SIZE];
+		uint16_t rx_count;
+
+		/* read a port */
+		rx_count = rte_eth_rx_burst(info->id[port_num], 0,
+				buf, PACKET_READ_SIZE);
+		info->rx_stats.rx[port_num] += rx_count;
+
+		/* Now process the NIC packets read */
+		if (likely(rx_count > 0))
+			process_packets(port_num, buf, rx_count, socket_id);
+
+		/* move to next port */
+		if (++port_num == info->num_ports)
+			port_num = 0;
+	}
+}
+
+int
+main(int argc, char *argv[])
+{
+	/* initialise the system */
+	if (init(argc, argv) < 0)
+		return -1;
+	RTE_LOG(INFO, APP, "Finished Process Init.\n");
+
+	cl_rx_buf = calloc(num_nodes, sizeof(cl_rx_buf[0]));
+
+	/* clear statistics */
+	clear_stats();
+
+	/* put all other cores to sleep bar master */
+	rte_eal_mp_remote_launch(sleep_lcore, NULL, SKIP_MASTER);
+
+	do_packet_forwarding();
+	return 0;
+}
diff --git a/examples/flow_distributor/node/Makefile b/examples/flow_distributor/node/Makefile
new file mode 100644
index 0000000..8cf7b65
--- /dev/null
+++ b/examples/flow_distributor/node/Makefile
@@ -0,0 +1,48 @@
+#   BSD LICENSE
+#
+#   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+#   All rights reserved.
+#
+#   Redistribution and use in source and binary forms, with or without
+#   modification, are permitted provided that the following conditions
+#   are met:
+#
+#     * Redistributions of source code must retain the above copyright
+#       notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above copyright
+#       notice, this list of conditions and the following disclaimer in
+#       the documentation and/or other materials provided with the
+#       distribution.
+#     * Neither the name of Intel Corporation nor the names of its
+#       contributors may be used to endorse or promote products derived
+#       from this software without specific prior written permission.
+#
+#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+#   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+#   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+#   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+#   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+#   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+#   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+#   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+#   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+ifeq ($(RTE_SDK),)
+$(error "Please define RTE_SDK environment variable")
+endif
+
+# Default target, can be overridden by command line or environment
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# binary name
+APP = node
+
+# all source are stored in SRCS-y
+SRCS-y := node.c
+
+CFLAGS += $(WERROR_FLAGS) -O3
+CFLAGS += -I$(SRCDIR)/../shared
+
+include $(RTE_SDK)/mk/rte.extapp.mk
diff --git a/examples/flow_distributor/node/node.c b/examples/flow_distributor/node/node.c
new file mode 100644
index 0000000..1f1e7e7
--- /dev/null
+++ b/examples/flow_distributor/node/node.c
@@ -0,0 +1,417 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdint.h>
+#include <stdio.h>
+#include <inttypes.h>
+#include <stdarg.h>
+#include <errno.h>
+#include <sys/queue.h>
+#include <stdlib.h>
+#include <getopt.h>
+#include <string.h>
+
+#include <rte_common.h>
+#include <rte_malloc.h>
+#include <rte_memory.h>
+#include <rte_memzone.h>
+#include <rte_eal.h>
+#include <rte_atomic.h>
+#include <rte_branch_prediction.h>
+#include <rte_log.h>
+#include <rte_per_lcore.h>
+#include <rte_launch.h>
+#include <rte_lcore.h>
+#include <rte_ring.h>
+#include <rte_launch.h>
+#include <rte_lcore.h>
+#include <rte_debug.h>
+#include <rte_mempool.h>
+#include <rte_mbuf.h>
+#include <rte_interrupts.h>
+#include <rte_pci.h>
+#include <rte_ether.h>
+#include <rte_ethdev.h>
+#include <rte_string_fns.h>
+#include <rte_ip.h>
+
+#include "common.h"
+
+/* Number of packets to attempt to read from queue */
+#define PKT_READ_SIZE  ((uint16_t)32)
+
+/*
+ * Our node id number - tells us which rx queue to read, and NIC TX
+ * queue to write to.
+ */
+static uint8_t node_id;
+
+#define MBQ_CAPACITY 32
+
+/* maps input ports to output ports for packets */
+static uint8_t output_ports[RTE_MAX_ETHPORTS];
+
+/* buffers up a set of packet that are ready to send */
+struct rte_eth_dev_tx_buffer *tx_buffer[RTE_MAX_ETHPORTS];
+
+/* shared data from distributor. We update statistics here */
+static struct tx_stats *tx_stats;
+
+static struct filter_stats *filter_stats;
+
+/*
+ * print a usage message
+ */
+static void
+usage(const char *progname)
+{
+	printf("Usage: %s [EAL args] -- -n <node_id>\n\n", progname);
+}
+
+/*
+ * Convert the node id number from a string to an int.
+ */
+static int
+parse_node_num(const char *node)
+{
+	char *end = NULL;
+	unsigned long temp;
+
+	if (node == NULL || *node == '\0')
+		return -1;
+
+	temp = strtoul(node, &end, 10);
+	if (end == NULL || *end != '\0')
+		return -1;
+
+	node_id = (uint8_t)temp;
+	return 0;
+}
+
+/*
+ * Parse the application arguments to the node app.
+ */
+static int
+parse_app_args(int argc, char *argv[])
+{
+	int option_index, opt;
+	char **argvopt = argv;
+	const char *progname = NULL;
+	static struct option lgopts[] = { /* no long options */
+		{NULL, 0, 0, 0 }
+	};
+	progname = argv[0];
+
+	while ((opt = getopt_long(argc, argvopt, "n:", lgopts,
+		&option_index)) != EOF) {
+		switch (opt) {
+		case 'n':
+			if (parse_node_num(optarg) != 0) {
+				usage(progname);
+				return -1;
+			}
+			break;
+		default:
+			usage(progname);
+			return -1;
+		}
+	}
+	return 0;
+}
+
+/*
+ * Tx buffer error callback
+ */
+static void
+flush_tx_error_callback(struct rte_mbuf **unsent, uint16_t count,
+		void *userdata) {
+	int i;
+	uint8_t port_id = (uintptr_t)userdata;
+
+	tx_stats->tx_drop[port_id] += count;
+
+	/* free the mbufs which failed from transmit */
+	for (i = 0; i < count; i++)
+		rte_pktmbuf_free(unsent[i]);
+
+}
+
+static void
+configure_tx_buffer(uint8_t port_id, uint16_t size)
+{
+	int ret;
+
+	/* Initialize TX buffers */
+	tx_buffer[port_id] = rte_zmalloc_socket("tx_buffer",
+			RTE_ETH_TX_BUFFER_SIZE(size), 0,
+			rte_eth_dev_socket_id(port_id));
+	if (tx_buffer[port_id] == NULL)
+		rte_exit(EXIT_FAILURE, "Cannot allocate buffer for tx "
+				"on port %u\n", (unsigned int) port_id);
+
+	rte_eth_tx_buffer_init(tx_buffer[port_id], size);
+
+	ret = rte_eth_tx_buffer_set_err_callback(tx_buffer[port_id],
+			flush_tx_error_callback, (void *)(intptr_t)port_id);
+	if (ret < 0)
+		rte_exit(EXIT_FAILURE, "Cannot set error callback for "
+			"tx buffer on port %u\n", (unsigned int) port_id);
+}
+
+/*
+ * set up output ports so that all traffic on port gets sent out
+ * its paired port. Index using actual port numbers since that is
+ * what comes in the mbuf structure.
+ */
+static void
+configure_output_ports(const struct shared_info *info)
+{
+	int i;
+
+	if (info->num_ports > RTE_MAX_ETHPORTS)
+		rte_exit(EXIT_FAILURE, "Too many ethernet ports. "
+				"RTE_MAX_ETHPORTS = %u\n",
+				(unsigned int)RTE_MAX_ETHPORTS);
+	for (i = 0; i < info->num_ports - 1; i += 2) {
+		uint8_t p1 = info->id[i];
+		uint8_t p2 = info->id[i+1];
+
+		output_ports[p1] = p2;
+		output_ports[p2] = p1;
+
+		configure_tx_buffer(p1, MBQ_CAPACITY);
+		configure_tx_buffer(p2, MBQ_CAPACITY);
+
+	}
+}
+
+/*
+ * Create the hash table that will contain the flows that
+ * the node will handle, which will be used to decide if packet
+ * is transmitted or dropped.
+ */
+static struct rte_hash *
+create_hash_table(const struct shared_info *info)
+{
+	uint32_t num_flows_node = info->num_flows / info->num_nodes;
+	char name[RTE_HASH_NAMESIZE];
+	struct rte_hash *h;
+
+	/* create table */
+	struct rte_hash_parameters hash_params = {
+		.entries = num_flows_node * 2, /* table load = 50% */
+		.key_len = sizeof(uint32_t), /* Store IPv4 dest IP address */
+		.socket_id = rte_socket_id(),
+		.hash_func_init_val = 0,
+	};
+
+	snprintf(name, sizeof(name), "hash_table_%d", node_id);
+	hash_params.name = name;
+	h = rte_hash_create(&hash_params);
+
+	if (h == NULL)
+		rte_exit(EXIT_FAILURE,
+				"Problem creating the hash table for node %d\n",
+				node_id);
+	return h;
+}
+
+static void
+populate_hash_table(const struct rte_hash *h, const struct shared_info *info)
+{
+	unsigned int i;
+	int32_t ret;
+	uint32_t ip_dst;
+	uint32_t num_flows_node = 0;
+	uint64_t target_node;
+
+	/* Add flows in table */
+	for (i = 0; i < info->num_flows; i++) {
+		target_node = i % info->num_nodes;
+		if (target_node != node_id)
+			continue;
+
+		ip_dst = rte_cpu_to_be_32(i);
+
+		ret = rte_hash_add_key(h, (void *) &ip_dst);
+		if (ret < 0)
+			rte_exit(EXIT_FAILURE, "Unable to add entry %u "
+					"in hash table\n", i);
+		else
+			num_flows_node++;
+
+	}
+
+	printf("Hash table: Adding 0x%x keys\n", num_flows_node);
+}
+
+/*
+ * This function performs routing of packets
+ * Just sends each input packet out an output port based solely on the input
+ * port it arrived on.
+ */
+static inline void
+transmit_packet(struct rte_mbuf *buf)
+{
+	int sent;
+	const uint8_t in_port = buf->port;
+	const uint8_t out_port = output_ports[in_port];
+	struct rte_eth_dev_tx_buffer *buffer = tx_buffer[out_port];
+
+	sent = rte_eth_tx_buffer(out_port, node_id, buffer, buf);
+	if (sent)
+		tx_stats->tx[out_port] += sent;
+
+}
+
+static inline void
+handle_packets(struct rte_hash *h, struct rte_mbuf **bufs, uint16_t num_packets)
+{
+	struct ipv4_hdr *ipv4_hdr;
+	uint32_t ipv4_dst_ip[PKT_READ_SIZE];
+	const void *key_ptrs[PKT_READ_SIZE];
+	unsigned int i;
+	int32_t positions[PKT_READ_SIZE] = {0};
+
+	for (i = 0; i < num_packets; i++) {
+		/* Handle IPv4 header.*/
+		ipv4_hdr = rte_pktmbuf_mtod_offset(bufs[i], struct ipv4_hdr *,
+				sizeof(struct ether_hdr));
+		ipv4_dst_ip[i] = ipv4_hdr->dst_addr;
+		key_ptrs[i] = &ipv4_dst_ip[i];
+	}
+	/* Check if packets belongs to any flows handled by this node */
+	rte_hash_lookup_bulk(h, key_ptrs, num_packets, positions);
+
+	for (i = 0; i < num_packets; i++) {
+		if (likely(positions[i] >= 0)) {
+			filter_stats->passed++;
+			transmit_packet(bufs[i]);
+		} else {
+			filter_stats->drop++;
+			/* Drop packet, as flow is not handled by this node */
+			rte_pktmbuf_free(bufs[i]);
+		}
+	}
+}
+
+/*
+ * Application main function - loops through
+ * receiving and processing packets. Never returns
+ */
+int
+main(int argc, char *argv[])
+{
+	const struct rte_memzone *mz;
+	struct rte_ring *rx_ring;
+	struct rte_hash *h;
+	struct rte_mempool *mp;
+	struct shared_info *info;
+	int need_flush = 0; /* indicates whether we have unsent packets */
+	int retval;
+	void *pkts[PKT_READ_SIZE];
+	uint16_t sent;
+
+	retval = rte_eal_init(argc, argv);
+	if (retval  < 0)
+		return -1;
+	argc -= retval;
+	argv += retval;
+
+	if (parse_app_args(argc, argv) < 0)
+		rte_exit(EXIT_FAILURE, "Invalid command-line arguments\n");
+
+	if (rte_eth_dev_count() == 0)
+		rte_exit(EXIT_FAILURE, "No Ethernet ports - bye\n");
+
+	rx_ring = rte_ring_lookup(get_rx_queue_name(node_id));
+	if (rx_ring == NULL)
+		rte_exit(EXIT_FAILURE, "Cannot get RX ring - "
+				"is distributor process running?\n");
+
+	mp = rte_mempool_lookup(PKTMBUF_POOL_NAME);
+	if (mp == NULL)
+		rte_exit(EXIT_FAILURE, "Cannot get mempool for mbufs\n");
+
+	mz = rte_memzone_lookup(MZ_SHARED_INFO);
+	if (mz == NULL)
+		rte_exit(EXIT_FAILURE, "Cannot get port info structure\n");
+	info = mz->addr;
+	tx_stats = &(info->tx_stats[node_id]);
+	filter_stats = &(info->filter_stats[node_id]);
+
+	configure_output_ports(info);
+
+	h = create_hash_table(info);
+
+	populate_hash_table(h, info);
+
+	RTE_LOG(INFO, APP, "Finished Process Init.\n");
+
+	printf("\nNode process %d handling packets\n", node_id);
+	printf("[Press Ctrl-C to quit ...]\n");
+
+	for (;;) {
+		uint16_t  rx_pkts = PKT_READ_SIZE;
+		uint8_t port;
+
+		/*
+		 * Try dequeuing max possible packets first, if that fails,
+		 * get the most we can. Loop body should only execute once,
+		 * maximum
+		 */
+		while (rx_pkts > 0 &&
+				unlikely(rte_ring_dequeue_bulk(rx_ring, pkts,
+					rx_pkts) != 0))
+			rx_pkts = (uint16_t)RTE_MIN(rte_ring_count(rx_ring),
+					PKT_READ_SIZE);
+
+		if (unlikely(rx_pkts == 0)) {
+			if (need_flush)
+				for (port = 0; port < info->num_ports; port++) {
+					sent = rte_eth_tx_buffer_flush(
+							info->id[port],
+							node_id,
+							tx_buffer[port]);
+					if (unlikely(sent))
+						tx_stats->tx[port] += sent;
+				}
+			need_flush = 0;
+			continue;
+		}
+
+		handle_packets(h, (struct rte_mbuf **)pkts, rx_pkts);
+
+		need_flush = 1;
+	}
+}
diff --git a/examples/flow_distributor/shared/common.h b/examples/flow_distributor/shared/common.h
new file mode 100644
index 0000000..5dcffd6
--- /dev/null
+++ b/examples/flow_distributor/shared/common.h
@@ -0,0 +1,99 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _COMMON_H_
+#define _COMMON_H_
+
+#include <rte_hash_crc.h>
+#include <rte_hash.h>
+
+#define MAX_NODES             16
+/*
+ * Shared port info, including statistics information for display by distributor.
+ * Structure will be put in a memzone.
+ * - All port id values share one cache line as this data will be read-only
+ * during operation.
+ * - All rx statistic values share cache lines, as this data is written only
+ * by the distributor process. (rare reads by stats display)
+ * - The tx statistics have values for all ports per cache line, but the stats
+ * themselves are written by the nodes, so we have a distinct set, on different
+ * cache lines for each node to use.
+ */
+struct rx_stats {
+	uint64_t rx[RTE_MAX_ETHPORTS];
+} __rte_cache_aligned;
+
+struct tx_stats {
+	uint64_t tx[RTE_MAX_ETHPORTS];
+	uint64_t tx_drop[RTE_MAX_ETHPORTS];
+} __rte_cache_aligned;
+
+struct filter_stats {
+	uint64_t drop;
+	uint64_t passed;
+} __rte_cache_aligned;
+
+struct shared_info {
+	uint8_t num_nodes;
+	uint8_t num_ports;
+	uint32_t num_flows;
+	uint8_t id[RTE_MAX_ETHPORTS];
+	struct rx_stats rx_stats;
+	struct tx_stats tx_stats[MAX_NODES];
+	struct filter_stats filter_stats[MAX_NODES];
+};
+
+/* define common names for structures shared between distributor and node */
+#define MP_NODE_RXQ_NAME "MProc_Node_%u_RX"
+#define PKTMBUF_POOL_NAME "MProc_pktmbuf_pool"
+#define MZ_SHARED_INFO "MProc_shared_info"
+
+/*
+ * Given the rx queue name template above, get the queue name
+ */
+static inline const char *
+get_rx_queue_name(unsigned int id)
+{
+	/*
+	 * Buffer for return value. Size calculated by %u being replaced
+	 * by maximum 3 digits (plus an extra byte for safety)
+	 */
+	static char buffer[sizeof(MP_NODE_RXQ_NAME) + 2];
+
+	snprintf(buffer, sizeof(buffer) - 1, MP_NODE_RXQ_NAME, id);
+	return buffer;
+}
+
+#define RTE_LOGTYPE_APP RTE_LOGTYPE_USER1
+
+#endif
-- 
2.7.4



More information about the dev mailing list