[dpdk-dev] [PATCH v3 1/4] vhost: simplify async copy completion
Fu, Patrick
patrick.fu at intel.com
Fri Oct 9 13:16:13 CEST 2020
> > On 9/29/20 11:29 AM, Patrick Fu wrote:
> > Current async ops allow the check_completed_copies() callback to return
> > an arbitrary number of async iov segments finished from backend async
> > devices. This design creates complexity for vhost to handle breaking
> > transfer of a single packet (i.e. transfer completes in the middle
> > of an async descriptor) and prevents application callbacks from
> > leveraging hardware capability to offload the work. Thus, this patch
> > enforces the check_completed_copies() callback to return the number
> > of async memory descriptors, which is aligned with async transfer
> > data ops callbacks. The vhost async data path is revised to work with
> > the new ops definition, which provides clean and simplified processing.
> >
> > Signed-off-by: Patrick Fu <patrick.fu at intel.com>
> > ---
> > lib/librte_vhost/rte_vhost_async.h | 15 ++-
> > lib/librte_vhost/vhost.c | 20 ++--
> > lib/librte_vhost/vhost.h | 8 +-
> > lib/librte_vhost/vhost_user.c | 6 +-
> > lib/librte_vhost/virtio_net.c | 151 ++++++++++++-----------------
> > 5 files changed, 90 insertions(+), 110 deletions(-)
> >
> > diff --git a/lib/librte_vhost/rte_vhost_async.h
> b/lib/librte_vhost/rte_vhost_async.h
> > index cb6284539..c73bd7c99 100644
> > --- a/lib/librte_vhost/rte_vhost_async.h
> > +++ b/lib/librte_vhost/rte_vhost_async.h
> > @@ -76,13 +76,26 @@ struct rte_vhost_async_channel_ops {
> > * @param max_packets
> > * max number of packets could be completed
> > * @return
> > - * number of iov segments completed
> > + * number of async descs completed
> > */
> > uint32_t (*check_completed_copies)(int vid, uint16_t queue_id,
> > struct rte_vhost_async_status *opaque_data,
> > uint16_t max_packets);
> > };
> >
> > +/**
> > + * inflight async packet information
> > + */
> > +struct async_inflight_info {
> > + union {
> > + uint32_t info;
> > + struct {
> > + uint16_t descs; /* num of descs inflight */
> > + uint16_t segs; /* iov segs inflight */
> > + };
> > + };
>
> I think you can remove the union.
>
There are some cases in the code where we want to set descs=N and segs=0. In such cases, assigning info=N directly is faster than setting descs and segs separately, so I would prefer to keep the union.
> > +};
> > +
> > /**
> > * dma channel feature bit definition
> > */
> > diff --git a/lib/librte_vhost/vhost.c b/lib/librte_vhost/vhost.c
> > index 8f20a0818..eca507836 100644
> > --- a/lib/librte_vhost/vhost.c
> > +++ b/lib/librte_vhost/vhost.c
> > @@ -333,8 +333,8 @@ free_vq(struct virtio_net *dev, struct
> vhost_virtqueue *vq)
> > rte_free(vq->shadow_used_split);
> > if (vq->async_pkts_pending)
> > rte_free(vq->async_pkts_pending);
> > - if (vq->async_pending_info)
> > - rte_free(vq->async_pending_info);
> > + if (vq->async_pkts_info)
> > + rte_free(vq->async_pkts_info);
> > }
> > rte_free(vq->batch_copy_elems);
> > rte_mempool_free(vq->iotlb_pool);
> > @@ -1573,15 +1573,15 @@ int rte_vhost_async_channel_register(int vid,
> uint16_t queue_id,
> > vq->async_pkts_pending = rte_malloc(NULL,
> > vq->size * sizeof(uintptr_t),
> > RTE_CACHE_LINE_SIZE);
> > - vq->async_pending_info = rte_malloc(NULL,
> > - vq->size * sizeof(uint64_t),
> > + vq->async_pkts_info = rte_malloc(NULL,
> > + vq->size * sizeof(struct async_inflight_info),
> > RTE_CACHE_LINE_SIZE);
> > - if (!vq->async_pkts_pending || !vq->async_pending_info) {
> > + if (!vq->async_pkts_pending || !vq->async_pkts_info) {
> > if (vq->async_pkts_pending)
> > rte_free(vq->async_pkts_pending);
> >
> > - if (vq->async_pending_info)
> > - rte_free(vq->async_pending_info);
> > + if (vq->async_pkts_info)
> > + rte_free(vq->async_pkts_info);
> >
> > VHOST_LOG_CONFIG(ERR,
> > "async register failed: cannot allocate
> memory for vq data "
> > @@ -1635,9 +1635,9 @@ int rte_vhost_async_channel_unregister(int vid,
> uint16_t queue_id)
> > vq->async_pkts_pending = NULL;
> > }
> >
> > - if (vq->async_pending_info) {
> > - rte_free(vq->async_pending_info);
> > - vq->async_pending_info = NULL;
> > + if (vq->async_pkts_info) {
> > + rte_free(vq->async_pkts_info);
> > + vq->async_pkts_info = NULL;
> > }
> >
> > vq->async_ops.transfer_data = NULL;
> > diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
> > index 73a1ed889..155a832c1 100644
> > --- a/lib/librte_vhost/vhost.h
> > +++ b/lib/librte_vhost/vhost.h
> > @@ -46,8 +46,6 @@
> >
> > #define MAX_PKT_BURST 32
> >
> > -#define ASYNC_MAX_POLL_SEG 255
> > -
> > #define VHOST_MAX_ASYNC_IT (MAX_PKT_BURST * 2)
> > #define VHOST_MAX_ASYNC_VEC (BUF_VECTOR_MAX * 2)
> >
> > @@ -225,12 +223,10 @@ struct vhost_virtqueue {
> >
> > /* async data transfer status */
> > uintptr_t **async_pkts_pending;
> > - #define ASYNC_PENDING_INFO_N_MSK 0xFFFF
> > - #define ASYNC_PENDING_INFO_N_SFT 16
> > - uint64_t *async_pending_info;
> > + struct async_inflight_info *async_pkts_info;
> > uint16_t async_pkts_idx;
> > uint16_t async_pkts_inflight_n;
> > - uint16_t async_last_seg_n;
> > + uint16_t async_last_pkts_n;
> >
> > /* vq async features */
> > bool async_inorder;
> > diff --git a/lib/librte_vhost/vhost_user.c b/lib/librte_vhost/vhost_user.c
> > index 501218e19..67d2a2d43 100644
> > --- a/lib/librte_vhost/vhost_user.c
> > +++ b/lib/librte_vhost/vhost_user.c
> > @@ -2006,10 +2006,10 @@ vhost_user_get_vring_base(struct virtio_net
> **pdev,
> > vq->shadow_used_split = NULL;
> > if (vq->async_pkts_pending)
> > rte_free(vq->async_pkts_pending);
> > - if (vq->async_pending_info)
> > - rte_free(vq->async_pending_info);
> > + if (vq->async_pkts_info)
> > + rte_free(vq->async_pkts_info);
> > vq->async_pkts_pending = NULL;
> > - vq->async_pending_info = NULL;
> > + vq->async_pkts_info = NULL;
> > }
> >
> > rte_free(vq->batch_copy_elems);
> > diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
> > index bd9303c8a..68ead9a71 100644
> > --- a/lib/librte_vhost/virtio_net.c
> > +++ b/lib/librte_vhost/virtio_net.c
> > @@ -1473,37 +1473,6 @@ virtio_dev_rx_async_get_info_idx(uint16_t
> pkts_idx,
> > (vq_size - n_inflight + pkts_idx) & (vq_size - 1);
> > }
> >
> > -static __rte_always_inline void
> > -virtio_dev_rx_async_submit_split_err(struct virtio_net *dev,
> > - struct vhost_virtqueue *vq, uint16_t queue_id,
> > - uint16_t last_idx, uint16_t shadow_idx)
> > -{
> > - uint16_t start_idx, pkts_idx, vq_size;
> > - uint64_t *async_pending_info;
> > -
> > - pkts_idx = vq->async_pkts_idx;
> > - async_pending_info = vq->async_pending_info;
> > - vq_size = vq->size;
> > - start_idx = virtio_dev_rx_async_get_info_idx(pkts_idx,
> > - vq_size, vq->async_pkts_inflight_n);
> > -
> > - while (likely((start_idx & (vq_size - 1)) != pkts_idx)) {
> > - uint64_t n_seg =
> > - async_pending_info[(start_idx) & (vq_size - 1)] >>
> > - ASYNC_PENDING_INFO_N_SFT;
> > -
> > - while (n_seg)
> > - n_seg -= vq-
> >async_ops.check_completed_copies(dev->vid,
> > - queue_id, 0, 1);
> > - }
> > -
> > - vq->async_pkts_inflight_n = 0;
> > - vq->batch_copy_nb_elems = 0;
> > -
> > - vq->shadow_used_idx = shadow_idx;
> > - vq->last_avail_idx = last_idx;
> > -}
> > -
> > static __rte_noinline uint32_t
> > virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> > struct vhost_virtqueue *vq, uint16_t queue_id,
> > @@ -1512,7 +1481,7 @@ virtio_dev_rx_async_submit_split(struct
> virtio_net *dev,
> > uint32_t pkt_idx = 0, pkt_burst_idx = 0;
> > uint16_t num_buffers;
> > struct buf_vector buf_vec[BUF_VECTOR_MAX];
> > - uint16_t avail_head, last_idx, shadow_idx;
> > + uint16_t avail_head;
> >
> > struct rte_vhost_iov_iter *it_pool = vq->it_pool;
> > struct iovec *vec_pool = vq->vec_pool;
> > @@ -1522,11 +1491,11 @@ virtio_dev_rx_async_submit_split(struct
> virtio_net *dev,
> > struct rte_vhost_iov_iter *src_it = it_pool;
> > struct rte_vhost_iov_iter *dst_it = it_pool + 1;
> > uint16_t n_free_slot, slot_idx;
> > + uint16_t pkt_err = 0;
> > + struct async_inflight_info *pkts_info = vq->async_pkts_info;
> > int n_pkts = 0;
> >
> > avail_head = __atomic_load_n(&vq->avail->idx,
> __ATOMIC_ACQUIRE);
> > - last_idx = vq->last_avail_idx;
> > - shadow_idx = vq->shadow_used_idx;
> >
> > /*
> > * The ordering between avail index and
> > @@ -1565,14 +1534,14 @@ virtio_dev_rx_async_submit_split(struct
> virtio_net *dev,
> > if (src_it->count) {
> > async_fill_desc(&tdes[pkt_burst_idx], src_it, dst_it);
> > pkt_burst_idx++;
> > - vq->async_pending_info[slot_idx] =
> > - num_buffers | (src_it->nr_segs << 16);
> > + pkts_info[slot_idx].descs = num_buffers;
> > + pkts_info[slot_idx].segs = src_it->nr_segs;
> > src_iovec += src_it->nr_segs;
> > dst_iovec += dst_it->nr_segs;
> > src_it += 2;
> > dst_it += 2;
> > } else {
> > - vq->async_pending_info[slot_idx] = num_buffers;
> > + pkts_info[slot_idx].info = num_buffers;
> > vq->async_pkts_inflight_n++;
> > }
> >
> > @@ -1586,35 +1555,46 @@ virtio_dev_rx_async_submit_split(struct
> virtio_net *dev,
> > dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >>
> 1);
> > src_it = it_pool;
> > dst_it = it_pool + 1;
> > + vq->async_pkts_inflight_n += n_pkts;
> >
> > if (unlikely(n_pkts < (int)pkt_burst_idx)) {
> > - vq->async_pkts_inflight_n +=
> > - n_pkts > 0 ? n_pkts : 0;
> > - virtio_dev_rx_async_submit_split_err(dev,
> > - vq, queue_id, last_idx, shadow_idx);
> > - return 0;
> > + /*
> > + * log error packets number here and do
> actual
> > + * error processing when applications poll
> > + * completion
> > + */
> > + pkt_err = pkt_burst_idx - n_pkts;
> > + pkt_burst_idx = 0;
> > + break;
> > }
> >
> > pkt_burst_idx = 0;
> > - vq->async_pkts_inflight_n += n_pkts;
> > }
> > }
> >
> > if (pkt_burst_idx) {
> > n_pkts = vq->async_ops.transfer_data(dev->vid,
> > queue_id, tdes, 0, pkt_burst_idx);
> > - if (unlikely(n_pkts < (int)pkt_burst_idx)) {
> > - vq->async_pkts_inflight_n += n_pkts > 0 ? n_pkts : 0;
> > - virtio_dev_rx_async_submit_split_err(dev, vq,
> queue_id,
> > - last_idx, shadow_idx);
> > - return 0;
> > - }
> > -
> > vq->async_pkts_inflight_n += n_pkts;
> > +
> > + if (unlikely(n_pkts < (int)pkt_burst_idx))
> > + pkt_err = pkt_burst_idx - n_pkts;
> > }
> >
> > do_data_copy_enqueue(dev, vq);
> >
> > + if (unlikely(pkt_err)) {
> > + do {
> > + if (pkts_info[slot_idx].segs)
> > + pkt_err--;
> > + vq->last_avail_idx -= pkts_info[slot_idx].descs;
> > + vq->shadow_used_idx -= pkts_info[slot_idx].descs;
> > + vq->async_pkts_inflight_n--;
> > + slot_idx--;
> > + pkt_idx--;
> > + } while (pkt_err);
>
> That does not sound really robust.
> Maybe you could exit also on slot_idx < 0 to avoid out of range
> accesses?
>
> } while (pkt_err && slot_idx >= 0)
>
slot_idx is an unsigned type and is designed to work like a ring index. But I do need to send a new patch to handle the "slot_idx==0" case.
For robustness, I can add a check: "pkt_idx > 0".
Thanks,
Patrick
More information about the dev
mailing list