[dpdk-dev] [PATCH 7/7] examples/eventdev_pipeline: adding example

Jerin Jacob jerin.jacob at caviumnetworks.com
Tue Nov 22 07:02:13 CET 2016


On Wed, Nov 16, 2016 at 06:00:07PM +0000, Harry van Haaren wrote:
> This patch adds a sample app to the examples/ directory, which can be used
> as a reference application and for general testing. The application requires
> two ethdev ports and expects traffic to be flowing. The application must be
> run with the --vdev flags as follows to indicate to EAL that a virtual
> eventdev device called "evdev_sw0" is available to be used:
> 
>     ./build/eventdev_pipeline --vdev evdev_sw0
> 
> The general flow of the traffic is as follows:
> 
>     Rx core -> Atomic Queue => 4 worker cores => TX core
> 
> A scheduler core is required to do the packet scheduling, making this
> configuration require 7 cores (Rx, Tx, Scheduler, and 4 workers). Finally
> a master core brings the core count to 8 for this configuration. The

Thanks for the example application. I will try to share my views from an
ethdev integration and usability perspective. Hopefully we can converge.

Some of the high level details first before getting into exact details.

1) From the HW and ethdev integration perspective, integrated NIC controllers
do not need producer core(s) to push the events/packets to an event queue. So
I was thinking of using the 6WIND rte_flow spec to create the "ethdev port to
event queue" wiring by extending the output ACTION definition, so that it
specifies the event queue that packets from a given ethdev port need to be
enqueued to (something you are currently doing in the application).
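
As a rough illustration of the idea (the action type and conf struct below
are assumptions on my side, not part of the current rte_flow proposal):

	/* hypothetical rte_flow action: everything below is made up to
	 * illustrate the wiring, it is not in the proposed spec */
	struct rte_flow_action_event_queue {
		uint8_t event_dev_id;	/* target event device */
		uint8_t event_queue_id;	/* event queue to enqueue to */
		uint8_t sched_type;	/* e.g. RTE_SCHED_TYPE_ORDERED */
	};

	struct rte_flow_action actions[] = {
		{ .type = RTE_FLOW_ACTION_TYPE_EVENT_QUEUE,	/* new action */
		  .conf = &(struct rte_flow_action_event_queue) {
			.event_dev_id = 0,
			.event_queue_id = 0,
			.sched_type = RTE_SCHED_TYPE_ORDERED } },
		{ .type = RTE_FLOW_ACTION_TYPE_END },
	};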

I guess the producer part of this example could be created as common code,
somewhere in rte_flow/ethdev, for reuse. We would also need this scheme when
we deal with the external NICs + HW event manager use case.

The complete event driven model can be verified and exercised without
integrating with the ethdev subsystem. So I think maybe we need to focus on
functional applications without ethdev to verify the eventdev features
(automatic multicore scaling, dynamic load balancing, pipelining, packet
ingress order maintenance and synchronization services) and then integrate
with ethdev.
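
For example, a self-contained functional test could fabricate events instead
of doing ethdev RX; a minimal producer sketch, assuming an mbuf mempool "mp"
created by the test:

	/* enqueue synthetic events so that scheduling, load balancing and
	 * ingress-order maintenance can be verified without any NIC */
	struct rte_mbuf *m = rte_pktmbuf_alloc(mp);
	struct rte_event ev = {
		.queue_id = qid,
		.flow_id = seqno % num_fids,
		.sched_type = RTE_SCHED_TYPE_ORDERED,
		.operation = RTE_EVENT_OP_NEW,
	};
	m->udata64 = seqno++;	/* sequence number to verify ordering */
	ev.mbuf = m;
	rte_event_enqueue_burst(event_dev_id, event_port_id, &ev, 1, false);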

> +	const unsigned cores_needed = num_workers +
> +			/*main*/1 +
> +			/*sched*/1 +
> +			/*TX*/1 +
> +			/*RX*/1;
> +

2) One of the prime aims of the event driven model is to remove fixed
function core mappings and enable automatic multicore scaling, dynamic load
balancing, etc. I will use an example in the review section below to show a
method for removing the "consumer core" in this case.

> application can be configured for various numbers of flows and worker
> cores. Run the application with -h for details.
> 
> Signed-off-by: Gage Eads <gage.eads at intel.com>
> Signed-off-by: Bruce Richardson <bruce.richardson at intel.com>
> Signed-off-by: Harry van Haaren <harry.van.haaren at intel.com>
> ---
>  examples/eventdev_pipeline/Makefile |  49 +++
>  examples/eventdev_pipeline/main.c   | 718 ++++++++++++++++++++++++++++++++++++
>  2 files changed, 767 insertions(+)
>  create mode 100644 examples/eventdev_pipeline/Makefile
>  create mode 100644 examples/eventdev_pipeline/main.c
> 
> +static int sched_type = RTE_SCHED_TYPE_ATOMIC;

RTE_SCHED_TYPE_ORDERED makes sense as a default. The most common case will
have ORDERED at the first stage so that it can scale.
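
i.e.:

	static int sched_type = RTE_SCHED_TYPE_ORDERED;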

> +
> +
> +static int
> +worker(void *arg)
> +{
> +	struct rte_event rcv_events[BATCH_SIZE];
> +
> +	struct worker_data *data = (struct worker_data *)arg;
> +	uint8_t event_dev_id = data->event_dev_id;
> +	uint8_t event_port_id = data->event_port_id;
> +	int32_t qid = data->qid;
> +	size_t sent = 0, received = 0;
> +
> +	while (!done) {
> +		uint16_t i;
> +
> +		uint16_t n = rte_event_dequeue_burst(event_dev_id,
> +						     event_port_id,
> +						     rcv_events,
> +						     RTE_DIM(rcv_events),
> +						     false);
> +		if (n == 0){
> +			rte_pause();
> +			/* Flush any buffered events */
> +			rte_event_dequeue(event_dev_id,
> +					  event_port_id,
> +					  NULL,
> +					  false);

The above can be done in the implementation. It may not be a candidate for
common code.

> +			continue;
> +		}
> +		received += n;
> +
> +		for (i = 0; i < n; i++) {
> +			struct ether_hdr *eth;
> +			struct ether_addr addr;
> +			struct rte_event *ev = &rcv_events[i];
> +
> +			ev->queue_id = qid;
> +			ev->flow_id = 0;

Another way to deal without the additional consumer core (it creates issues
in scaling and load balancing) is:

in worker:
while (1) {

	ev = dequeue(port);

	// stage 1 app processing
	if (ev.event_type == RTE_EVENT_TYPE_ETHDEV) {
		// identify the ethernet port and tx queue the packet
		// needs to go to, and create the flow based on that
		ev.flow_id = flow(port_id, tx_queue_id);
		ev.sched_type = RTE_SCHED_TYPE_ATOMIC;
		ev.operation = RTE_EVENT_OP_FORWARD;
		ev.event_type = RTE_EVENT_TYPE_CORE;
	// stage 2 app processing
	} else if (ev.event_type == RTE_EVENT_TYPE_CORE) {
		port_id = function_of(ev.flow_id);	// see stage 1 processing
		tx_queue_id = function_of(ev.flow_id);	// see stage 1 processing
		// remaining ethdev based tx is the same as yours
	}
	enqueue(ev);
}



> +			ev->priority = 0;
> +			ev->sched_type = RTE_SCHED_TYPE_ATOMIC;
> +			ev->operation = RTE_EVENT_OP_FORWARD;
> +
> +			uint64_t now = rte_rdtsc();
> +			while(now + 750 > rte_rdtsc()) {}

Why the delay?

> +
> +			/* change mac addresses on packet */
> +			eth = rte_pktmbuf_mtod(ev->mbuf, struct ether_hdr *);
> +			ether_addr_copy(&eth->d_addr, &addr);
> +			ether_addr_copy(&eth->s_addr, &eth->d_addr);
> +			ether_addr_copy(&addr, &eth->s_addr);
> +		}
> +		int ret = rte_event_enqueue_burst(event_dev_id, event_port_id,
> +					rcv_events, n, false);
> +		if (ret != n)
> +			rte_panic("worker %u thread failed to enqueue event\n",
> +				rte_lcore_id());
> +	}
> +
> +	/* Flush the buffered events */
> +	rte_event_dequeue(event_dev_id, event_port_id, NULL, false);
> +
> +	if (!quiet)
> +		printf("  worker %u thread done. RX=%zu TX=%zu\n",
> +				rte_lcore_id(), received, sent);
> +
> +	return 0;
> +}
> +
> +static int
> +scheduler(void *arg)
> +{

Maybe better to abstract this as a "service core" or something similar, as I
mentioned earlier, since a HW implementation does not need this.
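
For instance (the capability flag below is a made-up name to illustrate the
idea; it would need to be added to the spec):

	/* launch the SW scheduler core only when the PMD cannot schedule
	 * internally; RTE_EVENT_DEV_CAP_INTERNAL_SCHED is an assumed flag */
	struct rte_event_dev_info info;
	rte_event_dev_info_get(event_dev_id, &info);
	if (!(info.event_dev_cap & RTE_EVENT_DEV_CAP_INTERNAL_SCHED))
		rte_eal_remote_launch(scheduler, NULL, sched_lcore_id);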

> +	RTE_SET_USED(arg);
> +	size_t loops = 0;
> +
> +	while (!done) {
> +		/* Assumes an event dev ID of 0 */
> +		rte_event_schedule(0);
> +		loops++;
> +	}
> +
> +	printf("  scheduler thread done. loops=%zu\n", loops);
> +
> +	return 0;
> +}
> +
> +
> +static int
> +producer(void *arg)
> +{
> +
> +	struct prod_data *data = (struct prod_data *)arg;
> +	size_t npackets = num_packets;
> +	unsigned i;
> +	uint64_t mbuf_seqno = 0;
> +	size_t sent = 0;
> +	uint8_t eth_port = 0;
> +	uint8_t event_dev_id = data->event_dev_id;
> +	uint8_t event_port_id = data->event_port_id;
> +	int fid_counter = 0;
> +
> +	while (!done) {
> +		int ret;
> +		unsigned num_ports = data->num_ports;
> +		int32_t qid = data->qid;
> +		struct rte_event events[BATCH_SIZE];
> +		struct rte_mbuf *mbufs[BATCH_SIZE];
> +
> +		uint16_t nb_rx = rte_eth_rx_burst(eth_port, 0, mbufs, BATCH_SIZE);
> +		if (++eth_port == num_ports)
> +			eth_port = 0;
> +		if (nb_rx == 0) {
> +			rte_pause();
> +			/* Flush any buffered events */
> +			rte_event_dequeue(event_dev_id,
> +					  event_port_id,
> +					  NULL,
> +					  false);
> +			continue;
> +		}
> +
> +		for (i = 0; i < nb_rx; i++) {
> +			struct rte_mbuf *m = mbufs[i];
> +			struct rte_event *ev = &events[i];
> +
> +			ev->queue_id = qid;
> +			ev->flow_id = fid_counter++ % 6;

To me, flow_id should be a function of the port_id and the rx queue number
here, right?
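
e.g. something like:

	/* derive the flow from where the packet arrived rather than a
	 * running counter; assumes fewer than 256 rx queues per port */
	ev->flow_id = ((uint32_t)eth_port << 8) | rx_queue_id;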

> +			ev->priority = 0;
> +			m->udata64 = mbuf_seqno++;

Why update mbuf_seqno++ here? Shouldn't that be something done inside the
implementation?

> +			ev->mbuf = m;
> +			ev->sched_type = sched_type;
> +			ev->operation = RTE_EVENT_OP_NEW;
> +		}
> +
> +		do {
> +			ret = rte_event_enqueue_burst(event_dev_id,
> +							event_port_id,
> +							events,
> +							nb_rx,
> +							false);
> +		} while (ret == -ENOSPC);

I guess -ENOSPC can be checked inside the implementation, and we can pass the
info required at the configuration stage to decide the timeout. It may not be
a candidate for common code.
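
For example, an enqueue-side analogue of dequeue_wait_ns could be passed at
configure time (the field below does not exist in the spec today):

	/* made-up field: tell the PMD how long to retry on back-pressure
	 * before returning -ENOSPC to the application */
	config.enqueue_wait_ns = 1000;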

> +		if (ret != nb_rx)
> +			rte_panic("producer thread failed to enqueue *all* events\n");
> +
> +		sent += nb_rx;
> +
> +		if (num_packets > 0 && npackets > 0) {
> +			npackets -= nb_rx;
> +			if (npackets == 0)
> +				break;
> +		}
> +	}
> +
> +	/* Flush any buffered events */
> +	while (!done)
> +		rte_event_dequeue(event_dev_id, event_port_id, NULL, false);
> +
> +	printf("  prod thread done! TX=%zu across %u flows\n", sent, num_fids);
> +
> +	return 0;
> +}
> +

> +static uint8_t
> +setup_event_dev(struct prod_data *prod_data,
> +		struct cons_data *cons_data,
> +		struct worker_data *worker_data)
> +{
> +	config.nb_events_limit = 256;

In a real application, we may need to pass this as a command line argument.
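
e.g. a minimal sketch (the "-e" option letter is an arbitrary choice):

	/* sketch: take nb_events_limit from the command line,
	 * using getopt() from <unistd.h> */
	int opt;
	while ((opt = getopt(argc, argv, "e:")) != -1) {
		switch (opt) {
		case 'e':
			config.nb_events_limit = atoi(optarg);
			break;
		}
	}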

> +	config.dequeue_wait_ns = 0;
> +
> +	ret = rte_event_dev_configure(id, &config);
> +	if (ret)
> +		rte_panic("Failed to configure the event dev\n");
> +
> +	/* Create queues */
> +	queue_config.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY;
> +	queue_config.priority = 0;
> +
> +	qid0 = 0;
> +	ret = rte_event_queue_setup(id, qid0, &queue_config);
> +	if (ret < 0)
> +		rte_panic("Failed to create the scheduled QID\n");
> +
> +	queue_config.event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_CONSUMER;
> +	queue_config.priority = 0;
> +
> +	cons_qid = 1;
> +	ret = rte_event_queue_setup(id, cons_qid, &queue_config);
> +	if (ret < 0)
> +		rte_panic("Failed to create the cons directed QID\n");
> +
> +	queue_config.event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_CONSUMER;

I guess this is more of a RTE_EVENT_QUEUE_CFG_SINGLE_PRODUCER case. Does it
make sense to add RTE_EVENT_QUEUE_CFG_SINGLE_PRODUCER to the spec, if you are
enqueueing only through that port? See the next comment.
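
i.e. (RTE_EVENT_QUEUE_CFG_SINGLE_PRODUCER being the proposed addition, it is
not in the spec today):

	queue_config.event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_PRODUCER;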

> +	queue_config.priority = 0;
> +
> +	prod_qid = 2;
> +	ret = rte_event_queue_setup(id, prod_qid, &queue_config);
> +	if (ret < 0)
> +		rte_panic("Failed to create the prod directed QID\n");
> +

Looks like prod_qid is just created as a dummy; the actual producer is
enqueuing on qid0. Something is not adding up.

> +	/* Create ports */
> +#define LB_PORT_DEPTH 16
> +#define DIR_PORT_DEPTH 32
> +	port_config.enqueue_queue_depth = LB_PORT_DEPTH;
> +	port_config.dequeue_queue_depth = LB_PORT_DEPTH;

We need to check the info->max_enqueue_queue_depth.
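
i.e. something like (clamping is just one option, failing hard is another):

	/* validate/clamp the requested depth against the device limit */
	struct rte_event_dev_info info;
	rte_event_dev_info_get(id, &info);
	if (port_config.enqueue_queue_depth > info.max_enqueue_queue_depth)
		port_config.enqueue_queue_depth = info.max_enqueue_queue_depth;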

Jerin


