DB-GPT/dbgpt/util/dbgpts/template.py
2024-05-30 18:51:57 +08:00

432 lines
12 KiB
Python

import os
import subprocess
from pathlib import Path
import click
from .base import DBGPTS_METADATA_FILE, TYPE_TO_PACKAGE
def create_template(
    name: str,
    name_label: str,
    description: str,
    dbgpts_type: str,
    definition_type: str,
    working_directory: str,
):
    """Create the skeleton of a new dbgpts package of the requested type.

    The package is created at
    ``<working_directory>/<TYPE_TO_PACKAGE[dbgpts_type]>/<name>``.

    Args:
        name: Package (directory) name. Hyphens are replaced with underscores
            to derive the module name stored in the metadata.
        name_label: Value stored under the ``label`` metadata key.
        description: Value stored under the ``description`` metadata key.
        dbgpts_type: One of ``"flow"``, ``"operator"``, ``"agent"`` or
            ``"resource"``.
        definition_type: Definition type of the package. It is only honoured
            for ``"flow"``; every other type is forced to ``"python"``.
        working_directory: Root directory under which the type-specific
            sub-directory lives.

    Raises:
        ValueError: If ``dbgpts_type`` is not a supported type.
        click.ClickException: If the target package directory already exists.
    """
    if dbgpts_type not in TYPE_TO_PACKAGE:
        # Report an unknown type with the intended ValueError instead of
        # letting the TYPE_TO_PACKAGE lookup below fail with a bare KeyError.
        raise ValueError(f"Invalid dbgpts type: {dbgpts_type}")

    if dbgpts_type != "flow":
        definition_type = "python"
    mod_name = name.replace("-", "_")
    base_metadata = {
        "label": name_label,
        "name": mod_name,
        "version": "0.1.0",
        "description": description,
        "authors": [],
        "definition_type": definition_type,
    }
    working_directory = os.path.join(working_directory, TYPE_TO_PACKAGE[dbgpts_type])
    package_dir = Path(working_directory) / name
    if package_dir.exists():
        raise click.ClickException(f"Package '{str(package_dir)}' already exists")

    # One creator per supported type; all of them share the same signature.
    creators = {
        "flow": _create_flow_template,
        "operator": _create_operator_template,
        "agent": _create_agent_template,
        "resource": _create_resource_template,
    }
    creator = creators.get(dbgpts_type)
    if creator is None:
        # The type has a package directory mapping but no template generator.
        raise ValueError(f"Invalid dbgpts type: {dbgpts_type}")
    creator(
        name,
        mod_name,
        dbgpts_type,
        base_metadata,
        definition_type,
        working_directory,
    )
def _create_flow_template(
    name: str,
    mod_name: str,
    dbgpts_type: str,
    base_metadata: dict,
    definition_type: str,
    working_directory: str,
):
    """Generate the package skeleton for a flow dbgpt.

    Creates the poetry project, then writes the metadata file, the manifest
    and the flow definition. A ``"json"`` definition type gets a JSON
    definition file (referenced from ``json_config``); any other definition
    type gets the Python package file instead.
    """
    uses_json = definition_type == "json"
    json_config = {"file_path": "definition/flow_definition.json"} if uses_json else {}
    metadata = {
        "flow": base_metadata,
        "python_config": {},
        "json_config": json_config,
    }

    _create_poetry_project(working_directory, name)
    _write_dbgpts_toml(working_directory, name, metadata)
    _write_manifest_file(working_directory, name, mod_name)

    # Pick the definition writer that matches the definition type.
    write_definition = (
        _write_flow_define_json_file if uses_json else _write_flow_define_python_file
    )
    write_definition(working_directory, name, mod_name)
def _create_operator_template(
    name: str,
    mod_name: str,
    dbgpts_type: str,
    base_metadata: dict,
    definition_type: str,
    working_directory: str,
):
    """Generate the package skeleton for an operator dbgpt.

    Only the ``"python"`` definition type is accepted; anything else raises
    ``click.ClickException`` before any project file is created.
    """
    if definition_type != "python":
        message = (
            f"Unsupported definition type: {definition_type} for dbgpts type: "
            f"{dbgpts_type}"
        )
        raise click.ClickException(message)

    metadata = {
        "operator": base_metadata,
        "python_config": {},
        "json_config": {},
    }
    _create_poetry_project(working_directory, name)
    _write_dbgpts_toml(working_directory, name, metadata)
    _write_operator_init_file(working_directory, name, mod_name)
    _write_manifest_file(working_directory, name, mod_name)
def _create_agent_template(
    name: str,
    mod_name: str,
    dbgpts_type: str,
    base_metadata: dict,
    definition_type: str,
    working_directory: str,
):
    """Generate the package skeleton for an agent dbgpt.

    Agents can only be defined in Python: a different ``definition_type``
    raises ``click.ClickException`` and nothing is written.
    """
    python_only = definition_type == "python"
    if not python_only:
        raise click.ClickException(
            f"Unsupported definition type: {definition_type} for dbgpts type: "
            f"{dbgpts_type}"
        )

    # Metadata layout written to the package's metadata file.
    metadata = dict(agent=base_metadata, python_config={}, json_config={})

    _create_poetry_project(working_directory, name)
    _write_dbgpts_toml(working_directory, name, metadata)
    _write_agent_init_file(working_directory, name, mod_name)
    _write_manifest_file(working_directory, name, mod_name)
def _create_resource_template(
    name: str,
    mod_name: str,
    dbgpts_type: str,
    base_metadata: dict,
    definition_type: str,
    working_directory: str,
):
    """Generate the package skeleton for a resource dbgpt.

    Raises ``click.ClickException`` when ``definition_type`` is anything other
    than ``"python"``; otherwise creates the poetry project and writes the
    metadata file, the package ``__init__.py`` and the manifest.
    """
    if definition_type != "python":
        unsupported = (
            f"Unsupported definition type: {definition_type} for dbgpts type: "
            f"{dbgpts_type}"
        )
        raise click.ClickException(unsupported)

    _create_poetry_project(working_directory, name)
    _write_dbgpts_toml(
        working_directory,
        name,
        {
            "resource": base_metadata,
            "python_config": {},
            "json_config": {},
        },
    )
    _write_resource_init_file(working_directory, name, mod_name)
    _write_manifest_file(working_directory, name, mod_name)
def _create_poetry_project(working_directory: str, name: str):
    """Create a new poetry project called ``name`` inside ``working_directory``.

    Runs ``poetry new <name> -n`` as a child process whose working directory
    is ``working_directory``. The current process's working directory is left
    untouched.

    Raises:
        subprocess.CalledProcessError: If ``poetry`` exits with a non-zero
            status (``check=True``).
        OSError: If the process cannot be started, e.g. ``poetry`` is not
            installed or ``working_directory`` does not exist.
    """
    # Pass cwd= instead of calling os.chdir(): chdir would move the whole
    # process, and a relative ``working_directory`` would then be resolved a
    # second time by the later ``Path(working_directory) / name`` writes.
    subprocess.run(
        ["poetry", "new", name, "-n"],
        check=True,
        cwd=working_directory,
    )
def _write_dbgpts_toml(working_directory: str, name: str, json_data: dict):
    """Write ``json_data`` as TOML to the package's dbgpts metadata file.

    The file is ``<working_directory>/<name>/<DBGPTS_METADATA_FILE>``; the
    package directory must already exist.
    """
    # Imported lazily so tomlkit is only required when a template is created.
    import tomlkit

    toml_path = Path(working_directory) / name / DBGPTS_METADATA_FILE
    # TOML documents must be UTF-8 encoded, and labels/descriptions may hold
    # non-ASCII text, so do not rely on the platform's locale encoding.
    with open(toml_path, "w", encoding="utf-8") as f:
        tomlkit.dump(json_data, f)
def _write_manifest_file(working_directory: str, name: str, mod_name: str):
    """Write ``MANIFEST.in`` into ``<working_directory>/<name>``.

    The manifest includes ``dbgpts.toml`` and every JSON file under
    ``<mod_name>/definition``.
    """
    manifest_lines = (
        "include dbgpts.toml",
        f"include {mod_name}/definition/*.json",
    )
    # Each entry is newline-terminated, including the last one.
    manifest = "".join(f"{line}\n" for line in manifest_lines)
    manifest_path = Path(working_directory, name, "MANIFEST.in")
    manifest_path.write_text(manifest)
def _write_flow_define_json_file(working_directory: str, name: str, mod_name: str):
    """Create an empty ``flow_definition.json`` placeholder for a JSON flow.

    The file lives in ``<working_directory>/<name>/<mod_name>/definition``;
    that directory (and any missing parents) is created when it does not exist
    yet. A hint pointing at the file is printed afterwards.
    """
    definition_dir = Path(working_directory, name, mod_name, "definition")
    if not definition_dir.exists():
        definition_dir.mkdir(parents=True)

    def_file = definition_dir / "flow_definition.json"
    # Create (or truncate) the file with no content; the user fills it in.
    def_file.write_text("")
    print("Please write your flow json to the file: ", def_file)
def _write_flow_define_python_file(working_directory: str, name: str, mod_name: str):
    """Write the ``__init__.py`` of a Python-defined flow package.

    The file is written to ``<working_directory>/<name>/<mod_name>`` and holds
    only a module docstring naming the package; the directory is not created
    here and must already exist.
    """
    package_init = Path(working_directory, name, mod_name, "__init__.py")
    # No template body yet: the module consists of its docstring line only.
    body = ""
    package_init.write_text(f'"""{name} flow package"""\n{body}')
def _write_operator_init_file(working_directory: str, name: str, mod_name: str):
    """Write the operator package's ``__init__.py`` with a sample operator.

    The file is written to ``<working_directory>/<name>/<mod_name>`` (the
    directory must already exist). It starts with a module docstring naming
    the package, followed by a ``HelloWorldOperator`` example.
    """
    init_file = Path(working_directory) / name / mod_name / "__init__.py"
    # The template is emitted verbatim as module-level code, so it starts at
    # column 0 and must carry its own, syntactically valid indentation.
    content = """
from dbgpt.core.awel import MapOperator
from dbgpt.core.awel.flow import ViewMetadata, OperatorCategory, IOField, Parameter


class HelloWorldOperator(MapOperator[str, str]):
    # The metadata for AWEL flow
    metadata = ViewMetadata(
        label="Hello World Operator",
        name="hello_world_operator",
        category=OperatorCategory.COMMON,
        description="A example operator to say hello to someone.",
        parameters=[
            Parameter.build_from(
                "Name",
                "name",
                str,
                optional=True,
                default="World",
                description="The name to say hello",
            )
        ],
        inputs=[
            IOField.build_from(
                "Input value",
                "value",
                str,
                description="The input value to say hello",
            )
        ],
        outputs=[
            IOField.build_from(
                "Output value",
                "value",
                str,
                description="The output value after saying hello",
            )
        ]
    )

    def __init__(self, name: str = "World", **kwargs):
        super().__init__(**kwargs)
        self.name = name

    async def map(self, value: str) -> str:
        return f"Hello, {self.name}! {value}"
"""
    # Python source files default to UTF-8, so write the module that way
    # regardless of the platform's locale encoding.
    with open(init_file, "w", encoding="utf-8") as f:
        f.write(f'"""{name} operator package"""\n{content}')
def _write_agent_init_file(working_directory: str, name: str, mod_name: str):
    """Write the agent package's ``__init__.py`` with a sample agent.

    The file is written to ``<working_directory>/<name>/<mod_name>`` (the
    directory must already exist). It starts with a module docstring naming
    the package, followed by a ``HelloWorldSpeakerAgent`` /
    ``HelloWorldAction`` example and a ``_test_agent`` entry point.
    """
    init_file = Path(working_directory) / name / mod_name / "__init__.py"
    # The template is emitted verbatim as module-level code, so it starts at
    # column 0 and must carry its own, syntactically valid indentation.
    # Escapes such as ``\\n`` and ``\"\"\"`` are written so that the generated
    # file contains ``\n`` escapes and triple quotes respectively.
    content = """
import asyncio
from typing import Optional, Tuple

from dbgpt.agent import (
    Action,
    ActionOutput,
    AgentMessage,
    AgentResource,
    ConversableAgent,
    ProfileConfig,
)
from dbgpt.agent.util import cmp_string_equal

_HELLO_WORLD = "Hello world"


class HelloWorldSpeakerAgent(ConversableAgent):
    profile: ProfileConfig = ProfileConfig(
        name="Hodor",
        role="HelloWorldSpeaker",
        goal=f"answer any question from user with '{_HELLO_WORLD}'",
        desc=f"You can answer any question from user with '{_HELLO_WORLD}'",
        constraints=[
            "You can only answer with '{{ fix_message }}'",
            f"You can't use any other words",
        ],
        examples=(
            f"user: What's your name?\\nassistant: {_HELLO_WORLD}\\n\\n"
            f"user: What's the weather today?\\nassistant: {_HELLO_WORLD}\\n\\n"
            f"user: Can you help me?\\nassistant: {_HELLO_WORLD}\\n\\n"
            f"user: Please tell me a joke.\\nassistant: {_HELLO_WORLD}\\n\\n"
            f"user: Please answer me without '{_HELLO_WORLD}'.\\nassistant: "
            f"{_HELLO_WORLD}"
            "\\n\\n"
        ),
    )

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._init_actions([HelloWorldAction])

    def _init_reply_message(self, received_message: AgentMessage) -> AgentMessage:
        reply_message = super()._init_reply_message(received_message)
        # Fill in the dynamic parameters in the prompt template
        reply_message.context = {"fix_message": _HELLO_WORLD}
        return reply_message

    async def correctness_check(
        self, message: AgentMessage
    ) -> Tuple[bool, Optional[str]]:
        action_report = message.action_report
        task_result = ""
        if action_report:
            task_result = action_report.get("content", "")
        if not cmp_string_equal(
            task_result,
            _HELLO_WORLD,
            ignore_case=True,
            ignore_punctuation=True,
            ignore_whitespace=True,
        ):
            return False, f"Please answer with {_HELLO_WORLD}, not '{task_result}'"
        return True, None


class HelloWorldAction(Action[None]):
    def __init__(self):
        super().__init__()

    async def run(
        self,
        ai_message: str,
        resource: Optional[AgentResource] = None,
        rely_action_out: Optional[ActionOutput] = None,
        need_vis_render: bool = True,
        **kwargs,
    ) -> ActionOutput:
        return ActionOutput(is_exe_success=True, content=ai_message)


async def _test_agent():
    \"\"\"Test the agent.

    It will not run in the production environment.
    \"\"\"
    from dbgpt.model.proxy import OpenAILLMClient
    from dbgpt.agent import AgentContext, AgentMemory, UserProxyAgent, LLMConfig

    llm_client = OpenAILLMClient(model_alias="gpt-3.5-turbo")
    context: AgentContext = AgentContext(conv_id="summarize")
    agent_memory: AgentMemory = AgentMemory()
    speaker = (
        await HelloWorldSpeakerAgent()
        .bind(context)
        .bind(LLMConfig(llm_client=llm_client))
        .bind(agent_memory)
        .build()
    )
    user_proxy = await UserProxyAgent().bind(agent_memory).bind(context).build()
    await user_proxy.initiate_chat(
        recipient=speaker,
        reviewer=user_proxy,
        message="What's your name?",
    )
    print(await agent_memory.gpts_memory.one_chat_completions("summarize"))


if __name__ == "__main__":
    asyncio.run(_test_agent())
"""
    # Python source files default to UTF-8, so write the module that way
    # regardless of the platform's locale encoding.
    with open(init_file, "w", encoding="utf-8") as f:
        f.write(f'"""{name} agent package."""\n{content}')
def _write_resource_init_file(working_directory: str, name: str, mod_name: str):
    """Write the resource package's ``__init__.py`` with a sample tool.

    The file is written to ``<working_directory>/<name>/<mod_name>`` (the
    directory must already exist) and defines a ``simple_send_requests_get``
    function decorated with ``tool``.
    """
    init_file = Path(working_directory) / name / mod_name / "__init__.py"
    # The template is emitted verbatim as module-level code, so it starts at
    # column 0 and must carry its own, syntactically valid indentation.
    content = """\"\"\"A custom resource module that provides a simple tool to send GET requests.\"\"\"

from dbgpt.agent.resource import tool


@tool
def simple_send_requests_get(url: str):
    \"\"\"Send a GET request to the specified URL and return the text content.\"\"\"
    import requests

    response = requests.get(url)
    return response.text
"""
    # Python source files default to UTF-8, so write the module that way
    # regardless of the platform's locale encoding.
    with open(init_file, "w", encoding="utf-8") as f:
        f.write(content)