[dpdk-dev] [PATCH v3 2/3] lib/rcu: add resource reclamation APIs

Medvedkin, Vladimir vladimir.medvedkin at intel.com
Fri Oct 4 21:01:04 CEST 2019


Hi Honnappa,

On 01/10/2019 07:29, Honnappa Nagarahalli wrote:
> Add resource reclamation APIs to make it simple for applications
> and libraries to integrate rte_rcu library.
>
> Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli at arm.com>
> Reviewed-by: Ola Liljedhal <ola.liljedhal at arm.com>
> Reviewed-by: Ruifeng Wang <ruifeng.wang at arm.com>
> ---
>   app/test/test_rcu_qsbr.c           | 291 ++++++++++++++++++++++++++++-
>   lib/librte_rcu/meson.build         |   2 +
>   lib/librte_rcu/rte_rcu_qsbr.c      | 185 ++++++++++++++++++
>   lib/librte_rcu/rte_rcu_qsbr.h      | 169 +++++++++++++++++
>   lib/librte_rcu/rte_rcu_qsbr_pvt.h  |  46 +++++
>   lib/librte_rcu/rte_rcu_version.map |   4 +
>   lib/meson.build                    |   6 +-
>   7 files changed, 700 insertions(+), 3 deletions(-)
>   create mode 100644 lib/librte_rcu/rte_rcu_qsbr_pvt.h
There are compilation errors when building DPDK as a shared library.

I think you need something like:

--- a/lib/librte_rcu/Makefile
+++ b/lib/librte_rcu/Makefile
@@ -8,7 +8,7 @@ LIB = librte_rcu.a

  CFLAGS += -DALLOW_EXPERIMENTAL_API
  CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
