[dpdk-dev] [PATCH v3 1/3] net/af_xdp: allow bigger batch sizes

Ciara Loftus ciara.loftus at intel.com
Wed Mar 10 08:48:14 CET 2021


Prior to this commit, the maximum batch sizes for zero-copy and copy-mode
rx and for copy-mode tx were set to 32. In other words, apart from zero-copy
tx, the user could never rx/tx more than 32 packets in a single call, and
without inspecting the code they would not be aware of this limit.

This commit removes these upper limits placed on the user and instead
sets an internal batch size equal to the default ring size (2048). Batches
larger than this are still processed; however, they are split into smaller
batches, similar to how it is done in other drivers. This is necessary
because some arrays used during rx/tx need to be sized at compile time.

A larger batch size means fewer batches and thus larger bulk operations,
fewer ring accesses and fewer syscalls, all of which should yield
improved performance.
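
For illustration, a minimal application-side sketch (the port id, queue id
and burst size below are arbitrary assumptions, not part of this patch):
with this change an rx burst larger than the old 32-packet cap is honoured,
with the PMD splitting it internally into chunks of at most
ETH_AF_XDP_RX_BATCH_SIZE.

#include <rte_ethdev.h>
#include <rte_mbuf.h>

#define BURST_SIZE 512	/* arbitrary, larger than the old 32-packet cap */

static void
poll_queue(uint16_t port_id, uint16_t queue_id)
{
	struct rte_mbuf *bufs[BURST_SIZE];
	uint16_t nb_rx, i;

	/* Before this patch the af_xdp PMD would return at most 32 packets
	 * here regardless of the requested burst; now the full burst can be
	 * serviced, split internally into ETH_AF_XDP_RX_BATCH_SIZE chunks.
	 */
	nb_rx = rte_eth_rx_burst(port_id, queue_id, bufs, BURST_SIZE);

	for (i = 0; i < nb_rx; i++)
		rte_pktmbuf_free(bufs[i]);
}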

Signed-off-by: Ciara Loftus <ciara.loftus at intel.com>
---
 drivers/net/af_xdp/rte_eth_af_xdp.c | 67 ++++++++++++++++++++++++-----
 1 file changed, 57 insertions(+), 10 deletions(-)

diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c b/drivers/net/af_xdp/rte_eth_af_xdp.c
index 3957227bf0..be524e4784 100644
--- a/drivers/net/af_xdp/rte_eth_af_xdp.c
+++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
@@ -66,8 +66,8 @@ RTE_LOG_REGISTER(af_xdp_logtype, pmd.net.af_xdp, NOTICE);
 #define ETH_AF_XDP_DFLT_START_QUEUE_IDX	0
 #define ETH_AF_XDP_DFLT_QUEUE_COUNT	1
 
-#define ETH_AF_XDP_RX_BATCH_SIZE	32
-#define ETH_AF_XDP_TX_BATCH_SIZE	32
+#define ETH_AF_XDP_RX_BATCH_SIZE	XSK_RING_CONS__DEFAULT_NUM_DESCS
+#define ETH_AF_XDP_TX_BATCH_SIZE	XSK_RING_CONS__DEFAULT_NUM_DESCS
 
 
 struct xsk_umem_info {
@@ -329,8 +329,7 @@ af_xdp_rx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 	struct rte_mbuf *mbufs[ETH_AF_XDP_RX_BATCH_SIZE];
 
 	if (xsk_prod_nb_free(fq, free_thresh) >= free_thresh)
-		(void)reserve_fill_queue(umem, ETH_AF_XDP_RX_BATCH_SIZE,
-					 NULL, fq);
+		(void)reserve_fill_queue(umem, nb_pkts, NULL, fq);
 
 	nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
 	if (nb_pkts == 0) {
@@ -379,10 +378,8 @@ af_xdp_rx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 #endif
 
 static uint16_t
-eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
+af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 {
-	nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_RX_BATCH_SIZE);
-
 #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
 	return af_xdp_rx_zc(queue, bufs, nb_pkts);
 #else
@@ -390,6 +387,32 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 #endif
 }
 
+static uint16_t
+eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
+{
+	uint16_t nb_rx;
+
+	if (likely(nb_pkts <= ETH_AF_XDP_RX_BATCH_SIZE))
+		return af_xdp_rx(queue, bufs, nb_pkts);
+
+	/* Split larger batch into smaller batches of size
+	 * ETH_AF_XDP_RX_BATCH_SIZE or less.
+	 */
+	nb_rx = 0;
+	while (nb_pkts) {
+		uint16_t ret, n;
+
+		n = (uint16_t)RTE_MIN(nb_pkts, ETH_AF_XDP_RX_BATCH_SIZE);
+		ret = af_xdp_rx(queue, &bufs[nb_rx], n);
+		nb_rx = (uint16_t)(nb_rx + ret);
+		nb_pkts = (uint16_t)(nb_pkts - ret);
+		if (ret < n)
+			break;
+	}
+
+	return nb_rx;
+}
+
 static void
 pull_umem_cq(struct xsk_umem_info *umem, int size, struct xsk_ring_cons *cq)
 {
@@ -535,8 +558,6 @@ af_xdp_tx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 	uint32_t idx_tx;
 	struct xsk_ring_cons *cq = &txq->pair->cq;
 
-	nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_TX_BATCH_SIZE);
-
 	pull_umem_cq(umem, nb_pkts, cq);
 
 	nb_pkts = rte_ring_dequeue_bulk(umem->buf_ring, addrs,
@@ -575,6 +596,32 @@ af_xdp_tx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 
 	return nb_pkts;
 }
+
+static uint16_t
+af_xdp_tx_cp_batch(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
+{
+	uint16_t nb_tx;
+
+	if (likely(nb_pkts <= ETH_AF_XDP_TX_BATCH_SIZE))
+		return af_xdp_tx_cp(queue, bufs, nb_pkts);
+
+	nb_tx = 0;
+	while (nb_pkts) {
+		uint16_t ret, n;
+
+		/* Split larger batch into smaller batches of size
+		 * ETH_AF_XDP_TX_BATCH_SIZE or less.
+		 */
+		n = (uint16_t)RTE_MIN(nb_pkts, ETH_AF_XDP_TX_BATCH_SIZE);
+		ret = af_xdp_tx_cp(queue, &bufs[nb_tx], n);
+		nb_tx = (uint16_t)(nb_tx + ret);
+		nb_pkts = (uint16_t)(nb_pkts - ret);
+		if (ret < n)
+			break;
+	}
+
+	return nb_tx;
+}
 #endif
 
 static uint16_t
@@ -583,7 +630,7 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
 	return af_xdp_tx_zc(queue, bufs, nb_pkts);
 #else
-	return af_xdp_tx_cp(queue, bufs, nb_pkts);
+	return af_xdp_tx_cp_batch(queue, bufs, nb_pkts);
 #endif
 }
 
-- 
2.17.1
