[dpdk-dev] [RFC] lib/librte_ether: consistent PMD batching behavior

Andrew Rybchenko arybchenko at solarflare.com
Fri Jan 20 11:26:16 CET 2017


On 01/20/2017 12:51 PM, Zhiyong Yang wrote:
> The rte_eth_tx_burst() function in the file Rte_ethdev.h is invoked to
> transmit output packets on the output queue for DPDK applications as
> follows.
>
> static inline uint16_t
> rte_eth_tx_burst(uint8_t port_id, uint16_t queue_id,
>                   struct rte_mbuf **tx_pkts, uint16_t nb_pkts);
>
> Note: The fourth parameter nb_pkts: The number of packets to transmit.
> The rte_eth_tx_burst() function returns the number of packets it actually
> sent. The return value equal to *nb_pkts* means that all packets have been
> sent, and this is likely to signify that other output packets could be
> immediately transmitted again. Applications that implement a "send as many
> packets to transmit as possible" policy can check this specific case and
> keep invoking the rte_eth_tx_burst() function until a value less than
> *nb_pkts* is returned.
>
> When you call TX only once in rte_eth_tx_burst, you may get different
> behaviors from different PMDs. One problem that every DPDK user has to
> face is that they need to take the policy into consideration at the app-
> lication level when using any specific PMD to send the packets whether or
> not it is necessary, which brings usage complexities and makes DPDK users
> easily confused since they have to learn the details on TX function limit
> of specific PMDs and have to handle the different return value: the number
> of packets transmitted successfully for various PMDs. Some PMDs Tx func-
> tions have a limit of sending at most 32 packets for every invoking, some
> PMDs have another limit of at most 64 packets once, another ones have imp-
> lemented to send as many packets to transmit as possible, etc. This will
> easily cause wrong usage for DPDK users.
>
> This patch proposes to implement the above policy in DPDK lib in order to
> simplify the application implementation and avoid the incorrect invoking
> as well. So, DPDK Users don't need to consider the implementation policy
> and to write duplicated code at the application level again when sending
> packets. In addition to it, the users don't need to know the difference of
> specific PMD TX and can transmit the arbitrary number of packets as they
> expect when invoking TX API rte_eth_tx_burst, then check the return value
> to get the number of packets actually sent.
>
> How to implement the policy in DPDK lib? Two solutions are proposed below.
>
> Solution 1:
> Implement the wrapper functions to remove some limits for each specific
> PMDs as i40e_xmit_pkts_simple and ixgbe_xmit_pkts_simple do like that.

IMHO, the solution is a bit better since it:
  1. Does not affect other PMDs at all
  2. Could be a bit faster for the PMDs which require it since has no 
indirect
      function call on each iteration
  3. No ABI change

> Solution 2:
> Implement the policy in the function rte_eth_tx_burst() at the ethdev lay-
> er in a more consistent batching way. Make best effort to send *nb_pkts*
> packets with bursts of no more than 32 by default since many DPDK TX PMDs
> are using this max TX burst size(32). In addition, one data member which
> defines the max TX burst size such as "uint16_t max_tx_burst_pkts;"will be
> added to rte_eth_dev_data, which drivers can override if they work with
> bursts of 64 or other NB(thanks for Bruce <bruce.richardson at intel.com>'s
> suggestion). This can reduce the performance impacting to the lowest limit.

I see no noticeable difference in performance, so don't mind if this is 
finally choosen.
Just be sure that you update all PMDs to set reasonable default values, 
or may be
even better, set UINT16_MAX in generic place - 0 is a bad default here.
(Lost few seconds wondering why nothing is sent and cannot stop)

