[RFC PATCH v1 1/2] dts: Add interactive shell for managing Scapy
Jeremy Spewock
jspewock at iol.unh.edu
Mon Jun 17 21:45:41 CEST 2024
On Tue, Jun 11, 2024 at 7:12 AM Juraj Linkeš <juraj.linkes at pantheon.tech> wrote:
>
>
> > diff --git a/dts/framework/remote_session/scapy_shell.py b/dts/framework/remote_session/scapy_shell.py
> > new file mode 100644
> > index 0000000000..fa647dc870
> > --- /dev/null
> > +++ b/dts/framework/remote_session/scapy_shell.py
> > @@ -0,0 +1,175 @@
> > +# SPDX-License-Identifier: BSD-3-Clause
> > +# Copyright(c) 2024 University of New Hampshire
> > +
> > +"""Scapy interactive shell."""
> > +
> > +import re
> > +import time
> > +from typing import Callable, ClassVar
> > +
> > +from scapy.compat import base64_bytes # type: ignore[import]
> > +from scapy.layers.l2 import Ether # type: ignore[import]
> > +from scapy.packet import Packet # type: ignore[import]
> > +
> > +from framework.testbed_model.port import Port
> > +from framework.utils import REGEX_FOR_BASE64_ENCODING
> > +
> > +from .python_shell import PythonShell
> > +
> > +
> > +class ScapyShell(PythonShell):
> > + """Scapy interactive shell.
> > +
> > + The scapy shell is implemented using a :class:`~.python_shell.PythonShell` and importing
> > + everything from the "scapy.all" library. This is done due to formatting issues that occur from
> > + the scapy interactive shell attempting to use iPython, which is not compatible with the
> > + pseudo-terminal that paramiko creates to manage its channels.
> > +
> > + This class is used as an underlying session for the scapy traffic generator and shouldn't be
> > + used directly inside of test suites. If there isn't a method in
> > + :class:`framework.testbed_model.traffic_generator.scapy.ScapyTrafficGenerator` to fulfill a
> > + need, one should be added there and implemented here.
> > + """
> > +
> > + #: Name of sniffer to ensure the same is used in all places
> > + _sniffer_name: ClassVar[str] = "sniffer"
> > + #: Name of variable that points to the list of packets inside the scapy shell.
> > + _send_packet_list_name: ClassVar[str] = "packets"
> > + #: Padding to add to the start of a line for python syntax compliance.
> > + _padding: ClassVar[str] = " " * 4
> > +
> > + def _start_application(self, get_privileged_command: Callable[[str], str] | None) -> None:
> > + """Overrides :meth:`~.interactive_shell._start_application`.
>
> This extends the method and in that case we should mention what the
> extension is.
Ack.
>
> > +
> > + Adds a command that imports everything from the scapy library immediately after starting
> > + the shell for usage in later calls to the methods of this class.
> > +
> > + Args:
> > + get_privileged_command: A function (but could be any callable) that produces
> > + the version of the command with elevated privileges.
> > + """
> > + super()._start_application(get_privileged_command)
> > + self.send_command("from scapy.all import *")
> > +
> > + def _build_send_packet_list(self, packets: list[Packet]) -> None:
>
> The send in the name evokes that the method sends the packets.
>
> The description in the Args section says "packets to recreate in the
> shell" and I like that so I'd put that in the name: _create_packet_list()
>
Great point. I was trying too hard to force the idea of building the
"list of packets that will be sent" but on its own this just builds a
list of packets.
> > + """Build a list of packets to send later.
> > +
> > + Gets the string that represents the Python command that was used to create each packet in
>
> Gets the string sounds like that's what the methods returns, as a getter
> method would.
Fair enough, I'll edit this.
>
> > + `packets` and sends these commands into the underlying Python session. The purpose behind
> > + doing this is to create a list that is identical to `packets` inside the shell. This method
> > + should only be called by methods for sending packets immediately prior to sending. The list
> > + of packets will continue to exist in the scope of the shell until subsequent calls to this
> > + method, so failure to rebuild the list prior to sending packets could lead to undesired
> > + "stale" packets to be sent.
> > +
> > + Args:
> > + packets: The list of packets to recreate in the shell.
> > + """
> > + self._logger.info("Building a list of packets to send...")
>
> The could be just a regular dot instead of the ellipsis (I don't like
> random ellipses as those read as if I was supposed to expect something
> and we don't provide a subsequent log that would continue this ellipsis).
Ack.
>
> > + self.send_command(
> > + f"{self._send_packet_list_name} = [{', '.join(map(Packet.command, packets))}]"
> > + )
> > +
> > + def send_packets(self, packets: list[Packet], send_port: Port) -> None:
> > + """Send packets without capturing any received traffic.
> > +
> > + Provides a "fire and forget" method for sending packets for situations when there is no
> > + need to collected any received traffic.
>
> Typo: collected
Good catch!
>
> > +
> > + Args:
> > + packets: The packets to send.
> > + send_port: The port to send the packets from.
> > + """
> > + self._build_send_packet_list(packets)
> > + send_command = [
> > + "sendp(",
> > + f"{self._send_packet_list_name},",
> > + f"iface='{send_port.logical_name}',",
> > + "realtime=True,",
> > + "verbose=True",
> > + ")",
> > + ]
> > + self.send_command(f"\n{self._padding}".join(send_command))
> > +
> > + def _create_sniffer(
> > + self, packets_to_send: list[Packet], send_port: Port, recv_port: Port, filter_config: str
> > + ) -> None:
> > + """Create an asynchronous sniffer in the shell.
> > +
> > + A list of packets to send is added to the sniffer inside of a callback function so that
> > + they are immediately sent at the time sniffing is started.
> > +
> > + Args:
> > + packets_to_send: A list of packets to send when sniffing is started.
> > + send_port: The port to send the packets on when sniffing is started.
> > + recv_port: The port to collect the traffic from.
> > + filter_config: An optional BPF format filter to use when sniffing for packets. Omitted
> > + when set to an empty string.
> > + """
> > + self._build_send_packet_list(packets_to_send)
> > + sniffer_commands = [
> > + f"{self._sniffer_name} = AsyncSniffer(",
> > + f"iface='{recv_port.logical_name}',",
> > + "store=True,",
> > + "started_callback=lambda *args: sendp(",
> > + f"{self._padding}{self._send_packet_list_name}, iface='{send_port.logical_name}'),",
> > + ")",
> > + ]
> > + if filter_config:
> > + sniffer_commands.insert(-1, f"filter='{filter_config}'")
> > +
> > + self.send_command(f"\n{self._padding}".join(sniffer_commands))
> > +
> > + def _start_and_stop_sniffing(self, duration: float) -> list[Packet]:
> > + """Starts asynchronous sniffer, runs for a set `duration`, then collects received packets.
>
> This should be in imperative to align with the rest of the docstrings.
Ack.
>
> > + > + This method expects that you have first created an
> asynchronous sniffer inside the shell
> > + and will fail if you haven't. Received packets are collected by printing the base64
> > + encoding of each packet in the shell and then harvesting these encodings using regex to
> > + convert back into packet objects.
> > +
> > + Args:
> > + duration: The amount of time in seconds to sniff for received packets.
> > +
> > + Returns:
> > + A list of all packets that were received while the sniffer was running.
> > + """
> > + sniffed_packets_name = "gathered_packets"
> > + self.send_command(f"{self._sniffer_name}.start()")
> > + time.sleep(duration)
> > + self.send_command(f"{sniffed_packets_name} = {self._sniffer_name}.stop(join=True)")
> > + # An extra newline is required here due to the nature of interactive Python shells
> > + packet_objects = self.send_command(
>
> These are strings, which are objects, but I'd like to be more explicit,
> so maybe packet_strs?
Good point, that would be more clear.
>
> > + f"for pakt in {sniffed_packets_name}: print(bytes_base64(pakt.build()))\n"
> > + )
> > + # In the string of bytes "b'XXXX'", we only want the contents ("XXXX")
> > + list_of_packets_base64 = re.findall(
> > + f"^b'({REGEX_FOR_BASE64_ENCODING})'", packet_objects, re.MULTILINE
> > + )
> > + return [Ether(base64_bytes(pakt)) for pakt in list_of_packets_base64]
> > +
> > + def send_packets_and_capture(
> > + self,
> > + packets: list[Packet],
> > + send_port: Port,
> > + recv_port: Port,
> > + filter_config: str,
> > + duration: float,
> > + ) -> list[Packet]:
> > + """Send packets and capture any received traffic.
> > +
> > + The steps required to collect these packets are creating a sniffer that holds the packets to
> > + send then starting and stopping the sniffer.
> > +
> > + Args:
> > + packets: The packets to send.
> > + send_port: The port to send the packets from.
> > + recv_port: The port to collect received packets from.
> > + filter_config: The filter to use while sniffing for packets.
> > + duration: The amount of time in seconds to sniff for received packets.
> > +
> > + Returns:
> > + A list of packets received after sending `packets`.
> > + """
> > + self._create_sniffer(packets, send_port, recv_port, filter_config)
> > + return self._start_and_stop_sniffing(duration)
More information about the dev
mailing list