[dpdk-dev] [PATCH v2 4/5] eventdev: add interrupt driven queues to Rx adapter

Nikhil Rao nikhil.rao at intel.com
Wed Jun 27 12:55:37 CEST 2018


Add support for interrupt driven queues when eth device is
configured for rxq interrupts and servicing weight for the
queue is configured to be zero.

A interrupt driven packet received counter has been added to
rte_event_eth_rx_adapter_stats.

Signed-off-by: Nikhil Rao <nikhil.rao at intel.com>
---
 config/rte_config.h                                |   1 +
 lib/librte_eventdev/rte_event_eth_rx_adapter.h     |   5 +-
 lib/librte_eventdev/rte_event_eth_rx_adapter.c     | 923 ++++++++++++++++++++-
 .../prog_guide/event_ethernet_rx_adapter.rst       |  24 +
 config/common_base                                 |   1 +
 lib/librte_eventdev/Makefile                       |   2 +-
 6 files changed, 927 insertions(+), 29 deletions(-)

diff --git a/config/rte_config.h b/config/rte_config.h
index a1d0175..ec88f14 100644
--- a/config/rte_config.h
+++ b/config/rte_config.h
@@ -64,6 +64,7 @@
 #define RTE_EVENT_MAX_DEVS 16
 #define RTE_EVENT_MAX_QUEUES_PER_DEV 64
 #define RTE_EVENT_TIMER_ADAPTER_NUM_MAX 32
+#define RTE_EVENT_ETH_INTR_RING_SIZE 1024
 #define RTE_EVENT_CRYPTO_ADAPTER_MAX_INSTANCE 32
 
 /* rawdev defines */
diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.h b/lib/librte_eventdev/rte_event_eth_rx_adapter.h
index 307b2b5..97f25e9 100644
--- a/lib/librte_eventdev/rte_event_eth_rx_adapter.h
+++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.h
@@ -64,8 +64,7 @@
  * the service function ID of the adapter in this case.
  *
  * Note:
- * 1) Interrupt driven receive queues are currently unimplemented.
- * 2) Devices created after an instance of rte_event_eth_rx_adapter_create
+ * 1) Devices created after an instance of rte_event_eth_rx_adapter_create
  *  should be added to a new instance of the rx adapter.
  */
 
@@ -199,6 +198,8 @@ struct rte_event_eth_rx_adapter_stats {
 	 * block cycles can be used to compute the percentage of
 	 * cycles the service is blocked by the event device.
 	 */
+	uint64_t rx_intr_packets;
+	/**< Received packet count for interrupt mode Rx queues */
 };
 
 /**
diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
index 8fe037f..62886c4 100644
--- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
+++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
@@ -2,6 +2,8 @@
  * Copyright(c) 2017 Intel Corporation.
  * All rights reserved.
  */
+#include <unistd.h>
+#include <sys/epoll.h>
 #include <rte_cycles.h>
 #include <rte_common.h>
 #include <rte_dev.h>
@@ -11,6 +13,7 @@
 #include <rte_malloc.h>
 #include <rte_service_component.h>
 #include <rte_thash.h>
+#include <rte_interrupts.h>
 
 #include "rte_eventdev.h"
 #include "rte_eventdev_pmd.h"
@@ -24,6 +27,22 @@
 #define ETH_RX_ADAPTER_MEM_NAME_LEN	32
 
 #define RSS_KEY_SIZE	40
+/* value written to intr thread pipe to signal thread exit */
+#define ETH_BRIDGE_INTR_THREAD_EXIT	1
+/* Sentinel value to detect initialized file handle */
+#define INIT_FD		-1
+
+/*
+ * Used to store port and queue ID of interrupting Rx queue
+ */
+union queue_data {
+	RTE_STD_C11
+	void *ptr;
+	struct {
+		uint16_t port;
+		uint16_t queue;
+	};
+};
 
 /*
  * There is an instance of this struct per polled Rx queue added to the
@@ -75,6 +94,32 @@ struct rte_event_eth_rx_adapter {
 	uint16_t enq_block_count;
 	/* Block start ts */
 	uint64_t rx_enq_block_start_ts;
+	/* epoll fd used to wait for Rx interrupts */
+	int epd;
+	/* Num of interrupt driven interrupt queues */
+	uint32_t num_rx_intr;
+	/* Used to send <dev id, queue id> of interrupting Rx queues from
+	 * the interrupt thread to the Rx thread
+	 */
+	struct rte_ring *intr_ring;
+	/* Rx Queue data (dev id, queue id) for the last non-empty
+	 * queue polled
+	 */
+	union queue_data qd;
+	/* queue_data is valid */
+	int qd_valid;
+	/* Interrupt ring lock, synchronizes Rx thread
+	 * and interrupt thread
+	 */
+	rte_spinlock_t intr_ring_lock;
+	/* event array passed to rte_poll_wait */
+	struct rte_epoll_event *epoll_events;
+	/* Count of interrupt vectors in use */
+	uint32_t num_intr_vec;
+	/* Thread blocked on Rx interrupts */
+	pthread_t rx_intr_thread;
+	/* Stop thread flag */
+	uint8_t stop_thread;
 	/* Configuration callback for rte_service configuration */
 	rte_event_eth_rx_adapter_conf_cb conf_cb;
 	/* Configuration callback argument */
