From 97b57fb071b28f7203524e14f49a9fda357df8d4 Mon Sep 17 00:00:00 2001 From: Fangyin Cheng Date: Tue, 6 Aug 2024 10:17:58 +0800 Subject: [PATCH] feat(core): Support refresh for AWEL flow --- dbgpt/core/awel/flow/base.py | 52 ++++++++- dbgpt/core/awel/flow/ui.py | 33 ++++++ dbgpt/core/awel/util/parameter_util.py | 38 ++++++- dbgpt/serve/flow/api/endpoints.py | 19 +++- dbgpt/serve/flow/api/schemas.py | 40 ++++++- examples/awel/awel_flow_ui_components.py | 136 +++++++++++++++++++++++ 6 files changed, 307 insertions(+), 11 deletions(-) diff --git a/dbgpt/core/awel/flow/base.py b/dbgpt/core/awel/flow/base.py index 53a4bf2ec..6dd287e56 100644 --- a/dbgpt/core/awel/flow/base.py +++ b/dbgpt/core/awel/flow/base.py @@ -15,7 +15,11 @@ from dbgpt._private.pydantic import ( model_to_dict, model_validator, ) -from dbgpt.core.awel.util.parameter_util import BaseDynamicOptions, OptionValue +from dbgpt.core.awel.util.parameter_util import ( + BaseDynamicOptions, + OptionValue, + RefreshOptionRequest, +) from dbgpt.core.interface.serialization import Serializable from .exceptions import FlowMetadataException, FlowParameterMetadataException @@ -486,6 +490,25 @@ class Parameter(TypeMetadata, Serializable): dict_value["ui"] = self.ui.to_dict() return dict_value + def refresh(self, request: Optional[RefreshOptionRequest] = None) -> Dict: + """Refresh the options of the parameter. + + Args: + request (RefreshOptionRequest): The request to refresh the options. + + Returns: + Dict: The response. + """ + dict_value = self.to_dict() + if not self.options: + dict_value["options"] = None + elif isinstance(self.options, BaseDynamicOptions): + values = self.options.refresh(request) + dict_value["options"] = [value.to_dict() for value in values] + else: + dict_value["options"] = [value.to_dict() for value in self.options] + return dict_value + def get_dict_options(self) -> Optional[List[Dict]]: """Get the options of the parameter.""" if not self.options: @@ -657,10 +680,10 @@ class BaseMetadata(BaseResource): ], ) - tags: Optional[List[str]] = Field( + tags: Optional[Dict[str, str]] = Field( default=None, description="The tags of the operator", - examples=[["llm", "openai", "gpt3"]], + examples=[{"order": "higher-order"}, {"order": "first-order"}], ) parameters: List[Parameter] = Field( @@ -770,6 +793,20 @@ class BaseMetadata(BaseResource): ] return dict_value + def refresh(self, request: List[RefreshOptionRequest]) -> Dict: + """Refresh the metadata.""" + name_to_request = {req.name: req for req in request} + parameter_requests = { + parameter.name: name_to_request.get(parameter.name) + for parameter in self.parameters + } + dict_value = self.to_dict() + dict_value["parameters"] = [ + parameter.refresh(parameter_requests.get(parameter.name)) + for parameter in self.parameters + ] + return dict_value + class ResourceMetadata(BaseMetadata, TypeMetadata): """The metadata of the resource.""" @@ -1053,6 +1090,15 @@ class FlowRegistry: """Get the metadata list.""" return [item.metadata.to_dict() for item in self._registry.values()] + def refresh( + self, key: str, is_operator: bool, request: List[RefreshOptionRequest] + ) -> Dict: + """Refresh the metadata.""" + if is_operator: + return _get_operator_class(key).metadata.refresh(request) # type: ignore + else: + return _get_resource_class(key).metadata.refresh(request) + _OPERATOR_REGISTRY: FlowRegistry = FlowRegistry() diff --git a/dbgpt/core/awel/flow/ui.py b/dbgpt/core/awel/flow/ui.py index ca4361276..91008269e 100644 --- a/dbgpt/core/awel/flow/ui.py +++ b/dbgpt/core/awel/flow/ui.py @@ -8,6 +8,7 @@ from dbgpt.core.interface.serialization import Serializable from .exceptions import FlowUIComponentException _UI_TYPE = Literal[ + "select", "cascader", "checkbox", "date_picker", @@ -102,6 +103,38 @@ class UIComponent(RefreshableMixin, Serializable, BaseModel): return model_to_dict(self) +class UISelect(UIComponent): + """Select component.""" + + class UIAttribute(UIComponent.UIAttribute): + """Select attribute.""" + + show_search: bool = Field( + False, + description="Whether to show search input", + ) + mode: Optional[Literal["tags"]] = Field( + None, + description="The mode of the select", + ) + placement: Optional[ + Literal["topLeft", "topRight", "bottomLeft", "bottomRight"] + ] = Field( + None, + description="The position of the picker panel, None means bottomLeft", + ) + + ui_type: Literal["select"] = Field("select", frozen=True) + attr: Optional[UIAttribute] = Field( + None, + description="The attributes of the component", + ) + + def check_parameter(self, parameter_dict: Dict[str, Any]): + """Check parameter.""" + self._check_options(parameter_dict.get("options", {})) + + class UICascader(UIComponent): """Cascader component.""" diff --git a/dbgpt/core/awel/util/parameter_util.py b/dbgpt/core/awel/util/parameter_util.py index 70015c9ba..2393aed89 100644 --- a/dbgpt/core/awel/util/parameter_util.py +++ b/dbgpt/core/awel/util/parameter_util.py @@ -10,6 +10,27 @@ from dbgpt.core.interface.serialization import Serializable _DEFAULT_DYNAMIC_REGISTRY = {} +class RefreshOptionDependency(BaseModel): + """The refresh dependency.""" + + name: str = Field(..., description="The name of the refresh dependency") + value: Optional[Any] = Field( + None, description="The value of the refresh dependency" + ) + has_value: bool = Field( + False, description="Whether the refresh dependency has value" + ) + + +class RefreshOptionRequest(BaseModel): + """The refresh option request.""" + + name: str = Field(..., description="The name of parameter to refresh") + depends: Optional[List[RefreshOptionDependency]] = Field( + None, description="The depends of the refresh config" + ) + + class OptionValue(Serializable, BaseModel): """The option value of the parameter.""" @@ -28,24 +49,31 @@ class OptionValue(Serializable, BaseModel): class BaseDynamicOptions(Serializable, BaseModel, ABC): """The base dynamic options.""" - @abstractmethod def option_values(self) -> List[OptionValue]: """Return the option values of the parameter.""" + return self.refresh(None) + + @abstractmethod + def refresh(self, request: Optional[RefreshOptionRequest]) -> List[OptionValue]: + """Refresh the dynamic options.""" class FunctionDynamicOptions(BaseDynamicOptions): """The function dynamic options.""" - func: Callable[[], List[OptionValue]] = Field( + func: Callable[..., List[OptionValue]] = Field( ..., description="The function to generate the dynamic options" ) func_id: str = Field( ..., description="The unique id of the function to generate the dynamic options" ) - def option_values(self) -> List[OptionValue]: - """Return the option values of the parameter.""" - return self.func() + def refresh(self, request: Optional[RefreshOptionRequest]) -> List[OptionValue]: + """Refresh the dynamic options.""" + if not request or not request.depends: + return self.func() + kwargs = {dep.name: dep.value for dep in request.depends if dep.has_value} + return self.func(**kwargs) @model_validator(mode="before") @classmethod diff --git a/dbgpt/serve/flow/api/endpoints.py b/dbgpt/serve/flow/api/endpoints.py index e2755cce1..c2c62b95f 100644 --- a/dbgpt/serve/flow/api/endpoints.py +++ b/dbgpt/serve/flow/api/endpoints.py @@ -12,7 +12,7 @@ from dbgpt.util import PaginationResult from ..config import APP_NAME, SERVE_SERVICE_COMPONENT_NAME, ServeConfig from ..service.service import Service -from .schemas import ServeRequest, ServerResponse +from .schemas import RefreshNodeRequest, ServeRequest, ServerResponse router = APIRouter() @@ -252,6 +252,23 @@ async def get_nodes(): return Result.succ(metadata_list) +@router.post("/nodes/refresh", dependencies=[Depends(check_api_key)]) +async def refresh_nodes(refresh_request: RefreshNodeRequest): + """Refresh the operator or resource nodes + + Returns: + Result[None]: The response + """ + from dbgpt.core.awel.flow.base import _OPERATOR_REGISTRY + + new_metadata = _OPERATOR_REGISTRY.refresh( + key=refresh_request.id, + is_operator=refresh_request.flow_type == "operator", + request=refresh_request.refresh, + ) + return Result.succ(new_metadata) + + def init_endpoints(system_app: SystemApp) -> None: """Initialize the endpoints""" global global_system_app diff --git a/dbgpt/serve/flow/api/schemas.py b/dbgpt/serve/flow/api/schemas.py index 6fb8c1924..2daa8f581 100644 --- a/dbgpt/serve/flow/api/schemas.py +++ b/dbgpt/serve/flow/api/schemas.py @@ -1,7 +1,8 @@ -from dbgpt._private.pydantic import ConfigDict +from typing import List, Literal -# Define your Pydantic schemas here +from dbgpt._private.pydantic import BaseModel, ConfigDict, Field from dbgpt.core.awel.flow.flow_factory import FlowPanel +from dbgpt.core.awel.util.parameter_util import RefreshOptionRequest from ..config import SERVE_APP_NAME_HUMP @@ -14,3 +15,38 @@ class ServerResponse(FlowPanel): # TODO define your own fields here model_config = ConfigDict(title=f"ServerResponse for {SERVE_APP_NAME_HUMP}") + + +class RefreshNodeRequest(BaseModel): + """Flow response model""" + + model_config = ConfigDict(title=f"RefreshNodeRequest") + id: str = Field( + ..., + title="The id of the node", + description="The id of the node to refresh", + examples=["operator_llm_operator___$$___llm___$$___v1"], + ) + flow_type: Literal["operator", "resource"] = Field( + "operator", + title="The type of the node", + description="The type of the node to refresh", + examples=["operator", "resource"], + ) + type_name: str = Field( + ..., + title="The type of the node", + description="The type of the node to refresh", + examples=["LLMOperator"], + ) + type_cls: str = Field( + ..., + title="The class of the node", + description="The class of the node to refresh", + examples=["dbgpt.core.operator.llm.LLMOperator"], + ) + refresh: List[RefreshOptionRequest] = Field( + ..., + title="The refresh options", + description="The refresh options", + ) diff --git a/examples/awel/awel_flow_ui_components.py b/examples/awel/awel_flow_ui_components.py index 2af3e2bf3..fc8d9a5c4 100644 --- a/examples/awel/awel_flow_ui_components.py +++ b/examples/awel/awel_flow_ui_components.py @@ -5,6 +5,7 @@ from typing import List, Optional from dbgpt.core.awel import MapOperator from dbgpt.core.awel.flow import ( + FunctionDynamicOptions, IOField, OperatorCategory, OptionValue, @@ -16,6 +17,59 @@ from dbgpt.core.awel.flow import ( logger = logging.getLogger(__name__) +class ExampleFlowSelectOperator(MapOperator[str, str]): + """An example flow operator that includes a select as parameter.""" + + metadata = ViewMetadata( + label="Example Flow Select", + name="example_flow_select", + category=OperatorCategory.EXAMPLE, + description="An example flow operator that includes a select as parameter.", + parameters=[ + Parameter.build_from( + "Fruits Selector", + "fruits", + type=str, + optional=True, + default=None, + placeholder="Select the fruits", + description="The fruits you like.", + options=[ + OptionValue(label="Apple", name="apple", value="apple"), + OptionValue(label="Banana", name="banana", value="banana"), + OptionValue(label="Orange", name="orange", value="orange"), + OptionValue(label="Pear", name="pear", value="pear"), + ], + ui=ui.UISelect(attr=ui.UISelect.UIAttribute(show_search=True)), + ) + ], + inputs=[ + IOField.build_from( + "User Name", + "user_name", + str, + description="The name of the user.", + ) + ], + outputs=[ + IOField.build_from( + "Fruits", + "fruits", + str, + description="User's favorite fruits.", + ) + ], + ) + + def __init__(self, fruits: Optional[str] = None, **kwargs): + super().__init__(**kwargs) + self.fruits = fruits + + async def map(self, user_name: str) -> str: + """Map the user name to the fruits.""" + return "Your name is %s, and you like %s." % (user_name, self.fruits) + + class ExampleFlowCascaderOperator(MapOperator[str, str]): """An example flow operator that includes a cascader as parameter.""" @@ -581,3 +635,85 @@ class ExampleFlowTreeSelectOperator(MapOperator[str, str]): user_name, full_address_str, ) + + +def get_recent_3_times(time_interval: int = 1) -> List[OptionValue]: + """Get the recent times.""" + from datetime import datetime, timedelta + + now = datetime.now() + recent_times = [now - timedelta(hours=time_interval * i) for i in range(3)] + formatted_times = [time.strftime("%Y-%m-%d %H:%M:%S") for time in recent_times] + option_values = [ + OptionValue(label=formatted_time, name=f"time_{i + 1}", value=formatted_time) + for i, formatted_time in enumerate(formatted_times) + ] + + return option_values + + +class ExampleFlowRefreshOperator(MapOperator[str, str]): + """An example flow operator that includes a refresh option.""" + + metadata = ViewMetadata( + label="Example Refresh Operator", + name="example_refresh_operator", + category=OperatorCategory.EXAMPLE, + description="An example flow operator that includes a refresh option.", + parameters=[ + Parameter.build_from( + "Time Interval", + "time_interval", + type=int, + optional=True, + default=1, + placeholder="Set the time interval", + description="The time interval to fetch the times", + ), + Parameter.build_from( + "Recent Time", + "recent_time", + type=str, + optional=True, + default=None, + placeholder="Select the recent time", + description="The recent time to choose.", + options=FunctionDynamicOptions(func=get_recent_3_times), + ui=ui.UISelect( + refresh=True, + refresh_depends=["time_interval"], + attr=ui.UISelect.UIAttribute(show_search=True), + ), + ), + ], + inputs=[ + IOField.build_from( + "User Name", + "user_name", + str, + description="The name of the user.", + ) + ], + outputs=[ + IOField.build_from( + "Time", + "time", + str, + description="User's selected time.", + ) + ], + ) + + def __init__( + self, time_interval: int = 1, recent_time: Optional[str] = None, **kwargs + ): + super().__init__(**kwargs) + self.time_interval = time_interval + self.recent_time = recent_time + + async def map(self, user_name: str) -> str: + """Map the user name to the time.""" + return "Your name is %s, and you choose the time %s." % ( + user_name, + self.recent_time, + )