[dpdk-dev] [PATCH v7 09/17] test: switch distributor test over to burst API

David Hunt david.hunt at intel.com
Tue Feb 21 04:17:45 CET 2017


Signed-off-by: David Hunt <david.hunt at intel.com>
---
 app/test/test_distributor.c | 292 ++++++++++++++++++++++++++++----------------
 1 file changed, 187 insertions(+), 105 deletions(-)

diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
index fdfa793..8866e31 100644
--- a/app/test/test_distributor.c
+++ b/app/test/test_distributor.c
@@ -1,7 +1,7 @@
 /*-
  *   BSD LICENSE
  *
- *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ *   Copyright(c) 2010-2017 Intel Corporation. All rights reserved.
  *   All rights reserved.
  *
  *   Redistribution and use in source and binary forms, with or without
@@ -39,7 +39,7 @@
 #include <rte_errno.h>
 #include <rte_mempool.h>
 #include <rte_mbuf.h>
-#include <rte_distributor_v20.h>
+#include <rte_distributor.h>
 
 #define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */
 #define BURST 32
@@ -47,7 +47,7 @@
 
 struct worker_params {
 	char name[64];
-	struct rte_distributor_v20 *dist;
+	struct rte_distributor *dist;
 };
 
 struct worker_params worker_params;
@@ -87,19 +87,25 @@ clear_packet_count(void)
 static int
 handle_work(void *arg)
 {
-	struct rte_mbuf *pkt = NULL;
+	struct rte_mbuf *buf[8] __rte_cache_aligned;
 	struct worker_params *wp = arg;
-	struct rte_distributor_v20 *d = wp->dist;
-	unsigned count = 0;
-	unsigned id = __sync_fetch_and_add(&worker_idx, 1);
-
-	pkt = rte_distributor_get_pkt_v20(d, id, NULL);
+	struct rte_distributor *db = wp->dist;
+	unsigned int count = 0, num = 0;
+	unsigned int id = __sync_fetch_and_add(&worker_idx, 1);
+	int i;
+
+	for (i = 0; i < 8; i++)
+		buf[i] = NULL;
+	num = rte_distributor_get_pkt(db, id, buf, buf, num);
 	while (!quit) {
-		worker_stats[id].handled_packets++, count++;
-		pkt = rte_distributor_get_pkt_v20(d, id, pkt);
+		worker_stats[id].handled_packets += num;
+		count += num;
+		num = rte_distributor_get_pkt(db, id,
+				buf, buf, num);
 	}
-	worker_stats[id].handled_packets++, count++;
-	rte_distributor_return_pkt_v20(d, id, pkt);
+	worker_stats[id].handled_packets += num;
+	count += num;
+	rte_distributor_return_pkt(db, id, buf, num);
 	return 0;
 }
 
