[PATCH v3] test/flow: add support for async API

Maxime Peim maxime.peim at gmail.com
Mon Mar 2 11:57:50 CET 2026


Add async flow API mode to test-flow-perf application for improved
flow rule insertion performance. The async API allows batching flow
rule creation operations and processing completions in bulk, reducing
per-rule overhead.

New command line options:
  --async: enable async flow API mode
  --async-queue-size=N: size of async queues (default: 1024)
  --async-push-batch=N: flows to batch before push (default: 256)

Signed-off-by: Maxime Peim <maxime.peim at gmail.com>
---
v2:
  - Replace per-flow stack allocation with pre-allocated slot pool;
    flat buffers are initialized once at init time and the hot path
    only patches per-flow item/action values into a pre-set slot
  - Fix alloca misuse: use heap allocation for queue_attr_list, round
    queue_size to power of 2 for bitmask wrapping, add bounds checks
  - Fix race on file-scope flow variable, premature latency
    measurement, and integer division in rate calculation
  - Drop unrelated lgopts reformatting
  - Use malloc instead of rte_zmalloc for non-dataplane allocations
  - Various robustness and style fixes

v3:
  - Update meson.build to exclude Windows build for flow perf test
  - Fix checkstyle
  - Remove cast from void* to uintptr_t
  - Add name to mailmap and maintainers

 .mailmap                         |   1 +
 MAINTAINERS                      |   1 +
 app/test-flow-perf/actions_gen.c | 281 +++++++++++-
 app/test-flow-perf/actions_gen.h |  31 ++
 app/test-flow-perf/async_flow.c  | 758 +++++++++++++++++++++++++++++++
 app/test-flow-perf/async_flow.h  |  54 +++
 app/test-flow-perf/items_gen.c   |  58 +++
 app/test-flow-perf/items_gen.h   |   6 +
 app/test-flow-perf/main.c        | 303 +++++++++++-
 app/test-flow-perf/meson.build   |   7 +
 10 files changed, 1460 insertions(+), 40 deletions(-)
 create mode 100644 app/test-flow-perf/async_flow.c
 create mode 100644 app/test-flow-perf/async_flow.h

diff --git a/.mailmap b/.mailmap
index 6c4c977dde..a0141402c3 100644
--- a/.mailmap
+++ b/.mailmap
@@ -1044,6 +1044,7 @@ Mauro Annarumma <mauroannarumma at hotmail.it>
 Maxime Coquelin <maxime.coquelin at redhat.com>
 Maxime Gouin <maxime.gouin at 6wind.com>
 Maxime Leroy <maxime at leroys.fr> <maxime.leroy at 6wind.com>
+Maxime Peim <maxime.peim at gmail.com>
 Md Fahad Iqbal Polash <md.fahad.iqbal.polash at intel.com>
 Megha Ajmera <megha.ajmera at intel.com>
 Meijuan Zhao <meijuanx.zhao at intel.com>
diff --git a/MAINTAINERS b/MAINTAINERS
index 1b2f1ed2ba..d4c01037c8 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -1964,6 +1964,7 @@ F: doc/guides/tools/dmaperf.rst
 
 Flow performance tool
 M: Wisam Jaddo <wisamm at nvidia.com>
+M: Maxime Peim <maxime.peim at gmail.com>
 F: app/test-flow-perf/
 F: doc/guides/tools/flow-perf.rst
 
diff --git a/app/test-flow-perf/actions_gen.c b/app/test-flow-perf/actions_gen.c
index 9d102e3af4..4a5b6fb2ff 100644
--- a/app/test-flow-perf/actions_gen.c
+++ b/app/test-flow-perf/actions_gen.c
@@ -36,27 +36,7 @@ struct additional_para {
 	bool unique_data;
 };
 
