<div dir="ltr">Hi Stephen,<br><br>Thanks for the review!<br>Some of the AI-generated comments pointed at real issues, but they were already present before my changes (the global flow variable, for example).<div>I updated my changes to fix those as well.</div><div><br></div><div>Let me know if any other changes are needed.</div><div>Also, I have made an improvement by pre-allocating a memory pool for async flows:</div><div> - it avoids having every in-flight flow in the queue point to the same items/actions, as happened with static variables</div><div> - it allows better locality and cache usage</div><div>With this change I was able to double the number of flows I can insert on a BF3.</div></div><br><div class="gmail_quote gmail_quote_container"><div dir="ltr" class="gmail_attr">On Mon, Mar 2, 2026 at 12:29 AM Maxime Peim <<a href="mailto:maxime.peim@gmail.com">maxime.peim@gmail.com</a>> wrote:<br></div><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex">Add async flow API mode to test-flow-perf application for improved<br>
flow rule insertion performance. The async API allows batching flow<br>
rule creation operations and processing completions in bulk, reducing<br>
per-rule overhead.<br>
<br>
New command line options:<br>
  --async: enable async flow API mode<br>
  --async-queue-size=N: size of async queues (default: 1024)<br>
  --async-push-batch=N: flows to batch before push (default: 256)<br>
<br>
Signed-off-by: Maxime Peim <<a href="mailto:maxime.peim@gmail.com" target="_blank">maxime.peim@gmail.com</a>><br>
---<br>
v2:<br>
  - Replace per-flow stack allocation with pre-allocated slot pool;<br>
    flat buffers are initialized once at init time and the hot path<br>
    only patches per-flow item/action values into a pre-set slot<br>
  - Fix alloca misuse: use heap allocation for queue_attr_list, round<br>
    queue_size to power of 2 for bitmask wrapping, add bounds checks<br>
  - Fix race on file-scope flow variable, premature latency<br>
    measurement, and integer division in rate calculation<br>
  - Drop unrelated lgopts reformatting<br>
  - Use malloc instead of rte_zmalloc for non-dataplane allocations<br>
  - Various robustness and style fixes<br>
<br>
 app/test-flow-perf/actions_gen.c | 281 +++++++++++-<br>
 app/test-flow-perf/actions_gen.h |  31 ++<br>
 app/test-flow-perf/async_flow.c  | 761 +++++++++++++++++++++++++++++++<br>
 app/test-flow-perf/async_flow.h  |  54 +++<br>
 app/test-flow-perf/items_gen.c   |  58 +++<br>
 app/test-flow-perf/items_gen.h   |   6 +<br>
 app/test-flow-perf/main.c        | 302 +++++++++++-<br>
 app/test-flow-perf/meson.build   |   1 +<br>
 8 files changed, 1454 insertions(+), 40 deletions(-)<br>
 create mode 100644 app/test-flow-perf/async_flow.c<br>
 create mode 100644 app/test-flow-perf/async_flow.h<br>
<br>
diff --git a/app/test-flow-perf/actions_gen.c b/app/test-flow-perf/actions_gen.c<br>
index 9d102e3af4..2b8edd50c8 100644<br>
--- a/app/test-flow-perf/actions_gen.c<br>
+++ b/app/test-flow-perf/actions_gen.c<br>
@@ -36,27 +36,7 @@ struct additional_para {<br>
        bool unique_data;<br>
 };<br>
