DB-GPT/dbgpt/rag/knowledge/pdf.py
"""PDF Knowledge."""
import ast
import json
import os
import re
from collections import defaultdict
from typing import Any, Dict, List, Optional, Union

from dbgpt.component import logger
from dbgpt.core import Document
from dbgpt.rag.knowledge.base import (
ChunkStrategy,
DocumentType,
Knowledge,
KnowledgeType,
)


class PDFKnowledge(Knowledge):
    """PDF Knowledge."""

    def __init__(
self,
file_path: Optional[str] = None,
knowledge_type: KnowledgeType = KnowledgeType.DOCUMENT,
loader: Optional[Any] = None,
language: Optional[str] = "zh",
metadata: Optional[Dict[str, Union[str, List[str]]]] = None,
**kwargs: Any,
) -> None:
"""Create PDF Knowledge with Knowledge arguments.
Args:
file_path(str, optional): file path
knowledge_type(KnowledgeType, optional): knowledge type
loader(Any, optional): loader
language(str, optional): language
"""
super().__init__(
path=file_path,
knowledge_type=knowledge_type,
data_loader=loader,
metadata=metadata,
**kwargs,
)
self._language = language
self._pdf_processor = PDFProcessor(filepath=self._path)
self.all_title: List[dict] = []
self.all_text: List[dict] = []

    def process_text_data(self):
        """Parse level-1 and level-2 section titles out of the extracted text."""
        for i, data in enumerate(self.all_text):
            inside_content = data.get("inside")
content_type = data.get("type")
if content_type == "text":
                # match level-1 titles like "§1<标题>", level-2 titles like
                # "1.1<标题>", and bare level-1 numbers like "§1"
                first_level_match = re.match(
                    r"§(\d+)([\u4e00-\u9fa5]+)", inside_content.strip()
                )
                second_level_match = re.match(
                    r"(\d+\.\d+)([\u4e00-\u9fa5]+)", inside_content.strip()
                )
                first_num_match = re.match(r"^§(\d+)$", inside_content.strip())
# get all level 1 titles
title_name = [
dictionary["first_title"]
for dictionary in self.all_title
if "first_title" in dictionary
]
if first_level_match:
first_title_text = first_level_match.group(2)
first_title_num = first_level_match.group(1)
first_title = first_title_num + first_title_text
# the title does not contain "..." and is not in the title list
# , add it to the title list
if first_title not in title_name and (
int(first_title_num) == 1
or int(first_title_num) - int(self.all_title[-1]["id"]) == 1
):
current_entry = {
"id": first_title_num,
"first_title": first_title,
"second_title": [],
"table": [],
}
self.all_title.append(current_entry)
elif second_level_match:
second_title_name = second_level_match.group(0)
second_title = second_level_match.group(1)
first_title = second_title.split(".")[0]
if (int(first_title) - 1 >= len(self.all_title)) or int(
first_title
) - 1 < 0:
continue
else:
titles = [
sub_item["title"]
for sub_item in self.all_title[int(first_title) - 1][
"second_title"
]
]
if second_title_name not in titles:
self.all_title[int(first_title) - 1]["second_title"].append(
{"title": second_title_name, "table": []}
)
                elif first_num_match and i + 1 < len(self.all_text):
                    first_num = first_num_match.group(1)
                    first_text = self.all_text[i + 1].get("inside")
                    first_title = first_num + first_text
                    # add the title only if it is not a table-of-contents line
                    # (contains "..."), is new, and continues the numbering
                    is_next = self.all_title and (
                        int(first_num) - int(self.all_title[-1]["id"]) == 1
                    )
                    if (
                        "..." not in first_text
                        and first_title not in title_name
                        and (int(first_num) == 1 or is_next)
                    ):
current_entry = {
"id": first_num,
"first_title": first_title,
"second_title": [],
"table": [],
}
self.all_title.append(current_entry)
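
        # For reference, the patterns above match headings shaped like
        # "§1<中文标题>" (level 1) and "1.1<中文标题>" (level 2); a bare "§1"
        # is matched by first_num_match and joined with the following text row.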

    def _load(self) -> List[Document]:
        """Load the pdf document from the loader, or parse it with PDFProcessor."""
        if self._loader:
            documents = self._loader.load()
        else:
            self._pdf_processor.pdf_to_json()
            file_title = os.path.splitext(os.path.basename(self._path))[0]
self.all_text = list(self._pdf_processor.all_text.values())
self.process_text_data()
temp_table = []
temp_title = None
page_documents = []
merged_data = {} # type: ignore # noqa
for i, data in enumerate(self.all_text):
content_type = data.get("type")
inside_content = data.get("inside")
page = data.get("page")
if content_type == "excel":
temp_table.append(inside_content)
                    if temp_title is None:
                        # walk backwards to the nearest preceding text line
                        # and use it as the table's title
                        for j in range(i - 1, -1, -1):
                            if self.all_text[j]["type"] == "excel":
                                break
                            if self.all_text[j]["type"] == "text":
                                temp_title = self.all_text[j]["inside"].strip()
                                break
elif content_type == "text":
                    if page in merged_data:
                        # append to the text already merged for this page
                        merged_data[page]["inside_content"] += " " + inside_content
else:
merged_data[page] = {
"inside_content": inside_content,
"type": "text",
}
                    # flush any pending excel table into this page's content
                    if temp_table:
                        table_meta = {
                            "title": temp_title or temp_table[0],
                            "table": temp_table,
                            "type": "excel",
                        }
                        self.all_title.append(table_meta)
                        # rows were stored as str() of lists; parse them back
                        # safely and render as a markdown table
                        header = ast.literal_eval(temp_table[0])
                        rows = [ast.literal_eval(entry) for entry in temp_table[1:]]
                        markdown_output = "| " + " | ".join(header) + " |\n"
                        markdown_output += (
                            "| " + " | ".join(["---"] * len(header)) + " |\n"
                        )
                        for row in rows:
                            markdown_output += "| " + " | ".join(row) + " |\n"
                        # attach the merged content to the current page
                        merged_data.setdefault(
                            page, {"inside_content": "", "type": "text"}
                        )
                        merged_data[page]["excel_content"] = temp_table
                        merged_data[page]["markdown_output"] = markdown_output
                        temp_title = None
                        temp_table = []
            # handle a table still pending after the last text element
            if temp_table:
                table_meta = {
                    "title": temp_title or temp_table[0],
                    "table": temp_table,
                    "type": "excel",
                }
                self.all_title.append(table_meta)
                header = ast.literal_eval(temp_table[0])
                rows = [ast.literal_eval(entry) for entry in temp_table[1:]]
                markdown_output = "| " + " | ".join(header) + " |\n"
                markdown_output += "| " + " | ".join(["---"] * len(header)) + " |\n"
                for row in rows:
                    markdown_output += "| " + " | ".join(row) + " |\n"
                # attach to the last page seen in the loop above
                merged_data.setdefault(page, {"inside_content": "", "type": "text"})
                merged_data[page]["excel_content"] = temp_table
                merged_data[page]["markdown_output"] = markdown_output
for page, content in merged_data.items():
inside_content = content["inside_content"]
if "markdown_output" in content:
markdown_content = content["markdown_output"]
content_metadata = {
"page": page,
"type": "excel",
"title": file_title,
"source": self.file_path,
}
page_documents.append(
Document(
content=inside_content + "\n" + markdown_content,
metadata=content_metadata,
)
)
else:
content_metadata = {
"page": page,
"type": "text",
"title": file_title,
"source": self.file_path,
}
page_documents.append(
Document(content=inside_content, metadata=content_metadata)
)
return page_documents
return [Document.langchain2doc(lc_document) for lc_document in documents]

    @classmethod
    def support_chunk_strategy(cls) -> List[ChunkStrategy]:
        """Return supported chunk strategies."""
        return [
            ChunkStrategy.CHUNK_BY_SIZE,
            ChunkStrategy.CHUNK_BY_PAGE,
            ChunkStrategy.CHUNK_BY_SEPARATOR,
        ]

    @classmethod
    def default_chunk_strategy(cls) -> ChunkStrategy:
        """Return the default chunk strategy."""
        return ChunkStrategy.CHUNK_BY_SIZE

    @classmethod
    def type(cls) -> KnowledgeType:
        """Return knowledge type."""
        return KnowledgeType.DOCUMENT

    @classmethod
    def document_type(cls) -> DocumentType:
        """Document type of PDF."""
        return DocumentType.PDF
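

# A minimal usage sketch for PDFKnowledge (hypothetical file path; assumes the
# public Knowledge.load() entry point inherited from the base Knowledge class):
#
#   knowledge = PDFKnowledge(file_path="./annual_report.pdf", language="zh")
#   documents = knowledge.load()
#   for doc in documents:
#       print(doc.metadata["page"], doc.content[:80])
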
class PDFProcessor:
    """Extract text and tables from a pdf file with pdfplumber."""

    def __init__(self, filepath):
        """Initialize the processor with the path of the pdf to parse."""
        self.filepath = filepath
        try:
            import pdfplumber  # type: ignore
        except ImportError:
            raise ImportError(
                "Please install pdfplumber first: `pip install pdfplumber`"
            )
        self.pdf = pdfplumber.open(filepath)
        # allrow index -> {"page", "allrow", "type", "inside"}
        self.all_text = defaultdict(dict)
        # running row counter across all pages
        self.allrow = 0
        # index of the last extracted row of the previous page
        self.last_num = 0

    def check_lines(self, page, top, bottom):
        """Collect words whose vertical position lies between bottom and top.

        An empty string for top or bottom means that side is unbounded.
        Wrapped physical lines are joined back into logical lines.
        """
        lines = page.extract_words()
        text = ""
        last_top = 0
        last_check = 0
        # patterns that mark the accumulated text as a complete line
        check_re = (
            r"(?:。|;|单位:人民币元|金额单位:人民币元|单位:万元|币种:人民币|\d|"
            r"报告(?:全文)?(?:(修订版)|(修订稿)|(更正后))?)$"
        )
        for each_line in lines:
            if top == "" and bottom == "":
                if abs(last_top - each_line["top"]) <= 2 or (
                    last_check > 0
                    and (page.height * 0.9 - each_line["top"]) > 0
                    and not re.search(check_re, text)
                ):
                    text = text + each_line["text"]
                else:
                    text = text + "\n" + each_line["text"]
            elif top == "":
                if each_line["top"] > bottom:
                    if abs(last_top - each_line["top"]) <= 2 or (
                        last_check > 0
                        and (page.height * 0.85 - each_line["top"]) > 0
                        and not re.search(check_re, text)
                    ):
                        text = text + each_line["text"]
                    else:
                        text = text + "\n" + each_line["text"]
            else:
                if bottom < each_line["top"] < top:
                    if abs(last_top - each_line["top"]) <= 2 or (
                        last_check > 0
                        and (page.height * 0.85 - each_line["top"]) > 0
                        and not re.search(check_re, text)
                    ):
                        text = text + each_line["text"]
                    else:
                        text = text + "\n" + each_line["text"]
            last_top = each_line["top"]
            last_check = each_line["x1"] - page.width * 0.85
        return text
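
    # Heuristic used by check_lines: words whose "top" coordinates differ by
    # at most 2pt are kept on the same output line; otherwise a newline is
    # inserted, unless the previous word ended past ~85% of the page width
    # (suggesting a wrapped line) and the accumulated text does not already
    # end like a complete line per check_re.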

    def drop_empty_cols(self, data):
        """Drop columns in which every cell is empty."""
transposed_data = list(map(list, zip(*data)))
filtered_data = [
col for col in transposed_data if not all(cell == "" for cell in col)
]
result = list(map(list, zip(*filtered_data)))
return result
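
    # e.g. drop_empty_cols([["a", "", "b"], ["1", "", "2"]]) returns
    # [["a", "b"], ["1", "2"]]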

    def extract_text_and_tables(self, page):
        """Extract the text and tables of a pdfplumber page into self.all_text."""
        bottom = 0
        tables = page.find_tables()
        if len(tables) >= 1:
            count = len(tables)
            for table in tables:
                # skip tables that end above the bottom of the previous table
                if table.bbox[3] < bottom:
                    pass
                else:
                    count -= 1
                    # extract the text between the previous table and this one
                    top = table.bbox[1]
                    text = self.check_lines(page, top, bottom)
                    text_list = text.split("\n")
                    for line_text in text_list:
                        self.all_text[self.allrow] = {
                            "page": page.page_number,
                            "allrow": self.allrow,
                            "type": "text",
                            "inside": line_text,
                        }
                        self.allrow += 1
                    # extract the table itself
                    bottom = table.bbox[3]
                    new_table = table.extract()
                    r_count = 0
                    # merge continuation rows (rows whose first cell is None)
                    # back into the row they continue
                    for r in range(len(new_table)):
                        row = new_table[r]
                        if row[0] is None:
                            r_count += 1
                            for c in range(len(row)):
                                if row[c] is not None and row[c] not in ["", " "]:
                                    if new_table[r - r_count][c] is None:
                                        new_table[r - r_count][c] = row[c]
                                    else:
                                        new_table[r - r_count][c] += row[c]
                                    new_table[r][c] = None
                        else:
                            r_count = 0
end_table = []
for row in new_table:
if row[0] is not None:
cell_list = []
cell_check = False
for cell in row:
if cell is not None:
cell = cell.replace("\n", "")
else:
cell = ""
if cell != "":
cell_check = True
cell_list.append(cell)
if cell_check:
end_table.append(cell_list)
end_table = self.drop_empty_cols(end_table)
                    # fill empty header cells from neighbouring column names
                    if len(end_table) > 0:
                        for i in range(len(end_table[0])):
                            if end_table[0][i] == "":
                                if 0 < i < len(end_table[0]) - 1:
                                    # middle column: concatenate the left and
                                    # right column names
                                    left_column = end_table[0][i - 1]
                                    right_column = end_table[0][i + 1]
                                    end_table[0][i] = left_column + right_column
                                else:
                                    # first column takes the name on its right,
                                    # last column the name on its left
                                    end_table[0][i] = (
                                        end_table[0][i - 1]
                                        if i == len(end_table[0]) - 1
                                        else end_table[0][i + 1]
                                    )
                    # fill empty data cells with the value of the cell to
                    # their left
                    for i in range(1, len(end_table)):
                        for j in range(len(end_table[i])):
                            if end_table[i][j] == "" and j > 0:
                                end_table[i][j] = end_table[i][j - 1]
for row in end_table:
self.all_text[self.allrow] = {
"page": page.page_number,
"allrow": self.allrow,
"type": "excel",
"inside": str(row),
}
self.allrow += 1
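                    # each "inside" value above is the str() of a Python list,
                    # e.g. "['列名', '数值']" (illustrative); PDFKnowledge._load
                    # parses it back with ast.literal_eval.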
                    if count == 0:
                        # last table on the page: extract the text below it
                        text = self.check_lines(page, "", bottom)
                        text_list = text.split("\n")
                        for line_text in text_list:
                            self.all_text[self.allrow] = {
                                "page": page.page_number,
                                "allrow": self.allrow,
                                "type": "text",
                                "inside": line_text,
                            }
                            self.allrow += 1
        else:
            # no tables on this page: extract the whole page as text
            text = self.check_lines(page, "", "")
            text_list = text.split("\n")
            for line_text in text_list:
                self.all_text[self.allrow] = {
                    "page": page.page_number,
                    "allrow": self.allrow,
                    "type": "text",
                    "inside": line_text,
                }
                self.allrow += 1
first_re = "[^计](?:报告(?:全文)?(?:(修订版)|(修订稿)|(更正后))?)$"
end_re = "^(?:\d|\\|\/|第|共|页|-|_| ){1,}"
if self.last_num == 0:
try:
first_text = str(self.all_text[1]["inside"])
end_text = str(self.all_text[len(self.all_text) - 1]["inside"])
if re.search(first_re, first_text) and "[" not in end_text:
self.all_text[1]["type"] = "页眉"
if re.search(end_re, end_text) and "[" not in end_text:
self.all_text[len(self.all_text) - 1]["type"] = "页脚"
except Exception:
print(page.page_number)
else:
try:
first_text = str(self.all_text[self.last_num + 2]["inside"])
end_text = str(self.all_text[len(self.all_text) - 1]["inside"])
if re.search(first_re, first_text) and "[" not in end_text:
self.all_text[self.last_num + 2]["type"] = "页眉"
if re.search(end_re, end_text) and "[" not in end_text:
self.all_text[len(self.all_text) - 1]["type"] = "页脚"
except Exception:
print(page.page_number)
self.last_num = len(self.all_text) - 1

    def pdf_to_json(self):
        """Extract every page of the pdf into self.all_text."""
        for i, page in enumerate(self.pdf.pages):
            self.extract_text_and_tables(page)
            logger.info(f"{self.filepath} page {i + 1} extracted successfully")

    def save_all_text(self, path):
        """Save all extracted rows to path as json lines."""
        directory = os.path.dirname(path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(path, "a+", encoding="utf-8") as file:
            for key in self.all_text.keys():
                file.write(json.dumps(self.all_text[key], ensure_ascii=False) + "\n")
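

if __name__ == "__main__":
    # Minimal sketch of using PDFProcessor directly; the paths below are
    # hypothetical and only illustrate the expected call order.
    processor = PDFProcessor("./example.pdf")
    processor.pdf_to_json()
    processor.save_all_text("./output/pdf_all_text.jsonl")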