[dpdk-dev] [PATCH 2/2] af_xdp: avoid unnecessary mbuf allocation and free

Loftus, Ciara ciara.loftus at intel.com
Fri Sep 18 11:38:32 CEST 2020


> 
> Optimize Rx performance by allocating mbufs based on the result
> of xsk_ring_cons__peek, avoiding redundant mbuf allocation
> and freeing when receiving packets.
> 
> Signed-off-by: Li RongQing <lirongqing at baidu.com>
> Signed-off-by: Dongsheng Rong <rongdongsheng at baidu.com>
> ---
>  drivers/net/af_xdp/rte_eth_af_xdp.c | 64 ++++++++++++++++---------------------
>  1 file changed, 27 insertions(+), 37 deletions(-)
> 
> diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c b/drivers/net/af_xdp/rte_eth_af_xdp.c
> index 7ce4ad04a..48824050e 100644
> --- a/drivers/net/af_xdp/rte_eth_af_xdp.c
> +++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
> @@ -229,28 +229,29 @@ af_xdp_rx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
>  	struct xsk_umem_info *umem = rxq->umem;
>  	uint32_t idx_rx = 0;
>  	unsigned long rx_bytes = 0;
> -	int rcvd, i;
> +	int i;
>  	struct rte_mbuf *fq_bufs[ETH_AF_XDP_RX_BATCH_SIZE];
> 
> -	/* allocate bufs for fill queue replenishment after rx */
> -	if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
> -		AF_XDP_LOG(DEBUG,
> -			"Failed to get enough buffers for fq.\n");
> -		return 0;
> -	}
> 
> -	rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
> +	nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
> 
> -	if (rcvd == 0) {
> +	if (nb_pkts == 0) {
>  #if defined(XDP_USE_NEED_WAKEUP)
>  		if (xsk_ring_prod__needs_wakeup(&umem->fq))
>  			(void)poll(rxq->fds, 1, 1000);
>  #endif
> 
> -		goto out;
> +		return 0;
>  	}
> 
> -	for (i = 0; i < rcvd; i++) {
> +	/* allocate bufs for fill queue replenishment after rx */
> +	if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
> +		AF_XDP_LOG(DEBUG,
> +			"Failed to get enough buffers for fq.\n");

Thanks for this patch. I've considered this approach in the past.
There is a problem if we hit this condition: the rx ring's consumer index has
already been advanced by xsk_ring_cons__peek, so if we have no mbufs to hold
the rx data, that data is lost.
That's why we allocate the mbufs up front today.
I agree the up-front allocation can be wasteful and isn't the most optimal, but it means we don't drop packets due to failed mbuf allocs.
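
For what it's worth, if we did want to drop the up-front allocation, one option
might be to rewind the peek when the bulk alloc fails, along the lines of the
rough sketch below (inside af_xdp_rx_zc(); this assumes libbpf's
xsk_ring_cons__cancel() is usable here, which simply rewinds the ring's cached
consumer index so the same descriptors show up again on the next burst):

	nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
	if (nb_pkts == 0)
		return 0;

	/* allocate bufs for fill queue replenishment after rx */
	if (unlikely(rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts) != 0)) {
		/* Undo the peek: the descriptors were neither consumed nor
		 * released, so the next rx burst sees them again and nothing
		 * is dropped on mbuf alloc failure. */
		xsk_ring_cons__cancel(rx, nb_pkts);
		return 0;
	}

With something like that the descriptors would stay on the rx ring when the
mempool is exhausted, so we'd get the allocation saving without losing packets.
Just a sketch though - I haven't tested it.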

> +		return 0;
> +	}
> +
> +	for (i = 0; i < nb_pkts; i++) {
>  		const struct xdp_desc *desc;
>  		uint64_t addr;
>  		uint32_t len;
> @@ -275,20 +276,15 @@ af_xdp_rx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
>  		rx_bytes += len;
>  	}
> 
> -	xsk_ring_cons__release(rx, rcvd);
> +	xsk_ring_cons__release(rx, nb_pkts);
> 
> -	(void)reserve_fill_queue(umem, rcvd, fq_bufs);
> +	(void)reserve_fill_queue(umem, nb_pkts, fq_bufs);
> 
>  	/* statistics */
> -	rxq->stats.rx_pkts += rcvd;
> +	rxq->stats.rx_pkts += nb_pkts;
>  	rxq->stats.rx_bytes += rx_bytes;
> 
> -out:
> -	if (rcvd != nb_pkts)
> -		rte_mempool_put_bulk(umem->mb_pool, (void **)&fq_bufs[rcvd],
> -				     nb_pkts - rcvd);
> -
> -	return rcvd;
> +	return nb_pkts;
>  }
>  #else
>  static uint16_t
> @@ -300,27 +296,26 @@ af_xdp_rx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
>  	struct xsk_ring_prod *fq = &umem->fq;
>  	uint32_t idx_rx = 0;
>  	unsigned long rx_bytes = 0;
> -	int rcvd, i;
> +	int i;
>  	uint32_t free_thresh = fq->size >> 1;
>  	struct rte_mbuf *mbufs[ETH_AF_XDP_RX_BATCH_SIZE];
> 
> -	if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts) != 0))
> -		return 0;
> -
> -	rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
> -	if (rcvd == 0) {
> +	nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
> +	if (nb_pkts == 0) {
>  #if defined(XDP_USE_NEED_WAKEUP)
>  		if (xsk_ring_prod__needs_wakeup(fq))
>  			(void)poll(rxq->fds, 1, 1000);
>  #endif
> -
> -		goto out;
> +		return 0;
>  	}
> 
> +	if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts) != 0))
> +		return 0;
> +
>  	if (xsk_prod_nb_free(fq, free_thresh) >= free_thresh)
>  		(void)reserve_fill_queue(umem, ETH_AF_XDP_RX_BATCH_SIZE, NULL);
> 
> -	for (i = 0; i < rcvd; i++) {
> +	for (i = 0; i < nb_pkts; i++) {
>  		const struct xdp_desc *desc;
>  		uint64_t addr;
>  		uint32_t len;
> @@ -339,18 +334,13 @@ af_xdp_rx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
>  		bufs[i] = mbufs[i];
>  	}
> 
> -	xsk_ring_cons__release(rx, rcvd);
> +	xsk_ring_cons__release(rx, nb_pkts);
> 
>  	/* statistics */
> -	rxq->stats.rx_pkts += rcvd;
> +	rxq->stats.rx_pkts += nb_pkts;
>  	rxq->stats.rx_bytes += rx_bytes;
> 
> -out:
> -	if (rcvd != nb_pkts)
> -		rte_mempool_put_bulk(rxq->mb_pool, (void **)&mbufs[rcvd],
> -				     nb_pkts - rcvd);
> -
> -	return rcvd;
> +	return nb_pkts;
>  }
>  #endif
> 
> --
> 2.16.2


