feat: Add Knowledge Process Workflow (#2210)

This commit is contained in:
Aries-ckt
2024-12-18 11:16:30 +08:00
committed by GitHub
parent 3745d7411d
commit b05febbf77
23 changed files with 7217 additions and 8 deletions

View File

@@ -1,5 +1,5 @@
"""Module for RAG operators."""
from .chunk_manager import ChunkManagerOperator # noqa: F401
from .datasource import DatasourceRetrieverOperator # noqa: F401
from .db_schema import ( # noqa: F401
DBSchemaAssemblerOperator,
@@ -10,21 +10,32 @@ from .embedding import ( # noqa: F401
EmbeddingRetrieverOperator,
)
from .evaluation import RetrieverEvaluatorOperator # noqa: F401
from .full_text import FullTextStorageOperator # noqa: F401
from .knowledge import ChunksToStringOperator, KnowledgeOperator # noqa: F401
from .knowledge_graph import KnowledgeGraphOperator # noqa: F401
from .process_branch import KnowledgeProcessBranchOperator # noqa: F401
from .process_branch import KnowledgeProcessJoinOperator
from .rerank import RerankOperator # noqa: F401
from .rewrite import QueryRewriteOperator # noqa: F401
from .summary import SummaryAssemblerOperator # noqa: F401
from .vector_store import VectorStorageOperator # noqa: F401
__all__ = [
"ChunkManagerOperator",
"DatasourceRetrieverOperator",
"DBSchemaRetrieverOperator",
"DBSchemaAssemblerOperator",
"EmbeddingRetrieverOperator",
"EmbeddingAssemblerOperator",
"FullTextStorageOperator",
"KnowledgeOperator",
"KnowledgeGraphOperator",
"KnowledgeProcessBranchOperator",
"KnowledgeProcessJoinOperator",
"ChunksToStringOperator",
"RerankOperator",
"QueryRewriteOperator",
"SummaryAssemblerOperator",
"RetrieverEvaluatorOperator",
"VectorStorageOperator",
]

View File

@@ -0,0 +1,68 @@
"""Chunk Manager Operator."""
from typing import List, Optional
from dbgpt.core import Chunk
from dbgpt.core.awel import MapOperator
from dbgpt.core.awel.flow import IOField, OperatorCategory, Parameter, ViewMetadata
from dbgpt.rag import ChunkParameters
from dbgpt.rag.chunk_manager import ChunkManager
from dbgpt.rag.knowledge.base import Knowledge
from dbgpt.util.i18n_utils import _
class ChunkManagerOperator(MapOperator[Knowledge, List[Chunk]]):
    """Chunk Manager Operator.

    Loads the documents of a :class:`Knowledge` instance and splits them into
    chunks according to the configured :class:`ChunkParameters`.
    """

    metadata = ViewMetadata(
        label=_("Chunk Manager Operator"),
        name="chunk_manager_operator",
        description=_("Split Knowledge Documents into chunks."),
        category=OperatorCategory.RAG,
        parameters=[
            Parameter.build_from(
                _("Chunk Split Parameters"),
                "chunk_parameters",
                ChunkParameters,
                description=_("Chunk Split Parameters."),
                optional=True,
                default=None,
                alias=["chunk_parameters"],
            ),
        ],
        inputs=[
            IOField.build_from(
                _("Knowledge"),
                "knowledge",
                Knowledge,
                description=_("The knowledge to be loaded."),
            )
        ],
        outputs=[
            IOField.build_from(
                _("Chunks"),
                "chunks",
                List[Chunk],
                description=_("The split chunks by chunk manager."),
                is_list=True,
            )
        ],
    )

    def __init__(
        self,
        chunk_parameters: Optional[ChunkParameters] = None,
        **kwargs,
    ):
        """Init the chunk manager operator.

        Args:
            chunk_parameters: Parameters controlling how documents are split.
                Defaults to the "Automatic" chunk strategy when not provided.
        """
        MapOperator.__init__(self, **kwargs)
        self._chunk_parameters = chunk_parameters or ChunkParameters(
            chunk_strategy="Automatic"
        )

    async def map(self, knowledge: Knowledge) -> List[Chunk]:
        """Load the knowledge documents and split them into chunks.

        Args:
            knowledge: The knowledge source whose documents will be split.

        Returns:
            The list of chunks produced by the chunk manager.
        """
        documents = knowledge.load()
        chunk_manager = ChunkManager(
            knowledge=knowledge, chunk_parameter=self._chunk_parameters
        )
        return chunk_manager.split(documents)

View File

@@ -0,0 +1,74 @@
"""Full Text Operator."""
import os
from typing import List, Optional
from dbgpt.core import Chunk
from dbgpt.core.awel import MapOperator
from dbgpt.core.awel.flow import IOField, OperatorCategory, Parameter, ViewMetadata
from dbgpt.storage.full_text.base import FullTextStoreBase
from dbgpt.util.i18n_utils import _
class FullTextStorageOperator(MapOperator[List[Chunk], List[Chunk]]):
    """Full Text Operator.

    Persists chunks into a full text store, batching loads to honor the
    configured (or environment-provided) maximum chunks per load.
    """

    metadata = ViewMetadata(
        label=_("Full Text Storage Operator"),
        # NOTE: fixed the operator name — it previously contained a space
        # ("full text_storage_operator"), unlike every other operator name.
        name="full_text_storage_operator",
        description=_("Persist embeddings into full text storage."),
        category=OperatorCategory.RAG,
        parameters=[
            Parameter.build_from(
                _("Full Text Connector"),
                "full_text_store",
                FullTextStoreBase,
                description=_("The full text store."),
                alias=["full_text_store"],
            ),
        ],
        inputs=[
            IOField.build_from(
                _("Chunks"),
                "chunks",
                List[Chunk],
                description=_("The text split chunks by chunk manager."),
                is_list=True,
            )
        ],
        outputs=[
            IOField.build_from(
                _("Chunks"),
                "chunks",
                List[Chunk],
                description=_(
                    "The assembled chunks, it has been persisted to full text " "store."
                ),
                is_list=True,
            )
        ],
    )

    def __init__(
        self,
        full_text_store: Optional[FullTextStoreBase] = None,
        max_chunks_once_load: Optional[int] = None,
        **kwargs,
    ):
        """Init the full text storage operator.

        Args:
            full_text_store: The full text store to persist chunks into.
            max_chunks_once_load: Max chunks loaded per batch; falls back to
                the ``KNOWLEDGE_MAX_CHUNKS_ONCE_LOAD`` env var (default 10).
        """
        MapOperator.__init__(self, **kwargs)
        self._full_text_store = full_text_store
        # Guard against the documented default of None: the original code
        # called full_text_store.get_config() unconditionally and crashed.
        self._embeddings = (
            full_text_store.get_config().embedding_fn if full_text_store else None
        )
        self._max_chunks_once_load = max_chunks_once_load
        # Public alias kept for backward compatibility with external callers.
        self.full_text_store = full_text_store

    async def map(self, chunks: List[Chunk]) -> List[Chunk]:
        """Persist chunks in full text db.

        Args:
            chunks: The chunks to persist.

        Returns:
            The same chunks, with ``chunk_id`` set to the store-assigned ids.
        """
        max_chunks_once_load = self._max_chunks_once_load or int(
            os.getenv("KNOWLEDGE_MAX_CHUNKS_ONCE_LOAD", 10)
        )
        full_text_ids = await self._full_text_store.aload_document_with_limit(
            chunks, max_chunks_once_load
        )
        for chunk, full_text_id in zip(chunks, full_text_ids):
            chunk.chunk_id = str(full_text_id)
        return chunks

View File

@@ -0,0 +1,74 @@
"""Knowledge Graph Operator."""
import os
from typing import List, Optional
from dbgpt.core import Chunk
from dbgpt.core.awel import MapOperator
from dbgpt.core.awel.flow import IOField, OperatorCategory, Parameter, ViewMetadata
from dbgpt.storage.knowledge_graph.base import KnowledgeGraphBase
from dbgpt.util.i18n_utils import _
class KnowledgeGraphOperator(MapOperator[List[Chunk], List[Chunk]]):
    """Knowledge Graph Operator.

    Extracts documents and persists them into a graph database, batching
    loads to honor the configured maximum chunks per load.
    """

    metadata = ViewMetadata(
        label=_("Knowledge Graph Operator"),
        name="knowledge_graph_operator",
        description=_("Extract Documents and persist into graph database."),
        category=OperatorCategory.RAG,
        parameters=[
            Parameter.build_from(
                _("Knowledge Graph Connector"),
                "graph_store",
                KnowledgeGraphBase,
                description=_("The knowledge graph."),
                alias=["graph_store"],
            ),
        ],
        inputs=[
            IOField.build_from(
                _("Chunks"),
                "chunks",
                List[Chunk],
                description=_("The text split chunks by chunk manager."),
                is_list=True,
            )
        ],
        outputs=[
            IOField.build_from(
                _("Chunks"),
                "chunks",
                List[Chunk],
                description=_(
                    "The assembled chunks, it has been persisted to graph store."
                ),
                is_list=True,
            )
        ],
    )

    def __init__(
        self,
        graph_store: Optional[KnowledgeGraphBase] = None,
        max_chunks_once_load: Optional[int] = None,
        **kwargs,
    ):
        """Init the Knowledge Graph operator.

        Args:
            graph_store: The knowledge graph store to persist chunks into.
            max_chunks_once_load: Max chunks loaded per batch; falls back to
                the ``KNOWLEDGE_MAX_CHUNKS_ONCE_LOAD`` env var (default 10).
        """
        MapOperator.__init__(self, **kwargs)
        self._graph_store = graph_store
        # Guard against the documented default of None: the original code
        # called graph_store.get_config() unconditionally and crashed.
        self._embeddings = (
            graph_store.get_config().embedding_fn if graph_store else None
        )
        self._max_chunks_once_load = max_chunks_once_load
        # Public alias kept for backward compatibility with external callers.
        self.graph_store = graph_store

    async def map(self, chunks: List[Chunk]) -> List[Chunk]:
        """Persist chunks in graph db.

        Args:
            chunks: The chunks to persist.

        Returns:
            The same chunks, with ``chunk_id`` set to the store-assigned ids.
        """
        max_chunks_once_load = self._max_chunks_once_load or int(
            os.getenv("KNOWLEDGE_MAX_CHUNKS_ONCE_LOAD", 10)
        )
        graph_ids = await self._graph_store.aload_document_with_limit(
            chunks, max_chunks_once_load
        )
        for chunk, graph_id in zip(chunks, graph_ids):
            chunk.chunk_id = str(graph_id)
        return chunks

View File

@@ -0,0 +1,193 @@
"""Knowledge Process Branch Operator."""
from typing import Dict, List, Optional
from dbgpt.core import Chunk
from dbgpt.core.awel import (
BranchFunc,
BranchOperator,
BranchTaskType,
JoinOperator,
logger,
)
from dbgpt.core.awel.flow import IOField, OperatorCategory, OperatorType, ViewMetadata
from dbgpt.rag.knowledge.base import Knowledge
from dbgpt.util.i18n_utils import _
class KnowledgeProcessBranchOperator(BranchOperator[Knowledge, Knowledge]):
    """Knowledge Process branch operator.

    Routes the workflow to the vector-storage, knowledge-graph, and/or
    full-text branches based on which downstream operators are present.
    """

    metadata = ViewMetadata(
        label=_("Knowledge Process Branch Operator"),
        name="knowledge_process_operator",
        category=OperatorCategory.RAG,
        operator_type=OperatorType.BRANCH,
        description=_("Branch the workflow based on the stream flag of the request."),
        parameters=[],
        inputs=[
            IOField.build_from(
                _("Document Chunks"),
                "input_value",
                List[Chunk],
                description=_("The input value of the operator."),
                is_list=True,
            ),
        ],
        outputs=[
            IOField.build_from(
                _("Chunks"),
                "chunks",
                List[Chunk],
                description=_("Chunks for Vector Storage Connector."),
                is_list=True,
            ),
            IOField.build_from(
                _("Chunks"),
                "chunks",
                List[Chunk],
                description=_("Chunks for Knowledge Graph Connector."),
                is_list=True,
            ),
            IOField.build_from(
                _("Chunks"),
                "chunks",
                List[Chunk],
                description=_("Chunks for Full Text Connector."),
                is_list=True,
            ),
        ],
    )

    def __init__(
        self,
        graph_task_name: Optional[str] = None,
        embedding_task_name: Optional[str] = None,
        full_text_task_name: Optional[str] = None,
        **kwargs,
    ):
        """Create the knowledge process branch operator.

        Args:
            graph_task_name: Task name of the knowledge graph branch.
            embedding_task_name: Task name of the embedding/vector branch.
            full_text_task_name: Task name of the full text branch. Falls back
                to ``embedding_task_name`` when not given, preserving the
                previous (buggy copy-paste) behavior for existing callers.
        """
        super().__init__(**kwargs)
        self._graph_task_name = graph_task_name
        self._embedding_task_name = embedding_task_name
        # BUG FIX: the full text branch previously always reused
        # embedding_task_name with no way to set its own task name.
        self._full_text_task_name = full_text_task_name or embedding_task_name

    async def branches(
        self,
    ) -> Dict[BranchFunc[Knowledge], BranchTaskType]:
        """Map each branch predicate to its downstream task name.

        A branch is taken when an operator of the matching class is present
        among this operator's downstream tasks.
        """
        download_cls_list = set(task.__class__ for task in self.downstream)  # noqa
        branch_func_map = {}

        async def check_graph_process(r: Knowledge) -> bool:
            # Run the knowledge graph branch only if a graph operator exists.
            from dbgpt.rag.operators import KnowledgeGraphOperator

            if KnowledgeGraphOperator in download_cls_list:
                return True
            return False

        async def check_embedding_process(r: Knowledge) -> bool:
            # Run the embedding branch only if a vector storage operator exists.
            from dbgpt.rag.operators import VectorStorageOperator

            if VectorStorageOperator in download_cls_list:
                return True
            return False

        async def check_full_text_process(r: Knowledge) -> bool:
            # Run the full text branch only if a full text operator exists.
            from dbgpt.rag.operators.full_text import FullTextStorageOperator

            if FullTextStorageOperator in download_cls_list:
                return True
            return False

        branch_func_map[check_graph_process] = self._graph_task_name
        branch_func_map[check_embedding_process] = self._embedding_task_name
        branch_func_map[check_full_text_process] = self._full_text_task_name
        return branch_func_map  # type: ignore
class KnowledgeProcessJoinOperator(JoinOperator[List[str]]):
    """Knowledge Process Join Operator.

    Joins the results of the vector, knowledge graph, and full text storage
    branches into a list of human-readable result messages.
    """

    metadata = ViewMetadata(
        label=_("Knowledge Process Join Operator"),
        name="knowledge_process_join_operator",
        category=OperatorCategory.RAG,
        operator_type=OperatorType.JOIN,
        description=_(
            "Join Branch the workflow based on the Knowledge Process Results."
        ),
        parameters=[],
        inputs=[
            IOField.build_from(
                _("Vector Storage Results"),
                "input_value",
                List[Chunk],
                description=_("vector storage results."),
                is_list=True,
            ),
            IOField.build_from(
                _("Knowledge Graph Storage Results"),
                "input_value",
                List[Chunk],
                description=_("knowledge graph storage results."),
                is_list=True,
            ),
            # Added: _join accepts a third (full text) result, but the
            # metadata previously declared only two inputs.
            IOField.build_from(
                _("Full Text Storage Results"),
                "input_value",
                List[Chunk],
                description=_("full text storage results."),
                is_list=True,
            ),
        ],
        outputs=[
            IOField.build_from(
                _("Chunks"),
                "chunks",
                List[Chunk],
                description=_("Knowledge Process Results."),
                is_list=True,
            ),
        ],
    )

    def __init__(
        self,
        **kwargs,
    ):
        """Knowledge Process Join Operator."""
        super().__init__(combine_function=self._join, **kwargs)

    async def _join(
        self,
        vector_chunks: Optional[List[Chunk]] = None,
        graph_chunks: Optional[List[Chunk]] = None,
        full_text_chunks: Optional[List[Chunk]] = None,
    ) -> List[str]:
        """Join branch results into summary messages.

        Args:
            vector_chunks: The list of vector chunks.
            graph_chunks: The list of graph chunks.
            full_text_chunks: The list of full text chunks.

        Returns:
            One summary message per branch that produced chunks.
        """
        results = []
        if vector_chunks:
            result_msg = (
                f"async persist vector store success {len(vector_chunks)} chunks."
            )
            logger.info(result_msg)
            results.append(result_msg)
        if graph_chunks:
            result_msg = (
                f"async persist graph store success {len(graph_chunks)} chunks."
            )
            logger.info(result_msg)
            results.append(result_msg)
        if full_text_chunks:
            result_msg = (
                f"async persist full text store success {len(full_text_chunks)} "
                f"chunks."
            )
            logger.info(result_msg)
            results.append(result_msg)
        return results

View File

@@ -0,0 +1,74 @@
"""Vector Storage Operator."""
import os
from typing import List, Optional
from dbgpt.core import Chunk
from dbgpt.core.awel import MapOperator
from dbgpt.core.awel.flow import IOField, OperatorCategory, Parameter, ViewMetadata
from dbgpt.storage.vector_store.base import VectorStoreBase
from dbgpt.util.i18n_utils import _
class VectorStorageOperator(MapOperator[List[Chunk], List[Chunk]]):
    """Vector Storage Operator.

    Persists chunk embeddings into a vector store, batching loads to honor
    the configured maximum chunks per load.
    """

    metadata = ViewMetadata(
        label=_("Vector Storage Operator"),
        name="vector_storage_operator",
        description=_("Persist embeddings into vector storage."),
        category=OperatorCategory.RAG,
        parameters=[
            Parameter.build_from(
                _("Vector Store Connector"),
                "vector_store",
                VectorStoreBase,
                description=_("The vector store."),
                alias=["vector_store"],
            ),
        ],
        inputs=[
            IOField.build_from(
                _("Chunks"),
                "chunks",
                List[Chunk],
                description=_("The text split chunks by chunk manager."),
                is_list=True,
            )
        ],
        outputs=[
            IOField.build_from(
                _("Chunks"),
                "chunks",
                List[Chunk],
                description=_(
                    "The assembled chunks, it has been persisted to vector " "store."
                ),
                is_list=True,
            )
        ],
    )

    def __init__(
        self,
        vector_store: Optional[VectorStoreBase] = None,
        max_chunks_once_load: Optional[int] = None,
        **kwargs,
    ):
        """Init the vector storage operator.

        Args:
            vector_store: The vector store to persist chunks into.
            max_chunks_once_load: Max chunks loaded per batch; falls back to
                the ``KNOWLEDGE_MAX_CHUNKS_ONCE_LOAD`` env var (default 10).
        """
        MapOperator.__init__(self, **kwargs)
        self._vector_store = vector_store
        # Guard against the documented default of None: the original code
        # called vector_store.get_config() unconditionally and crashed.
        self._embeddings = (
            vector_store.get_config().embedding_fn if vector_store else None
        )
        self._max_chunks_once_load = max_chunks_once_load
        # Public alias kept for backward compatibility with external callers.
        self.vector_store = vector_store

    async def map(self, chunks: List[Chunk]) -> List[Chunk]:
        """Persist chunks in vector db.

        Args:
            chunks: The chunks to persist.

        Returns:
            The same chunks, with ``chunk_id`` set to the store-assigned ids.
        """
        max_chunks_once_load = self._max_chunks_once_load or int(
            os.getenv("KNOWLEDGE_MAX_CHUNKS_ONCE_LOAD", 10)
        )
        vector_ids = await self._vector_store.aload_document_with_limit(
            chunks, max_chunks_once_load
        )
        for chunk, vector_id in zip(chunks, vector_ids):
            chunk.chunk_id = str(vector_id)
        return chunks