[dpdk-dev] [PATCH v4 2/2] examples: rework to use buffered tx

Tomasz Kulasek tomaszx.kulasek at intel.com
Thu Mar 10 18:19:35 CET 2016


The internal buffering of packets for TX in sample apps is no longer
needed, so this patchset also replaces this code with calls to the new
rte_eth_tx_buffer* APIs in:

* l2fwd-jobstats
* l2fwd-keepalive
* l2fwd
* l3fwd-acl
* l3fwd-power
* link_status_interrupt
* client_server_mp
* l2fwd_fork
* packet_ordering
* qos_meter

v3 changes
 - updated due to the change of callback name

v2 changes
 - rework synced with tx buffer API changes

Signed-off-by: Tomasz Kulasek <tomaszx.kulasek at intel.com>
Acked-by: Konstantin Ananyev <konstantin.ananyev at intel.com>
---
 examples/l2fwd-jobstats/main.c                     |  104 +++++++----------
 examples/l2fwd-keepalive/main.c                    |  100 ++++++----------
 examples/l2fwd/main.c                              |  104 +++++++----------
 examples/l3fwd-acl/main.c                          |   92 ++++++---------
 examples/l3fwd-power/main.c                        |   89 ++++++--------
 examples/link_status_interrupt/main.c              |  107 +++++++----------
 .../client_server_mp/mp_client/client.c            |  101 +++++++++-------
 examples/multi_process/l2fwd_fork/main.c           |   97 +++++++---------
 examples/packet_ordering/main.c                    |  122 ++++++++++++++------
 examples/qos_meter/main.c                          |   61 +++-------
 10 files changed, 436 insertions(+), 541 deletions(-)

diff --git a/examples/l2fwd-jobstats/main.c b/examples/l2fwd-jobstats/main.c
index 6da60e0..d1e9bf7 100644
--- a/examples/l2fwd-jobstats/main.c
+++ b/examples/l2fwd-jobstats/main.c
@@ -1,7 +1,7 @@
 /*-
  *   BSD LICENSE
  *
- *   Copyright(c) 2010-2015 Intel Corporation. All rights reserved.
+ *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
  *   All rights reserved.
  *
  *   Redistribution and use in source and binary forms, with or without
@@ -41,6 +41,7 @@
 #include <rte_alarm.h>
 #include <rte_common.h>
 #include <rte_log.h>
+#include <rte_malloc.h>
 #include <rte_memory.h>
 #include <rte_memcpy.h>
 #include <rte_memzone.h>
@@ -97,18 +98,12 @@ static uint32_t l2fwd_dst_ports[RTE_MAX_ETHPORTS];
 
 static unsigned int l2fwd_rx_queue_per_lcore = 1;
 
-struct mbuf_table {
-	uint64_t next_flush_time;
-	unsigned len;
-	struct rte_mbuf *mbufs[MAX_PKT_BURST];
-};
-
 #define MAX_RX_QUEUE_PER_LCORE 16
 #define MAX_TX_QUEUE_PER_PORT 16
 struct lcore_queue_conf {
 	unsigned n_rx_port;
 	unsigned rx_port_list[MAX_RX_QUEUE_PER_LCORE];
-	struct mbuf_table tx_mbufs[RTE_MAX_ETHPORTS];
+	uint64_t next_flush_time[RTE_MAX_ETHPORTS];
 
 	struct rte_timer rx_timers[MAX_RX_QUEUE_PER_LCORE];
 	struct rte_jobstats port_fwd_jobs[MAX_RX_QUEUE_PER_LCORE];
@@ -123,6 +118,8 @@ struct lcore_queue_conf {
 } __rte_cache_aligned;
 struct lcore_queue_conf lcore_queue_conf[RTE_MAX_LCORE];
 
+struct rte_eth_dev_tx_buffer *tx_buffer[RTE_MAX_ETHPORTS];
+
 static const struct rte_eth_conf port_conf = {
 	.rxmode = {
 		.split_hdr_size = 0,
@@ -373,59 +370,14 @@ show_stats_cb(__rte_unused void *param)
 	rte_eal_alarm_set(timer_period * US_PER_S, show_stats_cb, NULL);
 }
 
-/* Send the burst of packets on an output interface */
-static void
-l2fwd_send_burst(struct lcore_queue_conf *qconf, uint8_t port)
-{
-	struct mbuf_table *m_table;
-	uint16_t ret;
-	uint16_t queueid = 0;
-	uint16_t n;
-
-	m_table = &qconf->tx_mbufs[port];
-	n = m_table->len;
-
-	m_table->next_flush_time = rte_get_timer_cycles() + drain_tsc;
-	m_table->len = 0;
-
-	ret = rte_eth_tx_burst(port, queueid, m_table->mbufs, n);
-
-	port_statistics[port].tx += ret;
-	if (unlikely(ret < n)) {
-		port_statistics[port].dropped += (n - ret);
-		do {
-			rte_pktmbuf_free(m_table->mbufs[ret]);
-		} while (++ret < n);
-	}
-}
-
-/* Enqueue packets for TX and prepare them to be sent */
-static int
-l2fwd_send_packet(struct rte_mbuf *m, uint8_t port)
-{
-	const unsigned lcore_id = rte_lcore_id();
-	struct lcore_queue_conf *qconf = &lcore_queue_conf[lcore_id];
-	struct mbuf_table *m_table = &qconf->tx_mbufs[port];
-	uint16_t len = qconf->tx_mbufs[port].len;
-
-	m_table->mbufs[len] = m;
-
-	len++;
-	m_table->len = len;
-
-	/* Enough pkts to be sent. */
-	if (unlikely(len == MAX_PKT_BURST))
-		l2fwd_send_burst(qconf, port);
-
-	return 0;
-}
-
 static void
 l2fwd_simple_forward(struct rte_mbuf *m, unsigned portid)
 {
 	struct ether_hdr *eth;
 	void *tmp;
+	int sent;
 	unsigned dst_port;
+	struct rte_eth_dev_tx_buffer *buffer;
 
 	dst_port = l2fwd_dst_ports[portid];
 	eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
@@ -437,7 +389,10 @@ l2fwd_simple_forward(struct rte_mbuf *m, unsigned portid)
 	/* src addr */
 	ether_addr_copy(&l2fwd_ports_eth_addr[dst_port], &eth->s_addr);
 
-	l2fwd_send_packet(m, (uint8_t) dst_port);
+	buffer = tx_buffer[dst_port];
+	sent = rte_eth_tx_buffer(dst_port, 0, buffer, m);
+	if (sent)
+		port_statistics[dst_port].tx += sent;
 }
 
 static void
@@ -511,8 +466,10 @@ l2fwd_flush_job(__rte_unused struct rte_timer *timer, __rte_unused void *arg)
 	uint64_t now;
 	unsigned lcore_id;
 	struct lcore_queue_conf *qconf;
-	struct mbuf_table *m_table;
 	uint8_t portid;
+	unsigned i;
+	uint32_t sent;
+	struct rte_eth_dev_tx_buffer *buffer;
 
 	lcore_id = rte_lcore_id();
 	qconf = &lcore_queue_conf[lcore_id];
@@ -522,14 +479,20 @@ l2fwd_flush_job(__rte_unused struct rte_timer *timer, __rte_unused void *arg)
 	now = rte_get_timer_cycles();
 	lcore_id = rte_lcore_id();
 	qconf = &lcore_queue_conf[lcore_id];
-	for (portid = 0; portid < RTE_MAX_ETHPORTS; portid++) {
-		m_table = &qconf->tx_mbufs[portid];
-		if (m_table->len == 0 || m_table->next_flush_time <= now)
+
+	for (i = 0; i < qconf->n_rx_port; i++) {
+		portid = l2fwd_dst_ports[qconf->rx_port_list[i]];
+
+		if (qconf->next_flush_time[portid] <= now)
 			continue;
 
-		l2fwd_send_burst(qconf, portid);
-	}
+		buffer = tx_buffer[portid];
+		sent = rte_eth_tx_buffer_flush(portid, 0, buffer);
+		if (sent)
+			port_statistics[portid].tx += sent;
 
+		qconf->next_flush_time[portid] = rte_get_timer_cycles() + drain_tsc;
+	}
 
 	/* Pass target to indicate that this job is happy of time interwal
 	 * in which it was called. */
@@ -945,6 +908,23 @@ main(int argc, char **argv)
 			rte_exit(EXIT_FAILURE, "rte_eth_tx_queue_setup:err=%d, port=%u\n",
 				ret, (unsigned) portid);
 
