[dpdk-dev] [PATCH v1 2/4] eventdev: improve err handling for Rx adapter queue add/del

Nikhil Rao nikhil.rao at intel.com
Fri Jun 8 20:15:15 CEST 2018


The new WRR sequence is calculated only after the queue add/del has
already updated the queue state, so a memory allocation failure during
that calculation leaves the adapter in an inconsistent state.

This change separates the memory sizing + allocation for the
Rx poll and WRR array from calculation of the WRR sequence.
If there is a memory allocation failure, existing Rx queue
configuration remains unchanged.

Signed-off-by: Nikhil Rao <nikhil.rao at intel.com>
---
 lib/librte_eventdev/rte_event_eth_rx_adapter.c | 413 ++++++++++++++++++-------
 1 file changed, 299 insertions(+), 114 deletions(-)

diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
index 9361d48..c8db11b 100644
--- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
+++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
@@ -109,10 +109,16 @@ struct eth_device_info {
 	 * rx_adapter_stop callback needs to be invoked
 	 */
 	uint8_t dev_rx_started;
-	/* If nb_dev_queues > 0, the start callback will
+	/* Number of queues added for this device */
+	uint16_t nb_dev_queues;
+	/* If nb_rx_poll > 0, the start callback will
 	 * be invoked if not already invoked
 	 */
-	uint16_t nb_dev_queues;
+	uint16_t nb_rx_poll;
+	/* Sum of the servicing weights (wt) of the polled queues of this
+	 * device, useful when deleting all device queues
+	 */
+	uint32_t wrr_len;
 };
 
 /* Per Rx queue */
@@ -188,13 +194,170 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	}
 }
 
