core: Move document loader interfaces to core (#17723)

This is needed to be able to move document loaders to partner packages.

---------

Co-authored-by: Eugene Yurtsev <eyurtsev@gmail.com>
This commit is contained in:
Christophe Bornet 2024-03-06 19:59:00 +01:00 committed by GitHub
parent 97de498d39
commit ea141511d8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 356 additions and 313 deletions

View File

@ -1,119 +1,6 @@
"""Abstract interface for document loader implementations.""" from langchain_core.document_loaders import BaseBlobParser, BaseLoader
from __future__ import annotations
from abc import ABC, abstractmethod __all__ = [
from typing import TYPE_CHECKING, AsyncIterator, Iterator, List, Optional "BaseBlobParser",
"BaseLoader",
from langchain_core.documents import Document ]
from langchain_core.runnables import run_in_executor
if TYPE_CHECKING:
from langchain_text_splitters import TextSplitter
from langchain_community.document_loaders.blob_loaders import Blob
class BaseLoader(ABC):
"""Interface for Document Loader.
Implementations should implement the lazy-loading method using generators
to avoid loading all Documents into memory at once.
`load` is provided just for user convenience and should not be overridden.
"""
# Sub-classes should not implement this method directly. Instead, they
# should implement the lazy load method.
def load(self) -> List[Document]:
"""Load data into Document objects."""
return list(self.lazy_load())
def load_and_split(
self, text_splitter: Optional[TextSplitter] = None
) -> List[Document]:
"""Load Documents and split into chunks. Chunks are returned as Documents.
Do not override this method. It should be considered to be deprecated!
Args:
text_splitter: TextSplitter instance to use for splitting documents.
Defaults to RecursiveCharacterTextSplitter.
Returns:
List of Documents.
"""
if text_splitter is None:
try:
from langchain_text_splitters import RecursiveCharacterTextSplitter
except ImportError as e:
raise ImportError(
"Unable to import from langchain_text_splitters. Please specify "
"text_splitter or install langchain_text_splitters with "
"`pip install -U langchain-text-splitters`."
) from e
_text_splitter: TextSplitter = RecursiveCharacterTextSplitter()
else:
_text_splitter = text_splitter
docs = self.load()
return _text_splitter.split_documents(docs)
# Attention: This method will be upgraded into an abstractmethod once it's
# implemented in all the existing subclasses.
def lazy_load(self) -> Iterator[Document]:
"""A lazy loader for Documents."""
raise NotImplementedError(
f"{self.__class__.__name__} does not implement lazy_load()"
)
async def alazy_load(self) -> AsyncIterator[Document]:
"""A lazy loader for Documents."""
iterator = await run_in_executor(None, self.lazy_load)
done = object()
while True:
doc = await run_in_executor(None, next, iterator, done) # type: ignore[call-arg, arg-type]
if doc is done:
break
yield doc # type: ignore[misc]
class BaseBlobParser(ABC):
"""Abstract interface for blob parsers.
A blob parser provides a way to parse raw data stored in a blob into one
or more documents.
The parser can be composed with blob loaders, making it easy to reuse
a parser independent of how the blob was originally loaded.
"""
@abstractmethod
def lazy_parse(self, blob: Blob) -> Iterator[Document]:
"""Lazy parsing interface.
Subclasses are required to implement this method.
Args:
blob: Blob instance
Returns:
Generator of documents
"""
def parse(self, blob: Blob) -> List[Document]:
"""Eagerly parse the blob into a document or documents.
This is a convenience method for interactive development environment.
Production applications should favor the lazy_parse method instead.
Subclasses should generally not over-ride this parse method.
Args:
blob: Blob instance
Returns:
List of documents
"""
return list(self.lazy_parse(blob))

View File

@ -1,195 +1,7 @@
"""Schema for Blobs and Blob Loaders. from langchain_core.document_loaders.blob_loaders import Blob, BlobLoader, PathLike
The goal is to facilitate decoupling of content loading from content parsing code. __all__ = [
"Blob",
In addition, content loading code should provide a lazy loading interface by default. "BlobLoader",
""" "PathLike",
from __future__ import annotations ]
import contextlib
import mimetypes
from abc import ABC, abstractmethod
from io import BufferedReader, BytesIO
from pathlib import PurePath
from typing import Any, Dict, Generator, Iterable, Mapping, Optional, Union, cast
from langchain_core.pydantic_v1 import BaseModel, Field, root_validator
PathLike = Union[str, PurePath]
class Blob(BaseModel):
"""Blob represents raw data by either reference or value.
Provides an interface to materialize the blob in different representations, and
help to decouple the development of data loaders from the downstream parsing of
the raw data.
Inspired by: https://developer.mozilla.org/en-US/docs/Web/API/Blob
"""
data: Union[bytes, str, None]
"""Raw data associated with the blob."""
mimetype: Optional[str] = None
"""MimeType not to be confused with a file extension."""
encoding: str = "utf-8"
"""Encoding to use if decoding the bytes into a string.
Use utf-8 as default encoding, if decoding to string.
"""
path: Optional[PathLike] = None
"""Location where the original content was found."""
metadata: Dict[str, Any] = Field(default_factory=dict)
"""Metadata about the blob (e.g., source)"""
class Config:
arbitrary_types_allowed = True
frozen = True
@property
def source(self) -> Optional[str]:
"""The source location of the blob as string if known otherwise none.
If a path is associated with the blob, it will default to the path location.
Unless explicitly set via a metadata field called "source", in which
case that value will be used instead.
"""
if self.metadata and "source" in self.metadata:
return cast(Optional[str], self.metadata["source"])
return str(self.path) if self.path else None
@root_validator(pre=True)
def check_blob_is_valid(cls, values: Mapping[str, Any]) -> Mapping[str, Any]:
"""Verify that either data or path is provided."""
if "data" not in values and "path" not in values:
raise ValueError("Either data or path must be provided")
return values
def as_string(self) -> str:
"""Read data as a string."""
if self.data is None and self.path:
with open(str(self.path), "r", encoding=self.encoding) as f:
return f.read()
elif isinstance(self.data, bytes):
return self.data.decode(self.encoding)
elif isinstance(self.data, str):
return self.data
else:
raise ValueError(f"Unable to get string for blob {self}")
def as_bytes(self) -> bytes:
"""Read data as bytes."""
if isinstance(self.data, bytes):
return self.data
elif isinstance(self.data, str):
return self.data.encode(self.encoding)
elif self.data is None and self.path:
with open(str(self.path), "rb") as f:
return f.read()
else:
raise ValueError(f"Unable to get bytes for blob {self}")
@contextlib.contextmanager
def as_bytes_io(self) -> Generator[Union[BytesIO, BufferedReader], None, None]:
"""Read data as a byte stream."""
if isinstance(self.data, bytes):
yield BytesIO(self.data)
elif self.data is None and self.path:
with open(str(self.path), "rb") as f:
yield f
else:
raise NotImplementedError(f"Unable to convert blob {self}")
@classmethod
def from_path(
cls,
path: PathLike,
*,
encoding: str = "utf-8",
mime_type: Optional[str] = None,
guess_type: bool = True,
metadata: Optional[dict] = None,
) -> Blob:
"""Load the blob from a path like object.
Args:
path: path like object to file to be read
encoding: Encoding to use if decoding the bytes into a string
mime_type: if provided, will be set as the mime-type of the data
guess_type: If True, the mimetype will be guessed from the file extension,
if a mime-type was not provided
metadata: Metadata to associate with the blob
Returns:
Blob instance
"""
if mime_type is None and guess_type:
_mimetype = mimetypes.guess_type(path)[0] if guess_type else None
else:
_mimetype = mime_type
# We do not load the data immediately, instead we treat the blob as a
# reference to the underlying data.
return cls(
data=None,
mimetype=_mimetype,
encoding=encoding,
path=path,
metadata=metadata if metadata is not None else {},
)
@classmethod
def from_data(
cls,
data: Union[str, bytes],
*,
encoding: str = "utf-8",
mime_type: Optional[str] = None,
path: Optional[str] = None,
metadata: Optional[dict] = None,
) -> Blob:
"""Initialize the blob from in-memory data.
Args:
data: the in-memory data associated with the blob
encoding: Encoding to use if decoding the bytes into a string
mime_type: if provided, will be set as the mime-type of the data
path: if provided, will be set as the source from which the data came
metadata: Metadata to associate with the blob
Returns:
Blob instance
"""
return cls(
data=data,
mimetype=mime_type,
encoding=encoding,
path=path,
metadata=metadata if metadata is not None else {},
)
def __repr__(self) -> str:
"""Define the blob representation."""
str_repr = f"Blob {id(self)}"
if self.source:
str_repr += f" {self.source}"
return str_repr
class BlobLoader(ABC):
"""Abstract interface for blob loaders implementation.
Implementer should be able to load raw content from a storage system according
to some criteria and return the raw content lazily as a stream of blobs.
"""
@abstractmethod
def yield_blobs(
self,
) -> Iterable[Blob]:
"""A lazy loader for raw data represented by LangChain's Blob object.
Returns:
A generator over blobs
"""

