[dpdk-dev] Enhance KNI DPDK-app-side to be Multi-Producer/Consumer

Zhou, Danny danny.zhou at intel.com
Thu Nov 20 05:00:58 CET 2014


Robert, I roughly reviewed the code below for the lockless KNI FIFO with MP/MC support, and it looks good to me. Compared to the SP/SC implementation, I think it will introduce a slight performance degradation, but it will make your multi-threaded DPDK application easier to program. Do you have plans to support MP/MC in the kernel part of KNI as well?

From: Robert Sanford [mailto:rsanford2 at gmail.com]
Sent: Thursday, November 20, 2014 4:49 AM
To: Zhou, Danny; dev at dpdk.org
Subject: Re: [dpdk-dev] Enhance KNI DPDK-app-side to be Multi-Producer/Consumer

Hi Danny,

On Fri, Nov 14, 2014 at 7:04 PM, Zhou, Danny <danny.zhou at intel.com<mailto:danny.zhou at intel.com>> wrote:
It is always good if you can submit an RFC patch for KNI optimization.

On the other hand, do you have any perf. data to prove that your patchset could improve
KNI performance, which is the concern that most customers care about? We introduced
multi-threaded KNI kernel support last year; if I remember correctly, the key performance
bottleneck we found is the skb alloc/free and the memcpy between skb and mbuf. I would be
very happy if your patchset can prove me wrong.


This is not an attempt to improve raw performance. Our modest goal is to make librte_kni's RX/TX burst APIs multithreaded, without changing rte_kni.ko. In this RFC patch, we make it possible for multiple cores to concurrently invoke rte_kni_tx_burst (or rte_kni_rx_burst) for the same KNI device.

At the moment, multiple cores invoking rte_kni_tx_burst for the same device cannot function correctly, because the rte_kni_fifo structures (memory shared between app and kernel driver) are single-producer, single-consumer. The following patch supplements the rte_kni_fifo structure with an additional structure that is private to the application, and we borrow librte_ring's MP/MC enqueue/dequeue logic.


Here is a patch for 1.8. We have only tested a 1.7.1 version. Please have a look and let us know whether you think something like this would be useful.

--
Thanks,
Robert


Signed-off-by: Robert Sanford <rsanford at akamai.com<mailto:rsanford at akamai.com>>

---
 lib/librte_kni/rte_kni.c      |   21 +++++-
 lib/librte_kni/rte_kni_fifo.h |  131 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 148 insertions(+), 4 deletions(-)

