[dpdk-dev] [PATCH v11 5/9] net/virtio: implement transmit path for packed queues

Maxime Coquelin maxime.coquelin at redhat.com
Wed Dec 5 12:16:23 CET 2018



On 12/3/18 3:15 PM, Jens Freimann wrote:
> This implements the transmit path for devices with
> support for packed virtqueues.
> 
> Signed-off-by: Jens Freiman <jfreimann at redhat.com>
> Signed-off-by: Tiwei Bie <tiwei.bie at intel.com>
> ---
>   drivers/net/virtio/virtio_ethdev.c |  54 ++++---
>   drivers/net/virtio/virtio_ethdev.h |   2 +
>   drivers/net/virtio/virtio_rxtx.c   | 235 ++++++++++++++++++++++++++++-
>   drivers/net/virtio/virtqueue.h     |  21 ++-
>   4 files changed, 290 insertions(+), 22 deletions(-)
> 
> diff --git a/drivers/net/virtio/virtio_ethdev.c b/drivers/net/virtio/virtio_ethdev.c
> index 48707b7b8..bdcc9f365 100644
> --- a/drivers/net/virtio/virtio_ethdev.c
> +++ b/drivers/net/virtio/virtio_ethdev.c
> @@ -388,6 +388,9 @@ virtio_init_queue(struct rte_eth_dev *dev, uint16_t vtpci_queue_idx)
>   	if (vtpci_packed_queue(hw)) {
>   		vq->avail_wrap_counter = 1;
>   		vq->used_wrap_counter = 1;
> +		vq->avail_used_flags =
> +			VRING_DESC_F_AVAIL(vq->avail_wrap_counter) |
> +			VRING_DESC_F_USED(!vq->avail_wrap_counter);
>   	}
>   
>   	/*
> @@ -495,16 +498,22 @@ virtio_init_queue(struct rte_eth_dev *dev, uint16_t vtpci_queue_idx)
>   		memset(txr, 0, vq_size * sizeof(*txr));
>   		for (i = 0; i < vq_size; i++) {
>   			struct vring_desc *start_dp = txr[i].tx_indir;
> -
> -			vring_desc_init_split(start_dp, RTE_DIM(txr[i].tx_indir));
> -
> +			struct vring_packed_desc *start_dp_packed = txr[i].tx_indir_pq;
> +	
>   			/* first indirect descriptor is always the tx header */
> -			start_dp->addr = txvq->virtio_net_hdr_mem
> -				+ i * sizeof(*txr)
> -				+ offsetof(struct virtio_tx_region, tx_hdr);
> -
> -			start_dp->len = hw->vtnet_hdr_size;
> -			start_dp->flags = VRING_DESC_F_NEXT;
> +			if (vtpci_packed_queue(hw)) {
> +				start_dp_packed->addr = txvq->virtio_net_hdr_mem
> +					+ i * sizeof(*txr)
> +					+ offsetof(struct virtio_tx_region, tx_hdr);
> +				start_dp_packed->len = hw->vtnet_hdr_size;
> +			} else {
> +				vring_desc_init_split(start_dp, RTE_DIM(txr[i].tx_indir));
> +				start_dp->addr = txvq->virtio_net_hdr_mem
> +					+ i * sizeof(*txr)
> +					+ offsetof(struct virtio_tx_region, tx_hdr);
> +				start_dp->len = hw->vtnet_hdr_size;
> +				start_dp->flags = VRING_DESC_F_NEXT;
> +			}
>   		}
>   	}
>   
> @@ -1333,6 +1342,23 @@ set_rxtx_funcs(struct rte_eth_dev *eth_dev)
>   {
>   	struct virtio_hw *hw = eth_dev->data->dev_private;
>   
> +	if (vtpci_packed_queue(hw)) {
> +		PMD_INIT_LOG(INFO,
> +			"virtio: using packed ring standard Tx path on port %u",
> +			eth_dev->data->port_id);
> +		eth_dev->tx_pkt_burst = virtio_xmit_pkts_packed;
> +	} else {
> +		if (hw->use_inorder_tx) {
> +			PMD_INIT_LOG(INFO, "virtio: using inorder Tx path on port %u",
> +				eth_dev->data->port_id);
> +			eth_dev->tx_pkt_burst = virtio_xmit_pkts_inorder;
> +		} else {
> +			PMD_INIT_LOG(INFO, "virtio: using standard Tx path on port %u",
> +				eth_dev->data->port_id);
> +			eth_dev->tx_pkt_burst = virtio_xmit_pkts;
> +		}
> +	}
> +
>   	if (hw->use_simple_rx) {
>   		PMD_INIT_LOG(INFO, "virtio: using simple Rx path on port %u",
>   			eth_dev->data->port_id);
> @@ -1353,15 +1379,7 @@ set_rxtx_funcs(struct rte_eth_dev *eth_dev)
>   		eth_dev->rx_pkt_burst = &virtio_recv_pkts;
>   	}
>   
> -	if (hw->use_inorder_tx) {
> -		PMD_INIT_LOG(INFO, "virtio: using inorder Tx path on port %u",
> -			eth_dev->data->port_id);
> -		eth_dev->tx_pkt_burst = virtio_xmit_pkts_inorder;
> -	} else {
> -		PMD_INIT_LOG(INFO, "virtio: using standard Tx path on port %u",
> -			eth_dev->data->port_id);
> -		eth_dev->tx_pkt_burst = virtio_xmit_pkts;
> -	}
> +
>   }
>   
>   /* Only support 1:1 queue/interrupt mapping so far.
> diff --git a/drivers/net/virtio/virtio_ethdev.h b/drivers/net/virtio/virtio_ethdev.h
> index e0f80e5a4..05d355180 100644
> --- a/drivers/net/virtio/virtio_ethdev.h
> +++ b/drivers/net/virtio/virtio_ethdev.h
> @@ -82,6 +82,8 @@ uint16_t virtio_recv_mergeable_pkts_inorder(void *rx_queue,
>   
>   uint16_t virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts,
>   		uint16_t nb_pkts);
> +uint16_t virtio_xmit_pkts_packed(void *tx_queue, struct rte_mbuf **tx_pkts,
> +		uint16_t nb_pkts);
>   
>   uint16_t virtio_xmit_pkts_inorder(void *tx_queue, struct rte_mbuf **tx_pkts,
>   		uint16_t nb_pkts);
> diff --git a/drivers/net/virtio/virtio_rxtx.c b/drivers/net/virtio/virtio_rxtx.c
> index eb891433e..1fcc9cef7 100644
> --- a/drivers/net/virtio/virtio_rxtx.c
> +++ b/drivers/net/virtio/virtio_rxtx.c
> @@ -88,6 +88,23 @@ vq_ring_free_chain(struct virtqueue *vq, uint16_t desc_idx)
>   	dp->next = VQ_RING_DESC_CHAIN_END;
>   }
>   
> +static void
> +vq_ring_free_id_packed(struct virtqueue *vq, uint16_t id)

I think it should be inlined.

> +{
> +	struct vq_desc_extra *dxp;
> +
> +	dxp = &vq->vq_descx[id];
> +	vq->vq_free_cnt += dxp->ndescs;
> +
> +	if (vq->vq_desc_tail_idx == VQ_RING_DESC_CHAIN_END)
> +		vq->vq_desc_head_idx = id;
> +	else
> +		vq->vq_descx[vq->vq_desc_tail_idx].next = id;
> +
> +	vq->vq_desc_tail_idx = id;
> +	dxp->next = VQ_RING_DESC_CHAIN_END;
> +}
> +
>   static uint16_t
>   virtqueue_dequeue_burst_rx(struct virtqueue *vq, struct rte_mbuf **rx_pkts,
>   			   uint32_t *len, uint16_t num)
> @@ -165,6 +182,33 @@ virtqueue_dequeue_rx_inorder(struct virtqueue *vq,
>   #endif
>   
>   /* Cleanup from completed transmits. */
> +static void
> +virtio_xmit_cleanup_packed(struct virtqueue *vq, int num)

