board_inspector: use executables found under system paths

Using partial executable paths in the board inspector may cause unintended
results when another executable has the same name and is also detectable in
the search paths.

Introduce a wrapper module (`external_tools`) which locates executables
only under system paths such as /usr/bin and /usr/sbin and converts partial
executable paths to absolute ones before executing them via the subprocess
module. All invocations to `subprocess.run` or `subprocess.Popen`
throughout the board inspector are replaced with `external_tools.run`, with
the only exception being the invocation to the legacy board parser which
already uses an absolute path to the current Python interpreter.

Tracked-On: #8315
Signed-off-by: Junjie Mao <junjie.mao@intel.com>
This commit is contained in:
Junjie Mao 2022-11-16 18:28:15 +08:00 committed by acrnsi-robot
parent 32edd75f7f
commit ce6500893f
10 changed files with 111 additions and 68 deletions

View File

@ -20,7 +20,7 @@ script_dir = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(script_dir)) sys.path.append(os.path.join(script_dir))
from cpuparser import parse_cpuid, get_online_cpu_ids, get_offline_cpu_ids from cpuparser import parse_cpuid, get_online_cpu_ids, get_offline_cpu_ids
from inspectorlib import validator from inspectorlib import external_tools, validator
class AddLLCCATAction(argparse.Action): class AddLLCCATAction(argparse.Action):
CATInfo = namedtuple("CATInfo", ["capacity_mask_length", "clos_number", "has_CDP"]) CATInfo = namedtuple("CATInfo", ["capacity_mask_length", "clos_number", "has_CDP"])
@ -38,37 +38,29 @@ class AddLLCCATAction(argparse.Action):
def check_deps(): def check_deps():
# Check that the required tools are installed on the system # Check that the required tools are installed on the system
BIN_LIST = ['cpuid', 'rdmsr', 'lspci', ' dmidecode', 'blkid', 'stty'] had_error = not external_tools.locate_tools(["cpuid", "rdmsr", "lspci", "dmidecode", "blkid", "stty", "modprobe"])
cpuid_min_ver = 20170122
had_error = False
for execute in BIN_LIST:
res = subprocess.Popen("which {}".format(execute),
shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, close_fds=True)
line = res.stdout.readline().decode('ascii') try:
if not line: cpuid_min_ver = 20170122
logger.critical("'{}' cannot be found. Please install it and run the Board Inspector again.".format(execute)) res = external_tools.run("cpuid -v")
line = res.stdout.readline().decode("ascii")
version = line.split()[2]
if int(version) < cpuid_min_ver:
logger.critical("This tool requires CPUID version >= {}. Try updating and upgrading the OS" \
"on this system and reruning the Board Inspector. If that fails, install a newer CPUID tool" \
"from https://github.com/tycho/cpuid.".format(cpuid_min_ver))
had_error = True had_error = True
except external_tools.ExecutableNotFound:
pass
if execute == 'cpuid':
res = subprocess.Popen("cpuid -v",
shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, close_fds=True)
line = res.stdout.readline().decode('ascii')
version = line.split()[2]
if int(version) < cpuid_min_ver:
logger.critical("This tool requires CPUID version >= {}. Try updating and upgrading the OS" \
"on this system and reruning the Board Inspector. If that fails, install a newer CPUID tool" \
"from https://github.com/tycho/cpuid.".format(cpuid_min_ver))
had_error = True
if had_error: if had_error:
sys.exit(1) sys.exit(1)
# Try updating pci.ids for latest PCI device descriptions # Try updating pci.ids for latest PCI device descriptions
external_tools.locate_tools(["update-pciids"])
try: try:
logger.info("Updating pci.ids for latest PCI device descriptions.") logger.info("Updating pci.ids for latest PCI device descriptions.")
res = subprocess.Popen(["update-pciids", "-q"], stderr=subprocess.DEVNULL) res = external_tools.run("update-pciids -q", stderr=subprocess.DEVNULL)
if res.wait(timeout=40) != 0: if res.wait(timeout=40) != 0:
logger.warning(f"Failed to invoke update-pciids. No functional impact is foreseen, but descriptions of PCI devices may be inaccurate.") logger.warning(f"Failed to invoke update-pciids. No functional impact is foreseen, but descriptions of PCI devices may be inaccurate.")
except Exception as e: except Exception as e:

View File

