[dpdk-dev] [PATCH 1/3] examples/eventdev_pipeline: added sample app

Jerin Jacob jerin.jacob at caviumnetworks.com
Wed May 10 16:12:13 CEST 2017


-----Original Message-----
> Date: Fri, 21 Apr 2017 10:51:37 +0100
> From: Harry van Haaren <harry.van.haaren at intel.com>
> To: dev at dpdk.org
> CC: jerin.jacob at caviumnetworks.com, Harry van Haaren
>  <harry.van.haaren at intel.com>, Gage Eads <gage.eads at intel.com>, Bruce
>  Richardson <bruce.richardson at intel.com>
> Subject: [PATCH 1/3] examples/eventdev_pipeline: added sample app
> X-Mailer: git-send-email 2.7.4
> 
> This commit adds a sample app for the eventdev library.
> The app has been tested with DPDK 17.05-rc2, hence this
> release (or later) is recommended.
> 
> The sample app showcases a pipeline processing use-case,
> with event scheduling and processing defined per stage.
> The application recieves traffic as normal, with each
> packet traversing the pipeline. Once the packet has
> been processed by each of the pipeline stages, it is
> transmitted again.
> 
> The app provides a framework to utilize cores for a single
> role or multiple roles. Examples of roles are the RX core,
> TX core, Scheduling core (in the case of the event/sw PMD),
> and worker cores.
> 
> Various flags are available to configure numbers of stages,
> cycles of work at each stage, type of scheduling, number of
> worker cores, queue depths etc. For a full explaination,
> please refer to the documentation.
> 
> Signed-off-by: Gage Eads <gage.eads at intel.com>
> Signed-off-by: Bruce Richardson <bruce.richardson at intel.com>
> Signed-off-by: Harry van Haaren <harry.van.haaren at intel.com>

Thanks for the example application to share the SW view.
I could make it run on HW after some tweaking (not optimized though).

[...]
> +#define MAX_NUM_STAGES 8
> +#define BATCH_SIZE 16
> +#define MAX_NUM_CORE 64

How about RTE_MAX_LCORE?

> +
> +static unsigned int active_cores;
> +static unsigned int num_workers;
> +static unsigned long num_packets = (1L << 25); /* do ~32M packets */
> +static unsigned int num_fids = 512;
> +static unsigned int num_priorities = 1;

Looks like it's not used.

> +static unsigned int num_stages = 1;
> +static unsigned int worker_cq_depth = 16;
> +static int queue_type = RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY;
> +static int16_t next_qid[MAX_NUM_STAGES+1] = {-1};
> +static int16_t qid[MAX_NUM_STAGES] = {-1};

Moving all fastpath-related variables into a cache-aligned structure
will help.

> +static int worker_cycles;
> +static int enable_queue_priorities;
> +
> +struct prod_data {
> +	uint8_t dev_id;
> +	uint8_t port_id;
> +	int32_t qid;
> +	unsigned num_nic_ports;
> +};

cache aligned ?

> +
> +struct cons_data {
> +	uint8_t dev_id;
> +	uint8_t port_id;
> +};
> +

cache aligned ?

> +static struct prod_data prod_data;
> +static struct cons_data cons_data;
> +
> +struct worker_data {
> +	uint8_t dev_id;
> +	uint8_t port_id;
> +};

cache aligned ?

> +
> +static unsigned *enqueue_cnt;
> +static unsigned *dequeue_cnt;
> +
> +static volatile int done;
> +static volatile int prod_stop;

No one is updating prod_stop.

> +static int quiet;
> +static int dump_dev;
> +static int dump_dev_signal;
> +
> +static uint32_t rx_lock;
> +static uint32_t tx_lock;
> +static uint32_t sched_lock;
> +static bool rx_single;
> +static bool tx_single;
> +static bool sched_single;
> +
> +static unsigned rx_core[MAX_NUM_CORE];
> +static unsigned tx_core[MAX_NUM_CORE];
> +static unsigned sched_core[MAX_NUM_CORE];
> +static unsigned worker_core[MAX_NUM_CORE];
> +
> +static bool
> +core_in_use(unsigned lcore_id) {
> +	return (rx_core[lcore_id] || sched_core[lcore_id] ||
> +		tx_core[lcore_id] || worker_core[lcore_id]);
> +}
> +
> +static struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS];
> +
> +static void
> +rte_eth_tx_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
> +			void *userdata)

