<div dir="ltr">Recheck-request: iol-intel-Performance</div><br><div class="gmail_quote"><div dir="ltr" class="gmail_attr">On Tue, Sep 24, 2024 at 2:12 PM Morten Brørup <<a href="mailto:mb@smartsharesystems.com">mb@smartsharesystems.com</a>> wrote:<br></div><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex">This patch refactors the mempool cache to fix two bugs:<br>
1. When a mempool is created with a cache size of N objects, the cache was<br>
actually created with a size of 1.5 * N objects.<br>
2. The mempool cache field names did not reflect their purpose;<br>
the "flushthresh" field held the size, and the "size" field held the<br>
number of objects remaining in the cache when returning from a get<br>
operation refilling it from the backend.<br>
<br>
Especially the first item could be fatal:<br>
When more objects than a mempool's configured cache size is held in the<br>
mempool's caches associated with other lcores, a rightsized mempool may<br>
unexpectedly run out of objects, causing the application to fail.<br>
<br>
Furthermore, this patch introduces two optimizations:<br>
1. The mempool caches are flushed to/filled from the backend in their<br>
entirety, so backend accesses are CPU cache line aligned. (Assuming the<br>
mempool cache size is a multiplum of a CPU cache line size divided by the<br>
size of a pointer.)<br>
2. The unlikely paths in the get and put functions, where the cache is<br>
flushed to/filled from the backend, are moved from the inline functions to<br>
separate helper functions, thereby reducing the code size of the inline<br>
functions.<br>
Note: Accessing the backend for cacheless mempools remains inline.<br>
<br>
Various drivers accessing the mempool directly have been updated<br>
accordingly.<br>
These drivers did not update mempool statistics when accessing the mempool<br>
directly, so that is fixed too.<br>
<br>
Note: Performance not yet benchmarked.<br>
<br>
Signed-off-by: Morten Brørup <<a href="mailto:mb@smartsharesystems.com" target="_blank">mb@smartsharesystems.com</a>><br>
---<br>
v7:<br>
* Increased max mempool cache size from 512 to 1024 objects.<br>
  Mainly for CI performance test purposes.<br>
  Originally, the max mempool cache size was 768 objects, and used a fixed<br>
  size array of 1024 objects in the mempool cache structure.<br>
v6:<br>
* Fix v5 incomplete implementation of passing large requests directly to<br>
  the backend.<br>
* Use memcpy instead of rte_memcpy where compiler complains about it.<br>
* Added const to some function parameters.<br>
v5:<br>
* Moved helper functions back into the header file, for improved<br>
  performance.<br>
* Pass large requests directly to the backend. This also simplifies the<br>
  code.<br>
v4:<br>
* Updated subject to reflect that misleading names are considered bugs.<br>
* Rewrote patch description to provide more details about the bugs fixed.<br>
  (Mattias Rönnblom)<br>
* Moved helper functions, not to be inlined, to mempool C file.<br>
  (Mattias Rönnblom)<br>
* Pass requests for n >= RTE_MEMPOOL_CACHE_MAX_SIZE objects known at build<br>
  time directly to backend driver, to avoid calling the helper functions.<br>
  This also fixes the compiler warnings about out of bounds array access.<br>
v3:<br>
* Removed __attribute__(assume).<br>
v2:<br>
* Removed mempool perf test; not part of patch set.<br>
---<br>
 config/rte_config.h                           |   2 +-<br>
 drivers/common/idpf/idpf_common_rxtx_avx512.c |  54 +---<br>
 drivers/mempool/dpaa/dpaa_mempool.c           |  16 +-<br>
 drivers/mempool/dpaa2/dpaa2_hw_mempool.c      |  14 -<br>
 drivers/net/i40e/i40e_rxtx_vec_avx512.c       |  17 +-<br>
 drivers/net/iavf/iavf_rxtx_vec_avx512.c       |  27 +-<br>
 drivers/net/ice/ice_rxtx_vec_avx512.c         |  27 +-<br>
 lib/mempool/mempool_trace.h                   |   1 -<br>
 lib/mempool/rte_mempool.c                     |  12 +-<br>
 lib/mempool/rte_mempool.h                     | 287 ++++++++++++------<br>
 10 files changed, 232 insertions(+), 225 deletions(-)<br>
<br>
diff --git a/config/rte_config.h b/config/rte_config.h<br>
index dd7bb0d35b..2488ff167d 100644<br>
--- a/config/rte_config.h<br>
+++ b/config/rte_config.h<br>
@@ -56,7 +56,7 @@<br>
 #define RTE_CONTIGMEM_DEFAULT_BUF_SIZE (512*1024*1024)<br>
<br>
 /* mempool defines */<br>
-#define RTE_MEMPOOL_CACHE_MAX_SIZE 512<br>
+#define RTE_MEMPOOL_CACHE_MAX_SIZE 1024<br>
 /* RTE_LIBRTE_MEMPOOL_STATS is not set */<br>
 /* RTE_LIBRTE_MEMPOOL_DEBUG is not set */<br>