@@ -93,6 +138,8 @@ struct rte_event_eth_rx_adapter {
 	uint32_t service_id;
 	/* Adapter started flag */
 	uint8_t rxa_started;
+	/* Adapter ID */
+	uint8_t id;
 } __rte_cache_aligned;
 
 /* Per eth device */
@@ -111,19 +158,40 @@ struct eth_device_info {
 	uint8_t dev_rx_started;
 	/* Number of queues added for this device */
 	uint16_t nb_dev_queues;
-	/* If nb_rx_poll > 0, the start callback will
+	/* Number of poll based queues
+	 * If nb_rx_poll > 0, the start callback will
 	 * be invoked if not already invoked
 	 */
 	uint16_t nb_rx_poll;
+	/* Number of interrupt based queues
+	 * If nb_rx_intr > 0, the start callback will
+	 * be invoked if not already invoked.
+	 */
+	uint16_t nb_rx_intr;
+	/* Number of queues that use the shared interrupt */
+	uint16_t nb_shared_intr;
 	/* sum(wrr(q)) for all queues within the device
 	 * useful when deleting all device queues
 	 */
 	uint32_t wrr_len;
+	/* Intr based queue index to start polling from, this is used
+	 * if the number of shared interrupts is non-zero
+	 */
+	uint16_t next_q_idx;
+	/* Intr based queue indices */
+	uint16_t *intr_queue;
+	/* device generates per Rx queue interrupt for queue index
+	 * for queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
+	 */
+	int multi_intr_cap;
+	/* shared interrupt enabled */
+	int shared_intr_enabled;
 };
 
 /* Per Rx queue */
 struct eth_rx_queue_info {
 	int queue_enabled;	/* True if added */
+	int intr_enabled;
 	uint16_t wt;		/* Polling weight */
 	uint8_t event_queue_id;	/* Event queue to enqueue packets to */
 	uint8_t sched_type;	/* Sched type for events */
@@ -150,7 +218,7 @@ struct eth_rx_queue_info {
 static inline int
 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
 {
-	return rx_adapter->num_rx_polled;
+	return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
 }
 
 /* Greatest common divisor */
@@ -195,6 +263,28 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 }
 
 static inline int
+rxa_shared_intr(struct eth_device_info *dev_info,
+	int rx_queue_id)
+{
+	int multi_intr_cap =
+			rte_intr_cap_multiple(dev_info->dev->intr_handle);
+	return !multi_intr_cap ||
+		rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
+}
+
+static inline int
+rxa_intr_queue(struct eth_device_info *dev_info,
+	int rx_queue_id)
+{
+	struct eth_rx_queue_info *queue_info;
+
+	queue_info = &dev_info->rx_queue[rx_queue_id];
+	return dev_info->rx_queue &&
+		!dev_info->internal_event_port &&
+		queue_info->queue_enabled && queue_info->wt == 0;
+}
+
+static inline int
 rxa_polled_queue(struct eth_device_info *dev_info,
 	int rx_queue_id)
 {
@@ -206,6 +296,95 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 		queue_info->queue_enabled && queue_info->wt != 0;
 }
 
+/* Calculate change in number of vectors after Rx queue ID is add/deleted */
+static int
+rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
+{
+	uint16_t i;
+	int n, s;
+	uint16_t nbq;
+
+	nbq = dev_info->dev->data->nb_rx_queues;
+	n = 0; /* non shared count */
+	s = 0; /* shared count */
+
+	if (rx_queue_id == -1) {
+		for (i = 0; i < nbq; i++) {
+			if (!rxa_shared_intr(dev_info, i))
+				n += add ? !rxa_intr_queue(dev_info, i) :
+					rxa_intr_queue(dev_info, i);
+			else
+				s += add ? !rxa_intr_queue(dev_info, i) :
+					rxa_intr_queue(dev_info, i);
+		}
+
+		if (s > 0) {
+			if ((add && dev_info->nb_shared_intr == 0) ||
+				(!add && dev_info->nb_shared_intr))
+				n += 1;
+		}
+	} else {
+		if (!rxa_shared_intr(dev_info, rx_queue_id))
+			n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
+				rxa_intr_queue(dev_info, rx_queue_id);
+		else
+			n = add ? !dev_info->nb_shared_intr :
+				dev_info->nb_shared_intr == 1;
+	}
+
+	return add ? n : -n;
+}
+
+/* Calculate nb_rx_intr after deleting interrupt mode rx queues
+ */
+static void
+rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
+			struct eth_device_info *dev_info,
+			int rx_queue_id,
+			uint32_t *nb_rx_intr)
+{
+	uint32_t intr_diff;
+
+	if (rx_queue_id == -1)
+		intr_diff = dev_info->nb_rx_intr;
+	else
+		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
+
+	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
+}
+
+/* Calculate nb_rx_* after adding interrupt mode rx queues, newly added
+ * interrupt queues could currently be poll mode Rx queues
+ */
+static void
+rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
+			struct eth_device_info *dev_info,
+			int rx_queue_id,
+			uint32_t *nb_rx_poll,
+			uint32_t *nb_rx_intr,
+			uint32_t *nb_wrr)
+{
+	uint32_t intr_diff;
+	uint32_t poll_diff;
+	uint32_t wrr_len_diff;
+
+	if (rx_queue_id == -1) {
+		intr_diff = dev_info->dev->data->nb_rx_queues -
+						dev_info->nb_rx_intr;
+		poll_diff = dev_info->nb_rx_poll;
+		wrr_len_diff = dev_info->wrr_len;
+	} else {
+		intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
+		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
+		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
+					0;
+	}
+
+	*nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
+	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
+	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
+}
+
 /* Calculate size of the eth_rx_poll and wrr_sched arrays
  * after deleting poll mode rx queues
  */
