[PATCH v2 4/4] usertool: add a script to parse mbuf history dump
Shani Peretz
shperetz at nvidia.com
Tue Sep 16 17:12:07 CEST 2025
Added a Python script that parses the history dump of a mbufs
generated by rte_mbuf_objects_dump and presents it in a human-readable
format.
If an operation ID is repeated, such as in the case of a double free,
it will be highlighted in red and listed at the end of the file.
Signed-off-by: Shani Peretz <shperetz at nvidia.com>
---
usertools/dpdk-mbuf_history_parser.py | 173 ++++++++++++++++++++++++++
1 file changed, 173 insertions(+)
create mode 100755 usertools/dpdk-mbuf_history_parser.py
diff --git a/usertools/dpdk-mbuf_history_parser.py b/usertools/dpdk-mbuf_history_parser.py
new file mode 100755
index 0000000000..c39a796d5d
--- /dev/null
+++ b/usertools/dpdk-mbuf_history_parser.py
@@ -0,0 +1,173 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (c) 2023 NVIDIA Corporation & Affiliates
+
+import sys
+import re
+import os
+import enum
+
+RED = "\033[91m"
+RESET = "\033[0m"
+ENUM_PATTERN = r'enum\s+rte_mbuf_history_op\s*{([^}]+)}'
+VALUE_PATTERN = r'([A-Z_]+)\s*=\s*(\d+),\s*(?:/\*\s*(.*?)\s*\*/)?'
+HEADER_FILE = os.path.join(
+ os.path.dirname(os.path.dirname(__file__)),
+ 'lib/mbuf/rte_mbuf_history.h'
+)
+
+
+def print_history_sequence(address: str, sequence: list[str]):
+ max_op_width = max(
+ len(re.sub(r'\x1b\[[0-9;]*m', '', op)) for op in sequence
+ )
+ op_width = max_op_width
+ for i in range(0, len(sequence), 4):
+ chunk = sequence[i:i + 4]
+ formatted_ops = [f"{op:<{op_width}}" for op in chunk]
+ line = ""
+ for j, op in enumerate(formatted_ops):
+ line += op
+ if j < len(formatted_ops) - 1:
+ line += " -> "
+ if i + 4 < len(sequence):
+ line += " ->"
+ print(f"mbuf {address}: " + line)
+ print()
+
+
+def match_field(match: re.Match) -> tuple[int, str]:
+ name, value, _ = match.groups()
+ return (int(value), name.replace('RTE_MBUF_', ''))
+
+
+class HistoryEnum:
+ def __init__(self, ops: enum.Enum):
+ self.ops = ops
+
+ @staticmethod
+ def from_header(header_file: str) -> 'HistoryEnum':
+ with open(header_file, 'r') as f:
+ content = f.read()
+
+ # Extract each enum value and its comment
+ enum_content = re.search(ENUM_PATTERN, content, re.DOTALL).group(1)
+ fields = map(match_field, re.finditer(VALUE_PATTERN, enum_content))
+ fields = dict({v: k for k, v in fields})
+ return HistoryEnum(enum.Enum('HistoryOps', fields))
+
+
+class HistoryLine:
+ def __init__(self, address: str, ops: list):
+ self.address = address
+ self.ops = ops
+
+ def repeats(self) -> [list[str], str | None]:
+ repeated = None
+ sequence = []
+ for idx, op in enumerate(self.ops):
+ if idx > 0 and op == self.ops[idx - 1] and op.name != 'NEVER':
+ sequence[-1] = f"{RED}{op.name}{RESET}"
+ sequence.append(f"{RED}{op.name}{RESET}")
+ repeated = op.name
+ else:
+ sequence.append(op.name)
+ return sequence, repeated
+
+
+class HistoryMetrics:
+ def __init__(self, metrics: dict[str, int]):
+ self.metrics = metrics
+
+ def max_name_width(self) -> int:
+ return max(len(name) for name in self.metrics.keys())
+
+
+class HistoryParser:
+ def __init__(self):
+ self.history_enum = HistoryEnum.from_header(HEADER_FILE)
+
+ def parse(
+ self, dump_file: str
+ ) -> tuple[list[HistoryLine], 'HistoryMetrics']:
+ with open(dump_file, 'r') as f:
+ lines = [line for line in f.readlines() if line.strip()]
+ populated = next(line for line in lines if "Populated:" in line)
+ metrics_start = lines.index(populated)
+
+ history_lines = lines[3:metrics_start]
+ metrics_lines = lines[metrics_start:-1]
+ return (
+ self._parse_history(history_lines),
+ self._parse_metrics(metrics_lines)
+ )
+
+ def _parse_metrics(self, lines: list[str]) -> HistoryMetrics:
+ metrics = {}
+ for line in lines:
+ key, value = line.split(':', 1)
+ metrics[key] = int(value)
+ return HistoryMetrics(metrics)
+
+ def _parse_history(self, lines: list[str]) -> list[HistoryLine]:
+ # Parse the format "mbuf 0x1054b9980: 0000000000000065"
+ history_lines = []
+ for line in lines:
+ address = line.split(':')[0].split('mbuf ')[1]
+ history = line.split(':')[1]
+ history_lines.append(
+ HistoryLine(
+ address=address,
+ ops=self._parse(int(history, 16))
+ )
+ )
+ return history_lines
+
+ def _parse(self, history: int) -> list[str]:
+ ops = []
+ for _ in range(16): # 64 bits / 4 bits = 16 possible operations
+ op = history & 0xF # Extract lowest 4 bits
+ if op == 0:
+ break
+ ops.append(self.history_enum.ops(op))
+ history >>= 4
+
+ ops.reverse()
+ return ops
+
+
+def print_history_lines(history_lines: list[HistoryLine]):
+ lines = [
+ (line.address, line.repeats()) for line in history_lines
+ ]
+
+ for address, (sequence, _) in lines:
+ print_history_sequence(address, sequence)
+
+ print("=== Violations ===")
+ for address, (sequence, repeated) in lines:
+ if repeated:
+ print(f"mbuf {address} has repeated ops: {RED}{repeated}{RESET}")
+
+
+def print_metrics(metrics: HistoryMetrics):
+ print("=== Metrics Summary ===")
+ for name, value in metrics.metrics.items():
+ print(f"{name + ':':<{metrics.max_name_width() + 2}} {value}")
+
+
+def main():
+ if len(sys.argv) != 2:
+ print("Usage: {} <history_file>".format(sys.argv[0]))
+ sys.exit(1)
+
+ history_parser = HistoryParser()
+ history_lines, metrics = history_parser.parse(sys.argv[1])
+
+ print_history_lines(history_lines)
+ print()
+ print_metrics(metrics)
+
+
+if __name__ == "__main__":
+ main()
--
2.34.1
More information about the dev
mailing list