[dpdk-dev] [RFC] mempool: implement index-based per core cache
Dharmik Thakkar
dharmik.thakkar at arm.com
Thu Sep 30 19:27:35 CEST 2021
The current mempool per-core cache implementation stores pointers to the
cached objects. On most architectures, each pointer consumes 64 bits.
Replace it with an index-based implementation, wherein each buffer is
addressed by (pool base address + index). This reduces the memory
footprint of the cache, since each cache entry shrinks from a 64-bit
pointer to a 32-bit index.
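
The core idea, as a minimal standalone sketch (the helper names below are
illustrative, not part of the DPDK API): store each cached object as its
32-bit byte offset from a per-pool base address, and rebuild the full
pointer by adding that offset back to the base. It assumes all objects of
a pool lie within 4 GB of the base address.

#include <stdint.h>

/* Illustrative only: assumes every object lies within 4 GB of `base`,
 * the per-pool base address recorded in each per-core cache. */
static inline uint32_t
obj_to_index(const void *base, const void *obj)
{
	/* store the 32-bit byte offset instead of the 64-bit pointer */
	return (uint32_t)((uintptr_t)obj - (uintptr_t)base);
}

static inline void *
index_to_obj(void *base, uint32_t index)
{
	/* reconstruct the full pointer: base + offset */
	return (void *)((uintptr_t)base + index);
}

This is the same arithmetic the patch performs with RTE_PTR_DIFF() on the
put path and RTE_PTR_ADD() on the get path, vectorized with NEON where
available.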
L3Fwd performance testing shows a minor improvement in cache
performance and no change in throughput.
Micro-benchmarking the patch with mempool_perf_test shows a
significant improvement in the majority of the test cases.
A future plan is to replace the global pool's pointer-based
implementation with an index-based implementation as well.
Signed-off-by: Dharmik Thakkar <dharmik.thakkar at arm.com>
---
drivers/mempool/ring/rte_mempool_ring.c | 2 +-
lib/mempool/rte_mempool.c | 8 +++
lib/mempool/rte_mempool.h | 74 ++++++++++++++++++++++---
3 files changed, 74 insertions(+), 10 deletions(-)
diff --git a/drivers/mempool/ring/rte_mempool_ring.c b/drivers/mempool/ring/rte_mempool_ring.c
index b1f09ff28f4d..e55913e47f21 100644
--- a/drivers/mempool/ring/rte_mempool_ring.c
+++ b/drivers/mempool/ring/rte_mempool_ring.c
@@ -101,7 +101,7 @@ ring_alloc(struct rte_mempool *mp, uint32_t rg_flags)
return -rte_errno;
mp->pool_data = r;
-
+ mp->local_cache_base_addr = &r[1];
return 0;
}
diff --git a/lib/mempool/rte_mempool.c b/lib/mempool/rte_mempool.c
index 59a588425bd6..424bdb19c323 100644
--- a/lib/mempool/rte_mempool.c
+++ b/lib/mempool/rte_mempool.c
@@ -480,6 +480,7 @@ rte_mempool_populate_default(struct rte_mempool *mp)
int ret;
bool need_iova_contig_obj;
size_t max_alloc_size = SIZE_MAX;
+ unsigned lcore_id;
ret = mempool_ops_alloc_once(mp);
if (ret != 0)
@@ -600,6 +601,13 @@ rte_mempool_populate_default(struct rte_mempool *mp)
}
}
+ /* Init all default caches. */
+ if (mp->cache_size != 0) {
+ for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++)
+ mp->local_cache[lcore_id].local_cache_base_value =
+ *(void **)mp->local_cache_base_addr;
+ }
+
rte_mempool_trace_populate_default(mp);
return mp->size;
diff --git a/lib/mempool/rte_mempool.h b/lib/mempool/rte_mempool.h
index 4235d6f0bf2b..545405c0d3ce 100644
--- a/lib/mempool/rte_mempool.h
+++ b/lib/mempool/rte_mempool.h
@@ -51,6 +51,8 @@
#include <rte_memcpy.h>
#include <rte_common.h>
+#include <arm_neon.h>
+
#include "rte_mempool_trace_fp.h"
#ifdef __cplusplus
@@ -91,11 +93,12 @@ struct rte_mempool_cache {
uint32_t size; /**< Size of the cache */
uint32_t flushthresh; /**< Threshold before we flush excess elements */
uint32_t len; /**< Current cache count */
+ void *local_cache_base_value; /**< Base value to calculate indices */
/*
* Cache is allocated to this size to allow it to overflow in certain
* cases to avoid needless emptying of cache.
*/
- void *objs[RTE_MEMPOOL_CACHE_MAX_SIZE * 3]; /**< Cache objects */
+ uint32_t objs[RTE_MEMPOOL_CACHE_MAX_SIZE * 3]; /**< Cache objects */
} __rte_cache_aligned;
/**
@@ -172,7 +175,6 @@ struct rte_mempool_objtlr {
* A list of memory where objects are stored
*/
STAILQ_HEAD(rte_mempool_memhdr_list, rte_mempool_memhdr);
-
/**
* Callback used to free a memory chunk
*/
@@ -244,6 +246,7 @@ struct rte_mempool {
int32_t ops_index;
struct rte_mempool_cache *local_cache; /**< Per-lcore local cache */
+ void *local_cache_base_addr; /**< Reference to the base value */
uint32_t populated_size; /**< Number of populated objects. */
struct rte_mempool_objhdr_list elt_list; /**< List of objects in pool */
@@ -1269,7 +1272,15 @@ rte_mempool_cache_flush(struct rte_mempool_cache *cache,
if (cache == NULL || cache->len == 0)
return;
rte_mempool_trace_cache_flush(cache, mp);
- rte_mempool_ops_enqueue_bulk(mp, cache->objs, cache->len);
+
+ unsigned int i;
+ unsigned int cache_len = cache->len;
+ void *obj_table[RTE_MEMPOOL_CACHE_MAX_SIZE * 3];
+ void *base_value = cache->local_cache_base_value;
+ uint32_t *cache_objs = cache->objs;
+ for (i = 0; i < cache_len; i++)
+ obj_table[i] = (void *) RTE_PTR_ADD(base_value, cache_objs[i]);
+ rte_mempool_ops_enqueue_bulk(mp, obj_table, cache->len);
cache->len = 0;
}
@@ -1289,7 +1300,9 @@ static __rte_always_inline void
__mempool_generic_put(struct rte_mempool *mp, void * const *obj_table,
unsigned int n, struct rte_mempool_cache *cache)
{
- void **cache_objs;
+ uint32_t *cache_objs;
+ void *base_value;
+ uint32_t i;
/* increment stat now, adding in mempool always success */
__MEMPOOL_STAT_ADD(mp, put_bulk, 1);
@@ -1301,6 +1314,12 @@ __mempool_generic_put(struct rte_mempool *mp, void * const *obj_table,
cache_objs = &cache->objs[cache->len];
+ base_value = cache->local_cache_base_value;
+
+ uint64x2_t v_obj_table;
+ uint64x2_t v_base_value = vdupq_n_u64((uint64_t)base_value);
+ uint32x2_t v_cache_objs;
+
/*
* The cache follows the following algorithm
* 1. Add the objects to the cache
@@ -1309,12 +1328,26 @@ __mempool_generic_put(struct rte_mempool *mp, void * const *obj_table,
*/
/* Add elements back into the cache */
- rte_memcpy(&cache_objs[0], obj_table, sizeof(void *) * n);
+
+#if defined __ARM_NEON
+ for (i = 0; i < (n & ~0x1); i+=2) {
+ v_obj_table = vld1q_u64((const uint64_t *)&obj_table[i]);
+ v_cache_objs = vqmovn_u64(vsubq_u64(v_obj_table, v_base_value));
+ vst1_u32(cache_objs + i, v_cache_objs);
+ }
+ if (n & 0x1) {
+ cache_objs[i] = (uint32_t) RTE_PTR_DIFF(obj_table[i], base_value);
+ }
+#else
+ for (i = 0; i < n; i++) {
+ cache_objs[i] = (uint32_t) RTE_PTR_DIFF(obj_table[i], base_value);
+ }
+#endif
cache->len += n;
if (cache->len >= cache->flushthresh) {
- rte_mempool_ops_enqueue_bulk(mp, &cache->objs[cache->size],
+ rte_mempool_ops_enqueue_bulk(mp, obj_table + cache->len - cache->size,
cache->len - cache->size);
cache->len = cache->size;
}
@@ -1415,23 +1448,26 @@ __mempool_generic_get(struct rte_mempool *mp, void **obj_table,
unsigned int n, struct rte_mempool_cache *cache)
{
int ret;
+ uint32_t i;
uint32_t index, len;
- void **cache_objs;
+ uint32_t *cache_objs;
/* No cache provided or cannot be satisfied from cache */
if (unlikely(cache == NULL || n >= cache->size))
goto ring_dequeue;
+ void *base_value = cache->local_cache_base_value;
cache_objs = cache->objs;
/* Can this be satisfied from the cache? */
if (cache->len < n) {
/* No. Backfill the cache first, and then fill from it */
uint32_t req = n + (cache->size - cache->len);
+ void *temp_objs[RTE_MEMPOOL_CACHE_MAX_SIZE * 3]; /**< Cache objects */
/* How many do we require i.e. number to fill the cache + the request */
ret = rte_mempool_ops_dequeue_bulk(mp,
- &cache->objs[cache->len], req);
+ temp_objs, req);
if (unlikely(ret < 0)) {
/*
* In the off chance that we are buffer constrained,
@@ -1442,12 +1478,32 @@ __mempool_generic_get(struct rte_mempool *mp, void **obj_table,
goto ring_dequeue;
}
+ len = cache->len;
+ for (i = 0; i < req; ++i, ++len) {
+ cache_objs[len] = (uint32_t) RTE_PTR_DIFF(temp_objs[i], base_value);
+ }
+
cache->len += req;
}
+ uint64x2_t v_obj_table;
+ uint64x2_t v_cache_objs;
+ uint64x2_t v_base_value = vdupq_n_u64((uint64_t)base_value);
+
/* Now fill in the response ... */
+#if defined __ARM_NEON
+ for (index = 0, len = cache->len - 1; index < (n & ~0x1); index+=2,
+ len-=2, obj_table+=2) {
+ v_cache_objs = vmovl_u32(vld1_u32(cache_objs + len - 1));
+ v_obj_table = vaddq_u64(v_cache_objs, v_base_value);
+ vst1q_u64((uint64_t *)obj_table, v_obj_table);
+ }
+ if (n & 0x1)
+ *obj_table = (void *) RTE_PTR_ADD(base_value, cache_objs[len]);
+#else
for (index = 0, len = cache->len - 1; index < n; ++index, len--, obj_table++)
- *obj_table = cache_objs[len];
+ *obj_table = (void *) RTE_PTR_ADD(base_value, cache_objs[len]);
+#endif
cache->len -= n;
--
2.17.1