-/* Precalculate WRR polling sequence for all queues in rx_adapter */
+static inline int
+rxa_polled_queue(struct eth_device_info *dev_info,
+	int rx_queue_id)
+{
+	/* Returns 1 if the queue is polled by the adapter, else 0. rx_queue
+	 * may be NULL (no queue added), so test it before indexing it. */
+	if (dev_info->internal_event_port || dev_info->rx_queue == NULL)
+		return 0;
+	return dev_info->rx_queue[rx_queue_id].queue_enabled &&
+		dev_info->rx_queue[rx_queue_id].wt != 0;
+}
+
+/* Work out how many eth_rx_poll and wrr_sched entries remain once
+ * rx_queue_id (or every queue of the device, for -1) is deleted
+ */
+static void
+rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
+			struct eth_device_info *dev_info,
+			int rx_queue_id,
+			uint32_t *nb_rx_poll,
+			uint32_t *nb_wrr)
+{
+	/* Default: the queue is not polled, so nothing is removed */
+	uint32_t removed_polled = 0;
+	uint32_t removed_wrr = 0;
+
+	if (rx_queue_id == -1) {
+		removed_polled = dev_info->nb_rx_poll;
+		removed_wrr = dev_info->wrr_len;
+	} else if (rxa_polled_queue(dev_info, rx_queue_id)) {
+		removed_polled = 1;
+		removed_wrr = dev_info->rx_queue[rx_queue_id].wt;
+	}
+
+	*nb_rx_poll = rx_adapter->num_rx_polled - removed_polled;
+	*nb_wrr = rx_adapter->wrr_len - removed_wrr;
+}
+
+/* Calculate nb_rx_* as they will be after rx_queue_id (-1 for all Rx queues
+ * of the device) is added in poll mode with weight wt; only outputs are set */
+static void
+rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
+			struct eth_device_info *dev_info,
+			int rx_queue_id,
+			uint16_t wt,
+			uint32_t *nb_rx_poll,
+			uint32_t *nb_wrr)
+{
+	/* 32 bit copy so that wt * nb_rxq cannot overflow a signed int */
+	uint32_t nb_rxq = dev_info->dev->data->nb_rx_queues;
+	uint32_t poll_diff = 0;
+	uint32_t wrr_len_diff;
+
+	if (rx_queue_id == -1) {
+		poll_diff = nb_rxq - dev_info->nb_rx_poll;
+		wrr_len_diff = wt * nb_rxq - dev_info->wrr_len;
+	} else if (rxa_polled_queue(dev_info, rx_queue_id)) {
+		wrr_len_diff = wt - dev_info->rx_queue[rx_queue_id].wt;
+	} else {
+		poll_diff = 1;
+		wrr_len_diff = wt;
+	}
+
+	*nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
+	*nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
+}
+
+/* Compute *nb_rx_poll and *nb_wrr as they will be after adding rx_queue_id */
+static void
+rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
+		struct eth_device_info *dev_info,
+		int rx_queue_id,
+		uint16_t wt,
+		uint32_t *nb_rx_poll,
+		uint32_t *nb_wrr)
+{
+	rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
+				wt, nb_rx_poll, nb_wrr);
+}
+
+/* Compute *nb_rx_poll and *nb_wrr as they will be after the queue delete */
+static void
+rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
+		struct eth_device_info *dev_info,
+		int rx_queue_id,
+		uint32_t *nb_rx_poll,
+		uint32_t *nb_wrr)
+{
+	rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
+				nb_wrr);
+}
+
+/*
+ * Allocate the rx_poll array, sized for num_rx_polled entries and rounded
+ * up to a whole number of cache lines; returns NULL if allocation fails
+ */
+static struct eth_rx_poll_entry *
+rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
+	uint32_t num_rx_polled)
+{
+	const size_t entry_sz = sizeof(*rx_adapter->eth_rx_poll);
+	size_t nbytes;
+
+	nbytes = RTE_ALIGN(num_rx_polled * entry_sz, RTE_CACHE_LINE_SIZE);
+	return rte_zmalloc_socket(rx_adapter->mem_name,
+				nbytes, RTE_CACHE_LINE_SIZE,
+				rx_adapter->socket_id);
+}
+
+/*
+ * Allocate the WRR schedule array with room for nb_wrr entries on the
+ * adapter's socket. nb_wrr is unsigned, like the count callers pass in,
+ * so it cannot be sign extended in the size computation. Returns NULL
+ * if the allocation fails.
+ */
+static uint32_t *
+rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, uint32_t nb_wrr)
+{
+	size_t len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
+				RTE_CACHE_LINE_SIZE);
+
+	return rte_zmalloc_socket(rx_adapter->mem_name, len,
+				RTE_CACHE_LINE_SIZE, rx_adapter->socket_id);
+}
+
 static int
-rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter)
+rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
+		uint32_t nb_poll,
+		uint32_t nb_wrr,
+		struct eth_rx_poll_entry **rx_poll,
+		uint32_t **wrr_sched)
+{
+	/* Allocate both arrays or neither; returns 0 or -ENOMEM. On failure
+	 * the outputs are NULL, so a caller rte_free() cannot double free.
+	 */
+	*rx_poll = NULL;
+	*wrr_sched = NULL;
+	if (nb_poll == 0)
+		return 0;
+
+	*rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
+	if (*rx_poll == NULL)
+		return -ENOMEM;
+
+	*wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
+	if (*wrr_sched == NULL) {
+		rte_free(*rx_poll);
+		*rx_poll = NULL;
+		return -ENOMEM;
+	}
+	return 0;
+}
+
+/* Precalculate WRR polling sequence for all queues in rx_adapter */
+static void
+rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
+		struct eth_rx_poll_entry *rx_poll,
+		uint32_t *rx_wrr)
 {
 	uint16_t d;
 	uint16_t q;
 	unsigned int i;
+	int prev = -1;
+	int cw = -1;
 
 	/* Initialize variables for calculation of wrr schedule */
 	uint16_t max_wrr_pos = 0;
@@ -202,77 +365,48 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	uint16_t max_wt = 0;
 	uint16_t gcd = 0;
 
-	struct eth_rx_poll_entry *rx_poll = NULL;
-	uint32_t *rx_wrr = NULL;
+	if (rx_poll == NULL)
+		return;
 
-	if (rx_adapter->num_rx_polled) {
-		size_t len = RTE_ALIGN(rx_adapter->num_rx_polled *
-				sizeof(*rx_adapter->eth_rx_poll),
-				RTE_CACHE_LINE_SIZE);
-		rx_poll = rte_zmalloc_socket(rx_adapter->mem_name,
-					     len,
-					     RTE_CACHE_LINE_SIZE,
-					     rx_adapter->socket_id);
-		if (rx_poll == NULL)
-			return -ENOMEM;
+	/* Generate array of all queues to poll, the size of this
+	 * array is poll_q
+	 */
+	RTE_ETH_FOREACH_DEV(d) {
+		uint16_t nb_rx_queues;
+		struct eth_device_info *dev_info =
+				&rx_adapter->eth_devices[d];
+		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
+		if (dev_info->rx_queue == NULL)
+			continue;
+		if (dev_info->internal_event_port)
+			continue;
+		dev_info->wrr_len = 0;
+		for (q = 0; q < nb_rx_queues; q++) {
+			struct eth_rx_queue_info *queue_info =
+				&dev_info->rx_queue[q];
+			uint16_t wt;
 
-		/* Generate array of all queues to poll, the size of this
-		 * array is poll_q
-		 */
-		RTE_ETH_FOREACH_DEV(d) {
-			uint16_t nb_rx_queues;
-			struct eth_device_info *dev_info =
-					&rx_adapter->eth_devices[d];
-			nb_rx_queues = dev_info->dev->data->nb_rx_queues;
-			if (dev_info->rx_queue == NULL)
+			if (!rxa_polled_queue(dev_info, q))
 				continue;
-			if (dev_info->internal_event_port)
-				continue;
-			for (q = 0; q < nb_rx_queues; q++) {
-				struct eth_rx_queue_info *queue_info =
-					&dev_info->rx_queue[q];
-				if (queue_info->queue_enabled == 0)
-					continue;
-
-				uint16_t wt = queue_info->wt;
-				rx_poll[poll_q].eth_dev_id = d;
-				rx_poll[poll_q].eth_rx_qid = q;
-				max_wrr_pos += wt;
-				max_wt = RTE_MAX(max_wt, wt);
-				gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
-				poll_q++;
-			}
-		}
-
-		len = RTE_ALIGN(max_wrr_pos * sizeof(*rx_wrr),
-				RTE_CACHE_LINE_SIZE);
-		rx_wrr = rte_zmalloc_socket(rx_adapter->mem_name,
-					    len,
-					    RTE_CACHE_LINE_SIZE,
-					    rx_adapter->socket_id);
-		if (rx_wrr == NULL) {
-			rte_free(rx_poll);
-			return -ENOMEM;
-		}
-
-		/* Generate polling sequence based on weights */
-		int prev = -1;
-		int cw = -1;
-		for (i = 0; i < max_wrr_pos; i++) {
-			rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
-					     rx_poll, max_wt, gcd, prev);
-			prev = rx_wrr[i];
+			wt = queue_info->wt;
+			rx_poll[poll_q].eth_dev_id = d;
+			rx_poll[poll_q].eth_rx_qid = q;
+			max_wrr_pos += wt;
+			dev_info->wrr_len += wt;
+			max_wt = RTE_MAX(max_wt, wt);
+			gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
+			poll_q++;
 		}
 	}
 
-	rte_free(rx_adapter->eth_rx_poll);
-	rte_free(rx_adapter->wrr_sched);
-
-	rx_adapter->eth_rx_poll = rx_poll;
-	rx_adapter->wrr_sched = rx_wrr;
-	rx_adapter->wrr_len = max_wrr_pos;
-
-	return 0;
+	/* Generate polling sequence based on weights */
+	prev = -1;
+	cw = -1;
+	for (i = 0; i < max_wrr_pos; i++) {
+		rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
+				     rx_poll, max_wt, gcd, prev);
+		prev = rx_wrr[i];
+	}
 }
 
 static inline void