+		/* Initialize TX buffers */
+		tx_buffer[portid] = rte_zmalloc_socket("tx_buffer",
+				RTE_ETH_TX_BUFFER_SIZE(MAX_PKT_BURST), 0,
+				rte_eth_dev_socket_id(portid));
+		if (tx_buffer[portid] == NULL)
+			rte_exit(EXIT_FAILURE, "Cannot allocate buffer for tx on port %u\n",
+					(unsigned) portid);
+
+		rte_eth_tx_buffer_init(tx_buffer[portid], MAX_PKT_BURST);
+
+		ret = rte_eth_tx_buffer_set_err_callback(tx_buffer[portid],
+				rte_eth_tx_buffer_count_callback,
+				&port_statistics[portid].dropped);
+		if (ret < 0)
+				rte_exit(EXIT_FAILURE, "Cannot set error callback for "
+						"tx buffer on port %u\n", (unsigned) portid);
+
 		/* Start device */
 		ret = rte_eth_dev_start(portid);
 		if (ret < 0)
diff --git a/examples/l2fwd-keepalive/main.c b/examples/l2fwd-keepalive/main.c
index f4d52f2..94b8677 100644
--- a/examples/l2fwd-keepalive/main.c
+++ b/examples/l2fwd-keepalive/main.c
@@ -1,7 +1,7 @@
 /*-
  *   BSD LICENSE
  *
- *   Copyright(c) 2010-2015 Intel Corporation. All rights reserved.
+ *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
  *   All rights reserved.
  *
  *   Redistribution and use in source and binary forms, with or without
@@ -47,6 +47,7 @@
 
 #include <rte_common.h>
 #include <rte_log.h>
+#include <rte_malloc.h>
 #include <rte_memory.h>
 #include <rte_memcpy.h>
 #include <rte_memzone.h>
@@ -97,21 +98,16 @@ static uint32_t l2fwd_dst_ports[RTE_MAX_ETHPORTS];
 
 static unsigned int l2fwd_rx_queue_per_lcore = 1;
 
-struct mbuf_table {
-	unsigned len;
-	struct rte_mbuf *m_table[MAX_PKT_BURST];
-};
-
 #define MAX_RX_QUEUE_PER_LCORE 16
 #define MAX_TX_QUEUE_PER_PORT 16
 struct lcore_queue_conf {
 	unsigned n_rx_port;
 	unsigned rx_port_list[MAX_RX_QUEUE_PER_LCORE];
-	struct mbuf_table tx_mbufs[RTE_MAX_ETHPORTS];
-
 } __rte_cache_aligned;
 struct lcore_queue_conf lcore_queue_conf[RTE_MAX_LCORE];
 
+struct rte_eth_dev_tx_buffer *tx_buffer[RTE_MAX_ETHPORTS];
+
 static const struct rte_eth_conf port_conf = {
 	.rxmode = {
 		.split_hdr_size = 0,
@@ -192,58 +188,14 @@ print_stats(__attribute__((unused)) struct rte_timer *ptr_timer,
 	printf("\n====================================================\n");
 }
 
-/* Send the burst of packets on an output interface */
-static int
-l2fwd_send_burst(struct lcore_queue_conf *qconf, unsigned n, uint8_t port)
-{
-	struct rte_mbuf **m_table;
-	unsigned ret;
-	unsigned queueid = 0;
-
-	m_table = (struct rte_mbuf **)qconf->tx_mbufs[port].m_table;
-
-	ret = rte_eth_tx_burst(port, (uint16_t) queueid, m_table, (uint16_t) n);
-	port_statistics[port].tx += ret;
-	if (unlikely(ret < n)) {
-		port_statistics[port].dropped += (n - ret);
-		do {
-			rte_pktmbuf_free(m_table[ret]);
-		} while (++ret < n);
-	}
-
-	return 0;
-}
-
-/* Enqueue packets for TX and prepare them to be sent */
-static int
-l2fwd_send_packet(struct rte_mbuf *m, uint8_t port)
-{
-	unsigned lcore_id, len;
-	struct lcore_queue_conf *qconf;
-
-	lcore_id = rte_lcore_id();
-
-	qconf = &lcore_queue_conf[lcore_id];
-	len = qconf->tx_mbufs[port].len;
-	qconf->tx_mbufs[port].m_table[len] = m;
-	len++;
-
-	/* enough pkts to be sent */
-	if (unlikely(len == MAX_PKT_BURST)) {
-		l2fwd_send_burst(qconf, MAX_PKT_BURST, port);
-		len = 0;
-	}
-
-	qconf->tx_mbufs[port].len = len;
-	return 0;
-}
-
 static void
 l2fwd_simple_forward(struct rte_mbuf *m, unsigned portid)
 {
 	struct ether_hdr *eth;
 	void *tmp;
+	int sent;
 	unsigned dst_port;
+	struct rte_eth_dev_tx_buffer *buffer;
 
 	dst_port = l2fwd_dst_ports[portid];
 	eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
@@ -255,7 +207,10 @@ l2fwd_simple_forward(struct rte_mbuf *m, unsigned portid)
 	/* src addr */
 	ether_addr_copy(&l2fwd_ports_eth_addr[dst_port], &eth->s_addr);
 
-	l2fwd_send_packet(m, (uint8_t) dst_port);
+	buffer = tx_buffer[dst_port];
+	sent = rte_eth_tx_buffer(dst_port, 0, buffer, m);
+	if (sent)
+		port_statistics[dst_port].tx += sent;
 }
 
 /* main processing loop */
@@ -264,12 +219,14 @@ l2fwd_main_loop(void)
 {
 	struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
 	struct rte_mbuf *m;
+	int sent;
 	unsigned lcore_id;
 	uint64_t prev_tsc, diff_tsc, cur_tsc;
 	unsigned i, j, portid, nb_rx;
 	struct lcore_queue_conf *qconf;
 	const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1)
 		/ US_PER_S * BURST_TX_DRAIN_US;
+	struct rte_eth_dev_tx_buffer *buffer;
 
 	prev_tsc = 0;
 
