[dpdk-dev] [PATCH v2 4/4] examples/ipsec-secgw: add scapy based unittests

Marcin Smoczynski marcinx.smoczynski at intel.com
Mon Jun 24 15:40:00 CEST 2019


Add a new unittest-like mechanism which uses scapy to craft custom
packets and a set of assertions to check how the ipsec-secgw example
application processes them. Python3 with the scapy module is
required by pkttest.sh to run the test scripts.

The new mechanism is used to test IPv6 transport mode traffic with
extension headers (trs_ipv6opts.py).

Fix the incomplete test log problem by disabling buffering of the
ipsec-secgw standard output with the stdbuf utility.

Signed-off-by: Marcin Smoczynski <marcinx.smoczynski at intel.com>
---
 examples/ipsec-secgw/test/common_defs.sh      |  58 +-----
 .../ipsec-secgw/test/common_defs_secgw.sh     |  65 +++++++
 examples/ipsec-secgw/test/pkttest.py          | 127 ++++++++++++
 examples/ipsec-secgw/test/pkttest.sh          |  65 +++++++
 examples/ipsec-secgw/test/run_test.sh         | 108 ++++++++---
 examples/ipsec-secgw/test/trs_ipv6opts.py     | 181 ++++++++++++++++++
 6 files changed, 519 insertions(+), 85 deletions(-)
 create mode 100644 examples/ipsec-secgw/test/common_defs_secgw.sh
 create mode 100755 examples/ipsec-secgw/test/pkttest.py
 create mode 100755 examples/ipsec-secgw/test/pkttest.sh
 mode change 100644 => 100755 examples/ipsec-secgw/test/run_test.sh
 create mode 100755 examples/ipsec-secgw/test/trs_ipv6opts.py

diff --git a/examples/ipsec-secgw/test/common_defs.sh b/examples/ipsec-secgw/test/common_defs.sh
index 8dc574b50..63ad5415d 100644
--- a/examples/ipsec-secgw/test/common_defs.sh
+++ b/examples/ipsec-secgw/test/common_defs.sh
@@ -1,22 +1,10 @@
 #! /bin/bash
 
-#check that env vars are properly defined
-
-#check SGW_PATH
-if [[ -z "${SGW_PATH}" || ! -x ${SGW_PATH} ]]; then
-	echo "SGW_PATH is invalid"
-	exit 127
-fi
-
 #check ETH_DEV
 if [[ -z "${ETH_DEV}" ]]; then
 	echo "ETH_DEV is invalid"
 	exit 127
 fi
-
-#setup SGW_LCORE
-SGW_LCORE=${SGW_LCORE:-0}
-
 #check that REMOTE_HOST is reachable
 ssh ${REMOTE_HOST} echo
 st=$?
@@ -47,14 +35,6 @@ LOCAL_IPV6=fd12:3456:789a:0031:0000:0000:0000:0092
 DPDK_PATH=${RTE_SDK:-${PWD}}
 DPDK_BUILD=${RTE_TARGET:-x86_64-native-linux-gcc}
 
-SGW_OUT_FILE=./ipsec-secgw.out1
-
-SGW_CMD_EAL_PRM="--lcores=${SGW_LCORE} -n 4 ${ETH_DEV}"
-SGW_CMD_CFG="(0,0,${SGW_LCORE}),(1,0,${SGW_LCORE})"
-SGW_CMD_PRM="-p 0x3 -u 1 -P --config=\"${SGW_CMD_CFG}\""
-
-SGW_CFG_FILE=$(mktemp)
-
 # configure local host/ifaces
 config_local_iface()
 {
@@ -126,37 +106,7 @@ config6_iface()
 	config6_remote_iface
 }
 