<br>
diff --git a/drivers/common/idpf/idpf_common_rxtx_avx512.c b/drivers/common/idpf/idpf_common_rxtx_avx512.c<br>
index 3b5e124ec8..98535a48f3 100644<br>
--- a/drivers/common/idpf/idpf_common_rxtx_avx512.c<br>
+++ b/drivers/common/idpf/idpf_common_rxtx_avx512.c<br>
@@ -1024,21 +1024,13 @@ idpf_tx_singleq_free_bufs_avx512(struct idpf_tx_queue *txq)<br>
                                                                rte_lcore_id());<br>
                void **cache_objs;<br>
<br>
-               if (cache == NULL || cache->len == 0)<br>
-                       goto normal;<br>
-<br>
-               cache_objs = &cache->objs[cache->len];<br>
-<br>
-               if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {<br>
-                       rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);<br>
+               if (!cache || unlikely(n + cache->len > cache->size)) {<br>
+                       rte_mempool_generic_put(mp, (void *)txep, n, cache);<br>
                        goto done;<br>
                }<br>
<br>
-               /* The cache follows the following algorithm<br>
-                *   1. Add the objects to the cache<br>
-                *   2. Anything greater than the cache min value (if it crosses the<br>
-                *   cache flush threshold) is flushed to the ring.<br>
-                */<br>
+               cache_objs = &cache->objs[cache->len];<br>
+<br>
                /* Add elements back into the cache */<br>
                uint32_t copied = 0;<br>
                /* n is multiple of 32 */<br>
@@ -1056,16 +1048,13 @@ idpf_tx_singleq_free_bufs_avx512(struct idpf_tx_queue *txq)<br>
                }<br>
                cache->len += n;<br>
<br>
-               if (cache->len >= cache->flushthresh) {<br>
-                       rte_mempool_ops_enqueue_bulk(mp,<br>
-                                                    &cache->objs[cache->size],<br>
-                                                    cache->len - cache->size);<br>
-                       cache->len = cache->size;<br>
-               }<br>
+               /* Increment stat. */<br>
+               RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);<br>
+               RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);<br>
+<br>
                goto done;<br>
        }<br>
<br>
-normal:<br>
        m = rte_pktmbuf_prefree_seg(txep[0].mbuf);<br>
        if (likely(m != NULL)) {<br>
                free[0] = m;<br>
@@ -1335,21 +1324,13 @@ idpf_tx_splitq_free_bufs_avx512(struct idpf_tx_queue *txq)<br>
                                                                rte_lcore_id());<br>
                void **cache_objs;<br>
<br>
-               if (!cache || cache->len == 0)<br>
-                       goto normal;<br>
-<br>
-               cache_objs = &cache->objs[cache->len];<br>
-<br>
-               if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {<br>
-                       rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);<br>
+               if (!cache || unlikely(n + cache->len > cache->size)) {<br>
+                       rte_mempool_generic_put(mp, (void *)txep, n, cache);<br>
                        goto done;<br>
                }<br>
<br>
-               /* The cache follows the following algorithm<br>
-                *   1. Add the objects to the cache<br>
-                *   2. Anything greater than the cache min value (if it crosses the<br>
-                *   cache flush threshold) is flushed to the ring.<br>
-                */<br>
+               cache_objs = &cache->objs[cache->len];<br>
+<br>
                /* Add elements back into the cache */<br>
                uint32_t copied = 0;<br>
                /* n is multiple of 32 */<br>
@@ -1367,16 +1348,13 @@ idpf_tx_splitq_free_bufs_avx512(struct idpf_tx_queue *txq)<br>
                }<br>
                cache->len += n;<br>
<br>
-               if (cache->len >= cache->flushthresh) {<br>
-                       rte_mempool_ops_enqueue_bulk(mp,<br>
-                                                    &cache->objs[cache->size],<br>
-                                                    cache->len - cache->size);<br>
-                       cache->len = cache->size;<br>
-               }<br>
+               /* Increment stat. */<br>
+               RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);<br>
+               RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);<br>
+<br>
                goto done;<br>
        }<br>
