[dpdk-dev] [PATCH 09/11] net/vhostpci: add TX function

Zhiyong Yang zhiyong.yang at intel.com
Thu Nov 30 10:46:55 CET 2017


add the functions to support transmitting packets.

Signed-off-by: Zhiyong Yang <zhiyong.yang at intel.com>
---
 drivers/net/vhostpci/vhostpci_ethdev.c | 352 +++++++++++++++++++++++++++++++++
 1 file changed, 352 insertions(+)

diff --git a/drivers/net/vhostpci/vhostpci_ethdev.c b/drivers/net/vhostpci/vhostpci_ethdev.c
index 06e3f5c50..f233d85a8 100644
--- a/drivers/net/vhostpci/vhostpci_ethdev.c
+++ b/drivers/net/vhostpci/vhostpci_ethdev.c
@@ -53,6 +53,12 @@
 #define VHOSTPCI_MAX_PKT_BURST 32
 #define VHOSTPCI_BUF_VECTOR_MAX 256
 
+/* avoid write operation when necessary, to lessen cache issues */
+#define ASSIGN_UNLESS_EQUAL(var, val) do {	\
+	if ((var) != (val))			\
+		(var) = (val);			\
+} while (0)
+
 static void
 vhostpci_dev_info_get(struct rte_eth_dev *dev,
 		struct rte_eth_dev_info *dev_info);
@@ -100,6 +106,10 @@ static uint16_t
 vhostpci_dequeue_burst(struct vhostpci_net *dev, uint16_t queue_id,
 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count);
 
+static uint16_t
+vhostpci_dequeue_burst(struct vhostpci_net *dev, uint16_t queue_id,
+	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count);
+
 static int
 vhostpci_dev_start(struct rte_eth_dev *dev);
 
@@ -321,6 +331,346 @@ vhostpci_dev_tx_queue_setup(struct rte_eth_dev *dev, uint16_t tx_queue_id,
 	return 0;
 }
 
