[dpdk-dev] [PATCH v4 1/3] lib/hash: integrate RCU QSBR

Dharmik Thakkar dharmik.thakkar at arm.com
Mon Oct 19 18:35:17 CEST 2020


Currently, users have to use external RCU mechanisms to free resources
when using the lock-free hash algorithm.

Integrate the RCU QSBR process into the hash library to make it easier
for applications to use the lock-free algorithm.
Refer to the RCU documentation to understand the various aspects of
integrating the RCU library into other libraries.

Suggested-by: Honnappa Nagarahalli <honnappa.nagarahalli at arm.com>
Signed-off-by: Dharmik Thakkar <dharmik.thakkar at arm.com>
Reviewed-by: Ruifeng Wang <ruifeng.wang at arm.com>
Acked-by: Ray Kinsella <mdr at ashroe.eu>
---
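Illustrative note, not part of the patch: the toy model below sketches the
idea behind the defer-queue mode (RTE_HASH_QSBR_MODE_DQ). A delete queues the
key slot together with a token instead of recycling it, and the add path runs
one reclaim pass when it finds no free slot. It is a single-threaded sketch
without atomics and does not use the DPDK APIs; every name in it (delete_slot,
reclaim, alloc_slot_with_reclaim, reader_quiescent, N_SLOTS, N_READERS,
NO_SLOT) is hypothetical and chosen only for this illustration.

```c
#include <assert.h>
#include <stdint.h>

#define N_SLOTS   4          /* key slots in the toy table (assumption) */
#define N_READERS 2          /* registered reader threads (assumption) */
#define NO_SLOT   UINT32_MAX /* stand-in for "no free slot available" */

/* Toy QSBR state: a global token and the last token each reader reported. */
static uint64_t global_token = 1;
static uint64_t reader_token[N_READERS] = {1, 1};

/* Free-slot stack and a FIFO defer queue of (slot, token) pairs. */
static uint32_t free_slots[N_SLOTS] = {3, 2, 1, 0};
static uint32_t n_free = N_SLOTS;
static struct { uint32_t slot; uint64_t token; } dq[N_SLOTS];
static uint32_t dq_head, dq_len;

/* Reader side: report that no reference taken before this call survives. */
static void reader_quiescent(unsigned int r)
{
	reader_token[r] = global_token;
}

/* Writer side of delete: defer the free instead of recycling the slot now. */
static void delete_slot(uint32_t slot)
{
	assert(dq_len < N_SLOTS);
	uint32_t tail = (dq_head + dq_len) % N_SLOTS;

	dq[tail].slot = slot;
	dq[tail].token = ++global_token;
	dq_len++;
}

/* Recycle up to max entries whose grace period has elapsed; return count. */
static uint32_t reclaim(uint32_t max)
{
	uint32_t done = 0;

	while (dq_len > 0 && done < max) {
		uint64_t t = dq[dq_head].token;

		for (unsigned int r = 0; r < N_READERS; r++)
			if (reader_token[r] < t)
				return done; /* a reader may still hold it */
		free_slots[n_free++] = dq[dq_head].slot;
		dq_head = (dq_head + 1) % N_SLOTS;
		dq_len--;
		done++;
	}
	return done;
}

/* Add path: on an empty free list, try one reclaim pass, then retry. */
static uint32_t alloc_slot_with_reclaim(uint32_t max_reclaim)
{
	if (n_free == 0 && reclaim(max_reclaim) == 0)
		return NO_SLOT;
	return free_slots[--n_free];
}
```

In this model a deleted slot becomes allocatable again only after every
reader has reported a quiescent state newer than the delete, which is the
property the integrated QSBR support is meant to provide without the
application calling rte_hash_free_key_with_position itself.
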
 doc/guides/prog_guide/hash_lib.rst   |  11 +-
 lib/librte_hash/meson.build          |   1 +
 lib/librte_hash/rte_cuckoo_hash.c    | 302 +++++++++++++++++++++------
 lib/librte_hash/rte_cuckoo_hash.h    |   8 +
 lib/librte_hash/rte_hash.h           |  77 ++++++-
 lib/librte_hash/rte_hash_version.map |   2 +-
 6 files changed, 325 insertions(+), 76 deletions(-)

diff --git a/doc/guides/prog_guide/hash_lib.rst b/doc/guides/prog_guide/hash_lib.rst
index d06c7de2ead1..63e183ed1f08 100644
--- a/doc/guides/prog_guide/hash_lib.rst
+++ b/doc/guides/prog_guide/hash_lib.rst
@@ -102,6 +102,9 @@ For concurrent writes, and concurrent reads and writes the following flag values
 *  If the 'do not free on delete' (RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL) flag is set, the position of the entry in the hash table is not freed upon calling delete(). This flag is enabled
    by default when the lock free read/write concurrency flag is set. The application should free the position after all the readers have stopped referencing the position.
    Where required, the application can make use of RCU mechanisms to determine when the readers have stopped referencing the position.
+   The RCU QSBR process is integrated within the hash library for safe freeing of the position. The application has certain responsibilities while using this feature.
+   Please refer to the resource reclamation framework of the :ref:`RCU library <RCU_Library>` for more details.
+
 
 Extendable Bucket Functionality support
 ----------------------------------------
@@ -109,8 +112,8 @@ An extra flag is used to enable this functionality (flag is not set by default).
 in the very unlikely case due to excessive hash collisions that a key has failed to be inserted, the hash table bucket is extended with a linked
 list to insert these failed keys. This feature is important for the workloads (e.g. telco workloads) that need to insert up to 100% of the
 hash table size and can't tolerate any key insertion failure (even if very few).
-Please note that with the 'lock free read/write concurrency' flag enabled, users need to call 'rte_hash_free_key_with_position' API in order to free the empty buckets and
-deleted keys, to maintain the 100% capacity guarantee.
+Please note that with the 'lock free read/write concurrency' flag enabled, users need to call 'rte_hash_free_key_with_position' API or configure integrated RCU QSBR
+(or use external RCU mechanisms) in order to free the empty buckets and deleted keys, to maintain the 100% capacity guarantee.
 
 Implementation Details (non Extendable Bucket Case)
 ---------------------------------------------------
@@ -172,7 +175,7 @@ Example of deletion:
 Similar to lookup, the key is searched in its primary and secondary buckets. If the key is found, the
 entry is marked as empty. If the hash table was configured with 'no free on delete' or 'lock free read/write concurrency',
 the position of the key is not freed. It is the responsibility of the user to free the position after
-readers are not referencing the position anymore.
+readers are not referencing the position anymore. The user can configure integrated RCU QSBR or use external RCU mechanisms to safely free the position on delete.
 
 
 Implementation Details (with Extendable Bucket)
@@ -286,6 +289,8 @@ The flow table operations on the application side are described below:
 *   Free flow: Free flow key position. If 'no free on delete' or 'lock-free read/write concurrency' flags are set,
     wait till the readers are not referencing the position returned during add/delete flow and then free the position.
     RCU mechanisms can be used to find out when the readers are not referencing the position anymore.
+    The RCU QSBR process is integrated within the hash library for safe freeing of the position. The application has certain responsibilities while using this feature.
+    Please refer to the resource reclamation framework of the :ref:`RCU library <RCU_Library>` for more details.
 
 *   Lookup flow: Lookup for the flow key in the hash.
     If the returned position is valid (flow lookup hit), use the returned position to access the flow entry in the flow table.
diff --git a/lib/librte_hash/meson.build b/lib/librte_hash/meson.build
index 6ab46ae9d768..0977a63fd279 100644
--- a/lib/librte_hash/meson.build
+++ b/lib/librte_hash/meson.build
@@ -10,3 +10,4 @@ headers = files('rte_crc_arm64.h',
 
 sources = files('rte_cuckoo_hash.c', 'rte_fbk_hash.c')
 deps += ['ring']
+deps += ['rcu']
diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c
index aad0c965be5e..b9e4d82a0c14 100644
--- a/lib/librte_hash/rte_cuckoo_hash.c
+++ b/lib/librte_hash/rte_cuckoo_hash.c
@@ -52,6 +52,11 @@ static struct rte_tailq_elem rte_hash_tailq = {
 };
 EAL_REGISTER_TAILQ(rte_hash_tailq)
 
+struct __rte_hash_rcu_dq_entry {
+	uint32_t key_idx;
+	uint32_t ext_bkt_idx; /**< Extended bkt index */
+};
+
 struct rte_hash *
 rte_hash_find_existing(const char *name)
 {
@@ -210,7 +215,10 @@ rte_hash_create(const struct rte_hash_parameters *params)
 
 	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF) {
 		readwrite_concur_lf_support = 1;
-		/* Enable not freeing internal memory/index on delete */
+		/* Enable not freeing internal memory/index on delete.
+		 * If internal RCU is enabled, freeing of internal memory/index
+		 * is done on delete
+		 */
 		no_free_on_del = 1;
 	}
 
@@ -505,6 +513,10 @@ rte_hash_free(struct rte_hash *h)
 
 	rte_mcfg_tailq_write_unlock();
 
+	/* RCU clean up. */
+	if (h->dq)
+		rte_rcu_qsbr_dq_delete(h->dq);
+
 	if (h->use_local_cache)
 		rte_free(h->local_free_slots);
 	if (h->writer_takes_lock)
@@ -607,11 +619,21 @@ void
 rte_hash_reset(struct rte_hash *h)
 {
 	uint32_t tot_ring_cnt, i;
+	unsigned int pending;
 
 	if (h == NULL)
 		return;
 
 	__hash_rw_writer_lock(h);
+
+	/* RCU QSBR clean up. */
+	if (h->dq) {
+		/* Reclaim all the resources */
+		rte_rcu_qsbr_dq_reclaim(h->dq, ~0, NULL, &pending, NULL);
+		if (pending != 0)
+			RTE_LOG(ERR, HASH, "RCU reclaim all resources failed\n");
+	}
+
 	memset(h->buckets, 0, h->num_buckets * sizeof(struct rte_hash_bucket));
 	memset(h->key_store, 0, h->key_entry_size * (h->entries + 1));
 	*h->tbl_chng_cnt = 0;
@@ -952,6 +974,37 @@ rte_hash_cuckoo_make_space_mw(const struct rte_hash *h,
 	return -ENOSPC;
 }
 
+static inline uint32_t
+alloc_slot(const struct rte_hash *h, struct lcore_cache *cached_free_slots)
+{
+	unsigned int  n_slots;
+	uint32_t slot_id;
+	if (h->use_local_cache) {
+		/* Try to get a free slot from the local cache */
+		if (cached_free_slots->len == 0) {
+			/* Need to get another burst of free slots from global ring */
+			n_slots = rte_ring_mc_dequeue_burst_elem(h->free_slots,
+					cached_free_slots->objs,
+					sizeof(uint32_t),
+					LCORE_CACHE_SIZE, NULL);
+			if (n_slots == 0)
+				return EMPTY_SLOT;
+
+			cached_free_slots->len += n_slots;
+		}
+
+		/* Get a free slot from the local cache */
+		cached_free_slots->len--;
+		slot_id = cached_free_slots->objs[cached_free_slots->len];
+	} else {
+		if (rte_ring_sc_dequeue_elem(h->free_slots, &slot_id,
+						sizeof(uint32_t)) != 0)
+			return EMPTY_SLOT;
+	}
+
+	return slot_id;
+}
+
 static inline int32_t
 __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 						hash_sig_t sig, void *data)
@@ -963,7 +1016,6 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 	uint32_t ext_bkt_id = 0;
 	uint32_t slot_id;
 	int ret;
-	unsigned n_slots;
 	unsigned lcore_id;
 	unsigned int i;
 	struct lcore_cache *cached_free_slots = NULL;
@@ -1001,28 +1053,20 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 	if (h->use_local_cache) {
 		lcore_id = rte_lcore_id();
 		cached_free_slots = &h->local_free_slots[lcore_id];
-		/* Try to get a free slot from the local cache */
-		if (cached_free_slots->len == 0) {
-			/* Need to get another burst of free slots from global ring */
-			n_slots = rte_ring_mc_dequeue_burst_elem(h->free_slots,
-					cached_free_slots->objs,
-					sizeof(uint32_t),
-					LCORE_CACHE_SIZE, NULL);
-			if (n_slots == 0) {
-				return -ENOSPC;
-			}
-
-			cached_free_slots->len += n_slots;
+	}
+	slot_id = alloc_slot(h, cached_free_slots);
+	if (slot_id == EMPTY_SLOT) {
+		if (h->dq) {
+			__hash_rw_writer_lock(h);
+			ret = rte_rcu_qsbr_dq_reclaim(h->dq,
+					h->hash_rcu_cfg->max_reclaim_size,
+					NULL, NULL, NULL);
+			__hash_rw_writer_unlock(h);
+			if (ret == 0)
+				slot_id = alloc_slot(h, cached_free_slots);
 		}
-
-		/* Get a free slot from the local cache */
-		cached_free_slots->len--;
-		slot_id = cached_free_slots->objs[cached_free_slots->len];
-	} else {
-		if (rte_ring_sc_dequeue_elem(h->free_slots, &slot_id,
-						sizeof(uint32_t)) != 0) {
+		if (slot_id == EMPTY_SLOT)
 			return -ENOSPC;
-		}
 	}
 
 	new_k = RTE_PTR_ADD(keys, slot_id * h->key_entry_size);
@@ -1118,8 +1162,19 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 	if (rte_ring_sc_dequeue_elem(h->free_ext_bkts, &ext_bkt_id,
 						sizeof(uint32_t)) != 0 ||
 					ext_bkt_id == 0) {
-		ret = -ENOSPC;
-		goto failure;
+		if (h->dq) {
+			if (rte_rcu_qsbr_dq_reclaim(h->dq,
+					h->hash_rcu_cfg->max_reclaim_size,
+					NULL, NULL, NULL) == 0) {
+				rte_ring_sc_dequeue_elem(h->free_ext_bkts,
+							 &ext_bkt_id,
+							 sizeof(uint32_t));
+			}
+		}
+		if (ext_bkt_id == 0) {
+			ret = -ENOSPC;
+			goto failure;
+		}
 	}
 
 	/* Use the first location of the new bucket */
@@ -1395,12 +1450,12 @@ rte_hash_lookup_data(const struct rte_hash *h, const void *key, void **data)
 	return __rte_hash_lookup_with_hash(h, key, rte_hash_hash(h, key), data);
 }
 
-static inline void
-remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)
+static int
+free_slot(const struct rte_hash *h, uint32_t slot_id)
 {
 	unsigned lcore_id, n_slots;
-	struct lcore_cache *cached_free_slots;
-
+	struct lcore_cache *cached_free_slots = NULL;
+	/* Return key indexes to free slot ring */
 	if (h->use_local_cache) {
 		lcore_id = rte_lcore_id();
 		cached_free_slots = &h->local_free_slots[lcore_id];
@@ -1411,18 +1466,127 @@ remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)
 						cached_free_slots->objs,
 						sizeof(uint32_t),
 						LCORE_CACHE_SIZE, NULL);
-			ERR_IF_TRUE((n_slots == 0),
-				"%s: could not enqueue free slots in global ring\n",
-				__func__);
+			RETURN_IF_TRUE((n_slots == 0), -EFAULT);
 			cached_free_slots->len -= n_slots;
 		}
-		/* Put index of new free slot in cache. */
-		cached_free_slots->objs[cached_free_slots->len] =
-							bkt->key_idx[i];
-		cached_free_slots->len++;
+	}
+
+	enqueue_slot_back(h, cached_free_slots, slot_id);
+	return 0;
+}
+
+static void
+__hash_rcu_qsbr_free_resource(void *p, void *e, unsigned int n)
+{
+	void *key_data = NULL;
+	int ret;
+	struct rte_hash_key *keys, *k;
+	struct rte_hash *h = (struct rte_hash *)p;
+	struct __rte_hash_rcu_dq_entry rcu_dq_entry =
+			*((struct __rte_hash_rcu_dq_entry *)e);
+
+	RTE_SET_USED(n);
+	keys = h->key_store;
+
+	k = (struct rte_hash_key *) ((char *)keys +
+				rcu_dq_entry.key_idx * h->key_entry_size);
+	key_data = k->pdata;
+	if (h->hash_rcu_cfg->free_key_data_func)
+		h->hash_rcu_cfg->free_key_data_func(h->hash_rcu_cfg->key_data_ptr,
+						    key_data);
+
+	if (h->ext_table_support && rcu_dq_entry.ext_bkt_idx != EMPTY_SLOT)
+		/* Recycle empty ext bkt to free list. */
+		rte_ring_sp_enqueue_elem(h->free_ext_bkts,
+			&rcu_dq_entry.ext_bkt_idx, sizeof(uint32_t));
+
+	/* Return key indexes to free slot ring */
+	ret = free_slot(h, rcu_dq_entry.key_idx);
+	if (ret < 0) {
+		RTE_LOG(ERR, HASH,
+			"%s: could not enqueue free slots in global ring\n",
+				__func__);
+	}
+}
+
+int
+rte_hash_rcu_qsbr_add(struct rte_hash *h,
+				struct rte_hash_rcu_config *cfg)
+{
+	struct rte_rcu_qsbr_dq_parameters params = {0};
+	char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
+	struct rte_hash_rcu_config *hash_rcu_cfg = NULL;
+
+	if (h == NULL || cfg == NULL || cfg->v == NULL) {
+		rte_errno = EINVAL;
+		return 1;
+	}
+
+	const uint32_t total_entries = h->use_local_cache ?
+		h->entries + (RTE_MAX_LCORE - 1) * (LCORE_CACHE_SIZE - 1) + 1
+							: h->entries + 1;
+
+	if (h->hash_rcu_cfg) {
+		rte_errno = EEXIST;
+		return 1;
+	}
+
+	hash_rcu_cfg = rte_zmalloc(NULL, sizeof(struct rte_hash_rcu_config), 0);
+	if (hash_rcu_cfg == NULL) {
+		RTE_LOG(ERR, HASH, "memory allocation failed\n");
+		return 1;
+	}
+
+	if (cfg->mode == RTE_HASH_QSBR_MODE_SYNC) {
+		/* No other things to do. */
+	} else if (cfg->mode == RTE_HASH_QSBR_MODE_DQ) {
+		/* Init QSBR defer queue. */
+		snprintf(rcu_dq_name, sizeof(rcu_dq_name),
+					"HASH_RCU_%s", h->name);
+		params.name = rcu_dq_name;
+		params.size = cfg->dq_size;
+		if (params.size == 0)
+			params.size = total_entries;
+		params.trigger_reclaim_limit = cfg->trigger_reclaim_limit;
+		params.max_reclaim_size = cfg->max_reclaim_size ?
+			cfg->max_reclaim_size : RTE_HASH_RCU_DQ_RECLAIM_MAX;
+		params.esize = sizeof(struct __rte_hash_rcu_dq_entry);
+		params.free_fn = __hash_rcu_qsbr_free_resource;
+		params.p = h;
+		params.v = cfg->v;
+		h->dq = rte_rcu_qsbr_dq_create(&params);
+		if (h->dq == NULL) {
+			rte_free(hash_rcu_cfg);
+			RTE_LOG(ERR, HASH, "HASH defer queue creation failed\n");
+			return 1;
+		}
 	} else {
-		rte_ring_sp_enqueue_elem(h->free_slots,
-				&bkt->key_idx[i], sizeof(uint32_t));
+		rte_free(hash_rcu_cfg);
+		rte_errno = EINVAL;
+		return 1;
+	}
+
+	hash_rcu_cfg->v = cfg->v;
+	hash_rcu_cfg->mode = cfg->mode;
+	hash_rcu_cfg->dq_size = params.size;
+	hash_rcu_cfg->trigger_reclaim_limit = params.trigger_reclaim_limit;
+	hash_rcu_cfg->max_reclaim_size = params.max_reclaim_size;
+	hash_rcu_cfg->free_key_data_func = cfg->free_key_data_func;
+	hash_rcu_cfg->key_data_ptr = cfg->key_data_ptr;
+
+	h->hash_rcu_cfg = hash_rcu_cfg;
+
+	return 0;
+}
+
+static inline void
+remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)
+{
+	int ret = free_slot(h, bkt->key_idx[i]);
+	if (ret < 0) {
+		RTE_LOG(ERR, HASH,
+			"%s: could not enqueue free slots in global ring\n",
+				__func__);
 	}
 }
 
@@ -1521,6 +1685,8 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
 	int pos;
 	int32_t ret, i;
 	uint16_t short_sig;
+	uint32_t index = EMPTY_SLOT;
+	struct __rte_hash_rcu_dq_entry rcu_dq_entry;
 
 	short_sig = get_short_sig(sig);
 	prim_bucket_idx = get_prim_bucket_index(h, sig);
@@ -1555,10 +1721,9 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
 
 /* Search last bucket to see if empty to be recycled */
 return_bkt:
-	if (!last_bkt) {
-		__hash_rw_writer_unlock(h);
-		return ret;
-	}
+	if (!last_bkt)
+		goto return_key;
+
 	while (last_bkt->next) {
 		prev_bkt = last_bkt;
 		last_bkt = last_bkt->next;
@@ -1571,11 +1736,11 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
 	/* found empty bucket and recycle */
 	if (i == RTE_HASH_BUCKET_ENTRIES) {
 		prev_bkt->next = NULL;
-		uint32_t index = last_bkt - h->buckets_ext + 1;
+		index = last_bkt - h->buckets_ext + 1;
 		/* Recycle the empty bkt if
 		 * no_free_on_del is disabled.
 		 */
-		if (h->no_free_on_del)
+		if (h->no_free_on_del) {
 			/* Store index of an empty ext bkt to be recycled
 			 * on calling rte_hash_del_xxx APIs.
 			 * When lock free read-write concurrency is enabled,
@@ -1583,12 +1748,34 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
 			 * immediately (as readers might be using it still).
 			 * Hence freeing of the ext bkt is piggy-backed to
 			 * freeing of the key index.
+			 * If using external RCU, store this index in an array.
 			 */
-			h->ext_bkt_to_free[ret] = index;
-		else
+			if (h->hash_rcu_cfg == NULL)
+				h->ext_bkt_to_free[ret] = index;
+		} else
 			rte_ring_sp_enqueue_elem(h->free_ext_bkts, &index,
 							sizeof(uint32_t));
 	}
+
+return_key:
+	/* Using internal RCU QSBR */
+	if (h->hash_rcu_cfg) {
+		/* Key index where key is stored, adding the first dummy index */
+		rcu_dq_entry.key_idx = ret + 1;
+		rcu_dq_entry.ext_bkt_idx = index;
+		if (h->dq == NULL) {
+			/* Wait for quiescent state change if using
+			 * RTE_HASH_QSBR_MODE_SYNC
+			 */
+			rte_rcu_qsbr_synchronize(h->hash_rcu_cfg->v,
+						 RTE_QSBR_THRID_INVALID);
+			__hash_rcu_qsbr_free_resource((void *)((uintptr_t)h),
+						      &rcu_dq_entry, 1);
+		} else if (h->dq)
+			/* Push into QSBR FIFO if using RTE_HASH_QSBR_MODE_DQ */
+			if (rte_rcu_qsbr_dq_enqueue(h->dq, &rcu_dq_entry) != 0)
+				RTE_LOG(ERR, HASH, "Failed to push QSBR FIFO\n");
+	}
 	__hash_rw_writer_unlock(h);
 	return ret;
 }
@@ -1637,8 +1824,6 @@ rte_hash_free_key_with_position(const struct rte_hash *h,
 
 	RETURN_IF_TRUE(((h == NULL) || (key_idx == EMPTY_SLOT)), -EINVAL);
 
-	unsigned int lcore_id, n_slots;
-	struct lcore_cache *cached_free_slots;
 	const uint32_t total_entries = h->use_local_cache ?
 		h->entries + (RTE_MAX_LCORE - 1) * (LCORE_CACHE_SIZE - 1) + 1
 							: h->entries + 1;
@@ -1656,28 +1841,9 @@ rte_hash_free_key_with_position(const struct rte_hash *h,
 		}
 	}
 
-	if (h->use_local_cache) {
-		lcore_id = rte_lcore_id();
-		cached_free_slots = &h->local_free_slots[lcore_id];
-		/* Cache full, need to free it. */
-		if (cached_free_slots->len == LCORE_CACHE_SIZE) {
-			/* Need to enqueue the free slots in global ring. */
-			n_slots = rte_ring_mp_enqueue_burst_elem(h->free_slots,
-						cached_free_slots->objs,
-						sizeof(uint32_t),
-						LCORE_CACHE_SIZE, NULL);
-			RETURN_IF_TRUE((n_slots == 0), -EFAULT);
-			cached_free_slots->len -= n_slots;
-		}
-		/* Put index of new free slot in cache. */
-		cached_free_slots->objs[cached_free_slots->len] = key_idx;
-		cached_free_slots->len++;
-	} else {
-		rte_ring_sp_enqueue_elem(h->free_slots, &key_idx,
-						sizeof(uint32_t));
-	}
+	/* Enqueue slot to cache/ring of free slots. */
+	return free_slot(h, key_idx);
 
-	return 0;
 }
 
 static inline void
diff --git a/lib/librte_hash/rte_cuckoo_hash.h b/lib/librte_hash/rte_cuckoo_hash.h
index 345de6bf9cfd..85be49d3bbe7 100644
--- a/lib/librte_hash/rte_cuckoo_hash.h
+++ b/lib/librte_hash/rte_cuckoo_hash.h
@@ -168,6 +168,11 @@ struct rte_hash {
 	struct lcore_cache *local_free_slots;
 	/**< Local cache per lcore, storing some indexes of the free slots */
 
+	/* RCU config */
+	struct rte_hash_rcu_config *hash_rcu_cfg;
+	/**< HASH RCU QSBR configuration structure */
+	struct rte_rcu_qsbr_dq *dq;	/**< RCU QSBR defer queue. */
+
 	/* Fields used in lookup */
 
 	uint32_t key_len __rte_cache_aligned;
@@ -230,4 +235,7 @@ struct queue_node {
 	int prev_slot;               /* Parent(slot) in search path */
 };
 
+/** @internal Default RCU defer queue entries to reclaim in one go. */
+#define RTE_HASH_RCU_DQ_RECLAIM_MAX	16
+
 #endif
diff --git a/lib/librte_hash/rte_hash.h b/lib/librte_hash/rte_hash.h
index bff40251bc98..3d28f177f14a 100644
--- a/lib/librte_hash/rte_hash.h
+++ b/lib/librte_hash/rte_hash.h
@@ -15,6 +15,7 @@
 #include <stddef.h>
 
 #include <rte_compat.h>
+#include <rte_rcu_qsbr.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -45,7 +46,8 @@ extern "C" {
 /** Flag to disable freeing of key index on hash delete.
  * Refer to rte_hash_del_xxx APIs for more details.
  * This is enabled by default when RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF
- * is enabled.
+ * is enabled. However, if internal RCU is enabled, freeing of internal
+ * memory/index is done on delete.
  */
 #define RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL 0x10
 
@@ -67,6 +69,13 @@ typedef uint32_t (*rte_hash_function)(const void *key, uint32_t key_len,
 /** Type of function used to compare the hash key. */
 typedef int (*rte_hash_cmp_eq_t)(const void *key1, const void *key2, size_t key_len);
 
+/**
+ * Type of function used to free data stored in the key.
+ * Required when using internal RCU to allow application to free key-data once
+ * the key is returned to the ring of free key-slots.
+ */
+typedef void (*rte_hash_free_key_data)(void *p, void *key_data);
+
 /**
  * Parameters used when creating the hash table.
  */
@@ -81,6 +90,39 @@ struct rte_hash_parameters {
 	uint8_t extra_flag;		/**< Indicate if additional parameters are present. */
 };
 
+/** RCU reclamation modes */
+enum rte_hash_qsbr_mode {
+	/** Create defer queue for reclaim. */
+	RTE_HASH_QSBR_MODE_DQ = 0,
+	/** Use blocking mode reclaim. No defer queue created. */
+	RTE_HASH_QSBR_MODE_SYNC
+};
+
+/** HASH RCU QSBR configuration structure. */
+struct rte_hash_rcu_config {
+	struct rte_rcu_qsbr *v;		/**< RCU QSBR variable. */
+	enum rte_hash_qsbr_mode mode;
+	/**< Mode of RCU QSBR. RTE_HASH_QSBR_MODE_xxx
+	 * '0' for default: create defer queue for reclaim.
+	 */
+	uint32_t dq_size;
+	/**< RCU defer queue size.
+	 * default: total hash table entries.
+	 */
+	uint32_t trigger_reclaim_limit;	/**< Threshold to trigger auto reclaim. */
+	uint32_t max_reclaim_size;
+	/**< Max entries to reclaim in one go.
+	 * default: RTE_HASH_RCU_DQ_RECLAIM_MAX.
+	 */
+	void *key_data_ptr;
+	/**< Pointer passed to the free function. Typically, this is the
+	 * pointer to the data structure to which the resource to free
+	 * (key-data) belongs. This can be NULL.
+	 */
+	rte_hash_free_key_data free_key_data_func;
+	/**< Function to call to free the resource (key-data). */
+};
+
 /** @internal A hash table structure. */
 struct rte_hash;
 
@@ -287,7 +329,8 @@ rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, hash_sig_t
  * Thread safety can be enabled by setting flag during
  * table creation.
  * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
- * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
+ * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and
+ * internal RCU is NOT enabled,
  * the key index returned by rte_hash_add_key_xxx APIs will not be
  * freed by this API. rte_hash_free_key_with_position API must be called
  * additionally to free the index associated with the key.
@@ -316,7 +359,8 @@ rte_hash_del_key(const struct rte_hash *h, const void *key);
  * Thread safety can be enabled by setting flag during
  * table creation.
  * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
- * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
+ * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and
+ * internal RCU is NOT enabled,
  * the key index returned by rte_hash_add_key_xxx APIs will not be
  * freed by this API. rte_hash_free_key_with_position API must be called
  * additionally to free the index associated with the key.
@@ -370,7 +414,8 @@ rte_hash_get_key_with_position(const struct rte_hash *h, const int32_t position,
  * only be called from one thread by default. Thread safety
  * can be enabled by setting flag during table creation.
  * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
- * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
+ * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and
+ * internal RCU is NOT enabled,
  * the key index returned by rte_hash_del_key_xxx APIs must be freed
  * using this API. This API should be called after all the readers
  * have stopped referencing the entry corresponding to this key.
@@ -625,6 +670,30 @@ rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
  */
 int32_t
 rte_hash_iterate(const struct rte_hash *h, const void **key, void **data, uint32_t *next);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Associate RCU QSBR variable with a hash object.
+ * This API should be called to enable the integrated RCU QSBR support and
+ * should be called immediately after creating the Hash object.
+ *
+ * @param h
+ *   the hash object to add RCU QSBR
+ * @param cfg
+ *   RCU QSBR configuration
+ * @return
+ *   On success - 0
+ *   On error - 1 with error code set in rte_errno.
+ *   Possible rte_errno codes are:
+ *   - EINVAL - invalid pointer
+ *   - EEXIST - already added QSBR
+ *   - ENOMEM - memory allocation failure
+ */
+__rte_experimental
+int rte_hash_rcu_qsbr_add(struct rte_hash *h,
+				struct rte_hash_rcu_config *cfg);
 #ifdef __cplusplus
 }
 #endif
diff --git a/lib/librte_hash/rte_hash_version.map b/lib/librte_hash/rte_hash_version.map
index c0db81014ff9..c6d73080f478 100644
--- a/lib/librte_hash/rte_hash_version.map
+++ b/lib/librte_hash/rte_hash_version.map
@@ -36,5 +36,5 @@ EXPERIMENTAL {
 	rte_hash_lookup_with_hash_bulk;
 	rte_hash_lookup_with_hash_bulk_data;
 	rte_hash_max_key_id;
-
+	rte_hash_rcu_qsbr_add;
 };
-- 
2.17.1


