[dpdk-dev] [PATCH 2/2] app/test-eventdev: add Tx adapter support

Pavan Nikhilesh pbhagavatula at caviumnetworks.com
Tue Sep 4 10:12:55 CEST 2018


On Tue, Sep 04, 2018 at 11:07:15AM +0530, Rao, Nikhil wrote:
> Hi Pavan,
>
> Few comments below.
>
> On 8/31/2018 4:10 PM, Pavan Nikhilesh wrote:
> > Convert existing Tx service based pipeline to Tx adapter based APIs and
> > simplify worker functions.
> >
> > Signed-off-by: Pavan Nikhilesh <pbhagavatula at caviumnetworks.com>
> > ---
> >   app/test-eventdev/test_pipeline_atq.c    | 216 +++++++++++---------
> >   app/test-eventdev/test_pipeline_common.c | 193 ++++++------------
> >   app/test-eventdev/test_pipeline_common.h |  43 ++--
> >   app/test-eventdev/test_pipeline_queue.c  | 238 ++++++++++++-----------
> >   4 files changed, 322 insertions(+), 368 deletions(-)
> >
> > diff --git a/app/test-eventdev/test_pipeline_atq.c b/app/test-eventdev/test_pipeline_atq.c
> </snip>
> > -static int
> > +static __rte_noinline int
> >   pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
> >   {
> >       PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
> > -     const uint8_t nb_stages = t->opt->nb_stages;
> > -     const uint8_t tx_queue = t->tx_service.queue_id;
> > +     const uint8_t *tx_queue = t->tx_evqueue_id;
> >
> >       while (t->done == false) {
> >               uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
> > @@ -253,9 +235,10 @@ pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
> >
> >                       if (cq_id == last_queue) {
> >                               w->processed_pkts++;
> > -                             ev[i].queue_id = tx_queue;
> > +                             ev[i].queue_id = tx_queue[ev[i].mbuf->port];
> >                               pipeline_fwd_event(&ev[i],
> >                                               RTE_SCHED_TYPE_ATOMIC);
> > +
> Unintentional newline ?

Will remove in next version.

> >                       } else {
> >                               ev[i].sub_event_type++;
> >                               pipeline_fwd_event(&ev[i],
> >   static int
> >
> > @@ -317,23 +296,25 @@ pipeline_atq_eventdev_setup(struct evt_test *test, struct evt_options *opt)
> >       int nb_ports;
> >       int nb_queues;
> >       uint8_t queue;
> > -     struct rte_event_dev_info info;
> > -     struct test_pipeline *t = evt_test_priv(test);
> > -     uint8_t tx_evqueue_id = 0;
> > +     uint8_t tx_evqueue_id[RTE_MAX_ETHPORTS] = {0};
> >       uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV];
> >       uint8_t nb_worker_queues = 0;
> > +     uint8_t tx_evport_id = 0;
> > +     uint16_t prod = 0;
> > +     struct rte_event_dev_info info;
> > +     struct test_pipeline *t = evt_test_priv(test);
> >
> >       nb_ports = evt_nr_active_lcores(opt->wlcores);
> >       nb_queues = rte_eth_dev_count_avail();
> >
> > -     /* One extra port and queueu for Tx service */
> > -     if (t->mt_unsafe) {
> > -             tx_evqueue_id = nb_queues;
> > -             nb_ports++;
> > -             nb_queues++;
> > +     /* One queue for Tx service */
> > +     if (!t->internal_port) {
> See comment about struct test_pipeline::internal_port in the
> test_pipeline_common.h review below.
>
> > +             RTE_ETH_FOREACH_DEV(prod) {
> > +                     tx_evqueue_id[prod] = nb_queues;
> > +                     nb_queues++;
> > +             }
> >       }
> >
> >
> > @@ -388,14 +371,11 @@ pipeline_atq_eventdev_setup(struct evt_test *test, struct evt_options *opt)
> >                       .new_event_threshold = info.max_num_events,
> >       };
> >
> > -     if (t->mt_unsafe) {
> > +     if (!t->internal_port) {
> >               ret = pipeline_event_port_setup(test, opt, queue_arr,
> >                               nb_worker_queues, p_conf);
> >               if (ret)
> >                       return ret;
> > -
> > -             ret = pipeline_event_tx_service_setup(test, opt, tx_evqueue_id,
> > -                             nb_ports - 1, p_conf);
> >       } else
> >               ret = pipeline_event_port_setup(test, opt, NULL, nb_queues,
> >                               p_conf);
> > @@ -424,14 +404,17 @@ pipeline_atq_eventdev_setup(struct evt_test *test, struct evt_options *opt)
> >        *      stride = 1
> >        *
> >        *      event queue pipelines:
> > -      *      eth0 -> q0
> > -      *                } (q3->tx) Tx service
> > -      *      eth1 -> q1
> > +      *      eth0 -> q0 \
> > +      *                  q3->tx
> > +      *      eth1 -> q1 /
> >        *
> >        *      q0,q1 are configured as stated above.
> >        *      q3 configured as SINGLE_LINK|ATOMIC.
> >        */
> >       ret = pipeline_event_rx_adapter_setup(opt, 1, p_conf);
> > +     if (ret)
> > +             return ret;
> > +     ret = pipeline_event_tx_adapter_setup(opt, p_conf);
> pipeline_event_tx_adapter_setup() creates a tx adapter per eth port,
> that doesn't match the preceding diagram.

I will fix in next version.

>
> >
> > diff --git a/app/test-eventdev/test_pipeline_common.c b/app/test-eventdev/test_pipeline_common.c
> > index a54068df3..7f858e23f 100644
> > --- a/app/test-eventdev/test_pipeline_common.c
> > +++ b/app/test-eventdev/test_pipeline_common.c
> > @@ -5,58 +5,6 @@
> >
> </snip>
>
> > @@ -215,7 +160,6 @@ pipeline_ethdev_setup(struct evt_test *test, struct evt_options *opt)
> >   {
> >       uint16_t i;
> >       uint8_t nb_queues = 1;
> > -     uint8_t mt_state = 0;
> >       struct test_pipeline *t = evt_test_priv(test);
> >       struct rte_eth_rxconf rx_conf;
> >       struct rte_eth_conf port_conf = {
> > @@ -238,13 +182,13 @@ pipeline_ethdev_setup(struct evt_test *test, struct evt_options *opt)
> >               return -ENODEV;
> >       }
> >
> > +     t->internal_port = 0;
> >       RTE_ETH_FOREACH_DEV(i) {
> >               struct rte_eth_dev_info dev_info;
> >               struct rte_eth_conf local_port_conf = port_conf;
> > +             uint32_t caps = 0;
> >
> >               rte_eth_dev_info_get(i, &dev_info);
> > -             mt_state = !(dev_info.tx_offload_capa &
> > -                             DEV_TX_OFFLOAD_MT_LOCKFREE);
> >               rx_conf = dev_info.default_rxconf;
> >               rx_conf.offloads = port_conf.rxmode.offloads;
> >
> > @@ -279,11 +223,9 @@ pipeline_ethdev_setup(struct evt_test *test, struct evt_options *opt)
> >                       return -EINVAL;
> >               }
> >
> > -             t->mt_unsafe |= mt_state;
> > -             t->tx_service.tx_buf[i] =
> > -                     rte_malloc(NULL, RTE_ETH_TX_BUFFER_SIZE(BURST_SIZE), 0);
> > -             if (t->tx_service.tx_buf[i] == NULL)
> > -                     rte_panic("Unable to allocate Tx buffer memory.");
> > +             rte_event_eth_tx_adapter_caps_get(opt->dev_id, i, &caps);
> > +             if ((caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT))
> > +                     t->internal_port = 1;See comment about struct test_pipeline::internal_port below
> >               rte_eth_promiscuous_enable(i);
> >       }
> >
> > @@ -295,7 +237,6 @@ pipeline_event_port_setup(struct evt_test *test, struct evt_options *opt,
> >               uint8_t *queue_arr, uint8_t nb_queues,
> >               const struct rte_event_port_conf p_conf)
> >   {
> > -     int i;
> >       int ret;
> >       uint8_t port;
> >       struct test_pipeline *t = evt_test_priv(test);
> > @@ -321,10 +262,9 @@ pipeline_event_port_setup(struct evt_test *test, struct evt_options *opt,
> >                                               0) != nb_queues)
> >                               goto link_fail;
> >               } else {
> > -                     for (i = 0; i < nb_queues; i++) {
> > -                             if (rte_event_port_link(opt->dev_id, port,
> > -                                             &queue_arr[i], NULL, 1) != 1)
> > -                                     goto link_fail;
> > +                     if (rte_event_port_link(opt->dev_id, port, queue_arr,
> > +                                     NULL, nb_queues) != nb_queues) {
> > +                             goto link_fail;
> >                       }
> Minor, isn't it possible to replace the if (queue_arr == NULL) {} else
> {}  block with a single call to rte_event_port_link() ?

Will simplify in the next version.
>
> >
> > diff --git a/app/test-eventdev/test_pipeline_common.h b/app/test-eventdev/test_pipeline_common.h
> > index 9cd6b905b..b8939db81 100644
> > --- a/app/test-eventdev/test_pipeline_common.h
> > +++ b/app/test-eventdev/test_pipeline_common.h
> > @@ -14,6 +14,7 @@
> >   #include <rte_ethdev.h>
> >   #include <rte_eventdev.h>
> >   #include <rte_event_eth_rx_adapter.h>
> > +#include <rte_event_eth_tx_adapter.h>
> >   #include <rte_lcore.h>
> >   #include <rte_malloc.h>
> >   #include <rte_mempool.h>
> > @@ -26,6 +27,9 @@
> >   #include "evt_options.h"
> >   #include "evt_test.h"
> >
> > +#define PIPELINE_TX_ADPTR_ENQ 0x1
> > +#define PIPELINE_TX_ADPTR_FWD 0x2
> > +
> I don't see a reference to these defines.

These are artifacts from a different design, will remove.
>
> >   struct test_pipeline;
> >
> >   struct worker_data {
> > @@ -35,30 +39,19 @@ struct worker_data {
> >       struct test_pipeline *t;
> >   } __rte_cache_aligned;
> >
> > -struct tx_service_data {
> > -     uint8_t dev_id;
> > -     uint8_t queue_id;
> > -     uint8_t port_id;
> > -     uint32_t service_id;
> > -     uint64_t processed_pkts;
> > -     uint16_t nb_ethports;
> > -     struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS];
> > -     struct test_pipeline *t;
> > -} __rte_cache_aligned;
> > -
> >   struct test_pipeline {
> >       /* Don't change the offset of "done". Signal handler use this memory
> >        * to terminate all lcores work.
> >        */
> >       int done;
> >       uint8_t nb_workers;
> > -     uint8_t mt_unsafe;
> Can we also replace references to mt_unsafe in comments ?

Will clean up in the next version.
>
> > +     uint8_t internal_port;
> Shouldn't internal_port be a per eth device flag ? Or is there an
> assumption that all eth devices will be such that the eventdev PMD's
> internal port capability will be set ?
>

Current app design doesn't support both the models together. I will add a check
to quit when both types of PMD are detected.
>
> > diff --git a/app/test-eventdev/test_pipeline_queue.c b/app/test-eventdev/test_pipeline_queue.c
> > index 2e0d93d99..e1153573b 100644
> > --- a/app/test-eventdev/test_pipeline_queue.c
> > +++ b/app/test-eventdev/test_pipeline_queue.c
> > @@ -15,7 +15,7 @@ pipeline_queue_nb_event_queues(struct evt_options *opt)
> >       return (eth_count * opt->nb_stages) + eth_count;
> >   }
> >
> > -static int
> > +static __rte_noinline int
> >   pipeline_queue_worker_single_stage_tx(void *arg)
> >   {
> >       PIPELINE_WORKER_SINGLE_STAGE_INIT;
> > @@ -29,9 +29,12 @@ pipeline_queue_worker_single_stage_tx(void *arg)
> >               }
> >
> >               if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
> > -                     pipeline_tx_pkt(ev.mbuf);
> > +                     rte_event_eth_tx_adapter_txq_set(ev.mbuf, 0);
> > +                     rte_event_eth_tx_adapter_enqueue(dev, port,
> > +                                     &ev, 1);
> Do we need a retry loop for enqueue above and at other usages in this
> patch ?

Will add it in the next version.
>
> >                       w->processed_pkts++;
> >               } else {
> > +
> new line is intentional ?
> >                       ev.queue_id++;
> >                       pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
> >                       pipeline_event_enqueue(dev, port, &ev);
> > @@ -41,11 +44,11 @@ pipeline_queue_worker_single_stage_tx(void *arg)
> >       return 0;
> >   }
> >
>
> Nikhil

Thanks for the review.

Pavan.


More information about the dev mailing list