@@ -312,13 +269,15 @@ l2fwd_main_loop(void)
 		diff_tsc = cur_tsc - prev_tsc;
 		if (unlikely(diff_tsc > drain_tsc)) {
 
-			for (portid = 0; portid < RTE_MAX_ETHPORTS; portid++) {
-				if (qconf->tx_mbufs[portid].len == 0)
-					continue;
-				l2fwd_send_burst(&lcore_queue_conf[lcore_id],
-						 qconf->tx_mbufs[portid].len,
-						 (uint8_t) portid);
-				qconf->tx_mbufs[portid].len = 0;
+			for (i = 0; i < qconf->n_rx_port; i++) {
+
+				portid = l2fwd_dst_ports[qconf->rx_port_list[i]];
+				buffer = tx_buffer[portid];
+
+				sent = rte_eth_tx_buffer_flush(portid, 0, buffer);
+				if (sent)
+					port_statistics[portid].tx += sent;
+
 			}
 
 			prev_tsc = cur_tsc;
@@ -713,6 +672,23 @@ main(int argc, char **argv)
 				"rte_eth_tx_queue_setup:err=%d, port=%u\n",
 				ret, (unsigned) portid);
 
+		/* Initialize TX buffers */
+		tx_buffer[portid] = rte_zmalloc_socket("tx_buffer",
+				RTE_ETH_TX_BUFFER_SIZE(MAX_PKT_BURST), 0,
+				rte_eth_dev_socket_id(portid));
+		if (tx_buffer[portid] == NULL)
+			rte_exit(EXIT_FAILURE, "Cannot allocate buffer for tx on port %u\n",
+					(unsigned) portid);
+
+		rte_eth_tx_buffer_init(tx_buffer[portid], MAX_PKT_BURST);
+
+		ret = rte_eth_tx_buffer_set_err_callback(tx_buffer[portid],
+				rte_eth_tx_buffer_count_callback,
+				&port_statistics[portid].dropped);
+		if (ret < 0)
+				rte_exit(EXIT_FAILURE, "Cannot set error callback for "
+						"tx buffer on port %u\n", (unsigned) portid);
+
 		/* Start device */
 		ret = rte_eth_dev_start(portid);
 		if (ret < 0)
diff --git a/examples/l2fwd/main.c b/examples/l2fwd/main.c
index f35d8a1..e175681 100644
--- a/examples/l2fwd/main.c
+++ b/examples/l2fwd/main.c
@@ -1,7 +1,7 @@
 /*-
  *   BSD LICENSE
  *
- *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
  *   All rights reserved.
  *
  *   Redistribution and use in source and binary forms, with or without
@@ -49,6 +49,7 @@
 
 #include <rte_common.h>
 #include <rte_log.h>
+#include <rte_malloc.h>
 #include <rte_memory.h>
 #include <rte_memcpy.h>
 #include <rte_memzone.h>
@@ -99,21 +100,16 @@ static uint32_t l2fwd_dst_ports[RTE_MAX_ETHPORTS];
 
 static unsigned int l2fwd_rx_queue_per_lcore = 1;
 
-struct mbuf_table {
-	unsigned len;
-	struct rte_mbuf *m_table[MAX_PKT_BURST];
-};
-
 #define MAX_RX_QUEUE_PER_LCORE 16
 #define MAX_TX_QUEUE_PER_PORT 16
 struct lcore_queue_conf {
 	unsigned n_rx_port;
 	unsigned rx_port_list[MAX_RX_QUEUE_PER_LCORE];
-	struct mbuf_table tx_mbufs[RTE_MAX_ETHPORTS];
-
 } __rte_cache_aligned;
 struct lcore_queue_conf lcore_queue_conf[RTE_MAX_LCORE];
 
+static struct rte_eth_dev_tx_buffer *tx_buffer[RTE_MAX_ETHPORTS];
+
 static const struct rte_eth_conf port_conf = {
 	.rxmode = {
 		.split_hdr_size = 0,
@@ -189,58 +185,14 @@ print_stats(void)
 	printf("\n====================================================\n");
 }
 
-/* Send the burst of packets on an output interface */
-static int
-l2fwd_send_burst(struct lcore_queue_conf *qconf, unsigned n, uint8_t port)
-{
-	struct rte_mbuf **m_table;
-	unsigned ret;
-	unsigned queueid =0;
-
-	m_table = (struct rte_mbuf **)qconf->tx_mbufs[port].m_table;
-
-	ret = rte_eth_tx_burst(port, (uint16_t) queueid, m_table, (uint16_t) n);
-	port_statistics[port].tx += ret;
-	if (unlikely(ret < n)) {
-		port_statistics[port].dropped += (n - ret);
-		do {
-			rte_pktmbuf_free(m_table[ret]);
-		} while (++ret < n);
-	}
-
-	return 0;
-}
-
-/* Enqueue packets for TX and prepare them to be sent */
-static int
-l2fwd_send_packet(struct rte_mbuf *m, uint8_t port)
-{
-	unsigned lcore_id, len;
-	struct lcore_queue_conf *qconf;
-
-	lcore_id = rte_lcore_id();
-
-	qconf = &lcore_queue_conf[lcore_id];
-	len = qconf->tx_mbufs[port].len;
-	qconf->tx_mbufs[port].m_table[len] = m;
-	len++;
-
-	/* enough pkts to be sent */
-	if (unlikely(len == MAX_PKT_BURST)) {
-		l2fwd_send_burst(qconf, MAX_PKT_BURST, port);
-		len = 0;
-	}
-
-	qconf->tx_mbufs[port].len = len;
-	return 0;
-}
-
 static void
 l2fwd_simple_forward(struct rte_mbuf *m, unsigned portid)
 {
 	struct ether_hdr *eth;
 	void *tmp;
 	unsigned dst_port;
+	int sent;
+	struct rte_eth_dev_tx_buffer *buffer;
 
 	dst_port = l2fwd_dst_ports[portid];
 	eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
@@ -252,7 +204,10 @@ l2fwd_simple_forward(struct rte_mbuf *m, unsigned portid)
 	/* src addr */
 	ether_addr_copy(&l2fwd_ports_eth_addr[dst_port], &eth->s_addr);
 
-	l2fwd_send_packet(m, (uint8_t) dst_port);
+	buffer = tx_buffer[dst_port];
+	sent = rte_eth_tx_buffer(dst_port, 0, buffer, m);
+	if (sent)
+		port_statistics[dst_port].tx += sent;
 }
 
 /* main processing loop */
@@ -261,11 +216,14 @@ l2fwd_main_loop(void)
 {
 	struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
 	struct rte_mbuf *m;
+	int sent;
 	unsigned lcore_id;
 	uint64_t prev_tsc, diff_tsc, cur_tsc, timer_tsc;
 	unsigned i, j, portid, nb_rx;
 	struct lcore_queue_conf *qconf;
-	const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) / US_PER_S * BURST_TX_DRAIN_US;
+	const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) / US_PER_S *
+			BURST_TX_DRAIN_US;
+	struct rte_eth_dev_tx_buffer *buffer;
 
 	prev_tsc = 0;
 	timer_tsc = 0;
@@ -285,6 +243,7 @@ l2fwd_main_loop(void)
 		portid = qconf->rx_port_list[i];
 		RTE_LOG(INFO, L2FWD, " -- lcoreid=%u portid=%u\n", lcore_id,
 			portid);
