[dpdk-dev] [PATCH 1/3] rcu: add RCU library supporting QSBR mechanism

Ananyev, Konstantin konstantin.ananyev at intel.com
Fri Mar 22 17:42:09 CET 2019


Hi Honnappa,

> diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c
> new file mode 100644
> index 000000000..0fc4515ea
> --- /dev/null
> +++ b/lib/librte_rcu/rte_rcu_qsbr.c
> @@ -0,0 +1,99 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2018 Arm Limited
> + */
> +
> +#include <stdio.h>
> +#include <string.h>
> +#include <stdint.h>
> +#include <errno.h>
> +
> +#include <rte_common.h>
> +#include <rte_log.h>
> +#include <rte_memory.h>
> +#include <rte_malloc.h>
> +#include <rte_eal.h>
> +#include <rte_eal_memconfig.h>
> +#include <rte_atomic.h>
> +#include <rte_per_lcore.h>
> +#include <rte_lcore.h>
> +#include <rte_errno.h>
> +
> +#include "rte_rcu_qsbr.h"
> +
> +/* Get the memory size of QSBR variable */
> +size_t __rte_experimental
> +rte_rcu_qsbr_get_memsize(uint32_t max_threads)
> +{
> +	size_t sz;
> +
> +	RTE_ASSERT(max_threads == 0);

Here and in all similar places:
assert() will abort when its condition evaluates to false.
So it should be max_threads != 0.
Also, it is a public, non-datapath function.
Calling assert() for an invalid input parameter seems way too extreme.
Why not just return an error to the caller?

> +
> +	sz = sizeof(struct rte_rcu_qsbr);
> +
> +	/* Add the size of quiescent state counter array */
> +	sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;
> +
> +	return RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> +}
> +
> +/* Initialize a quiescent state variable */
> +void __rte_experimental
> +rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
> +{
> +	RTE_ASSERT(v == NULL);
> +
> +	memset(v, 0, rte_rcu_qsbr_get_memsize(max_threads));
> +	v->m_threads = max_threads;
> +	v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,
> +			RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
> +			RTE_QSBR_THRID_ARRAY_ELM_SIZE;
> +	v->token = RTE_QSBR_CNT_INIT;
> +}
> +
> +/* Dump the details of a single quiescent state variable to a file. */
> +void __rte_experimental
> +rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
> +{
> +	uint64_t bmap;
> +	uint32_t i, t;
> +
> +	RTE_ASSERT(v == NULL || f == NULL);
> +
> +	fprintf(f, "\nQuiescent State Variable @%p\n", v);
> +
> +	fprintf(f, "  QS variable memory size = %lu\n",
> +				rte_rcu_qsbr_get_memsize(v->m_threads));
> +	fprintf(f, "  Given # max threads = %u\n", v->m_threads);
> +
> +	fprintf(f, "  Registered thread ID mask = 0x");
> +	for (i = 0; i < v->num_elems; i++)
> +		fprintf(f, "%lx", __atomic_load_n(&v->reg_thread_id[i],
> +					__ATOMIC_ACQUIRE));
> +	fprintf(f, "\n");
> +
> +	fprintf(f, "  Token = %lu\n",
> +			__atomic_load_n(&v->token, __ATOMIC_ACQUIRE));
> +
> +	fprintf(f, "Quiescent State Counts for readers:\n");
> +	for (i = 0; i < v->num_elems; i++) {
> +		bmap = __atomic_load_n(&v->reg_thread_id[i], __ATOMIC_ACQUIRE);
> +		while (bmap) {
> +			t = __builtin_ctzl(bmap);
> +			fprintf(f, "thread ID = %d, count = %lu\n", t,
> +				__atomic_load_n(
> +					&RTE_QSBR_CNT_ARRAY_ELM(v, i)->cnt,
> +					__ATOMIC_RELAXED));
> +			bmap &= ~(1UL << t);
> +		}
> +	}
> +}
> +
> +int rcu_log_type;
> +
> +RTE_INIT(rte_rcu_register)
> +{
> +	rcu_log_type = rte_log_register("lib.rcu");
> +	if (rcu_log_type >= 0)
> +		rte_log_set_level(rcu_log_type, RTE_LOG_ERR);
> +}
> diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h
> new file mode 100644
> index 000000000..83943f751
> --- /dev/null
> +++ b/lib/librte_rcu/rte_rcu_qsbr.h
> @@ -0,0 +1,511 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright (c) 2018 Arm Limited
> + */
> +
> +#ifndef _RTE_RCU_QSBR_H_
> +#define _RTE_RCU_QSBR_H_
> +
> +/**
> + * @file
> + * RTE Quiescent State Based Reclamation (QSBR)
> + *
> + * Quiescent State (QS) is any point in the thread execution
> + * where the thread does not hold a reference to a data structure
> + * in shared memory. While using lock-less data structures, the writer
> + * can safely free memory once all the reader threads have entered
> + * quiescent state.
> + *
> + * This library provides the ability for the readers to report quiescent
> + * state and for the writers to identify when all the readers have
> + * entered quiescent state.
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <stdio.h>
> +#include <stdint.h>
> +#include <errno.h>
> +#include <rte_common.h>
> +#include <rte_memory.h>
> +#include <rte_lcore.h>
> +#include <rte_debug.h>
> +
> +extern int rcu_log_type;
> +
> +#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
> +#define RCU_DP_LOG(level, fmt, args...) \
> +	rte_log(RTE_LOG_ ## level, rcu_log_type, \
> +		"%s(): " fmt "\n", __func__, ## args)
> +#else
> +#define RCU_DP_LOG(level, fmt, args...)
> +#endif

Why do you need that?
Can't you use RTE_LOG_DP() instead?

> +
> +/* Registered thread IDs are stored as a bitmap of 64b element array.
> + * Given thread id needs to be converted to index into the array and
> + * the id within the array element.
> + */
> +#define RTE_RCU_MAX_THREADS 1024
> +#define RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
> +#define RTE_QSBR_THRID_ARRAY_ELEMS \
> +	(RTE_ALIGN_MUL_CEIL(RTE_RCU_MAX_THREADS, \
> +	 RTE_QSBR_THRID_ARRAY_ELM_SIZE) / RTE_QSBR_THRID_ARRAY_ELM_SIZE)
> +#define RTE_QSBR_THRID_INDEX_SHIFT 6
> +#define RTE_QSBR_THRID_MASK 0x3f
> +#define RTE_QSBR_THRID_INVALID 0xffffffff
> +
> +/* Worker thread counter */
> +struct rte_rcu_qsbr_cnt {
> +	uint64_t cnt;
> +	/**< Quiescent state counter. Value 0 indicates the thread is offline */
> +} __rte_cache_aligned;
> +
> +#define RTE_QSBR_CNT_ARRAY_ELM(v, i) (((struct rte_rcu_qsbr_cnt *)(v + 1)) + i)