-/* Storage for struct rte_flow_action_raw_encap including external data. */
-struct action_raw_encap_data {
-	struct rte_flow_action_raw_encap conf;
-	uint8_t data[128];
-	uint8_t preserve[128];
-	uint16_t idx;
-};
-
-/* Storage for struct rte_flow_action_raw_decap including external data. */
-struct action_raw_decap_data {
-	struct rte_flow_action_raw_decap conf;
-	uint8_t data[128];
-	uint16_t idx;
-};
-
-/* Storage for struct rte_flow_action_rss including external data. */
-struct action_rss_data {
-	struct rte_flow_action_rss conf;
-	uint8_t key[40];
-	uint16_t queue[128];
-};
+/* Compound action data structs defined in actions_gen.h */
 
 static void
 add_mark(struct rte_flow_action *actions,
@@ -1165,3 +1145,262 @@ fill_actions(struct rte_flow_action *actions, uint64_t *flow_actions,
 	free(queues);
 	free(hairpin_queues);
 }
+
+/*
+ * Bytes of per-flow conf storage needed by an action of the given type.
+ * Compound actions (RSS, RAW_ENCAP, RAW_DECAP, VXLAN_ENCAP) count their
+ * trailing data too; conf-less or unknown action types need none.
+ */
+static size_t
+action_conf_size(enum rte_flow_action_type type)
+{
+	switch (type) {
+	case RTE_FLOW_ACTION_TYPE_MARK:
+		return sizeof(struct rte_flow_action_mark);
+	case RTE_FLOW_ACTION_TYPE_QUEUE:
+		return sizeof(struct rte_flow_action_queue);
+	case RTE_FLOW_ACTION_TYPE_JUMP:
+		return sizeof(struct rte_flow_action_jump);
+	case RTE_FLOW_ACTION_TYPE_PORT_ID:
+		return sizeof(struct rte_flow_action_port_id);
+	case RTE_FLOW_ACTION_TYPE_COUNT:
+		return sizeof(struct rte_flow_action_count);
+	case RTE_FLOW_ACTION_TYPE_METER:
+		return sizeof(struct rte_flow_action_meter);
+	case RTE_FLOW_ACTION_TYPE_SET_META:
+		return sizeof(struct rte_flow_action_set_meta);
+	case RTE_FLOW_ACTION_TYPE_SET_TAG:
+		return sizeof(struct rte_flow_action_set_tag);
+	case RTE_FLOW_ACTION_TYPE_MODIFY_FIELD:
+		return sizeof(struct rte_flow_action_modify_field);
+	case RTE_FLOW_ACTION_TYPE_SET_MAC_SRC:
+	case RTE_FLOW_ACTION_TYPE_SET_MAC_DST:
+		return sizeof(struct rte_flow_action_set_mac);
+	case RTE_FLOW_ACTION_TYPE_SET_IPV4_SRC:
+	case RTE_FLOW_ACTION_TYPE_SET_IPV4_DST:
+		return sizeof(struct rte_flow_action_set_ipv4);
+	case RTE_FLOW_ACTION_TYPE_SET_IPV6_SRC:
+	case RTE_FLOW_ACTION_TYPE_SET_IPV6_DST:
+		return sizeof(struct rte_flow_action_set_ipv6);
+	case RTE_FLOW_ACTION_TYPE_SET_TP_SRC:
+	case RTE_FLOW_ACTION_TYPE_SET_TP_DST:
+		return sizeof(struct rte_flow_action_set_tp);
+	case RTE_FLOW_ACTION_TYPE_SET_TTL:
+		return sizeof(struct rte_flow_action_set_ttl);
+	case RTE_FLOW_ACTION_TYPE_SET_IPV4_DSCP:
+	case RTE_FLOW_ACTION_TYPE_SET_IPV6_DSCP:
+		return sizeof(struct rte_flow_action_set_dscp);
+	case RTE_FLOW_ACTION_TYPE_INC_TCP_ACK:
+	case RTE_FLOW_ACTION_TYPE_DEC_TCP_ACK:
+	case RTE_FLOW_ACTION_TYPE_INC_TCP_SEQ:
+	case RTE_FLOW_ACTION_TYPE_DEC_TCP_SEQ:
+		return sizeof(rte_be32_t);
+	case RTE_FLOW_ACTION_TYPE_RSS:
+		return sizeof(struct action_rss_data);
+	case RTE_FLOW_ACTION_TYPE_RAW_ENCAP:
+		return sizeof(struct action_raw_encap_data);
+	case RTE_FLOW_ACTION_TYPE_RAW_DECAP:
+		return sizeof(struct action_raw_decap_data);
+	case RTE_FLOW_ACTION_TYPE_VXLAN_ENCAP:
+		return sizeof(struct rte_flow_action_vxlan_encap) +
+		       5 * sizeof(struct rte_flow_item) +
+		       sizeof(struct rte_flow_item_eth) + sizeof(struct rte_flow_item_ipv4) +
+		       sizeof(struct rte_flow_item_udp) + sizeof(struct rte_flow_item_vxlan);
+	default:
+		/* DROP, FLAG, DEC_TTL, VXLAN_DECAP and any type not listed above */
+		return 0;
+	}
+}
+
+/*
+ * Build the actions template and its masks for the async flow API from the
+ * zero-terminated flow_actions[] list (one action bitmask per entry).
+ *
+ * actions[]/masks[] receive, per selected action, its type and the template
+ * conf/mask pointers, and are terminated with END; at most MAX_ACTIONS_NUM
+ * entries (END included) are written. port_attr is zeroed, then counts the
+ * COUNT/METER/... objects requested; need_wire_orig_table is set when any
+ * selected action is flagged as requiring it.
+ *
+ * conf_sizes and n_actions_out may be NULL; otherwise they receive each
+ * action's conf storage size and the number of entries including END.
+ */
+void
+fill_actions_template(struct rte_flow_action *actions, struct rte_flow_action *masks,
+		      uint64_t *flow_actions, struct rte_flow_port_attr *port_attr,
+		      bool *need_wire_orig_table, size_t *conf_sizes, uint32_t *n_actions_out)
+{
+	enum rte_flow_action_type type;
+	uint8_t actions_counter = 0;
+	uint8_t i, j;
+
+	*need_wire_orig_table = false;
+	memset(port_attr, 0, sizeof(*port_attr));
+
+	/* Static configurations for actions that need them in templates */
+	static struct rte_flow_action_mark mark_conf = {.id = 1};
+	static struct rte_flow_action_queue queue_conf = {.index = 0};
+	static struct rte_flow_action_port_id port_id_conf = {.id = 0};
+	static struct rte_flow_action_jump jump_conf = {.group = 1};
+	static struct rte_flow_action_modify_field set_meta_conf = {
+		.operation = RTE_FLOW_MODIFY_SET,
+		.dst = {.field = RTE_FLOW_FIELD_META},
+		.src = {
+			.field = RTE_FLOW_FIELD_VALUE,
+			.value = {0, 0, 0, META_DATA},
+		},
+		.width = 32,
+	};
+
+	/* Static mask configurations for each action type */
+	static struct rte_flow_action_mark mark_mask = {.id = UINT32_MAX};
+	static struct rte_flow_action_queue queue_mask = {.index = UINT16_MAX};
+	static struct rte_flow_action_jump jump_mask = {.group = UINT32_MAX};
+	static struct rte_flow_action_rss rss_mask = {.level = UINT32_MAX, .types = UINT64_MAX};
+	/*
+	 * Mask of the MODIFY_FIELD (set-meta) action: same struct type as its conf,
+	 * every field non-zero, i.e. fixed at template level.
+	 * NOTE(review): exact mask expectations are PMD specific - confirm.
+	 */
+	static struct rte_flow_action_modify_field set_meta_mask = {
+		.operation = RTE_FLOW_MODIFY_SET,
+		.dst = {.field = RTE_FLOW_FIELD_META, .level = UINT8_MAX, .offset = UINT32_MAX},
+		.src = {
+			.field = RTE_FLOW_FIELD_VALUE,
+			.value = {0xff, 0xff, 0xff, 0xff},
+		},
+		.width = UINT32_MAX,
+	};
+	static struct rte_flow_action_set_tag set_tag_mask = {
+		.data = UINT32_MAX,
+		.mask = UINT32_MAX,
+		.index = UINT8_MAX,
+	};
+	static struct rte_flow_action_port_id port_id_mask = {.id = UINT32_MAX};
+	static struct rte_flow_action_count count_mask;
+	static struct rte_flow_action_set_mac set_mac_mask = {
+		.mac_addr = {0xff, 0xff, 0xff, 0xff, 0xff, 0xff},
+	};
+	static struct rte_flow_action_set_ipv4 set_ipv4_mask = {.ipv4_addr = UINT32_MAX};
+	static struct rte_flow_action_set_ipv6 set_ipv6_mask = {
+		.ipv6_addr.a = {0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
+				0xff, 0xff, 0xff, 0xff, 0xff}};
+	static struct rte_flow_action_set_tp set_tp_mask = {.port = UINT16_MAX};
+	static rte_be32_t tcp_seq_ack_mask = UINT32_MAX;
+	static struct rte_flow_action_set_ttl set_ttl_mask = {.ttl_value = UINT8_MAX};
+	static struct rte_flow_action_set_dscp set_dscp_mask = {.dscp = UINT8_MAX};
+	static struct rte_flow_action_meter meter_mask = {.mtr_id = UINT32_MAX};
+
+	static const struct {
+		uint64_t flow_mask;
+		enum rte_flow_action_type type;
+		const void *action_conf;
+		const void *action_mask;
+		const bool need_wire_orig_table;
+	} template_actions[] = {
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_MARK), RTE_FLOW_ACTION_TYPE_MARK, &mark_conf,
+		 &mark_mask, true},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_COUNT), RTE_FLOW_ACTION_TYPE_COUNT, NULL,
+		 &count_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_MODIFY_FIELD),
+		 RTE_FLOW_ACTION_TYPE_MODIFY_FIELD, &set_meta_conf, &set_meta_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_TAG), RTE_FLOW_ACTION_TYPE_SET_TAG, NULL,
+		 &set_tag_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_FLAG), RTE_FLOW_ACTION_TYPE_FLAG, NULL, NULL,
+		 false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_MAC_SRC),
+		 RTE_FLOW_ACTION_TYPE_SET_MAC_SRC, NULL, &set_mac_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_MAC_DST),
+		 RTE_FLOW_ACTION_TYPE_SET_MAC_DST, NULL, &set_mac_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV4_SRC),
+		 RTE_FLOW_ACTION_TYPE_SET_IPV4_SRC, NULL, &set_ipv4_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV4_DST),
+		 RTE_FLOW_ACTION_TYPE_SET_IPV4_DST, NULL, &set_ipv4_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV6_SRC),
+		 RTE_FLOW_ACTION_TYPE_SET_IPV6_SRC, NULL, &set_ipv6_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV6_DST),
+		 RTE_FLOW_ACTION_TYPE_SET_IPV6_DST, NULL, &set_ipv6_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_TP_SRC), RTE_FLOW_ACTION_TYPE_SET_TP_SRC,
+		 NULL, &set_tp_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_TP_DST), RTE_FLOW_ACTION_TYPE_SET_TP_DST,
+		 NULL, &set_tp_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_INC_TCP_ACK),
+		 RTE_FLOW_ACTION_TYPE_INC_TCP_ACK, NULL, &tcp_seq_ack_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_DEC_TCP_ACK),
+		 RTE_FLOW_ACTION_TYPE_DEC_TCP_ACK, NULL, &tcp_seq_ack_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_INC_TCP_SEQ),
+		 RTE_FLOW_ACTION_TYPE_INC_TCP_SEQ, NULL, &tcp_seq_ack_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_DEC_TCP_SEQ),
+		 RTE_FLOW_ACTION_TYPE_DEC_TCP_SEQ, NULL, &tcp_seq_ack_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_TTL), RTE_FLOW_ACTION_TYPE_SET_TTL, NULL,
+		 &set_ttl_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_DEC_TTL), RTE_FLOW_ACTION_TYPE_DEC_TTL, NULL,
+		 NULL, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV4_DSCP),
+		 RTE_FLOW_ACTION_TYPE_SET_IPV4_DSCP, NULL, &set_dscp_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV6_DSCP),
+		 RTE_FLOW_ACTION_TYPE_SET_IPV6_DSCP, NULL, &set_dscp_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_QUEUE), RTE_FLOW_ACTION_TYPE_QUEUE,
+		 &queue_conf, &queue_mask, true},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_RSS), RTE_FLOW_ACTION_TYPE_RSS, NULL,
+		 &rss_mask, true},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_JUMP), RTE_FLOW_ACTION_TYPE_JUMP, &jump_conf,
+		 &jump_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_PORT_ID), RTE_FLOW_ACTION_TYPE_PORT_ID,
+		 &port_id_conf, &port_id_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_DROP), RTE_FLOW_ACTION_TYPE_DROP, NULL, NULL,
+		 false},
+		{HAIRPIN_QUEUE_ACTION, RTE_FLOW_ACTION_TYPE_QUEUE, &queue_conf, &queue_mask, false},
+		{HAIRPIN_RSS_ACTION, RTE_FLOW_ACTION_TYPE_RSS, NULL, &rss_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_METER), RTE_FLOW_ACTION_TYPE_METER, NULL,
+		 &meter_mask, false},
+	};
+
+	/* Stop one entry early so the END terminator always fits in the arrays */
+	for (j = 0; j < MAX_ACTIONS_NUM && actions_counter < MAX_ACTIONS_NUM - 1; j++) {
+		if (flow_actions[j] == 0)
+			break;
+		for (i = 0; i < RTE_DIM(template_actions); i++) {
+			if ((flow_actions[j] & template_actions[i].flow_mask) == 0)
+				continue;
+
+			type = template_actions[i].type;
+			switch (type) {
+			case RTE_FLOW_ACTION_TYPE_COUNT:
+				port_attr->nb_counters++;
+				break;
+			case RTE_FLOW_ACTION_TYPE_AGE:
+				port_attr->nb_aging_objects++;
+				break;
+			case RTE_FLOW_ACTION_TYPE_METER:
+				port_attr->nb_meters++;
+				break;
+			case RTE_FLOW_ACTION_TYPE_CONNTRACK:
+				port_attr->nb_conn_tracks++;
+				break;
+			case RTE_FLOW_ACTION_TYPE_QUOTA:
+				port_attr->nb_quotas++;
+				break;
+			default:
+				break;
+			}
+
+			actions[actions_counter].type = type;
+			actions[actions_counter].conf = template_actions[i].action_conf;
+			masks[actions_counter].type = type;
+			masks[actions_counter].conf = template_actions[i].action_mask;
+			if (conf_sizes != NULL)
+				conf_sizes[actions_counter] = action_conf_size(type);
+			*need_wire_orig_table |= template_actions[i].need_wire_orig_table;
+			actions_counter++;
+			break;
+		}
+	}
+
+	actions[actions_counter].type = RTE_FLOW_ACTION_TYPE_END;
+	masks[actions_counter].type = RTE_FLOW_ACTION_TYPE_END;
+
+	/* take END into account */
+	if (n_actions_out != NULL)
+		*n_actions_out = actions_counter + 1;
+}
diff --git a/app/test-flow-perf/actions_gen.h b/app/test-flow-perf/actions_gen.h
index 9e13b164f9..3ac0ffed59 100644
--- a/app/test-flow-perf/actions_gen.h
+++ b/app/test-flow-perf/actions_gen.h
@@ -17,9 +17,40 @@
 #define RTE_VXLAN_GPE_UDP_PORT 250
 #define RTE_GENEVE_UDP_PORT 6081
 
+/* Compound action data structures (needed by async_flow.c for slot init) */
+
+/* Storage for struct rte_flow_action_raw_encap including external data. */
+struct action_raw_encap_data {
+	struct rte_flow_action_raw_encap conf; /* first member: struct pointer doubles as conf */
+	uint8_t data[128]; /* raw header bytes; conf.data is pointed here */
+	uint8_t preserve[128]; /* NOTE(review): presumably backs conf.preserve - confirm */
+	uint16_t idx; /* NOTE(review): usage not visible here - confirm */
+};
+
+/* Storage for struct rte_flow_action_raw_decap including external data. */
+struct action_raw_decap_data {
+	struct rte_flow_action_raw_decap conf; /* first member: struct pointer doubles as conf */
+	uint8_t data[128]; /* raw header bytes; conf.data is pointed here */
+	uint16_t idx; /* NOTE(review): usage not visible here - confirm */
+};
+
+/* Storage for struct rte_flow_action_rss including external data. */
+struct action_rss_data {
+	struct rte_flow_action_rss conf; /* first member: struct pointer doubles as conf */
+	uint8_t key[40]; /* hash key storage; conf.key points here, key_len is its size */
+	uint16_t queue[128]; /* queue list storage; conf.queue points here (128 entries max) */
+};
+
 void fill_actions(struct rte_flow_action *actions, uint64_t *flow_actions,
 	uint32_t counter, uint16_t next_table, uint16_t hairpinq,
 	uint64_t encap_data, uint64_t decap_data, uint8_t core_idx,
 	bool unique_data, uint8_t rx_queues_count, uint16_t dst_port);
 
+/* Fill actions template for async flow API (types only, no values).
+ * If conf_sizes is non-NULL, populates per-action conf sizes and n_actions_out.
+ */
+void fill_actions_template(struct rte_flow_action *actions, struct rte_flow_action *masks,
+			   uint64_t *flow_actions, struct rte_flow_port_attr *port_attr,
+			   bool *need_wire_orig_table, size_t *conf_sizes, uint32_t *n_actions_out);
+
 #endif /* FLOW_PERF_ACTION_GEN */