@ -7,7 +7,6 @@
from __future__ import print_function from __future__ import print_function
import sys import sys
import subprocess # nosec
import re import re
import functools import functools
import inspect import inspect
@ -15,6 +14,7 @@ import operator
import textwrap import textwrap
import logging import logging
from collections import namedtuple from collections import namedtuple
from inspectorlib import external_tools
_wrapper = textwrap.TextWrapper(width=78, initial_indent=' ', subsequent_indent=' ') _wrapper = textwrap.TextWrapper(width=78, initial_indent=' ', subsequent_indent=' ')
regex_hex = "0x[0-9a-f]+" regex_hex = "0x[0-9a-f]+"
@ -26,7 +26,7 @@ class cpuid_result(namedtuple('cpuid_result', ['eax', 'ebx', 'ecx', 'edx'])):
return "cpuid_result(eax={eax:#010x}, ebx={ebx:#010x}, ecx={ecx:#010x}, edx={edx:#010x})".format(**self._asdict()) return "cpuid_result(eax={eax:#010x}, ebx={ebx:#010x}, ecx={ecx:#010x}, edx={edx:#010x})".format(**self._asdict())
def cpuid(cpu_id, leaf, subleaf): def cpuid(cpu_id, leaf, subleaf):
result = subprocess.run(["cpuid", "-l", str(leaf), "-s", str(subleaf), "-r"], stdout=subprocess.PIPE, check=True) result = external_tools.run(["cpuid", "-l", str(leaf), "-s", str(subleaf), "-r"], check=True)
stdout = result.stdout.decode("ascii").replace("\n", "") stdout = result.stdout.decode("ascii").replace("\n", "")
regex = re.compile(f"CPU {cpu_id}:[^:]*: eax=({regex_hex}) ebx=({regex_hex}) ecx=({regex_hex}) edx=({regex_hex})") regex = re.compile(f"CPU {cpu_id}:[^:]*: eax=({regex_hex}) ebx=({regex_hex}) ecx=({regex_hex}) edx=({regex_hex})")
m = regex.search(stdout) m = regex.search(stdout)

View File

@ -4,7 +4,6 @@
# #
import logging import logging
import subprocess
import lxml.etree import lxml.etree
import re import re

View File

@ -0,0 +1,75 @@
# Copyright (C) 2022 Intel Corporation.
#
# SPDX-License-Identifier: BSD-3-Clause
#
import os
import logging
import subprocess # nosec
available_tools = {}
class ExecutableNotFound(ValueError):
pass
def run(command, **kwargs):
"""Run the given command which can be either a string or a list."""
try:
if isinstance(command, list):
full_path = available_tools[command[0]]
kwargs.setdefault("capture_output", True)
return subprocess.run([full_path] + command[1:], **kwargs)
elif isinstance(command, str):
parts = command.split(" ", maxsplit=1)
full_path = available_tools[parts[0]]
kwargs["shell"] = True
kwargs.setdefault("stdout", subprocess.PIPE)
kwargs.setdefault("stderr", subprocess.PIPE)
kwargs.setdefault("close_fds", True)
cmd = f"{full_path} {parts[1]}" if len(parts) == 2 else full_path
return subprocess.Popen(cmd, **kwargs)
except KeyError:
raise ExecutableNotFound
assert False, f"A command is expected either a list or a string: {command}"
def detect_tool(name):
"""Detect the full path of a system tool."""
system_paths = [
"/usr/bin/",
"/usr/sbin/",
"/usr/local/bin/",
"/usr/local/sbin/",
]
# Look for `which` first.
for path in system_paths:
candidate = os.path.join(path, "which")
if os.path.exists(candidate):
available_tools["which"] = candidate
break
try:
result = run(["which", "-a", name])
for path in result.stdout.decode().strip().split("\n"):
if any(map(lambda x: path.startswith(x), system_paths)):
logging.debug(f"Use {name} found at {path}.")
return path
except ExecutableNotFound:
pass
logging.critical(f"'{name}' cannot be found. Please install it and run again.")
return None
def locate_tools(tool_list):
"""Find a list of tools under common system executable paths. Return True if and only if all tools are found."""
had_error = False
for tool in tool_list:
full_path = detect_tool(tool)
if full_path != None:
available_tools[tool] = full_path
else:
had_error = True
return not had_error

View File