@@ -240,17 +419,21 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 			int rx_queue_id,
 			uint16_t wt,
 			uint32_t *nb_rx_poll,
+			uint32_t *nb_rx_intr,
 			uint32_t *nb_wrr)
 {
+	uint32_t intr_diff;
 	uint32_t poll_diff;
 	uint32_t wrr_len_diff;
 
 	if (rx_queue_id == -1) {
+		intr_diff = dev_info->nb_rx_intr;
 		poll_diff = dev_info->dev->data->nb_rx_queues -
 						dev_info->nb_rx_poll;
 		wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
 				- dev_info->wrr_len;
 	} else {
+		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
 		poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
 		wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
 				wt - dev_info->rx_queue[rx_queue_id].wt :
@@ -258,6 +441,7 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	}
 
 	*nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
+	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
 	*nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
 }
 
@@ -268,10 +452,15 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 		int rx_queue_id,
 		uint16_t wt,
 		uint32_t *nb_rx_poll,
+		uint32_t *nb_rx_intr,
 		uint32_t *nb_wrr)
 {
-	rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
-				wt, nb_rx_poll, nb_wrr);
+	if (wt != 0)
+		rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
+					wt, nb_rx_poll, nb_rx_intr, nb_wrr);
+	else
+		rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
+					nb_rx_poll, nb_rx_intr, nb_wrr);
 }
 
 /* Calculate nb_rx_* after deleting rx_queue_id */
@@ -280,10 +469,13 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 		struct eth_device_info *dev_info,
 		int rx_queue_id,
 		uint32_t *nb_rx_poll,
+		uint32_t *nb_rx_intr,
 		uint32_t *nb_wrr)
 {
 	rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
 				nb_wrr);
+	rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
+				nb_rx_intr);
 }
 
 /*
@@ -622,7 +814,8 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	uint16_t port_id,
 	uint16_t queue_id,
 	uint32_t rx_count,
-	uint32_t max_rx)
+	uint32_t max_rx,
+	int *rxq_empty)
 {
 	struct rte_mbuf *mbufs[BATCH_SIZE];
 	struct rte_eth_event_enqueue_buffer *buf =
@@ -632,6 +825,8 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	uint16_t n;
 	uint32_t nb_rx = 0;
 
+	if (rxq_empty)
+		*rxq_empty = 0;
 	/* Don't do a batch dequeue from the rx queue if there isn't
 	 * enough space in the enqueue buffer.
 	 */
@@ -641,8 +836,11 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 
 		stats->rx_poll_count++;
 		n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
-		if (unlikely(!n))
+		if (unlikely(!n)) {
+			if (rxq_empty)
+				*rxq_empty = 1;
 			break;
+		}
 		rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
 		nb_rx += n;
 		if (rx_count + nb_rx > max_rx)
@@ -655,6 +853,228 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	return nb_rx;
 }
 
