mirror of
https://github.com/csunny/DB-GPT.git
synced 2025-07-23 20:26:15 +00:00
Co-authored-by: 夏姜 <wenfengjiang.jwf@digital-engine.com> Co-authored-by: aries_ckt <916701291@qq.com> Co-authored-by: wb-lh513319 <wb-lh513319@alibaba-inc.com> Co-authored-by: csunny <cfqsunny@163.com>
423 lines
15 KiB
Python
423 lines
15 KiB
Python
from datetime import datetime
|
|
from typing import Any, Dict, List, Union
|
|
|
|
from sqlalchemy import Column, DateTime, Integer, String, Text, func
|
|
|
|
from dbgpt._private.config import Config
|
|
from dbgpt._private.pydantic import model_to_dict
|
|
from dbgpt.serve.conversation.api.schemas import ServeRequest
|
|
from dbgpt.serve.rag.api.schemas import (
|
|
DocumentServeRequest,
|
|
DocumentServeResponse,
|
|
DocumentVO,
|
|
)
|
|
from dbgpt.storage.metadata import BaseDao, Model
|
|
from dbgpt.storage.metadata._base_dao import QUERY_SPEC, RES
|
|
from dbgpt.util import PaginationResult
|
|
|
|
CFG = Config()
|
|
|
|
|
|
class KnowledgeDocumentEntity(Model):
|
|
__tablename__ = "knowledge_document"
|
|
id = Column(Integer, primary_key=True)
|
|
doc_name = Column(String(100))
|
|
doc_type = Column(String(100))
|
|
doc_token = Column(String(100))
|
|
space = Column(String(100))
|
|
chunk_size = Column(Integer)
|
|
status = Column(String(100))
|
|
last_sync = Column(DateTime)
|
|
content = Column(Text)
|
|
result = Column(Text)
|
|
vector_ids = Column(Text)
|
|
summary = Column(Text)
|
|
gmt_created = Column(DateTime)
|
|
gmt_modified = Column(DateTime)
|
|
questions = Column(Text)
|
|
|
|
def __repr__(self):
|
|
return f"KnowledgeDocumentEntity(id={self.id}, doc_name='{self.doc_name}', doc_type='{self.doc_type}', chunk_size='{self.chunk_size}', status='{self.status}', last_sync='{self.last_sync}', content='{self.content}', result='{self.result}', summary='{self.summary}', gmt_created='{self.gmt_created}', gmt_modified='{self.gmt_modified}', questions='{self.questions}')"
|
|
|
|
def to_dict(self):
|
|
return {
|
|
"__tablename__": self.__tablename__,
|
|
"id": self.id,
|
|
"doc_name": self.doc_name,
|
|
"doc_type": self.doc_type,
|
|
"doc_token": self.doc_token,
|
|
"space": self.space,
|
|
"chunk_size": self.chunk_size,
|
|
"status": self.status,
|
|
"last_sync": self.last_sync,
|
|
"content": self.content,
|
|
"result": self.result,
|
|
"vector_ids": self.vector_ids,
|
|
"summary": self.summary,
|
|
"gmt_create": self.gmt_created,
|
|
"gmt_modified": self.gmt_modified,
|
|
"questions": self.questions,
|
|
}
|
|
|
|
|
|
class KnowledgeDocumentDao(BaseDao):
|
|
def create_knowledge_document(self, document: KnowledgeDocumentEntity):
|
|
session = self.get_raw_session()
|
|
knowledge_document = KnowledgeDocumentEntity(
|
|
doc_name=document.doc_name,
|
|
doc_type=document.doc_type,
|
|
doc_token=document.doc_token,
|
|
space=document.space,
|
|
chunk_size=0.0,
|
|
status=document.status,
|
|
last_sync=document.last_sync,
|
|
content=document.content or "",
|
|
result=document.result or "",
|
|
summary=document.summary or "",
|
|
vector_ids=document.vector_ids,
|
|
gmt_created=datetime.now(),
|
|
gmt_modified=datetime.now(),
|
|
questions=document.questions,
|
|
)
|
|
session.add(knowledge_document)
|
|
session.commit()
|
|
doc_id = knowledge_document.id
|
|
session.close()
|
|
return doc_id
|
|
|
|
def get_knowledge_documents(self, query, page=1, page_size=20):
|
|
"""Get a list of documents that match the given query.
|
|
Args:
|
|
query: A KnowledgeDocumentEntity object containing the query parameters.
|
|
page: The page number to return.
|
|
page_size: The number of documents to return per page.
|
|
"""
|
|
session = self.get_raw_session()
|
|
print(f"current session:{session}")
|
|
knowledge_documents = session.query(KnowledgeDocumentEntity)
|
|
if query.id is not None:
|
|
knowledge_documents = knowledge_documents.filter(
|
|
KnowledgeDocumentEntity.id == query.id
|
|
)
|
|
if query.doc_name is not None:
|
|
knowledge_documents = knowledge_documents.filter(
|
|
KnowledgeDocumentEntity.doc_name == query.doc_name
|
|
)
|
|
if query.doc_type is not None:
|
|
knowledge_documents = knowledge_documents.filter(
|
|
KnowledgeDocumentEntity.doc_type == query.doc_type
|
|
)
|
|
if query.space is not None:
|
|
knowledge_documents = knowledge_documents.filter(
|
|
KnowledgeDocumentEntity.space == query.space
|
|
)
|
|
if query.status is not None:
|
|
knowledge_documents = knowledge_documents.filter(
|
|
KnowledgeDocumentEntity.status == query.status
|
|
)
|
|
|
|
knowledge_documents = knowledge_documents.order_by(
|
|
KnowledgeDocumentEntity.id.desc()
|
|
)
|
|
knowledge_documents = knowledge_documents.offset((page - 1) * page_size).limit(
|
|
page_size
|
|
)
|
|
result = knowledge_documents.all()
|
|
session.close()
|
|
return result
|
|
|
|
def document_by_id(self, document_id) -> KnowledgeDocumentEntity:
|
|
session = self.get_raw_session()
|
|
query = session.query(KnowledgeDocumentEntity).filter(
|
|
KnowledgeDocumentEntity.id == document_id
|
|
)
|
|
|
|
result = query.first()
|
|
session.close()
|
|
return result
|
|
|
|
def documents_by_ids(self, ids) -> List[KnowledgeDocumentEntity]:
|
|
"""Get a list of documents by their IDs.
|
|
Args:
|
|
ids: A list of document IDs.
|
|
Returns:
|
|
A list of KnowledgeDocumentEntity objects.
|
|
"""
|
|
session = self.get_raw_session()
|
|
print(f"current session:{session}")
|
|
knowledge_documents = session.query(KnowledgeDocumentEntity)
|
|
knowledge_documents = knowledge_documents.filter(
|
|
KnowledgeDocumentEntity.id.in_(ids)
|
|
)
|
|
result = knowledge_documents.all()
|
|
session.close()
|
|
return result
|
|
|
|
def get_documents(self, query):
|
|
session = self.get_raw_session()
|
|
print(f"current session:{session}")
|
|
knowledge_documents = session.query(KnowledgeDocumentEntity)
|
|
if query.id is not None:
|
|
knowledge_documents = knowledge_documents.filter(
|
|
KnowledgeDocumentEntity.id == query.id
|
|
)
|
|
if query.doc_name is not None:
|
|
knowledge_documents = knowledge_documents.filter(
|
|
KnowledgeDocumentEntity.doc_name == query.doc_name
|
|
)
|
|
if query.doc_type is not None:
|
|
knowledge_documents = knowledge_documents.filter(
|
|
KnowledgeDocumentEntity.doc_type == query.doc_type
|
|
)
|
|
if query.space is not None:
|
|
knowledge_documents = knowledge_documents.filter(
|
|
KnowledgeDocumentEntity.space == query.space
|
|
)
|
|
if query.status is not None:
|
|
knowledge_documents = knowledge_documents.filter(
|
|
KnowledgeDocumentEntity.status == query.status
|
|
)
|
|
|
|
knowledge_documents = knowledge_documents.order_by(
|
|
KnowledgeDocumentEntity.id.desc()
|
|
)
|
|
result = knowledge_documents.all()
|
|
session.close()
|
|
return result
|
|
|
|
def get_knowledge_documents_count_bulk(self, space_names):
|
|
session = self.get_raw_session()
|
|
"""
|
|
Perform a batch query to count the number of documents for each knowledge space.
|
|
|
|
Args:
|
|
space_names: A list of knowledge space names to query for document counts.
|
|
session: A SQLAlchemy session object.
|
|
|
|
Returns:
|
|
A dictionary mapping each space name to its document count.
|
|
"""
|
|
counts_query = (
|
|
session.query(
|
|
KnowledgeDocumentEntity.space,
|
|
func.count(KnowledgeDocumentEntity.id).label("document_count"),
|
|
)
|
|
.filter(KnowledgeDocumentEntity.space.in_(space_names))
|
|
.group_by(KnowledgeDocumentEntity.space)
|
|
)
|
|
|
|
results = counts_query.all()
|
|
docs_count = {result.space: result.document_count for result in results}
|
|
session.close()
|
|
return docs_count
|
|
|
|
def get_knowledge_documents_count_bulk_by_ids(self, spaces):
|
|
session = self.get_raw_session()
|
|
"""
|
|
Perform a batch query to count the number of documents for each knowledge space.
|
|
|
|
Args:
|
|
spaces: A list of knowledge space names to query for document counts.
|
|
session: A SQLAlchemy session object.
|
|
|
|
Returns:
|
|
A dictionary mapping each space name to its document count.
|
|
"""
|
|
# build the group by query
|
|
counts_query = (
|
|
session.query(
|
|
KnowledgeDocumentEntity.space,
|
|
func.count(KnowledgeDocumentEntity.id).label("document_count"),
|
|
)
|
|
.filter(KnowledgeDocumentEntity.space.in_(spaces))
|
|
.group_by(KnowledgeDocumentEntity.space)
|
|
)
|
|
|
|
results = counts_query.all()
|
|
docs_count = {result.space: result.document_count for result in results}
|
|
session.close()
|
|
return docs_count
|
|
|
|
def get_knowledge_documents_count(self, query):
|
|
session = self.get_raw_session()
|
|
knowledge_documents = session.query(func.count(KnowledgeDocumentEntity.id))
|
|
if query.id is not None:
|
|
knowledge_documents = knowledge_documents.filter(
|
|
KnowledgeDocumentEntity.id == query.id
|
|
)
|
|
if query.doc_name is not None:
|
|
knowledge_documents = knowledge_documents.filter(
|
|
KnowledgeDocumentEntity.doc_name == query.doc_name
|
|
)
|
|
if query.doc_type is not None:
|
|
knowledge_documents = knowledge_documents.filter(
|
|
KnowledgeDocumentEntity.doc_type == query.doc_type
|
|
)
|
|
if query.space is not None:
|
|
knowledge_documents = knowledge_documents.filter(
|
|
KnowledgeDocumentEntity.space == query.space
|
|
)
|
|
if query.status is not None:
|
|
knowledge_documents = knowledge_documents.filter(
|
|
KnowledgeDocumentEntity.status == query.status
|
|
)
|
|
count = knowledge_documents.scalar()
|
|
session.close()
|
|
return count
|
|
|
|
def update_set_space_id(self, space, space_id):
|
|
session = self.get_raw_session()
|
|
knowledge_documents = session.query(KnowledgeDocumentEntity)
|
|
if space is not None:
|
|
knowledge_documents.filter(KnowledgeDocumentEntity.space == space).filter(
|
|
KnowledgeDocumentEntity.id == None
|
|
).update({KnowledgeDocumentEntity.id: space_id}, synchronize_session=False)
|
|
session.commit()
|
|
session.close()
|
|
|
|
def update_knowledge_document(self, document: KnowledgeDocumentEntity):
|
|
try:
|
|
session = self.get_raw_session()
|
|
updated_space = session.merge(document)
|
|
session.commit()
|
|
return updated_space.id
|
|
finally:
|
|
session.close()
|
|
|
|
def raw_delete(self, query: KnowledgeDocumentEntity):
|
|
session = self.get_raw_session()
|
|
knowledge_documents = session.query(KnowledgeDocumentEntity)
|
|
if query.id is not None:
|
|
knowledge_documents = knowledge_documents.filter(
|
|
KnowledgeDocumentEntity.id == query.id
|
|
)
|
|
if query.doc_name is not None:
|
|
knowledge_documents = knowledge_documents.filter(
|
|
KnowledgeDocumentEntity.doc_name == query.doc_name
|
|
)
|
|
if query.space is not None:
|
|
knowledge_documents = knowledge_documents.filter(
|
|
KnowledgeDocumentEntity.space == query.space
|
|
)
|
|
knowledge_documents.delete()
|
|
session.commit()
|
|
session.close()
|
|
|
|
def get_list_page(
|
|
self, query_request: QUERY_SPEC, page: int, page_size: int
|
|
) -> PaginationResult[RES]:
|
|
"""Get a page of entity objects.
|
|
|
|
Args:
|
|
query_request (REQ): The request schema object or dict for query.
|
|
page (int): The page number.
|
|
page_size (int): The page size.
|
|
|
|
Returns:
|
|
PaginationResult: The pagination result.
|
|
"""
|
|
with self.session() as session:
|
|
query = self._create_query_object(session, query_request)
|
|
total_count = query.count()
|
|
items = (
|
|
query.order_by(KnowledgeDocumentEntity.id.desc())
|
|
.offset((page - 1) * page_size)
|
|
.limit(page_size)
|
|
)
|
|
items = [self.to_response(item) for item in items]
|
|
total_pages = (total_count + page_size - 1) // page_size
|
|
|
|
return PaginationResult(
|
|
items=items,
|
|
total_count=total_count,
|
|
total_pages=total_pages,
|
|
page=page,
|
|
page_size=page_size,
|
|
)
|
|
|
|
def from_request(
|
|
self, request: Union[ServeRequest, Dict[str, Any]]
|
|
) -> KnowledgeDocumentEntity:
|
|
"""Convert the request to an entity
|
|
|
|
Args:
|
|
request (Union[ServeRequest, Dict[str, Any]]): The request
|
|
|
|
Returns:
|
|
T: The entity
|
|
"""
|
|
request_dict = (
|
|
request.dict() if isinstance(request, DocumentServeRequest) else request
|
|
)
|
|
entity = KnowledgeDocumentEntity(**request_dict)
|
|
return entity
|
|
|
|
def to_request(self, entity: KnowledgeDocumentEntity) -> DocumentServeResponse:
|
|
"""Convert the entity to a request
|
|
|
|
Args:
|
|
entity (T): The entity
|
|
|
|
Returns:
|
|
REQ: The request
|
|
"""
|
|
return DocumentServeResponse(
|
|
id=entity.id,
|
|
doc_name=entity.doc_name,
|
|
doc_type=entity.doc_type,
|
|
space=entity.space,
|
|
chunk_size=entity.chunk_size,
|
|
status=entity.status,
|
|
last_sync=entity.last_sync,
|
|
content=entity.content,
|
|
result=entity.result,
|
|
vector_ids=entity.vector_ids,
|
|
summary=entity.summary,
|
|
questions=entity.questions,
|
|
gmt_created=entity.gmt_created,
|
|
gmt_modified=entity.gmt_modified,
|
|
)
|
|
|
|
def to_response(self, entity: KnowledgeDocumentEntity) -> DocumentServeResponse:
|
|
"""Convert the entity to a response
|
|
|
|
Args:
|
|
entity (T): The entity
|
|
|
|
Returns:
|
|
REQ: The request
|
|
"""
|
|
return DocumentServeResponse(
|
|
id=entity.id,
|
|
doc_name=entity.doc_name,
|
|
doc_type=entity.doc_type,
|
|
space=entity.space,
|
|
chunk_size=entity.chunk_size,
|
|
status=entity.status,
|
|
last_sync=str(entity.last_sync),
|
|
content=entity.content,
|
|
result=entity.result,
|
|
vector_ids=entity.vector_ids,
|
|
summary=entity.summary,
|
|
questions=entity.questions,
|
|
gmt_created=str(entity.gmt_created),
|
|
gmt_modified=str(entity.gmt_modified),
|
|
)
|
|
|
|
def from_response(
|
|
self, response: Union[DocumentServeResponse, Dict[str, Any]]
|
|
) -> KnowledgeDocumentEntity:
|
|
"""Convert the request to an entity
|
|
|
|
Args:
|
|
request (Union[ServeRequest, Dict[str, Any]]): The request
|
|
|
|
Returns:
|
|
T: The entity
|
|
"""
|
|
response_dict = (
|
|
response.dict() if isinstance(response, DocumentServeResponse) else response
|
|
)
|
|
entity = KnowledgeDocumentEntity(**response_dict)
|
|
return entity
|