<br>
-normal:<br>
        m = rte_pktmbuf_prefree_seg(txep[0].mbuf);<br>
        if (likely(m)) {<br>
                free[0] = m;<br>
diff --git a/drivers/mempool/dpaa/dpaa_mempool.c b/drivers/mempool/dpaa/dpaa_mempool.c<br>
index 74bfcab509..3a936826c8 100644<br>
--- a/drivers/mempool/dpaa/dpaa_mempool.c<br>
+++ b/drivers/mempool/dpaa/dpaa_mempool.c<br>
@@ -51,8 +51,6 @@ dpaa_mbuf_create_pool(struct rte_mempool *mp)<br>
        struct bman_pool_params params = {<br>
                .flags = BMAN_POOL_FLAG_DYNAMIC_BPID<br>
        };<br>
-       unsigned int lcore_id;<br>
-       struct rte_mempool_cache *cache;<br>
<br>
        MEMPOOL_INIT_FUNC_TRACE();<br>
<br>
@@ -120,18 +118,6 @@ dpaa_mbuf_create_pool(struct rte_mempool *mp)<br>
        rte_memcpy(bp_info, (void *)&rte_dpaa_bpid_info[bpid],<br>
                   sizeof(struct dpaa_bp_info));<br>
        mp->pool_data = (void *)bp_info;<br>
-       /* Update per core mempool cache threshold to optimal value which is<br>
-        * number of buffers that can be released to HW buffer pool in<br>
-        * a single API call.<br>
-        */<br>
-       for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {<br>
-               cache = &mp->local_cache[lcore_id];<br>
-               DPAA_MEMPOOL_DEBUG("lCore %d: cache->flushthresh %d -> %d",<br>
-                       lcore_id, cache->flushthresh,<br>
-                       (uint32_t)(cache->size + DPAA_MBUF_MAX_ACQ_REL));<br>
-               if (cache->flushthresh)<br>
-                       cache->flushthresh = cache->size + DPAA_MBUF_MAX_ACQ_REL;<br>
-       }<br>
<br>
        DPAA_MEMPOOL_INFO("BMAN pool created for bpid =%d", bpid);<br>
        return 0;<br>
@@ -234,7 +220,7 @@ dpaa_mbuf_alloc_bulk(struct rte_mempool *pool,<br>
        DPAA_MEMPOOL_DPDEBUG("Request to alloc %d buffers in bpid = %d",<br>
                             count, bp_info->bpid);<br>
<br>
-       if (unlikely(count >= (RTE_MEMPOOL_CACHE_MAX_SIZE * 2))) {<br>
+       if (unlikely(count >= RTE_MEMPOOL_CACHE_MAX_SIZE)) {<br>
                DPAA_MEMPOOL_ERR("Unable to allocate requested (%u) buffers",<br>
                                 count);<br>
                return -1;<br>
diff --git a/drivers/mempool/dpaa2/dpaa2_hw_mempool.c b/drivers/mempool/dpaa2/dpaa2_hw_mempool.c<br>
index 42e17d984c..a44f3cf616 100644<br>
--- a/drivers/mempool/dpaa2/dpaa2_hw_mempool.c<br>
+++ b/drivers/mempool/dpaa2/dpaa2_hw_mempool.c<br>
@@ -44,8 +44,6 @@ rte_hw_mbuf_create_pool(struct rte_mempool *mp)<br>
        struct dpaa2_bp_info *bp_info;<br>
        struct dpbp_attr dpbp_attr;<br>
        uint32_t bpid;<br>
-       unsigned int lcore_id;<br>
-       struct rte_mempool_cache *cache;<br>
        int ret;<br>
<br>
        avail_dpbp = dpaa2_alloc_dpbp_dev();<br>
@@ -134,18 +132,6 @@ rte_hw_mbuf_create_pool(struct rte_mempool *mp)<br>
        DPAA2_MEMPOOL_DEBUG("BP List created for bpid =%d", dpbp_attr.bpid);<br>
<br>
        h_bp_list = bp_list;<br>
-       /* Update per core mempool cache threshold to optimal value which is<br>
-        * number of buffers that can be released to HW buffer pool in<br>
-        * a single API call.<br>
-        */<br>
-       for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {<br>
-               cache = &mp->local_cache[lcore_id];<br>
-               DPAA2_MEMPOOL_DEBUG("lCore %d: cache->flushthresh %d -> %d",<br>
-                       lcore_id, cache->flushthresh,<br>
-                       (uint32_t)(cache->size + DPAA2_MBUF_MAX_ACQ_REL));<br>
-               if (cache->flushthresh)<br>
-                       cache->flushthresh = cache->size + DPAA2_MBUF_MAX_ACQ_REL;<br>
-       }<br>
<br>
        return 0;<br>
 err3:<br>
diff --git a/drivers/net/i40e/i40e_rxtx_vec_avx512.c b/drivers/net/i40e/i40e_rxtx_vec_avx512.c<br>
index 0238b03f8a..712ab1726f 100644<br>
--- a/drivers/net/i40e/i40e_rxtx_vec_avx512.c<br>
+++ b/drivers/net/i40e/i40e_rxtx_vec_avx512.c<br>
@@ -783,18 +783,13 @@ i40e_tx_free_bufs_avx512(struct i40e_tx_queue *txq)<br>
                struct rte_mempool_cache *cache = rte_mempool_default_cache(mp,<br>
                                rte_lcore_id());<br>
<br>
-               if (!cache || n > RTE_MEMPOOL_CACHE_MAX_SIZE) {<br>
+               if (!cache || unlikely(n + cache->len > cache->size)) {<br>
                        rte_mempool_generic_put(mp, (void *)txep, n, cache);<br>
                        goto done;<br>
                }<br>
<br>
                cache_objs = &cache->objs[cache->len];<br>
<br>
-               /* The cache follows the following algorithm<br>
-                *   1. Add the objects to the cache<br>
-                *   2. Anything greater than the cache min value (if it<br>
-                *   crosses the cache flush threshold) is flushed to the ring.<br>
-                */<br>
                /* Add elements back into the cache */<br>
                uint32_t copied = 0;<br>
                /* n is multiple of 32 */<br>
@@ -812,12 +807,10 @@ i40e_tx_free_bufs_avx512(struct i40e_tx_queue *txq)<br>
                }<br>
                cache->len += n;<br>
<br>
-               if (cache->len >= cache->flushthresh) {<br>
-                       rte_mempool_ops_enqueue_bulk<br>
-                               (mp, &cache->objs[cache->size],<br>
-                               cache->len - cache->size);<br>
-                       cache->len = cache->size;<br>
-               }<br>
+               /* Increment stat. */<br>
+               RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);<br>
+               RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);<br>
+<br>
                goto done;<br>
        }<br>
<br>
diff --git a/drivers/net/iavf/iavf_rxtx_vec_avx512.c b/drivers/net/iavf/iavf_rxtx_vec_avx512.c<br>
index 3bb6f305df..307bb8556a 100644<br>
--- a/drivers/net/iavf/iavf_rxtx_vec_avx512.c<br>
+++ b/drivers/net/iavf/iavf_rxtx_vec_avx512.c<br>
@@ -1873,21 +1873,13 @@ iavf_tx_free_bufs_avx512(struct iavf_tx_queue *txq)<br>
                                                                rte_lcore_id());<br>
                void **cache_objs;<br>