-#start ipsec-secgw
-secgw_start()
-{
-	SGW_EXEC_FILE=$(mktemp)
-	cat <<EOF > ${SGW_EXEC_FILE}
-${SGW_PATH} ${SGW_CMD_EAL_PRM} ${CRYPTO_DEV} \
---vdev="net_tap0,mac=fixed" \
--- ${SGW_CMD_PRM} ${SGW_CMD_XPRM} -f ${SGW_CFG_FILE} > \
-${SGW_OUT_FILE} 2>&1 &
-p=\$!
-echo \$p
-EOF
-
-	cat ${SGW_EXEC_FILE}
-	SGW_PID=`/bin/bash -x ${SGW_EXEC_FILE}`
-
-	# wait till ipsec-secgw start properly
-	i=0
-	st=1
-	while [[ $i -ne 10 && st -ne 0 ]]; do
-		sleep 1
-		ifconfig ${LOCAL_IFACE}
-		st=$?
-		let i++
-	done
-}
-
-#stop ipsec-secgw and cleanup
-secgw_stop()
-{
-	kill ${SGW_PID}
-	rm -f ${SGW_EXEC_FILE}
-	rm -f ${SGW_CFG_FILE}
-}
+# secgw application parameters setup
+SGW_PORT_CFG="--vdev=\"net_tap0,mac=fixed\" ${ETH_DEV}"
+SGW_WAIT_DEV="${LOCAL_IFACE}"
+. ${DIR}/common_defs_secgw.sh
diff --git a/examples/ipsec-secgw/test/common_defs_secgw.sh b/examples/ipsec-secgw/test/common_defs_secgw.sh
new file mode 100644
index 000000000..a50c03cb3
--- /dev/null
+++ b/examples/ipsec-secgw/test/common_defs_secgw.sh
@@ -0,0 +1,65 @@
+#!/bin/bash
+
+# check required parameters
+SGW_REQ_VARS="SGW_PATH SGW_PORT_CFG SGW_WAIT_DEV"
+for reqvar in ${SGW_REQ_VARS}
+do
+	if [[ -z "${!reqvar}" ]]; then
+		echo "Required parameter ${reqvar} is empty"
+		exit 127
+	fi
+done
+
+# check that SGW_PATH points to an executable
+if [[ ! -x ${SGW_PATH} ]]; then
+	echo "${SGW_PATH} is not executable"
+	exit 127
+fi
+
+# setup SGW_LCORE
+SGW_LCORE=${SGW_LCORE:-0}
+
+# setup config and output filenames
+SGW_OUT_FILE=./ipsec-secgw.out1
+SGW_CFG_FILE=$(mktemp)
+
+# setup secgw parameters
+SGW_CMD_EAL_PRM="--lcores=${SGW_LCORE} -n 4"
+SGW_CMD_CFG="(0,0,${SGW_LCORE}),(1,0,${SGW_LCORE})"
+SGW_CMD_PRM="-p 0x3 -u 1 -P --config=\"${SGW_CMD_CFG}\""
+
+# start ipsec-secgw
+secgw_start()
+{
+	SGW_EXEC_FILE=$(mktemp)
+	cat <<EOF > ${SGW_EXEC_FILE}
+stdbuf -o0 ${SGW_PATH} ${SGW_CMD_EAL_PRM} ${CRYPTO_DEV} \
+${SGW_PORT_CFG} ${SGW_EAL_XPRM} \
+-- ${SGW_CMD_PRM} ${SGW_CMD_XPRM} -f ${SGW_CFG_FILE} > \
+${SGW_OUT_FILE} 2>&1 &
+p=\$!
+echo \$p
+EOF
+
+	cat ${SGW_EXEC_FILE}
+	cat ${SGW_CFG_FILE}
+	SGW_PID=`/bin/bash -x ${SGW_EXEC_FILE}`
+
+	# wait until ipsec-secgw has started properly
+	i=0
+	st=1
+	while [[ $i -ne 10 && $st -ne 0 ]]; do
+		sleep 1
+		ifconfig ${SGW_WAIT_DEV}
+		st=$?
+		let i++
+	done
+}
+
+# stop ipsec-secgw and cleanup
+secgw_stop()
+{
+	kill ${SGW_PID}
+	rm -f ${SGW_EXEC_FILE}
+	rm -f ${SGW_CFG_FILE}
+}
diff --git a/examples/ipsec-secgw/test/pkttest.py b/examples/ipsec-secgw/test/pkttest.py
new file mode 100755
index 000000000..bcad2156b
--- /dev/null
+++ b/examples/ipsec-secgw/test/pkttest.py
@@ -0,0 +1,127 @@
+#!/usr/bin/env python3
+
+import fcntl
+import pkg_resources
+import socket
+import struct
+import sys
+import unittest
+
+
+if sys.version_info < (3, 0):
+    print("Python3 is required to run this script")
+    sys.exit(1)
+
+
+try:
+    from scapy.all import Ether
+except ImportError:
+    print("Scapy module is required")
+    sys.exit(1)
+
+
+PKTTEST_REQ = [
+    "scapy==2.4.3rc1",
+]
+
+
+def assert_requirements(req):
+    """
+    assert requirement is met
+    req can hold a string or a list of strings
+    """
+    try:
+        pkg_resources.require(req)
+    except (pkg_resources.DistributionNotFound, pkg_resources.VersionConflict) as e:
+        print("Requirement assertion: " + str(e))
+        sys.exit(1)
+
+
+TAP_UNPROTECTED = "dtap1"
+TAP_PROTECTED = "dtap0"
+
+
+class Interface(object):
+    ETH_P_ALL = 3
+    MAX_PACKET_SIZE = 1280
+    IOCTL_GET_INFO = 0x8927
+    SOCKET_TIMEOUT = 0.5
+    def __init__(self, ifname):
+        self.name = ifname
+
+        # create and bind socket to specified interface
+        self.s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(Interface.ETH_P_ALL))
+        self.s.settimeout(Interface.SOCKET_TIMEOUT)
+        self.s.bind((self.name, 0, socket.PACKET_OTHERHOST))
+
+        # get interface MAC address
+        info = fcntl.ioctl(self.s.fileno(), Interface.IOCTL_GET_INFO,  struct.pack('256s', bytes(ifname[:15], encoding='ascii')))
+        self.mac = ':'.join(['%02x' % i for i in info[18:24]])
+
+    def __del__(self):
+        self.s.close()
+
+    def send_l3packet(self, pkt, mac):
+        e = Ether(src=self.mac, dst=mac)
+        self.send_packet(e/pkt)
+
+    def send_packet(self, pkt):
+        self.send_bytes(bytes(pkt))
+
+    def send_bytes(self, bytedata):
+        self.s.send(bytedata)
+
+    def recv_packet(self):
+        return Ether(self.recv_bytes())
+
+    def recv_bytes(self):
+        return self.s.recv(Interface.MAX_PACKET_SIZE)
+
+    def get_mac(self):
+        return self.mac
+
+
+class PacketXfer(object):
+    def __init__(self, protected_iface=TAP_PROTECTED, unprotected_iface=TAP_UNPROTECTED):
+        self.protected_port = Interface(protected_iface)
+        self.unprotected_port = Interface(unprotected_iface)
+
+    def send_to_protected_port(self, pkt, remote_mac=None):
+        if remote_mac is None:
+            remote_mac = self.unprotected_port.get_mac()
+        self.protected_port.send_l3packet(pkt, remote_mac)
+
+    def send_to_unprotected_port(self, pkt, remote_mac=None):
+        if remote_mac is None:
+            remote_mac = self.protected_port.get_mac()
+        self.unprotected_port.send_l3packet(pkt, remote_mac)
+
+    def xfer_unprotected(self, pkt):
+        self.send_to_unprotected_port(pkt)
+        return self.protected_port.recv_packet()
+
+    def xfer_protected(self, pkt):
+        self.send_to_protected_port(pkt)
+        return self.unprotected_port.recv_packet()
+
+
+def pkttest():
+    if len(sys.argv) == 1:
+        sys.exit(unittest.main(verbosity=2))
+    elif len(sys.argv) == 2:
+        if sys.argv[1] == "config":
+            module = __import__('__main__')
+            try:
+                print(module.config())
+            except AttributeError:
+                sys.stderr.write("Cannot find \"config()\" in a test")
+                sys.exit(1)
+    else:
+        sys.exit(1)
+
+
+if __name__ == "__main__":
+    if len(sys.argv) == 2 and sys.argv[1] == "check_reqs":
+        assert_requirements(PKTTEST_REQ)
+    else:
+        print("Usage: " + sys.argv[0] + " check_reqs")
diff --git a/examples/ipsec-secgw/test/pkttest.sh b/examples/ipsec-secgw/test/pkttest.sh
new file mode 100755
index 000000000..04cd96b2e
--- /dev/null
+++ b/examples/ipsec-secgw/test/pkttest.sh
@@ -0,0 +1,65 @@
+#!/bin/bash
+
+DIR=$(dirname $0)
+
+if [ $(id -u) -ne 0 ]; then
+	echo "Run as root"
+	exit 1
+fi
+
+# check python requirements
+python3 ${DIR}/pkttest.py check_reqs
+if [ $? -ne 0 ]; then
+	echo "Requirements for Python not met, exiting"
+	exit 1
+fi
+
+# secgw application parameters setup
+CRYPTO_DEV="--vdev=crypto_null0"
+SGW_PORT_CFG="--vdev=net_tap0,mac=fixed --vdev=net_tap1,mac=fixed"
+SGW_EAL_XPRM="--no-pci"
+SGW_CMD_XPRM=-l
+SGW_WAIT_DEV="dtap0"
+. ${DIR}/common_defs_secgw.sh
+
+echo "Running tests: $*"
+for testcase in $*
+do
+	# check test file presence
+	testfile="${DIR}/${testcase}.py"
+	if [ ! -f ${testfile} ]; then
+		echo "Invalid test ${testcase}"
+		continue
+	fi
+
+	# prepare test config
+	python3 ${testfile} config > ${SGW_CFG_FILE}
+	if [ $? -ne 0 ]; then
+		rm -f ${SGW_CFG_FILE}
+		echo "Cannot get secgw configuration for test ${testcase}"
+		exit 1
+	fi
+
+	# start the application
+	secgw_start
+
+	# setup interfaces
+	ifconfig dtap0 up
+	ifconfig dtap1 up
+
+	# run the test
+	echo "Running test case: ${testcase}"
+	python3 ${testfile}
+	st=$?
+
+	# stop the application
+	secgw_stop
+
+	# report test result and exit on failure
+	if [ $st -eq 0 ]; then
+		echo "Test case ${testcase} succeeded"
+	else
+		echo "Test case ${testcase} failed!"
+		exit $st
+	fi
+done
diff --git a/examples/ipsec-secgw/test/run_test.sh b/examples/ipsec-secgw/test/run_test.sh
old mode 100644
new mode 100755
index 3a1a7d4b4..4969effdb
--- a/examples/ipsec-secgw/test/run_test.sh
+++ b/examples/ipsec-secgw/test/run_test.sh
@@ -17,6 +17,17 @@
 # naming convention:
 # 'old' means that ipsec-secgw will run in legacy (non-librte_ipsec mode)
 # 'tun/trs' refer to tunnel/transport mode respectively