+
 	}
 
 	while (!force_quit) {
@@ -297,13 +256,15 @@ l2fwd_main_loop(void)
 		diff_tsc = cur_tsc - prev_tsc;
 		if (unlikely(diff_tsc > drain_tsc)) {
 
-			for (portid = 0; portid < RTE_MAX_ETHPORTS; portid++) {
-				if (qconf->tx_mbufs[portid].len == 0)
-					continue;
-				l2fwd_send_burst(&lcore_queue_conf[lcore_id],
-						 qconf->tx_mbufs[portid].len,
-						 (uint8_t) portid);
-				qconf->tx_mbufs[portid].len = 0;
+			for (i = 0; i < qconf->n_rx_port; i++) {
+
+				portid = l2fwd_dst_ports[qconf->rx_port_list[i]];
+				buffer = tx_buffer[portid];
+
+				sent = rte_eth_tx_buffer_flush(portid, 0, buffer);
+				if (sent)
+					port_statistics[portid].tx += sent;
+
 			}
 
 			/* if timer is enabled */
@@ -688,6 +649,23 @@ main(int argc, char **argv)
 			rte_exit(EXIT_FAILURE, "rte_eth_tx_queue_setup:err=%d, port=%u\n",
 				ret, (unsigned) portid);
 
+		/* Initialize TX buffers */
+		tx_buffer[portid] = rte_zmalloc_socket("tx_buffer",
+				RTE_ETH_TX_BUFFER_SIZE(MAX_PKT_BURST), 0,
+				rte_eth_dev_socket_id(portid));
+		if (tx_buffer[portid] == NULL)
+			rte_exit(EXIT_FAILURE, "Cannot allocate buffer for tx on port %u\n",
+					(unsigned) portid);
+
+		rte_eth_tx_buffer_init(tx_buffer[portid], MAX_PKT_BURST);
+
+		ret = rte_eth_tx_buffer_set_err_callback(tx_buffer[portid],
+				rte_eth_tx_buffer_count_callback,
+				&port_statistics[portid].dropped);
+		if (ret < 0)
+				rte_exit(EXIT_FAILURE, "Cannot set error callback for "
+						"tx buffer on port %u\n", (unsigned) portid);
+
 		/* Start device */
 		ret = rte_eth_dev_start(portid);
 		if (ret < 0)
diff --git a/examples/l3fwd-acl/main.c b/examples/l3fwd-acl/main.c
index f676d14..3a895b7 100644
--- a/examples/l3fwd-acl/main.c
+++ b/examples/l3fwd-acl/main.c
@@ -1,7 +1,7 @@
 /*-
  *   BSD LICENSE
  *
- *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
  *   All rights reserved.
  *
  *   Redistribution and use in source and binary forms, with or without
@@ -119,11 +119,6 @@ static uint32_t enabled_port_mask;
 static int promiscuous_on; /**< Ports set in promiscuous mode off by default. */
 static int numa_on = 1; /**< NUMA is enabled by default. */
 
-struct mbuf_table {
-	uint16_t len;
-	struct rte_mbuf *m_table[MAX_PKT_BURST];
-};
-
 struct lcore_rx_queue {
 	uint8_t port_id;
 	uint8_t queue_id;
@@ -187,7 +182,7 @@ static struct rte_mempool *pktmbuf_pool[NB_SOCKETS];
 static inline int
 is_valid_ipv4_pkt(struct ipv4_hdr *pkt, uint32_t link_len);
 #endif
-static inline int
+static inline void
 send_single_packet(struct rte_mbuf *m, uint8_t port);
 
 #define MAX_ACL_RULE_NUM	100000
@@ -1291,56 +1286,26 @@ app_acl_init(void)
 struct lcore_conf {
 	uint16_t n_rx_queue;
 	struct lcore_rx_queue rx_queue_list[MAX_RX_QUEUE_PER_LCORE];
+	uint16_t n_tx_port;
+	uint16_t tx_port_id[RTE_MAX_ETHPORTS];
 	uint16_t tx_queue_id[RTE_MAX_ETHPORTS];
-	struct mbuf_table tx_mbufs[RTE_MAX_ETHPORTS];
+	struct rte_eth_dev_tx_buffer *tx_buffer[RTE_MAX_ETHPORTS];
 } __rte_cache_aligned;
 
 static struct lcore_conf lcore_conf[RTE_MAX_LCORE];
 
-/* Send burst of packets on an output interface */
-static inline int
-send_burst(struct lcore_conf *qconf, uint16_t n, uint8_t port)
-{
-	struct rte_mbuf **m_table;
-	int ret;
-	uint16_t queueid;
-
-	queueid = qconf->tx_queue_id[port];
-	m_table = (struct rte_mbuf **)qconf->tx_mbufs[port].m_table;
-
-	ret = rte_eth_tx_burst(port, queueid, m_table, n);
-	if (unlikely(ret < n)) {
-		do {
-			rte_pktmbuf_free(m_table[ret]);
-		} while (++ret < n);
-	}
-
-	return 0;
-}
-
 /* Enqueue a single packet, and send burst if queue is filled */
-static inline int
+static inline void
 send_single_packet(struct rte_mbuf *m, uint8_t port)
 {
 	uint32_t lcore_id;
-	uint16_t len;
 	struct lcore_conf *qconf;
 
 	lcore_id = rte_lcore_id();
 
 	qconf = &lcore_conf[lcore_id];
-	len = qconf->tx_mbufs[port].len;
-	qconf->tx_mbufs[port].m_table[len] = m;
-	len++;
-
-	/* enough pkts to be sent */
-	if (unlikely(len == MAX_PKT_BURST)) {
-		send_burst(qconf, MAX_PKT_BURST, port);
-		len = 0;
-	}
-
-	qconf->tx_mbufs[port].len = len;
-	return 0;
+	rte_eth_tx_buffer(port, qconf->tx_queue_id[port],
+			qconf->tx_buffer[port], m);
 }
 
 #ifdef DO_RFC_1812_CHECKS
@@ -1428,20 +1393,12 @@ main_loop(__attribute__((unused)) void *dummy)
 		 */
 		diff_tsc = cur_tsc - prev_tsc;
 		if (unlikely(diff_tsc > drain_tsc)) {
-
-			/*
-			 * This could be optimized (use queueid instead of
-			 * portid), but it is not called so often
-			 */
-			for (portid = 0; portid < RTE_MAX_ETHPORTS; portid++) {
-				if (qconf->tx_mbufs[portid].len == 0)
-					continue;
-				send_burst(&lcore_conf[lcore_id],
-					qconf->tx_mbufs[portid].len,
-					portid);
-				qconf->tx_mbufs[portid].len = 0;
+			for (i = 0; i < qconf->n_tx_port; ++i) {
+				portid = qconf->tx_port_id[i];
+				rte_eth_tx_buffer_flush(portid,
+						qconf->tx_queue_id[portid],
+						qconf->tx_buffer[portid]);
 			}
-
 			prev_tsc = cur_tsc;
 		}
 
@@ -1936,6 +1893,7 @@ main(int argc, char **argv)
 	unsigned lcore_id;
 	uint32_t n_tx_queue, nb_lcores;
 	uint8_t portid, nb_rx_queue, queue, socketid;
+	uint8_t nb_tx_port;
 
 	/* init EAL */
 	ret = rte_eal_init(argc, argv);
@@ -1968,6 +1926,7 @@ main(int argc, char **argv)
 		rte_exit(EXIT_FAILURE, "app_acl_init failed\n");
 
 	nb_lcores = rte_lcore_count();
+	nb_tx_port = 0;
 
 	/* initialize all ports */
 	for (portid = 0; portid < nb_ports; portid++) {
@@ -2003,6 +1962,22 @@ main(int argc, char **argv)
 		if (ret < 0)
 			rte_exit(EXIT_FAILURE, "init_mem failed\n");
 
+		for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
+			if (rte_lcore_is_enabled(lcore_id) == 0)
+				continue;
+
+			/* Initialize TX buffers */
+			qconf = &lcore_conf[lcore_id];
+			qconf->tx_buffer[portid] = rte_zmalloc_socket("tx_buffer",
+					RTE_ETH_TX_BUFFER_SIZE(MAX_PKT_BURST), 0,
+					rte_eth_dev_socket_id(portid));
+			if (qconf->tx_buffer[portid] == NULL)
+				rte_exit(EXIT_FAILURE, "Can't allocate tx buffer for port %u\n",
+						(unsigned) portid);
+
+			rte_eth_tx_buffer_init(qconf->tx_buffer[portid], MAX_PKT_BURST);
+		}
+
 		/* init one TX queue per couple (lcore,port) */
 		queueid = 0;
 		for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
@@ -2032,8 +2007,13 @@ main(int argc, char **argv)
 			qconf = &lcore_conf[lcore_id];
 			qconf->tx_queue_id[portid] = queueid;
 			queueid++;
+
+			qconf->n_tx_port = nb_tx_port;
+			qconf->tx_port_id[qconf->n_tx_port] = portid;
 		}
 		printf("\n");
+
+		nb_tx_port++;
 	}
 
 	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
diff --git a/examples/l3fwd-power/main.c b/examples/l3fwd-power/main.c
index f8a2f1b..d4bb7a3 100644
--- a/examples/l3fwd-power/main.c
+++ b/examples/l3fwd-power/main.c
@@ -1,7 +1,7 @@
 /*-
  *   BSD LICENSE
  *
- *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
  *   All rights reserved.
  *
  *   Redistribution and use in source and binary forms, with or without
@@ -47,6 +47,7 @@
 #include <rte_common.h>
 #include <rte_byteorder.h>
 #include <rte_log.h>
+#include <rte_malloc.h>
 #include <rte_memory.h>
 #include <rte_memcpy.h>
 #include <rte_memzone.h>
@@ -173,11 +174,6 @@ enum freq_scale_hint_t
 	FREQ_HIGHEST  =       2
 };
 
-struct mbuf_table {
-	uint16_t len;
-	struct rte_mbuf *m_table[MAX_PKT_BURST];
-};
-
 struct lcore_rx_queue {
 	uint8_t port_id;
 	uint8_t queue_id;
@@ -347,8 +343,10 @@ static lookup_struct_t *ipv4_l3fwd_lookup_struct[NB_SOCKETS];
 struct lcore_conf {
 	uint16_t n_rx_queue;
 	struct lcore_rx_queue rx_queue_list[MAX_RX_QUEUE_PER_LCORE];
+	uint16_t n_tx_port;
+	uint16_t tx_port_id[RTE_MAX_ETHPORTS];
 	uint16_t tx_queue_id[RTE_MAX_ETHPORTS];
-	struct mbuf_table tx_mbufs[RTE_MAX_ETHPORTS];
+	struct rte_eth_dev_tx_buffer *tx_buffer[RTE_MAX_ETHPORTS];
 	lookup_struct_t * ipv4_lookup_struct;
 	lookup_struct_t * ipv6_lookup_struct;
 } __rte_cache_aligned;
@@ -442,49 +440,19 @@ power_timer_cb(__attribute__((unused)) struct rte_timer *tim,
 	stats[lcore_id].sleep_time = 0;
 }
 
-/* Send burst of packets on an output interface */
-static inline int
-send_burst(struct lcore_conf *qconf, uint16_t n, uint8_t port)
-{
-	struct rte_mbuf **m_table;
-	int ret;
-	uint16_t queueid;
-
-	queueid = qconf->tx_queue_id[port];
-	m_table = (struct rte_mbuf **)qconf->tx_mbufs[port].m_table;
-
-	ret = rte_eth_tx_burst(port, queueid, m_table, n);
-	if (unlikely(ret < n)) {
-		do {
-			rte_pktmbuf_free(m_table[ret]);
-		} while (++ret < n);
-	}
-
-	return 0;
-}
-
 /* Enqueue a single packet, and send burst if queue is filled */
 static inline int
 send_single_packet(struct rte_mbuf *m, uint8_t port)
 {
 	uint32_t lcore_id;
-	uint16_t len;
 	struct lcore_conf *qconf;
 
 	lcore_id = rte_lcore_id();
-
 	qconf = &lcore_conf[lcore_id];
-	len = qconf->tx_mbufs[port].len;
-	qconf->tx_mbufs[port].m_table[len] = m;
-	len++;
-
-	/* enough pkts to be sent */
-	if (unlikely(len == MAX_PKT_BURST)) {
-		send_burst(qconf, MAX_PKT_BURST, port);
-		len = 0;
-	}
 
-	qconf->tx_mbufs[port].len = len;
+	rte_eth_tx_buffer(port, qconf->tx_queue_id[port],
+			qconf->tx_buffer[port], m);
+
 	return 0;
 }
 
@@ -905,20 +873,12 @@ main_loop(__attribute__((unused)) void *dummy)
 		 */
 		diff_tsc = cur_tsc - prev_tsc;
 		if (unlikely(diff_tsc > drain_tsc)) {
-
-			/*
-			 * This could be optimized (use queueid instead of
-			 * portid), but it is not called so often
-			 */
-			for (portid = 0; portid < RTE_MAX_ETHPORTS; portid++) {
-				if (qconf->tx_mbufs[portid].len == 0)
-					continue;
-				send_burst(&lcore_conf[lcore_id],
-					qconf->tx_mbufs[portid].len,
-					portid);
-				qconf->tx_mbufs[portid].len = 0;
+			for (i = 0; i < qconf->n_tx_port; ++i) {
+				portid = qconf->tx_port_id[i];
+				rte_eth_tx_buffer_flush(portid,
+						qconf->tx_queue_id[portid],
+						qconf->tx_buffer[portid]);
 			}
-
 			prev_tsc = cur_tsc;
 		}
 
@@ -1585,6 +1545,7 @@ main(int argc, char **argv)
 	uint32_t n_tx_queue, nb_lcores;
 	uint32_t dev_rxq_num, dev_txq_num;
 	uint8_t portid, nb_rx_queue, queue, socketid;
+	uint8_t nb_tx_port;
 
 	/* catch SIGINT and restore cpufreq governor to ondemand */
 	signal(SIGINT, signal_exit_now);
@@ -1620,6 +1581,7 @@ main(int argc, char **argv)
 		rte_exit(EXIT_FAILURE, "check_port_config failed\n");
 
 	nb_lcores = rte_lcore_count();
+	nb_tx_port = 0;
 
 	/* initialize all ports */
 	for (portid = 0; portid < nb_ports; portid++) {
@@ -1663,6 +1625,22 @@ main(int argc, char **argv)
 		if (ret < 0)
 			rte_exit(EXIT_FAILURE, "init_mem failed\n");
 
+		for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
+			if (rte_lcore_is_enabled(lcore_id) == 0)
+				continue;
+
+			/* Initialize TX buffers */
+			qconf = &lcore_conf[lcore_id];
+			qconf->tx_buffer[portid] = rte_zmalloc_socket("tx_buffer",
+				RTE_ETH_TX_BUFFER_SIZE(MAX_PKT_BURST), 0,
+				rte_eth_dev_socket_id(portid));
+			if (qconf->tx_buffer[portid] == NULL)
+				rte_exit(EXIT_FAILURE, "Can't allocate tx buffer for port %u\n",
+						(unsigned) portid);
+
+			rte_eth_tx_buffer_init(qconf->tx_buffer[portid], MAX_PKT_BURST);
+		}
+
 		/* init one TX queue per couple (lcore,port) */
 		queueid = 0;
 		for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
@@ -1695,8 +1673,13 @@ main(int argc, char **argv)
 			qconf = &lcore_conf[lcore_id];
 			qconf->tx_queue_id[portid] = queueid;
 			queueid++;
+
+			qconf->n_tx_port = nb_tx_port;
+			qconf->tx_port_id[qconf->n_tx_port] = portid;
 		}
 		printf("\n");
+
+		nb_tx_port++;
 	}
 
 	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
diff --git a/examples/link_status_interrupt/main.c b/examples/link_status_interrupt/main.c
index c57a08a..cbc29bc 100644
--- a/examples/link_status_interrupt/main.c
+++ b/examples/link_status_interrupt/main.c
@@ -1,7 +1,7 @@
 /*-
  *   BSD LICENSE
  *
- *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
  *   All rights reserved.
  *
  *   Redistribution and use in source and binary forms, with or without
@@ -48,6 +48,7 @@
 
 #include <rte_common.h>
 #include <rte_log.h>
+#include <rte_malloc.h>
 #include <rte_memory.h>
 #include <rte_memcpy.h>
 #include <rte_memzone.h>
@@ -97,10 +98,6 @@ static unsigned int lsi_rx_queue_per_lcore = 1;
 static unsigned lsi_dst_ports[RTE_MAX_ETHPORTS] = {0};
 
 #define MAX_PKT_BURST 32
-struct mbuf_table {
-	unsigned len;
-	struct rte_mbuf *m_table[MAX_PKT_BURST];
-};
 
 #define MAX_RX_QUEUE_PER_LCORE 16
 #define MAX_TX_QUEUE_PER_PORT 16
@@ -108,11 +105,11 @@ struct lcore_queue_conf {
 	unsigned n_rx_port;
 	unsigned rx_port_list[MAX_RX_QUEUE_PER_LCORE];
 	unsigned tx_queue_id;
-	struct mbuf_table tx_mbufs[RTE_MAX_ETHPORTS];
-
 } __rte_cache_aligned;
 struct lcore_queue_conf lcore_queue_conf[RTE_MAX_LCORE];
 
+struct rte_eth_dev_tx_buffer *tx_buffer[RTE_MAX_ETHPORTS];
+
 static const struct rte_eth_conf port_conf = {
 	.rxmode = {
 		.split_hdr_size = 0,
@@ -202,59 +199,14 @@ print_stats(void)
 	printf("\n====================================================\n");
 }
 
-/* Send the packet on an output interface */
-static int
-lsi_send_burst(struct lcore_queue_conf *qconf, unsigned n, uint8_t port)
-{
-	struct rte_mbuf **m_table;
-	unsigned ret;
-	unsigned queueid;
-
-	queueid = (uint16_t) qconf->tx_queue_id;
-	m_table = (struct rte_mbuf **)qconf->tx_mbufs[port].m_table;
-
-	ret = rte_eth_tx_burst(port, (uint16_t) queueid, m_table, (uint16_t) n);
-	port_statistics[port].tx += ret;
-	if (unlikely(ret < n)) {
-		port_statistics[port].dropped += (n - ret);
-		do {
-			rte_pktmbuf_free(m_table[ret]);
-		} while (++ret < n);
-	}
-
-	return 0;
-}
-
-/* Send the packet on an output interface */
-static int
-lsi_send_packet(struct rte_mbuf *m, uint8_t port)
-{
-	unsigned lcore_id, len;
-	struct lcore_queue_conf *qconf;
-
-	lcore_id = rte_lcore_id();
-
-	qconf = &lcore_queue_conf[lcore_id];
-	len = qconf->tx_mbufs[port].len;
-	qconf->tx_mbufs[port].m_table[len] = m;
-	len++;
-
-	/* enough pkts to be sent */
-	if (unlikely(len == MAX_PKT_BURST)) {
-		lsi_send_burst(qconf, MAX_PKT_BURST, port);
-		len = 0;
-	}
-
-	qconf->tx_mbufs[port].len = len;
-	return 0;
-}
-
 static void
 lsi_simple_forward(struct rte_mbuf *m, unsigned portid)
 {
 	struct ether_hdr *eth;
 	void *tmp;
 	unsigned dst_port = lsi_dst_ports[portid];
+	int sent;
+	struct rte_eth_dev_tx_buffer *buffer;
 
 	eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
 
@@ -265,7 +217,10 @@ lsi_simple_forward(struct rte_mbuf *m, unsigned portid)
 	/* src addr */
 	ether_addr_copy(&lsi_ports_eth_addr[dst_port], &eth->s_addr);
 
-	lsi_send_packet(m, (uint8_t) dst_port);
+	buffer = tx_buffer[dst_port];
+	sent = rte_eth_tx_buffer(dst_port, 0, buffer, m);
+	if (sent)
+		port_statistics[dst_port].tx += sent;
 }
 
 /* main processing loop */
@@ -275,10 +230,13 @@ lsi_main_loop(void)
 	struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
 	struct rte_mbuf *m;
 	unsigned lcore_id;
+	unsigned sent;
 	uint64_t prev_tsc, diff_tsc, cur_tsc, timer_tsc;
 	unsigned i, j, portid, nb_rx;
 	struct lcore_queue_conf *qconf;
-	const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) / US_PER_S * BURST_TX_DRAIN_US;
+	const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) / US_PER_S *
+			BURST_TX_DRAIN_US;
+	struct rte_eth_dev_tx_buffer *buffer;
 
 	prev_tsc = 0;
 	timer_tsc = 0;
