[dpdk-dev] vhost: virtio-net rx-ring stop work after work many hours, bug?

Linhaifeng haifeng.lin at huawei.com
Tue Feb 3 13:38:19 CET 2015



On 2015/1/27 17:37, Michael S. Tsirkin wrote:
> On Tue, Jan 27, 2015 at 03:57:13PM +0800, Linhaifeng wrote:
>> Hi,all
>>
>> I use vhost-user to send data to the VM. At first it works well, but after many hours the VM can no longer receive data, though it can still send data.
>>
>> (gdb)p avail_idx
>> $4 = 2668
>> (gdb)p free_entries
>> $5 = 0
>> (gdb)l
>>         /* check that we have enough buffers */
>>         if (unlikely(count > free_entries))
>>             count = free_entries;
>>
>>         if (count == 0){
>>             int b=0;
>>             if(b) { // when b is set to 1 (so the guest is notified), rx_ring starts working again
>>                 if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)) {
>>
>>                     eventfd_write(vq->callfd, 1);
>>                 }
>>             }
>>             return 0;
>>         }
>>
>> some info i print in guest:
>>
>> net eth3:vi->num=199
>> net eth3:rvq info: num_free=57, used->idx=2668, avail->idx=2668
>> net eth3:svq info: num_free=254, used->idx=1644, avail->idx=1644
>>
>> net eth3:vi->num=199
>> net eth3:rvq info: num_free=57, used->idx=2668, avail->idx=2668
>> net eth3:svq info: num_free=254, used->idx=1645, avail->idx=1645
>>
>> net eth3:vi->num=199
>> net eth3:rvq info: num_free=57, used->idx=2668, avail->idx=2668
>> net eth3:svq info: num_free=254, used->idx=1646, avail->idx=1646
>>
>> # free
>>              total       used       free     shared    buffers     cached
>> Mem:      3924100      337252    3586848          0      95984     138060
>> -/+ buffers/cache:     103208    3820892
>> Swap:       970748          0     970748
>>
>> I have two questions:
>> 1. Should we notify the guest when there is no buffer in vq->avail?
> 
> No unless NOTIFY_ON_EMPTY is set (most guests don't set it).

Thank you, that is new knowledge for me :)
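
If I got that right, an empty-avail-ring kick would only ever be needed in a
case like the sketch below (just for my own notes; should_kick_on_empty() is
a name I made up, it is not a DPDK function):

/*
 * Sketch: kick on an empty avail ring only if the guest negotiated
 * VIRTIO_F_NOTIFY_ON_EMPTY and has not suppressed interrupts.
 * The dev/vq fields are the same ones used in the vhost code pasted below.
 */
static inline int
should_kick_on_empty(struct virtio_net *dev, struct vhost_virtqueue *vq)
{
	if (!(dev->features & (1ULL << VIRTIO_F_NOTIFY_ON_EMPTY)))
		return 0;

	return !(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT);
}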

> 
>> 2. Why does virtio_net stop filling avail?
> 
> Most likely, it didn't get an interrupt.
> 
> If so, it would be a dpdk vhost user bug.
> Which code are you using in dpdk?
> 

Hi mst,

Thank you for your reply.
Sorry, my mail filter may have a bug, so I only saw this mail just now.

I use the dpdk code from before commit 2bbb811. I paste the code here for you to review.
(Note that vhost_enqueue_burst and vhost_dequeue_burst run in poll mode; a sketch of the calling loop follows the second function.)

My guess is that vhost_enqueue_burst uses up all the buffers in rx_ring and then tries to notify the guest
to receive, but at that moment the vCPU may be exiting, so the guest never sees the notification.
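
To spell the guess out as a timeline (this is only a guess, not a confirmed
analysis; the field names match the code pasted below):

/*
 * Suspected sequence:
 *
 *   vhost (data core)                      guest virtio_net
 *   ----------------------------           ----------------------------
 *   copy packets, bump used->idx
 *   avail->flags allows interrupts
 *   eventfd_write() -> kick sent           vCPU happens to be exiting,
 *                                            the notification is missed,
 *                                          avail ring is never refilled
 *   every later call sees count == 0
 *   and returns without kicking            waits for an interrupt forever
 */

This would also explain why forcing a kick in the count == 0 path (the b = 1
experiment in my first mail) gets the ring moving again. Written into
vhost_enqueue_burst it would look like the snippet below, though whether such
a kick is allowed is exactly my question 1:

		if (count == 0) {
			/* workaround under test, not a proposed fix */
			if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT))
				eventfd_write(vq->kickfd, 1);
			return 0;
		}

The full functions as I have them follow.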


/*
 * Enqueues packets to the guest virtio RX virtqueue for vhost devices.
 */
static inline uint32_t __attribute__((always_inline))
vhost_enqueue_burst(struct virtio_net *dev, struct rte_mbuf **pkts, unsigned count)
{
	struct vhost_virtqueue *vq;
	struct vring_desc *desc;
	struct rte_mbuf *buff;
	/* The virtio_hdr is initialised to 0. */
	struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0,0,0,0,0,0},0};
	uint64_t buff_addr = 0;
	uint64_t buff_hdr_addr = 0;
	uint32_t head[PKT_BURST_SIZE], packet_len = 0;
	uint32_t head_idx, packet_success = 0;
	uint32_t mergeable, mrg_count = 0;
	uint32_t retry = 0;
	uint16_t avail_idx, res_cur_idx;
	uint16_t res_base_idx, res_end_idx;
	uint16_t free_entries;
	uint8_t success = 0;

	LOG_DEBUG(APP, "(%"PRIu64") virtio_dev_rx()\n", dev->device_fh);
	vq = dev->virtqueue[VIRTIO_RXQ];
	count = (count > PKT_BURST_SIZE) ? PKT_BURST_SIZE : count;

	/* As many data cores may want access to available buffers, they need to be reserved. */
	do {
		res_base_idx = vq->last_used_idx_res;
		avail_idx = *((volatile uint16_t *)&vq->avail->idx);

		free_entries = (avail_idx - res_base_idx);
		/* If retry is enabled and the queue is full then we wait and retry to avoid packet loss. */
		if (unlikely(count > free_entries)) {
			for (retry = 0; retry < burst_tx_retry_num; retry++) {
				rte_delay_us(burst_tx_delay_time);
				avail_idx =
					*((volatile uint16_t *)&vq->avail->idx);
				free_entries = (avail_idx - res_base_idx);
				if (count <= free_entries)
					break;
			}
		}

		/*check that we have enough buffers*/
		if (unlikely(count > free_entries))
			count = free_entries;

		if (count == 0) 			// !!! when the VM cannot receive, we always return from here
			return 0;

		res_end_idx = res_base_idx + count;
		/* vq->last_used_idx_res is atomically updated. */
		success = rte_atomic16_cmpset(&vq->last_used_idx_res, res_base_idx,
									res_end_idx);
	} while (unlikely(success == 0));
	res_cur_idx = res_base_idx;
	LOG_DEBUG(APP, "(%"PRIu64") Current Index %d| End Index %d\n", dev->device_fh, res_cur_idx, res_end_idx);

	/* Prefetch available ring to retrieve indexes. */
	rte_prefetch0(&vq->avail->ring[res_cur_idx & (vq->size - 1)]);

	/* Check if the VIRTIO_NET_F_MRG_RXBUF feature is enabled. */
	mergeable = dev->features & (1 << VIRTIO_NET_F_MRG_RXBUF);

	/* Retrieve all of the head indexes first to avoid caching issues. */
	for (head_idx = 0; head_idx < count; head_idx++)
		head[head_idx] = vq->avail->ring[(res_cur_idx + head_idx) & (vq->size - 1)];

	/*Prefetch descriptor index. */
	rte_prefetch0(&vq->desc[head[packet_success]]);

	while (res_cur_idx != res_end_idx) {
		/* Get descriptor from available ring */
		desc = &vq->desc[head[packet_success]];

		buff = pkts[packet_success];

		/* Convert from gpa to vva (guest physical addr -> vhost virtual addr) */
		buff_addr = gpa_to_vva(dev, desc->addr);
		/* Prefetch buffer address. */
		rte_prefetch0((void*)(uintptr_t)buff_addr);

		if (mergeable && (mrg_count != 0)) {
			desc->len = packet_len = rte_pktmbuf_data_len(buff);
		} else {
			/* Copy virtio_hdr to packet and increment buffer address */
			buff_hdr_addr = buff_addr;
			packet_len = rte_pktmbuf_data_len(buff) + vq->vhost_hlen;

			/*
			 * If the descriptors are chained the header and data are placed in
			 * separate buffers.
			 */
			if (desc->flags & VRING_DESC_F_NEXT) {
				desc->len = vq->vhost_hlen;
				desc = &vq->desc[desc->next];
				/* Buffer address translation. */
				buff_addr = gpa_to_vva(dev, desc->addr);
				desc->len = rte_pktmbuf_data_len(buff);
			} else {
				buff_addr += vq->vhost_hlen;
				desc->len = packet_len;
			}
		}

		/* Update used ring with desc information */
		vq->used->ring[res_cur_idx & (vq->size - 1)].id = head[packet_success];
		vq->used->ring[res_cur_idx & (vq->size - 1)].len = packet_len;

		/* Copy mbuf data to buffer */
		rte_memcpy((void *)(uintptr_t)buff_addr, (const void*)buff->pkt.data, rte_pktmbuf_data_len(buff));

		PRINT_PACKET(dev, (uintptr_t)buff_addr, rte_pktmbuf_data_len(buff), 0);

		res_cur_idx++;
		packet_success++;

		/* If mergeable is disabled then a header is required per buffer. */
		if (!mergeable) {
			rte_memcpy((void *)(uintptr_t)buff_hdr_addr, (const void*)&virtio_hdr, vq->vhost_hlen);
			PRINT_PACKET(dev, (uintptr_t)buff_hdr_addr, vq->vhost_hlen, 1);
		} else {
			mrg_count++;
			/* Merge buffer can only handle so many buffers at a time. Tell the guest if this limit is reached. */
			if ((mrg_count == MAX_MRG_PKT_BURST) || (res_cur_idx == res_end_idx)) {
				virtio_hdr.num_buffers = mrg_count;
				LOG_DEBUG(APP, "(%"PRIu64") RX: Num merge buffers %d\n", dev->device_fh, virtio_hdr.num_buffers);
				rte_memcpy((void *)(uintptr_t)buff_hdr_addr, (const void*)&virtio_hdr, vq->vhost_hlen);
				PRINT_PACKET(dev, (uintptr_t)buff_hdr_addr, vq->vhost_hlen, 1);
				mrg_count = 0;
			}
		}
		if (res_cur_idx < res_end_idx) {
			/* Prefetch descriptor index. */
			rte_prefetch0(&vq->desc[head[packet_success]]);
		}
	}

	rte_compiler_barrier();

	/* Wait until it's our turn to add our buffer to the used ring. */
	while (unlikely(vq->last_used_idx != res_base_idx))
		rte_pause();

	*(volatile uint16_t *) &vq->used->idx += count;
	vq->last_used_idx = res_end_idx;

	/* Kick the guest if necessary. */
	if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT))
		eventfd_write(vq->kickfd,1);

	return count;
}

/*
 * Dequeues packets from the guest virtio TX virtqueue for vhost devices.
 */
static inline uint16_t __attribute__((always_inline))
vhost_dequeue_burst(struct virtio_net *dev, struct rte_mbuf **pkts, unsigned count)
{
	struct rte_mbuf *mbuf;
	struct vhost_virtqueue *vq;
	struct vring_desc *desc;
	uint64_t buff_addr = 0;
	uint32_t head[PKT_BURST_SIZE];
	uint32_t used_idx, i;
	uint16_t free_entries, packet_success = 0;
	uint16_t avail_idx;

	vq = dev->virtqueue[VIRTIO_TXQ];
	avail_idx = *((volatile uint16_t *)&vq->avail->idx);

	/* If there are no available buffers then return. */
	if (vq->last_used_idx == avail_idx)
		return 0;

	LOG_DEBUG(APP, "(%"PRIu64") virtio_dev_tx()\n", dev->device_fh);

	/* Prefetch available ring to retrieve head indexes. */
	rte_prefetch0(&vq->avail->ring[vq->last_used_idx & (vq->size - 1)]);

	/*get the number of free entries in the ring*/
	free_entries = (avail_idx - vq->last_used_idx);

	/* Limit to PKT_BURST_SIZE. */
	if (free_entries > count)
		free_entries = count;

	/*
	 * Performance is better if cachelines containing descriptors are not accessed by multiple
	 * cores. We try to finish with a cacheline before passing it on.
	 */
	if (likely(free_entries > DESC_PER_CACHELINE))
		free_entries = free_entries - ((vq->last_used_idx + free_entries) % DESC_PER_CACHELINE);

	LOG_DEBUG(APP, "(%"PRIu64") Buffers available %d\n", dev->device_fh, free_entries);
	/* Retrieve all of the head indexes first to avoid caching issues. */
	for (i = 0; i < free_entries; i++)
		head[i] = vq->avail->ring[(vq->last_used_idx + i) & (vq->size - 1)];

	/* Prefetch descriptor index. */
	rte_prefetch0(&vq->desc[head[packet_success]]);
	rte_prefetch0(&vq->used->ring[vq->last_used_idx & (vq->size - 1)]);

	while (packet_success < free_entries) {
		desc = &vq->desc[head[packet_success]];

		/* Discard first buffer as it is the virtio header */
		desc = &vq->desc[desc->next];

		/* Buffer address translation. */
		buff_addr = gpa_to_vva(dev, desc->addr);
		/* Prefetch buffer address. */
		rte_prefetch0((void*)(uintptr_t)buff_addr);

		used_idx = vq->last_used_idx & (vq->size - 1);

		if (packet_success < (free_entries - 1)) {
			/* Prefetch descriptor index. */
			rte_prefetch0(&vq->desc[head[packet_success+1]]);
			rte_prefetch0(&vq->used->ring[(used_idx + 1) & (vq->size - 1)]);
		}

		/* Update used index buffer information. */
		vq->used->ring[used_idx].id = head[packet_success];
		vq->used->ring[used_idx].len = 0;

		/* Allocate an mbuf and populate the structure. */
		mbuf = rte_pktmbuf_alloc(pktmbuf_pool);
		if (unlikely(mbuf == NULL)) {
			RTE_LOG(ERR, APP, "Failed to allocate memory for mbuf.\n");
			return packet_success;
		}

		/* Setup dummy mbuf. */
		mbuf->pkt.data_len = desc->len;
		mbuf->pkt.pkt_len = mbuf->pkt.data_len;

		rte_memcpy((void*) mbuf->pkt.data,
		        (const void*) buff_addr, mbuf->pkt.data_len);

		pkts[packet_success]=mbuf;

		PRINT_PACKET(dev, (uintptr_t)buff_addr, desc->len, 0);

		vq->last_used_idx++;
		packet_success++;
	}

	rte_compiler_barrier();
	vq->used->idx += packet_success;
	/* Kick guest if required. */
	if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT))
		eventfd_write(vq->kickfd,1);
	return packet_success;
}
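
For completeness, the two functions above are driven from a plain polling loop
on a data core, roughly like the sketch below (the surrounding application
structure, port handling and the function name are simplified assumptions, not
the real app code; PKT_BURST_SIZE and pktmbuf_pool are the symbols already
used above):

/*
 * Sketch of the poll-mode loop that calls vhost_enqueue_burst and
 * vhost_dequeue_burst for one device.
 */
static void
poll_one_device(struct virtio_net *dev, uint8_t port_id)
{
	struct rte_mbuf *pkts[PKT_BURST_SIZE];
	uint16_t nb_rx, nb_tx, sent, i;

	for (;;) {
		/* NIC -> guest: the enqueue copies the data into guest
		 * buffers, so every mbuf can be freed afterwards. */
		nb_rx = rte_eth_rx_burst(port_id, 0, pkts, PKT_BURST_SIZE);
		if (nb_rx) {
			vhost_enqueue_burst(dev, pkts, nb_rx);
			for (i = 0; i < nb_rx; i++)
				rte_pktmbuf_free(pkts[i]);
		}

		/* guest -> NIC: the dequeue allocates fresh mbufs from
		 * pktmbuf_pool; free whatever the NIC does not accept. */
		nb_tx = vhost_dequeue_burst(dev, pkts, PKT_BURST_SIZE);
		if (nb_tx) {
			sent = rte_eth_tx_burst(port_id, 0, pkts, nb_tx);
			for (i = sent; i < nb_tx; i++)
				rte_pktmbuf_free(pkts[i]);
		}
	}
}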

>>
>>
>>
>>
>>
>> -- 
>> Regards,
>> Haifeng
> 
> .
> 

-- 
Regards,
Haifeng


