misc: fix Bandit scan issues for lxml

This patch fixes Bandit scan issues B313-B320, which flag XML parsing
calls that are vulnerable to XML attacks when parsing untrusted XML data.

The lxml.etree parsing calls are replaced with their equivalents from the defusedxml package.

I have confirmed that it works by running a Bandit scan, building the
configurator, and compiling ACRN.

Signed-off-by: dongpingx <dongpingx.wu@intel.com>
Tracked-On: #8717
This commit is contained in:
dongpingx 2024-08-28 16:42:02 +08:00
parent 0198edf145
commit b0f9a36008
16 changed files with 53 additions and 49 deletions

View File

@ -9,7 +9,7 @@ import sys, os, re, argparse, shutil, ctypes
from acpi_const import *
import board_cfg_lib, acrn_config_utilities
import collections
import lxml.etree
from defusedxml.lxml import parse
from acrn_config_utilities import get_node
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'board_inspector'))
@ -861,11 +861,11 @@ def main(args):
scenario= params['--scenario']
out = params['--out']
board_etree = lxml.etree.parse(board)
board_etree = parse(board)
board_root = board_etree.getroot()
scenario_etree = lxml.etree.parse(scenario)
scenario_etree = parse(scenario)
scenario_root = scenario_etree.getroot()
allocation_etree = lxml.etree.parse(os.path.join(os.path.dirname(board), "configs", "allocation.xml"))
allocation_etree = parse(os.path.join(os.path.dirname(board), "configs", "allocation.xml"))
board_type = board_root.attrib['board']
scenario_name = scenario_root.attrib['scenario']
pcpu_list = board_root.find('CPU_PROCESSOR_INFO').text.strip().split(',')

View File

@ -9,7 +9,7 @@ import logging
import subprocess # nosec
import os, sys, argparse, re, shutil
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'board_inspector'))
import lxml.etree
from defusedxml.lxml import parse
from acpi_const import *
import acpiparser.tpm2
import inspectorlib.cdata
@ -251,8 +251,8 @@ def check_iasl(iasl_path, iasl_min_ver):
def main(args):
board_etree = lxml.etree.parse(args.board)
scenario_etree = lxml.etree.parse(args.scenario)
board_etree = parse(args.board)
scenario_etree = parse(args.scenario)
scenario_name = get_node("//@scenario", scenario_etree)
@ -266,7 +266,7 @@ def main(args):
hypervisor_out = args.out
DEST_ACPI_BIN_PATH = os.path.join(hypervisor_out, 'acpi')
allocation_etree = lxml.etree.parse(os.path.join(hypervisor_out, 'configs', 'allocation.xml'))
allocation_etree = parse(os.path.join(hypervisor_out, 'configs', 'allocation.xml'))
if os.path.isdir(DEST_ACPI_BIN_PATH):
shutil.rmtree(DEST_ACPI_BIN_PATH)

View File