<br>
-               if (!cache || cache->len == 0)<br>
-                       goto normal;<br>
-<br>
-               cache_objs = &cache->objs[cache->len];<br>
-<br>
-               if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {<br>
-                       rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);<br>
+               if (!cache || unlikely(n + cache->len > cache->size)) {<br>
+                       rte_mempool_generic_put(mp, (void *)txep, n, cache);<br>
                        goto done;<br>
                }<br>
<br>
-               /* The cache follows the following algorithm<br>
-                *   1. Add the objects to the cache<br>
-                *   2. Anything greater than the cache min value (if it crosses the<br>
-                *   cache flush threshold) is flushed to the ring.<br>
-                */<br>
+               cache_objs = &cache->objs[cache->len];<br>
+<br>
                /* Add elements back into the cache */<br>
                uint32_t copied = 0;<br>
                /* n is multiple of 32 */<br>
@@ -1905,16 +1897,13 @@ iavf_tx_free_bufs_avx512(struct iavf_tx_queue *txq)<br>
                }<br>
                cache->len += n;<br>
<br>
-               if (cache->len >= cache->flushthresh) {<br>
-                       rte_mempool_ops_enqueue_bulk(mp,<br>
-                                                    &cache->objs[cache->size],<br>
-                                                    cache->len - cache->size);<br>
-                       cache->len = cache->size;<br>
-               }<br>
+               /* Increment stat. */<br>
+               RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);<br>
+               RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);<br>
+<br>
                goto done;<br>
        }<br>
<br>
-normal:<br>
        m = rte_pktmbuf_prefree_seg(txep[0].mbuf);<br>
        if (likely(m)) {<br>
                free[0] = m;<br>
diff --git a/drivers/net/ice/ice_rxtx_vec_avx512.c b/drivers/net/ice/ice_rxtx_vec_avx512.c<br>
index 04148e8ea2..4ea1db734e 100644<br>
--- a/drivers/net/ice/ice_rxtx_vec_avx512.c<br>
+++ b/drivers/net/ice/ice_rxtx_vec_avx512.c<br>
@@ -888,21 +888,13 @@ ice_tx_free_bufs_avx512(struct ice_tx_queue *txq)<br>
                struct rte_mempool_cache *cache = rte_mempool_default_cache(mp,<br>
                                rte_lcore_id());<br>
<br>
-               if (!cache || cache->len == 0)<br>
-                       goto normal;<br>
-<br>
-               cache_objs = &cache->objs[cache->len];<br>
-<br>
-               if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {<br>
-                       rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);<br>
+               if (!cache || unlikely(n + cache->len > cache->size)) {<br>
+                       rte_mempool_generic_put(mp, (void *)txep, n, cache);<br>
                        goto done;<br>
                }<br>
<br>
-               /* The cache follows the following algorithm<br>
-                *   1. Add the objects to the cache<br>
-                *   2. Anything greater than the cache min value (if it<br>
-                *   crosses the cache flush threshold) is flushed to the ring.<br>
-                */<br>
+               cache_objs = &cache->objs[cache->len];<br>
+<br>
                /* Add elements back into the cache */<br>
                uint32_t copied = 0;<br>
                /* n is multiple of 32 */<br>
@@ -920,16 +912,13 @@ ice_tx_free_bufs_avx512(struct ice_tx_queue *txq)<br>
                }<br>
                cache->len += n;<br>
<br>
-               if (cache->len >= cache->flushthresh) {<br>
-                       rte_mempool_ops_enqueue_bulk<br>
-                               (mp, &cache->objs[cache->size],<br>
-                                cache->len - cache->size);<br>
-                       cache->len = cache->size;<br>
-               }<br>
+               /* Increment stat. */<br>
+               RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);<br>
+               RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);<br>
+<br>
                goto done;<br>
        }<br>
