[dpdk-dev] [PATCH v4 2/8] test/distributor: synchronize lcores statistics

Lukasz Wojciechowski l.wojciechow at partner.samsung.com
Thu Oct 8 22:47:48 CEST 2020


Hi Honnappa,

I pushed v5 of the patches today.

However, the only fix in this patch is replacing the memset with a loop of
atomic operations. I haven't yet moved clearing and checking the statistics
out of the test (to make sure no worker is running).
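
To illustrate the memset replacement, here is a minimal sketch, not the v5 code itself. The struct and function names mirror the quoted diff; the array size NUM_WORKERS and the choice of __ATOMIC_RELAXED are assumptions made only for this example.

```c
#include <stdio.h>

#define NUM_WORKERS 4	/* hypothetical size, for illustration only */

struct worker_stats {
	unsigned int handled_packets;
};

static struct worker_stats worker_stats[NUM_WORKERS];

/* Clear each counter with its own atomic store instead of one memset(). */
static inline void
clear_packet_count(void)
{
	unsigned int i;

	for (i = 0; i < NUM_WORKERS; i++)
		__atomic_store_n(&worker_stats[i].handled_packets, 0,
				__ATOMIC_RELAXED);
}

/* Sum the counters, reading each one with an atomic load. */
static inline unsigned int
total_packet_count(void)
{
	unsigned int i, count = 0;

	for (i = 0; i < NUM_WORKERS; i++)
		count += __atomic_load_n(&worker_stats[i].handled_packets,
				__ATOMIC_RELAXED);
	return count;
}
```

Each counter is then read and written as a whole, so a reader never sees a torn value. It does not make the clear of all counters a single atomic step.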

After fixing a few more distributor issues, I could no longer catch even
a single failure using the ACQ/REL memory ordering.
I'll probably test it tomorrow with RELAXED to see whether that works too.
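
The planned restructuring (clear, launch, run, wait, check) can be sketched with plain pthreads standing in for lcores. This is only a sketch under stated assumptions: run_once(), worker(), NUM_WORKERS and PKTS_PER_WORKER are hypothetical names, and in the real test the launch and wait steps would presumably map to rte_eal_remote_launch() and rte_eal_mp_wait_lcore().

```c
#include <pthread.h>
#include <stdio.h>

#define NUM_WORKERS 4		/* hypothetical worker count */
#define PKTS_PER_WORKER 100000	/* hypothetical workload per worker */

static unsigned int handled_packets[NUM_WORKERS];

/* Worker: count "packets" one at a time with a relaxed atomic add. */
static void *
worker(void *arg)
{
	unsigned int id = *(unsigned int *)arg;
	unsigned int i;

	for (i = 0; i < PKTS_PER_WORKER; i++)
		__atomic_fetch_add(&handled_packets[id], 1, __ATOMIC_RELAXED);
	return NULL;
}

/* Returns the total seen by the main thread, or 0 if a launch failed. */
static unsigned int
run_once(void)
{
	pthread_t threads[NUM_WORKERS];
	unsigned int ids[NUM_WORKERS];
	unsigned int i, started = 0, total = 0;

	/* 1) clear statistics while no worker exists */
	for (i = 0; i < NUM_WORKERS; i++)
		__atomic_store_n(&handled_packets[i], 0, __ATOMIC_RELAXED);

	/* 2) launch workers; 3) the "test" runs inside them */
	for (i = 0; i < NUM_WORKERS; i++) {
		ids[i] = i;
		if (pthread_create(&threads[i], NULL, worker, &ids[i]) != 0)
			break;
		started++;
	}

	/* 4) wait for the workers; joining orders their writes before us */
	for (i = 0; i < started; i++)
		pthread_join(threads[i], NULL);
	if (started != NUM_WORKERS)
		return 0;

	/* 5) read the stats only now, when no worker is running */
	for (i = 0; i < NUM_WORKERS; i++)
		total += __atomic_load_n(&handled_packets[i], __ATOMIC_RELAXED);
	return total;
}
```

Because the main thread touches the counters only before the workers start and after they are joined, thread creation and join supply the ordering, and relaxed atomics on the counters should be sufficient in this arrangement.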

Best regards

Lukasz

On 02.10.2020 at 13:25, Lukasz Wojciechowski wrote:
> Hi Honnappa,
>
> Many thanks for the review!
>
> I'll write my answers here rather than inline, as I think they are easier
> to read in one place.
> First of all, I agree with you on two things:
> 1) all accesses to the statistics must be atomic, and the lack of that
> caused most of the problems
> 2) it would be better to replace the barrier and memset in
> clear_packet_count() with atomic stores, as you suggested
>
> So I will apply both of the above.
>
> However, I wasn't fully convinced about changing acquire/release to
> relaxed. It would be perfectly OK
> if it looked like this example from Herb Sutter:
> https://youtu.be/KeLBd2EJLOU?t=4170
> But in his case the counters are cleared before the worker threads start
> and are printed out after they have completed.
>
> In the DPDK distributor tests, both the worker and main cores are
> running at the same time. In the sanity_test, the statistics are cleared
> and verified several times for different packet hashes. The worker
> cores are not stopped during this time and they continue their loops in
> the handle procedure. The verification done on the main core is an
> exchange of data, as the current statistics determine the test result.
>
> As I wasn't convinced, I ran some tests with both relaxed and
> acquire/release modes, and both fail :(
> The ratio of failures caused by statistics errors to the number of test
> runs, over 200000 runs, was:
> for relaxed: 0.000790562
> for acq/rel: 0.000091321
>
>
> That's why I'm going to modify the tests so that they:
> 1) clear statistics
> 2) launch worker threads
> 3) run test
> 4) wait for workers procedures to complete
> 5) check stats, verify results and print them out
>
> This way the main core will use (clear or verify) the stats only when
> there are no worker threads running. This would make things simpler and
> allow us to focus on testing the distributor, not the tests. And of course
> relaxed mode would be enough!
>
>
> Best regards
> Lukasz
>
>
> On 29.09.2020 at 07:49, Honnappa Nagarahalli wrote:
>> <snip>
>>
>>> Statistics of handled packets are cleared and read on main lcore, while they
>>> are increased in workers handlers on different lcores.
>>>
>>> Without synchronization, they occasionally showed invalid values.
>>> This patch uses atomic acquire/release mechanisms to synchronize.
>> In general, load-acquire and store-release memory orderings are required when synchronizing data (that cannot be updated atomically) between threads. In this situation, making the counters atomic is enough.
>>
>>> Fixes: c3eabff124e6 ("distributor: add unit tests")
>>> Cc: bruce.richardson at intel.com
>>> Cc: stable at dpdk.org
>>>
>>> Signed-off-by: Lukasz Wojciechowski <l.wojciechow at partner.samsung.com>
>>> Acked-by: David Hunt <david.hunt at intel.com>
>>> ---
>>>    app/test/test_distributor.c | 39 ++++++++++++++++++++++++-------------
>>>    1 file changed, 26 insertions(+), 13 deletions(-)
>>>
>>> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
>>> index 35b25463a..0e49e3714 100644
>>> --- a/app/test/test_distributor.c
>>> +++ b/app/test/test_distributor.c
>>> @@ -43,7 +43,8 @@ total_packet_count(void)  {
>>>    	unsigned i, count = 0;
>>>    	for (i = 0; i < worker_idx; i++)
>>> -		count += worker_stats[i].handled_packets;
>>> +		count += __atomic_load_n(&worker_stats[i].handled_packets,
>>> +				__ATOMIC_ACQUIRE);
>> RELAXED memory order is sufficient. For example, the worker threads are not 'releasing' any non-atomically updated data to the main thread.
>>
>>>    	return count;
>>>    }
>>>
>>> @@ -52,6 +53,7 @@ static inline void
>>>    clear_packet_count(void)
>>>    {
>>>    	memset(&worker_stats, 0, sizeof(worker_stats));
>>> +	rte_atomic_thread_fence(__ATOMIC_RELEASE);
>> Ideally, the counters should be set to 0 atomically rather than using a memset.
>>
>>>    }
>>>
>>>    /* this is the basic worker function for sanity test
>>> @@ -72,13 +74,13 @@ handle_work(void *arg)
>>>    	num = rte_distributor_get_pkt(db, id, buf, buf, num);
>>>    	while (!quit) {
>>>    		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>> -				__ATOMIC_RELAXED);
>>> +				__ATOMIC_ACQ_REL);
>> Using the __ATOMIC_ACQ_REL order does not mean anything to the main thread. The main thread might still see the updates from different threads in a different order.
>>
>>>    		count += num;
>>>    		num = rte_distributor_get_pkt(db, id,
>>>    				buf, buf, num);
>>>    	}
>>>    	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>> -			__ATOMIC_RELAXED);
>>> +			__ATOMIC_ACQ_REL);
>> Same here, I do not see why this change is required.
>>
>>>    	count += num;
>>>    	rte_distributor_return_pkt(db, id, buf, num);
>>>    	return 0;
>>> @@ -134,7 +136,8 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>>>
>>>    	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>    		printf("Worker %u handled %u packets\n", i,
>>> -				worker_stats[i].handled_packets);
>>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>>> +					__ATOMIC_ACQUIRE));
>> __ATOMIC_RELAXED is enough.
>>
>>>    	printf("Sanity test with all zero hashes done.\n");
>>>
>>>    	/* pick two flows and check they go correctly */
>>> @@ -159,7 +162,9 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>>>
>>>    		for (i = 0; i < rte_lcore_count() - 1; i++)
>>>    			printf("Worker %u handled %u packets\n", i,
>>> -					worker_stats[i].handled_packets);
>>> +				__atomic_load_n(
>>> +					&worker_stats[i].handled_packets,
>>> +					__ATOMIC_ACQUIRE));
>> __ATOMIC_RELAXED is enough
>>
>>>    		printf("Sanity test with two hash values done\n");
>>>    	}
>>>
>>> @@ -185,7 +190,8 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>>>
>>>    	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>    		printf("Worker %u handled %u packets\n", i,
>>> -				worker_stats[i].handled_packets);
>>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>>> +					__ATOMIC_ACQUIRE));
>> __ATOMIC_RELAXED is enough
>>
>>>    	printf("Sanity test with non-zero hashes done\n");
>>>
>>>    	rte_mempool_put_bulk(p, (void *)bufs, BURST);
>>> @@ -280,15 +286,17 @@ handle_work_with_free_mbufs(void *arg)
>>>    		buf[i] = NULL;
>>>    	num = rte_distributor_get_pkt(d, id, buf, buf, num);
>>>    	while (!quit) {
>>> -		worker_stats[id].handled_packets += num;
>>>    		count += num;
>>> +		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>> +				__ATOMIC_ACQ_REL);
>> IMO, the problem would be the non-atomic update of the statistics. So, __ATOMIC_RELAXED is enough
>>
>>>    		for (i = 0; i < num; i++)
>>>    			rte_pktmbuf_free(buf[i]);
>>>    		num = rte_distributor_get_pkt(d,
>>>    				id, buf, buf, num);
>>>    	}
>>> -	worker_stats[id].handled_packets += num;
>>>    	count += num;
>>> +	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>> +			__ATOMIC_ACQ_REL);
>> Same here, the problem is non-atomic update of the statistics, __ATOMIC_RELAXED is enough.
>> Similarly, for changes below, __ATOMIC_RELAXED is enough.
>>
>>>    	rte_distributor_return_pkt(d, id, buf, num);
>>>    	return 0;
>>>    }
>>> @@ -363,8 +371,9 @@ handle_work_for_shutdown_test(void *arg)
>>>    	/* wait for quit single globally, or for worker zero, wait
>>>    	 * for zero_quit */
>>>    	while (!quit && !(id == zero_id && zero_quit)) {
>>> -		worker_stats[id].handled_packets += num;
>>>    		count += num;
>>> +		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>> +				__ATOMIC_ACQ_REL);
>>>    		for (i = 0; i < num; i++)
>>>    			rte_pktmbuf_free(buf[i]);
>>>    		num = rte_distributor_get_pkt(d,
>>> @@ -379,10 +388,11 @@ handle_work_for_shutdown_test(void *arg)
>>>
>>>    		total += num;
>>>    	}
>>> -	worker_stats[id].handled_packets += num;
>>>    	count += num;
>>>    	returned = rte_distributor_return_pkt(d, id, buf, num);
>>>
>>> +	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>> +			__ATOMIC_ACQ_REL);
>>>    	if (id == zero_id) {
>>>    		/* for worker zero, allow it to restart to pick up last packet
>>>    		 * when all workers are shutting down.
>>> @@ -394,10 +404,11 @@ handle_work_for_shutdown_test(void *arg)
>>>    				id, buf, buf, num);
>>>
>>>    		while (!quit) {
>>> -			worker_stats[id].handled_packets += num;
>>>    			count += num;
>>>    			rte_pktmbuf_free(pkt);
>>>    			num = rte_distributor_get_pkt(d, id, buf, buf, num);
>>> +			__atomic_fetch_add(&worker_stats[id].handled_packets,
>>> +					num, __ATOMIC_ACQ_REL);
>>>    		}
>>>    		returned = rte_distributor_return_pkt(d,
>>>    				id, buf, num);
>>> @@ -461,7 +472,8 @@ sanity_test_with_worker_shutdown(struct worker_params *wp,
>>>
>>>    	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>    		printf("Worker %u handled %u packets\n", i,
>>> -				worker_stats[i].handled_packets);
>>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>>> +					__ATOMIC_ACQUIRE));
>>>
>>>    	if (total_packet_count() != BURST * 2) {
>>>    		printf("Line %d: Error, not all packets flushed. "
>>> @@ -514,7 +526,8 @@ test_flush_with_worker_shutdown(struct worker_params *wp,
>>>    	zero_quit = 0;
>>>    	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>    		printf("Worker %u handled %u packets\n", i,
>>> -				worker_stats[i].handled_packets);
>>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>>> +					__ATOMIC_ACQUIRE));
>>>
>>>    	if (total_packet_count() != BURST) {
>>>    		printf("Line %d: Error, not all packets flushed. "
>>> --
>>> 2.17.1

-- 
Lukasz Wojciechowski
Principal Software Engineer

Samsung R&D Institute Poland
Samsung Electronics
Office +48 22 377 88 25
l.wojciechow at partner.samsung.com


