feat(runtime): Execute code in a sandbox environment (#2119)

Author: Fangyin Cheng (committed by GitHub)
Date: 2024-11-10 22:36:53 +08:00
Parent: 8593f10ae9
Commit: a134257e5c
22 changed files with 867 additions and 79 deletions

View File

@@ -35,7 +35,7 @@ class GptsMemory:
message_memory if message_memory is not None else DefaultGptsMessageMemory()
)
self.messages_cache: defaultdict = defaultdict(List[GptsMessage])
self.messages_cache: defaultdict = defaultdict(list)
self.channels: defaultdict = defaultdict(Queue)
self.enable_vis_map: defaultdict = defaultdict(bool)
self.start_round_map: defaultdict = defaultdict(int)
@@ -374,9 +374,9 @@ class GptsMemory:
"receiver": message.receiver,
"model": message.model_name,
"markdown": view_info,
"resource": message.resource_info
if message.resource_info
else None,
"resource": (
message.resource_info if message.resource_info else None
),
}
)
return await vis_client.get(VisAgentMessages.vis_tag()).display(
@@ -427,3 +427,20 @@ class GptsMemory:
else:
param["status"] = Status.COMPLETE.value
return await vis_client.get(VisAppLink.vis_tag()).display(content=param)
async def chat_messages(
self,
conv_id: str,
):
"""Get chat messages."""
while True:
queue = self.queue(conv_id)
if not queue:
break
item = await queue.get()
if item == "[DONE]":
queue.task_done()
break
else:
yield item
await asyncio.sleep(0.005)
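The new chat_messages coroutine turns the per-conversation queue into an async generator: it yields each rendered view pushed by the producer and stops when the "[DONE]" sentinel is received. A minimal consumer sketch (not part of this commit), assuming an initialized GptsMemory and an active conv_id:

async def print_stream(gpts_memory: GptsMemory, conv_id: str) -> None:
    # Each yielded item is the latest rendered message view for the
    # conversation; iteration ends once "[DONE]" is enqueued.
    async for view in gpts_memory.chat_messages(conv_id):
        print(view)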

View File

@@ -61,6 +61,7 @@ def initialize_components(
# Register serve apps
register_serve_apps(system_app, CFG, param.port)
_initialize_operators()
_initialize_code_server(system_app)
def _initialize_model_cache(system_app: SystemApp, port: int):
@@ -132,6 +133,7 @@ def _initialize_openapi(system_app: SystemApp):
def _initialize_operators():
from dbgpt.app.operators.code import CodeMapOperator
from dbgpt.app.operators.converter import StringToInteger
from dbgpt.app.operators.datasource import (
HODatasourceExecutorOperator,
@@ -140,3 +142,9 @@ def _initialize_operators():
from dbgpt.app.operators.llm import HOLLMOperator, HOStreamingLLMOperator
from dbgpt.app.operators.rag import HOKnowledgeOperator
from dbgpt.serve.agent.resource.datasource import DatasourceResource
def _initialize_code_server(system_app: SystemApp):
from dbgpt.util.code.server import initialize_code_server
initialize_code_server(system_app)

dbgpt/app/operators/code.py (new file, 322 lines)

@@ -0,0 +1,322 @@
"""Code operators for DB-GPT.
The code will be executed in a sandbox environment, which is isolated from the host
system. You can limit the memory and file system access of the code execution.
"""
import json
import logging
import os
from dbgpt.core import ModelRequest
from dbgpt.core.awel import MapOperator
from dbgpt.core.awel.flow import (
TAGS_ORDER_HIGH,
IOField,
OperatorCategory,
OptionValue,
Parameter,
ViewMetadata,
ui,
)
from dbgpt.util.code.server import get_code_server
from dbgpt.util.i18n_utils import _
logger = logging.getLogger(__name__)
_FN_PYTHON_MAP = """
import os
import json
import lyric_task
from lyric_py_task.imports import msgpack
def fn_map(args: dict[str, any]) -> dict[str, any]:
text = args.get("text")
return {
"text": text,
"key0": "customized key",
"key1": "hello, world",
"key2": [1, 2, 3],
"key3": {"a": 1, "b": 2},
}
"""
_FN_JAVASCRIPT_MAP = """
function fn_map(args) {
var text = args.text;
return {
text: text,
key0: "customized key",
key1: "hello, world",
key2: [1, 2, 3],
key3: {a: 1, b: 2},
};
}
"""
class CodeMapOperator(MapOperator[dict, dict]):
metadata = ViewMetadata(
label=_("Code Map Operator"),
name="default_code_map_operator",
description=_(
"Handle input dictionary with code and return output dictionary after execution."
),
category=OperatorCategory.CODE,
parameters=[
Parameter.build_from(
_("Code Editor"),
"code",
type=str,
optional=True,
default=_FN_PYTHON_MAP,
placeholder=_("Please input your code"),
description=_("The code to be executed."),
ui=ui.UICodeEditor(
language="python",
),
),
Parameter.build_from(
_("Language"),
"lang",
type=str,
optional=True,
default="python",
placeholder=_("Please select the language"),
description=_("The language of the code."),
options=[
OptionValue(label="Python", name="python", value="python"),
OptionValue(
label="JavaScript", name="javascript", value="javascript"
),
],
ui=ui.UISelect(),
),
Parameter.build_from(
_("Call Name"),
"call_name",
type=str,
optional=True,
default="fn_map",
placeholder=_("Please input the call name"),
description=_("The call name of the function."),
),
],
inputs=[
IOField.build_from(
_("Input Data"),
"input",
type=dict,
description=_("The input dictionary."),
)
],
outputs=[
IOField.build_from(
_("Output Data"),
"output",
type=dict,
description=_("The output dictionary."),
)
],
tags={"order": TAGS_ORDER_HIGH},
)
def __init__(
self,
code: str = _FN_PYTHON_MAP,
lang: str = "python",
call_name: str = "fn_map",
**kwargs,
):
super().__init__(**kwargs)
self.code = code
self.lang = lang
self.call_name = call_name
async def map(self, input_value: dict) -> dict:
exec_input_data_bytes = json.dumps(input_value).encode("utf-8")
code_server = await get_code_server()
result = await code_server.exec1(
self.code, exec_input_data_bytes, call_name=self.call_name, lang=self.lang
)
logger.info(f"Code execution result: {result}")
return result.output
_REQ_BUILD_PY_FUNC = """
import os
def fn_map(args: dict[str, any]) -> dict[str, any]:
llm_model = args.get("model", os.getenv("DBGPT_RUNTIME_LLM_MODEL"))
messages: str | list[str] = args.get("messages", [])
if isinstance(messages, str):
human_message = messages
else:
human_message = messages[0]
temperature = float(args.get("temperature") or 0.5)
max_new_tokens = int(args.get("max_new_tokens") or 2048)
conv_uid = args.get("conv_uid", "")
print("Conv uid is: ", conv_uid)
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "human", "content": human_message}
]
return {
"model": llm_model,
"messages": messages,
"temperature": temperature,
"max_new_tokens": max_new_tokens
}
"""
_REQ_BUILD_JS_FUNC = """
function fn_map(args) {
var llm_model = args.model || "chatgpt_proxyllm";
var messages = args.messages || [];
var human_message = messages[0];
var temperature = parseFloat(args.temperature) || 0.5;
var max_new_tokens = parseInt(args.max_new_tokens) || 2048;
var conv_uid = args.conv_uid || "";
console.log("Conv uid is: ", conv_uid);
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "human", "content": human_message}
];
return {
model: llm_model,
messages: messages,
temperature: temperature,
max_new_tokens: max_new_tokens
};
}
"""
class CodeDictToModelRequestOperator(MapOperator[dict, ModelRequest]):
metadata = ViewMetadata(
label=_("Code Dict to Model Request Operator"),
name="default_code_dict_to_model_request_operator",
description=_(
"Handle input dictionary with code and return output ModelRequest after execution."
),
category=OperatorCategory.CODE,
parameters=[
Parameter.build_from(
_("Code Editor"),
"code",
type=str,
optional=True,
default=_REQ_BUILD_PY_FUNC,
placeholder=_("Please input your code"),
description=_("The code to be executed."),
ui=ui.UICodeEditor(
language="python",
),
),
Parameter.build_from(
_("Language"),
"lang",
type=str,
optional=True,
default="python",
placeholder=_("Please select the language"),
description=_("The language of the code."),
options=[
OptionValue(label="Python", name="python", value="python"),
OptionValue(
label="JavaScript", name="javascript", value="javascript"
),
],
ui=ui.UISelect(),
),
Parameter.build_from(
_("Call Name"),
"call_name",
type=str,
optional=True,
default="fn_map",
placeholder=_("Please input the call name"),
description=_("The call name of the function."),
),
],
inputs=[
IOField.build_from(
_("Input Data"),
"input",
type=dict,
description=_("The input dictionary."),
)
],
outputs=[
IOField.build_from(
_("Output Data"),
"output",
type=ModelRequest,
description=_("The output ModelRequest."),
)
],
tags={"order": TAGS_ORDER_HIGH},
)
def __init__(
self,
code: str = _REQ_BUILD_PY_FUNC,
lang: str = "python",
call_name: str = "fn_map",
**kwargs,
):
super().__init__(**kwargs)
self.code = code
self.lang = lang
self.call_name = call_name
async def map(self, input_value: dict) -> ModelRequest:
from lyric import PyTaskFsConfig, PyTaskMemoryConfig, PyTaskResourceConfig
exec_input_data_bytes = json.dumps(input_value).encode("utf-8")
code_server = await get_code_server()
model_name = os.getenv("LLM_MODEL")
fs = PyTaskFsConfig(
preopens=[
# Mount the /tmp directory to the /tmp directory in the sandbox
# Directory permissions are set to 3 (read and write)
# File permissions are set to 3 (read and write)
("/tmp", "/tmp", 3, 3),
# Mount the current directory to the /home directory in the sandbox
# Directory and file permissions are set to 1 (read)
(".", "/home", 1, 1),
]
)
memory = PyTaskMemoryConfig(memory_limit=50 * 1024 * 1024) # 50MB in bytes
resources = PyTaskResourceConfig(
fs=fs,
memory=memory,
env_vars=[
("DBGPT_RUNTIME_LLM_MODEL", model_name),
],
)
result = await code_server.exec1(
self.code,
exec_input_data_bytes,
call_name=self.call_name,
lang=self.lang,
resources=resources,
)
logger.info(f"Code execution result: {result}")
if result.exit_code != 0:
raise RuntimeError(f"Code execution failed: {result.logs}")
if not result.output:
raise RuntimeError(f"Code execution failed: {result.logs}")
if not isinstance(result.output, dict):
raise RuntimeError(
f"Code execution failed, invalid output: {result.output}"
)
logger.info(f"Code execution result: {result}")
return ModelRequest(**result.output)
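For orientation, a hedged sketch (not part of this commit) of wiring CodeMapOperator into a small AWEL DAG outside the visual flow editor; the DAG/call pattern follows the usual AWEL examples and may need adjusting to your DB-GPT version:

import asyncio

from dbgpt.app.operators.code import CodeMapOperator
from dbgpt.core.awel import DAG

with DAG("sandbox_code_map_example") as dag:
    # Uses the default _FN_PYTHON_MAP above; the dict passed to call()
    # becomes the `args` of fn_map inside the sandbox.
    code_task = CodeMapOperator(lang="python", call_name="fn_map")

async def main():
    result = await code_task.call(call_data={"text": "hello sandbox"})
    print(result)  # e.g. {"text": "hello sandbox", "key0": "customized key", ...}

asyncio.run(main())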

View File

@@ -1,5 +1,6 @@
"""Type Converter Operators."""
from dbgpt.core import ModelOutput
from dbgpt.core.awel import MapOperator
from dbgpt.core.awel.flow import (
TAGS_ORDER_HIGH,
@@ -184,3 +185,22 @@ class BooleanToString(MapOperator[bool, str]):
def __init__(self, **kwargs):
"""Create a new BooleanToString operator."""
super().__init__(map_function=lambda x: str(x), **kwargs)
class ModelOutputToDict(MapOperator[ModelOutput, dict]):
"""Converts a model output to a dictionary."""
metadata = ViewMetadata(
label=_("Model Output to Dict"),
name="default_converter_model_output_to_dict",
description=_("Converts a model output to a dictionary."),
category=OperatorCategory.TYPE_CONVERTER,
parameters=[],
inputs=[IOField.build_from(_("Model Output"), "model_output", ModelOutput)],
outputs=[IOField.build_from(_("Dictionary"), "dict", dict)],
tags={"order": TAGS_ORDER_HIGH},
)
def __init__(self, **kwargs):
"""Create a new ModelOutputToDict operator."""
super().__init__(map_function=lambda x: x.to_dict(), **kwargs)
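The converter is a thin wrapper over ModelOutput.to_dict(), so in a flow it typically sits between an LLM node and a dict-consuming node (for example, llm_task >> ModelOutputToDict() >> CodeMapOperator(...)). A tiny sketch of the underlying conversion, assuming the standard ModelOutput fields:

from dbgpt.core import ModelOutput

out = ModelOutput(text="hello", error_code=0)
print(out.to_dict())  # e.g. {"text": "hello", "error_code": 0, ...}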

View File

@@ -276,6 +276,8 @@ class BaseChat(ABC):
)
payload.span_id = span.span_id
try:
msg = "<span style='color:red'>ERROR!</span> No response from model"
view_msg = msg
async for output in self.call_streaming_operator(payload):
# Plugin research in result generation
msg = self.prompt_template.output_parser.parse_model_stream_resp_ex(

View File

@@ -151,6 +151,7 @@ _OPERATOR_CATEGORY_DETAIL = {
"database": _CategoryDetail("Database", "Interact with the database"),
"type_converter": _CategoryDetail("Type Converter", "Convert the type"),
"example": _CategoryDetail("Example", "Example operator"),
"code": _CategoryDetail("Code", "Code operator"),
}
@@ -169,6 +170,7 @@ class OperatorCategory(str, Enum):
DATABASE = "database"
TYPE_CONVERTER = "type_converter"
EXAMPLE = "example"
CODE = "code"
def label(self) -> str:
"""Get the label of the category."""
@@ -1361,6 +1363,8 @@ def _register_operator(view_cls: Optional[Type[T]]):
"""Register the operator."""
if not view_cls or not view_cls.metadata:
return
if "metadata" not in view_cls.__dict__:
return # Skip the base class
metadata = view_cls.metadata
metadata.type_name = view_cls.__qualname__
metadata.type_cls = _get_type_name(view_cls)

View File

dbgpt/util/code/server.py (new file, 152 lines)

@@ -0,0 +1,152 @@
import asyncio
import logging
from enum import Enum, auto
from typing import Dict, Optional, cast
from lyric import CodeResult, DefaultLyricDriver, PyTaskResourceConfig
from dbgpt.component import BaseComponent, SystemApp
logger = logging.getLogger(__name__)
class ServerState(Enum):
INIT = auto()
STARTING = auto()
READY = auto()
STOPPING = auto()
STOPPED = auto()
class CodeServer(BaseComponent):
def __init__(self, system_app: Optional[SystemApp] = None):
self.sys_app = system_app
super().__init__(system_app)
self._lcd = DefaultLyricDriver()
self._init_lock = asyncio.Lock()
self._state = ServerState.INIT
self._ready_event = asyncio.Event()
def init_app(self, system_app: SystemApp):
self.sys_app = system_app
def before_start(self):
if self._state == ServerState.INIT:
self._state = ServerState.STARTING
self._lcd.start()
def before_stop(self):
self._state = ServerState.STOPPING
self._lcd.stop()
self._state = ServerState.STOPPED
self._ready_event.clear()
async def async_after_start(self):
await self._ensure_initialized()
async def _ensure_initialized(self):
"""Ensure the server is initialized and workers are loaded."""
if self._state == ServerState.READY:
return
async with self._init_lock:
# Double check after acquiring lock
if self._state == ServerState.READY:
return
if self._state == ServerState.INIT:
logger.info("Starting code server...")
self._state = ServerState.STARTING
self._lcd.start()
if self._state == ServerState.STARTING:
await self._lcd.lyric.load_default_workers()
self._state = ServerState.READY
self._ready_event.set()
logger.info("Code server is ready")
async def wait_ready(self, timeout: Optional[float] = None) -> bool:
"""Wait until server is ready.
Args:
timeout: Maximum time to wait in seconds. None means wait forever.
Returns:
bool: True if server is ready, False if timeout occurred
"""
try:
await asyncio.wait_for(self._ready_event.wait(), timeout)
return True
except asyncio.TimeoutError:
return False
async def exec(
self, code: str, lang: str, resources: Optional[PyTaskResourceConfig] = None
) -> CodeResult:
await self._ensure_initialized()
return await self._lcd.exec(code, lang, resources=resources)
async def exec1(
self,
code: str,
input_bytes: bytes,
call_name: str,
lang: str = "python",
resources: Optional[PyTaskResourceConfig] = None,
) -> CodeResult:
await self._ensure_initialized()
return await self._lcd.exec1(
code, input_bytes, call_name, lang=lang, resources=resources
)
async def parse_awel(self, code: str) -> Optional[Dict]:
"""Parse the AWEL code.
Return the flow metadata.
"""
raise NotImplementedError
async def run_awel_operator(self, code: str):
"""Run an AWEL operator in remote mode."""
raise NotImplementedError
_SYSTEM_APP: Optional[SystemApp] = None
def initialize_code_server(system_app: SystemApp):
"""Initialize the code server."""
global _SYSTEM_APP
_SYSTEM_APP = system_app
code_server = CodeServer(system_app)
system_app.register_instance(code_server)
async def get_code_server(
system_app: Optional[SystemApp] = None,
wait_ready: bool = True,
timeout: Optional[float] = None,
) -> CodeServer:
"""Return the code server.
Args:
system_app (Optional[SystemApp]): The system app. Defaults to None.
wait_ready (bool): Whether to wait for server to be ready. Defaults to True.
timeout (Optional[float]): Maximum time to wait in seconds. None means wait forever.
Returns:
CodeServer: The code server.
"""
if not _SYSTEM_APP:
if not system_app:
system_app = SystemApp()
initialize_code_server(system_app)
app = system_app or _SYSTEM_APP
server = CodeServer.get_instance(cast(SystemApp, app))
if wait_ready:
await server._ensure_initialized()
if not await server.wait_ready(timeout):
raise TimeoutError("Timeout waiting for code server to be ready")
return server
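A hedged usage sketch for the new code server; the exec/exec1 signatures are those defined above, and running it requires the lyric dependencies from the dbgpt[code] extra:

import asyncio
import json

from dbgpt.util.code.server import get_code_server

async def demo():
    # get_code_server creates and registers a default SystemApp if none exists.
    server = await get_code_server()

    # Run a whole script in the sandbox and inspect exit code and logs.
    result = await server.exec("print('hi from the sandbox')", "python")
    print(result.exit_code, result.logs)

    # Call a named function with JSON-encoded input bytes, as CodeMapOperator does.
    code = "def fn_map(args):\n    return {'echo': args['text']}"
    result = await server.exec1(
        code, json.dumps({"text": "hello"}).encode("utf-8"), call_name="fn_map"
    )
    print(result.output)

asyncio.run(demo())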

(11 binary image files added; content not shown.)

View File

@@ -0,0 +1,305 @@
"""Run your code assistant agent in a sandbox environment.
This example demonstrates how to create a code assistant agent that can execute code
in a sandbox environment. The agent can execute Python and JavaScript code blocks
and provide the output to the user. The agent can also check the correctness of the
code execution results and provide feedback to the user.
You can limit the memory and file system resources available to the code execution
environment. The code execution environment is isolated from the host system,
preventing access to the internet and other external resources.
"""
import asyncio
import logging
from typing import Optional, Tuple
from dbgpt.agent import (
Action,
ActionOutput,
AgentContext,
AgentMemory,
AgentMemoryFragment,
AgentMessage,
AgentResource,
ConversableAgent,
HybridMemory,
LLMConfig,
ProfileConfig,
UserProxyAgent,
)
from dbgpt.agent.expand.code_assistant_agent import CHECK_RESULT_SYSTEM_MESSAGE
from dbgpt.core import ModelMessageRoleType
from dbgpt.util.code_utils import UNKNOWN, extract_code, infer_lang
from dbgpt.util.string_utils import str_to_bool
from dbgpt.util.utils import colored
from dbgpt.vis.tags.vis_code import Vis, VisCode
logger = logging.getLogger(__name__)
class SandboxCodeAction(Action[None]):
"""Code Action Module."""
def __init__(self, **kwargs):
"""Code action init."""
super().__init__(**kwargs)
self._render_protocol = VisCode()
self._code_execution_config = {}
@property
def render_protocol(self) -> Optional[Vis]:
"""Return the render protocol."""
return self._render_protocol
async def run(
self,
ai_message: str,
resource: Optional[AgentResource] = None,
rely_action_out: Optional[ActionOutput] = None,
need_vis_render: bool = True,
**kwargs,
) -> ActionOutput:
"""Perform the action."""
try:
code_blocks = extract_code(ai_message)
if len(code_blocks) < 1:
logger.info(
f"No executable code found in answer,{ai_message}",
)
return ActionOutput(
is_exe_success=False, content="No executable code found in answer."
)
elif len(code_blocks) > 1 and code_blocks[0][0] == UNKNOWN:
# found code blocks, execute code and push "last_n_messages" back
logger.info(
f"Missing available code block type, unable to execute code,"
f"{ai_message}",
)
return ActionOutput(
is_exe_success=False,
content="Missing available code block type, "
"unable to execute code.",
)
exitcode, logs = await self.execute_code_blocks(code_blocks)
exit_success = exitcode == 0
content = (
logs
if exit_success
else f"exitcode: {exitcode} (execution failed)\n {logs}"
)
param = {
"exit_success": exit_success,
"language": code_blocks[0][0],
"code": code_blocks,
"log": logs,
}
if not self.render_protocol:
raise NotImplementedError("The render_protocol should be implemented.")
view = await self.render_protocol.display(content=param)
return ActionOutput(
is_exe_success=exit_success,
content=content,
view=view,
thoughts=ai_message,
observations=content,
)
except Exception as e:
logger.exception("Code Action Run Failed")
return ActionOutput(
is_exe_success=False, content="Code execution exception: " + str(e)
)
async def execute_code_blocks(self, code_blocks):
"""Execute the code blocks and return the result."""
from lyric import (
PyTaskFilePerms,
PyTaskFsConfig,
PyTaskMemoryConfig,
PyTaskResourceConfig,
)
from dbgpt.util.code.server import get_code_server
fs = PyTaskFsConfig(
preopens=[
# Mount the /tmp directory to the /tmp directory in the sandbox
# Directory permissions are set to 3 (read and write)
# File permissions are set to 3 (read and write)
("/tmp", "/tmp", 3, 3),
# Mount the current directory to the /home directory in the sandbox
# Directory and file permissions are set to 1 (read)
(".", "/home", 1, 1),
]
)
memory = PyTaskMemoryConfig(memory_limit=50 * 1024 * 1024) # 50MB in bytes
resources = PyTaskResourceConfig(
fs=fs,
memory=memory,
env_vars=[
("TEST_ENV", "hello, im an env var"),
("TEST_ENV2", "hello, im another env var"),
],
)
code_server = await get_code_server()
logs_all = ""
exitcode = -1
for i, code_block in enumerate(code_blocks):
lang, code = code_block
if not lang:
lang = infer_lang(code)
print(
colored(
f"\n>>>>>>>> EXECUTING CODE BLOCK {i} "
f"(inferred language is {lang})...",
"red",
),
flush=True,
)
if lang in ["python", "Python"]:
result = await code_server.exec(code, "python", resources=resources)
exitcode = result.exit_code
logs = result.logs
elif lang in ["javascript", "JavaScript"]:
result = await code_server.exec(code, "javascript", resources=resources)
exitcode = result.exit_code
logs = result.logs
else:
# In case the language is not supported, we return an error message.
exitcode, logs = (
1,
f"unknown language {lang}",
)
logs_all += "\n" + logs
if exitcode != 0:
return exitcode, logs_all
return exitcode, logs_all
class SandboxCodeAssistantAgent(ConversableAgent):
"""Code Assistant Agent."""
profile: ProfileConfig = ProfileConfig(
name="Turing",
role="CodeEngineer",
goal=(
"Solve tasks using your coding and language skills.\n"
"In the following cases, suggest python code (in a python coding block) or "
"javascript for the user to execute.\n"
" 1. When you need to collect info, use the code to output the info you "
"need, for example, get the current date/time, check the "
"operating system. After sufficient info is printed and the task is ready "
"to be solved based on your language skill, you can solve the task by "
"yourself.\n"
" 2. When you need to perform some task with code, use the code to "
"perform the task and output the result. Finish the task smartly."
),
constraints=[
"The user cannot provide any other feedback or perform any other "
"action beyond executing the code you suggest. The user can't modify "
"your code. So do not suggest incomplete code which requires users to "
"modify. Don't use a code block if it's not intended to be executed "
"by the user.Don't ask users to copy and paste results. Instead, "
"the 'Print' function must be used for output when relevant.",
"When using code, you must indicate the script type in the code block. "
"Please don't include multiple code blocks in one response.",
"If you receive user input that indicates an error in the code "
"execution, fix the error and output the complete code again. It is "
"recommended to use the complete code rather than partial code or "
"code changes. If the error cannot be fixed, or the task is not "
"resolved even after the code executes successfully, analyze the "
"problem, revisit your assumptions, gather additional information you "
"need from historical conversation records, and consider trying a "
"different approach.",
"Unless necessary, give priority to solving problems with python " "code.",
"The output content of the 'print' function will be passed to other "
"LLM agents as dependent data. Please control the length of the "
"output content of the 'print' function. The 'print' function only "
"outputs part of the key data information that is relied on, "
"and is as concise as possible.",
"Your code will by run in a sandbox environment(supporting python and "
"javascript), which means you can't access the internet or use any "
"libraries that are not in standard library.",
"It is prohibited to fabricate non-existent data to achieve goals.",
],
desc=(
"Can independently write and execute python/shell code to solve various"
" problems"
),
)
def __init__(self, **kwargs):
"""Create a new CodeAssistantAgent instance."""
super().__init__(**kwargs)
self._init_actions([SandboxCodeAction])
async def correctness_check(
self, message: AgentMessage
) -> Tuple[bool, Optional[str]]:
"""Verify whether the current execution results meet the target expectations."""
task_goal = message.current_goal
action_report = message.action_report
if not action_report:
return False, "No execution solution results were checked"
check_result, model = await self.thinking(
messages=[
AgentMessage(
role=ModelMessageRoleType.HUMAN,
content="Please understand the following task objectives and "
f"results and give your judgment:\n"
f"Task goal: {task_goal}\n"
f"Execution Result: {action_report.content}",
)
],
prompt=CHECK_RESULT_SYSTEM_MESSAGE,
)
success = str_to_bool(check_result)
fail_reason = None
if not success:
fail_reason = (
f"Your answer was successfully executed by the agent, but "
f"the goal cannot be completed yet. Please regenerate based on the "
f"failure reason:{check_result}"
)
return success, fail_reason
async def main():
from dbgpt.model.proxy import OpenAILLMClient
llm_client = OpenAILLMClient(model_alias="gpt-4o-mini")
context: AgentContext = AgentContext(conv_id="test123")
agent_memory = AgentMemory(HybridMemory[AgentMemoryFragment].from_chroma())
agent_memory.gpts_memory.init("test123")
coder = (
await SandboxCodeAssistantAgent()
.bind(context)
.bind(LLMConfig(llm_client=llm_client))
.bind(agent_memory)
.build()
)
user_proxy = await UserProxyAgent().bind(context).bind(agent_memory).build()
# First case: The user asks the agent to calculate 321 * 123
await user_proxy.initiate_chat(
recipient=coder,
reviewer=user_proxy,
message="计算下321 * 123等于多少",
)
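# Second case: The user asks the agent to calculate 100 * 99 with JavaScript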
await user_proxy.initiate_chat(
recipient=coder,
reviewer=user_proxy,
message="Calculate 100 * 99, must use javascript code block",
)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1134,7 +1134,23 @@ class ExampleFlowCodeEditorOperator(MapOperator[str, str]):
ui=ui.UICodeEditor(
language="python",
),
)
),
Parameter.build_from(
"Language",
"lang",
type=str,
optional=True,
default="python",
placeholder="Please select the language",
description="The language of the code.",
options=[
OptionValue(label="Python", name="python", value="python"),
OptionValue(
label="JavaScript", name="javascript", value="javascript"
),
],
ui=ui.UISelect(),
),
],
inputs=[
IOField.build_from(
@@ -1154,95 +1170,34 @@ class ExampleFlowCodeEditorOperator(MapOperator[str, str]):
],
)
def __init__(self, code: str, **kwargs):
def __init__(self, code: str, lang: str = "python", **kwargs):
super().__init__(**kwargs)
self.code = code
self.lang = lang
async def map(self, user_name: str) -> str:
"""Map the user name to the code."""
from dbgpt.util.code_utils import UNKNOWN, extract_code
code = self.code
exitcode = -1
exit_code = -1
try:
code_blocks = extract_code(self.code)
if len(code_blocks) < 1:
logger.info(
f"No executable code found in: \n{code}",
)
raise ValueError(f"No executable code found in: \n{code}")
elif len(code_blocks) > 1 and code_blocks[0][0] == UNKNOWN:
# found code blocks, execute code and push "last_n_messages" back
logger.info(
f"Missing available code block type, unable to execute code,"
f"\n{code}",
)
raise ValueError(
"Missing available code block type, unable to execute code, "
f"\n{code}"
)
exitcode, logs = await self.blocking_func_to_async(
self.execute_code_blocks, code_blocks
)
# exitcode, logs = self.execute_code_blocks(code_blocks)
exit_code, logs = await self.execute_code_blocks(code, self.lang)
except Exception as e:
logger.error(f"Failed to execute code: {e}")
logs = f"Failed to execute code: {e}"
return (
f"Your name is {user_name}, and your code is \n\n```python\n{self.code}"
f"Your name is {user_name}, and your code is \n\n```python\n{code}"
f"\n\n```\n\nThe execution result is \n\n```\n{logs}\n\n```\n\n"
f"Exit code: {exitcode}."
f"Exit code: {exit_code}."
)
def execute_code_blocks(self, code_blocks):
async def execute_code_blocks(self, code_blocks: str, lang: str):
"""Execute the code blocks and return the result."""
from dbgpt.util.code_utils import execute_code, infer_lang
from dbgpt.util.utils import colored
from dbgpt.util.code.server import CodeResult, get_code_server
logs_all = ""
exitcode = -1
_code_execution_config = {"use_docker": False}
for i, code_block in enumerate(code_blocks):
lang, code = code_block
if not lang:
lang = infer_lang(code)
print(
colored(
f"\n>>>>>>>> EXECUTING CODE BLOCK {i} "
f"(inferred language is {lang})...",
"red",
),
flush=True,
)
if lang in ["bash", "shell", "sh"]:
exitcode, logs, image = execute_code(
code, lang=lang, **_code_execution_config
)
elif lang in ["python", "Python"]:
if code.startswith("# filename: "):
filename = code[11 : code.find("\n")].strip()
else:
filename = None
exitcode, logs, image = execute_code(
code,
lang="python",
filename=filename,
**_code_execution_config,
)
else:
# In case the language is not supported, we return an error message.
exitcode, logs, image = (
1,
f"unknown language {lang}",
None,
)
# raise NotImplementedError
if image is not None:
_code_execution_config["use_docker"] = image
logs_all += "\n" + logs
if exitcode != 0:
return exitcode, logs_all
return exitcode, logs_all
code_server = await get_code_server(self.system_app)
result: CodeResult = await code_server.exec(code_blocks, lang)
return result.exit_code, result.logs
class ExampleFlowDynamicParametersOperator(MapOperator[str, str]):

View File

@@ -517,13 +517,15 @@ def code_execution_requires():
"""
pip install "dbgpt[code]"
Code execution dependencies. For building a docker image.
Code execution dependencies.
"""
setup_spec.extras["code"] = setup_spec.extras["core"] + [
"pyzmq",
"msgpack",
# for AWEL operator serialization
"cloudpickle",
"lyric-py>=0.1.4",
"lyric-py-worker>=0.1.4",
"lyric-js-worker>=0.1.4",
]
@@ -723,6 +725,7 @@ def default_requires():
setup_spec.extras["default"] += setup_spec.extras["datasource"]
setup_spec.extras["default"] += setup_spec.extras["torch"]
setup_spec.extras["default"] += setup_spec.extras["cache"]
setup_spec.extras["default"] += setup_spec.extras["code"]
if INCLUDE_QUANTIZATION:
# Add quantization extra to default, default is True
setup_spec.extras["default"] += setup_spec.extras["quantization"]