<br>
-normal:<br>
        m = rte_pktmbuf_prefree_seg(txep[0].mbuf);<br>
        if (likely(m)) {<br>
                free[0] = m;<br>
diff --git a/lib/mempool/mempool_trace.h b/lib/mempool/mempool_trace.h<br>
index dffef062e4..3c49b41a6d 100644<br>
--- a/lib/mempool/mempool_trace.h<br>
+++ b/lib/mempool/mempool_trace.h<br>
@@ -112,7 +112,6 @@ RTE_TRACE_POINT(<br>
        rte_trace_point_emit_i32(socket_id);<br>
        rte_trace_point_emit_ptr(cache);<br>
        rte_trace_point_emit_u32(cache->len);<br>
-       rte_trace_point_emit_u32(cache->flushthresh);<br>
 )<br>
<br>
 RTE_TRACE_POINT(<br>
diff --git a/lib/mempool/rte_mempool.c b/lib/mempool/rte_mempool.c<br>
index d8e39e5c20..40fb13239a 100644<br>
--- a/lib/mempool/rte_mempool.c<br>
+++ b/lib/mempool/rte_mempool.c<br>
@@ -50,11 +50,6 @@ static void<br>
 mempool_event_callback_invoke(enum rte_mempool_event event,<br>
                              struct rte_mempool *mp);<br>
<br>
-/* Note: avoid using floating point since that compiler<br>
- * may not think that is constant.<br>
- */<br>
-#define CALC_CACHE_FLUSHTHRESH(c) (((c) * 3) / 2)<br>
-<br>
 #if defined(RTE_ARCH_X86)<br>
 /*<br>
  * return the greatest common divisor between a and b (fast algorithm)<br>
@@ -746,13 +741,12 @@ rte_mempool_free(struct rte_mempool *mp)<br>
 static void<br>
 mempool_cache_init(struct rte_mempool_cache *cache, uint32_t size)<br>
 {<br>
-       /* Check that cache have enough space for flush threshold */<br>
-       RTE_BUILD_BUG_ON(CALC_CACHE_FLUSHTHRESH(RTE_MEMPOOL_CACHE_MAX_SIZE) ><br>
+       /* Check that cache have enough space for size */<br>
+       RTE_BUILD_BUG_ON(RTE_MEMPOOL_CACHE_MAX_SIZE ><br>
                         RTE_SIZEOF_FIELD(struct rte_mempool_cache, objs) /<br>
                         RTE_SIZEOF_FIELD(struct rte_mempool_cache, objs[0]));<br>
<br>
        cache->size = size;<br>
-       cache->flushthresh = CALC_CACHE_FLUSHTHRESH(size);<br>
        cache->len = 0;<br>
 }<br>
<br>
@@ -836,7 +830,7 @@ rte_mempool_create_empty(const char *name, unsigned n, unsigned elt_size,<br>
<br>
        /* asked cache too big */<br>
        if (cache_size > RTE_MEMPOOL_CACHE_MAX_SIZE ||<br>
-           CALC_CACHE_FLUSHTHRESH(cache_size) > n) {<br>
+           cache_size > n) {<br>
                rte_errno = EINVAL;<br>
                return NULL;<br>
        }<br>
diff --git a/lib/mempool/rte_mempool.h b/lib/mempool/rte_mempool.h<br>
index 7bdc92b812..0801cec24a 100644<br>
--- a/lib/mempool/rte_mempool.h<br>
+++ b/lib/mempool/rte_mempool.h<br>
@@ -89,10 +89,8 @@ struct __rte_cache_aligned rte_mempool_debug_stats {<br>
  */<br>
 struct __rte_cache_aligned rte_mempool_cache {<br>
        uint32_t size;        /**< Size of the cache */<br>
-       uint32_t flushthresh; /**< Threshold before we flush excess elements */<br>
        uint32_t len;         /**< Current cache count */<br>
 #ifdef RTE_LIBRTE_MEMPOOL_STATS<br>
-       uint32_t unused;<br>
        /*<br>
         * Alternative location for the most frequently updated mempool statistics (per-lcore),<br>
         * providing faster update access when using a mempool cache.<br>
@@ -110,7 +108,7 @@ struct __rte_cache_aligned rte_mempool_cache {<br>
         * Cache is allocated to this size to allow it to overflow in certain<br>
         * cases to avoid needless emptying of cache.<br>
         */<br>
-       alignas(RTE_CACHE_LINE_SIZE) void *objs[RTE_MEMPOOL_CACHE_MAX_SIZE * 2];<br>
+       alignas(RTE_CACHE_LINE_SIZE) void *objs[RTE_MEMPOOL_CACHE_MAX_SIZE];<br>
 };<br>
<br>
 /**<br>
@@ -1362,6 +1360,48 @@ rte_mempool_cache_flush(struct rte_mempool_cache *cache,<br>
        cache->len = 0;<br>
 }<br>
<br>
+/**<br>
+ * @internal Put several objects back in the mempool; used internally when<br>
+ *   the number of objects exceeds the remaining space in the mempool cache.<br>
+ * @param mp<br>
+ *   A pointer to the mempool structure.<br>
+ * @param obj_table<br>
+ *   A pointer to a table of void * pointers (objects).<br>
+ * @param n<br>
+ *   The number of objects to store back in the mempool, must be strictly<br>
+ *   positive.<br>
+ *   Must be more than the remaining space in the mempool cache, i.e.:<br>
+ *   cache->len + n > cache->size<br>
+ *   Must be less than the size of the mempool cache, i.e.:<br>
+ *   n < cache->size<br>
+ * @param cache<br>
+ *   A pointer to a mempool cache structure. Not NULL.<br>
+ */<br>
+static void<br>
+rte_mempool_do_generic_put_split(struct rte_mempool *mp, void * const *obj_table,<br>
+               unsigned int n, struct rte_mempool_cache * const cache)<br>
+{<br>
+       void **cache_objs;<br>
+       unsigned int len;<br>
+       const uint32_t cache_size = cache->size;<br>
+<br>
+       /* Fill the cache with the first objects. */<br>
+       cache_objs = &cache->objs[cache->len];<br>
+       len = (cache_size - cache->len);<br>
+       cache->len = n - len; /* Moved to here (for performance). */<br>
+       /* rte_ */ memcpy(cache_objs, obj_table, sizeof(void *) * len);<br>
+       obj_table += len;<br>
+       n -= len;<br>
+<br>
+       /* Flush the entire cache to the backend. */<br>
+       cache_objs = &cache->objs[0];<br>
+       rte_mempool_ops_enqueue_bulk(mp, cache_objs, cache_size);<br>
+<br>
+       /* Add the remaining objects to the cache. */<br>
+       /* Moved from here (for performance): cache->len = n; */<br>
+       /* rte_ */ memcpy(cache_objs, obj_table, sizeof(void *) * n);<br>
+}<br>
+<br>
 /**<br>
  * @internal Put several objects back in the mempool; used internally.<br>
  * @param mp<br>
@@ -1376,52 +1416,44 @@ rte_mempool_cache_flush(struct rte_mempool_cache *cache,<br>
  */<br>
 static __rte_always_inline void<br>
 rte_mempool_do_generic_put(struct rte_mempool *mp, void * const *obj_table,<br>
-                          unsigned int n, struct rte_mempool_cache *cache)<br>
+                          unsigned int n, struct rte_mempool_cache * const cache)<br>
 {<br>
-       void **cache_objs;<br>
-<br>
-       /* No cache provided */<br>
+       /* No cache provided? */<br>
        if (unlikely(cache == NULL))<br>
                goto driver_enqueue;<br>
<br>
-       /* increment stat now, adding in mempool always success */<br>
+       /* Increment stats now, adding in mempool always succeeds. */<br>
        RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);<br>
        RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);<br>
<br>
-       /* The request itself is too big for the cache */<br>
-       if (unlikely(n > cache->flushthresh))<br>
+       /* The request itself is known to be too big for any cache? */<br>
+       if (__rte_constant(n) && n >= RTE_MEMPOOL_CACHE_MAX_SIZE)<br>
                goto driver_enqueue_stats_incremented;<br>
<br>
-       /*<br>
-        * The cache follows the following algorithm:<br>
-        *   1. If the objects cannot be added to the cache without crossing<br>
-        *      the flush threshold, flush the cache to the backend.<br>
-        *   2. Add the objects to the cache.<br>
-        */<br>
+       /* Enough remaining space in the cache? */<br>
+       if (likely(cache->len + n <= cache->size)) {<br>
+               void **cache_objs;<br>
<br>
-       if (cache->len + n <= cache->flushthresh) {<br>
+               /* Add the objects to the cache. */<br>
                cache_objs = &cache->objs[cache->len];<br>
                cache->len += n;<br>
-       } else {<br>
-               cache_objs = &cache->objs[0];<br>
-               rte_mempool_ops_enqueue_bulk(mp, cache_objs, cache->len);<br>
-               cache->len = n;<br>
-       }<br>
-<br>
-       /* Add the objects to the cache. */<br>
-       rte_memcpy(cache_objs, obj_table, sizeof(void *) * n);<br>
+               rte_memcpy(cache_objs, obj_table, sizeof(void *) * n);<br>
+       } else if (likely(n < cache->size))<br>
+               rte_mempool_do_generic_put_split(mp, obj_table, n, cache);<br>
+       else<br>
+               goto driver_enqueue_stats_incremented;<br>
<br>
        return;<br>
<br>
 driver_enqueue:<br>
<br>
-       /* increment stat now, adding in mempool always success */<br>
+       /* Increment stats now, adding in mempool always succeeds. */<br>
        RTE_MEMPOOL_STAT_ADD(mp, put_bulk, 1);<br>
        RTE_MEMPOOL_STAT_ADD(mp, put_objs, n);<br>
<br>
 driver_enqueue_stats_incremented:<br>
<br>
-       /* push objects to the backend */<br>
+       /* Push the objects directly to the backend. */<br>
        rte_mempool_ops_enqueue_bulk(mp, obj_table, n);<br>
 }<br>
<br>
@@ -1490,122 +1522,183 @@ rte_mempool_put(struct rte_mempool *mp, void *obj)<br>
 }<br>
