[PATCH v2] rwlock: prevent readers from starving writers

Morten Brørup mb at smartsharesystems.com
Tue Jul 19 23:52:54 CEST 2022


> From: Stephen Hemminger [mailto:stephen at networkplumber.org]
> Sent: Tuesday, 19 July 2022 22.28
> 
> Modify reader/writer lock to avoid starvation of writer.  The previous
> implementation would cause a writer to get starved if readers kept
> acquiring the lock.  The new version uses an additional bit to indicate
> that a writer is waiting and which keeps readers from starving the
> writer.
> 
> Signed-off-by: Stephen Hemminger <stephen at networkplumber.org>
> Acked-by: Morten Brørup <mb at smartsharesystems.com>
> ---
> v2 - incorporate feedback, change from RFC to PATCH
> 
>  lib/eal/include/generic/rte_rwlock.h | 119 +++++++++++++++++++--------
>  1 file changed, 83 insertions(+), 36 deletions(-)
> 
> diff --git a/lib/eal/include/generic/rte_rwlock.h
> b/lib/eal/include/generic/rte_rwlock.h
> index da9bc3e9c0e2..59ec54110444 100644
> --- a/lib/eal/include/generic/rte_rwlock.h
> +++ b/lib/eal/include/generic/rte_rwlock.h
> @@ -15,23 +15,46 @@
>   * one writer. All readers are blocked until the writer is finished
>   * writing.
>   *
> + * This version does not give preference to readers or writers
> + * and does not starve either readers or writers.
> + *
> + * See also:
> + *  https://locklessinc.com/articles/locks/
>   */
> 
>  #ifdef __cplusplus
>  extern "C" {
>  #endif
> 
> +#include <rte_branch_prediction.h>
>  #include <rte_common.h>
> -#include <rte_atomic.h>
>  #include <rte_pause.h>
> 
>  /**
>   * The rte_rwlock_t type.
>   *
> - * cnt is -1 when write lock is held, and > 0 when read locks are
> held.
> + * Readers increment the counter by RTE_RWLOCK_READ (4)
> + * Writers set the RTE_RWLOCK_WRITE bit when lock is held
> + *     and set the RTE_RWLOCK_WAIT bit while waiting.
> + *
> + * 31                 2 1 0
> + * +-------------------+-+-+
> + * |  readers          | | |
> + * +-------------------+-+-+
> + *                      ^ ^
> + *                      | |
> + * WRITE: lock held ----/ |
> + * WAIT: writer pending --/
>   */
> +
> +#define RTE_RWLOCK_WAIT	 0x1	/* Writer is waiting */
> +#define RTE_RWLOCK_WRITE 0x2	/* Writer has the lock */
> +#define RTE_RWLOCK_MASK  (RTE_RWLOCK_WAIT | RTE_RWLOCK_WRITE)
> +				/* Writer is waiting or has lock */
> +#define RTE_RWLOCK_READ	 0x4	/* Reader increment */
> +
>  typedef struct {
> -	volatile int32_t cnt; /**< -1 when W lock held, > 0 when R locks
> held. */
> +	int32_t cnt;
>  } rte_rwlock_t;
> 
>  /**
> @@ -61,17 +84,24 @@ static inline void
>  rte_rwlock_read_lock(rte_rwlock_t *rwl)
>  {
>  	int32_t x;
> -	int success = 0;
> 
> -	while (success == 0) {
> -		x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
> -		/* write lock is held */
> -		if (x < 0) {
> +	while (1) {
> +		/* Wait while writer is present or pending */
> +		while (__atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED)
> +		       & RTE_RWLOCK_MASK)
>  			rte_pause();
> -			continue;
> -		}
> -		success = __atomic_compare_exchange_n(&rwl->cnt, &x, x + 1,
> 1,
> -					__ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
> +
> +		/* Try to get read lock */
> +		x = __atomic_add_fetch(&rwl->cnt, RTE_RWLOCK_READ,
> +				       __ATOMIC_ACQUIRE);
> +
> +		/* If no writer, then acquire was successful */
> +		if (likely(!(x & RTE_RWLOCK_MASK)))
> +			return;
> +
> +		/* Lost race with writer, backout the change. */
> +		__atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_READ,
> +				   __ATOMIC_RELAXED);
>  	}
>  }
> 
> @@ -93,17 +123,24 @@ static inline int
>  rte_rwlock_read_trylock(rte_rwlock_t *rwl)
>  {
>  	int32_t x;
> -	int success = 0;
> 
> -	while (success == 0) {
> -		x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
> -		/* write lock is held */
> -		if (x < 0)
> -			return -EBUSY;
> -		success = __atomic_compare_exchange_n(&rwl->cnt, &x, x + 1,
> 1,
> -					__ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
> -	}
> +	x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
> +
> +	/* fail if write lock is held or writer is pending */
> +	if (x & RTE_RWLOCK_MASK)
> +		return -EBUSY;
> 
> +	/* Try to get read lock */
> +	x = __atomic_add_fetch(&rwl->cnt, RTE_RWLOCK_READ,
> +			       __ATOMIC_ACQUIRE);
> +
> +	/* Back out if writer raced in */
> +	if (unlikely(x & RTE_RWLOCK_MASK)) {
> +		__atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_READ,
> +				   __ATOMIC_RELEASE);
> +
> +		return -EBUSY;
> +	}
>  	return 0;
>  }
> 
> @@ -116,7 +153,7 @@ rte_rwlock_read_trylock(rte_rwlock_t *rwl)
>  static inline void
>  rte_rwlock_read_unlock(rte_rwlock_t *rwl)
>  {
> -	__atomic_fetch_sub(&rwl->cnt, 1, __ATOMIC_RELEASE);
> +	__atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_READ, __ATOMIC_RELEASE);
>  }
> 
>  /**
> @@ -139,11 +176,12 @@ rte_rwlock_write_trylock(rte_rwlock_t *rwl)
>  	int32_t x;
> 
>  	x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
> -	if (x != 0 || __atomic_compare_exchange_n(&rwl->cnt, &x, -1, 1,
> -			      __ATOMIC_ACQUIRE, __ATOMIC_RELAXED) == 0)
> +	if (x < RTE_RWLOCK_WRITE &&

"x < RTE_RWLOCK_WRITE" will permit this writer thread to race a waiting writer thread, while the waiting writer thread is executing rte_pause(). Have you considered "!x" instead, giving priority to the waiting thread?

I suppose your solution is better, because we know that this writer thread is actively running, while the waiting writer thread may have been put on hold by the O/S scheduler.

> +	    __atomic_compare_exchange_n(&rwl->cnt, &x, x +
> RTE_RWLOCK_WRITE,

Only a matter of taste, but I would prefer "x | RTE_RWLOCK_WRITE" over "x + RTE_RWLOCK_WRITE". You can leave it as is.

> +					1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
> +		return 0;
> +	else
>  		return -EBUSY;
> -
> -	return 0;
>  }
> 
>  /**
> @@ -156,18 +194,27 @@ static inline void
>  rte_rwlock_write_lock(rte_rwlock_t *rwl)
>  {
>  	int32_t x;
> -	int success = 0;
> 
> -	while (success == 0) {
> +	while (1) {
>  		x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
> -		/* a lock is held */
> -		if (x != 0) {
> -			rte_pause();
> -			continue;
> +
> +		/* No readers or writers? */
> +		if (likely(x < RTE_RWLOCK_WRITE)) {
> +			/* Turn off RTE_RWLOCK_WAIT, turn on RTE_RWLOCK_WRITE
> */
> +			if (__atomic_compare_exchange_n(&rwl->cnt, &x,
> RTE_RWLOCK_WRITE, 1,
> +							__ATOMIC_ACQUIRE,
> __ATOMIC_RELAXED))
> +				return;

See comment below; this is the point I refer to as the "next race".

>  		}
> -		success = __atomic_compare_exchange_n(&rwl->cnt, &x, -1, 1,
> -					__ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
> -	}
> +
> +		/* Turn on writer wait bit */
> +		if (!(x & RTE_RWLOCK_WAIT))
> +			__atomic_fetch_or(&rwl->cnt, RTE_RWLOCK_WAIT,
> __ATOMIC_RELAXED);

Is there a risk of race with two writer threads at this location?

If a reader is active, and two writer threads reach this point simultaneously, they will both set RTE_RWLOCK_WAIT here. And then, when the reader thread is done, one of the writer thread will win the next race and replace RTE_RWLOCK_WAIT by RTE_RWLOCK_WRITE. The winning thread will then do its job and afterwards clear RTE_RWLOCK_WRITE.
This means that both RTE_RWLOCK_WAIT and RTE_RWLOCK_WRITE have been cleared, but RTE_RWLOCK_WAIT should remain set for the writer thread that lost the race.

Did I miss something?

It does work with only one writer thread, though.

> +
> +		/* Wait until no readers befor trying again */

Typo: befor -> before.

> +		while (__atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED) >
> RTE_RWLOCK_WAIT)
> +			rte_pause();
> +
> +    }
>  }
> 
>  /**
> @@ -179,7 +226,7 @@ rte_rwlock_write_lock(rte_rwlock_t *rwl)
>  static inline void
>  rte_rwlock_write_unlock(rte_rwlock_t *rwl)
>  {
> -	__atomic_store_n(&rwl->cnt, 0, __ATOMIC_RELEASE);
> +	__atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_WRITE,
> __ATOMIC_RELEASE);

Yes. This is correct, regardless if another writer thread is waiting or not. (Reviewed for one writer thread using rte_rwlock_write_lock() and another using rte_rwlock_write_trylock().)

>  }
> 
>  /**
> --
> 2.35.1
> 

The comments in this version are good too.



More information about the dev mailing list