[dpdk-dev] [PATCH v7 36/42] examples/pipeline: add new example application

Cristian Dumitrescu cristian.dumitrescu at intel.com
Thu Oct 1 12:20:04 CEST 2020


Add new example application to showcase the API of the newly
introduced SWX pipeline type.

Signed-off-by: Cristian Dumitrescu <cristian.dumitrescu at intel.com>
---
 MAINTAINERS                   |   1 +
 examples/meson.build          |   1 +
 examples/pipeline/Makefile    |  50 ++++
 examples/pipeline/main.c      |  50 ++++
 examples/pipeline/meson.build |  16 +
 examples/pipeline/obj.c       | 470 +++++++++++++++++++++++++++++
 examples/pipeline/obj.h       | 131 ++++++++
 examples/pipeline/thread.c    | 549 ++++++++++++++++++++++++++++++++++
 examples/pipeline/thread.h    |  28 ++
 9 files changed, 1296 insertions(+)
 create mode 100644 examples/pipeline/Makefile
 create mode 100644 examples/pipeline/main.c
 create mode 100644 examples/pipeline/meson.build
 create mode 100644 examples/pipeline/obj.c
 create mode 100644 examples/pipeline/obj.h
 create mode 100644 examples/pipeline/thread.c
 create mode 100644 examples/pipeline/thread.h

diff --git a/MAINTAINERS b/MAINTAINERS
index 49a6dfa7a..df3033cdb 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -1331,6 +1331,7 @@ F: app/test/test_table*
 F: app/test-pipeline/
 F: doc/guides/sample_app_ug/test_pipeline.rst
 F: examples/ip_pipeline/
+F: examples/pipeline/
 F: doc/guides/sample_app_ug/ip_pipeline.rst
 
 
