[PATCH v4 1/1] dts: Remove XML-RPC server for Scapy TG and instead use PythonShell
    jspewock at iol.unh.edu 
    jspewock at iol.unh.edu
       
    Wed Sep 25 19:37:27 CEST 2024
    
    
  
From: Jeremy Spewock <jspewock at iol.unh.edu>
Previously, all Scapy commands were handled using an XML-RPC server that
ran on the TG node. This unnecessarily enforced a minimum Python version
of 3.10 on the server that is being used as a traffic generator and
complicated the implementation of Scapy methods. This patch removes the
XML-RPC server completely and instead allows the Scapy TG to extend from
the PythonShell to implement the functionality of a traffic generator.
This is done by importing the Scapy library in the PythonShell and
sending commands directly to the interactive session on the TG node.
Bugzilla ID: 1374
Signed-off-by: Jeremy Spewock <jspewock at iol.unh.edu>
---
 .../single_active_interactive_shell.py        |   8 +-
 .../traffic_generator/__init__.py             |   2 +-
 .../testbed_model/traffic_generator/scapy.py  | 454 +++++++-----------
 .../traffic_generator/traffic_generator.py    |  15 +-
 dts/framework/utils.py                        |  15 +
 5 files changed, 195 insertions(+), 299 deletions(-)
diff --git a/dts/framework/remote_session/single_active_interactive_shell.py b/dts/framework/remote_session/single_active_interactive_shell.py
index 77a4dcefdf..f41d729655 100644
--- a/dts/framework/remote_session/single_active_interactive_shell.py
+++ b/dts/framework/remote_session/single_active_interactive_shell.py
@@ -36,9 +36,10 @@
 from framework.params import Params
 from framework.settings import SETTINGS
 from framework.testbed_model.node import Node
+from framework.utils import MultiInheritanceBaseClass
 
 
-class SingleActiveInteractiveShell(ABC):
+class SingleActiveInteractiveShell(MultiInheritanceBaseClass, ABC):
     """The base class for managing interactive shells.
 
     This class shouldn't be instantiated directly, but instead be extended. It contains
@@ -93,9 +94,13 @@ def __init__(
         timeout: float = SETTINGS.timeout,
         app_params: Params = Params(),
         name: str | None = None,
+        **kwargs,
     ) -> None:
         """Create an SSH channel during initialization.
 
+        Additional key-word arguments can be passed through `kwargs` if needed for fulfilling other
+        constructors in the case of multiple-inheritance.
+
         Args:
             node: The node on which to run start the interactive shell.
             privileged: Enables the shell to run as superuser.
