[dpdk-dev] [PATCH v4 4/5] hash: add lock-free read-write concurrency

Honnappa Nagarahalli honnappa.nagarahalli at arm.com
Thu Oct 18 05:38:00 CEST 2018


Add lock-free read-write concurrency. This is achieved by the
following changes.

1) Add memory ordering to avoid race conditions. The only race
condition that can occur is -  using the key store element
before the key write is completed. Hence, while inserting the element
the release memory order is used. Any other race condition is caught
by the key comparison. Memory orderings are added only where needed.
For ex: reads in the writer's context do not need memory ordering
as there is a single writer.

key_idx in the bucket entry and pdata in the key store element are
used for synchronisation. key_idx is used to release an inserted
entry in the bucket to the reader. Use of pdata for synchronisation
is required due to updation of an existing entry where-in only
the pdata is updated without updating key_idx.

2) Reader-writer concurrency issue, caused by moving the keys
to their alternative locations during key insert, is solved
by introducing a global counter(tbl_chng_cnt) indicating a
change in table.

3) Add the flag to enable reader-writer concurrency during
run time.

Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli at arm.com>
Reviewed-by: Gavin Hu <gavin.hu at arm.com>
Reviewed-by: Ola Liljedahl <ola.liljedahl at arm.com>
Reviewed-by: Steve Capper <steve.capper at arm.com>
Reviewed-by: Yipeng Wang <yipeng1.wang at intel.com>
---
 lib/librte_hash/rte_cuckoo_hash.c    | 417 +++++++++++++++++++++++++----------
 lib/librte_hash/rte_cuckoo_hash.h    |   5 +
 lib/librte_hash/rte_hash.h           |  47 +++-
 lib/librte_hash/rte_hash_version.map |   7 +
 4 files changed, 358 insertions(+), 118 deletions(-)

diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c
index a539531..7b0ce21 100644
--- a/lib/librte_hash/rte_cuckoo_hash.c
+++ b/lib/librte_hash/rte_cuckoo_hash.c
@@ -1,5 +1,6 @@
 /* SPDX-License-Identifier: BSD-3-Clause
  * Copyright(c) 2010-2016 Intel Corporation
+ * Copyright(c) 2018 Arm Limited
  */
 
 #include <string.h>
@@ -141,6 +142,8 @@ rte_hash_create(const struct rte_hash_parameters *params)
 	unsigned int readwrite_concur_support = 0;
 	unsigned int writer_takes_lock = 0;
 	unsigned int no_free_on_del = 0;
+	uint32_t *tbl_chng_cnt = NULL;
+	unsigned int readwrite_concur_lf_support = 0;
 
 	rte_hash_function default_hash_func = (rte_hash_function)rte_jhash;
 
@@ -160,6 +163,24 @@ rte_hash_create(const struct rte_hash_parameters *params)
 		return NULL;
 	}
 
+	/* Validate correct usage of extra options */
+	if ((params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY) &&
+	    (params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF)) {
+		rte_errno = EINVAL;
+		RTE_LOG(ERR, HASH, "rte_hash_create: choose rw concurrency or "
+			"rw concurrency lock free\n");
+		return NULL;
+	}
+
+	if ((params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF) &&
+	    (params->extra_flag & RTE_HASH_EXTRA_FLAGS_EXT_TABLE)) {
+		rte_errno = EINVAL;
+		RTE_LOG(ERR, HASH, "rte_hash_create: extendable bucket "
+			"feature not supported with rw concurrency "
+			"lock free\n");
+		return NULL;
+	}
+
 	/* Check extra flags field to check extra options. */
 	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_TRANS_MEM_SUPPORT)
 		hw_trans_mem_support = 1;
@@ -180,6 +201,12 @@ rte_hash_create(const struct rte_hash_parameters *params)
 	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL)
 		no_free_on_del = 1;
 
