[dpdk-dev] [PATCH v3 1/4] vhost: simplify async copy completion
Maxime Coquelin
maxime.coquelin at redhat.com
Mon Oct 5 15:46:47 CEST 2020
On 9/29/20 11:29 AM, Patrick Fu wrote:
> The current async ops allow the check_completed_copies() callback to
> return an arbitrary number of async iov segments finished by backend
> async devices. This design creates complexity for vhost in handling a
> broken transfer of a single packet (i.e. a transfer that completes in
> the middle of an async descriptor) and prevents application callbacks
> from leveraging hardware capabilities to offload the work. Thus, this
> patch enforces that the check_completed_copies() callback return the
> number of async memory descriptors, which is aligned with the async
> transfer data ops callbacks. The vhost async data path is revised to
> work with the new ops definition, which provides clean and simplified
> processing.
>
> Signed-off-by: Patrick Fu <patrick.fu at intel.com>
> ---
> lib/librte_vhost/rte_vhost_async.h | 15 ++-
> lib/librte_vhost/vhost.c | 20 ++--
> lib/librte_vhost/vhost.h | 8 +-
> lib/librte_vhost/vhost_user.c | 6 +-
> lib/librte_vhost/virtio_net.c | 151 ++++++++++++-----------------
> 5 files changed, 90 insertions(+), 110 deletions(-)
>
> diff --git a/lib/librte_vhost/rte_vhost_async.h b/lib/librte_vhost/rte_vhost_async.h
> index cb6284539..c73bd7c99 100644
> --- a/lib/librte_vhost/rte_vhost_async.h
> +++ b/lib/librte_vhost/rte_vhost_async.h
> @@ -76,13 +76,26 @@ struct rte_vhost_async_channel_ops {
> * @param max_packets
> * max number of packets could be completed
> * @return
> - * number of iov segments completed
> + * number of async descs completed
> */
> uint32_t (*check_completed_copies)(int vid, uint16_t queue_id,
> struct rte_vhost_async_status *opaque_data,
> uint16_t max_packets);
> };
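As a side note for application writers: under the new contract the
callback reports whole async descriptors, not iov segments. A minimal
implementation could look like the sketch below (my_dma_poll_completions
is a hypothetical driver helper):

	static uint32_t
	my_check_completed_copies(int vid, uint16_t queue_id,
			struct rte_vhost_async_status *opaque_data,
			uint16_t max_packets)
	{
		(void)opaque_data; /* unused in this sketch */

		/* Poll the DMA engine and return the number of fully
		 * completed async descriptors, never partially
		 * transferred segments.
		 */
		return my_dma_poll_completions(vid, queue_id, max_packets);
	}
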
>
> +/**
> + * inflight async packet information
> + */
> +struct async_inflight_info {
> + union {
> + uint32_t info;
> + struct {
> + uint16_t descs; /* num of descs inflight */
> + uint16_t segs; /* iov segs inflight */
> + };
> + };
I think you can remove the union.
> +};
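I.e. simply (just a sketch; the single .info store at the submission
site would then become two explicit stores):

	struct async_inflight_info {
		uint16_t descs; /* num of descs inflight */
		uint16_t segs;  /* iov segs inflight */
	};

	/* instead of pkts_info[slot_idx].info = num_buffers; */
	pkts_info[slot_idx].descs = num_buffers;
	pkts_info[slot_idx].segs = 0;
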
> +
> /**
> * dma channel feature bit definition
> */
> diff --git a/lib/librte_vhost/vhost.c b/lib/librte_vhost/vhost.c
> index 8f20a0818..eca507836 100644
> --- a/lib/librte_vhost/vhost.c
> +++ b/lib/librte_vhost/vhost.c
> @@ -333,8 +333,8 @@ free_vq(struct virtio_net *dev, struct vhost_virtqueue *vq)
> rte_free(vq->shadow_used_split);
> if (vq->async_pkts_pending)
> rte_free(vq->async_pkts_pending);
> - if (vq->async_pending_info)
> - rte_free(vq->async_pending_info);
> + if (vq->async_pkts_info)
> + rte_free(vq->async_pkts_info);
> }
> rte_free(vq->batch_copy_elems);
> rte_mempool_free(vq->iotlb_pool);
> @@ -1573,15 +1573,15 @@ int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
> vq->async_pkts_pending = rte_malloc(NULL,
> vq->size * sizeof(uintptr_t),
> RTE_CACHE_LINE_SIZE);
> - vq->async_pending_info = rte_malloc(NULL,
> - vq->size * sizeof(uint64_t),
> + vq->async_pkts_info = rte_malloc(NULL,
> + vq->size * sizeof(struct async_inflight_info),
> RTE_CACHE_LINE_SIZE);
> - if (!vq->async_pkts_pending || !vq->async_pending_info) {
> + if (!vq->async_pkts_pending || !vq->async_pkts_info) {
> if (vq->async_pkts_pending)
> rte_free(vq->async_pkts_pending);
>
> - if (vq->async_pending_info)
> - rte_free(vq->async_pending_info);
> + if (vq->async_pkts_info)
> + rte_free(vq->async_pkts_info);
>
> VHOST_LOG_CONFIG(ERR,
> "async register failed: cannot allocate memory for vq data "
> @@ -1635,9 +1635,9 @@ int rte_vhost_async_channel_unregister(int vid, uint16_t queue_id)
> vq->async_pkts_pending = NULL;
> }
>
> - if (vq->async_pending_info) {
> - rte_free(vq->async_pending_info);
> - vq->async_pending_info = NULL;
> + if (vq->async_pkts_info) {
> + rte_free(vq->async_pkts_info);
> + vq->async_pkts_info = NULL;
> }
>
> vq->async_ops.transfer_data = NULL;
> diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
> index 73a1ed889..155a832c1 100644
> --- a/lib/librte_vhost/vhost.h
> +++ b/lib/librte_vhost/vhost.h
> @@ -46,8 +46,6 @@
>
> #define MAX_PKT_BURST 32
>
> -#define ASYNC_MAX_POLL_SEG 255
> -
> #define VHOST_MAX_ASYNC_IT (MAX_PKT_BURST * 2)
> #define VHOST_MAX_ASYNC_VEC (BUF_VECTOR_MAX * 2)
>
> @@ -225,12 +223,10 @@ struct vhost_virtqueue {
>
> /* async data transfer status */
> uintptr_t **async_pkts_pending;
> - #define ASYNC_PENDING_INFO_N_MSK 0xFFFF
> - #define ASYNC_PENDING_INFO_N_SFT 16
> - uint64_t *async_pending_info;
> + struct async_inflight_info *async_pkts_info;
> uint16_t async_pkts_idx;
> uint16_t async_pkts_inflight_n;
> - uint16_t async_last_seg_n;
> + uint16_t async_last_pkts_n;
>
> /* vq async features */
> bool async_inorder;
> diff --git a/lib/librte_vhost/vhost_user.c b/lib/librte_vhost/vhost_user.c
> index 501218e19..67d2a2d43 100644
> --- a/lib/librte_vhost/vhost_user.c
> +++ b/lib/librte_vhost/vhost_user.c
> @@ -2006,10 +2006,10 @@ vhost_user_get_vring_base(struct virtio_net **pdev,
> vq->shadow_used_split = NULL;
> if (vq->async_pkts_pending)
> rte_free(vq->async_pkts_pending);
> - if (vq->async_pending_info)
> - rte_free(vq->async_pending_info);
> + if (vq->async_pkts_info)
> + rte_free(vq->async_pkts_info);
> vq->async_pkts_pending = NULL;
> - vq->async_pending_info = NULL;
> + vq->async_pkts_info = NULL;
> }
>
> rte_free(vq->batch_copy_elems);
> diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
> index bd9303c8a..68ead9a71 100644
> --- a/lib/librte_vhost/virtio_net.c
> +++ b/lib/librte_vhost/virtio_net.c
> @@ -1473,37 +1473,6 @@ virtio_dev_rx_async_get_info_idx(uint16_t pkts_idx,
> (vq_size - n_inflight + pkts_idx) & (vq_size - 1);
> }
>
> -static __rte_always_inline void
> -virtio_dev_rx_async_submit_split_err(struct virtio_net *dev,
> - struct vhost_virtqueue *vq, uint16_t queue_id,
> - uint16_t last_idx, uint16_t shadow_idx)
> -{
> - uint16_t start_idx, pkts_idx, vq_size;
> - uint64_t *async_pending_info;
> -
> - pkts_idx = vq->async_pkts_idx;
> - async_pending_info = vq->async_pending_info;
> - vq_size = vq->size;
> - start_idx = virtio_dev_rx_async_get_info_idx(pkts_idx,
> - vq_size, vq->async_pkts_inflight_n);
> -
> - while (likely((start_idx & (vq_size - 1)) != pkts_idx)) {
> - uint64_t n_seg =
> - async_pending_info[(start_idx) & (vq_size - 1)] >>
> - ASYNC_PENDING_INFO_N_SFT;
> -
> - while (n_seg)
> - n_seg -= vq->async_ops.check_completed_copies(dev->vid,
> - queue_id, 0, 1);
> - }
> -
> - vq->async_pkts_inflight_n = 0;
> - vq->batch_copy_nb_elems = 0;
> -
> - vq->shadow_used_idx = shadow_idx;
> - vq->last_avail_idx = last_idx;
> -}
> -
> static __rte_noinline uint32_t
> virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> struct vhost_virtqueue *vq, uint16_t queue_id,
> @@ -1512,7 +1481,7 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> uint32_t pkt_idx = 0, pkt_burst_idx = 0;
> uint16_t num_buffers;
> struct buf_vector buf_vec[BUF_VECTOR_MAX];
> - uint16_t avail_head, last_idx, shadow_idx;
> + uint16_t avail_head;
>
> struct rte_vhost_iov_iter *it_pool = vq->it_pool;
> struct iovec *vec_pool = vq->vec_pool;
> @@ -1522,11 +1491,11 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> struct rte_vhost_iov_iter *src_it = it_pool;
> struct rte_vhost_iov_iter *dst_it = it_pool + 1;
> uint16_t n_free_slot, slot_idx;
> + uint16_t pkt_err = 0;
> + struct async_inflight_info *pkts_info = vq->async_pkts_info;
> int n_pkts = 0;
>
> avail_head = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE);
> - last_idx = vq->last_avail_idx;
> - shadow_idx = vq->shadow_used_idx;
>
> /*
> * The ordering between avail index and
> @@ -1565,14 +1534,14 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> if (src_it->count) {
> async_fill_desc(&tdes[pkt_burst_idx], src_it, dst_it);
> pkt_burst_idx++;
> - vq->async_pending_info[slot_idx] =
> - num_buffers | (src_it->nr_segs << 16);
> + pkts_info[slot_idx].descs = num_buffers;
> + pkts_info[slot_idx].segs = src_it->nr_segs;
> src_iovec += src_it->nr_segs;
> dst_iovec += dst_it->nr_segs;
> src_it += 2;
> dst_it += 2;
> } else {
> - vq->async_pending_info[slot_idx] = num_buffers;
> + pkts_info[slot_idx].info = num_buffers;
> vq->async_pkts_inflight_n++;
> }
>
> @@ -1586,35 +1555,46 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
> src_it = it_pool;
> dst_it = it_pool + 1;
> + vq->async_pkts_inflight_n += n_pkts;
>
> if (unlikely(n_pkts < (int)pkt_burst_idx)) {
> - vq->async_pkts_inflight_n +=
> - n_pkts > 0 ? n_pkts : 0;
> - virtio_dev_rx_async_submit_split_err(dev,
> - vq, queue_id, last_idx, shadow_idx);
> - return 0;
> + /*
> + * record the number of error packets here and do
> + * the actual error processing when the application
> + * polls for completions
> + */
> + pkt_err = pkt_burst_idx - n_pkts;
> + pkt_burst_idx = 0;
> + break;
> }
>
> pkt_burst_idx = 0;
> - vq->async_pkts_inflight_n += n_pkts;
> }
> }
>
> if (pkt_burst_idx) {
> n_pkts = vq->async_ops.transfer_data(dev->vid,
> queue_id, tdes, 0, pkt_burst_idx);
> - if (unlikely(n_pkts < (int)pkt_burst_idx)) {
> - vq->async_pkts_inflight_n += n_pkts > 0 ? n_pkts : 0;
> - virtio_dev_rx_async_submit_split_err(dev, vq, queue_id,
> - last_idx, shadow_idx);
> - return 0;
> - }
> -
> vq->async_pkts_inflight_n += n_pkts;
> +
> + if (unlikely(n_pkts < (int)pkt_burst_idx))
> + pkt_err = pkt_burst_idx - n_pkts;
> }
>
> do_data_copy_enqueue(dev, vq);
>
> + if (unlikely(pkt_err)) {
> + do {
> + if (pkts_info[slot_idx].segs)
> + pkt_err--;
> + vq->last_avail_idx -= pkts_info[slot_idx].descs;
> + vq->shadow_used_idx -= pkts_info[slot_idx].descs;
> + vq->async_pkts_inflight_n--;
> + slot_idx--;
> + pkt_idx--;
> + } while (pkt_err);
That does not sound really robust.
Maybe you could also exit on slot_idx < 0 to avoid out-of-range
accesses?

} while (pkt_err && slot_idx >= 0)
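Fleshed out, just as a sketch (note slot_idx is declared uint16_t, so a
signed local copy would be needed for the check to ever be false):

	int32_t idx = slot_idx;

	do {
		if (pkts_info[idx].segs)
			pkt_err--;
		vq->last_avail_idx -= pkts_info[idx].descs;
		vq->shadow_used_idx -= pkts_info[idx].descs;
		vq->async_pkts_inflight_n--;
		idx--;
		pkt_idx--;
	} while (pkt_err && idx >= 0);
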
> + }
> +
> n_free_slot = vq->size - vq->async_pkts_idx;
> if (n_free_slot > pkt_idx) {
> rte_memcpy(&vq->async_pkts_pending[vq->async_pkts_idx],
> @@ -1640,10 +1620,10 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
> {
> struct virtio_net *dev = get_device(vid);
> struct vhost_virtqueue *vq;
> - uint16_t n_segs_cpl, n_pkts_put = 0, n_descs = 0;
> + uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0;
> uint16_t start_idx, pkts_idx, vq_size;
> uint16_t n_inflight;
> - uint64_t *async_pending_info;
> + struct async_inflight_info *pkts_info;
>
> if (!dev)
> return 0;
> @@ -1657,51 +1637,40 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
>
> vq = dev->virtqueue[queue_id];
>
> + if (unlikely(!vq->async_registered)) {
> + VHOST_LOG_DATA(ERR, "(%d) %s: async not registered for queue id %d.\n",
> + dev->vid, __func__, queue_id);
> + return 0;
> + }
> +
> rte_spinlock_lock(&vq->access_lock);
>
> n_inflight = vq->async_pkts_inflight_n;
> pkts_idx = vq->async_pkts_idx;
> - async_pending_info = vq->async_pending_info;
> + pkts_info = vq->async_pkts_info;
> vq_size = vq->size;
> start_idx = virtio_dev_rx_async_get_info_idx(pkts_idx,
> vq_size, vq->async_pkts_inflight_n);
>
> - n_segs_cpl = vq->async_ops.check_completed_copies(vid, queue_id,
> - 0, ASYNC_MAX_POLL_SEG - vq->async_last_seg_n) +
> - vq->async_last_seg_n;
> + if (count > vq->async_last_pkts_n)
> + n_pkts_cpl = vq->async_ops.check_completed_copies(vid,
> + queue_id, 0, count - vq->async_last_pkts_n);
> + n_pkts_cpl += vq->async_last_pkts_n;
>
> rte_smp_wmb();
>
> while (likely((n_pkts_put < count) && n_inflight)) {
> - uint64_t info = async_pending_info[
> - (start_idx + n_pkts_put) & (vq_size - 1)];
> - uint64_t n_segs;
> + uint16_t info_idx = (start_idx + n_pkts_put) & (vq_size - 1);
> + if (n_pkts_cpl && pkts_info[info_idx].segs)
> + n_pkts_cpl--;
> + else if (!n_pkts_cpl && pkts_info[info_idx].segs)
> + break;
> n_pkts_put++;
> n_inflight--;
> - n_descs += info & ASYNC_PENDING_INFO_N_MSK;
> - n_segs = info >> ASYNC_PENDING_INFO_N_SFT;
> -
> - if (n_segs) {
> - if (unlikely(n_segs_cpl < n_segs)) {
> - n_pkts_put--;
> - n_inflight++;
> - n_descs -= info & ASYNC_PENDING_INFO_N_MSK;
> - if (n_segs_cpl) {
> - async_pending_info[
> - (start_idx + n_pkts_put) &
> - (vq_size - 1)] =
> - ((n_segs - n_segs_cpl) <<
> - ASYNC_PENDING_INFO_N_SFT) |
> - (info & ASYNC_PENDING_INFO_N_MSK);
> - n_segs_cpl = 0;
> - }
> - break;
> - }
> - n_segs_cpl -= n_segs;
> - }
> + n_descs += pkts_info[info_idx].descs;
> }
>
> - vq->async_last_seg_n = n_segs_cpl;
> + vq->async_last_pkts_n = n_pkts_cpl;
>
> if (n_pkts_put) {
> vq->async_pkts_inflight_n = n_inflight;
> @@ -1710,16 +1679,18 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
> n_descs, __ATOMIC_RELEASE);
> vhost_vring_call_split(dev, vq);
> }
> - }
>
> - if (start_idx + n_pkts_put <= vq_size) {
> - rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
> - n_pkts_put * sizeof(uintptr_t));
> - } else {
> - rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
> - (vq_size - start_idx) * sizeof(uintptr_t));
> - rte_memcpy(&pkts[vq_size - start_idx], vq->async_pkts_pending,
> - (n_pkts_put - vq_size + start_idx) * sizeof(uintptr_t));
> + if (start_idx + n_pkts_put <= vq_size) {
> + rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
> + n_pkts_put * sizeof(uintptr_t));
> + } else {
> + rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
> + (vq_size - start_idx) * sizeof(uintptr_t));
> + rte_memcpy(&pkts[vq_size - start_idx],
> + vq->async_pkts_pending,
> + (n_pkts_put + start_idx - vq_size) *
> + sizeof(uintptr_t));
> + }
> }
>
> rte_spinlock_unlock(&vq->access_lock);
>
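For reference, with this change a packet returned by
rte_vhost_poll_enqueue_completed() is guaranteed to be fully
transferred, so applications can poll and free in whole-packet units.
A minimal polling sketch (the burst size and free policy are the
application's choice):

	struct rte_mbuf *comp_pkts[MAX_PKT_BURST];
	uint16_t n;

	n = rte_vhost_poll_enqueue_completed(vid, queue_id,
			comp_pkts, MAX_PKT_BURST);
	rte_pktmbuf_free_bulk(comp_pkts, n);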