View File

@ -0,0 +1,10 @@
from langchain_core.document_loaders.base import BaseBlobParser, BaseLoader
from langchain_core.document_loaders.blob_loaders import Blob, BlobLoader, PathLike
__all__ = [
"BaseBlobParser",
"BaseLoader",
"Blob",
"BlobLoader",
"PathLike",
]

View File

@ -0,0 +1,119 @@
"""Abstract interface for document loader implementations."""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, AsyncIterator, Iterator, List, Optional
from langchain_core.documents import Document
from langchain_core.runnables import run_in_executor
if TYPE_CHECKING:
from langchain_text_splitters import TextSplitter
from langchain_core.document_loaders.blob_loaders import Blob
class BaseLoader(ABC):
"""Interface for Document Loader.
Implementations should implement the lazy-loading method using generators
to avoid loading all Documents into memory at once.
`load` is provided just for user convenience and should not be overridden.
"""
# Sub-classes should not implement this method directly. Instead, they
# should implement the lazy load method.
def load(self) -> List[Document]:
"""Load data into Document objects."""
return list(self.lazy_load())
def load_and_split(
self, text_splitter: Optional[TextSplitter] = None
) -> List[Document]:
"""Load Documents and split into chunks. Chunks are returned as Documents.
Do not override this method. It should be considered to be deprecated!
Args:
text_splitter: TextSplitter instance to use for splitting documents.
Defaults to RecursiveCharacterTextSplitter.
Returns:
List of Documents.
"""
if text_splitter is None:
try:
from langchain_text_splitters import RecursiveCharacterTextSplitter
except ImportError as e:
raise ImportError(
"Unable to import from langchain_text_splitters. Please specify "
"text_splitter or install langchain_text_splitters with "
"`pip install -U langchain-text-splitters`."
) from e
_text_splitter: TextSplitter = RecursiveCharacterTextSplitter()
else:
_text_splitter = text_splitter
docs = self.load()
return _text_splitter.split_documents(docs)
# Attention: This method will be upgraded into an abstractmethod once it's
# implemented in all the existing subclasses.
def lazy_load(self) -> Iterator[Document]:
"""A lazy loader for Documents."""
raise NotImplementedError(
f"{self.__class__.__name__} does not implement lazy_load()"
)
async def alazy_load(self) -> AsyncIterator[Document]:
"""A lazy loader for Documents."""
iterator = await run_in_executor(None, self.lazy_load)
done = object()
while True:
doc = await run_in_executor(None, next, iterator, done) # type: ignore[call-arg, arg-type]
if doc is done:
break
yield doc # type: ignore[misc]
class BaseBlobParser(ABC):
"""Abstract interface for blob parsers.
A blob parser provides a way to parse raw data stored in a blob into one
or more documents.
The parser can be composed with blob loaders, making it easy to reuse
a parser independent of how the blob was originally loaded.
"""
@abstractmethod
def lazy_parse(self, blob: Blob) -> Iterator[Document]:
"""Lazy parsing interface.
Subclasses are required to implement this method.
Args:
blob: Blob instance
Returns:
Generator of documents
"""
def parse(self, blob: Blob) -> List[Document]:
"""Eagerly parse the blob into a document or documents.
This is a convenience method for interactive development environment.
Production applications should favor the lazy_parse method instead.
Subclasses should generally not over-ride this parse method.
Args:
blob: Blob instance
Returns:
List of documents
"""
return list(self.lazy_parse(blob))

