[dts] [PATCH V1] add test suite link_status_interrupt

Lijuan Tu lijuanx.a.tu at intel.com
Fri Feb 10 07:50:37 CET 2017


Signed-off-by: Lijuan Tu <lijuanx.a.tu at intel.com>
---
 tests/TestSuite_link_status_interrupt.py | 288 ++++++++++++++-----------------
 1 file changed, 132 insertions(+), 156 deletions(-)

diff --git a/tests/TestSuite_link_status_interrupt.py b/tests/TestSuite_link_status_interrupt.py
index c50a99d..ae205cc 100644
--- a/tests/TestSuite_link_status_interrupt.py
+++ b/tests/TestSuite_link_status_interrupt.py
@@ -1,87 +1,77 @@
-# <COPYRIGHT_TAG>
+# BSD LICENSE
+#
+# Copyright(c) 2010-2017 Intel Corporation. All rights reserved.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+#   * Redistributions of source code must retain the above copyright
+#     notice, this list of conditions and the following disclaimer.
+#   * Redistributions in binary form must reproduce the above copyright
+#     notice, this list of conditions and the following disclaimer in
+#     the documentation and/or other materials provided with the
+#     distribution.
+#   * Neither the name of Intel Corporation nor the names of its
+#     contributors may be used to endorse or promote products derived
+#     from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
 
 """
 DPDK Test suite.
-
-Link Status Detection
-
+Test link status.
 """
 
-# NOTE: These tests generally won't work in automated mode since the
-# link doesn't stay down unless the cable is actually removed. The code
-# is left here for legacy reasons.
-
-
 import utils
+import string
+import time
 import re
-
-testPorts = []
-intr_mode = ['"intr_mode=random"',
-             '"intr_mode=msix"', '"intr_mode=legacy"', '']
-intr_mode_output = ['Error: bad parameter - random', 'Use MSIX interrupt',
-                    'Use legacy interrupt', 'Use MSIX interrupt by default']
-
-
 from test_case import TestCase
-
-#
-#
-# Test class.
-#
+from packet import Packet, sniff_packets, load_sniff_packets
 
 
 class TestLinkStatusInterrupt(TestCase):
 
-    #
-    #
-    #
-    # Test cases.
-    #
-
     def set_up_all(self):
         """
         Run at the start of each test suite.
-
-
-        Link Status Interrupt Prerequisites
         """
+        self.dut_ports = self.dut.get_ports(self.nic)
+        self.verify(len(self.dut_ports) >= 2, "Insufficient ports")
+        cores = self.dut.get_core_list("1S/4C/1T")
+        self.coremask = utils.create_mask(cores)
+        self.portmask = utils.create_mask(self.dut_ports)
+
+        self.path = "./examples/link_status_interrupt/build/link_status_interrupt"
 
-        dutPorts = self.dut.get_ports(self.nic)
-        self.verify(len(dutPorts) > 1, "Insufficient ports for " + self.nic)
-        for n in range(2):
-            testPorts.append(dutPorts[n])
-            inter = self.tester.get_interface(
-                self.tester.get_local_port(testPorts[n]))
-            self.tester.send_expect("ifconfig %s up" % inter, "# ", 5)
-        out = self.dut.send_expect(
-            "make -C examples/link_status_interrupt", "# ")
-        self.verify("Error" not in out, "Compilation error 1")
-        self.verify("No such file" not in out, "Compilation error 2")
+        # build sample app
+        out = self.dut.build_dpdk_apps("./examples/link_status_interrupt")
+        self.verify("Error" not in out, "compilation error 1")
+        self.verify("No such file" not in out, "compilation error 2")
 
     def set_link_status_and_verify(self, dutPort, status):
         """
-        In registered callback...
-        Event type: LSC interrupt
-        Port 0 Link Up - speed 10000 Mbps - full-duplex
-
-        In registered callback...
-        Event type: LSC interrupt
-        Port 0 Link Down
+        set link status verify results
         """
-
-        inter = self.tester.get_interface(self.tester.get_local_port(dutPort))
-        self.tester.send_expect("ifconfig %s %s" % (inter, status), "# ", 10)
-        self.dut.send_expect("", "Port %s Link %s" %
-                             (dutPort, status.capitalize()), 60)
-        out = self.dut.send_expect("", "Aggregate statistics", 60)
-        exp = r"Statistics for port (\d+) -+\r\n" + \
-            "Link status:\s+Link (up|down)\r\n"
-        pattern = re.compile(exp)
-        info = pattern.findall(out)
-        if info[0][0] == repr(dutPort):
-            self.verify(info[0][1] == status, "Link status change port error")
-        else:
-            self.verify(info[1][1] == status, "Link status change hello error")
+        self.intf = self.tester.get_interface(
+            self.tester.get_local_port(dutPort))
+        self.tester.send_expect("ifconfig %s %s" %
+                                (self.intf, status.lower()), "# ", 10)
+        verify_point = "Port %s Link %s" % (dutPort, status)
+        self.dut.send_expect("", verify_point, 60)
 
     def set_up(self):
         """
@@ -91,111 +81,97 @@ class TestLinkStatusInterrupt(TestCase):
 
     def test_link_status_interrupt_change(self):
         """
-        Link status change.
+        Verify Link status change  
         """
-
-        memChannel = self.dut.get_memory_channels()
-        portMask = utils.create_mask(testPorts)
-        if self.drivername in ["igb_uio"]:
-            cmdline = "./examples/link_status_interrupt/build/link_status_interrupt -c f -n %s -- -q 2 -p %s" % (
-                memChannel, portMask)
-        elif self.drivername in ["vfio-pci"]:
-            cmdline = "./examples/link_status_interrupt/build/link_status_interrupt -c f -n %s --vfio-intr=intx  -- -q 2 -p %s" % (
-                memChannel, portMask)
-        else:
-            print "unknow driver"
-        for n in range(len(intr_mode)):
-            if self.drivername in ["igb_uio"]:
-                self.dut.send_expect("rmmod igb_uio", "# ")
+        if self.drivername == "igb_uio":
+            cmdline = self.path + " -c %s -n %s -- -p %s " % (
+                self.coremask, self.dut.get_memory_channels(), self.portmask)
+            for mode in ["legacy", "msix"]:
+                self.dut.send_expect("rmmod -f igb_uio", "#", 20)
                 self.dut.send_expect(
-                    "insmod %s/kmod/igb_uio.ko %s" % (self.target, intr_mode[n]), "# ")
-                out = self.dut.send_expect(
-                    "dmesg -c | grep '\<%s\>'" % (intr_mode_output[n]), "# ")
-                self.verify(
-                    intr_mode_output[n] in out, "Fail to insmod igb_uio " + intr_mode[n])
-                if n == 0:
-                    continue
-            self.dut.send_expect(cmdline, "Aggregate statistics", 605)
-            self.dut.send_expect(
-                "", "Port %s Link Up.+\r\n" % (testPorts[1]), 5)
-            self.set_link_status_and_verify(testPorts[0], 'down')
-            self.set_link_status_and_verify(testPorts[0], 'up')
-            self.set_link_status_and_verify(testPorts[1], 'down')
-            self.set_link_status_and_verify(testPorts[1], 'up')
-            self.dut.send_expect("^C", "# ")
+                    'insmod %s/kmod/igb_uio.ko "intr_mode=%s"' % (self.target, mode), "# ")
+                self.dut.bind_interfaces_linux()
+                self.dut.send_expect(cmdline, "Aggregate statistics", 60)
+                self.set_link_status_and_verify(self.dut_ports[0], 'Down')
+                self.set_link_status_and_verify(self.dut_ports[0], 'Up')
+                self.dut.send_expect("^C", "#", 60)
+        elif self.drivername == "vfio-pci":
+            for mode in ["legacy", "msi", "msix"]:
+                cmdline = self.path + " -c %s -n %s --vfio-intr=%s -- -p %s" % (
+                    self.coremask, self.dut.get_memory_channels(), mode, self.portmask)
+                self.dut.send_expect(cmdline, "Aggregate statistics", 60)
+                self.set_link_status_and_verify(self.dut_ports[0], 'Down')
+                self.set_link_status_and_verify(self.dut_ports[0], 'Up')
+                self.dut.send_expect("^C", "#", 60)
 
     def test_link_status_interrupt_port_available(self):
         """
