feat(AWEL):Add operator for structured output data of datasource in AWEL (#2794)

This commit is contained in:
geebytes 2025-06-25 21:14:59 +08:00 committed by GitHub
parent 2ed145c3aa
commit 87ae219a79
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 64 additions and 0 deletions

View File

@ -396,3 +396,49 @@ class HODatasourceDashboardOperator(GPTVisMixin, MapOperator[dict, str]):
view = await vis.display(charts=chart_params) view = await vis.display(charts=chart_params)
await self.save_view_message(self.current_dag_context, view) await self.save_view_message(self.current_dag_context, view)
return view return view
class HODatasourceStructedOperator(MapOperator[dict, str]):
"""Execute the context from the datasource."""
metadata = ViewMetadata(
label=_("Datasource Structed Operator"),
name="higher_order_datasource_structed_operator",
description=_(
"Execute the context from the datasource and output structed data."
),
category=OperatorCategory.DATABASE,
parameters=[_PARAMETER_DATASOURCE.new()],
inputs=[_INPUTS_SQL_DICT_LIST.new()],
outputs=[IOField.build_from(_("Dictionary"), "dict", dict)],
tags={"order": TAGS_ORDER_HIGH},
)
def __init__(self, datasource: DBResource, **kwargs):
"""Initialize the operator."""
MapOperator.__init__(self, **kwargs)
self._datasource = datasource
async def map(self, sql_dict_list: List[dict]) -> dict:
"""Execute the context from the datasource."""
if not isinstance(sql_dict_list, list):
raise ValueError(
"The input value of datasource executor should be a list of "
"dictionaries."
)
chart_params = []
for chart_item in sql_dict_list:
chart_dict = {k: v for k, v in chart_item.items()}
sql = chart_item.get("sql")
try:
data_df = await self._datasource.query_to_df(sql)
# Here we use pandas.DataFrame.to_json()
# to avoid JSON serialization failures caused by data type issues
# (e.g., pandas.Timestamp) in subsequent operations
chart_dict["data"] = json.loads(data_df.to_json(orient="records"))
except Exception as e:
logger.warning(f"Sql execute failed{str(e)}")
chart_dict["err_msg"] = str(e)
chart_params.append(chart_dict)
return {"data": chart_params}

View File

@ -2841,6 +2841,24 @@
"sql_dict" "sql_dict"
], ],
"version": "v1" "version": "v1"
},
{
"type": "operator",
"type_cls": "dbgpt_app.operators.datasource.HODatasourceStructedOperator",
"type_name": "HODatasourceStructedOperator",
"name": "higher_order_datasource_structed_operator",
"id": "operator_higher_order_datasource_structed_operator___$$___database___$$___v1",
"category": "database",
"parameters": [
"datasource"
],
"outputs": [
"sql_result"
],
"inputs": [
"sql_dict_list"
],
"version": "v1"
}, },
{ {
"type": "operator", "type": "operator",