+static inline void
+rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
+		void *data)
+{
+	uint16_t port_id;
+	uint16_t queue;
+	int err;
+	union queue_data qd;
+	struct eth_device_info *dev_info;
+	struct eth_rx_queue_info *queue_info;
+	int *intr_enabled;
+
+	qd.ptr = data;
+	port_id = qd.port;
+	queue = qd.queue;
+
+	dev_info = &rx_adapter->eth_devices[port_id];
+	queue_info = &dev_info->rx_queue[queue];
+	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
+	if (rxa_shared_intr(dev_info, queue))
+		intr_enabled = &dev_info->shared_intr_enabled;
+	else
+		intr_enabled = &queue_info->intr_enabled;
+
+	if (*intr_enabled) {
+		*intr_enabled = 0;
+		err = rte_ring_enqueue(rx_adapter->intr_ring, data);
+		/* Entry should always be available.
+		 * The ring size equals the maximum number of interrupt
+		 * vectors supported (an interrupt vector is shared in
+		 * case of shared interrupts)
+		 */
+		if (err)
+			RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
+				" to ring: %s", strerror(err));
+		else
+			rte_eth_dev_rx_intr_disable(port_id, queue);
+	}
+	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
+}
+
+static int
+rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
+			uint32_t num_intr_vec)
+{
+	if (rx_adapter->num_intr_vec + num_intr_vec >
+				RTE_EVENT_ETH_INTR_RING_SIZE) {
+		RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
+		" %d needed %d limit %d", rx_adapter->num_intr_vec,
+		num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
+		return -ENOSPC;
+	}
+
+	return 0;
+}
+
+/* Delete entries for (dev, queue) from the interrupt ring */
+static void
+rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
+			struct eth_device_info *dev_info,
+			uint16_t rx_queue_id)
+{
+	int i, n;
+	union queue_data qd;
+
+	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
+
+	n = rte_ring_count(rx_adapter->intr_ring);
+	for (i = 0; i < n; i++) {
+		rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
+		if (!rxa_shared_intr(dev_info, rx_queue_id)) {
+			if (qd.port == dev_info->dev->data->port_id &&
+				qd.queue == rx_queue_id)
+				continue;
+		} else {
+			if (qd.port == dev_info->dev->data->port_id)
+				continue;
+		}
+		rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
+	}
+
+	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
+}
+
+/* pthread callback handling interrupt mode receive queues
+ * After receiving an Rx interrupt, it enqueues the port id and queue id of the
+ * interrupting queue to the adapter's ring buffer for interrupt events.
+ * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
+ * the adapter service function.
+ */
+static void *
+rxa_intr_thread(void *arg)
+{
+	struct rte_event_eth_rx_adapter *rx_adapter = arg;
+	struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
+	int n, i;
+
+	while (1) {
+		n = rte_epoll_wait(rx_adapter->epd, epoll_events,
+				RTE_EVENT_ETH_INTR_RING_SIZE + 1, -1);
+		if (unlikely(n < 0))
+			RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
+					n);
+		for (i = 0; i < n; i++) {
+			rxa_intr_ring_enqueue(rx_adapter,
+					epoll_events[i].epdata.data);
+		}
+	}
+
+	return NULL;
+}
+
+/* Dequeue <port, q> from interrupt ring and enqueue received
+ * mbufs to eventdev
+ */
+static inline uint32_t
+rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+	uint32_t n;
+	uint32_t nb_rx = 0;
+	int rxq_empty;
+	struct rte_eth_event_enqueue_buffer *buf;
+	rte_spinlock_t *ring_lock;
+	uint8_t max_done = 0;
+
+	if (rx_adapter->num_rx_intr == 0)
+		return 0;
+
+	if (rte_ring_count(rx_adapter->intr_ring) == 0
+		&& !rx_adapter->qd_valid)
+		return 0;
+
+	buf = &rx_adapter->event_enqueue_buffer;
+	ring_lock = &rx_adapter->intr_ring_lock;
+
+	if (buf->count >= BATCH_SIZE)
+		rxa_flush_event_buffer(rx_adapter);
+
+	while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
+		struct eth_device_info *dev_info;
+		uint16_t port;
+		uint16_t queue;
+		union queue_data qd  = rx_adapter->qd;
+		int err;
+
+		if (!rx_adapter->qd_valid) {
+			struct eth_rx_queue_info *queue_info;
+
+			rte_spinlock_lock(ring_lock);
+			err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
+			if (err) {
+				rte_spinlock_unlock(ring_lock);
+				break;
+			}
+
+			port = qd.port;
+			queue = qd.queue;
+			rx_adapter->qd = qd;
+			rx_adapter->qd_valid = 1;
+			dev_info = &rx_adapter->eth_devices[port];
+			if (rxa_shared_intr(dev_info, queue))
+				dev_info->shared_intr_enabled = 1;
+			else {
+				queue_info = &dev_info->rx_queue[queue];
+				queue_info->intr_enabled = 1;
+			}
+			rte_eth_dev_rx_intr_enable(port, queue);
+			rte_spinlock_unlock(ring_lock);
+		} else {
+			port = qd.port;
+			queue = qd.queue;
+
+			dev_info = &rx_adapter->eth_devices[port];
+		}
+
+		if (rxa_shared_intr(dev_info, queue)) {
+			uint16_t i;
+			uint16_t nb_queues;
+
+			nb_queues = dev_info->dev->data->nb_rx_queues;
+			n = 0;
+			for (i = dev_info->next_q_idx; i < nb_queues; i++) {
+				uint8_t enq_buffer_full;
+
+				if (!rxa_intr_queue(dev_info, i))
+					continue;
+				n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
+					rx_adapter->max_nb_rx,
+					&rxq_empty);
+				nb_rx += n;
+
+				enq_buffer_full = !rxq_empty && n == 0;
+				max_done = nb_rx > rx_adapter->max_nb_rx;
+
+				if (enq_buffer_full || max_done) {
+					dev_info->next_q_idx = i;
+					goto done;
+				}
+			}
+
+			rx_adapter->qd_valid = 0;
+
+			/* Reinitialize for next interrupt */
+			dev_info->next_q_idx = dev_info->multi_intr_cap ?
+						RTE_MAX_RXTX_INTR_VEC_ID - 1 :
+						0;
+		} else {
+			n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
+				rx_adapter->max_nb_rx,
+				&rxq_empty);
+			rx_adapter->qd_valid = !rxq_empty;
+			nb_rx += n;
+			if (nb_rx > rx_adapter->max_nb_rx)
+				break;
+		}
+	}
+
+done:
+	rx_adapter->stats.rx_intr_packets += nb_rx;
+	return nb_rx;
+}
+
 /*
  * Polls receive queues added to the event adapter and enqueues received
  * packets to the event device.
@@ -668,7 +1088,7 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
  * the hypervisor's switching layer where adjustments can be made to deal with
  * it.
  */
-static inline void
+static inline uint32_t
 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
 {
 	uint32_t num_queue;
@@ -676,7 +1096,6 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	struct rte_eth_event_enqueue_buffer *buf;
 	uint32_t wrr_pos;
 	uint32_t max_nb_rx;
-	struct rte_event_eth_rx_adapter_stats *stats;
 
 	wrr_pos = rx_adapter->wrr_pos;
 	max_nb_rx = rx_adapter->max_nb_rx;
@@ -696,10 +1115,11 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 			rxa_flush_event_buffer(rx_adapter);
 		if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
 			rx_adapter->wrr_pos = wrr_pos;
-			break;
+			return nb_rx;
 		}
 
-		nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx);
+		nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
+				NULL);
 		if (nb_rx > max_nb_rx) {
 			rx_adapter->wrr_pos =
 				    (wrr_pos + 1) % rx_adapter->wrr_len;
@@ -709,14 +1129,14 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 		if (++wrr_pos == rx_adapter->wrr_len)
 			wrr_pos = 0;
 	}
