add pdf-rag example

This commit is contained in:
ChosenQC 2025-07-23 04:26:43 +00:00
parent edd65a84dd
commit 6070287b34
5 changed files with 298 additions and 0 deletions

View File

@ -0,0 +1,101 @@
import numpy as np
import json
from FlagEmbedding import FlagAutoModel
import time
from rank_bm25 import BM25Okapi
import hnswlib
def get_list_shape(lst):
shape = []
current = lst
while isinstance(current, list) and len(current) > 0:
shape.append(len(current))
current = current[0]
return tuple(shape)
def load_model():
return FlagAutoModel.from_finetuned(
'BAAI/bge-base-en-v1.5',
query_instruction_for_retrieval="Represent this sentence for searching relevant passages:",
# devices='cpu', # Uncomment this line if you want to use GPU.
use_fp16=True
)
def encode_query(model, query):
query_vectors = [np.array(model.encode(query)).tolist()]
print('query_vectors_shape', get_list_shape(query_vectors))
return query_vectors
def load_data(vectors_path, docs_path):
vectors = np.load(vectors_path).tolist()
with open(docs_path, 'r', encoding='utf-8') as file:
docs = json.load(file)
return vectors, docs
def build_hnsw_index(vectors):
# start_time = time.time()
num_elements = len(vectors)
p = hnswlib.Index(space='cosine', dim=768)
p.init_index(max_elements=num_elements, ef_construction=200, M=16)
# M defines the maximum number of outgoing connections in the graph. Higher M leads to higher accuracy/run_time at fixed ef/efConstruction.
# ef_construction controls index search speed/build speed tradeoff. Increasing the efConstruction parameter may enhance index quality, but it also tends to lengthen the indexing time.
p.add_items(np.array(vectors), np.arange(num_elements))
# HNSW_time = time.time()
#print('HNSW build time:', HNSW_time - start_time)
p.set_ef(32)
# ef controlling query time/accuracy trade-off. Higher ef leads to more accurate but slower search.
return p
def search_hnsw(index, query_vectors, docs):
# HNSW_time = time.time()
labels, distances = index.knn_query(np.array(query_vectors), k=10)
results = [docs[i]['content'] for i in labels[0]]
# end_HNSW_time = time.time()
# print('HNSW search time:', end_HNSW_time - HNSW_time)
return results
def build_bm25(docs):
corpus = [doc['content'] for doc in docs]
tokenized_corpus = [list(text.split()) for text in corpus]
# bm25_build_start = time.time()
bm25 = BM25Okapi(tokenized_corpus)
# bm25_build_end = time.time()
# print('BM25 build time:', bm25_build_end - bm25_build_start)
return bm25, corpus
def search_bm25(bm25, corpus, query):
# bm25_search_start = time.time()
tokenized_query = list(query.split())
bm25_scores = bm25.get_scores(tokenized_query)
bm25_top_n = np.argsort(bm25_scores)[::-1][:10]
bm25_results = [corpus[i] for i in bm25_top_n]
# bm25_search_end = time.time()
# print('BM25 search time:', bm25_search_end - bm25_search_start)
return bm25_results
def merge_results(results, bm25_results):
merged_results = []
for i in range(len(results)):
merged_results.append(results[i])
for i in range(len(bm25_results)):
merged_results.append(bm25_results[i])
merged_results = list(set(merged_results))
return merged_results
def main():
model = load_model()
query = "This is a test query to find relevant documents."
query_vectors = encode_query(model, query)
vectors, docs = load_data('PATH_TO_YOUR_EMBEDDING.npy', 'PATH_TO_YOUR_JSON.json')
hnsw_index = build_hnsw_index(vectors)
hnsw_results = search_hnsw(hnsw_index, query_vectors, docs)
bm25, corpus = build_bm25(docs)
bm25_results = search_bm25(bm25, corpus, query)
merged_results = merge_results(hnsw_results, bm25_results)
return merged_results
if __name__ == "__main__":
retrieved_data=main()

View File

@ -0,0 +1,56 @@
# PDF_RAG Workflow Demo
This project demonstrates a document retrieval and vectorization workflow based on Haystack, FlagEmbedding, HNSWLib, and BM25.
## Directory Structure
- `RAG_workflow/parse.py`: Parses and splits PDF documents, and generates the content in JSON format.
- `RAG_workflow/embedding.py`Vectorizes the document content and produces the embedding vector base.
- `RAG_workflow/HNSW_retrieve.py`Performs hybrid retrieval and recall using HNSW and BM25.
## Environment Setup
Install dependencies with:
```bash
pip install -r requirements.txt
```
## Workflow Steps
1. **PDF Parsing**
- Modify `PATH_TO_YOUR_PDF_DIRECTORY` in `parse.py` to your PDF folder path.
- Update the output JSON path`PATH_TO_YOUR_JSON`)。
- Run:
```bash
python PDF-RAG/parse.py
```
- The generated JSON file will be used in the next embedding step.
2. **JSON Vectorization**
- In `embedding.py`, update the input JSON path (`PATH_TO_YOUR_JSON.json`) and the output embedding file path (`PATH_TO_YOUR_EMBEDDING.npy`).
- Run:
```bash
python PDF-RAG/embedding.py
```
3. **Retrieval and Recall**
- In `HNSW_retrieve.py`, update the embedding and JSON paths (`PATH_TO_YOUR_JSON.json`).
- Run:
```bash
python PDF-RAG/HNSW_retrieve.py
```
- The script will output the construction and retrieval times for both HNSW and BM25, along with the merged retrieval results.
- Adjust HNSW and BM25 parameters according to the descriptions to get desired results.
- In `hnswlib.Index()`, use `space='l2'` for Squared L2, `'ip'` for Inner Product, and `'cosine'` for Cosine Similarity.
## Dependencies
- numpy
- scikit-learn
- hnswlib
- rank_bm25
- FlagEmbedding
- haystack
- haystack-integrations

