[dpdk-dev] [PATCH] Add in_flight_bitmask so as to use full 32 bits of tag.

jigsaw jigsaw at gmail.com
Mon Nov 10 13:13:05 CET 2014


OK thx Bruce, I will make 2 commits with one cover-letter.

On Mon, Nov 10, 2014 at 1:09 PM, Bruce Richardson <
bruce.richardson at intel.com> wrote:

> On Mon, Nov 10, 2014 at 12:38:32PM +0200, Qinglai Xiao wrote:
> > User application is advocated to set the newly introduced union field
> > mbuf->hash.usr as flow id, which is uint32_t.
> > With introduction of in_flight_bitmask, the whole 32 bits of tag can
> > be used.
> >
> > Further more, this patch fixed the integer overflow when finding the
> > matched tags.
> >
> > Note that currently librte_distributor supports up to 64 worker
> > threads. If more workers are needed, the size of in_flight_bitmask and
> > the algorithm of finding matched tag must be revised.
> >
> > Signed-off-by: Qinglai Xiao <jigsaw at gmail.com>
>
> Hi,
>
> this would probably be better as two patches rather than one. One patch to
> add the hash.usr field, and then use it in the distributor. The change to
> the distributor to add the bit mask to allow all bits of the tag to be used
> should then go as a separate patch.
>
> Regards,
> /Bruce
>
>
> > ---
> >  app/test/test_distributor.c              |   18 ++++++------
> >  app/test/test_distributor_perf.c         |    4 +-
> >  lib/librte_distributor/rte_distributor.c |   45
> +++++++++++++++++++++--------
> >  lib/librte_distributor/rte_distributor.h |    3 ++
> >  lib/librte_mbuf/rte_mbuf.h               |    1 +
> >  5 files changed, 47 insertions(+), 24 deletions(-)
> >
> > diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
> > index ce06436..9e8c06d 100644
> > --- a/app/test/test_distributor.c
> > +++ b/app/test/test_distributor.c
> > @@ -120,7 +120,7 @@ sanity_test(struct rte_distributor *d, struct
> rte_mempool *p)
> >       /* now set all hash values in all buffers to zero, so all pkts go
> to the
> >        * one worker thread */
> >       for (i = 0; i < BURST; i++)
> > -             bufs[i]->hash.rss = 0;
> > +             bufs[i]->hash.usr = 0;
> >
> >       rte_distributor_process(d, bufs, BURST);
> >       rte_distributor_flush(d);
> > @@ -142,7 +142,7 @@ sanity_test(struct rte_distributor *d, struct
> rte_mempool *p)
> >       if (rte_lcore_count() >= 3) {
> >               clear_packet_count();
> >               for (i = 0; i < BURST; i++)
> > -                     bufs[i]->hash.rss = (i & 1) << 8;
> > +                     bufs[i]->hash.usr = (i & 1) << 8;
> >
> >               rte_distributor_process(d, bufs, BURST);
> >               rte_distributor_flush(d);
> > @@ -167,7 +167,7 @@ sanity_test(struct rte_distributor *d, struct
> rte_mempool *p)
> >        * so load gets distributed */
> >       clear_packet_count();
> >       for (i = 0; i < BURST; i++)
> > -             bufs[i]->hash.rss = i;
> > +             bufs[i]->hash.usr = i;
> >
> >       rte_distributor_process(d, bufs, BURST);
> >       rte_distributor_flush(d);
> > @@ -199,7 +199,7 @@ sanity_test(struct rte_distributor *d, struct
> rte_mempool *p)
> >               return -1;
> >       }
> >       for (i = 0; i < BIG_BATCH; i++)
> > -             many_bufs[i]->hash.rss = i << 2;
> > +             many_bufs[i]->hash.usr = i << 2;
> >
> >       for (i = 0; i < BIG_BATCH/BURST; i++) {
> >               rte_distributor_process(d, &many_bufs[i*BURST], BURST);
> > @@ -280,7 +280,7 @@ sanity_test_with_mbuf_alloc(struct rte_distributor
> *d, struct rte_mempool *p)
> >               while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0)
> >                       rte_distributor_process(d, NULL, 0);
> >               for (j = 0; j < BURST; j++) {
> > -                     bufs[j]->hash.rss = (i+j) << 1;
> > +                     bufs[j]->hash.usr = (i+j) << 1;
> >                       rte_mbuf_refcnt_set(bufs[j], 1);
> >               }
> >
> > @@ -359,7 +359,7 @@ sanity_test_with_worker_shutdown(struct
> rte_distributor *d,
> >       /* now set all hash values in all buffers to zero, so all pkts go
> to the
> >        * one worker thread */
> >       for (i = 0; i < BURST; i++)
> > -             bufs[i]->hash.rss = 0;
> > +             bufs[i]->hash.usr = 0;
> >
> >       rte_distributor_process(d, bufs, BURST);
> >       /* at this point, we will have processed some packets and have a
> full
> > @@ -372,7 +372,7 @@ sanity_test_with_worker_shutdown(struct
> rte_distributor *d,
> >               return -1;
> >       }
> >       for (i = 0; i < BURST; i++)
> > -             bufs[i]->hash.rss = 0;
> > +             bufs[i]->hash.usr = 0;
> >
> >       /* get worker zero to quit */
> >       zero_quit = 1;
> > @@ -416,7 +416,7 @@ test_flush_with_worker_shutdown(struct
> rte_distributor *d,
> >       /* now set all hash values in all buffers to zero, so all pkts go
> to the
> >        * one worker thread */
> >       for (i = 0; i < BURST; i++)
> > -             bufs[i]->hash.rss = 0;
> > +             bufs[i]->hash.usr = 0;
> >
> >       rte_distributor_process(d, bufs, BURST);
> >       /* at this point, we will have processed some packets and have a
> full
> > @@ -488,7 +488,7 @@ quit_workers(struct rte_distributor *d, struct
> rte_mempool *p)
> >       zero_quit = 0;
> >       quit = 1;
> >       for (i = 0; i < num_workers; i++)
> > -             bufs[i]->hash.rss = i << 1;
> > +             bufs[i]->hash.usr = i << 1;
> >       rte_distributor_process(d, bufs, num_workers);
> >
> >       rte_mempool_put_bulk(p, (void *)bufs, num_workers);
> > diff --git a/app/test/test_distributor_perf.c
> b/app/test/test_distributor_perf.c
> > index b04864c..48ee344 100644
> > --- a/app/test/test_distributor_perf.c
> > +++ b/app/test/test_distributor_perf.c
> > @@ -159,7 +159,7 @@ perf_test(struct rte_distributor *d, struct
> rte_mempool *p)
> >       }
> >       /* ensure we have different hash value for each pkt */
> >       for (i = 0; i < BURST; i++)
> > -             bufs[i]->hash.rss = i;
> > +             bufs[i]->hash.usr = i;
> >
> >       start = rte_rdtsc();
> >       for (i = 0; i < (1<<ITER_POWER); i++)
> > @@ -198,7 +198,7 @@ quit_workers(struct rte_distributor *d, struct
> rte_mempool *p)
> >
> >       quit = 1;
> >       for (i = 0; i < num_workers; i++)
> > -             bufs[i]->hash.rss = i << 1;
> > +             bufs[i]->hash.usr = i << 1;
> >       rte_distributor_process(d, bufs, num_workers);
> >
> >       rte_mempool_put_bulk(p, (void *)bufs, num_workers);
> > diff --git a/lib/librte_distributor/rte_distributor.c
> b/lib/librte_distributor/rte_distributor.c
> > index 656ee5c..a84ea97 100644
> > --- a/lib/librte_distributor/rte_distributor.c
> > +++ b/lib/librte_distributor/rte_distributor.c
> > @@ -92,7 +92,13 @@ struct rte_distributor {
> >       unsigned num_workers;                 /**< Number of workers
> polling */
> >
> >       uint32_t in_flight_tags[RTE_MAX_LCORE];
> > -             /**< Tracks the tag being processed per core, 0 == no pkt
> */
> > +             /**< Tracks the tag being processed per core */
> > +     uint64_t in_flight_bitmask;
> > +             /**< on/off bits for in-flight tags.
> > +              * Note that if RTE_MAX_LCORE is larger than 64 then
> > +              * the bitmask has to expand.
> > +              */
> > +
> >       struct rte_distributor_backlog backlog[RTE_MAX_LCORE];
> >
> >       union rte_distributor_buffer bufs[RTE_MAX_LCORE];
> > @@ -189,6 +195,7 @@ static inline void
> >  handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
> >  {
> >       d->in_flight_tags[wkr] = 0;
> > +     d->in_flight_bitmask &= ~(1UL << wkr);
> >       d->bufs[wkr].bufptr64 = 0;
> >       if (unlikely(d->backlog[wkr].count != 0)) {
> >               /* On return of a packet, we need to move the
> > @@ -211,7 +218,10 @@ handle_worker_shutdown(struct rte_distributor *d,
> unsigned wkr)
> >                       pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
> >                                       RTE_DISTRIB_FLAG_BITS));
> >               }
> > -             /* recursive call */
> > +             /* recursive call.
> > +              * Note that the tags were set before first level call
> > +              * to rte_distributor_process.
> > +              */
> >               rte_distributor_process(d, pkts, i);
> >               bl->count = bl->start = 0;
> >       }
> > @@ -242,6 +252,7 @@ process_returns(struct rte_distributor *d)
> >                       else {
> >                               d->bufs[wkr].bufptr64 =
> RTE_DISTRIB_GET_BUF;
> >                               d->in_flight_tags[wkr] = 0;
> > +                             d->in_flight_bitmask &= ~(1UL << wkr);
> >                       }
> >                       oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> >               } else if (data & RTE_DISTRIB_RETURN_BUF) {
> > @@ -284,17 +295,21 @@ rte_distributor_process(struct rte_distributor *d,
> >                       next_value = (((int64_t)(uintptr_t)next_mb)
> >                                       << RTE_DISTRIB_FLAG_BITS);
> >                       /*
> > -                      * Set the low bit on the tag, so we can guarantee
> that
> > -                      * we never store a tag value of zero. That means
> we can
> > -                      * use the zero-value to indicate that no packet is
> > -                      * being processed by a worker.
> > +                      * User is advocated to set tag vaue for each
> > +                      * mbuf before calling rte_distributor_process.
> > +                      * User defined tags are used to identify flows,
> > +                      * or sessions.
> >                        */
> > -                     new_tag = (next_mb->hash.rss | 1);
> > +                     new_tag = next_mb->hash.usr;
> >
> > -                     uint32_t match = 0;
> > +                     /*
> > +                      * Note that if RTE_MAX_LCORE is larger than 64
> then
> > +                      * the size of match has to be expanded.
> > +                      */
> > +                     uint64_t match = 0;
> >                       unsigned i;
> >                       /*
> > -                      * to scan for a match use "xor" and "not" to get
> a 0/1
> > +                      * To scan for a match use "xor" and "not" to get
> a 0/1
> >                        * value, then use shifting to merge to single
> "match"
> >                        * variable, where a one-bit indicates a match for
> the
> >                        * worker given by the bit-position
> > @@ -303,9 +318,11 @@ rte_distributor_process(struct rte_distributor *d,
> >                               match |= (!(d->in_flight_tags[i] ^ new_tag)
> >                                       << i);
> >
> > +                     /* Only turned-on bits are considered as match */
> > +                     match &= d->in_flight_bitmask;
> >                       if (match) {
> >                               next_mb = NULL;
> > -                             unsigned worker = __builtin_ctz(match);
> > +                             unsigned worker = __builtin_ctzl(match);
> >                               if (add_to_backlog(&d->backlog[worker],
> >                                               next_value) < 0)
> >                                       next_idx--;
> > @@ -322,6 +339,7 @@ rte_distributor_process(struct rte_distributor *d,
> >                       else {
> >                               d->bufs[wkr].bufptr64 = next_value;
> >                               d->in_flight_tags[wkr] = new_tag;
> > +                             d->in_flight_bitmask |= (1UL << wkr);
> >                               next_mb = NULL;
> >                       }
> >                       oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> > @@ -379,11 +397,12 @@ rte_distributor_returned_pkts(struct
> rte_distributor *d,
> >  static inline unsigned
> >  total_outstanding(const struct rte_distributor *d)
> >  {
> > -     unsigned wkr, total_outstanding = 0;
> > +     unsigned wkr, total_outstanding;
> >
> > +     total_outstanding = __builtin_popcountl(d->in_flight_bitmask);
> >       for (wkr = 0; wkr < d->num_workers; wkr++)
> > -             total_outstanding += d->backlog[wkr].count +
> > -                             !!(d->in_flight_tags[wkr]);
> > +             total_outstanding += d->backlog[wkr].count;
> > +
> >       return total_outstanding;
> >  }
> >
> > diff --git a/lib/librte_distributor/rte_distributor.h
> b/lib/librte_distributor/rte_distributor.h
> > index ec0d74a..a29c506 100644
> > --- a/lib/librte_distributor/rte_distributor.h
> > +++ b/lib/librte_distributor/rte_distributor.h
> > @@ -87,6 +87,9 @@ rte_distributor_create(const char *name, unsigned
> socket_id,
> >   * Process a set of packets by distributing them among workers that
> request
> >   * packets. The distributor will ensure that no two packets that have
> the
> >   * same flow id, or tag, in the mbuf will be procesed at the same time.
> > + * The user is advocated to set tag for each mbuf. If user doesn't set
> the
> > + * tag, the tag value can be various values depending on driver
> implementation
> > + * and configuration.
> >   *
> >   * This is not multi-thread safe and should only be called on a single
> lcore.
> >   *
> > diff --git a/lib/librte_mbuf/rte_mbuf.h b/lib/librte_mbuf/rte_mbuf.h
> > index e8f9bfc..f5f8658 100644
> > --- a/lib/librte_mbuf/rte_mbuf.h
> > +++ b/lib/librte_mbuf/rte_mbuf.h
> > @@ -185,6 +185,7 @@ struct rte_mbuf {
> >                       uint16_t id;
> >               } fdir;           /**< Filter identifier if FDIR enabled */
> >               uint32_t sched;   /**< Hierarchical scheduler */
> > +             uint32_t usr;     /**< User defined tags. See
> @rte_distributor_process */
> >       } hash;                   /**< hash information */
> >
> >       /* second cache line - fields only used in slow path or on TX */
> > --
> > 1.7.1
> >
>


More information about the dev mailing list