[dpdk-dev] [PATCH v3 5/7] hash: fix rw concurrency while moving keys

Honnappa Nagarahalli honnappa.nagarahalli at arm.com
Fri Oct 12 08:31:56 CEST 2018


A reader-writer concurrency issue, caused by moving keys
to their alternative locations during key insert, is solved
by introducing a global counter (tbl_chng_cnt) that indicates
a change in the table.

Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli at arm.com>
Reviewed-by: Gavin Hu <gavin.hu at arm.com>
Reviewed-by: Ola Liljedahl <ola.liljedahl at arm.com>
Reviewed-by: Steve Capper <steve.capper at arm.com>
Reviewed-by: Yipeng Wang <yipeng1.wang at intel.com>
---
 lib/librte_hash/rte_cuckoo_hash.c | 305 +++++++++++++++++++++++++-------------
 lib/librte_hash/rte_cuckoo_hash.h |   3 +
 2 files changed, 208 insertions(+), 100 deletions(-)

diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c
index e55725a..262162c 100644
--- a/lib/librte_hash/rte_cuckoo_hash.c
+++ b/lib/librte_hash/rte_cuckoo_hash.c
@@ -142,6 +142,7 @@ rte_hash_create(const struct rte_hash_parameters *params)
 	unsigned int readwrite_concur_support = 0;
 	unsigned int writer_takes_lock = 0;
 	unsigned int recycle_on_del = 1;
+	uint32_t *tbl_chng_cnt = NULL;
 
 	rte_hash_function default_hash_func = (rte_hash_function)rte_jhash;
 
@@ -293,6 +294,14 @@ rte_hash_create(const struct rte_hash_parameters *params)
 		goto err_unlock;
 	}
 
+	tbl_chng_cnt = rte_zmalloc_socket(NULL, sizeof(uint32_t),
+			RTE_CACHE_LINE_SIZE, params->socket_id);
+
+	if (tbl_chng_cnt == NULL) {
+		RTE_LOG(ERR, HASH, "memory allocation failed\n");
+		goto err_unlock;
+	}
+
 /*
  * If x86 architecture is used, select appropriate compare function,
  * which may use x86 intrinsics, otherwise use memcmp
@@ -361,6 +370,8 @@ rte_hash_create(const struct rte_hash_parameters *params)
 		default_hash_func : params->hash_func;
 	h->key_store = k;
 	h->free_slots = r;
+	h->tbl_chng_cnt = tbl_chng_cnt;
+	*h->tbl_chng_cnt = 0;
 	h->hw_trans_mem_support = hw_trans_mem_support;
 	h->multi_writer_support = multi_writer_support;
 	h->readwrite_concur_support = readwrite_concur_support;
@@ -407,6 +418,7 @@ rte_hash_create(const struct rte_hash_parameters *params)
 	rte_free(buckets);
 	rte_free(buckets_ext);
 	rte_free(k);
+	rte_free(tbl_chng_cnt);
 	return NULL;
 }
 
@@ -447,6 +459,7 @@ rte_hash_free(struct rte_hash *h)
 	rte_free(h->key_store);
 	rte_free(h->buckets);
 	rte_free(h->buckets_ext);
+	rte_free(h->tbl_chng_cnt);
 	rte_free(h);
 	rte_free(te);
 }
@@ -531,6 +544,7 @@ rte_hash_reset(struct rte_hash *h)
 	__hash_rw_writer_lock(h);
 	memset(h->buckets, 0, h->num_buckets * sizeof(struct rte_hash_bucket));
 	memset(h->key_store, 0, h->key_entry_size * (h->entries + 1));
+	*h->tbl_chng_cnt = 0;
 
 	/* clear the free ring */
 	while (rte_ring_dequeue(h->free_slots, &ptr) == 0)
@@ -744,11 +758,27 @@ rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h,
 		if (unlikely(&h->buckets[prev_alt_bkt_idx]
 				!= curr_bkt)) {
 			/* revert it to empty, otherwise duplicated keys */
-			curr_bkt->key_idx[curr_slot] = EMPTY_SLOT;
+			__atomic_store_n(&curr_bkt->key_idx[curr_slot],
+				EMPTY_SLOT,
+				__ATOMIC_RELEASE);
 			__hash_rw_writer_unlock(h);
 			return -1;
 		}
 