-        Port available.
+        interrupt all port link, then link them,
+        sent packet, check packets received.
         """
-
-        memChannel = self.dut.get_memory_channels()
-        portMask = utils.create_mask(testPorts)
-        if self.drivername in ["igb_uio"]:
-            cmdline = "./examples/link_status_interrupt/build/link_status_interrupt -c f -n %s -- -q 2 -p %s" % (
-                memChannel, portMask)
-        elif self.drivername in ["vfio-pci"]:
-            cmdline = "./examples/link_status_interrupt/build/link_status_interrupt -c f -n %s --vfio-intr=intx -- -q 2 -p %s " % (
-                memChannel, portMask)
-        else:
-            print "unknow driver"
-        for n in range(1, len(intr_mode)):
-            if self.drivername in ["igb_uio"]:
-                self.dut.send_expect("rmmod igb_uio", "# ")
+        if self.drivername == "igb_uio":
+            cmdline = self.path + " -c %s -n %s -- -p %s " % (
+                self.coremask, self.dut.get_memory_channels(), self.portmask)
+            for mode in ["legacy", "msix"]:
+                self.dut.send_expect("rmmod -f igb_uio", "#", 20)
                 self.dut.send_expect(
-                    "insmod %s/kmod/igb_uio.ko %s" % (self.target, intr_mode[n]), "# ")
-                out = self.dut.send_expect(
-                    "dmesg -c | grep '\<%s\>'" % (intr_mode_output[n]), "# ")
-                self.verify(
-                    intr_mode_output[n] in out, "Fail to insmod igb_uio " + intr_mode[n])
-            self.dut.send_expect(cmdline, "Aggregate statistics", 60)
-            self.dut.send_expect(
-                "", "Port %s Link Up.+\r\n" % (testPorts[1]), 5)
-            self.set_link_status_and_verify(testPorts[0], 'down')
-            self.set_link_status_and_verify(testPorts[1], 'down')
-            self.set_link_status_and_verify(testPorts[0], 'up')
-            self.set_link_status_and_verify(testPorts[1], 'up')
-            for m in [0, 1]:
-                txPort = self.tester.get_local_port(testPorts[m])
-                rxPort = self.tester.get_local_port(testPorts[1 - m])
-                txItf = self.tester.get_interface(txPort)
-                rxItf = self.tester.get_interface(rxPort)
-                self.tester.scapy_background()
-                self.tester.scapy_append(
-                    'p = sniff(iface="%s", count=1)' % rxItf)
-                self.tester.scapy_append('nr_packets=len(p)')
-                self.tester.scapy_append('RESULT = str(nr_packets)')
-                self.tester.scapy_foreground()
-                self.tester.scapy_append(
-                    'sendp([Ether()/IP()/UDP()/("X"*46)], iface="%s")' % txItf)
-                self.tester.scapy_execute()
-                nr_packets = self.tester.scapy_get_result()
-                self.verify(nr_packets == "1", "Fail to switch L2 frame")
-            self.dut.send_expect("^C", "# ")
-
-    def test_link_status_interrupt_recovery(self):
-        """
-        Recovery.
-        """
-        if self.drivername in ["igb_uio"]:
-            self.dut.send_expect("^C", "# ")
-            self.dut.send_expect("rmmod igb_uio", "# ")
-            self.dut.send_expect(
-                "insmod %s/kmod/igb_uio.ko" % (self.target), "# ")
-            out = self.dut.send_expect(
-                "dmesg -c | grep '\<Use MSIX interrupt by default\>'", "# ")
-            self.verify(
-                'Use MSIX interrupt by default' in out, "Fail to recovery default igb_uio")
-        elif self.drivername in ["vfio-pci"]:
-            self.verify(Ture, "not need run this case, when used vfio driver")
-        else:
-            print "unknow driver"
+                    'insmod %s/kmod/igb_uio.ko "intr_mode=%s"' % (self.target, mode), "# ")
+                self.dut.bind_interfaces_linux()
+                self.dut.send_expect(cmdline, "Aggregate statistics", 60)
+                for port in self.dut_ports:
+                    self.set_link_status_and_verify(
+                        self.dut_ports[port], 'Down')
+                time.sleep(10)
+                for port in self.dut_ports:
+                    self.set_link_status_and_verify(self.dut_ports[port], 'Up')
+                self.scapy_send_packet(1)
+                out = self.dut.get_session_output(timeout=60)
+                pkt_send = re.findall("Total packets sent:\s*(\d*)", out)
+                pkt_received = re.findall(
+                    "Total packets received:\s*(\d*)", out)
+                self.verify(pkt_send == pkt_received == '1',
+                            "Error: sent packets != received packets")
+                self.dut.send_expect("^C", "#", 60)
+        elif self.drivername == "vfio-pci":
+            for mode in ["legacy", "msi", "msix"]:
+                cmdline = self.path + " -c %s -n %s --vfio-intr=%s -- -p %s" % (
+                    self.coremask, self.dut.get_memory_channels(), mode, self.portmask)
+                self.dut.send_expect(cmdline, "Aggregate statistics", 60)
+                for port in self.dut_ports:
+                    self.set_link_status_and_verify(
+                        self.dut_ports[port], 'Down')
+                time.sleep(10)
+                for port in self.dut_ports:
+                    self.set_link_status_and_verify(self.dut_ports[port], 'Up')
+                self.scapy_send_packet(1)
+                out = self.dut.get_session_output(timeout=60)
+                pkt_send = re.findall("Total packets sent:\s*(\d*)", out)
+                pkt_received = re.findall(
+                    "Total packets received:\s*(\d*)", out)
+                self.verify(pkt_send == pkt_received == '1',
+                            "Error: sent packets != received packets")
+                self.dut.send_expect("^C", "#", 60)
+
+    def scapy_send_packet(self, num=1):
+        """
+        Send a packet to port
+        """
+        self.dmac = self.dut.get_mac_address(self.dut_ports[0])
+        txport = self.tester.get_local_port(self.dut_ports[0])
+        self.txItf = self.tester.get_interface(txport)
+        pkt = Packet(pkt_type='UDP')
+        pkt.config_layer('ether', {'dst': self.dmac})
+        pkt.send_pkt(tx_port=self.txItf, count=num)
 
     def tear_down(self):
         """
         Run after each test case.
         """
-        pass
+        self.dut.kill_all()
+        time.sleep(2)
+        for port in self.dut_ports:
+            intf = self.tester.get_interface(self.tester.get_local_port(port))
+            self.tester.send_expect("ifconfig %s up" % intf, "# ", 10)
 
     def tear_down_all(self):
         """
-- 
1.9.3



More information about the dts mailing list