mirror of
https://github.com/csunny/DB-GPT.git
synced 2025-08-03 01:12:15 +00:00
75 lines
2.5 KiB
Python
75 lines
2.5 KiB
Python
"""Full Text Operator."""
|
|
import os
|
|
from typing import List, Optional
|
|
|
|
from dbgpt.core import Chunk
|
|
from dbgpt.core.awel import MapOperator
|
|
from dbgpt.core.awel.flow import IOField, OperatorCategory, Parameter, ViewMetadata
|
|
from dbgpt.storage.full_text.base import FullTextStoreBase
|
|
from dbgpt.util.i18n_utils import _
|
|
|
|
|
|
class FullTextStorageOperator(MapOperator[List[Chunk], List[Chunk]]):
    """Full Text Operator.

    AWEL map operator that writes a list of chunks to a full text store and
    returns the same chunks with ``chunk_id`` overwritten by the ids the store
    returns.
    """

    metadata = ViewMetadata(
        label=_("Full Text Storage Operator"),
        # NOTE(review): the registered name contains a space
        # ("full text_storage_operator"). It is kept as-is because saved flows
        # may reference the operator by this exact name; renaming it would
        # need an alias/migration.
        name="full text_storage_operator",
        # NOTE(review): this operator persists chunks, not embeddings; the text
        # is left unchanged so existing i18n translations keep matching.
        description=_("Persist embeddings into full text storage."),
        category=OperatorCategory.RAG,
        parameters=[
            Parameter.build_from(
                _("Full Text Connector"),
                "full_text_store",
                FullTextStoreBase,
                description=_("The full text store."),
                alias=["full_text_store"],
            ),
        ],
        inputs=[
            IOField.build_from(
                _("Chunks"),
                "chunks",
                List[Chunk],
                description=_("The text split chunks by chunk manager."),
                is_list=True,
            )
        ],
        outputs=[
            IOField.build_from(
                _("Chunks"),
                "chunks",
                List[Chunk],
                description=_(
                    "The assembled chunks, it has been persisted to full text "
                    "store."
                ),
                is_list=True,
            )
        ],
    )

    def __init__(
        self,
        full_text_store: Optional[FullTextStoreBase] = None,
        max_chunks_once_load: Optional[int] = None,
        **kwargs,
    ):
        """Init the full text storage operator.

        Args:
            full_text_store: Store the chunks are written to. The parameter is
                declared optional; when it is ``None`` the operator can still
                be constructed, but :meth:`map` raises ``ValueError``.
            max_chunks_once_load: Batch size passed to
                ``aload_document_with_limit``. When falsy, the
                ``KNOWLEDGE_MAX_CHUNKS_ONCE_LOAD`` environment variable
                (default 10) is read at call time instead.
            **kwargs: Forwarded to ``MapOperator.__init__``.
        """
        MapOperator.__init__(self, **kwargs)
        self._full_text_store = full_text_store
        # The signature allows None, so only dereference the store's config
        # when a store was actually supplied.
        self._embeddings = (
            full_text_store.get_config().embedding_fn
            if full_text_store is not None
            else None
        )
        self._max_chunks_once_load = max_chunks_once_load
        self.full_text_store = full_text_store

    async def map(self, chunks: List[Chunk]) -> List[Chunk]:
        """Persist chunks in full text db.

        Args:
            chunks: Chunks to persist. They are mutated in place: each chunk
                paired with a returned id gets ``chunk_id = str(id)``.

        Returns:
            The same ``chunks`` list object that was passed in.

        Raises:
            ValueError: If the operator was created without a full text store.
        """
        if self._full_text_store is None:
            raise ValueError(
                "FullTextStorageOperator requires a full_text_store, but none "
                "was provided."
            )
        max_chunks_once_load = self._max_chunks_once_load or int(
            os.getenv("KNOWLEDGE_MAX_CHUNKS_ONCE_LOAD", 10)
        )
        full_text_ids = await self._full_text_store.aload_document_with_limit(
            chunks, max_chunks_once_load
        )
        # zip() stops at the shorter sequence: if the store returns fewer ids
        # than chunks, the trailing chunks keep their previous chunk_id.
        for chunk, full_text_id in zip(chunks, full_text_ids):
            chunk.chunk_id = str(full_text_id)
        return chunks
|