[dpdk-dev] [PATCH v1 1/3] hash: add read and write concurrency support

Yipeng Wang yipeng1.wang at intel.com
Fri Jun 8 12:51:16 CEST 2018


The existing implementation of librte_hash does not support read-write
concurrency. This commit implements read-write safety using rte_rwlock,
or the rte_rwlock TM variants when hardware transactional memory is
available.

Both multi-writer and read-write concurrency are now protected by
rte_rwlock instead of the dedicated multi-writer spinlock. The recursive
make_space_bucket() displacement is replaced by the BFS-based cuckoo
path search that was previously only available in the TM code path, so
that a complete displacement chain can be applied under a single writer
lock. The x86-specific header file is removed, since its RTM functions
are no longer called directly by rte_hash.
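
For example, an application can request the new behavior at table
creation time (a minimal usage sketch, not part of this patch; the
table name and sizes are hypothetical). Setting
RTE_HASH_EXTRA_FLAGS_TRANS_MEM_SUPPORT as well makes the lock helpers
use the TM (lock-elision) variants of rte_rwlock:

    struct rte_hash_parameters params = {
            .name = "rw_hash",             /* hypothetical name */
            .entries = 1024,               /* hypothetical size */
            .key_len = sizeof(uint32_t),
            .hash_func = rte_jhash,        /* default hash function */
            .hash_func_init_val = 0,
            .socket_id = rte_socket_id(),
            /* Implicitly enables RTE_HASH_EXTRA_FLAGS_MULTI_WRITER_ADD */
            .extra_flag = RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY,
    };
    struct rte_hash *h = rte_hash_create(&params);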

Signed-off-by: Yipeng Wang <yipeng1.wang at intel.com>
---
 lib/librte_hash/rte_cuckoo_hash.c     | 627 +++++++++++++++++++++-------------
 lib/librte_hash/rte_cuckoo_hash.h     |  16 +-
 lib/librte_hash/rte_cuckoo_hash_x86.h | 164 ---------
 lib/librte_hash/rte_hash.h            |   3 +
 4 files changed, 390 insertions(+), 420 deletions(-)
 delete mode 100644 lib/librte_hash/rte_cuckoo_hash_x86.h

diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c
index a07543a..a5bb4d4 100644
--- a/lib/librte_hash/rte_cuckoo_hash.c
+++ b/lib/librte_hash/rte_cuckoo_hash.c
@@ -31,9 +31,6 @@
 #include "rte_hash.h"
 #include "rte_cuckoo_hash.h"
 
-#if defined(RTE_ARCH_X86)
-#include "rte_cuckoo_hash_x86.h"
-#endif
 
 TAILQ_HEAD(rte_hash_list, rte_tailq_entry);
 
@@ -93,8 +90,10 @@ rte_hash_create(const struct rte_hash_parameters *params)
 	void *buckets = NULL;
 	char ring_name[RTE_RING_NAMESIZE];
 	unsigned num_key_slots;
-	unsigned hw_trans_mem_support = 0;
 	unsigned i;
+	unsigned int hw_trans_mem_support = 0, multi_writer_support = 0;
+	unsigned int readwrite_concur_support = 0;
+
 	rte_hash_function default_hash_func = (rte_hash_function)rte_jhash;
 
 	hash_list = RTE_TAILQ_CAST(rte_hash_tailq.head, rte_hash_list);
@@ -118,8 +117,16 @@ rte_hash_create(const struct rte_hash_parameters *params)
 	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_TRANS_MEM_SUPPORT)
 		hw_trans_mem_support = 1;
 
+	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_MULTI_WRITER_ADD)
+		multi_writer_support = 1;
+
+	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY) {
+		readwrite_concur_support = 1;
+		multi_writer_support = 1;
+	}
+
 	/* Store all keys and leave the first entry as a dummy entry for lookup_bulk */
-	if (hw_trans_mem_support)
+	if (multi_writer_support)
 		/*
 		 * Increase number of slots by total number of indices
 		 * that can be stored in the lcore caches
@@ -233,7 +240,7 @@ rte_hash_create(const struct rte_hash_parameters *params)
 	h->cmp_jump_table_idx = KEY_OTHER_BYTES;
 #endif
 
-	if (hw_trans_mem_support) {
+	if (multi_writer_support) {
 		h->local_free_slots = rte_zmalloc_socket(NULL,
 				sizeof(struct lcore_cache) * RTE_MAX_LCORE,
 				RTE_CACHE_LINE_SIZE, params->socket_id);
@@ -261,6 +268,8 @@ rte_hash_create(const struct rte_hash_parameters *params)
 	h->key_store = k;
 	h->free_slots = r;
 	h->hw_trans_mem_support = hw_trans_mem_support;
+	h->multi_writer_support = multi_writer_support;
+	h->readwrite_concur_support = readwrite_concur_support;
 
 #if defined(RTE_ARCH_X86)
 	if (rte_cpu_get_flag_enabled(RTE_CPUFLAG_AVX2))
@@ -271,21 +280,14 @@ rte_hash_create(const struct rte_hash_parameters *params)
 #endif
 		h->sig_cmp_fn = RTE_HASH_COMPARE_SCALAR;
 
-	/* Turn on multi-writer only with explicit flat from user and TM
+	/* Turn on multi-writer only with explicit flag from user and TM
 	 * support.
 	 */
-	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_MULTI_WRITER_ADD) {
-		if (h->hw_trans_mem_support) {
-			h->add_key = ADD_KEY_MULTIWRITER_TM;
-		} else {
-			h->add_key = ADD_KEY_MULTIWRITER;
-			h->multiwriter_lock = rte_malloc(NULL,
-							sizeof(rte_spinlock_t),
+	if (h->multi_writer_support) {
+		h->readwrite_lock = rte_malloc(NULL, sizeof(rte_rwlock_t),
 							LCORE_CACHE_SIZE);
-			rte_spinlock_init(h->multiwriter_lock);
-		}
-	} else
-		h->add_key = ADD_KEY_SINGLEWRITER;
+		rte_rwlock_init(h->readwrite_lock);
+	}
 
 	/* Populate free slots ring. Entry zero is reserved for key misses. */
 	for (i = 1; i < params->entries + 1; i++)
@@ -335,11 +337,10 @@ rte_hash_free(struct rte_hash *h)
 
 	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
 
-	if (h->hw_trans_mem_support)
+	if (h->multi_writer_support) {
 		rte_free(h->local_free_slots);
-
-	if (h->add_key == ADD_KEY_MULTIWRITER)
-		rte_free(h->multiwriter_lock);
+		rte_free(h->readwrite_lock);
+	}
 	rte_ring_free(h->free_slots);
 	rte_free(h->key_store);
 	rte_free(h->buckets);
@@ -386,77 +387,50 @@ rte_hash_reset(struct rte_hash *h)
 	for (i = 1; i < h->entries + 1; i++)
 		rte_ring_sp_enqueue(h->free_slots, (void *)((uintptr_t) i));
 
-	if (h->hw_trans_mem_support) {
+	if (h->multi_writer_support) {
 		/* Reset local caches per lcore */
 		for (i = 0; i < RTE_MAX_LCORE; i++)
 			h->local_free_slots[i].len = 0;
 	}
 }
 
-/* Search for an entry that can be pushed to its alternative location */
-static inline int
-make_space_bucket(const struct rte_hash *h, struct rte_hash_bucket *bkt,
-		unsigned int *nr_pushes)
-{
-	unsigned i, j;
-	int ret;
-	uint32_t next_bucket_idx;
-	struct rte_hash_bucket *next_bkt[RTE_HASH_BUCKET_ENTRIES];
 
-	/*
-	 * Push existing item (search for bucket with space in
-	 * alternative locations) to its alternative location
-	 */
-	for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
-		/* Search for space in alternative locations */
-		next_bucket_idx = bkt->sig_alt[i] & h->bucket_bitmask;
-		next_bkt[i] = &h->buckets[next_bucket_idx];
-		for (j = 0; j < RTE_HASH_BUCKET_ENTRIES; j++) {
-			if (next_bkt[i]->key_idx[j] == EMPTY_SLOT)
-				break;
-		}
-
-		if (j != RTE_HASH_BUCKET_ENTRIES)
-			break;
-	}
-
-	/* Alternative location has spare room (end of recursive function) */
-	if (i != RTE_HASH_BUCKET_ENTRIES) {
-		next_bkt[i]->sig_alt[j] = bkt->sig_current[i];
-		next_bkt[i]->sig_current[j] = bkt->sig_alt[i];
-		next_bkt[i]->key_idx[j] = bkt->key_idx[i];
-		return i;
-	}
+static inline void
+__hash_rw_writer_lock(const struct rte_hash *h)
+{
+	if (h->multi_writer_support && h->hw_trans_mem_support)
+		rte_rwlock_write_lock_tm(h->readwrite_lock);
+	else if (h->multi_writer_support)
+		rte_rwlock_write_lock(h->readwrite_lock);
+}
 
-	/* Pick entry that has not been pushed yet */
-	for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++)
-		if (bkt->flag[i] == 0)
-			break;
 
-	/* All entries have been pushed, so entry cannot be added */
-	if (i == RTE_HASH_BUCKET_ENTRIES || ++(*nr_pushes) > RTE_HASH_MAX_PUSHES)
-		return -ENOSPC;
+static inline void
+__hash_rw_reader_lock(const struct rte_hash *h)
+{
+	if (h->readwrite_concur_support && h->hw_trans_mem_support)
+		rte_rwlock_read_lock_tm(h->readwrite_lock);
+	else if (h->readwrite_concur_support)
+		rte_rwlock_read_lock(h->readwrite_lock);
+}
 
-	/* Set flag to indicate that this entry is going to be pushed */
-	bkt->flag[i] = 1;
+static inline void
+__hash_rw_writer_unlock(const struct rte_hash *h)
+{
+	if (h->multi_writer_support && h->hw_trans_mem_support)
+		rte_rwlock_write_unlock_tm(h->readwrite_lock);
+	else if (h->multi_writer_support)
+		rte_rwlock_write_unlock(h->readwrite_lock);
+}
 
-	/* Need room in alternative bucket to insert the pushed entry */
-	ret = make_space_bucket(h, next_bkt[i], nr_pushes);
-	/*
-	 * After recursive function.
-	 * Clear flags and insert the pushed entry
-	 * in its alternative location if successful,
-	 * or return error
-	 */
-	bkt->flag[i] = 0;
-	if (ret >= 0) {
-		next_bkt[i]->sig_alt[ret] = bkt->sig_current[i];
-		next_bkt[i]->sig_current[ret] = bkt->sig_alt[i];
-		next_bkt[i]->key_idx[ret] = bkt->key_idx[i];
-		return i;
-	} else
-		return ret;
 
+static inline void
+__hash_rw_reader_unlock(const struct rte_hash *h)
+{
+	if (h->readwrite_concur_support && h->hw_trans_mem_support)
+		rte_rwlock_read_unlock_tm(h->readwrite_lock);
+	else if (h->readwrite_concur_support)
+		rte_rwlock_read_unlock(h->readwrite_lock);
 }
 
 /*
@@ -469,32 +443,236 @@ enqueue_slot_back(const struct rte_hash *h,
 		struct lcore_cache *cached_free_slots,
 		void *slot_id)
 {
-	if (h->hw_trans_mem_support) {
+	if (h->multi_writer_support) {
 		cached_free_slots->objs[cached_free_slots->len] = slot_id;
 		cached_free_slots->len++;
 	} else
 		rte_ring_sp_enqueue(h->free_slots, slot_id);
 }
 
+/* Search for a key in the bucket and update its data if found */
+static inline int32_t
+search_and_update(const struct rte_hash *h, void *data, const void *key,
+	struct rte_hash_bucket *bkt, hash_sig_t sig, hash_sig_t alt_hash)
+{
+	int i;
+	struct rte_hash_key *k, *keys = h->key_store;
+
+	for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
+		if (bkt->sig_current[i] == sig &&
+				bkt->sig_alt[i] == alt_hash) {
+			k = (struct rte_hash_key *) ((char *)keys +
+					bkt->key_idx[i] * h->key_entry_size);
+			if (rte_hash_cmp_eq(key, k->key, h) == 0) {
+				/* Update data */
+				k->pdata = data;
+				/*
+				 * Return index where key is stored,
+				 * subtracting the first dummy index
+				 */
+				return bkt->key_idx[i] - 1;
+			}
+		}
+	}
+	return -1;
+}
+
+
+/* Only tries to insert at one bucket (@prim_bkt) without trying to push
+ * buckets around
+ */
+static inline int32_t
+rte_hash_cuckoo_insert_mw(const struct rte_hash *h,
+		struct rte_hash_bucket *prim_bkt,
+		struct rte_hash_bucket *sec_bkt,
+		const struct rte_hash_key *key, void *data,
+		hash_sig_t sig, hash_sig_t alt_hash, uint32_t new_idx,
+		int32_t *ret_val)
+{
+	unsigned int i;
+	struct rte_hash_bucket *cur_bkt = prim_bkt;
+	int32_t ret;
+
+	__hash_rw_writer_lock(h);
+	/* Check if the key is already inserted, inside the protected region */
+	ret = search_and_update(h, data, key, cur_bkt, sig, alt_hash);
+	if (ret != -1) {
+		__hash_rw_writer_unlock(h);
+		*ret_val = ret;
+		return 1;
+	}
+	ret = search_and_update(h, data, key, sec_bkt, alt_hash, sig);
+	if (ret != -1) {
+		__hash_rw_writer_unlock(h);
+		*ret_val = ret;
+		return 1;
+	}
+
+	/* Insert new entry if there is room in the primary
+	 * bucket.
+	 */
+	for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
+		/* Check if slot is available */
+		if (likely(prim_bkt->key_idx[i] == EMPTY_SLOT)) {
+			prim_bkt->sig_current[i] = sig;
+			prim_bkt->sig_alt[i] = alt_hash;
+			prim_bkt->key_idx[i] = new_idx;
+			break;
+		}
+	}
+	__hash_rw_writer_unlock(h);
+
+	if (i != RTE_HASH_BUCKET_ENTRIES)
+		return 0;
+
+	/* no empty entry */
+	return -1;
+}
+
+/* Shift buckets along provided cuckoo_path (@leaf and @leaf_slot) and fill
+ * the path head with new entry (sig, alt_hash, new_idx)
+ */
+static inline int
+rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h,
+			struct rte_hash_bucket *bkt,
+			struct rte_hash_bucket *alt_bkt,
+			const struct rte_hash_key *key, void *data,
+			struct queue_node *leaf, uint32_t leaf_slot,
+			hash_sig_t sig, hash_sig_t alt_hash, uint32_t new_idx,
+			int32_t *ret_val)
+{
+	uint32_t prev_alt_bkt_idx;
+	struct rte_hash_bucket *cur_bkt = bkt;
+	struct queue_node *prev_node, *curr_node = leaf;
+	struct rte_hash_bucket *prev_bkt, *curr_bkt = leaf->bkt;
+	uint32_t prev_slot, curr_slot = leaf_slot;
+	int32_t ret;
+
+	__hash_rw_writer_lock(h);
+
+	/* Check if key is already inserted */
+	ret = search_and_update(h, data, key, cur_bkt, sig, alt_hash);
+	if (ret != -1) {
+		__hash_rw_writer_unlock(h);
+		*ret_val = ret;
+		return 1;
+	}
+
+	ret = search_and_update(h, data, key, alt_bkt, alt_hash, sig);
+	if (ret != -1) {
+		__hash_rw_writer_unlock(h);
+		*ret_val = ret;
+		return 1;
+	}
+
+	while (likely(curr_node->prev != NULL)) {
+		prev_node = curr_node->prev;
+		prev_bkt = prev_node->bkt;
+		prev_slot = curr_node->prev_slot;
+
+		prev_alt_bkt_idx =
+			prev_bkt->sig_alt[prev_slot] & h->bucket_bitmask;
+
+		if (unlikely(&h->buckets[prev_alt_bkt_idx]
+				!= curr_bkt)) {
+			__hash_rw_writer_unlock(h);
+			return -1;
+		}
+
+		/* Need to swap current/alt sig to allow later
+		 * Cuckoo insert to move elements back to its
+		 * primary bucket if available
+		 */
+		curr_bkt->sig_alt[curr_slot] =
+			 prev_bkt->sig_current[prev_slot];
+		curr_bkt->sig_current[curr_slot] =
+			prev_bkt->sig_alt[prev_slot];
+		curr_bkt->key_idx[curr_slot] =
+			prev_bkt->key_idx[prev_slot];
+
+		curr_slot = prev_slot;
+		curr_node = prev_node;
+		curr_bkt = curr_node->bkt;
+	}
+
+	curr_bkt->sig_current[curr_slot] = sig;
+	curr_bkt->sig_alt[curr_slot] = alt_hash;
+	curr_bkt->key_idx[curr_slot] = new_idx;
+
+	__hash_rw_writer_unlock(h);
+
+	return 0;
+
+}
+
+/*
+ * Make space for a new key, using BFS cuckoo search and multi-writer
+ * safe cuckoo moves
+ */
+static inline int
+rte_hash_cuckoo_make_space_mw(const struct rte_hash *h,
+			struct rte_hash_bucket *bkt,
+			struct rte_hash_bucket *sec_bkt,
+			const struct rte_hash_key *key, void *data,
+			hash_sig_t sig, hash_sig_t alt_hash,
+			uint32_t new_idx, int32_t *ret_val)
+{
+	unsigned int i;
+	struct queue_node queue[RTE_HASH_BFS_QUEUE_MAX_LEN];
+	struct queue_node *tail, *head;
+	struct rte_hash_bucket *curr_bkt, *alt_bkt;
+
+	tail = queue;
+	head = queue + 1;
+	tail->bkt = bkt;
+	tail->prev = NULL;
+	tail->prev_slot = -1;
+
+	/* Cuckoo BFS search */
+	while (likely(tail != head && head <
+					queue + RTE_HASH_BFS_QUEUE_MAX_LEN -
+					RTE_HASH_BUCKET_ENTRIES)) {
+		curr_bkt = tail->bkt;
+		for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
+			if (curr_bkt->key_idx[i] == EMPTY_SLOT) {
+				int32_t ret = rte_hash_cuckoo_move_insert_mw(h,
+						bkt, sec_bkt, key, data,
+						tail, i, sig, alt_hash,
+						new_idx, ret_val);
+				if (likely(ret != -1))
+					return ret;
+			}
+
+			/* Enqueue new node and keep prev node info */
+			alt_bkt = &(h->buckets[curr_bkt->sig_alt[i]
+						    & h->bucket_bitmask]);
+			head->bkt = alt_bkt;
+			head->prev = tail;
+			head->prev_slot = i;
+			head++;
+		}
+		tail++;
+	}
+
+	return -ENOSPC;
+}
+
+
 static inline int32_t
 __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 						hash_sig_t sig, void *data)
 {
 	hash_sig_t alt_hash;
 	uint32_t prim_bucket_idx, sec_bucket_idx;
-	unsigned i;
 	struct rte_hash_bucket *prim_bkt, *sec_bkt;
-	struct rte_hash_key *new_k, *k, *keys = h->key_store;
+	struct rte_hash_key *new_k, *keys = h->key_store;
 	void *slot_id = NULL;
 	uint32_t new_idx;
 	int ret;
 	unsigned n_slots;
 	unsigned lcore_id;
 	struct lcore_cache *cached_free_slots = NULL;
-	unsigned int nr_pushes = 0;
-
-	if (h->add_key == ADD_KEY_MULTIWRITER)
-		rte_spinlock_lock(h->multiwriter_lock);
+	int32_t ret_val;
 
 	prim_bucket_idx = sig & h->bucket_bitmask;
 	prim_bkt = &h->buckets[prim_bucket_idx];
@@ -506,7 +684,7 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 	rte_prefetch0(sec_bkt);
 
 	/* Get a new slot for storing the new key */
-	if (h->hw_trans_mem_support) {
+	if (h->multi_writer_support) {
 		lcore_id = rte_lcore_id();
 		cached_free_slots = &h->local_free_slots[lcore_id];
 		/* Try to get a free slot from the local cache */
@@ -516,8 +694,7 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 					cached_free_slots->objs,
 					LCORE_CACHE_SIZE, NULL);
 			if (n_slots == 0) {
-				ret = -ENOSPC;
-				goto failure;
+				return -ENOSPC;
 			}
 
 			cached_free_slots->len += n_slots;
@@ -528,8 +705,7 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 		slot_id = cached_free_slots->objs[cached_free_slots->len];
 	} else {
 		if (rte_ring_sc_dequeue(h->free_slots, &slot_id) != 0) {
-			ret = -ENOSPC;
-			goto failure;
+			return -ENOSPC;
 		}
 	}
 
@@ -538,116 +714,60 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 	new_idx = (uint32_t)((uintptr_t) slot_id);
 
 	/* Check if key is already inserted in primary location */
-	for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
-		if (prim_bkt->sig_current[i] == sig &&
-				prim_bkt->sig_alt[i] == alt_hash) {
-			k = (struct rte_hash_key *) ((char *)keys +
-					prim_bkt->key_idx[i] * h->key_entry_size);
-			if (rte_hash_cmp_eq(key, k->key, h) == 0) {
-				/* Enqueue index of free slot back in the ring. */
-				enqueue_slot_back(h, cached_free_slots, slot_id);
-				/* Update data */
-				k->pdata = data;
-				/*
-				 * Return index where key is stored,
-				 * subtracting the first dummy index
-				 */
-				ret = prim_bkt->key_idx[i] - 1;
-				goto failure;
-			}
-		}
+	ret = search_and_update(h, data, key, prim_bkt, sig, alt_hash);
+	if (ret != -1) {
+		enqueue_slot_back(h, cached_free_slots, slot_id);
+		return ret;
 	}
 
 	/* Check if key is already inserted in secondary location */
-	for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
-		if (sec_bkt->sig_alt[i] == sig &&
-				sec_bkt->sig_current[i] == alt_hash) {
-			k = (struct rte_hash_key *) ((char *)keys +
-					sec_bkt->key_idx[i] * h->key_entry_size);
-			if (rte_hash_cmp_eq(key, k->key, h) == 0) {
-				/* Enqueue index of free slot back in the ring. */
-				enqueue_slot_back(h, cached_free_slots, slot_id);
-				/* Update data */
-				k->pdata = data;
-				/*
-				 * Return index where key is stored,
-				 * subtracting the first dummy index
-				 */
-				ret = sec_bkt->key_idx[i] - 1;
-				goto failure;
-			}
-		}
+	ret = search_and_update(h, data, key, sec_bkt, alt_hash, sig);
+	if (ret != -1) {
+		enqueue_slot_back(h, cached_free_slots, slot_id);
+		return ret;
 	}
 
 	/* Copy key */
 	rte_memcpy(new_k->key, key, h->key_len);
 	new_k->pdata = data;
 
-#if defined(RTE_ARCH_X86) /* currently only x86 support HTM */
-	if (h->add_key == ADD_KEY_MULTIWRITER_TM) {
-		ret = rte_hash_cuckoo_insert_mw_tm(prim_bkt,
-				sig, alt_hash, new_idx);
-		if (ret >= 0)
-			return new_idx - 1;
 
-		/* Primary bucket full, need to make space for new entry */
-		ret = rte_hash_cuckoo_make_space_mw_tm(h, prim_bkt, sig,
-							alt_hash, new_idx);
+	/* Find an empty slot and insert */
+	ret = rte_hash_cuckoo_insert_mw(h, prim_bkt, sec_bkt, key, data,
+					sig, alt_hash, new_idx, &ret_val);
+	if (ret == 0)
+		return new_idx - 1;
+	else if (ret == 1) {
+		enqueue_slot_back(h, cached_free_slots, slot_id);
+		return ret_val;
+	}
 
-		if (ret >= 0)
-			return new_idx - 1;
+	/* Primary bucket full, need to make space for new entry */
+	ret = rte_hash_cuckoo_make_space_mw(h, prim_bkt, sec_bkt, key, data,
+					sig, alt_hash, new_idx, &ret_val);
+	if (ret == 0)
+		return new_idx - 1;
+	else if (ret == 1) {
+		enqueue_slot_back(h, cached_free_slots, slot_id);
+		return ret_val;
+	}
 
-		/* Also search secondary bucket to get better occupancy */
-		ret = rte_hash_cuckoo_make_space_mw_tm(h, sec_bkt, sig,
-							alt_hash, new_idx);
+	/* Also search secondary bucket to get better occupancy */
+	ret = rte_hash_cuckoo_make_space_mw(h, sec_bkt, prim_bkt, key, data,
+					sig, alt_hash, new_idx, &ret_val);
 
-		if (ret >= 0)
-			return new_idx - 1;
+	if (ret == 0)
+		return new_idx - 1;
+	else if (ret == 1) {
+		enqueue_slot_back(h, cached_free_slots, slot_id);
+		return ret_val;
 	} else {
-#endif
-		for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
-			/* Check if slot is available */
-			if (likely(prim_bkt->key_idx[i] == EMPTY_SLOT)) {
-				prim_bkt->sig_current[i] = sig;
-				prim_bkt->sig_alt[i] = alt_hash;
-				prim_bkt->key_idx[i] = new_idx;
-				break;
-			}
-		}
-
-		if (i != RTE_HASH_BUCKET_ENTRIES) {
-			if (h->add_key == ADD_KEY_MULTIWRITER)
-				rte_spinlock_unlock(h->multiwriter_lock);
-			return new_idx - 1;
-		}
-
-		/* Primary bucket full, need to make space for new entry
-		 * After recursive function.
-		 * Insert the new entry in the position of the pushed entry
-		 * if successful or return error and
-		 * store the new slot back in the ring
-		 */
-		ret = make_space_bucket(h, prim_bkt, &nr_pushes);
-		if (ret >= 0) {
-			prim_bkt->sig_current[ret] = sig;
-			prim_bkt->sig_alt[ret] = alt_hash;
-			prim_bkt->key_idx[ret] = new_idx;
-			if (h->add_key == ADD_KEY_MULTIWRITER)
-				rte_spinlock_unlock(h->multiwriter_lock);
-			return new_idx - 1;
-		}
-#if defined(RTE_ARCH_X86)
+		enqueue_slot_back(h, cached_free_slots, slot_id);
+		return ret;
 	}
-#endif
-	/* Error in addition, store new slot back in the ring and return error */
-	enqueue_slot_back(h, cached_free_slots, (void *)((uintptr_t) new_idx));
-
-failure:
-	if (h->add_key == ADD_KEY_MULTIWRITER)
-		rte_spinlock_unlock(h->multiwriter_lock);
-	return ret;
 }
 
+
 int32_t
 rte_hash_add_key_with_hash(const struct rte_hash *h,
 			const void *key, hash_sig_t sig)
@@ -690,25 +810,20 @@ rte_hash_add_key_data(const struct rte_hash *h, const void *key, void *data)
 	else
 		return ret;
 }
+
+/* Search one bucket to find the matching key */
 static inline int32_t
-__rte_hash_lookup_with_hash(const struct rte_hash *h, const void *key,
-					hash_sig_t sig, void **data)
+search_one_bucket(const struct rte_hash *h, const void *key, hash_sig_t sig,
+			void **data, struct rte_hash_bucket *bkt)
 {
-	uint32_t bucket_idx;
-	hash_sig_t alt_hash;
-	unsigned i;
-	struct rte_hash_bucket *bkt;
+	int i;
 	struct rte_hash_key *k, *keys = h->key_store;
 
-	bucket_idx = sig & h->bucket_bitmask;
-	bkt = &h->buckets[bucket_idx];
-
-	/* Check if key is in primary location */
 	for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
 		if (bkt->sig_current[i] == sig &&
 				bkt->key_idx[i] != EMPTY_SLOT) {
 			k = (struct rte_hash_key *) ((char *)keys +
-					bkt->key_idx[i] * h->key_entry_size);
+				bkt->key_idx[i] * h->key_entry_size);
 			if (rte_hash_cmp_eq(key, k->key, h) == 0) {
 				if (data != NULL)
 					*data = k->pdata;
@@ -720,6 +835,29 @@ __rte_hash_lookup_with_hash(const struct rte_hash *h, const void *key,
 			}
 		}
 	}
+	return -1;
+}
+
+static inline int32_t
+__rte_hash_lookup_with_hash(const struct rte_hash *h, const void *key,
+					hash_sig_t sig, void **data)
+{
+	uint32_t bucket_idx;
+	hash_sig_t alt_hash;
+	struct rte_hash_bucket *bkt;
+	int ret;
+
+	bucket_idx = sig & h->bucket_bitmask;
+	bkt = &h->buckets[bucket_idx];
+
+	__hash_rw_reader_lock(h);
+
+	/* Check if key is in primary location */
+	ret = search_one_bucket(h, key, sig, data, bkt);
+	if (ret != -1) {
+		__hash_rw_reader_unlock(h);
+		return ret;
+	}
 
 	/* Calculate secondary hash */
 	alt_hash = rte_hash_secondary_hash(sig);
@@ -727,23 +865,13 @@ __rte_hash_lookup_with_hash(const struct rte_hash *h, const void *key,
 	bkt = &h->buckets[bucket_idx];
 
 	/* Check if key is in secondary location */
-	for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
-		if (bkt->sig_current[i] == alt_hash &&
-				bkt->sig_alt[i] == sig) {
-			k = (struct rte_hash_key *) ((char *)keys +
-					bkt->key_idx[i] * h->key_entry_size);
-			if (rte_hash_cmp_eq(key, k->key, h) == 0) {
-				if (data != NULL)
-					*data = k->pdata;
-				/*
-				 * Return index where key is stored,
-				 * subtracting the first dummy index
-				 */
-				return bkt->key_idx[i] - 1;
-			}
-		}
+	ret = search_one_bucket(h, key, alt_hash, data, bkt);
+	if (ret != -1) {
+		__hash_rw_reader_unlock(h);
+		return ret;
 	}
 
+	__hash_rw_reader_unlock(h);
 	return -ENOENT;
 }
 
@@ -784,8 +912,7 @@ remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)
 	struct lcore_cache *cached_free_slots;
 
 	bkt->sig_current[i] = NULL_SIGNATURE;
-	bkt->sig_alt[i] = NULL_SIGNATURE;
-	if (h->hw_trans_mem_support) {
+	if (h->multi_writer_support) {
 		lcore_id = rte_lcore_id();
 		cached_free_slots = &h->local_free_slots[lcore_id];
 		/* Cache full, need to free it. */
@@ -806,19 +933,14 @@ remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)
 	}
 }
 
+/* Search one bucket and remove the matching key */
 static inline int32_t
-__rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
-						hash_sig_t sig)
+search_and_remove(const struct rte_hash *h, const void *key,
+			struct rte_hash_bucket *bkt, hash_sig_t sig,
+			int32_t *ret_val)
 {
-	uint32_t bucket_idx;
-	hash_sig_t alt_hash;
-	unsigned i;
-	struct rte_hash_bucket *bkt;
 	struct rte_hash_key *k, *keys = h->key_store;
-	int32_t ret;
-
-	bucket_idx = sig & h->bucket_bitmask;
-	bkt = &h->buckets[bucket_idx];
+	unsigned int i;
 
 	/* Check if key is in primary location */
 	for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
@@ -833,38 +955,45 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
 				 * Return index where key is stored,
 				 * subtracting the first dummy index
 				 */
-				ret = bkt->key_idx[i] - 1;
+				*ret_val = bkt->key_idx[i] - 1;
 				bkt->key_idx[i] = EMPTY_SLOT;
-				return ret;
+				return 0;
 			}
 		}
 	}
+	return -1;
+}
+
+static inline int32_t
+__rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
+						hash_sig_t sig)
+{
+	uint32_t bucket_idx;
+	hash_sig_t alt_hash;
+	struct rte_hash_bucket *bkt;
+	int32_t ret_val;
+
+	bucket_idx = sig & h->bucket_bitmask;
+	bkt = &h->buckets[bucket_idx];
+
+	__hash_rw_writer_lock(h);
+	/* look for key in primary bucket */
+	if (!search_and_remove(h, key, bkt, sig, &ret_val)) {
+		__hash_rw_writer_unlock(h);
+		return ret_val;
+	}
 
 	/* Calculate secondary hash */
 	alt_hash = rte_hash_secondary_hash(sig);
 	bucket_idx = alt_hash & h->bucket_bitmask;
 	bkt = &h->buckets[bucket_idx];
 
-	/* Check if key is in secondary location */
-	for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
-		if (bkt->sig_current[i] == alt_hash &&
-				bkt->key_idx[i] != EMPTY_SLOT) {
-			k = (struct rte_hash_key *) ((char *)keys +
-					bkt->key_idx[i] * h->key_entry_size);
-			if (rte_hash_cmp_eq(key, k->key, h) == 0) {
-				remove_entry(h, bkt, i);
-
-				/*
-				 * Return index where key is stored,
-				 * subtracting the first dummy index
-				 */
-				ret = bkt->key_idx[i] - 1;
-				bkt->key_idx[i] = EMPTY_SLOT;
-				return ret;
-			}
-		}
+	/* look for key in secondary bucket */
+	if (!search_and_remove(h, key, bkt, alt_hash, &ret_val)) {
+		__hash_rw_writer_unlock(h);
+		return ret_val;
 	}
-
+	__hash_rw_writer_unlock(h);
 	return -ENOENT;
 }
 
@@ -1034,6 +1163,8 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
 		}
 	}
 
+	__hash_rw_reader_lock(h);
+
 	/* Compare keys, first hits in primary first */
 	for (i = 0; i < num_keys; i++) {
 		positions[i] = -ENOENT;
@@ -1088,6 +1219,8 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
 		continue;
 	}
 
+	__hash_rw_reader_unlock(h);
+
 	if (hit_mask != NULL)
 		*hit_mask = hits;
 }
@@ -1146,7 +1279,7 @@ rte_hash_iterate(const struct rte_hash *h, const void **key, void **data, uint32
 		bucket_idx = *next / RTE_HASH_BUCKET_ENTRIES;
 		idx = *next % RTE_HASH_BUCKET_ENTRIES;
 	}
-
+	__hash_rw_reader_lock(h);
 	/* Get position of entry in key table */
 	position = h->buckets[bucket_idx].key_idx[idx];
 	next_key = (struct rte_hash_key *) ((char *)h->key_store +
@@ -1155,6 +1288,8 @@ rte_hash_iterate(const struct rte_hash *h, const void **key, void **data, uint32
 	*key = next_key->key;
 	*data = next_key->pdata;
 
+	__hash_rw_reader_unlock(h);
+
 	/* Increment iterator */
 	(*next)++;
 
diff --git a/lib/librte_hash/rte_cuckoo_hash.h b/lib/librte_hash/rte_cuckoo_hash.h
index 7a54e55..40b6be0 100644
--- a/lib/librte_hash/rte_cuckoo_hash.h
+++ b/lib/librte_hash/rte_cuckoo_hash.h
@@ -88,11 +88,6 @@ const rte_hash_cmp_eq_t cmp_jump_table[NUM_KEY_CMP_CASES] = {
 
 #endif
 
-enum add_key_case {
-	ADD_KEY_SINGLEWRITER = 0,
-	ADD_KEY_MULTIWRITER,
-	ADD_KEY_MULTIWRITER_TM,
-};
 
 /** Number of items per bucket. */
 #define RTE_HASH_BUCKET_ENTRIES		8
@@ -155,18 +150,19 @@ struct rte_hash {
 
 	struct rte_ring *free_slots;
 	/**< Ring that stores all indexes of the free slots in the key table */
-	uint8_t hw_trans_mem_support;
-	/**< Hardware transactional memory support */
+
 	struct lcore_cache *local_free_slots;
 	/**< Local cache per lcore, storing some indexes of the free slots */
-	enum add_key_case add_key; /**< Multi-writer hash add behavior */
-
-	rte_spinlock_t *multiwriter_lock; /**< Multi-writer spinlock for w/o TM */
 
 	/* Fields used in lookup */
 
 	uint32_t key_len __rte_cache_aligned;
 	/**< Length of hash key. */
+	uint8_t hw_trans_mem_support;
+	/**< Hardware transactional memory support */
+	uint8_t multi_writer_support; /**< Multi-writer add support */
+	uint8_t readwrite_concur_support; /**< Read-write concurrency support */
+	rte_rwlock_t *readwrite_lock; /**< Read-write lock for the table */
 	rte_hash_function hash_func;    /**< Function used to calculate hash. */
 	uint32_t hash_func_init_val;    /**< Init value used by hash_func. */
 	rte_hash_cmp_eq_t rte_hash_custom_cmp_eq;
diff --git a/lib/librte_hash/rte_cuckoo_hash_x86.h b/lib/librte_hash/rte_cuckoo_hash_x86.h
deleted file mode 100644
index 2c5b017..0000000
--- a/lib/librte_hash/rte_cuckoo_hash_x86.h
+++ /dev/null
@@ -1,164 +0,0 @@
-/* SPDX-License-Identifier: BSD-3-Clause
- * Copyright(c) 2016 Intel Corporation
- */
-
-/* rte_cuckoo_hash_x86.h
- * This file holds all x86 specific Cuckoo Hash functions
- */
-
-/* Only tries to insert at one bucket (@prim_bkt) without trying to push
- * buckets around
- */
-static inline unsigned
-rte_hash_cuckoo_insert_mw_tm(struct rte_hash_bucket *prim_bkt,
-		hash_sig_t sig, hash_sig_t alt_hash, uint32_t new_idx)
-{
-	unsigned i, status;
-	unsigned try = 0;
-
-	while (try < RTE_HASH_TSX_MAX_RETRY) {
-		status = rte_xbegin();
-		if (likely(status == RTE_XBEGIN_STARTED)) {
-			/* Insert new entry if there is room in the primary
-			* bucket.
-			*/
-			for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
-				/* Check if slot is available */
-				if (likely(prim_bkt->key_idx[i] == EMPTY_SLOT)) {
-					prim_bkt->sig_current[i] = sig;
-					prim_bkt->sig_alt[i] = alt_hash;
-					prim_bkt->key_idx[i] = new_idx;
-					break;
-				}
-			}
-			rte_xend();
-
-			if (i != RTE_HASH_BUCKET_ENTRIES)
-				return 0;
-
-			break; /* break off try loop if transaction commits */
-		} else {
-			/* If we abort we give up this cuckoo path. */
-			try++;
-			rte_pause();
-		}
-	}
-
-	return -1;
-}
-
-/* Shift buckets along provided cuckoo_path (@leaf and @leaf_slot) and fill
- * the path head with new entry (sig, alt_hash, new_idx)
- */
-static inline int
-rte_hash_cuckoo_move_insert_mw_tm(const struct rte_hash *h,
-			struct queue_node *leaf, uint32_t leaf_slot,
-			hash_sig_t sig, hash_sig_t alt_hash, uint32_t new_idx)
-{
-	unsigned try = 0;
-	unsigned status;
-	uint32_t prev_alt_bkt_idx;
-
-	struct queue_node *prev_node, *curr_node = leaf;
-	struct rte_hash_bucket *prev_bkt, *curr_bkt = leaf->bkt;
-	uint32_t prev_slot, curr_slot = leaf_slot;
-
-	while (try < RTE_HASH_TSX_MAX_RETRY) {
-		status = rte_xbegin();
-		if (likely(status == RTE_XBEGIN_STARTED)) {
-			while (likely(curr_node->prev != NULL)) {
-				prev_node = curr_node->prev;
-				prev_bkt = prev_node->bkt;
-				prev_slot = curr_node->prev_slot;
-
-				prev_alt_bkt_idx
-					= prev_bkt->sig_alt[prev_slot]
-					    & h->bucket_bitmask;
-
-				if (unlikely(&h->buckets[prev_alt_bkt_idx]
-					     != curr_bkt)) {
-					rte_xabort(RTE_XABORT_CUCKOO_PATH_INVALIDED);
-				}
-
-				/* Need to swap current/alt sig to allow later
-				 * Cuckoo insert to move elements back to its
-				 * primary bucket if available
-				 */
-				curr_bkt->sig_alt[curr_slot] =
-				    prev_bkt->sig_current[prev_slot];
-				curr_bkt->sig_current[curr_slot] =
-				    prev_bkt->sig_alt[prev_slot];
-				curr_bkt->key_idx[curr_slot]
-				    = prev_bkt->key_idx[prev_slot];
-
-				curr_slot = prev_slot;
-				curr_node = prev_node;
-				curr_bkt = curr_node->bkt;
-			}
-
-			curr_bkt->sig_current[curr_slot] = sig;
-			curr_bkt->sig_alt[curr_slot] = alt_hash;
-			curr_bkt->key_idx[curr_slot] = new_idx;
-
-			rte_xend();
-
-			return 0;
-		}
-
-		/* If we abort we give up this cuckoo path, since most likely it's
-		 * no longer valid as TSX detected data conflict
-		 */
-		try++;
-		rte_pause();
-	}
-
-	return -1;
-}
-
-/*
- * Make space for new key, using bfs Cuckoo Search and Multi-Writer safe
- * Cuckoo
- */
-static inline int
-rte_hash_cuckoo_make_space_mw_tm(const struct rte_hash *h,
-			struct rte_hash_bucket *bkt,
-			hash_sig_t sig, hash_sig_t alt_hash,
-			uint32_t new_idx)
-{
-	unsigned i;
-	struct queue_node queue[RTE_HASH_BFS_QUEUE_MAX_LEN];
-	struct queue_node *tail, *head;
-	struct rte_hash_bucket *curr_bkt, *alt_bkt;
-
-	tail = queue;
-	head = queue + 1;
-	tail->bkt = bkt;
-	tail->prev = NULL;
-	tail->prev_slot = -1;
-
-	/* Cuckoo bfs Search */
-	while (likely(tail != head && head <
-					queue + RTE_HASH_BFS_QUEUE_MAX_LEN -
-					RTE_HASH_BUCKET_ENTRIES)) {
-		curr_bkt = tail->bkt;
-		for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
-			if (curr_bkt->key_idx[i] == EMPTY_SLOT) {
-				if (likely(rte_hash_cuckoo_move_insert_mw_tm(h,
-						tail, i, sig,
-						alt_hash, new_idx) == 0))
-					return 0;
-			}
-
-			/* Enqueue new node and keep prev node info */
-			alt_bkt = &(h->buckets[curr_bkt->sig_alt[i]
-						    & h->bucket_bitmask]);
-			head->bkt = alt_bkt;
-			head->prev = tail;
-			head->prev_slot = i;
-			head++;
-		}
-		tail++;
-	}
-
-	return -ENOSPC;
-}
diff --git a/lib/librte_hash/rte_hash.h b/lib/librte_hash/rte_hash.h
index f71ca9f..ecb49e4 100644
--- a/lib/librte_hash/rte_hash.h
+++ b/lib/librte_hash/rte_hash.h
@@ -34,6 +34,9 @@ extern "C" {
 /** Default behavior of insertion, single writer/multi writer */
 #define RTE_HASH_EXTRA_FLAGS_MULTI_WRITER_ADD 0x02
 
+/** Flag to support reader-writer concurrency */
+#define RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY 0x04
+
 /** Signature of key that is stored internally. */
 typedef uint32_t hash_sig_t;
 
-- 
2.7.4