@@ -115,6 +120,7 @@ def __init__(
         self._timeout = timeout
         # Ensure path is properly formatted for the host
         self._update_real_path(self.path)
+        super().__init__(node, **kwargs)
 
     def _setup_ssh_channel(self):
         self._ssh_channel = self._node.main_session.interactive_session.session.invoke_shell()
diff --git a/dts/framework/testbed_model/traffic_generator/__init__.py b/dts/framework/testbed_model/traffic_generator/__init__.py
index 6dac86a224..a319fa5320 100644
--- a/dts/framework/testbed_model/traffic_generator/__init__.py
+++ b/dts/framework/testbed_model/traffic_generator/__init__.py
@@ -36,7 +36,7 @@ def create_traffic_generator(
     """
     match traffic_generator_config:
         case ScapyTrafficGeneratorConfig():
-            return ScapyTrafficGenerator(tg_node, traffic_generator_config)
+            return ScapyTrafficGenerator(tg_node, traffic_generator_config, privileged=True)
         case _:
             raise ConfigurationError(
                 f"Unknown traffic generator: {traffic_generator_config.traffic_generator_type}"
diff --git a/dts/framework/testbed_model/traffic_generator/scapy.py b/dts/framework/testbed_model/traffic_generator/scapy.py
index 13fc1107aa..07f11b7f78 100644
--- a/dts/framework/testbed_model/traffic_generator/scapy.py
+++ b/dts/framework/testbed_model/traffic_generator/scapy.py
@@ -6,311 +6,150 @@
 
 A traffic generator used for functional testing, implemented with
 `the Scapy library <https://scapy.readthedocs.io/en/latest/>`_.
-The traffic generator uses an XML-RPC server to run Scapy on the remote TG node.
+The traffic generator uses an interactive shell to run Scapy on the remote TG node.
 
-The traffic generator uses the :mod:`xmlrpc.server` module to run an XML-RPC server
-in an interactive remote Python SSH session. The communication with the server is facilitated
-with a local server proxy from the :mod:`xmlrpc.client` module.
+The traffic generator extends :class:`framework.remote_session.python_shell.PythonShell` to
+implement the methods for handling packets by sending commands into the interactive shell.
 """
 
-import inspect
-import marshal
+
+import re
 import time
-import types
-import xmlrpc.client
-from xmlrpc.server import SimpleXMLRPCServer
+from typing import ClassVar
 
-import scapy.all  # type: ignore[import-untyped]
+from scapy.compat import base64_bytes  # type: ignore[import-untyped]
 from scapy.layers.l2 import Ether  # type: ignore[import-untyped]
 from scapy.packet import Packet  # type: ignore[import-untyped]
 
 from framework.config import OS, ScapyTrafficGeneratorConfig
 from framework.remote_session.python_shell import PythonShell
-from framework.settings import SETTINGS
 from framework.testbed_model.node import Node
 from framework.testbed_model.port import Port
-
-from .capturing_traffic_generator import (
-    CapturingTrafficGenerator,
+from framework.testbed_model.traffic_generator.capturing_traffic_generator import (
     PacketFilteringConfig,
-    _get_default_capture_name,
 )
+from framework.utils import REGEX_FOR_BASE64_ENCODING
 
-"""
-========= BEGIN RPC FUNCTIONS =========
-
-All of the functions in this section are intended to be exported to a python
-shell which runs a scapy RPC server. These functions are made available via that
-RPC server to the packet generator. To add a new function to the RPC server,
-first write the function in this section. Then, if you need any imports, make sure to
-add them to SCAPY_RPC_SERVER_IMPORTS as well. After that, add the function to the list
-in EXPORTED_FUNCTIONS. Note that kwargs (keyword arguments) do not work via xmlrpc,
-so you may need to construct wrapper functions around many scapy types.
-"""
+from .capturing_traffic_generator import CapturingTrafficGenerator
 
-"""
-Add the line needed to import something in a normal python environment
-as an entry to this array. It will be imported before any functions are
-sent to the server.
-"""
-SCAPY_RPC_SERVER_IMPORTS = [
-    "from scapy.all import *",
-    "import xmlrpc",
-    "import sys",
-    "from xmlrpc.server import SimpleXMLRPCServer",
-    "import marshal",
-    "import pickle",
-    "import types",
-    "import time",
-]
-
-
-def scapy_send_packets_and_capture(
-    xmlrpc_packets: list[xmlrpc.client.Binary],
-    send_iface: str,
-    recv_iface: str,
-    duration: float,
-    sniff_filter: str,
-) -> list[bytes]:
-    """The RPC function to send and capture packets.
-
-    This function is meant to be executed on the remote TG node via the server proxy.
-
-    Args:
-        xmlrpc_packets: The packets to send. These need to be converted to
-            :class:`~xmlrpc.client.Binary` objects before sending to the remote server.
-        send_iface: The logical name of the egress interface.
-        recv_iface: The logical name of the ingress interface.
-        duration: Capture for this amount of time, in seconds.
-
-    Returns:
-        A list of bytes. Each item in the list represents one packet, which needs
-        to be converted back upon transfer from the remote node.
-    """
-    scapy_packets = [scapy.all.Packet(packet.data) for packet in xmlrpc_packets]
-    sniffer = scapy.all.AsyncSniffer(
-        iface=recv_iface,
-        store=True,
-        started_callback=lambda *args: scapy.all.sendp(scapy_packets, iface=send_iface),
-        filter=sniff_filter,
-    )
-    sniffer.start()
-    time.sleep(duration)
-    return [scapy_packet.build() for scapy_packet in sniffer.stop(join=True)]
-
-
-def scapy_send_packets(xmlrpc_packets: list[xmlrpc.client.Binary], send_iface: str) -> None:
-    """The RPC function to send packets.
-
-    This function is meant to be executed on the remote TG node via the server proxy.
-    It only sends `xmlrpc_packets`, without capturing them.
-
-    Args:
-        xmlrpc_packets: The packets to send. These need to be converted to
-            :class:`~xmlrpc.client.Binary` objects before sending to the remote server.
-        send_iface: The logical name of the egress interface.
-    """
-    scapy_packets = [scapy.all.Packet(packet.data) for packet in xmlrpc_packets]
-    scapy.all.sendp(scapy_packets, iface=send_iface, realtime=True, verbose=True)
 
+class ScapyTrafficGenerator(PythonShell, CapturingTrafficGenerator):
+    """Provides access to scapy functions on a traffic generator node.
 
-"""
-Functions to be exposed by the scapy RPC server.
-"""
-RPC_FUNCTIONS = [
-    scapy_send_packets,
-    scapy_send_packets_and_capture,
-]
+    This class extends the base with remote execution of scapy functions. All methods for
+    processing packets are implemented using an underlying
+    :class:`framework.remote_session.python_shell.PythonShell` which imports the Scapy library.
 
-"""
-========= END RPC FUNCTIONS =========
-"""
-
-
-class QuittableXMLRPCServer(SimpleXMLRPCServer):
-    r"""Basic XML-RPC server.
-
-    The server may be augmented by functions serializable by the :mod:`marshal` module.
-
-    Example:
-        ::
+    Because of the dual-inheritance, this class has both methods that wrap scapy commands sent into
+    the shell and methods that run locally to fulfill traffic generation needs. To help make a
+    clear distinction between the two, the names of the methods that wrap the logic of the
+    underlying shell should be prepended with "shell".
 
-            def hello_world():
-                # to be sent to the XML-RPC server
-                print("Hello World!")
-
-            # start the XML-RPC server on the remote node
-            # this is done by starting a Python shell on the remote node
-            from framework.remote_session import PythonShell
-            # the example assumes you're already connected to a tg_node
-            session = tg_node.create_interactive_shell(PythonShell, timeout=5, privileged=True)
-
-            # then importing the modules needed to run the server
-            # and the modules for any functions later added to the server
-            session.send_command("import xmlrpc")
-            session.send_command("from xmlrpc.server import SimpleXMLRPCServer")
-
-            # sending the source code of this class to the Python shell
-            from xmlrpc.server import SimpleXMLRPCServer
-            src = inspect.getsource(QuittableXMLRPCServer)
-            src = "\n".join([l for l in src.splitlines() if not l.isspace() and l != ""])
-            spacing = "\n" * 4
-            session.send_command(spacing + src + spacing)
-
-            # then starting the server with:
-            command = "s = QuittableXMLRPCServer(('0.0.0.0', {listen_port}));s.serve_forever()"
-            session.send_command(command, "XMLRPC OK")
+    Note that the order of inheritance is important for this class. In order to instantiate this
+    class, the abstract methods of :class:`~.capturing_traffic_generator.CapturingTrafficGenerator`
+    must be implemented. Since some of these methods are implemented in the underlying interactive
+    shell, according to Python's Method Resolution Order (MRO), the interactive shell must come
+    first.
+    """
 
-            # now the server is running on the remote node and we can add functions to it
-            # first connect to the server from the execution node
-            import xmlrpc.client
-            server_url = f"http://{tg_node.config.hostname}:8000"
-            rpc_server_proxy = xmlrpc.client.ServerProxy(server_url)
+    _config: ScapyTrafficGeneratorConfig
 
-            # get the function bytes to send
-            import marshal
-            function_bytes = marshal.dumps(hello_world.__code__)
-            rpc_server_proxy.add_rpc_function(hello_world.__name__, function_bytes)
+    #: Name of sniffer to ensure the same is used in all places
+    _sniffer_name: ClassVar[str] = "sniffer"
+    #: Name of variable that points to the list of packets inside the scapy shell.
+    _send_packet_list_name: ClassVar[str] = "packets"
+    #: Padding to add to the start of a line for python syntax compliance.
+    _python_indentation: ClassVar[str] = " " * 4
 
-            # now we can execute the function on the server
-            xmlrpc_binary_recv: xmlrpc.client.Binary = rpc_server_proxy.hello_world()
-            print(str(xmlrpc_binary_recv))
-    """
+    def __init__(self, tg_node: Node, config: ScapyTrafficGeneratorConfig, **kwargs):
+        """Extend the constructor with Scapy TG specifics.
 
-    def __init__(self, *args, **kwargs):
-        """Extend the XML-RPC server initialization.
+        Initializes both the traffic generator and the interactive shell used to handle Scapy
+        functions. The interactive shell will be started on `tg_node`. The additional key-word
+        arguments in `kwargs` are used to pass into the constructor for the interactive shell.
 
         Args:
-            args: The positional arguments that will be passed to the superclass's constructor.
-            kwargs: The keyword arguments that will be passed to the superclass's constructor.
-                The `allow_none` argument will be set to :data:`True`.
+            tg_node: The node where the traffic generator resides.
+            config: The traffic generator's test run configuration.
+            kwargs: Keyword arguments all traffic generators must support in order to allow for
+                multiple-inheritance. Values passed into this parameter should correspond to the
+                parameters of :meth:`PythonShell.__init__` in this case.
         """
-        kwargs["allow_none"] = True
-        super().__init__(*args, **kwargs)
-        self.register_introspection_functions()
-        self.register_function(self.quit)
-        self.register_function(self.add_rpc_function)
+        assert (
+            tg_node.config.os == OS.linux
+        ), "Linux is the only supported OS for scapy traffic generation"
 
-    def quit(self) -> None:
-        """Quit the server."""
-        self._BaseServer__shutdown_request = True
-        return None
+        super().__init__(tg_node, config=config, **kwargs)
+        self.start_application()
 
-    def add_rpc_function(self, name: str, function_bytes: xmlrpc.client.Binary) -> None:
-        """Add a function to the server from the local server proxy.
+    def start_application(self) -> None:
+        """Extends :meth:`framework.remote_session.interactive_shell.start_application`.
 
-        Args:
-              name: The name of the function.
-              function_bytes: The code of the function.
+        Adds a command that imports everything from the scapy library immediately after starting
+        the shell for usage in later calls to the methods of this class.
         """
-        function_code = marshal.loads(function_bytes.data)
-        function = types.FunctionType(function_code, globals(), name)
-        self.register_function(function)
+        super().start_application()
+        self.send_command("from scapy.all import *")
 
-    def serve_forever(self, poll_interval: float = 0.5) -> None:
-        """Extend the superclass method with an additional print.
+    def _send_packets(self, packets: list[Packet], port: Port) -> None:
+        """Implementation for sending packets without capturing any received traffic.
 
-        Once executed in the local server proxy, the print gives us a clear string to expect
-        when starting the server. The print means this function was executed on the XML-RPC server.
+        Provides a "fire and forget" method of sending packets.
         """
-        print("XMLRPC OK")
-        super().serve_forever(poll_interval)
-
-
-class ScapyTrafficGenerator(CapturingTrafficGenerator):
-    """Provides access to scapy functions via an RPC interface.
-
-    This class extends the base with remote execution of scapy functions.
-
-    Any packets sent to the remote server are first converted to bytes. They are received as
-    :class:`~xmlrpc.client.Binary` objects on the server side. When the server sends the packets
-    back, they are also received as :class:`~xmlrpc.client.Binary` objects on the client side, are
-    converted back to :class:`~scapy.packet.Packet` objects and only then returned from the methods.
-
-    Attributes:
-        session: The exclusive interactive remote session created by the Scapy
-            traffic generator where the XML-RPC server runs.
-        rpc_server_proxy: The object used by clients to execute functions
-            on the XML-RPC server.
-    """
-
-    session: PythonShell
-    rpc_server_proxy: xmlrpc.client.ServerProxy
-    _config: ScapyTrafficGeneratorConfig
-
-    def __init__(self, tg_node: Node, config: ScapyTrafficGeneratorConfig):
-        """Extend the constructor with Scapy TG specifics.
-
-        The traffic generator first starts an XML-RPC on the remote `tg_node`.
-        Then it populates the server with functions which use the Scapy library
-        to send/receive traffic:
+        self._shell_set_packet_list(packets)
+        send_command = [
+            "sendp(",
+            f"{self._send_packet_list_name},",
+            f"iface='{port.logical_name}',",
+            "realtime=True,",
+            "verbose=True",
+            ")",
+        ]
+        self.send_command(f"\n{self._python_indentation}".join(send_command))
 
-            * :func:`scapy_send_packets_and_capture`
-            * :func:`scapy_send_packets`
+    def _send_packets_and_capture(
+        self,
+        packets: list[Packet],
+        send_port: Port,
+        recv_port: Port,
+        filter_config: PacketFilteringConfig,
+        duration: float,
+    ) -> list[Packet]:
+        """Implementation for sending packets and capturing any received traffic.
 
-        To enable verbose logging from the xmlrpc client, use the :option:`--verbose`
-        command line argument or the :envvar:`DTS_VERBOSE` environment variable.
+        This method first creates an asynchronous sniffer that holds the packets to send, then
+        starts and stops said sniffer, collecting any packets that it had received while it was
+        running.
 
-        Args:
-            tg_node: The node where the traffic generator resides.
-            config: The traffic generator's test run configuration.
+        Returns:
+            A list of packets received after sending `packets`.
         """
-        super().__init__(tg_node, config)
-
-        assert (
-            self._tg_node.config.os == OS.linux
-        ), "Linux is the only supported OS for scapy traffic generation"
-
-        self.session = PythonShell(
-            self._tg_node, timeout=5, privileged=True, name="ScapyXMLRPCServer"
+        self._shell_create_sniffer(
+            packets, send_port, recv_port, self._create_packet_filter(filter_config)
         )
+        return self._shell_start_and_stop_sniffing(duration)
 
-        self.session.start_application()
+    def _shell_set_packet_list(self, packets: list[Packet]) -> None:
+        """Build a list of packets to send later.
 
-        # import libs in remote python console
-        for import_statement in SCAPY_RPC_SERVER_IMPORTS:
-            self.session.send_command(import_statement)
+        Sends the string that represents the Python command that was used to create each packet in
+        `packets` into the underlying Python session. The purpose behind doing this is to create a
+        list that is identical to `packets` inside the shell. This method should only be called by
+        methods for sending packets immediately prior to sending. The list of packets will continue
+        to exist in the scope of the shell until subsequent calls to this method, so failure to
+        rebuild the list prior to sending packets could lead to undesired "stale" packets to be
+        sent.
 
-        # start the server
-        xmlrpc_server_listen_port = 8000
-        self._start_xmlrpc_server_in_remote_python(xmlrpc_server_listen_port)
-
-        # connect to the server
-        server_url = f"http://{self._tg_node.config.hostname}:{xmlrpc_server_listen_port}"
-        self.rpc_server_proxy = xmlrpc.client.ServerProxy(
-            server_url, allow_none=True, verbose=SETTINGS.verbose
-        )
-
-        # add functions to the server
-        for function in RPC_FUNCTIONS:
-            # A slightly hacky way to move a function to the remote server.
-            # It is constructed from the name and code on the other side.
-            # Pickle cannot handle functions, nor can any of the other serialization
-            # frameworks aside from the libraries used to generate pyc files, which
-            # are even more messy to work with.
-            function_bytes = marshal.dumps(function.__code__)
-            self.rpc_server_proxy.add_rpc_function(function.__name__, function_bytes)
-
-    def _start_xmlrpc_server_in_remote_python(self, listen_port: int) -> None:
-        # load the source of the function
-        src = inspect.getsource(QuittableXMLRPCServer)
-        # Lines with only whitespace break the repl if in the middle of a function
-        # or class, so strip all lines containing only whitespace
-        src = "\n".join([line for line in src.splitlines() if not line.isspace() and line != ""])
-
-        # execute it in the python terminal
-        self.session.send_command(src + "\n")
-        self.session.send_command(
-            f"server = QuittableXMLRPCServer(('0.0.0.0', {listen_port}));server.serve_forever()",
-            "XMLRPC OK",
+        Args:
+            packets: The list of packets to recreate in the shell.
+        """
+        self._logger.info("Building a list of packets to send.")
+        self.send_command(
+            f"{self._send_packet_list_name} = [{', '.join(map(Packet.command, packets))}]"
         )
 
-    def _send_packets(self, packets: list[Packet], port: Port) -> None:
-        packets = [packet.build() for packet in packets]
-        self.rpc_server_proxy.scapy_send_packets(packets, port.logical_name)
-
     def _create_packet_filter(self, filter_config: PacketFilteringConfig) -> str:
-        """Combines filter settings from `filter_config` into a BPF that scapy can use.
+        """Combine filter settings from `filter_config` into a BPF that scapy can use.
 
         Scapy allows for the use of Berkeley Packet Filters (BPFs) to filter what packets are
         collected based on various attributes of the packet.
@@ -331,36 +170,65 @@ def _create_packet_filter(self, filter_config: PacketFilteringConfig) -> str:
             bpf_filter.append("ether[12:2] != 0x88cc")
         return " && ".join(bpf_filter)
 
-    def _send_packets_and_capture(
-        self,
-        packets: list[Packet],
-        send_port: Port,
-        receive_port: Port,
-        filter_config: PacketFilteringConfig,
-        duration: float,
-        capture_name: str = _get_default_capture_name(),
-    ) -> list[Packet]:
-        binary_packets = [packet.build() for packet in packets]
-
-        xmlrpc_packets: list[
-            xmlrpc.client.Binary
-        ] = self.rpc_server_proxy.scapy_send_packets_and_capture(
-            binary_packets,
-            send_port.logical_name,
-            receive_port.logical_name,
-            duration,
-            self._create_packet_filter(filter_config),
-        )  # type: ignore[assignment]
-
-        scapy_packets = [Ether(packet.data) for packet in xmlrpc_packets]
-        return scapy_packets
-
-    def close(self) -> None:
-        """Close the traffic generator."""
-        try:
-            self.rpc_server_proxy.quit()
-        except ConnectionRefusedError:
-            # Because the python instance closes, we get no RPC response.
-            # Thus, this error is expected
-            pass
-        self.session.close()
+    def _shell_create_sniffer(
+        self, packets_to_send: list[Packet], send_port: Port, recv_port: Port, filter_config: str
+    ) -> None:
+        """Create an asynchronous sniffer in the shell.
+
+        A list of packets is passed to the sniffer's callback function so that they are immediately
+        sent at the time sniffing is started.
+
+        Args:
+            packets_to_send: A list of packets to send when sniffing is started.
+            send_port: The port to send the packets on when sniffing is started.
+            recv_port: The port to collect the traffic from.
+            filter_config: An optional BPF format filter to use when sniffing for packets. Omitted
+                when set to an empty string.
+        """
+        self._shell_set_packet_list(packets_to_send)
+        sniffer_commands = [
+            f"{self._sniffer_name} = AsyncSniffer(",
+            f"iface='{recv_port.logical_name}',",
+            "store=True,",
+            # *args is used in the arguments of the lambda since Scapy sends parameters to the
+            # callback function which we do not need for our purposes.
+            "started_callback=lambda *args: sendp(",
+            (
+                # Additional indentation is added to this line only for readability of the logs.
+                f"{self._python_indentation}{self._send_packet_list_name},"
+                f" iface='{send_port.logical_name}'),"
+            ),
+            ")",
+        ]
+        if filter_config:
+            sniffer_commands.insert(-1, f"filter='{filter_config}'")
+
+        self.send_command(f"\n{self._python_indentation}".join(sniffer_commands))
+
+    def _shell_start_and_stop_sniffing(self, duration: float) -> list[Packet]:
+        """Start asynchronous sniffer, run for a set `duration`, then collect received packets.
+
+        This method expects that you have first created an asynchronous sniffer inside the shell
+        and will fail if you haven't. Received packets are collected by printing the base64
+        encoding of each packet in the shell and then harvesting these encodings using regex to
+        convert back into packet objects.
+
+        Args:
+            duration: The amount of time in seconds to sniff for received packets.
+
+        Returns:
+            A list of all packets that were received while the sniffer was running.
+        """
+        sniffed_packets_name = "gathered_packets"
+        self.send_command(f"{self._sniffer_name}.start()")
+        time.sleep(duration)
+        self.send_command(f"{sniffed_packets_name} = {self._sniffer_name}.stop(join=True)")
+        # An extra newline is required here due to the nature of interactive Python shells
+        packet_strs = self.send_command(
+            f"for pakt in {sniffed_packets_name}: print(bytes_base64(pakt.build()))\n"
+        )
+        # In the string of bytes "b'XXXX'", we only want the contents ("XXXX")
+        list_of_packets_base64 = re.findall(
+            f"^b'({REGEX_FOR_BASE64_ENCODING})'", packet_strs, re.MULTILINE
+        )
+        return [Ether(base64_bytes(pakt)) for pakt in list_of_packets_base64]
diff --git a/dts/framework/testbed_model/traffic_generator/traffic_generator.py b/dts/framework/testbed_model/traffic_generator/traffic_generator.py
index 4ce1148706..176d5e9065 100644
--- a/dts/framework/testbed_model/traffic_generator/traffic_generator.py
+++ b/dts/framework/testbed_model/traffic_generator/traffic_generator.py
@@ -16,23 +16,29 @@
 from framework.logger import DTSLogger, get_dts_logger
 from framework.testbed_model.node import Node
 from framework.testbed_model.port import Port
-from framework.utils import get_packet_summaries
+from framework.utils import MultiInheritanceBaseClass, get_packet_summaries
 
 
-class TrafficGenerator(ABC):
+class TrafficGenerator(MultiInheritanceBaseClass, ABC):
     """The base traffic generator.
 
     Exposes the common public methods of all traffic generators and defines private methods
-    that must implement the traffic generation logic in subclasses.
+    that must implement the traffic generation logic in subclasses. This class also extends from
+    :class:`framework.utils.MultiInheritanceBaseClass` to allow subclasses the ability to inherit
+    from multiple classes to fulfil the traffic generating functionality without breaking
+    single-inheritance.
     """
 
     _config: TrafficGeneratorConfig
     _tg_node: Node
     _logger: DTSLogger
 
-    def __init__(self, tg_node: Node, config: TrafficGeneratorConfig):
+    def __init__(self, tg_node: Node, config: TrafficGeneratorConfig, **kwargs):
         """Initialize the traffic generator.
 
+        Additional key-word arguments can be passed through `kwargs` if needed for fulfilling other
+        constructors in the case of multiple-inheritance.
+
         Args:
             tg_node: The traffic generator node where the created traffic generator will be running.
             config: The traffic generator's test run configuration.
@@ -40,6 +46,7 @@ def __init__(self, tg_node: Node, config: TrafficGeneratorConfig):
         self._config = config
         self._tg_node = tg_node
         self._logger = get_dts_logger(f"{self._tg_node.name} {self._config.traffic_generator_type}")
+        super().__init__(tg_node, **kwargs)
 
     def send_packet(self, packet: Packet, port: Port) -> None:
         """Send `packet` and block until it is fully sent.
diff --git a/dts/framework/utils.py b/dts/framework/utils.py
index c768dd0c99..c68fc44517 100644
--- a/dts/framework/utils.py
+++ b/dts/framework/utils.py
@@ -29,6 +29,7 @@
 from .exception import ConfigurationError, InternalError
 
 REGEX_FOR_PCI_ADDRESS: str = "/[0-9a-fA-F]{4}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}.[0-9]{1}/"
+REGEX_FOR_BASE64_ENCODING: str = "[-a-zA-Z0-9+\\/]*={0,3}"
 
 
 def expand_range(range_str: str) -> list[int]:
@@ -319,3 +320,17 @@ def _make_packet() -> Packet:
         return packet / random.randbytes(usable_payload_size)
 
     return [_make_packet() for _ in range(number_of)]
+
+
+class MultiInheritanceBaseClass:
+    """A base class for classes utilizing multiple-inheritance.
+
+    This class enables its subclasses to support both single and multiple inheritance by acting as
+    a stopping point in the tree of calls to the constructors of super-classes. This class is able
+    to exist at the end of the Method Resolution Order (MRO) so that sub-classes can call super
+    without repercussion.
+    """
+
+    def __init__(self, *args, **kwargs) -> None:
+        """Call the init method of :class:`object`."""
+        super().__init__()
-- 
2.46.0
    
    
More information about the dev
mailing list