IMO, It is better to not use rte_eth_* for application functions.

> +{
> +	int port_id = (uintptr_t) userdata;
> +	unsigned _sent = 0;
> +
> +	do {
> +		/* Note: hard-coded TX queue */
> +		_sent += rte_eth_tx_burst(port_id, 0, &pkts[_sent],
> +					  unsent - _sent);
> +	} while (_sent != unsent);
> +}
> +
> +static int
> +consumer(void)
> +{
> +	const uint64_t freq_khz = rte_get_timer_hz() / 1000;
> +	struct rte_event packets[BATCH_SIZE];
> +
> +	static uint64_t npackets;
> +	static uint64_t received;
> +	static uint64_t received_printed;
> +	static uint64_t time_printed;
> +	static uint64_t start_time;
> +	unsigned i, j;
> +	uint8_t dev_id = cons_data.dev_id;
> +	uint8_t port_id = cons_data.port_id;
> +
> +	if (!npackets)
> +		npackets = num_packets;
> +
> +	do {
> +		uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
> +				packets, RTE_DIM(packets), 0);

		const uint16_t n =
> +
> +		if (n == 0) {
> +			for (j = 0; j < rte_eth_dev_count(); j++)
> +				rte_eth_tx_buffer_flush(j, 0, tx_buf[j]);
> +			return 0;
> +		}
> +		if (start_time == 0)
> +			time_printed = start_time = rte_get_timer_cycles();
> +
> +		received += n;
> +		for (i = 0; i < n; i++) {
> +			uint8_t outport = packets[i].mbuf->port;
> +			rte_eth_tx_buffer(outport, 0, tx_buf[outport],
> +					packets[i].mbuf);
> +		}
> +
> +		if (!quiet && received >= received_printed + (1<<22)) {
> +			const uint64_t now = rte_get_timer_cycles();
> +			const uint64_t delta_cycles = now - start_time;
> +			const uint64_t elapsed_ms = delta_cycles / freq_khz;
> +			const uint64_t interval_ms =
> +					(now - time_printed) / freq_khz;
> +
> +			uint64_t rx_noprint = received - received_printed;
> +			printf("# consumer RX=%"PRIu64", time %"PRIu64
> +				"ms, avg %.3f mpps [current %.3f mpps]\n",
> +					received, elapsed_ms,
> +					(received) / (elapsed_ms * 1000.0),
> +					rx_noprint / (interval_ms * 1000.0));
> +			received_printed = received;
> +			time_printed = now;
> +		}
> +
> +		dequeue_cnt[0] += n;
> +
> +		if (num_packets > 0 && npackets > 0) {
> +			npackets -= n;
> +			if (npackets == 0 || npackets > num_packets)
> +				done = 1;
> +		}

Looks like very complicated logic. I think we can simplify it.

> +	} while (0);

Is do { ... } while (0); really required here?

