[dpdk-dev] [RFC PATCH DRAFT 1/2] ethdev: add buffered single pkt TX function to API

Bruce Richardson bruce.richardson at intel.com
Wed Jun 25 00:32:15 CEST 2014


Many sample apps implement their own internal buffering so that packets
passed to them one at a time can still be transmitted in bursts. Since
this is such a common paradigm, this functionality is better provided by
the core ethdev API.
The new API consists of three functions:
* rte_eth_tx_buffer - buffer up a single packet for future transmission
* rte_eth_tx_buffer_flush - transmit any buffered, unsent packets
  immediately
* rte_eth_tx_buffer_set_err_callback - set a callback to be invoked when
  transmission of a buffered burst fails. By default, the unsent packets
  are simply freed.
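
To illustrate the intended usage - this sketch is not part of the patch,
and get_next_pkt() stands in for an application-specific packet source -
a forwarding loop on an already-configured port 0/queue 0 might look
like:

	/* optional: count drops rather than silently freeing unsent mbufs */
	static uint64_t tx_drops;

	static void
	count_drops(struct rte_mbuf **unsent, uint16_t count, void *userdata)
	{
		uint64_t *drops = userdata;
		uint16_t i;

		*drops += count;
		for (i = 0; i < count; i++)
			rte_pktmbuf_free(unsent[i]);
	}

	/* in initialization code, after TX queue setup */
	rte_eth_tx_buffer_set_err_callback(0, 0, count_drops, &tx_drops);

	/* in the datapath loop */
	for (;;) {
		struct rte_mbuf *m = get_next_pkt();
		if (m != NULL)
			rte_eth_tx_buffer(0, 0, m);    /* PMD called once
			                                * buffer fills */
		else
			rte_eth_tx_buffer_flush(0, 0); /* drain when idle */
	}

Note that rte_eth_tx_buffer() only calls into the PMD once
RTE_ETHDEV_TX_BUFSIZE packets have accumulated, so an application with
bursty input should call rte_eth_tx_buffer_flush() on idle cycles to
bound the latency of buffered packets.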
---
 config/common_bsdapp          |  1 +
 config/common_linuxapp        |  1 +
 lib/librte_ether/rte_ethdev.c | 55 +++++++++++++++++++++++++++++++++++--
 lib/librte_ether/rte_ethdev.h | 63 +++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 118 insertions(+), 2 deletions(-)

diff --git a/config/common_bsdapp b/config/common_bsdapp
index 989e1da..98da1f5 100644
--- a/config/common_bsdapp
+++ b/config/common_bsdapp
@@ -138,6 +138,7 @@ CONFIG_RTE_LIBRTE_ETHDEV_DEBUG=n
 CONFIG_RTE_MAX_ETHPORTS=32
 CONFIG_RTE_LIBRTE_IEEE1588=n
 CONFIG_RTE_ETHDEV_QUEUE_STAT_CNTRS=16
+CONFIG_RTE_ETHDEV_TX_BUFSIZE=32
 
 #
 # Compile burst-oriented IGB & EM PMD drivers
diff --git a/config/common_linuxapp b/config/common_linuxapp
index 5b896c3..0f509d0 100644
--- a/config/common_linuxapp
+++ b/config/common_linuxapp
@@ -161,6 +161,7 @@ CONFIG_RTE_LIBRTE_ETHDEV_DEBUG=n
 CONFIG_RTE_MAX_ETHPORTS=32
 CONFIG_RTE_LIBRTE_IEEE1588=n
 CONFIG_RTE_ETHDEV_QUEUE_STAT_CNTRS=16
+CONFIG_RTE_ETHDEV_TX_BUFSIZE=32
 
 #
 # Support NIC bypass logic
diff --git a/lib/librte_ether/rte_ethdev.c b/lib/librte_ether/rte_ethdev.c
index 7256841..68d4d22 100644
--- a/lib/librte_ether/rte_ethdev.c
+++ b/lib/librte_ether/rte_ethdev.c
@@ -397,11 +397,32 @@ rte_eth_dev_tx_queue_stop(uint8_t port_id, uint16_t tx_queue_id)
 
 }
 
+static void
+free_unsent_pkts(struct rte_mbuf **pkts, uint16_t unsent,
+		void *userdata __rte_unused)
+{
+	unsigned i;
+	for (i = 0; i < unsent; i++)
+		rte_pktmbuf_free(pkts[i]);
+}
+
+void
+rte_eth_tx_buffer_set_err_callback(uint8_t port_id, uint16_t queue_id,
+		buffer_tx_error_fn cbfn, void *userdata)
+{
+	struct rte_eth_dev_data *dev_data = rte_eth_devices[port_id].data;
+	struct rte_eth_dev_tx_buffer *buf = &dev_data->txq_bufs[queue_id];
+
+	buf->userdata = userdata;
+	buf->flush_cb = cbfn;
+}
+
 static int
 rte_eth_dev_tx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
 {
 	uint16_t old_nb_queues = dev->data->nb_tx_queues;
 	void **txq;
+	struct rte_eth_dev_tx_buffer *new_bufs;
 	unsigned i;
 
 	if (dev->data->tx_queues == NULL) { /* first time configuration */
@@ -412,24 +433,54 @@ rte_eth_dev_tx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
 			dev->data->nb_tx_queues = 0;
 			return -(ENOMEM);
 		}
+
+		dev->data->txq_bufs = rte_zmalloc("ethdev->txq_bufs",
+				sizeof(*dev->data->txq_bufs) * nb_queues, 0);
+		if (dev->data->txq_bufs == NULL) {
+			dev->data->nb_tx_queues = 0;
+			rte_free(dev->data->tx_queues);
+			return -(ENOMEM);
+		}
+		for (i = 0; i < nb_queues; i++)
+			dev->data->txq_bufs[i].flush_cb = free_unsent_pkts;
 	} else { /* re-configure */
+
+		/* flush the packets queued on all queues */
+		for (i = 0; i < old_nb_queues; i++)
+			rte_eth_tx_buffer_flush(dev->data->port_id, i);
+
 		FUNC_PTR_OR_ERR_RET(*dev->dev_ops->tx_queue_release, -ENOTSUP);
 
+		/* get new buffer space first, but keep old space around */
+		new_bufs = rte_zmalloc("ethdev->txq_bufs",
+				sizeof(*dev->data->txq_bufs) * nb_queues, 0);
+		if (new_bufs == NULL)
+			return -(ENOMEM);
+
 		txq = dev->data->tx_queues;
 
 		for (i = nb_queues; i < old_nb_queues; i++)
 			(*dev->dev_ops->tx_queue_release)(txq[i]);
 		txq = rte_realloc(txq, sizeof(txq[0]) * nb_queues,
 				CACHE_LINE_SIZE);
-		if (txq == NULL)
+		if (txq == NULL) {
+			rte_free(new_bufs);
 			return -(ENOMEM);
+		}
 
-		if (nb_queues > old_nb_queues)
+		/* set the default flush callback on all the new buffers */
+		for (i = 0; i < nb_queues; i++)
+			new_bufs[i].flush_cb = free_unsent_pkts;
+
+		if (nb_queues > old_nb_queues)
 			memset(txq + old_nb_queues, 0,
 				sizeof(txq[0]) * (nb_queues - old_nb_queues));
 
 		dev->data->tx_queues = txq;
 
+		/* now replace old buffers with new */
+		rte_free(dev->data->txq_bufs);
+		dev->data->txq_bufs = new_bufs;
 	}
 	dev->data->nb_tx_queues = nb_queues;
 	return (0);
diff --git a/lib/librte_ether/rte_ethdev.h b/lib/librte_ether/rte_ethdev.h
index 2406e45..0ec7076 100644
--- a/lib/librte_ether/rte_ethdev.h
+++ b/lib/librte_ether/rte_ethdev.h
@@ -176,6 +176,7 @@ extern "C" {
 #include <rte_interrupts.h>
 #include <rte_pci.h>
 #include <rte_mbuf.h>
+#include <rte_branch_prediction.h>
 #include "rte_ether.h"
 
 /**
@@ -1497,6 +1498,16 @@ struct rte_eth_dev_sriov {
 };
 #define RTE_ETH_DEV_SRIOV(dev)         ((dev)->data->sriov)
 
+typedef void (*buffer_tx_error_fn)(struct rte_mbuf **unsent, uint16_t count,
+		void *userdata);
+
+struct rte_eth_dev_tx_buffer {
+	struct rte_mbuf *pkts[RTE_ETHDEV_TX_BUFSIZE];
+	unsigned nb_pkts;
+	buffer_tx_error_fn flush_cb; /* callback for when tx_burst fails */
+	void *userdata;              /* userdata for callback */
+};
+
 /**
  * @internal
  * The data part, with no function pointers, associated with each ethernet device.
@@ -1534,6 +1545,9 @@ struct rte_eth_dev_data {
 		scattered_rx : 1,  /**< RX of scattered packets is ON(1) / OFF(0) */
 		all_multicast : 1, /**< RX all multicast mode ON(1) / OFF(0). */
 		dev_started : 1;   /**< Device state: STARTED(1) / STOPPED(0). */
+
+	struct rte_eth_dev_tx_buffer
+			*txq_bufs; /**< space to allow buffered transmits */
 };
 
 /**
@@ -2426,6 +2440,55 @@ rte_eth_tx_burst(uint8_t port_id, uint16_t queue_id,
 }
 #endif
 
+static inline uint16_t
+rte_eth_tx_buffer(uint8_t port_id, uint16_t queue_id, struct rte_mbuf *tx_pkt)
+{
+	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
+	struct rte_eth_dev_tx_buffer *qbuf = &dev->data->txq_bufs[queue_id];
+
+	qbuf->pkts[qbuf->nb_pkts++] = tx_pkt;
+	if (qbuf->nb_pkts < RTE_ETHDEV_TX_BUFSIZE)
+		return 0;
+
+	const uint16_t sent = (*dev->tx_pkt_burst)(
+			dev->data->tx_queues[queue_id], qbuf->pkts,
+			RTE_ETHDEV_TX_BUFSIZE);
+
+	qbuf->nb_pkts = 0; /* all packets sent, or to be dealt with by
+	                    * callback below */
+	if (unlikely(sent != RTE_ETHDEV_TX_BUFSIZE))
+		qbuf->flush_cb(&qbuf->pkts[sent], RTE_ETHDEV_TX_BUFSIZE - sent,
+				qbuf->userdata);
+
+	return sent;
+}
+
+static inline uint16_t
+rte_eth_tx_buffer_flush(uint8_t port_id, uint16_t queue_id)
+{
+	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
+	struct rte_eth_dev_tx_buffer *qbuf = &dev->data->txq_bufs[queue_id];
+
+	if (qbuf->nb_pkts == 0)
+		return 0;
+
+	const uint16_t to_send = qbuf->nb_pkts;
+	const uint16_t sent = (*dev->tx_pkt_burst)(
+			dev->data->tx_queues[queue_id], qbuf->pkts, to_send);
+
+	qbuf->nb_pkts = 0; /* all packets sent, or to be dealt with by
+	                    * callback below */
+	if (unlikely(sent != to_send))
+		qbuf->flush_cb(&qbuf->pkts[sent], to_send - sent,
+				qbuf->userdata);
+
+	return sent;
+}
+
+void
+rte_eth_tx_buffer_set_err_callback(uint8_t port_id, uint16_t queue_id,
+		buffer_tx_error_fn cbfn, void *userdata);
+
 /**
  * Setup a new signature filter rule on an Ethernet device
  *
-- 
1.9.3


