[dpdk-dev] [PATCH v9 16/18] examples/distributor: give Rx thread a core

David Hunt david.hunt at intel.com
Mon Mar 6 10:10:31 CET 2017


This so that with the increased amount of stats we are counting,
we don't interfere with the rx core.

Signed-off-by: David Hunt <david.hunt at intel.com>
---
 examples/distributor/main.c | 50 ++++++++++++++++++++++++++++++---------------
 1 file changed, 34 insertions(+), 16 deletions(-)

diff --git a/examples/distributor/main.c b/examples/distributor/main.c
index cf2e826..8daf43d 100644
--- a/examples/distributor/main.c
+++ b/examples/distributor/main.c
@@ -278,6 +278,7 @@ lcore_rx(struct lcore_params *p)
 
 		app_stats.rx.enqueued_pkts += sent;
 		if (unlikely(sent < nb_ret)) {
+			app_stats.rx.enqdrop_pkts +=  nb_ret - sent;
 			RTE_LOG_DP(DEBUG, DISTRAPP,
 				"%s:Packet loss due to full ring\n", __func__);
 			while (sent < nb_ret)
@@ -295,13 +296,12 @@ lcore_rx(struct lcore_params *p)
 static inline void
 flush_one_port(struct output_buffer *outbuf, uint8_t outp)
 {
-	unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs,
-			outbuf->count);
-	app_stats.tx.tx_pkts += nb_tx;
+	unsigned int nb_tx = rte_eth_tx_burst(outp, 0,
+			outbuf->mbufs, outbuf->count);
+	app_stats.tx.tx_pkts += outbuf->count;
 
 	if (unlikely(nb_tx < outbuf->count)) {
-		RTE_LOG_DP(DEBUG, DISTRAPP,
-			"%s:Packet loss with tx_burst\n", __func__);
+		app_stats.tx.enqdrop_pkts +=  outbuf->count - nb_tx;
 		do {
 			rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
 		} while (++nb_tx < outbuf->count);
@@ -313,6 +313,7 @@ static inline void
 flush_all_ports(struct output_buffer *tx_buffers, uint8_t nb_ports)
 {
 	uint8_t outp;
+
 	for (outp = 0; outp < nb_ports; outp++) {
 		/* skip ports that are not enabled */
 		if ((enabled_port_mask & (1 << outp)) == 0)
@@ -405,9 +406,9 @@ lcore_tx(struct rte_ring *in_r)
 			if ((enabled_port_mask & (1 << port)) == 0)
 				continue;
 
-			struct rte_mbuf *bufs[BURST_SIZE];
+			struct rte_mbuf *bufs[BURST_SIZE_TX];
 			const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
-					(void *)bufs, BURST_SIZE);
+					(void *)bufs, BURST_SIZE_TX);
 			app_stats.tx.dequeue_pkts += nb_rx;
 
 			/* if we get no traffic, flush anything we have */
@@ -436,11 +437,12 @@ lcore_tx(struct rte_ring *in_r)
 
 				outbuf = &tx_buffers[outp];
 				outbuf->mbufs[outbuf->count++] = bufs[i];
-				if (outbuf->count == BURST_SIZE)
+				if (outbuf->count == BURST_SIZE_TX)
 					flush_one_port(outbuf, outp);
 			}
 		}
 	}
+	printf("\nCore %u exiting tx task.\n", rte_lcore_id());
 	return 0;
 }
 
@@ -562,6 +564,8 @@ lcore_worker(struct lcore_params *p)
 	for (i = 0; i < 8; i++)
 		buf[i] = NULL;
 
+	app_stats.worker_pkts[p->worker_id] = 1;
+
 	printf("\nCore %u acting as worker core.\n", rte_lcore_id());
 	while (!quit_signal_work) {
 		num = rte_distributor_get_pkt(d, id, buf, buf, num);
@@ -573,6 +577,10 @@ lcore_worker(struct lcore_params *p)
 				rte_pause();
 			buf[i]->port ^= xor_val;
 		}
+
+		app_stats.worker_pkts[p->worker_id] += num;
+		if (num > 0)
+			app_stats.worker_bursts[p->worker_id][num-1]++;
 	}
 	return 0;
 }
@@ -677,9 +685,10 @@ main(int argc, char *argv[])
 	if (ret < 0)
 		rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n");
 
-	if (rte_lcore_count() < 4)
+	if (rte_lcore_count() < 5)
 		rte_exit(EXIT_FAILURE, "Error, This application needs at "
-				"least 4 logical cores to run:\n"
+				"least 5 logical cores to run:\n"
+				"1 lcore for stats (can be core 0)\n"
 				"1 lcore for packet RX\n"
 				"1 lcore for distribution\n"
 				"1 lcore for packet TX\n"
@@ -721,7 +730,7 @@ main(int argc, char *argv[])
 	}
 
 	d = rte_distributor_create("PKT_DIST", rte_socket_id(),
-			rte_lcore_count() - 3,
+			rte_lcore_count() - 4,
 			RTE_DIST_ALG_BURST);
 	if (d == NULL)
 		rte_exit(EXIT_FAILURE, "Cannot create distributor\n");
@@ -760,7 +769,21 @@ main(int argc, char *argv[])
 			/* tx core */
 			rte_eal_remote_launch((lcore_function_t *)lcore_tx,
 					dist_tx_ring, lcore_id);
+		} else if (worker_id == rte_lcore_count() - 2) {
+			printf("Starting rx on worker_id %d, lcore_id %d\n",
+					worker_id, lcore_id);
+			/* rx core */
+			struct lcore_params *p =
+					rte_malloc(NULL, sizeof(*p), 0);
+			if (!p)
+				rte_panic("malloc failure\n");
+			*p = (struct lcore_params){worker_id, d, rx_dist_ring,
+					dist_tx_ring, mbuf_pool};
+			rte_eal_remote_launch((lcore_function_t *)lcore_rx,
+					p, lcore_id);
 		} else {
+			printf("Starting worker on worker_id %d, lcore_id %d\n",
+					worker_id, lcore_id);
 			struct lcore_params *p =
 					rte_malloc(NULL, sizeof(*p), 0);
 			if (!p)
@@ -773,11 +796,6 @@ main(int argc, char *argv[])
 		}
 		worker_id++;
 	}
-	/* call lcore_main on master core only */
-	struct lcore_params p = { 0, d, rx_dist_ring, dist_tx_ring, mbuf_pool};
-
-	if (lcore_rx(&p) != 0)
-		return -1;
 
 	freq = rte_get_timer_hz();
 	t = rte_rdtsc() + freq;
-- 
2.7.4



More information about the dev mailing list