Compare commits

...

19 Commits

Author  SHA1  Message  Date
Eugene Yurtsev  5ae6eb1429  x  2023-06-06 23:17:02 -04:00
Eugene Yurtsev  cd80d041f6  x  2023-06-06 22:56:10 -04:00
Eugene Yurtsev  ab0b73a415  x  2023-06-06 15:10:55 -04:00
Eugene Yurtsev  9b2e39cb24  x  2023-06-06 15:00:48 -04:00
Eugene Yurtsev  faf92ae133  x  2023-06-06 15:00:09 -04:00
Eugene Yurtsev  abbc4948c1  Merge branch 'master' into eugene/persistence_db  2023-06-06 11:36:33 -04:00
Eugene Yurtsev  a5c1a6fae0  x  2023-06-05 22:51:20 -04:00
Eugene Yurtsev  5b179daa47  q  2023-06-05 22:49:42 -04:00
Eugene Yurtsev  9e1bd1e90e  x  2023-06-05 22:25:54 -04:00
Eugene Yurtsev  6968fc79be  q  2023-06-05 22:21:51 -04:00
Eugene Yurtsev  41d61c9e8f  x  2023-06-05 22:02:17 -04:00
Eugene Yurtsev  b0a2ebb271  q  2023-06-05 21:23:07 -04:00
Eugene Yurtsev  b6340bb94d  x  2023-06-05 21:06:55 -04:00
Eugene Yurtsev  7cc1549a07  q  2023-06-05 20:46:01 -04:00
Eugene Yurtsev  a0f65462a5  Merge branch 'master' into eugene/persistence_db  2023-06-05 18:18:49 -04:00
Eugene Yurtsev  4b868c3214  q  2023-06-05 15:02:33 -04:00
Eugene Yurtsev  6097f3efe8  q  2023-06-05 13:52:46 -04:00
Eugene Yurtsev  6104e4ef4d  q  2023-06-05 11:22:27 -04:00
Eugene Yurtsev  2893f5afd4  q  2023-06-05 11:11:09 -04:00
11 changed files with 731 additions and 6 deletions

View File

@@ -0,0 +1,213 @@
"""Implement artifact storage using the file system.
This is a simple implementation that stores artifacts in a directory and
metadata in a JSON file. It's used for prototyping.
Metadata should eventually move into SQLite.
"""
from __future__ import annotations
import abc
import json
from pathlib import Path
from typing import (
TypedDict,
Sequence,
Optional,
Iterator,
Union,
List,
Iterable,
)
from langchain.docstore.base import ArtifactStore, Selector, Artifact, ArtifactWithData
from langchain.docstore.serialization import serialize_document, deserialize_document
from langchain.embeddings.base import Embeddings
from langchain.schema import Document
MaybeDocument = Optional[Document]
PathLike = Union[str, Path]
class Metadata(TypedDict):
"""Metadata format"""
artifacts: List[Artifact]
class MetadataStore(abc.ABC):
"""Abstract metadata store.
Need to populate with all required methods.
"""
@abc.abstractmethod
def upsert(self, artifact: Artifact):
"""Add the given artifact to the store."""
@abc.abstractmethod
def select(self, selector: Selector) -> Iterable[str]:
"""Select the artifacts matching the given selector."""
raise NotImplementedError
class CacheBackedEmbedder:
"""Interface for embedding models."""
def __init__(
self,
artifact_store: ArtifactStore,
underlying_embedder: Embeddings,
) -> None:
"""Initialize the embedder."""
self.artifact_store = artifact_store
self.underlying_embedder = underlying_embedder
def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""Embed search docs."""
raise NotImplementedError()
def embed_query(self, text: str) -> List[float]:
"""Embed query text."""
raise NotImplementedError()
class InMemoryStore(MetadataStore):
"""In-memory metadata store backed by a file.
In its current form, this store will be really slow for large collections of files.
"""
def __init__(self, data: Metadata) -> None:
"""Initialize the in-memory store."""
super().__init__()
self.data = data
self.artifacts = data["artifacts"]
# indexes for speed
self.artifact_uids = {artifact["uid"]: artifact for artifact in self.artifacts}
def exists_by_uids(self, uids: Sequence[str]) -> List[bool]:
"""Order preserving check if the artifact with the given id exists."""
return [bool(uid in self.artifact_uids) for uid in uids]
def get_by_uids(self, uids: Sequence[str]) -> List[Artifact]:
"""Return the documents with the given uuids."""
return [self.artifact_uids[uid] for uid in uids]
def select(self, selector: Selector) -> Iterable[str]:
"""Return the hashes the artifacts matching the given selector."""
# Inefficient implementation that loops through all artifacts.
# Optimize later.
for artifact in self.data["artifacts"]:
uid = artifact["uid"]
# TODO: Implement true conjunctive normal form; currently any matching clause selects.
if selector.uids and artifact["uid"] in selector.uids:
yield uid
continue
if selector.parent_uids and set(artifact["parent_uids"]).intersection(
selector.parent_uids
):
yield uid
continue
def save(self, path: PathLike) -> None:
"""Save the metadata to the given path."""
with open(path, "w") as f:
json.dump(self.data, f)
def upsert(self, artifact: Artifact) -> None:
"""Add the given artifact to the store."""
uid = artifact["uid"]
if uid not in self.artifact_uids:
self.data["artifacts"].append(artifact)
self.artifact_uids[artifact["uid"]] = artifact
def remove(self, selector: Selector) -> None:
"""Remove the given artifacts from the store."""
uids = list(self.select(selector))
self.remove_by_uuids(uids)
def remove_by_uuids(self, uids: Sequence[str]) -> None:
"""Remove the given artifacts from the store."""
for uid in uids:
del self.artifact_uids[uid]
raise NotImplementedError("Deletion of the underlying artifact data is not implemented yet.")
@classmethod
def from_file(cls, path: PathLike) -> InMemoryStore:
"""Load store metadata from the given path."""
with open(path, "r") as f:
content = json.load(f)
return cls(content)
class FileSystemArtifactLayer(ArtifactStore):
"""An artifact layer for storing artifacts on the file system."""
def __init__(self, root: PathLike) -> None:
"""Initialize the file system artifact layer."""
_root = root if isinstance(root, Path) else Path(root)
self.root = _root
# Metadata file will be kept in memory for now and updated with
# each call.
# This is error-prone due to race conditions (if multiple
# processes are writing), but OK for prototyping / simple use cases.
metadata_path = _root / "metadata.json"
self.metadata_path = metadata_path
if metadata_path.exists():
self.metadata_store = InMemoryStore.from_file(self.metadata_path)
else:
self.metadata_store = InMemoryStore({"artifacts": []})
def exists_by_uid(self, uids: Sequence[str]) -> List[bool]:
"""Check whether artifacts with the given uids exist."""
return self.metadata_store.exists_by_uids(uids)
def _get_file_path(self, uid: str) -> Path:
"""Get the path to the file for the given uid."""
return self.root / f"{uid}"
def upsert(
self,
artifacts_with_data: Sequence[ArtifactWithData],
) -> None:
"""Add the given artifacts."""
# Write the documents to the file system
for artifact_with_data in artifacts_with_data:
# Use the document hash to write the contents to the file system
document = artifact_with_data["document"]
file_path = self.root / f"{document.hash_}"
with open(file_path, "w") as f:
f.write(serialize_document(document))
artifact = artifact_with_data["artifact"].copy()
# Stored as a file -- the ArtifactWithData request can be cleaned up later.
artifact["location"] = str(file_path)
self.metadata_store.upsert(artifact)
self.metadata_store.save(self.metadata_path)
def list_document_ids(self, selector: Selector) -> Iterator[str]:
"""List the document ids matching the given selector."""
yield from self.metadata_store.select(selector)
def list_documents(self, selector: Selector) -> Iterator[Document]:
"""Can even use JQ here!"""
uuids = self.metadata_store.select(selector)
for uuid in uuids:
artifact = self.metadata_store.get_by_uids([uuid])[0]
path = artifact["location"]
with open(path, "r") as f:
page_content = deserialize_document(f.read()).page_content
yield Document(
uid=artifact["uid"],
parent_uids=artifact["parent_uids"],
metadata=artifact["metadata"],
tags=artifact["tags"],
page_content=page_content,
)
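A hedged usage sketch of the layer above. The module path in the import is an assumption (the diff does not show file names), and the artifact dict is filled in by hand here, whereas in practice the pipeline later in this diff builds it:

import datetime
from pathlib import Path

from langchain.docstore.base import Selector
from langchain.schema import Document

# Assumed module path for the file shown above; adjust to wherever it actually lives.
from langchain.docstore.filesystem import FileSystemArtifactLayer

root = Path("/tmp/artifacts")
root.mkdir(parents=True, exist_ok=True)  # The layer does not create the directory itself.
layer = FileSystemArtifactLayer(root)

doc = Document(page_content="hello world")
now = datetime.datetime.now().isoformat()
layer.upsert(
    [
        {
            "artifact": {
                "uid": doc.uid,
                "type_": "document",
                "data_hash": str(doc.hash_),
                "metadata_hash": "N/A",
                "parent_uids": (),
                "parent_hashes": (),
                "transformation_hash": "manual",
                "created_at": now,
                "updated_at": now,
                "metadata": doc.metadata,
                "tags": (),
                "data": None,
                "location": None,  # Filled in by the layer when the file is written.
            },
            "document": doc,
        }
    ]
)
print(list(layer.list_document_ids(Selector(uids=[doc.uid]))))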

View File

@@ -1,8 +1,21 @@
"""Interface to access to place that stores documents."""
import abc
import dataclasses
from abc import ABC, abstractmethod
from typing import Dict, Union
from typing import (
Dict,
Sequence,
Iterator,
Optional,
List,
Literal,
TypedDict,
Tuple,
Union,
Any,
)
from langchain.docstore.document import Document
from langchain.schema import Document
class Docstore(ABC):
@@ -23,3 +36,126 @@ class AddableMixin(ABC):
@abstractmethod
def add(self, texts: Dict[str, Document]) -> None:
"""Add more documents."""
@dataclasses.dataclass(frozen=True)
class Selector:
"""Selection criteria represented in conjunctive normal form.
https://en.wikipedia.org/wiki/Conjunctive_normal_form
At the moment, the explicit representation is used for simplicity / prototyping.
It may be replaced by the ability to specify selection with jq
(if operating on JSON metadata) or by something more free-form like SQL.
"""
parent_uids: Optional[Sequence[str]] = None
uids: Optional[Sequence[str]] = None
# Pick up all artifacts with the given tags.
# Maybe we should call this transformations.
tags: Optional[Sequence[str]] = None # <-- WE DON'T WANT TO DO IT THIS WAY
transformation_path: Optional[Sequence[str]] = None
"""Used to specify a transformation path by which documents are selected."""
# KNOWN WAYS THIS CAN FAIL:
# 1) If the process crashes while text splitting, creating only some of the artifacts,
# ... the new pipeline will not re-create the missing artifacts! (at least for now)
# It will use the ones that exist and assume that all of them have been created.
# TODO: MAJOR MAJOR MAJOR MAJOR
# 1. FIX SEMANTICS WITH REGARD TO ID, UUID, AND POTENTIALLY ARTIFACT_ID.
# NEED TO REASON THROUGH USE CASES CAREFULLY TO DETERMINE WHAT'S MINIMALLY SUFFICIENT.
# 2. Using hashes throughout for implementation simplicity, but we may want to switch
# to ids assigned by a database? The probability of collision is really small.
class Artifact(TypedDict):
"""A representation of an artifact."""
uid: str # This has to be handled carefully -- we'll eventually get collisions
"""A unique identifier for the artifact."""
type_: Union[Literal["document"], Literal["embedding"], Literal["blob"]]
"""The type of the artifact."""  # THIS MAY NEED TO BE CHANGED
data_hash: str
"""A hash of the data of the artifact."""
metadata_hash: str
"""A hash of the metadata of the artifact."""
parent_uids: Tuple[str, ...]
"""A tuple of uids representing the parent artifacts."""
parent_hashes: Tuple[str, ...]
"""A tuple of hashes representing the parent artifacts at time of transformation."""
transformation_hash: str
"""A hash of the transformation that was applied to generate artifact.
This parameterizes the transformation logic together with any transformation
parameters.
"""
created_at: str # ISO-8601
"""The time the artifact was created."""
updated_at: str # ISO-8601
"""The time the artifact was last updated."""
metadata: Any
"""A dictionary representing the metadata of the artifact."""
tags: Tuple[str, ...]
"""A tuple of tags associated with the artifact.
Can use tags to add information about the transformation that was applied
to the given artifact.
THIS IS NOT A GOOD REPRESENTATION.
"""
"""The type of the artifact.""" # THIS MAY NEED TO BE CHANGED
data: Optional[bytes]
"""The data of the artifact when the artifact contains the data by value.
Will likely change somehow.
* For the first pass, this contains embedding data.
* Document data and blob data are stored externally.
"""
location: Optional[str]
# Location specifies the location of the artifact when
# the artifact contains the data by reference (use for documents / blobs)
class ArtifactWithData(TypedDict):
"""A document with the transformation that generated it."""
artifact: Artifact
document: Document
class ArtifactStore(abc.ABC):
"""Use to keep track of artifacts generated while processing content.
The first version of the artifact store is used to work with Documents
rather than Blobs.
We will likely want to evolve this to work with Blobs, but it is faster to prototype
with Documents.
"""
def exists_by_uid(self, uids: Sequence[str]) -> List[bool]:
"""Check if the artifacts with the given uuid exist."""
raise NotImplementedError()
def exists_by_parent_uids(self, uids: Sequence[str]) -> List[bool]:
"""Check if the artifacts with the given id exist."""
raise NotImplementedError()
def upsert(
self,
artifacts_with_data: Sequence[ArtifactWithData],
) -> None:
"""Upsert the given artifacts."""
raise NotImplementedError()
def list_documents(self, selector: Selector) -> Iterator[Document]:
"""Yield documents matching the given selector."""
raise NotImplementedError()
def list_document_ids(self, selector: Selector) -> Iterator[str]:
"""Yield document ids matching the given selector."""
raise NotImplementedError()
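For orientation, a small sketch of how a Selector is meant to be constructed against this interface; note that the InMemoryStore.select implementation earlier in this diff currently treats the clauses as an OR rather than a true conjunction:

from langchain.docstore.base import Selector

# Select artifacts by explicit uid.
by_uid = Selector(uids=["some-uid"])
# Select every artifact derived from a given parent document.
children = Selector(parent_uids=["parent-uid"])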

View File

@@ -0,0 +1,133 @@
"""Module implements a pipeline.
There might be a better name for this.
"""
from __future__ import annotations
import datetime
from typing import Sequence, Optional, Iterator, Iterable, List
from langchain.docstore.base import ArtifactWithData, ArtifactStore, Selector
from langchain.document_loaders.base import BaseLoader
from langchain.schema import Document, BaseDocumentTransformer
from langchain.text_splitter import TextSplitter
def _convert_document_to_artifact_upsert(
document: Document, parent_documents: Sequence[Document], transformation_hash: str
) -> ArtifactWithData:
"""Convert the given documents to artifacts for upserting."""
dt = datetime.datetime.now().isoformat()
parent_uids = [str(parent_doc.uid) for parent_doc in parent_documents]
parent_hashes = [str(parent_doc.hash_) for parent_doc in parent_documents]
return {
"artifact": {
"uid": str(document.uid),
"parent_uids": parent_uids,
"metadata": document.metadata,
"parent_hashes": parent_hashes,
"tags": tuple(),
"type_": "document",
"data": None,
"location": None,
"data_hash": str(document.hash_),
"metadata_hash": "N/A",
"created_at": dt,
"updated_at": dt,
"transformation_hash": transformation_hash,
},
"document": document,
}
class Pipeline(BaseLoader): # MAY NOT WANT TO INHERIT FROM LOADER
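"""Pipeline that lazily loads documents, applies the configured transformations, and caches the resulting artifacts in the artifact store."""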
def __init__(
self,
loader: BaseLoader,
*,
transformers: Optional[Sequence[BaseDocumentTransformer]] = None,
artifact_store: Optional[ArtifactStore] = None,
) -> None:
"""Initialize the document pipeline.
Args:
loader: The loader to use for loading the documents.
transformers: The transformers to use for transforming the documents.
artifact_store: The artifact store to use for storing the artifacts.
"""
self.loader = loader
self.transformers = transformers
self.artifact_store = artifact_store
def lazy_load(
self,
) -> Iterator[Document]:
"""Lazy load the documents."""
transformations = self.transformers or []
# Need syntax for determining whether this should be cached.
try:
doc_iterator = self.loader.lazy_load()
except NotImplementedError:
doc_iterator = self.loader.load()
for document in doc_iterator:
new_documents = [document]
for transformation in transformations:
# Batched here for now -- lots of optimization is possible,
# but it is not needed yet and would likely get complex.
new_documents = list(
self._propagate_documents(new_documents, transformation)
)
yield from new_documents
def _propagate_documents(
self, documents: Sequence[Document], transformation: BaseDocumentTransformer
) -> Iterable[Document]:
"""Transform the given documents using the transformation with caching."""
docs_exist = self.artifact_store.exists_by_uid(
[document.uid for document in documents]
)
for document, exists in zip(documents, docs_exist):
if exists:
existing_docs = self.artifact_store.list_documents(
Selector(parent_uids=[document.uid])
)
materialized_docs = list(existing_docs)
if materialized_docs:
yield from materialized_docs
continue
transformed_docs = transformation.transform_documents([document])
# MAJOR: Hash should encapsulate transformation parameters
transformation_hash = transformation.__class__.__name__
artifacts_with_data = [
_convert_document_to_artifact_upsert(
transformed_doc, [document], transformation_hash
)
for transformed_doc in transformed_docs
]
self.artifact_store.upsert(artifacts_with_data)
yield from transformed_docs
def load(self) -> List[Document]:
"""Load the documents."""
return list(self.lazy_load())
def run(self) -> None: # BAD API NEED
"""Execute the pipeline, returning nothing."""
for _ in self.lazy_load():
pass
def load_and_split(
self, text_splitter: Optional[TextSplitter] = None
) -> List[Document]:
raise NotImplementedError("This method will never be implemented.")

View File

@@ -0,0 +1,48 @@
"""Module for serialization code.
This code will likely be replaced by Nuno's serialization method.
"""
import json
from json import JSONEncoder
from uuid import UUID
from langchain.schema import Document
class UUIDEncoder(JSONEncoder):
"""Will either be replaced by Nuno's serialization method or something else.
Potentially there will be no serialization for a document object since
the document can be broken into 2 pieces:
* the content -> saved on disk or in database
* the metadata -> saved in metadata store
It may not make sense to keep the metadata together with the document
for persistence.
"""
def default(self, obj):
if isinstance(obj, UUID):
return str(obj) # Convert UUID to string
return super().default(obj)
# PUBLIC API
def serialize_document(document: Document) -> str:
"""Serialize the given document to a string."""
try:
# Serialize only the content.
# Metadata always stored separately.
return json.dumps(document.page_content)
except TypeError:  # json.dumps raises TypeError for unserializable content.
raise ValueError(f"Could not serialize document with ID: {document.uid}")
def deserialize_document(serialized_document: str) -> Document:
"""Deserialize the given document from a string."""
return Document(
page_content=json.loads(serialized_document),
)

View File

@@ -0,0 +1,69 @@
"""Module contains doc for syncing from docstore to vectorstores."""
from __future__ import annotations
from itertools import islice
from typing import TypedDict, Sequence, Optional, TypeVar, Iterable, Iterator, List
from langchain.docstore.base import ArtifactStore, Selector
from langchain.vectorstores import VectorStore
class SyncResult(TypedDict):
"""Syncing result."""
first_n_errors: Sequence[str]
"""First n errors that occurred during syncing."""
num_added: Optional[int]
"""Number of added documents."""
num_updated: Optional[int]
"""Number of updated documents because they were not up to date."""
num_deleted: Optional[int]
"""Number of deleted documents."""
num_skipped: Optional[int]
"""Number of skipped documents because they were already up to date."""
T = TypeVar("T")
def _batch(size: int, iterable: Iterable[T]) -> Iterator[List[T]]:
"""Utility batching function."""
it = iter(iterable)
while True:
chunk = list(islice(it, size))
if not chunk:
return
yield chunk
# SYNC IMPLEMENTATION
def sync(
artifact_store: ArtifactStore,
vector_store: VectorStore,
selector: Selector,
*,
batch_size: int = 1000,
) -> SyncResult:
"""Sync the given artifact layer with the given vector store."""
document_uids = artifact_store.list_document_ids(selector)
all_uids = []
# IDs must fit into memory for this to work.
for uid_batch in _batch(batch_size, document_uids):
all_uids.extend(uid_batch)
document_batch = list(artifact_store.list_documents(Selector(uids=uid_batch)))
upsert_info = vector_store.upsert_by_id(
documents=document_batch, batch_size=batch_size
)
# Non-intuitive interface, but simple to implement
# (maybe we can have a better solution though)
num_deleted = vector_store.delete_non_matching_ids(all_uids)
return {
"first_n_errors": [],
"num_added": None,
"num_updated": None,
"num_skipped": None,
"num_deleted": None,
}
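A hedged sketch of driving sync(); module paths for the new components are assumptions, and note that delete_non_matching_ids is not yet implemented for Chroma in this diff, so the final reconciliation step would raise until a vector store provides it:

from langchain.docstore.base import Selector
from langchain.embeddings import OpenAIEmbeddings  # any Embeddings implementation works
from langchain.vectorstores import Chroma

# Assumed module paths for the new components in this diff.
from langchain.docstore.filesystem import FileSystemArtifactLayer
from langchain.docstore.sync import sync

artifact_store = FileSystemArtifactLayer("/tmp/artifacts")
vector_store = Chroma(collection_name="artifacts", embedding_function=OpenAIEmbeddings())

# Push every document derived from a given parent into the vector store,
# then delete vector-store entries that no longer match the selector.
result = sync(
    artifact_store,
    vector_store,
    Selector(parent_uids=["parent-doc-uid"]),
    batch_size=100,
)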

View File

@@ -1,6 +1,8 @@
"""Common schema objects."""
from __future__ import annotations
import hashlib
import uuid
from abc import ABC, abstractmethod
from typing import (
Any,
@@ -12,9 +14,11 @@ from typing import (
Sequence,
TypeVar,
Union,
Tuple,
)
from uuid import UUID, uuid5
from pydantic import BaseModel, Extra, Field, root_validator
from pydantic import BaseModel, Extra, Field, root_validator, ValidationError
def get_buffer_string(
@@ -266,8 +270,39 @@ class BaseChatMessageHistory(ABC):
class Document(BaseModel):
"""Interface for interacting with a document."""
uid: str # Assigned unique identifier
hash_: UUID # A hash of the content + metadata
# TODO(We likely want multiple hashes, one for content, one for metadata, etc)
# content_hash_: UUID # A hash of the content alone.
page_content: str
# Required field for provenance.
# Provenance ALWAYS refers to the original source of the document.
# No matter what transformations have been done on the content.
# provenance: Tuple[str, ...] = tuple() # TODO(not needed for now)
# User created metadata
metadata: dict = Field(default_factory=dict)
# Use to keep track of parent documents from which the document was generated
# We could keep this as a non-sequence to get started, for simplicity.
# parent_uids: Tuple[str, ...] = tuple() # TODO(Move to metadata store)
@root_validator(pre=True)
def assign_id_if_not_provided(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Assign an ID if one is not provided."""
if "page_content" not in values:
raise ValueError("Must provide page_content")
if "hash_" not in values:
# TODO: Hash should be updated to include all metadata fields.
# The Document should likely become immutable; otherwise mutation invalidates
# any logic based on the hash -- and the hash is the default uid.
content_hash = hashlib.sha256(values["page_content"].encode()).hexdigest()
hash_ = str(uuid5(UUID(int=0), content_hash))
values["hash_"] = hash_
else:
hash_ = values["hash_"]
if "uid" not in values:
# Generate an ID based on the hash of the content
values["uid"] = str(hash_)
return values
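A short illustration of the hashing scheme introduced above: the content hash is uuid5 over the SHA-256 hex digest of page_content (with a zero-UUID namespace), and uid defaults to that hash, so identical content currently yields identical uids regardless of metadata:

import hashlib
from uuid import UUID, uuid5

from langchain.schema import Document

content = "hello"
expected = str(uuid5(UUID(int=0), hashlib.sha256(content.encode()).hexdigest()))

d1 = Document(page_content=content)
d2 = Document(page_content=content, metadata={"source": "somewhere"})
assert str(d1.hash_) == expected and d1.uid == expected
# Metadata is not yet folded into the hash (see the TODO above), so d2 collides with d1.
assert str(d2.hash_) == expected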
class BaseRetriever(ABC):

View File

@@ -5,7 +5,7 @@ import asyncio
import warnings
from abc import ABC, abstractmethod
from functools import partial
from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, TypeVar
from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, TypeVar, Sequence
from pydantic import BaseModel, Field, root_validator
@@ -15,6 +15,17 @@ from langchain.schema import BaseRetriever
VST = TypeVar("VST", bound="VectorStore")
from typing import TypedDict
class UpsertResult(TypedDict):
# Number of documents updated
num_updated: Optional[int]
# Number of documents newly added
num_added: Optional[int]
# Documents can be skipped if hashes match
num_skipped: Optional[int]
class VectorStore(ABC):
"""Interface for vector stores."""
@@ -60,6 +71,21 @@ class VectorStore(ABC):
metadatas = [doc.metadata for doc in documents]
return self.add_texts(texts, metadatas, **kwargs)
def upsert_by_id(self, documents: Sequence[Document], **kwargs) -> UpsertResult:
"""Update or insert a document into the vectorstore."""
raise NotImplementedError()
# THIS MAY NEED TO BE CLEANED UP. IT'S NOT SUPER PRETTY, BUT IT IS EFFICIENT.
# THIS SHOULD PROBABLY BE REPLACED BY DELETION VIA A METADATA TAG;
# OTHERWISE MEMORY MANAGEMENT IS AN ISSUE.
def delete_non_matching_ids(self, ids: Iterable[str], **kwargs) -> int:
"""Delete all ids that are not in the given list, but are in the vector store"""
raise NotImplementedError
def delete_by_id(self, ids: Iterable[str], batch_size: int = 1, **kwargs):
"""Delete a document from the vectorstore."""
raise NotImplementedError
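To make the intended contract of the two new deletion hooks concrete, here is a purely illustrative dict-backed sketch (not part of this diff, and not a real VectorStore):

from typing import Dict, Iterable

class ToyIdIndex:
    """Illustrative only: models the id bookkeeping, not vectors."""

    def __init__(self) -> None:
        self._docs: Dict[str, str] = {}

    def delete_by_id(self, ids: Iterable[str]) -> None:
        """Drop the given ids if present."""
        for id_ in ids:
            self._docs.pop(id_, None)

    def delete_non_matching_ids(self, ids: Iterable[str]) -> int:
        """Delete ids present in the store but absent from `ids`; return how many."""
        keep = set(ids)
        stale = [id_ for id_ in self._docs if id_ not in keep]
        self.delete_by_id(stale)
        return len(stale)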
async def aadd_documents(
self, documents: List[Document], **kwargs: Any
) -> List[str]:

View File

@@ -3,15 +3,16 @@ from __future__ import annotations
import logging
import uuid
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Type
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Type, Sequence
import numpy as np
from langchain.docstore.document import Document
from langchain.embeddings.base import Embeddings
from langchain.utils import xor_args
from langchain.vectorstores.base import VectorStore
from langchain.vectorstores.base import VectorStore, UpsertResult
from langchain.vectorstores.utils import maximal_marginal_relevance
from typing import List, Iterable
if TYPE_CHECKING:
import chromadb
@@ -162,6 +163,29 @@ class Chroma(VectorStore):
)
return ids
def upsert_by_id(self, documents: Sequence[Document], **kwargs) -> UpsertResult:
"""Upsert documents by ID."""
upsert_result: UpsertResult = {
# Chroma upsert does not return this information
"num_added": None,
"num_updated": None,
"num_skipped": None,
}
info = [(doc.uid, doc.metadata, doc.page_content) for doc in documents]
uids, metadatas, texts = zip(*info)
if self._embedding_function is not None:
embeddings = self._embedding_function.embed_documents(
[doc.page_content for doc in documents]
)
else:
embeddings = None
self._collection.upsert(
ids=list(uids), metadatas=list(metadatas), embeddings=embeddings, documents=list(texts)
)
return upsert_result
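A hedged usage sketch of the new upsert_by_id (the embedding model and collection name are arbitrary choices here): upserting a document with an existing uid replaces the stored entry rather than adding a duplicate.

from langchain.embeddings import OpenAIEmbeddings
from langchain.schema import Document
from langchain.vectorstores import Chroma

store = Chroma(collection_name="artifacts", embedding_function=OpenAIEmbeddings())
doc = Document(page_content="hello world", metadata={"source": "demo"})
store.upsert_by_id(documents=[doc])
# Same uid, new content: the entry is overwritten in place.
revised = Document(uid=doc.uid, page_content="hello world, revised", metadata={"source": "demo"})
store.upsert_by_id(documents=[revised])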
def similarity_search(
self,
query: str,

View File

@@ -0,0 +1,22 @@
from langchain.docstore.artifacts import serialize_document, deserialize_document
from langchain.schema import Document
def test_serialization() -> None:
"""Test serialization."""
initial_doc = Document(page_content="hello")
serialized_doc = serialize_document(initial_doc)
assert isinstance(serialized_doc, str)
deserialized_doc = deserialize_document(serialized_doc)
assert isinstance(deserialized_doc, Document)
assert deserialized_doc == initial_doc
def test_serialization_with_metadata() -> None:
"""Test serialization with metadata."""
initial_doc = Document(page_content="hello", metadata={"source": "hello"})
serialized_doc = serialize_document(initial_doc)
assert isinstance(serialized_doc, str)
deserialized_doc = deserialize_document(serialized_doc)
assert isinstance(deserialized_doc, Document)
assert deserialized_doc == initial_doc

View File

View File

@@ -0,0 +1,19 @@
"""Test document schema."""
from langchain.schema import Document
def test_document_hashes() -> None:
"""Test document hashing."""
d1 = Document(page_content="hello")
expected_hash = "0945717e-8d14-5f14-957f-0fb0ea1d56af"
assert str(d1.hash_) == expected_hash
d2 = Document(id="hello", page_content="hello")
assert str(d2.hash_) == expected_hash
d3 = Document(id="hello", page_content="hello2")
assert str(d3.hash_) != expected_hash
# Still fails. Need to update hash to hash metadata as well.
d4 = Document(id="hello", page_content="hello", metadata={"source": "hello"})
assert str(d4.hash_) != expected_hash