From 50388f6a335cf1ab742624c64eb9ec0366fd5696 Mon Sep 17 00:00:00 2001
From: Nathan Lenas
Date: Tue, 23 Jul 2024 09:18:27 +0200
Subject: [PATCH] fix: specify dict type, fix bulk ingestion with metadata

---
 .../components/ingest/ingest_component.py   | 80 +++++++++++++++----
 .../components/ingest/ingest_helper.py      |  4 +-
 private_gpt/server/ingest/ingest_router.py  | 28 ++++---
 private_gpt/server/ingest/ingest_service.py | 23 +++++-
 tests/fixtures/ingest_helper.py             |  8 +-
 tests/server/ingest/test_ingest_routes.py   | 14 ++--
 6 files changed, 114 insertions(+), 43 deletions(-)

diff --git a/private_gpt/components/ingest/ingest_component.py b/private_gpt/components/ingest/ingest_component.py
index 122b8957..4e01c21b 100644
--- a/private_gpt/components/ingest/ingest_component.py
+++ b/private_gpt/components/ingest/ingest_component.py
@@ -40,11 +40,18 @@ class BaseIngestComponent(abc.ABC):
         self.transformations = transformations
 
     @abc.abstractmethod
-    def ingest(self, file_name: str, file_data: Path, file_metadata : dict | None = None) -> list[Document]:
+    def ingest(
+        self,
+        file_name: str,
+        file_data: Path,
+        file_metadata: dict[str, str] | None = None,
+    ) -> list[Document]:
         pass
 
     @abc.abstractmethod
-    def bulk_ingest(self, files: list[tuple[str, Path]], metadata : dict | None = None) -> list[Document]:
+    def bulk_ingest(
+        self, files: list[tuple[str, Path]], metadata: dict[str, str] | None = None
+    ) -> list[Document]:
         pass
 
     @abc.abstractmethod
@@ -117,16 +124,25 @@ class SimpleIngestComponent(BaseIngestComponentWithIndex):
     ) -> None:
         super().__init__(storage_context, embed_model, transformations, *args, **kwargs)
 
-    def ingest(self, file_name: str, file_data: Path, file_metadata : dict | None = None) -> list[Document]:
+    def ingest(
+        self,
+        file_name: str,
+        file_data: Path,
+        file_metadata: dict[str, str] | None = None,
+    ) -> list[Document]:
         logger.info("Ingesting file_name=%s", file_name)
-        documents = IngestionHelper.transform_file_into_documents(file_name, file_data, file_metadata)
+        documents = IngestionHelper.transform_file_into_documents(
+            file_name, file_data, file_metadata
+        )
         logger.info(
             "Transformed file=%s into count=%s documents", file_name, len(documents)
         )
         logger.debug("Saving the documents in the index and doc store")
         return self._save_docs(documents)
 
-    def bulk_ingest(self, files: list[tuple[str, Path]], metadata : dict | None = None) -> list[Document]:
+    def bulk_ingest(
+        self, files: list[tuple[str, Path]], metadata: dict[str, str] | None = None
+    ) -> list[Document]:
         saved_documents = []
         for file_name, file_data in files:
             documents = IngestionHelper.transform_file_into_documents(
@@ -175,20 +191,32 @@ class BatchIngestComponent(BaseIngestComponentWithIndex):
             processes=self.count_workers
         )
 
-    def ingest(self, file_name: str, file_data: Path, file_metadata : dict | None = None) -> list[Document]:
+    def ingest(
+        self,
+        file_name: str,
+        file_data: Path,
+        file_metadata: dict[str, str] | None = None,
+    ) -> list[Document]:
         logger.info("Ingesting file_name=%s", file_name)
-        documents = IngestionHelper.transform_file_into_documents(file_name, file_data, file_metadata)
+        documents = IngestionHelper.transform_file_into_documents(
+            file_name, file_data, file_metadata
+        )
         logger.info(
             "Transformed file=%s into count=%s documents", file_name, len(documents)
         )
         logger.debug("Saving the documents in the index and doc store")
         return self._save_docs(documents)
 
-    def bulk_ingest(self, files: list[tuple[str, Path]], metadata : dict | None = None) -> list[Document]:
+    def bulk_ingest(
+        self, files: list[tuple[str, Path]], metadata: dict[str, str] | None = None
+    ) -> list[Document]:
+        # Pair each (file_name, file_data) tuple with the shared metadata so
+        # that starmap unpacks the right positional arguments
+        args = [(*file_tuple, metadata) for file_tuple in files]
         documents = list(
             itertools.chain.from_iterable(
                 self._file_to_documents_work_pool.starmap(
-                    IngestionHelper.transform_file_into_documents, files, metadata
+                    IngestionHelper.transform_file_into_documents, args
                 )
             )
         )
@@ -257,12 +285,18 @@ class ParallelizedIngestComponent(BaseIngestComponentWithIndex):
             processes=self.count_workers
         )
 
-    def ingest(self, file_name: str, file_data: Path, file_metadata : dict | None = None) -> list[Document]:
+    def ingest(
+        self,
+        file_name: str,
+        file_data: Path,
+        file_metadata: dict[str, str] | None = None,
+    ) -> list[Document]:
         logger.info("Ingesting file_name=%s", file_name)
         # Running in a single (1) process to release the current
         # thread, and take a dedicated CPU core for computation
         documents = self._file_to_documents_work_pool.apply(
-            IngestionHelper.transform_file_into_documents, (file_name, file_data, file_metadata)
+            IngestionHelper.transform_file_into_documents,
+            (file_name, file_data, file_metadata),
         )
         logger.info(
             "Transformed file=%s into count=%s documents", file_name, len(documents)
@@ -270,13 +304,16 @@ class ParallelizedIngestComponent(BaseIngestComponentWithIndex):
         logger.debug("Saving the documents in the index and doc store")
         return self._save_docs(documents)
 
-    def bulk_ingest(self, files: list[tuple[str, Path]], metadata : dict | None = None) -> list[Document]:
+    def bulk_ingest(
+        self, files: list[tuple[str, Path]], metadata: dict[str, str] | None = None
+    ) -> list[Document]:
+        # Pair each (file_name, file_data) tuple with the shared metadata
+        args = [(*file_tuple, metadata) for file_tuple in files]
         # Lightweight threads, used for parallelize the
         # underlying IO calls made in the ingestion
-
         documents = list(
             itertools.chain.from_iterable(
-                self._ingest_work_pool.starmap(self.ingest, files, metadata)
+                self._ingest_work_pool.starmap(self.ingest, args)
             )
         )
         return documents
@@ -459,13 +496,22 @@ class PipelineIngestComponent(BaseIngestComponentWithIndex):
         self.node_q.put(("flush", None, None, None))
         self.node_q.join()
 
-    def ingest(self, file_name: str, file_data: Path, file_metadata : dict | None = None) -> list[Document]:
-        documents = IngestionHelper.transform_file_into_documents(file_name, file_data, file_metadata)
+    def ingest(
+        self,
+        file_name: str,
+        file_data: Path,
+        file_metadata: dict[str, str] | None = None,
+    ) -> list[Document]:
+        documents = IngestionHelper.transform_file_into_documents(
+            file_name, file_data, file_metadata
+        )
         self.doc_q.put(("process", file_name, documents))
         self._flush()
         return documents
 
-    def bulk_ingest(self, files: list[tuple[str, Path]], metadata : dict | None = None) -> list[Document]:
+    def bulk_ingest(
+        self, files: list[tuple[str, Path]], metadata: dict[str, str] | None = None
+    ) -> list[Document]:
         docs = []
         for file_name, file_data in eta(files):
             try:
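Note on the pairing above: `Pool.starmap(fn, iterable)` calls `fn(*t)` for each
element `t`, so every task must be a flat `(file_name, file_data, metadata)`
tuple rather than a nested `((file_name, file_data), metadata)` pair. A minimal
standalone sketch of the same idea, with illustrative names only, using
`itertools.starmap` (which has the same unpacking semantics as the pool):

    from itertools import starmap
    from pathlib import Path

    def transform(file_name: str, file_data: Path, metadata: dict[str, str] | None) -> str:
        # Stand-in for IngestionHelper.transform_file_into_documents
        return f"{file_name} -> {metadata}"

    files = [("a.txt", Path("a.txt")), ("b.txt", Path("b.txt"))]
    metadata = {"foo": "bar"}

    # Each task tuple must line up with transform's positional parameters,
    # so the (file_name, file_data) pair is flattened, not nested.
    args = [(*file_tuple, metadata) for file_tuple in files]
    print(list(starmap(transform, args)))  # one result per file, same metadata
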
diff --git a/private_gpt/components/ingest/ingest_helper.py b/private_gpt/components/ingest/ingest_helper.py
index aa841b08..9ff2e685 100644
--- a/private_gpt/components/ingest/ingest_helper.py
+++ b/private_gpt/components/ingest/ingest_helper.py
@@ -69,13 +69,13 @@ class IngestionHelper:
 
     @staticmethod
     def transform_file_into_documents(
-        file_name: str, file_data: Path, file_metadata : dict | None = None
+        file_name: str, file_data: Path, file_metadata: dict[str, str] | None = None
     ) -> list[Document]:
         documents = IngestionHelper._load_file_to_documents(file_name, file_data)
         for document in documents:
             document.metadata.update(file_metadata or {})
             document.metadata["file_name"] = file_name
-
+
         IngestionHelper._exclude_metadata(documents)
         return documents
 
diff --git a/private_gpt/server/ingest/ingest_router.py b/private_gpt/server/ingest/ingest_router.py
index 12f35802..cda60ee7 100644
--- a/private_gpt/server/ingest/ingest_router.py
+++ b/private_gpt/server/ingest/ingest_router.py
@@ -1,6 +1,7 @@
-from typing import Literal, Dict
+import json
+from typing import Literal
 
-from fastapi import APIRouter, Depends, HTTPException, Request, UploadFile, Form
+from fastapi import APIRouter, Depends, Form, HTTPException, Request, UploadFile
 from pydantic import BaseModel, Field
 
 from private_gpt.server.ingest.ingest_service import IngestService
@@ -20,14 +21,15 @@ class IngestTextBody(BaseModel):
             "Chinese martial arts."
         ]
     )
-    metadata: Dict = Field(None,
+    metadata: dict[str, str] = Field(
+        None,
         examples=[
             {
                 "title": "Avatar: The Last Airbender",
                 "author": "Michael Dante DiMartino, Bryan Konietzko",
                 "year": "2005",
             }
-        ]
+        ],
     )
 
 
@@ -47,12 +49,14 @@ def ingest(request: Request, file: UploadFile) -> IngestResponse:
 
 
 @ingest_router.post("/ingest/file", tags=["Ingestion"])
-def ingest_file(request: Request, file: UploadFile, metadata: str = Form(None)) -> IngestResponse:
+def ingest_file(
+    request: Request, file: UploadFile, metadata: str = Form(None)
+) -> IngestResponse:
     """Ingests and processes a file, storing its chunks to be used as context.
-
-    metadata: Optional metadata to be associated with the file.
+
+    metadata: Optional metadata to be associated with the file.
     You do not have to specify this field if not needed.
-    e.g. {"title": "Avatar: The Last Airbender", "author": "Michael Dante DiMartino, Bryan Konietzko", "year": "2005"}
+    e.g. {"title": "Avatar: The Last Airbender", "year": "2005"}
 
     The context obtained from files is later used in
     `/chat/completions`, `/completions`, and `/chunks` APIs.
@@ -70,9 +74,11 @@ def ingest_file(request: Request, file: UploadFile, metadata: str = Form(None))
     service = request.state.injector.get(IngestService)
     if file.filename is None:
         raise HTTPException(400, "No file name provided")
-
-    metadata_dict = None if metadata is None else eval(metadata)
-    ingested_documents = service.ingest_bin_data(file.filename, file.file, metadata_dict)
+
+    metadata_dict = None if metadata is None else json.loads(metadata)
+    ingested_documents = service.ingest_bin_data(
+        file.filename, file.file, metadata_dict
+    )
     return IngestResponse(object="list", model="private-gpt", data=ingested_documents)
 
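The `metadata` part of the multipart request reaches the endpoint as a plain
string (`Form(None)`) and is parsed with `json.loads`, so clients send it
JSON-encoded next to the file. A hedged client sketch; the base URL and port
are assumptions about a local deployment, not part of this patch:

    import json

    import requests

    url = "http://localhost:8001/v1/ingest/file"  # assumed local instance
    with open("test.txt", "rb") as fh:
        response = requests.post(
            url,
            files={
                "file": ("test.txt", fh),
                # (None, value) sends a bare form field instead of a file part
                "metadata": (None, json.dumps({"foo": "bar"})),
            },
        )
    response.raise_for_status()
    print(response.json()["data"][0]["doc_metadata"])
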
diff --git a/private_gpt/server/ingest/ingest_service.py b/private_gpt/server/ingest/ingest_service.py
index 9082432c..0cd003be 100644
--- a/private_gpt/server/ingest/ingest_service.py
+++ b/private_gpt/server/ingest/ingest_service.py
@@ -48,7 +48,12 @@ class IngestService:
             settings=settings(),
         )
 
-    def _ingest_data(self, file_name: str, file_data: AnyStr, file_metadata : dict | None = None) -> list[IngestedDoc]:
+    def _ingest_data(
+        self,
+        file_name: str,
+        file_data: AnyStr,
+        file_metadata: dict[str, str] | None = None,
+    ) -> list[IngestedDoc]:
         logger.debug("Got file data of size=%s to ingest", len(file_data))
         # llama-index mainly supports reading from files, so
         # we have to create a tmp file to read for it to work
@@ -65,18 +70,28 @@ class IngestService:
             tmp.close()
             path_to_tmp.unlink()
 
-    def ingest_file(self, file_name: str, file_data: Path, file_metadata : dict | None = None) -> list[IngestedDoc]:
+    def ingest_file(
+        self,
+        file_name: str,
+        file_data: Path,
+        file_metadata: dict[str, str] | None = None,
+    ) -> list[IngestedDoc]:
         logger.info("Ingesting file_name=%s", file_name)
         documents = self.ingest_component.ingest(file_name, file_data, file_metadata)
         logger.info("Finished ingestion file_name=%s", file_name)
         return [IngestedDoc.from_document(document) for document in documents]
 
-    def ingest_text(self, file_name: str, text: str, metadata : dict | None = None) -> list[IngestedDoc]:
+    def ingest_text(
+        self, file_name: str, text: str, metadata: dict[str, str] | None = None
+    ) -> list[IngestedDoc]:
         logger.debug("Ingesting text data with file_name=%s", file_name)
         return self._ingest_data(file_name, text, metadata)
 
     def ingest_bin_data(
-        self, file_name: str, raw_file_data: BinaryIO, file_metadata : dict | None = None
+        self,
+        file_name: str,
+        raw_file_data: BinaryIO,
+        file_metadata: dict[str, str] | None = None,
     ) -> list[IngestedDoc]:
         logger.debug("Ingesting binary data with file_name=%s", file_name)
         file_data = raw_file_data.read()
diff --git a/tests/fixtures/ingest_helper.py b/tests/fixtures/ingest_helper.py
index 64d4babc..4e6155ae 100644
--- a/tests/fixtures/ingest_helper.py
+++ b/tests/fixtures/ingest_helper.py
@@ -1,7 +1,7 @@
+import json
 from pathlib import Path
 
 import pytest
-import json
 from fastapi.testclient import TestClient
 
 from private_gpt.server.ingest.ingest_router import IngestResponse
@@ -18,15 +18,15 @@ class IngestHelper:
         assert response.status_code == 200
         ingest_result = IngestResponse.model_validate(response.json())
         return ingest_result
-
+
     def ingest_file_with_metadata(self, path: Path, metadata: dict) -> IngestResponse:
         files = {
             "file": (path.name, path.open("rb")),
-            "metadata": (None, json.dumps(metadata))
+            "metadata": (None, json.dumps(metadata)),
         }
 
         response = self.test_client.post("/v1/ingest/file", files=files)
-
+
         assert response.status_code == 200
         ingest_result = IngestResponse.model_validate(response.json())
         return ingest_result
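As the comment in `_ingest_data` notes, llama-index readers are file-oriented,
so in-memory text or bytes are spooled to a named temporary file that is always
unlinked afterwards. A simplified sketch of that pattern; this is illustrative,
not the service's exact body, which the hunk above only partially shows:

    import tempfile
    from pathlib import Path
    from typing import AnyStr

    def read_via_tmp_file(file_name: str, file_data: AnyStr) -> None:
        tmp = tempfile.NamedTemporaryFile(delete=False)
        path_to_tmp = Path(tmp.name)
        try:
            if isinstance(file_data, bytes):
                path_to_tmp.write_bytes(file_data)
            else:
                path_to_tmp.write_text(str(file_data))
            # hand path_to_tmp to the file-based reader here
        finally:
            tmp.close()
            path_to_tmp.unlink()
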
diff --git a/tests/server/ingest/test_ingest_routes.py b/tests/server/ingest/test_ingest_routes.py
index 15ad74e8..6fbf5fa4 100644
--- a/tests/server/ingest/test_ingest_routes.py
+++ b/tests/server/ingest/test_ingest_routes.py
@@ -48,17 +48,21 @@ def test_ingest_plain_text(test_client: TestClient) -> None:
 
 def test_ingest_text_with_metadata(test_client: TestClient):
     response = test_client.post(
-        "/v1/ingest/text", json={"file_name": "file_name", "text": "text", "metadata": {"foo": "bar"}}
+        "/v1/ingest/text",
+        json={"file_name": "file_name", "text": "text", "metadata": {"foo": "bar"}},
     )
     assert response.status_code == 200
     ingest_result = IngestResponse.model_validate(response.json())
     assert len(ingest_result.data) == 1
-
-    assert ingest_result.data[0].doc_metadata == {"file_name" : "file_name", "foo": "bar"}
+
+    assert ingest_result.data[0].doc_metadata == {
+        "file_name": "file_name",
+        "foo": "bar",
+    }
 
 
-def test_ingest_accepts_txt_files(ingest_helper: IngestHelper) -> None:
+def test_ingest_accepts_txt_files_with_metadata(ingest_helper: IngestHelper) -> None:
     path = Path(__file__).parents[0] / "test.txt"
     ingest_result = ingest_helper.ingest_file_with_metadata(path, {"foo": "bar"})
     assert len(ingest_result.data) == 1
-    assert ingest_result.data[0].doc_metadata == {"file_name": "test.txt", "foo": "bar"}
\ No newline at end of file
+    assert ingest_result.data[0].doc_metadata == {"file_name": "test.txt", "foo": "bar"}
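
End to end, the tests above correspond to the following client behaviour. A
hedged usage sketch against a running instance; the URL and port are assumed:

    import requests

    resp = requests.post(
        "http://localhost:8001/v1/ingest/text",
        json={"file_name": "notes.txt", "text": "some text", "metadata": {"foo": "bar"}},
    )
    resp.raise_for_status()
    # user-supplied metadata is merged with the file_name key set at ingestion
    assert resp.json()["data"][0]["doc_metadata"] == {
        "file_name": "notes.txt",
        "foo": "bar",
    }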