@@ -310,15 +268,15 @@ lsi_main_loop(void)
 		diff_tsc = cur_tsc - prev_tsc;
 		if (unlikely(diff_tsc > drain_tsc)) {
 
-			/* this could be optimized (use queueid instead of
-			 * portid), but it is not called so often */
-			for (portid = 0; portid < RTE_MAX_ETHPORTS; portid++) {
-				if (qconf->tx_mbufs[portid].len == 0)
-					continue;
-				lsi_send_burst(&lcore_queue_conf[lcore_id],
-						 qconf->tx_mbufs[portid].len,
-						 (uint8_t) portid);
-				qconf->tx_mbufs[portid].len = 0;
+			for (i = 0; i < qconf->n_rx_port; i++) {
+
+				portid = lsi_dst_ports[qconf->rx_port_list[i]];
+				buffer = tx_buffer[portid];
+
+				sent = rte_eth_tx_buffer_flush(portid, 0, buffer);
+				if (sent)
+					port_statistics[portid].tx += sent;
+
 			}
 
 			/* if timer is enabled */
@@ -722,6 +680,23 @@ main(int argc, char **argv)
 			rte_exit(EXIT_FAILURE, "rte_eth_tx_queue_setup: err=%d,port=%u\n",
 				  ret, (unsigned) portid);
 
+		/* Initialize TX buffers */
+		tx_buffer[portid] = rte_zmalloc_socket("tx_buffer",
+				RTE_ETH_TX_BUFFER_SIZE(MAX_PKT_BURST), 0,
+				rte_eth_dev_socket_id(portid));
+		if (tx_buffer[portid] == NULL)
+			rte_exit(EXIT_FAILURE, "Cannot allocate buffer for tx on port %u\n",
+					(unsigned) portid);
+
+		rte_eth_tx_buffer_init(tx_buffer[portid], MAX_PKT_BURST);
+
+		ret = rte_eth_tx_buffer_set_err_callback(tx_buffer[portid],
+				rte_eth_tx_buffer_count_callback,
+				&port_statistics[portid].dropped);
+		if (ret < 0)
+			rte_exit(EXIT_FAILURE, "Cannot set error callback for "
+					"tx buffer on port %u\n", (unsigned) portid);
+
 		/* Start device */
 		ret = rte_eth_dev_start(portid);
 		if (ret < 0)
@@ -729,6 +704,8 @@ main(int argc, char **argv)
 				  ret, (unsigned) portid);
 		printf("done:\n");
 
+		rte_eth_promiscuous_enable(portid);
+
 		printf("Port %u, MAC address: %02X:%02X:%02X:%02X:%02X:%02X\n\n",
 				(unsigned) portid,
 				lsi_ports_eth_addr[portid].addr_bytes[0],
diff --git a/examples/multi_process/client_server_mp/mp_client/client.c b/examples/multi_process/client_server_mp/mp_client/client.c
index bf049a4..d4f9ca3 100644
--- a/examples/multi_process/client_server_mp/mp_client/client.c
+++ b/examples/multi_process/client_server_mp/mp_client/client.c
@@ -1,7 +1,7 @@
 /*-
  *   BSD LICENSE
  *
- *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
  *   All rights reserved.
  *
  *   Redistribution and use in source and binary forms, with or without
@@ -42,6 +42,7 @@
 #include <string.h>
 
 #include <rte_common.h>
+#include <rte_malloc.h>
 #include <rte_memory.h>
 #include <rte_memzone.h>
 #include <rte_eal.h>
@@ -72,17 +73,13 @@
  * queue to write to. */
 static uint8_t client_id = 0;
 
-struct mbuf_queue {
 #define MBQ_CAPACITY 32
-	struct rte_mbuf *bufs[MBQ_CAPACITY];
-	uint16_t top;
-};
 
 /* maps input ports to output ports for packets */
 static uint8_t output_ports[RTE_MAX_ETHPORTS];
 
 /* buffers up a set of packet that are ready to send */
-static struct mbuf_queue output_bufs[RTE_MAX_ETHPORTS];
+struct rte_eth_dev_tx_buffer *tx_buffer[RTE_MAX_ETHPORTS];
 
 /* shared data from server. We update statistics here */
 static volatile struct tx_stats *tx_stats;
@@ -149,11 +146,51 @@ parse_app_args(int argc, char *argv[])
 }
 
 /*
+ * Tx buffer error callback
+ */
+static void
+flush_tx_error_callback(struct rte_mbuf **unsent, uint16_t count,
+		void *userdata) {
+	int i;
+	uint8_t port_id = (uintptr_t)userdata;
+
+	tx_stats->tx_drop[port_id] += count;
+
+	/* free the mbufs which failed from transmit */
+	for (i = 0; i < count; i++)
+		rte_pktmbuf_free(unsent[i]);
+
+}
+
+static void
+configure_tx_buffer(uint8_t port_id, uint16_t size)
+{
+	int ret;
+
+	/* Initialize TX buffers */
+	tx_buffer[port_id] = rte_zmalloc_socket("tx_buffer",
+			RTE_ETH_TX_BUFFER_SIZE(size), 0,
+			rte_eth_dev_socket_id(port_id));
+	if (tx_buffer[port_id] == NULL)
+		rte_exit(EXIT_FAILURE, "Cannot allocate buffer for tx on port %u\n",
+				(unsigned) port_id);
+
+	rte_eth_tx_buffer_init(tx_buffer[port_id], size);
+
+	ret = rte_eth_tx_buffer_set_err_callback(tx_buffer[port_id],
+			flush_tx_error_callback, (void *)(intptr_t)port_id);
+	if (ret < 0)
+			rte_exit(EXIT_FAILURE, "Cannot set error callback for "
+					"tx buffer on port %u\n", (unsigned) port_id);
+}
+
+/*
  * set up output ports so that all traffic on port gets sent out
  * its paired port. Index using actual port numbers since that is
  * what comes in the mbuf structure.
  */
-static void configure_output_ports(const struct port_info *ports)
+static void
+configure_output_ports(const struct port_info *ports)
 {
 	int i;
 	if (ports->num_ports > RTE_MAX_ETHPORTS)
@@ -164,41 +201,11 @@ static void configure_output_ports(const struct port_info *ports)
 		uint8_t p2 = ports->id[i+1];
 		output_ports[p1] = p2;
 		output_ports[p2] = p1;
-	}
-}
 
