[dpdk-dev] [PATCH v8 5/6] lib/hash: use ring with 32b element size to save memory

Honnappa Nagarahalli honnappa.nagarahalli at arm.com
Mon Jan 13 18:25:17 CET 2020


The freelist and external bucket indices are 32-bit values, but they
are currently stored in rings whose elements are pointer-sized. Using
rings with 32-bit element sizes instead will save memory.

Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli at arm.com>
Reviewed-by: Gavin Hu <gavin.hu at arm.com>
Reviewed-by: Ola Liljedahl <ola.liljedahl at arm.com>
---
 lib/librte_hash/rte_cuckoo_hash.c | 97 ++++++++++++++++---------------
 lib/librte_hash/rte_cuckoo_hash.h |  2 +-
 2 files changed, 51 insertions(+), 48 deletions(-)

diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c
index 87a4c01f2..734bec2ac 100644
--- a/lib/librte_hash/rte_cuckoo_hash.c
+++ b/lib/librte_hash/rte_cuckoo_hash.c
@@ -24,7 +24,7 @@
 #include <rte_cpuflags.h>
 #include <rte_rwlock.h>
 #include <rte_spinlock.h>
-#include <rte_ring.h>
+#include <rte_ring_elem.h>
 #include <rte_compat.h>
 #include <rte_vect.h>
 #include <rte_tailq.h>
@@ -136,7 +136,6 @@ rte_hash_create(const struct rte_hash_parameters *params)
 	char ring_name[RTE_RING_NAMESIZE];
 	char ext_ring_name[RTE_RING_NAMESIZE];
 	unsigned num_key_slots;
-	unsigned i;
 	unsigned int hw_trans_mem_support = 0, use_local_cache = 0;
 	unsigned int ext_table_support = 0;
 	unsigned int readwrite_concur_support = 0;
@@ -213,8 +212,8 @@ rte_hash_create(const struct rte_hash_parameters *params)
 
 	snprintf(ring_name, sizeof(ring_name), "HT_%s", params->name);
 	/* Create ring (Dummy slot index is not enqueued) */
-	r = rte_ring_create(ring_name, rte_align32pow2(num_key_slots),
-			params->socket_id, 0);
+	r = rte_ring_create_elem(ring_name, sizeof(uint32_t),
+			rte_align32pow2(num_key_slots), params->socket_id, 0);
 	if (r == NULL) {
 		RTE_LOG(ERR, HASH, "memory allocation failed\n");
 		goto err;
@@ -227,7 +226,7 @@ rte_hash_create(const struct rte_hash_parameters *params)
 	if (ext_table_support) {
 		snprintf(ext_ring_name, sizeof(ext_ring_name), "HT_EXT_%s",
 								params->name);
-		r_ext = rte_ring_create(ext_ring_name,
+		r_ext = rte_ring_create_elem(ext_ring_name, sizeof(uint32_t),
 				rte_align32pow2(num_buckets + 1),
 				params->socket_id, 0);
 
@@ -294,8 +293,8 @@ rte_hash_create(const struct rte_hash_parameters *params)
 		 * use bucket index for the linked list and 0 means NULL
 		 * for next bucket
 		 */
-		for (i = 1; i <= num_buckets; i++)
-			rte_ring_sp_enqueue(r_ext, (void *)((uintptr_t) i));
+		for (uint32_t i = 1; i <= num_buckets; i++)
+			rte_ring_sp_enqueue_elem(r_ext, &i, sizeof(uint32_t));
 
 		if (readwrite_concur_lf_support) {
 			ext_bkt_to_free = rte_zmalloc(NULL, sizeof(uint32_t) *
@@ -433,8 +432,8 @@ rte_hash_create(const struct rte_hash_parameters *params)
 	}
 
 	/* Populate free slots ring. Entry zero is reserved for key misses. */
-	for (i = 1; i < num_key_slots; i++)
-		rte_ring_sp_enqueue(r, (void *)((uintptr_t) i));
+	for (uint32_t i = 1; i < num_key_slots; i++)
+		rte_ring_sp_enqueue_elem(r, &i, sizeof(uint32_t));
 
 	te->data = (void *) h;
 	TAILQ_INSERT_TAIL(hash_list, te, next);
@@ -598,13 +597,13 @@ rte_hash_reset(struct rte_hash *h)
 		tot_ring_cnt = h->entries;
 
 	for (i = 1; i < tot_ring_cnt + 1; i++)
-		rte_ring_sp_enqueue(h->free_slots, (void *)((uintptr_t) i));
+		rte_ring_sp_enqueue_elem(h->free_slots, &i, sizeof(uint32_t));
 
 	/* Repopulate the free ext bkt ring. */
 	if (h->ext_table_support) {
 		for (i = 1; i <= h->num_buckets; i++)
-			rte_ring_sp_enqueue(h->free_ext_bkts,
-						(void *)((uintptr_t) i));
+			rte_ring_sp_enqueue_elem(h->free_ext_bkts, &i,
+							sizeof(uint32_t));
 	}
 
 	if (h->use_local_cache) {
@@ -623,13 +622,14 @@ rte_hash_reset(struct rte_hash *h)
 static inline void
 enqueue_slot_back(const struct rte_hash *h,
 		struct lcore_cache *cached_free_slots,
-		void *slot_id)
+		uint32_t slot_id)
 {
 	if (h->use_local_cache) {
 		cached_free_slots->objs[cached_free_slots->len] = slot_id;
 		cached_free_slots->len++;
 	} else
-		rte_ring_sp_enqueue(h->free_slots, slot_id);
+		rte_ring_sp_enqueue_elem(h->free_slots, &slot_id,
+						sizeof(uint32_t));
 }
 
 /* Search a key from bucket and update its data.
@@ -923,9 +923,8 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 	uint32_t prim_bucket_idx, sec_bucket_idx;
 	struct rte_hash_bucket *prim_bkt, *sec_bkt, *cur_bkt;
 	struct rte_hash_key *new_k, *keys = h->key_store;
-	void *slot_id = NULL;
-	void *ext_bkt_id = NULL;
-	uint32_t new_idx, bkt_id;
+	uint32_t slot_id;
+	uint32_t ext_bkt_id;
 	int ret;
 	unsigned n_slots;
 	unsigned lcore_id;
@@ -968,8 +967,9 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 		/* Try to get a free slot from the local cache */
 		if (cached_free_slots->len == 0) {
 			/* Need to get another burst of free slots from global ring */
-			n_slots = rte_ring_mc_dequeue_burst(h->free_slots,
+			n_slots = rte_ring_mc_dequeue_burst_elem(h->free_slots,
 					cached_free_slots->objs,
+					sizeof(uint32_t),
 					LCORE_CACHE_SIZE, NULL);
 			if (n_slots == 0) {
 				return -ENOSPC;
@@ -982,13 +982,13 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 		cached_free_slots->len--;
 		slot_id = cached_free_slots->objs[cached_free_slots->len];
 	} else {
-		if (rte_ring_sc_dequeue(h->free_slots, &slot_id) != 0) {
+		if (rte_ring_sc_dequeue_elem(h->free_slots, &slot_id,
+						sizeof(uint32_t)) != 0) {
 			return -ENOSPC;
 		}
 	}
 
-	new_k = RTE_PTR_ADD(keys, (uintptr_t)slot_id * h->key_entry_size);
-	new_idx = (uint32_t)((uintptr_t) slot_id);
+	new_k = RTE_PTR_ADD(keys, slot_id * h->key_entry_size);
 	/* The store to application data (by the application) at *data should
 	 * not leak after the store of pdata in the key store. i.e. pdata is
 	 * the guard variable. Release the application data to the readers.
@@ -1001,9 +1001,9 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 
 	/* Find an empty slot and insert */
 	ret = rte_hash_cuckoo_insert_mw(h, prim_bkt, sec_bkt, key, data,
-					short_sig, new_idx, &ret_val);
+					short_sig, slot_id, &ret_val);
 	if (ret == 0)
-		return new_idx - 1;
+		return slot_id - 1;
 	else if (ret == 1) {
 		enqueue_slot_back(h, cached_free_slots, slot_id);
 		return ret_val;
@@ -1011,9 +1011,9 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 
 	/* Primary bucket full, need to make space for new entry */
 	ret = rte_hash_cuckoo_make_space_mw(h, prim_bkt, sec_bkt, key, data,
-				short_sig, prim_bucket_idx, new_idx, &ret_val);
+				short_sig, prim_bucket_idx, slot_id, &ret_val);
 	if (ret == 0)
-		return new_idx - 1;
+		return slot_id - 1;
 	else if (ret == 1) {
 		enqueue_slot_back(h, cached_free_slots, slot_id);
 		return ret_val;
@@ -1021,10 +1021,10 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 
 	/* Also search secondary bucket to get better occupancy */
 	ret = rte_hash_cuckoo_make_space_mw(h, sec_bkt, prim_bkt, key, data,
-				short_sig, sec_bucket_idx, new_idx, &ret_val);
+				short_sig, sec_bucket_idx, slot_id, &ret_val);
 
 	if (ret == 0)
-		return new_idx - 1;
+		return slot_id - 1;
 	else if (ret == 1) {
 		enqueue_slot_back(h, cached_free_slots, slot_id);
 		return ret_val;
@@ -1067,10 +1067,10 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 				 * and key.
 				 */
 				__atomic_store_n(&cur_bkt->key_idx[i],
-						 new_idx,
+						 slot_id,
 						 __ATOMIC_RELEASE);
 				__hash_rw_writer_unlock(h);
-				return new_idx - 1;
+				return slot_id - 1;
 			}
 		}
 	}
@@ -1078,26 +1078,26 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 	/* Failed to get an empty entry from extendable buckets. Link a new
 	 * extendable bucket. We first get a free bucket from ring.
 	 */
-	if (rte_ring_sc_dequeue(h->free_ext_bkts, &ext_bkt_id) != 0) {
+	if (rte_ring_sc_dequeue_elem(h->free_ext_bkts, &ext_bkt_id,
+						sizeof(uint32_t)) != 0) {
 		ret = -ENOSPC;
 		goto failure;
 	}
 
-	bkt_id = (uint32_t)((uintptr_t)ext_bkt_id) - 1;
 	/* Use the first location of the new bucket */
-	(h->buckets_ext[bkt_id]).sig_current[0] = short_sig;
+	(h->buckets_ext[ext_bkt_id - 1]).sig_current[0] = short_sig;
 	/* Store to signature and key should not leak after
 	 * the store to key_idx. i.e. key_idx is the guard variable
 	 * for signature and key.
 	 */
-	__atomic_store_n(&(h->buckets_ext[bkt_id]).key_idx[0],
-			 new_idx,
+	__atomic_store_n(&(h->buckets_ext[ext_bkt_id - 1]).key_idx[0],
+			 slot_id,
 			 __ATOMIC_RELEASE);
 	/* Link the new bucket to sec bucket linked list */
 	last = rte_hash_get_last_bkt(sec_bkt);
-	last->next = &h->buckets_ext[bkt_id];
+	last->next = &h->buckets_ext[ext_bkt_id - 1];
 	__hash_rw_writer_unlock(h);
-	return new_idx - 1;
+	return slot_id - 1;
 
 failure:
 	__hash_rw_writer_unlock(h);
@@ -1373,8 +1373,9 @@ remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)
 		/* Cache full, need to free it. */
 		if (cached_free_slots->len == LCORE_CACHE_SIZE) {
 			/* Need to enqueue the free slots in global ring. */
-			n_slots = rte_ring_mp_enqueue_burst(h->free_slots,
+			n_slots = rte_ring_mp_enqueue_burst_elem(h->free_slots,
 						cached_free_slots->objs,
+						sizeof(uint32_t),
 						LCORE_CACHE_SIZE, NULL);
 			ERR_IF_TRUE((n_slots == 0),
 				"%s: could not enqueue free slots in global ring\n",
@@ -1383,11 +1384,11 @@ remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)
 		}
 		/* Put index of new free slot in cache. */
 		cached_free_slots->objs[cached_free_slots->len] =
-				(void *)((uintptr_t)bkt->key_idx[i]);
+							bkt->key_idx[i];
 		cached_free_slots->len++;
 	} else {
-		rte_ring_sp_enqueue(h->free_slots,
-				(void *)((uintptr_t)bkt->key_idx[i]));
+		rte_ring_sp_enqueue_elem(h->free_slots,
+				&bkt->key_idx[i], sizeof(uint32_t));
 	}
 }
 
@@ -1551,7 +1552,8 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
 			 */
 			h->ext_bkt_to_free[ret] = index;
 		else
-			rte_ring_sp_enqueue(h->free_ext_bkts, (void *)(uintptr_t)index);
+			rte_ring_sp_enqueue_elem(h->free_ext_bkts, &index,
+							sizeof(uint32_t));
 	}
 	__hash_rw_writer_unlock(h);
 	return ret;
@@ -1614,7 +1616,8 @@ rte_hash_free_key_with_position(const struct rte_hash *h,
 		uint32_t index = h->ext_bkt_to_free[position];
 		if (index) {
 			/* Recycle empty ext bkt to free list. */
-			rte_ring_sp_enqueue(h->free_ext_bkts, (void *)(uintptr_t)index);
+			rte_ring_sp_enqueue_elem(h->free_ext_bkts, &index,
+							sizeof(uint32_t));
 			h->ext_bkt_to_free[position] = 0;
 		}
 	}
@@ -1625,19 +1628,19 @@ rte_hash_free_key_with_position(const struct rte_hash *h,
 		/* Cache full, need to free it. */
 		if (cached_free_slots->len == LCORE_CACHE_SIZE) {
 			/* Need to enqueue the free slots in global ring. */
-			n_slots = rte_ring_mp_enqueue_burst(h->free_slots,
+			n_slots = rte_ring_mp_enqueue_burst_elem(h->free_slots,
 						cached_free_slots->objs,
+						sizeof(uint32_t),
 						LCORE_CACHE_SIZE, NULL);
 			RETURN_IF_TRUE((n_slots == 0), -EFAULT);
 			cached_free_slots->len -= n_slots;
 		}
 		/* Put index of new free slot in cache. */
-		cached_free_slots->objs[cached_free_slots->len] =
-					(void *)((uintptr_t)key_idx);
+		cached_free_slots->objs[cached_free_slots->len] = key_idx;
 		cached_free_slots->len++;
 	} else {
-		rte_ring_sp_enqueue(h->free_slots,
-				(void *)((uintptr_t)key_idx));
+		rte_ring_sp_enqueue_elem(h->free_slots, &key_idx,
+						sizeof(uint32_t));
 	}
 
 	return 0;
diff --git a/lib/librte_hash/rte_cuckoo_hash.h b/lib/librte_hash/rte_cuckoo_hash.h
index fb19bb27d..345de6bf9 100644
--- a/lib/librte_hash/rte_cuckoo_hash.h
+++ b/lib/librte_hash/rte_cuckoo_hash.h
@@ -124,7 +124,7 @@ const rte_hash_cmp_eq_t cmp_jump_table[NUM_KEY_CMP_CASES] = {
 
 struct lcore_cache {
 	unsigned len; /**< Cache len */
-	void *objs[LCORE_CACHE_SIZE]; /**< Cache objects */
+	uint32_t objs[LCORE_CACHE_SIZE]; /**< Cache objects */
 } __rte_cache_aligned;
 
 /* Structure that stores key-value pair */
-- 
2.17.1



More information about the dev mailing list