import os
from typing import Optional

import markdown
from bs4 import BeautifulSoup
from langchain.document_loaders import PyPDFLoader, TextLoader
from langchain.embeddings import HuggingFaceEmbeddings

from pilot.configs.config import Config
from pilot.configs.model_config import DATASETS_DIR, KNOWLEDGE_CHUNK_SPLIT_SIZE
from pilot.source_embedding.chn_document_splitter import CHNDocumentSplitter
from pilot.source_embedding.csv_embedding import CSVEmbedding
from pilot.source_embedding.markdown_embedding import MarkdownEmbedding
from pilot.source_embedding.pdf_embedding import PDFEmbedding
from pilot.source_embedding.url_embedding import URLEmbedding
from pilot.source_embedding.word_embedding import WordEmbedding
from pilot.vector_store.connector import VectorStoreConnector

CFG = Config()

# Map each supported file extension to its embedding class and extra
# constructor kwargs. Plain-text files reuse the Markdown pipeline.
KnowledgeEmbeddingType = {
    ".txt": (MarkdownEmbedding, {}),
    ".md": (MarkdownEmbedding, {}),
    ".pdf": (PDFEmbedding, {}),
    ".doc": (WordEmbedding, {}),
    ".docx": (WordEmbedding, {}),
    ".csv": (CSVEmbedding, {}),
}
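
# Illustrative lookup, not executed at import time: resolving the
# embedding class for a PDF file.
#
#     knowledge_class, knowledge_args = KnowledgeEmbeddingType[".pdf"]
#     # knowledge_class -> PDFEmbedding, knowledge_args -> {}
#
# init_knowledge_embedding() below performs this lookup before
# instantiating the class with the file path, model name, and vector
# store config.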

class KnowledgeEmbedding:
    def __init__(
        self,
        model_name,
        vector_store_config,
        file_type: Optional[str] = "default",
        file_path: Optional[str] = None,
    ):
        """Initialize with the embedding model name, the vector store
        config, and the knowledge source: a local file path, or a URL
        when file_type is "url"."""
        self.file_path = file_path
        self.model_name = model_name
        self.vector_store_config = vector_store_config
        self.file_type = file_type
        self.embeddings = HuggingFaceEmbeddings(model_name=self.model_name)
        self.vector_store_config["embeddings"] = self.embeddings

    def knowledge_embedding(self):
        """Embed the configured knowledge source into the vector store."""
        self.knowledge_embedding_client = self.init_knowledge_embedding()
        self.knowledge_embedding_client.source_embedding()

    def knowledge_embedding_batch(self):
        """Embed the knowledge source in batch mode. The client is
        initialized here so this method also works when called first."""
        self.knowledge_embedding_client = self.init_knowledge_embedding()
        self.knowledge_embedding_client.batch_embedding()

    def init_knowledge_embedding(self):
        """Create the embedding client that matches the knowledge source."""
        if self.file_type == "url":
            embedding = URLEmbedding(
                file_path=self.file_path,
                model_name=self.model_name,
                vector_store_config=self.vector_store_config,
            )
            return embedding
        extension = "." + self.file_path.rsplit(".", 1)[-1]
        if extension in KnowledgeEmbeddingType:
            knowledge_class, knowledge_args = KnowledgeEmbeddingType[extension]
            embedding = knowledge_class(
                self.file_path,
                model_name=self.model_name,
                vector_store_config=self.vector_store_config,
                **knowledge_args,
            )
            return embedding
        raise ValueError(f"Unsupported knowledge file type '{extension}'")

    def similar_search(self, text, topk):
        """Return the topk documents most similar to the query text."""
        vector_client = VectorStoreConnector(
            CFG.VECTOR_STORE_TYPE, self.vector_store_config
        )
        return vector_client.similar_search(text, topk)

    def vector_exist(self):
        """Check whether the configured vector store name already exists."""
        vector_client = VectorStoreConnector(
            CFG.VECTOR_STORE_TYPE, self.vector_store_config
        )
        return vector_client.vector_name_exists()
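
    # Typical flow (a sketch; the vector_store_config keys and file path
    # are deployment-specific assumptions):
    #
    #     ke = KnowledgeEmbedding(model_name, vector_store_config,
    #                             file_path="docs/example.md")
    #     if not ke.vector_exist():
    #         ke.knowledge_embedding()
    #     docs = ke.similar_search("query", topk=5)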

    def knowledge_persist_initialization(self, append_mode):
        """Load every file under the knowledge path and persist the
        resulting documents to the vector store."""
        documents = self._load_knowledge(self.file_path)
        self.vector_client = VectorStoreConnector(
            CFG.VECTOR_STORE_TYPE, self.vector_store_config
        )
        self.vector_client.load_document(documents)
        return self.vector_client

    def _load_knowledge(self, path):
        """Walk the knowledge directory and load every file it contains."""
        documents = []
        for root, _, files in os.walk(path, topdown=False):
            for file in files:
                filename = os.path.join(root, file)
                docs = self._load_file(filename)
                new_docs = []
                for doc in docs:
                    # Strip the datasets prefix so the recorded source is
                    # a dataset-relative path.
                    doc.metadata = {
                        "source": doc.metadata["source"].replace(DATASETS_DIR, "")
                    }
                    print("embedding doc...", doc.metadata)
                    new_docs.append(doc)
                documents += new_docs
        return documents

    def _load_file(self, filename):
        """Load a single file and split it into documents."""
        if filename.lower().endswith(".md"):
            loader = TextLoader(filename)
            text_splitter = CHNDocumentSplitter(
                pdf=True, sentence_size=KNOWLEDGE_CHUNK_SPLIT_SIZE
            )
            docs = loader.load_and_split(text_splitter)
            for i, d in enumerate(docs):
                # Render the markdown to HTML, strip unwanted tags
                # (e.g. <meta>), and keep only the visible text.
                content = markdown.markdown(d.page_content)
                soup = BeautifulSoup(content, "html.parser")
                for tag in soup(["!doctype", "meta", "i.fa"]):
                    tag.extract()
                docs[i].page_content = soup.get_text().replace("\n", " ")
        elif filename.lower().endswith(".pdf"):
            loader = PyPDFLoader(filename)
            text_splitter = CHNDocumentSplitter(
                pdf=True, sentence_size=KNOWLEDGE_CHUNK_SPLIT_SIZE
            )
            docs = loader.load_and_split(text_splitter)
            for i, d in enumerate(docs):
                # Drop newlines and U+FFFD replacement characters that
                # PDF extraction sometimes emits for unmappable glyphs.
                docs[i].page_content = d.page_content.replace("\n", " ").replace(
                    "\ufffd", ""
                )
        else:
            loader = TextLoader(filename)
            text_splitter = CHNDocumentSplitter(sentence_size=KNOWLEDGE_CHUNK_SPLIT_SIZE)
            docs = loader.load_and_split(text_splitter)
        return docs
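

if __name__ == "__main__":
    # Minimal usage sketch. The model name, vector store config keys, and
    # file path below are illustrative assumptions, not values shipped
    # with this module; adjust them to your deployment.
    config = {"vector_store_name": "default"}  # hypothetical config keys
    client = KnowledgeEmbedding(
        model_name="GanymedeNil/text2vec-large-chinese",  # hypothetical model
        vector_store_config=config,
        file_path=os.path.join(DATASETS_DIR, "example.md"),  # hypothetical file
    )
    client.knowledge_embedding()
    print(client.similar_search("What is DB-GPT?", topk=5))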