Compare commits


3 Commits

Author           SHA1        Message   Date
Eugene Yurtsev   1ef42e8e8a  x         2023-08-22 14:48:31 -04:00
Eugene Yurtsev   a2ff5f2fd7  x         2023-08-22 14:44:18 -04:00
Eugene Yurtsev   ff0862e3b1  x         2023-08-22 14:39:43 -04:00
3 changed files with 157 additions and 1 deletion

langchain/document_loaders/__init__.py

@@ -132,6 +132,7 @@ from langchain.document_loaders.pdf import (
     PyPDFLoader,
     UnstructuredPDFLoader,
 )
+from langchain.document_loaders.pipeline import DocumentPipeline
 from langchain.document_loaders.polars_dataframe import PolarsDataFrameLoader
 from langchain.document_loaders.powerpoint import UnstructuredPowerPointLoader
 from langchain.document_loaders.psychic import PsychicLoader
@@ -239,12 +240,13 @@ __all__ = [
     "ConcurrentLoader",
     "ConfluenceLoader",
     "CubeSemanticLoader",
-    "DataFrameLoader",
     "DatadogLogsLoader",
+    "DataFrameLoader",
     "DiffbotLoader",
     "DirectoryLoader",
     "DiscordChatLoader",
     "DocugamiLoader",
+    "DocumentPipeline",
     "Docx2txtLoader",
     "DropboxLoader",
     "DuckDBLoader",

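With the second hunk applied, "DataFrameLoader" moves one slot down so __all__ stays sorted case-insensitively, and the new class becomes importable from the package root. A one-line sanity check, assuming langchain is installed at this revision:

from langchain.document_loaders import DocumentPipeline
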
langchain/document_loaders/pipeline.py

@@ -0,0 +1,53 @@
from __future__ import annotations

from typing import Iterator, List, Optional, Sequence

from langchain.document_loaders.base import BaseLoader
from langchain.schema import BaseDocumentTransformer, Document
from langchain.text_splitter import TextSplitter


class DocumentPipeline(BaseLoader):
    """A document pipeline for loading and transforming documents.

    Composes a loader with a sequence of document transformers: documents
    are pulled from the loader and run through each transformer in order.
    """

    def __init__(
        self,
        loader: BaseLoader,
        *,
        transformers: Sequence[BaseDocumentTransformer] = (),
    ) -> None:
        """Initialize the document pipeline.

        Args:
            loader: The loader to use for loading the documents.
            transformers: The transformers to use for transforming the documents.
        """
        self.loader = loader
        self.transformers = transformers

    def lazy_load(self) -> Iterator[Document]:
        """Lazily load documents and run them through the transformers."""
        try:
            documents = self.loader.lazy_load()
        except NotImplementedError:
            # Fall back to eager loading for loaders that only implement load().
            documents = iter(self.loader.load())
        for document in documents:
            _docs = [document]
            for transformer in self.transformers:
                # list() is needed because transform_documents expects a
                # Sequence, not an Iterator (typing issue in langchain).
                _docs = list(transformer.transform_documents(_docs))
            yield from _docs

    def load(self) -> List[Document]:
        """Not implemented; use lazy_load instead."""
        raise NotImplementedError("Use lazy_load instead")

    def load_and_split(
        self, text_splitter: Optional[TextSplitter] = None
    ) -> List[Document]:
        """Not implemented; use lazy_load instead."""
        raise NotImplementedError("Use lazy_load instead")
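
A minimal usage sketch of DocumentPipeline, assuming langchain at this revision; TextLoader and CharacterTextSplitter are existing langchain classes, and the file path is a placeholder:

from langchain.document_loaders import TextLoader
from langchain.document_loaders.pipeline import DocumentPipeline
from langchain.text_splitter import CharacterTextSplitter

# If the loader does not implement lazy_load(), the pipeline falls back to
# iter(loader.load()); each document is then split as it is yielded.
pipeline = DocumentPipeline(
    TextLoader("notes.txt"),  # placeholder path
    transformers=[CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)],
)
for doc in pipeline.lazy_load():
    print(doc.page_content[:80])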

Unit tests for DocumentPipeline

@@ -0,0 +1,101 @@
"""Test simple document pipeline."""
from typing import Any, Iterator, List, Sequence

import pytest

from langchain.document_loaders.base import BaseLoader
from langchain.document_loaders.pipeline import DocumentPipeline
from langchain.schema import BaseDocumentTransformer, Document


class ToyLoader(BaseLoader):
    """Toy loader that always returns the same documents."""

    def __init__(self, documents: Sequence[Document]) -> None:
        """Initialize with the documents to return."""
        self.documents = documents

    def lazy_load(
        self,
    ) -> Iterator[Document]:
        """Lazily yield the stored documents."""
        yield from self.documents

    def load(self) -> List[Document]:
        """Load the documents from the source."""
        return list(self.lazy_load())


class SimpleSplitter(BaseDocumentTransformer):
    """Transformer that splits each document into two sentinel-tagged copies."""

    def __init__(self, sentinel: int) -> None:
        """Initialize with the sentinel value."""
        self.sentinel = sentinel

    def transform_documents(
        self, documents: Sequence[Document], **kwargs: Any
    ) -> Sequence[Document]:
        """Split each document into two documents."""
        docs = []
        for document in documents:
            doc1 = document.copy()
            doc1.page_content = doc1.page_content + f"({self.sentinel}|1)"
            docs.append(doc1)
            doc2 = document.copy()
            doc2.page_content = doc2.page_content + f"({self.sentinel}|2)"
            docs.append(doc2)
        return docs

    async def atransform_documents(
        self, documents: Sequence[Document], **kwargs: Any
    ) -> Sequence[Document]:
        raise NotImplementedError()


@pytest.fixture
def loader() -> ToyLoader:
    """Get a toy loader."""
    return ToyLoader(
        documents=[
            Document(
                page_content="A",
            ),
            Document(
                page_content="B",
            ),
        ]
    )


def test_methods_should_remain_unimplemented(loader: ToyLoader) -> None:
    """load() and load_and_split() should raise until they are implemented."""
    pipeline = DocumentPipeline(loader)

    with pytest.raises(NotImplementedError):
        pipeline.load()

    with pytest.raises(NotImplementedError):
        pipeline.load_and_split()


def test_simple_pipeline(loader: ToyLoader) -> None:
    """A pipeline with no transformers yields the loader's documents unchanged."""
    pipeline = DocumentPipeline(loader)
    assert list(pipeline.lazy_load()) == loader.documents


def test_pipeline_with_transformations(loader: ToyLoader) -> None:
    """Test pipeline with transformations."""
    pipeline = DocumentPipeline(
        loader, transformers=[SimpleSplitter(1), SimpleSplitter(2)]
    )
    docs = list(pipeline.lazy_load())
    assert sorted(doc.page_content for doc in docs) == [
        "A(1|1)(2|1)",
        "A(1|1)(2|2)",
        "A(1|2)(2|1)",
        "A(1|2)(2|2)",
        "B(1|1)(2|1)",
        "B(1|1)(2|2)",
        "B(1|2)(2|1)",
        "B(1|2)(2|2)",
    ]
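
Each SimpleSplitter turns every document into two tagged copies, so the two chained splitters produce 2 × 2 = 4 variants per input document: 8 outputs in total for the 2 input documents, one per combination of (sentinel|branch) suffixes, matching the sorted list asserted above.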