<br>
 /**<br>
- * @internal Get several objects from the mempool; used internally.<br>
+ * @internal Get several objects from the mempool; used internally when<br>
+ *   the number of objects exceeds what is available in the mempool cache.<br>
  * @param mp<br>
  *   A pointer to the mempool structure.<br>
  * @param obj_table<br>
  *   A pointer to a table of void * pointers (objects).<br>
  * @param n<br>
  *   The number of objects to get, must be strictly positive.<br>
+ *   Must be more than available in the mempool cache, i.e.:<br>
+ *   n > cache->len<br>
  * @param cache<br>
- *   A pointer to a mempool cache structure. May be NULL if not needed.<br>
+ *   A pointer to a mempool cache structure. Not NULL.<br>
  * @return<br>
  *   - 0: Success.<br>
  *   - <0: Error; code of driver dequeue function.<br>
  */<br>
-static __rte_always_inline int<br>
-rte_mempool_do_generic_get(struct rte_mempool *mp, void **obj_table,<br>
-                          unsigned int n, struct rte_mempool_cache *cache)<br>
+static int<br>
+rte_mempool_do_generic_get_split(struct rte_mempool *mp, void **obj_table,<br>
+               unsigned int n, struct rte_mempool_cache * const cache)<br>
 {<br>
        int ret;<br>
        unsigned int remaining;<br>
        uint32_t index, len;<br>
        void **cache_objs;<br>
+       const uint32_t cache_size = cache->size;<br>
<br>
-       /* No cache provided */<br>
-       if (unlikely(cache == NULL)) {<br>
-               remaining = n;<br>
-               goto driver_dequeue;<br>
-       }<br>
-<br>
-       /* The cache is a stack, so copy will be in reverse order. */<br>
+       /* Serve the first part of the request from the cache to return hot objects first. */<br>
        cache_objs = &cache->objs[cache->len];<br>
+       len = cache->len;<br>
+       remaining = n - len;<br>
+       for (index = 0; index < len; index++)<br>
+               *obj_table++ = *--cache_objs;<br>
<br>
-       if (__rte_constant(n) && n <= cache->len) {<br>
+       /* At this point, the cache is empty. */<br>
+<br>
+       /* More than can be served from a full cache? */<br>
+       if (unlikely(remaining >= cache_size)) {<br>
                /*<br>
-                * The request size is known at build time, and<br>
-                * the entire request can be satisfied from the cache,<br>
-                * so let the compiler unroll the fixed length copy loop.<br>
+                * Serve the following part of the request directly from the backend<br>
+                * in multipla of the cache size.<br>
                 */<br>
-               cache->len -= n;<br>
-               for (index = 0; index < n; index++)<br>
-                       *obj_table++ = *--cache_objs;<br>
+               len = remaining - remaining % cache_size;<br>
+               ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, len);<br>
+               if (unlikely(ret < 0)) {<br>
+                       /*<br>
+                        * No further action is required to roll back the request,<br>
+                        * as objects in the cache are intact, and no objects have<br>
+                        * been dequeued from the backend.<br>
+                        */<br>
<br>
-               RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);<br>
-               RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);<br>
+                       RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);<br>
+                       RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);<br>
<br>
-               return 0;<br>
-       }<br>
+                       return ret;<br>
+               }<br>
<br>
-       /*<br>
-        * Use the cache as much as we have to return hot objects first.<br>
-        * If the request size 'n' is known at build time, the above comparison<br>
-        * ensures that n > cache->len here, so omit RTE_MIN().<br>
-        */<br>
-       len = __rte_constant(n) ? cache->len : RTE_MIN(n, cache->len);<br>
-       cache->len -= len;<br>
-       remaining = n - len;<br>
-       for (index = 0; index < len; index++)<br>
-               *obj_table++ = *--cache_objs;<br>
+               remaining -= len;<br>
+               obj_table += len;<br>
<br>
-       /*<br>
-        * If the request size 'n' is known at build time, the case<br>
-        * where the entire request can be satisfied from the cache<br>
-        * has already been handled above, so omit handling it here.<br>
-        */<br>
-       if (!__rte_constant(n) && remaining == 0) {<br>
-               /* The entire request is satisfied from the cache. */<br>
+               if (unlikely(remaining == 0)) {<br>
+                       cache->len = 0;<br>
<br>
-               RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);<br>
-               RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);<br>
+                       RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);<br>
+                       RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);<br>
<br>
-               return 0;<br>
+                       return 0;<br>
+               }<br>
        }<br>
