1
0
mirror of https://github.com/hwchase17/langchain.git synced 2025-05-04 06:37:58 +00:00

community[minor]: Added classifier_location parameter in PebbloSafeLoader.

Description: Add classifier_location feature flag. This flag selects where
Pebblo runs classification: 'local' (the default) or 'pebblo-cloud'.
Unit Tests: N/A
Documentation: N/A

---------

Signed-off-by: Rahul Tripathi <rauhl.psit.ec@gmail.com>
Co-authored-by: Rahul Tripathi <rauhl.psit.ec@gmail.com>
This commit is contained in:
Rahul Triptahi 2024-06-25 03:00:38 +05:30 committed by GitHub
parent 2115fb76de
commit 9ef93ecd7c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 247 additions and 173 deletions
libs/community/langchain_community
chains/pebblo_retrieval
document_loaders
utilities

View File

@ -5,6 +5,7 @@ against a vector database.
import datetime
import inspect
import json
import logging
from http import HTTPStatus
from typing import Any, Dict, List, Optional
@ -72,7 +73,9 @@ class PebbloRetrievalQA(Chain):
"""Pebblo cloud API key for app."""
classifier_url: str = CLASSIFIER_URL #: :meta private:
"""Classifier endpoint."""
_discover_sent: bool = False #: :meta private:
classifier_location: str = "local" #: :meta private:
"""Classifier location. It could be either of 'local' or 'pebblo-cloud'."""
_discover_sent = False #: :meta private:
"""Flag to check if discover payload has been sent."""
_prompt_sent: bool = False #: :meta private:
"""Flag to check if prompt payload has been sent."""
@ -94,6 +97,7 @@ class PebbloRetrievalQA(Chain):
answer, docs = res['result'], res['source_documents']
"""
prompt_time = datetime.datetime.now().isoformat()
PebbloRetrievalQA.set_prompt_sent(value=False)
_run_manager = run_manager or CallbackManagerForChainRun.get_noop_manager()
question = inputs[self.input_key]
auth_context = inputs.get(self.auth_context_key, {})
@ -115,7 +119,9 @@ class PebbloRetrievalQA(Chain):
"name": self.app_name,
"context": [
{
"retrieved_from": doc.metadata.get("source"),
"retrieved_from": doc.metadata.get(
"full_path", doc.metadata.get("source")
),
"doc": doc.page_content,
"vector_db": self.retriever.vectorstore.__class__.__name__,
}
@ -131,6 +137,7 @@ class PebbloRetrievalQA(Chain):
"user_identities": auth_context.user_auth
if auth_context and hasattr(auth_context, "user_auth")
else [],
"classifier_location": self.classifier_location,
}
qa_payload = Qa(**qa)
self._send_prompt(qa_payload)
@ -220,6 +227,7 @@ class PebbloRetrievalQA(Chain):
chain_type_kwargs: Optional[dict] = None,
api_key: Optional[str] = None,
classifier_url: str = CLASSIFIER_URL,
classifier_location: str = "local",
**kwargs: Any,
) -> "PebbloRetrievalQA":
"""Load chain from chain type."""
@ -231,7 +239,7 @@ class PebbloRetrievalQA(Chain):
)
# generate app
app = PebbloRetrievalQA._get_app_details(
app: App = PebbloRetrievalQA._get_app_details(
app_name=app_name,
description=description,
owner=owner,
@ -240,7 +248,10 @@ class PebbloRetrievalQA(Chain):
)
PebbloRetrievalQA._send_discover(
app, api_key=api_key, classifier_url=classifier_url
app,
api_key=api_key,
classifier_url=classifier_url,
classifier_location=classifier_location,
)
return cls(
@ -250,6 +261,7 @@ class PebbloRetrievalQA(Chain):
description=description,
api_key=api_key,
classifier_url=classifier_url,
classifier_location=classifier_location,
**kwargs,
)
@ -300,7 +312,9 @@ class PebbloRetrievalQA(Chain):
)
@staticmethod
def _get_app_details(app_name, owner, description, llm, **kwargs) -> App: # type: ignore
def _get_app_details( # type: ignore
app_name: str, owner: str, description: str, llm: BaseLanguageModel, **kwargs
) -> App:
"""Fetch app details. Internal method.
Returns:
App: App details.
@ -319,38 +333,49 @@ class PebbloRetrievalQA(Chain):
return app
@staticmethod
def _send_discover(app, api_key, classifier_url) -> None: # type: ignore
def _send_discover(
app: App,
api_key: Optional[str],
classifier_url: str,
classifier_location: str,
) -> None: # type: ignore
"""Send app discovery payload to pebblo-server. Internal method."""
headers = {
"Accept": "application/json",
"Content-Type": "application/json",
}
payload = app.dict(exclude_unset=True)
app_discover_url = f"{classifier_url}{APP_DISCOVER_URL}"
try:
pebblo_resp = requests.post(
app_discover_url, headers=headers, json=payload, timeout=20
)
logger.debug("discover-payload: %s", payload)
logger.debug(
"send_discover[local]: request url %s, body %s len %s\
response status %s body %s",
pebblo_resp.request.url,
str(pebblo_resp.request.body),
str(len(pebblo_resp.request.body if pebblo_resp.request.body else [])),
str(pebblo_resp.status_code),
pebblo_resp.json(),
)
if pebblo_resp.status_code in [HTTPStatus.OK, HTTPStatus.BAD_GATEWAY]:
PebbloRetrievalQA.set_discover_sent()
else:
logger.warning(
f"Received unexpected HTTP response code: {pebblo_resp.status_code}"
if classifier_location == "local":
app_discover_url = f"{classifier_url}{APP_DISCOVER_URL}"
try:
pebblo_resp = requests.post(
app_discover_url, headers=headers, json=payload, timeout=20
)
except requests.exceptions.RequestException:
logger.warning("Unable to reach pebblo server.")
except Exception as e:
logger.warning("An Exception caught in _send_discover: local %s", e)
logger.debug("discover-payload: %s", payload)
logger.debug(
"send_discover[local]: request url %s, body %s len %s\
response status %s body %s",
pebblo_resp.request.url,
str(pebblo_resp.request.body),
str(
len(
pebblo_resp.request.body if pebblo_resp.request.body else []
)
),
str(pebblo_resp.status_code),
pebblo_resp.json(),
)
if pebblo_resp.status_code in [HTTPStatus.OK, HTTPStatus.BAD_GATEWAY]:
PebbloRetrievalQA.set_discover_sent()
else:
logger.warning(
"Received unexpected HTTP response code:"
+ f"{pebblo_resp.status_code}"
)
except requests.exceptions.RequestException:
logger.warning("Unable to reach pebblo server.")
except Exception as e:
logger.warning("An Exception caught in _send_discover: local %s", e)
if api_key:
try:
@ -385,8 +410,8 @@ class PebbloRetrievalQA(Chain):
cls._discover_sent = True
@classmethod
def set_prompt_sent(cls) -> None:
cls._prompt_sent = True
def set_prompt_sent(cls, value: bool = True) -> None:
cls._prompt_sent = value
def _send_prompt(self, qa_payload: Qa) -> None:
headers = {
@ -394,39 +419,73 @@ class PebbloRetrievalQA(Chain):
"Content-Type": "application/json",
}
app_discover_url = f"{self.classifier_url}{PROMPT_URL}"
try:
pebblo_resp = requests.post(
app_discover_url, headers=headers, json=qa_payload.dict(), timeout=20
)
logger.debug("prompt-payload: %s", qa_payload)
logger.debug(
"send_prompt[local]: request url %s, body %s len %s\
response status %s body %s",
pebblo_resp.request.url,
str(pebblo_resp.request.body),
str(len(pebblo_resp.request.body if pebblo_resp.request.body else [])),
str(pebblo_resp.status_code),
pebblo_resp.json(),
)
if pebblo_resp.status_code in [HTTPStatus.OK, HTTPStatus.BAD_GATEWAY]:
PebbloRetrievalQA.set_prompt_sent()
else:
logger.warning(
f"Received unexpected HTTP response code: {pebblo_resp.status_code}"
)
except requests.exceptions.RequestException:
logger.warning("Unable to reach pebblo server.")
except Exception as e:
logger.warning("An Exception caught in _send_discover: local %s", e)
if self.api_key:
pebblo_resp = None
payload = qa_payload.dict(exclude_unset=True)
if self.classifier_location == "local":
try:
pebblo_resp = requests.post(
app_discover_url,
headers=headers,
json=payload,
timeout=20,
)
logger.debug("prompt-payload: %s", payload)
logger.debug(
"send_prompt[local]: request url %s, body %s len %s\
response status %s body %s",
pebblo_resp.request.url,
str(pebblo_resp.request.body),
str(
len(
pebblo_resp.request.body if pebblo_resp.request.body else []
)
),
str(pebblo_resp.status_code),
pebblo_resp.json(),
)
if pebblo_resp.status_code in [HTTPStatus.OK, HTTPStatus.BAD_GATEWAY]:
PebbloRetrievalQA.set_prompt_sent()
else:
logger.warning(
"Received unexpected HTTP response code:"
+ f"{pebblo_resp.status_code}"
)
except requests.exceptions.RequestException:
logger.warning("Unable to reach pebblo server.")
except Exception as e:
logger.warning("An Exception caught in _send_discover: local %s", e)
# If classifier location is local, then response, context and prompt
# should be fetched from pebblo_resp and replaced in payload.
if self.api_key:
if self.classifier_location == "local":
if pebblo_resp:
payload["response"] = (
json.loads(pebblo_resp.text)
.get("retrieval_data", {})
.get("response", {})
)
payload["context"] = (
json.loads(pebblo_resp.text)
.get("retrieval_data", {})
.get("context", [])
)
payload["prompt"] = (
json.loads(pebblo_resp.text)
.get("retrieval_data", {})
.get("prompt", {})
)
else:
payload["response"] = None
payload["context"] = None
payload["prompt"] = None
headers.update({"x-api-key": self.api_key})
pebblo_cloud_url = f"{PEBBLO_CLOUD_URL}{PROMPT_URL}"
try:
headers.update({"x-api-key": self.api_key})
pebblo_cloud_url = f"{PEBBLO_CLOUD_URL}{PROMPT_URL}"
pebblo_cloud_response = requests.post(
pebblo_cloud_url,
headers=headers,
json=qa_payload.dict(),
json=payload,
timeout=20,
)
@ -449,9 +508,12 @@ class PebbloRetrievalQA(Chain):
logger.warning("Unable to reach Pebblo cloud server.")
except Exception as e:
logger.warning("An Exception caught in _send_prompt: cloud %s", e)
elif self.classifier_location == "pebblo-cloud":
logger.warning("API key is missing for sending prompt to Pebblo cloud.")
raise NameError("API key is missing for sending prompt to Pebblo cloud.")
@classmethod
def get_chain_details(cls, llm, **kwargs): # type: ignore
def get_chain_details(cls, llm: BaseLanguageModel, **kwargs): # type: ignore
llm_dict = llm.__dict__
chain = [
{
@ -474,6 +536,6 @@ class PebbloRetrievalQA(Chain):
),
}
],
}
},
]
return chain