<br>
-/* Storage for struct rte_flow_action_raw_encap including external data. */<br>
-struct action_raw_encap_data {<br>
-       struct rte_flow_action_raw_encap conf;<br>
-       uint8_t data[128];<br>
-       uint8_t preserve[128];<br>
-       uint16_t idx;<br>
-};<br>
-<br>
-/* Storage for struct rte_flow_action_raw_decap including external data. */<br>
-struct action_raw_decap_data {<br>
-       struct rte_flow_action_raw_decap conf;<br>
-       uint8_t data[128];<br>
-       uint16_t idx;<br>
-};<br>
-<br>
-/* Storage for struct rte_flow_action_rss including external data. */<br>
-struct action_rss_data {<br>
-       struct rte_flow_action_rss conf;<br>
-       uint8_t key[40];<br>
-       uint16_t queue[128];<br>
-};<br>
+/* Compound action data structs defined in actions_gen.h */<br>
<br>
 static void<br>
 add_mark(struct rte_flow_action *actions,<br>
@@ -1165,3 +1145,262 @@ fill_actions(struct rte_flow_action *actions, uint64_t *flow_actions,<br>
        free(queues);<br>
        free(hairpin_queues);<br>
 }<br>
+<br>
+static size_t<br>
+action_conf_size(enum rte_flow_action_type type)<br>
+{<br>
+       switch (type) {<br>
+       case RTE_FLOW_ACTION_TYPE_MARK:<br>
+               return sizeof(struct rte_flow_action_mark);<br>
+       case RTE_FLOW_ACTION_TYPE_QUEUE:<br>
+               return sizeof(struct rte_flow_action_queue);<br>
+       case RTE_FLOW_ACTION_TYPE_JUMP:<br>
+               return sizeof(struct rte_flow_action_jump);<br>
+       case RTE_FLOW_ACTION_TYPE_RSS:<br>
+               return sizeof(struct action_rss_data);<br>
+       case RTE_FLOW_ACTION_TYPE_SET_META:<br>
+               return sizeof(struct rte_flow_action_set_meta);<br>
+       case RTE_FLOW_ACTION_TYPE_SET_TAG:<br>
+               return sizeof(struct rte_flow_action_set_tag);<br>
+       case RTE_FLOW_ACTION_TYPE_PORT_ID:<br>
+               return sizeof(struct rte_flow_action_port_id);<br>
+       case RTE_FLOW_ACTION_TYPE_COUNT:<br>
+               return sizeof(struct rte_flow_action_count);<br>
+       case RTE_FLOW_ACTION_TYPE_SET_MAC_SRC:<br>
+       case RTE_FLOW_ACTION_TYPE_SET_MAC_DST:<br>
+               return sizeof(struct rte_flow_action_set_mac);<br>
+       case RTE_FLOW_ACTION_TYPE_SET_IPV4_SRC:<br>
+       case RTE_FLOW_ACTION_TYPE_SET_IPV4_DST:<br>
+               return sizeof(struct rte_flow_action_set_ipv4);<br>
+       case RTE_FLOW_ACTION_TYPE_SET_IPV6_SRC:<br>
+       case RTE_FLOW_ACTION_TYPE_SET_IPV6_DST:<br>
+               return sizeof(struct rte_flow_action_set_ipv6);<br>
+       case RTE_FLOW_ACTION_TYPE_SET_TP_SRC:<br>
+       case RTE_FLOW_ACTION_TYPE_SET_TP_DST:<br>
+               return sizeof(struct rte_flow_action_set_tp);<br>
+       case RTE_FLOW_ACTION_TYPE_INC_TCP_ACK:<br>
+       case RTE_FLOW_ACTION_TYPE_DEC_TCP_ACK:<br>
+       case RTE_FLOW_ACTION_TYPE_INC_TCP_SEQ:<br>
+       case RTE_FLOW_ACTION_TYPE_DEC_TCP_SEQ:<br>
+               return sizeof(rte_be32_t);<br>
+       case RTE_FLOW_ACTION_TYPE_SET_TTL:<br>
+               return sizeof(struct rte_flow_action_set_ttl);<br>
+       case RTE_FLOW_ACTION_TYPE_SET_IPV4_DSCP:<br>
+       case RTE_FLOW_ACTION_TYPE_SET_IPV6_DSCP:<br>
+               return sizeof(struct rte_flow_action_set_dscp);<br>
+       case RTE_FLOW_ACTION_TYPE_METER:<br>
+               return sizeof(struct rte_flow_action_meter);<br>
+       case RTE_FLOW_ACTION_TYPE_RAW_ENCAP:<br>
+               return sizeof(struct action_raw_encap_data);<br>
+       case RTE_FLOW_ACTION_TYPE_RAW_DECAP:<br>
+               return sizeof(struct action_raw_decap_data);<br>
+       case RTE_FLOW_ACTION_TYPE_VXLAN_ENCAP:<br>
+               return sizeof(struct rte_flow_action_vxlan_encap) +<br>
+                      5 * sizeof(struct rte_flow_item) + sizeof(struct rte_flow_item_eth) +<br>
+                      sizeof(struct rte_flow_item_ipv4) + sizeof(struct rte_flow_item_udp) +<br>
+                      sizeof(struct rte_flow_item_vxlan);<br>
+       case RTE_FLOW_ACTION_TYPE_MODIFY_FIELD:<br>
+               return sizeof(struct rte_flow_action_modify_field);<br>
+       /* Zero-conf types */<br>
+       case RTE_FLOW_ACTION_TYPE_DROP:<br>
+       case RTE_FLOW_ACTION_TYPE_FLAG:<br>
+       case RTE_FLOW_ACTION_TYPE_DEC_TTL:<br>
+       case RTE_FLOW_ACTION_TYPE_VXLAN_DECAP:<br>
+               return 0;<br>
+       default:<br>
+               return 0;<br>
+       }<br>
+}<br>
+<br>
+void<br>
+fill_actions_template(struct rte_flow_action *actions, struct rte_flow_action *masks,<br>
+                     uint64_t *flow_actions, struct rte_flow_port_attr *port_attr,<br>
+                     bool *need_wire_orig_table, size_t *conf_sizes, uint32_t *n_actions_out)<br>
+{<br>
+       uint8_t actions_counter = 0;<br>
+       uint8_t i, j;<br>
+<br>
+       *need_wire_orig_table = false;<br>
+       memset(port_attr, 0, sizeof(*port_attr));<br>
+<br>
+       /* Static configurations for actions that need them in templates */<br>
+       static struct rte_flow_action_mark mark_conf = {<br>
+               .id = 1,<br>
+       };<br>
+       static struct rte_flow_action_queue queue_conf = {<br>
+               .index = 0,<br>
+       };<br>
+       static struct rte_flow_action_port_id port_id_conf = {<br>
+               .id = 0,<br>
+       };<br>
+       static struct rte_flow_action_jump jump_conf = {<br>
+               .group = 1,<br>
+       };<br>
+       static struct rte_flow_action_modify_field set_meta_conf = {<br>
+               .operation = RTE_FLOW_MODIFY_SET,<br>
+               .dst = {.field = RTE_FLOW_FIELD_META},<br>
+               .src =<br>
+                       {<br>
+                               .field = RTE_FLOW_FIELD_VALUE,<br>
+                               .value = {0, 0, 0, META_DATA},<br>
+                       },<br>
+               .width = 32,<br>
+       };<br>
+<br>
+       /* Static mask configurations for each action type */<br>
+       static struct rte_flow_action_mark mark_mask = {<br>
+               .id = UINT32_MAX,<br>
+       };<br>
+       static struct rte_flow_action_queue queue_mask = {<br>
+               .index = UINT16_MAX,<br>
+       };<br>
+       static struct rte_flow_action_jump jump_mask = {<br>
+               .group = UINT32_MAX,<br>
+       };<br>
+       static struct rte_flow_action_rss rss_mask = {<br>
+               .level = UINT32_MAX,<br>
+               .types = UINT64_MAX,<br>
+       };<br>
+       static struct rte_flow_action_set_meta set_meta_mask = {<br>
+               .data = UINT32_MAX,<br>
+               .mask = UINT32_MAX,<br>
+       };<br>
+       static struct rte_flow_action_set_tag set_tag_mask = {<br>
+               .data = UINT32_MAX,<br>
+               .mask = UINT32_MAX,<br>
+               .index = UINT8_MAX,<br>
+       };<br>
+       static struct rte_flow_action_port_id port_id_mask = {<br>
+               .id = UINT32_MAX,<br>
+       };<br>
+       static struct rte_flow_action_count count_mask;<br>
+       static struct rte_flow_action_set_mac set_mac_mask = {<br>
+               .mac_addr = {0xff, 0xff, 0xff, 0xff, 0xff, 0xff},<br>
+       };<br>
+       static struct rte_flow_action_set_ipv4 set_ipv4_mask = {<br>
+               .ipv4_addr = UINT32_MAX,<br>
+       };<br>
+       static struct rte_flow_action_set_ipv6 set_ipv6_mask = {<br>
+               .ipv6_addr.a = {0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,<br>
+                               0xff, 0xff, 0xff, 0xff, 0xff}};<br>
+       static struct rte_flow_action_set_tp set_tp_mask = {<br>
+               .port = UINT16_MAX,<br>
+       };<br>
+       static rte_be32_t tcp_seq_ack_mask = UINT32_MAX;<br>
+       static struct rte_flow_action_set_ttl set_ttl_mask = {<br>
+               .ttl_value = UINT8_MAX,<br>
+       };<br>
+       static struct rte_flow_action_set_dscp set_dscp_mask = {<br>
+               .dscp = UINT8_MAX,<br>
+       };<br>
+       static struct rte_flow_action_meter meter_mask = {<br>
+               .mtr_id = UINT32_MAX,<br>
+       };<br>
+<br>
+       static const struct {<br>
+               uint64_t flow_mask;<br>
+               enum rte_flow_action_type type;<br>
+               const void *action_conf;<br>
+               const void *action_mask;<br>
+               const bool need_wire_orig_table;<br>
+       } template_actions[] = {<br>
+               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_MARK), RTE_FLOW_ACTION_TYPE_MARK, &mark_conf,<br>
+                &mark_mask, true},<br>
+               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_COUNT), RTE_FLOW_ACTION_TYPE_COUNT, NULL,<br>
+                &count_mask, false},<br>
+               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_MODIFY_FIELD),<br>
+                RTE_FLOW_ACTION_TYPE_MODIFY_FIELD, &set_meta_conf, &set_meta_mask, false},<br>
+               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_TAG), RTE_FLOW_ACTION_TYPE_SET_TAG, NULL,<br>
+                &set_tag_mask, false},<br>
+               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_FLAG), RTE_FLOW_ACTION_TYPE_FLAG, NULL, NULL,<br>
+                false},<br>
+               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_MAC_SRC),<br>
+                RTE_FLOW_ACTION_TYPE_SET_MAC_SRC, NULL, &set_mac_mask, false},<br>
+               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_MAC_DST),<br>
+                RTE_FLOW_ACTION_TYPE_SET_MAC_DST, NULL, &set_mac_mask, false},<br>
+               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV4_SRC),<br>
+                RTE_FLOW_ACTION_TYPE_SET_IPV4_SRC, NULL, &set_ipv4_mask, false},<br>
+               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV4_DST),<br>
+                RTE_FLOW_ACTION_TYPE_SET_IPV4_DST, NULL, &set_ipv4_mask, false},<br>
+               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV6_SRC),<br>
+                RTE_FLOW_ACTION_TYPE_SET_IPV6_SRC, NULL, &set_ipv6_mask, false},<br>
+               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV6_DST),<br>
+                RTE_FLOW_ACTION_TYPE_SET_IPV6_DST, NULL, &set_ipv6_mask, false},<br>
+               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_TP_SRC), RTE_FLOW_ACTION_TYPE_SET_TP_SRC,<br>
+                NULL, &set_tp_mask, false},<br>
+               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_TP_DST), RTE_FLOW_ACTION_TYPE_SET_TP_DST,<br>
+                NULL, &set_tp_mask, false},<br>
+               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_INC_TCP_ACK),<br>
+                RTE_FLOW_ACTION_TYPE_INC_TCP_ACK, NULL, &tcp_seq_ack_mask, false},<br>
+               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_DEC_TCP_ACK),<br>
+                RTE_FLOW_ACTION_TYPE_DEC_TCP_ACK, NULL, &tcp_seq_ack_mask, false},<br>
+               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_INC_TCP_SEQ),<br>
+                RTE_FLOW_ACTION_TYPE_INC_TCP_SEQ, NULL, &tcp_seq_ack_mask, false},<br>
+               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_DEC_TCP_SEQ),<br>
+                RTE_FLOW_ACTION_TYPE_DEC_TCP_SEQ, NULL, &tcp_seq_ack_mask, false},<br>
+               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_TTL), RTE_FLOW_ACTION_TYPE_SET_TTL, NULL,<br>
+                &set_ttl_mask, false},<br>
+               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_DEC_TTL), RTE_FLOW_ACTION_TYPE_DEC_TTL, NULL,<br>
+                NULL, false},<br>
+               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV4_DSCP),<br>
+                RTE_FLOW_ACTION_TYPE_SET_IPV4_DSCP, NULL, &set_dscp_mask, false},<br>
+               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV6_DSCP),<br>
+                RTE_FLOW_ACTION_TYPE_SET_IPV6_DSCP, NULL, &set_dscp_mask, false},<br>
+               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_QUEUE), RTE_FLOW_ACTION_TYPE_QUEUE,<br>
+                &queue_conf, &queue_mask, true},<br>
+               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_RSS), RTE_FLOW_ACTION_TYPE_RSS, NULL,<br>
+                &rss_mask, true},<br>
+               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_JUMP), RTE_FLOW_ACTION_TYPE_JUMP, &jump_conf,<br>
+                &jump_mask, false},<br>
+               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_PORT_ID), RTE_FLOW_ACTION_TYPE_PORT_ID,<br>
+                &port_id_conf, &port_id_mask, false},<br>
+               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_DROP), RTE_FLOW_ACTION_TYPE_DROP, NULL, NULL,<br>
+                false},<br>
+               {HAIRPIN_QUEUE_ACTION, RTE_FLOW_ACTION_TYPE_QUEUE, &queue_conf, &queue_mask, false},<br>
+               {HAIRPIN_RSS_ACTION, RTE_FLOW_ACTION_TYPE_RSS, NULL, &rss_mask, false},<br>
+               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_METER), RTE_FLOW_ACTION_TYPE_METER, NULL,<br>
+                &meter_mask, false},<br>
+       };<br>
+<br>
+       for (j = 0; j < MAX_ACTIONS_NUM; j++) {<br>
+               if (flow_actions[j] == 0)<br>
+                       break;<br>
+               for (i = 0; i < RTE_DIM(template_actions); i++) {<br>
+                       if ((flow_actions[j] & template_actions[i].flow_mask) == 0)<br>
+                               continue;<br>
+<br>
+                       switch (template_actions[i].type) {<br>
+                       case RTE_FLOW_ACTION_TYPE_COUNT:<br>
+                               port_attr->nb_counters++;<br>
+                               break;<br>
+                       case RTE_FLOW_ACTION_TYPE_AGE:<br>
+                               port_attr->nb_aging_objects++;<br>
+                               break;<br>
+                       case RTE_FLOW_ACTION_TYPE_METER:<br>
+                               port_attr->nb_meters++;<br>
+                               break;<br>
+                       case RTE_FLOW_ACTION_TYPE_CONNTRACK:<br>
+                               port_attr->nb_conn_tracks++;<br>
+                               break;<br>
+                       case RTE_FLOW_ACTION_TYPE_QUOTA:<br>
+                               port_attr->nb_quotas++;<br>
+                       default:;<br>
+                       }<br>
+<br>
+                       actions[actions_counter].type = template_actions[i].type;<br>
+                       actions[actions_counter].conf = template_actions[i].action_conf;<br>
+                       masks[actions_counter].type = template_actions[i].type;<br>
+                       masks[actions_counter].conf = template_actions[i].action_mask;<br>
+                       conf_sizes[actions_counter] = action_conf_size(template_actions[i].type);<br>
+                       *need_wire_orig_table |= template_actions[i].need_wire_orig_table;<br>
+                       actions_counter++;<br>
+                       break;<br>
+               }<br>
+       }<br>
+<br>
+       actions[actions_counter].type = RTE_FLOW_ACTION_TYPE_END;<br>
+       masks[actions_counter].type = RTE_FLOW_ACTION_TYPE_END;<br>
+<br>
+       /* take END into account */<br>
+       *n_actions_out = actions_counter + 1;<br>
+}<br>
diff --git a/app/test-flow-perf/actions_gen.h b/app/test-flow-perf/actions_gen.h<br>
index 9e13b164f9..3ac0ffed59 100644<br>
--- a/app/test-flow-perf/actions_gen.h<br>
+++ b/app/test-flow-perf/actions_gen.h<br>
@@ -17,9 +17,40 @@<br>
 #define RTE_VXLAN_GPE_UDP_PORT 250<br>
 #define RTE_GENEVE_UDP_PORT 6081<br>
<br>
+/* Compound action data structures (needed by async_flow.c for slot init) */<br>
+<br>
+/* Storage for struct rte_flow_action_raw_encap including external data. */<br>
/* Storage for struct rte_flow_action_raw_encap including external data. */
struct action_raw_encap_data {
	struct rte_flow_action_raw_encap conf; /* conf.data is pointed at data[] (see slot init) */
	uint8_t data[128];     /* flat buffer for the encapsulation header bytes */
	uint8_t preserve[128]; /* presumably referenced by conf.preserve — confirm against generator */
	uint16_t idx;          /* generator bookkeeping index */
};

/* Storage for struct rte_flow_action_raw_decap including external data. */
struct action_raw_decap_data {
	struct rte_flow_action_raw_decap conf; /* conf.data is pointed at data[] (see slot init) */
	uint8_t data[128];                     /* flat buffer for the decapsulation header bytes */
	uint16_t idx;                          /* generator bookkeeping index */
};

/* Storage for struct rte_flow_action_rss including external data. */
struct action_rss_data {
	struct rte_flow_action_rss conf; /* conf.key/conf.queue point at the arrays below */
	uint8_t key[40];                 /* RSS hash key storage */
	uint16_t queue[128];             /* RSS queue index storage */
};
+<br>
 void fill_actions(struct rte_flow_action *actions, uint64_t *flow_actions,<br>
        uint32_t counter, uint16_t next_table, uint16_t hairpinq,<br>
        uint64_t encap_data, uint64_t decap_data, uint8_t core_idx,<br>
        bool unique_data, uint8_t rx_queues_count, uint16_t dst_port);<br>
<br>
+/* Fill actions template for async flow API (types only, no values).<br>
+ * If conf_sizes is non-NULL, populates per-action conf sizes and n_actions_out.<br>
+ */<br>
+void fill_actions_template(struct rte_flow_action *actions, struct rte_flow_action *masks,<br>
+                          uint64_t *flow_actions, struct rte_flow_port_attr *port_attr,<br>
+                          bool *need_wire_orig_table, size_t *conf_sizes, uint32_t *n_actions_out);<br>
+<br>
 #endif /* FLOW_PERF_ACTION_GEN */<br>
