[dpdk-dev] [PATCH v2 3/4] hash: keep the list locked at creation

Olivier Matz <olivier.matz@6wind.com>
Tue Apr 5 09:35:58 CEST 2016


To avoid a race condition while creating a new hash object, the
list has to be locked before the lookup and released only once the
new object has been added to the list.

Since rte_ring_create() takes the same lock internally, move the ring
creation to the beginning of the function and only take the lock after
the ring has been created, to avoid a deadlock.
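
For reference, here is a condensed sketch of the ordering rte_hash_create()
ends up with after this patch. It is illustrative only: the helper name
create_locked() is made up, the ring size and the allocations are trimmed
(no buckets, key table or lcore caches), and it assumes it sits inside
rte_cuckoo_hash.c, where struct rte_hash, rte_hash_tailq, hash_list and the
needed includes are already available.

static struct rte_hash *
create_locked(const struct rte_hash_parameters *params)
{
	struct rte_hash_list *hash_list;
	struct rte_tailq_entry *te = NULL;
	struct rte_hash *h = NULL;
	struct rte_ring *r;
	char ring_name[RTE_RING_NAMESIZE];

	hash_list = RTE_TAILQ_CAST(rte_hash_tailq.head, rte_hash_list);

	/* Create the ring before taking the tailq lock: rte_ring_create()
	 * grabs RTE_EAL_TAILQ_RWLOCK itself, so calling it with the lock
	 * already held would deadlock. (The real code sizes the ring from
	 * num_key_slots.) */
	snprintf(ring_name, sizeof(ring_name), "HT_%s", params->name);
	r = rte_ring_create(ring_name, rte_align32pow2(params->entries + 1),
			params->socket_id, 0);
	if (r == NULL)
		return NULL;

	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);

	/* Lookup and insertion share one critical section, so two threads
	 * creating the same name cannot both miss the lookup and insert
	 * duplicate entries. */
	TAILQ_FOREACH(te, hash_list, next) {
		h = (struct rte_hash *) te->data;
		if (strncmp(params->name, h->name, RTE_HASH_NAMESIZE) == 0)
			break;
	}
	if (te != NULL) {
		/* the existing entry belongs to someone else: do not free it */
		rte_errno = EEXIST;
		rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
		rte_ring_free(r);
		return NULL;
	}

	te = rte_zmalloc("HASH_TAILQ_ENTRY", sizeof(*te), 0);
	h = rte_zmalloc_socket(NULL, sizeof(*h), RTE_CACHE_LINE_SIZE,
			params->socket_id);
	if (te == NULL || h == NULL)
		goto err_unlock;

	snprintf(h->name, sizeof(h->name), "%s", params->name);
	te->data = (void *) h;
	TAILQ_INSERT_TAIL(hash_list, te, next);
	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);

	return h;

err_unlock:
	/* the ring was created outside the lock, so free it here too */
	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
	rte_ring_free(r);
	rte_free(te);
	rte_free(h);
	return NULL;
}

The err/err_unlock split in the patch below mirrors this: err_unlock is
reached with the lock held, err without it, and both free the ring that
now already exists before the lock is taken.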

Signed-off-by: Olivier Matz <olivier.matz@6wind.com>
---
 lib/librte_hash/rte_cuckoo_hash.c | 68 ++++++++++++++++++++++-----------------
 1 file changed, 38 insertions(+), 30 deletions(-)

diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c
index ccec2db..63a74fd 100644
--- a/lib/librte_hash/rte_cuckoo_hash.c
+++ b/lib/librte_hash/rte_cuckoo_hash.c
@@ -226,19 +226,46 @@ rte_hash_create(const struct rte_hash_parameters *params)
 	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_TRANS_MEM_SUPPORT)
 		hw_trans_mem_support = 1;
 
+	/* Store all keys and leave the first entry as a dummy entry for lookup_bulk */
+	if (hw_trans_mem_support)
+		/*
+		 * Increase number of slots by total number of indices
+		 * that can be stored in the lcore caches
+		 * except for the first cache
+		 */
+		num_key_slots = params->entries + (RTE_MAX_LCORE - 1) *
+					LCORE_CACHE_SIZE + 1;
+	else
+		num_key_slots = params->entries + 1;
+
+	snprintf(ring_name, sizeof(ring_name), "HT_%s", params->name);
+	r = rte_ring_create(ring_name, rte_align32pow2(num_key_slots),
+			params->socket_id, 0);
+	if (r == NULL) {
+		RTE_LOG(ERR, HASH, "memory allocation failed\n");
+		goto err;
+	}
+
 	snprintf(hash_name, sizeof(hash_name), "HT_%s", params->name);
 
-	/* Guarantee there's no existing */
-	h = rte_hash_find_existing(params->name);
-	if (h != NULL) {
+	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
+
+	/* guarantee there's no existing: this is normally already checked
+	 * by ring creation above */
+	TAILQ_FOREACH(te, hash_list, next) {
+		h = (struct rte_hash *) te->data;
+		if (strncmp(params->name, h->name, RTE_HASH_NAMESIZE) == 0)
+			break;
+	}
+	if (te != NULL) {
 		rte_errno = EEXIST;
-		return NULL;
+		goto err_unlock;
 	}
 
 	te = rte_zmalloc("HASH_TAILQ_ENTRY", sizeof(*te), 0);
 	if (te == NULL) {
 		RTE_LOG(ERR, HASH, "tailq entry allocation failed\n");
-		goto err;
+		goto err_unlock;
 	}
 
 	h = (struct rte_hash *)rte_zmalloc_socket(hash_name, sizeof(struct rte_hash),
@@ -246,7 +273,7 @@ rte_hash_create(const struct rte_hash_parameters *params)
 
 	if (h == NULL) {
 		RTE_LOG(ERR, HASH, "memory allocation failed\n");
-		goto err;
+		goto err_unlock;
 	}
 
 	const uint32_t num_buckets = rte_align32pow2(params->entries)
@@ -258,23 +285,10 @@ rte_hash_create(const struct rte_hash_parameters *params)
 
 	if (buckets == NULL) {
 		RTE_LOG(ERR, HASH, "memory allocation failed\n");
-		goto err;
+		goto err_unlock;
 	}
 
 	const uint32_t key_entry_size = sizeof(struct rte_hash_key) + params->key_len;
-
-	/* Store all keys and leave the first entry as a dummy entry for lookup_bulk */
-	if (hw_trans_mem_support)
-		/*
-		 * Increase number of slots by total number of indices
-		 * that can be stored in the lcore caches
-		 * except for the first cache
-		 */
-		num_key_slots = params->entries + (RTE_MAX_LCORE - 1) *
-					LCORE_CACHE_SIZE + 1;
-	else
-		num_key_slots = params->entries + 1;
-
 	const uint64_t key_tbl_size = (uint64_t) key_entry_size * num_key_slots;
 
 	k = rte_zmalloc_socket(NULL, key_tbl_size,
@@ -282,7 +296,7 @@ rte_hash_create(const struct rte_hash_parameters *params)
 
 	if (k == NULL) {
 		RTE_LOG(ERR, HASH, "memory allocation failed\n");
-		goto err;
+		goto err_unlock;
 	}
 
 /*
@@ -325,14 +339,6 @@ rte_hash_create(const struct rte_hash_parameters *params)
 	h->rte_hash_cmp_eq = memcmp;
 #endif
 
-	snprintf(ring_name, sizeof(ring_name), "HT_%s", params->name);
-	r = rte_ring_create(ring_name, rte_align32pow2(num_key_slots),
-			params->socket_id, 0);
-	if (r == NULL) {
-		RTE_LOG(ERR, HASH, "memory allocation failed\n");
-		goto err;
-	}
-
 	if (hw_trans_mem_support) {
 		h->local_free_slots = rte_zmalloc_socket(NULL,
 				sizeof(struct lcore_cache) * RTE_MAX_LCORE,
@@ -359,13 +365,15 @@ rte_hash_create(const struct rte_hash_parameters *params)
 	for (i = 1; i < params->entries + 1; i++)
 		rte_ring_sp_enqueue(r, (void *)((uintptr_t) i));
 
-	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
 	te->data = (void *) h;
 	TAILQ_INSERT_TAIL(hash_list, te, next);
 	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
 
 	return h;
+err_unlock:
+	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
 err:
+	rte_ring_free(r);
 	rte_free(te);
 	rte_free(h);
 	rte_free(buckets);
-- 
2.1.4
