[PATCH v4 1/4] usertools/cpu_layout: update coding style
    Anatoly Burakov 
    anatoly.burakov at intel.com
       
    Wed Aug 21 11:22:36 CEST 2024
    
    
  
Update coding style:
- make it PEP-484 compliant
- address all flake8, mypy etc. warnings
- use f-strings in place of old-style string interpolation
- refactor printing to make the code more readable
- read valid CPU ID's from "online" sysfs node
Signed-off-by: Anatoly Burakov <anatoly.burakov at intel.com>
---
Notes:
    v3->v4:
    - Format with Ruff, line width 79
    
    v1,v2 -> v3:
    - Import typing as T instead of individual types
 usertools/cpu_layout.py | 163 ++++++++++++++++++++++++++--------------
 1 file changed, 108 insertions(+), 55 deletions(-)
diff --git a/usertools/cpu_layout.py b/usertools/cpu_layout.py
index 891b9238fa..8812ea286b 100755
--- a/usertools/cpu_layout.py
+++ b/usertools/cpu_layout.py
@@ -3,62 +3,115 @@
 # Copyright(c) 2010-2014 Intel Corporation
 # Copyright(c) 2017 Cavium, Inc. All rights reserved.
 
-sockets = []
-cores = []
-core_map = {}
-base_path = "/sys/devices/system/cpu"
-fd = open("{}/kernel_max".format(base_path))
-max_cpus = int(fd.read())
-fd.close()
-for cpu in range(max_cpus + 1):
-    try:
-        fd = open("{}/cpu{}/topology/core_id".format(base_path, cpu))
-    except IOError:
-        continue
-    core = int(fd.read())
-    fd.close()
-    fd = open("{}/cpu{}/topology/physical_package_id".format(base_path, cpu))
-    socket = int(fd.read())
-    fd.close()
-    if core not in cores:
-        cores.append(core)
-    if socket not in sockets:
-        sockets.append(socket)
-    key = (socket, core)
-    if key not in core_map:
-        core_map[key] = []
-    core_map[key].append(cpu)
+"""Display CPU topology information."""
 
-print(format("=" * (47 + len(base_path))))
-print("Core and Socket Information (as reported by '{}')".format(base_path))
-print("{}\n".format("=" * (47 + len(base_path))))
-print("cores = ", cores)
-print("sockets = ", sockets)
-print("")
+import typing as T
 
-max_processor_len = len(str(len(cores) * len(sockets) * 2 - 1))
-max_thread_count = len(list(core_map.values())[0])
-max_core_map_len = (max_processor_len * max_thread_count)  \
-                      + len(", ") * (max_thread_count - 1) \
-                      + len('[]') + len('Socket ')
-max_core_id_len = len(str(max(cores)))
 
-output = " ".ljust(max_core_id_len + len('Core '))
-for s in sockets:
-    output += " Socket %s" % str(s).ljust(max_core_map_len - len('Socket '))
-print(output)
-
-output = " ".ljust(max_core_id_len + len('Core '))
-for s in sockets:
-    output += " --------".ljust(max_core_map_len)
-    output += " "
-print(output)
-
-for c in cores:
-    output = "Core %s" % str(c).ljust(max_core_id_len)
-    for s in sockets:
-        if (s, c) in core_map:
-            output += " " + str(core_map[(s, c)]).ljust(max_core_map_len)
+def range_expand(rstr: str) -> T.List[int]:
+    """Expand a range string into a list of integers."""
+    # 0,1-3 => [0, 1-3]
+    ranges = rstr.split(",")
+    valset: T.List[int] = []
+    for r in ranges:
+        # 1-3 => [1, 2, 3]
+        if "-" in r:
+            start, end = r.split("-")
+            valset.extend(range(int(start), int(end) + 1))
         else:
-            output += " " * (max_core_map_len + 1)
-    print(output)
+            valset.append(int(r))
+    return valset
+
+
+def read_sysfs(path: str) -> str:
+    """Read a sysfs file and return its contents."""
+    with open(path, encoding="utf-8") as fd:
+        return fd.read().strip()
+
+
+def print_row(row: T.Tuple[str, ...], col_widths: T.List[int]) -> None:
+    """Print a row of a table with the given column widths."""
+    first, *rest = row
+    w_first, *w_rest = col_widths
+    first_end = " " * 4
+    rest_end = " " * 10
+
+    print(first.ljust(w_first), end=first_end)
+    for cell, width in zip(rest, w_rest):
+        print(cell.rjust(width), end=rest_end)
+    print()
+
+
+def print_section(heading: str) -> None:
+    """Print a section heading."""
+    sep = "=" * len(heading)
+    print(sep)
+    print(heading)
+    print(sep)
+    print()
+
+
+def main() -> None:
+    """Print CPU topology information."""
+    sockets_s: T.Set[int] = set()
+    cores_s: T.Set[int] = set()
+    core_map: T.Dict[T.Tuple[int, int], T.List[int]] = {}
+    base_path = "/sys/devices/system/cpu"
+
+    cpus = range_expand(read_sysfs(f"{base_path}/online"))
+
+    for cpu in cpus:
+        lcore_base = f"{base_path}/cpu{cpu}"
+        core = int(read_sysfs(f"{lcore_base}/topology/core_id"))
+        socket = int(read_sysfs(f"{lcore_base}/topology/physical_package_id"))
+
+        cores_s.add(core)
+        sockets_s.add(socket)
+        key = (socket, core)
+        core_map.setdefault(key, [])
+        core_map[key].append(cpu)
+
+    cores = sorted(cores_s)
+    sockets = sorted(sockets_s)
+
+    print_section(
+        f"Core and Socket Information (as reported by '{base_path}')"
+    )
+
+    print("cores = ", cores)
+    print("sockets = ", sockets)
+    print()
+
+    # Core, [Socket, Socket, ...]
+    heading_strs = "", *[f"Socket {s}" for s in sockets]
+    sep_strs = tuple("-" * len(hstr) for hstr in heading_strs)
+    rows: T.List[T.Tuple[str, ...]] = []
+
+    for c in cores:
+        # Core,
+        row: T.Tuple[str, ...] = (f"Core {c}",)
+
+        # [lcores, lcores, ...]
+        for s in sockets:
+            try:
+                lcores = core_map[(s, c)]
+                row += (str(lcores),)
+            except KeyError:
+                row += ("",)
+        rows += [row]
+
+    # find max widths for each column, including header and rows
+    col_widths = [
+        max(len(tup[col_idx]) for tup in rows + [heading_strs])
+        for col_idx in range(len(heading_strs))
+    ]
+
+    # print out table taking row widths into account
+    print_row(heading_strs, col_widths)
+    print_row(sep_strs, col_widths)
+    for row in rows:
+        print_row(row, col_widths)
+
+
+if __name__ == "__main__":
+    main()
-- 
2.43.5
    
    
More information about the dev
mailing list