diff --git a/app/test-flow-perf/async_flow.c b/app/test-flow-perf/async_flow.c<br>
new file mode 100644<br>
index 0000000000..ae5a922856<br>
--- /dev/null<br>
+++ b/app/test-flow-perf/async_flow.c<br>
@@ -0,0 +1,761 @@<br>
+/* SPDX-License-Identifier: BSD-3-Clause<br>
+ * Copyright 2026 Maxime Peim <<a href="mailto:maxime.peim@gmail.com" target="_blank">maxime.peim@gmail.com</a>><br>
+ *<br>
+ * This file contains the async flow API implementation<br>
+ * for the flow-perf application.<br>
+ */<br>
+<br>
+#include <stdio.h><br>
+#include <stdlib.h><br>
+#include <string.h><br>
+<br>
+#include <rte_bitops.h><br>
+#include <rte_common.h><br>
+#include <rte_ethdev.h><br>
+#include <rte_flow.h><br>
+#include <rte_vxlan.h><br>
+<br>
+#include "actions_gen.h"<br>
+#include "async_flow.h"<br>
+#include "flow_gen.h"<br>
+#include "items_gen.h"<br>
+<br>
+/* Max iterations when draining pending async completions during cleanup */<br>
+#define DRAIN_MAX_ITERATIONS 100<br>
+<br>
+/* Per-port async flow resources */<br>
+static struct async_flow_resources port_resources[MAX_PORTS];<br>
+<br>
+/*<br>
+ * Initialize compound action types within a pre-allocated slot.<br>
+ * Called once per slot during pool init to set up internal pointers<br>
+ * for RSS, RAW_ENCAP, RAW_DECAP and VXLAN_ENCAP actions.<br>
+ */<br>
/*
 * Initialize compound action types within a pre-allocated slot.
 * Called once per slot during pool init to set up internal pointers
 * for RSS, RAW_ENCAP, RAW_DECAP and VXLAN_ENCAP actions.
 *
 * Each actions[i].conf already points into the slot's private data
 * area (set by the caller); entries with a zero conf size are skipped.
 * The casts through uintptr_t drop the const qualifier of .conf, which
 * is safe here because the conf storage lives in the writable slot pool.
 *
 * @param actions            slot-local action array
 * @param n_actions          number of entries in @actions to scan
 * @param action_conf_sizes  per-action conf sizes; 0 = nothing to set up
 */