+
+static inline void
+do_data_copy_enqueue(struct vhostpci_virtqueue *vq)
+{
+	struct batch_copy_elem *elem = vq->batch_copy_elems;
+	uint16_t count = vq->batch_copy_nb_elems;
+	int i;
+
+	for (i = 0; i < count; i++)
+		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
+}
+
+static __rte_always_inline int
+copy_mbuf_to_desc_mergeable(struct vhostpci_net *dev,
+		struct vhostpci_virtqueue *vq, struct rte_mbuf *m,
+		struct buf_vector *buf_vec, uint16_t num_buffers)
+{
+	uint32_t vec_idx = 0;
+	uint64_t desc_addr;
+	uint32_t mbuf_offset, mbuf_avail;
+	uint32_t desc_offset, desc_avail;
+	uint32_t cpy_len;
+	uint64_t hdr_addr;
+	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
+	uint16_t copy_nb = vq->batch_copy_nb_elems;
+	int error = 0;
+
+	if (unlikely(m == NULL)) {
+		error = -1;
+		goto out;
+	}
+
+	desc_addr = remote_gpa_to_vva(dev, buf_vec[vec_idx].buf_addr);
+
+	if (buf_vec[vec_idx].buf_len < dev->vhost_hlen || !desc_addr) {
+		error = -1;
+		goto out;
+	}
+
+	hdr_addr = desc_addr;
+	rte_prefetch0((void *)(uintptr_t)hdr_addr);
+
+	desc_avail  = buf_vec[vec_idx].buf_len - dev->vhost_hlen;
+	desc_offset = dev->vhost_hlen;
+
+	mbuf_avail  = rte_pktmbuf_data_len(m);
+	mbuf_offset = 0;
+	while (mbuf_avail != 0 || m->next != NULL) {
+		/* done with current desc buf, get the next one */
+		if (desc_avail == 0) {
+			vec_idx++;
+			desc_addr = remote_gpa_to_vva(dev,
+					buf_vec[vec_idx].buf_addr);
+
+			if (unlikely(!desc_addr)) {
+				error = -1;
+				goto out;
+			}
+
+			/* Prefetch buffer address. */
+			rte_prefetch0((void *)(uintptr_t)desc_addr);
+			desc_offset = 0;
+			desc_avail  = buf_vec[vec_idx].buf_len;
+		}
+
+		/* done with current mbuf, get the next one */
+		if (mbuf_avail == 0) {
+			m = m->next;
+			mbuf_offset = 0;
+			mbuf_avail  = rte_pktmbuf_data_len(m);
+		}
+
+		if (hdr_addr) {
+			struct virtio_net_hdr_mrg_rxbuf *hdr;
+
+			hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)
+				hdr_addr;
+			ASSIGN_UNLESS_EQUAL(hdr->num_buffers, num_buffers);
+			hdr_addr = 0;
+		}
+
+		cpy_len = RTE_MIN(desc_avail, mbuf_avail);
+
+		if (likely(cpy_len > MAX_BATCH_LEN || copy_nb >= vq->size)) {
+			rte_memcpy((void *)((uintptr_t)(desc_addr +
+							desc_offset)),
+				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
+				cpy_len);
+
+		} else {
+			batch_copy[copy_nb].dst =
+				(void *)((uintptr_t)(desc_addr + desc_offset));
+			batch_copy[copy_nb].src =
+				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
+			batch_copy[copy_nb].len = cpy_len;
+			copy_nb++;
+		}
+
+		mbuf_avail  -= cpy_len;
+		mbuf_offset += cpy_len;
+		desc_avail  -= cpy_len;
+		desc_offset += cpy_len;
+	}
+
+out:
+	vq->batch_copy_nb_elems = copy_nb;
+
+	return error;
+}
+
+static __rte_always_inline int
+fill_vec_buf(struct vhostpci_virtqueue *vq,
+		uint32_t avail_idx, uint32_t *vec_idx,
+		struct buf_vector *buf_vec, uint16_t *desc_chain_head,
+		uint16_t *desc_chain_len)
+{
+	uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
+	uint32_t vec_id = *vec_idx;
+	uint32_t len = 0;
+	struct vring_desc *descs = vq->desc;
+
+	*desc_chain_head = idx;
+
+	while (1) {
+		if (unlikely(vec_id >= VHOSTPCI_BUF_VECTOR_MAX ||
+			idx >= vq->size))
+			return -1;
+
+		len += descs[idx].len;
+		buf_vec[vec_id].buf_addr = descs[idx].addr;
+		buf_vec[vec_id].buf_len  = descs[idx].len;
+		buf_vec[vec_id].desc_idx = idx;
+		vec_id++;
+
+		if ((descs[idx].flags & VRING_DESC_F_NEXT) == 0)
+			break;
+
+		idx = descs[idx].next;
+	}
+
+	*desc_chain_len = len;
+	*vec_idx = vec_id;
+
+	return 0;
+}
+
+static __rte_always_inline void
+update_shadow_used_ring(struct vhostpci_virtqueue *vq, uint16_t desc_idx,
+			uint16_t len)
+{
+	uint16_t i = vq->shadow_used_idx++;
+
+	vq->shadow_used_ring[i].id  = desc_idx;
+	vq->shadow_used_ring[i].len = len;
+}
+
+static inline int
+reserve_avail_buf_mergeable(struct vhostpci_virtqueue *vq,
+			    uint32_t size, struct buf_vector *buf_vec,
+			    uint16_t *num_buffers, uint16_t avail_head)
+{
+	uint16_t cur_idx;
+	uint32_t vec_idx = 0;
+	uint16_t tries = 0;
+
+	uint16_t head_idx = 0;
+	uint16_t len = 0;
+
+	*num_buffers = 0;
+	cur_idx  = vq->last_avail_idx;
+
+	while (size > 0) {
+		if (unlikely(cur_idx == avail_head))
+			return -1;
+
+		if (unlikely(fill_vec_buf(vq, cur_idx, &vec_idx, buf_vec,
+				&head_idx, &len) < 0))
+			return -1;
+		len = RTE_MIN(len, size);
+		update_shadow_used_ring(vq, head_idx, len);
+		size -= len;
+
+		cur_idx++;
+		tries++;
+		*num_buffers += 1;
+
+		/**
+		 * if we tried all available ring items, and still
+		 * can't get enough buf, it means something abnormal
+		 * happened.
+		 */
+		if (unlikely(tries >= vq->size))
+			return -1;
+	}
+
+	return 0;
+}
+
+static __rte_always_inline void
+do_flush_shadow_used_ring(struct vhostpci_virtqueue *vq, uint16_t to,
+		uint16_t from, uint16_t size)
+{
+	rte_memcpy(&vq->used->ring[to], &vq->shadow_used_ring[from],
+			size * sizeof(struct vring_used_elem));
+}
+
+static __rte_always_inline void
+flush_shadow_used_ring(struct vhostpci_virtqueue *vq)
+{
+	uint16_t used_idx = vq->last_used_idx & (vq->size - 1);
+
+	if (used_idx + vq->shadow_used_idx <= vq->size) {
+		do_flush_shadow_used_ring(vq, used_idx, 0,
+					  vq->shadow_used_idx);
+	} else {
+		uint16_t size;
+
+		/* update used ring interval [used_idx, vq->size] */
+		size = vq->size - used_idx;
+		do_flush_shadow_used_ring(vq, used_idx, 0, size);
+
+		/* update the left half used ring interval [0, left_size] */
+		do_flush_shadow_used_ring(vq, 0, size,
+					  vq->shadow_used_idx - size);
+	}
+	vq->last_used_idx += vq->shadow_used_idx;
+
+	rte_smp_wmb();
+
+	*(volatile uint16_t *)&vq->used->idx += vq->shadow_used_idx;
+}
+
+static __rte_always_inline uint32_t
+vhostpci_dev_merge_rx(struct vhostpci_net *dev, uint16_t queue_id,
+		struct rte_mbuf **pkts, uint32_t count)
+{
+	struct vhostpci_virtqueue *vq;
+	uint32_t pkt_idx = 0;
+	uint16_t num_buffers;
+	struct buf_vector buf_vec[VHOSTPCI_BUF_VECTOR_MAX];
+	uint16_t avail_head;
+
+	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring)))
+		return 0;
+
+	vq = dev->virtqueue[queue_id];
+	if (unlikely(vq->enabled == 0))
+		return 0;
+
+	count = RTE_MIN((uint32_t)VHOSTPCI_MAX_PKT_BURST, count);
+	if (count == 0)
+		return 0;
+
+	vq->batch_copy_nb_elems = 0;
+
+	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
+
+	vq->shadow_used_idx = 0;
+	avail_head = *((volatile uint16_t *)&vq->avail->idx);
+	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
+		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
+
+		if (unlikely(reserve_avail_buf_mergeable(vq, pkt_len, buf_vec,
+			&num_buffers, avail_head) < 0)) {
+			vq->shadow_used_idx -= num_buffers;
+			break;
+		}
+
+		if (copy_mbuf_to_desc_mergeable(dev, vq, pkts[pkt_idx],
+						buf_vec, num_buffers) < 0) {
+			vq->shadow_used_idx -= num_buffers;
+			break;
+		}
+
+		vq->last_avail_idx += num_buffers;
+	}
+
+	do_data_copy_enqueue(vq);
+
+	if (likely(vq->shadow_used_idx)) {
+		flush_shadow_used_ring(vq);
+
+		/* flush used->idx update before we read avail->flags. */
+		rte_mb();
+	}
+
+	return pkt_idx;
+}
+
+static uint16_t
+vhostpci_enqueue_burst(struct vhostpci_net *vpnet, uint16_t queue_id,
+		       struct rte_mbuf **pkts, uint16_t count)
+{
+	return vhostpci_dev_merge_rx(vpnet, queue_id, pkts, count);
+}
+
+static uint16_t
+eth_vhostpci_tx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
+{
+	struct vhostpci_queue *r = q;
+	uint16_t i, nb_tx = 0;
+	uint16_t nb_send = nb_bufs;
+
+	if (unlikely(rte_atomic32_read(&r->allow_queuing) == 0))
+		return 0;
+
+	rte_atomic32_set(&r->while_queuing, 1);
+
+	if (unlikely(rte_atomic32_read(&r->allow_queuing) == 0))
+		goto out;
+
+	/* Enqueue packets to RX queue in the other guest*/
+	while (nb_send) {
+		uint16_t nb_pkts;
+		uint16_t num = (uint16_t)RTE_MIN(nb_send,
+						 VHOSTPCI_MAX_PKT_BURST);
+
+		nb_pkts = vhostpci_enqueue_burst(r->vpnet, r->virtqueue_id,
+						&bufs[nb_tx], num);
+
+		nb_tx += nb_pkts;
+		nb_send -= nb_pkts;
+		if (nb_pkts < num)
+			break;
+	}
+
+	r->stats.pkts += nb_tx;
+	r->stats.missed_pkts += nb_bufs - nb_tx;
+
+	for (i = 0; likely(i < nb_tx); i++)
+		r->stats.bytes += bufs[i]->pkt_len;
+
+	for (i = 0; likely(i < nb_tx); i++)
+		rte_pktmbuf_free(bufs[i]);
+out:
+	rte_atomic32_set(&r->while_queuing, 0);
+
+	return nb_tx;
+}
+
 static __rte_always_inline void
 update_used_ring(struct vhostpci_virtqueue *vq,
 		 uint32_t used_idx, uint32_t desc_idx)
@@ -1027,6 +1377,8 @@ eth_vhostpci_dev_init(struct rte_eth_dev *eth_dev)
 			vhostpci_interrupt_handler, eth_dev);
 
 	eth_dev->rx_pkt_burst = &eth_vhostpci_rx;
+	eth_dev->tx_pkt_burst = &eth_vhostpci_tx;
+
 	return 0;
 }
 
-- 
2.13.3



More information about the dev mailing list