# DB-GPT/dbgpt/rag/operators/knowledge_graph.py
# 2024-12-18 11:16:30 +08:00
#
# 75 lines
# 2.4 KiB
# Python

"""Knowledge Graph Operator."""
import os
from typing import List, Optional
from dbgpt.core import Chunk
from dbgpt.core.awel import MapOperator
from dbgpt.core.awel.flow import IOField, OperatorCategory, Parameter, ViewMetadata
from dbgpt.storage.knowledge_graph.base import KnowledgeGraphBase
from dbgpt.util.i18n_utils import _
class KnowledgeGraphOperator(MapOperator[List[Chunk], List[Chunk]]):
    """Knowledge Graph Operator.

    A map operator that receives a list of chunks, loads them into the
    configured knowledge graph store, and returns the same chunk objects with
    their ``chunk_id`` overwritten by the ids reported by the store.
    """

    metadata = ViewMetadata(
        label=_("Knowledge Graph Operator"),
        name="knowledge_graph_operator",
        description=_("Extract Documents and persist into graph database."),
        category=OperatorCategory.RAG,
        parameters=[
            Parameter.build_from(
                _("Knowledge Graph Connector"),
                "graph_store",
                KnowledgeGraphBase,
                description=_("The knowledge graph."),
                alias=["graph_store"],
            ),
        ],
        inputs=[
            IOField.build_from(
                _("Chunks"),
                "chunks",
                List[Chunk],
                description=_("The text split chunks by chunk manager."),
                is_list=True,
            )
        ],
        outputs=[
            IOField.build_from(
                _("Chunks"),
                "chunks",
                List[Chunk],
                description=_(
                    "The assembled chunks, it has been persisted to graph store."
                ),
                is_list=True,
            )
        ],
    )

    def __init__(
        self,
        graph_store: Optional[KnowledgeGraphBase] = None,
        max_chunks_once_load: Optional[int] = None,
        **kwargs,
    ):
        """Init the Knowledge Graph operator.

        Args:
            graph_store: The knowledge graph store the chunks are loaded into.
                The parameter keeps its ``None`` default for signature
                compatibility, but a store is required.
            max_chunks_once_load: Limit passed to the store's
                ``aload_document_with_limit`` call. When falsy (``None`` or
                ``0``), ``map`` falls back to the
                ``KNOWLEDGE_MAX_CHUNKS_ONCE_LOAD`` environment variable, and
                to 10 if that is unset.
            **kwargs: Forwarded unchanged to ``MapOperator.__init__``.

        Raises:
            ValueError: If ``graph_store`` is ``None``.
        """
        # Fail fast with a clear message: the store is dereferenced right
        # below, so a None here would otherwise surface as an opaque
        # AttributeError on 'NoneType'.
        if graph_store is None:
            raise ValueError(
                "graph_store is required to build a KnowledgeGraphOperator"
            )
        MapOperator.__init__(self, **kwargs)
        self._graph_store = graph_store
        self._embeddings = graph_store.get_config().embedding_fn
        self._max_chunks_once_load = max_chunks_once_load
        # Public alias of the same store object, kept alongside the private
        # attribute for existing readers of ``operator.graph_store``.
        self.graph_store = graph_store

    async def map(self, chunks: List[Chunk]) -> List[Chunk]:
        """Persist chunks in graph db.

        Args:
            chunks: The chunks to load into the graph store.

        Returns:
            The same ``chunks`` list; each chunk paired with a returned id has
            its ``chunk_id`` set to ``str(id)``.
        """
        # os.getenv returns a string when the variable is set, so the default
        # is given as a string too and converted once.
        max_chunks_once_load = self._max_chunks_once_load or int(
            os.getenv("KNOWLEDGE_MAX_CHUNKS_ONCE_LOAD", "10")
        )
        graph_ids = await self._graph_store.aload_document_with_limit(
            chunks, max_chunks_once_load
        )
        # NOTE(review): assumes the store returns one id per chunk, in input
        # order; zip() silently stops at the shorter sequence otherwise.
        for chunk, graph_id in zip(chunks, graph_ids):
            chunk.chunk_id = str(graph_id)
        return chunks