[PATCH v2] rwlock: prevent readers from starving writers
Stephen Hemminger
stephen at networkplumber.org
Tue Jul 19 22:27:50 CEST 2022
Modify reader/writer lock to avoid starvation of writer. The previous
implementation would cause a writer to get starved if readers kept
acquiring the lock. The new version uses an additional bit to indicate
that a writer is waiting and which keeps readers from starving the
writer.
Signed-off-by: Stephen Hemminger <stephen at networkplumber.org>
Acked-by: Morten Brørup <mb at smartsharesystems.com>
---
v2 - incorporate feedback, change from RFC to PATCH
lib/eal/include/generic/rte_rwlock.h | 119 +++++++++++++++++++--------
1 file changed, 83 insertions(+), 36 deletions(-)
diff --git a/lib/eal/include/generic/rte_rwlock.h b/lib/eal/include/generic/rte_rwlock.h
index da9bc3e9c0e2..59ec54110444 100644
--- a/lib/eal/include/generic/rte_rwlock.h
+++ b/lib/eal/include/generic/rte_rwlock.h
@@ -15,23 +15,46 @@
* one writer. All readers are blocked until the writer is finished
* writing.
*
+ * This version does not give preference to readers or writers
+ * and does not starve either readers or writers.
+ *
+ * See also:
+ * https://locklessinc.com/articles/locks/
*/
#ifdef __cplusplus
extern "C" {
#endif
+#include <rte_branch_prediction.h>
#include <rte_common.h>
-#include <rte_atomic.h>
#include <rte_pause.h>
/**
* The rte_rwlock_t type.
*
- * cnt is -1 when write lock is held, and > 0 when read locks are held.
+ * Readers increment the counter by RTE_RWLOCK_READ (4)
+ * Writers set the RTE_RWLOCK_WRITE bit when lock is held
+ * and set the RTE_RWLOCK_WAIT bit while waiting.
+ *
+ * 31 2 1 0
+ * +-------------------+-+-+
+ * | readers | | |
+ * +-------------------+-+-+
+ * ^ ^
+ * | |
+ * WRITE: lock held ----/ |
+ * WAIT: writer pending --/
*/
+
+#define RTE_RWLOCK_WAIT 0x1 /* Writer is waiting */
+#define RTE_RWLOCK_WRITE 0x2 /* Writer has the lock */
+#define RTE_RWLOCK_MASK (RTE_RWLOCK_WAIT | RTE_RWLOCK_WRITE)
+ /* Writer is waiting or has lock */
+#define RTE_RWLOCK_READ 0x4 /* Reader increment */
+
typedef struct {
- volatile int32_t cnt; /**< -1 when W lock held, > 0 when R locks held. */
+ int32_t cnt;
} rte_rwlock_t;
/**
@@ -61,17 +84,24 @@ static inline void
rte_rwlock_read_lock(rte_rwlock_t *rwl)
{
int32_t x;
- int success = 0;
- while (success == 0) {
- x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
- /* write lock is held */
- if (x < 0) {
+ while (1) {
+ /* Wait while writer is present or pending */
+ while (__atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED)
+ & RTE_RWLOCK_MASK)
rte_pause();
- continue;
- }
- success = __atomic_compare_exchange_n(&rwl->cnt, &x, x + 1, 1,
- __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
+
+ /* Try to get read lock */
+ x = __atomic_add_fetch(&rwl->cnt, RTE_RWLOCK_READ,
+ __ATOMIC_ACQUIRE);
+
+ /* If no writer, then acquire was successful */
+ if (likely(!(x & RTE_RWLOCK_MASK)))
+ return;
+
+ /* Lost race with writer, backout the change. */
+ __atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_READ,
+ __ATOMIC_RELAXED);
}
}
@@ -93,17 +123,24 @@ static inline int
rte_rwlock_read_trylock(rte_rwlock_t *rwl)
{
int32_t x;
- int success = 0;
- while (success == 0) {
- x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
- /* write lock is held */
- if (x < 0)
- return -EBUSY;
- success = __atomic_compare_exchange_n(&rwl->cnt, &x, x + 1, 1,
- __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
- }
+ x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
+
+ /* fail if write lock is held or writer is pending */
+ if (x & RTE_RWLOCK_MASK)
+ return -EBUSY;
+ /* Try to get read lock */
+ x = __atomic_add_fetch(&rwl->cnt, RTE_RWLOCK_READ,
+ __ATOMIC_ACQUIRE);
+
+ /* Back out if writer raced in */
+ if (unlikely(x & RTE_RWLOCK_MASK)) {
+ __atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_READ,
+ __ATOMIC_RELEASE);
+
+ return -EBUSY;
+ }
return 0;
}
@@ -116,7 +153,7 @@ rte_rwlock_read_trylock(rte_rwlock_t *rwl)
static inline void
rte_rwlock_read_unlock(rte_rwlock_t *rwl)
{
- __atomic_fetch_sub(&rwl->cnt, 1, __ATOMIC_RELEASE);
+ __atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_READ, __ATOMIC_RELEASE);
}
/**
@@ -139,11 +176,12 @@ rte_rwlock_write_trylock(rte_rwlock_t *rwl)
int32_t x;
x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
- if (x != 0 || __atomic_compare_exchange_n(&rwl->cnt, &x, -1, 1,
- __ATOMIC_ACQUIRE, __ATOMIC_RELAXED) == 0)
+ if (x < RTE_RWLOCK_WRITE &&
+ __atomic_compare_exchange_n(&rwl->cnt, &x, x + RTE_RWLOCK_WRITE,
+ 1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
+ return 0;
+ else
return -EBUSY;
-
- return 0;
}
/**
@@ -156,18 +194,27 @@ static inline void
rte_rwlock_write_lock(rte_rwlock_t *rwl)
{
int32_t x;
- int success = 0;
- while (success == 0) {
+ while (1) {
x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
- /* a lock is held */
- if (x != 0) {
- rte_pause();
- continue;
+
+ /* No readers or writers? */
+ if (likely(x < RTE_RWLOCK_WRITE)) {
+ /* Turn off RTE_RWLOCK_WAIT, turn on RTE_RWLOCK_WRITE */
+ if (__atomic_compare_exchange_n(&rwl->cnt, &x, RTE_RWLOCK_WRITE, 1,
+ __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
+ return;
}
- success = __atomic_compare_exchange_n(&rwl->cnt, &x, -1, 1,
- __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
- }
+
+ /* Turn on writer wait bit */
+ if (!(x & RTE_RWLOCK_WAIT))
+ __atomic_fetch_or(&rwl->cnt, RTE_RWLOCK_WAIT, __ATOMIC_RELAXED);
+
+ /* Wait until no readers before trying again */
+ while (__atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED) > RTE_RWLOCK_WAIT)
+ rte_pause();
+
+ }
}
/**
@@ -179,7 +226,7 @@ rte_rwlock_write_lock(rte_rwlock_t *rwl)
static inline void
rte_rwlock_write_unlock(rte_rwlock_t *rwl)
{
- __atomic_store_n(&rwl->cnt, 0, __ATOMIC_RELEASE);
+ __atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_WRITE, __ATOMIC_RELEASE);
}
/**
--
2.35.1
More information about the dev
mailing list