[dpdk-dev] [PATCH v4 2/8] test/distributor: synchronize lcores statistics

Lukasz Wojciechowski l.wojciechow at partner.samsung.com
Fri Oct 16 14:43:10 CEST 2020


Hi Honnappa,

Thank you for your answer.
In the current v7 version I followed your advice and used the RELAXED memory order.
It works without any issues. I guess that after fixing the other issues found since v4, the distributor works more stably.
I didn't have time to rearrange all the tests in the way I proposed, but I guess that if they work like this, it's not a top priority.
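
For anyone following the thread, here is a minimal standalone sketch of the two ideas discussed: counters that are only ever touched atomically with relaxed ordering, and a clear / launch / join / check sequence in which the main thread uses the stats only while no worker is running. It is not the code from the series. It assumes plain pthreads instead of lcores and the GCC/Clang __atomic builtins, and NUM_WORKERS, ITERATIONS, worker() and run_once() are hypothetical names made up for illustration (BURST here is just an arbitrary per-iteration increment).

```c
#include <assert.h>
#include <pthread.h>
#include <stdint.h>

#define NUM_WORKERS 4      /* hypothetical worker count */
#define ITERATIONS  100000 /* hypothetical loop count per worker */
#define BURST       3      /* hypothetical packets handled per iteration */

struct worker_stats {
	unsigned int handled_packets;
};
static struct worker_stats worker_stats[NUM_WORKERS];

/* Reset every counter with an atomic store instead of memset + fence. */
static void
clear_packet_count(void)
{
	unsigned int i;

	for (i = 0; i < NUM_WORKERS; i++)
		__atomic_store_n(&worker_stats[i].handled_packets, 0,
				__ATOMIC_RELAXED);
}

/* Sum all counters; each load is atomic, no extra ordering is requested. */
static unsigned int
total_packet_count(void)
{
	unsigned int i, count = 0;

	for (i = 0; i < NUM_WORKERS; i++)
		count += __atomic_load_n(&worker_stats[i].handled_packets,
				__ATOMIC_RELAXED);
	return count;
}

/* Stand-in for a worker handler: only the counter update is modelled. */
static void *
worker(void *arg)
{
	unsigned int id = (unsigned int)(uintptr_t)arg;
	unsigned int i;

	for (i = 0; i < ITERATIONS; i++)
		__atomic_fetch_add(&worker_stats[id].handled_packets, BURST,
				__ATOMIC_RELAXED);
	return NULL;
}

/* clear stats -> launch workers -> wait for them -> read stats */
static unsigned int
run_once(void)
{
	pthread_t threads[NUM_WORKERS];
	unsigned int i;
	int ret;

	clear_packet_count();
	for (i = 0; i < NUM_WORKERS; i++) {
		ret = pthread_create(&threads[i], NULL, worker,
				(void *)(uintptr_t)i);
		assert(ret == 0);
	}
	/* pthread_join() orders the workers' updates before the reads below */
	for (i = 0; i < NUM_WORKERS; i++) {
		ret = pthread_join(threads[i], NULL);
		assert(ret == 0);
	}
	return total_packet_count();
}
```

Calling run_once() from a small main() (built with -pthread) should return NUM_WORKERS * ITERATIONS * BURST on every call. The relaxed order is enough in this sketch because thread creation and join already provide the ordering between the main thread and the workers; the counters themselves only need to be updated atomically.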

Can you give an ack on the series? I believe David Marchand is waiting for your opinion to process it.

Best regards
Lukasz

On 16.10.2020 at 07:43, Honnappa Nagarahalli wrote:
> <snip>
>
>> Hi Honnappa,
>>
>> Many thanks for the review!
>>
>> I'll write my answers here rather than inline, as I think it is easier to
>> read them in one place.
>> So first of all, I agree with you on two things:
>> 1) all uses of the statistics must be atomic, and the lack of that caused
>> most of the problems
>> 2) it would be better to replace the barrier and memset in
>> clear_packet_count() with atomic stores, as you suggested
>>
>> So I will apply both of the above.
>>
>> However, I wasn't fully convinced about changing acquire/release to relaxed.
>> It would be perfectly OK if it looked like this example from Herb Sutter:
>> https://youtu.be/KeLBd2EJLOU?t=4170
>> But in his case the counters are cleared before the worker threads start and
>> are printed out after they have completed.
>>
>> In the case of the DPDK distributor tests, both the worker and main cores are
>> running at the same time. In sanity_test, the statistics are cleared and
>> verified a few times for different packet hashes. The worker cores are not
>> stopped during this time and they continue their loops in the handle procedure.
>> The verification done on the main core is an exchange of data, as the current
>> statistics determine the test result.
> Agree. The key point we have to note is that the data that is exchanged between the two threads is already atomic (handled_packets is atomic).
>
>> So as I wasn't convinced, I ran some tests with both relaxed and
>> acquire/release modes, and they both fail :( The ratio of failures caused by
>> statistics errors to the number of tests, over 200000 tests, was:
>> for relaxed: 0.000790562
>> for acq/rel: 0.000091321
>>
>>
>> That's why I'm going to modify the tests so that they:
>> 1) clear statistics
>> 2) launch worker threads
>> 3) run test
>> 4) wait for the worker procedures to complete
>> 5) check stats, verify results and print them out
>>
>> This way the main core will use (clear or verify) the stats only when there are
>> no worker threads running. This would make things simpler and allow us to focus
>> on testing the distributor, not the tests. And of course relaxed mode would be
>> enough!
> Agree, this would be the only way to ensure that the main thread sees the correct statistics (just like in the video)
>
>>
>> Best regards
>> Lukasz
>>
>>
>> On 29.09.2020 at 07:49, Honnappa Nagarahalli wrote:
>>> <snip>
>>>
>>>> Statistics of handled packets are cleared and read on main lcore,
>>>> while they are increased in workers handlers on different lcores.
>>>>
>>>> Without synchronization, they occasionally showed invalid values.
>>>> This patch uses atomic acquire/release mechanisms to synchronize.
>>> In general, load-acquire and store-release memory orderings are required
>>> while synchronizing data (that cannot be updated atomically) between
>>> threads. In this situation, making the counters atomic is enough.
>>>> Fixes: c3eabff124e6 ("distributor: add unit tests")
>>>> Cc: bruce.richardson at intel.com
>>>> Cc: stable at dpdk.org
>>>>
>>>> Signed-off-by: Lukasz Wojciechowski
>>>> <l.wojciechow at partner.samsung.com>
>>>> Acked-by: David Hunt <david.hunt at intel.com>
>>>> ---
>>>>    app/test/test_distributor.c | 39 ++++++++++++++++++++++++-------------
>>>>    1 file changed, 26 insertions(+), 13 deletions(-)
>>>>
>>>> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
>>>> index 35b25463a..0e49e3714 100644
>>>> --- a/app/test/test_distributor.c
>>>> +++ b/app/test/test_distributor.c
>>>> @@ -43,7 +43,8 @@ total_packet_count(void)
>>>>    {
>>>>    	unsigned i, count = 0;
>>>>    	for (i = 0; i < worker_idx; i++)
>>>> -		count += worker_stats[i].handled_packets;
>>>> +		count += __atomic_load_n(&worker_stats[i].handled_packets,
>>>> +				__ATOMIC_ACQUIRE);
>>> RELAXED memory order is sufficient. For example, the worker threads are not
>>> 'releasing' any data that is not atomically updated to the main thread.
>>>>    	return count;
>>>>    }
>>>>
>>>> @@ -52,6 +53,7 @@ static inline void
>>>>    clear_packet_count(void)
>>>>    {
>>>>    	memset(&worker_stats, 0, sizeof(worker_stats));
>>>> +	rte_atomic_thread_fence(__ATOMIC_RELEASE);
>>> Ideally, the counters should be set to 0 atomically rather than using a
>>> memset.
>>>>    }
>>>>
>>>>    /* this is the basic worker function for sanity test
>>>> @@ -72,13 +74,13 @@ handle_work(void *arg)
>>>>    	num = rte_distributor_get_pkt(db, id, buf, buf, num);
>>>>    	while (!quit) {
>>>>    		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>>> -				__ATOMIC_RELAXED);
>>>> +				__ATOMIC_ACQ_REL);
>>> Using the __ATOMIC_ACQ_REL order does not mean anything to the main
>>> thread. The main thread might still see the updates from different threads
>>> in a different order.
>>>>    		count += num;
>>>>    		num = rte_distributor_get_pkt(db, id,
>>>>    				buf, buf, num);
>>>>    	}
>>>>    	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>>> -			__ATOMIC_RELAXED);
>>>> +			__ATOMIC_ACQ_REL);
>>> Same here, do not see why this change is required.
>>>
>>>>    	count += num;
>>>>    	rte_distributor_return_pkt(db, id, buf, num);
>>>>    	return 0;
>>>> @@ -134,7 +136,8 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>>>>
>>>>    	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>>    		printf("Worker %u handled %u packets\n", i,
>>>> -				worker_stats[i].handled_packets);
>>>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>>>> +					__ATOMIC_ACQUIRE));
>>> __ATOMIC_RELAXED is enough.
>>>
>>>>    	printf("Sanity test with all zero hashes done.\n");
>>>>
>>>>    	/* pick two flows and check they go correctly */
>>>> @@ -159,7 +162,9 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>>>>
>>>>    		for (i = 0; i < rte_lcore_count() - 1; i++)
>>>>    			printf("Worker %u handled %u packets\n", i,
>>>> -					worker_stats[i].handled_packets);
>>>> +				__atomic_load_n(
>>>> +					&worker_stats[i].handled_packets,
>>>> +					__ATOMIC_ACQUIRE));
>>> __ATOMIC_RELAXED is enough
>>>
>>>>    		printf("Sanity test with two hash values done\n");
>>>>    	}
>>>>
>>>> @@ -185,7 +190,8 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>>>>
>>>>    	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>>    		printf("Worker %u handled %u packets\n", i,
>>>> -				worker_stats[i].handled_packets);
>>>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>>>> +					__ATOMIC_ACQUIRE));
>>> __ATOMIC_RELAXED is enough
>>>
>>>>    	printf("Sanity test with non-zero hashes done\n");
>>>>
>>>>    	rte_mempool_put_bulk(p, (void *)bufs, BURST);
>>>> @@ -280,15 +286,17 @@ handle_work_with_free_mbufs(void *arg)
>>>>    		buf[i] = NULL;
>>>>    	num = rte_distributor_get_pkt(d, id, buf, buf, num);
>>>>    	while (!quit) {
>>>> -		worker_stats[id].handled_packets += num;
>>>>    		count += num;
>>>> +		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>>> +				__ATOMIC_ACQ_REL);
>>> IMO, the problem would be the non-atomic update of the statistics. So,
>>> __ATOMIC_RELAXED is enough
>>>
>>>>    		for (i = 0; i < num; i++)
>>>>    			rte_pktmbuf_free(buf[i]);
>>>>    		num = rte_distributor_get_pkt(d,
>>>>    				id, buf, buf, num);
>>>>    	}
>>>> -	worker_stats[id].handled_packets += num;
>>>>    	count += num;
>>>> +	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>>> +			__ATOMIC_ACQ_REL);
>>> Same here, the problem is non-atomic update of the statistics,
>>> __ATOMIC_RELAXED is enough.
>>> Similarly, for changes below, __ATOMIC_RELAXED is enough.
>>>
>>>>    	rte_distributor_return_pkt(d, id, buf, num);
>>>>    	return 0;
>>>>    }
>>>> @@ -363,8 +371,9 @@ handle_work_for_shutdown_test(void *arg)
>>>>    	/* wait for quit single globally, or for worker zero, wait
>>>>    	 * for zero_quit */
>>>>    	while (!quit && !(id == zero_id && zero_quit)) {
>>>> -		worker_stats[id].handled_packets += num;
>>>>    		count += num;
>>>> +		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>>> +				__ATOMIC_ACQ_REL);
>>>>    		for (i = 0; i < num; i++)
>>>>    			rte_pktmbuf_free(buf[i]);
>>>>    		num = rte_distributor_get_pkt(d,
>>>> @@ -379,10 +388,11 @@ handle_work_for_shutdown_test(void *arg)
>>>>
>>>>    		total += num;
>>>>    	}
>>>> -	worker_stats[id].handled_packets += num;
>>>>    	count += num;
>>>>    	returned = rte_distributor_return_pkt(d, id, buf, num);
>>>>
>>>> +	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>>> +			__ATOMIC_ACQ_REL);
>>>>    	if (id == zero_id) {
>>>>    		/* for worker zero, allow it to restart to pick up last packet
>>>>    		 * when all workers are shutting down.
>>>> @@ -394,10 +404,11 @@ handle_work_for_shutdown_test(void *arg)
>>>>    				id, buf, buf, num);
>>>>
>>>>    		while (!quit) {
>>>> -			worker_stats[id].handled_packets += num;
>>>>    			count += num;
>>>>    			rte_pktmbuf_free(pkt);
>>>>    			num = rte_distributor_get_pkt(d, id, buf, buf, num);
>>>> +			__atomic_fetch_add(&worker_stats[id].handled_packets,
>>>> +					num, __ATOMIC_ACQ_REL);
>>>>    		}
>>>>    		returned = rte_distributor_return_pkt(d,
>>>>    				id, buf, num);
>>>> @@ -461,7 +472,8 @@ sanity_test_with_worker_shutdown(struct worker_params *wp,
>>>>
>>>>    	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>>    		printf("Worker %u handled %u packets\n", i,
>>>> -				worker_stats[i].handled_packets);
>>>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>>>> +					__ATOMIC_ACQUIRE));
>>>>
>>>>    	if (total_packet_count() != BURST * 2) {
>>>>    		printf("Line %d: Error, not all packets flushed. "
>>>> @@ -514,7 +526,8 @@ test_flush_with_worker_shutdown(struct worker_params *wp,
>>>>    	zero_quit = 0;
>>>>    	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>>    		printf("Worker %u handled %u packets\n", i,
>>>> -				worker_stats[i].handled_packets);
>>>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>>>> +					__ATOMIC_ACQUIRE));
>>>>
>>>>    	if (total_packet_count() != BURST) {
>>>>    		printf("Line %d: Error, not all packets flushed. "
>>>> --
>>>> 2.17.1

-- 
Lukasz Wojciechowski
Principal Software Engineer

Samsung R&D Institute Poland
Samsung Electronics
Office +48 22 377 88 25
l.wojciechow at partner.samsung.com