View File

@ -0,0 +1,78 @@
import json
import numpy as np
from FlagEmbedding import FlagAutoModel
import time
from sklearn.metrics.pairwise import cosine_similarity
import os
def load_model(model_name="BAAI/bge-base-en-v1.5", use_fp16=True):
return FlagAutoModel.from_finetuned(
model_name,
query_instruction_for_retrieval="Represent this sentence for searching relevant passages:",
# device='cpu', # Uncomment this line if you want to use GPU.
use_fp16=use_fp16
)
def load_data(file_path):
try:
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
print("Error loading data from", file_path)
return []
def extract_texts(data):
return [doc.get("content", '').strip() for doc in data]
def generate_embeddings(model, texts):
return np.array(model.encode(texts))
def save_embeddings(embeddings, output_path):
os.makedirs(os.path.dirname(output_path), exist_ok=True)
np.save(output_path, embeddings)
def load_embeddings(file_path):
try:
return np.load(file_path)
except FileNotFoundError:
print("Error loading embeddings from", file_path)
return None
def main():
config = {
"model_name": "BAAI/bge-base-en-v1.5",
"json_path": #PATH_TO_YOUR_JSON.json#,
"embedding_path": #PATH_TO_YOUR_EMBEDDING.npy#,
"use_fp16": True,
"use_precomputed_embeddings": False
}
model = load_model(
model_name=config["model_name"],
use_fp16=config["use_fp16"]
)
if config["use_precomputed_embeddings"]:
embeddings = load_embeddings(config["embedding_path"])
if embeddings is None:
return
else:
data = load_data(config["json_path"])
if not data:
return
texts = extract_texts(data)
embeddings = generate_embeddings(model, texts)
save_embeddings(embeddings, config["embedding_path"])
##### Test demo with simple KNN cosine_similarity
# query='This is a test query to find relevant documents.'
# query_embedding=np.array(model.encode(query))
# similarity_scores = cosine_similarity([query_embedding], embeddings)
# indices = np.argsort(-similarity_scores)
return embeddings
if __name__ == '__main__':
main()

View File

@ -0,0 +1,56 @@
from pathlib import Path
import time
import json
from haystack import Pipeline
from haystack.components.converters import PyPDFToDocument
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
from haystack.components.writers import DocumentWriter
from haystack.document_stores.types import DuplicatePolicy
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack import Document
def create_indexing_pipeline():
document_store = InMemoryDocumentStore()
converter = PyPDFToDocument()
cleaner = DocumentCleaner()
splitter = DocumentSplitter(split_by="sentence", split_length=1)
writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.SKIP)
indexing_pipeline = Pipeline()
indexing_pipeline.add_component("converter", converter)
indexing_pipeline.add_component("cleaner", cleaner)
indexing_pipeline.add_component("splitter", splitter)
indexing_pipeline.add_component("writer", writer)
indexing_pipeline.connect("converter", "cleaner")
indexing_pipeline.connect("cleaner", "splitter")
indexing_pipeline.connect("splitter", "writer")
return indexing_pipeline, document_store
def process_pdfs(pdf_directory, indexing_pipeline):
papers_dir = Path(pdf_directory)
pdf_files = list(papers_dir.glob("*.pdf"))
for pdf_file in pdf_files:
try:
indexing_pipeline.run({"converter": {"sources": [pdf_file]}})
except:
pass
def save_to_json(document_store, output_path):
all_documents = document_store.filter_documents()
docs_list = [doc.to_dict() for doc in all_documents]
with open(output_path, "w", encoding="utf-8") as f:
json.dump(docs_list, f, ensure_ascii=False, indent=2)
def main():
PDF_DIRECTORY = #PATH_TO_YOUR_PDF_DIRECTORY#
OUTPUT_JSON = #PATH_TO_YOUR_JSON#
start_time = time.time()
indexing_pipeline, document_store = create_indexing_pipeline()
process_pdfs(PDF_DIRECTORY, indexing_pipeline)
save_to_json(document_store, OUTPUT_JSON)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,7 @@
numpy
scikit-learn
hnswlib
rank_bm25
FlagEmbedding
haystack
haystack-integrations