DB-GPT/dbgpt/rag/operators/chunk_manager.py
2024-12-18 11:16:30 +08:00

69 lines
2.1 KiB
Python

"""Chunk Manager Operator."""
from typing import List, Optional
from dbgpt.core import Chunk
from dbgpt.core.awel import MapOperator
from dbgpt.core.awel.flow import IOField, OperatorCategory, Parameter, ViewMetadata
from dbgpt.rag import ChunkParameters
from dbgpt.rag.chunk_manager import ChunkManager
from dbgpt.rag.knowledge.base import Knowledge
from dbgpt.util.i18n_utils import _
class ChunkManagerOperator(MapOperator[Knowledge, List[Chunk]]):
"""Chunk Manager Operator."""
metadata = ViewMetadata(
label=_("Chunk Manager Operator"),
name="chunk_manager_operator",
description=_(" Split Knowledge Documents into chunks."),
category=OperatorCategory.RAG,
parameters=[
Parameter.build_from(
_("Chunk Split Parameters"),
"chunk_parameters",
ChunkParameters,
description=_("Chunk Split Parameters."),
optional=True,
default=None,
alias=["chunk_parameters"],
),
],
inputs=[
IOField.build_from(
_("Knowledge"),
"knowledge",
Knowledge,
description=_("The knowledge to be loaded."),
)
],
outputs=[
IOField.build_from(
_("Chunks"),
"chunks",
List[Chunk],
description=_("The split chunks by chunk manager."),
is_list=True,
)
],
)
def __init__(
self,
chunk_parameters: Optional[ChunkParameters] = None,
**kwargs,
):
"""Init the datasource operator."""
MapOperator.__init__(self, **kwargs)
self._chunk_parameters = chunk_parameters or ChunkParameters(
chunk_strategy="Automatic"
)
async def map(self, knowledge: Knowledge) -> List[Chunk]:
"""Persist chunks in vector db."""
documents = knowledge.load()
chunk_manager = ChunkManager(
knowledge=knowledge, chunk_parameter=self._chunk_parameters
)
return chunk_manager.split(documents)