diff --git a/app/test-flow-perf/async_flow.c b/app/test-flow-perf/async_flow.c
new file mode 100644
index 0000000000..71728845cc
--- /dev/null
+++ b/app/test-flow-perf/async_flow.c
@@ -0,0 +1,758 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright 2026 Maxime Peim <maxime.peim at gmail.com>
+ *
+ * This file contains the async flow API implementation
+ * for the flow-perf application.
+ */
+
+#include <stdlib.h>
+#include <string.h>
+
+#include <rte_bitops.h>
+#include <rte_common.h>
+#include <rte_ethdev.h>
+#include <rte_flow.h>
+#include <rte_vxlan.h>
+
+#include "actions_gen.h"
+#include "async_flow.h"
+#include "flow_gen.h"
+#include "items_gen.h"
+
+/* Max iterations when draining pending async completions during cleanup */
+#define DRAIN_MAX_ITERATIONS 100
+
+/* Per-port async flow resources */
+static struct async_flow_resources port_resources[MAX_PORTS];
+
+/*
+ * One-time wiring of the compound action confs of a pre-allocated slot.
+ * RSS, RAW_ENCAP, RAW_DECAP and VXLAN_ENCAP confs embed pointers into their
+ * own storage; those are set here, once per slot, during pool init.
+ */
+static void
+init_slot_compound_actions(struct rte_flow_action *actions, uint32_t n_actions,
+			   const size_t *action_conf_sizes)
+{
+	uint32_t idx;
+
+	for (idx = 0; idx < n_actions; idx++) {
+		if (action_conf_sizes[idx] == 0)
+			continue;
+
+		switch (actions[idx].type) {
+		case RTE_FLOW_ACTION_TYPE_RSS: {
+			struct action_rss_data *rss_data = actions[idx].conf;
+			struct rte_flow_action_rss *rss = &rss_data->conf;
+			rss->func = RTE_ETH_HASH_FUNCTION_DEFAULT;
+			rss->level = 0;
+			rss->types = GET_RSS_HF();
+			rss->key_len = sizeof(rss_data->key);
+			rss->key = rss_data->key;
+			rss->queue = rss_data->queue;
+			rss_data->key[0] = 1;
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_RAW_ENCAP: {
+			struct action_raw_encap_data *raw_encap = actions[idx].conf;
+			raw_encap->conf.data = raw_encap->data;
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_RAW_DECAP: {
+			struct action_raw_decap_data *raw_decap = actions[idx].conf;
+			raw_decap->conf.data = raw_decap->data;
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_VXLAN_ENCAP: {
+			/*
+			 * The conf area is packed back to back as:
+			 *   rte_flow_action_vxlan_encap, rte_flow_item[5],
+			 *   item_eth, item_ipv4, item_udp, item_vxlan
+			 */
+			uint8_t *cursor = actions[idx].conf;
+			struct rte_flow_action_vxlan_encap *encap_conf;
+			struct rte_flow_item *defs;
+			struct rte_flow_item_eth *eth;
+			struct rte_flow_item_ipv4 *ipv4;
+			struct rte_flow_item_udp *udp;
+			struct rte_flow_item_vxlan *vxlan;
+
+			encap_conf = (struct rte_flow_action_vxlan_encap *)cursor;
+			cursor += sizeof(*encap_conf);
+			defs = (struct rte_flow_item *)cursor;
+			cursor += 5 * sizeof(*defs);
+			eth = (struct rte_flow_item_eth *)cursor;
+			cursor += sizeof(*eth);
+			ipv4 = (struct rte_flow_item_ipv4 *)cursor;
+			cursor += sizeof(*ipv4);
+			udp = (struct rte_flow_item_udp *)cursor;
+			cursor += sizeof(*udp);
+			vxlan = (struct rte_flow_item_vxlan *)cursor;
+
+			memset(eth, 0, sizeof(*eth));
+			memset(ipv4, 0, sizeof(*ipv4));
+			memset(udp, 0, sizeof(*udp));
+			memset(vxlan, 0, sizeof(*vxlan));
+			ipv4->hdr.src_addr = RTE_IPV4(127, 0, 0, 1);
+			ipv4->hdr.version_ihl = RTE_IPV4_VHL_DEF;
+			udp->hdr.dst_port = RTE_BE16(RTE_VXLAN_DEFAULT_PORT);
+			vxlan->hdr.vni[2] = 1;
+
+			/* Each definition item uses the same object as spec and as mask */
+			defs[0].type = RTE_FLOW_ITEM_TYPE_ETH;
+			defs[0].spec = eth;
+			defs[0].mask = eth;
+			defs[1].type = RTE_FLOW_ITEM_TYPE_IPV4;
+			defs[1].spec = ipv4;
+			defs[1].mask = ipv4;
+			defs[2].type = RTE_FLOW_ITEM_TYPE_UDP;
+			defs[2].spec = udp;
+			defs[2].mask = udp;
+			defs[3].type = RTE_FLOW_ITEM_TYPE_VXLAN;
+			defs[3].spec = vxlan;
+			defs[3].mask = vxlan;
+			defs[4].type = RTE_FLOW_ITEM_TYPE_END;
+			encap_conf->definition = defs;
+			break;
+		}
+		default:
+			break;
+		}
+	}
+}
+
+/*
+ * Allocate and pre-initialize all per-slot flat buffers.
+ * Slot layout: items[n_items], actions[n_actions] plus one spare entry, then
+ * the item specs and action confs, each aligned to 8 bytes. Item masks are
+ * shared by all slots and use the same aligned layout as the specs.
+ * Returns 0 on success, -ENOMEM if an allocation fails.
+ */
+static int
+init_slot_pool(struct async_flow_resources *res, uint32_t nb_queues, uint32_t queue_size,
+	       const struct rte_flow_item *pattern, uint32_t n_items, const size_t *item_spec_sizes,
+	       const struct rte_flow_action *template_actions, uint32_t n_actions,
+	       const size_t *action_conf_sizes)
+{
+	const size_t data_align = sizeof(uint64_t);
+	uint32_t items_array_bytes, actions_array_bytes;
+	uint32_t spec_data_bytes, conf_data_bytes, mask_data_bytes;
+	uint32_t slot_size, num_slots;
+	size_t pool_bytes, queues_bytes;
+	uint32_t s, i;
+	uint8_t *mptr;
+
+	/* Shared mask size; specs use the same aligned layout, hence the same size */
+	mask_data_bytes = 0;
+	for (i = 0; i < n_items; i++)
+		mask_data_bytes += RTE_ALIGN_CEIL(item_spec_sizes[i], data_align);
+	spec_data_bytes = mask_data_bytes;
+	conf_data_bytes = 0;
+	for (i = 0; i < n_actions; i++)
+		conf_data_bytes += RTE_ALIGN_CEIL(action_conf_sizes[i], data_align);
+
+	/* Array sizes; the spare action entry added below hosts the END sentinel */
+	items_array_bytes = n_items * sizeof(struct rte_flow_item);
+	actions_array_bytes = n_actions * sizeof(struct rte_flow_action);
+	slot_size = items_array_bytes + actions_array_bytes + sizeof(struct rte_flow_action);
+	slot_size = RTE_ALIGN_CEIL(slot_size + spec_data_bytes + conf_data_bytes,
+				   RTE_CACHE_LINE_SIZE);
+
+	num_slots = queue_size * nb_queues;
+	pool_bytes = (size_t)num_slots * slot_size;
+
+	/* Store layout info */
+	res->slot_size = slot_size;
+	res->slots_per_queue = queue_size;
+	res->nb_queues = nb_queues;
+	res->n_items = n_items;
+	res->n_actions = n_actions;
+
+	/* Allocate shared masks */
+	if (mask_data_bytes > 0) {
+		res->shared_masks = aligned_alloc(
+			RTE_CACHE_LINE_SIZE, RTE_ALIGN_CEIL(mask_data_bytes, RTE_CACHE_LINE_SIZE));
+		if (res->shared_masks == NULL) {
+			fprintf(stderr, "Failed to allocate shared masks (%u bytes)\n",
+				mask_data_bytes);
+			return -ENOMEM;
+		}
+		memset(res->shared_masks, 0, mask_data_bytes);
+		/* Copy mask data from template pattern */
+		mptr = res->shared_masks;
+		for (i = 0; i < n_items; i++) {
+			if (item_spec_sizes[i] > 0 && pattern[i].mask != NULL)
+				memcpy(mptr, pattern[i].mask, item_spec_sizes[i]);
+			mptr += RTE_ALIGN_CEIL(item_spec_sizes[i], data_align);
+		}
+	}
+
+	/* Allocate per-slot pool; slot_size is cache-line aligned, so the total is too */
+	res->slot_pool = aligned_alloc(RTE_CACHE_LINE_SIZE, pool_bytes);
+	if (res->slot_pool == NULL) {
+		fprintf(stderr, "Failed to allocate slot pool (%u slots * %u bytes)\n", num_slots,
+			slot_size);
+		goto fail;
+	}
+	memset(res->slot_pool, 0, pool_bytes);
+
+	/* Pre-initialize every slot */
+	for (s = 0; s < num_slots; s++) {
+		uint8_t *slot = res->slot_pool + (size_t)s * slot_size;
+		struct rte_flow_item *items = (struct rte_flow_item *)slot;
+		struct rte_flow_action *actions =
+			(struct rte_flow_action *)(slot + items_array_bytes);
+		uint8_t *data = (uint8_t *)(actions + n_actions + 1);
+
+		/* Pre-set items: spec → per-slot data, mask → shared masks */
+		mptr = res->shared_masks;
+		for (i = 0; i < n_items; i++) {
+			items[i].type = pattern[i].type;
+			if (item_spec_sizes[i] > 0) {
+				items[i].spec = data;
+				items[i].mask = mptr;
+				data += RTE_ALIGN_CEIL(item_spec_sizes[i], data_align);
+				mptr += RTE_ALIGN_CEIL(item_spec_sizes[i], data_align);
+			}
+		}
+		items[n_items].type = RTE_FLOW_ITEM_TYPE_END;
+
+		/* Pre-set actions: conf → per-slot data */
+		for (i = 0; i < n_actions; i++) {
+			actions[i].type = template_actions[i].type;
+			if (action_conf_sizes[i] > 0) {
+				actions[i].conf = data;
+				data += RTE_ALIGN_CEIL(action_conf_sizes[i], data_align);
+			}
+		}
+		actions[n_actions].type = RTE_FLOW_ACTION_TYPE_END;
+
+		/* Initialize compound action types (RSS, RAW_ENCAP, etc.) */
+		init_slot_compound_actions(actions, n_actions, action_conf_sizes);
+	}
+
+	/* Per-queue slot tracking; aligned_alloc() needs a size multiple of the alignment */
+	queues_bytes = RTE_ALIGN_CEIL(nb_queues * sizeof(*res->queues), RTE_CACHE_LINE_SIZE);
+	res->queues = aligned_alloc(RTE_CACHE_LINE_SIZE, queues_bytes);
+	if (res->queues == NULL) {
+		fprintf(stderr, "Failed to allocate queue structs (%u queues)\n", nb_queues);
+		goto fail;
+	}
+	memset(res->queues, 0, queues_bytes);
+	for (s = 0; s < nb_queues; s++) {
+		res->queues[s].slots = res->slot_pool + (size_t)s * queue_size * slot_size;
+		res->queues[s].head = 0;
+	}
+
+	printf(":: Slot pool: %u slots * %u bytes = %zu KB (shared masks: %u bytes)\n", num_slots,
+	       slot_size, pool_bytes / 1024, mask_data_bytes);
+	return 0;
+
+fail:
+	free(res->slot_pool);
+	res->slot_pool = NULL;
+	free(res->shared_masks);
+	res->shared_masks = NULL;
+	return -ENOMEM;
+}
+
+/*
+ * Hot-path: write the per-flow counter into the source address of each
+ * IPv4/IPv6 item, through the spec pointers already stored in the items.
+ * These are the only item fields updated per flow; every other item type is
+ * left untouched.
+ */
+static void
+update_item_values(struct rte_flow_item *items, uint32_t counter)
+{
+	struct rte_flow_item *item;
+
+	for (item = items; item->type != RTE_FLOW_ITEM_TYPE_END; item++) {
+		if (item->type == RTE_FLOW_ITEM_TYPE_IPV4) {
+			struct rte_flow_item_ipv4 *v4 = item->spec;
+
+			v4->hdr.src_addr = RTE_BE32(counter);
+			continue;
+		}
+		if (item->type == RTE_FLOW_ITEM_TYPE_IPV6) {
+			struct rte_flow_item_ipv6 *v6 = item->spec;
+			uint8_t k;
+
+			/* counter goes big-endian into the last 4 address bytes */
+			for (k = 0; k < 4; k++)
+				v6->hdr.src_addr.a[15 - k] = counter >> (k * 8);
+		}
+	}
+}
+
+/*
+ * Hot-path: update per-flow action values through pre-set pointers.
+ * counter drives every per-flow value. When unique_data is false, the rewrite
+ * actions use a fixed value instead of counter. A non-zero hairpinq steers
+ * QUEUE/RSS to the hairpin queues, which follow the rx_queues_count regular
+ * queues. encap_data/decap_data are item bitmasks selecting the raw headers.
+ */
+static void
+update_action_values(struct rte_flow_action *actions, uint32_t counter, uint16_t hairpinq,
+		     uint64_t encap_data, uint64_t decap_data, __rte_unused uint8_t core_idx,
+		     bool unique_data, uint8_t rx_queues_count, uint16_t dst_port)
+{
+	/* Base value of the actions that are only unique per flow on request */
+	const uint32_t uval = unique_data ? counter : 1;
+	uint8_t i;
+
+	for (i = 0; actions[i].type != RTE_FLOW_ACTION_TYPE_END; i++) {
+		switch (actions[i].type) {
+		case RTE_FLOW_ACTION_TYPE_MARK: {
+			struct rte_flow_action_mark *conf = actions[i].conf;
+			conf->id = (counter % 255) + 1;
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_QUEUE: {
+			struct rte_flow_action_queue *conf = actions[i].conf;
+			conf->index = hairpinq ? (counter % hairpinq) + rx_queues_count :
+						 counter % rx_queues_count;
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_METER: {
+			struct rte_flow_action_meter *conf = actions[i].conf;
+			conf->mtr_id = counter;
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_RSS: {
+			struct action_rss_data *conf = actions[i].conf;
+			/* Hairpin queues are numbered right after the Rx queues */
+			uint16_t first = hairpinq ? rx_queues_count : 0;
+			uint16_t nb = hairpinq ? hairpinq : rx_queues_count;
+			uint16_t q;
+
+			/* Clamp to the fixed queue[] storage instead of overflowing it */
+			nb = RTE_MIN(nb, (uint16_t)RTE_DIM(conf->queue));
+			conf->conf.queue_num = nb;
+			for (q = 0; q < nb; q++)
+				conf->queue[q] = first + q;
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_SET_MAC_SRC:
+		case RTE_FLOW_ACTION_TYPE_SET_MAC_DST: {
+			struct rte_flow_action_set_mac *conf = actions[i].conf;
+			uint32_t val = uval;
+			uint8_t j;
+			for (j = 0; j < RTE_ETHER_ADDR_LEN; j++) {
+				conf->mac_addr[j] = val & 0xff;
+				val >>= 8;
+			}
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_SET_IPV4_SRC:
+		case RTE_FLOW_ACTION_TYPE_SET_IPV4_DST: {
+			struct rte_flow_action_set_ipv4 *conf = actions[i].conf;
+			conf->ipv4_addr = RTE_BE32(uval + 1);
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_SET_IPV6_SRC:
+		case RTE_FLOW_ACTION_TYPE_SET_IPV6_DST: {
+			struct rte_flow_action_set_ipv6 *conf = actions[i].conf;
+			uint32_t val = uval;
+			uint8_t j;
+			for (j = 0; j < 16; j++) {
+				conf->ipv6_addr.a[j] = val & 0xff;
+				val >>= 8;
+			}
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_SET_TP_SRC: {
+			struct rte_flow_action_set_tp *conf = actions[i].conf;
+			uint32_t tp = unique_data ? counter : 100;
+			tp = tp % 0xffff;
+			conf->port = RTE_BE16(tp & 0xffff);
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_SET_TP_DST: {
+			struct rte_flow_action_set_tp *conf = actions[i].conf;
+			uint32_t tp = unique_data ? counter : 100;
+			if (tp > 0xffff)
+				tp >>= 16;
+			conf->port = RTE_BE16(tp & 0xffff);
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_INC_TCP_ACK:
+		case RTE_FLOW_ACTION_TYPE_DEC_TCP_ACK:
+		case RTE_FLOW_ACTION_TYPE_INC_TCP_SEQ:
+		case RTE_FLOW_ACTION_TYPE_DEC_TCP_SEQ: {
+			rte_be32_t *conf = actions[i].conf;
+			*conf = RTE_BE32(uval);
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_SET_TTL: {
+			struct rte_flow_action_set_ttl *conf = actions[i].conf;
+			conf->ttl_value = uval % 0xff;
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_SET_IPV4_DSCP:
+		case RTE_FLOW_ACTION_TYPE_SET_IPV6_DSCP: {
+			struct rte_flow_action_set_dscp *conf = actions[i].conf;
+			conf->dscp = uval % 0xff;
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_PORT_ID: {
+			struct rte_flow_action_port_id *conf = actions[i].conf;
+			conf->id = dst_port;
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_RAW_ENCAP: {
+			struct action_raw_encap_data *encap = actions[i].conf;
+			uint8_t *header = encap->data;
+			struct rte_ether_hdr eth_hdr;
+			struct rte_ipv4_hdr ipv4_hdr;
+			struct rte_udp_hdr udp_hdr;
+
+			memset(&eth_hdr, 0, sizeof(eth_hdr));
+			if (encap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_ETH)) {
+				if (encap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_VLAN))
+					eth_hdr.ether_type = RTE_BE16(RTE_ETHER_TYPE_VLAN);
+				else if (encap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_IPV4))
+					eth_hdr.ether_type = RTE_BE16(RTE_ETHER_TYPE_IPV4);
+				else if (encap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_IPV6))
+					eth_hdr.ether_type = RTE_BE16(RTE_ETHER_TYPE_IPV6);
+				memcpy(header, &eth_hdr, sizeof(eth_hdr));
+				header += sizeof(eth_hdr);
+			}
+			if (encap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_IPV4)) {
+				memset(&ipv4_hdr, 0, sizeof(ipv4_hdr));
+				ipv4_hdr.src_addr = RTE_IPV4(127, 0, 0, 1);
+				ipv4_hdr.dst_addr = RTE_BE32(uval);
+				ipv4_hdr.version_ihl = RTE_IPV4_VHL_DEF;
+				if (encap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_UDP))
+					ipv4_hdr.next_proto_id = 17; /* UDP */
+				if (encap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_GRE))
+					ipv4_hdr.next_proto_id = 47; /* GRE */
+				memcpy(header, &ipv4_hdr, sizeof(ipv4_hdr));
+				header += sizeof(ipv4_hdr);
+			}
+			if (encap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_UDP)) {
+				memset(&udp_hdr, 0, sizeof(udp_hdr));
+				if (encap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_VXLAN))
+					udp_hdr.dst_port = RTE_BE16(RTE_VXLAN_DEFAULT_PORT);
+				memcpy(header, &udp_hdr, sizeof(udp_hdr));
+				header += sizeof(udp_hdr);
+			}
+			encap->conf.size = header - encap->data;
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_RAW_DECAP: {
+			struct action_raw_decap_data *decap_d = actions[i].conf;
+			uint8_t *header = decap_d->data;
+			struct rte_ether_hdr eth_hdr;
+
+			memset(&eth_hdr, 0, sizeof(eth_hdr));
+			if (decap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_ETH)) {
+				if (decap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_IPV4))
+					eth_hdr.ether_type = RTE_BE16(RTE_ETHER_TYPE_IPV4);
+				else if (decap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_IPV6))
+					eth_hdr.ether_type = RTE_BE16(RTE_ETHER_TYPE_IPV6);
+				memcpy(header, &eth_hdr, sizeof(eth_hdr));
+				header += sizeof(eth_hdr);
+			}
+			decap_d->conf.size = header - decap_d->data;
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_VXLAN_ENCAP: {
+			uint8_t *base = actions[i].conf;
+			struct rte_flow_item *vitems =
+				(struct rte_flow_item
+					 *)(base + sizeof(struct rte_flow_action_vxlan_encap));
+			struct rte_flow_item_ipv4 *spec = vitems[1].spec;
+			/* vitems[1] is IPV4 */
+			spec->hdr.dst_addr = RTE_BE32(uval);
+			break;
+		}
+		default:
+			break;
+		}
+	}
+}
+
+/*
+ * Configure the async (template based) flow engine of one port.
+ *
+ * Steps: query the port flow capabilities, clamp nb_queues/queue_size to
+ * them, round queue_size down to a power of two, call rte_flow_configure(),
+ * create one pattern template, one actions template and one template table
+ * sized for rules_count flows, then build the per-queue slot pool.
+ *
+ * flow_items/flow_actions/flow_attrs are the bitmask arrays selected on the
+ * command line; flow_group is the group of the template table.
+ *
+ * Returns 0 on success and a non-zero value on failure. On failure every
+ * template/table created by this call has been destroyed again.
+ */
+int
+async_flow_init_port(uint16_t port_id, uint32_t nb_queues, uint32_t queue_size,
+		     uint64_t *flow_items, uint64_t *flow_actions, uint64_t *flow_attrs,
+		     uint8_t flow_group, uint32_t rules_count)
+{
+	struct rte_flow_port_info port_info = {0};
+	struct rte_flow_queue_info queue_info = {0};
+	struct rte_flow_error error = {0};
+	struct rte_flow_port_attr port_attr = {0};
+	struct rte_flow_queue_attr queue_attr = {0};
+	const struct rte_flow_queue_attr **queue_attr_list;
+	struct rte_flow_pattern_template_attr pt_attr = {0};
+	struct rte_flow_actions_template_attr at_attr = {0};
+	struct rte_flow_template_table_attr table_attr = {0};
+	struct rte_flow_item pattern[MAX_ITEMS_NUM];
+	struct rte_flow_action actions[MAX_ACTIONS_NUM];
+	struct rte_flow_action action_masks[MAX_ACTIONS_NUM];
+	size_t item_spec_sizes[MAX_ITEMS_NUM];
+	size_t action_conf_sizes[MAX_ACTIONS_NUM];
+	uint32_t n_items, n_actions;
+	struct async_flow_resources *res;
+	bool need_wire_orig_table = false;
+	uint32_t i;
+	int ret;
+
+	if (port_id >= MAX_PORTS)
+		return -1;
+
+	res = &port_resources[port_id];
+	memset(res, 0, sizeof(*res));
+
+	if (nb_queues == 0) {
+		fprintf(stderr, "Port %u: at least one async flow queue is required\n", port_id);
+		return -EINVAL;
+	}
+
+	/* Query port flow info */
+	ret = rte_flow_info_get(port_id, &port_info, &queue_info, &error);
+	if (ret != 0) {
+		fprintf(stderr, "Port %u: rte_flow_info_get failed: %s\n", port_id,
+			error.message ? error.message : "(no message)");
+		return ret;
+	}
+
+	if (port_info.max_nb_queues == 0 || queue_info.max_size == 0) {
+		fprintf(stderr, "Port %u: rte_flow_info_get reports that no queues are supported\n",
+			port_id);
+		return -1;
+	}
+
+	/*
+	 * Limit to device capabilities. Both limits are known to be non-zero
+	 * here, and a limit of UINT32_MAX can never be exceeded by a uint32_t.
+	 */
+	if (nb_queues > port_info.max_nb_queues)
+		nb_queues = port_info.max_nb_queues;
+	if (queue_size > queue_info.max_size)
+		queue_size = queue_info.max_size;
+
+	/* Slot ring uses bitmask wrapping, so queue_size must be power of 2 */
+	queue_size = rte_align32prevpow2(queue_size);
+	if (queue_size == 0) {
+		fprintf(stderr, "Port %u: queue_size is 0 after rounding\n", port_id);
+		return -EINVAL;
+	}
+
+	/* Translate the selected flow attributes into template attributes */
+	for (i = 0; i < MAX_ATTRS_NUM; i++) {
+		if (flow_attrs[i] == 0)
+			break;
+		if (flow_attrs[i] & INGRESS)
+			pt_attr.ingress = 1;
+		else if (flow_attrs[i] & EGRESS)
+			pt_attr.egress = 1;
+		else if (flow_attrs[i] & TRANSFER)
+			pt_attr.transfer = 1;
+	}
+	/* Enable relaxed matching for better performance */
+	pt_attr.relaxed_matching = 1;
+
+	memset(pattern, 0, sizeof(pattern));
+	memset(actions, 0, sizeof(actions));
+	memset(action_masks, 0, sizeof(action_masks));
+
+	/* Fill templates and gather per-item/action sizes */
+	fill_items_template(pattern, flow_items, 0, 0, item_spec_sizes, &n_items);
+
+	at_attr.ingress = pt_attr.ingress;
+	at_attr.egress = pt_attr.egress;
+	at_attr.transfer = pt_attr.transfer;
+
+	fill_actions_template(actions, action_masks, flow_actions, &port_attr,
+			      &need_wire_orig_table, action_conf_sizes, &n_actions);
+
+	/*
+	 * fill_actions_template() counts how many actions need each kind of
+	 * object, so multiply by the number of rules to get the totals.
+	 */
+	port_attr.nb_counters *= rules_count;
+	port_attr.nb_aging_objects *= rules_count;
+	port_attr.nb_meters *= rules_count;
+	port_attr.nb_conn_tracks *= rules_count;
+	port_attr.nb_quotas *= rules_count;
+
+	table_attr.flow_attr.group = flow_group;
+	table_attr.flow_attr.priority = 0;
+	table_attr.flow_attr.ingress = pt_attr.ingress;
+	table_attr.flow_attr.egress = pt_attr.egress;
+	table_attr.flow_attr.transfer = pt_attr.transfer;
+	table_attr.nb_flows = rules_count;
+
+	if (pt_attr.transfer && need_wire_orig_table)
+		table_attr.specialize = RTE_FLOW_TABLE_SPECIALIZE_TRANSFER_WIRE_ORIG;
+
+	/* Every queue shares the same attributes, so point them all at one */
+	queue_attr_list = calloc(nb_queues, sizeof(*queue_attr_list));
+	if (queue_attr_list == NULL) {
+		fprintf(stderr, "Port %u: failed to allocate queue_attr_list\n", port_id);
+		return -ENOMEM;
+	}
+
+	queue_attr.size = queue_size;
+	for (i = 0; i < nb_queues; i++)
+		queue_attr_list[i] = &queue_attr;
+
+	ret = rte_flow_configure(port_id, &port_attr, nb_queues, queue_attr_list, &error);
+
+	free(queue_attr_list);
+
+	if (ret != 0) {
+		fprintf(stderr, "Port %u: rte_flow_configure failed (ret=%d, type=%d): %s\n",
+			port_id, ret, error.type, error.message ? error.message : "(no message)");
+		return ret;
+	}
+
+	/* Create pattern template */
+	res->pattern_template =
+		rte_flow_pattern_template_create(port_id, &pt_attr, pattern, &error);
+	if (res->pattern_template == NULL) {
+		fprintf(stderr, "Port %u: pattern template create failed: %s\n", port_id,
+			error.message ? error.message : "(no message)");
+		return -1;
+	}
+
+	/* Create actions template */
+	res->actions_template =
+		rte_flow_actions_template_create(port_id, &at_attr, actions, action_masks, &error);
+	if (res->actions_template == NULL) {
+		fprintf(stderr, "Port %u: actions template create failed: %s\n", port_id,
+			error.message ? error.message : "(no message)");
+		ret = -1;
+		goto err_pattern;
+	}
+
+	/* Create template table */
+	res->table = rte_flow_template_table_create(port_id, &table_attr, &res->pattern_template, 1,
+						    &res->actions_template, 1, &error);
+	if (res->table == NULL) {
+		fprintf(stderr, "Port %u: template table create failed: %s\n", port_id,
+			error.message ? error.message : "(no message)");
+		ret = -1;
+		goto err_actions;
+	}
+
+	/* Allocate and pre-initialize per-slot flat buffers */
+	ret = init_slot_pool(res, nb_queues, queue_size, pattern, n_items, item_spec_sizes, actions,
+			     n_actions, action_conf_sizes);
+	if (ret != 0) {
+		fprintf(stderr, "Port %u: slot pool init failed\n", port_id);
+		goto err_table;
+	}
+
+	res->table_capacity = rules_count;
+	res->initialized = true;
+
+	printf(":: Port %u: Async flow engine initialized (queues=%u, queue_size=%u)\n", port_id,
+	       nb_queues, queue_size);
+
+	return 0;
+
+	/* Unwind in reverse creation order: table, actions, pattern */
+err_table:
+	rte_flow_template_table_destroy(port_id, res->table, &error);
+	res->table = NULL;
+err_actions:
+	rte_flow_actions_template_destroy(port_id, res->actions_template, &error);
+	res->actions_template = NULL;
+err_pattern:
+	rte_flow_pattern_template_destroy(port_id, res->pattern_template, &error);
+	res->pattern_template = NULL;
+	return ret;
+}
+
+/*
+ * Enqueue the creation of one flow rule on an async queue.
+ *
+ * Takes the next pre-built slot of the queue's ring, patches the per-flow
+ * item and action values for this counter into it, and hands the slot to
+ * rte_flow_async_create(). With postpone set the operation is only queued
+ * until a later push.
+ *
+ * Returns the flow handle from rte_flow_async_create(), or NULL with *error
+ * filled in when the port or queue is invalid or the port has not been
+ * initialized for async mode.
+ */
+struct rte_flow *
+async_generate_flow(uint16_t port_id, uint32_t queue_id, uint32_t counter, uint16_t hairpinq,
+		    uint64_t encap_data, uint64_t decap_data, uint16_t dst_port, uint8_t core_idx,
+		    uint8_t rx_queues_count, bool unique_data, bool postpone,
+		    struct rte_flow_error *error)
+{
+	const struct rte_flow_op_attr op_attr = {
+		.postpone = postpone,
+	};
+	struct async_flow_resources *res = NULL;
+	struct async_flow_queue *ring;
+	struct rte_flow_item *slot_items;
+	struct rte_flow_action *slot_actions;
+	const char *reject = NULL;
+	uint32_t slot_idx, items_bytes;
+	uint8_t *slot_base;
+
+	/* Validate port, init state and queue; first failing check wins */
+	if (port_id >= MAX_PORTS) {
+		reject = "Invalid port ID";
+	} else {
+		res = &port_resources[port_id];
+		if (!res->initialized)
+			reject = "Async flow resources not initialized";
+		else if (queue_id >= res->nb_queues)
+			reject = "Invalid queue ID";
+	}
+	if (reject != NULL) {
+		rte_flow_error_set(error, EINVAL, RTE_FLOW_ERROR_TYPE_UNSPECIFIED, NULL, reject);
+		return NULL;
+	}
+
+	/* Take the slot at the ring head and advance the head (power-of-2 wrap) */
+	ring = &res->queues[queue_id];
+	slot_idx = ring->head;
+	ring->head = (slot_idx + 1) & (res->slots_per_queue - 1);
+	slot_base = ring->slots + (size_t)slot_idx * res->slot_size;
+
+	/* A slot starts with the items array, directly followed by the actions */
+	items_bytes = res->n_items * sizeof(struct rte_flow_item);
+	slot_items = (struct rte_flow_item *)slot_base;
+	slot_actions = (struct rte_flow_action *)(slot_base + items_bytes);
+
+	/* Only the values that differ from flow to flow are rewritten */
+	update_item_values(slot_items, counter);
+	update_action_values(slot_actions, counter, hairpinq, encap_data, decap_data, core_idx,
+			     unique_data, rx_queues_count, dst_port);
+
+	return rte_flow_async_create(port_id, queue_id, &op_attr, res->table, slot_items, 0,
+				     slot_actions, 0, NULL, error);
+}
+
+/*
+ * Release everything async_flow_init_port() set up for a port.
+ *
+ * Pending completions are drained from every configured queue (flows are
+ * enqueued on one queue per core, not only on queue 0), then the table and
+ * both templates are destroyed and the slot memory is freed. Does nothing
+ * for an out-of-range or uninitialized port.
+ */
+void
+async_flow_cleanup_port(uint16_t port_id)
+{
+	enum { DRAIN_BURST = 64 }; /* completions fetched per pull */
+	struct async_flow_resources *res;
+	struct rte_flow_error error = {0};
+	struct rte_flow_op_result results[DRAIN_BURST];
+	uint32_t queue_id;
+	int ret, i;
+
+	if (port_id >= MAX_PORTS)
+		return;
+
+	res = &port_resources[port_id];
+	if (!res->initialized)
+		return;
+
+	/*
+	 * Drain any pending async completions. Best effort: push/pull errors
+	 * are ignored and each queue is polled a bounded number of times.
+	 */
+	for (queue_id = 0; queue_id < res->nb_queues; queue_id++) {
+		for (i = 0; i < DRAIN_MAX_ITERATIONS; i++) {
+			rte_flow_push(port_id, queue_id, &error);
+			ret = rte_flow_pull(port_id, queue_id, results, DRAIN_BURST, &error);
+			if (ret <= 0)
+				break;
+		}
+	}
+
+	/* Destroy in reverse creation order: table, actions, pattern */
+	if (res->table != NULL) {
+		rte_flow_template_table_destroy(port_id, res->table, &error);
+		res->table = NULL;
+	}
+
+	if (res->actions_template != NULL) {
+		rte_flow_actions_template_destroy(port_id, res->actions_template, &error);
+		res->actions_template = NULL;
+	}
+
+	if (res->pattern_template != NULL) {
+		rte_flow_pattern_template_destroy(port_id, res->pattern_template, &error);
+		res->pattern_template = NULL;
+	}
+
+	free(res->queues);
+	res->queues = NULL;
+	free(res->slot_pool);
+	res->slot_pool = NULL;
+	free(res->shared_masks);
+	res->shared_masks = NULL;
+
+	res->initialized = false;
+}
diff --git a/app/test-flow-perf/async_flow.h b/app/test-flow-perf/async_flow.h
new file mode 100644
index 0000000000..8c12924bc6
--- /dev/null
+++ b/app/test-flow-perf/async_flow.h
@@ -0,0 +1,54 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright 2026 Maxime Peim <maxime.peim at gmail.com>
+ *
+ * This file contains the async flow API related definitions
+ * and function declarations.
+ */
+
+#ifndef FLOW_PERF_ASYNC_FLOW
+#define FLOW_PERF_ASYNC_FLOW
+
+#include <rte_flow.h>
+#include <stdbool.h>
+#include <stdint.h>
+
+#include "config.h"
+
+/*
+ * Per-queue slot ring: remembers which pre-built slot the next flow
+ * creation on this queue will use.
+ */
+struct async_flow_queue {
+	uint8_t *slots; /* start of this queue's slots; slot i is at slots + i * slot_size */
+	uint32_t head;	/* index of the next slot to use; wraps with a slots_per_queue - 1 mask */
+};
+
+/*
+ * Per-port async flow resources: the template objects created at init time
+ * plus the slot memory used to build each flow's items and actions.
+ */
+struct async_flow_resources {
+	struct rte_flow_pattern_template *pattern_template;
+	struct rte_flow_actions_template *actions_template;
+	struct rte_flow_template_table *table;
+	uint8_t *slot_pool;    /* flat buffer pool for all slots */
+	uint8_t *shared_masks; /* shared item mask data (one copy for all slots) */
+	struct async_flow_queue *queues; /* one slot ring per flow queue */
+	uint32_t slot_size;	  /* bytes per slot (cache-line aligned) */
+	uint32_t slots_per_queue; /* = queue_size; used as a wrap mask, so a power of 2 */
+	uint32_t nb_queues; /* number of entries in queues */
+	uint32_t n_items;   /* entries in a slot's items array; the actions array follows it */
+	uint32_t n_actions; /* NOTE(review): confirm whether this includes the END action */
+	uint32_t table_capacity; /* rules_count the template table was created for */
+	bool initialized; /* true between a successful init and cleanup */
+};
+
+/*
+ * Initialize the async flow engine for a port: configure the flow queues,
+ * create the pattern/actions templates and the template table for
+ * rules_count flows, and allocate the per-queue slots.
+ * nb_queues and queue_size are clamped to what the port reports, and
+ * queue_size is rounded down to a power of two.
+ * Returns 0 on success, non-zero on failure.
+ */
+int async_flow_init_port(uint16_t port_id, uint32_t nb_queues, uint32_t queue_size,
+			 uint64_t *flow_items, uint64_t *flow_actions, uint64_t *flow_attrs,
+			 uint8_t flow_group, uint32_t rules_count);
+
+/*
+ * Enqueue one flow rule creation on queue_id using the next pre-allocated
+ * slot of that queue. counter selects the per-flow values patched into the
+ * slot; postpone is passed through as the operation's postpone attribute.
+ * Returns the flow handle, or NULL with *error set on failure.
+ */
+struct rte_flow *async_generate_flow(uint16_t port_id, uint32_t queue_id, uint32_t counter,
+				     uint16_t hairpinq, uint64_t encap_data, uint64_t decap_data,
+				     uint16_t dst_port, uint8_t core_idx, uint8_t rx_queues_count,
+				     bool unique_data, bool postpone, struct rte_flow_error *error);
+
+/*
+ * Destroy the table and templates of a port and free its slot memory.
+ * No-op when the port was never initialized for async mode.
+ */
+void async_flow_cleanup_port(uint16_t port_id);
+
+#endif /* FLOW_PERF_ASYNC_FLOW */
diff --git a/app/test-flow-perf/items_gen.c b/app/test-flow-perf/items_gen.c
index c740e1838f..58f1c16cf8 100644
--- a/app/test-flow-perf/items_gen.c
+++ b/app/test-flow-perf/items_gen.c
@@ -389,3 +389,61 @@ fill_items(struct rte_flow_item *items,
 
 	items[items_counter].type = RTE_FLOW_ITEM_TYPE_END;
 }
+
+/*
+ * Size in bytes of the spec structure that belongs to a pattern item type.
+ * Returns 0 for item types that are not listed in the table below.
+ */
+static size_t
+item_spec_size(enum rte_flow_item_type type)
+{
+	static const struct {
+		enum rte_flow_item_type type;
+		size_t size;
+	} known_specs[] = {
+		{ RTE_FLOW_ITEM_TYPE_ETH, sizeof(struct rte_flow_item_eth) },
+		{ RTE_FLOW_ITEM_TYPE_VLAN, sizeof(struct rte_flow_item_vlan) },
+		{ RTE_FLOW_ITEM_TYPE_IPV4, sizeof(struct rte_flow_item_ipv4) },
+		{ RTE_FLOW_ITEM_TYPE_IPV6, sizeof(struct rte_flow_item_ipv6) },
+		{ RTE_FLOW_ITEM_TYPE_TCP, sizeof(struct rte_flow_item_tcp) },
+		{ RTE_FLOW_ITEM_TYPE_UDP, sizeof(struct rte_flow_item_udp) },
+		{ RTE_FLOW_ITEM_TYPE_VXLAN, sizeof(struct rte_flow_item_vxlan) },
+		{ RTE_FLOW_ITEM_TYPE_VXLAN_GPE, sizeof(struct rte_flow_item_vxlan_gpe) },
+		{ RTE_FLOW_ITEM_TYPE_GRE, sizeof(struct rte_flow_item_gre) },
+		{ RTE_FLOW_ITEM_TYPE_GENEVE, sizeof(struct rte_flow_item_geneve) },
+		{ RTE_FLOW_ITEM_TYPE_GTP, sizeof(struct rte_flow_item_gtp) },
+		{ RTE_FLOW_ITEM_TYPE_META, sizeof(struct rte_flow_item_meta) },
+		{ RTE_FLOW_ITEM_TYPE_TAG, sizeof(struct rte_flow_item_tag) },
+		{ RTE_FLOW_ITEM_TYPE_ICMP, sizeof(struct rte_flow_item_icmp) },
+		{ RTE_FLOW_ITEM_TYPE_ICMP6, sizeof(struct rte_flow_item_icmp6) },
+	};
+	size_t k;
+
+	for (k = 0; k < sizeof(known_specs) / sizeof(known_specs[0]); k++) {
+		if (known_specs[k].type == type)
+			return known_specs[k].size;
+	}
+
+	return 0;
+}
+
+/*
+ * Build the pattern used for an async pattern template.
+ *
+ * The items are produced by fill_items(); every item's spec pointer is then
+ * cleared so that only the masks describe the template.
+ *
+ * spec_sizes, when non-NULL, receives for each item before END the size of
+ * its spec structure (0 for types item_spec_size() does not know).
+ * n_items_out, when non-NULL, receives the item count including END.
+ */
+void
+fill_items_template(struct rte_flow_item *items, uint64_t *flow_items, uint32_t outer_ip_src,
+		    uint8_t core_idx, size_t *spec_sizes, uint32_t *n_items_out)
+{
+	uint32_t count;
+
+	fill_items(items, flow_items, outer_ip_src, core_idx);
+
+	/* Walk the items before END */
+	for (count = 0; items[count].type != RTE_FLOW_ITEM_TYPE_END; count++) {
+		if (spec_sizes != NULL)
+			spec_sizes[count] = item_spec_size(items[count].type);
+		/* For templates, set spec to NULL - only mask matters for template matching */
+		items[count].spec = NULL;
+	}
+
+	/* take END into account */
+	if (n_items_out != NULL)
+		*n_items_out = count + 1;
+}
diff --git a/app/test-flow-perf/items_gen.h b/app/test-flow-perf/items_gen.h
index f4b0e9a981..0987f7be3c 100644
--- a/app/test-flow-perf/items_gen.h
+++ b/app/test-flow-perf/items_gen.h
@@ -15,4 +15,10 @@
 void fill_items(struct rte_flow_item *items, uint64_t *flow_items,
 	uint32_t outer_ip_src, uint8_t core_idx);
 
+/* Fill an items template for the async flow API.
+ * The items are built by fill_items() and each item's spec pointer is then
+ * set to NULL, leaving only the masks to describe the template.
+ * spec_sizes[i] receives the spec structure size of item i (0 when the item
+ * type has no known size); *n_items_out receives the number of items
+ * including the terminating END item.
+ */
+void fill_items_template(struct rte_flow_item *items, uint64_t *flow_items, uint32_t outer_ip_src,
+			 uint8_t core_idx, size_t *spec_sizes, uint32_t *n_items_out);
+
 #endif /* FLOW_PERF_ITEMS_GEN */
diff --git a/app/test-flow-perf/main.c b/app/test-flow-perf/main.c
index 6636d1517f..bf420b397b 100644
--- a/app/test-flow-perf/main.c
+++ b/app/test-flow-perf/main.c
@@ -37,11 +37,15 @@
 #include <rte_mtr.h>
 #include <rte_os_shim.h>
 
-#include "config.h"
 #include "actions_gen.h"
+#include "async_flow.h"
+#include "config.h"
 #include "flow_gen.h"
+#include "rte_build_config.h"
 
 #define MAX_BATCHES_COUNT          100
+#define MAX_ASYNC_QUEUE_SIZE	     (1 << 14)
+#define MAX_PULL_RETRIES	     (1 << 20)
 #define DEFAULT_RULES_COUNT    4000000
 #define DEFAULT_RULES_BATCH     100000
 #define DEFAULT_GROUP                0
@@ -55,7 +59,6 @@
 #define HAIRPIN_TX_CONF_LOCKED_MEMORY (0x0100)
 #define HAIRPIN_TX_CONF_RTE_MEMORY    (0x0200)
 
-struct rte_flow *flow;
 static uint8_t flow_group;
 
 static uint64_t encap_data;
@@ -81,6 +84,9 @@ static bool enable_fwd;
 static bool unique_data;
 static bool policy_mtr;
 static bool packet_mode;
+static bool async_mode;
+static uint32_t async_queue_size = 1024;
+static uint32_t async_push_batch = 256;
 
 static uint8_t rx_queues_count;
 static uint8_t tx_queues_count;
@@ -598,6 +604,29 @@ usage(char *progname)
 		"Encapped data is fixed with pattern: ether,ipv4,udp,vxlan\n"
 		"With fixed values\n");
 	printf("  --vxlan-decap: add vxlan_decap action to flow actions\n");
+
+	printf("\nAsync flow API options:\n");
+	printf("  --async: enable async flow API mode\n");
+	printf("  --async-queue-size=N: size of each async queue,"
+	       " default is 1024\n");
+	printf("  --async-push-batch=N: flows to batch before push,"
+	       " default is 256\n");
+}
+
+/*
+ * Round x down to a power of two: returns x itself when it already is one,
+ * otherwise the largest power of two below x. Returns 0 for x == 0.
+ *
+ * Works for the full uint32_t range; a round-up-then-halve approach would
+ * wrap to 0 for any x above 2^31.
+ */
+static inline uint32_t
+prev_power_of_two(uint32_t x)
+{
+	/* copy the highest set bit into every lower bit position */
+	x |= x >> 1;
+	x |= x >> 2;
+	x |= x >> 4;
+	x |= x >> 8;
+	x |= x >> 16;
+	/* drop everything below the highest set bit */
+	return x - (x >> 1);
 }
 
 static void
@@ -734,6 +763,9 @@ args_parse(int argc, char **argv)
 		{ "policy-mtr",                 1, 0, 0 },
 		{ "meter-profile",              1, 0, 0 },
 		{ "packet-mode",                0, 0, 0 },
+		{ "async",                      0, 0, 0 },
+		{ "async-queue-size",           1, 0, 0 },
+		{ "async-push-batch",           1, 0, 0 },
 		{ 0, 0, 0, 0 },
 	};
 
@@ -913,8 +945,7 @@ args_parse(int argc, char **argv)
 					rte_exit(EXIT_FAILURE, "Invalid hairpin config mask\n");
 				hairpin_conf_mask = hp_conf;
 			}
-			if (strcmp(lgopts[opt_idx].name,
-					"port-id") == 0) {
+			if (strcmp(lgopts[opt_idx].name, "port-id") == 0) {
 				uint16_t port_idx = 0;
 
 				token = strtok(optarg, ",");
@@ -981,6 +1012,26 @@ args_parse(int argc, char **argv)
 			}
 			if (strcmp(lgopts[opt_idx].name, "packet-mode") == 0)
 				packet_mode = true;
+			if (strcmp(lgopts[opt_idx].name, "async") == 0)
+				async_mode = true;
+			if (strcmp(lgopts[opt_idx].name, "async-queue-size") == 0) {
+				n = atoi(optarg);
+				if (n >= MAX_ASYNC_QUEUE_SIZE)
+					async_queue_size = MAX_ASYNC_QUEUE_SIZE;
+				else if (n > 0)
+					async_queue_size = rte_align32prevpow2(n);
+				else
+					rte_exit(EXIT_FAILURE, "async-queue-size should be > 0\n");
+			}
+			if (strcmp(lgopts[opt_idx].name, "async-push-batch") == 0) {
+				n = atoi(optarg);
+				if (n >= MAX_ASYNC_QUEUE_SIZE >> 1)
+					async_push_batch = MAX_ASYNC_QUEUE_SIZE >> 1;
+				else if (n > 0)
+					async_push_batch = rte_align32prevpow2(n);
+				else
+					rte_exit(EXIT_FAILURE, "async-push-batch should be > 0\n");
+			}
 			break;
 		default:
 			usage(argv[0]);
@@ -1457,10 +1508,10 @@ query_flows(int port_id, uint8_t core_id, struct rte_flow **flows_list)
 	mc_pool.flows_record.query[port_id][core_id] = cpu_time_used;
 }
 
-static struct rte_flow **
-insert_flows(int port_id, uint8_t core_id, uint16_t dst_port_id)
+static void
+insert_flows(int port_id, uint8_t core_id, uint16_t dst_port_id, struct rte_flow **flows_list)
 {
-	struct rte_flow **flows_list;
+	struct rte_flow *flow;
 	struct rte_flow_error error;
 	clock_t start_batch, end_batch;
 	double first_flow_latency;
@@ -1485,8 +1536,7 @@ insert_flows(int port_id, uint8_t core_id, uint16_t dst_port_id)
 	global_items[0] = FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_ETH);
 	global_actions[0] = FLOW_ITEM_MASK(RTE_FLOW_ACTION_TYPE_JUMP);
 
-	flows_list = rte_zmalloc("flows_list",
-		(sizeof(struct rte_flow *) * (rules_count_per_core + 1)), 0);
+	flows_list = malloc(sizeof(struct rte_flow *) * (rules_count_per_core + 1));
 	if (flows_list == NULL)
 		rte_exit(EXIT_FAILURE, "No Memory available!\n");
 
@@ -1524,6 +1574,11 @@ insert_flows(int port_id, uint8_t core_id, uint16_t dst_port_id)
 			core_id, rx_queues_count,
 			unique_data, max_priority, &error);
 
+		if (!flow) {
+			print_flow_error(error);
+			rte_exit(EXIT_FAILURE, "Error in creating flow\n");
+		}
+
 		if (!counter) {
 			first_flow_latency = (double) (rte_get_timer_cycles() - start_batch);
 			first_flow_latency /= rte_get_timer_hz();
@@ -1537,11 +1592,6 @@ insert_flows(int port_id, uint8_t core_id, uint16_t dst_port_id)
 		if (force_quit)
 			counter = end_counter;
 
-		if (!flow) {
-			print_flow_error(error);
-			rte_exit(EXIT_FAILURE, "Error in creating flow\n");
-		}
-
 		flows_list[flow_index++] = flow;
 
 		/*
@@ -1575,7 +1625,204 @@ insert_flows(int port_id, uint8_t core_id, uint16_t dst_port_id)
 		port_id, core_id, rules_count_per_core, cpu_time_used);
 
 	mc_pool.flows_record.insertion[port_id][core_id] = cpu_time_used;
-	return flows_list;
+}
+
+/* Number of pushes issued so far by each core during the current insertion */
+static uint32_t push_counter[RTE_MAX_LCORE];
+
+/*
+ * Push the operations queued on a flow queue and collect completions.
+ *
+ * enqueued is the number of operations still awaiting a completion. With
+ * empty set all of them are pulled; otherwise at most async_push_batch are.
+ * The very first non-final push of a core returns right after the push
+ * without pulling anything. With check_op_status set every pulled result
+ * must report RTE_FLOW_OP_SUCCESS.
+ *
+ * Returns the number of completions pulled, the rte_flow_push() error code
+ * if the push failed, or -1 (with *error set by this function on timeout or
+ * failed operation status) on a pull failure.
+ */
+static inline int
+push_pull_flows_async(int port_id, int queue_id, int core_id, uint32_t enqueued, bool empty,
+		      bool check_op_status, struct rte_flow_error *error)
+{
+	static struct rte_flow_op_result results[RTE_MAX_LCORE][MAX_ASYNC_QUEUE_SIZE];
+	struct rte_flow_op_result *core_results = results[core_id];
+	uint32_t remaining;
+	uint32_t total_pulled = 0;
+	uint32_t idle_polls = 0;
+	int nb_done, k;
+	int rc;
+
+	if (empty || async_push_batch > enqueued)
+		remaining = enqueued;
+	else
+		remaining = async_push_batch;
+
+	/* Hand the queued operations to the hardware */
+	rc = rte_flow_push(port_id, queue_id, error);
+	if (rc)
+		return rc;
+	push_counter[core_id]++;
+
+	/* After the first push nothing is pulled yet, unless this is the final drain */
+	if (!empty && push_counter[core_id] == 1)
+		return 0;
+
+	while (remaining > 0) {
+		nb_done = rte_flow_pull(port_id, queue_id, core_results, remaining, error);
+		if (nb_done < 0)
+			return -1;
+
+		if (nb_done == 0) {
+			/* Nothing completed yet: spin, but give up eventually */
+			if (++idle_polls > MAX_PULL_RETRIES) {
+				rte_flow_error_set(error, ETIMEDOUT,
+						   RTE_FLOW_ERROR_TYPE_UNSPECIFIED, NULL,
+						   "Timeout waiting for async completions");
+				return -1;
+			}
+			rte_pause();
+			continue;
+		}
+
+		idle_polls = 0;
+		remaining -= nb_done;
+		total_pulled += nb_done;
+
+		if (check_op_status) {
+			for (k = 0; k < nb_done; k++) {
+				if (core_results[k].status == RTE_FLOW_OP_SUCCESS)
+					continue;
+				rte_flow_error_set(error, EINVAL, RTE_FLOW_ERROR_TYPE_UNSPECIFIED,
+						   NULL, "Some flow rule insertion failed");
+				return -1;
+			}
+		}
+	}
+
+	return total_pulled;
+}
+
+/*
+ * Insert this core's share of the rules on a port through the async flow API.
+ *
+ * Rules are enqueued with postpone set in bursts of async_push_batch on the
+ * queue whose id equals core_id; after each burst the queue is pushed and
+ * completions are pulled. Created flow handles are stored in flows_list,
+ * which is provided by the caller (flows_handler() allocates
+ * rules_count / cores_count + 1 entries, the extra one for the group-0 jump
+ * rule created by core 0 when flow_group > 0).
+ *
+ * Any creation or push/pull failure terminates the application. When
+ * force_quit is raised the insertion stops after the current flow and the
+ * outstanding completions are still drained.
+ */
+static void
+insert_flows_async(int port_id, uint8_t core_id, uint16_t dst_port_id, struct rte_flow **flows_list)
+{
+	struct rte_flow *flow;
+	struct rte_flow_error error;
+	uint64_t start_batch, end_batch;
+	double first_flow_latency;
+	double cpu_time_used;
+	double insertion_rate;
+	double cpu_time_per_batch[MAX_BATCHES_COUNT] = {0};
+	double delta;
+	double batch_time;
+	uint32_t flow_index;
+	uint32_t counter, batch_counter, start_counter = 0, end_counter;
+	int rules_batch_idx;
+	int rules_count_per_core;
+	uint32_t enqueued = 0;
+	uint32_t queue_id = core_id;
+	bool first_batch = true;
+	int pulled;
+
+	rules_count_per_core = rules_count / mc_pool.cores_count;
+
+	/* Never keep more than half a queue in flight between pulls */
+	if (async_push_batch > async_queue_size >> 1)
+		async_push_batch = async_queue_size >> 1;
+
+	/* Set boundaries of rules for each core. */
+	if (core_id)
+		start_counter = core_id * rules_count_per_core;
+	end_counter = (core_id + 1) * rules_count_per_core;
+
+	cpu_time_used = 0;
+	flow_index = 0;
+	push_counter[core_id] = 0;
+
+	if (flow_group > 0 && core_id == 0) {
+		/*
+		 * Create global rule to jump into flow_group,
+		 * this way the app will avoid the default rules.
+		 *
+		 * This rule will be created only once.
+		 *
+		 * Global rule:
+		 * group 0 eth / end actions jump group <flow_group>
+		 */
+
+		uint64_t global_items[MAX_ITEMS_NUM] = {0};
+		uint64_t global_actions[MAX_ACTIONS_NUM] = {0};
+		global_items[0] = FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_ETH);
+		global_actions[0] = FLOW_ITEM_MASK(RTE_FLOW_ACTION_TYPE_JUMP);
+		flow = generate_flow(port_id, 0, flow_attrs, global_items, global_actions,
+				     flow_group, 0, 0, 0, 0, dst_port_id, core_id, rx_queues_count,
+				     unique_data, max_priority, &error);
+
+		if (flow == NULL) {
+			print_flow_error(error);
+			rte_exit(EXIT_FAILURE, "Error in creating flow\n");
+		}
+		flows_list[flow_index++] = flow;
+	}
+
+	start_batch = rte_get_timer_cycles();
+	for (counter = start_counter; counter < end_counter;) {
+		/* batch adding flow rules, this avoids unnecessary checks for push/pull */
+		for (batch_counter = 0; batch_counter < async_push_batch && counter < end_counter;
+		     batch_counter++, counter++) {
+			/* Create flow with postpone=true to batch operations */
+			flow = async_generate_flow(port_id, queue_id, counter, hairpin_queues_num,
+						   encap_data, decap_data, dst_port_id, core_id,
+						   rx_queues_count, unique_data, true, &error);
+
+			if (!flow) {
+				print_flow_error(error);
+				rte_exit(EXIT_FAILURE, "Error in creating async flow\n");
+			}
+
+			/* The operation is queued: record it so it gets pulled and destroyed */
+			flows_list[flow_index++] = flow;
+			enqueued++;
+
+			if (force_quit) {
+				/*
+				 * break skips the counter increment, so end the
+				 * outer loop explicitly instead of re-entering it.
+				 */
+				counter = end_counter;
+				break;
+			}
+
+			/*
+			 * Save the insertion rate for rules batch.
+			 * Check if the insertion reached the rules
+			 * batch counter, then save the insertion rate
+			 * for this batch.
+			 */
+			if (!((counter + 1) % rules_batch)) {
+				end_batch = rte_get_timer_cycles();
+				delta = (double)(end_batch - start_batch);
+				rules_batch_idx = ((counter + 1) / rules_batch) - 1;
+				batch_time = delta / rte_get_timer_hz();
+				/* Never write past the per-batch statistics array */
+				if (rules_batch_idx < MAX_BATCHES_COUNT)
+					cpu_time_per_batch[rules_batch_idx] = batch_time;
+				cpu_time_used += batch_time;
+				start_batch = rte_get_timer_cycles();
+			}
+		}
+
+		pulled = push_pull_flows_async(port_id, queue_id, core_id, enqueued, false, true,
+					       &error);
+		if (pulled < 0) {
+			print_flow_error(error);
+			rte_exit(EXIT_FAILURE, "Error push/pull async operations\n");
+		}
+
+		enqueued -= pulled;
+
+		if (first_batch) {
+			first_flow_latency = (double)(rte_get_timer_cycles() - start_batch);
+			first_flow_latency /= rte_get_timer_hz();
+			/* In millisecond */
+			first_flow_latency *= 1000;
+			printf(":: First Flow Batch Latency (Async) :: Port %d :: First batch (%u) "
+			       "installed in %f milliseconds\n",
+			       port_id, async_push_batch, first_flow_latency);
+			first_batch = false;
+		}
+	}
+
+	/* Final drain: wait for every operation that is still in flight */
+	if (push_pull_flows_async(port_id, queue_id, core_id, enqueued, true, true, &error) < 0) {
+		print_flow_error(error);
+		rte_exit(EXIT_FAILURE, "Error final push/pull async operations\n");
+	}
+
+	/* Print insertion rates for all batches */
+	if (dump_iterations)
+		print_rules_batches(cpu_time_per_batch);
+
+	printf(":: Port %d :: Core %d boundaries (Async) :: start @[%u] - end @[%u]\n", port_id,
+	       core_id, start_counter, end_counter - 1);
+
+	/* Insertion rate for all rules in one core */
+	if (cpu_time_used > 0) {
+		insertion_rate = ((double)rules_count_per_core / cpu_time_used) / 1000;
+		printf(":: Port %d :: Core %d :: Async rules insertion rate -> %f K Rule/Sec\n",
+		       port_id, core_id, insertion_rate);
+	}
+	printf(":: Port %d :: Core %d :: The time for creating %d async rules is %f seconds\n",
+	       port_id, core_id, rules_count_per_core, cpu_time_used);
+
+	mc_pool.flows_record.insertion[port_id][core_id] = cpu_time_used;
 }
 
 static void
@@ -1585,12 +1832,18 @@ flows_handler(uint8_t core_id)
 	uint16_t port_idx = 0;
 	uint16_t nr_ports;
 	int port_id;
+	int rules_count_per_core;
 
 	nr_ports = rte_eth_dev_count_avail();
 
 	if (rules_batch > rules_count)
 		rules_batch = rules_count;
 
+	rules_count_per_core = rules_count / mc_pool.cores_count;
+	flows_list = malloc(sizeof(struct rte_flow *) * (rules_count_per_core + 1));
+	if (flows_list == NULL)
+		rte_exit(EXIT_FAILURE, "No Memory available!\n");
+
 	printf(":: Rules Count per port: %d\n\n", rules_count);
 
 	for (port_id = 0; port_id < nr_ports; port_id++) {
@@ -1602,10 +1855,10 @@ flows_handler(uint8_t core_id)
 		mc_pool.last_alloc[core_id] = (int64_t)dump_socket_mem(stdout);
 		if (has_meter())
 			meters_handler(port_id, core_id, METER_CREATE);
-		flows_list = insert_flows(port_id, core_id,
-						dst_ports[port_idx++]);
-		if (flows_list == NULL)
-			rte_exit(EXIT_FAILURE, "Error: Insertion Failed!\n");
+		if (async_mode)
+			insert_flows_async(port_id, core_id, dst_ports[port_idx++], flows_list);
+		else
+			insert_flows(port_id, core_id, dst_ports[port_idx++], flows_list);
 		mc_pool.current_alloc[core_id] = (int64_t)dump_socket_mem(stdout);
 
 		if (query_flag)
@@ -2212,6 +2465,16 @@ init_port(void)
 			}
 		}
 
