[dts] [PATCH 2/3] Add test suite "l3fwdacl". There are five cases contained in this suite: l3fwdacl_acl_rule, l3fwdacl_exact_route, l3fwdacl_invalid, l3fwdacl_lpm_route, l3fwdacl_scalar. Signed-off-by: Lijuan Tu <lijuanx.a.tu at intel.com>

lijuan,tu lijuanx.a.tu at intel.com
Tue Sep 15 03:37:41 CEST 2015


From: Lijuan Tu <lijuanx.a.tu at intel.com>


Signed-off-by: Lijuan Tu <lijuanx.a.tu at intel.com>
---
 tests/TestSuite_l3fwdacl.py | 1002 +++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 1002 insertions(+), 0 deletions(-)
 create mode 100644 tests/TestSuite_l3fwdacl.py

diff --git a/tests/TestSuite_l3fwdacl.py b/tests/TestSuite_l3fwdacl.py
new file mode 100644
index 0000000..fefeb23
--- /dev/null
+++ b/tests/TestSuite_l3fwdacl.py
@@ -0,0 +1,1002 @@
+# <COPYRIGHT_TAG>
+
+"""
+Layer-3 forwarding ACL test script.
+"""
+
+import dts
+import tester
+import re
+from test_case import TestCase
+
+
+#
+#
+# Test class.
+#
+
+
+class TestL3fwdacl(TestCase):
+
+    # Directory of the built example; "/l3fwd-acl" is appended to it when the
+    # launch command line is assembled.
+    exe_path = "./examples/l3fwd-acl/build"
+    # Wildcard values written into a rule line when a rule field is "ALL".
+    all_ipv4_addresses = "0.0.0.0/0"
+    all_ipv6_addresses = "0:0:0:0:0:0:0:0/0"
+    all_ports = "0 : 65535"
+    all_protocols = "0x00/0x00"
+
+    # Launch parameters per core layout; "config", "mask" and "ports" are
+    # filled in by get_core_list().
+    core_list_configs = {
+        "1S/1C/1T": {"config": "", "mask": "", "ports": []},
+        "1S/1C/2T": {"config": "", "mask": "", "ports": []},
+        "1S/2C/1T": {"config": "", "mask": "", "ports": []},
+        "2S/1C/1T": {"config": "", "mask": "", "ports": []},
+    }
+
+    # Performance inputs: "DB" is the rule file edited by
+    # change_ports_in_rules_files(); "Pcap0" and "Pcap2" are the two capture
+    # names handed to the traffic generator by performance_acl_test().
+    performance_cases = [
+        {"DB": "/root/acl1_1000_lim512.rules",
+         "Pcap0": "acl1_1000_lim512.prtX.pcap",
+         "Pcap2": "acl1_1000_lim512.prtY.pcap"},
+        {"DB": "/root/acl2_1000_lim512.rules",
+         "Pcap0": "acl2_1000_lim512.prtX.pcap",
+         "Pcap2": "acl2_1000_lim512.prtY.pcap"},
+        {"DB": "/root/acl3_1000_lim512.rules",
+         "Pcap0": "acl3_1000_lim512.prtX.pcap",
+         "Pcap2": "acl3_1000_lim512.prtY.pcap"},
+        {"DB": "/root/acl4_1000_lim512.rules",
+         "Pcap0": "acl4_1000_lim512.prtX.pcap",
+         "Pcap2": "acl4_1000_lim512.prtY.pcap"},
+        {"DB": "/root/acl5_1000_lim512.rules",
+         "Pcap0": "acl5_1000_lim512.prtX.pcap",
+         "Pcap2": "acl5_1000_lim512.prtY.pcap"},
+    ]
+
+    # Rows appended by store_performance_result().
+    performance_results = []
+
+    # Catch-all route that the basic and invalid tests append after the rule
+    # under test; set_up_all() overwrites "Port" with the second DUT port.
+    default_rule = {"Type": "ROUTE", "sIpAddr": "ALL", "dIpAddr": "ALL",
+                    "sPort": "ALL", "dPort": "ALL", "Protocol": "ALL",
+                    "Port": "1"}
+
+    # Rule dictionaries share one layout.  "Type" "ACL" is written with an
+    # "@" prefix and "ROUTE" with an "R" prefix; any other value is written
+    # verbatim.  "ALL" in a field selects the wildcard constants above.
+
+    # ACL rules run one at a time by test_l3fwdacl_acl_rule(): each of the
+    # first five restricts a single field, the last one restricts all five.
+    acl_ipv4_rule_list = [
+        {"Type": "ACL", "sIpAddr": "200.10.0.1/32", "dIpAddr": "ALL",
+         "sPort": "ALL", "dPort": "ALL", "Protocol": "ALL", "Port": ""},
+        {"Type": "ACL", "sIpAddr": "ALL", "dIpAddr": "100.10.0.1/32",
+         "sPort": "ALL", "dPort": "ALL", "Protocol": "ALL", "Port": ""},
+        {"Type": "ACL", "sIpAddr": "ALL", "dIpAddr": "ALL",
+         "sPort": "11 : 11", "dPort": "ALL", "Protocol": "ALL", "Port": ""},
+        {"Type": "ACL", "sIpAddr": "ALL", "dIpAddr": "ALL",
+         "sPort": "ALL", "dPort": "101 : 101", "Protocol": "ALL", "Port": ""},
+        {"Type": "ACL", "sIpAddr": "ALL", "dIpAddr": "ALL",
+         "sPort": "ALL", "dPort": "ALL", "Protocol": "0x06/0xff", "Port": ""},
+        {"Type": "ACL", "sIpAddr": "200.10.0.1/32", "dIpAddr": "100.10.0.1/32",
+         "sPort": "11 : 11", "dPort": "101 : 101", "Protocol": "0x06/0xff",
+         "Port": ""}
+    ]
+
+    # IPv6 counterpart of acl_ipv4_rule_list.
+    acl_ipv6_rule_list = [
+        {"Type": "ACL", "sIpAddr": "2001:0db8:85a3:08d3:1319:8a2e:0370:7344/128", "dIpAddr": "ALL",
+         "sPort": "ALL", "dPort": "ALL", "Protocol": "ALL", "Port": ""},
+        {"Type": "ACL", "sIpAddr": "ALL", "dIpAddr": "2002:0db8:85a3:08d3:1319:8a2e:0370:7344/128",
+         "sPort": "ALL", "dPort": "ALL", "Protocol": "ALL", "Port": ""},
+        {"Type": "ACL", "sIpAddr": "ALL", "dIpAddr": "ALL",
+         "sPort": "11 : 11", "dPort": "ALL", "Protocol": "ALL", "Port": ""},
+        {"Type": "ACL", "sIpAddr": "ALL", "dIpAddr": "ALL",
+         "sPort": "ALL", "dPort": "101 : 101", "Protocol": "ALL", "Port": ""},
+        {"Type": "ACL", "sIpAddr": "ALL", "dIpAddr": "ALL",
+         "sPort": "ALL", "dPort": "ALL", "Protocol": "0x06/0xff", "Port": ""},
+        {"Type": "ACL", "sIpAddr": "2001:0db8:85a3:08d3:1319:8a2e:0370:7344/128", "dIpAddr": "2002:0db8:85a3:08d3:1319:8a2e:0370:7344/128",
+         "sPort": "11 : 11", "dPort": "101 : 101", "Protocol": "0x06/0xff", "Port": ""},
+    ]
+
+    # Fully specified route rules used by test_l3fwdacl_exact_route(), which
+    # overwrites "Port" with the first and second DUT port respectively.
+    exact_rule_list_ipv4 = [
+        {"Type": "ROUTE", "sIpAddr": "200.10.0.1/32",
+         "dIpAddr": "100.10.0.1/32", "sPort": "11 : 11",
+         "dPort": "101 : 101", "Protocol": "0x06/0xff", "Port": "0"},
+        {"Type": "ROUTE", "sIpAddr": "200.20.0.1/32",
+         "dIpAddr": "100.20.0.1/32", "sPort": "12 : 12",
+         "dPort": "102 : 102", "Protocol": "0x06/0xff", "Port": "1"},
+    ]
+
+    # The two IPv6 exact rules share their addresses and differ only in the
+    # port numbers.
+    exact_rule_list_ipv6 = [
+        {"Type": "ROUTE", "sIpAddr": "2001:0db8:85a3:08d3:1319:8a2e:0370:7344/128", "dIpAddr": "2002:0db8:85a3:08d3:1319:8a2e:0370:7344/128",
+         "sPort": "11 : 11", "dPort": "101 : 101", "Protocol": "0x06/0xff", "Port": "0"},
+        {"Type": "ROUTE", "sIpAddr": "2001:0db8:85a3:08d3:1319:8a2e:0370:7344/128", "dIpAddr": "2002:0db8:85a3:08d3:1319:8a2e:0370:7344/128",
+         "sPort": "12 : 12", "dPort": "102 : 102", "Protocol": "0x06/0xff", "Port": "1"},
+    ]
+
+    # Route rules that only restrict the destination prefix.
+    lpm_rule_list_ipv4 = [
+        {"Type": "ROUTE", "sIpAddr": "0.0.0.0/0", "dIpAddr": "1.1.1.0/24",
+         "sPort": "0 : 65535", "dPort": "0 : 65535", "Protocol": "0x00/0x00",
+         "Port": "0"},
+        {"Type": "ROUTE", "sIpAddr": "0.0.0.0/0", "dIpAddr": "2.1.1.0/24",
+         "sPort": "0 : 65535", "dPort": "0 : 65535", "Protocol": "0x00/0x00",
+         "Port": "1"},
+    ]
+
+    lpm_rule_list_ipv6 = [
+        {"Type": "ROUTE", "sIpAddr": "0:0:0:0:0:0:0:0/0", "dIpAddr": "1:1:1:1:1:1:0:0/96",
+         "sPort": "0 : 65535", "dPort": "0 : 65535", "Protocol": "0x00/0x00", "Port": "0"},
+        {"Type": "ROUTE", "sIpAddr": "0:0:0:0:0:0:0:0/0", "dIpAddr": "2:1:1:1:1:1:0:0/96",
+         "sPort": "0 : 65535", "dPort": "0 : 65535", "Protocol": "0x00/0x00", "Port": "1"},
+    ]
+
+    # Single ACL rule per address family; presumably used together with
+    # start_l3fwdacl(scalar=True) by the scalar test case - confirm against
+    # the rest of the suite.
+    scalar_rule_list_ipv4 = [
+        {"Type": "ACL", "sIpAddr": "200.10.0.1/32", "dIpAddr": "100.10.0.1/32",
+         "sPort": "11 : 11", "dPort": "101 : 101", "Protocol": "0x06/0xff",
+         "Port": ""},
+    ]
+
+    scalar_rule_list_ipv6 = [
+        {"Type": "ACL", "sIpAddr": "2001:0db8:85a3:08d3:1319:8a2e:0370:7344/128", "dIpAddr": "2002:0db8:85a3:08d3:1319:8a2e:0370:7344/101",
+         "sPort": "11 : 11", "dPort": "101 : 101", "Protocol": "0x06/0xff",
+         "Port": ""},
+    ]
+
+    # Deliberately malformed rules: a source port range whose lower bound
+    # exceeds the upper bound, an unknown type marker ("@R"), and a rule with
+    # empty protocol and port fields.
+    invalid_rule_ipv4_list = [
+        {"Type": "ROUTE", "sIpAddr": "ALL", "dIpAddr": "ALL",
+         "sPort": "12 : 11", "dPort": "ALL", "Protocol": "ALL", "Port": "0"},
+        {"Type": "@R", "sIpAddr": "ALL", "dIpAddr": "ALL",
+         "sPort": "ALL", "dPort": "ALL", "Protocol": "ALL", "Port": "0"},
+        {"Type": "ROUTE", "sIpAddr": "ALL", "dIpAddr": "ALL",
+         "sPort": "ALL", "dPort": "ALL", "Protocol": "", "Port": ""},
+    ]
+
+    invalid_rule_ipv6_list = [
+        {"Type": "ROUTE", "sIpAddr": "ALL", "dIpAddr": "ALL",
+         "sPort": "12 : 11", "dPort": "ALL", "Protocol": "ALL", "Port": "0"},
+        {"Type": "@R", "sIpAddr": "ALL", "dIpAddr": "ALL",
+         "sPort": "ALL", "dPort": "ALL", "Protocol": "ALL", "Port": "0"},
+        {"Type": "ROUTE", "sIpAddr": "ALL", "dIpAddr": "ALL",
+         "sPort": "ALL", "dPort": "ALL", "Protocol": "", "Port": ""},
+    ]
+
+    # Rules whose output port is 99.
+    # NOTE(review): the IPv6 variant is typed "ACL" while the IPv4 one is
+    # "ROUTE" - confirm this asymmetry is intended.
+    invalid_port_rule_ipv4 = {"Type": "ROUTE", "sIpAddr": "200.10.0.1/32",
+                              "dIpAddr": "100.10.0.1/32", "sPort": "11 : 11",
+                              "dPort": "101 : 101", "Protocol": "0x06/0xff",
+                              "Port": "99"}
+
+    invalid_port_rule_ipv6 = {"Type": "ACL", "sIpAddr": "2001:0db8:85a3:08d3:1319:8a2e:0370:7344/128", "dIpAddr": "2002:0db8:85a3:08d3:1319:8a2e:0370:7344/101",
+                              "sPort": "11 : 11", "dPort": "101 : 101", "Protocol": "0x06/0xff", "Port": "99"}
+
+    # Rule files on the DUT, passed to l3fwd-acl through --rule_ipv4 and
+    # --rule_ipv6.
+    acl_ipv4_db = "/root/rule_ipv4.db"
+    acl_ipv6_db = "/root/rule_ipv6.db"
+    perfDB = ""
+    # Rule line layout: type prefix immediately followed by the source
+    # address, then destination address, source port range, destination port
+    # range, protocol/mask and output port.
+    rule_format = "%s%s %s %s %s %s %s"
+    default_core_config = "1S/4C/1T"
+
+    #
+    #
+    # Utility methods and other non-test code.
+    #
+    def start_l3fwdacl(self, scalar=False):
+        """
+        Launch l3fwd-acl on the DUT and wait until "L3FWD:" is printed.
+
+        The command always passes both rule files and a fixed --config of
+        "(port0,0,2),(port1,0,3)" built from the first two DUT ports.
+        "--scalar" is appended when scalar is true.
+        """
+        scalar_flag = '--scalar' if scalar else ''
+
+        launch_args = (TestL3fwdacl.exe_path,
+                       self.core_mask,
+                       self.dut.get_memory_channels(),
+                       self.port_mask,
+                       self.dut_ports[0],
+                       self.dut_ports[1],
+                       TestL3fwdacl.acl_ipv4_db,
+                       TestL3fwdacl.acl_ipv6_db,
+                       scalar_flag)
+        launch_cmd = '%s/l3fwd-acl -c %s -n %d -- -p %s --config="(%d,0,2),(%d,0,3)" --rule_ipv4="%s" --rule_ipv6="%s" %s' % launch_args
+
+        # The captured output is not inspected; only the prompt is awaited.
+        self.dut.send_expect(launch_cmd, "L3FWD:", 30)
+
+    def get_core_list(self):
+        """
+        Populate core_list_configs and the per-socket port masks.
+
+        Looks up the DUT ports on socket 0 and socket 1 and the lcore ids
+        each layout needs, then stores for every layout its "config" string,
+        core "mask" and the two "ports" it uses.  The "2S/1C/1T" layout is
+        kept only when both sockets own at least one port; otherwise it is
+        removed from core_list_configs.
+
+        The method is safe to call more than once: port lists are replaced
+        rather than appended to, and removing an already missing layout is
+        not an error.
+        """
+        self.sock0ports = self.dut.get_ports(self.nic, socket=0)
+        self.sock1ports = self.dut.get_ports(self.nic, socket=1)
+
+        core1 = self.dut.get_lcore_id('C{1.1.0}')
+        core2 = self.dut.get_lcore_id('C{1.2.0}')
+        thread1 = self.dut.get_lcore_id('C{1.1.1}')
+        socket1 = self.dut.get_lcore_id('C{0.1.0}')
+
+        # NOTE(review): the parentheses are backslash-escaped, presumably
+        # because the string is later used as a pattern - confirm with the
+        # code that consumes "config".
+        config_format = r"\(%s,0,%s\),\(%s,0,%s\)"
+        configs = TestL3fwdacl.core_list_configs
+        port0, port1 = self.dut_ports[0], self.dut_ports[1]
+
+        def store(layout, ports, lcores, mask_cores):
+            # core_list_configs is class-level state shared by every
+            # instance, so overwrite the entry instead of growing it.
+            entry = configs.setdefault(layout, {})
+            entry["config"] = config_format % (ports[0], lcores[0],
+                                               ports[1], lcores[1])
+            entry["mask"] = dts.create_mask(mask_cores)
+            entry["ports"] = list(ports)
+
+        # l3fwd support first 8 dut_ports only
+        store("1S/1C/1T", (port0, port1), (core1, core1), [core1])
+        store("1S/1C/2T", (port0, port1), (core1, thread1), [core1, thread1])
+        store("1S/2C/1T", (port0, port1), (core1, core2), [core1, core2])
+
+        self.port_mask_1_socket = dts.create_mask([port0, port1])
+
+        # no need to run two socket config if two cards on one slot
+        if self.sock0ports and self.sock1ports:
+            cross_ports = (self.sock0ports[0], self.sock1ports[0])
+            store("2S/1C/1T", cross_ports, (core1, socket1), [core1, socket1])
+            self.port_mask_2_sockets = dts.create_mask(list(cross_ports))
+        else:
+            configs.pop("2S/1C/1T", None)
+
+    def change_ports_in_rules_files(self, core_config):
+        """
+        Replace the X and Y port placeholders in every performance rule file.
+
+        For each entry of performance_cases, an in-place sed on the DUT
+        substitutes the tab-delimited "X" token with the first port of
+        core_config and the tab-delimited "Y" token with the second one.
+        """
+        sed_templates = ("sed -i 's/\\tX\\t/\\t%s\\t/' %s",
+                         "sed -i 's/\\tY\\t/\\t%s\\t/' %s")
+
+        for perf_case in TestL3fwdacl.performance_cases:
+            rules_file = perf_case["DB"]
+            for port_index, template in enumerate(sed_templates):
+                port = TestL3fwdacl.core_list_configs[core_config]["ports"][port_index]
+                self.dut.send_expect(template % (port, rules_file), "# ")
+
+    def rule_cfg_init(self, acl_ipv4_db, acl_ipv6_db):
+        """
+        Reset the given rule files to a single catch-all route rule.
+
+        Each non-empty path is overwritten with one blank line and then gets
+        a wildcard "R" rule whose last field is the second DUT port.  A
+        falsy path is skipped.
+        """
+        seed_rules = (
+            (acl_ipv4_db,
+             "echo 'R0.0.0.0/0 0.0.0.0/0 0 : 65535 0 : 65535 0x00/0x00 %s' >> %s"),
+            (acl_ipv6_db,
+             "echo 'R0:0:0:0:0:0:0:0/0 0:0:0:0:0:0:0:0/0 0 : 65535 0 : 65535 0x00/0x00 %s' >> %s"),
+        )
+
+        for db_path, append_cmd in seed_rules:
+            if not db_path:
+                continue
+            self.dut.send_expect("echo '' > %s" % db_path, "# ")
+            self.dut.send_expect(append_cmd % (self.dut_ports[1], db_path), "# ")
+
+    def rule_cfg_delete(self, acl_ipv4_db, acl_ipv6_db):
+        """
+        Remove both ACL rule files from the DUT.
+        """
+        for db_path in (acl_ipv4_db, acl_ipv6_db):
+            self.dut.send_expect("rm -rf  %s" % db_path, "# ")
+
+    def create_ipv4_ip_not_match(self, ip_address):
+        """
+        Return an IPv4 host address that lies outside the given prefix.
+
+        ip_address must contain "a.b.c.d/len".  The network part is
+        incremented by one and returned as "a.b.c.d/32".  The result wraps
+        around at 32 bits, so the last prefix of the address space maps to
+        0.0.0.0/32; for a /0 prefix no outside address exists and 0.0.0.0/32
+        is returned as well.
+
+        Raises ValueError when no dotted-quad/length can be found, when an
+        octet exceeds 255 or when the prefix length exceeds 32.
+        """
+        # Dots are escaped so that only a literal "." separates the octets.
+        m = re.search(r"(\d+)\.(\d+)\.(\d+)\.(\d+)/(\d+)", ip_address)
+        if m is None:
+            raise ValueError("not an IPv4 prefix: %r" % (ip_address,))
+
+        octets = [int(m.group(i)) for i in range(1, 5)]
+        mask_len = int(m.group(5))
+        if max(octets) > 255 or mask_len > 32:
+            raise ValueError("IPv4 prefix out of range: %r" % (ip_address,))
+
+        ip = octets[0] << 24 | octets[1] << 16 | octets[2] << 8 | octets[3]
+        host_bits = 32 - mask_len
+        # Step to the next prefix of the same size, keeping only 32 bits.
+        ip_diff = (((ip >> host_bits) + 1) << host_bits) & 0xffffffff
+
+        return "%d.%d.%d.%d/32" % ((ip_diff >> 24) & 0xff,
+                                   (ip_diff >> 16) & 0xff,
+                                   (ip_diff >> 8) & 0xff,
+                                   ip_diff & 0xff)
+
+    def create_ipv6_ip_not_match(self, ip_address):
+        """
+        Return the IPv6 address used for packets that must not match a rule.
+
+        The argument is ignored; the result is always the same /128 address.
+        """
+        non_matching_address = "0:0:0:0:0:0:0:1/128"
+        return non_matching_address
+
+    def create_port_not_match(self, port):
+        """
+        Return the port range used for packets that must not match a rule.
+
+        The argument is ignored; the result is always the range "0 : 0".
+        """
+        non_matching_range = "0 : 0"
+        return non_matching_range
+
+    def send_ipv4_packet_not_match(self, rule, tx_port, rx_port):
+        """
+        Send one IPv4 packet that differs from rule in every restricted field.
+
+        Every field of rule that is not "ALL" is replaced by a non-matching
+        value: addresses via create_ipv4_ip_not_match(), port ranges via
+        create_port_not_match(), and the protocol by "0x11/0xff" when the
+        original contains a "6", else "0x6/0xff".  The packet is sent on the
+        tester interface of tx_port while a sniffer listens on the interface
+        of rx_port for up to 5 seconds.
+
+        The caller's rule dictionary is left untouched.
+
+        Returns the scapy result; the sniffer script sets it to
+        str(len(p)), the number of captured packets.
+        """
+        tx_interface = self.tester.get_interface(tx_port)
+        rx_interface = self.tester.get_interface(rx_port)
+
+        # Work on a copy: callers pass the class-level rule dictionaries,
+        # which must stay intact for later test cases and re-runs.
+        rule = dict(rule)
+        for field in ("sIpAddr", "dIpAddr"):
+            if rule[field] != "ALL":
+                rule[field] = self.create_ipv4_ip_not_match(rule[field])
+        for field in ("sPort", "dPort"):
+            if rule[field] != "ALL":
+                rule[field] = self.create_port_not_match(rule[field])
+        if rule["Protocol"] != "ALL":
+            if "6" in rule["Protocol"]:
+                rule["Protocol"] = "0x11/0xff"
+            else:
+                rule["Protocol"] = "0x6/0xff"
+
+        ethernet_str = self.create_ipv4_rule_string(rule, "Ether")
+
+        # Start the sniffer in the background before the packet is sent.
+        self.tester.scapy_background()
+        self.tester.scapy_append('p = sniff(filter="ether src not 00:00:00:00:00:00 and ether dst not ff:ff:ff:ff:ff:ff and (tcp or udp)",iface="%s", count=1, timeout=5)' % rx_interface)
+        self.tester.scapy_append('RESULT = str(len(p))')
+        self.tester.scapy_foreground()
+
+        self.tester.scapy_append('sendp([%s],iface="%s")' % (ethernet_str, tx_interface))
+        self.tester.scapy_execute()
+        out = self.tester.scapy_get_result()
+        return out
+
+    def send_ipv6_packet_not_match(self, rule, tx_port, rx_port):
+        """
+        Send one IPv6 packet that differs from rule in every restricted field.
+
+        Every field of rule that is not "ALL" is replaced by a non-matching
+        value: addresses via create_ipv6_ip_not_match(), port ranges via
+        create_port_not_match(), and the protocol by "0x11/0xff" when the
+        original contains a "6", else "0x6/0xff".  The packet is sent on the
+        tester interface of tx_port while a sniffer listens on the interface
+        of rx_port for up to 10 seconds.
+
+        The caller's rule dictionary is left untouched.
+
+        Returns the scapy result; the sniffer script sets it to
+        str(len(p)), the number of captured packets.
+        """
+        tx_interface = self.tester.get_interface(tx_port)
+        rx_interface = self.tester.get_interface(rx_port)
+
+        # Work on a copy: callers pass the class-level rule dictionaries,
+        # which must stay intact for later test cases and re-runs.
+        rule = dict(rule)
+        for field in ("sIpAddr", "dIpAddr"):
+            if rule[field] != "ALL":
+                rule[field] = self.create_ipv6_ip_not_match(rule[field])
+        for field in ("sPort", "dPort"):
+            if rule[field] != "ALL":
+                rule[field] = self.create_port_not_match(rule[field])
+        if rule["Protocol"] != "ALL":
+            if "6" in rule["Protocol"]:
+                rule["Protocol"] = "0x11/0xff"
+            else:
+                rule["Protocol"] = "0x6/0xff"
+
+        ethernet_str = self.create_ipv6_rule_string(rule, "Ether")
+
+        # Start the sniffer in the background before the packet is sent.
+        self.tester.scapy_background()
+        self.tester.scapy_append('p = sniff(filter="(tcp or udp)",iface="%s", count=1, timeout=10)' % rx_interface)
+        self.tester.scapy_append('RESULT = str(len(p))')
+        self.tester.scapy_foreground()
+
+        self.tester.scapy_append('sendp([%s],iface="%s")' % (ethernet_str, tx_interface))
+        self.tester.scapy_execute()
+        out = self.tester.scapy_get_result()
+        return out
+
+    def send_ipv4_packet_match(self, rule, tx_port, rx_port):
+        """
+        Send one IPv4 packet built from rule and report what was captured.
+
+        A background sniffer on the tester interface of rx_port waits up to
+        5 seconds for one TCP or UDP frame; the packet is sent on the
+        interface of tx_port.  Returns the scapy result, which the sniffer
+        script sets to str(len(p)).
+        """
+        tx_iface = self.tester.get_interface(tx_port)
+        rx_iface = self.tester.get_interface(rx_port)
+        packet = self.create_ipv4_rule_string(rule, "Ether")
+
+        sniff_script = (
+            'p = sniff(filter="ether src not 00:00:00:00:00:00 and ether dst not ff:ff:ff:ff:ff:ff and (tcp or udp)",iface="%s", count=1, timeout=5)' % rx_iface,
+            'RESULT = str(len(p))',
+        )
+        self.tester.scapy_background()
+        for statement in sniff_script:
+            self.tester.scapy_append(statement)
+        self.tester.scapy_foreground()
+
+        self.tester.scapy_append('sendp([%s],iface="%s")' % (packet, tx_iface))
+        self.tester.scapy_execute()
+        return self.tester.scapy_get_result()
+
+    def send_ipv6_packet_match(self, rule, tx_port, rx_port):
+        """
+        Send one IPv6 packet built from rule and report what was captured.
+
+        A background sniffer on the tester interface of rx_port waits up to
+        10 seconds for one TCP or UDP frame; the packet is sent on the
+        interface of tx_port.  Returns the scapy result, which the sniffer
+        script sets to str(len(p)).
+        """
+        tx_iface = self.tester.get_interface(tx_port)
+        rx_iface = self.tester.get_interface(rx_port)
+        packet = self.create_ipv6_rule_string(rule, "Ether")
+
+        sniff_script = (
+            'p = sniff(filter="(tcp or udp)",iface="%s", count=1, timeout=10)' % rx_iface,
+            'RESULT = str(len(p))',
+        )
+        self.tester.scapy_background()
+        for statement in sniff_script:
+            self.tester.scapy_append(statement)
+        self.tester.scapy_foreground()
+
+        self.tester.scapy_append('sendp([%s],iface="%s")' % (packet, tx_iface))
+        self.tester.scapy_execute()
+        return self.tester.scapy_get_result()
+
+    def create_ipv4_rule_string(self, rule, rule_type):
+        """
+        Render an IPv4 rule as a rule-file line or as a scapy packet.
+
+        rule_type "DataBase" returns the line for the rule file, laid out
+        according to rule_format; "Ether" returns a scapy expression for a
+        packet addressed to the MAC of the first DUT port; any other value
+        returns None.
+
+        A field equal to "ALL" is written as the matching wildcard constant
+        in the rule line and uses a fixed sample value in the packet
+        (200.10.0.1, 100.10.0.1, source port 11, destination port 101, UDP).
+        Otherwise the packet uses the part of an address before "/" and the
+        part of a port range before the first space.  The packet is TCP when
+        the protocol field contains a "6", else UDP.
+        """
+        type_prefixes = {"ACL": "@", "ROUTE": "R"}
+        # Unknown type markers are written verbatim.
+        prefix = type_prefixes.get(rule["Type"], rule["Type"])
+
+        if rule["sIpAddr"] == "ALL":
+            src_net, src_host = TestL3fwdacl.all_ipv4_addresses, "200.10.0.1"
+        else:
+            src_net, src_host = rule["sIpAddr"], rule["sIpAddr"].split('/')[0]
+
+        if rule["dIpAddr"] == "ALL":
+            dst_net, dst_host = TestL3fwdacl.all_ipv4_addresses, "100.10.0.1"
+        else:
+            dst_net, dst_host = rule["dIpAddr"], rule["dIpAddr"].split('/')[0]
+
+        if rule["sPort"] == "ALL":
+            sport_range, sport = TestL3fwdacl.all_ports, "11"
+        else:
+            sport_range, sport = rule["sPort"], rule["sPort"].split(' ')[0]
+
+        if rule["dPort"] == "ALL":
+            dport_range, dport = TestL3fwdacl.all_ports, "101"
+        else:
+            dport_range, dport = rule["dPort"], rule["dPort"].split(' ')[0]
+
+        if rule["Protocol"] == "ALL":
+            proto_field, l4_layer = TestL3fwdacl.all_protocols, "UDP"
+        else:
+            proto_field = rule["Protocol"]
+            l4_layer = "TCP" if "6" in rule["Protocol"] else "UDP"
+
+        out_port = rule["Port"]
+        dut_mac = self.dut.get_mac_address(self.dut_ports[0])
+
+        if rule_type == "DataBase":
+            return TestL3fwdacl.rule_format % (prefix, src_net, dst_net,
+                                               sport_range, dport_range,
+                                               proto_field, out_port)
+        if rule_type == "Ether":
+            return 'Ether(dst="%s")/IP(src="%s",dst="%s")/%s(sport=%s,dport=%s)' % \
+                   (dut_mac, src_host, dst_host, l4_layer, sport, dport)
+        return None
+
+    def create_ipv6_rule_string(self, rule, rule_type):
+        """
+        Render an IPv6 rule as a rule-file line or as a scapy packet.
+
+        rule_type "DataBase" returns the line for the rule file, laid out
+        according to rule_format; "Ether" returns a scapy expression for a
+        packet addressed to the MAC of the first DUT port; any other value
+        returns None.
+
+        A field equal to "ALL" is written as the matching wildcard constant
+        in the rule line and uses a fixed sample value in the packet
+        (200:0:0:0:0:0:0:1, 100:0:0:0:0:0:0:1, source port 11, destination
+        port 101, UDP).  Otherwise the packet uses the part of an address
+        before "/" and the part of a port range before the first space.  The
+        packet is TCP when the protocol field contains a "6", else UDP.
+        """
+        type_prefixes = {"ACL": "@", "ROUTE": "R"}
+        # Unknown type markers are written verbatim.
+        prefix = type_prefixes.get(rule["Type"], rule["Type"])
+
+        if rule["sIpAddr"] == "ALL":
+            src_net, src_host = TestL3fwdacl.all_ipv6_addresses, "200:0:0:0:0:0:0:1"
+        else:
+            src_net, src_host = rule["sIpAddr"], rule["sIpAddr"].split('/')[0]
+
+        if rule["dIpAddr"] == "ALL":
+            dst_net, dst_host = TestL3fwdacl.all_ipv6_addresses, "100:0:0:0:0:0:0:1"
+        else:
+            dst_net, dst_host = rule["dIpAddr"], rule["dIpAddr"].split('/')[0]
+
+        if rule["sPort"] == "ALL":
+            sport_range, sport = TestL3fwdacl.all_ports, "11"
+        else:
+            sport_range, sport = rule["sPort"], rule["sPort"].split(' ')[0]
+
+        if rule["dPort"] == "ALL":
+            dport_range, dport = TestL3fwdacl.all_ports, "101"
+        else:
+            dport_range, dport = rule["dPort"], rule["dPort"].split(' ')[0]
+
+        if rule["Protocol"] == "ALL":
+            proto_field, l4_layer = TestL3fwdacl.all_protocols, "UDP"
+        else:
+            proto_field = rule["Protocol"]
+            l4_layer = "TCP" if "6" in rule["Protocol"] else "UDP"
+
+        out_port = rule["Port"]
+        dut_mac = self.dut.get_mac_address(self.dut_ports[0])
+
+        if rule_type == "DataBase":
+            return TestL3fwdacl.rule_format % (prefix, src_net, dst_net,
+                                               sport_range, dport_range,
+                                               proto_field, out_port)
+        if rule_type == "Ether":
+            return 'Ether(dst="%s")/IPv6(src="%s",dst="%s")/%s(sport=%s,dport=%s)' % \
+                   (dut_mac, src_host, dst_host, l4_layer, sport, dport)
+        return None
+
+    def create_acl_ipv4_db(self, rule_list):
+        """
+        Rewrite the IPv4 rule file on the DUT from rule_list.
+
+        The file is first overwritten with one blank line; every rule is
+        then rendered in "DataBase" form and appended with echo.
+        """
+        db_path = TestL3fwdacl.acl_ipv4_db
+
+        self.dut.send_expect("echo '' > %s" % db_path, "# ")
+        for entry in rule_list:
+            line = self.create_ipv4_rule_string(entry, rule_type="DataBase")
+            self.dut.send_expect("echo %s >> %s" % (line, db_path), "# ")
+
+    def create_acl_ipv6_db(self, rule_list):
+        """
+        Rewrite the IPv6 rule file on the DUT from rule_list.
+
+        The file is first overwritten with one blank line; every rule is
+        then rendered in "DataBase" form and appended with echo.
+        """
+        db_path = TestL3fwdacl.acl_ipv6_db
+
+        self.dut.send_expect("echo '' > %s" % db_path, "# ")
+        for entry in rule_list:
+            line = self.create_ipv6_rule_string(entry, rule_type="DataBase")
+            self.dut.send_expect("echo %s >> %s" % (line, db_path), "# ")
+
+    def basic_acl_ipv4_test(self, acl_rule):
+        """
+        Basic test for l3fwd-acl with IPv4 packets.
+
+        Writes acl_rule followed by default_rule to the IPv4 rule file,
+        starts l3fwd-acl and sends two packets from the first DUT port's
+        tester interface: one matching acl_rule, which must not arrive on
+        the second port's interface, and one not matching it, which must.
+        l3fwd-acl is stopped even when sending a packet raises.
+        """
+        self.create_acl_ipv4_db([acl_rule, TestL3fwdacl.default_rule])
+
+        self.start_l3fwdacl()
+        try:
+            tx_port = self.tester.get_local_port(self.dut_ports[0])
+            rx_port = self.tester.get_local_port(self.dut_ports[1])
+
+            out1 = self.send_ipv4_packet_match(acl_rule, tx_port, rx_port)
+            # Hand over a copy so the shared class-level rule can never be
+            # altered while the non-matching packet is derived from it.
+            out2 = self.send_ipv4_packet_not_match(dict(acl_rule), tx_port, rx_port)
+        finally:
+            # Always stop the application so the next case can launch it.
+            self.dut.send_expect("^C", "#", 20)
+
+        self.verify('1' not in out1, "Rx port receive unexpected packet")
+        self.verify('1' in out2, "Rx port not receive expected packet")
+
+    def basic_acl_ipv6_test(self, acl_rule):
+        """
+        Basic test for l3fwd-acl with IPv6 packets.
+
+        Writes acl_rule followed by default_rule to the IPv6 rule file,
+        starts l3fwd-acl and sends two packets from the first DUT port's
+        tester interface: one matching acl_rule, which must not arrive on
+        the second port's interface, and one not matching it, which must.
+        l3fwd-acl is stopped even when sending a packet raises.
+        """
+        self.create_acl_ipv6_db([acl_rule, TestL3fwdacl.default_rule])
+
+        self.start_l3fwdacl()
+        try:
+            tx_port = self.tester.get_local_port(self.dut_ports[0])
+            rx_port = self.tester.get_local_port(self.dut_ports[1])
+
+            out1 = self.send_ipv6_packet_match(acl_rule, tx_port, rx_port)
+            # Hand over a copy so the shared class-level rule can never be
+            # altered while the non-matching packet is derived from it.
+            out2 = self.send_ipv6_packet_not_match(dict(acl_rule), tx_port, rx_port)
+        finally:
+            # Always stop the application so the next case can launch it.
+            self.dut.send_expect("^C", "#", 20)
+
+        self.verify('1' not in out1, "Rx port receive unexpected packet")
+        self.verify('1' in out2, "Rx port not receive expected packet")
+
+    def invalid_acl_ipv4_test(self, acl_rule):
+        """
+        Check that l3fwd-acl rejects an invalid IPv4 rule.
+
+        Writes acl_rule followed by default_rule to the IPv4 rule file,
+        launches l3fwd-acl, waits for the shell prompt and verifies that the
+        output contains "rules error".
+        """
+        self.create_acl_ipv4_db([acl_rule, TestL3fwdacl.default_rule])
+
+        launch_args = (TestL3fwdacl.exe_path,
+                       self.core_mask,
+                       self.dut.get_memory_channels(),
+                       self.port_mask,
+                       self.dut_ports[0],
+                       self.dut_ports[1],
+                       TestL3fwdacl.acl_ipv4_db,
+                       TestL3fwdacl.acl_ipv6_db)
+        launch_cmd = '%s/l3fwd-acl -c %s -n %d -- -p %s --config="(%d,0,2),(%d,0,3)" --rule_ipv4="%s" --rule_ipv6="%s"' % launch_args
+
+        output = self.dut.send_expect(launch_cmd, "# ", 30)
+        self.verify("rules error" in output, "l3fwd not detect invalid rule")
+        self.dut.send_expect("^C", "#", 5)
+
+    def invalid_acl_ipv6_test(self, acl_rule):
+        """
+        Check that l3fwd-acl rejects an invalid IPv6 rule.
+
+        Writes acl_rule followed by default_rule to the IPv6 rule file,
+        launches l3fwd-acl, waits for the shell prompt and verifies that the
+        output contains "rules error".
+        """
+        self.create_acl_ipv6_db([acl_rule, TestL3fwdacl.default_rule])
+
+        launch_args = (TestL3fwdacl.exe_path,
+                       self.core_mask,
+                       self.dut.get_memory_channels(),
+                       self.port_mask,
+                       self.dut_ports[0],
+                       self.dut_ports[1],
+                       TestL3fwdacl.acl_ipv4_db,
+                       TestL3fwdacl.acl_ipv6_db)
+        launch_cmd = '%s/l3fwd-acl -c %s -n %d -- -p %s --config="(%d,0,2),(%d,0,3)" --rule_ipv4="%s" --rule_ipv6="%s"' % launch_args
+
+        output = self.dut.send_expect(launch_cmd, "# ", 30)
+        self.verify("rules error" in output, "l3fwd not detect invalid rule")
+        self.dut.send_expect("^C", "#", 5)
+
+    def performance_acl_test(self, case, core=""):
+        """
+        Run the traffic generator for one performance case.
+
+        Two streams are set up in opposite directions between a pair of DUT
+        ports: case["Pcap0"] from the first to the second and case["Pcap2"]
+        back.  For core "2S/1C/1T" the pair is the first port of each
+        socket, otherwise the first two DUT ports.
+
+        Returns the (bps, pps) tuple reported by the traffic generator.
+        """
+        if core == "2S/1C/1T":
+            first, second = self.sock0ports[0], self.sock1ports[0]
+        else:
+            first, second = self.dut_ports[0], self.dut_ports[1]
+
+        streams = []
+        for tx_dut, rx_dut, pcap_key in ((first, second, "Pcap0"),
+                                         (second, first, "Pcap2")):
+            tx_itf = self.tester.get_local_port(tx_dut)
+            rx_itf = self.tester.get_local_port(rx_dut)
+            streams.append((tx_itf, rx_itf, case[pcap_key]))
+
+        bps, pps = self.tester.traffic_generator_throughput(streams)
+        return (bps, pps)
+
+    def store_performance_result(self, case, core, bps, pps):
+        """
+        Append one row to the class-level performance_results list.
+
+        The row records the rule file of case, the core layout, bps divided
+        by 1e9 under "Gb/s" and pps divided by 1e6 under "Mpps".
+        """
+        TestL3fwdacl.performance_results.append({
+            "RuleDB": case["DB"],
+            "Core/Thread": core,
+            "Gb/s": (bps * 1.0 / 1000000000),
+            "Mpps": (pps * 1.0 / 1000000),
+        })
+
+    #
+    #
+    # Test cases.
+    #
+
+    def set_up_all(self):
+        """
+        Suite-level prerequisites for the l3fwd-acl tests.
+
+        Verifies that at least two DUT ports exist for the NIC and that at
+        least two of them have a tester-side peer, builds the core and port
+        masks, points default_rule at the second DUT port and compiles the
+        l3fwd-acl example.  Returns "SUCCESS".
+        """
+        # Based on h/w type, choose how many dut_ports to use
+        nic_ports = self.dut.get_ports(self.nic)
+        self.verify(len(nic_ports) >= 2, "Insufficient dut_ports for speed testing")
+
+        # Verify that enough threads are available
+        lcores = self.dut.get_core_list("2S/4C/2T")
+        self.verify(lcores is not None, "Insufficient cores for speed testing")
+
+        self.core_mask = dts.create_mask(lcores)
+        print "Core mask: %s" % self.core_mask
+
+        # Keep only the ports for which the tester reports a local port.
+        linked_ports = [port for port in nic_ports
+                        if self.tester.get_local_port(port) != -1]
+        self.verify(
+            len(linked_ports) >= 2, "Insufficient active dut_ports for speed testing")
+
+        self.dut_ports = linked_ports
+        print "Valid ports found in DUT: %s" % self.dut_ports
+
+        self.port_mask = dts.create_mask([self.dut_ports[0], self.dut_ports[1]])
+        print "Port mask: %s" % self.port_mask
+
+        TestL3fwdacl.default_rule["Port"] = self.dut_ports[1]
+
+        # compile l3fwd-acl
+        build_log = self.dut.send_expect("make -C examples/l3fwd-acl", "# ")
+        self.verify("Error" not in build_log, "compilation error 1")
+        self.verify("No such file" not in build_log, "compilation error 2")
+
+        return "SUCCESS"
+
+    def test_l3fwdacl_acl_rule(self):
+        """
+        l3fwd Access Control match ACL rule test.
+
+        Resets both rule files, runs the basic test for every IPv4 ACL rule
+        and then for every IPv6 ACL rule, and finally deletes the rule
+        files.  Returns "SUCCESS".
+        """
+        ipv4_db, ipv6_db = TestL3fwdacl.acl_ipv4_db, TestL3fwdacl.acl_ipv6_db
+        self.rule_cfg_init(ipv4_db, ipv6_db)
+
+        rule_runs = ((TestL3fwdacl.acl_ipv4_rule_list, self.basic_acl_ipv4_test),
+                     (TestL3fwdacl.acl_ipv6_rule_list, self.basic_acl_ipv6_test))
+        for rules, run_case in rule_runs:
+            for acl_rule in rules:
+                run_case(acl_rule)
+
+        self.rule_cfg_delete(ipv4_db, ipv6_db)
+
+        return "SUCCESS"
+
+    def test_l3fwdacl_exact_route(self):
+        """
+        l3fwd Access Control match Exact route rule test
+
+        For IPv4 and then IPv6: set the "Port" of the first two exact-route
+        rules to the first two DUT ports, write them to the rule database,
+        start l3fwd-acl, send one matching packet per rule and verify that
+        each send helper's output contains '1'.
+        """
+
+        self.rule_cfg_init(TestL3fwdacl.acl_ipv4_db, TestL3fwdacl.acl_ipv6_db)
+
+        # (rule list, database writer, matching-packet sender) per IP version
+        variants = (
+            (TestL3fwdacl.exact_rule_list_ipv4,
+             self.create_acl_ipv4_db, self.send_ipv4_packet_match),
+            (TestL3fwdacl.exact_rule_list_ipv6,
+             self.create_acl_ipv6_db, self.send_ipv6_packet_match),
+        )
+
+        for rules, create_db, send_match in variants:
+            rules[0]["Port"] = self.dut_ports[0]
+            rules[1]["Port"] = self.dut_ports[1]
+            create_db([rules[0], rules[1]])
+
+            self.start_l3fwdacl()
+
+            tx_port = self.tester.get_local_port(self.dut_ports[0])
+            rx_port = self.tester.get_local_port(self.dut_ports[1])
+
+            # The first rule is checked with tx_port as both port arguments,
+            # the second with the tester port facing the second DUT port.
+            out_first = send_match(rules[0], tx_port, tx_port)
+            out_second = send_match(rules[1], tx_port, rx_port)
+
+            # Send Ctrl-C to the DUT session before evaluating the results
+            self.dut.send_expect("^C", "#", 20)
+
+            self.verify('1' in out_first, "Rx port0 not receive expected packet")
+            self.verify('1' in out_second, "Rx port1 not receive expected packet")
+
+        self.rule_cfg_delete(TestL3fwdacl.acl_ipv4_db, TestL3fwdacl.acl_ipv6_db)
+
+        return "SUCCESS"
+
+    def test_l3fwdacl_lpm_route(self):
+        """
+        l3fwd Access Control match Lpm route rule test
+
+        For IPv4 and then IPv6: set the "Port" of the first two LPM route
+        rules to the first two DUT ports, write them to the rule database,
+        start l3fwd-acl, send one matching packet per rule and verify that
+        each send helper's output contains '1'.
+        """
+
+        self.rule_cfg_init(TestL3fwdacl.acl_ipv4_db, TestL3fwdacl.acl_ipv6_db)
+
+        # (rule list, database writer, matching-packet sender) per IP version
+        variants = (
+            (TestL3fwdacl.lpm_rule_list_ipv4,
+             self.create_acl_ipv4_db, self.send_ipv4_packet_match),
+            (TestL3fwdacl.lpm_rule_list_ipv6,
+             self.create_acl_ipv6_db, self.send_ipv6_packet_match),
+        )
+
+        for rules, create_db, send_match in variants:
+            rules[0]["Port"] = self.dut_ports[0]
+            rules[1]["Port"] = self.dut_ports[1]
+            create_db([rules[0], rules[1]])
+
+            self.start_l3fwdacl()
+
+            tx_port = self.tester.get_local_port(self.dut_ports[0])
+            rx_port = self.tester.get_local_port(self.dut_ports[1])
+
+            # The first rule is checked with tx_port as both port arguments,
+            # the second with the tester port facing the second DUT port.
+            out_first = send_match(rules[0], tx_port, tx_port)
+            out_second = send_match(rules[1], tx_port, rx_port)
+
+            # Send Ctrl-C to the DUT session before evaluating the results
+            self.dut.send_expect("^C", "#", 20)
+
+            self.verify('1' in out_first, "Rx port0 not receive expected packet")
+            self.verify('1' in out_second, "Rx port1 not receive expected packet")
+
+        self.rule_cfg_delete(TestL3fwdacl.acl_ipv4_db, TestL3fwdacl.acl_ipv6_db)
+
+        return "SUCCESS"
+
+    def test_l3fwdacl_scalar(self):
+        """
+        l3fwd Access Control match with Scalar function test
+
+        For IPv4 and then IPv6: write the first scalar rule plus the default
+        rule to the rule database, start l3fwd-acl with scalar=True, then
+        send one matching and one non-matching packet.  The matching send
+        must not report '1' and the non-matching send must.
+        """
+
+        self.rule_cfg_init(TestL3fwdacl.acl_ipv4_db, TestL3fwdacl.acl_ipv6_db)
+
+        # (rule list, database writer, matching sender, non-matching sender)
+        variants = (
+            (TestL3fwdacl.scalar_rule_list_ipv4, self.create_acl_ipv4_db,
+             self.send_ipv4_packet_match, self.send_ipv4_packet_not_match),
+            (TestL3fwdacl.scalar_rule_list_ipv6, self.create_acl_ipv6_db,
+             self.send_ipv6_packet_match, self.send_ipv6_packet_not_match),
+        )
+
+        for rules, create_db, send_match, send_not_match in variants:
+            scalar_rule = rules[0]
+            create_db([scalar_rule, TestL3fwdacl.default_rule])
+
+            self.start_l3fwdacl(scalar=True)
+
+            tx_port = self.tester.get_local_port(self.dut_ports[0])
+            rx_port = self.tester.get_local_port(self.dut_ports[1])
+
+            out_match = send_match(scalar_rule, tx_port, rx_port)
+            out_miss = send_not_match(scalar_rule, tx_port, rx_port)
+
+            # Send Ctrl-C to the DUT session before evaluating the results
+            self.dut.send_expect("^C", "#", 20)
+
+            self.verify('1' not in out_match, "Rx port received unexpected packet")
+            self.verify('1' in out_miss, "Rx port not receive expected packet")
+
+        self.rule_cfg_delete(TestL3fwdacl.acl_ipv4_db, TestL3fwdacl.acl_ipv6_db)
+
+        return "SUCCESS"
+
+    def test_l3fwdacl_invalid(self):
+        """
+        l3fwd Access Control handle Invalid rule test
+
+        Feeds every entry of the invalid IPv4 / IPv6 rule lists to
+        invalid_acl_ipv4_test / invalid_acl_ipv6_test, then writes the
+        invalid-port rule of each IP version to its database, launches
+        l3fwd-acl and expects "fwd number illegal" in the output.
+        """
+
+        self.rule_cfg_init(TestL3fwdacl.acl_ipv4_db, TestL3fwdacl.acl_ipv6_db)
+
+        for bad_rule in TestL3fwdacl.invalid_rule_ipv4_list:
+            self.invalid_acl_ipv4_test(bad_rule)
+
+        for bad_rule in TestL3fwdacl.invalid_rule_ipv6_list:
+            self.invalid_acl_ipv6_test(bad_rule)
+
+        launch_template = '%s/l3fwd-acl -c %s -n %d -- -p %s --config="(%d,0,2),(%d,0,3)" --rule_ipv4="%s" --rule_ipv6="%s" --scalar'
+
+        # (rule with an invalid port, writer for the matching database)
+        port_checks = (
+            (TestL3fwdacl.invalid_port_rule_ipv4, self.create_acl_ipv4_db),
+            (TestL3fwdacl.invalid_port_rule_ipv6, self.create_acl_ipv6_db),
+        )
+
+        for port_rule, create_db in port_checks:
+            create_db([port_rule])
+
+            # The command line is rebuilt for every launch
+            cmdline = launch_template % \
+                (TestL3fwdacl.exe_path, self.core_mask, self.dut.get_memory_channels(),
+                 self.port_mask, self.dut_ports[0], self.dut_ports[1],
+                 TestL3fwdacl.acl_ipv4_db, TestL3fwdacl.acl_ipv6_db)
+
+            # Waits for the shell prompt rather than an application banner
+            out = self.dut.send_expect(cmdline, "# ", 30)
+            self.verify("fwd number illegal" in out, "l3fwd not detect invalid port")
+
+        self.rule_cfg_delete(TestL3fwdacl.acl_ipv4_db, TestL3fwdacl.acl_ipv6_db)
+
+        return "SUCCESS"
+
+    def test_perf_l3fwdacl(self):
+        """
+        l3fwd Access Control performance test
+
+        For every entry of performance_cases and every key of
+        core_list_configs, launch l3fwd-acl with the case's rule database,
+        run performance_acl_test, store the returned bps / pps pair and
+        stop the application.  All stored results are printed as a table
+        at the end.
+        """
+        # NOTE(review): unlike the functional tests, this method never calls
+        # rule_cfg_delete after rule_cfg_init - confirm that is intended.
+
+        self.rule_cfg_init(None, TestL3fwdacl.acl_ipv6_db)
+        self.get_core_list()
+
+        launch_template = '%s/l3fwd-acl -c %s -n %d -- -p %s -P --config=%s --rule_ipv4="%s" --rule_ipv6="%s"'
+
+        for case in TestL3fwdacl.performance_cases:
+            for core_config in TestL3fwdacl.core_list_configs.keys():
+                # Only the two-socket layout uses the two-socket port mask
+                portmask = (self.port_mask_2_sockets
+                            if core_config == "2S/1C/1T"
+                            else self.port_mask_1_socket)
+
+                self.change_ports_in_rules_files(core_config)
+
+                cmdline = launch_template % (
+                    TestL3fwdacl.exe_path,
+                    TestL3fwdacl.core_list_configs[core_config]["mask"],
+                    self.dut.get_memory_channels(),
+                    portmask,
+                    TestL3fwdacl.core_list_configs[core_config]["config"],
+                    case["DB"],
+                    TestL3fwdacl.acl_ipv6_db)
+
+                # Wait for "L3FWD:" in the application output
+                out = self.dut.send_expect(cmdline, "L3FWD:", 120)
+                self.verify("error" not in out, "l3fwd launch fail")
+
+                bps, pps = self.performance_acl_test(case, core_config)
+                self.store_performance_result(case, core_config, bps, pps)
+
+                self.dut.send_expect("^C", "#", 20)
+
+        # One table column per key of the stored result records
+        columns = ('RuleDB', 'Core/Thread', 'Gb/s', 'Mpps')
+        dts.results_table_add_header(list(columns))
+        for report in TestL3fwdacl.performance_results:
+            dts.results_table_add_row([report[column] for column in columns])
+
+        dts.results_table_print()
+        return "SUCCESS"
-- 
1.7.4.4



More information about the dts mailing list