-
-	stats->rx_packets += nb_rx;
+	return nb_rx;
 }
 
 static int
 rxa_service_func(void *args)
 {
 	struct rte_event_eth_rx_adapter *rx_adapter = args;
+	struct rte_event_eth_rx_adapter_stats *stats;
 
 	if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
 		return 0;
@@ -724,7 +1144,10 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 		return 0;
 		rte_spinlock_unlock(&rx_adapter->rx_lock);
 	}
-	rxa_poll(rx_adapter);
+
+	stats = &rx_adapter->stats;
+	stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
+	stats->rx_packets += rxa_poll(rx_adapter);
 	rte_spinlock_unlock(&rx_adapter->rx_lock);
 	return 0;
 }
@@ -809,6 +1232,339 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 }
 
 static int
+rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+	if (rx_adapter->epd != INIT_FD)
+		return 0;
+
+	rx_adapter->epd = epoll_create1(EPOLL_CLOEXEC);
+	if (rx_adapter->epd < 0) {
+		rx_adapter->epd = INIT_FD;
+		RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", errno);
+		return -errno;
+	}
+
+	return 0;
+}
+
+static int
+rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+	int err;
+	char thread_name[RTE_MAX_THREAD_NAME_LEN];
+
+	if (rx_adapter->intr_ring)
+		return 0;
+
+	rx_adapter->intr_ring = rte_ring_create("intr_ring",
+					RTE_EVENT_ETH_INTR_RING_SIZE,
+					rte_socket_id(), 0);
+	if (!rx_adapter->intr_ring)
+		return -ENOMEM;
+
+	rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
+					(RTE_EVENT_ETH_INTR_RING_SIZE + 1) *
+					sizeof(struct rte_epoll_event),
+					RTE_CACHE_LINE_SIZE,
+					rx_adapter->socket_id);
+	if (!rx_adapter->epoll_events) {
+		err = -ENOMEM;
+		goto error;
+	}
+
+	rte_spinlock_init(&rx_adapter->intr_ring_lock);
+
+	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
+			"rx-intr-thread-%d", rx_adapter->id);
+
+	err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
+				NULL, rxa_intr_thread, rx_adapter);
+	if (!err) {
+		rte_thread_setname(rx_adapter->rx_intr_thread, thread_name);
+		return 0;
+	}
+
+	RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
+error:
+	rte_ring_free(rx_adapter->intr_ring);
+	rx_adapter->intr_ring = NULL;
+	rx_adapter->epoll_events = NULL;
+	return err;
+}
+
+static int
+rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+	int err;
+
+	rx_adapter->stop_thread = 1;
+	err = pthread_cancel(rx_adapter->rx_intr_thread);
+	if (err)
+		RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
+				err);
+
+	err = pthread_join(rx_adapter->rx_intr_thread, NULL);
+	if (err)
+		RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
+
+	rx_adapter->stop_thread = 0;
+	rte_free(rx_adapter->epoll_events);
+	rte_ring_free(rx_adapter->intr_ring);
+	rx_adapter->intr_ring = NULL;
+	rx_adapter->epoll_events = NULL;
+	return 0;
+}
+
+static int
+rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+	int ret;
+
+	if (rx_adapter->num_rx_intr == 0)
+		return 0;
+
+	ret = rxa_destroy_intr_thread(rx_adapter);
+	if (ret)
+		return ret;
+
+	close(rx_adapter->epd);
+	rx_adapter->epd = INIT_FD;
+
+	return ret;
+}
+
+static int
+rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
+	struct eth_device_info *dev_info,
+	uint16_t rx_queue_id)
+{
+	int err;
+	uint16_t eth_dev_id = dev_info->dev->data->port_id;
+	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
+
+	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
+	if (err) {
+		RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
+			rx_queue_id);
+		return err;
+	}
+
+	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
+					rx_adapter->epd,
+					RTE_INTR_EVENT_DEL,
+					0);
+	if (err)
+		RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
+
+	if (sintr)
+		dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
+	else
+		dev_info->shared_intr_enabled = 0;
+	return err;
+}
+
+static int
+rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
+		struct eth_device_info *dev_info,
+		int rx_queue_id)
+{
+	int err;
+	int i;
+	int s;
+
+	if (dev_info->nb_rx_intr == 0)
+		return 0;
+
+	err = 0;
+	if (rx_queue_id == -1) {
+		s = dev_info->nb_shared_intr;
+		for (i = 0; i < dev_info->nb_rx_intr; i++) {
+			int sintr;
+			uint16_t q;
+
+			q = dev_info->intr_queue[i];
+			sintr = rxa_shared_intr(dev_info, q);
+			s -= sintr;
+
+			if (!sintr || s == 0) {
+
+				err = rxa_disable_intr(rx_adapter, dev_info,
+						q);
+				if (err)
+					return err;
+				rxa_intr_ring_del_entries(rx_adapter, dev_info,
+							q);
+			}
+		}
+	} else {
+		if (!rxa_intr_queue(dev_info, rx_queue_id))
+			return 0;
+		if (!rxa_shared_intr(dev_info, rx_queue_id) ||
+				dev_info->nb_shared_intr == 1) {
+			err = rxa_disable_intr(rx_adapter, dev_info,
+					rx_queue_id);
+			if (err)
+				return err;
+			rxa_intr_ring_del_entries(rx_adapter, dev_info,
+						rx_queue_id);
+		}
+
+		for (i = 0; i < dev_info->nb_rx_intr; i++) {
+			if (dev_info->intr_queue[i] == rx_queue_id) {
+				for (; i < dev_info->nb_rx_intr - 1; i++)
+					dev_info->intr_queue[i] =
+						dev_info->intr_queue[i + 1];
+				break;
+			}
+		}
+	}
+
+	return err;
+}
+
+static int
+rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
+	struct eth_device_info *dev_info,
+	uint16_t rx_queue_id)
+{
+	int err, err1;
+	uint16_t eth_dev_id = dev_info->dev->data->port_id;
+	union queue_data qd;
+	int init_fd;
+	uint16_t *intr_queue;
+	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
+
+	if (rxa_intr_queue(dev_info, rx_queue_id))
+		return 0;
+
+	intr_queue = dev_info->intr_queue;
+	if (dev_info->intr_queue == NULL) {
+		size_t len =
+			dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
+		dev_info->intr_queue =
+			rte_zmalloc_socket(
+				rx_adapter->mem_name,
+				len,
+				0,
+				rx_adapter->socket_id);
+		if (dev_info->intr_queue == NULL)
+			return -ENOMEM;
+	}
+
+	init_fd = rx_adapter->epd;
+	err = rxa_init_epd(rx_adapter);
+	if (err)
+		goto err_free_queue;
+
+	qd.port = eth_dev_id;
+	qd.queue = rx_queue_id;
+
+	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
+					rx_adapter->epd,
+					RTE_INTR_EVENT_ADD,
+					qd.ptr);
+	if (err) {
+		RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
+			" Rx Queue %u err %d", rx_queue_id, err);
+		goto err_del_fd;
+	}
+
+	err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
+	if (err) {
+		RTE_EDEV_LOG_ERR("Could not enable interrupt for"
+				" Rx Queue %u err %d", rx_queue_id, err);
+
+		goto err_del_event;
+	}
+
+	err = rxa_create_intr_thread(rx_adapter);
+	if (!err)  {
+		if (sintr)
+			dev_info->shared_intr_enabled = 1;
+		else
+			dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
+		return 0;
+	}
+
+
+	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
+	if (err)
+		RTE_EDEV_LOG_ERR("Could not disable interrupt for"
+				" Rx Queue %u err %d", rx_queue_id, err);
+err_del_event:
+	err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
+					rx_adapter->epd,
+					RTE_INTR_EVENT_DEL,
+					0);
+	if (err1) {
+		RTE_EDEV_LOG_ERR("Could not delete event for"
+				" Rx Queue %u err %d", rx_queue_id, err1);
+	}
+err_del_fd:
+	if (init_fd == INIT_FD) {
+		close(rx_adapter->epd);
+		rx_adapter->epd = -1;
+	}
+err_free_queue:
+	if (intr_queue == NULL)
+		rte_free(dev_info->intr_queue);
+
+	return err;
+}
+
+static int
+rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
+	struct eth_device_info *dev_info,
+	int rx_queue_id)
+
+{
+	int i, j, err;
+	int si = -1;
+	int shared_done = (dev_info->nb_shared_intr > 0);
+
+	if (rx_queue_id != -1) {
+		if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
+			return 0;
+		return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
+	}
+
+	err = 0;
+	for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
+
+		if (rxa_shared_intr(dev_info, i) && shared_done)
+			continue;
+
+		err = rxa_config_intr(rx_adapter, dev_info, i);
+
+		shared_done = err == 0 && rxa_shared_intr(dev_info, i);
+		if (shared_done) {
+			si = i;
+			dev_info->shared_intr_enabled = 1;
+		}
+		if (err)
+			break;
+	}
+
+	if (err == 0)
+		return 0;
+
+	shared_done = (dev_info->nb_shared_intr > 0);
+	for (j = 0; j < i; j++) {
+		if (rxa_intr_queue(dev_info, j))
+			continue;
+		if (rxa_shared_intr(dev_info, j) && si != j)
+			continue;
+		err = rxa_disable_intr(rx_adapter, dev_info, j);
+		if (err)
+			break;
+
+	}
+
+	return err;
+}
+
+
+static int
 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
 {
 	int ret;
@@ -843,6 +1599,7 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
 	rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
 	rx_adapter->service_inited = 1;
+	rx_adapter->epd = INIT_FD;
 	return 0;
 
 err_done:
@@ -886,6 +1643,9 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	int32_t rx_queue_id)
 {
 	int pollq;
+	int intrq;
+	int sintrq;
+
 
 	if (rx_adapter->nb_queues == 0)
 		return;
@@ -901,9 +1661,14 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	}
 
 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
