[RFC PATCH v7] mempool: fix mempool cache size
    Patrick Robb 
    probb at iol.unh.edu
       
    Tue Sep 24 22:44:29 CEST 2024
    
    
  
Recheck-request: iol-intel-Performance
On Tue, Sep 24, 2024 at 2:12 PM Morten Brørup <mb at smartsharesystems.com>
wrote:
> This patch refactors the mempool cache to fix two bugs:
> 1. When a mempool is created with a cache size of N objects, the cache was
> actually created with a size of 1.5 * N objects.
> 2. The mempool cache field names did not reflect their purpose;
> the "flushthresh" field held the size, and the "size" field held the
> number of objects remaining in the cache when returning from a get
> operation refilling it from the backend.
>
> Especially the first item could be fatal:
> When more objects than a mempool's configured cache size is held in the
> mempool's caches associated with other lcores, a rightsized mempool may
> unexpectedly run out of objects, causing the application to fail.
>
> Furthermore, this patch introduces two optimizations:
> 1. The mempool caches are flushed to/filled from the backend in their
> entirety, so backend accesses are CPU cache line aligned. (Assuming the
> mempool cache size is a multiplum of a CPU cache line size divided by the
> size of a pointer.)
> 2. The unlikely paths in the get and put functions, where the cache is
> flushed to/filled from the backend, are moved from the inline functions to
> separate helper functions, thereby reducing the code size of the inline
> functions.
> Note: Accessing the backend for cacheless mempools remains inline.
>
> Various drivers accessing the mempool directly have been updated
> accordingly.
> These drivers did not update mempool statistics when accessing the mempool
> directly, so that is fixed too.
>
> Note: Performance not yet benchmarked.
>
> Signed-off-by: Morten Brørup <mb at smartsharesystems.com>
> ---
> v7:
> * Increased max mempool cache size from 512 to 1024 objects.
>   Mainly for CI performance test purposes.
>   Originally, the max mempool cache size was 768 objects, and used a fixed
>   size array of 1024 objects in the mempool cache structure.
> v6:
> * Fix v5 incomplete implementation of passing large requests directly to
>   the backend.
> * Use memcpy instead of rte_memcpy where compiler complains about it.
> * Added const to some function parameters.
> v5:
> * Moved helper functions back into the header file, for improved
>   performance.
> * Pass large requests directly to the backend. This also simplifies the
>   code.
> v4:
> * Updated subject to reflect that misleading names are considered bugs.
> * Rewrote patch description to provide more details about the bugs fixed.
>   (Mattias Rönnblom)
> * Moved helper functions, not to be inlined, to mempool C file.
>   (Mattias Rönnblom)
> * Pass requests for n >= RTE_MEMPOOL_CACHE_MAX_SIZE objects known at build
>   time directly to backend driver, to avoid calling the helper functions.
>   This also fixes the compiler warnings about out of bounds array access.
> v3:
> * Removed __attribute__(assume).
> v2:
> * Removed mempool perf test; not part of patch set.
> ---
>  config/rte_config.h                           |   2 +-
>  drivers/common/idpf/idpf_common_rxtx_avx512.c |  54 +---
>  drivers/mempool/dpaa/dpaa_mempool.c           |  16 +-
>  drivers/mempool/dpaa2/dpaa2_hw_mempool.c      |  14 -
>  drivers/net/i40e/i40e_rxtx_vec_avx512.c       |  17 +-
>  drivers/net/iavf/iavf_rxtx_vec_avx512.c       |  27 +-
>  drivers/net/ice/ice_rxtx_vec_avx512.c         |  27 +-
>  lib/mempool/mempool_trace.h                   |   1 -
>  lib/mempool/rte_mempool.c                     |  12 +-
>  lib/mempool/rte_mempool.h                     | 287 ++++++++++++------
>  10 files changed, 232 insertions(+), 225 deletions(-)
>
> diff --git a/config/rte_config.h b/config/rte_config.h
> index dd7bb0d35b..2488ff167d 100644
> --- a/config/rte_config.h
> +++ b/config/rte_config.h
> @@ -56,7 +56,7 @@
>  #define RTE_CONTIGMEM_DEFAULT_BUF_SIZE (512*1024*1024)
>
>  /* mempool defines */
> -#define RTE_MEMPOOL_CACHE_MAX_SIZE 512
> +#define RTE_MEMPOOL_CACHE_MAX_SIZE 1024
>  /* RTE_LIBRTE_MEMPOOL_STATS is not set */
>  /* RTE_LIBRTE_MEMPOOL_DEBUG is not set */
>
> diff --git a/drivers/common/idpf/idpf_common_rxtx_avx512.c
> b/drivers/common/idpf/idpf_common_rxtx_avx512.c
> index 3b5e124ec8..98535a48f3 100644
> --- a/drivers/common/idpf/idpf_common_rxtx_avx512.c
> +++ b/drivers/common/idpf/idpf_common_rxtx_avx512.c
> @@ -1024,21 +1024,13 @@ idpf_tx_singleq_free_bufs_avx512(struct
> idpf_tx_queue *txq)
>
> rte_lcore_id());
>                 void **cache_objs;
>
> -               if (cache == NULL || cache->len == 0)
> -                       goto normal;
> -
> -               cache_objs = &cache->objs[cache->len];
> -
> -               if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {
> -                       rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);
> +               if (!cache || unlikely(n + cache->len > cache->size)) {
> +                       rte_mempool_generic_put(mp, (void *)txep, n,
> cache);
>                         goto done;
>                 }
>
> -               /* The cache follows the following algorithm
> -                *   1. Add the objects to the cache
> -                *   2. Anything greater than the cache min value (if it
> crosses the
> -                *   cache flush threshold) is flushed to the ring.
> -                */
> +               cache_objs = &cache->objs[cache->len];
> +
>                 /* Add elements back into the cache */
>                 uint32_t copied = 0;
>                 /* n is multiple of 32 */
> @@ -1056,16 +1048,13 @@ idpf_tx_singleq_free_bufs_avx512(struct
> idpf_tx_queue *txq)
>                 }
>                 cache->len += n;
>
> -               if (cache->len >= cache->flushthresh) {
> -                       rte_mempool_ops_enqueue_bulk(mp,
> -
> &cache->objs[cache->size],
> -                                                    cache->len -
> cache->size);
> -                       cache->len = cache->size;
> -               }
> +               /* Increment stat. */
> +               RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
> +               RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
> +
>                 goto done;
>         }
>
> -normal:
>         m = rte_pktmbuf_prefree_seg(txep[0].mbuf);
>         if (likely(m != NULL)) {
>                 free[0] = m;
> @@ -1335,21 +1324,13 @@ idpf_tx_splitq_free_bufs_avx512(struct
> idpf_tx_queue *txq)
>
> rte_lcore_id());
>                 void **cache_objs;
>
> -               if (!cache || cache->len == 0)
> -                       goto normal;
> -
> -               cache_objs = &cache->objs[cache->len];
> -
> -               if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {
> -                       rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);
> +               if (!cache || unlikely(n + cache->len > cache->size)) {
> +                       rte_mempool_generic_put(mp, (void *)txep, n,
> cache);
>                         goto done;
>                 }
>
> -               /* The cache follows the following algorithm
> -                *   1. Add the objects to the cache
> -                *   2. Anything greater than the cache min value (if it
> crosses the
> -                *   cache flush threshold) is flushed to the ring.
> -                */
> +               cache_objs = &cache->objs[cache->len];
> +
>                 /* Add elements back into the cache */
>                 uint32_t copied = 0;
>                 /* n is multiple of 32 */
> @@ -1367,16 +1348,13 @@ idpf_tx_splitq_free_bufs_avx512(struct
> idpf_tx_queue *txq)
>                 }
>                 cache->len += n;
>
> -               if (cache->len >= cache->flushthresh) {
> -                       rte_mempool_ops_enqueue_bulk(mp,
> -
> &cache->objs[cache->size],
> -                                                    cache->len -
> cache->size);
> -                       cache->len = cache->size;
> -               }
> +               /* Increment stat. */
> +               RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
> +               RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
> +
>                 goto done;
>         }
>
> -normal:
>         m = rte_pktmbuf_prefree_seg(txep[0].mbuf);
>         if (likely(m)) {
>                 free[0] = m;
> diff --git a/drivers/mempool/dpaa/dpaa_mempool.c
> b/drivers/mempool/dpaa/dpaa_mempool.c
> index 74bfcab509..3a936826c8 100644
> --- a/drivers/mempool/dpaa/dpaa_mempool.c
> +++ b/drivers/mempool/dpaa/dpaa_mempool.c
> @@ -51,8 +51,6 @@ dpaa_mbuf_create_pool(struct rte_mempool *mp)
>         struct bman_pool_params params = {
>                 .flags = BMAN_POOL_FLAG_DYNAMIC_BPID
>         };
> -       unsigned int lcore_id;
> -       struct rte_mempool_cache *cache;
>
>         MEMPOOL_INIT_FUNC_TRACE();
>
> @@ -120,18 +118,6 @@ dpaa_mbuf_create_pool(struct rte_mempool *mp)
>         rte_memcpy(bp_info, (void *)&rte_dpaa_bpid_info[bpid],
>                    sizeof(struct dpaa_bp_info));
>         mp->pool_data = (void *)bp_info;
> -       /* Update per core mempool cache threshold to optimal value which
> is
> -        * number of buffers that can be released to HW buffer pool in
> -        * a single API call.
> -        */
> -       for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
> -               cache = &mp->local_cache[lcore_id];
> -               DPAA_MEMPOOL_DEBUG("lCore %d: cache->flushthresh %d -> %d",
> -                       lcore_id, cache->flushthresh,
> -                       (uint32_t)(cache->size + DPAA_MBUF_MAX_ACQ_REL));
> -               if (cache->flushthresh)
> -                       cache->flushthresh = cache->size +
> DPAA_MBUF_MAX_ACQ_REL;
> -       }
>
>         DPAA_MEMPOOL_INFO("BMAN pool created for bpid =%d", bpid);
>         return 0;
> @@ -234,7 +220,7 @@ dpaa_mbuf_alloc_bulk(struct rte_mempool *pool,
>         DPAA_MEMPOOL_DPDEBUG("Request to alloc %d buffers in bpid = %d",
>                              count, bp_info->bpid);
>
> -       if (unlikely(count >= (RTE_MEMPOOL_CACHE_MAX_SIZE * 2))) {
> +       if (unlikely(count >= RTE_MEMPOOL_CACHE_MAX_SIZE)) {
>                 DPAA_MEMPOOL_ERR("Unable to allocate requested (%u)
> buffers",
>                                  count);
>                 return -1;
> diff --git a/drivers/mempool/dpaa2/dpaa2_hw_mempool.c
> b/drivers/mempool/dpaa2/dpaa2_hw_mempool.c
> index 42e17d984c..a44f3cf616 100644
> --- a/drivers/mempool/dpaa2/dpaa2_hw_mempool.c
> +++ b/drivers/mempool/dpaa2/dpaa2_hw_mempool.c
> @@ -44,8 +44,6 @@ rte_hw_mbuf_create_pool(struct rte_mempool *mp)
>         struct dpaa2_bp_info *bp_info;
>         struct dpbp_attr dpbp_attr;
>         uint32_t bpid;
> -       unsigned int lcore_id;
> -       struct rte_mempool_cache *cache;
>         int ret;
>
>         avail_dpbp = dpaa2_alloc_dpbp_dev();
> @@ -134,18 +132,6 @@ rte_hw_mbuf_create_pool(struct rte_mempool *mp)
>         DPAA2_MEMPOOL_DEBUG("BP List created for bpid =%d",
> dpbp_attr.bpid);
>
>         h_bp_list = bp_list;
> -       /* Update per core mempool cache threshold to optimal value which
> is
> -        * number of buffers that can be released to HW buffer pool in
> -        * a single API call.
> -        */
> -       for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
> -               cache = &mp->local_cache[lcore_id];
> -               DPAA2_MEMPOOL_DEBUG("lCore %d: cache->flushthresh %d ->
> %d",
> -                       lcore_id, cache->flushthresh,
> -                       (uint32_t)(cache->size + DPAA2_MBUF_MAX_ACQ_REL));
> -               if (cache->flushthresh)
> -                       cache->flushthresh = cache->size +
> DPAA2_MBUF_MAX_ACQ_REL;
> -       }
>
>         return 0;
>  err3:
> diff --git a/drivers/net/i40e/i40e_rxtx_vec_avx512.c
> b/drivers/net/i40e/i40e_rxtx_vec_avx512.c
> index 0238b03f8a..712ab1726f 100644
> --- a/drivers/net/i40e/i40e_rxtx_vec_avx512.c
> +++ b/drivers/net/i40e/i40e_rxtx_vec_avx512.c
> @@ -783,18 +783,13 @@ i40e_tx_free_bufs_avx512(struct i40e_tx_queue *txq)
>                 struct rte_mempool_cache *cache =
> rte_mempool_default_cache(mp,
>                                 rte_lcore_id());
>
> -               if (!cache || n > RTE_MEMPOOL_CACHE_MAX_SIZE) {
> +               if (!cache || unlikely(n + cache->len > cache->size)) {
>                         rte_mempool_generic_put(mp, (void *)txep, n,
> cache);
>                         goto done;
>                 }
>
>                 cache_objs = &cache->objs[cache->len];
>
> -               /* The cache follows the following algorithm
> -                *   1. Add the objects to the cache
> -                *   2. Anything greater than the cache min value (if it
> -                *   crosses the cache flush threshold) is flushed to the
> ring.
> -                */
>                 /* Add elements back into the cache */
>                 uint32_t copied = 0;
>                 /* n is multiple of 32 */
> @@ -812,12 +807,10 @@ i40e_tx_free_bufs_avx512(struct i40e_tx_queue *txq)
>                 }
>                 cache->len += n;
>
> -               if (cache->len >= cache->flushthresh) {
> -                       rte_mempool_ops_enqueue_bulk
> -                               (mp, &cache->objs[cache->size],
> -                               cache->len - cache->size);
> -                       cache->len = cache->size;
> -               }
> +               /* Increment stat. */
> +               RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
> +               RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
> +
>                 goto done;
>         }
>
> diff --git a/drivers/net/iavf/iavf_rxtx_vec_avx512.c
> b/drivers/net/iavf/iavf_rxtx_vec_avx512.c
> index 3bb6f305df..307bb8556a 100644
> --- a/drivers/net/iavf/iavf_rxtx_vec_avx512.c
> +++ b/drivers/net/iavf/iavf_rxtx_vec_avx512.c
> @@ -1873,21 +1873,13 @@ iavf_tx_free_bufs_avx512(struct iavf_tx_queue *txq)
>
> rte_lcore_id());
>                 void **cache_objs;
>
> -               if (!cache || cache->len == 0)
> -                       goto normal;
> -
> -               cache_objs = &cache->objs[cache->len];
> -
> -               if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {
> -                       rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);
> +               if (!cache || unlikely(n + cache->len > cache->size)) {
> +                       rte_mempool_generic_put(mp, (void *)txep, n,
> cache);
>                         goto done;
>                 }
>
> -               /* The cache follows the following algorithm
> -                *   1. Add the objects to the cache
> -                *   2. Anything greater than the cache min value (if it
> crosses the
> -                *   cache flush threshold) is flushed to the ring.
> -                */
> +               cache_objs = &cache->objs[cache->len];
> +
>                 /* Add elements back into the cache */
>                 uint32_t copied = 0;
>                 /* n is multiple of 32 */
> @@ -1905,16 +1897,13 @@ iavf_tx_free_bufs_avx512(struct iavf_tx_queue *txq)
>                 }
>                 cache->len += n;
>
> -               if (cache->len >= cache->flushthresh) {
> -                       rte_mempool_ops_enqueue_bulk(mp,
> -
> &cache->objs[cache->size],
> -                                                    cache->len -
> cache->size);
> -                       cache->len = cache->size;
> -               }
> +               /* Increment stat. */
> +               RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
> +               RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
> +
>                 goto done;
>         }
>
> -normal:
>         m = rte_pktmbuf_prefree_seg(txep[0].mbuf);
>         if (likely(m)) {
>                 free[0] = m;
> diff --git a/drivers/net/ice/ice_rxtx_vec_avx512.c
> b/drivers/net/ice/ice_rxtx_vec_avx512.c
> index 04148e8ea2..4ea1db734e 100644
> --- a/drivers/net/ice/ice_rxtx_vec_avx512.c
> +++ b/drivers/net/ice/ice_rxtx_vec_avx512.c
> @@ -888,21 +888,13 @@ ice_tx_free_bufs_avx512(struct ice_tx_queue *txq)
>                 struct rte_mempool_cache *cache =
> rte_mempool_default_cache(mp,
>                                 rte_lcore_id());
>
> -               if (!cache || cache->len == 0)
> -                       goto normal;
> -
> -               cache_objs = &cache->objs[cache->len];
> -
> -               if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {
> -                       rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);
> +               if (!cache || unlikely(n + cache->len > cache->size)) {
> +                       rte_mempool_generic_put(mp, (void *)txep, n,
> cache);
>                         goto done;
>                 }
>
> -               /* The cache follows the following algorithm
> -                *   1. Add the objects to the cache
> -                *   2. Anything greater than the cache min value (if it
> -                *   crosses the cache flush threshold) is flushed to the
> ring.
> -                */
> +               cache_objs = &cache->objs[cache->len];
> +
>                 /* Add elements back into the cache */
>                 uint32_t copied = 0;
>                 /* n is multiple of 32 */
> @@ -920,16 +912,13 @@ ice_tx_free_bufs_avx512(struct ice_tx_queue *txq)
>                 }
>                 cache->len += n;
>
> -               if (cache->len >= cache->flushthresh) {
> -                       rte_mempool_ops_enqueue_bulk
> -                               (mp, &cache->objs[cache->size],
> -                                cache->len - cache->size);
> -                       cache->len = cache->size;
> -               }
> +               /* Increment stat. */
> +               RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
> +               RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
> +
>                 goto done;
>         }
>
> -normal:
>         m = rte_pktmbuf_prefree_seg(txep[0].mbuf);
>         if (likely(m)) {
>                 free[0] = m;
> diff --git a/lib/mempool/mempool_trace.h b/lib/mempool/mempool_trace.h
> index dffef062e4..3c49b41a6d 100644
> --- a/lib/mempool/mempool_trace.h
> +++ b/lib/mempool/mempool_trace.h
> @@ -112,7 +112,6 @@ RTE_TRACE_POINT(
>         rte_trace_point_emit_i32(socket_id);
>         rte_trace_point_emit_ptr(cache);
>         rte_trace_point_emit_u32(cache->len);
> -       rte_trace_point_emit_u32(cache->flushthresh);
>  )
>
>  RTE_TRACE_POINT(
> diff --git a/lib/mempool/rte_mempool.c b/lib/mempool/rte_mempool.c
> index d8e39e5c20..40fb13239a 100644
> --- a/lib/mempool/rte_mempool.c
> +++ b/lib/mempool/rte_mempool.c
> @@ -50,11 +50,6 @@ static void
>  mempool_event_callback_invoke(enum rte_mempool_event event,
>                               struct rte_mempool *mp);
>
> -/* Note: avoid using floating point since that compiler
> - * may not think that is constant.
> - */
> -#define CALC_CACHE_FLUSHTHRESH(c) (((c) * 3) / 2)
> -
>  #if defined(RTE_ARCH_X86)
>  /*
>   * return the greatest common divisor between a and b (fast algorithm)
> @@ -746,13 +741,12 @@ rte_mempool_free(struct rte_mempool *mp)
>  static void
>  mempool_cache_init(struct rte_mempool_cache *cache, uint32_t size)
>  {
> -       /* Check that cache have enough space for flush threshold */
> -
>  RTE_BUILD_BUG_ON(CALC_CACHE_FLUSHTHRESH(RTE_MEMPOOL_CACHE_MAX_SIZE) >
> +       /* Check that cache have enough space for size */
> +       RTE_BUILD_BUG_ON(RTE_MEMPOOL_CACHE_MAX_SIZE >
>                          RTE_SIZEOF_FIELD(struct rte_mempool_cache, objs) /
>                          RTE_SIZEOF_FIELD(struct rte_mempool_cache,
> objs[0]));
>
>         cache->size = size;
> -       cache->flushthresh = CALC_CACHE_FLUSHTHRESH(size);
>         cache->len = 0;
>  }
>
> @@ -836,7 +830,7 @@ rte_mempool_create_empty(const char *name, unsigned n,
> unsigned elt_size,
>
>         /* asked cache too big */
>         if (cache_size > RTE_MEMPOOL_CACHE_MAX_SIZE ||
> -           CALC_CACHE_FLUSHTHRESH(cache_size) > n) {
> +           cache_size > n) {
>                 rte_errno = EINVAL;
>                 return NULL;
>         }
> diff --git a/lib/mempool/rte_mempool.h b/lib/mempool/rte_mempool.h
> index 7bdc92b812..0801cec24a 100644
> --- a/lib/mempool/rte_mempool.h
> +++ b/lib/mempool/rte_mempool.h
> @@ -89,10 +89,8 @@ struct __rte_cache_aligned rte_mempool_debug_stats {
>   */
>  struct __rte_cache_aligned rte_mempool_cache {
>         uint32_t size;        /**< Size of the cache */
> -       uint32_t flushthresh; /**< Threshold before we flush excess
> elements */
>         uint32_t len;         /**< Current cache count */
>  #ifdef RTE_LIBRTE_MEMPOOL_STATS
> -       uint32_t unused;
>         /*
>          * Alternative location for the most frequently updated mempool
> statistics (per-lcore),
>          * providing faster update access when using a mempool cache.
> @@ -110,7 +108,7 @@ struct __rte_cache_aligned rte_mempool_cache {
>          * Cache is allocated to this size to allow it to overflow in
> certain
>          * cases to avoid needless emptying of cache.
>          */
> -       alignas(RTE_CACHE_LINE_SIZE) void *objs[RTE_MEMPOOL_CACHE_MAX_SIZE
> * 2];
> +       alignas(RTE_CACHE_LINE_SIZE) void
> *objs[RTE_MEMPOOL_CACHE_MAX_SIZE];
>  };
>
>  /**
> @@ -1362,6 +1360,48 @@ rte_mempool_cache_flush(struct rte_mempool_cache
> *cache,
>         cache->len = 0;
>  }
>
> +/**
> + * @internal Put several objects back in the mempool; used internally when
> + *   the number of objects exceeds the remaining space in the mempool
> cache.
> + * @param mp
> + *   A pointer to the mempool structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to store back in the mempool, must be strictly
> + *   positive.
> + *   Must be more than the remaining space in the mempool cache, i.e.:
> + *   cache->len + n > cache->size
> + *   Must be less than the size of the mempool cache, i.e.:
> + *   n < cache->size
> + * @param cache
> + *   A pointer to a mempool cache structure. Not NULL.
> + */
> +static void
> +rte_mempool_do_generic_put_split(struct rte_mempool *mp, void * const
> *obj_table,
> +               unsigned int n, struct rte_mempool_cache * const cache)
> +{
> +       void **cache_objs;
> +       unsigned int len;
> +       const uint32_t cache_size = cache->size;
> +
> +       /* Fill the cache with the first objects. */
> +       cache_objs = &cache->objs[cache->len];
> +       len = (cache_size - cache->len);
> +       cache->len = n - len; /* Moved to here (for performance). */
> +       /* rte_ */ memcpy(cache_objs, obj_table, sizeof(void *) * len);
> +       obj_table += len;
> +       n -= len;
> +
> +       /* Flush the entire cache to the backend. */
> +       cache_objs = &cache->objs[0];
> +       rte_mempool_ops_enqueue_bulk(mp, cache_objs, cache_size);
> +
> +       /* Add the remaining objects to the cache. */
> +       /* Moved from here (for performance): cache->len = n; */
> +       /* rte_ */ memcpy(cache_objs, obj_table, sizeof(void *) * n);
> +}
> +
>  /**
>   * @internal Put several objects back in the mempool; used internally.
>   * @param mp
> @@ -1376,52 +1416,44 @@ rte_mempool_cache_flush(struct rte_mempool_cache
> *cache,
>   */
>  static __rte_always_inline void
>  rte_mempool_do_generic_put(struct rte_mempool *mp, void * const
> *obj_table,
> -                          unsigned int n, struct rte_mempool_cache *cache)
> +                          unsigned int n, struct rte_mempool_cache *
> const cache)
>  {
> -       void **cache_objs;
> -
> -       /* No cache provided */
> +       /* No cache provided? */
>         if (unlikely(cache == NULL))
>                 goto driver_enqueue;
>
> -       /* increment stat now, adding in mempool always success */
> +       /* Increment stats now, adding in mempool always succeeds. */
>         RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
>         RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
>
> -       /* The request itself is too big for the cache */
> -       if (unlikely(n > cache->flushthresh))
> +       /* The request itself is known to be too big for any cache? */
> +       if (__rte_constant(n) && n >= RTE_MEMPOOL_CACHE_MAX_SIZE)
>                 goto driver_enqueue_stats_incremented;
>
> -       /*
> -        * The cache follows the following algorithm:
> -        *   1. If the objects cannot be added to the cache without
> crossing
> -        *      the flush threshold, flush the cache to the backend.
> -        *   2. Add the objects to the cache.
> -        */
> +       /* Enough remaining space in the cache? */
> +       if (likely(cache->len + n <= cache->size)) {
> +               void **cache_objs;
>
> -       if (cache->len + n <= cache->flushthresh) {
> +               /* Add the objects to the cache. */
>                 cache_objs = &cache->objs[cache->len];
>                 cache->len += n;
> -       } else {
> -               cache_objs = &cache->objs[0];
> -               rte_mempool_ops_enqueue_bulk(mp, cache_objs, cache->len);
> -               cache->len = n;
> -       }
> -
> -       /* Add the objects to the cache. */
> -       rte_memcpy(cache_objs, obj_table, sizeof(void *) * n);
> +               rte_memcpy(cache_objs, obj_table, sizeof(void *) * n);
> +       } else if (likely(n < cache->size))
> +               rte_mempool_do_generic_put_split(mp, obj_table, n, cache);
> +       else
> +               goto driver_enqueue_stats_incremented;
>
>         return;
>
>  driver_enqueue:
>
> -       /* increment stat now, adding in mempool always success */
> +       /* Increment stats now, adding in mempool always succeeds. */
>         RTE_MEMPOOL_STAT_ADD(mp, put_bulk, 1);
>         RTE_MEMPOOL_STAT_ADD(mp, put_objs, n);
>
>  driver_enqueue_stats_incremented:
>
> -       /* push objects to the backend */
> +       /* Push the objects directly to the backend. */
>         rte_mempool_ops_enqueue_bulk(mp, obj_table, n);
>  }
>
> @@ -1490,122 +1522,183 @@ rte_mempool_put(struct rte_mempool *mp, void
> *obj)
>  }
>
>  /**
> - * @internal Get several objects from the mempool; used internally.
> + * @internal Get several objects from the mempool; used internally when
> + *   the number of objects exceeds what is available in the mempool cache.
>   * @param mp
>   *   A pointer to the mempool structure.
>   * @param obj_table
>   *   A pointer to a table of void * pointers (objects).
>   * @param n
>   *   The number of objects to get, must be strictly positive.
> + *   Must be more than available in the mempool cache, i.e.:
> + *   n > cache->len
>   * @param cache
> - *   A pointer to a mempool cache structure. May be NULL if not needed.
> + *   A pointer to a mempool cache structure. Not NULL.
>   * @return
>   *   - 0: Success.
>   *   - <0: Error; code of driver dequeue function.
>   */
> -static __rte_always_inline int
> -rte_mempool_do_generic_get(struct rte_mempool *mp, void **obj_table,
> -                          unsigned int n, struct rte_mempool_cache *cache)
> +static int
> +rte_mempool_do_generic_get_split(struct rte_mempool *mp, void **obj_table,
> +               unsigned int n, struct rte_mempool_cache * const cache)
>  {
>         int ret;
>         unsigned int remaining;
>         uint32_t index, len;
>         void **cache_objs;
> +       const uint32_t cache_size = cache->size;
>
> -       /* No cache provided */
> -       if (unlikely(cache == NULL)) {
> -               remaining = n;
> -               goto driver_dequeue;
> -       }
> -
> -       /* The cache is a stack, so copy will be in reverse order. */
> +       /* Serve the first part of the request from the cache to return
> hot objects first. */
>         cache_objs = &cache->objs[cache->len];
> +       len = cache->len;
> +       remaining = n - len;
> +       for (index = 0; index < len; index++)
> +               *obj_table++ = *--cache_objs;
>
> -       if (__rte_constant(n) && n <= cache->len) {
> +       /* At this point, the cache is empty. */
> +
> +       /* More than can be served from a full cache? */
> +       if (unlikely(remaining >= cache_size)) {
>                 /*
> -                * The request size is known at build time, and
> -                * the entire request can be satisfied from the cache,
> -                * so let the compiler unroll the fixed length copy loop.
> +                * Serve the following part of the request directly from
> the backend
> +                * in multipla of the cache size.
>                  */
> -               cache->len -= n;
> -               for (index = 0; index < n; index++)
> -                       *obj_table++ = *--cache_objs;
> +               len = remaining - remaining % cache_size;
> +               ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, len);
> +               if (unlikely(ret < 0)) {
> +                       /*
> +                        * No further action is required to roll back the
> request,
> +                        * as objects in the cache are intact, and no
> objects have
> +                        * been dequeued from the backend.
> +                        */
>
> -               RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
> -               RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
> +                       RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);
> +                       RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);
>
> -               return 0;
> -       }
> +                       return ret;
> +               }
>
> -       /*
> -        * Use the cache as much as we have to return hot objects first.
> -        * If the request size 'n' is known at build time, the above
> comparison
> -        * ensures that n > cache->len here, so omit RTE_MIN().
> -        */
> -       len = __rte_constant(n) ? cache->len : RTE_MIN(n, cache->len);
> -       cache->len -= len;
> -       remaining = n - len;
> -       for (index = 0; index < len; index++)
> -               *obj_table++ = *--cache_objs;
> +               remaining -= len;
> +               obj_table += len;
>
> -       /*
> -        * If the request size 'n' is known at build time, the case
> -        * where the entire request can be satisfied from the cache
> -        * has already been handled above, so omit handling it here.
> -        */
> -       if (!__rte_constant(n) && remaining == 0) {
> -               /* The entire request is satisfied from the cache. */
> +               if (unlikely(remaining == 0)) {
> +                       cache->len = 0;
>
> -               RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
> -               RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
> +                       RTE_MEMPOOL_CACHE_STAT_ADD(cache,
> get_success_bulk, 1);
> +                       RTE_MEMPOOL_CACHE_STAT_ADD(cache,
> get_success_objs, n);
>
> -               return 0;
> +                       return 0;
> +               }
>         }
>
> -       /* if dequeue below would overflow mem allocated for cache */
> -       if (unlikely(remaining > RTE_MEMPOOL_CACHE_MAX_SIZE))
> -               goto driver_dequeue;
> -
> -       /* Fill the cache from the backend; fetch size + remaining
> objects. */
> -       ret = rte_mempool_ops_dequeue_bulk(mp, cache->objs,
> -                       cache->size + remaining);
> +       /* Fill the entire cache from the backend. */
> +       ret = rte_mempool_ops_dequeue_bulk(mp, cache->objs, cache_size);
>         if (unlikely(ret < 0)) {
>                 /*
> -                * We are buffer constrained, and not able to allocate
> -                * cache + remaining.
> -                * Do not fill the cache, just satisfy the remaining part
> of
> -                * the request directly from the backend.
> +                * Unable to fill the cache.
> +                * Last resort: Try only the remaining part of the request,
> +                * served directly from the backend.
>                  */
> -               goto driver_dequeue;
> +               ret = rte_mempool_ops_dequeue_bulk(mp, obj_table,
> remaining);
> +               if (unlikely(ret == 0)) {
> +                       cache->len = 0;
> +
> +                       RTE_MEMPOOL_CACHE_STAT_ADD(cache,
> get_success_bulk, 1);
> +                       RTE_MEMPOOL_CACHE_STAT_ADD(cache,
> get_success_objs, n);
> +
> +                       return 0;
> +               }
> +
> +               /* Roll back. */
> +               if (cache->len + remaining == n) {
> +                       /*
> +                        * No further action is required to roll back the
> request,
> +                        * as objects in the cache are intact, and no
> objects have
> +                        * been dequeued from the backend.
> +                        */
> +               } else {
> +                       /* Update the state of the cache before putting
> back the objects. */
> +                       cache->len = 0;
> +
> +                       len = n - remaining;
> +                       obj_table -= len;
> +                       rte_mempool_do_generic_put(mp, obj_table, len,
> cache);
> +               }
> +
> +               RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);
> +               RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);
> +
> +               return ret;
>         }
>
> -       /* Satisfy the remaining part of the request from the filled
> cache. */
> -       cache_objs = &cache->objs[cache->size + remaining];
> +       /* Serve the remaining part of the request from the filled cache.
> */
> +       cache_objs = &cache->objs[cache_size];
>         for (index = 0; index < remaining; index++)
>                 *obj_table++ = *--cache_objs;
>
> -       cache->len = cache->size;
> +       cache->len = cache_size - remaining;
>
>         RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
>         RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
>
>         return 0;
> +}
>
> -driver_dequeue:
> +/**
> + * @internal Get several objects from the mempool; used internally.
> + * @param mp
> + *   A pointer to the mempool structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to get, must be strictly positive.
> + * @param cache
> + *   A pointer to a mempool cache structure. May be NULL if not needed.
> + * @return
> + *   - 0: Success.
> + *   - <0: Error; code of driver dequeue function.
> + */
> +static __rte_always_inline int
> +rte_mempool_do_generic_get(struct rte_mempool *mp, void **obj_table,
> +                          unsigned int n, struct rte_mempool_cache *
> const cache)
> +{
> +       int ret;
>
> -       /* Get remaining objects directly from the backend. */
> -       ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, remaining);
> +       /* No cache provided? */
> +       if (unlikely(cache == NULL))
> +               goto driver_dequeue;
>
> -       if (ret < 0) {
> -               if (likely(cache != NULL)) {
> -                       cache->len = n - remaining;
> -                       /*
> -                        * No further action is required to roll the first
> part
> -                        * of the request back into the cache, as objects
> in
> -                        * the cache are intact.
> -                        */
> -               }
> +       /* The request itself is known to be too big for any cache? */
> +       if (__rte_constant(n) && n >= RTE_MEMPOOL_CACHE_MAX_SIZE)
> +               goto driver_dequeue;
> +
> +       /* The request can be served entirely from the cache? */
> +       if (likely(n <= cache->len)) {
> +               unsigned int index;
> +               void **cache_objs;
>
> +               RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
> +               RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
> +
> +               /*
> +                * The cache is a stack, so copy will be in reverse order.
> +                * If the request size is known at build time,
> +                * the compiler will unroll the fixed length copy loop.
> +                */
> +               cache_objs = &cache->objs[cache->len];
> +               cache->len -= n;
> +               for (index = 0; index < n; index++)
> +                       *obj_table++ = *--cache_objs;
> +
> +               return 0;
> +       } else
> +               return rte_mempool_do_generic_get_split(mp, obj_table, n,
> cache);
> +
> +driver_dequeue:
> +
> +       /* Get the objects directly from the backend. */
> +       ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, n);
> +       if (unlikely(ret < 0)) {
>                 RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);
>                 RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);
>         } else {
> --
> 2.43.0
>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mails.dpdk.org/archives/dev/attachments/20240924/e80a4343/attachment-0001.htm>
    
    
More information about the dev
mailing list