[dpdk-dev] [PATCH v3 22/46] net/liquidio: add Rx data path

Shijith Thotton shijith.thotton at caviumnetworks.com
Sat Mar 25 07:24:33 CET 2017


Add APIs to receive packets and re-fill ring buffers.

Signed-off-by: Shijith Thotton <shijith.thotton at caviumnetworks.com>
Signed-off-by: Jerin Jacob <jerin.jacob at caviumnetworks.com>
Signed-off-by: Derek Chickles <derek.chickles at caviumnetworks.com>
Signed-off-by: Venkat Koppula <venkat.koppula at caviumnetworks.com>
Signed-off-by: Srisivasubramanian S <ssrinivasan at caviumnetworks.com>
Signed-off-by: Mallesham Jatharakonda <mjatharakonda at oneconvergence.com>
---
 doc/guides/nics/features/liquidio.ini   |   4 +
 drivers/net/liquidio/base/lio_hw_defs.h |  12 +
 drivers/net/liquidio/lio_ethdev.c       |   5 +
 drivers/net/liquidio/lio_rxtx.c         | 380 ++++++++++++++++++++++++++++++++
 drivers/net/liquidio/lio_rxtx.h         |  13 ++
 5 files changed, 414 insertions(+)

diff --git a/doc/guides/nics/features/liquidio.ini b/doc/guides/nics/features/liquidio.ini
index 6c5d8d1..554d921 100644
--- a/doc/guides/nics/features/liquidio.ini
+++ b/doc/guides/nics/features/liquidio.ini
@@ -4,6 +4,10 @@
 ; Refer to default.ini for the full list of available PMD features.
 ;
 [Features]
+Scattered Rx         = Y
+CRC offload          = Y
+L3 checksum offload  = Y
+L4 checksum offload  = Y
 Multiprocess aware   = Y
 Linux UIO            = Y
 Linux VFIO           = Y
diff --git a/drivers/net/liquidio/base/lio_hw_defs.h b/drivers/net/liquidio/base/lio_hw_defs.h
index 4271730..2db7085 100644
--- a/drivers/net/liquidio/base/lio_hw_defs.h
+++ b/drivers/net/liquidio/base/lio_hw_defs.h
@@ -112,12 +112,24 @@ enum octeon_tag_type {
 /* used for NIC operations */
 #define LIO_OPCODE	1
 
+/* Subcodes are used by host driver/apps to identify the sub-operation
+ * for the core. They only need to be unique for a given subsystem.
+ */
+#define LIO_OPCODE_SUBCODE(op, sub)		\
+		((((op) & 0x0f) << 8) | ((sub) & 0x7f))
+
 /** LIO_OPCODE subcodes */
 /* This subcode is sent by core PCI driver to indicate cores are ready. */
+#define LIO_OPCODE_NW_DATA		0x02 /* network packet data */
 #define LIO_OPCODE_IF_CFG		0x09
 
 #define LIO_MAX_RX_PKTLEN		(64 * 1024)
 
+/* RX (packets coming from the wire) checksum verification flags */
+/* TCP/UDP csum */
+#define LIO_L4_CSUM_VERIFIED		0x1
+#define LIO_IP_CSUM_VERIFIED		0x2
+
 /* Interface flags communicated between host driver and core app. */
 enum lio_ifflags {
 	LIO_IFFLAG_UNICAST	= 0x10
diff --git a/drivers/net/liquidio/lio_ethdev.c b/drivers/net/liquidio/lio_ethdev.c
index a93fa4a..ebfdf7a 100644
--- a/drivers/net/liquidio/lio_ethdev.c
+++ b/drivers/net/liquidio/lio_ethdev.c
@@ -404,6 +404,8 @@ static int lio_dev_configure(struct rte_eth_dev *eth_dev)
 	rte_free(eth_dev->data->mac_addrs);
 	eth_dev->data->mac_addrs = NULL;
 
+	eth_dev->rx_pkt_burst = NULL;
+
 	return 0;
 }
 
@@ -415,6 +417,8 @@ static int lio_dev_configure(struct rte_eth_dev *eth_dev)
 
 	PMD_INIT_FUNC_TRACE();
 
+	eth_dev->rx_pkt_burst = &lio_dev_recv_pkts;
+
 	/* Primary does the initialization. */
 	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
 		return 0;
@@ -448,6 +452,7 @@ static int lio_dev_configure(struct rte_eth_dev *eth_dev)
 		lio_dev_err(lio_dev,
 			    "MAC addresses memory allocation failed\n");
 		eth_dev->dev_ops = NULL;
+		eth_dev->rx_pkt_burst = NULL;
 		return -ENOMEM;
 	}
 
