mirror of
https://github.com/hwchase17/langchain.git
synced 2025-09-28 15:00:23 +00:00
airbyte[patch]: init pkg (#18236)
This commit is contained in:
121
libs/partners/airbyte/langchain_airbyte/document_loaders.py
Normal file
121
libs/partners/airbyte/langchain_airbyte/document_loaders.py
Normal file
@@ -0,0 +1,121 @@
|
||||
"""Airbyte vector stores."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
AsyncIterator,
|
||||
Dict,
|
||||
Iterator,
|
||||
List,
|
||||
Mapping,
|
||||
Optional,
|
||||
TypeVar,
|
||||
)
|
||||
|
||||
import airbyte as ab
|
||||
from langchain_core.documents import Document
|
||||
from langchain_core.prompts import PromptTemplate
|
||||
from langchain_core.runnables import run_in_executor
|
||||
from langchain_core.vectorstores import VectorStore
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from langchain.text_splitter import TextSplitter
|
||||
from langchain_core.documents import Document
|
||||
|
||||
VST = TypeVar("VST", bound=VectorStore)
|
||||
|
||||
|
||||
class AirbyteLoader:
|
||||
"""Airbyte Document Loader.
|
||||
|
||||
Example:
|
||||
.. code-block:: python
|
||||
|
||||
from langchain_airbyte import AirbyteLoader
|
||||
|
||||
loader = AirbyteLoader(
|
||||
source="github",
|
||||
stream="pull_requests",
|
||||
)
|
||||
documents = loader.lazy_load()
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
source: str,
|
||||
stream: str,
|
||||
*,
|
||||
config: Optional[Dict] = None,
|
||||
include_metadata: bool = True,
|
||||
template: Optional[PromptTemplate] = None,
|
||||
):
|
||||
self._airbyte_source = ab.get_source(source, config=config, streams=[stream])
|
||||
self._stream = stream
|
||||
self._template = template
|
||||
self._include_metadata = include_metadata
|
||||
|
||||
def load(self) -> List[Document]:
|
||||
"""Load source data into Document objects."""
|
||||
return list(self.lazy_load())
|
||||
|
||||
def load_and_split(
|
||||
self, text_splitter: Optional[TextSplitter] = None
|
||||
) -> List[Document]:
|
||||
"""Load Documents and split into chunks. Chunks are returned as Documents.
|
||||
|
||||
Args:
|
||||
text_splitter: TextSplitter instance to use for splitting documents.
|
||||
Defaults to RecursiveCharacterTextSplitter.
|
||||
|
||||
Returns:
|
||||
List of Documents.
|
||||
"""
|
||||
from langchain.text_splitter import RecursiveCharacterTextSplitter
|
||||
|
||||
if text_splitter is None:
|
||||
_text_splitter: TextSplitter = RecursiveCharacterTextSplitter()
|
||||
else:
|
||||
_text_splitter = text_splitter
|
||||
docs = self.lazy_load()
|
||||
return _text_splitter.split_documents(docs)
|
||||
|
||||
def lazy_load(self) -> Iterator[Document]:
|
||||
"""A lazy loader for Documents."""
|
||||
# if no prompt template defined, use default airbyte documents
|
||||
if not self._template:
|
||||
for document in self._airbyte_source.get_documents(self._stream):
|
||||
# convert airbyte document to langchain document
|
||||
metadata = (
|
||||
{}
|
||||
if not self._include_metadata
|
||||
else {
|
||||
**document.metadata,
|
||||
"_last_modified": document.last_modified,
|
||||
"_id": document.id,
|
||||
}
|
||||
)
|
||||
yield Document(
|
||||
page_content=document.content,
|
||||
metadata=metadata,
|
||||
)
|
||||
else:
|
||||
records: Iterator[Mapping[str, Any]] = self._airbyte_source.get_records(
|
||||
self._stream
|
||||
)
|
||||
for record in records:
|
||||
metadata = {} if not self._include_metadata else dict(record)
|
||||
yield Document(
|
||||
page_content=self._template.format(**record), metadata=metadata
|
||||
)
|
||||
|
||||
async def alazy_load(self) -> AsyncIterator[Document]:
|
||||
"""A lazy loader for Documents."""
|
||||
iterator = await run_in_executor(None, self.lazy_load)
|
||||
done = object()
|
||||
while True:
|
||||
doc = await run_in_executor(None, next, iterator, done) # type: ignore[call-arg, arg-type]
|
||||
if doc is done:
|
||||
break
|
||||
yield doc # type: ignore[misc]
|
Reference in New Issue
Block a user