mirror of
				https://github.com/csunny/DB-GPT.git
				synced 2025-10-22 17:39:02 +00:00 
			
		
		
		
	Co-authored-by: 夏姜 <wenfengjiang.jwf@digital-engine.com> Co-authored-by: aries_ckt <916701291@qq.com> Co-authored-by: wb-lh513319 <wb-lh513319@alibaba-inc.com> Co-authored-by: csunny <cfqsunny@163.com>
		
			
				
	
	
		
			857 lines
		
	
	
		
			30 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			857 lines
		
	
	
		
			30 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import argparse
 | |
| import os
 | |
| from collections import OrderedDict
 | |
| from dataclasses import MISSING, asdict, dataclass, field, fields, is_dataclass
 | |
| from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Type, Union
 | |
| 
 | |
| if TYPE_CHECKING:
 | |
|     from dbgpt._private.pydantic import BaseModel
 | |
| 
 | |
| MISSING_DEFAULT_VALUE = "__MISSING_DEFAULT_VALUE__"
 | |
| 
 | |
| 
 | |
| @dataclass
 | |
| class ParameterDescription:
 | |
|     required: bool = False
 | |
|     param_class: Optional[str] = None
 | |
|     param_name: Optional[str] = None
 | |
|     param_type: Optional[str] = None
 | |
|     description: Optional[str] = None
 | |
|     default_value: Optional[Any] = None
 | |
|     valid_values: Optional[List[Any]] = None
 | |
|     ext_metadata: Optional[Dict[str, Any]] = None
 | |
| 
 | |
| 
 | |
| @dataclass
 | |
| class BaseParameters:
 | |
|     @classmethod
 | |
|     def from_dict(
 | |
|         cls, data: dict, ignore_extra_fields: bool = False
 | |
|     ) -> "BaseParameters":
 | |
|         """Create an instance of the dataclass from a dictionary.
 | |
| 
 | |
|         Args:
 | |
|             data: A dictionary containing values for the dataclass fields.
 | |
|             ignore_extra_fields: If True, any extra fields in the data dictionary that are
 | |
|                 not part of the dataclass will be ignored.
 | |
|                 If False, extra fields will raise an error. Defaults to False.
 | |
|         Returns:
 | |
|             An instance of the dataclass with values populated from the given dictionary.
 | |
| 
 | |
|         Raises:
 | |
|             TypeError: If `ignore_extra_fields` is False and there are fields in the
 | |
|                            dictionary that aren't present in the dataclass.
 | |
|         """
 | |
|         all_field_names = {f.name for f in fields(cls)}
 | |
|         if ignore_extra_fields:
 | |
|             data = {key: value for key, value in data.items() if key in all_field_names}
 | |
|         else:
 | |
|             extra_fields = set(data.keys()) - all_field_names
 | |
|             if extra_fields:
 | |
|                 raise TypeError(f"Unexpected fields: {', '.join(extra_fields)}")
 | |
|         return cls(**data)
 | |
| 
 | |
|     def update_from(self, source: Union["BaseParameters", dict]) -> bool:
 | |
|         """
 | |
|         Update the attributes of this object using the values from another object (of the same or parent type) or a dictionary.
 | |
|         Only update if the new value is different from the current value and the field is not marked as "fixed" in metadata.
 | |
| 
 | |
|         Args:
 | |
|             source (Union[BaseParameters, dict]): The source to update from. Can be another object of the same type or a dictionary.
 | |
| 
 | |
|         Returns:
 | |
|             bool: True if at least one field was updated, otherwise False.
 | |
|         """
 | |
|         updated = False  # Flag to indicate whether any field was updated
 | |
|         if isinstance(source, (BaseParameters, dict)):
 | |
|             for field_info in fields(self):
 | |
|                 # Check if the field has a "fixed" tag in metadata
 | |
|                 tags = field_info.metadata.get("tags")
 | |
|                 tags = [] if not tags else tags.split(",")
 | |
|                 if tags and "fixed" in tags:
 | |
|                     continue  # skip this field
 | |
|                 # Get the new value from source (either another BaseParameters object or a dict)
 | |
|                 new_value = (
 | |
|                     getattr(source, field_info.name)
 | |
|                     if isinstance(source, BaseParameters)
 | |
|                     else source.get(field_info.name, None)
 | |
|                 )
 | |
| 
 | |
|                 # If the new value is not None and different from the current value, update the field and set the flag
 | |
|                 if new_value is not None and new_value != getattr(
 | |
|                     self, field_info.name
 | |
|                 ):
 | |
|                     setattr(self, field_info.name, new_value)
 | |
|                     updated = True
 | |
|         else:
 | |
|             raise ValueError(
 | |
|                 "Source must be an instance of BaseParameters (or its derived class) or a dictionary."
 | |
|             )
 | |
| 
 | |
|         return updated
 | |
| 
 | |