+		/* Configure async flow engine before device start */
+		if (async_mode) {
+			ret = async_flow_init_port(port_id, mc_pool.cores_count, async_queue_size,
+						   flow_items, flow_actions, flow_attrs, flow_group,
+						   rules_count);
+			if (ret != 0)
+				rte_exit(EXIT_FAILURE, "Failed to init async flow on port %d\n",
+					 port_id);
+		}
+
 		ret = rte_eth_dev_start(port_id);
 		if (ret < 0)
 			rte_exit(EXIT_FAILURE,
@@ -2291,6 +2554,8 @@ main(int argc, char **argv)
 
 	RTE_ETH_FOREACH_DEV(port) {
 		rte_flow_flush(port, &error);
+		if (async_mode)
+			async_flow_cleanup_port(port);
 		if (rte_eth_dev_stop(port) != 0)
 			printf("Failed to stop device on port %u\n", port);
 		rte_eth_dev_close(port);
diff --git a/app/test-flow-perf/meson.build b/app/test-flow-perf/meson.build
index e101449e32..70d8671a54 100644
--- a/app/test-flow-perf/meson.build
+++ b/app/test-flow-perf/meson.build
@@ -1,8 +1,15 @@
 # SPDX-License-Identifier: BSD-3-Clause
 # Copyright(c) 2020 Mellanox Technologies, Ltd
 
+if is_windows
+    build = false
+    reason = 'not supported on Windows'
+    subdir_done()
+endif
+
 sources = files(
         'actions_gen.c',
+        'async_flow.c',
         'flow_gen.c',
         'items_gen.c',
         'main.c',
-- 
2.43.0



More information about the dev mailing list