[RFC] rwlock: prevent readers from starving writers
Honnappa Nagarahalli
Honnappa.Nagarahalli at arm.com
Fri Jul 8 21:22:18 CEST 2022
<snip>
>
> The original reader/writer lock in DPDK can cause a stream of readers to
> starve writers.
>
> The new version uses an additional bit to indicate that a writer is waiting and
> which keeps readers from starving the writer.
This addition makes sense.
I am wondering if we should create a new lock. Is it possible that some applications are dependent on the current behavior?
>
> Signed-off-by: Stephen Hemminger <stephen at networkplumber.org>
> ---
> Would like this to be in 22.11, but needs some more review
>
> lib/eal/include/generic/rte_rwlock.h | 93 ++++++++++++++++++----------
> 1 file changed, 61 insertions(+), 32 deletions(-)
>
> diff --git a/lib/eal/include/generic/rte_rwlock.h
> b/lib/eal/include/generic/rte_rwlock.h
> index da9bc3e9c0e2..725cd19ffb27 100644
> --- a/lib/eal/include/generic/rte_rwlock.h
> +++ b/lib/eal/include/generic/rte_rwlock.h
> @@ -13,7 +13,7 @@
> * This file defines an API for read-write locks. The lock is used to
> * protect data that allows multiple readers in parallel, but only
> * one writer. All readers are blocked until the writer is finished
> - * writing.
> + * writing. This version will not starve writers.
> *
> */
>
> @@ -28,10 +28,17 @@ extern "C" {
> /**
> * The rte_rwlock_t type.
> *
> - * cnt is -1 when write lock is held, and > 0 when read locks are held.
> + * Readers increment the counter by RW_READ (4)
> + * Writers set the RWLOCK_WRITE bit when lock is held
> + * and set the RWLOCK_WAIT bit while waiting.
> */
> +
> +#define RTE_RWLOCK_WAIT 0x1 /* Writer is waiting */
> +#define RTE_RWLOCK_WRITE 0x2 /* Writer has the lock */
> +#define RTE_RWLOCK_READ 0x4 /* Reader increment */
> +
> typedef struct {
> - volatile int32_t cnt; /**< -1 when W lock held, > 0 when R locks held.
> */
> + volatile int32_t cnt;
> } rte_rwlock_t;
>
> /**
> @@ -61,17 +68,24 @@ static inline void
> rte_rwlock_read_lock(rte_rwlock_t *rwl) {
> int32_t x;
> - int success = 0;
>
> - while (success == 0) {
> + while (1) {
> x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
> /* write lock is held */
> - if (x < 0) {
> + if (x & (RTE_RWLOCK_WAIT | RTE_RWLOCK_WRITE)) {
> rte_pause();
> continue;
> }
> - success = __atomic_compare_exchange_n(&rwl->cnt, &x, x
> + 1, 1,
> - __ATOMIC_ACQUIRE,
> __ATOMIC_RELAXED);
> +
> + /* Try to get read lock */
> + x = __atomic_add_fetch(&rwl->cnt, RTE_RWLOCK_READ,
> + __ATOMIC_ACQUIRE);
> + if (!(x & (RTE_RWLOCK_WAIT | RTE_RWLOCK_WRITE)))
> + return;
> +
> + /* Undo */
> + __atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_READ,
> + __ATOMIC_RELEASE);
> }
> }
>
> @@ -93,17 +107,23 @@ static inline int
> rte_rwlock_read_trylock(rte_rwlock_t *rwl) {
> int32_t x;
> - int success = 0;
>
> - while (success == 0) {
> - x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
> - /* write lock is held */
> - if (x < 0)
> - return -EBUSY;
> - success = __atomic_compare_exchange_n(&rwl->cnt, &x, x
> + 1, 1,
> - __ATOMIC_ACQUIRE,
> __ATOMIC_RELAXED);
> - }
> + x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
> +
> + /* write lock is held */
> + if (x & (RTE_RWLOCK_WAIT | RTE_RWLOCK_WRITE))
> + return -EBUSY;
> +
> + /* Try to get read lock */
> + x = __atomic_add_fetch(&rwl->cnt, RTE_RWLOCK_READ,
> + __ATOMIC_ACQUIRE);
> +
> + if (x & (RTE_RWLOCK_WAIT | RTE_RWLOCK_WRITE)) {
> + __atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_READ,
> + __ATOMIC_RELEASE);
>
> + return -EBUSY;
> + }
> return 0;
> }
>
> @@ -116,7 +136,7 @@ rte_rwlock_read_trylock(rte_rwlock_t *rwl) static
> inline void rte_rwlock_read_unlock(rte_rwlock_t *rwl) {
> - __atomic_fetch_sub(&rwl->cnt, 1, __ATOMIC_RELEASE);
> + __atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_READ,
> __ATOMIC_RELEASE);
> }
>
> /**
> @@ -139,11 +159,12 @@ rte_rwlock_write_trylock(rte_rwlock_t *rwl)
> int32_t x;
>
> x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
> - if (x != 0 || __atomic_compare_exchange_n(&rwl->cnt, &x, -1, 1,
> - __ATOMIC_ACQUIRE, __ATOMIC_RELAXED) == 0)
> + if (x < RTE_RWLOCK_WRITE &&
> + __atomic_compare_exchange_n(&rwl->cnt, &x, x +
> RTE_RWLOCK_WRITE,
> + 1, __ATOMIC_ACQUIRE,
> __ATOMIC_RELAXED))
> + return 0;
> + else
> return -EBUSY;
> -
> - return 0;
> }
>
> /**
> @@ -156,18 +177,26 @@ static inline void
> rte_rwlock_write_lock(rte_rwlock_t *rwl) {
> int32_t x;
> - int success = 0;
>
> - while (success == 0) {
> + while (1) {
> x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
> - /* a lock is held */
> - if (x != 0) {
> - rte_pause();
> - continue;
> +
> + /* No readers or writers */
> + if (x < RTE_RWLOCK_WRITE) {
> + /* Turn off RTE_RWLOCK_WAIT, turn on
> RTE_RWLOCK_WRITE */
> + if (__atomic_compare_exchange_n(&rwl->cnt, &x,
> RTE_RWLOCK_WRITE, 1,
> +
> __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
> + return;
> }
> - success = __atomic_compare_exchange_n(&rwl->cnt, &x, -
> 1, 1,
> - __ATOMIC_ACQUIRE,
> __ATOMIC_RELAXED);
> - }
> +
> + /* Turn on writer wait bit */
> + if (!(x & RTE_RWLOCK_WAIT))
> + __atomic_fetch_or(&rwl->cnt, RTE_RWLOCK_WAIT,
> __ATOMIC_RELAXED);
> +
> + /* Wait until can try to take the lock */
> + while (__atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED) >
> RTE_RWLOCK_WAIT)
> + rte_pause();
> + }
> }
>
> /**
> @@ -179,7 +208,7 @@ rte_rwlock_write_lock(rte_rwlock_t *rwl) static
> inline void rte_rwlock_write_unlock(rte_rwlock_t *rwl) {
> - __atomic_store_n(&rwl->cnt, 0, __ATOMIC_RELEASE);
> + __atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_WRITE,
> __ATOMIC_RELEASE);
> }
>
> /**
> --
> 2.35.1
More information about the dev
mailing list