diff --git a/lib/librte_kni/rte_kni.c b/lib/librte_kni/rte_kni.c
index fdb7509..8009173 100644
--- a/lib/librte_kni/rte_kni.c
+++ b/lib/librte_kni/rte_kni.c
@@ -76,6 +76,11 @@ struct rte_kni {
  struct rte_kni_fifo *alloc_q;       /**< Allocated mbufs queue */
  struct rte_kni_fifo *free_q;        /**< To be freed mbufs queue */

+  struct rte_kni_fifo_multi tx_q_mc;  /**< Make tx_q multi-consumer */
+  struct rte_kni_fifo_multi alloc_q_mp;/**< Make alloc_q multi-producer */
+  struct rte_kni_fifo_multi rx_q_mp;  /**< Make rx_q multi-producer */
+  struct rte_kni_fifo_multi free_q_mc;/**< Make free_q multi-consumer */
+
  /* For request & response */
  struct rte_kni_fifo *req_q;         /**< Request queue */
  struct rte_kni_fifo *resp_q;        /**< Response queue */
@@ -414,6 +419,11 @@ rte_kni_alloc(struct rte_mempool *pktmbuf_pool,
  kni_fifo_init(ctx->free_q, KNI_FIFO_COUNT_MAX);
  dev_info.free_phys = mz->phys_addr;

+  kni_fifo_multi_init(&ctx->tx_q_mc, KNI_FIFO_COUNT_MAX);
+  kni_fifo_multi_init(&ctx->alloc_q_mp, KNI_FIFO_COUNT_MAX);
+  kni_fifo_multi_init(&ctx->rx_q_mp, KNI_FIFO_COUNT_MAX);
+  kni_fifo_multi_init(&ctx->free_q_mc, KNI_FIFO_COUNT_MAX);
+
  /* Request RING */
  mz = slot->m_req_q;
  ctx->req_q = mz->addr;
@@ -557,7 +567,8 @@ rte_kni_handle_request(struct rte_kni *kni)
 unsigned
 rte_kni_tx_burst(struct rte_kni *kni, struct rte_mbuf **mbufs, unsigned num)
 {
-  unsigned ret = kni_fifo_put(kni->rx_q, (void **)mbufs, num);
+  unsigned ret = kni_fifo_put_mp(kni->rx_q, &kni->rx_q_mp, (void **)mbufs,
+         num);

  /* Get mbufs from free_q and then free them */
  kni_free_mbufs(kni);
@@ -568,7 +579,8 @@ rte_kni_tx_burst(struct rte_kni *kni, struct rte_mbuf **mbufs, unsigned num)
 unsigned
 rte_kni_rx_burst(struct rte_kni *kni, struct rte_mbuf **mbufs, unsigned num)
 {
-  unsigned ret = kni_fifo_get(kni->tx_q, (void **)mbufs, num);
+  unsigned ret = kni_fifo_get_mc(kni->tx_q, &kni->tx_q_mc,
+         (void **)mbufs, num);

  /* Allocate mbufs and then put them into alloc_q */
  kni_allocate_mbufs(kni);
@@ -582,7 +594,8 @@ kni_free_mbufs(struct rte_kni *kni)
  int i, ret;
  struct rte_mbuf *pkts[MAX_MBUF_BURST_NUM];

-  ret = kni_fifo_get(kni->free_q, (void **)pkts, MAX_MBUF_BURST_NUM);
+  ret = kni_fifo_get_mc(kni->free_q, &kni->free_q_mc, (void **)pkts,
+         MAX_MBUF_BURST_NUM);
  if (likely(ret > 0)) {
     for (i = 0; i < ret; i++)
         rte_pktmbuf_free(pkts[i]);
@@ -629,7 +642,7 @@ kni_allocate_mbufs(struct rte_kni *kni)
  if (i <= 0)
     return;

-  ret = kni_fifo_put(kni->alloc_q, (void **)pkts, i);
+  ret = kni_fifo_put_mp(kni->alloc_q, &kni->alloc_q_mp, (void **)pkts, i);

  /* Check if any mbufs not put into alloc_q, and then free them */
  if (ret >= 0 && ret < i && ret < MAX_MBUF_BURST_NUM) {
diff --git a/lib/librte_kni/rte_kni_fifo.h b/lib/librte_kni/rte_kni_fifo.h
index 8cb8587..7dccba2 100644
--- a/lib/librte_kni/rte_kni_fifo.h
+++ b/lib/librte_kni/rte_kni_fifo.h
@@ -91,3 +91,134 @@ kni_fifo_get(struct rte_kni_fifo *fifo, void **data, unsigned num)
  fifo->read = new_read;
  return i;
 }
+
+
+/**
+ * Supplemental members to facilitate MP/MC access to KNI FIFOs.
+ */
+struct rte_kni_fifo_multi {
+  volatile uint32_t head; /**< Advanced with rte_atomic32_cmpset() to claim slots */
+  volatile uint32_t tail; /**< Set once a claimed operation finishes; later callers spin on it */
+  uint32_t mask;          /**< size - 1; ANDed with indices to wrap them */
+  uint32_t size;          /**< Element count given to kni_fifo_multi_init() */
+};
+
+/**
+ * Initialize a kni fifo MP/MC struct: head and tail start at 0, mask is size - 1.
+ */
+static void
+kni_fifo_multi_init(struct rte_kni_fifo_multi *multi, unsigned size)
+{
+  multi->head = 0;
+  multi->tail = 0;
+  multi->mask = (typeof(multi->mask))(size - 1); /* NOTE(review): assumes size is a power of two -- confirm */
+  multi->size = (typeof(multi->size))size;
+}
+
+/**
+ * Adds up to n elements into the fifo. Returns the number actually written,
+ * which is 0 when no free entry is available and may be less than n.
+ * Multiple-producer version, modeled after __rte_ring_mp_do_enqueue().
+ */
+static inline unsigned
+kni_fifo_put_mp(struct rte_kni_fifo *fifo, struct rte_kni_fifo_multi *prod,
+     void **data, unsigned n)
+{
+  uint32_t prod_head, prod_next;
+  uint32_t cons_tail, free_entries;
+  const unsigned max = n;
+  int success;
+  unsigned i;
+  const uint32_t mask = prod->mask;
+
+  /* Claim slots by moving prod->head atomically; retry if we lose the race. */
+  do {
+     /* Reset n to the initial burst count. */
+     n = max;
+
+     prod_head = prod->head;
+     cons_tail = fifo->read;
+
+     free_entries = (mask + cons_tail - prod_head) & mask;
+
+     /* Check that we have enough room in ring; shrink the burst if not. */
+     if (unlikely(n > free_entries)) {
+         if (unlikely(free_entries == 0))
+            return 0;
+         n = free_entries;
+     }
+
+     prod_next = (prod_head + n) & mask;
+     success = rte_atomic32_cmpset(&prod->head, prod_head, prod_next);
+  } while (unlikely(success == 0));
+
+  /* Write entries into the slots claimed above. */
+  for (i = 0; i < n; i++)
+     fifo->buffer[(prod_head + i) & mask] = data[i];
+  rte_compiler_barrier();
+
+  /* Wait for enqueues that claimed slots before us to complete. */
+  while (unlikely(prod->tail != prod_head))
+     rte_pause();
+
+  /* Publish fifo->write BEFORE advancing prod->tail: once tail moves, the
+  * next producer may store its newer index, which ours must not overwrite. */
+  fifo->write = prod_next;
+  rte_compiler_barrier();
+  prod->tail = prod_next;
+  return n;
+}
+
+/**
+ * Gets up to n elements from the fifo. Returns the number actually read,
+ * which is 0 when the fifo holds no entries and may be less than n.
+ * Multiple-consumer version, modeled after __rte_ring_mc_do_dequeue().
+ */
+static inline unsigned
+kni_fifo_get_mc(struct rte_kni_fifo *fifo, struct rte_kni_fifo_multi *cons,
+     void **data, unsigned n)
+{
+  uint32_t cons_head, prod_tail;
+  uint32_t cons_next, entries;
+  const unsigned max = n;
+  int success;
+  unsigned i;
+  const uint32_t mask = cons->mask;
+
+  /* Claim entries by moving cons->head atomically; retry if we lose the race. */
+  do {
+     /* Restore n as it may change every loop. */
+     n = max;
+
+     cons_head = cons->head;
+     prod_tail = fifo->write;
+
+     entries = (prod_tail - cons_head + mask + 1) & mask;
+
+     /* Set the actual entries for dequeue; shrink the burst if needed. */
+     if (n > entries) {
+         if (unlikely(entries == 0))
+            return 0;
+         n = entries;
+     }
+
+     cons_next = (cons_head + n) & mask;
+     success = rte_atomic32_cmpset(&cons->head, cons_head, cons_next);
+  } while (unlikely(success == 0));
+
+  /* Copy entries out of the slots claimed above. */
+  for (i = 0; i < n; i++)
+     data[i] = fifo->buffer[(cons_head + i) & mask];
+  rte_compiler_barrier();
+
+  /* Wait for dequeues that claimed entries before us to complete. */
+  while (unlikely(cons->tail != cons_head))
+     rte_pause();
+
+  /* Publish fifo->read BEFORE advancing cons->tail: once tail moves, the
+  * next consumer may store its newer index, which ours must not overwrite. */
+  fifo->read = cons_next;
+  rte_compiler_barrier();
+  cons->tail = cons_next;
+  return n;
+}
--
1.7.1



More information about the dev mailing list