|     def __str__(self) -> str:
 | |
|         return _get_dataclass_print_str(self)
 | |
| 
 | |
|     def to_command_args(self, args_prefix: str = "--") -> List[str]:
 | |
|         """Convert the fields of the dataclass to a list of command line arguments.
 | |
| 
 | |
|         Args:
 | |
|             args_prefix: args prefix
 | |
|         Returns:
 | |
|             A list of strings where each field is represented by two items:
 | |
|             one for the field name prefixed by args_prefix, and one for its value.
 | |
|         """
 | |
|         return _dict_to_command_args(asdict(self), args_prefix=args_prefix)
 | |
| 
 | |
|     def to_dict(self) -> Dict[str, Any]:
 | |
|         return asdict(self)
 | |
| 
 | |
| 
 | |
| @dataclass
 | |
| class BaseServerParameters(BaseParameters):
 | |
|     host: Optional[str] = field(
 | |
|         default="0.0.0.0", metadata={"help": "The host IP address to bind to."}
 | |
|     )
 | |
|     port: Optional[int] = field(
 | |
|         default=None, metadata={"help": "The port number to bind to."}
 | |
|     )
 | |
|     daemon: Optional[bool] = field(
 | |
|         default=False, metadata={"help": "Run the server as a daemon."}
 | |
|     )
 | |
|     log_level: Optional[str] = field(
 | |
|         default=None,
 | |
|         metadata={
 | |
|             "help": "Logging level",
 | |
|             "valid_values": [
 | |
|                 "FATAL",
 | |
|                 "ERROR",
 | |
|                 "WARNING",
 | |
|                 "WARNING",
 | |
|                 "INFO",
 | |
|                 "DEBUG",
 | |
|                 "NOTSET",
 | |
|             ],
 | |
|         },
 | |
|     )
 | |
|     log_file: Optional[str] = field(
 | |
|         default=None,
 | |
|         metadata={
 | |
|             "help": "The filename to store log",
 | |
|         },
 | |
|     )
 | |
|     tracer_file: Optional[str] = field(
 | |
|         default=None,
 | |
|         metadata={
 | |
|             "help": "The filename to store tracer span records",
 | |
|         },
 | |
|     )
 | |
|     tracer_to_open_telemetry: Optional[bool] = field(
 | |
|         default=os.getenv("TRACER_TO_OPEN_TELEMETRY", "False").lower() == "true",
 | |
|         metadata={
 | |
|             "help": "Whether send tracer span records to OpenTelemetry",
 | |
|         },
 | |
|     )
 | |
|     otel_exporter_otlp_traces_endpoint: Optional[str] = field(
 | |
|         default=None,
 | |
|         metadata={
 | |
|             "help": "`OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` target to which the span "
 | |
|             "exporter is going to send spans. The endpoint MUST be a valid URL host, "
 | |
|             "and MAY contain a scheme (http or https), port and path. A scheme of https"
 | |
|             " indicates a secure connection and takes precedence over this "
 | |
|             "configuration setting.",
 | |
|         },
 | |
|     )
 | |
|     otel_exporter_otlp_traces_insecure: Optional[bool] = field(
 | |
|         default=None,
 | |
|         metadata={
 | |
|             "help": "OTEL_EXPORTER_OTLP_TRACES_INSECURE` represents whether to enable "
 | |
|             "client transport security for gRPC requests for spans. A scheme of https "
 | |
|             "takes precedence over the this configuration setting. Default: False"
 | |
|         },
 | |
|     )
 | |
|     otel_exporter_otlp_traces_certificate: Optional[str] = field(
 | |
|         default=None,
 | |
|         metadata={
 | |
|             "help": "`OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE` stores the path to the "
 | |
|             "certificate file for TLS credentials of gRPC client for traces. "
 | |
|             "Should only be used for a secure connection for tracing",
 | |
|         },
 | |
|     )
 | |
|     otel_exporter_otlp_traces_headers: Optional[str] = field(
 | |
|         default=None,
 | |
|         metadata={
 | |
|             "help": "`OTEL_EXPORTER_OTLP_TRACES_HEADERS` contains the key-value pairs "
 | |
|             "to be used as headers for spans associated with gRPC or HTTP requests.",
 | |
|         },
 | |
|     )
 | |
|     otel_exporter_otlp_traces_timeout: Optional[int] = field(
 | |
|         default=None,
 | |
|         metadata={
 | |
|             "help": "`OTEL_EXPORTER_OTLP_TRACES_TIMEOUT` is the maximum time the OTLP "
 | |
|             "exporter will wait for each batch export for spans.",
 | |
|         },
 | |
|     )
 | |
|     otel_exporter_otlp_traces_compression: Optional[str] = field(
 | |
|         default=None,
 | |
|         metadata={
 | |
|             "help": "`OTEL_EXPORTER_OTLP_COMPRESSION` but only for the span exporter. "
 | |
|             "If both are present, this takes higher precedence.",
 | |
|         },
 | |
|     )
 | |
