From e9155f0c313d73b2be5dcaff0509a3d2c1004745 Mon Sep 17 00:00:00 2001 From: Fangyin Cheng Date: Fri, 30 Aug 2024 07:24:22 +0800 Subject: [PATCH] feat: Support dynamic parameters --- dbgpt/core/awel/flow/base.py | 107 +++++++++++----- examples/awel/awel_flow_ui_components.py | 155 ++++++++++++++++++++++- 2 files changed, 228 insertions(+), 34 deletions(-) diff --git a/dbgpt/core/awel/flow/base.py b/dbgpt/core/awel/flow/base.py index 4e691ed08..99aa77c8b 100644 --- a/dbgpt/core/awel/flow/base.py +++ b/dbgpt/core/awel/flow/base.py @@ -36,6 +36,8 @@ _ALLOWED_TYPES: Dict[str, Type] = { } _BASIC_TYPES = [str, int, float, bool, dict, list, set] +_DYNAMIC_PARAMETER_TYPES = [str, int, float, bool] +DefaultParameterType = Union[str, int, float, bool, None] T = TypeVar("T", bound="ViewMixin") TM = TypeVar("TM", bound="TypeMetadata") @@ -292,9 +294,6 @@ class ParameterCategory(str, Enum): return cls.RESOURCER -DefaultParameterType = Union[str, int, float, bool, None] - - class TypeMetadata(BaseModel): """The metadata of the type.""" @@ -313,7 +312,23 @@ class TypeMetadata(BaseModel): return self.__class__(**self.model_dump(exclude_defaults=True)) -class Parameter(TypeMetadata, Serializable): +class BaseDynamic(BaseModel): + """The base dynamic field.""" + + dynamic: bool = Field( + default=False, + description="Whether current field is dynamic", + examples=[True, False], + ) + dynamic_minimum: int = Field( + default=0, + description="The minimum count of the dynamic field, only valid when dynamic is" + " True", + examples=[0, 1, 2], + ) + + +class Parameter(BaseDynamic, TypeMetadata, Serializable): """Parameter for build operator.""" label: str = Field( @@ -332,11 +347,6 @@ class Parameter(TypeMetadata, Serializable): description="The category of the parameter", examples=["common", "resource"], ) - # resource_category: Optional[str] = Field( - # default=None, - # description="The category of the resource, just for resource type", - # examples=["llm_client", "common"], - # ) resource_type: ResourceType = Field( default=ResourceType.INSTANCE, description="The type of the resource, just for resource type", @@ -389,6 +399,17 @@ class Parameter(TypeMetadata, Serializable): values[k] = handled_v return values + @model_validator(mode="after") + def check_parameters(self) -> "Parameter": + """Check the parameters.""" + if self.dynamic and not self.is_list: + raise FlowMetadataException("Dynamic parameter must be list.") + if self.dynamic and self.dynamic_minimum < 0: + raise FlowMetadataException( + "Dynamic minimum must be greater then or equal to 0." + ) + return self + @classmethod def _covert_to_real_type(cls, type_cls: str, v: Any, is_list: bool) -> Any: def _parse_single_value(vv: Any) -> Any: @@ -450,6 +471,8 @@ class Parameter(TypeMetadata, Serializable): description: Optional[str] = None, options: Optional[Union[BaseDynamicOptions, List[OptionValue]]] = None, resource_type: ResourceType = ResourceType.INSTANCE, + dynamic: bool = False, + dynamic_minimum: int = 0, alias: Optional[List[str]] = None, ui: Optional[UIComponent] = None, ): @@ -461,6 +484,8 @@ class Parameter(TypeMetadata, Serializable): raise ValueError(f"Default value is missing for optional parameter {name}.") if not optional: default = None + if dynamic and type not in _DYNAMIC_PARAMETER_TYPES: + raise ValueError("Dynamic parameter must be str, int, float or bool.") return cls( label=label, name=name, @@ -474,6 +499,8 @@ class Parameter(TypeMetadata, Serializable): placeholder=placeholder, description=description or label, options=options, + dynamic=dynamic, + dynamic_minimum=dynamic_minimum, alias=alias, ui=ui, ) @@ -635,6 +662,11 @@ class BaseResource(Serializable, BaseModel): description="The label to display in UI", examples=["LLM Operator", "OpenAI LLM Client"], ) + custom_label: Optional[str] = Field( + None, + description="The custom label to display in UI", + examples=["LLM Operator", "OpenAI LLM Client"], + ) name: str = Field( ..., description="The name of the operator", @@ -668,7 +700,7 @@ class IOFiledType(str, Enum): LIST = "list" -class IOField(Resource): +class IOField(BaseDynamic, Resource): """The input or output field of the operator.""" is_list: bool = Field( @@ -676,17 +708,6 @@ class IOField(Resource): description="Whether current field is list", examples=[True, False], ) - dynamic: bool = Field( - default=False, - description="Whether current field is dynamic", - examples=[True, False], - ) - dynamic_minimum: int = Field( - default=0, - description="The minimum count of the dynamic field, only valid when dynamic is" - " True", - examples=[0, 1, 2], - ) mappers: Optional[List[str]] = Field( default=None, description="The mappers of the field, transform the field to the target type", @@ -724,18 +745,6 @@ class IOField(Resource): mappers=mappers_cls, ) - @model_validator(mode="before") - @classmethod - def base_pre_fill(cls, values: Dict[str, Any]) -> Dict[str, Any]: - """Pre fill the metadata.""" - if not isinstance(values, dict): - return values - if "dynamic" not in values: - values["dynamic"] = False - if "dynamic_minimum" not in values: - values["dynamic_minimum"] = 0 - return values - class BaseMetadata(BaseResource): """The base metadata.""" @@ -1137,6 +1146,38 @@ class ViewMetadata(BaseMetadata): values["outputs"] = new_outputs return values + @model_validator(mode="after") + def check_metadata(self) -> "ViewMetadata": + """Check the metadata.""" + if self.inputs: + for field in self.inputs: + if field.mappers: + raise ValueError("Input field can't have mappers.") + dyn_cnt, is_last_field_dynamic = 0, False + for field in self.inputs: + if field.dynamic: + dyn_cnt += 1 + is_last_field_dynamic = True + else: + if is_last_field_dynamic: + raise ValueError("Dynamic field input must be the last field.") + is_last_field_dynamic = False + if dyn_cnt > 1: + raise ValueError("Only one dynamic input field is allowed.") + if self.outputs: + dyn_cnt, is_last_field_dynamic = 0, False + for field in self.outputs: + if field.dynamic: + dyn_cnt += 1 + is_last_field_dynamic = True + else: + if is_last_field_dynamic: + raise ValueError("Dynamic field output must be the last field.") + is_last_field_dynamic = False + if dyn_cnt > 1: + raise ValueError("Only one dynamic output field is allowed.") + return self + def get_operator_key(self) -> str: """Get the operator key.""" if not self.flow_type: diff --git a/examples/awel/awel_flow_ui_components.py b/examples/awel/awel_flow_ui_components.py index 2db9607ea..ce411a79d 100644 --- a/examples/awel/awel_flow_ui_components.py +++ b/examples/awel/awel_flow_ui_components.py @@ -4,7 +4,7 @@ import json import logging from typing import Any, Dict, List, Optional -from dbgpt.core.awel import MapOperator +from dbgpt.core.awel import JoinOperator, MapOperator from dbgpt.core.awel.flow import ( FunctionDynamicOptions, IOField, @@ -1243,3 +1243,156 @@ class ExampleFlowCodeEditorOperator(MapOperator[str, str]): if exitcode != 0: return exitcode, logs_all return exitcode, logs_all + + +class ExampleFlowDynamicParametersOperator(MapOperator[str, str]): + """An example flow operator that includes dynamic parameters.""" + + metadata = ViewMetadata( + label="Example Dynamic Parameters Operator", + name="example_dynamic_parameters_operator", + category=OperatorCategory.EXAMPLE, + description="An example flow operator that includes dynamic parameters.", + parameters=[ + Parameter.build_from( + "Dynamic String", + "dynamic_1", + type=str, + is_list=True, + placeholder="Please input the dynamic parameter", + description="The dynamic parameter you want to use, you can add more, " + "at least 1 parameter.", + dynamic=True, + dynamic_minimum=1, + ui=ui.UIInput(), + ), + Parameter.build_from( + "Dynamic Integer", + "dynamic_2", + type=int, + is_list=True, + placeholder="Please input the dynamic parameter", + description="The dynamic parameter you want to use, you can add more, " + "at least 0 parameter.", + dynamic=True, + dynamic_minimum=0, + ), + ], + inputs=[ + IOField.build_from( + "User Name", + "user_name", + str, + description="The name of the user.", + ), + ], + outputs=[ + IOField.build_from( + "Dynamic", + "dynamic", + str, + description="User's selected dynamic.", + ), + ], + ) + + def __init__(self, dynamic_1: List[str], dynamic_2: List[int], **kwargs): + super().__init__(**kwargs) + if not dynamic_1: + raise ValueError("The dynamic string is empty.") + self.dynamic_1 = dynamic_1 + self.dynamic_2 = dynamic_2 + + async def map(self, user_name: str) -> str: + """Map the user name to the dynamic.""" + return "Your name is %s, and your dynamic is %s." % ( + user_name, + f"dynamic_1: {self.dynamic_1}, dynamic_2: {self.dynamic_2}", + ) + + +class ExampleFlowDynamicOutputsOperator(MapOperator[str, str]): + """An example flow operator that includes dynamic outputs.""" + + metadata = ViewMetadata( + label="Example Dynamic Outputs Operator", + name="example_dynamic_outputs_operator", + category=OperatorCategory.EXAMPLE, + description="An example flow operator that includes dynamic outputs.", + parameters=[], + inputs=[ + IOField.build_from( + "User Name", + "user_name", + str, + description="The name of the user.", + ), + ], + outputs=[ + IOField.build_from( + "Dynamic", + "dynamic", + str, + description="User's selected dynamic.", + dynamic=True, + dynamic_minimum=1, + ), + ], + ) + + async def map(self, user_name: str) -> str: + """Map the user name to the dynamic.""" + return "Your name is %s, this operator has dynamic outputs." % user_name + + +class ExampleFlowDynamicInputsOperator(JoinOperator[str]): + """An example flow operator that includes dynamic inputs.""" + + metadata = ViewMetadata( + label="Example Dynamic Inputs Operator", + name="example_dynamic_inputs_operator", + category=OperatorCategory.EXAMPLE, + description="An example flow operator that includes dynamic inputs.", + parameters=[], + inputs=[ + IOField.build_from( + "User Name", + "user_name", + str, + description="The name of the user.", + ), + IOField.build_from( + "Other Inputs", + "other_inputs", + str, + description="Other inputs.", + dynamic=True, + dynamic_minimum=0, + ), + ], + outputs=[ + IOField.build_from( + "Dynamic", + "dynamic", + str, + description="User's selected dynamic.", + ), + ], + ) + + def __init__(self, **kwargs): + super().__init__(combine_function=self.join, **kwargs) + + async def join(self, user_name: str, *other_inputs: str) -> str: + """Map the user name to the dynamic.""" + if not other_inputs: + dyn_inputs = ["You have no other inputs."] + else: + dyn_inputs = [ + f"Input {i}: {input_data}" for i, input_data in enumerate(other_inputs) + ] + dyn_str = "\n".join(dyn_inputs) + return "Your name is %s, and your dynamic is %s." % ( + user_name, + f"other_inputs:\n{dyn_str}", + )