mirror of
https://github.com/hwchase17/langchain.git
synced 2025-09-13 21:47:12 +00:00
Add quip loader (#12259)
- **Description:** implement [quip](https://quip.com) loader - **Issue:** https://github.com/langchain-ai/langchain/issues/10352 - **Dependencies:** No - pass make format, make lint, make test --------- Co-authored-by: Hao Fan <h_fan@apple.com> Co-authored-by: Bagatur <baskaryan@gmail.com>
This commit is contained in:
232
libs/langchain/langchain/document_loaders/quip.py
Normal file
232
libs/langchain/langchain/document_loaders/quip.py
Normal file
@@ -0,0 +1,232 @@
|
||||
import logging
|
||||
import re
|
||||
import xml.etree.cElementTree
|
||||
import xml.sax.saxutils
|
||||
from io import BytesIO
|
||||
from typing import List, Optional, Sequence
|
||||
from xml.etree.ElementTree import ElementTree
|
||||
|
||||
from langchain.docstore.document import Document
|
||||
from langchain.document_loaders.base import BaseLoader
|
||||
|
||||
# Module-level logger, named after this module via `__name__`.
logger = logging.getLogger(__name__)
|
||||
|
||||
# Maximum number of characters kept when a thread title is sanitized
# (longer titles are truncated in `QuipLoader._sanitize_title`).
_MAXIMUM_TITLE_LENGTH = 64
|
||||
|
||||
|
||||
class QuipLoader(BaseLoader):
    """Load `Quip` pages.

    Port of https://github.com/quip/quip-api/tree/master/samples/baqup
    """

    def __init__(
        self, api_url: str, access_token: str, request_timeout: Optional[int] = 60
    ):
        """
        Args:
            api_url: https://platform.quip.com
            access_token: token of access quip API. Please refer:
            https://quip.com/dev/automation/documentation/current#section/Authentication/Get-Access-to-Quip's-APIs
            request_timeout: timeout of request, default 60s.

        Raises:
            ImportError: if the `quip_api` package is not installed.
        """
        try:
            from quip_api.quip import QuipClient
        except ImportError as e:
            raise ImportError(
                "`quip_api` package not found, please run " "`pip install quip_api`"
            ) from e

        self.quip_client = QuipClient(
            access_token=access_token, base_url=api_url, request_timeout=request_timeout
        )

    def load(
        self,
        folder_ids: Optional[List[str]] = None,
        thread_ids: Optional[List[str]] = None,
        max_docs: Optional[int] = 1000,
        include_all_folders: bool = False,
        include_comments: bool = False,
        include_images: bool = False,
    ) -> List[Document]:
        """Load threads as documents.

        Args:
            folder_ids: List of specific folder IDs to load, defaults to None
            thread_ids: List of specific thread IDs to load, defaults to None
            max_docs: Maximum number of docs to retrieve in total, defaults 1000
            include_all_folders: Include all folders that your access_token
                can access, but doesn't include your private folder
            include_comments: Include comments, defaults to False
            include_images: Include images, defaults to False

        Returns:
            One `Document` per thread that could be processed.

        Raises:
            ValueError: if none of `folder_ids`, `thread_ids` or
                `include_all_folders` is provided.
        """
        if not folder_ids and not thread_ids and not include_all_folders:
            raise ValueError(
                "Must specify at least one among `folder_ids`, `thread_ids` "
                "or set `include_all`_folders as True"
            )

        # Work on a copy: the folder traversal appends to this list and the
        # caller's `thread_ids` argument must not be mutated.
        collected: List[str] = list(thread_ids or [])

        for folder_id in folder_ids or []:
            self.get_thread_ids_by_folder_id(folder_id, 0, collected)

        if include_all_folders:
            user = self.quip_client.get_authenticated_user()
            # Both entries hold *lists* of folder ids, so every folder has to
            # be walked individually.
            for key in ("group_folder_ids", "shared_folder_ids"):
                if key in user:
                    for folder_id in user[key]:
                        self.get_thread_ids_by_folder_id(folder_id, 0, collected)

        # De-duplicate first (keeping first-seen order) and only then apply
        # the limit, so duplicates do not eat into `max_docs`.
        unique_thread_ids = list(dict.fromkeys(collected))[:max_docs]
        return self.process_threads(
            unique_thread_ids, include_images, include_comments
        )

    def get_thread_ids_by_folder_id(
        self, folder_id: str, depth: int, thread_ids: List[str]
    ) -> None:
        """Get thread ids by folder id and update in thread_ids.

        Sub-folders are traversed recursively. Folders that cannot be fetched
        are logged and skipped.

        Args:
            folder_id: ID of the folder to traverse.
            depth: Current recursion depth, used for logging only.
            thread_ids: List that discovered thread IDs are appended to.
        """
        from quip_api.quip import HTTPError, QuipError

        try:
            folder = self.quip_client.get_folder(folder_id)
        except QuipError as e:
            if e.code == 403:
                logger.warning(
                    f"depth {depth}, Skipped over restricted folder {folder_id}, {e}"
                )
            else:
                logger.warning(
                    f"depth {depth}, Skipped over folder {folder_id} "
                    f"due to unknown error {e.code}"
                )
            return
        except HTTPError as e:
            logger.warning(
                f"depth {depth}, Skipped over folder {folder_id} "
                f"due to HTTP error {e.code}"
            )
            return

        title = folder["folder"].get("title", "Folder %s" % folder_id)

        logger.info(f"depth {depth}, Processing folder {title}")
        for child in folder["children"]:
            if "folder_id" in child:
                self.get_thread_ids_by_folder_id(
                    child["folder_id"], depth + 1, thread_ids
                )
            elif "thread_id" in child:
                thread_ids.append(child["thread_id"])

    def process_threads(
        self, thread_ids: Sequence[str], include_images: bool, include_messages: bool
    ) -> List[Document]:
        """Process a list of thread into a list of documents.

        Threads for which `process_thread` returns None are left out.
        """
        docs = []
        for thread_id in thread_ids:
            doc = self.process_thread(thread_id, include_images, include_messages)
            if doc is not None:
                docs.append(doc)
        return docs

    def process_thread(
        self, thread_id: str, include_images: bool, include_messages: bool
    ) -> Optional[Document]:
        """Fetch a single thread and convert it into a `Document`.

        Args:
            thread_id: ID of the thread to fetch.
            include_images: Append text extracted from the thread's images.
            include_messages: Append the thread's messages (comments).

        Returns:
            The document, or None when the thread has no "html" entry or its
            HTML cannot be parsed.
        """
        thread = self.quip_client.get_thread(thread_id)
        thread_id = thread["thread"]["id"]
        title = thread["thread"]["title"]
        link = thread["thread"]["link"]
        update_ts = thread["thread"]["updated_usec"]
        sanitized_title = QuipLoader._sanitize_title(title)

        logger.info(
            f"processing thread {thread_id} title {sanitized_title} "
            f"link {link} update_ts {update_ts}"
        )

        if "html" not in thread:
            return None

        # Parse the document; threads with unparseable HTML are skipped.
        try:
            tree = self.quip_client.parse_document_html(thread["html"])
        except xml.etree.cElementTree.ParseError as e:
            logger.error(f"Error parsing thread {title} {thread_id}, skipping, {e}")
            return None

        metadata = {
            "title": sanitized_title,
            "update_ts": update_ts,
            "id": thread_id,
            "source": link,
        }

        # Extra text appended after the HTML: image text and/or messages.
        text = ""
        if include_images:
            text = self.process_thread_images(tree)

        if include_messages:
            # Separate the messages from the preceding content with a newline.
            text = text + "\n" + self.process_thread_messages(thread_id)

        return Document(
            page_content=thread["html"] + text,
            metadata=metadata,
        )

    def process_thread_images(self, tree: ElementTree) -> str:
        """Extract text from the blob images referenced in a parsed thread.

        Only `<img>` elements whose `src` starts with "/blob" are handled; each
        one is fetched through the Quip client and passed to pytesseract.

        Args:
            tree: Parsed HTML tree of the thread.

        Returns:
            The text of every image, each preceded by a newline.

        Raises:
            ImportError: if `Pillow` or `pytesseract` is not installed.
            OSError: re-raised after logging when an image cannot be converted.
        """
        text = ""

        try:
            from PIL import Image
            from pytesseract import pytesseract
        except ImportError as e:
            raise ImportError(
                "`Pillow or pytesseract` package not found, "
                "please run "
                "`pip install Pillow` or `pip install pytesseract`"
            ) from e

        for img in tree.iter("img"):
            src = img.get("src")
            if not src or not src.startswith("/blob"):
                continue
            # The src is split on "/" into four parts; the last two are used
            # as the thread id and blob id.
            _, _, thread_id, blob_id = src.split("/")
            blob_response = self.quip_client.get_blob(thread_id, blob_id)
            try:
                image = Image.open(BytesIO(blob_response.read()))
                text = text + "\n" + pytesseract.image_to_string(image)
            except OSError as e:
                logger.error(f"failed to convert image to text, {e}")
                raise
        return text

    def process_thread_messages(self, thread_id: str) -> str:
        """Fetch all messages of a thread and join their text with newlines.

        Messages are requested in chunks of 100 until an empty chunk is
        returned, then the collected list is reversed before joining.
        """
        max_created_usec = None
        messages = []
        while True:
            chunk = self.quip_client.get_messages(
                thread_id, max_created_usec=max_created_usec, count=100
            )
            messages.extend(chunk)
            if chunk:
                # Continue just below the timestamp of the chunk's last message.
                max_created_usec = chunk[-1]["created_usec"] - 1
            else:
                break
        messages.reverse()

        texts = [message["text"] for message in messages]

        return "\n".join(texts)

    @staticmethod
    def _sanitize_title(title: str) -> str:
        """Return a title safe for logging and metadata.

        Whitespace characters become plain spaces, characters other than word
        characters, "-", "." and space are dropped, and the result is truncated
        to `_MAXIMUM_TITLE_LENGTH` characters.
        """
        sanitized_title = re.sub(r"\s", " ", title)
        sanitized_title = re.sub(r"(?u)[^- \w.]", "", sanitized_title)
        if len(sanitized_title) > _MAXIMUM_TITLE_LENGTH:
            sanitized_title = sanitized_title[:_MAXIMUM_TITLE_LENGTH]
        return sanitized_title
|
@@ -0,0 +1,179 @@
|
||||
from typing import Dict
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from langchain.document_loaders.quip import QuipLoader
|
||||
from langchain.schema import Document
|
||||
|
||||
# Probe for the optional `quip_api` dependency: assume it is available and
# flip the flag only when the import fails.
quip_installed = True
try:
    from quip_api.quip import QuipClient  # noqa: F401
except ImportError:
    quip_installed = False
|
||||
|
||||
|
||||
@pytest.fixture
def mock_quip():  # type: ignore
    """Yield a mock that replaces `quip_api.quip.QuipClient` during a test."""
    # mock quip_client
    patcher = patch("quip_api.quip.QuipClient")
    mocked_client = patcher.start()
    try:
        yield mocked_client
    finally:
        # Always undo the patch, even if the test body raised.
        patcher.stop()
|
||||
|
||||
|
||||
@pytest.mark.requires("quip_api")
class TestQuipLoader:
    """Unit tests for `QuipLoader` running against a mocked Quip client."""

    API_URL = "https://example-api.quip.com"
    # Plain string (not a tuple) so it can be used to build document links.
    DOC_URL_PREFIX = "https://example.quip.com"
    ACCESS_TOKEN = "api_token"

    MOCK_FOLDER_IDS = ["ABC"]
    MOCK_THREAD_IDS = ["ABC", "DEF"]

    def test_quip_loader_initialization(self, mock_quip: MagicMock) -> None:
        """The loader forwards its arguments to `QuipClient`."""
        QuipLoader(self.API_URL, access_token=self.ACCESS_TOKEN, request_timeout=60)
        mock_quip.assert_called_once_with(
            access_token=self.ACCESS_TOKEN, base_url=self.API_URL, request_timeout=60
        )

    def test_quip_loader_load_date_invalid_args(self, mock_quip: MagicMock) -> None:
        """`load` without folders, threads or `include_all_folders` fails."""
        quip_loader = self._get_mock_quip_loader(mock_quip)

        with pytest.raises(
            ValueError,
            match="Must specify at least one among `folder_ids`, `thread_ids` or "
            "set `include_all`_folders as True",
        ):
            quip_loader.load()

    def test_quip_loader_load_data_by_folder_id(self, mock_quip: MagicMock) -> None:
        """Threads are discovered through an explicitly given folder."""
        mock_quip.get_folder.side_effect = [
            self._get_mock_folder(self.MOCK_FOLDER_IDS[0])
        ]
        mock_quip.get_thread.side_effect = self._get_mock_threads()

        quip_loader = self._get_mock_quip_loader(mock_quip)
        documents = quip_loader.load(folder_ids=[self.MOCK_FOLDER_IDS[0]])

        assert mock_quip.get_folder.call_count == 1
        assert mock_quip.get_thread.call_count == 2
        self._assert_loaded_documents(documents)

    def test_quip_loader_load_data_all_folder(self, mock_quip: MagicMock) -> None:
        """Threads are discovered through the authenticated user's folders."""
        mock_quip.get_authenticated_user.side_effect = [
            self._get_mock_authenticated_user()
        ]
        mock_quip.get_folder.side_effect = [
            self._get_mock_folder(self.MOCK_FOLDER_IDS[0]),
        ]
        mock_quip.get_thread.side_effect = self._get_mock_threads()

        quip_loader = self._get_mock_quip_loader(mock_quip)
        documents = quip_loader.load(include_all_folders=True)

        assert mock_quip.get_folder.call_count == 1
        assert mock_quip.get_thread.call_count == 2
        self._assert_loaded_documents(documents)

    def test_quip_loader_load_data_by_thread_id(self, mock_quip: MagicMock) -> None:
        """Threads given by ID are loaded without touching any folder."""
        mock_quip.get_thread.side_effect = self._get_mock_threads()

        quip_loader = self._get_mock_quip_loader(mock_quip)
        # Pass a copy so the class-level list can never be mutated by `load`.
        documents = quip_loader.load(thread_ids=list(self.MOCK_THREAD_IDS))

        assert mock_quip.get_folder.call_count == 0
        assert mock_quip.get_thread.call_count == 2
        self._assert_loaded_documents(documents)

    def _assert_loaded_documents(self, documents: list) -> None:
        """Check that one document per mock thread was produced, in order."""
        assert len(documents) == len(self.MOCK_THREAD_IDS)
        assert all(isinstance(doc, Document) for doc in documents)
        for document, thread_id in zip(documents, self.MOCK_THREAD_IDS):
            assert (
                document.metadata.get("source") == f"{self.DOC_URL_PREFIX}/{thread_id}"
            )

    def _get_mock_quip_loader(self, mock_quip: MagicMock) -> QuipLoader:
        """Build a loader whose client is replaced by the given mock."""
        quip_loader = QuipLoader(
            self.API_URL, access_token=self.ACCESS_TOKEN, request_timeout=60
        )
        quip_loader.quip_client = mock_quip
        return quip_loader

    def _get_mock_threads(self) -> list:
        """Return one mock thread payload per entry of `MOCK_THREAD_IDS`."""
        return [self._get_mock_thread(thread_id) for thread_id in self.MOCK_THREAD_IDS]

    def _get_mock_folder(self, folder_id: str) -> Dict:
        """Return a folder payload containing the mock threads as children."""
        return {
            "folder": {
                "title": "runbook",
                "creator_id": "testing",
                "folder_type": "shared",
                "parent_id": "ABCD",
                "inherit_mode": "inherit",
                "color": "manila",
                "id": f"{folder_id}",
                "created_usec": 1668405728528904,
                "updated_usec": 1697356632672453,
                "link": f"{self.DOC_URL_PREFIX}/YPH9OAR2Eu5",
            },
            "member_ids": [],
            "children": [{"thread_id": thread_id} for thread_id in self.MOCK_THREAD_IDS],
        }

    def _get_mock_thread(self, thread_id: str) -> Dict:
        """Return a thread payload with a small HTML body."""
        return {
            "thread": {
                "author_id": "testing",
                "thread_class": "document",
                "owning_company_id": "ABC",
                "id": f"{thread_id}",
                "created_usec": 1690873126670055,
                "updated_usec": 1690874891638991,
                "title": f"Unit Test Doc {thread_id}",
                "link": f"{self.DOC_URL_PREFIX}/{thread_id}",
                "document_id": "ABC",
                "type": "document",
                "is_template": False,
                "is_deleted": False,
            },
            "user_ids": [],
            "shared_folder_ids": ["ABC"],
            "expanded_user_ids": ["ABCDEFG"],
            "invited_user_emails": [],
            "access_levels": {"ABCD": {"access_level": "OWN"}},
            "html": "<h1 id='temp:C:ABCD'>How to write Python Test </h1>",
        }

    def _get_mock_authenticated_user(self) -> Dict:
        """Return a user payload that exposes the mock shared folders."""
        return {"shared_folder_ids": self.MOCK_FOLDER_IDS, "id": "Test"}
|
Reference in New Issue
Block a user