static void
init_slot_compound_actions(struct rte_flow_action *actions, uint32_t n_actions,
			   const size_t *action_conf_sizes)
{
	uint32_t i;

	for (i = 0; i < n_actions; i++) {
		if (action_conf_sizes[i] == 0)
			continue;

		switch (actions[i].type) {
		case RTE_FLOW_ACTION_TYPE_RSS: {
			/* Wire conf.key/conf.queue to the arrays embedded in the slot. */
			struct action_rss_data *rss =
				(struct action_rss_data *)(uintptr_t)actions[i].conf;
			rss->conf.func = RTE_ETH_HASH_FUNCTION_DEFAULT;
			rss->conf.level = 0;
			rss->conf.types = GET_RSS_HF();
			rss->conf.key_len = sizeof(rss->key);
			rss->conf.key = rss->key;
			rss->conf.queue = rss->queue;
			rss->key[0] = 1; /* non-zero first key byte; rest stays zero */
			break;
		}
		case RTE_FLOW_ACTION_TYPE_RAW_ENCAP: {
			/* Point conf.data at the slot-embedded flat buffer. */
			struct action_raw_encap_data *encap =
				(struct action_raw_encap_data *)(uintptr_t)actions[i].conf;
			encap->conf.data = encap->data;
			break;
		}
		case RTE_FLOW_ACTION_TYPE_RAW_DECAP: {
			/* Point conf.data at the slot-embedded flat buffer. */
			struct action_raw_decap_data *decap =
				(struct action_raw_decap_data *)(uintptr_t)actions[i].conf;
			decap->conf.data = decap->data;
			break;
		}
		case RTE_FLOW_ACTION_TYPE_VXLAN_ENCAP: {
			/*
			 * Layout within the conf area:
			 *   struct rte_flow_action_vxlan_encap
			 *   struct rte_flow_item[5]
			 *   struct rte_flow_item_eth
			 *   struct rte_flow_item_ipv4
			 *   struct rte_flow_item_udp
			 *   struct rte_flow_item_vxlan
			 *
			 * NOTE(review): assumes the conf area's 8-byte alignment
			 * satisfies all of these structs — confirm on targets
			 * with stricter alignment requirements.
			 */
			uint8_t *base = (uint8_t *)(uintptr_t)actions[i].conf;
			struct rte_flow_action_vxlan_encap *ve =
				(struct rte_flow_action_vxlan_encap *)base;
			struct rte_flow_item *items =
				(struct rte_flow_item
					 *)(base + sizeof(struct rte_flow_action_vxlan_encap));
			uint8_t *data = (uint8_t *)(items + 5);

			/* Carve the per-header spec structs out of the data area. */
			struct rte_flow_item_eth *item_eth = (struct rte_flow_item_eth *)data;
			data += sizeof(struct rte_flow_item_eth);
			struct rte_flow_item_ipv4 *item_ipv4 = (struct rte_flow_item_ipv4 *)data;
			data += sizeof(struct rte_flow_item_ipv4);
			struct rte_flow_item_udp *item_udp = (struct rte_flow_item_udp *)data;
			data += sizeof(struct rte_flow_item_udp);
			struct rte_flow_item_vxlan *item_vxlan = (struct rte_flow_item_vxlan *)data;

			memset(item_eth, 0, sizeof(*item_eth));
			memset(item_ipv4, 0, sizeof(*item_ipv4));
			memset(item_udp, 0, sizeof(*item_udp));
			memset(item_vxlan, 0, sizeof(*item_vxlan));

			/* Fixed outer-header values for the encap definition. */
			item_ipv4->hdr.src_addr = RTE_IPV4(127, 0, 0, 1);
			item_ipv4->hdr.version_ihl = RTE_IPV4_VHL_DEF;
			item_udp->hdr.dst_port = RTE_BE16(RTE_VXLAN_DEFAULT_PORT);
			item_vxlan->hdr.vni[2] = 1;

			/* spec == mask per item; the encap definition reads spec. */
			items[0].type = RTE_FLOW_ITEM_TYPE_ETH;
			items[0].spec = item_eth;
			items[0].mask = item_eth;
			items[1].type = RTE_FLOW_ITEM_TYPE_IPV4;
			items[1].spec = item_ipv4;
			items[1].mask = item_ipv4;
			items[2].type = RTE_FLOW_ITEM_TYPE_UDP;
			items[2].spec = item_udp;
			items[2].mask = item_udp;
			items[3].type = RTE_FLOW_ITEM_TYPE_VXLAN;
			items[3].spec = item_vxlan;
			items[3].mask = item_vxlan;
			items[4].type = RTE_FLOW_ITEM_TYPE_END;

			ve->definition = items;
			break;
		}
		default:
			break;
		}
	}
}
+<br>
+/*<br>
+ * Allocate and pre-initialize all per-slot flat buffers.<br>
+ * Returns 0 on success.<br>
+ */<br>
+static int<br>
+init_slot_pool(struct async_flow_resources *res, uint32_t nb_queues, uint32_t queue_size,<br>
+              const struct rte_flow_item *pattern, uint32_t n_items, const size_t *item_spec_sizes,<br>
+              const struct rte_flow_action *template_actions, uint32_t n_actions,<br>
+              const size_t *action_conf_sizes)<br>
+{<br>
+       uint32_t items_array_bytes, actions_array_bytes;<br>
+       uint32_t spec_data_bytes, conf_data_bytes, mask_data_bytes;<br>
+       uint32_t slot_size, num_slots;<br>
+       uint32_t s, i;<br>
+       uint8_t *mptr;<br>
+<br>
+       /* Compute shared mask size */<br>
+       mask_data_bytes = 0;<br>
+       for (i = 0; i < n_items; i++)<br>
+               mask_data_bytes += RTE_ALIGN_CEIL(item_spec_sizes[i], 8);<br>
+<br>
+       /* specs and masks have the same size */<br>
+       spec_data_bytes = mask_data_bytes;<br>
+<br>
+       conf_data_bytes = 0;<br>
+       for (i = 0; i < n_actions; i++)<br>
+               conf_data_bytes += RTE_ALIGN_CEIL(action_conf_sizes[i], 8);<br>
+<br>
+       /* Compute per-slot layout sizes (+ 1 for END sentinel) */<br>
+       items_array_bytes = n_items * sizeof(struct rte_flow_item);<br>
+       actions_array_bytes = n_actions * sizeof(struct rte_flow_action);<br>
+<br>
+       slot_size = RTE_ALIGN_CEIL(items_array_bytes + actions_array_bytes + spec_data_bytes +<br>
+                                          conf_data_bytes,<br>
+                                  RTE_CACHE_LINE_SIZE);<br>
+<br>
+       num_slots = queue_size * nb_queues;<br>
+<br>
+       /* Store layout info */<br>
+       res->slot_size = slot_size;<br>
+       res->slots_per_queue = queue_size;<br>
+       res->nb_queues = nb_queues;<br>
+       res->n_items = n_items;<br>
+       res->n_actions = n_actions;<br>
+<br>
+       /* Allocate shared masks */<br>
+       if (mask_data_bytes > 0) {<br>
+               res->shared_masks = aligned_alloc(<br>
+                       RTE_CACHE_LINE_SIZE, RTE_ALIGN_CEIL(mask_data_bytes, RTE_CACHE_LINE_SIZE));<br>
+               if (res->shared_masks == NULL) {<br>
+                       fprintf(stderr, "Failed to allocate shared masks (%u bytes)\n",<br>
+                               mask_data_bytes);<br>
+                       return -ENOMEM;<br>
+               }<br>
+               memset(res->shared_masks, 0, mask_data_bytes);<br>
+<br>
+               /* Copy mask data from template pattern */<br>
+               mptr = res->shared_masks;<br>
+               for (i = 0; i < n_items; i++) {<br>
+                       if (item_spec_sizes[i] > 0 && pattern[i].mask != NULL)<br>
+                               memcpy(mptr, pattern[i].mask, item_spec_sizes[i]);<br>
+                       mptr += RTE_ALIGN_CEIL(item_spec_sizes[i], 8);<br>
+               }<br>
+       }<br>
+<br>
+       /* Allocate per-slot pool */<br>
+       /* slot_size is already cache-line aligned, so total is a multiple */<br>
+       res->slot_pool = aligned_alloc(RTE_CACHE_LINE_SIZE, (size_t)num_slots * slot_size);<br>
+       if (res->slot_pool == NULL) {<br>
+               fprintf(stderr, "Failed to allocate slot pool (%u slots * %u bytes)\n", num_slots,<br>
+                       slot_size);<br>
+               free(res->shared_masks);<br>
+               res->shared_masks = NULL;<br>
+               return -ENOMEM;<br>
+       }<br>
+       memset(res->slot_pool, 0, (size_t)num_slots * slot_size);<br>
+<br>
+       /* Pre-initialize every slot */<br>
+       for (s = 0; s < num_slots; s++) {<br>
+               uint8_t *slot = res->slot_pool + (size_t)s * slot_size;<br>
+               struct rte_flow_item *items = (struct rte_flow_item *)slot;<br>
+               struct rte_flow_action *actions =<br>
+                       (struct rte_flow_action *)(slot + items_array_bytes);<br>
+               uint8_t *data = slot + items_array_bytes + actions_array_bytes;<br>
+<br>
+               /* Pre-set items: spec → per-slot data, mask → shared masks */<br>
+               mptr = res->shared_masks;<br>
+               for (i = 0; i < n_items; i++) {<br>
+                       items[i].type = pattern[i].type;<br>
+                       if (item_spec_sizes[i] > 0) {<br>
+                               items[i].spec = data;<br>
+                               items[i].mask = mptr;<br>
+                               data += RTE_ALIGN_CEIL(item_spec_sizes[i], 8);<br>
+                               mptr += RTE_ALIGN_CEIL(item_spec_sizes[i], 8);<br>
+                       }<br>
+               }<br>
+               items[n_items].type = RTE_FLOW_ITEM_TYPE_END;<br>
+<br>
+               /* Pre-set actions: conf → per-slot data */<br>
+               for (i = 0; i < n_actions; i++) {<br>
+                       actions[i].type = template_actions[i].type;<br>
+                       if (action_conf_sizes[i] > 0) {<br>
+                               actions[i].conf = data;<br>
+                               data += RTE_ALIGN_CEIL(action_conf_sizes[i], 8);<br>
+                       }<br>
+               }<br>
+               actions[n_actions].type = RTE_FLOW_ACTION_TYPE_END;<br>
+<br>
+               /* Initialize compound action types (RSS, RAW_ENCAP, etc.) */<br>
+               init_slot_compound_actions(actions, n_actions, action_conf_sizes);<br>
+       }<br>
+<br>
+       /* Allocate and initialize per-queue slot tracking */<br>
+       res->queues = aligned_alloc(<br>
+               RTE_CACHE_LINE_SIZE,<br>
+               RTE_ALIGN_CEIL(nb_queues * sizeof(struct async_flow_queue), RTE_CACHE_LINE_SIZE));<br>
+       if (res->queues == NULL) {<br>
+               fprintf(stderr, "Failed to allocate queue structs (%u queues)\n", nb_queues);<br>
+               free(res->slot_pool);<br>
+               res->slot_pool = NULL;<br>
+               free(res->shared_masks);<br>
+               res->shared_masks = NULL;<br>
+               return -ENOMEM;<br>
+       }<br>
+       memset(res->queues, 0, nb_queues * sizeof(struct async_flow_queue));<br>
+       for (s = 0; s < nb_queues; s++) {<br>
+               res->queues[s].slots = res->slot_pool + (size_t)s * queue_size * slot_size;<br>
+               res->queues[s].head = 0;<br>
+       }<br>
+<br>
+       printf(":: Slot pool: %u slots * %u bytes = %u KB (shared masks: %u bytes)\n", num_slots,<br>
+              slot_size, (num_slots * slot_size) / 1024, mask_data_bytes);<br>
+<br>
+       return 0;<br>
+}<br>
+<br>
+/*<br>
+ * Hot-path: update per-flow item values through pre-set pointers.<br>
+ * Only IPV4/IPV6 src_addr varies per flow (based on counter).<br>
+ * Spec pointers were wired into per-slot storage at init time.<br>
+ */<br>
+static void<br>
+update_item_values(struct rte_flow_item *items, uint32_t counter)<br>
+{<br>
+       uint8_t i;<br>
+<br>
+       for (i = 0; items[i].type != RTE_FLOW_ITEM_TYPE_END; i++) {<br>
+               switch (items[i].type) {<br>
+               case RTE_FLOW_ITEM_TYPE_IPV4:<br>
+                       ((struct rte_flow_item_ipv4 *)(uintptr_t)items[i].spec)->hdr.src_addr =<br>
+                               RTE_BE32(counter);<br>
+                       break;<br>
+               case RTE_FLOW_ITEM_TYPE_IPV6: {<br>
+                       struct rte_flow_item_ipv6 *spec =<br>
+                               (struct rte_flow_item_ipv6 *)(uintptr_t)items[i].spec;<br>
+                       uint8_t j;<br>
+                       for (j = 0; j < 4; j++) /* counter big-endian in the 4 low address bytes */<br>
+                               spec->hdr.src_addr.a[15 - j] = counter >> (j * 8);<br>
+                       break;<br>
+               }<br>
+               default:<br>
+                       break;<br>
+               }<br>
+       }<br>
+}<br>
+<br>
+/*<br>
+ * Hot-path: update per-flow action values through pre-set pointers.<br>
+ * Conf pointers were wired into per-slot storage at init time;<br>
+ * only the per-flow varying fields are patched here.<br>
+ */<br>
+static void<br>
+update_action_values(struct rte_flow_action *actions, uint32_t counter, uint16_t hairpinq,<br>
+                    uint64_t encap_data, uint64_t decap_data, __rte_unused uint8_t core_idx,<br>
+                    bool unique_data, uint8_t rx_queues_count, uint16_t dst_port)<br>
+{<br>
+       uint8_t i;<br>
+<br>
+       for (i = 0; actions[i].type != RTE_FLOW_ACTION_TYPE_END; i++) {<br>
+               switch (actions[i].type) {<br>
+               case RTE_FLOW_ACTION_TYPE_MARK:<br>
+                       ((struct rte_flow_action_mark *)(uintptr_t)actions[i].conf)->id =<br>
+                               (counter % 255) + 1; /* keep mark id non-zero: [1, 255] */<br>
+                       break;<br>
+               case RTE_FLOW_ACTION_TYPE_QUEUE:<br>
+                       ((struct rte_flow_action_queue *)(uintptr_t)actions[i].conf)->index =<br>
+                               hairpinq ? (counter % hairpinq) + rx_queues_count :<br>
+                                          counter % rx_queues_count;<br>
+                       break;<br>
+               case RTE_FLOW_ACTION_TYPE_METER:<br>
+                       ((struct rte_flow_action_meter *)(uintptr_t)actions[i].conf)->mtr_id =<br>
+                               counter;<br>
+                       break;<br>
+               case RTE_FLOW_ACTION_TYPE_RSS: {<br>
+                       struct action_rss_data *rss =<br>
+                               (struct action_rss_data *)(uintptr_t)actions[i].conf;<br>
+                       uint16_t q;<br>
+                       if (hairpinq) { /* hairpin queues follow the regular Rx queues */<br>
+                               rss->conf.queue_num = hairpinq;<br>
+                               for (q = 0; q < hairpinq; q++)<br>
+                                       rss->queue[q] = q + rx_queues_count;<br>
+                       } else {<br>
+                               rss->conf.queue_num = rx_queues_count;<br>
+                               for (q = 0; q < rx_queues_count; q++)<br>
+                                       rss->queue[q] = q;<br>
+                       }<br>
+                       break;<br>
+               }<br>
+               case RTE_FLOW_ACTION_TYPE_SET_MAC_SRC:<br>
+               case RTE_FLOW_ACTION_TYPE_SET_MAC_DST: {<br>
+                       struct rte_flow_action_set_mac *mac =<br>
+                               (struct rte_flow_action_set_mac *)(uintptr_t)actions[i].conf;<br>
+                       uint32_t val = unique_data ? counter : 1;<br>
+                       uint8_t j;<br>
+                       for (j = 0; j < RTE_ETHER_ADDR_LEN; j++) { /* little-endian byte spread */<br>
+                               mac->mac_addr[j] = val & 0xff;<br>
+                               val >>= 8;<br>
+                       }<br>
+                       break;<br>
+               }<br>
+               case RTE_FLOW_ACTION_TYPE_SET_IPV4_SRC:<br>
+               case RTE_FLOW_ACTION_TYPE_SET_IPV4_DST: {<br>
+                       uint32_t ip = unique_data ? counter : 1;<br>
+                       ((struct rte_flow_action_set_ipv4 *)(uintptr_t)actions[i].conf)->ipv4_addr =<br>
+                               RTE_BE32(ip + 1); /* +1 keeps address non-zero when counter == 0 */<br>
+                       break;<br>
+               }<br>
+               case RTE_FLOW_ACTION_TYPE_SET_IPV6_SRC:<br>
+               case RTE_FLOW_ACTION_TYPE_SET_IPV6_DST: {<br>
+                       struct rte_flow_action_set_ipv6 *v6 =<br>
+                               (struct rte_flow_action_set_ipv6 *)(uintptr_t)actions[i].conf;<br>
+                       uint32_t val = unique_data ? counter : 1;<br>
+                       uint8_t j;<br>
+                       for (j = 0; j < 16; j++) { /* only low 4 bytes carry data; rest zeroed */<br>
+                               v6->ipv6_addr.a[j] = val & 0xff;<br>
+                               val >>= 8;<br>
+                       }<br>
+                       break;<br>
+               }<br>
+               case RTE_FLOW_ACTION_TYPE_SET_TP_SRC: {<br>
+                       uint32_t tp = unique_data ? counter : 100;<br>
+                       tp = tp % 0xffff; /* NOTE(review): SRC wraps via modulo, DST shifts — confirm asymmetry is intended */<br>
+                       ((struct rte_flow_action_set_tp *)(uintptr_t)actions[i].conf)->port =<br>
+                               RTE_BE16(tp & 0xffff);<br>
+                       break;<br>
+               }<br>
+               case RTE_FLOW_ACTION_TYPE_SET_TP_DST: {<br>
+                       uint32_t tp = unique_data ? counter : 100;<br>
+                       if (tp > 0xffff)<br>
+                               tp >>= 16;<br>
+                       ((struct rte_flow_action_set_tp *)(uintptr_t)actions[i].conf)->port =<br>
+                               RTE_BE16(tp & 0xffff);<br>
+                       break;<br>
+               }<br>
+               case RTE_FLOW_ACTION_TYPE_INC_TCP_ACK:<br>
+               case RTE_FLOW_ACTION_TYPE_DEC_TCP_ACK:<br>
+               case RTE_FLOW_ACTION_TYPE_INC_TCP_SEQ:<br>
+               case RTE_FLOW_ACTION_TYPE_DEC_TCP_SEQ: {<br>
+                       uint32_t val = unique_data ? counter : 1;<br>
+                       *(rte_be32_t *)(uintptr_t)actions[i].conf = RTE_BE32(val);<br>
+                       break;<br>
+               }<br>
+               case RTE_FLOW_ACTION_TYPE_SET_TTL: {<br>
+                       uint32_t val = unique_data ? counter : 1;<br>
+                       ((struct rte_flow_action_set_ttl *)(uintptr_t)actions[i].conf)->ttl_value =<br>
+                               val % 0xff;<br>
+                       break;<br>
+               }<br>
+               case RTE_FLOW_ACTION_TYPE_SET_IPV4_DSCP:<br>
+               case RTE_FLOW_ACTION_TYPE_SET_IPV6_DSCP: {<br>
+                       uint32_t val = unique_data ? counter : 1;<br>
+                       ((struct rte_flow_action_set_dscp *)(uintptr_t)actions[i].conf)->dscp =<br>
+                               val % 0xff;<br>
+                       break;<br>
+               }<br>
+               case RTE_FLOW_ACTION_TYPE_PORT_ID:<br>
+                       ((struct rte_flow_action_port_id *)(uintptr_t)actions[i].conf)->id =<br>
+                               dst_port;<br>
+                       break;<br>
+               case RTE_FLOW_ACTION_TYPE_RAW_ENCAP: {<br>
+                       struct action_raw_encap_data *encap =<br>
+                               (struct action_raw_encap_data *)(uintptr_t)actions[i].conf;<br>
+                       uint8_t *header = encap->data;<br>
+                       struct rte_ether_hdr eth_hdr;<br>
+                       struct rte_ipv4_hdr ipv4_hdr;<br>
+                       struct rte_udp_hdr udp_hdr;<br>
+<br>
+                       /* Build the raw header layer by layer from the encap_data bitmask */<br>
+                       memset(&eth_hdr, 0, sizeof(eth_hdr));<br>
+                       if (encap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_ETH)) {<br>
+                               if (encap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_VLAN))<br>
+                                       eth_hdr.ether_type = RTE_BE16(RTE_ETHER_TYPE_VLAN);<br>
+                               else if (encap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_IPV4))<br>
+                                       eth_hdr.ether_type = RTE_BE16(RTE_ETHER_TYPE_IPV4);<br>
+                               else if (encap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_IPV6))<br>
+                                       eth_hdr.ether_type = RTE_BE16(RTE_ETHER_TYPE_IPV6);<br>
+                               memcpy(header, &eth_hdr, sizeof(eth_hdr));<br>
+                               header += sizeof(eth_hdr);<br>
+                       }<br>
+                       if (encap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_IPV4)) {<br>
+                               uint32_t ip_dst = unique_data ? counter : 1;<br>
+                               memset(&ipv4_hdr, 0, sizeof(ipv4_hdr));<br>
+                               ipv4_hdr.src_addr = RTE_IPV4(127, 0, 0, 1);<br>
+                               ipv4_hdr.dst_addr = RTE_BE32(ip_dst);<br>
+                               ipv4_hdr.version_ihl = RTE_IPV4_VHL_DEF;<br>
+                               if (encap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_UDP))<br>
+                                       ipv4_hdr.next_proto_id = 17; /* UDP */<br>
+                               if (encap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_GRE))<br>
+                                       ipv4_hdr.next_proto_id = 47; /* GRE */<br>
+                               memcpy(header, &ipv4_hdr, sizeof(ipv4_hdr));<br>
+                               header += sizeof(ipv4_hdr);<br>
+                       }<br>
+                       if (encap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_UDP)) {<br>
+                               memset(&udp_hdr, 0, sizeof(udp_hdr));<br>
+                               if (encap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_VXLAN))<br>
+                                       udp_hdr.dst_port = RTE_BE16(RTE_VXLAN_DEFAULT_PORT);<br>
+                               memcpy(header, &udp_hdr, sizeof(udp_hdr));<br>
+                               header += sizeof(udp_hdr);<br>
+                       }<br>
+                       encap->conf.size = header - encap->data; /* bytes actually written */<br>
+                       break;<br>
+               }<br>
+               case RTE_FLOW_ACTION_TYPE_RAW_DECAP: {<br>
+                       struct action_raw_decap_data *decap_d =<br>
+                               (struct action_raw_decap_data *)(uintptr_t)actions[i].conf;<br>
+                       uint8_t *header = decap_d->data;<br>
+                       struct rte_ether_hdr eth_hdr;<br>
+<br>
+                       memset(&eth_hdr, 0, sizeof(eth_hdr));<br>
+                       if (decap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_ETH)) {<br>
+                               if (decap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_IPV4))<br>
+                                       eth_hdr.ether_type = RTE_BE16(RTE_ETHER_TYPE_IPV4);<br>
+                               else if (decap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_IPV6))<br>
+                                       eth_hdr.ether_type = RTE_BE16(RTE_ETHER_TYPE_IPV6);<br>
+                               memcpy(header, &eth_hdr, sizeof(eth_hdr));<br>
+                               header += sizeof(eth_hdr);<br>
+                       }<br>
+                       decap_d->conf.size = header - decap_d->data; /* bytes actually written */<br>
+                       break;<br>
+               }<br>
+               case RTE_FLOW_ACTION_TYPE_VXLAN_ENCAP: {<br>
+                       uint8_t *base = (uint8_t *)(uintptr_t)actions[i].conf;<br>
+                       struct rte_flow_item *vitems =<br>
+                               (struct rte_flow_item<br>
+                                        *)(base + sizeof(struct rte_flow_action_vxlan_encap));<br>
+                       uint32_t ip_dst = unique_data ? counter : 1;<br>
+                       /* vitems[1] is IPV4 — layout fixed by init_slot_compound_actions */<br>
+                       ((struct rte_flow_item_ipv4 *)(uintptr_t)vitems[1].spec)->hdr.dst_addr =<br>
+                               RTE_BE32(ip_dst);<br>
+                       break;<br>
+               }<br>
+               default:<br>
+                       break;<br>
+               }<br>
+       }<br>
+}<br>
+<br>
+int<br>
+async_flow_init_port(uint16_t port_id, uint32_t nb_queues, uint32_t queue_size,<br>
+                    uint64_t *flow_items, uint64_t *flow_actions, uint64_t *flow_attrs,<br>
+                    uint8_t flow_group, uint32_t rules_count)<br>
+{<br>
+       struct rte_flow_port_info port_info = {0};<br>
+       struct rte_flow_queue_info queue_info = {0};<br>
+       struct rte_flow_error error = {0};<br>
+       struct rte_flow_port_attr port_attr = {0};<br>
+       struct rte_flow_queue_attr queue_attr; /* only .size set below; zero-init if struct gains fields */<br>
+       const struct rte_flow_queue_attr **queue_attr_list;<br>
+       struct rte_flow_pattern_template_attr pt_attr = {0};<br>
+       struct rte_flow_actions_template_attr at_attr = {0};<br>
+       struct rte_flow_template_table_attr table_attr = {0};<br>
+       struct rte_flow_item pattern[MAX_ITEMS_NUM];<br>
+       struct rte_flow_action actions[MAX_ACTIONS_NUM];<br>
+       struct rte_flow_action action_masks[MAX_ACTIONS_NUM];<br>
+       size_t item_spec_sizes[MAX_ITEMS_NUM];<br>
+       size_t action_conf_sizes[MAX_ACTIONS_NUM];<br>
+       uint32_t n_items, n_actions;<br>
+       struct async_flow_resources *res;<br>
+       bool need_wire_orig_table = false;<br>
+       uint32_t i;<br>
+       int ret;<br>
+<br>
+       if (port_id >= MAX_PORTS)<br>
+               return -1;<br>
+<br>
+       res = &port_resources[port_id];<br>
+       memset(res, 0, sizeof(*res));<br>
+<br>
+       /* Query port flow info */<br>
+       ret = rte_flow_info_get(port_id, &port_info, &queue_info, &error);<br>
+       if (ret != 0) {<br>
+               fprintf(stderr, "Port %u: rte_flow_info_get failed: %s\n", port_id,<br>
+                       error.message ? error.message : "(no message)");<br>
+               return ret;<br>
+       }<br>
+<br>
+       if (port_info.max_nb_queues == 0 || queue_info.max_size == 0) {<br>
+               fprintf(stderr, "Port %u: rte_flow_info_get reports that no queues are supported\n",<br>
+                       port_id);<br>
+               return -1;<br>
+       }<br>
+<br>
+       /* Limit to device capabilities if reported (UINT32_MAX means "unlimited") */<br>
+       if (port_info.max_nb_queues != 0 && port_info.max_nb_queues != UINT32_MAX &&<br>
+           nb_queues > port_info.max_nb_queues)<br>
+               nb_queues = port_info.max_nb_queues;<br>
+       if (queue_info.max_size != 0 && queue_info.max_size != UINT32_MAX &&<br>
+           queue_size > queue_info.max_size)<br>
+               queue_size = queue_info.max_size;<br>
+<br>
+       /* Slot ring uses bitmask wrapping, so queue_size must be power of 2 */<br>
+       queue_size = rte_align32prevpow2(queue_size);<br>
+       if (queue_size == 0) {<br>
+               fprintf(stderr, "Port %u: queue_size is 0 after rounding\n", port_id);<br>
+               return -EINVAL;<br>
+       }<br>
+<br>
+       for (i = 0; i < MAX_ATTRS_NUM; i++) {<br>
+               if (flow_attrs[i] == 0)<br>
+                       break;<br>
+               if (flow_attrs[i] & INGRESS)<br>
+                       pt_attr.ingress = 1;<br>
+               else if (flow_attrs[i] & EGRESS)<br>
+                       pt_attr.egress = 1;<br>
+               else if (flow_attrs[i] & TRANSFER)<br>
+                       pt_attr.transfer = 1;<br>
+       }<br>
+       /* Enable relaxed matching for better performance */<br>
+       pt_attr.relaxed_matching = 1;<br>
+<br>
+       memset(pattern, 0, sizeof(pattern));<br>
+       memset(actions, 0, sizeof(actions));<br>
+       memset(action_masks, 0, sizeof(action_masks));<br>
+<br>
+       /* Fill templates and gather per-item/action sizes */<br>
+       fill_items_template(pattern, flow_items, 0, 0, item_spec_sizes, &n_items);<br>
+<br>
+       at_attr.ingress = pt_attr.ingress;<br>
+       at_attr.egress = pt_attr.egress;<br>
+       at_attr.transfer = pt_attr.transfer;<br>
+<br>
+       fill_actions_template(actions, action_masks, flow_actions, &port_attr,<br>
+                             &need_wire_orig_table, action_conf_sizes, &n_actions);<br>
+<br>
+       /* fill_actions_template counts the number of actions that require each kind of object,<br>
+        * so multiply by the number of rules to get the correct totals. */<br>
+       port_attr.nb_counters *= rules_count;<br>
+       port_attr.nb_aging_objects *= rules_count;<br>
+       port_attr.nb_meters *= rules_count;<br>
+       port_attr.nb_conn_tracks *= rules_count;<br>
+       port_attr.nb_quotas *= rules_count;<br>
+<br>
+       table_attr.flow_attr.group = flow_group;<br>
+       table_attr.flow_attr.priority = 0;<br>
+       table_attr.flow_attr.ingress = pt_attr.ingress;<br>
+       table_attr.flow_attr.egress = pt_attr.egress;<br>
+       table_attr.flow_attr.transfer = pt_attr.transfer;<br>
+       table_attr.nb_flows = rules_count;<br>
+<br>
+       if (pt_attr.transfer && need_wire_orig_table)<br>
+               table_attr.specialize = RTE_FLOW_TABLE_SPECIALIZE_TRANSFER_WIRE_ORIG;<br>
+<br>
+       queue_attr_list = malloc(sizeof(*queue_attr_list) * nb_queues);<br>
+       if (queue_attr_list == NULL) {<br>
+               fprintf(stderr, "Port %u: failed to allocate queue_attr_list\n", port_id);<br>
+               return -ENOMEM;<br>
+       }<br>
+<br>
+       queue_attr.size = queue_size; /* one attr shared by every queue */<br>
+       for (i = 0; i < nb_queues; i++)<br>
+               queue_attr_list[i] = &queue_attr;<br>
+<br>
+       ret = rte_flow_configure(port_id, &port_attr, nb_queues, queue_attr_list, &error);<br>
+<br>
+       free(queue_attr_list);<br>
+<br>
+       if (ret != 0) {<br>
+               fprintf(stderr, "Port %u: rte_flow_configure failed (ret=%d, type=%d): %s\n",<br>
+                       port_id, ret, error.type, error.message ? error.message : "(no message)");<br>
+               return ret;<br>
+       }<br>
+<br>
+       /* Create pattern template */<br>
+       res->pattern_template =<br>
+               rte_flow_pattern_template_create(port_id, &pt_attr, pattern, &error);<br>
+       if (res->pattern_template == NULL) {<br>
+               fprintf(stderr, "Port %u: pattern template create failed: %s\n", port_id,<br>
+                       error.message ? error.message : "(no message)");<br>
+               return -1;<br>
+       }<br>
+<br>
+       /* Create actions template */<br>
+       res->actions_template =<br>
+               rte_flow_actions_template_create(port_id, &at_attr, actions, action_masks, &error);<br>
+       if (res->actions_template == NULL) {<br>
+               fprintf(stderr, "Port %u: actions template create failed: %s\n", port_id,<br>
+                       error.message ? error.message : "(no message)");<br>
+               rte_flow_pattern_template_destroy(port_id, res->pattern_template, &error);<br>
+               res->pattern_template = NULL;<br>
+               return -1;<br>
+       }<br>
+<br>
+       /* Create template table */<br>
+       res->table = rte_flow_template_table_create(port_id, &table_attr, &res->pattern_template, 1,<br>
+                                                   &res->actions_template, 1, &error);<br>
+       if (res->table == NULL) {<br>
+               fprintf(stderr, "Port %u: template table create failed: %s\n", port_id,<br>
+                       error.message ? error.message : "(no message)");<br>
+               rte_flow_actions_template_destroy(port_id, res->actions_template, &error);<br>
+               rte_flow_pattern_template_destroy(port_id, res->pattern_template, &error);<br>
+               res->pattern_template = NULL;<br>
+               res->actions_template = NULL;<br>
+               return -1;<br>
+       }<br>
+<br>
+       /* Allocate and pre-initialize per-slot flat buffers */<br>
+       ret = init_slot_pool(res, nb_queues, queue_size, pattern, n_items, item_spec_sizes, actions,<br>
+                            n_actions, action_conf_sizes);<br>
+       if (ret != 0) {<br>
+               fprintf(stderr, "Port %u: slot pool init failed\n", port_id);<br>
+               rte_flow_template_table_destroy(port_id, res->table, &error);<br>
+               rte_flow_actions_template_destroy(port_id, res->actions_template, &error);<br>
+               rte_flow_pattern_template_destroy(port_id, res->pattern_template, &error);<br>
+               res->table = NULL;<br>
+               res->actions_template = NULL;<br>
+               res->pattern_template = NULL;<br>
+               return ret;<br>
+       }<br>
+<br>
+       res->table_capacity = rules_count;<br>
+       res->initialized = true;<br>
+<br>
+       printf(":: Port %u: Async flow engine initialized (queues=%u, queue_size=%u)\n", port_id,<br>
+              nb_queues, queue_size);<br>
+<br>
+       return 0;<br>
+}<br>
+<br>
+struct rte_flow *<br>
+async_generate_flow(uint16_t port_id, uint32_t queue_id, uint32_t counter, uint16_t hairpinq,<br>
+                   uint64_t encap_data, uint64_t decap_data, uint16_t dst_port, uint8_t core_idx,<br>
+                   uint8_t rx_queues_count, bool unique_data, bool postpone,<br>
+                   struct rte_flow_error *error)<br>
+{<br>
+       struct async_flow_resources *res;<br>
+       struct async_flow_queue *q;<br>
+       uint8_t *slot;<br>
+       uint32_t idx, items_array_bytes;<br>
+       struct rte_flow_item *items;<br>
+       struct rte_flow_action *actions;<br>
+       struct rte_flow_op_attr op_attr = {<br>
+               .postpone = postpone,<br>
+       };<br>
+<br>
+       if (port_id >= MAX_PORTS) {<br>
+               rte_flow_error_set(error, EINVAL, RTE_FLOW_ERROR_TYPE_UNSPECIFIED, NULL,<br>
+                                  "Invalid port ID");<br>
+               return NULL;<br>
+       }<br>
+<br>
+       res = &port_resources[port_id];<br>
+       if (!res->initialized) {<br>
+               rte_flow_error_set(error, EINVAL, RTE_FLOW_ERROR_TYPE_UNSPECIFIED, NULL,<br>
+                                  "Async flow resources not initialized");<br>
+               return NULL;<br>
+       }<br>
+<br>
+       if (queue_id >= res->nb_queues) {<br>
+               rte_flow_error_set(error, EINVAL, RTE_FLOW_ERROR_TYPE_UNSPECIFIED, NULL,<br>
+                                  "Invalid queue ID");<br>
+               return NULL;<br>
+       }<br>
+<br>
+       /* Pick the next slot from this queue's ring (slots_per_queue is a power of 2) */<br>
+       q = &res->queues[queue_id];<br>
+       idx = q->head;<br>
+       q->head = (idx + 1) & (res->slots_per_queue - 1);<br>
+       slot = q->slots + (size_t)idx * res->slot_size;<br>
+       items_array_bytes = res->n_items * sizeof(struct rte_flow_item); /* NOTE(review): must match init_slot_pool's slot layout; confirm whether n_items includes the END entry */<br>
+       items = (struct rte_flow_item *)slot;<br>
+       actions = (struct rte_flow_action *)(slot + items_array_bytes);<br>
+<br>
+       /* Update only per-flow varying values */<br>
+       update_item_values(items, counter);<br>
+       update_action_values(actions, counter, hairpinq, encap_data, decap_data, core_idx,<br>
+                            unique_data, rx_queues_count, dst_port);<br>
+<br>
+       return rte_flow_async_create(port_id, queue_id, &op_attr, res->table, items, 0, actions, 0,<br>
+                                    NULL, error);<br>
+}<br>
+<br>
+void<br>
+async_flow_cleanup_port(uint16_t port_id)<br>
+{<br>
+       struct async_flow_resources *res;<br>
+       struct rte_flow_error error;<br>
+       struct rte_flow_op_result results[64];<br>
+       int ret, i;<br>
+<br>
+       if (port_id >= MAX_PORTS)<br>
+               return;<br>
+<br>
+       res = &port_resources[port_id];<br>
+       if (!res->initialized)<br>
+               return;<br>
+<br>
+       /* Drain any pending async completions from flow flush (bounded to avoid spinning) */<br>
+       for (i = 0; i < DRAIN_MAX_ITERATIONS; i++) {<br>
+               rte_flow_push(port_id, 0, &error);<br>
+               ret = rte_flow_pull(port_id, 0, results, 64, &error); /* NOTE(review): only queue 0 is drained; confirm other queues are idle by now */<br>
+               if (ret <= 0)<br>
+                       break;<br>
+       }<br>
+<br>
+       if (res->table != NULL) {<br>
+               rte_flow_template_table_destroy(port_id, res->table, &error);<br>
+               res->table = NULL;<br>
+       }<br>
+<br>
+       if (res->actions_template != NULL) {<br>
+               rte_flow_actions_template_destroy(port_id, res->actions_template, &error);<br>
+               res->actions_template = NULL;<br>
+       }<br>
+<br>
+       if (res->pattern_template != NULL) {<br>
+               rte_flow_pattern_template_destroy(port_id, res->pattern_template, &error);<br>
+               res->pattern_template = NULL;<br>
+       }<br>
+<br>
+       free(res->queues);<br>
+       res->queues = NULL;<br>
+       free(res->slot_pool);<br>
+       res->slot_pool = NULL;<br>
+       free(res->shared_masks);<br>
+       res->shared_masks = NULL;<br>
+<br>
+       res->initialized = false;<br>
+}<br>
diff --git a/app/test-flow-perf/async_flow.h b/app/test-flow-perf/async_flow.h<br>
new file mode 100644<br>
index 0000000000..8c12924bc6<br>
--- /dev/null<br>
+++ b/app/test-flow-perf/async_flow.h<br>
@@ -0,0 +1,54 @@<br>
+/* SPDX-License-Identifier: BSD-3-Clause<br>
+ * Copyright 2026 Maxime Peim <<a href="mailto:maxime.peim@gmail.com" target="_blank">maxime.peim@gmail.com</a>><br>
+ *<br>
+ * This file contains the async flow API related definitions<br>
+ * and function declarations.<br>
+ */<br>
+<br>
+#ifndef FLOW_PERF_ASYNC_FLOW<br>
+#define FLOW_PERF_ASYNC_FLOW<br>
+<br>
+#include <rte_flow.h><br>
+#include <stdbool.h><br>
+#include <stdint.h><br>
+<br>
+#include "config.h"<br>
+<br>
+/* Per-queue slot ring — tracks which slot to use next */<br>
+struct async_flow_queue {<br>
+       uint8_t *slots; /* pointer to this queue's region within slot_pool */<br>
+       uint32_t head;  /* next slot index (wraps mod slots_per_queue) */<br>
+};<br>
+<br>
+/* Per-port async flow resources */<br>
+struct async_flow_resources {<br>
+       struct rte_flow_pattern_template *pattern_template;<br>
+       struct rte_flow_actions_template *actions_template;<br>
+       struct rte_flow_template_table *table;<br>
+       uint8_t *slot_pool;    /* flat buffer pool for all slots */<br>
+       uint8_t *shared_masks; /* shared item mask data (one copy for all slots) */<br>
+       struct async_flow_queue *queues;<br>
+       uint32_t slot_size;       /* bytes per slot (cache-line aligned) */<br>
+       uint32_t slots_per_queue; /* = queue_size */<br>
+       uint32_t nb_queues;<br>
+       uint32_t n_items;   /* item count — NOTE(review): fill_items_template returns count incl. END; confirm whether END is included here */<br>
+       uint32_t n_actions; /* action count (excl. END) */<br>
+       uint32_t table_capacity; /* rules_count the template table was sized for */<br>
+       bool initialized;<br>
+};<br>
+<br>
+/* Initialize async flow engine for a port */<br>
+int async_flow_init_port(uint16_t port_id, uint32_t nb_queues, uint32_t queue_size,<br>
+                        uint64_t *flow_items, uint64_t *flow_actions, uint64_t *flow_attrs,<br>
+                        uint8_t flow_group, uint32_t rules_count);<br>
+<br>
+/* Create a flow rule asynchronously using pre-allocated slot */<br>
+struct rte_flow *async_generate_flow(uint16_t port_id, uint32_t queue_id, uint32_t counter,<br>
+                                    uint16_t hairpinq, uint64_t encap_data, uint64_t decap_data,<br>
+                                    uint16_t dst_port, uint8_t core_idx, uint8_t rx_queues_count,<br>
+                                    bool unique_data, bool postpone, struct rte_flow_error *error);<br>
+<br>
+/* Cleanup async flow resources for a port */<br>
+void async_flow_cleanup_port(uint16_t port_id);<br>
+<br>
+#endif /* FLOW_PERF_ASYNC_FLOW */<br>
diff --git a/app/test-flow-perf/items_gen.c b/app/test-flow-perf/items_gen.c<br>
index c740e1838f..58f1c16cf8 100644<br>
--- a/app/test-flow-perf/items_gen.c<br>
+++ b/app/test-flow-perf/items_gen.c<br>
@@ -389,3 +389,61 @@ fill_items(struct rte_flow_item *items,<br>
<br>
        items[items_counter].type = RTE_FLOW_ITEM_TYPE_END;<br>
 }<br>