diff --git a/drivers/net/liquidio/lio_rxtx.c b/drivers/net/liquidio/lio_rxtx.c
index 9948023..9e4da3a 100644
--- a/drivers/net/liquidio/lio_rxtx.c
+++ b/drivers/net/liquidio/lio_rxtx.c
@@ -326,6 +326,386 @@
 	return 0;
 }
 
+/* Return the number of ring buffers of size buf_size needed to hold
+ * total_len bytes, i.e. the ceiling of total_len / buf_size
+ * (0 when total_len is 0).
+ */
+static inline uint32_t
+lio_droq_get_bufcount(uint32_t buf_size, uint32_t total_len)
+{
+	uint32_t buf_cnt = 0;
+
+	while (total_len > (buf_size * buf_cnt))
+		buf_cnt++;
+
+	return buf_cnt;
+}
+
+/* If we were not able to refill all buffers, try to move around
+ * the buffers that were not dispatched.
+ *
+ * Scans the descriptors between refill_idx and read_idx for entries
+ * that still own a buffer (i.e. were never handed to the upper layer)
+ * and slides each such buffer down to the current refill position so
+ * that the refilled region of the ring stays contiguous.
+ *
+ * Returns the number of descriptors made usable this way.
+ * Locks: called with droq->lock held (via lio_droq_refill).
+ */
+static inline uint32_t
+lio_droq_refill_pullup_descs(struct lio_droq *droq,
+			     struct lio_droq_desc *desc_ring)
+{
+	uint32_t refill_index = droq->refill_idx;
+	uint32_t desc_refilled = 0;
+
+	while (refill_index != droq->read_idx) {
+		if (droq->recv_buf_list[refill_index].buffer) {
+			/* Move the undispatched buffer (and its DMA address)
+			 * down to the refill slot, then clear the source slot.
+			 */
+			droq->recv_buf_list[droq->refill_idx].buffer =
+				droq->recv_buf_list[refill_index].buffer;
+			desc_ring[droq->refill_idx].buffer_ptr =
+				desc_ring[refill_index].buffer_ptr;
+			droq->recv_buf_list[refill_index].buffer = NULL;
+			desc_ring[refill_index].buffer_ptr = 0;
+			/* Skip ahead over any slots that already hold a
+			 * buffer so refill_idx always points at a gap.
+			 */
+			do {
+				droq->refill_idx = lio_incr_index(
+							droq->refill_idx, 1,
+							droq->max_count);
+				desc_refilled++;
+				droq->refill_count--;
+			} while (droq->recv_buf_list[droq->refill_idx].buffer);
+		}
+		refill_index = lio_incr_index(refill_index, 1,
+					      droq->max_count);
+	}	/* while */
+
+	return desc_refilled;
+}
+
+/* lio_droq_refill
+ *
+ * @param lio_dev	- pointer to the lio device structure
+ * @param droq		- droq in which descriptors require new buffers.
+ *
+ * Description:
+ *  Called during normal DROQ processing in interrupt mode or by the poll
+ *  thread to refill the descriptors from which buffers were dispatched
+ *  to upper layers. Attempts to allocate new buffers. If that fails, moves
+ *  up buffers (that were not dispatched) to form a contiguous ring.
+ *
+ * Returns:
+ *  Number of descriptors refilled.
+ *
+ * Locks:
+ * This routine is called with droq->lock held.
+ */
+static uint32_t
+lio_droq_refill(struct lio_device *lio_dev, struct lio_droq *droq)
+{
+	struct lio_droq_desc *desc_ring;
+	uint32_t desc_refilled = 0;
+	void *buf = NULL;
+
+	desc_ring = droq->desc_ring;
+
+	while (droq->refill_count && (desc_refilled < droq->max_count)) {
+		/* If a valid buffer exists (happens if there is no dispatch),
+		 * reuse the buffer, else allocate.
+		 */
+		if (droq->recv_buf_list[droq->refill_idx].buffer == NULL) {
+			buf = lio_recv_buffer_alloc(lio_dev, droq->q_no);
+			/* If a buffer could not be allocated, no point in
+			 * continuing; the pullup pass below will compact
+			 * whatever buffers remain.
+			 */
+			if (buf == NULL)
+				break;
+
+			droq->recv_buf_list[droq->refill_idx].buffer = buf;
+		}
+
+		desc_ring[droq->refill_idx].buffer_ptr =
+		    lio_map_ring(droq->recv_buf_list[droq->refill_idx].buffer);
+		/* Reset any previous values in the length field. */
+		droq->info_list[droq->refill_idx].length = 0;
+
+		droq->refill_idx = lio_incr_index(droq->refill_idx, 1,
+						  droq->max_count);
+		desc_refilled++;
+		droq->refill_count--;
+	}
+
+	if (droq->refill_count)
+		desc_refilled += lio_droq_refill_pullup_descs(droq, desc_ring);
+
+	/* if droq->refill_count
+	 * The refill count would not change in pass two. We only moved buffers
+	 * to close the gap in the ring, but we would still have the same no. of
+	 * buffers to refill.
+	 */
+	return desc_refilled;
+}
+
+/* Process the packet at droq->read_idx and, if it carries network data,
+ * hand it to the caller as a (possibly chained) rte_mbuf.
+ *
+ * Slow-path (non-NW-DATA) packets are skipped: their buffers are only
+ * marked for refill and nothing is returned to the caller.  A packet
+ * larger than one ring buffer is reassembled into an mbuf chain headed
+ * by the first segment.  Refills the ring and credits the hardware when
+ * refill_count crosses the threshold.
+ *
+ * Returns the number of mbufs written to rx_pkts (0 or 1), or -1 when
+ * the info entry reports zero length (no packet available yet).
+ *
+ * Locks: called with droq->lock held.
+ */
+static int
+lio_droq_fast_process_packet(struct lio_device *lio_dev,
+			     struct lio_droq *droq,
+			     struct rte_mbuf **rx_pkts)
+{
+	struct rte_mbuf *nicbuf = NULL;
+	struct lio_droq_info *info;
+	uint32_t total_len = 0;
+	int data_total_len = 0;
+	uint32_t pkt_len = 0;
+	union octeon_rh *rh;
+	int data_pkts = 0;
+
+	/* The info structure is written by the device; swap its first two
+	 * 8-byte words (length and response header) to host byte order.
+	 */
+	info = &droq->info_list[droq->read_idx];
+	lio_swap_8B_data((uint64_t *)info, 2);
+
+	if (!info->length)
+		return -1;
+
+	/* Len of resp hdr is included in the received data len. */
+	info->length -= OCTEON_RH_SIZE;
+	rh = &info->rh;
+
+	total_len += (uint32_t)info->length;
+
+	if (lio_opcode_slow_path(rh)) {
+		uint32_t buf_cnt;
+
+		/* Control/slow-path packet: skip all buffers it occupies
+		 * without building an mbuf.
+		 */
+		buf_cnt = lio_droq_get_bufcount(droq->buffer_size,
+						(uint32_t)info->length);
+		droq->read_idx = lio_incr_index(droq->read_idx, buf_cnt,
+						droq->max_count);
+		droq->refill_count += buf_cnt;
+	} else {
+		if (info->length <= droq->buffer_size) {
+			/* Single-buffer packet. An 8-byte RSS hash, when
+			 * present, precedes the frame data.
+			 */
+			if (rh->r_dh.has_hash)
+				pkt_len = (uint32_t)(info->length - 8);
+			else
+				pkt_len = (uint32_t)info->length;
+
+			nicbuf = droq->recv_buf_list[droq->read_idx].buffer;
+			droq->recv_buf_list[droq->read_idx].buffer = NULL;
+			droq->read_idx = lio_incr_index(
+						droq->read_idx, 1,
+						droq->max_count);
+			droq->refill_count++;
+
+			if (likely(nicbuf != NULL)) {
+				nicbuf->data_off = RTE_PKTMBUF_HEADROOM;
+				nicbuf->nb_segs = 1;
+				nicbuf->next = NULL;
+				/* We don't have a way to pass flags yet */
+				nicbuf->ol_flags = 0;
+				if (rh->r_dh.has_hash) {
+					uint64_t *hash_ptr;
+
+					nicbuf->ol_flags |= PKT_RX_RSS_HASH;
+					hash_ptr = rte_pktmbuf_mtod(nicbuf,
+								    uint64_t *);
+					lio_swap_8B_data(hash_ptr, 1);
+					nicbuf->hash.rss = (uint32_t)*hash_ptr;
+					nicbuf->data_off += 8;
+				}
+
+				nicbuf->pkt_len = pkt_len;
+				nicbuf->data_len = pkt_len;
+				nicbuf->port = lio_dev->port_id;
+				/* Store the mbuf */
+				rx_pkts[data_pkts++] = nicbuf;
+				data_total_len += pkt_len;
+			}
+
+			/* Prefetch buffer pointers when on a cache line
+			 * boundary
+			 */
+			if ((droq->read_idx & 3) == 0) {
+				rte_prefetch0(
+				    &droq->recv_buf_list[droq->read_idx]);
+				rte_prefetch0(
+				    &droq->info_list[droq->read_idx]);
+			}
+		} else {
+			/* Multi-buffer packet: chain the ring buffers into
+			 * a single segmented mbuf headed by first_buf.
+			 */
+			struct rte_mbuf *first_buf = NULL;
+			struct rte_mbuf *last_buf = NULL;
+
+			while (pkt_len < info->length) {
+				int cpy_len = 0;
+
+				/* Bytes carried by this segment: a full ring
+				 * buffer, or the remainder for the last one.
+				 */
+				cpy_len = ((pkt_len + droq->buffer_size) >
+						info->length)
+						? ((uint32_t)info->length -
+							pkt_len)
+						: droq->buffer_size;
+
+				nicbuf =
+				    droq->recv_buf_list[droq->read_idx].buffer;
+				droq->recv_buf_list[droq->read_idx].buffer =
+				    NULL;
+
+				if (likely(nicbuf != NULL)) {
+					/* Note the first seg */
+					if (!pkt_len)
+						first_buf = nicbuf;
+
+					nicbuf->data_off = RTE_PKTMBUF_HEADROOM;
+					nicbuf->nb_segs = 1;
+					nicbuf->next = NULL;
+					nicbuf->port = lio_dev->port_id;
+					/* We don't have a way to pass
+					 * flags yet
+					 */
+					nicbuf->ol_flags = 0;
+					/* Hash, when present, occupies the
+					 * first 8 bytes of the first segment.
+					 */
+					if ((!pkt_len) && (rh->r_dh.has_hash)) {
+						uint64_t *hash_ptr;
+
+						nicbuf->ol_flags |=
+						    PKT_RX_RSS_HASH;
+						hash_ptr = rte_pktmbuf_mtod(
+						    nicbuf, uint64_t *);
+						lio_swap_8B_data(hash_ptr, 1);
+						nicbuf->hash.rss =
+						    (uint32_t)*hash_ptr;
+						nicbuf->data_off += 8;
+						nicbuf->pkt_len = cpy_len - 8;
+						nicbuf->data_len = cpy_len - 8;
+					} else {
+						nicbuf->pkt_len = cpy_len;
+						nicbuf->data_len = cpy_len;
+					}
+
+					if (pkt_len)
+						first_buf->nb_segs++;
+
+					if (last_buf)
+						last_buf->next = nicbuf;
+
+					last_buf = nicbuf;
+				} else {
+					PMD_RX_LOG(lio_dev, ERR, "no buf\n");
+				}
+
+				pkt_len += cpy_len;
+				droq->read_idx = lio_incr_index(
+							droq->read_idx,
+							1, droq->max_count);
+				droq->refill_count++;
+
+				/* Prefetch buffer pointers when on a
+				 * cache line boundary
+				 */
+				if ((droq->read_idx & 3) == 0) {
+					rte_prefetch0(&droq->recv_buf_list
+							      [droq->read_idx]);
+
+					rte_prefetch0(
+					    &droq->info_list[droq->read_idx]);
+				}
+			}
+			rx_pkts[data_pkts++] = first_buf;
+			if (rh->r_dh.has_hash)
+				data_total_len += (pkt_len - 8);
+			else
+				data_total_len += pkt_len;
+		}
+
+		/* Inform upper layer about packet checksum verification */
+		/* NOTE(review): if the mbuf above was NULL (allocation had
+		 * failed earlier), data_pkts can still be 0 here and this
+		 * would index rx_pkts[-1] / dereference NULL — verify.
+		 */
+		struct rte_mbuf *m = rx_pkts[data_pkts - 1];
+
+		if (rh->r_dh.csum_verified & LIO_IP_CSUM_VERIFIED)
+			m->ol_flags |= PKT_RX_IP_CKSUM_GOOD;
+
+		if (rh->r_dh.csum_verified & LIO_L4_CSUM_VERIFIED)
+			m->ol_flags |= PKT_RX_L4_CKSUM_GOOD;
+	}
+
+	if (droq->refill_count >= droq->refill_threshold) {
+		int desc_refilled = lio_droq_refill(lio_dev, droq);
+
+		/* Flush the droq descriptor data to memory to be sure
+		 * that when we update the credits the data in memory is
+		 * accurate.
+		 */
+		rte_wmb();
+		rte_write32(desc_refilled, droq->pkts_credit_reg);
+		/* make sure mmio write completes */
+		rte_wmb();
+	}
+
+	/* Clear the entry so a stale length is never re-processed. */
+	info->length = 0;
+	info->rh.rh64 = 0;
+
+	return data_pkts;
+}
+
+/* Drain up to pkts_to_process packets from the droq into rx_pkts.
+ *
+ * Stops early (with an error log) when the next info entry reports a
+ * zero length.  pkts_pending is decremented by the number of entries
+ * actually consumed (`pkt`), not by the requested count.
+ *
+ * Returns the number of mbufs stored in rx_pkts.
+ * Locks: called with droq->lock held.
+ */
+static uint32_t
+lio_droq_fast_process_packets(struct lio_device *lio_dev,
+			      struct lio_droq *droq,
+			      struct rte_mbuf **rx_pkts,
+			      uint32_t pkts_to_process)
+{
+	int ret, data_pkts = 0;
+	uint32_t pkt;
+
+	for (pkt = 0; pkt < pkts_to_process; pkt++) {
+		ret = lio_droq_fast_process_packet(lio_dev, droq,
+						   &rx_pkts[data_pkts]);
+		if (ret < 0) {
+			lio_dev_err(lio_dev, "Port[%d] DROQ[%d] idx: %d len:0, pkt_cnt: %d\n",
+				    lio_dev->port_id, droq->q_no,
+				    droq->read_idx, pkts_to_process);
+			break;
+		}
+		data_pkts += ret;
+	}
+
+	rte_atomic64_sub(&droq->pkts_pending, pkt);
+
+	return data_pkts;
+}
+
+/* Read the hardware "packets sent" register and fold any newly arrived
+ * packets into droq->pkts_pending.  The unsigned subtraction yields the
+ * correct delta since the previous poll even if the hardware counter
+ * wraps around.
+ *
+ * Returns the number of packets that arrived since the previous call.
+ */
+static inline uint32_t
+lio_droq_check_hw_for_pkts(struct lio_droq *droq)
+{
+	uint32_t last_count;
+	uint32_t pkt_count;
+
+	pkt_count = rte_read32(droq->pkts_sent_reg);
+
+	last_count = pkt_count - droq->pkt_count;
+	droq->pkt_count = pkt_count;
+
+	if (last_count)
+		rte_atomic64_add(&droq->pkts_pending, last_count);
+
+	return last_count;
+}
+
+/* Burst-mode Rx handler (registered as eth_dev->rx_pkt_burst).
+ *
+ * @param rx_queue	- the droq registered as the queue handle
+ * @param rx_pkts	- output array for received mbufs
+ * @param budget	- maximum number of packets to retrieve
+ *
+ * Syncs the pending-packet count from hardware, processes at most
+ * budget packets under droq->lock, then writes the raw hw counter
+ * value back to the pkts_sent register to acknowledge them.
+ *
+ * Returns the number of mbufs stored in rx_pkts.
+ */
+uint16_t
+lio_dev_recv_pkts(void *rx_queue,
+		  struct rte_mbuf **rx_pkts,
+		  uint16_t budget)
+{
+	struct lio_droq *droq = rx_queue;
+	struct lio_device *lio_dev = droq->lio_dev;
+	uint32_t pkts_processed = 0;
+	uint32_t pkt_count = 0;
+
+	lio_droq_check_hw_for_pkts(droq);
+
+	pkt_count = rte_atomic64_read(&droq->pkts_pending);
+	if (!pkt_count)
+		return 0;
+
+	/* Never hand back more than the caller asked for. */
+	if (pkt_count > budget)
+		pkt_count = budget;
+
+	/* Grab the lock */
+	rte_spinlock_lock(&droq->lock);
+	pkts_processed = lio_droq_fast_process_packets(lio_dev,
+						       droq, rx_pkts,
+						       pkt_count);
+
+	if (droq->pkt_count) {
+		rte_write32(droq->pkt_count, droq->pkts_sent_reg);
+		droq->pkt_count = 0;
+	}
+
+	/* Release the spin lock */
+	rte_spinlock_unlock(&droq->lock);
+
+	return pkts_processed;
+}
+
 /**
  *  lio_init_instr_queue()
  *  @param lio_dev	- pointer to the lio device structure.
diff --git a/drivers/net/liquidio/lio_rxtx.h b/drivers/net/liquidio/lio_rxtx.h
index fc623ad..420b893 100644
--- a/drivers/net/liquidio/lio_rxtx.h
+++ b/drivers/net/liquidio/lio_rxtx.h
@@ -515,6 +515,17 @@ enum {
 	return (uint64_t)dma_addr;
 }
 
+/* Return non-zero when the receive header identifies anything other
+ * than a fast-path network data packet (LIO_OPCODE / LIO_OPCODE_NW_DATA),
+ * i.e. the packet must take the slow (control) path.
+ */
+static inline int
+lio_opcode_slow_path(union octeon_rh *rh)
+{
+	uint16_t subcode1, subcode2;
+
+	subcode1 = LIO_OPCODE_SUBCODE(rh->r.opcode, rh->r.subcode);
+	subcode2 = LIO_OPCODE_SUBCODE(LIO_OPCODE, LIO_OPCODE_NW_DATA);
+
+	return subcode2 != subcode1;
+}
+
 /* Macro to increment index.
  * Index is incremented by count; if the sum exceeds
  * max, index is wrapped-around to the start.
@@ -533,6 +544,8 @@ enum {
 int lio_setup_droq(struct lio_device *lio_dev, int q_no, int num_descs,
 		   int desc_size, struct rte_mempool *mpool,
 		   unsigned int socket_id);
+uint16_t lio_dev_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
+			   uint16_t budget);
 
 /** Setup instruction queue zero for the device
  *  @param lio_dev which lio device to setup
-- 
1.8.3.1



More information about the dev mailing list