[dpdk-dev] [PATCH v7 07/10] ipsec: rework SA replay window/SQN for MT environment

Konstantin Ananyev konstantin.ananyev at intel.com
Thu Jan 10 15:20:56 CET 2019


With these changes functions:
  - rte_ipsec_pkt_crypto_prepare
  - rte_ipsec_pkt_process
 can be safely used in MT environment, as long as the user can guarantee
 that they obey multiple readers/single writer model for SQN+replay_window
 operations.
 To be more specific:
 for outbound SA there are no restrictions.
 for inbound SA the caller has to guarantee that at any given moment
 only one thread is executing rte_ipsec_pkt_process() for given SA.
 Note that it is caller responsibility to maintain correct order
 of packets to be processed.

Signed-off-by: Konstantin Ananyev <konstantin.ananyev at intel.com>
Acked-by: Declan Doherty <declan.doherty at intel.com>
---
 lib/librte_ipsec/ipsec_sqn.h    | 113 +++++++++++++++++++++++++++++++-
 lib/librte_ipsec/rte_ipsec_sa.h |  33 ++++++++++
 lib/librte_ipsec/sa.c           |  80 +++++++++++++++++-----
 lib/librte_ipsec/sa.h           |  21 +++++-
 4 files changed, 225 insertions(+), 22 deletions(-)

diff --git a/lib/librte_ipsec/ipsec_sqn.h b/lib/librte_ipsec/ipsec_sqn.h
index 6e18c34eb..7de10bef5 100644
--- a/lib/librte_ipsec/ipsec_sqn.h
+++ b/lib/librte_ipsec/ipsec_sqn.h
@@ -15,6 +15,8 @@
 
 #define IS_ESN(sa)	((sa)->sqn_mask == UINT64_MAX)
 
+#define	SQN_ATOMIC(sa)	((sa)->type & RTE_IPSEC_SATP_SQN_ATOM)
+
 /*
  * gets SQN.hi32 bits, SQN supposed to be in network byte order.
  */
@@ -140,8 +142,12 @@ esn_outb_update_sqn(struct rte_ipsec_sa *sa, uint32_t *num)
 	uint64_t n, s, sqn;
 
 	n = *num;
-	sqn = sa->sqn.outb + n;
-	sa->sqn.outb = sqn;
+	if (SQN_ATOMIC(sa))
+		sqn = (uint64_t)rte_atomic64_add_return(&sa->sqn.outb.atom, n);
+	else {
+		sqn = sa->sqn.outb.raw + n;
+		sa->sqn.outb.raw = sqn;
+	}
 
 	/* overflow */
 	if (sqn > sa->sqn_mask) {
@@ -231,4 +237,107 @@ rsn_size(uint32_t nb_bucket)
 	return sz;
 }
 
+/**
+ * Copy replay window and SQN.
+ */
+static inline void
+rsn_copy(const struct rte_ipsec_sa *sa, uint32_t dst, uint32_t src)
+{
+	uint32_t i, n;
+	struct replay_sqn *d;
+	const struct replay_sqn *s;
+
+	d = sa->sqn.inb.rsn[dst];
+	s = sa->sqn.inb.rsn[src];
+
+	n = sa->replay.nb_bucket;
+
+	d->sqn = s->sqn;
+	for (i = 0; i != n; i++)
+		d->window[i] = s->window[i];
+}
+
+/**
+ * Get RSN for read-only access.
+ */
+static inline struct replay_sqn *
+rsn_acquire(struct rte_ipsec_sa *sa)
+{
+	uint32_t n;
+	struct replay_sqn *rsn;
+
+	n = sa->sqn.inb.rdidx;
+	rsn = sa->sqn.inb.rsn[n];
+
+	if (!SQN_ATOMIC(sa))
+		return rsn;
+
+	/* check there are no writers */
+	while (rte_rwlock_read_trylock(&rsn->rwl) < 0) {
+		rte_pause();
+		n = sa->sqn.inb.rdidx;
+		rsn = sa->sqn.inb.rsn[n];
+		rte_compiler_barrier();
+	}
+
+	return rsn;
+}
+
+/**
+ * Release read-only access for RSN.
+ */
+static inline void
+rsn_release(struct rte_ipsec_sa *sa, struct replay_sqn *rsn)
+{
+	if (SQN_ATOMIC(sa))
+		rte_rwlock_read_unlock(&rsn->rwl);
+}
+
+/**
+ * Start RSN update.
+ */
+static inline struct replay_sqn *
+rsn_update_start(struct rte_ipsec_sa *sa)
+{
+	uint32_t k, n;
+	struct replay_sqn *rsn;
+
+	n = sa->sqn.inb.wridx;
+
+	/* no active writers */
+	RTE_ASSERT(n == sa->sqn.inb.rdidx);
+
+	if (!SQN_ATOMIC(sa))
+		return sa->sqn.inb.rsn[n];
+
+	k = REPLAY_SQN_NEXT(n);
+	sa->sqn.inb.wridx = k;
+
+	rsn = sa->sqn.inb.rsn[k];
+	rte_rwlock_write_lock(&rsn->rwl);
+	rsn_copy(sa, k, n);
+
+	return rsn;
+}
+
+/**
+ * Finish RSN update.
+ */
+static inline void
+rsn_update_finish(struct rte_ipsec_sa *sa, struct replay_sqn *rsn)
+{
+	uint32_t n;
+
+	if (!SQN_ATOMIC(sa))
+		return;
+
+	n = sa->sqn.inb.wridx;
+	RTE_ASSERT(n != sa->sqn.inb.rdidx);
+	RTE_ASSERT(rsn - sa->sqn.inb.rsn == n);
+
+	rte_rwlock_write_unlock(&rsn->rwl);
+	sa->sqn.inb.rdidx = n;
+}
+
+
 #endif /* _IPSEC_SQN_H_ */