> +
> +	return 0;
> +}
> +
> +static int
> +producer(void)
> +{
> +	static uint8_t eth_port;
> +	struct rte_mbuf *mbufs[BATCH_SIZE];
> +	struct rte_event ev[BATCH_SIZE];
> +	uint32_t i, num_ports = prod_data.num_nic_ports;
> +	int32_t qid = prod_data.qid;
> +	uint8_t dev_id = prod_data.dev_id;
> +	uint8_t port_id = prod_data.port_id;
> +	uint32_t prio_idx = 0;
> +
> +	const uint16_t nb_rx = rte_eth_rx_burst(eth_port, 0, mbufs, BATCH_SIZE);
> +	if (++eth_port == num_ports)
> +		eth_port = 0;
> +	if (nb_rx == 0) {
> +		rte_pause();
> +		return 0;
> +	}
> +
> +	for (i = 0; i < nb_rx; i++) {
> +		ev[i].flow_id = mbufs[i]->hash.rss;

Prefetching mbufs[i+1] may help here?

> +		ev[i].op = RTE_EVENT_OP_NEW;
> +		ev[i].sched_type = queue_type;

The value of RTE_EVENT_QUEUE_CFG_ORDERED_ONLY != RTE_SCHED_TYPE_ORDERED. So, we
cannot assign .sched_type as queue_type.

I think, one option to avoid the translation in the application is to:
- Remove RTE_EVENT_QUEUE_CFG_ALL_TYPES, RTE_EVENT_QUEUE_CFG_*_ONLY
- Introduce a new RTE_EVENT_DEV_CAP_ flag to denote the
  RTE_EVENT_QUEUE_CFG_ALL_TYPES capability
- Add sched_type in struct rte_event_queue_conf. If the capability flag is
  not set then the implementation takes the sched_type value for the queue.

Any thoughts?


> +		ev[i].queue_id = qid;
> +		ev[i].event_type = RTE_EVENT_TYPE_CPU;

IMO, RTE_EVENT_TYPE_ETHERNET is the better option here as it is
producing the Ethernet packets/events.

> +		ev[i].sub_event_type = 0;
> +		ev[i].priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
> +		ev[i].mbuf = mbufs[i];
> +		RTE_SET_USED(prio_idx);
> +	}
> +
> +	const int nb_tx = rte_event_enqueue_burst(dev_id, port_id, ev, nb_rx);

For the producer pattern, i.e. a burst of RTE_EVENT_OP_NEW, OcteonTX can do a burst
operation unlike the FORWARD case (which is one event at a time). Earlier, I
thought I could abstract the producer pattern in the PMD, but it looks like we
are going with an application-driven producer model based on the latest RFC. So I think
we can add one flag to rte_event_enqueue_burst to denote that all the events
are of type RTE_EVENT_OP_NEW as a hint. The SW driver can ignore this.

I can send a patch for the same.

Any thoughts?


> +	if (nb_tx != nb_rx) {
> +		for (i = nb_tx; i < nb_rx; i++)
> +			rte_pktmbuf_free(mbufs[i]);
> +	}
> +	enqueue_cnt[0] += nb_tx;
> +
> +	if (unlikely(prod_stop))

I think, No one updating the prod_stop

> +		done = 1;
> +
> +	return 0;
> +}
> +
> +static inline void
> +schedule_devices(uint8_t dev_id, unsigned lcore_id)
> +{
> +	if (rx_core[lcore_id] && (rx_single ||
> +	    rte_atomic32_cmpset(&rx_lock, 0, 1))) {

This pattern (rte_atomic32_cmpset) means the application can inject only
"one core" worth of packets. Not enough for low-end cores. Maybe we need
multiple producer options. I think the new RFC is addressing it.

> +		producer();
> +		rte_atomic32_clear((rte_atomic32_t *)&rx_lock);
> +	}
> +
> +	if (sched_core[lcore_id] && (sched_single ||
> +	    rte_atomic32_cmpset(&sched_lock, 0, 1))) {
> +		rte_event_schedule(dev_id);
> +		if (dump_dev_signal) {
> +			rte_event_dev_dump(0, stdout);
> +			dump_dev_signal = 0;
> +		}
> +		rte_atomic32_clear((rte_atomic32_t *)&sched_lock);
> +	}

Lot of unwanted code if RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED set.

I think, we can make the common code compile-time aware and select
runtime workers based on the flag,
i.e.
rte_eal_remote_launch(worker_x, &worker_data[worker_idx], lcore_id);
rte_eal_remote_launch(worker_y, &worker_data[worker_idx], lcore_id);

Maybe we can improve this after the initial version.

> +
> +	if (tx_core[lcore_id] && (tx_single ||
> +	    rte_atomic32_cmpset(&tx_lock, 0, 1))) {
> +		consumer();

Should consumer() need to come in this pattern? I am thinking like
if events is from last stage then call consumer() in worker()

I think, the above scheme works better when the _same_ worker code needs to run in
the cases where
1) ethdev HW is capable to enqueuing the packets to same txq from
  multiple thread
2) ethdev is not capable to do so.

So, The above cases can be addressed in configuration time where we link
the queues to port
case 1) Link all workers to last queue
case 2) Link only worker to last queue

and keeping the common worker code.