@ -7,7 +7,7 @@ import sys
import enum
import board_cfg_lib
import acrn_config_utilities
import lxml.etree
from defusedxml.lxml import parse
import os
from acrn_config_utilities import get_node
@ -125,7 +125,7 @@ def populate_mba_delay_mask(rdt_res, mba_delay_list, config):
idx += 1
def get_rdt_enabled():
scenario_etree = lxml.etree.parse(acrn_config_utilities.SCENARIO_INFO_FILE)
scenario_etree = parse(acrn_config_utilities.SCENARIO_INFO_FILE)
enable = scenario_etree.xpath(f"//RDT_ENABLED/text()")
if enable[0] == "y":
return "true"
@ -133,7 +133,7 @@ def get_rdt_enabled():
return "false"
def get_cdp_enabled():
scenario_etree = lxml.etree.parse(acrn_config_utilities.SCENARIO_INFO_FILE)
scenario_etree = parse(acrn_config_utilities.SCENARIO_INFO_FILE)
enable = scenario_etree.xpath(f"//CDP_ENABLED/text()")
if enable[0] == "y":
return "true"
@ -154,7 +154,7 @@ def gen_rdt_str(cache, config):
err_dic = {}
cat_mask_list = {}
board_etree = lxml.etree.parse(acrn_config_utilities.BOARD_INFO_FILE)
board_etree = parse(acrn_config_utilities.BOARD_INFO_FILE)
mask_length = get_node(f"./capability[@id='CAT']/capacity_mask_length/text()", cache)
clos_number = get_node(f"./capability[@id='CAT']/clos_number/text()", cache)
@ -220,7 +220,7 @@ def gen_rdt_str(cache, config):
def get_mask_list(cache_level, cache_id):
allocation_dir = os.path.split(acrn_config_utilities.SCENARIO_INFO_FILE)[0] + "/configs/allocation.xml"
allocation_etree = lxml.etree.parse(allocation_dir)
allocation_etree = parse(allocation_dir)
if cache_level == "3":
clos_list = allocation_etree.xpath(f"//clos_mask[@id = 'l3']/clos/text()")
else:
@ -285,9 +285,9 @@ def gen_rdt_res(config):
err_dic = {}
res_present = [0, 0, 0]
scenario_etree = lxml.etree.parse(acrn_config_utilities.SCENARIO_INFO_FILE)
allocation_etree = lxml.etree.parse(acrn_config_utilities.SCENARIO_INFO_FILE)
board_etree = lxml.etree.parse(acrn_config_utilities.BOARD_INFO_FILE)
scenario_etree = parse(acrn_config_utilities.SCENARIO_INFO_FILE)
allocation_etree = parse(acrn_config_utilities.SCENARIO_INFO_FILE)
board_etree = parse(acrn_config_utilities.BOARD_INFO_FILE)
cache_list = board_etree.xpath(f"//cache[capability/@id = 'CAT' or capability/@id = 'MBA']")
gen_clos_array(cache_list, config)
@ -410,7 +410,7 @@ def gen_px_cx(config):
def gen_pci_hide(config):
"""Generate hide pci information for this platform"""
scenario_etree = lxml.etree.parse(acrn_config_utilities.SCENARIO_INFO_FILE)
scenario_etree = parse(acrn_config_utilities.SCENARIO_INFO_FILE)
hidden_pdev_list = [x.replace('.', ':') for x in scenario_etree.xpath(f"//HIDDEN_PDEV/text()")]
if board_cfg_lib.BOARD_NAME in list(board_cfg_lib.KNOWN_HIDDEN_PDEVS_BOARD_DB.keys()) and board_cfg_lib.KNOWN_HIDDEN_PDEVS_BOARD_DB[board_cfg_lib.BOARD_NAME] != 0:
@ -458,7 +458,7 @@ def gen_known_caps_pci_devs(config):
def gen_cpufreq_limits(config):
allocation_dir = os.path.split(acrn_config_utilities.SCENARIO_INFO_FILE)[0] + "/configs/allocation.xml"
allocation_etree = lxml.etree.parse(allocation_dir)
allocation_etree = parse(allocation_dir)
cpu_list = board_cfg_lib.get_processor_info()
max_cpu_num = len(cpu_list)

View File

@ -11,6 +11,7 @@ import logging
import tempfile
import subprocess # nosec
import lxml.etree
from defusedxml.lxml import parse
import argparse
from tqdm import tqdm
from collections import namedtuple
@ -158,7 +159,7 @@ def main(board_name, board_xml, args):
env = { "PYTHONPATH": script_dir, "PATH": os.environ["PATH"] }
subprocess.run([sys.executable, legacy_parser, args.board_name, "--out", board_xml], check=True, env=env)
# ... then load the created board XML and append it with additional data by invoking the extractors.
board_etree = lxml.etree.parse(board_xml)
board_etree = parse(board_xml)
root_node = board_etree.getroot()
# Clear the whitespaces between adjacent children under the root node

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python3
import argparse
import lxml.etree
from defusedxml.lxml import parse
def mmio_regions(etree):
ret = []
@ -21,7 +21,7 @@ if __name__ == "__main__":
parser.add_argument("file", help="board XML file")
args = parser.parse_args()
etree = lxml.etree.parse(args.file)
etree = parse(args.file)
regions = mmio_regions(etree)
for region in regions:
print("%-4s 0x%08x 0x%08x" % (region[0], region[1], region[2]))

