disable hugging face parallelism. Continue on file to doc transform failure

This commit is contained in:
Brett England
2024-03-17 10:09:49 -04:00
parent d4f7d56800
commit 98fc12a781
2 changed files with 21 additions and 6 deletions

View File

@@ -345,6 +345,15 @@ class PipelineIngestComponent(BaseIngestComponentWithIndex):
) -> None:
super().__init__(storage_context, embed_model, transformations, *args, **kwargs)
self.count_workers = count_workers
assert (
len(self.transformations) >= 2
), "Embeddings must be in the transformations"
assert count_workers > 0, "count_workers must be > 0"
self.count_workers = count_workers
# We are doing our own multiprocessing
# To do not collide with the multiprocessing of huggingface, we disable it
os.environ["TOKENIZERS_PARALLELISM"] = "false"
# doc_q stores parsed files as Document chunks.
# Using a shallow queue causes the filesystem parser to block
# when it reaches capacity. This ensures it doesn't outpace the
@@ -352,9 +361,11 @@ class PipelineIngestComponent(BaseIngestComponentWithIndex):
# memory consumption
self.doc_q: Queue[tuple[str, str | None, list[Document] | None]] = Queue(20)
# node_q stores documents parsed into nodes (embeddings).
# Larger queue size so we don't block the embedding workers during a slow
# index update.
self.node_q: Queue[
tuple[str, str | None, list[Document] | None, list[BaseNode] | None]
] = Queue(20)
] = Queue(40)
threading.Thread(target=self._doc_to_node, daemon=True).start()
threading.Thread(target=self._write_nodes, daemon=True).start()
@@ -449,11 +460,14 @@ class PipelineIngestComponent(BaseIngestComponentWithIndex):
def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
docs = []
for file_name, file_data in files:
documents = IngestionHelper.transform_file_into_documents(
file_name, file_data
)
self.doc_q.put(("process", file_name, documents))
docs.extend(documents)
try:
documents = IngestionHelper.transform_file_into_documents(
file_name, file_data
)
self.doc_q.put(("process", file_name, documents))
docs.extend(documents)
except Exception:
logger.exception(f"Skipping {file_data.name}")
self._flush()
return docs

View File

@@ -1,3 +1,4 @@
# poetry install --extras "ui llms-llama-cpp vector-stores-qdrant embeddings-huggingface"
server:
env_name: ${APP_ENV:local}