<br>
-       /* if dequeue below would overflow mem allocated for cache */<br>
-       if (unlikely(remaining > RTE_MEMPOOL_CACHE_MAX_SIZE))<br>
-               goto driver_dequeue;<br>
-<br>
-       /* Fill the cache from the backend; fetch size + remaining objects. */<br>
-       ret = rte_mempool_ops_dequeue_bulk(mp, cache->objs,<br>
-                       cache->size + remaining);<br>
+       /* Fill the entire cache from the backend. */<br>
+       ret = rte_mempool_ops_dequeue_bulk(mp, cache->objs, cache_size);<br>
        if (unlikely(ret < 0)) {<br>
                /*<br>
-                * We are buffer constrained, and not able to allocate<br>
-                * cache + remaining.<br>
-                * Do not fill the cache, just satisfy the remaining part of<br>
-                * the request directly from the backend.<br>
+                * Unable to fill the cache.<br>
+                * Last resort: Try only the remaining part of the request,<br>
+                * served directly from the backend.<br>
                 */<br>
-               goto driver_dequeue;<br>
+               ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, remaining);<br>
+               if (unlikely(ret == 0)) {<br>
+                       cache->len = 0;<br>
+<br>
+                       RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);<br>
+                       RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);<br>
+<br>
+                       return 0;<br>
+               }<br>
+<br>
+               /* Roll back. */<br>
+               if (cache->len + remaining == n) {<br>
+                       /*<br>
+                        * No further action is required to roll back the request,<br>
+                        * as objects in the cache are intact, and no objects have<br>
+                        * been dequeued from the backend.<br>
+                        */<br>
+               } else {<br>
+                       /* Update the state of the cache before putting back the objects. */<br>
+                       cache->len = 0;<br>
+<br>
+                       len = n - remaining;<br>
+                       obj_table -= len;<br>
+                       rte_mempool_do_generic_put(mp, obj_table, len, cache);<br>
+               }<br>
+<br>
+               RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);<br>
+               RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);<br>
+<br>
+               return ret;<br>
        }<br>
