diff --git a/dbgpt/rag/knowledge/pdf.py b/dbgpt/rag/knowledge/pdf.py
index 0701e653a..eff1d00d8 100644
--- a/dbgpt/rag/knowledge/pdf.py
+++ b/dbgpt/rag/knowledge/pdf.py
@@ -1,6 +1,11 @@
 """PDF Knowledge."""
+import ast
+import json
+import os
+import re
+from collections import defaultdict
 from typing import Any, Dict, List, Optional, Union
 
+from dbgpt.component import logger
 from dbgpt.core import Document
 from dbgpt.rag.knowledge.base import (
     ChunkStrategy,
@@ -38,44 +43,213 @@ class PDFKnowledge(Knowledge):
             **kwargs,
         )
         self._language = language
+        self._pdf_processor = PDFProcessor(filepath=self._path)
+        self.all_title: List[dict] = []
+        self.all_text: List[dict] = []
+
+    def process_text_data(self):
+        """Collect level-1 and level-2 titles from the extracted text."""
+        for i, data in enumerate(self.all_text):
+            inside_content = data.get("inside")
+            content_type = data.get("type")
+            if content_type == "text":
+                # Match level-1 titles such as "§1总则", level-2 titles such
+                # as "1.1范围", and bare section numbers such as "§1".
+                first_level_match = re.match(
+                    r"§(\d+)([\u4e00-\u9fa5]+)", inside_content.strip()
+                )
+                second_level_match = re.match(
+                    r"(\d+\.\d+)([\u4e00-\u9fa5]+)", inside_content.strip()
+                )
+                first_num_match = re.match(r"^§(\d+)$", inside_content.strip())
+                # all level-1 titles collected so far
+                title_name = [
+                    dictionary["first_title"]
+                    for dictionary in self.all_title
+                    if "first_title" in dictionary
+                ]
+                if first_level_match:
+                    first_title_text = first_level_match.group(2)
+                    first_title_num = first_level_match.group(1)
+                    first_title = first_title_num + first_title_text
+                    # add the title only if it is new and its number directly
+                    # follows the previous level-1 title
+                    if first_title not in title_name and (
+                        int(first_title_num) == 1
+                        or (
+                            self.all_title
+                            and int(first_title_num) - int(self.all_title[-1]["id"])
+                            == 1
+                        )
+                    ):
+                        current_entry = {
+                            "id": first_title_num,
+                            "first_title": first_title,
+                            "second_title": [],
+                            "table": [],
+                        }
+                        self.all_title.append(current_entry)
+
+                elif second_level_match:
+                    second_title_name = second_level_match.group(0)
+                    second_title = second_level_match.group(1)
+                    first_title = second_title.split(".")[0]
+                    # skip level-2 titles whose level-1 parent is unknown
+                    if not 0 <= int(first_title) - 1 < len(self.all_title):
+                        continue
+                    titles = [
+                        sub_item["title"]
+                        for sub_item in self.all_title[int(first_title) - 1][
+                            "second_title"
+                        ]
+                    ]
+                    if second_title_name not in titles:
+                        self.all_title[int(first_title) - 1]["second_title"].append(
+                            {"title": second_title_name, "table": []}
+                        )
+                elif first_num_match and i + 1 < len(self.all_text):
+                    # a bare "§n" line: the title text sits on the next line
+                    first_num = first_num_match.group(1)
+                    first_text = self.all_text[i + 1].get("inside")
+                    first_title = first_num + first_text
+                    # skip table-of-contents entries ("..."); otherwise add
+                    # the title if it is new and directly follows the
+                    # previous one
+                    if (
+                        "..." not in first_text
+                        and first_title not in title_name
+                        and (
+                            int(first_num) == 1
+                            or (
+                                self.all_title
+                                and int(first_num) - int(self.all_title[-1]["id"])
+                                == 1
+                            )
+                        )
+                    ):
+                        current_entry = {
+                            "id": first_num,
+                            "first_title": first_title,
+                            "second_title": [],
+                            "table": [],
+                        }
+                        self.all_title.append(current_entry)
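+
+    # Illustrative sketch (hypothetical input lines) of the structure that
+    # process_text_data builds:
+    #   "§1总则"  -> all_title gains {"id": "1", "first_title": "1总则",
+    #                "second_title": [], "table": []}
+    #   "1.1范围" -> all_title[0]["second_title"] gains
+    #                {"title": "1.1范围", "table": []}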
 
     def _load(self) -> List[Document]:
         """Load pdf document from loader."""
         if self._loader:
             documents = self._loader.load()
         else:
-            import pypdf
+            self._pdf_processor.pdf_to_json()
+            file_title = self._path.rsplit("/", 1)[-1].replace(".pdf", "")
+            self.all_text = list(self._pdf_processor.all_text.values())
+            self.process_text_data()
+            temp_table = []
+            temp_title = None
+            page_documents = []
+            merged_data: Dict[int, Dict[str, Any]] = {}
+            for i, data in enumerate(self.all_text):
+                content_type = data.get("type")
+                inside_content = data.get("inside")
+                page = data.get("page")
-            pages = []
-            documents = []
-            if not self._path:
-                raise ValueError("file path is required")
-            with open(self._path, "rb") as file:
-                reader = pypdf.PdfReader(file)
-                for page_num in range(len(reader.pages)):
-                    _page = reader.pages[page_num]
-                    pages.append((_page.extract_text(), page_num))
-
-            # cleaned_pages = []
-            for page, page_num in pages:
-                lines = page.splitlines()
-
-                cleaned_lines = []
-                for line in lines:
-                    if self._language == "en":
-                        words = list(line)  # noqa: F841
+                if content_type == "excel":
+                    temp_table.append(inside_content)
+                    if temp_title is None:
+                        # walk backwards to the nearest preceding text line
+                        # and use it as the table title
+                        for j in range(i - 1, -1, -1):
+                            if self.all_text[j]["type"] == "excel":
+                                break
+                            if self.all_text[j]["type"] == "text":
+                                temp_title = self.all_text[j]["inside"].strip()
+                                break
+                elif content_type == "text":
+                    if page in merged_data:
+                        # merge text that belongs to the same page
+                        merged_data[page]["inside_content"] += " " + inside_content
                     else:
-                        words = line.split()  # noqa: F841
-                    cleaned_lines.append(line)
-                page = "\n".join(cleaned_lines)
-                # cleaned_pages.append(page)
-                metadata = {"source": self._path, "page": page_num}
-                if self._metadata:
-                    metadata.update(self._metadata)  # type: ignore
-                # text = "\f".join(cleaned_pages)
-                document = Document(content=page, metadata=metadata)
-                documents.append(document)
-            return documents
+                        merged_data[page] = {
+                            "inside_content": inside_content,
+                            "type": "text",
+                        }
+
+                    # a text line ends the current table: render it as markdown
+                    if temp_table:
+                        table_meta = {
+                            "title": temp_title or temp_table[0],
+                            "type": "excel",
+                        }
+                        self.all_title.append(table_meta)
+
+                        # rows were serialized with str(list); parse them back
+                        header = ast.literal_eval(temp_table[0])
+                        rows = [ast.literal_eval(entry) for entry in temp_table[1:]]
+                        markdown_output = "| " + " | ".join(header) + " |\n"
+                        markdown_output += (
+                            "| " + " | ".join(["---"] * len(header)) + " |\n"
+                        )
+                        for row in rows:
+                            markdown_output += "| " + " | ".join(row) + " |\n"
+
+                        # attach the rendered table to the page content
+                        merged_data[page]["excel_content"] = temp_table
+                        merged_data[page]["markdown_output"] = markdown_output
+
+                        temp_title = None
+                        temp_table = []
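+
+                        # e.g. rows serialized as "['Item', 'Amount']" and
+                        # "['Assets', '100']" (hypothetical data) render as:
+                        #   | Item | Amount |
+                        #   | --- | --- |
+                        #   | Assets | 100 |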
".join(header) + " |\n" + markdown_output += "| " + " | ".join(["---"] * len(header)) + " |\n" + for row in markdown_tables[1:]: + markdown_output += "| " + " | ".join(row) + " |\n" + # merged content + merged_data[page]["excel_content"] = temp_table + merged_data[page]["markdown_output"] = markdown_output + + for page, content in merged_data.items(): + inside_content = content["inside_content"] + if "markdown_output" in content: + markdown_content = content["markdown_output"] + content_metadata = { + "page": page, + "type": "excel", + "title": file_title, + } + page_documents.append( + Document( + content=inside_content + "\n" + markdown_content, + metadata=content_metadata, + ) + ) + else: + content_metadata = { + "page": page, + "type": "text", + "title": file_title, + } + page_documents.append( + Document(content=inside_content, metadata=content_metadata) + ) + + return page_documents return [Document.langchain2doc(lc_document) for lc_document in documents] @classmethod @@ -101,3 +275,236 @@ class PDFKnowledge(Knowledge): def document_type(cls) -> DocumentType: """Document type of PDF.""" return DocumentType.PDF + + +class PDFProcessor: + """PDFProcessor class.""" + + def __init__(self, filepath): + """Initialize PDFProcessor class.""" + self.filepath = filepath + try: + import pdfplumber # type: ignore + except ImportError: + raise ImportError("Please install pdfplumber first.") + self.pdf = pdfplumber.open(filepath) + self.all_text = defaultdict(dict) + self.allrow = 0 + self.last_num = 0 + + def check_lines(self, page, top, buttom): + """Check lines.""" + lines = page.extract_words()[::] + text = "" + last_top = 0 + last_check = 0 + for line in range(len(lines)): + each_line = lines[line] + check_re = ( + "(?:。|;|单位:人民币元|金额单位:人民币元|单位:万元|币种:人民币|\d|" + "报告(?:全文)?(?:(修订版)|(修订稿)|(更正后))?)$" + ) + if top == "" and buttom == "": + if abs(last_top - each_line["top"]) <= 2 or ( + last_check > 0 + and (page.height * 0.9 - each_line["top"]) > 0 + and not re.search(check_re, text) + ): + text = text + each_line["text"] + else: + text = text + "\n" + each_line["text"] + elif top == "": + if each_line["top"] > buttom: + if abs(last_top - each_line["top"]) <= 2 or ( + last_check > 0 + and (page.height * 0.85 - each_line["top"]) > 0 + and not re.search(check_re, text) + ): + text = text + each_line["text"] + else: + text = text + "\n" + each_line["text"] + else: + if each_line["top"] < top and each_line["top"] > buttom: + if abs(last_top - each_line["top"]) <= 2 or ( + last_check > 0 + and (page.height * 0.85 - each_line["top"]) > 0 + and not re.search(check_re, text) + ): + text = text + each_line["text"] + else: + text = text + "\n" + each_line["text"] + last_top = each_line["top"] + last_check = each_line["x1"] - page.width * 0.85 + + return text + + def drop_empty_cols(self, data): + """Delete empty column.""" + transposed_data = list(map(list, zip(*data))) + filtered_data = [ + col for col in transposed_data if not all(cell == "" for cell in col) + ] + result = list(map(list, zip(*filtered_data))) + return result + + def extract_text_and_tables(self, page): + """Extract text and tables.""" + buttom = 0 + tables = page.find_tables() + if len(tables) >= 1: + count = len(tables) + for table in tables: + # process text before table + if table.bbox[3] < buttom: + pass + else: + count -= 1 + # process text before table + top = table.bbox[1] + text = self.check_lines(page, top, buttom) + text_list = text.split("\n") + for _t in range(len(text_list)): + self.all_text[self.allrow] = { + "page": 
+
+    def extract_text_and_tables(self, page):
+        """Extract text and tables from one page."""
+        bottom = 0
+        tables = page.find_tables()
+        if len(tables) >= 1:
+            count = len(tables)
+            for table in tables:
+                if table.bbox[3] < bottom:
+                    # table lies in an area that was already consumed
+                    continue
+                count -= 1
+                # process text above the table
+                top = table.bbox[1]
+                text = self.check_lines(page, top, bottom)
+                for line_text in text.split("\n"):
+                    self.all_text[self.allrow] = {
+                        "page": page.page_number,
+                        "allrow": self.allrow,
+                        "type": "text",
+                        "inside": line_text,
+                    }
+                    self.allrow += 1
+
+                # process the table itself
+                bottom = table.bbox[3]
+                new_table = table.extract()
+                # fold continuation rows (first cell None) into the row above
+                r_count = 0
+                for r in range(len(new_table)):
+                    row = new_table[r]
+                    if row[0] is None:
+                        r_count += 1
+                        for c in range(len(row)):
+                            if row[c] is not None and row[c] not in ["", " "]:
+                                if new_table[r - r_count][c] is None:
+                                    new_table[r - r_count][c] = row[c]
+                                else:
+                                    new_table[r - r_count][c] += row[c]
+                                new_table[r][c] = None
+                    else:
+                        r_count = 0
+
+                end_table = []
+                for row in new_table:
+                    if row[0] is not None:
+                        cell_list = []
+                        cell_check = False
+                        for cell in row:
+                            if cell is not None:
+                                cell = cell.replace("\n", "")
+                            else:
+                                cell = ""
+                            if cell != "":
+                                cell_check = True
+                            cell_list.append(cell)
+                        if cell_check:
+                            end_table.append(cell_list)
+
+                end_table = self.drop_empty_cols(end_table)
+
+                # fill empty header cells from their neighbours
+                if len(end_table) > 0:
+                    for i in range(len(end_table[0])):
+                        if end_table[0][i] == "":
+                            if 0 < i < len(end_table[0]) - 1:
+                                # middle column: join left and right names
+                                end_table[0][i] = (
+                                    end_table[0][i - 1] + end_table[0][i + 1]
+                                )
+                            else:
+                                # first column takes the right name, last
+                                # column takes the left name
+                                end_table[0][i] = (
+                                    end_table[0][i - 1]
+                                    if i == len(end_table[0]) - 1
+                                    else end_table[0][i + 1]
+                                )
+
+                # fill empty body cells from the cell to their left
+                for i in range(1, len(end_table)):
+                    for j in range(len(end_table[i])):
+                        if end_table[i][j] == "" and j > 0:
+                            end_table[i][j] = end_table[i][j - 1]
+
+                for row in end_table:
+                    self.all_text[self.allrow] = {
+                        "page": page.page_number,
+                        "allrow": self.allrow,
+                        "type": "excel",
+                        "inside": str(row),
+                    }
+                    self.allrow += 1
+
+            if count == 0:
+                # text below the last table
+                text = self.check_lines(page, "", bottom)
+                for line_text in text.split("\n"):
+                    self.all_text[self.allrow] = {
+                        "page": page.page_number,
+                        "allrow": self.allrow,
+                        "type": "text",
+                        "inside": line_text,
+                    }
+                    self.allrow += 1
+
+        else:
+            # no tables: the whole page is text
+            text = self.check_lines(page, "", "")
+            for line_text in text.split("\n"):
+                self.all_text[self.allrow] = {
+                    "page": page.page_number,
+                    "allrow": self.allrow,
+                    "type": "text",
+                    "inside": line_text,
+                }
+                self.allrow += 1
+
+        # mark the first line of the page as a header ("页眉") and the last
+        # line as a footer ("页脚") when they look like one
+        first_re = r"[^计](?:报告(?:全文)?(?:(修订版)|(修订稿)|(更正后))?)$"
+        end_re = r"^(?:\d|\\|\/|第|共|页|-|_| ){1,}"
+        if self.last_num == 0:
+            try:
+                first_text = str(self.all_text[1]["inside"])
+                end_text = str(self.all_text[len(self.all_text) - 1]["inside"])
+                if re.search(first_re, first_text) and "[" not in end_text:
+                    self.all_text[1]["type"] = "页眉"
+                if re.search(end_re, end_text) and "[" not in end_text:
+                    self.all_text[len(self.all_text) - 1]["type"] = "页脚"
+            except Exception:
+                logger.warning(
+                    f"Header/footer check failed on page {page.page_number}"
+                )
+        else:
+            try:
+                first_text = str(self.all_text[self.last_num + 2]["inside"])
+                end_text = str(self.all_text[len(self.all_text) - 1]["inside"])
+                if re.search(first_re, first_text) and "[" not in end_text:
+                    self.all_text[self.last_num + 2]["type"] = "页眉"
+                if re.search(end_re, end_text) and "[" not in end_text:
+                    self.all_text[len(self.all_text) - 1]["type"] = "页脚"
+            except Exception:
+                logger.warning(
+                    f"Header/footer check failed on page {page.page_number}"
+                )
+
+        self.last_num = len(self.all_text) - 1
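+
+    # Merged-cell handling in extract_text_and_tables, on a hypothetical
+    # table.extract() result:
+    #   [["Item", "Amount"], [None, "(cont.)"]]
+    #   -> continuation rows (first cell None) are folded upward, giving
+    #      [["Item", "Amount(cont.)"]]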
+
+    def pdf_to_json(self):
+        """Extract every page of the PDF."""
+        for i in range(len(self.pdf.pages)):
+            self.extract_text_and_tables(self.pdf.pages[i])
+            logger.info(f"{self.filepath} page {i} extract text success")
+
+    def save_all_text(self, path):
+        """Save all extracted rows to a JSON-lines file."""
+        directory = os.path.dirname(path)
+        if directory and not os.path.exists(directory):
+            os.makedirs(directory)
+        with open(path, "a+", encoding="utf-8") as file:
+            for key in self.all_text.keys():
+                file.write(json.dumps(self.all_text[key], ensure_ascii=False) + "\n")
diff --git a/dbgpt/rag/knowledge/tests/test_pdf.py b/dbgpt/rag/knowledge/tests/test_pdf.py
index ea3c4be99..07096432c 100644
--- a/dbgpt/rag/knowledge/tests/test_pdf.py
+++ b/dbgpt/rag/knowledge/tests/test_pdf.py
@@ -5,8 +5,8 @@ import pytest
 from dbgpt.rag.knowledge.pdf import PDFKnowledge
 
 MOCK_PDF_PAGES = [
-    ("This is the content of the first page.", 0),
-    ("This is the content of the second page.", 1),
+    ("", 0),
+    ("", 1),
 ]
 
 
@@ -19,19 +19,19 @@ def mock_pdf_open_and_reader():
         for page in MOCK_PDF_PAGES
     ]
     with patch("builtins.open", mock_pdf_file):
-        with patch("pypdf.PdfReader", return_value=mock_reader) as mock:
+        with patch("pdfplumber.open", return_value=mock_reader) as mock:
             yield mock
 
 
 def test_load_from_pdf(mock_pdf_open_and_reader):
-    file_path = "test_document.pdf"
+    file_path = "test_document"
     knowledge = PDFKnowledge(file_path=file_path)
     documents = knowledge._load()
     assert len(documents) == len(MOCK_PDF_PAGES)
     for i, document in enumerate(documents):
         assert MOCK_PDF_PAGES[i][0] in document.content
-        assert document.metadata["source"] == file_path
-        assert document.metadata["page"] == MOCK_PDF_PAGES[i][1]
+        assert document.metadata["title"] == file_path
+        assert document.metadata["type"] == "text"
 
 #
diff --git a/dbgpt/rag/text_splitter/text_splitter.py b/dbgpt/rag/text_splitter/text_splitter.py
index a4b44ca8e..f4374e65e 100644
--- a/dbgpt/rag/text_splitter/text_splitter.py
+++ b/dbgpt/rag/text_splitter/text_splitter.py
@@ -58,9 +58,15 @@ class TextSplitter(ABC):
         _metadatas = metadatas or [{}] * len(texts)
         chunks = []
         for i, text in enumerate(texts):
-            for chunk in self.split_text(text, separator=separator, **kwargs):
-                new_doc = Chunk(content=chunk, metadata=copy.deepcopy(_metadatas[i]))
-                chunks.append(new_doc)
+            if _metadatas[i].get("type") == "excel":
+                # keep an excel table in a single chunk so its markdown
+                # layout survives splitting
+                table_chunk = Chunk(content=text, metadata=copy.deepcopy(_metadatas[i]))
+                chunks.append(table_chunk)
+            else:
+                for chunk in self.split_text(text, separator=separator, **kwargs):
+                    new_doc = Chunk(
+                        content=chunk, metadata=copy.deepcopy(_metadatas[i])
+                    )
+                    chunks.append(new_doc)
         return chunks
 
     def split_documents(self, documents: Iterable[Document], **kwargs) -> List[Chunk]:
@@ -489,11 +495,15 @@ class MarkdownHeaderTextSplitter(TextSplitter):
         _metadatas = metadatas or [{}] * len(texts)
         chunks = []
         for i, text in enumerate(texts):
-            for chunk in self.split_text(text, separator, **kwargs):
-                metadata = chunk.metadata or {}
-                metadata.update(_metadatas[i])
-                new_doc = Chunk(content=chunk.content, metadata=metadata)
-                chunks.append(new_doc)
+            if _metadatas[i].get("type") == "excel":
+                # excel tables bypass markdown header splitting as well
+                table_chunk = Chunk(content=text, metadata=copy.deepcopy(_metadatas[i]))
+                chunks.append(table_chunk)
+            else:
+                for chunk in self.split_text(text, separator, **kwargs):
+                    metadata = chunk.metadata or {}
+                    metadata.update(_metadatas[i])
+                    new_doc = Chunk(content=chunk.content, metadata=metadata)
+                    chunks.append(new_doc)
         return chunks
 
     def aggregate_lines_to_chunks(self, lines: List[LineType]) -> List[Chunk]: