mirror of
https://github.com/csunny/DB-GPT.git
synced 2025-08-02 00:28:00 +00:00
432 lines
12 KiB
Python
432 lines
12 KiB
Python
import os
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
import click
|
|
|
|
from .base import DBGPTS_METADATA_FILE, TYPE_TO_PACKAGE
|
|
|
|
|
|
def create_template(
|
|
name: str,
|
|
name_label: str,
|
|
description: str,
|
|
dbgpts_type: str,
|
|
definition_type: str,
|
|
working_directory: str,
|
|
):
|
|
"""Create a new flow dbgpt"""
|
|
if dbgpts_type != "flow":
|
|
definition_type = "python"
|
|
mod_name = name.replace("-", "_")
|
|
base_metadata = {
|
|
"label": name_label,
|
|
"name": mod_name,
|
|
"version": "0.1.0",
|
|
"description": description,
|
|
"authors": [],
|
|
"definition_type": definition_type,
|
|
}
|
|
working_directory = os.path.join(working_directory, TYPE_TO_PACKAGE[dbgpts_type])
|
|
package_dir = Path(working_directory) / name
|
|
if os.path.exists(package_dir):
|
|
raise click.ClickException(f"Package '{str(package_dir)}' already exists")
|
|
|
|
if dbgpts_type == "flow":
|
|
_create_flow_template(
|
|
name,
|
|
mod_name,
|
|
dbgpts_type,
|
|
base_metadata,
|
|
definition_type,
|
|
working_directory,
|
|
)
|
|
elif dbgpts_type == "operator":
|
|
_create_operator_template(
|
|
name,
|
|
mod_name,
|
|
dbgpts_type,
|
|
base_metadata,
|
|
definition_type,
|
|
working_directory,
|
|
)
|
|
elif dbgpts_type == "agent":
|
|
_create_agent_template(
|
|
name,
|
|
mod_name,
|
|
dbgpts_type,
|
|
base_metadata,
|
|
definition_type,
|
|
working_directory,
|
|
)
|
|
elif dbgpts_type == "resource":
|
|
_create_resource_template(
|
|
name,
|
|
mod_name,
|
|
dbgpts_type,
|
|
base_metadata,
|
|
definition_type,
|
|
working_directory,
|
|
)
|
|
else:
|
|
raise ValueError(f"Invalid dbgpts type: {dbgpts_type}")
|
|
|
|
|
|
def _create_flow_template(
|
|
name: str,
|
|
mod_name: str,
|
|
dbgpts_type: str,
|
|
base_metadata: dict,
|
|
definition_type: str,
|
|
working_directory: str,
|
|
):
|
|
"""Create a new flow dbgpt"""
|
|
|
|
json_dict = {
|
|
"flow": base_metadata,
|
|
"python_config": {},
|
|
"json_config": {},
|
|
}
|
|
if definition_type == "json":
|
|
json_dict["json_config"] = {"file_path": "definition/flow_definition.json"}
|
|
|
|
_create_poetry_project(working_directory, name)
|
|
_write_dbgpts_toml(working_directory, name, json_dict)
|
|
_write_manifest_file(working_directory, name, mod_name)
|
|
|
|
if definition_type == "json":
|
|
_write_flow_define_json_file(working_directory, name, mod_name)
|
|
else:
|
|
_write_flow_define_python_file(working_directory, name, mod_name)
|
|
|
|
|
|
def _create_operator_template(
|
|
name: str,
|
|
mod_name: str,
|
|
dbgpts_type: str,
|
|
base_metadata: dict,
|
|
definition_type: str,
|
|
working_directory: str,
|
|
):
|
|
"""Create a new operator dbgpt"""
|
|
|
|
json_dict = {
|
|
"operator": base_metadata,
|
|
"python_config": {},
|
|
"json_config": {},
|
|
}
|
|
if definition_type != "python":
|
|
raise click.ClickException(
|
|
f"Unsupported definition type: {definition_type} for dbgpts type: "
|
|
f"{dbgpts_type}"
|
|
)
|
|
|
|
_create_poetry_project(working_directory, name)
|
|
_write_dbgpts_toml(working_directory, name, json_dict)
|
|
_write_operator_init_file(working_directory, name, mod_name)
|
|
_write_manifest_file(working_directory, name, mod_name)
|
|
|
|
|
|
def _create_agent_template(
|
|
name: str,
|
|
mod_name: str,
|
|
dbgpts_type: str,
|
|
base_metadata: dict,
|
|
definition_type: str,
|
|
working_directory: str,
|
|
):
|
|
json_dict = {
|
|
"agent": base_metadata,
|
|
"python_config": {},
|
|
"json_config": {},
|
|
}
|
|
if definition_type != "python":
|
|
raise click.ClickException(
|
|
f"Unsupported definition type: {definition_type} for dbgpts type: "
|
|
f"{dbgpts_type}"
|
|
)
|
|
|
|
_create_poetry_project(working_directory, name)
|
|
_write_dbgpts_toml(working_directory, name, json_dict)
|
|
_write_agent_init_file(working_directory, name, mod_name)
|
|
_write_manifest_file(working_directory, name, mod_name)
|
|
|
|
|
|
def _create_resource_template(
|
|
name: str,
|
|
mod_name: str,
|
|
dbgpts_type: str,
|
|
base_metadata: dict,
|
|
definition_type: str,
|
|
working_directory: str,
|
|
):
|
|
json_dict = {
|
|
"resource": base_metadata,
|
|
"python_config": {},
|
|
"json_config": {},
|
|
}
|
|
if definition_type != "python":
|
|
raise click.ClickException(
|
|
f"Unsupported definition type: {definition_type} for dbgpts type: "
|
|
f"{dbgpts_type}"
|
|
)
|
|
|
|
_create_poetry_project(working_directory, name)
|
|
_write_dbgpts_toml(working_directory, name, json_dict)
|
|
_write_resource_init_file(working_directory, name, mod_name)
|
|
_write_manifest_file(working_directory, name, mod_name)
|
|
|
|
|
|
def _create_poetry_project(working_directory: str, name: str):
|
|
"""Create a new poetry project"""
|
|
|
|
os.chdir(working_directory)
|
|
subprocess.run(["poetry", "new", name, "-n"], check=True)
|
|
|
|
|
|
def _write_dbgpts_toml(working_directory: str, name: str, json_data: dict):
|
|
"""Write the dbgpts.toml file"""
|
|
|
|
import tomlkit
|
|
|
|
with open(Path(working_directory) / name / DBGPTS_METADATA_FILE, "w") as f:
|
|
tomlkit.dump(json_data, f)
|
|
|
|
|
|
def _write_manifest_file(working_directory: str, name: str, mod_name: str):
|
|
"""Write the manifest file"""
|
|
|
|
manifest = f"""include dbgpts.toml
|
|
include {mod_name}/definition/*.json
|
|
"""
|
|
with open(Path(working_directory) / name / "MANIFEST.in", "w") as f:
|
|
f.write(manifest)
|
|
|
|
|
|
def _write_flow_define_json_file(working_directory: str, name: str, mod_name: str):
|
|
"""Write the flow define json file"""
|
|
|
|
def_file = (
|
|
Path(working_directory)
|
|
/ name
|
|
/ mod_name
|
|
/ "definition"
|
|
/ "flow_definition.json"
|
|
)
|
|
if not def_file.parent.exists():
|
|
def_file.parent.mkdir(parents=True)
|
|
with open(def_file, "w") as f:
|
|
f.write("")
|
|
print("Please write your flow json to the file: ", def_file)
|
|
|
|
|
|
def _write_flow_define_python_file(working_directory: str, name: str, mod_name: str):
|
|
"""Write the flow define python file"""
|
|
|
|
init_file = Path(working_directory) / name / mod_name / "__init__.py"
|
|
content = ""
|
|
|
|
with open(init_file, "w") as f:
|
|
f.write(f'"""{name} flow package"""\n{content}')
|
|
|
|
|
|
def _write_operator_init_file(working_directory: str, name: str, mod_name: str):
|
|
"""Write the operator __init__.py file"""
|
|
|
|
init_file = Path(working_directory) / name / mod_name / "__init__.py"
|
|
content = """
|
|
from dbgpt.core.awel import MapOperator
|
|
from dbgpt.core.awel.flow import ViewMetadata, OperatorCategory, IOField, Parameter
|
|
|
|
|
|
class HelloWorldOperator(MapOperator[str, str]):
|
|
# The metadata for AWEL flow
|
|
metadata = ViewMetadata(
|
|
label="Hello World Operator",
|
|
name="hello_world_operator",
|
|
category=OperatorCategory.COMMON,
|
|
description="A example operator to say hello to someone.",
|
|
parameters=[
|
|
Parameter.build_from(
|
|
"Name",
|
|
"name",
|
|
str,
|
|
optional=True,
|
|
default="World",
|
|
description="The name to say hello",
|
|
)
|
|
],
|
|
inputs=[
|
|
IOField.build_from(
|
|
"Input value",
|
|
"value",
|
|
str,
|
|
description="The input value to say hello",
|
|
)
|
|
],
|
|
outputs=[
|
|
IOField.build_from(
|
|
"Output value",
|
|
"value",
|
|
str,
|
|
description="The output value after saying hello",
|
|
)
|
|
]
|
|
)
|
|
|
|
def __init__(self, name: str = "World", **kwargs):
|
|
super().__init__(**kwargs)
|
|
self.name = name
|
|
|
|
async def map(self, value: str) -> str:
|
|
return f"Hello, {self.name}! {value}"
|
|
"""
|
|
with open(init_file, "w") as f:
|
|
f.write(f'"""{name} operator package"""\n{content}')
|
|
|
|
|
|
def _write_agent_init_file(working_directory: str, name: str, mod_name: str):
|
|
"""Write the agent __init__.py file"""
|
|
|
|
init_file = Path(working_directory) / name / mod_name / "__init__.py"
|
|
content = """
|
|
import asyncio
|
|
from typing import Optional, Tuple
|
|
|
|
from dbgpt.agent import (
|
|
Action,
|
|
ActionOutput,
|
|
AgentMessage,
|
|
AgentResource,
|
|
ConversableAgent,
|
|
ProfileConfig,
|
|
)
|
|
from dbgpt.agent.util import cmp_string_equal
|
|
|
|
_HELLO_WORLD = "Hello world"
|
|
|
|
|
|
class HelloWorldSpeakerAgent(ConversableAgent):
|
|
|
|
profile: ProfileConfig = ProfileConfig(
|
|
name="Hodor",
|
|
role="HelloWorldSpeaker",
|
|
goal=f"answer any question from user with '{_HELLO_WORLD}'",
|
|
desc=f"You can answer any question from user with '{_HELLO_WORLD}'",
|
|
constraints=[
|
|
"You can only answer with '{{ fix_message }}'",
|
|
f"You can't use any other words",
|
|
],
|
|
examples=(
|
|
f"user: What's your name?\\nassistant: {_HELLO_WORLD}\\n\\n"
|
|
f"user: What's the weather today?\\nassistant: {_HELLO_WORLD}\\n\\n"
|
|
f"user: Can you help me?\\nassistant: {_HELLO_WORLD}\\n\\n"
|
|
f"user: Please tell me a joke.\\nassistant: {_HELLO_WORLD}\\n\\n"
|
|
f"user: Please answer me without '{_HELLO_WORLD}'.\\nassistant: "
|
|
f"{_HELLO_WORLD}"
|
|
"\\n\\n"
|
|
),
|
|
)
|
|
|
|
def __init__(self, **kwargs):
|
|
super().__init__(**kwargs)
|
|
self._init_actions([HelloWorldAction])
|
|
|
|
def _init_reply_message(self, received_message: AgentMessage) -> AgentMessage:
|
|
reply_message = super()._init_reply_message(received_message)
|
|
# Fill in the dynamic parameters in the prompt template
|
|
reply_message.context = {"fix_message": _HELLO_WORLD}
|
|
return reply_message
|
|
|
|
async def correctness_check(
|
|
self, message: AgentMessage
|
|
) -> Tuple[bool, Optional[str]]:
|
|
action_report = message.action_report
|
|
task_result = ""
|
|
if action_report:
|
|
task_result = action_report.get("content", "")
|
|
if not cmp_string_equal(
|
|
task_result,
|
|
_HELLO_WORLD,
|
|
ignore_case=True,
|
|
ignore_punctuation=True,
|
|
ignore_whitespace=True,
|
|
):
|
|
return False, f"Please answer with {_HELLO_WORLD}, not '{task_result}'"
|
|
return True, None
|
|
|
|
|
|
class HelloWorldAction(Action[None]):
|
|
def __init__(self):
|
|
super().__init__()
|
|
|
|
async def run(
|
|
self,
|
|
ai_message: str,
|
|
resource: Optional[AgentResource] = None,
|
|
rely_action_out: Optional[ActionOutput] = None,
|
|
need_vis_render: bool = True,
|
|
**kwargs,
|
|
) -> ActionOutput:
|
|
return ActionOutput(is_exe_success=True, content=ai_message)
|
|
|
|
|
|
async def _test_agent():
|
|
\"\"\"Test the agent.
|
|
|
|
It will not run in the production environment.
|
|
\"\"\"
|
|
from dbgpt.model.proxy import OpenAILLMClient
|
|
from dbgpt.agent import AgentContext, AgentMemory, UserProxyAgent, LLMConfig
|
|
|
|
llm_client = OpenAILLMClient(model_alias="gpt-3.5-turbo")
|
|
context: AgentContext = AgentContext(conv_id="summarize")
|
|
|
|
agent_memory: AgentMemory = AgentMemory()
|
|
|
|
speaker = (
|
|
await HelloWorldSpeakerAgent()
|
|
.bind(context)
|
|
.bind(LLMConfig(llm_client=llm_client))
|
|
.bind(agent_memory)
|
|
.build()
|
|
)
|
|
|
|
user_proxy = await UserProxyAgent().bind(agent_memory).bind(context).build()
|
|
await user_proxy.initiate_chat(
|
|
recipient=speaker,
|
|
reviewer=user_proxy,
|
|
message="What's your name?",
|
|
)
|
|
print(await agent_memory.gpts_memory.one_chat_completions("summarize"))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(_test_agent())
|
|
|
|
"""
|
|
with open(init_file, "w") as f:
|
|
f.write(f'"""{name} agent package."""\n{content}')
|
|
|
|
|
|
def _write_resource_init_file(working_directory: str, name: str, mod_name: str):
|
|
"""Write the resource __init__.py file"""
|
|
|
|
init_file = Path(working_directory) / name / mod_name / "__init__.py"
|
|
content = """\"\"\"A custom resource module that provides a simple tool to send GET requests.\"\"\"
|
|
|
|
from dbgpt.agent.resource import tool
|
|
|
|
|
|
@tool
|
|
def simple_send_requests_get(url: str):
|
|
\"\"\"Send a GET request to the specified URL and return the text content.\"\"\"
|
|
import requests
|
|
|
|
response = requests.get(url)
|
|
return response.text
|
|
|
|
"""
|
|
with open(init_file, "w") as f:
|
|
f.write(content)
|