[dpdk-dev] [RFC 4/7] net/af_xdp: use mbuf mempool for buffer management

Qi Zhang qi.z.zhang at intel.com
Tue Feb 27 10:35:51 CET 2018


Now, af_xdp registered memory buffer is managed by rte_mempool.
mbuf be allocated from rte_mempool can be convert to descriptor
index and vice versa.

Signed-off-by: Qi Zhang <qi.z.zhang at intel.com>
---
 drivers/net/af_xdp/rte_eth_af_xdp.c | 165 +++++++++++++++++++++---------------
 1 file changed, 97 insertions(+), 68 deletions(-)

diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c b/drivers/net/af_xdp/rte_eth_af_xdp.c
index 4eb8a2c28..3c534c77c 100644
--- a/drivers/net/af_xdp/rte_eth_af_xdp.c
+++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
@@ -43,7 +43,11 @@
 
 #define ETH_AF_XDP_FRAME_SIZE		2048
 #define ETH_AF_XDP_NUM_BUFFERS		131072
-#define ETH_AF_XDP_DATA_HEADROOM	0
+/* mempool hdrobj size (64 bytes) + sizeof(struct rte_mbuf) (128 bytes) */
+#define ETH_AF_XDP_MBUF_OVERHEAD	192
+/* data start from offset 320 (192 + 128) bytes */
+#define ETH_AF_XDP_DATA_HEADROOM \
+	(ETH_AF_XDP_MBUF_OVERHEAD + RTE_PKTMBUF_HEADROOM)
 #define ETH_AF_XDP_DFLT_RING_SIZE	1024
 #define ETH_AF_XDP_DFLT_QUEUE_IDX	0
 