+		configure_tx_buffer(p1, MBQ_CAPACITY);
+		configure_tx_buffer(p2, MBQ_CAPACITY);
 
-static inline void
-send_packets(uint8_t port)
-{
-	uint16_t i, sent;
-	struct mbuf_queue *mbq = &output_bufs[port];
-
-	if (unlikely(mbq->top == 0))
-		return;
-
-	sent = rte_eth_tx_burst(port, client_id, mbq->bufs, mbq->top);
-	if (unlikely(sent < mbq->top)){
-		for (i = sent; i < mbq->top; i++)
-			rte_pktmbuf_free(mbq->bufs[i]);
-		tx_stats->tx_drop[port] += (mbq->top - sent);
 	}
-	tx_stats->tx[port] += sent;
-	mbq->top = 0;
-}
-
-/*
- * Enqueue a packet to be sent on a particular port, but
- * don't send it yet. Only when the buffer is full.
- */
-static inline void
-enqueue_packet(struct rte_mbuf *buf, uint8_t port)
-{
-	struct mbuf_queue *mbq = &output_bufs[port];
-	mbq->bufs[mbq->top++] = buf;
-
-	if (mbq->top == MBQ_CAPACITY)
-		send_packets(port);
 }
 
 /*
@@ -209,10 +216,15 @@ enqueue_packet(struct rte_mbuf *buf, uint8_t port)
 static void
 handle_packet(struct rte_mbuf *buf)
 {
+	int sent;
 	const uint8_t in_port = buf->port;
 	const uint8_t out_port = output_ports[in_port];
+	struct rte_eth_dev_tx_buffer *buffer = tx_buffer[out_port];
+
+	sent = rte_eth_tx_buffer(out_port, client_id, buffer, buf);
+	if (sent)
+		tx_stats->tx[out_port] += sent;
 
-	enqueue_packet(buf, out_port);
 }
 
 /*
@@ -229,6 +241,7 @@ main(int argc, char *argv[])
 	int need_flush = 0; /* indicates whether we have unsent packets */
 	int retval;
 	void *pkts[PKT_READ_SIZE];
+	uint16_t sent;
 
 	if ((retval = rte_eal_init(argc, argv)) < 0)
 		return -1;
@@ -274,8 +287,12 @@ main(int argc, char *argv[])
 
 		if (unlikely(rx_pkts == 0)){
 			if (need_flush)
-				for (port = 0; port < ports->num_ports; port++)
-					send_packets(ports->id[port]);
+				for (port = 0; port < ports->num_ports; port++) {
+					sent = rte_eth_tx_buffer_flush(ports->id[port], client_id,
+							tx_buffer[port]);
+					if (unlikely(sent))
+						tx_stats->tx[port] += sent;
+				}
 			need_flush = 0;
 			continue;
 		}
diff --git a/examples/multi_process/l2fwd_fork/main.c b/examples/multi_process/l2fwd_fork/main.c
index f2d7eab..aebf531 100644
--- a/examples/multi_process/l2fwd_fork/main.c
+++ b/examples/multi_process/l2fwd_fork/main.c
@@ -1,7 +1,7 @@
 /*-
  *   BSD LICENSE
  *
- *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
  *   All rights reserved.
  *
  *   Redistribution and use in source and binary forms, with or without
@@ -127,11 +127,11 @@ struct mbuf_table {
 struct lcore_queue_conf {
 	unsigned n_rx_port;
 	unsigned rx_port_list[MAX_RX_QUEUE_PER_LCORE];
-	struct mbuf_table tx_mbufs[RTE_MAX_ETHPORTS];
-
 } __rte_cache_aligned;
 struct lcore_queue_conf lcore_queue_conf[RTE_MAX_LCORE];
 
+struct rte_eth_dev_tx_buffer *tx_buffer[RTE_MAX_ETHPORTS];
+
 struct lcore_resource_struct {
 	int enabled;	/* Only set in case this lcore involved into packet forwarding */
 	int flags; 	    /* Set only slave need to restart or recreate */
