[dpdk-dev] [PATCH v3 2/9] ring: prepare ring to allow new sync schemes

Ananyev, Konstantin konstantin.ananyev at intel.com
Thu Apr 9 15:39:01 CEST 2020


> > Subject: [PATCH v3 2/9] ring: prepare ring to allow new sync schemes
> >
> > Change from *single* to *sync_type* to allow different synchronisation
> > schemes to be applied.
> > Mark *single* as deprecated in comments.
> > Add new functions to allow user to query ring sync types.
> > Replace direct access to *single* with appopriate function call.
> >
> > Signed-off-by: Konstantin Ananyev <konstantin.ananyev at intel.com>
> > ---
> >  app/test/test_pdump.c           |   6 +-
> >  lib/librte_pdump/rte_pdump.c    |   2 +-
> >  lib/librte_port/rte_port_ring.c |  12 ++--
> >  lib/librte_ring/rte_ring.c      |   6 +-
> >  lib/librte_ring/rte_ring.h      | 113 ++++++++++++++++++++++++++------
> >  lib/librte_ring/rte_ring_elem.h |   8 +--
> >  6 files changed, 108 insertions(+), 39 deletions(-)
> >
> > diff --git a/app/test/test_pdump.c b/app/test/test_pdump.c index
> > ad183184c..6a1180bcb 100644
> > --- a/app/test/test_pdump.c
> > +++ b/app/test/test_pdump.c
> > @@ -57,8 +57,7 @@ run_pdump_client_tests(void)
> >  	if (ret < 0)
> >  		return -1;
> >  	mp->flags = 0x0000;
> > -	ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(),
> > -				      RING_F_SP_ENQ | RING_F_SC_DEQ);
> > +	ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(), 0);
> Are you saying to get SP and SC behavior we now have to set the flags to 0?

No.
What the original cause does:
creates SP/SC ring:
ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(),
				      RING_F_SP_ENQ | RING_F_SC_DEQ);
Then manually makes it MP/MC by:
ring_client->prod.single = 0;
ring_client->cons.single = 0;

Instead it should just create MP/MC ring straightway, as the patch does:
ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(), 0);

 >Isn't that a ABI break?
I don't see any.

> 
> >  	if (ring_client == NULL) {
> >  		printf("rte_ring_create SR0 failed");
> >  		return -1;
> > @@ -71,9 +70,6 @@ run_pdump_client_tests(void)
> >  	}
> >  	rte_eth_dev_probing_finish(eth_dev);
> >
> > -	ring_client->prod.single = 0;
> > -	ring_client->cons.single = 0;
> Just wondering if users outside of DPDK have done the same. I hope not, otherwise, we have an API break?

I think no. While it is completely wrong practise, it would keep working
even with these changes. 