+	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF) {
+		readwrite_concur_lf_support = 1;
+		/* Enable not freeing internal memory/index on delete */
+		no_free_on_del = 1;
+	}
+
 	/* Store all keys and leave the first entry as a dummy entry for lookup_bulk */
 	if (use_local_cache)
 		/*
@@ -292,6 +319,14 @@ rte_hash_create(const struct rte_hash_parameters *params)
 		goto err_unlock;
 	}
 
+	tbl_chng_cnt = rte_zmalloc_socket(NULL, sizeof(uint32_t),
+			RTE_CACHE_LINE_SIZE, params->socket_id);
+
+	if (tbl_chng_cnt == NULL) {
+		RTE_LOG(ERR, HASH, "memory allocation failed\n");
+		goto err_unlock;
+	}
+
 /*
  * If x86 architecture is used, select appropriate compare function,
  * which may use x86 intrinsics, otherwise use memcmp
@@ -360,12 +395,15 @@ rte_hash_create(const struct rte_hash_parameters *params)
 		default_hash_func : params->hash_func;
 	h->key_store = k;
 	h->free_slots = r;
+	h->tbl_chng_cnt = tbl_chng_cnt;
+	*h->tbl_chng_cnt = 0;
 	h->hw_trans_mem_support = hw_trans_mem_support;
 	h->use_local_cache = use_local_cache;
 	h->readwrite_concur_support = readwrite_concur_support;
 	h->ext_table_support = ext_table_support;
 	h->writer_takes_lock = writer_takes_lock;
 	h->no_free_on_del = no_free_on_del;
+	h->readwrite_concur_lf_support = readwrite_concur_lf_support;
 
 #if defined(RTE_ARCH_X86)
 	if (rte_cpu_get_flag_enabled(RTE_CPUFLAG_SSE2))
@@ -406,6 +444,7 @@ rte_hash_create(const struct rte_hash_parameters *params)
 	rte_free(buckets);
 	rte_free(buckets_ext);
 	rte_free(k);
+	rte_free(tbl_chng_cnt);
 	return NULL;
 }
 
@@ -446,6 +485,7 @@ rte_hash_free(struct rte_hash *h)
 	rte_free(h->key_store);
 	rte_free(h->buckets);
 	rte_free(h->buckets_ext);
+	rte_free(h->tbl_chng_cnt);
 	rte_free(h);
 	rte_free(te);
 }
@@ -530,6 +570,7 @@ rte_hash_reset(struct rte_hash *h)
 	__hash_rw_writer_lock(h);
 	memset(h->buckets, 0, h->num_buckets * sizeof(struct rte_hash_bucket));
 	memset(h->key_store, 0, h->key_entry_size * (h->entries + 1));
+	*h->tbl_chng_cnt = 0;
 
 	/* clear the free ring */
 	while (rte_ring_dequeue(h->free_slots, &ptr) == 0)
@@ -585,7 +626,9 @@ enqueue_slot_back(const struct rte_hash *h,
 		rte_ring_sp_enqueue(h->free_slots, slot_id);
 }
 
-/* Search a key from bucket and update its data */
+/* Search a key from bucket and update its data.
+ * Writer holds the lock before calling this.
+ */
 static inline int32_t
 search_and_update(const struct rte_hash *h, void *data, const void *key,
 	struct rte_hash_bucket *bkt, uint16_t sig)
