Enhance metadata of SharePointLoader. (#22248)

Description: 2 feature flags added to SharePointLoader in this PR:

1. load_auth: if set to True, adds authorized identities to metadata
2. load_extended_metadata: if set to True, adds size, owner and full_path to metadata

Unit tests: N/A
Documentation: To be done.

---------

Signed-off-by: Rahul Tripathi <rauhl.psit.ec@gmail.com>
Co-authored-by: Rahul Tripathi <rauhl.psit.ec@gmail.com>
This commit is contained in:
Rahul Triptahi 2024-06-22 05:33:38 +05:30 committed by GitHub
parent 5d4133d82f
commit 0cd3f93361
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 134 additions and 33 deletions

View File

@ -124,6 +124,7 @@ class O365BaseLoader(BaseLoader, BaseModel):
"created_by": str(file.created_by), "created_by": str(file.created_by),
"modified_by": str(file.modified_by), "modified_by": str(file.modified_by),
"description": file.description, "description": file.description,
"id": str(file.object_id),
} }
loader = FileSystemBlobLoader(path=temp_dir) loader = FileSystemBlobLoader(path=temp_dir)
@ -157,6 +158,7 @@ class O365BaseLoader(BaseLoader, BaseModel):
the files loaded from the drive using the specified object_ids. the files loaded from the drive using the specified object_ids.
""" """
file_mime_types = self._fetch_mime_types file_mime_types = self._fetch_mime_types
metadata_dict: Dict[str, Dict[str, Any]] = {}
with tempfile.TemporaryDirectory() as temp_dir: with tempfile.TemporaryDirectory() as temp_dir:
for object_id in object_ids: for object_id in object_ids:
file = drive.get_item(object_id) file = drive.get_item(object_id)
@ -169,8 +171,25 @@ class O365BaseLoader(BaseLoader, BaseModel):
if file.is_file: if file.is_file:
if file.mime_type in list(file_mime_types.values()): if file.mime_type in list(file_mime_types.values()):
file.download(to_path=temp_dir, chunk_size=self.chunk_size) file.download(to_path=temp_dir, chunk_size=self.chunk_size)
metadata_dict[file.name] = {
"source": file.web_url,
"mime_type": file.mime_type,
"created": file.created,
"modified": file.modified,
"created_by": str(file.created_by),
"modified_by": str(file.modified_by),
"description": file.description,
"id": str(file.object_id),
}
loader = FileSystemBlobLoader(path=temp_dir) loader = FileSystemBlobLoader(path=temp_dir)
yield from loader.yield_blobs() for blob in loader.yield_blobs():
if not isinstance(blob.path, PurePath):
raise NotImplementedError("Expected blob path to be a PurePath")
if blob.path:
file_metadata_ = metadata_dict.get(str(blob.path.name), {})
blob.metadata.update(file_metadata_)
yield blob
def _auth(self) -> Account: def _auth(self) -> Account:
"""Authenticates the OneDrive API client """Authenticates the OneDrive API client

View File

