[PATCH v2] test/flow: add support for async API

Maxime Peim maxime.peim at gmail.com
Mon Mar 2 00:36:26 CET 2026


Hi Stephen,

Thanks for the review!
Some of the AI-generated comments pointed to real issues, although they were
already present before my changes (the global flow variable, for example).
I have updated my patch to fix those as well.

Let me know if any other changes are needed.
Also, I have made an improvement by pre-allocating a memory pool for async
flows:
 - it prevents every in-flight flow in the queue from pointing to the same
items / actions, which is what happens when using static variables
 - it allows better locality and cache usage
With this change I was able to double the number of flows I can insert on a
BF3.

On Mon, Mar 2, 2026 at 12:29 AM Maxime Peim <maxime.peim at gmail.com> wrote:

> Add async flow API mode to test-flow-perf application for improved
> flow rule insertion performance. The async API allows batching flow
> rule creation operations and processing completions in bulk, reducing
> per-rule overhead.
>
> New command line options:
>   --async: enable async flow API mode
>   --async-queue-size=N: size of async queues (default: 1024)
>   --async-push-batch=N: flows to batch before push (default: 256)
>
> Signed-off-by: Maxime Peim <maxime.peim at gmail.com>
> ---
> v2:
>   - Replace per-flow stack allocation with pre-allocated slot pool;
>     flat buffers are initialized once at init time and the hot path
>     only patches per-flow item/action values into a pre-set slot
>   - Fix alloca misuse: use heap allocation for queue_attr_list, round
>     queue_size to power of 2 for bitmask wrapping, add bounds checks
>   - Fix race on file-scope flow variable, premature latency
>     measurement, and integer division in rate calculation
>   - Drop unrelated lgopts reformatting
>   - Use malloc instead of rte_zmalloc for non-dataplane allocations
>   - Various robustness and style fixes
>
>  app/test-flow-perf/actions_gen.c | 281 +++++++++++-
>  app/test-flow-perf/actions_gen.h |  31 ++
>  app/test-flow-perf/async_flow.c  | 761 +++++++++++++++++++++++++++++++
>  app/test-flow-perf/async_flow.h  |  54 +++
>  app/test-flow-perf/items_gen.c   |  58 +++
>  app/test-flow-perf/items_gen.h   |   6 +
>  app/test-flow-perf/main.c        | 302 +++++++++++-
>  app/test-flow-perf/meson.build   |   1 +
>  8 files changed, 1454 insertions(+), 40 deletions(-)
>  create mode 100644 app/test-flow-perf/async_flow.c
>  create mode 100644 app/test-flow-perf/async_flow.h
>
> diff --git a/app/test-flow-perf/actions_gen.c
> b/app/test-flow-perf/actions_gen.c
> index 9d102e3af4..2b8edd50c8 100644
> --- a/app/test-flow-perf/actions_gen.c
> +++ b/app/test-flow-perf/actions_gen.c
> @@ -36,27 +36,7 @@ struct additional_para {
>         bool unique_data;
>  };
>
> -/* Storage for struct rte_flow_action_raw_encap including external data.
> */
> -struct action_raw_encap_data {
> -       struct rte_flow_action_raw_encap conf;
> -       uint8_t data[128];
> -       uint8_t preserve[128];
> -       uint16_t idx;
> -};
> -
> -/* Storage for struct rte_flow_action_raw_decap including external data.
> */
> -struct action_raw_decap_data {
> -       struct rte_flow_action_raw_decap conf;
> -       uint8_t data[128];
> -       uint16_t idx;
> -};
> -
> -/* Storage for struct rte_flow_action_rss including external data. */
> -struct action_rss_data {
> -       struct rte_flow_action_rss conf;
> -       uint8_t key[40];
> -       uint16_t queue[128];
> -};
> +/* Compound action data structs defined in actions_gen.h */
>
>  static void
>  add_mark(struct rte_flow_action *actions,
> @@ -1165,3 +1145,262 @@ fill_actions(struct rte_flow_action *actions,
> uint64_t *flow_actions,
>         free(queues);
>         free(hairpin_queues);
>  }
> +
> +static size_t
> +action_conf_size(enum rte_flow_action_type type)
> +{
> +       switch (type) {
> +       case RTE_FLOW_ACTION_TYPE_MARK:
> +               return sizeof(struct rte_flow_action_mark);
> +       case RTE_FLOW_ACTION_TYPE_QUEUE:
> +               return sizeof(struct rte_flow_action_queue);
> +       case RTE_FLOW_ACTION_TYPE_JUMP:
> +               return sizeof(struct rte_flow_action_jump);
> +       case RTE_FLOW_ACTION_TYPE_RSS:
> +               return sizeof(struct action_rss_data);
> +       case RTE_FLOW_ACTION_TYPE_SET_META:
> +               return sizeof(struct rte_flow_action_set_meta);
> +       case RTE_FLOW_ACTION_TYPE_SET_TAG:
> +               return sizeof(struct rte_flow_action_set_tag);
> +       case RTE_FLOW_ACTION_TYPE_PORT_ID:
> +               return sizeof(struct rte_flow_action_port_id);
> +       case RTE_FLOW_ACTION_TYPE_COUNT:
> +               return sizeof(struct rte_flow_action_count);
> +       case RTE_FLOW_ACTION_TYPE_SET_MAC_SRC:
> +       case RTE_FLOW_ACTION_TYPE_SET_MAC_DST:
> +               return sizeof(struct rte_flow_action_set_mac);
> +       case RTE_FLOW_ACTION_TYPE_SET_IPV4_SRC:
> +       case RTE_FLOW_ACTION_TYPE_SET_IPV4_DST:
> +               return sizeof(struct rte_flow_action_set_ipv4);
> +       case RTE_FLOW_ACTION_TYPE_SET_IPV6_SRC:
> +       case RTE_FLOW_ACTION_TYPE_SET_IPV6_DST:
> +               return sizeof(struct rte_flow_action_set_ipv6);
> +       case RTE_FLOW_ACTION_TYPE_SET_TP_SRC:
> +       case RTE_FLOW_ACTION_TYPE_SET_TP_DST:
> +               return sizeof(struct rte_flow_action_set_tp);
> +       case RTE_FLOW_ACTION_TYPE_INC_TCP_ACK:
> +       case RTE_FLOW_ACTION_TYPE_DEC_TCP_ACK:
> +       case RTE_FLOW_ACTION_TYPE_INC_TCP_SEQ:
> +       case RTE_FLOW_ACTION_TYPE_DEC_TCP_SEQ:
> +               return sizeof(rte_be32_t);
> +       case RTE_FLOW_ACTION_TYPE_SET_TTL:
> +               return sizeof(struct rte_flow_action_set_ttl);
> +       case RTE_FLOW_ACTION_TYPE_SET_IPV4_DSCP:
> +       case RTE_FLOW_ACTION_TYPE_SET_IPV6_DSCP:
> +               return sizeof(struct rte_flow_action_set_dscp);
> +       case RTE_FLOW_ACTION_TYPE_METER:
> +               return sizeof(struct rte_flow_action_meter);
> +       case RTE_FLOW_ACTION_TYPE_RAW_ENCAP:
> +               return sizeof(struct action_raw_encap_data);
> +       case RTE_FLOW_ACTION_TYPE_RAW_DECAP:
> +               return sizeof(struct action_raw_decap_data);
> +       case RTE_FLOW_ACTION_TYPE_VXLAN_ENCAP:
> +               return sizeof(struct rte_flow_action_vxlan_encap) +
> +                      5 * sizeof(struct rte_flow_item) + sizeof(struct
> rte_flow_item_eth) +
> +                      sizeof(struct rte_flow_item_ipv4) + sizeof(struct
> rte_flow_item_udp) +
> +                      sizeof(struct rte_flow_item_vxlan);
> +       case RTE_FLOW_ACTION_TYPE_MODIFY_FIELD:
> +               return sizeof(struct rte_flow_action_modify_field);
> +       /* Zero-conf types */
> +       case RTE_FLOW_ACTION_TYPE_DROP:
> +       case RTE_FLOW_ACTION_TYPE_FLAG:
> +       case RTE_FLOW_ACTION_TYPE_DEC_TTL:
> +       case RTE_FLOW_ACTION_TYPE_VXLAN_DECAP:
> +               return 0;
> +       default:
> +               return 0;
> +       }
> +}
> +
> +void
> +fill_actions_template(struct rte_flow_action *actions, struct
> rte_flow_action *masks,
> +                     uint64_t *flow_actions, struct rte_flow_port_attr
> *port_attr,
> +                     bool *need_wire_orig_table, size_t *conf_sizes,
> uint32_t *n_actions_out)
> +{
> +       uint8_t actions_counter = 0;
> +       uint8_t i, j;
> +
> +       *need_wire_orig_table = false;
> +       memset(port_attr, 0, sizeof(*port_attr));
> +
> +       /* Static configurations for actions that need them in templates */
> +       static struct rte_flow_action_mark mark_conf = {
> +               .id = 1,
> +       };
> +       static struct rte_flow_action_queue queue_conf = {
> +               .index = 0,
> +       };
> +       static struct rte_flow_action_port_id port_id_conf = {
> +               .id = 0,
> +       };
> +       static struct rte_flow_action_jump jump_conf = {
> +               .group = 1,
> +       };
> +       static struct rte_flow_action_modify_field set_meta_conf = {
> +               .operation = RTE_FLOW_MODIFY_SET,
> +               .dst = {.field = RTE_FLOW_FIELD_META},
> +               .src =
> +                       {
> +                               .field = RTE_FLOW_FIELD_VALUE,
> +                               .value = {0, 0, 0, META_DATA},
> +                       },
> +               .width = 32,
> +       };
> +
> +       /* Static mask configurations for each action type */
> +       static struct rte_flow_action_mark mark_mask = {
> +               .id = UINT32_MAX,
> +       };
> +       static struct rte_flow_action_queue queue_mask = {
> +               .index = UINT16_MAX,
> +       };
> +       static struct rte_flow_action_jump jump_mask = {
> +               .group = UINT32_MAX,
> +       };
> +       static struct rte_flow_action_rss rss_mask = {
> +               .level = UINT32_MAX,
> +               .types = UINT64_MAX,
> +       };
> +       static struct rte_flow_action_set_meta set_meta_mask = {
> +               .data = UINT32_MAX,
> +               .mask = UINT32_MAX,
> +       };
> +       static struct rte_flow_action_set_tag set_tag_mask = {
> +               .data = UINT32_MAX,
> +               .mask = UINT32_MAX,
> +               .index = UINT8_MAX,
> +       };
> +       static struct rte_flow_action_port_id port_id_mask = {
> +               .id = UINT32_MAX,
> +       };
> +       static struct rte_flow_action_count count_mask;
> +       static struct rte_flow_action_set_mac set_mac_mask = {
> +               .mac_addr = {0xff, 0xff, 0xff, 0xff, 0xff, 0xff},
> +       };
> +       static struct rte_flow_action_set_ipv4 set_ipv4_mask = {
> +               .ipv4_addr = UINT32_MAX,
> +       };
> +       static struct rte_flow_action_set_ipv6 set_ipv6_mask = {
> +               .ipv6_addr.a = {0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
> 0xff, 0xff, 0xff, 0xff,
> +                               0xff, 0xff, 0xff, 0xff, 0xff}};
> +       static struct rte_flow_action_set_tp set_tp_mask = {
> +               .port = UINT16_MAX,
> +       };
> +       static rte_be32_t tcp_seq_ack_mask = UINT32_MAX;
> +       static struct rte_flow_action_set_ttl set_ttl_mask = {
> +               .ttl_value = UINT8_MAX,
> +       };
> +       static struct rte_flow_action_set_dscp set_dscp_mask = {
> +               .dscp = UINT8_MAX,
> +       };
> +       static struct rte_flow_action_meter meter_mask = {
> +               .mtr_id = UINT32_MAX,
> +       };
> +
> +       static const struct {
> +               uint64_t flow_mask;
> +               enum rte_flow_action_type type;
> +               const void *action_conf;
> +               const void *action_mask;
> +               const bool need_wire_orig_table;
> +       } template_actions[] = {
> +               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_MARK),
> RTE_FLOW_ACTION_TYPE_MARK, &mark_conf,
> +                &mark_mask, true},
> +               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_COUNT),
> RTE_FLOW_ACTION_TYPE_COUNT, NULL,
> +                &count_mask, false},
> +               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_MODIFY_FIELD),
> +                RTE_FLOW_ACTION_TYPE_MODIFY_FIELD, &set_meta_conf,
> &set_meta_mask, false},
> +               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_TAG),
> RTE_FLOW_ACTION_TYPE_SET_TAG, NULL,
> +                &set_tag_mask, false},
> +               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_FLAG),
> RTE_FLOW_ACTION_TYPE_FLAG, NULL, NULL,
> +                false},
> +               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_MAC_SRC),
> +                RTE_FLOW_ACTION_TYPE_SET_MAC_SRC, NULL, &set_mac_mask,
> false},
> +               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_MAC_DST),
> +                RTE_FLOW_ACTION_TYPE_SET_MAC_DST, NULL, &set_mac_mask,
> false},
> +               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV4_SRC),
> +                RTE_FLOW_ACTION_TYPE_SET_IPV4_SRC, NULL, &set_ipv4_mask,
> false},
> +               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV4_DST),
> +                RTE_FLOW_ACTION_TYPE_SET_IPV4_DST, NULL, &set_ipv4_mask,
> false},
> +               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV6_SRC),
> +                RTE_FLOW_ACTION_TYPE_SET_IPV6_SRC, NULL, &set_ipv6_mask,
> false},
> +               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV6_DST),
> +                RTE_FLOW_ACTION_TYPE_SET_IPV6_DST, NULL, &set_ipv6_mask,
> false},
> +               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_TP_SRC),
> RTE_FLOW_ACTION_TYPE_SET_TP_SRC,
> +                NULL, &set_tp_mask, false},
> +               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_TP_DST),
> RTE_FLOW_ACTION_TYPE_SET_TP_DST,
> +                NULL, &set_tp_mask, false},
> +               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_INC_TCP_ACK),
> +                RTE_FLOW_ACTION_TYPE_INC_TCP_ACK, NULL,
> &tcp_seq_ack_mask, false},
> +               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_DEC_TCP_ACK),
> +                RTE_FLOW_ACTION_TYPE_DEC_TCP_ACK, NULL,
> &tcp_seq_ack_mask, false},
> +               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_INC_TCP_SEQ),
> +                RTE_FLOW_ACTION_TYPE_INC_TCP_SEQ, NULL,
> &tcp_seq_ack_mask, false},
> +               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_DEC_TCP_SEQ),
> +                RTE_FLOW_ACTION_TYPE_DEC_TCP_SEQ, NULL,
> &tcp_seq_ack_mask, false},
> +               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_TTL),
> RTE_FLOW_ACTION_TYPE_SET_TTL, NULL,
> +                &set_ttl_mask, false},
> +               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_DEC_TTL),
> RTE_FLOW_ACTION_TYPE_DEC_TTL, NULL,
> +                NULL, false},
> +               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV4_DSCP),
> +                RTE_FLOW_ACTION_TYPE_SET_IPV4_DSCP, NULL, &set_dscp_mask,
> false},
> +               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV6_DSCP),
> +                RTE_FLOW_ACTION_TYPE_SET_IPV6_DSCP, NULL, &set_dscp_mask,
> false},
> +               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_QUEUE),
> RTE_FLOW_ACTION_TYPE_QUEUE,
> +                &queue_conf, &queue_mask, true},
> +               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_RSS),
> RTE_FLOW_ACTION_TYPE_RSS, NULL,
> +                &rss_mask, true},
> +               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_JUMP),
> RTE_FLOW_ACTION_TYPE_JUMP, &jump_conf,
> +                &jump_mask, false},
> +               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_PORT_ID),
> RTE_FLOW_ACTION_TYPE_PORT_ID,
> +                &port_id_conf, &port_id_mask, false},
> +               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_DROP),
> RTE_FLOW_ACTION_TYPE_DROP, NULL, NULL,
> +                false},
> +               {HAIRPIN_QUEUE_ACTION, RTE_FLOW_ACTION_TYPE_QUEUE,
> &queue_conf, &queue_mask, false},
> +               {HAIRPIN_RSS_ACTION, RTE_FLOW_ACTION_TYPE_RSS, NULL,
> &rss_mask, false},
> +               {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_METER),
> RTE_FLOW_ACTION_TYPE_METER, NULL,
> +                &meter_mask, false},
> +       };
> +
> +       for (j = 0; j < MAX_ACTIONS_NUM; j++) {
> +               if (flow_actions[j] == 0)
> +                       break;
> +               for (i = 0; i < RTE_DIM(template_actions); i++) {
> +                       if ((flow_actions[j] &
> template_actions[i].flow_mask) == 0)
> +                               continue;
> +
> +                       switch (template_actions[i].type) {
> +                       case RTE_FLOW_ACTION_TYPE_COUNT:
> +                               port_attr->nb_counters++;
> +                               break;
> +                       case RTE_FLOW_ACTION_TYPE_AGE:
> +                               port_attr->nb_aging_objects++;
> +                               break;
> +                       case RTE_FLOW_ACTION_TYPE_METER:
> +                               port_attr->nb_meters++;
> +                               break;
> +                       case RTE_FLOW_ACTION_TYPE_CONNTRACK:
> +                               port_attr->nb_conn_tracks++;
> +                               break;
> +                       case RTE_FLOW_ACTION_TYPE_QUOTA:
> +                               port_attr->nb_quotas++;
> +                       default:;
> +                       }
> +
> +                       actions[actions_counter].type =
> template_actions[i].type;
> +                       actions[actions_counter].conf =
> template_actions[i].action_conf;
> +                       masks[actions_counter].type =
> template_actions[i].type;
> +                       masks[actions_counter].conf =
> template_actions[i].action_mask;
> +                       conf_sizes[actions_counter] =
> action_conf_size(template_actions[i].type);
> +                       *need_wire_orig_table |=
> template_actions[i].need_wire_orig_table;
> +                       actions_counter++;
> +                       break;
> +               }
> +       }
> +
> +       actions[actions_counter].type = RTE_FLOW_ACTION_TYPE_END;
> +       masks[actions_counter].type = RTE_FLOW_ACTION_TYPE_END;
> +
> +       /* take END into account */
> +       *n_actions_out = actions_counter + 1;
> +}
> diff --git a/app/test-flow-perf/actions_gen.h
> b/app/test-flow-perf/actions_gen.h
> index 9e13b164f9..3ac0ffed59 100644
> --- a/app/test-flow-perf/actions_gen.h
> +++ b/app/test-flow-perf/actions_gen.h
> @@ -17,9 +17,40 @@
>  #define RTE_VXLAN_GPE_UDP_PORT 250
>  #define RTE_GENEVE_UDP_PORT 6081
>
> +/* Compound action data structures (needed by async_flow.c for slot init)
> */
> +
> +/* Storage for struct rte_flow_action_raw_encap including external data.
> */
> +struct action_raw_encap_data {
> +       struct rte_flow_action_raw_encap conf;
> +       uint8_t data[128];
> +       uint8_t preserve[128];
> +       uint16_t idx;
> +};
> +
> +/* Storage for struct rte_flow_action_raw_decap including external data.
> */
> +struct action_raw_decap_data {
> +       struct rte_flow_action_raw_decap conf;
> +       uint8_t data[128];
> +       uint16_t idx;
> +};
> +
> +/* Storage for struct rte_flow_action_rss including external data. */
> +struct action_rss_data {
> +       struct rte_flow_action_rss conf;
> +       uint8_t key[40];
> +       uint16_t queue[128];
> +};
> +
>  void fill_actions(struct rte_flow_action *actions, uint64_t *flow_actions,
>         uint32_t counter, uint16_t next_table, uint16_t hairpinq,
>         uint64_t encap_data, uint64_t decap_data, uint8_t core_idx,
>         bool unique_data, uint8_t rx_queues_count, uint16_t dst_port);
>
> +/* Fill actions template for async flow API (types only, no values).
> + * If conf_sizes is non-NULL, populates per-action conf sizes and
> n_actions_out.
> + */
> +void fill_actions_template(struct rte_flow_action *actions, struct
> rte_flow_action *masks,
> +                          uint64_t *flow_actions, struct
> rte_flow_port_attr *port_attr,
> +                          bool *need_wire_orig_table, size_t *conf_sizes,
> uint32_t *n_actions_out);
> +
>  #endif /* FLOW_PERF_ACTION_GEN */
> diff --git a/app/test-flow-perf/async_flow.c
> b/app/test-flow-perf/async_flow.c
> new file mode 100644
> index 0000000000..ae5a922856
> --- /dev/null
> +++ b/app/test-flow-perf/async_flow.c
> @@ -0,0 +1,761 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright 2026 Maxime Peim <maxime.peim at gmail.com>
> + *
> + * This file contains the async flow API implementation
> + * for the flow-perf application.
> + */
> +
> +#include <stdio.h>
> +#include <stdlib.h>
> +#include <string.h>
> +
> +#include <rte_bitops.h>
> +#include <rte_common.h>
> +#include <rte_ethdev.h>
> +#include <rte_flow.h>
> +#include <rte_vxlan.h>
> +
> +#include "actions_gen.h"
> +#include "async_flow.h"
> +#include "flow_gen.h"
> +#include "items_gen.h"
> +
> +/* Max iterations when draining pending async completions during cleanup
> */
> +#define DRAIN_MAX_ITERATIONS 100
> +
> +/* Per-port async flow resources */
> +static struct async_flow_resources port_resources[MAX_PORTS];
> +
> +/*
> + * Initialize compound action types within a pre-allocated slot.
> + * Called once per slot during pool init to set up internal pointers
> + * for RSS, RAW_ENCAP, RAW_DECAP and VXLAN_ENCAP actions.
> + */
> +static void
> +init_slot_compound_actions(struct rte_flow_action *actions, uint32_t
> n_actions,
> +                          const size_t *action_conf_sizes)
> +{
> +       uint32_t i;
> +
> +       for (i = 0; i < n_actions; i++) {
> +               if (action_conf_sizes[i] == 0)
> +                       continue;
> +
> +               switch (actions[i].type) {
> +               case RTE_FLOW_ACTION_TYPE_RSS: {
> +                       struct action_rss_data *rss =
> +                               (struct action_rss_data
> *)(uintptr_t)actions[i].conf;
> +                       rss->conf.func = RTE_ETH_HASH_FUNCTION_DEFAULT;
> +                       rss->conf.level = 0;
> +                       rss->conf.types = GET_RSS_HF();
> +                       rss->conf.key_len = sizeof(rss->key);
> +                       rss->conf.key = rss->key;
> +                       rss->conf.queue = rss->queue;
> +                       rss->key[0] = 1;
> +                       break;
> +               }
> +               case RTE_FLOW_ACTION_TYPE_RAW_ENCAP: {
> +                       struct action_raw_encap_data *encap =
> +                               (struct action_raw_encap_data
> *)(uintptr_t)actions[i].conf;
> +                       encap->conf.data = encap->data;
> +                       break;
> +               }
> +               case RTE_FLOW_ACTION_TYPE_RAW_DECAP: {
> +                       struct action_raw_decap_data *decap =
> +                               (struct action_raw_decap_data
> *)(uintptr_t)actions[i].conf;
> +                       decap->conf.data = decap->data;
> +                       break;
> +               }
> +               case RTE_FLOW_ACTION_TYPE_VXLAN_ENCAP: {
> +                       /*
> +                        * Layout within the conf area:
> +                        *   struct rte_flow_action_vxlan_encap
> +                        *   struct rte_flow_item[5]
> +                        *   struct rte_flow_item_eth
> +                        *   struct rte_flow_item_ipv4
> +                        *   struct rte_flow_item_udp
> +                        *   struct rte_flow_item_vxlan
> +                        */
> +                       uint8_t *base = (uint8_t
> *)(uintptr_t)actions[i].conf;
> +                       struct rte_flow_action_vxlan_encap *ve =
> +                               (struct rte_flow_action_vxlan_encap *)base;
> +                       struct rte_flow_item *items =
> +                               (struct rte_flow_item
> +                                        *)(base + sizeof(struct
> rte_flow_action_vxlan_encap));
> +                       uint8_t *data = (uint8_t *)(items + 5);
> +
> +                       struct rte_flow_item_eth *item_eth = (struct
> rte_flow_item_eth *)data;
> +                       data += sizeof(struct rte_flow_item_eth);
> +                       struct rte_flow_item_ipv4 *item_ipv4 = (struct
> rte_flow_item_ipv4 *)data;
> +                       data += sizeof(struct rte_flow_item_ipv4);
> +                       struct rte_flow_item_udp *item_udp = (struct
> rte_flow_item_udp *)data;
> +                       data += sizeof(struct rte_flow_item_udp);
> +                       struct rte_flow_item_vxlan *item_vxlan = (struct
> rte_flow_item_vxlan *)data;
> +
> +                       memset(item_eth, 0, sizeof(*item_eth));
> +                       memset(item_ipv4, 0, sizeof(*item_ipv4));
> +                       memset(item_udp, 0, sizeof(*item_udp));
> +                       memset(item_vxlan, 0, sizeof(*item_vxlan));
> +
> +                       item_ipv4->hdr.src_addr = RTE_IPV4(127, 0, 0, 1);
> +                       item_ipv4->hdr.version_ihl = RTE_IPV4_VHL_DEF;
> +                       item_udp->hdr.dst_port =
> RTE_BE16(RTE_VXLAN_DEFAULT_PORT);
> +                       item_vxlan->hdr.vni[2] = 1;
> +
> +                       items[0].type = RTE_FLOW_ITEM_TYPE_ETH;
> +                       items[0].spec = item_eth;
> +                       items[0].mask = item_eth;
> +                       items[1].type = RTE_FLOW_ITEM_TYPE_IPV4;
> +                       items[1].spec = item_ipv4;
> +                       items[1].mask = item_ipv4;
> +                       items[2].type = RTE_FLOW_ITEM_TYPE_UDP;
> +                       items[2].spec = item_udp;
> +                       items[2].mask = item_udp;
> +                       items[3].type = RTE_FLOW_ITEM_TYPE_VXLAN;
> +                       items[3].spec = item_vxlan;
> +                       items[3].mask = item_vxlan;
> +                       items[4].type = RTE_FLOW_ITEM_TYPE_END;
> +
> +                       ve->definition = items;
> +                       break;
> +               }
> +               default:
> +                       break;
> +               }
> +       }
> +}
> +
> +/*
> + * Allocate and pre-initialize all per-slot flat buffers.
> + * Returns 0 on success.
> + */
> +static int
> +init_slot_pool(struct async_flow_resources *res, uint32_t nb_queues,
> uint32_t queue_size,
> +              const struct rte_flow_item *pattern, uint32_t n_items,
> const size_t *item_spec_sizes,
> +              const struct rte_flow_action *template_actions, uint32_t
> n_actions,
> +              const size_t *action_conf_sizes)
> +{
> +       uint32_t items_array_bytes, actions_array_bytes;
> +       uint32_t spec_data_bytes, conf_data_bytes, mask_data_bytes;
> +       uint32_t slot_size, num_slots;
> +       uint32_t s, i;
> +       uint8_t *mptr;
> +
> +       /* Compute shared mask size */
> +       mask_data_bytes = 0;
> +       for (i = 0; i < n_items; i++)
> +               mask_data_bytes += RTE_ALIGN_CEIL(item_spec_sizes[i], 8);
> +
> +       /* specs and masks have the same size */
> +       spec_data_bytes = mask_data_bytes;
> +
> +       conf_data_bytes = 0;
> +       for (i = 0; i < n_actions; i++)
> +               conf_data_bytes += RTE_ALIGN_CEIL(action_conf_sizes[i], 8);
> +
> +       /* Compute per-slot layout sizes (+ 1 for END sentinel) */
> +       items_array_bytes = n_items * sizeof(struct rte_flow_item);
> +       actions_array_bytes = n_actions * sizeof(struct rte_flow_action);
> +
> +       slot_size = RTE_ALIGN_CEIL(items_array_bytes + actions_array_bytes
> + spec_data_bytes +
> +                                          conf_data_bytes,
> +                                  RTE_CACHE_LINE_SIZE);
> +
> +       num_slots = queue_size * nb_queues;
> +
> +       /* Store layout info */
> +       res->slot_size = slot_size;
> +       res->slots_per_queue = queue_size;
> +       res->nb_queues = nb_queues;
> +       res->n_items = n_items;
> +       res->n_actions = n_actions;
> +
> +       /* Allocate shared masks */
> +       if (mask_data_bytes > 0) {
> +               res->shared_masks = aligned_alloc(
> +                       RTE_CACHE_LINE_SIZE,
> RTE_ALIGN_CEIL(mask_data_bytes, RTE_CACHE_LINE_SIZE));
> +               if (res->shared_masks == NULL) {
> +                       fprintf(stderr, "Failed to allocate shared masks
> (%u bytes)\n",
> +                               mask_data_bytes);
> +                       return -ENOMEM;
> +               }
> +               memset(res->shared_masks, 0, mask_data_bytes);
> +
> +               /* Copy mask data from template pattern */
> +               mptr = res->shared_masks;
> +               for (i = 0; i < n_items; i++) {
> +                       if (item_spec_sizes[i] > 0 && pattern[i].mask !=
> NULL)
> +                               memcpy(mptr, pattern[i].mask,
> item_spec_sizes[i]);
> +                       mptr += RTE_ALIGN_CEIL(item_spec_sizes[i], 8);
> +               }
> +       }
> +
> +       /* Allocate per-slot pool */
> +       /* slot_size is already cache-line aligned, so total is a multiple
> */
> +       res->slot_pool = aligned_alloc(RTE_CACHE_LINE_SIZE,
> (size_t)num_slots * slot_size);
> +       if (res->slot_pool == NULL) {
> +               fprintf(stderr, "Failed to allocate slot pool (%u slots *
> %u bytes)\n", num_slots,
> +                       slot_size);
> +               free(res->shared_masks);
> +               res->shared_masks = NULL;
> +               return -ENOMEM;
> +       }
> +       memset(res->slot_pool, 0, (size_t)num_slots * slot_size);
> +
> +       /* Pre-initialize every slot */
> +       for (s = 0; s < num_slots; s++) {
> +               uint8_t *slot = res->slot_pool + (size_t)s * slot_size;
> +               struct rte_flow_item *items = (struct rte_flow_item *)slot;
> +               struct rte_flow_action *actions =
> +                       (struct rte_flow_action *)(slot +
> items_array_bytes);
> +               uint8_t *data = slot + items_array_bytes +
> actions_array_bytes;
> +
> +               /* Pre-set items: spec → per-slot data, mask → shared
> masks */
> +               mptr = res->shared_masks;
> +               for (i = 0; i < n_items; i++) {
> +                       items[i].type = pattern[i].type;
> +                       if (item_spec_sizes[i] > 0) {
> +                               items[i].spec = data;
> +                               items[i].mask = mptr;
> +                               data += RTE_ALIGN_CEIL(item_spec_sizes[i],
> 8);
> +                               mptr += RTE_ALIGN_CEIL(item_spec_sizes[i],
> 8);
> +                       }
> +               }
> +               items[n_items].type = RTE_FLOW_ITEM_TYPE_END;
> +
> +               /* Pre-set actions: conf → per-slot data */
> +               for (i = 0; i < n_actions; i++) {
> +                       actions[i].type = template_actions[i].type;
> +                       if (action_conf_sizes[i] > 0) {
> +                               actions[i].conf = data;
> +                               data +=
> RTE_ALIGN_CEIL(action_conf_sizes[i], 8);
> +                       }
> +               }
> +               actions[n_actions].type = RTE_FLOW_ACTION_TYPE_END;
> +
> +               /* Initialize compound action types (RSS, RAW_ENCAP, etc.)
> */
> +               init_slot_compound_actions(actions, n_actions,
> action_conf_sizes);
> +       }
> +
> +       /* Allocate and initialize per-queue slot tracking */
> +       res->queues = aligned_alloc(
> +               RTE_CACHE_LINE_SIZE,
> +               RTE_ALIGN_CEIL(nb_queues * sizeof(struct
> async_flow_queue), RTE_CACHE_LINE_SIZE));
> +       if (res->queues == NULL) {
> +               fprintf(stderr, "Failed to allocate queue structs (%u
> queues)\n", nb_queues);
> +               free(res->slot_pool);
> +               res->slot_pool = NULL;
> +               free(res->shared_masks);
> +               res->shared_masks = NULL;
> +               return -ENOMEM;
> +       }
> +       memset(res->queues, 0, nb_queues * sizeof(struct
> async_flow_queue));
> +       for (s = 0; s < nb_queues; s++) {
> +               res->queues[s].slots = res->slot_pool + (size_t)s *
> queue_size * slot_size;
> +               res->queues[s].head = 0;
> +       }
> +
> +       printf(":: Slot pool: %u slots * %u bytes = %u KB (shared masks:
> %u bytes)\n", num_slots,
> +              slot_size, (num_slots * slot_size) / 1024, mask_data_bytes);
> +
> +       return 0;
> +}
> +
> +/*
> + * Hot-path: update per-flow item values through pre-set pointers.
> + * Only IPV4/IPV6 src_addr varies per flow (based on counter).
> + */
> +static void
> +update_item_values(struct rte_flow_item *items, uint32_t counter)
> +{
> +       uint8_t i;
> +
> +       for (i = 0; items[i].type != RTE_FLOW_ITEM_TYPE_END; i++) {
> +               switch (items[i].type) {
> +               case RTE_FLOW_ITEM_TYPE_IPV4:
> +                       ((struct rte_flow_item_ipv4
> *)(uintptr_t)items[i].spec)->hdr.src_addr =
> +                               RTE_BE32(counter);
> +                       break;
> +               case RTE_FLOW_ITEM_TYPE_IPV6: {
> +                       struct rte_flow_item_ipv6 *spec =
> +                               (struct rte_flow_item_ipv6
> *)(uintptr_t)items[i].spec;
> +                       uint8_t j;
> +                       for (j = 0; j < 4; j++)
> +                               spec->hdr.src_addr.a[15 - j] = counter >>
> (j * 8);
> +                       break;
> +               }
> +               default:
> +                       break;
> +               }
> +       }
> +}
> +
> +/*
> + * Hot-path: update per-flow action values through pre-set pointers.
> + */
> +static void
> +update_action_values(struct rte_flow_action *actions, uint32_t counter,
> uint16_t hairpinq,
> +                    uint64_t encap_data, uint64_t decap_data,
> __rte_unused uint8_t core_idx,
> +                    bool unique_data, uint8_t rx_queues_count, uint16_t
> dst_port)
> +{
> +       uint8_t i;
> +
> +       for (i = 0; actions[i].type != RTE_FLOW_ACTION_TYPE_END; i++) {
> +               switch (actions[i].type) {
> +               case RTE_FLOW_ACTION_TYPE_MARK:
> +                       ((struct rte_flow_action_mark
> *)(uintptr_t)actions[i].conf)->id =
> +                               (counter % 255) + 1;
> +                       break;
> +               case RTE_FLOW_ACTION_TYPE_QUEUE:
> +                       ((struct rte_flow_action_queue
> *)(uintptr_t)actions[i].conf)->index =
> +                               hairpinq ? (counter % hairpinq) +
> rx_queues_count :
> +                                          counter % rx_queues_count;
> +                       break;
> +               case RTE_FLOW_ACTION_TYPE_METER:
> +                       ((struct rte_flow_action_meter
> *)(uintptr_t)actions[i].conf)->mtr_id =
> +                               counter;
> +                       break;
> +               case RTE_FLOW_ACTION_TYPE_RSS: {
> +                       struct action_rss_data *rss =
> +                               (struct action_rss_data
> *)(uintptr_t)actions[i].conf;
> +                       uint16_t q;
> +                       if (hairpinq) {
> +                               rss->conf.queue_num = hairpinq;
> +                               for (q = 0; q < hairpinq; q++)
> +                                       rss->queue[q] = q +
> rx_queues_count;
> +                       } else {
> +                               rss->conf.queue_num = rx_queues_count;
> +                               for (q = 0; q < rx_queues_count; q++)
> +                                       rss->queue[q] = q;
> +                       }
> +                       break;
> +               }
> +               case RTE_FLOW_ACTION_TYPE_SET_MAC_SRC:
> +               case RTE_FLOW_ACTION_TYPE_SET_MAC_DST: {
> +                       struct rte_flow_action_set_mac *mac =
> +                               (struct rte_flow_action_set_mac
> *)(uintptr_t)actions[i].conf;
> +                       uint32_t val = unique_data ? counter : 1;
> +                       uint8_t j;
> +                       for (j = 0; j < RTE_ETHER_ADDR_LEN; j++) {
> +                               mac->mac_addr[j] = val & 0xff;
> +                               val >>= 8;
> +                       }
> +                       break;
> +               }
> +               case RTE_FLOW_ACTION_TYPE_SET_IPV4_SRC:
> +               case RTE_FLOW_ACTION_TYPE_SET_IPV4_DST: {
> +                       uint32_t ip = unique_data ? counter : 1;
> +                       ((struct rte_flow_action_set_ipv4
> *)(uintptr_t)actions[i].conf)->ipv4_addr =
> +                               RTE_BE32(ip + 1);
> +                       break;
> +               }
> +               case RTE_FLOW_ACTION_TYPE_SET_IPV6_SRC:
> +               case RTE_FLOW_ACTION_TYPE_SET_IPV6_DST: {
> +                       struct rte_flow_action_set_ipv6 *v6 =
> +                               (struct rte_flow_action_set_ipv6
> *)(uintptr_t)actions[i].conf;
> +                       uint32_t val = unique_data ? counter : 1;
> +                       uint8_t j;
> +                       for (j = 0; j < 16; j++) {
> +                               v6->ipv6_addr.a[j] = val & 0xff;
> +                               val >>= 8;
> +                       }
> +                       break;
> +               }
> +               case RTE_FLOW_ACTION_TYPE_SET_TP_SRC: {
> +                       uint32_t tp = unique_data ? counter : 100;
> +                       tp = tp % 0xffff;
> +                       ((struct rte_flow_action_set_tp
> *)(uintptr_t)actions[i].conf)->port =
> +                               RTE_BE16(tp & 0xffff);
> +                       break;
> +               }
> +               case RTE_FLOW_ACTION_TYPE_SET_TP_DST: {
> +                       uint32_t tp = unique_data ? counter : 100;
> +                       if (tp > 0xffff)
> +                               tp >>= 16;
> +                       ((struct rte_flow_action_set_tp
> *)(uintptr_t)actions[i].conf)->port =
> +                               RTE_BE16(tp & 0xffff);
> +                       break;
> +               }
> +               case RTE_FLOW_ACTION_TYPE_INC_TCP_ACK:
> +               case RTE_FLOW_ACTION_TYPE_DEC_TCP_ACK:
> +               case RTE_FLOW_ACTION_TYPE_INC_TCP_SEQ:
> +               case RTE_FLOW_ACTION_TYPE_DEC_TCP_SEQ: {
> +                       uint32_t val = unique_data ? counter : 1;
> +                       *(rte_be32_t *)(uintptr_t)actions[i].conf =
> RTE_BE32(val);
> +                       break;
> +               }
> +               case RTE_FLOW_ACTION_TYPE_SET_TTL: {
> +                       uint32_t val = unique_data ? counter : 1;
> +                       ((struct rte_flow_action_set_ttl
> *)(uintptr_t)actions[i].conf)->ttl_value =
> +                               val % 0xff;
> +                       break;
> +               }
> +               case RTE_FLOW_ACTION_TYPE_SET_IPV4_DSCP:
> +               case RTE_FLOW_ACTION_TYPE_SET_IPV6_DSCP: {
> +                       uint32_t val = unique_data ? counter : 1;
> +                       ((struct rte_flow_action_set_dscp
> *)(uintptr_t)actions[i].conf)->dscp =
> +                               val % 0xff;
> +                       break;
> +               }
> +               case RTE_FLOW_ACTION_TYPE_PORT_ID:
> +                       ((struct rte_flow_action_port_id
> *)(uintptr_t)actions[i].conf)->id =
> +                               dst_port;
> +                       break;
> +               case RTE_FLOW_ACTION_TYPE_RAW_ENCAP: {
> +                       struct action_raw_encap_data *encap =
> +                               (struct action_raw_encap_data
> *)(uintptr_t)actions[i].conf;
> +                       uint8_t *header = encap->data;
> +                       struct rte_ether_hdr eth_hdr;
> +                       struct rte_ipv4_hdr ipv4_hdr;
> +                       struct rte_udp_hdr udp_hdr;
> +
> +                       memset(&eth_hdr, 0, sizeof(eth_hdr));
> +                       if (encap_data &
> FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_ETH)) {
> +                               if (encap_data &
> FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_VLAN))
> +                                       eth_hdr.ether_type =
> RTE_BE16(RTE_ETHER_TYPE_VLAN);
> +                               else if (encap_data &
> FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_IPV4))
> +                                       eth_hdr.ether_type =
> RTE_BE16(RTE_ETHER_TYPE_IPV4);
> +                               else if (encap_data &
> FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_IPV6))
> +                                       eth_hdr.ether_type =
> RTE_BE16(RTE_ETHER_TYPE_IPV6);
> +                               memcpy(header, &eth_hdr, sizeof(eth_hdr));
> +                               header += sizeof(eth_hdr);
> +                       }
> +                       if (encap_data &
> FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_IPV4)) {
> +                               uint32_t ip_dst = unique_data ? counter :
> 1;
> +                               memset(&ipv4_hdr, 0, sizeof(ipv4_hdr));
> +                               ipv4_hdr.src_addr = RTE_IPV4(127, 0, 0, 1);
> +                               ipv4_hdr.dst_addr = RTE_BE32(ip_dst);
> +                               ipv4_hdr.version_ihl = RTE_IPV4_VHL_DEF;
> +                               if (encap_data &
> FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_UDP))
> +                                       ipv4_hdr.next_proto_id = 17; /*
> UDP */
> +                               if (encap_data &
> FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_GRE))
> +                                       ipv4_hdr.next_proto_id = 47; /*
> GRE */
> +                               memcpy(header, &ipv4_hdr,
> sizeof(ipv4_hdr));
> +                               header += sizeof(ipv4_hdr);
> +                       }
> +                       if (encap_data &
> FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_UDP)) {
> +                               memset(&udp_hdr, 0, sizeof(udp_hdr));
> +                               if (encap_data &
> FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_VXLAN))
> +                                       udp_hdr.dst_port =
> RTE_BE16(RTE_VXLAN_DEFAULT_PORT);
> +                               memcpy(header, &udp_hdr, sizeof(udp_hdr));
> +                               header += sizeof(udp_hdr);
> +                       }
> +                       encap->conf.size = header - encap->data;
> +                       break;
> +               }
> +               case RTE_FLOW_ACTION_TYPE_RAW_DECAP: {
> +                       struct action_raw_decap_data *decap_d =
> +                               (struct action_raw_decap_data
> *)(uintptr_t)actions[i].conf;
> +                       uint8_t *header = decap_d->data;
> +                       struct rte_ether_hdr eth_hdr;
> +
> +                       memset(&eth_hdr, 0, sizeof(eth_hdr));
> +                       if (decap_data &
> FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_ETH)) {
> +                               if (decap_data &
> FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_IPV4))
> +                                       eth_hdr.ether_type =
> RTE_BE16(RTE_ETHER_TYPE_IPV4);
> +                               else if (decap_data &
> FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_IPV6))
> +                                       eth_hdr.ether_type =
> RTE_BE16(RTE_ETHER_TYPE_IPV6);
> +                               memcpy(header, &eth_hdr, sizeof(eth_hdr));
> +                               header += sizeof(eth_hdr);
> +                       }
> +                       decap_d->conf.size = header - decap_d->data;
> +                       break;
> +               }
> +               case RTE_FLOW_ACTION_TYPE_VXLAN_ENCAP: {
> +                       uint8_t *base = (uint8_t
> *)(uintptr_t)actions[i].conf;
> +                       struct rte_flow_item *vitems =
> +                               (struct rte_flow_item
> +                                        *)(base + sizeof(struct
> rte_flow_action_vxlan_encap));
> +                       uint32_t ip_dst = unique_data ? counter : 1;
> +                       /* vitems[1] is IPV4 */
> +                       ((struct rte_flow_item_ipv4
> *)(uintptr_t)vitems[1].spec)->hdr.dst_addr =
> +                               RTE_BE32(ip_dst);
> +                       break;
> +               }
> +               default:
> +                       break;
> +               }
> +       }
> +}
> +
> +int
> +async_flow_init_port(uint16_t port_id, uint32_t nb_queues, uint32_t
> queue_size,
> +                    uint64_t *flow_items, uint64_t *flow_actions,
> uint64_t *flow_attrs,
> +                    uint8_t flow_group, uint32_t rules_count)
> +{
> +       struct rte_flow_port_info port_info = {0};
> +       struct rte_flow_queue_info queue_info = {0};
> +       struct rte_flow_error error = {0};
> +       struct rte_flow_port_attr port_attr = {0};
> +       struct rte_flow_queue_attr queue_attr;
> +       const struct rte_flow_queue_attr **queue_attr_list;
> +       struct rte_flow_pattern_template_attr pt_attr = {0};
> +       struct rte_flow_actions_template_attr at_attr = {0};
> +       struct rte_flow_template_table_attr table_attr = {0};
> +       struct rte_flow_item pattern[MAX_ITEMS_NUM];
> +       struct rte_flow_action actions[MAX_ACTIONS_NUM];
> +       struct rte_flow_action action_masks[MAX_ACTIONS_NUM];
> +       size_t item_spec_sizes[MAX_ITEMS_NUM];
> +       size_t action_conf_sizes[MAX_ACTIONS_NUM];
> +       uint32_t n_items, n_actions;
> +       struct async_flow_resources *res;
> +       bool need_wire_orig_table = false;
> +       uint32_t i;
> +       int ret;
> +
> +       if (port_id >= MAX_PORTS)
> +               return -1;
> +
> +       res = &port_resources[port_id];
> +       memset(res, 0, sizeof(*res));
> +
> +       /* Query port flow info */
> +       ret = rte_flow_info_get(port_id, &port_info, &queue_info, &error);
> +       if (ret != 0) {
> +               fprintf(stderr, "Port %u: rte_flow_info_get failed: %s\n",
> port_id,
> +                       error.message ? error.message : "(no message)");
> +               return ret;
> +       }
> +
> +       if (port_info.max_nb_queues == 0 || queue_info.max_size == 0) {
> +               fprintf(stderr, "Port %u: rte_flow_info_get reports that
> no queues are supported\n",
> +                       port_id);
> +               return -1;
> +       }
> +
> +       /* Limit to device capabilities if reported */
> +       if (port_info.max_nb_queues != 0 && port_info.max_nb_queues !=
> UINT32_MAX &&
> +           nb_queues > port_info.max_nb_queues)
> +               nb_queues = port_info.max_nb_queues;
> +       if (queue_info.max_size != 0 && queue_info.max_size != UINT32_MAX
> &&
> +           queue_size > queue_info.max_size)
> +               queue_size = queue_info.max_size;
> +
> +       /* Slot ring uses bitmask wrapping, so queue_size is rounded down
> to a power of 2 */
> +       queue_size = rte_align32prevpow2(queue_size);
> +       if (queue_size == 0) {
> +               fprintf(stderr, "Port %u: queue_size is 0 after
> rounding\n", port_id);
> +               return -EINVAL;
> +       }
> +
> +       for (i = 0; i < MAX_ATTRS_NUM; i++) {
> +               if (flow_attrs[i] == 0)
> +                       break;
> +               if (flow_attrs[i] & INGRESS)
> +                       pt_attr.ingress = 1;
> +               else if (flow_attrs[i] & EGRESS)
> +                       pt_attr.egress = 1;
> +               else if (flow_attrs[i] & TRANSFER)
> +                       pt_attr.transfer = 1;
> +       }
> +       /* Enable relaxed matching for better performance */
> +       pt_attr.relaxed_matching = 1;
> +
> +       memset(pattern, 0, sizeof(pattern));
> +       memset(actions, 0, sizeof(actions));
> +       memset(action_masks, 0, sizeof(action_masks));
> +
> +       /* Fill templates and gather per-item/action sizes */
> +       fill_items_template(pattern, flow_items, 0, 0, item_spec_sizes,
> &n_items);
> +
> +       at_attr.ingress = pt_attr.ingress;
> +       at_attr.egress = pt_attr.egress;
> +       at_attr.transfer = pt_attr.transfer;
> +
> +       fill_actions_template(actions, action_masks, flow_actions,
> &port_attr,
> +                             &need_wire_orig_table, action_conf_sizes,
> &n_actions);
> +
> +       /* fill_actions_template counts the number of actions that require
> each kind of object,
> +        * so we multiply by the number of rules to get the correct totals */
> +       port_attr.nb_counters *= rules_count;
> +       port_attr.nb_aging_objects *= rules_count;
> +       port_attr.nb_meters *= rules_count;
> +       port_attr.nb_conn_tracks *= rules_count;
> +       port_attr.nb_quotas *= rules_count;
> +
> +       table_attr.flow_attr.group = flow_group;
> +       table_attr.flow_attr.priority = 0;
> +       table_attr.flow_attr.ingress = pt_attr.ingress;
> +       table_attr.flow_attr.egress = pt_attr.egress;
> +       table_attr.flow_attr.transfer = pt_attr.transfer;
> +       table_attr.nb_flows = rules_count;
> +
> +       if (pt_attr.transfer && need_wire_orig_table)
> +               table_attr.specialize =
> RTE_FLOW_TABLE_SPECIALIZE_TRANSFER_WIRE_ORIG;
> +
> +       queue_attr_list = malloc(sizeof(*queue_attr_list) * nb_queues);
> +       if (queue_attr_list == NULL) {
> +               fprintf(stderr, "Port %u: failed to allocate
> queue_attr_list\n", port_id);
> +               return -ENOMEM;
> +       }
> +
> +       queue_attr.size = queue_size;
> +       for (i = 0; i < nb_queues; i++)
> +               queue_attr_list[i] = &queue_attr;
> +
> +       ret = rte_flow_configure(port_id, &port_attr, nb_queues,
> queue_attr_list, &error);
> +
> +       free(queue_attr_list);
> +
> +       if (ret != 0) {
> +               fprintf(stderr, "Port %u: rte_flow_configure failed
> (ret=%d, type=%d): %s\n",
> +                       port_id, ret, error.type, error.message ?
> error.message : "(no message)");
> +               return ret;
> +       }
> +
> +       /* Create pattern template */
> +       res->pattern_template =
> +               rte_flow_pattern_template_create(port_id, &pt_attr,
> pattern, &error);
> +       if (res->pattern_template == NULL) {
> +               fprintf(stderr, "Port %u: pattern template create failed:
> %s\n", port_id,
> +                       error.message ? error.message : "(no message)");
> +               return -1;
> +       }
> +
> +       /* Create actions template */
> +       res->actions_template =
> +               rte_flow_actions_template_create(port_id, &at_attr,
> actions, action_masks, &error);
> +       if (res->actions_template == NULL) {
> +               fprintf(stderr, "Port %u: actions template create failed:
> %s\n", port_id,
> +                       error.message ? error.message : "(no message)");
> +               rte_flow_pattern_template_destroy(port_id,
> res->pattern_template, &error);
> +               res->pattern_template = NULL;
> +               return -1;
> +       }
> +
> +       /* Create template table */
> +       res->table = rte_flow_template_table_create(port_id, &table_attr,
> &res->pattern_template, 1,
> +
>  &res->actions_template, 1, &error);
> +       if (res->table == NULL) {
> +               fprintf(stderr, "Port %u: template table create failed:
> %s\n", port_id,
> +                       error.message ? error.message : "(no message)");
> +               rte_flow_actions_template_destroy(port_id,
> res->actions_template, &error);
> +               rte_flow_pattern_template_destroy(port_id,
> res->pattern_template, &error);
> +               res->pattern_template = NULL;
> +               res->actions_template = NULL;
> +               return -1;
> +       }
> +
> +       /* Allocate and pre-initialize per-slot flat buffers */
> +       ret = init_slot_pool(res, nb_queues, queue_size, pattern, n_items,
> item_spec_sizes, actions,
> +                            n_actions, action_conf_sizes);
> +       if (ret != 0) {
> +               fprintf(stderr, "Port %u: slot pool init failed\n",
> port_id);
> +               rte_flow_template_table_destroy(port_id, res->table,
> &error);
> +               rte_flow_actions_template_destroy(port_id,
> res->actions_template, &error);
> +               rte_flow_pattern_template_destroy(port_id,
> res->pattern_template, &error);
> +               res->table = NULL;
> +               res->actions_template = NULL;
> +               res->pattern_template = NULL;
> +               return ret;
> +       }
> +
> +       res->table_capacity = rules_count;
> +       res->initialized = true;
> +
> +       printf(":: Port %u: Async flow engine initialized (queues=%u,
> queue_size=%u)\n", port_id,
> +              nb_queues, queue_size);
> +
> +       return 0;
> +}
> +
> +struct rte_flow *
> +async_generate_flow(uint16_t port_id, uint32_t queue_id, uint32_t
> counter, uint16_t hairpinq,
> +                   uint64_t encap_data, uint64_t decap_data, uint16_t
> dst_port, uint8_t core_idx,
> +                   uint8_t rx_queues_count, bool unique_data, bool
> postpone,
> +                   struct rte_flow_error *error)
> +{
> +       struct async_flow_resources *res;
> +       struct async_flow_queue *q;
> +       uint8_t *slot;
> +       uint32_t idx, items_array_bytes;
> +       struct rte_flow_item *items;
> +       struct rte_flow_action *actions;
> +       struct rte_flow_op_attr op_attr = {
> +               .postpone = postpone,
> +       };
> +
> +       if (port_id >= MAX_PORTS) {
> +               rte_flow_error_set(error, EINVAL,
> RTE_FLOW_ERROR_TYPE_UNSPECIFIED, NULL,
> +                                  "Invalid port ID");
> +               return NULL;
> +       }
> +
> +       res = &port_resources[port_id];
> +       if (!res->initialized) {
> +               rte_flow_error_set(error, EINVAL,
> RTE_FLOW_ERROR_TYPE_UNSPECIFIED, NULL,
> +                                  "Async flow resources not initialized");
> +               return NULL;
> +       }
> +
> +       if (queue_id >= res->nb_queues) {
> +               rte_flow_error_set(error, EINVAL,
> RTE_FLOW_ERROR_TYPE_UNSPECIFIED, NULL,
> +                                  "Invalid queue ID");
> +               return NULL;
> +       }
> +
> +       /* Pick the next slot from this queue's ring */
> +       q = &res->queues[queue_id];
> +       idx = q->head;
> +       q->head = (idx + 1) & (res->slots_per_queue - 1);
> +       slot = q->slots + (size_t)idx * res->slot_size;
> +       items_array_bytes = res->n_items * sizeof(struct rte_flow_item);
> +       items = (struct rte_flow_item *)slot;
> +       actions = (struct rte_flow_action *)(slot + items_array_bytes);
> +
> +       /* Update only per-flow varying values */
> +       update_item_values(items, counter);
> +       update_action_values(actions, counter, hairpinq, encap_data,
> decap_data, core_idx,
> +                            unique_data, rx_queues_count, dst_port);
> +
> +       return rte_flow_async_create(port_id, queue_id, &op_attr,
> res->table, items, 0, actions, 0,
> +                                    NULL, error);
> +}
> +
> +void
> +async_flow_cleanup_port(uint16_t port_id)
> +{
> +       struct async_flow_resources *res;
> +       struct rte_flow_error error;
> +       struct rte_flow_op_result results[64];
> +       int ret, i;
> +
> +       if (port_id >= MAX_PORTS)
> +               return;
> +
> +       res = &port_resources[port_id];
> +       if (!res->initialized)
> +               return;
> +
> +       /* Drain pending async completions from flow flush (only queue 0 is polled here) */
> +       for (i = 0; i < DRAIN_MAX_ITERATIONS; i++) {
> +               rte_flow_push(port_id, 0, &error);
> +               ret = rte_flow_pull(port_id, 0, results, 64, &error);
> +               if (ret <= 0)
> +                       break;
> +       }
> +
> +       if (res->table != NULL) {
> +               rte_flow_template_table_destroy(port_id, res->table,
> &error);
> +               res->table = NULL;
> +       }
> +
> +       if (res->actions_template != NULL) {
> +               rte_flow_actions_template_destroy(port_id,
> res->actions_template, &error);
> +               res->actions_template = NULL;
> +       }
> +
> +       if (res->pattern_template != NULL) {
> +               rte_flow_pattern_template_destroy(port_id,
> res->pattern_template, &error);
> +               res->pattern_template = NULL;
> +       }
> +
> +       free(res->queues);
> +       res->queues = NULL;
> +       free(res->slot_pool);
> +       res->slot_pool = NULL;
> +       free(res->shared_masks);
> +       res->shared_masks = NULL;
> +
> +       res->initialized = false;
> +}
> diff --git a/app/test-flow-perf/async_flow.h
> b/app/test-flow-perf/async_flow.h
> new file mode 100644
> index 0000000000..8c12924bc6
> --- /dev/null
> +++ b/app/test-flow-perf/async_flow.h
> @@ -0,0 +1,54 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright 2026 Maxime Peim <maxime.peim at gmail.com>
> + *
> + * This file contains the async flow API related definitions
> + * and function declarations.
> + */
> +
> +#ifndef FLOW_PERF_ASYNC_FLOW
> +#define FLOW_PERF_ASYNC_FLOW
> +
> +#include <rte_flow.h>
> +#include <stdbool.h>
> +#include <stdint.h>
> +
> +#include "config.h"
> +
> +/* Per-queue slot ring — tracks which slot to use next */
> +struct async_flow_queue {
> +       uint8_t *slots; /* pointer to this queue's region within slot_pool
> */
> +       uint32_t head;  /* next slot index (wraps mod slots_per_queue) */
> +};
> +
> +/* Per-port async flow resources */
> +struct async_flow_resources {
> +       struct rte_flow_pattern_template *pattern_template;
> +       struct rte_flow_actions_template *actions_template;
> +       struct rte_flow_template_table *table;
> +       uint8_t *slot_pool;    /* flat buffer pool for all slots */
> +       uint8_t *shared_masks; /* shared item mask data (one copy for all
> slots) */
> +       struct async_flow_queue *queues;
> +       uint32_t slot_size;       /* bytes per slot (cache-line aligned) */
> +       uint32_t slots_per_queue; /* = queue_size */
> +       uint32_t nb_queues;
> +       uint32_t n_items;   /* item count (excl. END) */
> +       uint32_t n_actions; /* action count (excl. END) */
> +       uint32_t table_capacity;
> +       bool initialized;
> +};
> +
> +/* Initialize async flow engine for a port */
> +int async_flow_init_port(uint16_t port_id, uint32_t nb_queues, uint32_t
> queue_size,
> +                        uint64_t *flow_items, uint64_t *flow_actions,
> uint64_t *flow_attrs,
> +                        uint8_t flow_group, uint32_t rules_count);
> +
> +/* Create a flow rule asynchronously using a pre-allocated slot */
> +struct rte_flow *async_generate_flow(uint16_t port_id, uint32_t queue_id,
> uint32_t counter,
> +                                    uint16_t hairpinq, uint64_t
> encap_data, uint64_t decap_data,
> +                                    uint16_t dst_port, uint8_t core_idx,
> uint8_t rx_queues_count,
> +                                    bool unique_data, bool postpone,
> struct rte_flow_error *error);
> +
> +/* Cleanup async flow resources for a port */
> +void async_flow_cleanup_port(uint16_t port_id);
> +
> +#endif /* FLOW_PERF_ASYNC_FLOW */
> diff --git a/app/test-flow-perf/items_gen.c
> b/app/test-flow-perf/items_gen.c
> index c740e1838f..58f1c16cf8 100644
> --- a/app/test-flow-perf/items_gen.c
> +++ b/app/test-flow-perf/items_gen.c
> @@ -389,3 +389,61 @@ fill_items(struct rte_flow_item *items,
>
>         items[items_counter].type = RTE_FLOW_ITEM_TYPE_END;
>  }
> +
> +static size_t
> +item_spec_size(enum rte_flow_item_type type)
> +{
> +       switch (type) {
> +       case RTE_FLOW_ITEM_TYPE_ETH:
> +               return sizeof(struct rte_flow_item_eth);
> +       case RTE_FLOW_ITEM_TYPE_VLAN:
> +               return sizeof(struct rte_flow_item_vlan);
> +       case RTE_FLOW_ITEM_TYPE_IPV4:
> +               return sizeof(struct rte_flow_item_ipv4);
> +       case RTE_FLOW_ITEM_TYPE_IPV6:
> +               return sizeof(struct rte_flow_item_ipv6);
> +       case RTE_FLOW_ITEM_TYPE_TCP:
> +               return sizeof(struct rte_flow_item_tcp);
> +       case RTE_FLOW_ITEM_TYPE_UDP:
> +               return sizeof(struct rte_flow_item_udp);
> +       case RTE_FLOW_ITEM_TYPE_VXLAN:
> +               return sizeof(struct rte_flow_item_vxlan);
> +       case RTE_FLOW_ITEM_TYPE_VXLAN_GPE:
> +               return sizeof(struct rte_flow_item_vxlan_gpe);
> +       case RTE_FLOW_ITEM_TYPE_GRE:
> +               return sizeof(struct rte_flow_item_gre);
> +       case RTE_FLOW_ITEM_TYPE_GENEVE:
> +               return sizeof(struct rte_flow_item_geneve);
> +       case RTE_FLOW_ITEM_TYPE_GTP:
> +               return sizeof(struct rte_flow_item_gtp);
> +       case RTE_FLOW_ITEM_TYPE_META:
> +               return sizeof(struct rte_flow_item_meta);
> +       case RTE_FLOW_ITEM_TYPE_TAG:
> +               return sizeof(struct rte_flow_item_tag);
> +       case RTE_FLOW_ITEM_TYPE_ICMP:
> +               return sizeof(struct rte_flow_item_icmp);
> +       case RTE_FLOW_ITEM_TYPE_ICMP6:
> +               return sizeof(struct rte_flow_item_icmp6);
> +       default:
> +               return 0;
> +       }
> +}
> +
> +void
> +fill_items_template(struct rte_flow_item *items, uint64_t *flow_items,
> uint32_t outer_ip_src,
> +                   uint8_t core_idx, size_t *spec_sizes, uint32_t
> *n_items_out)
> +{
> +       uint32_t count;
> +
> +       fill_items(items, flow_items, outer_ip_src, core_idx);
> +
> +       /* Count items before END */
> +       for (count = 0; items[count].type != RTE_FLOW_ITEM_TYPE_END;
> count++) {
> +               spec_sizes[count] = item_spec_size(items[count].type);
> +               /* For templates, set spec to NULL - only mask matters for
> template matching */
> +               items[count].spec = NULL;
> +       }
> +
> +       /* take END into account */
> +       *n_items_out = count + 1;
> +}
> diff --git a/app/test-flow-perf/items_gen.h
> b/app/test-flow-perf/items_gen.h
> index f4b0e9a981..0987f7be3c 100644
> --- a/app/test-flow-perf/items_gen.h
> +++ b/app/test-flow-perf/items_gen.h
> @@ -15,4 +15,10 @@
>  void fill_items(struct rte_flow_item *items, uint64_t *flow_items,
>         uint32_t outer_ip_src, uint8_t core_idx);
>
> +/* Fill items template for async flow API (masks only, no spec values).
> + * spec_sizes and n_items_out must be non-NULL: they receive the per-item
> spec sizes
> + * and the item count including the END item.
> + */
> +void fill_items_template(struct rte_flow_item *items, uint64_t
> *flow_items, uint32_t outer_ip_src,
> +                        uint8_t core_idx, size_t *spec_sizes, uint32_t
> *n_items_out);
> +
>  #endif /* FLOW_PERF_ITEMS_GEN */
> diff --git a/app/test-flow-perf/main.c b/app/test-flow-perf/main.c
> index 6636d1517f..2c6def95c2 100644
> --- a/app/test-flow-perf/main.c
> +++ b/app/test-flow-perf/main.c
> @@ -37,11 +37,15 @@
>  #include <rte_mtr.h>
>  #include <rte_os_shim.h>
>
> -#include "config.h"
>  #include "actions_gen.h"
> +#include "async_flow.h"
> +#include "config.h"
>  #include "flow_gen.h"
> +#include "rte_build_config.h"
>
>  #define MAX_BATCHES_COUNT          100
> +#define MAX_ASYNC_QUEUE_SIZE        (1 << 14)
> +#define MAX_PULL_RETRIES            (1 << 20)
>  #define DEFAULT_RULES_COUNT    4000000
>  #define DEFAULT_RULES_BATCH     100000
>  #define DEFAULT_GROUP                0
> @@ -55,7 +59,6 @@
>  #define HAIRPIN_TX_CONF_LOCKED_MEMORY (0x0100)
>  #define HAIRPIN_TX_CONF_RTE_MEMORY    (0x0200)
>
> -struct rte_flow *flow;
>  static uint8_t flow_group;
>
>  static uint64_t encap_data;
> @@ -81,6 +84,9 @@ static bool enable_fwd;
>  static bool unique_data;
>  static bool policy_mtr;
>  static bool packet_mode;
> +static bool async_mode;
> +static uint32_t async_queue_size = 1024;
> +static uint32_t async_push_batch = 256;
>
>  static uint8_t rx_queues_count;
>  static uint8_t tx_queues_count;
> @@ -598,6 +604,29 @@ usage(char *progname)
>                 "Encapped data is fixed with pattern:
> ether,ipv4,udp,vxlan\n"
>                 "With fixed values\n");
>         printf("  --vxlan-decap: add vxlan_decap action to flow
> actions\n");
> +
> +       printf("\nAsync flow API options:\n");
> +       printf("  --async: enable async flow API mode\n");
> +       printf("  --async-queue-size=N: size of each async queue,"
> +              " default is 1024\n");
> +       printf("  --async-push-batch=N: flows to batch before push,"
> +              " default is 256\n");
> +}
> +
> +static inline uint32_t
> +prev_power_of_two(uint32_t x)
> +{
> +       uint32_t saved = x;
> +       x--;
> +       x |= x >> 1;
> +       x |= x >> 2;
> +       x |= x >> 4;
> +       x |= x >> 8;
> +       x |= x >> 16;
> +       x++;
> +       if (x == saved)
> +               return x;
> +       return x >> 1;
>  }
>
>  static void
> @@ -734,6 +763,9 @@ args_parse(int argc, char **argv)
>                 { "policy-mtr",                 1, 0, 0 },
>                 { "meter-profile",              1, 0, 0 },
>                 { "packet-mode",                0, 0, 0 },
> +               { "async",                      0, 0, 0 },
> +               { "async-queue-size",           1, 0, 0 },
> +               { "async-push-batch",           1, 0, 0 },
>                 { 0, 0, 0, 0 },
>         };
>
> @@ -913,8 +945,7 @@ args_parse(int argc, char **argv)
>                                         rte_exit(EXIT_FAILURE, "Invalid
> hairpin config mask\n");
>                                 hairpin_conf_mask = hp_conf;
>                         }
> -                       if (strcmp(lgopts[opt_idx].name,
> -                                       "port-id") == 0) {
> +                       if (strcmp(lgopts[opt_idx].name, "port-id") == 0) {
>                                 uint16_t port_idx = 0;
>
>                                 token = strtok(optarg, ",");
> @@ -981,6 +1012,26 @@ args_parse(int argc, char **argv)
>                         }
>                         if (strcmp(lgopts[opt_idx].name, "packet-mode") ==
> 0)
>                                 packet_mode = true;
> +                       if (strcmp(lgopts[opt_idx].name, "async") == 0)
> +                               async_mode = true;
> +                       if (strcmp(lgopts[opt_idx].name,
> "async-queue-size") == 0) {
> +                               n = atoi(optarg);
> +                               if (n >= MAX_ASYNC_QUEUE_SIZE)
> +                                       async_queue_size =
> MAX_ASYNC_QUEUE_SIZE;
> +                               else if (n > 0)
> +                                       async_queue_size =
> prev_power_of_two(n);
> +                               else
> +                                       rte_exit(EXIT_FAILURE,
> "async-queue-size should be > 0\n");
> +                       }
> +                       if (strcmp(lgopts[opt_idx].name,
> "async-push-batch") == 0) {
> +                               n = atoi(optarg);
> +                               if (n >= MAX_ASYNC_QUEUE_SIZE >> 1)
> +                                       async_push_batch =
> MAX_ASYNC_QUEUE_SIZE >> 1;
> +                               else if (n > 0)
> +                                       async_push_batch =
> prev_power_of_two(n);
> +                               else
> +                                       rte_exit(EXIT_FAILURE,
> "async-push-batch should be > 0\n");
> +                       }
>                         break;
>                 default:
>                         usage(argv[0]);
> @@ -1457,10 +1508,10 @@ query_flows(int port_id, uint8_t core_id, struct
> rte_flow **flows_list)
>         mc_pool.flows_record.query[port_id][core_id] = cpu_time_used;
>  }
>
> -static struct rte_flow **
> -insert_flows(int port_id, uint8_t core_id, uint16_t dst_port_id)
> +static void
> +insert_flows(int port_id, uint8_t core_id, uint16_t dst_port_id, struct
> rte_flow **flows_list)
>  {
> -       struct rte_flow **flows_list;
> +       struct rte_flow *flow;
>         struct rte_flow_error error;
>         clock_t start_batch, end_batch;
>         double first_flow_latency;
> @@ -1485,8 +1536,7 @@ insert_flows(int port_id, uint8_t core_id, uint16_t
> dst_port_id)
>         global_items[0] = FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_ETH);
>         global_actions[0] = FLOW_ITEM_MASK(RTE_FLOW_ACTION_TYPE_JUMP);
>
> -       flows_list = rte_zmalloc("flows_list",
> -               (sizeof(struct rte_flow *) * (rules_count_per_core + 1)),
> 0);
> +       flows_list = malloc(sizeof(struct rte_flow *) *
> (rules_count_per_core + 1));
>         if (flows_list == NULL)
>                 rte_exit(EXIT_FAILURE, "No Memory available!\n");
>
> @@ -1524,6 +1574,11 @@ insert_flows(int port_id, uint8_t core_id, uint16_t
> dst_port_id)
>                         core_id, rx_queues_count,
>                         unique_data, max_priority, &error);
>
> +               if (!flow) {
> +                       print_flow_error(error);
> +                       rte_exit(EXIT_FAILURE, "Error in creating flow\n");
> +               }
> +
>                 if (!counter) {
>                         first_flow_latency = (double)
> (rte_get_timer_cycles() - start_batch);
>                         first_flow_latency /= rte_get_timer_hz();
> @@ -1537,11 +1592,6 @@ insert_flows(int port_id, uint8_t core_id, uint16_t
> dst_port_id)
>                 if (force_quit)
>                         counter = end_counter;
>
> -               if (!flow) {
> -                       print_flow_error(error);
> -                       rte_exit(EXIT_FAILURE, "Error in creating flow\n");
> -               }
> -
>                 flows_list[flow_index++] = flow;
>
>                 /*
> @@ -1575,7 +1625,203 @@ insert_flows(int port_id, uint8_t core_id,
> uint16_t dst_port_id)
>                 port_id, core_id, rules_count_per_core, cpu_time_used);
>
>         mc_pool.flows_record.insertion[port_id][core_id] = cpu_time_used;
> -       return flows_list;
> +}
> +
> +static uint32_t push_counter[RTE_MAX_LCORE];
> +
> +static inline int
> +push_pull_flows_async(int port_id, int queue_id, int core_id, uint32_t
> enqueued, bool empty,
> +                     bool check_op_status, struct rte_flow_error *error)
> +{
> +       static struct rte_flow_op_result
> results[RTE_MAX_LCORE][MAX_ASYNC_QUEUE_SIZE];
> +       uint32_t to_pull = (empty || async_push_batch > enqueued) ?
> enqueued : async_push_batch;
> +       uint32_t pulled_complete = 0;
> +       uint32_t retries = 0;
> +       int pulled, i;
> +       int ret = 0;
> +
> +       /* Push periodically to give HW work to do */
> +       ret = rte_flow_push(port_id, queue_id, error);
> +       if (ret)
> +               return ret;
> +       push_counter[core_id]++;
> +
> +       /* First push: skip the pull unless a full drain was requested */
> +       if (!empty && push_counter[core_id] == 1)
> +               return 0;
> +
> +       while (to_pull > 0) {
> +               pulled = rte_flow_pull(port_id, queue_id,
> results[core_id], to_pull, error);
> +               if (pulled < 0) {
> +                       return -1;
> +               } else if (pulled == 0) {
> +                       if (++retries > MAX_PULL_RETRIES) {
> +                               rte_flow_error_set(error, ETIMEDOUT,
> +
> RTE_FLOW_ERROR_TYPE_UNSPECIFIED, NULL,
> +                                                  "Timeout waiting for
> async completions");
> +                               return -1;
> +                       }
> +                       rte_pause();
> +                       continue;
> +               }
> +               retries = 0;
> +
> +               to_pull -= pulled;
> +               pulled_complete += pulled;
> +               if (!check_op_status)
> +                       continue;
> +
> +               for (i = 0; i < pulled; i++) {
> +                       if (results[core_id][i].status !=
> RTE_FLOW_OP_SUCCESS) {
> +                               rte_flow_error_set(error, EINVAL,
> RTE_FLOW_ERROR_TYPE_UNSPECIFIED,
> +                                                  NULL, "Some flow rule
> insertion failed");
> +                               return -1;
> +                       }
> +               }
> +       }
> +
> +       return pulled_complete;
> +}
> +
> +static void
> +insert_flows_async(int port_id, uint8_t core_id, uint16_t dst_port_id,
> struct rte_flow **flows_list)
> +{
> +       struct rte_flow *flow;
> +       struct rte_flow_error error;
> +       clock_t start_batch, end_batch;
> +       double first_flow_latency;
> +       double cpu_time_used;
> +       double insertion_rate;
> +       double cpu_time_per_batch[MAX_BATCHES_COUNT] = {0};
> +       double delta;
> +       uint32_t flow_index;
> +       uint32_t counter, batch_counter, start_counter = 0, end_counter;
> +       int rules_batch_idx;
> +       int rules_count_per_core;
> +       uint32_t enqueued = 0;
> +       uint32_t queue_id = core_id;
> +       bool first_batch = true;
> +       int pulled;
> +
> +       rules_count_per_core = rules_count / mc_pool.cores_count;
> +
> +       if (async_push_batch > async_queue_size >> 1)
> +               async_push_batch = async_queue_size >> 1;
> +
> +       /* Set boundaries of rules for each core. */
> +       if (core_id)
> +               start_counter = core_id * rules_count_per_core;
> +       end_counter = (core_id + 1) * rules_count_per_core;
> +
> +       cpu_time_used = 0;
> +       flow_index = 0;
> +       push_counter[core_id] = 0;
> +
> +       if (flow_group > 0 && core_id == 0) {
> +               /*
> +                * Create global rule to jump into flow_group,
> +                * this way the app will avoid the default rules.
> +                *
> +                * This rule will be created only once.
> +                *
> +                * Global rule:
> +                * group 0 eth / end actions jump group <flow_group>
> +                */
> +
> +               uint64_t global_items[MAX_ITEMS_NUM] = {0};
> +               uint64_t global_actions[MAX_ACTIONS_NUM] = {0};
> +               global_items[0] = FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_ETH);
> +               global_actions[0] =
> FLOW_ITEM_MASK(RTE_FLOW_ACTION_TYPE_JUMP);
> +               flow = generate_flow(port_id, 0, flow_attrs, global_items,
> global_actions,
> +                                    flow_group, 0, 0, 0, 0, dst_port_id,
> core_id, rx_queues_count,
> +                                    unique_data, max_priority, &error);
> +
> +               if (flow == NULL) {
> +                       print_flow_error(error);
> +                       rte_exit(EXIT_FAILURE, "Error in creating flow\n");
> +               }
> +               flows_list[flow_index++] = flow;
> +       }
> +
> +       start_batch = rte_get_timer_cycles();
> +       for (counter = start_counter; counter < end_counter;) {
> +               /* batch adding flow rules, this avoids unnecessary checks
> for push/pull */
> +               for (batch_counter = 0; batch_counter < async_push_batch
> && counter < end_counter;
> +                    batch_counter++, counter++) {
> +                       /* Create flow with postpone=true to batch
> operations */
> +                       flow = async_generate_flow(port_id, queue_id,
> counter, hairpin_queues_num,
> +                                                  encap_data, decap_data,
> dst_port_id, core_id,
> +                                                  rx_queues_count,
> unique_data, true, &error);
> +
> +                       if (!flow) {
> +                               print_flow_error(error);
> +                               rte_exit(EXIT_FAILURE, "Error in creating
> async flow\n");
> +                       }
> +
> +                       if (force_quit)
> +                               break;
> +
> +                       flows_list[flow_index++] = flow;
> +                       enqueued++;
> +
> +                       /*
> +                        * Save the insertion rate for rules batch.
> +                        * Check if the insertion reached the rules
> +                        * batch counter, then save the insertion rate
> +                        * for this batch.
> +                        */
> +                       if (!((counter + 1) % rules_batch)) {
> +                               end_batch = rte_get_timer_cycles();
> +                               delta = (double)(end_batch - start_batch);
> +                               rules_batch_idx = ((counter + 1) /
> rules_batch) - 1;
> +                               cpu_time_per_batch[rules_batch_idx] =
> delta / rte_get_timer_hz();
> +                               cpu_time_used +=
> cpu_time_per_batch[rules_batch_idx];
> +                               start_batch = rte_get_timer_cycles();
> +                       }
> +               }
> +
> +               if ((pulled = push_pull_flows_async(port_id, queue_id,
> core_id, enqueued, false,
> +                                                   true, &error)) < 0) {
> +                       print_flow_error(error);
> +                       rte_exit(EXIT_FAILURE, "Error push/pull async
> operations\n");
> +               }
> +
> +               enqueued -= pulled;
> +
> +               if (first_batch) {
> +                       first_flow_latency =
> (double)(rte_get_timer_cycles() - start_batch);
> +                       first_flow_latency /= rte_get_timer_hz();
> +                       /* In millisecond */
> +                       first_flow_latency *= 1000;
> +                       printf(":: First Flow Batch Latency (Async) ::
> Port %d :: First batch (%u) "
> +                              "installed in %f milliseconds\n",
> +                              port_id, async_push_batch,
> first_flow_latency);
> +                       first_batch = false;
> +               }
> +       }
> +
> +       if (push_pull_flows_async(port_id, queue_id, core_id, enqueued,
> true, true, &error) < 0) {
> +               print_flow_error(error);
> +               rte_exit(EXIT_FAILURE, "Error final push/pull async
> operations\n");
> +       }
> +
> +       /* Print insertion rates for all batches */
> +       if (dump_iterations)
> +               print_rules_batches(cpu_time_per_batch);
> +
> +       printf(":: Port %d :: Core %d boundaries (Async) :: start @[%d] -
> end @[%d]\n", port_id,
> +              core_id, start_counter, end_counter - 1);
> +
> +       /* Insertion rate for all rules in one core */
> +       if (cpu_time_used > 0) {
> +               insertion_rate = ((double)rules_count_per_core /
> cpu_time_used) / 1000;
> +               printf(":: Port %d :: Core %d :: Async rules insertion
> rate -> %f K Rule/Sec\n",
> +                      port_id, core_id, insertion_rate);
> +       }
> +       printf(":: Port %d :: Core %d :: The time for creating %d async
> rules is %f seconds\n",
> +              port_id, core_id, rules_count_per_core, cpu_time_used);
> +
> +       mc_pool.flows_record.insertion[port_id][core_id] = cpu_time_used;
>  }
>
>  static void
> @@ -1585,12 +1831,18 @@ flows_handler(uint8_t core_id)
>         uint16_t port_idx = 0;
>         uint16_t nr_ports;
>         int port_id;
> +       int rules_count_per_core;
>
>         nr_ports = rte_eth_dev_count_avail();
>
>         if (rules_batch > rules_count)
>                 rules_batch = rules_count;
>
> +       rules_count_per_core = rules_count / mc_pool.cores_count;
> +       flows_list = malloc(sizeof(struct rte_flow *) *
> (rules_count_per_core + 1));
> +       if (flows_list == NULL)
> +               rte_exit(EXIT_FAILURE, "No Memory available!\n");
> +
>         printf(":: Rules Count per port: %d\n\n", rules_count);
>
>         for (port_id = 0; port_id < nr_ports; port_id++) {
> @@ -1602,10 +1854,10 @@ flows_handler(uint8_t core_id)
>                 mc_pool.last_alloc[core_id] =
> (int64_t)dump_socket_mem(stdout);
>                 if (has_meter())
>                         meters_handler(port_id, core_id, METER_CREATE);
> -               flows_list = insert_flows(port_id, core_id,
> -                                               dst_ports[port_idx++]);
> -               if (flows_list == NULL)
> -                       rte_exit(EXIT_FAILURE, "Error: Insertion
> Failed!\n");
> +               if (async_mode)
> +                       insert_flows_async(port_id, core_id,
> dst_ports[port_idx++], flows_list);
> +               else
> +                       insert_flows(port_id, core_id,
> dst_ports[port_idx++], flows_list);
>                 mc_pool.current_alloc[core_id] =
> (int64_t)dump_socket_mem(stdout);
>
>                 if (query_flag)
> @@ -2212,6 +2464,16 @@ init_port(void)
>                         }
>                 }
>
> +               /* Configure async flow engine before device start */
> +               if (async_mode) {
> +                       ret = async_flow_init_port(port_id,
> mc_pool.cores_count, async_queue_size,
> +                                                  flow_items,
> flow_actions, flow_attrs, flow_group,
> +                                                  rules_count);
> +                       if (ret != 0)
> +                               rte_exit(EXIT_FAILURE, "Failed to init
> async flow on port %d\n",
> +                                        port_id);
> +               }
> +
>                 ret = rte_eth_dev_start(port_id);
>                 if (ret < 0)
>                         rte_exit(EXIT_FAILURE,
> @@ -2291,6 +2553,8 @@ main(int argc, char **argv)
>
>         RTE_ETH_FOREACH_DEV(port) {
>                 rte_flow_flush(port, &error);
> +               if (async_mode)
> +                       async_flow_cleanup_port(port);
>                 if (rte_eth_dev_stop(port) != 0)
>                         printf("Failed to stop device on port %u\n", port);
>                 rte_eth_dev_close(port);
> diff --git a/app/test-flow-perf/meson.build
> b/app/test-flow-perf/meson.build
> index e101449e32..2f820a7597 100644
> --- a/app/test-flow-perf/meson.build
> +++ b/app/test-flow-perf/meson.build
> @@ -3,6 +3,7 @@
>
>  sources = files(
>          'actions_gen.c',
> +        'async_flow.c',
>          'flow_gen.c',
>          'items_gen.c',
>          'main.c',
> --
> 2.43.0
>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mails.dpdk.org/archives/dev/attachments/20260302/9923b32b/attachment-0001.htm>


More information about the dev mailing list