[dpdk-dev] [PATCH] event/opdl: fix atomic queue race condition issue
    Liang Ma 
    liang.j.ma at intel.com
       
    Tue Mar 13 12:34:13 CET 2018
    
    
  
If application link one atomic queue to multiple ports,
and each worker core update flow_id, there will have a
chance to hit race condition issue and lead to double processing
same event. This fix solve the problem and eliminate
the race condition issue.
Fixes: 4236ce9bf5bf ("event/opdl: add OPDL ring infrastructure library")
Signed-off-by: Liang Ma <liang.j.ma at intel.com>
Signed-off-by: Peter Mccarthy <peter.mccarthy at intel.com>
---
 drivers/event/opdl/opdl_evdev_init.c |  3 ++
 drivers/event/opdl/opdl_ring.c       | 95 +++++++++++++++++++++++++-----------
 drivers/event/opdl/opdl_ring.h       | 16 +++++-
 3 files changed, 85 insertions(+), 29 deletions(-)
diff --git a/drivers/event/opdl/opdl_evdev_init.c b/drivers/event/opdl/opdl_evdev_init.c
index 1454de5..582ad69 100644
--- a/drivers/event/opdl/opdl_evdev_init.c
+++ b/drivers/event/opdl/opdl_evdev_init.c
@@ -733,6 +733,9 @@ initialise_all_other_ports(struct rte_eventdev *dev)
 				queue->ports[queue->nb_ports] = port;
 				port->instance_id = queue->nb_ports;
 				queue->nb_ports++;
