[dts] [PATCH v2] Add auto test cases generating function. That covers different combination of keys, iv, packet length, cipher algorithms and hash algorithms

Liu, Yong yong.liu at intel.com
Mon Jan 16 06:03:56 CET 2017


Thanks, Zhaoyan. Some comments below.

> -----Original Message-----
> From: dts [mailto:dts-bounces at dpdk.org] On Behalf Of Chen, Zhaoyan
> Sent: Tuesday, January 10, 2017 4:23 PM
> To: dts at dpdk.org
> Cc: Chen, Zhaoyan <zhaoyan.chen at intel.com>
> Subject: [dts] [PATCH v2] Add auto test cases generating function. That covers
> different combination of keys, iv, packet length, cipher algorithms and hash
> algorithms
> 
> From: Zhaoyan Chen<zhaoyan.chen at intel.com>
> 
> ---
>  tests/TestSuite_l2fwd_crypto.py | 2007
> ++++++++++++++++++++++++++++++++-------
>  1 file changed, 1687 insertions(+), 320 deletions(-)
> 
> diff --git a/tests/TestSuite_l2fwd_crypto.py b/tests/TestSuite_l2fwd_crypto.py
> index 9c78a6f..74728ea 100644
> --- a/tests/TestSuite_l2fwd_crypto.py
> +++ b/tests/TestSuite_l2fwd_crypto.py
> @@ -32,12 +32,12 @@
>  import hmac
>  import hashlib
>  import binascii
> -
> -import utils
>  import time
> -
> +import os
> +import sys
> +import dts

Please remove the dts import; test suites should no longer depend on this module.

>  from test_case import TestCase
> -
> +from CryptoMobile.CM import *

Please import the required functions explicitly instead of using a wildcard import.