+	intrq = rxa_intr_queue(dev_info, rx_queue_id);
+	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
 	rx_adapter->num_rx_polled -= pollq;
 	dev_info->nb_rx_poll -= pollq;
+	rx_adapter->num_rx_intr -= intrq;
+	dev_info->nb_rx_intr -= intrq;
+	dev_info->nb_shared_intr -= intrq && sintrq;
 }
 
 static void
@@ -915,6 +1680,8 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	struct eth_rx_queue_info *queue_info;
 	const struct rte_event *ev = &conf->ev;
 	int pollq;
+	int intrq;
+	int sintrq;
 
 	if (rx_queue_id == -1) {
 		uint16_t nb_rx_queues;
@@ -927,6 +1694,8 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	}
 
 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
+	intrq = rxa_intr_queue(dev_info, rx_queue_id);
+	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
 
 	queue_info = &dev_info->rx_queue[rx_queue_id];
 	queue_info->event_queue_id = ev->queue_id;
@@ -944,6 +1713,24 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	if (rxa_polled_queue(dev_info, rx_queue_id)) {
 		rx_adapter->num_rx_polled += !pollq;
 		dev_info->nb_rx_poll += !pollq;
+		rx_adapter->num_rx_intr -= intrq;
+		dev_info->nb_rx_intr -= intrq;
+		dev_info->nb_shared_intr -= intrq && sintrq;
+	}
+
+	if (rxa_intr_queue(dev_info, rx_queue_id)) {
+		rx_adapter->num_rx_polled -= pollq;
+		dev_info->nb_rx_poll -= pollq;
+		rx_adapter->num_rx_intr += !intrq;
+		dev_info->nb_rx_intr += !intrq;
+		dev_info->nb_shared_intr += !intrq && sintrq;
+		if (dev_info->nb_shared_intr == 1) {
+			if (dev_info->multi_intr_cap)
+				dev_info->next_q_idx =
+					RTE_MAX_RXTX_INTR_VEC_ID - 1;
+			else
+				dev_info->next_q_idx = 0;
+		}
 	}
 }
 
