[PATCH v3 5/5] usertools/mbuf: parse mbuf history dump

Robin Jarry rjarry at redhat.com
Thu Oct 2 10:07:39 CEST 2025


Hi Thomas, Shani,

Sorry, I had completely forgotten about this patch.

Thomas Monjalon, Oct 01, 2025 at 01:25:
> From: Shani Peretz <shperetz at nvidia.com>
>
> Add a Python script that parses the history dump of mbufs
> generated by rte_mbuf_history_dump() and related functions,
> and presents it in a human-readable format.
>
> If an operation ID is repeated, such as in the case of a double free,
> it will be highlighted in red and listed at the end of the file.
>
> Signed-off-by: Shani Peretz <shperetz at nvidia.com>
> ---
>  usertools/dpdk-mbuf_history_parser.py | 173 ++++++++++++++++++++++++++
>  1 file changed, 173 insertions(+)
>  create mode 100755 usertools/dpdk-mbuf_history_parser.py
>
> diff --git a/usertools/dpdk-mbuf_history_parser.py b/usertools/dpdk-mbuf_history_parser.py
> new file mode 100755
> index 0000000000..dfb02d99be
> --- /dev/null
> +++ b/usertools/dpdk-mbuf_history_parser.py
> @@ -0,0 +1,173 @@
> +#!/usr/bin/env python3
> +# SPDX-License-Identifier: BSD-3-Clause
> +# Copyright (c) 2023 NVIDIA Corporation & Affiliates
> +

Could you add a top level docstring to this module? You can probably
copy paste the contents of the commit message here:

"""
Parse the history dump of mbufs generated by rte_mbuf_history_dump() and
related functions, and present it in a human-readable format.
"""

> +import sys
> +import re
> +import os
> +import enum

Imports are not sorted alphabetically. Could you process the file with
black[1] before submitting a respin? That way we have consistent coding
style for new python code.

[1] https://github.com/psf/black