<br>
-       /* Satisfy the remaining part of the request from the filled cache. */<br>
-       cache_objs = &cache->objs[cache->size + remaining];<br>
+       /* Serve the remaining part of the request from the filled cache. */<br>
+       cache_objs = &cache->objs[cache_size];<br>
        for (index = 0; index < remaining; index++)<br>
                *obj_table++ = *--cache_objs;<br>
<br>
-       cache->len = cache->size;<br>
+       cache->len = cache_size - remaining;<br>
<br>
        RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);<br>
        RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);<br>
<br>
        return 0;<br>
+}<br>
<br>
-driver_dequeue:<br>
+/**<br>
+ * @internal Get several objects from the mempool; used internally.<br>
+ * @param mp<br>
+ *   A pointer to the mempool structure.<br>
+ * @param obj_table<br>
+ *   A pointer to a table of void * pointers (objects).<br>
+ * @param n<br>
+ *   The number of objects to get, must be strictly positive.<br>
+ * @param cache<br>
+ *   A pointer to a mempool cache structure. May be NULL if not needed.<br>
+ * @return<br>
+ *   - 0: Success.<br>
+ *   - <0: Error; code of driver dequeue function.<br>
+ */<br>
+static __rte_always_inline int<br>
+rte_mempool_do_generic_get(struct rte_mempool *mp, void **obj_table,<br>
+                          unsigned int n, struct rte_mempool_cache * const cache)<br>
+{<br>
+       int ret;<br>
<br>
-       /* Get remaining objects directly from the backend. */<br>
-       ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, remaining);<br>
+       /* No cache provided? */<br>
+       if (unlikely(cache == NULL))<br>
+               goto driver_dequeue;<br>
<br>
-       if (ret < 0) {<br>
-               if (likely(cache != NULL)) {<br>
-                       cache->len = n - remaining;<br>
-                       /*<br>
-                        * No further action is required to roll the first part<br>
-                        * of the request back into the cache, as objects in<br>
-                        * the cache are intact.<br>
-                        */<br>
-               }<br>
+       /* The request itself is known to be too big for any cache? */<br>
+       if (__rte_constant(n) && n >= RTE_MEMPOOL_CACHE_MAX_SIZE)<br>
+               goto driver_dequeue;<br>
+<br>
+       /* The request can be served entirely from the cache? */<br>
+       if (likely(n <= cache->len)) {<br>
+               unsigned int index;<br>
+               void **cache_objs;<br>
<br>
+               RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);<br>
+               RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);<br>
+<br>
+               /*<br>
+                * The cache is a stack, so copy will be in reverse order.<br>
+                * If the request size is known at build time,<br>
+                * the compiler will unroll the fixed length copy loop.<br>
+                */<br>
+               cache_objs = &cache->objs[cache->len];<br>
+               cache->len -= n;<br>
+               for (index = 0; index < n; index++)<br>
+                       *obj_table++ = *--cache_objs;<br>
+<br>
+               return 0;<br>
+       } else<br>
+               return rte_mempool_do_generic_get_split(mp, obj_table, n, cache);<br>
+<br>
+driver_dequeue:<br>
+<br>
+       /* Get the objects directly from the backend. */<br>
+       ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, n);<br>
+       if (unlikely(ret < 0)) {<br>
                RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);<br>
                RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);<br>
        } else {<br>
-- <br>
2.43.0<br>
<br>
</blockquote></div>