[dpdk-dev] [PATCH 3/4] hash: fix rw concurrency while moving keys

Honnappa Nagarahalli honnappa.nagarahalli at arm.com
Thu Sep 6 19:12:17 CEST 2018


Reader-writer concurrency issue, caused by moving the keys
to their alternative locations during key insert, is solved
by introducing a global counter(tbl_chng_cnt) indicating a
change in table.

Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli at arm.com>
Reviewed-by: Gavin Hu <gavin.hu at arm.com>
Reviewed-by: Ola Liljedahl <ola.liljedahl at arm.com>
Reviewed-by: Steve Capper <steve.capper at arm.com>
---
 lib/librte_hash/rte_cuckoo_hash.c | 307 +++++++++++++++++++++++++-------------
 lib/librte_hash/rte_cuckoo_hash.h |   2 +
 lib/librte_hash/rte_hash.h        |   8 +-
 3 files changed, 206 insertions(+), 111 deletions(-)

diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c
index 2d89158..1e4a8d4 100644
--- a/lib/librte_hash/rte_cuckoo_hash.c
+++ b/lib/librte_hash/rte_cuckoo_hash.c
@@ -256,6 +256,7 @@ rte_hash_create(const struct rte_hash_parameters *params)
 #endif
 	/* Setup hash context */
 	snprintf(h->name, sizeof(h->name), "%s", params->name);
+	h->tbl_chng_cnt = 0;
 	h->entries = params->entries;
 	h->key_len = params->key_len;
 	h->key_entry_size = key_entry_size;