You can probably add a flexible array member
struct rte_rcu_qsbr_cnt cnt[];
at the end of struct rte_rcu_qsbr; then you wouldn't need the macro above.

> +#define RTE_QSBR_CNT_THR_OFFLINE 0
> +#define RTE_QSBR_CNT_INIT 1
> +
> +/**
> + * RTE thread Quiescent State structure.
> + * Quiescent state counter array (array of 'struct rte_rcu_qsbr_cnt'),
> + * whose size is dependent on the maximum number of reader threads
> + * (m_threads) using this variable is stored immediately following
> + * this structure.
> + */
> +struct rte_rcu_qsbr {
> +	uint64_t token __rte_cache_aligned;
> +	/**< Counter to allow for multiple simultaneous QS queries */
> +
> +	uint32_t num_elems __rte_cache_aligned;
> +	/**< Number of elements in the thread ID array */
> +	uint32_t m_threads;
> +	/**< Maximum number of threads this RCU variable will use */
> +
> +	uint64_t reg_thread_id[RTE_QSBR_THRID_ARRAY_ELEMS] __rte_cache_aligned;
> +	/**< Registered thread IDs are stored in a bitmap array */


As I understand it, you ended up with a fixed-size array to avoid 2 variable-size arrays in this struct?
Is it a big penalty for register/unregister() to either store a pointer to the bitmap, or calculate it based on the num_elems value?
As another thought - do we really need the bitmap at all?
Might it be possible to store a registered flag for each thread inside its rte_rcu_qsbr_cnt
(note that 'register' is a C keyword, so a different field name would be needed):
struct rte_rcu_qsbr_cnt {uint64_t cnt; uint32_t registered;} __rte_cache_aligned;
?
That would cause check() to walk through all elems in the rte_rcu_qsbr_cnt array,
but on the other hand it would help to avoid cache conflicts for register/unregister.

> +} __rte_cache_aligned;
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Return the size of the memory occupied by a Quiescent State variable.
> + *
> + * @param max_threads
> + *   Maximum number of threads reporting quiescent state on this variable.
> + * @return
> + *   Size of memory in bytes required for this QS variable.
> + */
> +size_t __rte_experimental
> +rte_rcu_qsbr_get_memsize(uint32_t max_threads);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Initialize a Quiescent State (QS) variable.
> + *
> + * @param v
> + *   QS variable
> + * @param max_threads
> + *   Maximum number of threads reporting QS on this variable.
> + *
> + */
> +void __rte_experimental
> +rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Register a reader thread to report its quiescent state
> + * on a QS variable.
> + *
> + * This is implemented as a lock-free function. It is multi-thread
> + * safe.
> + * Any reader thread that wants to report its quiescent state must
> + * call this API. This can be called during initialization or as part
> + * of the packet processing loop.
> + *
> + * Note that rte_rcu_qsbr_thread_online must be called before the
> + * thread updates its QS using rte_rcu_qsbr_update.
> + *
> + * @param v
> + *   QS variable
> + * @param thread_id
> + *   Reader thread with this thread ID will report its quiescent state on
> + *   the QS variable.
> + */
> +static __rte_always_inline void __rte_experimental
> +rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id)
> +{
> +	unsigned int i, id;
> +
> +	RTE_ASSERT(v == NULL || thread_id >= v->max_threads);
> +
> +	id = thread_id & RTE_QSBR_THRID_MASK;
> +	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;
> +
> +	/* Release the new register thread ID to other threads
> +	 * calling rte_rcu_qsbr_check.
> +	 */
> +	__atomic_fetch_or(&v->reg_thread_id[i], 1UL << id, __ATOMIC_RELEASE);
> +}
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Remove a reader thread, from the list of threads reporting their
> + * quiescent state on a QS variable.
> + *
> + * This is implemented as a lock-free function. It is multi-thread safe.
> + * This API can be called from the reader threads during shutdown.
> + * Ongoing QS queries will stop waiting for the status from this
> + * unregistered reader thread.
> + *
> + * @param v
> + *   QS variable
> + * @param thread_id
> + *   Reader thread with this thread ID will stop reporting its quiescent
> + *   state on the QS variable.
> + */
> +static __rte_always_inline void __rte_experimental
> +rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id)
> +{
> +	unsigned int i, id;
> +
> +	RTE_ASSERT(v == NULL || thread_id >= v->max_threads);
> +
> +	id = thread_id & RTE_QSBR_THRID_MASK;
> +	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;
> +
> +	/* Make sure the removal of the thread from the list of
> +	 * reporting threads is visible before the thread
> +	 * does anything else.
> +	 */
> +	__atomic_fetch_and(&v->reg_thread_id[i],
> +				~(1UL << id), __ATOMIC_RELEASE);
> +}
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Add a registered reader thread, to the list of threads reporting their
> + * quiescent state on a QS variable.
> + *
> + * This is implemented as a lock-free function. It is multi-thread
> + * safe.
> + *
> + * Any registered reader thread that wants to report its quiescent state must
> + * call this API before calling rte_rcu_qsbr_update. This can be called
> + * during initialization or as part of the packet processing loop.
> + *
> + * The reader thread must call rte_rcu_thread_offline API, before
> + * calling any functions that block, to ensure that rte_rcu_qsbr_check
> + * API does not wait indefinitely for the reader thread to update its QS.
> + *
> + * The reader thread must call rte_rcu_thread_online API, after the blocking
> + * function call returns, to ensure that rte_rcu_qsbr_check API
> + * waits for the reader thread to update its QS.
> + *
> + * @param v
> + *   QS variable
> + * @param thread_id
> + *   Reader thread with this thread ID will report its quiescent state on
> + *   the QS variable.
> + */
> +static __rte_always_inline void __rte_experimental
> +rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
> +{
> +	uint64_t t;
> +
> +	RTE_ASSERT(v == NULL || thread_id >= v->max_threads);
> +
> +	/* Copy the current value of token.
> +	 * The fence at the end of the function will ensure that
> +	 * the following will not move down after the load of any shared
> +	 * data structure.
> +	 */
> +	t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);
> +
> +	/* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
> +	 * 'cnt' (64b) is accessed atomically.
> +	 */
> +	__atomic_store_n(&RTE_QSBR_CNT_ARRAY_ELM(v, thread_id)->cnt,
> +		t, __ATOMIC_RELAXED);
> +
> +	/* The subsequent load of the data structure should not
> +	 * move above the store. Hence a store-load barrier
> +	 * is required.
> +	 * If the load of the data structure moves above the store,
> +	 * writer might not see that the reader is online, even though
> +	 * the reader is referencing the shared data structure.
> +	 */
> +	__atomic_thread_fence(__ATOMIC_SEQ_CST);

If it has to generate a proper memory barrier here anyway,
could it use rte_smp_mb() here?
At least for IA it would generate a more lightweight one.
Konstantin

> +}
> +


More information about the dev mailing list