@@ -719,31 +853,53 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	}
 }
 
-static int
+static void
 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
 	struct eth_device_info *dev_info,
-	uint16_t rx_queue_id)
+	int32_t rx_queue_id)
 {
-	struct eth_rx_queue_info *queue_info;
+	int pollq;
 
 	if (rx_adapter->nb_queues == 0)
-		return 0;
+		return;
 
-	queue_info = &dev_info->rx_queue[rx_queue_id];
-	rx_adapter->num_rx_polled -= queue_info->queue_enabled;
+	if (rx_queue_id == -1) { /* -1: every Rx queue of the device */
+		uint16_t nb_rx_queues;
+		uint16_t i;
+
+		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
+		for (i = 0; i <	nb_rx_queues; i++)
+			rxa_sw_del(rx_adapter, dev_info, i);
+		return;
+	}
+	/* Read the polled state first; presumably the update below resets it */
+	pollq = rxa_polled_queue(dev_info, rx_queue_id);
 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
-	return 0;
+	rx_adapter->num_rx_polled -= pollq;
+	dev_info->nb_rx_poll -= pollq;
 }
 
 static void
 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 	struct eth_device_info *dev_info,
-	uint16_t rx_queue_id,
+	int32_t rx_queue_id,
 	const struct rte_event_eth_rx_adapter_queue_conf *conf)
-
 {
 	struct eth_rx_queue_info *queue_info;
 	const struct rte_event *ev = &conf->ev;
+	int pollq;
+
+	if (rx_queue_id == -1) {
+		uint16_t nb_rx_queues;
+		uint16_t i;
+
+		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
+		for (i = 0; i <	nb_rx_queues; i++)
+			rxa_add_queue(rx_adapter, dev_info, i, conf);
+		return;
+	}
+
+	pollq = rxa_polled_queue(dev_info, rx_queue_id);
 
 	queue_info = &dev_info->rx_queue[rx_queue_id];
 	queue_info->event_queue_id = ev->queue_id;
@@ -757,9 +913,11 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 		queue_info->flow_id_mask = ~0;
 	}
 
-	/* The same queue can be added more than once */
-	rx_adapter->num_rx_polled += !queue_info->queue_enabled;
 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
+	if (rxa_polled_queue(dev_info, rx_queue_id)) {
+		rx_adapter->num_rx_polled += !pollq;
+		dev_info->nb_rx_poll += !pollq;
+	}
 }
 
 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
@@ -769,8 +927,12 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 {
 	struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
 	struct rte_event_eth_rx_adapter_queue_conf temp_conf;
-	uint32_t i;
 	int ret;
+	struct eth_rx_poll_entry *rx_poll;
+	struct eth_rx_queue_info *rx_queue;
+	uint32_t *rx_wrr;
+	uint16_t nb_rx_queues;
+	uint32_t nb_rx_poll, nb_wrr;
 
 	if (queue_conf->servicing_weight == 0) {
 
@@ -787,31 +949,51 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 		queue_conf = &temp_conf;
 	}
 
+	nb_rx_queues = dev_info->dev->data->nb_rx_queues;
+	rx_queue = dev_info->rx_queue;
+
 	if (dev_info->rx_queue == NULL) {
 		dev_info->rx_queue =
 		    rte_zmalloc_socket(rx_adapter->mem_name,
-				       dev_info->dev->data->nb_rx_queues *
+				       nb_rx_queues *
 				       sizeof(struct eth_rx_queue_info), 0,
 				       rx_adapter->socket_id);
 		if (dev_info->rx_queue == NULL)
 			return -ENOMEM;
 	}
+	rx_wrr = NULL;
+	rx_poll = NULL;
 
-	if (rx_queue_id == -1) {
-		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
-			rxa_add_queue(rx_adapter, dev_info, i, queue_conf);
-	} else {
-		rxa_add_queue(rx_adapter, dev_info, (uint16_t)rx_queue_id,
-			queue_conf);
-	}
+	rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
+			queue_conf->servicing_weight,
+			&nb_rx_poll, &nb_wrr);
 