diff --git a/examples/meson.build b/examples/meson.build
index eb13e8210..245d98575 100644
--- a/examples/meson.build
+++ b/examples/meson.build
@@ -33,6 +33,7 @@ all_examples = [
 	'ntb', 'packet_ordering',
 	'performance-thread/l3fwd-thread',
 	'performance-thread/pthread_shim',
+	'pipeline',
 	'ptpclient',
 	'qos_meter', 'qos_sched',
 	'rxtx_callbacks',
diff --git a/examples/pipeline/Makefile b/examples/pipeline/Makefile
new file mode 100644
index 000000000..da2f4850b
--- /dev/null
+++ b/examples/pipeline/Makefile
@@ -0,0 +1,50 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2020 Intel Corporation
+
+# binary name
+APP = pipeline
+
+# all sources are stored in SRCS-y
+SRCS-y += main.c
+SRCS-y += obj.c
+SRCS-y += thread.c
+
+# Build using pkg-config variables if possible
+PKGCONF ?= pkg-config
+
+ifneq ($(shell $(PKGCONF) --exists libdpdk && echo 0),0)
+$(error "no installation of DPDK found")
+endif
+
+all: shared
+.PHONY: shared static
+shared: build/$(APP)-shared
+	ln -sf $(APP)-shared build/$(APP)
+static: build/$(APP)-static
+	ln -sf $(APP)-static build/$(APP)
+
+PC_FILE := $(shell $(PKGCONF) --path libdpdk 2>/dev/null)
+CFLAGS += -O3 $(shell $(PKGCONF) --cflags libdpdk)
+LDFLAGS_SHARED = $(shell $(PKGCONF) --libs libdpdk)
+LDFLAGS_STATIC = $(shell $(PKGCONF) --static --libs libdpdk)
+
+CFLAGS += -I. -DALLOW_EXPERIMENTAL_API -D_GNU_SOURCE
+
+OBJS := $(patsubst %.c,build/%.o,$(SRCS-y))
+
+build/%.o: %.c Makefile $(PC_FILE) | build
+	$(CC) $(CFLAGS) -c $< -o $@
+
+build/$(APP)-shared: $(OBJS)
+	$(CC) $(OBJS) -o $@ $(LDFLAGS) $(LDFLAGS_SHARED)
+
+build/$(APP)-static: $(OBJS)
+	$(CC) $(OBJS) -o $@ $(LDFLAGS) $(LDFLAGS_STATIC)
+
+build:
+	@mkdir -p $@
+
+.PHONY: clean
+clean:
+	rm -f build/$(APP)* build/*.o
+	test -d build && rmdir -p build || true
diff --git a/examples/pipeline/main.c b/examples/pipeline/main.c
new file mode 100644
index 000000000..d831df15e
--- /dev/null
+++ b/examples/pipeline/main.c
@@ -0,0 +1,50 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2020 Intel Corporation
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <getopt.h>
+
+#include <rte_launch.h>
+#include <rte_eal.h>
+
+#include "obj.h"
+#include "thread.h"
+
+/* Init EAL, the object lists and the per-lcore threads, then run them. */
+int
+main(int argc, char **argv)
+{
+	struct obj *obj;
+	int status;
+
+	/* EAL */
+	status = rte_eal_init(argc, argv);
+	if (status < 0) {
+		printf("Error: EAL initialization failed (%d)\n", status);
+		return status;
+	}
+
+	/* Obj: status still holds the EAL result here, so do not report it. */
+	obj = obj_init();
+	if (!obj) {
+		printf("Error: Obj initialization failed\n");
+		return -1;
+	}
+
+	/* Thread */
+	status = thread_init();
+	if (status) {
+		printf("Error: Thread initialization failed (%d)\n", status);
+		return status;
+	}
+
+	/* Launch the lcores and block on them: returning ends the process. */
+	rte_eal_mp_remote_launch(thread_main, NULL, SKIP_MASTER);
+	rte_eal_mp_wait_lcore();
+
+	return 0;
+}
diff --git a/examples/pipeline/meson.build b/examples/pipeline/meson.build
new file mode 100644
index 000000000..1ebef3a9d
--- /dev/null
+++ b/examples/pipeline/meson.build
@@ -0,0 +1,16 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2020 Intel Corporation
+
+# meson file, for building this example as part of a main DPDK build.
+#
+# To build this example as a standalone application with an already-installed
+# DPDK instance, use 'make'
+
+build = cc.has_header('sys/epoll.h')
+deps += ['pipeline', 'bus_pci']
+allow_experimental_apis = true
+sources = files(
+	'main.c',
+	'obj.c',
+	'thread.c',
+)
diff --git a/examples/pipeline/obj.c b/examples/pipeline/obj.c
new file mode 100644
index 000000000..84bbcf2b2
--- /dev/null
+++ b/examples/pipeline/obj.c
@@ -0,0 +1,470 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2020 Intel Corporation
+ */
+
+#include <stdlib.h>
+#include <string.h>
+
+#include <rte_mempool.h>
+#include <rte_mbuf.h>
+#include <rte_ethdev.h>
+#include <rte_swx_port_ethdev.h>
+#include <rte_swx_port_source_sink.h>
+#include <rte_swx_table_em.h>
+#include <rte_swx_pipeline.h>
+#include <rte_swx_ctl.h>
+
+#include "obj.h"
+
+/*
+ * mempool
+ */
+TAILQ_HEAD(mempool_list, mempool);
+
+/*
+ * link
+ */
+TAILQ_HEAD(link_list, link);
+
+/*
+ * pipeline
+ */
+TAILQ_HEAD(pipeline_list, pipeline);
+
+/*
+ * obj
+ */
+struct obj {
+	struct mempool_list mempool_list;
+	struct link_list link_list;
+	struct pipeline_list pipeline_list;
+};
+
+/*
+ * mempool
+ */
+#define BUFFER_SIZE_MIN (sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
+
+/* Create a packet mbuf pool and register it with obj; NULL on any failure. */
+struct mempool *
+mempool_create(struct obj *obj, const char *name, struct mempool_params *params)
+{
+	struct mempool *mempool;
+	struct rte_mempool *m;
+
+	/* Check input params: obj is dereferenced by the list insert below. */
+	if ((obj == NULL) ||
+		(name == NULL) ||
+		mempool_find(obj, name) ||
+		(params == NULL) ||
+		(params->buffer_size < BUFFER_SIZE_MIN) ||
+		(params->pool_size == 0))
+		return NULL;
+
+	/* Resource create: the data room excludes the mbuf header itself. */
+	m = rte_pktmbuf_pool_create(name,
+		params->pool_size,
+		params->cache_size,
+		0,
+		params->buffer_size - sizeof(struct rte_mbuf),
+		params->cpu_id);
+	if (m == NULL)
+		return NULL;
+
+	/* Node allocation */
+	mempool = calloc(1, sizeof(*mempool));
+	if (mempool == NULL) {
+		rte_mempool_free(m);
+		return NULL;
+	}
+
+	/* Node fill in */
+	strlcpy(mempool->name, name, sizeof(mempool->name));
+	mempool->m = m;
+	mempool->buffer_size = params->buffer_size;
+
+	/* Node add to list */
+	TAILQ_INSERT_TAIL(&obj->mempool_list, mempool, node);
+
+	return mempool;
+}
+
+/* Look up a mempool by name; NULL when absent or on NULL arguments. */
+struct mempool *
+mempool_find(struct obj *obj, const char *name)
+{
+	struct mempool *it;
+
+	if ((obj == NULL) || (name == NULL))
+		return NULL;
+
+	TAILQ_FOREACH(it, &obj->mempool_list, node)
+		if (!strcmp(it->name, name))
+			break;
+	return it;
+}
+
+/*
+ * link
+ */
+static struct rte_eth_conf port_conf_default = {
+	.link_speeds = 0,
+	.rxmode = {
+		.mq_mode = ETH_MQ_RX_NONE,
+		.max_rx_pkt_len = 9000, /* Jumbo frame max packet len */
+		.split_hdr_size = 0, /* Header split buffer size */
+	},
+	.rx_adv_conf = {
+		.rss_conf = {
+			.rss_key = NULL,
+			.rss_key_len = 40,
+			.rss_hf = 0,
+		},
+	},
+	.txmode = {
+		.mq_mode = ETH_MQ_TX_NONE,
+	},
+	.lpbk_mode = 0,
+};
+
+#define RETA_CONF_SIZE     (ETH_RSS_RETA_SIZE_512 / RTE_RETA_GROUP_SIZE)
+
+/* Program the port RSS redirection table, assigning the queues listed in
+ * rss to the table entries in round-robin order. */
+static int
+rss_setup(uint16_t port_id,
+	uint16_t reta_size,
+	struct link_params_rss *rss)
+{
+	struct rte_eth_rss_reta_entry64 reta_conf[RETA_CONF_SIZE];
+	uint32_t entry;
+
+	/* RETA setting: start from all-zero groups. */
+	memset(reta_conf, 0, sizeof(reta_conf));
+
+	for (entry = 0; entry < reta_size; entry++) {
+		struct rte_eth_rss_reta_entry64 *group =
+			&reta_conf[entry / RTE_RETA_GROUP_SIZE];
+		uint32_t slot = entry % RTE_RETA_GROUP_SIZE;
+		uint32_t queue = rss->queue_id[entry % rss->n_queues];
+
+		/* Every group that holds an entry gets a full mask, which
+		 * also covers slots past reta_size in the last group.
+		 */
+		group->mask = UINT64_MAX;
+		group->reta[slot] = (uint16_t) queue;
+	}
+
+	/* RETA update */
+	return rte_eth_dev_rss_reta_update(port_id,
+		reta_conf,
+		reta_size);
+}
+
+/*
+ * Configure, start and bring up an Ethernet port, then register it with obj
+ * under the given name. Returns the new link, or NULL on any failure.
+ */
+struct link *
+link_create(struct obj *obj, const char *name, struct link_params *params)
+{
+	struct rte_eth_dev_info port_info;
+	struct rte_eth_conf port_conf;
+	struct link *link;
+	struct link_params_rss *rss;
+	struct mempool *mempool;
+	uint32_t cpu_id, i;
+	int status;
+	uint16_t port_id;
+
+	/* Check input params: obj is dereferenced by the list insert below. */
+	if ((obj == NULL) ||
+		(name == NULL) ||
+		link_find(obj, name) ||
+		(params == NULL) ||
+		(params->rx.n_queues == 0) ||
+		(params->rx.queue_size == 0) ||
+		(params->tx.n_queues == 0) ||
+		(params->tx.queue_size == 0))
+		return NULL;
+
+	/* The device name, when given, takes precedence over the port ID. */
+	port_id = params->port_id;
+	if (params->dev_name) {
+		status = rte_eth_dev_get_port_by_name(params->dev_name,
+			&port_id);
+		if (status)
+			return NULL;
+	} else if (!rte_eth_dev_is_valid_port(port_id))
+		return NULL;
+
+	if (rte_eth_dev_info_get(port_id, &port_info) != 0)
+		return NULL;
+
+	mempool = mempool_find(obj, params->rx.mempool_name);
+	if (mempool == NULL)
+		return NULL;
+
+	rss = params->rx.rss;
+	if (rss) {
+		if ((port_info.reta_size == 0) ||
+			(port_info.reta_size > ETH_RSS_RETA_SIZE_512))
+			return NULL;
+
+		/* queue_id[] has LINK_RXQ_RSS_MAX slots: a full array is valid. */
+		if ((rss->n_queues == 0) ||
+			(rss->n_queues > LINK_RXQ_RSS_MAX))
+			return NULL;
+
+		for (i = 0; i < rss->n_queues; i++)
+			if (rss->queue_id[i] >= port_info.max_rx_queues)
+				return NULL;
+	}
+
+	/* Resource create: port */
+	memcpy(&port_conf, &port_conf_default, sizeof(port_conf));
+	if (rss) {
+		port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
+		port_conf.rx_adv_conf.rss_conf.rss_hf =
+			(ETH_RSS_IP | ETH_RSS_TCP | ETH_RSS_UDP) &
+			port_info.flow_type_rss_offloads;
+	}
+
+	cpu_id = (uint32_t) rte_eth_dev_socket_id(port_id);
+	if (cpu_id == (uint32_t) SOCKET_ID_ANY)
+		cpu_id = 0;
+
+	status = rte_eth_dev_configure(port_id,
+		params->rx.n_queues,
+		params->tx.n_queues,
+		&port_conf);
+	if (status < 0)
+		return NULL;
+
+	if (params->promiscuous) {
+		status = rte_eth_promiscuous_enable(port_id);
+		if (status != 0)
+			return NULL;
+	}
+
+	/* Port RX */
+	for (i = 0; i < params->rx.n_queues; i++) {
+		status = rte_eth_rx_queue_setup(
+			port_id,
+			i,
+			params->rx.queue_size,
+			cpu_id,
+			NULL,
+			mempool->m);
+
+		if (status < 0)
+			return NULL;
+	}
+
+	/* Port TX */
+	for (i = 0; i < params->tx.n_queues; i++) {
+		status = rte_eth_tx_queue_setup(
+			port_id,
+			i,
+			params->tx.queue_size,
+			cpu_id,
+			NULL);
+
+		if (status < 0)
+			return NULL;
+	}
+
+	/* Port start */
+	status = rte_eth_dev_start(port_id);
+	if (status < 0)
+		return NULL;
+
+	if (rss) {
+		status = rss_setup(port_id, port_info.reta_size, rss);
+
+		if (status) {
+			rte_eth_dev_stop(port_id);
+			return NULL;
+		}
+	}
+
+	/* Port link up */
+	status = rte_eth_dev_set_link_up(port_id);
+	if ((status < 0) && (status != -ENOTSUP)) {
+		rte_eth_dev_stop(port_id);
+		return NULL;
+	}
+
+	/* Node allocation */
+	link = calloc(1, sizeof(struct link));
+	if (link == NULL) {
+		rte_eth_dev_stop(port_id);
+		return NULL;
+	}
+
+	/* Node fill in */
+	strlcpy(link->name, name, sizeof(link->name));
+	link->port_id = port_id;
+	rte_eth_dev_get_name_by_port(port_id, link->dev_name);
+	link->n_rxq = params->rx.n_queues;
+	link->n_txq = params->tx.n_queues;
+
+	/* Node add to list */
+	TAILQ_INSERT_TAIL(&obj->link_list, link, node);
+
+	return link;
+}
+
+/* Return 1 when the named link reports a status other than down, else 0. */
+int
+link_is_up(struct obj *obj, const char *name)
+{
+	struct rte_eth_link state;
+	struct link *link;
+
+	/* link_find() already rejects a NULL obj or name. */
+	link = link_find(obj, name);
+	if (!link)
+		return 0;
+
+	/* Resource: a failed query is reported as link down. */
+	if (rte_eth_link_get(link->port_id, &state) < 0)
+		return 0;
+
+	if (state.link_status == ETH_LINK_DOWN)
+		return 0;
+	return 1;
+}
+
+/* Look up a link by name; NULL when absent or on NULL arguments. */
+struct link *
+link_find(struct obj *obj, const char *name)
+{
+	struct link *cur;
+
+	if ((obj == NULL) || (name == NULL))
+		return NULL;
+
+	cur = TAILQ_FIRST(&obj->link_list);
+	while (cur && strcmp(cur->name, name))
+		cur = TAILQ_NEXT(cur, node);
+	return cur;
+}
+
+struct link *
+link_next(struct obj *obj, struct link *link)
+{
+	return link ? TAILQ_NEXT(link, node) : /* NULL link: start of list */
+		(obj ? TAILQ_FIRST(&obj->link_list) : NULL); /* no obj: none */
+}
+
+/*
+ * pipeline
+ */
+#ifndef PIPELINE_MSGQ_SIZE
+#define PIPELINE_MSGQ_SIZE                                 64
+#endif
+
+/* Create a SWX pipeline with the port and table types used by this
+ * application registered, and add it to obj. Returns NULL on failure.
+ */
+struct pipeline *
+pipeline_create(struct obj *obj, const char *name, int numa_node)
+{
+	struct pipeline *pipeline;
+	struct rte_swx_pipeline *p = NULL;
+	int status;
+
+	/* Check input params: obj is dereferenced by the list insert below. */
+	if ((obj == NULL) ||
+		(name == NULL) ||
+		pipeline_find(obj, name))
+		return NULL;
+
+	/* Resource create */
+	status = rte_swx_pipeline_config(&p, numa_node);
+	if (status)
+		goto error;
+
+	status = rte_swx_pipeline_port_in_type_register(p, "ethdev",
+		&rte_swx_port_ethdev_reader_ops);
+	if (status)
+		goto error;
+
+	status = rte_swx_pipeline_port_out_type_register(p, "ethdev",
+		&rte_swx_port_ethdev_writer_ops);
+	if (status)
+		goto error;
+
+	/* The source port type is only registered in PCAP-enabled builds. */
+#ifdef RTE_PORT_PCAP
+	status = rte_swx_pipeline_port_in_type_register(p, "source",
+		&rte_swx_port_source_ops);
+	if (status)
+		goto error;
+#endif
+
+	status = rte_swx_pipeline_port_out_type_register(p, "sink",
+		&rte_swx_port_sink_ops);
+	if (status)
+		goto error;
+
+	status = rte_swx_pipeline_table_type_register(p, "exact",
+		RTE_SWX_TABLE_MATCH_EXACT,
+		&rte_swx_table_exact_match_ops);
+	if (status)
+		goto error;
+
+	/* Node allocation */
+	pipeline = calloc(1, sizeof(struct pipeline));
+	if (pipeline == NULL)
+		goto error;
+
+	/* Node fill in */
+	strlcpy(pipeline->name, name, sizeof(pipeline->name));
+	pipeline->p = p;
+	pipeline->timer_period_ms = 10;
+
+	/* Node add to list */
+	TAILQ_INSERT_TAIL(&obj->pipeline_list, pipeline, node);
+
+	return pipeline;
+
+error:
+	rte_swx_pipeline_free(p);
+	return NULL;
+}
+
+/* Look up a pipeline by name. Returns NULL when no pipeline matches or when
+ * either argument is NULL. */
+struct pipeline *
+pipeline_find(struct obj *obj, const char *name)
+{
+	struct pipeline *match = NULL;
+
+	if (obj && name)
+		TAILQ_FOREACH(match, &obj->pipeline_list, node)
+			if (!strcmp(name, match->name))
+				break;
+
+	return match;
+}
+
+/*
+ * obj
+ */
+/* Allocate the object container and initialize its mempool, link and
+ * pipeline lists as empty. Returns NULL when the allocation fails. */
+struct obj *
+obj_init(void)
+{
+	struct obj *o = calloc(1, sizeof(*o));
+
+	if (o != NULL) {
+		TAILQ_INIT(&o->mempool_list);
+		TAILQ_INIT(&o->link_list);
+		TAILQ_INIT(&o->pipeline_list);
+	}
+
+	return o;
+}
diff --git a/examples/pipeline/obj.h b/examples/pipeline/obj.h
new file mode 100644
index 000000000..e6351fd27
--- /dev/null
+++ b/examples/pipeline/obj.h
@@ -0,0 +1,131 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2020 Intel Corporation
+ */
+
+#ifndef _INCLUDE_OBJ_H_
+#define _INCLUDE_OBJ_H_
+
+#include <stdint.h>
+#include <sys/queue.h>
+
+#include <rte_mempool.h>
+#include <rte_swx_pipeline.h>
+#include <rte_swx_ctl.h>
+
+#ifndef NAME_SIZE
+#define NAME_SIZE 64
+#endif
+
+/*
+ * obj
+ */
+struct obj;
+
+struct obj *
+obj_init(void);
+
+/*
+ * mempool
+ */
+struct mempool_params {
+	uint32_t buffer_size; /* Bytes per buffer, struct rte_mbuf included. */
+	uint32_t pool_size; /* Must be non-zero. */
+	uint32_t cache_size; /* Passed through to the pktmbuf pool create. */
+	uint32_t cpu_id; /* Last pool create argument; presumably a socket. */
+};
+
+struct mempool {
+	TAILQ_ENTRY(mempool) node;
+	char name[NAME_SIZE];
+	struct rte_mempool *m;
+	uint32_t buffer_size;
+};
+
+struct mempool *
+mempool_create(struct obj *obj,
+	       const char *name,
+	       struct mempool_params *params);
+
+struct mempool *
+mempool_find(struct obj *obj,
+	     const char *name);
+
+/*
+ * link
+ */
+#ifndef LINK_RXQ_RSS_MAX
+#define LINK_RXQ_RSS_MAX                                   16
+#endif
+
+struct link_params_rss {
+	uint32_t queue_id[LINK_RXQ_RSS_MAX];
+	uint32_t n_queues;
+};
+
+struct link_params {
+	const char *dev_name;
+	uint16_t port_id; /**< Valid only when *dev_name* is NULL. */
+
+	struct {
+		uint32_t n_queues;
+		uint32_t queue_size;
+		const char *mempool_name;
+		struct link_params_rss *rss;
+	} rx;
+
+	struct {
+		uint32_t n_queues;
+		uint32_t queue_size;
+	} tx;
+
+	int promiscuous;
+};
+
+struct link {
+	TAILQ_ENTRY(link) node;
+	char name[NAME_SIZE];
+	char dev_name[NAME_SIZE];
+	uint16_t port_id;
+	uint32_t n_rxq;
+	uint32_t n_txq;
+};
+
+struct link *
+link_create(struct obj *obj,
+	    const char *name,
+	    struct link_params *params);
+
+int
+link_is_up(struct obj *obj, const char *name);
+
+struct link *
+link_find(struct obj *obj, const char *name);
+
+struct link *
+link_next(struct obj *obj, struct link *link);
+
+/*
+ * pipeline
+ */
+struct pipeline {
+	TAILQ_ENTRY(pipeline) node;
+	char name[NAME_SIZE];
+
+	struct rte_swx_pipeline *p;
+	struct rte_swx_ctl_pipeline *ctl;
+
+	uint32_t timer_period_ms;
+	int enabled;
+	uint32_t thread_id;
+	uint32_t cpu_id;
+};
+
+struct pipeline *
+pipeline_create(struct obj *obj,
+		const char *name,
+		int numa_node);
+
+struct pipeline *
+pipeline_find(struct obj *obj, const char *name);
+
+#endif /* _INCLUDE_OBJ_H_ */
diff --git a/examples/pipeline/thread.c b/examples/pipeline/thread.c
new file mode 100644
index 000000000..7ff22e97f
--- /dev/null
+++ b/examples/pipeline/thread.c
@@ -0,0 +1,549 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2020 Intel Corporation
+ */
+
+#include <stdlib.h>
+
+#include <rte_common.h>
+#include <rte_cycles.h>
+#include <rte_lcore.h>
+#include <rte_ring.h>
+
+#include <rte_table_acl.h>
+#include <rte_table_array.h>
+#include <rte_table_hash.h>
+#include <rte_table_lpm.h>
+#include <rte_table_lpm_ipv6.h>
+
+#include "obj.h"
+#include "thread.h"
+
+#ifndef THREAD_PIPELINES_MAX
+#define THREAD_PIPELINES_MAX                               256
+#endif
+
+#ifndef THREAD_MSGQ_SIZE
+#define THREAD_MSGQ_SIZE                                   64
+#endif
+
+#ifndef THREAD_TIMER_PERIOD_MS
+#define THREAD_TIMER_PERIOD_MS                             100
+#endif
+
+/**
+ * Control thread: data plane thread context
+ */
+struct thread {
+	struct rte_ring *msgq_req;
+	struct rte_ring *msgq_rsp;
+
+	uint32_t enabled;
+};
+
+static struct thread thread[RTE_MAX_LCORE];
+
+/**
+ * Data plane threads: context
+ */
+struct pipeline_data {
+	struct rte_swx_pipeline *p;
+	uint64_t timer_period; /* Measured in CPU cycles. */
+	uint64_t time_next;
+};
+
+struct thread_data {
+	struct rte_swx_pipeline *p[THREAD_PIPELINES_MAX];
+	uint32_t n_pipelines;
+
+	struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
+	struct rte_ring *msgq_req;
+	struct rte_ring *msgq_rsp;
+	uint64_t timer_period; /* Measured in CPU cycles. */
+	uint64_t time_next;
+	uint64_t time_next_min;
+} __rte_cache_aligned;
+
+static struct thread_data thread_data[RTE_MAX_LCORE];
+
+/**
+ * Control thread: data plane thread init
+ */
+/* Release the message queues recorded for every enabled lcore. */
+static void
+thread_free(void)
+{
+	uint32_t lcore;
+
+	for (lcore = 0; lcore < RTE_MAX_LCORE; lcore++) {
+		struct thread *ctx = &thread[lcore];
+
+		if (rte_lcore_is_enabled(lcore)) {
+			/* MSGQs: only rings that thread_init() recorded. */
+			if (ctx->msgq_req != NULL)
+				rte_ring_free(ctx->msgq_req);
+
+			if (ctx->msgq_rsp != NULL)
+				rte_ring_free(ctx->msgq_rsp);
+		}
+	}
+}
+
+/* Create the request and response message queues of every slave lcore and
+ * initialize the matching control and data plane records. */
+int
+thread_init(void)
+{
+	uint32_t i;
+
+	RTE_LCORE_FOREACH_SLAVE(i) {
+		char name[NAME_MAX];
+		struct rte_ring *msgq_req, *msgq_rsp;
+		struct thread *t = &thread[i];
+		struct thread_data *t_data = &thread_data[i];
+		uint32_t cpu_id = rte_lcore_to_socket_id(i);
+
+		/* MSGQs */
+		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
+		msgq_req = rte_ring_create(name,
+			THREAD_MSGQ_SIZE,
+			cpu_id,
+			RING_F_SP_ENQ | RING_F_SC_DEQ);
+		if (msgq_req == NULL) {
+			thread_free();
+			return -1;
+		}
+
+		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
+		msgq_rsp = rte_ring_create(name,
+			THREAD_MSGQ_SIZE,
+			cpu_id,
+			RING_F_SP_ENQ | RING_F_SC_DEQ);
+		if (msgq_rsp == NULL) {
+			/* Not recorded yet, so thread_free() cannot see it. */
+			rte_ring_free(msgq_req);
+			thread_free();
+			return -1;
+		}
+
+		/* Control thread records */
+		t->msgq_req = msgq_req;
+		t->msgq_rsp = msgq_rsp;
+		t->enabled = 1;
+
+		/* Data plane thread records */
+		t_data->n_pipelines = 0;
+		t_data->msgq_req = msgq_req;
+		t_data->msgq_rsp = msgq_rsp;
+		t_data->timer_period =
+			(rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
+		t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
+		t_data->time_next_min = t_data->time_next;
+	}
+
+	return 0;
+}
+
+/* Return 1 when EAL reports the lcore in the RUNNING state, else 0. */
+static inline int
+thread_is_running(uint32_t thread_id)
+{
+	if (rte_eal_get_lcore_state(thread_id) == RUNNING)
+		return 1;
+	return 0;
+}
+
+/**
+ * Control thread & data plane threads: message passing
+ */
+enum thread_req_type {
+	THREAD_REQ_PIPELINE_ENABLE = 0,
+	THREAD_REQ_PIPELINE_DISABLE,
+	THREAD_REQ_MAX
+};
+
+struct thread_msg_req {
+	enum thread_req_type type;
+
+	union {
+		struct {
+			struct rte_swx_pipeline *p;
+			uint32_t timer_period_ms;
+		} pipeline_enable;
+
+		struct {
+			struct rte_swx_pipeline *p;
+		} pipeline_disable;
+	};
+};
+
+struct thread_msg_rsp {
+	int status;
+};
+
+/**
+ * Control thread
+ */
+/* Zeroed buffer large enough for either a request or a response. */
+static struct thread_msg_req *
+thread_msg_alloc(void)
+{
+	/* The handlers write the response into the request buffer. */
+	return calloc(1, RTE_MAX(sizeof(struct thread_msg_req),
+		sizeof(struct thread_msg_rsp)));
+}
+
+static void
+thread_msg_free(struct thread_msg_rsp *rsp)
+{
+	free(rsp); /* The response reuses the request's calloc'd buffer. */
+}
+
+/* Post req to the data plane thread thread_id and spin until that thread
+ * posts a response back. */
+static struct thread_msg_rsp *
+thread_msg_send_recv(uint32_t thread_id,
+	struct thread_msg_req *req)
+{
+	struct thread *ctx = &thread[thread_id];
+	struct rte_ring *tx_ring = ctx->msgq_req;
+	struct rte_ring *rx_ring = ctx->msgq_rsp;
+	struct thread_msg_rsp *rsp;
+
+	/* send: retry while the enqueue reports -ENOBUFS */
+	while (rte_ring_sp_enqueue(tx_ring, req) == -ENOBUFS)
+		;
+
+	/* recv: poll until a dequeue succeeds */
+	for ( ; ; )
+		if (rte_ring_sc_dequeue(rx_ring, (void **) &rsp) == 0)
+			break;
+
+	return rsp;
+}
+
+/*
+ * Attach a pipeline to a data plane thread: by editing the thread context
+ * directly when the thread is not running, through its message queues
+ * otherwise. Returns 0 on success, non-zero on failure.
+ */
+int
+thread_pipeline_enable(uint32_t thread_id,
+	struct obj *obj,
+	const char *pipeline_name)
+{
+	struct pipeline *p = pipeline_find(obj, pipeline_name);
+	struct thread_msg_req *req;
+	struct thread_msg_rsp *rsp;
+	int status;
+
+	/* Check input params */
+	if ((p == NULL) || (thread_id >= RTE_MAX_LCORE))
+		return -1;
+
+	if (!thread[thread_id].enabled)
+		return -1;
+
+	/* A running thread is reached through its message queues. */
+	if (thread_is_running(thread_id)) {
+		/* Allocate request */
+		req = thread_msg_alloc();
+		if (req == NULL)
+			return -1;
+
+		/* Write request */
+		req->type = THREAD_REQ_PIPELINE_ENABLE;
+		req->pipeline_enable.p = p->p;
+		req->pipeline_enable.timer_period_ms = p->timer_period_ms;
+
+		/* Send request and wait for response */
+		rsp = thread_msg_send_recv(thread_id, req);
+
+		/* Read response */
+		status = rsp->status;
+
+		/* Free response */
+		thread_msg_free(rsp);
+
+		/* Request completion */
+		if (status)
+			return status;
+	} else {
+		struct thread_data *td = &thread_data[thread_id];
+		uint32_t slot = td->n_pipelines;
+		struct pipeline_data *entry;
+
+		if (slot >= THREAD_PIPELINES_MAX)
+			return -1;
+
+		/* Data plane thread: not running, so edit its context. */
+		entry = &td->pipeline_data[slot];
+		td->p[slot] = p->p;
+
+		entry->p = p->p;
+		entry->timer_period =
+			(rte_get_tsc_hz() * p->timer_period_ms) / 1000;
+		entry->time_next = rte_get_tsc_cycles() + entry->timer_period;
+
+		td->n_pipelines = slot + 1;
+	}
+
+	/* Pipeline */
+	p->thread_id = thread_id;
+	p->enabled = 1;
+
+	return 0;
+}
+
+/*
+ * Detach a pipeline from the data plane thread it was enabled on: by editing
+ * the thread context directly when the thread is not running, through its
+ * message queues otherwise. Returns 0 on success, non-zero on failure.
+ */
+int
+thread_pipeline_disable(uint32_t thread_id,
+	struct obj *obj,
+	const char *pipeline_name)
+{
+	struct pipeline *p = pipeline_find(obj, pipeline_name);
+	struct thread_msg_req *req;
+	struct thread_msg_rsp *rsp;
+	int status;
+
+	/* Check input params */
+	if ((p == NULL) || (thread_id >= RTE_MAX_LCORE))
+		return -1;
+
+	if (!thread[thread_id].enabled)
+		return -1;
+
+	/* A pipeline that is not enabled needs no work. */
+	if (!p->enabled)
+		return 0;
+
+	/* The request must name the thread the pipeline was enabled on. */
+	if (p->thread_id != thread_id)
+		return -1;
+
+	if (!thread_is_running(thread_id)) {
+		struct thread_data *td = &thread_data[thread_id];
+		uint32_t last, i;
+
+		/* Locate the slot that holds this pipeline. */
+		for (i = 0; i < td->n_pipelines; i++)
+			if (td->pipeline_data[i].p == p->p)
+				break;
+
+		/* No such slot: report success and change nothing. */
+		if (i == td->n_pipelines)
+			return 0;
+
+		/* Data plane thread: move the last slot into the hole. */
+		last = td->n_pipelines - 1;
+		if (i < last) {
+			td->p[i] = td->p[last];
+			td->pipeline_data[i] = td->pipeline_data[last];
+		}
+
+		td->n_pipelines = last;
+
+		/* Pipeline */
+		p->enabled = 0;
+
+		return 0;
+	}
+
+	/* Allocate request */
+	req = thread_msg_alloc();
+	if (req == NULL)
+		return -1;
+
+	/* Write request */
+	req->type = THREAD_REQ_PIPELINE_DISABLE;
+	req->pipeline_disable.p = p->p;
+
+	/* Send request and wait for response */
+	rsp = thread_msg_send_recv(thread_id, req);
+
+	/* Read response */
+	status = rsp->status;
+
+	/* Free response */
+	thread_msg_free(rsp);
+
+	/* Request completion */
+	if (status)
+		return status;
+
+	p->enabled = 0;
+
+	return 0;
+}
+
+/**
+ * Data plane threads: message handling
+ */
+/* Pop one pending request, or return NULL when the dequeue fails. */
+static inline struct thread_msg_req *
+thread_msg_recv(struct rte_ring *msgq_req)
+{
+	struct thread_msg_req *msg;
+
+	if (rte_ring_sc_dequeue(msgq_req, (void **) &msg) == 0)
+		return msg;
+
+	/* Any non-zero status is treated as "no request". */
+	return NULL;
+}
+
+/* Post a response on the given ring, retrying for as long as the enqueue
+ * reports -ENOBUFS. */
+static inline void
+thread_msg_send(struct rte_ring *msgq_rsp,
+	struct thread_msg_rsp *rsp)
+{
+	for ( ; ; )
+		if (rte_ring_sp_enqueue(msgq_rsp, rsp) != -ENOBUFS)
+			return;
+}
+
+/* Data plane handler: append the requested pipeline to this thread. */
+static struct thread_msg_rsp *
+thread_msg_handle_pipeline_enable(struct thread_data *t,
+	struct thread_msg_req *req)
+{
+	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
+	uint32_t slot = t->n_pipelines;
+	int status = -1;
+
+	/* Request: read every req field before the status overwrites it. */
+	if (slot < THREAD_PIPELINES_MAX) {
+		struct pipeline_data *entry = &t->pipeline_data[slot];
+
+		t->p[slot] = req->pipeline_enable.p;
+		entry->p = req->pipeline_enable.p;
+		entry->timer_period = (rte_get_tsc_hz() *
+			req->pipeline_enable.timer_period_ms) / 1000;
+		entry->time_next = rte_get_tsc_cycles() + entry->timer_period;
+		t->n_pipelines = slot + 1;
+		status = 0;
+	}
+
+	/* Response */
+	rsp->status = status;
+	return rsp;
+}
+
+/*
+ * Data plane handler: remove the requested pipeline from this thread by
+ * moving the last slot into its place. The response status is always 0.
+ */
+static struct thread_msg_rsp *
+thread_msg_handle_pipeline_disable(struct thread_data *t,
+	struct thread_msg_req *req)
+{
+	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
+	struct rte_swx_pipeline *target = req->pipeline_disable.p;
+	uint32_t count = t->n_pipelines;
+	uint32_t last;
+	uint32_t i;
+
+	/* find pipeline */
+	for (i = 0; i < count; i++)
+		if (t->pipeline_data[i].p == target)
+			break;
+
+	/* Found: drop the slot. Otherwise the arrays stay untouched. */
+	if (i < count) {
+		last = count - 1;
+
+		/* Keep p[] and pipeline_data[] dense and in step. */
+		if (i < last) {
+			t->p[i] = t->p[last];
+			t->pipeline_data[i] = t->pipeline_data[last];
+		}
+
+		t->n_pipelines--;
+	}
+
+	/* Response: a pipeline that is not found is not reported. */
+	rsp->status = 0;
+	return rsp;
+}
+
+/* Drain the request queue of a data plane thread, answering each request
+ * on its response queue. */
+static void
+thread_msg_handle(struct thread_data *t)
+{
+	struct thread_msg_req *req;
+	struct thread_msg_rsp *rsp;
+
+	while ((req = thread_msg_recv(t->msgq_req)) != NULL) {
+		switch (req->type) {
+		case THREAD_REQ_PIPELINE_ENABLE:
+			rsp = thread_msg_handle_pipeline_enable(t, req);
+			break;
+
+		case THREAD_REQ_PIPELINE_DISABLE:
+			rsp = thread_msg_handle_pipeline_disable(t, req);
+			break;
+
+		default:
+			/* Unknown request: fail it in its own buffer. */
+			rsp = (struct thread_msg_rsp *) req;
+			rsp->status = -1;
+			break;
+		}
+
+		thread_msg_send(t->msgq_rsp, rsp);
+	}
+}
+
+/**
+ * Data plane threads: main
+ */
+/*
+ * Data plane thread entry point: run every attached pipeline in a loop and,
+ * once every 16 iterations, check whether the message queue timer expired.
+ * The loop never exits.
+ */
+int
+thread_main(void *arg __rte_unused)
+{
+	struct thread_data *t;
+	uint32_t iter;
+
+	t = &thread_data[rte_lcore_id()];
+
+	/* Dispatch loop */
+	for (iter = 0; ; iter++) {
+		uint64_t now;
+		uint32_t j;
+
+		/* Data Plane */
+		for (j = 0; j < t->n_pipelines; j++)
+			rte_swx_pipeline_run(t->p[j], 1000000);
+
+		/* Control Plane: only on every 16th iteration. */
+		if ((iter & 0xF) != 0)
+			continue;
+
+		/* Skip the timer checks until the earliest deadline. */
+		now = rte_get_tsc_cycles();
+		if (now < t->time_next_min)
+			continue;
+
+		/* Thread message queues */
+		if (t->time_next <= now) {
+			thread_msg_handle(t);
+			t->time_next = now + t->timer_period;
+		}
+
+		/* The message queue timer is the only one checked here, so
+		 * its deadline is also the earliest one.
+		 */
+		t->time_next_min = t->time_next;
+	}
+
+	return 0;
+}
diff --git a/examples/pipeline/thread.h b/examples/pipeline/thread.h
new file mode 100644
index 000000000..d9d8645d4
--- /dev/null
+++ b/examples/pipeline/thread.h
@@ -0,0 +1,28 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2020 Intel Corporation
+ */
+
+#ifndef _INCLUDE_THREAD_H_
+#define _INCLUDE_THREAD_H_
+
+#include <stdint.h>
+
+#include "obj.h"
+
+int
+thread_pipeline_enable(uint32_t thread_id,
+	struct obj *obj,
+	const char *pipeline_name);
+
+int
+thread_pipeline_disable(uint32_t thread_id,
+	struct obj *obj,
+	const char *pipeline_name);
+
+int
+thread_init(void);
+
+int
+thread_main(void *arg);
+
+#endif /* _INCLUDE_THREAD_H_ */
-- 
2.17.1



More information about the dev mailing list