[dpdk-dev] [PATCH 4/8] net/mlx4: optimize Tx multi-segment case

Matan Azrad matan at mellanox.com
Wed Dec 6 12:29:38 CET 2017


Hi Adrien

> -----Original Message-----
> From: Adrien Mazarguil [mailto:adrien.mazarguil at 6wind.com]
> Sent: Wednesday, December 6, 2017 12:59 PM
> To: Matan Azrad <matan at mellanox.com>
> Cc: dev at dpdk.org
> Subject: Re: [PATCH 4/8] net/mlx4: optimize Tx multi-segment case
> 
> On Tue, Nov 28, 2017 at 12:19:26PM +0000, Matan Azrad wrote:
> > mlx4 Tx block can handle up to 4 data segments or control segment + up
> > to 3 data segments. The first data segment in each not first Tx block
> > must validate Tx queue wraparound and must use IO memory barrier
> > before writing the byte count.
> >
> > The previous multi-segment code used "for" loop to iterate over all
> > packet segments and separated first Tx block data case by "if"
> > statments.
> 
> statments => statements
> 
> >
> > Use switch case and unconditional branches instead of "for" loop can
> > optimize the case and prevents the unnecessary checks for each data
> > segment; This hints to compiler to create opitimized jump table.
> 
> opitimized => optimized
> 
> >
> > Optimize this case by switch case and unconditional branches usage.
> >
> > Signed-off-by: Matan Azrad <matan at mellanox.com>
> > ---
> >  drivers/net/mlx4/mlx4_rxtx.c | 165 +++++++++++++++++++++++++------------------
> >  drivers/net/mlx4/mlx4_rxtx.h |  33 +++++++++
> >  2 files changed, 128 insertions(+), 70 deletions(-)
> >
> > diff --git a/drivers/net/mlx4/mlx4_rxtx.c b/drivers/net/mlx4/mlx4_rxtx.c
> > index 1d8240a..b9cb2fc 100644
> > --- a/drivers/net/mlx4/mlx4_rxtx.c
> > +++ b/drivers/net/mlx4/mlx4_rxtx.c
> > @@ -432,15 +432,14 @@ struct pv {
> >  	uint32_t head_idx = sq->head & sq->txbb_cnt_mask;
> >  	volatile struct mlx4_wqe_ctrl_seg *ctrl;
> >  	volatile struct mlx4_wqe_data_seg *dseg;
> > -	struct rte_mbuf *sbuf;
> > +	struct rte_mbuf *sbuf = buf;
> >  	uint32_t lkey;
> > -	uintptr_t addr;
> > -	uint32_t byte_count;
> >  	int pv_counter = 0;
> > +	int nb_segs = buf->nb_segs;
> >
> >  	/* Calculate the needed work queue entry size for this packet. */
> >  	wqe_real_size = sizeof(volatile struct mlx4_wqe_ctrl_seg) +
> > -		buf->nb_segs * sizeof(volatile struct mlx4_wqe_data_seg);
> > +		nb_segs * sizeof(volatile struct mlx4_wqe_data_seg);
> >  	nr_txbbs = MLX4_SIZE_TO_TXBBS(wqe_real_size);
> >  	/*
> >  	 * Check that there is room for this WQE in the send queue and that
> > @@ -457,67 +456,99 @@ struct pv {
> >  	dseg = (volatile struct mlx4_wqe_data_seg *)
> >  			((uintptr_t)ctrl + sizeof(struct mlx4_wqe_ctrl_seg));
> >  	*pctrl = ctrl;
> > -	/* Fill the data segments with buffer information. */
> > -	for (sbuf = buf; sbuf != NULL; sbuf = sbuf->next, dseg++) {
> > -		addr = rte_pktmbuf_mtod(sbuf, uintptr_t);
> > -		rte_prefetch0((volatile void *)addr);
> > -		/* Memory region key (big endian) for this memory pool. */
> > +	/*
> > +	 * Fill the data segments with buffer information.
> > +	 * First WQE TXBB head segment is always control segment,
> > +	 * so jump to tail TXBB data segments code for the first
> > +	 * WQE data segments filling.
> > +	 */
> > +	goto txbb_tail_segs;
> > +txbb_head_seg:
> 
> I'm not fundamentally opposed to "goto" unlike a lot of people out there,
> but this doesn't look good. It's OK to use goto for error cases and to extricate
> yourself when trapped in an inner loop, also in some optimization scenarios
> where it sometimes make sense, but not when the same can be achieved
> through standard loop constructs and keywords.
> 
> In this case I'm under the impression you could have managed with a do { ... }
> while (...) construct. You need to try harder to reorganize these changes or
> prove it can't be done without negatively impacting performance.
> 
> Doing so should make this patch shorter as well.
> 

I noticed this could be done with a loop and without unconditional branches, but I checked it and found a nice performance improvement with the switch/goto version.
When I used the loop, performance degraded.
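
To make the two control-flow shapes under discussion concrete, here is a minimal, self-contained sketch; it is not the driver code. It assumes a simplified model in which each TXBB has four slots and slot 0 of the first TXBB holds the control segment, and it only counts which byte counts would be written immediately and which would be postponed. SLOTS_PER_TXBB, struct fill_result, fill_with_loop(), fill_with_switch() and check_equivalence() are hypothetical names.

```c
#include <assert.h>

/* Assumption: four 16-byte slots per 64-byte TXBB. */
#define SLOTS_PER_TXBB 4

struct fill_result {
	int immediate; /* byte counts written right away */
	int postponed; /* byte counts deferred (segment starts a TXBB) */
};

/* Loop form: one alignment test per data segment. */
static struct fill_result
fill_with_loop(int nb_segs)
{
	struct fill_result r = {0, 0};
	int slot = 1; /* slot 0 of the first TXBB is the control segment */

	for (; nb_segs > 0; nb_segs--, slot++) {
		if (slot % SLOTS_PER_TXBB == 0)
			r.postponed++;
		else
			r.immediate++;
	}
	return r;
}

/* Switch/goto form: the position inside the TXBB is implied by the label. */
static struct fill_result
fill_with_switch(int nb_segs)
{
	struct fill_result r = {0, 0};

	goto tail_segs; /* the first TXBB starts with the control segment */
head_seg:
	r.postponed++; /* first slot of a later TXBB */
	nb_segs--;
tail_segs:
	switch (nb_segs) {
	default: /* three or more segments remaining */
		r.immediate++;
		nb_segs--;
		/* fallthrough */
	case 2:
		r.immediate++;
		nb_segs--;
		/* fallthrough */
	case 1:
		r.immediate++;
		nb_segs--;
		if (nb_segs)
			goto head_seg;
		/* fallthrough */
	case 0:
		break;
	}
	return r;
}

/* Both forms must classify every segment count identically. */
static void
check_equivalence(int max_segs)
{
	for (int n = 0; n <= max_segs; n++) {
		struct fill_result a = fill_with_loop(n);
		struct fill_result b = fill_with_switch(n);

		assert(a.immediate == b.immediate);
		assert(a.postponed == b.postponed);
	}
}
```

Both forms classify the segments identically; the switch form only removes the per-segment alignment test. Whether that is faster depends on the compiler and target, so it has to be measured.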
 
> > +	/* Memory region key (big endian) for this memory pool. */
> > +	lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(sbuf));
> > +	if (unlikely(lkey == (uint32_t)-1)) {
> > +		DEBUG("%p: unable to get MP <-> MR association",
> > +		      (void *)txq);
> > +		return -1;
> > +	}
> > +	/* Handle WQE wraparound. */
> > +	if (dseg >=
> > +		(volatile struct mlx4_wqe_data_seg *)sq->eob)
> > +		dseg = (volatile struct mlx4_wqe_data_seg *)
> > +			sq->buf;
> > +	dseg->addr = rte_cpu_to_be_64(rte_pktmbuf_mtod(sbuf, uintptr_t));
> > +	dseg->lkey = rte_cpu_to_be_32(lkey);
> > +	/*
> > +	 * This data segment starts at the beginning of a new
> > +	 * TXBB, so we need to postpone its byte_count writing
> > +	 * for later.
> > +	 */
> > +	pv[pv_counter].dseg = dseg;
> > +	/*
> > +	 * Zero length segment is treated as inline segment
> > +	 * with zero data.
> > +	 */
> > +	pv[pv_counter++].val = rte_cpu_to_be_32(sbuf->data_len ?
> > +						sbuf->data_len : 0x80000000);
> > +	sbuf = sbuf->next;
> > +	dseg++;
> > +	nb_segs--;
> > +txbb_tail_segs:
> > +	/* Jump to default if there are more than two segments remaining. */
> > +	switch (nb_segs) {
> > +	default:
> >  		lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(sbuf));
> > -		dseg->lkey = rte_cpu_to_be_32(lkey);
> > -		/* Calculate the needed work queue entry size for this packet */
> > -		if (unlikely(lkey == rte_cpu_to_be_32((uint32_t)-1))) {
> > -			/* MR does not exist. */
> > +		if (unlikely(lkey == (uint32_t)-1)) {
> >  			DEBUG("%p: unable to get MP <-> MR association",
> >  			      (void *)txq);
> >  			return -1;
> >  		}
> > -		if (likely(sbuf->data_len)) {
> > -			byte_count = rte_cpu_to_be_32(sbuf->data_len);
> > -		} else {
> > -			/*
> > -			 * Zero length segment is treated as inline segment
> > -			 * with zero data.
> > -			 */
> > -			byte_count = RTE_BE32(0x80000000);
> > +		mlx4_fill_tx_data_seg(dseg, lkey,
> > +				      rte_pktmbuf_mtod(sbuf, uintptr_t),
> > +				      rte_cpu_to_be_32(sbuf->data_len ?
> > +						       sbuf->data_len :
> > +						       0x80000000));
> > +		sbuf = sbuf->next;
> > +		dseg++;
> > +		nb_segs--;
> > +		/* fallthrough */
> > +	case 2:
> > +		lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(sbuf));
> > +		if (unlikely(lkey == (uint32_t)-1)) {
> > +			DEBUG("%p: unable to get MP <-> MR association",
> > +			      (void *)txq);
> > +			return -1;
> >  		}
> > -		/*
> > -		 * If the data segment is not at the beginning of a
> > -		 * Tx basic block (TXBB) then write the byte count,
> > -		 * else postpone the writing to just before updating the
> > -		 * control segment.
> > -		 */
> > -		if ((uintptr_t)dseg & (uintptr_t)(MLX4_TXBB_SIZE - 1)) {
> > -			dseg->addr = rte_cpu_to_be_64(addr);
> > -			dseg->lkey = rte_cpu_to_be_32(lkey);
> > -#if RTE_CACHE_LINE_SIZE < 64
> > -			/*
> > -			 * Need a barrier here before writing the byte_count
> > -			 * fields to make sure that all the data is visible
> > -			 * before the byte_count field is set.
> > -			 * Otherwise, if the segment begins a new cacheline,
> > -			 * the HCA prefetcher could grab the 64-byte chunk and
> > -			 * get a valid (!= 0xffffffff) byte count but stale
> > -			 * data, and end up sending the wrong data.
> > -			 */
> > -			rte_io_wmb();
> > -#endif /* RTE_CACHE_LINE_SIZE */
> > -			dseg->byte_count = byte_count;
> > -		} else {
> > -			/*
> > -			 * This data segment starts at the beginning of a new
> > -			 * TXBB, so we need to postpone its byte_count writing
> > -			 * for later.
> > -			 */
> > -			/* Handle WQE wraparound. */
> > -			if (dseg >=
> > -			    (volatile struct mlx4_wqe_data_seg *)sq->eob)
> > -				dseg = (volatile struct mlx4_wqe_data_seg *)
> > -					sq->buf;
> > -			dseg->addr = rte_cpu_to_be_64(addr);
> > -			dseg->lkey = rte_cpu_to_be_32(lkey);
> > -			pv[pv_counter].dseg = dseg;
> > -			pv[pv_counter++].val = byte_count;
> > +		mlx4_fill_tx_data_seg(dseg, lkey,
> > +				      rte_pktmbuf_mtod(sbuf, uintptr_t),
> > +				      rte_cpu_to_be_32(sbuf->data_len ?
> > +						       sbuf->data_len :
> > +						       0x80000000));
> > +		sbuf = sbuf->next;
> > +		dseg++;
> > +		nb_segs--;
> > +		/* fallthrough */
> > +	case 1:
> > +		lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(sbuf));
> > +		if (unlikely(lkey == (uint32_t)-1)) {
> > +			DEBUG("%p: unable to get MP <-> MR association",
> > +			      (void *)txq);
> > +			return -1;
> > +		}
> > +		mlx4_fill_tx_data_seg(dseg, lkey,
> > +				      rte_pktmbuf_mtod(sbuf, uintptr_t),
> > +				      rte_cpu_to_be_32(sbuf->data_len ?
> > +						       sbuf->data_len :
> > +						       0x80000000));
> > +		nb_segs--;
> > +		if (nb_segs) {
> > +			sbuf = sbuf->next;
> > +			dseg++;
> > +			goto txbb_head_seg;
> >  		}
> > +		/* fallthrough */
> > +	case 0:
> > +		break;
> >  	}
> 
> I think this "switch (nb_segs)" idea is an interesting approach, but should
> occur inside a loop construct as previously described.
> 

Same comment as above.

> >  	/* Write the first DWORD of each TXBB save earlier. */
> >  	if (pv_counter) {
> > @@ -583,7 +614,6 @@ struct pv {
> >  		} srcrb;
> >  		uint32_t head_idx = sq->head & sq->txbb_cnt_mask;
> >  		uint32_t lkey;
> > -		uintptr_t addr;
> >
> >  		/* Clean up old buffer. */
> >  		if (likely(elt->buf != NULL)) {
> > @@ -618,24 +648,19 @@ struct pv {
> >  			dseg = (volatile struct mlx4_wqe_data_seg *)
> >  					((uintptr_t)ctrl +
> >  					sizeof(struct mlx4_wqe_ctrl_seg));
> > -			addr = rte_pktmbuf_mtod(buf, uintptr_t);
> > -			rte_prefetch0((volatile void *)addr);
> > -			dseg->addr = rte_cpu_to_be_64(addr);
> > -			/* Memory region key (big endian). */
> > +
> > +			ctrl->fence_size = (WQE_ONE_DATA_SEG_SIZE >> 4) & 0x3f;
> >  			lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(buf));
> > -			dseg->lkey = rte_cpu_to_be_32(lkey);
> > -			if (unlikely(dseg->lkey ==
> > -				rte_cpu_to_be_32((uint32_t)-1))) {
> > +			if (unlikely(lkey == (uint32_t)-1)) {
> >  				/* MR does not exist. */
> >  				DEBUG("%p: unable to get MP <-> MR association",
> >  				      (void *)txq);
> >  				elt->buf = NULL;
> >  				break;
> >  			}
> > -			/* Never be TXBB aligned, no need compiler barrier. */
> > -			dseg->byte_count = rte_cpu_to_be_32(buf->data_len);
> > -			/* Fill the control parameters for this packet. */
> > -			ctrl->fence_size = (WQE_ONE_DATA_SEG_SIZE >> 4) & 0x3f;
> > +			mlx4_fill_tx_data_seg(dseg, lkey,
> > +					      rte_pktmbuf_mtod(buf, uintptr_t),
> > +					      rte_cpu_to_be_32(buf->data_len));
> >  			nr_txbbs = 1;
> >  		} else {
> >  			nr_txbbs = mlx4_tx_burst_segs(buf, txq, &ctrl);
> > diff --git a/drivers/net/mlx4/mlx4_rxtx.h b/drivers/net/mlx4/mlx4_rxtx.h
> > index 463df2b..8207232 100644
> > --- a/drivers/net/mlx4/mlx4_rxtx.h
> > +++ b/drivers/net/mlx4/mlx4_rxtx.h
> > @@ -212,4 +212,37 @@ int mlx4_tx_queue_setup(struct rte_eth_dev *dev, uint16_t idx,
> >  	return mlx4_txq_add_mr(txq, mp, i);
> >  }
> >
> > +/**
> > + * Write Tx data segment to the SQ.
> > + *
> > + * @param dseg
> > + *   Pointer to data segment in SQ.
> > + * @param lkey
> > + *   Memory region lkey.
> > + * @param addr
> > + *   Data address.
> > + * @param byte_count
> > + *   Big Endian bytes count of the data to send.
> 
> Big Endian => Big endian
> 
> How about using the dedicated type to properly document it?
> See rte_be32_t from rte_byteorder.h.
> 
Learned something new, thanks. I will check it.
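
For reference, a small sketch of what a dedicated big-endian type buys here: the parameter type itself tells the caller that the value must already be converted. To stay self-contained it does not include rte_byteorder.h; be32_t and cpu_to_be32() are local stand-ins for rte_be32_t and rte_cpu_to_be_32(), and seg_byte_count(), store_byte_count() and check_encoding() are hypothetical helpers.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Stand-in for rte_be32_t: same width as uint32_t, but documents byte order. */
typedef uint32_t be32_t;

/* Portable stand-in for rte_cpu_to_be_32(): most significant byte first. */
static be32_t
cpu_to_be32(uint32_t v)
{
	const uint8_t bytes[4] = {
		(uint8_t)(v >> 24), (uint8_t)(v >> 16),
		(uint8_t)(v >> 8), (uint8_t)v,
	};
	be32_t out;

	memcpy(&out, bytes, sizeof(out));
	return out;
}

/* Hypothetical helper: a zero-length segment is encoded as 0x80000000. */
static be32_t
seg_byte_count(uint32_t data_len)
{
	return cpu_to_be32(data_len ? data_len : 0x80000000u);
}

/* The be32_t parameter documents that no further conversion happens here. */
static void
store_byte_count(volatile be32_t *field, be32_t byte_count)
{
	*field = byte_count;
}

/* A zero-length segment must be stored as bytes 80 00 00 00. */
static void
check_encoding(void)
{
	be32_t field = 0;
	uint8_t raw[4];

	store_byte_count(&field, seg_byte_count(0));
	memcpy(raw, &field, sizeof(raw));
	assert(raw[0] == 0x80 && raw[1] == 0 && raw[2] == 0 && raw[3] == 0);
}
```

The typedef adds no compile-time checking in C; its value is purely as documentation in the prototype.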

> > + */
> > +static inline void
> > +mlx4_fill_tx_data_seg(volatile struct mlx4_wqe_data_seg *dseg,
> > +		       uint32_t lkey, uintptr_t addr, uint32_t byte_count)
> > +{
> > +	dseg->addr = rte_cpu_to_be_64(addr);
> > +	dseg->lkey = rte_cpu_to_be_32(lkey);
> > +#if RTE_CACHE_LINE_SIZE < 64
> > +	/*
> > +	 * Need a barrier here before writing the byte_count
> > +	 * fields to make sure that all the data is visible
> > +	 * before the byte_count field is set.
> > +	 * Otherwise, if the segment begins a new cacheline,
> > +	 * the HCA prefetcher could grab the 64-byte chunk and
> > +	 * get a valid (!= 0xffffffff) byte count but stale
> > +	 * data, and end up sending the wrong data.
> > +	 */
> > +	rte_io_wmb();
> > +#endif /* RTE_CACHE_LINE_SIZE */
> > +	dseg->byte_count = byte_count;
> > +}
> > +
> 
> No need to expose this function in a header file. Note that rte_cpu_*() and
> rte_io*() require the inclusion of rte_byteorder.h and rte_atomic.h
> respectively.
> 

Shouldn't inline functions be in header files?
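
On the header question, a sketch of the file-local alternative may help. In C, a "static inline" function only has to be defined in the translation unit that calls it, so a header is required only when several .c files share the helper. The sketch assumes a simplified struct data_seg with values kept in CPU byte order, and uses the C11 atomic_thread_fence() as a stand-in for rte_io_wmb(); both are illustrative assumptions, not the mlx4 definitions, and check_fill() is a hypothetical helper.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

/* Simplified stand-in for struct mlx4_wqe_data_seg (CPU byte order). */
struct data_seg {
	uint32_t byte_count;
	uint32_t lkey;
	uint64_t addr;
};

/*
 * File-local helper: defined in the .c file that uses it, so the
 * compiler can still inline it into callers in this translation unit.
 */
static inline void
fill_data_seg(volatile struct data_seg *dseg, uint32_t lkey,
	      uint64_t addr, uint32_t byte_count)
{
	dseg->addr = addr;
	dseg->lkey = lkey;
	/* Stand-in for rte_io_wmb(): order the stores above first. */
	atomic_thread_fence(memory_order_release);
	/* byte_count is written last because it marks the segment valid. */
	dseg->byte_count = byte_count;
}

/* After the call, all three fields hold the values passed in. */
static void
check_fill(void)
{
	volatile struct data_seg seg = { 0xffffffffu, 0, 0 };

	fill_data_seg(&seg, 7, 0x1000, 64);
	assert(seg.addr == 0x1000);
	assert(seg.lkey == 7);
	assert(seg.byte_count == 64);
}
```

If the helper is later needed by another source file, moving it to a header as "static inline" remains an option.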

> >  #endif /* MLX4_RXTX_H_ */
> > --
> > 1.8.3.1
> >
> 
> --
> Adrien Mazarguil
> 6WIND

