[PATCH v1] dts: add ipgre test suite
Andrew Bailey
abailey at iol.unh.edu
Fri Mar 27 19:02:32 CET 2026
Port over the IP GRE test suite from old DTS to next DTS. This test
suite covers GRE tunneling and checksum offload verification using this
protocol.
Bugzilla ID: 1480
Signed-off-by: Andrew Bailey <abailey at iol.unh.edu>
---
doc/api/dts/tests.TestSuite_ip_gre.rst | 8 +
dts/api/testpmd/__init__.py | 23 ++
dts/tests/TestSuite_ip_gre.py | 301 +++++++++++++++++++++++++
3 files changed, 332 insertions(+)
create mode 100644 doc/api/dts/tests.TestSuite_ip_gre.rst
create mode 100644 dts/tests/TestSuite_ip_gre.py
diff --git a/doc/api/dts/tests.TestSuite_ip_gre.rst b/doc/api/dts/tests.TestSuite_ip_gre.rst
new file mode 100644
index 0000000000..e8ce01dc0b
--- /dev/null
+++ b/doc/api/dts/tests.TestSuite_ip_gre.rst
@@ -0,0 +1,8 @@
+.. SPDX-License-Identifier: BSD-3-Clause
+
+ip_gre Test Suite
+=================
+
+.. automodule:: tests.TestSuite_ip_gre
+   :members:
+   :show-inheritance:
diff --git a/dts/api/testpmd/__init__.py b/dts/api/testpmd/__init__.py
index e9187440bb..33bf5e63de 100644
--- a/dts/api/testpmd/__init__.py
+++ b/dts/api/testpmd/__init__.py
@@ -951,6 +951,29 @@ def set_flow_control(
f"Testpmd failed to set the {flow_ctrl} in port {port}."
)
+    def set_csum_parse_tunnel(self, port: int, on: bool, verify: bool = True) -> None:
+        """Set csum parse-tunnel on or off in testpmd for a given port.
+
+        Args:
+            port: The ID of the requested port.
+            on: Set parse tunnel on if `on` is :data:`True`, otherwise off.
+            verify: If :data:`True`, the output of the command is scanned to verify that
+                parse tunnel was set successfully.
+
+        Raises:
+            InteractiveCommandExecutionError: If `verify` is :data:`True` and the command
+                fails to execute.
+        """
+        # Resolve the state once. Reusing the outer quote character inside an f-string
+        # replacement field is a syntax error before Python 3.12.
+        state = "on" if on else "off"
+        output = self.send_command(f"csum parse-tunnel {state} {port}")
+        if verify and f"Parse tunnel is {state}" not in output:
+            message = f"Testpmd failed to set csum parse-tunnel {state} in port {port}"
+            self._logger.debug(message)
+            raise InteractiveCommandExecutionError(message)
+
def show_port_flow_info(self, port: int) -> TestPmdPortFlowCtrl | None:
"""Show port info flow.
diff --git a/dts/tests/TestSuite_ip_gre.py b/dts/tests/TestSuite_ip_gre.py
new file mode 100644
index 0000000000..fc51eef181
--- /dev/null
+++ b/dts/tests/TestSuite_ip_gre.py
@@ -0,0 +1,301 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2026 University of New Hampshire
+
+"""DPDK IP GRE test suite."""
+
+from scapy.layers.inet import GRE, IP, TCP, UDP
+from scapy.layers.inet6 import IPv6
+from scapy.layers.l2 import Dot1Q, Ether
+from scapy.layers.sctp import SCTP
+from scapy.packet import Packet
+
+from api.capabilities import (
+ NicCapability,
+ requires_nic_capability,
+)
+from api.packet import send_packet_and_capture
+from api.test import verify
+from api.testpmd import TestPmd
+from api.testpmd.config import SimpleForwardingModes
+from api.testpmd.types import (
+ ChecksumOffloadOptions,
+ PacketOffloadFlag,
+ RtePTypes,
+ TestPmdVerbosePacket,
+)
+from framework.test_suite import TestSuite, func_test
+
+# Source MAC address placed on every packet this suite sends; it is compared against the
+# source MAC in testpmd's verbose output to pick out the suite's own packets.
+# NOTE(review): this value is the Ethernet broadcast address - confirm that using it as a
+# source address is intended.
+SRC_ID = "FF:FF:FF:FF:FF:FF"
+
+
+class TestIpGre(TestSuite):
+    """IP GRE test suite.
+
+    Verifies the packet type flags that testpmd reports for GRE-tunneled packets and the
+    checksum flags it reports for GRE-tunneled packets with altered checksums.
+    """
+
+    def _check_for_matching_packet(
+        self, output: list[TestPmdVerbosePacket], flags: RtePTypes
+    ) -> bool:
+        """Check that the test packet was received with all of the expected ptype flags.
+
+        Args:
+            output: Parsed verbose output from testpmd.
+            flags: Packet type flags that must all be present in either the hardware or the
+                software ptype of every packet sent from `SRC_ID`.
+
+        Returns:
+            :data:`True` if at least one packet from `SRC_ID` is in `output` and every such
+            packet contains `flags`, :data:`False` otherwise.
+        """
+        matching = [packet for packet in output if packet.src_mac == SRC_ID]
+        # A missing packet must fail the check; all() over an empty list would pass it.
+        return bool(matching) and all(
+            flags in packet.hw_ptype or flags in packet.sw_ptype for packet in matching
+        )
+
+    def _send_packet_and_verify_flags(
+        self, expected_flag: RtePTypes, packet: Packet, testpmd: TestPmd
+    ) -> None:
+        """Send a packet to the DUT and verify the verbose ptype flags.
+
+        Args:
+            expected_flag: Packet type flags expected in the verbose output.
+            packet: Packet to send.
+            testpmd: Testpmd shell that is currently forwarding; it is stopped by this call.
+        """
+        send_packet_and_capture(packet=packet)
+        verbose_output = testpmd.extract_verbose_output(testpmd.stop(verify=True))
+        valid = self._check_for_matching_packet(output=verbose_output, flags=expected_flag)
+        verify(valid, f"Packet type flag did not match the expected flag: {expected_flag}.")
+
+    def _setup_session(
+        self, testpmd: TestPmd, expected_flags: list[RtePTypes], packet_list: list[Packet]
+    ) -> None:
+        """Configure testpmd for ptype detection and verify each packet in turn.
+
+        Args:
+            testpmd: Testpmd shell to configure and run the packets through.
+            expected_flags: Expected packet type flags, one entry per packet.
+            packet_list: Packets to send, in the same order as `expected_flags`.
+        """
+        testpmd.set_forward_mode(SimpleForwardingModes.rxonly)
+        testpmd.set_verbose(level=1)
+        # strict=True reports a length mismatch between the two lists instead of hiding it.
+        for expected_flag, packet in zip(expected_flags, packet_list, strict=True):
+            # Forwarding is started per packet because the verification step stops it.
+            testpmd.start(verify=True)
+            self._send_packet_and_verify_flags(
+                expected_flag=expected_flag, packet=packet, testpmd=testpmd
+            )
+
+    def _run_ptype_detection(
+        self,
+        outer_l3: type[Packet],
+        outer_flag: RtePTypes,
+        inner_l3: type[Packet],
+        inner_flag: RtePTypes,
+    ) -> None:
+        """Send GRE packets for every L2/L4 combination and verify their ptype flags.
+
+        The packets cover untagged and VLAN-tagged Ethernet, each with UDP, TCP and SCTP as
+        the inner layer 4 protocol.
+
+        Args:
+            outer_l3: Scapy layer class used outside the GRE header.
+            outer_flag: Packet type flag expected for `outer_l3`.
+            inner_l3: Scapy layer class used inside the GRE header.
+            inner_flag: Packet type flag expected for `inner_l3`.
+        """
+        l2_variants = (
+            (False, RtePTypes.L2_ETHER),
+            (True, RtePTypes.L2_ETHER_VLAN),
+        )
+        l4_variants = (
+            (UDP, RtePTypes.INNER_L4_UDP),
+            (TCP, RtePTypes.INNER_L4_TCP),
+            (SCTP, RtePTypes.INNER_L4_SCTP),
+        )
+        packets = []
+        flags = []
+        for tagged, l2_flag in l2_variants:
+            for l4, l4_flag in l4_variants:
+                l2 = Ether(src=SRC_ID)
+                if tagged:
+                    l2 = l2 / Dot1Q()
+                packets.append(l2 / outer_l3() / GRE() / inner_l3() / l4())
+                flags.append(l2_flag | outer_flag | RtePTypes.TUNNEL_GRE | inner_flag | l4_flag)
+        with TestPmd() as testpmd:
+            self._setup_session(testpmd=testpmd, expected_flags=flags, packet_list=packets)
+
+    def _send_packet_and_verify_checksum(
+        self, packet: Packet, good_L4: bool, good_IP: bool, testpmd: TestPmd
+    ) -> None:
+        """Send a packet and verify the checksum flags in the verbose output.
+
+        Args:
+            packet: Packet to send to the DUT.
+            good_L4: Whether the good layer 4 checksum flag is expected on the packet.
+            good_IP: Whether the good IP checksum flag is expected on the packet.
+            testpmd: Testpmd shell used to receive the packet.
+        """
+        testpmd.start()
+        send_packet_and_capture(packet=packet)
+        verbose_output = testpmd.extract_verbose_output(testpmd.stop())
+        # Both stay None until a packet sent from SRC_ID is found in the verbose output.
+        is_ip_good = is_l4_good = None
+        for testpmd_packet in verbose_output:
+            if testpmd_packet.src_mac == SRC_ID:
+                ol_flags = testpmd_packet.ol_flags
+                is_ip_good = PacketOffloadFlag.RTE_MBUF_F_RX_IP_CKSUM_GOOD in ol_flags
+                is_l4_good = PacketOffloadFlag.RTE_MBUF_F_RX_L4_CKSUM_GOOD in ol_flags
+        verify(
+            is_ip_good is not None and is_l4_good is not None,
+            "Test packet was dropped when it should have been received.",
+        )
+        verify(
+            is_l4_good == good_L4, "Layer 4 checksum flag did not match expected checksum flag."
+        )
+        verify(is_ip_good == good_IP, "IP checksum flag did not match expected checksum flag.")
+
+    @requires_nic_capability(NicCapability.PORT_TX_OFFLOAD_GRE_TNL_TSO)
+    @func_test
+    def gre_ip4_pkt_detect(self) -> None:
+        """GRE IP4 packet send and detect.
+
+        Steps:
+            * Craft packets using GRE tunneling.
+            * Send them to the testpmd application.
+
+        Verify:
+            * All packets were received with the expected packet type flags.
+        """
+        self._run_ptype_detection(
+            outer_l3=IP,
+            outer_flag=RtePTypes.L3_IPV4,
+            inner_l3=IP,
+            inner_flag=RtePTypes.INNER_L3_IPV4,
+        )
+
+    @requires_nic_capability(NicCapability.PORT_TX_OFFLOAD_GRE_TNL_TSO)
+    @func_test
+    def gre_ip6_outer_ip4_inner_pkt_detect(self) -> None:
+        """GRE IPv6 outer and IPv4 inner send and detect.
+
+        Steps:
+            * Craft packets using GRE tunneling.
+            * Send them to the testpmd application.
+
+        Verify:
+            * All packets were received with the expected packet type flags.
+        """
+        self._run_ptype_detection(
+            outer_l3=IPv6,
+            outer_flag=RtePTypes.L3_IPV6,
+            inner_l3=IP,
+            inner_flag=RtePTypes.INNER_L3_IPV4,
+        )
+
+    @requires_nic_capability(NicCapability.PORT_TX_OFFLOAD_GRE_TNL_TSO)
+    @func_test
+    def gre_ip6_outer_ip6_inner_pkt_detect(self) -> None:
+        """GRE IPv6 outer and inner send and detect.
+
+        Steps:
+            * Craft packets using GRE tunneling.
+            * Send them to the testpmd application.
+
+        Verify:
+            * All packets were received with the expected packet type flags.
+        """
+        self._run_ptype_detection(
+            outer_l3=IPv6,
+            outer_flag=RtePTypes.L3_IPV6,
+            inner_l3=IPv6,
+            inner_flag=RtePTypes.INNER_L3_IPV6,
+        )
+
+    @requires_nic_capability(NicCapability.PORT_TX_OFFLOAD_OUTER_IPV4_CKSUM)
+    @requires_nic_capability(NicCapability.PORT_TX_OFFLOAD_IPV4_CKSUM)
+    @requires_nic_capability(NicCapability.PORT_TX_OFFLOAD_TCP_CKSUM)
+    @requires_nic_capability(NicCapability.PORT_TX_OFFLOAD_UDP_CKSUM)
+    @requires_nic_capability(NicCapability.PORT_TX_OFFLOAD_SCTP_CKSUM)
+    @requires_nic_capability(NicCapability.PORT_TX_OFFLOAD_GRE_TNL_TSO)
+    @func_test
+    def gre_checksum_offload(self) -> None:
+        """GRE checksum offload test.
+
+        Steps:
+            * Craft packets using GRE tunneling.
+            * Alter checksum of each packet.
+            * Send packets to the testpmd application.
+
+        Verify:
+            * All packets were received with the expected checksum flags.
+        """
+        # Each case is (packet, expected good L4 checksum flag, expected good IP checksum flag).
+        cases = [
+            (Ether(src=SRC_ID) / IP(chksum=0x0) / GRE() / IP() / TCP(), True, True),
+            (Ether(src=SRC_ID) / IP() / GRE() / IP(chksum=0x0) / TCP(), True, False),
+            (Ether(src=SRC_ID) / IP() / GRE() / IP() / TCP(chksum=0x0), False, True),
+            (Ether(src=SRC_ID) / IP() / GRE() / IP() / UDP(chksum=0xFFFF), False, True),
+            (Ether(src=SRC_ID) / IP() / GRE() / IP() / SCTP(chksum=0x0), False, True),
+        ]
+        with TestPmd() as testpmd:
+            testpmd.set_forward_mode(SimpleForwardingModes.csum)
+            testpmd.csum_set_hw(
+                layers=ChecksumOffloadOptions.ip
+                | ChecksumOffloadOptions.udp
+                | ChecksumOffloadOptions.outer_ip
+                | ChecksumOffloadOptions.sctp
+                | ChecksumOffloadOptions.tcp,
+                port_id=0,
+            )
+            testpmd.set_csum_parse_tunnel(port=0, on=True)
+            testpmd.set_verbose(1)
+            testpmd.start_all_ports()
+            # Forwarding is started by the helper for every packet, so no start() is needed here.
+            for packet, good_l4, good_ip in cases:
+                self._send_packet_and_verify_checksum(
+                    packet=packet, good_L4=good_l4, good_IP=good_ip, testpmd=testpmd
+                )
--
2.50.1
More information about the dev
mailing list