From 820b713086bb16512d5c707e5e8ecf1c703c15e1 Mon Sep 17 00:00:00 2001 From: Rahul Triptahi Date: Mon, 8 Apr 2024 20:40:04 +0530 Subject: [PATCH] community[minor]: Add support for Pebblo cloud_api_key in PebbloSafeLoader (#19855) **Description**: _PebbloSafeLoader_: Add support for pebblo's cloud api-key in PebbloSafeLoader - This Pull request enables PebbloSafeLoader to accept pebblo's cloud api-key and send the semantic classification data to pebblo cloud. **Documentation**: Updated **Unit test**: Added **Issue**: NA **Dependencies**: - None **Twitter handle**: @rahul_tripathi2 Signed-off-by: Rahul Tripathi Co-authored-by: Rahul Tripathi --- .../document_loaders/pebblo.ipynb | 29 ++++ .../document_loaders/pebblo.py | 130 ++++++++++++++---- .../langchain_community/utilities/pebblo.py | 6 +- .../document_loaders/test_pebblo.py | 20 +++ 4 files changed, 158 insertions(+), 27 deletions(-) diff --git a/docs/docs/integrations/document_loaders/pebblo.ipynb b/docs/docs/integrations/document_loaders/pebblo.ipynb index 40aa7ee6b0c..e444c426cd2 100644 --- a/docs/docs/integrations/document_loaders/pebblo.ipynb +++ b/docs/docs/integrations/document_loaders/pebblo.ipynb @@ -62,6 +62,35 @@ "documents = loader.load()\n", "print(documents)" ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Send semantic topics and identities to Pebblo cloud server\n", + "\n", + "To send semantic data to pebblo-cloud, pass api-key to PebbloSafeLoader as an argument or alternatively, put the api-ket in `PEBBLO_API_KEY` environment variable." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from langchain.document_loaders.csv_loader import CSVLoader\n", + "from langchain_community.document_loaders import PebbloSafeLoader\n", + "\n", + "loader = PebbloSafeLoader(\n", + " CSVLoader(\"data/corp_sens_data.csv\"),\n", + " name=\"acme-corp-rag-1\", # App name (Mandatory)\n", + " owner=\"Joe Smith\", # Owner (Optional)\n", + " description=\"Support productivity RAG application\", # Description (Optional)\n", + " api_key=\"my-api-key\", # API key (Optional, can be set in the environment variable PEBBLO_API_KEY)\n", + ")\n", + "documents = loader.load()\n", + "print(documents)" + ] } ], "metadata": { diff --git a/libs/community/langchain_community/document_loaders/pebblo.py b/libs/community/langchain_community/document_loaders/pebblo.py index 8b67898cf27..593452d9059 100644 --- a/libs/community/langchain_community/document_loaders/pebblo.py +++ b/libs/community/langchain_community/document_loaders/pebblo.py @@ -1,17 +1,21 @@ """Pebblo's safe dataloader is a wrapper for document loaders""" +import json import logging import os import uuid from http import HTTPStatus -from typing import Any, Dict, Iterator, List +from typing import Any, Dict, Iterator, List, Optional import requests from langchain_core.documents import Document from langchain_community.document_loaders.base import BaseLoader from langchain_community.utilities.pebblo import ( + APP_DISCOVER_URL, CLASSIFIER_URL, + LOADER_DOC_URL, + PEBBLO_CLOUD_URL, PLUGIN_VERSION, App, Doc, @@ -38,10 +42,12 @@ class PebbloSafeLoader(BaseLoader): name: str, owner: str = "", description: str = "", + api_key: Optional[str] = None, ): if not name or not isinstance(name, str): raise NameError("Must specify a valid name.") self.app_name = name + self.api_key = os.environ.get("PEBBLO_API_KEY") or api_key self.load_id = str(uuid.uuid4()) self.loader = langchain_loader self.owner = owner @@ -114,8 +120,9 @@ class PebbloSafeLoader(BaseLoader): def set_loader_sent(cls) -> None: cls._loader_sent = True - def _send_loader_doc(self, loading_end: bool = False) -> None: - """Send documents fetched from loader to pebblo-server. Internal method. + def _send_loader_doc(self, loading_end: bool = False) -> list: + """Send documents fetched from loader to pebblo-server. Then send + classified documents to Daxa cloud(If api_key is present). Internal method. Args: loading_end (bool, optional): Flag indicating the halt of data @@ -163,28 +170,67 @@ class PebbloSafeLoader(BaseLoader): if "loader_details" in payload: payload["loader_details"]["source_aggr_size"] = self.source_aggr_size payload = Doc(**payload).dict(exclude_unset=True) - load_doc_url = f"{CLASSIFIER_URL}/v1/loader/doc" + load_doc_url = f"{CLASSIFIER_URL}{LOADER_DOC_URL}" + classified_docs = [] try: - resp = requests.post( - load_doc_url, headers=headers, json=payload, timeout=20 + pebblo_resp = requests.post( + load_doc_url, headers=headers, json=payload, timeout=300 ) - if resp.status_code not in [HTTPStatus.OK, HTTPStatus.BAD_GATEWAY]: + classified_docs = json.loads(pebblo_resp.text).get("docs", None) + if pebblo_resp.status_code not in [HTTPStatus.OK, HTTPStatus.BAD_GATEWAY]: logger.warning( - f"Received unexpected HTTP response code: {resp.status_code}" + "Received unexpected HTTP response code: %s", + pebblo_resp.status_code, ) logger.debug( - f"send_loader_doc: request \ - url {resp.request.url}, \ - body {str(resp.request.body)[:999]} \ - len {len(resp.request.body if resp.request.body else [])} \ - response status{resp.status_code} body {resp.json()}" + "send_loader_doc[local]: request url %s, body %s len %s\ + response status %s body %s", + pebblo_resp.request.url, + str(pebblo_resp.request.body), + str(len(pebblo_resp.request.body if pebblo_resp.request.body else [])), + str(pebblo_resp.status_code), + pebblo_resp.json(), ) except requests.exceptions.RequestException: logger.warning("Unable to reach pebblo server.") - except Exception: - logger.warning("An Exception caught in _send_loader_doc.") + except Exception as e: + logger.warning("An Exception caught in _send_loader_doc: %s", e) + + if self.api_key: + if not classified_docs: + logger.warning("No classified docs to send to pebblo-cloud.") + return classified_docs + try: + payload["docs"] = classified_docs + payload["classified"] = True + headers.update({"x-api-key": self.api_key}) + pebblo_cloud_url = f"{PEBBLO_CLOUD_URL}{LOADER_DOC_URL}" + pebblo_cloud_response = requests.post( + pebblo_cloud_url, headers=headers, json=payload, timeout=20 + ) + logger.debug( + "send_loader_doc[cloud]: request url %s, body %s len %s\ + response status %s body %s", + pebblo_cloud_response.request.url, + str(pebblo_cloud_response.request.body), + str( + len( + pebblo_cloud_response.request.body + if pebblo_cloud_response.request.body + else [] + ) + ), + str(pebblo_cloud_response.status_code), + pebblo_cloud_response.json(), + ) + except requests.exceptions.RequestException: + logger.warning("Unable to reach Pebblo cloud server.") + except Exception as e: + logger.warning("An Exception caught in _send_loader_doc: %s", e) + if loading_end is True: PebbloSafeLoader.set_loader_sent() + return classified_docs @staticmethod def calculate_content_size(page_content: str) -> int: @@ -206,32 +252,64 @@ class PebbloSafeLoader(BaseLoader): def _send_discover(self) -> None: """Send app discovery payload to pebblo-server. Internal method.""" - headers = {"Accept": "application/json", "Content-Type": "application/json"} + headers = { + "Accept": "application/json", + "Content-Type": "application/json", + } payload = self.app.dict(exclude_unset=True) - app_discover_url = f"{CLASSIFIER_URL}/v1/app/discover" + app_discover_url = f"{CLASSIFIER_URL}{APP_DISCOVER_URL}" try: - resp = requests.post( + pebblo_resp = requests.post( app_discover_url, headers=headers, json=payload, timeout=20 ) logger.debug( - f"send_discover: request \ - url {resp.request.url}, \ - headers {resp.request.headers}, \ - body {str(resp.request.body)[:999]} \ - len {len(resp.request.body if resp.request.body else [])} \ - response status{resp.status_code} body {resp.json()}" + "send_discover[local]: request url %s, body %s len %s\ + response status %s body %s", + pebblo_resp.request.url, + str(pebblo_resp.request.body), + str(len(pebblo_resp.request.body if pebblo_resp.request.body else [])), + str(pebblo_resp.status_code), + pebblo_resp.json(), ) - if resp.status_code in [HTTPStatus.OK, HTTPStatus.BAD_GATEWAY]: + if pebblo_resp.status_code in [HTTPStatus.OK, HTTPStatus.BAD_GATEWAY]: PebbloSafeLoader.set_discover_sent() else: logger.warning( - f"Received unexpected HTTP response code: {resp.status_code}" + f"Received unexpected HTTP response code: {pebblo_resp.status_code}" ) except requests.exceptions.RequestException: logger.warning("Unable to reach pebblo server.") except Exception: logger.warning("An Exception caught in _send_discover.") + if self.api_key: + try: + headers.update({"x-api-key": self.api_key}) + pebblo_cloud_url = f"{PEBBLO_CLOUD_URL}{APP_DISCOVER_URL}" + pebblo_cloud_response = requests.post( + pebblo_cloud_url, headers=headers, json=payload, timeout=20 + ) + + logger.debug( + "send_discover[cloud]: request url %s, body %s len %s\ + response status %s body %s", + pebblo_cloud_response.request.url, + str(pebblo_cloud_response.request.body), + str( + len( + pebblo_cloud_response.request.body + if pebblo_cloud_response.request.body + else [] + ) + ), + str(pebblo_cloud_response.status_code), + pebblo_cloud_response.json(), + ) + except requests.exceptions.RequestException: + logger.warning("Unable to reach Pebblo cloud server.") + except Exception as e: + logger.warning("An Exception caught in _send_discover: %s", e) + def _get_app_details(self) -> App: """Fetch app details. Internal method. diff --git a/libs/community/langchain_community/utilities/pebblo.py b/libs/community/langchain_community/utilities/pebblo.py index a9c5e3bdc2b..5319b17400a 100644 --- a/libs/community/langchain_community/utilities/pebblo.py +++ b/libs/community/langchain_community/utilities/pebblo.py @@ -13,8 +13,12 @@ from langchain_community.document_loaders.base import BaseLoader logger = logging.getLogger(__name__) -PLUGIN_VERSION = "0.1.0" +PLUGIN_VERSION = "0.1.1" CLASSIFIER_URL = os.getenv("PEBBLO_CLASSIFIER_URL", "http://localhost:8000") +PEBBLO_CLOUD_URL = os.getenv("PEBBLO_CLOUD_URL", "https://api.daxa.ai") + +LOADER_DOC_URL = "/v1/loader/doc" +APP_DISCOVER_URL = "/v1/app/discover" # Supported loaders for Pebblo safe data loading file_loader = [ diff --git a/libs/community/tests/unit_tests/document_loaders/test_pebblo.py b/libs/community/tests/unit_tests/document_loaders/test_pebblo.py index 9ab487c8e78..a98f4712287 100644 --- a/libs/community/tests/unit_tests/document_loaders/test_pebblo.py +++ b/libs/community/tests/unit_tests/document_loaders/test_pebblo.py @@ -112,3 +112,23 @@ def test_pdf_lazy_load(mocker: MockerFixture) -> None: # Assert assert len(result) == 2 + + +def test_pebblo_safe_loader_api_key() -> None: + # Setup + from langchain_community.document_loaders import PebbloSafeLoader + + file_path = os.path.join(EXAMPLE_DOCS_DIRECTORY, "test_empty.csv") + api_key = "dummy_api_key" + + # Exercise + loader = PebbloSafeLoader( + CSVLoader(file_path=file_path), + "dummy_app_name", + "dummy_owner", + "dummy_description", + api_key=api_key, + ) + + # Assert + assert loader.api_key == api_key