"""PDF Knowledge."""

import ast
import json
import os
import re
from collections import defaultdict
from typing import Any, Dict, List, Optional, Union

from dbgpt.component import logger
from dbgpt.core import Document
from dbgpt.rag.knowledge.base import (
    ChunkStrategy,
    DocumentType,
    Knowledge,
    KnowledgeType,
)


class PDFKnowledge(Knowledge):
    """PDF Knowledge."""

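    # Usage (a minimal sketch; it assumes the public ``load()`` entrypoint of
    # the ``Knowledge`` base class, which delegates to ``_load()`` below, and a
    # hypothetical file path):
    #
    #     knowledge = PDFKnowledge(file_path="./report.pdf", language="zh")
    #     documents = knowledge.load()
    #     for doc in documents:
    #         print(doc.metadata["page"], doc.content[:80])
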
    def __init__(
        self,
        file_path: Optional[str] = None,
        knowledge_type: KnowledgeType = KnowledgeType.DOCUMENT,
        loader: Optional[Any] = None,
        language: Optional[str] = "zh",
        metadata: Optional[Dict[str, Union[str, List[str]]]] = None,
        **kwargs: Any,
    ) -> None:
        """Create PDF Knowledge with Knowledge arguments.

        Args:
            file_path(str, optional): file path
            knowledge_type(KnowledgeType, optional): knowledge type
            loader(Any, optional): loader
            language(str, optional): document language, defaults to "zh"
            metadata(Dict, optional): metadata attached to the loaded documents
        """
        super().__init__(
            path=file_path,
            knowledge_type=knowledge_type,
            data_loader=loader,
            metadata=metadata,
            **kwargs,
        )
        self._language = language
        self._pdf_processor = PDFProcessor(filepath=self._path)
        self.all_title: List[dict] = []
        self.all_text: List[dict] = []

    def process_text_data(self):
        """Collect level 1 and level 2 section titles from the extracted text."""
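        # Shape sketch (hypothetical values): ``self.all_text`` rows look like
        #     {"page": 3, "allrow": 12, "type": "text", "inside": "§1总则"}
        # and this method appends entries such as
        #     {"id": "1", "first_title": "1总则", "second_title": [], "table": []}
        # to ``self.all_title``, nesting "x.y" sub-titles under their parent.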
        for i, data in enumerate(self.all_text):
            inside_content = data.get("inside")
            content_type = data.get("type")
            if content_type == "text":
                # use regex to match the first level title
                first_level_match = re.match(
                    r"§(\d+)+([\u4e00-\u9fa5]+)", inside_content.strip()
                )
                second_level_match = re.match(
                    r"(\d+\.\d+)([\u4e00-\u9fa5]+)", inside_content.strip()
                )
                first_num_match = re.match(r"^§(\d+)$", inside_content.strip())
                # get all level 1 titles
                title_name = [
                    dictionary["first_title"]
                    for dictionary in self.all_title
                    if "first_title" in dictionary
                ]
                if first_level_match:
                    first_title_text = first_level_match.group(2)
                    first_title_num = first_level_match.group(1)
                    first_title = first_title_num + first_title_text
                    # if the title is not already in the title list and its
                    # number follows the previous one, add it to the title list
                    if first_title not in title_name and (
                        int(first_title_num) == 1
                        or int(first_title_num) - int(self.all_title[-1]["id"]) == 1
                    ):
                        current_entry = {
                            "id": first_title_num,
                            "first_title": first_title,
                            "second_title": [],
                            "table": [],
                        }
                        self.all_title.append(current_entry)

                elif second_level_match:
                    second_title_name = second_level_match.group(0)
                    second_title = second_level_match.group(1)
                    first_title = second_title.split(".")[0]
                    if (int(first_title) - 1 >= len(self.all_title)) or int(
                        first_title
                    ) - 1 < 0:
                        continue
                    else:
                        titles = [
                            sub_item["title"]
                            for sub_item in self.all_title[int(first_title) - 1][
                                "second_title"
                            ]
                        ]
                        if second_title_name not in titles:
                            self.all_title[int(first_title) - 1]["second_title"].append(
                                {"title": second_title_name, "table": []}
                            )
                elif first_num_match:
                    first_num = first_num_match.group(1)
                    first_text = self.all_text[i + 1].get("inside")
                    first_title = first_num_match.group(1) + first_text
                    # if the title does not contain "..." and is not in the title list
                    if (
                        "..." not in first_text
                        and first_title not in title_name
                        and (
                            int(first_num) == 1
                            or int(first_num) - int(self.all_title[-1]["id"]) == 1
                        )
                    ):
                        current_entry = {
                            "id": first_num,
                            "first_title": first_title,
                            "second_title": [],
                            "table": [],
                        }
                        self.all_title.append(current_entry)

    def _load(self) -> List[Document]:
        """Load pdf document from loader."""
        if self._loader:
            documents = self._loader.load()
        else:
            self._pdf_processor.pdf_to_json()
            file_title = self._path.rsplit("/", 1)[-1].replace(".pdf", "")
            self.all_text = list(self._pdf_processor.all_text.values())
            self.process_text_data()
            temp_table = []
            temp_title = None
            page_documents = []
            merged_data = {}  # type: ignore # noqa
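            # Shape sketch (hypothetical values): ``merged_data`` maps a page
            # number to the content merged for that page, e.g.
            #     {3: {"inside_content": "...", "type": "text",
            #          "excel_content": [...], "markdown_output": "| ... |"}}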
            for i, data in enumerate(self.all_text):
                content_type = data.get("type")
                inside_content = data.get("inside")
                page = data.get("page")

                if content_type == "excel":
                    temp_table.append(inside_content)
                    if temp_title is None:
                        # walk backwards to find the nearest preceding text
                        # line to use as the table title
                        for j in range(i - 1, -1, -1):
                            if self.all_text[j]["type"] == "excel":
                                break
                            if self.all_text[j]["type"] == "text":
                                content = self.all_text[j]["inside"]
                                if re.match(
                                    r"^\d+\.\d+", content
                                ) or content.startswith("§"):
                                    temp_title = content.strip()
                                    break
                                else:
                                    temp_title = content.strip()
                                    break
                elif content_type == "text":
                    if page in merged_data:
                        # merge text rows that belong to the same page
                        merged_data[page]["inside_content"] += " " + inside_content
                    else:
                        merged_data[page] = {
                            "inside_content": inside_content,
                            "type": "text",
                        }

                    # merge the excel table collected so far
                    if temp_table:
                        table_meta = {
                            "title": temp_title or temp_table[0],
                            "type": "excel",
                        }
                        self.all_title.append(table_meta)

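                        # Rendering sketch (hypothetical rows): a table stored
                        # as ["['日期', '金额']", "['2023', '100']"] is rendered
                        # below as the markdown
                        #     | 日期 | 金额 |
                        #     | --- | --- |
                        #     | 2023 | 100 |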
                        # markdown format
                        markdown_tables = []
                        if temp_table:
                            # table rows were serialized with str(list) in
                            # extract_text_and_tables, so ast.literal_eval can
                            # safely turn them back into lists
                            header = ast.literal_eval(temp_table[0])
                            markdown_tables.append(header)
                            for entry in temp_table[1:]:
                                row = ast.literal_eval(entry)
                                markdown_tables.append(row)
                            markdown_output = "| " + " | ".join(header) + " |\n"
                            markdown_output += (
                                "| " + " | ".join(["---"] * len(header)) + " |\n"
                            )
                            for row in markdown_tables[1:]:
                                markdown_output += "| " + " | ".join(row) + " |\n"

                            # attach the rendered table to the merged page
                            merged_data[page]["excel_content"] = temp_table
                            merged_data[page]["markdown_output"] = markdown_output

                        temp_title = None
                        temp_table = []

            # deal with the last excel table after the loop ends
            if temp_table:
                table_meta = {
                    "title": temp_title or temp_table[0],
                    "table": temp_table,
                    "type": "excel",
                }
                self.all_title.append(table_meta)
                # markdown format
                markdown_tables = []
                if temp_table:
                    header = ast.literal_eval(temp_table[0])
                    markdown_tables.append(header)
                    for entry in temp_table[1:]:
                        row = ast.literal_eval(entry)
                        markdown_tables.append(row)
                    markdown_output = "| " + " | ".join(header) + " |\n"
                    markdown_output += "| " + " | ".join(["---"] * len(header)) + " |\n"
                    for row in markdown_tables[1:]:
                        markdown_output += "| " + " | ".join(row) + " |\n"
                    # merged content (assumes ``page``, the last page seen in
                    # the loop above, already has merged text content)
                    merged_data[page]["excel_content"] = temp_table
                    merged_data[page]["markdown_output"] = markdown_output

            for page, content in merged_data.items():
                inside_content = content["inside_content"]
                if "markdown_output" in content:
                    markdown_content = content["markdown_output"]
                    content_metadata = {
                        "page": page,
                        "type": "excel",
                        "title": file_title,
                        "source": self._path,
                    }
                    page_documents.append(
                        Document(
                            content=inside_content + "\n" + markdown_content,
                            metadata=content_metadata,
                        )
                    )
                else:
                    content_metadata = {
                        "page": page,
                        "type": "text",
                        "title": file_title,
                        "source": self._path,
                    }
                    page_documents.append(
                        Document(content=inside_content, metadata=content_metadata)
                    )

            return page_documents
        return [Document.langchain2doc(lc_document) for lc_document in documents]

    @classmethod
    def support_chunk_strategy(cls) -> List[ChunkStrategy]:
        """Return support chunk strategy."""
        return [
            ChunkStrategy.CHUNK_BY_SIZE,
            ChunkStrategy.CHUNK_BY_PAGE,
            ChunkStrategy.CHUNK_BY_SEPARATOR,
        ]

    @classmethod
    def default_chunk_strategy(cls) -> ChunkStrategy:
        """Return default chunk strategy."""
        return ChunkStrategy.CHUNK_BY_SIZE

    @classmethod
    def type(cls) -> KnowledgeType:
        """Return knowledge type."""
        return KnowledgeType.DOCUMENT

    @classmethod
    def document_type(cls) -> DocumentType:
        """Document type of PDF."""
        return DocumentType.PDF


class PDFProcessor:
    """Extract text and tables from a PDF file with pdfplumber."""

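    # Usage (a minimal sketch; the input and output paths are hypothetical):
    #
    #     processor = PDFProcessor("./report.pdf")
    #     processor.pdf_to_json()
    #     processor.save_all_text("./output/report.jsonl")
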
    def __init__(self, filepath):
        """Initialize PDFProcessor with the path of a PDF file."""
        self.filepath = filepath
        try:
            import pdfplumber  # type: ignore
        except ImportError:
            raise ImportError(
                "Please install pdfplumber first: `pip install pdfplumber`"
            )
        self.pdf = pdfplumber.open(filepath)
        # extracted rows keyed by a global running row index
        self.all_text = defaultdict(dict)
        self.allrow = 0
        self.last_num = 0

    def check_lines(self, page, top, buttom):
        """Extract text lines from ``page`` between two vertical positions.

        ``top`` and ``buttom`` are pdfplumber y-coordinates (increasing
        downward); an empty string means that bound is not applied.
        """
        lines = page.extract_words()
        text = ""
        last_top = 0
        last_check = 0
        # a line ending in one of these patterns (sentence punctuation, a
        # digit, unit captions such as "单位:人民币元" i.e. "Unit: RMB yuan",
        # or a report title suffix) is treated as already complete
        check_re = (
            r"(?:。|;|单位:人民币元|金额单位:人民币元|单位:万元|币种:人民币|\d|"
            r"报告(?:全文)?(?:(修订版)|(修订稿)|(更正后))?)$"
        )
        for each_line in lines:
            if top == "" and buttom == "":
                if abs(last_top - each_line["top"]) <= 2 or (
                    last_check > 0
                    and (page.height * 0.9 - each_line["top"]) > 0
                    and not re.search(check_re, text)
                ):
                    text = text + each_line["text"]
                else:
                    text = text + "\n" + each_line["text"]
            elif top == "":
                if each_line["top"] > buttom:
                    if abs(last_top - each_line["top"]) <= 2 or (
                        last_check > 0
                        and (page.height * 0.85 - each_line["top"]) > 0
                        and not re.search(check_re, text)
                    ):
                        text = text + each_line["text"]
                    else:
                        text = text + "\n" + each_line["text"]
            else:
                if each_line["top"] < top and each_line["top"] > buttom:
                    if abs(last_top - each_line["top"]) <= 2 or (
                        last_check > 0
                        and (page.height * 0.85 - each_line["top"]) > 0
                        and not re.search(check_re, text)
                    ):
                        text = text + each_line["text"]
                    else:
                        text = text + "\n" + each_line["text"]
            last_top = each_line["top"]
            last_check = each_line["x1"] - page.width * 0.85

        return text

    def drop_empty_cols(self, data):
        """Drop columns whose cells are all empty."""
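        # Example (hypothetical rows): [["a", "", "b"], ["1", "", "2"]]
        # becomes [["a", "b"], ["1", "2"]].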
        transposed_data = list(map(list, zip(*data)))
        filtered_data = [
            col for col in transposed_data if not all(cell == "" for cell in col)
        ]
        result = list(map(list, zip(*filtered_data)))
        return result

    def extract_text_and_tables(self, page):
        """Extract plain text and tables from one page into ``self.all_text``."""
        buttom = 0
        tables = page.find_tables()
        if len(tables) >= 1:
            count = len(tables)
            for table in tables:
                # process the text that appears above this table
                if table.bbox[3] < buttom:
                    pass
                else:
                    count -= 1
                    top = table.bbox[1]
                    text = self.check_lines(page, top, buttom)
                    text_list = text.split("\n")
                    for line_text in text_list:
                        self.all_text[self.allrow] = {
                            "page": page.page_number,
                            "allrow": self.allrow,
                            "type": "text",
                            "inside": line_text,
                        }
                        self.allrow += 1

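                    # Merged-cell repair sketch (hypothetical rows): a row whose
                    # first cell is None is a continuation of the previous row,
                    # so [["收入", "100"], [None, "（已审计）"]] is folded into
                    # [["收入", "100（已审计）"]] by the row-merge loop below.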
                    # process table
                    buttom = table.bbox[3]
                    new_table = table.extract()
                    r_count = 0
                    # merge continuation rows (first cell is None) into the row
                    # above them
                    for r in range(len(new_table)):
                        row = new_table[r]
                        if row[0] is None:
                            r_count += 1
                            for c in range(len(row)):
                                if row[c] is not None and row[c] not in ["", " "]:
                                    if new_table[r - r_count][c] is None:
                                        new_table[r - r_count][c] = row[c]
                                    else:
                                        new_table[r - r_count][c] += row[c]
                                    new_table[r][c] = None
                        else:
                            r_count = 0

                    # keep only rows that still have a first cell and at least
                    # one non-empty value
                    end_table = []
                    for row in new_table:
                        if row[0] is not None:
                            cell_list = []
                            cell_check = False
                            for cell in row:
                                if cell is not None:
                                    cell = cell.replace("\n", "")
                                else:
                                    cell = ""
                                if cell != "":
                                    cell_check = True
                                cell_list.append(cell)
                            if cell_check:
                                end_table.append(cell_list)

                    end_table = self.drop_empty_cols(end_table)

                    # process when a column name is empty
                    if len(end_table) > 0:
                        for i in range(len(end_table[0])):
                            if end_table[0][i] == "":
                                if 0 < i < len(end_table[0]) - 1:
                                    # middle column: concatenate the left and
                                    # right column names
                                    left_column = end_table[0][i - 1]
                                    right_column = end_table[0][i + 1]
                                    end_table[0][i] = left_column + right_column
                                else:
                                    # first column: borrow the right column
                                    # name; last column: borrow the left one
                                    end_table[0][i] = (
                                        end_table[0][i - 1]
                                        if i == len(end_table[0]) - 1
                                        else end_table[0][i + 1]
                                    )

                    # if a cell is empty, fill it with the value of the cell to
                    # its left
                    for i in range(1, len(end_table)):
                        for j in range(len(end_table[i])):
                            if end_table[i][j] == "":
                                end_table[i][j] = end_table[i][j - 1]

                    for row in end_table:
                        self.all_text[self.allrow] = {
                            "page": page.page_number,
                            "allrow": self.allrow,
                            "type": "excel",
                            "inside": str(row),
                        }
                        self.allrow += 1

            # text below the last table on the page
            if count == 0:
                text = self.check_lines(page, "", buttom)
                text_list = text.split("\n")
                for line_text in text_list:
                    self.all_text[self.allrow] = {
                        "page": page.page_number,
                        "allrow": self.allrow,
                        "type": "text",
                        "inside": line_text,
                    }
                    self.allrow += 1

        else:
            # no tables on this page: take all of the text
            text = self.check_lines(page, "", "")
            text_list = text.split("\n")
            for line_text in text_list:
                self.all_text[self.allrow] = {
                    "page": page.page_number,
                    "allrow": self.allrow,
                    "type": "text",
                    "inside": line_text,
                }
                self.allrow += 1

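        # Tag the first and last extracted row of this page as header ("页眉")
        # or footer ("页脚") when they look like one. ``end_re`` matches footer
        # strings built from digits, slashes, dashes and the characters
        # 第/共/页 ("page x of y"), e.g. "第 3 页 共 20 页" or "3/20"
        # (hypothetical examples).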
        first_re = "[^计](?:报告(?:全文)?(?:(修订版)|(修订稿)|(更正后))?)$"
        end_re = r"^(?:\d|\\|\/|第|共|页|-|_| ){1,}"
        if self.last_num == 0:
            try:
                first_text = str(self.all_text[1]["inside"])
                end_text = str(self.all_text[len(self.all_text) - 1]["inside"])
                if re.search(first_re, first_text) and "[" not in end_text:
                    self.all_text[1]["type"] = "页眉"  # page header
                if re.search(end_re, end_text) and "[" not in end_text:
                    self.all_text[len(self.all_text) - 1]["type"] = "页脚"  # page footer
            except Exception:
                logger.warning(
                    f"Failed to tag header/footer on page {page.page_number}"
                )
        else:
            try:
                first_text = str(self.all_text[self.last_num + 2]["inside"])
                end_text = str(self.all_text[len(self.all_text) - 1]["inside"])
                if re.search(first_re, first_text) and "[" not in end_text:
                    self.all_text[self.last_num + 2]["type"] = "页眉"  # page header
                if re.search(end_re, end_text) and "[" not in end_text:
                    self.all_text[len(self.all_text) - 1]["type"] = "页脚"  # page footer
            except Exception:
                logger.warning(
                    f"Failed to tag header/footer on page {page.page_number}"
                )

        self.last_num = len(self.all_text) - 1

    def pdf_to_json(self):
        """Extract text and tables from every page of the PDF."""
        for i, pdf_page in enumerate(self.pdf.pages):
            self.extract_text_and_tables(pdf_page)
            logger.info(f"{self.filepath} page {i} extract text success")

    def save_all_text(self, path):
        """Save all extracted rows to ``path`` as JSON lines."""
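        # Each output line is one JSON object (hypothetical example):
        #     {"page": 1, "allrow": 0, "type": "text", "inside": "..."}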
        directory = os.path.dirname(path)
        if directory and not os.path.exists(directory):
            os.makedirs(directory)
        # open the file once and append one JSON line per extracted row
        with open(path, "a+", encoding="utf-8") as file:
            for key in self.all_text.keys():
                file.write(json.dumps(self.all_text[key], ensure_ascii=False) + "\n")