+
+usage()
+{
+	echo "Usage:"
+	echo -e "\t$0 -[46p]"
+	echo -e "\t\t-4 Perform Linux IPv4 network tests"
+	echo -e "\t\t-6 Perform Linux IPv6 network tests"
+	echo -e "\t\t-p Perform packet validation tests"
+	echo -e "\t\t-h Display this help"
+}
+
 LINUX_TEST="tun_aescbc_sha1 \
 tun_aescbc_sha1_esn \
 tun_aescbc_sha1_esn_atom \
@@ -50,47 +61,82 @@ trs_3descbc_sha1_old \
 trs_3descbc_sha1_esn \
 trs_3descbc_sha1_esn_atom"
 
-DIR=`dirname $0`
+PKT_TESTS="trs_ipv6opts"
+
+DIR=$(dirname $0)
 
 # get input options
-st=0
 run4=0
 run6=0
-while [[ ${st} -eq 0 ]]; do
-	getopts ":46" opt
-	st=$?
-	if [[ "${opt}" == "4" ]]; then
-		run4=1
-	elif [[ "${opt}" == "6" ]]; then
-		run6=1
-	fi
+runpkt=0
+while getopts ":46ph" opt
+do
+	case $opt in
+		4)
+			run4=1
+			;;
+		6)
+			run6=1
+			;;
+		p)
+			runpkt=1
+			;;
+		h)
+			usage
+			exit 0
+			;;
+		?)
+			echo "Invalid option"
+			usage
+			exit 127
+			;;
+	esac
 done
 