@@ -598,8 +641,13 @@ search_and_update(const struct rte_hash *h, void *data, const void *key,
 			k = (struct rte_hash_key *) ((char *)keys +
 					bkt->key_idx[i] * h->key_entry_size);
 			if (rte_hash_cmp_eq(key, k->key, h) == 0) {
-				/* Update data */
-				k->pdata = data;
+				/* 'pdata' acts as the synchronization point
+				 * when an existing hash entry is updated.
+				 * Key is not updated in this case.
+				 */
+				__atomic_store_n(&k->pdata,
+					data,
+					__ATOMIC_RELEASE);
 				/*
 				 * Return index where key is stored,
 				 * subtracting the first dummy index
@@ -655,7 +703,15 @@ rte_hash_cuckoo_insert_mw(const struct rte_hash *h,
 		/* Check if slot is available */
 		if (likely(prim_bkt->key_idx[i] == EMPTY_SLOT)) {
 			prim_bkt->sig_current[i] = sig;
-			prim_bkt->key_idx[i] = new_idx;
+			/* Key can be of arbitrary length, so it is
+			 * not possible to store it atomically.
+			 * Hence the new key element's memory stores
+			 * (key as well as data) should be complete
+			 * before it is referenced.
+			 */
+			__atomic_store_n(&prim_bkt->key_idx[i],
+					 new_idx,
+					 __ATOMIC_RELEASE);
 			break;
 		}
 	}
@@ -728,27 +784,66 @@ rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h,
 		if (unlikely(&h->buckets[prev_alt_bkt_idx]
 				!= curr_bkt)) {
 			/* revert it to empty, otherwise duplicated keys */
-			curr_bkt->key_idx[curr_slot] = EMPTY_SLOT;
+			__atomic_store_n(&curr_bkt->key_idx[curr_slot],
+				EMPTY_SLOT,
+				__ATOMIC_RELEASE);
 			__hash_rw_writer_unlock(h);
 			return -1;
 		}
 
+		if (h->readwrite_concur_lf_support) {
+			/* Inform the previous move. The current move need
+			 * not be informed now as the current bucket entry
+			 * is present in both primary and secondary.
+			 * Since there is one writer, load acquires on
+			 * tbl_chng_cnt are not required.
+			 */
+			__atomic_store_n(h->tbl_chng_cnt,
+					 *h->tbl_chng_cnt + 1,
+					 __ATOMIC_RELEASE);
+			/* The stores to sig_alt and sig_current should not
+			 * move above the store to tbl_chng_cnt.
+			 */
+			__atomic_thread_fence(__ATOMIC_RELEASE);
+		}
+
 		/* Need to swap current/alt sig to allow later
 		 * Cuckoo insert to move elements back to its
 		 * primary bucket if available
 		 */
 		curr_bkt->sig_current[curr_slot] =
 			prev_bkt->sig_current[prev_slot];
-		curr_bkt->key_idx[curr_slot] =
-			prev_bkt->key_idx[prev_slot];
+		/* Release the updated bucket entry */
+		__atomic_store_n(&curr_bkt->key_idx[curr_slot],
+			prev_bkt->key_idx[prev_slot],
+			__ATOMIC_RELEASE);
 
 		curr_slot = prev_slot;
 		curr_node = prev_node;
 		curr_bkt = curr_node->bkt;
 	}
 
+	if (h->readwrite_concur_lf_support) {
+		/* Inform the previous move. The current move need
+		 * not be informed now as the current bucket entry
+		 * is present in both primary and secondary.
+		 * Since there is one writer, load acquires on
+		 * tbl_chng_cnt are not required.
+		 */
+		__atomic_store_n(h->tbl_chng_cnt,
+				 *h->tbl_chng_cnt + 1,
+				 __ATOMIC_RELEASE);
+		/* The stores to sig_alt and sig_current should not
+		 * move above the store to tbl_chng_cnt.
+		 */
+		__atomic_thread_fence(__ATOMIC_RELEASE);
+	}
+
 	curr_bkt->sig_current[curr_slot] = sig;
-	curr_bkt->key_idx[curr_slot] = new_idx;
+	/* Release the new bucket entry */
+	__atomic_store_n(&curr_bkt->key_idx[curr_slot],
+			 new_idx,
+			 __ATOMIC_RELEASE);
 
 	__hash_rw_writer_unlock(h);
 
@@ -889,8 +984,15 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 	new_idx = (uint32_t)((uintptr_t) slot_id);
 	/* Copy key */
 	rte_memcpy(new_k->key, key, h->key_len);
