mirror of
https://github.com/imartinez/privateGPT.git
synced 2025-09-23 03:57:13 +00:00
feat(ingest): Created a faster ingestion mode - pipeline (#1750)
* Unify pgvector and postgres connection settings
* Remove local changes
* Update file pgvector->postgres
* postgresql should be postgres
* Adding pipeline ingestion mode
* disable hugging face parallelism. Continue on file to doc transform failure
* Semaphore to limit docq async workers. ETA reporting
This commit is contained in:
@@ -6,6 +6,7 @@ import multiprocessing.pool
|
||||
import os
|
||||
import threading
|
||||
from pathlib import Path
|
||||
from queue import Queue
|
||||
from typing import Any
|
||||
|
||||
from llama_index.core.data_structs import IndexDict
|
||||
@@ -13,12 +14,13 @@ from llama_index.core.embeddings.utils import EmbedType
|
||||
from llama_index.core.indices import VectorStoreIndex, load_index_from_storage
|
||||
from llama_index.core.indices.base import BaseIndex
|
||||
from llama_index.core.ingestion import run_transformations
|
||||
from llama_index.core.schema import Document, TransformComponent
|
||||
from llama_index.core.schema import BaseNode, Document, TransformComponent
|
||||
from llama_index.core.storage import StorageContext
|
||||
|
||||
from private_gpt.components.ingest.ingest_helper import IngestionHelper
|
||||
from private_gpt.paths import local_data_path
|
||||
from private_gpt.settings.settings import Settings
|
||||
from private_gpt.utils.eta import eta
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -314,6 +316,170 @@ class ParallelizedIngestComponent(BaseIngestComponentWithIndex):
|
||||
self._file_to_documents_work_pool.terminate()
|
||||
|
||||
|
||||
class PipelineIngestComponent(BaseIngestComponentWithIndex):
    """Pipeline ingestion - keeping the embedding worker pool as busy as possible.

    This class implements a threaded ingestion pipeline built from two daemon
    threads and two bounded queues:

    * The calling thread (``ingest`` / ``bulk_ingest``) reads and parses files
      into documents and places them on ``doc_q``.
    * The ``_doc_to_node`` thread drains ``doc_q`` and hands each file's
      documents to a ``multiprocessing.pool.ThreadPool`` of ``count_workers``
      workers that run the transformations (including the embeddings).
    * Each worker places the resulting nodes on ``node_q``.
    * The ``_write_nodes`` thread drains ``node_q`` and accumulates nodes until
      ``NODE_FLUSH_COUNT`` is reached (or a flush is requested), then writes
      them to the index, records the document hashes and saves the index.

    Exception handling ensures robustness against erroneous files. However, in
    the pipelined design, one error while saving can lead to the discarding of
    every file accumulated in the same batch. Any discarded files are reported
    through the module logger.
    """

    NODE_FLUSH_COUNT = 5000  # Save the index every # nodes.

    def __init__(
        self,
        storage_context: StorageContext,
        embed_model: EmbedType,
        transformations: list[TransformComponent],
        count_workers: int,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Set up the queues and start the two pipeline threads.

        Args:
            storage_context: forwarded to the base component.
            embed_model: forwarded to the base component.
            transformations: forwarded to the base component; at least two
                entries are required.
            count_workers: size of the embedding worker pool; must be > 0.
            *args: forwarded to the base component.
            **kwargs: forwarded to the base component.
        """
        super().__init__(storage_context, embed_model, transformations, *args, **kwargs)
        assert (
            len(self.transformations) >= 2
        ), "Embeddings must be in the transformations"
        assert count_workers > 0, "count_workers must be > 0"
        self.count_workers = count_workers
        # We are doing our own parallelism.
        # To avoid colliding with the parallelism of huggingface, we disable it.
        os.environ["TOKENIZERS_PARALLELISM"] = "false"

        # doc_q stores parsed files as Document chunks.
        # Using a shallow queue causes the filesystem parser to block
        # when it reaches capacity. This ensures it doesn't outpace the
        # computationally intensive embeddings phase, avoiding unnecessary
        # memory consumption. The semaphore bounds the number of in-flight
        # async embedding tasks to count_workers, so that pending work stays
        # in doc_q, which then fills up and blocks the producer.
        self.doc_semaphore = multiprocessing.Semaphore(self.count_workers)
        self.doc_q: Queue[tuple[str, str | None, list[Document] | None]] = Queue(20)
        # node_q stores documents parsed into nodes (embeddings).
        # Larger queue size so we don't block the embedding workers during a slow
        # index update.
        self.node_q: Queue[
            tuple[str, str | None, list[Document] | None, list[BaseNode] | None]
        ] = Queue(40)
        # Daemon threads: they run for the lifetime of the component and do not
        # keep the interpreter alive on exit.
        threading.Thread(target=self._doc_to_node, daemon=True).start()
        threading.Thread(target=self._write_nodes, daemon=True).start()

    def _doc_to_node(self) -> None:
        """Consume ``doc_q`` and dispatch each file to the embedding pool.

        Queue items are ``(cmd, file_name, documents)``. ``"process"`` items are
        handed to ``_doc_to_node_worker``, which becomes responsible for
        releasing the semaphore and calling ``doc_q.task_done()``. Every other
        item (``"flush"``, ``"quit"``, unknown) is acknowledged here; ``"quit"``
        additionally ends the loop.
        """
        with multiprocessing.pool.ThreadPool(processes=self.count_workers) as pool:
            while True:
                # Fetch outside the try block so the cleanup below never runs
                # with an unbound or stale command.
                cmd, file_name, documents = self.doc_q.get(block=True)
                handed_to_worker = False
                try:
                    if cmd == "process":
                        # Push CPU/GPU embedding work to the worker pool.
                        # Acquire semaphore to control access to worker pool.
                        self.doc_semaphore.acquire()
                        try:
                            pool.apply_async(
                                self._doc_to_node_worker, (file_name, documents)
                            )
                            handed_to_worker = True
                        except Exception:
                            # The worker will never run, so give the permit
                            # back here and report the discarded file.
                            self.doc_semaphore.release()
                            logger.exception(f"Processing file {file_name}")
                    elif cmd == "quit":
                        break
                finally:
                    # Items not owned by a worker are acknowledged here so that
                    # doc_q.join() can return.
                    if not handed_to_worker:
                        self.doc_q.task_done()  # unblock Q joins

    def _doc_to_node_worker(self, file_name: str, documents: list[Document]) -> None:
        """Run the transformations for one file and queue the resulting nodes.

        Runs on a pool worker thread. Failures are logged rather than raised:
        the result of ``apply_async`` is never inspected, so an exception
        escaping this function would otherwise be lost silently.
        """
        try:
            nodes = run_transformations(
                documents,  # type: ignore[arg-type]
                self.transformations,
                show_progress=self.show_progress,
            )
            self.node_q.put(("process", file_name, documents, nodes))
        except Exception:
            # Tell the user so they can investigate this file.
            logger.exception(f"Processing file {file_name}")
        finally:
            # Always free the worker slot and acknowledge the queue item,
            # otherwise the dispatcher and _flush() would block forever.
            self.doc_semaphore.release()
            self.doc_q.task_done()  # unblock Q joins

    def _save_docs(
        self, files: list[str], documents: list[Document], nodes: list[BaseNode]
    ) -> None:
        """Write the accumulated batch to the index and persist it.

        The three lists are cleared in place before returning, whether or not
        saving succeeded. A failure is logged together with the affected file
        names and is not re-raised.
        """
        try:
            logger.info(
                f"Saving {len(files)} files ({len(documents)} documents / {len(nodes)} nodes)"
            )
            self._index.insert_nodes(nodes)
            for document in documents:
                self._index.docstore.set_document_hash(
                    document.get_doc_id(), document.hash
                )
            self._save_index()
        except Exception:
            # Tell the user so they can investigate these files
            logger.exception(f"Processing files {files}")
        finally:
            # Clearing work, even on exception, maintains a clean state.
            nodes.clear()
            documents.clear()
            files.clear()

    def _write_nodes(self) -> None:
        """Consume ``node_q`` and save nodes to the index in batches.

        Queue items are ``(cmd, file_name, documents, nodes)``. ``"process"``
        items are accumulated and saved once ``NODE_FLUSH_COUNT`` nodes are
        pending; ``"flush"`` and ``"quit"`` save whatever is pending, and
        ``"quit"`` also ends the loop. I/O intensive.
        """
        node_stack: list[BaseNode] = []
        doc_stack: list[Document] = []
        file_stack: list[str] = []
        while True:
            try:
                cmd, file_name, documents, nodes = self.node_q.get(block=True)
                if cmd in ("flush", "quit"):
                    if file_stack:
                        self._save_docs(file_stack, doc_stack, node_stack)
                    if cmd == "quit":
                        break
                elif cmd == "process":
                    node_stack.extend(nodes)  # type: ignore[arg-type]
                    doc_stack.extend(documents)  # type: ignore[arg-type]
                    file_stack.append(file_name)  # type: ignore[arg-type]
                    # Constant saving is heavy on I/O - accumulate to a threshold
                    if len(node_stack) >= self.NODE_FLUSH_COUNT:
                        self._save_docs(file_stack, doc_stack, node_stack)
            finally:
                self.node_q.task_done()

    def _flush(self) -> None:
        """Block until everything queued so far has been embedded and saved."""
        # First wait for every queued document to pass through the embedding
        # stage, then ask the writer to save the partially filled batch.
        self.doc_q.put(("flush", None, None))
        self.doc_q.join()
        self.node_q.put(("flush", None, None, None))
        self.node_q.join()

    def ingest(self, file_name: str, file_data: Path) -> list[Document]:
        """Ingest a single file and return the documents parsed from it.

        Returns only after the pipeline has been flushed. A parsing failure
        propagates to the caller.
        """
        documents = IngestionHelper.transform_file_into_documents(file_name, file_data)
        self.doc_q.put(("process", file_name, documents))
        self._flush()
        return documents

    def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
        """Ingest many files and return all documents parsed from them.

        Files that fail to parse are logged and skipped. The pipeline is
        flushed once, after the last file has been queued.
        """
        docs = []
        for file_name, file_data in eta(files):
            try:
                documents = IngestionHelper.transform_file_into_documents(
                    file_name, file_data
                )
                self.doc_q.put(("process", file_name, documents))
                docs.extend(documents)
            except Exception:
                logger.exception(f"Skipping {file_data.name}")
        self._flush()
        return docs
|
||||
|
||||
|
||||
def get_ingestion_component(
|
||||
storage_context: StorageContext,
|
||||
embed_model: EmbedType,
|
||||
@@ -336,6 +502,13 @@ def get_ingestion_component(
|
||||
transformations=transformations,
|
||||
count_workers=settings.embedding.count_workers,
|
||||
)
|
||||
elif ingest_mode == "pipeline":
|
||||
return PipelineIngestComponent(
|
||||
storage_context=storage_context,
|
||||
embed_model=embed_model,
|
||||
transformations=transformations,
|
||||
count_workers=settings.embedding.count_workers,
|
||||
)
|
||||
else:
|
||||
return SimpleIngestComponent(
|
||||
storage_context=storage_context,
|
||||
|
Reference in New Issue
Block a user