[dpdk-dev] [PATCH v7 08/16] test/distributor: fix freeing mbufs

Honnappa Nagarahalli Honnappa.Nagarahalli at arm.com
Fri Oct 16 07:12:51 CEST 2020


<snip>

> 
> Sanity tests with mbuf alloc and shutdown tests assume that mbufs passed
> to worker cores are freed in handlers.
> Such packets should not be returned to the distributor's main core. The only
> packets that should be returned are the packets send after completion of
> the tests in quit_workers function.
> 
> This patch stops returning mbufs to distributor's core.
> In case of shutdown tests it is impossible to determine how worker and
> distributor threads would synchronize.
> Packets used by tests should be freed and packets used during
> quit_workers() shouldn't. That's why returning mbufs to mempool is moved
> to test procedure run on distributor thread from worker threads.
> 
> Additionally this patch cleans up unused variables.
> 
> Fixes: c0de0eb82e40 ("distributor: switch over to new API")
> Cc: david.hunt at intel.com
> Cc: stable at dpdk.org
> 
> Signed-off-by: Lukasz Wojciechowski <l.wojciechow at partner.samsung.com>
> Acked-by: David Hunt <david.hunt at intel.com>
> ---
>  app/test/test_distributor.c | 96 ++++++++++++++++++-------------------
>  1 file changed, 47 insertions(+), 49 deletions(-)
> 
> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c index
> 838459392..06e01ff9d 100644
> --- a/app/test/test_distributor.c
> +++ b/app/test/test_distributor.c
> @@ -44,7 +44,7 @@ total_packet_count(void)
>  	unsigned i, count = 0;
>  	for (i = 0; i < worker_idx; i++)
>  		count +=
> __atomic_load_n(&worker_stats[i].handled_packets,
> -				__ATOMIC_ACQUIRE);
> +				__ATOMIC_RELAXED);
I think it is better to make this and other statistics changes below in commit 6/16. It will be in line with the commit log as well.

