[dpdk-dev] [PATCH v4 09/17] event/sw: add support for event queues

Harry van Haaren harry.van.haaren at intel.com
Fri Mar 10 20:43:24 CET 2017


From: Bruce Richardson <bruce.richardson at intel.com>

Add in the data structures for the event queues, and the eventdev
functions to create and destroy those queues.

Signed-off-by: Bruce Richardson <bruce.richardson at intel.com>
Signed-off-by: Harry van Haaren <harry.van.haaren at intel.com>

---

v3 -> v4:
- return -ENOTSUP for CFG_ALL_TYPES, as it is not supported
- fix bug when reordered queue is re-configured for second time
---
 drivers/event/sw/iq_ring.h  | 176 ++++++++++++++++++++++++++++++++++++++++++++
 drivers/event/sw/sw_evdev.c | 166 +++++++++++++++++++++++++++++++++++++++++
 drivers/event/sw/sw_evdev.h |   5 ++
 3 files changed, 347 insertions(+)
 create mode 100644 drivers/event/sw/iq_ring.h

diff --git a/drivers/event/sw/iq_ring.h b/drivers/event/sw/iq_ring.h
new file mode 100644
index 0000000..d480d15
--- /dev/null
+++ b/drivers/event/sw/iq_ring.h
@@ -0,0 +1,176 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * Ring structure definitions used for the internal ring buffers of the
+ * SW eventdev implementation. These are designed for single-core use only.
+ */
+#ifndef _IQ_RING_
+#define _IQ_RING_
+
+#include <stdint.h>
+
+#include <rte_common.h>
+#include <rte_memory.h>
+#include <rte_malloc.h>
+#include <rte_eventdev.h>
+
+#define IQ_RING_NAMESIZE 12
+#define QID_IQ_DEPTH 512
+#define QID_IQ_MASK (uint16_t)(QID_IQ_DEPTH - 1)
+
+struct iq_ring {
+	char name[IQ_RING_NAMESIZE] __rte_cache_aligned;
+	uint16_t write_idx;
+	uint16_t read_idx;
+
+	struct rte_event ring[QID_IQ_DEPTH];
+};
+
+#ifndef force_inline
+#define force_inline inline __attribute__((always_inline))
+#endif
+
+static inline struct iq_ring *
+iq_ring_create(const char *name, unsigned int socket_id)
+{
+	struct iq_ring *retval;
+
+	retval = rte_malloc_socket(NULL, sizeof(*retval), 0, socket_id);
+	if (retval == NULL)
+		goto end;
+
+	snprintf(retval->name, sizeof(retval->name), "%s", name);
+	retval->write_idx = retval->read_idx = 0;
+end:
+	return retval;
+}
+
+static inline void
+iq_ring_destroy(struct iq_ring *r)
+{
+	rte_free(r);
+}
+
+static force_inline uint16_t
+iq_ring_count(const struct iq_ring *r)
+{
+	return r->write_idx - r->read_idx;
+}
+
+static force_inline uint16_t
+iq_ring_free_count(const struct iq_ring *r)
+{
+	return QID_IQ_MASK - iq_ring_count(r);
+}
+
+static force_inline uint16_t
+iq_ring_enqueue_burst(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes)
+{
+	const uint16_t read = r->read_idx;
+	uint16_t write = r->write_idx;
+	const uint16_t space = read + QID_IQ_MASK - write;
+	uint16_t i;
+
+	if (space < nb_qes)
+		nb_qes = space;
+
+	for (i = 0; i < nb_qes; i++, write++)
+		r->ring[write & QID_IQ_MASK] = qes[i];
+
+	r->write_idx = write;
+
+	return nb_qes;
+}
+
+static force_inline uint16_t
+iq_ring_dequeue_burst(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes)
+{
+	uint16_t read = r->read_idx;
+	const uint16_t write = r->write_idx;
+	const uint16_t items = write - read;
+	uint16_t i;
+
+	for (i = 0; i < nb_qes; i++, read++)
+		qes[i] = r->ring[read & QID_IQ_MASK];
+
+	if (items < nb_qes)
+		nb_qes = items;
+
+	r->read_idx += nb_qes;
+
+	return nb_qes;
+}
+
+/* assumes there is space, from a previous dequeue_burst */
+static force_inline uint16_t
+iq_ring_put_back(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes)
+{
+	uint16_t i, read = r->read_idx;
+
+	for (i = nb_qes; i-- > 0; )
+		r->ring[--read & QID_IQ_MASK] = qes[i];
+
+	r->read_idx = read;
+	return nb_qes;
+}
+
+static force_inline const struct rte_event *
+iq_ring_peek(const struct iq_ring *r)
+{
+	return &r->ring[r->read_idx & QID_IQ_MASK];
+}
+
+static force_inline void
+iq_ring_pop(struct iq_ring *r)
+{
+	r->read_idx++;
+}
+
+static force_inline int
+iq_ring_enqueue(struct iq_ring *r, const struct rte_event *qe)
+{
+	const uint16_t read = r->read_idx;
+	const uint16_t write = r->write_idx;
+	const uint16_t space = read + QID_IQ_MASK - write;
+
+	if (space == 0)
+		return -1;
+
+	r->ring[write & QID_IQ_MASK] = *qe;
+
+	r->write_idx = write + 1;
+
+	return 0;
+}
+
+#endif
diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c
index d1fa3a7..eaf8e77 100644
--- a/drivers/event/sw/sw_evdev.c
+++ b/drivers/event/sw/sw_evdev.c
@@ -38,12 +38,176 @@
 #include <rte_ring.h>
 
 #include "sw_evdev.h"
