fix: remove metadata from bulk ingestion

commit f47c05730d
parent 50388f6a33
Author: Nathan Lenas
Date:   2024-07-23 09:34:16 +02:00


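Drops the optional metadata parameter (dict[str, str] | None) from bulk_ingest in the abstract base class and in the Simple, Batch, Parallelized, and Pipeline ingest components. The (file_name, path) tuples now go straight to IngestionHelper.transform_file_into_documents and the worker pools, which also removes the intermediate args pairing lists.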
@@ -49,9 +49,7 @@ class BaseIngestComponent(abc.ABC):
         pass
 
     @abc.abstractmethod
-    def bulk_ingest(
-        self, files: list[tuple[str, Path]], metadata: dict[str, str] | None = None
-    ) -> list[Document]:
+    def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
         pass
 
     @abc.abstractmethod
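For illustration, a minimal self-contained sketch of the interface after this change. Document and EchoIngest are stand-ins invented for this example (the real Document type comes from the project's dependencies); only the bulk_ingest signature mirrors the diff:

from abc import ABC, abstractmethod
from pathlib import Path


class Document:
    # Stand-in for the real Document type used by the ingest components.
    def __init__(self, text: str) -> None:
        self.text = text


class BaseIngestComponent(ABC):
    @abstractmethod
    def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
        ...


class EchoIngest(BaseIngestComponent):
    # Toy component: one Document per file; note there is no longer a
    # metadata parameter anywhere in the call chain.
    def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
        return [Document(file_name) for file_name, _path in files]


docs = EchoIngest().bulk_ingest([("a.txt", Path("a.txt")), ("b.txt", Path("b.txt"))])
assert [d.text for d in docs] == ["a.txt", "b.txt"]

Callers that previously passed metadata=... simply drop the argument.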
@@ -140,13 +138,11 @@ class SimpleIngestComponent(BaseIngestComponentWithIndex):
         logger.debug("Saving the documents in the index and doc store")
         return self._save_docs(documents)
 
-    def bulk_ingest(
-        self, files: list[tuple[str, Path]], metadata: dict[str, str] | None = None
-    ) -> list[Document]:
+    def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
         saved_documents = []
         for file_name, file_data in files:
             documents = IngestionHelper.transform_file_into_documents(
-                file_name, file_data, metadata
+                file_name, file_data
             )
             saved_documents.extend(self._save_docs(documents))
         return saved_documents
@@ -207,16 +203,12 @@ class BatchIngestComponent(BaseIngestComponentWithIndex):
         logger.debug("Saving the documents in the index and doc store")
         return self._save_docs(documents)
 
-    def bulk_ingest(
-        self, files: list[tuple[str, Path]], metadata: dict[str, str] | None = None
-    ) -> list[Document]:
-        # Pair the files with the metadata
-        args = [(file_tuple, metadata) for file_tuple in files]
+    def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
         documents = list(
             itertools.chain.from_iterable(
                 self._file_to_documents_work_pool.starmap(
-                    IngestionHelper.transform_file_into_documents, args
+                    IngestionHelper.transform_file_into_documents, files
                 )
             )
         )
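With metadata gone, the Batch component no longer pre-pairs each file tuple before dispatching to the pool: starmap unpacks each (file_name, file_data) tuple directly into the helper's two positional parameters. A minimal sketch of that unpacking, using itertools.starmap in place of the work pool's starmap (same unpacking semantics) and a hypothetical transform standing in for IngestionHelper.transform_file_into_documents:

from itertools import chain, starmap
from pathlib import Path


def transform(file_name: str, file_data: Path) -> list[str]:
    # Hypothetical stand-in for IngestionHelper.transform_file_into_documents.
    return [f"doc::{file_name}::{file_data}"]


files = [("a.txt", Path("a.txt")), ("b.txt", Path("b.txt"))]

# Each (file_name, file_data) tuple unpacks straight into transform's two
# positional parameters, so no (file_tuple, metadata) pairing is needed.
documents = list(chain.from_iterable(starmap(transform, files)))
assert documents == ["doc::a.txt::a.txt", "doc::b.txt::b.txt"]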
@@ -304,16 +296,13 @@ class ParallelizedIngestComponent(BaseIngestComponentWithIndex):
         logger.debug("Saving the documents in the index and doc store")
         return self._save_docs(documents)
 
-    def bulk_ingest(
-        self, files: list[tuple[str, Path]], metadata: dict[str, str] | None = None
-    ) -> list[Document]:
-        args = [(file_tuple, metadata) for file_tuple in files]
+    def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
         # Lightweight threads, used for parallelize the
         # underlying IO calls made in the ingestion
         documents = list(
             itertools.chain.from_iterable(
-                self._ingest_work_pool.starmap(self.ingest, args)
+                self._ingest_work_pool.starmap(self.ingest, files)
             )
         )
         return documents
@@ -509,14 +498,12 @@ class PipelineIngestComponent(BaseIngestComponentWithIndex):
             self._flush()
         return documents
 
-    def bulk_ingest(
-        self, files: list[tuple[str, Path]], metadata: dict[str, str] | None = None
-    ) -> list[Document]:
+    def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
         docs = []
         for file_name, file_data in eta(files):
             try:
                 documents = IngestionHelper.transform_file_into_documents(
-                    file_name, file_data, metadata
+                    file_name, file_data
                 )
                 self.doc_q.put(("process", file_name, documents))
                 docs.extend(documents)
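The Pipeline variant does not save documents inline; it enqueues a ("process", file_name, documents) command for a worker to consume. A toy sketch of that queue handoff, with invented names and none of the real component's batching or flush logic:

import queue
import threading

doc_q: queue.Queue = queue.Queue()


def worker() -> None:
    # Drain ("process", file_name, documents) commands until a stop sentinel.
    while True:
        command, file_name, documents = doc_q.get()
        if command == "stop":
            break
        print(f"processing {len(documents)} documents from {file_name}")


t = threading.Thread(target=worker)
t.start()
doc_q.put(("process", "a.txt", ["doc1", "doc2"]))
doc_q.put(("stop", None, []))
t.join()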