DB-GPT/examples/awel/awel_flow_ui_components.py

1354 lines
45 KiB
Python

"""Some UI components for the AWEL flow."""
import json
import logging
from typing import Any, Dict, List, Optional
from dbgpt.core.awel import JoinOperator, MapOperator
from dbgpt.core.awel.flow import (
FunctionDynamicOptions,
IOField,
OperatorCategory,
OptionValue,
Parameter,
VariablesDynamicOptions,
ViewMetadata,
ui,
)
from dbgpt.core.interface.file import FileStorageClient
from dbgpt.core.interface.variables import (
BUILTIN_VARIABLES_CORE_EMBEDDINGS,
BUILTIN_VARIABLES_CORE_FLOW_NODES,
BUILTIN_VARIABLES_CORE_FLOWS,
BUILTIN_VARIABLES_CORE_LLMS,
BUILTIN_VARIABLES_CORE_SECRETS,
BUILTIN_VARIABLES_CORE_VARIABLES,
)
logger = logging.getLogger(__name__)
class ExampleFlowSelectOperator(MapOperator[str, str]):
"""An example flow operator that includes a select as parameter."""
metadata = ViewMetadata(
label="Example Flow Select",
name="example_flow_select",
category=OperatorCategory.EXAMPLE,
description="An example flow operator that includes a select as parameter.",
parameters=[
Parameter.build_from(
"Fruits Selector",
"fruits",
type=str,
optional=True,
default=None,
placeholder="Select the fruits",
description="The fruits you like.",
options=[
OptionValue(label="Apple", name="apple", value="apple"),
OptionValue(label="Banana", name="banana", value="banana"),
OptionValue(label="Orange", name="orange", value="orange"),
OptionValue(label="Pear", name="pear", value="pear"),
],
ui=ui.UISelect(attr=ui.UISelect.UIAttribute(show_search=True)),
)
],
inputs=[
IOField.build_from(
"User Name",
"user_name",
str,
description="The name of the user.",
)
],
outputs=[
IOField.build_from(
"Fruits",
"fruits",
str,
description="User's favorite fruits.",
)
],
)
def __init__(self, fruits: Optional[str] = None, **kwargs):
super().__init__(**kwargs)
self.fruits = fruits
async def map(self, user_name: str) -> str:
"""Map the user name to the fruits."""
return "Your name is %s, and you like %s." % (user_name, self.fruits)
class ExampleFlowCascaderOperator(MapOperator[str, str]):
"""An example flow operator that includes a cascader as parameter."""
metadata = ViewMetadata(
label="Example Flow Cascader",
name="example_flow_cascader",
category=OperatorCategory.EXAMPLE,
description="An example flow operator that includes a cascader as parameter.",
parameters=[
Parameter.build_from(
"Address Selector",
"address",
type=str,
is_list=True,
optional=True,
default=None,
placeholder="Select the address",
description="The address of the location.",
options=[
OptionValue(
label="Zhejiang",
name="zhejiang",
value="zhejiang",
children=[
OptionValue(
label="Hangzhou",
name="hangzhou",
value="hangzhou",
children=[
OptionValue(
label="Xihu",
name="xihu",
value="xihu",
),
OptionValue(
label="Feilaifeng",
name="feilaifeng",
value="feilaifeng",
),
],
),
],
),
OptionValue(
label="Jiangsu",
name="jiangsu",
value="jiangsu",
children=[
OptionValue(
label="Nanjing",
name="nanjing",
value="nanjing",
children=[
OptionValue(
label="Zhonghua Gate",
name="zhonghuamen",
value="zhonghuamen",
),
OptionValue(
label="Zhongshanling",
name="zhongshanling",
value="zhongshanling",
),
],
),
],
),
],
ui=ui.UICascader(attr=ui.UICascader.UIAttribute(show_search=True)),
)
],
inputs=[
IOField.build_from(
"User Name",
"user_name",
str,
description="The name of the user.",
)
],
outputs=[
IOField.build_from(
"Address",
"address",
str,
description="User's address.",
)
],
)
def __int__(self, address: Optional[List[str]] = None, **kwargs):
super().__init__(**kwargs)
self.address = address or []
async def map(self, user_name: str) -> str:
"""Map the user name to the address."""
full_address_str = " ".join(self.address)
return "Your name is %s, and your address is %s." % (
user_name,
full_address_str,
)
class ExampleFlowCheckboxOperator(MapOperator[str, str]):
"""An example flow operator that includes a checkbox as parameter."""
metadata = ViewMetadata(
label="Example Flow Checkbox",
name="example_flow_checkbox",
category=OperatorCategory.EXAMPLE,
description="An example flow operator that includes a checkbox as parameter.",
parameters=[
Parameter.build_from(
"Fruits Selector",
"fruits",
type=str,
is_list=True,
optional=True,
default=None,
placeholder="Select the fruits",
description="The fruits you like.",
options=[
OptionValue(label="Apple", name="apple", value="apple"),
OptionValue(label="Banana", name="banana", value="banana"),
OptionValue(label="Orange", name="orange", value="orange"),
OptionValue(label="Pear", name="pear", value="pear"),
],
ui=ui.UICheckbox(),
)
],
inputs=[
IOField.build_from(
"User Name",
"user_name",
str,
description="The name of the user.",
)
],
outputs=[
IOField.build_from(
"Fruits",
"fruits",
str,
description="User's favorite fruits.",
)
],
)
def __init__(self, fruits: Optional[List[str]] = None, **kwargs):
super().__init__(**kwargs)
self.fruits = fruits or []
async def map(self, user_name: str) -> str:
"""Map the user name to the fruits."""
return "Your name is %s, and you like %s." % (user_name, ", ".join(self.fruits))
class ExampleFlowRadioOperator(MapOperator[str, str]):
"""An example flow operator that includes a radio as parameter."""
metadata = ViewMetadata(
label="Example Flow Radio",
name="example_flow_radio",
category=OperatorCategory.EXAMPLE,
description="An example flow operator that includes a radio as parameter.",
parameters=[
Parameter.build_from(
"Fruits Selector",
"fruits",
type=str,
optional=True,
default=None,
placeholder="Select the fruits",
description="The fruits you like.",
options=[
OptionValue(label="Apple", name="apple", value="apple"),
OptionValue(label="Banana", name="banana", value="banana"),
OptionValue(label="Orange", name="orange", value="orange"),
OptionValue(label="Pear", name="pear", value="pear"),
],
ui=ui.UIRadio(),
)
],
inputs=[
IOField.build_from(
"User Name",
"user_name",
str,
description="The name of the user.",
)
],
outputs=[
IOField.build_from(
"Fruits",
"fruits",
str,
description="User's favorite fruits.",
)
],
)
def __init__(self, fruits: Optional[str] = None, **kwargs):
super().__init__(**kwargs)
self.fruits = fruits
async def map(self, user_name: str) -> str:
"""Map the user name to the fruits."""
return "Your name is %s, and you like %s." % (user_name, self.fruits)
class ExampleFlowDatePickerOperator(MapOperator[str, str]):
"""An example flow operator that includes a date picker as parameter."""
metadata = ViewMetadata(
label="Example Flow Date Picker",
name="example_flow_date_picker",
category=OperatorCategory.EXAMPLE,
description="An example flow operator that includes a date picker as parameter.",
parameters=[
Parameter.build_from(
"Date Selector",
"date",
type=str,
placeholder="Select the date",
description="The date you choose.",
ui=ui.UIDatePicker(
attr=ui.UIDatePicker.UIAttribute(placement="bottomLeft")
),
)
],
inputs=[
IOField.build_from(
"User Name",
"user_name",
str,
description="The name of the user.",
)
],
outputs=[
IOField.build_from(
"Date",
"date",
str,
description="User's selected date.",
)
],
)
def __init__(self, date: str, **kwargs):
super().__init__(**kwargs)
self.date = date
async def map(self, user_name: str) -> str:
"""Map the user name to the date."""
return "Your name is %s, and you choose the date %s." % (user_name, self.date)
class ExampleFlowInputOperator(MapOperator[str, str]):
"""An example flow operator that includes an input as parameter."""
metadata = ViewMetadata(
label="Example Flow Input",
name="example_flow_input",
category=OperatorCategory.EXAMPLE,
description="An example flow operator that includes a input as parameter.",
parameters=[
Parameter.build_from(
"Your hobby",
"hobby",
type=str,
placeholder="Please input your hobby",
description="The hobby you like.",
ui=ui.UIInput(
attr=ui.UIInput.UIAttribute(
prefix="icon:UserOutlined", show_count=True, maxlength=200
)
),
)
],
inputs=[
IOField.build_from(
"User Name",
"user_name",
str,
description="The name of the user.",
)
],
outputs=[
IOField.build_from(
"User Hobby",
"hobby",
str,
description="User's hobby.",
)
],
)
def __init__(self, hobby: str, **kwargs):
super().__init__(**kwargs)
self.hobby = hobby
async def map(self, user_name: str) -> str:
"""Map the user name to the input."""
return "Your name is %s, and your hobby is %s." % (user_name, self.hobby)
class ExampleFlowTextAreaOperator(MapOperator[str, str]):
"""An example flow operator that includes a text area as parameter."""
metadata = ViewMetadata(
label="Example Flow Text Area",
name="example_flow_text_area",
category=OperatorCategory.EXAMPLE,
description="An example flow operator that includes a text area as parameter.",
parameters=[
Parameter.build_from(
"Your comment",
"comment",
type=str,
placeholder="Please input your comment",
description="The comment you want to say.",
ui=ui.UITextArea(
attr=ui.UITextArea.UIAttribute(
show_count=True,
maxlength=1000,
auto_size=ui.UITextArea.UIAttribute.AutoSize(
min_rows=2, max_rows=6
),
),
),
)
],
inputs=[
IOField.build_from(
"User Name",
"user_name",
str,
description="The name of the user.",
)
],
outputs=[
IOField.build_from(
"User Comment",
"comment",
str,
description="User's comment.",
)
],
)
def __init__(self, comment: str, **kwargs):
super().__init__(**kwargs)
self.comment = comment
async def map(self, user_name: str) -> str:
"""Map the user name to the text area."""
return "Your name is %s, and your comment is %s." % (user_name, self.comment)
class ExampleFlowSliderOperator(MapOperator[float, float]):
metadata = ViewMetadata(
label="Example Flow Slider",
name="example_flow_slider",
category=OperatorCategory.EXAMPLE,
description="An example flow operator that includes a slider as parameter.",
parameters=[
Parameter.build_from(
"Default Temperature",
"default_temperature",
type=float,
optional=True,
default=0.7,
placeholder="Set the default temperature, e.g., 0.7",
description="The default temperature to pass to the LLM.",
ui=ui.UISlider(
show_input=True,
attr=ui.UISlider.UIAttribute(min=0.0, max=2.0, step=0.1),
),
)
],
inputs=[
IOField.build_from(
"Temperature",
"temperature",
float,
description="The temperature.",
)
],
outputs=[
IOField.build_from(
"Temperature",
"temperature",
float,
description="The temperature to pass to the LLM.",
)
],
)
def __init__(self, default_temperature: float = 0.7, **kwargs):
super().__init__(**kwargs)
self.default_temperature = default_temperature
async def map(self, temperature: float) -> float:
"""Map the temperature to the result."""
if temperature < 0.0 or temperature > 2.0:
logger.warning("Temperature out of range: %s", temperature)
return self.default_temperature
else:
return temperature
class ExampleFlowSliderListOperator(MapOperator[float, float]):
"""An example flow operator that includes a slider list as parameter."""
metadata = ViewMetadata(
label="Example Flow Slider List",
name="example_flow_slider_list",
category=OperatorCategory.EXAMPLE,
description="An example flow operator that includes a slider list as parameter.",
parameters=[
Parameter.build_from(
"Temperature Selector",
"temperature_range",
type=float,
is_list=True,
optional=True,
default=None,
placeholder="Set the temperature, e.g., [0.1, 0.9]",
description="The temperature range to pass to the LLM.",
ui=ui.UISlider(
show_input=True,
attr=ui.UISlider.UIAttribute(min=0.0, max=2.0, step=0.1),
),
)
],
inputs=[
IOField.build_from(
"Temperature",
"temperature",
float,
description="The temperature.",
)
],
outputs=[
IOField.build_from(
"Temperature",
"temperature",
float,
description="The temperature to pass to the LLM.",
)
],
)
def __init__(self, temperature_range: Optional[List[float]] = None, **kwargs):
super().__init__(**kwargs)
temperature_range = temperature_range or [0.1, 0.9]
if temperature_range and len(temperature_range) != 2:
raise ValueError("The length of temperature range must be 2.")
self.temperature_range = temperature_range
async def map(self, temperature: float) -> float:
"""Map the temperature to the result."""
min_temperature, max_temperature = self.temperature_range
if temperature < min_temperature or temperature > max_temperature:
logger.warning(
"Temperature out of range: %s, min: %s, max: %s",
temperature,
min_temperature,
max_temperature,
)
return min_temperature
return temperature
class ExampleFlowTimePickerOperator(MapOperator[str, str]):
"""An example flow operator that includes a time picker as parameter."""
metadata = ViewMetadata(
label="Example Flow Time Picker",
name="example_flow_time_picker",
category=OperatorCategory.EXAMPLE,
description="An example flow operator that includes a time picker as parameter.",
parameters=[
Parameter.build_from(
"Time Selector",
"time",
type=str,
placeholder="Select the time",
description="The time you choose.",
ui=ui.UITimePicker(
attr=ui.UITimePicker.UIAttribute(
format="HH:mm:ss", hour_step=2, minute_step=10, second_step=10
),
),
)
],
inputs=[
IOField.build_from(
"User Name",
"user_name",
str,
description="The name of the user.",
)
],
outputs=[
IOField.build_from(
"Time",
"time",
str,
description="User's selected time.",
)
],
)
def __init__(self, time: str, **kwargs):
super().__init__(**kwargs)
self.time = time
async def map(self, user_name: str) -> str:
"""Map the user name to the time."""
return "Your name is %s, and you choose the time %s." % (user_name, self.time)
class ExampleFlowTreeSelectOperator(MapOperator[str, str]):
"""An example flow operator that includes a tree select as parameter."""
metadata = ViewMetadata(
label="Example Flow Tree Select",
name="example_flow_tree_select",
category=OperatorCategory.EXAMPLE,
description="An example flow operator that includes a tree select as parameter.",
parameters=[
Parameter.build_from(
"Address Selector",
"address",
type=str,
is_list=True,
optional=True,
default=None,
placeholder="Select the address",
description="The address of the location.",
options=[
OptionValue(
label="Zhejiang",
name="zhejiang",
value="zhejiang",
children=[
OptionValue(
label="Hangzhou",
name="hangzhou",
value="hangzhou",
children=[
OptionValue(
label="Xihu",
name="xihu",
value="xihu",
),
OptionValue(
label="Feilaifeng",
name="feilaifeng",
value="feilaifeng",
),
],
),
],
),
OptionValue(
label="Jiangsu",
name="jiangsu",
value="jiangsu",
children=[
OptionValue(
label="Nanjing",
name="nanjing",
value="nanjing",
children=[
OptionValue(
label="Zhonghua Gate",
name="zhonghuamen",
value="zhonghuamen",
),
OptionValue(
label="Zhongshanling",
name="zhongshanling",
value="zhongshanling",
),
],
),
],
),
],
ui=ui.UITreeSelect(attr=ui.UITreeSelect.UIAttribute(show_search=True)),
)
],
inputs=[
IOField.build_from(
"User Name",
"user_name",
str,
description="The name of the user.",
)
],
outputs=[
IOField.build_from(
"Address",
"address",
str,
description="User's address.",
)
],
)
def __int__(self, address: Optional[List[str]] = None, **kwargs):
super().__init__(**kwargs)
self.address = address or []
async def map(self, user_name: str) -> str:
"""Map the user name to the address."""
full_address_str = " ".join(self.address)
return "Your name is %s, and your address is %s." % (
user_name,
full_address_str,
)
def get_recent_3_times(time_interval: int = 1) -> List[OptionValue]:
"""Get the recent times."""
from datetime import datetime, timedelta
now = datetime.now()
recent_times = [now - timedelta(hours=time_interval * i) for i in range(3)]
formatted_times = [time.strftime("%Y-%m-%d %H:%M:%S") for time in recent_times]
option_values = [
OptionValue(label=formatted_time, name=f"time_{i + 1}", value=formatted_time)
for i, formatted_time in enumerate(formatted_times)
]
return option_values
class ExampleFlowRefreshOperator(MapOperator[str, str]):
"""An example flow operator that includes a refresh option."""
metadata = ViewMetadata(
label="Example Refresh Operator",
name="example_refresh_operator",
category=OperatorCategory.EXAMPLE,
description="An example flow operator that includes a refresh option.",
parameters=[
Parameter.build_from(
"Time Interval",
"time_interval",
type=int,
optional=True,
default=1,
placeholder="Set the time interval",
description="The time interval to fetch the times",
),
Parameter.build_from(
"Recent Time",
"recent_time",
type=str,
optional=True,
default=None,
placeholder="Select the recent time",
description="The recent time to choose.",
options=FunctionDynamicOptions(func=get_recent_3_times),
ui=ui.UISelect(
refresh=True,
refresh_depends=["time_interval"],
attr=ui.UISelect.UIAttribute(show_search=True),
),
),
],
inputs=[
IOField.build_from(
"User Name",
"user_name",
str,
description="The name of the user.",
)
],
outputs=[
IOField.build_from(
"Time",
"time",
str,
description="User's selected time.",
)
],
)
def __init__(
self, time_interval: int = 1, recent_time: Optional[str] = None, **kwargs
):
super().__init__(**kwargs)
self.time_interval = time_interval
self.recent_time = recent_time
async def map(self, user_name: str) -> str:
"""Map the user name to the time."""
return "Your name is %s, and you choose the time %s." % (
user_name,
self.recent_time,
)
class ExampleFlowUploadOperator(MapOperator[str, str]):
"""An example flow operator that includes an upload as parameter."""
metadata = ViewMetadata(
label="Example Flow Upload",
name="example_flow_upload",
category=OperatorCategory.EXAMPLE,
description="An example flow operator that includes a upload as parameter.",
parameters=[
Parameter.build_from(
"Single File Selector",
"file",
type=str,
optional=True,
default=None,
placeholder="Select the file",
description="The file you want to upload.",
ui=ui.UIUpload(
max_file_size=1024 * 1024 * 100,
up_event="after_select",
attr=ui.UIUpload.UIAttribute(max_count=1),
),
),
Parameter.build_from(
"Multiple Files Selector",
"multiple_files",
type=str,
is_list=True,
optional=True,
default=None,
placeholder="Select the multiple files",
description="The multiple files you want to upload.",
ui=ui.UIUpload(
max_file_size=1024 * 1024 * 100,
up_event="button_click",
attr=ui.UIUpload.UIAttribute(max_count=5),
),
),
Parameter.build_from(
"CSV File Selector",
"csv_file",
type=str,
optional=True,
default=None,
placeholder="Select the CSV file",
description="The CSV file you want to upload.",
ui=ui.UIUpload(
max_file_size=1024 * 1024 * 100,
up_event="after_select",
file_types=[".csv"],
attr=ui.UIUpload.UIAttribute(max_count=1),
),
),
Parameter.build_from(
"Images Selector",
"images",
type=str,
is_list=True,
optional=True,
default=None,
placeholder="Select the images",
description="The images you want to upload.",
ui=ui.UIUpload(
max_file_size=1024 * 1024 * 100,
up_event="button_click",
file_types=["image/*", ".pdf"],
drag=True,
attr=ui.UIUpload.UIAttribute(max_count=5),
),
),
],
inputs=[
IOField.build_from(
"User Name",
"user_name",
str,
description="The name of the user.",
)
],
outputs=[
IOField.build_from(
"File",
"file",
str,
description="User's uploaded file.",
)
],
)
def __init__(
self,
file: Optional[str] = None,
multiple_files: Optional[List[str]] = None,
csv_file: Optional[str] = None,
images: Optional[List[str]] = None,
**kwargs,
):
super().__init__(**kwargs)
self.file = file
self.multiple_files = multiple_files or []
self.csv_file = csv_file
self.images = images or []
async def map(self, user_name: str) -> str:
"""Map the user name to the file."""
fsc = FileStorageClient.get_instance(self.system_app)
files_metadata = await self.blocking_func_to_async(
self._parse_files_metadata, fsc
)
files_metadata_str = json.dumps(files_metadata, ensure_ascii=False, indent=4)
return "Your name is %s, and you files are %s." % (
user_name,
files_metadata_str,
)
def _parse_files_metadata(self, fsc: FileStorageClient) -> List[Dict[str, Any]]:
"""Parse the files metadata."""
if not self.file:
raise ValueError("The file is not uploaded.")
if not self.multiple_files:
raise ValueError("The multiple files are not uploaded.")
files = [self.file] + self.multiple_files + [self.csv_file] + self.images
results = []
for file in files:
_, metadata = fsc.get_file(file)
results.append(
{
"bucket": metadata.bucket,
"file_id": metadata.file_id,
"file_size": metadata.file_size,
"storage_type": metadata.storage_type,
"uri": metadata.uri,
"file_hash": metadata.file_hash,
}
)
return results
class ExampleFlowVariablesOperator(MapOperator[str, str]):
"""An example flow operator that includes a variables option."""
metadata = ViewMetadata(
label="Example Variables Operator",
name="example_variables_operator",
category=OperatorCategory.EXAMPLE,
description="An example flow operator that includes a variables option.",
parameters=[
Parameter.build_from(
"OpenAI API Key",
"openai_api_key",
type=str,
placeholder="Please select the OpenAI API key",
description="The OpenAI API key to use.",
options=VariablesDynamicOptions(),
ui=ui.UIPasswordInput(
key="dbgpt.model.openai.api_key",
),
),
Parameter.build_from(
"Model",
"model",
type=str,
placeholder="Please select the model",
description="The model to use.",
options=VariablesDynamicOptions(),
ui=ui.UIVariablesInput(
key="dbgpt.model.openai.model",
),
),
Parameter.build_from(
"Builtin Flows",
"builtin_flow",
type=str,
placeholder="Please select the builtin flows",
description="The builtin flows to use.",
options=VariablesDynamicOptions(),
ui=ui.UIVariablesInput(
key=BUILTIN_VARIABLES_CORE_FLOWS,
),
),
Parameter.build_from(
"Builtin Flow Nodes",
"builtin_flow_node",
type=str,
placeholder="Please select the builtin flow nodes",
description="The builtin flow nodes to use.",
options=VariablesDynamicOptions(),
ui=ui.UIVariablesInput(
key=BUILTIN_VARIABLES_CORE_FLOW_NODES,
),
),
Parameter.build_from(
"Builtin Variables",
"builtin_variable",
type=str,
placeholder="Please select the builtin variables",
description="The builtin variables to use.",
options=VariablesDynamicOptions(),
ui=ui.UIVariablesInput(
key=BUILTIN_VARIABLES_CORE_VARIABLES,
),
),
Parameter.build_from(
"Builtin Secrets",
"builtin_secret",
type=str,
placeholder="Please select the builtin secrets",
description="The builtin secrets to use.",
options=VariablesDynamicOptions(),
ui=ui.UIVariablesInput(
key=BUILTIN_VARIABLES_CORE_SECRETS,
),
),
Parameter.build_from(
"Builtin LLMs",
"builtin_llm",
type=str,
placeholder="Please select the builtin LLMs",
description="The builtin LLMs to use.",
options=VariablesDynamicOptions(),
ui=ui.UIVariablesInput(
key=BUILTIN_VARIABLES_CORE_LLMS,
),
),
Parameter.build_from(
"Builtin Embeddings",
"builtin_embedding",
type=str,
placeholder="Please select the builtin embeddings",
description="The builtin embeddings to use.",
options=VariablesDynamicOptions(),
ui=ui.UIVariablesInput(
key=BUILTIN_VARIABLES_CORE_EMBEDDINGS,
),
),
],
inputs=[
IOField.build_from(
"User Name",
"user_name",
str,
description="The name of the user.",
),
],
outputs=[
IOField.build_from(
"Model info",
"model",
str,
description="The model info.",
),
],
)
def __init__(
self,
openai_api_key: str,
model: str,
builtin_flow: str,
builtin_flow_node: str,
builtin_variable: str,
builtin_secret: str,
builtin_llm: str,
builtin_embedding: str,
**kwargs,
):
super().__init__(**kwargs)
self.openai_api_key = openai_api_key
self.model = model
self.builtin_flow = builtin_flow
self.builtin_flow_node = builtin_flow_node
self.builtin_variable = builtin_variable
self.builtin_secret = builtin_secret
self.builtin_llm = builtin_llm
self.builtin_embedding = builtin_embedding
async def map(self, user_name: str) -> str:
"""Map the user name to the model."""
dict_dict = {
"openai_api_key": self.openai_api_key,
"model": self.model,
"builtin_flow": self.builtin_flow,
"builtin_flow_node": self.builtin_flow_node,
"builtin_variable": self.builtin_variable,
"builtin_secret": self.builtin_secret,
"builtin_llm": self.builtin_llm,
"builtin_embedding": self.builtin_embedding,
}
json_data = json.dumps(dict_dict, ensure_ascii=False)
return "Your name is %s, and your model info is %s." % (user_name, json_data)
class ExampleFlowTagsOperator(MapOperator[str, str]):
"""An example flow operator that includes a tags option."""
metadata = ViewMetadata(
label="Example Tags Operator",
name="example_tags_operator",
category=OperatorCategory.EXAMPLE,
description="An example flow operator that includes a tags",
parameters=[],
inputs=[
IOField.build_from(
"User Name",
"user_name",
str,
description="The name of the user.",
),
],
outputs=[
IOField.build_from(
"Tags",
"tags",
str,
description="The tags to use.",
),
],
tags={"order": "higher-order", "type": "example"},
)
def __init__(self, **kwargs):
super().__init__(**kwargs)
async def map(self, user_name: str) -> str:
"""Map the user name to the tags."""
return "Your name is %s, and your tags are %s." % (user_name, "higher-order")
class ExampleFlowCodeEditorOperator(MapOperator[str, str]):
"""An example flow operator that includes a code editor as parameter."""
metadata = ViewMetadata(
label="Example Flow Code Editor",
name="example_flow_code_editor",
category=OperatorCategory.EXAMPLE,
description="An example flow operator that includes a code editor as parameter.",
parameters=[
Parameter.build_from(
"Code Editor",
"code",
type=str,
placeholder="Please input your code",
description="The code you want to edit.",
ui=ui.UICodeEditor(
language="python",
),
),
Parameter.build_from(
"Language",
"lang",
type=str,
optional=True,
default="python",
placeholder="Please select the language",
description="The language of the code.",
options=[
OptionValue(label="Python", name="python", value="python"),
OptionValue(
label="JavaScript", name="javascript", value="javascript"
),
],
ui=ui.UISelect(),
),
],
inputs=[
IOField.build_from(
"User Name",
"user_name",
str,
description="The name of the user.",
)
],
outputs=[
IOField.build_from(
"Code",
"code",
str,
description="Result of the code.",
)
],
)
def __init__(self, code: str, lang: str = "python", **kwargs):
super().__init__(**kwargs)
self.code = code
self.lang = lang
async def map(self, user_name: str) -> str:
"""Map the user name to the code."""
code = self.code
exit_code = -1
try:
exit_code, logs = await self.execute_code_blocks(code, self.lang)
except Exception as e:
logger.error(f"Failed to execute code: {e}")
logs = f"Failed to execute code: {e}"
return (
f"Your name is {user_name}, and your code is \n\n```python\n{code}"
f"\n\n```\n\nThe execution result is \n\n```\n{logs}\n\n```\n\n"
f"Exit code: {exit_code}."
)
async def execute_code_blocks(self, code_blocks: str, lang: str):
"""Execute the code blocks and return the result."""
from dbgpt.util.code.server import CodeResult, get_code_server
code_server = await get_code_server(self.system_app)
result: CodeResult = await code_server.exec(code_blocks, lang)
return result.exit_code, result.logs
class ExampleFlowDynamicParametersOperator(MapOperator[str, str]):
"""An example flow operator that includes dynamic parameters."""
metadata = ViewMetadata(
label="Example Dynamic Parameters Operator",
name="example_dynamic_parameters_operator",
category=OperatorCategory.EXAMPLE,
description="An example flow operator that includes dynamic parameters.",
parameters=[
Parameter.build_from(
"Dynamic String",
"dynamic_1",
type=str,
is_list=True,
placeholder="Please input the dynamic parameter",
description="The dynamic parameter you want to use, you can add more, "
"at least 1 parameter.",
dynamic=True,
dynamic_minimum=1,
ui=ui.UIInput(),
),
Parameter.build_from(
"Dynamic Integer",
"dynamic_2",
type=int,
is_list=True,
placeholder="Please input the dynamic parameter",
description="The dynamic parameter you want to use, you can add more, "
"at least 0 parameter.",
dynamic=True,
dynamic_minimum=0,
),
],
inputs=[
IOField.build_from(
"User Name",
"user_name",
str,
description="The name of the user.",
),
],
outputs=[
IOField.build_from(
"Dynamic",
"dynamic",
str,
description="User's selected dynamic.",
),
],
)
def __init__(self, dynamic_1: List[str], dynamic_2: List[int], **kwargs):
super().__init__(**kwargs)
if not dynamic_1:
raise ValueError("The dynamic string is empty.")
self.dynamic_1 = dynamic_1
self.dynamic_2 = dynamic_2
async def map(self, user_name: str) -> str:
"""Map the user name to the dynamic."""
return "Your name is %s, and your dynamic is %s." % (
user_name,
f"dynamic_1: {self.dynamic_1}, dynamic_2: {self.dynamic_2}",
)
class ExampleFlowDynamicOutputsOperator(MapOperator[str, str]):
"""An example flow operator that includes dynamic outputs."""
metadata = ViewMetadata(
label="Example Dynamic Outputs Operator",
name="example_dynamic_outputs_operator",
category=OperatorCategory.EXAMPLE,
description="An example flow operator that includes dynamic outputs.",
parameters=[],
inputs=[
IOField.build_from(
"User Name",
"user_name",
str,
description="The name of the user.",
),
],
outputs=[
IOField.build_from(
"Dynamic",
"dynamic",
str,
description="User's selected dynamic.",
dynamic=True,
dynamic_minimum=1,
),
],
)
async def map(self, user_name: str) -> str:
"""Map the user name to the dynamic."""
return "Your name is %s, this operator has dynamic outputs." % user_name
class ExampleFlowDynamicInputsOperator(JoinOperator[str]):
"""An example flow operator that includes dynamic inputs."""
metadata = ViewMetadata(
label="Example Dynamic Inputs Operator",
name="example_dynamic_inputs_operator",
category=OperatorCategory.EXAMPLE,
description="An example flow operator that includes dynamic inputs.",
parameters=[],
inputs=[
IOField.build_from(
"User Name",
"user_name",
str,
description="The name of the user.",
),
IOField.build_from(
"Other Inputs",
"other_inputs",
str,
description="Other inputs.",
dynamic=True,
dynamic_minimum=0,
),
],
outputs=[
IOField.build_from(
"Dynamic",
"dynamic",
str,
description="User's selected dynamic.",
),
],
)
def __init__(self, **kwargs):
super().__init__(combine_function=self.join, **kwargs)
async def join(self, user_name: str, *other_inputs: str) -> str:
"""Map the user name to the dynamic."""
if not other_inputs:
dyn_inputs = ["You have no other inputs."]
else:
dyn_inputs = [
f"Input {i}: {input_data}" for i, input_data in enumerate(other_inputs)
]
dyn_str = "\n".join(dyn_inputs)
return "Your name is %s, and your dynamic is %s." % (
user_name,
f"other_inputs:\n{dyn_str}",
)