DB-GPT/dbgpt/rag/operators/full_text.py
2024-12-18 11:16:30 +08:00

75 lines
2.5 KiB
Python

"""Full Text Operator."""
import os
from typing import List, Optional
from dbgpt.core import Chunk
from dbgpt.core.awel import MapOperator
from dbgpt.core.awel.flow import IOField, OperatorCategory, Parameter, ViewMetadata
from dbgpt.storage.full_text.base import FullTextStoreBase
from dbgpt.util.i18n_utils import _
class FullTextStorageOperator(MapOperator[List[Chunk], List[Chunk]]):
    """AWEL map operator that persists chunks into a full text store.

    The operator receives a list of chunks, loads them into the configured
    full text store and returns the same chunk objects with ``chunk_id``
    overwritten by the identifiers the store handed back.
    """

    # NOTE(review): ``name`` below contains a space ("full text_storage_...").
    # It looks like a typo, but it is a registered identifier, so it is kept
    # byte-for-byte here; confirm with flow owners before renaming.
    metadata = ViewMetadata(
        label=_("Full Text Storage Operator"),
        name="full text_storage_operator",
        description=_("Persist embeddings into full text storage."),
        category=OperatorCategory.RAG,
        parameters=[
            Parameter.build_from(
                _("Full Text Connector"),
                "full_text_store",
                FullTextStoreBase,
                description=_("The full text store."),
                alias=["full_text_store"],
            ),
        ],
        inputs=[
            IOField.build_from(
                _("Chunks"),
                "chunks",
                List[Chunk],
                description=_("The text split chunks by chunk manager."),
                is_list=True,
            )
        ],
        outputs=[
            IOField.build_from(
                _("Chunks"),
                "chunks",
                List[Chunk],
                description=_(
                    "The assembled chunks, it has been persisted to full text " "store."
                ),
                is_list=True,
            )
        ],
    )

    def __init__(
        self,
        full_text_store: Optional[FullTextStoreBase] = None,
        max_chunks_once_load: Optional[int] = None,
        **kwargs,
    ):
        """Init the full text storage operator.

        Args:
            full_text_store: Store the chunks are persisted into. May be
                omitted at construction time, but ``map`` requires it.
            max_chunks_once_load: Limit forwarded to the store's
                ``aload_document_with_limit``. When falsy, the value of the
                ``KNOWLEDGE_MAX_CHUNKS_ONCE_LOAD`` environment variable
                (default 10) is used instead.
            **kwargs: Forwarded unchanged to ``MapOperator.__init__``.
        """
        MapOperator.__init__(self, **kwargs)
        self._full_text_store = full_text_store
        # The signature allows ``None``; only read the store config when a
        # store was actually supplied, otherwise construction would fail with
        # an AttributeError on NoneType.
        self._embeddings = (
            full_text_store.get_config().embedding_fn
            if full_text_store is not None
            else None
        )
        self._max_chunks_once_load = max_chunks_once_load
        # Public alias of the same store object.
        self.full_text_store = full_text_store

    async def map(self, chunks: List[Chunk]) -> List[Chunk]:
        """Persist chunks in full text db.

        Args:
            chunks: Chunks to load into the full text store.

        Returns:
            The same ``chunks`` list; each chunk paired with a returned id has
            its ``chunk_id`` replaced by ``str(id)``.

        Raises:
            ValueError: If the operator was built without a full text store.
        """
        if self._full_text_store is None:
            raise ValueError(
                "full_text_store is required to persist chunks in "
                "FullTextStorageOperator."
            )
        # An explicit (truthy) limit wins; otherwise fall back to the env var.
        max_chunks_once_load = self._max_chunks_once_load or int(
            os.getenv("KNOWLEDGE_MAX_CHUNKS_ONCE_LOAD", "10")
        )
        full_text_ids = await self._full_text_store.aload_document_with_limit(
            chunks, max_chunks_once_load
        )
        # Ids are assigned positionally; zip stops at the shorter sequence, so
        # chunks without a matching id keep their previous chunk_id.
        # NOTE(review): presumably the store returns ids in chunk order —
        # confirm against the FullTextStoreBase implementation.
        for chunk, full_text_id in zip(chunks, full_text_ids):
            chunk.chunk_id = str(full_text_id)
        return chunks