[dpdk-dev] [PATCH v3 1/4] hash: add k32v64 hash library
Ananyev, Konstantin
konstantin.ananyev at intel.com
Thu Apr 23 15:31:20 CEST 2020
Hi Vladimir,
Apologies for late review.
My comments below.
> K32V64 hash is a hash table that supports 32 bit keys and 64 bit values.
> This table is hash function agnostic so user must provide
> precalculated hash signature for add/delete/lookup operations.
>
> Signed-off-by: Vladimir Medvedkin <vladimir.medvedkin at intel.com>
> ---
>
> --- /dev/null
> +++ b/lib/librte_hash/rte_k32v64_hash.c
> @@ -0,0 +1,315 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2020 Intel Corporation
> + */
> +
> +#include <string.h>
> +
> +#include <rte_eal_memconfig.h>
> +#include <rte_errno.h>
> +#include <rte_malloc.h>
> +#include <rte_memory.h>
> +#include <rte_tailq.h>
> +
> +#include <rte_k32v64_hash.h>
> +
> +TAILQ_HEAD(rte_k32v64_hash_list, rte_tailq_entry);
> +
> +static struct rte_tailq_elem rte_k32v64_hash_tailq = {
> + .name = "RTE_K32V64_HASH",
> +};
> +
> +EAL_REGISTER_TAILQ(rte_k32v64_hash_tailq);
> +
> +#define VALID_KEY_MSK ((1 << RTE_K32V64_KEYS_PER_BUCKET) - 1)
> +
> +#ifdef CC_AVX512VL_SUPPORT
> +int
> +k32v64_hash_bulk_lookup_avx512vl(struct rte_k32v64_hash_table *table,
> + uint32_t *keys, uint32_t *hashes, uint64_t *values, unsigned int n);
> +#endif
> +
> +static int
> +k32v64_hash_bulk_lookup(struct rte_k32v64_hash_table *table, uint32_t *keys,
> + uint32_t *hashes, uint64_t *values, unsigned int n)
> +{
> + int ret, cnt = 0;
> + unsigned int i;
> +
> + if (unlikely((table == NULL) || (keys == NULL) || (hashes == NULL) ||
> + (values == NULL)))
> + return -EINVAL;
> +
> + for (i = 0; i < n; i++) {
> + ret = rte_k32v64_hash_lookup(table, keys[i], hashes[i],
> + &values[i]);
> + if (ret == 0)
> + cnt++;
> + }
> + return cnt;
> +}
> +
> +static rte_k32v64_hash_bulk_lookup_t
> +get_lookup_bulk_fn(void)
> +{
> +#ifdef CC_AVX512VL_SUPPORT
> + if (rte_cpu_get_flag_enabled(RTE_CPUFLAG_AVX512F))
> + return k32v64_hash_bulk_lookup_avx512vl;
> +#endif
> + return k32v64_hash_bulk_lookup;
> +}
> +
> +int
> +rte_k32v64_hash_add(struct rte_k32v64_hash_table *table, uint32_t key,
> + uint32_t hash, uint64_t value)
> +{
> + uint32_t bucket;
> + int i, idx, ret;
> + uint8_t msk;
> + struct rte_k32v64_ext_ent *tmp, *ent, *prev = NULL;
> +
> + if (table == NULL)
> + return -EINVAL;
> +
I think for add you also need to do update bucket.cnt
at the start/end of updates (as you do for del).
> + bucket = hash & table->bucket_msk;
> + /* Search key in table. Update value if exists */
> + for (i = 0; i < RTE_K32V64_KEYS_PER_BUCKET; i++) {
> + if ((key == table->t[bucket].key[i]) &&
> + (table->t[bucket].key_mask & (1 << i))) {
> + table->t[bucket].val[i] = value;
> + return 0;
> + }
> + }
> +
> + if (!SLIST_EMPTY(&table->t[bucket].head)) {
> + SLIST_FOREACH(ent, &table->t[bucket].head, next) {
> + if (ent->key == key) {
> + ent->val = value;
> + return 0;
> + }
> + }
> + }
> +
> + msk = ~table->t[bucket].key_mask & VALID_KEY_MSK;
> + if (msk) {
> + idx = __builtin_ctz(msk);
> + table->t[bucket].key[idx] = key;
> + table->t[bucket].val[idx] = value;
> + rte_smp_wmb();
> + table->t[bucket].key_mask |= 1 << idx;
> + table->nb_ent++;
> + return 0;
> + }
> +
> + ret = rte_mempool_get(table->ext_ent_pool, (void **)&ent);
> + if (ret < 0)
> + return ret;
> +
> + SLIST_NEXT(ent, next) = NULL;
> + ent->key = key;
> + ent->val = value;
> + rte_smp_wmb();
> + SLIST_FOREACH(tmp, &table->t[bucket].head, next)
> + prev = tmp;
> +
> + if (prev == NULL)
> + SLIST_INSERT_HEAD(&table->t[bucket].head, ent, next);
> + else
> + SLIST_INSERT_AFTER(prev, ent, next);
> +
> + table->nb_ent++;
> + table->nb_ext_ent++;
> + return 0;
> +}
> +
> +int
> +rte_k32v64_hash_delete(struct rte_k32v64_hash_table *table, uint32_t key,
> + uint32_t hash)
> +{
> + uint32_t bucket;
> + int i;
> + struct rte_k32v64_ext_ent *ent;
> +
> + if (table == NULL)
> + return -EINVAL;
> +
> + bucket = hash & table->bucket_msk;
> +
> + for (i = 0; i < RTE_K32V64_KEYS_PER_BUCKET; i++) {
> + if ((key == table->t[bucket].key[i]) &&
> + (table->t[bucket].key_mask & (1 << i))) {
> + ent = SLIST_FIRST(&table->t[bucket].head);
> + if (ent) {
> + rte_atomic32_inc(&table->t[bucket].cnt);
I know that right now rte_atomic32 uses _sync gcc builtins underneath,
so it should be safe.
But I think the proper way would be:
table->t[bucket].cnt++;
rte_smp_wmb();
or as alternative probably use C11 atomic ACQUIRE/RELEASE
> + table->t[bucket].key[i] = ent->key;
> + table->t[bucket].val[i] = ent->val;
> + SLIST_REMOVE_HEAD(&table->t[bucket].head, next);
> + rte_atomic32_inc(&table->t[bucket].cnt);
> + table->nb_ext_ent--;
> + } else
> + table->t[bucket].key_mask &= ~(1 << i);
I think you protect that update with bucket.cnt.
>From my perspective -a s a rule of thumb any update to the bucket/list
Should be within that transaction-start/transaction-end.
> + if (ent)
> + rte_mempool_put(table->ext_ent_pool, ent);
> + table->nb_ent--;
> + return 0;
> + }
> + }
> +
> + SLIST_FOREACH(ent, &table->t[bucket].head, next)
> + if (ent->key == key)
> + break;
> +
> + if (ent == NULL)
> + return -ENOENT;
> +
> + rte_atomic32_inc(&table->t[bucket].cnt);
> + SLIST_REMOVE(&table->t[bucket].head, ent, rte_k32v64_ext_ent, next);
> + rte_atomic32_inc(&table->t[bucket].cnt);
> + rte_mempool_put(table->ext_ent_pool, ent);
> +
> + table->nb_ext_ent--;
> + table->nb_ent--;
> +
> + return 0;
> +}
> +
> +struct rte_k32v64_hash_table *
> +rte_k32v64_hash_find_existing(const char *name)
> +{
> + struct rte_k32v64_hash_table *h = NULL;
> + struct rte_tailq_entry *te;
> + struct rte_k32v64_hash_list *k32v64_hash_list;
> +
> + k32v64_hash_list = RTE_TAILQ_CAST(rte_k32v64_hash_tailq.head,
> + rte_k32v64_hash_list);
> +
> + rte_mcfg_tailq_read_lock();
> + TAILQ_FOREACH(te, k32v64_hash_list, next) {
> + h = (struct rte_k32v64_hash_table *) te->data;
> + if (strncmp(name, h->name, RTE_K32V64_HASH_NAMESIZE) == 0)
> + break;
> + }
> + rte_mcfg_tailq_read_unlock();
> + if (te == NULL) {
> + rte_errno = ENOENT;
> + return NULL;
> + }
> + return h;
> +}
> +
> +struct rte_k32v64_hash_table *
> +rte_k32v64_hash_create(const struct rte_k32v64_hash_params *params)
> +{
> + char hash_name[RTE_K32V64_HASH_NAMESIZE];
> + struct rte_k32v64_hash_table *ht = NULL;
> + struct rte_tailq_entry *te;
> + struct rte_k32v64_hash_list *k32v64_hash_list;
> + uint32_t mem_size, nb_buckets, max_ent;
> + int ret;
> + struct rte_mempool *mp;
> +
> + if ((params == NULL) || (params->name == NULL) ||
> + (params->entries == 0)) {
> + rte_errno = EINVAL;
> + return NULL;
> + }
> +
> + k32v64_hash_list = RTE_TAILQ_CAST(rte_k32v64_hash_tailq.head,
> + rte_k32v64_hash_list);
> +
> + ret = snprintf(hash_name, sizeof(hash_name), "K32V64_%s", params->name);
> + if (ret < 0 || ret >= RTE_K32V64_HASH_NAMESIZE) {
> + rte_errno = ENAMETOOLONG;
> + return NULL;
> + }
> +
> + max_ent = rte_align32pow2(params->entries);
> + nb_buckets = max_ent / RTE_K32V64_KEYS_PER_BUCKET;
> + mem_size = sizeof(struct rte_k32v64_hash_table) +
> + sizeof(struct rte_k32v64_hash_bucket) * nb_buckets;
> +
> + mp = rte_mempool_create(hash_name, max_ent,
> + sizeof(struct rte_k32v64_ext_ent), 0, 0, NULL, NULL, NULL, NULL,
> + params->socket_id, 0);
> +
> + if (mp == NULL)
> + return NULL;
> +
> + rte_mcfg_tailq_write_lock();
> + TAILQ_FOREACH(te, k32v64_hash_list, next) {
> + ht = (struct rte_k32v64_hash_table *) te->data;
> + if (strncmp(params->name, ht->name,
> + RTE_K32V64_HASH_NAMESIZE) == 0)
> + break;
> + }
> + ht = NULL;
> + if (te != NULL) {
> + rte_errno = EEXIST;
> + rte_mempool_free(mp);
> + goto exit;
> + }
> +
> + te = rte_zmalloc("K32V64_HASH_TAILQ_ENTRY", sizeof(*te), 0);
> + if (te == NULL) {
> + RTE_LOG(ERR, HASH, "Failed to allocate tailq entry\n");
> + rte_mempool_free(mp);
> + goto exit;
> + }
> +
> + ht = rte_zmalloc_socket(hash_name, mem_size,
> + RTE_CACHE_LINE_SIZE, params->socket_id);
> + if (ht == NULL) {
> + RTE_LOG(ERR, HASH, "Failed to allocate fbk hash table\n");
> + rte_free(te);
> + rte_mempool_free(mp);
> + goto exit;
> + }
> +
> + memcpy(ht->name, hash_name, sizeof(ht->name));
> + ht->max_ent = max_ent;
> + ht->bucket_msk = nb_buckets - 1;
> + ht->ext_ent_pool = mp;
> + ht->lookup = get_lookup_bulk_fn();
> +
> + te->data = (void *)ht;
> + TAILQ_INSERT_TAIL(k32v64_hash_list, te, next);
> +
> +exit:
> + rte_mcfg_tailq_write_unlock();
> +
> + return ht;
> +}
> +
> +void
> +rte_k32v64_hash_free(struct rte_k32v64_hash_table *ht)
> +{
> + struct rte_tailq_entry *te;
> + struct rte_k32v64_hash_list *k32v64_hash_list;
> +
> + if (ht == NULL)
> + return;
> +
> + k32v64_hash_list = RTE_TAILQ_CAST(rte_k32v64_hash_tailq.head,
> + rte_k32v64_hash_list);
> +
> + rte_mcfg_tailq_write_lock();
> +
> + /* find out tailq entry */
> + TAILQ_FOREACH(te, k32v64_hash_list, next) {
> + if (te->data == (void *) ht)
> + break;
> + }
> +
> +
> + if (te == NULL) {
> + rte_mcfg_tailq_write_unlock();
> + return;
> + }
> +
> + TAILQ_REMOVE(k32v64_hash_list, te, next);
> +
> + rte_mcfg_tailq_write_unlock();
> +
> + rte_mempool_free(ht->ext_ent_pool);
> + rte_free(ht);
> + rte_free(te);
> +}
> diff --git a/lib/librte_hash/rte_k32v64_hash.h b/lib/librte_hash/rte_k32v64_hash.h
> new file mode 100644
> index 0000000..b2c52e9
> --- /dev/null
> +++ b/lib/librte_hash/rte_k32v64_hash.h
> @@ -0,0 +1,211 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2020 Intel Corporation
> + */
> +
> +#ifndef _RTE_K32V64_HASH_H_
> +#define _RTE_K32V64_HASH_H_
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <rte_compat.h>
> +#include <rte_atomic.h>
> +#include <rte_mempool.h>
> +
> +#define RTE_K32V64_HASH_NAMESIZE 32
> +#define RTE_K32V64_KEYS_PER_BUCKET 4
> +#define RTE_K32V64_WRITE_IN_PROGRESS 1
> +
> +struct rte_k32v64_hash_params {
> + const char *name;
> + uint32_t entries;
> + int socket_id;
> +};
> +
> +struct rte_k32v64_ext_ent {
> + SLIST_ENTRY(rte_k32v64_ext_ent) next;
> + uint32_t key;
> + uint64_t val;
> +};
> +
> +struct rte_k32v64_hash_bucket {
> + uint32_t key[RTE_K32V64_KEYS_PER_BUCKET];
> + uint64_t val[RTE_K32V64_KEYS_PER_BUCKET];
> + uint8_t key_mask;
> + rte_atomic32_t cnt;
> + SLIST_HEAD(rte_k32v64_list_head, rte_k32v64_ext_ent) head;
> +} __rte_cache_aligned;
> +
> +struct rte_k32v64_hash_table;
> +
> +typedef int (*rte_k32v64_hash_bulk_lookup_t)
> +(struct rte_k32v64_hash_table *table, uint32_t *keys, uint32_t *hashes,
> + uint64_t *values, unsigned int n);
> +
> +struct rte_k32v64_hash_table {
> + char name[RTE_K32V64_HASH_NAMESIZE]; /**< Name of the hash. */
> + uint32_t nb_ent; /**< Number of entities in the table*/
> + uint32_t nb_ext_ent; /**< Number of extended entities */
> + uint32_t max_ent; /**< Maximum number of entities */
> + uint32_t bucket_msk;
> + struct rte_mempool *ext_ent_pool;
> + rte_k32v64_hash_bulk_lookup_t lookup;
> + __extension__ struct rte_k32v64_hash_bucket t[0];
> +};
> +
> +typedef int (*rte_k32v64_cmp_fn_t)
> +(struct rte_k32v64_hash_bucket *bucket, uint32_t key, uint64_t *val);
> +
> +static inline int
> +__k32v64_cmp_keys(struct rte_k32v64_hash_bucket *bucket, uint32_t key,
> + uint64_t *val)
> +{
> + int i;
> +
> + for (i = 0; i < RTE_K32V64_KEYS_PER_BUCKET; i++) {
> + if ((key == bucket->key[i]) &&
> + (bucket->key_mask & (1 << i))) {
> + *val = bucket->val[i];
> + return 1;
> + }
> + }
> +
> + return 0;
> +}
> +
> +static inline int
> +__k32v64_hash_lookup(struct rte_k32v64_hash_table *table, uint32_t key,
> + uint32_t hash, uint64_t *value, rte_k32v64_cmp_fn_t cmp_f)
> +{
> + uint64_t val = 0;
> + struct rte_k32v64_ext_ent *ent;
> + int32_t cnt;
> + int found = 0;
> + uint32_t bucket = hash & table->bucket_msk;
> +
> + do {
> + do
> + cnt = rte_atomic32_read(&table->t[bucket].cnt);
> + while (unlikely(cnt & RTE_K32V64_WRITE_IN_PROGRESS));
> +
> + found = cmp_f(&table->t[bucket], key, &val);
> + if (unlikely((found == 0) &&
> + (!SLIST_EMPTY(&table->t[bucket].head)))) {
> + SLIST_FOREACH(ent, &table->t[bucket].head, next) {
> + if (ent->key == key) {
> + val = ent->val;
> + found = 1;
> + break;
> + }
> + }
> + }
> +
> + } while (unlikely(cnt != rte_atomic32_read(&table->t[bucket].cnt)));
AFAIK atomic32_read is just a normal read op, so it can be reordered with other ops.
So this construction doesn't protect you from races.
What you probably need here:
do {
cnt1 = table->t[bucket].cnt;
rte_smp_rmb();
....
rte_smp_rmb();
cnt2 = table->t[bucket].cnt;
while (cnt1 != cnt2 || (cnt1 & RTE_K32V64_WRITE_IN_PROGRESS) != 0)
> +
> + if (found == 1) {
> + *value = val;
> + return 0;
> + } else
> + return -ENOENT;
> +}
> +
More information about the dev
mailing list