@@ -960,24 +1747,24 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 	uint32_t *rx_wrr;
 	uint16_t nb_rx_queues;
 	uint32_t nb_rx_poll, nb_wrr;
+	uint32_t nb_rx_intr;
+	int num_intr_vec;
+	uint16_t wt;
 
 	if (queue_conf->servicing_weight == 0) {
-
 		struct rte_eth_dev_data *data = dev_info->dev->data;
-		if (data->dev_conf.intr_conf.rxq) {
-			RTE_EDEV_LOG_ERR("Interrupt driven queues"
-					" not supported");
-			return -ENOTSUP;
-		}
-		temp_conf = *queue_conf;
 
-		/* If Rx interrupts are disabled set wt = 1 */
-		temp_conf.servicing_weight = 1;
+		temp_conf = *queue_conf;
+		if (!data->dev_conf.intr_conf.rxq) {
+			/* If Rx interrupts are disabled set wt = 1 */
+			temp_conf.servicing_weight = 1;
+		}
 		queue_conf = &temp_conf;
 	}
 
 	nb_rx_queues = dev_info->dev->data->nb_rx_queues;
 	rx_queue = dev_info->rx_queue;
+	wt = queue_conf->servicing_weight;
 
 	if (dev_info->rx_queue == NULL) {
 		dev_info->rx_queue =
@@ -993,13 +1780,64 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 
 	rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
 			queue_conf->servicing_weight,
-			&nb_rx_poll, &nb_wrr);
+			&nb_rx_poll, &nb_rx_intr, &nb_wrr);
+
+	dev_info->multi_intr_cap =
+			rte_intr_cap_multiple(dev_info->dev->intr_handle);
 
 	ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
 				&rx_poll, &rx_wrr);
 	if (ret)
 		goto err_free_rxqueue;
 
+	if (wt == 0) {
+		num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
+
+		ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
+		if (ret)
+			goto err_free_rxqueue;
+
+		ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
+		if (ret)
+			goto err_free_rxqueue;
+	} else {
+
+		num_intr_vec = 0;
+		if (rx_adapter->num_rx_intr > nb_rx_intr) {
+			num_intr_vec = rxa_nb_intr_vect(dev_info,
+						rx_queue_id, 0);
+			/* interrupt based queues are being converted to
+			 * poll mode queues, delete the interrupt configuration
+			 * for those.
+			 */
+			ret = rxa_del_intr_queue(rx_adapter,
+						dev_info, rx_queue_id);
+			if (ret)
+				goto err_free_rxqueue;
+		}
+	}
+
+	if (nb_rx_intr == 0) {
+		ret = rxa_free_intr_resources(rx_adapter);
+		if (ret)
+			goto err_free_rxqueue;
+	}
+
+	if (wt == 0) {
+		uint16_t i;
+
+		if (rx_queue_id  == -1) {
+			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
+				dev_info->intr_queue[i] = i;
+		} else {
+			if (!rxa_intr_queue(dev_info, rx_queue_id))
+				dev_info->intr_queue[nb_rx_intr - 1] =
+					rx_queue_id;
+		}
+	}
+
+
+
 	rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
 	rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
 
@@ -1009,6 +1847,7 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 	rx_adapter->eth_rx_poll = rx_poll;
 	rx_adapter->wrr_sched = rx_wrr;
 	rx_adapter->wrr_len = nb_wrr;
+	rx_adapter->num_intr_vec += num_intr_vec;
 	return 0;
 
 err_free_rxqueue:
@@ -1119,6 +1958,7 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 	rx_adapter->socket_id = socket_id;
 	rx_adapter->conf_cb = conf_cb;
 	rx_adapter->conf_arg = conf_arg;
+	rx_adapter->id = id;
 	strcpy(rx_adapter->mem_name, mem_name);
 	rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
 					/* FIXME: incompatible with hotplug */
@@ -1302,8 +2142,10 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 	uint32_t cap;
 	uint32_t nb_rx_poll = 0;
 	uint32_t nb_wrr = 0;
+	uint32_t nb_rx_intr;
 	struct eth_rx_poll_entry *rx_poll = NULL;
 	uint32_t *rx_wrr = NULL;