@@ -57,6 +61,7 @@ struct xdp_umem {
 	unsigned int frame_size_log2;
 	unsigned int nframes;
 	int mr_fd;
+	struct rte_mempool *mb_pool;
 };
 
 struct pmd_internals {
@@ -67,7 +72,7 @@ struct pmd_internals {
 	struct xdp_queue rx;
 	struct xdp_queue tx;
 	struct xdp_umem *umem;
-	struct rte_mempool *mb_pool;
+	struct rte_mempool *ext_mb_pool;
 
 	unsigned long rx_pkts;
 	unsigned long rx_bytes;
@@ -80,7 +85,6 @@ struct pmd_internals {
 	uint16_t port_id;
 	uint16_t queue_idx;
 	int ring_size;
-	struct rte_ring *buf_ring;
 };
 
 static const char * const valid_arguments[] = {
@@ -106,6 +110,21 @@ static void *get_pkt_data(struct pmd_internals *internals,
 			   offset);
 }
 
+static uint32_t
+mbuf_to_idx(struct pmd_internals *internals, struct rte_mbuf *mbuf)
+{
+	return (uint32_t)(((uint64_t)mbuf->buf_addr -
+			   (uint64_t)internals->umem->buffer) >>
+			  internals->umem->frame_size_log2);
+}
+
+static struct rte_mbuf *
+idx_to_mbuf(struct pmd_internals *internals, uint32_t idx)
+{
+	return (struct rte_mbuf *)(void *)(internals->umem->buffer + (idx
+			<< internals->umem->frame_size_log2) + 0x40);
+}
+
 static uint16_t
 eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 {
@@ -120,17 +139,18 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 		  nb_pkts : ETH_AF_XDP_RX_BATCH_SIZE;
 
 	struct xdp_desc descs[ETH_AF_XDP_RX_BATCH_SIZE];
-	void *indexes[ETH_AF_XDP_RX_BATCH_SIZE];
+	struct rte_mbuf *mbufs[ETH_AF_XDP_RX_BATCH_SIZE];
 	int rcvd, i;
 	/* fill rx ring */
 	if (rxq->num_free >= ETH_AF_XDP_RX_BATCH_SIZE) {
-		int n = rte_ring_dequeue_bulk(internals->buf_ring,
-					      indexes,
-					      ETH_AF_XDP_RX_BATCH_SIZE,
-					      NULL);
-		for (i = 0; i < n; i++)
-			descs[i].idx = (uint32_t)((long int)indexes[i]);
-		xq_enq(rxq, descs, n);
+		int ret = rte_mempool_get_bulk(internals->umem->mb_pool,
+					     (void *)mbufs,
+					     ETH_AF_XDP_RX_BATCH_SIZE);
+		if (!ret) {
+			for (i = 0; i < ETH_AF_XDP_RX_BATCH_SIZE; i++)
+				descs[i].idx = mbuf_to_idx(internals, mbufs[i]);
+			xq_enq(rxq, descs, ETH_AF_XDP_RX_BATCH_SIZE);
+		}
 	}
 
 	/* read data */
@@ -142,7 +162,7 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 		char *pkt;
 		uint32_t idx = descs[i].idx;
 
-		mbuf = rte_pktmbuf_alloc(internals->mb_pool);
+		mbuf = rte_pktmbuf_alloc(internals->ext_mb_pool);
 		rte_pktmbuf_pkt_len(mbuf) =
 			rte_pktmbuf_data_len(mbuf) =
 			descs[i].len;
@@ -155,11 +175,9 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 		} else {
 			dropped++;
 		}
-		indexes[i] = (void *)((long int)idx);
+		rte_pktmbuf_free(idx_to_mbuf(internals, idx));
 	}
 
-	rte_ring_enqueue_bulk(internals->buf_ring, indexes, rcvd, NULL);
-
 	internals->rx_pkts += (rcvd - dropped);
 	internals->rx_bytes += rx_bytes;
 	internals->rx_dropped += dropped;
@@ -187,9 +205,10 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 	struct xdp_queue *txq = &internals->tx;
 	struct rte_mbuf *mbuf;
 	struct xdp_desc descs[ETH_AF_XDP_TX_BATCH_SIZE];
-	void *indexes[ETH_AF_XDP_TX_BATCH_SIZE];
+	struct rte_mbuf *mbufs[ETH_AF_XDP_TX_BATCH_SIZE];
 	uint16_t i, valid;
 	unsigned long tx_bytes = 0;
+	int ret;
 
 	nb_pkts = nb_pkts < ETH_AF_XDP_TX_BATCH_SIZE ?
 		  nb_pkts : ETH_AF_XDP_TX_BATCH_SIZE;
@@ -198,13 +217,15 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 		int n = xq_deq(txq, descs, ETH_AF_XDP_TX_BATCH_SIZE);
 
 		for (i = 0; i < n; i++)
-			indexes[i] = (void *)((long int)descs[i].idx);
-		rte_ring_enqueue_bulk(internals->buf_ring, indexes, n, NULL);
+			rte_pktmbuf_free(idx_to_mbuf(internals, descs[i].idx));
 	}
 
 	nb_pkts = nb_pkts > txq->num_free ? txq->num_free : nb_pkts;
-	nb_pkts = rte_ring_dequeue_bulk(internals->buf_ring, indexes,
-					nb_pkts, NULL);
+	ret = rte_mempool_get_bulk(internals->umem->mb_pool,
+				   (void *)mbufs,
+				   nb_pkts);
+	if (ret)
+		return 0;
 
 	valid = 0;
 	for (i = 0; i < nb_pkts; i++) {
@@ -213,14 +234,14 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 			internals->umem->frame_size - ETH_AF_XDP_DATA_HEADROOM;
 		mbuf = bufs[i];
 		if (mbuf->pkt_len <= buf_len) {
-			descs[valid].idx = (uint32_t)((long int)indexes[valid]);
+			descs[valid].idx = mbuf_to_idx(internals, mbufs[i]);
 			descs[valid].offset = ETH_AF_XDP_DATA_HEADROOM;
 			descs[valid].flags = 0;
 			descs[valid].len = mbuf->pkt_len;
 			pkt = get_pkt_data(internals, descs[i].idx,
 					   descs[i].offset);
 			memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *),
-			       descs[i].len);
+					   descs[i].len);
 			valid++;
 			tx_bytes += mbuf->pkt_len;
 		}
