[RFC PATCH dpdk 2/3] graph: add deferred enqueue API for batch processing
Robin Jarry
rjarry at redhat.com
Thu Feb 5 10:26:35 CET 2026
Add rte_node_enqueue_deferred() which tracks runs of consecutive
objects going to the same edge and flushes them efficiently in bulk.
When all objects go to the same edge (the common case), the function
uses rte_node_next_stream_move() which swaps pointers instead of
copying data.
The deferred state (run_start and last_edge) is stored in the node
fast-path cache line 1, keeping it close to other frequently accessed
node data. The last_edge is preserved across node invocations,
allowing speculation: if traffic continues to the same destination,
no action is needed until the edge changes.
The flush is performed automatically at the end of node processing
by __rte_node_process().
Signed-off-by: Robin Jarry <rjarry at redhat.com>
---
lib/graph/graph_populate.c | 1 +
lib/graph/rte_graph_worker_common.h | 75 +++++++++++++++++++++++++++++
2 files changed, 76 insertions(+)
diff --git a/lib/graph/graph_populate.c b/lib/graph/graph_populate.c
index 026daecb2122..fda46a7dd386 100644
--- a/lib/graph/graph_populate.c
+++ b/lib/graph/graph_populate.c
@@ -84,6 +84,7 @@ graph_nodes_populate(struct graph *_graph)
struct rte_node *node = RTE_PTR_ADD(graph, off);
memset(node, 0, sizeof(*node));
node->fence = RTE_GRAPH_FENCE;
+ node->deferred_last_edge = RTE_EDGE_ID_INVALID;
node->off = off;
if (graph_pcap_is_enable()) {
node->process = graph_pcap_dispatch;
diff --git a/lib/graph/rte_graph_worker_common.h b/lib/graph/rte_graph_worker_common.h
index 7fda67c07169..c6741d44877c 100644
--- a/lib/graph/rte_graph_worker_common.h
+++ b/lib/graph/rte_graph_worker_common.h
@@ -119,6 +119,8 @@ struct __rte_cache_aligned rte_node {
/** Fast path area cache line 1. */
alignas(RTE_CACHE_LINE_MIN_SIZE)
rte_graph_off_t xstat_off; /**< Offset to xstat counters. */
+ uint16_t deferred_run_start; /**< Used by rte_node_enqueue_deferred(). */
+ rte_edge_t deferred_last_edge; /**< Used by rte_node_enqueue_deferred(). */
/** Fast path area cache line 2. */
__extension__ struct __rte_cache_aligned {
@@ -184,6 +186,8 @@ void __rte_node_stream_alloc_size(struct rte_graph *graph,
/* Fast path helper functions */
+static inline void __rte_node_enqueue_deferred_flush(struct rte_graph *, struct rte_node *);
+
/**
* @internal
*
@@ -204,6 +208,8 @@ __rte_node_process(struct rte_graph *graph, struct rte_node *node)
RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
objs = node->objs;
rte_prefetch0(objs);
+ node->deferred_run_start = 0;
+ /* Keep deferred_last_edge from previous invocation for speculation */
if (rte_graph_has_stats_feature()) {
start = rte_rdtsc();
@@ -214,6 +220,10 @@ __rte_node_process(struct rte_graph *graph, struct rte_node *node)
} else {
node->process(graph, node, objs, node->idx);
}
+
+ if (node->deferred_last_edge != RTE_EDGE_ID_INVALID)
+ __rte_node_enqueue_deferred_flush(graph, node);
+
node->idx = 0;
}
@@ -412,6 +422,8 @@ rte_node_enqueue_x4(struct rte_graph *graph, struct rte_node *node,
node->idx = idx;
}
+static inline void rte_node_next_stream_move(struct rte_graph *, struct rte_node *, rte_edge_t);
+
/**
* Enqueue objs to multiple next nodes for further processing and
* set the next nodes to pending state in the circular buffer.
@@ -547,6 +559,69 @@ rte_node_next_stream_move(struct rte_graph *graph, struct rte_node *src,
}
}
+/**
+ * Enqueue objects to a next node in a cache-efficient deferred manner.
+ *
+ * This function tracks runs of objects going to the same edge. When the edge
+ * changes, the previous run is flushed using bulk enqueue. At the end of node
+ * processing, any remaining objects are flushed automatically. When all
+ * objects go to the same edge (the common case), rte_node_next_stream_move()
+ * is used which swaps pointers instead of copying.
+ *
+ * The function does not require consecutive idx values. It can be called with
+ * any stride (e.g., 0, 4, 8, ... to process batches of 4). All objects from
+ * the previous idx up to the current one are considered part of the current
+ * run until the edge changes.
+ *
+ * For homogeneous traffic, the destination node structure is touched once
+ * per batch instead of once per object, reducing cache line bouncing.
+ *
+ * @param graph
+ * Graph pointer returned from rte_graph_lookup().
+ * @param node
+ * Current node pointer.
+ * @param next
+ * Next node edge index.
+ * @param idx
+ * Index of the current object being processed in node->objs[].
+ *
+ * @see rte_node_next_stream_move().
+ */
+static inline void
+rte_node_enqueue_deferred(struct rte_graph *graph, struct rte_node *node,
+ rte_edge_t next, uint16_t idx)
+{
+ if (next != node->deferred_last_edge) {
+ /* edge changed, flush previous run if not empty */
+ if (idx > node->deferred_run_start)
+ rte_node_enqueue(graph, node, node->deferred_last_edge,
+ &node->objs[node->deferred_run_start],
+ idx - node->deferred_run_start);
+ node->deferred_run_start = idx;
+ node->deferred_last_edge = next;
+ }
+}
+
+/**
+ * @internal
+ * Flush any pending deferred enqueue at end of node processing.
+ */
+static inline void
+__rte_node_enqueue_deferred_flush(struct rte_graph *graph, struct rte_node *node)
+{
+ const uint16_t run_start = node->deferred_run_start;
+ const uint16_t count = node->idx;
+
+ if (run_start == 0 && count != 0) {
+ /* All packets went to the same edge - use stream move (pointer swap) */
+ rte_node_next_stream_move(graph, node, node->deferred_last_edge);
+ } else if (run_start < count) {
+ /* flush final run */
+ rte_node_enqueue(graph, node, node->deferred_last_edge,
+ &node->objs[run_start], count - run_start);
+ }
+}
+
/**
* Test the validity of model.
*
--
2.52.0
More information about the dev
mailing list