> 
>  class TestL2fwdCrypto(TestCase):
> 
> @@ -55,10 +55,10 @@ class TestL2fwdCrypto(TestCase):
>          self.logger.info("dut ports = " + str(self.dut_ports))
>          self.logger.info("ports_socket = " + str(self.ports_socket))
> 
> -        self.core_mask = utils.create_mask(self.dut.get_core_list(
> +        self.core_mask = dts.create_mask(self.dut.get_core_list(
>                                           self.core_config,


Currently we use utils.create_mask to generate the core mask.

> self.tester.scapy_append('sendp([Ether(src="52:00:00:00:00:00")/IP(src="192.16
> 8.1.1",dst="192.168.1.2")/Raw(load=\"%s\")], iface="%s", count=%s)' %
> (payload, self.tx_interface, PACKET_COUNT))

I suggest using the packet module to transmit and capture packets.
You can refer to the userspace_ethtool suite for an example.

> +        self.logger.info("Test qat_h_AES_XCBC_MAC_01")
> +        if not self.__execute_l2fwd_crypto_test(
> +                test_vectors, "qat_h_AES_XCBC_MAC_01"):
> +            result = False
> 
> -        self.tester.scapy_execute()
> +        self.verify(result, True)
> 
> -        time.sleep(5)
> +    def test_null_NULL(self):
> 
> -        self.tester.send_expect("killall tcpdump", "#")
> -        self.tester.send_expect("^C", "#")
> +        result = True
> 
> -        # Wait 5 secs for tcpdump exit
> -        time.sleep(5)
> +        self.logger.info("Test null_c_NULL_01")
> +        if not self.__execute_l2fwd_crypto_test(
> +                test_vectors, "null_c_NULL_01"):
> +            result = False
> 
> -        self.tester.send_expect("scapy", ">>>")
> -        self.tester.send_expect("p=rdpcap('%s.pcap', count=%s)" %
> (self.rx_interface, PACKET_COUNT), ">>>")
> +        self.verify(result, True)
> +
> +    def test_calculatr_case_number(self):
> +
Typo here: "calculatr" should be "calculate".

> +        self.__calculate_totall_cases_numb()
> +
> +    def __calculate_totall_cases_numb(self):

Not sure why this case was added. Does it only dump the total number of cases?

> +
> +    def __execute_l2fwd_crypto_test(self, test_vectors, test_vector_name):
> 
> -        hex_list = []
> -        for i in range(PACKET_COUNT):
> -            cmd = "linehexdump(p[%s],onlyhex=1)" % i
> -            hex_list.append(self.tester.send_expect(cmd, ">>>"))
> +        if test_vector_name not in test_vectors:
> +            self.logger.warn("SKIP : " + test_vector_name)
> +            return True
> 
> -        # Exit the scapy
> -        self.tester.send_expect("exit()", "#", 60)
> +        test_vector = test_vectors[test_vector_name]
> 
> -        for hex_str in hex_list:
> -            packet_hex = hex_str.split(" ")
> -            # self.logger.info(hex_str)
> -            # self.logger.info(packet_hex)
> +        test_vector_list = self.__test_vector_to_vector_list(test_vector,
> +                core_mask=self.core_mask,
> +                port_mask=self.port_mask)
> 
> -            cipher_offset = 34
> -            cipher_length = len(test_vector["output_cipher"])/2
> -            if cipher_length == 0:
> -                cipher_length = len(test_vector["input"])/2
> -            cipher_text =
> "".join(packet_hex[cipher_offset:cipher_offset+cipher_length])
> -            # self.logger.info("Cipher text in packet = " + cipher_text)
> -            # self.logger.info("Ref Cipher text       = " + test_vector["output_cipher"])
> -            if str.lower(cipher_text) == str.lower(test_vector["output_cipher"]):
> -                self.logger.info("Cipher Matched.")
> -            else:
> -                if test_vector["output_cipher"] != "":
> -                    result = False
> -                    self.logger.info("Cipher NOT Matched.")
> -                    self.logger.info("Cipher text in packet = " + cipher_text)
> -                    self.logger.info("Ref Cipher text       = " +
> test_vector["output_cipher"])
> +        result = True
> +        self.logger.info("Total Generated {0} Tests".format(len(test_vector_list)))
> +        for test_vector in test_vector_list:
> +            self.logger.debug(test_vector)
> +            cmd_str = self.__test_vector_to_cmd(test_vector,
> +                                                core_mask=self.core_mask,
> +                                                port_mask=self.port_mask)
> +            self.dut.send_expect(cmd_str, "==", 30)
> +
> +            self.tester.send_expect("rm -rf %s.pcap" % (self.rx_interface), "#")
> +            self.tester.send_expect("tcpdump -P in -w %s.pcap -i %s &" %
> (self.rx_interface, self.rx_interface), "#")
> +            # Wait 5 sec for tcpdump stable
> +            time.sleep(5)
> +
> +            payload = self.__format_hex_to_param(test_vector["input"], "\\x",
> "\\x")
> +
> +            PACKET_COUNT = 65
> +
> +            self.tester.scapy_foreground()
> +
> self.tester.scapy_append('sendp([Ether(src="52:00:00:00:00:00")/IP(src="192.16
> 8.1.1",dst="192.168.1.2")/Raw(load=\"%s\")], iface="%s", count=%s)' %
> (payload, self.tx_interface, PACKET_COUNT))
> +
> +            self.tester.scapy_execute()
> +
> +            time.sleep(5)
> +
> +            self.tester.send_expect("killall tcpdump", "#")
> +            self.tester.send_expect("^C", "#")
> +
> +            # Wait 5 secs for tcpdump exit
> +            time.sleep(5)
> +
> +            self.tester.send_expect("scapy", ">>>")
> +            self.tester.send_expect("p=rdpcap('%s.pcap', count=%s)" %
> (self.rx_interface, PACKET_COUNT), ">>>")
> +
> +            hex_list = []
> +            for i in range(PACKET_COUNT):
> +                cmd = "linehexdump(p[%s],onlyhex=1)" % i
> +                hex_list.append(self.tester.send_expect(cmd, ">>>"))
> +

Please try strip_pktload in the packet module; we want suites to use the unified packet module.


More information about the dts mailing list