community[minor]: Add pebblo safe document loader (#16862)

- **Description:** The Pebblo open-source project enables developers to
safely load data into their Gen AI apps. It identifies semantic topics and
entities found in the loaded data and summarizes them in a
developer-friendly report.
  - **Dependencies:** none
  - **Twitter handle:** srics

@hwchase17
Sridhar Ramaswamy 2024-02-12 21:56:12 -08:00 committed by GitHub
parent 0834457f28
commit 9f1cbbc6ed
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 748 additions and 0 deletions

View File

@@ -0,0 +1,88 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Pebblo Safe DocumentLoader\n",
"\n",
"> [Pebblo](https://github.com/daxa-ai/pebblo) enables developers to safely load data and promote their Gen AI app to deployment without worrying about the organization's compliance and security requirements. The project identifies semantic topics and entities found in the loaded data and summarizes them in the UI or a PDF report.\n",
"\n",
"Pebblo has two components:\n",
"\n",
"1. Pebblo Safe DocumentLoader for Langchain\n",
"1. Pebblo Daemon\n",
"\n",
"This document describes how to augment your existing Langchain DocumentLoader with the Pebblo Safe DocumentLoader to get deep data visibility into the types of Topics and Entities ingested into your Gen-AI Langchain application. For details on the `Pebblo Daemon`, see this [pebblo daemon](https://daxa-ai.github.io/pebblo-docs/daemon.html) document.\n",
"\n",
"The Pebblo SafeLoader enables safe data ingestion for Langchain `DocumentLoader`s. This is done by wrapping the document loader call with the `Pebblo Safe DocumentLoader`.\n",
"\n",
"#### How to Pebblo-enable Document Loading?\n",
"\n",
"Assume a Langchain RAG application that uses `CSVLoader` to read a CSV document for inference.\n",
"\n",
"Here is the snippet of document loading using `CSVLoader`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from langchain.document_loaders.csv_loader import CSVLoader\n",
"\n",
"loader = CSVLoader(\"data/corp_sens_data.csv\")\n",
"documents = loader.load()\n",
"print(documents)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The Pebblo SafeLoader can be enabled with a few lines of code changes to the above snippet."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from langchain.document_loaders.csv_loader import CSVLoader\n",
"from langchain_community.document_loaders import PebbloSafeLoader\n",
"\n",
"loader = PebbloSafeLoader(\n",
"    CSVLoader(\"data/corp_sens_data.csv\"),\n",
"    name=\"acme-corp-rag-1\",  # App name (Mandatory)\n",
"    owner=\"Joe Smith\",  # Owner (Optional)\n",
"    description=\"Support productivity RAG application\",  # Description (Optional)\n",
")\n",
"documents = loader.load()\n",
"print(documents)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.13"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
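
Reviewer note: the loader reports to a Pebblo daemon whose endpoint is read from the `PEBBLO_CLASSIFIER_URL` environment variable (default `http://localhost:8000`, see the utilities module below). A minimal sketch of pointing the loader at a non-default daemon; the host name `pebblo.internal` is hypothetical, and the variable must be set before the pebblo utilities module is imported because the URL is resolved at import time:

import os

# Must be set before langchain_community.utilities.pebblo is imported,
# since CLASSIFIER_URL is read with os.getenv() at module level.
# "pebblo.internal" is a placeholder host used only for illustration.
os.environ["PEBBLO_CLASSIFIER_URL"] = "http://pebblo.internal:8000"

from langchain_community.document_loaders import PebbloSafeLoader
from langchain_community.document_loaders.csv_loader import CSVLoader

loader = PebbloSafeLoader(
    CSVLoader("data/corp_sens_data.csv"),
    name="acme-corp-rag-1",  # App name (Mandatory)
)
documents = loader.load()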

View File

@@ -160,6 +160,7 @@ from langchain_community.document_loaders.pdf import (
    PyPDFLoader,
    UnstructuredPDFLoader,
)
from langchain_community.document_loaders.pebblo import PebbloSafeLoader
from langchain_community.document_loaders.polars_dataframe import PolarsDataFrameLoader
from langchain_community.document_loaders.powerpoint import UnstructuredPowerPointLoader
from langchain_community.document_loaders.psychic import PsychicLoader
@@ -284,6 +285,7 @@ __all__ = [
    "CubeSemanticLoader",
    "DataFrameLoader",
    "DatadogLogsLoader",
    "PebbloSafeLoader",
    "DiffbotLoader",
    "DirectoryLoader",
    "DiscordChatLoader",

View File

@@ -0,0 +1,291 @@
"""Pebblo's safe dataloader is a wrapper for document loaders"""

import logging
import os
import pwd
import uuid
from http import HTTPStatus
from typing import Any, Dict, Iterator, List

import requests
from langchain_core.documents import Document

from langchain_community.document_loaders.base import BaseLoader
from langchain_community.utilities.pebblo import (
    CLASSIFIER_URL,
    PLUGIN_VERSION,
    App,
    Doc,
    get_full_path,
    get_loader_full_path,
    get_loader_type,
    get_runtime,
)

logger = logging.getLogger(__name__)


class PebbloSafeLoader(BaseLoader):
    """Pebblo Safe Loader class is a wrapper around document loaders enabling the data
    to be scrutinized.
    """

    _discover_sent: bool = False
    _loader_sent: bool = False

    def __init__(
        self,
        langchain_loader: BaseLoader,
        name: str,
        owner: str = "",
        description: str = "",
    ):
        if not name or not isinstance(name, str):
            raise NameError("Must specify a valid name.")
        self.app_name = name
        self.load_id = str(uuid.uuid4())
        self.loader = langchain_loader
        self.owner = owner
        self.description = description
        self.source_path = get_loader_full_path(self.loader)
        self.source_owner = PebbloSafeLoader.get_file_owner_from_path(self.source_path)
        self.docs: List[Document] = []
        loader_name = str(type(self.loader)).split(".")[-1].split("'")[0]
        self.source_type = get_loader_type(loader_name)
        self.source_path_size = self.get_source_size(self.source_path)
        self.source_aggr_size = 0
        self.loader_details = {
            "loader": loader_name,
            "source_path": self.source_path,
            "source_type": self.source_type,
            **(
                {"source_path_size": str(self.source_path_size)}
                if self.source_path_size > 0
                else {}
            ),
        }
        # generate app
        self.app = self._get_app_details()
        self._send_discover()

    def load(self) -> List[Document]:
        """Load Documents.

        Returns:
            list: Documents fetched from load method of the wrapped `loader`.
        """
        self.docs = self.loader.load()
        self._send_loader_doc(loading_end=True)
        return self.docs

    def lazy_load(self) -> Iterator[Document]:
        """Load documents in lazy fashion.

        Raises:
            NotImplementedError: raised when lazy_load is not implemented
                within the wrapped loader.

        Yields:
            list: Documents from the loader's lazy loading.
        """
        try:
            doc_iterator = self.loader.lazy_load()
        except NotImplementedError as exc:
            err_str = f"{self.loader.__class__.__name__} does not implement lazy_load()"
            logger.error(err_str)
            raise NotImplementedError(err_str) from exc
        while True:
            try:
                doc = next(doc_iterator)
            except StopIteration:
                self.docs = []
                self._send_loader_doc(loading_end=True)
                break
            self.docs = [
                doc,
            ]
            self._send_loader_doc()
            yield doc

    @classmethod
    def set_discover_sent(cls) -> None:
        cls._discover_sent = True

    @classmethod
    def set_loader_sent(cls) -> None:
        cls._loader_sent = True

    def _send_loader_doc(self, loading_end: bool = False) -> None:
        """Send documents fetched from the loader to pebblo-server. Internal method.

        Args:
            loading_end (bool, optional): Flag indicating the halt of data
                loading by the loader. Defaults to False.
        """
        headers = {"Accept": "application/json", "Content-Type": "application/json"}
        doc_content = [doc.dict() for doc in self.docs]
        docs = []
        for doc in doc_content:
            doc_source_path = get_full_path(doc.get("metadata", {}).get("source"))
            doc_source_owner = PebbloSafeLoader.get_file_owner_from_path(
                doc_source_path
            )
            doc_source_size = self.get_source_size(doc_source_path)
            page_content = str(doc.get("page_content"))
            page_content_size = self.calculate_content_size(page_content)
            self.source_aggr_size += page_content_size
            docs.append(
                {
                    "doc": page_content,
                    "source_path": doc_source_path,
                    "last_modified": doc.get("metadata", {}).get("last_modified"),
                    "file_owner": doc_source_owner,
                    **(
                        {"source_path_size": doc_source_size}
                        if doc_source_size is not None
                        else {}
                    ),
                }
            )
        payload: Dict[str, Any] = {
            "name": self.app_name,
            "owner": self.owner,
            "docs": docs,
            "plugin_version": PLUGIN_VERSION,
            "load_id": self.load_id,
            "loader_details": self.loader_details,
            "loading_end": "false",
            "source_owner": self.source_owner,
        }
        if loading_end is True:
            payload["loading_end"] = "true"
            if "loader_details" in payload:
                payload["loader_details"]["source_aggr_size"] = self.source_aggr_size
        payload = Doc(**payload).dict(exclude_unset=True)
        load_doc_url = f"{CLASSIFIER_URL}/v1/loader/doc"
        try:
            resp = requests.post(
                load_doc_url, headers=headers, json=payload, timeout=20
            )
            if resp.status_code not in [HTTPStatus.OK, HTTPStatus.BAD_GATEWAY]:
                logger.warning(
                    f"Received unexpected HTTP response code: {resp.status_code}"
                )
            logger.debug(
                f"send_loader_doc: request \
                url {resp.request.url}, \
                body {str(resp.request.body)[:999]} \
                len {len(resp.request.body if resp.request.body else [])} \
                response status {resp.status_code} body {resp.json()}"
            )
        except requests.exceptions.RequestException:
            logger.warning("Unable to reach pebblo server.")
        except Exception:
            logger.warning("An Exception caught in _send_loader_doc.")
        if loading_end is True:
            PebbloSafeLoader.set_loader_sent()

    @staticmethod
    def calculate_content_size(page_content: str) -> int:
        """Calculate the content size in bytes:
        - Encode the string to bytes using a specific encoding (e.g., UTF-8)
        - Get the length of the encoded bytes.

        Args:
            page_content (str): Data string.

        Returns:
            int: Size of string in bytes.
        """
        # Encode the content to bytes using UTF-8
        encoded_content = page_content.encode("utf-8")
        size = len(encoded_content)
        return size

    def _send_discover(self) -> None:
        """Send app discovery payload to pebblo-server. Internal method."""
        headers = {"Accept": "application/json", "Content-Type": "application/json"}
        payload = self.app.dict(exclude_unset=True)
        app_discover_url = f"{CLASSIFIER_URL}/v1/app/discover"
        try:
            resp = requests.post(
                app_discover_url, headers=headers, json=payload, timeout=20
            )
            logger.debug(
                f"send_discover: request \
                url {resp.request.url}, \
                headers {resp.request.headers}, \
                body {str(resp.request.body)[:999]} \
                len {len(resp.request.body if resp.request.body else [])} \
                response status {resp.status_code} body {resp.json()}"
            )
            if resp.status_code in [HTTPStatus.OK, HTTPStatus.BAD_GATEWAY]:
                PebbloSafeLoader.set_discover_sent()
            else:
                logger.warning(
                    f"Received unexpected HTTP response code: {resp.status_code}"
                )
        except requests.exceptions.RequestException:
            logger.warning("Unable to reach pebblo server.")
        except Exception:
            logger.warning("An Exception caught in _send_discover.")

    def _get_app_details(self) -> App:
        """Fetch app details. Internal method.

        Returns:
            App: App details.
        """
        framework, runtime = get_runtime()
        app = App(
            name=self.app_name,
            owner=self.owner,
            description=self.description,
            load_id=self.load_id,
            runtime=runtime,
            framework=framework,
            plugin_version=PLUGIN_VERSION,
        )
        return app

    @staticmethod
    def get_file_owner_from_path(file_path: str) -> str:
        """Fetch owner of local file path.

        Args:
            file_path (str): Local file path.

        Returns:
            str: Name of owner.
        """
        try:
            file_owner_uid = os.stat(file_path).st_uid
            file_owner_name = pwd.getpwuid(file_owner_uid).pw_name
        except Exception:
            file_owner_name = "unknown"
        return file_owner_name

    def get_source_size(self, source_path: str) -> int:
        """Fetch size of source path. Source can be a directory or a file.

        Args:
            source_path (str): Local path of data source.

        Returns:
            int: Source size in bytes.
        """
        if not source_path:
            return 0
        size = 0
        if os.path.isfile(source_path):
            size = os.path.getsize(source_path)
        elif os.path.isdir(source_path):
            total_size = 0
            for dirpath, _, filenames in os.walk(source_path):
                for f in filenames:
                    fp = os.path.join(dirpath, f)
                    if not os.path.islink(fp):
                        total_size += os.path.getsize(fp)
            size = total_size
        return size
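
For reference, a short usage sketch of the lazy path implemented above (not part of the diff, reusing the CSV example from the notebook). Each yielded document is posted to the daemon individually, and a final empty batch with loading_end set closes out the load:

from langchain_community.document_loaders import PebbloSafeLoader
from langchain_community.document_loaders.csv_loader import CSVLoader

loader = PebbloSafeLoader(
    CSVLoader("data/corp_sens_data.csv"),
    name="acme-corp-rag-1",  # App name (Mandatory)
    owner="Joe Smith",  # Owner (Optional)
    description="Support productivity RAG application",  # Description (Optional)
)

# Raises NotImplementedError if the wrapped loader has no lazy_load();
# otherwise each document is reported to the Pebblo server as it is yielded.
for doc in loader.lazy_load():
    print(doc.metadata.get("source"))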

View File

@@ -0,0 +1,249 @@
from __future__ import annotations

import logging
import os
import pathlib
import platform
from typing import Optional, Tuple

from langchain_core.env import get_runtime_environment
from langchain_core.pydantic_v1 import BaseModel

from langchain_community.document_loaders.base import BaseLoader

logger = logging.getLogger(__name__)

PLUGIN_VERSION = "0.1.0"
CLASSIFIER_URL = os.getenv("PEBBLO_CLASSIFIER_URL", "http://localhost:8000")

# Supported loaders for Pebblo safe data loading
file_loader = [
    "JSONLoader",
    "S3FileLoader",
    "UnstructuredMarkdownLoader",
    "UnstructuredPDFLoader",
    "UnstructuredFileLoader",
    "UnstructuredJsonLoader",
    "PyPDFLoader",
    "GCSFileLoader",
    "AmazonTextractPDFLoader",
    "CSVLoader",
    "UnstructuredExcelLoader",
]
dir_loader = ["DirectoryLoader", "S3DirLoader", "PyPDFDirectoryLoader"]
in_memory = ["DataFrameLoader"]

LOADER_TYPE_MAPPING = {"file": file_loader, "dir": dir_loader, "in-memory": in_memory}

SUPPORTED_LOADERS = (*file_loader, *dir_loader, *in_memory)

logger = logging.getLogger(__name__)


class Runtime(BaseModel):
    """This class represents a Runtime.

    Args:
        type (Optional[str]): Runtime type. Defaults to ""
        host (str): Hostname of runtime.
        path (str): Current working directory path.
        ip (Optional[str]): IP of current runtime. Defaults to ""
        platform (str): Platform details of current runtime.
        os (str): OS name.
        os_version (str): OS version.
        language (str): Runtime kernel.
        language_version (str): Version of current runtime kernel.
        runtime (Optional[str]): More runtime details. Defaults to ""
    """

    type: str = "local"
    host: str
    path: str
    ip: Optional[str] = ""
    platform: str
    os: str
    os_version: str
    language: str
    language_version: str
    runtime: str = "local"


class Framework(BaseModel):
    """This class represents a Framework instance.

    Args:
        name (str): Name of the Framework.
        version (str): Version of the Framework.
    """

    name: str
    version: str


class App(BaseModel):
    """This class represents an AI application.

    Args:
        name (str): Name of the app.
        owner (str): Owner of the app.
        description (Optional[str]): Description of the app.
        load_id (str): Unique load_id of the app instance.
        runtime (Runtime): Runtime details of the app.
        framework (Framework): Framework details of the app.
        plugin_version (str): Plugin version used for the app.
    """

    name: str
    owner: str
    description: Optional[str]
    load_id: str
    runtime: Runtime
    framework: Framework
    plugin_version: str


class Doc(BaseModel):
    """This class represents a Pebblo document.

    Args:
        name (str): Name of the app originating this document.
        owner (str): Owner of the app.
        docs (list): List of documents with their metadata.
        plugin_version (str): Pebblo plugin version.
        load_id (str): Unique load_id of the app instance.
        loader_details (dict): Loader details with its metadata.
        loading_end (bool): Boolean specifying the end of loading of the source.
        source_owner (str): Owner of the source of the loader.
    """

    name: str
    owner: str
    docs: list
    plugin_version: str
    load_id: str
    loader_details: dict
    loading_end: bool
    source_owner: str


def get_full_path(path: str) -> str:
    """Return the absolute local path for a local file/directory;
    for a network-related path, return it as is.

    Args:
        path (str): Relative path to be resolved.

    Returns:
        str: Resolved absolute path.
    """
    if (
        not path
        or ("://" in path)
        or ("/" == path[0])
        or (path in ["unknown", "-", "in-memory"])
    ):
        return path
    full_path = pathlib.Path(path).resolve()
    return str(full_path)


def get_loader_type(loader: str) -> str:
    """Return the loader type: file, dir, or in-memory.

    Args:
        loader (str): Name of the loader whose type is to be resolved.

    Returns:
        str: One of the loader types: file/dir/in-memory.
    """
    for loader_type, loaders in LOADER_TYPE_MAPPING.items():
        if loader in loaders:
            return loader_type
    return "unknown"


def get_loader_full_path(loader: BaseLoader) -> str:
    """Return the absolute source path of the loader's source, based on the
    attributes present on the loader object.

    Args:
        loader (BaseLoader): Langchain document loader, derived from BaseLoader.
    """
    from langchain_community.document_loaders import (
        DataFrameLoader,
        GCSFileLoader,
        S3FileLoader,
    )

    location = "-"
    if not isinstance(loader, BaseLoader):
        logger.error(
            "loader is not derived from BaseLoader, source location will be unknown!"
        )
        return location
    loader_dict = loader.__dict__
    try:
        if "bucket" in loader_dict:
            if isinstance(loader, GCSFileLoader):
                location = f"gc://{loader.bucket}/{loader.blob}"
            elif isinstance(loader, S3FileLoader):
                location = f"s3://{loader.bucket}/{loader.key}"
        elif "path" in loader_dict:
            location = loader_dict["path"]
        elif "file_path" in loader_dict:
            location = loader_dict["file_path"]
        elif "web_paths" in loader_dict:
            location = loader_dict["web_paths"][0]
        # For in-memory types:
        elif isinstance(loader, DataFrameLoader):
            location = "in-memory"
    except Exception:
        pass
    return get_full_path(str(location))


def get_runtime() -> Tuple[Framework, Runtime]:
    """Fetch the current Framework and Runtime details.

    Returns:
        Tuple[Framework, Runtime]: Framework and Runtime for the current app instance.
    """
    runtime_env = get_runtime_environment()
    framework = Framework(
        name="langchain", version=runtime_env.get("library_version", None)
    )
    uname = platform.uname()
    runtime = Runtime(
        host=uname.node,
        path=os.environ["PWD"],
        platform=runtime_env.get("platform", "unknown"),
        os=uname.system,
        os_version=uname.version,
        ip=get_ip(),
        language=runtime_env.get("runtime", "unknown"),
        language_version=runtime_env.get("runtime_version", "unknown"),
    )
    if "Darwin" in runtime.os:
        runtime.type = "desktop"
        runtime.runtime = "Mac OSX"

    logger.debug(f"framework {framework}")
    logger.debug(f"runtime {runtime}")
    return framework, runtime


def get_ip() -> str:
    """Fetch the local runtime IP address.

    Returns:
        str: IP address
    """
    import socket  # lazy imports

    host = socket.gethostname()
    try:
        public_ip = socket.gethostbyname(host)
    except Exception:
        public_ip = socket.gethostbyname("localhost")
    return public_ip
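
A small sketch of how these helpers resolve loader types and source paths (illustrative calls only, not part of the diff; expected results, based on the mappings and path rules above, are shown as comments):

from langchain_community.utilities.pebblo import get_full_path, get_loader_type

get_loader_type("CSVLoader")        # -> "file"
get_loader_type("DirectoryLoader")  # -> "dir"
get_loader_type("DataFrameLoader")  # -> "in-memory"
get_loader_type("WebBaseLoader")    # -> "unknown" (not in SUPPORTED_LOADERS)

get_full_path("data/corp_sens_data.csv")  # resolved to an absolute local path
get_full_path("s3://bucket/key.csv")      # contains "://", returned as is
get_full_path("in-memory")                # sentinel value, returned as is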

View File

@@ -0,0 +1,3 @@
column1,column2,column3
value1,value2,value3
value4,value5,value6

View File

@@ -49,6 +49,7 @@ EXPECTED_ALL = [
    "CubeSemanticLoader",
    "DataFrameLoader",
    "DatadogLogsLoader",
    "PebbloSafeLoader",
    "DiffbotLoader",
    "DirectoryLoader",
    "DiscordChatLoader",

View File

@@ -0,0 +1,114 @@
import os
from pathlib import Path
from typing import Dict

import pytest
from langchain_core.documents import Document
from pytest_mock import MockerFixture

from langchain_community.document_loaders import CSVLoader, PyPDFLoader

EXAMPLE_DOCS_DIRECTORY = str(Path(__file__).parent.parent.parent / "examples/")


class MockResponse:
    def __init__(self, json_data: Dict, status_code: int):
        self.json_data = json_data
        self.status_code = status_code

    def json(self) -> Dict:
        return self.json_data


def test_pebblo_import() -> None:
    """Test that the Pebblo safe loader can be imported."""
    from langchain_community.document_loaders import PebbloSafeLoader  # noqa: F401


def test_empty_filebased_loader(mocker: MockerFixture) -> None:
    """Test basic file based csv loader."""
    # Setup
    from langchain_community.document_loaders import PebbloSafeLoader

    mocker.patch.multiple(
        "requests",
        get=MockResponse(json_data={"data": ""}, status_code=200),
        post=MockResponse(json_data={"data": ""}, status_code=200),
    )
    file_path = os.path.join(EXAMPLE_DOCS_DIRECTORY, "test_empty.csv")
    expected_docs: list = []

    # Exercise
    loader = PebbloSafeLoader(
        CSVLoader(file_path=file_path),
        "dummy_app_name",
        "dummy_owner",
        "dummy_description",
    )
    result = loader.load()

    # Assert
    assert result == expected_docs


def test_csv_loader_load_valid_data(mocker: MockerFixture) -> None:
    # Setup
    from langchain_community.document_loaders import PebbloSafeLoader

    mocker.patch.multiple(
        "requests",
        get=MockResponse(json_data={"data": ""}, status_code=200),
        post=MockResponse(json_data={"data": ""}, status_code=200),
    )
    file_path = os.path.join(EXAMPLE_DOCS_DIRECTORY, "test_nominal.csv")
    expected_docs = [
        Document(
            page_content="column1: value1\ncolumn2: value2\ncolumn3: value3",
            metadata={"source": file_path, "row": 0},
        ),
        Document(
            page_content="column1: value4\ncolumn2: value5\ncolumn3: value6",
            metadata={"source": file_path, "row": 1},
        ),
    ]

    # Exercise
    loader = PebbloSafeLoader(
        CSVLoader(file_path=file_path),
        "dummy_app_name",
        "dummy_owner",
        "dummy_description",
    )
    result = loader.load()

    # Assert
    assert result == expected_docs


@pytest.mark.requires("pypdf")
def test_pdf_lazy_load(mocker: MockerFixture) -> None:
    # Setup
    from langchain_community.document_loaders import PebbloSafeLoader

    mocker.patch.multiple(
        "requests",
        get=MockResponse(json_data={"data": ""}, status_code=200),
        post=MockResponse(json_data={"data": ""}, status_code=200),
    )
    file_path = os.path.join(
        EXAMPLE_DOCS_DIRECTORY, "multi-page-forms-sample-2-page.pdf"
    )

    # Exercise
    loader = PebbloSafeLoader(
        PyPDFLoader(file_path=file_path),
        "dummy_app_name",
        "dummy_owner",
        "dummy_description",
    )
    result = list(loader.lazy_load())

    # Assert
    assert len(result) == 2