acrn-hypervisor/misc/config_tools/scenario_config/config_summary.py
Kunhui-Li 9c2d0f8858 config_tools: replace RstCloth library with class.
Currently, Configurator load fail because it needs to download
RstCloth related packages. This patch defines Doc class to replace
RstCloth library to fix this issue.

Tracked-On: #8395
Signed-off-by: Kunhui-Li <kunhuix.li@intel.com>
Reviewed-by: Junjie Mao <junjie.mao@intel.com>
2023-05-11 15:25:44 +08:00

470 lines
20 KiB
Python

#!/usr/bin/env python3
#
# Copyright (C) 2022 Intel Corporation.
#
# SPDX-License-Identifier: BSD-3-Clause
#
import sys
import argparse
import logging
import typing
import functools
import textwrap
from lxml import etree
t_content = typing.Union[str, typing.List[str]]
class Doc:
def __init__(self, stream: typing.TextIO = sys.stdout, line_width: int = 72) -> None:
self._stream = stream
self._line_width = line_width
def fill(self, text: str, initial_indent: int = 0, subsequent_indent: int = 0) -> str:
return textwrap.fill(
text=text,
width=self._line_width,
initial_indent=" " * initial_indent,
subsequent_indent=" " * subsequent_indent,
expand_tabs=False,
break_long_words=False,
break_on_hyphens=False,
)
def _add(self, content: t_content) -> None:
if isinstance(content, list):
self._stream.write("\n".join(content) + "\n")
else:
self._stream.write(content + "\n")
def content(self, content: t_content, indent: int = 0) -> None:
if isinstance(content, list):
content = " ".join(content)
self._add(self.fill(content, indent, indent))
def note(self):
marker = ".. {type}::".format(type='note')
self._add(marker)
def newline(self, count: int = 1) -> None:
if count == 1:
self._add("")
else:
self._add("\n" * (count - 1))
def table(self, header: typing.List, data) -> None:
column_widths = list()
content = list()
data = [header] + data
for i in range(len(header)):
column_widths.append(max(list(map(lambda x: len(str(x[i])), data))))
for j in range(len(data)):
overline = "+" + "+".join(["-" * column_widths[i] for i in range(len(header))]) + "+"
underline = "+" + "+".join(["=" * column_widths[i] for i in range(len(header))]) + "+"
format_raw = "|" + "|".join([str(data[j][i]).ljust(column_widths[i]) for i in range(len(header))]) + "|"
if j == 0:
content.extend([overline, format_raw, underline])
else:
content.extend([format_raw, overline])
if len(data) == 1:
content.append(overline)
self.newline()
self._add(content)
self.newline()
def heading(self, text: str, char: str, overline: bool = False) -> None:
underline = char * len(text)
content = [text, underline]
if overline:
content.insert(0, underline)
self._add(content)
h1 = functools.partialmethod(heading, char="#")
h2 = functools.partialmethod(heading, char="*")
h3 = functools.partialmethod(heading, char="=")
title = functools.partialmethod(heading, char="=", overline=True)
class GenerateRst:
io_port = {}
io_description = []
pci_vuart = {}
pci_ivshmem = {}
amount_l3_cache = {}
service_vm_used_pcpu_list = []
# Class initialization
def __init__(self, board_file_name, scenario_file_name, rst_file_name) -> None:
self.board_etree = etree.parse(board_file_name)
self.scenario_etree = etree.parse(scenario_file_name)
self.file = open(rst_file_name, 'w')
self.doc = Doc(self.file)
# The rst content is written in three parts according to the first level title
# 1. Hardware Resource Allocation 2. Inter-VM Connections 3. VM info
def write_configuration_rst(self):
self.doc.title(f"ACRN Scenario Datasheet")
self.doc.newline()
self.write_hardware_resource_allocation()
self.write_inter_vm_connections()
self.write_vms()
self.close_file()
# Write all info under the first level heading "Hardware Resource Allocation"
def write_hardware_resource_allocation(self):
self.doc.h1("Hardware Resource Allocation")
self.doc.newline()
self.write_pcpu()
self.write_shared_cache()
# Write physical CPUs info table
def write_pcpu(self):
self.doc.h2("Physical CPUs")
column_title, data_table = self.get_pcpu_table()
self.doc.table(column_title, data_table)
# Get VM node from scenario xml
def get_vm_node_list(self):
return self.scenario_etree.xpath("/acrn-config/vm")
# Get a specified VM name from an etree_node
@classmethod
def get_vm_name(cls, vm_node):
return f"<VM {vm_node.attrib['id']} {vm_node.find('name').text}>"
# Get all VM names info from the scenario.xml
def get_all_vm_name(self):
return list(map(lambda x: GenerateRst.get_vm_name(x), self.scenario_etree.xpath("/acrn-config/vm")))
# get the column_title and datatable required by the physical CPUs table
def get_pcpu_table(self):
data_table = []
column_title = [" "]
total_pcpu_list = self.get_pcpu()
vm_node_list = self.get_vm_node_list()
pre_launch_vm_node_list = self.scenario_etree.xpath("vm[load_order = 'PRE_LAUNCHED_VM']")
column_title.extend(map(str, total_pcpu_list))
if pre_launch_vm_node_list is not None and len(pre_launch_vm_node_list) != 0:
for pre_launch_vm_node in pre_launch_vm_node_list:
pre_launch_vm_pcpu_list = list(map(int, (pre_launch_vm_node.xpath("cpu_affinity/pcpu/pcpu_id/text()"))))
self.service_vm_used_pcpu_list.extend(set(total_pcpu_list).difference(set(pre_launch_vm_pcpu_list)))
else:
self.service_vm_used_pcpu_list.extend(total_pcpu_list)
for vm_node in vm_node_list:
vm_pcpu_id_list = []
data_row = [" "] * (len(total_pcpu_list) + 1)
data_row[0] = GenerateRst.get_vm_name(vm_node)
vm_load_order = vm_node.find("load_order").text
vm_pcpu_id_node = vm_node.xpath("cpu_affinity/pcpu/pcpu_id")
vm_pcpu_id_list.extend(map(lambda x: int(x.text), vm_pcpu_id_node))
if len(vm_pcpu_id_node) == 0 and vm_load_order == "SERVICE_VM":
for pcpu in self.service_vm_used_pcpu_list:
data_row[pcpu + 1] = "*"
else:
for pcpu in vm_pcpu_id_list:
data_row[pcpu + 1] = "*"
data_table.append(data_row)
return column_title, data_table
# Get all physical CPU information from board.xml
def get_pcpu(self):
pcpu_list = list(map(int, self.board_etree.xpath("processors//cpu_id/text()")))
return pcpu_list
def write_shared_cache(self):
self.doc.h2("Shared Cache")
self.doc.newline()
self.write_each_shared_cache(cache_level='2')
self.write_each_shared_cache(cache_level='3')
# For L2 cache and L3 cache, fill in different cache info according to different situations
def write_each_shared_cache(self, cache_level):
vm_vcpu_info = self.get_vm_used_vcpu(cache_level)
if not bool(vm_vcpu_info) and cache_level == '2':
self.doc.h3(f"Level-{cache_level} Cache")
self.doc.newline()
self.doc.content("Level-2 caches are local on all physical CPUs.")
elif not bool(vm_vcpu_info) and cache_level == '3':
return
else:
self.doc.h3(f"Level-{cache_level} Cache")
for cache_info, vm_info in vm_vcpu_info.items():
each_cache_way_size = self.get_each_cache_way_info(cache_level, cache_info[1])[0]
column_title, data_table = self.get_vcpu_table({cache_info: vm_info}, cache_level)
self.doc.table(column_title, data_table)
self.doc.note(name="note", content=f"Each cache chunk is {each_cache_way_size}KB.")
self.doc.newline()
# Get used vcpu table
def get_vcpu_table(self, vm_vcpu_info, cache_level):
data_table = []
column_title = [" "]
chunk_list = self.get_each_cache_way_info(cache_level=cache_level)[1]
column_title.extend(chunk_list)
for vm_name, vcpu_list in vm_vcpu_info.items():
for vcpu_tuple in vcpu_list:
data_row = [f"{vcpu_tuple[0]} vCPU{vcpu_tuple[1]}"] + ([" "] * len(chunk_list))
clos_mask_list = list(vcpu_tuple[2])
for index in range(len(clos_mask_list)):
if clos_mask_list[index] == '1':
data_row[index + 1] = "*"
data_table.append(data_row)
return column_title, sorted(data_table, key=lambda x: x[-1])
# Get the vcpu info used by each VM from scenario.xml
def get_vm_used_vcpu(self, cache_level):
vm_used_vcpu_info = {}
cache_allocation_list = self.scenario_etree.xpath("/acrn-config/hv/CACHE_REGION/CACHE_ALLOCATION")
for cache_allocation_node in cache_allocation_list:
vm_name_list = []
vcpu_num_list = []
clos_mask_list = []
cache_id = cache_allocation_node.find("CACHE_ID").text
if cache_allocation_node.find("CACHE_LEVEL").text == cache_level:
policy_node_list = cache_allocation_node.xpath("POLICY")
vm_name_list.extend(map(lambda x: x.find("VM").text, policy_node_list))
vcpu_num_list.extend(map(lambda x: int(x.find("VCPU").text), policy_node_list))
clos_mask_list.extend(map(lambda x: bin(int(x.find("CLOS_MASK").text, base=16))[2:], policy_node_list))
for index in range(len(vm_name_list)):
if (cache_level, cache_id) not in vm_used_vcpu_info.keys():
vm_used_vcpu_info[(cache_level, cache_id)] = [(vm_name_list[index], vcpu_num_list[index], clos_mask_list[index])]
else:
vm_used_vcpu_info[(cache_level, cache_id)].append((vm_name_list[index], vcpu_num_list[index], clos_mask_list[index]))
return vm_used_vcpu_info
# Write all info under the first level heading "Inter-VM Connections"
def write_inter_vm_connections(self):
self.doc.h1("Inter-VM Connections")
self.doc.newline()
self.write_virtual_uarts()
self.write_shared_memory()
# Write the virtual uarts info according to the virtual uarts table
def write_virtual_uarts(self):
self.doc.h2("Virtual UARTs")
self.doc.newline()
self.doc.content("The table below summarizes the virtual UART connections in the system.")
column_title, data_table = self.get_virtual_uarts_table()
self.doc.table(column_title, data_table)
# Get virtual uarts table
def get_virtual_uarts_table(self):
data_table = []
vm_name_list = self.get_all_vm_name()
column_title = ["vUART Connection"]
column_title.extend(vm_name_list)
vuart_connections_list = self.scenario_etree.xpath("/acrn-config/hv/vuart_connections/vuart_connection")
for vuart_connection in vuart_connections_list:
data_row = [""] * (len(vm_name_list) + 1)
pci_row = [""] * 3
vc_name = vuart_connection.find("name").text
vc_type = vuart_connection.find("type").text
vc_endpoint_list = vuart_connection.xpath("endpoint")
data_row[0] = vc_name
for vc_endpoint_node in vc_endpoint_list:
vc_vm_name = vc_endpoint_node.find("vm_name").text
vc_io_port = vc_endpoint_node.find("io_port").text
if vc_endpoint_node.find("vbdf") is not None:
vc_vbdf = vc_endpoint_node.find("vbdf").text
pci_row[1] = vc_vbdf
else:
vc_vbdf = ""
if vc_vm_name not in self.io_port.keys():
self.io_port[vc_vm_name] = [vc_io_port]
else:
self.io_port[vc_vm_name].append(vc_io_port)
self.io_description.append(vc_name)
for vm_name in vm_name_list:
if vm_name.find(vc_vm_name) != -1:
data_row[vm_name_list.index(vm_name) + 1] = f"{vc_type} {vc_io_port} {vc_vbdf}"
pci_row[0] = vc_vm_name
pci_row[2] = f"{vc_name} {vc_type} {vc_io_port}"
if pci_row[0] not in self.pci_vuart:
self.pci_vuart[pci_row[0]] = [pci_row[-2:]]
else:
self.pci_vuart[pci_row[0]].append(pci_row[-2:])
data_table.append(data_row)
return column_title, data_table
# Write the shared memory information according to the shared memory table
def write_shared_memory(self):
self.doc.h2("Shared Memory")
self.doc.newline()
self.doc.content("The table below summarizes topology of shared memory in the system.")
column_title, data_table = self.get_shared_memory_table()
self.doc.table(column_title, data_table)
# Get shared memory table
def get_shared_memory_table(self):
data_table = []
vm_name_list = self.get_all_vm_name()
column_title = ["Shared Memory Block"]
column_title.extend(vm_name_list)
ivshmem_region_list = self.scenario_etree.xpath("/acrn-config/hv/FEATURES/IVSHMEM/IVSHMEM_REGION")
for ivshmem_region_node in ivshmem_region_list:
data_row = [""] * (len(vm_name_list) + 1)
ivshmem_row = [""] * 3
ir_name = ivshmem_region_node.find("NAME").text
powered_by = ivshmem_region_node.find("PROVIDED_BY").text
data_row[0] = ir_name
for ivshmem_vm in ivshmem_region_node.xpath("IVSHMEM_VMS/IVSHMEM_VM"):
ivshmem_vm_name = ivshmem_vm.find("VM_NAME").text
ivshmem_vm_vbdf = ivshmem_vm.find("VBDF").text
for vm_name in vm_name_list:
if vm_name.find(ivshmem_vm_name) != -1:
ivshmem_row[0] = ivshmem_vm_name
ivshmem_row[1] = ivshmem_vm_vbdf
ivshmem_row[2] = f"pci, {ir_name}, powered by {powered_by}"
data_row[vm_name_list.index(vm_name) + 1] = f"pci {ivshmem_vm_vbdf}"
if ivshmem_row[0] not in self.pci_ivshmem:
self.pci_ivshmem[ivshmem_row[0]] = [ivshmem_row[-2:]]
else:
self.pci_ivshmem[ivshmem_row[0]].append(ivshmem_row[-2:])
data_table.append(data_row)
return column_title, data_table
# Write all info under the first level heading "VM X - <VM Name>"
def write_vms(self):
vm_node_list = self.get_vm_node_list()
for vm_node in vm_node_list:
self.doc.h1(GenerateRst.get_vm_name(vm_node))
self.doc.newline()
self.write_each_vm(vm_node)
# Write the information under all secondary headings for each VM in order
def write_each_vm(self, vm_node):
self.doc.h2("Basic Information")
column_title1, data_table1 = self.get_basic_information_table(vm_node)
self.doc.table(column_title1, data_table1)
self.doc.h2("PCI Devices")
column_title2, data_table2 = self.get_pci_devices_table(vm_node)
self.doc.table(column_title2, data_table2)
self.doc.h2("Fixed I/O Addresses")
column_title3, data_table3 = self.get_fixed_io_address_table(vm_node)
self.doc.table(column_title3, data_table3)
# Get basic information table for VM info
def get_basic_information_table(self, vm_node):
parameter_dict = {}
memory_size = 0
data_table = []
column_title = ["Parameter", "Configuration"]
load_order = vm_node.find("load_order").text
vm_name = vm_node.find("name").text
if len(vm_node.xpath("memory/hpa_region")) == 0 and len(vm_node.xpath("memory/size")) == 0:
memory_size = " "
else:
if len(vm_node.xpath("memory/hpa_region")) == 0:
memory_size = int(vm_node.find("memory/size").text)
else:
hpa_region_list = vm_node.xpath("memory/hpa_region")
for hpa_region in hpa_region_list:
memory_size = memory_size + int(hpa_region.find("size_hpa").text)
vm_vcpu_info_l2 = self.get_vm_used_vcpu("2")
vm_vcpu_info_l3 = self.get_vm_used_vcpu("3")
amount_vm_l3_cache = self.get_amount_l3_cache(vm_node)
parameter_dict["Load Order"] = load_order
if load_order == "SERVICE_VM":
parameter_dict["Number of vCPUs"] = len(self.service_vm_used_pcpu_list)
else:
parameter_dict["Number of vCPUs"] = len(vm_node.xpath(f"cpu_affinity/pcpu/pcpu_id"))
parameter_dict["Ammount of RAM"] = str(memory_size) + "MB"
parameter_dict["Amount of L3 Cache"] = amount_vm_l3_cache
data_table.extend(map(list, parameter_dict.items()))
return column_title, data_table
# Get cache way size for each shared cache in board.xml
def get_each_cache_way_info(self, cache_level, cache_id=None):
chunk_list = []
total_cache_size = 0
total_cache_ways = 0
if cache_id is not None:
cache_node_list = self.board_etree.xpath(f"caches/cache[@level = '{cache_level}' and @id = '{cache_id}']")
else:
cache_node_list = self.board_etree.xpath(f"caches/cache [@level = '{cache_level}']")
for cache_node in cache_node_list:
total_cache_size = max(int(cache_node.find("cache_size").text), total_cache_size)
total_cache_ways = max(int(cache_node.find("ways").text), total_cache_ways)
for i in range(total_cache_ways - 1, -1, -1):
chunk_list.append(f"chunk{i}")
return int(total_cache_size / 1024 / total_cache_ways), chunk_list
# get Amount of L3 Cache info
def get_amount_l3_cache(self, vm_node):
vm_bit_map = 0
vm_name = vm_node.find("name").text
each_cache_way_size = self.get_each_cache_way_info(cache_level='3')[0]
clos_mask_list = self.scenario_etree.xpath(f"/acrn-config/hv/CACHE_REGION/CACHE_ALLOCATION[CACHE_LEVEL='3']/POLICY[VM='{vm_name}']/CLOS_MASK/text()")
for item in clos_mask_list:
vm_bit_map |= int(item, base=16)
vm_cache_way = bin(vm_bit_map).count('1')
amount_vm_l3_cache = f"{vm_cache_way * each_cache_way_size} MB"
return amount_vm_l3_cache
# Get pci device table for VM info
def get_pci_devices_table(self, vm_node):
data_table = []
vm_name = vm_node.find("name").text
column_title = ["Parameter", "Configuration"]
pci_devices_list = vm_node.xpath("pci_devs/pci_dev")
if len(pci_devices_list) != 0:
for pci_device in pci_devices_list:
data_row = pci_device.text.split(" ", 1)
data_table.append(data_row)
if vm_name in self.pci_vuart.keys():
for item in self.pci_vuart[vm_name]:
data_row = [item[0], item[1]]
data_table.append(data_row)
if vm_name in self.pci_ivshmem.keys():
for item in self.pci_ivshmem[vm_name]:
data_row = [item[0], item[1]]
data_table.append(data_row)
return column_title, data_table
# Get fixed io address table for VM info
def get_fixed_io_address_table(self, vm_node):
data_table = []
column_title = ["I/O Address", "Function Description"]
vm_name = vm_node.find("name").text
for k, v in self.io_port.items():
if k in vm_name:
for item in v:
data_table.append([item, "Virtual UART"])
return column_title, data_table
# Close the Rst file after all information is written.
def close_file(self):
self.file.close()
def main(board_xml, scenario_xml, config_summary):
GenerateRst(board_file_name=board_xml, scenario_file_name=scenario_xml,
rst_file_name=config_summary).write_configuration_rst()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("board_file_name",
help="Specifies the board config XML path and name used to generate the rst file")
parser.add_argument("scenario_file_name",
help="Specifies the scenario XML path and name used to generate the rst file")
parser.add_argument("rst_file_name", default="config_summary.rst",
help="the path and name of the output rst file that "
"summaries the config from scenario.xml and board.xml")
args = parser.parse_args()
logging.basicConfig(level="INFO")
sys.exit(main(args.board_file_name, args.scenario_file_name, args.rst_file_name))