mirror of
https://github.com/hwchase17/langchain.git
synced 2025-10-11 15:23:57 +00:00
## Description Created a helper method to make vector search indexes via client-side pymongo. **Recent Update** -- Removed error suppressing/overwriting layer in favor of letting the original exception provide information. ## ToDo's - [x] Make _wait_untils for integration test delete index functionalities. - [x] Add documentation for its use. Highlight it's experimental - [x] Post Integration Test Results in a screenshot - [x] Get review from MongoDB internal team (@shaneharvey, @blink1073 , @NoahStapp , @caseyclements) - [x] **Add tests and docs**: If you're adding a new integration, please include 1. Added new integration tests. Not eligible for unit testing since the operation is Atlas Cloud specific. 2. an example notebook showing its use. It lives in `docs/docs/integrations` directory.  - [x] **Lint and test**: Run `make format`, `make lint` and `make test` from the root of the package(s) you've modified. See contribution guidelines for more: https://python.langchain.com/docs/contributing/
536 lines
19 KiB
Python
536 lines
19 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
from importlib.metadata import version
|
|
from typing import (
|
|
Any,
|
|
Callable,
|
|
Dict,
|
|
Generator,
|
|
Iterable,
|
|
List,
|
|
Optional,
|
|
Tuple,
|
|
TypeVar,
|
|
Union,
|
|
)
|
|
|
|
import numpy as np
|
|
from bson import ObjectId, json_util
|
|
from langchain_core.documents import Document
|
|
from langchain_core.embeddings import Embeddings
|
|
from langchain_core.runnables.config import run_in_executor
|
|
from langchain_core.vectorstores import VectorStore
|
|
from pymongo import MongoClient
|
|
from pymongo.collection import Collection
|
|
from pymongo.driver_info import DriverInfo
|
|
from pymongo.errors import CollectionInvalid
|
|
|
|
from langchain_mongodb.index import (
|
|
create_vector_search_index,
|
|
update_vector_search_index,
|
|
)
|
|
from langchain_mongodb.utils import maximal_marginal_relevance
|
|
|
|
logger = logging.getLogger(__name__)

# Type variable for the documents stored in the wrapped collection: any
# mapping with string keys.
MongoDBDocumentType = TypeVar("MongoDBDocumentType", bound=Dict[str, Any])

# Type variable bound to ``VectorStore``.
VST = TypeVar("VST", bound=VectorStore)

