[RFC]: mempool: zero-copy cache get bulk
Morten Brørup
mb at smartsharesystems.com
Sat Nov 5 14:19:13 CET 2022
Zero-copy access to the mempool cache is beneficial for PMD performance, and must be provided by the mempool library to fix [Bug 1052] without a performance regression.
[Bug 1052]: https://bugs.dpdk.org/show_bug.cgi?id=1052
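For context, with the existing API a driver pulls the object pointers into a stack-local array with rte_mempool_get_bulk() and then copies them into its own structures, i.e. the pointers are copied twice. Roughly like this (illustrative sketch only; the 'sw_ring' destination is just an example):

#include <errno.h>
#include <rte_common.h>
#include <rte_mempool.h>

/* Illustrative only: refill a driver-owned array of object pointers. */
static int
refill_conventional(struct rte_mempool *mp, void **sw_ring, unsigned int n)
{
	void *objs[64];	/* stack-local staging array */
	unsigned int i;

	if (n > RTE_DIM(objs))
		return -EINVAL;

	/* First copy: mempool (cache) -> stack-local array. */
	if (rte_mempool_get_bulk(mp, objs, n) < 0)
		return -ENOENT;

	/* Second copy: stack-local array -> driver structure. */
	for (i = 0; i < n; i++)
		sw_ring[i] = objs[i];

	return 0;
}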
This RFC offers two conceptual variants of zero-copy get:
1. A simple version.
2. A version where existing (hot) objects in the cache are moved to the top of the cache before new objects from the backend driver are pulled in.
I would like some early feedback. Also, which variant do you prefer?
Notes:
* Allowing the 'cache' parameter to be NULL, and getting it from the mempool instead, was inspired by rte_mempool_cache_flush().
* Asserting that the 'mp' parameter is not NULL is not done by other functions, so I omitted it here too.
NB: Please ignore formatting. Also, this code has not even been compile tested.
PS: No promises, but I expect to offer an RFC for zero-copy put too. :-)
1. Simple version:
/**
 * Get objects from a mempool via zero-copy access to a user-owned mempool cache.
 *
 * @param cache
 *   A pointer to the mempool cache.
 * @param mp
 *   A pointer to the mempool.
 * @param n
 *   The number of objects to get from the mempool cache.
 * @return
 *   A pointer to the objects in the mempool cache.
 *   NULL on error, with rte_errno set appropriately.
 */
static __rte_always_inline void *
rte_mempool_cache_get_bulk(struct rte_mempool_cache *cache,
		struct rte_mempool *mp,
		unsigned int n)
{
	unsigned int len;
	int ret;

	if (cache == NULL)
		cache = rte_mempool_default_cache(mp, rte_lcore_id());
	if (cache == NULL) {
		rte_errno = EINVAL;
		goto fail;
	}

	rte_mempool_trace_cache_get_bulk(cache, mp, n);

	len = cache->len;

	if (unlikely(n > len)) {
		unsigned int size;

		if (unlikely(n > RTE_MEMPOOL_CACHE_MAX_SIZE)) {
			rte_errno = EINVAL;
			goto fail;
		}

		/* Fill the cache from the backend; fetch size + requested - len objects. */
		size = cache->size;
		ret = rte_mempool_ops_dequeue_bulk(mp, &cache->objs[len], size + n - len);
		if (unlikely(ret < 0)) {
			/*
			 * We are buffer constrained.
			 * Do not fill the cache, just satisfy the request.
			 */
			ret = rte_mempool_ops_dequeue_bulk(mp, &cache->objs[len], n - len);
			if (unlikely(ret < 0)) {
				rte_errno = -ret;
				goto fail;
			}

			len = 0;
		} else
			len = size;
	} else
		len -= n;

	cache->len = len;

	RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
	RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);

	return &cache->objs[len];

fail:
	RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);
	RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);

	return NULL;
}
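For illustration, a caller would use the function roughly like this (untested sketch; passing NULL for the cache uses the lcore's default cache, and 'sw_ring' is again just an example destination):

#include <rte_errno.h>
#include <rte_mempool.h>

/* Illustrative only: refill a driver-owned array of object pointers. */
static int
refill_zero_copy(struct rte_mempool *mp, void **sw_ring, unsigned int n)
{
	void **objs;
	unsigned int i;

	/* NULL means: use the default cache of the calling lcore. */
	objs = rte_mempool_cache_get_bulk(NULL, mp, n);
	if (objs == NULL)
		return -rte_errno;

	/* Single copy: directly from the mempool cache into the driver structure. */
	for (i = 0; i < n; i++)
		sw_ring[i] = objs[i];

	return 0;
}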
2. Advanced version:
/**
 * Get objects from a mempool via zero-copy access to a user-owned mempool cache.
 *
 * @param cache
 *   A pointer to the mempool cache.
 * @param mp
 *   A pointer to the mempool.
 * @param n
 *   The number of objects to get from the mempool cache.
 * @return
 *   A pointer to the objects in the mempool cache.
 *   NULL on error, with rte_errno set appropriately.
 */
static __rte_always_inline void *
rte_mempool_cache_get_bulk(struct rte_mempool_cache *cache,
		struct rte_mempool *mp,
		unsigned int n)
{
	unsigned int len;
	int ret;

	if (cache == NULL)
		cache = rte_mempool_default_cache(mp, rte_lcore_id());
	if (cache == NULL) {
		rte_errno = EINVAL;
		goto fail;
	}

	rte_mempool_trace_cache_get_bulk(cache, mp, n);

	len = cache->len;

	if (unlikely(n > len)) {
		unsigned int size;

		if (unlikely(n > RTE_MEMPOOL_CACHE_MAX_SIZE)) {
			rte_errno = EINVAL;
			goto fail;
		}

		/* Fill the cache from the backend; fetch size + requested - len objects. */
		size = cache->size;
		if (likely(size + n >= 2 * len)) {
			/*
			 * No overlap when copying: the destination index
			 * size + n - len >= len, so the destination starts at or
			 * above the end of the source.
			 * Move (i.e. copy) the existing objects in the cache to the
			 * coming top of the cache, to make room for new objects below.
			 */
			rte_memcpy(&cache->objs[size + n - len], &cache->objs[0],
					len * sizeof(void *));

			/* Fill the cache below the existing objects in the cache. */
			ret = rte_mempool_ops_dequeue_bulk(mp, &cache->objs[0],
					size + n - len);
			if (unlikely(ret < 0)) {
				goto constrained;
			} else
				len = size;
		} else {
			/* Fill the cache on top of any objects in it. */
			ret = rte_mempool_ops_dequeue_bulk(mp, &cache->objs[len],
					size + n - len);
			if (unlikely(ret < 0)) {
constrained:
				/*
				 * We are buffer constrained.
				 * Do not fill the cache, just satisfy the request.
				 */
				ret = rte_mempool_ops_dequeue_bulk(mp, &cache->objs[len],
						n - len);
				if (unlikely(ret < 0)) {
					rte_errno = -ret;
					goto fail;
				}

				len = 0;
			} else
				len = size;
		}
	} else
		len -= n;

	cache->len = len;

	RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
	RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);

	return &cache->objs[len];

fail:
	RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);
	RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);

	return NULL;
}
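To make the index arithmetic of the advanced version easier to review, here is a worked example with made-up numbers (size = 256, len = 32, n = 64):

#include <assert.h>

/* Worked example of the advanced version's index arithmetic. */
static void
advanced_index_example(void)
{
	const unsigned int size = 256;	/* cache->size */
	const unsigned int len = 32;	/* cache->len before the call */
	const unsigned int n = 64;	/* number of objects requested */

	/* The branch condition holds, so the rte_memcpy() destination
	 * starts at or above the end of the source: no overlap. */
	assert(size + n >= 2 * len);
	assert(size + n - len >= len);

	/*
	 * The 32 hot objects are copied from objs[0..31] to objs[288..319],
	 * the backend fills objs[0..287] (size + n - len = 288 objects),
	 * the caller is handed &objs[size] = &objs[256] and consumes the
	 * topmost 64 objects (32 hot + 32 new), and cache->len is left at
	 * size = 256.
	 */
	assert(size + n - len == 288);
}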
Med venlig hilsen / Kind regards,
-Morten Brørup