[dpdk-dev] [PATCH v2 1/5] lib: distributor performance enhancements

Jerin Jacob jerin.jacob at caviumnetworks.com
Thu Dec 22 13:47:54 CET 2016


On Thu, Dec 22, 2016 at 04:37:04AM +0000, David Hunt wrote:
> Now sends bursts of up to 8 mbufs to each worker, and tracks
> the in-flight flow-ids (atomic scheduling)
> 
> New file with a new api, similar to the old API except with _burst
> at the end of the function names
> 
> Signed-off-by: David Hunt <david.hunt at intel.com>
> +
> +int
> +rte_distributor_get_pkt_burst(struct rte_distributor_burst *d,
> +		unsigned int worker_id, struct rte_mbuf **pkts,
> +		struct rte_mbuf **oldpkt, unsigned int return_count)
> +{
> +	unsigned int count;
> +	uint64_t retries = 0;
> +
> +	rte_distributor_request_pkt_burst(d, worker_id, oldpkt, return_count);
> +
> +	count = rte_distributor_poll_pkt_burst(d, worker_id, pkts);
> +	while (count == 0) {
> +		rte_pause();
> +		retries++;
> +		if (retries > 1000) {
> +			retries = 0;

This write to retries has no effect, since it comes immediately before the
return.

> +			return 0;
> +		}
> +		uint64_t t = __rdtsc()+100;

Use the rte_ version of __rdtsc, i.e. rte_rdtsc().

> +
> +		while (__rdtsc() < t)
> +			rte_pause();
> +
> +		count = rte_distributor_poll_pkt_burst(d, worker_id, pkts);
> +	}
> +	return count;
> +}
> +
> +int
> +rte_distributor_return_pkt_burst(struct rte_distributor_burst *d,
> +		unsigned int worker_id, struct rte_mbuf **oldpkt, int num)
> +{
> +	struct rte_distributor_buffer_burst *buf = &d->bufs[worker_id];
> +	unsigned int i;
> +
> +	for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
> +		/* Switch off the return bit first */
> +		buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
> +
> +	for (i = num; i-- > 0; )
> +		buf->retptr64[i] = (((int64_t)(uintptr_t)oldpkt[i]) <<
> +			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;
> +
> +	/* set the GET_BUF but even if we got no returns */
> +	buf->retptr64[0] |= RTE_DISTRIB_GET_BUF;
> +
> +	return 0;
> +}
> +
> +#if RTE_MACHINE_CPUFLAG_SSE2
> +static inline void

Move the SSE version of the code to a separate file so that other SIMD
arch-specific versions, such as NEON, can be incorporated later.

> +find_match_sse2(struct rte_distributor_burst *d,
> +			uint16_t *data_ptr,
> +			uint16_t *output_ptr)
> +{
> +	/* Setup */
> +	__m128i incoming_fids;
> +	__m128i inflight_fids;
> +	__m128i preflight_fids;
> +	__m128i wkr;
> +	__m128i mask1;
> +	__m128i mask2;
> +	__m128i output;
> +	struct rte_distributor_backlog *bl;
> +
> +	/*
> +	 * Function overview:
> +	 * 2. Loop through all worker ID's
> +	 *  2a. Load the current inflights for that worker into an xmm reg
> +	 *  2b. Load the current backlog for that worker into an xmm reg
> +	 *  2c. use cmpestrm to intersect flow_ids with backlog and inflights
> +	 *  2d. Add any matches to the output
> +	 * 3. Write the output xmm (matching worker ids).
> +	 */
> +
> +
> +	output = _mm_set1_epi16(0);
> +	incoming_fids = _mm_load_si128((__m128i *)data_ptr);
> +
> +	for (uint16_t i = 0; i < d->num_workers; i++) {
> +		bl = &d->backlog[i];
> +
> +		inflight_fids =
> +			_mm_load_si128((__m128i *)&(d->in_flight_tags[i]));
> +		preflight_fids =
> +			_mm_load_si128((__m128i *)(bl->tags));
> +
> +		/*
> +		 * Any incoming_fid that exists anywhere in inflight_fids will
> +		 * have 0xffff in same position of the mask as the incoming fid
> +		 * Example (shortened to bytes for brevity):
> +		 * incoming_fids   0x01 0x02 0x03 0x04 0x05 0x06 0x07 0x08
> +		 * inflight_fids   0x03 0x05 0x07 0x00 0x00 0x00 0x00 0x00
> +		 * mask            0x00 0x00 0xff 0x00 0xff 0x00 0xff 0x00
> +		 */
> +
> +		mask1 = _mm_cmpestrm(inflight_fids, 8, incoming_fids, 8,
> +			_SIDD_UWORD_OPS |
> +			_SIDD_CMP_EQUAL_ANY |
> +			_SIDD_UNIT_MASK);
> +		mask2 = _mm_cmpestrm(preflight_fids, 8, incoming_fids, 8,
> +			_SIDD_UWORD_OPS |
> +			_SIDD_CMP_EQUAL_ANY |
> +			_SIDD_UNIT_MASK);
> +
> +		mask1 = _mm_or_si128(mask1, mask2);
> +		/*
> +		 * Now mask contains 0xffff where there's a match.
> +		 * Next we need to store the worker_id in the relevant position
> +		 * in the output.
> +		 */
> +
> +		wkr = _mm_set1_epi16(i+1);
> +		mask1 = _mm_and_si128(mask1, wkr);
> +		output = _mm_or_si128(mask1, output);
> +	}
> +
> +/* process a set of packets to distribute them to workers */
> +int
> +rte_distributor_process_burst(struct rte_distributor_burst *d,
> +		struct rte_mbuf **mbufs, unsigned int num_mbufs)
> +{
> +	unsigned int next_idx = 0;
> +	static unsigned int wkr;
> +	struct rte_mbuf *next_mb = NULL;
> +	int64_t next_value = 0;
> +	uint16_t new_tag = 0;
> +	uint16_t flows[8] __rte_cache_aligned;

The constant 8 is also used further down in the function. Please replace it with a macro.

> +	//static int iter=0;

Please remove the test code commented out with // throughout the patch.

> +
> +	if (unlikely(num_mbufs == 0)) {
> +		/* Flush out all non-full cache-lines to workers. */
> +		for (unsigned int wid = 0 ; wid < d->num_workers; wid++) {
> +			if ((d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF)) {
> +				release(d, wid);
> +				handle_returns(d, wid);
> +			}
> +		}
> +		return 0;
> +	}
> +
> +	while (next_idx < num_mbufs) {
> +		uint16_t matches[8];
> +		int pkts;
> +
> +		if (d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF)
> +			d->bufs[wkr].count = 0;
> +
> +		for (unsigned int i = 0; i < RTE_DIST_BURST_SIZE; i++) {
> +			if (mbufs[next_idx + i]) {
> +				/* flows have to be non-zero */
> +				flows[i] = mbufs[next_idx + i]->hash.usr | 1;
> +			} else
> +				flows[i] = 0;
> +		}
> +
> +		switch (d->dist_match_fn) {
> +#ifdef RTE_MACHINE_CPUFLAG_SSE2

Is this conditional compilation flag really required? i.e.
RTE_DIST_MATCH_SSE will not be enabled in the non-SSE case.

> +		case RTE_DIST_MATCH_SSE:
> +			find_match_sse2(d, &flows[0], &matches[0]);
> +			break;
> +#endif
> +		default:
> +			find_match_scalar(d, &flows[0], &matches[0]);
> +		}
> +
> +		/*
> +		 * Matches array now contain the intended worker ID (+1) of
> +		 * the incoming packets. Any zeroes need to be assigned
> +		 * workers.
> +		 */
> +
> +		if ((num_mbufs - next_idx) < RTE_DIST_BURST_SIZE)
> +			pkts = num_mbufs - next_idx;
> +		else
> +			pkts = RTE_DIST_BURST_SIZE;
> +
> +		for (int j = 0; j < pkts; j++) {
> +
> +			next_mb = mbufs[next_idx++];
> +			next_value = (((int64_t)(uintptr_t)next_mb) <<
> +					RTE_DISTRIB_FLAG_BITS);
> +			/*
> +			 * User is advocated to set tag vaue for each
> +			 * mbuf before calling rte_distributor_process.
> +			 * User defined tags are used to identify flows,
> +			 * or sessions.
> +			 */
> +			/* flows MUST be non-zero */
> +			new_tag = (uint16_t)(next_mb->hash.usr) | 1;
> +
> +			/*
> +			 * Using the next line will cause the find_match
> +			 * function to be optimised out, making this function
> +			 * do parallel (non-atomic) distribution
> +			 */
> +			//matches[j] = 0;

Test code commented out with // here as well.



More information about the dev mailing list