[dpdk-dev] [RFC PATCH] eventdev: add buffered enqueue and flush APIs
Gage Eads
gage.eads at intel.com
Fri Dec 2 20:45:56 CET 2016
This commit adds buffered enqueue functionality to the eventdev API.
It is conceptually similar to the ethdev API's tx buffering, but has a
smaller API surface and never drops events.
Signed-off-by: Gage Eads <gage.eads at intel.com>
---
lib/librte_eventdev/rte_eventdev.c | 29 ++++++++++
lib/librte_eventdev/rte_eventdev.h | 106 +++++++++++++++++++++++++++++++++++++
2 files changed, 135 insertions(+)
diff --git a/lib/librte_eventdev/rte_eventdev.c b/lib/librte_eventdev/rte_eventdev.c
index 17ce5c3..564573f 100644
--- a/lib/librte_eventdev/rte_eventdev.c
+++ b/lib/librte_eventdev/rte_eventdev.c
@@ -219,6 +219,7 @@
uint16_t *links_map;
uint8_t *ports_dequeue_depth;
uint8_t *ports_enqueue_depth;
+ struct rte_eventdev_enqueue_buffer *port_buffers;
unsigned int i;
EDEV_LOG_DEBUG("Setup %d ports on device %u", nb_ports,
@@ -272,6 +273,19 @@
"nb_ports %u", nb_ports);
return -(ENOMEM);
}
+
+ /* Allocate memory to store port enqueue buffers */
+ dev->data->port_buffers =
+ rte_zmalloc_socket("eventdev->port_buffers",
+ sizeof(dev->data->port_buffers[0]) * nb_ports,
+ RTE_CACHE_LINE_SIZE, dev->data->socket_id);
+ if (dev->data->port_buffers == NULL) {
+ dev->data->nb_ports = 0;
+ EDEV_LOG_ERR("failed to get memory for port enq"
+ " buffers, nb_ports %u", nb_ports);
+ return -(ENOMEM);
+ }
+
} else if (dev->data->ports != NULL && nb_ports != 0) {/* re-config */
RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->port_release, -ENOTSUP);
@@ -279,6 +293,7 @@
ports_dequeue_depth = dev->data->ports_dequeue_depth;
ports_enqueue_depth = dev->data->ports_enqueue_depth;
links_map = dev->data->links_map;
+ port_buffers = dev->data->port_buffers;
for (i = nb_ports; i < old_nb_ports; i++)
(*dev->dev_ops->port_release)(ports[i]);
@@ -324,6 +339,17 @@
return -(ENOMEM);
}
+ /* Realloc memory to store port enqueue buffers */
+ port_buffers = rte_realloc(dev->data->port_buffers,
+ sizeof(dev->data->port_buffers[0]) * nb_ports,
+ RTE_CACHE_LINE_SIZE);
+ if (port_buffers == NULL) {
+ dev->data->nb_ports = 0;
+ EDEV_LOG_ERR("failed to realloc mem for port enq"
+ " buffers, nb_ports %u", nb_ports);
+ return -(ENOMEM);
+ }
+
if (nb_ports > old_nb_ports) {
uint8_t new_ps = nb_ports - old_nb_ports;
@@ -336,12 +362,15 @@
memset(links_map +
(old_nb_ports * RTE_EVENT_MAX_QUEUES_PER_DEV),
0, sizeof(ports_enqueue_depth[0]) * new_ps);
+ memset(port_buffers + old_nb_ports, 0,
+ sizeof(port_buffers[0]) * new_ps);
}
dev->data->ports = ports;
dev->data->ports_dequeue_depth = ports_dequeue_depth;
dev->data->ports_enqueue_depth = ports_enqueue_depth;
dev->data->links_map = links_map;
+ dev->data->port_buffers = port_buffers;
} else if (dev->data->ports != NULL && nb_ports == 0) {
RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->port_release, -ENOTSUP);
diff --git a/lib/librte_eventdev/rte_eventdev.h b/lib/librte_eventdev/rte_eventdev.h
index 778d6dc..3f24342 100644
--- a/lib/librte_eventdev/rte_eventdev.h
+++ b/lib/librte_eventdev/rte_eventdev.h
@@ -246,6 +246,7 @@
#include <rte_dev.h>
#include <rte_memory.h>
#include <rte_errno.h>
+#include <rte_memcpy.h>
#define EVENTDEV_NAME_SKELETON_PMD event_skeleton
/**< Skeleton event device PMD name */
@@ -965,6 +966,26 @@ typedef uint16_t (*event_dequeue_burst_t)(void *port, struct rte_event ev[],
#define RTE_EVENTDEV_NAME_MAX_LEN (64)
/**< @internal Max length of name of event PMD */
+#define RTE_EVENT_BUF_MAX 16
+/**< Maximum number of events in an enqueue buffer. */
+
+/**
+ * @internal
+ * An enqueue buffer; the device data holds one of these per event port
+ * and the buffered-enqueue functions index them by port id.
+ *
+ * The reason this struct is in the header is for inlining the function calls
+ * to enqueue, as doing a function call per event would incur significant
+ * performance overhead.
+ *
+ * \see rte_event_enqueue_buffer(), rte_event_enqueue_buffer_flush()
+ */
+struct rte_eventdev_enqueue_buffer {
+ uint16_t count;
+ /**< Number of valid events stored at the front of *events*. */
+ struct rte_event events[RTE_EVENT_BUF_MAX];
+ /**< Storage for the buffered events, RTE_EVENT_BUF_MAX slots. */
+} __rte_cache_aligned;
+
/**
* @internal
* The data part, with no function pointers, associated with each device.
@@ -983,6 +1004,8 @@ struct rte_eventdev_data {
/**< Number of event ports. */
void **ports;
/**< Array of pointers to ports. */
+ struct rte_eventdev_enqueue_buffer *port_buffers;
+ /**< Array of port enqueue buffers. */
uint8_t *ports_dequeue_depth;
/**< Array of port dequeue depth. */
uint8_t *ports_enqueue_depth;
@@ -1132,6 +1155,89 @@ struct rte_eventdev {
}
/**
+ * Flush the enqueue buffer of the event port specified by *port_id*, in the
+ * event device specified by *dev_id*.
+ *
+ * This function attempts to flush as many of the buffered events as possible,
+ * and returns the number of flushed events. Any unflushed events remain in
+ * the buffer.
+ *
+ * @param dev_id
+ * The identifier of the device.
+ * @param port_id
+ * The identifier of the event port.
+ *
+ * @return
+ * The number of event objects actually flushed to the event device.
+ *
+ * \see rte_event_enqueue_buffer(), rte_event_enqueue_burst()
+ * \see rte_event_port_enqueue_depth()
+ */
+static inline int
+rte_event_enqueue_buffer_flush(uint8_t dev_id, uint8_t port_id)
+{
+	struct rte_eventdev *dev = &rte_eventdevs[dev_id];
+	struct rte_eventdev_enqueue_buffer *buf =
+		&dev->data->port_buffers[port_id];
+	int n;
+
+	/* Nothing buffered: skip the device call entirely. */
+	if (buf->count == 0)
+		return 0;
+
+	n = rte_event_enqueue_burst(dev_id, port_id, buf->events, buf->count);
+
+	/*
+	 * On a partial flush, slide the unsent events to the front of the
+	 * array so they keep their order and are sent first by the next
+	 * flush. memmove() takes a length in bytes, so the number of
+	 * remaining events must be scaled by the element size. When nothing
+	 * was accepted (n == 0) the events are already in place.
+	 */
+	if (n != 0 && n != buf->count)
+		memmove(buf->events, &buf->events[n],
+			(buf->count - n) * sizeof(buf->events[0]));
+
+	buf->count -= n;
+
+	return n;
+}
+
+/**
+ * Store a copy of the event pointed to by *ev* in the enqueue buffer of the
+ * event port *port_id* on the event device *dev_id*, so that it can be
+ * enqueued later as part of a burst.
+ *
+ * When the port's buffer already holds RTE_EVENT_BUF_MAX events, the buffer
+ * is flushed first to make room. If that flush does not manage to enqueue any
+ * event, the buffer contents are left as they were, *ev* is not buffered and
+ * an error is returned.
+ *
+ * @param dev_id
+ *   The identifier of the device.
+ * @param port_id
+ *   The identifier of the event port.
+ * @param ev
+ *   Pointer to the struct rte_event to copy into the buffer.
+ *
+ * @return
+ *  - 0 on success, *ev* has been buffered.
+ *  - -ENOSPC if the buffer was full and the flush enqueued no events, for
+ *    instance because the event port's output queue is backpressured.
+ *
+ * \see rte_event_enqueue_buffer_flush(), rte_event_enqueue_burst()
+ * \see rte_event_port_enqueue_depth()
+ */
+static inline int
+rte_event_enqueue_buffer(uint8_t dev_id, uint8_t port_id, struct rte_event *ev)
+{
+	struct rte_eventdev_enqueue_buffer *const port_buf =
+		&rte_eventdevs[dev_id].data->port_buffers[port_id];
+
+	/*
+	 * A full buffer must give up at least one slot before *ev* can be
+	 * stored; if the flush moves nothing, report the lack of space.
+	 */
+	if (port_buf->count == RTE_EVENT_BUF_MAX &&
+	    rte_event_enqueue_buffer_flush(dev_id, port_id) == 0)
+		return -ENOSPC;
+
+	/* Append a copy of the caller's event in the first free slot. */
+	rte_memcpy(&port_buf->events[port_buf->count], ev, sizeof(*ev));
+	port_buf->count++;
+
+	return 0;
+}
+
+/**
* Converts nanoseconds to *wait* value for rte_event_dequeue()
*
* If the device is configured with RTE_EVENT_DEV_CFG_PER_DEQUEUE_WAIT flag then
--
1.9.1
More information about the dev
mailing list