View File

@ -0,0 +1,195 @@
"""Schema for Blobs and Blob Loaders.
The goal is to facilitate decoupling of content loading from content parsing code.
In addition, content loading code should provide a lazy loading interface by default.
"""
from __future__ import annotations
import contextlib
import mimetypes
from abc import ABC, abstractmethod
from io import BufferedReader, BytesIO
from pathlib import PurePath
from typing import Any, Dict, Generator, Iterable, Mapping, Optional, Union, cast
from langchain_core.pydantic_v1 import BaseModel, Field, root_validator
PathLike = Union[str, PurePath]
class Blob(BaseModel):
"""Blob represents raw data by either reference or value.
Provides an interface to materialize the blob in different representations, and
help to decouple the development of data loaders from the downstream parsing of
the raw data.
Inspired by: https://developer.mozilla.org/en-US/docs/Web/API/Blob
"""
data: Union[bytes, str, None]
"""Raw data associated with the blob."""
mimetype: Optional[str] = None
"""MimeType not to be confused with a file extension."""
encoding: str = "utf-8"
"""Encoding to use if decoding the bytes into a string.
Use utf-8 as default encoding, if decoding to string.
"""
path: Optional[PathLike] = None
"""Location where the original content was found."""
metadata: Dict[str, Any] = Field(default_factory=dict)
"""Metadata about the blob (e.g., source)"""
class Config:
arbitrary_types_allowed = True
frozen = True
@property
def source(self) -> Optional[str]:
"""The source location of the blob as string if known otherwise none.
If a path is associated with the blob, it will default to the path location.
Unless explicitly set via a metadata field called "source", in which
case that value will be used instead.
"""
if self.metadata and "source" in self.metadata:
return cast(Optional[str], self.metadata["source"])
return str(self.path) if self.path else None
@root_validator(pre=True)
def check_blob_is_valid(cls, values: Mapping[str, Any]) -> Mapping[str, Any]:
"""Verify that either data or path is provided."""
if "data" not in values and "path" not in values:
raise ValueError("Either data or path must be provided")
return values
def as_string(self) -> str:
"""Read data as a string."""
if self.data is None and self.path:
with open(str(self.path), "r", encoding=self.encoding) as f:
return f.read()
elif isinstance(self.data, bytes):
return self.data.decode(self.encoding)
elif isinstance(self.data, str):
return self.data
else:
raise ValueError(f"Unable to get string for blob {self}")
def as_bytes(self) -> bytes:
"""Read data as bytes."""
if isinstance(self.data, bytes):
return self.data
elif isinstance(self.data, str):
return self.data.encode(self.encoding)
elif self.data is None and self.path:
with open(str(self.path), "rb") as f:
return f.read()
else:
raise ValueError(f"Unable to get bytes for blob {self}")
@contextlib.contextmanager
def as_bytes_io(self) -> Generator[Union[BytesIO, BufferedReader], None, None]:
"""Read data as a byte stream."""
if isinstance(self.data, bytes):
yield BytesIO(self.data)
elif self.data is None and self.path:
with open(str(self.path), "rb") as f:
yield f
else:
raise NotImplementedError(f"Unable to convert blob {self}")
@classmethod
def from_path(
cls,
path: PathLike,
*,
encoding: str = "utf-8",
mime_type: Optional[str] = None,
guess_type: bool = True,
metadata: Optional[dict] = None,
) -> Blob:
"""Load the blob from a path like object.
Args:
path: path like object to file to be read
encoding: Encoding to use if decoding the bytes into a string
mime_type: if provided, will be set as the mime-type of the data
guess_type: If True, the mimetype will be guessed from the file extension,
if a mime-type was not provided
metadata: Metadata to associate with the blob
Returns:
Blob instance
"""
if mime_type is None and guess_type:
_mimetype = mimetypes.guess_type(path)[0] if guess_type else None
else:
_mimetype = mime_type
# We do not load the data immediately, instead we treat the blob as a
# reference to the underlying data.
return cls(
data=None,
mimetype=_mimetype,
encoding=encoding,
path=path,
metadata=metadata if metadata is not None else {},
)
@classmethod
def from_data(
cls,
data: Union[str, bytes],
*,
encoding: str = "utf-8",
mime_type: Optional[str] = None,
path: Optional[str] = None,
metadata: Optional[dict] = None,
) -> Blob:
"""Initialize the blob from in-memory data.
Args:
data: the in-memory data associated with the blob
encoding: Encoding to use if decoding the bytes into a string
mime_type: if provided, will be set as the mime-type of the data
path: if provided, will be set as the source from which the data came
metadata: Metadata to associate with the blob
Returns:
Blob instance
"""
return cls(
data=data,
mimetype=mime_type,
encoding=encoding,
path=path,
metadata=metadata if metadata is not None else {},
)
def __repr__(self) -> str:
"""Define the blob representation."""
str_repr = f"Blob {id(self)}"
if self.source:
str_repr += f" {self.source}"
return str_repr
class BlobLoader(ABC):
"""Abstract interface for blob loaders implementation.
Implementer should be able to load raw content from a storage system according
to some criteria and return the raw content lazily as a stream of blobs.
"""
@abstractmethod
def yield_blobs(
self,
) -> Iterable[Blob]:
"""A lazy loader for raw data represented by LangChain's Blob object.
Returns:
A generator over blobs
"""