View File

@ -1,6 +1,6 @@
"""Models for the PebbloRetrievalQA chain."""
from typing import Any, List, Optional
from typing import Any, List, Optional, Union
from langchain_core.pydantic_v1 import BaseModel
@ -137,9 +137,10 @@ class Prompt(BaseModel):
class Qa(BaseModel):
name: str
context: List[Optional[Context]]
prompt: Prompt
response: Prompt
context: Union[List[Optional[Context]], Optional[Context]]
prompt: Optional[Prompt]
response: Optional[Prompt]
prompt_time: str
user: str
user_identities: Optional[List[str]]
classifier_location: str

View File

@ -46,6 +46,8 @@ class PebbloSafeLoader(BaseLoader):
api_key: Optional[str] = None,
load_semantic: bool = False,
classifier_url: Optional[str] = None,
*,
classifier_location: str = "local",
):
if not name or not isinstance(name, str):
raise NameError("Must specify a valid name.")
@ -65,6 +67,7 @@ class PebbloSafeLoader(BaseLoader):
self.source_path_size = self.get_source_size(self.source_path)
self.source_aggregate_size = 0
self.classifier_url = classifier_url or CLASSIFIER_URL
self.classifier_location = classifier_location
self.loader_details = {
"loader": loader_name,
"source_path": self.source_path,
@ -158,6 +161,7 @@ class PebbloSafeLoader(BaseLoader):
PebbloSafeLoader.set_loader_sent()
doc_content = [doc.dict() for doc in loaded_docs]
docs = []
classified_docs = []
for doc in doc_content:
doc_metadata = doc.get("metadata", {})
doc_authorized_identities = doc_metadata.get("authorized_identities", [])
@ -204,6 +208,7 @@ class PebbloSafeLoader(BaseLoader):
"loader_details": self.loader_details,
"loading_end": "false",
"source_owner": self.source_owner,
"classifier_location": self.classifier_location,
}
if loading_end is True:
payload["loading_end"] = "true"
@ -212,39 +217,46 @@ class PebbloSafeLoader(BaseLoader):
"source_aggregate_size"
] = self.source_aggregate_size
payload = Doc(**payload).dict(exclude_unset=True)
load_doc_url = f"{self.classifier_url}{LOADER_DOC_URL}"
classified_docs = []
try:
pebblo_resp = requests.post(
load_doc_url, headers=headers, json=payload, timeout=300
)
classified_docs = json.loads(pebblo_resp.text).get("docs", None)
if pebblo_resp.status_code not in [HTTPStatus.OK, HTTPStatus.BAD_GATEWAY]:
logger.warning(
"Received unexpected HTTP response code: %s",
pebblo_resp.status_code,
)
logger.debug(
"send_loader_doc[local]: request url %s, body %s len %s\
response status %s body %s",
pebblo_resp.request.url,
str(pebblo_resp.request.body),
str(len(pebblo_resp.request.body if pebblo_resp.request.body else [])),
str(pebblo_resp.status_code),
pebblo_resp.json(),
)
except requests.exceptions.RequestException:
logger.warning("Unable to reach pebblo server.")
except Exception as e:
logger.warning("An Exception caught in _send_loader_doc: local %s", e)
if self.api_key:
if not classified_docs:
return classified_docs
# Raw payload to be sent to classifier
if self.classifier_location == "local":
load_doc_url = f"{self.classifier_url}{LOADER_DOC_URL}"
try:
pebblo_resp = requests.post(
load_doc_url, headers=headers, json=payload, timeout=300
)
classified_docs = json.loads(pebblo_resp.text).get("docs", None)
if pebblo_resp.status_code not in [
HTTPStatus.OK,
HTTPStatus.BAD_GATEWAY,
]:
logger.warning(
"Received unexpected HTTP response code: %s",
pebblo_resp.status_code,
)
logger.debug(
"send_loader_doc[local]: request url %s, body %s len %s\
response status %s body %s",
pebblo_resp.request.url,
str(pebblo_resp.request.body),
str(
len(
pebblo_resp.request.body if pebblo_resp.request.body else []
)
),
str(pebblo_resp.status_code),
pebblo_resp.json(),
)
except requests.exceptions.RequestException:
logger.warning("Unable to reach pebblo server.")
except Exception as e:
logger.warning("An Exception caught in _send_loader_doc: local %s", e)
if self.api_key:
if self.classifier_location == "local":
payload["docs"] = classified_docs
payload["classified"] = True
headers.update({"x-api-key": self.api_key})
pebblo_cloud_url = f"{PEBBLO_CLOUD_URL}{LOADER_DOC_URL}"
headers.update({"x-api-key": self.api_key})
pebblo_cloud_url = f"{PEBBLO_CLOUD_URL}{LOADER_DOC_URL}"
try:
pebblo_cloud_response = requests.post(
pebblo_cloud_url, headers=headers, json=payload, timeout=20
)
@ -267,9 +279,10 @@ class PebbloSafeLoader(BaseLoader):
logger.warning("Unable to reach Pebblo cloud server.")
except Exception as e:
logger.warning("An Exception caught in _send_loader_doc: cloud %s", e)
elif self.classifier_location == "pebblo-cloud":
logger.warning("API key is missing for sending docs to Pebblo cloud.")
raise NameError("API key is missing for sending docs to Pebblo cloud.")
if loading_end is True:
PebbloSafeLoader.set_loader_sent()
return classified_docs
@staticmethod
@ -298,45 +311,50 @@ class PebbloSafeLoader(BaseLoader):
"Content-Type": "application/json",
}
payload = self.app.dict(exclude_unset=True)
app_discover_url = f"{self.classifier_url}{APP_DISCOVER_URL}"
try:
pebblo_resp = requests.post(
app_discover_url, headers=headers, json=payload, timeout=20
)
logger.debug(
"send_discover[local]: request url %s, body %s len %s\
response status %s body %s",
pebblo_resp.request.url,
str(pebblo_resp.request.body),
str(len(pebblo_resp.request.body if pebblo_resp.request.body else [])),
str(pebblo_resp.status_code),
pebblo_resp.json(),
)
if pebblo_resp.status_code in [HTTPStatus.OK, HTTPStatus.BAD_GATEWAY]:
PebbloSafeLoader.set_discover_sent()
else:
logger.warning(
f"Received unexpected HTTP response code: {pebblo_resp.status_code}"
# Raw discover payload to be sent to classifier
if self.classifier_location == "local":
app_discover_url = f"{self.classifier_url}{APP_DISCOVER_URL}"
try:
pebblo_resp = requests.post(
app_discover_url, headers=headers, json=payload, timeout=20
)
except requests.exceptions.RequestException:
logger.warning("Unable to reach pebblo server.")
except Exception as e:
logger.warning("An Exception caught in _send_discover: local %s", e)
logger.debug(
"send_discover[local]: request url %s, body %s len %s\
response status %s body %s",
pebblo_resp.request.url,
str(pebblo_resp.request.body),
str(
len(
pebblo_resp.request.body if pebblo_resp.request.body else []
)
),
str(pebblo_resp.status_code),
pebblo_resp.json(),
)
if pebblo_resp.status_code in [HTTPStatus.OK, HTTPStatus.BAD_GATEWAY]:
PebbloSafeLoader.set_discover_sent()
else:
logger.warning(
f"Received unexpected HTTP response code:\
{pebblo_resp.status_code}"
)
except requests.exceptions.RequestException:
logger.warning("Unable to reach pebblo server.")
except Exception as e:
logger.warning("An Exception caught in _send_discover: local %s", e)
if self.api_key:
try:
headers.update({"x-api-key": self.api_key})
# If the pebblo_resp is None,
# then the pebblo server version is not available
if pebblo_resp:
pebblo_server_version = json.loads(pebblo_resp.text).get(
"pebblo_server_version"
)
payload.update(
{
"pebblo_server_version": pebblo_server_version,
"pebblo_client_version": payload["plugin_version"],
}
)
payload.pop("plugin_version")
payload.update({"pebblo_server_version": pebblo_server_version})
payload.update({"pebblo_client_version": PLUGIN_VERSION})
pebblo_cloud_url = f"{PEBBLO_CLOUD_URL}{APP_DISCOVER_URL}"
pebblo_cloud_response = requests.post(
pebblo_cloud_url, headers=headers, json=payload, timeout=20

View File

@ -63,93 +63,86 @@ logger = logging.getLogger(__name__)
class IndexedDocument(Document):
"""Pebblo Indexed Document."""
id: str
"""Unique ID of the document."""
class Runtime(BaseModel):
"""Pebblo Runtime.
Args:
type (Optional[str]): Runtime type. Defaults to ""
host (str): Hostname of runtime.
path (str): Current working directory path.
ip (Optional[str]): Ip of current runtime. Defaults to ""
platform (str): Platform details of current runtime.
os (str): OS name.
os_version (str): OS version.
language (str): Runtime kernel.
language_version (str): version of current runtime kernel.
runtime (Optional[str]) More runtime details. Defaults to ""
"""
"""Pebblo Runtime."""
type: str = "local"
"""Runtime type. Defaults to 'local'."""
host: str
"""Host name of the runtime."""
path: str
"""Current working directory path."""
ip: Optional[str] = ""
"""IP address of the runtime. Defaults to ''."""
platform: str
"""Platform details of the runtime."""
os: str
"""OS name."""
os_version: str
"""OS version."""
language: str
"""Runtime kernel."""
language_version: str
"""Version of the runtime kernel."""
runtime: str = "local"
"""More runtime details. Defaults to 'local'."""
class Framework(BaseModel):
"""Pebblo Framework instance.
Args:
name (str): Name of the Framework.
version (str): Version of the Framework.
"""
"""Pebblo Framework instance."""
name: str
"""Name of the Framework."""
version: str
"""Version of the Framework."""
class App(BaseModel):
"""Pebblo AI application.
Args:
name (str): Name of the app.
owner (str): Owner of the app.
description (Optional[str]): Description of the app.
load_id (str): Unique load_id of the app instance.
runtime (Runtime): Runtime details of app.
framework (Framework): Framework details of the app
plugin_version (str): Plugin version used for the app.
"""
"""Pebblo AI application."""
name: str
"""Name of the app."""
owner: str
"""Owner of the app."""
description: Optional[str]
"""Description of the app."""
load_id: str
"""Unique load_id of the app instance."""
runtime: Runtime
"""Runtime details of the app."""
framework: Framework
"""Framework details of the app."""
plugin_version: str
"""Plugin version used for the app."""
class Doc(BaseModel):
"""Pebblo document.
Args:
name (str): Name of app originating this document.
owner (str): Owner of app.
docs (list): List of documents with its metadata.
plugin_version (str): Pebblo plugin Version
load_id (str): Unique load_id of the app instance.
loader_details (dict): Loader details with its metadata.
loading_end (bool): Boolean, specifying end of loading of source.
source_owner (str): Owner of the source of the loader.
"""
"""Pebblo document."""
name: str
"""Name of app originating this document."""
owner: str
"""Owner of app."""
docs: list
"""List of documents with its metadata."""
plugin_version: str
"""Pebblo plugin Version"""
load_id: str
"""Unique load_id of the app instance."""
loader_details: dict
"""Loader details with its metadata."""
loading_end: bool
"""Boolean, specifying end of loading of source."""
source_owner: str
"""Owner of the source of the loader."""
classifier_location: str
"""Location of the classifier."""
def get_full_path(path: str) -> str: