Add Support for Loading Documents from Huawei OBS (#8573)

Description:
This PR adds support for loading documents from Huawei OBS (Object
Storage Service) in LangChain. OBS is a cloud-based object storage
service provided by Huawei Cloud. With this enhancement, LangChain users
can load objects stored in Huawei OBS directly as LangChain `Document`
objects.

Key Changes:
- Added a new document loader module specifically for Huawei OBS
integration.
- Implemented the necessary logic to authenticate and connect to Huawei
OBS using access credentials.
- Enabled the loading of individual documents from a specified bucket
and object key in Huawei OBS.
- Provided the option to specify custom authentication information or
obtain security tokens from Huawei Cloud ECS for easy access.

How to Test:
1. Ensure the required package "esdk-obs-python" is installed.
2. Pass the endpoint, bucket name, and credentials (access key, secret
key, and optionally a security token) for Huawei OBS to the loader.
3. Load documents from Huawei OBS using the new document loader
modules.
4. Verify that documents are successfully retrieved and loaded into
LangChain for further processing.

Please review this PR and let us know if any further improvements are
needed. Your feedback is highly appreciated!

@rlancemartin, @eyurtsev

---------

Co-authored-by: Bagatur <baskaryan@gmail.com>
This commit is contained in:
mpb159753
2023-08-02 00:30:30 +08:00
committed by GitHub
parent ed9a0f8185
commit 7df2dfc4c2
5 changed files with 548 additions and 0 deletions

View File

@@ -98,6 +98,8 @@ from langchain.document_loaders.modern_treasury import ModernTreasuryLoader
from langchain.document_loaders.notebook import NotebookLoader
from langchain.document_loaders.notion import NotionDirectoryLoader
from langchain.document_loaders.notiondb import NotionDBLoader
from langchain.document_loaders.obs_directory import OBSDirectoryLoader
from langchain.document_loaders.obs_file import OBSFileLoader
from langchain.document_loaders.obsidian import ObsidianLoader
from langchain.document_loaders.odt import UnstructuredODTLoader
from langchain.document_loaders.onedrive import OneDriveLoader
@@ -251,6 +253,8 @@ __all__ = [
"NotebookLoader",
"NotionDBLoader",
"NotionDirectoryLoader",
"OBSDirectoryLoader",
"OBSFileLoader",
"ObsidianLoader",
"OneDriveFileLoader",
"OneDriveLoader",

View File

@@ -0,0 +1,82 @@
# coding:utf-8
from typing import List, Optional
from langchain.docstore.document import Document
from langchain.document_loaders.base import BaseLoader
from langchain.document_loaders.obs_file import OBSFileLoader
class OBSDirectoryLoader(BaseLoader):
    """Load the objects under a key prefix of a Huawei OBS bucket as documents."""

    def __init__(
        self,
        bucket: str,
        endpoint: str,
        config: Optional[dict] = None,
        prefix: str = "",
    ):
        """Initialize the OBSDirectoryLoader with the specified settings.

        Args:
            bucket (str): The name of the OBS bucket to be used.
            endpoint (str): The endpoint URL of your OBS bucket.
            config (dict): The parameters for connecting to OBS, provided as a dictionary. The dictionary could have the following keys:
                - "ak" (str, optional): Your OBS access key (required if `get_token_from_ecs` is False and bucket policy is not public read).
                - "sk" (str, optional): Your OBS secret key (required if `get_token_from_ecs` is False and bucket policy is not public read).
                - "token" (str, optional): Your security token (required if using temporary credentials).
                - "get_token_from_ecs" (bool, optional): Whether to retrieve the security token from ECS. Defaults to False if not provided. If set to True, `ak`, `sk`, and `token` will be ignored.
            prefix (str, optional): The key prefix passed to the object listing request; it selects which objects are loaded. Defaults to "".

        Raises:
            ValueError: If the `esdk-obs-python` package is not installed.

        Note:
            Before using this class, make sure you have registered with OBS and have the necessary credentials. The `ak`, `sk`, and `endpoint` values are mandatory unless `get_token_from_ecs` is True or the bucket policy is public read. `token` is required when using temporary credentials.

        Example:
            To create a new OBSDirectoryLoader:
            ```
            config = {
                "ak": "your-access-key",
                "sk": "your-secret-key"
            }
            directory_loader = OBSDirectoryLoader("your-bucket-name", "your-end-endpoint", config, "your-prefix")
            ```
        """  # noqa: E501
        try:
            from obs import ObsClient
        except ImportError as e:
            # Keep raising ValueError for backward compatibility, but chain the
            # original ImportError so the root cause stays visible.
            raise ValueError(
                "Could not import esdk-obs-python python package. "
                "Please install it with `pip install esdk-obs-python`."
            ) from e
        if not config:
            config = dict()
        if config.get("get_token_from_ecs"):
            # Credentials come from the ECS security provider; ak/sk/token
            # in `config` are deliberately not passed.
            self.client = ObsClient(server=endpoint, security_provider_policy="ECS")
        else:
            self.client = ObsClient(
                access_key_id=config.get("ak"),
                secret_access_key=config.get("sk"),
                security_token=config.get("token"),
                server=endpoint,
            )
        self.bucket = bucket
        self.prefix = prefix

    def load(self) -> List[Document]:
        """Load documents.

        Lists the bucket page by page (up to 1000 keys per request) and loads
        every listed object through `OBSFileLoader`, reusing this loader's
        client.

        Returns:
            The documents produced for all listed objects, in listing order.

        Raises:
            RuntimeError: If a listing request returns a status of 300 or above.
        """
        max_num = 1000
        mark = None
        docs = []
        while True:
            resp = self.client.listObjects(
                self.bucket, prefix=self.prefix, marker=mark, max_keys=max_num
            )
            if resp.status >= 300:
                # A failed listing has no usable page to read pagination state
                # from; fail loudly instead of surfacing an unrelated attribute
                # error or silently returning a partial result.
                raise RuntimeError(
                    f"Failed to list objects in OBS bucket {self.bucket!r} "
                    f"(status={resp.status}, "
                    f"code={getattr(resp, 'errorCode', None)}, "
                    f"message={getattr(resp, 'errorMessage', None)})."
                )
            for content in resp.body.contents:
                loader = OBSFileLoader(self.bucket, content.key, client=self.client)
                docs.extend(loader.load())
            if resp.body.is_truncated is True:
                # More pages remain: continue from the marker the service returned.
                mark = resp.body.next_marker
            else:
                break
        return docs

View File

@@ -0,0 +1,104 @@
# coding:utf-8
import os
import tempfile
from typing import Any, List, Optional
from langchain.docstore.document import Document
from langchain.document_loaders.base import BaseLoader
from langchain.document_loaders.unstructured import UnstructuredFileLoader
class OBSFileLoader(BaseLoader):
    """Loader for Huawei OBS file."""

    def __init__(
        self,
        bucket: str,
        key: str,
        client: Any = None,
        endpoint: str = "",
        config: Optional[dict] = None,
    ) -> None:
        """Initialize the OBSFileLoader with the specified settings.

        Args:
            bucket (str): The name of the OBS bucket to be used.
            key (str): The name of the object in the OBS bucket.
            client (ObsClient, optional): An instance of the ObsClient to connect to OBS.
            endpoint (str, optional): The endpoint URL of your OBS bucket. This parameter is mandatory if `client` is not provided.
            config (dict, optional): The parameters for connecting to OBS, provided as a dictionary. This parameter is ignored if `client` is provided. The dictionary could have the following keys:
                - "ak" (str, optional): Your OBS access key (required if `get_token_from_ecs` is False and bucket policy is not public read).
                - "sk" (str, optional): Your OBS secret key (required if `get_token_from_ecs` is False and bucket policy is not public read).
                - "token" (str, optional): Your security token (required if using temporary credentials).
                - "get_token_from_ecs" (bool, optional): Whether to retrieve the security token from ECS. Defaults to False if not provided. If set to True, `ak`, `sk`, and `token` will be ignored.

        Raises:
            ValueError: If the `esdk-obs-python` package is not installed.
            TypeError: If the provided `client` is not an instance of ObsClient.
            ValueError: If `client` is not provided, but `endpoint` is missing.

        Note:
            Before using this class, make sure you have registered with OBS and have the necessary credentials. The `ak`, `sk`, and `endpoint` values are mandatory unless `get_token_from_ecs` is True or the bucket policy is public read. `token` is required when using temporary credentials.

        Example:
            To create a new OBSFileLoader with a new client:
            ```
            config = {
                "ak": "your-access-key",
                "sk": "your-secret-key"
            }
            obs_loader = OBSFileLoader("your-bucket-name", "your-object-key", endpoint="your-endpoint-url", config=config)
            ```

            To create a new OBSFileLoader with an existing client:
            ```
            from obs import ObsClient

            # Assuming you have an existing ObsClient object 'obs_client'
            obs_loader = OBSFileLoader("your-bucket-name", "your-object-key", client=obs_client)
            ```

            To create a new OBSFileLoader without an existing client or credentials:
            ```
            obs_loader = OBSFileLoader("your-bucket-name", "your-object-key", endpoint="your-endpoint-url")
            ```
        """  # noqa: E501
        try:
            from obs import ObsClient
        except ImportError as e:
            # Keep raising ValueError for backward compatibility, but chain the
            # original ImportError so the root cause stays visible.
            raise ValueError(
                "Could not import esdk-obs-python python package. "
                "Please install it with `pip install esdk-obs-python`."
            ) from e
        if not client:
            # No client supplied: build one from `endpoint` and `config`.
            if not endpoint:
                raise ValueError("Either OBSClient or endpoint must be provided.")
            if not config:
                config = dict()
            if config.get("get_token_from_ecs"):
                # Credentials come from the ECS security provider; ak/sk/token
                # in `config` are deliberately not passed.
                client = ObsClient(server=endpoint, security_provider_policy="ECS")
            else:
                client = ObsClient(
                    access_key_id=config.get("ak"),
                    secret_access_key=config.get("sk"),
                    security_token=config.get("token"),
                    server=endpoint,
                )
        if not isinstance(client, ObsClient):
            raise TypeError("Client must be ObsClient type")
        self.client = client
        self.bucket = bucket
        self.key = key

    def load(self) -> List[Document]:
        """Load documents.

        Downloads the object into a temporary directory and parses the local
        copy with `UnstructuredFileLoader`.

        Returns:
            The documents produced by `UnstructuredFileLoader` for the
            downloaded file.
        """
        # Parsing must finish inside the `with` block: the temporary directory
        # and the downloaded file are removed when the block exits.
        with tempfile.TemporaryDirectory() as temp_dir:
            file_path = f"{temp_dir}/{self.bucket}/{self.key}"
            # Keys may contain "/" separators, so create the parent folders first.
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            # Download the file to a destination
            self.client.downloadFile(
                bucketName=self.bucket, objectKey=self.key, downloadFile=file_path
            )
            loader = UnstructuredFileLoader(file_path)
            return loader.load()