Ditto.

> +{
> +	uint16_t used_idx, id;
> +	uint16_t size = vq->vq_nentries;
> +	struct vring_packed_desc *desc = vq->ring_packed.desc_packed;
> +	struct vq_desc_extra *dxp;
> +
> +	used_idx = vq->vq_used_cons_idx;
> +	while (num-- && desc_is_used(&desc[used_idx], vq)) {
> +		used_idx = vq->vq_used_cons_idx;
> +		id = desc[used_idx].id;
> +		dxp = &vq->vq_descx[id];
> +		vq->vq_used_cons_idx += dxp->ndescs;
> +		if (vq->vq_used_cons_idx >= size) {
> +			vq->vq_used_cons_idx -= size;
> +			vq->used_wrap_counter ^= 1;
> +		}
> +		vq_ring_free_id_packed(vq, id);
> +		if (dxp->cookie != NULL) {
> +			rte_pktmbuf_free(dxp->cookie);
> +			dxp->cookie = NULL;
> +		}
> +		used_idx = vq->vq_used_cons_idx;
> +	}
> +}
> +
>   static void
>   virtio_xmit_cleanup(struct virtqueue *vq, uint16_t num)
>   {
> @@ -456,6 +500,107 @@ virtqueue_enqueue_xmit_inorder(struct virtnet_tx *txvq,
>   	vq->vq_desc_head_idx = idx & (vq->vq_nentries - 1);
>   }
>   
> +static inline void
> +virtqueue_enqueue_xmit_packed(struct virtnet_tx *txvq, struct rte_mbuf *cookie,
> +			      uint16_t needed, int can_push)
> +{
> +	struct virtio_tx_region *txr = txvq->virtio_net_hdr_mz->addr;
> +	struct vq_desc_extra *dxp;
> +	struct virtqueue *vq = txvq->vq;
> +	struct vring_packed_desc *start_dp, *head_dp;
> +	uint16_t idx, id, head_idx, head_flags;
> +	uint16_t head_size = vq->hw->vtnet_hdr_size;
> +	struct virtio_net_hdr *hdr;
> +	uint16_t prev;
> +
> +	id = vq->vq_desc_head_idx;
> +
> +	dxp = &vq->vq_descx[id];
> +	dxp->ndescs = needed;
> +	dxp->cookie = cookie;
> +
> +	head_idx = vq->vq_avail_idx;
> +	idx = head_idx;
> +	prev = head_idx;
> +	start_dp = vq->ring_packed.desc_packed;
> +
> +	head_dp = &vq->ring_packed.desc_packed[idx];
> +	head_flags = cookie->next ? VRING_DESC_F_NEXT: 0;
> +	head_flags |= vq->avail_used_flags;
> +
> +	if (can_push) {
> +		/* prepend cannot fail, checked by caller */
> +		hdr = (struct virtio_net_hdr *)
> +			rte_pktmbuf_prepend(cookie, head_size);
> +		/* rte_pktmbuf_prepend() counts the hdr size to the pkt length,
> +		 * which is wrong. Below subtract restores correct pkt size.
> +		 */
> +		cookie->pkt_len -= head_size;
> +
> +		/* if offload disabled, it is not zeroed below, do it now */
> +		if (!vq->hw->has_tx_offload) {
> +			ASSIGN_UNLESS_EQUAL(hdr->csum_start, 0);
> +			ASSIGN_UNLESS_EQUAL(hdr->csum_offset, 0);
> +			ASSIGN_UNLESS_EQUAL(hdr->flags, 0);
> +			ASSIGN_UNLESS_EQUAL(hdr->gso_type, 0);
> +			ASSIGN_UNLESS_EQUAL(hdr->gso_size, 0);
> +			ASSIGN_UNLESS_EQUAL(hdr->hdr_len, 0);
> +		}
> +	} else {
> +		/* setup first tx ring slot to point to header
> +		 * stored in reserved region.
> +		 */
> +		start_dp[idx].addr  = txvq->virtio_net_hdr_mem +
> +			RTE_PTR_DIFF(&txr[idx].tx_hdr, txr);
> +		start_dp[idx].len   = vq->hw->vtnet_hdr_size;
> +		hdr = (struct virtio_net_hdr *)&txr[idx].tx_hdr;
> +		idx++;
> +		if (idx >= vq->vq_nentries) {
> +			idx -= vq->vq_nentries;
> +			vq->avail_wrap_counter ^= 1;
> +			vq->avail_used_flags =
> +				VRING_DESC_F_AVAIL(vq->avail_wrap_counter) |
> +				VRING_DESC_F_USED(!vq->avail_wrap_counter);
> +		}
> +	}
> +
> +	virtqueue_xmit_offload(hdr, cookie, vq->hw->has_tx_offload);
> +
> +	do {
> +		uint16_t flags;
> +
> +		start_dp[idx].addr = VIRTIO_MBUF_DATA_DMA_ADDR(cookie, vq);
> +		start_dp[idx].len  = cookie->data_len;
> +		if (likely(idx != head_idx)) {
> +			flags = cookie->next ? VRING_DESC_F_NEXT : 0;
> +			flags |= vq->avail_used_flags;
> +			start_dp[idx].flags = flags;
> +		}
> +		prev = idx;
> +		idx++;
> +		if (idx >= vq->vq_nentries) {
> +			idx -= vq->vq_nentries;
> +			vq->avail_wrap_counter ^= 1;
> +			vq->avail_used_flags =
> +				VRING_DESC_F_AVAIL(vq->avail_wrap_counter) |
> +				VRING_DESC_F_USED(!vq->avail_wrap_counter);
> +		}
> +	} while ((cookie = cookie->next) != NULL);
> +
> +	start_dp[prev].id = id;
> +
> +	vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt - needed);
> +
> +	vq->vq_desc_head_idx = dxp->next;
> +	if (vq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END)
> +		vq->vq_desc_tail_idx = VQ_RING_DESC_CHAIN_END;
> +
> +	vq->vq_avail_idx = idx;
> +
> +	rte_smp_wmb();
> +	head_dp->flags = head_flags;
> +}
> +
>   static inline void
>   virtqueue_enqueue_xmit(struct virtnet_tx *txvq, struct rte_mbuf *cookie,
>   			uint16_t needed, int use_indirect, int can_push,
> @@ -733,8 +878,10 @@ virtio_dev_tx_queue_setup_finish(struct rte_eth_dev *dev,
>   
>   	PMD_INIT_FUNC_TRACE();
>   
> -	if (hw->use_inorder_tx)
> -		vq->vq_ring.desc[vq->vq_nentries - 1].next = 0;
> +	if (!vtpci_packed_queue(hw)) {
> +		if (hw->use_inorder_tx)
> +			vq->vq_ring.desc[vq->vq_nentries - 1].next = 0;
> +	}
>   
>   	VIRTQUEUE_DUMP(vq);
>   
> @@ -1346,6 +1493,90 @@ virtio_recv_mergeable_pkts(void *rx_queue,
>   	return nb_rx;
>   }
>   
> +uint16_t
> +virtio_xmit_pkts_packed(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
> +{
> +	struct virtnet_tx *txvq = tx_queue;
> +	struct virtqueue *vq = txvq->vq;
> +	struct virtio_hw *hw = vq->hw;
> +	uint16_t hdr_size = hw->vtnet_hdr_size;
> +	uint16_t nb_tx = 0;
> +	int error;
> +
> +	if (unlikely(hw->started == 0 && tx_pkts != hw->inject_pkts))
> +		return nb_tx;
> +
> +	if (unlikely(nb_pkts < 1))
> +		return nb_pkts;
> +
> +	PMD_TX_LOG(DEBUG, "%d packets to xmit", nb_pkts);
> +
> +	if (nb_pkts > vq->vq_free_cnt)
> +		virtio_xmit_cleanup_packed(vq, nb_pkts - vq->vq_free_cnt);
> +
> +	for (nb_tx = 0; nb_tx < nb_pkts; nb_tx++) {
> +		struct rte_mbuf *txm = tx_pkts[nb_tx];
> +		int can_push = 0, slots, need;
> +
> +		/* Do VLAN tag insertion */
> +		if (unlikely(txm->ol_flags & PKT_TX_VLAN_PKT)) {
> +			error = rte_vlan_insert(&txm);
> +			if (unlikely(error)) {
> +				rte_pktmbuf_free(txm);
> +				continue;
> +			}
> +		}
> +
> +		/* optimize ring usage */
> +		if ((vtpci_with_feature(hw, VIRTIO_F_ANY_LAYOUT) ||
> +		      vtpci_with_feature(hw, VIRTIO_F_VERSION_1)) &&
> +		    rte_mbuf_refcnt_read(txm) == 1 &&
> +		    RTE_MBUF_DIRECT(txm) &&
> +		    txm->nb_segs == 1 &&
> +		    rte_pktmbuf_headroom(txm) >= hdr_size &&
> +		    rte_is_aligned(rte_pktmbuf_mtod(txm, char *),
> +				   __alignof__(struct virtio_net_hdr_mrg_rxbuf)))
> +			can_push = 1;
> +
> +		/* How many main ring entries are needed to this Tx?
> +		 * any_layout => number of segments
> +		 * default    => number of segments + 1
> +		 */
> +		slots = txm->nb_segs + !can_push;
> +		need = slots - vq->vq_free_cnt;
> +
> +		/* Positive value indicates it need free vring descriptors */
> +		if (unlikely(need > 0)) {
> +			virtio_rmb();
> +			need = RTE_MIN(need, (int)nb_pkts);
> +			virtio_xmit_cleanup_packed(vq, need);
> +			need = slots - vq->vq_free_cnt;
> +			if (unlikely(need > 0)) {
> +				PMD_TX_LOG(ERR,
> +					   "No free tx descriptors to transmit");
> +				break;
> +			}
> +		}
> +
> +		/* Enqueue Packet buffers */
> +		virtqueue_enqueue_xmit_packed(txvq, txm, slots, can_push);
> +
> +		txvq->stats.bytes += txm->pkt_len;
> +		virtio_update_packet_stats(&txvq->stats, txm);
> +	}
> +
> +	txvq->stats.packets += nb_tx;
> +
> +	if (likely(nb_tx)) {
> +		if (unlikely(virtqueue_kick_prepare_packed(vq))) {
> +			virtqueue_notify(vq);
> +			PMD_TX_LOG(DEBUG, "Notified backend after xmit");
> +		}
> +	}
> +
> +	return nb_tx;
> +}

I wonder what the performance cost would be to have a common function 
for split and packed, and just call
virtqueue_enqueue_xmit_packed/virtqueue_enqueue_xmit_split dynamically.

> +
>   uint16_t
>   virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
>   {
> diff --git a/drivers/net/virtio/virtqueue.h b/drivers/net/virtio/virtqueue.h
> index 8bb0c5b3f..5119818e1 100644
> --- a/drivers/net/virtio/virtqueue.h
> +++ b/drivers/net/virtio/virtqueue.h
> @@ -171,6 +171,7 @@ struct virtqueue {
>   	bool avail_wrap_counter;
>   	bool used_wrap_counter;
>   	uint16_t event_flags_shadow;
> +	uint16_t avail_used_flags;
>   
>   	/**
>   	 * Last consumed descriptor in the used table,
> @@ -247,8 +248,12 @@ struct virtio_net_hdr_mrg_rxbuf {
>   #define VIRTIO_MAX_TX_INDIRECT 8
>   struct virtio_tx_region {
>   	struct virtio_net_hdr_mrg_rxbuf tx_hdr;
> -	struct vring_desc tx_indir[VIRTIO_MAX_TX_INDIRECT]
> -			   __attribute__((__aligned__(16)));
> +	union {
> +		struct vring_desc tx_indir[VIRTIO_MAX_TX_INDIRECT]
> +			__attribute__((__aligned__(16)));
> +		struct vring_packed_desc tx_indir_pq[VIRTIO_MAX_TX_INDIRECT]
> +			__attribute__((__aligned__(16)));
> +	};
>   };
>   
>   static inline void
> @@ -392,6 +397,7 @@ virtio_get_queue_type(struct virtio_hw *hw, uint16_t vtpci_queue_idx)
>   #define VIRTQUEUE_NUSED(vq) ((uint16_t)((vq)->vq_ring.used->idx - (vq)->vq_used_cons_idx))
>   
>   void vq_ring_free_chain(struct virtqueue *vq, uint16_t desc_idx);
> +void vq_ring_free_chain_packed(struct virtqueue *vq, uint16_t desc_idx);
>   void vq_ring_free_inorder(struct virtqueue *vq, uint16_t desc_idx,
>   			  uint16_t num);
>   
> @@ -425,6 +431,17 @@ virtqueue_kick_prepare(struct virtqueue *vq)
>   	return !(vq->vq_ring.used->flags & VRING_USED_F_NO_NOTIFY);
>   }
>   
> +static inline int
> +virtqueue_kick_prepare_packed(struct virtqueue *vq)
> +{
> +	uint16_t flags;
> +
> +	virtio_mb();
> +	flags = vq->ring_packed.device_event->desc_event_flags;
> +
> +	return flags != RING_EVENT_FLAGS_DISABLE;
> +}
> +
>   static inline void
>   virtqueue_notify(struct virtqueue *vq)
>   {
> 


More information about the dev mailing list