> I prefer the latter between the 2 solutions because it makes DPDK code more
> consistent and easier and avoids to write too much duplicate logic in DPDK
> source code. In addition, I think no or a little performance drop is
> brought by solution 2. But ABI change will be introduced.
>
> In fact, the current rte_eth_rx_burst() function is using the similar
> mechanism and faces the same problem as rte_eth_tx_burst().
>
> static inline uint16_t
> rte_eth_rx_burst(uint8_t port_id, uint16_t queue_id,
>                   struct rte_mbuf **rx_pkts, const uint16_t nb_pkts);
>
> Applications are responsible of implementing the policy "retrieve as many
> received packets as possible", and check this specific case and keep
> invoking the rte_eth_rx_burst() function until a value less than *nb_pkts*
> is returned.
>
> The patch proposes to apply the above method to rte_eth_rx_burst() as well.
>
> In summary, The purpose of the RFC makes the job easier and more simple for
> driver writers and avoids to write too much duplicate code at the applica-
> tion level.
>
> Signed-off-by: Zhiyong Yang <zhiyong.yang at intel.com>
> ---
>   lib/librte_ether/rte_ethdev.h | 41 +++++++++++++++++++++++++++++++++++++++--
>   1 file changed, 39 insertions(+), 2 deletions(-)
>
> diff --git a/lib/librte_ether/rte_ethdev.h b/lib/librte_ether/rte_ethdev.h
> index 1c356c1..6fa83cf 100644
> --- a/lib/librte_ether/rte_ethdev.h
> +++ b/lib/librte_ether/rte_ethdev.h
> @@ -1712,6 +1712,9 @@ struct rte_eth_dev_data {
>   	uint32_t min_rx_buf_size;
>   	/**< Common rx buffer size handled by all queues */
>   
> +	uint16_t max_rx_burst_pkts;
> +	uint16_t max_tx_burst_pkts;
> +
>   	uint64_t rx_mbuf_alloc_failed; /**< RX ring mbuf allocation failures. */
>   	struct ether_addr* mac_addrs;/**< Device Ethernet Link address. */
>   	uint64_t mac_pool_sel[ETH_NUM_RECEIVE_MAC_ADDR];
> @@ -2695,11 +2698,15 @@ int rte_eth_dev_set_vlan_pvid(uint8_t port_id, uint16_t pvid, int on);
>    *   of pointers to *rte_mbuf* structures effectively supplied to the
>    *   *rx_pkts* array.
>    */
> +
>   static inline uint16_t
>   rte_eth_rx_burst(uint8_t port_id, uint16_t queue_id,
>   		 struct rte_mbuf **rx_pkts, const uint16_t nb_pkts)
>   {
>   	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
> +	int16_t nb_rx = 0;
> +	uint16_t pkts = 0;
> +	uint16_t rx_nb_pkts = nb_pkts;
>   
>   #ifdef RTE_LIBRTE_ETHDEV_DEBUG
>   	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, 0);
> @@ -2710,8 +2717,20 @@ rte_eth_rx_burst(uint8_t port_id, uint16_t queue_id,
>   		return 0;
>   	}
>   #endif
> -	int16_t nb_rx = (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id],
> +	if (likely(nb_pkts <= dev->data->max_rx_burst_pkts))
> +		return (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id],
>   			rx_pkts, nb_pkts);
> +	while (rx_nb_pkts) {
> +		uint16_t num_burst = RTE_MIN(nb_pkts,
> +					      dev->data->max_rx_burst_pkts);
> +
> +		pkts = (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id],
> +						&rx_pkts[nb_rx], num_burst);
> +		nb_rx += pkts;
> +		rx_nb_pkts -= pkts;
> +		if (pkts < num_burst)
> +			break;
> +	}
>   
>   #ifdef RTE_ETHDEV_RXTX_CALLBACKS
>   	struct rte_eth_rxtx_callback *cb = dev->post_rx_burst_cbs[queue_id];
> @@ -2833,11 +2852,13 @@ rte_eth_rx_descriptor_done(uint8_t port_id, uint16_t queue_id, uint16_t offset)
>    *   the transmit ring. The return value can be less than the value of the
>    *   *tx_pkts* parameter when the transmit ring is full or has been filled up.
>    */
> +
>   static inline uint16_t
>   rte_eth_tx_burst(uint8_t port_id, uint16_t queue_id,
>   		 struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
>   {
>   	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
> +	uint16_t nb_tx = 0;
>   
>   #ifdef RTE_LIBRTE_ETHDEV_DEBUG
>   	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, 0);
> @@ -2860,8 +2881,24 @@ rte_eth_tx_burst(uint8_t port_id, uint16_t queue_id,
>   		} while (cb != NULL);
>   	}
>   #endif
> +	if (likely(nb_pkts <= dev->data->max_tx_burst_pkts))
> +		return (*dev->tx_pkt_burst)(dev->data->tx_queues[queue_id],
> +						tx_pkts, nb_pkts);
> +
> +	while (nb_pkts) {
> +		uint16_t num_burst = RTE_MIN(nb_pkts,
> +					     dev->data->max_tx_burst_pkts);
> +		uint16_t pkts;
> +
> +		pkts = (*dev->tx_pkt_burst)(dev->data->tx_queues[queue_id],
> +						&tx_pkts[nb_tx], num_burst);
> +		nb_tx += pkts;
> +		nb_pkts -= pkts;
> +		if (pkts < num_burst)
> +			break;
> +	}
>   
> -	return (*dev->tx_pkt_burst)(dev->data->tx_queues[queue_id], tx_pkts, nb_pkts);
> +	return nb_tx;
>   }
>   
>   /**




More information about the dev mailing list