@ -5,12 +5,12 @@
import os import os
import sys import sys
import subprocess # nosec
import shutil import shutil
from collections import defaultdict from collections import defaultdict
import dmar import dmar
import parser_lib import parser_lib
import logging import logging
from inspectorlib import external_tools
SYS_PATH = ['/proc/cpuinfo', '/sys/firmware/acpi/tables/', '/sys/devices/system/cpu/'] SYS_PATH = ['/proc/cpuinfo', '/sys/firmware/acpi/tables/', '/sys/devices/system/cpu/']
@ -546,14 +546,11 @@ def store_px_data(sysnode, config):
p_cnt = 0 p_cnt = 0
for freq in freqs.split(): for freq in freqs.split():
if boost != 0 and i == 0: if boost != 0 and i == 0:
try: res = external_tools.run(['rdmsr', '0x1ad'])
subprocess.check_call('/usr/sbin/rdmsr 0x1ad', shell=True, stdout=subprocess.PIPE) if res.returncode != 0:
except subprocess.CalledProcessError:
logging.debug("MSR 0x1ad not support in this platform!") logging.debug("MSR 0x1ad not support in this platform!")
return return
res = subprocess.Popen('/usr/sbin/rdmsr 0x1ad', shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
result = res.stdout.readline().strip() result = res.stdout.readline().strip()
#max_ratio_cpu = result[-2:] #max_ratio_cpu = result[-2:]
ctl_state = int(result[-2:], 16) << 8 ctl_state = int(result[-2:], 16) << 8
@ -600,11 +597,7 @@ def read_tpm_data(config):
:param config: file pointer that opened for writing board config information :param config: file pointer that opened for writing board config information
:return: :return:
''' '''
try: if os.path.exists('/sys/firmware/acpi/tables/TPM2'):
acpi_table_output = subprocess.check_output('ls -l /sys/firmware/acpi/tables/'.split()).decode('utf8')
except:
acpi_table_output = ''
if 'TPM2' in acpi_table_output:
print("\tTPM2", file=config) print("\tTPM2", file=config)
else: else:
print("\t/* no TPM device */", file=config) print("\t/* no TPM device */", file=config)

View File

@ -7,7 +7,6 @@ import os
import sys import sys
import shutil import shutil
import argparse import argparse
import subprocess # nosec
import pci_dev import pci_dev
import dmi import dmi
import acpi import acpi
@ -15,6 +14,7 @@ import clos
import misc import misc
import parser_lib import parser_lib
import logging import logging
from inspectorlib import external_tools
OUTPUT = "./out/" OUTPUT = "./out/"
PY_CACHE = "__pycache__" PY_CACHE = "__pycache__"
@ -52,12 +52,13 @@ def check_env():
if os.path.exists(PY_CACHE): if os.path.exists(PY_CACHE):
shutil.rmtree(PY_CACHE) shutil.rmtree(PY_CACHE)
if not external_tools.locate_tools(['cpuid', 'rdmsr', 'lspci', 'dmidecode', 'blkid', 'stty', 'modprobe']):
sys.exit(1)
# check cpu msr file # check cpu msr file
cpu_dirs = "/dev/cpu" cpu_dirs = "/dev/cpu"
if not check_msr_files(cpu_dirs): if not check_msr_files(cpu_dirs):
res = subprocess.Popen("modprobe msr", res = external_tools.run("modprobe msr")
shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, close_fds=True)
err_msg = res.stderr.readline().decode('ascii') err_msg = res.stderr.readline().decode('ascii')
if err_msg: if err_msg:
logging.critical("{}".format(err_msg)) logging.critical("{}".format(err_msg))

View File

@ -3,8 +3,8 @@
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #
import parser_lib
import logging import logging
from inspectorlib import external_tools
RDT_TYPE = { RDT_TYPE = {
"L2":4, "L2":4,
@ -18,7 +18,7 @@ def dump_cpuid_reg(cmd, reg):
:param cmd: command what can be executed in shell :param cmd: command what can be executed in shell
:param reg: register name :param reg: register name
""" """
res = parser_lib.cmd_execute(cmd) res = external_tools.run(cmd)
if reg == "eax": if reg == "eax":
idx = 2 idx = 2
if reg == "ebx": if reg == "ebx":
@ -27,7 +27,7 @@ def dump_cpuid_reg(cmd, reg):
idx = 5 idx = 5
while True: while True:
line = parser_lib.decode_stdout(res) line = res.stdout.readline().decode('ascii')
if not line: if not line:
break break

View File

@ -5,7 +5,7 @@
import sys import sys
import ctypes import ctypes
import parser_lib from inspectorlib import external_tools
ACPI_DMAR_TYPE = { ACPI_DMAR_TYPE = {
'ACPI_DMAR_TYPE_HARDWARE_UNIT':0, 'ACPI_DMAR_TYPE_HARDWARE_UNIT':0,
@ -193,7 +193,7 @@ def get_secondary_bus(dmar_tbl, tmp_dev, tmp_fun):
secondary_bus_str = '' secondary_bus_str = ''
found_pci_bdf = False found_pci_bdf = False
pci_bridge_header_type = False pci_bridge_header_type = False
res = parser_lib.cmd_execute(cmd) res = external_tools.run(cmd)
while True: while True:
line = res.stdout.readline().decode("ascii") line = res.stdout.readline().decode("ascii")

View File

@ -4,6 +4,7 @@
# #
import parser_lib, os import parser_lib, os
from inspectorlib import external_tools
from extractors.helpers import get_bdf_from_realpath from extractors.helpers import get_bdf_from_realpath
MEM_PATH = ['/proc/iomem', '/proc/meminfo'] MEM_PATH = ['/proc/iomem', '/proc/meminfo']
@ -35,7 +36,7 @@ def detected_ttys():
tty_used_list = [] tty_used_list = []
for s_inc in range(ttys_cnt): for s_inc in range(ttys_cnt):
cmd = 'stty -F /dev/ttyS{}'.format(s_inc) cmd = 'stty -F /dev/ttyS{}'.format(s_inc)
res = parser_lib.cmd_execute('{}'.format(cmd)) res = external_tools.run('{}'.format(cmd))
while True: while True:
line = res.stdout.readline().decode('ascii') line = res.stdout.readline().decode('ascii')

View File

@ -4,7 +4,7 @@
# #
import os import os
import subprocess # nosec from inspectorlib import external_tools
BIOS_INFO_KEY = ['BIOS Information', 'Vendor:', 'Version:', 'Release Date:', 'BIOS Revision:'] BIOS_INFO_KEY = ['BIOS Information', 'Vendor:', 'Version:', 'Release Date:', 'BIOS Revision:']
@ -45,14 +45,6 @@ def print_red(msg, err=False):
print("\033[1;31m{0}\033[0m".format(msg)) print("\033[1;31m{0}\033[0m".format(msg))
def decode_stdout(resource):
"""Decode the information and return one line of the decoded information
:param resource: it contains information produced by subprocess.Popen method
"""
line = resource.stdout.readline().decode('ascii')
return line
def handle_hw_info(line, hw_info): def handle_hw_info(line, hw_info):
"""Handle the hardware information """Handle the hardware information
:param line: one line of information which had decoded to 'ASCII' :param line: one line of information which had decoded to 'ASCII'
@ -80,16 +72,6 @@ def handle_pci_dev(line):
return False return False
def cmd_execute(cmd):
"""Excute cmd and retrun raw information
:param cmd: command what can be executed in shell
"""
res = subprocess.Popen(cmd, shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
return res
def handle_block_dev(line): def handle_block_dev(line):
"""Handle if it match root device information pattern """Handle if it match root device information pattern
:param line: one line of information which had decoded to 'ASCII' :param line: one line of information which had decoded to 'ASCII'
@ -122,7 +104,7 @@ def dump_execute(cmd, desc, config):
print("\t\t</{0}>".format(desc), file=config) print("\t\t</{0}>".format(desc), file=config)
return return
res = cmd_execute(cmd) res = external_tools.run(cmd)
while True: while True:
line = res.stdout.readline().decode('ascii') line = res.stdout.readline().decode('ascii')
line = line.replace("&", "&amp;").replace('"', "&quot;") \ line = line.replace("&", "&amp;").replace('"', "&quot;") \
@ -160,7 +142,7 @@ def dump_execute(cmd, desc, config):
def get_output_lines(cmd): def get_output_lines(cmd):
res_lines = [] res_lines = []
res = cmd_execute(cmd) res = external_tools.run(cmd)
while True: while True:
line = res.stdout.readline().decode('ascii') line = res.stdout.readline().decode('ascii')
if not line: if not line: