[dpdk-dev] [PATCH v6 09/42] pipeline: add SWX Rx and extract instructions

Cristian Dumitrescu cristian.dumitrescu at intel.com
Wed Sep 30 08:33:43 CEST 2020


Add the packet reception (rx) and header extraction (extract)
instructions. The rx instruction must be the first pipeline instruction.
Each extracted header is logically removed from the packet, after which
it can be read/written by instructions, emitted into the outgoing packet
or discarded.

Signed-off-by: Cristian Dumitrescu <cristian.dumitrescu at intel.com>
---
 lib/librte_pipeline/rte_pipeline_version.map |   1 +
 lib/librte_pipeline/rte_swx_pipeline.c       | 564 ++++++++++++++++++-
 lib/librte_pipeline/rte_swx_pipeline.h       |  13 +
 3 files changed, 574 insertions(+), 4 deletions(-)

diff --git a/lib/librte_pipeline/rte_pipeline_version.map b/lib/librte_pipeline/rte_pipeline_version.map
index 7139df0d3..793957eb9 100644
--- a/lib/librte_pipeline/rte_pipeline_version.map
+++ b/lib/librte_pipeline/rte_pipeline_version.map
@@ -73,6 +73,7 @@ EXPERIMENTAL {
 	rte_swx_pipeline_instructions_config;
 	rte_swx_pipeline_build;
 	rte_swx_pipeline_free;
+	rte_swx_pipeline_run;
 	rte_swx_pipeline_table_state_get;
 	rte_swx_pipeline_table_state_set;
 };
diff --git a/lib/librte_pipeline/rte_swx_pipeline.c b/lib/librte_pipeline/rte_swx_pipeline.c
index 2ae6229d0..d7af80e39 100644
--- a/lib/librte_pipeline/rte_swx_pipeline.c
+++ b/lib/librte_pipeline/rte_swx_pipeline.c
@@ -8,6 +8,7 @@
 #include <sys/queue.h>
 
 #include <rte_common.h>
+#include <rte_prefetch.h>
 
 #include "rte_swx_pipeline.h"
 #include "rte_swx_ctl.h"
@@ -21,6 +22,16 @@ do {                                                                           \
 #define CHECK_NAME(name, err_code)                                             \
 	CHECK((name) && (name)[0], err_code)
 
+/* Compile-time tracing. TRACE_LEVEL defaults to 0 unless it is already
+ * defined. With a non-zero TRACE_LEVEL, TRACE() forwards its arguments to
+ * printf(); otherwise it expands to nothing, so its arguments are not
+ * evaluated.
+ */
+#ifndef TRACE_LEVEL
+#define TRACE_LEVEL 0
+#endif
+
+#if TRACE_LEVEL
+#define TRACE(...) printf(__VA_ARGS__)
+#else
+#define TRACE(...)
+#endif
+
 /*
  * Struct.
  */
@@ -181,7 +192,64 @@ struct header_out_runtime {
 /*
  * Instruction.
  */
+
+/* Packet headers are always in Network Byte Order (NBO), i.e. big endian.
+ * Packet meta-data fields are always assumed to be in Host Byte Order (HBO).
+ * Table entry fields can be in either NBO or HBO; they are assumed to be in HBO
+ * when transferred to packet meta-data and in NBO when transferred to packet
+ * headers.
+ */
+
+/* Notation conventions:
+ *    -Header field: H = h.header.field (dst/src)
+ *    -Meta-data field: M = m.field (dst/src)
+ *    -Extern object mailbox field: E = e.field (dst/src)
+ *    -Extern function mailbox field: F = f.field (dst/src)
+ *    -Table action data field: T = t.field (src only)
+ *    -Immediate value: I = 32-bit unsigned value (src only)
+ */
+
+/* Instruction opcodes. The value is used as the index into the table of
+ * instruction handlers (instruction_table).
+ */
+enum instruction_type {
+	/* rx m.port_in */
+	INSTR_RX,
+
+	/* extract h.header */
+	INSTR_HDR_EXTRACT,
+	/* Fused forms: a single instruction extracting 2 .. 8 headers. */
+	INSTR_HDR_EXTRACT2,
+	INSTR_HDR_EXTRACT3,
+	INSTR_HDR_EXTRACT4,
+	INSTR_HDR_EXTRACT5,
+	INSTR_HDR_EXTRACT6,
+	INSTR_HDR_EXTRACT7,
+	INSTR_HDR_EXTRACT8,
+};
+
+/* Operands of the I/O instructions (rx, extract). */
+struct instr_io {
+	/* rx: meta-data field that receives the input port ID. */
+	struct {
+		/* Index into the thread meta-data (field offset / 8). */
+		uint8_t offset;
+		/* Field width in bits. */
+		uint8_t n_bits;
+		uint8_t pad[2];
+	} io;
+
+	/* extract: one slot per header; slot 0 is the only one filled in by
+	 * the translation, the fused variants read up to 8 slots.
+	 */
+	struct {
+		/* Bit position of the header in the valid headers mask. */
+		uint8_t header_id[8];
+		/* Index of the header in the thread array of structures. */
+		uint8_t struct_id[8];
+		/* Header size in bytes. */
+		uint8_t n_bytes[8];
+	} hdr;
+};
+
+/* Run-time instruction: the type selects the handler to execute, the union
+ * holds the operands of that instruction.
+ */
 struct instruction {
+	enum instruction_type type;
+	union {
+		struct instr_io io;
+	};
+};
+
+/* Per-instruction data that is only needed while the instructions are being
+ * translated and checked; instruction_config() releases it afterwards.
+ */
+struct instruction_data {
+	/* Label attached to this instruction; empty string when none. */
+	char label[RTE_SWX_NAME_SIZE];
+	/* Label this instruction refers to; matched against the labels of
+	 * the other instructions when counting label users.
+	 */
+	char jmp_label[RTE_SWX_NAME_SIZE];
+	uint32_t n_users; /* user = jmp instruction to this instruction. */
+	/* NOTE(review): not read or written by the code visible here. */
+	int invalid;
 };
 
 /*
@@ -251,6 +319,10 @@ struct table_runtime {
  * Pipeline.
  */
 struct thread {
+	/* Packet. */
+	struct rte_swx_pkt pkt;
+	uint8_t *ptr;
+
 	/* Structures. */
 	uint8_t **structs;
 
@@ -280,6 +352,29 @@ struct thread {
 	struct instruction *ret;
 };
 
+/* Single-bit helpers for 64-bit masks. All three are pure expressions: SET
+ * and CLR evaluate to the updated mask without modifying their argument, GET
+ * evaluates to the isolated bit (zero, or non-zero when the bit is set).
+ * pos has to be smaller than 64, the width of the shifted constant.
+ */
+#define MASK64_BIT_GET(mask, pos) ((mask) & (1LLU << (pos)))
+#define MASK64_BIT_SET(mask, pos) ((mask) | (1LLU << (pos)))
+#define MASK64_BIT_CLR(mask, pos) ((mask) & ~(1LLU << (pos)))
+
+/* Evaluate to the low n_bits bits of the 64-bit word read from the thread
+ * meta-data at index offset. Implemented as a statement expression.
+ *
+ * A full 64-bit word is always loaded, whatever the value of n_bits.
+ * n_bits has to be in the range 1 .. 64: for n_bits == 0 the mask would be
+ * computed with a shift by 64, which is undefined.
+ * NOTE(review): the load goes through a cast to uint64_t *, so it presumably
+ * relies on the target tolerating unaligned 64-bit accesses - confirm.
+ */
+#define METADATA_READ(thread, offset, n_bits)                                  \
+({                                                                             \
+	uint64_t *m64_ptr = (uint64_t *)&(thread)->metadata[offset];           \
+	uint64_t m64 = *m64_ptr;                                               \
+	uint64_t m64_mask = UINT64_MAX >> (64 - (n_bits));                     \
+	(m64 & m64_mask);                                                      \
+})
+
+#define METADATA_WRITE(thread, offset, n_bits, value)                          \
+{                                                                              \
+	uint64_t *m64_ptr = (uint64_t *)&(thread)->metadata[offset];           \
+	uint64_t m64 = *m64_ptr;                                               \
+	uint64_t m64_mask = UINT64_MAX >> (64 - (n_bits));                     \
+									       \
+	uint64_t m_new = value;                                                \
+									       \
+	*m64_ptr = (m64 & ~m64_mask) | (m_new & m64_mask);                     \
+}
+
 #ifndef RTE_SWX_PIPELINE_THREADS_MAX
 #define RTE_SWX_PIPELINE_THREADS_MAX 16
 #endif
@@ -315,6 +410,8 @@ struct rte_swx_pipeline {
 	uint32_t n_actions;
 	uint32_t n_tables;
 	uint32_t n_headers;
+	uint32_t thread_id;
+	uint32_t port_id;
 	uint32_t n_instructions;
 	int build_done;
 	int numa_node;
@@ -1187,6 +1284,16 @@ header_find(struct rte_swx_pipeline *p, const char *name)
 	return NULL;
 }
 
+/* Look up a header from its fully qualified name "h.<header name>".
+ *
+ * Returns the header registered under the part of name that follows the
+ * "h." prefix, as resolved by header_find(), or NULL when name does not
+ * start with "h.". name must not be NULL.
+ */
+static struct header *
+header_parse(struct rte_swx_pipeline *p,
+	     const char *name)
+{
+	const int has_prefix = (name[0] == 'h') && (name[1] == '.');
+
+	return has_prefix ? header_find(p, name + 2) : NULL;
+}
+
 static struct field *
 header_field_parse(struct rte_swx_pipeline *p,
 		   const char *name,
@@ -1430,19 +1537,459 @@ metadata_free(struct rte_swx_pipeline *p)
 /*
  * Instruction.
  */
+/* Move on to the next input port. The wrap-around is done with a bit mask,
+ * which only amounts to "modulo n_ports_in" when n_ports_in is a power of 2.
+ */
+static inline void
+pipeline_port_inc(struct rte_swx_pipeline *p)
+{
+	uint32_t next_port_id = p->port_id + 1;
+
+	p->port_id = next_port_id & (p->n_ports_in - 1);
+}
+
+/* Point the thread instruction pointer at the first pipeline instruction. */
 static inline void
 thread_ip_reset(struct rte_swx_pipeline *p, struct thread *t)
 {
 	t->ip = p->instructions;
 }
 
+static inline void
+thread_ip_inc(struct rte_swx_pipeline *p);
+
+/* Advance the instruction pointer of the current thread (the one selected by
+ * p->thread_id) to the next instruction.
+ */
+static inline void
+thread_ip_inc(struct rte_swx_pipeline *p)
+{
+	p->threads[p->thread_id].ip++;
+}
+
+/* Advance the thread instruction pointer by cond instructions. The rx
+ * instruction passes its "packet received" result here, so the pointer stays
+ * in place for cond == 0, without the need for a branch.
+ */
+static inline void
+thread_ip_inc_cond(struct thread *t, int cond)
+{
+	t->ip = t->ip + cond;
+}
+
+/* Hand over execution to the next thread. The thread index wraps around at
+ * RTE_SWX_PIPELINE_THREADS_MAX by means of a bit mask.
+ */
+static inline void
+thread_yield(struct rte_swx_pipeline *p)
+{
+	uint32_t next_thread_id = p->thread_id + 1;
+
+	p->thread_id = next_thread_id & (RTE_SWX_PIPELINE_THREADS_MAX - 1);
+}
+
+/*
+ * rx.
+ */
+/* Translate "rx <meta-data field>" into an INSTR_RX instruction.
+ *
+ * tokens[0] is the instruction name and tokens[1] the meta-data field that
+ * is to receive the input port ID. The translation is rejected through
+ * CHECK(..., EINVAL) when action is not NULL, when the number of tokens is
+ * not 2 or when tokens[1] is not resolved by metadata_field_parse().
+ * Returns 0 on success.
+ */
+static int
+instr_rx_translate(struct rte_swx_pipeline *p,
+		   struct action *action,
+		   char **tokens,
+		   int n_tokens,
+		   struct instruction *instr,
+		   struct instruction_data *data __rte_unused)
+{
+	struct field *f;
+
+	CHECK(!action, EINVAL);
+	CHECK(n_tokens == 2, EINVAL);
+
+	f = metadata_field_parse(p, tokens[1]);
+	CHECK(f, EINVAL);
+
+	instr->type = INSTR_RX;
+	/* NOTE(review): both destinations are uint8_t, so larger values would
+	 * be silently truncated - confirm the field limits enforced upstream.
+	 */
+	instr->io.io.offset = f->offset / 8;
+	instr->io.io.n_bits = f->n_bits;
+	return 0;
+}
+
+static inline void
+instr_rx_exec(struct rte_swx_pipeline *p);
+
+/* Execute the rx instruction for the current thread: poll the current input
+ * port for one packet, reset the per-packet thread state and store the input
+ * port ID into the meta-data field given by the instruction operands.
+ *
+ * The instruction pointer only moves forward when a packet was received, so
+ * the same rx instruction is executed again the next time this thread runs.
+ * In both cases the pipeline moves on to the next input port and to the next
+ * thread.
+ */
+static inline void
+instr_rx_exec(struct rte_swx_pipeline *p)
+{
+	struct thread *t = &p->threads[p->thread_id];
+	struct instruction *ip = t->ip;
+	struct port_in_runtime *port = &p->in[p->port_id];
+	struct rte_swx_pkt *pkt = &t->pkt;
+	int pkt_received;
+
+	/* Packet. */
+	pkt_received = port->pkt_rx(port->obj, pkt);
+	/* NOTE(review): computed and prefetched even when no packet was
+	 * received - presumably harmless, confirm what pkt_rx() leaves in pkt.
+	 */
+	t->ptr = &pkt->pkt[pkt->offset];
+	rte_prefetch0(t->ptr);
+
+	TRACE("[Thread %2u] rx %s from port %u\n",
+	      p->thread_id,
+	      pkt_received ? "1 pkt" : "0 pkts",
+	      p->port_id);
+
+	/* Headers. */
+	t->valid_headers = 0;
+	t->n_headers_out = 0;
+
+	/* Meta-data. */
+	METADATA_WRITE(t, ip->io.io.offset, ip->io.io.n_bits, p->port_id);
+
+	/* Tables. */
+	t->table_state = p->table_state;
+
+	/* Thread. */
+	pipeline_port_inc(p);
+	thread_ip_inc_cond(t, pkt_received);
+	thread_yield(p);
+}
+
+/*
+ * extract.
+ */
+/* Translate "extract h.<header>" into an INSTR_HDR_EXTRACT instruction.
+ *
+ * tokens[0] is the instruction name and tokens[1] the header to extract.
+ * The translation is rejected through CHECK(..., EINVAL) when action is not
+ * NULL, when the number of tokens is not 2 or when tokens[1] is not resolved
+ * by header_parse(). Only slot 0 of the header operands is filled in.
+ * Returns 0 on success.
+ */
+static int
+instr_hdr_extract_translate(struct rte_swx_pipeline *p,
+			    struct action *action,
+			    char **tokens,
+			    int n_tokens,
+			    struct instruction *instr,
+			    struct instruction_data *data __rte_unused)
+{
+	struct header *h;
+
+	CHECK(!action, EINVAL);
+	CHECK(n_tokens == 2, EINVAL);
+
+	h = header_parse(p, tokens[1]);
+	CHECK(h, EINVAL);
+
+	instr->type = INSTR_HDR_EXTRACT;
+	/* NOTE(review): the three destinations are uint8_t, so larger values
+	 * (e.g. a header of 256 bytes or more) would be silently truncated -
+	 * confirm the limits enforced when headers are registered.
+	 */
+	instr->io.hdr.header_id[0] = h->id;
+	instr->io.hdr.struct_id[0] = h->struct_id;
+	instr->io.hdr.n_bytes[0] = h->st->n_bits / 8;
+	return 0;
+}
+
+static inline void
+__instr_hdr_extract_exec(struct rte_swx_pipeline *p, uint32_t n_extract);
+
+/* Extract n_extract consecutive headers from the packet of the current
+ * thread, using the first n_extract header slots of the current instruction.
+ *
+ * No packet data is copied: for each header, the thread structure pointer is
+ * set to the current position within the packet, the header is flagged as
+ * valid and the packet offset/length/position are moved past the header.
+ * The instruction pointer is not touched; this is left to the callers.
+ */
+static inline void
+__instr_hdr_extract_exec(struct rte_swx_pipeline *p, uint32_t n_extract)
+{
+	struct thread *t = &p->threads[p->thread_id];
+	struct instruction *ip = t->ip;
+	/* Work on local copies, written back once after the loop. */
+	uint64_t valid_headers = t->valid_headers;
+	uint8_t *ptr = t->ptr;
+	uint32_t offset = t->pkt.offset;
+	uint32_t length = t->pkt.length;
+	uint32_t i;
+
+	for (i = 0; i < n_extract; i++) {
+		uint32_t header_id = ip->io.hdr.header_id[i];
+		uint32_t struct_id = ip->io.hdr.struct_id[i];
+		uint32_t n_bytes = ip->io.hdr.n_bytes[i];
+
+		TRACE("[Thread %2u]: extract header %u (%u bytes)\n",
+		      p->thread_id,
+		      header_id,
+		      n_bytes);
+
+		/* Headers. */
+		t->structs[struct_id] = ptr;
+		valid_headers = MASK64_BIT_SET(valid_headers, header_id);
+
+		/* Packet. */
+		/* NOTE(review): n_bytes is not checked against the remaining
+		 * length, so the unsigned length wraps around for a packet
+		 * shorter than the header - confirm this is guarded elsewhere.
+		 */
+		offset += n_bytes;
+		length -= n_bytes;
+		ptr += n_bytes;
+	}
+
+	/* Headers. */
+	t->valid_headers = valid_headers;
+
+	/* Packet. */
+	t->pkt.offset = offset;
+	t->pkt.length = length;
+	t->ptr = ptr;
+}
+
+/* Execute a plain extract instruction: extract the single header of slot 0,
+ * then advance the instruction pointer of the current thread.
+ */
+static inline void
+instr_hdr_extract_exec(struct rte_swx_pipeline *p)
+{
+	__instr_hdr_extract_exec(p, 1);
+
+	/* Thread. */
+	thread_ip_inc(p);
+}
+
+/* Execute a fused extract instruction: extract the 2 headers of slots 0 .. 1,
+ * then advance the instruction pointer of the current thread once.
+ */
+static inline void
+instr_hdr_extract2_exec(struct rte_swx_pipeline *p)
+{
+	TRACE("[Thread %2u] *** The next 2 instructions are fused. ***\n",
+	      p->thread_id);
+
+	__instr_hdr_extract_exec(p, 2);
+
+	/* Thread. */
+	thread_ip_inc(p);
+}
+
+/* Execute a fused extract instruction: extract the 3 headers of slots 0 .. 2,
+ * then advance the instruction pointer of the current thread once.
+ */
+static inline void
+instr_hdr_extract3_exec(struct rte_swx_pipeline *p)
+{
+	TRACE("[Thread %2u] *** The next 3 instructions are fused. ***\n",
+	      p->thread_id);
+
+	__instr_hdr_extract_exec(p, 3);
+
+	/* Thread. */
+	thread_ip_inc(p);
+}
+
+/* Execute a fused extract instruction: extract the 4 headers of slots 0 .. 3,
+ * then advance the instruction pointer of the current thread once.
+ */
+static inline void
+instr_hdr_extract4_exec(struct rte_swx_pipeline *p)
+{
+	TRACE("[Thread %2u] *** The next 4 instructions are fused. ***\n",
+	      p->thread_id);
+
+	__instr_hdr_extract_exec(p, 4);
+
+	/* Thread. */
+	thread_ip_inc(p);
+}
+
+/* Execute a fused extract instruction: extract the 5 headers of slots 0 .. 4,
+ * then advance the instruction pointer of the current thread once.
+ */
+static inline void
+instr_hdr_extract5_exec(struct rte_swx_pipeline *p)
+{
+	TRACE("[Thread %2u] *** The next 5 instructions are fused. ***\n",
+	      p->thread_id);
+
+	__instr_hdr_extract_exec(p, 5);
+
+	/* Thread. */
+	thread_ip_inc(p);
+}
+
+/* Execute a fused extract instruction: extract the 6 headers of slots 0 .. 5,
+ * then advance the instruction pointer of the current thread once.
+ */
+static inline void
+instr_hdr_extract6_exec(struct rte_swx_pipeline *p)
+{
+	TRACE("[Thread %2u] *** The next 6 instructions are fused. ***\n",
+	      p->thread_id);
+
+	__instr_hdr_extract_exec(p, 6);
+
+	/* Thread. */
+	thread_ip_inc(p);
+}
+
+/* Execute a fused extract instruction: extract the 7 headers of slots 0 .. 6,
+ * then advance the instruction pointer of the current thread once.
+ */
+static inline void
+instr_hdr_extract7_exec(struct rte_swx_pipeline *p)
+{
+	TRACE("[Thread %2u] *** The next 7 instructions are fused. ***\n",
+	      p->thread_id);
+
+	__instr_hdr_extract_exec(p, 7);
+
+	/* Thread. */
+	thread_ip_inc(p);
+}
+
+/* Execute a fused extract instruction: extract the 8 headers of slots 0 .. 7,
+ * then advance the instruction pointer of the current thread once.
+ */
+static inline void
+instr_hdr_extract8_exec(struct rte_swx_pipeline *p)
+{
+	TRACE("[Thread %2u] *** The next 8 instructions are fused. ***\n",
+	      p->thread_id);
+
+	__instr_hdr_extract_exec(p, 8);
+
+	/* Thread. */
+	thread_ip_inc(p);
+}
+
+#define RTE_SWX_INSTRUCTION_TOKENS_MAX 16
+
+/* Translate one instruction string into its run-time form.
+ *
+ * The string is split in place into blank separated tokens (it is modified
+ * by strtok_r(), hence the non-const pointer). An optional "<label> :"
+ * prefix is copied into data->label, then the remaining tokens are handed to
+ * the translate function of the instruction named by the first of them.
+ *
+ * p:      pipeline the instruction belongs to.
+ * action: action the instruction belongs to, NULL for a pipeline instruction.
+ * string: writable, NUL terminated instruction string.
+ * instr:  instruction to fill in.
+ * data:   translation-time data of the instruction to fill in.
+ *
+ * Returns 0 on success. Through CHECK(..., EINVAL), fails for an empty
+ * string, for more than RTE_SWX_INSTRUCTION_TOKENS_MAX tokens, for a label
+ * that is not followed by an instruction, for a label that does not fit into
+ * data->label and for an unknown instruction name; otherwise returns what
+ * the instruction specific translate function returns.
+ */
+static int
+instr_translate(struct rte_swx_pipeline *p,
+		struct action *action,
+		char *string,
+		struct instruction *instr,
+		struct instruction_data *data)
+{
+	char *tokens[RTE_SWX_INSTRUCTION_TOKENS_MAX];
+	char *token, *saveptr = NULL;
+	int n_tokens = 0, tpos = 0;
+
+	/* Parse the instruction string into tokens. The string is passed on
+	 * the first call only, NULL afterwards, as strtok_r() specifies.
+	 */
+	for (token = strtok_r(string, " \t\v", &saveptr);
+	     token;
+	     token = strtok_r(NULL, " \t\v", &saveptr)) {
+		CHECK(n_tokens < RTE_SWX_INSTRUCTION_TOKENS_MAX, EINVAL);
+
+		tokens[n_tokens] = token;
+		n_tokens++;
+	}
+
+	CHECK(n_tokens, EINVAL);
+
+	/* Handle the optional instruction label. */
+	if ((n_tokens >= 2) && !strcmp(tokens[1], ":")) {
+		/* The label comes from the user: make sure that it fits,
+		 * terminating NUL included, before copying it.
+		 */
+		CHECK(strlen(tokens[0]) < sizeof(data->label), EINVAL);
+		strcpy(data->label, tokens[0]);
+
+		tpos += 2;
+		CHECK(n_tokens - tpos, EINVAL);
+	}
+
+	/* Identify the instruction type. */
+	if (!strcmp(tokens[tpos], "rx"))
+		return instr_rx_translate(p,
+					  action,
+					  &tokens[tpos],
+					  n_tokens - tpos,
+					  instr,
+					  data);
+
+	if (!strcmp(tokens[tpos], "extract"))
+		return instr_hdr_extract_translate(p,
+						   action,
+						   &tokens[tpos],
+						   n_tokens - tpos,
+						   instr,
+						   data);
+
+	/* Unknown instruction. */
+	CHECK(0, EINVAL);
+}
+
+/* Count the instructions, among the n entries of data, whose jump label is
+ * equal to label. An empty label is never considered to be in use, so 0 is
+ * returned for it.
+ */
+static uint32_t
+label_is_used(struct instruction_data *data, uint32_t n, const char *label)
+{
+	uint32_t n_matches = 0;
+
+	if (label[0] == '\0')
+		return 0;
+
+	/* The visiting order is irrelevant for a count. */
+	while (n > 0) {
+		n--;
+		n_matches += !strcmp(label, data[n].jmp_label);
+	}
+
+	return n_matches;
+}
+
 static int
-instruction_config(struct rte_swx_pipeline *p __rte_unused,
-		   struct action *a __rte_unused,
-		   const char **instructions __rte_unused,
-		   uint32_t n_instructions __rte_unused)
+instr_label_check(struct instruction_data *instruction_data,
+		  uint32_t n_instructions)
 {
+	uint32_t i;
+
+	/* Check that all instruction labels are unique. */
+	for (i = 0; i < n_instructions; i++) {
+		struct instruction_data *data = &instruction_data[i];
+		char *label = data->label;
+		uint32_t j;
+
+		if (!label[0])
+			continue;
+
+		for (j = i + 1; j < n_instructions; j++)
+			CHECK(strcmp(label, data[j].label), EINVAL);
+	}
+
+	/* Get users for each instruction label. */
+	for (i = 0; i < n_instructions; i++) {
+		struct instruction_data *data = &instruction_data[i];
+		char *label = data->label;
+
+		data->n_users = label_is_used(instruction_data,
+					      n_instructions,
+					      label);
+	}
+
+	return 0;
+}
+
+/* Translate an array of instruction strings and install the result either
+ * into action a or, when a is NULL, into pipeline p.
+ *
+ * Every string is duplicated before being translated, so the caller strings
+ * are left untouched. The translation-time data is released before
+ * returning; on success the instruction array is owned by the action or the
+ * pipeline it was installed into, on failure it is released as well.
+ *
+ * Returns 0 on success. Fails through CHECK(..., EINVAL) for an empty or
+ * NULL array and for a NULL entry, with ENOMEM when memory cannot be
+ * allocated, and with the error reported by the translation or by the label
+ * check.
+ * NOTE(review): ENOMEM is returned as a positive value here; confirm that
+ * this matches the sign of the error codes produced by CHECK().
+ */
+static int
+instruction_config(struct rte_swx_pipeline *p,
+		   struct action *a,
+		   const char **instructions,
+		   uint32_t n_instructions)
+{
+	struct instruction *instr = NULL;
+	struct instruction_data *data = NULL;
+	char *string = NULL;
+	int err = 0;
+	uint32_t i;
+
+	CHECK(n_instructions, EINVAL);
+	CHECK(instructions, EINVAL);
+	for (i = 0; i < n_instructions; i++)
+		CHECK(instructions[i], EINVAL);
+
+	/* Memory allocation. */
+	instr = calloc(n_instructions, sizeof(struct instruction));
+	if (!instr) {
+		err = ENOMEM;
+		goto error;
+	}
+
+	data = calloc(n_instructions, sizeof(struct instruction_data));
+	if (!data) {
+		err = ENOMEM;
+		goto error;
+	}
+
+	for (i = 0; i < n_instructions; i++) {
+		string = strdup(instructions[i]);
+		if (!string) {
+			err = ENOMEM;
+			goto error;
+		}
+
+		err = instr_translate(p, a, string, &instr[i], &data[i]);
+		if (err)
+			goto error;
+
+		free(string);
+		/* The error path frees string as well: do not leave a
+		 * dangling pointer behind, as this would be a double free
+		 * when the label check below fails.
+		 */
+		string = NULL;
+	}
+
+	err = instr_label_check(data, n_instructions);
+	if (err)
+		goto error;
+
+	free(data);
+
+	if (a) {
+		a->instructions = instr;
+		a->n_instructions = n_instructions;
+	} else {
+		p->instructions = instr;
+		p->n_instructions = n_instructions;
+	}
+
 	return 0;
+
+error:
+	free(string);
+	free(data);
+	free(instr);
+	return err;
+}
+
+/* Instruction handler: executes the current instruction of the current
+ * thread of the pipeline.
+ */
+typedef void (*instr_exec_t)(struct rte_swx_pipeline *);
+
+/* Instruction handlers, indexed by instruction type. */
+static instr_exec_t instruction_table[] = {
+	[INSTR_RX] = instr_rx_exec,
+
+	[INSTR_HDR_EXTRACT] = instr_hdr_extract_exec,
+	[INSTR_HDR_EXTRACT2] = instr_hdr_extract2_exec,
+	[INSTR_HDR_EXTRACT3] = instr_hdr_extract3_exec,
+	[INSTR_HDR_EXTRACT4] = instr_hdr_extract4_exec,
+	[INSTR_HDR_EXTRACT5] = instr_hdr_extract5_exec,
+	[INSTR_HDR_EXTRACT6] = instr_hdr_extract6_exec,
+	[INSTR_HDR_EXTRACT7] = instr_hdr_extract7_exec,
+	[INSTR_HDR_EXTRACT8] = instr_hdr_extract8_exec,
+};
+
+/* Execute the instruction that the current thread points at, by calling the
+ * handler registered for its type.
+ */
+static inline void
+instr_exec(struct rte_swx_pipeline *p)
+{
+	const struct thread *t = &p->threads[p->thread_id];
+
+	instruction_table[t->ip->type](p);
 }
 
 /*
@@ -2226,6 +2773,15 @@ rte_swx_pipeline_build(struct rte_swx_pipeline *p)
 	return status;
 }
 
+/* Execute n_instructions pipeline instructions, one after the other. Each of
+ * them runs on whichever thread is the current one at that point.
+ */
+void
+rte_swx_pipeline_run(struct rte_swx_pipeline *p, uint32_t n_instructions)
+{
+	uint32_t n_left;
+
+	for (n_left = n_instructions; n_left > 0; n_left--)
+		instr_exec(p);
+}
+
 /*
  * Control.
  */
diff --git a/lib/librte_pipeline/rte_swx_pipeline.h b/lib/librte_pipeline/rte_swx_pipeline.h
index 47a0f8dcc..fb83a8820 100644
--- a/lib/librte_pipeline/rte_swx_pipeline.h
+++ b/lib/librte_pipeline/rte_swx_pipeline.h
@@ -534,6 +534,19 @@ __rte_experimental
 int
 rte_swx_pipeline_build(struct rte_swx_pipeline *p);
 
+/**
+ * Pipeline run
+ *
+ * Execute the given number of pipeline instructions. Every instruction is
+ * executed on behalf of the pipeline thread that is current at that point;
+ * the rx instruction hands over to the next thread once it completes.
+ *
+ * The pipeline handle is not validated by this function.
+ *
+ * @param[in] p
+ *   Pipeline handle.
+ * @param[in] n_instructions
+ *   Number of instructions to execute.
+ */
+__rte_experimental
+void
+rte_swx_pipeline_run(struct rte_swx_pipeline *p,
+		     uint32_t n_instructions);
+
 /**
  * Pipeline free
  *
-- 
2.17.1



More information about the dev mailing list