| 
 | |
| 
 | |
| def _get_dataclass_print_str(obj):
 | |
|     class_name = obj.__class__.__name__
 | |
|     parameters = [
 | |
|         f"\n\n=========================== {class_name} ===========================\n"
 | |
|     ]
 | |
|     for field_info in fields(obj):
 | |
|         value = _get_simple_privacy_field_value(obj, field_info)
 | |
|         parameters.append(f"{field_info.name}: {value}")
 | |
|     parameters.append(
 | |
|         "\n======================================================================\n\n"
 | |
|     )
 | |
|     return "\n".join(parameters)
 | |
| 
 | |
| 
 | |
| def _dict_to_command_args(obj: Dict, args_prefix: str = "--") -> List[str]:
 | |
|     """Convert dict to a list of command line arguments
 | |
| 
 | |
|     Args:
 | |
|         obj: dict
 | |
|     Returns:
 | |
|         A list of strings where each field is represented by two items:
 | |
|         one for the field name prefixed by args_prefix, and one for its value.
 | |
|     """
 | |
|     args = []
 | |
|     for key, value in obj.items():
 | |
|         if value is None:
 | |
|             continue
 | |
|         args.append(f"{args_prefix}{key}")
 | |
|         args.append(str(value))
 | |
|     return args
 | |
| 
 | |
| 
 | |
| def _get_simple_privacy_field_value(obj, field_info):
 | |
|     """Retrieve the value of a field from a dataclass instance, applying privacy rules if necessary.
 | |
| 
 | |
|     This function reads the metadata of a field to check if it's tagged with 'privacy'.
 | |
|     If the 'privacy' tag is present, then it modifies the value based on its type
 | |
|     for privacy concerns:
 | |
|     - int: returns -999
 | |
|     - float: returns -999.0
 | |
|     - bool: returns False
 | |
|     - str: if length > 5, masks the middle part and returns first and last char;
 | |
|            otherwise, returns "******"
 | |
| 
 | |
|     Args:
 | |
|         obj: The dataclass instance.
 | |
|         field_info: A Field object that contains information about the dataclass field.
 | |
| 
 | |
|     Returns:
 | |
|     The original or modified value of the field based on the privacy rules.
 | |
| 
 | |
|     Example usage:
 | |
|     @dataclass
 | |
|     class Person:
 | |
|         name: str
 | |
|         age: int
 | |
|         ssn: str = field(metadata={"tags": "privacy"})
 | |
|     p = Person("Alice", 30, "123-45-6789")
 | |
|     print(_get_simple_privacy_field_value(p, Person.ssn))  # A******9
 | |
|     """
 | |
|     tags = field_info.metadata.get("tags")
 | |
|     tags = [] if not tags else tags.split(",")
 | |
|     is_privacy = False
 | |
|     if tags and "privacy" in tags:
 | |
|         is_privacy = True
 | |
|     value = getattr(obj, field_info.name)
 | |
|     if not is_privacy or not value:
 | |
|         return value
 | |
|     field_type = EnvArgumentParser._get_argparse_type(field_info.type)
 | |
|     if field_type is int:
 | |
|         return -999
 | |
|     if field_type is float:
 | |
|         return -999.0
 | |
|     if field_type is bool:
 | |
|         return False
 | |
|     # str
 | |
|     if len(value) > 5:
 | |
|         return value[0] + "******" + value[-1]
 | |
|     return "******"
 | |
| 
 | |
| 
 | |
| def _genenv_ignoring_key_case(
 | |
|     env_key: str, env_prefix: Optional[str] = None, default_value: Optional[str] = None
 | |
| ):
 | |
|     """Get the value from the environment variable, ignoring the case of the key"""
 | |
|     if env_prefix:
 | |
|         env_key = env_prefix + env_key
 | |
|     return os.getenv(
 | |
|         env_key, os.getenv(env_key.upper(), os.getenv(env_key.lower(), default_value))
 | |
|     )
 | |
| 
 | |
| 
 | |
| def _genenv_ignoring_key_case_with_prefixes(
 | |
|     env_key: str,
 | |
|     env_prefixes: Optional[List[str]] = None,
 | |
|     default_value: Optional[str] = None,
 | |
| ) -> str:
 | |
|     if env_prefixes:
 | |
|         for env_prefix in env_prefixes:
 | |
|             env_var_value = _genenv_ignoring_key_case(env_key, env_prefix)
 | |
|             if env_var_value:
 | |
|                 return env_var_value
 | |
|     return _genenv_ignoring_key_case(env_key, default_value=default_value)
 | |
| 
 | |
| 
 | |
| class EnvArgumentParser:
 | |
|     @staticmethod
 | |
|     def get_env_prefix(env_key: str) -> Optional[str]:
 | |