+<br>
+static size_t<br>
+item_spec_size(enum rte_flow_item_type type)<br>
+{<br>
+       switch (type) {<br>
+       case RTE_FLOW_ITEM_TYPE_ETH:<br>
+               return sizeof(struct rte_flow_item_eth);<br>
+       case RTE_FLOW_ITEM_TYPE_VLAN:<br>
+               return sizeof(struct rte_flow_item_vlan);<br>
+       case RTE_FLOW_ITEM_TYPE_IPV4:<br>
+               return sizeof(struct rte_flow_item_ipv4);<br>
+       case RTE_FLOW_ITEM_TYPE_IPV6:<br>
+               return sizeof(struct rte_flow_item_ipv6);<br>
+       case RTE_FLOW_ITEM_TYPE_TCP:<br>
+               return sizeof(struct rte_flow_item_tcp);<br>
+       case RTE_FLOW_ITEM_TYPE_UDP:<br>
+               return sizeof(struct rte_flow_item_udp);<br>
+       case RTE_FLOW_ITEM_TYPE_VXLAN:<br>
+               return sizeof(struct rte_flow_item_vxlan);<br>
+       case RTE_FLOW_ITEM_TYPE_VXLAN_GPE:<br>
+               return sizeof(struct rte_flow_item_vxlan_gpe);<br>
+       case RTE_FLOW_ITEM_TYPE_GRE:<br>
+               return sizeof(struct rte_flow_item_gre);<br>
+       case RTE_FLOW_ITEM_TYPE_GENEVE:<br>
+               return sizeof(struct rte_flow_item_geneve);<br>
+       case RTE_FLOW_ITEM_TYPE_GTP:<br>
+               return sizeof(struct rte_flow_item_gtp);<br>
+       case RTE_FLOW_ITEM_TYPE_META:<br>
+               return sizeof(struct rte_flow_item_meta);<br>
+       case RTE_FLOW_ITEM_TYPE_TAG:<br>
+               return sizeof(struct rte_flow_item_tag);<br>
+       case RTE_FLOW_ITEM_TYPE_ICMP:<br>
+               return sizeof(struct rte_flow_item_icmp);<br>
+       case RTE_FLOW_ITEM_TYPE_ICMP6:<br>
+               return sizeof(struct rte_flow_item_icmp6);<br>
+       default:<br>
+               return 0; /* item type carries no per-flow spec storage */<br>
+       }<br>
+}<br>
+<br>
+void<br>
+fill_items_template(struct rte_flow_item *items, uint64_t *flow_items, uint32_t outer_ip_src,<br>
+                   uint8_t core_idx, size_t *spec_sizes, uint32_t *n_items_out)<br>
+{<br>
+       uint32_t count;<br>
+<br>
+       fill_items(items, flow_items, outer_ip_src, core_idx);<br>
+<br>
+       /* Count items before END */<br>
+       for (count = 0; items[count].type != RTE_FLOW_ITEM_TYPE_END; count++) {<br>
+               spec_sizes[count] = item_spec_size(items[count].type);<br>
+               /* For templates, set spec to NULL - only mask matters for template matching */<br>
+               items[count].spec = NULL;<br>
+       }<br>
+<br>
+       /* take END into account */<br>
+       *n_items_out = count + 1;<br>
+}<br>
diff --git a/app/test-flow-perf/items_gen.h b/app/test-flow-perf/items_gen.h<br>
index f4b0e9a981..0987f7be3c 100644<br>
--- a/app/test-flow-perf/items_gen.h<br>
+++ b/app/test-flow-perf/items_gen.h<br>
@@ -15,4 +15,10 @@<br>
 void fill_items(struct rte_flow_item *items, uint64_t *flow_items,<br>
        uint32_t outer_ip_src, uint8_t core_idx);<br>