diff --git a/lib/librte_ipsec/rte_ipsec_sa.h b/lib/librte_ipsec/rte_ipsec_sa.h
index d99028c2c..7802da3b1 100644
--- a/lib/librte_ipsec/rte_ipsec_sa.h
+++ b/lib/librte_ipsec/rte_ipsec_sa.h
@@ -55,6 +55,27 @@ struct rte_ipsec_sa_prm {
 	uint32_t replay_win_sz;
 };
 
+/**
+ * Indicates that SA will(/will not) need an 'atomic' access
+ * to sequence number and replay window.
+ * 'atomic' here means:
+ * functions:
+ *  - rte_ipsec_pkt_crypto_prepare
+ *  - rte_ipsec_pkt_process
+ * can be safely used in MT environment, as long as the user can guarantee
+ * that they obey multiple readers/single writer model for SQN+replay_window
+ * operations.
+ * To be more specific:
+ * for outbound SA there are no restrictions.
+ * for inbound SA the caller has to guarantee that at any given moment
+ * only one thread is executing rte_ipsec_pkt_process() for given SA.
+ * Note that it is caller responsibility to maintain correct order
+ * of packets to be processed.
+ * In other words - it is a caller responsibility to serialize process()
+ * invocations.
+ */
+#define	RTE_IPSEC_SAFLAG_SQN_ATOM	(1ULL << 0)
+
 /**
  * SA type is an 64-bit value that contain the following information:
  * - IP version (IPv4/IPv6)
@@ -62,6 +83,8 @@ struct rte_ipsec_sa_prm {
  * - inbound/outbound
  * - mode (TRANSPORT/TUNNEL)
  * - for TUNNEL outer IP version (IPv4/IPv6)
+ * - are SA SQN operations 'atomic'
+ * - ESN enabled/disabled
  * ...
  */
 
@@ -70,6 +93,8 @@ enum {
 	RTE_SATP_LOG2_PROTO,
 	RTE_SATP_LOG2_DIR,
 	RTE_SATP_LOG2_MODE,
+	RTE_SATP_LOG2_SQN = RTE_SATP_LOG2_MODE + 2,
+	RTE_SATP_LOG2_ESN,
 	RTE_SATP_LOG2_NUM
 };
 
