[PATCH v2 2/2] dts: add QinQ test suite
Dean Marx
dmarx at iol.unh.edu
Fri Oct 17 22:31:59 CEST 2025
Add QinQ test suite, which verifies PMD behavior when
sending QinQ (IEEE 802.1ad) packets.
Signed-off-by: Dean Marx <dmarx at iol.unh.edu>
---
dts/tests/TestSuite_qinq.py | 236 ++++++++++++++++++++++++++++++++++++
1 file changed, 236 insertions(+)
create mode 100644 dts/tests/TestSuite_qinq.py
diff --git a/dts/tests/TestSuite_qinq.py b/dts/tests/TestSuite_qinq.py
new file mode 100644
index 0000000000..e97f407c1d
--- /dev/null
+++ b/dts/tests/TestSuite_qinq.py
@@ -0,0 +1,236 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2025 University of New Hampshire
+
+"""QinQ (802.1ad) Test Suite.
+
+This test suite verifies the correctness and capability of DPDK Poll Mode Drivers (PMDs)
+in handling QinQ-tagged Ethernet frames, which contain a pair of stacked VLAN headers
+(outer S-VLAN and inner C-VLAN). These tests ensure that both software and hardware offloads
+related to QinQ behave as expected across different NIC vendors and PMD implementations.
+"""
+
+from scapy.layers.inet import IP, UDP
+from scapy.layers.l2 import Dot1AD, Dot1Q, Ether
+from scapy.packet import Packet, Raw
+
+from api.capabilities import NicCapability, requires_nic_capability
+from api.testpmd import TestPmd
+from framework.test_suite import TestSuite, func_test
+
+
+class TestQinq(TestSuite):
+ """QinQ test suite.
+
+ This suite consists of 3 test cases:
+ 1. QinQ Filter: Enable VLAN filter and verify packets with mismatched VLAN IDs are dropped,
+ and packets with matching VLAN IDs are received.
+ 2. QinQ Forwarding: Send a QinQ packet and verify the received packet contains
+ both QinQ/VLAN layers.
+ 3. QinQ Strip: Enable VLAN/QinQ stripping and verify sent packets are received with the
+ expected VLAN/QinQ layers.
+ """
+
+ def _send_packet_and_verify(
+ self, packet: Packet, testpmd: TestPmd, should_receive: bool
+ ) -> None:
+ """Send packet and verify reception.
+
+ Args:
+ packet: The packet to send to testpmd.
+ testpmd: The testpmd session to send commands to.
+ should_receive: If :data:`True`, verifies packet was received.
+ """
+ testpmd.start()
+ packets = self.send_packet_and_capture(packet=packet)
+ test_packet = self._get_relevant_packet(packets)
+ if should_receive:
+ self.verify(
+ test_packet is not None, "Packet was dropped when it should have been received."
+ )
+ else:
+ self.verify(
+ test_packet is None, "Packet was received when it should have been dropped."
+ )
+
+ def _strip_verify(self, packet: Packet | None, expects_tag: bool, context: str) -> bool:
+ """Helper method for verifying packet stripping functionality.
+
+ Returns: :data:`True` if tags are stripped or not stripped accordingly,
+ otherwise :data:`False`
+ """
+ if packet is None:
+ self.log(f"{context} packet was dropped when it should have been received.")
+ return False
+
+ if not expects_tag:
+ if packet.haslayer(Dot1Q) or packet.haslayer(Dot1AD):
+ self.log(
+ f"VLAN tags found in packet when should have been stripped: "
+ f"{packet.summary()}\tsent packet: {context}",
+ )
+ return False
+
+ if expects_tag:
+ if vlan_layer := packet.getlayer(Dot1Q):
+ if vlan_layer.vlan != 200:
+ self.log(
+ f"Expected VLAN ID 200 but found ID {vlan_layer.vlan}: "
+ f"{packet.summary()}\tsent packet: {context}",
+ )
+ return False
+ else:
+ self.log(
+ f"Expected 0x8100 VLAN tag but none found: {packet.summary()}"
+ f"\tsent packet: {context}"
+ )
+ return False
+
+ return True
+
+ def _get_relevant_packet(self, packet_list: list[Packet]) -> Packet | None:
+ """Helper method for checking received packet list for sent packet."""
+ for packet in packet_list:
+ if hasattr(packet, "load") and b"xxxxx" in packet.load:
+ return packet
+ return None
+
+ @requires_nic_capability(NicCapability.RX_OFFLOAD_VLAN_EXTEND)
+ @func_test
+ def test_qinq_filter(self) -> None:
+ """QinQ Rx filter test case.
+
+ Steps:
+ Launch testpmd with mac forwarding mode.
+ Enable VLAN filter/extend modes on port 0.
+ Add VLAN tag 100 to the filter on port 0.
+ Send test packet and capture verbose output.
+
+ Verify:
+ Packet with matching VLAN ID is received.
+ Packet with mismatched VLAN ID is dropped.
+ """
+ packets = [
+ Ether(dst="00:11:22:33:44:55", src="66:77:88:99:aa:bb")
+ / Dot1AD(vlan=100)
+ / Dot1Q(vlan=200)
+ / IP(dst="192.0.2.1", src="198.51.100.1")
+ / UDP(dport=1234, sport=5678),
+ Ether(dst="00:11:22:33:44:55", src="66:77:88:99:aa:bb")
+ / Dot1AD(vlan=101)
+ / Dot1Q(vlan=200)
+ / IP(dst="192.0.2.1", src="198.51.100.1")
+ / UDP(dport=1234, sport=5678),
+ ]
+ with TestPmd() as testpmd:
+ testpmd.set_vlan_filter(0, True)
+ testpmd.set_vlan_extend(0, True)
+ testpmd.rx_vlan(100, 0, True)
+ self._send_packet_and_verify(packets[0], testpmd, should_receive=True)
+ self._send_packet_and_verify(packets[1], testpmd, should_receive=False)
+
+ @func_test
+ def test_qinq_forwarding(self) -> None:
+ """QinQ Rx filter test case.
+
+ Steps:
+ Launch testpmd with mac forwarding mode.
+ Disable VLAN filter mode on port 0.
+ Send test packet and capture verbose output.
+
+ Verify:
+ Check that the received packet has two separate VLAN layers in proper QinQ fashion.
+ Check that the received packet outer and inner VLAN layer has the appropriate ID.
+ """
+ test_packet = (
+ Ether(dst="ff:ff:ff:ff:ff:ff")
+ / Dot1AD(vlan=100)
+ / Dot1Q(vlan=200)
+ / IP(dst="1.2.3.4")
+ / UDP(dport=1234, sport=4321)
+ / Raw(load="xxxxx")
+ )
+ with TestPmd() as testpmd:
+ testpmd.set_vlan_filter(0, False)
+ testpmd.start()
+ received_packets = self.send_packet_and_capture(test_packet)
+ packet = self._get_relevant_packet(received_packets)
+
+ self.verify(packet is not None, "Packet was dropped when it should have been received.")
+
+ if packet is not None:
+ self.verify(bool(packet.haslayer(Dot1AD)), "QinQ layer not found in packet")
+
+ if outer_vlan := packet.getlayer(Dot1AD):
+ outer_vlan_id = outer_vlan.vlan
+ self.verify(
+ outer_vlan_id == 100,
+ f"Outer VLAN ID was {outer_vlan_id} when it should have been 100.",
+ )
+ else:
+ self.verify(False, "VLAN layer not found in received packet.")
+
+ if outer_vlan and (inner_vlan := outer_vlan.getlayer(Dot1Q)):
+ inner_vlan_id = inner_vlan.vlan
+ self.verify(
+ inner_vlan_id == 200,
+ f"Inner VLAN ID was {inner_vlan_id} when it should have been 200",
+ )
+
+ @requires_nic_capability(NicCapability.RX_OFFLOAD_QINQ_STRIP)
+ @func_test
+ def test_qinq_strip(self) -> None:
+ """Test combinations of VLAN/QinQ strip settings with various QinQ packets.
+
+ Steps:
+ Launch testpmd with QinQ and VLAN strip enabled.
+ Send four VLAN/QinQ related test packets.
+
+ Verify:
+ Check received packets have the expected VLAN/QinQ layers/tags.
+ """
+ test_packets = [
+ Ether() / Dot1Q() / IP() / UDP(dport=1234, sport=4321) / Raw(load="xxxxx"),
+ Ether()
+ / Dot1Q(vlan=100)
+ / Dot1Q(vlan=200)
+ / IP()
+ / UDP(dport=1234, sport=4321)
+ / Raw(load="xxxxx"),
+ Ether() / Dot1AD() / IP() / UDP(dport=1234, sport=4321) / Raw(load="xxxxx"),
+ Ether() / Dot1AD() / Dot1Q() / IP() / UDP(dport=1234, sport=4321) / Raw(load="xxxxx"),
+ ]
+ with TestPmd() as testpmd:
+ testpmd.set_qinq_strip(0, True)
+ testpmd.set_vlan_strip(0, True)
+ testpmd.start()
+
+ received_packets1 = self.send_packet_and_capture(test_packets[0])
+ packet1 = self._get_relevant_packet(received_packets1)
+ received_packets2 = self.send_packet_and_capture(test_packets[1])
+ packet2 = self._get_relevant_packet(received_packets2)
+ received_packets3 = self.send_packet_and_capture(test_packets[2])
+ packet3 = self._get_relevant_packet(received_packets3)
+ received_packets4 = self.send_packet_and_capture(test_packets[3])
+ packet4 = self._get_relevant_packet(received_packets4)
+
+ testpmd.stop()
+
+ tests = [
+ ("Single 8100 tag", self._strip_verify(packet1, False, "Single 8100 tag")),
+ (
+ "Double 8100 tag",
+ self._strip_verify(packet2, True, "Double 8100 tag"),
+ ),
+ ("Single 88a8 tag", self._strip_verify(packet3, False, "Single 88a8 tag")),
+ (
+ "QinQ (88a8 and 8100 tags)",
+ self._strip_verify(packet4, False, "QinQ (88a8 and 8100 tags)"),
+ ),
+ ]
+
+ failed = [ctx for ctx, result in tests if not result]
+
+ self.verify(
+ not failed,
+ f"The following packets were not stripped correctly: {', '.join(failed)}",
+ )
--
2.51.0
More information about the dev
mailing list