[dpdk-dev] [RFC 2/3] tqs: add thread quiescent state library
Ananyev, Konstantin
konstantin.ananyev at intel.com
Sat Nov 24 13:18:00 CET 2018
Hi Honnappa,
> +
> +/* Allocate a new TQS variable with the name *name* in memory. */
> +struct rte_tqs * __rte_experimental
> +rte_tqs_alloc(const char *name, int socket_id, uint64_t lcore_mask)
> +{
> + char tqs_name[RTE_TQS_NAMESIZE];
> + struct rte_tailq_entry *te, *tmp_te;
> + struct rte_tqs_list *tqs_list;
> + struct rte_tqs *v, *tmp_v;
> + int ret;
> +
> + if (name == NULL) {
> + RTE_LOG(ERR, TQS, "Invalid input parameters\n");
> + rte_errno = -EINVAL;
> + return NULL;
> + }
> +
> + te = rte_zmalloc("TQS_TAILQ_ENTRY", sizeof(*te), 0);
> + if (te == NULL) {
> + RTE_LOG(ERR, TQS, "Cannot reserve memory for tailq\n");
> + rte_errno = -ENOMEM;
> + return NULL;
> + }
> +
> + snprintf(tqs_name, sizeof(tqs_name), "%s", name);
> + v = rte_zmalloc_socket(tqs_name, sizeof(struct rte_tqs),
> + RTE_CACHE_LINE_SIZE, socket_id);
> + if (v == NULL) {
> + RTE_LOG(ERR, TQS, "Cannot reserve memory for TQS variable\n");
> + rte_errno = -ENOMEM;
> + goto alloc_error;
> + }
> +
> + ret = snprintf(v->name, sizeof(v->name), "%s", name);
> + if (ret < 0 || ret >= (int)sizeof(v->name)) {
> + rte_errno = -ENAMETOOLONG;
> + goto alloc_error;
> + }
> +
> + te->data = (void *) v;
> + v->lcore_mask = lcore_mask;
> +
> + rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
> +
> + tqs_list = RTE_TAILQ_CAST(rte_tqs_tailq.head, rte_tqs_list);
> +
> + /* Search if a TQS variable with the same name exists already */
> + TAILQ_FOREACH(tmp_te, tqs_list, next) {
> + tmp_v = (struct rte_tqs *) tmp_te->data;
> + if (strncmp(name, tmp_v->name, RTE_TQS_NAMESIZE) == 0)
> + break;
> + }
> +
> + if (tmp_te != NULL) {
> + rte_errno = -EEXIST;
> + goto tqs_exist;
> + }
> +
> + TAILQ_INSERT_TAIL(tqs_list, te, next);
> +
> + rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
> +
> + return v;
> +
> +tqs_exist:
> + rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
> +
> +alloc_error:
> + rte_free(te);
> + rte_free(v);
> + return NULL;
> +}
That seems like quite a heavy-weight function just to allocate a sync variable.
As the size of struct rte_tqs is constant and known to the user, it might be better
to just provide rte_tqs_init(struct rte_tqs *tqs, ...) and let the user allocate/free
the memory for it himself.
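Something along these lines — just a sketch; the struct layout below is a minimal stand-in for the fields this patch actually uses, and the function name rte_tqs_init() is my suggestion, not an existing API:

```c
#include <errno.h>
#include <stdint.h>
#include <string.h>

/* Minimal stand-in for struct rte_tqs, covering only the fields
 * referenced in this patch (token, lcore_mask, per-worker counters). */
struct rte_tqs {
	uint32_t token;
	uint64_t lcore_mask;
	struct {
		uint32_t cnt;
	} w[64];
};

/* Caller-allocated variant: no tailq lookup, no rte_zmalloc, no name
 * bookkeeping — alloc/free shrink to a plain init over user memory. */
static int
rte_tqs_init(struct rte_tqs *v, uint64_t lcore_mask)
{
	if (v == NULL)
		return -EINVAL;

	memset(v, 0, sizeof(*v));
	v->lcore_mask = lcore_mask;
	return 0;
}
```

The user can then place the variable in whatever memory suits them (stack, static, huge pages) and simply stop using it instead of calling a free function.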
> +
> +/* Add a reader thread, running on an lcore, to the list of threads
> + * reporting their quiescent state on a TQS variable.
> + */
> +int __rte_experimental
> +rte_tqs_register_lcore(struct rte_tqs *v, unsigned int lcore_id)
> +{
> + TQS_RETURN_IF_TRUE((v == NULL || lcore_id >= RTE_TQS_MAX_LCORE),
> + -EINVAL);
It is not very good practice for a function to return different values and behave
differently in debug vs. non-debug mode.
I'd say that for the slow path (functions in .c files) it is always good to check input parameters.
For the fast path (functions in .h files) we sometimes skip such checking,
but debug mode could probably use RTE_ASSERT() or similar instead.
lcore_id >= RTE_TQS_MAX_LCORE
Is this limitation really necessary?
First, it means that only lcores can use the API (at least the data-path part); second,
even today many machines have more than 64 cores.
I think you can easily avoid this limitation: instead of requiring lcore_id as an
input parameter, just make the function return the index of the next available entry in w[].
tqs_update() can then take that index as an input parameter.
> +
> + /* Worker thread has to count the quiescent states
> + * only from the current value of token.
> + */
> + v->w[lcore_id].cnt = v->token;
I wonder what would happen if a new reader calls register()
after the writer calls start()?
It looks like a race condition to me.
Or is such a pattern simply not supported?
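To spell out the interleaving I have in mind, replayed single-threaded (the struct is a minimal stand-in for the patch's fields, and the inlined statements mimic start()/register() as written in the patch):

```c
#include <stdint.h>

/* Minimal stand-in for the fields involved. */
struct rte_tqs {
	uint32_t token;
	uint64_t lcore_mask;
	struct {
		uint32_t cnt;
	} w[64];
};

/* Writer calls start() first, then a new reader registers. Because
 * register() seeds cnt from the *current* (already bumped) token,
 * check() immediately counts this reader as quiesced for token t,
 * even though it never passed through update() after t was issued. */
static int
tqs_register_after_start(void)
{
	struct rte_tqs v = {0};
	uint32_t t;

	/* writer: rte_tqs_start(&v, 1, &t) */
	t = __atomic_add_fetch(&v.token, 1, __ATOMIC_RELEASE);

	/* new reader: rte_tqs_register_lcore(&v, 0), as in the patch */
	v.w[0].cnt = v.token;
	__atomic_fetch_or(&v.lcore_mask, 1ULL, __ATOMIC_RELEASE);

	/* check() would see w[0].cnt == t and declare the grace period
	 * over for a reader that never reported a quiescent state */
	return v.w[0].cnt == t;
}
```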
> +
> + /* Release the store to initial TQS count so that workers
> + * can use it immediately after this function returns.
> + */
> + __atomic_fetch_or(&v->lcore_mask, (1UL << lcore_id), __ATOMIC_RELEASE);
> +
> + return 0;
> +}
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Trigger the worker threads to report the quiescent state
> + * status.
> + *
> + * This is implemented as a lock-free function. It is multi-thread
> + * safe and can be called from the worker threads as well.
> + *
> + * @param v
> + * TQS variable
> + * @param n
> + * Expected number of times the quiescent state is entered
> + * @param t
> + * - If successful, this is the token for this call of the API.
> + * This should be passed to rte_tqs_check API.
> + * @return
> + * - -EINVAL if the parameters are invalid (debug mode compilation only).
> + * - 0 Otherwise and always (non-debug mode compilation).
> + */
> +static __rte_always_inline int __rte_experimental
> +rte_tqs_start(struct rte_tqs *v, unsigned int n, uint32_t *t)
> +{
> + TQS_RETURN_IF_TRUE((v == NULL || t == NULL), -EINVAL);
> +
> + /* This store release will ensure that changes to any data
> + * structure are visible to the workers before the token
> + * update is visible.
> + */
> + *t = __atomic_add_fetch(&v->token, n, __ATOMIC_RELEASE);
> +
> + return 0;
> +}
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Update quiescent state for the worker thread on a lcore.
> + *
> + * This is implemented as a lock-free function. It is multi-thread safe.
> + * All the worker threads registered to report their quiescent state
> + * on the TQS variable must call this API.
> + *
> + * @param v
> + * TQS variable
> + */
> +static __rte_always_inline void __rte_experimental
> +rte_tqs_update(struct rte_tqs *v, unsigned int lcore_id)
> +{
> + uint32_t t;
> +
> + TQS_ERR_LOG_IF_TRUE(v == NULL || lcore_id >= RTE_TQS_MAX_LCORE);
> +
> + /* Load the token before the worker thread loads any other
> + * (lock-free) data structure. This ensures that updates
> + * to the data structures are visible if the update
> + * to token is visible.
> + */
> + t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);
Hmm, I am not very familiar with the C11 memory model, but this looks like a race
condition to me:
as I understand it, update() is supposed to be called at the end of the reader's
critical section, correct?
But ACQUIRE is only a hoist barrier, which means the compiler and CPU
are free to move earlier reads (and writes) after it.
It probably needs to be a full ACQ_REL here.
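One way to get that ordering without a full barrier on the load itself — an assumption on my part, not the patch — is to make the counter update a store-release, paired with an acquire load of cnt on the writer side in check(). The struct below is again a minimal stand-in:

```c
#include <stdint.h>

struct rte_tqs {
	uint32_t token;
	struct {
		uint32_t cnt;
	} w[64];
};

/* Possible fix: report the quiescent state with a store-release, so
 * the reader's earlier critical-section reads and writes cannot sink
 * below the point where the report becomes visible to the writer. */
static void
rte_tqs_update(struct rte_tqs *v, unsigned int i)
{
	uint32_t t, c;

	t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);
	c = __atomic_load_n(&v->w[i].cnt, __ATOMIC_RELAXED);
	if (c != t)
		/* release: everything before this in program order is
		 * visible to a thread that acquire-loads w[i].cnt */
		__atomic_store_n(&v->w[i].cnt, c + 1, __ATOMIC_RELEASE);
}
```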
> + if (v->w[lcore_id].cnt != t)
> + v->w[lcore_id].cnt++;
> +}
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Checks if all the worker threads have entered the quiescent state
> + * 'n' number of times. 'n' is provided in rte_tqs_start API.
> + *
> + * This is implemented as a lock-free function. It is multi-thread
> + * safe and can be called from the worker threads as well.
> + *
> + * @param v
> + * TQS variable
> + * @param t
> + * Token returned by rte_tqs_start API
> + * @param wait
> + * If true, block till all the worker threads have completed entering
> + * the quiescent state 'n' number of times
> + * @return
> + * - 0 if all worker threads have NOT passed through specified number
> + * of quiescent states.
> + * - 1 if all worker threads have passed through specified number
> + * of quiescent states.
> + * - -EINVAL if the parameters are invalid (debug mode compilation only).
> + */
> +static __rte_always_inline int __rte_experimental
> +rte_tqs_check(struct rte_tqs *v, uint32_t t, bool wait)
> +{
> + uint64_t l;
> + uint64_t lcore_mask;
> +
> + TQS_RETURN_IF_TRUE((v == NULL), -EINVAL);
> +
> + do {
> + /* Load the current lcore_mask before loading the
> + * worker thread quiescent state counters.
> + */
> + lcore_mask = __atomic_load_n(&v->lcore_mask, __ATOMIC_ACQUIRE);
What would happen if a reader calls unregister() simultaneously
with check() and updates lcore_mask straight after that load?
As I understand it, check() might hang in such a case.
> +
> + while (lcore_mask) {
> + l = __builtin_ctz(lcore_mask);
> + if (v->w[l].cnt != t)
> + break;
As I understand it, that makes a control-path function's progress dependent
on the simultaneous invocation of data-path functions.
In some cases that might cause the control path to hang.
Say, if the data-path function is never called, or if the user invokes the
control-path and data-path functions from the same thread.
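The same-thread case, mocked single-threaded (tqs_check_once() is my stand-in for one pass of the patch's inner loop; all names here are hypothetical):

```c
#include <stdint.h>

struct rte_tqs {
	uint32_t token;
	uint64_t lcore_mask;
	struct {
		uint32_t cnt;
	} w[64];
};

/* One non-blocking pass over the mask, mirroring the inner loop of
 * the patch's check(). */
static int
tqs_check_once(struct rte_tqs *v, uint32_t t)
{
	uint64_t mask = __atomic_load_n(&v->lcore_mask, __ATOMIC_ACQUIRE);

	while (mask) {
		unsigned int l = __builtin_ctzll(mask);
		if (v->w[l].cnt != t)
			return 0; /* with wait=true, check() would spin
				   * here forever on our own slot */
		mask &= ~(1ULL << l);
	}
	return 1;
}

/* A thread that is both a registered reader (slot 0) and the writer
 * must report its own quiescent state before waiting on the token. */
static int
combined_reader_writer(void)
{
	struct rte_tqs v = { .lcore_mask = 1ULL };
	uint32_t t;

	/* start(): bump the token */
	t = __atomic_add_fetch(&v.token, 1, __ATOMIC_RELEASE);

	if (tqs_check_once(&v, t))	/* own slot not reported yet:   */
		return -1;		/* wait=true would never return */

	v.w[0].cnt++;			/* update(): report own state   */
	return tqs_check_once(&v, t);	/* now the check completes      */
}
```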
> +
> + lcore_mask &= ~(1UL << l);
> + }
> +
> + if (lcore_mask == 0)
> + return 1;
> +
> + rte_pause();
> + } while (wait);
> +
> + return 0;
> +}
> +