[PATCH v4 06/10] net/ioring: implement receive and transmit
Stephen Hemminger
stephen at networkplumber.org
Thu Mar 13 22:50:57 CET 2025
Use io_uring to read from and write to the TAP device.
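Each receive queue posts one read per mbuf on its own io_uring; the
receive burst walks the completion queue, hands finished mbufs up to
the application and immediately posts a replacement buffer. Each
transmit queue prepares one write per packet and reaps completions
lazily once free space in the submission ring drops below the tx free
threshold.

For reference, here is a minimal standalone sketch (not part of this
patch) of the liburing read pattern the driver builds on. It assumes
privileges to create a TAP interface; the name "itap0" is arbitrary
and error handling is omitted for brevity.

#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <sys/ioctl.h>
#include <net/if.h>
#include <linux/if_tun.h>
#include <liburing.h>

int main(void)
{
	/* create or attach to a TAP interface, raw frames, no packet info */
	struct ifreq ifr;
	memset(&ifr, 0, sizeof(ifr));
	ifr.ifr_flags = IFF_TAP | IFF_NO_PI;
	strncpy(ifr.ifr_name, "itap0", IFNAMSIZ - 1);

	int fd = open("/dev/net/tun", O_RDWR);
	ioctl(fd, TUNSETIFF, &ifr);

	struct io_uring ring;
	io_uring_queue_init(8, &ring, 0);

	/* post a single read for the next frame from the TAP device */
	static char buf[2048];
	struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
	io_uring_prep_read(sqe, fd, buf, sizeof(buf), 0);
	io_uring_sqe_set_data(sqe, buf);
	io_uring_submit(&ring);

	/* wait for the completion; cqe->res is the frame length or -errno */
	struct io_uring_cqe *cqe;
	io_uring_wait_cqe(&ring, &cqe);
	printf("received %d bytes\n", cqe->res);
	io_uring_cqe_seen(&ring, cqe);

	io_uring_queue_exit(&ring);
	return 0;
}

The driver follows the same submit/complete flow per queue, but keeps
one read in flight per rx descriptor and uses mbufs from the queue's
mempool as the read buffers.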
Signed-off-by: Stephen Hemminger <stephen at networkplumber.org>
---
drivers/net/ioring/rte_eth_ioring.c | 366 +++++++++++++++++++++++++++-
1 file changed, 365 insertions(+), 1 deletion(-)
diff --git a/drivers/net/ioring/rte_eth_ioring.c b/drivers/net/ioring/rte_eth_ioring.c
index f01db960a7..2f049e4c4f 100644
--- a/drivers/net/ioring/rte_eth_ioring.c
+++ b/drivers/net/ioring/rte_eth_ioring.c
@@ -2,6 +2,7 @@
* Copyright (c) Stephen Hemminger
*/
+#include <assert.h>
#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
@@ -9,8 +10,10 @@
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
+#include <liburing.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
+#include <sys/uio.h>
#include <net/if.h>
#include <linux/if.h>
#include <linux/if_arp.h>
@@ -27,6 +30,12 @@
#include <rte_kvargs.h>
#include <rte_log.h>
+#define IORING_DEFAULT_BURST 64
+#define IORING_NUM_BUFFERS 1024
+#define IORING_MAX_QUEUES 128
+
+static_assert(IORING_MAX_QUEUES <= RTE_MP_MAX_FD_NUM, "Max queues exceeds MP fd limit");
+
#define IORING_DEFAULT_IFNAME "itap%d"
#define IORING_MP_KEY "ioring_mp_send_fds"
@@ -34,6 +43,20 @@ RTE_LOG_REGISTER_DEFAULT(ioring_logtype, NOTICE);
#define RTE_LOGTYPE_IORING ioring_logtype
#define PMD_LOG(level, ...) RTE_LOG_LINE_PREFIX(level, IORING, "%s(): ", __func__, __VA_ARGS__)
+#ifdef RTE_ETHDEV_DEBUG_RX
+#define PMD_RX_LOG(level, ...) \
+ RTE_LOG_LINE_PREFIX(level, IORING, "%s() rx: ", __func__, __VA_ARGS__)
+#else
+#define PMD_RX_LOG(...) do { } while (0)
+#endif
+
+#ifdef RTE_ETHDEV_DEBUG_TX
+#define PMD_TX_LOG(level, ...) \
+ RTE_LOG_LINE_PREFIX(level, IORING, "%s() tx: ", __func__, __VA_ARGS__)
+#else
+#define PMD_TX_LOG(...) do { } while (0)
+#endif
+
#define IORING_IFACE_ARG "iface"
#define IORING_PERSIST_ARG "persist"
@@ -43,6 +66,30 @@ static const char * const valid_arguments[] = {
NULL
};
+struct rx_queue {
+ struct rte_mempool *mb_pool; /* rx buffer pool */
+ struct io_uring io_ring; /* ring of posted reads */
+ uint16_t port_id;
+ uint16_t queue_id;
+
+ uint64_t rx_packets;
+ uint64_t rx_bytes;
+ uint64_t rx_nombuf;
+ uint64_t rx_errors;
+};
+
+struct tx_queue {
+ struct io_uring io_ring;
+
+ uint16_t port_id;
+ uint16_t queue_id;
+ uint16_t free_thresh;
+
+ uint64_t tx_packets;
+ uint64_t tx_bytes;
+ uint64_t tx_errors;
+};
+
struct pmd_internals {
int keep_fd; /* keep alive file descriptor */
char ifname[IFNAMSIZ]; /* name assigned by kernel */
@@ -300,6 +347,15 @@ eth_dev_info(struct rte_eth_dev *dev, struct rte_eth_dev_info *dev_info)
dev_info->if_index = if_nametoindex(pmd->ifname);
dev_info->max_mac_addrs = 1;
dev_info->max_rx_pktlen = RTE_ETHER_MAX_LEN;
+ dev_info->max_rx_queues = IORING_MAX_QUEUES;
+ dev_info->max_tx_queues = IORING_MAX_QUEUES;
+ dev_info->min_rx_bufsize = 0;
+
+ dev_info->default_rxportconf = (struct rte_eth_dev_portconf) {
+ .burst_size = IORING_DEFAULT_BURST,
+ .ring_size = IORING_NUM_BUFFERS,
+ .nb_queues = 1,
+ };
return 0;
}
@@ -311,6 +367,14 @@ eth_dev_close(struct rte_eth_dev *dev)
PMD_LOG(INFO, "Closing %s", pmd->ifname);
+ int *fds = dev->process_private;
+ for (uint16_t i = 0; i < dev->data->nb_rx_queues; i++) {
+ if (fds[i] == -1)
+ continue;
+ close(fds[i]);
+ fds[i] = -1;
+ }
+
if (rte_eal_process_type() != RTE_PROC_PRIMARY)
return 0;
@@ -324,6 +388,299 @@ eth_dev_close(struct rte_eth_dev *dev)
return 0;
}
+/* Set up another fd to the TAP device for this queue */
+static int
+eth_queue_setup(struct rte_eth_dev *dev, const char *name, uint16_t queue_id)
+{
+ int *fds = dev->process_private;
+
+ if (fds[queue_id] != -1)
+ return 0; /* already setup */
+
+ struct ifreq ifr = { };
+ int tap_fd = tap_open(name, &ifr, 0);
+ if (tap_fd < 0) {
+ PMD_LOG(ERR, "tap_open failed");
+ return -1;
+ }
+
+ PMD_LOG(DEBUG, "opened %d for queue %u", tap_fd, queue_id);
+ fds[queue_id] = tap_fd;
+ return 0;
+}
+
+static int
+eth_queue_fd(uint16_t port_id, uint16_t queue_id)
+{
+ struct rte_eth_dev *dev = &rte_eth_devices[port_id];
+ int *fds = dev->process_private;
+
+ return fds[queue_id];
+}
+
+/* post a read for an mbuf on the submission queue */
+static inline void
+eth_rx_submit(struct rx_queue *rxq, int fd, struct rte_mbuf *mb)
+{
+ struct io_uring_sqe *sqe = io_uring_get_sqe(&rxq->io_ring);
+
+ if (unlikely(sqe == NULL)) {
+ PMD_LOG(DEBUG, "io_uring no rx sqe");
+ rxq->rx_errors++;
+ rte_pktmbuf_free(mb);
+ return;
+ }
+ io_uring_sqe_set_data(sqe, mb);
+
+ void *buf = rte_pktmbuf_mtod_offset(mb, void *, 0);
+ unsigned int nbytes = rte_pktmbuf_tailroom(mb);
+
+ io_uring_prep_read(sqe, fd, buf, nbytes, 0);
+}
+
+static uint16_t
+eth_ioring_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
+{
+ struct rx_queue *rxq = queue;
+ struct io_uring_cqe *cqe;
+ unsigned int head, num_cqe = 0;
+ uint16_t num_rx = 0;
+ uint32_t num_bytes = 0;
+ int fd = eth_queue_fd(rxq->port_id, rxq->queue_id);
+
+ io_uring_for_each_cqe(&rxq->io_ring, head, cqe) {
+ struct rte_mbuf *mb = (void *)(uintptr_t)cqe->user_data;
+ ssize_t len = cqe->res;
+
+ PMD_RX_LOG(DEBUG, "cqe %u len %zd", num_cqe, len);
+ num_cqe++;
+
+ if (unlikely(len < RTE_ETHER_HDR_LEN)) {
+ if (len < 0)
+ PMD_LOG(ERR, "io_uring_read: %s", strerror(-len));
+ else
+ PMD_LOG(ERR, "io_uring_read missing hdr");
+
+ rxq->rx_errors++;
+ goto resubmit;
+ }
+
+ struct rte_mbuf *nmb = rte_pktmbuf_alloc(rxq->mb_pool);
+ if (unlikely(nmb == NULL)) {
+ PMD_LOG(DEBUG, "Rx mbuf alloc failed");
+ ++rxq->rx_nombuf;
+ goto resubmit;
+ }
+
+ mb->pkt_len = len;
+ mb->data_len = len;
+ mb->port = rxq->port_id;
+ __rte_mbuf_sanity_check(mb, 1);
+
+ num_bytes += len;
+ bufs[num_rx++] = mb;
+
+ mb = nmb;
+resubmit:
+ eth_rx_submit(rxq, fd, mb);
+
+ if (num_rx == nb_pkts)
+ break;
+ }
+ io_uring_cq_advance(&rxq->io_ring, num_cqe);
+
+ rxq->rx_packets += num_rx;
+ rxq->rx_bytes += num_bytes;
+ return num_rx;
+}
+
+static int
+eth_rx_queue_setup(struct rte_eth_dev *dev, uint16_t queue_id, uint16_t nb_rx_desc,
+ unsigned int socket_id,
+ const struct rte_eth_rxconf *rx_conf __rte_unused,
+ struct rte_mempool *mb_pool)
+{
+ struct pmd_internals *pmd = dev->data->dev_private;
+
+ PMD_LOG(DEBUG, "setup port %u queue %u rx_descriptors %u",
+ dev->data->port_id, queue_id, nb_rx_desc);
+
+ /* open the tap fd for this queue if not already set up */
+ if (eth_queue_setup(dev, pmd->ifname, queue_id) < 0)
+ return -1;
+
+ struct rx_queue *rxq = rte_zmalloc_socket(NULL, sizeof(*rxq),
+ RTE_CACHE_LINE_SIZE, socket_id);
+ if (rxq == NULL) {
+ PMD_LOG(ERR, "rxq alloc failed");
+ return -1;
+ }
+
+ rxq->mb_pool = mb_pool;
+ rxq->port_id = dev->data->port_id;
+ rxq->queue_id = queue_id;
+ dev->data->rx_queues[queue_id] = rxq;
+
+ int ret = io_uring_queue_init(nb_rx_desc, &rxq->io_ring, 0);
+ if (ret != 0) {
+ PMD_LOG(ERR, "io_uring_queue_init failed: %s", strerror(-ret));
+ return -1;
+ }
+
+ struct rte_mbuf **mbufs = alloca(nb_rx_desc * sizeof(struct rte_mbuf *));
+ if (mbufs == NULL) {
+ PMD_LOG(ERR, "alloca for %u failed", nb_rx_desc);
+ return -1;
+ }
+
+ if (rte_pktmbuf_alloc_bulk(mb_pool, mbufs, nb_rx_desc) < 0) {
+ PMD_LOG(ERR, "Rx mbuf alloc %u bufs failed", nb_rx_desc);
+ return -1;
+ }
+
+ int fd = eth_queue_fd(rxq->port_id, rxq->queue_id);
+ for (uint16_t i = 0; i < nb_rx_desc; i++)
+ eth_rx_submit(rxq, fd, mbufs[i]);
+
+ io_uring_submit(&rxq->io_ring);
+ return 0;
+}
+
+static void
+eth_rx_queue_release(struct rte_eth_dev *dev, uint16_t queue_id)
+{
+ struct rx_queue *rxq = dev->data->rx_queues[queue_id];
+
+ struct io_uring_sqe *sqe = io_uring_get_sqe(&rxq->io_ring);
+ if (sqe == NULL) {
+ PMD_LOG(ERR, "io_uring_get_sqe failed: %s", strerror(errno));
+ } else {
+ io_uring_prep_cancel(sqe, NULL, IORING_ASYNC_CANCEL_ANY);
+ io_uring_submit_and_wait(&rxq->io_ring, 1);
+ }
+
+ io_uring_queue_exit(&rxq->io_ring);
+}
+
+static int
+eth_tx_queue_setup(struct rte_eth_dev *dev, uint16_t queue_id,
+ uint16_t nb_tx_desc, unsigned int socket_id,
+ const struct rte_eth_txconf *tx_conf)
+{
+ struct pmd_internals *pmd = dev->data->dev_private;
+
+ /* open the tap fd for this queue if not already set up */
+ if (eth_queue_setup(dev, pmd->ifname, queue_id) < 0)
+ return -1;
+
+ struct tx_queue *txq = rte_zmalloc_socket(NULL, sizeof(*txq),
+ RTE_CACHE_LINE_SIZE, socket_id);
+ if (txq == NULL) {
+ PMD_LOG(ERR, "txq alloc failed");
+ return -1;
+ }
+
+ txq->port_id = dev->data->port_id;
+ txq->queue_id = queue_id;
+ txq->free_thresh = tx_conf->tx_free_thresh;
+ dev->data->tx_queues[queue_id] = txq;
+
+ int ret = io_uring_queue_init(nb_tx_desc, &txq->io_ring, 0);
+ if (ret != 0) {
+ PMD_LOG(ERR, "io_uring_queue_init failed: %s", strerror(-ret));
+ return -1;
+ }
+
+ return 0;
+}
+
+static void
+eth_ioring_tx_cleanup(struct tx_queue *txq)
+{
+ struct io_uring_cqe *cqe;
+ unsigned int head, num_cqe = 0;
+ unsigned int tx_done = 0;
+ uint64_t tx_bytes = 0;
+
+ io_uring_for_each_cqe(&txq->io_ring, head, cqe) {
+ struct rte_mbuf *mb = (void *)(uintptr_t)cqe->user_data;
+
+ num_cqe++;
+ PMD_TX_LOG(DEBUG, " mbuf len %u result: %d", mb->pkt_len, cqe->res);
+ if (unlikely(cqe->res < 0)) {
+ ++txq->tx_errors;
+ } else {
+ ++tx_done;
+ tx_bytes += mb->pkt_len;
+ }
+
+ rte_pktmbuf_free(mb);
+ }
+ /* release every completion walked, including failed writes */
+ io_uring_cq_advance(&txq->io_ring, num_cqe);
+
+ txq->tx_packets += tx_done;
+ txq->tx_bytes += tx_bytes;
+}
+
+static void
+eth_tx_queue_release(struct rte_eth_dev *dev, uint16_t queue_id)
+{
+ struct tx_queue *txq = dev->data->tx_queues[queue_id];
+
+ eth_ioring_tx_cleanup(txq);
+
+ struct io_uring_sqe *sqe = io_uring_get_sqe(&txq->io_ring);
+ if (sqe == NULL) {
+ PMD_LOG(ERR, "io_uring_get_sqe failed: %s", strerror(errno));
+ } else {
+ io_uring_prep_cancel(sqe, NULL, IORING_ASYNC_CANCEL_ANY);
+ io_uring_submit_and_wait(&txq->io_ring, 1);
+ }
+
+ io_uring_queue_exit(&txq->io_ring);
+}
+
+static uint16_t
+eth_ioring_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
+{
+ struct tx_queue *txq = queue;
+ uint16_t num_tx;
+
+ if (unlikely(nb_pkts == 0))
+ return 0;
+
+ PMD_TX_LOG(DEBUG, "%d packets to xmit", nb_pkts);
+
+ if (io_uring_sq_space_left(&txq->io_ring) < txq->free_thresh)
+ eth_ioring_tx_cleanup(txq);
+
+ int fd = eth_queue_fd(txq->port_id, txq->queue_id);
+
+ for (num_tx = 0; num_tx < nb_pkts; num_tx++) {
+ struct rte_mbuf *mb = bufs[num_tx];
+
+ /* only linear, exclusively owned mbufs can be written in place */
+ if (unlikely(rte_mbuf_refcnt_read(mb) != 1 ||
+ !RTE_MBUF_DIRECT(mb) || mb->nb_segs != 1)) {
+ PMD_LOG(ERR, "multi-segment or shared mbufs not supported yet");
+ rte_pktmbuf_free(mb);
+ ++txq->tx_errors;
+ continue;
+ }
+
+ struct io_uring_sqe *sqe = io_uring_get_sqe(&txq->io_ring);
+ if (sqe == NULL)
+ break; /* submit ring is full */
+
+ io_uring_sqe_set_data(sqe, mb);
+
+ void *base = rte_pktmbuf_mtod(mb, void *);
+ io_uring_prep_write(sqe, fd, base, mb->pkt_len, 0);
+
+ PMD_TX_LOG(DEBUG, "tx mbuf: %p submit", mb);
+ }
+
+ if (likely(num_tx > 0))
+ io_uring_submit(&txq->io_ring);
+
+ return num_tx;
+}
+
static const struct eth_dev_ops ops = {
.dev_start = eth_dev_start,
.dev_stop = eth_dev_stop,
@@ -339,9 +696,12 @@ static const struct eth_dev_ops ops = {
.promiscuous_disable = eth_dev_promiscuous_disable,
.allmulticast_enable = eth_dev_allmulticast_enable,
.allmulticast_disable = eth_dev_allmulticast_disable,
+ .rx_queue_setup = eth_rx_queue_setup,
+ .rx_queue_release = eth_rx_queue_release,
+ .tx_queue_setup = eth_tx_queue_setup,
+ .tx_queue_release = eth_tx_queue_release,
};
-
static int
ioring_create(struct rte_eth_dev *dev, const char *tap_name, uint8_t persist)
{
@@ -379,6 +739,10 @@ ioring_create(struct rte_eth_dev *dev, const char *tap_name, uint8_t persist)
}
PMD_LOG(DEBUG, "%s setup", ifr.ifr_name);
+
+ dev->rx_pkt_burst = eth_ioring_rx;
+ dev->tx_pkt_burst = eth_ioring_tx;
+
return 0;
error:
--
2.47.2