HW implementation has functional and performance issue if "two" ports are
assigned to one lcore for dequeue. The above scheme fixes that problem too.

> +		rte_atomic32_clear((rte_atomic32_t *)&tx_lock);
> +	}
> +}
> +
> +static int
> +worker(void *arg)
> +{
> +	struct rte_event events[BATCH_SIZE];
> +
> +	struct worker_data *data = (struct worker_data *)arg;
> +	uint8_t dev_id = data->dev_id;
> +	uint8_t port_id = data->port_id;
> +	size_t sent = 0, received = 0;
> +	unsigned lcore_id = rte_lcore_id();
> +
> +	while (!done) {
> +		uint16_t i;
> +
> +		schedule_devices(dev_id, lcore_id);
> +
> +		if (!worker_core[lcore_id]) {
> +			rte_pause();
> +			continue;
> +		}
> +
> +		uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
> +				events, RTE_DIM(events), 0);
> +
> +		if (nb_rx == 0) {
> +			rte_pause();
> +			continue;
> +		}
> +		received += nb_rx;
> +
> +		for (i = 0; i < nb_rx; i++) {
> +			struct ether_hdr *eth;
> +			struct ether_addr addr;
> +			struct rte_mbuf *m = events[i].mbuf;
> +
> +			/* The first worker stage does classification */
> +			if (events[i].queue_id == qid[0])
> +				events[i].flow_id = m->hash.rss % num_fids;

Not sure why we need to do this (shrinking the flows) in worker() in a queue-based pipeline.
If a PMD has any specific requirement on num_fids, I think we can
move this to the configuration stage, or the PMD can choose an optimum fid internally to
avoid the modulus-operation tax in the fast path in all PMDs.

Does struct rte_event_queue_conf.nb_atomic_flows help here?

> +
> +			events[i].queue_id = next_qid[events[i].queue_id];
> +			events[i].op = RTE_EVENT_OP_FORWARD;

Missing events[i].sched_type. The HW PMD does not work with this.
I think, we can use a similar scheme like next_qid for next_sched_type.

> +
> +			/* change mac addresses on packet (to use mbuf data) */
> +			eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
> +			ether_addr_copy(&eth->d_addr, &addr);
> +			ether_addr_copy(&eth->s_addr, &eth->d_addr);
> +			ether_addr_copy(&addr, &eth->s_addr);

IMO, we can make the packet processing code a "static inline function" so
different worker types can reuse it.

> +
> +			/* do a number of cycles of work per packet */
> +			volatile uint64_t start_tsc = rte_rdtsc();
> +			while (rte_rdtsc() < start_tsc + worker_cycles)
> +				rte_pause();

Ditto.

I think, All worker specific variables like "worker_cycles" can moved into
one structure and use.

> +		}
> +		uint16_t nb_tx = rte_event_enqueue_burst(dev_id, port_id,
> +				events, nb_rx);
> +		while (nb_tx < nb_rx && !done)
> +			nb_tx += rte_event_enqueue_burst(dev_id, port_id,
> +							events + nb_tx,
> +							nb_rx - nb_tx);
> +		sent += nb_tx;
> +	}
> +
> +	if (!quiet)
> +		printf("  worker %u thread done. RX=%zu TX=%zu\n",
> +				rte_lcore_id(), received, sent);
> +
> +	return 0;
> +}
> +
> +/*
> + * Parse the coremask given as argument (hexadecimal string) and fill
> + * the global configuration (core role and core count) with the parsed
> + * value.
> + */
> +static int xdigit2val(unsigned char c)

multiple instance of "xdigit2val" in DPDK repo. May be we can push this
as common code.