|         if not env_key:
 | |
|             return None
 | |
|         env_key = env_key.replace("-", "_")
 | |
|         return env_key + "_"
 | |
| 
 | |
|     def parse_args_into_dataclass(
 | |
|         self,
 | |
|         dataclass_type: Type,
 | |
|         env_prefixes: Optional[List[str]] = None,
 | |
|         command_args: Optional[List[str]] = None,
 | |
|         **kwargs,
 | |
|     ) -> Any:
 | |
|         """Parse parameters from environment variables and command lines and populate them into data class"""
 | |
|         parser = argparse.ArgumentParser(allow_abbrev=False)
 | |
|         for field in fields(dataclass_type):
 | |
|             env_var_value: Any = _genenv_ignoring_key_case_with_prefixes(
 | |
|                 field.name, env_prefixes
 | |
|             )
 | |
|             if env_var_value:
 | |
|                 env_var_value = env_var_value.strip()
 | |
|                 if field.type is int or field.type == Optional[int]:
 | |
|                     env_var_value = int(env_var_value)
 | |
|                 elif field.type is float or field.type == Optional[float]:
 | |
|                     env_var_value = float(env_var_value)
 | |
|                 elif field.type is bool or field.type == Optional[bool]:
 | |
|                     env_var_value = env_var_value.lower() == "true"
 | |
|                 elif field.type is str or field.type == Optional[str]:
 | |
|                     pass
 | |
|                 else:
 | |
|                     raise ValueError(f"Unsupported parameter type {field.type}")
 | |
|             if not env_var_value:
 | |
|                 env_var_value = kwargs.get(field.name)
 | |
| 
 | |
|             # print(f"env_var_value: {env_var_value} for {field.name}")
 | |
|             # Add a command-line argument for this field
 | |
|             EnvArgumentParser._build_single_argparse_option(
 | |
|                 parser, field, env_var_value
 | |
|             )
 | |
| 
 | |
|         # Parse the command-line arguments
 | |
|         cmd_args, cmd_argv = parser.parse_known_args(args=command_args)
 | |
|         # cmd_args = parser.parse_args(args=command_args)
 | |
|         # print(f"cmd_args: {cmd_args}")
 | |
|         for field in fields(dataclass_type):
 | |
|             # cmd_line_value = getattr(cmd_args, field.name)
 | |
|             if field.name in cmd_args:
 | |
|                 cmd_line_value = getattr(cmd_args, field.name)
 | |
|                 if cmd_line_value is not None:
 | |
|                     kwargs[field.name] = cmd_line_value
 | |
| 
 | |
|         return dataclass_type(**kwargs)
 | |
| 
 | |
|     @staticmethod
 | |
|     def _create_arg_parser(dataclass_type: Type) -> argparse.ArgumentParser:
 | |
|         parser = argparse.ArgumentParser(description=dataclass_type.__doc__)
 | |
|         for field in fields(dataclass_type):
 | |
|             help_text = field.metadata.get("help", "")
 | |
|             valid_values = field.metadata.get("valid_values", None)
 | |
|             argument_kwargs = {
 | |
|                 "type": EnvArgumentParser._get_argparse_type(field.type),
 | |
|                 "help": help_text,
 | |
|                 "choices": valid_values,
 | |
|                 "required": EnvArgumentParser._is_require_type(field.type),
 | |
|             }
 | |
|             if field.default != MISSING:
 | |
|                 argument_kwargs["default"] = field.default
 | |
|                 argument_kwargs["required"] = False
 | |
|             parser.add_argument(f"--{field.name}", **argument_kwargs)
 | |
|         return parser
 | |
| 
 | |
|     @staticmethod
 | |
|     def _create_click_option_from_field(field_name: str, field: Type, is_func=True):
 | |
|         import click
 | |
| 
 | |
|         help_text = field.metadata.get("help", "")
 | |
|         valid_values = field.metadata.get("valid_values", None)
 | |
|         cli_params = {
 | |
|             "default": None if field.default is MISSING else field.default,
 | |
|             "help": help_text,
 | |
|             "show_default": True,
 | |
|             "required": field.default is MISSING,
 | |
|         }
 | |
|         if valid_values:
 | |
|             cli_params["type"] = click.Choice(valid_values)
 | |
|         real_type = EnvArgumentParser._get_argparse_type(field.type)
 | |
|         if real_type is int:
 | |
|             cli_params["type"] = click.INT
 | |
|         elif real_type is float:
 | |
|             cli_params["type"] = click.FLOAT
 | |
|         elif real_type is str:
 | |
|             cli_params["type"] = click.STRING
 | |
|         elif real_type is bool:
 | |
|             cli_params["is_flag"] = True
 | |
|         name = f"--{field_name}"
 | |
|         if is_func:
 | |