<br>
+/* Fill items template for async flow API (masks only, no spec values).<br>
+ * If spec_sizes is non-NULL, populates per-item spec sizes and n_items_out.<br>
+ */<br>
+void fill_items_template(struct rte_flow_item *items, uint64_t *flow_items, uint32_t outer_ip_src,<br>
+                        uint8_t core_idx, size_t *spec_sizes, uint32_t *n_items_out);<br>
+<br>
 #endif /* FLOW_PERF_ITEMS_GEN */<br>
diff --git a/app/test-flow-perf/main.c b/app/test-flow-perf/main.c<br>
index 6636d1517f..2c6def95c2 100644<br>
--- a/app/test-flow-perf/main.c<br>
+++ b/app/test-flow-perf/main.c<br>
@@ -37,11 +37,15 @@<br>
 #include <rte_mtr.h><br>
 #include <rte_os_shim.h><br>
<br>
-#include "config.h"<br>
 #include "actions_gen.h"<br>
+#include "async_flow.h"<br>
+#include "config.h"<br>
 #include "flow_gen.h"<br>
+#include "rte_build_config.h"<br>
<br>
 #define MAX_BATCHES_COUNT          100<br>
+#define MAX_ASYNC_QUEUE_SIZE        (1 << 14)<br>
+#define MAX_PULL_RETRIES            (1 << 20)<br>
 #define DEFAULT_RULES_COUNT    4000000<br>
 #define DEFAULT_RULES_BATCH     100000<br>
 #define DEFAULT_GROUP                0<br>
@@ -55,7 +59,6 @@<br>
 #define HAIRPIN_TX_CONF_LOCKED_MEMORY (0x0100)<br>
 #define HAIRPIN_TX_CONF_RTE_MEMORY    (0x0200)<br>
<br>
-struct rte_flow *flow;<br>
 static uint8_t flow_group;<br>
<br>
 static uint64_t encap_data;<br>
@@ -81,6 +84,9 @@ static bool enable_fwd;<br>
 static bool unique_data;<br>
 static bool policy_mtr;<br>
 static bool packet_mode;<br>
+static bool async_mode;<br>
+static uint32_t async_queue_size = 1024;<br>
+static uint32_t async_push_batch = 256;<br>
<br>
 static uint8_t rx_queues_count;<br>
 static uint8_t tx_queues_count;<br>
@@ -598,6 +604,29 @@ usage(char *progname)<br>
                "Encapped data is fixed with pattern: ether,ipv4,udp,vxlan\n"<br>
                "With fixed values\n");<br>
        printf("  --vxlan-decap: add vxlan_decap action to flow actions\n");<br>
+<br>
+       printf("\nAsync flow API options:\n");<br>
+       printf("  --async: enable async flow API mode\n");<br>
+       printf("  --async-queue-size=N: size of each async queue,"<br>
+              " default is 1024\n");<br>
+       printf("  --async-push-batch=N: flows to batch before push,"<br>
+              " default is 256\n");<br>
+}<br>
+<br>
/*
 * Return the largest power of two that is <= x (0 for x == 0).
 *
 * Uses the round-DOWN bit-smearing form: smear the highest set bit
 * rightwards, then keep only the top bit via x - (x >> 1). Unlike the
 * round-up-then-halve trick, this cannot overflow for x > 2^31 (where
 * the increment of the round-up form wraps to 0 and yields a wrong
 * result of 0 instead of 2^31).
 */
static inline uint32_t
prev_power_of_two(uint32_t x)
{
	if (x == 0)
		return 0;
	x |= x >> 1;
	x |= x >> 2;
	x |= x >> 4;
	x |= x >> 8;
	x |= x >> 16;
	/* x is now all-ones below and including the MSB of the input. */
	return x - (x >> 1);
}
<br>
 static void<br>
@@ -734,6 +763,9 @@ args_parse(int argc, char **argv)<br>
                { "policy-mtr",                 1, 0, 0 },<br>
                { "meter-profile",              1, 0, 0 },<br>
                { "packet-mode",                0, 0, 0 },<br>
+               { "async",                      0, 0, 0 },<br>
+               { "async-queue-size",           1, 0, 0 },<br>
+               { "async-push-batch",           1, 0, 0 },<br>
                { 0, 0, 0, 0 },<br>
        };<br>
<br>
@@ -913,8 +945,7 @@ args_parse(int argc, char **argv)<br>
                                        rte_exit(EXIT_FAILURE, "Invalid hairpin config mask\n");<br>
                                hairpin_conf_mask = hp_conf;<br>
                        }<br>
-                       if (strcmp(lgopts[opt_idx].name,<br>
-                                       "port-id") == 0) {<br>
+                       if (strcmp(lgopts[opt_idx].name, "port-id") == 0) {<br>
                                uint16_t port_idx = 0;<br>
<br>
                                token = strtok(optarg, ",");<br>
@@ -981,6 +1012,26 @@ args_parse(int argc, char **argv)<br>
                        }<br>
                        if (strcmp(lgopts[opt_idx].name, "packet-mode") == 0)<br>
                                packet_mode = true;<br>
+                       if (strcmp(lgopts[opt_idx].name, "async") == 0)<br>
+                               async_mode = true;<br>
+                       if (strcmp(lgopts[opt_idx].name, "async-queue-size") == 0) {<br>
+                               n = atoi(optarg);<br>
+                               if (n >= MAX_ASYNC_QUEUE_SIZE)<br>
+                                       async_queue_size = MAX_ASYNC_QUEUE_SIZE;<br>
+                               else if (n > 0)<br>
+                                       async_queue_size = prev_power_of_two(n);<br>
+                               else<br>
+                                       rte_exit(EXIT_FAILURE, "async-queue-size should be > 0\n");<br>
+                       }<br>
+                       if (strcmp(lgopts[opt_idx].name, "async-push-batch") == 0) {<br>
+                               n = atoi(optarg);<br>
+                               if (n >= MAX_ASYNC_QUEUE_SIZE >> 1)<br>
+                                       async_push_batch = MAX_ASYNC_QUEUE_SIZE >> 1;<br>
+                               else if (n > 0)<br>
+                                       async_push_batch = prev_power_of_two(n);<br>
+                               else<br>
+                                       rte_exit(EXIT_FAILURE, "async-push-batch should be > 0\n");<br>
+                       }<br>
                        break;<br>
                default:<br>
                        usage(argv[0]);<br>
@@ -1457,10 +1508,10 @@ query_flows(int port_id, uint8_t core_id, struct rte_flow **flows_list)<br>
        mc_pool.flows_record.query[port_id][core_id] = cpu_time_used;<br>
 }<br>
<br>
-static struct rte_flow **<br>
-insert_flows(int port_id, uint8_t core_id, uint16_t dst_port_id)<br>
+static void<br>
+insert_flows(int port_id, uint8_t core_id, uint16_t dst_port_id, struct rte_flow **flows_list)<br>
 {<br>
-       struct rte_flow **flows_list;<br>
+       struct rte_flow *flow;<br>
        struct rte_flow_error error;<br>
        clock_t start_batch, end_batch;<br>
        double first_flow_latency;<br>
@@ -1485,8 +1536,7 @@ insert_flows(int port_id, uint8_t core_id, uint16_t dst_port_id)<br>
        global_items[0] = FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_ETH);<br>
        global_actions[0] = FLOW_ITEM_MASK(RTE_FLOW_ACTION_TYPE_JUMP);<br>
<br>
-       flows_list = rte_zmalloc("flows_list",<br>
-               (sizeof(struct rte_flow *) * (rules_count_per_core + 1)), 0);<br>
+       flows_list = malloc(sizeof(struct rte_flow *) * (rules_count_per_core + 1));<br>
        if (flows_list == NULL)<br>
                rte_exit(EXIT_FAILURE, "No Memory available!\n");<br>
<br>
@@ -1524,6 +1574,11 @@ insert_flows(int port_id, uint8_t core_id, uint16_t dst_port_id)<br>
                        core_id, rx_queues_count,<br>
                        unique_data, max_priority, &error);<br>
