mirror of
https://github.com/csunny/DB-GPT.git
synced 2025-07-22 20:01:46 +00:00
301 lines
9.3 KiB
Python
301 lines
9.3 KiB
Python
"""this module contains the flow client functions."""
|
|
|
|
from typing import Any, Callable, Dict, List
|
|
|
|
from httpx import AsyncClient
|
|
|
|
from dbgpt.core.awel.flow.flow_factory import FlowPanel
|
|
from dbgpt.core.schema.api import Result
|
|
|
|
from .client import Client, ClientException
|
|
|
|
|
|
async def create_flow(client: Client, flow: FlowPanel) -> FlowPanel:
|
|
"""Create a new flow.
|
|
|
|
Args:
|
|
client (Client): The dbgpt client.
|
|
flow (FlowPanel): The flow panel.
|
|
"""
|
|
try:
|
|
res = await client.get("/awel/flows", flow.to_dict())
|
|
result: Result = res.json()
|
|
if result["success"]:
|
|
return FlowPanel(**result["data"])
|
|
else:
|
|
raise ClientException(status=result["err_code"], reason=result)
|
|
except Exception as e:
|
|
raise ClientException(f"Failed to create flow: {e}")
|
|
|
|
|
|
async def update_flow(client: Client, flow: FlowPanel) -> FlowPanel:
|
|
"""Update a flow.
|
|
|
|
Args:
|
|
client (Client): The dbgpt client.
|
|
flow (FlowPanel): The flow panel.
|
|
Returns:
|
|
FlowPanel: The flow panel.
|
|
Raises:
|
|
ClientException: If the request failed.
|
|
"""
|
|
try:
|
|
res = await client.put("/awel/flows", flow.to_dict())
|
|
result: Result = res.json()
|
|
if result["success"]:
|
|
return FlowPanel(**result["data"])
|
|
else:
|
|
raise ClientException(status=result["err_code"], reason=result)
|
|
except Exception as e:
|
|
raise ClientException(f"Failed to update flow: {e}")
|
|
|
|
|
|
async def delete_flow(client: Client, flow_id: str) -> FlowPanel:
|
|
"""
|
|
Delete a flow.
|
|
|
|
Args:
|
|
client (Client): The dbgpt client.
|
|
flow_id (str): The flow id.
|
|
Returns:
|
|
FlowPanel: The flow panel.
|
|
Raises:
|
|
ClientException: If the request failed.
|
|
"""
|
|
try:
|
|
res = await client.delete("/awel/flows/" + flow_id)
|
|
result: Result = res.json()
|
|
if result["success"]:
|
|
return FlowPanel(**result["data"])
|
|
else:
|
|
raise ClientException(status=result["err_code"], reason=result)
|
|
except Exception as e:
|
|
raise ClientException(f"Failed to delete flow: {e}")
|
|
|
|
|
|
async def get_flow(client: Client, flow_id: str) -> FlowPanel:
|
|
"""
|
|
Get a flow.
|
|
|
|
Args:
|
|
client (Client): The dbgpt client.
|
|
flow_id (str): The flow id.
|
|
Returns:
|
|
FlowPanel: The flow panel.
|
|
Raises:
|
|
ClientException: If the request failed.
|
|
"""
|
|
try:
|
|
res = await client.get("/awel/flows/" + flow_id)
|
|
result: Result = res.json()
|
|
if result["success"]:
|
|
return FlowPanel(**result["data"])
|
|
else:
|
|
raise ClientException(status=result["err_code"], reason=result)
|
|
except Exception as e:
|
|
raise ClientException(f"Failed to get flow: {e}")
|
|
|
|
|
|
async def list_flow(
|
|
client: Client, name: str | None = None, uid: str | None = None
|
|
) -> List[FlowPanel]:
|
|
"""
|
|
List flows.
|
|
|
|
Args:
|
|
client (Client): The dbgpt client.
|
|
name (str): The name of the flow.
|
|
uid (str): The uid of the flow.
|
|
Returns:
|
|
List[FlowPanel]: The list of flow panels.
|
|
Raises:
|
|
ClientException: If the request failed.
|
|
"""
|
|
try:
|
|
res = await client.get("/awel/flows", **{"name": name, "uid": uid})
|
|
result: Result = res.json()
|
|
if result["success"]:
|
|
return [FlowPanel(**flow) for flow in result["data"]["items"]]
|
|
else:
|
|
raise ClientException(status=result["err_code"], reason=result)
|
|
except Exception as e:
|
|
raise ClientException(f"Failed to list flows: {e}")
|
|
|
|
|
|
async def run_flow_cmd(
|
|
client: Client,
|
|
name: str | None = None,
|
|
uid: str | None = None,
|
|
data: Dict[str, Any] | None = None,
|
|
non_streaming_callback: Callable[[str], None] | None = None,
|
|
streaming_callback: Callable[[str], None] | None = None,
|
|
) -> None:
|
|
"""
|
|
Run flows.
|
|
|
|
Args:
|
|
client (Client): The dbgpt client.
|
|
name (str): The name of the flow.
|
|
uid (str): The uid of the flow.
|
|
data (Dict[str, Any]): The data to run the flow.
|
|
non_streaming_callback (Callable[[str], None]): The non-streaming callback.
|
|
streaming_callback (Callable[[str], None]): The streaming callback.
|
|
Returns:
|
|
List[FlowPanel]: The list of flow panels.
|
|
Raises:
|
|
ClientException: If the request failed.
|
|
"""
|
|
try:
|
|
res = await client.get("/awel/flows", **{"name": name, "uid": uid})
|
|
result: Result = res.json()
|
|
if not result["success"]:
|
|
raise ClientException("Flow not found with the given name or uid")
|
|
flows = result["data"]["items"]
|
|
if not flows:
|
|
raise ClientException("Flow not found with the given name or uid")
|
|
if len(flows) > 1:
|
|
raise ClientException("More than one flow found")
|
|
flow = flows[0]
|
|
flow_panel = FlowPanel(**flow)
|
|
metadata = flow.get("metadata")
|
|
await _run_flow_trigger(
|
|
client,
|
|
flow_panel,
|
|
metadata,
|
|
data,
|
|
non_streaming_callback=non_streaming_callback,
|
|
streaming_callback=streaming_callback,
|
|
)
|
|
except Exception as e:
|
|
raise ClientException(f"Failed to run flows: {e}")
|
|
|
|
|
|
async def _run_flow_trigger(
|
|
client: Client,
|
|
flow: FlowPanel,
|
|
metadata: Dict[str, Any] | None = None,
|
|
data: Dict[str, Any] | None = None,
|
|
non_streaming_callback: Callable[[str], None] | None = None,
|
|
streaming_callback: Callable[[str], None] | None = None,
|
|
):
|
|
if not metadata:
|
|
raise ClientException("No AWEL flow metadata found")
|
|
if "triggers" not in metadata:
|
|
raise ClientException("No triggers found in AWEL flow metadata")
|
|
triggers = metadata["triggers"]
|
|
if len(triggers) > 1:
|
|
raise ClientException("More than one trigger found")
|
|
trigger = triggers[0]
|
|
sse_output = metadata.get("sse_output", False)
|
|
streaming_output = metadata.get("streaming_output", False)
|
|
trigger_type = trigger["trigger_type"]
|
|
if trigger_type == "http":
|
|
methods = trigger["methods"]
|
|
if not methods:
|
|
method = "GET"
|
|
else:
|
|
method = methods[0]
|
|
path = trigger["path"]
|
|
base_url = client._base_url()
|
|
req_url = f"{base_url}{path}"
|
|
if streaming_output:
|
|
await _call_stream_request(
|
|
client._http_client,
|
|
method,
|
|
req_url,
|
|
sse_output,
|
|
data,
|
|
streaming_callback,
|
|
)
|
|
elif non_streaming_callback:
|
|
await _call_non_stream_request(
|
|
client._http_client, method, req_url, data, non_streaming_callback
|
|
)
|
|
else:
|
|
raise ClientException(f"Invalid trigger type: {trigger_type}")
|
|
|
|
|
|
async def _call_non_stream_request(
|
|
http_client: AsyncClient,
|
|
method: str,
|
|
base_url: str,
|
|
data: Dict[str, Any] | None = None,
|
|
non_streaming_callback: Callable[[str], None] | None = None,
|
|
):
|
|
import httpx
|
|
|
|
kwargs: Dict[str, Any] = {"url": base_url, "method": method}
|
|
if method in ["POST", "PUT"]:
|
|
kwargs["json"] = data
|
|
else:
|
|
kwargs["params"] = data
|
|
response = await http_client.request(**kwargs)
|
|
bytes_response_content = await response.aread()
|
|
if response.status_code != 200:
|
|
str_error_message = ""
|
|
error_message = await response.aread()
|
|
if error_message:
|
|
str_error_message = error_message.decode("utf-8")
|
|
raise httpx.RequestError(
|
|
f"Request failed with status {response.status_code}, error_message: "
|
|
f"{str_error_message}",
|
|
request=response.request,
|
|
)
|
|
response_content = bytes_response_content.decode("utf-8")
|
|
if non_streaming_callback:
|
|
non_streaming_callback(response_content)
|
|
return response_content
|
|
|
|
|
|
async def _call_stream_request(
|
|
http_client: AsyncClient,
|
|
method: str,
|
|
base_url: str,
|
|
sse_output: bool,
|
|
data: Dict[str, Any] | None = None,
|
|
streaming_callback: Callable[[str], None] | None = None,
|
|
):
|
|
full_out = ""
|
|
async for out in _stream_request(http_client, method, base_url, sse_output, data):
|
|
if streaming_callback:
|
|
streaming_callback(out)
|
|
full_out += out
|
|
return full_out
|
|
|
|
|
|
async def _stream_request(
|
|
http_client: AsyncClient,
|
|
method: str,
|
|
base_url: str,
|
|
sse_output: bool,
|
|
data: Dict[str, Any] | None = None,
|
|
):
|
|
import json
|
|
|
|
from dbgpt.core.awel.util.chat_util import parse_openai_output
|
|
|
|
kwargs: Dict[str, Any] = {"url": base_url, "method": method}
|
|
if method in ["POST", "PUT"]:
|
|
kwargs["json"] = data
|
|
else:
|
|
kwargs["params"] = data
|
|
|
|
async with http_client.stream(**kwargs) as response:
|
|
if response.status_code == 200:
|
|
async for line in response.aiter_lines():
|
|
if not line:
|
|
continue
|
|
if sse_output:
|
|
out = parse_openai_output(line)
|
|
if not out.success:
|
|
raise ClientException(f"Failed to parse output: {out.text}")
|
|
yield out.text
|
|
else:
|
|
yield line
|
|
else:
|
|
try:
|
|
error = await response.aread()
|
|
yield json.loads(error)
|
|
except Exception as e:
|
|
raise e
|