@@ -583,58 +583,14 @@ slave_exit_cb(unsigned slaveid, __attribute__((unused))int stat)
 	rte_spinlock_unlock(&res_lock);
 }
 
-/* Send the packet on an output interface */
-static int
-l2fwd_send_burst(struct lcore_queue_conf *qconf, unsigned n, uint8_t port)
-{
-	struct rte_mbuf **m_table;
-	unsigned ret;
-	unsigned queueid =0;
-
-	m_table = (struct rte_mbuf **)qconf->tx_mbufs[port].m_table;
-
-	ret = rte_eth_tx_burst(port, (uint16_t) queueid, m_table, (uint16_t) n);
-	port_statistics[port].tx += ret;
-	if (unlikely(ret < n)) {
-		port_statistics[port].dropped += (n - ret);
-		do {
-			rte_pktmbuf_free(m_table[ret]);
-		} while (++ret < n);
-	}
-
-	return 0;
-}
-
-/* Send the packet on an output interface */
-static int
-l2fwd_send_packet(struct rte_mbuf *m, uint8_t port)
-{
-	unsigned lcore_id, len;
-	struct lcore_queue_conf *qconf;
-
-	lcore_id = rte_lcore_id();
-
-	qconf = &lcore_queue_conf[lcore_id];
-	len = qconf->tx_mbufs[port].len;
-	qconf->tx_mbufs[port].m_table[len] = m;
-	len++;
-
-	/* enough pkts to be sent */
-	if (unlikely(len == MAX_PKT_BURST)) {
-		l2fwd_send_burst(qconf, MAX_PKT_BURST, port);
-		len = 0;
-	}
-
-	qconf->tx_mbufs[port].len = len;
-	return 0;
-}
-
 static void
 l2fwd_simple_forward(struct rte_mbuf *m, unsigned portid)
 {
 	struct ether_hdr *eth;
 	void *tmp;
 	unsigned dst_port;
+	int sent;
+	struct rte_eth_dev_tx_buffer *buffer;
 
 	dst_port = l2fwd_dst_ports[portid];
 	eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
@@ -646,7 +602,10 @@ l2fwd_simple_forward(struct rte_mbuf *m, unsigned portid)
 	/* src addr */
 	ether_addr_copy(&l2fwd_ports_eth_addr[dst_port], &eth->s_addr);
 
-	l2fwd_send_packet(m, (uint8_t) dst_port);
+	buffer = tx_buffer[dst_port];
+	sent = rte_eth_tx_buffer(dst_port, 0, buffer, m);
+	if (sent)
+		port_statistics[dst_port].tx += sent;
 }
 
 /* main processing loop */
@@ -655,11 +614,14 @@ l2fwd_main_loop(void)
 {
 	struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
 	struct rte_mbuf *m;
+	int sent;
 	unsigned lcore_id;
 	uint64_t prev_tsc, diff_tsc, cur_tsc;
 	unsigned i, j, portid, nb_rx;
 	struct lcore_queue_conf *qconf;
-	const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) / US_PER_S * BURST_TX_DRAIN_US;
+	const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) / US_PER_S *
+			BURST_TX_DRAIN_US;
+	struct rte_eth_dev_tx_buffer *buffer;
 
 	prev_tsc = 0;
 
@@ -699,13 +661,15 @@ l2fwd_main_loop(void)
 		diff_tsc = cur_tsc - prev_tsc;
 		if (unlikely(diff_tsc > drain_tsc)) {
 
-			for (portid = 0; portid < RTE_MAX_ETHPORTS; portid++) {
-				if (qconf->tx_mbufs[portid].len == 0)
-					continue;
-				l2fwd_send_burst(&lcore_queue_conf[lcore_id],
-						 qconf->tx_mbufs[portid].len,
-						 (uint8_t) portid);
-				qconf->tx_mbufs[portid].len = 0;
+			for (i = 0; i < qconf->n_rx_port; i++) {
+
+				portid = l2fwd_dst_ports[qconf->rx_port_list[i]];
+				buffer = tx_buffer[portid];
+
+				sent = rte_eth_tx_buffer_flush(portid, 0, buffer);
+				if (sent)
+					port_statistics[portid].tx += sent;
+
 			}
 		}
 
@@ -1144,6 +1108,23 @@ main(int argc, char **argv)
 			rte_exit(EXIT_FAILURE, "rte_eth_tx_queue_setup:err=%d, port=%u\n",
 				ret, (unsigned) portid);
 
+		/* Initialize TX buffers */
+		tx_buffer[portid] = rte_zmalloc_socket("tx_buffer",
+				RTE_ETH_TX_BUFFER_SIZE(MAX_PKT_BURST), 0,
+				rte_eth_dev_socket_id(portid));
+		if (tx_buffer[portid] == NULL)
+			rte_exit(EXIT_FAILURE, "Cannot allocate buffer for tx on port %u\n",
+					(unsigned) portid);
+
+		rte_eth_tx_buffer_init(tx_buffer[portid], MAX_PKT_BURST);
+
+		ret = rte_eth_tx_buffer_set_err_callback(tx_buffer[portid],
+				rte_eth_tx_buffer_count_callback,
+				&port_statistics[portid].dropped);
+		if (ret < 0)
+				rte_exit(EXIT_FAILURE, "Cannot set error callback for "
+						"tx buffer on port %u\n", (unsigned) portid);
+
 		/* Start device */
 		ret = rte_eth_dev_start(portid);
 		if (ret < 0)
diff --git a/examples/packet_ordering/main.c b/examples/packet_ordering/main.c
index 1d9a86f..15bb900 100644
--- a/examples/packet_ordering/main.c
+++ b/examples/packet_ordering/main.c
@@ -1,7 +1,7 @@
 /*-
  *   BSD LICENSE
  *
- *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
  *   All rights reserved.
  *
  *   Redistribution and use in source and binary forms, with or without
@@ -39,6 +39,7 @@
 #include <rte_errno.h>
 #include <rte_ethdev.h>
 #include <rte_lcore.h>
+#include <rte_malloc.h>
 #include <rte_mbuf.h>
 #include <rte_mempool.h>
 #include <rte_ring.h>
@@ -54,7 +55,7 @@
 
 #define RING_SIZE 16384
 
-/* uncommnet below line to enable debug logs */
+/* uncomment below line to enable debug logs */
 /* #define DEBUG */
 
 #ifdef DEBUG
@@ -86,11 +87,6 @@ struct send_thread_args {
 	struct rte_reorder_buffer *buffer;
 };
 
-struct output_buffer {
-	unsigned count;
-	struct rte_mbuf *mbufs[MAX_PKTS_BURST];
-};
-
 volatile struct app_stats {
 	struct {
 		uint64_t rx_pkts;
@@ -235,6 +231,68 @@ parse_args(int argc, char **argv)
 	return 0;
 }
 
+/*
+ * Tx buffer error callback
+ */
+static void
+flush_tx_error_callback(struct rte_mbuf **unsent, uint16_t count,
+		void *userdata __rte_unused) {
+
+	/* free the mbufs which failed from transmit */
+	app_stats.tx.ro_tx_failed_pkts += count;
+	LOG_DEBUG(REORDERAPP, "%s:Packet loss with tx_burst\n", __func__);
+	pktmbuf_free_bulk(unsent, count);
+
+}
+
+static inline int
+free_tx_buffers(struct rte_eth_dev_tx_buffer *tx_buffer[]) {
+	const uint8_t nb_ports = rte_eth_dev_count();
+	unsigned port_id;
+
+	/* initialize buffers for all ports */
+	for (port_id = 0; port_id < nb_ports; port_id++) {
+		/* skip ports that are not enabled */
+		if ((portmask & (1 << port_id)) == 0)
+			continue;
+
+		rte_free(tx_buffer[port_id]);
+	}
+	return 0;
+}
+
+static inline int
+configure_tx_buffers(struct rte_eth_dev_tx_buffer *tx_buffer[])
+{
+	const uint8_t nb_ports = rte_eth_dev_count();
+	unsigned port_id;
+	int ret;
+
+	/* initialize buffers for all ports */
+	for (port_id = 0; port_id < nb_ports; port_id++) {
+		/* skip ports that are not enabled */
+		if ((portmask & (1 << port_id)) == 0)
+			continue;
+
+		/* Initialize TX buffers */
+		tx_buffer[port_id] = rte_zmalloc_socket("tx_buffer",
+				RTE_ETH_TX_BUFFER_SIZE(MAX_PKTS_BURST), 0,
+				rte_eth_dev_socket_id(port_id));
+		if (tx_buffer[port_id] == NULL)
+			rte_exit(EXIT_FAILURE, "Cannot allocate buffer for tx on port %u\n",
+					(unsigned) port_id);
+
+		rte_eth_tx_buffer_init(tx_buffer[port_id], MAX_PKTS_BURST);
+
+		ret = rte_eth_tx_buffer_set_err_callback(tx_buffer[port_id],
+				flush_tx_error_callback, NULL);
+		if (ret < 0)
+			rte_exit(EXIT_FAILURE, "Cannot set error callback for "
+					"tx buffer on port %u\n", (unsigned) port_id);
+	}
+	return 0;
+}
+
 static inline int
 configure_eth_port(uint8_t port_id)
 {
@@ -438,22 +496,6 @@ worker_thread(void *args_ptr)
 	return 0;
 }
 
-static inline void
-flush_one_port(struct output_buffer *outbuf, uint8_t outp)
-{
-	unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs,
-			outbuf->count);
-	app_stats.tx.ro_tx_pkts += nb_tx;
-
-	if (unlikely(nb_tx < outbuf->count)) {
-		/* free the mbufs which failed from transmit */
-		app_stats.tx.ro_tx_failed_pkts += (outbuf->count - nb_tx);
-		LOG_DEBUG(REORDERAPP, "%s:Packet loss with tx_burst\n", __func__);
-		pktmbuf_free_bulk(&outbuf->mbufs[nb_tx], outbuf->count - nb_tx);
-	}
-	outbuf->count = 0;
-}
-
 /**
  * Dequeue mbufs from the workers_to_tx ring and reorder them before
  * transmitting.
@@ -465,12 +507,15 @@ send_thread(struct send_thread_args *args)
 	unsigned int i, dret;
 	uint16_t nb_dq_mbufs;
 	uint8_t outp;
-	static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS];
+	unsigned sent;
 	struct rte_mbuf *mbufs[MAX_PKTS_BURST];
 	struct rte_mbuf *rombufs[MAX_PKTS_BURST] = {NULL};
+	static struct rte_eth_dev_tx_buffer *tx_buffer[RTE_MAX_ETHPORTS];
 
 	RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__, rte_lcore_id());
 
+	configure_tx_buffers(tx_buffer);
+
 	while (!quit_signal) {
 
 		/* deque the mbufs from workers_to_tx ring */
@@ -515,7 +560,7 @@ send_thread(struct send_thread_args *args)
 		dret = rte_reorder_drain(args->buffer, rombufs, MAX_PKTS_BURST);
 		for (i = 0; i < dret; i++) {
 
-			struct output_buffer *outbuf;
+			struct rte_eth_dev_tx_buffer *outbuf;
 			uint8_t outp1;
 
 			outp1 = rombufs[i]->port;
@@ -525,12 +570,15 @@ send_thread(struct send_thread_args *args)
 				continue;
 			}
 
-			outbuf = &tx_buffers[outp1];
-			outbuf->mbufs[outbuf->count++] = rombufs[i];
-			if (outbuf->count == MAX_PKTS_BURST)
-				flush_one_port(outbuf, outp1);
+			outbuf = tx_buffer[outp1];
+			sent = rte_eth_tx_buffer(outp1, 0, outbuf, rombufs[i]);
+			if (sent)
+				app_stats.tx.ro_tx_pkts += sent;
 		}
 	}