-if [[ ${run4} -eq 0 && ${run6} -eq 0 ]]; then
+# no test suite has been selected
+if [[ ${run4} -eq 0 && ${run6} -eq 0 && ${runpkt} -eq 0 ]]; then
+	usage
 	exit 127
 fi
 
-for i in ${LINUX_TEST}; do
-
-	echo "starting test ${i}"
+# perform packet processing validation tests
+st=0
+if [ $runpkt -eq 1 ]; then
+	echo "Performing packet validation tests"
+	/bin/bash ${DIR}/pkttest.sh ${PKT_TESTS}
+	st=$?
 
-	st4=0
-	if [[ ${run4} -ne 0 ]]; then
-		/bin/bash ${DIR}/linux_test4.sh ${i}
-		st4=$?
-		echo "test4 ${i} finished with status ${st4}"
+	echo "pkttests finished with status ${st}"
+	if [[ ${st} -ne 0 ]]; then
+		echo "ERROR pkttests FAILED"
+		exit ${st}
 	fi
+fi
 
-	st6=0
-	if [[ ${run6} -ne 0 ]]; then
-		/bin/bash ${DIR}/linux_test6.sh ${i}
-		st6=$?
-		echo "test6 ${i} finished with status ${st6}"
-	fi
+# perform network tests
+if [[ ${run4} -eq 1 || ${run6} -eq 1 ]]; then
+	for i in ${LINUX_TEST}; do
 
