[dpdk-dev] [PATCH v9 18/23] event/dlb2: add dequeue and its burst variants

Timothy McDaniel timothy.mcdaniel at intel.com
Sat Oct 31 18:26:16 CET 2020


Add support for dequeue and dequeue_burst, along with their sparse
CQ mode variants.

DLB2 does not currently support interrupts; instead it uses
umonitor/umwait, if supported by the processor, to allow software
to monitor and wait on writes to a cache line.
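
A condensed sketch of the wait decision implemented in
dlb2_dequeue_wait() below (setup and error handling omitted; names
follow the patch):

    if (elapsed_ticks >= timeout) {
        return 1;                       /* wait/poll budget exhausted */
    } else if (dlb2->umwait_allowed) {
        /* Arm umonitor on the 64b word holding the CQ gen bit, then
         * umwait until that line is written or the deadline passes.
         */
        rte_power_monitor(monitor_addr, expected_value,
                          qe_mask.raw_qe[1], timeout + start_ticks,
                          sizeof(uint64_t));
    } else {
        /* Fall back to a bounded busy-poll of the timer. */
        while ((curr_ticks - start_ticks < timeout) &&
               (curr_ticks - init_ticks < poll_interval))
            curr_ticks = rte_get_timer_cycles();
    }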

DLB2 supports normal and sparse CQ modes. In normal mode the
hardware packs four QEs into each cache line, while in sparse CQ
mode it populates only one QE per cache line. Software must be
aware of the CQ mode and handle the CQ accordingly.
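
For illustration, the two dequeue paths below advance the CQ index
differently per mode (each QE slot is 16B, so four slots span a 64B
cache line); condensed from dlb2_hw_dequeue() and
dlb2_hw_dequeue_sparse():

    /* Normal mode: up to four valid QEs per cache line; the CQ index
     * advances by the number of QEs consumed.
     */
    dlb2_inc_cq_idx(qm_port, num_avail);

    /* Sparse mode: one valid QE per cache line, so the index advances
     * by four QE slots per QE consumed.
     */
    dlb2_inc_cq_idx(qm_port, num_avail << 2);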

Signed-off-by: Timothy McDaniel <timothy.mcdaniel at intel.com>
Reviewed-by: Gage Eads <gage.eads at intel.com>
---
 doc/guides/eventdevs/dlb2.rst |  21 ++
 drivers/event/dlb2/dlb2.c     | 783 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 804 insertions(+)

diff --git a/doc/guides/eventdevs/dlb2.rst b/doc/guides/eventdevs/dlb2.rst
index aa8bf01..b9f57fd 100644
--- a/doc/guides/eventdevs/dlb2.rst
+++ b/doc/guides/eventdevs/dlb2.rst
@@ -314,6 +314,27 @@ The PMD does not support the following configuration sequences:
 This sequence is not supported because the event device must be reconfigured
 before its ports or queues can be.
 
+Deferred Scheduling
+~~~~~~~~~~~~~~~~~~~
+
+The DLB2 PMD's default behavior for managing a CQ is to "pop" the CQ once per
+dequeued event before returning from rte_event_dequeue_burst(). This frees the
+corresponding entries in the CQ, which enables the DLB2 to schedule more events
+to it.
+
+To support applications seeking finer-grained scheduling control -- for example
+deferring scheduling to get the best possible priority scheduling and
+load-balancing -- the PMD supports a deferred scheduling mode. In this mode,
+the CQ entry is not popped until the *subsequent* rte_event_dequeue_burst()
+call. This mode only applies to load-balanced event ports with a dequeue
+depth of 1.
+
+To enable deferred scheduling, use the defer_sched vdev argument like so:
+
+    .. code-block:: console
+
+       --vdev=dlb2_event,defer_sched=on
+
 Atomic Inflights Allocation
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
diff --git a/drivers/event/dlb2/dlb2.c b/drivers/event/dlb2/dlb2.c
index 4f5cf25..7cd227f 100644
--- a/drivers/event/dlb2/dlb2.c
+++ b/drivers/event/dlb2/dlb2.c
@@ -26,6 +26,7 @@
 #include <rte_log.h>
 #include <rte_malloc.h>
 #include <rte_mbuf.h>
+#include <rte_power_intrinsics.h>
 #include <rte_prefetch.h>
 #include <rte_ring.h>
 #include <rte_string_fns.h>
@@ -2227,6 +2228,32 @@ dlb2_pp_write(struct dlb2_enqueue_qe *qe4,
 	dlb2_movdir64b(port_data->pp_addr, qe4);
 }
 
+static inline int
+dlb2_consume_qe_immediate(struct dlb2_port *qm_port, int num)
+{
+	struct process_local_port_data *port_data;
+	struct dlb2_cq_pop_qe *qe;
+
+	RTE_ASSERT(qm_port->config_state == DLB2_CONFIGURED);
+
+	qe = qm_port->consume_qe;
+
+	qe->tokens = num - 1;
+
+	/* No store fence needed since no pointer is being sent, and CQ token
+	 * pops can be safely reordered with other HCWs.
+	 */
+	port_data = &dlb2_port[qm_port->id][PORT_TYPE(qm_port)];
+
+	dlb2_movntdq_single(port_data->pp_addr, qe);
+
+	DLB2_LOG_DBG("dlb2: consume immediate - %d QEs\n", num);
+
+	qm_port->owed_tokens = 0;
+
+	return 0;
+}
+
 static inline void
 dlb2_hw_do_enqueue(struct dlb2_port *qm_port,
 		   bool do_sfence,
@@ -2623,9 +2650,756 @@ dlb2_event_enqueue_forward_burst(void *event_port,
 	return dlb2_event_enqueue_burst(event_port, events, num);
 }
 
+static inline void
+dlb2_port_credits_inc(struct dlb2_port *qm_port, int num)
+{
+	uint32_t batch_size = DLB2_SW_CREDIT_BATCH_SZ;
+
+	/* Increment port credits; return a batch to the pool if over threshold */
+	if (!qm_port->is_directed) {
+		qm_port->cached_ldb_credits += num;
+		if (qm_port->cached_ldb_credits >= 2 * batch_size) {
+			__atomic_fetch_add(
+				qm_port->credit_pool[DLB2_LDB_QUEUE],
+				batch_size, __ATOMIC_SEQ_CST);
+			qm_port->cached_ldb_credits -= batch_size;
+		}
+	} else {
+		qm_port->cached_dir_credits += num;
+		if (qm_port->cached_dir_credits >= 2 * batch_size) {
+			__atomic_fetch_add(
+				qm_port->credit_pool[DLB2_DIR_QUEUE],
+				batch_size, __ATOMIC_SEQ_CST);
+			qm_port->cached_dir_credits -= batch_size;
+		}
+	}
+}
+
+static inline int
+dlb2_dequeue_wait(struct dlb2_eventdev *dlb2,
+		  struct dlb2_eventdev_port *ev_port,
+		  struct dlb2_port *qm_port,
+		  uint64_t timeout,
+		  uint64_t start_ticks)
+{
+	struct process_local_port_data *port_data;
+	uint64_t elapsed_ticks;
+
+	port_data = &dlb2_port[qm_port->id][PORT_TYPE(qm_port)];
+
+	elapsed_ticks = rte_get_timer_cycles() - start_ticks;
+
+	/* Wait/poll time expired */
+	if (elapsed_ticks >= timeout) {
+		return 1;
+	} else if (dlb2->umwait_allowed) {
+		volatile struct dlb2_dequeue_qe *cq_base;
+		union {
+			uint64_t raw_qe[2];
+			struct dlb2_dequeue_qe qe;
+		} qe_mask;
+		uint64_t expected_value;
+		volatile uint64_t *monitor_addr;
+
+		qe_mask.qe.cq_gen = 1; /* set mask */
+
+		cq_base = port_data->cq_base;
+		monitor_addr = (volatile uint64_t *)(volatile void *)
+			&cq_base[qm_port->cq_idx];
+		monitor_addr++; /* cq_gen bit is in second 64bit location */
+
+		if (qm_port->gen_bit)
+			expected_value = qe_mask.raw_qe[1];
+		else
+			expected_value = 0;
+
+		rte_power_monitor(monitor_addr, expected_value,
+				  qe_mask.raw_qe[1], timeout + start_ticks,
+				  sizeof(uint64_t));
+
+		DLB2_INC_STAT(ev_port->stats.traffic.rx_umonitor_umwait, 1);
+	} else {
+		uint64_t poll_interval = RTE_LIBRTE_PMD_DLB2_POLL_INTERVAL;
+		uint64_t curr_ticks = rte_get_timer_cycles();
+		uint64_t init_ticks = curr_ticks;
+
+		while ((curr_ticks - start_ticks < timeout) &&
+		       (curr_ticks - init_ticks < poll_interval))
+			curr_ticks = rte_get_timer_cycles();
+	}
+
+	return 0;
+}
+
+static inline int
+dlb2_process_dequeue_qes(struct dlb2_eventdev_port *ev_port,
+			 struct dlb2_port *qm_port,
+			 struct rte_event *events,
+			 struct dlb2_dequeue_qe *qes,
+			 int cnt)
+{
+	uint8_t *qid_mappings = qm_port->qid_mappings;
+	int i, num, evq_id;
+
+	for (i = 0, num = 0; i < cnt; i++) {
+		struct dlb2_dequeue_qe *qe = &qes[i];
+		int sched_type_map[DLB2_NUM_HW_SCHED_TYPES] = {
+			[DLB2_SCHED_ATOMIC] = RTE_SCHED_TYPE_ATOMIC,
+			[DLB2_SCHED_UNORDERED] = RTE_SCHED_TYPE_PARALLEL,
+			[DLB2_SCHED_ORDERED] = RTE_SCHED_TYPE_ORDERED,
+			[DLB2_SCHED_DIRECTED] = RTE_SCHED_TYPE_ATOMIC,
+		};
+
+		/* Fill in event information.
+		 * Note that flow_id must be embedded in the data by
+		 * the app, such as the mbuf RSS hash field if the data
+		 * buffer is a mbuf.
+		 */
+		if (unlikely(qe->error)) {
+			DLB2_LOG_ERR("QE error bit ON\n");
+			DLB2_INC_STAT(ev_port->stats.traffic.rx_drop, 1);
+			dlb2_consume_qe_immediate(qm_port, 1);
+			continue; /* Ignore */
+		}
+
+		events[num].u64 = qe->data;
+		events[num].flow_id = qe->flow_id;
+		events[num].priority = DLB2_TO_EV_PRIO((uint8_t)qe->priority);
+		events[num].event_type = qe->u.event_type.major;
+		events[num].sub_event_type = qe->u.event_type.sub;
+		events[num].sched_type = sched_type_map[qe->sched_type];
+		events[num].impl_opaque = qe->qid_depth;
+
+		/* qid not preserved for directed queues */
+		if (qm_port->is_directed)
+			evq_id = ev_port->link[0].queue_id;
+		else
+			evq_id = qid_mappings[qe->qid];
+
+		events[num].queue_id = evq_id;
+		DLB2_INC_STAT(
+			ev_port->stats.queue[evq_id].qid_depth[qe->qid_depth],
+			1);
+		DLB2_INC_STAT(ev_port->stats.rx_sched_cnt[qe->sched_type], 1);
+		num++;
+	}
+
+	DLB2_INC_STAT(ev_port->stats.traffic.rx_ok, num);
+
+	return num;
+}
+
+static inline int
+dlb2_process_dequeue_four_qes(struct dlb2_eventdev_port *ev_port,
+			      struct dlb2_port *qm_port,
+			      struct rte_event *events,
+			      struct dlb2_dequeue_qe *qes)
+{
+	int sched_type_map[] = {
+		[DLB2_SCHED_ATOMIC] = RTE_SCHED_TYPE_ATOMIC,
+		[DLB2_SCHED_UNORDERED] = RTE_SCHED_TYPE_PARALLEL,
+		[DLB2_SCHED_ORDERED] = RTE_SCHED_TYPE_ORDERED,
+		[DLB2_SCHED_DIRECTED] = RTE_SCHED_TYPE_ATOMIC,
+	};
+	const int num_events = DLB2_NUM_QES_PER_CACHE_LINE;
+	uint8_t *qid_mappings = qm_port->qid_mappings;
+	__m128i sse_evt[2];
+
+	/* In the unlikely case that any of the QE error bits are set, process
+	 * them one at a time.
+	 */
+	if (unlikely(qes[0].error || qes[1].error ||
+		     qes[2].error || qes[3].error))
+		return dlb2_process_dequeue_qes(ev_port, qm_port, events,
+						 qes, num_events);
+
+	events[0].u64 = qes[0].data;
+	events[1].u64 = qes[1].data;
+	events[2].u64 = qes[2].data;
+	events[3].u64 = qes[3].data;
+
+	/* Construct the metadata portion of two struct rte_events
+	 * in one 128b SSE register. Event metadata is constructed in the SSE
+	 * registers like so:
+	 * sse_evt[0][63:0]:   event[0]'s metadata
+	 * sse_evt[0][127:64]: event[1]'s metadata
+	 * sse_evt[1][63:0]:   event[2]'s metadata
+	 * sse_evt[1][127:64]: event[3]'s metadata
+	 */
+	sse_evt[0] = _mm_setzero_si128();
+	sse_evt[1] = _mm_setzero_si128();
+
+	/* Convert the hardware queue ID to an event queue ID and store it in
+	 * the metadata:
+	 * sse_evt[0][47:40]   = qid_mappings[qes[0].qid]
+	 * sse_evt[0][111:104] = qid_mappings[qes[1].qid]
+	 * sse_evt[1][47:40]   = qid_mappings[qes[2].qid]
+	 * sse_evt[1][111:104] = qid_mappings[qes[3].qid]
+	 */
+#define DLB_EVENT_QUEUE_ID_BYTE 5
+	sse_evt[0] = _mm_insert_epi8(sse_evt[0],
+				     qid_mappings[qes[0].qid],
+				     DLB_EVENT_QUEUE_ID_BYTE);
+	sse_evt[0] = _mm_insert_epi8(sse_evt[0],
+				     qid_mappings[qes[1].qid],
+				     DLB_EVENT_QUEUE_ID_BYTE + 8);
+	sse_evt[1] = _mm_insert_epi8(sse_evt[1],
+				     qid_mappings[qes[2].qid],
+				     DLB_EVENT_QUEUE_ID_BYTE);
+	sse_evt[1] = _mm_insert_epi8(sse_evt[1],
+				     qid_mappings[qes[3].qid],
+				     DLB_EVENT_QUEUE_ID_BYTE + 8);
+
+	/* Convert the hardware priority to an event priority and store it in
+	 * the metadata, and record the queue depth status value captured by
+	 * the hardware in impl_opaque, which the application can read but
+	 * must not modify:
+	 * sse_evt[0][55:48]   = DLB2_TO_EV_PRIO(qes[0].priority)
+	 * sse_evt[0][63:56]   = qes[0].qid_depth
+	 * sse_evt[0][119:112] = DLB2_TO_EV_PRIO(qes[1].priority)
+	 * sse_evt[0][127:120] = qes[1].qid_depth
+	 * sse_evt[1][55:48]   = DLB2_TO_EV_PRIO(qes[2].priority)
+	 * sse_evt[1][63:56]   = qes[2].qid_depth
+	 * sse_evt[1][119:112] = DLB2_TO_EV_PRIO(qes[3].priority)
+	 * sse_evt[1][127:120] = qes[3].qid_depth
+	 */
+#define DLB_EVENT_PRIO_IMPL_OPAQUE_WORD 3
+#define DLB_BYTE_SHIFT 8
+	sse_evt[0] =
+		_mm_insert_epi16(sse_evt[0],
+			DLB2_TO_EV_PRIO((uint8_t)qes[0].priority) |
+			(qes[0].qid_depth << DLB_BYTE_SHIFT),
+			DLB_EVENT_PRIO_IMPL_OPAQUE_WORD);
+	sse_evt[0] =
+		_mm_insert_epi16(sse_evt[0],
+			DLB2_TO_EV_PRIO((uint8_t)qes[1].priority) |
+			(qes[1].qid_depth << DLB_BYTE_SHIFT),
+			DLB_EVENT_PRIO_IMPL_OPAQUE_WORD + 4);
+	sse_evt[1] =
+		_mm_insert_epi16(sse_evt[1],
+			DLB2_TO_EV_PRIO((uint8_t)qes[2].priority) |
+			(qes[2].qid_depth << DLB_BYTE_SHIFT),
+			DLB_EVENT_PRIO_IMPL_OPAQUE_WORD);
+	sse_evt[1] =
+		_mm_insert_epi16(sse_evt[1],
+			DLB2_TO_EV_PRIO((uint8_t)qes[3].priority) |
+			(qes[3].qid_depth << DLB_BYTE_SHIFT),
+			DLB_EVENT_PRIO_IMPL_OPAQUE_WORD + 4);
+
+	/* Write the event type, sub event type, and flow_id to the event
+	 * metadata.
+	 * sse_evt[0][31:0]   = qes[0].flow_id |
+	 *			qes[0].u.event_type.major << 28 |
+	 *			qes[0].u.event_type.sub << 20;
+	 * sse_evt[0][95:64]  = qes[1].flow_id |
+	 *			qes[1].u.event_type.major << 28 |
+	 *			qes[1].u.event_type.sub << 20;
+	 * sse_evt[1][31:0]   = qes[2].flow_id |
+	 *			qes[2].u.event_type.major << 28 |
+	 *			qes[2].u.event_type.sub << 20;
+	 * sse_evt[1][95:64]  = qes[3].flow_id |
+	 *			qes[3].u.event_type.major << 28 |
+	 *			qes[3].u.event_type.sub << 20;
+	 */
+#define DLB_EVENT_EV_TYPE_DW 0
+#define DLB_EVENT_EV_TYPE_SHIFT 28
+#define DLB_EVENT_SUB_EV_TYPE_SHIFT 20
+	sse_evt[0] = _mm_insert_epi32(sse_evt[0],
+			qes[0].flow_id |
+			qes[0].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT |
+			qes[0].u.event_type.sub <<  DLB_EVENT_SUB_EV_TYPE_SHIFT,
+			DLB_EVENT_EV_TYPE_DW);
+	sse_evt[0] = _mm_insert_epi32(sse_evt[0],
+			qes[1].flow_id |
+			qes[1].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT |
+			qes[1].u.event_type.sub <<  DLB_EVENT_SUB_EV_TYPE_SHIFT,
+			DLB_EVENT_EV_TYPE_DW + 2);
+	sse_evt[1] = _mm_insert_epi32(sse_evt[1],
+			qes[2].flow_id |
+			qes[2].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT |
+			qes[2].u.event_type.sub <<  DLB_EVENT_SUB_EV_TYPE_SHIFT,
+			DLB_EVENT_EV_TYPE_DW);
+	sse_evt[1] = _mm_insert_epi32(sse_evt[1],
+			qes[3].flow_id |
+			qes[3].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT  |
+			qes[3].u.event_type.sub << DLB_EVENT_SUB_EV_TYPE_SHIFT,
+			DLB_EVENT_EV_TYPE_DW + 2);
+
+	/* Write the sched type to the event metadata. 'op' and 'rsvd' are not
+	 * set:
+	 * sse_evt[0][39:32]  = sched_type_map[qes[0].sched_type] << 6
+	 * sse_evt[0][103:96] = sched_type_map[qes[1].sched_type] << 6
+	 * sse_evt[1][39:32]  = sched_type_map[qes[2].sched_type] << 6
+	 * sse_evt[1][103:96] = sched_type_map[qes[3].sched_type] << 6
+	 */
+#define DLB_EVENT_SCHED_TYPE_BYTE 4
+#define DLB_EVENT_SCHED_TYPE_SHIFT 6
+	sse_evt[0] = _mm_insert_epi8(sse_evt[0],
+		sched_type_map[qes[0].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
+		DLB_EVENT_SCHED_TYPE_BYTE);
+	sse_evt[0] = _mm_insert_epi8(sse_evt[0],
+		sched_type_map[qes[1].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
+		DLB_EVENT_SCHED_TYPE_BYTE + 8);
+	sse_evt[1] = _mm_insert_epi8(sse_evt[1],
+		sched_type_map[qes[2].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
+		DLB_EVENT_SCHED_TYPE_BYTE);
+	sse_evt[1] = _mm_insert_epi8(sse_evt[1],
+		sched_type_map[qes[3].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
+		DLB_EVENT_SCHED_TYPE_BYTE + 8);
+
+	/* Store the metadata to the event (use the double-precision
+	 * _mm_storeh_pd because there is no integer function for storing the
+	 * upper 64b):
+	 * events[0].event = sse_evt[0][63:0]
+	 * events[1].event = sse_evt[0][127:64]
+	 * events[2].event = sse_evt[1][63:0]
+	 * events[3].event = sse_evt[1][127:64]
+	 */
+	_mm_storel_epi64((__m128i *)&events[0].event, sse_evt[0]);
+	_mm_storeh_pd((double *)&events[1].event, (__m128d) sse_evt[0]);
+	_mm_storel_epi64((__m128i *)&events[2].event, sse_evt[1]);
+	_mm_storeh_pd((double *)&events[3].event, (__m128d) sse_evt[1]);
+
+	DLB2_INC_STAT(ev_port->stats.rx_sched_cnt[qes[0].sched_type], 1);
+	DLB2_INC_STAT(ev_port->stats.rx_sched_cnt[qes[1].sched_type], 1);
+	DLB2_INC_STAT(ev_port->stats.rx_sched_cnt[qes[2].sched_type], 1);
+	DLB2_INC_STAT(ev_port->stats.rx_sched_cnt[qes[3].sched_type], 1);
+
+	DLB2_INC_STAT(
+		ev_port->stats.queue[events[0].queue_id].
+			qid_depth[qes[0].qid_depth],
+		1);
+	DLB2_INC_STAT(
+		ev_port->stats.queue[events[1].queue_id].
+			qid_depth[qes[1].qid_depth],
+		1);
+	DLB2_INC_STAT(
+		ev_port->stats.queue[events[2].queue_id].
+			qid_depth[qes[2].qid_depth],
+		1);
+	DLB2_INC_STAT(
+		ev_port->stats.queue[events[3].queue_id].
+			qid_depth[qes[3].qid_depth],
+		1);
+
+	DLB2_INC_STAT(ev_port->stats.traffic.rx_ok, num_events);
+
+	return num_events;
+}
+
+static __rte_always_inline int
+dlb2_recv_qe_sparse(struct dlb2_port *qm_port, struct dlb2_dequeue_qe *qe)
+{
+	volatile struct dlb2_dequeue_qe *cq_addr;
+	uint8_t xor_mask[2] = {0x0F, 0x00};
+	const uint8_t and_mask = 0x0F;
+	__m128i *qes = (__m128i *)qe;
+	uint8_t gen_bits, gen_bit;
+	uintptr_t addr[4];
+	uint16_t idx;
+
+	cq_addr = dlb2_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
+
+	idx = qm_port->cq_idx;
+
+	/* Load the next 4 QEs */
+	addr[0] = (uintptr_t)&cq_addr[idx];
+	addr[1] = (uintptr_t)&cq_addr[(idx +  4) & qm_port->cq_depth_mask];
+	addr[2] = (uintptr_t)&cq_addr[(idx +  8) & qm_port->cq_depth_mask];
+	addr[3] = (uintptr_t)&cq_addr[(idx + 12) & qm_port->cq_depth_mask];
+
+	/* Prefetch next batch of QEs (all CQs occupy minimum 8 cache lines) */
+	rte_prefetch0(&cq_addr[(idx + 16) & qm_port->cq_depth_mask]);
+	rte_prefetch0(&cq_addr[(idx + 20) & qm_port->cq_depth_mask]);
+	rte_prefetch0(&cq_addr[(idx + 24) & qm_port->cq_depth_mask]);
+	rte_prefetch0(&cq_addr[(idx + 28) & qm_port->cq_depth_mask]);
+
+	/* Correct the xor_mask for wrap-around QEs */
+	gen_bit = qm_port->gen_bit;
+	xor_mask[gen_bit] ^= !!((idx +  4) > qm_port->cq_depth_mask) << 1;
+	xor_mask[gen_bit] ^= !!((idx +  8) > qm_port->cq_depth_mask) << 2;
+	xor_mask[gen_bit] ^= !!((idx + 12) > qm_port->cq_depth_mask) << 3;
+
+	/* Read the cache lines backwards to ensure that if QE[N] (N > 0) is
+	 * valid, then QEs[0:N-1] are too.
+	 */
+	qes[3] = _mm_load_si128((__m128i *)(void *)addr[3]);
+	rte_compiler_barrier();
+	qes[2] = _mm_load_si128((__m128i *)(void *)addr[2]);
+	rte_compiler_barrier();
+	qes[1] = _mm_load_si128((__m128i *)(void *)addr[1]);
+	rte_compiler_barrier();
+	qes[0] = _mm_load_si128((__m128i *)(void *)addr[0]);
+
+	/* Extract and combine the gen bits */
+	gen_bits = ((_mm_extract_epi8(qes[0], 15) & 0x1) << 0) |
+		   ((_mm_extract_epi8(qes[1], 15) & 0x1) << 1) |
+		   ((_mm_extract_epi8(qes[2], 15) & 0x1) << 2) |
+		   ((_mm_extract_epi8(qes[3], 15) & 0x1) << 3);
+
+	/* XOR the combined bits such that a 1 represents a valid QE */
+	gen_bits ^= xor_mask[gen_bit];
+
+	/* Mask off gen bits we don't care about */
+	gen_bits &= and_mask;
+
+	return __builtin_popcount(gen_bits);
+}
+
+static inline void
+dlb2_inc_cq_idx(struct dlb2_port *qm_port, int cnt)
+{
+	uint16_t idx = qm_port->cq_idx_unmasked + cnt;
+
+	qm_port->cq_idx_unmasked = idx;
+	qm_port->cq_idx = idx & qm_port->cq_depth_mask;
+	qm_port->gen_bit = (~(idx >> qm_port->gen_bit_shift)) & 0x1;
+}
+
+static int
+dlb2_event_release(struct dlb2_eventdev *dlb2,
+		   uint8_t port_id,
+		   int n)
+{
+	struct process_local_port_data *port_data;
+	struct dlb2_eventdev_port *ev_port;
+	struct dlb2_port *qm_port;
+	int i, cnt;
+
+	if (port_id > dlb2->num_ports) {
+		DLB2_LOG_ERR("Invalid port id %d in dlb2_event_release\n",
+			     port_id);
+		rte_errno = -EINVAL;
+		return rte_errno;
+	}
+
+	ev_port = &dlb2->ev_ports[port_id];
+	qm_port = &ev_port->qm_port;
+	port_data = &dlb2_port[qm_port->id][PORT_TYPE(qm_port)];
+
+	cnt = 0;
+
+	if (qm_port->is_directed) {
+		cnt = n;
+		goto sw_credit_update;
+	}
+
+	for (i = 0; i < n; i += DLB2_NUM_QES_PER_CACHE_LINE) {
+		int j;
+
+		/* Zero-out QEs */
+		qm_port->qe4[0].cmd_byte = 0;
+		qm_port->qe4[1].cmd_byte = 0;
+		qm_port->qe4[2].cmd_byte = 0;
+		qm_port->qe4[3].cmd_byte = 0;
+
+		for (j = 0; j < DLB2_NUM_QES_PER_CACHE_LINE && (i + j) < n; j++)
+			qm_port->qe4[j].cmd_byte = DLB2_COMP_CMD_BYTE;
+
+		qm_port->issued_releases += j;
+
+		if (j == 0)
+			break;
+
+		dlb2_hw_do_enqueue(qm_port, i == 0, port_data);
+
+		cnt += j;
+	}
+
+sw_credit_update:
+	/* each release returns one credit */
+	if (!ev_port->outstanding_releases) {
+		DLB2_LOG_ERR("Unrecoverable application error. Outstanding releases underflowed.\n");
+		rte_errno = -ENOTRECOVERABLE;
+		return rte_errno;
+	}
+
+	ev_port->outstanding_releases -= cnt;
+	ev_port->inflight_credits += cnt;
+
+	/* Replenish s/w credits if enough releases are performed */
+	dlb2_replenish_sw_credits(dlb2, ev_port);
+	return 0;
+}
+
+static inline int16_t
+dlb2_hw_dequeue_sparse(struct dlb2_eventdev *dlb2,
+		       struct dlb2_eventdev_port *ev_port,
+		       struct rte_event *events,
+		       uint16_t max_num,
+		       uint64_t dequeue_timeout_ticks)
+{
+	uint64_t timeout;
+	uint64_t start_ticks = 0ULL;
+	struct dlb2_port *qm_port;
+	int num = 0;
+
+	qm_port = &ev_port->qm_port;
+
+	/* We have a special implementation for waiting. Wait can be:
+	 * 1) no waiting at all
+	 * 2) busy poll only
+	 * 3) wait for interrupt; if woken up and the poll time
+	 *    has expired, then return to the caller
+	 * 4) umonitor/umwait repeatedly up to poll time
+	 */
+
+	/* If configured for per-dequeue wait, use the wait value provided
+	 * to this API. Otherwise use the global value set at eventdev
+	 * configuration time.
+	 */
+	if (!dlb2->global_dequeue_wait)
+		timeout = dequeue_timeout_ticks;
+	else
+		timeout = dlb2->global_dequeue_wait_ticks;
+
+	start_ticks = rte_get_timer_cycles();
+
+	while (num < max_num) {
+		struct dlb2_dequeue_qe qes[DLB2_NUM_QES_PER_CACHE_LINE];
+		int num_avail;
+
+		/* Copy up to 4 QEs from the current cache line into qes */
+		num_avail = dlb2_recv_qe_sparse(qm_port, qes);
+
+		/* But don't process more than the user requested */
+		num_avail = RTE_MIN(num_avail, max_num - num);
+
+		dlb2_inc_cq_idx(qm_port, num_avail << 2);
+
+		if (num_avail == DLB2_NUM_QES_PER_CACHE_LINE)
+			num += dlb2_process_dequeue_four_qes(ev_port,
+							      qm_port,
+							      &events[num],
+							      &qes[0]);
+		else if (num_avail)
+			num += dlb2_process_dequeue_qes(ev_port,
+							 qm_port,
+							 &events[num],
+							 &qes[0],
+							 num_avail);
+		else if ((timeout == 0) || (num > 0))
+			/* Not waiting in any form, or 1+ events received? */
+			break;
+		else if (dlb2_dequeue_wait(dlb2, ev_port, qm_port,
+					   timeout, start_ticks))
+			break;
+	}
+
+	qm_port->owed_tokens += num;
+
+	if (num) {
+
+		dlb2_consume_qe_immediate(qm_port, num);
+
+		ev_port->outstanding_releases += num;
+
+		dlb2_port_credits_inc(qm_port, num);
+	}
+
+	return num;
+}
+
+static __rte_always_inline int
+dlb2_recv_qe(struct dlb2_port *qm_port, struct dlb2_dequeue_qe *qe,
+	     uint8_t *offset)
+{
+	uint8_t xor_mask[2][4] = { {0x0F, 0x0E, 0x0C, 0x08},
+				   {0x00, 0x01, 0x03, 0x07} };
+	uint8_t and_mask[4] = {0x0F, 0x0E, 0x0C, 0x08};
+	volatile struct dlb2_dequeue_qe *cq_addr;
+	__m128i *qes = (__m128i *)qe;
+	uint64_t *cache_line_base;
+	uint8_t gen_bits;
+
+	cq_addr = dlb2_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
+	cq_addr = &cq_addr[qm_port->cq_idx];
+
+	cache_line_base = (void *)(((uintptr_t)cq_addr) & ~0x3F);
+	*offset = ((uintptr_t)cq_addr & 0x30) >> 4;
+
+	/* Load the next CQ cache line from memory. Pack these reads as tight
+	 * as possible to reduce the chance that DLB invalidates the line while
+	 * the CPU is reading it. Read the cache line backwards to ensure that
+	 * if QE[N] (N > 0) is valid, then QEs[0:N-1] are too.
+	 *
+	 * (Valid QEs start at &qe[offset])
+	 */
+	qes[3] = _mm_load_si128((__m128i *)&cache_line_base[6]);
+	qes[2] = _mm_load_si128((__m128i *)&cache_line_base[4]);
+	qes[1] = _mm_load_si128((__m128i *)&cache_line_base[2]);
+	qes[0] = _mm_load_si128((__m128i *)&cache_line_base[0]);
+
+	/* Evict the cache line ASAP */
+	dlb2_cldemote(cache_line_base);
+
+	/* Extract and combine the gen bits */
+	gen_bits = ((_mm_extract_epi8(qes[0], 15) & 0x1) << 0) |
+		   ((_mm_extract_epi8(qes[1], 15) & 0x1) << 1) |
+		   ((_mm_extract_epi8(qes[2], 15) & 0x1) << 2) |
+		   ((_mm_extract_epi8(qes[3], 15) & 0x1) << 3);
+
+	/* XOR the combined bits such that a 1 represents a valid QE */
+	gen_bits ^= xor_mask[qm_port->gen_bit][*offset];
+
+	/* Mask off gen bits we don't care about */
+	gen_bits &= and_mask[*offset];
+
+	return __builtin_popcount(gen_bits);
+}
+
+static inline int16_t
+dlb2_hw_dequeue(struct dlb2_eventdev *dlb2,
+		struct dlb2_eventdev_port *ev_port,
+		struct rte_event *events,
+		uint16_t max_num,
+		uint64_t dequeue_timeout_ticks)
+{
+	uint64_t timeout;
+	uint64_t start_ticks = 0ULL;
+	struct dlb2_port *qm_port;
+	int num = 0;
+
+	qm_port = &ev_port->qm_port;
+
+	/* We have a special implementation for waiting. Wait can be:
+	 * 1) no waiting at all
+	 * 2) busy poll only
+	 * 3) wait for interrupt; if woken up and the poll time
+	 *    has expired, then return to the caller
+	 * 4) umonitor/umwait repeatedly up to poll time
+	 */
+
+	/* If configured for per-dequeue wait, use the wait value provided
+	 * to this API. Otherwise use the global value set at eventdev
+	 * configuration time.
+	 */
+	if (!dlb2->global_dequeue_wait)
+		timeout = dequeue_timeout_ticks;
+	else
+		timeout = dlb2->global_dequeue_wait_ticks;
+
+	start_ticks = rte_get_timer_cycles();
+
+	while (num < max_num) {
+		struct dlb2_dequeue_qe qes[DLB2_NUM_QES_PER_CACHE_LINE];
+		uint8_t offset;
+		int num_avail;
+
+		/* Copy up to 4 QEs from the current cache line into qes */
+		num_avail = dlb2_recv_qe(qm_port, qes, &offset);
+
+		/* But don't process more than the user requested */
+		num_avail = RTE_MIN(num_avail, max_num - num);
+
+		dlb2_inc_cq_idx(qm_port, num_avail);
+
+		if (num_avail == DLB2_NUM_QES_PER_CACHE_LINE)
+			num += dlb2_process_dequeue_four_qes(ev_port,
+							     qm_port,
+							     &events[num],
+							     &qes[offset]);
+		else if (num_avail)
+			num += dlb2_process_dequeue_qes(ev_port,
+							qm_port,
+							&events[num],
+							&qes[offset],
+							num_avail);
+		else if ((timeout == 0) || (num > 0))
+			/* Not waiting in any form, or 1+ events received? */
+			break;
+		else if (dlb2_dequeue_wait(dlb2, ev_port, qm_port,
+					   timeout, start_ticks))
+			break;
+	}
+
+	qm_port->owed_tokens += num;
+
+	if (num) {
+
+		dlb2_consume_qe_immediate(qm_port, num);
+
+		ev_port->outstanding_releases += num;
+
+		dlb2_port_credits_inc(qm_port, num);
+	}
+
+	return num;
+}
+
+static uint16_t
+dlb2_event_dequeue_burst(void *event_port, struct rte_event *ev, uint16_t num,
+			 uint64_t wait)
+{
+	struct dlb2_eventdev_port *ev_port = event_port;
+	struct dlb2_eventdev *dlb2 = ev_port->dlb2;
+	uint16_t cnt;
+
+	RTE_ASSERT(ev_port->setup_done);
+	RTE_ASSERT(ev != NULL);
+
+	if (ev_port->implicit_release && ev_port->outstanding_releases > 0) {
+		uint16_t out_rels = ev_port->outstanding_releases;
+
+		if (dlb2_event_release(dlb2, ev_port->id, out_rels))
+			return 0; /* rte_errno is set */
+
+		DLB2_INC_STAT(ev_port->stats.tx_implicit_rel, out_rels);
+	}
+
+	cnt = dlb2_hw_dequeue(dlb2, ev_port, ev, num, wait);
+
+	DLB2_INC_STAT(ev_port->stats.traffic.total_polls, 1);
+	DLB2_INC_STAT(ev_port->stats.traffic.zero_polls, ((cnt == 0) ? 1 : 0));
+
+	return cnt;
+}
+
+static uint16_t
+dlb2_event_dequeue(void *event_port, struct rte_event *ev, uint64_t wait)
+{
+	return dlb2_event_dequeue_burst(event_port, ev, 1, wait);
+}
+
+static uint16_t
+dlb2_event_dequeue_burst_sparse(void *event_port, struct rte_event *ev,
+				uint16_t num, uint64_t wait)
+{
+	struct dlb2_eventdev_port *ev_port = event_port;
+	struct dlb2_eventdev *dlb2 = ev_port->dlb2;
+	uint16_t cnt;
+
+	RTE_ASSERT(ev_port->setup_done);
+	RTE_ASSERT(ev != NULL);
+
+	if (ev_port->implicit_release && ev_port->outstanding_releases > 0) {
+		uint16_t out_rels = ev_port->outstanding_releases;
+
+		if (dlb2_event_release(dlb2, ev_port->id, out_rels))
+			return 0; /* rte_errno is set */
+
+		DLB2_INC_STAT(ev_port->stats.tx_implicit_rel, out_rels);
+	}
+
+	cnt = dlb2_hw_dequeue_sparse(dlb2, ev_port, ev, num, wait);
+
+	DLB2_INC_STAT(ev_port->stats.traffic.total_polls, 1);
+	DLB2_INC_STAT(ev_port->stats.traffic.zero_polls, ((cnt == 0) ? 1 : 0));
+	return cnt;
+}
+
+static uint16_t
+dlb2_event_dequeue_sparse(void *event_port, struct rte_event *ev,
+			  uint64_t wait)
+{
+	return dlb2_event_dequeue_burst_sparse(event_port, ev, 1, wait);
+}
+
 static void
 dlb2_entry_points_init(struct rte_eventdev *dev)
 {
+	struct dlb2_eventdev *dlb2;
+
 	/* Expose PMD's eventdev interface */
 	static struct rte_eventdev_ops dlb2_eventdev_entry_ops = {
 		.dev_infos_get    = dlb2_eventdev_info_get,
@@ -2653,6 +3427,15 @@ dlb2_entry_points_init(struct rte_eventdev *dev)
 	dev->enqueue_burst = dlb2_event_enqueue_burst;
 	dev->enqueue_new_burst = dlb2_event_enqueue_new_burst;
 	dev->enqueue_forward_burst = dlb2_event_enqueue_forward_burst;
+
+	dlb2 = dev->data->dev_private;
+	if (dlb2->poll_mode == DLB2_CQ_POLL_MODE_SPARSE) {
+		dev->dequeue = dlb2_event_dequeue_sparse;
+		dev->dequeue_burst = dlb2_event_dequeue_burst_sparse;
+	} else {
+		dev->dequeue = dlb2_event_dequeue;
+		dev->dequeue_burst = dlb2_event_dequeue_burst;
+	}
 }
 
 int
-- 
2.6.4