> +
> +RED = "\033[91m"
> +RESET = "\033[0m"
> +ENUM_PATTERN = r'enum\s+rte_mbuf_history_op\s*{([^}]+)}'
> +VALUE_PATTERN = r'([A-Z_]+)\s*=\s*(\d+),\s*(?:/\*\s*(.*?)\s*\*/)?'
> +HEADER_FILE = os.path.join(
> +    os.path.dirname(os.path.dirname(__file__)),
> +    'lib/mbuf/rte_mbuf_history.h'
> +)
> +
> +
> +def print_history_sequence(address: str, sequence: list[str]):
> +    max_op_width = max(
> +        len(re.sub(r'\x1b\[[0-9;]*m', '', op)) for op in sequence
> +    )
> +    op_width = max_op_width
> +    for i in range(0, len(sequence), 4):
> +        chunk = sequence[i:i + 4]
> +        formatted_ops = [f"{op:<{op_width}}" for op in chunk]
> +        line = ""
> +        for j, op in enumerate(formatted_ops):
> +            line += op
> +            if j < len(formatted_ops) - 1:
> +                line += " -> "
> +        if i + 4 < len(sequence):
> +            line += " ->"
> +        print(f"mbuf {address}: " + line)
> +    print()
> +
> +
> +def match_field(match: re.Match) -> tuple[int, str]:
> +    name, value, _ = match.groups()
> +    return (int(value), name.replace('RTE_MBUF_', ''))
> +
> +
> +class HistoryEnum:
> +    def __init__(self, ops: enum.Enum):
> +        self.ops = ops
> +
> +    @staticmethod
> +    def from_header(header_file: str) -> 'HistoryEnum':
> +        with open(header_file, 'r') as f:
> +            content = f.read()
> +
> +        # Extract each enum value and its comment
> +        enum_content = re.search(ENUM_PATTERN, content, re.DOTALL).group(1)
> +        fields = map(match_field, re.finditer(VALUE_PATTERN, enum_content))
> +        fields = dict({v: k for k, v in fields})
> +        return HistoryEnum(enum.Enum('HistoryOps', fields))
> +
> +
> +class HistoryLine:
> +    def __init__(self, address: str, ops: list):
> +        self.address = address
> +        self.ops = ops
> +
> +    def repeats(self) -> [list[str], str | None]:
> +        repeated = None
> +        sequence = []
> +        for idx, op in enumerate(self.ops):
> +            if idx > 0 and op == self.ops[idx - 1] and op.name != 'NEVER':
> +                sequence[-1] = f"{RED}{op.name}{RESET}"
> +                sequence.append(f"{RED}{op.name}{RESET}")
> +                repeated = op.name
> +            else:
> +                sequence.append(op.name)
> +        return sequence, repeated
> +
> +
> +class HistoryMetrics:
> +    def __init__(self, metrics: dict[str, int]):
> +        self.metrics = metrics
> +
> +    def max_name_width(self) -> int:
> +        return max(len(name) for name in self.metrics.keys())
> +
> +
> +class HistoryParser:
> +    def __init__(self):
> +        self.history_enum = HistoryEnum.from_header(HEADER_FILE)
> +
> +    def parse(
> +        self, dump_file: str
> +    ) -> tuple[list[HistoryLine], 'HistoryMetrics']:
> +        with open(dump_file, 'r') as f:
> +            lines = [line for line in f.readlines() if line.strip()]
> +            populated = next(line for line in lines if "  populated=" in line)
> +            metrics_start = lines.index(populated)
> +
> +        history_lines = lines[3:metrics_start]
> +        metrics_lines = lines[metrics_start:-1]
> +        return (
> +            self._parse_history(history_lines),
> +            self._parse_metrics(metrics_lines)
> +        )
> +
> +    def _parse_metrics(self, lines: list[str]) -> HistoryMetrics:
> +        metrics = {}
> +        for line in lines:
> +            key, value = line.split('=', 1)
> +            metrics[key] = int(value)
> +        return HistoryMetrics(metrics)
> +
> +    def _parse_history(self, lines: list[str]) -> list[HistoryLine]:
> +        # Parse the format "mbuf 0x1054b9980: 0000000000000065"
> +        history_lines = []
> +        for line in lines:
> +            address = line.split(':')[0].split('mbuf ')[1]
> +            history = line.split(':')[1]
> +            history_lines.append(
> +                HistoryLine(
> +                    address=address,
> +                    ops=self._parse(int(history, 16))
> +                )
> +            )
> +        return history_lines
> +
> +    def _parse(self, history: int) -> list[str]:
> +        ops = []
> +        for _ in range(16):  # 64 bits / 4 bits = 16 possible operations
> +            op = history & 0xF  # Extract lowest 4 bits
> +            if op == 0:
> +                break
> +            ops.append(self.history_enum.ops(op))
> +            history >>= 4
> +
> +        ops.reverse()
> +        return ops
> +
> +
> +def print_history_lines(history_lines: list[HistoryLine]):
> +    lines = [
> +        (line.address, line.repeats()) for line in history_lines
> +    ]
> +
> +    for address, (sequence, _) in lines:
> +        print_history_sequence(address, sequence)
> +
> +    print("=== Violations ===")
> +    for address, (sequence, repeated) in lines:
> +        if repeated:
> +            print(f"mbuf {address} has repeated ops: {RED}{repeated}{RESET}")
> +
> +
> +def print_metrics(metrics: HistoryMetrics):
> +    print("=== Metrics Summary ===")
> +    for name, value in metrics.metrics.items():
> +        print(f"{name + ':':<{metrics.max_name_width() + 2}} {value}")
> +
> +
> +def main():
> +    if len(sys.argv) != 2:
> +        print("Usage: {} <history_file>".format(sys.argv[0]))
> +        sys.exit(1)

Could you use argparse for this? I know it is a bit overkill but it
takes care of the usage, help and error messages for you.

parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("history_file")
args = parser.parse_args()

> +
> +    history_parser = HistoryParser()
> +    history_lines, metrics = history_parser.parse(sys.argv[1])

history_lines, metrics = history_parser.parse(args.history_file)

> +
> +    print_history_lines(history_lines)
> +    print()
> +    print_metrics(metrics)
> +
> +
> +if __name__ == "__main__":
> +    main()


-- 
Robin

> No motorized vehicles allowed.



More information about the dev mailing list