-	ret = rxa_calc_wrr_sequence(rx_adapter);
-	if (ret) {
-		rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
-		return ret;
+	ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
+				&rx_poll, &rx_wrr);
+	if (ret)
+		goto err_free_rxqueue;
+
+	rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
+	rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
+
+	rte_free(rx_adapter->eth_rx_poll);
+	rte_free(rx_adapter->wrr_sched);
+
+	rx_adapter->eth_rx_poll = rx_poll;
+	rx_adapter->wrr_sched = rx_wrr;
+	rx_adapter->wrr_len = nb_wrr;
+	return 0;
+
+err_free_rxqueue:
+	if (rx_queue == NULL) {
+		rte_free(dev_info->rx_queue);
+		dev_info->rx_queue = NULL;
 	}
 
-	return ret;
+	rte_free(rx_poll);
+	rte_free(rx_wrr);
+	/* Report the failure; returning 0 here would signal a successful add */
+	return ret;
 }
 
 static int
@@ -995,7 +1177,6 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 	struct rte_event_eth_rx_adapter *rx_adapter;
 	struct rte_eventdev *dev;
 	struct eth_device_info *dev_info;
-	int start_service;
 
 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
@@ -1038,7 +1219,6 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 		return -EINVAL;
 	}
 
-	start_service = 0;
 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
 
 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
@@ -1072,16 +1252,13 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 			ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
 					queue_conf);
 		rte_spinlock_unlock(&rx_adapter->rx_lock);
-		if (ret == 0)
-			start_service =
-				!!rxa_sw_adapter_queue_count(rx_adapter);
 	}
 
 	if (ret)
 		return ret;
 
-	if (start_service)
-		rte_service_component_runstate_set(rx_adapter->service_id, 1);
+	rte_service_component_runstate_set(rx_adapter->service_id,
+				rxa_sw_adapter_queue_count(rx_adapter));
 
 	return 0;
 }
@@ -1095,7 +1272,10 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 	struct rte_event_eth_rx_adapter *rx_adapter;
 	struct eth_device_info *dev_info;
 	uint32_t cap;
-	uint16_t i;
+	uint32_t nb_rx_poll = 0;
+	uint32_t nb_wrr = 0;
+	struct eth_rx_poll_entry *rx_poll = NULL;
+	uint32_t *rx_wrr = NULL;
 
 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
@@ -1137,26 +1317,31 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 			}
 		}
 	} else {
-		int rc;
+		rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
+			&nb_rx_poll, &nb_wrr);
+		ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
+			&rx_poll, &rx_wrr);
+		if (ret)
+			return ret;
+
 		rte_spinlock_lock(&rx_adapter->rx_lock);
-		if (rx_queue_id == -1) {
-			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
-				rxa_sw_del(rx_adapter, dev_info, i);
-		} else {
-			rxa_sw_del(rx_adapter, dev_info, (uint16_t)rx_queue_id);
-		}
+		rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
+		rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
+
+		rte_free(rx_adapter->eth_rx_poll);
+		rte_free(rx_adapter->wrr_sched);
 
-		rc = rxa_calc_wrr_sequence(rx_adapter);
-		if (rc)
-			RTE_EDEV_LOG_ERR("WRR recalculation failed %" PRId32,
-					rc);
+		rx_adapter->eth_rx_poll = rx_poll;
+		rx_adapter->num_rx_polled = nb_rx_poll;
+		rx_adapter->wrr_sched = rx_wrr;
+		rx_adapter->wrr_len = nb_wrr;
 
 		if (dev_info->nb_dev_queues == 0) {
 			rte_free(dev_info->rx_queue);
 			dev_info->rx_queue = NULL;
 		}
-
 		rte_spinlock_unlock(&rx_adapter->rx_lock);
+
 		rte_service_component_runstate_set(rx_adapter->service_id,
 				rxa_sw_adapter_queue_count(rx_adapter));
 	}
-- 
1.8.3.1



More information about the dev mailing list