Mirror of https://github.com/hwchase17/langchain.git, synced 2026-01-23 21:31:02 +00:00

Compare commits (19 commits): sr/fix-cod ... eugene/per
| SHA1 |
|---|
| 5ae6eb1429 |
| cd80d041f6 |
| ab0b73a415 |
| 9b2e39cb24 |
| faf92ae133 |
| abbc4948c1 |
| a5c1a6fae0 |
| 5b179daa47 |
| 9e1bd1e90e |
| 6968fc79be |
| 41d61c9e8f |
| b0a2ebb271 |
| b6340bb94d |
| 7cc1549a07 |
| a0f65462a5 |
| 4b868c3214 |
| 6097f3efe8 |
| 6104e4ef4d |
| 2893f5afd4 |
langchain/docstore/artifact_stores.py (new file, 213 lines)
@@ -0,0 +1,213 @@
"""Implement artifact storage using the file system.

This is a simple implementation that stores artifacts in a directory and
metadata in a JSON file. It's used for prototyping.

Metadata should move into SQLite.
"""
from __future__ import annotations

import abc
import json
from pathlib import Path
from typing import (
    TypedDict,
    Sequence,
    Optional,
    Iterator,
    Union,
    List,
    Iterable,
)

from langchain.docstore.base import ArtifactStore, Selector, Artifact, ArtifactWithData
from langchain.docstore.serialization import serialize_document, deserialize_document
from langchain.embeddings.base import Embeddings
from langchain.schema import Document

MaybeDocument = Optional[Document]

PathLike = Union[str, Path]


class Metadata(TypedDict):
    """Metadata format."""

    artifacts: List[Artifact]


class MetadataStore(abc.ABC):
    """Abstract metadata store.

    Needs to be populated with all required methods.
    """

    @abc.abstractmethod
    def upsert(self, artifact: Artifact) -> None:
        """Add the given artifact to the store."""

    @abc.abstractmethod
    def select(self, selector: Selector) -> Iterable[str]:
        """Select the artifacts matching the given selector."""
        raise NotImplementedError


class CacheBackedEmbedder:
    """Interface for embedding models."""

    def __init__(
        self,
        artifact_store: ArtifactStore,
        underlying_embedder: Embeddings,
    ) -> None:
        """Initialize the embedder."""
        self.artifact_store = artifact_store
        self.underlying_embedder = underlying_embedder

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Embed search docs."""
        raise NotImplementedError()

    def embed_query(self, text: str) -> List[float]:
        """Embed query text."""
        raise NotImplementedError()


class InMemoryStore(MetadataStore):
    """In-memory metadata store backed by a file.

    In its current form, this store will be very slow for large collections of files.
    """

    def __init__(self, data: Metadata) -> None:
        """Initialize the in-memory store."""
        super().__init__()
        self.data = data
        self.artifacts = data["artifacts"]
        # Indexes for speed.
        self.artifact_uids = {artifact["uid"]: artifact for artifact in self.artifacts}

    def exists_by_uids(self, uids: Sequence[str]) -> List[bool]:
        """Order-preserving check of whether artifacts with the given uids exist."""
        return [uid in self.artifact_uids for uid in uids]

    def get_by_uids(self, uids: Sequence[str]) -> List[Artifact]:
        """Return the artifacts with the given uids."""
        return [self.artifact_uids[uid] for uid in uids]

    def select(self, selector: Selector) -> Iterable[str]:
        """Yield the uids of the artifacts matching the given selector."""
        # Inefficient implementation that loops through all artifacts.
        # Optimize later.
        for artifact in self.data["artifacts"]:
            uid = artifact["uid"]
            # Implement conjunctive normal form
            if selector.uids and artifact["uid"] in selector.uids:
                yield uid
                continue

            if selector.parent_uids and set(artifact["parent_uids"]).intersection(
                selector.parent_uids
            ):
                yield uid
                continue

    def save(self, path: PathLike) -> None:
        """Save the metadata to the given path."""
        with open(path, "w") as f:
            json.dump(self.data, f)

    def upsert(self, artifact: Artifact) -> None:
        """Add the given artifact to the store."""
        uid = artifact["uid"]
        if uid not in self.artifact_uids:
            self.data["artifacts"].append(artifact)
            self.artifact_uids[artifact["uid"]] = artifact

    def remove(self, selector: Selector) -> None:
        """Remove the artifacts matching the given selector from the store."""
        uids = list(self.select(selector))
        self.remove_by_uuids(uids)

    def remove_by_uuids(self, uids: Sequence[str]) -> None:
        """Remove the artifacts with the given uids from the store."""
        for uid in uids:
            del self.artifact_uids[uid]
        raise NotImplementedError("Need to delete artifacts as well")

    @classmethod
    def from_file(cls, path: PathLike) -> InMemoryStore:
        """Load store metadata from the given path."""
        with open(path, "r") as f:
            content = json.load(f)
        return cls(content)


class FileSystemArtifactLayer(ArtifactStore):
    """An artifact layer for storing artifacts on the file system."""

    def __init__(self, root: PathLike) -> None:
        """Initialize the file system artifact layer."""
        _root = root if isinstance(root, Path) else Path(root)
        self.root = _root
        # The metadata file is kept in memory for now and re-written on each call.
        # This is error-prone due to race conditions (if multiple processes are
        # writing), but OK for prototyping / simple use cases.
        metadata_path = _root / "metadata.json"
        self.metadata_path = metadata_path

        if metadata_path.exists():
            self.metadata_store = InMemoryStore.from_file(self.metadata_path)
        else:
            self.metadata_store = InMemoryStore({"artifacts": []})

    def exists_by_uid(self, uuids: Sequence[str]) -> List[bool]:
        """Check if the artifacts with the given uids exist."""
        return self.metadata_store.exists_by_uids(uuids)

    def _get_file_path(self, uid: str) -> Path:
        """Get the path to the file for the given uid."""
        return self.root / f"{uid}"

    def upsert(
        self,
        artifacts_with_data: Sequence[ArtifactWithData],
    ) -> None:
        """Add the given artifacts."""
        # Write the documents to the file system.
        for artifact_with_data in artifacts_with_data:
            # Use the document hash to write the contents to the file system.
            document = artifact_with_data["document"]
            file_path = self.root / f"{document.hash_}"
            with open(file_path, "w") as f:
                f.write(serialize_document(document))

            artifact = artifact_with_data["artifact"].copy()
            # Stored as a file -- the artifact-with-data request can be cleaned up later.
            artifact["location"] = str(file_path)
            self.metadata_store.upsert(artifact)

        self.metadata_store.save(self.metadata_path)

    def list_document_ids(self, selector: Selector) -> Iterator[str]:
        """List the document ids matching the given selector."""
        yield from self.metadata_store.select(selector)

    def list_documents(self, selector: Selector) -> Iterator[Document]:
        """Yield documents matching the given selector (could even use jq here!)."""
        uuids = self.metadata_store.select(selector)

        for uuid in uuids:
            artifact = self.metadata_store.get_by_uids([uuid])[0]
            path = artifact["location"]
            with open(path, "r") as f:
                page_content = deserialize_document(f.read()).page_content
            yield Document(
                uid=artifact["uid"],
                parent_uids=artifact["parent_uids"],
                metadata=artifact["metadata"],
                tags=artifact["tags"],
                page_content=page_content,
            )
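The listing API above is easiest to see with a concrete call. The sketch below is illustrative only and not part of the commit: it assumes an existing artifacts directory at /tmp/artifacts and a made-up parent uid, and simply reads back whatever the layer has stored for that parent.

# Illustrative sketch: reading documents back out of a FileSystemArtifactLayer.
from langchain.docstore.artifact_stores import FileSystemArtifactLayer
from langchain.docstore.base import Selector

layer = FileSystemArtifactLayer("/tmp/artifacts")  # directory is assumed to exist
# Yield every document derived from the given (hypothetical) parent document.
for doc in layer.list_documents(Selector(parent_uids=["some-parent-uid"])):
    print(doc.uid, doc.page_content[:80])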
langchain/docstore/base.py
@@ -1,8 +1,21 @@
"""Interface to access the place that stores documents."""
import abc
import dataclasses
from abc import ABC, abstractmethod
from typing import Dict, Union
from typing import (
    Dict,
    Sequence,
    Iterator,
    Optional,
    List,
    Literal,
    TypedDict,
    Tuple,
    Union,
    Any,
)

from langchain.docstore.document import Document
from langchain.schema import Document


class Docstore(ABC):
@@ -23,3 +36,126 @@ class AddableMixin(ABC):
    @abstractmethod
    def add(self, texts: Dict[str, Document]) -> None:
        """Add more documents."""


@dataclasses.dataclass(frozen=True)
class Selector:
    """Selection criteria represented in conjunctive normal form.

    https://en.wikipedia.org/wiki/Conjunctive_normal_form

    At the moment, the explicit representation is used for simplicity / prototyping.

    It may be replaced by the ability to specify selection with jq
    if operating on JSON metadata, or else something free-form like SQL.
    """

    parent_uids: Optional[Sequence[str]] = None
    uids: Optional[Sequence[str]] = None
    # Pick up all artifacts with the given tags.
    # Maybe we should call this transformations.
    tags: Optional[Sequence[str]] = None  # <-- WE DONT WANT TO DO IT THIS WAY
    transformation_path: Optional[Sequence[str]] = None
    """Use to specify a transformation path according to which we select documents."""


# KNOWN WAYS THIS CAN FAIL:
# 1) If the process crashes while text splitting, creating only some of the artifacts,
#    the new pipeline will not re-create the missing artifacts! (at least for now)
#    It will use the ones that exist and assume that all of them have been created.


# TODO: MAJOR MAJOR MAJOR MAJOR
# 1. FIX SEMANTICS WITH REGARDS TO ID, UUID, AND POTENTIALLY ARTIFACT_ID.
#    NEED TO REASON THROUGH USE CASES CAREFULLY TO DETERMINE WHAT'S MINIMALLY SUFFICIENT.
# 2. Using hashes throughout for implementation simplicity, but may want to switch
#    to ids assigned by a database? The probability of collision is really small.
class Artifact(TypedDict):
    """A representation of an artifact."""

    uid: str  # This has to be handled carefully -- we'll eventually get collisions
    """A unique identifier for the artifact."""
    type_: Union[Literal["document"], Literal["embedding"], Literal["blob"]]
    """The type of the artifact."""  # THIS MAY NEED TO BE CHANGED
    data_hash: str
    """A hash of the data of the artifact."""
    metadata_hash: str
    """A hash of the metadata of the artifact."""
    parent_uids: Tuple[str, ...]
    """A tuple of uids representing the parent artifacts."""
    parent_hashes: Tuple[str, ...]
    """A tuple of hashes representing the parent artifacts at time of transformation."""
    transformation_hash: str
    """A hash of the transformation that was applied to generate the artifact.

    This parameterizes the transformation logic together with any transformation
    parameters.
    """
    created_at: str  # ISO-8601
    """The time the artifact was created."""
    updated_at: str  # ISO-8601
    """The time the artifact was last updated."""
    metadata: Any
    """A dictionary representing the metadata of the artifact."""
    tags: Tuple[str, ...]
    """A tuple of tags associated with the artifact.

    Can use tags to add information about the transformation that was applied
    to the given artifact.

    THIS IS NOT A GOOD REPRESENTATION.
    """
    data: Optional[bytes]
    """The data of the artifact when the artifact contains the data by value.

    Will likely change somehow.

    * For the first pass, contains embedding data.
    * Document data and blob data are stored externally.
    """
    location: Optional[str]
    # Location specifies the location of the artifact when
    # the artifact contains the data by reference (use for documents / blobs).


class ArtifactWithData(TypedDict):
    """An artifact paired with the document data it describes."""

    artifact: Artifact
    document: Document


class ArtifactStore(abc.ABC):
    """Use to keep track of artifacts generated while processing content.

    The first version of the artifact store is used to work with Documents
    rather than Blobs.

    We will likely want to evolve this into Blobs, but it is faster to prototype
    with Documents.
    """

    def exists_by_uid(self, uids: Sequence[str]) -> List[bool]:
        """Check if the artifacts with the given uids exist."""
        raise NotImplementedError()

    def exists_by_parent_uids(self, uids: Sequence[str]) -> List[bool]:
        """Check if artifacts with the given parent uids exist."""
        raise NotImplementedError()

    def upsert(
        self,
        artifacts_with_data: Sequence[ArtifactWithData],
    ) -> None:
        """Upsert the given artifacts."""
        raise NotImplementedError()

    def list_documents(self, selector: Selector) -> Iterator[Document]:
        """Yield documents matching the given selector."""
        raise NotImplementedError()

    def list_document_ids(self, selector: Selector) -> Iterator[str]:
        """Yield document ids matching the given selector."""
        raise NotImplementedError()
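To make the Artifact / ArtifactWithData contract concrete, here is an illustrative record for a single text chunk. All of the values are made up; the field names and types come from the TypedDicts above, and the way the pipeline actually fills them in is shown in langchain/docstore/pipeline.py below.

# Illustrative ArtifactWithData record for one chunk (values are made up).
from langchain.schema import Document

chunk = Document(page_content="first chunk of text")
record = {
    "artifact": {
        "uid": str(chunk.uid),                 # defaults to the content hash
        "type_": "document",
        "data_hash": str(chunk.hash_),
        "metadata_hash": "N/A",
        "parent_uids": ("parent-doc-uid",),    # provenance: the source document
        "parent_hashes": ("parent-doc-hash",),
        "transformation_hash": "CharacterTextSplitter",
        "created_at": "2023-05-01T00:00:00",
        "updated_at": "2023-05-01T00:00:00",
        "metadata": {},
        "tags": (),
        "data": None,       # document content is stored by reference, not by value
        "location": None,   # filled in by FileSystemArtifactLayer.upsert
    },
    "document": chunk,
}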
langchain/docstore/pipeline.py (new file, 133 lines)
@@ -0,0 +1,133 @@
"""Module implements a pipeline.

There might be a better name for this.
"""
from __future__ import annotations

import datetime
from typing import Sequence, Optional, Iterator, Iterable, List

from langchain.docstore.base import ArtifactWithData, ArtifactStore, Selector
from langchain.document_loaders.base import BaseLoader
from langchain.schema import Document, BaseDocumentTransformer
from langchain.text_splitter import TextSplitter


def _convert_document_to_artifact_upsert(
    document: Document, parent_documents: Sequence[Document], transformation_hash: str
) -> ArtifactWithData:
    """Convert the given document to an artifact for upserting."""
    dt = datetime.datetime.now().isoformat()
    parent_uids = [str(parent_doc.uid) for parent_doc in parent_documents]
    parent_hashes = [str(parent_doc.hash_) for parent_doc in parent_documents]

    return {
        "artifact": {
            "uid": str(document.uid),
            "parent_uids": parent_uids,
            "metadata": document.metadata,
            "parent_hashes": parent_hashes,
            "tags": tuple(),
            "type_": "document",
            "data": None,
            "location": None,
            "data_hash": str(document.hash_),
            "metadata_hash": "N/A",
            "created_at": dt,
            "updated_at": dt,
            "transformation_hash": transformation_hash,
        },
        "document": document,
    }


class Pipeline(BaseLoader):  # MAY NOT WANT TO INHERIT FROM LOADER
    """Pipeline that loads documents and applies cached transformations."""

    def __init__(
        self,
        loader: BaseLoader,
        *,
        transformers: Optional[Sequence[BaseDocumentTransformer]] = None,
        artifact_store: Optional[ArtifactStore] = None,
    ) -> None:
        """Initialize the document pipeline.

        Args:
            loader: The loader to use for loading the documents.
            transformers: The transformers to use for transforming the documents.
            artifact_store: The artifact store to use for storing the artifacts.
        """
        self.loader = loader
        self.transformers = transformers
        self.artifact_store = artifact_store

    def lazy_load(
        self,
    ) -> Iterator[Document]:
        """Lazily load the documents."""
        transformations = self.transformers or []
        # Need syntax for determining whether this should be cached.

        try:
            doc_iterator = self.loader.lazy_load()
        except NotImplementedError:
            doc_iterator = self.loader.load()

        for document in doc_iterator:
            new_documents = [document]
            for transformation in transformations:
                # Batched for now here -- lots of optimization is possible,
                # but it is not needed for now and is likely going to get complex.
                new_documents = list(
                    self._propagate_documents(new_documents, transformation)
                )

            yield from new_documents

    def _propagate_documents(
        self, documents: Sequence[Document], transformation: BaseDocumentTransformer
    ) -> Iterable[Document]:
        """Transform the given documents using the transformation, with caching."""
        docs_exist = self.artifact_store.exists_by_uid(
            [document.uid for document in documents]
        )

        for document, exists in zip(documents, docs_exist):
            if exists:
                existing_docs = self.artifact_store.list_documents(
                    Selector(parent_uids=[document.uid])
                )

                materialized_docs = list(existing_docs)

                if materialized_docs:
                    yield from materialized_docs
                    continue

            transformed_docs = transformation.transform_documents([document])

            # MAJOR: Hash should encapsulate transformation parameters.
            transformation_hash = transformation.__class__.__name__

            artifacts_with_data = [
                _convert_document_to_artifact_upsert(
                    transformed_doc, [document], transformation_hash
                )
                for transformed_doc in transformed_docs
            ]

            self.artifact_store.upsert(artifacts_with_data)
            yield from transformed_docs

    def load(self) -> List[Document]:
        """Load the documents."""
        return list(self.lazy_load())

    def run(self) -> None:  # BAD API NEED
        """Execute the pipeline, returning nothing."""
        for _ in self.lazy_load():
            pass

    def load_and_split(
        self, text_splitter: Optional[TextSplitter] = None
    ) -> List[Document]:
        raise NotImplementedError("This method will never be implemented.")
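A minimal end-to-end wiring of the pieces above, as a sketch only: the loader, splitter, file path, and chunking parameters are assumptions for illustration, not part of the commit.

# Hypothetical pipeline wiring (illustrative; paths and parameters are made up).
from langchain.docstore.artifact_stores import FileSystemArtifactLayer
from langchain.docstore.pipeline import Pipeline
from langchain.document_loaders import TextLoader
from langchain.text_splitter import CharacterTextSplitter

artifact_store = FileSystemArtifactLayer("/tmp/artifacts")  # directory assumed to exist
pipeline = Pipeline(
    TextLoader("state_of_the_union.txt"),
    transformers=[CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)],
    artifact_store=artifact_store,
)
# A second run should find the chunk artifacts by uid and reuse them
# instead of re-splitting the source document.
chunks = pipeline.load()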
langchain/docstore/serialization.py (new file, 48 lines)
@@ -0,0 +1,48 @@
"""Module for serialization code.

This code will likely be replaced by Nuno's serialization method.
"""
import json
from json import JSONEncoder
from uuid import UUID

from langchain.schema import Document


class UUIDEncoder(JSONEncoder):
    """Will either be replaced by Nuno's serialization method or something else.

    Potentially there will be no serialization for a document object since
    the document can be broken into 2 pieces:

    * the content -> saved on disk or in a database
    * the metadata -> saved in the metadata store

    It may not make sense to keep the metadata together with the document
    for persistence.
    """

    def default(self, obj):
        if isinstance(obj, UUID):
            return str(obj)  # Convert UUID to string
        return super().default(obj)


# PUBLIC API


def serialize_document(document: Document) -> str:
    """Serialize the given document to a string."""
    try:
        # Serialize only the content.
        # Metadata is always stored separately.
        return json.dumps(document.page_content)
    except TypeError:
        raise ValueError(f"Could not serialize document with ID: {document.uid}")


def deserialize_document(serialized_document: str) -> Document:
    """Deserialize the given document from a string."""
    return Document(
        page_content=json.loads(serialized_document),
    )
langchain/docstore/sync.py (new file, 69 lines)
@@ -0,0 +1,69 @@
"""Module contains code for syncing from the docstore to vector stores."""
from __future__ import annotations

from itertools import islice
from typing import TypedDict, Sequence, Optional, TypeVar, Iterable, Iterator, List

from langchain.docstore.base import ArtifactStore, Selector
from langchain.vectorstores import VectorStore


class SyncResult(TypedDict):
    """Syncing result."""

    first_n_errors: Sequence[str]
    """First n errors that occurred during syncing."""
    num_added: Optional[int]
    """Number of added documents."""
    num_updated: Optional[int]
    """Number of documents updated because they were not up to date."""
    num_deleted: Optional[int]
    """Number of deleted documents."""
    num_skipped: Optional[int]
    """Number of documents skipped because they were already up to date."""


T = TypeVar("T")


def _batch(size: int, iterable: Iterable[T]) -> Iterator[List[T]]:
    """Utility batching function."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


# SYNC IMPLEMENTATION


def sync(
    artifact_store: ArtifactStore,
    vector_store: VectorStore,
    selector: Selector,
    *,
    batch_size: int = 1000,
) -> SyncResult:
    """Sync the given artifact layer with the given vector store."""
    document_uids = artifact_store.list_document_ids(selector)

    all_uids = []
    # IDs must fit into memory for this to work.
    for uid_batch in _batch(batch_size, document_uids):
        all_uids.extend(uid_batch)
        document_batch = list(artifact_store.list_documents(Selector(uids=uid_batch)))
        # Upsert counts are not yet aggregated into the result.
        upsert_info = vector_store.upsert_by_id(
            documents=document_batch, batch_size=batch_size
        )
    # Non-intuitive interface, but simple to implement
    # (maybe we can have a better solution though).
    num_deleted = vector_store.delete_non_matching_ids(all_uids)

    return {
        "first_n_errors": [],
        "num_added": None,
        "num_updated": None,
        "num_skipped": None,
        "num_deleted": num_deleted,
    }
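A sketch of how sync might be invoked. It is illustrative only: it assumes an artifact layer rooted at /tmp/artifacts and a vector store instance that implements both upsert_by_id and delete_non_matching_ids (the Chroma changes below only add the former); the variable names and selector values are made up.

# Hypothetical invocation of sync() (illustrative).
from langchain.docstore.artifact_stores import FileSystemArtifactLayer
from langchain.docstore.base import Selector
from langchain.docstore.sync import sync

artifact_layer = FileSystemArtifactLayer("/tmp/artifacts")  # directory assumed to exist
result = sync(
    artifact_store=artifact_layer,
    vector_store=vector_store,  # assumed: implements upsert_by_id / delete_non_matching_ids
    selector=Selector(parent_uids=["source-doc-uid"]),
    batch_size=500,
)
print(result["num_deleted"])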
langchain/schema.py
@@ -1,6 +1,8 @@
"""Common schema objects."""
from __future__ import annotations

import hashlib
import uuid
from abc import ABC, abstractmethod
from typing import (
    Any,
@@ -12,9 +14,11 @@ from typing import (
    Sequence,
    TypeVar,
    Union,
    Tuple,
)
from uuid import UUID, uuid5

from pydantic import BaseModel, Extra, Field, root_validator
from pydantic import BaseModel, Extra, Field, root_validator, ValidationError


def get_buffer_string(
@@ -266,8 +270,39 @@ class BaseChatMessageHistory(ABC):
class Document(BaseModel):
    """Interface for interacting with a document."""

    uid: str  # Assigned unique identifier
    hash_: UUID  # A hash of the content + metadata
    # TODO(We likely want multiple hashes: one for content, one for metadata, etc.)
    # content_hash_: UUID  # A hash of the content alone.
    page_content: str
    # Required field for provenance.
    # Provenance ALWAYS refers to the original source of the document,
    # no matter what transformations have been done on the content.
    # provenance: Tuple[str, ...] = tuple()  # TODO(not needed for now)
    # User-created metadata
    metadata: dict = Field(default_factory=dict)
    # Used to keep track of parent documents from which the document was generated.
    # We could keep this as a non-sequence to get started, for simplicity.
    # parent_uids: Tuple[str, ...] = tuple()  # TODO(Move to metadata store)

    @root_validator(pre=True)
    def assign_id_if_not_provided(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Assign an ID if one is not provided."""
        if "page_content" not in values:
            raise ValueError("Must provide page_content")
        if "hash_" not in values:
            # TODO: Hash should be updated to include all metadata fields.
            # Document should likely become immutable, otherwise mutation invalidates
            # any logic based on the hash -- and that's the default uid used.
            content_hash = hashlib.sha256(values["page_content"].encode()).hexdigest()
            hash_ = str(uuid5(UUID(int=0), content_hash))
            values["hash_"] = hash_
        else:
            hash_ = values["hash_"]
        if "uid" not in values:
            # Generate an ID based on the hash of the content.
            values["uid"] = str(hash_)
        return values


class BaseRetriever(ABC):
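The hash/uid derivation in the validator can be reproduced standalone. The sketch below mirrors the logic above (it is not code from the commit) and shows why uids are deterministic for identical content.

# Standalone sketch mirroring Document.assign_id_if_not_provided.
import hashlib
from uuid import UUID, uuid5

content = "hello"
content_hash = hashlib.sha256(content.encode()).hexdigest()
hash_ = str(uuid5(UUID(int=0), content_hash))
# With no explicit uid, Document(page_content="hello").uid defaults to this value,
# which is what makes caching artifacts by uid idempotent across runs.
print(hash_)  # the unit test below expects "0945717e-8d14-5f14-957f-0fb0ea1d56af"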
langchain/vectorstores/base.py
@@ -5,7 +5,7 @@ import asyncio
import warnings
from abc import ABC, abstractmethod
from functools import partial
from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, TypeVar
from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, TypeVar, Sequence

from pydantic import BaseModel, Field, root_validator

@@ -15,6 +15,17 @@ from langchain.schema import BaseRetriever

VST = TypeVar("VST", bound="VectorStore")

from typing import TypedDict


class UpsertResult(TypedDict):
    # Number of documents updated
    num_updated: Optional[int]
    # Number of documents newly added
    num_added: Optional[int]
    # Documents can be skipped if hashes match
    num_skipped: Optional[int]


class VectorStore(ABC):
    """Interface for vector stores."""
@@ -60,6 +71,21 @@ class VectorStore(ABC):
        metadatas = [doc.metadata for doc in documents]
        return self.add_texts(texts, metadatas, **kwargs)

    def upsert_by_id(self, documents: Sequence[Document], **kwargs) -> UpsertResult:
        """Update or insert documents into the vectorstore by id."""
        raise NotImplementedError()

    # THIS MAY NEED TO BE CLEANED UP. IT'S NOT SUPER PRETTY, BUT IT IS EFFICIENT.
    # THIS SHOULD PROBABLY BE REPLACED BY DELETION VIA A METADATA TAG,
    # OTHERWISE MEMORY MANAGEMENT IS AN ISSUE.
    def delete_non_matching_ids(self, ids: Iterable[str], **kwargs) -> int:
        """Delete all ids that are in the vector store but not in the given list."""
        raise NotImplementedError

    def delete_by_id(self, ids: Iterable[str], batch_size: int = 1, **kwargs):
        """Delete documents from the vectorstore by id."""
        raise NotImplementedError

    async def aadd_documents(
        self, documents: List[Document], **kwargs: Any
    ) -> List[str]:
langchain/vectorstores/chroma.py
@@ -3,15 +3,16 @@ from __future__ import annotations

import logging
import uuid
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Type
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Type, Sequence

import numpy as np

from langchain.docstore.document import Document
from langchain.embeddings.base import Embeddings
from langchain.utils import xor_args
from langchain.vectorstores.base import VectorStore
from langchain.vectorstores.base import VectorStore, UpsertResult
from langchain.vectorstores.utils import maximal_marginal_relevance
from typing import List, Iterable

if TYPE_CHECKING:
    import chromadb
@@ -162,6 +163,29 @@ class Chroma(VectorStore):
        )
        return ids

    def upsert_by_id(self, documents: Sequence[Document], **kwargs) -> UpsertResult:
        """Upsert documents by ID."""
        upsert_result: UpsertResult = {
            # Chroma upsert does not return this information.
            "num_added": None,
            "num_updated": None,
            "num_skipped": None,
        }
        info = [(doc.uid, doc.metadata, doc.page_content) for doc in documents]
        uids, metadata, texts = zip(*info)

        if self._embedding_function is not None:
            embeddings = self._embedding_function.embed_documents(
                [doc.page_content for doc in documents]
            )
        else:
            embeddings = None

        self._collection.upsert(
            ids=uids, metadatas=metadata, embeddings=embeddings, documents=texts
        )
        return upsert_result

    def similarity_search(
        self,
        query: str,
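A brief, hypothetical call to the new method, assuming an already-constructed Chroma instance named db (not part of the commit):

# Illustrative direct use of Chroma.upsert_by_id (db is assumed to exist).
from langchain.schema import Document

docs = [Document(page_content="hello"), Document(page_content="world")]
# uids default to content hashes, so repeating the call overwrites rather than duplicates.
result = db.upsert_by_id(docs)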
tests/unit_tests/docstore/test_persistence.py (new file, 22 lines)
@@ -0,0 +1,22 @@
from langchain.docstore.artifacts import serialize_document, deserialize_document
from langchain.schema import Document


def test_serialization() -> None:
    """Test serialization."""
    initial_doc = Document(page_content="hello")
    serialized_doc = serialize_document(initial_doc)
    assert isinstance(serialized_doc, str)
    deserialized_doc = deserialize_document(serialized_doc)
    assert isinstance(deserialized_doc, Document)
    assert deserialized_doc == initial_doc


def test_serialization_with_metadata() -> None:
    """Test serialization with metadata."""
    initial_doc = Document(page_content="hello", metadata={"source": "hello"})
    serialized_doc = serialize_document(initial_doc)
    assert isinstance(serialized_doc, str)
    deserialized_doc = deserialize_document(serialized_doc)
    assert isinstance(deserialized_doc, Document)
    assert deserialized_doc == initial_doc
tests/unit_tests/schema/__init__.py (new file, 0 lines)
tests/unit_tests/schema/test_document.py (new file, 19 lines)
@@ -0,0 +1,19 @@
"""Test document schema."""
from langchain.schema import Document


def test_document_hashes() -> None:
    """Test document hashing."""
    d1 = Document(page_content="hello")
    expected_hash = "0945717e-8d14-5f14-957f-0fb0ea1d56af"
    assert str(d1.hash_) == expected_hash

    d2 = Document(id="hello", page_content="hello")
    assert str(d2.hash_) == expected_hash

    d3 = Document(id="hello", page_content="hello2")
    assert str(d3.hash_) != expected_hash

    # Still fails. Need to update hash to hash metadata as well.
    d4 = Document(id="hello", page_content="hello", metadata={"source": "hello"})
    assert str(d4.hash_) != expected_hash