@@ -230,9 +251,10 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 	xq_enq(txq, descs, valid);
 	kick_tx(internals->sfd);
 
-	if (valid < nb_pkts)
-		rte_ring_enqueue_bulk(internals->buf_ring, &indexes[valid],
-				      nb_pkts - valid, NULL);
+	if (valid < nb_pkts) {
+		for (i = valid; i < nb_pkts; i++)
+			rte_pktmbuf_free(mbufs[i]);
+	}
 
 	internals->err_pkts += (nb_pkts - valid);
 	internals->tx_pkts += valid;
@@ -245,14 +267,13 @@ static void
 fill_rx_desc(struct pmd_internals *internals)
 {
 	int num_free = internals->rx.num_free;
-	void *p = NULL;
 	int i;
-
 	for (i = 0; i < num_free; i++) {
 		struct xdp_desc desc = {};
+		struct rte_mbuf *mbuf =
+			rte_pktmbuf_alloc(internals->umem->mb_pool);
 
-		rte_ring_dequeue(internals->buf_ring, &p);
-		desc.idx = (uint32_t)((long int)p);
+		desc.idx = mbuf_to_idx(internals, mbuf);
 		xq_enq(&internals->rx, &desc, 1);
 	}
 }
@@ -347,33 +368,53 @@ eth_link_update(struct rte_eth_dev *dev __rte_unused,
 	return 0;
 }
 