+#include "iq_ring.h"
 
 #define EVENTDEV_NAME_SW_PMD event_sw
 #define NUMA_NODE_ARG "numa_node"
 #define SCHED_QUANTA_ARG "sched_quanta"
 #define CREDIT_QUANTA_ARG "credit_quanta"
 
+static int32_t
+qid_init(struct sw_evdev *sw, unsigned int idx, int type,
+		const struct rte_event_queue_conf *queue_conf)
+{
+	unsigned int i;
+	int dev_id = sw->data->dev_id;
+	int socket_id = sw->data->socket_id;
+	char buf[IQ_RING_NAMESIZE];
+	struct sw_qid *qid = &sw->qids[idx];
+
+	for (i = 0; i < SW_IQS_MAX; i++) {
+		snprintf(buf, sizeof(buf), "q_%u_iq_%d", idx, i);
+		qid->iq[i] = iq_ring_create(buf, socket_id);
+		if (!qid->iq[i]) {
+			SW_LOG_DBG("ring create failed");
+			goto cleanup;
+		}
+	}
+
+	/* Initialize the FID structures to no pinning (-1), and zero packets */
+	const struct sw_fid_t fid = {.cq = -1, .pcount = 0};
+	for (i = 0; i < RTE_DIM(qid->fids); i++)
+		qid->fids[i] = fid;
+
+	qid->id = idx;
+	qid->type = type;
+	qid->priority = queue_conf->priority;
+
+	if (qid->type == RTE_SCHED_TYPE_ORDERED) {
+		char ring_name[RTE_RING_NAMESIZE];
+		uint32_t window_size;
+
+		/* rte_ring and window_size_mask require require window_size to
+		 * be a power-of-2.
+		 */
+		window_size = rte_align32pow2(
+				queue_conf->nb_atomic_order_sequences);
+
+		qid->window_size = window_size - 1;
+
+		if (!window_size) {
+			SW_LOG_DBG(
+				"invalid reorder_window_size for ordered queue\n"
+				);
+			goto cleanup;
+		}
+
+		snprintf(buf, sizeof(buf), "sw%d_iq_%d_rob", dev_id, i);
+		qid->reorder_buffer = rte_zmalloc_socket(buf,
+				window_size * sizeof(qid->reorder_buffer[0]),
+				0, socket_id);
+		if (!qid->reorder_buffer) {
+			SW_LOG_DBG("reorder_buffer malloc failed\n");
+			goto cleanup;
+		}
+
+		memset(&qid->reorder_buffer[0],
+		       0,
+		       window_size * sizeof(qid->reorder_buffer[0]));
+
+		snprintf(ring_name, sizeof(ring_name), "sw%d_q%d_freelist",
+				dev_id, idx);
+
+		/* lookup the ring, and if it already exists, free it */
+		struct rte_ring *cleanup = rte_ring_lookup(ring_name);
+		if (cleanup)
+			rte_ring_free(cleanup);
+
+		qid->reorder_buffer_freelist = rte_ring_create(ring_name,
+				window_size,
+				socket_id,
+				RING_F_SP_ENQ | RING_F_SC_DEQ);
+		if (!qid->reorder_buffer_freelist) {
+			SW_LOG_DBG("freelist ring create failed");
+			goto cleanup;
+		}
+
+		/* Populate the freelist with reorder buffer entries. Enqueue
+		 * 'window_size - 1' entries because the rte_ring holds only
+		 * that many.
+		 */
+		for (i = 0; i < window_size - 1; i++) {
+			if (rte_ring_sp_enqueue(qid->reorder_buffer_freelist,
+						&qid->reorder_buffer[i]) < 0)
+				goto cleanup;
+		}
+
+		qid->reorder_buffer_index = 0;
+		qid->cq_next_tx = 0;
+	}
+
+	qid->initialized = 1;
+
+	return 0;
+
+cleanup:
+	for (i = 0; i < SW_IQS_MAX; i++) {
+		if (qid->iq[i])
+			iq_ring_destroy(qid->iq[i]);
+	}
+
+	if (qid->reorder_buffer) {
+		rte_free(qid->reorder_buffer);
+		qid->reorder_buffer = NULL;
+	}
+
+	if (qid->reorder_buffer_freelist) {
+		rte_ring_free(qid->reorder_buffer_freelist);
+		qid->reorder_buffer_freelist = NULL;
+	}
+
+	return -EINVAL;
+}
+
+static int
+sw_queue_setup(struct rte_eventdev *dev, uint8_t queue_id,
+		const struct rte_event_queue_conf *conf)
+{
+	int type;
+
+	switch (conf->event_queue_cfg) {
+	case RTE_EVENT_QUEUE_CFG_SINGLE_LINK:
+		type = SW_SCHED_TYPE_DIRECT;
+		break;
+	case RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY:
+		type = RTE_SCHED_TYPE_ATOMIC;
+		break;
+	case RTE_EVENT_QUEUE_CFG_ORDERED_ONLY:
+		type = RTE_SCHED_TYPE_ORDERED;
+		break;
+	case RTE_EVENT_QUEUE_CFG_PARALLEL_ONLY:
+		type = RTE_SCHED_TYPE_PARALLEL;
+		break;
+	case RTE_EVENT_QUEUE_CFG_ALL_TYPES:
+		SW_LOG_ERR("QUEUE_CFG_ALL_TYPES not supported\n");
+		return -ENOTSUP;
+	default:
+		SW_LOG_ERR("Unknown queue type %d requested\n",
+			   conf->event_queue_cfg);
+		return -EINVAL;
+	}
+
+	struct sw_evdev *sw = sw_pmd_priv(dev);
+	return qid_init(sw, queue_id, type, conf);
+}
+
+static void
+sw_queue_release(struct rte_eventdev *dev, uint8_t id)
+{
+	struct sw_evdev *sw = sw_pmd_priv(dev);
+	struct sw_qid *qid = &sw->qids[id];
+	uint32_t i;
+
+	for (i = 0; i < SW_IQS_MAX; i++)
+		iq_ring_destroy(qid->iq[i]);
+
+	if (qid->type == RTE_SCHED_TYPE_ORDERED) {
+		rte_free(qid->reorder_buffer);
+		rte_ring_free(qid->reorder_buffer_freelist);
+	}
+	memset(qid, 0, sizeof(*qid));
+}
+
 static void
 sw_queue_def_conf(struct rte_eventdev *dev, uint8_t queue_id,
 				 struct rte_event_queue_conf *conf)