@@ -90,6 +115,14 @@ enum {
 #define RTE_IPSEC_SATP_MODE_TUNLV4	(1ULL << RTE_SATP_LOG2_MODE)
 #define RTE_IPSEC_SATP_MODE_TUNLV6	(2ULL << RTE_SATP_LOG2_MODE)
 
+#define RTE_IPSEC_SATP_SQN_MASK		(1ULL << RTE_SATP_LOG2_SQN)
+#define RTE_IPSEC_SATP_SQN_RAW		(0ULL << RTE_SATP_LOG2_SQN)
+#define RTE_IPSEC_SATP_SQN_ATOM		(1ULL << RTE_SATP_LOG2_SQN)
+
+#define RTE_IPSEC_SATP_ESN_MASK		(1ULL << RTE_SATP_LOG2_ESN)
+#define RTE_IPSEC_SATP_ESN_DISABLE	(0ULL << RTE_SATP_LOG2_ESN)
+#define RTE_IPSEC_SATP_ESN_ENABLE	(1ULL << RTE_SATP_LOG2_ESN)
+
 /**
  * get type of given SA
  * @return
diff --git a/lib/librte_ipsec/sa.c b/lib/librte_ipsec/sa.c
index d263e7bcf..8d4ce1ac6 100644
--- a/lib/librte_ipsec/sa.c
+++ b/lib/librte_ipsec/sa.c
@@ -80,21 +80,37 @@ rte_ipsec_sa_type(const struct rte_ipsec_sa *sa)
 }
 
 static int32_t
-ipsec_sa_size(uint32_t wsz, uint64_t type, uint32_t *nb_bucket)
+ipsec_sa_size(uint64_t type, uint32_t *wnd_sz, uint32_t *nb_bucket)
 {
-	uint32_t n, sz;
+	uint32_t n, sz, wsz;
 
+	wsz = *wnd_sz;
 	n = 0;
-	if (wsz != 0 && (type & RTE_IPSEC_SATP_DIR_MASK) ==
-			RTE_IPSEC_SATP_DIR_IB)
-		n = replay_num_bucket(wsz);
+
+	if ((type & RTE_IPSEC_SATP_DIR_MASK) == RTE_IPSEC_SATP_DIR_IB) {
+
+		/*
+		 * RFC 4303 recommends 64 as minimum window size.
+		 * there is no point to use ESN mode without SQN window,
+		 * so make sure we have at least 64 window when ESN is enalbed.
+		 */
+		wsz = ((type & RTE_IPSEC_SATP_ESN_MASK) ==
+			RTE_IPSEC_SATP_ESN_DISABLE) ?
+			wsz : RTE_MAX(wsz, (uint32_t)WINDOW_BUCKET_SIZE);
+		if (wsz != 0)
+			n = replay_num_bucket(wsz);
+	}
 
 	if (n > WINDOW_BUCKET_MAX)
 		return -EINVAL;
 
+	*wnd_sz = wsz;
 	*nb_bucket = n;
 
 	sz = rsn_size(n);
+	if ((type & RTE_IPSEC_SATP_SQN_MASK) == RTE_IPSEC_SATP_SQN_ATOM)
+		sz *= REPLAY_SQN_NUM;
+
 	sz += sizeof(struct rte_ipsec_sa);
 	return sz;
 }
@@ -158,6 +174,18 @@ fill_sa_type(const struct rte_ipsec_sa_prm *prm, uint64_t *type)
 	} else
 		return -EINVAL;
 
+	/* check for ESN flag */
+	if (prm->ipsec_xform.options.esn == 0)
+		tp |= RTE_IPSEC_SATP_ESN_DISABLE;
+	else
+		tp |= RTE_IPSEC_SATP_ESN_ENABLE;
+
+	/* interpret flags */
+	if (prm->flags & RTE_IPSEC_SAFLAG_SQN_ATOM)
+		tp |= RTE_IPSEC_SATP_SQN_ATOM;
+	else
+		tp |= RTE_IPSEC_SATP_SQN_RAW;
+
 	*type = tp;
 	return 0;
 }
