[dpdk-dev] [RFC PATCH] eventdev: add buffered enqueue and flush APIs

Jerin Jacob jerin.jacob at caviumnetworks.com
Fri Dec 2 22:18:48 CET 2016


On Fri, Dec 02, 2016 at 01:45:56PM -0600, Gage Eads wrote:
> This commit adds buffered enqueue functionality to the eventdev API.
> It is conceptually similar to the ethdev API's tx buffering, however
> with a smaller API surface and no dropping of events.

Hello Gage,

Different implementations may have different strategies to hold the buffers,
and some do not need to hold the buffers if they are DDR backed.
IMHO, this may not be a candidate for common code. I guess you can move this
to the driver side and abstract it under the SW driver's enqueue_burst.


> 
> Signed-off-by: Gage Eads <gage.eads at intel.com>
> ---
>  lib/librte_eventdev/rte_eventdev.c |  29 ++++++++++
>  lib/librte_eventdev/rte_eventdev.h | 106 +++++++++++++++++++++++++++++++++++++
>  2 files changed, 135 insertions(+)
> 
> diff --git a/lib/librte_eventdev/rte_eventdev.c b/lib/librte_eventdev/rte_eventdev.c
> index 17ce5c3..564573f 100644
> --- a/lib/librte_eventdev/rte_eventdev.c
> +++ b/lib/librte_eventdev/rte_eventdev.c
> @@ -219,6 +219,7 @@
>  	uint16_t *links_map;
>  	uint8_t *ports_dequeue_depth;
>  	uint8_t *ports_enqueue_depth;
> +	struct rte_eventdev_enqueue_buffer *port_buffers;
>  	unsigned int i;
>  
>  	EDEV_LOG_DEBUG("Setup %d ports on device %u", nb_ports,
> @@ -272,6 +273,19 @@
>  					"nb_ports %u", nb_ports);
>  			return -(ENOMEM);
>  		}
> +
> +		/* Allocate memory to store port enqueue buffers */
> +		dev->data->port_buffers =
> +			rte_zmalloc_socket("eventdev->port_buffers",
> +			sizeof(dev->data->port_buffers[0]) * nb_ports,
> +			RTE_CACHE_LINE_SIZE, dev->data->socket_id);
> +		if (dev->data->port_buffers == NULL) {
> +			dev->data->nb_ports = 0;
> +			EDEV_LOG_ERR("failed to get memory for port enq"
> +				     " buffers, nb_ports %u", nb_ports);
> +			return -(ENOMEM);
> +		}
> +
>  	} else if (dev->data->ports != NULL && nb_ports != 0) {/* re-config */
>  		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->port_release, -ENOTSUP);
>  
> @@ -279,6 +293,7 @@
>  		ports_dequeue_depth = dev->data->ports_dequeue_depth;
>  		ports_enqueue_depth = dev->data->ports_enqueue_depth;
>  		links_map = dev->data->links_map;
> +		port_buffers = dev->data->port_buffers;
>  
>  		for (i = nb_ports; i < old_nb_ports; i++)
>  			(*dev->dev_ops->port_release)(ports[i]);
> @@ -324,6 +339,17 @@
>  			return -(ENOMEM);
>  		}
>  
> +		/* Realloc memory to store port enqueue buffers */
> +		port_buffers = rte_realloc(dev->data->port_buffers,
> +			sizeof(dev->data->port_buffers[0]) * nb_ports,
> +			RTE_CACHE_LINE_SIZE);
> +		if (port_buffers == NULL) {
> +			dev->data->nb_ports = 0;
> +			EDEV_LOG_ERR("failed to realloc mem for port enq"
> +				     " buffers, nb_ports %u", nb_ports);
> +			return -(ENOMEM);
> +		}
> +
>  		if (nb_ports > old_nb_ports) {
>  			uint8_t new_ps = nb_ports - old_nb_ports;
>  
> @@ -336,12 +362,15 @@
>  			memset(links_map +
>  				(old_nb_ports * RTE_EVENT_MAX_QUEUES_PER_DEV),
>  				0, sizeof(ports_enqueue_depth[0]) * new_ps);
> +			memset(port_buffers + old_nb_ports, 0,
> +				sizeof(port_buffers[0]) * new_ps);
>  		}
>  
>  		dev->data->ports = ports;
>  		dev->data->ports_dequeue_depth = ports_dequeue_depth;
>  		dev->data->ports_enqueue_depth = ports_enqueue_depth;
>  		dev->data->links_map = links_map;
> +		dev->data->port_buffers = port_buffers;
>  	} else if (dev->data->ports != NULL && nb_ports == 0) {
>  		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->port_release, -ENOTSUP);
>  
> diff --git a/lib/librte_eventdev/rte_eventdev.h b/lib/librte_eventdev/rte_eventdev.h
> index 778d6dc..3f24342 100644
> --- a/lib/librte_eventdev/rte_eventdev.h
> +++ b/lib/librte_eventdev/rte_eventdev.h
> @@ -246,6 +246,7 @@
>  #include <rte_dev.h>
>  #include <rte_memory.h>
>  #include <rte_errno.h>
> +#include <rte_memcpy.h>
>  
>  #define EVENTDEV_NAME_SKELETON_PMD event_skeleton
>  /**< Skeleton event device PMD name */
> @@ -965,6 +966,26 @@ typedef uint16_t (*event_dequeue_burst_t)(void *port, struct rte_event ev[],
>  #define RTE_EVENTDEV_NAME_MAX_LEN	(64)
>  /**< @internal Max length of name of event PMD */
>  
> +#define RTE_EVENT_BUF_MAX 16
> +/**< Maximum number of events in an enqueue buffer. */
> +
> +/**
> + * @internal
> + * An enqueue buffer for each port.
> + *
> + * The reason this struct is in the header is for inlining the function calls
> + * to enqueue, as doing a function call per packet would incur significant
> + * performance overhead.
> + *
> + * \see rte_event_enqueue_buffer(), rte_event_enqueue_buffer_flush()
> + */
> +struct rte_eventdev_enqueue_buffer {
> +	/**> Count of events in this buffer */
> +	uint16_t count;
> +	/**> Array of events in this buffer */
> +	struct rte_event events[RTE_EVENT_BUF_MAX];
> +} __rte_cache_aligned;
> +
>  /**
>   * @internal
>   * The data part, with no function pointers, associated with each device.
> @@ -983,6 +1004,8 @@ struct rte_eventdev_data {
>  	/**< Number of event ports. */
>  	void **ports;
>  	/**< Array of pointers to ports. */
> +	struct rte_eventdev_enqueue_buffer *port_buffers;
> +	/**< Array of port enqueue buffers. */
>  	uint8_t *ports_dequeue_depth;
>  	/**< Array of port dequeue depth. */
>  	uint8_t *ports_enqueue_depth;
> @@ -1132,6 +1155,89 @@ struct rte_eventdev {
>  }
>  
>  /**
> + * Flush the enqueue buffer of the event port specified by *port_id*, in the
> + * event device specified by *dev_id*.
> + *
> + * This function attempts to flush as many of the buffered events as possible,
> + * and returns the number of flushed events. Any unflushed events remain in
> + * the buffer.
> + *
> + * @param dev_id
> + *   The identifier of the device.
> + * @param port_id
> + *   The identifier of the event port.
> + *
> + * @return
> + *   The number of event objects actually flushed to the event device.
> + *
> + * \see rte_event_enqueue_buffer(), rte_event_enqueue_burst()
> + * \see rte_event_port_enqueue_depth()
> + */
> +static inline int
> +rte_event_enqueue_buffer_flush(uint8_t dev_id, uint8_t port_id)
> +{
> +	struct rte_eventdev *dev = &rte_eventdevs[dev_id];
> +	struct rte_eventdev_enqueue_buffer *buf =
> +		&dev->data->port_buffers[port_id];
> +	int n;
> +
> +	n = rte_event_enqueue_burst(dev_id, port_id, buf->events, buf->count);
> +
> +	if (n != buf->count)
> +		memmove(buf->events, &buf->events[n], buf->count - n);
> +
> +	buf->count -= n;
> +
> +	return n;
> +}
> +
> +/**
> + * Buffer an event object supplied in *rte_event* structure for future
> + * enqueueing on an event device designated by its *dev_id* through the event
> + * port specified by *port_id*.
> + *
> + * This function takes a single event and buffers it for later enqueuing to the
> + * queue specified in the event structure. If the buffer is full, the
> + * function will attempt to flush the buffer before buffering the event.
> + * If the flush operation fails, the previously buffered events remain in the
> + * buffer and an error is returned to the user to indicate that *ev* was not
> + * buffered.
> + *
> + * @param dev_id
> + *   The identifier of the device.
> + * @param port_id
> + *   The identifier of the event port.
> + * @param ev
> + *   Pointer to struct rte_event
> + *
> + * @return
> + *  - 0 on success
> + *  - <0 on failure. Failure can occur if the event port's output queue is
> + *     backpressured, for instance.
> + *
> + * \see rte_event_enqueue_buffer_flush(), rte_event_enqueue_burst()
> + * \see rte_event_port_enqueue_depth()
> + */
> +static inline int
> +rte_event_enqueue_buffer(uint8_t dev_id, uint8_t port_id, struct rte_event *ev)
> +{
> +	struct rte_eventdev *dev = &rte_eventdevs[dev_id];
> +	struct rte_eventdev_enqueue_buffer *buf =
> +		&dev->data->port_buffers[port_id];
> +	int ret;
> +
> +	/* If necessary, flush the enqueue buffer to make space for ev. */
> +	if (buf->count == RTE_EVENT_BUF_MAX) {
> +		ret = rte_event_enqueue_buffer_flush(dev_id, port_id);
> +		if (ret == 0)
> +			return -ENOSPC;
> +	}
> +
> +	rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event));
> +	return 0;
> +}
> +
> +/**
>   * Converts nanoseconds to *wait* value for rte_event_dequeue()
>   *
>   * If the device is configured with RTE_EVENT_DEV_CFG_PER_DEQUEUE_WAIT flag then
> -- 
> 1.9.1
> 


More information about the dev mailing list