-	new_k->pdata = data;
-
+	/* Key can be of arbitrary length, so it is not possible to store
+	 * it atomically. Hence the new key element's memory stores
+	 * (key as well as data) should be complete before it is referenced.
+	 * 'pdata' acts as the synchronization point when an existing hash
+	 * entry is updated.
+	 */
+	__atomic_store_n(&new_k->pdata,
+		data,
+		__ATOMIC_RELEASE);
 
 	/* Find an empty slot and insert */
 	ret = rte_hash_cuckoo_insert_mw(h, prim_bkt, sec_bkt, key, data,
@@ -1034,21 +1136,27 @@ search_one_bucket(const struct rte_hash *h, const void *key, uint16_t sig,
 			void **data, const struct rte_hash_bucket *bkt)
 {
 	int i;
+	uint32_t key_idx;
+	void *pdata;
 	struct rte_hash_key *k, *keys = h->key_store;
 
 	for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
-		if (bkt->sig_current[i] == sig &&
-				bkt->key_idx[i] != EMPTY_SLOT) {
+		key_idx = __atomic_load_n(&bkt->key_idx[i],
+					  __ATOMIC_ACQUIRE);
+		if (bkt->sig_current[i] == sig && key_idx != EMPTY_SLOT) {
 			k = (struct rte_hash_key *) ((char *)keys +
-					bkt->key_idx[i] * h->key_entry_size);
+					key_idx * h->key_entry_size);
+			pdata = __atomic_load_n(&k->pdata,
+					__ATOMIC_ACQUIRE);
+
 			if (rte_hash_cmp_eq(key, k->key, h) == 0) {
 				if (data != NULL)
-					*data = k->pdata;
+					*data = pdata;
 				/*
 				 * Return index where key is stored,
 				 * subtracting the first dummy index
 				 */
-				return bkt->key_idx[i] - 1;
+				return key_idx - 1;
 			}
 		}
 	}
@@ -1061,34 +1169,62 @@ __rte_hash_lookup_with_hash(const struct rte_hash *h, const void *key,
 {
 	uint32_t prim_bucket_idx, sec_bucket_idx;
 	struct rte_hash_bucket *bkt, *cur_bkt;
+	uint32_t cnt_b, cnt_a;
 	int ret;
 	uint16_t short_sig;
 
 	short_sig = get_short_sig(sig);
 	prim_bucket_idx = get_prim_bucket_index(h, sig);
 	sec_bucket_idx = get_alt_bucket_index(h, prim_bucket_idx, short_sig);
-	bkt = &h->buckets[prim_bucket_idx];
 
 	__hash_rw_reader_lock(h);
 
-	/* Check if key is in primary location */
-	ret = search_one_bucket(h, key, short_sig, data, bkt);
-	if (ret != -1) {
-		__hash_rw_reader_unlock(h);
-		return ret;
-	}
-	/* Calculate secondary hash */
-	bkt = &h->buckets[sec_bucket_idx];
+	do {
+		/* Load the table change counter before the lookup
+		 * starts. Acquire semantics will make sure that
+		 * loads in search_one_bucket are not hoisted.
+		 */
+		cnt_b = __atomic_load_n(h->tbl_chng_cnt,
+				__ATOMIC_ACQUIRE);
 
-	/* Check if key is in secondary location */
-	FOR_EACH_BUCKET(cur_bkt, bkt) {
-		ret = search_one_bucket(h, key, short_sig, data, cur_bkt);
+		/* Check if key is in primary location */
+		bkt = &h->buckets[prim_bucket_idx];
+		ret = search_one_bucket(h, key, short_sig, data, bkt);
 		if (ret != -1) {
 			__hash_rw_reader_unlock(h);
 			return ret;
 		}
-	}
+		/* Calculate secondary hash */
+		bkt = &h->buckets[sec_bucket_idx];
+
+		/* Check if key is in secondary location */
+		FOR_EACH_BUCKET(cur_bkt, bkt) {
+			ret = search_one_bucket(h, key, short_sig,
+						data, cur_bkt);
+			if (ret != -1) {
+				__hash_rw_reader_unlock(h);
+				return ret;
+			}
+		}
+
+		/* The loads of sig_current in search_one_bucket
+		 * should not move below the load from tbl_chng_cnt.
+		 */
+		__atomic_thread_fence(__ATOMIC_ACQUIRE);
+		/* Re-read the table change counter to check if the
+		 * table has changed during search. If yes, re-do
+		 * the search.
+		 * This load should not get hoisted. The load
+		 * acquires on cnt_b, key index in primary bucket
+		 * and key index in secondary bucket will make sure
+		 * that it does not get hoisted.
+		 */
+		cnt_a = __atomic_load_n(h->tbl_chng_cnt,
+					__ATOMIC_ACQUIRE);
+	} while (cnt_b != cnt_a);
+
 	__hash_rw_reader_unlock(h);
+
 	return -ENOENT;
 }
 
@@ -1173,21 +1309,25 @@ __rte_hash_compact_ll(struct rte_hash_bucket *cur_bkt, int pos) {
 	}
 }
 
-/* Search one bucket and remove the matched key */
+/* Search one bucket and remove the matched key.
+ * Writer is expected to hold the lock while calling this
+ * function.
+ */
 static inline int32_t
 search_and_remove(const struct rte_hash *h, const void *key,
 			struct rte_hash_bucket *bkt, uint16_t sig, int *pos)
 {
 	struct rte_hash_key *k, *keys = h->key_store;
 	unsigned int i;
-	int32_t ret;
+	uint32_t key_idx;
 
 	/* Check if key is in bucket */
 	for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
-		if (bkt->sig_current[i] == sig &&
-				bkt->key_idx[i] != EMPTY_SLOT) {
+		key_idx = __atomic_load_n(&bkt->key_idx[i],
+					  __ATOMIC_ACQUIRE);
+		if (bkt->sig_current[i] == sig && key_idx != EMPTY_SLOT) {
 			k = (struct rte_hash_key *) ((char *)keys +
-					bkt->key_idx[i] * h->key_entry_size);
+					key_idx * h->key_entry_size);
 			if (rte_hash_cmp_eq(key, k->key, h) == 0) {
 				bkt->sig_current[i] = NULL_SIGNATURE;
 				/* Free the key store index if
@@ -1196,13 +1336,16 @@ search_and_remove(const struct rte_hash *h, const void *key,
 				if (!h->no_free_on_del)
 					remove_entry(h, bkt, i);
 
-				/* Return index where key is stored,
+				__atomic_store_n(&bkt->key_idx[i],
+						 EMPTY_SLOT,
+						 __ATOMIC_RELEASE);
+
+				*pos = i;
+				/*
+				 * Return index where key is stored,
 				 * subtracting the first dummy index
 				 */
-				ret = bkt->key_idx[i] - 1;
-				bkt->key_idx[i] = EMPTY_SLOT;
-				*pos = i;
-				return ret;
+				return key_idx - 1;
 			}
 		}
 	}
@@ -1402,6 +1545,8 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
 	uint32_t prim_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0};
 	uint32_t sec_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0};
 	struct rte_hash_bucket *cur_bkt, *next_bkt;