> 
> > -
> >  	printf("\n***** flags = RTE_PDUMP_FLAG_TX *****\n");
> >
> >  	for (itr = 0; itr < NUM_ITR; itr++) {
> > diff --git a/lib/librte_pdump/rte_pdump.c b/lib/librte_pdump/rte_pdump.c
> > index 8a01ac510..65364f2c5 100644
> > --- a/lib/librte_pdump/rte_pdump.c
> > +++ b/lib/librte_pdump/rte_pdump.c
> > @@ -380,7 +380,7 @@ pdump_validate_ring_mp(struct rte_ring *ring, struct
> > rte_mempool *mp)
> >  		rte_errno = EINVAL;
> >  		return -1;
> >  	}
> > -	if (ring->prod.single || ring->cons.single) {
> > +	if (rte_ring_prod_single(ring) || rte_ring_cons_single(ring)) {
> >  		PDUMP_LOG(ERR, "ring with either SP or SC settings"
> >  		" is not valid for pdump, should have MP and MC settings\n");
> >  		rte_errno = EINVAL;
> > diff --git a/lib/librte_port/rte_port_ring.c b/lib/librte_port/rte_port_ring.c
> > index 47fcdd06a..2f6c050fa 100644
> > --- a/lib/librte_port/rte_port_ring.c
> > +++ b/lib/librte_port/rte_port_ring.c
> > @@ -44,8 +44,8 @@ rte_port_ring_reader_create_internal(void *params, int
> > socket_id,
> >  	/* Check input parameters */
> >  	if ((conf == NULL) ||
> >  		(conf->ring == NULL) ||
> > -		(conf->ring->cons.single && is_multi) ||
> > -		(!(conf->ring->cons.single) && !is_multi)) {
> > +		(rte_ring_cons_single(conf->ring) && is_multi) ||
> > +		(!rte_ring_cons_single(conf->ring) && !is_multi)) {
> >  		RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
> >  		return NULL;
> >  	}
> > @@ -171,8 +171,8 @@ rte_port_ring_writer_create_internal(void *params,
> > int socket_id,
> >  	/* Check input parameters */
> >  	if ((conf == NULL) ||
> >  		(conf->ring == NULL) ||
> > -		(conf->ring->prod.single && is_multi) ||
> > -		(!(conf->ring->prod.single) && !is_multi) ||
> > +		(rte_ring_prod_single(conf->ring) && is_multi) ||
> > +		(!rte_ring_prod_single(conf->ring) && !is_multi) ||
> >  		(conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
> >  		RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
> >  		return NULL;
> > @@ -440,8 +440,8 @@ rte_port_ring_writer_nodrop_create_internal(void
> > *params, int socket_id,
> >  	/* Check input parameters */
> >  	if ((conf == NULL) ||
> >  		(conf->ring == NULL) ||
> > -		(conf->ring->prod.single && is_multi) ||
> > -		(!(conf->ring->prod.single) && !is_multi) ||
> > +		(rte_ring_prod_single(conf->ring) && is_multi) ||
> > +		(!rte_ring_prod_single(conf->ring) && !is_multi) ||
> >  		(conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
> >  		RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
> >  		return NULL;
> > diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c index
> > 77e5de099..fa5733907 100644
> > --- a/lib/librte_ring/rte_ring.c
> > +++ b/lib/librte_ring/rte_ring.c
> > @@ -106,8 +106,10 @@ rte_ring_init(struct rte_ring *r, const char *name,
> > unsigned count,
> >  	if (ret < 0 || ret >= (int)sizeof(r->name))
> >  		return -ENAMETOOLONG;
> >  	r->flags = flags;
> > -	r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
> > -	r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
> > +	r->prod.sync_type = (flags & RING_F_SP_ENQ) ?
> > +		RTE_RING_SYNC_ST : RTE_RING_SYNC_MT;
> > +	r->cons.sync_type = (flags & RING_F_SC_DEQ) ?
> > +		RTE_RING_SYNC_ST : RTE_RING_SYNC_MT;
> >
> >  	if (flags & RING_F_EXACT_SZ) {
> >  		r->size = rte_align32pow2(count + 1); diff --git
> > a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index
> > 18fc5d845..d4775a063 100644
> > --- a/lib/librte_ring/rte_ring.h
> > +++ b/lib/librte_ring/rte_ring.h
> > @@ -61,11 +61,27 @@ enum rte_ring_queue_behavior {  #define
> > RTE_RING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
> >  			   sizeof(RTE_RING_MZ_PREFIX) + 1)
> >
> > -/* structure to hold a pair of head/tail values and other metadata */
> > +/** prod/cons sync types */
> > +enum rte_ring_sync_type {
> > +	RTE_RING_SYNC_MT,     /**< multi-thread safe (default mode) */
> > +	RTE_RING_SYNC_ST,     /**< single thread only */
> > +};
> > +
> > +/**
> > + * structure to hold a pair of head/tail values and other metadata.
> > + * Depending on sync_type format of that structure might be different,
> > + * but offset for *sync_type* and *tail* values should remain the same.
> > + */
> >  struct rte_ring_headtail {
> > -	volatile uint32_t head;  /**< Prod/consumer head. */
> > -	volatile uint32_t tail;  /**< Prod/consumer tail. */
> > -	uint32_t single;         /**< True if single prod/cons */
> > +	volatile uint32_t head;      /**< prod/consumer head. */
> > +	volatile uint32_t tail;      /**< prod/consumer tail. */
> > +	RTE_STD_C11
> > +	union {
> > +		/** sync type of prod/cons */
> > +		enum rte_ring_sync_type sync_type;
> > +		/** deprecated -  True if single prod/cons */
> > +		uint32_t single;
> > +	};
> >  };
> >
> >  /**
> > @@ -116,11 +132,10 @@ struct rte_ring {
> >  #define RING_F_EXACT_SZ 0x0004
> >  #define RTE_RING_SZ_MASK  (0x7fffffffU) /**< Ring size mask */
> >
> > -/* @internal defines for passing to the enqueue dequeue worker functions
> > */ -#define __IS_SP 1 -#define __IS_MP 0 -#define __IS_SC 1 -#define
> > __IS_MC 0
> > +#define __IS_SP RTE_RING_SYNC_ST
> > +#define __IS_MP RTE_RING_SYNC_MT
> > +#define __IS_SC RTE_RING_SYNC_ST
> > +#define __IS_MC RTE_RING_SYNC_MT
> I think we can remove these #defines and use the new SYNC types

Wouldn't that introduce an API breakage?
Or we are ok here, as they are marked as internal?
I think I can for sure mark them as deprecated.
 
> >
> >  /**
> >   * Calculate the memory size needed for a ring @@ -420,7 +435,7 @@
> > rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
> >  			 unsigned int n, unsigned int *free_space)  {
> >  	return __rte_ring_do_enqueue(r, obj_table, n,
> > RTE_RING_QUEUE_FIXED,
> > -			__IS_MP, free_space);
> > +			RTE_RING_SYNC_MT, free_space);
> >  }
> >
> >  /**
> > @@ -443,7 +458,7 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void *
> > const *obj_table,
> >  			 unsigned int n, unsigned int *free_space)  {
> >  	return __rte_ring_do_enqueue(r, obj_table, n,
> > RTE_RING_QUEUE_FIXED,
> > -			__IS_SP, free_space);
> > +			RTE_RING_SYNC_ST, free_space);
> >  }
> >
> >  /**
> > @@ -470,7 +485,7 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void *
> > const *obj_table,
> >  		      unsigned int n, unsigned int *free_space)  {
> >  	return __rte_ring_do_enqueue(r, obj_table, n,
> > RTE_RING_QUEUE_FIXED,
> > -			r->prod.single, free_space);
> > +			r->prod.sync_type, free_space);
> >  }
> >
> >  /**
> > @@ -554,7 +569,7 @@ rte_ring_mc_dequeue_bulk(struct rte_ring *r, void
> > **obj_table,
> >  		unsigned int n, unsigned int *available)  {
> >  	return __rte_ring_do_dequeue(r, obj_table, n,
> > RTE_RING_QUEUE_FIXED,
> > -			__IS_MC, available);
> > +			RTE_RING_SYNC_MT, available);
> >  }
> >
> >  /**
> > @@ -578,7 +593,7 @@ rte_ring_sc_dequeue_bulk(struct rte_ring *r, void
> > **obj_table,
> >  		unsigned int n, unsigned int *available)  {
> >  	return __rte_ring_do_dequeue(r, obj_table, n,
> > RTE_RING_QUEUE_FIXED,
> > -			__IS_SC, available);
> > +			RTE_RING_SYNC_ST, available);
> >  }
> >
> >  /**
> > @@ -605,7 +620,7 @@ rte_ring_dequeue_bulk(struct rte_ring *r, void
> > **obj_table, unsigned int n,
> >  		unsigned int *available)
> >  {
> >  	return __rte_ring_do_dequeue(r, obj_table, n,
> > RTE_RING_QUEUE_FIXED,
> > -				r->cons.single, available);
> > +				r->cons.sync_type, available);
> >  }
> >
> >  /**
> > @@ -777,6 +792,62 @@ rte_ring_get_capacity(const struct rte_ring *r)
> >  	return r->capacity;
> >  }
> >
> > +/**
> > + * Return sync type used by producer in the ring.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @return
> > + *   Producer sync type value.
> > + */
> > +static inline enum rte_ring_sync_type
> > +rte_ring_get_prod_sync_type(const struct rte_ring *r) {
> > +	return r->prod.sync_type;
> > +}
> > +
> > +/**
> > + * Check is the ring for single producer.
>                      ^^ if
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @return
> > + *   true if ring is SP, zero otherwise.
> > + */
> > +static inline int
> > +rte_ring_prod_single(const struct rte_ring *r) {
> would rte_ring_is_prod_single better?

Ok, can rename.

> 
> > +	return (rte_ring_get_prod_sync_type(r) == RTE_RING_SYNC_ST); }
> > +
> > +/**
> > + * Return sync type used by consumer in the ring.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @return
> > + *   Consumer sync type value.
> > + */
> > +static inline enum rte_ring_sync_type
> > +rte_ring_get_cons_sync_type(const struct rte_ring *r) {
> > +	return r->cons.sync_type;
> > +}
> > +
> > +/**
> > + * Check is the ring for single consumer.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @return
> > + *   true if ring is SC, zero otherwise.
> > + */
> > +static inline int
> > +rte_ring_cons_single(const struct rte_ring *r) {
> > +	return (rte_ring_get_cons_sync_type(r) == RTE_RING_SYNC_ST); }
> > +
> All these new functions are  not required to be called in the data path. They can be made non-inline.

Well, all these functions are introduced to encourage people not to
access ring fields sync_type/single directly but use functions instead.
I don't know do people access ring.single directly at data-path or not,
but assuming that they do - making these functions not-inline would 
force them to ignore these functions and keep accessing it directly.
That was my thoughts besides making them inline.
I think we have the same for get_size/get_capacity().



More information about the dev mailing list