> +{
> +	int val;
> +
> +	if (isdigit(c))
> +		val = c - '0';
> +	else if (isupper(c))
> +		val = c - 'A' + 10;
> +	else
> +		val = c - 'a' + 10;
> +	return val;
> +}
> +
> +
> +static void
> +usage(void)
> +{
> +	const char *usage_str =
> +		"  Usage: eventdev_demo [options]\n"
> +		"  Options:\n"
> +		"  -n, --packets=N              Send N packets (default ~32M), 0 implies no limit\n"
> +		"  -f, --atomic-flows=N         Use N random flows from 1 to N (default 16)\n"

I think, this parameter now affects the application fast-path code. I think
it should be an eventdev configuration parameter.

> +		"  -s, --num_stages=N           Use N atomic stages (default 1)\n"
> +		"  -r, --rx-mask=core mask      Run NIC rx on CPUs in core mask\n"
> +		"  -w, --worker-mask=core mask  Run worker on CPUs in core mask\n"
> +		"  -t, --tx-mask=core mask      Run NIC tx on CPUs in core mask\n"
> +		"  -e  --sched-mask=core mask   Run scheduler on CPUs in core mask\n"
> +		"  -c  --cq-depth=N             Worker CQ depth (default 16)\n"
> +		"  -W  --work-cycles=N          Worker cycles (default 0)\n"
> +		"  -P  --queue-priority         Enable scheduler queue prioritization\n"
> +		"  -o, --ordered                Use ordered scheduling\n"
> +		"  -p, --parallel               Use parallel scheduling\n"

IMO, all stages being "parallel" or "ordered" or "atomic" is one mode of
operation. It is valid to have any combination. We need to express that on the
command line,
example:
3 stage with
O->A->P

> +		"  -q, --quiet                  Minimize printed output\n"
> +		"  -D, --dump                   Print detailed statistics before exit"
> +		"\n";
> +	fprintf(stderr, "%s", usage_str);
> +	exit(1);
> +}
> +

[...]

> +			rx_single = (popcnt == 1);
> +			break;
> +		case 't':
> +			tx_lcore_mask = parse_coremask(optarg);
> +			popcnt = __builtin_popcountll(tx_lcore_mask);
> +			tx_single = (popcnt == 1);
> +			break;
> +		case 'e':
> +			sched_lcore_mask = parse_coremask(optarg);
> +			popcnt = __builtin_popcountll(sched_lcore_mask);
> +			sched_single = (popcnt == 1);
> +			break;
> +		default:
> +			usage();
> +		}
> +	}
> +
> +	if (worker_lcore_mask == 0 || rx_lcore_mask == 0 ||
> +	    sched_lcore_mask == 0 || tx_lcore_mask == 0) {

Need to honor RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED i.e sched_lcore_mask
is zero can be valid case.


> +		printf("Core part of pipeline was not assigned any cores. "
> +			"This will stall the pipeline, please check core masks "
> +			"(use -h for details on setting core masks):\n"
> +			"\trx: %"PRIu64"\n\ttx: %"PRIu64"\n\tsched: %"PRIu64
> +			"\n\tworkers: %"PRIu64"\n",
> +			rx_lcore_mask, tx_lcore_mask, sched_lcore_mask,
> +			worker_lcore_mask);
> +		rte_exit(-1, "Fix core masks\n");
> +	}
> +	if (num_stages == 0 || num_stages > MAX_NUM_STAGES)
> +		usage();
> +
> +	for (i = 0; i < MAX_NUM_CORE; i++) {
> +		rx_core[i] = !!(rx_lcore_mask & (1UL << i));
> +		tx_core[i] = !!(tx_lcore_mask & (1UL << i));
> +		sched_core[i] = !!(sched_lcore_mask & (1UL << i));
> +		worker_core[i] = !!(worker_lcore_mask & (1UL << i));
> +
> +		if (worker_core[i])
> +			num_workers++;
> +		if (core_in_use(i))
> +			active_cores++;
> +	}
> +}
> +
> +
> +struct port_link {
> +	uint8_t queue_id;
> +	uint8_t priority;
> +};
> +
> +static int
> +setup_eventdev(struct prod_data *prod_data,
> +		struct cons_data *cons_data,
> +		struct worker_data *worker_data)
> +{
> +	const uint8_t dev_id = 0;
> +	/* +1 stages is for a SINGLE_LINK TX stage */
> +	const uint8_t nb_queues = num_stages + 1;
> +	/* + 2 is one port for producer and one for consumer */
> +	const uint8_t nb_ports = num_workers + 2;

selection of number of ports is a function of rte_event_has_producer().
I think, it will be addressed with RFC.

> +	const struct rte_event_dev_config config = {
> +			.nb_event_queues = nb_queues,
> +			.nb_event_ports = nb_ports,
> +			.nb_events_limit  = 4096,
> +			.nb_event_queue_flows = 1024,
> +			.nb_event_port_dequeue_depth = 128,
> +			.nb_event_port_enqueue_depth = 128,

OCTEONTX PMD driver has .nb_event_port_dequeue_depth = 1 and
.nb_event_port_enqueue_depth = 1 and  struct rte_event_dev_info.min_dequeue_timeout_ns
= 853 value.
I think, we need to check the rte_event_dev_info_get() first to get the sane
values and take RTE_MIN or RTE_MAX based on the use case.

or

I can ignore this value in OCTEONTX PMD. But I am not sure NXP case,
Any thoughts from NXP folks.


> +	};
> +	const struct rte_event_port_conf wkr_p_conf = {
> +			.dequeue_depth = worker_cq_depth,

Same as above

> +			.enqueue_depth = 64,

Same as above

> +			.new_event_threshold = 4096,
> +	};
> +	struct rte_event_queue_conf wkr_q_conf = {
> +			.event_queue_cfg = queue_type,
> +			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
> +			.nb_atomic_flows = 1024,
> +			.nb_atomic_order_sequences = 1024,
> +	};
> +	const struct rte_event_port_conf tx_p_conf = {
> +			.dequeue_depth = 128,

Same as above
> +			.enqueue_depth = 128,

Same as above

> +			.new_event_threshold = 4096,
> +	};
> +	const struct rte_event_queue_conf tx_q_conf = {
> +			.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST,
> +			.event_queue_cfg =
> +					RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY |
> +					RTE_EVENT_QUEUE_CFG_SINGLE_LINK,
> +			.nb_atomic_flows = 1024,
> +			.nb_atomic_order_sequences = 1024,
> +	};
> +
> +	struct port_link worker_queues[MAX_NUM_STAGES];
> +	struct port_link tx_queue;
> +	unsigned i;
> +
> +	int ret, ndev = rte_event_dev_count();
> +	if (ndev < 1) {
> +		printf("%d: No Eventdev Devices Found\n", __LINE__);
> +		return -1;
> +	}
> +
> +	struct rte_event_dev_info dev_info;
> +	ret = rte_event_dev_info_get(dev_id, &dev_info);
> +	printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);
> +
> +	ret = rte_event_dev_configure(dev_id, &config);
> +	if (ret < 0)
> +		printf("%d: Error configuring device\n", __LINE__)

