acrn-hypervisor/misc/config_tools/config_app/views.py
Liu Long 3f4ea38158 ACRN: misc: Unify terminology for service vm/user vm
Rename SOS_VM type to SERVICE_VM
rename UOS to User VM in XML description
rename uos_thread_pid to user_vm_thread_pid
rename devname_uos to devname_user_vm
rename uosid to user_vmid
rename UOS_ACK to USER_VM_ACK
rename SOS_VM_CONFIG_CPU_AFFINITY to SERVICE_VM_CONFIG_CPU_AFFINITY
rename SOS_COM to SERVICE_VM_COM
rename SOS_UART1_VALID_NUM" to SERVICE_VM_UART1_VALID_NUM
rename SOS_BOOTARGS_DIFF to SERVICE_VM_BOOTARGS_DIFF
rename uos to user_vm in launch script and xml

Tracked-On: #6744
Signed-off-by: Liu Long <long.liu@linux.intel.com>
Reviewed-by: Geoffroy Van Cutsem <geoffroy.vancutsem@intel.com>
2021-11-02 10:00:55 +08:00

1355 lines
60 KiB
Python

# Copyright (C) 2019 Intel Corporation.
# SPDX-License-Identifier: BSD-3-Clause
"""View pages for config app.
"""
import os, copy
from datetime import datetime
from shutil import copyfile, move
# flask: Copyright 2010 Pallets
# SPDX-License-Identifier: BSD-3-Clause
# Refer to https://github.com/pallets/flask/blob/master/LICENSE.rst for the permission notice.
from flask import request, render_template, Blueprint, redirect, url_for, current_app
# werkzeug: Copyright 2007 Pallets
# SPDX-License-Identifier: BSD-3-Clause
# Refer to https://github.com/pallets/werkzeug/blob/master/LICENSE.rst for the permission notice.
from werkzeug.utils import secure_filename
from controller import *
from scenario_config.scenario_cfg_gen import get_scenario_item_values
from scenario_config.scenario_cfg_gen import validate_scenario_setting
from launch_config.launch_cfg_gen import get_launch_item_values
from launch_config.launch_cfg_gen import validate_launch_setting
from library.common import MAX_VM_NUM
import scenario_config.default_populator as default_populator
CONFIG_APP = Blueprint('CONFIG_APP', __name__, template_folder='templates')
@CONFIG_APP.route('/')
def index():
"""
render the index page
:return: the render template of index page
"""
return redirect(url_for('CONFIG_APP.scenarios'))
@CONFIG_APP.route('/scenario', methods=['GET'])
def scenarios():
"""
render the scenario parent setting page
:return: the render template of scenario setting parent page
"""
board_info, board_type, scenario_config, launch_config = get_xml_configs()
(bios_info, base_board_info) = get_board_info(board_info)
scenario_list = ([], [])
launch_list = ([], [])
if board_type is not None:
default_xml_config = XmlConfig(os.path.join(current_app.config.get('DEFAULT_CONFIG_PATH'), board_type))
xml_config = XmlConfig(os.path.join(current_app.config.get('CONFIG_PATH'), board_type))
generic_config_list = get_generic_config_list()
if default_xml_config.list_all(xml_type='scenario')[0]:
scenario_list = (
default_xml_config.list_all(xml_type='scenario')[0], xml_config.list_all(xml_type='scenario')[0])
else:
scenario_list = (
generic_config_list[0], xml_config.list_all(xml_type='scenario')[0])
if default_xml_config.list_all(xml_type='uos_launcher')[0]:
launch_list = (
default_xml_config.list_all(xml_type='uos_launcher')[0],
xml_config.list_all(xml_type='uos_launcher')[0])
else:
launch_list = (
generic_config_list[1], xml_config.list_all(xml_type='uos_launcher')[0])
return render_template('scenario.html', board_info_list=get_board_list(),
board_info=board_info, board_type=board_type,
bios_info=bios_info, base_board_info=base_board_info,
scenarios=scenario_list,
launches=launch_list,
scenario='', root=None)
@CONFIG_APP.route('/scenario/<scenario_name>', methods=['GET'])
def scenario(scenario_name):
"""
render the specified scenario setting page
:param scenario_name: the scenario type
:return: the render template of the specified scenario setting page
"""
board_info, board_type, scenario_config, launch_config = get_xml_configs()
(bios_info, base_board_info) = get_board_info(board_info)
scenario_list = ([], [])
launch_list = ([], [])
if board_type is not None:
default_xml_config = XmlConfig(os.path.join(current_app.config.get('DEFAULT_CONFIG_PATH'), board_type))
xml_config = XmlConfig(os.path.join(current_app.config.get('CONFIG_PATH'), board_type))
generic_config_list = get_generic_config_list()
if default_xml_config.list_all(xml_type='scenario')[0]:
scenario_list = (
default_xml_config.list_all(xml_type='scenario')[0], xml_config.list_all(xml_type='scenario')[0])
else:
scenario_list = (
generic_config_list[0], xml_config.list_all(xml_type='scenario')[0])
if default_xml_config.list_all(xml_type='uos_launcher')[0]:
launch_list = (
default_xml_config.list_all(xml_type='uos_launcher')[0],
xml_config.list_all(xml_type='uos_launcher')[0])
else:
launch_list = (
generic_config_list[1], xml_config.list_all(xml_type='uos_launcher')[0])
xpath_dict = get_xpath_dict_of_xsd()
current_app.config.update(SCENARIO=scenario_name)
scenario_config.set_curr(scenario_name)
scenario_item_values = {}
if board_info is not None and board_type is not None:
scenario_file_path = os.path.join(current_app.config.get('CONFIG_PATH'), board_type,
scenario_name + '.xml')
if os.path.isfile(scenario_file_path):
scenario_item_values = get_scenario_item_values(
os.path.join(current_app.config.get('CONFIG_PATH'), board_type, board_info+'.xml'),
scenario_file_path)
for xpath in xpath_dict:
if xpath_dict[xpath]['enumeration'] and len(xpath.split('/')) > 2:
scenario_item_values[','.join(xpath.split('/')[2:])] = xpath_dict[xpath]['enumeration']
return render_template('scenario.html', board_info_list=get_board_list(),
board_info=board_info, board_type=board_type,
bios_info=bios_info, base_board_info=base_board_info,
scenarios=scenario_list,
launches=launch_list,
scenario=scenario_name, root=scenario_config.get_curr_root(),
scenario_item_values=scenario_item_values,
xpath_dict=xpath_dict)
@CONFIG_APP.route('/launch', methods=['GET'])
def launches():
"""
render the parent launch setting page
:return: the render template of launch setting page
"""
board_info, board_type, scenario_config, launch_config = get_xml_configs()
(bios_info, base_board_info) = get_board_info(board_info)
scenario_list = ([], [])
launch_list = ([], [])
if board_type is not None:
default_xml_config = XmlConfig(os.path.join(current_app.config.get('DEFAULT_CONFIG_PATH'), board_type))
xml_config = XmlConfig(os.path.join(current_app.config.get('CONFIG_PATH'), board_type))
generic_config_list = get_generic_config_list()
if default_xml_config.list_all(xml_type='scenario')[0]:
scenario_list = (
default_xml_config.list_all(xml_type='scenario')[0], xml_config.list_all(xml_type='scenario')[0])
else:
scenario_list = (
generic_config_list[0], xml_config.list_all(xml_type='scenario')[0])
if default_xml_config.list_all(xml_type='uos_launcher')[0]:
launch_list = (
default_xml_config.list_all(xml_type='uos_launcher')[0],
xml_config.list_all(xml_type='uos_launcher')[0])
else:
launch_list = (
generic_config_list[1], xml_config.list_all(xml_type='uos_launcher')[0])
return render_template('launch.html', board_info_list=get_board_list(),
board_info=board_info, board_type=board_type,
bios_info=bios_info, base_board_info=base_board_info,
scenarios=scenario_list,
launches=launch_list,
launch='', root=None)
@CONFIG_APP.route('/launch/<launch_name>', methods=['GET'])
def launch(launch_name):
"""
render the specified launch setting page
:param launch_name: the launch type
:return: the render template of specified launch setting page
"""
board_info, board_type, scenario_config, launch_config = get_xml_configs()
(bios_info, base_board_info) = get_board_info(board_info)
scenario_list = ([], [])
launch_list = ([], [])
if board_type is not None:
default_xml_config = XmlConfig(os.path.join(current_app.config.get('DEFAULT_CONFIG_PATH'), board_type))
xml_config = XmlConfig(os.path.join(current_app.config.get('CONFIG_PATH'), board_type))
generic_config_list = get_generic_config_list()
if default_xml_config.list_all(xml_type='scenario')[0]:
scenario_list = (
default_xml_config.list_all(xml_type='scenario')[0], xml_config.list_all(xml_type='scenario')[0])
else:
scenario_list = (
generic_config_list[0], xml_config.list_all(xml_type='scenario')[0])
if default_xml_config.list_all(xml_type='uos_launcher')[0]:
launch_list = (
default_xml_config.list_all(xml_type='uos_launcher')[0],
xml_config.list_all(xml_type='uos_launcher')[0])
else:
launch_list = (
generic_config_list[1], xml_config.list_all(xml_type='uos_launcher')[0])
launch_config.set_curr(launch_name)
launch_item_values = {}
scenario_name = launch_config.get_curr_root().attrib['scenario']
current_app.config.update(SCENARIO=scenario_name)
scenario_name = current_app.config.get('SCENARIO')
scenario_file = None
if board_info is not None and scenario_name is not None:
scenario_file = os.path.join(current_app.config.get('CONFIG_PATH'), board_type,
scenario_name+'.xml')
if not os.path.isfile(scenario_file):
scenario_file = os.path.join(current_app.config.get('CONFIG_PATH'), board_type,
scenario_name + '.xml')
if not os.path.isfile(scenario_file):
scenario_file = None
launch_item_values = get_launch_item_values(
os.path.join(current_app.config.get('CONFIG_PATH'), board_type, board_info + '.xml'), scenario_file)
scenario_name = None
launch_config_root = launch_config.get_curr_root()
if launch_config_root is not None and 'scenario' in launch_config_root.attrib:
scenario_name = launch_config_root.attrib['scenario']
post_launch_vm_list = get_post_launch_vm_list(scenario_name)
return render_template('launch.html', board_info_list=get_board_list(),
board_info=board_info, board_type=board_type,
bios_info=bios_info, base_board_info=base_board_info,
scenarios=scenario_list,
launches=launch_list,
launch=launch_name, root=launch_config.get_curr_root(),
scenario=current_app.config.get('SCENARIO'),
post_launch_vm_list=post_launch_vm_list,
launch_item_values=launch_item_values)
@CONFIG_APP.route('/save_scenario', methods=['POST'])
def save_scenario():
"""
save scenario setting.
:return: the error list for the edited scenario setting.
"""
scenario_config_data = request.json if request.method == "POST" else request.args
xml_configs = get_xml_configs()
board_type = xml_configs[1]
scenario_config = xml_configs[3]
if board_type is None or xml_configs[0] is None:
return {'status': 'fail',
'error_list': {'error': 'Please select the board info before this operation.'}}
scenario_path = os.path.join(current_app.config.get('CONFIG_PATH'), board_type)
old_scenario_name = scenario_config_data['old_scenario_name']
scenario_config.set_curr(old_scenario_name)
for vm in list(scenario_config.get_curr_root()):
if vm.tag == 'vm':
for elem in list(vm):
if elem.tag in ['communication_vuart', 'console_vuart']:
vuart_key = vm.tag+':id='+vm.attrib['id']+','+elem.tag + ':id='+elem.attrib['id']
delete_flag = True
for key in scenario_config_data:
if key.find(vuart_key) >= 0:
delete_flag = False
break
if delete_flag:
scenario_config.delete_curr_elem(*tuple(vuart_key.split(',')))
for key in scenario_config_data:
if scenario_config_data[key] in [None, 'None']:
scenario_config_data[key] = ''
if key not in ['old_scenario_name', 'new_scenario_name', 'generator', 'add_vm_type', 'src_path']:
if isinstance(scenario_config_data[key], list):
scenario_config.set_curr_list(scenario_config_data[key], *tuple(key.split(',')))
elif key.find('communication_vuart') >= 0:
try:
scenario_config.get_curr_elem(*tuple(key.split(',')))
except:
communication_vuart_id = key.split(',')[1].split('=')[1].strip()
communication_vuart_1_item = scenario_config.get_curr_elem(
*tuple([key.split(',')[0], 'communication_vuart:id=1']))
communication_vuart_i_item = copy.deepcopy(communication_vuart_1_item)
communication_vuart_i_item.attrib['id'] = communication_vuart_id
curr_index = 0
elem_list = list(scenario_config.get_curr_elem(key.split(',')[0]))
for elem in reversed(elem_list):
curr_index += 1
if elem.tag in ['communication_vuart', 'console_vuart']:
break
scenario_config.insert_curr_elem(len(elem_list)-curr_index+1, communication_vuart_i_item, key.split(',')[0])
finally:
scenario_config.set_curr_value(scenario_config_data[key], *tuple(key.split(',')))
else:
scenario_config.set_curr_value(scenario_config_data[key], *tuple(key.split(',')))
# how to put added vuart under vuart!!!
# delete vuart without key!!!
generator = scenario_config_data['generator']
if generator is not None:
if generator == 'remove_vm_kata':
scenario_config.delete_curr_key('vm:desc=specific for Kata')
assign_vm_id(scenario_config)
elif generator == 'add_vm_kata':
vm_list = []
for vm in list(scenario_config.get_curr_root()):
if vm.tag == 'vm':
vm_list.append(vm.attrib['id'])
if len(vm_list) >= MAX_VM_NUM:
return {'status': 'fail',
'error_list': {'error': 'Can not add a new VM. Max VM number is {}.'.format(MAX_VM_NUM)}}
# clone vm kata from generic config
generic_scenario_config = get_generic_scenario_config(scenario_config)
generic_scenario_config_root = generic_scenario_config.get_curr_root()
elem_kata = None
for vm in list(generic_scenario_config_root):
if 'desc' in vm.attrib and vm.attrib['desc'] == 'specific for Kata':
elem_kata = vm
break
if elem_kata is not None:
scenario_config.clone_curr_elem(elem_kata)
assign_vm_id(scenario_config)
elif generator.startswith('add_vm:'):
vm_list = []
for vm in list(scenario_config.get_curr_root()):
if vm.tag == 'vm':
vm_list.append(vm.attrib['id'])
if len(vm_list) >= MAX_VM_NUM:
return {'status': 'fail',
'error_list': {'error': 'Can not add a new VM. Max VM number is {}.'.format(MAX_VM_NUM)}}
curr_vm_id = generator.split(':')[1]
add_vm_type = scenario_config_data['add_vm_type']
add_scenario_config = get_generic_scenario_config(scenario_config, add_vm_type)
vm_to_add = []
if str(curr_vm_id) == '-1':
curr_vm_index = 1
else:
curr_vm_index = len(vm_list) + 1
for i in range(len(vm_list)):
if curr_vm_id == vm_list[i]:
curr_vm_index = i + 2
break
if add_scenario_config is not None and add_scenario_config.tag == 'vm':
for i in range(0, MAX_VM_NUM):
if str(i) not in vm_list:
break
add_scenario_config.attrib['id'] = str(i)
vm_to_add.append(add_scenario_config)
for vm in vm_to_add:
scenario_config.insert_curr_elem(curr_vm_index, vm)
curr_vm_index += 1
assign_vm_id(scenario_config)
elif generator.startswith('remove_vm:'):
remove_vm_id = generator.split(':')[1]
scenario_config.delete_curr_key('vm:id='+remove_vm_id.strip())
assign_vm_id(scenario_config)
scenario_config.set_curr_attr('board', board_type)
scenario_config.set_curr_attr('scenario', scenario_config_data['new_scenario_name'])
tmp_scenario_file = os.path.join(scenario_path,
'tmp_'+scenario_config_data['new_scenario_name']+'.xml')
scenario_config.save('tmp_'+scenario_config_data['new_scenario_name'])
# call validate function
error_list = {}
new_scenario_name = scenario_config_data['new_scenario_name']
rename = False
try:
if generator is None or not (generator.startswith('add_vm:') or generator.startswith('remove_vm:')):
(error_list, vm_info) = validate_scenario_setting(
os.path.join(current_app.config.get('CONFIG_PATH'), board_type, xml_configs[0]+'.xml'),
tmp_scenario_file)
except Exception as error:
if os.path.isfile(tmp_scenario_file):
os.remove(tmp_scenario_file)
return {'status': 'fail', 'file_name': new_scenario_name,
'rename': rename, 'error_list': {'error': str(error)}}
if not error_list:
scenario_config.save(new_scenario_name)
if os.path.isfile(tmp_scenario_file):
os.remove(tmp_scenario_file)
src_path = scenario_config_data['src_path']
if src_path is not None and str(src_path).strip() != '':
if not os.path.isdir(src_path):
os.makedirs(src_path)
copyfile(os.path.join(scenario_path, scenario_config_data['new_scenario_name']+'.xml'),
os.path.join(src_path, scenario_config_data['new_scenario_name']+'.xml'))
return {'status': 'success', 'file_name': new_scenario_name,
'rename': rename, 'error_list': error_list}
@CONFIG_APP.route('/save_launch', methods=['POST'])
def save_launch():
"""
save launch setting.
:return: the error list of the edited launch setting.
"""
launch_config_data = request.json if request.method == "POST" else request.args
xml_configs = get_xml_configs()
launch_config = xml_configs[3]
if xml_configs[1] is None or xml_configs[0] is None:
return {'status': 'fail',
'error_list': {'error': 'Please select the board info before this operation.'}}
old_launch_name = launch_config_data['old_launch_name']
launch_config.set_curr(old_launch_name)
scenario_name = launch_config_data['scenario_name']
scenario_file_path = os.path.join(current_app.config.get('CONFIG_PATH'),
current_app.config.get('BOARD_TYPE'),
scenario_name + '.xml')
for key in launch_config_data:
if launch_config_data[key] in [None, 'None']:
launch_config_data[key] = ''
if key not in ['old_launch_name', 'new_launch_name', 'generator', 'add_launch_type', 'scenario_name', 'src_path']:
if isinstance(launch_config_data[key], list):
launch_config.set_curr_list(launch_config_data[key], *tuple(key.split(',')))
else:
launch_config.set_curr_value(launch_config_data[key], *tuple(key.split(',')))
generator = launch_config_data['generator']
if generator is not None:
if generator.startswith('add_vm:'):
vm_list = []
for vm in list(launch_config.get_curr_root()):
if vm.tag == 'uos':
vm_list.append(vm.attrib['id'])
if len(vm_list) >= MAX_VM_NUM:
return {'status': 'fail',
'error_list': {'error': 'Can not add a new VM. Max VM number is {}.'.format(MAX_VM_NUM)}}
curr_vm_id = generator.split(':')[1].strip()
add_launch_type = launch_config_data['add_launch_type']
if add_launch_type is None or len(add_launch_type.split('ID :')) < 2:
return {'status': 'fail',
'error_list': {'error': 'No VM is selected. '
'Please select a scenario with available post launched VMs.'}}
add_vm_id = add_launch_type.split('ID :')[1].replace(')', '').strip()
add_launch_type = 'LAUNCH_' + add_launch_type.split()[0]
if add_launch_type == 'LAUNCH_KATA_VM':
add_launch_type = 'LAUNCH_POST_STD_VM'
add_launch_id = 1
post_vm_list = get_post_launch_vm_list(scenario_name)
for i in range(len(post_vm_list)):
vm_info = post_vm_list[i]
if add_vm_id == vm_info[0]:
add_launch_id = i + 1
break
add_launch_config = get_generic_scenario_config(launch_config, add_launch_type)
vm_to_add = []
if str(curr_vm_id) == '-1':
curr_vm_index = len(vm_list)
else:
curr_vm_index = 0
for i in range(len(vm_list)):
if curr_vm_id == vm_list[i]:
curr_vm_index = i + 1
break
if add_launch_config is not None and add_launch_config.tag == 'uos':
for i in range(1, MAX_VM_NUM):
if str(i) not in vm_list:
break
add_launch_config.attrib['id'] = str(add_launch_id)
vm_to_add.append(add_launch_config)
for vm in vm_to_add:
launch_config.insert_curr_elem(curr_vm_index, vm)
elif generator.startswith('remove_vm:'):
remove_vm_id = generator.split(':')[1]
launch_config.delete_curr_key('uos:id='+remove_vm_id.strip())
launch_config.set_curr_attr('scenario', scenario_name)
launch_config.set_curr_attr('board', current_app.config.get('BOARD_TYPE'))
launch_config.set_curr_attr('uos_launcher', str(len(list(launch_config.get_curr_root()))))
tmp_launch_file = os.path.join(current_app.config.get('CONFIG_PATH'), xml_configs[1],
'tmp_' + launch_config_data['new_launch_name'] + '.xml')
launch_config.save('tmp_' + launch_config_data['new_launch_name'])
# call validate function
error_list = {}
rename = False
try:
if generator is None or not (generator.startswith('add_vm:') or generator.startswith('remove_vm:')):
(error_list, pthru_sel, virtio, dm_value) = validate_launch_setting(
os.path.join(current_app.config.get('CONFIG_PATH'), xml_configs[1], xml_configs[0]+'.xml'),
scenario_file_path,
tmp_launch_file)
except Exception as error:
if os.path.isfile(tmp_launch_file):
os.remove(tmp_launch_file)
return {'status': 'fail', 'file_name': launch_config_data['new_launch_name'],
'rename': rename, 'error_list': {'launch config error': str(error)}}
if not error_list:
launch_config.save(launch_config_data['new_launch_name'])
if os.path.isfile(tmp_launch_file):
os.remove(tmp_launch_file)
src_path = launch_config_data['src_path']
if src_path is not None and str(src_path).strip() != '':
if not os.path.isdir(src_path):
os.makedirs(src_path)
copyfile(os.path.join(current_app.config.get('CONFIG_PATH'), current_app.config.get('BOARD_TYPE'),
launch_config_data['new_launch_name'] + '.xml'),
os.path.join(src_path, launch_config_data['new_launch_name'] + '.xml'))
return {'status': 'success', 'file_name': launch_config_data['new_launch_name'],
'rename': rename, 'error_list': error_list}
@CONFIG_APP.route('/check_setting_exist', methods=['POST'])
def check_setting_exist():
"""
check the setting exist or not.
:return: setting exist or not.
"""
config_data = request.json if request.method == "POST" else request.args
src_path = config_data['src_path']
board_info = current_app.config.get('BOARD_TYPE')
setting_path = os.path.join(current_app.config.get('CONFIG_PATH'), board_info)
if src_path is not None and str(src_path).strip() != '':
setting_path = src_path
if 'old_scenario_name' in list(config_data.keys()) and 'new_scenario_name' in list(config_data.keys()):
if config_data['old_scenario_name'] != config_data['new_scenario_name'] and \
os.path.isfile(os.path.join(setting_path, config_data['new_scenario_name'] + '.xml')):
return {'exist': 'yes'}
else:
return {'exist': 'no'}
elif 'old_launch_name' in list(config_data.keys()) and 'new_launch_name' in list(config_data.keys()):
if config_data['old_launch_name'] != config_data['new_launch_name'] and \
os.path.isfile(os.path.join(setting_path, config_data['new_launch_name'] + '.xml')):
return {'exist': 'yes'}
else:
return {'exist': 'no'}
elif 'create_name' in list(config_data.keys()):
if os.path.isfile(os.path.join(setting_path, config_data['create_name'] + '.xml')):
return {'exist': 'yes'}
else:
return {'exist': 'no'}
else:
return {'exist': 'no'}
@CONFIG_APP.route('/create_setting', methods=['POST'])
def create_setting():
"""
create a new scenario or launch setting.
:return: the status and error list of the created scenario or launch setting.
"""
create_config_data = request.json if request.method == "POST" else request.args
mode = create_config_data['mode']
default_name = create_config_data['default_name']
create_name = create_config_data['create_name']
src_path = create_config_data['src_path']
setting_type = create_config_data['type']
xml_configs = get_xml_configs()
board_info = xml_configs[0]
board_type = xml_configs[1]
scenario_config = xml_configs[2]
launch_config = xml_configs[3]
if board_type is None or xml_configs[0] is None:
return {'status': 'fail',
'error_list': {'error': 'Please select the board info before this operation.'}}
setting_path = os.path.join(current_app.config.get('CONFIG_PATH'), board_info)
if not os.path.isdir(setting_path):
os.makedirs(setting_path)
if src_path is not None and str(src_path).strip() != '':
setting_path = src_path
if not os.path.isdir(setting_path):
os.makedirs(setting_path)
if create_config_data['type'] == 'launch':
launch_file = os.path.join(setting_path, create_name + '.xml')
if mode == 'create':
template_file_name = 'hybrid_launch_2uos'
src_file_name = os.path.join(current_app.config.get('DEFAULT_CONFIG_PATH'), 'generic_board', template_file_name + '.xml')
else: #load
src_file_name = os.path.join(current_app.config.get('DEFAULT_CONFIG_PATH'), board_type, default_name + '.xml')
if os.path.isfile(src_file_name):
if os.path.isfile(launch_file):
os.remove(launch_file)
copyfile(src_file_name,
os.path.join(current_app.config.get('CONFIG_PATH'), board_type, create_name + '.xml'))
launch_config.set_curr(create_name)
if mode == 'create':
launch_config.delete_curr_key('uos:id=2')
launch_config.delete_curr_key('uos:id=1')
launch_config.save(create_name)
if os.path.normcase(setting_path) != os.path.normcase(os.path.join(current_app.config.get('CONFIG_PATH'), board_type)):
copyfile(os.path.join(current_app.config.get('CONFIG_PATH'), board_type, create_name + '.xml'),
os.path.join(setting_path, create_name + '.xml'))
return {'status': 'success', 'setting': create_name, 'error_list': {}}
elif create_config_data['type'] == 'scenario':
scenario_file = os.path.join(setting_path, create_name + '.xml')
if mode == 'create':
template_file_name = 'shared'
src_file_name = os.path.join(current_app.config.get('DEFAULT_CONFIG_PATH'), 'generic_board', template_file_name + '.xml')
else: # load
src_file_name = os.path.join(current_app.config.get('DEFAULT_CONFIG_PATH'), board_type,
default_name + '.xml')
if not os.path.isfile(src_file_name):
src_file_name=os.path.join(current_app.config.get('DEFAULT_CONFIG_PATH'), 'generic_board', default_name + '.xml')
if os.path.isfile(src_file_name):
xsd_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'schema', 'config.xsd')
out_xml = os.path.join(os.path.dirname(os.path.abspath(__file__)), create_name + '.xml')
default_populator.main(xsd_path, src_file_name, out_xml)
if os.path.isfile(scenario_file):
os.remove(scenario_file)
copyfile(out_xml,
os.path.join(current_app.config.get('CONFIG_PATH'), board_type,
create_name + '.xml'))
os.remove(out_xml)
if mode == 'create':
# update RDT->CLOS_MASK according to board xml
scenario_config.set_curr(create_name)
elem_clos_max = scenario_config.get_curr_elem('hv', 'FEATURES', 'RDT', 'CLOS_MASK')
# elem_mba_delay = scenario_config.get_curr_elem('hv', 'FEATURES', 'RDT', 'MBA_DELAY')
scenario_config.delete_curr_elem('hv', 'FEATURES', 'RDT', 'CLOS_MASK')
# scenario_config.delete_curr_elem('hv', 'FEATURES', 'RDT', 'MBA_DELAY')
cdp_enabled = scenario_config.get_curr_value('hv', 'FEATURES', 'RDT', 'CDP_ENABLED')
(num_clos_mask, num_mba_delay) = get_num_of_rdt_res(board_info, cdp_enabled)
for i in range(num_clos_mask):
scenario_config.clone_curr_elem(elem_clos_max, 'hv', 'FEATURES', 'RDT')
# for i in range(num_mba_delay):
# scenario_config.clone_curr_elem(elem_mba_delay, 'hv', 'FEATURES', 'RDT')
for i in range(8):
scenario_config.delete_curr_key('vm:id={}'.format(i))
scenario_config = set_default_config(scenario_config)
scenario_config.save(create_name)
if os.path.normcase(setting_path) != os.path.normcase(os.path.join(current_app.config.get('CONFIG_PATH'), board_type)):
copyfile(os.path.join(current_app.config.get('CONFIG_PATH'), board_type, create_name + '.xml'),
os.path.join(setting_path, create_name + '.xml'))
return {'status': 'success', 'setting': create_name, 'error_list': {}}
else:
return {'status': 'fail', 'error_list': {'name': 'Unsupported setting type. '}}
@CONFIG_APP.route('/remove_setting', methods=['POST'])
def remove_setting():
"""
remove current setting from config app
:return: the return message
"""
remove_config_data = request.json if request.method == "POST" else request.args
old_setting_name = remove_config_data['old_setting_name']
if current_app.config.get('BOARD_TYPE') is None:
return {'status': 'Board info not set before remove current setting'}
old_setting_path = os.path.join(current_app.config.get('CONFIG_PATH'),
current_app.config.get('BOARD_TYPE'),
old_setting_name + '.xml')
if os.path.isfile(old_setting_path):
os.remove(old_setting_path)
return {'status': 'success'}
@CONFIG_APP.route('/generate_src', methods=['POST'])
def generate_src():
"""
generate board src or scenario src
:return: the error message
"""
generator_config_data = request.json if request.method == "POST" else request.args
src_type = generator_config_data['type']
board_info = generator_config_data['board_info']
board_type = current_app.config.get('BOARD_TYPE')
board_info_xml = os.path.join(current_app.config.get('CONFIG_PATH'),
board_type, board_info+'.xml')
scenario_setting = generator_config_data['scenario_setting']
scenario_setting_xml = os.path.join(current_app.config.get('CONFIG_PATH'), board_type,
scenario_setting+'.xml')
src_path = generator_config_data['src_path']
if src_path is None or str(src_path).strip() == '':
src_path = os.path.join(current_app.config.get('CONFIG_PATH'))
launch_setting_xml = None
if 'launch_setting' in generator_config_data:
launch_setting = generator_config_data['launch_setting']
launch_setting_xml = os.path.join(current_app.config.get('CONFIG_PATH'),
board_type, launch_setting + '.xml')
msg = {}
error_list = {}
status = 'success'
if src_type == 'generate_config_src':
try:
from board_config.board_cfg_gen import ui_entry_api
error_list = ui_entry_api(board_info_xml, scenario_setting_xml, src_path)
except Exception as error:
status = 'fail'
error_list = {'board setting error': str(error)}
try:
from scenario_config.scenario_cfg_gen import ui_entry_api
error_list = ui_entry_api(board_info_xml, scenario_setting_xml, src_path)
except Exception as error:
status = 'fail'
error_list = {'scenario setting error': str(error)}
elif src_type == 'generate_launch_script':
scenario_setting_xml = os.path.join(current_app.config.get('CONFIG_PATH'), board_type,
scenario_setting + '.xml')
try:
from launch_config.launch_cfg_gen import ui_entry_api
error_list = ui_entry_api(board_info_xml, scenario_setting_xml, launch_setting_xml, src_path)
except Exception as error:
status = 'fail'
error_list = {'launch setting error': str(error)}
else:
status = 'fail'
error_list = {'error': 'generator type not specified'}
msg = {'status': status, 'error_list': error_list}
return msg
@CONFIG_APP.route('/upload_board_info', methods=['POST'])
def upload_board_info():
"""
upload board info xml file
:return: the upload status
"""
if request.method == 'POST':
if 'file' not in request.files:
return {'status': 'Error: no file uploaded'}
file = request.files['file']
if file and '.' in file.filename and file.filename.rsplit('.', 1)[1] in ['xml']:
filename = secure_filename(file.filename)
tmp_filename = 'tmp_' + filename
config_path = current_app.config.get('CONFIG_PATH')
default_config_path = current_app.config.get('DEFAULT_CONFIG_PATH')
save_tmp_board_path = os.path.join(config_path, tmp_filename)
file.save(save_tmp_board_path)
board_type_list = []
for config_name in os.listdir(default_config_path):
if os.path.isdir(os.path.join(default_config_path, config_name)) \
and config_name not in ['generic_board', 'sample_launch_scripts']:
board_type_list.append(config_name)
board_info_config = XmlConfig(config_path)
board_info_config.set_curr(tmp_filename.rsplit('.', 1)[0])
board_info_root = board_info_config.get_curr_root()
board_type = None
if board_info_root is not None and 'board' in board_info_root.attrib \
and 'scenario' not in board_info_root.attrib \
and 'uos_launcher' not in board_info_root.attrib:
board_type = board_info_root.attrib['board']
if not board_type:
os.remove(save_tmp_board_path)
return {'status': 'Error on parsing Board info\n'
'check the xml syntax and whether there is only the board '
'attribute in the board info file'}
board_path =os.path.join(config_path, board_type)
if not os.path.isdir(board_path):
os.makedirs(board_path)
move(save_tmp_board_path, os.path.join(board_path, filename))
info = 'updated'
if board_type not in board_type_list:
info = board_type
for generic_name in os.listdir(os.path.join(default_config_path, 'generic_board')):
generic_file = os.path.join(default_config_path, 'generic_board', generic_name)
if os.path.isfile(generic_file):
new_file = os.path.join(board_path, generic_name)
copyfile(generic_file, new_file)
xml_config = XmlConfig(board_path)
xml_config.set_curr(generic_name.rsplit('.', 1)[0])
xml_config.set_curr_attr('board', board_type)
xml_config_root = xml_config.get_curr_root()
if 'scenario' not in xml_config_root.attrib and 'uos_launcher' not in xml_config_root.attrib:
os.remove(new_file)
continue
# update RDT->CLOS_MASK according to board xml
# if 'board' in xml_config_root.attrib and 'scenario' in xml_config_root.attrib \
# and 'uos_launcher' not in xml_config_root.attrib:
# cdp_enabled = xml_config.get_curr_value('hv', 'FEATURES', 'RDT', 'CDP_ENABLED')
# (num_clos_mask, num_mba_delay) = \
# get_num_of_rdt_res(filename.rsplit('.', 1)[0], cdp_enabled)
# elem_clos_max = xml_config.get_curr_elem('hv', 'FEATURES', 'RDT', 'CLOS_MASK')
# elem_mba_delay = xml_config.get_curr_elem('hv', 'FEATURES', 'RDT', 'MBA_DELAY')
# xml_config.delete_curr_elem('hv', 'FEATURES', 'RDT', 'CLOS_MASK')
# xml_config.delete_curr_elem('hv', 'FEATURES', 'RDT', 'MBA_DELAY')
# for i in range(num_clos_mask):
# xml_config.clone_curr_elem(elem_clos_max, 'hv', 'FEATURES', 'RDT')
# for i in range(num_mba_delay):
# xml_config.clone_curr_elem(elem_mba_delay, 'hv', 'FEATURES', 'RDT')
# xml_config = set_default_config(xml_config)
xml_config.save(generic_name.rsplit('.', 1)[0], user_defined=False)
board_info = os.path.splitext(file.filename)[0]
current_app.config.update(BOARD_INFO=board_info)
current_app.config.update(BOARD_TYPE=board_type)
return {'status': 'success', 'info': info}
return {'status': 'Error: upload failed'}
@CONFIG_APP.route('/select_board', methods=['POST'])
def select_board():
"""
select one board info
:return: the selected board info
"""
data = request.json if request.method == "POST" else request.args
board_info = data['board_info']
current_app.config.update(BOARD_INFO=board_info)
board_type = get_board_type(board_info)
current_app.config.update(BOARD_TYPE=board_type)
if board_type:
return board_type
return ''
@CONFIG_APP.route('/upload_scenario', methods=['POST'])
def upload_scenario():
"""
upload scenario setting xml file
:return: the upload status
"""
if request.method == 'POST':
if 'file' not in request.files:
return {'status': 'no file uploaded'}
file = request.files['file']
if file and '.' in file.filename and file.filename.rsplit('.', 1)[1] in ['xml']:
filename = secure_filename(file.filename)
scenario_file_name = os.path.splitext(file.filename)[0]
board_type = current_app.config.get('BOARD_TYPE')
tmp_scenario_name = 'tmp_' + scenario_file_name + '.xml'
tmp_scenario_file = os.path.join(current_app.config.get('CONFIG_PATH'), board_type, tmp_scenario_name)
tmp_scenario_folder = os.path.dirname(tmp_scenario_file)
if not os.path.exists(tmp_scenario_folder):
os.makedirs(tmp_scenario_folder)
if os.path.isfile(tmp_scenario_file):
os.remove(tmp_scenario_file)
file.save(tmp_scenario_file)
tmp_xml_config = XmlConfig(os.path.join(current_app.config.get('CONFIG_PATH'),
board_type))
tmp_xml_config.set_curr(tmp_scenario_name[:-4])
status = None
if tmp_xml_config.get_curr_root() is None:
status = 'Error on parsing the scenario xml file, \n' \
'check the xml syntax and config items.'
else:
tmp_root = tmp_xml_config.get_curr_root()
if 'board' not in tmp_root.attrib or 'scenario' not in tmp_root.attrib \
or 'uos_launcher' in tmp_root.attrib:
status = 'Invalid scenario xml file, \nonly board and scenario ' \
'need to be configured.'
elif tmp_root.attrib['board'] != current_app.config.get('BOARD_TYPE'):
status = 'Current board: {} mismatched with the board in the scenario file,' \
'\nplease reselect or upload the board info: {}'\
.format(current_app.config.get('BOARD_TYPE'), tmp_root.attrib['board'])
if status is not None:
if os.path.isfile(tmp_scenario_file):
os.remove(tmp_scenario_file)
return {'status': status}
error_list = {}
new_scenario_name = scenario_file_name
rename = False
if not error_list:
new_scenario_path = os.path.join(current_app.config.get('CONFIG_PATH'), board_type,
scenario_file_name + '.xml')
if os.path.isfile(new_scenario_path):
new_scenario_name = new_scenario_name + '_' \
+ datetime.now().strftime('%Y%m%d%H%M%S')
rename = True
os.rename(tmp_scenario_file, os.path.join(current_app.config.get('CONFIG_PATH'),
board_type, new_scenario_name + '.xml'))
return {'status': 'success', 'file_name': new_scenario_name,
'rename': rename, 'error_list': error_list}
return {'status': 'unsupported method'}
@CONFIG_APP.route('/upload_launch', methods=['POST'])
def upload_launch():
"""
upload scenario setting xml file
:return: the upload status
"""
if request.method == 'POST':
if 'file' not in request.files:
return {'status': 'no file uploaded'}
file = request.files['file']
if file and '.' in file.filename and file.filename.rsplit('.', 1)[1] in ['xml']:
filename = secure_filename(file.filename)
launch_file_name = os.path.splitext(file.filename)[0]
board_type = current_app.config.get('BOARD_TYPE')
tmp_launch_name = 'tmp_' + launch_file_name + '.xml'
tmp_launch_file = os.path.join(current_app.config.get('CONFIG_PATH'), board_type,
tmp_launch_name)
if os.path.isfile(tmp_launch_file):
os.remove(tmp_launch_file)
file.save(tmp_launch_file)
tmp_xml_config = XmlConfig(os.path.join(current_app.config.get('CONFIG_PATH'),
board_type))
tmp_xml_config.set_curr(tmp_launch_name[:-4])
status = None
if tmp_xml_config.get_curr_root() is None:
status = 'Error on parsing the scenario xml file, \n' \
'check the xml syntax and config items.'
else:
tmp_root = tmp_xml_config.get_curr_root()
if 'board' not in tmp_root.attrib or 'scenario' not in tmp_root.attrib \
or 'uos_launcher' not in tmp_root.attrib:
status = 'Invalid launch xml file, \nboard, scenario,' \
'and uos_launcher need to be configured.'
elif tmp_root.attrib['board'] != current_app.config.get('BOARD_TYPE'):
status = 'Current board: {} mismatched with the board in the launch file,' \
'\nplease reselect or upload the board info: {}' \
.format(current_app.config.get('BOARD_TYPE'), tmp_root.attrib['board'])
if status is not None:
if os.path.isfile(tmp_launch_file):
os.remove(tmp_launch_file)
return {'status': status}
error_list = {}
new_launch_name = launch_file_name
rename = False
if not error_list:
new_launch_path = os.path.join(current_app.config.get('CONFIG_PATH'), board_type,
launch_file_name + '.xml')
if os.path.isfile(new_launch_path):
new_launch_name = new_launch_name + '_' + \
datetime.now().strftime('%Y%m%d%H%M%S')
rename = True
os.rename(tmp_launch_file, os.path.join(current_app.config.get('CONFIG_PATH'),
board_type, new_launch_name + '.xml'))
return {'status': 'success', 'file_name': new_launch_name,
'rename': rename, 'error_list': error_list}
return {'status': 'unsupported method'}
@CONFIG_APP.route('/get_post_launch_vms', methods=['POST'])
def get_post_launch_vms():
"""
get vm list for the scenario setting
:return: the vm list
"""
data = request.json if request.method == "POST" else request.args
scenario_name = data['scenario_name']
current_app.config.update(SCENARIO=scenario_name)
vm_list = get_post_launch_vm_list(scenario_name)
user_vmid_list = []
launch_name = data['launch_name']
xml_configs = get_xml_configs()
launch_config = xml_configs[3]
launch_config.set_curr(launch_name)
if launch_config is not None and launch_config.get_curr_root() is not None:
for uos in list(launch_config.get_curr_root()):
if 'id' in uos.attrib:
user_vmid_list.append(int(uos.attrib['id'])-1)
vm_list_index = [i for i in range(len(vm_list))]
vm_list_index = set(vm_list_index)
user_vmid_list = set(user_vmid_list)
index = list(vm_list_index - user_vmid_list)
vm_list = [vm_list[i] for i in index]
return {'vm_list': vm_list}
@CONFIG_APP.route('/get_num_of_rdt_res_entries', methods=['POST'])
def get_num_of_rdt_res_entries():
"""
get the number of rdt res entries
:return: the number of CLOS_MASK and MBA_DELAY entries
"""
data = request.json if request.method == "POST" else request.args
cdp_enabled = data['cdp_enabled']
board_info = current_app.config.get('BOARD_INFO')
(num_clos_mask, num_mba_delay) = get_num_of_rdt_res(board_info, cdp_enabled)
return {'num_clos_mask': num_clos_mask, 'num_mba_delay': num_mba_delay}
def get_post_launch_vm_list(scenario_name):
"""
get post launch VM list
:param scenario_name:
:return: vm_list
"""
xml_configs = get_xml_configs()
scenario_config = xml_configs[3]
scenario_config.set_curr(scenario_name)
vm_list = []
if scenario_config is not None and scenario_config.get_curr_root() is not None:
for vm in list(scenario_config.get_curr_root()):
if vm.tag == 'vm' and 'id' in vm.attrib:
vm_id = vm.attrib['id']
for item in list(vm):
if item.tag == 'vm_type' and item.text in ['POST_STD_VM', 'POST_RT_VM']:
vm_list.append((vm_id, item.text))
break
return vm_list
def get_board_list():
"""
get all available board info files
:return: the file list of board info
"""
config_path = current_app.config.get('CONFIG_PATH')
board_type_list = []
for config_name in os.listdir(config_path):
if os.path.isdir(os.path.join(config_path, config_name)) \
and config_name not in ['generic_board', 'sample_launch_scripts']:
board_type_list.append(config_name)
return board_type_list
def get_xml_configs(user_defined=False):
"""
get xml config related variables
:return: board_info, board_config, scenario_config, launch_config
"""
config_path = None
board_info = current_app.config.get('BOARD_INFO')
board_type = get_board_type(board_info)
if board_type is not None:
config_path = os.path.join(current_app.config.get('CONFIG_PATH'), board_type)
scenario_config = XmlConfig(config_path, not user_defined)
launch_config = XmlConfig(config_path, not user_defined)
return board_info, board_type, scenario_config, launch_config
def get_generic_scenario_config(scenario_config, add_vm_type=None):
if add_vm_type is not None:
vm_dict = {
'PRE_STD_VM': ('partitioned', 'vm:id=0'),
'PRE_RT_VM': ('hybrid_rt', 'vm:id=0'),
'SAFETY_VM': ('hybrid', 'vm:id=0'),
'SERVICE_VM': ('shared', 'vm:id=0'),
'POST_STD_VM': ('shared', 'vm:id=1'),
'POST_RT_VM': ('shared', 'vm:id=2'),
'KATA_VM': ('shared', 'vm:id=7'),
'LAUNCH_POST_STD_VM': ('hybrid_launch_2uos', 'uos:id=1'),
'LAUNCH_POST_RT_VM': ('shared_launch_6uos', 'uos:id=2')
}
config_path = os.path.join(current_app.config.get('DEFAULT_CONFIG_PATH'), 'generic_board')
xml_path = os.path.join(config_path, vm_dict[add_vm_type][0] + '.xml')
if os.path.isfile(xml_path):
xsd_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'schema', 'config.xsd')
output_xml = os.path.join(config_path, vm_dict[add_vm_type][0] + '_1' + '.xml')
default_populator.main(xsd_path, xml_path, output_xml)
generic_scenario_config = XmlConfig(config_path)
generic_scenario_config.set_curr(vm_dict[add_vm_type][0] + '_1')
generic_scenario_config = set_default_config(generic_scenario_config)
os.remove(output_xml)
return generic_scenario_config.get_curr_elem(vm_dict[add_vm_type][1])
else:
return None
config_path = os.path.join(current_app.config.get('CONFIG_PATH'), 'generic_board')
generic_scenario_config = XmlConfig(config_path)
for file in os.listdir(config_path):
if os.path.isfile(os.path.join(config_path, file)) and \
os.path.splitext(file)[1] == '.xml':
generic_scenario_config.set_curr(os.path.splitext(file)[0])
generic_scenario_config_root = generic_scenario_config.get_curr_root()
if 'scenario' in generic_scenario_config_root.attrib \
and 'uos_launcher' not in generic_scenario_config_root.attrib \
and generic_scenario_config_root.attrib['scenario'] == \
scenario_config.get_curr_root().attrib['scenario']:
return generic_scenario_config
return None
def get_board_config(board_info):
"""
get board config
:param board_info: the board info file
:return: the board type
"""
board_config = None
if board_info is not None:
board_config = XmlConfig(os.path.join(current_app.config.get('CONFIG_PATH'), board_info), board_info)
board_config.set_curr(board_info)
return board_config
def get_board_type(board_info):
"""
get board info type
:param board_info: the board info file
:return: the board type
"""
board_config = get_board_config(board_info)
board_type = None
if board_config is not None:
board_info_root = board_config.get_curr_root()
if board_info_root is not None and 'board' in board_info_root.attrib:
board_type = board_info_root.attrib['board']
return board_type
def get_board_info(board_info):
"""
get board info type
:param board_info: the board info file
:return: the board info
"""
board_config = get_board_config(board_info)
bios_info = None
base_board_info = None
if board_config is not None:
board_info_root = board_config.get_curr_root()
if board_info_root is not None:
for item in list(board_info_root):
if item.tag == 'BIOS_INFO':
for line in item.text.split('\n'):
if line.strip() != '':
if bios_info is None:
bios_info = line.strip()
else:
bios_info += ('\n' + line.strip())
elif item.tag == 'BASE_BOARD_INFO':
for line in item.text.split('\n'):
if line.strip() != '':
if base_board_info is None:
base_board_info = line.strip()
else:
base_board_info += ('\n' + line.strip())
return (bios_info, base_board_info)
def get_num_of_rdt_res(board_file_name, cdp_enalbed):
"""
get the number of rdt res entries
:param board_file_name: the file name of the board
:param cdp_enalbed: cdp enalbed or not
:return: the number of rdt res entries
"""
dict_rdt_res_clos_max = get_board_rdt_res_clos_max(board_file_name)
num_clos_max = 0
num_mba_delay = 0
if 'MBA' not in dict_rdt_res_clos_max.keys():
num_mba_delay = 0
if 'L2' not in dict_rdt_res_clos_max.keys() and 'L3' not in dict_rdt_res_clos_max.keys():
num_clos_max = 0
else:
num_clos_max = min(dict_rdt_res_clos_max.values())
else:
if 'L2' not in dict_rdt_res_clos_max.keys() and 'L3' not in dict_rdt_res_clos_max.keys():
num_clos_max = 0
num_mba_delay = dict_rdt_res_clos_max['MBA']
else:
if cdp_enalbed is not None and cdp_enalbed.strip().lower() == 'y':
for key in dict_rdt_res_clos_max.keys():
if key not in ['MBA']:
dict_rdt_res_clos_max[key] = int(dict_rdt_res_clos_max[key] / 2)
common_clos_max = min(dict_rdt_res_clos_max.values())
num_clos_max = common_clos_max * 2
num_mba_delay = common_clos_max
else:
common_clos_max = min(dict_rdt_res_clos_max.values())
num_clos_max = common_clos_max
num_mba_delay = common_clos_max
return (num_clos_max, num_mba_delay)
def get_board_rdt_res_clos_max(board_file_name):
"""
get rdt res clos max of the board
:param board_file_name: the file name of the board
:return: the rdt res clos max
"""
board_config = get_board_config(board_file_name)
rdt_res_supported = []
rdt_res_clos_max = []
if board_config is not None:
board_info_root = board_config.get_curr_root()
if board_info_root is not None:
for item in list(board_info_root):
if item.tag == 'CLOS_INFO':
for line in item.text.split('\n'):
line = line.strip()
if line.startswith('rdt resources supported'):
try:
rdt_res_supported = line.split(':')[1].split(',')
except:
pass
if line.startswith('rdt resource clos max:'):
try:
rdt_res_clos_max = line.split(':')[1].split(',')
break
except:
pass
if len(rdt_res_clos_max) < len(rdt_res_supported):
for i in range(len(rdt_res_supported) - len(rdt_res_clos_max)):
rdt_res_clos_max.append(0)
dict_rdt_res_clos_max = {}
for i in range(len(rdt_res_supported)):
try:
clos_max = int(rdt_res_clos_max[i].strip())
except:
clos_max = 0
if clos_max > 0:
dict_rdt_res_clos_max[rdt_res_supported[i].strip()] = clos_max
return dict_rdt_res_clos_max
def assign_vm_id(scenario_config):
"""
assign vm ids for the scenario setting.
:param scenario_config:
:return:
"""
root = scenario_config.get_curr_root()
pre_launched_vm_num = 0
sos_vm_num = 0
post_launched_vm_num = 0
for vm in list(root):
if vm.tag == 'vm':
for item in list(vm):
if item.tag == 'vm_type':
if item.text in ['PRE_STD_VM', 'SAFETY_VM', 'PRE_RT_VM']:
pre_launched_vm_num += 1
elif item.text in ['SERVICE_VM']:
sos_vm_num += 1
elif item.text in ['POST_STD_VM', 'POST_RT_VM', 'KATA_VM']:
post_launched_vm_num += 1
pre_launched_vm_index = 0
sos_vm_index = pre_launched_vm_num
post_launched_vm_index = pre_launched_vm_num + sos_vm_num
for vm in list(root):
if vm.tag == 'vm':
for item in list(vm):
if item.tag == 'vm_type':
if item.text in ['PRE_STD_VM', 'SAFETY_VM', 'PRE_RT_VM']:
vm.attrib['id'] = str(pre_launched_vm_index)
pre_launched_vm_index += 1
elif item.text in ['SERVICE_VM']:
vm.attrib['id'] = str(sos_vm_index)
sos_vm_index += 1
elif item.text in ['POST_STD_VM', 'POST_RT_VM', 'KATA_VM']:
vm.attrib['id'] = str(post_launched_vm_index)
post_launched_vm_index += 1
def get_xpath_dict_of_xsd():
"""
get xpath_dict of the xsd config
:return: xpath dict
"""
acrn_config_element_root = get_acrn_config_element(
os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'schema', 'config.xsd'))
xpath_dict = acrn_config_element_2_xpath_dict(acrn_config_element_root, {})
return xpath_dict
def set_default_config(config):
'''
set default values for current config
:param config: the current config
:return: the return config setting with default values
'''
xpath_dict = get_xpath_dict_of_xsd()
config_root = config.get_curr_root()
return set_default_config_for_curr_layer(config, config_root, '/'+config_root.tag, xpath_dict)
def set_default_config_for_curr_layer(config, elem, key, xpath_dict):
for sub_elem in list(elem):
sub_key = key+'/'+sub_elem.tag
if sub_key in xpath_dict.keys() and xpath_dict[sub_key] and xpath_dict[sub_key]['default']:
config.set_curr_value(xpath_dict[sub_key]['default'], *tuple(sub_key.split('/')[2:]))
set_default_config_for_curr_layer(config, sub_elem, sub_key, xpath_dict)
return config
def get_generic_config_list():
default_config_path = os.path.join(current_app.config.get('DEFAULT_CONFIG_PATH'), 'generic_board')
default_xml_config = XmlConfig(default_config_path)
secenario_config_list = []
launch_config_list = []
for config_name in os.listdir(default_config_path):
if os.path.isfile(os.path.join(default_config_path, config_name)):
default_xml_config.set_curr(config_name.rsplit('.', 1)[0])
xml_config_root = default_xml_config.get_curr_root()
if 'scenario' in xml_config_root.attrib and 'uos_launcher' not in xml_config_root.attrib:
secenario_config_list.append(config_name.rsplit('.', 1)[0])
elif 'scenario' in xml_config_root.attrib and 'uos_launcher' in xml_config_root.attrib:
launch_config_list.append(config_name.rsplit('.', 1)[0])
return (secenario_config_list, launch_config_list)
@CONFIG_APP.context_processor
def utility_functions():
def print_in_console(message):
print(str(message))
return dict(mdebug=print_in_console)