-LDLIBS += -lrte_eal
+LDLIBS += -lrte_eal -lrte_ring
>
> diff --git a/app/test/test_rcu_qsbr.c b/app/test/test_rcu_qsbr.c
> index d1b9e46a2..3a6815243 100644
> --- a/app/test/test_rcu_qsbr.c
> +++ b/app/test/test_rcu_qsbr.c
I think it's better to split the unit test patches from the library patches.
> @@ -1,8 +1,9 @@
>   /* SPDX-License-Identifier: BSD-3-Clause
> - * Copyright (c) 2018 Arm Limited
> + * Copyright (c) 2019 Arm Limited
>    */
>   
>   #include <stdio.h>
> +#include <string.h>
>   #include <rte_pause.h>
>   #include <rte_rcu_qsbr.h>
>   #include <rte_hash.h>
> @@ -33,6 +34,7 @@ static uint32_t *keys;
>   #define COUNTER_VALUE 4096
>   static uint32_t *hash_data[RTE_MAX_LCORE][TOTAL_ENTRY];
>   static uint8_t writer_done;
> +static uint8_t cb_failed;
>   
>   static struct rte_rcu_qsbr *t[RTE_MAX_LCORE];
>   struct rte_hash *h[RTE_MAX_LCORE];
> @@ -582,6 +584,269 @@ test_rcu_qsbr_thread_offline(void)
>   	return 0;
>   }
>   
> +static void
> +rte_rcu_qsbr_test_free_resource(void *p, void *e)
This function is not part of the DPDK API, so it's better to name it 
something like test_rcu_qsbr_free_resource().
> +{
> +	if (p != NULL && e != NULL) {
> +		printf("%s: Test failed\n", __func__);
> +		cb_failed = 1;
> +	}
> +}
> +
> +/*
> + * rte_rcu_qsbr_dq_create: create a queue used to store the data structure
> + * elements that can be freed later. This queue is referred to as 'defer queue'.
> + */
> +static int
> +test_rcu_qsbr_dq_create(void)
> +{
> +	char rcu_dq_name[RTE_RING_NAMESIZE];
> +	struct rte_rcu_qsbr_dq_parameters params;
> +	struct rte_rcu_qsbr_dq *dq;
> +
> +	printf("\nTest rte_rcu_qsbr_dq_create()\n");
> +
> +	/* Pass invalid parameters */
> +	dq = rte_rcu_qsbr_dq_create(NULL);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
> +
> +	memset(&params, 0, sizeof(struct rte_rcu_qsbr_dq_parameters));
> +	dq = rte_rcu_qsbr_dq_create(&params);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
> +
> +	snprintf(rcu_dq_name, sizeof(rcu_dq_name), "TEST_RCU");
> +	params.name = rcu_dq_name;
> +	dq = rte_rcu_qsbr_dq_create(&params);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
> +
> +	params.f = rte_rcu_qsbr_test_free_resource;
> +	dq = rte_rcu_qsbr_dq_create(&params);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
> +
> +	rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
> +	params.v = t[0];
> +	dq = rte_rcu_qsbr_dq_create(&params);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
> +
> +	params.size = 1;
> +	dq = rte_rcu_qsbr_dq_create(&params);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
> +
> +	params.esize = 3;
> +	dq = rte_rcu_qsbr_dq_create(&params);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
> +
> +	/* Pass all valid parameters */
> +	params.esize = 16;
> +	dq = rte_rcu_qsbr_dq_create(&params);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((dq == NULL), "dq create valid params");
> +	rte_rcu_qsbr_dq_delete(dq);
> +
> +	return 0;
> +}
> +
> +/*
> + * rte_rcu_qsbr_dq_enqueue: enqueue one resource to the defer queue,
> + * to be freed later after atleast one grace period is over.
> + */
> +static int
> +test_rcu_qsbr_dq_enqueue(void)
> +{
> +	int ret;
> +	uint64_t r;
> +	char rcu_dq_name[RTE_RING_NAMESIZE];
> +	struct rte_rcu_qsbr_dq_parameters params;
> +	struct rte_rcu_qsbr_dq *dq;
> +
> +	printf("\nTest rte_rcu_qsbr_dq_enqueue()\n");
> +
> +	/* Create a queue with simple parameters */
> +	memset(&params, 0, sizeof(struct rte_rcu_qsbr_dq_parameters));
> +	snprintf(rcu_dq_name, sizeof(rcu_dq_name), "TEST_RCU");
> +	params.name = rcu_dq_name;
> +	params.f = rte_rcu_qsbr_test_free_resource;
> +	rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
> +	params.v = t[0];
> +	params.size = 1;
> +	params.esize = 16;
> +	dq = rte_rcu_qsbr_dq_create(&params);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((dq == NULL), "dq create valid params");
> +
> +	/* Pass invalid parameters */
> +	ret = rte_rcu_qsbr_dq_enqueue(NULL, NULL);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq enqueue invalid params");
> +
> +	ret = rte_rcu_qsbr_dq_enqueue(dq, NULL);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq enqueue invalid params");
> +
> +	ret = rte_rcu_qsbr_dq_enqueue(NULL, &r);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq enqueue invalid params");
> +
> +	ret = rte_rcu_qsbr_dq_delete(dq);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 1), "dq delete valid params");
> +
> +	return 0;
> +}
> +
> +/*
> + * rte_rcu_qsbr_dq_reclaim: Reclaim resources from the defer queue.
> + */
> +static int
> +test_rcu_qsbr_dq_reclaim(void)
> +{
> +	int ret;
> +
> +	printf("\nTest rte_rcu_qsbr_dq_reclaim()\n");
> +
> +	/* Pass invalid parameters */
> +	ret = rte_rcu_qsbr_dq_reclaim(NULL);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 1), "dq reclaim invalid params");
> +
> +	return 0;
> +}
> +
> +/*
> + * rte_rcu_qsbr_dq_delete: Delete a defer queue.
> + */
> +static int
> +test_rcu_qsbr_dq_delete(void)
> +{
> +	int ret;
> +	char rcu_dq_name[RTE_RING_NAMESIZE];
> +	struct rte_rcu_qsbr_dq_parameters params;
> +	struct rte_rcu_qsbr_dq *dq;
> +
> +	printf("\nTest rte_rcu_qsbr_dq_delete()\n");
> +
> +	/* Pass invalid parameters */
> +	ret = rte_rcu_qsbr_dq_delete(NULL);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 1), "dq delete invalid params");
> +
> +	memset(&params, 0, sizeof(struct rte_rcu_qsbr_dq_parameters));
> +	snprintf(rcu_dq_name, sizeof(rcu_dq_name), "TEST_RCU");
> +	params.name = rcu_dq_name;
> +	params.f = rte_rcu_qsbr_test_free_resource;
> +	rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
> +	params.v = t[0];
> +	params.size = 1;
> +	params.esize = 16;
> +	dq = rte_rcu_qsbr_dq_create(&params);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((dq == NULL), "dq create valid params");
> +	ret = rte_rcu_qsbr_dq_delete(dq);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0), "dq delete valid params");
> +
> +	return 0;
> +}
> +
> +/*
> + * rte_rcu_qsbr_dq_enqueue: enqueue one resource to the defer queue,
> + * to be freed later after atleast one grace period is over.
> + */
> +static int
> +test_rcu_qsbr_dq_functional(int32_t size, int32_t esize)
> +{
> +	int i, j, ret;
> +	char rcu_dq_name[RTE_RING_NAMESIZE];
> +	struct rte_rcu_qsbr_dq_parameters params;
> +	struct rte_rcu_qsbr_dq *dq;
> +	uint64_t *e;
> +	uint64_t sc = 200;
> +	int max_entries;
> +
> +	printf("\nTest rte_rcu_qsbr_dq_xxx functional tests()\n");
> +	printf("Size = %d, esize = %d\n", size, esize);
> +
> +	e = (uint64_t *)rte_zmalloc(NULL, esize, RTE_CACHE_LINE_SIZE);
> +	if (e == NULL)
> +		return 0;
> +	cb_failed = 0;
> +
> +	/* Initialize the RCU variable. No threads are registered */
> +	rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
> +
> +	/* Create a queue with simple parameters */
> +	memset(&params, 0, sizeof(struct rte_rcu_qsbr_dq_parameters));
> +	snprintf(rcu_dq_name, sizeof(rcu_dq_name), "TEST_RCU");
> +	params.name = rcu_dq_name;
> +	params.f = rte_rcu_qsbr_test_free_resource;
> +	params.v = t[0];
> +	params.size = size;
> +	params.esize = esize;
> +	dq = rte_rcu_qsbr_dq_create(&params);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((dq == NULL), "dq create valid params");
> +
> +	/* Given the size and esize, calculate the maximum number of entries
> +	 * that can be stored on the defer queue (look at the logic used
> +	 * in capacity calculation of rte_ring).
> +	 */
> +	max_entries = rte_align32pow2(((esize/8 + 1) * size) + 1);
> +	max_entries = (max_entries - 1)/(esize/8 + 1);
> +
> +	/* Enqueue few counters starting with the value 'sc' */
> +	/* The queue size will be rounded up to 2. The enqueue API also
> +	 * reclaims if the queue size is above certain limit. Since, there
> +	 * are no threads registered, reclamation succedes. Hence, it should
> +	 * be possible to enqueue more than the provided queue size.
> +	 */
> +	for (i = 0; i < 10; i++) {
> +		ret = rte_rcu_qsbr_dq_enqueue(dq, e);
> +		TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0),
> +			"dq enqueue functional");
> +		for (j = 0; j < esize/8; j++)
> +			e[j] = sc++;
> +	}
> +
> +	/* Register a thread on the RCU QSBR variable. Reclamation will not
> +	 * succeed. It should not be possible to enqueue more than the size
> +	 * number of resources.
> +	 */
> +	rte_rcu_qsbr_thread_register(t[0], 1);
> +	rte_rcu_qsbr_thread_online(t[0], 1);
> +
> +	for (i = 0; i < max_entries; i++) {
> +		ret = rte_rcu_qsbr_dq_enqueue(dq, e);
> +		TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0),
> +			"dq enqueue functional");
> +		for (j = 0; j < esize/8; j++)
> +			e[j] = sc++;
> +	}
> +
> +	/* Enqueue fails as queue is full */
> +	ret = rte_rcu_qsbr_dq_enqueue(dq, e);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq enqueue functional");
> +
> +	/* Delete should fail as there are elements in defer queue which
> +	 * cannot be reclaimed.
> +	 */
> +	ret = rte_rcu_qsbr_dq_delete(dq);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq delete valid params");
> +
> +	/* Report quiescent state, enqueue should succeed */
> +	rte_rcu_qsbr_quiescent(t[0], 1);
> +	for (i = 0; i < max_entries; i++) {
> +		ret = rte_rcu_qsbr_dq_enqueue(dq, e);
> +		TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0),
> +			"dq enqueue functional");
> +		for (j = 0; j < esize/8; j++)
> +			e[j] = sc++;
> +	}
> +
> +	/* Queue is full */
> +	ret = rte_rcu_qsbr_dq_enqueue(dq, e);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq enqueue functional");
> +
> +	/* Report quiescent state, delete should succeed */
> +	rte_rcu_qsbr_quiescent(t[0], 1);
> +	ret = rte_rcu_qsbr_dq_delete(dq);
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0), "dq delete valid params");
> +
> +	/* Validate that call back function did not return any error */
> +	TEST_RCU_QSBR_RETURN_IF_ERROR((cb_failed == 1), "CB failed");
> +
> +	rte_free(e);
> +	return 0;
> +}
> +
>   /*
>    * rte_rcu_qsbr_dump: Dump status of a single QS variable to a file
>    */
> @@ -1025,6 +1290,18 @@ test_rcu_qsbr_main(void)
>   	if (test_rcu_qsbr_thread_offline() < 0)
>   		goto test_fail;
>   
> +	if (test_rcu_qsbr_dq_create() < 0)
> +		goto test_fail;
> +
> +	if (test_rcu_qsbr_dq_reclaim() < 0)
> +		goto test_fail;
> +
> +	if (test_rcu_qsbr_dq_delete() < 0)
> +		goto test_fail;
> +
> +	if (test_rcu_qsbr_dq_enqueue() < 0)
> +		goto test_fail;
> +
>   	printf("\nFunctional tests\n");
>   
>   	if (test_rcu_qsbr_sw_sv_3qs() < 0)
> @@ -1033,6 +1310,18 @@ test_rcu_qsbr_main(void)
>   	if (test_rcu_qsbr_mw_mv_mqs() < 0)
>   		goto test_fail;
>   
> +	if (test_rcu_qsbr_dq_functional(1, 8) < 0)
> +		goto test_fail;
> +
> +	if (test_rcu_qsbr_dq_functional(2, 8) < 0)
> +		goto test_fail;
> +
> +	if (test_rcu_qsbr_dq_functional(303, 16) < 0)
> +		goto test_fail;
> +
> +	if (test_rcu_qsbr_dq_functional(7, 128) < 0)
> +		goto test_fail;
> +
>   	free_rcu();
>   
>   	printf("\n");
> diff --git a/lib/librte_rcu/meson.build b/lib/librte_rcu/meson.build
> index 62920ba02..e280b29c1 100644
> --- a/lib/librte_rcu/meson.build
> +++ b/lib/librte_rcu/meson.build
> @@ -10,3 +10,5 @@ headers = files('rte_rcu_qsbr.h')
>   if cc.get_id() == 'clang' and dpdk_conf.get('RTE_ARCH_64') == false
>   	ext_deps += cc.find_library('atomic')
>   endif
> +
> +deps += ['ring']
> diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c
> index ce7f93dd3..76814f50b 100644
> --- a/lib/librte_rcu/rte_rcu_qsbr.c
> +++ b/lib/librte_rcu/rte_rcu_qsbr.c
> @@ -21,6 +21,7 @@
>   #include <rte_errno.h>
>   
>   #include "rte_rcu_qsbr.h"
> +#include "rte_rcu_qsbr_pvt.h"
>   
>   /* Get the memory size of QSBR variable */
>   size_t
> @@ -267,6 +268,190 @@ rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
>   	return 0;
>   }
>   
> +/* Create a queue used to store the data structure elements that can
> + * be freed later. This queue is referred to as 'defer queue'.
> + */
> +struct rte_rcu_qsbr_dq *
> +rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params)
> +{
> +	struct rte_rcu_qsbr_dq *dq;
> +	uint32_t qs_fifo_size;
> +
> +	if (params == NULL || params->f == NULL ||
> +		params->v == NULL || params->name == NULL ||
> +		params->size == 0 || params->esize == 0 ||
> +		(params->esize % 8 != 0)) {
> +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> +			"%s(): Invalid input parameter\n", __func__);
> +		rte_errno = EINVAL;
> +
> +		return NULL;
> +	}
> +
> +	dq = rte_zmalloc(NULL,
> +		(sizeof(struct rte_rcu_qsbr_dq) + params->esize),
> +		RTE_CACHE_LINE_SIZE);
> +	if (dq == NULL) {
> +		rte_errno = ENOMEM;
> +
> +		return NULL;
> +	}
> +
> +	/* round up qs_fifo_size to next power of two that is not less than
> +	 * max_size.
> +	 */
> +	qs_fifo_size = rte_align32pow2((((params->esize/8) + 1)
> +					* params->size) + 1);
> +	dq->r = rte_ring_create(params->name, qs_fifo_size,
> +					SOCKET_ID_ANY, 0);
> +	if (dq->r == NULL) {
> +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> +			"%s(): defer queue create failed\n", __func__);
> +		rte_free(dq);
> +		return NULL;
> +	}
> +
> +	dq->v = params->v;
> +	dq->size = params->size;
> +	dq->esize = params->esize;
> +	dq->f = params->f;
> +	dq->p = params->p;
> +
> +	return dq;
> +}
> +
> +/* Enqueue one resource to the defer queue to free after the grace
> + * period is over.
> + */
> +int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e)
> +{
> +	uint64_t token;
> +	uint64_t *tmp;
> +	uint32_t i;
> +	uint32_t cur_size, free_size;
> +
> +	if (dq == NULL || e == NULL) {
> +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> +			"%s(): Invalid input parameter\n", __func__);
> +		rte_errno = EINVAL;
> +
> +		return 1;
> +	}
> +
> +	/* Start the grace period */
> +	token = rte_rcu_qsbr_start(dq->v);
> +
> +	/* Reclaim resources if the queue is 1/8th full. This helps
> +	 * the queue from growing too large and allows time for reader
> +	 * threads to report their quiescent state.
> +	 */
> +	cur_size = rte_ring_count(dq->r) / (dq->esize/8 + 1);
> +	if (cur_size > (dq->size >> RTE_RCU_QSBR_AUTO_RECLAIM_LIMIT)) {
> +		rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> +			"%s(): Triggering reclamation\n", __func__);
> +		rte_rcu_qsbr_dq_reclaim(dq);
> +	}
> +
> +	/* Check if there is space for atleast for 1 resource */
> +	free_size = rte_ring_free_count(dq->r) / (dq->esize/8 + 1);
> +	if (!free_size) {
> +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> +			"%s(): Defer queue is full\n", __func__);
> +		rte_errno = ENOSPC;
> +		return 1;
> +	}
> +
> +	/* Enqueue the resource */
> +	rte_ring_sp_enqueue(dq->r, (void *)(uintptr_t)token);
> +
> +	/* The resource to enqueue needs to be a multiple of 64b
> +	 * due to the limitation of the rte_ring implementation.
> +	 */
> +	for (i = 0, tmp = (uint64_t *)e; i < dq->esize/8; i++, tmp++)
> +		rte_ring_sp_enqueue(dq->r, (void *)(uintptr_t)*tmp);
> +
> +	return 0;
> +}
> +
> +/* Reclaim resources from the defer queue. */
> +int
> +rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq)
> +{
> +	uint32_t max_cnt;
> +	uint32_t cnt;
> +	void *token;
> +	uint64_t *tmp;
> +	uint32_t i;
> +
> +	if (dq == NULL) {
> +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> +			"%s(): Invalid input parameter\n", __func__);
> +		rte_errno = EINVAL;
> +
> +		return 1;
> +	}
> +
> +	/* Anything to reclaim? */
> +	if (rte_ring_count(dq->r) == 0)
> +		return 0;
> +
> +	/* Reclaim at the max 1/16th the total number of entries. */
> +	max_cnt = dq->size >> RTE_RCU_QSBR_MAX_RECLAIM_LIMIT;
> +	max_cnt = (max_cnt == 0) ? dq->size : max_cnt;
> +	cnt = 0;
> +
> +	/* Check reader threads quiescent state and reclaim resources */
> +	while ((cnt < max_cnt) && (rte_ring_peek(dq->r, &token) == 0) &&
> +		(rte_rcu_qsbr_check(dq->v, (uint64_t)((uintptr_t)token), false)
> +			== 1)) {
> +		(void)rte_ring_sc_dequeue(dq->r, &token);
> +		/* The resource to dequeue needs to be a multiple of 64b
> +		 * due to the limitation of the rte_ring implementation.
> +		 */
> +		for (i = 0, tmp = (uint64_t *)dq->e; i < dq->esize/8;
> +			i++, tmp++)
> +			(void)rte_ring_sc_dequeue(dq->r,
> +					(void *)(uintptr_t)tmp);
> +		dq->f(dq->p, dq->e);
> +
> +		cnt++;
> +	}
> +
> +	rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> +		"%s(): Reclaimed %u resources\n", __func__, cnt);
> +
> +	if (cnt == 0) {
> +		/* No resources were reclaimed */
> +		rte_errno = EAGAIN;
> +		return 1;
> +	}
> +
> +	return 0;
> +}
> +
> +/* Delete a defer queue. */
> +int
> +rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq)
> +{
> +	if (dq == NULL) {
> +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> +			"%s(): Invalid input parameter\n", __func__);
> +		rte_errno = EINVAL;
> +
> +		return 1;
> +	}
> +
> +	/* Reclaim all the resources */
> +	if (rte_rcu_qsbr_dq_reclaim(dq) != 0)
> +		/* Error number is already set by the reclaim API */
> +		return 1;
There could be a potential problem here. rte_rcu_qsbr_dq_reclaim() reclaims 
at most max_cnt entries, which is 1/16 of the possible enqueued entries, so 
the rest won't be reclaimed.
> +
> +	rte_ring_free(dq->r);
> +	rte_free(dq);
> +
> +	return 0;
> +}
> +
>   int rte_rcu_log_type;
>   
>   RTE_INIT(rte_rcu_register)
> diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h
> index c80f15c00..185d4b50a 100644
> --- a/lib/librte_rcu/rte_rcu_qsbr.h
> +++ b/lib/librte_rcu/rte_rcu_qsbr.h
> @@ -34,6 +34,7 @@ extern "C" {
>   #include <rte_lcore.h>
>   #include <rte_debug.h>
>   #include <rte_atomic.h>
> +#include <rte_ring.h>
I think it's better to move this include into rte_rcu_qsbr.c
>   
>   extern int rte_rcu_log_type;
>   
> @@ -109,6 +110,67 @@ struct rte_rcu_qsbr {
>   	 */
>   } __rte_cache_aligned;
>   
> +/**
> + * Call back function called to free the resources.
> + *
> + * @param p
> + *   Pointer provided while creating the defer queue
> + * @param e
> + *   Pointer to the resource data stored on the defer queue
> + *
> + * @return
> + *   None
> + */
> +typedef void (*rte_rcu_qsbr_free_resource)(void *p, void *e);
> +
> +#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE
I don't see the usage of this macro anywhere in the rcu library (I see 
you are using it in LPM).

