mirror of
https://github.com/csunny/DB-GPT.git
synced 2025-07-21 11:29:15 +00:00
1354 lines
45 KiB
Python
1354 lines
45 KiB
Python
"""Some UI components for the AWEL flow."""
|
|
|
|
import json
|
|
import logging
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from dbgpt.core.awel import JoinOperator, MapOperator
|
|
from dbgpt.core.awel.flow import (
|
|
FunctionDynamicOptions,
|
|
IOField,
|
|
OperatorCategory,
|
|
OptionValue,
|
|
Parameter,
|
|
VariablesDynamicOptions,
|
|
ViewMetadata,
|
|
ui,
|
|
)
|
|
from dbgpt.core.interface.file import FileStorageClient
|
|
from dbgpt.core.interface.variables import (
|
|
BUILTIN_VARIABLES_CORE_EMBEDDINGS,
|
|
BUILTIN_VARIABLES_CORE_FLOW_NODES,
|
|
BUILTIN_VARIABLES_CORE_FLOWS,
|
|
BUILTIN_VARIABLES_CORE_LLMS,
|
|
BUILTIN_VARIABLES_CORE_SECRETS,
|
|
BUILTIN_VARIABLES_CORE_VARIABLES,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ExampleFlowSelectOperator(MapOperator[str, str]):
|
|
"""An example flow operator that includes a select as parameter."""
|
|
|
|
metadata = ViewMetadata(
|
|
label="Example Flow Select",
|
|
name="example_flow_select",
|
|
category=OperatorCategory.EXAMPLE,
|
|
description="An example flow operator that includes a select as parameter.",
|
|
parameters=[
|
|
Parameter.build_from(
|
|
"Fruits Selector",
|
|
"fruits",
|
|
type=str,
|
|
optional=True,
|
|
default=None,
|
|
placeholder="Select the fruits",
|
|
description="The fruits you like.",
|
|
options=[
|
|
OptionValue(label="Apple", name="apple", value="apple"),
|
|
OptionValue(label="Banana", name="banana", value="banana"),
|
|
OptionValue(label="Orange", name="orange", value="orange"),
|
|
OptionValue(label="Pear", name="pear", value="pear"),
|
|
],
|
|
ui=ui.UISelect(attr=ui.UISelect.UIAttribute(show_search=True)),
|
|
)
|
|
],
|
|
inputs=[
|
|
IOField.build_from(
|
|
"User Name",
|
|
"user_name",
|
|
str,
|
|
description="The name of the user.",
|
|
)
|
|
],
|
|
outputs=[
|
|
IOField.build_from(
|
|
"Fruits",
|
|
"fruits",
|
|
str,
|
|
description="User's favorite fruits.",
|
|
)
|
|
],
|
|
)
|
|
|
|
def __init__(self, fruits: Optional[str] = None, **kwargs):
|
|
super().__init__(**kwargs)
|
|
self.fruits = fruits
|
|
|
|
async def map(self, user_name: str) -> str:
|
|
"""Map the user name to the fruits."""
|
|
return "Your name is %s, and you like %s." % (user_name, self.fruits)
|
|
|
|
|
|
class ExampleFlowCascaderOperator(MapOperator[str, str]):
|
|
"""An example flow operator that includes a cascader as parameter."""
|
|
|
|
metadata = ViewMetadata(
|
|
label="Example Flow Cascader",
|
|
name="example_flow_cascader",
|
|
category=OperatorCategory.EXAMPLE,
|
|
description="An example flow operator that includes a cascader as parameter.",
|
|
parameters=[
|
|
Parameter.build_from(
|
|
"Address Selector",
|
|
"address",
|
|
type=str,
|
|
is_list=True,
|
|
optional=True,
|
|
default=None,
|
|
placeholder="Select the address",
|
|
description="The address of the location.",
|
|
options=[
|
|
OptionValue(
|
|
label="Zhejiang",
|
|
name="zhejiang",
|
|
value="zhejiang",
|
|
children=[
|
|
OptionValue(
|
|
label="Hangzhou",
|
|
name="hangzhou",
|
|
value="hangzhou",
|
|
children=[
|
|
OptionValue(
|
|
label="Xihu",
|
|
name="xihu",
|
|
value="xihu",
|
|
),
|
|
OptionValue(
|
|
label="Feilaifeng",
|
|
name="feilaifeng",
|
|
value="feilaifeng",
|
|
),
|
|
],
|
|
),
|
|
],
|
|
),
|
|
OptionValue(
|
|
label="Jiangsu",
|
|
name="jiangsu",
|
|
value="jiangsu",
|
|
children=[
|
|
OptionValue(
|
|
label="Nanjing",
|
|
name="nanjing",
|
|
value="nanjing",
|
|
children=[
|
|
OptionValue(
|
|
label="Zhonghua Gate",
|
|
name="zhonghuamen",
|
|
value="zhonghuamen",
|
|
),
|
|
OptionValue(
|
|
label="Zhongshanling",
|
|
name="zhongshanling",
|
|
value="zhongshanling",
|
|
),
|
|
],
|
|
),
|
|
],
|
|
),
|
|
],
|
|
ui=ui.UICascader(attr=ui.UICascader.UIAttribute(show_search=True)),
|
|
)
|
|
],
|
|
inputs=[
|
|
IOField.build_from(
|
|
"User Name",
|
|
"user_name",
|
|
str,
|
|
description="The name of the user.",
|
|
)
|
|
],
|
|
outputs=[
|
|
IOField.build_from(
|
|
"Address",
|
|
"address",
|
|
str,
|
|
description="User's address.",
|
|
)
|
|
],
|
|
)
|
|
|
|
def __int__(self, address: Optional[List[str]] = None, **kwargs):
|
|
super().__init__(**kwargs)
|
|
self.address = address or []
|
|
|
|
async def map(self, user_name: str) -> str:
|
|
"""Map the user name to the address."""
|
|
full_address_str = " ".join(self.address)
|
|
return "Your name is %s, and your address is %s." % (
|
|
user_name,
|
|
full_address_str,
|
|
)
|
|
|
|
|
|
class ExampleFlowCheckboxOperator(MapOperator[str, str]):
|
|
"""An example flow operator that includes a checkbox as parameter."""
|
|
|
|
metadata = ViewMetadata(
|
|
label="Example Flow Checkbox",
|
|
name="example_flow_checkbox",
|
|
category=OperatorCategory.EXAMPLE,
|
|
description="An example flow operator that includes a checkbox as parameter.",
|
|
parameters=[
|
|
Parameter.build_from(
|
|
"Fruits Selector",
|
|
"fruits",
|
|
type=str,
|
|
is_list=True,
|
|
optional=True,
|
|
default=None,
|
|
placeholder="Select the fruits",
|
|
description="The fruits you like.",
|
|
options=[
|
|
OptionValue(label="Apple", name="apple", value="apple"),
|
|
OptionValue(label="Banana", name="banana", value="banana"),
|
|
OptionValue(label="Orange", name="orange", value="orange"),
|
|
OptionValue(label="Pear", name="pear", value="pear"),
|
|
],
|
|
ui=ui.UICheckbox(),
|
|
)
|
|
],
|
|
inputs=[
|
|
IOField.build_from(
|
|
"User Name",
|
|
"user_name",
|
|
str,
|
|
description="The name of the user.",
|
|
)
|
|
],
|
|
outputs=[
|
|
IOField.build_from(
|
|
"Fruits",
|
|
"fruits",
|
|
str,
|
|
description="User's favorite fruits.",
|
|
)
|
|
],
|
|
)
|
|
|
|
def __init__(self, fruits: Optional[List[str]] = None, **kwargs):
|
|
super().__init__(**kwargs)
|
|
self.fruits = fruits or []
|
|
|
|
async def map(self, user_name: str) -> str:
|
|
"""Map the user name to the fruits."""
|
|
return "Your name is %s, and you like %s." % (user_name, ", ".join(self.fruits))
|
|
|
|
|
|
class ExampleFlowRadioOperator(MapOperator[str, str]):
|
|
"""An example flow operator that includes a radio as parameter."""
|
|
|
|
metadata = ViewMetadata(
|
|
label="Example Flow Radio",
|
|
name="example_flow_radio",
|
|
category=OperatorCategory.EXAMPLE,
|
|
description="An example flow operator that includes a radio as parameter.",
|
|
parameters=[
|
|
Parameter.build_from(
|
|
"Fruits Selector",
|
|
"fruits",
|
|
type=str,
|
|
optional=True,
|
|
default=None,
|
|
placeholder="Select the fruits",
|
|
description="The fruits you like.",
|
|
options=[
|
|
OptionValue(label="Apple", name="apple", value="apple"),
|
|
OptionValue(label="Banana", name="banana", value="banana"),
|
|
OptionValue(label="Orange", name="orange", value="orange"),
|
|
OptionValue(label="Pear", name="pear", value="pear"),
|
|
],
|
|
ui=ui.UIRadio(),
|
|
)
|
|
],
|
|
inputs=[
|
|
IOField.build_from(
|
|
"User Name",
|
|
"user_name",
|
|
str,
|
|
description="The name of the user.",
|
|
)
|
|
],
|
|
outputs=[
|
|
IOField.build_from(
|
|
"Fruits",
|
|
"fruits",
|
|
str,
|
|
description="User's favorite fruits.",
|
|
)
|
|
],
|
|
)
|
|
|
|
def __init__(self, fruits: Optional[str] = None, **kwargs):
|
|
super().__init__(**kwargs)
|
|
self.fruits = fruits
|
|
|
|
async def map(self, user_name: str) -> str:
|
|
"""Map the user name to the fruits."""
|
|
return "Your name is %s, and you like %s." % (user_name, self.fruits)
|
|
|
|
|
|
class ExampleFlowDatePickerOperator(MapOperator[str, str]):
|
|
"""An example flow operator that includes a date picker as parameter."""
|
|
|
|
metadata = ViewMetadata(
|
|
label="Example Flow Date Picker",
|
|
name="example_flow_date_picker",
|
|
category=OperatorCategory.EXAMPLE,
|
|
description="An example flow operator that includes a date picker as parameter.",
|
|
parameters=[
|
|
Parameter.build_from(
|
|
"Date Selector",
|
|
"date",
|
|
type=str,
|
|
placeholder="Select the date",
|
|
description="The date you choose.",
|
|
ui=ui.UIDatePicker(
|
|
attr=ui.UIDatePicker.UIAttribute(placement="bottomLeft")
|
|
),
|
|
)
|
|
],
|
|
inputs=[
|
|
IOField.build_from(
|
|
"User Name",
|
|
"user_name",
|
|
str,
|
|
description="The name of the user.",
|
|
)
|
|
],
|
|
outputs=[
|
|
IOField.build_from(
|
|
"Date",
|
|
"date",
|
|
str,
|
|
description="User's selected date.",
|
|
)
|
|
],
|
|
)
|
|
|
|
def __init__(self, date: str, **kwargs):
|
|
super().__init__(**kwargs)
|
|
self.date = date
|
|
|
|
async def map(self, user_name: str) -> str:
|
|
"""Map the user name to the date."""
|
|
return "Your name is %s, and you choose the date %s." % (user_name, self.date)
|
|
|
|
|
|
class ExampleFlowInputOperator(MapOperator[str, str]):
|
|
"""An example flow operator that includes an input as parameter."""
|
|
|
|
metadata = ViewMetadata(
|
|
label="Example Flow Input",
|
|
name="example_flow_input",
|
|
category=OperatorCategory.EXAMPLE,
|
|
description="An example flow operator that includes a input as parameter.",
|
|
parameters=[
|
|
Parameter.build_from(
|
|
"Your hobby",
|
|
"hobby",
|
|
type=str,
|
|
placeholder="Please input your hobby",
|
|
description="The hobby you like.",
|
|
ui=ui.UIInput(
|
|
attr=ui.UIInput.UIAttribute(
|
|
prefix="icon:UserOutlined", show_count=True, maxlength=200
|
|
)
|
|
),
|
|
)
|
|
],
|
|
inputs=[
|
|
IOField.build_from(
|
|
"User Name",
|
|
"user_name",
|
|
str,
|
|
description="The name of the user.",
|
|
)
|
|
],
|
|
outputs=[
|
|
IOField.build_from(
|
|
"User Hobby",
|
|
"hobby",
|
|
str,
|
|
description="User's hobby.",
|
|
)
|
|
],
|
|
)
|
|
|
|
def __init__(self, hobby: str, **kwargs):
|
|
super().__init__(**kwargs)
|
|
self.hobby = hobby
|
|
|
|
async def map(self, user_name: str) -> str:
|
|
"""Map the user name to the input."""
|
|
return "Your name is %s, and your hobby is %s." % (user_name, self.hobby)
|
|
|
|
|
|
class ExampleFlowTextAreaOperator(MapOperator[str, str]):
|
|
"""An example flow operator that includes a text area as parameter."""
|
|
|
|
metadata = ViewMetadata(
|
|
label="Example Flow Text Area",
|
|
name="example_flow_text_area",
|
|
category=OperatorCategory.EXAMPLE,
|
|
description="An example flow operator that includes a text area as parameter.",
|
|
parameters=[
|
|
Parameter.build_from(
|
|
"Your comment",
|
|
"comment",
|
|
type=str,
|
|
placeholder="Please input your comment",
|
|
description="The comment you want to say.",
|
|
ui=ui.UITextArea(
|
|
attr=ui.UITextArea.UIAttribute(
|
|
show_count=True,
|
|
maxlength=1000,
|
|
auto_size=ui.UITextArea.UIAttribute.AutoSize(
|
|
min_rows=2, max_rows=6
|
|
),
|
|
),
|
|
),
|
|
)
|
|
],
|
|
inputs=[
|
|
IOField.build_from(
|
|
"User Name",
|
|
"user_name",
|
|
str,
|
|
description="The name of the user.",
|
|
)
|
|
],
|
|
outputs=[
|
|
IOField.build_from(
|
|
"User Comment",
|
|
"comment",
|
|
str,
|
|
description="User's comment.",
|
|
)
|
|
],
|
|
)
|
|
|
|
def __init__(self, comment: str, **kwargs):
|
|
super().__init__(**kwargs)
|
|
self.comment = comment
|
|
|
|
async def map(self, user_name: str) -> str:
|
|
"""Map the user name to the text area."""
|
|
return "Your name is %s, and your comment is %s." % (user_name, self.comment)
|
|
|
|
|
|
class ExampleFlowSliderOperator(MapOperator[float, float]):
|
|
metadata = ViewMetadata(
|
|
label="Example Flow Slider",
|
|
name="example_flow_slider",
|
|
category=OperatorCategory.EXAMPLE,
|
|
description="An example flow operator that includes a slider as parameter.",
|
|
parameters=[
|
|
Parameter.build_from(
|
|
"Default Temperature",
|
|
"default_temperature",
|
|
type=float,
|
|
optional=True,
|
|
default=0.7,
|
|
placeholder="Set the default temperature, e.g., 0.7",
|
|
description="The default temperature to pass to the LLM.",
|
|
ui=ui.UISlider(
|
|
show_input=True,
|
|
attr=ui.UISlider.UIAttribute(min=0.0, max=2.0, step=0.1),
|
|
),
|
|
)
|
|
],
|
|
inputs=[
|
|
IOField.build_from(
|
|
"Temperature",
|
|
"temperature",
|
|
float,
|
|
description="The temperature.",
|
|
)
|
|
],
|
|
outputs=[
|
|
IOField.build_from(
|
|
"Temperature",
|
|
"temperature",
|
|
float,
|
|
description="The temperature to pass to the LLM.",
|
|
)
|
|
],
|
|
)
|
|
|
|
def __init__(self, default_temperature: float = 0.7, **kwargs):
|
|
super().__init__(**kwargs)
|
|
self.default_temperature = default_temperature
|
|
|
|
async def map(self, temperature: float) -> float:
|
|
"""Map the temperature to the result."""
|
|
if temperature < 0.0 or temperature > 2.0:
|
|
logger.warning("Temperature out of range: %s", temperature)
|
|
return self.default_temperature
|
|
else:
|
|
return temperature
|
|
|
|
|
|
class ExampleFlowSliderListOperator(MapOperator[float, float]):
|
|
"""An example flow operator that includes a slider list as parameter."""
|
|
|
|
metadata = ViewMetadata(
|
|
label="Example Flow Slider List",
|
|
name="example_flow_slider_list",
|
|
category=OperatorCategory.EXAMPLE,
|
|
description="An example flow operator that includes a slider list as parameter.",
|
|
parameters=[
|
|
Parameter.build_from(
|
|
"Temperature Selector",
|
|
"temperature_range",
|
|
type=float,
|
|
is_list=True,
|
|
optional=True,
|
|
default=None,
|
|
placeholder="Set the temperature, e.g., [0.1, 0.9]",
|
|
description="The temperature range to pass to the LLM.",
|
|
ui=ui.UISlider(
|
|
show_input=True,
|
|
attr=ui.UISlider.UIAttribute(min=0.0, max=2.0, step=0.1),
|
|
),
|
|
)
|
|
],
|
|
inputs=[
|
|
IOField.build_from(
|
|
"Temperature",
|
|
"temperature",
|
|
float,
|
|
description="The temperature.",
|
|
)
|
|
],
|
|
outputs=[
|
|
IOField.build_from(
|
|
"Temperature",
|
|
"temperature",
|
|
float,
|
|
description="The temperature to pass to the LLM.",
|
|
)
|
|
],
|
|
)
|
|
|
|
def __init__(self, temperature_range: Optional[List[float]] = None, **kwargs):
|
|
super().__init__(**kwargs)
|
|
temperature_range = temperature_range or [0.1, 0.9]
|
|
if temperature_range and len(temperature_range) != 2:
|
|
raise ValueError("The length of temperature range must be 2.")
|
|
self.temperature_range = temperature_range
|
|
|
|
async def map(self, temperature: float) -> float:
|
|
"""Map the temperature to the result."""
|
|
min_temperature, max_temperature = self.temperature_range
|
|
if temperature < min_temperature or temperature > max_temperature:
|
|
logger.warning(
|
|
"Temperature out of range: %s, min: %s, max: %s",
|
|
temperature,
|
|
min_temperature,
|
|
max_temperature,
|
|
)
|
|
return min_temperature
|
|
return temperature
|
|
|
|
|
|
class ExampleFlowTimePickerOperator(MapOperator[str, str]):
|
|
"""An example flow operator that includes a time picker as parameter."""
|
|
|
|
metadata = ViewMetadata(
|
|
label="Example Flow Time Picker",
|
|
name="example_flow_time_picker",
|
|
category=OperatorCategory.EXAMPLE,
|
|
description="An example flow operator that includes a time picker as parameter.",
|
|
parameters=[
|
|
Parameter.build_from(
|
|
"Time Selector",
|
|
"time",
|
|
type=str,
|
|
placeholder="Select the time",
|
|
description="The time you choose.",
|
|
ui=ui.UITimePicker(
|
|
attr=ui.UITimePicker.UIAttribute(
|
|
format="HH:mm:ss", hour_step=2, minute_step=10, second_step=10
|
|
),
|
|
),
|
|
)
|
|
],
|
|
inputs=[
|
|
IOField.build_from(
|
|
"User Name",
|
|
"user_name",
|
|
str,
|
|
description="The name of the user.",
|
|
)
|
|
],
|
|
outputs=[
|
|
IOField.build_from(
|
|
"Time",
|
|
"time",
|
|
str,
|
|
description="User's selected time.",
|
|
)
|
|
],
|
|
)
|
|
|
|
def __init__(self, time: str, **kwargs):
|
|
super().__init__(**kwargs)
|
|
self.time = time
|
|
|
|
async def map(self, user_name: str) -> str:
|
|
"""Map the user name to the time."""
|
|
return "Your name is %s, and you choose the time %s." % (user_name, self.time)
|
|
|
|
|
|
class ExampleFlowTreeSelectOperator(MapOperator[str, str]):
|
|
"""An example flow operator that includes a tree select as parameter."""
|
|
|
|
metadata = ViewMetadata(
|
|
label="Example Flow Tree Select",
|
|
name="example_flow_tree_select",
|
|
category=OperatorCategory.EXAMPLE,
|
|
description="An example flow operator that includes a tree select as parameter.",
|
|
parameters=[
|
|
Parameter.build_from(
|
|
"Address Selector",
|
|
"address",
|
|
type=str,
|
|
is_list=True,
|
|
optional=True,
|
|
default=None,
|
|
placeholder="Select the address",
|
|
description="The address of the location.",
|
|
options=[
|
|
OptionValue(
|
|
label="Zhejiang",
|
|
name="zhejiang",
|
|
value="zhejiang",
|
|
children=[
|
|
OptionValue(
|
|
label="Hangzhou",
|
|
name="hangzhou",
|
|
value="hangzhou",
|
|
children=[
|
|
OptionValue(
|
|
label="Xihu",
|
|
name="xihu",
|
|
value="xihu",
|
|
),
|
|
OptionValue(
|
|
label="Feilaifeng",
|
|
name="feilaifeng",
|
|
value="feilaifeng",
|
|
),
|
|
],
|
|
),
|
|
],
|
|
),
|
|
OptionValue(
|
|
label="Jiangsu",
|
|
name="jiangsu",
|
|
value="jiangsu",
|
|
children=[
|
|
OptionValue(
|
|
label="Nanjing",
|
|
name="nanjing",
|
|
value="nanjing",
|
|
children=[
|
|
OptionValue(
|
|
label="Zhonghua Gate",
|
|
name="zhonghuamen",
|
|
value="zhonghuamen",
|
|
),
|
|
OptionValue(
|
|
label="Zhongshanling",
|
|
name="zhongshanling",
|
|
value="zhongshanling",
|
|
),
|
|
],
|
|
),
|
|
],
|
|
),
|
|
],
|
|
ui=ui.UITreeSelect(attr=ui.UITreeSelect.UIAttribute(show_search=True)),
|
|
)
|
|
],
|
|
inputs=[
|
|
IOField.build_from(
|
|
"User Name",
|
|
"user_name",
|
|
str,
|
|
description="The name of the user.",
|
|
)
|
|
],
|
|
outputs=[
|
|
IOField.build_from(
|
|
"Address",
|
|
"address",
|
|
str,
|
|
description="User's address.",
|
|
)
|
|
],
|
|
)
|
|
|
|
def __int__(self, address: Optional[List[str]] = None, **kwargs):
|
|
super().__init__(**kwargs)
|
|
self.address = address or []
|
|
|
|
async def map(self, user_name: str) -> str:
|
|
"""Map the user name to the address."""
|
|
full_address_str = " ".join(self.address)
|
|
return "Your name is %s, and your address is %s." % (
|
|
user_name,
|
|
full_address_str,
|
|
)
|
|
|
|
|
|
def get_recent_3_times(time_interval: int = 1) -> List[OptionValue]:
|
|
"""Get the recent times."""
|
|
from datetime import datetime, timedelta
|
|
|
|
now = datetime.now()
|
|
recent_times = [now - timedelta(hours=time_interval * i) for i in range(3)]
|
|
formatted_times = [time.strftime("%Y-%m-%d %H:%M:%S") for time in recent_times]
|
|
option_values = [
|
|
OptionValue(label=formatted_time, name=f"time_{i + 1}", value=formatted_time)
|
|
for i, formatted_time in enumerate(formatted_times)
|
|
]
|
|
|
|
return option_values
|
|
|
|
|
|
class ExampleFlowRefreshOperator(MapOperator[str, str]):
|
|
"""An example flow operator that includes a refresh option."""
|
|
|
|
metadata = ViewMetadata(
|
|
label="Example Refresh Operator",
|
|
name="example_refresh_operator",
|
|
category=OperatorCategory.EXAMPLE,
|
|
description="An example flow operator that includes a refresh option.",
|
|
parameters=[
|
|
Parameter.build_from(
|
|
"Time Interval",
|
|
"time_interval",
|
|
type=int,
|
|
optional=True,
|
|
default=1,
|
|
placeholder="Set the time interval",
|
|
description="The time interval to fetch the times",
|
|
),
|
|
Parameter.build_from(
|
|
"Recent Time",
|
|
"recent_time",
|
|
type=str,
|
|
optional=True,
|
|
default=None,
|
|
placeholder="Select the recent time",
|
|
description="The recent time to choose.",
|
|
options=FunctionDynamicOptions(func=get_recent_3_times),
|
|
ui=ui.UISelect(
|
|
refresh=True,
|
|
refresh_depends=["time_interval"],
|
|
attr=ui.UISelect.UIAttribute(show_search=True),
|
|
),
|
|
),
|
|
],
|
|
inputs=[
|
|
IOField.build_from(
|
|
"User Name",
|
|
"user_name",
|
|
str,
|
|
description="The name of the user.",
|
|
)
|
|
],
|
|
outputs=[
|
|
IOField.build_from(
|
|
"Time",
|
|
"time",
|
|
str,
|
|
description="User's selected time.",
|
|
)
|
|
],
|
|
)
|
|
|
|
def __init__(
|
|
self, time_interval: int = 1, recent_time: Optional[str] = None, **kwargs
|
|
):
|
|
super().__init__(**kwargs)
|
|
self.time_interval = time_interval
|
|
self.recent_time = recent_time
|
|
|
|
async def map(self, user_name: str) -> str:
|
|
"""Map the user name to the time."""
|
|
return "Your name is %s, and you choose the time %s." % (
|
|
user_name,
|
|
self.recent_time,
|
|
)
|
|
|
|
|
|
class ExampleFlowUploadOperator(MapOperator[str, str]):
|
|
"""An example flow operator that includes an upload as parameter."""
|
|
|
|
metadata = ViewMetadata(
|
|
label="Example Flow Upload",
|
|
name="example_flow_upload",
|
|
category=OperatorCategory.EXAMPLE,
|
|
description="An example flow operator that includes a upload as parameter.",
|
|
parameters=[
|
|
Parameter.build_from(
|
|
"Single File Selector",
|
|
"file",
|
|
type=str,
|
|
optional=True,
|
|
default=None,
|
|
placeholder="Select the file",
|
|
description="The file you want to upload.",
|
|
ui=ui.UIUpload(
|
|
max_file_size=1024 * 1024 * 100,
|
|
up_event="after_select",
|
|
attr=ui.UIUpload.UIAttribute(max_count=1),
|
|
),
|
|
),
|
|
Parameter.build_from(
|
|
"Multiple Files Selector",
|
|
"multiple_files",
|
|
type=str,
|
|
is_list=True,
|
|
optional=True,
|
|
default=None,
|
|
placeholder="Select the multiple files",
|
|
description="The multiple files you want to upload.",
|
|
ui=ui.UIUpload(
|
|
max_file_size=1024 * 1024 * 100,
|
|
up_event="button_click",
|
|
attr=ui.UIUpload.UIAttribute(max_count=5),
|
|
),
|
|
),
|
|
Parameter.build_from(
|
|
"CSV File Selector",
|
|
"csv_file",
|
|
type=str,
|
|
optional=True,
|
|
default=None,
|
|
placeholder="Select the CSV file",
|
|
description="The CSV file you want to upload.",
|
|
ui=ui.UIUpload(
|
|
max_file_size=1024 * 1024 * 100,
|
|
up_event="after_select",
|
|
file_types=[".csv"],
|
|
attr=ui.UIUpload.UIAttribute(max_count=1),
|
|
),
|
|
),
|
|
Parameter.build_from(
|
|
"Images Selector",
|
|
"images",
|
|
type=str,
|
|
is_list=True,
|
|
optional=True,
|
|
default=None,
|
|
placeholder="Select the images",
|
|
description="The images you want to upload.",
|
|
ui=ui.UIUpload(
|
|
max_file_size=1024 * 1024 * 100,
|
|
up_event="button_click",
|
|
file_types=["image/*", ".pdf"],
|
|
drag=True,
|
|
attr=ui.UIUpload.UIAttribute(max_count=5),
|
|
),
|
|
),
|
|
],
|
|
inputs=[
|
|
IOField.build_from(
|
|
"User Name",
|
|
"user_name",
|
|
str,
|
|
description="The name of the user.",
|
|
)
|
|
],
|
|
outputs=[
|
|
IOField.build_from(
|
|
"File",
|
|
"file",
|
|
str,
|
|
description="User's uploaded file.",
|
|
)
|
|
],
|
|
)
|
|
|
|
def __init__(
|
|
self,
|
|
file: Optional[str] = None,
|
|
multiple_files: Optional[List[str]] = None,
|
|
csv_file: Optional[str] = None,
|
|
images: Optional[List[str]] = None,
|
|
**kwargs,
|
|
):
|
|
super().__init__(**kwargs)
|
|
self.file = file
|
|
self.multiple_files = multiple_files or []
|
|
self.csv_file = csv_file
|
|
self.images = images or []
|
|
|
|
async def map(self, user_name: str) -> str:
|
|
"""Map the user name to the file."""
|
|
|
|
fsc = FileStorageClient.get_instance(self.system_app)
|
|
files_metadata = await self.blocking_func_to_async(
|
|
self._parse_files_metadata, fsc
|
|
)
|
|
files_metadata_str = json.dumps(files_metadata, ensure_ascii=False, indent=4)
|
|
return "Your name is %s, and you files are %s." % (
|
|
user_name,
|
|
files_metadata_str,
|
|
)
|
|
|
|
def _parse_files_metadata(self, fsc: FileStorageClient) -> List[Dict[str, Any]]:
|
|
"""Parse the files metadata."""
|
|
if not self.file:
|
|
raise ValueError("The file is not uploaded.")
|
|
if not self.multiple_files:
|
|
raise ValueError("The multiple files are not uploaded.")
|
|
files = [self.file] + self.multiple_files + [self.csv_file] + self.images
|
|
results = []
|
|
for file in files:
|
|
_, metadata = fsc.get_file(file)
|
|
results.append(
|
|
{
|
|
"bucket": metadata.bucket,
|
|
"file_id": metadata.file_id,
|
|
"file_size": metadata.file_size,
|
|
"storage_type": metadata.storage_type,
|
|
"uri": metadata.uri,
|
|
"file_hash": metadata.file_hash,
|
|
}
|
|
)
|
|
return results
|
|
|
|
|
|
class ExampleFlowVariablesOperator(MapOperator[str, str]):
|
|
"""An example flow operator that includes a variables option."""
|
|
|
|
metadata = ViewMetadata(
|
|
label="Example Variables Operator",
|
|
name="example_variables_operator",
|
|
category=OperatorCategory.EXAMPLE,
|
|
description="An example flow operator that includes a variables option.",
|
|
parameters=[
|
|
Parameter.build_from(
|
|
"OpenAI API Key",
|
|
"openai_api_key",
|
|
type=str,
|
|
placeholder="Please select the OpenAI API key",
|
|
description="The OpenAI API key to use.",
|
|
options=VariablesDynamicOptions(),
|
|
ui=ui.UIPasswordInput(
|
|
key="dbgpt.model.openai.api_key",
|
|
),
|
|
),
|
|
Parameter.build_from(
|
|
"Model",
|
|
"model",
|
|
type=str,
|
|
placeholder="Please select the model",
|
|
description="The model to use.",
|
|
options=VariablesDynamicOptions(),
|
|
ui=ui.UIVariablesInput(
|
|
key="dbgpt.model.openai.model",
|
|
),
|
|
),
|
|
Parameter.build_from(
|
|
"Builtin Flows",
|
|
"builtin_flow",
|
|
type=str,
|
|
placeholder="Please select the builtin flows",
|
|
description="The builtin flows to use.",
|
|
options=VariablesDynamicOptions(),
|
|
ui=ui.UIVariablesInput(
|
|
key=BUILTIN_VARIABLES_CORE_FLOWS,
|
|
),
|
|
),
|
|
Parameter.build_from(
|
|
"Builtin Flow Nodes",
|
|
"builtin_flow_node",
|
|
type=str,
|
|
placeholder="Please select the builtin flow nodes",
|
|
description="The builtin flow nodes to use.",
|
|
options=VariablesDynamicOptions(),
|
|
ui=ui.UIVariablesInput(
|
|
key=BUILTIN_VARIABLES_CORE_FLOW_NODES,
|
|
),
|
|
),
|
|
Parameter.build_from(
|
|
"Builtin Variables",
|
|
"builtin_variable",
|
|
type=str,
|
|
placeholder="Please select the builtin variables",
|
|
description="The builtin variables to use.",
|
|
options=VariablesDynamicOptions(),
|
|
ui=ui.UIVariablesInput(
|
|
key=BUILTIN_VARIABLES_CORE_VARIABLES,
|
|
),
|
|
),
|
|
Parameter.build_from(
|
|
"Builtin Secrets",
|
|
"builtin_secret",
|
|
type=str,
|
|
placeholder="Please select the builtin secrets",
|
|
description="The builtin secrets to use.",
|
|
options=VariablesDynamicOptions(),
|
|
ui=ui.UIVariablesInput(
|
|
key=BUILTIN_VARIABLES_CORE_SECRETS,
|
|
),
|
|
),
|
|
Parameter.build_from(
|
|
"Builtin LLMs",
|
|
"builtin_llm",
|
|
type=str,
|
|
placeholder="Please select the builtin LLMs",
|
|
description="The builtin LLMs to use.",
|
|
options=VariablesDynamicOptions(),
|
|
ui=ui.UIVariablesInput(
|
|
key=BUILTIN_VARIABLES_CORE_LLMS,
|
|
),
|
|
),
|
|
Parameter.build_from(
|
|
"Builtin Embeddings",
|
|
"builtin_embedding",
|
|
type=str,
|
|
placeholder="Please select the builtin embeddings",
|
|
description="The builtin embeddings to use.",
|
|
options=VariablesDynamicOptions(),
|
|
ui=ui.UIVariablesInput(
|
|
key=BUILTIN_VARIABLES_CORE_EMBEDDINGS,
|
|
),
|
|
),
|
|
],
|
|
inputs=[
|
|
IOField.build_from(
|
|
"User Name",
|
|
"user_name",
|
|
str,
|
|
description="The name of the user.",
|
|
),
|
|
],
|
|
outputs=[
|
|
IOField.build_from(
|
|
"Model info",
|
|
"model",
|
|
str,
|
|
description="The model info.",
|
|
),
|
|
],
|
|
)
|
|
|
|
def __init__(
|
|
self,
|
|
openai_api_key: str,
|
|
model: str,
|
|
builtin_flow: str,
|
|
builtin_flow_node: str,
|
|
builtin_variable: str,
|
|
builtin_secret: str,
|
|
builtin_llm: str,
|
|
builtin_embedding: str,
|
|
**kwargs,
|
|
):
|
|
super().__init__(**kwargs)
|
|
self.openai_api_key = openai_api_key
|
|
self.model = model
|
|
self.builtin_flow = builtin_flow
|
|
self.builtin_flow_node = builtin_flow_node
|
|
self.builtin_variable = builtin_variable
|
|
self.builtin_secret = builtin_secret
|
|
self.builtin_llm = builtin_llm
|
|
self.builtin_embedding = builtin_embedding
|
|
|
|
async def map(self, user_name: str) -> str:
|
|
"""Map the user name to the model."""
|
|
dict_dict = {
|
|
"openai_api_key": self.openai_api_key,
|
|
"model": self.model,
|
|
"builtin_flow": self.builtin_flow,
|
|
"builtin_flow_node": self.builtin_flow_node,
|
|
"builtin_variable": self.builtin_variable,
|
|
"builtin_secret": self.builtin_secret,
|
|
"builtin_llm": self.builtin_llm,
|
|
"builtin_embedding": self.builtin_embedding,
|
|
}
|
|
json_data = json.dumps(dict_dict, ensure_ascii=False)
|
|
return "Your name is %s, and your model info is %s." % (user_name, json_data)
|
|
|
|
|
|
class ExampleFlowTagsOperator(MapOperator[str, str]):
|
|
"""An example flow operator that includes a tags option."""
|
|
|
|
metadata = ViewMetadata(
|
|
label="Example Tags Operator",
|
|
name="example_tags_operator",
|
|
category=OperatorCategory.EXAMPLE,
|
|
description="An example flow operator that includes a tags",
|
|
parameters=[],
|
|
inputs=[
|
|
IOField.build_from(
|
|
"User Name",
|
|
"user_name",
|
|
str,
|
|
description="The name of the user.",
|
|
),
|
|
],
|
|
outputs=[
|
|
IOField.build_from(
|
|
"Tags",
|
|
"tags",
|
|
str,
|
|
description="The tags to use.",
|
|
),
|
|
],
|
|
tags={"order": "higher-order", "type": "example"},
|
|
)
|
|
|
|
def __init__(self, **kwargs):
|
|
super().__init__(**kwargs)
|
|
|
|
async def map(self, user_name: str) -> str:
|
|
"""Map the user name to the tags."""
|
|
return "Your name is %s, and your tags are %s." % (user_name, "higher-order")
|
|
|
|
|
|
class ExampleFlowCodeEditorOperator(MapOperator[str, str]):
|
|
"""An example flow operator that includes a code editor as parameter."""
|
|
|
|
metadata = ViewMetadata(
|
|
label="Example Flow Code Editor",
|
|
name="example_flow_code_editor",
|
|
category=OperatorCategory.EXAMPLE,
|
|
description="An example flow operator that includes a code editor as parameter.",
|
|
parameters=[
|
|
Parameter.build_from(
|
|
"Code Editor",
|
|
"code",
|
|
type=str,
|
|
placeholder="Please input your code",
|
|
description="The code you want to edit.",
|
|
ui=ui.UICodeEditor(
|
|
language="python",
|
|
),
|
|
),
|
|
Parameter.build_from(
|
|
"Language",
|
|
"lang",
|
|
type=str,
|
|
optional=True,
|
|
default="python",
|
|
placeholder="Please select the language",
|
|
description="The language of the code.",
|
|
options=[
|
|
OptionValue(label="Python", name="python", value="python"),
|
|
OptionValue(
|
|
label="JavaScript", name="javascript", value="javascript"
|
|
),
|
|
],
|
|
ui=ui.UISelect(),
|
|
),
|
|
],
|
|
inputs=[
|
|
IOField.build_from(
|
|
"User Name",
|
|
"user_name",
|
|
str,
|
|
description="The name of the user.",
|
|
)
|
|
],
|
|
outputs=[
|
|
IOField.build_from(
|
|
"Code",
|
|
"code",
|
|
str,
|
|
description="Result of the code.",
|
|
)
|
|
],
|
|
)
|
|
|
|
def __init__(self, code: str, lang: str = "python", **kwargs):
|
|
super().__init__(**kwargs)
|
|
self.code = code
|
|
self.lang = lang
|
|
|
|
async def map(self, user_name: str) -> str:
|
|
"""Map the user name to the code."""
|
|
|
|
code = self.code
|
|
exit_code = -1
|
|
try:
|
|
exit_code, logs = await self.execute_code_blocks(code, self.lang)
|
|
except Exception as e:
|
|
logger.error(f"Failed to execute code: {e}")
|
|
logs = f"Failed to execute code: {e}"
|
|
return (
|
|
f"Your name is {user_name}, and your code is \n\n```python\n{code}"
|
|
f"\n\n```\n\nThe execution result is \n\n```\n{logs}\n\n```\n\n"
|
|
f"Exit code: {exit_code}."
|
|
)
|
|
|
|
async def execute_code_blocks(self, code_blocks: str, lang: str):
|
|
"""Execute the code blocks and return the result."""
|
|
from dbgpt.util.code.server import CodeResult, get_code_server
|
|
|
|
code_server = await get_code_server(self.system_app)
|
|
result: CodeResult = await code_server.exec(code_blocks, lang)
|
|
return result.exit_code, result.logs
|
|
|
|
|
|
class ExampleFlowDynamicParametersOperator(MapOperator[str, str]):
|
|
"""An example flow operator that includes dynamic parameters."""
|
|
|
|
metadata = ViewMetadata(
|
|
label="Example Dynamic Parameters Operator",
|
|
name="example_dynamic_parameters_operator",
|
|
category=OperatorCategory.EXAMPLE,
|
|
description="An example flow operator that includes dynamic parameters.",
|
|
parameters=[
|
|
Parameter.build_from(
|
|
"Dynamic String",
|
|
"dynamic_1",
|
|
type=str,
|
|
is_list=True,
|
|
placeholder="Please input the dynamic parameter",
|
|
description="The dynamic parameter you want to use, you can add more, "
|
|
"at least 1 parameter.",
|
|
dynamic=True,
|
|
dynamic_minimum=1,
|
|
ui=ui.UIInput(),
|
|
),
|
|
Parameter.build_from(
|
|
"Dynamic Integer",
|
|
"dynamic_2",
|
|
type=int,
|
|
is_list=True,
|
|
placeholder="Please input the dynamic parameter",
|
|
description="The dynamic parameter you want to use, you can add more, "
|
|
"at least 0 parameter.",
|
|
dynamic=True,
|
|
dynamic_minimum=0,
|
|
),
|
|
],
|
|
inputs=[
|
|
IOField.build_from(
|
|
"User Name",
|
|
"user_name",
|
|
str,
|
|
description="The name of the user.",
|
|
),
|
|
],
|
|
outputs=[
|
|
IOField.build_from(
|
|
"Dynamic",
|
|
"dynamic",
|
|
str,
|
|
description="User's selected dynamic.",
|
|
),
|
|
],
|
|
)
|
|
|
|
def __init__(self, dynamic_1: List[str], dynamic_2: List[int], **kwargs):
|
|
super().__init__(**kwargs)
|
|
if not dynamic_1:
|
|
raise ValueError("The dynamic string is empty.")
|
|
self.dynamic_1 = dynamic_1
|
|
self.dynamic_2 = dynamic_2
|
|
|
|
async def map(self, user_name: str) -> str:
|
|
"""Map the user name to the dynamic."""
|
|
return "Your name is %s, and your dynamic is %s." % (
|
|
user_name,
|
|
f"dynamic_1: {self.dynamic_1}, dynamic_2: {self.dynamic_2}",
|
|
)
|
|
|
|
|
|
class ExampleFlowDynamicOutputsOperator(MapOperator[str, str]):
|
|
"""An example flow operator that includes dynamic outputs."""
|
|
|
|
metadata = ViewMetadata(
|
|
label="Example Dynamic Outputs Operator",
|
|
name="example_dynamic_outputs_operator",
|
|
category=OperatorCategory.EXAMPLE,
|
|
description="An example flow operator that includes dynamic outputs.",
|
|
parameters=[],
|
|
inputs=[
|
|
IOField.build_from(
|
|
"User Name",
|
|
"user_name",
|
|
str,
|
|
description="The name of the user.",
|
|
),
|
|
],
|
|
outputs=[
|
|
IOField.build_from(
|
|
"Dynamic",
|
|
"dynamic",
|
|
str,
|
|
description="User's selected dynamic.",
|
|
dynamic=True,
|
|
dynamic_minimum=1,
|
|
),
|
|
],
|
|
)
|
|
|
|
async def map(self, user_name: str) -> str:
|
|
"""Map the user name to the dynamic."""
|
|
return "Your name is %s, this operator has dynamic outputs." % user_name
|
|
|
|
|
|
class ExampleFlowDynamicInputsOperator(JoinOperator[str]):
|
|
"""An example flow operator that includes dynamic inputs."""
|
|
|
|
metadata = ViewMetadata(
|
|
label="Example Dynamic Inputs Operator",
|
|
name="example_dynamic_inputs_operator",
|
|
category=OperatorCategory.EXAMPLE,
|
|
description="An example flow operator that includes dynamic inputs.",
|
|
parameters=[],
|
|
inputs=[
|
|
IOField.build_from(
|
|
"User Name",
|
|
"user_name",
|
|
str,
|
|
description="The name of the user.",
|
|
),
|
|
IOField.build_from(
|
|
"Other Inputs",
|
|
"other_inputs",
|
|
str,
|
|
description="Other inputs.",
|
|
dynamic=True,
|
|
dynamic_minimum=0,
|
|
),
|
|
],
|
|
outputs=[
|
|
IOField.build_from(
|
|
"Dynamic",
|
|
"dynamic",
|
|
str,
|
|
description="User's selected dynamic.",
|
|
),
|
|
],
|
|
)
|
|
|
|
def __init__(self, **kwargs):
|
|
super().__init__(combine_function=self.join, **kwargs)
|
|
|
|
async def join(self, user_name: str, *other_inputs: str) -> str:
|
|
"""Map the user name to the dynamic."""
|
|
if not other_inputs:
|
|
dyn_inputs = ["You have no other inputs."]
|
|
else:
|
|
dyn_inputs = [
|
|
f"Input {i}: {input_data}" for i, input_data in enumerate(other_inputs)
|
|
]
|
|
dyn_str = "\n".join(dyn_inputs)
|
|
return "Your name is %s, and your dynamic is %s." % (
|
|
user_name,
|
|
f"other_inputs:\n{dyn_str}",
|
|
)
|