[dpdk-dev] [PATCH] port: add ring SWX port

Cristian Dumitrescu cristian.dumitrescu at intel.com
Thu Jan 28 19:55:12 CET 2021


Add the ring input/output port type for the SWX pipeline.

Signed-off-by: Cristian Dumitrescu <cristian.dumitrescu at intel.com>
---
 doc/api/doxy-api-index.md           |   1 +
 examples/pipeline/cli.c             | 131 ++++++++++++
 examples/pipeline/obj.c             |  76 +++++++
 examples/pipeline/obj.h             |  21 ++
 lib/librte_port/meson.build         |   8 +-
 lib/librte_port/rte_swx_port_ring.c | 317 ++++++++++++++++++++++++++++
 lib/librte_port/rte_swx_port_ring.h |  52 +++++
 lib/librte_port/version.map         |   4 +
 8 files changed, 608 insertions(+), 2 deletions(-)
 create mode 100644 lib/librte_port/rte_swx_port_ring.c
 create mode 100644 lib/librte_port/rte_swx_port_ring.h

diff --git a/doc/api/doxy-api-index.md b/doc/api/doxy-api-index.md
index 748514e24..224803859 100644
--- a/doc/api/doxy-api-index.md
+++ b/doc/api/doxy-api-index.md
@@ -183,6 +183,7 @@ The public API headers are grouped by topics:
   * SWX port:
     [port]             (@ref rte_swx_port.h),
     [ethdev]           (@ref rte_swx_port_ethdev.h),
+    [ring]             (@ref rte_swx_port_ring.h),
     [src/sink]         (@ref rte_swx_port_source_sink.h)
   * SWX table:
     [table]            (@ref rte_swx_table.h),
diff --git a/examples/pipeline/cli.c b/examples/pipeline/cli.c
index d0150cfcf..3f490854c 100644
--- a/examples/pipeline/cli.c
+++ b/examples/pipeline/cli.c
@@ -10,6 +10,7 @@
 #include <rte_common.h>
 #include <rte_ethdev.h>
 #include <rte_swx_port_ethdev.h>
+#include <rte_swx_port_ring.h>
 #include <rte_swx_port_source_sink.h>
 #include <rte_swx_pipeline.h>
 #include <rte_swx_ctl.h>
@@ -444,6 +445,54 @@ cmd_link_show(char **tokens,
 	}
 }
 
+static const char cmd_ring_help[] =
+"ring <ring_name> size <size> numa <numa_node>\n";
+
+/*
+ * CLI handler for: ring <ring_name> size <size> numa <numa_node>
+ * Creates the ring through ring_create(); any failure is reported in out.
+ */
+static void
+cmd_ring(char **tokens,
+	uint32_t n_tokens,
+	char *out,
+	size_t out_size,
+	void *obj)
+{
+	struct ring_params params;
+	struct ring *r;
+
+	if (n_tokens != 6) {
+		snprintf(out, out_size, MSG_ARG_MISMATCH, tokens[0]);
+		return;
+	}
+
+	if (strcmp(tokens[2], "size")) {
+		snprintf(out, out_size, MSG_ARG_NOT_FOUND, "size");
+		return;
+	}
+
+	if (parser_read_uint32(&params.size, tokens[3])) {
+		snprintf(out, out_size, MSG_ARG_INVALID, "size");
+		return;
+	}
+
+	if (strcmp(tokens[4], "numa")) {
+		snprintf(out, out_size, MSG_ARG_NOT_FOUND, "numa");
+		return;
+	}
+
+	if (parser_read_uint32(&params.numa_node, tokens[5])) {
+		snprintf(out, out_size, MSG_ARG_INVALID, "numa_node");
+		return;
+	}
+
+	/* tokens[1] is the ring name. */
+	r = ring_create(obj, tokens[1], &params);
+	if (r == NULL)
+		snprintf(out, out_size, MSG_CMD_FAIL, tokens[0]);
+}
+
 static const char cmd_pipeline_create_help[] =
 "pipeline <pipeline_name> create <numa_node>\n";
 
