[dpdk-dev] [PATCH v3 3/3] net/virtio: improve batching in mergeable path

Maxime Coquelin maxime.coquelin at redhat.com
Fri Dec 21 09:36:14 CET 2018



On 12/21/18 7:27 AM, Gavin Hu (Arm Technology China) wrote:
> 
> 
>> -----Original Message-----
>> From: dev <dev-bounces at dpdk.org> On Behalf Of Maxime Coquelin
>> Sent: Friday, December 21, 2018 1:27 AM
>> To: dev at dpdk.org; jfreimann at redhat.com; tiwei.bie at intel.com;
>> zhihong.wang at intel.com
>> Cc: Maxime Coquelin <maxime.coquelin at redhat.com>
>> Subject: [dpdk-dev] [PATCH v3 3/3] net/virtio: improve batching in
>> mergeable path
>>
>> This patch improves both descriptors dequeue and refill,
>> by using the same batching strategy as done in in-order path.
>>
>> Signed-off-by: Maxime Coquelin <maxime.coquelin at redhat.com>
>> Tested-by: Jens Freimann <jfreimann at redhat.com>
>> Reviewed-by: Jens Freimann <jfreimann at redhat.com>
>> ---
>>   drivers/net/virtio/virtio_rxtx.c | 239 +++++++++++++++++--------------
>>   1 file changed, 129 insertions(+), 110 deletions(-)
>>
>> diff --git a/drivers/net/virtio/virtio_rxtx.c b/drivers/net/virtio/virtio_rxtx.c
>> index 58376ced3..1cfa2f0d6 100644
>> --- a/drivers/net/virtio/virtio_rxtx.c
>> +++ b/drivers/net/virtio/virtio_rxtx.c
>> @@ -353,41 +353,44 @@ virtqueue_enqueue_refill_inorder(struct
>> virtqueue *vq,
>>   }
>>
>>   static inline int
>> -virtqueue_enqueue_recv_refill(struct virtqueue *vq, struct rte_mbuf
>> *cookie)
>> +virtqueue_enqueue_recv_refill(struct virtqueue *vq, struct rte_mbuf
>> **cookie,
>> +				uint16_t num)
>>   {
>>   	struct vq_desc_extra *dxp;
>>   	struct virtio_hw *hw = vq->hw;
>> -	struct vring_desc *start_dp;
>> -	uint16_t needed = 1;
>> -	uint16_t head_idx, idx;
>> +	struct vring_desc *start_dp = vq->vq_ring.desc;
>> +	uint16_t idx, i;
>>
>>   	if (unlikely(vq->vq_free_cnt == 0))
>>   		return -ENOSPC;
>> -	if (unlikely(vq->vq_free_cnt < needed))
>> +	if (unlikely(vq->vq_free_cnt < num))
>>   		return -EMSGSIZE;
>>
>> -	head_idx = vq->vq_desc_head_idx;
>> -	if (unlikely(head_idx >= vq->vq_nentries))
>> +	if (unlikely(vq->vq_desc_head_idx >= vq->vq_nentries))
>>   		return -EFAULT;
>>
>> -	idx = head_idx;
>> -	dxp = &vq->vq_descx[idx];
>> -	dxp->cookie = (void *)cookie;
>> -	dxp->ndescs = needed;
>> +	for (i = 0; i < num; i++) {
>> +		idx = vq->vq_desc_head_idx;
>> +		dxp = &vq->vq_descx[idx];
>> +		dxp->cookie = (void *)cookie[i];
>> +		dxp->ndescs = 1;
>>
>> -	start_dp = vq->vq_ring.desc;
>> -	start_dp[idx].addr =
>> -		VIRTIO_MBUF_ADDR(cookie, vq) +
>> -		RTE_PKTMBUF_HEADROOM - hw->vtnet_hdr_size;
>> -	start_dp[idx].len =
>> -		cookie->buf_len - RTE_PKTMBUF_HEADROOM + hw-
>>> vtnet_hdr_size;
>> -	start_dp[idx].flags =  VRING_DESC_F_WRITE;
>> -	idx = start_dp[idx].next;
>> -	vq->vq_desc_head_idx = idx;
>> -	if (vq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END)
>> -		vq->vq_desc_tail_idx = idx;
>> -	vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt - needed);
>> -	vq_update_avail_ring(vq, head_idx);
>> +		start_dp[idx].addr =
>> +			VIRTIO_MBUF_ADDR(cookie[i], vq) +
>> +			RTE_PKTMBUF_HEADROOM - hw->vtnet_hdr_size;
>> +		start_dp[idx].len =
>> +			cookie[i]->buf_len - RTE_PKTMBUF_HEADROOM +
>> +			hw->vtnet_hdr_size;
>> +		start_dp[idx].flags = VRING_DESC_F_WRITE;
>> +		vq->vq_desc_head_idx = start_dp[idx].next;
>> +		vq_update_avail_ring(vq, idx);
>> +		if (vq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END)
>> {
>> +			vq->vq_desc_tail_idx = vq->vq_desc_head_idx;
>> +			break;
>> +		}
>> +	}
>> +
>> +	vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt - num);
>>
>>   	return 0;
>>   }
>> @@ -892,7 +895,8 @@ virtio_dev_rx_queue_setup_finish(struct
>> rte_eth_dev *dev, uint16_t queue_idx)
>>   				error =
>> virtqueue_enqueue_recv_refill_packed(vq,
>>   						&m, 1);
>>   			else
>> -				error = virtqueue_enqueue_recv_refill(vq,
>> m);
>> +				error = virtqueue_enqueue_recv_refill(vq,
>> +						&m, 1);
>>   			if (error) {
>>   				rte_pktmbuf_free(m);
>>   				break;
>> @@ -991,7 +995,7 @@ virtio_discard_rxbuf(struct virtqueue *vq, struct
>> rte_mbuf *m)
>>   	if (vtpci_packed_queue(vq->hw))
>>   		error = virtqueue_enqueue_recv_refill_packed(vq, &m, 1);
>>   	else
>> -		error = virtqueue_enqueue_recv_refill(vq, m);
>> +		error = virtqueue_enqueue_recv_refill(vq, &m, 1);
>>
>>   	if (unlikely(error)) {
>>   		RTE_LOG(ERR, PMD, "cannot requeue discarded mbuf");
>> @@ -1211,7 +1215,7 @@ virtio_recv_pkts(void *rx_queue, struct rte_mbuf
>> **rx_pkts, uint16_t nb_pkts)
>>   			dev->data->rx_mbuf_alloc_failed++;
>>   			break;
>>   		}
>> -		error = virtqueue_enqueue_recv_refill(vq, new_mbuf);
>> +		error = virtqueue_enqueue_recv_refill(vq, &new_mbuf, 1);
>>   		if (unlikely(error)) {
>>   			rte_pktmbuf_free(new_mbuf);
>>   			break;
>> @@ -1528,19 +1532,18 @@ virtio_recv_mergeable_pkts(void *rx_queue,
>>   	struct virtnet_rx *rxvq = rx_queue;
>>   	struct virtqueue *vq = rxvq->vq;
>>   	struct virtio_hw *hw = vq->hw;
>> -	struct rte_mbuf *rxm, *new_mbuf;
>> -	uint16_t nb_used, num, nb_rx;
>> +	struct rte_mbuf *rxm;
>> +	struct rte_mbuf *prev;
>> +	uint16_t nb_used, num, nb_rx = 0;
>>   	uint32_t len[VIRTIO_MBUF_BURST_SZ];
>>   	struct rte_mbuf *rcv_pkts[VIRTIO_MBUF_BURST_SZ];
>> -	struct rte_mbuf *prev;
>>   	int error;
>> -	uint32_t i, nb_enqueued;
>> -	uint32_t seg_num;
>> -	uint16_t extra_idx;
>> -	uint32_t seg_res;
>> -	uint32_t hdr_size;
>> +	uint32_t nb_enqueued = 0;
>> +	uint32_t seg_num = 0;
>> +	uint32_t seg_res = 0;
>> +	uint32_t hdr_size = hw->vtnet_hdr_size;
>> +	int32_t i;
>>
>> -	nb_rx = 0;
>>   	if (unlikely(hw->started == 0))
>>   		return nb_rx;
>>
>> @@ -1550,31 +1553,25 @@ virtio_recv_mergeable_pkts(void *rx_queue,
>>
>>   	PMD_RX_LOG(DEBUG, "used:%d", nb_used);
>>
>> -	i = 0;
>> -	nb_enqueued = 0;
>> -	seg_num = 0;
>> -	extra_idx = 0;
>> -	seg_res = 0;
>> -	hdr_size = hw->vtnet_hdr_size;
>> -
>> -	while (i < nb_used) {
>> -		struct virtio_net_hdr_mrg_rxbuf *header;
>> +	num = likely(nb_used <= nb_pkts) ? nb_used : nb_pkts;
>> +	if (unlikely(num > VIRTIO_MBUF_BURST_SZ))
>> +		num = VIRTIO_MBUF_BURST_SZ;
>> +	if (likely(num > DESC_PER_CACHELINE))
>> +		num = num - ((vq->vq_used_cons_idx + num) %
>> +				DESC_PER_CACHELINE);
>>
>> -		if (nb_rx == nb_pkts)
>> -			break;
>>
>> -		num = virtqueue_dequeue_burst_rx(vq, rcv_pkts, len, 1);
>> -		if (num != 1)
>> -			continue;
>> +	num = virtqueue_dequeue_burst_rx(vq, rcv_pkts, len, num);
>>
>> -		i++;
>> +	for (i = 0; i < num; i++) {
>> +		struct virtio_net_hdr_mrg_rxbuf *header;
>>
>>   		PMD_RX_LOG(DEBUG, "dequeue:%d", num);
>> -		PMD_RX_LOG(DEBUG, "packet len:%d", len[0]);
>> +		PMD_RX_LOG(DEBUG, "packet len:%d", len[i]);
>>
>> -		rxm = rcv_pkts[0];
>> +		rxm = rcv_pkts[i];
>>
>> -		if (unlikely(len[0] < hdr_size + ETHER_HDR_LEN)) {
>> +		if (unlikely(len[i] < hdr_size + ETHER_HDR_LEN)) {
>>   			PMD_RX_LOG(ERR, "Packet drop");
>>   			nb_enqueued++;
>>   			virtio_discard_rxbuf(vq, rxm);
>> @@ -1582,10 +1579,10 @@ virtio_recv_mergeable_pkts(void *rx_queue,
>>   			continue;
>>   		}
>>
>> -		header = (struct virtio_net_hdr_mrg_rxbuf *)((char *)rxm-
>>> buf_addr +
>> -			RTE_PKTMBUF_HEADROOM - hdr_size);
>> +		header = (struct virtio_net_hdr_mrg_rxbuf *)
>> +			 ((char *)rxm->buf_addr +
>> RTE_PKTMBUF_HEADROOM
>> +			 - hdr_size);
>>   		seg_num = header->num_buffers;
>> -
>>   		if (seg_num == 0)
>>   			seg_num = 1;
>>
>> @@ -1593,10 +1590,11 @@ virtio_recv_mergeable_pkts(void *rx_queue,
>>   		rxm->nb_segs = seg_num;
>>   		rxm->ol_flags = 0;
>>   		rxm->vlan_tci = 0;
>> -		rxm->pkt_len = (uint32_t)(len[0] - hdr_size);
>> -		rxm->data_len = (uint16_t)(len[0] - hdr_size);
>> +		rxm->pkt_len = (uint32_t)(len[i] - hdr_size);
>> +		rxm->data_len = (uint16_t)(len[i] - hdr_size);
>>
>>   		rxm->port = rxvq->port_id;
>> +
>>   		rx_pkts[nb_rx] = rxm;
>>   		prev = rxm;
>>
>> @@ -1607,75 +1605,96 @@ virtio_recv_mergeable_pkts(void *rx_queue,
>>   			continue;
>>   		}
>>
>> +		if (hw->vlan_strip)
>> +			rte_vlan_strip(rx_pkts[nb_rx]);
>> +
>>   		seg_res = seg_num - 1;
>>
>> -		while (seg_res != 0) {
>> -			/*
>> -			 * Get extra segments for current uncompleted
>> packet.
>> -			 */
>> -			uint16_t  rcv_cnt =
>> -				RTE_MIN(seg_res, RTE_DIM(rcv_pkts));
>> -			if (likely(VIRTQUEUE_NUSED(vq) >= rcv_cnt)) {
>> -				uint32_t rx_num =
>> -					virtqueue_dequeue_burst_rx(vq,
>> -					rcv_pkts, len, rcv_cnt);
>> -				i += rx_num;
>> -				rcv_cnt = rx_num;
>> -			} else {
>> -				PMD_RX_LOG(ERR,
>> -					   "No enough segments for
>> packet.");
>> -				nb_enqueued++;
>> -				virtio_discard_rxbuf(vq, rxm);
>> -				rxvq->stats.errors++;
>> -				break;
>> -			}
>> +		/* Merge remaining segments */
>> +		while (seg_res != 0 && i < (num - 1)) {
>> +			i++;
>> +
>> +			rxm = rcv_pkts[i];
>> +			rxm->data_off = RTE_PKTMBUF_HEADROOM -
>> hdr_size;
>> +			rxm->pkt_len = (uint32_t)(len[i]);
>> +			rxm->data_len = (uint16_t)(len[i]);
>> +
>> +			rx_pkts[nb_rx]->pkt_len += (uint32_t)(len[i]);
>> +			rx_pkts[nb_rx]->data_len += (uint16_t)(len[i]);
>> +
>> +			if (prev)
>> +				prev->next = rxm;
>> +
>> +			prev = rxm;
>> +			seg_res -= 1;
>> +		}
>> +
>> +		if (!seg_res) {
>> +			virtio_rx_stats_updated(rxvq, rx_pkts[nb_rx]);
>> +			nb_rx++;
>> +		}
>> +	}
>> +
>> +	/* Last packet still need merge segments */
>> +	while (seg_res != 0) {
>> +		uint16_t rcv_cnt = RTE_MIN((uint16_t)seg_res,
>> +					VIRTIO_MBUF_BURST_SZ);
>>
>> -			extra_idx = 0;
>> +		prev = rcv_pkts[nb_rx];
>> +		if (likely(VIRTQUEUE_NUSED(vq) >= rcv_cnt)) {
>> +			num = virtqueue_dequeue_burst_rx(vq, rcv_pkts,
>> len,
>> +							   rcv_cnt);
>> +			uint16_t extra_idx = 0;
>>
>> +			rcv_cnt = num;
>>   			while (extra_idx < rcv_cnt) {
>>   				rxm = rcv_pkts[extra_idx];
>> -
>> -				rxm->data_off =
>> RTE_PKTMBUF_HEADROOM - hdr_size;
>> +				rxm->data_off =
>> +					RTE_PKTMBUF_HEADROOM -
>> hdr_size;
>>   				rxm->pkt_len = (uint32_t)(len[extra_idx]);
>>   				rxm->data_len = (uint16_t)(len[extra_idx]);
>> -
>> -				if (prev)
>> -					prev->next = rxm;
>> -
>> +				prev->next = rxm;
>>   				prev = rxm;
>> -				rx_pkts[nb_rx]->pkt_len += rxm->pkt_len;
>> -				extra_idx++;
>> +				rx_pkts[nb_rx]->pkt_len += len[extra_idx];
>> +				rx_pkts[nb_rx]->data_len += len[extra_idx];
>> +				extra_idx += 1;
>>   			};
>>   			seg_res -= rcv_cnt;
>> -		}
>> -
>> -		if (hw->vlan_strip)
>> -			rte_vlan_strip(rx_pkts[nb_rx]);
>> -
>> -		VIRTIO_DUMP_PACKET(rx_pkts[nb_rx],
>> -			rx_pkts[nb_rx]->data_len);
>>
>> -		virtio_update_packet_stats(&rxvq->stats, rx_pkts[nb_rx]);
>> -		nb_rx++;
>> +			if (!seg_res) {
>> +				virtio_rx_stats_updated(rxvq,
>> rx_pkts[nb_rx]);
>> +				nb_rx++;
>> +			}
>> +		} else {
>> +			PMD_RX_LOG(ERR,
>> +					"No enough segments for packet.");
>> +			virtio_discard_rxbuf(vq, prev);
>> +			rxvq->stats.errors++;
>> +			break;
>> +		}
>>   	}
>>
>>   	rxvq->stats.packets += nb_rx;
>>
>>   	/* Allocate new mbuf for the used descriptor */
>> -	while (likely(!virtqueue_full(vq))) {
>> -		new_mbuf = rte_mbuf_raw_alloc(rxvq->mpool);
>> -		if (unlikely(new_mbuf == NULL)) {
>> -			struct rte_eth_dev *dev
>> -				= &rte_eth_devices[rxvq->port_id];
>> -			dev->data->rx_mbuf_alloc_failed++;
>> -			break;
>> -		}
>> -		error = virtqueue_enqueue_recv_refill(vq, new_mbuf);
>> -		if (unlikely(error)) {
>> -			rte_pktmbuf_free(new_mbuf);
>> -			break;
>> +	if (likely(!virtqueue_full(vq))) {
>> +		/* free_cnt may include mrg descs */
>> +		uint16_t free_cnt = vq->vq_free_cnt;
>> +		struct rte_mbuf *new_pkts[free_cnt];
>> +
>> +		if (!rte_pktmbuf_alloc_bulk(rxvq->mpool, new_pkts,
>> free_cnt)) {
>> +			error = virtqueue_enqueue_recv_refill(vq, new_pkts,
>> +					free_cnt);
>> +			if (unlikely(error)) {
>> +				for (i = 0; i < free_cnt; i++)
>> +					rte_pktmbuf_free(new_pkts[i]);
> Missing error handling here?  the execution keeps going on without the mbufs?

That's fine because if we get an error here, no descriptors were
inserted in the avail queue.

nb_enqueued is just used to know whether we should notify the host that
descriptors are available. I can set free_cnt to 0 in the error path though, so
that the host isn't notified for nothing, but that's not a big deal, really.

Thanks!
Maxime

> /Gavin
> 
>> +			}
>> +			nb_enqueued += free_cnt;
>> +		} else {
>> +			struct rte_eth_dev *dev =
>> +				&rte_eth_devices[rxvq->port_id];
>> +			dev->data->rx_mbuf_alloc_failed += free_cnt;
>>   		}
>> -		nb_enqueued++;
>>   	}
>>
>>   	if (likely(nb_enqueued)) {
>> --
>> 2.17.2
> 


More information about the dev mailing list