@@ -588,7 +589,7 @@ rte_hash_cuckoo_insert_mw(const struct rte_hash *h,
  * return 0 if succeeds.
  */
 static inline int
-rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h,
+rte_hash_cuckoo_move_insert_mw(struct rte_hash *h,
 			struct rte_hash_bucket *bkt,
 			struct rte_hash_bucket *alt_bkt,
 			const struct rte_hash_key *key, void *data,
@@ -639,11 +640,27 @@ rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h,
 		if (unlikely(&h->buckets[prev_alt_bkt_idx]
 				!= curr_bkt)) {
 			/* revert it to empty, otherwise duplicated keys */
-			curr_bkt->key_idx[curr_slot] = EMPTY_SLOT;
+			__atomic_store_n(&curr_bkt->key_idx[curr_slot],
+				EMPTY_SLOT,
+				__ATOMIC_RELEASE);
 			__hash_rw_writer_unlock(h);
 			return -1;
 		}
 
+		/* Inform the previous move. The current move need
+		 * not be informed now as the current bucket entry
+		 * is present in both primary and secondary.
+		 * Since there is one writer, load acquires on
+		 * tbl_chng_cnt are not required.
+		 */
+		__atomic_store_n(&h->tbl_chng_cnt,
+				 h->tbl_chng_cnt + 1,
+				 __ATOMIC_RELEASE);
+		/* The stores to sig_alt and sig_current should not
+		 * move above the store to tbl_chng_cnt.
+		 */
+		__atomic_thread_fence(__ATOMIC_RELEASE);
+
 		/* Need to swap current/alt sig to allow later
 		 * Cuckoo insert to move elements back to its
 		 * primary bucket if available
@@ -662,6 +679,20 @@ rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h,
 		curr_bkt = curr_node->bkt;
 	}
 
+	/* Inform the previous move. The current move need
+	 * not be informed now as the current bucket entry
+	 * is present in both primary and secondary.
+	 * Since there is one writer, load acquires on
+	 * tbl_chng_cnt are not required.
+	 */
+	__atomic_store_n(&h->tbl_chng_cnt,
+			 h->tbl_chng_cnt + 1,
+			 __ATOMIC_RELEASE);
+	/* The stores to sig_alt and sig_current should not
+	 * move above the store to tbl_chng_cnt.
+	 */
+	__atomic_thread_fence(__ATOMIC_RELEASE);
+
 	curr_bkt->sig_current[curr_slot] = sig;
 	curr_bkt->sig_alt[curr_slot] = alt_hash;
 	/* Release the new bucket entry */
@@ -680,7 +711,7 @@ rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h,
  * Cuckoo
  */
 static inline int
-rte_hash_cuckoo_make_space_mw(const struct rte_hash *h,
+rte_hash_cuckoo_make_space_mw(struct rte_hash *h,
 			struct rte_hash_bucket *bkt,
 			struct rte_hash_bucket *sec_bkt,
 			const struct rte_hash_key *key, void *data,
@@ -728,7 +759,7 @@ rte_hash_cuckoo_make_space_mw(const struct rte_hash *h,
 }
 
 static inline int32_t
-__rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
+__rte_hash_add_key_with_hash(struct rte_hash *h, const void *key,
 						hash_sig_t sig, void *data)
 {
 	hash_sig_t alt_hash;
@@ -844,7 +875,7 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 }
 
 int32_t
-rte_hash_add_key_with_hash(const struct rte_hash *h,
+rte_hash_add_key_with_hash(struct rte_hash *h,
 			const void *key, hash_sig_t sig)
 {
 	RETURN_IF_TRUE(((h == NULL) || (key == NULL)), -EINVAL);
@@ -852,14 +883,14 @@ rte_hash_add_key_with_hash(const struct rte_hash *h,
 }
 
 int32_t
-rte_hash_add_key(const struct rte_hash *h, const void *key)
+rte_hash_add_key(struct rte_hash *h, const void *key)
 {
 	RETURN_IF_TRUE(((h == NULL) || (key == NULL)), -EINVAL);
 	return __rte_hash_add_key_with_hash(h, key, rte_hash_hash(h, key), 0);
 }
 
 int
-rte_hash_add_key_with_hash_data(const struct rte_hash *h,
+rte_hash_add_key_with_hash_data(struct rte_hash *h,
 			const void *key, hash_sig_t sig, void *data)
 {
 	int ret;
@@ -873,7 +904,7 @@ rte_hash_add_key_with_hash_data(const struct rte_hash *h,
 }
 
 int
-rte_hash_add_key_data(const struct rte_hash *h, const void *key, void *data)
+rte_hash_add_key_data(struct rte_hash *h, const void *key, void *data)
 {
 	int ret;
 
@@ -926,30 +957,56 @@ __rte_hash_lookup_with_hash(const struct rte_hash *h, const void *key,
 	uint32_t bucket_idx;
 	hash_sig_t alt_hash;
 	struct rte_hash_bucket *bkt;
+	uint32_t cnt_b, cnt_a;
 	int ret;
 
-	bucket_idx = sig & h->bucket_bitmask;
-	bkt = &h->buckets[bucket_idx];
-
 	__hash_rw_reader_lock(h);
 
-	/* Check if key is in primary location */
-	ret = search_one_bucket(h, key, sig, data, bkt);
-	if (ret != -1) {
-		__hash_rw_reader_unlock(h);
-		return ret;
-	}
-	/* Calculate secondary hash */
-	alt_hash = rte_hash_secondary_hash(sig);
-	bucket_idx = alt_hash & h->bucket_bitmask;
-	bkt = &h->buckets[bucket_idx];
+	do {
+		/* Load the table change counter before the lookup
+		 * starts. Acquire semantics will make sure that
+		 * loads in search_one_bucket are not hoisted.
+		 */
+		cnt_b = __atomic_load_n(&h->tbl_chng_cnt,
+				__ATOMIC_ACQUIRE);
+
+		bucket_idx = sig & h->bucket_bitmask;
+		bkt = &h->buckets[bucket_idx];
+
+		/* Check if key is in primary location */
+		ret = search_one_bucket(h, key, sig, data, bkt);
+		if (ret != -1) {
+			__hash_rw_reader_unlock(h);
+			return ret;
+		}
+		/* Calculate secondary hash */
+		alt_hash = rte_hash_secondary_hash(sig);
+		bucket_idx = alt_hash & h->bucket_bitmask;
+		bkt = &h->buckets[bucket_idx];
+
+		/* Check if key is in secondary location */
+		ret = search_one_bucket(h, key, alt_hash, data, bkt);
+		if (ret != -1) {
+			__hash_rw_reader_unlock(h);
+			return ret;
+		}
+
+		/* The loads of sig_current in search_one_bucket
+		 * should not move below the load from tbl_chng_cnt.
+		 */
+		__atomic_thread_fence(__ATOMIC_ACQUIRE);
+		/* Re-read the table change counter to check if the
+		 * table has changed during search. If yes, re-do
+		 * the search.
+		 * This load should not get hoisted. The load
+		 * acquires on cnt_b, key index in primary bucket
+		 * and key index in secondary bucket will make sure
+		 * that it does not get hoisted.
+		 */
+		cnt_a = __atomic_load_n(&h->tbl_chng_cnt,
+					__ATOMIC_ACQUIRE);
+	} while (cnt_b != cnt_a);
 
-	/* Check if key is in secondary location */
-	ret = search_one_bucket(h, key, alt_hash, data, bkt);
-	if (ret != -1) {
-		__hash_rw_reader_unlock(h);
-		return ret;
-	}
 	__hash_rw_reader_unlock(h);
 	return -ENOENT;
 }
@@ -1190,6 +1247,7 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
 	uint32_t prim_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0};
 	uint32_t sec_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0};
 	void *pdata[RTE_HASH_LOOKUP_BULK_MAX];
+	uint32_t cnt_b, cnt_a;
 
 	/* Prefetch first keys */
 	for (i = 0; i < PREFETCH_OFFSET && i < num_keys; i++)
@@ -1225,102 +1283,137 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
 	}
 
 	__hash_rw_reader_lock(h);
-	/* Compare signatures and prefetch key slot of first hit */
-	for (i = 0; i < num_keys; i++) {
-		compare_signatures(&prim_hitmask[i], &sec_hitmask[i],
+	do {
+		/* Load the table change counter before the lookup
+		 * starts. Acquire semantics will make sure that
+		 * loads in compare_signatures are not hoisted.
+		 */
+		cnt_b = __atomic_load_n(&h->tbl_chng_cnt,
+					__ATOMIC_ACQUIRE);
+
+		/* Compare signatures and prefetch key slot of first hit */
+		for (i = 0; i < num_keys; i++) {
+			compare_signatures(&prim_hitmask[i], &sec_hitmask[i],
 				primary_bkt[i], secondary_bkt[i],
 				prim_hash[i], sec_hash[i], h->sig_cmp_fn);
 
-		if (prim_hitmask[i]) {
-			uint32_t first_hit = __builtin_ctzl(prim_hitmask[i]);
-			uint32_t key_idx = primary_bkt[i]->key_idx[first_hit];
-			const struct rte_hash_key *key_slot =
-				(const struct rte_hash_key *)(
-				(const char *)h->key_store +
-				key_idx * h->key_entry_size);
-			rte_prefetch0(key_slot);
-			continue;
-		}
+			if (prim_hitmask[i]) {
+				uint32_t first_hit =
+						__builtin_ctzl(prim_hitmask[i]);
+				uint32_t key_idx =
+					primary_bkt[i]->key_idx[first_hit];
+				const struct rte_hash_key *key_slot =
+					(const struct rte_hash_key *)(
+					(const char *)h->key_store +
+					key_idx * h->key_entry_size);
+				rte_prefetch0(key_slot);
+				continue;
+			}
 
-		if (sec_hitmask[i]) {
-			uint32_t first_hit = __builtin_ctzl(sec_hitmask[i]);
-			uint32_t key_idx = secondary_bkt[i]->key_idx[first_hit];
-			const struct rte_hash_key *key_slot =
-				(const struct rte_hash_key *)(
-				(const char *)h->key_store +
-				key_idx * h->key_entry_size);
-			rte_prefetch0(key_slot);
+			if (sec_hitmask[i]) {
+				uint32_t first_hit =
+						__builtin_ctzl(sec_hitmask[i]);
+				uint32_t key_idx =
+					secondary_bkt[i]->key_idx[first_hit];
+				const struct rte_hash_key *key_slot =
+					(const struct rte_hash_key *)(
+					(const char *)h->key_store +
+					key_idx * h->key_entry_size);
+				rte_prefetch0(key_slot);
+			}
 		}
-	}
 
-	/* Compare keys, first hits in primary first */
-	for (i = 0; i < num_keys; i++) {
-		positions[i] = -ENOENT;
-		while (prim_hitmask[i]) {
-			uint32_t hit_index = __builtin_ctzl(prim_hitmask[i]);
+		/* Compare keys, first hits in primary first */
+		for (i = 0; i < num_keys; i++) {
+			positions[i] = -ENOENT;
+			while (prim_hitmask[i]) {
+				uint32_t hit_index =
+						__builtin_ctzl(prim_hitmask[i]);
 
-			uint32_t key_idx =
-			__atomic_load_n(
-				&primary_bkt[i]->key_idx[hit_index],
-				__ATOMIC_ACQUIRE);
-			const struct rte_hash_key *key_slot =
-				(const struct rte_hash_key *)(
-				(const char *)h->key_store +
-				key_idx * h->key_entry_size);
-
-			if (key_idx != EMPTY_SLOT)
-				pdata[i] = __atomic_load_n(&key_slot->pdata,
-						__ATOMIC_ACQUIRE);
-			/*
-			 * If key index is 0, do not compare key,
-			 * as it is checking the dummy slot
-			 */
-			if (!!key_idx & !rte_hash_cmp_eq(key_slot->key, keys[i], h)) {
-				if (data != NULL)
-					data[i] = pdata[i];
+				uint32_t key_idx =
+				__atomic_load_n(
+					&primary_bkt[i]->key_idx[hit_index],
+					__ATOMIC_ACQUIRE);
+				const struct rte_hash_key *key_slot =
+					(const struct rte_hash_key *)(
+					(const char *)h->key_store +
+					key_idx * h->key_entry_size);
 
-				hits |= 1ULL << i;
-				positions[i] = key_idx - 1;
-				goto next_key;
+				if (key_idx != EMPTY_SLOT)
+					pdata[i] = __atomic_load_n(
+							&key_slot->pdata,
+							__ATOMIC_ACQUIRE);
+				/*
+				 * If key index is 0, do not compare key,
+				 * as it is checking the dummy slot
+				 */
+				if (!!key_idx &
+					!rte_hash_cmp_eq(
+						key_slot->key, keys[i], h)) {
+					if (data != NULL)
+						data[i] = pdata[i];
+
+					hits |= 1ULL << i;
+					positions[i] = key_idx - 1;
+					goto next_key;
+				}
+				prim_hitmask[i] &= ~(1 << (hit_index));
 			}
-			prim_hitmask[i] &= ~(1 << (hit_index));
-		}
 
-		while (sec_hitmask[i]) {
-			uint32_t hit_index = __builtin_ctzl(sec_hitmask[i]);
+			while (sec_hitmask[i]) {
+				uint32_t hit_index =
+						__builtin_ctzl(sec_hitmask[i]);
 
-			uint32_t key_idx =
-			__atomic_load_n(
-				&secondary_bkt[i]->key_idx[hit_index],
-				__ATOMIC_ACQUIRE);
-			const struct rte_hash_key *key_slot =
-				(const struct rte_hash_key *)(
-				(const char *)h->key_store +
-				key_idx * h->key_entry_size);
-
-			if (key_idx != EMPTY_SLOT)
-				pdata[i] = __atomic_load_n(&key_slot->pdata,
-						__ATOMIC_ACQUIRE);
-
-			/*
-			 * If key index is 0, do not compare key,
-			 * as it is checking the dummy slot
-			 */
+				uint32_t key_idx =
+				__atomic_load_n(
+					&secondary_bkt[i]->key_idx[hit_index],
+					__ATOMIC_ACQUIRE);
+				const struct rte_hash_key *key_slot =
+					(const struct rte_hash_key *)(
+					(const char *)h->key_store +
+					key_idx * h->key_entry_size);
 
-			if (!!key_idx & !rte_hash_cmp_eq(key_slot->key, keys[i], h)) {
-				if (data != NULL)
-					data[i] = pdata[i];
+				if (key_idx != EMPTY_SLOT)
+					pdata[i] = __atomic_load_n(
+							&key_slot->pdata,
+							__ATOMIC_ACQUIRE);
+				/*
+				 * If key index is 0, do not compare key,
+				 * as it is checking the dummy slot
+				 */
 
-				hits |= 1ULL << i;
-				positions[i] = key_idx - 1;
-				goto next_key;
+				if (!!key_idx &
+					!rte_hash_cmp_eq(
+						key_slot->key, keys[i], h)) {
+					if (data != NULL)
+						data[i] = pdata[i];
+
+					hits |= 1ULL << i;
+					positions[i] = key_idx - 1;
+					goto next_key;
+				}
+				sec_hitmask[i] &= ~(1 << (hit_index));
 			}
-			sec_hitmask[i] &= ~(1 << (hit_index));
-		}
 
 next_key:
-		continue;
-	}
+			continue;
+		}
+
+		/* The loads of sig_current in compare_signatures
+		 * should not move below the load from tbl_chng_cnt.
+		 */
+		__atomic_thread_fence(__ATOMIC_ACQUIRE);
+		/* Re-read the table change counter to check if the
+		 * table has changed during search. If yes, re-do
+		 * the search.
+		 * This load should not get hoisted. The load
+		 * acquires on cnt_b, primary key index and secondary
+		 * key index will make sure that it does not get
+		 * hoisted.
+		 */
+		cnt_a = __atomic_load_n(&h->tbl_chng_cnt,
+					__ATOMIC_ACQUIRE);
+	} while (cnt_b != cnt_a);
 
 	__hash_rw_reader_unlock(h);
 
diff --git a/lib/librte_hash/rte_cuckoo_hash.h b/lib/librte_hash/rte_cuckoo_hash.h
index b0c7ef9..5ce864c 100644
--- a/lib/librte_hash/rte_cuckoo_hash.h
+++ b/lib/librte_hash/rte_cuckoo_hash.h
@@ -186,6 +186,8 @@ struct rte_hash {
 	 * to the key table.
 	 */
 	rte_rwlock_t *readwrite_lock; /**< Read-write lock thread-safety. */
+	uint32_t tbl_chng_cnt;
+	/**< Indicates if the hash table changed from last read. */
 } __rte_cache_aligned;
 
 struct queue_node {
diff --git a/lib/librte_hash/rte_hash.h b/lib/librte_hash/rte_hash.h
index 9e7d931..05e024b 100644
--- a/lib/librte_hash/rte_hash.h
+++ b/lib/librte_hash/rte_hash.h
@@ -156,7 +156,7 @@ rte_hash_count(const struct rte_hash *h);
  *   - -ENOSPC if there is no space in the hash for this key.
  */
 int
-rte_hash_add_key_data(const struct rte_hash *h, const void *key, void *data);
+rte_hash_add_key_data(struct rte_hash *h, const void *key, void *data);
 
 /**
  * Add a key-value pair with a pre-computed hash value
@@ -180,7 +180,7 @@ rte_hash_add_key_data(const struct rte_hash *h, const void *key, void *data);
  *   - -ENOSPC if there is no space in the hash for this key.
  */
 int32_t
-rte_hash_add_key_with_hash_data(const struct rte_hash *h, const void *key,
+rte_hash_add_key_with_hash_data(struct rte_hash *h, const void *key,
 						hash_sig_t sig, void *data);
 
 /**
@@ -200,7 +200,7 @@ rte_hash_add_key_with_hash_data(const struct rte_hash *h, const void *key,
  *     array of user data. This value is unique for this key.
  */
 int32_t
-rte_hash_add_key(const struct rte_hash *h, const void *key);
+rte_hash_add_key(struct rte_hash *h, const void *key);
 
 /**
  * Add a key to an existing hash table.
@@ -222,7 +222,7 @@ rte_hash_add_key(const struct rte_hash *h, const void *key);
  *     array of user data. This value is unique for this key.
  */
 int32_t
-rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, hash_sig_t sig);
+rte_hash_add_key_with_hash(struct rte_hash *h, const void *key, hash_sig_t sig);
 
 /**
  * Remove a key from an existing hash table.
-- 
2.7.4



More information about the dev mailing list