|             return click.option(
 | |
|                 name,
 | |
|                 **cli_params,
 | |
|             )
 | |
|         else:
 | |
|             return click.Option([name], **cli_params)
 | |
| 
 | |
|     @staticmethod
 | |
|     def create_click_option(
 | |
|         *dataclass_types: Type,
 | |
|         _dynamic_factory: Optional[Callable[[], List[Type]]] = None,
 | |
|     ):
 | |
|         import functools
 | |
|         from collections import OrderedDict
 | |
| 
 | |
|         combined_fields = OrderedDict()
 | |
|         if _dynamic_factory:
 | |
|             _types = _dynamic_factory()
 | |
|             if _types:
 | |
|                 dataclass_types = list(_types)  # type: ignore
 | |
|         for dataclass_type in dataclass_types:
 | |
|             # type: ignore
 | |
|             for field in fields(dataclass_type):
 | |
|                 if field.name not in combined_fields:
 | |
|                     combined_fields[field.name] = field
 | |
| 
 | |
|         def decorator(func):
 | |
|             for field_name, field in reversed(combined_fields.items()):
 | |
|                 option_decorator = EnvArgumentParser._create_click_option_from_field(
 | |
|                     field_name, field
 | |
|                 )
 | |
|                 func = option_decorator(func)
 | |
| 
 | |
|             @functools.wraps(func)
 | |
|             def wrapper(*args, **kwargs):
 | |
|                 return func(*args, **kwargs)
 | |
| 
 | |
|             return wrapper
 | |
| 
 | |
|         return decorator
 | |
| 
 | |
|     @staticmethod
 | |
|     def _create_raw_click_option(
 | |
|         *dataclass_types: Type,
 | |
|         _dynamic_factory: Optional[Callable[[], List[Type]]] = None,
 | |
|     ):
 | |
|         combined_fields = _merge_dataclass_types(
 | |
|             *dataclass_types, _dynamic_factory=_dynamic_factory
 | |
|         )
 | |
|         options = []
 | |
| 
 | |
|         for field_name, field in reversed(combined_fields.items()):
 | |
|             options.append(
 | |
|                 EnvArgumentParser._create_click_option_from_field(
 | |
|                     field_name, field, is_func=False
 | |
|                 )
 | |
|             )
 | |
|         return options
 | |
| 
 | |
|     @staticmethod
 | |
|     def create_argparse_option(
 | |
|         *dataclass_types: Type,
 | |
|         _dynamic_factory: Optional[Callable[[], List[Type]]] = None,
 | |
|     ) -> argparse.ArgumentParser:
 | |
|         combined_fields = _merge_dataclass_types(
 | |
|             *dataclass_types, _dynamic_factory=_dynamic_factory
 | |
|         )
 | |
|         parser = argparse.ArgumentParser()
 | |
|         for _, field in reversed(combined_fields.items()):
 | |
|             EnvArgumentParser._build_single_argparse_option(parser, field)
 | |
|         return parser
 | |
| 
 | |
|     @staticmethod
 | |
|     def _build_single_argparse_option(
 | |
|         parser: argparse.ArgumentParser, field, default_value=None
 | |
|     ):
 | |
|         # Add a command-line argument for this field
 | |
|         help_text = field.metadata.get("help", "")
 | |
|         valid_values = field.metadata.get("valid_values", None)
 | |
|         short_name = field.metadata.get("short", None)
 | |
|         argument_kwargs = {
 | |
|             "type": EnvArgumentParser._get_argparse_type(field.type),
 | |
|             "help": help_text,
 | |
|             "choices": valid_values,
 | |
|             "required": EnvArgumentParser._is_require_type(field.type),
 | |
|         }
 | |
|         if field.default != MISSING:
 | |
|             argument_kwargs["default"] = field.default
 | |
|             argument_kwargs["required"] = False
 | |
|         if default_value:
 | |
|             argument_kwargs["default"] = default_value
 | |
|             argument_kwargs["required"] = False
 | |
|         if field.type is bool or field.type == Optional[bool]:
 | |
|             argument_kwargs["action"] = "store_true"
 | |
|             del argument_kwargs["type"]
 | |
|             del argument_kwargs["choices"]
 | |
|         names = []
 | |
|         if short_name:
 | |
|             names.append(f"-{short_name}")
 | |
|         names.append(f"--{field.name}")
 | |
|         parser.add_argument(*names, **argument_kwargs)
 | |
| 
 | |
|     @staticmethod
 | |
|     def _get_argparse_type(field_type: Type) -> Type:
 | |
|         # Return the appropriate type for argparse to use based on the field type
 | |
|         if field_type is int or field_type == Optional[int]:
 | |
|             return int
 | |
|         elif field_type is float or field_type == Optional[float]:
 | |
|             return float
 | |
|         elif field_type is bool or field_type == Optional[bool]:
 | |
|             return bool
 | |
