[dpdk-dev] [PATCH 4/8] net/mlx4: optimize Tx multi-segment case
Matan Azrad
matan at mellanox.com
Wed Dec 6 12:29:38 CET 2017
Hi Adrien
> -----Original Message-----
> From: Adrien Mazarguil [mailto:adrien.mazarguil at 6wind.com]
> Sent: Wednesday, December 6, 2017 12:59 PM
> To: Matan Azrad <matan at mellanox.com>
> Cc: dev at dpdk.org
> Subject: Re: [PATCH 4/8] net/mlx4: optimize Tx multi-segment case
>
> On Tue, Nov 28, 2017 at 12:19:26PM +0000, Matan Azrad wrote:
> > mlx4 Tx block can handle up to 4 data segments or control segment + up
> > to 3 data segments. The first data segment in each not first Tx block
> > must validate Tx queue wraparound and must use IO memory barrier
> > before writing the byte count.
> >
> > The previous multi-segment code used "for" loop to iterate over all
> > packet segments and separated first Tx block data case by "if"
> > statments.
>
> statments => statements
>
> >
> > Use switch case and unconditional branches instead of "for" loop can
> > optimize the case and prevents the unnecessary checks for each data
> > segment; This hints to compiler to create opitimized jump table.
>
> opitimized => optimized
>
> >
> > Optimize this case by switch case and unconditional branches usage.
> >
> > Signed-off-by: Matan Azrad <matan at mellanox.com>
> > ---
> > drivers/net/mlx4/mlx4_rxtx.c | 165
> > +++++++++++++++++++++++++------------------
> > drivers/net/mlx4/mlx4_rxtx.h | 33 +++++++++
> > 2 files changed, 128 insertions(+), 70 deletions(-)
> >
> > diff --git a/drivers/net/mlx4/mlx4_rxtx.c
> > b/drivers/net/mlx4/mlx4_rxtx.c index 1d8240a..b9cb2fc 100644
> > --- a/drivers/net/mlx4/mlx4_rxtx.c
> > +++ b/drivers/net/mlx4/mlx4_rxtx.c
> > @@ -432,15 +432,14 @@ struct pv {
> > uint32_t head_idx = sq->head & sq->txbb_cnt_mask;
> > volatile struct mlx4_wqe_ctrl_seg *ctrl;
> > volatile struct mlx4_wqe_data_seg *dseg;
> > - struct rte_mbuf *sbuf;
> > + struct rte_mbuf *sbuf = buf;
> > uint32_t lkey;
> > - uintptr_t addr;
> > - uint32_t byte_count;
> > int pv_counter = 0;
> > + int nb_segs = buf->nb_segs;
> >
> > /* Calculate the needed work queue entry size for this packet. */
> > wqe_real_size = sizeof(volatile struct mlx4_wqe_ctrl_seg) +
> > - buf->nb_segs * sizeof(volatile struct mlx4_wqe_data_seg);
> > + nb_segs * sizeof(volatile struct mlx4_wqe_data_seg);
> > nr_txbbs = MLX4_SIZE_TO_TXBBS(wqe_real_size);
> > /*
> > * Check that there is room for this WQE in the send queue and that
> > @@ -457,67 +456,99 @@ struct pv {
> > dseg = (volatile struct mlx4_wqe_data_seg *)
> > ((uintptr_t)ctrl + sizeof(struct mlx4_wqe_ctrl_seg));
> > *pctrl = ctrl;
> > - /* Fill the data segments with buffer information. */
> > - for (sbuf = buf; sbuf != NULL; sbuf = sbuf->next, dseg++) {
> > - addr = rte_pktmbuf_mtod(sbuf, uintptr_t);
> > - rte_prefetch0((volatile void *)addr);
> > - /* Memory region key (big endian) for this memory pool. */
> > + /*
> > + * Fill the data segments with buffer information.
> > + * First WQE TXBB head segment is always control segment,
> > + * so jump to tail TXBB data segments code for the first
> > + * WQE data segments filling.
> > + */
> > + goto txbb_tail_segs;
> > +txbb_head_seg:
>
> I'm not fundamentally opposed to "goto" unlike a lot of people out there,
> but this doesn't look good. It's OK to use goto for error cases and to extricate
> yourself when trapped in an inner loop, also in some optimization scenarios
> where it sometimes make sense, but not when the same can be achieved
> through standard loop constructs and keywords.
>
> In this case I'm under the impression you could have managed with a do { ... }
> while (...) construct. You need to try harder to reorganize these changes or
> prove it can't be done without negatively impacting performance.
>
> Doing so should make this patch shorter as well.
>
I noticed this could be done with loop and without unconditional branches, but I checked it and found nice performance improvement using that way.
When I used the loop I degraded performance.
> > + /* Memory region key (big endian) for this memory pool. */
> > + lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(sbuf));
> > + if (unlikely(lkey == (uint32_t)-1)) {
> > + DEBUG("%p: unable to get MP <-> MR association",
> > + (void *)txq);
> > + return -1;
> > + }
> > + /* Handle WQE wraparound. */
> > + if (dseg >=
> > + (volatile struct mlx4_wqe_data_seg *)sq->eob)
> > + dseg = (volatile struct mlx4_wqe_data_seg *)
> > + sq->buf;
> > + dseg->addr = rte_cpu_to_be_64(rte_pktmbuf_mtod(sbuf,
> uintptr_t));
> > + dseg->lkey = rte_cpu_to_be_32(lkey);
> > + /*
> > + * This data segment starts at the beginning of a new
> > + * TXBB, so we need to postpone its byte_count writing
> > + * for later.
> > + */
> > + pv[pv_counter].dseg = dseg;
> > + /*
> > + * Zero length segment is treated as inline segment
> > + * with zero data.
> > + */
> > + pv[pv_counter++].val = rte_cpu_to_be_32(sbuf->data_len ?
> > + sbuf->data_len :
> 0x80000000);
> > + sbuf = sbuf->next;
> > + dseg++;
> > + nb_segs--;
> > +txbb_tail_segs:
> > + /* Jump to default if there are more than two segments remaining.
> */
> > + switch (nb_segs) {
> > + default:
> > lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(sbuf));
> > - dseg->lkey = rte_cpu_to_be_32(lkey);
> > - /* Calculate the needed work queue entry size for this
> packet */
> > - if (unlikely(lkey == rte_cpu_to_be_32((uint32_t)-1))) {
> > - /* MR does not exist. */
> > + if (unlikely(lkey == (uint32_t)-1)) {
> > DEBUG("%p: unable to get MP <-> MR association",
> > (void *)txq);
> > return -1;
> > }
> > - if (likely(sbuf->data_len)) {
> > - byte_count = rte_cpu_to_be_32(sbuf->data_len);
> > - } else {
> > - /*
> > - * Zero length segment is treated as inline segment
> > - * with zero data.
> > - */
> > - byte_count = RTE_BE32(0x80000000);
> > + mlx4_fill_tx_data_seg(dseg, lkey,
> > + rte_pktmbuf_mtod(sbuf, uintptr_t),
> > + rte_cpu_to_be_32(sbuf->data_len ?
> > + sbuf->data_len :
> > + 0x80000000));
> > + sbuf = sbuf->next;
> > + dseg++;
> > + nb_segs--;
> > + /* fallthrough */
> > + case 2:
> > + lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(sbuf));
> > + if (unlikely(lkey == (uint32_t)-1)) {
> > + DEBUG("%p: unable to get MP <-> MR association",
> > + (void *)txq);
> > + return -1;
> > }
> > - /*
> > - * If the data segment is not at the beginning of a
> > - * Tx basic block (TXBB) then write the byte count,
> > - * else postpone the writing to just before updating the
> > - * control segment.
> > - */
> > - if ((uintptr_t)dseg & (uintptr_t)(MLX4_TXBB_SIZE - 1)) {
> > - dseg->addr = rte_cpu_to_be_64(addr);
> > - dseg->lkey = rte_cpu_to_be_32(lkey);
> > -#if RTE_CACHE_LINE_SIZE < 64
> > - /*
> > - * Need a barrier here before writing the byte_count
> > - * fields to make sure that all the data is visible
> > - * before the byte_count field is set.
> > - * Otherwise, if the segment begins a new cacheline,
> > - * the HCA prefetcher could grab the 64-byte chunk
> and
> > - * get a valid (!= 0xffffffff) byte count but stale
> > - * data, and end up sending the wrong data.
> > - */
> > - rte_io_wmb();
> > -#endif /* RTE_CACHE_LINE_SIZE */
> > - dseg->byte_count = byte_count;
> > - } else {
> > - /*
> > - * This data segment starts at the beginning of a new
> > - * TXBB, so we need to postpone its byte_count
> writing
> > - * for later.
> > - */
> > - /* Handle WQE wraparound. */
> > - if (dseg >=
> > - (volatile struct mlx4_wqe_data_seg *)sq->eob)
> > - dseg = (volatile struct mlx4_wqe_data_seg *)
> > - sq->buf;
> > - dseg->addr = rte_cpu_to_be_64(addr);
> > - dseg->lkey = rte_cpu_to_be_32(lkey);
> > - pv[pv_counter].dseg = dseg;
> > - pv[pv_counter++].val = byte_count;
> > + mlx4_fill_tx_data_seg(dseg, lkey,
> > + rte_pktmbuf_mtod(sbuf, uintptr_t),
> > + rte_cpu_to_be_32(sbuf->data_len ?
> > + sbuf->data_len :
> > + 0x80000000));
> > + sbuf = sbuf->next;
> > + dseg++;
> > + nb_segs--;
> > + /* fallthrough */
> > + case 1:
> > + lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(sbuf));
> > + if (unlikely(lkey == (uint32_t)-1)) {
> > + DEBUG("%p: unable to get MP <-> MR association",
> > + (void *)txq);
> > + return -1;
> > + }
> > + mlx4_fill_tx_data_seg(dseg, lkey,
> > + rte_pktmbuf_mtod(sbuf, uintptr_t),
> > + rte_cpu_to_be_32(sbuf->data_len ?
> > + sbuf->data_len :
> > + 0x80000000));
> > + nb_segs--;
> > + if (nb_segs) {
> > + sbuf = sbuf->next;
> > + dseg++;
> > + goto txbb_head_seg;
> > }
> > + /* fallthrough */
> > + case 0:
> > + break;
> > }
>
> I think this "switch (nb_segs)" idea is an interesting approach, but should
> occur inside a loop construct as previously described.
>
Same comment as above.
> > /* Write the first DWORD of each TXBB save earlier. */
> > if (pv_counter) {
> > @@ -583,7 +614,6 @@ struct pv {
> > } srcrb;
> > uint32_t head_idx = sq->head & sq->txbb_cnt_mask;
> > uint32_t lkey;
> > - uintptr_t addr;
> >
> > /* Clean up old buffer. */
> > if (likely(elt->buf != NULL)) {
> > @@ -618,24 +648,19 @@ struct pv {
> > dseg = (volatile struct mlx4_wqe_data_seg *)
> > ((uintptr_t)ctrl +
> > sizeof(struct mlx4_wqe_ctrl_seg));
> > - addr = rte_pktmbuf_mtod(buf, uintptr_t);
> > - rte_prefetch0((volatile void *)addr);
> > - dseg->addr = rte_cpu_to_be_64(addr);
> > - /* Memory region key (big endian). */
> > +
> > + ctrl->fence_size = (WQE_ONE_DATA_SEG_SIZE >> 4)
> & 0x3f;
> > lkey = mlx4_txq_mp2mr(txq,
> mlx4_txq_mb2mp(buf));
> > - dseg->lkey = rte_cpu_to_be_32(lkey);
> > - if (unlikely(dseg->lkey ==
> > - rte_cpu_to_be_32((uint32_t)-1))) {
> > + if (unlikely(lkey == (uint32_t)-1)) {
> > /* MR does not exist. */
> > DEBUG("%p: unable to get MP <-> MR
> association",
> > (void *)txq);
> > elt->buf = NULL;
> > break;
> > }
> > - /* Never be TXBB aligned, no need compiler barrier.
> */
> > - dseg->byte_count = rte_cpu_to_be_32(buf-
> >data_len);
> > - /* Fill the control parameters for this packet. */
> > - ctrl->fence_size = (WQE_ONE_DATA_SEG_SIZE >> 4)
> & 0x3f;
> > + mlx4_fill_tx_data_seg(dseg, lkey,
> > + rte_pktmbuf_mtod(buf,
> uintptr_t),
> > + rte_cpu_to_be_32(buf-
> >data_len));
> > nr_txbbs = 1;
> > } else {
> > nr_txbbs = mlx4_tx_burst_segs(buf, txq, &ctrl); diff -
> -git
> > a/drivers/net/mlx4/mlx4_rxtx.h b/drivers/net/mlx4/mlx4_rxtx.h index
> > 463df2b..8207232 100644
> > --- a/drivers/net/mlx4/mlx4_rxtx.h
> > +++ b/drivers/net/mlx4/mlx4_rxtx.h
> > @@ -212,4 +212,37 @@ int mlx4_tx_queue_setup(struct rte_eth_dev
> *dev, uint16_t idx,
> > return mlx4_txq_add_mr(txq, mp, i);
> > }
> >
> > +/**
> > + * Write Tx data segment to the SQ.
> > + *
> > + * @param dseg
> > + * Pointer to data segment in SQ.
> > + * @param lkey
> > + * Memory region lkey.
> > + * @param addr
> > + * Data address.
> > + * @param byte_count
> > + * Big Endian bytes count of the data to send.
>
> Big Endian => Big endian
>
> How about using the dedicated type to properly document it?
> See rte_be32_t from rte_byteorder.h.
>
Learned new something, thanks, will check it.
> > + */
> > +static inline void
> > +mlx4_fill_tx_data_seg(volatile struct mlx4_wqe_data_seg *dseg,
> > + uint32_t lkey, uintptr_t addr, uint32_t byte_count) {
> > + dseg->addr = rte_cpu_to_be_64(addr);
> > + dseg->lkey = rte_cpu_to_be_32(lkey); #if RTE_CACHE_LINE_SIZE <
> 64
> > + /*
> > + * Need a barrier here before writing the byte_count
> > + * fields to make sure that all the data is visible
> > + * before the byte_count field is set.
> > + * Otherwise, if the segment begins a new cacheline,
> > + * the HCA prefetcher could grab the 64-byte chunk and
> > + * get a valid (!= 0xffffffff) byte count but stale
> > + * data, and end up sending the wrong data.
> > + */
> > + rte_io_wmb();
> > +#endif /* RTE_CACHE_LINE_SIZE */
> > + dseg->byte_count = byte_count;
> > +}
> > +
>
> No need to expose this function in a header file. Note that rte_cpu_*() and
> rte_io*() require the inclusion of rte_byteorder.h and rte_atomic.h
> respectively.
>
Shouldn't inline functions be in header files?
> > #endif /* MLX4_RXTX_H_ */
> > --
> > 1.8.3.1
> >
>
> --
> Adrien Mazarguil
> 6WIND
More information about the dev
mailing list