+				opdl_stage_set_queue_id(stage_inst,
+						port->queue_id);
+
 			} else if (queue->q_pos == OPDL_Q_POS_END) {
 
 				/* tx port  */
diff --git a/drivers/event/opdl/opdl_ring.c b/drivers/event/opdl/opdl_ring.c
index eca7712..0f40d31 100644
--- a/drivers/event/opdl/opdl_ring.c
+++ b/drivers/event/opdl/opdl_ring.c
@@ -25,7 +25,10 @@
 #define OPDL_NAME_SIZE 64
 
 
-#define OPDL_EVENT_MASK  (0xFFFF0000000FFFFFULL)
+#define OPDL_EVENT_MASK  (0x00000000000FFFFFULL)
+#define OPDL_FLOWID_MASK (0xFFFFF)
+#define OPDL_OPA_MASK    (0xFF)
+#define OPDL_OPA_OFFSET  (0x38)
 
 int opdl_logtype_driver;
 
@@ -86,7 +89,6 @@ struct opdl_stage {
 	 */
 	uint32_t available_seq;
 	uint32_t head;  /* Current head for single-thread operation */
-	uint32_t shadow_head;  /* Shadow head for single-thread operation */
 	uint32_t nb_instance;  /* Number of instances */
 	uint32_t instance_id;  /* ID of this stage instance */
 	uint16_t num_claimed;  /* Number of slots claimed */
@@ -102,6 +104,9 @@ struct opdl_stage {
 	/* For managing disclaims in multi-threaded processing stages */
 	struct claim_manager pending_disclaims[RTE_MAX_LCORE]
 					       __rte_cache_aligned;
+	uint32_t shadow_head;  /* Shadow head for single-thread operation */
+	uint32_t queue_id;     /* ID of Queue which is assigned to this stage */
+	uint32_t pos;		/* Atomic scan position */
 } __rte_cache_aligned;
 
 /* Context for opdl_ring */
@@ -494,6 +499,9 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
 		uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
 {
 	uint32_t i = 0, j = 0,  offset;
+	uint32_t opa_id   = 0;
+	uint32_t flow_id  = 0;
+	uint64_t event    = 0;
 	void *get_slots;
 	struct rte_event *ev;
 	RTE_SET_USED(seq);
@@ -520,7 +528,17 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
 
 		for (j = 0; j < num_entries; j++) {
 			ev = (struct rte_event *)get_slot(t, s->head+j);
-			if ((ev->flow_id%s->nb_instance) == s->instance_id) {
+
+			event  = __atomic_load_n(&(ev->event),
+					__ATOMIC_ACQUIRE);
+
+			opa_id = OPDL_OPA_MASK&(event>>OPDL_OPA_OFFSET);
+			flow_id  = OPDL_FLOWID_MASK&event;
+
+			if (opa_id >= s->queue_id)
+				continue;
+
+			if ((flow_id%s->nb_instance) == s->instance_id) {
 				memcpy(entries_offset, ev, t->slot_size);
 				entries_offset += t->slot_size;
 				i++;
@@ -531,6 +549,7 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
 	s->head += num_entries;
 	s->num_claimed = num_entries;
 	s->num_event = i;
+	s->pos = 0;
 
 	/* automatically disclaim entries if number of rte_events is zero */
 	if (unlikely(i == 0))
@@ -953,14 +972,19 @@ opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index)
 }
 
 bool
-opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev,
+opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev,
 		uint32_t index, bool atomic)
 {
-	uint32_t i = 0, j = 0, offset;
+	uint32_t i = 0, offset;
 	struct opdl_ring *t = s->t;
 	struct rte_event *ev_orig = NULL;
 	bool ev_updated = false;
-	uint64_t  ev_temp = 0;
+	uint64_t ev_temp    = 0;
+	uint64_t ev_update  = 0;
+
+	uint32_t opa_id   = 0;
+	uint32_t flow_id  = 0;
+	uint64_t event    = 0;
 
 	if (index > s->num_event) {
 		PMD_DRV_LOG(ERR, "index is overflow");
@@ -984,27 +1008,39 @@ opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev,
 		}
 
 	} else {
-		for (i = 0; i < s->num_claimed; i++) {
+		for (i = s->pos; i < s->num_claimed; i++) {
 			ev_orig = (struct rte_event *)
 				get_slot(t, s->shadow_head+i);
 
-			if ((ev_orig->flow_id%s->nb_instance) ==
-					s->instance_id) {
-
-				if (j == index) {
-					if ((ev_orig->event&OPDL_EVENT_MASK) !=
-							ev_temp) {
-						ev_orig->event = ev->event;
-						ev_updated = true;
-					}
-					if (ev_orig->u64 != ev->u64) {
-						ev_orig->u64 = ev->u64;
-						ev_updated = true;
-					}
-
-					break;
+			event  = __atomic_load_n(&(ev_orig->event),
+					__ATOMIC_ACQUIRE);
+
+			opa_id = OPDL_OPA_MASK&(event>>OPDL_OPA_OFFSET);
+			flow_id  = OPDL_FLOWID_MASK&event;
+
+			if (opa_id >= s->queue_id)
+				continue;
+
+			if ((flow_id%s->nb_instance) == s->instance_id) {
+				ev_update = s->queue_id;
+				ev_update = (ev_update<<OPDL_OPA_OFFSET)
+					|ev->event;
+
+				s->pos = i+1;
+
+				if ((event&OPDL_EVENT_MASK) !=
+						ev_temp) {
+					__atomic_store_n(&(ev_orig->event),
+							ev_update,
+							__ATOMIC_RELEASE);
+					ev_updated = true;
 				}
-				j++;
+				if (ev_orig->u64 != ev->u64) {
+					ev_orig->u64 = ev->u64;
+					ev_updated = true;
+				}
+
+				break;
 			}
 		}
 
@@ -1049,11 +1085,7 @@ check_deps(struct opdl_ring *t, struct opdl_stage *deps[],
 			return -EINVAL;
 		}
 	}
-	if (num_deps > t->num_stages) {
-		PMD_DRV_LOG(ERR, "num_deps (%u) > number stages (%u)",
-				num_deps, t->num_stages);
-		return -EINVAL;
-	}
+
 	return 0;
 }
 
@@ -1154,6 +1186,13 @@ opdl_stage_get_opdl_ring(const struct opdl_stage *s)
 }
 
 void
+opdl_stage_set_queue_id(struct opdl_stage *s,
+		uint32_t queue_id)
+{
+	s->queue_id = queue_id;
+}
+
+void
 opdl_ring_dump(const struct opdl_ring *t, FILE *f)
 {
 	uint32_t i;
diff --git a/drivers/event/opdl/opdl_ring.h b/drivers/event/opdl/opdl_ring.h
index 9e8c33e..751a59d 100644
--- a/drivers/event/opdl/opdl_ring.h
+++ b/drivers/event/opdl/opdl_ring.h
@@ -518,6 +518,20 @@ opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries);
 struct opdl_stage *
 opdl_stage_create(struct opdl_ring *t,  bool threadsafe);
 
+
+/**
+ * Set the internal queue id for each stage instance.
+ *
+ * @param s
+ *   The pointer of  stage instance.
+ *
+ * @param queue_id
+ *    The value of internal queue id.
+ */
+void
+opdl_stage_set_queue_id(struct opdl_stage *s,
+		uint32_t queue_id);
+
 /**
  * Prints information on opdl_ring instance and all its stages
  *
@@ -590,7 +604,7 @@ opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe);
  */
 
 bool
-opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev,
+opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev,
 		uint32_t index, bool atomic);
 
 #ifdef __cplusplus
-- 
2.7.5
    
    
More information about the dev
mailing list