DB-GPT/dbgpt/app/knowledge/chunk_db.py
明天 b124ecc10b
feat: (0.6)New UI (#1855)
Co-authored-by: 夏姜 <wenfengjiang.jwf@digital-engine.com>
Co-authored-by: aries_ckt <916701291@qq.com>
Co-authored-by: wb-lh513319 <wb-lh513319@alibaba-inc.com>
Co-authored-by: csunny <cfqsunny@163.com>
2024-08-21 17:37:45 +08:00

249 lines
8.7 KiB
Python

from datetime import datetime
from typing import Any, Dict, List, Union
from sqlalchemy import Column, DateTime, Integer, String, Text, func, not_
from dbgpt._private.config import Config
from dbgpt.serve.rag.api.schemas import ChunkServeRequest, ChunkServeResponse
from dbgpt.storage.metadata import BaseDao, Model
# Module-level Config instance, created when this module is imported.
# NOTE(review): none of the definitions visible in this module read CFG —
# confirm whether other modules import it before removing.
CFG = Config()
class DocumentChunkEntity(Model):
    """ORM mapping for a single row of the ``document_chunk`` table."""

    __tablename__ = "document_chunk"

    # Primary key of the chunk row.
    id = Column(Integer, primary_key=True)
    # Integer id of the owning document (no foreign-key constraint is declared).
    document_id = Column(Integer)
    doc_name = Column(String(100))
    doc_type = Column(String(100))
    content = Column(Text)
    questions = Column(Text)
    meta_info = Column(String(500))
    # Creation / last-modification timestamps.
    gmt_created = Column(DateTime)
    gmt_modified = Column(DateTime)

    def __repr__(self):
        """Return a debug string that lists every column value."""
        return (
            f"DocumentChunkEntity(id={self.id}, doc_name='{self.doc_name}', "
            f"doc_type='{self.doc_type}', document_id='{self.document_id}', "
            f"content='{self.content}', questions='{self.questions}', "
            f"meta_info='{self.meta_info}', gmt_created='{self.gmt_created}', "
            f"gmt_modified='{self.gmt_modified}')"
        )

    def to_dict(self):
        """Return the column values as a plain ``dict`` keyed by column name."""
        # Tuple order fixes the key order of the resulting dict.
        column_names = (
            "id",
            "document_id",
            "doc_name",
            "doc_type",
            "content",
            "questions",
            "meta_info",
            "gmt_created",
            "gmt_modified",
        )
        return {name: getattr(self, name) for name in column_names}
class DocumentChunkDao(BaseDao):
    """Data-access object for ``document_chunk`` rows.

    Each query method obtains its own session from ``get_raw_session()`` and
    closes it in a ``finally`` clause, so the session is released even when
    the database call raises.
    """

    @staticmethod
    def _apply_query_filters(statement, query: DocumentChunkEntity, document_ids=None):
        """Narrow ``statement`` by every non-``None`` field of ``query``.

        Shared by the list and count queries so that both always apply the
        same conditions.

        Args:
            statement: A SQLAlchemy query over ``DocumentChunkEntity``.
            query (DocumentChunkEntity): Example entity; ``id``,
                ``document_id``, ``doc_type``, ``doc_name`` and ``meta_info``
                are matched exactly, ``content`` as a ``LIKE '%...%'``
                substring.
            document_ids: Optional collection of document ids; when given,
                only chunks whose ``document_id`` is in it are kept.

        Returns:
            The filtered query.
        """
        if query.id is not None:
            statement = statement.filter(DocumentChunkEntity.id == query.id)
        if query.document_id is not None:
            statement = statement.filter(
                DocumentChunkEntity.document_id == query.document_id
            )
        if query.doc_type is not None:
            statement = statement.filter(
                DocumentChunkEntity.doc_type == query.doc_type
            )
        if query.content is not None:
            # NOTE: "%" / "_" inside query.content act as LIKE wildcards.
            statement = statement.filter(
                DocumentChunkEntity.content.like(f"%{query.content}%")
            )
        if query.doc_name is not None:
            statement = statement.filter(
                DocumentChunkEntity.doc_name == query.doc_name
            )
        if query.meta_info is not None:
            statement = statement.filter(
                DocumentChunkEntity.meta_info == query.meta_info
            )
        if document_ids is not None:
            statement = statement.filter(
                DocumentChunkEntity.document_id.in_(document_ids)
            )
        return statement

    def create_documents_chunks(self, documents: List):
        """Insert one chunk row per item of ``documents`` in a single commit.

        Args:
            documents (List): Objects exposing ``doc_name``, ``doc_type``,
                ``document_id``, ``content`` and ``meta_info`` attributes.
                Falsy ``content`` / ``meta_info`` values are stored as ``""``.
                ``questions`` is not copied from the input.
        """
        session = self.get_raw_session()
        try:
            # Sample the clock once so gmt_created == gmt_modified for every
            # row written by this call.
            now = datetime.now()
            docs = [
                DocumentChunkEntity(
                    doc_name=document.doc_name,
                    doc_type=document.doc_type,
                    document_id=document.document_id,
                    content=document.content or "",
                    meta_info=document.meta_info or "",
                    gmt_created=now,
                    gmt_modified=now,
                )
                for document in documents
            ]
            session.add_all(docs)
            session.commit()
        finally:
            session.close()

    def get_document_chunks(
        self, query: DocumentChunkEntity, page=1, page_size=20, document_ids=None
    ):
        """Return one page of chunks matching ``query``, ordered by ascending id.

        Args:
            query (DocumentChunkEntity): Example entity; every non-``None``
                field becomes a filter (``content`` is a substring match).
            page (int): 1-based page number.
            page_size (int): Maximum number of rows to return.
            document_ids: Optional collection of document ids to restrict to.

        Returns:
            List[DocumentChunkEntity]: The rows of the requested page.
        """
        session = self.get_raw_session()
        try:
            document_chunks = self._apply_query_filters(
                session.query(DocumentChunkEntity), query, document_ids
            )
            return (
                document_chunks.order_by(DocumentChunkEntity.id.asc())
                .offset((page - 1) * page_size)
                .limit(page_size)
                .all()
            )
        finally:
            session.close()

    def get_chunks_with_questions(self, query: DocumentChunkEntity, document_ids=None):
        """Return every chunk whose ``questions`` column is not NULL.

        Unlike ``get_document_chunks`` this is not paginated and only the
        ``doc_name`` and ``meta_info`` fields of ``query`` are used.

        Args:
            query (DocumentChunkEntity): Example entity supplying optional
                ``doc_name`` / ``meta_info`` exact-match filters.
            document_ids: Optional collection of document ids to restrict to.

        Returns:
            List[DocumentChunkEntity]: Matching rows ordered by ascending id.
        """
        session = self.get_raw_session()
        try:
            document_chunks = session.query(DocumentChunkEntity)
            if query.doc_name is not None:
                document_chunks = document_chunks.filter(
                    DocumentChunkEntity.doc_name == query.doc_name
                )
            if query.meta_info is not None:
                document_chunks = document_chunks.filter(
                    DocumentChunkEntity.meta_info == query.meta_info
                )
            # Renders "questions IS NOT NULL".
            document_chunks = document_chunks.filter(
                DocumentChunkEntity.questions.isnot(None)
            )
            if document_ids is not None:
                document_chunks = document_chunks.filter(
                    DocumentChunkEntity.document_id.in_(document_ids)
                )
            return document_chunks.order_by(DocumentChunkEntity.id.asc()).all()
        finally:
            session.close()

    def update_chunk(self, chunk: DocumentChunkEntity):
        """Merge ``chunk`` into the database and commit.

        Args:
            chunk (DocumentChunkEntity): The entity state to persist.

        Returns:
            The ``id`` of the merged row.
        """
        # Acquire the session outside ``try`` so a failure here is reported
        # as-is instead of being masked by an unbound name in ``finally``.
        session = self.get_raw_session()
        try:
            updated = session.merge(chunk)
            session.commit()
            # Read the id before the session is closed in ``finally``.
            return updated.id
        finally:
            session.close()

    def get_document_chunks_count(self, query: DocumentChunkEntity, document_ids=None):
        """Count the chunks matching ``query``.

        Applies the same filters as ``get_document_chunks`` (including the
        ``content`` substring match) so the total agrees with the paged list.

        Args:
            query (DocumentChunkEntity): Example entity; every non-``None``
                field becomes a filter.
            document_ids: Optional collection of document ids to restrict to.

        Returns:
            The number of matching rows.
        """
        session = self.get_raw_session()
        try:
            document_chunks = self._apply_query_filters(
                session.query(func.count(DocumentChunkEntity.id)),
                query,
                document_ids,
            )
            return document_chunks.scalar()
        finally:
            session.close()

    def raw_delete(self, document_id: int):
        """Delete every chunk that belongs to ``document_id`` and commit.

        Args:
            document_id (int): Id of the document whose chunks are removed.

        Raises:
            ValueError: If ``document_id`` is ``None``.
        """
        # Validate before opening a session so the early exit leaks nothing.
        if document_id is None:
            raise ValueError("document_id is None")
        session = self.get_raw_session()
        try:
            session.query(DocumentChunkEntity).filter(
                DocumentChunkEntity.document_id == document_id
            ).delete()
            session.commit()
        finally:
            session.close()

    def from_request(
        self, request: Union[ChunkServeRequest, Dict[str, Any]]
    ) -> DocumentChunkEntity:
        """Convert the request to an entity

        Args:
            request (Union[ChunkServeRequest, Dict[str, Any]]): The request,
                or a dict whose keys are entity column names.

        Returns:
            DocumentChunkEntity: A new, unsaved entity.
        """
        request_dict = (
            request.dict() if isinstance(request, ChunkServeRequest) else request
        )
        return DocumentChunkEntity(**request_dict)

    def to_request(self, entity: DocumentChunkEntity) -> ChunkServeRequest:
        """Convert the entity to a request

        Args:
            entity (DocumentChunkEntity): The entity

        Returns:
            ChunkServeRequest: The request, with every column copied as-is.
        """
        return ChunkServeRequest(
            id=entity.id,
            doc_name=entity.doc_name,
            doc_type=entity.doc_type,
            document_id=entity.document_id,
            content=entity.content,
            questions=entity.questions,
            meta_info=entity.meta_info,
            gmt_created=entity.gmt_created,
            gmt_modified=entity.gmt_modified,
        )

    def to_response(self, entity: DocumentChunkEntity) -> ChunkServeResponse:
        """Convert the entity to a response

        Args:
            entity (DocumentChunkEntity): The entity

        Returns:
            ChunkServeResponse: The response. Both timestamps are passed
                through ``str()``, so an unset timestamp becomes ``"None"``.
        """
        return ChunkServeResponse(
            id=entity.id,
            doc_name=entity.doc_name,
            doc_type=entity.doc_type,
            document_id=entity.document_id,
            content=entity.content,
            questions=entity.questions,
            meta_info=entity.meta_info,
            gmt_created=str(entity.gmt_created),
            gmt_modified=str(entity.gmt_modified),
        )

    def from_response(
        self, response: Union[ChunkServeResponse, Dict[str, Any]]
    ) -> DocumentChunkEntity:
        """Convert the response to an entity

        Args:
            response (Union[ChunkServeResponse, Dict[str, Any]]): The
                response, or a dict whose keys are entity column names.

        Returns:
            DocumentChunkEntity: A new, unsaved entity.
        """
        response_dict = (
            response.dict() if isinstance(response, ChunkServeResponse) else response
        )
        return DocumentChunkEntity(**response_dict)