Don't process further with failed configure.

> +
> +	/* Q creation - one load balanced per pipeline stage*/
> +
> +	/* set up one port per worker, linking to all stage queues */
> +	for (i = 0; i < num_workers; i++) {
> +		struct worker_data *w = &worker_data[i];
> +		w->dev_id = dev_id;
> +		if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) {
> +			printf("Error setting up port %d\n", i);
> +			return -1;
> +		}
> +
> +		uint32_t s;
> +		for (s = 0; s < num_stages; s++) {
> +			if (rte_event_port_link(dev_id, i,
> +						&worker_queues[s].queue_id,
> +						&worker_queues[s].priority,
> +						1) != 1) {
> +				printf("%d: error creating link for port %d\n",
> +						__LINE__, i);
> +				return -1;
> +			}
> +		}
> +		w->port_id = i;
> +	}
> +	/* port for consumer, linked to TX queue */
> +	if (rte_event_port_setup(dev_id, i, &tx_p_conf) < 0) {

If ethdev supports MT txq queue support then this port can be linked to
worker too. something to consider for future.

> +		printf("Error setting up port %d\n", i);
> +		return -1;
> +	}
> +	if (rte_event_port_link(dev_id, i, &tx_queue.queue_id,
> +				&tx_queue.priority, 1) != 1) {
> +		printf("%d: error creating link for port %d\n",
> +				__LINE__, i);
> +		return -1;
> +	}
> +	/* port for producer, no links */
> +	const struct rte_event_port_conf rx_p_conf = {
> +			.dequeue_depth = 8,
> +			.enqueue_depth = 8,

Same as the above issue. You could get the default config first and then configure.

> +			.new_event_threshold = 1200,
> +	};
> +	if (rte_event_port_setup(dev_id, i + 1, &rx_p_conf) < 0) {
> +		printf("Error setting up port %d\n", i);
> +		return -1;
> +	}
> +
> +	*prod_data = (struct prod_data){.dev_id = dev_id,
> +					.port_id = i + 1,
> +					.qid = qid[0] };
> +	*cons_data = (struct cons_data){.dev_id = dev_id,
> +					.port_id = i };
> +
> +	enqueue_cnt = rte_calloc(0,
> +			RTE_CACHE_LINE_SIZE/(sizeof(enqueue_cnt[0])),
> +			sizeof(enqueue_cnt[0]), 0);
> +	dequeue_cnt = rte_calloc(0,
> +			RTE_CACHE_LINE_SIZE/(sizeof(dequeue_cnt[0])),
> +			sizeof(dequeue_cnt[0]), 0);

