<div dir="ltr">This series has been applied to next-dts. <div><br></div><div>I am noting here that the patchwork CI results show a checkpatches warning. However, I have run checkpatches on this patch and it's reporting no issues. My DPDK_CHECKPATCH_PATH and codespell dictionary seem fine. So, I am confident enough that it is safe to apply to next-dts. Ali, please let me know if you are concerned.</div></div><br><div class="gmail_quote gmail_quote_container"><div dir="ltr" class="gmail_attr">On Fri, Aug 29, 2025 at 1:43 PM Paul Szczepanek <<a href="mailto:paul.szczepanek@arm.com">paul.szczepanek@arm.com</a>> wrote:<br></div><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex">Testpmd moved into new API directory.<br>
Capabilities converted into vanilla enum and moved to API.<br>
Removed some circular dependencies and incorrect imports.<br>
<br>
Signed-off-by: Paul Szczepanek <<a href="mailto:paul.szczepanek@arm.com" target="_blank">paul.szczepanek@arm.com</a>><br>
---<br>
doc/api/dts/api.capabilities.rst | 8 +<br>
doc/api/dts/api.rst | 20 +<br>
...stpmd_shell.rst => api.testpmd.config.rst} | 4 +-<br>
doc/api/dts/api.testpmd.rst | 15 +<br>
doc/api/dts/api.testpmd.types.rst | 8 +<br>
doc/api/dts/framework.params.rst | 1 -<br>
doc/api/dts/framework.params.testpmd.rst | 8 -<br>
doc/api/dts/framework.remote_session.rst | 1 -<br>
doc/api/dts/index.rst | 1 +<br>
dts/api/__init__.py | 14 +<br>
dts/api/capabilities.py | 180 ++<br>
dts/api/testpmd/__init__.py | 1294 ++++++++<br>
.../testpmd.py => api/testpmd/config.py} | 9 +-<br>
dts/api/testpmd/types.py | 1406 ++++++++<br>
dts/framework/config/__init__.py | 3 +-<br>
dts/framework/params/eal.py | 12 +-<br>
dts/framework/params/types.py | 4 +-<br>
dts/framework/remote_session/__init__.py | 44 -<br>
dts/framework/remote_session/testpmd_shell.py | 2844 -----------------<br>
dts/framework/testbed_model/capability.py | 144 +-<br>
dts/framework/testbed_model/linux_session.py | 2 +-<br>
dts/framework/testbed_model/os_session.py | 14 +-<br>
dts/framework/testbed_model/topology.py | 30 +-<br>
23 files changed, 3086 insertions(+), 2980 deletions(-)<br>
create mode 100644 doc/api/dts/api.capabilities.rst<br>
create mode 100644 doc/api/dts/api.rst<br>
rename doc/api/dts/{framework.remote_session.testpmd_shell.rst => api.testpmd.config.rst} (54%)<br>
create mode 100644 doc/api/dts/api.testpmd.rst<br>
create mode 100644 doc/api/dts/api.testpmd.types.rst<br>
delete mode 100644 doc/api/dts/framework.params.testpmd.rst<br>
create mode 100644 dts/api/__init__.py<br>
create mode 100644 dts/api/capabilities.py<br>
create mode 100644 dts/api/testpmd/__init__.py<br>
rename dts/{framework/params/testpmd.py => api/testpmd/config.py} (98%)<br>
create mode 100644 dts/api/testpmd/types.py<br>
delete mode 100644 dts/framework/remote_session/testpmd_shell.py<br>
<br>
diff --git a/doc/api/dts/api.capabilities.rst b/doc/api/dts/api.capabilities.rst<br>
new file mode 100644<br>
index 0000000000..311872f61d<br>
--- /dev/null<br>
+++ b/doc/api/dts/api.capabilities.rst<br>
@@ -0,0 +1,8 @@<br>
+.. SPDX-License-Identifier: BSD-3-Clause<br>
+<br>
+capabilities - SUT Capabilities<br>
+==========================================<br>
+<br>
+.. automodule:: api.capabilities<br>
+ :members:<br>
+ :show-inheritance:<br>
diff --git a/doc/api/dts/api.rst b/doc/api/dts/api.rst<br>
new file mode 100644<br>
index 0000000000..d3cf1226eb<br>
--- /dev/null<br>
+++ b/doc/api/dts/api.rst<br>
@@ -0,0 +1,20 @@<br>
+.. SPDX-License-Identifier: BSD-3-Clause<br>
+<br>
+api - DTS API<br>
+==========================================<br>
+<br>
+.. automodule:: api<br>
+ :members:<br>
+ :show-inheritance:<br>
+<br>
+.. toctree::<br>
+ :hidden:<br>
+ :maxdepth: 2<br>
+<br>
+ api.testpmd<br>
+<br>
+.. toctree::<br>
+ :hidden:<br>
+ :maxdepth: 1<br>
+<br>
+ api.capabilities<br>
\ No newline at end of file<br>
diff --git a/doc/api/dts/framework.remote_session.testpmd_shell.rst b/doc/api/dts/api.testpmd.config.rst<br>
similarity index 54%<br>
rename from doc/api/dts/framework.remote_session.testpmd_shell.rst<br>
rename to doc/api/dts/api.testpmd.config.rst<br>
index 81ca23337f..d338c07a36 100644<br>
--- a/doc/api/dts/framework.remote_session.testpmd_shell.rst<br>
+++ b/doc/api/dts/api.testpmd.config.rst<br>
@@ -1,8 +1,8 @@<br>
.. SPDX-License-Identifier: BSD-3-Clause<br>
<br>
-testpmd\_shell - Testpmd Interactive Remote Shell<br>
+config - Testpmd configuration<br>
=================================================<br>
<br>
-.. automodule:: framework.remote_session.testpmd_shell<br>
+.. automodule:: api.testpmd.config<br>
:members:<br>
:show-inheritance:<br>
diff --git a/doc/api/dts/api.testpmd.rst b/doc/api/dts/api.testpmd.rst<br>
new file mode 100644<br>
index 0000000000..75a92823f8<br>
--- /dev/null<br>
+++ b/doc/api/dts/api.testpmd.rst<br>
@@ -0,0 +1,15 @@<br>
+.. SPDX-License-Identifier: BSD-3-Clause<br>
+<br>
+testpmd - Testpmd Interactive Remote Shell<br>
+=================================================<br>
+<br>
+.. automodule:: api.testpmd<br>
+ :members:<br>
+ :show-inheritance:<br>
+<br>
+.. toctree::<br>
+ :hidden:<br>
+ :maxdepth: 1<br>
+<br>
+ api.testpmd.types<br>
+ api.testpmd.config<br>
\ No newline at end of file<br>
diff --git a/doc/api/dts/api.testpmd.types.rst b/doc/api/dts/api.testpmd.types.rst<br>
new file mode 100644<br>
index 0000000000..75b197aa73<br>
--- /dev/null<br>
+++ b/doc/api/dts/api.testpmd.types.rst<br>
@@ -0,0 +1,8 @@<br>
+.. SPDX-License-Identifier: BSD-3-Clause<br>
+<br>
+types - Testpmd types<br>
+=================================================<br>
+<br>
+.. automodule:: api.testpmd.types<br>
+ :members:<br>
+ :show-inheritance:<br>
diff --git a/doc/api/dts/framework.params.rst b/doc/api/dts/framework.params.rst<br>
index 4e263e2e5c..d8c6af9667 100644<br>
--- a/doc/api/dts/framework.params.rst<br>
+++ b/doc/api/dts/framework.params.rst<br>
@@ -12,5 +12,4 @@ params - Command Line Parameters Modelling<br>
:maxdepth: 1<br>
<br>
framework.params.eal<br>
- framework.params.testpmd<br>
framework.params.types<br>
diff --git a/doc/api/dts/framework.params.testpmd.rst b/doc/api/dts/framework.params.testpmd.rst<br>
deleted file mode 100644<br>
index 19583b01de..0000000000<br>
--- a/doc/api/dts/framework.params.testpmd.rst<br>
+++ /dev/null<br>
@@ -1,8 +0,0 @@<br>
-.. SPDX-License-Identifier: BSD-3-Clause<br>
-<br>
-testpmd - TestPMD Parameters Modelling<br>
-======================================<br>
-<br>
-.. automodule:: framework.params.testpmd<br>
- :members:<br>
- :show-inheritance:<br>
diff --git a/doc/api/dts/framework.remote_session.rst b/doc/api/dts/framework.remote_session.rst<br>
index 27c9153e64..b7dbe71412 100644<br>
--- a/doc/api/dts/framework.remote_session.rst<br>
+++ b/doc/api/dts/framework.remote_session.rst<br>
@@ -18,5 +18,4 @@ remote\_session - Node Connections Package<br>
framework.remote_session.shell_pool<br>
framework.remote_session.dpdk<br>
framework.remote_session.dpdk_shell<br>
- framework.remote_session.testpmd_shell<br>
framework.remote_session.python_shell<br>
diff --git a/doc/api/dts/index.rst b/doc/api/dts/index.rst<br>
index a11f395e11..c719297c11 100644<br>
--- a/doc/api/dts/index.rst<br>
+++ b/doc/api/dts/index.rst<br>
@@ -15,6 +15,7 @@ Packages<br>
:maxdepth: 1<br>
<br>
tests<br>
+ api<br>
framework.testbed_model<br>
framework.remote_session<br>
framework.params<br>
diff --git a/dts/api/__init__.py b/dts/api/__init__.py<br>
new file mode 100644<br>
index 0000000000..26b773bfbb<br>
--- /dev/null<br>
+++ b/dts/api/__init__.py<br>
@@ -0,0 +1,14 @@<br>
+# SPDX-License-Identifier: BSD-3-Clause<br>
+# Copyright(c) 2025 Arm Limited<br>
+<br>
+"""DTS API.<br>
+<br>
+This package exposes public API modules for test writers.<br>
+<br>
+All modules in this package are considered stable. Do not use framework<br>
+internal modules in your tests. Any missing functionality should be added<br>
+to the public API.<br>
+<br>
+Private methods and members are prefixed with an underscore and should not be<br>
+used outside of the framework.<br>
+"""<br>
diff --git a/dts/api/capabilities.py b/dts/api/capabilities.py<br>
new file mode 100644<br>
index 0000000000..1a79413f6f<br>
--- /dev/null<br>
+++ b/dts/api/capabilities.py<br>
@@ -0,0 +1,180 @@<br>
+# SPDX-License-Identifier: BSD-3-Clause<br>
+# Copyright(c) 2024 PANTHEON.tech s.r.o.<br>
+# Copyright(c) 2025 Arm Limited<br>
+<br>
+"""Testbed capabilities.<br>
+<br>
+This module provides a protocol that defines the common attributes of test cases and suites<br>
+and support for test environment capabilities.<br>
+<br>
+Many test cases are testing features not available on all hardware.<br>
+On the other hand, some test cases or suites may not need the most complex topology available.<br>
+<br>
+The module allows developers to mark test cases or suites to require certain hardware capabilities<br>
+or a particular topology.<br>
+<br>
+There are differences between hardware and topology capabilities:<br>
+<br>
+ * Hardware capabilities are assumed to not be required when not specified.<br>
+ * However, some topology is always available, so each test case or suite is assigned<br>
+ a default topology if no topology is specified in the decorator.<br>
+<br>
+Examples:<br>
+ .. code:: python<br>
+<br>
+ from framework.test_suite import TestSuite, func_test<br>
+ from framework.testbed_model.capability import LinkTopology, requires_link_topology<br>
+ # The whole test suite (each test case within) doesn't require any links.<br>
+ @requires_link_topology(LinkTopology.NO_LINK)<br>
+ @func_test<br>
+ class TestHelloWorld(TestSuite):<br>
+ def hello_world_single_core(self):<br>
+ ...<br>
+<br>
+ .. code:: python<br>
+<br>
+ from framework.test_suite import TestSuite, func_test<br>
+ from framework.testbed_model.capability import NicCapability, requires_nic_capability<br>
+ class TestPmdBufferScatter(TestSuite):<br>
+ # only the test case requires the SCATTERED_RX_ENABLED capability<br>
+ # other test cases may not require it<br>
+ @requires_nic_capability(NicCapability.SCATTERED_RX_ENABLED)<br>
+ @func_test<br>
+ def test_scatter_mbuf_2048(self):<br>
+"""<br>
+<br>
+from enum import IntEnum, auto<br>
+from typing import TYPE_CHECKING, Callable<br>
+<br>
+if TYPE_CHECKING:<br>
+ from framework.test_suite import TestProtocol<br>
+<br>
+<br>
+class LinkTopology(IntEnum):<br>
+ """Supported topology types."""<br>
+<br>
+ #: A topology with no Traffic Generator.<br>
+ NO_LINK = 0<br>
+ #: A topology with one physical link between the SUT node and the TG node.<br>
+ ONE_LINK = auto()<br>
+ #: A topology with two physical links between the Sut node and the TG node.<br>
+ TWO_LINKS = auto()<br>
+<br>
+ @classmethod<br>
+ def default(cls) -> "LinkTopology":<br>
+ """The default topology required by test cases if not specified otherwise."""<br>
+ return cls.TWO_LINKS<br>
+<br>
+<br>
+class NicCapability(IntEnum):<br>
+ """DPDK NIC capabilities.<br>
+<br>
+ The capabilities are used to mark test cases or suites that require a specific<br>
+ DPDK NIC capability to run. The capabilities are used by the test framework to<br>
+ determine whether a test case or suite can be run on the current testbed.<br>
+ """<br>
+<br>
+ #: Scattered packets Rx enabled.<br>
+ SCATTERED_RX_ENABLED = 0<br>
+ #: Device supports VLAN stripping.<br>
+ RX_OFFLOAD_VLAN_STRIP = auto()<br>
+ #: Device supports L3 checksum offload.<br>
+ RX_OFFLOAD_IPV4_CKSUM = auto()<br>
+ #: Device supports L4 checksum offload.<br>
+ RX_OFFLOAD_UDP_CKSUM = auto()<br>
+ #: Device supports L4 checksum offload.<br>
+ RX_OFFLOAD_TCP_CKSUM = auto()<br>
+ #: Device supports Large Receive Offload.<br>
+ RX_OFFLOAD_TCP_LRO = auto()<br>
+ #: Device supports QinQ (queue in queue) offload.<br>
+ RX_OFFLOAD_QINQ_STRIP = auto()<br>
+ #: Device supports inner packet L3 checksum.<br>
+ RX_OFFLOAD_OUTER_IPV4_CKSUM = auto()<br>
+ #: Device supports MACsec.<br>
+ RX_OFFLOAD_MACSEC_STRIP = auto()<br>
+ #: Device supports filtering of a VLAN Tag identifier.<br>
+ RX_OFFLOAD_VLAN_FILTER = auto()<br>
+ #: Device supports VLAN offload.<br>
+ RX_OFFLOAD_VLAN_EXTEND = auto()<br>
+ #: Device supports receiving segmented mbufs.<br>
+ RX_OFFLOAD_SCATTER = auto()<br>
+ #: Device supports Timestamp.<br>
+ RX_OFFLOAD_TIMESTAMP = auto()<br>
+ #: Device supports crypto processing while packet is received in NIC.<br>
+ RX_OFFLOAD_SECURITY = auto()<br>
+ #: Device supports CRC stripping.<br>
+ RX_OFFLOAD_KEEP_CRC = auto()<br>
+ #: Device supports L4 checksum offload.<br>
+ RX_OFFLOAD_SCTP_CKSUM = auto()<br>
+ #: Device supports inner packet L4 checksum.<br>
+ RX_OFFLOAD_OUTER_UDP_CKSUM = auto()<br>
+ #: Device supports RSS hashing.<br>
+ RX_OFFLOAD_RSS_HASH = auto()<br>
+ #: Device supports scatter Rx packets to segmented mbufs.<br>
+ RX_OFFLOAD_BUFFER_SPLIT = auto()<br>
+ #: Device supports all checksum capabilities.<br>
+ RX_OFFLOAD_CHECKSUM = auto()<br>
+ #: Device supports all VLAN capabilities.<br>
+ RX_OFFLOAD_VLAN = auto()<br>
+ #: Device supports Rx queue setup after device started.<br>
+ RUNTIME_RX_QUEUE_SETUP = auto()<br>
+ #: Device supports Tx queue setup after device started.<br>
+ RUNTIME_TX_QUEUE_SETUP = auto()<br>
+ #: Device supports shared Rx queue among ports within Rx domain and switch domain.<br>
+ RXQ_SHARE = auto()<br>
+ #: Device supports keeping flow rules across restart.<br>
+ FLOW_RULE_KEEP = auto()<br>
+ #: Device supports keeping shared flow objects across restart.<br>
+ FLOW_SHARED_OBJECT_KEEP = auto()<br>
+ #: Device supports multicast address filtering.<br>
+ MCAST_FILTERING = auto()<br>
+ #: Device supports flow ctrl.<br>
+ FLOW_CTRL = auto()<br>
+ #: Device is running on a physical function.<br>
+ PHYSICAL_FUNCTION = auto()<br>
+<br>
+<br>
+def requires_link_topology(<br>
+ link_topology: LinkTopology,<br>
+) -> Callable[[type["TestProtocol"]], type["TestProtocol"]]:<br>
+ """Decorator to set required topology type for a test case or test suite.<br>
+<br>
+ Args:<br>
+ link_topology: The topology type the test suite or case requires.<br>
+<br>
+ Returns:<br>
+ The decorated test case or test suite.<br>
+ """<br>
+ from framework.testbed_model.capability import TopologyCapability<br>
+<br>
+ def add_required_topology(<br>
+ test_case_or_suite: type["TestProtocol"],<br>
+ ) -> type["TestProtocol"]:<br>
+ topology_capability = TopologyCapability.get_unique(link_topology)<br>
+ topology_capability.set_required(test_case_or_suite)<br>
+ return test_case_or_suite<br>
+<br>
+ return add_required_topology<br>
+<br>
+<br>
+def requires_nic_capability(<br>
+ nic_capability: NicCapability,<br>
+) -> Callable[[type["TestProtocol"]], type["TestProtocol"]]:<br>
+ """Decorator to add a single required NIC capability to a test case or test suite.<br>
+<br>
+ Args:<br>
+ nic_capability: The NIC capability that is required by the test case or test suite.<br>
+<br>
+ Returns:<br>
+ The decorated test case or test suite.<br>
+ """<br>
+ from framework.testbed_model.capability import DecoratedNicCapability<br>
+<br>
+ def add_required_capability(<br>
+ test_case_or_suite: type["TestProtocol"],<br>
+ ) -> type["TestProtocol"]:<br>
+ decorated = DecoratedNicCapability.get_unique(nic_capability)<br>
+ decorated.add_to_required(test_case_or_suite)<br>
+ return test_case_or_suite<br>
+<br>
+ return add_required_capability<br>
diff --git a/dts/api/testpmd/__init__.py b/dts/api/testpmd/__init__.py<br>
new file mode 100644<br>
index 0000000000..49cf3742dd<br>
--- /dev/null<br>
+++ b/dts/api/testpmd/__init__.py<br>
@@ -0,0 +1,1294 @@<br>
+# SPDX-License-Identifier: BSD-3-Clause<br>
+# Copyright(c) 2023 University of New Hampshire<br>
+# Copyright(c) 2023 PANTHEON.tech s.r.o.<br>
+# Copyright(c) 2024 Arm Limited<br>
+<br>
+"""Testpmd interactive shell.<br>
+<br>
+Typical usage example in a TestSuite::<br>
+<br>
+ testpmd = TestPmd(self.sut_node)<br>
+ devices = testpmd.get_devices()<br>
+ for device in devices:<br>
+ print(device)<br>
+ testpmd.close()<br>
+"""<br>
+<br>
+import functools<br>
+import re<br>
+import time<br>
+from collections.abc import MutableSet<br>
+from enum import Flag<br>
+from pathlib import PurePath<br>
+from typing import (<br>
+ Any,<br>
+ Callable,<br>
+ ClassVar,<br>
+ Concatenate,<br>
+ ParamSpec,<br>
+ Tuple,<br>
+)<br>
+<br>
+from typing_extensions import Unpack<br>
+<br>
+from api.capabilities import LinkTopology, NicCapability<br>
+from api.testpmd.config import PortTopology, SimpleForwardingModes, TestPmdParams<br>
+from api.testpmd.types import (<br>
+ ChecksumOffloadOptions,<br>
+ DeviceCapabilitiesFlag,<br>
+ FlowRule,<br>
+ RxOffloadCapabilities,<br>
+ RxOffloadCapability,<br>
+ TestPmdDevice,<br>
+ TestPmdPort,<br>
+ TestPmdPortFlowCtrl,<br>
+ TestPmdPortStats,<br>
+ TestPmdQueueInfo,<br>
+ TestPmdRxqInfo,<br>
+ TestPmdVerbosePacket,<br>
+ VLANOffloadFlag,<br>
+)<br>
+from framework.context import get_ctx<br>
+from framework.exception import InteractiveCommandExecutionError, InternalError<br>
+from framework.params.types import TestPmdParamsDict<br>
+from framework.remote_session.dpdk_shell import DPDKShell<br>
+from framework.remote_session.interactive_shell import only_active<br>
+from framework.settings import SETTINGS<br>
+<br>
+P = ParamSpec("P")<br>
+TestPmdMethod = Callable[Concatenate["TestPmd", P], Any]<br>
+<br>
+<br>
+def _requires_stopped_ports(func: TestPmdMethod) -> TestPmdMethod:<br>
+ """Decorator for :class:`TestPmd` commands methods that require stopped ports.<br>
+<br>
+ If the decorated method is called while the ports are started, then these are stopped before<br>
+ continuing.<br>
+<br>
+ Args:<br>
+ func: The :class:`TestPmd` method to decorate.<br>
+ """<br>
+<br>
+ @functools.wraps(func)<br>
+ def _wrapper(self: "TestPmd", *args: P.args, **kwargs: P.kwargs):<br>
+ if self.ports_started:<br>
+ self._logger.debug("Ports need to be stopped to continue.")<br>
+ self.stop_all_ports()<br>
+<br>
+ return func(self, *args, **kwargs)<br>
+<br>
+ return _wrapper<br>
+<br>
+<br>
+def _requires_started_ports(func: TestPmdMethod) -> TestPmdMethod:<br>
+ """Decorator for :class:`TestPmd` commands methods that require started ports.<br>
+<br>
+ If the decorated method is called while the ports are stopped, then these are started before<br>
+ continuing.<br>
+<br>
+ Args:<br>
+ func: The :class:`TestPmd` method to decorate.<br>
+ """<br>
+<br>
+ @functools.wraps(func)<br>
+ def _wrapper(self: "TestPmd", *args: P.args, **kwargs: P.kwargs):<br>
+ if not self.ports_started:<br>
+ self._logger.debug("Ports need to be started to continue.")<br>
+ self.start_all_ports()<br>
+<br>
+ return func(self, *args, **kwargs)<br>
+<br>
+ return _wrapper<br>
+<br>
+<br>
+def _add_remove_mtu(mtu: int = 1500) -> Callable[[TestPmdMethod], TestPmdMethod]:<br>
+ """Configure MTU to `mtu` on all ports, run the decorated function, then revert.<br>
+<br>
+ Args:<br>
+ mtu: The MTU to configure all ports on.<br>
+<br>
+ Returns:<br>
+ The method decorated with setting and reverting MTU.<br>
+ """<br>
+<br>
+ def decorator(func: TestPmdMethod) -> TestPmdMethod:<br>
+ @functools.wraps(func)<br>
+ def wrapper(self: "TestPmd", *args: P.args, **kwargs: P.kwargs):<br>
+ original_mtu = self.ports[0].mtu<br>
+ self.set_port_mtu_all(mtu=mtu, verify=False)<br>
+ retval = func(self, *args, **kwargs)<br>
+ self.set_port_mtu_all(original_mtu if original_mtu else 1500, verify=False)<br>
+ return retval<br>
+<br>
+ return wrapper<br>
+<br>
+ return decorator<br>
+<br>
+<br>
+class TestPmd(DPDKShell):<br>
+ """Testpmd interactive shell.<br>
+<br>
+ The testpmd shell users should never use<br>
+ the :meth:`~.interactive_shell.InteractiveShell.send_command` method directly, but rather<br>
+ call specialized methods. If there isn't one that satisfies a need, it should be added.<br>
+<br>
+ Attributes:<br>
+ ports_started: Indicates whether the ports are started.<br>
+ """<br>
+<br>
+ _app_params: TestPmdParams<br>
+ _ports: list[TestPmdPort] | None<br>
+<br>
+ #: The testpmd's prompt.<br>
+ _default_prompt: ClassVar[str] = "testpmd>"<br>
+<br>
+ #: This forces the prompt to appear after sending a command.<br>
+ _command_extra_chars: ClassVar[str] = "\n"<br>
+<br>
+ ports_started: bool<br>
+<br>
+ def __init__(<br>
+ self,<br>
+ name: str | None = None,<br>
+ privileged: bool = True,<br>
+ **app_params: Unpack[TestPmdParamsDict],<br>
+ ) -> None:<br>
+ """Overrides :meth:`~.dpdk_shell.DPDKShell.__init__`. Changes app_params to kwargs."""<br>
+ if "port_topology" not in app_params and get_ctx().topology.type is LinkTopology.ONE_LINK:<br>
+ app_params["port_topology"] = PortTopology.loop<br>
+ super().__init__(name, privileged, app_params=TestPmdParams(**app_params))<br>
+ self.ports_started = not self._app_params.disable_device_start<br>
+ self._ports = None<br>
+<br>
+ @property<br>
+ def path(self) -> PurePath:<br>
+ """The path to the testpmd executable."""<br>
+ return PurePath("app/dpdk-testpmd")<br>
+<br>
+ @property<br>
+ def ports(self) -> list[TestPmdPort]:<br>
+ """The ports of the instance.<br>
+<br>
+ This caches the ports returned by :meth:`show_port_info_all`.<br>
+ To force an update of port information, execute :meth:`show_port_info_all` or<br>
+ :meth:`show_port_info`.<br>
+<br>
+ Returns: The list of known testpmd ports.<br>
+ """<br>
+ if self._ports is None:<br>
+ return self.show_port_info_all()<br>
+ return self._ports<br>
+<br>
+ @_requires_started_ports<br>
+ def start(self, verify: bool = True) -> None:<br>
+ """Start packet forwarding with the current configuration.<br>
+<br>
+ Args:<br>
+ verify: If :data:`True` , a second start command will be sent in an attempt to verify<br>
+ packet forwarding started as expected.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If `verify` is :data:`True` and forwarding fails to<br>
+ start or ports fail to come up.<br>
+ """<br>
+ self.send_command("start")<br>
+ if verify:<br>
+ # If forwarding was already started, sending "start" again should tell us<br>
+ start_cmd_output = self.send_command("start")<br>
+ if "Packet forwarding already started" not in start_cmd_output:<br>
+ self._logger.debug(f"Failed to start packet forwarding: \n{start_cmd_output}")<br>
+ raise InteractiveCommandExecutionError("Testpmd failed to start packet forwarding.")<br>
+<br>
+ def stop(self, verify: bool = True) -> str:<br>
+ """Stop packet forwarding.<br>
+<br>
+ Args:<br>
+ verify: If :data:`True` , the output of the stop command is scanned to verify that<br>
+ forwarding was stopped successfully or not started. If neither is found, it is<br>
+ considered an error.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If `verify` is :data:`True` and the command to stop<br>
+ forwarding results in an error.<br>
+<br>
+ Returns:<br>
+ Output gathered from the stop command and all other preceding logs in the buffer. This<br>
+ output is most often used to view forwarding statistics that are displayed when this<br>
+ command is sent as well as any verbose packet information that hasn't been consumed<br>
+ prior to calling this method.<br>
+ """<br>
+ stop_cmd_output = self.send_command("stop")<br>
+ if verify:<br>
+ if (<br>
+ "Done." not in stop_cmd_output<br>
+ and "Packet forwarding not started" not in stop_cmd_output<br>
+ ):<br>
+ self._logger.debug(f"Failed to stop packet forwarding: \n{stop_cmd_output}")<br>
+ raise InteractiveCommandExecutionError("Testpmd failed to stop packet forwarding.")<br>
+ return stop_cmd_output<br>
+<br>
+ def get_devices(self) -> list[TestPmdDevice]:<br>
+ """Get a list of device names that are known to testpmd.<br>
+<br>
+ Uses the device info listed in testpmd and then parses the output.<br>
+<br>
+ Returns:<br>
+ A list of devices.<br>
+ """<br>
+ dev_info: str = self.send_command("show device info all")<br>
+ dev_list: list[TestPmdDevice] = []<br>
+ for line in dev_info.split("\n"):<br>
+ if "device name:" in line.lower():<br>
+ dev_list.append(TestPmdDevice(line))<br>
+ return dev_list<br>
+<br>
+ def wait_link_status_up(self, port_id: int, timeout=SETTINGS.timeout) -> bool:<br>
+ """Wait until the link status on the given port is "up".<br>
+<br>
+ Arguments:<br>
+ port_id: Port to check the link status on.<br>
+ timeout: Time to wait for the link to come up. The default value for this<br>
+ argument may be modified using the :option:`--timeout` command-line argument<br>
+ or the :envvar:`DTS_TIMEOUT` environment variable.<br>
+<br>
+ Returns:<br>
+ Whether the link came up in time or not.<br>
+ """<br>
+ time_to_stop = time.time() + timeout<br>
+ port_info: str = ""<br>
+ while time.time() < time_to_stop:<br>
+ port_info = self.send_command(f"show port info {port_id}")<br>
+ if "Link status: up" in port_info:<br>
+ break<br>
+ time.sleep(0.5)<br>
+ else:<br>
+ self._logger.error(f"The link for port {port_id} did not come up in the given timeout.")<br>
+ return "Link status: up" in port_info<br>
+<br>
+ def set_forward_mode(self, mode: SimpleForwardingModes, verify: bool = True):<br>
+ """Set packet forwarding mode.<br>
+<br>
+ Args:<br>
+ mode: The forwarding mode to use.<br>
+ verify: If :data:`True` the output of the command will be scanned in an attempt to<br>
+ verify that the forwarding mode was set to `mode` properly.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If `verify` is :data:`True` and the forwarding mode<br>
+ fails to update.<br>
+ """<br>
+ set_fwd_output = self.send_command(f"set fwd {mode.value}")<br>
+ if verify:<br>
+ if f"Set {mode.value} packet forwarding mode" not in set_fwd_output:<br>
+ self._logger.debug(f"Failed to set fwd mode to {mode.value}:\n{set_fwd_output}")<br>
+ raise InteractiveCommandExecutionError(<br>
+ f"Test pmd failed to set fwd mode to {mode.value}"<br>
+ )<br>
+<br>
+ def stop_all_ports(self, verify: bool = True) -> None:<br>
+ """Stops all the ports.<br>
+<br>
+ Args:<br>
+ verify: If :data:`True`, the output of the command will be checked for a successful<br>
+ execution.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If `verify` is :data:`True` and the ports were not<br>
+ stopped successfully.<br>
+ """<br>
+ self._logger.debug("Stopping all the ports...")<br>
+ output = self.send_command("port stop all")<br>
+ if verify and not output.strip().endswith("Done"):<br>
+ raise InteractiveCommandExecutionError("Ports were not stopped successfully.")<br>
+<br>
+ self.ports_started = False<br>
+<br>
+ def start_all_ports(self, verify: bool = True) -> None:<br>
+ """Starts all the ports.<br>
+<br>
+ Args:<br>
+ verify: If :data:`True`, the output of the command will be checked for a successful<br>
+ execution.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If `verify` is :data:`True` and the ports were not<br>
+ started successfully.<br>
+ """<br>
+ self._logger.debug("Starting all the ports...")<br>
+ output = self.send_command("port start all")<br>
+ if verify and not output.strip().endswith("Done"):<br>
+ raise InteractiveCommandExecutionError("Ports were not started successfully.")<br>
+<br>
+ self.ports_started = True<br>
+<br>
+ @_requires_stopped_ports<br>
+ def set_ports_queues(self, number_of: int) -> None:<br>
+ """Sets the number of queues per port.<br>
+<br>
+ Args:<br>
+ number_of: The number of RX/TX queues to create per port.<br>
+<br>
+ Raises:<br>
+ InternalError: If `number_of` is invalid.<br>
+ """<br>
+ if number_of < 1:<br>
+ raise InternalError("The number of queues must be positive and non-zero.")<br>
+<br>
+ self.send_command(f"port config all rxq {number_of}")<br>
+ self.send_command(f"port config all txq {number_of}")<br>
+<br>
+ @_requires_stopped_ports<br>
+ def close_all_ports(self, verify: bool = True) -> None:<br>
+ """Close all ports.<br>
+<br>
+ Args:<br>
+ verify: If :data:`True` the output of the close command will be scanned in an attempt<br>
+ to verify that all ports were stopped successfully. Defaults to :data:`True`.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If `verify` is :data:`True` and at least one port<br>
+ failed to close.<br>
+ """<br>
+ port_close_output = self.send_command("port close all")<br>
+ if verify:<br>
+ num_ports = len(self.ports)<br>
+ if not all(f"Port {p_id} is closed" in port_close_output for p_id in range(num_ports)):<br>
+ raise InteractiveCommandExecutionError("Ports were not closed successfully.")<br>
+<br>
+ def show_port_info_all(self) -> list[TestPmdPort]:<br>
+ """Returns the information of all the ports.<br>
+<br>
+ Returns:<br>
+ list[TestPmdPort]: A list containing all the ports information as `TestPmdPort`.<br>
+ """<br>
+ output = self.send_command("show port info all")<br>
+<br>
+ # Sample output of the "all" command looks like:<br>
+ #<br>
+ # <start><br>
+ #<br>
+ # ********************* Infos for port 0 *********************<br>
+ # Key: value<br>
+ #<br>
+ # ********************* Infos for port 1 *********************<br>
+ # Key: value<br>
+ # <end><br>
+ #<br>
+ # Takes advantage of the double new line in between ports as end delimiter. But we need to<br>
+ # artificially add a new line at the end to pick up the last port. Because commands are<br>
+ # executed on a pseudo-terminal created by paramiko on the remote node, lines end with CRLF.<br>
+ # Therefore we also need to take the carriage return into account.<br>
+ iter = re.finditer(r"\*{21}.*?[\r\n]{4}", output + "\r\n", re.S)<br>
+ self._ports = [TestPmdPort.parse(block.group(0)) for block in iter]<br>
+ return self._ports<br>
+<br>
+ def show_port_info(self, port_id: int) -> TestPmdPort:<br>
+ """Returns the given port information.<br>
+<br>
+ Args:<br>
+ port_id: The port ID to gather information for.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If `port_id` is invalid.<br>
+<br>
+ Returns:<br>
+ TestPmdPort: An instance of `TestPmdPort` containing the given port's information.<br>
+ """<br>
+ output = self.send_command(f"show port info {port_id}", skip_first_line=True)<br>
+ if output.startswith("Invalid port"):<br>
+ raise InteractiveCommandExecutionError("invalid port given")<br>
+<br>
+ port = TestPmdPort.parse(output)<br>
+ self._update_port(port)<br>
+ return port<br>
+<br>
+ def _update_port(self, port: TestPmdPort) -> None:<br>
+ if self._ports:<br>
+ self._ports = [<br>
+ existing_port if port.id != existing_port.id else port<br>
+ for existing_port in self._ports<br>
+ ]<br>
+<br>
+ def set_mac_addr(self, port_id: int, mac_address: str, add: bool, verify: bool = True) -> None:<br>
+ """Add or remove a mac address on a given port's Allowlist.<br>
+<br>
+ Args:<br>
+ port_id: The port ID the mac address is set on.<br>
+ mac_address: The mac address to be added to or removed from the specified port.<br>
+ add: If :data:`True`, add the specified mac address. If :data:`False`, remove specified<br>
+ mac address.<br>
+ verify: If :data:'True', assert that the 'mac_addr' operation was successful. If<br>
+ :data:'False', run the command and skip this assertion.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If the set mac address operation fails.<br>
+ """<br>
+ mac_cmd = "add" if add else "remove"<br>
+ output = self.send_command(f"mac_addr {mac_cmd} {port_id} {mac_address}")<br>
+ if "Bad arguments" in output:<br>
+ self._logger.debug("Invalid argument provided to mac_addr")<br>
+ raise InteractiveCommandExecutionError("Invalid argument provided")<br>
+<br>
+ if verify:<br>
+ if "mac_addr_cmd error:" in output:<br>
+ self._logger.debug(f"Failed to {mac_cmd} {mac_address} on port {port_id}")<br>
+ raise InteractiveCommandExecutionError(<br>
+ f"Failed to {mac_cmd} {mac_address} on port {port_id} \n{output}"<br>
+ )<br>
+<br>
+ def set_multicast_mac_addr(<br>
+ self, port_id: int, multi_addr: str, add: bool, verify: bool = True<br>
+ ) -> None:<br>
+ """Add or remove multicast mac address to a specified port's allow list.<br>
+<br>
+ Args:<br>
+ port_id: The port ID the multicast address is set on.<br>
+ multi_addr: The multicast address to be added or removed from the filter.<br>
+ add: If :data:`True`, add the specified multicast address to the port filter.<br>
+ If :data:`False`, remove the specified multicast address from the port filter.<br>
+ verify: If :data:`True`, assert that the 'mcast_addr' operation was successful.<br>
+ If :data:`False`, execute the 'mcast_addr' operation and skip the assertion.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If either the 'add' or 'remove' operation fails.<br>
+ """<br>
+ mcast_cmd = "add" if add else "remove"<br>
+ output = self.send_command(f"mcast_addr {mcast_cmd} {port_id} {multi_addr}")<br>
+ if "Bad arguments" in output:<br>
+ self._logger.debug("Invalid arguments provided to mcast_addr")<br>
+ raise InteractiveCommandExecutionError("Invalid argument provided")<br>
+<br>
+ if verify:<br>
+ if (<br>
+ "Invalid multicast_addr" in output<br>
+ or f"multicast address {'already' if add else 'not'} filtered by port" in output<br>
+ ):<br>
+ self._logger.debug(f"Failed to {mcast_cmd} {multi_addr} on port {port_id}")<br>
+ raise InteractiveCommandExecutionError(<br>
+ f"Failed to {mcast_cmd} {multi_addr} on port {port_id} \n{output}"<br>
+ )<br>
+<br>
+ def show_port_stats_all(self) -> Tuple[list[TestPmdPortStats], str]:<br>
+ """Returns the statistics of all the ports.<br>
+<br>
+ Returns:<br>
+ Tuple[list[TestPmdPortStats], str]: A tuple where the first element is the stats of all<br>
+ ports as `TestPmdPortStats` and second is the raw testpmd output that was collected<br>
+ from the sent command.<br>
+ """<br>
+ output = self.send_command("show port stats all")<br>
+<br>
+ # Sample output of the "all" command looks like:<br>
+ #<br>
+ # ########### NIC statistics for port 0 ###########<br>
+ # values...<br>
+ # #################################################<br>
+ #<br>
+ # ########### NIC statistics for port 1 ###########<br>
+ # values...<br>
+ # #################################################<br>
+ #<br>
+ iter = re.finditer(r"(^ #*.+#*$[^#]+)^ #*\r$", output, re.MULTILINE)<br>
+ return ([TestPmdPortStats.parse(block.group(1)) for block in iter], output)<br>
+<br>
+ def show_port_stats(self, port_id: int) -> TestPmdPortStats:<br>
+ """Returns the given port statistics.<br>
+<br>
+ Args:<br>
+ port_id: The port ID to gather information for.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If `port_id` is invalid.<br>
+<br>
+ Returns:<br>
+ TestPmdPortStats: An instance of `TestPmdPortStats` containing the given port's stats.<br>
+ """<br>
+ output = self.send_command(f"show port stats {port_id}", skip_first_line=True)<br>
+ if output.startswith("Invalid port"):<br>
+ raise InteractiveCommandExecutionError("invalid port given")<br>
+<br>
+ return TestPmdPortStats.parse(output)<br>
+<br>
+ def set_multicast_all(self, on: bool, verify: bool = True) -> None:<br>
+ """Turns multicast mode on/off for all ports.<br>
+<br>
+ Args:<br>
+ on: If :data:`True`, turns multicast mode on, otherwise turns off.<br>
+ verify: If :data:`True` an additional command will be sent to verify<br>
+ that multicast mode is properly set. Defaults to :data:`True`.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If `verify` is :data:`True` and multicast<br>
+ mode is not properly set.<br>
+ """<br>
+ multicast_cmd_output = self.send_command(f"set allmulti all {'on' if on else 'off'}")<br>
+ if verify:<br>
+ port_stats = self.show_port_info_all()<br>
+ if on ^ all(stats.is_allmulticast_mode_enabled for stats in port_stats):<br>
+ self._logger.debug(<br>
+ f"Failed to set multicast mode on all ports.: \n{multicast_cmd_output}"<br>
+ )<br>
+ raise InteractiveCommandExecutionError(<br>
+ "Testpmd failed to set multicast mode on all ports."<br>
+ )<br>
+<br>
+ @_requires_stopped_ports<br>
+ def csum_set_hw(<br>
+ self, layers: ChecksumOffloadOptions, port_id: int, verify: bool = True<br>
+ ) -> None:<br>
+ """Enables hardware checksum offloading on the specified layer.<br>
+<br>
+ Args:<br>
+ layers: The layer/layers that checksum offloading should be enabled on.<br>
+ port_id: The port number to enable checksum offloading on, should be within 0-32.<br>
+ verify: If :data:`True` the output of the command will be scanned in an attempt to<br>
+ verify that checksum offloading was enabled on the port.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If checksum offload is not enabled successfully.<br>
+ """<br>
+ for name, offload in ChecksumOffloadOptions.__members__.items():<br>
+ if offload in layers:<br>
+ name = name.replace("_", "-")<br>
+ csum_output = self.send_command(f"csum set {name} hw {port_id}")<br>
+ if verify:<br>
+ if (<br>
+ "Bad arguments" in csum_output<br>
+ or f"Please stop port {port_id} first" in csum_output<br>
+ or f"checksum offload is not supported by port {port_id}" in csum_output<br>
+ ):<br>
+ self._logger.debug(f"Csum set hw error:\n{csum_output}")<br>
+ raise InteractiveCommandExecutionError(<br>
+ f"Failed to set csum hw {name} mode on port {port_id}"<br>
+ )<br>
+ success = False<br>
+ if f"{name} checksum offload is hw" in csum_output.lower():<br>
+ success = True<br>
+ if not success and verify:<br>
+ self._logger.debug(<br>
+ f"Failed to set csum hw mode on port {port_id}:\n{csum_output}"<br>
+ )<br>
+ raise InteractiveCommandExecutionError(<br>
+ f"""Failed to set csum hw mode on port<br>
+ {port_id}:\n{csum_output}"""<br>
+ )<br>
+<br>
+ def flow_create(self, flow_rule: FlowRule, port_id: int) -> int:<br>
+ """Creates a flow rule in the testpmd session.<br>
+<br>
+ This command is implicitly verified as needed to return the created flow rule id.<br>
+<br>
+ Args:<br>
+ flow_rule: :class:`FlowRule` object used for creating testpmd flow rule.<br>
+ port_id: Integer representing the port to use.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If flow rule is invalid.<br>
+<br>
+ Returns:<br>
+ Id of created flow rule.<br>
+ """<br>
+ flow_output = self.send_command(f"flow create {port_id} {flow_rule}")<br>
+ match = re.search(r"#(\d+)", flow_output)<br>
+ if match is not None:<br>
+ match_str = match.group(1)<br>
+ flow_id = int(match_str)<br>
+ return flow_id<br>
+ else:<br>
+ self._logger.debug(f"Failed to create flow rule:\n{flow_output}")<br>
+ raise InteractiveCommandExecutionError(f"Failed to create flow rule:\n{flow_output}")<br>
+<br>
+ def flow_validate(self, flow_rule: FlowRule, port_id: int) -> bool:<br>
+ """Validates a flow rule in the testpmd session.<br>
+<br>
+ Args:<br>
+ flow_rule: :class:`FlowRule` object used for validating testpmd flow rule.<br>
+ port_id: Integer representing the port to use.<br>
+<br>
+ Returns:<br>
+ Boolean representing whether rule is valid or not.<br>
+ """<br>
+ flow_output = self.send_command(f"flow validate {port_id} {flow_rule}")<br>
+ if "Flow rule validated" in flow_output:<br>
+ return True<br>
+ return False<br>
+<br>
+ def flow_delete(self, flow_id: int, port_id: int, verify: bool = True) -> None:<br>
+ """Deletes the specified flow rule from the testpmd session.<br>
+<br>
+ Args:<br>
+ flow_id: ID of the flow to remove.<br>
+ port_id: Integer representing the port to use.<br>
+ verify: If :data:`True`, the output of the command is scanned<br>
+ to ensure the flow rule was deleted successfully.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If flow rule is invalid.<br>
+ """<br>
+ flow_output = self.send_command(f"flow destroy {port_id} rule {flow_id}")<br>
+ if verify:<br>
+ if "destroyed" not in flow_output:<br>
+ self._logger.debug(f"Failed to delete flow rule:\n{flow_output}")<br>
+ raise InteractiveCommandExecutionError(<br>
+ f"Failed to delete flow rule:\n{flow_output}"<br>
+ )<br>
+<br>
+ @_requires_started_ports<br>
+ @_requires_stopped_ports<br>
+ def set_port_mtu(self, port_id: int, mtu: int, verify: bool = True) -> None:<br>
+ """Change the MTU of a port using testpmd.<br>
+<br>
+ Some PMDs require that the port be stopped before changing the MTU, and it does no harm to<br>
+ stop the port before configuring in cases where it isn't required, so ports are stopped<br>
+ prior to changing their MTU. On the other hand, some PMDs require that the port had already<br>
+ been started once since testpmd startup. Therefore, ports are also started before stopping<br>
+ them to ensure this has happened.<br>
+<br>
+ Args:<br>
+ port_id: ID of the port to adjust the MTU on.<br>
+ mtu: Desired value for the MTU to be set to.<br>
+ verify: If `verify` is :data:`True` then the output will be scanned in an attempt to<br>
+ verify that the mtu was properly set on the port. Defaults to :data:`True`.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If `verify` is :data:`True` and the MTU was not<br>
+ properly updated on the port matching `port_id`.<br>
+ """<br>
+ set_mtu_output = self.send_command(f"port config mtu {port_id} {mtu}")<br>
+ if verify and (f"MTU: {mtu}" not in self.send_command(f"show port info {port_id}")):<br>
+ self._logger.debug(<br>
+ f"Failed to set mtu to {mtu} on port {port_id}. Output was:\n{set_mtu_output}"<br>
+ )<br>
+ raise InteractiveCommandExecutionError(<br>
+ f"Test pmd failed to update mtu of port {port_id} to {mtu}"<br>
+ )<br>
+<br>
+ def set_port_mtu_all(self, mtu: int, verify: bool = True) -> None:<br>
+ """Change the MTU of all ports using testpmd.<br>
+<br>
+ Runs :meth:`set_port_mtu` for every port that testpmd is aware of.<br>
+<br>
+ Args:<br>
+ mtu: Desired value for the MTU to be set to.<br>
+ verify: Whether to verify that setting the MTU on each port was successful or not.<br>
+ Defaults to :data:`True`.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If `verify` is :data:`True` and the MTU was not<br>
+ properly updated on at least one port.<br>
+ """<br>
+ for port in self.ports:<br>
+ self.set_port_mtu(port.id, mtu, verify)<br>
+<br>
+ @staticmethod<br>
+ def extract_verbose_output(output: str) -> list[TestPmdVerbosePacket]:<br>
+ """Extract the verbose information present in given testpmd output.<br>
+<br>
+ This method extracts sections of verbose output that begin with the line<br>
+ "port X/queue Y: sent/received Z packets" and end with the ol_flags of a packet.<br>
+<br>
+ Args:<br>
+ output: Testpmd output that contains verbose information<br>
+<br>
+ Returns:<br>
+ List of parsed packet information gathered from verbose information in `output`.<br>
+ """<br>
+ out: list[TestPmdVerbosePacket] = []<br>
+ prev_header: str = ""<br>
+ iter = re.finditer(<br>
+ r"(?P<HEADER>(?:port \d+/queue \d+: (?:received|sent) \d+ packets)?)\s*"<br>
+ r"(?P<PACKET>src=[\w\s=:-]+?ol_flags: [\w ]+)",<br>
+ output,<br>
+ )<br>
+ for match in iter:<br>
+ if match.group("HEADER"):<br>
+ prev_header = match.group("HEADER")<br>
+ out.append(TestPmdVerbosePacket.parse(f"{prev_header}\n{match.group('PACKET')}"))<br>
+ return out<br>
+<br>
+ @_requires_stopped_ports<br>
+ def set_vlan_filter(self, port: int, enable: bool, verify: bool = True) -> None:<br>
+ """Enable or disable the vlan filter on a port.<br>
+<br>
+ Args:<br>
+ port: The port number to enable VLAN filter on.<br>
+ enable: Enable the filter on `port` if :data:`True`, otherwise disable it.<br>
+ verify: If :data:`True`, the output of the command and show port info<br>
+ is scanned to verify that vlan filtering was set successfully.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If `verify` is :data:`True` and the filter<br>
+ fails to update.<br>
+ """<br>
+ filter_cmd_output = self.send_command(f"vlan set filter {'on' if enable else 'off'} {port}")<br>
+ if verify:<br>
+ vlan_settings = self.show_port_info(port_id=port).vlan_offload<br>
+ if enable ^ (vlan_settings is not None and VLANOffloadFlag.FILTER in vlan_settings):<br>
+ self._logger.debug(<br>
+ f"""Failed to {"enable" if enable else "disable"}<br>
+ filter on port {port}: \n{filter_cmd_output}"""<br>
+ )<br>
+ raise InteractiveCommandExecutionError(<br>
+ f"""Failed to {"enable" if enable else "disable"}<br>
+ filter on port {port}"""<br>
+ )<br>
+<br>
+ def set_mac_address(self, port: int, mac_address: str, verify: bool = True) -> None:<br>
+ """Set port's MAC address.<br>
+<br>
+ Args:<br>
+ port: The number of the requested port.<br>
+ mac_address: The MAC address to set.<br>
+ verify: If :data:`True`, the output of the command is scanned to verify that<br>
+ the mac address is set in the specified port.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If `verify` is :data:`True` and the command<br>
+ fails to execute.<br>
+ """<br>
+ output = self.send_command(f"mac_addr set {port} {mac_address}", skip_first_line=True)<br>
+ if verify:<br>
+ if output.strip():<br>
+ self._logger.debug(<br>
+ f"Testpmd failed to set MAC address {mac_address} on port {port}."<br>
+ )<br>
+ raise InteractiveCommandExecutionError(<br>
+ f"Testpmd failed to set MAC address {mac_address} on port {port}."<br>
+ )<br>
+<br>
+ def set_flow_control(<br>
+ self, port: int, flow_ctrl: TestPmdPortFlowCtrl, verify: bool = True<br>
+ ) -> None:<br>
+ """Set the given `port`'s flow control.<br>
+<br>
+ Args:<br>
+ port: The number of the requested port.<br>
+ flow_ctrl: The requested flow control parameters.<br>
+ verify: If :data:`True`, the output of the command is scanned to verify that<br>
+ the flow control in the specified port is set.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If `verify` is :data:`True` and the command<br>
+ fails to execute.<br>
+ """<br>
+ output = self.send_command(f"set flow_ctrl {flow_ctrl} {port}", skip_first_line=True)<br>
+ if verify:<br>
+ if output.strip():<br>
+ self._logger.debug(f"Testpmd failed to set the {flow_ctrl} in port {port}.")<br>
+ raise InteractiveCommandExecutionError(<br>
+ f"Testpmd failed to set the {flow_ctrl} in port {port}."<br>
+ )<br>
+<br>
+ def show_port_flow_info(self, port: int) -> TestPmdPortFlowCtrl | None:<br>
+ """Show port info flow.<br>
+<br>
+ Args:<br>
+ port: The number of the requested port.<br>
+<br>
+ Returns:<br>
+ The current port flow control parameters if supported, otherwise :data:`None`.<br>
+ """<br>
+ output = self.send_command(f"show port {port} flow_ctrl")<br>
+ if "Flow control infos" in output:<br>
+ return TestPmdPortFlowCtrl.parse(output)<br>
+ return None<br>
+<br>
+ @_requires_stopped_ports<br>
+ def rx_vlan(self, vlan: int, port: int, add: bool, verify: bool = True) -> None:<br>
+ """Add or remove a vlan tag in a port's filter list. Requires vlan filter to be on.<br>
+<br>
+ Args:<br>
+ vlan: The vlan tag to add, should be within 1-1005.<br>
+ port: The port number to add the tag on.<br>
+ add: Adds the tag if :data:`True`, otherwise removes the tag.<br>
+ verify: If :data:`True`, the output of the command is scanned to verify that<br>
+ the vlan tag was added to the filter list on the specified port.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If `verify` is :data:`True` and the tag<br>
+ is not added.<br>
+ """<br>
+ rx_cmd_output = self.send_command(f"rx_vlan {'add' if add else 'rm'} {vlan} {port}")<br>
+ if verify:<br>
+ if (<br>
+ "VLAN-filtering disabled" in rx_cmd_output<br>
+ or "Invalid vlan_id" in rx_cmd_output<br>
+ or "Bad arguments" in rx_cmd_output<br>
+ ):<br>
+ self._logger.debug(<br>
+ f"""Failed to {"add" if add else "remove"} tag {vlan}<br>
+ port {port}: \n{rx_cmd_output}"""<br>
+ )<br>
+ raise InteractiveCommandExecutionError(<br>
+ f"Testpmd failed to {'add' if add else 'remove'} tag {vlan} on port {port}."<br>
+ )<br>
+<br>
+ @_requires_stopped_ports<br>
+ def set_vlan_strip(self, port: int, enable: bool, verify: bool = True) -> None:<br>
+ """Enable or disable vlan stripping on the specified port.<br>
+<br>
+ Args:<br>
+ port: The port number to use.<br>
+ enable: If :data:`True`, will turn vlan stripping on, otherwise will turn off.<br>
+ verify: If :data:`True`, the output of the command and show port info<br>
+ is scanned to verify that vlan stripping was enabled on the specified port.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If `verify` is :data:`True` and stripping<br>
+ fails to update.<br>
+ """<br>
+ strip_cmd_output = self.send_command(f"vlan set strip {'on' if enable else 'off'} {port}")<br>
+ if verify:<br>
+ vlan_settings = self.show_port_info(port_id=port).vlan_offload<br>
+ if enable ^ (vlan_settings is not None and VLANOffloadFlag.STRIP in vlan_settings):<br>
+ self._logger.debug(<br>
+ f"""Failed to set strip {"on" if enable else "off"}<br>
+ port {port}: \n{strip_cmd_output}"""<br>
+ )<br>
+ raise InteractiveCommandExecutionError(<br>
+ f"Testpmd failed to set strip {'on' if enable else 'off'} port {port}."<br>
+ )<br>
+<br>
+ @_requires_stopped_ports<br>
+ def tx_vlan_set(<br>
+ self, port: int, enable: bool, vlan: int | None = None, verify: bool = True<br>
+ ) -> None:<br>
+ """Set hardware insertion of vlan tags in packets sent on a port.<br>
+<br>
+ Args:<br>
+ port: The port number to use.<br>
+ enable: Sets vlan tag insertion if :data:`True`, and resets if :data:`False`.<br>
+ vlan: The vlan tag to insert if enable is :data:`True`.<br>
+ verify: If :data:`True`, the output of the command is scanned to verify that<br>
+ vlan insertion was enabled on the specified port.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If `verify` is :data:`True` and the insertion<br>
+ tag is not set.<br>
+ """<br>
+ if enable:<br>
+ tx_vlan_cmd_output = self.send_command(f"tx_vlan set {port} {vlan}")<br>
+ if verify:<br>
+ if (<br>
+ "Please stop port" in tx_vlan_cmd_output<br>
+ or "Invalid vlan_id" in tx_vlan_cmd_output<br>
+ or "Invalid port" in tx_vlan_cmd_output<br>
+ ):<br>
+ self._logger.debug(<br>
+ f"Failed to set vlan tag {vlan} on port {port}:\n{tx_vlan_cmd_output}"<br>
+ )<br>
+ raise InteractiveCommandExecutionError(<br>
+ f"Testpmd failed to set vlan insertion tag {vlan} on port {port}."<br>
+ )<br>
+ else:<br>
+ tx_vlan_cmd_output = self.send_command(f"tx_vlan reset {port}")<br>
+ if verify:<br>
+ if "Please stop port" in tx_vlan_cmd_output or "Invalid port" in tx_vlan_cmd_output:<br>
+ self._logger.debug(<br>
+ f"Failed to reset vlan insertion on port {port}: \n{tx_vlan_cmd_output}"<br>
+ )<br>
+ raise InteractiveCommandExecutionError(<br>
+ f"Testpmd failed to reset vlan insertion on port {port}."<br>
+ )<br>
+<br>
+ def set_promisc(self, port: int, enable: bool, verify: bool = True) -> None:<br>
+ """Enable or disable promiscuous mode for the specified port.<br>
+<br>
+ Args:<br>
+ port: Port number to use.<br>
+ enable: If :data:`True`, turn promiscuous mode on, otherwise turn off.<br>
+ verify: If :data:`True` an additional command will be sent to verify that<br>
+ promiscuous mode is properly set. Defaults to :data:`True`.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If `verify` is :data:`True` and promiscuous mode<br>
+ is not correctly set.<br>
+ """<br>
+ promisc_cmd_output = self.send_command(f"set promisc {port} {'on' if enable else 'off'}")<br>
+ if verify:<br>
+ stats = self.show_port_info(port_id=port)<br>
+ if enable ^ stats.is_promiscuous_mode_enabled:<br>
+ self._logger.debug(<br>
+ f"Failed to set promiscuous mode on port {port}: \n{promisc_cmd_output}"<br>
+ )<br>
+ raise InteractiveCommandExecutionError(<br>
+ f"Testpmd failed to set promiscuous mode on port {port}."<br>
+ )<br>
+<br>
+ def set_verbose(self, level: int, verify: bool = True) -> None:<br>
+ """Set debug verbosity level.<br>
+<br>
+ Args:<br>
+ level: 0 - silent except for error<br>
+ 1 - fully verbose except for Tx packets<br>
+ 2 - fully verbose except for Rx packets<br>
+ >2 - fully verbose<br>
+ verify: If :data:`True` the command output will be scanned to verify that verbose level<br>
+ is properly set. Defaults to :data:`True`.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If `verify` is :data:`True` and verbose level<br>
+ is not correctly set.<br>
+ """<br>
+ verbose_cmd_output = self.send_command(f"set verbose {level}")<br>
+ if verify:<br>
+ if "Change verbose level" not in verbose_cmd_output:<br>
+ self._logger.debug(<br>
+ f"Failed to set verbose level to {level}: \n{verbose_cmd_output}"<br>
+ )<br>
+ raise InteractiveCommandExecutionError(<br>
+ f"Testpmd failed to set verbose level to {level}."<br>
+ )<br>
+<br>
+ def rx_vxlan(self, vxlan_id: int, port_id: int, enable: bool, verify: bool = True) -> None:<br>
+ """Add or remove vxlan id to/from filter list.<br>
+<br>
+ Args:<br>
+ vxlan_id: VXLAN ID to add to port filter list.<br>
+ port_id: ID of the port to modify VXLAN filter of.<br>
+ enable: If :data:`True`, adds specified VXLAN ID, otherwise removes it.<br>
+ verify: If :data:`True`, the output of the command is checked to verify<br>
+ the VXLAN ID was successfully added/removed from the port.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If `verify` is :data:`True` and VXLAN ID<br>
+ is not successfully added or removed.<br>
+ """<br>
+ action = "add" if enable else "rm"<br>
+ vxlan_output = self.send_command(f"rx_vxlan_port {action} {vxlan_id} {port_id}")<br>
+ if verify:<br>
+ if "udp tunneling add error" in vxlan_output:<br>
+ self._logger.debug(f"Failed to set VXLAN:\n{vxlan_output}")<br>
+ raise InteractiveCommandExecutionError(f"Failed to set VXLAN:\n{vxlan_output}")<br>
+<br>
+ def clear_port_stats(self, port_id: int, verify: bool = True) -> None:<br>
+ """Clear statistics of a given port.<br>
+<br>
+ Args:<br>
+ port_id: ID of the port to clear the statistics on.<br>
+ verify: If :data:`True` the output of the command will be scanned to verify that it was<br>
+ successful, otherwise failures will be ignored. Defaults to :data:`True`.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If `verify` is :data:`True` and testpmd fails to<br>
+ clear the statistics of the given port.<br>
+ """<br>
+ clear_output = self.send_command(f"clear port stats {port_id}")<br>
+ if verify and f"NIC statistics for port {port_id} cleared" not in clear_output:<br>
+ raise InteractiveCommandExecutionError(<br>
+ f"Test pmd failed to set clear forwarding stats on port {port_id}"<br>
+ )<br>
+<br>
+ def clear_port_stats_all(self, verify: bool = True) -> None:<br>
+ """Clear the statistics of all ports that testpmd is aware of.<br>
+<br>
+ Args:<br>
+ verify: If :data:`True` the output of the command will be scanned to verify that all<br>
+ ports had their statistics cleared, otherwise failures will be ignored. Defaults to<br>
+ :data:`True`.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If `verify` is :data:`True` and testpmd fails to<br>
+ clear the statistics of any of its ports.<br>
+ """<br>
+ clear_output = self.send_command("clear port stats all")<br>
+ if verify:<br>
+ if type(self._app_params.port_numa_config) is list:<br>
+ for port_id in range(len(self._app_params.port_numa_config)):<br>
+ if f"NIC statistics for port {port_id} cleared" not in clear_output:<br>
+ raise InteractiveCommandExecutionError(<br>
+ f"Test pmd failed to set clear forwarding stats on port {port_id}"<br>
+ )<br>
+<br>
+ @only_active<br>
+ def close(self) -> None:<br>
+ """Overrides :meth:`~.interactive_shell.close`."""<br>
+ self.stop()<br>
+ self.send_command("quit", "Bye...")<br>
+ return super().close()<br>
+<br>
+ """<br>
+ ====== Capability retrieval methods ======<br>
+ """<br>
+<br>
+ def get_capabilities_rx_offload(<br>
+ self,<br>
+ supported_capabilities: MutableSet["NicCapability"],<br>
+ unsupported_capabilities: MutableSet["NicCapability"],<br>
+ ) -> None:<br>
+ """Get all rx offload capabilities and divide them into supported and unsupported.<br>
+<br>
+ Args:<br>
+ supported_capabilities: Supported capabilities will be added to this set.<br>
+ unsupported_capabilities: Unsupported capabilities will be added to this set.<br>
+ """<br>
+ self._logger.debug("Getting rx offload capabilities.")<br>
+ command = f"show port {self.ports[0].id} rx_offload capabilities"<br>
+ rx_offload_capabilities_out = self.send_command(command)<br>
+ rx_offload_capabilities = RxOffloadCapabilities.parse(rx_offload_capabilities_out)<br>
+ self._update_capabilities_from_flag(<br>
+ supported_capabilities,<br>
+ unsupported_capabilities,<br>
+ RxOffloadCapability,<br>
+ rx_offload_capabilities.per_port | rx_offload_capabilities.per_queue,<br>
+ )<br>
+<br>
+ def get_port_queue_info(<br>
+ self, port_id: int, queue_id: int, is_rx_queue: bool<br>
+ ) -> TestPmdQueueInfo:<br>
+ """Returns the current state of the specified queue."""<br>
+ command = f"show {'rxq' if is_rx_queue else 'txq'} info {port_id} {queue_id}"<br>
+ queue_info = TestPmdQueueInfo.parse(self.send_command(command))<br>
+ return queue_info<br>
+<br>
+ def setup_port_queue(self, port_id: int, queue_id: int, is_rx_queue: bool) -> None:<br>
+ """Setup a given queue on a port.<br>
+<br>
+ This functionality cannot be verified because the setup action only takes effect when the<br>
+ queue is started.<br>
+<br>
+ Args:<br>
+ port_id: ID of the port where the queue resides.<br>
+ queue_id: ID of the queue to setup.<br>
+ is_rx_queue: Type of queue to setup. If :data:`True` an RX queue will be setup,<br>
+ otherwise a TX queue will be setup.<br>
+ """<br>
+ self.send_command(f"port {port_id} {'rxq' if is_rx_queue else 'txq'} {queue_id} setup")<br>
+<br>
+ def stop_port_queue(<br>
+ self, port_id: int, queue_id: int, is_rx_queue: bool, verify: bool = True<br>
+ ) -> None:<br>
+ """Stops a given queue on a port.<br>
+<br>
+ Args:<br>
+ port_id: ID of the port that the queue belongs to.<br>
+ queue_id: ID of the queue to stop.<br>
+ is_rx_queue: Type of queue to stop. If :data:`True` an RX queue will be stopped,<br>
+ otherwise a TX queue will be stopped.<br>
+ verify: If :data:`True` an additional command will be sent to verify the queue stopped.<br>
+ Defaults to :data:`True`.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If `verify` is :data:`True` and the queue fails to<br>
+ stop.<br>
+ """<br>
+ port_type = "rxq" if is_rx_queue else "txq"<br>
+ stop_cmd_output = self.send_command(f"port {port_id} {port_type} {queue_id} stop")<br>
+ if verify:<br>
+ queue_started = self.get_port_queue_info(<br>
+ port_id, queue_id, is_rx_queue<br>
+ ).is_queue_started<br>
+ if queue_started:<br>
+ self._logger.debug(<br>
+ f"Failed to stop {port_type} {queue_id} on port {port_id}:\n{stop_cmd_output}"<br>
+ )<br>
+ raise InteractiveCommandExecutionError(<br>
+ f"Test pmd failed to stop {port_type} {queue_id} on port {port_id}"<br>
+ )<br>
+<br>
+ def start_port_queue(<br>
+ self, port_id: int, queue_id: int, is_rx_queue: bool, verify: bool = True<br>
+ ) -> None:<br>
+ """Starts a given queue on a port.<br>
+<br>
+ First sets up the port queue, then starts it.<br>
+<br>
+ Args:<br>
+ port_id: ID of the port that the queue belongs to.<br>
+ queue_id: ID of the queue to start.<br>
+ is_rx_queue: Type of queue to start. If :data:`True` an RX queue will be started,<br>
+ otherwise a TX queue will be started.<br>
+ verify: If :data:`True` an additional command will be sent to verify that the queue was<br>
+ started. Defaults to :data:`True`.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If `verify` is :data:`True` and the queue fails to<br>
+ start.<br>
+ """<br>
+ port_type = "rxq" if is_rx_queue else "txq"<br>
+ self.setup_port_queue(port_id, queue_id, is_rx_queue)<br>
+ start_cmd_output = self.send_command(f"port {port_id} {port_type} {queue_id} start")<br>
+ if verify:<br>
+ queue_started = self.get_port_queue_info(<br>
+ port_id, queue_id, is_rx_queue<br>
+ ).is_queue_started<br>
+ if not queue_started:<br>
+ self._logger.debug(<br>
+ f"Failed to start {port_type} {queue_id} on port {port_id}:\n{start_cmd_output}"<br>
+ )<br>
+ raise InteractiveCommandExecutionError(<br>
+ f"Test pmd failed to start {port_type} {queue_id} on port {port_id}"<br>
+ )<br>
+<br>
+ def get_queue_ring_size(self, port_id: int, queue_id: int, is_rx_queue: bool) -> int:<br>
+ """Returns the current size of the ring on the specified queue."""<br>
+ command = f"show {'rxq' if is_rx_queue else 'txq'} info {port_id} {queue_id}"<br>
+ queue_info = TestPmdQueueInfo.parse(self.send_command(command))<br>
+ return queue_info.ring_size<br>
+<br>
+ def set_queue_ring_size(<br>
+ self,<br>
+ port_id: int,<br>
+ queue_id: int,<br>
+ size: int,<br>
+ is_rx_queue: bool,<br>
+ verify: bool = True,<br>
+ ) -> None:<br>
+ """Update the ring size of an Rx/Tx queue on a given port.<br>
+<br>
+ Queue is setup after setting the ring size so that the queue info reflects this change and<br>
+ it can be verified.<br>
+<br>
+ Args:<br>
+ port_id: The port that the queue resides on.<br>
+ queue_id: The ID of the queue on the port.<br>
+ size: The size to update the ring size to.<br>
+ is_rx_queue: Whether to modify an RX or TX queue. If :data:`True` an RX queue will be<br>
+ updated, otherwise a TX queue will be updated.<br>
+ verify: If :data:`True` an additional command will be sent to check the ring size of<br>
+ the queue in an attempt to validate that the size was changed properly.<br>
+<br>
+ Raises:<br>
+ InteractiveCommandExecutionError: If `verify` is :data:`True` and there is a failure<br>
+ when updating ring size.<br>
+ """<br>
+ queue_type = "rxq" if is_rx_queue else "txq"<br>
+ self.send_command(f"port config {port_id} {queue_type} {queue_id} ring_size {size}")<br>
+ self.setup_port_queue(port_id, queue_id, is_rx_queue)<br>
+ if verify:<br>
+ curr_ring_size = self.get_queue_ring_size(port_id, queue_id, is_rx_queue)<br>
+ if curr_ring_size != size:<br>
+ self._logger.debug(<br>
+ f"Failed up update ring size of queue {queue_id} on port {port_id}. Current"<br>
+ f" ring size is {curr_ring_size}."<br>
+ )<br>
+ raise InteractiveCommandExecutionError(<br>
+ f"Failed to update ring size of queue {queue_id} on port {port_id}"<br>
+ )<br>
+<br>
+ @_requires_stopped_ports<br>
+ def set_queue_deferred_start(<br>
+ self, port_id: int, queue_id: int, is_rx_queue: bool, on: bool<br>
+ ) -> None:<br>
+ """Set the deferred start attribute of the specified queue on/off.<br>
+<br>
+ Args:<br>
+ port_id: The port that the queue resides on.<br>
+ queue_id: The ID of the queue on the port.<br>
+ is_rx_queue: Whether to modify an RX or TX queue. If :data:`True` an RX queue will be<br>
+ updated, otherwise a TX queue will be updated.<br>
+ on: Whether to set deferred start mode on or off. If :data:`True` deferred start will<br>
+ be turned on, otherwise it will be turned off.<br>
+ """<br>
+ queue_type = "rxq" if is_rx_queue else "txq"<br>
+ action = "on" if on else "off"<br>
+ self.send_command(f"port {port_id} {queue_type} {queue_id} deferred_start {action}")<br>
+<br>
+ def _update_capabilities_from_flag(<br>
+ self,<br>
+ supported_capabilities: MutableSet["NicCapability"],<br>
+ unsupported_capabilities: MutableSet["NicCapability"],<br>
+ flag_class: type[Flag],<br>
+ supported_flags: Flag,<br>
+ ) -> None:<br>
+ """Divide all flags from `flag_class` into supported and unsupported."""<br>
+ for flag in flag_class:<br>
+ if flag in supported_flags:<br>
+ supported_capabilities.add(NicCapability[str(flag.name)])<br>
+ else:<br>
+ unsupported_capabilities.add(NicCapability[str(flag.name)])<br>
+<br>
+ @_requires_started_ports<br>
+ def get_capabilities_rxq_info(<br>
+ self,<br>
+ supported_capabilities: MutableSet["NicCapability"],<br>
+ unsupported_capabilities: MutableSet["NicCapability"],<br>
+ ) -> None:<br>
+ """Get all rxq capabilities and divide them into supported and unsupported.<br>
+<br>
+ Args:<br>
+ supported_capabilities: Supported capabilities will be added to this set.<br>
+ unsupported_capabilities: Unsupported capabilities will be added to this set.<br>
+ """<br>
+ self._logger.debug("Getting rxq capabilities.")<br>
+ command = f"show rxq info {self.ports[0].id} 0"<br>
+ rxq_info = TestPmdRxqInfo.parse(self.send_command(command))<br>
+ if rxq_info.scattered_packets:<br>
+ supported_capabilities.add(NicCapability.SCATTERED_RX_ENABLED)<br>
+ else:<br>
+ unsupported_capabilities.add(NicCapability.SCATTERED_RX_ENABLED)<br>
+<br>
+ def get_capabilities_show_port_info(<br>
+ self,<br>
+ supported_capabilities: MutableSet["NicCapability"],<br>
+ unsupported_capabilities: MutableSet["NicCapability"],<br>
+ ) -> None:<br>
+ """Get all capabilities from show port info and divide them into supported and unsupported.<br>
+<br>
+ Args:<br>
+ supported_capabilities: Supported capabilities will be added to this set.<br>
+ unsupported_capabilities: Unsupported capabilities will be added to this set.<br>
+ """<br>
+ self._update_capabilities_from_flag(<br>
+ supported_capabilities,<br>
+ unsupported_capabilities,<br>
+ DeviceCapabilitiesFlag,<br>
+ self.ports[0].device_capabilities,<br>
+ )<br>
+<br>
+ def get_capabilities_mcast_filtering(<br>
+ self,<br>
+ supported_capabilities: MutableSet["NicCapability"],<br>
+ unsupported_capabilities: MutableSet["NicCapability"],<br>
+ ) -> None:<br>
+ """Get multicast filtering capability from mcast_addr add and check for testpmd error code.<br>
+<br>
+ Args:<br>
+ supported_capabilities: Supported capabilities will be added to this set.<br>
+ unsupported_capabilities: Unsupported capabilities will be added to this set.<br>
+ """<br>
+ self._logger.debug("Getting mcast filter capabilities.")<br>
+ command = f"mcast_addr add {self.ports[0].id} 01:00:5E:00:00:00"<br>
+ output = self.send_command(command)<br>
+ if "diag=-95" in output:<br>
+ unsupported_capabilities.add(NicCapability.MCAST_FILTERING)<br>
+ else:<br>
+ supported_capabilities.add(NicCapability.MCAST_FILTERING)<br>
+ command = str.replace(command, "add", "remove", 1)<br>
+ self.send_command(command)<br>
+<br>
+ def get_capabilities_flow_ctrl(<br>
+ self,<br>
+ supported_capabilities: MutableSet["NicCapability"],<br>
+ unsupported_capabilities: MutableSet["NicCapability"],<br>
+ ) -> None:<br>
+ """Get flow control capability and check for testpmd failure.<br>
+<br>
+ Args:<br>
+ supported_capabilities: Supported capabilities will be added to this set.<br>
+ unsupported_capabilities: Unsupported capabilities will be added to this set.<br>
+ """<br>
+ self._logger.debug("Getting flow ctrl capabilities.")<br>
+ command = f"show port {self.ports[0].id} flow_ctrl"<br>
+ output = self.send_command(command)<br>
+ if "Flow control infos" in output:<br>
+ supported_capabilities.add(NicCapability.FLOW_CTRL)<br>
+ else:<br>
+ unsupported_capabilities.add(NicCapability.FLOW_CTRL)<br>
+<br>
+ def get_capabilities_physical_function(<br>
+ self,<br>
+ supported_capabilities: MutableSet["NicCapability"],<br>
+ unsupported_capabilities: MutableSet["NicCapability"],<br>
+ ) -> None:<br>
+ """Store capability representing a physical function test run.<br>
+<br>
+ Args:<br>
+ supported_capabilities: Supported capabilities will be added to this set.<br>
+ unsupported_capabilities: Unsupported capabilities will be added to this set.<br>
+ """<br>
+ ctx = get_ctx()<br>
+ if ctx.topology.vf_ports == []:<br>
+ supported_capabilities.add(NicCapability.PHYSICAL_FUNCTION)<br>
+ else:<br>
+ unsupported_capabilities.add(NicCapability.PHYSICAL_FUNCTION)<br>
diff --git a/dts/framework/params/testpmd.py b/dts/api/testpmd/config.py<br>
similarity index 98%<br>
rename from dts/framework/params/testpmd.py<br>
rename to dts/api/testpmd/config.py<br>
index 1913bd0fa2..e71a3e1ef0 100644<br>
--- a/dts/framework/params/testpmd.py<br>
+++ b/dts/api/testpmd/config.py<br>
@@ -1,7 +1,12 @@<br>
# SPDX-License-Identifier: BSD-3-Clause<br>
# Copyright(c) 2024 Arm Limited<br>
<br>
-"""Module containing all the TestPmd-related parameter classes."""<br>
+"""Module containing all classes needed to configure :class:`TestPmd`.<br>
+<br>
+This module defines the :class:`TestPmdParams` class which is used to configure the<br>
+TestPmd shell. It also includes various data classes and enums that are used<br>
+to represent different configurations and settings.<br>
+"""<br>
<br>
from dataclasses import dataclass, field<br>
from enum import EnumMeta, Flag, auto, unique<br>
@@ -146,7 +151,7 @@ class RSSSetting(EnumMeta):<br>
<br>
<br>
class SimpleForwardingModes(StrEnum):<br>
- r"""The supported packet forwarding modes for :class:`~TestPmdShell`\s."""<br>
+ r"""The supported packet forwarding modes for :class:`~TestPmd`\s."""<br>
<br>
#:<br>
io = auto()<br>
diff --git a/dts/api/testpmd/types.py b/dts/api/testpmd/types.py<br>
new file mode 100644<br>
index 0000000000..553438c904<br>
--- /dev/null<br>
+++ b/dts/api/testpmd/types.py<br>
@@ -0,0 +1,1406 @@<br>
+# SPDX-License-Identifier: BSD-3-Clause<br>
+# Copyright(c) 2023 University of New Hampshire<br>
+# Copyright(c) 2023 PANTHEON.tech s.r.o.<br>
+# Copyright(c) 2024 Arm Limited<br>
+<br>
+"""TestPmd types module.<br>
+<br>
+Exposes types used in the TestPmd API.<br>
+"""<br>
+<br>
+import re<br>
+from dataclasses import dataclass, field<br>
+from enum import Flag, auto<br>
+from typing import Literal<br>
+<br>
+from typing_extensions import Self<br>
+<br>
+from framework.parser import ParserFn, TextParser<br>
+from framework.utils import REGEX_FOR_MAC_ADDRESS, StrEnum<br>
+<br>
+<br>
+class TestPmdDevice:<br>
+ """The data of a device that testpmd can recognize.<br>
+<br>
+ Attributes:<br>
+ pci_address: The PCI address of the device.<br>
+ """<br>
+<br>
+ pci_address: str<br>
+<br>
+ def __init__(self, pci_address_line: str):<br>
+ """Initialize the device from the testpmd output line string.<br>
+<br>
+ Args:<br>
+ pci_address_line: A line of testpmd output that contains a device.<br>
+ """<br>
+ self.pci_address = pci_address_line.strip().split(": ")[1].strip()<br>
+<br>
+ def __str__(self) -> str:<br>
+ """The PCI address captures what the device is."""<br>
+ return self.pci_address<br>
+<br>
+<br>
+class VLANOffloadFlag(Flag):<br>
+ """Flag representing the VLAN offload settings of a NIC port."""<br>
+<br>
+ #:<br>
+ STRIP = auto()<br>
+ #:<br>
+ FILTER = auto()<br>
+ #:<br>
+ EXTEND = auto()<br>
+ #:<br>
+ QINQ_STRIP = auto()<br>
+<br>
+ @classmethod<br>
+ def from_str_dict(cls, d):<br>
+ """Makes an instance from a dict containing the flag member names with an "on" value.<br>
+<br>
+ Args:<br>
+ d: A dictionary containing the flag members as keys and any string value.<br>
+<br>
+ Returns:<br>
+ A new instance of the flag.<br>
+ """<br>
+ flag = cls(0)<br>
+ for name in cls.__members__:<br>
+ if d.get(name) == "on":<br>
+ flag |= cls[name]<br>
+ return flag<br>
+<br>
+ @classmethod<br>
+ def make_parser(cls) -> ParserFn:<br>
+ """Makes a parser function.<br>
+<br>
+ Returns:<br>
+ ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a<br>
+ parser function that makes an instance of this flag from text.<br>
+ """<br>
+ return TextParser.wrap(<br>
+ TextParser.find(<br>
+ r"VLAN offload:\s+"<br>
+ r"strip (?P<STRIP>on|off), "<br>
+ r"filter (?P<FILTER>on|off), "<br>
+ r"extend (?P<EXTEND>on|off), "<br>
+ r"qinq strip (?P<QINQ_STRIP>on|off)",<br>
+ re.MULTILINE,<br>
+ named=True,<br>
+ ),<br>
+ cls.from_str_dict,<br>
+ )<br>
+<br>
+<br>
+class ChecksumOffloadOptions(Flag):<br>
+ """Flag representing checksum hardware offload layer options."""<br>
+<br>
+ #:<br>
+ ip = auto()<br>
+ #:<br>
+ udp = auto()<br>
+ #:<br>
+ tcp = auto()<br>
+ #:<br>
+ sctp = auto()<br>
+ #:<br>
+ outer_ip = auto()<br>
+ #:<br>
+ outer_udp = auto()<br>
+<br>
+<br>
+class RSSOffloadTypesFlag(Flag):<br>
+ """Flag representing the RSS offload flow types supported by the NIC port."""<br>
+<br>
+ #:<br>
+ ipv4 = auto()<br>
+ #:<br>
+ ipv4_frag = auto()<br>
+ #:<br>
+ ipv4_tcp = auto()<br>
+ #:<br>
+ ipv4_udp = auto()<br>
+ #:<br>
+ ipv4_sctp = auto()<br>
+ #:<br>
+ ipv4_other = auto()<br>
+ #:<br>
+ ipv6 = auto()<br>
+ #:<br>
+ ipv6_frag = auto()<br>
+ #:<br>
+ ipv6_tcp = auto()<br>
+ #:<br>
+ ipv6_udp = auto()<br>
+ #:<br>
+ ipv6_sctp = auto()<br>
+ #:<br>
+ ipv6_other = auto()<br>
+ #:<br>
+ l2_payload = auto()<br>
+ #:<br>
+ ipv6_ex = auto()<br>
+ #:<br>
+ ipv6_tcp_ex = auto()<br>
+ #:<br>
+ ipv6_udp_ex = auto()<br>
+ #:<br>
+ port = auto()<br>
+ #:<br>
+ vxlan = auto()<br>
+ #:<br>
+ geneve = auto()<br>
+ #:<br>
+ nvgre = auto()<br>
+ #:<br>
+ user_defined_22 = auto()<br>
+ #:<br>
+ gtpu = auto()<br>
+ #:<br>
+ eth = auto()<br>
+ #:<br>
+ s_vlan = auto()<br>
+ #:<br>
+ c_vlan = auto()<br>
+ #:<br>
+ esp = auto()<br>
+ #:<br>
+ ah = auto()<br>
+ #:<br>
+ l2tpv3 = auto()<br>
+ #:<br>
+ pfcp = auto()<br>
+ #:<br>
+ pppoe = auto()<br>
+ #:<br>
+ ecpri = auto()<br>
+ #:<br>
+ mpls = auto()<br>
+ #:<br>
+ ipv4_chksum = auto()<br>
+ #:<br>
+ l4_chksum = auto()<br>
+ #:<br>
+ l2tpv2 = auto()<br>
+ #:<br>
+ ipv6_flow_label = auto()<br>
+ #:<br>
+ user_defined_38 = auto()<br>
+ #:<br>
+ user_defined_39 = auto()<br>
+ #:<br>
+ user_defined_40 = auto()<br>
+ #:<br>
+ user_defined_41 = auto()<br>
+ #:<br>
+ user_defined_42 = auto()<br>
+ #:<br>
+ user_defined_43 = auto()<br>
+ #:<br>
+ user_defined_44 = auto()<br>
+ #:<br>
+ user_defined_45 = auto()<br>
+ #:<br>
+ user_defined_46 = auto()<br>
+ #:<br>
+ user_defined_47 = auto()<br>
+ #:<br>
+ user_defined_48 = auto()<br>
+ #:<br>
+ user_defined_49 = auto()<br>
+ #:<br>
+ user_defined_50 = auto()<br>
+ #:<br>
+ user_defined_51 = auto()<br>
+ #:<br>
+ l3_pre96 = auto()<br>
+ #:<br>
+ l3_pre64 = auto()<br>
+ #:<br>
+ l3_pre56 = auto()<br>
+ #:<br>
+ l3_pre48 = auto()<br>
+ #:<br>
+ l3_pre40 = auto()<br>
+ #:<br>
+ l3_pre32 = auto()<br>
+ #:<br>
+ l2_dst_only = auto()<br>
+ #:<br>
+ l2_src_only = auto()<br>
+ #:<br>
+ l4_dst_only = auto()<br>
+ #:<br>
+ l4_src_only = auto()<br>
+ #:<br>
+ l3_dst_only = auto()<br>
+ #:<br>
+ l3_src_only = auto()<br>
+<br>
+ #:<br>
+ ip = ipv4 | ipv4_frag | ipv4_other | ipv6 | ipv6_frag | ipv6_other | ipv6_ex<br>
+ #:<br>
+ udp = ipv4_udp | ipv6_udp | ipv6_udp_ex<br>
+ #:<br>
+ tcp = ipv4_tcp | ipv6_tcp | ipv6_tcp_ex<br>
+ #:<br>
+ sctp = ipv4_sctp | ipv6_sctp<br>
+ #:<br>
+ tunnel = vxlan | geneve | nvgre<br>
+ #:<br>
+ vlan = s_vlan | c_vlan<br>
+ #:<br>
+ all = (<br>
+ eth<br>
+ | vlan<br>
+ | ip<br>
+ | tcp<br>
+ | udp<br>
+ | sctp<br>
+ | l2_payload<br>
+ | l2tpv3<br>
+ | esp<br>
+ | ah<br>
+ | pfcp<br>
+ | gtpu<br>
+ | ecpri<br>
+ | mpls<br>
+ | l2tpv2<br>
+ )<br>
+<br>
+ @classmethod<br>
+ def from_list_string(cls, names: str) -> Self:<br>
+ """Makes a flag from a whitespace-separated list of names.<br>
+<br>
+ Args:<br>
+ names: a whitespace-separated list containing the members of this flag.<br>
+<br>
+ Returns:<br>
+ An instance of this flag.<br>
+ """<br>
+ flag = cls(0)<br>
+ for name in names.split():<br>
+ flag |= cls.from_str(name)<br>
+ return flag<br>
+<br>
+ @classmethod<br>
+ def from_str(cls, name: str) -> Self:<br>
+ """Makes a flag matching the supplied name.<br>
+<br>
+ Args:<br>
+ name: A valid member of this flag in text.<br>
+<br>
+ Returns:<br>
+ An instance of this flag.<br>
+ """<br>
+ member_name = name.strip().replace("-", "_")<br>
+ return cls[member_name]<br>
+<br>
+ @classmethod<br>
+ def make_parser(cls) -> ParserFn:<br>
+ """Makes a parser function.<br>
+<br>
+ Returns:<br>
+ ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a<br>
+ parser function that makes an instance of this flag from text.<br>
+ """<br>
+ return TextParser.wrap(<br>
+ TextParser.find(r"Supported RSS offload flow types:((?:\r?\n? \S+)+)", re.MULTILINE),<br>
+ RSSOffloadTypesFlag.from_list_string,<br>
+ )<br>
+<br>
+<br>
+class DeviceCapabilitiesFlag(Flag):<br>
+ """Flag representing the device capabilities."""<br>
+<br>
+ #: Device supports Rx queue setup after device started.<br>
+ RUNTIME_RX_QUEUE_SETUP = auto()<br>
+ #: Device supports Tx queue setup after device started.<br>
+ RUNTIME_TX_QUEUE_SETUP = auto()<br>
+ #: Device supports shared Rx queue among ports within Rx domain and switch domain.<br>
+ RXQ_SHARE = auto()<br>
+ #: Device supports keeping flow rules across restart.<br>
+ FLOW_RULE_KEEP = auto()<br>
+ #: Device supports keeping shared flow objects across restart.<br>
+ FLOW_SHARED_OBJECT_KEEP = auto()<br>
+<br>
+ @classmethod<br>
+ def make_parser(cls) -> ParserFn:<br>
+ """Makes a parser function.<br>
+<br>
+ Returns:<br>
+ ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a<br>
+ parser function that makes an instance of this flag from text.<br>
+ """<br>
+ return TextParser.wrap(<br>
+ TextParser.find_int(r"Device capabilities: (0x[A-Fa-f\d]+)"),<br>
+ cls,<br>
+ )<br>
+<br>
+<br>
+class DeviceErrorHandlingMode(StrEnum):<br>
+ """Enum representing the device error handling mode."""<br>
+<br>
+ #:<br>
+ none = auto()<br>
+ #:<br>
+ passive = auto()<br>
+ #:<br>
+ proactive = auto()<br>
+ #:<br>
+ unknown = auto()<br>
+<br>
+ @classmethod<br>
+ def make_parser(cls) -> ParserFn:<br>
+ """Makes a parser function.<br>
+<br>
+ Returns:<br>
+ ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a<br>
+ parser function that makes an instance of this enum from text.<br>
+ """<br>
+ return TextParser.wrap(TextParser.find(r"Device error handling mode: (\w+)"), cls)<br>
+<br>
+<br>
+class RxQueueState(StrEnum):<br>
+ """RX queue states.<br>
+<br>
+ References:<br>
+ DPDK lib: ``lib/ethdev/rte_ethdev.h``<br>
+ testpmd display function: ``app/test-pmd/config.c:get_queue_state_name()``<br>
+ """<br>
+<br>
+ #:<br>
+ stopped = auto()<br>
+ #:<br>
+ started = auto()<br>
+ #:<br>
+ hairpin = auto()<br>
+ #:<br>
+ unknown = auto()<br>
+<br>
+ @classmethod<br>
+ def make_parser(cls) -> ParserFn:<br>
+ """Makes a parser function.<br>
+<br>
+ Returns:<br>
+ ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a<br>
+ parser function that makes an instance of this enum from text.<br>
+ """<br>
+ return TextParser.wrap(TextParser.find(r"Rx queue state: ([^\r\n]+)"), cls)<br>
+<br>
+<br>
+@dataclass<br>
+class TestPmdQueueInfo(TextParser):<br>
+ """Dataclass representation of the common parts of the testpmd `show rxq/txq info` commands."""<br>
+<br>
+ #:<br>
+ prefetch_threshold: int = field(metadata=TextParser.find_int(r"prefetch threshold: (\d+)"))<br>
+ #:<br>
+ host_threshold: int = field(metadata=TextParser.find_int(r"host threshold: (\d+)"))<br>
+ #:<br>
+ writeback_threshold: int = field(metadata=TextParser.find_int(r"writeback threshold: (\d+)"))<br>
+ #:<br>
+ free_threshold: int = field(metadata=TextParser.find_int(r"free threshold: (\d+)"))<br>
+ #:<br>
+ deferred_start: bool = field(metadata=TextParser.find("deferred start: on"))<br>
+ #: The number of RXD/TXDs is just the ring size of the queue.<br>
+ ring_size: int = field(metadata=TextParser.find_int(r"Number of (?:RXDs|TXDs): (\d+)"))<br>
+ #:<br>
+ is_queue_started: bool = field(metadata=TextParser.find("queue state: started"))<br>
+ #:<br>
+ burst_mode: str | None = field(<br>
+ default=None, metadata=TextParser.find(r"Burst mode: ([^\r\n]+)")<br>
+ )<br>
+<br>
+<br>
+@dataclass<br>
+class TestPmdTxqInfo(TestPmdQueueInfo):<br>
+ """Representation of testpmd's ``show txq info <port_id> <queue_id>`` command.<br>
+<br>
+ References:<br>
+ testpmd command function: ``app/test-pmd/cmdline.c:cmd_showqueue()``<br>
+ testpmd display function: ``app/test-pmd/config.c:tx_queue_infos_display()``<br>
+ """<br>
+<br>
+ #: TX RS (Report Status) bit threshold<br>
+ rs_threshold: int | None = field(<br>
+ default=None, metadata=TextParser.find_int(r"TX RS threshold: (\d+)\b")<br>
+ )<br>
+<br>
+<br>
+@dataclass<br>
+class TestPmdRxqInfo(TestPmdQueueInfo):<br>
+ """Representation of testpmd's ``show rxq info <port_id> <queue_id>`` command.<br>
+<br>
+ References:<br>
+ testpmd command function: ``app/test-pmd/cmdline.c:cmd_showqueue()``<br>
+ testpmd display function: ``app/test-pmd/config.c:rx_queue_infos_display()``<br>
+ """<br>
+<br>
+ #: Mempool used by that queue<br>
+ mempool: str | None = field(default=None, metadata=TextParser.find(r"Mempool: ([^\r\n]+)"))<br>
+ #: Drop packets if no descriptors are available<br>
+ drop_packets: bool | None = field(<br>
+ default=None, metadata=TextParser.find(r"RX drop packets: on")<br>
+ )<br>
+ #: Scattered packets Rx enabled<br>
+ scattered_packets: bool | None = field(<br>
+ default=None, metadata=TextParser.find(r"RX scattered packets: on")<br>
+ )<br>
+ #: The state of the queue<br>
+ queue_state: str | None = field(default=None, metadata=RxQueueState.make_parser())<br>
+<br>
+<br>
+@dataclass<br>
+class TestPmdPort(TextParser):<br>
+ """Dataclass representing the result of testpmd's ``show port info`` command."""<br>
+<br>
+ @staticmethod<br>
+ def _make_device_private_info_parser() -> ParserFn:<br>
+ """Device private information parser.<br>
+<br>
+ Ensures that we are not parsing invalid device private info output.<br>
+<br>
+ Returns:<br>
+ ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a parser<br>
+ function that parses the device private info from the TestPmd port info output.<br>
+ """<br>
+<br>
+ def _validate(info: str):<br>
+ info = info.strip()<br>
+ if (<br>
+ info == "none"<br>
+ or info.startswith("Invalid file")<br>
+ or info.startswith("Failed to dump")<br>
+ ):<br>
+ return None<br>
+ return info<br>
+<br>
+ return TextParser.wrap(TextParser.find(r"Device private info:\s+([\s\S]+)"), _validate)<br>
+<br>
+ #:<br>
+ id: int = field(metadata=TextParser.find_int(r"Infos for port (\d+)\b"))<br>
+ #:<br>
+ device_name: str = field(metadata=TextParser.find(r"Device name: ([^\r\n]+)"))<br>
+ #:<br>
+ driver_name: str = field(metadata=TextParser.find(r"Driver name: ([^\r\n]+)"))<br>
+ #:<br>
+ socket_id: int = field(metadata=TextParser.find_int(r"Connect to socket: (\d+)"))<br>
+ #:<br>
+ is_link_up: bool = field(metadata=TextParser.find("Link status: up"))<br>
+ #:<br>
+ link_speed: str = field(metadata=TextParser.find(r"Link speed: ([^\r\n]+)"))<br>
+ #:<br>
+ is_link_full_duplex: bool = field(metadata=TextParser.find("Link duplex: full-duplex"))<br>
+ #:<br>
+ is_link_autonegotiated: bool = field(metadata=TextParser.find("Autoneg status: On"))<br>
+ #:<br>
+ is_promiscuous_mode_enabled: bool = field(metadata=TextParser.find("Promiscuous mode: enabled"))<br>
+ #:<br>
+ is_allmulticast_mode_enabled: bool = field(<br>
+ metadata=TextParser.find("Allmulticast mode: enabled")<br>
+ )<br>
+ #: Maximum number of MAC addresses<br>
+ max_mac_addresses_num: int = field(<br>
+ metadata=TextParser.find_int(r"Maximum number of MAC addresses: (\d+)")<br>
+ )<br>
+ #: Maximum number of MAC addresses of hash filtering<br>
+ max_hash_mac_addresses_num: int = field(<br>
+ metadata=TextParser.find_int(r"Maximum number of MAC addresses of hash filtering: (\d+)")<br>
+ )<br>
+ #: Minimum size of RX buffer<br>
+ min_rx_bufsize: int = field(metadata=TextParser.find_int(r"Minimum size of RX buffer: (\d+)"))<br>
+ #: Maximum configurable length of RX packet<br>
+ max_rx_packet_length: int = field(<br>
+ metadata=TextParser.find_int(r"Maximum configurable length of RX packet: (\d+)")<br>
+ )<br>
+ #: Maximum configurable size of LRO aggregated packet<br>
+ max_lro_packet_size: int = field(<br>
+ metadata=TextParser.find_int(r"Maximum configurable size of LRO aggregated packet: (\d+)")<br>
+ )<br>
+<br>
+ #: Current number of RX queues<br>
+ rx_queues_num: int = field(metadata=TextParser.find_int(r"Current number of RX queues: (\d+)"))<br>
+ #: Max possible RX queues<br>
+ max_rx_queues_num: int = field(metadata=TextParser.find_int(r"Max possible RX queues: (\d+)"))<br>
+ #: Max possible number of RXDs per queue<br>
+ max_queue_rxd_num: int = field(<br>
+ metadata=TextParser.find_int(r"Max possible number of RXDs per queue: (\d+)")<br>
+ )<br>
+ #: Min possible number of RXDs per queue<br>
+ min_queue_rxd_num: int = field(<br>
+ metadata=TextParser.find_int(r"Min possible number of RXDs per queue: (\d+)")<br>
+ )<br>
+ #: RXDs number alignment<br>
+ rxd_alignment_num: int = field(metadata=TextParser.find_int(r"RXDs number alignment: (\d+)"))<br>
+<br>
+ #: Current number of TX queues<br>
+ tx_queues_num: int = field(metadata=TextParser.find_int(r"Current number of TX queues: (\d+)"))<br>
+ #: Max possible TX queues<br>
+ max_tx_queues_num: int = field(metadata=TextParser.find_int(r"Max possible TX queues: (\d+)"))<br>
+ #: Max possible number of TXDs per queue<br>
+ max_queue_txd_num: int = field(<br>
+ metadata=TextParser.find_int(r"Max possible number of TXDs per queue: (\d+)")<br>
+ )<br>
+ #: Min possible number of TXDs per queue<br>
+ min_queue_txd_num: int = field(<br>
+ metadata=TextParser.find_int(r"Min possible number of TXDs per queue: (\d+)")<br>
+ )<br>
+ #: TXDs number alignment<br>
+ txd_alignment_num: int = field(metadata=TextParser.find_int(r"TXDs number alignment: (\d+)"))<br>
+ #: Max segment number per packet<br>
+ max_packet_segment_num: int = field(<br>
+ metadata=TextParser.find_int(r"Max segment number per packet: (\d+)")<br>
+ )<br>
+ #: Max segment number per MTU/TSO<br>
+ max_mtu_segment_num: int = field(<br>
+ metadata=TextParser.find_int(r"Max segment number per MTU\/TSO: (\d+)")<br>
+ )<br>
+<br>
+ #:<br>
+ device_capabilities: DeviceCapabilitiesFlag = field(<br>
+ metadata=DeviceCapabilitiesFlag.make_parser(),<br>
+ )<br>
+ #:<br>
+ device_error_handling_mode: DeviceErrorHandlingMode | None = field(<br>
+ default=None, metadata=DeviceErrorHandlingMode.make_parser()<br>
+ )<br>
+ #:<br>
+ device_private_info: str | None = field(<br>
+ default=None,<br>
+ metadata=_make_device_private_info_parser(),<br>
+ )<br>
+<br>
+ #:<br>
+ hash_key_size: int | None = field(<br>
+ default=None, metadata=TextParser.find_int(r"Hash key size in bytes: (\d+)")<br>
+ )<br>
+ #:<br>
+ redirection_table_size: int | None = field(<br>
+ default=None, metadata=TextParser.find_int(r"Redirection table size: (\d+)")<br>
+ )<br>
+ #:<br>
+ supported_rss_offload_flow_types: RSSOffloadTypesFlag = field(<br>
+ default=RSSOffloadTypesFlag(0), metadata=RSSOffloadTypesFlag.make_parser()<br>
+ )<br>
+<br>
+ #:<br>
+ mac_address: str | None = field(<br>
+ default=None, metadata=TextParser.find(r"MAC address: ([A-Fa-f0-9:]+)")<br>
+ )<br>
+ #:<br>
+ fw_version: str | None = field(<br>
+ default=None, metadata=TextParser.find(r"Firmware-version: ([^\r\n]+)")<br>
+ )<br>
+ #:<br>
+ dev_args: str | None = field(default=None, metadata=TextParser.find(r"Devargs: ([^\r\n]+)"))<br>
+ #: Socket id of the memory allocation<br>
+ mem_alloc_socket_id: int | None = field(<br>
+ default=None,<br>
+ metadata=TextParser.find_int(r"memory allocation on the socket: (\d+)"),<br>
+ )<br>
+ #:<br>
+ mtu: int | None = field(default=None, metadata=TextParser.find_int(r"MTU: (\d+)"))<br>
+<br>
+ #:<br>
+ vlan_offload: VLANOffloadFlag | None = field(<br>
+ default=None,<br>
+ metadata=VLANOffloadFlag.make_parser(),<br>
+ )<br>
+<br>
+ #: Maximum size of RX buffer<br>
+ max_rx_bufsize: int | None = field(<br>
+ default=None, metadata=TextParser.find_int(r"Maximum size of RX buffer: (\d+)")<br>
+ )<br>
+ #: Maximum number of VFs<br>
+ max_vfs_num: int | None = field(<br>
+ default=None, metadata=TextParser.find_int(r"Maximum number of VFs: (\d+)")<br>
+ )<br>
+ #: Maximum number of VMDq pools<br>
+ max_vmdq_pools_num: int | None = field(<br>
+ default=None, metadata=TextParser.find_int(r"Maximum number of VMDq pools: (\d+)")<br>
+ )<br>
+<br>
+ #:<br>
+ switch_name: str | None = field(<br>
+ default=None, metadata=TextParser.find(r"Switch name: ([\r\n]+)")<br>
+ )<br>
+ #:<br>
+ switch_domain_id: int | None = field(<br>
+ default=None, metadata=TextParser.find_int(r"Switch domain Id: (\d+)")<br>
+ )<br>
+ #:<br>
+ switch_port_id: int | None = field(<br>
+ default=None, metadata=TextParser.find_int(r"Switch Port Id: (\d+)")<br>
+ )<br>
+ #:<br>
+ switch_rx_domain: int | None = field(<br>
+ default=None, metadata=TextParser.find_int(r"Switch Rx domain: (\d+)")<br>
+ )<br>
+<br>
+<br>
+@dataclass<br>
+class TestPmdPortStats(TextParser):<br>
+ """Port statistics."""<br>
+<br>
+ #:<br>
+ port_id: int = field(metadata=TextParser.find_int(r"NIC statistics for port (\d+)"))<br>
+<br>
+ #:<br>
+ rx_packets: int = field(metadata=TextParser.find_int(r"RX-packets:\s+(\d+)"))<br>
+ #:<br>
+ rx_missed: int = field(metadata=TextParser.find_int(r"RX-missed:\s+(\d+)"))<br>
+ #:<br>
+ rx_bytes: int = field(metadata=TextParser.find_int(r"RX-bytes:\s+(\d+)"))<br>
+ #:<br>
+ rx_errors: int = field(metadata=TextParser.find_int(r"RX-errors:\s+(\d+)"))<br>
+ #:<br>
+ rx_nombuf: int = field(metadata=TextParser.find_int(r"RX-nombuf:\s+(\d+)"))<br>
+<br>
+ #:<br>
+ tx_packets: int = field(metadata=TextParser.find_int(r"TX-packets:\s+(\d+)"))<br>
+ #:<br>
+ tx_errors: int = field(metadata=TextParser.find_int(r"TX-errors:\s+(\d+)"))<br>
+ #:<br>
+ tx_bytes: int = field(metadata=TextParser.find_int(r"TX-bytes:\s+(\d+)"))<br>
+<br>
+ #:<br>
+ rx_pps: int = field(metadata=TextParser.find_int(r"Rx-pps:\s+(\d+)"))<br>
+ #:<br>
+ rx_bps: int = field(metadata=TextParser.find_int(r"Rx-bps:\s+(\d+)"))<br>
+<br>
+ #:<br>
+ tx_pps: int = field(metadata=TextParser.find_int(r"Tx-pps:\s+(\d+)"))<br>
+ #:<br>
+ tx_bps: int = field(metadata=TextParser.find_int(r"Tx-bps:\s+(\d+)"))<br>
+<br>
+<br>
+@dataclass(kw_only=True)<br>
+class FlowRule:<br>
+ """Class representation of flow rule parameters.<br>
+<br>
+ This class represents the parameters of any flow rule as per the<br>
+ following pattern:<br>
+<br>
+ [group {group_id}] [priority {level}] [ingress] [egress]<br>
+ [user_id {user_id}] pattern {item} [/ {item} [...]] / end<br>
+ actions {action} [/ {action} [...]] / end<br>
+ """<br>
+<br>
+ #:<br>
+ group_id: int | None = None<br>
+ #:<br>
+ priority_level: int | None = None<br>
+ #:<br>
+ direction: Literal["ingress", "egress"]<br>
+ #:<br>
+ user_id: int | None = None<br>
+ #:<br>
+ pattern: list[str]<br>
+ #:<br>
+ actions: list[str]<br>
+<br>
+ def __str__(self) -> str:<br>
+ """Returns the string representation of this instance."""<br>
+ ret = ""<br>
+ pattern = " / ".join(self.pattern)<br>
+ action = " / ".join(self.actions)<br>
+ if self.group_id is not None:<br>
+ ret += f"group {self.group_id} "<br>
+ if self.priority_level is not None:<br>
+ ret += f"priority {self.priority_level} "<br>
+ ret += f"{self.direction} "<br>
+ if self.user_id is not None:<br>
+ ret += f"user_id {self.user_id} "<br>
+ ret += f"pattern {pattern} / end "<br>
+ ret += f"actions {action} / end"<br>
+ return ret<br>
+<br>
+<br>
+class PacketOffloadFlag(Flag):<br>
+ """Flag representing the Packet Offload Features Flags in DPDK.<br>
+<br>
+ Values in this class are taken from the definitions in the RTE MBUF core library in DPDK<br>
+ located in ``lib/mbuf/rte_mbuf_core.h``. It is expected that flag values in this class will<br>
+ match the values they are set to in said DPDK library with one exception; all values must be<br>
+ unique. For example, the definitions for unknown checksum flags in ``rte_mbuf_core.h`` are all<br>
+ set to :data:`0`, but it is valuable to distinguish between them in this framework. For this<br>
+ reason flags that are not unique in the DPDK library are set either to values within the<br>
+ RTE_MBUF_F_FIRST_FREE-RTE_MBUF_F_LAST_FREE range for Rx or shifted 61+ bits for Tx.<br>
+<br>
+ References:<br>
+ DPDK lib: ``lib/mbuf/rte_mbuf_core.h``<br>
+ """<br>
+<br>
+ # RX flags<br>
+<br>
+ #: The RX packet is a 802.1q VLAN packet, and the tci has been saved in mbuf->vlan_tci. If the<br>
+ #: flag RTE_MBUF_F_RX_VLAN_STRIPPED is also present, the VLAN header has been stripped from<br>
+ #: mbuf data, else it is still present.<br>
+ RTE_MBUF_F_RX_VLAN = auto()<br>
+<br>
+ #: RX packet with RSS hash result.<br>
+ RTE_MBUF_F_RX_RSS_HASH = auto()<br>
+<br>
+ #: RX packet with FDIR match indicate.<br>
+ RTE_MBUF_F_RX_FDIR = auto()<br>
+<br>
+ #: This flag is set when the outermost IP header checksum is detected as wrong by the hardware.<br>
+ RTE_MBUF_F_RX_OUTER_IP_CKSUM_BAD = 1 << 5<br>
+<br>
+ #: A vlan has been stripped by the hardware and its tci is saved in mbuf->vlan_tci. This can<br>
+ #: only happen if vlan stripping is enabled in the RX configuration of the PMD. When<br>
+ #: RTE_MBUF_F_RX_VLAN_STRIPPED is set, RTE_MBUF_F_RX_VLAN must also be set.<br>
+ RTE_MBUF_F_RX_VLAN_STRIPPED = auto()<br>
+<br>
+ #: No information about the RX IP checksum. Value is 0 in the DPDK library.<br>
+ RTE_MBUF_F_RX_IP_CKSUM_UNKNOWN = 1 << 23<br>
+ #: The IP checksum in the packet is wrong.<br>
+ RTE_MBUF_F_RX_IP_CKSUM_BAD = 1 << 4<br>
+ #: The IP checksum in the packet is valid.<br>
+ RTE_MBUF_F_RX_IP_CKSUM_GOOD = 1 << 7<br>
+ #: The IP checksum is not correct in the packet data, but the integrity of the IP header is<br>
+ #: verified. Value is RTE_MBUF_F_RX_IP_CKSUM_BAD | RTE_MBUF_F_RX_IP_CKSUM_GOOD in the DPDK<br>
+ #: library.<br>
+ RTE_MBUF_F_RX_IP_CKSUM_NONE = 1 << 24<br>
+<br>
+ #: No information about the RX L4 checksum. Value is 0 in the DPDK library.<br>
+ RTE_MBUF_F_RX_L4_CKSUM_UNKNOWN = 1 << 25<br>
+ #: The L4 checksum in the packet is wrong.<br>
+ RTE_MBUF_F_RX_L4_CKSUM_BAD = 1 << 3<br>
+ #: The L4 checksum in the packet is valid.<br>
+ RTE_MBUF_F_RX_L4_CKSUM_GOOD = 1 << 8<br>
+ #: The L4 checksum is not correct in the packet data, but the integrity of the L4 data is<br>
+ #: verified. Value is RTE_MBUF_F_RX_L4_CKSUM_BAD | RTE_MBUF_F_RX_L4_CKSUM_GOOD in the DPDK<br>
+ #: library.<br>
+ RTE_MBUF_F_RX_L4_CKSUM_NONE = 1 << 26<br>
+<br>
+ #: RX IEEE1588 L2 Ethernet PT Packet.<br>
+ RTE_MBUF_F_RX_IEEE1588_PTP = 1 << 9<br>
+ #: RX IEEE1588 L2/L4 timestamped packet.<br>
+ RTE_MBUF_F_RX_IEEE1588_TMST = 1 << 10<br>
+<br>
+ #: FD id reported if FDIR match.<br>
+ RTE_MBUF_F_RX_FDIR_ID = 1 << 13<br>
+ #: Flexible bytes reported if FDIR match.<br>
+ RTE_MBUF_F_RX_FDIR_FLX = 1 << 14<br>
+<br>
+ #: If both RTE_MBUF_F_RX_QINQ_STRIPPED and RTE_MBUF_F_RX_VLAN_STRIPPED are set, the 2 VLANs<br>
+ #: have been stripped by the hardware. If RTE_MBUF_F_RX_QINQ_STRIPPED is set and<br>
+ #: RTE_MBUF_F_RX_VLAN_STRIPPED is unset, only the outer VLAN is removed from packet data.<br>
+ RTE_MBUF_F_RX_QINQ_STRIPPED = auto()<br>
+<br>
+ #: When packets are coalesced by a hardware or virtual driver, this flag can be set in the RX<br>
+ #: mbuf, meaning that the m->tso_segsz field is valid and is set to the segment size of<br>
+ #: original packets.<br>
+ RTE_MBUF_F_RX_LRO = auto()<br>
+<br>
+ #: Indicate that security offload processing was applied on the RX packet.<br>
+ RTE_MBUF_F_RX_SEC_OFFLOAD = 1 << 18<br>
+ #: Indicate that security offload processing failed on the RX packet.<br>
+ RTE_MBUF_F_RX_SEC_OFFLOAD_FAILED = auto()<br>
+<br>
+ #: The RX packet is a double VLAN. If this flag is set, RTE_MBUF_F_RX_VLAN must also be set. If<br>
+ #: the flag RTE_MBUF_F_RX_QINQ_STRIPPED is also present, both VLANs headers have been stripped<br>
+ #: from mbuf data, else they are still present.<br>
+ RTE_MBUF_F_RX_QINQ = auto()<br>
+<br>
+ #: No info about the outer RX L4 checksum. Value is 0 in the DPDK library.<br>
+ RTE_MBUF_F_RX_OUTER_L4_CKSUM_UNKNOWN = 1 << 27<br>
+ #: The outer L4 checksum in the packet is wrong<br>
+ RTE_MBUF_F_RX_OUTER_L4_CKSUM_BAD = 1 << 21<br>
+ #: The outer L4 checksum in the packet is valid<br>
+ RTE_MBUF_F_RX_OUTER_L4_CKSUM_GOOD = 1 << 22<br>
+ #: Invalid outer L4 checksum state. Value is<br>
+ #: RTE_MBUF_F_RX_OUTER_L4_CKSUM_BAD | RTE_MBUF_F_RX_OUTER_L4_CKSUM_GOOD in the DPDK library.<br>
+ RTE_MBUF_F_RX_OUTER_L4_CKSUM_INVALID = 1 << 28<br>
+<br>
+ # TX flags<br>
+<br>
+ #: Outer UDP checksum offload flag. This flag is used for enabling outer UDP checksum in PMD.<br>
+ #: To use outer UDP checksum, the user either needs to enable the following in mbuf:<br>
+ #:<br>
+ #: a) Fill outer_l2_len and outer_l3_len in mbuf.<br>
+ #: b) Set the RTE_MBUF_F_TX_OUTER_UDP_CKSUM flag.<br>
+ #: c) Set the RTE_MBUF_F_TX_OUTER_IPV4 or RTE_MBUF_F_TX_OUTER_IPV6 flag.<br>
+ #:<br>
+ #: Or configure RTE_ETH_TX_OFFLOAD_OUTER_UDP_CKSUM offload flag.<br>
+ RTE_MBUF_F_TX_OUTER_UDP_CKSUM = 1 << 41<br>
+<br>
+ #: UDP Fragmentation Offload flag. This flag is used for enabling UDP fragmentation in SW or in<br>
+ #: HW.<br>
+ RTE_MBUF_F_TX_UDP_SEG = auto()<br>
+<br>
+ #: Request security offload processing on the TX packet. To use Tx security offload, the user<br>
+ #: needs to fill l2_len in mbuf indicating L2 header size and where L3 header starts.<br>
+ #: Similarly, l3_len should also be filled along with ol_flags reflecting current L3 type.<br>
+ RTE_MBUF_F_TX_SEC_OFFLOAD = auto()<br>
+<br>
+ #: Offload the MACsec. This flag must be set by the application to enable this offload feature<br>
+ #: for a packet to be transmitted.<br>
+ RTE_MBUF_F_TX_MACSEC = auto()<br>
+<br>
+ # Bits 45:48 are used for the tunnel type in ``lib/mbuf/rte_mbuf_core.h``, but some are modified<br>
+ # in this Flag to maintain uniqueness. The tunnel type must be specified for TSO or checksum on<br>
+ # the inner part of tunnel packets. These flags can be used with RTE_MBUF_F_TX_TCP_SEG for TSO,<br>
+ # or RTE_MBUF_F_TX_xxx_CKSUM. The mbuf fields for inner and outer header lengths are required:<br>
+ # outer_l2_len, outer_l3_len, l2_len, l3_len, l4_len and tso_segsz for TSO.<br>
+<br>
+ #:<br>
+ RTE_MBUF_F_TX_TUNNEL_VXLAN = 1 << 45<br>
+ #:<br>
+ RTE_MBUF_F_TX_TUNNEL_GRE = 1 << 46<br>
+ #: Value is 3 << 45 in the DPDK library.<br>
+ RTE_MBUF_F_TX_TUNNEL_IPIP = 1 << 61<br>
+ #:<br>
+ RTE_MBUF_F_TX_TUNNEL_GENEVE = 1 << 47<br>
+ #: TX packet with MPLS-in-UDP RFC 7510 header. Value is 5 << 45 in the DPDK library.<br>
+ RTE_MBUF_F_TX_TUNNEL_MPLSINUDP = 1 << 62<br>
+ #: Value is 6 << 45 in the DPDK library.<br>
+ RTE_MBUF_F_TX_TUNNEL_VXLAN_GPE = 1 << 63<br>
+ #: Value is 7 << 45 in the DPDK library.<br>
+ RTE_MBUF_F_TX_TUNNEL_GTP = 1 << 64<br>
+ #:<br>
+ RTE_MBUF_F_TX_TUNNEL_ESP = 1 << 48<br>
+ #: Generic IP encapsulated tunnel type, used for TSO and checksum offload. This can be used for<br>
+ #: tunnels which are not standards or listed above. It is preferred to use specific tunnel<br>
+ #: flags like RTE_MBUF_F_TX_TUNNEL_GRE or RTE_MBUF_F_TX_TUNNEL_IPIP if possible. The ethdev<br>
+ #: must be configured with RTE_ETH_TX_OFFLOAD_IP_TNL_TSO. Outer and inner checksums are done<br>
+ #: according to the existing flags like RTE_MBUF_F_TX_xxx_CKSUM. Specific tunnel headers that<br>
+ #: contain payload length, sequence id or checksum are not expected to be updated. Value is<br>
+ #: 0xD << 45 in the DPDK library.<br>
+ RTE_MBUF_F_TX_TUNNEL_IP = 1 << 65<br>
+ #: Generic UDP encapsulated tunnel type, used for TSO and checksum offload. UDP tunnel type<br>
+ #: implies outer IP layer. It can be used for tunnels which are not standards or listed above.<br>
+ #: It is preferred to use specific tunnel flags like RTE_MBUF_F_TX_TUNNEL_VXLAN if possible.<br>
+ #: The ethdev must be configured with RTE_ETH_TX_OFFLOAD_UDP_TNL_TSO. Outer and inner checksums<br>
+ #: are done according to the existing flags like RTE_MBUF_F_TX_xxx_CKSUM. Specific tunnel<br>
+ #: headers that contain payload length, sequence id or checksum are not expected to be updated.<br>
+ #: Value is 0xE << 45 in the DPDK library.<br>
+ RTE_MBUF_F_TX_TUNNEL_UDP = 1 << 66<br>
+<br>
+ #: Double VLAN insertion (QinQ) request to driver, driver may offload the insertion based on<br>
+ #: device capability. Mbuf 'vlan_tci' & 'vlan_tci_outer' must be valid when this flag is set.<br>
+ RTE_MBUF_F_TX_QINQ = 1 << 49<br>
+<br>
+ #: TCP segmentation offload. To enable this offload feature for a packet to be transmitted on<br>
+ #: hardware supporting TSO:<br>
+ #:<br>
+ #: - set the RTE_MBUF_F_TX_TCP_SEG flag in mbuf->ol_flags (this flag implies<br>
+ #: RTE_MBUF_F_TX_TCP_CKSUM)<br>
+ #: - set the flag RTE_MBUF_F_TX_IPV4 or RTE_MBUF_F_TX_IPV6<br>
+ #: * if it's IPv4, set the RTE_MBUF_F_TX_IP_CKSUM flag<br>
+ #: - fill the mbuf offload information: l2_len, l3_len, l4_len, tso_segsz<br>
+ RTE_MBUF_F_TX_TCP_SEG = auto()<br>
+<br>
+ #: TX IEEE1588 packet to timestamp.<br>
+ RTE_MBUF_F_TX_IEEE1588_TMST = auto()<br>
+<br>
+ # Bits 52+53 used for L4 packet type with checksum enabled in ``lib/mbuf/rte_mbuf_core.h`` but<br>
+ # some values must be modified in this framework to maintain uniqueness. To use hardware<br>
+ # L4 checksum offload, the user needs to:<br>
+ #<br>
+ # - fill l2_len and l3_len in mbuf<br>
+ # - set the flags RTE_MBUF_F_TX_TCP_CKSUM, RTE_MBUF_F_TX_SCTP_CKSUM or<br>
+ # RTE_MBUF_F_TX_UDP_CKSUM<br>
+ # - set the flag RTE_MBUF_F_TX_IPV4 or RTE_MBUF_F_TX_IPV6<br>
+<br>
+ #: Disable L4 cksum of TX pkt. Value is 0 in the DPDK library.<br>
+ RTE_MBUF_F_TX_L4_NO_CKSUM = 1 << 67<br>
+ #: TCP cksum of TX pkt. Computed by NIC.<br>
+ RTE_MBUF_F_TX_TCP_CKSUM = 1 << 52<br>
+ #: SCTP cksum of TX pkt. Computed by NIC.<br>
+ RTE_MBUF_F_TX_SCTP_CKSUM = 1 << 53<br>
+ #: UDP cksum of TX pkt. Computed by NIC. Value is 3 << 52 in the DPDK library.<br>
+ RTE_MBUF_F_TX_UDP_CKSUM = 1 << 68<br>
+<br>
+ #: Offload the IP checksum in the hardware. The flag RTE_MBUF_F_TX_IPV4 should also be set by<br>
+ #: the application, although a PMD will only check RTE_MBUF_F_TX_IP_CKSUM.<br>
+ RTE_MBUF_F_TX_IP_CKSUM = 1 << 54<br>
+<br>
+ #: Packet is IPv4. This flag must be set when using any offload feature (TSO, L3 or L4<br>
+ #: checksum) to tell the NIC that the packet is an IPv4 packet. If the packet is a tunneled<br>
+ #: packet, this flag is related to the inner headers.<br>
+ RTE_MBUF_F_TX_IPV4 = auto()<br>
+ #: Packet is IPv6. This flag must be set when using an offload feature (TSO or L4 checksum) to<br>
+ #: tell the NIC that the packet is an IPv6 packet. If the packet is a tunneled packet, this<br>
+ #: flag is related to the inner headers.<br>
+ RTE_MBUF_F_TX_IPV6 = auto()<br>
+ #: VLAN tag insertion request to driver, driver may offload the insertion based on the device<br>
+ #: capability. mbuf 'vlan_tci' field must be valid when this flag is set.<br>
+ RTE_MBUF_F_TX_VLAN = auto()<br>
+<br>
+ #: Offload the IP checksum of an external header in the hardware. The flag<br>
+ #: RTE_MBUF_F_TX_OUTER_IPV4 should also be set by the application, although a PMD will only<br>
+ #: check RTE_MBUF_F_TX_OUTER_IP_CKSUM.<br>
+ RTE_MBUF_F_TX_OUTER_IP_CKSUM = auto()<br>
+ #: Packet outer header is IPv4. This flag must be set when using any outer offload feature (L3<br>
+ #: or L4 checksum) to tell the NIC that the outer header of the tunneled packet is an IPv4<br>
+ #: packet.<br>
+ RTE_MBUF_F_TX_OUTER_IPV4 = auto()<br>
+ #: Packet outer header is IPv6. This flag must be set when using any outer offload feature (L4<br>
+ #: checksum) to tell the NIC that the outer header of the tunneled packet is an IPv6 packet.<br>
+ RTE_MBUF_F_TX_OUTER_IPV6 = auto()<br>
+<br>
+ @classmethod<br>
+ def from_list_string(cls, names: str) -> Self:<br>
+ """Makes a flag from a whitespace-separated list of names.<br>
+<br>
+ Args:<br>
+ names: a whitespace-separated list containing the members of this flag.<br>
+<br>
+ Returns:<br>
+ An instance of this flag.<br>
+ """<br>
+ flag = cls(0)<br>
+ for name in names.split():<br>
+ flag |= cls.from_str(name)<br>
+ return flag<br>
+<br>
+ @classmethod<br>
+ def from_str(cls, name: str) -> Self:<br>
+ """Makes a flag matching the supplied name.<br>
+<br>
+ Args:<br>
+ name: a valid member of this flag in text<br>
+ Returns:<br>
+ An instance of this flag.<br>
+ """<br>
+ member_name = name.strip().replace("-", "_")<br>
+ return cls[member_name]<br>
+<br>
+ @classmethod<br>
+ def make_parser(cls) -> ParserFn:<br>
+ """Makes a parser function.<br>
+<br>
+ Returns:<br>
+ ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a<br>
+ parser function that makes an instance of this flag from text.<br>
+ """<br>
+ return TextParser.wrap(<br>
+ TextParser.find(r"ol_flags: ([^\n]+)"),<br>
+ cls.from_list_string,<br>
+ )<br>
+<br>
+<br>
+class RtePTypes(Flag):<br>
+ """Flag representing possible packet types in DPDK verbose output.<br>
+<br>
+ Values in this class are derived from definitions in the RTE MBUF ptype library in DPDK located<br>
+ in ``lib/mbuf/rte_mbuf_ptype.h``. Specifically, the names of values in this class should match<br>
+ the possible return options from the functions ``rte_get_ptype_*_name`` in ``rte_mbuf_ptype.c``.<br>
+<br>
+ References:<br>
+ DPDK lib: ``lib/mbuf/rte_mbuf_ptype.h``<br>
+ DPDK ptype name formatting functions: ``lib/mbuf/rte_mbuf_ptype.c:rte_get_ptype_*_name()``<br>
+ """<br>
+<br>
+ # L2<br>
+ #: Ethernet packet type. This is used for outer packet for tunneling cases.<br>
+ L2_ETHER = auto()<br>
+ #: Ethernet packet type for time sync.<br>
+ L2_ETHER_TIMESYNC = auto()<br>
+ #: ARP (Address Resolution Protocol) packet type.<br>
+ L2_ETHER_ARP = auto()<br>
+ #: LLDP (Link Layer Discovery Protocol) packet type.<br>
+ L2_ETHER_LLDP = auto()<br>
+ #: NSH (Network Service Header) packet type.<br>
+ L2_ETHER_NSH = auto()<br>
+ #: VLAN packet type.<br>
+ L2_ETHER_VLAN = auto()<br>
+ #: QinQ packet type.<br>
+ L2_ETHER_QINQ = auto()<br>
+ #: PPPOE packet type.<br>
+ L2_ETHER_PPPOE = auto()<br>
+ #: FCoE packet type.<br>
+ L2_ETHER_FCOE = auto()<br>
+ #: MPLS packet type.<br>
+ L2_ETHER_MPLS = auto()<br>
+ #: No L2 packet information.<br>
+ L2_UNKNOWN = auto()<br>
+<br>
+ # L3<br>
+ #: IP (Internet Protocol) version 4 packet type. This is used for outer packet for tunneling<br>
+ #: cases, and does not contain any header option.<br>
+ L3_IPV4 = auto()<br>
+ #: IP (Internet Protocol) version 4 packet type. This is used for outer packet for tunneling<br>
+ #: cases, and contains header options.<br>
+ L3_IPV4_EXT = auto()<br>
+ #: IP (Internet Protocol) version 6 packet type. This is used for outer packet for tunneling<br>
+ #: cases, and does not contain any extension header.<br>
+ L3_IPV6 = auto()<br>
+ #: IP (Internet Protocol) version 4 packet type. This is used for outer packet for tunneling<br>
+ #: cases, and may or may not contain header options.<br>
+ L3_IPV4_EXT_UNKNOWN = auto()<br>
+ #: IP (Internet Protocol) version 6 packet type. This is used for outer packet for tunneling<br>
+ #: cases, and contains extension headers.<br>
+ L3_IPV6_EXT = auto()<br>
+ #: IP (Internet Protocol) version 6 packet type. This is used for outer packet for tunneling<br>
+ #: cases, and may or may not contain extension headers.<br>
+ L3_IPV6_EXT_UNKNOWN = auto()<br>
+ #: No L3 packet information.<br>
+ L3_UNKNOWN = auto()<br>
+<br>
+ # L4<br>
+ #: TCP (Transmission Control Protocol) packet type. This is used for outer packet for tunneling<br>
+ #: cases.<br>
+ L4_TCP = auto()<br>
+ #: UDP (User Datagram Protocol) packet type. This is used for outer packet for tunneling cases.<br>
+ L4_UDP = auto()<br>
+ #: Fragmented IP (Internet Protocol) packet type. This is used for outer packet for tunneling<br>
+ #: cases and refers to those packets of any IP types which can be recognized as fragmented. A<br>
+ #: fragmented packet cannot be recognized as any other L4 types (RTE_PTYPE_L4_TCP,<br>
+ #: RTE_PTYPE_L4_UDP, RTE_PTYPE_L4_SCTP, RTE_PTYPE_L4_ICMP, RTE_PTYPE_L4_NONFRAG).<br>
+ L4_FRAG = auto()<br>
+ #: SCTP (Stream Control Transmission Protocol) packet type. This is used for outer packet for<br>
+ #: tunneling cases.<br>
+ L4_SCTP = auto()<br>
+ #: ICMP (Internet Control Message Protocol) packet type. This is used for outer packet for<br>
+ #: tunneling cases.<br>
+ L4_ICMP = auto()<br>
+ #: Non-fragmented IP (Internet Protocol) packet type. This is used for outer packet for<br>
+ #: tunneling cases and refers to those packets of any IP types, that cannot be recognized as<br>
+ #: any of the above L4 types (RTE_PTYPE_L4_TCP, RTE_PTYPE_L4_UDP, RTE_PTYPE_L4_FRAG,<br>
+ #: RTE_PTYPE_L4_SCTP, RTE_PTYPE_L4_ICMP).<br>
+ L4_NONFRAG = auto()<br>
+ #: IGMP (Internet Group Management Protocol) packet type.<br>
+ L4_IGMP = auto()<br>
+ #: No L4 packet information.<br>
+ L4_UNKNOWN = auto()<br>
+<br>
+ # Tunnel<br>
+ #: IP (Internet Protocol) in IP (Internet Protocol) tunneling packet type.<br>
+ TUNNEL_IP = auto()<br>
+ #: GRE (Generic Routing Encapsulation) tunneling packet type.<br>
+ TUNNEL_GRE = auto()<br>
+ #: VXLAN (Virtual eXtensible Local Area Network) tunneling packet type.<br>
+ TUNNEL_VXLAN = auto()<br>
+ #: NVGRE (Network Virtualization using Generic Routing Encapsulation) tunneling packet type.<br>
+ TUNNEL_NVGRE = auto()<br>
+ #: GENEVE (Generic Network Virtualization Encapsulation) tunneling packet type.<br>
+ TUNNEL_GENEVE = auto()<br>
+ #: Tunneling packet type of Teredo, VXLAN (Virtual eXtensible Local Area Network) or GRE<br>
+ #: (Generic Routing Encapsulation) could be recognized as this packet type, if they can not be<br>
+ #: recognized independently as of hardware capability.<br>
+ TUNNEL_GRENAT = auto()<br>
+ #: GTP-C (GPRS Tunnelling Protocol) control tunneling packet type.<br>
+ TUNNEL_GTPC = auto()<br>
+ #: GTP-U (GPRS Tunnelling Protocol) user data tunneling packet type.<br>
+ TUNNEL_GTPU = auto()<br>
+ #: ESP (IP Encapsulating Security Payload) tunneling packet type.<br>
+ TUNNEL_ESP = auto()<br>
+ #: L2TP (Layer 2 Tunneling Protocol) tunneling packet type.<br>
+ TUNNEL_L2TP = auto()<br>
+ #: VXLAN-GPE (VXLAN Generic Protocol Extension) tunneling packet type.<br>
+ TUNNEL_VXLAN_GPE = auto()<br>
+ #: MPLS-in-UDP tunneling packet type (RFC 7510).<br>
+ TUNNEL_MPLS_IN_UDP = auto()<br>
+ #: MPLS-in-GRE tunneling packet type (RFC 4023).<br>
+ TUNNEL_MPLS_IN_GRE = auto()<br>
+ #: No tunnel information found on the packet.<br>
+ TUNNEL_UNKNOWN = auto()<br>
+<br>
+ # Inner L2<br>
+ #: Ethernet packet type. This is used for inner packet type only.<br>
+ INNER_L2_ETHER = auto()<br>
+ #: Ethernet packet type with VLAN (Virtual Local Area Network) tag.<br>
+ INNER_L2_ETHER_VLAN = auto()<br>
+ #: QinQ packet type.<br>
+ INNER_L2_ETHER_QINQ = auto()<br>
+ #: No inner L2 information found on the packet.<br>
+ INNER_L2_UNKNOWN = auto()<br>
+<br>
+ # Inner L3<br>
+ #: IP (Internet Protocol) version 4 packet type. This is used for inner packet only, and does<br>
+ #: not contain any header option.<br>
+ INNER_L3_IPV4 = auto()<br>
+ #: IP (Internet Protocol) version 4 packet type. This is used for inner packet only, and<br>
+ #: contains header options.<br>
+ INNER_L3_IPV4_EXT = auto()<br>
+ #: IP (Internet Protocol) version 6 packet type. This is used for inner packet only, and does<br>
+ #: not contain any extension header.<br>
+ INNER_L3_IPV6 = auto()<br>
+ #: IP (Internet Protocol) version 4 packet type. This is used for inner packet only, and may or<br>
+ #: may not contain header options.<br>
+ INNER_L3_IPV4_EXT_UNKNOWN = auto()<br>
+ #: IP (Internet Protocol) version 6 packet type. This is used for inner packet only, and<br>
+ #: contains extension headers.<br>
+ INNER_L3_IPV6_EXT = auto()<br>
+ #: IP (Internet Protocol) version 6 packet type. This is used for inner packet only, and may or<br>
+ #: may not contain extension headers.<br>
+ INNER_L3_IPV6_EXT_UNKNOWN = auto()<br>
+ #: No inner L3 information found on the packet.<br>
+ INNER_L3_UNKNOWN = auto()<br>
+<br>
+ # Inner L4<br>
+ #: TCP (Transmission Control Protocol) packet type. This is used for inner packet only.<br>
+ INNER_L4_TCP = auto()<br>
+ #: UDP (User Datagram Protocol) packet type. This is used for inner packet only.<br>
+ INNER_L4_UDP = auto()<br>
+ #: Fragmented IP (Internet Protocol) packet type. This is used for inner packet only, and may<br>
+ #: or may not have a layer 4 packet.<br>
+ INNER_L4_FRAG = auto()<br>
+ #: SCTP (Stream Control Transmission Protocol) packet type. This is used for inner packet only.<br>
+ INNER_L4_SCTP = auto()<br>
+ #: ICMP (Internet Control Message Protocol) packet type. This is used for inner packet only.<br>
+ INNER_L4_ICMP = auto()<br>
+ #: Non-fragmented IP (Internet Protocol) packet type. It is used for inner packet only, and may<br>
+ #: or may not have other unknown layer 4 packet types.<br>
+ INNER_L4_NONFRAG = auto()<br>
+ #: No inner L4 information found on the packet.<br>
+ INNER_L4_UNKNOWN = auto()<br>
+<br>
+ @classmethod<br>
+ def from_list_string(cls, names: str) -> Self:<br>
+ """Makes a flag from a whitespace-separated list of names.<br>
+<br>
+ Args:<br>
+ names: a whitespace-separated list containing the members of this flag.<br>
+<br>
+ Returns:<br>
+ An instance of this flag.<br>
+ """<br>
+ flag = cls(0)<br>
+ for name in names.split():<br>
+ flag |= cls.from_str(name)<br>
+ return flag<br>
+<br>
+ @classmethod<br>
+ def from_str(cls, name: str) -> Self:<br>
+ """Makes a flag matching the supplied name.<br>
+<br>
+ Args:<br>
+ name: a valid member of this flag in text<br>
+ Returns:<br>
+ An instance of this flag.<br>
+ """<br>
+ member_name = name.strip().replace("-", "_")<br>
+ return cls[member_name]<br>
+<br>
+ @classmethod<br>
+ def make_parser(cls, hw: bool) -> ParserFn:<br>
+ """Makes a parser function.<br>
+<br>
+ Args:<br>
+ hw: Whether to make a parser for hardware ptypes or software ptypes. If :data:`True`,<br>
+ hardware ptypes will be collected, otherwise software ptypes will.<br>
+<br>
+ Returns:<br>
+ ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a<br>
+ parser function that makes an instance of this flag from text.<br>
+ """<br>
+ return TextParser.wrap(<br>
+ TextParser.find(f"{'hw' if hw else 'sw'} ptype: ([^-]+)"),<br>
+ cls.from_list_string,<br>
+ )<br>
+<br>
+<br>
+@dataclass<br>
+class TestPmdVerbosePacket(TextParser):<br>
+ """Packet information provided by verbose output in Testpmd.<br>
+<br>
+ This dataclass expects that packet information be prepended with the starting line of packet<br>
+ bursts. Specifically, the line that reads "port X/queue Y: sent/received Z packets".<br>
+ """<br>
+<br>
+ #: ID of the port that handled the packet.<br>
+ port_id: int = field(metadata=TextParser.find_int(r"port (\d+)/queue \d+"))<br>
+ #: ID of the queue that handled the packet.<br>
+ queue_id: int = field(metadata=TextParser.find_int(r"port \d+/queue (\d+)"))<br>
+ #: Whether the packet was received or sent by the queue/port.<br>
+ was_received: bool = field(metadata=TextParser.find(r"received \d+ packets"))<br>
+ #:<br>
+ src_mac: str = field(metadata=TextParser.find(f"src=({REGEX_FOR_MAC_ADDRESS})"))<br>
+ #:<br>
+ dst_mac: str = field(metadata=TextParser.find(f"dst=({REGEX_FOR_MAC_ADDRESS})"))<br>
+ #: Memory pool the packet was handled on.<br>
+ pool: str = field(metadata=TextParser.find(r"pool=(\S+)"))<br>
+ #: Packet type in hex.<br>
+ p_type: int = field(metadata=TextParser.find_int(r"type=(0x[a-fA-F\d]+)"))<br>
+ #:<br>
+ length: int = field(metadata=TextParser.find_int(r"length=(\d+)"))<br>
+ #: Number of segments in the packet.<br>
+ nb_segs: int = field(metadata=TextParser.find_int(r"nb_segs=(\d+)"))<br>
+ #: Hardware packet type.<br>
+ hw_ptype: RtePTypes = field(metadata=RtePTypes.make_parser(hw=True))<br>
+ #: Software packet type.<br>
+ sw_ptype: RtePTypes = field(metadata=RtePTypes.make_parser(hw=False))<br>
+ #:<br>
+ l2_len: int = field(metadata=TextParser.find_int(r"l2_len=(\d+)"))<br>
+ #:<br>
+ ol_flags: PacketOffloadFlag = field(metadata=PacketOffloadFlag.make_parser())<br>
+ #: RSS hash of the packet in hex.<br>
+ rss_hash: int | None = field(<br>
+ default=None, metadata=TextParser.find_int(r"RSS hash=(0x[a-fA-F\d]+)")<br>
+ )<br>
+ #: RSS queue that handled the packet in hex.<br>
+ rss_queue: int | None = field(<br>
+ default=None, metadata=TextParser.find_int(r"RSS queue=(0x[a-fA-F\d]+)")<br>
+ )<br>
+ #:<br>
+ l3_len: int | None = field(default=None, metadata=TextParser.find_int(r"l3_len=(\d+)"))<br>
+ #:<br>
+ l4_len: int | None = field(default=None, metadata=TextParser.find_int(r"l4_len=(\d+)"))<br>
+ #:<br>
+ l4_dport: int | None = field(<br>
+ default=None,<br>
+ metadata=TextParser.find_int(r"Destination (?:TCP|UDP) port=(\d+)"),<br>
+ )<br>
+<br>
+<br>
+class RxOffloadCapability(Flag):<br>
+ """Rx offload capabilities of a device.<br>
+<br>
+ The flags are taken from ``lib/ethdev/rte_ethdev.h``.<br>
+ They're prefixed with ``RTE_ETH_RX_OFFLOAD`` in ``lib/ethdev/rte_ethdev.h``<br>
+ instead of ``RX_OFFLOAD``, which is what testpmd changes the prefix to.<br>
+ The values are not contiguous, so the correspondence is preserved<br>
+ by specifying concrete values interspersed between auto() values.<br>
+<br>
+ The ``RX_OFFLOAD`` prefix has been preserved so that the same flag names can be used<br>
+ in :class:`NicCapability`. The prefix is needed in :class:`NicCapability` since there's<br>
+ no other qualifier which would sufficiently distinguish it from other capabilities.<br>
+<br>
+ References:<br>
+ DPDK lib: ``lib/ethdev/rte_ethdev.h``<br>
+ testpmd display function: ``app/test-pmd/cmdline.c:print_rx_offloads()``<br>
+ """<br>
+<br>
+ #:<br>
+ RX_OFFLOAD_VLAN_STRIP = auto()<br>
+ #: Device supports L3 checksum offload.<br>
+ RX_OFFLOAD_IPV4_CKSUM = auto()<br>
+ #: Device supports L4 checksum offload.<br>
+ RX_OFFLOAD_UDP_CKSUM = auto()<br>
+ #: Device supports L4 checksum offload.<br>
+ RX_OFFLOAD_TCP_CKSUM = auto()<br>
+ #: Device supports Large Receive Offload.<br>
+ RX_OFFLOAD_TCP_LRO = auto()<br>
+ #: Device supports QinQ (802.1Q-in-802.1Q, i.e. double VLAN tagging) offload.<br>
+ RX_OFFLOAD_QINQ_STRIP = auto()<br>
+ #: Device supports outer packet L3 checksum.<br>
+ RX_OFFLOAD_OUTER_IPV4_CKSUM = auto()<br>
+ #: Device supports MACsec.<br>
+ RX_OFFLOAD_MACSEC_STRIP = auto()<br>
+ #: Device supports filtering of a VLAN Tag identifier.<br>
+ RX_OFFLOAD_VLAN_FILTER = 1 << 9<br>
+ #: Device supports VLAN offload.<br>
+ RX_OFFLOAD_VLAN_EXTEND = auto()<br>
+ #: Device supports receiving segmented mbufs.<br>
+ RX_OFFLOAD_SCATTER = 1 << 13<br>
+ #: Device supports Timestamp.<br>
+ RX_OFFLOAD_TIMESTAMP = auto()<br>
+ #: Device supports crypto processing while packet is received in NIC.<br>
+ RX_OFFLOAD_SECURITY = auto()<br>
+ #: Device supports keeping the CRC on received packets (i.e. not stripping it).<br>
+ RX_OFFLOAD_KEEP_CRC = auto()<br>
+ #: Device supports L4 checksum offload.<br>
+ RX_OFFLOAD_SCTP_CKSUM = auto()<br>
+ #: Device supports outer packet L4 checksum.<br>
+ RX_OFFLOAD_OUTER_UDP_CKSUM = auto()<br>
+ #: Device supports RSS hashing.<br>
+ RX_OFFLOAD_RSS_HASH = auto()<br>
+ #: Device supports buffer split.<br>
+ RX_OFFLOAD_BUFFER_SPLIT = auto()<br>
+ #: Device supports all checksum capabilities.<br>
+ RX_OFFLOAD_CHECKSUM = RX_OFFLOAD_IPV4_CKSUM | RX_OFFLOAD_UDP_CKSUM | RX_OFFLOAD_TCP_CKSUM<br>
+ #: Device supports all VLAN capabilities.<br>
+ RX_OFFLOAD_VLAN = (<br>
+ RX_OFFLOAD_VLAN_STRIP<br>
+ | RX_OFFLOAD_VLAN_FILTER<br>
+ | RX_OFFLOAD_VLAN_EXTEND<br>
+ | RX_OFFLOAD_QINQ_STRIP<br>
+ )<br>
+<br>
+ @classmethod<br>
+ def from_string(cls, line: str) -> Self:<br>
+ """Make an instance from a string containing the flag names separated with a space.<br>
+<br>
+ Args:<br>
+ line: The line to parse.<br>
+<br>
+ Returns:<br>
+ A new instance containing all found flags.<br>
+ """<br>
+ flag = cls(0)<br>
+ for flag_name in line.split():<br>
+ flag |= cls[f"RX_OFFLOAD_{flag_name}"]<br>
+ return flag<br>
+<br>
+ @classmethod<br>
+ def make_parser(cls, per_port: bool) -> ParserFn:<br>
+ """Make a parser function.<br>
+<br>
+ Args:<br>
+ per_port: If :data:`True`, will return capabilities per port. If :data:`False`,<br>
+ will return capabilities per queue.<br>
+<br>
+ Returns:<br>
+ ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a<br>
+ parser function that makes an instance of this flag from text.<br>
+ """<br>
+ granularity = "Port" if per_port else "Queue"<br>
+ return TextParser.wrap(<br>
+ TextParser.find(rf"Per {granularity}\s+:(.*)$", re.MULTILINE),<br>
+ cls.from_string,<br>
+ )<br>
+<br>
+<br>
+@dataclass<br>
+class RxOffloadCapabilities(TextParser):<br>
+ """The result of testpmd's ``show port <port_id> rx_offload capabilities`` command.<br>
+<br>
+ References:<br>
+ testpmd command function: ``app/test-pmd/cmdline.c:cmd_rx_offload_get_capa()``<br>
+ testpmd display function: ``app/test-pmd/cmdline.c:cmd_rx_offload_get_capa_parsed()``<br>
+ """<br>
+<br>
+ #:<br>
+ port_id: int = field(<br>
+ metadata=TextParser.find_int(r"Rx Offloading Capabilities of port (\d+) :")<br>
+ )<br>
+ #: Per-queue Rx offload capabilities.<br>
+ per_queue: RxOffloadCapability = field(metadata=RxOffloadCapability.make_parser(False))<br>
+ #: Capabilities other than per-queue Rx offload capabilities.<br>
+ per_port: RxOffloadCapability = field(metadata=RxOffloadCapability.make_parser(True))<br>
+<br>
+<br>
+@dataclass<br>
+class TestPmdPortFlowCtrl(TextParser):<br>
+ """Class representing a port's flow control parameters.<br>
+<br>
+ The parameters can also be parsed from the output of ``show port <port_id> flow_ctrl``.<br>
+ """<br>
+<br>
+ #: Enable Rx pause (receive flow control).<br>
+ rx: bool = field(default=False, metadata=TextParser.find(r"Rx pause: on"))<br>
+ #: Enable Transmit.<br>
+ tx: bool = field(default=False, metadata=TextParser.find(r"Tx pause: on"))<br>
+ #: High threshold value to trigger XOFF.<br>
+ high_water: int = field(<br>
+ default=0, metadata=TextParser.find_int(r"High waterline: (0x[a-fA-F\d]+)")<br>
+ )<br>
+ #: Low threshold value to trigger XON.<br>
+ low_water: int = field(<br>
+ default=0, metadata=TextParser.find_int(r"Low waterline: (0x[a-fA-F\d]+)")<br>
+ )<br>
+ #: Pause quota in the Pause frame.<br>
+ pause_time: int = field(default=0, metadata=TextParser.find_int(r"Pause time: (0x[a-fA-F\d]+)"))<br>
+ #: Send XON frame.<br>
+ send_xon: bool = field(default=False, metadata=TextParser.find(r"Tx pause: on"))<br>
+ #: Enable receiving MAC control frames.<br>
+ mac_ctrl_frame_fwd: bool = field(default=False, metadata=TextParser.find(r"Tx pause: on"))<br>
+ #: Change the auto-negotiation parameter.<br>
+ autoneg: bool = field(default=False, metadata=TextParser.find(r"Autoneg: on"))<br>
+<br>
+ def __str__(self) -> str:<br>
+ """Returns the string representation of this instance."""<br>
+ ret = (<br>
+ f"rx {'on' if self.rx else 'off'} "<br>
+ f"tx {'on' if self.tx else 'off'} "<br>
+ f"{self.high_water} "<br>
+ f"{self.low_water} "<br>
+ f"{self.pause_time} "<br>
+ f"{1 if self.send_xon else 0} "<br>
+ f"mac_ctrl_frame_fwd {'on' if self.mac_ctrl_frame_fwd else 'off'} "<br>
+ f"autoneg {'on' if self.autoneg else 'off'}"<br>
+ )<br>
+ return ret<br>
diff --git a/dts/framework/config/__init__.py b/dts/framework/config/__init__.py<br>
index 1ec744d1d4..d2f0138e4a 100644<br>
--- a/dts/framework/config/__init__.py<br>
+++ b/dts/framework/config/__init__.py<br>
@@ -28,7 +28,6 @@<br>
and makes it thread safe should we ever want to move in that direction.<br>
"""<br>
<br>
-import os<br>
from pathlib import Path<br>
from typing import TYPE_CHECKING, Annotated, Any, Literal, TypeVar, cast<br>
<br>
@@ -43,7 +42,7 @@<br>
from .test_run import TestRunConfiguration, create_test_suites_config_model<br>
<br>
# Import only if type checking or building docs, to prevent circular imports.<br>
-if TYPE_CHECKING or os.environ.get("DTS_DOC_BUILD"):<br>
+if TYPE_CHECKING:<br>
from framework.test_suite import BaseConfig<br>
<br>
NodesConfig = Annotated[list[NodeConfiguration], Field(min_length=1)]<br>
diff --git a/dts/framework/params/eal.py b/dts/framework/params/eal.py<br>
index b90ff33dcf..e84a20f02f 100644<br>
--- a/dts/framework/params/eal.py<br>
+++ b/dts/framework/params/eal.py<br>
@@ -4,15 +4,17 @@<br>
"""Module representing the DPDK EAL-related parameters."""<br>
<br>
from dataclasses import dataclass, field<br>
-from typing import Literal<br>
+from typing import TYPE_CHECKING, Literal<br>
<br>
from framework.params import Params, Switch<br>
from framework.testbed_model.cpu import LogicalCoreList<br>
-from framework.testbed_model.port import Port<br>
from framework.testbed_model.virtual_device import VirtualDevice<br>
<br>
+if TYPE_CHECKING:<br>
+ from framework.testbed_model.port import Port<br>
<br>
-def _port_to_pci(port: Port) -> str:<br>
+<br>
+def _port_to_pci(port: "Port") -> str:<br>
return port.pci<br>
<br>
<br>
@@ -42,11 +44,11 @@ class EalParams(Params):<br>
vdevs: list[VirtualDevice] | None = field(<br>
default=None, metadata=Params.multiple() | Params.long("vdev")<br>
)<br>
- allowed_ports: list[Port] | None = field(<br>
+ allowed_ports: list["Port"] | None = field(<br>
default=None,<br>
metadata=Params.convert_value(_port_to_pci) | Params.multiple() | Params.short("a"),<br>
)<br>
- blocked_ports: list[Port] | None = field(<br>
+ blocked_ports: list["Port"] | None = field(<br>
default=None,<br>
metadata=Params.convert_value(_port_to_pci) | Params.multiple() | Params.short("b"),<br>
)<br>
diff --git a/dts/framework/params/types.py b/dts/framework/params/types.py<br>
index 87d11502e8..5bc4bd37d9 100644<br>
--- a/dts/framework/params/types.py<br>
+++ b/dts/framework/params/types.py<br>
@@ -15,8 +15,7 @@ def create_testpmd(**kwargs: Unpack[TestPmdParamsDict]):<br>
from pathlib import PurePath<br>
from typing import TypedDict<br>
<br>
-from framework.params import Switch, YesNoSwitch<br>
-from framework.params.testpmd import (<br>
+from api.testpmd.config import (<br>
AnonMempoolAllocationMode,<br>
EthPeer,<br>
Event,<br>
@@ -37,6 +36,7 @@ def create_testpmd(**kwargs: Unpack[TestPmdParamsDict]):<br>
TXRingParams,<br>
TxUDPPortPair,<br>
)<br>
+from framework.params import Switch, YesNoSwitch<br>
from framework.testbed_model.cpu import LogicalCoreList<br>
from framework.testbed_model.port import Port<br>
from framework.testbed_model.virtual_device import VirtualDevice<br>
diff --git a/dts/framework/remote_session/__init__.py b/dts/framework/remote_session/__init__.py<br>
index 1a5cf6abd3..394c88b25e 100644<br>
--- a/dts/framework/remote_session/__init__.py<br>
+++ b/dts/framework/remote_session/__init__.py<br>
@@ -11,47 +11,3 @@<br>
The interactive sessions open an interactive shell which is continuously open,<br>
allowing it to send and receive data within that particular shell.<br>
"""<br>
-<br>
-from framework.config.node import NodeConfiguration<br>
-from framework.logger import DTSLogger<br>
-<br>
-from .interactive_remote_session import InteractiveRemoteSession<br>
-from .remote_session import RemoteSession<br>
-from .ssh_session import SSHSession<br>
-<br>
-<br>
-def create_remote_session(<br>
- node_config: NodeConfiguration, name: str, logger: DTSLogger<br>
-) -> RemoteSession:<br>
- """Factory for non-interactive remote sessions.<br>
-<br>
- The function returns an SSH session, but will be extended if support<br>
- for other protocols is added.<br>
-<br>
- Args:<br>
- node_config: The test run configuration of the node to connect to.<br>
- name: The name of the session.<br>
- logger: The logger instance this session will use.<br>
-<br>
- Returns:<br>
- The SSH remote session.<br>
- """<br>
- return SSHSession(node_config, name, logger)<br>
-<br>
-<br>
-def create_interactive_session(<br>
- node_config: NodeConfiguration, logger: DTSLogger<br>
-) -> InteractiveRemoteSession:<br>
- """Factory for interactive remote sessions.<br>
-<br>
- The function returns an interactive SSH session, but will be extended if support<br>
- for other protocols is added.<br>
-<br>
- Args:<br>
- node_config: The test run configuration of the node to connect to.<br>
- logger: The logger instance this session will use.<br>
-<br>
- Returns:<br>
- The interactive SSH remote session.<br>
- """<br>
- return InteractiveRemoteSession(node_config, logger)<br>
diff --git a/dts/framework/remote_session/testpmd_shell.py b/dts/framework/remote_session/testpmd_shell.py<br>
deleted file mode 100644<br>
index ad8cb273dc..0000000000<br>
--- a/dts/framework/remote_session/testpmd_shell.py<br>
+++ /dev/null<br>
@@ -1,2844 +0,0 @@<br>
-# SPDX-License-Identifier: BSD-3-Clause<br>
-# Copyright(c) 2023 University of New Hampshire<br>
-# Copyright(c) 2023 PANTHEON.tech s.r.o.<br>
-# Copyright(c) 2024 Arm Limited<br>
-<br>
-"""Testpmd interactive shell.<br>
-<br>
-Typical usage example in a TestSuite::<br>
-<br>
- testpmd_shell = TestPmdShell(self.sut_node)<br>
- devices = testpmd_shell.get_devices()<br>
- for device in devices:<br>
- print(device)<br>
- testpmd_shell.close()<br>
-"""<br>
-<br>
-import functools<br>
-import re<br>
-import time<br>
-from collections.abc import Callable, MutableSet<br>
-from dataclasses import dataclass, field<br>
-from enum import Flag, auto<br>
-from os import environ<br>
-from pathlib import PurePath<br>
-from typing import TYPE_CHECKING, Any, ClassVar, Concatenate, Literal, ParamSpec, Tuple, TypeAlias<br>
-<br>
-from framework.context import get_ctx<br>
-from framework.remote_session.interactive_shell import only_active<br>
-from framework.testbed_model.topology import TopologyType<br>
-<br>
-if TYPE_CHECKING or environ.get("DTS_DOC_BUILD"):<br>
- from enum import Enum as NoAliasEnum<br>
-else:<br>
- from aenum import NoAliasEnum<br>
-<br>
-from typing_extensions import Self, Unpack<br>
-<br>
-from framework.exception import InteractiveCommandExecutionError, InternalError<br>
-from framework.params.testpmd import PortTopology, SimpleForwardingModes, TestPmdParams<br>
-from framework.params.types import TestPmdParamsDict<br>
-from framework.parser import ParserFn, TextParser<br>
-from framework.remote_session.dpdk_shell import DPDKShell<br>
-from framework.settings import SETTINGS<br>
-from framework.utils import REGEX_FOR_MAC_ADDRESS, StrEnum<br>
-<br>
-P = ParamSpec("P")<br>
-TestPmdShellMethod = Callable[Concatenate["TestPmdShell", P], Any]<br>
-<br>
-TestPmdShellCapabilityMethod: TypeAlias = Callable[<br>
- ["TestPmdShell", MutableSet["NicCapability"], MutableSet["NicCapability"]], None<br>
-]<br>
-<br>
-TestPmdShellDecorator: TypeAlias = Callable[[TestPmdShellMethod], TestPmdShellMethod]<br>
-<br>
-TestPmdShellNicCapability = tuple[TestPmdShellCapabilityMethod, TestPmdShellDecorator | None]<br>
-<br>
-<br>
-class TestPmdDevice:<br>
- """The data of a device that testpmd can recognize.<br>
-<br>
- Attributes:<br>
- pci_address: The PCI address of the device.<br>
- """<br>
-<br>
- pci_address: str<br>
-<br>
- def __init__(self, pci_address_line: str):<br>
- """Initialize the device from the testpmd output line string.<br>
-<br>
- Args:<br>
- pci_address_line: A line of testpmd output that contains a device.<br>
- """<br>
- self.pci_address = pci_address_line.strip().split(": ")[1].strip()<br>
-<br>
- def __str__(self) -> str:<br>
- """The PCI address captures what the device is."""<br>
- return self.pci_address<br>
-<br>
-<br>
-class VLANOffloadFlag(Flag):<br>
- """Flag representing the VLAN offload settings of a NIC port."""<br>
-<br>
- #:<br>
- STRIP = auto()<br>
- #:<br>
- FILTER = auto()<br>
- #:<br>
- EXTEND = auto()<br>
- #:<br>
- QINQ_STRIP = auto()<br>
-<br>
- @classmethod<br>
- def from_str_dict(cls, d):<br>
- """Makes an instance from a dict containing the flag member names with an "on" value.<br>
-<br>
- Args:<br>
- d: A dictionary containing the flag members as keys and any string value.<br>
-<br>
- Returns:<br>
- A new instance of the flag.<br>
- """<br>
- flag = cls(0)<br>
- for name in cls.__members__:<br>
- if d.get(name) == "on":<br>
- flag |= cls[name]<br>
- return flag<br>
-<br>
- @classmethod<br>
- def make_parser(cls) -> ParserFn:<br>
- """Makes a parser function.<br>
-<br>
- Returns:<br>
- ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a<br>
- parser function that makes an instance of this flag from text.<br>
- """<br>
- return TextParser.wrap(<br>
- TextParser.find(<br>
- r"VLAN offload:\s+"<br>
- r"strip (?P<STRIP>on|off), "<br>
- r"filter (?P<FILTER>on|off), "<br>
- r"extend (?P<EXTEND>on|off), "<br>
- r"qinq strip (?P<QINQ_STRIP>on|off)",<br>
- re.MULTILINE,<br>
- named=True,<br>
- ),<br>
- cls.from_str_dict,<br>
- )<br>
-<br>
-<br>
-class ChecksumOffloadOptions(Flag):<br>
- """Flag representing checksum hardware offload layer options."""<br>
-<br>
- #:<br>
- ip = auto()<br>
- #:<br>
- udp = auto()<br>
- #:<br>
- tcp = auto()<br>
- #:<br>
- sctp = auto()<br>
- #:<br>
- outer_ip = auto()<br>
- #:<br>
- outer_udp = auto()<br>
-<br>
-<br>
-class RSSOffloadTypesFlag(Flag):<br>
- """Flag representing the RSS offload flow types supported by the NIC port."""<br>
-<br>
- #:<br>
- ipv4 = auto()<br>
- #:<br>
- ipv4_frag = auto()<br>
- #:<br>
- ipv4_tcp = auto()<br>
- #:<br>
- ipv4_udp = auto()<br>
- #:<br>
- ipv4_sctp = auto()<br>
- #:<br>
- ipv4_other = auto()<br>
- #:<br>
- ipv6 = auto()<br>
- #:<br>
- ipv6_frag = auto()<br>
- #:<br>
- ipv6_tcp = auto()<br>
- #:<br>
- ipv6_udp = auto()<br>
- #:<br>
- ipv6_sctp = auto()<br>
- #:<br>
- ipv6_other = auto()<br>
- #:<br>
- l2_payload = auto()<br>
- #:<br>
- ipv6_ex = auto()<br>
- #:<br>
- ipv6_tcp_ex = auto()<br>
- #:<br>
- ipv6_udp_ex = auto()<br>
- #:<br>
- port = auto()<br>
- #:<br>
- vxlan = auto()<br>
- #:<br>
- geneve = auto()<br>
- #:<br>
- nvgre = auto()<br>
- #:<br>
- user_defined_22 = auto()<br>
- #:<br>
- gtpu = auto()<br>
- #:<br>
- eth = auto()<br>
- #:<br>
- s_vlan = auto()<br>
- #:<br>
- c_vlan = auto()<br>
- #:<br>
- esp = auto()<br>
- #:<br>
- ah = auto()<br>
- #:<br>
- l2tpv3 = auto()<br>
- #:<br>
- pfcp = auto()<br>
- #:<br>
- pppoe = auto()<br>
- #:<br>
- ecpri = auto()<br>
- #:<br>
- mpls = auto()<br>
- #:<br>
- ipv4_chksum = auto()<br>
- #:<br>
- l4_chksum = auto()<br>
- #:<br>
- l2tpv2 = auto()<br>
- #:<br>
- ipv6_flow_label = auto()<br>
- #:<br>
- user_defined_38 = auto()<br>
- #:<br>
- user_defined_39 = auto()<br>
- #:<br>
- user_defined_40 = auto()<br>
- #:<br>
- user_defined_41 = auto()<br>
- #:<br>
- user_defined_42 = auto()<br>
- #:<br>
- user_defined_43 = auto()<br>
- #:<br>
- user_defined_44 = auto()<br>
- #:<br>
- user_defined_45 = auto()<br>
- #:<br>
- user_defined_46 = auto()<br>
- #:<br>
- user_defined_47 = auto()<br>
- #:<br>
- user_defined_48 = auto()<br>
- #:<br>
- user_defined_49 = auto()<br>
- #:<br>
- user_defined_50 = auto()<br>
- #:<br>
- user_defined_51 = auto()<br>
- #:<br>
- l3_pre96 = auto()<br>
- #:<br>
- l3_pre64 = auto()<br>
- #:<br>
- l3_pre56 = auto()<br>
- #:<br>
- l3_pre48 = auto()<br>
- #:<br>
- l3_pre40 = auto()<br>
- #:<br>
- l3_pre32 = auto()<br>
- #:<br>
- l2_dst_only = auto()<br>
- #:<br>
- l2_src_only = auto()<br>
- #:<br>
- l4_dst_only = auto()<br>
- #:<br>
- l4_src_only = auto()<br>
- #:<br>
- l3_dst_only = auto()<br>
- #:<br>
- l3_src_only = auto()<br>
-<br>
- #:<br>
- ip = ipv4 | ipv4_frag | ipv4_other | ipv6 | ipv6_frag | ipv6_other | ipv6_ex<br>
- #:<br>
- udp = ipv4_udp | ipv6_udp | ipv6_udp_ex<br>
- #:<br>
- tcp = ipv4_tcp | ipv6_tcp | ipv6_tcp_ex<br>
- #:<br>
- sctp = ipv4_sctp | ipv6_sctp<br>
- #:<br>
- tunnel = vxlan | geneve | nvgre<br>
- #:<br>
- vlan = s_vlan | c_vlan<br>
- #:<br>
- all = (<br>
- eth<br>
- | vlan<br>
- | ip<br>
- | tcp<br>
- | udp<br>
- | sctp<br>
- | l2_payload<br>
- | l2tpv3<br>
- | esp<br>
- | ah<br>
- | pfcp<br>
- | gtpu<br>
- | ecpri<br>
- | mpls<br>
- | l2tpv2<br>
- )<br>
-<br>
- @classmethod<br>
- def from_list_string(cls, names: str) -> Self:<br>
- """Makes a flag from a whitespace-separated list of names.<br>
-<br>
- Args:<br>
- names: a whitespace-separated list containing the members of this flag.<br>
-<br>
- Returns:<br>
- An instance of this flag.<br>
- """<br>
- flag = cls(0)<br>
- for name in names.split():<br>
- flag |= cls.from_str(name)<br>
- return flag<br>
-<br>
- @classmethod<br>
- def from_str(cls, name: str) -> Self:<br>
- """Makes a flag matching the supplied name.<br>
-<br>
- Args:<br>
- name: a valid member of this flag in text<br>
- Returns:<br>
- An instance of this flag.<br>
- """<br>
- member_name = name.strip().replace("-", "_")<br>
- return cls[member_name]<br>
-<br>
- @classmethod<br>
- def make_parser(cls) -> ParserFn:<br>
- """Makes a parser function.<br>
-<br>
- Returns:<br>
- ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a<br>
- parser function that makes an instance of this flag from text.<br>
- """<br>
- return TextParser.wrap(<br>
- TextParser.find(r"Supported RSS offload flow types:((?:\r?\n? \S+)+)", re.MULTILINE),<br>
- RSSOffloadTypesFlag.from_list_string,<br>
- )<br>
-<br>
-<br>
-class DeviceCapabilitiesFlag(Flag):<br>
- """Flag representing the device capabilities."""<br>
-<br>
- #: Device supports Rx queue setup after device started.<br>
- RUNTIME_RX_QUEUE_SETUP = auto()<br>
- #: Device supports Tx queue setup after device started.<br>
- RUNTIME_TX_QUEUE_SETUP = auto()<br>
- #: Device supports shared Rx queue among ports within Rx domain and switch domain.<br>
- RXQ_SHARE = auto()<br>
- #: Device supports keeping flow rules across restart.<br>
- FLOW_RULE_KEEP = auto()<br>
- #: Device supports keeping shared flow objects across restart.<br>
- FLOW_SHARED_OBJECT_KEEP = auto()<br>
-<br>
- @classmethod<br>
- def make_parser(cls) -> ParserFn:<br>
- """Makes a parser function.<br>
-<br>
- Returns:<br>
- ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a<br>
- parser function that makes an instance of this flag from text.<br>
- """<br>
- return TextParser.wrap(<br>
- TextParser.find_int(r"Device capabilities: (0x[A-Fa-f\d]+)"),<br>
- cls,<br>
- )<br>
-<br>
-<br>
-class DeviceErrorHandlingMode(StrEnum):<br>
- """Enum representing the device error handling mode."""<br>
-<br>
- #:<br>
- none = auto()<br>
- #:<br>
- passive = auto()<br>
- #:<br>
- proactive = auto()<br>
- #:<br>
- unknown = auto()<br>
-<br>
- @classmethod<br>
- def make_parser(cls) -> ParserFn:<br>
- """Makes a parser function.<br>
-<br>
- Returns:<br>
- ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a<br>
- parser function that makes an instance of this enum from text.<br>
- """<br>
- return TextParser.wrap(TextParser.find(r"Device error handling mode: (\w+)"), cls)<br>
-<br>
-<br>
-def make_device_private_info_parser() -> ParserFn:<br>
- """Device private information parser.<br>
-<br>
- Ensures that we are not parsing invalid device private info output.<br>
-<br>
- Returns:<br>
- ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a parser<br>
- function that parses the device private info from the TestPmd port info output.<br>
- """<br>
-<br>
- def _validate(info: str):<br>
- info = info.strip()<br>
- if info == "none" or info.startswith("Invalid file") or info.startswith("Failed to dump"):<br>
- return None<br>
- return info<br>
-<br>
- return TextParser.wrap(TextParser.find(r"Device private info:\s+([\s\S]+)"), _validate)<br>
-<br>
-<br>
-class RxQueueState(StrEnum):<br>
- """RX queue states.<br>
-<br>
- References:<br>
- DPDK lib: ``lib/ethdev/rte_ethdev.h``<br>
- testpmd display function: ``app/test-pmd/config.c:get_queue_state_name()``<br>
- """<br>
-<br>
- #:<br>
- stopped = auto()<br>
- #:<br>
- started = auto()<br>
- #:<br>
- hairpin = auto()<br>
- #:<br>
- unknown = auto()<br>
-<br>
- @classmethod<br>
- def make_parser(cls) -> ParserFn:<br>
- """Makes a parser function.<br>
-<br>
- Returns:<br>
- ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a<br>
- parser function that makes an instance of this enum from text.<br>
- """<br>
- return TextParser.wrap(TextParser.find(r"Rx queue state: ([^\r\n]+)"), cls)<br>
-<br>
-<br>
-@dataclass<br>
-class TestPmdQueueInfo(TextParser):<br>
- """Dataclass representation of the common parts of the testpmd `show rxq/txq info` commands."""<br>
-<br>
- #:<br>
- prefetch_threshold: int = field(metadata=TextParser.find_int(r"prefetch threshold: (\d+)"))<br>
- #:<br>
- host_threshold: int = field(metadata=TextParser.find_int(r"host threshold: (\d+)"))<br>
- #:<br>
- writeback_threshold: int = field(metadata=TextParser.find_int(r"writeback threshold: (\d+)"))<br>
- #:<br>
- free_threshold: int = field(metadata=TextParser.find_int(r"free threshold: (\d+)"))<br>
- #:<br>
- deferred_start: bool = field(metadata=TextParser.find("deferred start: on"))<br>
- #: The number of RXD/TXDs is just the ring size of the queue.<br>
- ring_size: int = field(metadata=TextParser.find_int(r"Number of (?:RXDs|TXDs): (\d+)"))<br>
- #:<br>
- is_queue_started: bool = field(metadata=TextParser.find("queue state: started"))<br>
- #:<br>
- burst_mode: str | None = field(<br>
- default=None, metadata=TextParser.find(r"Burst mode: ([^\r\n]+)")<br>
- )<br>
-<br>
-<br>
-@dataclass<br>
-class TestPmdTxqInfo(TestPmdQueueInfo):<br>
- """Representation of testpmd's ``show txq info <port_id> <queue_id>`` command.<br>
-<br>
- References:<br>
- testpmd command function: ``app/test-pmd/cmdline.c:cmd_showqueue()``<br>
- testpmd display function: ``app/test-pmd/config.c:rx_queue_infos_display()``<br>
- """<br>
-<br>
- #: Ring size threshold<br>
- rs_threshold: int | None = field(<br>
- default=None, metadata=TextParser.find_int(r"TX RS threshold: (\d+)\b")<br>
- )<br>
-<br>
-<br>
-@dataclass<br>
-class TestPmdRxqInfo(TestPmdQueueInfo):<br>
- """Representation of testpmd's ``show rxq info <port_id> <queue_id>`` command.<br>
-<br>
- References:<br>
- testpmd command function: ``app/test-pmd/cmdline.c:cmd_showqueue()``<br>
- testpmd display function: ``app/test-pmd/config.c:rx_queue_infos_display()``<br>
- """<br>
-<br>
- #: Mempool used by that queue<br>
- mempool: str | None = field(default=None, metadata=TextParser.find(r"Mempool: ([^\r\n]+)"))<br>
- #: Drop packets if no descriptors are available<br>
- drop_packets: bool | None = field(<br>
- default=None, metadata=TextParser.find(r"RX drop packets: on")<br>
- )<br>
- #: Scattered packets Rx enabled<br>
- scattered_packets: bool | None = field(<br>
- default=None, metadata=TextParser.find(r"RX scattered packets: on")<br>
- )<br>
- #: The state of the queue<br>
- queue_state: str | None = field(default=None, metadata=RxQueueState.make_parser())<br>
-<br>
-<br>
-@dataclass<br>
-class TestPmdPort(TextParser):<br>
- """Dataclass representing the result of testpmd's ``show port info`` command."""<br>
-<br>
- #:<br>
- id: int = field(metadata=TextParser.find_int(r"Infos for port (\d+)\b"))<br>
- #:<br>
- device_name: str = field(metadata=TextParser.find(r"Device name: ([^\r\n]+)"))<br>
- #:<br>
- driver_name: str = field(metadata=TextParser.find(r"Driver name: ([^\r\n]+)"))<br>
- #:<br>
- socket_id: int = field(metadata=TextParser.find_int(r"Connect to socket: (\d+)"))<br>
- #:<br>
- is_link_up: bool = field(metadata=TextParser.find("Link status: up"))<br>
- #:<br>
- link_speed: str = field(metadata=TextParser.find(r"Link speed: ([^\r\n]+)"))<br>
- #:<br>
- is_link_full_duplex: bool = field(metadata=TextParser.find("Link duplex: full-duplex"))<br>
- #:<br>
- is_link_autonegotiated: bool = field(metadata=TextParser.find("Autoneg status: On"))<br>
- #:<br>
- is_promiscuous_mode_enabled: bool = field(metadata=TextParser.find("Promiscuous mode: enabled"))<br>
- #:<br>
- is_allmulticast_mode_enabled: bool = field(<br>
- metadata=TextParser.find("Allmulticast mode: enabled")<br>
- )<br>
- #: Maximum number of MAC addresses<br>
- max_mac_addresses_num: int = field(<br>
- metadata=TextParser.find_int(r"Maximum number of MAC addresses: (\d+)")<br>
- )<br>
- #: Maximum configurable length of RX packet<br>
- max_hash_mac_addresses_num: int = field(<br>
- metadata=TextParser.find_int(r"Maximum number of MAC addresses of hash filtering: (\d+)")<br>
- )<br>
- #: Minimum size of RX buffer<br>
- min_rx_bufsize: int = field(metadata=TextParser.find_int(r"Minimum size of RX buffer: (\d+)"))<br>
- #: Maximum configurable length of RX packet<br>
- max_rx_packet_length: int = field(<br>
- metadata=TextParser.find_int(r"Maximum configurable length of RX packet: (\d+)")<br>
- )<br>
- #: Maximum configurable size of LRO aggregated packet<br>
- max_lro_packet_size: int = field(<br>
- metadata=TextParser.find_int(r"Maximum configurable size of LRO aggregated packet: (\d+)")<br>
- )<br>
-<br>
- #: Current number of RX queues<br>
- rx_queues_num: int = field(metadata=TextParser.find_int(r"Current number of RX queues: (\d+)"))<br>
- #: Max possible RX queues<br>
- max_rx_queues_num: int = field(metadata=TextParser.find_int(r"Max possible RX queues: (\d+)"))<br>
- #: Max possible number of RXDs per queue<br>
- max_queue_rxd_num: int = field(<br>
- metadata=TextParser.find_int(r"Max possible number of RXDs per queue: (\d+)")<br>
- )<br>
- #: Min possible number of RXDs per queue<br>
- min_queue_rxd_num: int = field(<br>
- metadata=TextParser.find_int(r"Min possible number of RXDs per queue: (\d+)")<br>
- )<br>
- #: RXDs number alignment<br>
- rxd_alignment_num: int = field(metadata=TextParser.find_int(r"RXDs number alignment: (\d+)"))<br>
-<br>
- #: Current number of TX queues<br>
- tx_queues_num: int = field(metadata=TextParser.find_int(r"Current number of TX queues: (\d+)"))<br>
- #: Max possible TX queues<br>
- max_tx_queues_num: int = field(metadata=TextParser.find_int(r"Max possible TX queues: (\d+)"))<br>
- #: Max possible number of TXDs per queue<br>
- max_queue_txd_num: int = field(<br>
- metadata=TextParser.find_int(r"Max possible number of TXDs per queue: (\d+)")<br>
- )<br>
- #: Min possible number of TXDs per queue<br>
- min_queue_txd_num: int = field(<br>
- metadata=TextParser.find_int(r"Min possible number of TXDs per queue: (\d+)")<br>
- )<br>
- #: TXDs number alignment<br>
- txd_alignment_num: int = field(metadata=TextParser.find_int(r"TXDs number alignment: (\d+)"))<br>
- #: Max segment number per packet<br>
- max_packet_segment_num: int = field(<br>
- metadata=TextParser.find_int(r"Max segment number per packet: (\d+)")<br>
- )<br>
- #: Max segment number per MTU/TSO<br>
- max_mtu_segment_num: int = field(<br>
- metadata=TextParser.find_int(r"Max segment number per MTU\/TSO: (\d+)")<br>
- )<br>
-<br>
- #:<br>
- device_capabilities: DeviceCapabilitiesFlag = field(<br>
- metadata=DeviceCapabilitiesFlag.make_parser(),<br>
- )<br>
- #:<br>
- device_error_handling_mode: DeviceErrorHandlingMode | None = field(<br>
- default=None, metadata=DeviceErrorHandlingMode.make_parser()<br>
- )<br>
- #:<br>
- device_private_info: str | None = field(<br>
- default=None,<br>
- metadata=make_device_private_info_parser(),<br>
- )<br>
-<br>
- #:<br>
- hash_key_size: int | None = field(<br>
- default=None, metadata=TextParser.find_int(r"Hash key size in bytes: (\d+)")<br>
- )<br>
- #:<br>
- redirection_table_size: int | None = field(<br>
- default=None, metadata=TextParser.find_int(r"Redirection table size: (\d+)")<br>
- )<br>
- #:<br>
- supported_rss_offload_flow_types: RSSOffloadTypesFlag = field(<br>
- default=RSSOffloadTypesFlag(0), metadata=RSSOffloadTypesFlag.make_parser()<br>
- )<br>
-<br>
- #:<br>
- mac_address: str | None = field(<br>
- default=None, metadata=TextParser.find(r"MAC address: ([A-Fa-f0-9:]+)")<br>
- )<br>
- #:<br>
- fw_version: str | None = field(<br>
- default=None, metadata=TextParser.find(r"Firmware-version: ([^\r\n]+)")<br>
- )<br>
- #:<br>
- dev_args: str | None = field(default=None, metadata=TextParser.find(r"Devargs: ([^\r\n]+)"))<br>
- #: Socket id of the memory allocation<br>
- mem_alloc_socket_id: int | None = field(<br>
- default=None,<br>
- metadata=TextParser.find_int(r"memory allocation on the socket: (\d+)"),<br>
- )<br>
- #:<br>
- mtu: int | None = field(default=None, metadata=TextParser.find_int(r"MTU: (\d+)"))<br>
-<br>
- #:<br>
- vlan_offload: VLANOffloadFlag | None = field(<br>
- default=None,<br>
- metadata=VLANOffloadFlag.make_parser(),<br>
- )<br>
-<br>
- #: Maximum size of RX buffer<br>
- max_rx_bufsize: int | None = field(<br>
- default=None, metadata=TextParser.find_int(r"Maximum size of RX buffer: (\d+)")<br>
- )<br>
- #: Maximum number of VFs<br>
- max_vfs_num: int | None = field(<br>
- default=None, metadata=TextParser.find_int(r"Maximum number of VFs: (\d+)")<br>
- )<br>
- #: Maximum number of VMDq pools<br>
- max_vmdq_pools_num: int | None = field(<br>
- default=None, metadata=TextParser.find_int(r"Maximum number of VMDq pools: (\d+)")<br>
- )<br>
-<br>
- #:<br>
- switch_name: str | None = field(<br>
- default=None, metadata=TextParser.find(r"Switch name: ([\r\n]+)")<br>
- )<br>
- #:<br>
- switch_domain_id: int | None = field(<br>
- default=None, metadata=TextParser.find_int(r"Switch domain Id: (\d+)")<br>
- )<br>
- #:<br>
- switch_port_id: int | None = field(<br>
- default=None, metadata=TextParser.find_int(r"Switch Port Id: (\d+)")<br>
- )<br>
- #:<br>
- switch_rx_domain: int | None = field(<br>
- default=None, metadata=TextParser.find_int(r"Switch Rx domain: (\d+)")<br>
- )<br>
-<br>
-<br>
-@dataclass<br>
-class TestPmdPortStats(TextParser):<br>
- """Port statistics."""<br>
-<br>
- #:<br>
- port_id: int = field(metadata=TextParser.find_int(r"NIC statistics for port (\d+)"))<br>
-<br>
- #:<br>
- rx_packets: int = field(metadata=TextParser.find_int(r"RX-packets:\s+(\d+)"))<br>
- #:<br>
- rx_missed: int = field(metadata=TextParser.find_int(r"RX-missed:\s+(\d+)"))<br>
- #:<br>
- rx_bytes: int = field(metadata=TextParser.find_int(r"RX-bytes:\s+(\d+)"))<br>
- #:<br>
- rx_errors: int = field(metadata=TextParser.find_int(r"RX-errors:\s+(\d+)"))<br>
- #:<br>
- rx_nombuf: int = field(metadata=TextParser.find_int(r"RX-nombuf:\s+(\d+)"))<br>
-<br>
- #:<br>
- tx_packets: int = field(metadata=TextParser.find_int(r"TX-packets:\s+(\d+)"))<br>
- #:<br>
- tx_errors: int = field(metadata=TextParser.find_int(r"TX-errors:\s+(\d+)"))<br>
- #:<br>
- tx_bytes: int = field(metadata=TextParser.find_int(r"TX-bytes:\s+(\d+)"))<br>
-<br>
- #:<br>
- rx_pps: int = field(metadata=TextParser.find_int(r"Rx-pps:\s+(\d+)"))<br>
- #:<br>
- rx_bps: int = field(metadata=TextParser.find_int(r"Rx-bps:\s+(\d+)"))<br>
-<br>
- #:<br>
- tx_pps: int = field(metadata=TextParser.find_int(r"Tx-pps:\s+(\d+)"))<br>
- #:<br>
- tx_bps: int = field(metadata=TextParser.find_int(r"Tx-bps:\s+(\d+)"))<br>
-<br>
-<br>
-@dataclass(kw_only=True)<br>
-class FlowRule:<br>
- """Class representation of flow rule parameters.<br>
-<br>
- This class represents the parameters of any flow rule as per the<br>
- following pattern:<br>
-<br>
- [group {group_id}] [priority {level}] [ingress] [egress]<br>
- [user_id {user_id}] pattern {item} [/ {item} [...]] / end<br>
- actions {action} [/ {action} [...]] / end<br>
- """<br>
-<br>
- #:<br>
- group_id: int | None = None<br>
- #:<br>
- priority_level: int | None = None<br>
- #:<br>
- direction: Literal["ingress", "egress"]<br>
- #:<br>
- user_id: int | None = None<br>
- #:<br>
- pattern: list[str]<br>
- #:<br>
- actions: list[str]<br>
-<br>
- def __str__(self) -> str:<br>
- """Returns the string representation of this instance."""<br>
- ret = ""<br>
- pattern = " / ".join(self.pattern)<br>
- action = " / ".join(self.actions)<br>
- if self.group_id is not None:<br>
- ret += f"group {self.group_id} "<br>
- if self.priority_level is not None:<br>
- ret += f"priority {self.priority_level} "<br>
- ret += f"{self.direction} "<br>
- if self.user_id is not None:<br>
- ret += f"user_id {self.user_id} "<br>
- ret += f"pattern {pattern} / end "<br>
- ret += f"actions {action} / end"<br>
- return ret<br>
-<br>
-<br>
-class PacketOffloadFlag(Flag):<br>
- """Flag representing the Packet Offload Features Flags in DPDK.<br>
-<br>
- Values in this class are taken from the definitions in the RTE MBUF core library in DPDK<br>
- located in ``lib/mbuf/rte_mbuf_core.h``. It is expected that flag values in this class will<br>
- match the values they are set to in said DPDK library with one exception; all values must be<br>
- unique. For example, the definitions for unknown checksum flags in ``rte_mbuf_core.h`` are all<br>
- set to :data:`0`, but it is valuable to distinguish between them in this framework. For this<br>
- reason flags that are not unique in the DPDK library are set either to values within the<br>
- RTE_MBUF_F_FIRST_FREE-RTE_MBUF_F_LAST_FREE range for Rx or shifted 61+ bits for Tx.<br>
-<br>
- References:<br>
- DPDK lib: ``lib/mbuf/rte_mbuf_core.h``<br>
- """<br>
-<br>
- # RX flags<br>
-<br>
- #: The RX packet is a 802.1q VLAN packet, and the tci has been saved in mbuf->vlan_tci. If the<br>
- #: flag RTE_MBUF_F_RX_VLAN_STRIPPED is also present, the VLAN header has been stripped from<br>
- #: mbuf data, else it is still present.<br>
- RTE_MBUF_F_RX_VLAN = auto()<br>
-<br>
- #: RX packet with RSS hash result.<br>
- RTE_MBUF_F_RX_RSS_HASH = auto()<br>
-<br>
- #: RX packet with FDIR match indicate.<br>
- RTE_MBUF_F_RX_FDIR = auto()<br>
-<br>
- #: This flag is set when the outermost IP header checksum is detected as wrong by the hardware.<br>
- RTE_MBUF_F_RX_OUTER_IP_CKSUM_BAD = 1 << 5<br>
-<br>
- #: A vlan has been stripped by the hardware and its tci is saved in mbuf->vlan_tci. This can<br>
- #: only happen if vlan stripping is enabled in the RX configuration of the PMD. When<br>
- #: RTE_MBUF_F_RX_VLAN_STRIPPED is set, RTE_MBUF_F_RX_VLAN must also be set.<br>
- RTE_MBUF_F_RX_VLAN_STRIPPED = auto()<br>
-<br>
- #: No information about the RX IP checksum. Value is 0 in the DPDK library.<br>
- RTE_MBUF_F_RX_IP_CKSUM_UNKNOWN = 1 << 23<br>
- #: The IP checksum in the packet is wrong.<br>
- RTE_MBUF_F_RX_IP_CKSUM_BAD = 1 << 4<br>
- #: The IP checksum in the packet is valid.<br>
- RTE_MBUF_F_RX_IP_CKSUM_GOOD = 1 << 7<br>
- #: The IP checksum is not correct in the packet data, but the integrity of the IP header is<br>
- #: verified. Value is RTE_MBUF_F_RX_IP_CKSUM_BAD | RTE_MBUF_F_RX_IP_CKSUM_GOOD in the DPDK<br>
- #: library.<br>
- RTE_MBUF_F_RX_IP_CKSUM_NONE = 1 << 24<br>
-<br>
- #: No information about the RX L4 checksum. Value is 0 in the DPDK library.<br>
- RTE_MBUF_F_RX_L4_CKSUM_UNKNOWN = 1 << 25<br>
- #: The L4 checksum in the packet is wrong.<br>
- RTE_MBUF_F_RX_L4_CKSUM_BAD = 1 << 3<br>
- #: The L4 checksum in the packet is valid.<br>
- RTE_MBUF_F_RX_L4_CKSUM_GOOD = 1 << 8<br>
- #: The L4 checksum is not correct in the packet data, but the integrity of the L4 data is<br>
- #: verified. Value is RTE_MBUF_F_RX_L4_CKSUM_BAD | RTE_MBUF_F_RX_L4_CKSUM_GOOD in the DPDK<br>
- #: library.<br>
- RTE_MBUF_F_RX_L4_CKSUM_NONE = 1 << 26<br>
-<br>
- #: RX IEEE1588 L2 Ethernet PT Packet.<br>
- RTE_MBUF_F_RX_IEEE1588_PTP = 1 << 9<br>
- #: RX IEEE1588 L2/L4 timestamped packet.<br>
- RTE_MBUF_F_RX_IEEE1588_TMST = 1 << 10<br>
-<br>
- #: FD id reported if FDIR match.<br>
- RTE_MBUF_F_RX_FDIR_ID = 1 << 13<br>
- #: Flexible bytes reported if FDIR match.<br>
- RTE_MBUF_F_RX_FDIR_FLX = 1 << 14<br>
-<br>
- #: If both RTE_MBUF_F_RX_QINQ_STRIPPED and RTE_MBUF_F_RX_VLAN_STRIPPED are set, the 2 VLANs<br>
- #: have been stripped by the hardware. If RTE_MBUF_F_RX_QINQ_STRIPPED is set and<br>
- #: RTE_MBUF_F_RX_VLAN_STRIPPED is unset, only the outer VLAN is removed from packet data.<br>
- RTE_MBUF_F_RX_QINQ_STRIPPED = auto()<br>
-<br>
- #: When packets are coalesced by a hardware or virtual driver, this flag can be set in the RX<br>
- #: mbuf, meaning that the m->tso_segsz field is valid and is set to the segment size of<br>
- #: original packets.<br>
- RTE_MBUF_F_RX_LRO = auto()<br>
-<br>
- #: Indicate that security offload processing was applied on the RX packet.<br>
- RTE_MBUF_F_RX_SEC_OFFLOAD = 1 << 18<br>
- #: Indicate that security offload processing failed on the RX packet.<br>
- RTE_MBUF_F_RX_SEC_OFFLOAD_FAILED = auto()<br>
-<br>
- #: The RX packet is a double VLAN. If this flag is set, RTE_MBUF_F_RX_VLAN must also be set. If<br>
- #: the flag RTE_MBUF_F_RX_QINQ_STRIPPED is also present, both VLANs headers have been stripped<br>
- #: from mbuf data, else they are still present.<br>
- RTE_MBUF_F_RX_QINQ = auto()<br>
-<br>
- #: No info about the outer RX L4 checksum. Value is 0 in the DPDK library.<br>
- RTE_MBUF_F_RX_OUTER_L4_CKSUM_UNKNOWN = 1 << 27<br>
- #: The outer L4 checksum in the packet is wrong<br>
- RTE_MBUF_F_RX_OUTER_L4_CKSUM_BAD = 1 << 21<br>
- #: The outer L4 checksum in the packet is valid<br>
- RTE_MBUF_F_RX_OUTER_L4_CKSUM_GOOD = 1 << 22<br>
- #: Invalid outer L4 checksum state. Value is<br>
- #: RTE_MBUF_F_RX_OUTER_L4_CKSUM_BAD | RTE_MBUF_F_RX_OUTER_L4_CKSUM_GOOD in the DPDK library.<br>
- RTE_MBUF_F_RX_OUTER_L4_CKSUM_INVALID = 1 << 28<br>
-<br>
- # TX flags<br>
-<br>
- #: Outer UDP checksum offload flag. This flag is used for enabling outer UDP checksum in PMD.<br>
- #: To use outer UDP checksum, the user either needs to enable the following in mbuf:<br>
- #:<br>
- #: a) Fill outer_l2_len and outer_l3_len in mbuf.<br>
- #: b) Set the RTE_MBUF_F_TX_OUTER_UDP_CKSUM flag.<br>
- #: c) Set the RTE_MBUF_F_TX_OUTER_IPV4 or RTE_MBUF_F_TX_OUTER_IPV6 flag.<br>
- #:<br>
- #: Or configure RTE_ETH_TX_OFFLOAD_OUTER_UDP_CKSUM offload flag.<br>
- RTE_MBUF_F_TX_OUTER_UDP_CKSUM = 1 << 41<br>
-<br>
- #: UDP Fragmentation Offload flag. This flag is used for enabling UDP fragmentation in SW or in<br>
- #: HW.<br>
- RTE_MBUF_F_TX_UDP_SEG = auto()<br>
-<br>
- #: Request security offload processing on the TX packet. To use Tx security offload, the user<br>
- #: needs to fill l2_len in mbuf indicating L2 header size and where L3 header starts.<br>
- #: Similarly, l3_len should also be filled along with ol_flags reflecting current L3 type.<br>
- RTE_MBUF_F_TX_SEC_OFFLOAD = auto()<br>
-<br>
- #: Offload the MACsec. This flag must be set by the application to enable this offload feature<br>
- #: for a packet to be transmitted.<br>
- RTE_MBUF_F_TX_MACSEC = auto()<br>
-<br>
- # Bits 45:48 are used for the tunnel type in ``lib/mbuf/rte_mbuf_core.h``, but some are modified<br>
- # in this Flag to maintain uniqueness. The tunnel type must be specified for TSO or checksum on<br>
- # the inner part of tunnel packets. These flags can be used with RTE_MBUF_F_TX_TCP_SEG for TSO,<br>
- # or RTE_MBUF_F_TX_xxx_CKSUM. The mbuf fields for inner and outer header lengths are required:<br>
- # outer_l2_len, outer_l3_len, l2_len, l3_len, l4_len and tso_segsz for TSO.<br>
-<br>
- #:<br>
- RTE_MBUF_F_TX_TUNNEL_VXLAN = 1 << 45<br>
- #:<br>
- RTE_MBUF_F_TX_TUNNEL_GRE = 1 << 46<br>
- #: Value is 3 << 45 in the DPDK library.<br>
- RTE_MBUF_F_TX_TUNNEL_IPIP = 1 << 61<br>
- #:<br>
- RTE_MBUF_F_TX_TUNNEL_GENEVE = 1 << 47<br>
- #: TX packet with MPLS-in-UDP RFC 7510 header. Value is 5 << 45 in the DPDK library.<br>
- RTE_MBUF_F_TX_TUNNEL_MPLSINUDP = 1 << 62<br>
- #: Value is 6 << 45 in the DPDK library.<br>
- RTE_MBUF_F_TX_TUNNEL_VXLAN_GPE = 1 << 63<br>
- #: Value is 7 << 45 in the DPDK library.<br>
- RTE_MBUF_F_TX_TUNNEL_GTP = 1 << 64<br>
- #:<br>
- RTE_MBUF_F_TX_TUNNEL_ESP = 1 << 48<br>
- #: Generic IP encapsulated tunnel type, used for TSO and checksum offload. This can be used for<br>
- #: tunnels which are not standards or listed above. It is preferred to use specific tunnel<br>
- #: flags like RTE_MBUF_F_TX_TUNNEL_GRE or RTE_MBUF_F_TX_TUNNEL_IPIP if possible. The ethdev<br>
- #: must be configured with RTE_ETH_TX_OFFLOAD_IP_TNL_TSO. Outer and inner checksums are done<br>
- #: according to the existing flags like RTE_MBUF_F_TX_xxx_CKSUM. Specific tunnel headers that<br>
- #: contain payload length, sequence id or checksum are not expected to be updated. Value is<br>
- #: 0xD << 45 in the DPDK library.<br>
- RTE_MBUF_F_TX_TUNNEL_IP = 1 << 65<br>
- #: Generic UDP encapsulated tunnel type, used for TSO and checksum offload. UDP tunnel type<br>
- #: implies outer IP layer. It can be used for tunnels which are not standards or listed above.<br>
- #: It is preferred to use specific tunnel flags like RTE_MBUF_F_TX_TUNNEL_VXLAN if possible.<br>
- #: The ethdev must be configured with RTE_ETH_TX_OFFLOAD_UDP_TNL_TSO. Outer and inner checksums<br>
- #: are done according to the existing flags like RTE_MBUF_F_TX_xxx_CKSUM. Specific tunnel<br>
- #: headers that contain payload length, sequence id or checksum are not expected to be updated.<br>
- #: value is 0xE << 45 in the DPDK library.<br>
- RTE_MBUF_F_TX_TUNNEL_UDP = 1 << 66<br>
-<br>
- #: Double VLAN insertion (QinQ) request to driver, driver may offload the insertion based on<br>
- #: device capability. Mbuf 'vlan_tci' & 'vlan_tci_outer' must be valid when this flag is set.<br>
- RTE_MBUF_F_TX_QINQ = 1 << 49<br>
-<br>
- #: TCP segmentation offload. To enable this offload feature for a packet to be transmitted on<br>
- #: hardware supporting TSO:<br>
- #:<br>
- #: - set the RTE_MBUF_F_TX_TCP_SEG flag in mbuf->ol_flags (this flag implies<br>
- #: RTE_MBUF_F_TX_TCP_CKSUM)<br>
- #: - set the flag RTE_MBUF_F_TX_IPV4 or RTE_MBUF_F_TX_IPV6<br>
- #: * if it's IPv4, set the RTE_MBUF_F_TX_IP_CKSUM flag<br>
- #: - fill the mbuf offload information: l2_len, l3_len, l4_len, tso_segsz<br>
- RTE_MBUF_F_TX_TCP_SEG = auto()<br>
-<br>
- #: TX IEEE1588 packet to timestamp.<br>
- RTE_MBUF_F_TX_IEEE1588_TMST = auto()<br>
-<br>
- # Bits 52+53 used for L4 packet type with checksum enabled in ``lib/mbuf/rte_mbuf_core.h`` but<br>
- # some values must be modified in this framework to maintain uniqueness. To use hardware<br>
- # L4 checksum offload, the user needs to:<br>
- #<br>
- # - fill l2_len and l3_len in mbuf<br>
- # - set the flags RTE_MBUF_F_TX_TCP_CKSUM, RTE_MBUF_F_TX_SCTP_CKSUM or<br>
- # RTE_MBUF_F_TX_UDP_CKSUM<br>
- # - set the flag RTE_MBUF_F_TX_IPV4 or RTE_MBUF_F_TX_IPV6<br>
-<br>
- #: Disable L4 cksum of TX pkt. Value is 0 in the DPDK library.<br>
- RTE_MBUF_F_TX_L4_NO_CKSUM = 1 << 67<br>
- #: TCP cksum of TX pkt. Computed by NIC.<br>
- RTE_MBUF_F_TX_TCP_CKSUM = 1 << 52<br>
- #: SCTP cksum of TX pkt. Computed by NIC.<br>
- RTE_MBUF_F_TX_SCTP_CKSUM = 1 << 53<br>
- #: UDP cksum of TX pkt. Computed by NIC. Value is 3 << 52 in the DPDK library.<br>
- RTE_MBUF_F_TX_UDP_CKSUM = 1 << 68<br>
-<br>
- #: Offload the IP checksum in the hardware. The flag RTE_MBUF_F_TX_IPV4 should also be set by<br>
- #: the application, although a PMD will only check RTE_MBUF_F_TX_IP_CKSUM.<br>
- RTE_MBUF_F_TX_IP_CKSUM = 1 << 54<br>
-<br>
- #: Packet is IPv4. This flag must be set when using any offload feature (TSO, L3 or L4<br>
- #: checksum) to tell the NIC that the packet is an IPv4 packet. If the packet is a tunneled<br>
- #: packet, this flag is related to the inner headers.<br>
- RTE_MBUF_F_TX_IPV4 = auto()<br>
- #: Packet is IPv6. This flag must be set when using an offload feature (TSO or L4 checksum) to<br>
- #: tell the NIC that the packet is an IPv6 packet. If the packet is a tunneled packet, this<br>
- #: flag is related to the inner headers.<br>
- RTE_MBUF_F_TX_IPV6 = auto()<br>
- #: VLAN tag insertion request to driver, driver may offload the insertion based on the device<br>
- #: capability. mbuf 'vlan_tci' field must be valid when this flag is set.<br>
- RTE_MBUF_F_TX_VLAN = auto()<br>
-<br>
- #: Offload the IP checksum of an external header in the hardware. The flag<br>
- #: RTE_MBUF_F_TX_OUTER_IPV4 should also be set by the application, although a PMD will only<br>
- #: check RTE_MBUF_F_TX_OUTER_IP_CKSUM.<br>
- RTE_MBUF_F_TX_OUTER_IP_CKSUM = auto()<br>
- #: Packet outer header is IPv4. This flag must be set when using any outer offload feature (L3<br>
- #: or L4 checksum) to tell the NIC that the outer header of the tunneled packet is an IPv4<br>
- #: packet.<br>
- RTE_MBUF_F_TX_OUTER_IPV4 = auto()<br>
- #: Packet outer header is IPv6. This flag must be set when using any outer offload feature (L4<br>
- #: checksum) to tell the NIC that the outer header of the tunneled packet is an IPv6 packet.<br>
- RTE_MBUF_F_TX_OUTER_IPV6 = auto()<br>
-<br>
- @classmethod<br>
- def from_list_string(cls, names: str) -> Self:<br>
- """Makes a flag from a whitespace-separated list of names.<br>
-<br>
- Args:<br>
- names: a whitespace-separated list containing the members of this flag.<br>
-<br>
- Returns:<br>
- An instance of this flag.<br>
- """<br>
- flag = cls(0)<br>
- for name in names.split():<br>
- flag |= cls.from_str(name)<br>
- return flag<br>
-<br>
- @classmethod<br>
- def from_str(cls, name: str) -> Self:<br>
- """Makes a flag matching the supplied name.<br>
-<br>
- Args:<br>
- name: a valid member of this flag in text<br>
- Returns:<br>
- An instance of this flag.<br>
- """<br>
- member_name = name.strip().replace("-", "_")<br>
- return cls[member_name]<br>
-<br>
- @classmethod<br>
- def make_parser(cls) -> ParserFn:<br>
- """Makes a parser function.<br>
-<br>
- Returns:<br>
- ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a<br>
- parser function that makes an instance of this flag from text.<br>
- """<br>
- return TextParser.wrap(<br>
- TextParser.find(r"ol_flags: ([^\n]+)"),<br>
- cls.from_list_string,<br>
- )<br>
-<br>
-<br>
-class RtePTypes(Flag):<br>
- """Flag representing possible packet types in DPDK verbose output.<br>
-<br>
- Values in this class are derived from definitions in the RTE MBUF ptype library in DPDK located<br>
- in ``lib/mbuf/rte_mbuf_ptype.h``. Specifically, the names of values in this class should match<br>
- the possible return options from the functions ``rte_get_ptype_*_name`` in ``rte_mbuf_ptype.c``.<br>
-<br>
- References:<br>
- DPDK lib: ``lib/mbuf/rte_mbuf_ptype.h``<br>
- DPDK ptype name formatting functions: ``lib/mbuf/rte_mbuf_ptype.c:rte_get_ptype_*_name()``<br>
- """<br>
-<br>
- # L2<br>
- #: Ethernet packet type. This is used for outer packet for tunneling cases.<br>
- L2_ETHER = auto()<br>
- #: Ethernet packet type for time sync.<br>
- L2_ETHER_TIMESYNC = auto()<br>
- #: ARP (Address Resolution Protocol) packet type.<br>
- L2_ETHER_ARP = auto()<br>
- #: LLDP (Link Layer Discovery Protocol) packet type.<br>
- L2_ETHER_LLDP = auto()<br>
- #: NSH (Network Service Header) packet type.<br>
- L2_ETHER_NSH = auto()<br>
- #: VLAN packet type.<br>
- L2_ETHER_VLAN = auto()<br>
- #: QinQ packet type.<br>
- L2_ETHER_QINQ = auto()<br>
- #: PPPOE packet type.<br>
- L2_ETHER_PPPOE = auto()<br>
- #: FCoE packet type.<br>
- L2_ETHER_FCOE = auto()<br>
- #: MPLS packet type.<br>
- L2_ETHER_MPLS = auto()<br>
- #: No L2 packet information.<br>
- L2_UNKNOWN = auto()<br>
-<br>
- # L3<br>
- #: IP (Internet Protocol) version 4 packet type. This is used for outer packet for tunneling<br>
- #: cases, and does not contain any header option.<br>
- L3_IPV4 = auto()<br>
- #: IP (Internet Protocol) version 4 packet type. This is used for outer packet for tunneling<br>
- #: cases, and contains header options.<br>
- L3_IPV4_EXT = auto()<br>
- #: IP (Internet Protocol) version 6 packet type. This is used for outer packet for tunneling<br>
- #: cases, and does not contain any extension header.<br>
- L3_IPV6 = auto()<br>
- #: IP (Internet Protocol) version 4 packet type. This is used for outer packet for tunneling<br>
- #: cases, and may or may not contain header options.<br>
- L3_IPV4_EXT_UNKNOWN = auto()<br>
- #: IP (Internet Protocol) version 6 packet type. This is used for outer packet for tunneling<br>
- #: cases, and contains extension headers.<br>
- L3_IPV6_EXT = auto()<br>
- #: IP (Internet Protocol) version 6 packet type. This is used for outer packet for tunneling<br>
- #: cases, and may or may not contain extension headers.<br>
- L3_IPV6_EXT_UNKNOWN = auto()<br>
- #: No L3 packet information.<br>
- L3_UNKNOWN = auto()<br>
-<br>
- # L4<br>
- #: TCP (Transmission Control Protocol) packet type. This is used for outer packet for tunneling<br>
- #: cases.<br>
- L4_TCP = auto()<br>
- #: UDP (User Datagram Protocol) packet type. This is used for outer packet for tunneling cases.<br>
- L4_UDP = auto()<br>
- #: Fragmented IP (Internet Protocol) packet type. This is used for outer packet for tunneling<br>
- #: cases and refers to those packets of any IP types which can be recognized as fragmented. A<br>
- #: fragmented packet cannot be recognized as any other L4 types (RTE_PTYPE_L4_TCP,<br>
- #: RTE_PTYPE_L4_UDP, RTE_PTYPE_L4_SCTP, RTE_PTYPE_L4_ICMP, RTE_PTYPE_L4_NONFRAG).<br>
- L4_FRAG = auto()<br>
- #: SCTP (Stream Control Transmission Protocol) packet type. This is used for outer packet for<br>
- #: tunneling cases.<br>
- L4_SCTP = auto()<br>
- #: ICMP (Internet Control Message Protocol) packet type. This is used for outer packet for<br>
- #: tunneling cases.<br>
- L4_ICMP = auto()<br>
- #: Non-fragmented IP (Internet Protocol) packet type. This is used for outer packet for<br>
- #: tunneling cases and refers to those packets of any IP types, that cannot be recognized as<br>
- #: any of the above L4 types (RTE_PTYPE_L4_TCP, RTE_PTYPE_L4_UDP, RTE_PTYPE_L4_FRAG,<br>
- #: RTE_PTYPE_L4_SCTP, RTE_PTYPE_L4_ICMP).<br>
- L4_NONFRAG = auto()<br>
- #: IGMP (Internet Group Management Protocol) packet type.<br>
- L4_IGMP = auto()<br>
- #: No L4 packet information.<br>
- L4_UNKNOWN = auto()<br>
-<br>
- # Tunnel<br>
- #: IP (Internet Protocol) in IP (Internet Protocol) tunneling packet type.<br>
- TUNNEL_IP = auto()<br>
- #: GRE (Generic Routing Encapsulation) tunneling packet type.<br>
- TUNNEL_GRE = auto()<br>
- #: VXLAN (Virtual eXtensible Local Area Network) tunneling packet type.<br>
- TUNNEL_VXLAN = auto()<br>
- #: NVGRE (Network Virtualization using Generic Routing Encapsulation) tunneling packet type.<br>
- TUNNEL_NVGRE = auto()<br>
- #: GENEVE (Generic Network Virtualization Encapsulation) tunneling packet type.<br>
- TUNNEL_GENEVE = auto()<br>
- #: Tunneling packet type of Teredo, VXLAN (Virtual eXtensible Local Area Network) or GRE<br>
- #: (Generic Routing Encapsulation) could be recognized as this packet type, if they can not be<br>
- #: recognized independently as of hardware capability.<br>
- TUNNEL_GRENAT = auto()<br>
- #: GTP-C (GPRS Tunnelling Protocol) control tunneling packet type.<br>
- TUNNEL_GTPC = auto()<br>
- #: GTP-U (GPRS Tunnelling Protocol) user data tunneling packet type.<br>
- TUNNEL_GTPU = auto()<br>
- #: ESP (IP Encapsulating Security Payload) tunneling packet type.<br>
- TUNNEL_ESP = auto()<br>
- #: L2TP (Layer 2 Tunneling Protocol) tunneling packet type.<br>
- TUNNEL_L2TP = auto()<br>
- #: VXLAN-GPE (VXLAN Generic Protocol Extension) tunneling packet type.<br>
- TUNNEL_VXLAN_GPE = auto()<br>
- #: MPLS-in-UDP tunneling packet type (RFC 7510).<br>
- TUNNEL_MPLS_IN_UDP = auto()<br>
- #: MPLS-in-GRE tunneling packet type (RFC 4023).<br>
- TUNNEL_MPLS_IN_GRE = auto()<br>
- #: No tunnel information found on the packet.<br>
- TUNNEL_UNKNOWN = auto()<br>
-<br>
- # Inner L2<br>
- #: Ethernet packet type. This is used for inner packet type only.<br>
- INNER_L2_ETHER = auto()<br>
- #: Ethernet packet type with VLAN (Virtual Local Area Network) tag.<br>
- INNER_L2_ETHER_VLAN = auto()<br>
- #: QinQ packet type.<br>
- INNER_L2_ETHER_QINQ = auto()<br>
- #: No inner L2 information found on the packet.<br>
- INNER_L2_UNKNOWN = auto()<br>
-<br>
- # Inner L3<br>
- #: IP (Internet Protocol) version 4 packet type. This is used for inner packet only, and does<br>
- #: not contain any header option.<br>
- INNER_L3_IPV4 = auto()<br>
- #: IP (Internet Protocol) version 4 packet type. This is used for inner packet only, and<br>
- #: contains header options.<br>
- INNER_L3_IPV4_EXT = auto()<br>
- #: IP (Internet Protocol) version 6 packet type. This is used for inner packet only, and does<br>
- #: not contain any extension header.<br>
- INNER_L3_IPV6 = auto()<br>
- #: IP (Internet Protocol) version 4 packet type. This is used for inner packet only, and may or<br>
- #: may not contain header options.<br>
- INNER_L3_IPV4_EXT_UNKNOWN = auto()<br>
- #: IP (Internet Protocol) version 6 packet type. This is used for inner packet only, and<br>
- #: contains extension headers.<br>
- INNER_L3_IPV6_EXT = auto()<br>
- #: IP (Internet Protocol) version 6 packet type. This is used for inner packet only, and may or<br>
- #: may not contain extension headers.<br>
- INNER_L3_IPV6_EXT_UNKNOWN = auto()<br>
- #: No inner L3 information found on the packet.<br>
- INNER_L3_UNKNOWN = auto()<br>
-<br>
- # Inner L4<br>
- #: TCP (Transmission Control Protocol) packet type. This is used for inner packet only.<br>
- INNER_L4_TCP = auto()<br>
- #: UDP (User Datagram Protocol) packet type. This is used for inner packet only.<br>
- INNER_L4_UDP = auto()<br>
- #: Fragmented IP (Internet Protocol) packet type. This is used for inner packet only, and may<br>
- #: or may not have a layer 4 packet.<br>
- INNER_L4_FRAG = auto()<br>
- #: SCTP (Stream Control Transmission Protocol) packet type. This is used for inner packet only.<br>
- INNER_L4_SCTP = auto()<br>
- #: ICMP (Internet Control Message Protocol) packet type. This is used for inner packet only.<br>
- INNER_L4_ICMP = auto()<br>
- #: Non-fragmented IP (Internet Protocol) packet type. It is used for inner packet only, and may<br>
- #: or may not have other unknown layer 4 packet types.<br>
- INNER_L4_NONFRAG = auto()<br>
- #: No inner L4 information found on the packet.<br>
- INNER_L4_UNKNOWN = auto()<br>
-<br>
- @classmethod<br>
- def from_list_string(cls, names: str) -> Self:<br>
- """Makes a flag from a whitespace-separated list of names.<br>
-<br>
- Args:<br>
- names: a whitespace-separated list containing the members of this flag.<br>
-<br>
- Returns:<br>
- An instance of this flag.<br>
- """<br>
- flag = cls(0)<br>
- for name in names.split():<br>
- flag |= cls.from_str(name)<br>
- return flag<br>
-<br>
- @classmethod<br>
- def from_str(cls, name: str) -> Self:<br>
- """Makes a flag matching the supplied name.<br>
-<br>
- Args:<br>
- name: a valid member of this flag in text<br>
- Returns:<br>
- An instance of this flag.<br>
- """<br>
- member_name = name.strip().replace("-", "_")<br>
- return cls[member_name]<br>
-<br>
- @classmethod<br>
- def make_parser(cls, hw: bool) -> ParserFn:<br>
- """Makes a parser function.<br>
-<br>
- Args:<br>
- hw: Whether to make a parser for hardware ptypes or software ptypes. If :data:`True`,<br>
- hardware ptypes will be collected, otherwise software ptypes will.<br>
-<br>
- Returns:<br>
- ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a<br>
- parser function that makes an instance of this flag from text.<br>
- """<br>
- return TextParser.wrap(<br>
- TextParser.find(f"{'hw' if hw else 'sw'} ptype: ([^-]+)"),<br>
- cls.from_list_string,<br>
- )<br>
-<br>
-<br>
-@dataclass<br>
-class TestPmdVerbosePacket(TextParser):<br>
- """Packet information provided by verbose output in Testpmd.<br>
-<br>
- This dataclass expects that packet information be prepended with the starting line of packet<br>
- bursts. Specifically, the line that reads "port X/queue Y: sent/received Z packets".<br>
- """<br>
-<br>
- #: ID of the port that handled the packet.<br>
- port_id: int = field(metadata=TextParser.find_int(r"port (\d+)/queue \d+"))<br>
- #: ID of the queue that handled the packet.<br>
- queue_id: int = field(metadata=TextParser.find_int(r"port \d+/queue (\d+)"))<br>
- #: Whether the packet was received or sent by the queue/port.<br>
- was_received: bool = field(metadata=TextParser.find(r"received \d+ packets"))<br>
- #:<br>
- src_mac: str = field(metadata=TextParser.find(f"src=({REGEX_FOR_MAC_ADDRESS})"))<br>
- #:<br>
- dst_mac: str = field(metadata=TextParser.find(f"dst=({REGEX_FOR_MAC_ADDRESS})"))<br>
- #: Memory pool the packet was handled on.<br>
- pool: str = field(metadata=TextParser.find(r"pool=(\S+)"))<br>
- #: Packet type in hex.<br>
- p_type: int = field(metadata=TextParser.find_int(r"type=(0x[a-fA-F\d]+)"))<br>
- #:<br>
- length: int = field(metadata=TextParser.find_int(r"length=(\d+)"))<br>
- #: Number of segments in the packet.<br>
- nb_segs: int = field(metadata=TextParser.find_int(r"nb_segs=(\d+)"))<br>
- #: Hardware packet type.<br>
- hw_ptype: RtePTypes = field(metadata=RtePTypes.make_parser(hw=True))<br>
- #: Software packet type.<br>
- sw_ptype: RtePTypes = field(metadata=RtePTypes.make_parser(hw=False))<br>
- #:<br>
- l2_len: int = field(metadata=TextParser.find_int(r"l2_len=(\d+)"))<br>
- #:<br>
- ol_flags: PacketOffloadFlag = field(metadata=PacketOffloadFlag.make_parser())<br>
- #: RSS hash of the packet in hex.<br>
- rss_hash: int | None = field(<br>
- default=None, metadata=TextParser.find_int(r"RSS hash=(0x[a-fA-F\d]+)")<br>
- )<br>
- #: RSS queue that handled the packet in hex.<br>
- rss_queue: int | None = field(<br>
- default=None, metadata=TextParser.find_int(r"RSS queue=(0x[a-fA-F\d]+)")<br>
- )<br>
- #:<br>
- l3_len: int | None = field(default=None, metadata=TextParser.find_int(r"l3_len=(\d+)"))<br>
- #:<br>
- l4_len: int | None = field(default=None, metadata=TextParser.find_int(r"l4_len=(\d+)"))<br>
- #:<br>
- l4_dport: int | None = field(<br>
- default=None,<br>
- metadata=TextParser.find_int(r"Destination (?:TCP|UDP) port=(\d+)"),<br>
- )<br>
-<br>
-<br>
-class RxOffloadCapability(Flag):<br>
- """Rx offload capabilities of a device.<br>
-<br>
- The flags are taken from ``lib/ethdev/rte_ethdev.h``.<br>
- They're prefixed with ``RTE_ETH_RX_OFFLOAD`` in ``lib/ethdev/rte_ethdev.h``<br>
- instead of ``RX_OFFLOAD``, which is what testpmd changes the prefix to.<br>
- The values are not contiguous, so the correspondence is preserved<br>
- by specifying concrete values interspersed between auto() values.<br>
-<br>
- The ``RX_OFFLOAD`` prefix has been preserved so that the same flag names can be used<br>
- in :class:`NicCapability`. The prefix is needed in :class:`NicCapability` since there's<br>
- no other qualifier which would sufficiently distinguish it from other capabilities.<br>
-<br>
- References:<br>
- DPDK lib: ``lib/ethdev/rte_ethdev.h``<br>
- testpmd display function: ``app/test-pmd/cmdline.c:print_rx_offloads()``<br>
- """<br>
-<br>
- #:<br>
- RX_OFFLOAD_VLAN_STRIP = auto()<br>
- #: Device supports L3 checksum offload.<br>
- RX_OFFLOAD_IPV4_CKSUM = auto()<br>
- #: Device supports L4 checksum offload.<br>
- RX_OFFLOAD_UDP_CKSUM = auto()<br>
- #: Device supports L4 checksum offload.<br>
- RX_OFFLOAD_TCP_CKSUM = auto()<br>
- #: Device supports Large Receive Offload.<br>
- RX_OFFLOAD_TCP_LRO = auto()<br>
- #: Device supports QinQ (double VLAN) offload.<br>
- RX_OFFLOAD_QINQ_STRIP = auto()<br>
- #: Device supports inner packet L3 checksum.<br>
- RX_OFFLOAD_OUTER_IPV4_CKSUM = auto()<br>
- #: Device supports MACsec.<br>
- RX_OFFLOAD_MACSEC_STRIP = auto()<br>
- #: Device supports filtering of a VLAN Tag identifier.<br>
- RX_OFFLOAD_VLAN_FILTER = 1 << 9<br>
- #: Device supports VLAN offload.<br>
- RX_OFFLOAD_VLAN_EXTEND = auto()<br>
- #: Device supports receiving segmented mbufs.<br>
- RX_OFFLOAD_SCATTER = 1 << 13<br>
- #: Device supports Timestamp.<br>
- RX_OFFLOAD_TIMESTAMP = auto()<br>
- #: Device supports crypto processing while packet is received in NIC.<br>
- RX_OFFLOAD_SECURITY = auto()<br>
- #: Device supports CRC stripping.<br>
- RX_OFFLOAD_KEEP_CRC = auto()<br>
- #: Device supports L4 checksum offload.<br>
- RX_OFFLOAD_SCTP_CKSUM = auto()<br>
- #: Device supports inner packet L4 checksum.<br>
- RX_OFFLOAD_OUTER_UDP_CKSUM = auto()<br>
- #: Device supports RSS hashing.<br>
- RX_OFFLOAD_RSS_HASH = auto()<br>
- #: Device supports buffer split.<br>
- RX_OFFLOAD_BUFFER_SPLIT = auto()<br>
- #: Device supports all checksum capabilities.<br>
- RX_OFFLOAD_CHECKSUM = RX_OFFLOAD_IPV4_CKSUM | RX_OFFLOAD_UDP_CKSUM | RX_OFFLOAD_TCP_CKSUM<br>
- #: Device supports all VLAN capabilities.<br>
- RX_OFFLOAD_VLAN = (<br>
- RX_OFFLOAD_VLAN_STRIP<br>
- | RX_OFFLOAD_VLAN_FILTER<br>
- | RX_OFFLOAD_VLAN_EXTEND<br>
- | RX_OFFLOAD_QINQ_STRIP<br>
- )<br>
-<br>
- @classmethod<br>
- def from_string(cls, line: str) -> Self:<br>
- """Make an instance from a string containing the flag names separated with a space.<br>
-<br>
- Args:<br>
- line: The line to parse.<br>
-<br>
- Returns:<br>
- A new instance containing all found flags.<br>
- """<br>
- flag = cls(0)<br>
- for flag_name in line.split():<br>
- flag |= cls[f"RX_OFFLOAD_{flag_name}"]<br>
- return flag<br>
-<br>
- @classmethod<br>
- def make_parser(cls, per_port: bool) -> ParserFn:<br>
- """Make a parser function.<br>
-<br>
- Args:<br>
- per_port: If :data:`True`, will return capabilities per port. If :data:`False`,<br>
- will return capabilities per queue.<br>
-<br>
- Returns:<br>
- ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a<br>
- parser function that makes an instance of this flag from text.<br>
- """<br>
- granularity = "Port" if per_port else "Queue"<br>
- return TextParser.wrap(<br>
- TextParser.find(rf"Per {granularity}\s+:(.*)$", re.MULTILINE),<br>
- cls.from_string,<br>
- )<br>
-<br>
-<br>
-@dataclass<br>
-class RxOffloadCapabilities(TextParser):<br>
- """The result of testpmd's ``show port <port_id> rx_offload capabilities`` command.<br>
-<br>
- References:<br>
- testpmd command function: ``app/test-pmd/cmdline.c:cmd_rx_offload_get_capa()``<br>
- testpmd display function: ``app/test-pmd/cmdline.c:cmd_rx_offload_get_capa_parsed()``<br>
- """<br>
-<br>
- #:<br>
- port_id: int = field(<br>
- metadata=TextParser.find_int(r"Rx Offloading Capabilities of port (\d+) :")<br>
- )<br>
- #: Per-queue Rx offload capabilities.<br>
- per_queue: RxOffloadCapability = field(metadata=RxOffloadCapability.make_parser(False))<br>
- #: Capabilities other than per-queue Rx offload capabilities.<br>
- per_port: RxOffloadCapability = field(metadata=RxOffloadCapability.make_parser(True))<br>
-<br>
-<br>
-@dataclass<br>
-class TestPmdPortFlowCtrl(TextParser):<br>
- """Class representing a port's flow control parameters.<br>
-<br>
- The parameters can also be parsed from the output of ``show port <port_id> flow_ctrl``.<br>
- """<br>
-<br>
- #: Enable Rx pause.<br>
- rx: bool = field(default=False, metadata=TextParser.find(r"Rx pause: on"))<br>
- #: Enable Tx pause.<br>
- tx: bool = field(default=False, metadata=TextParser.find(r"Tx pause: on"))<br>
- #: High threshold value to trigger XOFF.<br>
- high_water: int = field(<br>
- default=0, metadata=TextParser.find_int(r"High waterline: (0x[a-fA-F\d]+)")<br>
- )<br>
- #: Low threshold value to trigger XON.<br>
- low_water: int = field(<br>
- default=0, metadata=TextParser.find_int(r"Low waterline: (0x[a-fA-F\d]+)")<br>
- )<br>
- #: Pause quota in the Pause frame.<br>
- pause_time: int = field(default=0, metadata=TextParser.find_int(r"Pause time: (0x[a-fA-F\d]+)"))<br>
- #: Send XON frame.<br>
- send_xon: bool = field(default=False, metadata=TextParser.find(r"Tx pause: on"))<br>
- #: Enable receiving MAC control frames.<br>
- mac_ctrl_frame_fwd: bool = field(default=False, metadata=TextParser.find(r"Tx pause: on"))<br>
- #: Change the auto-negotiation parameter.<br>
- autoneg: bool = field(default=False, metadata=TextParser.find(r"Autoneg: on"))<br>
-<br>
- def __str__(self) -> str:<br>
- """Returns the string representation of this instance."""<br>
- ret = (<br>
- f"rx {'on' if self.rx else 'off'} "<br>
- f"tx {'on' if self.tx else 'off'} "<br>
- f"{self.high_water} "<br>
- f"{self.low_water} "<br>
- f"{self.pause_time} "<br>
- f"{1 if self.send_xon else 0} "<br>
- f"mac_ctrl_frame_fwd {'on' if self.mac_ctrl_frame_fwd else 'off'} "<br>
- f"autoneg {'on' if self.autoneg else 'off'}"<br>
- )<br>
- return ret<br>
-<br>
-<br>
-def requires_stopped_ports(func: TestPmdShellMethod) -> TestPmdShellMethod:<br>
- """Decorator for :class:`TestPmdShell` command methods that require stopped ports.<br>
-<br>
- If the decorated method is called while the ports are started, then these are stopped before<br>
- continuing.<br>
-<br>
- Args:<br>
- func: The :class:`TestPmdShell` method to decorate.<br>
- """<br>
-<br>
- @functools.wraps(func)<br>
- def _wrapper(self: "TestPmdShell", *args: P.args, **kwargs: P.kwargs):<br>
- if self.ports_started:<br>
- self._logger.debug("Ports need to be stopped to continue.")<br>
- self.stop_all_ports()<br>
-<br>
- return func(self, *args, **kwargs)<br>
-<br>
- return _wrapper<br>
-<br>
-<br>
-def requires_started_ports(func: TestPmdShellMethod) -> TestPmdShellMethod:<br>
- """Decorator for :class:`TestPmdShell` command methods that require started ports.<br>
-<br>
- If the decorated method is called while the ports are stopped, then these are started before<br>
- continuing.<br>
-<br>
- Args:<br>
- func: The :class:`TestPmdShell` method to decorate.<br>
- """<br>
-<br>
- @functools.wraps(func)<br>
- def _wrapper(self: "TestPmdShell", *args: P.args, **kwargs: P.kwargs):<br>
- if not self.ports_started:<br>
- self._logger.debug("Ports need to be started to continue.")<br>
- self.start_all_ports()<br>
-<br>
- return func(self, *args, **kwargs)<br>
-<br>
- return _wrapper<br>
-<br>
-<br>
-def add_remove_mtu(mtu: int = 1500) -> Callable[[TestPmdShellMethod], TestPmdShellMethod]:<br>
- """Configure MTU to `mtu` on all ports, run the decorated function, then revert.<br>
-<br>
- Args:<br>
- mtu: The MTU to configure all ports on.<br>
-<br>
- Returns:<br>
- The method decorated with setting and reverting MTU.<br>
- """<br>
-<br>
- def decorator(func: TestPmdShellMethod) -> TestPmdShellMethod:<br>
- @functools.wraps(func)<br>
- def wrapper(self: "TestPmdShell", *args: P.args, **kwargs: P.kwargs):<br>
- original_mtu = self.ports[0].mtu<br>
- self.set_port_mtu_all(mtu=mtu, verify=False)<br>
- retval = func(self, *args, **kwargs)<br>
- self.set_port_mtu_all(original_mtu if original_mtu else 1500, verify=False)<br>
- return retval<br>
-<br>
- return wrapper<br>
-<br>
- return decorator<br>
-<br>
-<br>
-class TestPmdShell(DPDKShell):<br>
- """Testpmd interactive shell.<br>
-<br>
- The testpmd shell users should never use<br>
- the :meth:`~.interactive_shell.InteractiveShell.send_command` method directly, but rather<br>
- call specialized methods. If there isn't one that satisfies a need, it should be added.<br>
-<br>
- Attributes:<br>
- ports_started: Indicates whether the ports are started.<br>
- """<br>
-<br>
- _app_params: TestPmdParams<br>
- _ports: list[TestPmdPort] | None<br>
-<br>
- #: The testpmd's prompt.<br>
- _default_prompt: ClassVar[str] = "testpmd>"<br>
-<br>
- #: This forces the prompt to appear after sending a command.<br>
- _command_extra_chars: ClassVar[str] = "\n"<br>
-<br>
- ports_started: bool<br>
-<br>
- def __init__(<br>
- self,<br>
- name: str | None = None,<br>
- privileged: bool = True,<br>
- **app_params: Unpack[TestPmdParamsDict],<br>
- ) -> None:<br>
- """Overrides :meth:`~.dpdk_shell.DPDKShell.__init__`. Changes app_params to kwargs."""<br>
- if "port_topology" not in app_params and get_ctx().topology.type is TopologyType.one_link:<br>
- app_params["port_topology"] = PortTopology.loop<br>
- super().__init__(name, privileged, app_params=TestPmdParams(**app_params))<br>
- self.ports_started = not self._app_params.disable_device_start<br>
- self._ports = None<br>
-<br>
- @property<br>
- def path(self) -> PurePath:<br>
- """The path to the testpmd executable."""<br>
- return PurePath("app/dpdk-testpmd")<br>
-<br>
- @property<br>
- def ports(self) -> list[TestPmdPort]:<br>
- """The ports of the instance.<br>
-<br>
- This caches the ports returned by :meth:`show_port_info_all`.<br>
- To force an update of port information, execute :meth:`show_port_info_all` or<br>
- :meth:`show_port_info`.<br>
-<br>
- Returns: The list of known testpmd ports.<br>
- """<br>
- if self._ports is None:<br>
- return self.show_port_info_all()<br>
- return self._ports<br>
-<br>
- @requires_started_ports<br>
- def start(self, verify: bool = True) -> None:<br>
- """Start packet forwarding with the current configuration.<br>
-<br>
- Args:<br>
- verify: If :data:`True` , a second start command will be sent in an attempt to verify<br>
- packet forwarding started as expected.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If `verify` is :data:`True` and forwarding fails to<br>
- start or ports fail to come up.<br>
- """<br>
- self.send_command("start")<br>
- if verify:<br>
- # If forwarding was already started, sending "start" again should tell us<br>
- start_cmd_output = self.send_command("start")<br>
- if "Packet forwarding already started" not in start_cmd_output:<br>
- self._logger.debug(f"Failed to start packet forwarding: \n{start_cmd_output}")<br>
- raise InteractiveCommandExecutionError("Testpmd failed to start packet forwarding.")<br>
-<br>
- def stop(self, verify: bool = True) -> str:<br>
- """Stop packet forwarding.<br>
-<br>
- Args:<br>
- verify: If :data:`True` , the output of the stop command is scanned to verify that<br>
- forwarding was stopped successfully or not started. If neither is found, it is<br>
- considered an error.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If `verify` is :data:`True` and the command to stop<br>
- forwarding results in an error.<br>
-<br>
- Returns:<br>
- Output gathered from the stop command and all other preceding logs in the buffer. This<br>
- output is most often used to view forwarding statistics that are displayed when this<br>
- command is sent as well as any verbose packet information that hasn't been consumed<br>
- prior to calling this method.<br>
- """<br>
- stop_cmd_output = self.send_command("stop")<br>
- if verify:<br>
- if (<br>
- "Done." not in stop_cmd_output<br>
- and "Packet forwarding not started" not in stop_cmd_output<br>
- ):<br>
- self._logger.debug(f"Failed to stop packet forwarding: \n{stop_cmd_output}")<br>
- raise InteractiveCommandExecutionError("Testpmd failed to stop packet forwarding.")<br>
- return stop_cmd_output<br>
-<br>
- def get_devices(self) -> list[TestPmdDevice]:<br>
- """Get a list of device names that are known to testpmd.<br>
-<br>
- Uses the device info listed in testpmd and then parses the output.<br>
-<br>
- Returns:<br>
- A list of devices.<br>
- """<br>
- dev_info: str = self.send_command("show device info all")<br>
- dev_list: list[TestPmdDevice] = []<br>
- for line in dev_info.split("\n"):<br>
- if "device name:" in line.lower():<br>
- dev_list.append(TestPmdDevice(line))<br>
- return dev_list<br>
-<br>
- def wait_link_status_up(self, port_id: int, timeout=SETTINGS.timeout) -> bool:<br>
- """Wait until the link status on the given port is "up".<br>
-<br>
- Arguments:<br>
- port_id: Port to check the link status on.<br>
- timeout: Time to wait for the link to come up. The default value for this<br>
- argument may be modified using the :option:`--timeout` command-line argument<br>
- or the :envvar:`DTS_TIMEOUT` environment variable.<br>
-<br>
- Returns:<br>
- Whether the link came up in time or not.<br>
- """<br>
- time_to_stop = time.time() + timeout<br>
- port_info: str = ""<br>
- while time.time() < time_to_stop:<br>
- port_info = self.send_command(f"show port info {port_id}")<br>
- if "Link status: up" in port_info:<br>
- break<br>
- time.sleep(0.5)<br>
- else:<br>
- self._logger.error(f"The link for port {port_id} did not come up in the given timeout.")<br>
- return "Link status: up" in port_info<br>
-<br>
- def set_forward_mode(self, mode: SimpleForwardingModes, verify: bool = True):<br>
- """Set packet forwarding mode.<br>
-<br>
- Args:<br>
- mode: The forwarding mode to use.<br>
- verify: If :data:`True` the output of the command will be scanned in an attempt to<br>
- verify that the forwarding mode was set to `mode` properly.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If `verify` is :data:`True` and the forwarding mode<br>
- fails to update.<br>
- """<br>
- set_fwd_output = self.send_command(f"set fwd {mode.value}")<br>
- if verify:<br>
- if f"Set {mode.value} packet forwarding mode" not in set_fwd_output:<br>
- self._logger.debug(f"Failed to set fwd mode to {mode.value}:\n{set_fwd_output}")<br>
- raise InteractiveCommandExecutionError(<br>
- f"Test pmd failed to set fwd mode to {mode.value}"<br>
- )<br>
-<br>
- def stop_all_ports(self, verify: bool = True) -> None:<br>
- """Stops all the ports.<br>
-<br>
- Args:<br>
- verify: If :data:`True`, the output of the command will be checked for a successful<br>
- execution.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If `verify` is :data:`True` and the ports were not<br>
- stopped successfully.<br>
- """<br>
- self._logger.debug("Stopping all the ports...")<br>
- output = self.send_command("port stop all")<br>
- if verify and not output.strip().endswith("Done"):<br>
- raise InteractiveCommandExecutionError("Ports were not stopped successfully.")<br>
-<br>
- self.ports_started = False<br>
-<br>
- def start_all_ports(self, verify: bool = True) -> None:<br>
- """Starts all the ports.<br>
-<br>
- Args:<br>
- verify: If :data:`True`, the output of the command will be checked for a successful<br>
- execution.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If `verify` is :data:`True` and the ports were not<br>
- started successfully.<br>
- """<br>
- self._logger.debug("Starting all the ports...")<br>
- output = self.send_command("port start all")<br>
- if verify and not output.strip().endswith("Done"):<br>
- raise InteractiveCommandExecutionError("Ports were not started successfully.")<br>
-<br>
- self.ports_started = True<br>
-<br>
- @requires_stopped_ports<br>
- def set_ports_queues(self, number_of: int) -> None:<br>
- """Sets the number of queues per port.<br>
-<br>
- Args:<br>
- number_of: The number of RX/TX queues to create per port.<br>
-<br>
- Raises:<br>
- InternalError: If `number_of` is invalid.<br>
- """<br>
- if number_of < 1:<br>
- raise InternalError("The number of queues must be positive and non-zero.")<br>
-<br>
- self.send_command(f"port config all rxq {number_of}")<br>
- self.send_command(f"port config all txq {number_of}")<br>
-<br>
- @requires_stopped_ports<br>
- def close_all_ports(self, verify: bool = True) -> None:<br>
- """Close all ports.<br>
-<br>
- Args:<br>
- verify: If :data:`True` the output of the close command will be scanned in an attempt<br>
- to verify that all ports were stopped successfully. Defaults to :data:`True`.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If `verify` is :data:`True` and at lease one port<br>
- failed to close.<br>
- """<br>
- port_close_output = self.send_command("port close all")<br>
- if verify:<br>
- num_ports = len(self.ports)<br>
- if not all(f"Port {p_id} is closed" in port_close_output for p_id in range(num_ports)):<br>
- raise InteractiveCommandExecutionError("Ports were not closed successfully.")<br>
-<br>
- def show_port_info_all(self) -> list[TestPmdPort]:<br>
- """Returns the information of all the ports.<br>
-<br>
- Returns:<br>
- list[TestPmdPort]: A list containing all the ports information as `TestPmdPort`.<br>
- """<br>
- output = self.send_command("show port info all")<br>
-<br>
- # Sample output of the "all" command looks like:<br>
- #<br>
- # <start><br>
- #<br>
- # ********************* Infos for port 0 *********************<br>
- # Key: value<br>
- #<br>
- # ********************* Infos for port 1 *********************<br>
- # Key: value<br>
- # <end><br>
- #<br>
- # Takes advantage of the double new line in between ports as end delimiter. But we need to<br>
- # artificially add a new line at the end to pick up the last port. Because commands are<br>
- # executed on a pseudo-terminal created by paramiko on the remote node, lines end with CRLF.<br>
- # Therefore we also need to take the carriage return into account.<br>
- iter = re.finditer(r"\*{21}.*?[\r\n]{4}", output + "\r\n", re.S)<br>
- self._ports = [TestPmdPort.parse(block.group(0)) for block in iter]<br>
- return self._ports<br>
-<br>
- def show_port_info(self, port_id: int) -> TestPmdPort:<br>
- """Returns the given port information.<br>
-<br>
- Args:<br>
- port_id: The port ID to gather information for.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If `port_id` is invalid.<br>
-<br>
- Returns:<br>
- TestPmdPort: An instance of `TestPmdPort` containing the given port's information.<br>
- """<br>
- output = self.send_command(f"show port info {port_id}", skip_first_line=True)<br>
- if output.startswith("Invalid port"):<br>
- raise InteractiveCommandExecutionError("invalid port given")<br>
-<br>
- port = TestPmdPort.parse(output)<br>
- self._update_port(port)<br>
- return port<br>
-<br>
- def _update_port(self, port: TestPmdPort) -> None:<br>
- if self._ports:<br>
- self._ports = [<br>
- existing_port if port.id != existing_port.id else port<br>
- for existing_port in self._ports<br>
- ]<br>
-<br>
- def set_mac_addr(self, port_id: int, mac_address: str, add: bool, verify: bool = True) -> None:<br>
- """Add or remove a mac address on a given port's Allowlist.<br>
-<br>
- Args:<br>
- port_id: The port ID the mac address is set on.<br>
- mac_address: The mac address to be added to or removed from the specified port.<br>
- add: If :data:`True`, add the specified mac address. If :data:`False`, remove specified<br>
- mac address.<br>
- verify: If :data:'True', assert that the 'mac_addr' operation was successful. If<br>
- :data:'False', run the command and skip this assertion.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If the set mac address operation fails.<br>
- """<br>
- mac_cmd = "add" if add else "remove"<br>
- output = self.send_command(f"mac_addr {mac_cmd} {port_id} {mac_address}")<br>
- if "Bad arguments" in output:<br>
- self._logger.debug("Invalid argument provided to mac_addr")<br>
- raise InteractiveCommandExecutionError("Invalid argument provided")<br>
-<br>
- if verify:<br>
- if "mac_addr_cmd error:" in output:<br>
- self._logger.debug(f"Failed to {mac_cmd} {mac_address} on port {port_id}")<br>
- raise InteractiveCommandExecutionError(<br>
- f"Failed to {mac_cmd} {mac_address} on port {port_id} \n{output}"<br>
- )<br>
-<br>
- def set_multicast_mac_addr(<br>
- self, port_id: int, multi_addr: str, add: bool, verify: bool = True<br>
- ) -> None:<br>
- """Add or remove multicast mac address to a specified port's allow list.<br>
-<br>
- Args:<br>
- port_id: The port ID the multicast address is set on.<br>
- multi_addr: The multicast address to be added or removed from the filter.<br>
- add: If :data:'True', add the specified multicast address to the port filter.<br>
- If :data:'False', remove the specified multicast address from the port filter.<br>
- verify: If :data:'True', assert that the 'mcast_addr' operations was successful.<br>
- If :data:'False', execute the 'mcast_addr' operation and skip the assertion.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If either the 'add' or 'remove' operations fails.<br>
- """<br>
- mcast_cmd = "add" if add else "remove"<br>
- output = self.send_command(f"mcast_addr {mcast_cmd} {port_id} {multi_addr}")<br>
- if "Bad arguments" in output:<br>
- self._logger.debug("Invalid arguments provided to mcast_addr")<br>
- raise InteractiveCommandExecutionError("Invalid argument provided")<br>
-<br>
- if verify:<br>
- if (<br>
- "Invalid multicast_addr" in output<br>
- or f"multicast address {'already' if add else 'not'} filtered by port" in output<br>
- ):<br>
- self._logger.debug(f"Failed to {mcast_cmd} {multi_addr} on port {port_id}")<br>
- raise InteractiveCommandExecutionError(<br>
- f"Failed to {mcast_cmd} {multi_addr} on port {port_id} \n{output}"<br>
- )<br>
-<br>
- def show_port_stats_all(self) -> Tuple[list[TestPmdPortStats], str]:<br>
- """Returns the statistics of all the ports.<br>
-<br>
- Returns:<br>
- Tuple[str, list[TestPmdPortStats]]: A tuple where the first element is the stats of all<br>
- ports as `TestPmdPortStats` and second is the raw testpmd output that was collected<br>
- from the sent command.<br>
- """<br>
- output = self.send_command("show port stats all")<br>
-<br>
- # Sample output of the "all" command looks like:<br>
- #<br>
- # ########### NIC statistics for port 0 ###########<br>
- # values...<br>
- # #################################################<br>
- #<br>
- # ########### NIC statistics for port 1 ###########<br>
- # values...<br>
- # #################################################<br>
- #<br>
- iter = re.finditer(r"(^ #*.+#*$[^#]+)^ #*\r$", output, re.MULTILINE)<br>
- return ([TestPmdPortStats.parse(block.group(1)) for block in iter], output)<br>
-<br>
- def show_port_stats(self, port_id: int) -> TestPmdPortStats:<br>
- """Returns the given port statistics.<br>
-<br>
- Args:<br>
- port_id: The port ID to gather information for.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If `port_id` is invalid.<br>
-<br>
- Returns:<br>
- TestPmdPortStats: An instance of `TestPmdPortStats` containing the given port's stats.<br>
- """<br>
- output = self.send_command(f"show port stats {port_id}", skip_first_line=True)<br>
- if output.startswith("Invalid port"):<br>
- raise InteractiveCommandExecutionError("invalid port given")<br>
-<br>
- return TestPmdPortStats.parse(output)<br>
-<br>
- def set_multicast_all(self, on: bool, verify: bool = True) -> None:<br>
- """Turns multicast mode on/off for the specified port.<br>
-<br>
- Args:<br>
- on: If :data:`True`, turns multicast mode on, otherwise turns off.<br>
- verify: If :data:`True` an additional command will be sent to verify<br>
- that multicast mode is properly set. Defaults to :data:`True`.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If `verify` is :data:`True` and multicast<br>
- mode is not properly set.<br>
- """<br>
- multicast_cmd_output = self.send_command(f"set allmulti all {'on' if on else 'off'}")<br>
- if verify:<br>
- port_stats = self.show_port_info_all()<br>
- if on ^ all(stats.is_allmulticast_mode_enabled for stats in port_stats):<br>
- self._logger.debug(<br>
- f"Failed to set multicast mode on all ports.: \n{multicast_cmd_output}"<br>
- )<br>
- raise InteractiveCommandExecutionError(<br>
- "Testpmd failed to set multicast mode on all ports."<br>
- )<br>
-<br>
- @requires_stopped_ports<br>
- def csum_set_hw(<br>
- self, layers: ChecksumOffloadOptions, port_id: int, verify: bool = True<br>
- ) -> None:<br>
- """Enables hardware checksum offloading on the specified layer.<br>
-<br>
- Args:<br>
- layers: The layer/layers that checksum offloading should be enabled on.<br>
- port_id: The port number to enable checksum offloading on, should be within 0-32.<br>
- verify: If :data:`True` the output of the command will be scanned in an attempt to<br>
- verify that checksum offloading was enabled on the port.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If checksum offload is not enabled successfully.<br>
- """<br>
- for name, offload in ChecksumOffloadOptions.__members__.items():<br>
- if offload in layers:<br>
- name = name.replace("_", "-")<br>
- csum_output = self.send_command(f"csum set {name} hw {port_id}")<br>
- if verify:<br>
- if (<br>
- "Bad arguments" in csum_output<br>
- or f"Please stop port {port_id} first" in csum_output<br>
- or f"checksum offload is not supported by port {port_id}" in csum_output<br>
- ):<br>
- self._logger.debug(f"Csum set hw error:\n{csum_output}")<br>
- raise InteractiveCommandExecutionError(<br>
- f"Failed to set csum hw {name} mode on port {port_id}"<br>
- )<br>
- success = False<br>
- if f"{name} checksum offload is hw" in csum_output.lower():<br>
- success = True<br>
- if not success and verify:<br>
- self._logger.debug(<br>
- f"Failed to set csum hw mode on port {port_id}:\n{csum_output}"<br>
- )<br>
- raise InteractiveCommandExecutionError(<br>
- f"""Failed to set csum hw mode on port<br>
- {port_id}:\n{csum_output}"""<br>
- )<br>
-<br>
- def flow_create(self, flow_rule: FlowRule, port_id: int) -> int:<br>
- """Creates a flow rule in the testpmd session.<br>
-<br>
- This command is implicitly verified as needed to return the created flow rule id.<br>
-<br>
- Args:<br>
- flow_rule: :class:`FlowRule` object used for creating testpmd flow rule.<br>
- port_id: Integer representing the port to use.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If flow rule is invalid.<br>
-<br>
- Returns:<br>
- Id of created flow rule.<br>
- """<br>
- flow_output = self.send_command(f"flow create {port_id} {flow_rule}")<br>
- match = re.search(r"#(\d+)", flow_output)<br>
- if match is not None:<br>
- match_str = match.group(1)<br>
- flow_id = int(match_str)<br>
- return flow_id<br>
- else:<br>
- self._logger.debug(f"Failed to create flow rule:\n{flow_output}")<br>
- raise InteractiveCommandExecutionError(f"Failed to create flow rule:\n{flow_output}")<br>
-<br>
- def flow_validate(self, flow_rule: FlowRule, port_id: int) -> bool:<br>
- """Validates a flow rule in the testpmd session.<br>
-<br>
- Args:<br>
- flow_rule: :class:`FlowRule` object used for validating testpmd flow rule.<br>
- port_id: Integer representing the port to use.<br>
-<br>
- Returns:<br>
- Boolean representing whether rule is valid or not.<br>
- """<br>
- flow_output = self.send_command(f"flow validate {port_id} {flow_rule}")<br>
- if "Flow rule validated" in flow_output:<br>
- return True<br>
- return False<br>
-<br>
- def flow_delete(self, flow_id: int, port_id: int, verify: bool = True) -> None:<br>
- """Deletes the specified flow rule from the testpmd session.<br>
-<br>
- Args:<br>
- flow_id: ID of the flow to remove.<br>
- port_id: Integer representing the port to use.<br>
- verify: If :data:`True`, the output of the command is scanned<br>
- to ensure the flow rule was deleted successfully.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If flow rule is invalid.<br>
- """<br>
- flow_output = self.send_command(f"flow destroy {port_id} rule {flow_id}")<br>
- if verify:<br>
- if "destroyed" not in flow_output:<br>
- self._logger.debug(f"Failed to delete flow rule:\n{flow_output}")<br>
- raise InteractiveCommandExecutionError(<br>
- f"Failed to delete flow rule:\n{flow_output}"<br>
- )<br>
-<br>
- @requires_started_ports<br>
- @requires_stopped_ports<br>
- def set_port_mtu(self, port_id: int, mtu: int, verify: bool = True) -> None:<br>
- """Change the MTU of a port using testpmd.<br>
-<br>
- Some PMDs require that the port be stopped before changing the MTU, and it does no harm to<br>
- stop the port before configuring in cases where it isn't required, so ports are stopped<br>
- prior to changing their MTU. On the other hand, some PMDs require that the port had already<br>
- been started once since testpmd startup. Therefore, ports are also started before stopping<br>
- them to ensure this has happened.<br>
-<br>
- Args:<br>
- port_id: ID of the port to adjust the MTU on.<br>
- mtu: Desired value for the MTU to be set to.<br>
- verify: If `verify` is :data:`True` then the output will be scanned in an attempt to<br>
- verify that the mtu was properly set on the port. Defaults to :data:`True`.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If `verify` is :data:`True` and the MTU was not<br>
- properly updated on the port matching `port_id`.<br>
- """<br>
- set_mtu_output = self.send_command(f"port config mtu {port_id} {mtu}")<br>
- if verify and (f"MTU: {mtu}" not in self.send_command(f"show port info {port_id}")):<br>
- self._logger.debug(<br>
- f"Failed to set mtu to {mtu} on port {port_id}. Output was:\n{set_mtu_output}"<br>
- )<br>
- raise InteractiveCommandExecutionError(<br>
- f"Test pmd failed to update mtu of port {port_id} to {mtu}"<br>
- )<br>
-<br>
- def set_port_mtu_all(self, mtu: int, verify: bool = True) -> None:<br>
- """Change the MTU of all ports using testpmd.<br>
-<br>
- Runs :meth:`set_port_mtu` for every port that testpmd is aware of.<br>
-<br>
- Args:<br>
- mtu: Desired value for the MTU to be set to.<br>
- verify: Whether to verify that setting the MTU on each port was successful or not.<br>
- Defaults to :data:`True`.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If `verify` is :data:`True` and the MTU was not<br>
- properly updated on at least one port.<br>
- """<br>
- for port in self.ports:<br>
- self.set_port_mtu(port.id, mtu, verify)<br>
-<br>
- @staticmethod<br>
- def extract_verbose_output(output: str) -> list[TestPmdVerbosePacket]:<br>
- """Extract the verbose information present in given testpmd output.<br>
-<br>
- This method extracts sections of verbose output that begin with the line<br>
- "port X/queue Y: sent/received Z packets" and end with the ol_flags of a packet.<br>
-<br>
- Args:<br>
- output: Testpmd output that contains verbose information<br>
-<br>
- Returns:<br>
- List of parsed packet information gathered from verbose information in `output`.<br>
- """<br>
- out: list[TestPmdVerbosePacket] = []<br>
- prev_header: str = ""<br>
- iter = re.finditer(<br>
- r"(?P<HEADER>(?:port \d+/queue \d+: (?:received|sent) \d+ packets)?)\s*"<br>
- r"(?P<PACKET>src=[\w\s=:-]+?ol_flags: [\w ]+)",<br>
- output,<br>
- )<br>
- for match in iter:<br>
- if match.group("HEADER"):<br>
- prev_header = match.group("HEADER")<br>
- out.append(TestPmdVerbosePacket.parse(f"{prev_header}\n{match.group('PACKET')}"))<br>
- return out<br>
-<br>
- @requires_stopped_ports<br>
- def set_vlan_filter(self, port: int, enable: bool, verify: bool = True) -> None:<br>
- """Set vlan filter on.<br>
-<br>
- Args:<br>
- port: The port number to enable VLAN filter on.<br>
- enable: Enable the filter on `port` if :data:`True`, otherwise disable it.<br>
- verify: If :data:`True`, the output of the command and show port info<br>
- is scanned to verify that vlan filtering was set successfully.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If `verify` is :data:`True` and the filter<br>
- fails to update.<br>
- """<br>
- filter_cmd_output = self.send_command(f"vlan set filter {'on' if enable else 'off'} {port}")<br>
- if verify:<br>
- vlan_settings = self.show_port_info(port_id=port).vlan_offload<br>
- if enable ^ (vlan_settings is not None and VLANOffloadFlag.FILTER in vlan_settings):<br>
- self._logger.debug(<br>
- f"""Failed to {"enable" if enable else "disable"}<br>
- filter on port {port}: \n{filter_cmd_output}"""<br>
- )<br>
- raise InteractiveCommandExecutionError(<br>
- f"""Failed to {"enable" if enable else "disable"}<br>
- filter on port {port}"""<br>
- )<br>
-<br>
- def set_mac_address(self, port: int, mac_address: str, verify: bool = True) -> None:<br>
- """Set port's MAC address.<br>
-<br>
- Args:<br>
- port: The number of the requested port.<br>
- mac_address: The MAC address to set.<br>
- verify: If :data:`True`, the output of the command is scanned to verify that<br>
- the mac address is set in the specified port.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If `verify` is :data:`True` and the command<br>
- fails to execute.<br>
- """<br>
- output = self.send_command(f"mac_addr set {port} {mac_address}", skip_first_line=True)<br>
- if verify:<br>
- if output.strip():<br>
- self._logger.debug(<br>
- f"Testpmd failed to set MAC address {mac_address} on port {port}."<br>
- )<br>
- raise InteractiveCommandExecutionError(<br>
- f"Testpmd failed to set MAC address {mac_address} on port {port}."<br>
- )<br>
-<br>
- def set_flow_control(<br>
- self, port: int, flow_ctrl: TestPmdPortFlowCtrl, verify: bool = True<br>
- ) -> None:<br>
- """Set the given `port`'s flow control.<br>
-<br>
- Args:<br>
- port: The number of the requested port.<br>
- flow_ctrl: The requested flow control parameters.<br>
- verify: If :data:`True`, the output of the command is scanned to verify that<br>
- the flow control in the specified port is set.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If `verify` is :data:`True` and the command<br>
- fails to execute.<br>
- """<br>
- output = self.send_command(f"set flow_ctrl {flow_ctrl} {port}", skip_first_line=True)<br>
- if verify:<br>
- if output.strip():<br>
- self._logger.debug(f"Testpmd failed to set the {flow_ctrl} in port {port}.")<br>
- raise InteractiveCommandExecutionError(<br>
- f"Testpmd failed to set the {flow_ctrl} in port {port}."<br>
- )<br>
-<br>
- def show_port_flow_info(self, port: int) -> TestPmdPortFlowCtrl | None:<br>
- """Show port info flow.<br>
-<br>
- Args:<br>
- port: The number of the requested port.<br>
-<br>
- Returns:<br>
- The current port flow control parameters if supported, otherwise :data:`None`.<br>
- """<br>
- output = self.send_command(f"show port {port} flow_ctrl")<br>
- if "Flow control infos" in output:<br>
- return TestPmdPortFlowCtrl.parse(output)<br>
- return None<br>
-<br>
- @requires_stopped_ports<br>
- def rx_vlan(self, vlan: int, port: int, add: bool, verify: bool = True) -> None:<br>
- """Add specified vlan tag to the filter list on a port. Requires vlan filter to be on.<br>
-<br>
- Args:<br>
- vlan: The vlan tag to add, should be within 1-1005.<br>
- port: The port number to add the tag on.<br>
- add: Adds the tag if :data:`True`, otherwise removes the tag.<br>
- verify: If :data:`True`, the output of the command is scanned to verify that<br>
- the vlan tag was added to the filter list on the specified port.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If `verify` is :data:`True` and the tag<br>
- is not added.<br>
- """<br>
- rx_cmd_output = self.send_command(f"rx_vlan {'add' if add else 'rm'} {vlan} {port}")<br>
- if verify:<br>
- if (<br>
- "VLAN-filtering disabled" in rx_cmd_output<br>
- or "Invalid vlan_id" in rx_cmd_output<br>
- or "Bad arguments" in rx_cmd_output<br>
- ):<br>
- self._logger.debug(<br>
- f"""Failed to {"add" if add else "remove"} tag {vlan}<br>
- port {port}: \n{rx_cmd_output}"""<br>
- )<br>
- raise InteractiveCommandExecutionError(<br>
- f"Testpmd failed to {'add' if add else 'remove'} tag {vlan} on port {port}."<br>
- )<br>
-<br>
- @requires_stopped_ports<br>
- def set_vlan_strip(self, port: int, enable: bool, verify: bool = True) -> None:<br>
- """Enable or disable vlan stripping on the specified port.<br>
-<br>
- Args:<br>
- port: The port number to use.<br>
- enable: If :data:`True`, will turn vlan stripping on, otherwise will turn off.<br>
- verify: If :data:`True`, the output of the command and show port info<br>
- is scanned to verify that vlan stripping was enabled on the specified port.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If `verify` is :data:`True` and stripping<br>
- fails to update.<br>
- """<br>
- strip_cmd_output = self.send_command(f"vlan set strip {'on' if enable else 'off'} {port}")<br>
- if verify:<br>
- vlan_settings = self.show_port_info(port_id=port).vlan_offload<br>
- if enable ^ (vlan_settings is not None and VLANOffloadFlag.STRIP in vlan_settings):<br>
- self._logger.debug(<br>
- f"""Failed to set strip {"on" if enable else "off"}<br>
- port {port}: \n{strip_cmd_output}"""<br>
- )<br>
- raise InteractiveCommandExecutionError(<br>
- f"Testpmd failed to set strip {'on' if enable else 'off'} port {port}."<br>
- )<br>
-<br>
- @requires_stopped_ports<br>
- def tx_vlan_set(<br>
- self, port: int, enable: bool, vlan: int | None = None, verify: bool = True<br>
- ) -> None:<br>
- """Set hardware insertion of vlan tags in packets sent on a port.<br>
-<br>
- Args:<br>
- port: The port number to use.<br>
- enable: Sets vlan tag insertion if :data:`True`, and resets if :data:`False`.<br>
- vlan: The vlan tag to insert if enable is :data:`True`.<br>
- verify: If :data:`True`, the output of the command is scanned to verify that<br>
- vlan insertion was enabled on the specified port.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If `verify` is :data:`True` and the insertion<br>
- tag is not set.<br>
- """<br>
- if enable:<br>
- tx_vlan_cmd_output = self.send_command(f"tx_vlan set {port} {vlan}")<br>
- if verify:<br>
- if (<br>
- "Please stop port" in tx_vlan_cmd_output<br>
- or "Invalid vlan_id" in tx_vlan_cmd_output<br>
- or "Invalid port" in tx_vlan_cmd_output<br>
- ):<br>
- self._logger.debug(<br>
- f"Failed to set vlan tag {vlan} on port {port}:\n{tx_vlan_cmd_output}"<br>
- )<br>
- raise InteractiveCommandExecutionError(<br>
- f"Testpmd failed to set vlan insertion tag {vlan} on port {port}."<br>
- )<br>
- else:<br>
- tx_vlan_cmd_output = self.send_command(f"tx_vlan reset {port}")<br>
- if verify:<br>
- if "Please stop port" in tx_vlan_cmd_output or "Invalid port" in tx_vlan_cmd_output:<br>
- self._logger.debug(<br>
- f"Failed to reset vlan insertion on port {port}: \n{tx_vlan_cmd_output}"<br>
- )<br>
- raise InteractiveCommandExecutionError(<br>
- f"Testpmd failed to reset vlan insertion on port {port}."<br>
- )<br>
-<br>
- def set_promisc(self, port: int, enable: bool, verify: bool = True) -> None:<br>
- """Enable or disable promiscuous mode for the specified port.<br>
-<br>
- Args:<br>
- port: Port number to use.<br>
- enable: If :data:`True`, turn promiscuous mode on, otherwise turn off.<br>
- verify: If :data:`True` an additional command will be sent to verify that<br>
- promiscuous mode is properly set. Defaults to :data:`True`.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If `verify` is :data:`True` and promiscuous mode<br>
- is not correctly set.<br>
- """<br>
- promisc_cmd_output = self.send_command(f"set promisc {port} {'on' if enable else 'off'}")<br>
- if verify:<br>
- stats = self.show_port_info(port_id=port)<br>
- if enable ^ stats.is_promiscuous_mode_enabled:<br>
- self._logger.debug(<br>
- f"Failed to set promiscuous mode on port {port}: \n{promisc_cmd_output}"<br>
- )<br>
- raise InteractiveCommandExecutionError(<br>
- f"Testpmd failed to set promiscuous mode on port {port}."<br>
- )<br>
-<br>
- def set_verbose(self, level: int, verify: bool = True) -> None:<br>
- """Set debug verbosity level.<br>
-<br>
- Args:<br>
- level: 0 - silent except for error<br>
- 1 - fully verbose except for Tx packets<br>
- 2 - fully verbose except for Rx packets<br>
- >2 - fully verbose<br>
- verify: If :data:`True` the command output will be scanned to verify that verbose level<br>
- is properly set. Defaults to :data:`True`.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If `verify` is :data:`True` and verbose level<br>
- is not correctly set.<br>
- """<br>
- verbose_cmd_output = self.send_command(f"set verbose {level}")<br>
- if verify:<br>
- if "Change verbose level" not in verbose_cmd_output:<br>
- self._logger.debug(<br>
- f"Failed to set verbose level to {level}: \n{verbose_cmd_output}"<br>
- )<br>
- raise InteractiveCommandExecutionError(<br>
- f"Testpmd failed to set verbose level to {level}."<br>
- )<br>
-<br>
- def rx_vxlan(self, vxlan_id: int, port_id: int, enable: bool, verify: bool = True) -> None:<br>
- """Add or remove vxlan id to/from filter list.<br>
-<br>
- Args:<br>
- vxlan_id: VXLAN ID to add to port filter list.<br>
- port_id: ID of the port to modify VXLAN filter of.<br>
- enable: If :data:`True`, adds specified VXLAN ID, otherwise removes it.<br>
- verify: If :data:`True`, the output of the command is checked to verify<br>
- the VXLAN ID was successfully added/removed from the port.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If `verify` is :data:`True` and VXLAN ID<br>
- is not successfully added or removed.<br>
- """<br>
- action = "add" if enable else "rm"<br>
- vxlan_output = self.send_command(f"rx_vxlan_port {action} {vxlan_id} {port_id}")<br>
- if verify:<br>
- if "udp tunneling add error" in vxlan_output:<br>
- self._logger.debug(f"Failed to set VXLAN:\n{vxlan_output}")<br>
- raise InteractiveCommandExecutionError(f"Failed to set VXLAN:\n{vxlan_output}")<br>
-<br>
- def clear_port_stats(self, port_id: int, verify: bool = True) -> None:<br>
- """Clear statistics of a given port.<br>
-<br>
- Args:<br>
- port_id: ID of the port to clear the statistics on.<br>
- verify: If :data:`True` the output of the command will be scanned to verify that it was<br>
- successful, otherwise failures will be ignored. Defaults to :data:`True`.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If `verify` is :data:`True` and testpmd fails to<br>
- clear the statistics of the given port.<br>
- """<br>
- clear_output = self.send_command(f"clear port stats {port_id}")<br>
- if verify and f"NIC statistics for port {port_id} cleared" not in clear_output:<br>
- raise InteractiveCommandExecutionError(<br>
- f"Test pmd failed to set clear forwarding stats on port {port_id}"<br>
- )<br>
-<br>
- def clear_port_stats_all(self, verify: bool = True) -> None:<br>
- """Clear the statistics of all ports that testpmd is aware of.<br>
-<br>
- Args:<br>
- verify: If :data:`True` the output of the command will be scanned to verify that all<br>
- ports had their statistics cleared, otherwise failures will be ignored. Defaults to<br>
- :data:`True`.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If `verify` is :data:`True` and testpmd fails to<br>
- clear the statistics of any of its ports.<br>
- """<br>
- clear_output = self.send_command("clear port stats all")<br>
- if verify:<br>
- if type(self._app_params.port_numa_config) is list:<br>
- for port_id in range(len(self._app_params.port_numa_config)):<br>
- if f"NIC statistics for port {port_id} cleared" not in clear_output:<br>
- raise InteractiveCommandExecutionError(<br>
- f"Test pmd failed to set clear forwarding stats on port {port_id}"<br>
- )<br>
-<br>
- @only_active<br>
- def close(self) -> None:<br>
- """Overrides :meth:`~.interactive_shell.close`."""<br>
- self.stop()<br>
- self.send_command("quit", "Bye...")<br>
- return super().close()<br>
-<br>
- """<br>
- ====== Capability retrieval methods ======<br>
- """<br>
-<br>
- def get_capabilities_rx_offload(<br>
- self,<br>
- supported_capabilities: MutableSet["NicCapability"],<br>
- unsupported_capabilities: MutableSet["NicCapability"],<br>
- ) -> None:<br>
- """Get all rx offload capabilities and divide them into supported and unsupported.<br>
-<br>
- Args:<br>
- supported_capabilities: Supported capabilities will be added to this set.<br>
- unsupported_capabilities: Unsupported capabilities will be added to this set.<br>
- """<br>
- self._logger.debug("Getting rx offload capabilities.")<br>
- command = f"show port {self.ports[0].id} rx_offload capabilities"<br>
- rx_offload_capabilities_out = self.send_command(command)<br>
- rx_offload_capabilities = RxOffloadCapabilities.parse(rx_offload_capabilities_out)<br>
- self._update_capabilities_from_flag(<br>
- supported_capabilities,<br>
- unsupported_capabilities,<br>
- RxOffloadCapability,<br>
- rx_offload_capabilities.per_port | rx_offload_capabilities.per_queue,<br>
- )<br>
-<br>
- def get_port_queue_info(<br>
- self, port_id: int, queue_id: int, is_rx_queue: bool<br>
- ) -> TestPmdQueueInfo:<br>
- """Returns the current state of the specified queue."""<br>
- command = f"show {'rxq' if is_rx_queue else 'txq'} info {port_id} {queue_id}"<br>
- queue_info = TestPmdQueueInfo.parse(self.send_command(command))<br>
- return queue_info<br>
-<br>
- def setup_port_queue(self, port_id: int, queue_id: int, is_rx_queue: bool) -> None:<br>
- """Setup a given queue on a port.<br>
-<br>
- This functionality cannot be verified because the setup action only takes effect when the<br>
- queue is started.<br>
-<br>
- Args:<br>
- port_id: ID of the port where the queue resides.<br>
- queue_id: ID of the queue to setup.<br>
- is_rx_queue: Type of queue to setup. If :data:`True` an RX queue will be setup,<br>
- otherwise a TX queue will be setup.<br>
- """<br>
- self.send_command(f"port {port_id} {'rxq' if is_rx_queue else 'txq'} {queue_id} setup")<br>
-<br>
- def stop_port_queue(<br>
- self, port_id: int, queue_id: int, is_rx_queue: bool, verify: bool = True<br>
- ) -> None:<br>
- """Stops a given queue on a port.<br>
-<br>
- Args:<br>
- port_id: ID of the port that the queue belongs to.<br>
- queue_id: ID of the queue to stop.<br>
- is_rx_queue: Type of queue to stop. If :data:`True` an RX queue will be stopped,<br>
- otherwise a TX queue will be stopped.<br>
- verify: If :data:`True` an additional command will be sent to verify the queue stopped.<br>
- Defaults to :data:`True`.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If `verify` is :data:`True` and the queue fails to<br>
- stop.<br>
- """<br>
- port_type = "rxq" if is_rx_queue else "txq"<br>
- stop_cmd_output = self.send_command(f"port {port_id} {port_type} {queue_id} stop")<br>
- if verify:<br>
- queue_started = self.get_port_queue_info(<br>
- port_id, queue_id, is_rx_queue<br>
- ).is_queue_started<br>
- if queue_started:<br>
- self._logger.debug(<br>
- f"Failed to stop {port_type} {queue_id} on port {port_id}:\n{stop_cmd_output}"<br>
- )<br>
- raise InteractiveCommandExecutionError(<br>
- f"Test pmd failed to stop {port_type} {queue_id} on port {port_id}"<br>
- )<br>
-<br>
- def start_port_queue(<br>
- self, port_id: int, queue_id: int, is_rx_queue: bool, verify: bool = True<br>
- ) -> None:<br>
- """Starts a given queue on a port.<br>
-<br>
- First sets up the port queue, then starts it.<br>
-<br>
- Args:<br>
- port_id: ID of the port that the queue belongs to.<br>
- queue_id: ID of the queue to start.<br>
- is_rx_queue: Type of queue to start. If :data:`True` an RX queue will be started,<br>
- otherwise a TX queue will be started.<br>
- verify: if :data:`True` an additional command will be sent to verify that the queue was<br>
- started. Defaults to :data:`True`.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If `verify` is :data:`True` and the queue fails to<br>
- start.<br>
- """<br>
- port_type = "rxq" if is_rx_queue else "txq"<br>
- self.setup_port_queue(port_id, queue_id, is_rx_queue)<br>
- start_cmd_output = self.send_command(f"port {port_id} {port_type} {queue_id} start")<br>
- if verify:<br>
- queue_started = self.get_port_queue_info(<br>
- port_id, queue_id, is_rx_queue<br>
- ).is_queue_started<br>
- if not queue_started:<br>
- self._logger.debug(<br>
- f"Failed to start {port_type} {queue_id} on port {port_id}:\n{start_cmd_output}"<br>
- )<br>
- raise InteractiveCommandExecutionError(<br>
- f"Test pmd failed to start {port_type} {queue_id} on port {port_id}"<br>
- )<br>
-<br>
- def get_queue_ring_size(self, port_id: int, queue_id: int, is_rx_queue: bool) -> int:<br>
- """Returns the current size of the ring on the specified queue."""<br>
- command = f"show {'rxq' if is_rx_queue else 'txq'} info {port_id} {queue_id}"<br>
- queue_info = TestPmdQueueInfo.parse(self.send_command(command))<br>
- return queue_info.ring_size<br>
-<br>
- def set_queue_ring_size(<br>
- self,<br>
- port_id: int,<br>
- queue_id: int,<br>
- size: int,<br>
- is_rx_queue: bool,<br>
- verify: bool = True,<br>
- ) -> None:<br>
- """Update the ring size of an Rx/Tx queue on a given port.<br>
-<br>
- Queue is setup after setting the ring size so that the queue info reflects this change and<br>
- it can be verified.<br>
-<br>
- Args:<br>
- port_id: The port that the queue resides on.<br>
- queue_id: The ID of the queue on the port.<br>
- size: The size to update the ring size to.<br>
- is_rx_queue: Whether to modify an RX or TX queue. If :data:`True` an RX queue will be<br>
- updated, otherwise a TX queue will be updated.<br>
- verify: If :data:`True` an additional command will be sent to check the ring size of<br>
- the queue in an attempt to validate that the size was changes properly.<br>
-<br>
- Raises:<br>
- InteractiveCommandExecutionError: If `verify` is :data:`True` and there is a failure<br>
- when updating ring size.<br>
- """<br>
- queue_type = "rxq" if is_rx_queue else "txq"<br>
- self.send_command(f"port config {port_id} {queue_type} {queue_id} ring_size {size}")<br>
- self.setup_port_queue(port_id, queue_id, is_rx_queue)<br>
- if verify:<br>
- curr_ring_size = self.get_queue_ring_size(port_id, queue_id, is_rx_queue)<br>
- if curr_ring_size != size:<br>
- self._logger.debug(<br>
- f"Failed up update ring size of queue {queue_id} on port {port_id}. Current"<br>
- f" ring size is {curr_ring_size}."<br>
- )<br>
- raise InteractiveCommandExecutionError(<br>
- f"Failed to update ring size of queue {queue_id} on port {port_id}"<br>
- )<br>
-<br>
- @requires_stopped_ports<br>
- def set_queue_deferred_start(<br>
- self, port_id: int, queue_id: int, is_rx_queue: bool, on: bool<br>
- ) -> None:<br>
- """Set the deferred start attribute of the specified queue on/off.<br>
-<br>
- Args:<br>
- port_id: The port that the queue resides on.<br>
- queue_id: The ID of the queue on the port.<br>
- is_rx_queue: Whether to modify an RX or TX queue. If :data:`True` an RX queue will be<br>
- updated, otherwise a TX queue will be updated.<br>
- on: Whether to set deferred start mode on or off. If :data:`True` deferred start will<br>
- be turned on, otherwise it will be turned off.<br>
- """<br>
- queue_type = "rxq" if is_rx_queue else "txq"<br>
- action = "on" if on else "off"<br>
- self.send_command(f"port {port_id} {queue_type} {queue_id} deferred_start {action}")<br>
-<br>
- def _update_capabilities_from_flag(<br>
- self,<br>
- supported_capabilities: MutableSet["NicCapability"],<br>
- unsupported_capabilities: MutableSet["NicCapability"],<br>
- flag_class: type[Flag],<br>
- supported_flags: Flag,<br>
- ) -> None:<br>
- """Divide all flags from `flag_class` into supported and unsupported."""<br>
- for flag in flag_class:<br>
- if flag in supported_flags:<br>
- supported_capabilities.add(NicCapability[str(flag.name)])<br>
- else:<br>
- unsupported_capabilities.add(NicCapability[str(flag.name)])<br>
-<br>
- @requires_started_ports<br>
- def get_capabilities_rxq_info(<br>
- self,<br>
- supported_capabilities: MutableSet["NicCapability"],<br>
- unsupported_capabilities: MutableSet["NicCapability"],<br>
- ) -> None:<br>
- """Get all rxq capabilities and divide them into supported and unsupported.<br>
-<br>
- Args:<br>
- supported_capabilities: Supported capabilities will be added to this set.<br>
- unsupported_capabilities: Unsupported capabilities will be added to this set.<br>
- """<br>
- self._logger.debug("Getting rxq capabilities.")<br>
- command = f"show rxq info {self.ports[0].id} 0"<br>
- rxq_info = TestPmdRxqInfo.parse(self.send_command(command))<br>
- if rxq_info.scattered_packets:<br>
- supported_capabilities.add(NicCapability.SCATTERED_RX_ENABLED)<br>
- else:<br>
- unsupported_capabilities.add(NicCapability.SCATTERED_RX_ENABLED)<br>
-<br>
- def get_capabilities_show_port_info(<br>
- self,<br>
- supported_capabilities: MutableSet["NicCapability"],<br>
- unsupported_capabilities: MutableSet["NicCapability"],<br>
- ) -> None:<br>
- """Get all capabilities from show port info and divide them into supported and unsupported.<br>
-<br>
- Args:<br>
- supported_capabilities: Supported capabilities will be added to this set.<br>
- unsupported_capabilities: Unsupported capabilities will be added to this set.<br>
- """<br>
- self._update_capabilities_from_flag(<br>
- supported_capabilities,<br>
- unsupported_capabilities,<br>
- DeviceCapabilitiesFlag,<br>
- self.ports[0].device_capabilities,<br>
- )<br>
-<br>
- def get_capabilities_mcast_filtering(<br>
- self,<br>
- supported_capabilities: MutableSet["NicCapability"],<br>
- unsupported_capabilities: MutableSet["NicCapability"],<br>
- ) -> None:<br>
- """Get multicast filtering capability from mcast_addr add and check for testpmd error code.<br>
-<br>
- Args:<br>
- supported_capabilities: Supported capabilities will be added to this set.<br>
- unsupported_capabilities: Unsupported capabilities will be added to this set.<br>
- """<br>
- self._logger.debug("Getting mcast filter capabilities.")<br>
- command = f"mcast_addr add {self.ports[0].id} 01:00:5E:00:00:00"<br>
- output = self.send_command(command)<br>
- if "diag=-95" in output:<br>
- unsupported_capabilities.add(NicCapability.MCAST_FILTERING)<br>
- else:<br>
- supported_capabilities.add(NicCapability.MCAST_FILTERING)<br>
- command = str.replace(command, "add", "remove", 1)<br>
- self.send_command(command)<br>
-<br>
- def get_capabilities_flow_ctrl(<br>
- self,<br>
- supported_capabilities: MutableSet["NicCapability"],<br>
- unsupported_capabilities: MutableSet["NicCapability"],<br>
- ) -> None:<br>
- """Get flow control capability and check for testpmd failure.<br>
-<br>
- Args:<br>
- supported_capabilities: Supported capabilities will be added to this set.<br>
- unsupported_capabilities: Unsupported capabilities will be added to this set.<br>
- """<br>
- self._logger.debug("Getting flow ctrl capabilities.")<br>
- command = f"show port {self.ports[0].id} flow_ctrl"<br>
- output = self.send_command(command)<br>
- if "Flow control infos" in output:<br>
- supported_capabilities.add(NicCapability.FLOW_CTRL)<br>
- else:<br>
- unsupported_capabilities.add(NicCapability.FLOW_CTRL)<br>
-<br>
- def get_capabilities_physical_function(<br>
- self,<br>
- supported_capabilities: MutableSet["NicCapability"],<br>
- unsupported_capabilities: MutableSet["NicCapability"],<br>
- ) -> None:<br>
- """Store capability representing a physical function test run.<br>
-<br>
- Args:<br>
- supported_capabilities: Supported capabilities will be added to this set.<br>
- unsupported_capabilities: Unsupported capabilities will be added to this set.<br>
- """<br>
- ctx = get_ctx()<br>
- if ctx.topology.vf_ports == []:<br>
- supported_capabilities.add(NicCapability.PHYSICAL_FUNCTION)<br>
- else:<br>
- unsupported_capabilities.add(NicCapability.PHYSICAL_FUNCTION)<br>
-<br>
-<br>
-class NicCapability(NoAliasEnum):<br>
- """A mapping between capability names and the associated :class:`TestPmdShell` methods.<br>
-<br>
- The :class:`TestPmdShell` capability checking method executes the command that checks<br>
- whether the capability is supported.<br>
- A decorator may optionally be added to the method that will add and remove configuration<br>
- that's necessary to retrieve the capability support status.<br>
- The Enum members may be assigned the method itself or a tuple of<br>
- (capability_checking_method, decorator_function).<br>
-<br>
- The signature of each :class:`TestPmdShell` capability checking method must be::<br>
-<br>
- fn(self, supported_capabilities: MutableSet, unsupported_capabilities: MutableSet) -> None<br>
-<br>
- The capability checking method must get whether a capability is supported or not<br>
- from a testpmd command. If multiple capabilities can be obtained from a testpmd command,<br>
- each should be obtained in the method. These capabilities should then<br>
- be added to `supported_capabilities` or `unsupported_capabilities` based on their support.<br>
-<br>
- The two dictionaries are shared across all capability discovery function calls in a given<br>
- test run so that we don't call the same function multiple times. For example, when we find<br>
- :attr:`SCATTERED_RX_ENABLED` in :meth:`TestPmdShell.get_capabilities_rxq_info`,<br>
- we don't go looking for it again if a different test case also needs it.<br>
- """<br>
-<br>
- #: Scattered packets Rx enabled<br>
- SCATTERED_RX_ENABLED: TestPmdShellNicCapability = (<br>
- TestPmdShell.get_capabilities_rxq_info,<br>
- add_remove_mtu(9000),<br>
- )<br>
- #:<br>
- RX_OFFLOAD_VLAN_STRIP: TestPmdShellNicCapability = (<br>
- TestPmdShell.get_capabilities_rx_offload,<br>
- None,<br>
- )<br>
- #: Device supports L3 checksum offload.<br>
- RX_OFFLOAD_IPV4_CKSUM: TestPmdShellNicCapability = (<br>
- TestPmdShell.get_capabilities_rx_offload,<br>
- None,<br>
- )<br>
- #: Device supports L4 checksum offload.<br>
- RX_OFFLOAD_UDP_CKSUM: TestPmdShellNicCapability = (<br>
- TestPmdShell.get_capabilities_rx_offload,<br>
- None,<br>
- )<br>
- #: Device supports L4 checksum offload.<br>
- RX_OFFLOAD_TCP_CKSUM: TestPmdShellNicCapability = (<br>
- TestPmdShell.get_capabilities_rx_offload,<br>
- None,<br>
- )<br>
- #: Device supports Large Receive Offload.<br>
- RX_OFFLOAD_TCP_LRO: TestPmdShellNicCapability = (TestPmdShell.get_capabilities_rx_offload, None)<br>
- #: Device supports QinQ (queue in queue) offload.<br>
- RX_OFFLOAD_QINQ_STRIP: TestPmdShellNicCapability = (<br>
- TestPmdShell.get_capabilities_rx_offload,<br>
- None,<br>
- )<br>
- #: Device supports inner packet L3 checksum.<br>
- RX_OFFLOAD_OUTER_IPV4_CKSUM: TestPmdShellNicCapability = (<br>
- TestPmdShell.get_capabilities_rx_offload,<br>
- None,<br>
- )<br>
- #: Device supports MACsec.<br>
- RX_OFFLOAD_MACSEC_STRIP: TestPmdShellNicCapability = (<br>
- TestPmdShell.get_capabilities_rx_offload,<br>
- None,<br>
- )<br>
- #: Device supports filtering of a VLAN Tag identifier.<br>
- RX_OFFLOAD_VLAN_FILTER: TestPmdShellNicCapability = (<br>
- TestPmdShell.get_capabilities_rx_offload,<br>
- None,<br>
- )<br>
- #: Device supports VLAN offload.<br>
- RX_OFFLOAD_VLAN_EXTEND: TestPmdShellNicCapability = (<br>
- TestPmdShell.get_capabilities_rx_offload,<br>
- None,<br>
- )<br>
- #: Device supports receiving segmented mbufs.<br>
- RX_OFFLOAD_SCATTER: TestPmdShellNicCapability = (TestPmdShell.get_capabilities_rx_offload, None)<br>
- #: Device supports Timestamp.<br>
- RX_OFFLOAD_TIMESTAMP: TestPmdShellNicCapability = (<br>
- TestPmdShell.get_capabilities_rx_offload,<br>
- None,<br>
- )<br>
- #: Device supports crypto processing while packet is received in NIC.<br>
- RX_OFFLOAD_SECURITY: TestPmdShellNicCapability = (<br>
- TestPmdShell.get_capabilities_rx_offload,<br>
- None,<br>
- )<br>
- #: Device supports CRC stripping.<br>
- RX_OFFLOAD_KEEP_CRC: TestPmdShellNicCapability = (<br>
- TestPmdShell.get_capabilities_rx_offload,<br>
- None,<br>
- )<br>
- #: Device supports L4 checksum offload.<br>
- RX_OFFLOAD_SCTP_CKSUM: TestPmdShellNicCapability = (<br>
- TestPmdShell.get_capabilities_rx_offload,<br>
- None,<br>
- )<br>
- #: Device supports inner packet L4 checksum.<br>
- RX_OFFLOAD_OUTER_UDP_CKSUM: TestPmdShellNicCapability = (<br>
- TestPmdShell.get_capabilities_rx_offload,<br>
- None,<br>
- )<br>
- #: Device supports RSS hashing.<br>
- RX_OFFLOAD_RSS_HASH: TestPmdShellNicCapability = (<br>
- TestPmdShell.get_capabilities_rx_offload,<br>
- None,<br>
- )<br>
- #: Device supports scatter Rx packets to segmented mbufs.<br>
- RX_OFFLOAD_BUFFER_SPLIT: TestPmdShellNicCapability = (<br>
- TestPmdShell.get_capabilities_rx_offload,<br>
- None,<br>
- )<br>
- #: Device supports all checksum capabilities.<br>
- RX_OFFLOAD_CHECKSUM: TestPmdShellNicCapability = (<br>
- TestPmdShell.get_capabilities_rx_offload,<br>
- None,<br>
- )<br>
- #: Device supports all VLAN capabilities.<br>
- RX_OFFLOAD_VLAN: TestPmdShellNicCapability = (TestPmdShell.get_capabilities_rx_offload, None)<br>
- #: Device supports Rx queue setup after device started.<br>
- RUNTIME_RX_QUEUE_SETUP: TestPmdShellNicCapability = (<br>
- TestPmdShell.get_capabilities_show_port_info,<br>
- None,<br>
- )<br>
- #: Device supports Tx queue setup after device started.<br>
- RUNTIME_TX_QUEUE_SETUP: TestPmdShellNicCapability = (<br>
- TestPmdShell.get_capabilities_show_port_info,<br>
- None,<br>
- )<br>
- #: Device supports shared Rx queue among ports within Rx domain and switch domain.<br>
- RXQ_SHARE: TestPmdShellNicCapability = (TestPmdShell.get_capabilities_show_port_info, None)<br>
- #: Device supports keeping flow rules across restart.<br>
- FLOW_RULE_KEEP: TestPmdShellNicCapability = (TestPmdShell.get_capabilities_show_port_info, None)<br>
- #: Device supports keeping shared flow objects across restart.<br>
- FLOW_SHARED_OBJECT_KEEP: TestPmdShellNicCapability = (<br>
- TestPmdShell.get_capabilities_show_port_info,<br>
- None,<br>
- )<br>
- #: Device supports multicast address filtering.<br>
- MCAST_FILTERING: TestPmdShellNicCapability = (<br>
- TestPmdShell.get_capabilities_mcast_filtering,<br>
- None,<br>
- )<br>
- #: Device supports flow ctrl.<br>
- FLOW_CTRL: TestPmdShellNicCapability = (TestPmdShell.get_capabilities_flow_ctrl, None)<br>
- #: Device is running on a physical function.<br>
- PHYSICAL_FUNCTION: TestPmdShellNicCapability = (<br>
- TestPmdShell.get_capabilities_physical_function,<br>
- None,<br>
- )<br>
-<br>
- def __call__(<br>
- self,<br>
- testpmd_shell: TestPmdShell,<br>
- supported_capabilities: MutableSet[Self],<br>
- unsupported_capabilities: MutableSet[Self],<br>
- ) -> None:<br>
- """Execute the associated capability retrieval function.<br>
-<br>
- Args:<br>
- testpmd_shell: :class:`TestPmdShell` object to which the function will be bound to.<br>
- supported_capabilities: The dictionary storing the supported capabilities<br>
- of a given test run.<br>
- unsupported_capabilities: The dictionary storing the unsupported capabilities<br>
- of a given test run.<br>
- """<br>
- self.value(testpmd_shell, supported_capabilities, unsupported_capabilities)<br>
diff --git a/dts/framework/testbed_model/capability.py b/dts/framework/testbed_model/capability.py<br>
index f895b22bb3..a4a8d9b7b4 100644<br>
--- a/dts/framework/testbed_model/capability.py<br>
+++ b/dts/framework/testbed_model/capability.py<br>
@@ -26,9 +26,9 @@<br>
.. code:: python<br>
<br>
from framework.test_suite import TestSuite, func_test<br>
- from framework.testbed_model.capability import TopologyType, requires<br>
+ from api.capabilities import LinkTopology, requires_link_topology<br>
# The whole test suite (each test case within) doesn't require any links.<br>
- @requires(topology_type=TopologyType.no_link)<br>
+ @requires_link_topology(LinkTopology.NO_LINK)<br>
@func_test<br>
class TestHelloWorld(TestSuite):<br>
def hello_world_single_core(self):<br>
@@ -41,7 +41,7 @@ def hello_world_single_core(self):<br>
class TestPmdBufferScatter(TestSuite):<br>
# only the test case requires the SCATTERED_RX_ENABLED capability<br>
# other test cases may not require it<br>
- @requires(NicCapability.SCATTERED_RX_ENABLED)<br>
+ @requires_nic_capability(NicCapability.SCATTERED_RX_ENABLED)<br>
@func_test<br>
def test_scatter_mbuf_2048(self):<br>
"""<br>
@@ -50,27 +50,38 @@ def test_scatter_mbuf_2048(self):<br>
from abc import ABC, abstractmethod<br>
from collections.abc import MutableSet<br>
from dataclasses import dataclass<br>
-from typing import TYPE_CHECKING, Callable, ClassVar, Protocol<br>
+from typing import (<br>
+ TYPE_CHECKING,<br>
+ Any,<br>
+ Callable,<br>
+ ClassVar,<br>
+ Concatenate,<br>
+ ParamSpec,<br>
+ Protocol,<br>
+ TypeAlias,<br>
+)<br>
<br>
from typing_extensions import Self<br>
<br>
+from api.capabilities import LinkTopology, NicCapability<br>
from framework.exception import ConfigurationError, InternalError, SkippedTestException<br>
from framework.logger import get_dts_logger<br>
-from framework.remote_session.testpmd_shell import (<br>
- NicCapability,<br>
- TestPmdShell,<br>
- TestPmdShellCapabilityMethod,<br>
- TestPmdShellDecorator,<br>
- TestPmdShellMethod,<br>
-)<br>
from framework.testbed_model.node import Node<br>
from framework.testbed_model.port import DriverKind<br>
-<br>
-from .topology import Topology, TopologyType<br>
+from framework.testbed_model.topology import Topology<br>
<br>
if TYPE_CHECKING:<br>
+ from api.testpmd import TestPmd<br>
from framework.test_suite import TestCase<br>
<br>
+P = ParamSpec("P")<br>
+TestPmdMethod = Callable[Concatenate["TestPmd", P], Any]<br>
+TestPmdCapabilityMethod: TypeAlias = Callable[<br>
+ ["TestPmd", MutableSet["NicCapability"], MutableSet["NicCapability"]], None<br>
+]<br>
+TestPmdDecorator: TypeAlias = Callable[[TestPmdMethod], TestPmdMethod]<br>
+TestPmdNicCapability = tuple[TestPmdCapabilityMethod, TestPmdDecorator | None]<br>
+<br>
<br>
class Capability(ABC):<br>
"""The base class for various capabilities.<br>
@@ -153,7 +164,7 @@ def __hash__(self) -> int:<br>
<br>
@dataclass<br>
class DecoratedNicCapability(Capability):<br>
- """A wrapper around :class:`~framework.remote_session.testpmd_shell.NicCapability`.<br>
+ """A wrapper around :class:`~api.capabilities.NicCapability`.<br>
<br>
New instances should be created with the :meth:`create_unique` class method to ensure<br>
there are no duplicate instances.<br>
@@ -166,10 +177,69 @@ class DecoratedNicCapability(Capability):<br>
"""<br>
<br>
nic_capability: NicCapability<br>
- capability_fn: TestPmdShellCapabilityMethod<br>
- capability_decorator: TestPmdShellDecorator | None<br>
+ capability_fn: TestPmdCapabilityMethod<br>
+ capability_decorator: TestPmdDecorator | None<br>
_unique_capabilities: ClassVar[dict[NicCapability, Self]] = {}<br>
<br>
+ @classmethod<br>
+ def _get_nic_capability_check(cls) -> list[TestPmdNicCapability]:<br>
+ """Get the capability checking methods, indexed by :class:`NicCapability` member value.<br>
+<br>
+ The :class:`TestPmd` capability checking method executes the command that checks<br>
+ whether the capability is supported.<br>
+ A decorator may optionally be added to the method that will add and remove configuration<br>
+ that's necessary to retrieve the capability support status.<br>
+ Each entry of the returned list is a tuple of<br>
+ (capability_checking_method, decorator_function), where the decorator may be :data:`None`.<br>
+<br>
+ The signature of each :class:`TestPmd` capability checking method must be::<br>
+<br>
+ fn(self, supported_capabilities: MutableSet, unsupported_capabilities: MutableSet)<br>
+<br>
+ The capability checking method must get whether a capability is supported or not<br>
+ from a testpmd command. If multiple capabilities can be obtained from a testpmd command,<br>
+ each should be obtained in the method. These capabilities should then<br>
+ be added to `supported_capabilities` or `unsupported_capabilities` based on their support.<br>
+<br>
+ The two sets are shared across all capability discovery function calls in a given<br>
+ test run so that we don't call the same function multiple times. For example, when we find<br>
+ :attr:`SCATTERED_RX_ENABLED` in :meth:`TestPmd.get_capabilities_rxq_info`,<br>
+ we don't go looking for it again if a different test case also needs it.<br>
+ """<br>
+ from api.testpmd import TestPmd, _add_remove_mtu<br>
+<br>
+ return [<br>
+ (TestPmd.get_capabilities_rxq_info, _add_remove_mtu(9000)),<br>
+ (TestPmd.get_capabilities_rx_offload, None), # RX_OFFLOAD_VLAN_STRIP<br>
+ (TestPmd.get_capabilities_rx_offload, None), # RX_OFFLOAD_IPV4_CKSUM<br>
+ (TestPmd.get_capabilities_rx_offload, None), # RX_OFFLOAD_UDP_CKSUM<br>
+ (TestPmd.get_capabilities_rx_offload, None), # RX_OFFLOAD_TCP_CKSUM<br>
+ (TestPmd.get_capabilities_rx_offload, None), # RX_OFFLOAD_TCP_LRO<br>
+ (TestPmd.get_capabilities_rx_offload, None), # RX_OFFLOAD_QINQ_STRIP<br>
+ (TestPmd.get_capabilities_rx_offload, None), # RX_OFFLOAD_OUTER_IPV4_CKSUM<br>
+ (TestPmd.get_capabilities_rx_offload, None), # RX_OFFLOAD_MACSEC_STRIP<br>
+ (TestPmd.get_capabilities_rx_offload, None), # RX_OFFLOAD_VLAN_FILTER<br>
+ (TestPmd.get_capabilities_rx_offload, None), # RX_OFFLOAD_VLAN_EXTEND<br>
+ (TestPmd.get_capabilities_rx_offload, None), # RX_OFFLOAD_SCATTER<br>
+ (TestPmd.get_capabilities_rx_offload, None), # RX_OFFLOAD_TIMESTAMP<br>
+ (TestPmd.get_capabilities_rx_offload, None), # RX_OFFLOAD_SECURITY<br>
+ (TestPmd.get_capabilities_rx_offload, None), # RX_OFFLOAD_KEEP_CRC<br>
+ (TestPmd.get_capabilities_rx_offload, None), # RX_OFFLOAD_SCTP_CKSUM<br>
+ (TestPmd.get_capabilities_rx_offload, None), # RX_OFFLOAD_OUTER_UDP_CKSUM<br>
+ (TestPmd.get_capabilities_rx_offload, None), # RX_OFFLOAD_RSS_HASH<br>
+ (TestPmd.get_capabilities_rx_offload, None), # RX_OFFLOAD_BUFFER_SPLIT<br>
+ (TestPmd.get_capabilities_rx_offload, None), # RX_OFFLOAD_CHECKSUM<br>
+ (TestPmd.get_capabilities_rx_offload, None), # RX_OFFLOAD_VLAN<br>
+ (TestPmd.get_capabilities_show_port_info, None), # RUNTIME_RX_QUEUE_SETUP<br>
+ (TestPmd.get_capabilities_show_port_info, None), # RUNTIME_TX_QUEUE_SETUP<br>
+ (TestPmd.get_capabilities_show_port_info, None), # RXQ_SHARE<br>
+ (TestPmd.get_capabilities_show_port_info, None), # FLOW_RULE_KEEP<br>
+ (TestPmd.get_capabilities_show_port_info, None), # FLOW_SHARED_OBJECT_KEEP<br>
+ (TestPmd.get_capabilities_mcast_filtering, None), # MCAST_FILTERING<br>
+ (TestPmd.get_capabilities_flow_ctrl, None), # FLOW_CTRL<br>
+ (TestPmd.get_capabilities_physical_function, None), # PHYSICAL_FUNCTION<br>
+ ]<br>
+<br>
@classmethod<br>
def get_unique(cls, nic_capability: NicCapability) -> Self:<br>
"""Get the capability uniquely identified by `nic_capability`.<br>
@@ -188,7 +258,7 @@ def get_unique(cls, nic_capability: NicCapability) -> Self:<br>
Returns:<br>
The capability uniquely identified by `nic_capability`.<br>
"""<br>
- capability_fn, decorator_fn = nic_capability.value<br>
+ capability_fn, decorator_fn = cls._get_nic_capability_check()[nic_capability.value]<br>
<br>
if nic_capability not in cls._unique_capabilities:<br>
cls._unique_capabilities[nic_capability] = cls(<br>
@@ -207,9 +277,11 @@ def get_supported_capabilities(<br>
Each capability is first checked whether it's supported/unsupported<br>
before executing its `capability_fn` so that each capability is retrieved only once.<br>
"""<br>
+ from api.testpmd import TestPmd<br>
+<br>
supported_conditional_capabilities: set["DecoratedNicCapability"] = set()<br>
logger = get_dts_logger(f"{node.name}.{cls.__name__}")<br>
- if topology.type is topology.type.no_link:<br>
+ if topology.type is topology.type.NO_LINK:<br>
logger.debug(<br>
"No links available in the current topology, not getting NIC capabilities."<br>
)<br>
@@ -219,7 +291,7 @@ def get_supported_capabilities(<br>
)<br>
if cls.capabilities_to_check:<br>
capabilities_to_check_map = cls._get_decorated_capabilities_map()<br>
- with TestPmdShell() as testpmd_shell:<br>
+ with TestPmd() as testpmd:<br>
for (<br>
conditional_capability_fn,<br>
capabilities,<br>
@@ -231,7 +303,7 @@ def get_supported_capabilities(<br>
)<br>
if conditional_capability_fn:<br>
capability_fn = conditional_capability_fn(capability_fn)<br>
- capability_fn(testpmd_shell)<br>
+ capability_fn(testpmd)<br>
for capability in capabilities:<br>
if capability.nic_capability in supported_capabilities:<br>
supported_conditional_capabilities.add(capability)<br>
@@ -242,8 +314,8 @@ def get_supported_capabilities(<br>
@classmethod<br>
def _get_decorated_capabilities_map(<br>
cls,<br>
- ) -> dict[TestPmdShellDecorator | None, set["DecoratedNicCapability"]]:<br>
- capabilities_map: dict[TestPmdShellDecorator | None, set["DecoratedNicCapability"]] = {}<br>
+ ) -> dict[TestPmdDecorator | None, set["DecoratedNicCapability"]]:<br>
+ capabilities_map: dict[TestPmdDecorator | None, set["DecoratedNicCapability"]] = {}<br>
for capability in cls.capabilities_to_check:<br>
if capability.capability_decorator not in capabilities_map:<br>
capabilities_map[capability.capability_decorator] = set()<br>
@@ -257,12 +329,12 @@ def _reduce_capabilities(<br>
capabilities: set["DecoratedNicCapability"],<br>
supported_capabilities: MutableSet,<br>
unsupported_capabilities: MutableSet,<br>
- ) -> TestPmdShellMethod:<br>
- def reduced_fn(testpmd_shell: TestPmdShell) -> None:<br>
+ ) -> TestPmdMethod:<br>
+ def reduced_fn(testpmd: "TestPmd") -> None:<br>
for capability in capabilities:<br>
if capability not in supported_capabilities | unsupported_capabilities:<br>
capability.capability_fn(<br>
- testpmd_shell, supported_capabilities, unsupported_capabilities<br>
+ testpmd, supported_capabilities, unsupported_capabilities<br>
)<br>
<br>
return reduced_fn<br>
@@ -278,11 +350,11 @@ def __repr__(self) -> str:<br>
<br>
@dataclass<br>
class TopologyCapability(Capability):<br>
- """A wrapper around :class:`~.topology.TopologyType`.<br>
+ """A wrapper around :class:`~api.capabilities.LinkTopology`.<br>
<br>
Each test case must be assigned a topology. It could be done explicitly;<br>
- the implicit default is given by :meth:`~.topology.TopologyType.default`, which this class<br>
- returns :attr:`~.topology.TopologyType.two_links`.<br>
+ the implicit default is given by :meth:`~api.capabilities.LinkTopology.default`,<br>
+ which returns :attr:`~api.capabilities.LinkTopology.TWO_LINKS`.<br>
<br>
Test case topology may be set by setting the topology for the whole suite.<br>
The priority in which topology is set is as follows:<br>
@@ -301,7 +373,7 @@ class TopologyCapability(Capability):<br>
topology_type: The topology type that defines each instance.<br>
"""<br>
<br>
- topology_type: TopologyType<br>
+ topology_type: LinkTopology<br>
<br>
_unique_capabilities: ClassVar[dict[str, Self]] = {}<br>
<br>
@@ -310,7 +382,7 @@ def _preprocess_required(self, test_case_or_suite: type["TestProtocol"]) -> None<br>
test_case_or_suite.topology_type = self<br>
<br>
@classmethod<br>
- def get_unique(cls, topology_type: TopologyType) -> Self:<br>
+ def get_unique(cls, topology_type: LinkTopology) -> Self:<br>
"""Get the capability uniquely identified by `topology_type`.<br>
<br>
This is a factory method that implements a quasi-enum pattern.<br>
@@ -338,7 +410,7 @@ def get_supported_capabilities(<br>
"""Overrides :meth:`~Capability.get_supported_capabilities`."""<br>
supported_capabilities = set()<br>
topology_capability = cls.get_unique(topology.type)<br>
- for topology_type in TopologyType:<br>
+ for topology_type in LinkTopology:<br>
candidate_topology_type = cls.get_unique(topology_type)<br>
if candidate_topology_type <= topology_capability:<br>
supported_capabilities.add(candidate_topology_type)<br>
@@ -351,17 +423,17 @@ def set_required(self, test_case_or_suite: type["TestProtocol"]) -> None:<br>
This means we have to modify test case topologies when processing the test suite topologies.<br>
At that point, the test case topologies have been set by the :func:`requires` decorator.<br>
The test suite topology only affects the test case topologies<br>
- if not :attr:`~.topology.TopologyType.default`.<br>
+ if not :meth:`~api.capabilities.LinkTopology.default`.<br>
<br>
Raises:<br>
ConfigurationError: If the topology type requested by the test case is more complex than<br>
the test suite's.<br>
"""<br>
if inspect.isclass(test_case_or_suite):<br>
- if self.topology_type is not TopologyType.default():<br>
+ if self.topology_type is not LinkTopology.default():<br>
self.add_to_required(test_case_or_suite)<br>
for test_case in test_case_or_suite.get_test_cases():<br>
- if test_case.topology_type.topology_type is TopologyType.default():<br>
+ if test_case.topology_type.topology_type is LinkTopology.default():<br>
# test case topology has not been set, use the one set by the test suite<br>
self.add_to_required(test_case)<br>
elif test_case.topology_type > test_case_or_suite.topology_type:<br>
@@ -440,7 +512,7 @@ class TestProtocol(Protocol):<br>
#: The reason for skipping the test case or suite.<br>
skip_reason: ClassVar[str] = ""<br>
#: The topology type of the test case or suite.<br>
- topology_type: ClassVar[TopologyCapability] = TopologyCapability(TopologyType.default())<br>
+ topology_type: ClassVar[TopologyCapability] = TopologyCapability(LinkTopology.default())<br>
#: The capabilities the test case or suite requires in order to be executed.<br>
required_capabilities: ClassVar[set[Capability]] = set()<br>
#: The SUT ports topology configuration of the test case or suite.<br>
@@ -481,7 +553,7 @@ def _decorator(func: type[TestProtocol]) -> type[TestProtocol]:<br>
<br>
def requires(<br>
*nic_capabilities: NicCapability,<br>
- topology_type: TopologyType = TopologyType.default(),<br>
+ topology_type: LinkTopology = LinkTopology.default(),<br>
) -> Callable[[type[TestProtocol]], type[TestProtocol]]:<br>
"""A decorator that adds the required capabilities to a test case or test suite.<br>
<br>
diff --git a/dts/framework/testbed_model/linux_session.py b/dts/framework/testbed_model/linux_session.py<br>
index 604245d855..1f11c3e740 100644<br>
--- a/dts/framework/testbed_model/linux_session.py<br>
+++ b/dts/framework/testbed_model/linux_session.py<br>
@@ -22,7 +22,7 @@<br>
InternalError,<br>
RemoteCommandExecutionError,<br>
)<br>
-from framework.testbed_model.os_session import PortInfo<br>
+from framework.testbed_model.port import PortInfo<br>
from framework.utils import expand_range<br>
<br>
from .cpu import LogicalCore<br>
diff --git a/dts/framework/testbed_model/os_session.py b/dts/framework/testbed_model/os_session.py<br>
index b6e03aa83d..c1872880da 100644<br>
--- a/dts/framework/testbed_model/os_session.py<br>
+++ b/dts/framework/testbed_model/os_session.py<br>
@@ -31,13 +31,9 @@<br>
<br>
from framework.config.node import NodeConfiguration<br>
from framework.logger import DTSLogger<br>
-from framework.remote_session import (<br>
- InteractiveRemoteSession,<br>
- RemoteSession,<br>
- create_interactive_session,<br>
- create_remote_session,<br>
-)<br>
-from framework.remote_session.remote_session import CommandResult<br>
+from framework.remote_session.interactive_remote_session import InteractiveRemoteSession<br>
+from framework.remote_session.remote_session import CommandResult, RemoteSession<br>
+from framework.remote_session.ssh_session import SSHSession<br>
from framework.settings import SETTINGS<br>
from framework.utils import MesonArgs, TarCompressionFormat<br>
<br>
@@ -129,8 +125,8 @@ def __init__(<br>
self._config = node_config<br>
self.name = name<br>
self._logger = logger<br>
- self.remote_session = create_remote_session(node_config, name, logger)<br>
- self.interactive_session = create_interactive_session(node_config, logger)<br>
+ self.remote_session = SSHSession(node_config, name, logger)<br>
+ self.interactive_session = InteractiveRemoteSession(node_config, logger)<br>
<br>
def is_alive(self) -> bool:<br>
"""Check whether the underlying remote session is still responding."""<br>
diff --git a/dts/framework/testbed_model/topology.py b/dts/framework/testbed_model/topology.py<br>
index 899ea0ad3a..09303a1f08 100644<br>
--- a/dts/framework/testbed_model/topology.py<br>
+++ b/dts/framework/testbed_model/topology.py<br>
@@ -11,33 +11,17 @@<br>
from collections import defaultdict<br>
from collections.abc import Iterator<br>
from dataclasses import dataclass<br>
-from enum import Enum<br>
from typing import Literal, NamedTuple<br>
<br>
from typing_extensions import Self<br>
<br>
+from api.capabilities import LinkTopology<br>
from framework.exception import ConfigurationError, InternalError<br>
from framework.testbed_model.node import Node<br>
<br>
from .port import DriverKind, Port, PortConfig<br>
<br>
<br>
-class TopologyType(int, Enum):<br>
- """Supported topology types."""<br>
-<br>
- #: A topology with no Traffic Generator.<br>
- no_link = 0<br>
- #: A topology with one physical link between the SUT node and the TG node.<br>
- one_link = 1<br>
- #: A topology with two physical links between the Sut node and the TG node.<br>
- two_links = 2<br>
-<br>
- @classmethod<br>
- def default(cls) -> "TopologyType":<br>
- """The default topology required by test cases if not specified otherwise."""<br>
- return cls.two_links<br>
-<br>
-<br>
class PortLink(NamedTuple):<br>
"""The physical, cabled connection between the ports."""<br>
<br>
@@ -71,7 +55,7 @@ class Topology:<br>
tg_ports: The TG ports.<br>
"""<br>
<br>
- type: TopologyType<br>
+ type: LinkTopology<br>
sut_ports: list[Port]<br>
tg_ports: list[Port]<br>
pf_ports: list[Port]<br>
@@ -87,15 +71,15 @@ def from_port_links(cls, port_links: Iterator[PortLink]) -> Self:<br>
Raises:<br>
ConfigurationError: If an unsupported link topology is supplied.<br>
"""<br>
- type = TopologyType.no_link<br>
+ type = LinkTopology.NO_LINK<br>
<br>
if port_link := next(port_links, None):<br>
- type = TopologyType.one_link<br>
+ type = LinkTopology.ONE_LINK<br>
sut_ports = [port_link.sut_port]<br>
tg_ports = [port_link.tg_port]<br>
<br>
if port_link := next(port_links, None):<br>
- type = TopologyType.two_links<br>
+ type = LinkTopology.TWO_LINKS<br>
sut_ports.append(port_link.sut_port)<br>
tg_ports.append(port_link.tg_port)<br>
<br>
@@ -268,9 +252,9 @@ def sut_port_ingress(self) -> Port:<br>
@property<br>
def sut_port_egress(self) -> Port:<br>
"""The egress port of the SUT node."""<br>
- return self.sut_ports[1 if self.type is TopologyType.two_links else 0]<br>
+ return self.sut_ports[1 if self.type is LinkTopology.TWO_LINKS else 0]<br>
<br>
@property<br>
def tg_port_ingress(self) -> Port:<br>
"""The ingress port of the TG node."""<br>
- return self.tg_ports[1 if self.type is TopologyType.two_links else 0]<br>
+ return self.tg_ports[1 if self.type is LinkTopology.TWO_LINKS else 0]<br>
--<br>
2.39.5<br>
<br>
</blockquote></div>