+
+	free_tx_buffers(tx_buffer);
+
 	return 0;
 }
 
@@ -542,12 +590,16 @@ tx_thread(struct rte_ring *ring_in)
 {
 	uint32_t i, dqnum;
 	uint8_t outp;
-	static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS];
+	unsigned sent;
 	struct rte_mbuf *mbufs[MAX_PKTS_BURST];
-	struct output_buffer *outbuf;
+	struct rte_eth_dev_tx_buffer *outbuf;
+	static struct rte_eth_dev_tx_buffer *tx_buffer[RTE_MAX_ETHPORTS];
 
 	RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__,
 							rte_lcore_id());
+
+	configure_tx_buffers(tx_buffer);
+
 	while (!quit_signal) {
 
 		/* deque the mbufs from workers_to_tx ring */
@@ -567,10 +619,10 @@ tx_thread(struct rte_ring *ring_in)
 				continue;
 			}
 
-			outbuf = &tx_buffers[outp];
-			outbuf->mbufs[outbuf->count++] = mbufs[i];
-			if (outbuf->count == MAX_PKTS_BURST)
-				flush_one_port(outbuf, outp);
+			outbuf = tx_buffer[outp];
+			sent = rte_eth_tx_buffer(outp, 0, outbuf, mbufs[i]);
+			if (sent)
+				app_stats.tx.ro_tx_pkts += sent;
 		}
 	}
 
diff --git a/examples/qos_meter/main.c b/examples/qos_meter/main.c
index 0de5e7f..b968b00 100644
--- a/examples/qos_meter/main.c
+++ b/examples/qos_meter/main.c
@@ -1,7 +1,7 @@
 /*-
  *   BSD LICENSE
  *
- *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
  *   All rights reserved.
  *
  *   Redistribution and use in source and binary forms, with or without
@@ -36,6 +36,7 @@
 
 #include <rte_common.h>
 #include <rte_eal.h>
+#include <rte_malloc.h>
 #include <rte_mempool.h>
 #include <rte_ethdev.h>
 #include <rte_cycles.h>
@@ -118,9 +119,7 @@ static struct rte_eth_conf port_conf = {
 static uint8_t port_rx;
 static uint8_t port_tx;
 static struct rte_mbuf *pkts_rx[PKT_RX_BURST_MAX];
-static struct rte_mbuf *pkts_tx[PKT_TX_BURST_MAX];
-static uint16_t pkts_tx_len = 0;
-
+struct rte_eth_dev_tx_buffer *tx_buffer;
 
 struct rte_meter_srtcm_params app_srtcm_params[] = {
 	{.cir = 1000000 * 46,  .cbs = 2048, .ebs = 2048},
@@ -188,27 +187,8 @@ main_loop(__attribute__((unused)) void *dummy)
 		current_time = rte_rdtsc();
 		time_diff = current_time - last_time;
 		if (unlikely(time_diff > TIME_TX_DRAIN)) {
-			int ret;
-
-			if (pkts_tx_len == 0) {
-				last_time = current_time;
-
-				continue;
-			}
-
-			/* Write packet burst to NIC TX */
-			ret = rte_eth_tx_burst(port_tx, NIC_TX_QUEUE, pkts_tx, pkts_tx_len);
-
-			/* Free buffers for any packets not written successfully */
-			if (unlikely(ret < pkts_tx_len)) {
-				for ( ; ret < pkts_tx_len; ret ++) {
-					rte_pktmbuf_free(pkts_tx[ret]);
-				}
-			}
-
-			/* Empty the output buffer */
-			pkts_tx_len = 0;
-
+			/* Flush tx buffer */
+			rte_eth_tx_buffer_flush(port_tx, NIC_TX_QUEUE, tx_buffer);
 			last_time = current_time;
 		}
 
@@ -222,26 +202,8 @@ main_loop(__attribute__((unused)) void *dummy)
 			/* Handle current packet */
 			if (app_pkt_handle(pkt, current_time) == DROP)
 				rte_pktmbuf_free(pkt);
-			else {
-				pkts_tx[pkts_tx_len] = pkt;
-				pkts_tx_len ++;
-			}
-
-			/* Write packets from output buffer to NIC TX when full burst is available */
-			if (unlikely(pkts_tx_len == PKT_TX_BURST_MAX)) {
-				/* Write packet burst to NIC TX */
-				int ret = rte_eth_tx_burst(port_tx, NIC_TX_QUEUE, pkts_tx, PKT_TX_BURST_MAX);
-
-				/* Free buffers for any packets not written successfully */
-				if (unlikely(ret < PKT_TX_BURST_MAX)) {
-					for ( ; ret < PKT_TX_BURST_MAX; ret ++) {
-						rte_pktmbuf_free(pkts_tx[ret]);
-					}
-				}
-
-				/* Empty the output buffer */
-				pkts_tx_len = 0;
-			}
+			else
+				rte_eth_tx_buffer(port_tx, NIC_TX_QUEUE, tx_buffer, pkt);
 		}
 	}
 }
@@ -397,6 +359,15 @@ main(int argc, char **argv)
 	if (ret < 0)
 		rte_exit(EXIT_FAILURE, "Port %d TX queue setup error (%d)\n", port_tx, ret);
 
+	tx_buffer = rte_zmalloc_socket("tx_buffer",
+			RTE_ETH_TX_BUFFER_SIZE(PKT_TX_BURST_MAX), 0,
+			rte_eth_dev_socket_id(port_tx));
+	if (tx_buffer == NULL)
+		rte_exit(EXIT_FAILURE, "Port %d TX buffer allocation error\n",
+				port_tx);
+
+	rte_eth_tx_buffer_init(tx_buffer, PKT_TX_BURST_MAX);
+
 	ret = rte_eth_dev_start(port_rx);
 	if (ret < 0)
 		rte_exit(EXIT_FAILURE, "Port %d start error (%d)\n", port_rx, ret);
-- 
1.7.9.5



More information about the dev mailing list