|         elif field_type is str or field_type == Optional[str]:
 | |
|             return str
 | |
|         elif field_type is dict or field_type == Optional[dict]:
 | |
|             return dict
 | |
|         else:
 | |
|             raise ValueError(f"Unsupported parameter type {field_type}")
 | |
| 
 | |
|     @staticmethod
 | |
|     def _get_argparse_type_str(field_type: Type) -> str:
 | |
|         argparse_type = EnvArgumentParser._get_argparse_type(field_type)
 | |
|         if argparse_type is int:
 | |
|             return "int"
 | |
|         elif argparse_type is float:
 | |
|             return "float"
 | |
|         elif argparse_type is bool:
 | |
|             return "bool"
 | |
|         elif argparse_type is dict:
 | |
|             return "dict"
 | |
|         else:
 | |
|             return "str"
 | |
| 
 | |
|     @staticmethod
 | |
|     def _is_require_type(field_type: Type) -> bool:
 | |
|         return field_type not in [Optional[int], Optional[float], Optional[bool]]
 | |
| 
 | |
|     @staticmethod
 | |
|     def _kwargs_to_env_key_value(
 | |
|         kwargs: Dict, prefix: str = "__dbgpt_gunicorn__env_prefix__"
 | |
|     ) -> Dict[str, str]:
 | |
|         return {prefix + k: str(v) for k, v in kwargs.items()}
 | |
| 
 | |
|     @staticmethod
 | |
|     def _read_env_key_value(
 | |
|         prefix: str = "__dbgpt_gunicorn__env_prefix__",
 | |
|     ) -> List[str]:
 | |
|         env_args = []
 | |
|         for key, value in os.environ.items():
 | |
|             if key.startswith(prefix):
 | |
|                 arg_key = "--" + key.replace(prefix, "")
 | |
|                 if value.lower() in ["true", "1"]:
 | |
|                     # Flag args
 | |
|                     env_args.append(arg_key)
 | |
|                 elif not value.lower() in ["false", "0"]:
 | |
|                     env_args.extend([arg_key, value])
 | |
|         return env_args
 | |
| 
 | |
| 
 | |
| def _merge_dataclass_types(
 | |
|     *dataclass_types: Type, _dynamic_factory: Optional[Callable[[], List[Type]]] = None
 | |
| ) -> OrderedDict:
 | |
|     combined_fields = OrderedDict()
 | |
|     if _dynamic_factory:
 | |
|         _types = _dynamic_factory()
 | |
|         if _types:
 | |
|             dataclass_types = list(_types)  # type: ignore
 | |
|     for dataclass_type in dataclass_types:
 | |
|         for field in fields(dataclass_type):
 | |
|             if field.name not in combined_fields:
 | |
|                 combined_fields[field.name] = field
 | |
|     return combined_fields
 | |
| 
 | |
| 
 | |
| def _type_str_to_python_type(type_str: str) -> Type:
 | |
|     type_mapping: Dict[str, Type] = {
 | |
|         "int": int,
 | |
|         "float": float,
 | |
|         "bool": bool,
 | |
|         "str": str,
 | |
|     }
 | |
|     return type_mapping.get(type_str, str)
 | |
| 
 | |
| 
 | |
| def _get_parameter_descriptions(
 | |
|     dataclass_type: Type, **kwargs
 | |
| ) -> List[ParameterDescription]:
 | |
|     descriptions = []
 | |
|     for field in fields(dataclass_type):
 | |
|         ext_metadata = {
 | |
|             k: v for k, v in field.metadata.items() if k not in ["help", "valid_values"]
 | |
|         }
 | |
|         default_value = field.default if field.default != MISSING else None
 | |
|         if field.name in kwargs:
 | |
|             default_value = kwargs[field.name]
 | |
|         descriptions.append(
 | |
|             ParameterDescription(
 | |
|                 param_class=f"{dataclass_type.__module__}.{dataclass_type.__name__}",
 | |
|                 param_name=field.name,
 | |
|                 param_type=EnvArgumentParser._get_argparse_type_str(field.type),
 | |
|                 description=field.metadata.get("help", None),
 | |
|                 required=field.default is MISSING,
 | |
|                 default_value=default_value,
 | |
|                 valid_values=field.metadata.get("valid_values", None),
 | |
|                 ext_metadata=ext_metadata,
 | |
|             )
 | |
|         )
 | |
|     return descriptions
 | |
| 
 | |
| 
 | |
| def _build_parameter_class(desc: List[ParameterDescription]) -> Type:
 | |
|     from dbgpt.util.module_utils import import_from_string
 | |
| 
 | |
|     if not desc:
 | |
|         raise ValueError("Parameter descriptions cant be empty")
 | |
|     param_class_str = desc[0].param_class
 | |
|     class_name = None
 | |
|     if param_class_str:
 | |
|         param_class = import_from_string(param_class_str, ignore_import_error=True)
 | |