@@ -480,6 +529,7 @@ cmd_pipeline_create(char **tokens,
 static const char cmd_pipeline_port_in_help[] =
 "pipeline <pipeline_name> port in <port_id>\n"
 "   link <link_name> rxq <queue_id> bsz <burst_size>\n"
+"   ring <ring_name> bsz <burst_size>\n"
 "   | source <mempool_name> <file_name>\n";
 
 static void
@@ -567,6 +617,41 @@ cmd_pipeline_port_in(char **tokens,
 			port_id,
 			"ethdev",
 			&params);
+	} else if (strcmp(tokens[t0], "ring") == 0) {
+		struct rte_swx_port_ring_reader_params params;
+		struct ring *ring;
+
+		if (n_tokens < t0 + 4) {
+			snprintf(out, out_size, MSG_ARG_MISMATCH,
+				"pipeline port in ring");
+			return;
+		}
+
+		ring = ring_find(obj, tokens[t0 + 1]);
+		if (!ring) {
+			snprintf(out, out_size, MSG_ARG_INVALID,
+				"ring_name");
+			return;
+		}
+		params.name = ring->name;
+
+		if (strcmp(tokens[t0 + 2], "bsz") != 0) {
+			snprintf(out, out_size, MSG_ARG_NOT_FOUND, "bsz");
+			return;
+		}
+
+		if (parser_read_uint32(&params.burst_size, tokens[t0 + 3])) {
+			snprintf(out, out_size, MSG_ARG_INVALID,
+				"burst_size");
+			return;
+		}
+
+		t0 += 4;
+
+		status = rte_swx_pipeline_port_in_config(p->p,
+			port_id,
+			"ring",
+			&params);
 	} else if (strcmp(tokens[t0], "source") == 0) {
 		struct rte_swx_port_source_params params;
 		struct mempool *mp;
@@ -612,6 +697,7 @@ cmd_pipeline_port_in(char **tokens,
 static const char cmd_pipeline_port_out_help[] =
 "pipeline <pipeline_name> port out <port_id>\n"
 "   link <link_name> txq <txq_id> bsz <burst_size>\n"
+"   ring <ring_name> bsz <burst_size>\n"
 "   | sink <file_name> | none\n";
 
 static void
@@ -699,6 +785,41 @@ cmd_pipeline_port_out(char **tokens,
 			port_id,
 			"ethdev",
 			&params);
+	} else if (strcmp(tokens[t0], "ring") == 0) {
+		struct rte_swx_port_ring_writer_params params;
+		struct ring *ring;
+		/* Expect: ring <ring_name> bsz <burst_size> (4 tokens from t0). */
+		if (n_tokens < t0 + 4) {
+			snprintf(out, out_size, MSG_ARG_MISMATCH,
+				"pipeline port out ring");
+			return;
+		}
+
+		ring = ring_find(obj, tokens[t0 + 1]);
+		if (!ring) {
+			snprintf(out, out_size, MSG_ARG_INVALID,
+				"ring_name");
+			return;
+		}
+		params.name = ring->name;
+
+		if (strcmp(tokens[t0 + 2], "bsz") != 0) {
+			snprintf(out, out_size, MSG_ARG_NOT_FOUND, "bsz");
+			return;
+		}
+
+		if (parser_read_uint32(&params.burst_size, tokens[t0 + 3])) {
+			snprintf(out, out_size, MSG_ARG_INVALID,
+				"burst_size");
+			return;
+		}
+
+		t0 += 4;
+
+		status = rte_swx_pipeline_port_out_config(p->p,
+			port_id,
+			"ring",
+			&params);
 	} else if (strcmp(tokens[t0], "sink") == 0) {
 		struct rte_swx_port_sink_params params;
 
@@ -1203,6 +1324,11 @@ cmd_help(char **tokens,
 		return;
 	}
 
+	if (strcmp(tokens[0], "ring") == 0) {
+		snprintf(out, out_size, "\n%s\n", cmd_ring_help);
+		return;
+	}
+
 	if ((strcmp(tokens[0], "pipeline") == 0) &&
 		(n_tokens == 2) && (strcmp(tokens[1], "create") == 0)) {
 		snprintf(out, out_size, "\n%s\n", cmd_pipeline_create_help);
@@ -1303,6 +1429,11 @@ cli_process(char *in, char *out, size_t out_size, void *obj)
 		return;
 	}
 
+	if (strcmp(tokens[0], "ring") == 0) {
+		cmd_ring(tokens, n_tokens, out, out_size, obj);
+		return;
+	}
+
 	if (strcmp(tokens[0], "pipeline") == 0) {
 		if ((n_tokens >= 3) &&
 			(strcmp(tokens[2], "create") == 0)) {
diff --git a/examples/pipeline/obj.c b/examples/pipeline/obj.c
index 84bbcf2b2..154d832ed 100644
--- a/examples/pipeline/obj.c
+++ b/examples/pipeline/obj.c
@@ -9,6 +9,7 @@
 #include <rte_mbuf.h>
 #include <rte_ethdev.h>
 #include <rte_swx_port_ethdev.h>
+#include <rte_swx_port_ring.h>
 #include <rte_swx_port_source_sink.h>
 #include <rte_swx_table_em.h>
 #include <rte_swx_pipeline.h>
@@ -26,6 +27,11 @@ TAILQ_HEAD(mempool_list, mempool);
  */
 TAILQ_HEAD(link_list, link);
 
+/*
+ * ring
+ */
+TAILQ_HEAD(ring_list, ring);
+
 /*
  * pipeline
  */
@@ -37,6 +43,7 @@ TAILQ_HEAD(pipeline_list, pipeline);
 struct obj {
 	struct mempool_list mempool_list;
 	struct link_list link_list;
+	struct ring_list ring_list;
 	struct pipeline_list pipeline_list;
 };
 
@@ -358,6 +365,62 @@ link_next(struct obj *obj, struct link *link)
 		TAILQ_FIRST(&obj->link_list) : TAILQ_NEXT(link, node);
 }
 
+/*
+ * ring
+ */
+/*
+ * Create an rte_ring called name (single-producer/single-consumer) and
+ * record it in obj->ring_list so that ring_find() can resolve it later.
+ * Returns the new list node, or NULL on invalid arguments, a name already
+ * in the list, or failure to allocate the ring or the node.
+ */
+struct ring *
+ring_create(struct obj *obj, const char *name, struct ring_params *params)
+{
+	struct ring *ring;
+	struct rte_ring *r;
+	unsigned int flags = RING_F_SP_ENQ | RING_F_SC_DEQ;
+
+	/* Check input params; obj is dereferenced below, so reject NULL here. */
+	if (!obj || !name || ring_find(obj, name) || !params || !params->size)
+		return NULL;
+
+	/* Resource create */
+	r = rte_ring_create(name, params->size, params->numa_node, flags);
+	if (!r)
+		return NULL;
+
+	/* Node allocation; release the ring if the node cannot be allocated. */
+	ring = calloc(1, sizeof(struct ring));
+	if (!ring) {
+		rte_ring_free(r);
+		return NULL;
+	}
+
+	/* Node fill in */
+	strlcpy(ring->name, name, sizeof(ring->name));
+
+	/* Node add to list */
+	TAILQ_INSERT_TAIL(&obj->ring_list, ring, node);
+
+	return ring;
+}
+
+struct ring *
+ring_find(struct obj *obj, const char *name)
+{
+	struct ring *it;
+
+	if (obj == NULL || name == NULL)
+		return NULL;
+
+	/* Stop at the first exact name match; it is NULL if none is found. */
+	TAILQ_FOREACH(it, &obj->ring_list, node)
+		if (!strcmp(it->name, name))
+			break;
+	return it;
+}
+
 /*
  * pipeline
  */
@@ -394,6 +457,18 @@ pipeline_create(struct obj *obj, const char *name, int numa_node)
 	if (status)
 		goto error;
 
+	status = rte_swx_pipeline_port_in_type_register(p,
+		"ring",
+		&rte_swx_port_ring_reader_ops);
+	if (status)
+		goto error;
+
+	status = rte_swx_pipeline_port_out_type_register(p,
+		"ring",
+		&rte_swx_port_ring_writer_ops);
+	if (status)
+		goto error;
+
 #ifdef RTE_PORT_PCAP
 	status = rte_swx_pipeline_port_in_type_register(p,
 		"source",
@@ -464,6 +539,7 @@ obj_init(void)
 
 	TAILQ_INIT(&obj->mempool_list);
 	TAILQ_INIT(&obj->link_list);
+	TAILQ_INIT(&obj->ring_list);
 	TAILQ_INIT(&obj->pipeline_list);
 
 	return obj;
diff --git a/examples/pipeline/obj.h b/examples/pipeline/obj.h
index e6351fd27..1aab2a37b 100644
--- a/examples/pipeline/obj.h
+++ b/examples/pipeline/obj.h
@@ -104,6 +104,27 @@ link_find(struct obj *obj, const char *name);
 struct link *
 link_next(struct obj *obj, struct link *link);
 
+/*
+ * ring
+ */
+struct ring_params {
+	uint32_t size; /* ring size handed to rte_ring_create(); 0 is rejected */
+	uint32_t numa_node; /* third argument handed to rte_ring_create() */
+};
+
+struct ring {
+	TAILQ_ENTRY(ring) node; /* list linkage used by the obj ring list */
+	char name[NAME_SIZE]; /* ring name; the key compared by ring_find() */
+};
+
+struct ring *
+ring_create(struct obj *obj,
+	   const char *name,
+	   struct ring_params *params);
+
+struct ring *
+ring_find(struct obj *obj, const char *name);
+
 /*
  * pipeline
  */
diff --git a/lib/librte_port/meson.build b/lib/librte_port/meson.build
index 609624c29..9fcd62cd8 100644
--- a/lib/librte_port/meson.build
+++ b/lib/librte_port/meson.build
@@ -12,7 +12,9 @@ sources = files(
 	'rte_port_sym_crypto.c',
 	'rte_port_eventdev.c',
 	'rte_swx_port_ethdev.c',
-	'rte_swx_port_source_sink.c',)
+	'rte_swx_port_ring.c',
+	'rte_swx_port_source_sink.c',
+	)
 headers = files(
 	'rte_port_ethdev.h',
 	'rte_port_fd.h',
@@ -26,7 +28,9 @@ headers = files(
 	'rte_port_eventdev.h',
 	'rte_swx_port.h',
 	'rte_swx_port_ethdev.h',
-	'rte_swx_port_source_sink.h',)
+	'rte_swx_port_ring.h',
+	'rte_swx_port_source_sink.h',
+	)
 deps += ['ethdev', 'sched', 'ip_frag', 'cryptodev', 'eventdev']
 
 if dpdk_conf.has('RTE_PORT_PCAP')
diff --git a/lib/librte_port/rte_swx_port_ring.c b/lib/librte_port/rte_swx_port_ring.c
new file mode 100644
index 000000000..4df720d91
--- /dev/null
+++ b/lib/librte_port/rte_swx_port_ring.c
@@ -0,0 +1,317 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2021 Intel Corporation
+ */
+#include <string.h>
+#include <stdint.h>
+
+#include <rte_mbuf.h>
+#include <rte_ring.h>
+#include <rte_hexdump.h>
+
+#include "rte_swx_port_ring.h"
+
+#ifndef TRACE_LEVEL
+#define TRACE_LEVEL 0
+#endif
+
+#if TRACE_LEVEL
+#define TRACE(...) printf(__VA_ARGS__)
+#else
+#define TRACE(...)
+#endif
+
+/*
+ * Reader
+ */
+struct reader {
+	struct {
+		struct rte_ring *ring; /* ring resolved by name in reader_create() */
+		char *name; /* private strdup() copy, freed in reader_free() */
+		uint32_t burst_size; /* max packets per dequeue; slots in pkts[] */
+	} params;
+	struct rte_swx_port_in_stats stats; /* updated by reader_pkt_rx() */
+	struct rte_mbuf **pkts; /* packets obtained by the last dequeue burst */
+	int n_pkts; /* number of valid entries in pkts[] */
+	int pos; /* index of the next pkts[] entry to hand out */
+};
+
+/* Input port constructor. args is a struct rte_swx_port_ring_reader_params;
+ * returns the new port, or NULL on bad arguments, unknown ring or no memory.
+ */
+static void *
+reader_create(void *args)
+{
+	struct rte_swx_port_ring_reader_params *conf = args;
+	struct reader *rd;
+	struct rte_ring *r;
+
+	if (conf == NULL || conf->name == NULL || conf->burst_size == 0)
+		return NULL;
+
+	/* The ring must already exist; it is only looked up here. */
+	r = rte_ring_lookup(conf->name);
+	if (r == NULL)
+		return NULL;
+
+	rd = calloc(1, sizeof(struct reader));
+	if (rd == NULL)
+		return NULL;
+
+	/* Private copy of the name plus room for one burst of packets. */
+	rd->params.name = strdup(conf->name);
+	if (rd->params.name == NULL)
+		goto fail;
+
+	rd->pkts = calloc(conf->burst_size, sizeof(struct rte_mbuf *));
+	if (rd->pkts == NULL)
+		goto fail;
+
+	rd->params.ring = r;
+	rd->params.burst_size = conf->burst_size;
+
+	return rd;
+
+fail:
+	/* rd is zeroed by calloc(), so unset members are NULL here. */
+	free(rd->pkts);
+	free(rd->params.name);
+	free(rd);
+	return NULL;
+}
+
+/* Hand the next packet to the pipeline, dequeuing a new burst from the ring
+ * once the previous one is used up. Returns 1 with pkt filled, 0 if empty.
+ */
+static int
+reader_pkt_rx(void *port, struct rte_swx_pkt *pkt)
+{
+	struct reader *p = port;
+	struct rte_mbuf *m;
+
+	/* Refill only when every packet of the last burst was handed out. */
+	if (p->pos == p->n_pkts) {
+		int n_pkts;
+
+		n_pkts = rte_ring_sc_dequeue_burst(p->params.ring,
+			(void **) p->pkts,
+			p->params.burst_size,
+			NULL);
+		if (!n_pkts) {
+			p->stats.n_empty++;
+			return 0;
+		}
+
+		TRACE("[Ring %s] %d packets in\n", p->params.name, n_pkts);
+
+		p->n_pkts = n_pkts;
+		p->pos = 0;
+	}
+
+	m = p->pkts[p->pos++];
+	pkt->handle = m;
+	pkt->pkt = m->buf_addr;
+	pkt->offset = m->data_off;
+	pkt->length = m->pkt_len;
+
+	/* The ring name is a string: pass the pointer itself to %s. */
+	TRACE("[Ring %s] Pkt %d (%u bytes at offset %u)\n",
+	      p->params.name, p->pos - 1, pkt->length, pkt->offset);
+	if (TRACE_LEVEL)
+		rte_hexdump(stdout,
+			    NULL,
+			    &((uint8_t *)m->buf_addr)[m->data_off],
+			    m->data_len);
+
+	p->stats.n_pkts++;
+	p->stats.n_bytes += pkt->length;
+
+	return 1;
+}
+
+/* Destroy an input port. Entries below pos were already handed out through
+ * reader_pkt_rx(), so only the still-pending packets are freed here.
+ */
+static void
+reader_free(void *port)
+{
+	struct reader *p = port;
+	int i;
+
+	if (!p)
+		return;
+
+	for (i = p->pos; i < p->n_pkts; i++)
+		rte_pktmbuf_free(p->pkts[i]);
+
+	free(p->pkts);
+	free(p->params.name);
+	free(p);
+}
+
+/* Copy the port's input counters into stats; no-op when stats is NULL. */
+static void
+reader_stats_read(void *port, struct rte_swx_port_in_stats *stats)
+{
+	const struct reader *rd = port;
+
+	/* Plain struct assignment copies the same counters memcpy() did. */
+	if (stats != NULL)
+		*stats = rd->stats;
+}
+
+/*
+ * Writer
+ */
+struct writer {
+	struct {
+		struct rte_ring *ring; /* ring resolved by name in writer_create() */
+		char *name; /* private strdup() copy, freed in writer_free() */
+		uint32_t burst_size; /* pending-packet count that triggers a flush */
+	} params;
+	struct rte_swx_port_out_stats stats; /* updated by writer_pkt_tx() */
+
+	struct rte_mbuf **pkts; /* burst_size slots for packets awaiting enqueue */
+	int n_pkts; /* number of packets currently buffered in pkts[] */
+};
+
+/* Output port constructor. args is a struct rte_swx_port_ring_writer_params;
+ * returns the new port, or NULL on bad arguments, unknown ring or no memory.
+ */
+static void *
+writer_create(void *args)
+{
+	struct rte_swx_port_ring_writer_params *params = args;
+	struct rte_ring *ring;
+	struct writer *p;
+
+	if (!params || !params->name || !params->burst_size)
+		return NULL;
+
+	ring = rte_ring_lookup(params->name);
+	if (!ring)
+		return NULL;
+
+	/* Memory allocation. */
+	p = calloc(1, sizeof(struct writer));
+	if (!p)
+		return NULL;
+
+	/* Test the strdup() result itself: p is known to be non-NULL here. */
+	p->params.name = strdup(params->name);
+	if (!p->params.name)
+		goto error;
+
+	p->pkts = calloc(params->burst_size, sizeof(struct rte_mbuf *));
+	if (!p->pkts)
+		goto error;
+
+	/* Initialization. */
+	p->params.ring = ring;
+	p->params.burst_size = params->burst_size;
+
+	return p;
+
+error:
+	free(p->params.name);
+	free(p->pkts);
+	free(p);
+	return NULL;
+}
+
+/* Enqueue every buffered packet to the ring, retrying until all of them
+ * have been accepted, then mark the buffer empty.
+ */
+static void
+__writer_flush(struct writer *p)
+{
+	int sent = 0;
+
+	do {
+		void **next = (void **)p->pkts + sent;
+
+		sent += rte_ring_sp_enqueue_burst(p->params.ring, next,
+						  p->n_pkts - sent, NULL);
+
+		TRACE("[Ring %s] %d packets out\n", p->params.name, sent);
+	} while (sent != p->n_pkts);
+
+	p->n_pkts = 0;
+}
+
+/* Buffer one packet, flushing to the ring once burst_size are pending.
+ * The trace shows p->n_pkts, the slot this packet is about to occupy.
+ */
+static void
+writer_pkt_tx(void *port, struct rte_swx_pkt *pkt)
+{
+	struct writer *p = port;
+	struct rte_mbuf *m = pkt->handle;
+
+	TRACE("[Ring %s] Pkt %d (%u bytes at offset %u)\n",
+	      p->params.name, p->n_pkts, pkt->length, pkt->offset);
+	if (TRACE_LEVEL)
+		rte_hexdump(stdout, NULL, &pkt->pkt[pkt->offset], pkt->length);
+
+	m->pkt_len = pkt->length;
+	m->data_len = (uint16_t)pkt->length;
+	m->data_off = (uint16_t)pkt->offset;
+
+	p->stats.n_pkts++;
+	p->stats.n_bytes += pkt->length;
+
+	p->pkts[p->n_pkts++] = m;
+	if (p->n_pkts == (int)p->params.burst_size)
+		__writer_flush(p);
+}
+
+static void
+writer_flush(void *port)
+{
+	struct writer *wr = port;
+	/* Nothing to send when the buffer is empty. */
+	if (wr->n_pkts != 0)
+		__writer_flush(wr);
+}
+
+/* Destroy an output port: flush pending packets, then release memory. */
+static void
+writer_free(void *port)
+{
+	struct writer *wr = port;
+
+	if (wr != NULL) {
+		writer_flush(wr);
+		free(wr->pkts);
+		free(wr->params.name);
+		free(wr);
+	}
+}
+
+/* Copy the port's output counters into stats; no-op when stats is NULL. */
+static void
+writer_stats_read(void *port, struct rte_swx_port_out_stats *stats)
+{
+	const struct writer *wr = port;
+
+	/* Plain struct assignment copies the same counters memcpy() did. */
+	if (stats != NULL)
+		*stats = wr->stats;
+}
+
+/*
+ * Summary of port operations
+ */
+struct rte_swx_port_in_ops rte_swx_port_ring_reader_ops = {
+	.create = reader_create,
+	.free = reader_free,
+	.pkt_rx = reader_pkt_rx,
+	.stats_read = reader_stats_read,
+};
+
+struct rte_swx_port_out_ops rte_swx_port_ring_writer_ops = {
+	.create = writer_create,
+	.free = writer_free,
+	.pkt_tx = writer_pkt_tx,
+	.flush = writer_flush,
+	.stats_read = writer_stats_read,
+};
diff --git a/lib/librte_port/rte_swx_port_ring.h b/lib/librte_port/rte_swx_port_ring.h
new file mode 100644
index 000000000..d338f205f
--- /dev/null
+++ b/lib/librte_port/rte_swx_port_ring.h
@@ -0,0 +1,52 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2021 Intel Corporation
+ */
+
+#ifndef __INCLUDE_RTE_SWX_PORT_RING_H__
+#define __INCLUDE_RTE_SWX_PORT_RING_H__
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * @file
+ * RTE SWX Ring Input and Output Ports
+ *
+ ***/
+
+#include <stdint.h>
+
+#include <rte_ring.h>
+
+#include "rte_swx_port.h"
+
+/** Ring input port (reader) creation parameters. */
+struct rte_swx_port_ring_reader_params {
+	/** Name of valid RTE ring. */
+	const char *name;
+
+	/** Read burst size. */
+	uint32_t burst_size;
+};
+
+/** Ring_reader operations. */
+extern struct rte_swx_port_in_ops rte_swx_port_ring_reader_ops;
+
+/** Ring output port (writer) creation parameters. */
+struct rte_swx_port_ring_writer_params {
+	/** Name of valid RTE ring. */
+	const char *name;
+
+	/** Write burst size. */
+	uint32_t burst_size;
+};
+
+/** Ring writer operations. */
+extern struct rte_swx_port_out_ops rte_swx_port_ring_writer_ops;
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/lib/librte_port/version.map b/lib/librte_port/version.map
index 0418821d9..d8be68c4f 100644
--- a/lib/librte_port/version.map
+++ b/lib/librte_port/version.map
@@ -44,4 +44,8 @@ EXPERIMENTAL {
 	rte_swx_port_ethdev_writer_ops;
 	rte_swx_port_sink_ops;
 	rte_swx_port_source_ops;
+
+	# added in 21.02
+	rte_swx_port_ring_reader_ops;
+	rte_swx_port_ring_writer_ops;
 };
-- 
2.17.1



More information about the dev mailing list