[dpdk-dev] [PATCH v2 12/28] sched: update packet enqueue API

Jasvinder Singh jasvinder.singh at intel.com
Tue Jun 25 17:32:01 CEST 2019


Update the packet enqueue API implementation to allow configuration
flexibility for pipe traffic classes and queues, and subport-level
configuration of the pipe parameters. The subport of each packet is
now resolved from the mbuf's queue index, and the RED drop check,
statistics updates and queue bitmap all operate on subport-level
structures.

Signed-off-by: Jasvinder Singh <jasvinder.singh at intel.com>
Signed-off-by: Abraham Tovar <abrahamx.tovar at intel.com>
Signed-off-by: Lukasz Krakowiak <lukaszx.krakowiak at intel.com>
---
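The enqueue path now resolves each packet's subport before touching
any queue state. As a minimal sketch of the bit layout, assuming 16
queues per pipe (hence the "+ 4" shift) and a power-of-two number of
subports per port, rte_sched_port_get_subport() and
rte_sched_port_enqueue_qptrs_prefetch0() below split the mbuf's
port-level queue index as follows:

	uint32_t qindex = rte_mbuf_sched_queue_get(pkt);

	/* Upper bits select the subport within the port */
	uint32_t subport_id = (qindex >> (port->max_subport_pipes_log2 + 4)) &
			(port->n_subports_per_port - 1);

	/* Lower bits are the queue id relative to that subport */
	uint32_t queue_id = qindex &
			((1u << (port->max_subport_pipes_log2 + 4)) - 1);
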
 lib/librte_sched/rte_sched.c | 228 ++++++++++++++++++++++-------------
 1 file changed, 144 insertions(+), 84 deletions(-)
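
The public entry point keeps its signature, so existing callers are
unaffected; a hedged usage sketch (mbufs and nb_rx are hypothetical
names for a burst received elsewhere):

	/* Returns the number of packets actually written to the
	 * scheduler queues; the rest are dropped and freed. */
	int n_enq = rte_sched_port_enqueue(port, mbufs, nb_rx);

The RTE_SCHED_RED stubs are also converted from macros to static
inline functions, so their arguments remain type-checked when RED
support is compiled out.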

diff --git a/lib/librte_sched/rte_sched.c b/lib/librte_sched/rte_sched.c
index 65c645df7..cb96e0613 100644
--- a/lib/librte_sched/rte_sched.c
+++ b/lib/librte_sched/rte_sched.c
@@ -1598,31 +1598,36 @@ rte_sched_port_update_queue_stats_on_drop(struct rte_sched_subport *subport,
 #ifdef RTE_SCHED_RED
 
 static inline int
-rte_sched_port_red_drop(struct rte_sched_port *port, struct rte_mbuf *pkt, uint32_t qindex, uint16_t qlen)
+rte_sched_port_red_drop(struct rte_sched_subport *subport,
+	struct rte_mbuf *pkt,
+	uint32_t qindex,
+	uint16_t qlen,
+	uint64_t time)
 {
 	struct rte_sched_queue_extra *qe;
 	struct rte_red_config *red_cfg;
 	struct rte_red *red;
 	uint32_t tc_index;
-	enum rte_meter_color color;
+	enum rte_color color;
 
-	tc_index = (qindex >> 2) & 0x3;
+	tc_index = rte_mbuf_sched_traffic_class_get(pkt);
 	color = rte_sched_port_pkt_read_color(pkt);
-	red_cfg = &port->red_config[tc_index][color];
+	red_cfg = &subport->red_config[tc_index][color];
 
 	if ((red_cfg->min_th | red_cfg->max_th) == 0)
 		return 0;
 
-	qe = port->queue_extra + qindex;
+	qe = subport->queue_extra + qindex;
 	red = &qe->red;
 
-	return rte_red_enqueue(red_cfg, red, qlen, port->time);
+	return rte_red_enqueue(red_cfg, red, qlen, time);
 }
 
 static inline void
-rte_sched_port_set_queue_empty_timestamp(struct rte_sched_port *port, uint32_t qindex)
+rte_sched_port_set_queue_empty_timestamp(struct rte_sched_port *port,
+	struct rte_sched_subport *subport, uint32_t qindex)
 {
-	struct rte_sched_queue_extra *qe = port->queue_extra + qindex;
+	struct rte_sched_queue_extra *qe = subport->queue_extra + qindex;
 	struct rte_red *red = &qe->red;
 
 	rte_red_mark_queue_empty(red, port->time);
@@ -1630,10 +1635,23 @@ rte_sched_port_set_queue_empty_timestamp(struct rte_sched_port *port, uint32_t q
 
 #else
 
-#define rte_sched_port_red_drop(port, pkt, qindex, qlen)             0
-
-#define rte_sched_port_set_queue_empty_timestamp(port, qindex)
+static inline int rte_sched_port_red_drop(
+	struct rte_sched_subport *subport __rte_unused,
+	struct rte_mbuf *pkt __rte_unused,
+	uint32_t qindex __rte_unused,
+	uint16_t qlen __rte_unused,
+	uint64_t time __rte_unused)
+{
+	return 0;
+}
 
+static inline void
+rte_sched_port_set_queue_empty_timestamp(struct rte_sched_port *port __rte_unused,
+	struct rte_sched_subport *subport __rte_unused,
+	uint32_t qindex __rte_unused)
+{
+	return;
+}
 #endif /* RTE_SCHED_RED */
 
 #ifdef RTE_SCHED_DEBUG
@@ -1665,63 +1683,71 @@ debug_check_queue_slab(struct rte_sched_port *port, uint32_t bmp_pos,
 
 #endif /* RTE_SCHED_DEBUG */
 
+static inline struct rte_sched_subport *
+rte_sched_port_get_subport(struct rte_sched_port *port,
+	struct rte_mbuf *pkt)
+{
+	uint32_t qindex = rte_mbuf_sched_queue_get(pkt);
+	uint32_t subport_id = (qindex >> (port->max_subport_pipes_log2 + 4)) &
+		(port->n_subports_per_port - 1);
+
+	return port->subports[subport_id];
+}
+
 static inline uint32_t
-rte_sched_port_enqueue_qptrs_prefetch0(struct rte_sched_port *port,
-				       struct rte_mbuf *pkt)
+rte_sched_port_enqueue_qptrs_prefetch0(struct rte_sched_subport *subport,
+	struct rte_mbuf *pkt, uint32_t bitwidth)
 {
 	struct rte_sched_queue *q;
 #ifdef RTE_SCHED_COLLECT_STATS
 	struct rte_sched_queue_extra *qe;
 #endif
 	uint32_t qindex = rte_mbuf_sched_queue_get(pkt);
+	uint32_t queue_id = ((1 << (bitwidth + 4)) - 1) & qindex;
 
-	q = port->queue + qindex;
+	q = subport->queue + queue_id;
 	rte_prefetch0(q);
 #ifdef RTE_SCHED_COLLECT_STATS
-	qe = port->queue_extra + qindex;
+	qe = subport->queue_extra + queue_id;
 	rte_prefetch0(qe);
 #endif
 
-	return qindex;
+	return queue_id;
 }
 
 static inline void
-rte_sched_port_enqueue_qwa_prefetch0(struct rte_sched_port *port,
+rte_sched_port_enqueue_qwa_prefetch0(struct rte_sched_subport *subport,
 				     uint32_t qindex, struct rte_mbuf **qbase)
 {
 	struct rte_sched_queue *q;
 	struct rte_mbuf **q_qw;
 	uint16_t qsize;
 
-	q = port->queue + qindex;
-	qsize = rte_sched_port_qsize(port, qindex);
+	q = subport->queue + qindex;
+	qsize = rte_sched_subport_qsize(subport, qindex);
 	q_qw = qbase + (q->qw & (qsize - 1));
 
 	rte_prefetch0(q_qw);
-	rte_bitmap_prefetch0(port->bmp, qindex);
+	rte_bitmap_prefetch0(subport->bmp, qindex);
 }
 
 static inline int
-rte_sched_port_enqueue_qwa(struct rte_sched_port *port, uint32_t qindex,
-			   struct rte_mbuf **qbase, struct rte_mbuf *pkt)
+rte_sched_port_enqueue_qwa(struct rte_sched_subport *subport, uint32_t qindex,
+		struct rte_mbuf **qbase, struct rte_mbuf *pkt, uint64_t time)
 {
-	struct rte_sched_queue *q;
-	uint16_t qsize;
-	uint16_t qlen;
-
-	q = port->queue + qindex;
-	qsize = rte_sched_port_qsize(port, qindex);
-	qlen = q->qw - q->qr;
+	struct rte_sched_queue *q = subport->queue + qindex;
+	uint16_t qsize = rte_sched_subport_qsize(subport, qindex);
+	uint16_t qlen = q->qw - q->qr;
 
 	/* Drop the packet (and update drop stats) when queue is full */
-	if (unlikely(rte_sched_port_red_drop(port, pkt, qindex, qlen) ||
-		     (qlen >= qsize))) {
+	if (unlikely(rte_sched_port_red_drop(subport, pkt, qindex, qlen, time)
+		|| (qlen >= qsize))) {
 		rte_pktmbuf_free(pkt);
 #ifdef RTE_SCHED_COLLECT_STATS
-		rte_sched_port_update_subport_stats_on_drop(port, qindex, pkt,
-							    qlen < qsize);
-		rte_sched_port_update_queue_stats_on_drop(port, qindex, pkt,
-							  qlen < qsize);
+		rte_sched_port_update_subport_stats_on_drop(subport, pkt,
+			qlen < qsize);
+		rte_sched_port_update_queue_stats_on_drop(subport, qindex, pkt,
+			qlen < qsize);
 #endif
 		return 0;
 	}
@@ -1730,13 +1756,13 @@ rte_sched_port_enqueue_qwa(struct rte_sched_port *port, uint32_t qindex,
 	qbase[q->qw & (qsize - 1)] = pkt;
 	q->qw++;
 
-	/* Activate queue in the port bitmap */
-	rte_bitmap_set(port->bmp, qindex);
+	/* Activate queue in the subport bitmap */
+	rte_bitmap_set(subport->bmp, qindex);
 
 	/* Statistics */
 #ifdef RTE_SCHED_COLLECT_STATS
-	rte_sched_port_update_subport_stats(port, qindex, pkt);
-	rte_sched_port_update_queue_stats(port, qindex, pkt);
+	rte_sched_port_update_subport_stats(subport, pkt);
+	rte_sched_port_update_queue_stats(subport, qindex, pkt);
 #endif
 
 	return 1;
@@ -1764,17 +1790,21 @@ rte_sched_port_enqueue(struct rte_sched_port *port, struct rte_mbuf **pkts,
 		*pkt30, *pkt31, *pkt_last;
 	struct rte_mbuf **q00_base, **q01_base, **q10_base, **q11_base,
 		**q20_base, **q21_base, **q30_base, **q31_base, **q_last_base;
+	struct rte_sched_subport *subport00, *subport01, *subport10, *subport11,
+		*subport20, *subport21, *subport30, *subport31, *subport_last;
 	uint32_t q00, q01, q10, q11, q20, q21, q30, q31, q_last;
 	uint32_t r00, r01, r10, r11, r20, r21, r30, r31, r_last;
-	uint32_t result, i;
+	uint32_t result, bitwidth, i;
 
 	result = 0;
+	bitwidth = port->max_subport_pipes_log2;
 
 	/*
 	 * Less then 6 input packets available, which is not enough to
 	 * feed the pipeline
 	 */
 	if (unlikely(n_pkts < 6)) {
+		struct rte_sched_subport *subports[5];
 		struct rte_mbuf **q_base[5];
 		uint32_t q[5];
 
@@ -1782,22 +1812,27 @@ rte_sched_port_enqueue(struct rte_sched_port *port, struct rte_mbuf **pkts,
 		for (i = 0; i < n_pkts; i++)
 			rte_prefetch0(pkts[i]);
 
+		/* Prefetch the subport structure for each packet */
+		for (i = 0; i < n_pkts; i++)
+			subports[i] =
+				rte_sched_port_get_subport(port, pkts[i]);
+
 		/* Prefetch the queue structure for each queue */
 		for (i = 0; i < n_pkts; i++)
-			q[i] = rte_sched_port_enqueue_qptrs_prefetch0(port,
-								      pkts[i]);
+			q[i] = rte_sched_port_enqueue_qptrs_prefetch0(subports[i],
+					pkts[i], bitwidth);
 
 		/* Prefetch the write pointer location of each queue */
 		for (i = 0; i < n_pkts; i++) {
-			q_base[i] = rte_sched_port_qbase(port, q[i]);
-			rte_sched_port_enqueue_qwa_prefetch0(port, q[i],
+			q_base[i] = rte_sched_subport_qbase(subports[i], q[i]);
+			rte_sched_port_enqueue_qwa_prefetch0(subports[i], q[i],
 							     q_base[i]);
 		}
 
 		/* Write each packet to its queue */
 		for (i = 0; i < n_pkts; i++)
-			result += rte_sched_port_enqueue_qwa(port, q[i],
-							     q_base[i], pkts[i]);
+			result += rte_sched_port_enqueue_qwa(subports[i], q[i],
+					q_base[i], pkts[i], port->time);
 
 		return result;
 	}
@@ -1813,21 +1848,25 @@ rte_sched_port_enqueue(struct rte_sched_port *port, struct rte_mbuf **pkts,
 	rte_prefetch0(pkt10);
 	rte_prefetch0(pkt11);
 
-	q20 = rte_sched_port_enqueue_qptrs_prefetch0(port, pkt20);
-	q21 = rte_sched_port_enqueue_qptrs_prefetch0(port, pkt21);
+	subport20 = rte_sched_port_get_subport(port, pkt20);
+	subport21 = rte_sched_port_get_subport(port, pkt21);
+	q20 = rte_sched_port_enqueue_qptrs_prefetch0(subport20, pkt20, bitwidth);
+	q21 = rte_sched_port_enqueue_qptrs_prefetch0(subport21, pkt21, bitwidth);
 
 	pkt00 = pkts[4];
 	pkt01 = pkts[5];
 	rte_prefetch0(pkt00);
 	rte_prefetch0(pkt01);
 
-	q10 = rte_sched_port_enqueue_qptrs_prefetch0(port, pkt10);
-	q11 = rte_sched_port_enqueue_qptrs_prefetch0(port, pkt11);
+	subport10 = rte_sched_port_get_subport(port, pkt10);
+	subport11 = rte_sched_port_get_subport(port, pkt11);
+	q10 = rte_sched_port_enqueue_qptrs_prefetch0(subport10, pkt10, bitwidth);
+	q11 = rte_sched_port_enqueue_qptrs_prefetch0(subport11, pkt11, bitwidth);
 
-	q20_base = rte_sched_port_qbase(port, q20);
-	q21_base = rte_sched_port_qbase(port, q21);
-	rte_sched_port_enqueue_qwa_prefetch0(port, q20, q20_base);
-	rte_sched_port_enqueue_qwa_prefetch0(port, q21, q21_base);
+	q20_base = rte_sched_subport_qbase(subport20, q20);
+	q21_base = rte_sched_subport_qbase(subport21, q21);
+	rte_sched_port_enqueue_qwa_prefetch0(subport20, q20, q20_base);
+	rte_sched_port_enqueue_qwa_prefetch0(subport21, q21, q21_base);
 
 	/* Run the pipeline */
 	for (i = 6; i < (n_pkts & (~1)); i += 2) {
@@ -1842,6 +1881,10 @@ rte_sched_port_enqueue(struct rte_sched_port *port, struct rte_mbuf **pkts,
 		q31 = q21;
 		q20 = q10;
 		q21 = q11;
+		subport30 = subport20;
+		subport31 = subport21;
+		subport20 = subport10;
+		subport21 = subport11;
 		q30_base = q20_base;
 		q31_base = q21_base;
 
@@ -1851,19 +1894,25 @@ rte_sched_port_enqueue(struct rte_sched_port *port, struct rte_mbuf **pkts,
 		rte_prefetch0(pkt00);
 		rte_prefetch0(pkt01);
 
-		/* Stage 1: Prefetch queue structure storing queue pointers */
-		q10 = rte_sched_port_enqueue_qptrs_prefetch0(port, pkt10);
-		q11 = rte_sched_port_enqueue_qptrs_prefetch0(port, pkt11);
+		/* Stage 1: Prefetch subport and queue structure storing queue
+		 *  pointers
+		 */
+		subport10 = rte_sched_port_get_subport(port, pkt10);
+		subport11 = rte_sched_port_get_subport(port, pkt11);
+		q10 = rte_sched_port_enqueue_qptrs_prefetch0(subport10, pkt10, bitwidth);
+		q11 = rte_sched_port_enqueue_qptrs_prefetch0(subport11, pkt11, bitwidth);
 
 		/* Stage 2: Prefetch queue write location */
-		q20_base = rte_sched_port_qbase(port, q20);
-		q21_base = rte_sched_port_qbase(port, q21);
-		rte_sched_port_enqueue_qwa_prefetch0(port, q20, q20_base);
-		rte_sched_port_enqueue_qwa_prefetch0(port, q21, q21_base);
+		q20_base = rte_sched_subport_qbase(subport20, q20);
+		q21_base = rte_sched_subport_qbase(subport21, q21);
+		rte_sched_port_enqueue_qwa_prefetch0(subport20, q20, q20_base);
+		rte_sched_port_enqueue_qwa_prefetch0(subport21, q21, q21_base);
 
 		/* Stage 3: Write packet to queue and activate queue */
-		r30 = rte_sched_port_enqueue_qwa(port, q30, q30_base, pkt30);
-		r31 = rte_sched_port_enqueue_qwa(port, q31, q31_base, pkt31);
+		r30 = rte_sched_port_enqueue_qwa(subport30, q30, q30_base,
+			pkt30, port->time);
+		r31 = rte_sched_port_enqueue_qwa(subport31, q31, q31_base,
+			pkt31, port->time);
 		result += r30 + r31;
 	}
 
@@ -1875,38 +1924,49 @@ rte_sched_port_enqueue(struct rte_sched_port *port, struct rte_mbuf **pkts,
 	pkt_last = pkts[n_pkts - 1];
 	rte_prefetch0(pkt_last);
 
-	q00 = rte_sched_port_enqueue_qptrs_prefetch0(port, pkt00);
-	q01 = rte_sched_port_enqueue_qptrs_prefetch0(port, pkt01);
+	subport00 = rte_sched_port_get_subport(port, pkt00);
+	subport01 = rte_sched_port_get_subport(port, pkt01);
+	q00 = rte_sched_port_enqueue_qptrs_prefetch0(subport00, pkt00, bitwidth);
+	q01 = rte_sched_port_enqueue_qptrs_prefetch0(subport01, pkt01, bitwidth);
 
-	q10_base = rte_sched_port_qbase(port, q10);
-	q11_base = rte_sched_port_qbase(port, q11);
-	rte_sched_port_enqueue_qwa_prefetch0(port, q10, q10_base);
-	rte_sched_port_enqueue_qwa_prefetch0(port, q11, q11_base);
+	q10_base = rte_sched_subport_qbase(subport10, q10);
+	q11_base = rte_sched_subport_qbase(subport11, q11);
+	rte_sched_port_enqueue_qwa_prefetch0(subport10, q10, q10_base);
+	rte_sched_port_enqueue_qwa_prefetch0(subport11, q11, q11_base);
 
-	r20 = rte_sched_port_enqueue_qwa(port, q20, q20_base, pkt20);
-	r21 = rte_sched_port_enqueue_qwa(port, q21, q21_base, pkt21);
+	r20 = rte_sched_port_enqueue_qwa(subport20, q20, q20_base, pkt20,
+		port->time);
+	r21 = rte_sched_port_enqueue_qwa(subport21, q21, q21_base, pkt21,
+		port->time);
 	result += r20 + r21;
 
-	q_last = rte_sched_port_enqueue_qptrs_prefetch0(port, pkt_last);
+	subport_last = rte_sched_port_get_subport(port, pkt_last);
+	q_last = rte_sched_port_enqueue_qptrs_prefetch0(subport_last,
+				pkt_last, bitwidth);
 
-	q00_base = rte_sched_port_qbase(port, q00);
-	q01_base = rte_sched_port_qbase(port, q01);
-	rte_sched_port_enqueue_qwa_prefetch0(port, q00, q00_base);
-	rte_sched_port_enqueue_qwa_prefetch0(port, q01, q01_base);
+	q00_base = rte_sched_subport_qbase(subport00, q00);
+	q01_base = rte_sched_subport_qbase(subport01, q01);
+	rte_sched_port_enqueue_qwa_prefetch0(subport00, q00, q00_base);
+	rte_sched_port_enqueue_qwa_prefetch0(subport01, q01, q01_base);
 
-	r10 = rte_sched_port_enqueue_qwa(port, q10, q10_base, pkt10);
-	r11 = rte_sched_port_enqueue_qwa(port, q11, q11_base, pkt11);
+	r10 = rte_sched_port_enqueue_qwa(subport10, q10, q10_base, pkt10,
+		port->time);
+	r11 = rte_sched_port_enqueue_qwa(subport11, q11, q11_base, pkt11,
+		port->time);
 	result += r10 + r11;
 
-	q_last_base = rte_sched_port_qbase(port, q_last);
-	rte_sched_port_enqueue_qwa_prefetch0(port, q_last, q_last_base);
+	q_last_base = rte_sched_subport_qbase(subport_last, q_last);
+	rte_sched_port_enqueue_qwa_prefetch0(subport_last, q_last, q_last_base);
 
-	r00 = rte_sched_port_enqueue_qwa(port, q00, q00_base, pkt00);
-	r01 = rte_sched_port_enqueue_qwa(port, q01, q01_base, pkt01);
+	r00 = rte_sched_port_enqueue_qwa(subport00, q00, q00_base, pkt00,
+		port->time);
+	r01 = rte_sched_port_enqueue_qwa(subport01, q01, q01_base, pkt01,
+		port->time);
 	result += r00 + r01;
 
 	if (n_pkts & 1) {
-		r_last = rte_sched_port_enqueue_qwa(port, q_last, q_last_base, pkt_last);
+		r_last = rte_sched_port_enqueue_qwa(subport_last, q_last,
+				q_last_base, pkt_last, port->time);
 		result += r_last;
 	}
 
@@ -2148,7 +2208,7 @@ grinder_schedule(struct rte_sched_port *port, uint32_t pos)
 		rte_bitmap_clear(port->bmp, qindex);
 		grinder->qmask &= ~(1 << grinder->qpos);
 		grinder->wrr_mask[grinder->qpos] = 0;
-		rte_sched_port_set_queue_empty_timestamp(port, qindex);
+		rte_sched_port_set_queue_empty_timestamp(port, port->subport, qindex);
 	}
 
 	/* Reset pipe loop detection */
-- 
2.21.0