|         if param_class:
 | |
|             return param_class
 | |
|         module_name, _, class_name = param_class_str.rpartition(".")
 | |
| 
 | |
|     fields_dict = {}  # This will store field names and their default values or field()
 | |
|     annotations = {}  # This will store the type annotations for the fields
 | |
| 
 | |
|     for d in desc:
 | |
|         metadata = d.ext_metadata if d.ext_metadata else {}
 | |
|         metadata["help"] = d.description
 | |
|         metadata["valid_values"] = d.valid_values
 | |
| 
 | |
|         annotations[d.param_name] = _type_str_to_python_type(
 | |
|             d.param_type  # type: ignore
 | |
|         )  # Set type annotation
 | |
|         fields_dict[d.param_name] = field(default=d.default_value, metadata=metadata)
 | |
| 
 | |
|     # Create the new class. Note the setting of __annotations__ for type hints
 | |
|     new_class = type(
 | |
|         class_name,  # type: ignore
 | |
|         (object,),
 | |
|         {**fields_dict, "__annotations__": annotations},  # type: ignore
 | |
|     )
 | |
|     # Make it a dataclass
 | |
|     result_class = dataclass(new_class)  # type: ignore
 | |
| 
 | |
|     return result_class
 | |
| 
 | |
| 
 | |
| def _extract_parameter_details(
 | |
|     parser: argparse.ArgumentParser,
 | |
|     param_class: Optional[str] = None,
 | |
|     skip_names: Optional[List[str]] = None,
 | |
|     overwrite_default_values: Optional[Dict[str, Any]] = None,
 | |
| ) -> List[ParameterDescription]:
 | |
|     if overwrite_default_values is None:
 | |
|         overwrite_default_values = {}
 | |
|     descriptions = []
 | |
| 
 | |
|     for action in parser._actions:
 | |
|         if (
 | |
|             action.default == argparse.SUPPRESS
 | |
|         ):  # typically this means the argument was not provided
 | |
|             continue
 | |
| 
 | |
|         # determine parameter class (store_true/store_false are flags)
 | |
|         flag_or_option = (
 | |
|             "flag" if isinstance(action, argparse._StoreConstAction) else "option"
 | |
|         )
 | |
| 
 | |
|         # extract parameter name (use the first option string, typically the long form)
 | |
|         param_name = action.option_strings[0] if action.option_strings else action.dest
 | |
|         if param_name.startswith("--"):
 | |
|             param_name = param_name[2:]
 | |
|         if param_name.startswith("-"):
 | |
|             param_name = param_name[1:]
 | |
| 
 | |
|         param_name = param_name.replace("-", "_")
 | |
| 
 | |
|         if skip_names and param_name in skip_names:
 | |
|             continue
 | |
| 
 | |
|         # gather other details
 | |
|         default_value = action.default
 | |
|         if param_name in overwrite_default_values:
 | |
|             default_value = overwrite_default_values[param_name]
 | |
|         arg_type = (
 | |
|             action.type
 | |
|             if not callable(action.type)
 | |
|             else str(action.type.__name__)  # type: ignore
 | |
|         )
 | |
|         description = action.help
 | |
| 
 | |
|         # determine if the argument is required
 | |
|         required = action.required
 | |
| 
 | |
|         # extract valid values for choices, if provided
 | |
|         valid_values = list(action.choices) if action.choices is not None else None
 | |
| 
 | |
|         # set ext_metadata as an empty dict for now, can be updated later if needed
 | |
|         ext_metadata: Dict[str, Any] = {}
 | |
| 
 | |
|         descriptions.append(
 | |
|             ParameterDescription(
 | |
|                 param_class=param_class,
 | |
|                 param_name=param_name,
 | |
|                 param_type=arg_type,
 | |
|                 default_value=default_value,
 | |
|                 description=description,
 | |
|                 required=required,
 | |
|                 valid_values=valid_values,
 | |
|                 ext_metadata=ext_metadata,
 | |
|             )
 | |
|         )
 | |
| 
 | |
|     return descriptions
 | |
| 
 | |
| 
 | |
| def _get_dict_from_obj(obj, default_value=None) -> Optional[Dict]:
 | |
|     if not obj:
 | |
|         return None
 | |
|     if is_dataclass(type(obj)):
 | |
|         params = {}
 | |
|         for field_info in fields(obj):
 | |
|             value = _get_simple_privacy_field_value(obj, field_info)
 | |
|             params[field_info.name] = value
 | |
|         return params
 | |
|     if isinstance(obj, dict):
 | |
|         return obj
 | |
|     return default_value
 | |
| 
 | |
| 
 | |
| def _get_base_model_descriptions(model_cls: "BaseModel") -> List[ParameterDescription]:
 | |
|     from dbgpt._private import pydantic
 | |
| 
 | |
