[dts] [PATCH 2/2] vxlan_sample: check l3 and l4 checksum on tso test

Lijuan Tu lijuanx.a.tu at intel.com
Fri Sep 2 04:59:29 CEST 2016


1. Update calls to helper functions that the framework moved from dts.py to utils.py.
2. Check the inner L3 and L4 checksums in the TSO test.

Signed-off-by: Lijuan Tu <lijuanx.a.tu at intel.com>
---
 tests/TestSuite_vxlan_sample.py | 54 ++++++++++++++++++++++++++++++-----------
 1 file changed, 40 insertions(+), 14 deletions(-)

diff --git a/tests/TestSuite_vxlan_sample.py b/tests/TestSuite_vxlan_sample.py
index 189bc38..60d4f2a 100644
--- a/tests/TestSuite_vxlan_sample.py
+++ b/tests/TestSuite_vxlan_sample.py
@@ -37,6 +37,7 @@ Vxlan sample test suite.
 
 import os
 import dts
+import utils
 import string
 import re
 import time
@@ -61,6 +62,7 @@ from scapy.config import conf
 from scapy.route import *
 
 
 PACKET_LEN = 128
 
 
@@ -220,7 +222,7 @@ class TestVxlanSample(TestCase):
             if self.vm_dut is None:
                 raise Exception("Set up VM ENV failed!")
         except Exception as e:
-            print dts.RED("Failure for %s" % str(e))
+            print utils.RED("Failure for %s" % str(e))
 
         # create another vm
         if vm_num == 2:
@@ -236,6 +238,7 @@ class TestVxlanSample(TestCase):
         if self.vm:
             self.vm.stop()
             self.vm = None
+            time.sleep(3)
 
     def mac_address_add(self, number):
         if number > 15:
@@ -302,6 +305,7 @@ class TestVxlanSample(TestCase):
         self.tester.session.copy_file_from(self.capture_file)
 
         if os.path.isfile('vxlan_cap.pcap'):
+            self.verify(os.path.getsize('vxlan_cap.pcap') != 0, "Packets receive error")
             pkts = rdpcap('vxlan_cap.pcap')
         else:
             pkts = []
@@ -312,12 +316,13 @@ class TestVxlanSample(TestCase):
         case_pass = True
         tester_recv_port = self.tester.get_local_port(self.pf)
         tester_iface = self.tester.get_interface(tester_recv_port)
+        tester_smac = self.tester.get_mac(tester_recv_port)
 
         if pkt_type == "normal_udp":
             self.start_capture(tester_iface, pkt_smac=self.pf_mac)
             self.tester.scapy_append(
-                'sendp([Ether(dst="%s")/IP()/UDP()/Raw("X"*18)], iface="%s")'
-                % (self.pf_mac, tester_iface))
+                'sendp([Ether(dst="%s",src="%s")/IP()/UDP()/Raw("X"*18)], iface="%s")'
+                % (self.pf_mac, tester_smac, tester_iface))
             self.tester.scapy_execute()
             time.sleep(5)
 
@@ -331,10 +336,10 @@ class TestVxlanSample(TestCase):
                     self.verify(ord(payload[i]) == 88, "Check udp data failed")
             except:
                 case_pass = False
-                print dts.RED("Failure in checking packet payload")
+                print utils.RED("Failure in checking packet payload")
 
             if case_pass:
