DB-GPT/dbgpt/app/knowledge/service.py
明天 b124ecc10b
feat: (0.6)New UI (#1855)
Co-authored-by: 夏姜 <wenfengjiang.jwf@digital-engine.com>
Co-authored-by: aries_ckt <916701291@qq.com>
Co-authored-by: wb-lh513319 <wb-lh513319@alibaba-inc.com>
Co-authored-by: csunny <cfqsunny@163.com>
2024-08-21 17:37:45 +08:00

659 lines
24 KiB
Python

import json
import logging
import re
import timeit
from datetime import datetime
from typing import List
from dbgpt._private.config import Config
from dbgpt.app.knowledge.chunk_db import DocumentChunkDao, DocumentChunkEntity
from dbgpt.app.knowledge.document_db import (
KnowledgeDocumentDao,
KnowledgeDocumentEntity,
)
from dbgpt.app.knowledge.request.request import (
ChunkEditRequest,
ChunkQueryRequest,
DocumentQueryRequest,
DocumentRecallTestRequest,
DocumentSummaryRequest,
KnowledgeDocumentRequest,
KnowledgeSpaceRequest,
SpaceArgumentRequest,
)
from dbgpt.app.knowledge.request.response import (
ChunkQueryResponse,
DocumentQueryResponse,
DocumentResponse,
SpaceQueryResponse,
)
from dbgpt.component import ComponentType
from dbgpt.configs import DOMAIN_TYPE_FINANCIAL_REPORT
from dbgpt.configs.model_config import EMBEDDING_MODEL_CONFIG
from dbgpt.core import LLMClient
from dbgpt.model import DefaultLLMClient
from dbgpt.model.cluster import WorkerManagerFactory
from dbgpt.rag.assembler.summary import SummaryAssembler
from dbgpt.rag.chunk_manager import ChunkParameters
from dbgpt.rag.embedding.embedding_factory import EmbeddingFactory
from dbgpt.rag.knowledge.base import KnowledgeType
from dbgpt.rag.knowledge.factory import KnowledgeFactory
from dbgpt.rag.retriever.rerank import RerankEmbeddingsRanker
from dbgpt.serve.rag.connector import VectorStoreConnector
from dbgpt.serve.rag.models.models import KnowledgeSpaceDao, KnowledgeSpaceEntity
from dbgpt.serve.rag.retriever.knowledge_space import KnowledgeSpaceRetriever
from dbgpt.serve.rag.service.service import SyncStatus
from dbgpt.storage.vector_store.base import VectorStoreConfig
from dbgpt.util.executor_utils import ExecutorFactory, blocking_func_to_async
from dbgpt.util.tracer import root_tracer, trace
# Module-level DAO singletons shared by every KnowledgeService method.
knowledge_space_dao = KnowledgeSpaceDao()
knowledge_document_dao = KnowledgeDocumentDao()
document_chunk_dao = DocumentChunkDao()

logger = logging.getLogger(__name__)
# Global application configuration singleton.
CFG = Config()

# default summary max iteration call with llm.
DEFAULT_SUMMARY_MAX_ITERATION = 5
# default summary concurrency call with llm.
DEFAULT_SUMMARY_CONCURRENCY_LIMIT = 3
class KnowledgeService:
    """Knowledge Management Service.

    Responsibilities:
        - knowledge_space management
        - knowledge_document management
        - embedding management
    """

    def __init__(self):
        # Stateless service: all persistence goes through the module-level DAOs.
        pass
@property
def llm_client(self) -> LLMClient:
worker_manager = CFG.SYSTEM_APP.get_component(
ComponentType.WORKER_MANAGER_FACTORY, WorkerManagerFactory
).create()
return DefaultLLMClient(worker_manager, True)
def create_knowledge_space(self, request: KnowledgeSpaceRequest):
"""create knowledge space
Args:
- request: KnowledgeSpaceRequest
"""
query = KnowledgeSpaceEntity(
name=request.name,
)
if request.vector_type == "VectorStore":
request.vector_type = CFG.VECTOR_STORE_TYPE
if request.vector_type == "KnowledgeGraph":
knowledge_space_name_pattern = r"^[a-zA-Z0-9\u4e00-\u9fa5]+$"
if not re.match(knowledge_space_name_pattern, request.name):
raise Exception(f"space name:{request.name} invalid")
spaces = knowledge_space_dao.get_knowledge_space(query)
if len(spaces) > 0:
raise Exception(f"space name:{request.name} have already named")
space_id = knowledge_space_dao.create_knowledge_space(request)
return space_id
def create_knowledge_document(self, space, request: KnowledgeDocumentRequest):
"""create knowledge document
Args:
- request: KnowledgeDocumentRequest
"""
query = KnowledgeDocumentEntity(doc_name=request.doc_name, space=space)
documents = knowledge_document_dao.get_knowledge_documents(query)
if len(documents) > 0:
raise Exception(f"document name:{request.doc_name} have already named")
document = KnowledgeDocumentEntity(
doc_name=request.doc_name,
doc_type=request.doc_type,
space=space,
chunk_size=0,
status=SyncStatus.TODO.name,
last_sync=datetime.now(),
content=request.content,
result="",
)
doc_id = knowledge_document_dao.create_knowledge_document(document)
if doc_id is None:
raise Exception(f"create document failed, {request.doc_name}")
return doc_id
def get_knowledge_space(self, request: KnowledgeSpaceRequest):
"""get knowledge space
Args:
- request: KnowledgeSpaceRequest
"""
query = KnowledgeSpaceEntity(
id=request.id,
name=request.name,
vector_type=request.vector_type,
owner=request.owner,
)
spaces = knowledge_space_dao.get_knowledge_space(query)
space_names = [space.name for space in spaces]
docs_count = knowledge_document_dao.get_knowledge_documents_count_bulk(
space_names
)
responses = []
for space in spaces:
res = SpaceQueryResponse()
res.id = space.id
res.name = space.name
res.vector_type = space.vector_type
res.domain_type = space.domain_type
res.desc = space.desc
res.owner = space.owner
res.gmt_created = space.gmt_created
res.gmt_modified = space.gmt_modified
res.context = space.context
res.docs = docs_count.get(space.name, 0)
responses.append(res)
return responses
def arguments(self, space):
"""show knowledge space arguments
Args:
- space_name: Knowledge Space Name
"""
query = KnowledgeSpaceEntity(name=space)
spaces = knowledge_space_dao.get_knowledge_space(query)
if len(spaces) != 1:
raise Exception(f"there are no or more than one space called {space}")
space = spaces[0]
if space.context is None:
context = self._build_default_context()
else:
context = space.context
return json.loads(context)
def argument_save(self, space, argument_request: SpaceArgumentRequest):
"""save argument
Args:
- space_name: Knowledge Space Name
- argument_request: SpaceArgumentRequest
"""
query = KnowledgeSpaceEntity(name=space)
spaces = knowledge_space_dao.get_knowledge_space(query)
if len(spaces) != 1:
raise Exception(f"there are no or more than one space called {space}")
space = spaces[0]
space.context = argument_request.argument
return knowledge_space_dao.update_knowledge_space(space)
def get_knowledge_documents(self, space, request: DocumentQueryRequest):
"""get knowledge documents
Args:
- space: Knowledge Space Name
- request: DocumentQueryRequest
Returns:
- res DocumentQueryResponse
"""
if request.page_size <= 0:
request.page_size = 20
ks = knowledge_space_dao.get_one({"name": space})
if ks is None:
raise Exception(f"there is no space id called {space}")
res = DocumentQueryResponse()
if request.doc_ids and len(request.doc_ids) > 0:
documents: List[
KnowledgeDocumentEntity
] = knowledge_document_dao.documents_by_ids(request.doc_ids)
res.data = [item.to_dict() for item in documents]
else:
space_name = ks.name
query = {
"doc_type": request.doc_type,
"space": space_name,
"status": request.status,
}
if request.doc_name:
docs = knowledge_document_dao.get_list({"space": space_name})
docs = [DocumentResponse.serve_to_response(doc) for doc in docs]
res.data = [
doc
for doc in docs
if doc.doc_name and request.doc_name in doc.doc_name
]
else:
result = knowledge_document_dao.get_list_page(
query, page=request.page, page_size=request.page_size
)
docs = result.items
docs = [DocumentResponse.serve_to_response(doc) for doc in docs]
res.data = docs
res.total = result.total_count
res.page = result.page
return res
    async def document_summary(self, request: DocumentSummaryRequest):
        """Generate an LLM-based summary chat for a single document.

        If the document has no chunks yet and is not currently syncing,
        a sync is triggered first so the summary sees real content.

        Args:
            - request: DocumentSummaryRequest

        Returns:
            BaseChat: a refine-summary chat built from the extracted summary.

        Raises:
            Exception: if the document id is unknown or no chunks were produced.
        """
        doc_query = KnowledgeDocumentEntity(id=request.doc_id)
        documents = knowledge_document_dao.get_documents(doc_query)
        if len(documents) != 1:
            raise Exception(f"can not found document for {request.doc_id}")
        document = documents[0]
        from dbgpt.model.cluster import WorkerManagerFactory

        worker_manager = CFG.SYSTEM_APP.get_component(
            ComponentType.WORKER_MANAGER_FACTORY, WorkerManagerFactory
        ).create()
        chunk_parameters = ChunkParameters(
            chunk_strategy="CHUNK_BY_SIZE",
            chunk_size=CFG.KNOWLEDGE_CHUNK_SIZE,
            chunk_overlap=CFG.KNOWLEDGE_CHUNK_OVERLAP,
        )
        chunk_entities = document_chunk_dao.get_document_chunks(
            DocumentChunkEntity(document_id=document.id)
        )
        # Lazily sync the document when it was never embedded (no chunks and
        # no sync currently RUNNING).
        if (
            document.status not in [SyncStatus.RUNNING.name]
            and len(chunk_entities) == 0
        ):
            from dbgpt.serve.rag.service.service import Service

            rag_service = Service.get_instance(CFG.SYSTEM_APP)
            space = rag_service.get({"name": document.space})
            document_vo = KnowledgeDocumentEntity.to_document_vo(documents)
            await rag_service._sync_knowledge_document(
                space_id=space.id,
                doc_vo=document_vo[0],
                chunk_parameters=chunk_parameters,
            )
        knowledge = KnowledgeFactory.create(
            datasource=document.content,
            knowledge_type=KnowledgeType.get_by_value(document.doc_type),
        )
        assembler = SummaryAssembler(
            knowledge=knowledge,
            model_name=request.model_name,
            llm_client=DefaultLLMClient(
                worker_manager=worker_manager, auto_convert_message=True
            ),
            language=CFG.LANGUAGE,
            chunk_parameters=chunk_parameters,
        )
        summary = await assembler.generate_summary()
        if len(assembler.get_chunks()) == 0:
            raise Exception(f"can not found chunks for {request.doc_id}")
        return await self._llm_extract_summary(
            summary, request.conv_uid, request.model_name
        )
def get_space_context_by_space_id(self, space_id):
"""get space contect
Args:
- space_id: space name
"""
spaces = self.get_knowledge_space_by_ids([space_id])
if len(spaces) != 1:
raise Exception(
f"have not found {space_id} space or found more than one space called {space_id}"
)
space = spaces[0]
if space.context is not None:
return json.loads(spaces[0].context)
return None
def get_knowledge_space_by_ids(self, ids):
"""
get knowledge space by ids.
"""
return knowledge_space_dao.get_knowledge_space_by_ids(ids)
def recall_test(
self, space_name, doc_recall_test_request: DocumentRecallTestRequest
):
logger.info(f"recall_test {space_name}, {doc_recall_test_request}")
from dbgpt.rag.embedding.embedding_factory import RerankEmbeddingFactory
try:
start_time = timeit.default_timer()
question = doc_recall_test_request.question
space_context = self.get_space_context(space_name)
logger.info(f"space_context is {space_context}")
space = knowledge_space_dao.get_one({"name": space_name})
top_k = int(doc_recall_test_request.recall_top_k)
score_threshold = (
float(space_context["embedding"].get("recall_score", 0.3))
if (space_context and "embedding" in space_context)
else 0.3
)
if CFG.RERANK_MODEL is not None:
if top_k < int(CFG.RERANK_TOP_K) or top_k < 20:
# We use reranker, so if the top_k is less than 20,
# we need to set it to 20
top_k = max(int(CFG.RERANK_TOP_K), 20)
knowledge_space_retriever = KnowledgeSpaceRetriever(
space_id=space.id, top_k=top_k
)
chunks = knowledge_space_retriever.retrieve_with_scores(
question, score_threshold
)
retrievers_end_time = timeit.default_timer()
retrievers_cost_time = retrievers_end_time - start_time
logger.info(
f"retrieve chunks size is {len(chunks)}, "
f"retrievers_cost_time is {retrievers_cost_time} seconds"
)
recall_top_k = int(doc_recall_test_request.recall_top_k)
if CFG.RERANK_MODEL is not None:
rerank_embeddings = RerankEmbeddingFactory.get_instance(
CFG.SYSTEM_APP
).create()
reranker = RerankEmbeddingsRanker(rerank_embeddings, topk=recall_top_k)
chunks = reranker.rank(candidates_with_scores=chunks, query=question)
recall_score_threshold = doc_recall_test_request.recall_score_threshold
if recall_score_threshold is not None:
chunks = [
chunk for chunk in chunks if chunk.score >= recall_score_threshold
]
recall_end_time = timeit.default_timer()
recall_cost_time = recall_end_time - start_time
cost_time_map = {
"retrievers_cost_time": retrievers_cost_time,
"recall_cost_time": recall_cost_time,
}
logger.info(
f"recall chunks size is {len(chunks)}, "
f"recall_cost_time is {recall_cost_time} seconds, {cost_time_map}"
)
# return chunks, cost_time_map
return chunks
except Exception as e:
logger.error(f" recall_test error: {str(e)}")
return []
def update_knowledge_space(
self, space_id: int, space_request: KnowledgeSpaceRequest
):
"""update knowledge space
Args:
- space_id: space id
- space_request: KnowledgeSpaceRequest
"""
entity = KnowledgeSpaceEntity(
id=space_id,
name=space_request.name,
vector_type=space_request.vector_type,
desc=space_request.desc,
owner=space_request.owner,
)
knowledge_space_dao.update_knowledge_space(entity)
    def delete_space(self, space_name: str):
        """Delete a knowledge space and all of its derived data.

        Removal order: vector data first, then chunk rows, then document
        rows, and finally the space record itself.

        Args:
            - space_name: knowledge space name

        Returns:
            The result of deleting the space record.

        Raises:
            Exception: if the name does not resolve to exactly one space.
        """
        spaces = knowledge_space_dao.get_knowledge_space(
            KnowledgeSpaceEntity(name=space_name)
        )
        if len(spaces) != 1:
            raise Exception(f"invalid space name:{space_name}")
        space = spaces[0]
        embedding_factory = CFG.SYSTEM_APP.get_component(
            "embedding_factory", EmbeddingFactory
        )
        embedding_fn = embedding_factory.create(
            model_name=EMBEDDING_MODEL_CONFIG[CFG.EMBEDDING_MODEL]
        )
        config = VectorStoreConfig(
            name=space.name,
            embedding_fn=embedding_fn,
            llm_client=self.llm_client,
            model_name=None,
        )
        if space.domain_type == DOMAIN_TYPE_FINANCIAL_REPORT:
            # Financial-report spaces also own a dedicated local database.
            conn_manager = CFG.local_db_manager
            conn_manager.delete_db(f"{space.name}_fin_report")
        vector_store_connector = VectorStoreConnector(
            vector_store_type=space.vector_type, vector_store_config=config
        )
        # delete vectors
        vector_store_connector.delete_vector_name(space.name)
        document_query = KnowledgeDocumentEntity(space=space.name)
        # delete chunks
        documents = knowledge_document_dao.get_documents(document_query)
        for document in documents:
            document_chunk_dao.raw_delete(document.id)
        # delete documents
        knowledge_document_dao.raw_delete(document_query)
        # delete space
        return knowledge_space_dao.delete_knowledge_space(space)
    def delete_document(self, space_name: str, doc_name: str):
        """Delete a single document and its derived data from a space.

        Removal order: vectors first (only when the document was embedded),
        then chunk rows, then the document row itself.

        Args:
            - space_name: knowledge space name
            - doc_name: document name

        Returns:
            The result of deleting the document row.

        Raises:
            Exception: if the document or the space cannot be uniquely resolved.
        """
        document_query = KnowledgeDocumentEntity(doc_name=doc_name, space=space_name)
        documents = knowledge_document_dao.get_documents(document_query)
        if len(documents) != 1:
            raise Exception(f"there are no or more than one document called {doc_name}")
        spaces = self.get_knowledge_space(KnowledgeSpaceRequest(name=space_name))
        if len(spaces) != 1:
            raise Exception(f"invalid space name:{space_name}")
        space = spaces[0]
        vector_ids = documents[0].vector_ids
        # Only documents that were actually embedded carry vector ids.
        if vector_ids is not None:
            embedding_factory = CFG.SYSTEM_APP.get_component(
                "embedding_factory", EmbeddingFactory
            )
            embedding_fn = embedding_factory.create(
                model_name=EMBEDDING_MODEL_CONFIG[CFG.EMBEDDING_MODEL]
            )
            config = VectorStoreConfig(
                name=space.name,
                embedding_fn=embedding_fn,
                llm_client=self.llm_client,
                model_name=None,
            )
            vector_store_connector = VectorStoreConnector(
                vector_store_type=space.vector_type, vector_store_config=config
            )
            # delete vector by ids
            vector_store_connector.delete_by_ids(vector_ids)
        # delete chunks
        document_chunk_dao.raw_delete(documents[0].id)
        # delete document
        return knowledge_document_dao.raw_delete(document_query)
def get_document_chunks(self, request: ChunkQueryRequest):
"""get document chunks
Args:
- request: ChunkQueryRequest
"""
query = DocumentChunkEntity(
id=request.id,
document_id=request.document_id,
doc_name=request.doc_name,
doc_type=request.doc_type,
)
res = ChunkQueryResponse()
res.data = [
chunk.to_dict()
for chunk in document_chunk_dao.get_document_chunks(
query, page=request.page, page_size=request.page_size
)
]
return res
    @trace("async_doc_embedding")
    def async_doc_embedding(self, assembler, chunk_docs, doc):
        """Persist embeddings for a document and record the sync outcome.

        Persists the assembler's vectors, stores chunk detail rows, and
        updates the document's status/result fields. Failures are recorded
        on the document row rather than raised.

        Args:
            - assembler: RAG assembler holding the chunks to persist
            - chunk_docs: List[Document]
            - doc: KnowledgeDocumentEntity

        Returns:
            The result of updating the document row.
        """
        logger.info(
            f"async doc embedding sync, doc:{doc.doc_name}, chunks length is {len(chunk_docs)}"
        )
        try:
            with root_tracer.start_span(
                "app.knowledge.assembler.persist",
                metadata={"doc": doc.doc_name, "chunks": len(chunk_docs)},
            ):
                vector_ids = assembler.persist()
            doc.status = SyncStatus.FINISHED.name
            doc.result = "document embedding success"
            if vector_ids is not None:
                # Stored as one comma-joined string; later split for per-id
                # deletion in delete_document.
                doc.vector_ids = ",".join(vector_ids)
            logger.info(f"async document embedding, success:{doc.doc_name}")
            # save chunk details
            chunk_entities = [
                DocumentChunkEntity(
                    doc_name=doc.doc_name,
                    doc_type=doc.doc_type,
                    document_id=doc.id,
                    content=chunk_doc.content,
                    meta_info=str(chunk_doc.metadata),
                    gmt_created=datetime.now(),
                    gmt_modified=datetime.now(),
                )
                for chunk_doc in chunk_docs
            ]
            document_chunk_dao.create_documents_chunks(chunk_entities)
        except Exception as e:
            # Mark failure on the row; callers inspect status/result.
            doc.status = SyncStatus.FAILED.name
            doc.result = "document embedding failed" + str(e)
            logger.error(f"document embedding, failed:{doc.doc_name}, {str(e)}")
        return knowledge_document_dao.update_knowledge_document(doc)
def _build_default_context(self):
from dbgpt.app.scene.chat_knowledge.v1.prompt import (
_DEFAULT_TEMPLATE,
PROMPT_SCENE_DEFINE,
)
context_template = {
"embedding": {
"topk": CFG.KNOWLEDGE_SEARCH_TOP_SIZE,
"recall_score": CFG.KNOWLEDGE_SEARCH_RECALL_SCORE,
"recall_type": "TopK",
"model": EMBEDDING_MODEL_CONFIG[CFG.EMBEDDING_MODEL].rsplit("/", 1)[-1],
"chunk_size": CFG.KNOWLEDGE_CHUNK_SIZE,
"chunk_overlap": CFG.KNOWLEDGE_CHUNK_OVERLAP,
},
"prompt": {
"max_token": 2000,
"scene": PROMPT_SCENE_DEFINE,
"template": _DEFAULT_TEMPLATE,
},
"summary": {
"max_iteration": DEFAULT_SUMMARY_MAX_ITERATION,
"concurrency_limit": DEFAULT_SUMMARY_CONCURRENCY_LIMIT,
},
}
context_template_string = json.dumps(context_template, indent=4)
return context_template_string
def get_space_context(self, space_name):
"""get space contect
Args:
- space_name: space name
"""
request = KnowledgeSpaceRequest()
request.name = space_name
spaces = self.get_knowledge_space(request)
if len(spaces) != 1:
raise Exception(
f"have not found {space_name} space or found more than one space called {space_name}"
)
space = spaces[0]
if space.context is not None:
return json.loads(spaces[0].context)
return None
    async def _llm_extract_summary(
        self, doc: str, conn_uid: str, model_name: str = None
    ):
        """Build a refine-summary chat for the given summary text.

        Args:
            doc: Document summary text, passed as the chat's select_param
            conn_uid: str, chat conversation id
            model_name: str, model name

        Returns:
            chat: BaseChat, refine summary chat.
        """
        from dbgpt.app.scene import ChatScene

        chat_param = {
            "chat_session_id": conn_uid,
            "current_user_input": "",
            "select_param": doc,
            "model_name": model_name,
            "model_cache_enable": False,
        }
        executor = CFG.SYSTEM_APP.get_component(
            ComponentType.EXECUTOR_DEFAULT, ExecutorFactory
        ).create()
        from dbgpt.app.openapi.api_v1.api_v1 import CHAT_FACTORY

        # Chat construction may block, so run it in the default executor
        # instead of the event loop thread.
        chat = await blocking_func_to_async(
            executor,
            CHAT_FACTORY.get_implementation,
            ChatScene.ExtractRefineSummary.value(),
            **{"chat_param": chat_param},
        )
        return chat
def query_graph(self, space_name, limit):
embedding_factory = CFG.SYSTEM_APP.get_component(
"embedding_factory", EmbeddingFactory
)
embedding_fn = embedding_factory.create(
model_name=EMBEDDING_MODEL_CONFIG[CFG.EMBEDDING_MODEL]
)
spaces = self.get_knowledge_space(KnowledgeSpaceRequest(name=space_name))
if len(spaces) != 1:
raise Exception(f"invalid space name:{space_name}")
space = spaces[0]
print(CFG.LLM_MODEL)
config = VectorStoreConfig(
name=space.name,
embedding_fn=embedding_fn,
max_chunks_once_load=CFG.KNOWLEDGE_MAX_CHUNKS_ONCE_LOAD,
llm_client=self.llm_client,
model_name=None,
)
vector_store_connector = VectorStoreConnector(
vector_store_type=space.vector_type, vector_store_config=config
)
graph = vector_store_connector.client.query_graph(limit=limit)
res = {"nodes": [], "edges": []}
for node in graph.vertices():
res["nodes"].append({"vid": node.vid})
for edge in graph.edges():
res["edges"].append(
{
"src": edge.sid,
"dst": edge.tid,
"label": edge.props[graph.edge_label],
}
)
return res