Why array? looks like enqueue_cnt[1] and dequeue_cnt[1] not used anywhere.

> +
> +	if (rte_event_dev_start(dev_id) < 0) {
> +		printf("Error starting eventdev\n");
> +		return -1;
> +	}
> +
> +	return dev_id;
> +}
> +
> +static void
> +signal_handler(int signum)
> +{
> +	if (done || prod_stop)

I think, No one updating the prod_stop

> +		rte_exit(1, "Exiting on signal %d\n", signum);
> +	if (signum == SIGINT || signum == SIGTERM) {
> +		printf("\n\nSignal %d received, preparing to exit...\n",
> +				signum);
> +		done = 1;
> +	}
> +	if (signum == SIGTSTP)
> +		rte_event_dev_dump(0, stdout);
> +}
> +
> +int
> +main(int argc, char **argv)

[...]

> +       RTE_LCORE_FOREACH_SLAVE(lcore_id) {
> +               if (lcore_id >= MAX_NUM_CORE)
> +                       break;
> +
> +               if (!rx_core[lcore_id] && !worker_core[lcore_id] &&
> +                   !tx_core[lcore_id] && !sched_core[lcore_id])
> +                       continue;
> +
> +               if (rx_core[lcore_id])
> +                       printf(
> +                                "[%s()] lcore %d executing NIC Rx, and using eventdev port %u\n",
> +                                __func__, lcore_id, prod_data.port_id);

These prints won't show if rx, tx, or the scheduler is running on the master core (as we
are browsing through RTE_LCORE_FOREACH_SLAVE).

> +
> +	if (!quiet) {
> +		printf("\nPort Workload distribution:\n");
> +		uint32_t i;
> +		uint64_t tot_pkts = 0;
> +		uint64_t pkts_per_wkr[RTE_MAX_LCORE] = {0};
> +		for (i = 0; i < num_workers; i++) {
> +			char statname[64];
> +			snprintf(statname, sizeof(statname), "port_%u_rx",
> +					worker_data[i].port_id);

Please check "port_%u_rx" xstat availability with PMD first.

> +			pkts_per_wkr[i] = rte_event_dev_xstats_by_name_get(
> +					dev_id, statname, NULL);
> +			tot_pkts += pkts_per_wkr[i];
> +		}
> +		for (i = 0; i < num_workers; i++) {
> +			float pc = pkts_per_wkr[i]  * 100 /
> +				((float)tot_pkts);
> +			printf("worker %i :\t%.1f %% (%"PRIu64" pkts)\n",
> +					i, pc, pkts_per_wkr[i]);
> +		}
> +
> +	}
> +
> +	return 0;
> +}

As a final note, considering the different options in the fast path, I was
thinking of introducing app/test-eventdev like app/testpmd and having a
set of function pointers# for different modes like "macswap", "txonly"
in testpmd to exercise different options, plus a framework for adding new use
cases. I will work on that to check the feasibility.

##
struct fwd_engine {
        const char       *fwd_mode_name; /**< Forwarding mode name. */
        port_fwd_begin_t port_fwd_begin; /**< NULL if nothing special to do. */ 
        port_fwd_end_t   port_fwd_end;   /**< NULL if nothing special to do. */ 
        packet_fwd_t     packet_fwd;     /**< Mandatory. */
};

> -- 
> 2.7.4
> 


More information about the dev mailing list