-static struct xdp_umem *xsk_alloc_and_mem_reg_buffers(int sfd, size_t nbuffers)
+static void *get_base_addr(struct rte_mempool *mb_pool)
+{
+	struct rte_mempool_memhdr *memhdr;
+
+	STAILQ_FOREACH(memhdr, &mb_pool->mem_list, next) {
+		return memhdr->addr;
+	}
+	return NULL;
+}
+
+static struct xdp_umem *xsk_alloc_and_mem_reg_buffers(int sfd,
+						      size_t nbuffers,
+						      const char *pool_name)
 {
 	struct xdp_mr_req req = { .frame_size = ETH_AF_XDP_FRAME_SIZE,
 				  .data_headroom = ETH_AF_XDP_DATA_HEADROOM };
-	struct xdp_umem *umem;
-	void *bufs;
-	int ret;
+	struct xdp_umem *umem = calloc(1, sizeof(*umem));
 
-	ret = posix_memalign((void **)&bufs, getpagesize(),
-			     nbuffers * req.frame_size);
-	if (ret)
+	if (!umem)
+		return NULL;
+
+	umem->mb_pool =
+		rte_pktmbuf_pool_create_with_flags(
+			pool_name, nbuffers,
+			250, 0,
+			(ETH_AF_XDP_FRAME_SIZE - ETH_AF_XDP_MBUF_OVERHEAD),
+			MEMPOOL_F_NO_SPREAD | MEMPOOL_F_PAGE_ALIGN,
+			SOCKET_ID_ANY);
+
+	if (!umem->mb_pool) {
+		free(umem);
 		return NULL;
+	}
 
-	umem = calloc(1, sizeof(*umem));
-	if (!umem) {
-		free(bufs);
+	if (umem->mb_pool->nb_mem_chunks > 1) {
+		rte_mempool_free(umem->mb_pool);
+		free(umem);
 		return NULL;
 	}
 
-	req.addr = (unsigned long)bufs;
+	req.addr = (uint64_t)get_base_addr(umem->mb_pool);
 	req.len = nbuffers * req.frame_size;
-	ret = setsockopt(sfd, SOL_XDP, XDP_MEM_REG, &req, sizeof(req));
-	RTE_ASSERT(ret == 0);
+	setsockopt(sfd, SOL_XDP, XDP_MEM_REG, &req, sizeof(req));
 
 	umem->frame_size = ETH_AF_XDP_FRAME_SIZE;
 	umem->frame_size_log2 = 11;
-	umem->buffer = bufs;
+	umem->buffer = (char *)req.addr;
 	umem->size = nbuffers * req.frame_size;
 	umem->nframes = nbuffers;
 	umem->mr_fd = sfd;
@@ -386,38 +427,27 @@ xdp_configure(struct pmd_internals *internals)
 {
 	struct sockaddr_xdp sxdp;
 	struct xdp_ring_req req;
-	char ring_name[0x100];
+	char pool_name[0x100];
+
 	int ret = 0;
-	long int i;
 
-	snprintf(ring_name, 0x100, "%s_%s_%d", "af_xdp_ring",
+	snprintf(pool_name, 0x100, "%s_%s_%d", "af_xdp_pool",
 		 internals->if_name, internals->queue_idx);
-	internals->buf_ring = rte_ring_create(ring_name,
-					      ETH_AF_XDP_NUM_BUFFERS,
-					      SOCKET_ID_ANY,
-					      0x0);
-	if (!internals->buf_ring)
-		return -1;
-
-	for (i = 0; i < ETH_AF_XDP_NUM_BUFFERS; i++)
-		rte_ring_enqueue(internals->buf_ring, (void *)i);
-
 	internals->umem = xsk_alloc_and_mem_reg_buffers(internals->sfd,
-							ETH_AF_XDP_NUM_BUFFERS);
+							ETH_AF_XDP_NUM_BUFFERS,
+							pool_name);
 	if (!internals->umem)
-		goto error;
+		return -1;
 
 	req.mr_fd = internals->umem->mr_fd;
 	req.desc_nr = internals->ring_size;
 
 	ret = setsockopt(internals->sfd, SOL_XDP, XDP_RX_RING,
 			 &req, sizeof(req));
-
 	RTE_ASSERT(ret == 0);
 
 	ret = setsockopt(internals->sfd, SOL_XDP, XDP_TX_RING,
 			 &req, sizeof(req));
-
 	RTE_ASSERT(ret == 0);
 
 	internals->rx.ring = mmap(0, req.desc_nr * sizeof(struct xdp_desc),
@@ -448,10 +478,6 @@ xdp_configure(struct pmd_internals *internals)
 	RTE_ASSERT(ret == 0);
 
 	return ret;
-error:
-	rte_ring_free(internals->buf_ring);
-	internals->buf_ring = NULL;
-	return -1;
 }
 
 static int
@@ -466,11 +492,11 @@ eth_rx_queue_setup(struct rte_eth_dev *dev,
 	unsigned int buf_size, data_size;
 
 	RTE_ASSERT(rx_queue_id == 0);
-	internals->mb_pool = mb_pool;
+	internals->ext_mb_pool = mb_pool;
 	xdp_configure(internals);
 
 	/* Now get the space available for data in the mbuf */
-	buf_size = rte_pktmbuf_data_room_size(internals->mb_pool) -
+	buf_size = rte_pktmbuf_data_room_size(internals->ext_mb_pool) -
 		RTE_PKTMBUF_HEADROOM;
 	data_size = internals->umem->frame_size;
 
@@ -739,8 +765,11 @@ rte_pmd_af_xdp_remove(struct rte_vdev_device *dev)
 		return -1;
 
 	internals = eth_dev->data->dev_private;
-	rte_ring_free(internals->buf_ring);
-	rte_free(internals->umem);
+	if (internals->umem) {
+		if (internals->umem->mb_pool)
+			rte_mempool_free(internals->umem->mb_pool);
+		rte_free(internals->umem);
+	}
 	rte_free(eth_dev->data->dev_private);
 	rte_free(eth_dev->data);
 	close(internals->sfd);
-- 
2.13.6



More information about the dev mailing list