@ -6,7 +6,7 @@ import json
from pathlib import Path from pathlib import Path
from typing import Any, Iterator, List, Optional, Sequence from typing import Any, Iterator, List, Optional, Sequence
import requests import requests # type: ignore
from langchain_core.document_loaders import BaseLoader from langchain_core.document_loaders import BaseLoader
from langchain_core.documents import Document from langchain_core.documents import Document
from langchain_core.pydantic_v1 import Field from langchain_core.pydantic_v1 import Field
@ -37,19 +37,31 @@ class SharePointLoader(O365BaseLoader, BaseLoader):
""" The ID of the file for which we need auth identities""" """ The ID of the file for which we need auth identities"""
site_id: Optional[str] = None site_id: Optional[str] = None
""" The ID of the Sharepoint site of the user where the file is present """ """ The ID of the Sharepoint site of the user where the file is present """
load_extended_metadata: Optional[bool] = False
""" Whether to load extended metadata. Size, Owner and full_path."""
@property @property
def _file_types(self) -> Sequence[_FileType]: def _file_types(self) -> Sequence[_FileType]:
"""Return supported file types.""" """Return supported file types.
Returns:
A sequence of supported file types.
"""
return _FileType.DOC, _FileType.DOCX, _FileType.PDF return _FileType.DOC, _FileType.DOCX, _FileType.PDF
@property @property
def _scopes(self) -> List[str]: def _scopes(self) -> List[str]:
"""Return required scopes.""" """Return required scopes.
Returns:
List[str]: A list of required scopes.
"""
return ["sharepoint", "basic"] return ["sharepoint", "basic"]
def lazy_load(self) -> Iterator[Document]: def lazy_load(self) -> Iterator[Document]:
"""Load documents lazily. Use this when working at a large scale.""" """
Load documents lazily. Use this when working at a large scale.
Yields:
Document: A document object representing the parsed blob.
"""
try: try:
from O365.drive import Drive, Folder from O365.drive import Drive, Folder
except ImportError: except ImportError:
@ -65,22 +77,47 @@ class SharePointLoader(O365BaseLoader, BaseLoader):
if not isinstance(target_folder, Folder): if not isinstance(target_folder, Folder):
raise ValueError(f"There isn't a folder with path {self.folder_path}.") raise ValueError(f"There isn't a folder with path {self.folder_path}.")
for blob in self._load_from_folder(target_folder): for blob in self._load_from_folder(target_folder):
file_id = str(blob.metadata.get("id"))
if self.load_auth is True: if self.load_auth is True:
for parsed_blob in blob_parser.lazy_parse(blob): auth_identities = self.authorized_identities(file_id)
auth_identities = self.authorized_identities() if self.load_extended_metadata is True:
extended_metadata = self.get_extended_metadata(file_id)
for parsed_blob in blob_parser.lazy_parse(blob):
if self.load_auth is True:
parsed_blob.metadata["authorized_identities"] = auth_identities parsed_blob.metadata["authorized_identities"] = auth_identities
yield parsed_blob if self.load_extended_metadata is True:
else: parsed_blob.metadata.update(extended_metadata)
yield from blob_parser.lazy_parse(blob) yield parsed_blob
if self.folder_id: if self.folder_id:
target_folder = drive.get_item(self.folder_id) target_folder = drive.get_item(self.folder_id)
if not isinstance(target_folder, Folder): if not isinstance(target_folder, Folder):
raise ValueError(f"There isn't a folder with path {self.folder_path}.") raise ValueError(f"There isn't a folder with path {self.folder_path}.")
for blob in self._load_from_folder(target_folder): for blob in self._load_from_folder(target_folder):
yield from blob_parser.lazy_parse(blob) file_id = str(blob.metadata.get("id"))
if self.load_auth is True:
auth_identities = self.authorized_identities(file_id)
if self.load_extended_metadata is True:
extended_metadata = self.get_extended_metadata(file_id)
for parsed_blob in blob_parser.lazy_parse(blob):
if self.load_auth is True:
parsed_blob.metadata["authorized_identities"] = auth_identities
if self.load_extended_metadata is True:
parsed_blob.metadata.update(extended_metadata)
yield parsed_blob
if self.object_ids: if self.object_ids:
for blob in self._load_from_object_ids(drive, self.object_ids): for blob in self._load_from_object_ids(drive, self.object_ids):
yield from blob_parser.lazy_parse(blob) file_id = str(blob.metadata.get("id"))
if self.load_auth is True:
auth_identities = self.authorized_identities(file_id)
if self.load_extended_metadata is True:
extended_metadata = self.get_extended_metadata(file_id)
for parsed_blob in blob_parser.lazy_parse(blob):
if self.load_auth is True:
parsed_blob.metadata["authorized_identities"] = auth_identities
if self.load_extended_metadata is True:
parsed_blob.metadata.update(extended_metadata)
yield parsed_blob
if not (self.folder_path or self.folder_id or self.object_ids): if not (self.folder_path or self.folder_id or self.object_ids):
target_folder = drive.get_root_folder() target_folder = drive.get_root_folder()
if not isinstance(target_folder, Folder): if not isinstance(target_folder, Folder):
@ -90,38 +127,83 @@ class SharePointLoader(O365BaseLoader, BaseLoader):
blob_part.metadata.update(blob.metadata) blob_part.metadata.update(blob.metadata)
yield blob_part yield blob_part
def authorized_identities(self) -> List: def authorized_identities(self, file_id: str) -> List:
"""
Retrieve the access identities (user/group emails) for a given file.
Args:
file_id (str): The ID of the file.
Returns:
List: A list of group names (email addresses) that have
access to the file.
"""
data = self._fetch_access_token() data = self._fetch_access_token()
access_token = data.get("access_token") access_token = data.get("access_token")
url = ( url = (
f"https://graph.microsoft.com/v1.0/sites/{self.site_id}/" "https://graph.microsoft.com/v1.0/drives"
f"drives/{self.document_library_id}/items/{self.file_id}/permissions" f"/{self.document_library_id}/items/{file_id}/permissions"
) )
headers = {"Authorization": f"Bearer {access_token}"} headers = {"Authorization": f"Bearer {access_token}"}
response = requests.request("GET", url, headers=headers, data={}) response = requests.request("GET", url, headers=headers)
groups_list = response.json() access_list = response.json()
group_names = [] group_names = []
for group_data in groups_list.get("value"): for access_data in access_list.get("value"):
if group_data.get("grantedToV2"): if access_data.get("grantedToV2"):
if group_data.get("grantedToV2").get("siteGroup"): site_data = (
site_data = group_data.get("grantedToV2").get("siteGroup") (access_data.get("grantedToV2").get("siteUser"))
# print(group_data) or (access_data.get("grantedToV2").get("user"))
group_names.append(site_data.get("displayName")) or (access_data.get("grantedToV2").get("group"))
elif group_data.get("grantedToV2").get("group") or ( )
group_data.get("grantedToV2").get("user") if site_data:
): email = site_data.get("email")
site_data = group_data.get("grantedToV2").get("group") or ( if email:
group_data.get("grantedToV2").get("user") group_names.append(email)
)
# print(group_data)
group_names.append(site_data.get("displayName"))
return group_names return group_names
def _fetch_access_token(self) -> Any: def _fetch_access_token(self) -> Any:
with open(self.token_path) as f: """
Fetch the access token from the token file.
Returns:
The access token as a dictionary.
"""
with open(self.token_path, encoding="utf-8") as f:
s = f.read() s = f.read()
data = json.loads(s) data = json.loads(s)
return data return data
def get_extended_metadata(self, file_id: str) -> dict:
"""
Retrieve extended metadata for a file in SharePoint.
As of today, following fields are supported in the extended metadata:
- size: size of the source file.
- owner: display name of the owner of the source file.
- full_path: pretty human readable path of the source file.
Args:
file_id (str): The ID of the file.
Returns:
dict: A dictionary containing the extended metadata of the file,
including size, owner, and full path.
"""
data = self._fetch_access_token()
access_token = data.get("access_token")
url = (
"https://graph.microsoft.com/v1.0/drives/"
f"{self.document_library_id}/items/{file_id}"
"?$select=size,createdBy,parentReference,name"
)
headers = {"Authorization": f"Bearer {access_token}"}
response = requests.request("GET", url, headers=headers)
metadata = response.json()
staged_metadata = {
"size": metadata.get("size", 0),
"owner": metadata.get("createdBy", {})
.get("user", {})
.get("displayName", ""),
"full_path": metadata.get("parentReference", {})
.get("path", "")
.split(":")[-1]
+ "/"
+ metadata.get("name", ""),
}
return staged_metadata