# Default number of texts embedded and inserted per ``insert_many`` call in
# ``MongoDBAtlasVectorSearch.add_texts`` (overridable via its ``batch_size``
# keyword argument).
DEFAULT_INSERT_BATCH_SIZE = 100_000
class MongoDBAtlasVectorSearch(VectorStore):
    """`MongoDB Atlas Vector Search` vector store.

    To use, you should have both:
    - the ``pymongo`` python package installed
    - a connection string associated with a MongoDB Atlas Cluster having deployed an
        Atlas Search index

    Example:
        .. code-block:: python

            from langchain_mongodb import MongoDBAtlasVectorSearch
            from langchain_openai import OpenAIEmbeddings
            from pymongo import MongoClient

            mongo_client = MongoClient("<YOUR-CONNECTION-STRING>")
            collection = mongo_client["<db_name>"]["<collection_name>"]
            embeddings = OpenAIEmbeddings()
            vectorstore = MongoDBAtlasVectorSearch(collection, embeddings)
    """

    def __init__(
        self,
        collection: Collection[MongoDBDocumentType],
        embedding: Embeddings,
        *,
        index_name: str = "default",
        text_key: str = "text",
        embedding_key: str = "embedding",
        relevance_score_fn: str = "cosine",
    ):
        """
        Args:
            collection: MongoDB collection to add the texts to.
            embedding: Text embedding model to use.
            text_key: MongoDB field that will contain the text for each
                document.
                defaults to 'text'
            embedding_key: MongoDB field that will contain the embedding for
                each document.
                defaults to 'embedding'
            index_name: Name of the Atlas Search index.
                defaults to 'default'
            relevance_score_fn: The similarity score used for the index.
                defaults to 'cosine'
                Currently supported: 'euclidean', 'cosine', and 'dotProduct'.
        """
        self._collection = collection
        self._embedding = embedding
        self._index_name = index_name
        self._text_key = text_key
        self._embedding_key = embedding_key
        self._relevance_score_fn = relevance_score_fn

    @property
    def embeddings(self) -> Embeddings:
        """The embedding model used to embed texts and queries."""
        return self._embedding

    def _select_relevance_score_fn(self) -> Callable[[float], float]:
        """Return the relevance-score normalizer matching ``relevance_score_fn``.

        Raises:
            NotImplementedError: If ``relevance_score_fn`` is not one of
                'euclidean', 'dotProduct' or 'cosine'.
        """
        scoring: dict[str, Callable] = {
            "euclidean": self._euclidean_relevance_score_fn,
            "dotProduct": self._max_inner_product_relevance_score_fn,
            "cosine": self._cosine_relevance_score_fn,
        }
        if self._relevance_score_fn in scoring:
            return scoring[self._relevance_score_fn]
        raise NotImplementedError(
            f"No relevance score function for {self._relevance_score_fn}"
        )

    @classmethod
    def from_connection_string(
        cls,
        connection_string: str,
        namespace: str,
        embedding: Embeddings,
        **kwargs: Any,
    ) -> MongoDBAtlasVectorSearch:
        """Construct a `MongoDB Atlas Vector Search` vector store
        from a MongoDB connection URI.

        Args:
            connection_string: A valid MongoDB connection URI.
            namespace: A valid MongoDB namespace (database and collection),
                written as ``"<db_name>.<collection_name>"``. Only the first
                ``.`` separates the two parts, so the collection name may
                itself contain dots.
            embedding: The text embedding model to use for the vector store.
            **kwargs: Forwarded to the class constructor (e.g. ``index_name``).

        Returns:
            A new MongoDBAtlasVectorSearch instance.

        Raises:
            ValueError: If ``namespace`` does not contain both a database and
                a collection name.
        """
        # Database names cannot contain "." but collection names can, so only
        # the first "." is a separator. Validate before opening a client so a
        # bad namespace does not leave a connection pool behind.
        db_name, sep, collection_name = namespace.partition(".")
        if not (sep and db_name and collection_name):
            raise ValueError(
                f"Invalid namespace {namespace!r}; "
                "expected '<db_name>.<collection_name>'."
            )
        client: MongoClient = MongoClient(
            connection_string,
            driver=DriverInfo(name="Langchain", version=version("langchain")),
        )
        collection = client[db_name][collection_name]
        return cls(collection, embedding, **kwargs)

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[Dict[str, Any]]] = None,
        **kwargs: Any,
    ) -> List:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore. May be a
                one-shot iterator/generator.
            metadatas: Optional list of metadatas associated with the texts.
                If shorter than ``texts``, the extra texts are not inserted.
            **kwargs: ``batch_size`` (int) sets how many texts are embedded
                and inserted per round trip. It defaults to
                ``DEFAULT_INSERT_BATCH_SIZE``. A falsy value inserts
                everything in a single call.

        Returns:
            List of ids from adding the texts into the vectorstore.
        """
        import itertools

        batch_size = kwargs.get("batch_size", DEFAULT_INSERT_BATCH_SIZE)
        # A fresh empty metadata dict per text. This must NOT be derived from
        # ``texts``: iterating ``texts`` here as well as in ``zip`` below would
        # consume a one-shot iterator twice and silently drop every other text.
        _metadatas: Iterable[Dict[str, Any]] = metadatas or (
            {} for _ in itertools.count()
        )

        if not batch_size:
            # Single insert: materialize so that embedding the texts does not
            # exhaust them before they are zipped with their embeddings.
            all_texts = list(texts)
            all_metadatas = list(itertools.islice(_metadatas, len(all_texts)))
            return list(self._insert_texts(all_texts, all_metadatas))

        result_ids: List = []
        texts_batch: List[str] = []
        metadatas_batch: List[Dict[str, Any]] = []
        size = 0
        for i, (text, metadata) in enumerate(zip(texts, _metadatas)):
            # Rough payload estimate: characters of text plus number of metadata
            # keys. Embedding bytes are not counted.
            size += len(text) + len(metadata)
            texts_batch.append(text)
            metadatas_batch.append(metadata)
            # Flush on batch count, or early once the estimate nears 47 MB
            # (presumably to stay under the server's ~48 MB message limit).
            if (i + 1) % batch_size == 0 or size >= 47_000_000:
                result_ids.extend(self._insert_texts(texts_batch, metadatas_batch))
                texts_batch, metadatas_batch, size = [], [], 0
        if texts_batch:
            result_ids.extend(self._insert_texts(texts_batch, metadatas_batch))
        return result_ids

    def _insert_texts(self, texts: List[str], metadatas: List[Dict[str, Any]]) -> List:
        """Embed ``texts`` and insert one document per (text, metadata) pair.

        Each document holds the text under ``text_key``, its embedding under
        ``embedding_key``, and the metadata keys merged at the top level.

        Returns:
            The ``inserted_ids`` reported by ``insert_many``; ``[]`` when
            ``texts`` is empty.
        """
        if not texts:
            return []
        # Embed and create the documents
        embeddings = self._embedding.embed_documents(texts)
        to_insert = [
            {self._text_key: t, self._embedding_key: embedding, **m}
            for t, m, embedding in zip(texts, metadatas, embeddings)
        ]
        # insert the documents in MongoDB Atlas
        insert_result = self._collection.insert_many(to_insert)  # type: ignore
        return insert_result.inserted_ids

    def _similarity_search_with_score(
        self,
        embedding: List[float],
        k: int = 4,
        pre_filter: Optional[Dict] = None,
        post_filter_pipeline: Optional[List[Dict]] = None,
        include_embedding: bool = False,
        oversampling_factor: int = 10,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Run a ``$vectorSearch`` aggregation for ``embedding``.

        Args:
            embedding: Query vector.
            k: Maximum number of documents to return (``limit``).
            pre_filter: Optional ``filter`` for the ``$vectorSearch`` stage.
            post_filter_pipeline: Aggregation stages appended after the search.
            include_embedding: Keep the embedding field in the returned
                metadata (needed for MMR re-ranking).
            oversampling_factor: ``numCandidates`` is ``k * oversampling_factor``.
                The default of 10 matches the previous hard-coded value.

        Returns:
            (Document, score) pairs. The text field becomes ``page_content``
            and the remaining fields become metadata, with ObjectIds converted
            to ``{"$oid": ...}`` form.
        """
        params = {
            "queryVector": embedding,
            "path": self._embedding_key,
            "numCandidates": k * oversampling_factor,
            "limit": k,
            "index": self._index_name,
        }
        if pre_filter:
            params["filter"] = pre_filter
        query = {"$vectorSearch": params}

        pipeline = [
            query,
            {"$set": {"score": {"$meta": "vectorSearchScore"}}},
        ]

        # Exclude the embedding key from the return payload
        if not include_embedding:
            pipeline.append({"$project": {self._embedding_key: 0}})

        if post_filter_pipeline is not None:
            pipeline.extend(post_filter_pipeline)
        cursor = self._collection.aggregate(pipeline)  # type: ignore[arg-type]
        docs = []

        def _make_serializable(obj: Dict[str, Any]) -> None:
            # In place: recurse into nested dicts. Convert ObjectId values, and
            # lists whose first element is an ObjectId, via json_util.default.
            for key, value in obj.items():
                if isinstance(value, dict):
                    _make_serializable(value)
                elif isinstance(value, list) and value and isinstance(value[0], ObjectId):
                    obj[key] = [json_util.default(item) for item in value]
                elif isinstance(value, ObjectId):
                    obj[key] = json_util.default(value)

        for res in cursor:
            text = res.pop(self._text_key)
            score = res.pop("score")
            # Make every ObjectId found JSON-Serializable
            # following format used in bson.json_util.loads
            # e.g. loads('{"_id": {"$oid": "664..."}}') == {'_id': ObjectId('664..')}  # noqa: E501
            _make_serializable(res)
            docs.append((Document(page_content=text, metadata=res), score))
        return docs

    def similarity_search_with_score(
        self,
        query: str,
        k: int = 4,
        pre_filter: Optional[Dict] = None,
        post_filter_pipeline: Optional[List[Dict]] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return MongoDB documents most similar to the given query and their scores.

        Uses the vectorSearch operator available in MongoDB Atlas Search.
        For more: https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-stage/

        Args:
            query: Text to look up documents similar to.
            k: (Optional) number of documents to return. Defaults to 4.
            pre_filter: (Optional) dictionary of argument(s) to prefilter document
                fields on.
            post_filter_pipeline: (Optional) Pipeline of MongoDB aggregation stages
                following the vectorSearch stage.
            **kwargs: Forwarded to the underlying search (e.g.
                ``include_embedding``, ``oversampling_factor``).

        Returns:
            List of documents most similar to the query and their scores.
        """
        embedding = self._embedding.embed_query(query)
        return self._similarity_search_with_score(
            embedding,
            k=k,
            pre_filter=pre_filter,
            post_filter_pipeline=post_filter_pipeline,
            **kwargs,
        )

    def similarity_search(
        self,
        query: str,
        k: int = 4,
        pre_filter: Optional[Dict] = None,
        post_filter_pipeline: Optional[List[Dict]] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return MongoDB documents most similar to the given query.

        Uses the vectorSearch operator available in MongoDB Atlas Search.
        For more: https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-stage/

        Args:
            query: Text to look up documents similar to.
            k: (Optional) number of documents to return. Defaults to 4.
            pre_filter: (Optional) dictionary of argument(s) to prefilter document
                fields on.
            post_filter_pipeline: (Optional) Pipeline of MongoDB aggregation stages
                following the vectorSearch stage.
            **kwargs: Forwarded to the underlying search. If ``additional``
                contains ``"similarity_score"``, each document's score is stored
                in ``doc.metadata["score"]``.

        Returns:
            List of documents most similar to the query.
        """
        additional = kwargs.get("additional")
        docs_and_scores = self.similarity_search_with_score(
            query,
            k=k,
            pre_filter=pre_filter,
            post_filter_pipeline=post_filter_pipeline,
            **kwargs,
        )

        if additional and "similarity_score" in additional:
            for doc, score in docs_and_scores:
                doc.metadata["score"] = score
        return [doc for doc, _ in docs_and_scores]

    def max_marginal_relevance_search(
        self,
        query: str,
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        pre_filter: Optional[Dict] = None,
        post_filter_pipeline: Optional[List[Dict]] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return documents selected using the maximal marginal relevance.

        Maximal marginal relevance optimizes for similarity to query AND diversity
        among selected documents.

        Args:
            query: Text to look up documents similar to.
            k: (Optional) number of documents to return. Defaults to 4.
            fetch_k: (Optional) number of documents to fetch before passing to MMR
                algorithm. Defaults to 20.
            lambda_mult: Number between 0 and 1 that determines the degree
                of diversity among the results with 0 corresponding
                to maximum diversity and 1 to minimum diversity.
                Defaults to 0.5.
            pre_filter: (Optional) dictionary of argument(s) to prefilter on document
                fields.
            post_filter_pipeline: (Optional) pipeline of MongoDB aggregation stages
                following the vectorSearch stage.
        Returns:
            List of documents selected by maximal marginal relevance.
        """
        query_embedding = self._embedding.embed_query(query)
        return self.max_marginal_relevance_search_by_vector(
            embedding=query_embedding,
            k=k,
            fetch_k=fetch_k,
            lambda_mult=lambda_mult,
            pre_filter=pre_filter,
            post_filter_pipeline=post_filter_pipeline,
            **kwargs,
        )

    @classmethod
    def from_texts(
        cls,
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[Dict]] = None,
        collection: Optional[Collection[MongoDBDocumentType]] = None,
        **kwargs: Any,
    ) -> MongoDBAtlasVectorSearch:
        """Construct a `MongoDB Atlas Vector Search` vector store from raw documents.

        This is a user-friendly interface that:
            1. Embeds documents.
            2. Adds the documents to a provided MongoDB Atlas Vector Search index
                (Lucene)

        This is intended to be a quick way to get started.

        Raises:
            ValueError: If ``collection`` is not provided.

        Example:
            .. code-block:: python
                from pymongo import MongoClient

                from langchain_mongodb import MongoDBAtlasVectorSearch
                from langchain_openai import OpenAIEmbeddings

                mongo_client = MongoClient("<YOUR-CONNECTION-STRING>")
                collection = mongo_client["<db_name>"]["<collection_name>"]
                embeddings = OpenAIEmbeddings()
                vectorstore = MongoDBAtlasVectorSearch.from_texts(
                    texts,
                    embeddings,
                    metadatas=metadatas,
                    collection=collection
                )
        """
        if collection is None:
            raise ValueError("Must provide 'collection' named parameter.")
        vectorstore = cls(collection, embedding, **kwargs)
        vectorstore.add_texts(texts, metadatas=metadatas)
        return vectorstore

    def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
        """Delete by ObjectId or other criteria.

        Args:
            ids: List of ``_id`` values to delete. Strings that are valid
                24-character hex ObjectIds match documents whose ``_id`` is
                either that string or the corresponding ObjectId.
            **kwargs: Extra filter criteria merged into the ``delete_many``
                filter document. A key given here overrides the ``_id`` filter.

        Returns:
            Optional[bool]: True if deletion is successful,
                False otherwise, None if not implemented.

        Warning:
            With no ``ids`` and no ``kwargs`` the filter is ``{}``, which
            deletes every document in the collection.
        """
        search_params: Dict[str, Any] = {}
        if ids:
            # ``add_texts`` returns ``insert_many(...).inserted_ids``, which are
            # ObjectIds unless the metadata supplied an ``_id``. Callers often
            # pass their string form, so match both the raw values and, for
            # valid hex strings, the ObjectId conversion.
            candidates: List[Any] = list(ids)
            candidates.extend(
                ObjectId(id_)
                for id_ in ids
                if isinstance(id_, str) and ObjectId.is_valid(id_)
            )
            search_params["_id"] = {"$in": candidates}

        return self._collection.delete_many({**search_params, **kwargs}).acknowledged

    async def adelete(
        self, ids: Optional[List[str]] = None, **kwargs: Any
    ) -> Optional[bool]:
        """Delete by vector ID or other criteria.

        Runs :meth:`delete` in an executor.

        Args:
            ids: List of ids to delete.
            **kwargs: Other keyword arguments that subclasses might use.

        Returns:
            Optional[bool]: True if deletion is successful,
            False otherwise, None if not implemented.
        """
        return await run_in_executor(None, self.delete, ids=ids, **kwargs)

    def max_marginal_relevance_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        pre_filter: Optional[Dict] = None,
        post_filter_pipeline: Optional[List[Dict]] = None,
        **kwargs: Any,
    ) -> List[Document]:  # type: ignore
        """Return docs selected using the maximal marginal relevance.

        Maximal marginal relevance optimizes for similarity to query AND diversity
        among selected documents.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            fetch_k: Number of Documents to fetch to pass to MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree
                of diversity among the results with 0 corresponding
                to maximum diversity and 1 to minimum diversity.
                Defaults to 0.5.
            pre_filter: (Optional) dictionary of argument(s) to prefilter on document
                fields.
            post_filter_pipeline: (Optional) pipeline of MongoDB aggregation stages
                following the vectorSearch stage.
        Returns:
            List of Documents selected by maximal marginal relevance. Their
            metadata still contains the stored embedding under
            ``embedding_key``, because it is fetched for re-ranking.
        """
        # MMR re-ranks on the stored vectors, so they must be fetched. Passing
        # include_embedding=False makes the metadata lookup below fail.
        docs = self._similarity_search_with_score(
            embedding,
            k=fetch_k,
            pre_filter=pre_filter,
            post_filter_pipeline=post_filter_pipeline,
            include_embedding=kwargs.pop("include_embedding", True),
            **kwargs,
        )
        mmr_doc_indexes = maximal_marginal_relevance(
            np.array(embedding),
            [doc.metadata[self._embedding_key] for doc, _ in docs],
            k=k,
            lambda_mult=lambda_mult,
        )
        return [docs[i][0] for i in mmr_doc_indexes]

    async def amax_marginal_relevance_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.

        Runs :meth:`max_marginal_relevance_search_by_vector` in an executor.
        """
        return await run_in_executor(
            None,
            self.max_marginal_relevance_search_by_vector,
            embedding,
            k=k,
            fetch_k=fetch_k,
            lambda_mult=lambda_mult,
            **kwargs,
        )

    def create_vector_search_index(
        self,
        dimensions: int,
        filters: Optional[List[Dict[str, str]]] = None,
        update: bool = False,
    ) -> None:
        """Creates a MongoDB Atlas vectorSearch index for the VectorStore

        Note: This method may fail as it requires a MongoDB Atlas with
        these pre-requisites:
            - M10 cluster or higher
            - https://www.mongodb.com/docs/atlas/atlas-vector-search/create-index/#prerequisites

        Args:
            dimensions (int): Number of dimensions in embedding
            filters (Optional[List[Dict[str, str]]], optional): additional filters
            for index definition.
                Defaults to None.
            update (bool, optional): Updates existing vectorSearch index.
                Defaults to False.
        """
        # Ensure the collection exists before indexing it. CollectionInvalid is
        # deliberately ignored (raised when the collection already exists).
        try:
            self._collection.database.create_collection(self._collection.name)
        except CollectionInvalid:
            pass

        index_operation = (
            update_vector_search_index if update else create_vector_search_index
        )

        index_operation(
            collection=self._collection,
            index_name=self._index_name,
            dimensions=dimensions,
            path=self._embedding_key,
            similarity=self._relevance_score_fn,
            filters=filters or [],
        )