+		/* Inform the previous move. The current move need
+		 * not be informed now as the current bucket entry
+		 * is present in both primary and secondary.
+		 * Since there is one writer, load acquires on
+		 * tbl_chng_cnt are not required.
+		 */
+		__atomic_store_n(h->tbl_chng_cnt,
+				 *h->tbl_chng_cnt + 1,
+				 __ATOMIC_RELEASE);
+		/* The stores to sig_alt and sig_current should not
+		 * move above the store to tbl_chng_cnt.
+		 */
+		__atomic_thread_fence(__ATOMIC_RELEASE);
+
 		/* Need to swap current/alt sig to allow later
 		 * Cuckoo insert to move elements back to its
 		 * primary bucket if available
@@ -765,6 +795,20 @@ rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h,
 		curr_bkt = curr_node->bkt;
 	}
 
+	/* Inform the previous move. The current move need
+	 * not be informed now as the current bucket entry
+	 * is present in both primary and secondary.
+	 * Since there is one writer, load acquires on
+	 * tbl_chng_cnt are not required.
+	 */
+	__atomic_store_n(h->tbl_chng_cnt,
+			 *h->tbl_chng_cnt + 1,
+			 __ATOMIC_RELEASE);
+	/* The stores to sig_alt and sig_current should not
+	 * move above the store to tbl_chng_cnt.
+	 */
+	__atomic_thread_fence(__ATOMIC_RELEASE);
+
 	curr_bkt->sig_current[curr_slot] = sig;
 	/* Release the new bucket entry */
 	__atomic_store_n(&curr_bkt->key_idx[curr_slot],
@@ -1095,34 +1139,62 @@ __rte_hash_lookup_with_hash(const struct rte_hash *h, const void *key,
 {
 	uint32_t prim_bucket_idx, sec_bucket_idx;
 	struct rte_hash_bucket *bkt, *cur_bkt;
+	uint32_t cnt_b, cnt_a;
 	int ret;
 	uint16_t short_sig;
 
 	short_sig = get_short_sig(sig);
 	prim_bucket_idx = get_prim_bucket_index(h, sig);
 	sec_bucket_idx = get_alt_bucket_index(h, prim_bucket_idx, short_sig);
-	bkt = &h->buckets[prim_bucket_idx];
 
 	__hash_rw_reader_lock(h);
 
-	/* Check if key is in primary location */
-	ret = search_one_bucket(h, key, short_sig, data, bkt);
-	if (ret != -1) {
-		__hash_rw_reader_unlock(h);
-		return ret;
-	}
-	/* Calculate secondary hash */
-	bkt = &h->buckets[sec_bucket_idx];
+	do {
+		/* Load the table change counter before the lookup
+		 * starts. Acquire semantics will make sure that
+		 * loads in search_one_bucket are not hoisted.
+		 */
+		cnt_b = __atomic_load_n(h->tbl_chng_cnt,
+				__ATOMIC_ACQUIRE);
 
-	/* Check if key is in secondary location */
-	FOR_EACH_BUCKET(cur_bkt, bkt) {
-		ret = search_one_bucket(h, key, short_sig, data, cur_bkt);
+		/* Check if key is in primary location */
+		bkt = &h->buckets[prim_bucket_idx];
+		ret = search_one_bucket(h, key, short_sig, data, bkt);
 		if (ret != -1) {
 			__hash_rw_reader_unlock(h);
 			return ret;
 		}
-	}
+		/* Calculate secondary hash */
+		bkt = &h->buckets[sec_bucket_idx];
+
+		/* Check if key is in secondary location */
+		FOR_EACH_BUCKET(cur_bkt, bkt) {
+			ret = search_one_bucket(h, key, short_sig,
+						data, cur_bkt);
+			if (ret != -1) {
+				__hash_rw_reader_unlock(h);
+				return ret;
+			}
+		}
+
+		/* The loads of sig_current in search_one_bucket
+		 * should not move below the load from tbl_chng_cnt.
+		 */
+		__atomic_thread_fence(__ATOMIC_ACQUIRE);
+		/* Re-read the table change counter to check if the
+		 * table has changed during search. If yes, re-do
+		 * the search.
+		 * This load should not get hoisted. The load
+		 * acquires on cnt_b, key index in primary bucket
+		 * and key index in secondary bucket will make sure
+		 * that it does not get hoisted.
+		 */
+		cnt_a = __atomic_load_n(h->tbl_chng_cnt,
+					__ATOMIC_ACQUIRE);
+	} while (cnt_b != cnt_a);
+
 	__hash_rw_reader_unlock(h);
+
 	return -ENOENT;
 }
 
@@ -1444,6 +1516,7 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
 	uint32_t sec_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0};
 	struct rte_hash_bucket *cur_bkt, *next_bkt;
 	void *pdata[RTE_HASH_LOOKUP_BULK_MAX];
+	uint32_t cnt_b, cnt_a;
 
 	/* Prefetch first keys */
 	for (i = 0; i < PREFETCH_OFFSET && i < num_keys; i++)
@@ -1485,106 +1558,138 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
 	}
 
 	__hash_rw_reader_lock(h);
