#!/usr/bin/env python3
#
# Copyright (C) 2022 Intel Corporation.
#
# SPDX-License-Identifier: BSD-3-Clause
#

"""Validate scenario XMLs against the syntax schema and the board/scenario data checks.

Usage (see the argument parser at the bottom of the file):
  * board and scenario given: validate that single pair;
  * only board given: validate every scenario XML found next to the board XML;
  * nothing given: validate every board directory under ``<config_tools_dir>/data``.

The process exits with status 1 when any syntax error, data error or warning is reported.
"""

import sys
import os
import argparse
import logging
from copy import copy
from collections import namedtuple

try:
    import xmlschema
except ImportError:
    logging.error("Python package `xmlschema` is not installed.\n" +
                  "The scenario XML file will NOT be validated against the schema, which may cause build-time or runtime errors.\n" +
                  "To enable the validation, install the python package by executing: pip3 install xmlschema.")
    # NOTE(review): exit status is 0 here, presumably so that a missing optional
    # dependency does not fail the caller -- confirm before changing.
    sys.exit(0)

from pipeline import PipelineObject, PipelineStage, PipelineEngine
from default_populator import DefaultValuePopulatingStage

# Resolved at import time (not only under ``__main__``) because main() reads
# ``config_tools_dir``; this keeps main() callable from an importing module.
config_tools_dir = os.path.join(os.path.dirname(__file__), "..")
schema_dir = os.path.join(config_tools_dir, "schema")

# Qualified name of the attribute that carries the severity of a data check.
_SEVERITY_ATTR = "{https://projectacrn.org}severity"

# Log level names accepted by --loglevel (compared case-insensitively).
_LOG_LEVELS = ("critical", "error", "warning", "info", "debug")


def existing_file_type(parser):
    """Return an argparse ``type=`` callable that accepts only paths of existing regular files.

    On failure the callable reports the problem through ``parser.error()``,
    which prints the usage message and terminates the program.
    """
    def aux(arg):
        if not os.path.exists(arg):
            parser.error(f"can't open {arg}: No such file or directory")
        elif not os.path.isfile(arg):
            parser.error(f"can't open {arg}: Is not a file")
        else:
            return arg

    return aux


def log_level_type(parser):
    """Return an argparse ``type=`` callable that lower-cases and validates a log level name."""
    def aux(arg):
        arg = arg.lower()
        if arg in _LOG_LEVELS:
            return arg
        else:
            parser.error(f"{arg} is not a valid log level")

    return aux


class ValidationError(dict):
    """A validation finding, stored as a dict with keys ``paths``, ``message`` and ``severity``."""

    def __init__(self, paths, message, severity):
        super().__init__(paths=paths, message=message, severity=severity)

    def __str__(self):
        # A path may be None (check_syntax forwards ``error.path`` unchanged);
        # str.join() would raise TypeError on it, so unknown paths are skipped.
        locations = [str(path) for path in self["paths"] if path is not None]
        return f"{', '.join(locations)}: {self['message']}"


class ScenarioValidator:
    """Check scenario XMLs against a syntax schema and a set of data checks."""

    def __init__(self, schema_etree, datachecks_etree):
        """Initialize the validator with preprocessed schemas in ElementTree.

        Both arguments are handed to ``xmlschema.XMLSchema11`` as-is;
        ValidatorConstructionByFileStage passes file paths instead of trees.
        """
        self.schema = xmlschema.XMLSchema11(schema_etree)
        self.datachecks = xmlschema.XMLSchema11(datachecks_etree)

    def check_syntax(self, scenario_etree):
        """Validate ``scenario_etree`` against the syntax schema.

        Returns a list of ValidationError, one per schema violation.
        """
        errors = []

        for error in self.schema.iter_errors(scenario_etree):
            # Syntactic errors are always critical.
            e = ValidationError([error.path], error.reason, "critical")
            logging.debug(e)
            errors.append(e)

        return errors

    def check_semantics(self, board_etree, scenario_etree):
        """Run the data checks against the combined board and scenario data.

        A copy of the scenario root is extended with the children of the board
        root, and that unified node is validated against the data-check schema.

        Returns a list of ValidationError whose severity is read from the
        annotation of the failing validator.
        """
        errors = []

        # Work on a copy so that the scenario root itself is not extended.
        unified_node = copy(scenario_etree.getroot())
        unified_node.extend(board_etree.getroot())

        for error in self.datachecks.iter_errors(unified_node):
            logging.debug(f"{error.elem}: {error.message}")
            anno = getattr(error.validator, "annotation", None)
            if anno is None:
                # Without an annotation there is no declared severity. Report
                # the failure as an error instead of raising AttributeError.
                severity = "error"
            else:
                severity = anno.elem.get(_SEVERITY_ATTR)
            errors.append(ValidationError([error.elem.tag], error.message, severity))

        return errors