-	let "st = st4 + st6"
-	if [[ $st -ne 0 ]]; then
-		echo "ERROR test ${i} FAILED"
-		exit $st
-	fi
-done
+		echo "starting test ${i}"
+
+		st4=0
+		if [[ ${run4} -ne 0 ]]; then
+			/bin/bash ${DIR}/linux_test4.sh ${i}
+			st4=$?
+			echo "test4 ${i} finished with status ${st4}"
+		fi
+
+		st6=0
+		if [[ ${run6} -ne 0 ]]; then
+			/bin/bash ${DIR}/linux_test6.sh ${i}
+			st6=$?
+			echo "test6 ${i} finished with status ${st6}"
+		fi
+
+		let "st = st4 + st6"
+		if [[ $st -ne 0 ]]; then
+			echo "ERROR test ${i} FAILED"
+			exit $st
+		fi
+	done
+fi
diff --git a/examples/ipsec-secgw/test/trs_ipv6opts.py b/examples/ipsec-secgw/test/trs_ipv6opts.py
new file mode 100755
index 000000000..167c89617
--- /dev/null
+++ b/examples/ipsec-secgw/test/trs_ipv6opts.py
@@ -0,0 +1,181 @@
+#!/usr/bin/env python3
+
+from scapy.all import *
+import unittest
+import pkttest
+
+
+SRC_ADDR  = "1111:0000:0000:0000:0000:0000:0000:0001"
+DST_ADDR  = "2222:0000:0000:0000:0000:0000:0000:0001"
+SRC_NET   = "1111:0000:0000:0000:0000:0000:0000:0000/64"
+DST_NET   = "2222:0000:0000:0000:0000:0000:0000:0000/64"
+
+
+def config():
+    return """
+sp ipv6 out esp protect 5 pri 1 \\
+src {0} \\
+dst {1} \\
+sport 0:65535 dport 0:65535
+
+sp ipv6 in esp protect 6 pri 1 \\
+src {1} \\
+dst {0} \\
+sport 0:65535 dport 0:65535
+
+sa out 5 cipher_algo null auth_algo null mode transport
+sa in 6 cipher_algo null auth_algo null mode transport
+
+rt ipv6 dst {0} port 1
+rt ipv6 dst {1} port 0
+""".format(SRC_NET, DST_NET)
+
+
+class TestTransportWithIPv6Ext(unittest.TestCase):
+    # There is a bug in the IPsec Scapy implementation
+    # which causes invalid packet reconstruction after
+    # successful decryption. This method is a workaround.
+    @staticmethod
+    def decrypt(pkt, sa):
+        esp = pkt[ESP]
+
+        # decrypt dummy packet with no extensions
+        d = sa.decrypt(IPv6()/esp)
+
+        # fix 'next header' in the preceding header of the original
+        # packet and remove ESP
+        pkt[ESP].underlayer.nh = d[IPv6].nh
+        pkt[ESP].underlayer.remove_payload()
+
+        # combine L3 header with decrypted payload
+        npkt = pkt/d[IPv6].payload
+
+        # fix length
+        npkt[IPv6].plen = d[IPv6].plen + len(pkt[IPv6].payload)
+
+        return npkt
+
+    def setUp(self):
+        self.px = pkttest.PacketXfer()
+        self.outb_sa = SecurityAssociation(ESP, spi=5)
+        self.inb_sa = SecurityAssociation(ESP, spi=6)
+
+    def test_outb_ipv6_noopt(self):
+        pkt = IPv6(src=SRC_ADDR, dst=DST_ADDR)
+        pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
+
+        # send and check response
+        resp = self.px.xfer_unprotected(pkt)
+        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
+        self.assertEqual(resp[ESP].spi, 5)
+
+        # decrypt response, check packet after decryption
+        d = TestTransportWithIPv6Ext.decrypt(resp[IPv6], self.outb_sa)
+        self.assertEqual(d[IPv6].nh, socket.IPPROTO_UDP)
+        self.assertEqual(d[UDP].sport, 123)
+        self.assertEqual(d[UDP].dport, 456)
+        self.assertEqual(bytes(d[UDP].payload), b'abc')
+
+    def test_outb_ipv6_opt(self):
+        hoptions = []
+        hoptions.append(RouterAlert(value=2))
+        hoptions.append(Jumbo(jumboplen=5000))
+        hoptions.append(Pad1())
+
+        doptions = []
+        doptions.append(HAO(hoa="1234::4321"))
+
+        pkt = IPv6(src=SRC_ADDR, dst=DST_ADDR)
+        pkt /= IPv6ExtHdrHopByHop(options=hoptions)
+        pkt /= IPv6ExtHdrRouting(addresses=["3333::3","4444::4"])
+        pkt /= IPv6ExtHdrDestOpt(options=doptions)
+        pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
+
+        # send and check response
+        resp = self.px.xfer_unprotected(pkt)
+        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS)
+
+        # check extensions
+        self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
+        self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
+        self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_ESP)
+
+        # check ESP
+        self.assertEqual(resp[ESP].spi, 5)
+
+        # decrypt response, check packet after decryption
+        d = TestTransportWithIPv6Ext.decrypt(resp[IPv6], self.outb_sa)
+        self.assertEqual(d[IPv6].nh, socket.IPPROTO_HOPOPTS)
+        self.assertEqual(d[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
+        self.assertEqual(d[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
+        self.assertEqual(d[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP)
+
+        # check UDP
+        self.assertEqual(d[UDP].sport, 123)
+        self.assertEqual(d[UDP].dport, 456)
+        self.assertEqual(bytes(d[UDP].payload), b'abc')
+
+    def test_inb_ipv6_noopt(self):
+        # encrypt and send raw UDP packet
+        pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR)
+        pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
+        e = self.inb_sa.encrypt(pkt)
+
+        # send and check response
+        resp = self.px.xfer_protected(e)
+        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
+
+        # check UDP packet
+        self.assertEqual(resp[UDP].sport, 123)
+        self.assertEqual(resp[UDP].dport, 456)
+        self.assertEqual(bytes(resp[UDP].payload), b'abc')
+
+    def test_inb_ipv6_opt(self):
+        hoptions = []
+        hoptions.append(RouterAlert(value=2))
+        hoptions.append(Jumbo(jumboplen=5000))
+        hoptions.append(Pad1())
+
+        doptions = []
+        doptions.append(HAO(hoa="1234::4321"))
+
+        # prepare packet with options
+        pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR)
+        pkt /= IPv6ExtHdrHopByHop(options=hoptions)
+        pkt /= IPv6ExtHdrRouting(addresses=["3333::3","4444::4"])
+        pkt /= IPv6ExtHdrDestOpt(options=doptions)
+        pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
+        e = self.inb_sa.encrypt(pkt)
+
+        # send encrypted packet and check response
+        resp = self.px.xfer_protected(e)
+        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS)
+        self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
+        self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
+        self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP)
+
+        # check UDP
+        self.assertEqual(resp[UDP].sport, 123)
+        self.assertEqual(resp[UDP].dport, 456)
+        self.assertEqual(bytes(resp[UDP].payload), b'abc')
+
+    def test_inb_ipv6_frag(self):
+        # prepare ESP payload
+        pkt = IPv6()/UDP(sport=123,dport=456)/Raw(load="abc")
+        e = self.inb_sa.encrypt(pkt)
+
+        # craft and send inbound packet
+        e = IPv6(src=DST_ADDR, dst=SRC_ADDR)/IPv6ExtHdrFragment()/e[IPv6].payload
+        resp = self.px.xfer_protected(e)
+
+        # check response
+        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_FRAGMENT)
+        self.assertEqual(resp[IPv6ExtHdrFragment].nh, socket.IPPROTO_UDP)
+
+        # check UDP
+        self.assertEqual(resp[UDP].sport, 123)
+        self.assertEqual(resp[UDP].dport, 456)
+        self.assertEqual(bytes(resp[UDP].payload), b'abc')
+
+
+pkttest.pkttest()
-- 
2.17.1



More information about the dev mailing list