+	void *pdata[RTE_HASH_LOOKUP_BULK_MAX];
+	uint32_t cnt_b, cnt_a;
 
 	/* Prefetch first keys */
 	for (i = 0; i < PREFETCH_OFFSET && i < num_keys; i++)
@@ -1443,91 +1588,138 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
 	}
 
 	__hash_rw_reader_lock(h);
-	/* Compare signatures and prefetch key slot of first hit */
-	for (i = 0; i < num_keys; i++) {
-		compare_signatures(&prim_hitmask[i], &sec_hitmask[i],
+	do {
+		/* Load the table change counter before the lookup
+		 * starts. Acquire semantics will make sure that
+		 * loads in compare_signatures are not hoisted.
+		 */
+		cnt_b = __atomic_load_n(h->tbl_chng_cnt,
+					__ATOMIC_ACQUIRE);
+
+		/* Compare signatures and prefetch key slot of first hit */
+		for (i = 0; i < num_keys; i++) {
+			compare_signatures(&prim_hitmask[i], &sec_hitmask[i],
 				primary_bkt[i], secondary_bkt[i],
 				sig[i], h->sig_cmp_fn);
 
-		if (prim_hitmask[i]) {
-			uint32_t first_hit =
-					__builtin_ctzl(prim_hitmask[i]) >> 1;
-			uint32_t key_idx = primary_bkt[i]->key_idx[first_hit];
-			const struct rte_hash_key *key_slot =
-				(const struct rte_hash_key *)(
-				(const char *)h->key_store +
-				key_idx * h->key_entry_size);
-			rte_prefetch0(key_slot);
-			continue;
-		}
-
-		if (sec_hitmask[i]) {
-			uint32_t first_hit =
-					__builtin_ctzl(sec_hitmask[i]) >> 1;
-			uint32_t key_idx = secondary_bkt[i]->key_idx[first_hit];
-			const struct rte_hash_key *key_slot =
-				(const struct rte_hash_key *)(
-				(const char *)h->key_store +
-				key_idx * h->key_entry_size);
-			rte_prefetch0(key_slot);
-		}
-	}
-
-	/* Compare keys, first hits in primary first */
-	for (i = 0; i < num_keys; i++) {
-		positions[i] = -ENOENT;
-		while (prim_hitmask[i]) {
-			uint32_t hit_index =
-					__builtin_ctzl(prim_hitmask[i]) >> 1;
-
-			uint32_t key_idx = primary_bkt[i]->key_idx[hit_index];
-			const struct rte_hash_key *key_slot =
-				(const struct rte_hash_key *)(
-				(const char *)h->key_store +
-				key_idx * h->key_entry_size);
-			/*
-			 * If key index is 0, do not compare key,
-			 * as it is checking the dummy slot
-			 */
-			if (!!key_idx & !rte_hash_cmp_eq(key_slot->key, keys[i], h)) {
-				if (data != NULL)
-					data[i] = key_slot->pdata;
+			if (prim_hitmask[i]) {
+				uint32_t first_hit =
+						__builtin_ctzl(prim_hitmask[i])
+						>> 1;
+				uint32_t key_idx =
+					primary_bkt[i]->key_idx[first_hit];
+				const struct rte_hash_key *key_slot =
+					(const struct rte_hash_key *)(
+					(const char *)h->key_store +
+					key_idx * h->key_entry_size);
+				rte_prefetch0(key_slot);
+				continue;
+			}
 
-				hits |= 1ULL << i;
-				positions[i] = key_idx - 1;
-				goto next_key;
+			if (sec_hitmask[i]) {
+				uint32_t first_hit =
+						__builtin_ctzl(sec_hitmask[i])
+						>> 1;
+				uint32_t key_idx =
+					secondary_bkt[i]->key_idx[first_hit];
+				const struct rte_hash_key *key_slot =
+					(const struct rte_hash_key *)(
+					(const char *)h->key_store +
+					key_idx * h->key_entry_size);
+				rte_prefetch0(key_slot);
 			}
-			prim_hitmask[i] &= ~(3ULL << (hit_index << 1));
 		}
 
-		while (sec_hitmask[i]) {
-			uint32_t hit_index =
-					__builtin_ctzl(sec_hitmask[i]) >> 1;
-
-			uint32_t key_idx = secondary_bkt[i]->key_idx[hit_index];
-			const struct rte_hash_key *key_slot =
-				(const struct rte_hash_key *)(
-				(const char *)h->key_store +
-				key_idx * h->key_entry_size);
-			/*
-			 * If key index is 0, do not compare key,
-			 * as it is checking the dummy slot
-			 */
+		/* Compare keys, first hits in primary first */
+		for (i = 0; i < num_keys; i++) {
+			positions[i] = -ENOENT;
+			while (prim_hitmask[i]) {
+				uint32_t hit_index =
+						__builtin_ctzl(prim_hitmask[i])
+						>> 1;
+				uint32_t key_idx =
+				__atomic_load_n(
+					&primary_bkt[i]->key_idx[hit_index],
+					__ATOMIC_ACQUIRE);
+				const struct rte_hash_key *key_slot =
+					(const struct rte_hash_key *)(
+					(const char *)h->key_store +
+					key_idx * h->key_entry_size);
+
+				if (key_idx != EMPTY_SLOT)
+					pdata[i] = __atomic_load_n(
+							&key_slot->pdata,
+							__ATOMIC_ACQUIRE);
+				/*
+				 * If key index is 0, do not compare key,
+				 * as it is checking the dummy slot
+				 */
+				if (!!key_idx &
+					!rte_hash_cmp_eq(
+						key_slot->key, keys[i], h)) {
+					if (data != NULL)
+						data[i] = pdata[i];
+
+					hits |= 1ULL << i;
+					positions[i] = key_idx - 1;
+					goto next_key;
+				}
+				prim_hitmask[i] &= ~(3ULL << (hit_index << 1));
+			}
 
-			if (!!key_idx & !rte_hash_cmp_eq(key_slot->key, keys[i], h)) {
-				if (data != NULL)
-					data[i] = key_slot->pdata;
+			while (sec_hitmask[i]) {
+				uint32_t hit_index =
+						__builtin_ctzl(sec_hitmask[i])
+						>> 1;
+				uint32_t key_idx =
+				__atomic_load_n(
+					&secondary_bkt[i]->key_idx[hit_index],
+					__ATOMIC_ACQUIRE);
+				const struct rte_hash_key *key_slot =
+					(const struct rte_hash_key *)(
+					(const char *)h->key_store +
+					key_idx * h->key_entry_size);
+
+				if (key_idx != EMPTY_SLOT)
+					pdata[i] = __atomic_load_n(
+							&key_slot->pdata,
+							__ATOMIC_ACQUIRE);
+				/*
+				 * If key index is 0, do not compare key,
+				 * as it is checking the dummy slot
+				 */
 
-				hits |= 1ULL << i;
-				positions[i] = key_idx - 1;
-				goto next_key;
+				if (!!key_idx &
+					!rte_hash_cmp_eq(
+						key_slot->key, keys[i], h)) {
+					if (data != NULL)
+						data[i] = pdata[i];
+
+					hits |= 1ULL << i;
+					positions[i] = key_idx - 1;
+					goto next_key;
+				}
+				sec_hitmask[i] &= ~(3ULL << (hit_index << 1));
 			}
-			sec_hitmask[i] &= ~(3ULL << (hit_index << 1));
+next_key:
+			continue;
 		}
 
-next_key:
-		continue;
-	}
+		/* The loads of sig_current in compare_signatures
+		 * should not move below the load from tbl_chng_cnt.
+		 */
+		__atomic_thread_fence(__ATOMIC_ACQUIRE);
+		/* Re-read the table change counter to check if the
+		 * table has changed during search. If yes, re-do
+		 * the search.
+		 * This load should not get hoisted. The load
+		 * acquires on cnt_b, primary key index and secondary
+		 * key index will make sure that it does not get
+		 * hoisted.
+		 */
+		cnt_a = __atomic_load_n(h->tbl_chng_cnt,
+					__ATOMIC_ACQUIRE);
+	} while (cnt_b != cnt_a);
 
 	/* all found, do not need to go through ext bkt */
 	if ((hits == ((1ULL << num_keys) - 1)) || !h->ext_table_support) {
@@ -1612,7 +1804,8 @@ rte_hash_iterate(const struct rte_hash *h, const void **key, void **data, uint32
 	idx = *next % RTE_HASH_BUCKET_ENTRIES;
 
 	/* If current position is empty, go to the next one */
-	while ((position = h->buckets[bucket_idx].key_idx[idx]) == EMPTY_SLOT) {
+	while ((position = __atomic_load_n(&h->buckets[bucket_idx].key_idx[idx],
+					__ATOMIC_ACQUIRE)) == EMPTY_SLOT) {
 		(*next)++;
 		/* End of table */
 		if (*next == total_entries)
diff --git a/lib/librte_hash/rte_cuckoo_hash.h b/lib/librte_hash/rte_cuckoo_hash.h
index 601b2ce..5dfbbc4 100644
--- a/lib/librte_hash/rte_cuckoo_hash.h
+++ b/lib/librte_hash/rte_cuckoo_hash.h
@@ -1,5 +1,6 @@
 /* SPDX-License-Identifier: BSD-3-Clause
  * Copyright(c) 2016 Intel Corporation
+ * Copyright(c) 2018 Arm Limited
  */
 
 /* rte_cuckoo_hash.h
@@ -174,6 +175,8 @@ struct rte_hash {
 	 * free the key index associated with the deleted entry.
 	 * This flag is enabled by default.
 	 */
+	uint8_t readwrite_concur_lf_support;
+	/**< If read-write concurrency lock free support is enabled */
 	uint8_t writer_takes_lock;
 	/**< Indicates if the writer threads need to take lock */
 	rte_hash_function hash_func;    /**< Function used to calculate hash. */
@@ -196,6 +199,8 @@ struct rte_hash {
 	rte_rwlock_t *readwrite_lock; /**< Read-write lock thread-safety. */
 	struct rte_hash_bucket *buckets_ext; /**< Extra buckets array */
 	struct rte_ring *free_ext_bkts; /**< Ring of indexes of free buckets */
+	uint32_t *tbl_chng_cnt;
+	/**< Indicates if the hash table changed from last read. */
 } __rte_cache_aligned;
 
 struct queue_node {
diff --git a/lib/librte_hash/rte_hash.h b/lib/librte_hash/rte_hash.h
index dfa542b..c93d1a1 100644
--- a/lib/librte_hash/rte_hash.h
+++ b/lib/librte_hash/rte_hash.h
@@ -44,9 +44,18 @@ extern "C" {
 
 /** Flag to disable freeing of key index on hash delete.
  * Refer to rte_hash_del_xxx APIs for more details.
+ * This is enabled by default when RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF
+ * is enabled.
  */
 #define RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL 0x10
 
+/** Flag to support lock free reader writer concurrency. Both single writer
+ * and multi writer use cases are supported.
+ * Currently, extendable bucket table feature is not supported with
+ * this feature.
+ */
+#define RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF 0x20
+
 /**
  * The type of hash value of a key.
  * It should be a value of at least 32bit with fully random pattern.
@@ -132,7 +141,12 @@ void
 rte_hash_free(struct rte_hash *h);
 
 /**
- * Reset all hash structure, by zeroing all entries
+ * Reset all hash structure, by zeroing all entries.
+ * When RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
+ * it is application's responsibility to make sure that
+ * none of the readers are referencing the hash table
+ * while calling this API.
+ *
  * @param h
  *   Hash table to reset
  */
@@ -156,6 +170,11 @@ rte_hash_count(const struct rte_hash *h);
  * and should only be called from one thread by default.
  * Thread safety can be enabled by setting flag during
  * table creation.
+ * If the key exists already in the table, this API updates its value
+ * with 'data' passed in this API. It is the responsibility of
+ * the application to manage any memory associated with the old value.
+ * The readers might still be using the old value even after this API
+ * has returned.
  *
  * @param h
  *   Hash table to add the key to.
@@ -178,6 +197,11 @@ rte_hash_add_key_data(const struct rte_hash *h, const void *key, void *data);
  * and should only be called from one thread by default.
  * Thread safety can be enabled by setting flag during
  * table creation.
+ * If the key exists already in the table, this API updates its value
+ * with 'data' passed in this API. It is the responsibility of
+ * the application to manage any memory associated with the old value.
+ * The readers might still be using the old value even after this API
+ * has returned.
  *
  * @param h
  *   Hash table to add the key to.
@@ -243,10 +267,14 @@ rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, hash_sig_t
  * and should only be called from one thread by default.
  * Thread safety can be enabled by setting flag during
  * table creation.
- * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL is enabled,
+ * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
+ * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
  * the key index returned by rte_hash_add_key_xxx APIs will not be
  * freed by this API. rte_hash_free_key_with_position API must be called
  * additionally to free the index associated with the key.
+ * rte_hash_free_key_with_position API should be called after all
+ * the readers have stopped referencing the entry corresponding to
+ * this key. RCU mechanisms could be used to determine such a state.
  *
  * @param h
  *   Hash table to remove the key from.
@@ -268,10 +296,14 @@ rte_hash_del_key(const struct rte_hash *h, const void *key);
  * and should only be called from one thread by default.
  * Thread safety can be enabled by setting flag during
  * table creation.
- * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL is enabled,
+ * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
+ * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
  * the key index returned by rte_hash_add_key_xxx APIs will not be
  * freed by this API. rte_hash_free_key_with_position API must be called
  * additionally to free the index associated with the key.
+ * rte_hash_free_key_with_position API should be called after all
+ * the readers have stopped referencing the entry corresponding to
+ * this key. RCU mechanisms could be used to determine such a state.
  *
  * @param h
  *   Hash table to remove the key from.
@@ -318,9 +350,12 @@ rte_hash_get_key_with_position(const struct rte_hash *h, const int32_t position,
  * of the key. This operation is not multi-thread safe and should
  * only be called from one thread by default. Thread safety
  * can be enabled by setting flag during table creation.
- * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL is enabled,
- * this API must be called, with the key index returned by rte_hash_add_key_xxx
- * APIs, after the key is deleted using rte_hash_del_key_xxx APIs.
+ * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
+ * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
+ * the key index returned by rte_hash_del_key_xxx APIs must be freed
+ * using this API. This API should be called after all the readers
+ * have stopped referencing the entry corresponding to this key.
+ * RCU mechanisms could be used to determine such a state.
  * This API does not validate if the key is already freed.
  *
  * @param h
diff --git a/lib/librte_hash/rte_hash_version.map b/lib/librte_hash/rte_hash_version.map
index e216ac8..734ae28 100644
--- a/lib/librte_hash/rte_hash_version.map
+++ b/lib/librte_hash/rte_hash_version.map
@@ -53,3 +53,10 @@ DPDK_18.08 {
 	rte_hash_count;
 
 } DPDK_16.07;
+
+EXPERIMENTAL {
+	global:
+
+	rte_hash_free_key_with_position;
+
+};
-- 
2.7.4



More information about the dev mailing list