View File

@ -7,7 +7,7 @@
import sys, os
import argparse
import lxml.etree as etree
from defusedxml.lxml import parse
import logging
import xmlschema
@ -18,7 +18,7 @@ logging_fn = {
}
def validate_board(xsd_path, board_etree):
schema_etree = etree.parse(xsd_path)
schema_etree = parse(xsd_path)
schema_etree.xinclude()
schema = xmlschema.XMLSchema11(schema_etree)

View File

@ -8,7 +8,7 @@ import os
from copy import deepcopy
import elementpath
import lxml.etree as etree
from defusedxml.lxml import fromstring
from bs4 import BeautifulSoup
from . import convert_result, nuc11_board, scenario_json_schema, nuc11_board_path
@ -20,7 +20,7 @@ def get_dynamic_scenario(board):
:type board: str
:param board: board xml text
"""
board_xml = etree.fromstring(board)
board_xml = fromstring(board)
def get_enum(source, options, option_names, obj_type):
elements = [str(x) for x in elementpath.select(source, options) if x]

View File

@ -9,7 +9,7 @@ __package__ = 'configurator.pyodide'
from pathlib import Path
from tempfile import TemporaryDirectory
from xml.etree.ElementTree import tostring
from defusedxml.ElementTree import tostring
from scenario_config.pipeline import PipelineObject, PipelineEngine
from scenario_config.xml_loader import XMLLoadStage

View File

@ -5,13 +5,14 @@ from .pyodide import convert_result, nuc11_board, nuc11_scenario
import re
from lxml import etree
from defusedxml.lxml import fromstring
class GenerateSchema:
def __init__(self, board, scenario):
parser = etree.XMLParser(remove_blank_text=True)
self.board_etree = etree.fromstring(board, parser)
self.board_etree = fromstring(board, parser)
self.scenario = scenario
@property

View File

@ -16,7 +16,7 @@ import argparse
import logging
import lxml.etree as etree
from defusedxml.lxml import parse
def eval_xpath(element, xpath, default_value=None):
@ -428,8 +428,8 @@ def generate_for_one_vm(board_etree, hv_scenario_etree, vm_scenario_etree, vm_id
def main(board_xml, scenario_xml, user_vm_id, out_dir):
board_etree = etree.parse(board_xml)
scenario_etree = etree.parse(scenario_xml)
board_etree = parse(board_xml)
scenario_etree = parse(scenario_xml)
service_vm_id = eval_xpath(scenario_etree, "//vm[load_order = 'SERVICE_VM']/@id")
service_vm_name = eval_xpath(scenario_etree, "//vm[load_order = 'SERVICE_VM']/name/text()")

View File

@ -11,7 +11,7 @@ import acrn_config_utilities
import board_cfg_lib
import scenario_cfg_lib
import lxml
import lxml.etree
from defusedxml.lxml import parse
ERR_LIST = {}
BOOT_TYPE = ['no', 'ovmf']
@ -673,7 +673,7 @@ def check_communication_vuart(launch_communication_vuarts, scenario_info):
return
def check_enable_ptm(launch_enable_ptm, scenario_info):
scenario_etree = lxml.etree.parse(scenario_info)
scenario_etree = parse(scenario_info)
enable_ptm_vm_list = scenario_etree.xpath("//vm[PTM = 'y']/@id")
for user_vmid, enable_ptm in launch_enable_ptm.items():
key = 'user_vm:id={},enable_ptm'.format(user_vmid)

View File

@ -10,7 +10,7 @@ import logging
import typing
import functools
import textwrap
from lxml import etree
from defusedxml.lxml import parse
t_content = typing.Union[str, typing.List[str]]
@ -100,8 +100,8 @@ class GenerateRst:
# Class initialization
def __init__(self, board_file_name, scenario_file_name, rst_file_name) -> None:
self.board_etree = etree.parse(board_file_name)
self.scenario_etree = etree.parse(scenario_file_name)
self.board_etree = parse(board_file_name)
self.scenario_etree = parse(scenario_file_name)
self.file = open(rst_file_name, 'w')
self.doc = Doc(self.file)

View File

@ -5,7 +5,8 @@
# SPDX-License-Identifier: BSD-3-Clause
#
from lxml.etree import parse, XMLParser
from lxml.etree import XMLParser
from defusedxml.lxml import parse
from pipeline import PipelineStage
class LXMLLoadStage(PipelineStage):

View File

@ -6,7 +6,7 @@
import os
import sys
import copy
import lxml.etree as etree
from defusedxml.lxml import parse, tostring
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'library'))
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'hv_config'))
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'acpi_gen'))
@ -100,9 +100,9 @@ def validate_scenario_schema(scenario_info):
XMLSchema does not process XInclude.
Use lxml to expand the schema which is feed to XMLSchema as a string.
"""
xsd_doc = etree.parse(acrn_config_utilities.SCENARIO_SCHEMA_FILE)
xsd_doc = parse(acrn_config_utilities.SCENARIO_SCHEMA_FILE)
xsd_doc.xinclude()
my_schema = xmlschema.XMLSchema11(etree.tostring(xsd_doc, encoding="unicode"))
my_schema = xmlschema.XMLSchema11(tostring(xsd_doc, encoding="unicode"))
it = my_schema.iter_errors(scenario_info)
for idx, validation_error in enumerate(it, start=1):
@ -124,12 +124,12 @@ def validate_scenario_schema(scenario_info):
scenario_cfg_lib.ERR_LIST[key] = element + reason
def apply_data_checks(board_info, scenario_info):
xsd_doc = etree.parse(acrn_config_utilities.DATACHECK_SCHEMA_FILE)
xsd_doc = parse(acrn_config_utilities.DATACHECK_SCHEMA_FILE)
xsd_doc.xinclude()
datachecks_schema = xmlschema.XMLSchema11(etree.tostring(xsd_doc, encoding="unicode"))
datachecks_schema = xmlschema.XMLSchema11(tostring(xsd_doc, encoding="unicode"))
main_etree = etree.parse(board_info)
scenario_etree = etree.parse(scenario_info)
main_etree = parse(board_info)
scenario_etree = parse(scenario_info)
main_etree.getroot().extend(scenario_etree.getroot()[:])
# FIXME: Figure out proper error keys for data check failures
error_key = ""