+	int num_intr_vec;
 
 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
@@ -1346,29 +2188,59 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 		}
 	} else {
 		rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
-			&nb_rx_poll, &nb_wrr);
+			&nb_rx_poll, &nb_rx_intr, &nb_wrr);
+
 		ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
 			&rx_poll, &rx_wrr);
 		if (ret)
 			return ret;
 
 		rte_spinlock_lock(&rx_adapter->rx_lock);
+
+		num_intr_vec = 0;
+		if (rx_adapter->num_rx_intr > nb_rx_intr) {
+
+			num_intr_vec = rxa_nb_intr_vect(dev_info,
+						rx_queue_id, 0);
+			ret = rxa_del_intr_queue(rx_adapter, dev_info,
+					rx_queue_id);
+			if (ret)
+				goto unlock_ret;
+		}
+
+		if (nb_rx_intr == 0) {
+			ret = rxa_free_intr_resources(rx_adapter);
+			if (ret)
+				goto unlock_ret;
+		}
+
 		rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
 		rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
 
 		rte_free(rx_adapter->eth_rx_poll);
 		rte_free(rx_adapter->wrr_sched);
 
+		if (nb_rx_intr == 0) {
+			rte_free(dev_info->intr_queue);
+			dev_info->intr_queue = NULL;
+		}
+
 		rx_adapter->eth_rx_poll = rx_poll;
-		rx_adapter->num_rx_polled = nb_rx_poll;
 		rx_adapter->wrr_sched = rx_wrr;
 		rx_adapter->wrr_len = nb_wrr;
+		rx_adapter->num_intr_vec += num_intr_vec;
 
 		if (dev_info->nb_dev_queues == 0) {
 			rte_free(dev_info->rx_queue);
 			dev_info->rx_queue = NULL;
 		}
+unlock_ret:
 		rte_spinlock_unlock(&rx_adapter->rx_lock);
+		if (ret) {
+			rte_free(rx_poll);
+			rte_free(rx_wrr);
+			return ret;
+		}
 
 		rte_service_component_runstate_set(rx_adapter->service_id,
 				rxa_sw_adapter_queue_count(rx_adapter));
@@ -1377,7 +2249,6 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 	return ret;
 }
 
-
 int
 rte_event_eth_rx_adapter_start(uint8_t id)
 {
diff --git a/doc/guides/prog_guide/event_ethernet_rx_adapter.rst b/doc/guides/prog_guide/event_ethernet_rx_adapter.rst
index 319e4f0..2f055ec 100644
--- a/doc/guides/prog_guide/event_ethernet_rx_adapter.rst
+++ b/doc/guides/prog_guide/event_ethernet_rx_adapter.rst
@@ -144,3 +144,27 @@ enqueued event counts are a sum of the counts from the eventdev PMD callbacks
 if the callback is supported, and the counts maintained by the service function,
 if one exists. The service function also maintains a count of cycles for which
 it was not able to enqueue to the event device.
+
+Interrupt Based Rx Queues
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The service core function is typically set up to poll ethernet Rx queues for
+packets. Certain queues may have low packet rates and it would be more
+efficient to enable the Rx queue interrupt and read packets after receiving
+the interrupt.
+
+The servicing_weight member of struct rte_event_eth_rx_adapter_queue_conf
+is applicable when the adapter uses a service core function. The application
+has to enable Rx queue interrupts when configuring the ethernet device
+uing the ``rte_eth_dev_configue()`` function and then use a servicing_weight
+of zero when addding the Rx queue to the adapter.
+
+The adapter creates a thread blocked on the interrupt, on an interrupt this
+thread enqueues the port id and the queue id to a ring buffer. The adapter
+service function dequeues the port id and queue id from the ring buffer,
+invokes the ``rte_eth_rx_burst()`` to receive packets on the queue and
+converts the received packets to events in the same manner as packets
+received on a polled Rx queue. The interrupt thread is affinitized to the same
+CPUs as the lcores of the Rx adapter service function, if the Rx adapter
+service function has not been mapped to any lcores, the interrupt thread
+is mapped to the master lcore.
diff --git a/config/common_base b/config/common_base
index fcf3a1f..3cb5edd 100644
--- a/config/common_base
+++ b/config/common_base
@@ -597,6 +597,7 @@ CONFIG_RTE_LIBRTE_EVENTDEV_DEBUG=n
 CONFIG_RTE_EVENT_MAX_DEVS=16
 CONFIG_RTE_EVENT_MAX_QUEUES_PER_DEV=64
 CONFIG_RTE_EVENT_TIMER_ADAPTER_NUM_MAX=32
+CONFIG_RTE_EVENT_ETH_INTR_RING_SIZE=1024
 CONFIG_RTE_EVENT_CRYPTO_ADAPTER_MAX_INSTANCE=32
 
 #
diff --git a/lib/librte_eventdev/Makefile b/lib/librte_eventdev/Makefile
index b3e2546..24af956 100644
--- a/lib/librte_eventdev/Makefile
+++ b/lib/librte_eventdev/Makefile
@@ -8,7 +8,7 @@ include $(RTE_SDK)/mk/rte.vars.mk
 LIB = librte_eventdev.a
 
 # library version
-LIBABIVER := 4
+LIBABIVER := 5
 
 # build flags
 CFLAGS += -DALLOW_EXPERIMENTAL_API
-- 
1.8.3.1



More information about the dev mailing list