[dpdk-dev] [PATCH v4 2/8] test/distributor: synchronize lcores statistics

Lukasz Wojciechowski l.wojciechow at partner.samsung.com
Fri Oct 2 13:25:19 CEST 2020


Hi Honnappa,

Many thanks for the review!

I'll write my answers here rather than inline, as I think it will be easier
to read them in one place.
So first of all, I agree with you on 2 things:
1) all uses of the statistics must be atomic, and the lack of that caused
most of the problems
2) it would be better to replace the barrier and memset in
clear_packet_count() with atomic stores, as you suggested

So I will apply both of the above.
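
For reference, a minimal sketch of the second change (it assumes the
worker_stats[] array and the RTE_MAX_LCORE bound already used in
test_distributor.c):

	static inline void
	clear_packet_count(void)
	{
		unsigned int i;

		/* zero each counter with an atomic store instead of
		 * memset() + fence; RELAXED is enough here, as no other
		 * data is published together with the counters */
		for (i = 0; i < RTE_MAX_LCORE; i++)
			__atomic_store_n(&worker_stats[i].handled_packets,
					0, __ATOMIC_RELAXED);
	}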

However, I wasn't fully convinced about changing acquire/release to relaxed.
It would be perfectly OK if the code looked like the example from Herb
Sutter's talk: https://youtu.be/KeLBd2EJLOU?t=4170
But in his case the counters are cleared before the worker threads start and
are printed out after they have completed.
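
In that pattern the counters are touched by the main thread only while no
worker is running, so relaxed operations are enough. Roughly (a simplified
sketch; launch_workers() and join_workers() are placeholders, not dpdk APIs):

	static unsigned int counter;	/* the shared statistic */

	/* main thread, before any worker is started */
	__atomic_store_n(&counter, 0, __ATOMIC_RELAXED);
	launch_workers();	/* workers do relaxed fetch-adds on counter */
	join_workers();		/* the join itself synchronizes, so the
				 * workers' additions are all visible here */
	printf("total: %u\n", __atomic_load_n(&counter, __ATOMIC_RELAXED));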

In the case of the dpdk distributor tests, both the worker and main cores are
running at the same time. In the sanity_test, the statistics are cleared and
verified a few times for different packet hashes. The worker cores are not
stopped at that time; they continue their loops in the handle procedure. The
verification done on the main core is therefore an exchange of data, as the
current statistics determine the outcome of the test.
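
In other words, even with atomic counters, a sum taken on the main core while
the workers are still running is only a snapshot (an illustration, not code
from the patch):

	unsigned int snapshot = 0;
	for (i = 0; i < worker_idx; i++)
		snapshot += __atomic_load_n(
				&worker_stats[i].handled_packets,
				__ATOMIC_RELAXED);
	/* workers may have handled more packets by the time this
	 * value is compared against the expected count */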

As I wasn't convinced, I ran some tests with both the relaxed and
acquire/release modes, and they both failed :(
The ratio of failures caused by statistics errors to the number of test runs,
over 200000 runs, was:
for relaxed: 0.000790562
for acq/rel: 0.000091321


That's why I'm going to modify tests in such way, that they would:
1) clear statistics
2) launch worker threads
3) run test
4) wait for workers procedures to complete
5) check stats, verify results and print them out

This way the main core will use (clear or verify) the stats only when there
are no worker threads running. That will make things simpler and allow
focusing on testing the distributor rather than the tests themselves. And of
course relaxed mode will be enough! A rough sketch of this flow is shown
below.
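
(Sketch only: it reuses the existing quit flag and handle_work();
run_test_body() is a placeholder for the per-test packet sending, SKIP_MAIN
was called SKIP_MASTER before dpdk 20.11, and details such as flushing the
distributor so that blocked workers can return are omitted.)

	clear_packet_count();			/* 1) clear statistics */
	rte_eal_mp_remote_launch(handle_work,	/* 2) launch workers */
			wp, SKIP_MAIN);
	run_test_body(wp, p);			/* 3) run the test */
	quit = 1;				/* signal the workers */
	rte_eal_mp_wait_lcore();		/* 4) wait for workers */
	for (i = 0; i < rte_lcore_count() - 1; i++)	/* 5) check stats */
		printf("Worker %u handled %u packets\n", i,
			__atomic_load_n(&worker_stats[i].handled_packets,
					__ATOMIC_RELAXED));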


Best regards
Lukasz


W dniu 29.09.2020 o 07:49, Honnappa Nagarahalli pisze:
> <snip>
>
>> Statistics of handled packets are cleared and read on the main lcore,
>> while they are increased in the workers' handlers on different lcores.
>>
>> Without synchronization, this occasionally produced invalid values.
>> This patch uses atomic acquire/release mechanisms to synchronize.
> In general, load-acquire and store-release memory orderings are required while synchronizing data (that cannot be updated atomically) between threads. In this situation, making the counters atomic is enough.
>
>> Fixes: c3eabff124e6 ("distributor: add unit tests")
>> Cc: bruce.richardson at intel.com
>> Cc: stable at dpdk.org
>>
>> Signed-off-by: Lukasz Wojciechowski <l.wojciechow at partner.samsung.com>
>> Acked-by: David Hunt <david.hunt at intel.com>
>> ---
>>   app/test/test_distributor.c | 39 ++++++++++++++++++++++++-------------
>>   1 file changed, 26 insertions(+), 13 deletions(-)
>>
>> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
>> index 35b25463a..0e49e3714 100644
>> --- a/app/test/test_distributor.c
>> +++ b/app/test/test_distributor.c
>> @@ -43,7 +43,8 @@ total_packet_count(void)
>>   {
>>   	unsigned i, count = 0;
>>   	for (i = 0; i < worker_idx; i++)
>> -		count += worker_stats[i].handled_packets;
>> +		count += __atomic_load_n(&worker_stats[i].handled_packets,
>> +				__ATOMIC_ACQUIRE);
> RELAXED memory order is sufficient. For ex: the worker threads are not 'releasing' any data that is not atomically updated to the main thread.
>
>>   	return count;
>>   }
>>
>> @@ -52,6 +53,7 @@ static inline void
>>   clear_packet_count(void)
>>   {
>>   	memset(&worker_stats, 0, sizeof(worker_stats));
>> +	rte_atomic_thread_fence(__ATOMIC_RELEASE);
> Ideally, the counters should be set to 0 atomically rather than using a memset.
>
>>   }
>>
>>   /* this is the basic worker function for sanity test
>> @@ -72,13 +74,13 @@ handle_work(void *arg)
>>   	num = rte_distributor_get_pkt(db, id, buf, buf, num);
>>   	while (!quit) {
>>   		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> -				__ATOMIC_RELAXED);
>> +				__ATOMIC_ACQ_REL);
> Using the __ATOMIC_ACQ_REL order does not mean anything to the main thread. The main thread might still see the updates from different threads in different order.
>
>>   		count += num;
>>   		num = rte_distributor_get_pkt(db, id,
>>   				buf, buf, num);
>>   	}
>>   	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> -			__ATOMIC_RELAXED);
>> +			__ATOMIC_ACQ_REL);
> Same here, do not see why this change is required.
>
>>   	count += num;
>>   	rte_distributor_return_pkt(db, id, buf, num);
>>   	return 0;
>> @@ -134,7 +136,8 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>>
>>   	for (i = 0; i < rte_lcore_count() - 1; i++)
>>   		printf("Worker %u handled %u packets\n", i,
>> -				worker_stats[i].handled_packets);
>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>> +					__ATOMIC_ACQUIRE));
> __ATOMIC_RELAXED is enough.
>
>>   	printf("Sanity test with all zero hashes done.\n");
>>
>>   	/* pick two flows and check they go correctly */
>> @@ -159,7 +162,9 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>>
>>   		for (i = 0; i < rte_lcore_count() - 1; i++)
>>   			printf("Worker %u handled %u packets\n", i,
>> -					worker_stats[i].handled_packets);
>> +				__atomic_load_n(
>> +					&worker_stats[i].handled_packets,
>> +					__ATOMIC_ACQUIRE));
> __ATOMIC_RELAXED is enough
>
>>   		printf("Sanity test with two hash values done\n");
>>   	}
>>
>> @@ -185,7 +190,8 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>>
>>   	for (i = 0; i < rte_lcore_count() - 1; i++)
>>   		printf("Worker %u handled %u packets\n", i,
>> -				worker_stats[i].handled_packets);
>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>> +					__ATOMIC_ACQUIRE));
> __ATOMIC_RELAXED is enough
>
>>   	printf("Sanity test with non-zero hashes done\n");
>>
>>   	rte_mempool_put_bulk(p, (void *)bufs, BURST);
>> @@ -280,15 +286,17 @@ handle_work_with_free_mbufs(void *arg)
>>   		buf[i] = NULL;
>>   	num = rte_distributor_get_pkt(d, id, buf, buf, num);
>>   	while (!quit) {
>> -		worker_stats[id].handled_packets += num;
>>   		count += num;
>> +		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> +				__ATOMIC_ACQ_REL);
> IMO, the problem would be the non-atomic update of the statistics. So, __ATOMIC_RELAXED is enough
>
>>   		for (i = 0; i < num; i++)
>>   			rte_pktmbuf_free(buf[i]);
>>   		num = rte_distributor_get_pkt(d,
>>   				id, buf, buf, num);
>>   	}
>> -	worker_stats[id].handled_packets += num;
>>   	count += num;
>> +	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> +			__ATOMIC_ACQ_REL);
> Same here, the problem is non-atomic update of the statistics, __ATOMIC_RELAXED is enough.
> Similarly, for changes below, __ATOMIC_RELAXED is enough.
>
>>   	rte_distributor_return_pkt(d, id, buf, num);
>>   	return 0;
>>   }
>> @@ -363,8 +371,9 @@ handle_work_for_shutdown_test(void *arg)
>>   	/* wait for quit single globally, or for worker zero, wait
>>   	 * for zero_quit */
>>   	while (!quit && !(id == zero_id && zero_quit)) {
>> -		worker_stats[id].handled_packets += num;
>>   		count += num;
>> +		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> +				__ATOMIC_ACQ_REL);
>>   		for (i = 0; i < num; i++)
>>   			rte_pktmbuf_free(buf[i]);
>>   		num = rte_distributor_get_pkt(d,
>> @@ -379,10 +388,11 @@ handle_work_for_shutdown_test(void *arg)
>>
>>   		total += num;
>>   	}
>> -	worker_stats[id].handled_packets += num;
>>   	count += num;
>>   	returned = rte_distributor_return_pkt(d, id, buf, num);
>>
>> +	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> +			__ATOMIC_ACQ_REL);
>>   	if (id == zero_id) {
>>   		/* for worker zero, allow it to restart to pick up last packet
>>   		 * when all workers are shutting down.
>> @@ -394,10 +404,11 @@ handle_work_for_shutdown_test(void *arg)
>>   				id, buf, buf, num);
>>
>>   		while (!quit) {
>> -			worker_stats[id].handled_packets += num;
>>   			count += num;
>>   			rte_pktmbuf_free(pkt);
>>   			num = rte_distributor_get_pkt(d, id, buf, buf, num);
>> +			__atomic_fetch_add(&worker_stats[id].handled_packets,
>> +					num, __ATOMIC_ACQ_REL);
>>   		}
>>   		returned = rte_distributor_return_pkt(d,
>>   				id, buf, num);
>> @@ -461,7 +472,8 @@ sanity_test_with_worker_shutdown(struct worker_params *wp,
>>
>>   	for (i = 0; i < rte_lcore_count() - 1; i++)
>>   		printf("Worker %u handled %u packets\n", i,
>> -				worker_stats[i].handled_packets);
>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>> +					__ATOMIC_ACQUIRE));
>>
>>   	if (total_packet_count() != BURST * 2) {
>>   		printf("Line %d: Error, not all packets flushed. "
>> @@ -514,7 +526,8 @@ test_flush_with_worker_shutdown(struct worker_params *wp,
>>   	zero_quit = 0;
>>   	for (i = 0; i < rte_lcore_count() - 1; i++)
>>   		printf("Worker %u handled %u packets\n", i,
>> -				worker_stats[i].handled_packets);
>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>> +					__ATOMIC_ACQUIRE));
>>
>>   	if (total_packet_count() != BURST) {
>>   		printf("Line %d: Error, not all packets flushed. "
>> --
>> 2.17.1

-- 
Lukasz Wojciechowski
Principal Software Engineer

Samsung R&D Institute Poland
Samsung Electronics
Office +48 22 377 88 25
l.wojciechow at partner.samsung.com


