[PATCH v1 2/2] dts: add qinq test suite

Dean Marx dmarx at iol.unh.edu
Thu Jul 17 22:57:18 CEST 2025


Add QinQ test suite, which verifies PMD behavior when
sending QinQ (IEEE 802.1ad) packets.

Signed-off-by: Dean Marx <dmarx at iol.unh.edu>
---
 dts/tests/TestSuite_qinq.py | 329 ++++++++++++++++++++++++++++++++++++
 1 file changed, 329 insertions(+)
 create mode 100644 dts/tests/TestSuite_qinq.py

diff --git a/dts/tests/TestSuite_qinq.py b/dts/tests/TestSuite_qinq.py
new file mode 100644
index 0000000000..7412e9bf04
--- /dev/null
+++ b/dts/tests/TestSuite_qinq.py
@@ -0,0 +1,329 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2025 University of New Hampshire
+
+"""QinQ (802.1ad) Test Suite.
+
+This test suite verifies the correctness and capability of DPDK Poll Mode Drivers (PMDs)
+in handling QinQ-tagged Ethernet frames, which contain a pair of stacked VLAN headers
+(outer S-VLAN and inner C-VLAN). These tests ensure that both software and hardware offloads
+related to QinQ behave as expected across different NIC vendors and PMD implementations.
+
+"""
+
+from scapy.layers.inet import IP, UDP
+from scapy.layers.l2 import Dot1AD, Dot1Q, Ether
+from scapy.packet import Packet
+
+from framework.remote_session.testpmd_shell import TestPmdShell
+from framework.test_suite import TestSuite, func_test
+from framework.testbed_model.capability import NicCapability, requires
+
+
+class TestQinq(TestSuite):
+    """QinQ test suite.
+
+    This suite consists of 4 test cases:
+    1. QinQ Filter: Enable VLAN filter and verify packets with mismatched VLAN IDs are dropped,
+        and packets with matching VLAN IDs are received.
+    2. QinQ Forwarding: Send a QinQ packet and verify the received packet contains
+        both QinQ/VLAN layers.
+    3. Mismatched TPID: Send a Qinq packet with an invalid TPID (not 0x8100/0x88a8) and verify the
+        mismatched VLAN layer is interpreted as part of the Ethertype (expected DPDK behavior.)
+    4. VLAN Strip: Enable VLAN stripping and verify sent packets are received with the
+        expected VLAN/QinQ layers.
+    5. QinQ Strip: Enable VLAN/QinQ stripping and verify sent packets are received with the
+        expected VLAN/QinQ layers.
+    """
+
+    def send_packet_and_verify(
+        self, packet: Packet, testpmd: TestPmdShell, should_receive: bool
+    ) -> None:
+        """Send packet and verify reception.
+
+        Args:
+            packet: The packet to send to testpmd.
+            testpmd: The testpmd session to send commands to.
+            should_receive: If :data:`True`, verifies packet was received.
+        """
+        testpmd.start()
+        received = self.send_packet_and_capture(packet=packet)
+        if should_receive:
+            self.verify(received != [], "Packet was dropped when it should have been received.")
+        else:
+            self.verify(received == [], "Packet was received when it should have been dropped.")
+
+    @requires(NicCapability.RX_OFFLOAD_VLAN_EXTEND)
+    @func_test
+    def test_qinq_filter(self) -> None:
+        """QinQ Rx filter test case.
+
+        Steps:
+            Launch testpmd with mac forwarding mode.
+            Enable VLAN filter/extend modes on port 0.
+            Add VLAN tag 100 to the filter on port 0.
+            Send test packet and capture verbose output.
+
+        Verify:
+            Packet with matching VLAN ID is received.
+            Packet with mismatched VLAN ID is dropped.
+        """
+        packets = [
+            Ether(dst="00:11:22:33:44:55", src="66:77:88:99:aa:bb")
+            / Dot1Q(vlan=100)
+            / Dot1Q(vlan=200)
+            / IP(dst="192.0.2.1", src="198.51.100.1")
+            / UDP(dport=1234, sport=5678),
+            Ether(dst="00:11:22:33:44:55", src="66:77:88:99:aa:bb")
+            / Dot1Q(vlan=101)
+            / Dot1Q(vlan=200)
+            / IP(dst="192.0.2.1", src="198.51.100.1")
+            / UDP(dport=1234, sport=5678),
+        ]
+        with TestPmdShell() as testpmd:
+            testpmd.stop_all_ports()
+            testpmd.set_vlan_filter(0, True)
+            testpmd.set_vlan_extend(0, True)
+            testpmd.start_all_ports()
+            testpmd.rx_vlan(100, 0, True)
+            self.send_packet_and_verify(packets[0], testpmd, should_receive=True)
+            self.send_packet_and_verify(packets[1], testpmd, should_receive=False)
+
+    @func_test
+    def test_qinq_forwarding(self) -> None:
+        """QinQ Rx filter test case.
+
+        Steps:
+            Launch testpmd with mac forwarding mode.
+            Disable VLAN filter mode on port 0.
+            Send test packet and capture verbose output.
+
+        Verify:
+            Check that the received packet has two separate VLAN layers in proper QinQ fashion.
+            Check that the received packet outer and inner VLAN layer has the appropriate ID.
+        """
+        test_packet = (
+            Ether(dst="ff:ff:ff:ff:ff:ff")
+            / Dot1AD(vlan=100)
+            / Dot1Q(vlan=200)
+            / IP(dst="1.2.3.4")
+            / UDP(dport=1234, sport=4321)
+        )
+        with TestPmdShell() as testpmd:
+            testpmd.stop_all_ports()
+            testpmd.set_vlan_filter(0, False)
+            testpmd.start_all_ports()
+            testpmd.start()
+            received_packet = self.send_packet_and_capture(test_packet)
+
+            self.verify(
+                received_packet != [], "Packet was dropped when it should have been received."
+            )
+
+            for packet in received_packet:
+                vlan_tags = packet.getlayer(Dot1AD), packet.getlayer(Dot1Q)
+                self.verify(len(vlan_tags) == 2, f"Expected 2 VLAN tags, found {len(vlan_tags)}")
+
+                if packet.haslayer(Dot1Q):
+                    outer_vlan_id = packet[Dot1Q].vlan
+                    self.verify(
+                        outer_vlan_id == 100,
+                        f"Outer VLAN ID was {outer_vlan_id} when it should have been 100.",
+                    )
+                else:
+                    self.verify(False, "VLAN layer not found in received packet.")
+
+                if packet[Dot1Q].haslayer(Dot1Q):
+                    inner_vlan_id = packet[Dot1Q].payload[Dot1Q].vlan
+                    self.verify(
+                        inner_vlan_id == 200,
+                        f"Inner VLAN ID was {inner_vlan_id} when it should have been 200",
+                    )
+
+    @func_test
+    def test_mismatched_tpid(self) -> None:
+        """Test behavior when outer VLAN tag has a non-standard TPID (not 0x8100 or 0x88a8).
+
+        Steps:
+            Launch testpmd in mac forward mode.
+            Set verbose level to 1.
+            Disable VLAN filtering on port 0.
+            Send and capture test packet.
+
+        Verify:
+            Only 1 VLAN tag is in received packet.
+            Inner VLAN ID matches the original packet.
+        """
+        with TestPmdShell() as testpmd:
+            testpmd.set_verbose(level=1)
+            testpmd.stop_all_ports()
+            testpmd.set_vlan_filter(0, False)
+            testpmd.start_all_ports()
+            testpmd.start()
+
+            mismatched_packet = (
+                Ether(dst="ff:ff:ff:ff:ff:ff")
+                / Dot1Q(vlan=100, type=0x1234)
+                / Dot1Q(vlan=200)
+                / IP(dst="1.2.3.4")
+                / UDP(dport=1234, sport=4321)
+            )
+
+            received_packet = self.send_packet_and_capture(mismatched_packet)
+
+            self.verify(
+                received_packet != [], "Packet was dropped when it should have been received."
+            )
+
+            for packet in received_packet:
+                vlan_tags = [layer for layer in packet.layers() if layer == Dot1Q]
+                self.verify(
+                    len(vlan_tags) == 1,
+                    f"Expected 1 VLAN tag due to mismatched TPID, found {len(vlan_tags)}",
+                )
+
+                vlan_id = packet[Dot1Q].vlan
+                self.verify(vlan_id == 200, f"Expected inner VLAN ID 200, got {vlan_id}")
+
+    def strip_verify(
+        self, packet: Packet, expected_num_tags: int, context: str, check_id: bool = False
+    ) -> None:
+        """Helper method for verifying packet stripping functionality."""
+        if expected_num_tags == 0:
+            has_vlan = bool(packet.haslayer(Dot1Q))
+            if not has_vlan:
+                self.verify(True, "Packet contained VLAN layer")
+            else:
+                vlan_layer = packet.getlayer(Dot1Q)
+                self.verify(
+                    vlan_layer is not None
+                    and vlan_layer.type != 0x8100
+                    and vlan_layer.type != 0x88A8,
+                    f"""VLAN tags found in packet when should have been stripped: {packet.summary()}
+                    sent packet: {context}""",
+                )
+        if expected_num_tags == 1:
+
+            def count_vlan_tags(packet: Packet) -> int:
+                """Method for counting the number of VLAN layers in a packet."""
+                count = 0
+                layer = packet.getlayer(Dot1Q)
+                while layer:
+                    if layer.type == 0x8100:
+                        count += 1
+                    layer = layer.payload.getlayer(Dot1Q)
+                return count
+
+            tag_count = count_vlan_tags(packet)
+            self.verify(
+                tag_count == 1,
+                f"""Expected one 0x8100 VLAN tag but found {tag_count}: {packet.summary()}
+                sent packet: {context}""",
+            )
+            first_dot1q = packet.getlayer(Dot1Q)
+            self.verify(
+                first_dot1q is not None and first_dot1q.type == 0x8100,
+                f"""VLAN tag 0x8100 not found in packet: {packet.summary()}
+                sent packet: {context}""",
+            )
+            if check_id:
+                self.verify(
+                    packet[Dot1Q].vlan == 200,
+                    f"""VLAN ID 200 not found in received packet: {packet.summary()}
+                    sent packet: {context}""",
+                )
+
+    @requires(NicCapability.RX_OFFLOAD_VLAN_STRIP)
+    @func_test
+    def test_vlan_strip(self) -> None:
+        """Test combinations of VLAN/QinQ strip settings with various QinQ packets.
+
+        Steps:
+            Launch testpmd with VLAN strip enabled.
+            Send four VLAN/QinQ related test packets.
+
+        Verify:
+            Check received packets have the expected VLAN/QinQ layers/tags.
+        """
+        test_packets = [
+            Ether() / Dot1Q(type=0x8100) / IP() / UDP(dport=1234, sport=4321),
+            Ether()
+            / Dot1Q(vlan=100, type=0x8100)
+            / Dot1Q(vlan=200, type=0x8100)
+            / IP()
+            / UDP(dport=1234, sport=4321),
+            Ether() / Dot1Q(type=0x88A8) / IP() / UDP(dport=1234, sport=4321),
+            Ether() / Dot1Q(type=0x88A8) / Dot1Q(type=0x8100) / IP() / UDP(dport=1234, sport=4321),
+        ]
+        with TestPmdShell() as testpmd:
+            testpmd.stop_all_ports()
+            testpmd.set_vlan_strip(0, True)
+            testpmd.start_all_ports()
+            testpmd.start()
+
+            rec1 = self.send_packet_and_capture(test_packets[0])
+            rec2 = self.send_packet_and_capture(test_packets[1])
+            rec3 = self.send_packet_and_capture(test_packets[2])
+            rec4 = self.send_packet_and_capture(test_packets[3])
+
+            testpmd.stop()
+
+            try:
+                context = "Single VLAN"
+                self.strip_verify(rec1[0], 0, context)
+                context = "Stacked VLAN"
+                self.strip_verify(rec2[0], 1, context, check_id=True)
+                context = "Single S-VLAN"
+                self.strip_verify(rec3[0], 0, context)
+                context = "QinQ"
+                self.strip_verify(rec4[0], 1, context)
+            except IndexError:
+                self.verify(
+                    False, f"{context} packet was dropped when it should have been received."
+                )
+
+    @requires(NicCapability.RX_OFFLOAD_QINQ_STRIP)
+    @func_test
+    def test_qinq_strip(self) -> None:
+        """Test combinations of VLAN/QinQ strip settings with various QinQ packets.
+
+        Steps:
+            Launch testpmd with QinQ and VLAN strip enabled.
+            Send four VLAN/QinQ related test packets.
+
+        Verify:
+            Check received packets have the expected VLAN/QinQ layers/tags.
+        """
+        test_packets = [
+            Ether() / Dot1Q(type=0x8100) / IP() / UDP(dport=1234, sport=4321),
+            Ether()
+            / Dot1Q(vlan=100, type=0x8100)
+            / Dot1Q(vlan=200, type=0x8100)
+            / IP()
+            / UDP(dport=1234, sport=4321),
+            Ether() / Dot1Q(type=0x88A8) / IP() / UDP(dport=1234, sport=4321),
+            Ether() / Dot1Q(type=0x88A8) / Dot1Q(type=0x8100) / IP() / UDP(dport=1234, sport=4321),
+        ]
+        with TestPmdShell() as testpmd:
+            testpmd.stop_all_ports()
+            testpmd.set_qinq_strip(0, True)
+            testpmd.start_all_ports()
+            testpmd.start()
+
+            rec1 = self.send_packet_and_capture(test_packets[0])
+            rec2 = self.send_packet_and_capture(test_packets[1])
+            rec3 = self.send_packet_and_capture(test_packets[2])
+            rec4 = self.send_packet_and_capture(test_packets[3])
+
+            testpmd.stop()
+
+            try:
+                context = "Single VLAN"
+                self.strip_verify(rec1[0], 0, context)
+                context = "Stacked VLAN"
+                self.strip_verify(rec2[0], 1, context, check_id=True)
+                context = "Single S-VLAN"
+                self.strip_verify(rec3[0], 0, context)
+                context = "QinQ"
+                self.strip_verify(rec4[0], 0, context)
+            except IndexError:
+                self.log(f"{context} packet was dropped when it should have been received.")
-- 
2.50.1



More information about the dev mailing list