@@ -191,7 +219,7 @@ esp_inb_tun_init(struct rte_ipsec_sa *sa, const struct rte_ipsec_sa_prm *prm)
 static void
 esp_outb_init(struct rte_ipsec_sa *sa, uint32_t hlen)
 {
-	sa->sqn.outb = 1;
+	sa->sqn.outb.raw = 1;
 
 	/* these params may differ with new algorithms support */
 	sa->ctp.auth.offset = hlen;
@@ -277,11 +305,26 @@ esp_sa_init(struct rte_ipsec_sa *sa, const struct rte_ipsec_sa_prm *prm,
 	return 0;
 }
 
+/*
+ * helper function, init SA replay structure.
+ */
+static void
+fill_sa_replay(struct rte_ipsec_sa *sa, uint32_t wnd_sz, uint32_t nb_bucket)
+{
+	sa->replay.win_sz = wnd_sz;
+	sa->replay.nb_bucket = nb_bucket;
+	sa->replay.bucket_index_mask = nb_bucket - 1;
+	sa->sqn.inb.rsn[0] = (struct replay_sqn *)(sa + 1);
+	if ((sa->type & RTE_IPSEC_SATP_SQN_MASK) == RTE_IPSEC_SATP_SQN_ATOM)
+		sa->sqn.inb.rsn[1] = (struct replay_sqn *)
+			((uintptr_t)sa->sqn.inb.rsn[0] + rsn_size(nb_bucket));
+}
+
 int __rte_experimental
 rte_ipsec_sa_size(const struct rte_ipsec_sa_prm *prm)
 {
 	uint64_t type;
-	uint32_t nb;
+	uint32_t nb, wsz;
 	int32_t rc;
 
 	if (prm == NULL)
@@ -293,7 +336,8 @@ rte_ipsec_sa_size(const struct rte_ipsec_sa_prm *prm)
 		return rc;
 
 	/* determine required size */
-	return ipsec_sa_size(prm->replay_win_sz, type, &nb);
+	wsz = prm->replay_win_sz;
+	return ipsec_sa_size(type, &wsz, &nb);
 }
 
 int __rte_experimental
@@ -301,7 +345,7 @@ rte_ipsec_sa_init(struct rte_ipsec_sa *sa, const struct rte_ipsec_sa_prm *prm,
 	uint32_t size)
 {
 	int32_t rc, sz;
-	uint32_t nb;
+	uint32_t nb, wsz;
 	uint64_t type;
 	struct crypto_xform cxf;
 
@@ -314,7 +358,8 @@ rte_ipsec_sa_init(struct rte_ipsec_sa *sa, const struct rte_ipsec_sa_prm *prm,
 		return rc;
 
 	/* determine required size */
-	sz = ipsec_sa_size(prm->replay_win_sz, type, &nb);
+	wsz = prm->replay_win_sz;
+	sz = ipsec_sa_size(type, &wsz, &nb);
 	if (sz < 0)
 		return sz;
 	else if (size < (uint32_t)sz)
@@ -347,12 +392,8 @@ rte_ipsec_sa_init(struct rte_ipsec_sa *sa, const struct rte_ipsec_sa_prm *prm,
 		rte_ipsec_sa_fini(sa);
 
 	/* fill replay window related fields */
-	if (nb != 0) {
-		sa->replay.win_sz = prm->replay_win_sz;
-		sa->replay.nb_bucket = nb;
-		sa->replay.bucket_index_mask = sa->replay.nb_bucket - 1;
-		sa->sqn.inb = (struct replay_sqn *)(sa + 1);
-	}
+	if (nb != 0)
+		fill_sa_replay(sa, wsz, nb);
 
 	return sz;
 }
@@ -877,7 +918,7 @@ inb_pkt_prepare(const struct rte_ipsec_session *ss, struct rte_mbuf *mb[],
 	struct rte_mbuf *dr[num];
 
 	sa = ss->sa;
-	rsn = sa->sqn.inb;
+	rsn = rsn_acquire(sa);
 
 	k = 0;
 	for (i = 0; i != num; i++) {
@@ -896,6 +937,8 @@ inb_pkt_prepare(const struct rte_ipsec_session *ss, struct rte_mbuf *mb[],
 		}
 	}
 
+	rsn_release(sa, rsn);
+
 	/* update cops */
 	lksd_none_cop_prepare(ss, mb, cop, k);
 
@@ -1058,7 +1101,7 @@ esp_inb_rsn_update(struct rte_ipsec_sa *sa, const uint32_t sqn[],
 	uint32_t i, k;
 	struct replay_sqn *rsn;
 
-	rsn = sa->sqn.inb;
+	rsn = rsn_update_start(sa);
 
 	k = 0;
 	for (i = 0; i != num; i++) {
@@ -1068,6 +1111,7 @@ esp_inb_rsn_update(struct rte_ipsec_sa *sa, const uint32_t sqn[],
 			dr[i - k] = mb[i];
 	}
 
+	rsn_update_finish(sa, rsn);
 	return k;
 }
 
diff --git a/lib/librte_ipsec/sa.h b/lib/librte_ipsec/sa.h
index 616cf1b9f..392e8fd7b 100644
--- a/lib/librte_ipsec/sa.h
+++ b/lib/librte_ipsec/sa.h
@@ -5,6 +5,8 @@
 #ifndef _SA_H_
 #define _SA_H_
 
+#include <rte_rwlock.h>
+
 #define IPSEC_MAX_HDR_SIZE	64
 #define IPSEC_MAX_IV_SIZE	16
 #define IPSEC_MAX_IV_QWORD	(IPSEC_MAX_IV_SIZE / sizeof(uint64_t))
@@ -36,7 +38,11 @@ union sym_op_data {
 	};
 };
 
+#define REPLAY_SQN_NUM		2
+#define REPLAY_SQN_NEXT(n)	((n) ^ 1)
+
 struct replay_sqn {
+	rte_rwlock_t rwl;
 	uint64_t sqn;
 	__extension__ uint64_t window[0];
 };
@@ -74,10 +80,21 @@ struct rte_ipsec_sa {
 
 	/*
 	 * sqn and replay window
+	 * In case of SA handled by multiple threads *sqn* cacheline
+	 * could be shared by multiple cores.
+	 * To minimise perfomance impact, we try to locate in a separate
+	 * place from other frequently accesed data.
 	 */
 	union {
-		uint64_t outb;
-		struct replay_sqn *inb;
+		union {
+			rte_atomic64_t atom;
+			uint64_t raw;
+		} outb;
+		struct {
+			uint32_t rdidx; /* read index */
+			uint32_t wridx; /* write index */
+			struct replay_sqn *rsn[REPLAY_SQN_NUM];
+		} inb;
 	} sqn;
 
 } __rte_cache_aligned;
-- 
2.17.1



More information about the dev mailing list