-	/* Compare signatures and prefetch key slot of first hit */
-	for (i = 0; i < num_keys; i++) {
-		compare_signatures(&prim_hitmask[i], &sec_hitmask[i],
+	do {
+		/* Load the table change counter before the lookup
+		 * starts. Acquire semantics will make sure that
+		 * loads in compare_signatures are not hoisted.
+		 */
+		cnt_b = __atomic_load_n(h->tbl_chng_cnt,
+					__ATOMIC_ACQUIRE);
+
+		/* Compare signatures and prefetch key slot of first hit */
+		for (i = 0; i < num_keys; i++) {
+			compare_signatures(&prim_hitmask[i], &sec_hitmask[i],
 				primary_bkt[i], secondary_bkt[i],
 				sig[i], h->sig_cmp_fn);
 
-		if (prim_hitmask[i]) {
-			uint32_t first_hit =
-					__builtin_ctzl(prim_hitmask[i]) >> 1;
-			uint32_t key_idx = primary_bkt[i]->key_idx[first_hit];
-			const struct rte_hash_key *key_slot =
-				(const struct rte_hash_key *)(
-				(const char *)h->key_store +
-				key_idx * h->key_entry_size);
-			rte_prefetch0(key_slot);
-			continue;
-		}
+			if (prim_hitmask[i]) {
+				uint32_t first_hit =
+						__builtin_ctzl(prim_hitmask[i])
+						>> 1;
+				uint32_t key_idx =
+					primary_bkt[i]->key_idx[first_hit];
+				const struct rte_hash_key *key_slot =
+					(const struct rte_hash_key *)(
+					(const char *)h->key_store +
+					key_idx * h->key_entry_size);
+				rte_prefetch0(key_slot);
+				continue;
+			}
 
-		if (sec_hitmask[i]) {
-			uint32_t first_hit =
-					__builtin_ctzl(sec_hitmask[i]) >> 1;
-			uint32_t key_idx = secondary_bkt[i]->key_idx[first_hit];
-			const struct rte_hash_key *key_slot =
-				(const struct rte_hash_key *)(
-				(const char *)h->key_store +
-				key_idx * h->key_entry_size);
-			rte_prefetch0(key_slot);
+			if (sec_hitmask[i]) {
+				uint32_t first_hit =
+						__builtin_ctzl(sec_hitmask[i])
+						>> 1;
+				uint32_t key_idx =
+					secondary_bkt[i]->key_idx[first_hit];
+				const struct rte_hash_key *key_slot =
+					(const struct rte_hash_key *)(
+					(const char *)h->key_store +
+					key_idx * h->key_entry_size);
+				rte_prefetch0(key_slot);
+			}
 		}
-	}
 
-	/* Compare keys, first hits in primary first */
-	for (i = 0; i < num_keys; i++) {
-		positions[i] = -ENOENT;
-		while (prim_hitmask[i]) {
-			uint32_t hit_index =
-					__builtin_ctzl(prim_hitmask[i]) >> 1;
-
-			uint32_t key_idx =
-			__atomic_load_n(
-				&primary_bkt[i]->key_idx[hit_index],
-				__ATOMIC_ACQUIRE);
-			const struct rte_hash_key *key_slot =
-				(const struct rte_hash_key *)(
-				(const char *)h->key_store +
-				key_idx * h->key_entry_size);
-
-			if (key_idx != EMPTY_SLOT)
-				pdata[i] = __atomic_load_n(&key_slot->pdata,
-						__ATOMIC_ACQUIRE);
-			/*
-			 * If key index is 0, do not compare key,
-			 * as it is checking the dummy slot
-			 */
-			if (!!key_idx & !rte_hash_cmp_eq(key_slot->key, keys[i], h)) {
-				if (data != NULL)
-					data[i] = pdata[i];
+		/* Compare keys, first hits in primary first */
+		for (i = 0; i < num_keys; i++) {
+			positions[i] = -ENOENT;
+			while (prim_hitmask[i]) {
+				uint32_t hit_index =
+						__builtin_ctzl(prim_hitmask[i])
+						>> 1;
+				uint32_t key_idx =
+				__atomic_load_n(
+					&primary_bkt[i]->key_idx[hit_index],
+					__ATOMIC_ACQUIRE);
+				const struct rte_hash_key *key_slot =
+					(const struct rte_hash_key *)(
+					(const char *)h->key_store +
+					key_idx * h->key_entry_size);
 
-				hits |= 1ULL << i;
-				positions[i] = key_idx - 1;
-				goto next_key;
+				if (key_idx != EMPTY_SLOT)
+					pdata[i] = __atomic_load_n(
+							&key_slot->pdata,
+							__ATOMIC_ACQUIRE);
+				/*
+				 * If key index is 0, do not compare key,
+				 * as it is checking the dummy slot
+				 */
+				if (!!key_idx &
+					!rte_hash_cmp_eq(
+						key_slot->key, keys[i], h)) {
+					if (data != NULL)
+						data[i] = pdata[i];
+
+					hits |= 1ULL << i;
+					positions[i] = key_idx - 1;
+					goto next_key;
+				}
+				prim_hitmask[i] &= ~(3ULL << (hit_index << 1));
 			}
-			prim_hitmask[i] &= ~(3ULL << (hit_index << 1));
-		}
-
-		while (sec_hitmask[i]) {
-			uint32_t hit_index =
-					__builtin_ctzl(sec_hitmask[i]) >> 1;
 
-			uint32_t key_idx =
-			__atomic_load_n(
-				&secondary_bkt[i]->key_idx[hit_index],
-				__ATOMIC_ACQUIRE);
-			const struct rte_hash_key *key_slot =
-				(const struct rte_hash_key *)(
-				(const char *)h->key_store +
-				key_idx * h->key_entry_size);
-
-			if (key_idx != EMPTY_SLOT)
-				pdata[i] = __atomic_load_n(&key_slot->pdata,
-						__ATOMIC_ACQUIRE);
-
-			/*
-			 * If key index is 0, do not compare key,
-			 * as it is checking the dummy slot
-			 */
+			while (sec_hitmask[i]) {
+				uint32_t hit_index =
+						__builtin_ctzl(sec_hitmask[i])
+						>> 1;
+				uint32_t key_idx =
+				__atomic_load_n(
+					&secondary_bkt[i]->key_idx[hit_index],
+					__ATOMIC_ACQUIRE);
+				const struct rte_hash_key *key_slot =
+					(const struct rte_hash_key *)(
+					(const char *)h->key_store +
+					key_idx * h->key_entry_size);
 
-			if (!!key_idx & !rte_hash_cmp_eq(key_slot->key, keys[i], h)) {
-				if (data != NULL)
-					data[i] = pdata[i];
+				if (key_idx != EMPTY_SLOT)
+					pdata[i] = __atomic_load_n(
+							&key_slot->pdata,
+							__ATOMIC_ACQUIRE);
+				/*
+				 * If key index is 0, do not compare key,
+				 * as it is checking the dummy slot
+				 */
 
-				hits |= 1ULL << i;
-				positions[i] = key_idx - 1;
-				goto next_key;
+				if (!!key_idx &
+					!rte_hash_cmp_eq(
+						key_slot->key, keys[i], h)) {
+					if (data != NULL)
+						data[i] = pdata[i];
+
+					hits |= 1ULL << i;
+					positions[i] = key_idx - 1;
+					goto next_key;
+				}
+				sec_hitmask[i] &= ~(3ULL << (hit_index << 1));
 			}
-			sec_hitmask[i] &= ~(3ULL << (hit_index << 1));
+next_key:
+			continue;
 		}
 
-next_key:
-		continue;
-	}
+		/* The loads of sig_current in compare_signatures
+		 * should not move below the load from tbl_chng_cnt.
+		 */
+		__atomic_thread_fence(__ATOMIC_ACQUIRE);
+		/* Re-read the table change counter to check if the
+		 * table has changed during search. If yes, re-do
+		 * the search.
+		 * This load should not get hoisted. The load
+		 * acquires on cnt_b, primary key index and secondary
+		 * key index will make sure that it does not get
+		 * hoisted.
+		 */
+		cnt_a = __atomic_load_n(h->tbl_chng_cnt,
+					__ATOMIC_ACQUIRE);
+	} while (cnt_b != cnt_a);
 
 	/* all found, do not need to go through ext bkt */
 	if ((hits == ((1ULL << num_keys) - 1)) || !h->ext_table_support) {
diff --git a/lib/librte_hash/rte_cuckoo_hash.h b/lib/librte_hash/rte_cuckoo_hash.h
index b823d37..728cd17 100644
--- a/lib/librte_hash/rte_cuckoo_hash.h
+++ b/lib/librte_hash/rte_cuckoo_hash.h
@@ -1,5 +1,6 @@
 /* SPDX-License-Identifier: BSD-3-Clause
  * Copyright(c) 2016 Intel Corporation
+ * Copyright(c) 2018 Arm Limited
  */
 
 /* rte_cuckoo_hash.h
@@ -196,6 +197,8 @@ struct rte_hash {
 	rte_rwlock_t *readwrite_lock; /**< Read-write lock thread-safety. */
 	struct rte_hash_bucket *buckets_ext; /**< Extra buckets array */
 	struct rte_ring *free_ext_bkts; /**< Ring of indexes of free buckets */
+	uint32_t *tbl_chng_cnt;
+	/**< Indicates if the hash table changed from last read. */
 } __rte_cache_aligned;
 
 struct queue_node {
-- 
2.7.4



More information about the dev mailing list