[dpdk-dev] [PATCH][v2] net/af_xdp: avoid to unnecessary allocation and free mbuf in rx path
Loftus, Ciara
ciara.loftus at intel.com
Thu Oct 1 18:24:18 CEST 2020
>
> when receive packets, the max bunch number of mbuf are allocated
> if hardware does not receive the max bunch number packets, it
> will free redundancy mbuf, that is low-performance
>
> so optimize rx performance, by allocating number of mbuf based on
> result of xsk_ring_cons__peek, to avoid to redundancy allocation,
> and free mbuf when receive packets
Hi,
Thanks for the patch and fixing the issue I raised.
With my testing so far I haven't measured an improvement in performance with the patch.
Do you have data to share which shows the benefit of your patch?
I agree the potential excess allocation of mbufs for the fill ring is not the most optimal, but if doing it does not significantly impact the performance I would be in favour of keeping that approach versus touching the cached_cons outside of libbpf which is unconventional.
If a benefit can be shown and we proceed with the approach, I would suggest creating a new function for the cached consumer rollback eg. xsk_ring_cons_cancel() or similar, and add a comment describing what it does.
Thanks,
Ciara
>
> V2: rollback rx cached_cons if mbuf failed to be allocated
>
> Signed-off-by: Li RongQing <lirongqing at baidu.com>
> Signed-off-by: Dongsheng Rong <rongdongsheng at baidu.com>
> ---
> drivers/net/af_xdp/rte_eth_af_xdp.c | 67 ++++++++++++++++---------------
> ------
> 1 file changed, 29 insertions(+), 38 deletions(-)
>
> diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c
> b/drivers/net/af_xdp/rte_eth_af_xdp.c
> index 01f462b46..e04fa43f6 100644
> --- a/drivers/net/af_xdp/rte_eth_af_xdp.c
> +++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
> @@ -251,28 +251,29 @@ af_xdp_rx_zc(void *queue, struct rte_mbuf
> **bufs, uint16_t nb_pkts)
> struct xsk_umem_info *umem = rxq->umem;
> uint32_t idx_rx = 0;
> unsigned long rx_bytes = 0;
> - int rcvd, i;
> + int i;
> struct rte_mbuf *fq_bufs[ETH_AF_XDP_RX_BATCH_SIZE];
>
> - /* allocate bufs for fill queue replenishment after rx */
> - if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
> - AF_XDP_LOG(DEBUG,
> - "Failed to get enough buffers for fq.\n");
> - return 0;
> - }
> + nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
>
> - rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
> -
> - if (rcvd == 0) {
> + if (nb_pkts == 0) {
> #if defined(XDP_USE_NEED_WAKEUP)
> if (xsk_ring_prod__needs_wakeup(fq))
> (void)poll(rxq->fds, 1, 1000);
> #endif
>
> - goto out;
> + return 0;
> + }
> +
> + /* allocate bufs for fill queue replenishment after rx */
> + if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
> + AF_XDP_LOG(DEBUG,
> + "Failed to get enough buffers for fq.\n");
> + rx->cached_cons -= nb_pkts;
> + return 0;
> }
>
> - for (i = 0; i < rcvd; i++) {
> + for (i = 0; i < nb_pkts; i++) {
> const struct xdp_desc *desc;
> uint64_t addr;
> uint32_t len;
> @@ -297,20 +298,14 @@ af_xdp_rx_zc(void *queue, struct rte_mbuf
> **bufs, uint16_t nb_pkts)
> rx_bytes += len;
> }
>
> - xsk_ring_cons__release(rx, rcvd);
> -
> - (void)reserve_fill_queue(umem, rcvd, fq_bufs, fq);
> + xsk_ring_cons__release(rx, nb_pkts);
> + (void)reserve_fill_queue(umem, nb_pkts, fq_bufs, fq);
>
> /* statistics */
> - rxq->stats.rx_pkts += rcvd;
> + rxq->stats.rx_pkts += nb_pkts;
> rxq->stats.rx_bytes += rx_bytes;
>
> -out:
> - if (rcvd != nb_pkts)
> - rte_mempool_put_bulk(umem->mb_pool, (void
> **)&fq_bufs[rcvd],
> - nb_pkts - rcvd);
> -
> - return rcvd;
> + return nb_pkts;
> }
> #else
> static uint16_t
> @@ -322,7 +317,7 @@ af_xdp_rx_cp(void *queue, struct rte_mbuf **bufs,
> uint16_t nb_pkts)
> struct xsk_ring_prod *fq = &rxq->fq;
> uint32_t idx_rx = 0;
> unsigned long rx_bytes = 0;
> - int rcvd, i;
> + int i;
> uint32_t free_thresh = fq->size >> 1;
> struct rte_mbuf *mbufs[ETH_AF_XDP_RX_BATCH_SIZE];
>
> @@ -330,20 +325,21 @@ af_xdp_rx_cp(void *queue, struct rte_mbuf
> **bufs, uint16_t nb_pkts)
> (void)reserve_fill_queue(umem,
> ETH_AF_XDP_RX_BATCH_SIZE,
> NULL, fq);
>
> - if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts)
> != 0))
> - return 0;
> -
> - rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
> - if (rcvd == 0) {
> + nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
> + if (nb_pkts == 0) {
> #if defined(XDP_USE_NEED_WAKEUP)
> if (xsk_ring_prod__needs_wakeup(fq))
> (void)poll(rxq->fds, 1, 1000);
> #endif
> + return 0;
> + }
>
> - goto out;
> + if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs,
> nb_pkts))) {
> + rx->cached_cons -= nb_pkts;
> + return 0;
> }
>
> - for (i = 0; i < rcvd; i++) {
> + for (i = 0; i < nb_pkts; i++) {
> const struct xdp_desc *desc;
> uint64_t addr;
> uint32_t len;
> @@ -362,18 +358,13 @@ af_xdp_rx_cp(void *queue, struct rte_mbuf
> **bufs, uint16_t nb_pkts)
> bufs[i] = mbufs[i];
> }
>
> - xsk_ring_cons__release(rx, rcvd);
> + xsk_ring_cons__release(rx, nb_pkts);
>
> /* statistics */
> - rxq->stats.rx_pkts += rcvd;
> + rxq->stats.rx_pkts += nb_pkts;
> rxq->stats.rx_bytes += rx_bytes;
>
> -out:
> - if (rcvd != nb_pkts)
> - rte_mempool_put_bulk(rxq->mb_pool, (void
> **)&mbufs[rcvd],
> - nb_pkts - rcvd);
> -
> - return rcvd;
> + return nb_pkts;
> }
> #endif
>
> --
> 2.16.2
More information about the dev
mailing list