21
libs/core/poetry.lock generated
View File

@ -1133,6 +1133,25 @@ files = [
{file = "jupyterlab_widgets-3.0.9.tar.gz", hash = "sha256:6005a4e974c7beee84060fdfba341a3218495046de8ae3ec64888e5fe19fdb4c"}, {file = "jupyterlab_widgets-3.0.9.tar.gz", hash = "sha256:6005a4e974c7beee84060fdfba341a3218495046de8ae3ec64888e5fe19fdb4c"},
] ]
[[package]]
name = "langchain-text-splitters"
version = "0.0.1"
description = "LangChain text splitting utilities"
optional = false
python-versions = ">=3.8.1,<4.0"
files = []
develop = true
[package.dependencies]
langchain-core = "^0.1.28"
[package.extras]
extended-testing = ["lxml (>=5.1.0,<6.0.0)"]
[package.source]
type = "directory"
url = "../text-splitters"
[[package]] [[package]]
name = "langsmith" name = "langsmith"
version = "0.1.1" version = "0.1.1"
@ -2815,4 +2834,4 @@ extended-testing = ["jinja2"]
[metadata] [metadata]
lock-version = "2.0" lock-version = "2.0"
python-versions = ">=3.8.1,<4.0" python-versions = ">=3.8.1,<4.0"
content-hash = "de97591989f083b89c7a7bc6dabba87e29e13fddc812450d5196d564b2c02ce1" content-hash = "092a56ee5733650e75cdacb0480d6a7fea1ff40a4a7f33500f77990a6e590ea4"

View File

@ -34,6 +34,7 @@ mypy = "^0.991"
types-pyyaml = "^6.0.12.2" types-pyyaml = "^6.0.12.2"
types-requests = "^2.28.11.5" types-requests = "^2.28.11.5"
types-jinja2 = "^2.11.9" types-jinja2 = "^2.11.9"
langchain-text-splitters = {path = "../text-splitters", develop = true}
[tool.poetry.group.dev] [tool.poetry.group.dev]
optional = true optional = true