[dpdk-dev] [PATCH 1/3] examples/eventdev_pipeline: added sample app

Hunt, David david.hunt at intel.com
Mon Jun 26 16:46:47 CEST 2017


Hi Jerin,

I'm assisting Harry on the sample app, and have just pushed up a V2 
patch based on your feedback. I've addressed most of your suggestions, 
comments below. There may still be a couple of outstanding questions that 
need further discussion.

Regards,
Dave

On 10/5/2017 3:12 PM, Jerin Jacob wrote:
 > -----Original Message-----
 >> Date: Fri, 21 Apr 2017 10:51:37 +0100
 >> From: Harry van Haaren <harry.van.haaren at intel.com>
 >> To: dev at dpdk.org
 >> CC: jerin.jacob at caviumnetworks.com, Harry van Haaren
 >> <harry.van.haaren at intel.com>, Gage Eads <gage.eads at intel.com>, Bruce
 >>  Richardson <bruce.richardson at intel.com>
 >> Subject: [PATCH 1/3] examples/eventdev_pipeline: added sample app
 >> X-Mailer: git-send-email 2.7.4
 >>
 >> This commit adds a sample app for the eventdev library.
 >> The app has been tested with DPDK 17.05-rc2, hence this
 >> release (or later) is recommended.
 >>
 >> The sample app showcases a pipeline processing use-case,
 >> with event scheduling and processing defined per stage.
 >> The application receives traffic as normal, with each
 >> packet traversing the pipeline. Once the packet has
 >> been processed by each of the pipeline stages, it is
 >> transmitted again.
 >>
 >> The app provides a framework to utilize cores for a single
 >> role or multiple roles. Examples of roles are the RX core,
 >> TX core, Scheduling core (in the case of the event/sw PMD),
 >> and worker cores.
 >>
 >> Various flags are available to configure numbers of stages,
 >> cycles of work at each stage, type of scheduling, number of
 >> worker cores, queue depths etc. For a full explanation,
 >> please refer to the documentation.
 >>
 >> Signed-off-by: Gage Eads <gage.eads at intel.com>
 >> Signed-off-by: Bruce Richardson <bruce.richardson at intel.com>
 >> Signed-off-by: Harry van Haaren <harry.van.haaren at intel.com>
 >
 > Thanks for the example application to share the SW view.
 > I could make it run on HW after some tweaking(not optimized though)
 >
 > [...]
 >> +#define MAX_NUM_STAGES 8
 >> +#define BATCH_SIZE 16
 >> +#define MAX_NUM_CORE 64
 >
 > How about RTE_MAX_LCORE?

Core usage in the sample app is held in a uint64_t. Adding arrays would 
be possible, but I feel that the extra effort would not give that much 
benefit. I've left as is for the moment, unless you see any strong 
requirement to go beyond 64 cores?
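
To illustrate where the 64-core limit comes from: each role's core mask is a single uint64_t that gets expanded into per-core flags. The following is a minimal standalone sketch, not the code from the patch; the helper name expand_mask is hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define MAX_NUM_CORE 64

/* Expand a 64-bit core mask into per-core flags; returns the number of cores set. */
static unsigned int
expand_mask(uint64_t mask, unsigned int flags[MAX_NUM_CORE])
{
	unsigned int i, count = 0;

	assert(flags != NULL);
	for (i = 0; i < MAX_NUM_CORE; i++) {
		/* UINT64_C(1) keeps the shift 64 bits wide even where long is 32 bits */
		flags[i] = !!(mask & (UINT64_C(1) << i));
		count += flags[i];
	}
	return count;
}
```

Going beyond 64 cores would mean replacing the single mask with an array or a bitmap type, which is the extra effort mentioned above.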

 >
 >> +
 >> +static unsigned int active_cores;
 >> +static unsigned int num_workers;
 >> +static unsigned long num_packets = (1L << 25); /* do ~32M packets */
 >> +static unsigned int num_fids = 512;
 >> +static unsigned int num_priorities = 1;
 >
 > looks like its not used.

Yes, Removed.

 >
 >> +static unsigned int num_stages = 1;
 >> +static unsigned int worker_cq_depth = 16;
 >> +static int queue_type = RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY;
 >> +static int16_t next_qid[MAX_NUM_STAGES+1] = {-1};
 >> +static int16_t qid[MAX_NUM_STAGES] = {-1};
 >
 > Moving all fastpath related variables under a structure with cache
 > aligned will help.

I tried a few different combinations of this and saw no gains, and some 
losses. So I'll leave it as is for the moment, if that's OK.

 >
 >> +static int worker_cycles;
 >> +static int enable_queue_priorities;
 >> +
 >> +struct prod_data {
 >> +    uint8_t dev_id;
 >> +    uint8_t port_id;
 >> +    int32_t qid;
 >> +    unsigned num_nic_ports;
 >> +};

Yes, I saw a gain of a percent or two with this and the following two data 
structs cache aligned.
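
For reference, a sketch of what cache aligning these structs means, written in plain C11 so it builds without DPDK. The 64-byte line size is an assumption; in DPDK code the __rte_cache_aligned attribute would normally be used instead of alignas.

```c
#include <assert.h>
#include <stdalign.h>
#include <stdint.h>

#define CACHE_LINE 64 /* assumption: 64-byte cache lines */

/* Aligning the first member forces the whole struct onto a cache-line boundary. */
struct prod_data {
	alignas(CACHE_LINE) uint8_t dev_id;
	uint8_t port_id;
	int32_t qid;
	unsigned int num_nic_ports;
};

struct cons_data {
	alignas(CACHE_LINE) uint8_t dev_id;
	uint8_t port_id;
};

/* Each struct now starts on its own line and is padded to a whole line. */
static_assert(alignof(struct prod_data) == CACHE_LINE, "prod_data not aligned");
static_assert(sizeof(struct cons_data) == CACHE_LINE, "cons_data not padded");
```

The padding keeps the producer and consumer data from sharing a cache line, so cores running different roles do not invalidate each other's lines.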

 >
 > cache aligned ?
 >
 >> +
 >> +struct cons_data {
 >> +    uint8_t dev_id;
 >> +    uint8_t port_id;
 >> +};
 >> +
 >
 > cache aligned ?

Yes, see comment above

 >
 >> +static struct prod_data prod_data;
 >> +static struct cons_data cons_data;
 >> +
 >> +struct worker_data {
 >> +    uint8_t dev_id;
 >> +    uint8_t port_id;
 >> +};
 >
 > cache aligned ?

Yes, see comment above


 >
 >> +
 >> +static unsigned *enqueue_cnt;
 >> +static unsigned *dequeue_cnt;
 >> +
 >> +static volatile int done;
 >> +static volatile int prod_stop;
 >
 > No one updating the prod_stop.

Old var, removed.

 >
 >> +static int quiet;
 >> +static int dump_dev;
 >> +static int dump_dev_signal;
 >> +
 >> +static uint32_t rx_lock;
 >> +static uint32_t tx_lock;
 >> +static uint32_t sched_lock;
 >> +static bool rx_single;
 >> +static bool tx_single;
 >> +static bool sched_single;
 >> +
 >> +static unsigned rx_core[MAX_NUM_CORE];
 >> +static unsigned tx_core[MAX_NUM_CORE];
 >> +static unsigned sched_core[MAX_NUM_CORE];
 >> +static unsigned worker_core[MAX_NUM_CORE];
 >> +
 >> +static bool
 >> +core_in_use(unsigned lcore_id) {
 >> +    return (rx_core[lcore_id] || sched_core[lcore_id] ||
 >> +        tx_core[lcore_id] || worker_core[lcore_id]);
 >> +}
 >> +
 >> +static struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS];
 >> +
 >> +static void
 >> +rte_eth_tx_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
 >> +            void *userdata)
 >
 > IMO, It is better to not use rte_eth_* for application functions.

Sure, removed the 'rte_' part of the function name.

 >
 >> +{
 >> +    int port_id = (uintptr_t) userdata;
 >> +    unsigned _sent = 0;
 >> +
 >> +    do {
 >> +        /* Note: hard-coded TX queue */
 >> +        _sent += rte_eth_tx_burst(port_id, 0, &pkts[_sent],
 >> +                      unsent - _sent);
 >> +    } while (_sent != unsent);
 >> +}
 >> +
 >> +static int
 >> +consumer(void)
 >> +{
 >> +    const uint64_t freq_khz = rte_get_timer_hz() / 1000;
 >> +    struct rte_event packets[BATCH_SIZE];
 >> +
 >> +    static uint64_t npackets;
 >> +    static uint64_t received;
 >> +    static uint64_t received_printed;
 >> +    static uint64_t time_printed;
 >> +    static uint64_t start_time;
 >> +    unsigned i, j;
 >> +    uint8_t dev_id = cons_data.dev_id;
 >> +    uint8_t port_id = cons_data.port_id;
 >> +
 >> +    if (!npackets)
 >> +        npackets = num_packets;
 >> +
 >> +    do {
 >> +        uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
 >> +                packets, RTE_DIM(packets), 0);
 >
 >         const uint16_t n =

sure.

 >
 >> +
 >> +        if (n == 0) {
 >> +            for (j = 0; j < rte_eth_dev_count(); j++)
 >> +                rte_eth_tx_buffer_flush(j, 0, tx_buf[j]);
 >> +            return 0;
 >> +        }
 >> +        if (start_time == 0)
 >> +            time_printed = start_time = rte_get_timer_cycles();
 >> +
 >> +        received += n;
 >> +        for (i = 0; i < n; i++) {
 >> +            uint8_t outport = packets[i].mbuf->port;
 >> +            rte_eth_tx_buffer(outport, 0, tx_buf[outport],
 >> +                    packets[i].mbuf);
 >> +        }
 >> +
 >> +        if (!quiet && received >= received_printed + (1<<22)) {
 >> +            const uint64_t now = rte_get_timer_cycles();
 >> +            const uint64_t delta_cycles = now - start_time;
 >> +            const uint64_t elapsed_ms = delta_cycles / freq_khz;
 >> +            const uint64_t interval_ms =
 >> +                    (now - time_printed) / freq_khz;
 >> +
 >> +            uint64_t rx_noprint = received - received_printed;
 >> +            printf("# consumer RX=%"PRIu64", time %"PRIu64
 >> +                "ms, avg %.3f mpps [current %.3f mpps]\n",
 >> +                    received, elapsed_ms,
 >> +                    (received) / (elapsed_ms * 1000.0),
 >> +                    rx_noprint / (interval_ms * 1000.0));
 >> +            received_printed = received;
 >> +            time_printed = now;
 >> +        }
 >> +
 >> +        dequeue_cnt[0] += n;
 >> +
 >> +        if (num_packets > 0 && npackets > 0) {
 >> +            npackets -= n;
 >> +            if (npackets == 0 || npackets > num_packets)
 >> +                done = 1;
 >> +        }
 >
 > Looks like very complicated logic.I think we can simplify it.

I've simplified this.
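
As an illustration only (this is not the V2 code), one way such a countdown could be written without relying on unsigned wraparound is sketched below. The helper name consume_budget is hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Subtract n processed packets from the remaining budget.
 * Returns true once the budget is used up. A limit of 0 means "no limit".
 */
static bool
consume_budget(uint64_t limit, uint64_t *remaining, uint16_t n)
{
	assert(remaining != NULL);
	if (limit == 0)
		return false;
	if (n >= *remaining) {
		/* clamp at zero instead of letting the counter wrap */
		*remaining = 0;
		return true;
	}
	*remaining -= n;
	return false;
}
```

The caller would set done = 1 when the function returns true.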


 >
 >> +    } while (0);
 >
 > do while(0); really required here?

Removed.

 >
 >> +
 >> +    return 0;
 >> +}
 >> +
 >> +static int
 >> +producer(void)
 >> +{
 >> +    static uint8_t eth_port;
 >> +    struct rte_mbuf *mbufs[BATCH_SIZE];
 >> +    struct rte_event ev[BATCH_SIZE];
 >> +    uint32_t i, num_ports = prod_data.num_nic_ports;
 >> +    int32_t qid = prod_data.qid;
 >> +    uint8_t dev_id = prod_data.dev_id;
 >> +    uint8_t port_id = prod_data.port_id;
 >> +    uint32_t prio_idx = 0;
 >> +
 >> +    const uint16_t nb_rx = rte_eth_rx_burst(eth_port, 0, mbufs, BATCH_SIZE);
 >> +    if (++eth_port == num_ports)
 >> +        eth_port = 0;
 >> +    if (nb_rx == 0) {
 >> +        rte_pause();
 >> +        return 0;
 >> +    }
 >> +
 >> +    for (i = 0; i < nb_rx; i++) {
 >> +        ev[i].flow_id = mbufs[i]->hash.rss;
 >
 > prefetching the buff[i+1] may help here?

I tried, didn't make much difference.

 >
 >> +        ev[i].op = RTE_EVENT_OP_NEW;
 >> +        ev[i].sched_type = queue_type;
 >
 > The value of RTE_EVENT_QUEUE_CFG_ORDERED_ONLY != RTE_SCHED_TYPE_ORDERED. So, we
 > cannot assign .sched_type as queue_type.
 >
 > I think, one option could be to avoid translation in application is to
 > - Remove RTE_EVENT_QUEUE_CFG_ALL_TYPES, RTE_EVENT_QUEUE_CFG_*_ONLY
 > - Introduce a new RTE_EVENT_DEV_CAP_ to denote RTE_EVENT_QUEUE_CFG_ALL_TYPES
 >   capability
 > - add sched_type in struct rte_event_queue_conf. If capability flag is
 >   not set then implementation takes sched_type value for the queue.
 >
 > Any thoughts?


I'm not sure here. Would it be OK to leave this as is for the moment and 
work on a patch in the future?
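
For context, the translation the application would otherwise need looks roughly like the sketch below. The enum names and values are stand-ins chosen for illustration, not the definitions from rte_eventdev.h, and the helper name is hypothetical.

```c
#include <assert.h>

/* Stand-in constants for illustration only; not taken from rte_eventdev.h. */
enum demo_queue_cfg {
	DEMO_QUEUE_CFG_ATOMIC_ONLY = 1,
	DEMO_QUEUE_CFG_ORDERED_ONLY = 2,
	DEMO_QUEUE_CFG_PARALLEL_ONLY = 3,
};

enum demo_sched_type {
	DEMO_SCHED_TYPE_ORDERED = 0,
	DEMO_SCHED_TYPE_ATOMIC = 1,
	DEMO_SCHED_TYPE_PARALLEL = 2,
};

/* The two enums use different numbering, so one cannot be assigned to the other. */
static_assert((int)DEMO_QUEUE_CFG_ORDERED_ONLY != (int)DEMO_SCHED_TYPE_ORDERED,
	"queue config and sched type are not interchangeable");

/* Map a queue configuration to the per-event schedule type; -1 if unknown. */
static int
queue_cfg_to_sched_type(int cfg)
{
	switch (cfg) {
	case DEMO_QUEUE_CFG_ATOMIC_ONLY:
		return DEMO_SCHED_TYPE_ATOMIC;
	case DEMO_QUEUE_CFG_ORDERED_ONLY:
		return DEMO_SCHED_TYPE_ORDERED;
	case DEMO_QUEUE_CFG_PARALLEL_ONLY:
		return DEMO_SCHED_TYPE_PARALLEL;
	default:
		return -1;
	}
}
```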

 >
 >
 >> +        ev[i].queue_id = qid;
 >> +        ev[i].event_type = RTE_EVENT_TYPE_CPU;
 >
 > IMO, RTE_EVENT_TYPE_ETHERNET is the better option here as it is
 > producing the Ethernet packets/events.

Changed to RTE_EVENT_TYPE_ETHDEV.


 >
 >> +        ev[i].sub_event_type = 0;
 >> +        ev[i].priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
 >> +        ev[i].mbuf = mbufs[i];
 >> +        RTE_SET_USED(prio_idx);
 >> +    }
 >> +
 >> +    const int nb_tx = rte_event_enqueue_burst(dev_id, port_id, ev, nb_rx);
 >
 > For producer pattern i.e a burst of RTE_EVENT_OP_NEW, OcteonTX can do burst
 > operation unlike FORWARD case(which is one event at a time).Earlier, I
 > thought I can abstract the producer pattern in PMD, but it looks like we
 > are going with application driven producer model based on latest RFC.So I think,
 > we can add one flag to rte_event_enqueue_burst to denote all the events
 > are of type RTE_EVENT_OP_NEW as hint.SW driver can ignore this.
 >
 > I can send a patch for the same.
 >
 > Any thoughts?

I think this comment is closed now, as your patch is upstreamed, afaik?

 >
 >
 >> +    if (nb_tx != nb_rx) {
 >> +        for (i = nb_tx; i < nb_rx; i++)
 >> +            rte_pktmbuf_free(mbufs[i]);
 >> +    }
 >> +    enqueue_cnt[0] += nb_tx;
 >> +
 >> +    if (unlikely(prod_stop))
 >
 > I think, No one updating the prod_stop

Removed.

 >
 >> +        done = 1;
 >> +
 >> +    return 0;
 >> +}
 >> +
 >> +static inline void
 >> +schedule_devices(uint8_t dev_id, unsigned lcore_id)
 >> +{
 >> +    if (rx_core[lcore_id] && (rx_single ||
 >> +        rte_atomic32_cmpset(&rx_lock, 0, 1))) {
 >
 > This pattern(rte_atomic32_cmpset) makes application can inject only
 > "one core" worth of packets. Not enough for low-end cores. May be we need
 > multiple producer options. I think, new RFC is addressing it.

OK, will leave this to the RFC.
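
The pattern under discussion is a non-blocking try-lock: a core takes on the role only if no other core currently holds it. A minimal sketch using C11 atomics instead of rte_atomic32_cmpset(), with hypothetical helper names:

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Try to take a role lock (0 = free, 1 = held); never blocks. */
static bool
role_try_lock(atomic_uint *lock)
{
	unsigned int expected = 0;

	assert(lock != NULL);
	/* succeeds only if the lock was 0, and sets it to 1 in the same step */
	return atomic_compare_exchange_strong(lock, &expected, 1);
}

/* Release the role so that another core can pick it up. */
static void
role_unlock(atomic_uint *lock)
{
	assert(lock != NULL);
	atomic_store(lock, 0);
}
```

Because only one core can hold the lock at a time, at most one core's worth of packets is injected per pass, which is the limitation raised in the comment above.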

 >
 >> +        producer();
 >> +        rte_atomic32_clear((rte_atomic32_t *)&rx_lock);
 >> +    }
 >> +
 >> +    if (sched_core[lcore_id] && (sched_single ||
 >> +        rte_atomic32_cmpset(&sched_lock, 0, 1))) {
 >> +        rte_event_schedule(dev_id);
 >> +        if (dump_dev_signal) {
 >> +            rte_event_dev_dump(0, stdout);
 >> +            dump_dev_signal = 0;
 >> +        }
 >> +        rte_atomic32_clear((rte_atomic32_t *)&sched_lock);
 >> +    }
 >
 > Lot of unwanted code if RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED set.
 >
 > I think, We can make common code with compile time aware and make
 > runtime workers based on the flag..
 > i.e
 > rte_eal_remote_launch(worker_x, &worker_data[worker_idx], lcore_id);
 > rte_eal_remote_launch(worker_y, &worker_data[worker_idx], lcore_id);
 >
 > May we can improve after initial version.

Yes, we can clean up after initial version.


 >
 >> +
 >> +    if (tx_core[lcore_id] && (tx_single ||
 >> +        rte_atomic32_cmpset(&tx_lock, 0, 1))) {
 >> +        consumer();
 >
 > Should consumer() need to come in this pattern? I am thinking like
 > if events is from last stage then call consumer() in worker()
 >
 > I think, above scheme works better when the _same_ worker code need to run the
 > case where
 > 1) ethdev HW is capable to enqueuing the packets to same txq from
 >   multiple thread
 > 2) ethdev is not capable to do so.
 >
 > So, The above cases can be addressed in configuration time where we link
 > the queues to port
 > case 1) Link all workers to last queue
 > case 2) Link only worker to last queue
 >
 > and keeping the common worker code.
 >
 > HW implementation has functional and performance issue if "two" ports are
 > assigned to one lcore for dequeue. The above scheme fixes that problem too.


Can we have a bit more discussion on this item? Is this needed for this 
sample app, or can we perhaps work on a patch for this later? Harry?


 >
 >> +        rte_atomic32_clear((rte_atomic32_t *)&tx_lock);
 >> +    }
 >> +}
 >> +
 >> +static int
 >> +worker(void *arg)
 >> +{
 >> +    struct rte_event events[BATCH_SIZE];
 >> +
 >> +    struct worker_data *data = (struct worker_data *)arg;
 >> +    uint8_t dev_id = data->dev_id;
 >> +    uint8_t port_id = data->port_id;
 >> +    size_t sent = 0, received = 0;
 >> +    unsigned lcore_id = rte_lcore_id();
 >> +
 >> +    while (!done) {
 >> +        uint16_t i;
 >> +
 >> +        schedule_devices(dev_id, lcore_id);
 >> +
 >> +        if (!worker_core[lcore_id]) {
 >> +            rte_pause();
 >> +            continue;
 >> +        }
 >> +
 >> +        uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
 >> +                events, RTE_DIM(events), 0);
 >> +
 >> +        if (nb_rx == 0) {
 >> +            rte_pause();
 >> +            continue;
 >> +        }
 >> +        received += nb_rx;
 >> +
 >> +        for (i = 0; i < nb_rx; i++) {
 >> +            struct ether_hdr *eth;
 >> +            struct ether_addr addr;
 >> +            struct rte_mbuf *m = events[i].mbuf;
 >> +
 >> +            /* The first worker stage does classification */
 >> +            if (events[i].queue_id == qid[0])
 >> +                events[i].flow_id = m->hash.rss % num_fids;
 >
 > Not sure why we need do(shrinking the flows) this in worker() in queue based pipeline.
 > If an PMD has any specific requirement on num_fids,I think, we
 > can move this configuration stage or PMD can choose optimum fid internally to
 > avoid modulus operation tax in fastpath in all PMD.
 >
 > Does struct rte_event_queue_conf.nb_atomic_flows help here?

In my tests the modulus makes very little difference in the throughput. 
And I think it's good to have a way of varying the number of flows for 
testing different scenarios, even if it's not the most performant.
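
As a sketch of the operation in question (not code from the patch), the flow reduction is a single modulus per event, and it degenerates to a mask when the flow count is a power of two. The helper name reduce_flow_id is hypothetical.

```c
#include <assert.h>
#include <stdint.h>

/* Fold an RSS hash into the range [0, num_fids). */
static inline uint32_t
reduce_flow_id(uint32_t rss_hash, uint32_t num_fids)
{
	assert(num_fids > 0);
	/* power-of-two flow counts can use a mask instead of a divide */
	if ((num_fids & (num_fids - 1)) == 0)
		return rss_hash & (num_fids - 1);
	return rss_hash % num_fids;
}
```

With the default of 512 flows the mask path would be taken, so the cost of the general modulus only shows up for flow counts that are not a power of two.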

 >
 >> +
 >> +            events[i].queue_id = next_qid[events[i].queue_id];
 >> +            events[i].op = RTE_EVENT_OP_FORWARD;
 >
 > missing events[i].sched_type.HW PMD does not work with this.
 > I think, we can use similar scheme like next_qid for next_sched_type.

Done. added events[i].sched_type = queue_type.

 >
 >> +
 >> +            /* change mac addresses on packet (to use mbuf data) */
 >> +            eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
 >> +            ether_addr_copy(&eth->d_addr, &addr);
 >> +            ether_addr_copy(&eth->s_addr, &eth->d_addr);
 >> +            ether_addr_copy(&addr, &eth->s_addr);
 >
 > IMO, We can make packet processing code as "static inline function" so
 > different worker types can reuse.

Done. moved out to a work() function.
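
A rough standalone sketch of what such a work() function does to a packet is shown below. It uses a simplified stand-in for the Ethernet header rather than DPDK's struct ether_hdr and ether_addr_copy(), so the struct and constant names are assumptions made for illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define DEMO_ETHER_ADDR_LEN 6

/* Simplified stand-in for an Ethernet header; not DPDK's struct ether_hdr. */
struct demo_eth_hdr {
	uint8_t d_addr[DEMO_ETHER_ADDR_LEN];
	uint8_t s_addr[DEMO_ETHER_ADDR_LEN];
	uint16_t ether_type;
};

/* Swap source and destination MAC addresses in place (touches packet data). */
static inline void
work(struct demo_eth_hdr *eth)
{
	uint8_t tmp[DEMO_ETHER_ADDR_LEN];

	assert(eth != NULL);
	memcpy(tmp, eth->d_addr, sizeof(tmp));
	memcpy(eth->d_addr, eth->s_addr, sizeof(tmp));
	memcpy(eth->s_addr, tmp, sizeof(tmp));
}
```

Keeping this in one static inline function lets different worker loops share the same per-packet processing.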

 >
 >> +
 >> +            /* do a number of cycles of work per packet */
 >> +            volatile uint64_t start_tsc = rte_rdtsc();
 >> +            while (rte_rdtsc() < start_tsc + worker_cycles)
 >> +                rte_pause();
 >
 > Ditto.

Done. moved out to a work() function.

 >
 > I think, All worker specific variables like "worker_cycles" can moved into
 > one structure and use.
 >
 >> +        }
 >> +        uint16_t nb_tx = rte_event_enqueue_burst(dev_id, port_id,
 >> +                events, nb_rx);
 >> +        while (nb_tx < nb_rx && !done)
 >> +            nb_tx += rte_event_enqueue_burst(dev_id, port_id,
 >> +                            events + nb_tx,
 >> +                            nb_rx - nb_tx);
 >> +        sent += nb_tx;
 >> +    }
 >> +
 >> +    if (!quiet)
 >> +        printf("  worker %u thread done. RX=%zu TX=%zu\n",
 >> +                rte_lcore_id(), received, sent);
 >> +
 >> +    return 0;
 >> +}
 >> +
 >> +/*
 >> + * Parse the coremask given as argument (hexadecimal string) and fill
 >> + * the global configuration (core role and core count) with the parsed
 >> + * value.
 >> + */
 >> +static int xdigit2val(unsigned char c)
 >
 > multiple instance of "xdigit2val" in DPDK repo. May be we can push this
 > as common code.

Sure, that's something we can look at in a separate patch, now that it's 
being used more and more.

 >
 >> +{
 >> +    int val;
 >> +
 >> +    if (isdigit(c))
 >> +        val = c - '0';
 >> +    else if (isupper(c))
 >> +        val = c - 'A' + 10;
 >> +    else
 >> +        val = c - 'a' + 10;
 >> +    return val;
 >> +}
 >> +
 >> +
 >> +static void
 >> +usage(void)
 >> +{
 >> +    const char *usage_str =
 >> +        "  Usage: eventdev_demo [options]\n"
 >> +        "  Options:\n"
 >> +        "  -n, --packets=N              Send N packets (default ~32M), 0 implies no limit\n"
 >> +        "  -f, --atomic-flows=N         Use N random flows from 1 to N (default 16)\n"
 >
 > I think, this parameter now, effects the application fast path code.I think,
 > it should eventdev configuration parameter.

See above comment on num_fids

 >
 >> +        "  -s, --num_stages=N           Use N atomic stages (default 1)\n"
 >> +        "  -r, --rx-mask=core mask      Run NIC rx on CPUs in core mask\n"
 >> +        "  -w, --worker-mask=core mask  Run worker on CPUs in core mask\n"
 >> +        "  -t, --tx-mask=core mask      Run NIC tx on CPUs in core mask\n"
 >> +        "  -e  --sched-mask=core mask   Run scheduler on CPUs in core mask\n"
 >> +        "  -c  --cq-depth=N             Worker CQ depth (default 16)\n"
 >> +        "  -W  --work-cycles=N          Worker cycles (default 0)\n"
 >> +        "  -P  --queue-priority         Enable scheduler queue prioritization\n"
 >> +        "  -o, --ordered                Use ordered scheduling\n"
 >> +        "  -p, --parallel               Use parallel scheduling\n"
 >
 > IMO, all stage being "parallel" or "ordered" or "atomic" is one mode of
 > operation. It is valid have to any combination. We need to express that in
 > command like
 > example:
 > 3 stage with
 > O->A->P

How about we add an option that specifies the mode of operation for each 
stage in a string? Maybe have a '-m' option (modes), e.g. '-m appo' for 4 
stages with atomic, parallel, parallel, ordered. Or maybe reuse your 
test-eventdev parameter style?
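
To make the '-m' idea concrete, a minimal sketch of parsing such a mode string follows. The option does not exist in the patch; the function name, the enum, and the letter mapping (a, o, p) are all assumptions for illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_NUM_STAGES 8

/* Hypothetical per-stage modes; not DPDK's RTE_SCHED_TYPE_* values. */
enum demo_stage_mode { DEMO_MODE_ATOMIC, DEMO_MODE_ORDERED, DEMO_MODE_PARALLEL };

/*
 * Parse a string such as "appo", one letter per stage.
 * Returns the number of stages, or -1 if the string is empty,
 * too long, or contains an unknown letter.
 */
static int
parse_stage_modes(const char *arg, enum demo_stage_mode modes[MAX_NUM_STAGES])
{
	size_t i, len;

	assert(arg != NULL && modes != NULL);
	len = strlen(arg);
	if (len == 0 || len > MAX_NUM_STAGES)
		return -1;

	for (i = 0; i < len; i++) {
		switch (arg[i]) {
		case 'a':
			modes[i] = DEMO_MODE_ATOMIC;
			break;
		case 'o':
			modes[i] = DEMO_MODE_ORDERED;
			break;
		case 'p':
			modes[i] = DEMO_MODE_PARALLEL;
			break;
		default:
			return -1;
		}
	}
	return (int)len;
}
```

With this shape the string length would also set the number of stages, so a separate stage count option would no longer be needed for mixed pipelines.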

 >
 >> +        "  -q, --quiet                  Minimize printed output\n"
 >> +        "  -D, --dump                   Print detailed statistics before exit"
 >> +        "\n";
 >> +    fprintf(stderr, "%s", usage_str);
 >> +    exit(1);
 >> +}
 >> +
 >
 > [...]
 >
 >> +            rx_single = (popcnt == 1);
 >> +            break;
 >> +        case 't':
 >> +            tx_lcore_mask = parse_coremask(optarg);
 >> +            popcnt = __builtin_popcountll(tx_lcore_mask);
 >> +            tx_single = (popcnt == 1);
 >> +            break;
 >> +        case 'e':
 >> +            sched_lcore_mask = parse_coremask(optarg);
 >> +            popcnt = __builtin_popcountll(sched_lcore_mask);
 >> +            sched_single = (popcnt == 1);
 >> +            break;
 >> +        default:
 >> +            usage();
 >> +        }
 >> +    }
 >> +
 >> +    if (worker_lcore_mask == 0 || rx_lcore_mask == 0 ||
 >> +        sched_lcore_mask == 0 || tx_lcore_mask == 0) {
 >
 > Need to honor RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED i.e sched_lcore_mask
 > is zero can be valid case.

I'll separate this out into a check for RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED.
That needs to happen later, in eventdev_setup(), after the eventdev instance is created.
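
A sketch of how the split check might look once the device capabilities are known. The capability bit below is a stand-in for RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED and the function name is hypothetical; this is not the V2 code.

```c
#include <stdbool.h>
#include <stdint.h>

/* Stand-in for the real capability flag; the bit position is illustrative. */
#define DEMO_CAP_DISTRIBUTED_SCHED (UINT32_C(1) << 0)

/*
 * rx, tx and worker masks must always be non-zero. The scheduler mask may be
 * zero only when the device schedules on its own (distributed scheduling).
 */
static bool
core_masks_valid(uint64_t rx, uint64_t tx, uint64_t worker, uint64_t sched,
		uint32_t dev_caps)
{
	if (rx == 0 || tx == 0 || worker == 0)
		return false;
	if (dev_caps & DEMO_CAP_DISTRIBUTED_SCHED)
		return true;
	return sched != 0;
}
```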

 >
 >> +        printf("Core part of pipeline was not assigned any cores. "
 >> +            "This will stall the pipeline, please check core masks "
 >> +            "(use -h for details on setting core masks):\n"
 >> +            "\trx: %"PRIu64"\n\ttx: %"PRIu64"\n\tsched: %"PRIu64
 >> +            "\n\tworkers: %"PRIu64"\n",
 >> +            rx_lcore_mask, tx_lcore_mask, sched_lcore_mask,
 >> +            worker_lcore_mask);
 >> +        rte_exit(-1, "Fix core masks\n");
 >> +    }
 >> +    if (num_stages == 0 || num_stages > MAX_NUM_STAGES)
 >> +        usage();
 >> +
 >> +    for (i = 0; i < MAX_NUM_CORE; i++) {
 >> +        rx_core[i] = !!(rx_lcore_mask & (1UL << i));
 >> +        tx_core[i] = !!(tx_lcore_mask & (1UL << i));
 >> +        sched_core[i] = !!(sched_lcore_mask & (1UL << i));
 >> +        worker_core[i] = !!(worker_lcore_mask & (1UL << i));
 >> +
 >> +        if (worker_core[i])
 >> +            num_workers++;
 >> +        if (core_in_use(i))
 >> +            active_cores++;
 >> +    }
 >> +}
 >> +
 >> +
 >> +struct port_link {
 >> +    uint8_t queue_id;
 >> +    uint8_t priority;
 >> +};
 >> +
 >> +static int
 >> +setup_eventdev(struct prod_data *prod_data,
 >> +        struct cons_data *cons_data,
 >> +        struct worker_data *worker_data)
 >> +{
 >> +    const uint8_t dev_id = 0;
 >> +    /* +1 stages is for a SINGLE_LINK TX stage */
 >> +    const uint8_t nb_queues = num_stages + 1;
 >> +    /* + 2 is one port for producer and one for consumer */
 >> +    const uint8_t nb_ports = num_workers + 2;
 >
 > selection of number of ports is a function of rte_event_has_producer().
 > I think, it will be addressed with RFC.
 >
 >> +    const struct rte_event_dev_config config = {
 >> +            .nb_event_queues = nb_queues,
 >> +            .nb_event_ports = nb_ports,
 >> +            .nb_events_limit  = 4096,
 >> +            .nb_event_queue_flows = 1024,
 >> +            .nb_event_port_dequeue_depth = 128,
 >> +            .nb_event_port_enqueue_depth = 128,
 >
 > OCTEONTX PMD driver has .nb_event_port_dequeue_depth = 1 and
 > .nb_event_port_enqueue_depth = 1 and  struct rte_event_dev_info.min_dequeue_timeout_ns
 > = 853 value.
 > I think, we need to check the rte_event_dev_info_get() first to get the sane
 > values and take RTE_MIN or RTE_MAX based on the use case.
 >
 > or
 >
 > I can ignore this value in OCTEONTX PMD. But I am not sure NXP case,
 > Any thoughts from NXP folks.
 >
 >

Added code to do the relevant checks. It should now limit the 
enqueue/dequeue depths to the configured depth for the port if it's less.
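
The idea, sketched standalone: take the requested depths and reduce them to whatever the device reports as its maximum. The structs below are simplified stand-ins (the real limits would come from rte_event_dev_info_get()), and the names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified stand-ins for the device limits and the port configuration. */
struct demo_dev_limits {
	uint32_t max_dequeue_depth;
	uint32_t max_enqueue_depth;
};

struct demo_port_conf {
	uint32_t dequeue_depth;
	uint32_t enqueue_depth;
};

/* Reduce the requested depths to the device maximum where they exceed it. */
static void
clamp_port_conf(struct demo_port_conf *conf, const struct demo_dev_limits *lim)
{
	assert(conf != NULL && lim != NULL);
	if (conf->dequeue_depth > lim->max_dequeue_depth)
		conf->dequeue_depth = lim->max_dequeue_depth;
	if (conf->enqueue_depth > lim->max_enqueue_depth)
		conf->enqueue_depth = lim->max_enqueue_depth;
}
```

For a device that reports a depth of 1 in both directions, as described for the OCTEONTX PMD above, a request of 128/128 would end up as 1/1.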



 >> +    };
 >> +    const struct rte_event_port_conf wkr_p_conf = {
 >> +            .dequeue_depth = worker_cq_depth,
 >
 > Same as above

See previous comment.

 >
 >> +            .enqueue_depth = 64,
 >
 > Same as above

See previous comment.

 >
 >> +            .new_event_threshold = 4096,
 >> +    };
 >> +    struct rte_event_queue_conf wkr_q_conf = {
 >> +            .event_queue_cfg = queue_type,
 >> +            .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 >> +            .nb_atomic_flows = 1024,
 >> +            .nb_atomic_order_sequences = 1024,
 >> +    };
 >> +    const struct rte_event_port_conf tx_p_conf = {
 >> +            .dequeue_depth = 128,
 >
 > Same as above

See previous comment.

 >> +            .enqueue_depth = 128,
 >
 > Same as above

See previous comment.

 >> +            .new_event_threshold = 4096,
 >> +    };
 >> +    const struct rte_event_queue_conf tx_q_conf = {
 >> +            .priority = RTE_EVENT_DEV_PRIORITY_HIGHEST,
 >> +            .event_queue_cfg =
 >> +                    RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY |
 >> +                    RTE_EVENT_QUEUE_CFG_SINGLE_LINK,
 >> +            .nb_atomic_flows = 1024,
 >> +            .nb_atomic_order_sequences = 1024,
 >> +    };
 >> +
 >> +    struct port_link worker_queues[MAX_NUM_STAGES];
 >> +    struct port_link tx_queue;
 >> +    unsigned i;
 >> +
 >> +    int ret, ndev = rte_event_dev_count();
 >> +    if (ndev < 1) {
 >> +        printf("%d: No Eventdev Devices Found\n", __LINE__);
 >> +        return -1;
 >> +    }
 >> +
 >> +    struct rte_event_dev_info dev_info;
 >> +    ret = rte_event_dev_info_get(dev_id, &dev_info);
 >> +    printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);
 >> +
 >> +    ret = rte_event_dev_configure(dev_id, &config);
 >> +    if (ret < 0)
 >> +        printf("%d: Error configuring device\n", __LINE__);
 >
 > Don't process further with failed configure.
 >

Done.

 >> +
 >> +    /* Q creation - one load balanced per pipeline stage*/
 >> +
 >> +    /* set up one port per worker, linking to all stage queues */
 >> +    for (i = 0; i < num_workers; i++) {
 >> +        struct worker_data *w = &worker_data[i];
 >> +        w->dev_id = dev_id;
 >> +        if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) {
 >> +            printf("Error setting up port %d\n", i);
 >> +            return -1;
 >> +        }
 >> +
 >> +        uint32_t s;
 >> +        for (s = 0; s < num_stages; s++) {
 >> +            if (rte_event_port_link(dev_id, i,
 >> +                        &worker_queues[s].queue_id,
 >> +                        &worker_queues[s].priority,
 >> +                        1) != 1) {
 >> +                printf("%d: error creating link for port %d\n",
 >> +                        __LINE__, i);
 >> +                return -1;
 >> +            }
 >> +        }
 >> +        w->port_id = i;
 >> +    }
 >> +    /* port for consumer, linked to TX queue */
 >> +    if (rte_event_port_setup(dev_id, i, &tx_p_conf) < 0) {
 >
 > If ethdev supports MT txq queue support then this port can be linked to
 > worker too. something to consider for future.
 >

Sure. No change for now.

 >> +        printf("Error setting up port %d\n", i);
 >> +        return -1;
 >> +    }
 >> +    if (rte_event_port_link(dev_id, i, &tx_queue.queue_id,
 >> +                &tx_queue.priority, 1) != 1) {
 >> +        printf("%d: error creating link for port %d\n",
 >> +                __LINE__, i);
 >> +        return -1;
 >> +    }
 >> +    /* port for producer, no links */
 >> +    const struct rte_event_port_conf rx_p_conf = {
 >> +            .dequeue_depth = 8,
 >> +            .enqueue_depth = 8,
 >
 > same as above issue.You could get default config first and configure.
 >

Done. Checking config.nb_event_port_dequeue_depth and reducing if less.

 >> +            .new_event_threshold = 1200,
 >> +    };
 >> +    if (rte_event_port_setup(dev_id, i + 1, &rx_p_conf) < 0) {
 >> +        printf("Error setting up port %d\n", i);
 >> +        return -1;
 >> +    }
 >> +
 >> +    *prod_data = (struct prod_data){.dev_id = dev_id,
 >> +                    .port_id = i + 1,
 >> +                    .qid = qid[0] };
 >> +    *cons_data = (struct cons_data){.dev_id = dev_id,
 >> +                    .port_id = i };
 >> +
 >> +    enqueue_cnt = rte_calloc(0,
 >> +            RTE_CACHE_LINE_SIZE/(sizeof(enqueue_cnt[0])),
 >> +            sizeof(enqueue_cnt[0]), 0);
 >> +    dequeue_cnt = rte_calloc(0,
 >> +            RTE_CACHE_LINE_SIZE/(sizeof(dequeue_cnt[0])),
 >> +            sizeof(dequeue_cnt[0]), 0);
 >
 > Why array? looks like enqueue_cnt[1] and dequeue_cnt[1] not used anywhere.
 >

Looks like there was an intention to extend this further, but all the app 
does is increment the counters without ever reading them. I've removed 
these two vars to clean up.

 >> +
 >> +    if (rte_event_dev_start(dev_id) < 0) {
 >> +        printf("Error starting eventdev\n");
 >> +        return -1;
 >> +    }
 >> +
 >> +    return dev_id;
 >> +}
 >> +
 >> +static void
 >> +signal_handler(int signum)
 >> +{
 >> +    if (done || prod_stop)
 >
 > I think, No one updating the prod_stop
 >

Removed.


 >> +        rte_exit(1, "Exiting on signal %d\n", signum);
 >> +    if (signum == SIGINT || signum == SIGTERM) {
 >> +        printf("\n\nSignal %d received, preparing to exit...\n",
 >> +                signum);
 >> +        done = 1;
 >> +    }
 >> +    if (signum == SIGTSTP)
 >> +        rte_event_dev_dump(0, stdout);
 >> +}
 >> +
 >> +int
 >> +main(int argc, char **argv)
 >
 > [...]
 >
 >> +       RTE_LCORE_FOREACH_SLAVE(lcore_id) {
 >> +               if (lcore_id >= MAX_NUM_CORE)
 >> +                       break;
 >> +
 >> +               if (!rx_core[lcore_id] && !worker_core[lcore_id] &&
 >> +                   !tx_core[lcore_id] && !sched_core[lcore_id])
 >> +                       continue;
 >> +
 >> +               if (rx_core[lcore_id])
 >> +                       printf(
 >> +                                "[%s()] lcore %d executing NIC Rx, and using eventdev port %u\n",
 >> +                                __func__, lcore_id, prod_data.port_id);
 >
 > These prints wont show if rx,tx, scheduler running on master core(as we
 > are browsing through RTE_LCORE_FOREACH_SLAVE)

OK, changed to RTE_LCORE_FOREACH, which also includes the master core.

 >
 >> +
 >> +    if (!quiet) {
 >> +        printf("\nPort Workload distribution:\n");
 >> +        uint32_t i;
 >> +        uint64_t tot_pkts = 0;
 >> +        uint64_t pkts_per_wkr[RTE_MAX_LCORE] = {0};
 >> +        for (i = 0; i < num_workers; i++) {
 >> +            char statname[64];
 >> +            snprintf(statname, sizeof(statname), "port_%u_rx",
 >> +                    worker_data[i].port_id);
 >
 > Please check "port_%u_rx" xstat availability with PMD first.

Added a check after rte_event_dev_xstats_by_name_get() to see if it's 
not -ENOTSUP.
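
A sketch of that kind of check, assuming the error is reported as -ENOTSUP carried in the uint64_t return value, as the reply above describes. The helper names are hypothetical, and the division guard for an empty total is an addition made for illustration.

```c
#include <errno.h>
#include <stdbool.h>
#include <stdint.h>

/* True unless the stat lookup reported "not supported" via its uint64_t return. */
static bool
xstat_supported(uint64_t value)
{
	return value != (uint64_t)-ENOTSUP;
}

/* Percentage of the total handled by one worker; 0 when nothing was counted. */
static double
share_percent(uint64_t part, uint64_t total)
{
	if (total == 0)
		return 0.0;
	return (double)part * 100.0 / (double)total;
}
```

A worker whose stat is not supported would simply be skipped in the distribution printout rather than being added to the total.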

 >> +            pkts_per_wkr[i] = rte_event_dev_xstats_by_name_get(
 >> +                    dev_id, statname, NULL);
 >> +            tot_pkts += pkts_per_wkr[i];
 >> +        }
 >> +        for (i = 0; i < num_workers; i++) {
 >> +            float pc = pkts_per_wkr[i]  * 100 /
 >> +                ((float)tot_pkts);
 >> +            printf("worker %i :\t%.1f %% (%"PRIu64" pkts)\n",
 >> +                    i, pc, pkts_per_wkr[i]);
 >> +        }
 >> +
 >> +    }
 >> +
 >> +    return 0;
 >> +}
 >
 > As final note, considering the different options in fastpath, I was
 > thinking like introducing app/test-eventdev like app/testpmd and have
 > set of function pointers# for different modes like "macswap", "txonly"
 > in testpmd to exercise different options and framework for adding new use
 > cases.I will work on that to check the feasibility.
 >
 > ##
 > struct fwd_engine {
 >         const char       *fwd_mode_name; /**< Forwarding mode name. */
 >         port_fwd_begin_t port_fwd_begin; /**< NULL if nothing special to do. */
 >         port_fwd_end_t   port_fwd_end;   /**< NULL if nothing special to do. */
 >         packet_fwd_t     packet_fwd;     /**< Mandatory. */
 > };
 >
 >> --
 >> 2.7.4
 >>


