[dpdk-dev] [PATCH] mempool: improve cache search

Zoltan Kiss zoltan.kiss at linaro.org
Thu Jun 25 20:48:30 CEST 2015


The current implementation has a few problems:

- if cache->len < n, we copy the newly dequeued elements into the cache
  first and only then into obj_table; that extra copy is unnecessary
  (see the sketch after this list)
- if n >= cache_size (or the backfill fails) and we can't fulfil the
  request from the ring alone, we don't try to combine the ring with the
  cache
- if the refill fails, we don't return anything, even if the ring holds
  enough elements for the request
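
A condensed sketch of the current backfill path (variable names as in
__mempool_get_bulk(), declarations and stat counters omitted; not the
literal code) shows the double copy:

	uint32_t req = n + (cache_size - cache->len);

	/* copy #1: ring -> cache; on failure we fall back to the ring
	 * alone, and whatever already sits in the cache is not used */
	if (rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len],
				     req) < 0)
		goto ring_dequeue;
	cache->len += req;

	/* copy #2: cache -> obj_table, including the elements that were
	 * just fetched from the ring */
	for (index = 0, len = cache->len - 1; index < n; ++index, --len)
		obj_table[index] = cache->objs[len];
	cache->len -= n;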

This patch reworks the logic substantially (sketched after the list):
- in the first part of the function we serve the request from the cache
  only if the cache can cover it entirely (cache->len >= n)
- otherwise we take the elements straight from the ring
- if that fails but the cache holds some elements, we try to combine the
  two
- the refill happens at the end, and a refill failure doesn't modify the
  return value
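
Putting it together, the reworked flow is roughly the following (an
illustrative sketch of the multi-consumer path only; the lcore_id bounds
check and the stat counters are omitted, caching is assumed compiled in,
and sketch_mc_get_bulk() is a made-up name, not the patched function):

	#include <rte_lcore.h>
	#include <rte_mempool.h>
	#include <rte_ring.h>

	/* Illustrative sketch of the reworked lookup order (mc path). */
	static inline int
	sketch_mc_get_bulk(struct rte_mempool *mp, void **obj_table,
			   unsigned n)
	{
		struct rte_mempool_cache *cache =
			&mp->local_cache[rte_lcore_id()];
		void **cache_objs = cache->objs;
		unsigned index;
		uint32_t len;
		int ret;

		if (cache->len >= n) {
			/* 1) the cache alone covers the request */
			for (index = 0, len = cache->len - 1; index < n;
			     ++index, --len)
				obj_table[index] = cache_objs[len];
			cache->len -= n;
			ret = 0;
		} else {
			/* 2) take the whole request straight from the ring */
			ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, n);
			if (ret < 0 && cache->len > 0) {
				/* 3) ring alone failed: combine ring and
				 * cache */
				uint32_t req = n - cache->len;

				ret = rte_ring_mc_dequeue_bulk(mp->ring,
							       obj_table, req);
				if (ret == 0) {
					for (index = 0; index < cache->len;
					     ++index)
						obj_table[req + index] =
							cache_objs[index];
					cache->len = 0;
				}
			}
		}

		/* 4) refill last; a failed refill must not spoil ret */
		if (ret == 0 && mp->cache_size > 0 && cache->len < n) {
			uint32_t req = mp->cache_size - cache->len;

			if (rte_ring_mc_dequeue_bulk(mp->ring,
						     &cache->objs[cache->len],
						     req) == 0)
				cache->len += req;
		}

		return ret;
	}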

Signed-off-by: Zoltan Kiss <zoltan.kiss at linaro.org>
---
 lib/librte_mempool/rte_mempool.h | 63 +++++++++++++++++++++++++---------------
 1 file changed, 39 insertions(+), 24 deletions(-)

diff --git a/lib/librte_mempool/rte_mempool.h b/lib/librte_mempool/rte_mempool.h
index a8054e1..896946c 100644
--- a/lib/librte_mempool/rte_mempool.h
+++ b/lib/librte_mempool/rte_mempool.h
@@ -948,34 +948,14 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
 	unsigned lcore_id = rte_lcore_id();
 	uint32_t cache_size = mp->cache_size;
 
-	/* cache is not enabled or single consumer */
+	cache = &mp->local_cache[lcore_id];
+	/* cache is not enabled or single consumer or not enough */
 	if (unlikely(cache_size == 0 || is_mc == 0 ||
-		     n >= cache_size || lcore_id >= RTE_MAX_LCORE))
+		     cache->len < n || lcore_id >= RTE_MAX_LCORE))
 		goto ring_dequeue;
 
-	cache = &mp->local_cache[lcore_id];
 	cache_objs = cache->objs;
 
-	/* Can this be satisfied from the cache? */
-	if (cache->len < n) {
-		/* No. Backfill the cache first, and then fill from it */
-		uint32_t req = n + (cache_size - cache->len);
-
-		/* How many do we require i.e. number to fill the cache + the request */
-		ret = rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len], req);
-		if (unlikely(ret < 0)) {
-			/*
-			 * In the offchance that we are buffer constrained,
-			 * where we are not able to allocate cache + n, go to
-			 * the ring directly. If that fails, we are truly out of
-			 * buffers.
-			 */
-			goto ring_dequeue;
-		}
-
-		cache->len += req;
-	}
-
 	/* Now fill in the response ... */
 	for (index = 0, len = cache->len - 1; index < n; ++index, len--, obj_table++)
 		*obj_table = cache_objs[len];
@@ -984,7 +964,8 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
 
 	__MEMPOOL_STAT_ADD(mp, get_success, n);
 
-	return 0;
+	ret = 0;
+	goto cache_refill;
 
 ring_dequeue:
 #endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
@@ -995,11 +976,45 @@ ring_dequeue:
 	else
 		ret = rte_ring_sc_dequeue_bulk(mp->ring, obj_table, n);
 
+#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
+	if (ret < 0 && is_mc == 1 && cache->len > 0) {
+		uint32_t req = n - cache->len;
+
+		ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, req);
+		if (ret == 0) {
+			cache_objs = cache->objs;
+			obj_table += req;
+			for (index = 0; index < cache->len;
+			     ++index, ++obj_table)
+				*obj_table = cache_objs[index];
+			cache->len = 0;
+		}
+	}
+#endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
+
 	if (ret < 0)
 		__MEMPOOL_STAT_ADD(mp, get_fail, n);
 	else
 		__MEMPOOL_STAT_ADD(mp, get_success, n);
 
+#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
+cache_refill:
+	/* If previous dequeue was OK and we have less than n, start refill */
+	if (ret == 0 && cache_size > 0 && cache->len < n) {
+		uint32_t req = cache_size - cache->len;
+
+		cache_objs = cache->objs;
+		ret = rte_ring_mc_dequeue_bulk(mp->ring,
+					       &cache->objs[cache->len],
+					       req);
+		if (likely(ret == 0))
+			cache->len += req;
+		else
+			/* Don't spoil the return value */
+			ret = 0;
+	}
+#endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
+
 	return ret;
 }
 
-- 
1.9.1


