[dpdk-dev] [PATCH v2 3/6] lib/lpm: integrate RCU QSBR

Medvedkin, Vladimir vladimir.medvedkin at intel.com
Wed Sep 18 18:15:55 CEST 2019


Hi Ruifeng,

Thanks for this patch series, see my comments below.

On 06/09/2019 10:45, Ruifeng Wang wrote:
> Currently, the tbl8 group is freed even though the readers might be
> using the tbl8 group entries. The freed tbl8 group can be reallocated
> quickly. This results in incorrect lookup results.
>
> RCU QSBR process is integrated for safe tbl8 group reclaim.
> Refer to the RCU documentation to understand various aspects of
> integrating the RCU library into other libraries.
>
> Signed-off-by: Ruifeng Wang <ruifeng.wang at arm.com>
> Reviewed-by: Honnappa Nagarahalli <honnappa.nagarahalli at arm.com>
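Some context for other readers, not a change request: the control-plane
setup with this API would look roughly like the sketch below. This is my
own illustration (placeholder names, error handling omitted), built on
the existing rte_rcu_qsbr API plus the rte_lpm_rcu_qsbr_add() added in
this patch.

    /* Create a QSBR variable sized for all lcores (illustration). */
    size_t sz = rte_rcu_qsbr_get_memsize(RTE_MAX_LCORE);
    struct rte_rcu_qsbr *qsv = rte_zmalloc("LPM_RCU", sz,
                                           RTE_CACHE_LINE_SIZE);
    rte_rcu_qsbr_init(qsv, RTE_MAX_LCORE);

    /* Attach it to an existing LPM object; freed tbl8 groups will
     * then be held in the FIFO until readers quiesce.
     */
    rte_lpm_rcu_qsbr_add(lpm, qsv);
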
> ---
>   lib/librte_lpm/Makefile            |   3 +-
>   lib/librte_lpm/meson.build         |   2 +
>   lib/librte_lpm/rte_lpm.c           | 223 +++++++++++++++++++++++++++--
>   lib/librte_lpm/rte_lpm.h           |  22 +++
>   lib/librte_lpm/rte_lpm_version.map |   6 +
>   lib/meson.build                    |   3 +-
>   6 files changed, 244 insertions(+), 15 deletions(-)
>
> diff --git a/lib/librte_lpm/Makefile b/lib/librte_lpm/Makefile
> index a7946a1c5..ca9e16312 100644
> --- a/lib/librte_lpm/Makefile
> +++ b/lib/librte_lpm/Makefile
> @@ -6,9 +6,10 @@ include $(RTE_SDK)/mk/rte.vars.mk
>   # library name
>   LIB = librte_lpm.a
>   
> +CFLAGS += -DALLOW_EXPERIMENTAL_API
>   CFLAGS += -O3
>   CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
> -LDLIBS += -lrte_eal -lrte_hash
> +LDLIBS += -lrte_eal -lrte_hash -lrte_rcu
>   
>   EXPORT_MAP := rte_lpm_version.map
>   
> diff --git a/lib/librte_lpm/meson.build b/lib/librte_lpm/meson.build
> index a5176d8ae..19a35107f 100644
> --- a/lib/librte_lpm/meson.build
> +++ b/lib/librte_lpm/meson.build
> @@ -2,9 +2,11 @@
>   # Copyright(c) 2017 Intel Corporation
>   
>   version = 2
> +allow_experimental_apis = true
>   sources = files('rte_lpm.c', 'rte_lpm6.c')
>   headers = files('rte_lpm.h', 'rte_lpm6.h')
>   # since header files have different names, we can install all vector headers
>   # without worrying about which architecture we actually need
>   headers += files('rte_lpm_altivec.h', 'rte_lpm_neon.h', 'rte_lpm_sse.h')
>   deps += ['hash']
> +deps += ['rcu']
> diff --git a/lib/librte_lpm/rte_lpm.c b/lib/librte_lpm/rte_lpm.c
> index 3a929a1b1..9764b8de6 100644
> --- a/lib/librte_lpm/rte_lpm.c
> +++ b/lib/librte_lpm/rte_lpm.c
> @@ -1,5 +1,6 @@
>   /* SPDX-License-Identifier: BSD-3-Clause
>    * Copyright(c) 2010-2014 Intel Corporation
> + * Copyright(c) 2019 Arm Limited
>    */
>   
>   #include <string.h>
> @@ -22,6 +23,7 @@
>   #include <rte_rwlock.h>
>   #include <rte_spinlock.h>
>   #include <rte_tailq.h>
> +#include <rte_ring.h>
>   
>   #include "rte_lpm.h"
>   
> @@ -39,6 +41,11 @@ enum valid_flag {
>   	VALID
>   };
>   
> +struct __rte_lpm_qs_item {
> +	uint64_t token;	/**< QSBR token.*/
> +	uint32_t index;	/**< tbl8 group index.*/
> +};
> +
>   /* Macro to enable/disable run-time checks. */
>   #if defined(RTE_LIBRTE_LPM_DEBUG)
>   #include <rte_debug.h>
> @@ -381,6 +388,7 @@ rte_lpm_free_v1604(struct rte_lpm *lpm)
>   
>   	rte_mcfg_tailq_write_unlock();
>   
> +	rte_ring_free(lpm->qs_fifo);
>   	rte_free(lpm->tbl8);
>   	rte_free(lpm->rules_tbl);
>   	rte_free(lpm);
> @@ -390,6 +398,147 @@ BIND_DEFAULT_SYMBOL(rte_lpm_free, _v1604, 16.04);
>   MAP_STATIC_SYMBOL(void rte_lpm_free(struct rte_lpm *lpm),
>   		rte_lpm_free_v1604);
>   
> +/* Add an item into FIFO.
> + * return: 0 - success
> + */
> +static int
> +__rte_lpm_rcu_qsbr_fifo_push(struct rte_ring *fifo,
> +	struct __rte_lpm_qs_item *item)
> +{
> +	if (rte_ring_free_count(fifo) < 2) {
> +		RTE_LOG(ERR, LPM, "QS FIFO full\n");
> +		rte_errno = ENOSPC;
> +		return 1;
> +	}
> +
> +	(void)rte_ring_sp_enqueue(fifo, (void *)(uintptr_t)item->token);
> +	(void)rte_ring_sp_enqueue(fifo, (void *)(uintptr_t)item->index);
> +
> +	return 0;
> +}
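To spell out the FIFO layout for other reviewers: each reclaim item
occupies two consecutive ring slots, token first and then index, which
is why the guard above requires at least two free slots:

    /* qs_fifo slot layout, one logical item per two slots:
     *   ... | token_k | index_k | token_k+1 | index_k+1 | ...
     */
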
> +
> +/* Remove item from FIFO.
> + * Used after data has been observed by rte_ring_peek.
> + */
> +static void
> +__rte_lpm_rcu_qsbr_fifo_pop(struct rte_ring *fifo,
> +	struct __rte_lpm_qs_item *item)
Is it necessary to pass a pointer to struct __rte_lpm_qs_item here?
According to the code, only item.index is used after this call.
> +{
> +	void *obj_token = NULL;
> +	void *obj_index = NULL;
> +
> +	(void)rte_ring_sc_dequeue(fifo, &obj_token);
I think the (void) cast is not necessary here.
> +	(void)rte_ring_sc_dequeue(fifo, &obj_index);
> +
> +	if (item) {
I think this check is redundant; item is never NULL here.
> +		item->token = (uint64_t)((uintptr_t)obj_token);
> +		item->index = (uint32_t)((uintptr_t)obj_index);
> +	}
> +}
> +
> +/* Max number of tbl8 groups to reclaim at one time. */
> +#define RCU_QSBR_RECLAIM_SIZE	8
> +
> +/* When RCU QSBR FIFO usage is above 1/(2^RCU_QSBR_RECLAIM_LEVEL),
> + * reclaim will be triggered by tbl8_free.
> + */
> +#define RCU_QSBR_RECLAIM_LEVEL	3
> +
> +/* Reclaim some tbl8 groups based on quiescent state check.
> + * RCU_QSBR_RECLAIM_SIZE groups will be reclaimed at max.
> + * Params: lpm   - lpm object handle
> + *         index - (output) one of successfully reclaimed tbl8 groups
> + * return: 0 - success, 1 - no group reclaimed.
> + */
> +static uint32_t
> +__rte_lpm_rcu_qsbr_reclaim_chunk(struct rte_lpm *lpm, uint32_t *index)
> +{
> +	struct __rte_lpm_qs_item qs_item;
> +	struct rte_lpm_tbl_entry *tbl8_entry = NULL;
It is not necessary to initialize it to NULL.
> +	void *obj_token;
> +	uint32_t cnt = 0;
> +
> +	RTE_LOG(DEBUG, LPM, "RCU QSBR reclamation triggered.\n");
> +	/* Check reader threads' quiescent state and
> +	 * reclaim as many tbl8 groups as possible.
> +	 */
> +	while ((cnt < RCU_QSBR_RECLAIM_SIZE) &&
> +		(rte_ring_peek(lpm->qs_fifo, &obj_token) == 0) &&
> +		(rte_rcu_qsbr_check(lpm->qsv, (uint64_t)((uintptr_t)obj_token),
> +					false) == 1)) {
> +		__rte_lpm_rcu_qsbr_fifo_pop(lpm->qs_fifo, &qs_item);
> +
> +		tbl8_entry = &lpm->tbl8[qs_item.index *
> +					RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
> +		memset(&tbl8_entry[0], 0,
> +				RTE_LPM_TBL8_GROUP_NUM_ENTRIES *
> +				sizeof(tbl8_entry[0]));
> +		cnt++;
> +	}
> +
> +	RTE_LOG(DEBUG, LPM, "RCU QSBR reclaimed %u groups.\n", cnt);
> +	if (cnt) {
> +		if (index)
> +			*index = qs_item.index;
> +		return 0;
> +	}
> +	return 1;
> +}
> +
> +/* Trigger tbl8 group reclaim when necessary.
> + * Reclaim happens when RCU QSBR queue usage
> + * is over 1/(2^RCU_QSBR_RECLAIM_LEVEL).
> + */
> +static void
> +__rte_lpm_rcu_qsbr_try_reclaim(struct rte_lpm *lpm)
> +{
> +	if (lpm->qsv == NULL)
> +		return;
This check is redundant; the only caller (tbl8_free_v1604) already
checks lpm->qsv.
> +
> +	if (rte_ring_count(lpm->qs_fifo) <
> +		(rte_ring_get_capacity(lpm->qs_fifo) >> RCU_QSBR_RECLAIM_LEVEL))
> +		return;
> +
> +	(void)__rte_lpm_rcu_qsbr_reclaim_chunk(lpm, NULL);
> +}
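To make the trigger condition concrete (my own arithmetic, using the
FIFO sizing done in rte_lpm_rcu_qsbr_add() below): with, say,
number_tbl8s = 256 the ring is created with size
rte_align32pow2(2 * 256 + 1) = 1024, whose usable capacity is 1023. With
RCU_QSBR_RECLAIM_LEVEL = 3, a reclaim pass is therefore attempted once
rte_ring_count() reaches 1023 >> 3 = 127 occupied slots, i.e. after 64
tbl8 groups have been queued (two slots per group).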
> +
> +/* Associate QSBR variable with an LPM object.
> + */
> +int
> +rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_rcu_qsbr *v)
> +{
> +	uint32_t qs_fifo_size;
> +	char rcu_ring_name[RTE_RING_NAMESIZE];
> +
> +	if ((lpm == NULL) || (v == NULL)) {
> +		rte_errno = EINVAL;
> +		return 1;
> +	}
> +
> +	if (lpm->qsv) {
> +		rte_errno = EEXIST;
> +		return 1;
> +	}
> +
> +	/* round up qs_fifo_size to next power of two that is not less than
> +	 * number_tbl8s. Will store 'token' and 'index'.
> +	 */
> +	qs_fifo_size = rte_align32pow2((2 * lpm->number_tbl8s) + 1);
> +
> +	/* Init QSBR reclaiming FIFO. */
> +	snprintf(rcu_ring_name, sizeof(rcu_ring_name), "LPM_RCU_%s", lpm->name);
> +	lpm->qs_fifo = rte_ring_create(rcu_ring_name, qs_fifo_size,
> +					SOCKET_ID_ANY, 0);
> +	if (lpm->qs_fifo == NULL) {
> +		RTE_LOG(ERR, LPM, "LPM QS FIFO memory allocation failed\n");
> +		rte_errno = ENOMEM;
rte_ring_create() sets rte_errno on error; I don't think we need to
overwrite it here.
> +		return 1;
> +	}
> +	lpm->qsv = v;
> +
> +	return 0;
> +}
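On the sizing itself, for other readers: a ring of size n holds at most
n - 1 entries and each queued group takes two slots, so up to
2 * number_tbl8s slots are needed; the + 1 before rounding up to a power
of two guarantees the capacity is sufficient. E.g. number_tbl8s = 256
gives rte_align32pow2(513) = 1024, with capacity 1023 >= 512.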
> +
>   /*
>    * Adds a rule to the rule table.
>    *
> @@ -640,6 +789,35 @@ rule_find_v1604(struct rte_lpm *lpm, uint32_t ip_masked, uint8_t depth)
>   	return -EINVAL;
>   }
>   
> +static int32_t
> +tbl8_alloc_reclaimed(struct rte_lpm *lpm)
> +{
> +	struct rte_lpm_tbl_entry *tbl8_entry = NULL;
> +	uint32_t index;
> +
> +	if (lpm->qsv != NULL) {
> +		if (__rte_lpm_rcu_qsbr_reclaim_chunk(lpm, &index) == 0) {
> +			/* Set the last reclaimed tbl8 group as VALID. */
> +			struct rte_lpm_tbl_entry new_tbl8_entry = {
> +				.next_hop = 0,
> +				.valid = INVALID,
> +				.depth = 0,
> +				.valid_group = VALID,
> +			};
> +
> +			tbl8_entry = &lpm->tbl8[index *
> +					RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
> +			__atomic_store(tbl8_entry, &new_tbl8_entry,
> +					__ATOMIC_RELAXED);
> +
> +			/* Return group index for reclaimed tbl8 group. */
> +			return index;
> +		}
> +	}
> +
> +	return -ENOSPC;
> +}
> +
>   /*
>    * Find, clean and allocate a tbl8.
>    */
> @@ -679,14 +857,15 @@ tbl8_alloc_v20(struct rte_lpm_tbl_entry_v20 *tbl8)
>   }
>   
>   static int32_t
> -tbl8_alloc_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t number_tbl8s)
> +tbl8_alloc_v1604(struct rte_lpm *lpm)
>   {
>   	uint32_t group_idx; /* tbl8 group index. */
>   	struct rte_lpm_tbl_entry *tbl8_entry;
>   
>   	/* Scan through tbl8 to find a free (i.e. INVALID) tbl8 group. */
> -	for (group_idx = 0; group_idx < number_tbl8s; group_idx++) {
> -		tbl8_entry = &tbl8[group_idx * RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
> +	for (group_idx = 0; group_idx < lpm->number_tbl8s; group_idx++) {
> +		tbl8_entry = &lpm->tbl8[group_idx *
> +					RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
>   		/* If a free tbl8 group is found clean it and set as VALID. */
>   		if (!tbl8_entry->valid_group) {
>   			struct rte_lpm_tbl_entry new_tbl8_entry = {
> @@ -708,8 +887,8 @@ tbl8_alloc_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t number_tbl8s)
>   		}
>   	}
>   
> -	/* If there are no tbl8 groups free then return error. */
> -	return -ENOSPC;
> +	/* If there are no tbl8 groups free then check reclaim queue. */
> +	return tbl8_alloc_reclaimed(lpm);
>   }
>   
>   static void
> @@ -728,13 +907,31 @@ tbl8_free_v20(struct rte_lpm_tbl_entry_v20 *tbl8, uint32_t tbl8_group_start)
>   }
>   
>   static void
> -tbl8_free_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t tbl8_group_start)
> +tbl8_free_v1604(struct rte_lpm *lpm, uint32_t tbl8_group_start)
>   {
> -	/* Set tbl8 group invalid*/
> +	struct __rte_lpm_qs_item qs_item;
>   	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
>   
> -	__atomic_store(&tbl8[tbl8_group_start], &zero_tbl8_entry,
> -			__ATOMIC_RELAXED);
> +	if (lpm->qsv != NULL) {
> +		/* Push into QSBR FIFO. */
> +		qs_item.token = rte_rcu_qsbr_start(lpm->qsv);
> +		qs_item.index =
> +			tbl8_group_start / RTE_LPM_TBL8_GROUP_NUM_ENTRIES;
> +		if (__rte_lpm_rcu_qsbr_fifo_push(lpm->qs_fifo, &qs_item) != 0)
> +			/* This should never happen as FIFO size is big enough
> +			 * to hold all tbl8 groups.
> +			 */
> +			RTE_LOG(ERR, LPM, "Failed to push QSBR FIFO\n");
> +
> +		/* Speculatively reclaim tbl8 groups.
> +	 * Help spread the reclaim workload across multiple calls.
> +		 */
> +		__rte_lpm_rcu_qsbr_try_reclaim(lpm);
> +	} else {
> +		/* Set tbl8 group invalid*/
> +		__atomic_store(&lpm->tbl8[tbl8_group_start], &zero_tbl8_entry,
> +				__ATOMIC_RELAXED);
> +	}
>   }
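One point that might be worth a line in the documentation: reclamation
only makes progress if every registered reader periodically reports a
quiescent state; otherwise rte_rcu_qsbr_check() never succeeds and the
FIFO eventually fills. A minimal reader-side sketch (again my own
illustration, placeholder names):

    /* Data-plane thread: register once, then report a quiescent
     * state between lookup bursts.
     */
    rte_rcu_qsbr_thread_register(qsv, lcore_id);
    rte_rcu_qsbr_thread_online(qsv, lcore_id);
    while (run) {
        rte_lpm_lookup_bulk(lpm, ips, next_hops, n);
        rte_rcu_qsbr_quiescent(qsv, lcore_id);
    }
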
>   
>   static __rte_noinline int32_t
> @@ -1037,7 +1234,7 @@ add_depth_big_v1604(struct rte_lpm *lpm, uint32_t ip_masked, uint8_t depth,
>   
>   	if (!lpm->tbl24[tbl24_index].valid) {
>   		/* Search for a free tbl8 group. */
> -		tbl8_group_index = tbl8_alloc_v1604(lpm->tbl8, lpm->number_tbl8s);
> +		tbl8_group_index = tbl8_alloc_v1604(lpm);
>   
>   		/* Check tbl8 allocation was successful. */
>   		if (tbl8_group_index < 0) {
> @@ -1083,7 +1280,7 @@ add_depth_big_v1604(struct rte_lpm *lpm, uint32_t ip_masked, uint8_t depth,
>   	} /* If valid entry but not extended calculate the index into Table8. */
>   	else if (lpm->tbl24[tbl24_index].valid_group == 0) {
>   		/* Search for free tbl8 group. */
> -		tbl8_group_index = tbl8_alloc_v1604(lpm->tbl8, lpm->number_tbl8s);
> +		tbl8_group_index = tbl8_alloc_v1604(lpm);
>   
>   		if (tbl8_group_index < 0) {
>   			return tbl8_group_index;
> @@ -1818,7 +2015,7 @@ delete_depth_big_v1604(struct rte_lpm *lpm, uint32_t ip_masked,
>   		 */
>   		lpm->tbl24[tbl24_index].valid = 0;
>   		__atomic_thread_fence(__ATOMIC_RELEASE);
> -		tbl8_free_v1604(lpm->tbl8, tbl8_group_start);
> +		tbl8_free_v1604(lpm, tbl8_group_start);
>   	} else if (tbl8_recycle_index > -1) {
>   		/* Update tbl24 entry. */
>   		struct rte_lpm_tbl_entry new_tbl24_entry = {
> @@ -1834,7 +2031,7 @@ delete_depth_big_v1604(struct rte_lpm *lpm, uint32_t ip_masked,
>   		__atomic_store(&lpm->tbl24[tbl24_index], &new_tbl24_entry,
>   				__ATOMIC_RELAXED);
>   		__atomic_thread_fence(__ATOMIC_RELEASE);
> -		tbl8_free_v1604(lpm->tbl8, tbl8_group_start);
> +		tbl8_free_v1604(lpm, tbl8_group_start);
>   	}
>   #undef group_idx
>   	return 0;
> diff --git a/lib/librte_lpm/rte_lpm.h b/lib/librte_lpm/rte_lpm.h
> index 906ec4483..5079fb262 100644
> --- a/lib/librte_lpm/rte_lpm.h
> +++ b/lib/librte_lpm/rte_lpm.h
> @@ -1,5 +1,6 @@
>   /* SPDX-License-Identifier: BSD-3-Clause
>    * Copyright(c) 2010-2014 Intel Corporation
> + * Copyright(c) 2019 Arm Limited
>    */
>   
>   #ifndef _RTE_LPM_H_
> @@ -21,6 +22,7 @@
>   #include <rte_common.h>
>   #include <rte_vect.h>
>   #include <rte_compat.h>
> +#include <rte_rcu_qsbr.h>
>   
>   #ifdef __cplusplus
>   extern "C" {
> @@ -186,6 +188,8 @@ struct rte_lpm {
>   			__rte_cache_aligned; /**< LPM tbl24 table. */
>   	struct rte_lpm_tbl_entry *tbl8; /**< LPM tbl8 table. */
>   	struct rte_lpm_rule *rules_tbl; /**< LPM rules. */
> +	struct rte_rcu_qsbr *qsv;	/**< RCU QSBR variable for tbl8 group.*/
> +	struct rte_ring *qs_fifo;	/**< RCU QSBR reclaiming queue. */
>   };
>   
>   /**
> @@ -248,6 +252,24 @@ rte_lpm_free_v20(struct rte_lpm_v20 *lpm);
>   void
>   rte_lpm_free_v1604(struct rte_lpm *lpm);
>   
> +/**
> + * Associate RCU QSBR variable with an LPM object.
> + *
> + * @param lpm
> + *   the lpm object to add RCU QSBR
> + * @param v
> + *   RCU QSBR variable
> + * @return
> + *   On success - 0
> + *   On error - 1 with error code set in rte_errno.
> + *   Possible rte_errno codes are:
> + *   - EINVAL - invalid pointer
> + *   - EEXIST - already added QSBR
> + *   - ENOMEM - memory allocation failure
> + */
> +__rte_experimental
> +int rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_rcu_qsbr *v);
> +
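A caller-side sketch of the return convention documented above (my own
illustration; 1 on error, with details in rte_errno):

    if (rte_lpm_rcu_qsbr_add(lpm, qsv) != 0)
        RTE_LOG(ERR, LPM, "rcu_qsbr_add failed: %s\n",
                rte_strerror(rte_errno));
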
>   /**
>    * Add a rule to the LPM table.
>    *
> diff --git a/lib/librte_lpm/rte_lpm_version.map b/lib/librte_lpm/rte_lpm_version.map
> index 90beac853..b353aabd2 100644
> --- a/lib/librte_lpm/rte_lpm_version.map
> +++ b/lib/librte_lpm/rte_lpm_version.map
> @@ -44,3 +44,9 @@ DPDK_17.05 {
>   	rte_lpm6_lookup_bulk_func;
>   
>   } DPDK_16.04;
> +
> +EXPERIMENTAL {
> +	global:
> +
> +	rte_lpm_rcu_qsbr_add;
> +};
> diff --git a/lib/meson.build b/lib/meson.build
> index e5ff83893..3a96f005d 100644
> --- a/lib/meson.build
> +++ b/lib/meson.build
> @@ -11,6 +11,7 @@
>   libraries = [
>   	'kvargs', # eal depends on kvargs
>   	'eal', # everything depends on eal
> +	'rcu', # hash and lpm depends on this
>   	'ring', 'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
>   	'cmdline',
>   	'metrics', # bitrate/latency stats depends on this
> @@ -22,7 +23,7 @@ libraries = [
>   	'gro', 'gso', 'ip_frag', 'jobstats',
>   	'kni', 'latencystats', 'lpm', 'member',
>   	'power', 'pdump', 'rawdev',
> -	'rcu', 'reorder', 'sched', 'security', 'stack', 'vhost',
> +	'reorder', 'sched', 'security', 'stack', 'vhost',
>   	# ipsec lib depends on net, crypto and security
>   	'ipsec',
>   	# add pkt framework libs which use other libs from above

-- 
Regards,
Vladimir