|     version = int(pydantic.VERSION.split(".")[0])  # type: ignore
 | |
|     schema = model_cls.model_json_schema() if version >= 2 else model_cls.schema()
 | |
|     required_fields = set(schema.get("required", []))
 | |
|     param_descs = []
 | |
|     for field_name, field_schema in schema.get("properties", {}).items():
 | |
|         field = model_cls.model_fields[field_name]
 | |
|         param_type = field_schema.get("type")
 | |
|         if not param_type and "anyOf" in field_schema:
 | |
|             for any_of in field_schema["anyOf"]:
 | |
|                 if any_of["type"] != "null":
 | |
|                     param_type = any_of["type"]
 | |
|                     break
 | |
|         if version >= 2:
 | |
|             default_value = (
 | |
|                 field.default
 | |
|                 if hasattr(field, "default")
 | |
|                 and str(field.default) != "PydanticUndefined"
 | |
|                 else None
 | |
|             )
 | |
|         else:
 | |
|             default_value = (
 | |
|                 field.default
 | |
|                 if not field.allow_none
 | |
|                 else (
 | |
|                     field.default_factory() if callable(field.default_factory) else None
 | |
|                 )
 | |
|             )
 | |
|         description = field_schema.get("description", "")
 | |
|         is_required = field_name in required_fields
 | |
|         valid_values = None
 | |
|         ext_metadata = None
 | |
|         if hasattr(field, "field_info"):
 | |
|             valid_values = (
 | |
|                 list(field.field_info.choices)
 | |
|                 if hasattr(field.field_info, "choices")
 | |
|                 else None
 | |
|             )
 | |
|             ext_metadata = (
 | |
|                 field.field_info.extra if hasattr(field.field_info, "extra") else None
 | |
|             )
 | |
|         param_class = f"{model_cls.__module__}.{model_cls.__name__}"
 | |
|         param_desc = ParameterDescription(
 | |
|             param_class=param_class,
 | |
|             param_name=field_name,
 | |
|             param_type=param_type,
 | |
|             default_value=default_value,
 | |
|             description=description,
 | |
|             required=is_required,
 | |
|             valid_values=valid_values,
 | |
|             ext_metadata=ext_metadata,
 | |
|         )
 | |
|         param_descs.append(param_desc)
 | |
|     return param_descs
 | |
| 
 | |
| 
 | |
| class _SimpleArgParser:
 | |
|     def __init__(self, *args):
 | |
|         self.params = {arg.replace("_", "-"): None for arg in args}
 | |
| 
 | |
|     def parse(self, args=None):
 | |
|         import sys
 | |
| 
 | |
|         if args is None:
 | |
|             args = sys.argv[1:]
 | |
|         else:
 | |
|             args = list(args)
 | |
|         prev_arg = None
 | |
|         for arg in args:
 | |
|             if arg.startswith("--"):
 | |
|                 if prev_arg:
 | |
|                     self.params[prev_arg] = None
 | |
|                 prev_arg = arg[2:]
 | |
|             else:
 | |
|                 if prev_arg:
 | |
|                     self.params[prev_arg] = arg
 | |
|                     prev_arg = None
 | |
| 
 | |
|         if prev_arg:
 | |
|             self.params[prev_arg] = None
 | |
| 
 | |
|     def _get_param(self, key):
 | |
|         return self.params.get(key.replace("_", "-")) or self.params.get(key)
 | |
| 
 | |
|     def __getattr__(self, item):
 | |
|         return self._get_param(item)
 | |
| 
 | |
|     def __getitem__(self, key):
 | |
|         return self._get_param(key)
 | |
| 
 | |
|     def get(self, key, default=None):
 | |
|         return self._get_param(key) or default
 | |
| 
 | |
|     def __str__(self):
 | |
|         return "\n".join(
 | |
|             [f'{key.replace("-", "_")}: {value}' for key, value in self.params.items()]
 | |
|         )
 | |
| 
 | |
| 
 | |
| def build_lazy_click_command(*dataclass_types: Type, _dynamic_factory=None):
 | |
|     import click
 | |
| 
 | |
|     class LazyCommand(click.Command):
 | |
|         def __init__(self, *args, **kwargs):
 | |
|             super(LazyCommand, self).__init__(*args, **kwargs)
 | |
|             self.dynamic_params_added = False
 | |
| 
 | |
|         def get_params(self, ctx):
 | |
|             if ctx and not self.dynamic_params_added:
 | |
|                 dynamic_params = EnvArgumentParser._create_raw_click_option(
 | |
|                     *dataclass_types, _dynamic_factory=_dynamic_factory
 | |
|                 )
 | |
|                 for param in reversed(dynamic_params):
 | |
|                     self.params.append(param)
 | |
|                 self.dynamic_params_added = True
 | |
|             return super(LazyCommand, self).get_params(ctx)
 | |
| 
 | |
|     return LazyCommand
 |