feat(core): Support refresh for AWEL flow

This commit is contained in:
Fangyin Cheng
2024-08-06 10:17:58 +08:00
parent d6477da379
commit 97b57fb071
6 changed files with 307 additions and 11 deletions

View File

@@ -15,7 +15,11 @@ from dbgpt._private.pydantic import (
model_to_dict,
model_validator,
)
from dbgpt.core.awel.util.parameter_util import BaseDynamicOptions, OptionValue
from dbgpt.core.awel.util.parameter_util import (
BaseDynamicOptions,
OptionValue,
RefreshOptionRequest,
)
from dbgpt.core.interface.serialization import Serializable
from .exceptions import FlowMetadataException, FlowParameterMetadataException
@@ -486,6 +490,25 @@ class Parameter(TypeMetadata, Serializable):
dict_value["ui"] = self.ui.to_dict()
return dict_value
def refresh(self, request: Optional[RefreshOptionRequest] = None) -> Dict:
"""Refresh the options of the parameter.
Args:
request (RefreshOptionRequest): The request to refresh the options.
Returns:
Dict: The response.
"""
dict_value = self.to_dict()
if not self.options:
dict_value["options"] = None
elif isinstance(self.options, BaseDynamicOptions):
values = self.options.refresh(request)
dict_value["options"] = [value.to_dict() for value in values]
else:
dict_value["options"] = [value.to_dict() for value in self.options]
return dict_value
def get_dict_options(self) -> Optional[List[Dict]]:
"""Get the options of the parameter."""
if not self.options:
@@ -657,10 +680,10 @@ class BaseMetadata(BaseResource):
],
)
tags: Optional[List[str]] = Field(
tags: Optional[Dict[str, str]] = Field(
default=None,
description="The tags of the operator",
examples=[["llm", "openai", "gpt3"]],
examples=[{"order": "higher-order"}, {"order": "first-order"}],
)
parameters: List[Parameter] = Field(
@@ -770,6 +793,20 @@ class BaseMetadata(BaseResource):
]
return dict_value
def refresh(self, request: List[RefreshOptionRequest]) -> Dict:
"""Refresh the metadata."""
name_to_request = {req.name: req for req in request}
parameter_requests = {
parameter.name: name_to_request.get(parameter.name)
for parameter in self.parameters
}
dict_value = self.to_dict()
dict_value["parameters"] = [
parameter.refresh(parameter_requests.get(parameter.name))
for parameter in self.parameters
]
return dict_value
class ResourceMetadata(BaseMetadata, TypeMetadata):
"""The metadata of the resource."""
@@ -1053,6 +1090,15 @@ class FlowRegistry:
"""Get the metadata list."""
return [item.metadata.to_dict() for item in self._registry.values()]
def refresh(
self, key: str, is_operator: bool, request: List[RefreshOptionRequest]
) -> Dict:
"""Refresh the metadata."""
if is_operator:
return _get_operator_class(key).metadata.refresh(request) # type: ignore
else:
return _get_resource_class(key).metadata.refresh(request)
_OPERATOR_REGISTRY: FlowRegistry = FlowRegistry()

View File

@@ -8,6 +8,7 @@ from dbgpt.core.interface.serialization import Serializable
from .exceptions import FlowUIComponentException
_UI_TYPE = Literal[
"select",
"cascader",
"checkbox",
"date_picker",
@@ -102,6 +103,38 @@ class UIComponent(RefreshableMixin, Serializable, BaseModel):
return model_to_dict(self)
class UISelect(UIComponent):
"""Select component."""
class UIAttribute(UIComponent.UIAttribute):
"""Select attribute."""
show_search: bool = Field(
False,
description="Whether to show search input",
)
mode: Optional[Literal["tags"]] = Field(
None,
description="The mode of the select",
)
placement: Optional[
Literal["topLeft", "topRight", "bottomLeft", "bottomRight"]
] = Field(
None,
description="The position of the picker panel, None means bottomLeft",
)
ui_type: Literal["select"] = Field("select", frozen=True)
attr: Optional[UIAttribute] = Field(
None,
description="The attributes of the component",
)
def check_parameter(self, parameter_dict: Dict[str, Any]):
"""Check parameter."""
self._check_options(parameter_dict.get("options", {}))
class UICascader(UIComponent):
"""Cascader component."""

View File

@@ -10,6 +10,27 @@ from dbgpt.core.interface.serialization import Serializable
_DEFAULT_DYNAMIC_REGISTRY = {}
class RefreshOptionDependency(BaseModel):
"""The refresh dependency."""
name: str = Field(..., description="The name of the refresh dependency")
value: Optional[Any] = Field(
None, description="The value of the refresh dependency"
)
has_value: bool = Field(
False, description="Whether the refresh dependency has value"
)
class RefreshOptionRequest(BaseModel):
"""The refresh option request."""
name: str = Field(..., description="The name of parameter to refresh")
depends: Optional[List[RefreshOptionDependency]] = Field(
None, description="The depends of the refresh config"
)
class OptionValue(Serializable, BaseModel):
"""The option value of the parameter."""
@@ -28,24 +49,31 @@ class OptionValue(Serializable, BaseModel):
class BaseDynamicOptions(Serializable, BaseModel, ABC):
"""The base dynamic options."""
@abstractmethod
def option_values(self) -> List[OptionValue]:
"""Return the option values of the parameter."""
return self.refresh(None)
@abstractmethod
def refresh(self, request: Optional[RefreshOptionRequest]) -> List[OptionValue]:
"""Refresh the dynamic options."""
class FunctionDynamicOptions(BaseDynamicOptions):
"""The function dynamic options."""
func: Callable[[], List[OptionValue]] = Field(
func: Callable[..., List[OptionValue]] = Field(
..., description="The function to generate the dynamic options"
)
func_id: str = Field(
..., description="The unique id of the function to generate the dynamic options"
)
def option_values(self) -> List[OptionValue]:
"""Return the option values of the parameter."""
return self.func()
def refresh(self, request: Optional[RefreshOptionRequest]) -> List[OptionValue]:
"""Refresh the dynamic options."""
if not request or not request.depends:
return self.func()
kwargs = {dep.name: dep.value for dep in request.depends if dep.has_value}
return self.func(**kwargs)
@model_validator(mode="before")
@classmethod

View File

@@ -12,7 +12,7 @@ from dbgpt.util import PaginationResult
from ..config import APP_NAME, SERVE_SERVICE_COMPONENT_NAME, ServeConfig
from ..service.service import Service
from .schemas import ServeRequest, ServerResponse
from .schemas import RefreshNodeRequest, ServeRequest, ServerResponse
router = APIRouter()
@@ -252,6 +252,23 @@ async def get_nodes():
return Result.succ(metadata_list)
@router.post("/nodes/refresh", dependencies=[Depends(check_api_key)])
async def refresh_nodes(refresh_request: RefreshNodeRequest):
"""Refresh the operator or resource nodes
Returns:
Result[None]: The response
"""
from dbgpt.core.awel.flow.base import _OPERATOR_REGISTRY
new_metadata = _OPERATOR_REGISTRY.refresh(
key=refresh_request.id,
is_operator=refresh_request.flow_type == "operator",
request=refresh_request.refresh,
)
return Result.succ(new_metadata)
def init_endpoints(system_app: SystemApp) -> None:
"""Initialize the endpoints"""
global global_system_app

View File

@@ -1,7 +1,8 @@
from dbgpt._private.pydantic import ConfigDict
from typing import List, Literal
# Define your Pydantic schemas here
from dbgpt._private.pydantic import BaseModel, ConfigDict, Field
from dbgpt.core.awel.flow.flow_factory import FlowPanel
from dbgpt.core.awel.util.parameter_util import RefreshOptionRequest
from ..config import SERVE_APP_NAME_HUMP
@@ -14,3 +15,38 @@ class ServerResponse(FlowPanel):
# TODO define your own fields here
model_config = ConfigDict(title=f"ServerResponse for {SERVE_APP_NAME_HUMP}")
class RefreshNodeRequest(BaseModel):
"""Flow response model"""
model_config = ConfigDict(title=f"RefreshNodeRequest")
id: str = Field(
...,
title="The id of the node",
description="The id of the node to refresh",
examples=["operator_llm_operator___$$___llm___$$___v1"],
)
flow_type: Literal["operator", "resource"] = Field(
"operator",
title="The type of the node",
description="The type of the node to refresh",
examples=["operator", "resource"],
)
type_name: str = Field(
...,
title="The type of the node",
description="The type of the node to refresh",
examples=["LLMOperator"],
)
type_cls: str = Field(
...,
title="The class of the node",
description="The class of the node to refresh",
examples=["dbgpt.core.operator.llm.LLMOperator"],
)
refresh: List[RefreshOptionRequest] = Field(
...,
title="The refresh options",
description="The refresh options",
)

View File

@@ -5,6 +5,7 @@ from typing import List, Optional
from dbgpt.core.awel import MapOperator
from dbgpt.core.awel.flow import (
FunctionDynamicOptions,
IOField,
OperatorCategory,
OptionValue,
@@ -16,6 +17,59 @@ from dbgpt.core.awel.flow import (
logger = logging.getLogger(__name__)
class ExampleFlowSelectOperator(MapOperator[str, str]):
"""An example flow operator that includes a select as parameter."""
metadata = ViewMetadata(
label="Example Flow Select",
name="example_flow_select",
category=OperatorCategory.EXAMPLE,
description="An example flow operator that includes a select as parameter.",
parameters=[
Parameter.build_from(
"Fruits Selector",
"fruits",
type=str,
optional=True,
default=None,
placeholder="Select the fruits",
description="The fruits you like.",
options=[
OptionValue(label="Apple", name="apple", value="apple"),
OptionValue(label="Banana", name="banana", value="banana"),
OptionValue(label="Orange", name="orange", value="orange"),
OptionValue(label="Pear", name="pear", value="pear"),
],
ui=ui.UISelect(attr=ui.UISelect.UIAttribute(show_search=True)),
)
],
inputs=[
IOField.build_from(
"User Name",
"user_name",
str,
description="The name of the user.",
)
],
outputs=[
IOField.build_from(
"Fruits",
"fruits",
str,
description="User's favorite fruits.",
)
],
)
def __init__(self, fruits: Optional[str] = None, **kwargs):
super().__init__(**kwargs)
self.fruits = fruits
async def map(self, user_name: str) -> str:
"""Map the user name to the fruits."""
return "Your name is %s, and you like %s." % (user_name, self.fruits)
class ExampleFlowCascaderOperator(MapOperator[str, str]):
"""An example flow operator that includes a cascader as parameter."""
@@ -581,3 +635,85 @@ class ExampleFlowTreeSelectOperator(MapOperator[str, str]):
user_name,
full_address_str,
)
def get_recent_3_times(time_interval: int = 1) -> List[OptionValue]:
"""Get the recent times."""
from datetime import datetime, timedelta
now = datetime.now()
recent_times = [now - timedelta(hours=time_interval * i) for i in range(3)]
formatted_times = [time.strftime("%Y-%m-%d %H:%M:%S") for time in recent_times]
option_values = [
OptionValue(label=formatted_time, name=f"time_{i + 1}", value=formatted_time)
for i, formatted_time in enumerate(formatted_times)
]
return option_values
class ExampleFlowRefreshOperator(MapOperator[str, str]):
"""An example flow operator that includes a refresh option."""
metadata = ViewMetadata(
label="Example Refresh Operator",
name="example_refresh_operator",
category=OperatorCategory.EXAMPLE,
description="An example flow operator that includes a refresh option.",
parameters=[
Parameter.build_from(
"Time Interval",
"time_interval",
type=int,
optional=True,
default=1,
placeholder="Set the time interval",
description="The time interval to fetch the times",
),
Parameter.build_from(
"Recent Time",
"recent_time",
type=str,
optional=True,
default=None,
placeholder="Select the recent time",
description="The recent time to choose.",
options=FunctionDynamicOptions(func=get_recent_3_times),
ui=ui.UISelect(
refresh=True,
refresh_depends=["time_interval"],
attr=ui.UISelect.UIAttribute(show_search=True),
),
),
],
inputs=[
IOField.build_from(
"User Name",
"user_name",
str,
description="The name of the user.",
)
],
outputs=[
IOField.build_from(
"Time",
"time",
str,
description="User's selected time.",
)
],
)
def __init__(
self, time_interval: int = 1, recent_time: Optional[str] = None, **kwargs
):
super().__init__(**kwargs)
self.time_interval = time_interval
self.recent_time = recent_time
async def map(self, user_name: str) -> str:
"""Map the user name to the time."""
return "Your name is %s, and you choose the time %s." % (
user_name,
self.recent_time,
)