[dpdk-dev] [PATCH] net/memif: relax barrier for zero copy path

Honnappa Nagarahalli Honnappa.Nagarahalli at arm.com
Sat Sep 19 00:49:27 CEST 2020


Hi Jakub,
	I am trying to review this patch, but I am having difficulty understanding the implementation of the queue/ring. I would appreciate it if you could help me understand the logic.

1) The S2M queues are used to send packets from slave to master. My understanding is that the slave thread would call 'eth_memif_tx_zc' and the master thread would call 'eth_memif_rx_zc'. Is this correct?
2) The M2S queues are used to send packets from master to slave. Here, the slave thread would call 'eth_memif_rx_zc' and the master thread would call 'eth_memif_tx_zc'. Is this correct?

Thank you,
Honnappa

> -----Original Message-----
> From: Phil Yang <phil.yang at arm.com>
> Sent: Friday, September 11, 2020 12:38 AM
> To: jgrajcia at cisco.com; dev at dpdk.org
> Cc: Honnappa Nagarahalli <Honnappa.Nagarahalli at arm.com>; Ruifeng Wang
> <Ruifeng.Wang at arm.com>; nd <nd at arm.com>
> Subject: [PATCH] net/memif: relax barrier for zero copy path
> 
> Using 'rte_mb' to synchronize the shared ring head/tail between producer
> and consumer stalls the pipeline and hurts performance on platforms with
> a weak memory model, such as aarch64.
> 
> Relaxing the expensive barrier by using C11 atomics with explicit memory
> ordering improves throughput by 3.6%.
> 
> Signed-off-by: Phil Yang <phil.yang at arm.com>
> Reviewed-by: Ruifeng Wang <ruifeng.wang at arm.com>
> ---
>  drivers/net/memif/rte_eth_memif.c | 35 +++++++++++++++++++++++++------
> ----
>  1 file changed, 25 insertions(+), 10 deletions(-)
> 
> diff --git a/drivers/net/memif/rte_eth_memif.c
> b/drivers/net/memif/rte_eth_memif.c
> index c1c7e9f..a19c0f3 100644
> --- a/drivers/net/memif/rte_eth_memif.c
> +++ b/drivers/net/memif/rte_eth_memif.c
> @@ -253,7 +253,12 @@ memif_free_stored_mbufs(struct
> pmd_process_private *proc_private, struct memif_q
>  	memif_ring_t *ring = memif_get_ring_from_queue(proc_private,
> mq);
> 
>  	/* FIXME: improve performance */
> -	while (mq->last_tail != ring->tail) {
> +	/* The ring->tail acts as a guard variable between Tx and Rx
> +	 * threads, so using load-acquire pairs with store-release
> +	 * to synchronize it between threads.
> +	 */
> +	while (mq->last_tail != __atomic_load_n(&ring->tail,
> +						__ATOMIC_ACQUIRE)) {
>  		RTE_MBUF_PREFETCH_TO_FREE(mq->buffers[(mq->last_tail
> + 1) & mask]);
>  		/* Decrement refcnt and free mbuf. (current segment) */
>  		rte_mbuf_refcnt_update(mq->buffers[mq->last_tail & mask],
> -1);
> @@ -455,7 +460,11 @@ eth_memif_rx_zc(void *queue, struct rte_mbuf
> **bufs, uint16_t nb_pkts)
>  	mask = ring_size - 1;
> 
>  	cur_slot = mq->last_tail;
> -	last_slot = ring->tail;
> +	/* The ring->tail acts as a guard variable between Tx and Rx
> +	 * threads, so using load-acquire pairs with store-release
> +	 * to synchronize it between threads.
> +	 */
> +	last_slot = __atomic_load_n(&ring->tail, __ATOMIC_ACQUIRE);
>  	if (cur_slot == last_slot)
>  		goto refill;
>  	n_slots = last_slot - cur_slot;
> @@ -501,7 +510,11 @@ eth_memif_rx_zc(void *queue, struct rte_mbuf
> **bufs, uint16_t nb_pkts)
> 
>  /* Supply master with new buffers */
>  refill:
> -	head = ring->head;
> +	/* The ring->head acts as a guard variable between Tx and Rx
> +	 * threads, so using load-acquire pairs with store-release
> +	 * to synchronize it between threads.
> +	 */
> +	head = __atomic_load_n(&ring->head, __ATOMIC_ACQUIRE);
>  	n_slots = ring_size - head + mq->last_tail;
> 
>  	if (n_slots < 32)
> @@ -526,8 +539,7 @@ eth_memif_rx_zc(void *queue, struct rte_mbuf
> **bufs, uint16_t nb_pkts)
>  			(uint8_t *)proc_private->regions[d0->region]->addr;
>  	}
>  no_free_mbufs:
> -	rte_mb();
> -	ring->head = head;
> +	__atomic_store_n(&ring->head, head, __ATOMIC_RELEASE);
> 
>  	mq->n_pkts += n_rx_pkts;
> 
> @@ -723,8 +735,12 @@ eth_memif_tx_zc(void *queue, struct rte_mbuf
> **bufs, uint16_t nb_pkts)
>  	memif_free_stored_mbufs(proc_private, mq);
> 
>  	/* ring type always MEMIF_RING_S2M */
> -	slot = ring->head;
> -	n_free = ring_size - ring->head + mq->last_tail;
> +	/* The ring->head acts as a guard variable between Tx and Rx
> +	 * threads, so using load-acquire pairs with store-release
> +	 * to synchronize it between threads.
> +	 */
> +	slot = __atomic_load_n(&ring->head, __ATOMIC_ACQUIRE);
> +	n_free = ring_size - slot + mq->last_tail;
> 
>  	int used_slots;
> 
> @@ -778,12 +794,11 @@ eth_memif_tx_zc(void *queue, struct rte_mbuf
> **bufs, uint16_t nb_pkts)
>  	}
> 
>  no_free_slots:
> -	rte_mb();
>  	/* update ring pointers */
>  	if (type == MEMIF_RING_S2M)
> -		ring->head = slot;
> +		__atomic_store_n(&ring->head, slot, __ATOMIC_RELEASE);
>  	else
> -		ring->tail = slot;
> +		__atomic_store_n(&ring->tail, slot, __ATOMIC_RELEASE);
> 
>  	/* Send interrupt, if enabled. */
>  	if ((ring->flags & MEMIF_RING_FLAG_MASK_INT) == 0) {
> --
> 2.7.4



More information about the dev mailing list