[PATCH v6 1/4] usertools/cpu_layout: update coding style
Anatoly Burakov
anatoly.burakov at intel.com
Thu Aug 22 12:38:23 CEST 2024
Update coding style:
- make it PEP-484 compliant
- format code with Ruff
- address all mypy etc. warnings
- use f-strings in place of old-style string interpolation
- refactor printing to make the code more readable
- read valid CPU IDs from "online" sysfs node
Signed-off-by: Anatoly Burakov <anatoly.burakov at intel.com>
Acked-by: Robin Jarry <rjarry at redhat.com>
---
Notes:
v4->v5:
- Format with Ruff on default settings
v3->v4:
- Format with Ruff, line width 79
v1,v2 -> v3:
- Import typing as T instead of individual types
usertools/cpu_layout.py | 161 ++++++++++++++++++++++++++--------------
1 file changed, 106 insertions(+), 55 deletions(-)
diff --git a/usertools/cpu_layout.py b/usertools/cpu_layout.py
index 891b9238fa..e133fb8ad3 100755
--- a/usertools/cpu_layout.py
+++ b/usertools/cpu_layout.py
@@ -3,62 +3,113 @@
# Copyright(c) 2010-2014 Intel Corporation
# Copyright(c) 2017 Cavium, Inc. All rights reserved.
-sockets = []
-cores = []
-core_map = {}
-base_path = "/sys/devices/system/cpu"
-fd = open("{}/kernel_max".format(base_path))
-max_cpus = int(fd.read())
-fd.close()
-for cpu in range(max_cpus + 1):
- try:
- fd = open("{}/cpu{}/topology/core_id".format(base_path, cpu))
- except IOError:
- continue
- core = int(fd.read())
- fd.close()
- fd = open("{}/cpu{}/topology/physical_package_id".format(base_path, cpu))
- socket = int(fd.read())
- fd.close()
- if core not in cores:
- cores.append(core)
- if socket not in sockets:
- sockets.append(socket)
- key = (socket, core)
- if key not in core_map:
- core_map[key] = []
- core_map[key].append(cpu)
+"""Display CPU topology information."""
-print(format("=" * (47 + len(base_path))))
-print("Core and Socket Information (as reported by '{}')".format(base_path))
-print("{}\n".format("=" * (47 + len(base_path))))
-print("cores = ", cores)
-print("sockets = ", sockets)
-print("")
+import typing as T
-max_processor_len = len(str(len(cores) * len(sockets) * 2 - 1))
-max_thread_count = len(list(core_map.values())[0])
-max_core_map_len = (max_processor_len * max_thread_count) \
- + len(", ") * (max_thread_count - 1) \
- + len('[]') + len('Socket ')
-max_core_id_len = len(str(max(cores)))
-output = " ".ljust(max_core_id_len + len('Core '))
-for s in sockets:
- output += " Socket %s" % str(s).ljust(max_core_map_len - len('Socket '))
-print(output)
-
-output = " ".ljust(max_core_id_len + len('Core '))
-for s in sockets:
- output += " --------".ljust(max_core_map_len)
- output += " "
-print(output)
-
-for c in cores:
- output = "Core %s" % str(c).ljust(max_core_id_len)
- for s in sockets:
- if (s, c) in core_map:
- output += " " + str(core_map[(s, c)]).ljust(max_core_map_len)
def range_expand(rstr: str) -> T.List[int]:
    """Expand a comma-separated list/range string into a list of integers.

    Each comma-separated item is either a single number (``"5"``) or an
    inclusive range (``"1-3"``).

    Args:
        rstr: List string, e.g. ``"0,1-3"``. Whitespace around items is
            ignored; an empty or blank string denotes an empty list.

    Returns:
        The expanded values in the order they appear, e.g. ``[0, 1, 2, 3]``.

    Raises:
        ValueError: If an item is neither an integer nor a ``start-end``
            range of integers.
    """
    valset: T.List[int] = []
    # 0,1-3 => ["0", "1-3"]
    for item in rstr.split(","):
        item = item.strip()
        if not item:
            # "".split(",") yields [""]; an empty list string (or a stray
            # comma) must contribute nothing instead of failing in int("").
            continue
        # 1-3 => [1, 2, 3]
        start, dash, end = item.partition("-")
        if dash:
            valset.extend(range(int(start), int(end) + 1))
        else:
            valset.append(int(item))
    return valset
+
+
def read_sysfs(path: str) -> str:
    """Return the text stored in the file at *path*, whitespace-trimmed."""
    with open(path, encoding="utf-8") as node:
        contents = node.read()
    # Drop leading/trailing whitespace, including the final newline.
    return contents.strip()
+
+
def print_row(row: T.Tuple[str, ...], col_widths: T.List[int]) -> None:
    """Print one table line, padding each cell to its column width.

    The first cell is left-justified and followed by a 4-space gap; every
    remaining cell is right-justified and followed by a 10-space gap.
    """
    label, *cells = row
    label_width, *cell_widths = col_widths
    label_gap = " " * 4
    cell_gap = " " * 10

    # Assemble the whole line first, then emit it with a single print().
    pieces = [label.ljust(label_width), label_gap]
    for text, width in zip(cells, cell_widths):
        pieces.append(text.rjust(width))
        pieces.append(cell_gap)
    print("".join(pieces))
+
+
def print_section(heading: str) -> None:
    """Print *heading* framed by '=' rules, followed by a blank line."""
    rule = "=" * len(heading)
    # rule / heading / rule / empty line, one per output line
    print(rule, heading, rule, "", sep="\n")
+
+
def main() -> None:
    """Collect CPU topology from sysfs and print it as a core/socket table."""
    base_path = "/sys/devices/system/cpu"
    socket_ids: T.Set[int] = set()
    core_ids: T.Set[int] = set()
    # (socket, core) -> logical CPUs that live on that physical core
    core_map: T.Dict[T.Tuple[int, int], T.List[int]] = {}

    for cpu in range_expand(read_sysfs(f"{base_path}/online")):
        topology = f"{base_path}/cpu{cpu}/topology"
        core = int(read_sysfs(f"{topology}/core_id"))
        socket = int(read_sysfs(f"{topology}/physical_package_id"))

        core_ids.add(core)
        socket_ids.add(socket)
        core_map.setdefault((socket, core), []).append(cpu)

    cores = sorted(core_ids)
    sockets = sorted(socket_ids)

    print_section(f"Core and Socket Information (as reported by '{base_path}')")

    print("cores = ", cores)
    print("sockets = ", sockets)
    print()

    # Header line: an empty corner cell, then one title per socket.
    header: T.Tuple[str, ...] = ("",) + tuple(f"Socket {s}" for s in sockets)
    underline = tuple("-" * len(title) for title in header)

    # One table line per core: its label, then the lcore list for each socket
    # (left empty where that socket has no such core).
    table: T.List[T.Tuple[str, ...]] = []
    for c in cores:
        cells = tuple(
            str(core_map[(s, c)]) if (s, c) in core_map else ""
            for s in sockets
        )
        table.append((f"Core {c}",) + cells)

    # Each column is as wide as its longest cell, header included.
    col_widths = [
        max(len(line[idx]) for line in [*table, header])
        for idx in range(len(header))
    ]

    print_row(header, col_widths)
    print_row(underline, col_widths)
    for line in table:
        print_row(line, col_widths)


if __name__ == "__main__":
    main()
--
2.43.5
More information about the dev
mailing list