<br>
+               if (!flow) {<br>
+                       print_flow_error(error);<br>
+                       rte_exit(EXIT_FAILURE, "Error in creating flow\n");<br>
+               }<br>
+<br>
                if (!counter) {<br>
                        first_flow_latency = (double) (rte_get_timer_cycles() - start_batch);<br>
                        first_flow_latency /= rte_get_timer_hz();<br>
@@ -1537,11 +1592,6 @@ insert_flows(int port_id, uint8_t core_id, uint16_t dst_port_id)<br>
                if (force_quit)<br>
                        counter = end_counter;<br>
<br>
-               if (!flow) {<br>
-                       print_flow_error(error);<br>
-                       rte_exit(EXIT_FAILURE, "Error in creating flow\n");<br>
-               }<br>
-<br>
                flows_list[flow_index++] = flow;<br>
<br>
                /*<br>
@@ -1575,7 +1625,203 @@ insert_flows(int port_id, uint8_t core_id, uint16_t dst_port_id)<br>
                port_id, core_id, rules_count_per_core, cpu_time_used);<br>
<br>
        mc_pool.flows_record.insertion[port_id][core_id] = cpu_time_used;<br>
-       return flows_list;<br>
+}<br>
+<br>
+static uint32_t push_counter[RTE_MAX_LCORE];<br>
+<br>
+static inline int<br>
+push_pull_flows_async(int port_id, int queue_id, int core_id, uint32_t enqueued, bool empty,<br>
+                     bool check_op_status, struct rte_flow_error *error)<br>
+{<br>
+       static struct rte_flow_op_result results[RTE_MAX_LCORE][MAX_ASYNC_QUEUE_SIZE];<br>
+       uint32_t to_pull = (empty || async_push_batch > enqueued) ? enqueued : async_push_batch;<br>
+       uint32_t pulled_complete = 0;<br>
+       uint32_t retries = 0;<br>
+       int pulled, i;<br>
+       int ret = 0;<br>
+<br>
+       /* Push periodically to give HW work to do */<br>
+       ret = rte_flow_push(port_id, queue_id, error);<br>
+       if (ret)<br>
+               return ret;<br>
+       push_counter[core_id]++;<br>
+<br>
+       /* Check if queue is getting full, if so push and drain completions */<br>
+       if (!empty && push_counter[core_id] == 1)<br>
+               return 0;<br>
+<br>
+       while (to_pull > 0) {<br>
+               pulled = rte_flow_pull(port_id, queue_id, results[core_id], to_pull, error);<br>
+               if (pulled < 0) {<br>
+                       return -1;<br>
+               } else if (pulled == 0) {<br>
+                       if (++retries > MAX_PULL_RETRIES) {<br>
+                               rte_flow_error_set(error, ETIMEDOUT,<br>
+                                                  RTE_FLOW_ERROR_TYPE_UNSPECIFIED, NULL,<br>
+                                                  "Timeout waiting for async completions");<br>
+                               return -1;<br>
+                       }<br>
+                       rte_pause();<br>
+                       continue;<br>
+               }<br>
+               retries = 0;<br>
+<br>
+               to_pull -= pulled;<br>
+               pulled_complete += pulled;<br>
+               if (!check_op_status)<br>
+                       continue;<br>
+<br>
+               for (i = 0; i < pulled; i++) {<br>
+                       if (results[core_id][i].status != RTE_FLOW_OP_SUCCESS) {<br>
+                               rte_flow_error_set(error, EINVAL, RTE_FLOW_ERROR_TYPE_UNSPECIFIED,<br>
+                                                  NULL, "Some flow rule insertion failed");<br>
+                               return -1;<br>
+                       }<br>
+               }<br>
+       }<br>
+<br>
+       return pulled_complete;<br>
+}<br>
+<br>
+static void<br>
+insert_flows_async(int port_id, uint8_t core_id, uint16_t dst_port_id, struct rte_flow **flows_list)<br>
+{<br>
+       struct rte_flow *flow;<br>
+       struct rte_flow_error error;<br>
+       clock_t start_batch, end_batch;<br>
+       double first_flow_latency;<br>
+       double cpu_time_used;<br>
+       double insertion_rate;<br>
+       double cpu_time_per_batch[MAX_BATCHES_COUNT] = {0};<br>
+       double delta;<br>
+       uint32_t flow_index;<br>
+       uint32_t counter, batch_counter, start_counter = 0, end_counter;<br>
+       int rules_batch_idx;<br>
+       int rules_count_per_core;<br>
+       uint32_t enqueued = 0;<br>
+       uint32_t queue_id = core_id;<br>
+       bool first_batch = true;<br>
+       int pulled;<br>
+<br>
+       rules_count_per_core = rules_count / mc_pool.cores_count;<br>
+<br>
+       if (async_push_batch > async_queue_size >> 1)<br>
+               async_push_batch = async_queue_size >> 1;<br>
+<br>
+       /* Set boundaries of rules for each core. */<br>
+       if (core_id)<br>
+               start_counter = core_id * rules_count_per_core;<br>
+       end_counter = (core_id + 1) * rules_count_per_core;<br>
+<br>
+       cpu_time_used = 0;<br>
+       flow_index = 0;<br>
+       push_counter[core_id] = 0;<br>
+<br>
+       if (flow_group > 0 && core_id == 0) {<br>
+               /*<br>
+                * Create global rule to jump into flow_group,<br>
+                * this way the app will avoid the default rules.<br>
+                *<br>
+                * This rule will be created only once.<br>
+                *<br>
+                * Global rule:<br>
+                * group 0 eth / end actions jump group <flow_group><br>
+                */<br>
+<br>
+               uint64_t global_items[MAX_ITEMS_NUM] = {0};<br>
+               uint64_t global_actions[MAX_ACTIONS_NUM] = {0};<br>
+               global_items[0] = FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_ETH);<br>
+               global_actions[0] = FLOW_ITEM_MASK(RTE_FLOW_ACTION_TYPE_JUMP);<br>
+               flow = generate_flow(port_id, 0, flow_attrs, global_items, global_actions,<br>
+                                    flow_group, 0, 0, 0, 0, dst_port_id, core_id, rx_queues_count,<br>
+                                    unique_data, max_priority, &error);<br>
+<br>
+               if (flow == NULL) {<br>
+                       print_flow_error(error);<br>
+                       rte_exit(EXIT_FAILURE, "Error in creating flow\n");<br>
+               }<br>
+               flows_list[flow_index++] = flow;<br>
+       }<br>
+<br>
+       start_batch = rte_get_timer_cycles();<br>
+       for (counter = start_counter; counter < end_counter;) {<br>
+               /* batch adding flow rules, this avoids unnecessary checks for push/pull */<br>
+               for (batch_counter = 0; batch_counter < async_push_batch && counter < end_counter;<br>
+                    batch_counter++, counter++) {<br>
+                       /* Create flow with postpone=true to batch operations */<br>
+                       flow = async_generate_flow(port_id, queue_id, counter, hairpin_queues_num,<br>
+                                                  encap_data, decap_data, dst_port_id, core_id,<br>
+                                                  rx_queues_count, unique_data, true, &error);<br>
+<br>
+                       if (!flow) {<br>
+                               print_flow_error(error);<br>
+                               rte_exit(EXIT_FAILURE, "Error in creating async flow\n");<br>
+                       }<br>
+<br>
+                       if (force_quit)<br>
+                               break;<br>
+<br>
+                       flows_list[flow_index++] = flow;<br>
+                       enqueued++;<br>
+<br>
+                       /*<br>
+                        * Save the insertion rate for rules batch.<br>
+                        * Check if the insertion reached the rules<br>
+                        * patch counter, then save the insertion rate<br>
+                        * for this batch.<br>
+                        */<br>
+                       if (!((counter + 1) % rules_batch)) {<br>
+                               end_batch = rte_get_timer_cycles();<br>
+                               delta = (double)(end_batch - start_batch);<br>
+                               rules_batch_idx = ((counter + 1) / rules_batch) - 1;<br>
+                               cpu_time_per_batch[rules_batch_idx] = delta / rte_get_timer_hz();<br>
+                               cpu_time_used += cpu_time_per_batch[rules_batch_idx];<br>
+                               start_batch = rte_get_timer_cycles();<br>
+                       }<br>
+               }<br>
+<br>
+               if ((pulled = push_pull_flows_async(port_id, queue_id, core_id, enqueued, false,<br>
+                                                   true, &error)) < 0) {<br>
+                       print_flow_error(error);<br>
+                       rte_exit(EXIT_FAILURE, "Error push/pull async operations\n");<br>
+               }<br>
+<br>
+               enqueued -= pulled;<br>
+<br>
+               if (first_batch) {<br>
+                       first_flow_latency = (double)(rte_get_timer_cycles() - start_batch);<br>
+                       first_flow_latency /= rte_get_timer_hz();<br>
+                       /* In millisecond */<br>
+                       first_flow_latency *= 1000;<br>
+                       printf(":: First Flow Batch Latency (Async) :: Port %d :: First batch (%u) "<br>
+                              "installed in %f milliseconds\n",<br>
+                              port_id, async_push_batch, first_flow_latency);<br>
+                       first_batch = false;<br>
+               }<br>
+       }<br>
+<br>
+       if (push_pull_flows_async(port_id, queue_id, core_id, enqueued, true, true, &error) < 0) {<br>
+               print_flow_error(error);<br>
+               rte_exit(EXIT_FAILURE, "Error final push/pull async operations\n");<br>
+       }<br>
+<br>
+       /* Print insertion rates for all batches */<br>
+       if (dump_iterations)<br>
+               print_rules_batches(cpu_time_per_batch);<br>
+<br>
+       printf(":: Port %d :: Core %d boundaries (Async) :: start @[%d] - end @[%d]\n", port_id,<br>
+              core_id, start_counter, end_counter - 1);<br>
+<br>
+       /* Insertion rate for all rules in one core */<br>
+       if (cpu_time_used > 0) {<br>
+               insertion_rate = ((double)rules_count_per_core / cpu_time_used) / 1000;<br>
+               printf(":: Port %d :: Core %d :: Async rules insertion rate -> %f K Rule/Sec\n",<br>
+                      port_id, core_id, insertion_rate);<br>
+       }<br>
+       printf(":: Port %d :: Core %d :: The time for creating %d async rules is %f seconds\n",<br>
+              port_id, core_id, rules_count_per_core, cpu_time_used);<br>
+<br>
+       mc_pool.flows_record.insertion[port_id][core_id] = cpu_time_used;<br>
 }<br>
<br>
 static void<br>
@@ -1585,12 +1831,18 @@ flows_handler(uint8_t core_id)<br>
        uint16_t port_idx = 0;<br>
        uint16_t nr_ports;<br>
        int port_id;<br>
+       int rules_count_per_core;<br>
<br>
        nr_ports = rte_eth_dev_count_avail();<br>
<br>
        if (rules_batch > rules_count)<br>
                rules_batch = rules_count;<br>
<br>
+       rules_count_per_core = rules_count / mc_pool.cores_count;<br>
+       flows_list = malloc(sizeof(struct rte_flow *) * (rules_count_per_core + 1));<br>
+       if (flows_list == NULL)<br>
+               rte_exit(EXIT_FAILURE, "No Memory available!\n");<br>
+<br>
        printf(":: Rules Count per port: %d\n\n", rules_count);<br>
<br>
        for (port_id = 0; port_id < nr_ports; port_id++) {<br>
@@ -1602,10 +1854,10 @@ flows_handler(uint8_t core_id)<br>
                mc_pool.last_alloc[core_id] = (int64_t)dump_socket_mem(stdout);<br>
                if (has_meter())<br>
                        meters_handler(port_id, core_id, METER_CREATE);<br>
-               flows_list = insert_flows(port_id, core_id,<br>
-                                               dst_ports[port_idx++]);<br>
-               if (flows_list == NULL)<br>
-                       rte_exit(EXIT_FAILURE, "Error: Insertion Failed!\n");<br>
+               if (async_mode)<br>
+                       insert_flows_async(port_id, core_id, dst_ports[port_idx++], flows_list);<br>
+               else<br>
+                       insert_flows(port_id, core_id, dst_ports[port_idx++], flows_list);<br>
                mc_pool.current_alloc[core_id] = (int64_t)dump_socket_mem(stdout);<br>
<br>
                if (query_flag)<br>
@@ -2212,6 +2464,16 @@ init_port(void)<br>
                        }<br>
                }<br>
<br>
+               /* Configure async flow engine before device start */<br>
+               if (async_mode) {<br>
+                       ret = async_flow_init_port(port_id, mc_pool.cores_count, async_queue_size,<br>
+                                                  flow_items, flow_actions, flow_attrs, flow_group,<br>
+                                                  rules_count);<br>
+                       if (ret != 0)<br>
+                               rte_exit(EXIT_FAILURE, "Failed to init async flow on port %d\n",<br>
+                                        port_id);<br>
+               }<br>
+<br>
                ret = rte_eth_dev_start(port_id);<br>
                if (ret < 0)<br>
                        rte_exit(EXIT_FAILURE,<br>
@@ -2291,6 +2553,8 @@ main(int argc, char **argv)<br>
<br>
        RTE_ETH_FOREACH_DEV(port) {<br>
                rte_flow_flush(port, &error);<br>
+               if (async_mode)<br>
+                       async_flow_cleanup_port(port);<br>
                if (rte_eth_dev_stop(port) != 0)<br>
                        printf("Failed to stop device on port %u\n", port);<br>
                rte_eth_dev_close(port);<br>
diff --git a/app/test-flow-perf/meson.build b/app/test-flow-perf/meson.build<br>
index e101449e32..2f820a7597 100644<br>
--- a/app/test-flow-perf/meson.build<br>
+++ b/app/test-flow-perf/meson.build<br>
@@ -3,6 +3,7 @@<br>
<br>
 sources = files(<br>
         'actions_gen.c',<br>
+        'async_flow.c',<br>
         'flow_gen.c',<br>
         'items_gen.c',<br>
         'main.c',<br>
-- <br>
2.43.0<br>
<br>
</blockquote></div>