char rcu_dq_name[RTE_RING_NAMESIZE];
is used instead in the tests.
+ See my comments for  [PATCH v3 1/3] lib/lpm: integrate RCU QSBR
> +
> +/**
> + *  Trigger automatic reclamation after 1/8th the defer queue is full.
> + */
> +#define RTE_RCU_QSBR_AUTO_RECLAIM_LIMIT 3
> +
> +/**
> + *  Reclaim at the max 1/16th the total number of resources.
> + */
> +#define RTE_RCU_QSBR_MAX_RECLAIM_LIMIT 4
Those two defines could be moved into the .c file.
> +
> +/**
> + * Parameters used when creating the defer queue.
> + */
> +struct rte_rcu_qsbr_dq_parameters {
> +	const char *name;
> +	/**< Name of the queue. */
> +	uint32_t size;
> +	/**< Number of entries in queue. Typically, this will be
> +	 *   the same as the maximum number of entries supported in the
> +	 *   lock free data structure.
> +	 *   Data structures with unbounded number of entries is not
> +	 *   supported currently.
> +	 */
> +	uint32_t esize;
> +	/**< Size (in bytes) of each element in the defer queue.
> +	 *   This has to be multiple of 8B as the rte_ring APIs
> +	 *   support 8B element sizes only.
> +	 */
> +	rte_rcu_qsbr_free_resource f;
> +	/**< Function to call to free the resource. */
> +	void *p;
> +	/**< Pointer passed to the free function. Typically, this is the
> +	 *   pointer to the data structure to which the resource to free
> +	 *   belongs. This can be NULL.
> +	 */
> +	struct rte_rcu_qsbr *v;
> +	/**< RCU QSBR variable to use for this defer queue */
> +};
> +
> +/* RTE defer queue structure.
> + * This structure holds the defer queue. The defer queue is used to
> + * hold the deleted entries from the data structure that are not
> + * yet freed.
> + */
> +struct rte_rcu_qsbr_dq;
> +
>   /**
>    * @warning
>    * @b EXPERIMENTAL: this API may change without prior notice
> @@ -648,6 +710,113 @@ __rte_experimental
>   int
>   rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);
>   
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Create a queue used to store the data structure elements that can
> + * be freed later. This queue is referred to as 'defer queue'.
> + *
> + * @param params
> + *   Parameters to create a defer queue.
> + * @return
> + *   On success - Valid pointer to defer queue
> + *   On error - NULL
> + *   Possible rte_errno codes are:
> + *   - EINVAL - NULL parameters are passed
> + *   - ENOMEM - Not enough memory
> + */
> +__rte_experimental
> +struct rte_rcu_qsbr_dq *
> +rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Enqueue one resource to the defer queue and start the grace period.
> + * The resource will be freed later after at least one grace period
> + * is over.
> + *
> + * If the defer queue is full, it will attempt to reclaim resources.
> + * It will also reclaim resources at regular intervals to avoid
> + * the defer queue from growing too big.
> + *
> + * This API is not multi-thread safe. It is expected that the caller
> + * provides multi-thread safety by locking a mutex or some other means.
> + *
> + * A lock free multi-thread writer algorithm could achieve multi-thread
> + * safety by creating and using one defer queue per thread.
> + *
> + * @param dq
> + *   Defer queue to allocate an entry from.
> + * @param e
> + *   Pointer to resource data to copy to the defer queue. The size of
> + *   the data to copy is equal to the element size provided when the
> + *   defer queue was created.
> + * @return
> + *   On success - 0
> + *   On error - 1 with rte_errno set to
> + *   - EINVAL - NULL parameters are passed
> + *   - ENOSPC - Defer queue is full. This condition can not happen
> + *		if the defer queue size is equal (or larger) than the
> + *		number of elements in the data structure.
> + */
> +__rte_experimental
> +int
> +rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Reclaim resources from the defer queue.
> + *
> + * This API is not multi-thread safe. It is expected that the caller
> + * provides multi-thread safety by locking a mutex or some other means.
> + *
> + * A lock free multi-thread writer algorithm could achieve multi-thread
> + * safety by creating and using one defer queue per thread.
> + *
> + * @param dq
> + *   Defer queue to reclaim an entry from.
> + * @return
> + *   On successful reclamation of at least 1 resource - 0
> + *   On error - 1 with rte_errno set to
> + *   - EINVAL - NULL parameters are passed
> + *   - EAGAIN - None of the resources have completed at least 1 grace period,
> + *		try again.
> + */
> +__rte_experimental
> +int
> +rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Delete a defer queue.
> + *
> + * It tries to reclaim all the resources on the defer queue.
> + * If any of the resources have not completed the grace period
> + * the reclamation stops and returns immediately. The rest of
> + * the resources are not reclaimed and the defer queue is not
> + * freed.
> + *
> + * @param dq
> + *   Defer queue to delete.
> + * @return
> + *   On success - 0
> + *   On error - 1
> + *   Possible rte_errno codes are:
> + *   - EINVAL - NULL parameters are passed
> + *   - EAGAIN - Some of the resources have not completed at least 1 grace
> + *		period, try again.
> + */
> +__rte_experimental
> +int
> +rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);
> +
>   #ifdef __cplusplus
>   }
>   #endif
> diff --git a/lib/librte_rcu/rte_rcu_qsbr_pvt.h b/lib/librte_rcu/rte_rcu_qsbr_pvt.h
> new file mode 100644
> index 000000000..2122bc36a
> --- /dev/null
> +++ b/lib/librte_rcu/rte_rcu_qsbr_pvt.h
> @@ -0,0 +1,46 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright (c) 2019 Arm Limited
> + */
> +
> +#ifndef _RTE_RCU_QSBR_PVT_H_
> +#define _RTE_RCU_QSBR_PVT_H_
> +
> +/**
> + * This file is private to the RCU library. It should not be included
> + * by the user of this library.
> + */
Why is this struct definition separated into a private .h? Maybe just 
define it in rte_rcu_qsbr.c instead?
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include "rte_rcu_qsbr.h"
> +
> +/* RTE defer queue structure.
> + * This structure holds the defer queue. The defer queue is used to
> + * hold the deleted entries from the data structure that are not
> + * yet freed.
> + */
> +struct rte_rcu_qsbr_dq {
> +	struct rte_rcu_qsbr *v; /**< RCU QSBR variable used by this queue.*/
> +	struct rte_ring *r;     /**< RCU QSBR defer queue. */
> +	uint32_t size;
> +	/**< Number of elements in the defer queue */
> +	uint32_t esize;
> +	/**< Size (in bytes) of data stored on the defer queue */
> +	rte_rcu_qsbr_free_resource f;
> +	/**< Function to call to free the resource. */
> +	void *p;
> +	/**< Pointer passed to the free function. Typically, this is the
> +	 *   pointer to the data structure to which the resource to free
> +	 *   belongs.
> +	 */
> +	char e[0];
> +	/**< Temporary storage to copy the defer queue element. */
> +};
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _RTE_RCU_QSBR_PVT_H_ */
> diff --git a/lib/librte_rcu/rte_rcu_version.map b/lib/librte_rcu/rte_rcu_version.map
> index f8b9ef2ab..dfac88a37 100644
> --- a/lib/librte_rcu/rte_rcu_version.map
> +++ b/lib/librte_rcu/rte_rcu_version.map
> @@ -8,6 +8,10 @@ EXPERIMENTAL {
>   	rte_rcu_qsbr_synchronize;
>   	rte_rcu_qsbr_thread_register;
>   	rte_rcu_qsbr_thread_unregister;
> +	rte_rcu_qsbr_dq_create;
> +	rte_rcu_qsbr_dq_enqueue;
> +	rte_rcu_qsbr_dq_reclaim;
> +	rte_rcu_qsbr_dq_delete;
>   
>   	local: *;
>   };
> diff --git a/lib/meson.build b/lib/meson.build
> index e5ff83893..0e1be8407 100644
> --- a/lib/meson.build
> +++ b/lib/meson.build
> @@ -11,7 +11,9 @@
>   libraries = [
>   	'kvargs', # eal depends on kvargs
>   	'eal', # everything depends on eal
> -	'ring', 'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
> +	'ring',
> +	'rcu', # rcu depends on ring
> +	'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
>   	'cmdline',
>   	'metrics', # bitrate/latency stats depends on this
>   	'hash',    # efd depends on this
> @@ -22,7 +24,7 @@ libraries = [
>   	'gro', 'gso', 'ip_frag', 'jobstats',
>   	'kni', 'latencystats', 'lpm', 'member',
>   	'power', 'pdump', 'rawdev',
> -	'rcu', 'reorder', 'sched', 'security', 'stack', 'vhost',
> +	'reorder', 'sched', 'security', 'stack', 'vhost',
>   	# ipsec lib depends on net, crypto and security
>   	'ipsec',
>   	# add pkt framework libs which use other libs from above

-- 
Regards,
Vladimir



More information about the dev mailing list