[dts] [PATCH V1 5/6] tests/ipsec_gw_cryptodev_func: rework cryptodev ipsec test script

xinfengx xinfengx.zhao at intel.com
Thu Jun 4 02:54:44 CEST 2020


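Rework the cryptodev IPsec gateway test script and remove the redundant code.

Each test case now only calls _execute_ipsec_gw_test(), which performs the
skip check, builds the EAL and ipsec-gw option strings, and verifies the
result itself. The crypto devices are selected by the new
_get_crypto_device() helper, the capture file is named after the running
case, and the unused hmac/hashlib/cryptography imports are dropped.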

Signed-off-by: xinfengx <xinfengx.zhao at intel.com>
---
 tests/TestSuite_ipsec_gw_cryptodev_func.py | 624 ++++-----------------
 1 file changed, 97 insertions(+), 527 deletions(-)

diff --git a/tests/TestSuite_ipsec_gw_cryptodev_func.py b/tests/TestSuite_ipsec_gw_cryptodev_func.py
index c6bf6a9..de123dc 100644
--- a/tests/TestSuite_ipsec_gw_cryptodev_func.py
+++ b/tests/TestSuite_ipsec_gw_cryptodev_func.py
@@ -29,36 +29,29 @@
 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
-import hmac
-import hashlib
 import binascii
 import time
 import utils
 from test_case import TestCase
 import packet
 
-from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
-from cryptography.hazmat.primitives.ciphers.aead import AESCCM, AESGCM
-from cryptography.hazmat.backends import default_backend
-
 import cryptodev_common as cc
 
 class TestIPsecGW(TestCase):
 
     def set_up_all(self):
-
-        self.core_config = "1S/2C/1T"
-        self.number_of_ports = 1
+        self.core_config = "1S/3C/1T"
+        self.number_of_ports = 2
         self.dut_ports = self.dut.get_ports(self.nic)
         self.verify(len(self.dut_ports) >= self.number_of_ports,
                     "Not enough ports for " + self.nic)
         self.ports_socket = self.dut.get_numa_id(self.dut_ports[0])
+        self.core_list = self.dut.get_core_list(self.core_config, socket=self.ports_socket)
 
         self.logger.info("core config = " + self.core_config)
         self.logger.info("number of ports = " + str(self.number_of_ports))
         self.logger.info("dut ports = " + str(self.dut_ports))
         self.logger.info("ports_socket = " + str(self.ports_socket))
-
         # Generally, testbed should has 4 ports NIC, like,
         # 03:00.0 03:00.1 03:00.2 03:00.3
         # This test case will
@@ -83,22 +76,18 @@ class TestIPsecGW(TestCase):
         self.verify("Error"not in out,"Compilation error")
         self.verify("No such"not in out,"Compilation error")
 
-        self.vf_driver = self.get_suite_cfg()['vf_driver']
-        cc.bind_qat_device(self, self.vf_driver)
+        cc.bind_qat_device(self)
 
         self._default_ipsec_gw_opts = {
-            "config": None,
+            "config": '"(0,0,%s),(1,0,%s)"' % tuple(self.core_list[-2:]),
             "P": "",
             "p": "0x3",
-            "f": "conf/ipsec_ep0.cfg",
+            "f": "/tmp/ipsec_ep0.cfg",
             "u": "0x1"
         }
 
-        self._pcap_idx = 0
-        self.pcap_filename = ''
-
         conf_file = r'conf/ipsec_ep0.cfg'
-        self.dut.session.copy_file_to(conf_file)
+        self.dut.session.copy_file_to(conf_file, '/tmp')
 
     def set_up(self):
         pass
@@ -110,604 +99,181 @@ class TestIPsecGW(TestCase):
         cc.clear_dpdk_config(self)
 
     def test_qat_aes_128_cbc_ipv4_tunnel(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test qat_aes_128_cbc_ipv4_tunnel")
-        self.pcap_filename = "test_qat_aes_128_cbc_ipv4_tunnel"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_qat_aes_256_cbc_ipv4_tunnel(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test qat_aes_256_cbc_ipv4_tunnel")
-        self.pcap_filename = "test_qat_aes_256_cbc_ipv4_tunnel"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_qat_aes_gcm_ipv4_tunnel(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test qat_aes_gcm_ipv4_tunnel")
-        self.pcap_filename = "test_qat_aes_gcm_ipv4_tunnel"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_qat_aes_128_ctr_ipv4_tunnel(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test qat_aes_128_ctr_ipv4_tunnel")
-        self.pcap_filename = "test_qat_aes_128_ctr_ipv4_tunnel"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_qat_aes_128_ctr_ipv6_tunnel(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test qat_aes_128_ctr_ipv6_tunnel")
-        self.pcap_filename = "test_qat_aes_128_ctr_ipv6_tunnel"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_qat_aes_128_ctr_ipv4_transport(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test qat_aes_128_ctr_ipv4_transport")
-        self.pcap_filename = "test_qat_aes_128_ctr_ipv4_transport"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_qat_aes_128_ctr_ipv6_transport(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test qat_aes_128_ctr_ipv6_transport")
-        self.pcap_filename = "test_qat_aes_128_ctr_ipv6_transport"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_qat_null_ipv4_tunnel(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test qat_null_ipv4_tunnel")
-        self.pcap_filename = "test_qat_null_ipv4_tunnel"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_qat_aes_128_cbc_ipv4_transport(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test qat_aes_128_cbc_ipv4_transport")
-        self.pcap_filename = "test_qat_aes_128_cbc_ipv4_transport"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_qat_aes_256_cbc_ipv4_transport(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test qat_aes_256_cbc_ipv4_transport")
-        self.pcap_filename = "test_qat_aes_256_cbc_ipv4_transport"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_qat_aes_gcm_ipv4_transport(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test qat_aes_gcm_ipv4_transport")
-        self.pcap_filename = "test_qat_aes_gcm_ipv4_transport"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_qat_null_ipv4_transport(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test qat_null_ipv4_transport")
-        self.pcap_filename = "test_qat_null_ipv4_transport"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-        self.verify(result, "FAIL")
-
+        self._execute_ipsec_gw_test()
 
     def test_qat_aes_128_cbc_ipv6_tunnel(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test qat_aes_128_cbc_ipv6_tunnel")
-        self.pcap_filename = "test_qat_aes_128_cbc_ipv6_tunnel"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_qat_aes_256_cbc_ipv6_tunnel(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test qat_aes_256_cbc_ipv6_tunnel")
-        self.pcap_filename = "test_qat_aes_256_cbc_ipv6_tunnel"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_qat_aes_gcm_ipv6_tunnel(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test qat_aes_gcm_ipv6_tunnel")
-        self.pcap_filename = "test_qat_aes_gcm_ipv6_tunnel"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_qat_null_ipv6_tunnel(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test qat_null_ipv6_tunnel")
-        self.pcap_filename = "test_qat_null_ipv6_tunnel"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_qat_aes_128_cbc_ipv6_transport(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test qat_aes_128_cbc_ipv6_transport")
-        self.pcap_filename = "test_qat_aes_128_cbc_ipv6_transport"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_qat_aes_256_cbc_ipv6_transport(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test qat_aes_256_cbc_ipv6_transport")
-        self.pcap_filename = "test_qat_aes_256_cbc_ipv6_transport"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_qat_aes_gcm_ipv6_transport(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test qat_aes_gcm_ipv6_transport")
-        self.pcap_filename = "test_qat_aes_gcm_ipv6_transport"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_qat_null_ipv6_transport(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test qat_null_ipv6_transport")
-        self.pcap_filename = "test_qat_null_ipv6_transport"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-        self.verify(result, "FAIL")
-
+        self._execute_ipsec_gw_test()
 
     def test_qat_3des_cbc_ipv4_tunnel(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test qat_3des_cbc_ipv4_tunnel")
-        self.pcap_filename = "test_qat_3des_cbc_ipv4_tunnel"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_qat_3des_cbc_ipv6_tunnel(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test qat_3des_cbc_ipv6_tunnel")
-        self.pcap_filename = "test_qat_3des_cbc_ipv6_tunnel"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_qat_3des_cbc_ipv4_transport(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test qat_3des_cbc_ipv4_transport")
-        self.pcap_filename = "test_qat_3des_cbc_ipv4_transport"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_qat_3des_cbc_ipv6_transport(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test qat_3des_cbc_ipv6_transport")
-        self.pcap_filename = "test_qat_3des_cbc_ipv6_transport"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_sw_aes_128_cbc_ipv4_tunnel(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test sw_aes_128_cbc_ipv4_tunnel")
-        self.pcap_filename = "test_sw_aes_128_cbc_ipv4_tunnel"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_sw_aes_256_cbc_ipv4_tunnel(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test sw_aes_256_cbc_ipv4_tunnel")
-        self.pcap_filename = "test_sw_aes_256_cbc_ipv4_tunnel"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_sw_aes_gcm_ipv4_tunnel(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test sw_aes_gcm_ipv4_tunnel")
-        self.pcap_filename = "test_sw_aes_gcm_ipv4_tunnel"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_sw_null_ipv4_tunnel(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test sw_null_ipv4_tunnel")
-        self.pcap_filename = "test_sw_null_ipv4_tunnel"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_sw_aes_128_cbc_ipv4_transport(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test sw_aes_128_cbc_ipv4_transport")
-        self.pcap_filename = "test_sw_aes_128_cbc_ipv4_transport"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_sw_aes_256_cbc_ipv4_transport(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test sw_aes_256_cbc_ipv4_transport")
-        self.pcap_filename = "test_sw_aes_256_cbc_ipv4_transport"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_sw_aes_gcm_ipv4_transport(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test sw_aes_gcm_ipv4_transport")
-        self.pcap_filename = "test_sw_aes_gcm_ipv4_transport"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_sw_null_ipv4_transport(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test sw_null_ipv4_transport")
-        self.pcap_filename = "test_sw_null_ipv4_transport"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_sw_aes_128_cbc_ipv6_tunnel(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test sw_aes_128_cbc_ipv6_tunnel")
-        self.pcap_filename = "test_sw_aes_128_cbc_ipv6_tunnel"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_sw_aes_256_cbc_ipv6_tunnel(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test sw_aes_256_cbc_ipv6_tunnel")
-        self.pcap_filename = "test_sw_aes_256_cbc_ipv6_tunnel"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_sw_aes_gcm_ipv6_tunnel(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test sw_aes_gcm_ipv6_tunnel")
-        self.pcap_filename = "test_sw_aes_gcm_ipv6_tunnel"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_sw_null_ipv6_tunnel(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test sw_null_ipv6_tunnel")
-        self.pcap_filename = "test_sw_null_ipv6_tunnel"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_sw_aes_128_cbc_ipv6_transport(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test sw_aes_128_cbc_ipv6_transport")
-        self.pcap_filename = "test_sw_aes_128_cbc_ipv6_transport"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_sw_aes_256_cbc_ipv6_transport(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test sw_aes_256_cbc_ipv6_transport")
-        self.pcap_filename = "test_sw_aes_256_cbc_ipv6_transport"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_sw_aes_gcm_ipv6_transport(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test sw_aes_gcm_ipv6_transport")
-        self.pcap_filename = "test_sw_aes_gcm_ipv6_transport"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_sw_null_ipv6_transport(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test sw_null_ipv6_transport")
-        self.pcap_filename = "test_sw_null_ipv6_transport"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_sw_aes_128_ctr_ipv4_tunnel(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test sw_aes_128_ctr_ipv4_tunnel")
-        self.pcap_filename = "test_sw_aes_128_ctr_ipv4_tunnel"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_sw_aes_128_ctr_ipv6_tunnel(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test sw_aes_128_ctr_ipv6_tunnel")
-        self.pcap_filename = "test_sw_aes_128_ctr_ipv6_tunnel"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_sw_aes_128_ctr_ipv4_transport(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test sw_aes_128_ctr_ipv4_transport")
-        self.pcap_filename = "test_sw_aes_128_ctr_ipv4_transport"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_sw_aes_128_ctr_ipv6_transport(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test sw_aes_128_ctr_ipv6_transport")
-        self.pcap_filename = "test_sw_aes_128_ctr_ipv6_transport"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_sw_3des_cbc_ipv4_tunnel(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test sw_3des_cbc_ipv4_tunnel")
-        self.pcap_filename = "test_sw_3des_cbc_ipv4_tunnel"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_sw_3des_cbc_ipv6_tunnel(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test sw_3des_cbc_ipv6_tunnel")
-        self.pcap_filename = "test_sw_3des_cbc_ipv6_tunnel"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_sw_3des_cbc_ipv4_transport(self):
-        if cc.is_test_skip(self):
-            return
-
-        self.logger.info("Test sw_3des_cbc_ipv4_transport")
-        self.pcap_filename = "test_sw_3des_cbc_ipv4_transport"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
-
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-        self.verify(result, "FAIL")
+        self._execute_ipsec_gw_test()
 
     def test_sw_3des_cbc_ipv6_transport(self):
-        if cc.is_test_skip(self):
-            return
+        self._execute_ipsec_gw_test()
+
+    def _get_crypto_device(self, num):
+        device = {}
+        if self.get_case_cfg()["devtype"] == "crypto_aesni_mb":
+            dev = "crypto_aesni_mb"
+        elif self.get_case_cfg()["devtype"] == "crypto_qat":
+            w = cc.get_qat_devices(self, cpm_num=1, num=num)
+            device["w"] = ' -w '.join(w)
+            device["vdev"] = None
+        elif self.get_case_cfg()["devtype"] == "crypto_openssl":
+            dev = "crypto_openssl"
+        elif self.get_case_cfg()["devtype"] == "crypto_aesni_gcm":
+            dev = "crypto_aesni_gcm"
+        elif self.get_case_cfg()["devtype"] == "crypto_kasumi":
+            dev = "crypto_kasumi"
+        elif self.get_case_cfg()["devtype"] == "crypto_snow3g":
+            dev = "crypto_snow3g"
+        elif self.get_case_cfg()["devtype"] == "crypto_zuc":
+            dev = "crypto_zuc"
+        elif self.get_case_cfg()["devtype"] == "crypto_null":
+            dev = "crypto_null"
+        else:
+            return {}
 
-        self.logger.info("Test sw_3des_cbc_ipv6_transport")
-        self.pcap_filename = "test_sw_3des_cbc_ipv6_transport"
-        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
-        self.logger.debug(ipsec_gw_opt_str)
+        if not device:
+            vdev_list = []
+            for i in range(num):
+                vdev = "{}{}".format(dev, i)
+                vdev_list.append(vdev)
+            device["w"] = "0000:00:00.0"
+            device["vdev"] = ' --vdev '.join(vdev_list)
 
-        result = self._execute_ipsec_gw_test(ipsec_gw_opt_str)
-        self.verify(result, "FAIL")
+        return device
 
     def _get_ipsec_gw_opt_str(self, override_ipsec_gw_opts={}):
         if "librte_ipsec" in list(self.get_suite_cfg().keys()) and self.get_suite_cfg()["librte_ipsec"]:
@@ -715,12 +281,18 @@ class TestIPsecGW(TestCase):
         return cc.get_opt_str(self, self._default_ipsec_gw_opts,
                               override_ipsec_gw_opts)
 
-    def _execute_ipsec_gw_test(self, ipsec_gw_opt_str):
+    def _execute_ipsec_gw_test(self):
+        if cc.is_test_skip(self):
+            return
+
         result = True
-        eal_opt_str = cc.get_eal_opt_str(self, add_port=True)
+        opts = {'l': ','.join(self.core_list)}
+        devices = self._get_crypto_device(self.number_of_ports)
+        opts.update(devices)
+        eal_opt_str = cc.get_eal_opt_str(self, opts, add_port=True)
+        ipsec_gw_opt_str = self._get_ipsec_gw_opt_str()
 
         cmd_str = cc.get_dpdk_app_cmd_str(self._app_path, eal_opt_str, ipsec_gw_opt_str)
-        self.logger.info("IPsec-gw cmd: " + cmd_str)
         self.dut.send_expect(cmd_str, "IPSEC:", 30)
         time.sleep(3)
         inst = self.tester.tcpdump_sniff_packets(self.rx_interface)
@@ -750,10 +322,9 @@ class TestIPsecGW(TestCase):
 
         pkt_rec = self.tester.load_tcpdump_sniff_packets(inst)
 
-        pcap_filename = "{0}.pcap".format(self.pcap_filename)
+        pcap_filename = "{0}.pcap".format(self.running_case)
         self.logger.info("Save pkts to {0}".format(packet.TMP_PATH + pcap_filename))
         pkt_rec.save_pcapfile(self.tester, pcap_filename)
-        self._pcap_idx = self._pcap_idx + 1
 
         if len(pkt_rec) == 0:
             self.logger.error("IPsec forwarding failed")
@@ -792,5 +363,4 @@ class TestIPsecGW(TestCase):
                 result = False
                 break
 
-        self.dut.kill_all()
-        return result
+        self.verify(result, "FAILED")
-- 
2.17.1



More information about the dts mailing list