[dpdk-dev] [PATCH 3/4] net/vhost: leverage DMA engines to accelerate Tx operations

Liu, Yong yong.liu at intel.com
Tue Mar 17 08:21:00 CET 2020


Hi Jiayu,
Some comments are inline.

Thanks,
Marvin

> -----Original Message-----
> From: dev <dev-bounces at dpdk.org> On Behalf Of Jiayu Hu
> Sent: Tuesday, March 17, 2020 5:21 PM
> To: dev at dpdk.org
> Cc: maxime.coquelin at redhat.com; Ye, Xiaolong <xiaolong.ye at intel.com>;
> Wang, Zhihong <zhihong.wang at intel.com>; Hu, Jiayu <jiayu.hu at intel.com>
> Subject: [dpdk-dev] [PATCH 3/4] net/vhost: leverage DMA engines to
> accelerate Tx operations
> 
> This patch accelerates large data movement in Tx operations via DMA
> engines, like I/OAT, the DMA engine in Intel's processors.
> 
> Large copies are offloaded from the CPU to the DMA engine in an
> asynchronous manner. The CPU just submits copy jobs to the DMA engine
> without waiting for DMA copy completion; there is no CPU intervention
> during DMA data transfer. By overlapping CPU computation and DMA copy,
> we can save precious CPU cycles and improve the overall throughput for
> vhost-user PMD based applications, like OVS. Due to startup overheads
> associated with DMA engines, small copies are performed by the CPU.
> 
> Note that the vhost-user PMD can support various DMA engines, but it
> currently supports only I/OAT devices. In addition, I/OAT acceleration
> is only enabled for split rings.
> 
> DMA devices used by queues are assigned by users; for a queue without
> an assigned DMA device, the PMD leverages librte_vhost to perform
> Tx operations. A queue can only be assigned one I/OAT device, and
> an I/OAT device can only be used by one queue.
> 
> We introduce a new vdev parameter to enable DMA acceleration for Tx
> operations of queues:
>  - dmas: This parameter is used to specify the assigned DMA device of
>    a queue.
> Here is an example:
>  $ ./testpmd -c f -n 4 \
> 	 --vdev 'net_vhost0,iface=/tmp/s0,queues=1,dmas=[txq0@00:04.0]'
> 
> Signed-off-by: Jiayu Hu <jiayu.hu at intel.com>
> ---
>  drivers/net/vhost/Makefile        |   2 +-
>  drivers/net/vhost/internal.h      |  19 +
>  drivers/net/vhost/meson.build     |   2 +-
>  drivers/net/vhost/rte_eth_vhost.c | 252 ++++++++++++-
>  drivers/net/vhost/virtio_net.c    | 742
> ++++++++++++++++++++++++++++++++++++++
>  drivers/net/vhost/virtio_net.h    | 120 ++++++
>  6 files changed, 1120 insertions(+), 17 deletions(-)
> 
> diff --git a/drivers/net/vhost/Makefile b/drivers/net/vhost/Makefile
> index 19cae52..87dfb14 100644
> --- a/drivers/net/vhost/Makefile
> +++ b/drivers/net/vhost/Makefile
> @@ -11,7 +11,7 @@ LIB = librte_pmd_vhost.a
>  LDLIBS += -lpthread
>  LDLIBS += -lrte_eal -lrte_mbuf -lrte_mempool -lrte_ring
>  LDLIBS += -lrte_ethdev -lrte_net -lrte_kvargs -lrte_vhost
> -LDLIBS += -lrte_bus_vdev
> +LDLIBS += -lrte_bus_vdev -lrte_rawdev_ioat
> 
>  CFLAGS += -O3
>  CFLAGS += $(WERROR_FLAGS)
> diff --git a/drivers/net/vhost/internal.h b/drivers/net/vhost/internal.h
> index 7588fdf..f19ed7a 100644
> --- a/drivers/net/vhost/internal.h
> +++ b/drivers/net/vhost/internal.h
> @@ -20,6 +20,8 @@ extern int vhost_logtype;
>  #define VHOST_LOG(level, ...) \
>  	rte_log(RTE_LOG_ ## level, vhost_logtype, __VA_ARGS__)
> 
> +typedef int (*process_dma_done_fn)(void *dev, void *dma_vr);
> +
>  enum vhost_xstats_pkts {
>  	VHOST_UNDERSIZE_PKT = 0,
>  	VHOST_64_PKT,
> @@ -96,6 +98,11 @@ struct dma_vring {
>  	 * used by the DMA.
>  	 */
>  	phys_addr_t used_idx_hpa;
> +
> +	struct ring_index *indices;
> +	uint16_t max_indices;
> +
> +	process_dma_done_fn dma_done_fn;
>  };
> 
>  struct vhost_queue {
> @@ -110,6 +117,13 @@ struct vhost_queue {
>  	struct dma_vring *dma_vring;
>  };
> 
> +struct dma_info {
> +	process_dma_done_fn dma_done_fn;
> +	struct rte_pci_addr addr;
> +	uint16_t dev_id;
> +	bool is_valid;
> +};
> +
>  struct pmd_internal {
>  	rte_atomic32_t dev_attached;
>  	char *iface_name;
> @@ -132,6 +146,11 @@ struct pmd_internal {
>  	/* negotiated features */
>  	uint64_t features;
>  	size_t hdr_len;
> +	bool vring_setup_done;
> +	bool guest_mem_populated;
> +
> +	/* User-assigned DMA information */
> +	struct dma_info dmas[RTE_MAX_QUEUES_PER_PORT * 2];
>  };
> 
>  #ifdef __cplusplus
> diff --git a/drivers/net/vhost/meson.build b/drivers/net/vhost/meson.build
> index b308dcb..af3c640 100644
> --- a/drivers/net/vhost/meson.build
> +++ b/drivers/net/vhost/meson.build
> @@ -6,4 +6,4 @@ reason = 'missing dependency, DPDK vhost library'
>  sources = files('rte_eth_vhost.c',
>  		'virtio_net.c')
>  install_headers('rte_eth_vhost.h')
> -deps += 'vhost'
> +deps += ['vhost', 'rawdev']
> diff --git a/drivers/net/vhost/rte_eth_vhost.c
> b/drivers/net/vhost/rte_eth_vhost.c
> index b5c927c..9faaa02 100644
> --- a/drivers/net/vhost/rte_eth_vhost.c
> +++ b/drivers/net/vhost/rte_eth_vhost.c
> @@ -15,8 +15,12 @@
>  #include <rte_kvargs.h>
>  #include <rte_vhost.h>
>  #include <rte_spinlock.h>
> +#include <rte_string_fns.h>
> +#include <rte_rawdev.h>
> +#include <rte_ioat_rawdev.h>
> 
>  #include "internal.h"
> +#include "virtio_net.h"
>  #include "rte_eth_vhost.h"
> 
>  int vhost_logtype;
> @@ -30,8 +34,12 @@ enum {VIRTIO_RXQ, VIRTIO_TXQ, VIRTIO_QNUM};
>  #define ETH_VHOST_IOMMU_SUPPORT		"iommu-support"
>  #define ETH_VHOST_POSTCOPY_SUPPORT	"postcopy-support"
>  #define ETH_VHOST_VIRTIO_NET_F_HOST_TSO "tso"
> +#define ETH_VHOST_DMA_ARG		"dmas"
>  #define VHOST_MAX_PKT_BURST 32
> 
> +/* ring size of I/OAT */
> +#define IOAT_RING_SIZE 1024
> +

Jiayu,
The configured I/OAT ring size is 1024 here, but I do not see any nr_inflight or nr_batching check against it in the enqueue function.
Is there any possibility that the I/OAT ring gets exhausted?
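For example, something along these lines before submitting a new copy job would avoid that (untested sketch; it assumes IOAT_RING_SIZE, or an equivalent limit, is visible to the enqueue path and reuses free_dma_done() from this patch):

	/* drain completions until the in-flight count is below the
	 * I/OAT ring size, so the following enqueue cannot exhaust it.
	 */
	while (unlikely(dma_vr->nr_inflight >= IOAT_RING_SIZE))
		free_dma_done(dev, dma_vr);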

>  static const char *valid_arguments[] = {
>  	ETH_VHOST_IFACE_ARG,
>  	ETH_VHOST_QUEUES_ARG,
> @@ -40,6 +48,7 @@ static const char *valid_arguments[] = {
>  	ETH_VHOST_IOMMU_SUPPORT,
>  	ETH_VHOST_POSTCOPY_SUPPORT,
>  	ETH_VHOST_VIRTIO_NET_F_HOST_TSO,
> +	ETH_VHOST_DMA_ARG,
>  	NULL
>  };
> 
> @@ -377,6 +386,7 @@ static uint16_t
>  eth_vhost_tx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
>  {
>  	struct vhost_queue *r = q;
> +	struct pmd_internal *dev = r->internal;
>  	uint16_t i, nb_tx = 0;
>  	uint16_t nb_send = 0;
> 
> @@ -405,18 +415,33 @@ eth_vhost_tx(void *q, struct rte_mbuf **bufs,
> uint16_t nb_bufs)
>  	}
> 
>  	/* Enqueue packets to guest RX queue */
> -	while (nb_send) {
> -		uint16_t nb_pkts;
> -		uint16_t num = (uint16_t)RTE_MIN(nb_send,
> -						 VHOST_MAX_PKT_BURST);
> -
> -		nb_pkts = rte_vhost_enqueue_burst(r->vid, r->virtqueue_id,
> -						  &bufs[nb_tx], num);
> -
> -		nb_tx += nb_pkts;
> -		nb_send -= nb_pkts;
> -		if (nb_pkts < num)
> -			break;
> +	if (!r->dma_vring->dma_enabled) {
> +		while (nb_send) {
> +			uint16_t nb_pkts;
> +			uint16_t num = (uint16_t)RTE_MIN(nb_send,
> +					VHOST_MAX_PKT_BURST);
> +
> +			nb_pkts = rte_vhost_enqueue_burst(r->vid,
> +							  r->virtqueue_id,
> +							  &bufs[nb_tx], num);
> +			nb_tx += nb_pkts;
> +			nb_send -= nb_pkts;
> +			if (nb_pkts < num)
> +				break;
> +		}
> +	} else {
> +		while (nb_send) {
> +			uint16_t nb_pkts;
> +			uint16_t num = (uint16_t)RTE_MIN(nb_send,
> +					VHOST_MAX_PKT_BURST);
> +
> +			nb_pkts = vhost_dma_enqueue_burst(dev, r->dma_vring,
> +							  &bufs[nb_tx], num);
> +			nb_tx += nb_pkts;
> +			nb_send -= nb_pkts;
> +			if (nb_pkts < num)
> +				break;
> +		}
>  	}
> 
>  	r->stats.pkts += nb_tx;
> @@ -434,6 +459,7 @@ eth_vhost_tx(void *q, struct rte_mbuf **bufs,
> uint16_t nb_bufs)
>  	for (i = nb_tx; i < nb_bufs; i++)
>  		vhost_count_multicast_broadcast(r, bufs[i]);
> 
> +	/* Only DMA non-occupied mbuf segments will be freed */
>  	for (i = 0; likely(i < nb_tx); i++)
>  		rte_pktmbuf_free(bufs[i]);
>  out:
> @@ -483,6 +509,12 @@ eth_rxq_intr_enable(struct rte_eth_dev *dev,
> uint16_t qid)
>  		return -1;
>  	}
> 
> +	if (vq->dma_vring->dma_enabled) {
> +		VHOST_LOG(INFO, "Don't support interrupt when DMA "
> +			  "acceleration is enabled\n");
> +		return -1;
> +	}
> +
>  	ret = rte_vhost_get_vhost_vring(vq->vid, (qid << 1) + 1, &vring);
>  	if (ret < 0) {
>  		VHOST_LOG(ERR, "Failed to get rxq%d's vring\n", qid);
> @@ -508,6 +540,12 @@ eth_rxq_intr_disable(struct rte_eth_dev *dev,
> uint16_t qid)
>  		return -1;
>  	}
> 
> +	if (vq->dma_vring->dma_enabled) {
> +		VHOST_LOG(INFO, "Don't support interrupt when DMA "
> +			  "acceleration is enabled\n");
> +		return -1;
> +	}
> +
>  	ret = rte_vhost_get_vhost_vring(vq->vid, (qid << 1) + 1, &vring);
>  	if (ret < 0) {
>  		VHOST_LOG(ERR, "Failed to get rxq%d's vring", qid);
> @@ -692,6 +730,13 @@ new_device(int vid)
>  #endif
> 
>  	internal->vid = vid;
> +	if (internal->guest_mem_populated && vhost_dma_setup(internal) >= 0)
> +		internal->vring_setup_done = true;
> +	else {
> +		VHOST_LOG(INFO, "Not setup vrings for DMA acceleration.\n");
> +		internal->vring_setup_done = false;
> +	}
> +
>  	if (rte_atomic32_read(&internal->started) == 1) {
>  		queue_setup(eth_dev, internal);
> 
> @@ -747,6 +792,11 @@ destroy_device(int vid)
>  	update_queuing_status(eth_dev);
> 
>  	eth_dev->data->dev_link.link_status = ETH_LINK_DOWN;
> +	/**
> +	 * before destroy guest's vrings, I/O threads have
> +	 * to stop accessing queues.
> +	 */
> +	vhost_dma_remove(internal);
> 
>  	if (eth_dev->data->rx_queues && eth_dev->data->tx_queues) {
>  		for (i = 0; i < eth_dev->data->nb_rx_queues; i++) {
> @@ -785,6 +835,11 @@ vring_state_changed(int vid, uint16_t vring, int
> enable)
>  	struct rte_eth_dev *eth_dev;
>  	struct internal_list *list;
>  	char ifname[PATH_MAX];
> +	struct pmd_internal *dev;
> +	struct dma_vring *dma_vr;
> +	struct rte_ioat_rawdev_config config;
> +	struct rte_rawdev_info info = { .dev_private = &config };
> +	char name[32];
> 
>  	rte_vhost_get_ifname(vid, ifname, sizeof(ifname));
>  	list = find_internal_resource(ifname);
> @@ -794,6 +849,53 @@ vring_state_changed(int vid, uint16_t vring, int
> enable)
>  	}
> 
>  	eth_dev = list->eth_dev;
> +	dev = eth_dev->data->dev_private;
> +
> +	/* if fail to set up vrings, return. */
> +	if (!dev->vring_setup_done)
> +		goto out;
> +
> +	/* DMA acceleration just supports split rings. */
> +	if (vhost_dma_vring_is_packed(dev)) {
> +		VHOST_LOG(INFO, "DMA acceleration just supports split "
> +			  "rings.\n");
> +		goto out;
> +	}
> +
> +	/* if the vring was not given a DMA device, return. */
> +	if (!dev->dmas[vring].is_valid)
> +		goto out;
> +
> +	/**
> +	 * a vring can only use one DMA device. If it has been
> +	 * assigned one, return.
> +	 */
> +	dma_vr = &dev->dma_vrings[vring];
> +	if (dma_vr->dma_enabled)
> +		goto out;
> +
> +	rte_pci_device_name(&dev->dmas[vring].addr, name, sizeof(name));
> +	rte_rawdev_info_get(dev->dmas[vring].dev_id, &info);
> +	config.ring_size = IOAT_RING_SIZE;
> +	if (rte_rawdev_configure(dev->dmas[vring].dev_id, &info) < 0) {
> +		VHOST_LOG(ERR, "Config the DMA device %s failed\n", name);
> +		goto out;
> +	}
> +
> +	rte_rawdev_start(dev->dmas[vring].dev_id);
> +
> +	memcpy(&dma_vr->dma_addr, &dev->dmas[vring].addr,
> +	       sizeof(struct rte_pci_addr));
> +	dma_vr->dev_id = dev->dmas[vring].dev_id;
> +	dma_vr->dma_enabled = true;
> +	dma_vr->nr_inflight = 0;
> +	dma_vr->nr_batching = 0;
> +	dma_vr->dma_done_fn = dev->dmas[vring].dma_done_fn;
> +
> +	VHOST_LOG(INFO, "Attach the DMA %s to vring %u of port %u\n",
> +		  name, vring, eth_dev->data->port_id);
> +
> +out:
>  	/* won't be NULL */
>  	state = vring_states[eth_dev->data->port_id];
>  	rte_spinlock_lock(&state->lock);
> @@ -1239,7 +1341,7 @@ static const struct eth_dev_ops ops = {
>  static int
>  eth_dev_vhost_create(struct rte_vdev_device *dev, char *iface_name,
>  	int16_t queues, const unsigned int numa_node, uint64_t flags,
> -	uint64_t disable_flags)
> +	uint64_t disable_flags, struct dma_info *dmas)
>  {
>  	const char *name = rte_vdev_device_name(dev);
>  	struct rte_eth_dev_data *data;
> @@ -1290,6 +1392,13 @@ eth_dev_vhost_create(struct rte_vdev_device
> *dev, char *iface_name,
>  	eth_dev->rx_pkt_burst = eth_vhost_rx;
>  	eth_dev->tx_pkt_burst = eth_vhost_tx;
> 
> +	memcpy(internal->dmas, dmas, sizeof(struct dma_info) * 2 *
> +	       RTE_MAX_QUEUES_PER_PORT);
> +	if (flags & RTE_VHOST_USER_DMA_COPY)
> +		internal->guest_mem_populated = true;
> +	else
> +		internal->guest_mem_populated = false;
> +
>  	rte_eth_dev_probing_finish(eth_dev);
>  	return 0;
> 
> @@ -1329,6 +1438,100 @@ open_int(const char *key __rte_unused, const
> char *value, void *extra_args)
>  	return 0;
>  }
> 
> +struct dma_info_input {
> +	struct dma_info dmas[RTE_MAX_QUEUES_PER_PORT * 2];
> +	uint16_t nr;
> +};
> +
> +static inline int
> +open_dma(const char *key __rte_unused, const char *value, void *extra_args)
> +{
> +	struct dma_info_input *dma_info = extra_args;
> +	char *input = strndup(value, strlen(value) + 1);
> +	char *addrs = input;
> +	char *ptrs[2];
> +	char *start, *end, *substr;
> +	int64_t qid, vring_id;
> +	struct rte_ioat_rawdev_config config;
> +	struct rte_rawdev_info info = { .dev_private = &config };
> +	char name[32];
> +	int dev_id;
> +	int ret = 0;
> +
> +	while (isblank(*addrs))
> +		addrs++;
> +	if (addrs == '\0') {
> +		VHOST_LOG(ERR, "No input DMA addresses\n");
> +		ret = -1;
> +		goto out;
> +	}
> +
> +	/* process DMA devices within bracket. */
> +	addrs++;
> +	substr = strtok(addrs, ";]");
> +	if (!substr) {
> +		VHOST_LOG(ERR, "No input DMA addresse\n");
> +		ret = -1;
> +		goto out;
> +	}
> +
> +	do {
> +		rte_strsplit(substr, strlen(substr), ptrs, 2, '@');
> +
rte_strsplit() can fail; its return value needs to be checked.
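A minimal check like this should do (untested sketch; it just enforces the "txqX@addr" form before ptrs[] is used):

		if (rte_strsplit(substr, strlen(substr), ptrs, 2, '@') != 2) {
			VHOST_LOG(ERR, "Invalid DMA argument format: %s\n",
				  value);
			ret = -1;
			goto out;
		}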

> +		start = strstr(ptrs[0], "txq");
> +		if (start == NULL) {
> +			VHOST_LOG(ERR, "Illegal queue\n");
> +			ret = -1;
> +			goto out;
> +		}
> +
> +		start += 3;

It's better not to use a hardcoded value here.
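E.g. something like the below (sketch only; the macro name is just an illustration):

#define VHOST_DMA_TXQ_PREFIX "txq"

		start = strstr(ptrs[0], VHOST_DMA_TXQ_PREFIX);
		if (start == NULL) {
			VHOST_LOG(ERR, "Illegal queue\n");
			ret = -1;
			goto out;
		}
		start += strlen(VHOST_DMA_TXQ_PREFIX);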

> +		qid = strtol(start, &end, 0);
> +		if (end == start) {
> +			VHOST_LOG(ERR, "No input queue ID\n");
> +			ret = -1;
> +			goto out;
> +		}
> +
> +		vring_id = qid * 2 + VIRTIO_RXQ;
> +		if (rte_pci_addr_parse(ptrs[1],
> +				       &dma_info->dmas[vring_id].addr) < 0) {
> +			VHOST_LOG(ERR, "Invalid DMA address %s\n", ptrs[1]);
> +			ret = -1;
> +			goto out;
> +		}
> +
> +		rte_pci_device_name(&dma_info->dmas[vring_id].addr,
> +				    name, sizeof(name));
> +		dev_id = rte_rawdev_get_dev_id(name);
> +		if (dev_id == (uint16_t)(-ENODEV) ||
> +		    dev_id == (uint16_t)(-EINVAL)) {
> +			VHOST_LOG(ERR, "Cannot find device %s.\n", name);
> +			ret = -1;
> +			goto out;
> +		}
> +
Multiple queues can't share one I/OAT device; a check for that should be added here, since sharing is not allowed.
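For instance (untested sketch placed after dev_id has been resolved; 'i' would need a declaration at the top of the function):

		for (i = 0; i < RTE_MAX_QUEUES_PER_PORT * 2; i++) {
			if (dma_info->dmas[i].is_valid &&
			    dma_info->dmas[i].dev_id == dev_id) {
				VHOST_LOG(ERR, "I/OAT device %s is already "
					  "assigned to another queue\n", name);
				ret = -1;
				goto out;
			}
		}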

> +		if (rte_rawdev_info_get(dev_id, &info) < 0 ||
> +		    strstr(info.driver_name, "ioat") == NULL) {
> +			VHOST_LOG(ERR, "The input device %s is invalid or "
> +				  "it is not an I/OAT device\n", name);
> +			ret = -1;
> +			goto out;
> +		}
> +
> +		dma_info->dmas[vring_id].dev_id = dev_id;
> +		dma_info->dmas[vring_id].is_valid = true;
> +		dma_info->dmas[vring_id].dma_done_fn = free_dma_done;
> +		dma_info->nr++;
> +
> +		substr = strtok(NULL, ";]");
> +	} while (substr);
> +
> +out:
> +	free(input);
> +	return ret;
> +}
> +
>  static int
>  rte_pmd_vhost_probe(struct rte_vdev_device *dev)
>  {
> @@ -1345,6 +1548,7 @@ rte_pmd_vhost_probe(struct rte_vdev_device
> *dev)
>  	int tso = 0;
>  	struct rte_eth_dev *eth_dev;
>  	const char *name = rte_vdev_device_name(dev);
> +	struct dma_info_input dma_info = {0};
> 
>  	VHOST_LOG(INFO, "Initializing pmd_vhost for %s\n", name);
> 
> @@ -1440,11 +1644,28 @@ rte_pmd_vhost_probe(struct rte_vdev_device
> *dev)
>  		}
>  	}
> 
> +	if (rte_kvargs_count(kvlist, ETH_VHOST_DMA_ARG) == 1) {
> +		ret = rte_kvargs_process(kvlist, ETH_VHOST_DMA_ARG,
> +					 &open_dma, &dma_info);
> +		if (ret < 0)
> +			goto out_free;
> +
> +		if (dma_info.nr > 0) {
> +			flags |= RTE_VHOST_USER_DMA_COPY;
> +			/**
> +			 * don't support live migration when enable
> +			 * DMA acceleration.
> +			 */
> +			disable_flags |= (1ULL << VHOST_F_LOG_ALL);
> +		}
> +	}
> +
>  	if (dev->device.numa_node == SOCKET_ID_ANY)
>  		dev->device.numa_node = rte_socket_id();
> 
>  	ret = eth_dev_vhost_create(dev, iface_name, queues,
> -				   dev->device.numa_node, flags, disable_flags);
> +				   dev->device.numa_node, flags,
> +				   disable_flags, dma_info.dmas);
>  	if (ret == -1)
>  		VHOST_LOG(ERR, "Failed to create %s\n", name);
> 
> @@ -1491,7 +1712,8 @@ RTE_PMD_REGISTER_PARAM_STRING(net_vhost,
>  	"dequeue-zero-copy=<0|1> "
>  	"iommu-support=<0|1> "
>  	"postcopy-support=<0|1> "
> -	"tso=<0|1>");
> +	"tso=<0|1> "
> +	"dmas=[txq0@addr0;txq1@addr1]");
> 
>  RTE_INIT(vhost_init_log)
>  {
> diff --git a/drivers/net/vhost/virtio_net.c b/drivers/net/vhost/virtio_net.c
> index 11591c0..e7ba5b3 100644
> --- a/drivers/net/vhost/virtio_net.c
> +++ b/drivers/net/vhost/virtio_net.c
> @@ -2,11 +2,735 @@
>  #include <stdbool.h>
>  #include <linux/virtio_net.h>
> 
> +#include <rte_ethdev.h>
> +#include <rte_mbuf.h>
>  #include <rte_malloc.h>
> +#include <rte_memcpy.h>
> +#include <rte_ip.h>
> +#include <rte_tcp.h>
> +#include <rte_udp.h>
> +#include <rte_sctp.h>
>  #include <rte_vhost.h>
> +#include <rte_rawdev.h>
> +#include <rte_ioat_rawdev.h>
> 
>  #include "virtio_net.h"
> 
> +#define BUF_VECTOR_MAX 256
> +#define MAX_BATCH_LEN 256
> +
> +struct buf_vector {
> +	uint64_t buf_iova;
> +	uint64_t buf_addr;
> +	uint32_t buf_len;
> +	uint32_t desc_idx;
> +};
> +
> +static __rte_always_inline int
> +vhost_need_event(uint16_t event_idx, uint16_t new_idx, uint16_t old)
> +{
> +	return (uint16_t)(new_idx - event_idx - 1) < (uint16_t)(new_idx - old);
> +}
> +
> +static __rte_always_inline void
> +vhost_vring_call_split(struct pmd_internal *dev, struct dma_vring *dma_vr)
> +{
> +	struct rte_vhost_vring *vr = &dma_vr->vr;
> +
> +	/* flush used->idx update before we read avail->flags. */
> +	rte_smp_mb();
> +
> +	if (dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX)) {
> +		uint16_t old = dma_vr->signalled_used;
> +		uint16_t new = dma_vr->copy_done_used;
> +		bool signalled_used_valid = dma_vr->signalled_used_valid;
> +
> +		dma_vr->signalled_used = new;
> +		dma_vr->signalled_used_valid = true;
> +
> +		VHOST_LOG(DEBUG, "%s: used_event_idx=%d, old=%d, new=%d\n",
> +			  __func__, vhost_used_event(vr), old, new);
> +
> +		if ((vhost_need_event(vhost_used_event(vr), new, old) &&
> +		     (vr->callfd >= 0)) || unlikely(!signalled_used_valid))
> +			eventfd_write(vr->callfd, (eventfd_t)1);
> +	} else {
> +		if (!(vr->avail->flags & VRING_AVAIL_F_NO_INTERRUPT) &&
> +		    (vr->callfd >= 0))
> +			eventfd_write(vr->callfd, (eventfd_t)1);
> +	}
> +}
> +
> +/* notify front-end of enqueued packets */
> +static __rte_always_inline void
> +vhost_dma_vring_call(struct pmd_internal *dev, struct dma_vring *dma_vr)
> +{
> +	vhost_vring_call_split(dev, dma_vr);
> +}
> +
> +int
> +free_dma_done(void *dev, void *dma_vr)
> +{
> +	uintptr_t flags[255], tmps[255];

Please add a meaningful macro for 255; it's not clear why the limit is 255 rather than 256.
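E.g. (sketch; the macro name is only an illustration):

/* max completions fetched per rte_ioat_completed_copies() call */
#define DMA_DONE_BATCH_SIZE 255

	uintptr_t flags[DMA_DONE_BATCH_SIZE], tmps[DMA_DONE_BATCH_SIZE];

	dma_done = rte_ioat_completed_copies(dma_vring->dev_id,
					     DMA_DONE_BATCH_SIZE, flags, tmps);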

> +	int dma_done, i;
> +	uint16_t used_idx;
> +	struct pmd_internal *device = dev;
> +	struct dma_vring *dma_vring = dma_vr;
> +
> +	dma_done = rte_ioat_completed_copies(dma_vring->dev_id, 255, flags,
> +					     tmps);
> +	if (unlikely(dma_done <= 0))
> +		return dma_done;
> +
> +	dma_vring->nr_inflight -= dma_done;

I am not sure whether the DMA engine returns completions in the same order as they were submitted; the mbuf free should happen after the used index update is done.

> +	for (i = 0; i < dma_done; i++) {
> +		if ((uint64_t)flags[i] >= dma_vring->max_indices) {
> +			struct rte_mbuf *pkt = (struct rte_mbuf *)flags[i];
> +
> +			/**
> +			 * the DMA completes a packet copy job, we
> +			 * decrease the refcnt or free the mbuf segment.
> +			 */
> +			rte_pktmbuf_free_seg(pkt);
> +		} else {
> +			uint16_t id = flags[i];
> +
> +			/**
> +			 * the DMA completes updating index of the
> +			 * used ring.
> +			 */
> +			used_idx = dma_vring->indices[id].data;
> +			VHOST_LOG(DEBUG, "The DMA finishes updating index %u "
> +				  "for the used ring.\n", used_idx);
> +
> +			dma_vring->copy_done_used = used_idx;
> +			vhost_dma_vring_call(device, dma_vring);
> +			put_used_index(dma_vring->indices,
> +				       dma_vring->max_indices, id);
> +		}
> +	}
> +	return dma_done;
> +}
> +
> +static  __rte_always_inline bool
> +rxvq_is_mergeable(struct pmd_internal *dev)
> +{
> +	return dev->features & (1ULL << VIRTIO_NET_F_MRG_RXBUF);
> +}
> +

I'm not sure whether the shadow used ring helps in the DMA acceleration scenario.
The vhost driver will wait until the DMA copy is done, so this optimization of the CPU moves may not improve overall performance and only adds convoluted code.

> +static __rte_always_inline void
> +do_flush_shadow_used_ring_split(struct dma_vring *dma_vr, uint16_t to,
> +				uint16_t from, uint16_t size)
> +{
> +	rte_memcpy(&dma_vr->vr.used->ring[to],
> +		   &dma_vr->shadow_used_split[from],
> +		   size * sizeof(struct vring_used_elem));
> +}
> +
> +static __rte_always_inline void
> +flush_shadow_used_ring_split(struct pmd_internal *dev,
> +			     struct dma_vring *dma_vr)
> +{
> +	uint16_t used_idx = dma_vr->last_used_idx & (dma_vr->vr.size - 1);
> +
> +	if (used_idx + dma_vr->shadow_used_idx <= dma_vr->vr.size) {
> +		do_flush_shadow_used_ring_split(dma_vr, used_idx, 0,
> +						dma_vr->shadow_used_idx);
> +	} else {
> +		uint16_t size;
> +
> +		/* update used ring interval [used_idx, vr->size] */
> +		size = dma_vr->vr.size - used_idx;
> +		do_flush_shadow_used_ring_split(dma_vr, used_idx, 0, size);
> +
> +		/* update the left half used ring interval [0, left_size] */
> +		do_flush_shadow_used_ring_split(dma_vr, 0, size,
> +						dma_vr->shadow_used_idx -
> +						size);
> +	}
> +	dma_vr->last_used_idx += dma_vr->shadow_used_idx;
> +
> +	rte_smp_wmb();
> +
> +	if (dma_vr->nr_inflight > 0) {
> +		struct ring_index *index;
> +
> +		index = get_empty_index(dma_vr->indices, dma_vr->max_indices);
> +		index->data = dma_vr->last_used_idx;
> +		while (unlikely(rte_ioat_enqueue_copy(dma_vr->dev_id,
> +						      index->pa,
> +						      dma_vr->used_idx_hpa,
> +						      sizeof(uint16_t),
> +						      index->idx, 0, 0) ==
> +				0)) {
> +			int ret;
> +
> +			do {
> +				ret = dma_vr->dma_done_fn(dev, dma_vr);
> +			} while (ret <= 0);
> +		}
> +		dma_vr->nr_batching++;
> +		dma_vr->nr_inflight++;
> +	} else {
> +		/**
> +		 * we update index of used ring when all previous copy
> +		 * jobs are completed.
> +		 *
> +		 * When enabling DMA copy, if there are outstanding copy
> +		 * jobs of the DMA, to avoid the DMA overwriting the
> +		 * write of the CPU, the DMA is in charge of updating
> +		 * the index of used ring.
> +		 */

According to the comment, this should be a DMA data move, but the following code is a CPU data move. Is anything wrong here?

> +		*(volatile uint16_t *)&dma_vr->vr.used->idx +=
> +			dma_vr->shadow_used_idx;
> +		dma_vr->copy_done_used += dma_vr->shadow_used_idx;
> +	}
> +
> +	dma_vr->shadow_used_idx = 0;
> +}
> +
> +static __rte_always_inline void
> +update_shadow_used_ring_split(struct dma_vring *dma_vr,
> +			      uint16_t desc_idx, uint32_t len)
> +{
> +	uint16_t i = dma_vr->shadow_used_idx++;
> +
> +	dma_vr->shadow_used_split[i].id  = desc_idx;
> +	dma_vr->shadow_used_split[i].len = len;
> +}
> +
> +static inline void
> +do_data_copy(struct dma_vring *dma_vr)
> +{
> +	struct batch_copy_elem *elem = dma_vr->batch_copy_elems;
> +	uint16_t count = dma_vr->batch_copy_nb_elems;
> +	int i;
> +
> +	for (i = 0; i < count; i++)
> +		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
> +
> +	dma_vr->batch_copy_nb_elems = 0;
> +}
> +
> +#define ASSIGN_UNLESS_EQUAL(var, val) do {	\
> +	if ((var) != (val))			\
> +		(var) = (val);			\
> +} while (0)
> +
> +static __rte_always_inline void
> +virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
> +{
> +	uint64_t csum_l4 = m_buf->ol_flags & PKT_TX_L4_MASK;
> +
> +	if (m_buf->ol_flags & PKT_TX_TCP_SEG)
> +		csum_l4 |= PKT_TX_TCP_CKSUM;
> +
> +	if (csum_l4) {
> +		net_hdr->flags = VIRTIO_NET_HDR_F_NEEDS_CSUM;
> +		net_hdr->csum_start = m_buf->l2_len + m_buf->l3_len;
> +
> +		switch (csum_l4) {
> +		case PKT_TX_TCP_CKSUM:
> +			net_hdr->csum_offset = (offsetof(struct rte_tcp_hdr,
> +						cksum));
> +			break;
> +		case PKT_TX_UDP_CKSUM:
> +			net_hdr->csum_offset = (offsetof(struct rte_udp_hdr,
> +						dgram_cksum));
> +			break;
> +		case PKT_TX_SCTP_CKSUM:
> +			net_hdr->csum_offset = (offsetof(struct rte_sctp_hdr,
> +						cksum));
> +			break;
> +		}
> +	} else {
> +		ASSIGN_UNLESS_EQUAL(net_hdr->csum_start, 0);
> +		ASSIGN_UNLESS_EQUAL(net_hdr->csum_offset, 0);
> +		ASSIGN_UNLESS_EQUAL(net_hdr->flags, 0);
> +	}
> +
> +	/* IP cksum verification cannot be bypassed, then calculate here */
> +	if (m_buf->ol_flags & PKT_TX_IP_CKSUM) {
> +		struct rte_ipv4_hdr *ipv4_hdr;
> +
> +		ipv4_hdr = rte_pktmbuf_mtod_offset(m_buf, struct rte_ipv4_hdr *,
> +						   m_buf->l2_len);
> +		ipv4_hdr->hdr_checksum = rte_ipv4_cksum(ipv4_hdr);
> +	}
> +
> +	if (m_buf->ol_flags & PKT_TX_TCP_SEG) {
> +		if (m_buf->ol_flags & PKT_TX_IPV4)
> +			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV4;
> +		else
> +			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV6;
> +		net_hdr->gso_size = m_buf->tso_segsz;
> +		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len
> +					+ m_buf->l4_len;
> +	} else if (m_buf->ol_flags & PKT_TX_UDP_SEG) {
> +		net_hdr->gso_type = VIRTIO_NET_HDR_GSO_UDP;
> +		net_hdr->gso_size = m_buf->tso_segsz;
> +		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len +
> +			m_buf->l4_len;
> +	} else {
> +		ASSIGN_UNLESS_EQUAL(net_hdr->gso_type, 0);
> +		ASSIGN_UNLESS_EQUAL(net_hdr->gso_size, 0);
> +		ASSIGN_UNLESS_EQUAL(net_hdr->hdr_len, 0);
> +	}
> +}
> +
> +static __rte_always_inline void *
> +vhost_alloc_copy_ind_table(struct pmd_internal *dev, uint64_t desc_addr,
> +			   uint64_t desc_len)
> +{
> +	void *idesc;
> +	uint64_t src, dst;
> +	uint64_t len, remain = desc_len;
> +
> +	idesc = rte_malloc(NULL, desc_len, 0);
> +	if (unlikely(!idesc))
> +		return NULL;
> +
> +	dst = (uint64_t)(uintptr_t)idesc;
> +
> +	while (remain) {
> +		len = remain;
> +		src = rte_vhost_va_from_guest_pa(dev->mem, desc_addr, &len);
> +		if (unlikely(!src || !len)) {
> +			rte_free(idesc);
> +			return NULL;
> +		}
> +
> +		rte_memcpy((void *)(uintptr_t)dst, (void *)(uintptr_t)src,
> +			   len);
> +
> +		remain -= len;
> +		dst += len;
> +		desc_addr += len;
> +	}
> +
> +	return idesc;
> +}
> +
> +static __rte_always_inline void
> +free_ind_table(void *idesc)
> +{
> +	rte_free(idesc);
> +}
> +
> +static __rte_always_inline int
> +map_one_desc(struct pmd_internal *dev, struct buf_vector *buf_vec,
> +	     uint16_t *vec_idx, uint64_t desc_iova, uint64_t desc_len)
> +{
> +	uint16_t vec_id = *vec_idx;
> +
> +	while (desc_len) {
> +		uint64_t desc_addr;
> +		uint64_t desc_chunck_len = desc_len;
> +
> +		if (unlikely(vec_id >= BUF_VECTOR_MAX))
> +			return -1;
> +
> +		desc_addr = rte_vhost_va_from_guest_pa(dev->mem, desc_iova,
> +						       &desc_chunck_len);
> +		if (unlikely(!desc_addr))
> +			return -1;
> +
> +		rte_prefetch0((void *)(uintptr_t)desc_addr);
> +
> +		buf_vec[vec_id].buf_iova = desc_iova;
> +		buf_vec[vec_id].buf_addr = desc_addr;
> +		buf_vec[vec_id].buf_len  = desc_chunck_len;
> +
> +		desc_len -= desc_chunck_len;
> +		desc_iova += desc_chunck_len;
> +		vec_id++;
> +	}
> +	*vec_idx = vec_id;
> +
> +	return 0;
> +}
> +
> +static __rte_always_inline int
> +fill_vec_buf_split(struct pmd_internal *dev, struct dma_vring *dma_vr,
> +		   uint32_t avail_idx, uint16_t *vec_idx,
> +		   struct buf_vector *buf_vec, uint16_t *desc_chain_head,
> +		   uint32_t *desc_chain_len)
> +{
> +	struct rte_vhost_vring *vr = &dma_vr->vr;
> +	uint16_t idx = vr->avail->ring[avail_idx & (vr->size - 1)];
> +	uint16_t vec_id = *vec_idx;
> +	uint32_t len    = 0;
> +	uint64_t dlen;
> +	uint32_t nr_descs = vr->size;
> +	uint32_t cnt    = 0;
> +	struct vring_desc *descs = vr->desc;
> +	struct vring_desc *idesc = NULL;
> +
> +	if (unlikely(idx >= vr->size))
> +		return -1;
> +
> +	*desc_chain_head = idx;
> +
> +	if (vr->desc[idx].flags & VRING_DESC_F_INDIRECT) {
> +		dlen = vr->desc[idx].len;
> +		nr_descs = dlen / sizeof(struct vring_desc);
> +		if (unlikely(nr_descs > vr->size))
> +			return -1;
> +
> +		descs = (struct vring_desc *)(uintptr_t)
> +			rte_vhost_va_from_guest_pa(dev->mem,
> +						   vr->desc[idx].addr, &dlen);
> +		if (unlikely(!descs))
> +			return -1;
> +
> +		if (unlikely(dlen < vr->desc[idx].len)) {
> +			/**
> +			 * the indirect desc table is not contiguous
> +			 * in process VA space, we have to copy it.
> +			 */
> +			idesc = vhost_alloc_copy_ind_table(dev,
> +							   vr->desc[idx].addr,
> +							   vr->desc[idx].len);
> +			if (unlikely(!idesc))
> +				return -1;
> +
> +			descs = idesc;
> +		}
> +
> +		idx = 0;
> +	}
> +
> +	while (1) {
> +		if (unlikely(idx >= nr_descs || cnt++ >= nr_descs)) {
> +			free_ind_table(idesc);
> +			return -1;
> +		}
> +
> +		len += descs[idx].len;
> +
> +		if (unlikely(map_one_desc(dev, buf_vec, &vec_id,
> +					  descs[idx].addr, descs[idx].len))) {
> +			free_ind_table(idesc);
> +			return -1;
> +		}
> +
> +		if ((descs[idx].flags & VRING_DESC_F_NEXT) == 0)
> +			break;
> +
> +		idx = descs[idx].next;
> +	}
> +
> +	*desc_chain_len = len;
> +	*vec_idx = vec_id;
> +
> +	if (unlikely(!!idesc))
> +		free_ind_table(idesc);
> +
> +	return 0;
> +}
> +
> +static inline int
> +reserve_avail_buf_split(struct pmd_internal *dev, struct dma_vring *dma_vr,
> +			uint32_t size, struct buf_vector *buf_vec,
> +			uint16_t *num_buffers, uint16_t avail_head,
> +			uint16_t *nr_vec)
> +{
> +	struct rte_vhost_vring *vr = &dma_vr->vr;
> +
> +	uint16_t cur_idx;
> +	uint16_t vec_idx = 0;
> +	uint16_t max_tries, tries = 0;
> +
> +	uint16_t head_idx = 0;
> +	uint32_t len = 0;
> +
> +	*num_buffers = 0;
> +	cur_idx = dma_vr->last_avail_idx;
> +
> +	if (rxvq_is_mergeable(dev))
> +		max_tries = vr->size - 1;
> +	else
> +		max_tries = 1;
> +
> +	while (size > 0) {
> +		if (unlikely(cur_idx == avail_head))
> +			return -1;
> +		/**
> +		 * if we tried all available ring items, and still
> +		 * can't get enough buf, it means something abnormal
> +		 * happened.
> +		 */
> +		if (unlikely(++tries > max_tries))
> +			return -1;
> +
> +		if (unlikely(fill_vec_buf_split(dev, dma_vr, cur_idx,
> +						&vec_idx, buf_vec,
> +						&head_idx, &len) < 0))
> +			return -1;
> +		len = RTE_MIN(len, size);
> +		update_shadow_used_ring_split(dma_vr, head_idx, len);
> +		size -= len;
> +
> +		cur_idx++;
> +		*num_buffers += 1;
> +	}
> +
> +	*nr_vec = vec_idx;
> +
> +	return 0;
> +}
> +
> +static __rte_noinline void
> +copy_vnet_hdr_to_desc(struct pmd_internal *dev, struct buf_vector *buf_vec,
> +		      struct virtio_net_hdr_mrg_rxbuf *hdr)
> +{
> +	uint64_t len;
> +	uint64_t remain = dev->hdr_len;
> +	uint64_t src = (uint64_t)(uintptr_t)hdr, dst;
> +	uint64_t iova = buf_vec->buf_iova;
> +
> +	while (remain) {
> +		len = RTE_MIN(remain, buf_vec->buf_len);
> +		dst = buf_vec->buf_addr;
> +		rte_memcpy((void *)(uintptr_t)dst, (void *)(uintptr_t)src,
> +			   len);
> +
> +		remain -= len;
> +		iova += len;
> +		src += len;
> +		buf_vec++;
> +	}
> +}
> +
> +static __rte_always_inline int
> +copy_mbuf_to_desc(struct pmd_internal *dev, struct dma_vring *dma_vr,
> +		  struct rte_mbuf *m, struct buf_vector *buf_vec,
> +		  uint16_t nr_vec, uint16_t num_buffers)
> +{
> +	uint32_t vec_idx = 0;
> +	uint32_t mbuf_offset, mbuf_avail;
> +	uint32_t buf_offset, buf_avail;
> +	uint64_t buf_addr, buf_iova, buf_len;
> +	uint32_t cpy_len;
> +	uint64_t hdr_addr;
> +	struct rte_mbuf *hdr_mbuf;
> +	struct batch_copy_elem *batch_copy = dma_vr->batch_copy_elems;
> +	struct virtio_net_hdr_mrg_rxbuf tmp_hdr, *hdr = NULL;
> +	uint64_t dst, src;
> +	int error = 0;
> +
> +	if (unlikely(m == NULL)) {
> +		error = -1;
> +		goto out;
> +	}
> +
> +	buf_addr = buf_vec[vec_idx].buf_addr;
> +	buf_iova = buf_vec[vec_idx].buf_iova;
> +	buf_len = buf_vec[vec_idx].buf_len;
> +
> +	if (unlikely(buf_len < dev->hdr_len && nr_vec <= 1)) {
> +		error = -1;
> +		goto out;
> +	}
> +
> +	hdr_mbuf = m;
> +	hdr_addr = buf_addr;
> +	if (unlikely(buf_len < dev->hdr_len))
> +		hdr = &tmp_hdr;
> +	else
> +		hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)hdr_addr;
> +
> +	VHOST_LOG(DEBUG, "(%d) RX: num merge buffers %d\n", dev->vid,
> +		  num_buffers);
> +
> +	if (unlikely(buf_len < dev->hdr_len)) {
> +		buf_offset = dev->hdr_len - buf_len;
> +		vec_idx++;
> +		buf_addr = buf_vec[vec_idx].buf_addr;
> +		buf_iova = buf_vec[vec_idx].buf_iova;
> +		buf_len = buf_vec[vec_idx].buf_len;
> +		buf_avail = buf_len - buf_offset;
> +	} else {
> +		buf_offset = dev->hdr_len;
> +		buf_avail = buf_len - dev->hdr_len;
> +	}
> +
> +	mbuf_avail = rte_pktmbuf_data_len(m);
> +	mbuf_offset = 0;
> +	while (mbuf_avail != 0 || m->next != NULL) {
> +		bool dma_copy = false;
> +
> +		/* done with current buf, get the next one */
> +		if (buf_avail == 0) {
> +			vec_idx++;
> +			if (unlikely(vec_idx >= nr_vec)) {
> +				error = -1;
> +				goto out;
> +			}
> +
> +			buf_addr = buf_vec[vec_idx].buf_addr;
> +			buf_iova = buf_vec[vec_idx].buf_iova;
> +			buf_len = buf_vec[vec_idx].buf_len;
> +
> +			buf_offset = 0;
> +			buf_avail  = buf_len;
> +		}
> +
> +		/* done with current mbuf, get the next one */
> +		if (mbuf_avail == 0) {
> +			m = m->next;
> +			mbuf_offset = 0;
> +			mbuf_avail = rte_pktmbuf_data_len(m);
> +		}
> +
> +		if (hdr_addr) {
> +			virtio_enqueue_offload(hdr_mbuf, &hdr->hdr);
> +			if (rxvq_is_mergeable(dev))
> +				ASSIGN_UNLESS_EQUAL(hdr->num_buffers,
> +						    num_buffers);
> +
> +			if (unlikely(hdr == &tmp_hdr))
> +				copy_vnet_hdr_to_desc(dev, buf_vec, hdr);
> +			hdr_addr = 0;
> +		}
> +
> +		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
> +		if (cpy_len >= DMA_COPY_LENGTH_THRESHOLD) {
> +			dst = gpa_to_hpa(dev, buf_iova + buf_offset, cpy_len);
> +			dma_copy = (dst != 0);
> +		}
> +
> +		if (dma_copy) {
> +			src = rte_pktmbuf_iova_offset(m, mbuf_offset);
> +			/**
> +			 * if DMA enqueue fails, we wait until there are
> +			 * available DMA descriptors.
> +			 */
> +			while (unlikely(rte_ioat_enqueue_copy(dma_vr->dev_id,
> +							      src, dst, cpy_len,
> +							      (uintptr_t)
> +							      m, 0, 0) ==
> +					0)) {
> +				int ret;
> +
> +				do {
> +					ret = free_dma_done(dev, dma_vr);
> +				} while (ret <= 0);
> +			}
> +
> +			dma_vr->nr_batching++;
> +			dma_vr->nr_inflight++;
> +			rte_mbuf_refcnt_update(m, 1);
> +		} else if (likely(cpy_len > MAX_BATCH_LEN ||
> +				  dma_vr->batch_copy_nb_elems >=
> +				  dma_vr->vr.size)) {
> +			rte_memcpy((void *)((uintptr_t)(buf_addr + buf_offset)),
> +				   rte_pktmbuf_mtod_offset(m, void *,
> +							   mbuf_offset),
> +				   cpy_len);
> +		} else {
> +			batch_copy[dma_vr->batch_copy_nb_elems].dst =
> +				(void *)((uintptr_t)(buf_addr + buf_offset));
> +			batch_copy[dma_vr->batch_copy_nb_elems].src =
> +				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
> +			batch_copy[dma_vr->batch_copy_nb_elems].len = cpy_len;
> +			dma_vr->batch_copy_nb_elems++;
> +		}
> +
> +		mbuf_avail  -= cpy_len;
> +		mbuf_offset += cpy_len;
> +		buf_avail  -= cpy_len;
> +		buf_offset += cpy_len;
> +	}
> +
> +out:
> +	return error;
> +}
> +
> +static __rte_always_inline uint16_t
> +vhost_dma_enqueue_split(struct pmd_internal *dev, struct dma_vring *dma_vr,
> +			 struct rte_mbuf **pkts, uint32_t count)
> +{
> +	struct rte_vhost_vring *vr = &dma_vr->vr;
> +
> +	uint32_t pkt_idx = 0;
> +	uint16_t num_buffers;
> +	struct buf_vector buf_vec[BUF_VECTOR_MAX];
> +	uint16_t avail_head;
> +
> +	if (dma_vr->nr_inflight > 0)
> +		free_dma_done(dev, dma_vr);
> +
> +	avail_head = *((volatile uint16_t *)&vr->avail->idx);
> +
> +	/**
> +	 * the ordering between avail index and
> +	 * desc reads needs to be enforced.
> +	 */
> +	rte_smp_rmb();
> +
> +	rte_prefetch0(&vr->avail->ring[dma_vr->last_avail_idx &
> +			(vr->size - 1)]);
> +
> +	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
> +		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->hdr_len;
> +		uint16_t nr_vec = 0;
> +
> +		if (unlikely(reserve_avail_buf_split(dev, dma_vr, pkt_len,
> +						     buf_vec, &num_buffers,
> +						     avail_head, &nr_vec) <
> +			     0)) {
> +			VHOST_LOG(INFO,
> +				  "(%d) failed to get enough desc from vring\n",
> +				  dev->vid);
> +			dma_vr->shadow_used_idx -= num_buffers;
> +			break;
> +		}
> +
> +		VHOST_LOG(DEBUG, "(%d) current index %d | end index %d\n",
> +			  dev->vid, dma_vr->last_avail_idx,
> +			  dma_vr->last_avail_idx + num_buffers);
> +
> +		if (copy_mbuf_to_desc(dev, dma_vr, pkts[pkt_idx],
> +				      buf_vec, nr_vec, num_buffers) < 0) {
> +			dma_vr->shadow_used_idx -= num_buffers;
> +			break;
> +		}
> +
> +		if (unlikely(dma_vr->nr_batching >= DMA_BATCHING_SIZE)) {
> +			/**
> +			 * kick the DMA to do copy once the number of
> +			 * batching jobs reaches the batching threshold.
> +			 */
> +			rte_ioat_do_copies(dma_vr->dev_id);
> +			dma_vr->nr_batching = 0;
> +		}
> +
> +		dma_vr->last_avail_idx += num_buffers;
> +	}
> +
> +	do_data_copy(dma_vr);
> +
> +	if (dma_vr->shadow_used_idx) {
> +		flush_shadow_used_ring_split(dev, dma_vr);
> +		vhost_dma_vring_call(dev, dma_vr);
> +	}
> +
> +	if (dma_vr->nr_batching > 0) {
> +		rte_ioat_do_copies(dma_vr->dev_id);
> +		dma_vr->nr_batching = 0;
> +	}
> +
> +	return pkt_idx;
> +}
> +
> +uint16_t
> +vhost_dma_enqueue_burst(struct pmd_internal *dev, struct dma_vring *dma_vr,
> +			 struct rte_mbuf **pkts, uint32_t count)
> +{
> +	return vhost_dma_enqueue_split(dev, dma_vr, pkts, count);
> +}
> +
>  int
>  vhost_dma_setup(struct pmd_internal *dev)
>  {
> @@ -69,6 +793,9 @@ vhost_dma_setup(struct pmd_internal *dev)
>  		dma_vr->used_idx_hpa =
>  			rte_mem_virt2iova(&dma_vr->vr.used->idx);
> 
> +		dma_vr->max_indices = dma_vr->vr.size;
> +		setup_ring_index(&dma_vr->indices, dma_vr->max_indices);
> +
>  		dma_vr->copy_done_used = dma_vr->last_used_idx;
>  		dma_vr->signalled_used = dma_vr->last_used_idx;
>  		dma_vr->signalled_used_valid = false;
> @@ -83,6 +810,7 @@ vhost_dma_setup(struct pmd_internal *dev)
>  		dma_vr = &dev->dma_vrings[j];
>  		rte_free(dma_vr->shadow_used_split);
>  		rte_free(dma_vr->batch_copy_elems);
> +		destroy_ring_index(&dma_vr->indices);
>  		dma_vr->shadow_used_split = NULL;
>  		dma_vr->batch_copy_elems = NULL;
>  		dma_vr->used_idx_hpa = 0;
> @@ -104,12 +832,26 @@ vhost_dma_remove(struct pmd_internal *dev)
> 
>  	for (i = 0; i < dev->nr_vrings; i++) {
>  		dma_vr = &dev->dma_vrings[i];
> +		if (dma_vr->dma_enabled) {
> +			while (dma_vr->nr_inflight > 0)
> +				dma_vr->dma_done_fn(dev, dma_vr);
> +
> +			VHOST_LOG(INFO, "Wait for outstanding DMA jobs "
> +				  "of vring %u completion\n", i);
> +			rte_rawdev_stop(dma_vr->dev_id);
> +			dma_vr->dma_enabled = false;
> +			dma_vr->nr_batching = 0;
> +			dma_vr->dev_id = -1;
> +		}
> +
>  		rte_free(dma_vr->shadow_used_split);
>  		rte_free(dma_vr->batch_copy_elems);
>  		dma_vr->shadow_used_split = NULL;
>  		dma_vr->batch_copy_elems = NULL;
>  		dma_vr->signalled_used_valid = false;
>  		dma_vr->used_idx_hpa = 0;
> +		destroy_ring_index(&dma_vr->indices);
> +		dma_vr->max_indices = 0;
>  	}
> 
>  	free(dev->mem);
> diff --git a/drivers/net/vhost/virtio_net.h b/drivers/net/vhost/virtio_net.h
> index 7f99f1d..44a7cdd 100644
> --- a/drivers/net/vhost/virtio_net.h
> +++ b/drivers/net/vhost/virtio_net.h
> @@ -14,6 +14,89 @@ extern "C" {
> 
>  #include "internal.h"
> 
> +#ifndef VIRTIO_F_RING_PACKED
> +#define VIRTIO_F_RING_PACKED 34
> +#endif
> +
> +/* batching size before invoking the DMA to perform transfers */
> +#define DMA_BATCHING_SIZE 8
> +/**
> + * copy length threshold for the DMA engine. We offload copy jobs whose
> + * lengths are greater than DMA_COPY_LENGTH_THRESHOLD to the DMA; for
> + * small copies, we still use the CPU to perform copies, due to startup
> + * overheads associated with the DMA.
> + *
> + * As DMA copying is asynchronous with CPU computations, we can
> + * dynamically increase or decrease the value if the DMA is busier or
> + * idler than the CPU.
> + */
> +#define DMA_COPY_LENGTH_THRESHOLD 1024
> +
> +#define vhost_used_event(vr) \
> +	(*(volatile uint16_t*)&(vr)->avail->ring[(vr)->size])
> +
> +struct ring_index {
> +	/* physical address of 'data' */
> +	uintptr_t pa;
> +	uintptr_t idx;
> +	uint16_t data;
> +	bool in_use;
> +} __rte_cache_aligned;
> +
> +static __rte_always_inline int
> +setup_ring_index(struct ring_index **indices, uint16_t num)
> +{
> +	struct ring_index *array;
> +	uint16_t i;
> +
> +	array = rte_zmalloc(NULL, sizeof(struct ring_index) * num, 0);
> +	if (!array) {
> +		*indices = NULL;
> +		return -1;
> +	}
> +
> +	for (i = 0; i < num; i++) {
> +		array[i].pa = rte_mem_virt2iova(&array[i].data);
> +		array[i].idx = i;
> +	}
> +
> +	*indices = array;
> +	return 0;
> +}
> +
> +static __rte_always_inline void
> +destroy_ring_index(struct ring_index **indices)
> +{
> +	if (!indices)
> +		return;
> +	rte_free(*indices);
> +	*indices = NULL;
> +}
> +
> +static __rte_always_inline struct ring_index *
> +get_empty_index(struct ring_index *indices, uint16_t num)
> +{
> +	uint16_t i;
> +
> +	for (i = 0; i < num; i++)
> +		if (!indices[i].in_use)
> +			break;
> +
> +	if (unlikely(i == num))
> +		return NULL;
> +
> +	indices[i].in_use = true;
> +	return &indices[i];
> +}
> +
> +static __rte_always_inline void
> +put_used_index(struct ring_index *indices, uint16_t num, uint16_t idx)
> +{
> +	if (unlikely(idx >= num))
> +		return;
> +	indices[idx].in_use = false;
> +}
> +
>  static uint64_t
>  get_blk_size(int fd)
>  {
> @@ -149,6 +232,15 @@ gpa_to_hpa(struct pmd_internal *dev, uint64_t
> gpa, uint64_t size)
>  }
> 
>  /**
> + * This function checks if packed rings are enabled.
> + */
> +static __rte_always_inline bool
> +vhost_dma_vring_is_packed(struct pmd_internal *dev)
> +{
> +	return dev->features & (1ULL << VIRTIO_F_RING_PACKED);
> +}
> +
> +/**
>   * This function gets front end's memory and vrings information.
>   * In addition, it sets up necessary data structures for enqueue
>   * and dequeue operations.
> @@ -161,6 +253,34 @@ int vhost_dma_setup(struct pmd_internal *dev);
>   */
>  void vhost_dma_remove(struct pmd_internal *dev);
> 
> +/**
> + * This function frees DMA copy-done pktmbufs for the enqueue operation.
> + *
> + * @return
> + *  the number of packets that are completed by the DMA engine
> + */
> +int free_dma_done(void *dev, void *dma_vr);
> +
> +/**
> + * This function sends packet buffers to front end's RX vring.
> + * It will free the mbufs of successfully transmitted packets.
> + *
> + * @param dev
> + *  vhost-dma device
> + * @param dma_vr
> + *  a front end's RX vring
> + * @param pkts
> + *  packets to send
> + * @param count
> + *  the number of packets to send
> + *
> + * @return
> + *  the number of packets successfully sent
> + */
> +uint16_t vhost_dma_enqueue_burst(struct pmd_internal *dev,
> +				  struct dma_vring *dma_vr,
> +				  struct rte_mbuf **pkts, uint32_t count);
> +
>  #ifdef __cplusplus
>  }
>  #endif
> --
> 2.7.4


