[dpdk-dev] [PATCH v6 02/12] event/opdl: add opdl PMD main body and helper function

Liang Ma liang.j.ma at intel.com
Tue Jan 9 15:18:51 CET 2018


This commit adds an OPDL implementation of the eventdev API. The
implementation here is intended to enable the community to use
the OPDL infrastructure under the eventdev API.

The main components of the implementation are four files:
  - opdl_evdev.c              creation, configuration, etc.
  - opdl_evdev_xstats.c       helper functions to support stats collection
  - opdl_evdev.h              the main data structures of the opdl
                              device and all the function prototypes
                              that need to be exposed to support the
                              eventdev API
  - opdl_evdev_init.c         implements all the initialization helper
                              functions

This commit only adds the implementation; no existing DPDK files
are modified.
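
The device can be instantiated like any other vdev, either with the
EAL --vdev option or programmatically. A minimal sketch (the instance
name and argument values are illustrative; the arguments are the ones
parsed in opdl_probe() below):

  #include <rte_bus_vdev.h>

  /* equivalent to the EAL option:
   *   --vdev="event_opdl0,numa_node=0,do_validation=1,self_test=0"
   */
  int ret = rte_vdev_init("event_opdl0",
                          "numa_node=0,do_validation=1,self_test=0");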

Signed-off-by: Liang Ma <liang.j.ma at intel.com>
Signed-off-by: Peter Mccarthy <peter.mccarthy at intel.com>
---
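A note for reviewers: below is a rough sketch of how an application is
expected to drive this PMD through the standard eventdev API once the
device has been probed. The queue/port counts are illustrative; the
commented limits come from opdl_evdev.h in this patch. Queue/port setup
and linking (rte_event_queue_setup(), rte_event_port_setup(),
rte_event_port_link()) are elided here; opdl_start() validates the
resulting topology when the device is started.

  #include <rte_eventdev.h>

  int dev_id = rte_event_dev_get_dev_id("event_opdl0");
  struct rte_event_dev_config config = {
          .nb_event_queues = 2,
          .nb_event_ports = 4,
          .nb_events_limit = 4096,            /* OPDL_INFLIGHT_EVENTS_TOTAL */
          .nb_event_queue_flows = 1024,       /* OPDL_QID_NUM_FIDS */
          .nb_event_port_dequeue_depth = 128, /* MAX_OPDL_CONS_Q_DEPTH */
          .nb_event_port_enqueue_depth = 128,
          .dequeue_timeout_ns = 0,
          .event_dev_cfg = 0, /* PER_DEQUEUE_TIMEOUT is rejected */
  };
  int err = rte_event_dev_configure(dev_id, &config);
  if (!err)
          err = rte_event_dev_start(dev_id);
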
 drivers/event/opdl/Makefile            |   3 +
 drivers/event/opdl/opdl_evdev.c        | 440 ++++++++++++++++
 drivers/event/opdl/opdl_evdev.h        | 341 ++++++++++++
 drivers/event/opdl/opdl_evdev_init.c   | 936 +++++++++++++++++++++++++++++++++
 drivers/event/opdl/opdl_evdev_xstats.c | 178 +++++++
 5 files changed, 1898 insertions(+)
 create mode 100644 drivers/event/opdl/opdl_evdev.c
 create mode 100644 drivers/event/opdl/opdl_evdev.h
 create mode 100644 drivers/event/opdl/opdl_evdev_init.c
 create mode 100644 drivers/event/opdl/opdl_evdev_xstats.c
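
Also for reviewers: with do_validation=1, the per-port stats added in
opdl_evdev_xstats.c can be read back through the generic xstats API.
A small sketch for port 0, reusing dev_id from the sketch above
(buffer sizes are illustrative; the calls return -ENOTSUP when
validation is disabled):

  #include <stdio.h>
  #include <inttypes.h>
  #include <rte_eventdev.h>

  struct rte_event_dev_xstats_name names[8];
  unsigned int ids[8];
  uint64_t values[8];

  /* five stats per port: claim_pkts_requested .. total_cycles */
  int n = rte_event_dev_xstats_names_get(dev_id,
                  RTE_EVENT_DEV_XSTATS_PORT, 0, names, ids, 8);
  if (n > 0 && rte_event_dev_xstats_get(dev_id,
                  RTE_EVENT_DEV_XSTATS_PORT, 0, ids, values, n) == n) {
          for (int i = 0; i < n; i++)
                  printf("%s: %" PRIu64 "\n", names[i].name, values[i]);
  }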

diff --git a/drivers/event/opdl/Makefile b/drivers/event/opdl/Makefile
index 0c975b4..b90214f 100644
--- a/drivers/event/opdl/Makefile
+++ b/drivers/event/opdl/Makefile
@@ -28,6 +28,9 @@ EXPORT_MAP := rte_pmd_evdev_opdl_version.map
 
 # library source files
 SRCS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += opdl_ring.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += opdl_evdev.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += opdl_evdev_init.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += opdl_evdev_xstats.c
 
 # export include files
 SYMLINK-y-include +=
diff --git a/drivers/event/opdl/opdl_evdev.c b/drivers/event/opdl/opdl_evdev.c
new file mode 100644
index 0000000..cad000a
--- /dev/null
+++ b/drivers/event/opdl/opdl_evdev.c
@@ -0,0 +1,440 @@
+/*-
+ * SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2010-2014 Intel Corporation
+ */
+
+#include <inttypes.h>
+#include <string.h>
+
+#include <rte_bus_vdev.h>
+#include <rte_lcore.h>
+#include <rte_memzone.h>
+#include <rte_kvargs.h>
+#include <rte_errno.h>
+#include <rte_cycles.h>
+
+#include "opdl_evdev.h"
+#include "opdl_ring.h"
+#include "opdl_log.h"
+
+#define EVENTDEV_NAME_OPDL_PMD event_opdl
+#define NUMA_NODE_ARG "numa_node"
+#define DO_VALIDATION_ARG "do_validation"
+#define DO_TEST_ARG "self_test"
+
+
+static void
+opdl_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info);
+
+
+
+
+static int
+opdl_dev_configure(const struct rte_eventdev *dev)
+{
+	struct opdl_evdev *opdl = opdl_pmd_priv(dev);
+	const struct rte_eventdev_data *data = dev->data;
+	const struct rte_event_dev_config *conf = &data->dev_conf;
+
+	opdl->max_queue_nb = conf->nb_event_queues;
+	opdl->max_port_nb = conf->nb_event_ports;
+	opdl->nb_events_limit = conf->nb_events_limit;
+
+	if (conf->event_dev_cfg & RTE_EVENT_DEV_CFG_PER_DEQUEUE_TIMEOUT) {
+		PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
+			     "DEQUEUE_TIMEOUT not supported\n",
+			     dev->data->dev_id);
+		return -ENOTSUP;
+	}
+
+	return 0;
+}
+
+static void
+opdl_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info)
+{
+	RTE_SET_USED(dev);
+
+	static const struct rte_event_dev_info evdev_opdl_info = {
+		.driver_name = OPDL_PMD_NAME,
+		.max_event_queues = RTE_EVENT_MAX_QUEUES_PER_DEV,
+		.max_event_queue_flows = OPDL_QID_NUM_FIDS,
+		.max_event_queue_priority_levels = OPDL_Q_PRIORITY_MAX,
+		.max_event_priority_levels = OPDL_IQS_MAX,
+		.max_event_ports = OPDL_PORTS_MAX,
+		.max_event_port_dequeue_depth = MAX_OPDL_CONS_Q_DEPTH,
+		.max_event_port_enqueue_depth = MAX_OPDL_CONS_Q_DEPTH,
+		.max_num_events = OPDL_INFLIGHT_EVENTS_TOTAL,
+		.event_dev_cap = RTE_EVENT_DEV_CAP_BURST_MODE,
+	};
+
+	*info = evdev_opdl_info;
+}
+
+static void
+opdl_dump(struct rte_eventdev *dev, FILE *f)
+{
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	if (!device->do_validation)
+		return;
+
+	fprintf(f,
+		"\n\n -- RING STATISTICS --\n");
+
+	for (uint32_t i = 0; i < device->nb_opdls; i++)
+		opdl_ring_dump(device->opdl[i], f);
+
+	fprintf(f,
+		"\n\n -- PORT STATISTICS --\n"
+		"Type Port Index  Port Id  Queue Id     Av. Req Size  "
+		"Av. Grant Size     Av. Cycles PP"
+		"      Empty DEQs   Non Empty DEQs   Pkts Processed\n");
+
+	for (uint32_t i = 0; i < device->max_port_nb; i++) {
+		char queue_id[64];
+		char total_cyc[64];
+		const char *p_type;
+
+		uint64_t cne, cpg;
+		struct opdl_port *port = &device->ports[i];
+
+		if (port->initialized) {
+			cne = port->port_stat[claim_non_empty];
+			cpg = port->port_stat[claim_pkts_granted];
+			if (port->p_type == OPDL_REGULAR_PORT)
+				p_type = "REG";
+			else if (port->p_type == OPDL_PURE_RX_PORT)
+				p_type = "  RX";
+			else if (port->p_type == OPDL_PURE_TX_PORT)
+				p_type = "  TX";
+			else if (port->p_type == OPDL_ASYNC_PORT)
+				p_type = "SYNC";
+			else
+				p_type = "????";
+
+			sprintf(queue_id, "%02u", port->external_qid);
+			if (port->p_type == OPDL_REGULAR_PORT ||
+					port->p_type == OPDL_ASYNC_PORT)
+				sprintf(total_cyc,
+					" %'16"PRIu64"",
+					(cpg != 0 ?
+					 port->port_stat[total_cycles] / cpg
+					 : 0));
+			else
+				sprintf(total_cyc,
+					"             ----");
+			fprintf(f,
+				"%4s %10u %8u %9s %'16"PRIu64" %'16"PRIu64" %s "
+				"%'16"PRIu64" %'16"PRIu64" %'16"PRIu64"\n",
+				p_type,
+				i,
+				port->id,
+				(port->external_qid == OPDL_INVALID_QID ? "---"
+				 : queue_id),
+				(cne != 0 ?
+				 port->port_stat[claim_pkts_requested] / cne
+				 : 0),
+				(cne != 0 ?
+				 port->port_stat[claim_pkts_granted] / cne
+				 : 0),
+				total_cyc,
+				port->port_stat[claim_empty],
+				port->port_stat[claim_non_empty],
+				port->port_stat[claim_pkts_granted]);
+		}
+	}
+	fprintf(f, "\n");
+}
+
+
+static void
+opdl_stop(struct rte_eventdev *dev)
+{
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	opdl_xstats_uninit(dev);
+
+	destroy_queues_and_rings(dev);
+
+
+	device->started = 0;
+
+	rte_smp_wmb();
+}
+
+static int
+opdl_start(struct rte_eventdev *dev)
+{
+	int err = 0;
+
+	if (!err)
+		err = create_queues_and_rings(dev);
+
+
+	if (!err)
+		err = assign_internal_queue_ids(dev);
+
+
+	if (!err)
+		err = initialise_queue_zero_ports(dev);
+
+
+	if (!err)
+		err = initialise_all_other_ports(dev);
+
+
+	if (!err)
+		err = check_queues_linked(dev);
+
+
+	if (!err)
+		err = opdl_add_event_handlers(dev);
+
+
+	if (!err)
+		err = build_all_dependencies(dev);
+
+	if (!err) {
+		opdl_xstats_init(dev);
+
+		struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+		PMD_DRV_LOG(INFO, "DEV_ID:[%02d] : "
+			      "SUCCESS : Created %u total queues (%u ex, %u in),"
+			      " %u opdls, %u event_dev ports, %u input ports",
+			      opdl_pmd_dev_id(device),
+			      device->nb_queues,
+			      (device->nb_queues - device->nb_opdls),
+			      device->nb_opdls,
+			      device->nb_opdls,
+			      device->nb_ports,
+			      device->queue[0].nb_ports);
+	} else
+		opdl_stop(dev);
+
+	return err;
+}
+
+static int
+opdl_close(struct rte_eventdev *dev)
+{
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+	uint32_t i;
+
+	for (i = 0; i < device->max_port_nb; i++) {
+		memset(&device->ports[i],
+		       0,
+		       sizeof(struct opdl_port));
+	}
+
+	memset(&device->s_md,
+			0x0,
+			sizeof(struct opdl_stage_meta_data)*OPDL_PORTS_MAX);
+
+	memset(&device->q_md,
+			0xFF,
+			sizeof(struct opdl_queue_meta_data)*OPDL_MAX_QUEUES);
+
+
+	memset(device->q_map_ex_to_in,
+			0,
+			sizeof(uint8_t)*OPDL_INVALID_QID);
+
+	opdl_xstats_uninit(dev);
+
+	device->max_port_nb = 0;
+
+	device->max_queue_nb = 0;
+
+	device->nb_opdls = 0;
+
+	device->nb_queues   = 0;
+
+	device->nb_ports    = 0;
+
+	device->nb_q_md     = 0;
+
+	dev->data->nb_queues = 0;
+
+	dev->data->nb_ports = 0;
+
+
+	return 0;
+}
+
+static int
+assign_numa_node(const char *key __rte_unused, const char *value, void *opaque)
+{
+	int *socket_id = opaque;
+	*socket_id = atoi(value);
+	if (*socket_id >= RTE_MAX_NUMA_NODES)
+		return -1;
+	return 0;
+}
+
+static int
+set_do_validation(const char *key __rte_unused, const char *value, void *opaque)
+{
+	int *do_val = opaque;
+	*do_val = atoi(value);
+	if (*do_val != 0)
+		*do_val = 1;
+
+	return 0;
+}
+static int
+set_do_test(const char *key __rte_unused, const char *value, void *opaque)
+{
+	int *do_test = opaque;
+
+	*do_test = atoi(value);
+
+	if (*do_test != 0)
+		*do_test = 1;
+	return 0;
+}
+
+static int
+opdl_probe(struct rte_vdev_device *vdev)
+{
+	static const struct rte_eventdev_ops evdev_opdl_ops = {
+		.dev_configure = opdl_dev_configure,
+		.dev_infos_get = opdl_info_get,
+		.dev_close = opdl_close,
+		.dev_start = opdl_start,
+		.dev_stop = opdl_stop,
+		.dump = opdl_dump,
+
+		.xstats_get = opdl_xstats_get,
+		.xstats_get_names = opdl_xstats_get_names,
+		.xstats_get_by_name = opdl_xstats_get_by_name,
+		.xstats_reset = opdl_xstats_reset,
+	};
+
+	static const char *const args[] = {
+		NUMA_NODE_ARG,
+		DO_VALIDATION_ARG,
+		DO_TEST_ARG,
+		NULL
+	};
+	const char *name;
+	const char *params;
+	struct rte_eventdev *dev;
+	struct opdl_evdev *opdl;
+	int socket_id = rte_socket_id();
+	int do_validation = 0;
+	int do_test = 0;
+	int str_len;
+	int test_result = 0;
+
+	name = rte_vdev_device_name(vdev);
+	params = rte_vdev_device_args(vdev);
+	if (params != NULL && params[0] != '\0') {
+		struct rte_kvargs *kvlist = rte_kvargs_parse(params, args);
+
+		if (!kvlist) {
+			PMD_DRV_LOG(INFO,
+					"Ignoring unsupported parameters when creating device '%s'\n",
+					name);
+		} else {
+			int ret = rte_kvargs_process(kvlist, NUMA_NODE_ARG,
+					assign_numa_node, &socket_id);
+			if (ret != 0) {
+				PMD_DRV_LOG(ERR,
+						"%s: Error parsing numa node parameter",
+						name);
+
+				rte_kvargs_free(kvlist);
+				return ret;
+			}
+
+			ret = rte_kvargs_process(kvlist, DO_VALIDATION_ARG,
+					set_do_validation, &do_validation);
+			if (ret != 0) {
+				PMD_DRV_LOG(ERR,
+					"%s: Error parsing do validation parameter",
+					name);
+				rte_kvargs_free(kvlist);
+				return ret;
+			}
+
+			ret = rte_kvargs_process(kvlist, DO_TEST_ARG,
+					set_do_test, &do_test);
+			if (ret != 0) {
+				PMD_DRV_LOG(ERR,
+					"%s: Error parsing do test parameter",
+					name);
+				rte_kvargs_free(kvlist);
+				return ret;
+			}
+
+			rte_kvargs_free(kvlist);
+		}
+	}
+	dev = rte_event_pmd_vdev_init(name,
+			sizeof(struct opdl_evdev), socket_id);
+
+	if (dev == NULL) {
+		PMD_DRV_LOG(ERR, "eventdev vdev init() failed");
+		return -EFAULT;
+	}
+
+	PMD_DRV_LOG(INFO, "DEV_ID:[%02d] : "
+		      "Success - creating eventdev device %s, numa_node:[%d], do_valdation:[%s]"
+			  " , self_test:[%s]\n",
+		      dev->data->dev_id,
+		      name,
+		      socket_id,
+		      (do_validation ? "true" : "false"),
+			  (do_test ? "true" : "false"));
+
+	dev->dev_ops = &evdev_opdl_ops;
+
+	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
+		return 0;
+
+	opdl = dev->data->dev_private;
+	opdl->data = dev->data;
+	opdl->socket = socket_id;
+	opdl->do_validation = do_validation;
+	opdl->do_test = do_test;
+	str_len = RTE_MIN(strlen(name), sizeof(opdl->service_name) - 1);
+	memcpy(opdl->service_name, name, str_len);
+
+	return test_result;
+}
+
+static int
+opdl_remove(struct rte_vdev_device *vdev)
+{
+	const char *name;
+
+	name = rte_vdev_device_name(vdev);
+	if (name == NULL)
+		return -EINVAL;
+
+	PMD_DRV_LOG(INFO, "Closing eventdev opdl device %s\n", name);
+
+	return rte_event_pmd_vdev_uninit(name);
+}
+
+static struct rte_vdev_driver evdev_opdl_pmd_drv = {
+	.probe = opdl_probe,
+	.remove = opdl_remove
+};
+
+RTE_INIT(opdl_init_log);
+
+static void
+opdl_init_log(void)
+{
+	opdl_logtype_driver = rte_log_register("eventdev.opdl.driver");
+	if (opdl_logtype_driver >= 0)
+		rte_log_set_level(opdl_logtype_driver, RTE_LOG_INFO);
+}
+
+
+RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_OPDL_PMD, evdev_opdl_pmd_drv);
+RTE_PMD_REGISTER_PARAM_STRING(event_opdl, NUMA_NODE_ARG "=<int> "
+			      DO_VALIDATION_ARG "=<int> " DO_TEST_ARG "=<int>");
diff --git a/drivers/event/opdl/opdl_evdev.h b/drivers/event/opdl/opdl_evdev.h
new file mode 100644
index 0000000..33bc8f2
--- /dev/null
+++ b/drivers/event/opdl/opdl_evdev.h
@@ -0,0 +1,341 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _OPDL_EVDEV_H_
+#define _OPDL_EVDEV_H_
+
+#include <rte_eventdev.h>
+#include <rte_eventdev_pmd_vdev.h>
+#include <rte_atomic.h>
+#include "opdl_ring.h"
+
+#define OPDL_QID_NUM_FIDS 1024
+#define OPDL_IQS_MAX 1
+#define OPDL_Q_PRIORITY_MAX 1
+#define OPDL_PORTS_MAX 64
+#define MAX_OPDL_CONS_Q_DEPTH 128
+/* OPDL size */
+#define OPDL_INFLIGHT_EVENTS_TOTAL 4096
+/* allow for lots of over-provisioning */
+#define OPDL_FRAGMENTS_MAX 1
+
+/* report dequeue burst sizes in buckets */
+#define OPDL_DEQ_STAT_BUCKET_SHIFT 2
+/* how many packets pulled from port by sched */
+#define SCHED_DEQUEUE_BURST_SIZE 32
+
+/* size of our history list */
+#define OPDL_PORT_HIST_LIST (MAX_OPDL_PROD_Q_DEPTH)
+
+/* how many data points are used for average stats */
+#define NUM_SAMPLES 64
+
+#define EVENTDEV_NAME_OPDL_PMD event_opdl
+#define OPDL_PMD_NAME RTE_STR(event_opdl)
+#define OPDL_PMD_NAME_MAX 64
+
+#define OPDL_INVALID_QID 255
+
+#define OPDL_SCHED_TYPE_DIRECT (RTE_SCHED_TYPE_PARALLEL + 1)
+
+#define OPDL_NUM_POLL_BUCKETS  \
+	(MAX_OPDL_CONS_Q_DEPTH >> OPDL_DEQ_STAT_BUCKET_SHIFT)
+
+enum {
+	QE_FLAG_VALID_SHIFT = 0,
+	QE_FLAG_COMPLETE_SHIFT,
+	QE_FLAG_NOT_EOP_SHIFT,
+	_QE_FLAG_COUNT
+};
+
+enum port_type {
+	OPDL_INVALID_PORT = 0,
+	OPDL_REGULAR_PORT = 1,
+	OPDL_PURE_RX_PORT,
+	OPDL_PURE_TX_PORT,
+	OPDL_ASYNC_PORT
+};
+
+enum queue_type {
+	OPDL_Q_TYPE_INVALID = 0,
+	OPDL_Q_TYPE_SINGLE_LINK = 1,
+	OPDL_Q_TYPE_ATOMIC,
+	OPDL_Q_TYPE_ORDERED
+};
+
+enum queue_pos {
+	OPDL_Q_POS_START = 0,
+	OPDL_Q_POS_MIDDLE,
+	OPDL_Q_POS_END
+};
+
+#define QE_FLAG_VALID    (1 << QE_FLAG_VALID_SHIFT)    /* for NEW FWD, FRAG */
+#define QE_FLAG_COMPLETE (1 << QE_FLAG_COMPLETE_SHIFT) /* set for FWD, DROP  */
+#define QE_FLAG_NOT_EOP  (1 << QE_FLAG_NOT_EOP_SHIFT)  /* set for FRAG only  */
+
+static const uint8_t opdl_qe_flag_map[] = {
+	QE_FLAG_VALID /* NEW Event */,
+	QE_FLAG_VALID | QE_FLAG_COMPLETE /* FWD Event */,
+	QE_FLAG_COMPLETE /* RELEASE Event */,
+
+	/* Values which can be used for future support for partial
+	 * events, i.e. where one event comes back to the scheduler
+	 * as multiple which need to be tracked together
+	 */
+	QE_FLAG_VALID | QE_FLAG_COMPLETE | QE_FLAG_NOT_EOP,
+};
+
+
+enum port_xstat_name {
+	claim_pkts_requested = 0,
+	claim_pkts_granted,
+	claim_non_empty,
+	claim_empty,
+	total_cycles,
+	max_num_port_xstat
+};
+
+#define OPDL_MAX_PORT_XSTAT_NUM (OPDL_PORTS_MAX * max_num_port_xstat)
+
+struct opdl_port;
+
+typedef uint16_t (*opdl_enq_operation)(struct opdl_port *port,
+		const struct rte_event ev[],
+		uint16_t num);
+
+typedef uint16_t (*opdl_deq_operation)(struct opdl_port *port,
+		struct rte_event ev[],
+		uint16_t num);
+
+struct opdl_evdev;
+
+struct opdl_stage_meta_data {
+	uint32_t num_claimed;	/* number of entries claimed by this stage */
+	uint32_t burst_sz;	/* Port claim burst size */
+};
+
+struct opdl_port {
+
+	/* back pointer */
+	struct opdl_evdev *opdl;
+
+	/* enq handler & stage instance */
+	opdl_enq_operation enq;
+	struct opdl_stage *enq_stage_inst;
+
+	/* deq handler & stage instance */
+	opdl_deq_operation deq;
+	struct opdl_stage *deq_stage_inst;
+
+	/* port id has correctly been set */
+	uint8_t configured;
+
+	/* set when the port is initialized */
+	uint8_t initialized;
+
+	/* A numeric ID for the port */
+	uint8_t id;
+
+	/* Space for claimed entries */
+	struct rte_event *entries[MAX_OPDL_CONS_Q_DEPTH];
+
+	/* RX/REGULAR/TX/ASYNC - determined on position in queue */
+	enum port_type p_type;
+
+	/* if the claim is static atomic type  */
+	bool atomic_claim;
+
+	/* Queue linked to this port - internal queue id*/
+	uint8_t queue_id;
+
+	/* Queue linked to this port - external queue id*/
+	uint8_t external_qid;
+
+	/* Next queue linked to this port - external queue id*/
+	uint8_t next_external_qid;
+
+	/* number of instances of this stage */
+	uint32_t num_instance;
+
+	/* instance ID of this stage*/
+	uint32_t instance_id;
+
+	/* track packets in and out of this port */
+	uint64_t port_stat[max_num_port_xstat];
+	uint64_t start_cycles;
+};
+
+struct opdl_queue_meta_data {
+	uint8_t         ext_id;
+	enum queue_type type;
+	int8_t          setup;
+};
+
+struct opdl_xstats_entry {
+	struct rte_event_dev_xstats_name stat;
+	unsigned int id;
+	uint64_t *value;
+};
+
+struct opdl_queue {
+
+	/* Opdl ring this queue is associated with */
+	uint32_t opdl_id;
+
+	/* type and position have correctly been set */
+	uint8_t configured;
+
+	/* port number and associated ports have been associated */
+	uint8_t initialized;
+
+	/* type of this queue (Atomic, Ordered, Parallel, Direct)*/
+	enum queue_type q_type;
+
+	/* position of queue (START, MIDDLE, END) */
+	enum queue_pos q_pos;
+
+	/* external queue id. It is mapped to the queue position */
+	uint8_t external_qid;
+
+	struct opdl_port *ports[OPDL_PORTS_MAX];
+	uint32_t nb_ports;
+
+	/* priority, reserved for future */
+	uint8_t priority;
+};
+
+
+#define OPDL_TUR_PER_DEV 12
+
+/* PMD needs an extra queue per Opdl  */
+#define OPDL_MAX_QUEUES (RTE_EVENT_MAX_QUEUES_PER_DEV - OPDL_TUR_PER_DEV)
+
+
+struct opdl_evdev {
+	struct rte_eventdev_data *data;
+
+	uint8_t started;
+
+	/* Max number of ports and queues*/
+	uint32_t max_port_nb;
+	uint32_t max_queue_nb;
+
+	/* slots in the opdl ring */
+	uint32_t nb_events_limit;
+
+	/*
+	 * Array holding all opdl for this device
+	 */
+	struct opdl_ring *opdl[OPDL_TUR_PER_DEV];
+	uint32_t nb_opdls;
+
+	struct opdl_queue_meta_data q_md[OPDL_MAX_QUEUES];
+	uint32_t nb_q_md;
+
+	/* Internal queues - one per logical queue */
+	struct opdl_queue
+		queue[RTE_EVENT_MAX_QUEUES_PER_DEV] __rte_cache_aligned;
+
+	uint32_t nb_queues;
+
+	struct opdl_stage_meta_data s_md[OPDL_PORTS_MAX];
+
+	/* Contains all ports - load balanced and directed */
+	struct opdl_port ports[OPDL_PORTS_MAX] __rte_cache_aligned;
+	uint32_t nb_ports;
+
+	uint8_t q_map_ex_to_in[OPDL_INVALID_QID];
+
+	/* Stats */
+	struct opdl_xstats_entry port_xstat[OPDL_MAX_PORT_XSTAT_NUM];
+
+	char service_name[OPDL_PMD_NAME_MAX];
+	int socket;
+	int do_validation;
+	int do_test;
+};
+
+
+static inline struct opdl_evdev *
+opdl_pmd_priv(const struct rte_eventdev *eventdev)
+{
+	return eventdev->data->dev_private;
+}
+
+static inline uint8_t
+opdl_pmd_dev_id(const struct opdl_evdev *opdl)
+{
+	return opdl->data->dev_id;
+}
+
+static inline const struct opdl_evdev *
+opdl_pmd_priv_const(const struct rte_eventdev *eventdev)
+{
+	return eventdev->data->dev_private;
+}
+
+uint16_t opdl_event_enqueue(void *port, const struct rte_event *ev);
+uint16_t opdl_event_enqueue_burst(void *port, const struct rte_event ev[],
+		uint16_t num);
+
+uint16_t opdl_event_dequeue(void *port, struct rte_event *ev, uint64_t wait);
+uint16_t opdl_event_dequeue_burst(void *port, struct rte_event *ev,
+		uint16_t num, uint64_t wait);
+void opdl_event_schedule(struct rte_eventdev *dev);
+
+void opdl_xstats_init(struct rte_eventdev *dev);
+int opdl_xstats_uninit(struct rte_eventdev *dev);
+int opdl_xstats_get_names(const struct rte_eventdev *dev,
+		enum rte_event_dev_xstats_mode mode, uint8_t queue_port_id,
+		struct rte_event_dev_xstats_name *xstats_names,
+		unsigned int *ids, unsigned int size);
+int opdl_xstats_get(const struct rte_eventdev *dev,
+		enum rte_event_dev_xstats_mode mode, uint8_t queue_port_id,
+		const unsigned int ids[], uint64_t values[], unsigned int n);
+uint64_t opdl_xstats_get_by_name(const struct rte_eventdev *dev,
+		const char *name, unsigned int *id);
+int opdl_xstats_reset(struct rte_eventdev *dev,
+		enum rte_event_dev_xstats_mode mode,
+		int16_t queue_port_id,
+		const uint32_t ids[],
+		uint32_t nb_ids);
+
+int opdl_add_event_handlers(struct rte_eventdev *dev);
+int build_all_dependencies(struct rte_eventdev *dev);
+int check_queues_linked(struct rte_eventdev *dev);
+int create_queues_and_rings(struct rte_eventdev *dev);
+int initialise_all_other_ports(struct rte_eventdev *dev);
+int initialise_queue_zero_ports(struct rte_eventdev *dev);
+int assign_internal_queue_ids(struct rte_eventdev *dev);
+void destroy_queues_and_rings(struct rte_eventdev *dev);
+
+#endif /* _OPDL_EVDEV_H_ */
diff --git a/drivers/event/opdl/opdl_evdev_init.c b/drivers/event/opdl/opdl_evdev_init.c
new file mode 100644
index 0000000..c37d8bc
--- /dev/null
+++ b/drivers/event/opdl/opdl_evdev_init.c
@@ -0,0 +1,936 @@
+/*-
+ * SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2010-2014 Intel Corporation
+ */
+
+#include <inttypes.h>
+#include <string.h>
+
+#include <rte_bus_vdev.h>
+#include <rte_errno.h>
+#include <rte_cycles.h>
+#include <rte_memzone.h>
+
+#include "opdl_evdev.h"
+#include "opdl_ring.h"
+#include "opdl_log.h"
+
+
+static __rte_always_inline uint32_t
+enqueue_check(struct opdl_port *p,
+		const struct rte_event ev[],
+		uint16_t num,
+		uint16_t num_events)
+{
+	uint16_t i;
+
+	if (p->opdl->do_validation) {
+
+		for (i = 0; i < num; i++) {
+			if (ev[i].queue_id != p->next_external_qid) {
+				PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
+					     "ERROR - port:[%u] - event wants"
+					     " to enq to q_id[%u],"
+					     " but should be [%u]",
+					     opdl_pmd_dev_id(p->opdl),
+					     p->id,
+					     ev[i].queue_id,
+					     p->next_external_qid);
+				rte_errno = -EINVAL;
+				return 0;
+			}
+		}
+
+		/* Stats */
+		if (p->p_type == OPDL_PURE_RX_PORT ||
+				p->p_type == OPDL_ASYNC_PORT) {
+			/* Stats */
+			if (num_events) {
+				p->port_stat[claim_pkts_requested] += num;
+				p->port_stat[claim_pkts_granted] += num_events;
+				p->port_stat[claim_non_empty]++;
+				p->start_cycles = rte_rdtsc();
+			} else {
+				p->port_stat[claim_empty]++;
+				p->start_cycles = 0;
+			}
+		} else {
+			if (p->start_cycles) {
+				uint64_t end_cycles = rte_rdtsc();
+				p->port_stat[total_cycles] +=
+					end_cycles - p->start_cycles;
+			}
+		}
+	} else {
+		if (num > 0 &&
+				ev[0].queue_id != p->next_external_qid) {
+			rte_errno = -EINVAL;
+			return 0;
+		}
+	}
+
+	return num;
+}
+
+static __rte_always_inline void
+update_on_dequeue(struct opdl_port *p,
+		struct rte_event ev[],
+		uint16_t num,
+		uint16_t num_events)
+{
+	if (p->opdl->do_validation) {
+		int16_t i;
+		for (i = 0; i < num; i++)
+			ev[i].queue_id =
+				p->opdl->queue[p->queue_id].external_qid;
+
+		/* Stats */
+		if (num_events) {
+			p->port_stat[claim_pkts_requested] += num;
+			p->port_stat[claim_pkts_granted] += num_events;
+			p->port_stat[claim_non_empty]++;
+			p->start_cycles = rte_rdtsc();
+		} else {
+			p->port_stat[claim_empty]++;
+			p->start_cycles = 0;
+		}
+	} else {
+		if (num > 0)
+			ev[0].queue_id =
+				p->opdl->queue[p->queue_id].external_qid;
+	}
+}
+
+
+/*
+ * Error RX enqueue:
+ *
+ *
+ */
+
+static uint16_t
+opdl_rx_error_enqueue(struct opdl_port *p,
+		const struct rte_event ev[],
+		uint16_t num)
+{
+	RTE_SET_USED(p);
+	RTE_SET_USED(ev);
+	RTE_SET_USED(num);
+
+	rte_errno = -ENOSPC;
+
+	return 0;
+}
+
+/*
+ * RX enqueue:
+ *
+ * This function handles enqueue for a single input stage_inst with
+ *	threadsafe disabled or enabled, e.g. 1 thread using a stage_inst or
+ *	multiple threads sharing a stage_inst
+ */
+
+static uint16_t
+opdl_rx_enqueue(struct opdl_port *p,
+		const struct rte_event ev[],
+		uint16_t num)
+{
+	uint16_t enqueued = 0;
+
+	enqueued = opdl_ring_input(opdl_stage_get_opdl_ring(p->enq_stage_inst),
+				   ev,
+				   num,
+				   false);
+	if (!enqueue_check(p, ev, num, enqueued))
+		return 0;
+
+
+	if (enqueued < num)
+		rte_errno = -ENOSPC;
+
+	return enqueued;
+}
+
+/*
+ * Error TX handler
+ *
+ */
+
+static uint16_t
+opdl_tx_error_dequeue(struct opdl_port *p,
+		struct rte_event ev[],
+		uint16_t num)
+{
+	RTE_SET_USED(p);
+	RTE_SET_USED(ev);
+	RTE_SET_USED(num);
+
+	rte_errno = -ENOSPC;
+
+	return 0;
+}
+
+/*
+ * TX single threaded claim
+ *
+ * This function handles dequeue for a single worker stage_inst with
+ *	threadsafe disabled, e.g. 1 thread using a stage_inst
+ */
+
+static uint16_t
+opdl_tx_dequeue_single_thread(struct opdl_port *p,
+			struct rte_event ev[],
+			uint16_t num)
+{
+	uint16_t returned;
+
+	struct opdl_ring  *ring;
+
+	ring = opdl_stage_get_opdl_ring(p->deq_stage_inst);
+
+	returned = opdl_ring_copy_to_burst(ring,
+					   p->deq_stage_inst,
+					   ev,
+					   num,
+					   false);
+
+	update_on_dequeue(p, ev, num, returned);
+
+	return returned;
+}
+
+/*
+ * TX multi threaded claim
+ *
+ * This function handles dequeue for multiple worker stage_inst with
+ *	threadsafe disabled, e.g. multiple stage_inst, each with its own instance
+ */
+
+static uint16_t
+opdl_tx_dequeue_multi_inst(struct opdl_port *p,
+			struct rte_event ev[],
+			uint16_t num)
+{
+	uint32_t num_events = 0;
+
+	num_events = opdl_stage_claim(p->deq_stage_inst,
+				    (void *)ev,
+				    num,
+				    NULL,
+				    false,
+				    false);
+
+	update_on_dequeue(p, ev, num, num_events);
+
+	return opdl_stage_disclaim(p->deq_stage_inst, num_events, false);
+}
+
+
+/*
+ * Worker thread claim
+ *
+ */
+
+static uint16_t
+opdl_claim(struct opdl_port *p, struct rte_event ev[], uint16_t num)
+{
+	uint32_t num_events = 0;
+
+	if (unlikely(num > MAX_OPDL_CONS_Q_DEPTH)) {
+		PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
+			     "Attempt to dequeue num of events larger than port (%d) max",
+			     opdl_pmd_dev_id(p->opdl),
+			     p->id);
+		rte_errno = -EINVAL;
+		return 0;
+	}
+
+
+	num_events = opdl_stage_claim(p->deq_stage_inst,
+			(void *)ev,
+			num,
+			NULL,
+			false,
+			p->atomic_claim);
+
+
+	update_on_dequeue(p, ev, num, num_events);
+
+	return num_events;
+}
+
+/*
+ * Worker thread disclaim
+ */
+
+static uint16_t
+opdl_disclaim(struct opdl_port *p, const struct rte_event ev[], uint16_t num)
+{
+	uint16_t enqueued = 0;
+
+	uint32_t i = 0;
+
+	for (i = 0; i < num; i++)
+		opdl_ring_cas_slot(p->enq_stage_inst, &ev[i],
+				i, p->atomic_claim);
+
+	enqueued = opdl_stage_disclaim(p->enq_stage_inst,
+				       num,
+				       false);
+
+	return enqueue_check(p, ev, num, enqueued);
+}
+
+static __rte_always_inline struct opdl_stage *
+stage_for_port(struct opdl_queue *q, unsigned int i)
+{
+	if (q->q_pos == OPDL_Q_POS_START || q->q_pos == OPDL_Q_POS_MIDDLE)
+		return q->ports[i]->enq_stage_inst;
+	else
+		return q->ports[i]->deq_stage_inst;
+}
+
+static int opdl_add_deps(struct opdl_evdev *device,
+			 int q_id,
+			 int deps_q_id)
+{
+	unsigned int i, j;
+	int status;
+	struct opdl_ring  *ring;
+	struct opdl_queue *queue = &device->queue[q_id];
+	struct opdl_queue *queue_deps = &device->queue[deps_q_id];
+	struct opdl_stage *dep_stages[OPDL_PORTS_MAX];
+
+	/* sanity check that all stages are for same opdl ring */
+	for (i = 0; i < queue->nb_ports; i++) {
+		struct opdl_ring *r =
+			opdl_stage_get_opdl_ring(stage_for_port(queue, i));
+		for (j = 0; j < queue_deps->nb_ports; j++) {
+			struct opdl_ring *rj =
+				opdl_stage_get_opdl_ring(
+						stage_for_port(queue_deps, j));
+			if (r != rj) {
+				PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
+					     "Stages and dependents"
+					     " are not for same opdl ring",
+					     opdl_pmd_dev_id(device));
+				for (uint32_t k = 0;
+						k < device->nb_opdls; k++) {
+					opdl_ring_dump(device->opdl[k],
+							stdout);
+				}
+				return -EINVAL;
+			}
+		}
+	}
+
+	/* Gather all stages instance in deps */
+	for (i = 0; i < queue_deps->nb_ports; i++)
+		dep_stages[i] = stage_for_port(queue_deps, i);
+
+
+	/* Add all deps for each port->stage_inst in this queue */
+	for (i = 0; i < queue->nb_ports; i++) {
+
+		ring = opdl_stage_get_opdl_ring(stage_for_port(queue, i));
+
+		status = opdl_stage_deps_add(ring,
+				stage_for_port(queue, i),
+				queue->ports[i]->num_instance,
+				queue->ports[i]->instance_id,
+				dep_stages,
+				queue_deps->nb_ports);
+		if (status < 0)
+			return -EINVAL;
+	}
+
+	return 0;
+}
+
+int
+opdl_add_event_handlers(struct rte_eventdev *dev)
+{
+	int err = 0;
+
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+	unsigned int i;
+
+	for (i = 0; i < device->max_port_nb; i++) {
+
+		struct opdl_port *port = &device->ports[i];
+
+		if (port->configured) {
+			if (port->p_type == OPDL_PURE_RX_PORT) {
+				port->enq = opdl_rx_enqueue;
+				port->deq = opdl_tx_error_dequeue;
+
+			} else if (port->p_type == OPDL_PURE_TX_PORT) {
+
+				port->enq = opdl_rx_error_enqueue;
+
+				if (port->num_instance == 1)
+					port->deq =
+						opdl_tx_dequeue_single_thread;
+				else
+					port->deq = opdl_tx_dequeue_multi_inst;
+
+			} else if (port->p_type == OPDL_REGULAR_PORT) {
+
+				port->enq = opdl_disclaim;
+				port->deq = opdl_claim;
+
+			} else if (port->p_type == OPDL_ASYNC_PORT) {
+
+				port->enq = opdl_rx_enqueue;
+
+				/* Always single instance */
+				port->deq = opdl_tx_dequeue_single_thread;
+			} else {
+				PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
+					     "port:[%u] has invalid port type - ",
+					     opdl_pmd_dev_id(port->opdl),
+					     port->id);
+				err = -EINVAL;
+				break;
+			}
+			port->initialized = 1;
+		}
+	}
+
+	if (!err)
+		fprintf(stdout, "Success - enqueue/dequeue handler(s) added\n");
+	return err;
+}
+
+int
+build_all_dependencies(struct rte_eventdev *dev)
+{
+
+	int err = 0;
+	unsigned int i;
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	uint8_t start_qid = 0;
+
+	for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) {
+		struct opdl_queue *queue = &device->queue[i];
+		if (!queue->initialized)
+			break;
+
+		if (queue->q_pos == OPDL_Q_POS_START) {
+			start_qid = i;
+			continue;
+		}
+
+		if (queue->q_pos == OPDL_Q_POS_MIDDLE) {
+			err = opdl_add_deps(device, i, i-1);
+			if (err < 0) {
+				PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
+					     "dependency addition for queue:[%u] - FAILED",
+					     dev->data->dev_id,
+					     queue->external_qid);
+				break;
+			}
+		}
+
+		if (queue->q_pos == OPDL_Q_POS_END) {
+			/* Add this dependency */
+			err = opdl_add_deps(device, i, i-1);
+			if (err < 0) {
+				PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
+					     "dependency addition for queue:[%u] - FAILED",
+					     dev->data->dev_id,
+					     queue->external_qid);
+				break;
+			}
+			/* Add dependency for rx on tx */
+			err = opdl_add_deps(device, start_qid, i);
+			if (err < 0) {
+				PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
+					     "dependency addition for queue:[%u] - FAILED",
+					     dev->data->dev_id,
+					     queue->external_qid);
+				break;
+			}
+		}
+	}
+
+	if (!err)
+		fprintf(stdout, "Success - dependencies built\n");
+
+	return err;
+}
+int
+check_queues_linked(struct rte_eventdev *dev)
+{
+
+	int err = 0;
+	unsigned int i;
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+	uint32_t nb_iq = 0;
+
+	for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) {
+		struct opdl_queue *queue = &device->queue[i];
+
+		if (!queue->initialized)
+			break;
+
+		if (queue->external_qid == OPDL_INVALID_QID)
+			nb_iq++;
+
+		if (queue->nb_ports == 0) {
+			PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
+				     "queue:[%u] has no associated ports",
+				     dev->data->dev_id,
+				     i);
+			err = -EINVAL;
+			break;
+		}
+	}
+	if (!err) {
+		if ((i - nb_iq) != device->max_queue_nb) {
+			PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
+				     "%u queues counted but should be %u",
+				     dev->data->dev_id,
+				     i - nb_iq,
+				     device->max_queue_nb);
+			err = -1;
+		}
+
+	}
+	return err;
+}
+
+void
+destroy_queues_and_rings(struct rte_eventdev *dev)
+{
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	for (uint32_t i = 0; i < device->nb_opdls; i++) {
+		if (device->opdl[i])
+			opdl_ring_free(device->opdl[i]);
+	}
+
+	memset(&device->queue,
+			0,
+			sizeof(struct opdl_queue)
+			* RTE_EVENT_MAX_QUEUES_PER_DEV);
+}
+
+#define OPDL_ID(d) ((d)->nb_opdls - 1)
+
+static __rte_always_inline void
+initialise_queue(struct opdl_evdev *device,
+		enum queue_pos pos,
+		int32_t i)
+{
+	struct opdl_queue *queue = &device->queue[device->nb_queues];
+
+	if (i == -1) {
+		queue->q_type = OPDL_Q_TYPE_ORDERED;
+		queue->external_qid = OPDL_INVALID_QID;
+	} else {
+		queue->q_type = device->q_md[i].type;
+		queue->external_qid = device->q_md[i].ext_id;
+		/* Add ex->in for queues setup */
+		device->q_map_ex_to_in[queue->external_qid] = device->nb_queues;
+	}
+	queue->opdl_id = OPDL_ID(device);
+	queue->q_pos = pos;
+	queue->nb_ports = 0;
+	queue->configured = 1;
+
+	device->nb_queues++;
+}
+
+
+static __rte_always_inline int
+create_opdl(struct opdl_evdev *device)
+{
+	int err = 0;
+
+	char name[RTE_MEMZONE_NAMESIZE];
+
+	snprintf(name, RTE_MEMZONE_NAMESIZE,
+			"%s_%u", device->service_name, device->nb_opdls);
+
+	device->opdl[device->nb_opdls] =
+		opdl_ring_create(name,
+				device->nb_events_limit,
+				sizeof(struct rte_event),
+				device->max_port_nb * 2,
+				device->socket);
+
+	if (!device->opdl[device->nb_opdls]) {
+		PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
+			     "opdl ring %u creation - FAILED",
+			     opdl_pmd_dev_id(device),
+			     device->nb_opdls);
+		err = -EINVAL;
+	} else {
+		device->nb_opdls++;
+	}
+	return err;
+}
+
+static __rte_always_inline int
+create_link_opdl(struct opdl_evdev *device, uint32_t index)
+{
+
+	int err = 0;
+
+	if (device->q_md[index + 1].type !=
+			OPDL_Q_TYPE_SINGLE_LINK) {
+
+		/* async queue with regular
+		 * queue following it
+		 */
+
+		/* create a new opdl ring */
+		err = create_opdl(device);
+		if (!err) {
+			/* create an initial
+			 * dummy queue for new opdl
+			 */
+			initialise_queue(device,
+					OPDL_Q_POS_START,
+					-1);
+		} else {
+			err = -EINVAL;
+		}
+	} else {
+		PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
+			     "queue %u, two consecutive"
+			     " SINGLE_LINK queues, not allowed",
+			     opdl_pmd_dev_id(device),
+			     index);
+		err = -EINVAL;
+	}
+
+	return err;
+}
+
+int
+create_queues_and_rings(struct rte_eventdev *dev)
+{
+	int err = 0;
+
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	device->nb_queues = 0;
+
+	if (device->nb_ports != device->max_port_nb) {
+		PMD_DRV_LOG(ERR, "Number ports setup:%u NOT EQUAL to max port"
+				" number:%u for this device",
+				device->nb_ports,
+				device->max_port_nb);
+		err = -1;
+	}
+
+	if (!err) {
+		/* We will have at least one opdl so create it now */
+		err = create_opdl(device);
+	}
+
+	if (!err) {
+
+		/* Create 1st "dummy" queue */
+		initialise_queue(device,
+				 OPDL_Q_POS_START,
+				 -1);
+
+		for (uint32_t i = 0; i < device->nb_q_md; i++) {
+
+			/* Check */
+			if (!device->q_md[i].setup) {
+
+				PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
+					     "queue meta data slot %u"
+					     " not setup - FAILING",
+					     dev->data->dev_id,
+					     i);
+				err = -EINVAL;
+				break;
+			} else if (device->q_md[i].type !=
+					OPDL_Q_TYPE_SINGLE_LINK) {
+
+				if (!device->q_md[i + 1].setup) {
+					/* Create a simple ORDERED/ATOMIC
+					 * queue at the end
+					 */
+					initialise_queue(device,
+							OPDL_Q_POS_END,
+							i);
+
+				} else {
+					/* Create a simple ORDERED/ATOMIC
+					 * queue in the middle
+					 */
+					initialise_queue(device,
+							OPDL_Q_POS_MIDDLE,
+							i);
+				}
+			} else if (device->q_md[i].type ==
+					OPDL_Q_TYPE_SINGLE_LINK) {
+
+				/* create last queue for this opdl */
+				initialise_queue(device,
+						OPDL_Q_POS_END,
+						i);
+
+				err = create_link_opdl(device, i);
+
+				if (err)
+					break;
+
+
+			}
+		}
+	}
+	if (err)
+		destroy_queues_and_rings(dev);
+
+	return err;
+}
+
+
+int
+initialise_all_other_ports(struct rte_eventdev *dev)
+{
+	int err = 0;
+	struct opdl_stage *stage_inst = NULL;
+
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	for (uint32_t i = 0; i < device->nb_ports; i++) {
+		struct opdl_port *port = &device->ports[i];
+		struct opdl_queue *queue = &device->queue[port->queue_id];
+
+		if (port->queue_id == 0) {
+			continue;
+		} else if (queue->q_type != OPDL_Q_TYPE_SINGLE_LINK) {
+
+			if (queue->q_pos == OPDL_Q_POS_MIDDLE) {
+
+				/* Regular port with claim/disclaim */
+				stage_inst = opdl_stage_add(
+					device->opdl[queue->opdl_id],
+						false,
+						false);
+				port->deq_stage_inst = stage_inst;
+				port->enq_stage_inst = stage_inst;
+
+				if (queue->q_type == OPDL_Q_TYPE_ATOMIC)
+					port->atomic_claim = true;
+				else
+					port->atomic_claim = false;
+
+				port->p_type =  OPDL_REGULAR_PORT;
+
+				/* Add the port to the queue array of ports */
+				queue->ports[queue->nb_ports] = port;
+				port->instance_id = queue->nb_ports;
+				queue->nb_ports++;
+			} else if (queue->q_pos == OPDL_Q_POS_END) {
+
+				/* tx port  */
+				stage_inst = opdl_stage_add(
+					device->opdl[queue->opdl_id],
+						false,
+						false);
+				port->deq_stage_inst = stage_inst;
+				port->enq_stage_inst = NULL;
+				port->p_type = OPDL_PURE_TX_PORT;
+
+				/* Add the port to the queue array of ports */
+				queue->ports[queue->nb_ports] = port;
+				port->instance_id = queue->nb_ports;
+				queue->nb_ports++;
+			} else {
+
+				PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
+					     "port %u:, linked incorrectly"
+					     " to a q_pos START/INVALID %u",
+					     opdl_pmd_dev_id(port->opdl),
+					     port->id,
+					     queue->q_pos);
+				err = -EINVAL;
+				break;
+			}
+
+		} else if (queue->q_type == OPDL_Q_TYPE_SINGLE_LINK) {
+
+			port->p_type = OPDL_ASYNC_PORT;
+
+			/* -- tx -- */
+			stage_inst = opdl_stage_add(
+				device->opdl[queue->opdl_id],
+					false,
+					false); /* First stage */
+			port->deq_stage_inst = stage_inst;
+
+			/* Add the port to the queue array of ports */
+			queue->ports[queue->nb_ports] = port;
+			port->instance_id = queue->nb_ports;
+			queue->nb_ports++;
+
+			if (queue->nb_ports > 1) {
+				PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
+					     "queue %u:, setup as SINGLE_LINK"
+					     " but has more than one port linked",
+					     opdl_pmd_dev_id(port->opdl),
+					     queue->external_qid);
+				err = -EINVAL;
+				break;
+			}
+
+			/* -- single instance rx for next opdl -- */
+			uint8_t next_qid =
+				device->q_map_ex_to_in[queue->external_qid] + 1;
+			if (next_qid < RTE_EVENT_MAX_QUEUES_PER_DEV &&
+					device->queue[next_qid].configured) {
+
+				/* Remap the queue */
+				queue = &device->queue[next_qid];
+
+				stage_inst = opdl_stage_add(
+					device->opdl[queue->opdl_id],
+						false,
+						true);
+				port->enq_stage_inst = stage_inst;
+
+				/* Add the port to the queue array of ports */
+				queue->ports[queue->nb_ports] = port;
+				port->instance_id = queue->nb_ports;
+				queue->nb_ports++;
+				if (queue->nb_ports > 1) {
+					PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
+						"dummy queue %u: for "
+						"port %u, "
+						"SINGLE_LINK but has more "
+						"than one port linked",
+						opdl_pmd_dev_id(port->opdl),
+						next_qid,
+						port->id);
+					err = -EINVAL;
+					break;
+				}
+				/* Set this queue to initialized as it is never
+				 * referenced by any ports
+				 */
+				queue->initialized = 1;
+			}
+		}
+	}
+
+	/* Now that all ports are initialised we need to
+	 * setup the last bit of stage md
+	 */
+	if (!err) {
+		for (uint32_t i = 0; i < device->nb_ports; i++) {
+			struct opdl_port *port = &device->ports[i];
+			struct opdl_queue *queue =
+				&device->queue[port->queue_id];
+
+			if (port->configured &&
+					(port->queue_id != OPDL_INVALID_QID)) {
+				if (queue->nb_ports == 0) {
+					PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
+						"queue:[%u] has no ports"
+						" linked to it",
+						opdl_pmd_dev_id(port->opdl),
+						port->id);
+					err = -EINVAL;
+					break;
+				}
+
+				port->num_instance = queue->nb_ports;
+				port->initialized = 1;
+				queue->initialized = 1;
+			} else {
+				PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
+					     "Port:[%u] not configured  invalid"
+					     " queue configuration",
+					     opdl_pmd_dev_id(port->opdl),
+					     port->id);
+				err = -EINVAL;
+				break;
+			}
+		}
+	}
+	return err;
+}
+
+int
+initialise_queue_zero_ports(struct rte_eventdev *dev)
+{
+	int err = 0;
+	uint8_t mt_rx = 0;
+	struct opdl_stage *stage_inst = NULL;
+	struct opdl_queue *queue = NULL;
+
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	/* Assign queue zero and figure out how many Q0 ports we have */
+	for (uint32_t i = 0; i < device->nb_ports; i++) {
+		struct opdl_port *port = &device->ports[i];
+		if (port->queue_id == OPDL_INVALID_QID) {
+			port->queue_id = 0;
+			port->external_qid = OPDL_INVALID_QID;
+			port->p_type = OPDL_PURE_RX_PORT;
+			mt_rx++;
+		}
+	}
+
+	/* Create the stage */
+	stage_inst = opdl_stage_add(device->opdl[0],
+			(mt_rx > 1 ? true : false),
+			true);
+	if (stage_inst) {
+
+		/* Assign the new created input stage to all relevant ports */
+		for (uint32_t i = 0; i < device->nb_ports; i++) {
+			struct opdl_port *port = &device->ports[i];
+			if (port->queue_id == 0) {
+				queue = &device->queue[port->queue_id];
+				port->enq_stage_inst = stage_inst;
+				port->deq_stage_inst = NULL;
+				port->configured = 1;
+				port->initialized = 1;
+
+				queue->ports[queue->nb_ports] = port;
+				port->instance_id = queue->nb_ports;
+				queue->nb_ports++;
+			}
+		}
+	} else {
+		err = -1;
+	}
+	return err;
+}
+
+int
+assign_internal_queue_ids(struct rte_eventdev *dev)
+{
+	int err = 0;
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	for (uint32_t i = 0; i < device->nb_ports; i++) {
+		struct opdl_port *port = &device->ports[i];
+		if (port->external_qid != OPDL_INVALID_QID) {
+			port->queue_id =
+				device->q_map_ex_to_in[port->external_qid];
+
+			/* Now do the external_qid of the next queue */
+			struct opdl_queue *queue =
+				&device->queue[port->queue_id];
+			if (queue->q_pos == OPDL_Q_POS_END)
+				port->next_external_qid =
+				device->queue[port->queue_id + 2].external_qid;
+			else
+				port->next_external_qid =
+				device->queue[port->queue_id + 1].external_qid;
+		}
+	}
+	return err;
+}
diff --git a/drivers/event/opdl/opdl_evdev_xstats.c b/drivers/event/opdl/opdl_evdev_xstats.c
new file mode 100644
index 0000000..94dfeee
--- /dev/null
+++ b/drivers/event/opdl/opdl_evdev_xstats.c
@@ -0,0 +1,178 @@
+/*-
+ * SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2010-2014 Intel Corporation
+ */
+
+#include "opdl_evdev.h"
+#include "opdl_log.h"
+
+static const char * const port_xstat_str[] = {
+
+	"claim_pkts_requested",
+	"claim_pkts_granted",
+	"claim_non_empty",
+	"claim_empty",
+	"total_cycles",
+};
+
+
+void
+opdl_xstats_init(struct rte_eventdev *dev)
+{
+	uint32_t i, j;
+
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	if (!device->do_validation)
+		return;
+
+	for (i = 0; i < device->max_port_nb; i++) {
+		struct opdl_port *port = &device->ports[i];
+
+		for (j = 0; j < max_num_port_xstat; j++) {
+			uint32_t index = (i * max_num_port_xstat) + j;
+
+			/* Name */
+			sprintf(device->port_xstat[index].stat.name,
+			       "port_%02u_%s",
+			       i,
+			       port_xstat_str[j]);
+
+			/* ID */
+			device->port_xstat[index].id = index;
+
+			/* Stats ptr */
+			device->port_xstat[index].value = &port->port_stat[j];
+		}
+	}
+}
+
+int
+opdl_xstats_uninit(struct rte_eventdev *dev)
+{
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	if (!device->do_validation)
+		return 0;
+
+	memset(device->port_xstat,
+	       0,
+	       sizeof(device->port_xstat));
+
+	return 0;
+}
+
+int
+opdl_xstats_get_names(const struct rte_eventdev *dev,
+		enum rte_event_dev_xstats_mode mode,
+		uint8_t queue_port_id,
+		struct rte_event_dev_xstats_name *xstats_names,
+		unsigned int *ids, unsigned int size)
+{
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	if (!device->do_validation)
+		return -ENOTSUP;
+
+	if (mode == RTE_EVENT_DEV_XSTATS_DEVICE ||
+			mode == RTE_EVENT_DEV_XSTATS_QUEUE)
+		return -EINVAL;
+
+	if (queue_port_id >= device->max_port_nb)
+		return -EINVAL;
+
+	if (size < max_num_port_xstat)
+		return max_num_port_xstat;
+
+	uint32_t port_idx = queue_port_id * max_num_port_xstat;
+
+	for (uint32_t j = 0; j < max_num_port_xstat; j++) {
+
+		strcpy(xstats_names[j].name,
+				device->port_xstat[j + port_idx].stat.name);
+		ids[j] = device->port_xstat[j + port_idx].id;
+	}
+
+	return max_num_port_xstat;
+}
+
+int
+opdl_xstats_get(const struct rte_eventdev *dev,
+		enum rte_event_dev_xstats_mode mode,
+		uint8_t queue_port_id,
+		const unsigned int ids[],
+		uint64_t values[], unsigned int n)
+{
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	if (!device->do_validation)
+		return -ENOTSUP;
+
+	if (mode == RTE_EVENT_DEV_XSTATS_DEVICE ||
+			mode == RTE_EVENT_DEV_XSTATS_QUEUE)
+		return -EINVAL;
+
+	if (queue_port_id >= device->max_port_nb)
+		return -EINVAL;
+
+	if (n > max_num_port_xstat)
+		return -EINVAL;
+
+	uint32_t p_start = queue_port_id * max_num_port_xstat;
+	uint32_t p_finish = p_start + max_num_port_xstat;
+
+	for (uint32_t i = 0; i < n; i++) {
+		if (ids[i] < p_start || ids[i] >= p_finish)
+			return -EINVAL;
+
+		values[i] = *(device->port_xstat[ids[i]].value);
+	}
+
+	return n;
+}
+
+uint64_t
+opdl_xstats_get_by_name(const struct rte_eventdev *dev,
+		const char *name, unsigned int *id)
+{
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	if (!device->do_validation)
+		return -ENOTSUP;
+
+	uint32_t max_index = device->max_port_nb * max_num_port_xstat;
+
+	for (uint32_t i = 0; i < max_index; i++) {
+
+		if (strncmp(name,
+			   device->port_xstat[i].stat.name,
+			   RTE_EVENT_DEV_XSTATS_NAME_SIZE) == 0) {
+			if (id != NULL)
+				*id = i;
+			if (device->port_xstat[i].value)
+				return *(device->port_xstat[i].value);
+			break;
+		}
+	}
+	return -EINVAL;
+}
+
+int
+opdl_xstats_reset(struct rte_eventdev *dev,
+		enum rte_event_dev_xstats_mode mode,
+		int16_t queue_port_id, const uint32_t ids[],
+		uint32_t nb_ids)
+{
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	if (!device->do_validation)
+		return -ENOTSUP;
+
+	RTE_SET_USED(dev);
+	RTE_SET_USED(mode);
+	RTE_SET_USED(queue_port_id);
+	RTE_SET_USED(ids);
+	RTE_SET_USED(nb_ids);
+
+	return -ENOTSUP;
+}
-- 
2.7.5