# NOTE(review): ``uses`` / ``consumes`` / ``provides`` on the stages below name the
# pipeline object keys each stage works with; their exact handling lives in the
# ``pipeline`` module, which is not visible from this file.

class ValidatorConstructionStage(PipelineStage):
    """Build a ScenarioValidator from already-loaded schema trees."""

    # The schema etree may still be useful for schema-based transformation. Do not consume it.
    uses = {"schema_etree"}
    consumes = {"datachecks_etree"}
    provides = {"validator"}

    def run(self, obj):
        validator = ScenarioValidator(obj.get("schema_etree"), obj.get("datachecks_etree"))
        obj.set("validator", validator)


class ValidatorConstructionByFileStage(PipelineStage):
    """Build a ScenarioValidator directly from the schema file paths."""

    uses = {"schema_path", "datachecks_path"}
    provides = {"validator"}

    def run(self, obj):
        validator = ScenarioValidator(obj.get("schema_path"), obj.get("datachecks_path"))
        obj.set("validator", validator)


class SyntacticValidationStage(PipelineStage):
    """Store the result of ScenarioValidator.check_syntax() as ``syntactic_errors``."""

    uses = {"validator", "scenario_etree"}
    provides = {"syntactic_errors"}

    def run(self, obj):
        errors = obj.get("validator").check_syntax(obj.get("scenario_etree"))
        obj.set("syntactic_errors", errors)


class SemanticValidationStage(PipelineStage):
    """Store the result of ScenarioValidator.check_semantics() as ``semantic_errors``."""

    uses = {"validator", "board_etree", "scenario_etree"}
    provides = {"semantic_errors"}

    def run(self, obj):
        errors = obj.get("validator").check_semantics(obj.get("board_etree"), obj.get("scenario_etree"))
        obj.set("semantic_errors", errors)


class ReportValidationResultStage(PipelineStage):
    """Log a one-line summary of the findings and store their total as ``nr_all_errors``."""

    consumes = {"board_etree", "scenario_etree", "syntactic_errors", "semantic_errors"}
    provides = {"nr_all_errors"}

    def run(self, obj):
        board_name = obj.get("board_etree").getroot().get("board")
        scenario_name = obj.get("scenario_etree").getroot().get("scenario")

        semantic_errors = obj.get("semantic_errors")
        nr_critical = len(obj.get("syntactic_errors"))
        nr_error = sum(1 for e in semantic_errors if e["severity"] == "error")
        nr_warning = sum(1 for e in semantic_errors if e["severity"] == "warning")

        if nr_critical > 0 or nr_error > 0:
            logging.error(f"Board {board_name} and scenario {scenario_name} are inconsistent: {nr_critical} syntax errors, {nr_error} data errors, {nr_warning} warnings.")
        elif nr_warning > 0:
            logging.warning(f"Board {board_name} and scenario {scenario_name} are potentially inconsistent: {nr_warning} warnings.")
        else:
            logging.info(f"Board {board_name} and scenario {scenario_name} are valid and consistent.")

        # Warnings are included in the total, so they also affect the exit status in main().
        obj.set("nr_all_errors", nr_critical + nr_error + nr_warning)


