diff --git a/examples/cloud/pdf-rag/HNSW_retrieve.py b/examples/cloud/pdf-rag/HNSW_retrieve.py
new file mode 100644
index 000000000..626c28603
--- /dev/null
+++ b/examples/cloud/pdf-rag/HNSW_retrieve.py
@@ -0,0 +1,101 @@
+import numpy as np
+import json
+from FlagEmbedding import FlagAutoModel
+import time
+from rank_bm25 import BM25Okapi
+import hnswlib
+
+def get_list_shape(lst):
+    shape = []
+    current = lst
+    while isinstance(current, list) and len(current) > 0:
+        shape.append(len(current))
+        current = current[0]
+    return tuple(shape)
+
+def load_model():
+    return FlagAutoModel.from_finetuned(
+        'BAAI/bge-base-en-v1.5',
+        query_instruction_for_retrieval="Represent this sentence for searching relevant passages:",
+        # devices='cpu',  # Uncomment this line to force CPU; by default the model uses a GPU when one is available.
+        use_fp16=True
+    )
+
+def encode_query(model, query):
+    query_vectors = [np.array(model.encode(query)).tolist()]
+    print('query_vectors_shape', get_list_shape(query_vectors))
+    return query_vectors
+
+def load_data(vectors_path, docs_path):
+    vectors = np.load(vectors_path).tolist()
+    with open(docs_path, 'r', encoding='utf-8') as file:
+        docs = json.load(file)
+    return vectors, docs
+
+def build_hnsw_index(vectors):
+    start_time = time.time()
+    num_elements = len(vectors)
+    p = hnswlib.Index(space='cosine', dim=768)
+    p.init_index(max_elements=num_elements, ef_construction=200, M=16)
+    # M is the maximum number of outgoing connections per graph node. A higher M improves accuracy
+    # at the cost of memory and run time, for fixed ef/ef_construction.
+    # ef_construction controls the build speed/quality trade-off: increasing it tends to improve
+    # index quality but lengthens indexing time.
+    p.add_items(np.array(vectors), np.arange(num_elements))
+    print('HNSW build time:', time.time() - start_time)
+    p.set_ef(32)
+    # ef controls the query time/accuracy trade-off: a higher ef gives more accurate but slower search.
+    return p
+
+def search_hnsw(index, query_vectors, docs):
+    start_time = time.time()
+    labels, distances = index.knn_query(np.array(query_vectors), k=10)
+    results = [docs[i]['content'] for i in labels[0]]
+    print('HNSW search time:', time.time() - start_time)
+    return results
+
+def build_bm25(docs):
+    corpus = [doc['content'] for doc in docs]
+    tokenized_corpus = [text.split() for text in corpus]
+    start_time = time.time()
+    bm25 = BM25Okapi(tokenized_corpus)
+    print('BM25 build time:', time.time() - start_time)
+    return bm25, corpus
+
+def search_bm25(bm25, corpus, query):
+    start_time = time.time()
+    tokenized_query = query.split()
+    bm25_scores = bm25.get_scores(tokenized_query)
+    bm25_top_n = np.argsort(bm25_scores)[::-1][:10]
+    bm25_results = [corpus[i] for i in bm25_top_n]
+    print('BM25 search time:', time.time() - start_time)
+    return bm25_results
+
+def merge_results(results, bm25_results):
+    # Deduplicate while preserving ranking order (HNSW hits first, then BM25).
+    return list(dict.fromkeys(results + bm25_results))
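+
+# Optional alternative (a sketch, not used by this demo): reciprocal rank fusion
+# keeps a ranked order instead of an unordered union. k=60 is the conventional
+# smoothing constant, chosen here purely for illustration.
+# def merge_results_rrf(results, bm25_results, k=60):
+#     scores = {}
+#     for ranked in (results, bm25_results):
+#         for rank, doc in enumerate(ranked):
+#             scores[doc] = scores.get(doc, 0.0) + 1.0 / (k + rank + 1)
+#     return sorted(scores, key=scores.get, reverse=True)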
+
+def main():
+    model = load_model()
+    query = "This is a test query to find relevant documents."
+    query_vectors = encode_query(model, query)
+    vectors, docs = load_data('PATH_TO_YOUR_EMBEDDING.npy', 'PATH_TO_YOUR_JSON.json')
+
+    hnsw_index = build_hnsw_index(vectors)
+    hnsw_results = search_hnsw(hnsw_index, query_vectors, docs)
+
+    bm25, corpus = build_bm25(docs)
+    bm25_results = search_bm25(bm25, corpus, query)
+
+    merged_results = merge_results(hnsw_results, bm25_results)
+    print('Merged results:', merged_results)
+
+    return merged_results
+
+if __name__ == "__main__":
+    retrieved_data = main()
diff --git a/examples/cloud/pdf-rag/README.md b/examples/cloud/pdf-rag/README.md
new file mode 100644
index 000000000..a7da19314
--- /dev/null
+++ b/examples/cloud/pdf-rag/README.md
@@ -0,0 +1,56 @@
+# PDF RAG Workflow Demo
+
+This project demonstrates a document retrieval and vectorization workflow based on Haystack, FlagEmbedding, HNSWLib, and BM25.
+
+## Directory Structure
+
+- `parse.py`: Parses and splits PDF documents, and writes the content to JSON.
+- `embedding.py`: Vectorizes the document content and produces the embedding vector base.
+- `HNSW_retrieve.py`: Performs hybrid retrieval and recall using HNSW and BM25.
+
+## Environment Setup
+
+Install dependencies with:
+
+```bash
+pip install -r requirements.txt
+```
+
+## Workflow Steps
+
+1. **PDF Parsing**
+   - Set `PATH_TO_YOUR_PDF_DIRECTORY` in `parse.py` to your PDF folder path.
+   - Update the output JSON path (`PATH_TO_YOUR_JSON`).
+   - Run:
+     ```bash
+     python parse.py
+     ```
+   - The generated JSON file will be used in the next embedding step.
+
+2. **JSON Vectorization**
+   - In `embedding.py`, update the input JSON path (`PATH_TO_YOUR_JSON.json`) and the output embedding file path (`PATH_TO_YOUR_EMBEDDING.npy`).
+   - Run:
+     ```bash
+     python embedding.py
+     ```
+
+3. **Retrieval and Recall**
+   - In `HNSW_retrieve.py`, update the embedding path (`PATH_TO_YOUR_EMBEDDING.npy`) and the JSON path (`PATH_TO_YOUR_JSON.json`).
+   - Run:
+     ```bash
+     python HNSW_retrieve.py
+     ```
+   - The script prints the build and search times for both HNSW and BM25, along with the merged retrieval results.
+   - Adjust the HNSW and BM25 parameters described in the code comments to tune retrieval quality.
+   - In `hnswlib.Index()`, use `space='l2'` for squared L2 distance, `'ip'` for inner product, and `'cosine'` for cosine similarity.
+
+## Dependencies
+
+- numpy
+- scikit-learn
+- hnswlib
+- rank_bm25
+- FlagEmbedding
+- haystack-ai
+- pypdf
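+
+## Tuning Sketch
+
+A minimal sketch of where the HNSW trade-off knobs live (the values below are the
+demo's illustrative defaults, not tuned settings):
+
+```python
+import hnswlib
+
+index = hnswlib.Index(space='cosine', dim=768)  # or 'l2' / 'ip'
+index.init_index(max_elements=10000, ef_construction=200, M=16)  # build-time knobs
+index.set_ef(32)  # query-time knob: higher ef -> more accurate but slower search
+```
+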
diff --git a/examples/cloud/pdf-rag/embedding.py b/examples/cloud/pdf-rag/embedding.py
new file mode 100644
index 000000000..30d7d747e
--- /dev/null
+++ b/examples/cloud/pdf-rag/embedding.py
@@ -0,0 +1,78 @@
+import json
+import numpy as np
+from FlagEmbedding import FlagAutoModel
+from sklearn.metrics.pairwise import cosine_similarity
+import os
+
+def load_model(model_name="BAAI/bge-base-en-v1.5", use_fp16=True):
+    return FlagAutoModel.from_finetuned(
+        model_name,
+        query_instruction_for_retrieval="Represent this sentence for searching relevant passages:",
+        # devices='cpu',  # Uncomment this line to force CPU; by default the model uses a GPU when one is available.
+        use_fp16=use_fp16
+    )
+
+def load_data(file_path):
+    try:
+        with open(file_path, 'r', encoding='utf-8') as f:
+            return json.load(f)
+    except (FileNotFoundError, json.JSONDecodeError):
+        print("Error loading data from", file_path)
+        return []
+
+def extract_texts(data):
+    return [doc.get("content", '').strip() for doc in data]
+
+def generate_embeddings(model, texts):
+    return np.array(model.encode(texts))
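+
+# Note (a sketch, assuming FlagEmbedding's encode() accepts a batch_size kwarg):
+# encode() batches internally, but if embedding a large corpus exhausts memory,
+# the batch size can be lowered explicitly, e.g.:
+#     embeddings = np.array(model.encode(texts, batch_size=64))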
encoding="utf-8") as f: + json.dump(docs_list, f, ensure_ascii=False, indent=2) + +def main(): + PDF_DIRECTORY = #PATH_TO_YOUR_PDF_DIRECTORY# + OUTPUT_JSON = #PATH_TO_YOUR_JSON# + + start_time = time.time() + indexing_pipeline, document_store = create_indexing_pipeline() + process_pdfs(PDF_DIRECTORY, indexing_pipeline) + save_to_json(document_store, OUTPUT_JSON) + +if __name__ == "__main__": + main() diff --git a/examples/cloud/pdf-rag/requirements.txt b/examples/cloud/pdf-rag/requirements.txt new file mode 100644 index 000000000..84d85fecc --- /dev/null +++ b/examples/cloud/pdf-rag/requirements.txt @@ -0,0 +1,7 @@ +numpy +scikit-learn +hnswlib +rank_bm25 +FlagEmbedding +haystack +haystack-integrations \ No newline at end of file