Recheck-request: iol-intel-Performance<br>
<br>
On Tue, Sep 24, 2024 at 2:12 PM Morten Brørup <mb@smartsharesystems.com> wrote:<br>
<br>
This patch refactors the mempool cache to fix two bugs:<br>
1. When a mempool is created with a cache size of N objects, the cache was<br>
actually created with a size of 1.5 * N objects.<br>
2. The mempool cache field names did not reflect their purpose;<br>
the "flushthresh" field effectively held the cache's real size (its capacity),<br>
and the "size" field held the number of objects remaining in the cache after<br>
a get operation had refilled it from the backend.<br>
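<br>
For reference, a condensed before/after view of the cache fields involved, taken from the rte_mempool.h hunk further down; the struct names are illustrative only, and the stats fields and alignment attributes are omitted:<br>
<br>
#include <stdint.h><br>
<br>
struct cache_fields_before {          /* hypothetical name, pre-patch layout */<br>
	uint32_t size;        /* configured size N, but used as the refill level after a get */<br>
	uint32_t flushthresh; /* 1.5 * N, the level the cache could actually grow to */<br>
	uint32_t len;         /* current number of objects in the cache */<br>
	/* void *objs[RTE_MEMPOOL_CACHE_MAX_SIZE * 2]; with the old max of 512 */<br>
};<br>
<br>
struct cache_fields_after {           /* hypothetical name, post-patch layout */<br>
	uint32_t size;        /* configured size N; the cache never grows beyond this */<br>
	uint32_t len;         /* current number of objects in the cache */<br>
	/* void *objs[RTE_MEMPOOL_CACHE_MAX_SIZE]; with the new max of 1024 */<br>
};<br>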
<br>
The first item in particular could be fatal:<br>
When the mempool caches associated with other lcores hold more objects than<br>
the mempool's configured cache size, a right-sized mempool may unexpectedly<br>
run out of objects, causing the application to fail.<br>
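<br>
To make the failure mode concrete, here is a standalone arithmetic sketch with made-up numbers (not part of the patch), assuming one common way of provisioning a pool: objects needed in flight plus the configured cache size per lcore:<br>
<br>
#include <stdio.h><br>
<br>
int main(void)<br>
{<br>
	const unsigned int cache_size = 512; /* configured per-lcore cache size N */<br>
	const unsigned int lcores = 8;       /* lcores touching the mempool */<br>
	const unsigned int in_flight = 4096; /* objects the application itself needs */<br>
<br>
	/* Pool provisioned for N objects parked in each lcore's cache. */<br>
	unsigned int pool_size = in_flight + lcores * cache_size;       /* 8192 */<br>
<br>
	/* Worst case actually parked before the fix: 1.5 * N per lcore. */<br>
	unsigned int parked_worst_case = lcores * (cache_size * 3 / 2); /* 6144 */<br>
<br>
	printf("pool %u, budgeted for caches %u, worst case parked %u, shortfall %u\n",<br>
	       pool_size, lcores * cache_size, parked_worst_case,<br>
	       parked_worst_case - lcores * cache_size);<br>
	return 0;<br>
}<br>
<br>
With these numbers the caches can legitimately swallow 2048 objects more than budgeted, so the backend can come up empty even though the application holds no more objects than planned.<br>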
<br>
Furthermore, this patch introduces two optimizations:<br>
1. The mempool caches are flushed to/filled from the backend in their<br>
entirety, so backend accesses are CPU cache line aligned. (Assuming the<br>
mempool cache size is a multiple of the CPU cache line size divided by the<br>
size of a pointer; see the sketch below this list.)<br>
2. The unlikely paths in the get and put functions, where the cache is<br>
flushed to/filled from the backend, are moved from the inline functions to<br>
separate helper functions, thereby reducing the code size of the inline<br>
functions.<br>
Note: Accessing the backend for cacheless mempools remains inline.<br>
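<br>
A minimal sketch of the cache size assumption from optimization 1; this helper is hypothetical and not part of the patch, but it only uses existing DPDK macros (RTE_CACHE_LINE_SIZE, RTE_ALIGN_CEIL, RTE_MIN, RTE_MEMPOOL_CACHE_MAX_SIZE):<br>
<br>
#include <rte_common.h><br>
<br>
/* Round a requested cache size up so that whole-cache flushes/refills<br>
 * stay CPU cache line aligned, per the assumption stated above.<br>
 */<br>
static inline uint32_t<br>
mempool_cache_size_aligned(uint32_t requested)<br>
{<br>
	/* 8 pointers per line on a 64-bit CPU with 64-byte cache lines. */<br>
	const uint32_t ptrs_per_line = RTE_CACHE_LINE_SIZE / sizeof(void *);<br>
<br>
	uint32_t size = RTE_ALIGN_CEIL(requested, ptrs_per_line);<br>
	return RTE_MIN(size, (uint32_t)RTE_MEMPOOL_CACHE_MAX_SIZE);<br>
}<br>
<br>
For example, with 64-byte cache lines and 8-byte pointers, mempool_cache_size_aligned(250) returns 256, so each flush to or refill from the backend moves whole cache lines of pointers.<br>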
<br>
Various drivers accessing the mempool directly have been updated<br>
accordingly.<br>
These drivers did not update mempool statistics when accessing the mempool<br>
directly, so that is fixed too.<br>
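<br>
A condensed, hypothetical helper illustrating the pattern the updated drivers now follow (compare the i40e/iavf/ice/idpf hunks below); the real drivers copy the pointers with vector instructions instead of the plain loop used here:<br>
<br>
#include <rte_mempool.h><br>
#include <rte_lcore.h><br>
#include <rte_branch_prediction.h><br>
<br>
static inline void<br>
driver_put_bulk_sketch(struct rte_mempool *mp, void **objs, unsigned int n)<br>
{<br>
	struct rte_mempool_cache *cache =<br>
		rte_mempool_default_cache(mp, rte_lcore_id());<br>
<br>
	if (!cache || unlikely(n + cache->len > cache->size)) {<br>
		/* No cache, or not enough room: fall back to the generic put. */<br>
		rte_mempool_generic_put(mp, objs, n, cache);<br>
		return;<br>
	}<br>
<br>
	/* Fast path: copy the objects straight into the per-lcore cache. */<br>
	void **cache_objs = &cache->objs[cache->len];<br>
	for (unsigned int i = 0; i < n; i++)<br>
		cache_objs[i] = objs[i];<br>
	cache->len += n;<br>
<br>
	/* Update the cache statistics, which the drivers previously skipped. */<br>
	RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);<br>
	RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);<br>
}<br>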
<br>
Note: Performance not yet benchmarked.<br>
<br>
Signed-off-by: Morten Brørup <mb@smartsharesystems.com><br>
---<br>
v7:<br>
* Increased max mempool cache size from 512 to 1024 objects.<br>
Mainly for CI performance test purposes.<br>
Originally, the max mempool cache size was 768 objects, backed by a<br>
fixed-size array of 1024 objects in the mempool cache structure.<br>
v6:<br>
* Fixed the incomplete v5 implementation of passing large requests directly<br>
to the backend.<br>
* Use memcpy instead of rte_memcpy where the compiler complains about it.<br>
* Added const to some function parameters.<br>
v5:<br>
* Moved helper functions back into the header file, for improved<br>
performance.<br>
* Pass large requests directly to the backend. This also simplifies the<br>
code.<br>
v4:<br>
* Updated subject to reflect that misleading names are considered bugs.<br>
* Rewrote patch description to provide more details about the bugs fixed.<br>
(Mattias Rönnblom)<br>
* Moved helper functions, not to be inlined, to mempool C file.<br>
(Mattias Rönnblom)<br>
* Pass requests for n >= RTE_MEMPOOL_CACHE_MAX_SIZE objects, where n is known<br>
at build time, directly to the backend driver, to avoid calling the helper<br>
functions. This also fixes the compiler warnings about out-of-bounds array<br>
access.<br>
v3:<br>
* Removed __attribute__(assume).<br>
v2:<br>
* Removed mempool perf test; not part of patch set.<br>
---<br>
config/rte_config.h | 2 +-<br>
drivers/common/idpf/idpf_common_rxtx_avx512.c | 54 +---<br>
drivers/mempool/dpaa/dpaa_mempool.c | 16 +-<br>
drivers/mempool/dpaa2/dpaa2_hw_mempool.c | 14 -<br>
drivers/net/i40e/i40e_rxtx_vec_avx512.c | 17 +-<br>
drivers/net/iavf/iavf_rxtx_vec_avx512.c | 27 +-<br>
drivers/net/ice/ice_rxtx_vec_avx512.c | 27 +-<br>
lib/mempool/mempool_trace.h | 1 -<br>
lib/mempool/rte_mempool.c | 12 +-<br>
lib/mempool/rte_mempool.h | 287 ++++++++++++------<br>
10 files changed, 232 insertions(+), 225 deletions(-)<br>
<br>
diff --git a/config/rte_config.h b/config/rte_config.h<br>
index dd7bb0d35b..2488ff167d 100644<br>
--- a/config/rte_config.h<br>
+++ b/config/rte_config.h<br>
@@ -56,7 +56,7 @@<br>
#define RTE_CONTIGMEM_DEFAULT_BUF_SIZE (512*1024*1024)<br>
<br>
/* mempool defines */<br>
-#define RTE_MEMPOOL_CACHE_MAX_SIZE 512<br>
+#define RTE_MEMPOOL_CACHE_MAX_SIZE 1024<br>
/* RTE_LIBRTE_MEMPOOL_STATS is not set */<br>
/* RTE_LIBRTE_MEMPOOL_DEBUG is not set */<br>
<br>
diff --git a/drivers/common/idpf/idpf_common_rxtx_avx512.c b/drivers/common/idpf/idpf_common_rxtx_avx512.c<br>
index 3b5e124ec8..98535a48f3 100644<br>
--- a/drivers/common/idpf/idpf_common_rxtx_avx512.c<br>
+++ b/drivers/common/idpf/idpf_common_rxtx_avx512.c<br>
@@ -1024,21 +1024,13 @@ idpf_tx_singleq_free_bufs_avx512(struct idpf_tx_queue *txq)<br>
rte_lcore_id());<br>
void **cache_objs;<br>
<br>
- if (cache == NULL || cache->len == 0)<br>
- goto normal;<br>
-<br>
- cache_objs = &cache->objs[cache->len];<br>
-<br>
- if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {<br>
- rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);<br>
+ if (!cache || unlikely(n + cache->len > cache->size)) {<br>
+ rte_mempool_generic_put(mp, (void *)txep, n, cache);<br>
goto done;<br>
}<br>
<br>
- /* The cache follows the following algorithm<br>
- * 1. Add the objects to the cache<br>
- * 2. Anything greater than the cache min value (if it crosses the<br>
- * cache flush threshold) is flushed to the ring.<br>
- */<br>
+ cache_objs = &cache->objs[cache->len];<br>
+<br>
/* Add elements back into the cache */<br>
uint32_t copied = 0;<br>
/* n is multiple of 32 */<br>
@@ -1056,16 +1048,13 @@ idpf_tx_singleq_free_bufs_avx512(struct idpf_tx_queue *txq)<br>
}<br>
cache->len += n;<br>
<br>
- if (cache->len >= cache->flushthresh) {<br>
- rte_mempool_ops_enqueue_bulk(mp,<br>
- &cache->objs[cache->size],<br>
- cache->len - cache->size);<br>
- cache->len = cache->size;<br>
- }<br>
+ /* Increment stat. */<br>
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);<br>
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);<br>
+<br>
goto done;<br>
}<br>
<br>
-normal:<br>
m = rte_pktmbuf_prefree_seg(txep[0].mbuf);<br>
if (likely(m != NULL)) {<br>
free[0] = m;<br>
@@ -1335,21 +1324,13 @@ idpf_tx_splitq_free_bufs_avx512(struct idpf_tx_queue *txq)<br>
rte_lcore_id());<br>
void **cache_objs;<br>
<br>
- if (!cache || cache->len == 0)<br>
- goto normal;<br>
-<br>
- cache_objs = &cache->objs[cache->len];<br>
-<br>
- if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {<br>
- rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);<br>
+ if (!cache || unlikely(n + cache->len > cache->size)) {<br>
+ rte_mempool_generic_put(mp, (void *)txep, n, cache);<br>
goto done;<br>
}<br>
<br>
- /* The cache follows the following algorithm<br>
- * 1. Add the objects to the cache<br>
- * 2. Anything greater than the cache min value (if it crosses the<br>
- * cache flush threshold) is flushed to the ring.<br>
- */<br>
+ cache_objs = &cache->objs[cache->len];<br>
+<br>
/* Add elements back into the cache */<br>
uint32_t copied = 0;<br>
/* n is multiple of 32 */<br>
@@ -1367,16 +1348,13 @@ idpf_tx_splitq_free_bufs_avx512(struct idpf_tx_queue *txq)<br>
}<br>
cache->len += n;<br>
<br>
- if (cache->len >= cache->flushthresh) {<br>
- rte_mempool_ops_enqueue_bulk(mp,<br>
- &cache->objs[cache->size],<br>
- cache->len - cache->size);<br>
- cache->len = cache->size;<br>
- }<br>
+ /* Increment stat. */<br>
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);<br>
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);<br>
+<br>
goto done;<br>
}<br>
<br>
-normal:<br>
m = rte_pktmbuf_prefree_seg(txep[0].mbuf);<br>
if (likely(m)) {<br>
free[0] = m;<br>
diff --git a/drivers/mempool/dpaa/dpaa_mempool.c b/drivers/mempool/dpaa/dpaa_mempool.c<br>
index 74bfcab509..3a936826c8 100644<br>
--- a/drivers/mempool/dpaa/dpaa_mempool.c<br>
+++ b/drivers/mempool/dpaa/dpaa_mempool.c<br>
@@ -51,8 +51,6 @@ dpaa_mbuf_create_pool(struct rte_mempool *mp)<br>
struct bman_pool_params params = {<br>
.flags = BMAN_POOL_FLAG_DYNAMIC_BPID<br>
};<br>
- unsigned int lcore_id;<br>
- struct rte_mempool_cache *cache;<br>
<br>
MEMPOOL_INIT_FUNC_TRACE();<br>
<br>
@@ -120,18 +118,6 @@ dpaa_mbuf_create_pool(struct rte_mempool *mp)<br>
rte_memcpy(bp_info, (void *)&rte_dpaa_bpid_info[bpid],<br>
sizeof(struct dpaa_bp_info));<br>
mp->pool_data = (void *)bp_info;<br>
- /* Update per core mempool cache threshold to optimal value which is<br>
- * number of buffers that can be released to HW buffer pool in<br>
- * a single API call.<br>
- */<br>
- for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {<br>
- cache = &mp->local_cache[lcore_id];<br>
- DPAA_MEMPOOL_DEBUG("lCore %d: cache->flushthresh %d -> %d",<br>
- lcore_id, cache->flushthresh,<br>
- (uint32_t)(cache->size + DPAA_MBUF_MAX_ACQ_REL));<br>
- if (cache->flushthresh)<br>
- cache->flushthresh = cache->size + DPAA_MBUF_MAX_ACQ_REL;<br>
- }<br>
<br>
DPAA_MEMPOOL_INFO("BMAN pool created for bpid =%d", bpid);<br>
return 0;<br>
@@ -234,7 +220,7 @@ dpaa_mbuf_alloc_bulk(struct rte_mempool *pool,<br>
DPAA_MEMPOOL_DPDEBUG("Request to alloc %d buffers in bpid = %d",<br>
count, bp_info->bpid);<br>
<br>
- if (unlikely(count >= (RTE_MEMPOOL_CACHE_MAX_SIZE * 2))) {<br>
+ if (unlikely(count >= RTE_MEMPOOL_CACHE_MAX_SIZE)) {<br>
DPAA_MEMPOOL_ERR("Unable to allocate requested (%u) buffers",<br>
count);<br>
return -1;<br>
diff --git a/drivers/mempool/dpaa2/dpaa2_hw_mempool.c b/drivers/mempool/dpaa2/dpaa2_hw_mempool.c<br>
index 42e17d984c..a44f3cf616 100644<br>
--- a/drivers/mempool/dpaa2/dpaa2_hw_mempool.c<br>
+++ b/drivers/mempool/dpaa2/dpaa2_hw_mempool.c<br>
@@ -44,8 +44,6 @@ rte_hw_mbuf_create_pool(struct rte_mempool *mp)<br>
struct dpaa2_bp_info *bp_info;<br>
struct dpbp_attr dpbp_attr;<br>
uint32_t bpid;<br>
- unsigned int lcore_id;<br>
- struct rte_mempool_cache *cache;<br>
int ret;<br>
<br>
avail_dpbp = dpaa2_alloc_dpbp_dev();<br>
@@ -134,18 +132,6 @@ rte_hw_mbuf_create_pool(struct rte_mempool *mp)<br>
DPAA2_MEMPOOL_DEBUG("BP List created for bpid =%d", dpbp_attr.bpid);<br>
<br>
h_bp_list = bp_list;<br>
- /* Update per core mempool cache threshold to optimal value which is<br>
- * number of buffers that can be released to HW buffer pool in<br>
- * a single API call.<br>
- */<br>
- for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {<br>
- cache = &mp->local_cache[lcore_id];<br>
- DPAA2_MEMPOOL_DEBUG("lCore %d: cache->flushthresh %d -> %d",<br>
- lcore_id, cache->flushthresh,<br>
- (uint32_t)(cache->size + DPAA2_MBUF_MAX_ACQ_REL));<br>
- if (cache->flushthresh)<br>
- cache->flushthresh = cache->size + DPAA2_MBUF_MAX_ACQ_REL;<br>
- }<br>
<br>
return 0;<br>
err3:<br>
diff --git a/drivers/net/i40e/i40e_rxtx_vec_avx512.c b/drivers/net/i40e/i40e_rxtx_vec_avx512.c<br>
index 0238b03f8a..712ab1726f 100644<br>
--- a/drivers/net/i40e/i40e_rxtx_vec_avx512.c<br>
+++ b/drivers/net/i40e/i40e_rxtx_vec_avx512.c<br>
@@ -783,18 +783,13 @@ i40e_tx_free_bufs_avx512(struct i40e_tx_queue *txq)<br>
struct rte_mempool_cache *cache = rte_mempool_default_cache(mp,<br>
rte_lcore_id());<br>
<br>
- if (!cache || n > RTE_MEMPOOL_CACHE_MAX_SIZE) {<br>
+ if (!cache || unlikely(n + cache->len > cache->size)) {<br>
rte_mempool_generic_put(mp, (void *)txep, n, cache);<br>
goto done;<br>
}<br>
<br>
cache_objs = &cache->objs[cache->len];<br>
<br>
- /* The cache follows the following algorithm<br>
- * 1. Add the objects to the cache<br>
- * 2. Anything greater than the cache min value (if it<br>
- * crosses the cache flush threshold) is flushed to the ring.<br>
- */<br>
/* Add elements back into the cache */<br>
uint32_t copied = 0;<br>
/* n is multiple of 32 */<br>
@@ -812,12 +807,10 @@ i40e_tx_free_bufs_avx512(struct i40e_tx_queue *txq)<br>
}<br>
cache->len += n;<br>
<br>
- if (cache->len >= cache->flushthresh) {<br>
- rte_mempool_ops_enqueue_bulk<br>
- (mp, &cache->objs[cache->size],<br>
- cache->len - cache->size);<br>
- cache->len = cache->size;<br>
- }<br>
+ /* Increment stat. */<br>
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);<br>
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);<br>
+<br>
goto done;<br>
}<br>
<br>
diff --git a/drivers/net/iavf/iavf_rxtx_vec_avx512.c b/drivers/net/iavf/iavf_rxtx_vec_avx512.c<br>
index 3bb6f305df..307bb8556a 100644<br>
--- a/drivers/net/iavf/iavf_rxtx_vec_avx512.c<br>
+++ b/drivers/net/iavf/iavf_rxtx_vec_avx512.c<br>
@@ -1873,21 +1873,13 @@ iavf_tx_free_bufs_avx512(struct iavf_tx_queue *txq)<br>
rte_lcore_id());<br>
void **cache_objs;<br>
<br>
- if (!cache || cache->len == 0)<br>
- goto normal;<br>
-<br>
- cache_objs = &cache->objs[cache->len];<br>
-<br>
- if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {<br>
- rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);<br>
+ if (!cache || unlikely(n + cache->len > cache->size)) {<br>
+ rte_mempool_generic_put(mp, (void *)txep, n, cache);<br>
goto done;<br>
}<br>
<br>
- /* The cache follows the following algorithm<br>
- * 1. Add the objects to the cache<br>
- * 2. Anything greater than the cache min value (if it crosses the<br>
- * cache flush threshold) is flushed to the ring.<br>
- */<br>
+ cache_objs = &cache->objs[cache->len];<br>
+<br>
/* Add elements back into the cache */<br>
uint32_t copied = 0;<br>
/* n is multiple of 32 */<br>
@@ -1905,16 +1897,13 @@ iavf_tx_free_bufs_avx512(struct iavf_tx_queue *txq)<br>
}<br>
cache->len += n;<br>
<br>
- if (cache->len >= cache->flushthresh) {<br>
- rte_mempool_ops_enqueue_bulk(mp,<br>
- &cache->objs[cache->size],<br>
- cache->len - cache->size);<br>
- cache->len = cache->size;<br>
- }<br>
+ /* Increment stat. */<br>
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);<br>
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);<br>
+<br>
goto done;<br>
}<br>
<br>
-normal:<br>
m = rte_pktmbuf_prefree_seg(txep[0].mbuf);<br>
if (likely(m)) {<br>
free[0] = m;<br>
diff --git a/drivers/net/ice/ice_rxtx_vec_avx512.c b/drivers/net/ice/ice_rxtx_vec_avx512.c<br>
index 04148e8ea2..4ea1db734e 100644<br>
--- a/drivers/net/ice/ice_rxtx_vec_avx512.c<br>
+++ b/drivers/net/ice/ice_rxtx_vec_avx512.c<br>
@@ -888,21 +888,13 @@ ice_tx_free_bufs_avx512(struct ice_tx_queue *txq)<br>
struct rte_mempool_cache *cache = rte_mempool_default_cache(mp,<br>
rte_lcore_id());<br>
<br>
- if (!cache || cache->len == 0)<br>
- goto normal;<br>
-<br>
- cache_objs = &cache->objs[cache->len];<br>
-<br>
- if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {<br>
- rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);<br>
+ if (!cache || unlikely(n + cache->len > cache->size)) {<br>
+ rte_mempool_generic_put(mp, (void *)txep, n, cache);<br>
goto done;<br>
}<br>
<br>
- /* The cache follows the following algorithm<br>
- * 1. Add the objects to the cache<br>
- * 2. Anything greater than the cache min value (if it<br>
- * crosses the cache flush threshold) is flushed to the ring.<br>
- */<br>
+ cache_objs = &cache->objs[cache->len];<br>
+<br>
/* Add elements back into the cache */<br>
uint32_t copied = 0;<br>
/* n is multiple of 32 */<br>
@@ -920,16 +912,13 @@ ice_tx_free_bufs_avx512(struct ice_tx_queue *txq)<br>
}<br>
cache->len += n;<br>
<br>
- if (cache->len >= cache->flushthresh) {<br>
- rte_mempool_ops_enqueue_bulk<br>
- (mp, &cache->objs[cache->size],<br>
- cache->len - cache->size);<br>
- cache->len = cache->size;<br>
- }<br>
+ /* Increment stat. */<br>
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);<br>
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);<br>
+<br>
goto done;<br>
}<br>
<br>
-normal:<br>
m = rte_pktmbuf_prefree_seg(txep[0].mbuf);<br>
if (likely(m)) {<br>
free[0] = m;<br>
diff --git a/lib/mempool/mempool_trace.h b/lib/mempool/mempool_trace.h<br>
index dffef062e4..3c49b41a6d 100644<br>
--- a/lib/mempool/mempool_trace.h<br>
+++ b/lib/mempool/mempool_trace.h<br>
@@ -112,7 +112,6 @@ RTE_TRACE_POINT(<br>
rte_trace_point_emit_i32(socket_id);<br>
rte_trace_point_emit_ptr(cache);<br>
rte_trace_point_emit_u32(cache->len);<br>
- rte_trace_point_emit_u32(cache->flushthresh);<br>
)<br>
<br>
RTE_TRACE_POINT(<br>
diff --git a/lib/mempool/rte_mempool.c b/lib/mempool/rte_mempool.c<br>
index d8e39e5c20..40fb13239a 100644<br>
--- a/lib/mempool/rte_mempool.c<br>
+++ b/lib/mempool/rte_mempool.c<br>
@@ -50,11 +50,6 @@ static void<br>
mempool_event_callback_invoke(enum rte_mempool_event event,<br>
struct rte_mempool *mp);<br>
<br>
-/* Note: avoid using floating point since that compiler<br>
- * may not think that is constant.<br>
- */<br>
-#define CALC_CACHE_FLUSHTHRESH(c) (((c) * 3) / 2)<br>
-<br>
#if defined(RTE_ARCH_X86)<br>
/*<br>
* return the greatest common divisor between a and b (fast algorithm)<br>
@@ -746,13 +741,12 @@ rte_mempool_free(struct rte_mempool *mp)<br>
static void<br>
mempool_cache_init(struct rte_mempool_cache *cache, uint32_t size)<br>
{<br>
- /* Check that cache have enough space for flush threshold */<br>
- RTE_BUILD_BUG_ON(CALC_CACHE_FLUSHTHRESH(RTE_MEMPOOL_CACHE_MAX_SIZE) ><br>
+ /* Check that the cache has enough space for its size */<br>
+ RTE_BUILD_BUG_ON(RTE_MEMPOOL_CACHE_MAX_SIZE ><br>
RTE_SIZEOF_FIELD(struct rte_mempool_cache, objs) /<br>
RTE_SIZEOF_FIELD(struct rte_mempool_cache, objs[0]));<br>
<br>
cache->size = size;<br>
- cache->flushthresh = CALC_CACHE_FLUSHTHRESH(size);<br>
cache->len = 0;<br>
}<br>
<br>
@@ -836,7 +830,7 @@ rte_mempool_create_empty(const char *name, unsigned n, unsigned elt_size,<br>
<br>
/* asked cache too big */<br>
if (cache_size > RTE_MEMPOOL_CACHE_MAX_SIZE ||<br>
- CALC_CACHE_FLUSHTHRESH(cache_size) > n) {<br>
+ cache_size > n) {<br>
rte_errno = EINVAL;<br>
return NULL;<br>
}<br>
diff --git a/lib/mempool/rte_mempool.h b/lib/mempool/rte_mempool.h<br>
index 7bdc92b812..0801cec24a 100644<br>
--- a/lib/mempool/rte_mempool.h<br>
+++ b/lib/mempool/rte_mempool.h<br>
@@ -89,10 +89,8 @@ struct __rte_cache_aligned rte_mempool_debug_stats {<br>
*/<br>
struct __rte_cache_aligned rte_mempool_cache {<br>
uint32_t size; /**< Size of the cache */<br>
- uint32_t flushthresh; /**< Threshold before we flush excess elements */<br>
uint32_t len; /**< Current cache count */<br>
#ifdef RTE_LIBRTE_MEMPOOL_STATS<br>
- uint32_t unused;<br>
/*<br>
* Alternative location for the most frequently updated mempool statistics (per-lcore),<br>
* providing faster update access when using a mempool cache.<br>
@@ -110,7 +108,7 @@ struct __rte_cache_aligned rte_mempool_cache {<br>
* Cache is allocated to this size to allow it to overflow in certain<br>
* cases to avoid needless emptying of cache.<br>
*/<br>
- alignas(RTE_CACHE_LINE_SIZE) void *objs[RTE_MEMPOOL_CACHE_MAX_SIZE * 2];<br>
+ alignas(RTE_CACHE_LINE_SIZE) void *objs[RTE_MEMPOOL_CACHE_MAX_SIZE];<br>
};<br>
<br>
/**<br>
@@ -1362,6 +1360,48 @@ rte_mempool_cache_flush(struct rte_mempool_cache *cache,<br>
cache->len = 0;<br>
}<br>
<br>
+/**<br>
+ * @internal Put several objects back in the mempool; used internally when<br>
+ * the number of objects exceeds the remaining space in the mempool cache.<br>
+ * @param mp<br>
+ * A pointer to the mempool structure.<br>
+ * @param obj_table<br>
+ * A pointer to a table of void * pointers (objects).<br>
+ * @param n<br>
+ * The number of objects to store back in the mempool, must be strictly<br>
+ * positive.<br>
+ * Must be more than the remaining space in the mempool cache, i.e.:<br>
+ * cache->len + n > cache->size<br>
+ * Must be less than the size of the mempool cache, i.e.:<br>
+ * n < cache->size<br>
+ * @param cache<br>
+ * A pointer to a mempool cache structure. Not NULL.<br>
+ */<br>
+static void<br>
+rte_mempool_do_generic_put_split(struct rte_mempool *mp, void * const *obj_table,<br>
+ unsigned int n, struct rte_mempool_cache * const cache)<br>
+{<br>
+ void **cache_objs;<br>
+ unsigned int len;<br>
+ const uint32_t cache_size = cache->size;<br>
+<br>
+ /* Fill the cache with the first objects. */<br>
+ cache_objs = &cache->objs[cache->len];<br>
+ len = (cache_size - cache->len);<br>
+ cache->len = n - len; /* Moved to here (for performance). */<br>
+ /* rte_ */ memcpy(cache_objs, obj_table, sizeof(void *) * len);<br>
+ obj_table += len;<br>
+ n -= len;<br>
+<br>
+ /* Flush the entire cache to the backend. */<br>
+ cache_objs = &cache->objs[0];<br>
+ rte_mempool_ops_enqueue_bulk(mp, cache_objs, cache_size);<br>
+<br>
+ /* Add the remaining objects to the cache. */<br>
+ /* Moved from here (for performance): cache->len = n; */<br>
+ /* rte_ */ memcpy(cache_objs, obj_table, sizeof(void *) * n);<br>
+}<br>
+<br>
/**<br>
* @internal Put several objects back in the mempool; used internally.<br>
* @param mp<br>
@@ -1376,52 +1416,44 @@ rte_mempool_cache_flush(struct rte_mempool_cache *cache,<br>
*/<br>
static __rte_always_inline void<br>
rte_mempool_do_generic_put(struct rte_mempool *mp, void * const *obj_table,<br>
- unsigned int n, struct rte_mempool_cache *cache)<br>
+ unsigned int n, struct rte_mempool_cache * const cache)<br>
{<br>
- void **cache_objs;<br>
-<br>
- /* No cache provided */<br>
+ /* No cache provided? */<br>
if (unlikely(cache == NULL))<br>
goto driver_enqueue;<br>
<br>
- /* increment stat now, adding in mempool always success */<br>
+ /* Increment stats now, adding in mempool always succeeds. */<br>
RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);<br>
RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);<br>
<br>
- /* The request itself is too big for the cache */<br>
- if (unlikely(n > cache->flushthresh))<br>
+ /* The request itself is known to be too big for any cache? */<br>
+ if (__rte_constant(n) && n >= RTE_MEMPOOL_CACHE_MAX_SIZE)<br>
goto driver_enqueue_stats_incremented;<br>
<br>
- /*<br>
- * The cache follows the following algorithm:<br>
- * 1. If the objects cannot be added to the cache without crossing<br>
- * the flush threshold, flush the cache to the backend.<br>
- * 2. Add the objects to the cache.<br>
- */<br>
+ /* Enough remaining space in the cache? */<br>
+ if (likely(cache->len + n <= cache->size)) {<br>
+ void **cache_objs;<br>
<br>
- if (cache->len + n <= cache->flushthresh) {<br>
+ /* Add the objects to the cache. */<br>
cache_objs = &cache->objs[cache->len];<br>
cache->len += n;<br>
- } else {<br>
- cache_objs = &cache->objs[0];<br>
- rte_mempool_ops_enqueue_bulk(mp, cache_objs, cache->len);<br>
- cache->len = n;<br>
- }<br>
-<br>
- /* Add the objects to the cache. */<br>
- rte_memcpy(cache_objs, obj_table, sizeof(void *) * n);<br>
+ rte_memcpy(cache_objs, obj_table, sizeof(void *) * n);<br>
+ } else if (likely(n < cache->size))<br>
+ rte_mempool_do_generic_put_split(mp, obj_table, n, cache);<br>
+ else<br>
+ goto driver_enqueue_stats_incremented;<br>
<br>
return;<br>
<br>
driver_enqueue:<br>
<br>
- /* increment stat now, adding in mempool always success */<br>
+ /* Increment stats now, adding in mempool always succeeds. */<br>
RTE_MEMPOOL_STAT_ADD(mp, put_bulk, 1);<br>
RTE_MEMPOOL_STAT_ADD(mp, put_objs, n);<br>
<br>
driver_enqueue_stats_incremented:<br>
<br>
- /* push objects to the backend */<br>
+ /* Push the objects directly to the backend. */<br>
rte_mempool_ops_enqueue_bulk(mp, obj_table, n);<br>
}<br>
<br>
@@ -1490,122 +1522,183 @@ rte_mempool_put(struct rte_mempool *mp, void *obj)<br>
}<br>
<br>
/**<br>
- * @internal Get several objects from the mempool; used internally.<br>
+ * @internal Get several objects from the mempool; used internally when<br>
+ * the number of objects exceeds what is available in the mempool cache.<br>
* @param mp<br>
* A pointer to the mempool structure.<br>
* @param obj_table<br>
* A pointer to a table of void * pointers (objects).<br>
* @param n<br>
* The number of objects to get, must be strictly positive.<br>
+ * Must be more than available in the mempool cache, i.e.:<br>
+ * n > cache->len<br>
* @param cache<br>
- * A pointer to a mempool cache structure. May be NULL if not needed.<br>
+ * A pointer to a mempool cache structure. Not NULL.<br>
* @return<br>
* - 0: Success.<br>
* - <0: Error; code of driver dequeue function.<br>
*/<br>
-static __rte_always_inline int<br>
-rte_mempool_do_generic_get(struct rte_mempool *mp, void **obj_table,<br>
- unsigned int n, struct rte_mempool_cache *cache)<br>
+static int<br>
+rte_mempool_do_generic_get_split(struct rte_mempool *mp, void **obj_table,<br>
+ unsigned int n, struct rte_mempool_cache * const cache)<br>
{<br>
int ret;<br>
unsigned int remaining;<br>
uint32_t index, len;<br>
void **cache_objs;<br>
+ const uint32_t cache_size = cache->size;<br>
<br>
- /* No cache provided */<br>
- if (unlikely(cache == NULL)) {<br>
- remaining = n;<br>
- goto driver_dequeue;<br>
- }<br>
-<br>
- /* The cache is a stack, so copy will be in reverse order. */<br>
+ /* Serve the first part of the request from the cache to return hot objects first. */<br>
cache_objs = &cache->objs[cache->len];<br>
+ len = cache->len;<br>
+ remaining = n - len;<br>
+ for (index = 0; index < len; index++)<br>
+ *obj_table++ = *--cache_objs;<br>
<br>
- if (__rte_constant(n) && n <= cache->len) {<br>
+ /* At this point, the cache is empty. */<br>
+<br>
+ /* More than can be served from a full cache? */<br>
+ if (unlikely(remaining >= cache_size)) {<br>
/*<br>
- * The request size is known at build time, and<br>
- * the entire request can be satisfied from the cache,<br>
- * so let the compiler unroll the fixed length copy loop.<br>
+ * Serve the following part of the request directly from the backend<br>
+ * in multiples of the cache size.<br>
*/<br>
- cache->len -= n;<br>
- for (index = 0; index < n; index++)<br>
- *obj_table++ = *--cache_objs;<br>
+ len = remaining - remaining % cache_size;<br>
+ ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, len);<br>
+ if (unlikely(ret < 0)) {<br>
+ /*<br>
+ * No further action is required to roll back the request,<br>
+ * as objects in the cache are intact, and no objects have<br>
+ * been dequeued from the backend.<br>
+ */<br>
<br>
- RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);<br>
- RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);<br>
+ RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);<br>
+ RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);<br>
<br>
- return 0;<br>
- }<br>
+ return ret;<br>
+ }<br>
<br>
- /*<br>
- * Use the cache as much as we have to return hot objects first.<br>
- * If the request size 'n' is known at build time, the above comparison<br>
- * ensures that n > cache->len here, so omit RTE_MIN().<br>
- */<br>
- len = __rte_constant(n) ? cache->len : RTE_MIN(n, cache->len);<br>
- cache->len -= len;<br>
- remaining = n - len;<br>
- for (index = 0; index < len; index++)<br>
- *obj_table++ = *--cache_objs;<br>
+ remaining -= len;<br>
+ obj_table += len;<br>
<br>
- /*<br>
- * If the request size 'n' is known at build time, the case<br>
- * where the entire request can be satisfied from the cache<br>
- * has already been handled above, so omit handling it here.<br>
- */<br>
- if (!__rte_constant(n) && remaining == 0) {<br>
- /* The entire request is satisfied from the cache. */<br>
+ if (unlikely(remaining == 0)) {<br>
+ cache->len = 0;<br>
<br>
- RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);<br>
- RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);<br>
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);<br>
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);<br>
<br>
- return 0;<br>
+ return 0;<br>
+ }<br>
}<br>
<br>
- /* if dequeue below would overflow mem allocated for cache */<br>
- if (unlikely(remaining > RTE_MEMPOOL_CACHE_MAX_SIZE))<br>
- goto driver_dequeue;<br>
-<br>
- /* Fill the cache from the backend; fetch size + remaining objects. */<br>
- ret = rte_mempool_ops_dequeue_bulk(mp, cache->objs,<br>
- cache->size + remaining);<br>
+ /* Fill the entire cache from the backend. */<br>
+ ret = rte_mempool_ops_dequeue_bulk(mp, cache->objs, cache_size);<br>
if (unlikely(ret < 0)) {<br>
/*<br>
- * We are buffer constrained, and not able to allocate<br>
- * cache + remaining.<br>
- * Do not fill the cache, just satisfy the remaining part of<br>
- * the request directly from the backend.<br>
+ * Unable to fill the cache.<br>
+ * Last resort: Try only the remaining part of the request,<br>
+ * served directly from the backend.<br>
*/<br>
- goto driver_dequeue;<br>
+ ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, remaining);<br>
+ if (unlikely(ret == 0)) {<br>
+ cache->len = 0;<br>
+<br>
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);<br>
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);<br>
+<br>
+ return 0;<br>
+ }<br>
+<br>
+ /* Roll back. */<br>
+ if (cache->len + remaining == n) {<br>
+ /*<br>
+ * No further action is required to roll back the request,<br>
+ * as objects in the cache are intact, and no objects have<br>
+ * been dequeued from the backend.<br>
+ */<br>
+ } else {<br>
+ /* Update the state of the cache before putting back the objects. */<br>
+ cache->len = 0;<br>
+<br>
+ len = n - remaining;<br>
+ obj_table -= len;<br>
+ rte_mempool_do_generic_put(mp, obj_table, len, cache);<br>
+ }<br>
+<br>
+ RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);<br>
+ RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);<br>
+<br>
+ return ret;<br>
}<br>
<br>
- /* Satisfy the remaining part of the request from the filled cache. */<br>
- cache_objs = &cache->objs[cache->size + remaining];<br>
+ /* Serve the remaining part of the request from the filled cache. */<br>
+ cache_objs = &cache->objs[cache_size];<br>
for (index = 0; index < remaining; index++)<br>
*obj_table++ = *--cache_objs;<br>
<br>
- cache->len = cache->size;<br>
+ cache->len = cache_size - remaining;<br>
<br>
RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);<br>
RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);<br>
<br>
return 0;<br>
+}<br>
<br>
-driver_dequeue:<br>
+/**<br>
+ * @internal Get several objects from the mempool; used internally.<br>
+ * @param mp<br>
+ * A pointer to the mempool structure.<br>
+ * @param obj_table<br>
+ * A pointer to a table of void * pointers (objects).<br>
+ * @param n<br>
+ * The number of objects to get, must be strictly positive.<br>
+ * @param cache<br>
+ * A pointer to a mempool cache structure. May be NULL if not needed.<br>
+ * @return<br>
+ * - 0: Success.<br>
+ * - <0: Error; code of driver dequeue function.<br>
+ */<br>
+static __rte_always_inline int<br>
+rte_mempool_do_generic_get(struct rte_mempool *mp, void **obj_table,<br>
+ unsigned int n, struct rte_mempool_cache * const cache)<br>
+{<br>
+ int ret;<br>
<br>
- /* Get remaining objects directly from the backend. */<br>
- ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, remaining);<br>
+ /* No cache provided? */<br>
+ if (unlikely(cache == NULL))<br>
+ goto driver_dequeue;<br>
<br>
- if (ret < 0) {<br>
- if (likely(cache != NULL)) {<br>
- cache->len = n - remaining;<br>
- /*<br>
- * No further action is required to roll the first part<br>
- * of the request back into the cache, as objects in<br>
- * the cache are intact.<br>
- */<br>
- }<br>
+ /* The request itself is known to be too big for any cache? */<br>
+ if (__rte_constant(n) && n >= RTE_MEMPOOL_CACHE_MAX_SIZE)<br>
+ goto driver_dequeue;<br>
+<br>
+ /* The request can be served entirely from the cache? */<br>
+ if (likely(n <= cache->len)) {<br>
+ unsigned int index;<br>
+ void **cache_objs;<br>
<br>
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);<br>
+ RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);<br>
+<br>
+ /*<br>
+ * The cache is a stack, so copy will be in reverse order.<br>
+ * If the request size is known at build time,<br>
+ * the compiler will unroll the fixed length copy loop.<br>
+ */<br>
+ cache_objs = &cache->objs[cache->len];<br>
+ cache->len -= n;<br>
+ for (index = 0; index < n; index++)<br>
+ *obj_table++ = *--cache_objs;<br>
+<br>
+ return 0;<br>
+ } else<br>
+ return rte_mempool_do_generic_get_split(mp, obj_table, n, cache);<br>
+<br>
+driver_dequeue:<br>
+<br>
+ /* Get the objects directly from the backend. */<br>
+ ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, n);<br>
+ if (unlikely(ret < 0)) {<br>
RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);<br>
RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);<br>
} else {<br>
-- <br>
2.43.0<br>
<br>