feat: Support dynamic parameters

This commit is contained in:
Fangyin Cheng 2024-08-30 07:24:22 +08:00
parent 0219f5733b
commit e9155f0c31
2 changed files with 228 additions and 34 deletions

View File

@ -36,6 +36,8 @@ _ALLOWED_TYPES: Dict[str, Type] = {
}
_BASIC_TYPES = [str, int, float, bool, dict, list, set]
_DYNAMIC_PARAMETER_TYPES = [str, int, float, bool]
DefaultParameterType = Union[str, int, float, bool, None]
T = TypeVar("T", bound="ViewMixin")
TM = TypeVar("TM", bound="TypeMetadata")
@ -292,9 +294,6 @@ class ParameterCategory(str, Enum):
return cls.RESOURCER
DefaultParameterType = Union[str, int, float, bool, None]
class TypeMetadata(BaseModel):
"""The metadata of the type."""
@ -313,7 +312,23 @@ class TypeMetadata(BaseModel):
return self.__class__(**self.model_dump(exclude_defaults=True))
class Parameter(TypeMetadata, Serializable):
class BaseDynamic(BaseModel):
"""The base dynamic field."""
dynamic: bool = Field(
default=False,
description="Whether current field is dynamic",
examples=[True, False],
)
dynamic_minimum: int = Field(
default=0,
description="The minimum count of the dynamic field, only valid when dynamic is"
" True",
examples=[0, 1, 2],
)
class Parameter(BaseDynamic, TypeMetadata, Serializable):
"""Parameter for build operator."""
label: str = Field(
@ -332,11 +347,6 @@ class Parameter(TypeMetadata, Serializable):
description="The category of the parameter",
examples=["common", "resource"],
)
# resource_category: Optional[str] = Field(
# default=None,
# description="The category of the resource, just for resource type",
# examples=["llm_client", "common"],
# )
resource_type: ResourceType = Field(
default=ResourceType.INSTANCE,
description="The type of the resource, just for resource type",
@ -389,6 +399,17 @@ class Parameter(TypeMetadata, Serializable):
values[k] = handled_v
return values
@model_validator(mode="after")
def check_parameters(self) -> "Parameter":
"""Check the parameters."""
if self.dynamic and not self.is_list:
raise FlowMetadataException("Dynamic parameter must be list.")
if self.dynamic and self.dynamic_minimum < 0:
raise FlowMetadataException(
"Dynamic minimum must be greater then or equal to 0."
)
return self
@classmethod
def _covert_to_real_type(cls, type_cls: str, v: Any, is_list: bool) -> Any:
def _parse_single_value(vv: Any) -> Any:
@ -450,6 +471,8 @@ class Parameter(TypeMetadata, Serializable):
description: Optional[str] = None,
options: Optional[Union[BaseDynamicOptions, List[OptionValue]]] = None,
resource_type: ResourceType = ResourceType.INSTANCE,
dynamic: bool = False,
dynamic_minimum: int = 0,
alias: Optional[List[str]] = None,
ui: Optional[UIComponent] = None,
):
@ -461,6 +484,8 @@ class Parameter(TypeMetadata, Serializable):
raise ValueError(f"Default value is missing for optional parameter {name}.")
if not optional:
default = None
if dynamic and type not in _DYNAMIC_PARAMETER_TYPES:
raise ValueError("Dynamic parameter must be str, int, float or bool.")
return cls(
label=label,
name=name,
@ -474,6 +499,8 @@ class Parameter(TypeMetadata, Serializable):
placeholder=placeholder,
description=description or label,
options=options,
dynamic=dynamic,
dynamic_minimum=dynamic_minimum,
alias=alias,
ui=ui,
)
@ -635,6 +662,11 @@ class BaseResource(Serializable, BaseModel):
description="The label to display in UI",
examples=["LLM Operator", "OpenAI LLM Client"],
)
custom_label: Optional[str] = Field(
None,
description="The custom label to display in UI",
examples=["LLM Operator", "OpenAI LLM Client"],
)
name: str = Field(
...,
description="The name of the operator",
@ -668,7 +700,7 @@ class IOFiledType(str, Enum):
LIST = "list"
class IOField(Resource):
class IOField(BaseDynamic, Resource):
"""The input or output field of the operator."""
is_list: bool = Field(
@ -676,17 +708,6 @@ class IOField(Resource):
description="Whether current field is list",
examples=[True, False],
)
dynamic: bool = Field(
default=False,
description="Whether current field is dynamic",
examples=[True, False],
)
dynamic_minimum: int = Field(
default=0,
description="The minimum count of the dynamic field, only valid when dynamic is"
" True",
examples=[0, 1, 2],
)
mappers: Optional[List[str]] = Field(
default=None,
description="The mappers of the field, transform the field to the target type",
@ -724,18 +745,6 @@ class IOField(Resource):
mappers=mappers_cls,
)
@model_validator(mode="before")
@classmethod
def base_pre_fill(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Pre fill the metadata."""
if not isinstance(values, dict):
return values
if "dynamic" not in values:
values["dynamic"] = False
if "dynamic_minimum" not in values:
values["dynamic_minimum"] = 0
return values
class BaseMetadata(BaseResource):
"""The base metadata."""
@ -1137,6 +1146,38 @@ class ViewMetadata(BaseMetadata):
values["outputs"] = new_outputs
return values
@model_validator(mode="after")
def check_metadata(self) -> "ViewMetadata":
"""Check the metadata."""
if self.inputs:
for field in self.inputs:
if field.mappers:
raise ValueError("Input field can't have mappers.")
dyn_cnt, is_last_field_dynamic = 0, False
for field in self.inputs:
if field.dynamic:
dyn_cnt += 1
is_last_field_dynamic = True
else:
if is_last_field_dynamic:
raise ValueError("Dynamic field input must be the last field.")
is_last_field_dynamic = False
if dyn_cnt > 1:
raise ValueError("Only one dynamic input field is allowed.")
if self.outputs:
dyn_cnt, is_last_field_dynamic = 0, False
for field in self.outputs:
if field.dynamic:
dyn_cnt += 1
is_last_field_dynamic = True
else:
if is_last_field_dynamic:
raise ValueError("Dynamic field output must be the last field.")
is_last_field_dynamic = False
if dyn_cnt > 1:
raise ValueError("Only one dynamic output field is allowed.")
return self
def get_operator_key(self) -> str:
"""Get the operator key."""
if not self.flow_type:

View File

@ -4,7 +4,7 @@ import json
import logging
from typing import Any, Dict, List, Optional
from dbgpt.core.awel import MapOperator
from dbgpt.core.awel import JoinOperator, MapOperator
from dbgpt.core.awel.flow import (
FunctionDynamicOptions,
IOField,
@ -1243,3 +1243,156 @@ class ExampleFlowCodeEditorOperator(MapOperator[str, str]):
if exitcode != 0:
return exitcode, logs_all
return exitcode, logs_all
class ExampleFlowDynamicParametersOperator(MapOperator[str, str]):
"""An example flow operator that includes dynamic parameters."""
metadata = ViewMetadata(
label="Example Dynamic Parameters Operator",
name="example_dynamic_parameters_operator",
category=OperatorCategory.EXAMPLE,
description="An example flow operator that includes dynamic parameters.",
parameters=[
Parameter.build_from(
"Dynamic String",
"dynamic_1",
type=str,
is_list=True,
placeholder="Please input the dynamic parameter",
description="The dynamic parameter you want to use, you can add more, "
"at least 1 parameter.",
dynamic=True,
dynamic_minimum=1,
ui=ui.UIInput(),
),
Parameter.build_from(
"Dynamic Integer",
"dynamic_2",
type=int,
is_list=True,
placeholder="Please input the dynamic parameter",
description="The dynamic parameter you want to use, you can add more, "
"at least 0 parameter.",
dynamic=True,
dynamic_minimum=0,
),
],
inputs=[
IOField.build_from(
"User Name",
"user_name",
str,
description="The name of the user.",
),
],
outputs=[
IOField.build_from(
"Dynamic",
"dynamic",
str,
description="User's selected dynamic.",
),
],
)
def __init__(self, dynamic_1: List[str], dynamic_2: List[int], **kwargs):
super().__init__(**kwargs)
if not dynamic_1:
raise ValueError("The dynamic string is empty.")
self.dynamic_1 = dynamic_1
self.dynamic_2 = dynamic_2
async def map(self, user_name: str) -> str:
"""Map the user name to the dynamic."""
return "Your name is %s, and your dynamic is %s." % (
user_name,
f"dynamic_1: {self.dynamic_1}, dynamic_2: {self.dynamic_2}",
)
class ExampleFlowDynamicOutputsOperator(MapOperator[str, str]):
"""An example flow operator that includes dynamic outputs."""
metadata = ViewMetadata(
label="Example Dynamic Outputs Operator",
name="example_dynamic_outputs_operator",
category=OperatorCategory.EXAMPLE,
description="An example flow operator that includes dynamic outputs.",
parameters=[],
inputs=[
IOField.build_from(
"User Name",
"user_name",
str,
description="The name of the user.",
),
],
outputs=[
IOField.build_from(
"Dynamic",
"dynamic",
str,
description="User's selected dynamic.",
dynamic=True,
dynamic_minimum=1,
),
],
)
async def map(self, user_name: str) -> str:
"""Map the user name to the dynamic."""
return "Your name is %s, this operator has dynamic outputs." % user_name
class ExampleFlowDynamicInputsOperator(JoinOperator[str]):
"""An example flow operator that includes dynamic inputs."""
metadata = ViewMetadata(
label="Example Dynamic Inputs Operator",
name="example_dynamic_inputs_operator",
category=OperatorCategory.EXAMPLE,
description="An example flow operator that includes dynamic inputs.",
parameters=[],
inputs=[
IOField.build_from(
"User Name",
"user_name",
str,
description="The name of the user.",
),
IOField.build_from(
"Other Inputs",
"other_inputs",
str,
description="Other inputs.",
dynamic=True,
dynamic_minimum=0,
),
],
outputs=[
IOField.build_from(
"Dynamic",
"dynamic",
str,
description="User's selected dynamic.",
),
],
)
def __init__(self, **kwargs):
super().__init__(combine_function=self.join, **kwargs)
async def join(self, user_name: str, *other_inputs: str) -> str:
"""Map the user name to the dynamic."""
if not other_inputs:
dyn_inputs = ["You have no other inputs."]
else:
dyn_inputs = [
f"Input {i}: {input_data}" for i, input_data in enumerate(other_inputs)
]
dyn_str = "\n".join(dyn_inputs)
return "Your name is %s, and your dynamic is %s." % (
user_name,
f"other_inputs:\n{dyn_str}",
)