def validate_one(validation_pipeline, pipeline_obj, board_xml, scenario_xml):
    """Validate one board/scenario pair and return the number of findings."""
    pipeline_obj.set("board_path", board_xml)
    pipeline_obj.set("scenario_path", scenario_xml)
    validation_pipeline.run(pipeline_obj)
    return pipeline_obj.consume("nr_all_errors")


def validate_board(validation_pipeline, pipeline_obj, board_xml):
    """Validate every scenario XML that sits in the same directory as ``board_xml``.

    The board XML itself and any file whose name contains "launch" are skipped.
    Returns the accumulated number of findings.
    """
    # dirname() is "" for a bare file name and os.listdir("") raises
    # FileNotFoundError, so fall back to the current directory.
    board_dir = os.path.dirname(board_xml) or "."
    board_file = os.path.basename(board_xml)
    nr_all_errors = 0

    for f in os.listdir(board_dir):
        if not f.endswith(".xml"):
            continue
        if f == board_file or "launch" in f:
            continue
        nr_all_errors += validate_one(validation_pipeline, pipeline_obj, board_xml, os.path.join(board_dir, f))

    return nr_all_errors


def validate_all(validation_pipeline, pipeline_obj, data_dir):
    """Validate all boards under ``data_dir``.

    Each entry ``<name>`` of ``data_dir`` is expected to contain
    ``<name>/<name>.xml`` as its board XML; entries without one only produce a
    warning. Returns the accumulated number of findings.
    """
    nr_all_errors = 0

    for f in os.listdir(data_dir):
        board_xml = os.path.join(data_dir, f, f"{f}.xml")
        if os.path.isfile(board_xml):
            nr_all_errors += validate_board(validation_pipeline, pipeline_obj, board_xml)
        else:
            logging.warning(f"Cannot find a board XML under {os.path.join(data_dir, f)}")

    return nr_all_errors


def main(args):
    """Build the validator, run the validation selected by ``args`` and exit.

    ``args`` must provide ``schema``, ``datachecks``, ``board`` and ``scenario``.
    Exits with status 1 if any finding was reported, 0 otherwise.
    """
    from xml_loader import XMLLoadStage
    from lxml_loader import LXMLLoadStage

    # First pipeline: load both schemas and construct the validator once.
    validator_construction_pipeline = PipelineEngine(["schema_path", "datachecks_path"])
    validator_construction_pipeline.add_stages([
        LXMLLoadStage("schema"),
        LXMLLoadStage("datachecks"),
        ValidatorConstructionStage(),
    ])

    # Second pipeline: run once per board/scenario pair, reusing the validator.
    validation_pipeline = PipelineEngine(["board_path", "scenario_path", "schema_etree", "validator"])
    validation_pipeline.add_stages([
        XMLLoadStage("board"),
        XMLLoadStage("scenario"),
        DefaultValuePopulatingStage(),
        SyntacticValidationStage(),
        SemanticValidationStage(),
        ReportValidationResultStage(),
    ])

    obj = PipelineObject(schema_path=args.schema, datachecks_path=args.datachecks)
    validator_construction_pipeline.run(obj)

    if args.board and args.scenario:
        nr_all_errors = validate_one(validation_pipeline, obj, args.board, args.scenario)
    elif args.board:
        nr_all_errors = validate_board(validation_pipeline, obj, args.board)
    else:
        nr_all_errors = validate_all(validation_pipeline, obj, os.path.join(config_tools_dir, "data"))

    sys.exit(1 if nr_all_errors > 0 else 0)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("board", nargs="?", type=existing_file_type(parser), help="the board XML file to be validated")
    parser.add_argument("scenario", nargs="?", type=existing_file_type(parser), help="the scenario XML file to be validated")
    parser.add_argument("--loglevel", default="warning", type=log_level_type(parser), help="choose log level, e.g. debug, info, warning or error")
    parser.add_argument("--schema", default=os.path.join(schema_dir, "config.xsd"), help="the XML schema that defines the syntax of scenario XMLs")
    parser.add_argument("--datachecks", default=os.path.join(schema_dir, "datachecks.xsd"), help="the XML schema that defines the semantic rules against board and scenario data")
    args = parser.parse_args()

    logging.basicConfig(level=args.loglevel.upper())
    main(args)