View File

@ -8,7 +8,7 @@ import sys, os
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'library'))
import argparse
import lxml.etree
from defusedxml.lxml import parse
import acrn_config_utilities
from acrn_config_utilities import get_node
@ -41,8 +41,8 @@ def main(args):
Generate serial configuration file for service VM
:param args: command line args
"""
scenario_etree = lxml.etree.parse(args.scenario)
allocation_etree = lxml.etree.parse(args.allocation)
scenario_etree = parse(args.scenario)
allocation_etree = parse(args.allocation)
vuart_target_vmid = {}
vm_list = scenario_etree.xpath("//vm[load_order = 'SERVICE_VM']")

View File

@ -7,6 +7,7 @@
import sys, os
import lxml.etree
from defusedxml.lxml import parse, fromstring
import argparse
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'library'))
import acrn_config_utilities
@ -22,9 +23,9 @@ def main(args):
scripts_path = os.path.dirname(os.path.realpath(__file__))
current = os.path.basename(__file__)
board_etree = lxml.etree.parse(args.board)
scenario_etree = lxml.etree.parse(args.scenario)
allocation_etree = lxml.etree.ElementTree(element=lxml.etree.fromstring("<acrn-config></acrn-config>"))
board_etree = parse(args.board)
scenario_etree = parse(args.scenario)
allocation_etree = lxml.etree.ElementTree(element=fromstring("<acrn-config></acrn-config>"))
for script in sorted([f for f in os.listdir(scripts_path) if f.endswith(".py") and f != current]):
module_name = os.path.splitext(script)[0]
module = import_module(f"{module_name}")