community[minor]: Add support for Pebblo cloud_api_key in PebbloSafeLoader (#19855)

**Description**:
_PebbloSafeLoader_: Add support for pebblo's cloud api-key in
PebbloSafeLoader

- This Pull request enables PebbloSafeLoader to accept pebblo's cloud
api-key and send the semantic classification data to pebblo cloud.

**Documentation**: Updated 
**Unit test**: Added
**Issue**: NA
**Dependencies**: - None
**Twitter handle**: @rahul_tripathi2

Signed-off-by: Rahul Tripathi <rauhl.psit.ec@gmail.com>
Co-authored-by: Rahul Tripathi <rauhl.psit.ec@gmail.com>
This commit is contained in:
Rahul Triptahi 2024-04-08 20:40:04 +05:30 committed by GitHub
parent 34a24d4df6
commit 820b713086
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 158 additions and 27 deletions

View File

@ -62,6 +62,35 @@
"documents = loader.load()\n",
"print(documents)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Send semantic topics and identities to Pebblo cloud server\n",
"\n",
"To send semantic data to pebblo-cloud, pass api-key to PebbloSafeLoader as an argument or alternatively, put the api-ket in `PEBBLO_API_KEY` environment variable."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from langchain.document_loaders.csv_loader import CSVLoader\n",
"from langchain_community.document_loaders import PebbloSafeLoader\n",
"\n",
"loader = PebbloSafeLoader(\n",
" CSVLoader(\"data/corp_sens_data.csv\"),\n",
" name=\"acme-corp-rag-1\", # App name (Mandatory)\n",
" owner=\"Joe Smith\", # Owner (Optional)\n",
" description=\"Support productivity RAG application\", # Description (Optional)\n",
" api_key=\"my-api-key\", # API key (Optional, can be set in the environment variable PEBBLO_API_KEY)\n",
")\n",
"documents = loader.load()\n",
"print(documents)"
]
}
],
"metadata": {

View File

@ -1,17 +1,21 @@
"""Pebblo's safe dataloader is a wrapper for document loaders"""
import json
import logging
import os
import uuid
from http import HTTPStatus
from typing import Any, Dict, Iterator, List
from typing import Any, Dict, Iterator, List, Optional
import requests
from langchain_core.documents import Document
from langchain_community.document_loaders.base import BaseLoader
from langchain_community.utilities.pebblo import (
APP_DISCOVER_URL,
CLASSIFIER_URL,
LOADER_DOC_URL,
PEBBLO_CLOUD_URL,
PLUGIN_VERSION,
App,
Doc,
@ -38,10 +42,12 @@ class PebbloSafeLoader(BaseLoader):
name: str,
owner: str = "",
description: str = "",
api_key: Optional[str] = None,
):
if not name or not isinstance(name, str):
raise NameError("Must specify a valid name.")
self.app_name = name
self.api_key = os.environ.get("PEBBLO_API_KEY") or api_key
self.load_id = str(uuid.uuid4())
self.loader = langchain_loader
self.owner = owner
@ -114,8 +120,9 @@ class PebbloSafeLoader(BaseLoader):
def set_loader_sent(cls) -> None:
cls._loader_sent = True
def _send_loader_doc(self, loading_end: bool = False) -> None:
"""Send documents fetched from loader to pebblo-server. Internal method.
def _send_loader_doc(self, loading_end: bool = False) -> list:
"""Send documents fetched from loader to pebblo-server. Then send
classified documents to Daxa cloud(If api_key is present). Internal method.
Args:
loading_end (bool, optional): Flag indicating the halt of data
@ -163,28 +170,67 @@ class PebbloSafeLoader(BaseLoader):
if "loader_details" in payload:
payload["loader_details"]["source_aggr_size"] = self.source_aggr_size
payload = Doc(**payload).dict(exclude_unset=True)
load_doc_url = f"{CLASSIFIER_URL}/v1/loader/doc"
load_doc_url = f"{CLASSIFIER_URL}{LOADER_DOC_URL}"
classified_docs = []
try:
resp = requests.post(
load_doc_url, headers=headers, json=payload, timeout=20
pebblo_resp = requests.post(
load_doc_url, headers=headers, json=payload, timeout=300
)
if resp.status_code not in [HTTPStatus.OK, HTTPStatus.BAD_GATEWAY]:
classified_docs = json.loads(pebblo_resp.text).get("docs", None)
if pebblo_resp.status_code not in [HTTPStatus.OK, HTTPStatus.BAD_GATEWAY]:
logger.warning(
f"Received unexpected HTTP response code: {resp.status_code}"
"Received unexpected HTTP response code: %s",
pebblo_resp.status_code,
)
logger.debug(
f"send_loader_doc: request \
url {resp.request.url}, \
body {str(resp.request.body)[:999]} \
len {len(resp.request.body if resp.request.body else [])} \
response status{resp.status_code} body {resp.json()}"
"send_loader_doc[local]: request url %s, body %s len %s\
response status %s body %s",
pebblo_resp.request.url,
str(pebblo_resp.request.body),
str(len(pebblo_resp.request.body if pebblo_resp.request.body else [])),
str(pebblo_resp.status_code),
pebblo_resp.json(),
)
except requests.exceptions.RequestException:
logger.warning("Unable to reach pebblo server.")
except Exception:
logger.warning("An Exception caught in _send_loader_doc.")
except Exception as e:
logger.warning("An Exception caught in _send_loader_doc: %s", e)
if self.api_key:
if not classified_docs:
logger.warning("No classified docs to send to pebblo-cloud.")
return classified_docs
try:
payload["docs"] = classified_docs
payload["classified"] = True
headers.update({"x-api-key": self.api_key})
pebblo_cloud_url = f"{PEBBLO_CLOUD_URL}{LOADER_DOC_URL}"
pebblo_cloud_response = requests.post(
pebblo_cloud_url, headers=headers, json=payload, timeout=20
)
logger.debug(
"send_loader_doc[cloud]: request url %s, body %s len %s\
response status %s body %s",
pebblo_cloud_response.request.url,
str(pebblo_cloud_response.request.body),
str(
len(
pebblo_cloud_response.request.body
if pebblo_cloud_response.request.body
else []
)
),
str(pebblo_cloud_response.status_code),
pebblo_cloud_response.json(),
)
except requests.exceptions.RequestException:
logger.warning("Unable to reach Pebblo cloud server.")
except Exception as e:
logger.warning("An Exception caught in _send_loader_doc: %s", e)
if loading_end is True:
PebbloSafeLoader.set_loader_sent()
return classified_docs
@staticmethod
def calculate_content_size(page_content: str) -> int:
@ -206,32 +252,64 @@ class PebbloSafeLoader(BaseLoader):
def _send_discover(self) -> None:
"""Send app discovery payload to pebblo-server. Internal method."""
headers = {"Accept": "application/json", "Content-Type": "application/json"}
headers = {
"Accept": "application/json",
"Content-Type": "application/json",
}
payload = self.app.dict(exclude_unset=True)
app_discover_url = f"{CLASSIFIER_URL}/v1/app/discover"
app_discover_url = f"{CLASSIFIER_URL}{APP_DISCOVER_URL}"
try:
resp = requests.post(
pebblo_resp = requests.post(
app_discover_url, headers=headers, json=payload, timeout=20
)
logger.debug(
f"send_discover: request \
url {resp.request.url}, \
headers {resp.request.headers}, \
body {str(resp.request.body)[:999]} \
len {len(resp.request.body if resp.request.body else [])} \
response status{resp.status_code} body {resp.json()}"
"send_discover[local]: request url %s, body %s len %s\
response status %s body %s",
pebblo_resp.request.url,
str(pebblo_resp.request.body),
str(len(pebblo_resp.request.body if pebblo_resp.request.body else [])),
str(pebblo_resp.status_code),
pebblo_resp.json(),
)
if resp.status_code in [HTTPStatus.OK, HTTPStatus.BAD_GATEWAY]:
if pebblo_resp.status_code in [HTTPStatus.OK, HTTPStatus.BAD_GATEWAY]:
PebbloSafeLoader.set_discover_sent()
else:
logger.warning(
f"Received unexpected HTTP response code: {resp.status_code}"
f"Received unexpected HTTP response code: {pebblo_resp.status_code}"
)
except requests.exceptions.RequestException:
logger.warning("Unable to reach pebblo server.")
except Exception:
logger.warning("An Exception caught in _send_discover.")
if self.api_key:
try:
headers.update({"x-api-key": self.api_key})
pebblo_cloud_url = f"{PEBBLO_CLOUD_URL}{APP_DISCOVER_URL}"
pebblo_cloud_response = requests.post(
pebblo_cloud_url, headers=headers, json=payload, timeout=20
)
logger.debug(
"send_discover[cloud]: request url %s, body %s len %s\
response status %s body %s",
pebblo_cloud_response.request.url,
str(pebblo_cloud_response.request.body),
str(
len(
pebblo_cloud_response.request.body
if pebblo_cloud_response.request.body
else []
)
),
str(pebblo_cloud_response.status_code),
pebblo_cloud_response.json(),
)
except requests.exceptions.RequestException:
logger.warning("Unable to reach Pebblo cloud server.")
except Exception as e:
logger.warning("An Exception caught in _send_discover: %s", e)
def _get_app_details(self) -> App:
"""Fetch app details. Internal method.

View File

@ -13,8 +13,12 @@ from langchain_community.document_loaders.base import BaseLoader
logger = logging.getLogger(__name__)
PLUGIN_VERSION = "0.1.0"
PLUGIN_VERSION = "0.1.1"
CLASSIFIER_URL = os.getenv("PEBBLO_CLASSIFIER_URL", "http://localhost:8000")
PEBBLO_CLOUD_URL = os.getenv("PEBBLO_CLOUD_URL", "https://api.daxa.ai")
LOADER_DOC_URL = "/v1/loader/doc"
APP_DISCOVER_URL = "/v1/app/discover"
# Supported loaders for Pebblo safe data loading
file_loader = [

View File

@ -112,3 +112,23 @@ def test_pdf_lazy_load(mocker: MockerFixture) -> None:
# Assert
assert len(result) == 2
def test_pebblo_safe_loader_api_key() -> None:
# Setup
from langchain_community.document_loaders import PebbloSafeLoader
file_path = os.path.join(EXAMPLE_DOCS_DIRECTORY, "test_empty.csv")
api_key = "dummy_api_key"
# Exercise
loader = PebbloSafeLoader(
CSVLoader(file_path=file_path),
"dummy_app_name",
"dummy_owner",
"dummy_description",
api_key=api_key,
)
# Assert
assert loader.api_key == api_key