feat(core): Fetch flow nodes API supports filterd by tags

This commit is contained in:
Fangyin Cheng
2024-08-11 22:07:21 +08:00
parent 94ef5da873
commit abf1c78748
4 changed files with 98 additions and 6 deletions

View File

@@ -1145,9 +1145,40 @@ class FlowRegistry:
"""Get the registry item by the key."""
return self._registry.get(key)
def metadata_list(self):
"""Get the metadata list."""
return [item.metadata.to_dict() for item in self._registry.values()]
def metadata_list(
self,
tags: Optional[Dict[str, str]] = None,
user_name: Optional[str] = None,
sys_code: Optional[str] = None,
) -> List[Dict]:
"""Get the metadata list.
TODO: Support the user and system code filter.
Args:
tags (Optional[Dict[str, str]], optional): The tags. Defaults to None.
user_name (Optional[str], optional): The user name. Defaults to None.
sys_code (Optional[str], optional): The system code. Defaults to None.
Returns:
List[Dict]: The metadata list.
"""
if not tags:
return [item.metadata.to_dict() for item in self._registry.values()]
else:
results = []
for item in self._registry.values():
node_tags = item.metadata.tags
is_match = True
if not node_tags or not isinstance(node_tags, dict):
continue
for k, v in tags.items():
if node_tags.get(k) != v:
is_match = False
break
if is_match:
results.append(item.metadata.to_dict())
return results
async def refresh(
self,

View File

@@ -796,6 +796,8 @@ def is_variable_string(variable_str: str) -> bool:
Returns:
bool: True if the string is a variable string, False otherwise.
"""
if not variable_str or not isinstance(variable_str, str):
return False
if not _is_variable_format(variable_str):
return False
try:

View File

@@ -1,5 +1,6 @@
import json
from functools import cache
from typing import List, Literal, Optional, Union
from typing import Dict, List, Literal, Optional, Union
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from fastapi.security.http import HTTPAuthorizationCredentials, HTTPBearer
@@ -229,16 +230,38 @@ async def query_page(
@router.get("/nodes", dependencies=[Depends(check_api_key)])
async def get_nodes():
async def get_nodes(
user_name: Optional[str] = Query(default=None, description="user name"),
sys_code: Optional[str] = Query(default=None, description="system code"),
tags: Optional[str] = Query(default=None, description="tags"),
):
"""Get the operator or resource nodes
Args:
user_name (Optional[str]): The username
sys_code (Optional[str]): The system code
tags (Optional[str]): The tags encoded in JSON format
Returns:
Result[List[Union[ViewMetadata, ResourceMetadata]]]:
The operator or resource nodes
"""
from dbgpt.core.awel.flow.base import _OPERATOR_REGISTRY
metadata_list = _OPERATOR_REGISTRY.metadata_list()
tags_dict: Optional[Dict[str, str]] = None
if tags:
try:
tags_dict = json.loads(tags)
except json.JSONDecodeError:
return Result.fail("Invalid JSON format for tags")
metadata_list = await blocking_func_to_async(
global_system_app,
_OPERATOR_REGISTRY.metadata_list,
tags_dict,
user_name,
sys_code,
)
return Result.succ(metadata_list)

View File

@@ -881,3 +881,39 @@ class ExampleFlowVariablesOperator(MapOperator[str, str]):
}
json_data = json.dumps(dict_dict, ensure_ascii=False)
return "Your name is %s, and your model info is %s." % (user_name, json_data)
class ExampleFlowTagsOperator(MapOperator[str, str]):
"""An example flow operator that includes a tags option."""
metadata = ViewMetadata(
label="Example Tags Operator",
name="example_tags_operator",
category=OperatorCategory.EXAMPLE,
description="An example flow operator that includes a tags",
parameters=[],
inputs=[
IOField.build_from(
"User Name",
"user_name",
str,
description="The name of the user.",
),
],
outputs=[
IOField.build_from(
"Tags",
"tags",
str,
description="The tags to use.",
),
],
tags={"order": "higher-order", "type": "example"},
)
def __init__(self, **kwargs):
super().__init__(**kwargs)
async def map(self, user_name: str) -> str:
"""Map the user name to the tags."""
return "Your name is %s, and your tags are %s." % (user_name, "higher-order")