@@ -147,6 +311,8 @@ sw_probe(const char *name, const char *params)
 			.dev_infos_get = sw_info_get,
 
 			.queue_def_conf = sw_queue_def_conf,
+			.queue_setup = sw_queue_setup,
+			.queue_release = sw_queue_release,
 			.port_def_conf = sw_port_def_conf,
 	};
 
diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h
index fda57df..ddf0cd2 100644
--- a/drivers/event/sw/sw_evdev.h
+++ b/drivers/event/sw/sw_evdev.h
@@ -52,6 +52,8 @@
 #define EVENTDEV_NAME_SW_PMD event_sw
 #define SW_PMD_NAME RTE_STR(event_sw)
 
+#define SW_SCHED_TYPE_DIRECT (RTE_SCHED_TYPE_PARALLEL + 1)
+
 #ifdef RTE_LIBRTE_PMD_EVDEV_SW_DEBUG
 #define SW_LOG_INFO(fmt, args...) \
 	RTE_LOG(INFO, EVENTDEV, "[%s] %s() line %u: " fmt "\n", \
@@ -139,6 +141,9 @@ struct sw_evdev {
 	 */
 	uint32_t nb_events_limit;
 
+	/* Internal queues - one per logical queue */
+	struct sw_qid qids[RTE_EVENT_MAX_QUEUES_PER_DEV] __rte_cache_aligned;
+
 	int32_t sched_quanta;
 
 	uint32_t credit_update_quanta;
-- 
2.7.4



More information about the dev mailing list