feat(ChatKnowledge): pdf loader supports table extraction (#2147)

Aries-ckt 2024-11-21 20:34:57 +08:00 committed by GitHub
parent 0bc478b7b5
commit 43c86caebf
3 changed files with 461 additions and 44 deletions
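
At a glance, the change routes PDF loading through a new PDFProcessor so that tables survive as markdown instead of being flattened into plain text. A minimal usage sketch of the new path (the sample path is hypothetical; PDFKnowledge, _load, and the metadata keys are from this diff):

    from dbgpt.rag.knowledge.pdf import PDFKnowledge

    knowledge = PDFKnowledge(file_path="./annual_report.pdf")  # hypothetical file
    for doc in knowledge._load():
        # pages that contained a table carry type == "excel" and a markdown
        # rendering of the table appended to the page text
        if doc.metadata.get("type") == "excel":
            print(doc.metadata["page"], doc.content)
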


@@ -1,6 +1,11 @@
"""PDF Knowledge."""
import json
import os
import re
from collections import defaultdict
from typing import Any, Dict, List, Optional, Union

from dbgpt.component import logger
from dbgpt.core import Document
from dbgpt.rag.knowledge.base import (
    ChunkStrategy,
@@ -38,44 +43,213 @@ class PDFKnowledge(Knowledge):
            **kwargs,
        )
        self._language = language
        self._pdf_processor = PDFProcessor(filepath=self._path)
        self.all_title: List[dict] = []
        self.all_text: List[dict] = []

    def process_text_data(self):
        """Assemble level-1 and level-2 titles from the extracted text lines."""
        for i, data in enumerate(self.all_text):
            inside_content = data.get("inside")
            content_type = data.get("type")
            if content_type == "text":
                # level-1 titles look like "§1标题", level-2 titles like "1.1标题"
                first_level_match = re.match(
                    r"§(\d+)([\u4e00-\u9fa5]+)", inside_content.strip()
                )
                second_level_match = re.match(
                    r"(\d+\.\d+)([\u4e00-\u9fa5]+)", inside_content.strip()
                )
                # a bare "§N" whose title text sits on the following line
                first_num_match = re.match(r"^§(\d+)$", inside_content.strip())
                # names of all level-1 titles collected so far
                title_name = [
                    dictionary["first_title"]
                    for dictionary in self.all_title
                    if "first_title" in dictionary
                ]
                if first_level_match:
                    first_title_text = first_level_match.group(2)
                    first_title_num = first_level_match.group(1)
                    first_title = first_title_num + first_title_text
                    # record the title when it is new and its number either
                    # starts the sequence or continues the previous title's id
                    if first_title not in title_name and (
                        int(first_title_num) == 1
                        or (
                            self.all_title
                            and int(first_title_num) - int(self.all_title[-1]["id"])
                            == 1
                        )
                    ):
                        current_entry = {
                            "id": first_title_num,
                            "first_title": first_title,
                            "second_title": [],
                            "table": [],
                        }
                        self.all_title.append(current_entry)
                elif second_level_match:
                    second_title_name = second_level_match.group(0)
                    second_title = second_level_match.group(1)
                    first_title = second_title.split(".")[0]
                    # skip subsections whose parent section has not been recorded
                    parent_idx = int(first_title) - 1
                    if parent_idx < 0 or parent_idx >= len(self.all_title):
                        continue
                    titles = [
                        sub_item["title"]
                        for sub_item in self.all_title[parent_idx]["second_title"]
                    ]
                    if second_title_name not in titles:
                        self.all_title[parent_idx]["second_title"].append(
                            {"title": second_title_name, "table": []}
                        )
                elif first_num_match:
                    first_num = first_num_match.group(1)
                    if i + 1 >= len(self.all_text):
                        continue
                    first_text = self.all_text[i + 1].get("inside")
                    first_title = first_num + first_text
                    # skip table-of-contents entries ("...") and duplicates, and
                    # accept only ids that start or continue the sequence
                    if (
                        "..." not in first_text
                        and first_title not in title_name
                        and (
                            int(first_num) == 1
                            or (
                                self.all_title
                                and int(first_num) - int(self.all_title[-1]["id"]) == 1
                            )
                        )
                    ):
                        current_entry = {
                            "id": first_num,
                            "first_title": first_title,
                            "second_title": [],
                            "table": [],
                        }
                        self.all_title.append(current_entry)
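
For orientation, the structure built above ends up shaped like this (values are illustrative, not taken from a real document):

    # hypothetical all_title contents after process_text_data has seen
    # "§1公司简介" (company profile) and a subsection "1.1基本情况" (basics)
    all_title = [
        {
            "id": "1",
            "first_title": "1公司简介",
            "second_title": [{"title": "1.1基本情况", "table": []}],
            "table": [],
        }
    ]
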
    def _load(self) -> List[Document]:
        """Load pdf document from loader."""
        if self._loader:
            documents = self._loader.load()
        else:
            import pypdf

            pages = []
            documents = []
            if not self._path:
                raise ValueError("file path is required")
            with open(self._path, "rb") as file:
                reader = pypdf.PdfReader(file)
                for page_num in range(len(reader.pages)):
                    _page = reader.pages[page_num]
                    pages.append((_page.extract_text(), page_num))
            # cleaned_pages = []
            for page, page_num in pages:
                lines = page.splitlines()
                cleaned_lines = []
                for line in lines:
                    if self._language == "en":
                        words = list(line)  # noqa: F841
                    else:
                        words = line.split()  # noqa: F841
                    cleaned_lines.append(line)
                page = "\n".join(cleaned_lines)
                # cleaned_pages.append(page)
                metadata = {"source": self._path, "page": page_num}
                if self._metadata:
                    metadata.update(self._metadata)  # type: ignore
                # text = "\f".join(cleaned_pages)
                document = Document(content=page, metadata=metadata)
                documents.append(document)
            return documents
            self._pdf_processor.pdf_to_json()
            file_title = self.file_path.rsplit("/", 1)[-1].replace(".pdf", "")
            self.all_text = list(self._pdf_processor.all_text.values())
            self.process_text_data()
            temp_table = []
            temp_title = None
            page_documents = []
            merged_data = {}  # type: ignore
            for i, data in enumerate(self.all_text):
                content_type = data.get("type")
                inside_content = data.get("inside")
                page = data.get("page")
                if content_type == "excel":
                    temp_table.append(inside_content)
                    if temp_title is None:
                        # walk backwards to the nearest preceding text line and
                        # use it as the table's title
                        for j in range(i - 1, -1, -1):
                            if self.all_text[j]["type"] == "excel":
                                break
                            if self.all_text[j]["type"] == "text":
                                content = self.all_text[j]["inside"]
                                if re.match(
                                    r"^\d+\.\d+", content
                                ) or content.startswith("§"):
                                    temp_title = content.strip()
                                    break
                                else:
                                    temp_title = content.strip()
                                    break
                elif content_type == "text":
                    if page in merged_data:
                        # page merge
                        merged_data[page]["inside_content"] += " " + inside_content
                    else:
                        merged_data[page] = {
                            "inside_content": inside_content,
                            "type": "text",
                        }
                    # a table has just ended: render it and attach it to the page
                    if temp_table:
                        table_meta = {
                            "title": temp_title or temp_table[0],
                            "type": "excel",
                        }
                        self.all_title.append(table_meta)
                        # rows were serialized with str(list); eval restores them
                        # before the table is rendered as markdown
                        markdown_tables = []
                        header = eval(temp_table[0])
                        markdown_tables.append(header)
                        for entry in temp_table[1:]:
                            row = eval(entry)
                            markdown_tables.append(row)
                        markdown_output = "| " + " | ".join(header) + " |\n"
                        markdown_output += (
                            "| " + " | ".join(["---"] * len(header)) + " |\n"
                        )
                        for row in markdown_tables[1:]:
                            markdown_output += "| " + " | ".join(row) + " |\n"
                        # merged content
                        merged_data[page]["excel_content"] = temp_table
                        merged_data[page]["markdown_output"] = markdown_output
                        temp_title = None
                        temp_table = []
            # deal with a table that runs to the end of the document
            if temp_table:
                table_meta = {
                    "title": temp_title or temp_table[0],
                    "table": temp_table,
                    "type": "excel",
                }
                self.all_title.append(table_meta)
                markdown_tables = []
                header = eval(temp_table[0])
                markdown_tables.append(header)
                for entry in temp_table[1:]:
                    row = eval(entry)
                    markdown_tables.append(row)
                markdown_output = "| " + " | ".join(header) + " |\n"
                markdown_output += "| " + " | ".join(["---"] * len(header)) + " |\n"
                for row in markdown_tables[1:]:
                    markdown_output += "| " + " | ".join(row) + " |\n"
                # guard: the last page may hold only table rows and no text
                if page not in merged_data:
                    merged_data[page] = {"inside_content": "", "type": "text"}
                merged_data[page]["excel_content"] = temp_table
                merged_data[page]["markdown_output"] = markdown_output
            for page, content in merged_data.items():
                inside_content = content["inside_content"]
                if "markdown_output" in content:
                    markdown_content = content["markdown_output"]
                    content_metadata = {
                        "page": page,
                        "type": "excel",
                        "title": file_title,
                    }
                    page_documents.append(
                        Document(
                            content=inside_content + "\n" + markdown_content,
                            metadata=content_metadata,
                        )
                    )
                else:
                    content_metadata = {
                        "page": page,
                        "type": "text",
                        "title": file_title,
                    }
                    page_documents.append(
                        Document(content=inside_content, metadata=content_metadata)
                    )
            return page_documents
        return [Document.langchain2doc(lc_document) for lc_document in documents]

    @classmethod
@@ -101,3 +275,236 @@ class PDFKnowledge(Knowledge):
    def document_type(cls) -> DocumentType:
        """Document type of PDF."""
        return DocumentType.PDF
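
The markdown assembly inside _load turns the stringified rows emitted by PDFProcessor back into a pipe table. A self-contained sketch of just that step (the row strings are illustrative):

    # rows arrive as str(list) records produced by PDFProcessor
    temp_table = ["['项目', '金额']", "['营业收入', '1,000']"]
    header = eval(temp_table[0])  # the loader trusts its own serialization here
    lines = ["| " + " | ".join(header) + " |",
             "| " + " | ".join(["---"] * len(header)) + " |"]
    for entry in temp_table[1:]:
        lines.append("| " + " | ".join(eval(entry)) + " |")
    print("\n".join(lines))
    # | 项目 | 金额 |
    # | --- | --- |
    # | 营业收入 | 1,000 |
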

class PDFProcessor:
    """Extract per-line text and table rows from a PDF using pdfplumber."""

    def __init__(self, filepath):
        """Initialize the processor with the path of the PDF to parse."""
        self.filepath = filepath
        try:
            import pdfplumber  # type: ignore
        except ImportError:
            raise ImportError(
                "Please install pdfplumber first: `pip install pdfplumber`."
            )
        self.pdf = pdfplumber.open(filepath)
        self.all_text = defaultdict(dict)  # allrow index -> line/row record
        self.allrow = 0
        self.last_num = 0

    def check_lines(self, page, top, buttom):
        """Extract words between the given vertical bounds, joined into lines."""
        lines = page.extract_words()
        text = ""
        last_top = 0
        last_check = 0
        # suffixes that mark a complete line: sentence enders, currency-unit
        # captions such as "单位:人民币元" ("unit: RMB yuan"), digits, or
        # report-title endings
        check_re = (
            r"(?:。||单位:人民币元|金额单位:人民币元|单位:万元|币种:人民币|\d|"
            r"报告(?:全文)?(?:(修订版)|(修订稿)|(更正后))?)$"
        )
        for each_line in lines:
            if top == "" and buttom == "":
                # no bounds: take every word on the page
                if abs(last_top - each_line["top"]) <= 2 or (
                    last_check > 0
                    and (page.height * 0.9 - each_line["top"]) > 0
                    and not re.search(check_re, text)
                ):
                    text = text + each_line["text"]
                else:
                    text = text + "\n" + each_line["text"]
            elif top == "":
                # only a lower bound: take words below the previous table
                if each_line["top"] > buttom:
                    if abs(last_top - each_line["top"]) <= 2 or (
                        last_check > 0
                        and (page.height * 0.85 - each_line["top"]) > 0
                        and not re.search(check_re, text)
                    ):
                        text = text + each_line["text"]
                    else:
                        text = text + "\n" + each_line["text"]
            else:
                # both bounds: take words between the previous table's bottom
                # edge and the next table's top edge
                if each_line["top"] < top and each_line["top"] > buttom:
                    if abs(last_top - each_line["top"]) <= 2 or (
                        last_check > 0
                        and (page.height * 0.85 - each_line["top"]) > 0
                        and not re.search(check_re, text)
                    ):
                        text = text + each_line["text"]
                    else:
                        text = text + "\n" + each_line["text"]
            last_top = each_line["top"]
            last_check = each_line["x1"] - page.width * 0.85
        return text
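
For orientation: pdfplumber word dictionaries use a top-left origin, so check_lines selects words whose top coordinate falls between the previous table's bottom edge and the next table's top edge, and starts a new line whenever the vertical position jumps by more than two points. A tiny illustration with hand-written word records:

    # two words whose tops differ by more than 2 points end up on separate lines
    words = [
        {"text": "标题", "top": 120.0, "x1": 80.0},   # "title"
        {"text": "正文", "top": 140.0, "x1": 200.0},  # "body text"
    ]
    # with buttom=100 and top=300 both words are in range, and check_lines
    # would join them as "标题\n正文"
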

    def drop_empty_cols(self, data):
        """Drop columns whose cells are all empty strings."""
        transposed_data = list(map(list, zip(*data)))
        filtered_data = [
            col for col in transposed_data if not all(cell == "" for cell in col)
        ]
        result = list(map(list, zip(*filtered_data)))
        return result
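
A quick worked example of the column pruning (standalone rendition for illustration; the method above behaves the same):

    def drop_empty_cols(data):
        # transpose, drop all-empty columns, transpose back
        transposed = list(map(list, zip(*data)))
        kept = [col for col in transposed if not all(cell == "" for cell in col)]
        return list(map(list, zip(*kept)))

    print(drop_empty_cols([["名称", "", "金额"], ["收入", "", "100"]]))
    # -> [['名称', '金额'], ['收入', '100']]
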

    def extract_text_and_tables(self, page):
        """Extract one page's text and tables into all_text records."""
        buttom = 0
        tables = page.find_tables()
        if len(tables) >= 1:
            count = len(tables)
            for table in tables:
                # skip tables that end above the region already consumed
                if table.bbox[3] < buttom:
                    pass
                else:
                    count -= 1
                    # first emit the text that sits above this table
                    top = table.bbox[1]
                    text = self.check_lines(page, top, buttom)
                    for line_text in text.split("\n"):
                        self.all_text[self.allrow] = {
                            "page": page.page_number,
                            "allrow": self.allrow,
                            "type": "text",
                            "inside": line_text,
                        }
                        self.allrow += 1
                    # process the table itself
                    buttom = table.bbox[3]
                    new_table = table.extract()
                    r_count = 0
                    # rows whose first cell is None continue the row above
                    # (pdfplumber splits cells that wrap onto a new line);
                    # fold each continuation upward into the last real row
                    for r in range(len(new_table)):
                        row = new_table[r]
                        if row[0] is None:
                            r_count += 1
                            for c in range(len(row)):
                                if row[c] is not None and row[c] not in ["", " "]:
                                    if new_table[r - r_count][c] is None:
                                        new_table[r - r_count][c] = row[c]
                                    else:
                                        new_table[r - r_count][c] += row[c]
                                    new_table[r][c] = None
                        else:
                            r_count = 0
                    # keep rows that still start with a real cell and contain
                    # at least one non-empty value
                    end_table = []
                    for row in new_table:
                        if row[0] is not None:
                            cell_list = []
                            cell_check = False
                            for cell in row:
                                if cell is not None:
                                    cell = cell.replace("\n", "")
                                else:
                                    cell = ""
                                if cell != "":
                                    cell_check = True
                                cell_list.append(cell)
                            if cell_check:
                                end_table.append(cell_list)
                    end_table = self.drop_empty_cols(end_table)
                    # fill empty header cells
                    if len(end_table) > 0:
                        for i in range(len(end_table[0])):
                            if end_table[0][i] == "":
                                if 0 < i < len(end_table[0]) - 1:
                                    # middle column: concatenate the left and
                                    # right neighbours' names
                                    left_column = end_table[0][i - 1]
                                    right_column = end_table[0][i + 1]
                                    end_table[0][i] = left_column + right_column
                                else:
                                    # the first column takes the right
                                    # neighbour's name, the last column takes
                                    # the left neighbour's name
                                    end_table[0][i] = (
                                        end_table[0][i - 1]
                                        if i == len(end_table[0]) - 1
                                        else end_table[0][i + 1]
                                    )
                        # fill empty body cells with the value of the cell to
                        # their left
                        for i in range(1, len(end_table)):
                            for j in range(len(end_table[i])):
                                if end_table[i][j] == "":
                                    end_table[i][j] = end_table[i][j - 1]
                    for row in end_table:
                        self.all_text[self.allrow] = {
                            "page": page.page_number,
                            "allrow": self.allrow,
                            "type": "excel",
                            "inside": str(row),
                        }
                        self.allrow += 1
                    if count == 0:
                        # after the last table, emit the remaining text below it
                        text = self.check_lines(page, "", buttom)
                        for line_text in text.split("\n"):
                            self.all_text[self.allrow] = {
                                "page": page.page_number,
                                "allrow": self.allrow,
                                "type": "text",
                                "inside": line_text,
                            }
                            self.allrow += 1
        else:
            # no tables on this page: take all of its text
            text = self.check_lines(page, "", "")
            for line_text in text.split("\n"):
                self.all_text[self.allrow] = {
                    "page": page.page_number,
                    "allrow": self.allrow,
                    "type": "text",
                    "inside": line_text,
                }
                self.allrow += 1
        # tag the page header ("页眉") and footer ("页脚") lines so that later
        # stages can skip repeated page furniture
        first_re = r"[^计](?:报告(?:全文)?(?:(修订版)|(修订稿)|(更正后))?)$"
        end_re = r"^(?:\d|\\|\/|第|共|页|-|_| ){1,}"
        if self.last_num == 0:
            try:
                first_text = str(self.all_text[1]["inside"])
                end_text = str(self.all_text[len(self.all_text) - 1]["inside"])
                if re.search(first_re, first_text) and "[" not in end_text:
                    self.all_text[1]["type"] = "页眉"  # page header
                if re.search(end_re, end_text) and "[" not in end_text:
                    self.all_text[len(self.all_text) - 1]["type"] = "页脚"  # page footer
            except Exception:
                logger.warning(
                    f"Header/footer detection failed on page {page.page_number}"
                )
        else:
            try:
                first_text = str(self.all_text[self.last_num + 2]["inside"])
                end_text = str(self.all_text[len(self.all_text) - 1]["inside"])
                if re.search(first_re, first_text) and "[" not in end_text:
                    self.all_text[self.last_num + 2]["type"] = "页眉"  # page header
                if re.search(end_re, end_text) and "[" not in end_text:
                    self.all_text[len(self.all_text) - 1]["type"] = "页脚"  # page footer
            except Exception:
                logger.warning(
                    f"Header/footer detection failed on page {page.page_number}"
                )
        self.last_num = len(self.all_text) - 1
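
For reference, a quick check of what the two patterns above tag as page furniture (sample strings are illustrative):

    import re

    first_re = r"[^计](?:报告(?:全文)?(?:(修订版)|(修订稿)|(更正后))?)$"
    end_re = r"^(?:\d|\\|\/|第|共|页|-|_| ){1,}"
    print(bool(re.search(first_re, "某公司2023年年度报告")))  # True -> "页眉" (header)
    print(bool(re.search(end_re, "第 3 页 共 120 页")))  # True -> "页脚" (footer)
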

    def pdf_to_json(self):
        """Run extraction over every page of the PDF."""
        for i in range(len(self.pdf.pages)):
            self.extract_text_and_tables(self.pdf.pages[i])
            logger.info(f"{self.filepath} page {i}: text extracted successfully")

    def save_all_text(self, path):
        """Append every extracted record to a JSON-lines file."""
        directory = os.path.dirname(path)
        # guard against a bare filename, where dirname() returns ""
        if directory and not os.path.exists(directory):
            os.makedirs(directory)
        with open(path, "a+", encoding="utf-8") as file:
            for key in self.all_text.keys():
                file.write(json.dumps(self.all_text[key], ensure_ascii=False) + "\n")
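
End to end, the processor is driven like this (a hedged sketch; the paths are hypothetical):

    processor = PDFProcessor("./annual_report.pdf")
    processor.pdf_to_json()  # fills processor.all_text with line/row records
    processor.save_all_text("./out/annual_report.jsonl")  # one JSON object per line
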


@@ -5,8 +5,8 @@ import pytest
from dbgpt.rag.knowledge.pdf import PDFKnowledge

MOCK_PDF_PAGES = [
    ("This is the content of the first page.", 0),
    ("This is the content of the second page.", 1),
    ("", 0),
    ("", 1),
]
@@ -19,19 +19,19 @@ def mock_pdf_open_and_reader():
        for page in MOCK_PDF_PAGES
    ]
    with patch("builtins.open", mock_pdf_file):
        with patch("pypdf.PdfReader", return_value=mock_reader) as mock:
        with patch("pdfplumber.open", return_value=mock_reader) as mock:
            yield mock


def test_load_from_pdf(mock_pdf_open_and_reader):
    file_path = "test_document.pdf"
    file_path = "test_document"
    knowledge = PDFKnowledge(file_path=file_path)
    documents = knowledge._load()
    assert len(documents) == len(MOCK_PDF_PAGES)
    for i, document in enumerate(documents):
        assert MOCK_PDF_PAGES[i][0] in document.content
        assert document.metadata["source"] == file_path
        assert document.metadata["page"] == MOCK_PDF_PAGES[i][1]
        assert document.metadata["title"] == file_path
        assert document.metadata["type"] == "text"


@@ -58,9 +58,15 @@ class TextSplitter(ABC):
        _metadatas = metadatas or [{}] * len(texts)
        chunks = []
        for i, text in enumerate(texts):
            for chunk in self.split_text(text, separator=separator, **kwargs):
                new_doc = Chunk(content=chunk, metadata=copy.deepcopy(_metadatas[i]))
                chunks.append(new_doc)
            if _metadatas[i].get("type") == "excel":
                table_chunk = Chunk(content=text, metadata=copy.deepcopy(_metadatas[i]))
                chunks.append(table_chunk)
            else:
                for chunk in self.split_text(text, separator=separator, **kwargs):
                    new_doc = Chunk(
                        content=chunk, metadata=copy.deepcopy(_metadatas[i])
                    )
                    chunks.append(new_doc)
        return chunks
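
The effect of the splitter change is that table documents bypass separator-based splitting and arrive as a single chunk. A minimal sketch of the intended behavior (assuming the enclosing method is the splitter's create_documents; the sample splitter and texts are hypothetical):

    texts = [
        "| 项目 | 金额 |\n| --- | --- |\n| 营业收入 | 1,000 |",
        "plain narrative text that is long enough to be split ...",
    ]
    metadatas = [{"type": "excel"}, {"type": "text"}]
    # chunks = splitter.create_documents(texts, metadatas=metadatas)
    # chunks[0].content is the untouched markdown table; the narrative text
    # still flows through split_text as before
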
    def split_documents(self, documents: Iterable[Document], **kwargs) -> List[Chunk]:
@@ -489,11 +495,15 @@ class MarkdownHeaderTextSplitter(TextSplitter):
        _metadatas = metadatas or [{}] * len(texts)
        chunks = []
        for i, text in enumerate(texts):
            for chunk in self.split_text(text, separator, **kwargs):
                metadata = chunk.metadata or {}
                metadata.update(_metadatas[i])
                new_doc = Chunk(content=chunk.content, metadata=metadata)
                chunks.append(new_doc)
            if _metadatas[i].get("type") == "excel":
                table_chunk = Chunk(content=text, metadata=copy.deepcopy(_metadatas[i]))
                chunks.append(table_chunk)
            else:
                for chunk in self.split_text(text, separator, **kwargs):
                    metadata = chunk.metadata or {}
                    metadata.update(_metadatas[i])
                    new_doc = Chunk(content=chunk.content, metadata=metadata)
                    chunks.append(new_doc)
        return chunks

    def aggregate_lines_to_chunks(self, lines: List[LineType]) -> List[Chunk]: