feat: AWEL flow supports dynamic parameters (#1251)

This commit is contained in:
Fangyin Cheng
2024-03-04 21:52:32 +08:00
committed by GitHub
parent 3c93fe589a
commit 191f546ca4
6 changed files with 122 additions and 35 deletions

View File

@@ -3,11 +3,15 @@
This module contains the classes and functions to build AWEL DAGs from serialized data.
"""
from ..util.parameter_util import ( # noqa: F401
BaseDynamicOptions,
FunctionDynamicOptions,
OptionValue,
)
from .base import ( # noqa: F401
IOField,
OperatorCategory,
OperatorType,
OptionValue,
Parameter,
ResourceCategory,
ResourceMetadata,
@@ -29,4 +33,6 @@ __ALL__ = [
"ResourceType",
"OperatorType",
"IOField",
"BaseDynamicOptions",
"FunctionDynamicOptions",
]

View File

@@ -8,6 +8,7 @@ from enum import Enum
from typing import Any, Dict, List, Optional, Type, TypeVar, Union, cast
from dbgpt._private.pydantic import BaseModel, Field, ValidationError, root_validator
from dbgpt.core.awel.util.parameter_util import BaseDynamicOptions, OptionValue
from dbgpt.core.interface.serialization import Serializable
from .exceptions import FlowMetadataException, FlowParameterMetadataException
@@ -205,18 +206,6 @@ class ResourceType(str, Enum):
CLASS = "class"
class OptionValue(Serializable, BaseModel):
"""The option value of the parameter."""
label: str = Field(..., description="The label of the option")
name: str = Field(..., description="The name of the option")
value: Any = Field(..., description="The value of the option")
def to_dict(self) -> Dict:
"""Convert current metadata to json dict."""
return self.dict()
class ParameterType(str, Enum):
"""The type of the parameter."""
@@ -317,7 +306,7 @@ class Parameter(TypeMetadata, Serializable):
description: Optional[str] = Field(
None, description="The description of the parameter"
)
options: Optional[List[OptionValue]] = Field(
options: Optional[Union[BaseDynamicOptions, List[OptionValue]]] = Field(
None, description="The options of the parameter"
)
value: Optional[Any] = Field(
@@ -379,7 +368,7 @@ class Parameter(TypeMetadata, Serializable):
default: Optional[Union[DefaultParameterType, _MISSING_TYPE]] = _MISSING_VALUE,
placeholder: Optional[DefaultParameterType] = None,
description: Optional[str] = None,
options: Optional[List[OptionValue]] = None,
options: Optional[Union[BaseDynamicOptions, List[OptionValue]]] = None,
resource_type: ResourceType = ResourceType.INSTANCE,
):
"""Build the parameter from the type."""
@@ -435,7 +424,15 @@ class Parameter(TypeMetadata, Serializable):
def to_dict(self) -> Dict:
"""Convert current metadata to json dict."""
return self.dict()
dict_value = self.dict(exclude={"options"})
if not self.options:
dict_value["options"] = None
elif isinstance(self.options, BaseDynamicOptions):
values = self.options.option_values()
dict_value["options"] = [value.to_dict() for value in values]
else:
dict_value["options"] = [value.to_dict() for value in self.options]
return dict_value
def to_runnable_parameter(
self,
@@ -685,6 +682,14 @@ class BaseMetadata(BaseResource):
split_ids = self.id.split("_")
return "_".join(split_ids[:-1])
def to_dict(self) -> Dict:
"""Convert current metadata to json dict."""
dict_value = self.dict(exclude={"parameters"})
dict_value["parameters"] = [
parameter.to_dict() for parameter in self.parameters
]
return dict_value
class ResourceMetadata(BaseMetadata, TypeMetadata):
"""The metadata of the resource."""
@@ -939,9 +944,9 @@ class FlowRegistry:
"""Get the registry item by the key."""
return self._registry.get(key)
def metadata_list(self) -> List[Union[ViewMetadata, ResourceMetadata]]:
def metadata_list(self):
"""Get the metadata list."""
return [item.metadata for item in self._registry.values()]
return [item.metadata.to_dict() for item in self._registry.values()]
_OPERATOR_REGISTRY: FlowRegistry = FlowRegistry()

View File

@@ -0,0 +1,70 @@
"""The parameter utility."""
import inspect
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List
from dbgpt._private.pydantic import BaseModel, Field, root_validator
from dbgpt.core.interface.serialization import Serializable
_DEFAULT_DYNAMIC_REGISTRY = {}
class OptionValue(Serializable, BaseModel):
"""The option value of the parameter."""
label: str = Field(..., description="The label of the option")
name: str = Field(..., description="The name of the option")
value: Any = Field(..., description="The value of the option")
def to_dict(self) -> Dict:
"""Convert current metadata to json dict."""
return self.dict()
class BaseDynamicOptions(Serializable, BaseModel, ABC):
"""The base dynamic options."""
@abstractmethod
def option_values(self) -> List[OptionValue]:
"""Return the option values of the parameter."""
class FunctionDynamicOptions(BaseDynamicOptions):
"""The function dynamic options."""
func: Callable[[], List[OptionValue]] = Field(
..., description="The function to generate the dynamic options"
)
func_id: str = Field(
..., description="The unique id of the function to generate the dynamic options"
)
def option_values(self) -> List[OptionValue]:
"""Return the option values of the parameter."""
return self.func()
@root_validator(pre=True)
def pre_fill(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Pre fill the function id."""
func = values.get("func")
if func is None:
raise ValueError(
"The function to generate the dynamic options is required."
)
func_id = _generate_unique_id(func)
values["func_id"] = func_id
_DEFAULT_DYNAMIC_REGISTRY[func_id] = func
return values
def to_dict(self) -> Dict:
"""Convert current metadata to json dict."""
return {"func_id": self.func_id}
def _generate_unique_id(func: Callable) -> str:
if func.__name__ == "<lambda>":
func_id = f"lambda_{inspect.getfile(func)}_{inspect.getsourcelines(func)}"
else:
func_id = f"{func.__module__}.{func.__name__}"
return func_id