[dpdk-dev] [RFC PATCH DRAFT 1/2] ethdev: add buffered single pkt TX function to API
Bruce Richardson
bruce.richardson at intel.com
Wed Jun 25 00:32:15 CEST 2014
Many sample apps include internal buffering for single-packet-at-a-time
operation. Since this is such a common paradigm, this functionality is
better suited to being inside the core ethdev API.
The new APIs include three functions:
* rte_eth_tx_buffer - buffer up a single packet for future transmission
* rte_eth_tx_buffer_flush - flush any unsent buffered packets
* rte_eth_tx_buffer_set_err_callback - set up a callback to be called in
case transmitting a buffered burst fails. By default, we just free the
unsent packets.
---
config/common_bsdapp | 1 +
config/common_linuxapp | 1 +
lib/librte_ether/rte_ethdev.c | 55 +++++++++++++++++++++++++++++++++++--
lib/librte_ether/rte_ethdev.h | 63 +++++++++++++++++++++++++++++++++++++++++++
4 files changed, 118 insertions(+), 2 deletions(-)
diff --git a/config/common_bsdapp b/config/common_bsdapp
index 989e1da..98da1f5 100644
--- a/config/common_bsdapp
+++ b/config/common_bsdapp
@@ -138,6 +138,7 @@ CONFIG_RTE_LIBRTE_ETHDEV_DEBUG=n
CONFIG_RTE_MAX_ETHPORTS=32
CONFIG_RTE_LIBRTE_IEEE1588=n
CONFIG_RTE_ETHDEV_QUEUE_STAT_CNTRS=16
+CONFIG_RTE_ETHDEV_TX_BUFSIZE=32
#
# Compile burst-oriented IGB & EM PMD drivers
diff --git a/config/common_linuxapp b/config/common_linuxapp
index 5b896c3..0f509d0 100644
--- a/config/common_linuxapp
+++ b/config/common_linuxapp
@@ -161,6 +161,7 @@ CONFIG_RTE_LIBRTE_ETHDEV_DEBUG=n
CONFIG_RTE_MAX_ETHPORTS=32
CONFIG_RTE_LIBRTE_IEEE1588=n
CONFIG_RTE_ETHDEV_QUEUE_STAT_CNTRS=16
+CONFIG_RTE_ETHDEV_TX_BUFSIZE=32
#
# Support NIC bypass logic
diff --git a/lib/librte_ether/rte_ethdev.c b/lib/librte_ether/rte_ethdev.c
index 7256841..68d4d22 100644
--- a/lib/librte_ether/rte_ethdev.c
+++ b/lib/librte_ether/rte_ethdev.c
@@ -397,11 +397,32 @@ rte_eth_dev_tx_queue_stop(uint8_t port_id, uint16_t tx_queue_id)
}
+static void
+free_unsent_pkts(struct rte_mbuf **pkts, uint16_t unsent,
+ void *userdata __rte_unused)
+{
+ unsigned i;
+ for (i = 0; i < unsent; i++)
+ rte_pktmbuf_free(pkts[i]);
+}
+
+void
+rte_eth_tx_buffer_set_err_callback(uint8_t port_id, uint16_t queue_id,
+ buffer_tx_error_fn cbfn, void *userdata)
+{
+ struct rte_eth_dev_data *dev_data = rte_eth_devices[port_id].data;
+ struct rte_eth_dev_tx_buffer *buf = dev_data->tx_queues[queue_id];
+
+ buf->userdata = userdata;
+ buf->flush_cb = cbfn;
+}
+
static int
rte_eth_dev_tx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
{
uint16_t old_nb_queues = dev->data->nb_tx_queues;
void **txq;
+ struct rte_eth_dev_tx_buffer *new_bufs;
unsigned i;
if (dev->data->tx_queues == NULL) { /* first time configuration */
@@ -412,24 +433,54 @@ rte_eth_dev_tx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
dev->data->nb_tx_queues = 0;
return -(ENOMEM);
}
+
+ dev->data->txq_bufs = rte_zmalloc("ethdev->txq_bufs",
+ sizeof(*dev->data->txq_bufs) * nb_queues, 0);
+ if (dev->data->txq_bufs == NULL) {
+ dev->data->nb_tx_queues = 0;
+ rte_free(dev->data->tx_queues);
+ return -(ENOMEM);
+ }
+ for (i = 0; i < nb_queues; i++ )
+ dev->data->txq_bufs[i].flush_cb = free_unsent_pkts;
} else { /* re-configure */
+
+ /* flush the packets queued for all queues*/
+ for (i = 0; i < old_nb_queues; i++)
+ rte_eth_tx_buffer_flush(dev->data->port_id, i);
+
FUNC_PTR_OR_ERR_RET(*dev->dev_ops->tx_queue_release, -ENOTSUP);
+ /* get new buffer space first, but keep old space around */
+ new_bufs = rte_zmalloc("ethdev->txq_bufs",
+ sizeof(*dev->data->txq_bufs) * nb_queues, 0);
+ if (new_bufs == NULL)
+ return -(ENOMEM);
+
txq = dev->data->tx_queues;
for (i = nb_queues; i < old_nb_queues; i++)
(*dev->dev_ops->tx_queue_release)(txq[i]);
txq = rte_realloc(txq, sizeof(txq[0]) * nb_queues,
CACHE_LINE_SIZE);
- if (txq == NULL)
+ if (txq == NULL) {
+ rte_free(new_bufs);
return -(ENOMEM);
+ }
- if (nb_queues > old_nb_queues)
+ if (nb_queues > old_nb_queues) {
memset(txq + old_nb_queues, 0,
sizeof(txq[0]) * (nb_queues - old_nb_queues));
+ for (i = old_nb_queues; i < nb_queues; i++)
+ dev->data->txq_bufs[i].flush_cb =
+ free_unsent_pkts;
+ }
dev->data->tx_queues = txq;
+ /* now replace old buffers with new */
+ rte_free(dev->data->txq_bufs);
+ dev->data->txq_bufs = new_bufs;
}
dev->data->nb_tx_queues = nb_queues;
return (0);
diff --git a/lib/librte_ether/rte_ethdev.h b/lib/librte_ether/rte_ethdev.h
index 2406e45..0ec7076 100644
--- a/lib/librte_ether/rte_ethdev.h
+++ b/lib/librte_ether/rte_ethdev.h
@@ -176,6 +176,7 @@ extern "C" {
#include <rte_interrupts.h>
#include <rte_pci.h>
#include <rte_mbuf.h>
+#include <rte_branch_prediction.h>
#include "rte_ether.h"
/**
@@ -1497,6 +1498,16 @@ struct rte_eth_dev_sriov {
};
#define RTE_ETH_DEV_SRIOV(dev) ((dev)->data->sriov)
+typedef void (*buffer_tx_error_fn)(struct rte_mbuf **unsent, uint16_t count,
+ void *userdata);
+
+struct rte_eth_dev_tx_buffer {
+ struct rte_mbuf *pkts[RTE_ETHDEV_TX_BUFSIZE];
+ unsigned nb_pkts;
+ buffer_tx_error_fn flush_cb; /* callback for when tx_burst fails */
+ void *userdata; /* userdata for callback */
+};
+
/**
* @internal
* The data part, with no function pointers, associated with each ethernet device.
@@ -1534,6 +1545,9 @@ struct rte_eth_dev_data {
scattered_rx : 1, /**< RX of scattered packets is ON(1) / OFF(0) */
all_multicast : 1, /**< RX all multicast mode ON(1) / OFF(0). */
dev_started : 1; /**< Device state: STARTED(1) / STOPPED(0). */
+
+ struct rte_eth_dev_tx_buffer
+ *txq_bufs; /**< space to allow buffered transmits */
};
/**
@@ -2426,6 +2440,55 @@ rte_eth_tx_burst(uint8_t port_id, uint16_t queue_id,
}
#endif
+static inline uint16_t
+rte_eth_tx_buffer(uint8_t port_id, uint16_t queue_id, struct rte_mbuf *tx_pkt)
+{
+ struct rte_eth_dev *dev = &rte_eth_devices[port_id];
+ struct rte_eth_dev_tx_buffer *qbuf = &dev->data->txq_bufs[queue_id];
+
+ qbuf->pkts[qbuf->nb_pkts++] = tx_pkt;
+ if (qbuf->nb_pkts < RTE_ETHDEV_TX_BUFSIZE)
+ return 0;
+
+ const uint16_t sent = (*dev->tx_pkt_burst)(
+ dev->data->tx_queues[queue_id], qbuf->pkts,
+ RTE_ETHDEV_TX_BUFSIZE);
+
+ qbuf->nb_pkts = 0; /* all packets sent, or to be dealt with by
+ * callback below */
+ if (unlikely(sent != RTE_ETHDEV_TX_BUFSIZE))
+ qbuf->flush_cb(&qbuf->pkts[sent], RTE_ETHDEV_TX_BUFSIZE - sent,
+ qbuf->userdata);
+
+ return sent;
+}
+
+static inline uint16_t
+rte_eth_tx_buffer_flush(uint8_t port_id, uint16_t queue_id)
+{
+ struct rte_eth_dev *dev = &rte_eth_devices[port_id];
+ struct rte_eth_dev_tx_buffer *qbuf = &dev->data->txq_bufs[queue_id];
+
+ if (qbuf->nb_pkts != 0)
+ return 0;
+
+ const uint16_t to_send = qbuf->nb_pkts;
+ const uint16_t sent = (*dev->tx_pkt_burst)(
+ dev->data->tx_queues[queue_id], qbuf->pkts, to_send);
+
+ qbuf->nb_pkts = 0; /* all packets sent, or to be dealt with by
+ * callback below */
+ if (unlikely(sent != qbuf->nb_pkts))
+ qbuf->flush_cb(&qbuf->pkts[sent], to_send - sent,
+ qbuf->userdata);
+
+ return sent;
+}
+
+void
+rte_eth_tx_buffer_set_err_callback(uint8_t port_id, uint16_t queue_id,
+ buffer_tx_error_fn cbfn, void *userdata);
+
/**
* Setup a new signature filter rule on an Ethernet device
*
--
1.9.3
More information about the dev
mailing list