>  	return count;
>  }
> 
> @@ -55,7 +55,7 @@ clear_packet_count(void)
>  	unsigned int i;
>  	for (i = 0; i < RTE_MAX_LCORE; i++)
>  		__atomic_store_n(&worker_stats[i].handled_packets, 0,
> -			__ATOMIC_RELEASE);
> +			__ATOMIC_RELAXED);
>  }
> 
>  /* this is the basic worker function for sanity test @@ -67,20 +67,18 @@
> handle_work(void *arg)
>  	struct rte_mbuf *buf[8] __rte_cache_aligned;
>  	struct worker_params *wp = arg;
>  	struct rte_distributor *db = wp->dist;
> -	unsigned int count = 0, num;
> +	unsigned int num;
>  	unsigned int id = __atomic_fetch_add(&worker_idx, 1,
> __ATOMIC_RELAXED);
> 
>  	num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
>  	while (!quit) {
>  		__atomic_fetch_add(&worker_stats[id].handled_packets,
> num,
> -				__ATOMIC_ACQ_REL);
> -		count += num;
> +				__ATOMIC_RELAXED);
>  		num = rte_distributor_get_pkt(db, id,
>  				buf, buf, num);
>  	}
>  	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
> -			__ATOMIC_ACQ_REL);
> -	count += num;
> +			__ATOMIC_RELAXED);
>  	rte_distributor_return_pkt(db, id, buf, num);
>  	return 0;
>  }
> @@ -136,7 +134,7 @@ sanity_test(struct worker_params *wp, struct
> rte_mempool *p)
>  	for (i = 0; i < rte_lcore_count() - 1; i++)
>  		printf("Worker %u handled %u packets\n", i,
>  			__atomic_load_n(&worker_stats[i].handled_packets,
> -					__ATOMIC_ACQUIRE));
> +					__ATOMIC_RELAXED));
>  	printf("Sanity test with all zero hashes done.\n");
> 
>  	/* pick two flows and check they go correctly */ @@ -163,7 +161,7
> @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>  			printf("Worker %u handled %u packets\n", i,
>  				__atomic_load_n(
>  					&worker_stats[i].handled_packets,
> -					__ATOMIC_ACQUIRE));
> +					__ATOMIC_RELAXED));
>  		printf("Sanity test with two hash values done\n");
>  	}
> 
> @@ -190,7 +188,7 @@ sanity_test(struct worker_params *wp, struct
> rte_mempool *p)
>  	for (i = 0; i < rte_lcore_count() - 1; i++)
>  		printf("Worker %u handled %u packets\n", i,
>  			__atomic_load_n(&worker_stats[i].handled_packets,
> -					__ATOMIC_ACQUIRE));
> +					__ATOMIC_RELAXED));
>  	printf("Sanity test with non-zero hashes done\n");
> 
>  	rte_mempool_put_bulk(p, (void *)bufs, BURST); @@ -276,23
> +274,20 @@ handle_work_with_free_mbufs(void *arg)
>  	struct rte_mbuf *buf[8] __rte_cache_aligned;
>  	struct worker_params *wp = arg;
>  	struct rte_distributor *d = wp->dist;
> -	unsigned int count = 0;
>  	unsigned int i;
>  	unsigned int num;
>  	unsigned int id = __atomic_fetch_add(&worker_idx, 1,
> __ATOMIC_RELAXED);
> 
>  	num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
>  	while (!quit) {
> -		count += num;
>  		__atomic_fetch_add(&worker_stats[id].handled_packets,
> num,
> -				__ATOMIC_ACQ_REL);
> +				__ATOMIC_RELAXED);
>  		for (i = 0; i < num; i++)
>  			rte_pktmbuf_free(buf[i]);
>  		num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
>  	}
> -	count += num;
>  	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
> -			__ATOMIC_ACQ_REL);
> +			__ATOMIC_RELAXED);
>  	rte_distributor_return_pkt(d, id, buf, num);
>  	return 0;
>  }
> @@ -318,7 +313,6 @@ sanity_test_with_mbuf_alloc(struct worker_params
> *wp, struct rte_mempool *p)
>  			rte_distributor_process(d, NULL, 0);
>  		for (j = 0; j < BURST; j++) {
>  			bufs[j]->hash.usr = (i+j) << 1;
> -			rte_mbuf_refcnt_set(bufs[j], 1);
>  		}
> 
>  		rte_distributor_process(d, bufs, BURST); @@ -342,15 +336,10
> @@ sanity_test_with_mbuf_alloc(struct worker_params *wp, struct
> rte_mempool *p)  static int  handle_work_for_shutdown_test(void *arg)  {
> -	struct rte_mbuf *pkt = NULL;
>  	struct rte_mbuf *buf[8] __rte_cache_aligned;
>  	struct worker_params *wp = arg;
>  	struct rte_distributor *d = wp->dist;
> -	unsigned int count = 0;
>  	unsigned int num;
> -	unsigned int total = 0;
> -	unsigned int i;
> -	unsigned int returned = 0;
>  	unsigned int zero_id = 0;
>  	unsigned int zero_unset;
>  	const unsigned int id = __atomic_fetch_add(&worker_idx, 1, @@ -
> 368,11 +357,8 @@ handle_work_for_shutdown_test(void *arg)
>  	/* wait for quit single globally, or for worker zero, wait
>  	 * for zero_quit */
>  	while (!quit && !(id == zero_id && zero_quit)) {
> -		count += num;
>  		__atomic_fetch_add(&worker_stats[id].handled_packets,
> num,
> -				__ATOMIC_ACQ_REL);
> -		for (i = 0; i < num; i++)
> -			rte_pktmbuf_free(buf[i]);
> +				__ATOMIC_RELAXED);
>  		num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
> 
>  		if (num > 0) {
> @@ -381,15 +367,12 @@ handle_work_for_shutdown_test(void *arg)
>  				false, __ATOMIC_ACQ_REL,
> __ATOMIC_ACQUIRE);
>  		}
>  		zero_id = __atomic_load_n(&zero_idx,
> __ATOMIC_ACQUIRE);
> -
> -		total += num;
>  	}
> -	count += num;
> -	returned = rte_distributor_return_pkt(d, id, buf, num);
> -
>  	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
> -			__ATOMIC_ACQ_REL);
> +			__ATOMIC_RELAXED);
>  	if (id == zero_id) {
> +		rte_distributor_return_pkt(d, id, NULL, 0);
> +
>  		/* for worker zero, allow it to restart to pick up last packet
>  		 * when all workers are shutting down.
>  		 */
> @@ -400,15 +383,11 @@ handle_work_for_shutdown_test(void *arg)
> 
>  		while (!quit) {
> 
> 	__atomic_fetch_add(&worker_stats[id].handled_packets,
> -					num, __ATOMIC_ACQ_REL);
> -			count += num;
> -			rte_pktmbuf_free(pkt);
> +					num, __ATOMIC_RELAXED);
>  			num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
>  		}
> -		returned = rte_distributor_return_pkt(d,
> -				id, buf, num);
> -		printf("Num returned = %d\n", returned);
>  	}
> +	rte_distributor_return_pkt(d, id, buf, num);
>  	return 0;
>  }
> 
> @@ -424,7 +403,9 @@ sanity_test_with_worker_shutdown(struct
> worker_params *wp,  {
>  	struct rte_distributor *d = wp->dist;
>  	struct rte_mbuf *bufs[BURST];
> -	unsigned i;
> +	struct rte_mbuf *bufs2[BURST];
> +	unsigned int i;
> +	unsigned int failed = 0;
> 
>  	printf("=== Sanity test of worker shutdown ===\n");
> 
> @@ -450,16 +431,17 @@ sanity_test_with_worker_shutdown(struct
> worker_params *wp,
>  	 */
> 
>  	/* get more buffers to queue up, again setting them to the same
> flow */
> -	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
> +	if (rte_mempool_get_bulk(p, (void *)bufs2, BURST) != 0) {
>  		printf("line %d: Error getting mbufs from pool\n", __LINE__);
> +		rte_mempool_put_bulk(p, (void *)bufs, BURST);
>  		return -1;
>  	}
>  	for (i = 0; i < BURST; i++)
> -		bufs[i]->hash.usr = 1;
> +		bufs2[i]->hash.usr = 1;
> 
>  	/* get worker zero to quit */
>  	zero_quit = 1;
> -	rte_distributor_process(d, bufs, BURST);
> +	rte_distributor_process(d, bufs2, BURST);
> 
>  	/* flush the distributor */
>  	rte_distributor_flush(d);
> @@ -468,15 +450,21 @@ sanity_test_with_worker_shutdown(struct
> worker_params *wp,
>  	for (i = 0; i < rte_lcore_count() - 1; i++)
>  		printf("Worker %u handled %u packets\n", i,
>  			__atomic_load_n(&worker_stats[i].handled_packets,
> -					__ATOMIC_ACQUIRE));
> +					__ATOMIC_RELAXED));
> 
>  	if (total_packet_count() != BURST * 2) {
>  		printf("Line %d: Error, not all packets flushed. "
>  				"Expected %u, got %u\n",
>  				__LINE__, BURST * 2, total_packet_count());
> -		return -1;
> +		failed = 1;
>  	}
> 
> +	rte_mempool_put_bulk(p, (void *)bufs, BURST);
> +	rte_mempool_put_bulk(p, (void *)bufs2, BURST);
> +
> +	if (failed)
> +		return -1;
> +
>  	printf("Sanity test with worker shutdown passed\n\n");
>  	return 0;
>  }
> @@ -490,7 +478,8 @@ test_flush_with_worker_shutdown(struct
> worker_params *wp,  {
>  	struct rte_distributor *d = wp->dist;
>  	struct rte_mbuf *bufs[BURST];
> -	unsigned i;
> +	unsigned int i;
> +	unsigned int failed = 0;
> 
>  	printf("=== Test flush fn with worker shutdown (%s) ===\n", wp-
> >name);
> 
> @@ -522,15 +511,20 @@ test_flush_with_worker_shutdown(struct
> worker_params *wp,
>  	for (i = 0; i < rte_lcore_count() - 1; i++)
>  		printf("Worker %u handled %u packets\n", i,
>  			__atomic_load_n(&worker_stats[i].handled_packets,
> -					__ATOMIC_ACQUIRE));
> +					__ATOMIC_RELAXED));
> 
>  	if (total_packet_count() != BURST) {
>  		printf("Line %d: Error, not all packets flushed. "
>  				"Expected %u, got %u\n",
>  				__LINE__, BURST, total_packet_count());
> -		return -1;
> +		failed = 1;
>  	}
> 
> +	rte_mempool_put_bulk(p, (void *)bufs, BURST);
> +
> +	if (failed)
> +		return -1;
> +
>  	printf("Flush test with worker shutdown passed\n\n");
>  	return 0;
>  }
> @@ -596,7 +590,10 @@ quit_workers(struct worker_params *wp, struct
> rte_mempool *p)
>  	const unsigned num_workers = rte_lcore_count() - 1;
>  	unsigned i;
>  	struct rte_mbuf *bufs[RTE_MAX_LCORE];
> -	rte_mempool_get_bulk(p, (void *)bufs, num_workers);
> +	if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) {
> +		printf("line %d: Error getting mbufs from pool\n", __LINE__);
> +		return;
> +	}
> 
>  	zero_quit = 0;
>  	quit = 1;
> @@ -604,11 +601,12 @@ quit_workers(struct worker_params *wp, struct
> rte_mempool *p)
>  		bufs[i]->hash.usr = i << 1;
>  	rte_distributor_process(d, bufs, num_workers);
> 
> -	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
> -
>  	rte_distributor_process(d, NULL, 0);
>  	rte_distributor_flush(d);
>  	rte_eal_mp_wait_lcore();
> +
> +	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
> +
>  	quit = 0;
>  	worker_idx = 0;
>  	zero_idx = RTE_MAX_LCORE;
> --
> 2.17.1



More information about the dev mailing list