[PATCH v5 03/10] net/rtap: add Rx/Tx with scatter/gather support

Stephen Hemminger stephen at networkplumber.org
Mon Feb 9 19:39:02 CET 2026


Implement packet receive and transmit using io_uring asynchronous I/O,
with full support for both single-segment and multi-segment mbufs.

Rx path:
  - rtap_rx_alloc() chains multiple mbufs when the MTU exceeds a
    single mbuf's tailroom capacity
  - Pre-post read/readv requests to the io_uring submission queue, each
    backed by a pre-allocated (possibly chained) mbuf (sketched below)
  - On rx_burst, harvest completed CQEs and replace each consumed
    mbuf with a freshly allocated one
  - rtap_rx_adjust() distributes received data across segments and
    frees unused trailing segments
  - Parse the prepended virtio-net header and translate its checksum
    and GSO fields into mbuf offload flags

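The readv pre-posting can be sketched as follows. This is an
illustrative distillation of rtap_rx_submit() from the diff below, not
additional driver code; the sketch_post_readv name is made up, and the
same headers as rtap_rxtx.c (<liburing.h>, <sys/uio.h>,
<linux/virtio_net.h>, <rte_mbuf.h>, <errno.h>) are assumed:

    /* Sketch: post one readv covering a chained mbuf (vnet header in headroom) */
    static int
    sketch_post_readv(struct io_uring *ring, int fd, struct rte_mbuf *mb)
    {
        struct iovec iovs[RTE_MBUF_MAX_NB_SEGS];
        struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
        uint16_t i, nsegs = mb->nb_segs;

        if (sqe == NULL)
            return -ENOSPC;
        io_uring_sqe_set_data(sqe, mb);

        /* first iovec: virtio-net header slot plus the first segment's tailroom */
        iovs[0].iov_base = rte_pktmbuf_mtod_offset(mb, void *,
                                                   -sizeof(struct virtio_net_hdr));
        iovs[0].iov_len = sizeof(struct virtio_net_hdr) + rte_pktmbuf_tailroom(mb);

        /* remaining segments expose their full tailroom */
        for (i = 1; i < nsegs; i++) {
            mb = mb->next;
            iovs[i].iov_base = rte_pktmbuf_mtod(mb, void *);
            iovs[i].iov_len = rte_pktmbuf_tailroom(mb);
        }

        io_uring_prep_readv(sqe, fd, iovs, nsegs, 0);
        /* iovs[] lives on the stack, so it must be submitted before returning */
        return io_uring_submit(ring);
    }
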
Tx path:
  - For single-segment mbufs, use io_uring write and batch submits
  - For multi-segment mbufs, use writev via io_uring with immediate
    submit (iovec is stack-allocated)
  - When the mbuf headroom is not writable (shared or indirect), chain
    a new header mbuf for the virtio-net header (sketched below)
  - Prepend a virtio-net header populated from the mbuf Tx offload
    flags (checksum and TSO requests)
  - Clean completed tx CQEs to free transmitted mbufs

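The headroom decision on transmit can be sketched as below; this
mirrors the logic in rtap_tx_burst() from the diff, with a made-up
sketch_place_vnet_hdr name and the same assumed headers:

    /* Sketch: find space for the virtio-net header on transmit */
    static struct virtio_net_hdr *
    sketch_place_vnet_hdr(struct rte_mbuf **mbp)
    {
        struct rte_mbuf *mb = *mbp, *mh;

        /* writable, direct mbuf with enough headroom: use it in place */
        if (rte_mbuf_refcnt_read(mb) == 1 && RTE_MBUF_DIRECT(mb) &&
            rte_pktmbuf_headroom(mb) >= sizeof(struct virtio_net_hdr))
            return rte_pktmbuf_mtod_offset(mb, struct virtio_net_hdr *,
                                           -sizeof(struct virtio_net_hdr));

        /* otherwise chain a fresh mbuf in front to carry the header */
        mh = rte_pktmbuf_alloc(mb->pool);
        if (mh == NULL)
            return NULL;    /* caller drops the packet */

        mh->next = mb;
        mh->nb_segs = mb->nb_segs + 1;
        mh->pkt_len = mb->pkt_len;      /* header bytes are not counted */
        mh->ol_flags = mb->ol_flags & RTE_MBUF_F_TX_OFFLOAD_MASK;
        *mbp = mh;
        return rte_pktmbuf_mtod_offset(mh, struct virtio_net_hdr *,
                                       -sizeof(struct virtio_net_hdr));
    }
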
Add io_uring cancel-all logic using IORING_ASYNC_CANCEL_ALL for
clean queue teardown, draining all pending CQEs and freeing mbufs.
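
As a condensed sketch of that teardown path (the same liburing calls
rtap_cancel_all() uses in the diff, without the retry loop; the
sketch_drain_ring name is illustrative only):

    /* Sketch: cancel everything in flight and free the attached mbufs */
    static void
    sketch_drain_ring(struct io_uring *ring)
    {
        struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
        struct io_uring_cqe *cqe;
        unsigned int head, done = 0;

        if (sqe != NULL) {
            io_uring_prep_cancel(sqe, NULL,
                                 IORING_ASYNC_CANCEL_ALL | IORING_ASYNC_CANCEL_ANY);
            io_uring_sqe_set_data(sqe, NULL);
        }
        /* submit the cancel and wait for at least one completion */
        io_uring_submit_and_wait(ring, 1);

        io_uring_for_each_cqe(ring, head, cqe) {
            struct rte_mbuf *mb = (void *)(uintptr_t)cqe->user_data;

            if (mb != NULL)         /* skip the cancel CQE itself */
                rte_pktmbuf_free(mb);
            done++;
        }
        io_uring_cq_advance(ring, done);
    }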

Signed-off-by: Stephen Hemminger <stephen at networkplumber.org>
---
 doc/guides/nics/features/rtap.ini |   1 +
 drivers/net/rtap/meson.build      |   1 +
 drivers/net/rtap/rtap.h           |  13 +
 drivers/net/rtap/rtap_ethdev.c    |   7 +
 drivers/net/rtap/rtap_rxtx.c      | 755 ++++++++++++++++++++++++++++++
 5 files changed, 777 insertions(+)
 create mode 100644 drivers/net/rtap/rtap_rxtx.c

diff --git a/doc/guides/nics/features/rtap.ini b/doc/guides/nics/features/rtap.ini
index ed7c638029..c064e1e0b9 100644
--- a/doc/guides/nics/features/rtap.ini
+++ b/doc/guides/nics/features/rtap.ini
@@ -4,6 +4,7 @@
 ; Refer to default.ini for the full list of available PMD features.
 ;
 [Features]
+Scattered Rx         = P
 Linux                = Y
 ARMv7                = Y
 ARMv8                = Y
diff --git a/drivers/net/rtap/meson.build b/drivers/net/rtap/meson.build
index 7bd7806ef3..8e2b15f382 100644
--- a/drivers/net/rtap/meson.build
+++ b/drivers/net/rtap/meson.build
@@ -19,6 +19,7 @@ endif
 
 sources = files(
         'rtap_ethdev.c',
+        'rtap_rxtx.c',
 )
 
 ext_deps += liburing
diff --git a/drivers/net/rtap/rtap.h b/drivers/net/rtap/rtap.h
index 39a3188a7b..a0bbb1a8a0 100644
--- a/drivers/net/rtap/rtap.h
+++ b/drivers/net/rtap/rtap.h
@@ -70,4 +70,17 @@ struct rtap_pmd {
 int rtap_queue_open(struct rte_eth_dev *dev, uint16_t queue_id);
 void rtap_queue_close(struct rte_eth_dev *dev, uint16_t queue_id);
 
+/* rtap_rxtx.c */
+uint16_t rtap_rx_burst(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts);
+uint16_t rtap_tx_burst(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts);
+int rtap_rx_queue_setup(struct rte_eth_dev *dev, uint16_t queue_id,
+			uint16_t nb_rx_desc, unsigned int socket_id,
+			const struct rte_eth_rxconf *rx_conf,
+			struct rte_mempool *mb_pool);
+void rtap_rx_queue_release(struct rte_eth_dev *dev, uint16_t queue_id);
+int rtap_tx_queue_setup(struct rte_eth_dev *dev, uint16_t queue_id,
+			uint16_t nb_tx_desc, unsigned int socket_id,
+			const struct rte_eth_txconf *tx_conf);
+void rtap_tx_queue_release(struct rte_eth_dev *dev, uint16_t queue_id);
+
 #endif /* _RTAP_H_ */
diff --git a/drivers/net/rtap/rtap_ethdev.c b/drivers/net/rtap/rtap_ethdev.c
index 4e7847ff8d..a65a8b77ad 100644
--- a/drivers/net/rtap/rtap_ethdev.c
+++ b/drivers/net/rtap/rtap_ethdev.c
@@ -232,6 +232,10 @@ static const struct eth_dev_ops rtap_ops = {
 	.dev_stop		= rtap_dev_stop,
 	.dev_configure		= rtap_dev_configure,
 	.dev_close		= rtap_dev_close,
+	.rx_queue_setup		= rtap_rx_queue_setup,
+	.rx_queue_release	= rtap_rx_queue_release,
+	.tx_queue_setup		= rtap_tx_queue_setup,
+	.tx_queue_release	= rtap_tx_queue_release,
 };
 
 static int
@@ -272,6 +276,9 @@ rtap_create(struct rte_eth_dev *dev, const char *tap_name, uint8_t persist)
 
 	PMD_LOG(DEBUG, "%s setup", ifr.ifr_name);
 
+	dev->rx_pkt_burst = rtap_rx_burst;
+	dev->tx_pkt_burst = rtap_tx_burst;
+
 	return 0;
 
 error:
diff --git a/drivers/net/rtap/rtap_rxtx.c b/drivers/net/rtap/rtap_rxtx.c
new file mode 100644
index 0000000000..c972ab4ca0
--- /dev/null
+++ b/drivers/net/rtap/rtap_rxtx.c
@@ -0,0 +1,755 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright (c) 2026 Stephen Hemminger
+ */
+
+#include <assert.h>
+#include <errno.h>
+#include <stdbool.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <liburing.h>
+#include <sys/uio.h>
+#include <linux/virtio_net.h>
+
+#include <rte_common.h>
+#include <rte_ethdev.h>
+#include <rte_ip.h>
+#include <rte_mbuf.h>
+#include <rte_net.h>
+#include <rte_malloc.h>
+
+#include "rtap.h"
+
+/*
+ * Since the virtio net header is prepended to the mbuf data,
+ * the mbuf pools used for Rx must be created with enough
+ * headroom to hold it.
+ */
+static_assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct virtio_net_hdr),
+	      "Pktmbuf headroom not big enough for virtio header");
+
+
+/* Get the per-process file descriptor used for transmit and receive */
+static inline int
+rtap_queue_fd(uint16_t port_id, uint16_t queue_id)
+{
+	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
+	int *fds = dev->process_private;
+	int fd = fds[queue_id];
+
+	RTE_ASSERT(fd != -1);
+	return fd;
+}
+
+/*
+ * Add a read of the mbuf data to the submission queue.
+ * Multi-segment mbufs require readv().
+ * Return:
+ *   -ENOSPC : no submission queue entry available.
+ *   > 0 : readv was used and io_uring_submit() has already been done.
+ *   0 : regular read queued, caller should call io_uring_submit()
+ *       later to batch.
+ */
+static inline int
+rtap_rx_submit(struct rtap_rx_queue *rxq, int fd, struct rte_mbuf *mb)
+{
+	struct io_uring_sqe *sqe = io_uring_get_sqe(&rxq->io_ring);
+	struct iovec iovs[RTE_MBUF_MAX_NB_SEGS];
+
+	if (unlikely(sqe == NULL))
+		return -ENOSPC;
+
+	io_uring_sqe_set_data(sqe, mb);
+
+	RTE_ASSERT(rte_pktmbuf_headroom(mb) >= sizeof(struct virtio_net_hdr));
+	void *buf = rte_pktmbuf_mtod_offset(mb, void *, -sizeof(struct virtio_net_hdr));
+	unsigned int nbytes = sizeof(struct virtio_net_hdr) + rte_pktmbuf_tailroom(mb);
+
+	/* optimize for the case where packet fits in one mbuf */
+	if (mb->nb_segs == 1) {
+		io_uring_prep_read(sqe, fd, buf, nbytes, 0);
+		/* caller will submit as batch */
+		return 0;
+	} else {
+		uint16_t nsegs = mb->nb_segs;
+		RTE_ASSERT(nsegs > 0 && nsegs < IOV_MAX);
+
+		iovs[0].iov_base = buf;
+		iovs[0].iov_len = nbytes;
+
+		for (uint16_t i = 1; i < nsegs; i++) {
+			mb = mb->next;
+			iovs[i].iov_base = rte_pktmbuf_mtod(mb, void *);
+			iovs[i].iov_len = rte_pktmbuf_tailroom(mb);
+		}
+		io_uring_prep_readv(sqe, fd, iovs, nsegs, 0);
+
+		/*
+		 * For readv, need to submit now since iovs[] must be
+		 * valid until submitted.
+		 * io_uring_submit(3) returns the number of submitted
+		 * submission queue entries (on failure it returns -errno).
+		 */
+		return io_uring_submit(&rxq->io_ring);
+	}
+}
+
+/* Allocate one or more mbufs to be used for reading packets */
+static struct rte_mbuf *
+rtap_rx_alloc(struct rtap_rx_queue *rxq)
+{
+	const struct rte_eth_dev *dev = &rte_eth_devices[rxq->port_id];
+	int buf_size = dev->data->mtu + RTE_ETHER_HDR_LEN;
+	struct rte_mbuf *m = NULL;
+	struct rte_mbuf **tail = &m;
+
+	do {
+		struct rte_mbuf *seg = rte_pktmbuf_alloc(rxq->mb_pool);
+		if (unlikely(seg == NULL)) {
+			rte_pktmbuf_free(m);
+			return NULL;
+		}
+		*tail = seg;
+		tail = &seg->next;
+		if (seg != m)
+			++m->nb_segs;
+
+		buf_size -= rte_pktmbuf_tailroom(seg);
+	} while (buf_size > 0);
+
+	__rte_mbuf_sanity_check(m, 1);
+	return m;
+}
+
+/*
+ * When a packet is received into a multi-segment mbuf,
+ * the length of each segment must be adjusted.
+ */
+static inline int
+rtap_rx_adjust(struct rte_mbuf *mb, uint32_t len)
+{
+	struct rte_mbuf *seg;
+	uint16_t count = 0;
+
+	mb->pkt_len = len;
+
+	/* Walk through mbuf chain and update the length of each segment */
+	for (seg = mb; seg != NULL && len > 0; seg = seg->next) {
+		uint16_t seg_len = RTE_MIN(len, rte_pktmbuf_tailroom(seg));
+
+		seg->data_len = seg_len;
+		count++;
+		len -= seg_len;
+
+		/* If length is zero, this is end of packet */
+		if (len == 0) {
+			/* Drop unused tail segments */
+			if (seg->next != NULL) {
+				struct rte_mbuf *tail = seg->next;
+				seg->next = NULL;
+
+				/* Free segments one by one to avoid nb_segs issues */
+				while (tail != NULL) {
+					struct rte_mbuf *next = tail->next;
+					rte_pktmbuf_free_seg(tail);
+					tail = next;
+				}
+			}
+
+			mb->nb_segs = count;
+			return 0;
+		}
+	}
+
+	/* Packet was truncated - not enough mbuf space */
+	return -1;
+}
+
+/*
+ * Set the receive offload flags of received mbuf
+ * based on the bits in the virtio network header
+ */
+static int
+rtap_rx_offload(struct rte_mbuf *m, const struct virtio_net_hdr *hdr)
+{
+	uint32_t ptype;
+	bool l4_supported = false;
+	struct rte_net_hdr_lens hdr_lens;
+
+	/* nothing to do */
+	if (hdr->flags == 0 && hdr->gso_type == VIRTIO_NET_HDR_GSO_NONE)
+		return 0;
+
+	m->ol_flags |= RTE_MBUF_F_RX_IP_CKSUM_UNKNOWN;
+
+	ptype = rte_net_get_ptype(m, &hdr_lens, RTE_PTYPE_ALL_MASK);
+	m->packet_type = ptype;
+	if ((ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_TCP ||
+	    (ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_UDP ||
+	    (ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_SCTP)
+		l4_supported = true;
+
+	if (hdr->flags & VIRTIO_NET_HDR_F_NEEDS_CSUM) {
+		uint32_t hdrlen = hdr_lens.l2_len + hdr_lens.l3_len + hdr_lens.l4_len;
+		if (hdr->csum_start <= hdrlen && l4_supported) {
+			m->ol_flags |= RTE_MBUF_F_RX_L4_CKSUM_NONE;
+		} else {
+			/* Unknown proto or tunnel, do sw cksum. */
+			uint16_t csum = 0;
+
+			if (rte_raw_cksum_mbuf(m, hdr->csum_start,
+					       rte_pktmbuf_pkt_len(m) - hdr->csum_start,
+					       &csum) < 0)
+				return -EINVAL;
+			if (likely(csum != 0xffff))
+				csum = ~csum;
+
+			uint32_t off = (uint32_t)hdr->csum_offset + hdr->csum_start;
+			if (rte_pktmbuf_data_len(m) >= off + sizeof(uint16_t))
+				*rte_pktmbuf_mtod_offset(m, uint16_t *, off) = csum;
+		}
+	} else if ((hdr->flags & VIRTIO_NET_HDR_F_DATA_VALID) && l4_supported) {
+		m->ol_flags |= RTE_MBUF_F_RX_L4_CKSUM_GOOD;
+	}
+
+	/* GSO request, save required information in mbuf */
+	if (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE) {
+		/* Check unsupported modes */
+		if ((hdr->gso_type & VIRTIO_NET_HDR_GSO_ECN) || hdr->gso_size == 0)
+			return -EINVAL;
+
+		/* Update mss lengths in mbuf */
+		m->tso_segsz = hdr->gso_size;
+		switch (hdr->gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
+		case VIRTIO_NET_HDR_GSO_TCPV4:
+		case VIRTIO_NET_HDR_GSO_TCPV6:
+			m->ol_flags |= RTE_MBUF_F_RX_LRO | RTE_MBUF_F_RX_L4_CKSUM_NONE;
+			break;
+		default:
+			return -EINVAL;
+		}
+	}
+
+	return 0;
+}
+
+uint16_t
+rtap_rx_burst(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
+{
+	struct rtap_rx_queue *rxq = queue;
+	struct io_uring_cqe *cqe;
+	unsigned int head, num_cqe = 0, num_sqe = 0;
+	uint16_t num_rx = 0;
+	uint32_t num_bytes = 0;
+	int fd = rtap_queue_fd(rxq->port_id, rxq->queue_id);
+
+	if (unlikely(nb_pkts == 0))
+		return 0;
+
+	io_uring_for_each_cqe(&rxq->io_ring, head, cqe) {
+		struct rte_mbuf *mb = (void *)(uintptr_t)cqe->user_data;
+		struct rte_mbuf *nmb = NULL;
+		struct virtio_net_hdr *hdr = NULL;
+		ssize_t len = cqe->res;
+
+		PMD_RX_LOG(DEBUG, "complete m=%p len=%zd", mb, len);
+
+		num_cqe++;
+
+		if (unlikely(len < (ssize_t)(sizeof(*hdr) + RTE_ETHER_HDR_LEN))) {
+			if (len < 0)
+				PMD_RX_LOG(ERR, "io_uring_read: %s", strerror(-len));
+			else
+				PMD_RX_LOG(ERR, "io_uring_read len %zd", len);
+			rxq->rx_errors++;
+			goto resubmit;
+		}
+
+		/* virtio header is before packet data */
+		hdr = rte_pktmbuf_mtod_offset(mb, struct virtio_net_hdr *, -sizeof(*hdr));
+		len -= sizeof(*hdr);
+
+		/* Replacement mbuf for resubmitting */
+		nmb = rtap_rx_alloc(rxq);
+		if (unlikely(nmb == NULL)) {
+			struct rte_eth_dev *dev = &rte_eth_devices[rxq->port_id];
+
+			PMD_RX_LOG(ERR, "Rx mbuf alloc failed");
+			dev->data->rx_mbuf_alloc_failed++;
+
+			nmb = mb;	 /* Reuse original */
+			goto resubmit;
+		}
+
+		if (mb->nb_segs == 1) {
+			mb->data_len = len;
+			mb->pkt_len = len;
+		} else {
+			if (unlikely(rtap_rx_adjust(mb, len) < 0)) {
+				PMD_RX_LOG(ERR, "packet truncated: pkt_len=%u exceeds mbuf capacity",
+					   mb->pkt_len);
+				++rxq->rx_errors;
+				rte_pktmbuf_free(mb);
+				goto resubmit;
+			}
+		}
+
+		if (unlikely(rtap_rx_offload(mb, hdr) < 0)) {
+			PMD_RX_LOG(ERR, "invalid rx offload");
+			++rxq->rx_errors;
+			rte_pktmbuf_free(mb);
+			goto resubmit;
+		}
+
+		mb->port = rxq->port_id;
+
+		__rte_mbuf_sanity_check(mb, 1);
+		num_bytes += mb->pkt_len;
+		bufs[num_rx++] = mb;
+
+resubmit:
+		/* Submit the replacement mbuf */
+		int n = rtap_rx_submit(rxq, fd, nmb);
+		if (unlikely(n < 0)) {
+			/* Hope that later Rx can recover */
+			PMD_RX_LOG(ERR, "io_uring no Rx sqe: %s", strerror(-n));
+			rxq->rx_errors++;
+			rte_pktmbuf_free(nmb);
+			break;
+		}
+
+		/* If readv() was used, n > 0 and all pending SQEs were already submitted. */
+		if (n > 0)
+			num_sqe = 0;
+		else
+			++num_sqe;
+
+		if (num_rx == nb_pkts)
+			break;
+	}
+	if (num_cqe > 0)
+		io_uring_cq_advance(&rxq->io_ring, num_cqe);
+
+	if (num_sqe > 0) {
+		int n = io_uring_submit(&rxq->io_ring);
+		if (unlikely(n < 0)) {
+			PMD_LOG(ERR, "Rx io_uring submit failed: %s", strerror(-n));
+		} else if (unlikely(n != (int)num_sqe)) {
+			PMD_RX_LOG(NOTICE, "Rx io_uring %d of %u resubmitted", n, num_sqe);
+		}
+	}
+
+	rxq->rx_packets += num_rx;
+	rxq->rx_bytes += num_bytes;
+
+	return num_rx;
+}
+
+int
+rtap_rx_queue_setup(struct rte_eth_dev *dev, uint16_t queue_id, uint16_t nb_rx_desc,
+		    unsigned int socket_id,
+		    const struct rte_eth_rxconf *rx_conf __rte_unused,
+		    struct rte_mempool *mb_pool)
+{
+	struct rte_mbuf **mbufs = NULL;
+	unsigned int nsqe = 0;
+	int fd = -1;
+
+	PMD_LOG(DEBUG, "setup port %u queue %u rx_descriptors %u",
+		dev->data->port_id, queue_id, nb_rx_desc);
+
+	struct rtap_rx_queue *rxq = rte_zmalloc_socket(NULL, sizeof(*rxq),
+						       RTE_CACHE_LINE_SIZE, socket_id);
+	if (rxq == NULL) {
+		PMD_LOG(ERR, "rxq alloc failed");
+		return -1;
+	}
+
+	rxq->mb_pool = mb_pool;
+	rxq->port_id = dev->data->port_id;
+	rxq->queue_id = queue_id;
+	dev->data->rx_queues[queue_id] = rxq;
+
+	if (io_uring_queue_init(nb_rx_desc, &rxq->io_ring, 0) != 0) {
+		PMD_LOG(ERR, "io_uring_queue_init failed: %s", strerror(errno));
+		goto error_rxq_free;
+	}
+
+	mbufs = calloc(nb_rx_desc, sizeof(struct rte_mbuf *));
+	if (mbufs == NULL) {
+		PMD_LOG(ERR, "Rx mbuf pointer alloc failed");
+		goto error_iouring_exit;
+	}
+
+	/* Open the shared tap fd; it may already be set up */
+	if (rtap_queue_open(dev, queue_id) < 0)
+		goto error_bulk_free;
+
+	fd = rtap_queue_fd(rxq->port_id, rxq->queue_id);
+
+	for (uint16_t i = 0; i < nb_rx_desc; i++) {
+		mbufs[i] = rtap_rx_alloc(rxq);
+		if (mbufs[i] == NULL) {
+			PMD_LOG(ERR, "Rx mbuf alloc buf failed");
+			goto error_bulk_free;
+		}
+
+		int n = rtap_rx_submit(rxq, fd, mbufs[i]);
+		if (n < 0) {
+			PMD_LOG(ERR, "rtap_rx_submit failed: %s", strerror(-n));
+			goto error_bulk_free;
+		}
+
+		/* If readv() was used, n > 0 and all pending SQEs were already submitted. */
+		if (n > 0)
+			nsqe = 0;
+		else
+			++nsqe;
+	}
+
+	if (nsqe > 0) {
+		int n = io_uring_submit(&rxq->io_ring);
+		if (n < 0) {
+			PMD_LOG(ERR, "Rx io_uring submit failed: %s", strerror(-n));
+			goto error_bulk_free;
+		}
+		if (n < (int)nsqe)
+			PMD_LOG(NOTICE, "Rx io_uring partial submit %d of %u", n, nsqe);
+	}
+
+	free(mbufs);
+	return 0;
+
+error_bulk_free:
+	/* can't use bulk free here because some of mbufs[] may be NULL */
+	for (uint16_t i = 0; i < nb_rx_desc; i++) {
+		if (mbufs[i] != NULL)
+			rte_pktmbuf_free(mbufs[i]);
+	}
+	rtap_queue_close(dev, queue_id);
+	free(mbufs);
+error_iouring_exit:
+	io_uring_queue_exit(&rxq->io_ring);
+error_rxq_free:
+	rte_free(rxq);
+	return -1;
+}
+
+/*
+ * Cancel all pending io_uring operations and drain completions.
+ * Uses IORING_ASYNC_CANCEL_ALL to cancel all operations at once.
+ * Returns the number of mbufs freed.
+ */
+static unsigned int
+rtap_cancel_all(struct io_uring *ring)
+{
+	struct io_uring_cqe *cqe;
+	struct io_uring_sqe *sqe;
+	unsigned int head, num_freed = 0;
+	unsigned int ready;
+	int ret;
+
+	/* Cancel all pending operations using CANCEL_ALL flag */
+	sqe = io_uring_get_sqe(ring);
+	if (sqe != NULL) {
+		/* IORING_ASYNC_CANCEL_ALL | IORING_ASYNC_CANCEL_ANY cancels all ops */
+		io_uring_prep_cancel(sqe, NULL,
+				     IORING_ASYNC_CANCEL_ALL | IORING_ASYNC_CANCEL_ANY);
+		io_uring_sqe_set_data(sqe, NULL);
+		ret = io_uring_submit(ring);
+		if (ret < 0)
+			PMD_LOG(ERR, "cancel submit failed: %s", strerror(-ret));
+	}
+
+	/*
+	 * One blocking wait to let the kernel deliver the cancel CQE
+	 * and the CQEs for all cancelled operations.
+	 */
+	io_uring_submit_and_wait(ring, 1);
+
+	/*
+	 * Drain all CQEs non-blocking.  Cancellation of many pending
+	 * operations may produce CQEs in waves; keep polling until the
+	 * CQ is empty.
+	 */
+	for (unsigned int retries = 0; retries < 10; retries++) {
+		ready = io_uring_cq_ready(ring);
+		if (ready == 0)
+			break;
+
+		io_uring_for_each_cqe(ring, head, cqe) {
+			struct rte_mbuf *mb = (void *)(uintptr_t)cqe->user_data;
+
+			/* Skip the cancel operation's own CQE (user_data = NULL) */
+			if (mb != NULL) {
+				rte_pktmbuf_free(mb);
+				++num_freed;
+			}
+		}
+
+		/* Advance past all processed CQEs */
+		io_uring_cq_advance(ring, ready);
+	}
+
+	return num_freed;
+}
+
+void
+rtap_rx_queue_release(struct rte_eth_dev *dev, uint16_t queue_id)
+{
+	struct rtap_rx_queue *rxq = dev->data->rx_queues[queue_id];
+
+	if (rxq == NULL)
+		return;
+
+	rtap_cancel_all(&rxq->io_ring);
+	io_uring_queue_exit(&rxq->io_ring);
+
+	rte_free(rxq);
+
+	/* Close the shared TAP fd if the tx queue is already gone */
+	if (queue_id >= dev->data->nb_tx_queues ||
+	    dev->data->tx_queues[queue_id] == NULL)
+		rtap_queue_close(dev, queue_id);
+}
+
+int
+rtap_tx_queue_setup(struct rte_eth_dev *dev, uint16_t queue_id,
+		    uint16_t nb_tx_desc, unsigned int socket_id,
+		    const struct rte_eth_txconf *tx_conf)
+{
+	/* Open the shared tap fd; it may already be set up */
+	if (rtap_queue_open(dev, queue_id) < 0)
+		return -1;
+
+	struct rtap_tx_queue *txq = rte_zmalloc_socket(NULL, sizeof(*txq),
+						       RTE_CACHE_LINE_SIZE, socket_id);
+	if (txq == NULL) {
+		PMD_LOG(ERR, "txq alloc failed");
+		return -1;
+	}
+
+	txq->port_id = dev->data->port_id;
+	txq->queue_id = queue_id;
+	txq->free_thresh = tx_conf->tx_free_thresh;
+	dev->data->tx_queues[queue_id] = txq;
+
+	if (io_uring_queue_init(nb_tx_desc, &txq->io_ring, 0) != 0) {
+		PMD_LOG(ERR, "io_uring_queue_init failed: %s", strerror(errno));
+		rte_free(txq);
+		return -1;
+	}
+
+	return 0;
+}
+
+static void
+rtap_tx_cleanup(struct rtap_tx_queue *txq)
+{
+	struct io_uring_cqe *cqe;
+	unsigned int head;
+	unsigned int num_cqe = 0;
+
+	io_uring_for_each_cqe(&txq->io_ring, head, cqe) {
+		struct rte_mbuf *mb = (void *)(uintptr_t)cqe->user_data;
+
+		++num_cqe;
+
+		/* Skip CQEs with NULL user_data (e.g., cancel operations) */
+		if (mb == NULL)
+			continue;
+
+		PMD_TX_LOG(DEBUG, " mbuf len %u result: %d", mb->pkt_len, cqe->res);
+		txq->tx_errors += (cqe->res < 0);
+		rte_pktmbuf_free(mb);
+	}
+	io_uring_cq_advance(&txq->io_ring, num_cqe);
+}
+
+void
+rtap_tx_queue_release(struct rte_eth_dev *dev, uint16_t queue_id)
+{
+	struct rtap_tx_queue *txq = dev->data->tx_queues[queue_id];
+
+	if (txq == NULL)
+		return;
+
+	/* First drain any completed TX operations */
+	rtap_tx_cleanup(txq);
+
+	/* Cancel all remaining pending operations and free mbufs */
+	rtap_cancel_all(&txq->io_ring);
+	io_uring_queue_exit(&txq->io_ring);
+
+	rte_free(txq);
+
+	/* Close the shared TAP fd if the rx queue is already gone */
+	if (queue_id >= dev->data->nb_rx_queues ||
+	    dev->data->rx_queues[queue_id] == NULL)
+		rtap_queue_close(dev, queue_id);
+}
+
+/* Convert mbuf offload flags to virtio net header */
+static void
+rtap_tx_offload(struct virtio_net_hdr *hdr, const struct rte_mbuf *m)
+{
+	uint64_t csum_l4 = m->ol_flags & RTE_MBUF_F_TX_L4_MASK;
+	uint16_t o_l23_len = (m->ol_flags & RTE_MBUF_F_TX_TUNNEL_MASK) ?
+			     m->outer_l2_len + m->outer_l3_len : 0;
+
+	memset(hdr, 0, sizeof(*hdr));
+
+	if (m->ol_flags & RTE_MBUF_F_TX_TCP_SEG)
+		csum_l4 |= RTE_MBUF_F_TX_TCP_CKSUM;
+
+	switch (csum_l4) {
+	case RTE_MBUF_F_TX_UDP_CKSUM:
+		hdr->csum_start = o_l23_len + m->l2_len + m->l3_len;
+		hdr->csum_offset = offsetof(struct rte_udp_hdr, dgram_cksum);
+		hdr->flags = VIRTIO_NET_HDR_F_NEEDS_CSUM;
+		break;
+
+	case RTE_MBUF_F_TX_TCP_CKSUM:
+		hdr->csum_start = o_l23_len + m->l2_len + m->l3_len;
+		hdr->csum_offset = offsetof(struct rte_tcp_hdr, cksum);
+		hdr->flags = VIRTIO_NET_HDR_F_NEEDS_CSUM;
+		break;
+	}
+
+	/* TCP Segmentation Offload */
+	if (m->ol_flags & RTE_MBUF_F_TX_TCP_SEG) {
+		hdr->gso_type = (m->ol_flags & RTE_MBUF_F_TX_IPV6) ?
+			VIRTIO_NET_HDR_GSO_TCPV6 :
+			VIRTIO_NET_HDR_GSO_TCPV4;
+		hdr->gso_size = m->tso_segsz;
+		hdr->hdr_len = o_l23_len + m->l2_len + m->l3_len + m->l4_len;
+	}
+}
+
+/*
+ * Transmit burst posts mbufs to the TAP file descriptor via
+ * io_uring submission queue entries with write operations.
+ *
+ * The driver mimics the behavior of a real hardware NIC.
+ *
+ * If there is no space left in the io_uring, the driver returns the number of
+ * mbufs that were processed up to that point. The application can then decide
+ * to retry later or drop the unsent packets in case of backpressure.
+ *
+ * The transmit process puts the virtio header before the packet data. In some
+ * cases a new mbuf is required from the same pool as the original; if that
+ * allocation fails, the packet is not sent and is silently dropped. This avoids
+ * a situation where transmit gets stuck when pool resources are very low.
+ */
+uint16_t
+rtap_tx_burst(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
+{
+	struct rtap_tx_queue *txq = queue;
+	uint16_t i, num_tx = 0;
+	uint32_t num_tx_bytes = 0;
+
+	PMD_TX_LOG(DEBUG, "%d packets to xmit", nb_pkts);
+
+	if (io_uring_sq_space_left(&txq->io_ring) < RTE_MAX(txq->free_thresh, nb_pkts))
+		rtap_tx_cleanup(txq);
+
+	int fd = rtap_queue_fd(txq->port_id, txq->queue_id);
+
+	for (i = 0; i < nb_pkts; i++) {
+		struct rte_mbuf *mb = bufs[i];
+		struct virtio_net_hdr *hdr;
+
+		/* Use packet headroom for the virtio header (if possible) */
+		if (rte_mbuf_refcnt_read(mb) == 1 && RTE_MBUF_DIRECT(mb) &&
+		    rte_pktmbuf_headroom(mb) >= sizeof(*hdr)) {
+			hdr = rte_pktmbuf_mtod_offset(mb, struct virtio_net_hdr *, -sizeof(*hdr));
+		} else {
+			/* Need to chain a new mbuf to make room for virtio header */
+			struct rte_mbuf *mh = rte_pktmbuf_alloc(mb->pool);
+			if (unlikely(mh == NULL)) {
+				PMD_TX_LOG(DEBUG, "mbuf pool exhausted on transmit");
+				rte_pktmbuf_free(mb);
+				++txq->tx_errors;
+				continue;
+			}
+
+			/* The packet headroom should be available in the newly allocated mbuf */
+			RTE_ASSERT(rte_pktmbuf_headroom(mh) >= sizeof(*hdr));
+
+			hdr = rte_pktmbuf_mtod_offset(mh, struct virtio_net_hdr *, -sizeof(*hdr));
+			mh->next = mb;
+			mh->nb_segs = mb->nb_segs + 1;
+			mh->pkt_len = mb->pkt_len;
+			mh->ol_flags = mb->ol_flags & RTE_MBUF_F_TX_OFFLOAD_MASK;
+			mb = mh;
+		}
+
+		struct io_uring_sqe *sqe = io_uring_get_sqe(&txq->io_ring);
+		if (sqe == NULL) {
+			/* Drop header mbuf if it was used */
+			if (mb != bufs[i])
+				rte_pktmbuf_free_seg(mb);
+			break;	/* submit ring is full */
+		}
+
+		/* Note: transmit byte count does not include the virtio header */
+		num_tx_bytes += mb->pkt_len;
+
+		io_uring_sqe_set_data(sqe, mb);
+		rtap_tx_offload(hdr, mb);
+
+		PMD_TX_LOG(DEBUG, "write m=%p segs=%u", mb, mb->nb_segs);
+
+		/* Start of data written to kernel includes virtio net header */
+		void *buf = rte_pktmbuf_mtod_offset(mb, void *, -sizeof(*hdr));
+		unsigned int nbytes = sizeof(struct virtio_net_hdr) + mb->data_len;
+
+		if (mb->nb_segs == 1) {
+			/* Single-segment mbuf goes as a plain write and can be batched */
+			io_uring_prep_write(sqe, fd, buf, nbytes, 0);
+			++num_tx;
+		} else {
+			/* Multi-segment mbuf needs scatter/gather (writev) */
+			struct iovec iovs[RTE_MBUF_MAX_NB_SEGS + 1];
+			unsigned int niov = mb->nb_segs;
+
+			iovs[0].iov_base = buf;
+			iovs[0].iov_len = nbytes;
+
+			for (unsigned int v = 1; v < niov; v++) {
+				mb = mb->next;
+				iovs[v].iov_base = rte_pktmbuf_mtod(mb, void *);
+				iovs[v].iov_len = mb->data_len;
+			}
+
+			io_uring_prep_writev(sqe, fd, iovs, niov, 0);
+
+			/*
+			 * For writev, submit now since iovs[] is on the stack
+			 * and must remain valid until submitted.
+			 * This also submits any previously batched single-seg writes.
+			 */
+			int err = io_uring_submit(&txq->io_ring);
+			if (unlikely(err < 0)) {
+				PMD_TX_LOG(ERR, "Tx io_uring submit failed: %s", strerror(-err));
+				++txq->tx_errors;
+			}
+
+			num_tx = 0;
+		}
+	}
+
+	if (likely(num_tx > 0)) {
+		int err = io_uring_submit(&txq->io_ring);
+		if (unlikely(err < 0)) {
+			PMD_LOG(ERR, "Tx io_uring submit failed: %s", strerror(-err));
+			++txq->tx_errors;
+		}
+	}
+
+	txq->tx_packets += i;
+	txq->tx_bytes += num_tx_bytes;
+
+	return i;
+}
-- 
2.51.0


