diff --git a/packages/dbgpt-app/src/dbgpt_app/operators/datasource.py b/packages/dbgpt-app/src/dbgpt_app/operators/datasource.py index 0e3e856ab..702c981e7 100644 --- a/packages/dbgpt-app/src/dbgpt_app/operators/datasource.py +++ b/packages/dbgpt-app/src/dbgpt_app/operators/datasource.py @@ -396,3 +396,49 @@ class HODatasourceDashboardOperator(GPTVisMixin, MapOperator[dict, str]): view = await vis.display(charts=chart_params) await self.save_view_message(self.current_dag_context, view) return view + + +class HODatasourceStructedOperator(MapOperator[dict, str]): + """Execute the context from the datasource.""" + + metadata = ViewMetadata( + label=_("Datasource Structed Operator"), + name="higher_order_datasource_structed_operator", + description=_( + "Execute the context from the datasource and output structed data." + ), + category=OperatorCategory.DATABASE, + parameters=[_PARAMETER_DATASOURCE.new()], + inputs=[_INPUTS_SQL_DICT_LIST.new()], + outputs=[IOField.build_from(_("Dictionary"), "dict", dict)], + tags={"order": TAGS_ORDER_HIGH}, + ) + + def __init__(self, datasource: DBResource, **kwargs): + """Initialize the operator.""" + MapOperator.__init__(self, **kwargs) + self._datasource = datasource + + async def map(self, sql_dict_list: List[dict]) -> dict: + """Execute the context from the datasource.""" + + if not isinstance(sql_dict_list, list): + raise ValueError( + "The input value of datasource executor should be a list of " + "dictionaries." + ) + chart_params = [] + for chart_item in sql_dict_list: + chart_dict = {k: v for k, v in chart_item.items()} + sql = chart_item.get("sql") + try: + data_df = await self._datasource.query_to_df(sql) + # Here we use pandas.DataFrame.to_json() + # to avoid JSON serialization failures caused by data type issues + # (e.g., pandas.Timestamp) in subsequent operations + chart_dict["data"] = json.loads(data_df.to_json(orient="records")) + except Exception as e: + logger.warning(f"Sql execute failed!{str(e)}") + chart_dict["err_msg"] = str(e) + chart_params.append(chart_dict) + return {"data": chart_params} diff --git a/packages/dbgpt-serve/src/dbgpt_serve/flow/compat/0.7.0_compat_flow.json b/packages/dbgpt-serve/src/dbgpt_serve/flow/compat/0.7.0_compat_flow.json index 559d6f036..7228bbed1 100644 --- a/packages/dbgpt-serve/src/dbgpt_serve/flow/compat/0.7.0_compat_flow.json +++ b/packages/dbgpt-serve/src/dbgpt_serve/flow/compat/0.7.0_compat_flow.json @@ -2841,6 +2841,24 @@ "sql_dict" ], "version": "v1" + }, + { + "type": "operator", + "type_cls": "dbgpt_app.operators.datasource.HODatasourceStructedOperator", + "type_name": "HODatasourceStructedOperator", + "name": "higher_order_datasource_structed_operator", + "id": "operator_higher_order_datasource_structed_operator___$$___database___$$___v1", + "category": "database", + "parameters": [ + "datasource" + ], + "outputs": [ + "sql_result" + ], + "inputs": [ + "sql_dict_list" + ], + "version": "v1" }, { "type": "operator",