@@ -117,11 +123,15 @@ handle_work(void *arg)
 static int
 sanity_test(struct worker_params *wp, struct rte_mempool *p)
 {
-	struct rte_distributor_v20 *d = wp->dist;
+	struct rte_distributor *db = wp->dist;
 	struct rte_mbuf *bufs[BURST];
-	unsigned i;
+	struct rte_mbuf *returns[BURST*2];
+	unsigned int i;
+	unsigned int retries;
+	unsigned int count = 0;
+
+	printf("=== Basic distributor sanity tests (%s) ===\n", wp->name);
 
-	printf("=== Basic distributor sanity tests ===\n");
 	clear_packet_count();
 	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
 		printf("line %d: Error getting mbufs from pool\n", __LINE__);
@@ -133,8 +143,16 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
 	for (i = 0; i < BURST; i++)
 		bufs[i]->hash.usr = 0;
 
-	rte_distributor_process_v20(d, bufs, BURST);
-	rte_distributor_flush_v20(d);
+	rte_distributor_process(db, bufs, BURST);
+	count = 0;
+	do {
+
+		rte_distributor_flush(db);
+		count += rte_distributor_returned_pkts(db,
+				returns, BURST*2);
+	} while (count < BURST);
+
+
 	if (total_packet_count() != BURST) {
 		printf("Line %d: Error, not all packets flushed. "
 				"Expected %u, got %u\n",
@@ -146,8 +164,6 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
 		printf("Worker %u handled %u packets\n", i,
 				worker_stats[i].handled_packets);
 	printf("Sanity test with all zero hashes done.\n");
-	if (worker_stats[0].handled_packets != BURST)
-		return -1;
 
 	/* pick two flows and check they go correctly */
 	if (rte_lcore_count() >= 3) {
@@ -155,8 +171,13 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
 		for (i = 0; i < BURST; i++)
 			bufs[i]->hash.usr = (i & 1) << 8;
 
-		rte_distributor_process_v20(d, bufs, BURST);
-		rte_distributor_flush_v20(d);
+		rte_distributor_process(db, bufs, BURST);
+		count = 0;
+		do {
+			rte_distributor_flush(db);
+			count += rte_distributor_returned_pkts(db,
+					returns, BURST*2);
+		} while (count < BURST);
 		if (total_packet_count() != BURST) {
 			printf("Line %d: Error, not all packets flushed. "
 					"Expected %u, got %u\n",
@@ -168,20 +189,22 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
 			printf("Worker %u handled %u packets\n", i,
 					worker_stats[i].handled_packets);
 		printf("Sanity test with two hash values done\n");
-
-		if (worker_stats[0].handled_packets != 16 ||
-				worker_stats[1].handled_packets != 16)
-			return -1;
 	}
 
 	/* give a different hash value to each packet,
 	 * so load gets distributed */
 	clear_packet_count();
 	for (i = 0; i < BURST; i++)
-		bufs[i]->hash.usr = i;
+		bufs[i]->hash.usr = i+1;
+
+	rte_distributor_process(db, bufs, BURST);
+	count = 0;
+	do {
+		rte_distributor_flush(db);
+		count += rte_distributor_returned_pkts(db,
+				returns, BURST*2);
+	} while (count < BURST);
 
-	rte_distributor_process_v20(d, bufs, BURST);
-	rte_distributor_flush_v20(d);
 	if (total_packet_count() != BURST) {
 		printf("Line %d: Error, not all packets flushed. "
 				"Expected %u, got %u\n",
@@ -203,8 +226,9 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
 	unsigned num_returned = 0;
 
 	/* flush out any remaining packets */
-	rte_distributor_flush_v20(d);
-	rte_distributor_clear_returns_v20(d);
+	rte_distributor_flush(db);
+	rte_distributor_clear_returns(db);
+
 	if (rte_mempool_get_bulk(p, (void *)many_bufs, BIG_BATCH) != 0) {
 		printf("line %d: Error getting mbufs from pool\n", __LINE__);
 		return -1;
@@ -212,28 +236,45 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
 	for (i = 0; i < BIG_BATCH; i++)
 		many_bufs[i]->hash.usr = i << 2;
 
+	printf("=== testing big burst (%s) ===\n", wp->name);
 	for (i = 0; i < BIG_BATCH/BURST; i++) {
-		rte_distributor_process_v20(d, &many_bufs[i*BURST], BURST);
-		num_returned += rte_distributor_returned_pkts_v20(d,
+		rte_distributor_process(db,
+				&many_bufs[i*BURST], BURST);
+		count = rte_distributor_returned_pkts(db,
 				&return_bufs[num_returned],
 				BIG_BATCH - num_returned);
+		num_returned += count;
 	}
-	rte_distributor_flush_v20(d);
-	num_returned += rte_distributor_returned_pkts_v20(d,
-			&return_bufs[num_returned], BIG_BATCH - num_returned);
+	rte_distributor_flush(db);
+	count = rte_distributor_returned_pkts(db,
+			&return_bufs[num_returned],
+			BIG_BATCH - num_returned);
+	num_returned += count;
+	retries = 0;
+	do {
+		rte_distributor_flush(db);
+		count = rte_distributor_returned_pkts(db,
+				&return_bufs[num_returned],
+				BIG_BATCH - num_returned);
+		num_returned += count;
+		retries++;
+	} while ((num_returned < BIG_BATCH) && (retries < 100));
+
 
 	if (num_returned != BIG_BATCH) {
-		printf("line %d: Number returned is not the same as "
-				"number sent\n", __LINE__);
+		printf("line %d: Missing packets, expected %d, got %u\n",
+				__LINE__, BIG_BATCH, num_returned);
 		return -1;
 	}
+
 	/* big check -  make sure all packets made it back!! */
 	for (i = 0; i < BIG_BATCH; i++) {
 		unsigned j;
 		struct rte_mbuf *src = many_bufs[i];
-		for (j = 0; j < BIG_BATCH; j++)
+		for (j = 0; j < BIG_BATCH; j++) {
 			if (return_bufs[j] == src)
 				break;
+		}
 
 		if (j == BIG_BATCH) {
 			printf("Error: could not find source packet #%u\n", i);
@@ -257,20 +298,28 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
 static int
 handle_work_with_free_mbufs(void *arg)
 {
-	struct rte_mbuf *pkt = NULL;
+	struct rte_mbuf *buf[8] __rte_cache_aligned;
 	struct worker_params *wp = arg;
-	struct rte_distributor_v20 *d = wp->dist;
-	unsigned count = 0;
-	unsigned id = __sync_fetch_and_add(&worker_idx, 1);
-
-	pkt = rte_distributor_get_pkt_v20(d, id, NULL);
+	struct rte_distributor *d = wp->dist;
+	unsigned int count = 0;
+	unsigned int i;
+	unsigned int num = 0;
+	unsigned int id = __sync_fetch_and_add(&worker_idx, 1);
+
+	for (i = 0; i < 8; i++)
+		buf[i] = NULL;
+	num = rte_distributor_get_pkt(d, id, buf, buf, num);
 	while (!quit) {
-		worker_stats[id].handled_packets++, count++;
-		rte_pktmbuf_free(pkt);
-		pkt = rte_distributor_get_pkt_v20(d, id, pkt);
+		worker_stats[id].handled_packets += num;
+		count += num;
+		for (i = 0; i < num; i++)
+			rte_pktmbuf_free(buf[i]);
+		num = rte_distributor_get_pkt(d,
+				id, buf, buf, num);
 	}
-	worker_stats[id].handled_packets++, count++;
-	rte_distributor_return_pkt_v20(d, id, pkt);
+	worker_stats[id].handled_packets += num;
+	count += num;
+	rte_distributor_return_pkt(d, id, buf, num);
 	return 0;
 }
 
@@ -282,25 +331,29 @@ handle_work_with_free_mbufs(void *arg)
 static int
 sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p)
 {
-	struct rte_distributor_v20 *d = wp->dist;
+	struct rte_distributor *d = wp->dist;
 	unsigned i;
 	struct rte_mbuf *bufs[BURST];
 
-	printf("=== Sanity test with mbuf alloc/free  ===\n");
+	printf("=== Sanity test with mbuf alloc/free (%s) ===\n", wp->name);
+
 	clear_packet_count();
 	for (i = 0; i < ((1<<ITER_POWER)); i += BURST) {
 		unsigned j;
 		while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0)
-			rte_distributor_process_v20(d, NULL, 0);
+			rte_distributor_process(d, NULL, 0);
 		for (j = 0; j < BURST; j++) {
 			bufs[j]->hash.usr = (i+j) << 1;
 			rte_mbuf_refcnt_set(bufs[j], 1);
 		}
 
-		rte_distributor_process_v20(d, bufs, BURST);
+		rte_distributor_process(d, bufs, BURST);
 	}
 
-	rte_distributor_flush_v20(d);
+	rte_distributor_flush(d);
+
+	rte_delay_us(10000);
+
 	if (total_packet_count() < (1<<ITER_POWER)) {
 		printf("Line %u: Packet count is incorrect, %u, expected %u\n",
 				__LINE__, total_packet_count(),
@@ -316,21 +369,31 @@ static int
 handle_work_for_shutdown_test(void *arg)
 {
-	struct rte_mbuf *pkt = NULL;
+	struct rte_mbuf *buf[8] __rte_cache_aligned;
 	struct worker_params *wp = arg;
-	struct rte_distributor_v20 *d = wp->dist;
-	unsigned count = 0;
-	const unsigned id = __sync_fetch_and_add(&worker_idx, 1);
+	struct rte_distributor *d = wp->dist;
+	unsigned int count = 0;
+	unsigned int num = 0;
+	unsigned int total = 0;
+	unsigned int i;
+	unsigned int returned = 0;
+	const unsigned int id = __sync_fetch_and_add(&worker_idx, 1);
+
+	num = rte_distributor_get_pkt(d, id, buf, buf, num);
 
-	pkt = rte_distributor_get_pkt_v20(d, id, NULL);
 	/* wait for quit single globally, or for worker zero, wait
 	 * for zero_quit */
 	while (!quit && !(id == 0 && zero_quit)) {
-		worker_stats[id].handled_packets++, count++;
-		rte_pktmbuf_free(pkt);
-		pkt = rte_distributor_get_pkt_v20(d, id, NULL);
+		worker_stats[id].handled_packets += num;
+		count += num;
+		for (i = 0; i < num; i++)
+			rte_pktmbuf_free(buf[i]);
+		num = rte_distributor_get_pkt(d,
+				id, buf, buf, num);
+		total += num;
 	}
-	worker_stats[id].handled_packets++, count++;
-	rte_distributor_return_pkt_v20(d, id, pkt);
+	worker_stats[id].handled_packets += num;
+	count += num;
+	returned = rte_distributor_return_pkt(d, id, buf, num);
 
 	if (id == 0) {
 		/* for worker zero, allow it to restart to pick up last packet
@@ -338,13 +401,19 @@ handle_work_for_shutdown_test(void *arg)
 		 */
 		while (zero_quit)
 			usleep(100);
-		pkt = rte_distributor_get_pkt_v20(d, id, NULL);
+
+		num = rte_distributor_get_pkt(d,
+				id, buf, NULL, 0);
+
 		while (!quit) {
-			worker_stats[id].handled_packets++, count++;
-			rte_pktmbuf_free(pkt);
-			pkt = rte_distributor_get_pkt_v20(d, id, NULL);
+			worker_stats[id].handled_packets += num;
+			for (i = 0; i < num; i++)
+				rte_pktmbuf_free(buf[i]);
+			num = rte_distributor_get_pkt(d, id, buf, buf, num);
 		}
-		rte_distributor_return_pkt_v20(d, id, pkt);
+		returned = rte_distributor_return_pkt(d,
+				id, buf, num);
+		printf("Num returned = %u\n", returned);
 	}
 	return 0;
 }
@@ -359,24 +428,29 @@ static int
 sanity_test_with_worker_shutdown(struct worker_params *wp,
 		struct rte_mempool *p)
 {
-	struct rte_distributor_v20 *d = wp->dist;
+	struct rte_distributor *d = wp->dist;
 	struct rte_mbuf *bufs[BURST];
 	unsigned i;
 
 	printf("=== Sanity test of worker shutdown ===\n");
 
 	clear_packet_count();
+
 	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
 		printf("line %d: Error getting mbufs from pool\n", __LINE__);
 		return -1;
 	}
 
-	/* now set all hash values in all buffers to zero, so all pkts go to the
-	 * one worker thread */
+	/*
+	 * Now set all hash values in all buffers to same value so all
+	 * pkts go to the one worker thread
+	 */
 	for (i = 0; i < BURST; i++)
-		bufs[i]->hash.usr = 0;
+		bufs[i]->hash.usr = 1;
+
+	rte_distributor_process(d, bufs, BURST);
+	rte_distributor_flush(d);
 
-	rte_distributor_process_v20(d, bufs, BURST);
 	/* at this point, we will have processed some packets and have a full
 	 * backlog for the other ones at worker 0.
 	 */
@@ -387,14 +461,19 @@ sanity_test_with_worker_shutdown(struct worker_params *wp,
 		return -1;
 	}
 	for (i = 0; i < BURST; i++)
-		bufs[i]->hash.usr = 0;
+		bufs[i]->hash.usr = 1;
 
 	/* get worker zero to quit */
 	zero_quit = 1;
-	rte_distributor_process_v20(d, bufs, BURST);
-
+	rte_distributor_process(d, bufs, BURST);
 	/* flush the distributor */
-	rte_distributor_flush_v20(d);
+	rte_distributor_flush(d);
+	rte_delay_us(10000);
+
+	for (i = 0; i < rte_lcore_count() - 1; i++)
+		printf("Worker %u handled %u packets\n", i,
+				worker_stats[i].handled_packets);
+
 	if (total_packet_count() != BURST * 2) {
 		printf("Line %d: Error, not all packets flushed. "
 				"Expected %u, got %u\n",
@@ -402,10 +481,6 @@ sanity_test_with_worker_shutdown(struct worker_params *wp,
 		return -1;
 	}
 
-	for (i = 0; i < rte_lcore_count() - 1; i++)
-		printf("Worker %u handled %u packets\n", i,
-				worker_stats[i].handled_packets);
-
 	printf("Sanity test with worker shutdown passed\n\n");
 	return 0;
 }
@@ -417,11 +492,11 @@ static int
 test_flush_with_worker_shutdown(struct worker_params *wp,
 		struct rte_mempool *p)
 {
-	struct rte_distributor_v20 *d = wp->dist;
+	struct rte_distributor *d = wp->dist;
 	struct rte_mbuf *bufs[BURST];
 	unsigned i;
 
-	printf("=== Test flush fn with worker shutdown ===\n");
+	printf("=== Test flush fn with worker shutdown (%s) ===\n", wp->name);
 
 	clear_packet_count();
 	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
@@ -434,7 +509,8 @@ test_flush_with_worker_shutdown(struct worker_params *wp,
 	for (i = 0; i < BURST; i++)
 		bufs[i]->hash.usr = 0;
 
-	rte_distributor_process_v20(d, bufs, BURST);
+	rte_distributor_process(d, bufs, BURST);
+
 	/* at this point, we will have processed some packets and have a full
 	 * backlog for the other ones at worker 0.
 	 */
@@ -443,9 +519,15 @@ test_flush_with_worker_shutdown(struct worker_params *wp,
 	zero_quit = 1;
 
 	/* flush the distributor */
-	rte_distributor_flush_v20(d);
+	rte_distributor_flush(d);
+
+	rte_delay_us(10000);
 
 	zero_quit = 0;
+	for (i = 0; i < rte_lcore_count() - 1; i++)
+		printf("Worker %u handled %u packets\n", i,
+				worker_stats[i].handled_packets);
+
 	if (total_packet_count() != BURST) {
 		printf("Line %d: Error, not all packets flushed. "
 				"Expected %u, got %u\n",
@@ -453,10 +535,6 @@ test_flush_with_worker_shutdown(struct worker_params *wp,
 		return -1;
 	}
 
-	for (i = 0; i < rte_lcore_count() - 1; i++)
-		printf("Worker %u handled %u packets\n", i,
-				worker_stats[i].handled_packets);
-
 	printf("Flush test with worker shutdown passed\n\n");
 	return 0;
 }
@@ -464,11 +542,12 @@ test_flush_with_worker_shutdown(struct worker_params *wp,
 static
 int test_error_distributor_create_name(void)
 {
-	struct rte_distributor_v20 *d = NULL;
+	struct rte_distributor *d = NULL;
 	char *name = NULL;
 
-	d = rte_distributor_create_v20(name, rte_socket_id(),
-			rte_lcore_count() - 1);
+	d = rte_distributor_create(name, rte_socket_id(),
+			rte_lcore_count() - 1,
+			RTE_DIST_ALG_BURST);
 	if (d != NULL || rte_errno != EINVAL) {
 		printf("ERROR: No error on create() with NULL name param\n");
 		return -1;
@@ -481,9 +560,11 @@ int test_error_distributor_create_name(void)
 static
 int test_error_distributor_create_numworkers(void)
 {
-	struct rte_distributor_v20 *d = NULL;
-	d = rte_distributor_create_v20("test_numworkers", rte_socket_id(),
-			RTE_MAX_LCORE + 10);
+	struct rte_distributor *d = NULL;
+
+	d = rte_distributor_create("test_numworkers", rte_socket_id(),
+			RTE_MAX_LCORE + 10,
+			RTE_DIST_ALG_BURST);
 	if (d != NULL || rte_errno != EINVAL) {
 		printf("ERROR: No error on create() with num_workers > MAX\n");
 		return -1;
@@ -496,7 +577,7 @@ int test_error_distributor_create_numworkers(void)
 static void
 quit_workers(struct worker_params *wp, struct rte_mempool *p)
 {
-	struct rte_distributor_v20 *d = wp->dist;
+	struct rte_distributor *d = wp->dist;
 	const unsigned num_workers = rte_lcore_count() - 1;
 	unsigned i;
 	struct rte_mbuf *bufs[RTE_MAX_LCORE];
@@ -506,12 +587,12 @@ quit_workers(struct worker_params *wp, struct rte_mempool *p)
 	quit = 1;
 	for (i = 0; i < num_workers; i++)
 		bufs[i]->hash.usr = i << 1;
-	rte_distributor_process_v20(d, bufs, num_workers);
+	rte_distributor_process(d, bufs, num_workers);
 
 	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
 
-	rte_distributor_process_v20(d, NULL, 0);
-	rte_distributor_flush_v20(d);
+	rte_distributor_process(d, NULL, 0);
+	rte_distributor_flush(d);
 	rte_eal_mp_wait_lcore();
 	quit = 0;
 	worker_idx = 0;
@@ -520,7 +601,7 @@ quit_workers(struct worker_params *wp, struct rte_mempool *p)
 static int
 test_distributor(void)
 {
-	static struct rte_distributor_v20 *d;
+	static struct rte_distributor *d;
 	static struct rte_mempool *p;
 
 	if (rte_lcore_count() < 2) {
@@ -529,16 +610,17 @@ test_distributor(void)
 	}
 
 	if (d == NULL) {
-		d = rte_distributor_create_v20("Test_distributor",
+		d = rte_distributor_create("Test_dist_burst",
 				rte_socket_id(),
-				rte_lcore_count() - 1);
+				rte_lcore_count() - 1,
+				RTE_DIST_ALG_BURST);
 		if (d == NULL) {
-			printf("Error creating distributor\n");
+			printf("Error creating burst distributor\n");
 			return -1;
 		}
 	} else {
-		rte_distributor_flush_v20(d);
-		rte_distributor_clear_returns_v20(d);
+		rte_distributor_flush(d);
+		rte_distributor_clear_returns(d);
 	}
 
 	const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ?
@@ -553,7 +635,7 @@ test_distributor(void)
 	}
 
 	worker_params.dist = d;
-	sprintf(worker_params.name, "single");
+	sprintf(worker_params.name, "burst");
 
 	rte_eal_mp_remote_launch(handle_work, &worker_params, SKIP_MASTER);
 	if (sanity_test(&worker_params, p) < 0)
-- 
2.7.4



More information about the dev mailing list