import asyncio
import json
import logging
import os
import shutil
import tempfile
from datetime import datetime
from enum import Enum
from typing import List, Optional, cast

from fastapi import HTTPException

from dbgpt._private.config import Config
from dbgpt.app.knowledge.chunk_db import DocumentChunkDao, DocumentChunkEntity
from dbgpt.app.knowledge.document_db import (
    KnowledgeDocumentDao,
    KnowledgeDocumentEntity,
)
from dbgpt.app.knowledge.request.request import BusinessFieldType, KnowledgeSpaceRequest
from dbgpt.component import ComponentType, SystemApp
from dbgpt.configs import TAG_KEY_KNOWLEDGE_FACTORY_DOMAIN_TYPE
from dbgpt.configs.model_config import (
    EMBEDDING_MODEL_CONFIG,
    KNOWLEDGE_UPLOAD_ROOT_PATH,
)
from dbgpt.core import LLMClient
from dbgpt.model import DefaultLLMClient
from dbgpt.model.cluster import WorkerManagerFactory
from dbgpt.rag.assembler import EmbeddingAssembler
from dbgpt.rag.chunk_manager import ChunkParameters
from dbgpt.rag.embedding import EmbeddingFactory
from dbgpt.rag.knowledge import ChunkStrategy, KnowledgeFactory, KnowledgeType
from dbgpt.serve.core import BaseService
from dbgpt.serve.rag.connector import VectorStoreConnector
from dbgpt.storage.metadata import BaseDao
from dbgpt.storage.metadata._base_dao import QUERY_SPEC
from dbgpt.storage.vector_store.base import VectorStoreConfig
from dbgpt.util.pagination_utils import PaginationResult
from dbgpt.util.string_utils import remove_trailing_punctuation
from dbgpt.util.tracer import root_tracer, trace

from ..api.schemas import (
    ChunkServeRequest,
    DocumentServeRequest,
    DocumentServeResponse,
    DocumentVO,
    KnowledgeSyncRequest,
    SpaceServeRequest,
    SpaceServeResponse,
)
from ..config import SERVE_CONFIG_KEY_PREFIX, SERVE_SERVICE_COMPONENT_NAME, ServeConfig
from ..models.models import KnowledgeSpaceDao, KnowledgeSpaceEntity

logger = logging.getLogger(__name__)

CFG = Config()

class SyncStatus(Enum):
    TODO = "TODO"
    FAILED = "FAILED"
    RUNNING = "RUNNING"
    FINISHED = "FINISHED"
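
# Sync lifecycle (descriptive note): a document starts as TODO, is marked
# RUNNING when embedding begins, and ends as FINISHED or FAILED; the sync
# methods below refuse to re-sync documents that are RUNNING or FINISHED.
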
class Service(BaseService[KnowledgeSpaceEntity, SpaceServeRequest, SpaceServeResponse]):
    """The service class for knowledge spaces."""

    name = SERVE_SERVICE_COMPONENT_NAME

    def __init__(
        self,
        system_app: SystemApp,
        dao: Optional[KnowledgeSpaceDao] = None,
        document_dao: Optional[KnowledgeDocumentDao] = None,
        chunk_dao: Optional[DocumentChunkDao] = None,
    ):
        self._system_app = system_app
        self._dao: KnowledgeSpaceDao = dao
        self._document_dao: KnowledgeDocumentDao = document_dao
        self._chunk_dao: DocumentChunkDao = chunk_dao

        super().__init__(system_app)

    def init_app(self, system_app: SystemApp) -> None:
        """Initialize the service

        Args:
            system_app (SystemApp): The system app
        """
        super().init_app(system_app)
        self._serve_config = ServeConfig.from_app_config(
            system_app.config, SERVE_CONFIG_KEY_PREFIX
        )
        self._dao = self._dao or KnowledgeSpaceDao()
        self._document_dao = self._document_dao or KnowledgeDocumentDao()
        self._chunk_dao = self._chunk_dao or DocumentChunkDao()
        self._system_app = system_app

    @property
    def dao(
        self,
    ) -> BaseDao[KnowledgeSpaceEntity, SpaceServeRequest, SpaceServeResponse]:
        """Returns the internal DAO."""
        return self._dao

    @property
    def config(self) -> ServeConfig:
        """Returns the internal ServeConfig."""
        return self._serve_config

    @property
    def llm_client(self) -> LLMClient:
        worker_manager = self._system_app.get_component(
            ComponentType.WORKER_MANAGER_FACTORY, WorkerManagerFactory
        ).create()
        return DefaultLLMClient(worker_manager, True)

    def create_space(self, request: SpaceServeRequest) -> SpaceServeResponse:
        """Create a new Space entity

        Args:
            request (SpaceServeRequest): The request

        Returns:
            SpaceServeResponse: The response
        """
        query = {"name": request.name}
        space = self.get(query)
        if space is not None:
            raise HTTPException(
                status_code=400,
                detail=f"space name: {request.name} already exists",
            )
        return self._dao.create_knowledge_space(request)
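
    # Example (hedged sketch): creating a space through this service; the
    # request fields shown are illustrative, and the service instance is
    # assumed to be initialized via the component system:
    #
    #   space = service.create_space(SpaceServeRequest(name="my_space"))
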
    def update_space(self, request: SpaceServeRequest) -> SpaceServeResponse:
        """Update a Space entity

        Args:
            request (SpaceServeRequest): The request

        Returns:
            SpaceServeResponse: The response
        """
        spaces = self._dao.get_knowledge_space(
            KnowledgeSpaceEntity(id=request.id, name=request.name)
        )
        if len(spaces) == 0:
            raise HTTPException(
                status_code=400,
                detail=f"no space named {request.name}",
            )
        update_obj = self._dao.update_knowledge_space(self._dao.from_request(request))
        return update_obj

    async def create_document(
        self, request: DocumentServeRequest
    ) -> SpaceServeResponse:
        """Create a new document entity

        Args:
            request (DocumentServeRequest): The request

        Returns:
            The id of the created document
        """
        space = self.get({"id": request.space_id})
        if space is None:
            raise Exception(f"space id: {request.space_id} not found")
        query = KnowledgeDocumentEntity(doc_name=request.doc_name, space=space.name)
        documents = self._document_dao.get_knowledge_documents(query)
        if len(documents) > 0:
            raise Exception(f"document name: {request.doc_name} already exists")
        if request.doc_file and request.doc_type == KnowledgeType.DOCUMENT.name:
            doc_file = request.doc_file
            if not os.path.exists(os.path.join(KNOWLEDGE_UPLOAD_ROOT_PATH, space.name)):
                os.makedirs(os.path.join(KNOWLEDGE_UPLOAD_ROOT_PATH, space.name))
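            # Write the upload to a temporary file in the target directory,
            # then move it into place, so a reader never observes a partially
            # written document.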
            tmp_fd, tmp_path = tempfile.mkstemp(
                dir=os.path.join(KNOWLEDGE_UPLOAD_ROOT_PATH, space.name)
            )
            with os.fdopen(tmp_fd, "wb") as tmp:
                tmp.write(await request.doc_file.read())
            shutil.move(
                tmp_path,
                os.path.join(KNOWLEDGE_UPLOAD_ROOT_PATH, space.name, doc_file.filename),
            )
            request.content = os.path.join(
                KNOWLEDGE_UPLOAD_ROOT_PATH, space.name, doc_file.filename
            )
        document = KnowledgeDocumentEntity(
            doc_name=request.doc_name,
            doc_type=request.doc_type,
            space=space.name,
            chunk_size=0,
            status=SyncStatus.TODO.name,
            last_sync=datetime.now(),
            content=request.content,
            result="",
        )
        doc_id = self._document_dao.create_knowledge_document(document)
        if doc_id is None:
            raise Exception(f"create document failed, {request.doc_name}")
        return doc_id

    async def sync_document(self, requests: List[KnowledgeSyncRequest]) -> List:
        """Sync documents into the vector store

        Args:
            requests (List[KnowledgeSyncRequest]): The sync requests

        Returns:
            List: The ids of the synced documents
        """
        doc_ids = []
        for sync_request in requests:
            space_id = sync_request.space_id
            docs = self._document_dao.documents_by_ids([sync_request.doc_id])
            if len(docs) == 0:
                raise Exception(f"no document found, doc_id: {sync_request.doc_id}")
            doc = docs[0]
            if (
                doc.status == SyncStatus.RUNNING.name
                or doc.status == SyncStatus.FINISHED.name
            ):
                raise Exception(
                    f"doc: {doc.doc_name} status is {doc.status}, can not sync"
                )
            chunk_parameters = sync_request.chunk_parameters
            if chunk_parameters.chunk_strategy != ChunkStrategy.CHUNK_BY_SIZE.name:
                space_context = self.get_space_context(space_id)
                chunk_parameters.chunk_size = (
                    CFG.KNOWLEDGE_CHUNK_SIZE
                    if space_context is None
                    else int(space_context["embedding"]["chunk_size"])
                )
                chunk_parameters.chunk_overlap = (
                    CFG.KNOWLEDGE_CHUNK_OVERLAP
                    if space_context is None
                    else int(space_context["embedding"]["chunk_overlap"])
                )
            await self._sync_knowledge_document(space_id, doc, chunk_parameters)
            doc_ids.append(doc.id)
        return doc_ids
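
    # Example (hedged sketch): syncing one document with size-based chunking;
    # field values are illustrative:
    #
    #   request = KnowledgeSyncRequest(
    #       space_id=space.id,
    #       doc_id=doc_id,
    #       chunk_parameters=ChunkParameters(
    #           chunk_strategy=ChunkStrategy.CHUNK_BY_SIZE.name
    #       ),
    #   )
    #   doc_ids = await service.sync_document([request])
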
    def get(self, request: QUERY_SPEC) -> Optional[SpaceServeResponse]:
        """Get a Space entity

        Args:
            request (QUERY_SPEC): The query request

        Returns:
            SpaceServeResponse: The response
        """
        # TODO: implement your own logic here
        # Build the query request from the request
        query_request = request
        return self._dao.get_one(query_request)

    def get_document(self, request: QUERY_SPEC) -> Optional[SpaceServeResponse]:
        """Get a document entity

        Args:
            request (QUERY_SPEC): The query request

        Returns:
            SpaceServeResponse: The response
        """
        # TODO: implement your own logic here
        # Build the query request from the request
        query_request = request
        return self._document_dao.get_one(query_request)

    def delete(self, space_id: str) -> Optional[SpaceServeResponse]:
        """Delete a Space entity

        Args:
            space_id (str): The space id

        Returns:
            SpaceServeResponse: The data after deletion
        """
        # TODO: implement your own logic here
        # Build the query request from the request
        query_request = {"id": space_id}
        space = self.get(query_request)
        if space is None:
            raise HTTPException(status_code=400, detail=f"Space {space_id} not found")
        config = VectorStoreConfig(
            name=space.name, llm_client=self.llm_client, model_name=None
        )
        vector_store_connector = VectorStoreConnector(
            vector_store_type=space.vector_type, vector_store_config=config
        )
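        # Deletion order (descriptive note): vector data is removed first,
        # then chunk rows, then document rows, and finally the space record
        # itself.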
        # delete vectors
        vector_store_connector.delete_vector_name(space.name)
        document_query = KnowledgeDocumentEntity(space=space.name)
        # delete chunks
        documents = self._document_dao.get_documents(document_query)
        for document in documents:
            self._chunk_dao.raw_delete(document.id)
        # delete documents
        self._document_dao.raw_delete(document_query)
        # delete space
        self._dao.delete(query_request)
        return space

    def update_document(self, request: DocumentServeRequest):
        """Update a knowledge document

        Args:
            request (DocumentServeRequest): The request
        """
        if not request.id:
            raise Exception("doc_id is required")
        document = self._document_dao.get_one({"id": request.id})
        entity = self._document_dao.from_response(document)
        if request.doc_name:
            entity.doc_name = request.doc_name
        if len(request.questions) == 0:
            request.questions = []
        questions = [
            remove_trailing_punctuation(question) for question in request.questions
        ]
        entity.questions = json.dumps(questions, ensure_ascii=False)
        self._document_dao.update_knowledge_document(entity)

    def delete_document(self, document_id: str) -> Optional[DocumentServeResponse]:
        """Delete a document entity

        Args:
            document_id (str): The document id

        Returns:
            DocumentServeResponse: The data after deletion
        """
        query_request = {"id": document_id}
        document = self._document_dao.get_one(query_request)
        if document is None:
            raise Exception(f"document {document_id} not found")

        # get space by name
        spaces = self._dao.get_knowledge_space(
            KnowledgeSpaceEntity(name=document.space)
        )
        if len(spaces) != 1:
            raise Exception(f"invalid space name: {document.space}")
        space = spaces[0]

        vector_ids = document.vector_ids
        if vector_ids is not None:
            config = VectorStoreConfig(
                name=space.name, llm_client=self.llm_client, model_name=None
            )
            vector_store_connector = VectorStoreConnector(
                vector_store_type=space.vector_type, vector_store_config=config
            )
            # delete vector by ids
            vector_store_connector.delete_by_ids(vector_ids)
        # delete chunks
        self._chunk_dao.raw_delete(document.id)
        # delete document
        self._document_dao.raw_delete(document)
        return document

    def get_list(self, request: SpaceServeRequest) -> List[SpaceServeResponse]:
        """Get a list of Space entities

        Args:
            request (SpaceServeRequest): The request

        Returns:
            List[SpaceServeResponse]: The response
        """
        # TODO: implement your own logic here
        # Build the query request from the request
        query_request = request
        return self.dao.get_list(query_request)

    def get_list_by_page(
        self, request: QUERY_SPEC, page: int, page_size: int
    ) -> PaginationResult[SpaceServeResponse]:
        """Get a list of Space entities by page

        Args:
            request (QUERY_SPEC): The query request
            page (int): The page number
            page_size (int): The page size

        Returns:
            PaginationResult[SpaceServeResponse]: The paginated response
        """
        return self.dao.get_list_page(request, page, page_size)

    def get_document_list(
        self, request: QUERY_SPEC, page: int, page_size: int
    ) -> PaginationResult[DocumentServeResponse]:
        """Get a list of document entities by page

        Args:
            request (QUERY_SPEC): The query request
            page (int): The page number
            page_size (int): The page size

        Returns:
            PaginationResult[DocumentServeResponse]: The paginated response
        """
        return self._document_dao.get_list_page(request, page, page_size)

    def get_chunk_list(self, request: QUERY_SPEC, page: int, page_size: int):
        """Get document chunks by page

        Args:
            request (QUERY_SPEC): The query request
            page (int): The page number
            page_size (int): The page size
        """
        return self._chunk_dao.get_list_page(request, page, page_size)
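
    # Example (hedged sketch): fetching the first page of chunks for one
    # document; the filter field name is illustrative:
    #
    #   chunks = service.get_chunk_list({"document_id": doc_id}, 1, 20)
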
    def update_chunk(self, request: ChunkServeRequest):
        """Update a knowledge document chunk"""
        if not request.id:
            raise Exception("chunk_id is required")
        chunk = self._chunk_dao.get_one({"id": request.id})
        entity = self._chunk_dao.from_response(chunk)
        if request.content:
            entity.content = request.content
        if request.questions:
            questions = [
                remove_trailing_punctuation(question) for question in request.questions
            ]
            entity.questions = json.dumps(questions, ensure_ascii=False)
        self._chunk_dao.update_chunk(entity)

    async def _batch_document_sync(
        self, space_id, sync_requests: List[KnowledgeSyncRequest]
    ) -> List[int]:
        """Batch sync knowledge document chunks into the vector store

        Args:
            - space_id: Knowledge Space id
            - sync_requests: List[KnowledgeSyncRequest]
        Returns:
            - List[int]: document ids
        """
        doc_ids = []
        for sync_request in sync_requests:
            docs = self._document_dao.documents_by_ids([sync_request.doc_id])
            if len(docs) == 0:
                raise Exception(f"no document found, doc_id: {sync_request.doc_id}")
            doc = docs[0]
            if (
                doc.status == SyncStatus.RUNNING.name
                or doc.status == SyncStatus.FINISHED.name
            ):
                raise Exception(
                    f"doc: {doc.doc_name} status is {doc.status}, can not sync"
                )
            chunk_parameters = sync_request.chunk_parameters
            if chunk_parameters.chunk_strategy != ChunkStrategy.CHUNK_BY_SIZE.name:
                space_context = self.get_space_context(space_id)
                chunk_parameters.chunk_size = (
                    CFG.KNOWLEDGE_CHUNK_SIZE
                    if space_context is None
                    else int(space_context["embedding"]["chunk_size"])
                )
                chunk_parameters.chunk_overlap = (
                    CFG.KNOWLEDGE_CHUNK_OVERLAP
                    if space_context is None
                    else int(space_context["embedding"]["chunk_overlap"])
                )
            await self._sync_knowledge_document(space_id, doc, chunk_parameters)
            doc_ids.append(doc.id)
        return doc_ids

    async def _sync_knowledge_document(
        self,
        space_id,
        doc: KnowledgeDocumentEntity,
        chunk_parameters: ChunkParameters,
    ) -> None:
        """Sync knowledge document chunks into the vector store"""
        embedding_factory = CFG.SYSTEM_APP.get_component(
            "embedding_factory", EmbeddingFactory
        )
        embedding_fn = embedding_factory.create(
            model_name=EMBEDDING_MODEL_CONFIG[CFG.EMBEDDING_MODEL]
        )

        space = self.get({"id": space_id})
        config = VectorStoreConfig(
            name=space.name,
            embedding_fn=embedding_fn,
            max_chunks_once_load=CFG.KNOWLEDGE_MAX_CHUNKS_ONCE_LOAD,
            llm_client=self.llm_client,
            model_name=None,
        )
        vector_store_connector = VectorStoreConnector(
            vector_store_type=space.vector_type, vector_store_config=config
        )
        knowledge = None
        if not space.domain_type or (
            space.domain_type.lower() == BusinessFieldType.NORMAL.value.lower()
        ):
            knowledge = KnowledgeFactory.create(
                datasource=doc.content,
                knowledge_type=KnowledgeType.get_by_value(doc.doc_type),
            )
        doc.status = SyncStatus.RUNNING.name
        doc.gmt_modified = datetime.now()
        self._document_dao.update_knowledge_document(doc)
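        # Fire-and-forget (descriptive note): the embedding task is not
        # awaited, so this coroutine returns while async_doc_embedding runs in
        # the background and records FINISHED or FAILED on the document.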
        asyncio.create_task(
            self.async_doc_embedding(
                knowledge, chunk_parameters, vector_store_connector, doc, space
            )
        )
        logger.info(f"begin save document chunks, doc:{doc.doc_name}")

    @trace("async_doc_embedding")
    async def async_doc_embedding(
        self, knowledge, chunk_parameters, vector_store_connector, doc, space
    ):
        """Async document embedding into the vector db

        Args:
            - knowledge: Knowledge
            - chunk_parameters: ChunkParameters
            - vector_store_connector: vector_store_connector
            - doc: doc
            - space: space
        """
        logger.info(f"async doc persist sync, doc:{doc.doc_name}")
        try:
            with root_tracer.start_span(
                "app.knowledge.assembler.persist",
                metadata={"doc": doc.doc_name},
            ):
                from dbgpt.core.awel import BaseOperator
                from dbgpt.serve.flow.service.service import Service as FlowService
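
                # Domain-specific path (descriptive note): if an AWEL DAG is
                # registered for this space's domain type, delegate chunking
                # and persistence to it; otherwise fall back to the default
                # EmbeddingAssembler flow below.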
                dags = self.dag_manager.get_dags_by_tag(
                    TAG_KEY_KNOWLEDGE_FACTORY_DOMAIN_TYPE, space.domain_type
                )
                if dags and dags[0].leaf_nodes:
                    end_task = cast(BaseOperator, dags[0].leaf_nodes[0])
                    logger.info(
                        f"Found dag by tag key: {TAG_KEY_KNOWLEDGE_FACTORY_DOMAIN_TYPE}"
                        f" and value: {space.domain_type}, dag: {dags[0]}"
                    )
                    db_name, chunk_docs = await end_task.call(
                        {"file_path": doc.content, "space": doc.space}
                    )
                    doc.chunk_size = len(chunk_docs)
                    vector_ids = [chunk.chunk_id for chunk in chunk_docs]
                else:
                    assembler = await EmbeddingAssembler.aload_from_knowledge(
                        knowledge=knowledge,
                        index_store=vector_store_connector.index_client,
                        chunk_parameters=chunk_parameters,
                    )

                    chunk_docs = assembler.get_chunks()
                    doc.chunk_size = len(chunk_docs)
                    vector_ids = await assembler.apersist()
            doc.status = SyncStatus.FINISHED.name
            doc.result = "document persist into index store success"
            if vector_ids is not None:
                doc.vector_ids = ",".join(vector_ids)
            logger.info(f"async document persist index store success:{doc.doc_name}")
            # save chunk details
            chunk_entities = [
                DocumentChunkEntity(
                    doc_name=doc.doc_name,
                    doc_type=doc.doc_type,
                    document_id=doc.id,
                    content=chunk_doc.content,
                    meta_info=str(chunk_doc.metadata),
                    gmt_created=datetime.now(),
                    gmt_modified=datetime.now(),
                )
                for chunk_doc in chunk_docs
            ]
            self._chunk_dao.create_documents_chunks(chunk_entities)
        except Exception as e:
            doc.status = SyncStatus.FAILED.name
            doc.result = "document embedding failed: " + str(e)
            logger.error(f"document embedding failed: {doc.doc_name}, {str(e)}")
        return self._document_dao.update_knowledge_document(doc)

    def get_space_context(self, space_id):
        """Get the space context

        Args:
            - space_id: space id
        """
        space = self.get({"id": space_id})
        if space is None:
            raise Exception(f"space {space_id} not found")
        if space.context is not None:
            return json.loads(space.context)
        return None
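
    # Example (hedged): a space ``context`` JSON that overrides chunking, as
    # read by the sync methods above; the values are illustrative:
    #
    #   {"embedding": {"chunk_size": "500", "chunk_overlap": "100"}}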