"""Knowledge service for DB-GPT: create and list knowledge spaces, create and
list knowledge documents, save document chunks, and sync document embeddings
into the vector store."""

import threading
from datetime import datetime
from enum import Enum

from pilot.configs.config import Config
from pilot.configs.model_config import LLM_MODEL_CONFIG
from pilot.embedding_engine.knowledge_embedding import KnowledgeEmbedding
from pilot.logs import logger
from pilot.server.knowledge.document_chunk_dao import (
    DocumentChunkDao,
    DocumentChunkEntity,
)
from pilot.server.knowledge.knowledge_document_dao import (
    KnowledgeDocumentDao,
    KnowledgeDocumentEntity,
)
from pilot.server.knowledge.knowledge_space_dao import (
    KnowledgeSpaceDao,
    KnowledgeSpaceEntity,
)
from pilot.server.knowledge.request.knowledge_request import (
    ChunkQueryRequest,
    DocumentQueryRequest,
    KnowledgeDocumentRequest,
    KnowledgeSpaceRequest,
)


# Module-level DAO instances shared by all service methods.
knowledge_space_dao = KnowledgeSpaceDao()
knowledge_document_dao = KnowledgeDocumentDao()
document_chunk_dao = DocumentChunkDao()

CFG = Config()


class SyncStatus(Enum):
    """Document sync lifecycle: TODO (created, not yet synced) -> RUNNING
    (chunks read, embedding in progress) -> FINISHED on success, or FAILED
    with the error recorded in the document content."""

    TODO = "TODO"
    FAILED = "FAILED"
    RUNNING = "RUNNING"
    FINISHED = "FINISHED"


# @singleton
class KnowledgeService:
    """Service for knowledge spaces, documents, chunks, and embedding sync."""

    def __init__(self):
        pass
"""create knowledge space"""
|
|
|
|
def create_knowledge_space(self, request: KnowledgeSpaceRequest):
|
|
query = KnowledgeSpaceEntity(
|
|
name=request.name,
|
|
)
|
|
spaces = knowledge_space_dao.get_knowledge_space(query)
|
|
if len(spaces) > 0:
|
|
raise Exception(f"space name:{request.name} have already named")
|
|
knowledge_space_dao.create_knowledge_space(request)
|
|
return True
|
|
|
|
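
    # Usage sketch (hypothetical values; KnowledgeSpaceRequest fields beyond
    # `name` are optional filters and may be omitted):
    #
    #   service = KnowledgeService()
    #   service.create_knowledge_space(KnowledgeSpaceRequest(name="product_docs"))
    #   # A second call with the same name raises:
    #   # "space name: product_docs already exists"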
"""create knowledge document"""
|
|
|
|
def create_knowledge_document(self, space, request: KnowledgeDocumentRequest):
|
|
query = KnowledgeDocumentEntity(
|
|
doc_name=request.doc_name,
|
|
space=space
|
|
)
|
|
documents = knowledge_document_dao.get_knowledge_documents(query)
|
|
if len(documents) > 0:
|
|
raise Exception(f"document name:{request.doc_name} have already named")
|
|
document = KnowledgeDocumentEntity(
|
|
doc_name=request.doc_name,
|
|
doc_type=request.doc_type,
|
|
space=space,
|
|
chunk_size=0,
|
|
status=SyncStatus.TODO.name,
|
|
last_sync=datetime.now(),
|
|
content="",
|
|
)
|
|
knowledge_document_dao.create_knowledge_document(document)
|
|
return True
|
|
|
|
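
    # Usage sketch (hypothetical values): register a URL document in an
    # existing space; it starts in SyncStatus.TODO until it is synced.
    #
    #   service.create_knowledge_document(
    #       "product_docs",
    #       KnowledgeDocumentRequest(
    #           doc_name="https://example.com/guide", doc_type="URL"
    #       ),
    #   )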
"""get knowledge space"""
|
|
|
|
def get_knowledge_space(self, request:KnowledgeSpaceRequest):
|
|
query = KnowledgeSpaceEntity(
|
|
name=request.name,
|
|
vector_type=request.vector_type,
|
|
owner=request.owner
|
|
)
|
|
return knowledge_space_dao.get_knowledge_space(query)
|
|
|
|
"""get knowledge get_knowledge_documents"""
|
|
|
|
def get_knowledge_documents(self, space, request:DocumentQueryRequest):
|
|
query = KnowledgeDocumentEntity(
|
|
doc_name=request.doc_name,
|
|
doc_type=request.doc_type,
|
|
space=space,
|
|
status=request.status,
|
|
)
|
|
return knowledge_document_dao.get_knowledge_documents(query, page=request.page, page_size=request.page_size)
|
|
|
|
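
    # Usage sketch (hypothetical values): page through a space's documents,
    # optionally filtering by sync status.
    #
    #   docs = service.get_knowledge_documents(
    #       "product_docs",
    #       DocumentQueryRequest(
    #           status=SyncStatus.FINISHED.name, page=1, page_size=20
    #       ),
    #   )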
"""sync knowledge document chunk into vector store"""
|
|
def sync_knowledge_document(self, space_name, doc_ids):
|
|
for doc_id in doc_ids:
|
|
query = KnowledgeDocumentEntity(
|
|
id=doc_id,
|
|
space=space_name,
|
|
)
|
|
doc = knowledge_document_dao.get_knowledge_documents(query)[0]
|
|
client = KnowledgeEmbedding(file_path=doc.doc_name,
|
|
file_type="url",
|
|
model_name=LLM_MODEL_CONFIG[CFG.EMBEDDING_MODEL],
|
|
vector_store_config={
|
|
"vector_store_name": space_name,
|
|
})
|
|
chunk_docs = client.read()
|
|
# update document status
|
|
doc.status = SyncStatus.RUNNING.name
|
|
doc.chunk_size = len(chunk_docs)
|
|
doc.gmt_modified = datetime.now()
|
|
knowledge_document_dao.update_knowledge_document(doc)
|
|
# async doc embeddings
|
|
thread = threading.Thread(target=self.async_doc_embedding(client, chunk_docs, doc))
|
|
thread.start()
|
|
#save chunk details
|
|
chunk_entities = [
|
|
DocumentChunkEntity(
|
|
doc_name=doc.doc_name,
|
|
doc_type=doc.doc_type,
|
|
document_id=doc.id,
|
|
content=chunk_doc.page_content,
|
|
meta_info=str(chunk_doc.metadata),
|
|
gmt_created=datetime.now(),
|
|
gmt_modified=datetime.now()
|
|
)
|
|
for chunk_doc in chunk_docs]
|
|
document_chunk_dao.create_documents_chunks(chunk_entities)
|
|
#update document status
|
|
# doc.status = SyncStatus.RUNNING.name
|
|
# doc.chunk_size = len(chunk_docs)
|
|
# doc.gmt_modified = datetime.now()
|
|
# knowledge_document_dao.update_knowledge_document(doc)
|
|
|
|
return True
|
|
|
|
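
    # End-to-end sketch (hypothetical values): create a space and a URL
    # document, then sync it. Chunks are saved immediately; the embedding
    # itself runs on a background thread via async_doc_embedding, which moves
    # the document from RUNNING to FINISHED or FAILED.
    #
    #   service.create_knowledge_space(KnowledgeSpaceRequest(name="product_docs"))
    #   service.create_knowledge_document(
    #       "product_docs",
    #       KnowledgeDocumentRequest(
    #           doc_name="https://example.com/guide", doc_type="URL"
    #       ),
    #   )
    #   docs = service.get_knowledge_documents(
    #       "product_docs", DocumentQueryRequest()
    #   )
    #   service.sync_knowledge_document("product_docs", [docs[0].id])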
"""update knowledge space"""
|
|
|
|
def update_knowledge_space(
|
|
self, space_id: int, space_request: KnowledgeSpaceRequest
|
|
):
|
|
knowledge_space_dao.update_knowledge_space(space_id, space_request)
|
|
|
|
"""delete knowledge space"""
|
|
|
|
def delete_knowledge_space(self, space_id: int):
|
|
return knowledge_space_dao.delete_knowledge_space(space_id)
|
|
|
|
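
    # Sketch (hypothetical id): rename a space, or remove it entirely.
    #
    #   service.update_knowledge_space(1, KnowledgeSpaceRequest(name="docs_v2"))
    #   service.delete_knowledge_space(1)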
"""get document chunks"""
|
|
def get_document_chunks(self, request:ChunkQueryRequest):
|
|
query = DocumentChunkEntity(
|
|
id=request.id,
|
|
document_id=request.document_id,
|
|
doc_name=request.doc_name,
|
|
doc_type=request.doc_type
|
|
)
|
|
return document_chunk_dao.get_document_chunks(query, page=request.page, page_size=request.page_size)
|
|
|
|
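
    # Usage sketch (hypothetical values): inspect the chunks saved for a
    # document during sync.
    #
    #   chunks = service.get_document_chunks(
    #       ChunkQueryRequest(document_id=42, page=1, page_size=10)
    #   )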

    def async_doc_embedding(self, client, chunk_docs, doc):
        """Embed chunks into the vector store and record success or failure."""
        logger.info(
            f"async_doc_embedding, doc: {doc.doc_name}, "
            f"chunk_size: {len(chunk_docs)}, "
            f"begin embedding to vector store-{CFG.VECTOR_STORE_TYPE}"
        )
        try:
            vector_ids = client.knowledge_embedding_batch(chunk_docs)
            doc.status = SyncStatus.FINISHED.name
            doc.content = "embedding success"
            doc.vector_ids = ",".join(vector_ids)
        except Exception as e:
            doc.status = SyncStatus.FAILED.name
            doc.content = str(e)
        return knowledge_document_dao.update_knowledge_document(doc)
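
    # Sketch: because embedding runs on a background thread, callers can poll
    # the document status for the outcome (hypothetical values).
    #
    #   docs = service.get_knowledge_documents(
    #       "product_docs",
    #       DocumentQueryRequest(doc_name="https://example.com/guide"),
    #   )
    #   if docs[0].status == SyncStatus.FAILED.name:
    #       print(docs[0].content)  # content carries the exception text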