-                print dts.GREEN("Check normal udp packet forward pass on "
+                print utils.GREEN("Check normal udp packet forward pass on "
                                 "virtIO port %d" % vf_id)
 
         if pkt_type == "vxlan_udp_decap":
@@ -366,10 +371,10 @@ class TestVxlanSample(TestCase):
                     self.verify(ord(payload[i]) == 88, "Check udp data failed")
             except:
                 case_pass = False
-                print dts.RED("Failure in checking packet payload")
+                print utils.RED("Failure in checking packet payload")
 
             if case_pass:
-                print dts.GREEN("Check vxlan packet decap pass on virtIO port"
+                print utils.GREEN("Check vxlan packet decap pass on virtIO port"
                                 " %d" % vf_id)
 
         if pkt_type == "vxlan_udp":
@@ -390,6 +395,7 @@ class TestVxlanSample(TestCase):
 
             # transfer capture pcap to dts server
             pkts = self.transfer_capture_file()
+
             # check packet number and payload
             self.verify(len(pkts) >= 1, "Failed to capture packets")
             self.verify(pkts[0].haslayer(Vxlan) == 1,
@@ -400,10 +406,10 @@ class TestVxlanSample(TestCase):
                     self.verify(ord(payload[i]) == 88, "Check udp data failed")
             except:
                 case_pass = False
-                print dts.RED("Failure in checking packet payload")
+                print utils.RED("Failure in checking packet payload")
 
             if case_pass:
-                print dts.GREEN("Check vxlan packet decap and encap pass on "
+                print utils.GREEN("Check vxlan packet decap and encap pass on "
                                 "virtIO port %d" % vf_id)
 
         if pkt_type == "vxlan_udp_chksum":
@@ -423,7 +429,7 @@ class TestVxlanSample(TestCase):
             vxlan_pkt = VxlanTestConfig(self, **params)
             vxlan_pkt.create_pcap()
             chksums_ref = vxlan_pkt.get_chksums()
-            print dts.GREEN("Checksum reference: %s" % chksums_ref)
+            print utils.GREEN("Checksum reference: %s" % chksums_ref)
 
             params['inner_ip_invalid'] = 1
             params['inner_l4_invalid'] = 1
@@ -443,14 +449,14 @@ class TestVxlanSample(TestCase):
             self.verify(pkts[0].haslayer(Vxlan) == 1,
                         "Packet not encapsulated")
             chksums = vxlan_pkt.get_chksums(pcap='vxlan_cap.pcap')
-            print dts.GREEN("Checksum : %s" % chksums)
+            print utils.GREEN("Checksum : %s" % chksums)
             for key in chksums_ref:
                 if 'inner' in key:  # only check inner packet chksum
                     self.verify(chksums[key] == chksums_ref[key],
                                 "%s not matched to %s"
                                 % (key, chksums_ref[key]))
 
-            print dts.GREEN("%s checksum pass" % params['inner_l4_type'])
+            print utils.GREEN("%s checksum pass" % params['inner_l4_type'])
 
         if pkt_type == "vxlan_tcp_tso":
             # create vxlan packet pf mac + vni=1000 + inner virtIO port0 mac +
@@ -461,6 +467,7 @@ class TestVxlanSample(TestCase):
             mac_incr = 2 * vm_id + vf_id
             params['inner_mac_dst'] = self.mac_address_add(mac_incr)
             params['payload_size'] = 892  # 256 + 256 + 256 + 124
+
             # extract reference chksum value
             vxlan_pkt = VxlanTestConfig(self, **params)
             vxlan_pkt.create_pcap()
@@ -474,6 +481,25 @@ class TestVxlanSample(TestCase):
             pkts = self.transfer_capture_file()
             # check packet number and payload
             self.verify(len(pkts) == 4, "Failed to capture tso packets")
+
+            # recalculate each inner checksum and check it against the captured value
+            for pkt in pkts:
+                inner = pkt[Vxlan]
+                inner_ip_chksum = inner[IP].chksum
+                del inner.chksum
+                inner[IP] = inner[IP].__class__(str(inner[IP]))
+                inner_ip_chksum_ref = inner[IP].chksum
+                print utils.GREEN("inner ip checksum reference: %x" % inner_ip_chksum_ref)
+                print utils.GREEN("inner ip checksum: %x" % inner_ip_chksum)
+                self.verify(inner_ip_chksum == inner_ip_chksum_ref, "inner ip checksum error")
+                inner_l4_chksum = inner[params['inner_l4_type']].chksum
+                del inner[params['inner_l4_type']].chksum
+                inner[params['inner_l4_type']] = inner[params['inner_l4_type']].__class__(str(inner[params['inner_l4_type']]))
+                inner_l4_chksum_ref =  inner[params['inner_l4_type']].chksum
+                print utils.GREEN("inner l4 checksum reference: %x" % inner_l4_chksum_ref)
+                print utils.GREEN("inner l4 checksum: %x" % inner_l4_chksum)
+                self.verify(inner_l4_chksum == inner_l4_chksum_ref, "inner %s checksum error" % params['inner_l4_type'])
+
             length = 0
             for pkt in pkts:
                 self.verify(pkt.haslayer(Vxlan) == 1,
@@ -485,11 +511,11 @@ class TestVxlanSample(TestCase):
                     length += len(payload)
                 except:
                     case_pass = False
-                    print dts.RED("Failure in checking tso payload")
+                    print utils.RED("Failure in checking tso payload")
 
             self.verify(length == 892, "Total tcp payload size not match")
             if case_pass:
-                print dts.GREEN("Vxlan packet tso pass on virtIO port %d"
+                print utils.GREEN("Vxlan packet tso pass on virtIO port %d"
                                 % vf_id)
 
     def test_perf_vxlan_sample(self):
-- 
1.9.3



More information about the dts mailing list