mirror of
https://github.com/hwchase17/langchain.git
synced 2025-06-21 06:14:37 +00:00
## Description This pull-request extends the existing vector search strategies of MongoDBAtlasVectorSearch to include Hybrid (Reciprocal Rank Fusion) and Full-text via new Retrievers. There is a small breaking change in the form of the `prefilter` kwarg to search. For this, and because we have now added a great deal of features, including programmatic Index creation/deletion since 0.1.0, we plan to bump the version to 0.2.0. ### Checklist * Unit tests have been extended * formatting has been applied * One mypy error remains which will either go away in CI or be simplified. --------- Signed-off-by: Casey Clements <casey.clements@mongodb.com> Co-authored-by: Erick Friis <erick@langchain.dev>
271 lines
8.9 KiB
Python
271 lines
8.9 KiB
Python
"""Search Index Commands"""
|
|
|
|
import logging
|
|
from time import monotonic, sleep
|
|
from typing import Any, Callable, Dict, List, Optional
|
|
|
|
from pymongo.collection import Collection
|
|
from pymongo.errors import OperationFailure
|
|
from pymongo.operations import SearchIndexModel
|
|
|
|
# Name the logger after the module's import path (not its filesystem path) so
# it participates in the normal dotted logger hierarchy and can be configured
# through the parent package's logger.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _search_index_error_message() -> str:
    """Return the guidance text used when a search-index command fails.

    The message explains that programmatic search-index management is not
    available on shared Atlas clusters and points to the documentation for
    creating the index manually.

    Returns:
        str: Human-readable error message.
    """
    return (
        "Search index operations are not currently available on shared clusters, "
        # The shared tier is "M0" (zero), not "MO" (letter O).
        "such as M0. They require dedicated clusters >= M10. "
        "You may still perform vector search. "
        "You simply must set up indexes manually. Follow the instructions here: "
        "https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-type/"
    )
|
|
|
|
|
|
def _vector_search_index_definition(
    dimensions: int,
    path: str,
    similarity: str,
    filters: Optional[List[str]] = None,
    **kwargs: Any,
) -> Dict[str, Any]:
    """Assemble the ``definition`` document for a vector search index.

    Args:
        dimensions (int): Number of dimensions in the embedding.
        path (str): Field holding the vector embedding.
        similarity (str): Similarity function name stored in the definition.
        filters (Optional[List[str]]): Paths to add as ``filter`` fields.
            ``None`` or an empty list adds nothing.
        kwargs: Extra top-level keys merged into the definition. A key that
            collides with ``"fields"`` replaces the generated field list.

    Returns:
        Dict[str, Any]: ``{"fields": [...]}`` plus any extra keys.
    """
    # https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-type/
    vector_field: Dict[str, Any] = {
        "numDimensions": dimensions,
        "path": path,
        "similarity": similarity,
        "type": "vector",
    }
    filter_fields = [
        {"type": "filter", "path": filter_path} for filter_path in (filters or [])
    ]
    # The vector field always comes first, followed by filter fields in order.
    return {"fields": [vector_field, *filter_fields], **kwargs}
|
|
|
|
|
|
def create_vector_search_index(
    collection: Collection,
    index_name: str,
    dimensions: int,
    path: str,
    similarity: str,
    filters: Optional[List[str]] = None,
    *,
    wait_until_complete: Optional[float] = None,
    **kwargs: Any,
) -> None:
    """Experimental utility to create a vector search index on a collection.

    Args:
        collection (Collection): MongoDB Collection
        index_name (str): Name of Index
        dimensions (int): Number of dimensions in embedding
        path (str): field with vector embedding
        similarity (str): The similarity score used for the index
        filters (List[str]): Fields/paths to index to allow filtering in $vectorSearch
        wait_until_complete (Optional[float]): If provided (and non-zero), number
            of seconds to block until the index reports ready.
        kwargs: Extra keys merged into the index definition document.

    Raises:
        OperationFailure: if the create command fails; the original error is
            chained as the cause.
        TimeoutError: if waiting was requested and the index was not ready in time.
    """
    logger.info("Creating Search Index %s on %s", index_name, collection.name)

    index_definition = _vector_search_index_definition(
        dimensions=dimensions,
        path=path,
        similarity=similarity,
        filters=filters,
        **kwargs,
    )
    model = SearchIndexModel(
        definition=index_definition,
        name=index_name,
        type="vectorSearch",
    )

    try:
        result = collection.create_search_index(model)
    except OperationFailure as exc:
        raise OperationFailure(_search_index_error_message()) from exc

    if wait_until_complete:

        def _ready() -> bool:
            return _is_index_ready(collection, index_name)

        _wait_for_predicate(
            predicate=_ready,
            err=f"{index_name=} did not complete in {wait_until_complete}!",
            timeout=wait_until_complete,
        )
    logger.info(result)
|
|
|
|
|
|
def drop_vector_search_index(
    collection: Collection,
    index_name: str,
    *,
    wait_until_complete: Optional[float] = None,
) -> None:
    """Drop a search index by name.

    Args:
        collection (Collection): MongoDB Collection with index to be dropped
        index_name (str): Name of the MongoDB index
        wait_until_complete (Optional[float]): If provided (and non-zero), number
            of seconds to wait until the named index no longer appears among
            the collection's search indexes.

    Raises:
        OperationFailure: if the server reports ``CommandNotSupported``; the
            original error is chained as the cause.
        TimeoutError: if waiting was requested and the index is still listed
            once the timeout elapses.
    """
    logger.info(
        "Dropping Search Index %s from Collection: %s", index_name, collection.name
    )
    try:
        collection.drop_search_index(index_name)
    except OperationFailure as e:
        if "CommandNotSupported" in str(e):
            raise OperationFailure(_search_index_error_message()) from e
        # else this most likely means an ongoing drop request was made so skip
        logger.debug("Ignoring failure while dropping index %s: %s", index_name, e)
    if wait_until_complete:
        _wait_for_predicate(
            # Only the named index has to disappear. Other search indexes on
            # the same collection must not keep the wait from completing.
            predicate=lambda: not list(collection.list_search_indexes(index_name)),
            err=f"Index {index_name} did not drop in {wait_until_complete}!",
            timeout=wait_until_complete,
        )
    logger.info("Vector Search index %s.%s dropped", collection.name, index_name)
|
|
|
|
|
|
def update_vector_search_index(
    collection: Collection,
    index_name: str,
    dimensions: int,
    path: str,
    similarity: str,
    filters: Optional[List[str]] = None,
    *,
    wait_until_complete: Optional[float] = None,
    **kwargs: Any,
) -> None:
    """Replace the definition of an existing vector search index.

    A fresh definition is built from the arguments and sent in place of the
    current one.

    Args:
        collection (Collection): MongoDB Collection
        index_name (str): Name of Index
        dimensions (int): Number of dimensions in embedding
        path (str): field with vector embedding
        similarity (str): The similarity score used for the index.
        filters (List[str]): Fields/paths to index to allow filtering in $vectorSearch
        wait_until_complete (Optional[float]): If provided (and non-zero), number
            of seconds to block until the index reports ready.
        kwargs: Extra keys merged into the index definition document.

    Raises:
        OperationFailure: if the update command fails; the original error is
            chained as the cause.
        TimeoutError: if waiting was requested and the index was not ready in time.
    """
    logger.info(
        "Updating Search Index %s from Collection: %s", index_name, collection.name
    )

    new_definition = _vector_search_index_definition(
        dimensions=dimensions,
        path=path,
        similarity=similarity,
        filters=filters,
        **kwargs,
    )

    try:
        collection.update_search_index(name=index_name, definition=new_definition)
    except OperationFailure as exc:
        raise OperationFailure(_search_index_error_message()) from exc

    if wait_until_complete:

        def _ready() -> bool:
            return _is_index_ready(collection, index_name)

        _wait_for_predicate(
            predicate=_ready,
            err=f"Index {index_name} update did not complete in {wait_until_complete}!",
            timeout=wait_until_complete,
        )
    logger.info("Update succeeded")
|
|
|
|
|
|
def _is_index_ready(collection: Collection, index_name: str) -> bool:
    """Report whether the named search index exists and has status READY.

    The lookup is by name only, so the check applies to any search index
    type. Requiring ``type == "vectorSearch"`` here would make waits on
    full-text (``"search"``) indexes never succeed.

    Args:
        collection (Collection): MongoDB Collection to check for the search index
        index_name (str): Search index name

    Returns:
        bool: True if an index with this name is listed with status READY,
        False otherwise (including when no such index is listed).

    Raises:
        OperationFailure: if listing the search indexes fails; the original
            error is chained as the cause.
    """
    try:
        search_indexes = collection.list_search_indexes(index_name)
    except OperationFailure as e:
        raise OperationFailure(_search_index_error_message()) from e

    return any(index.get("status") == "READY" for index in search_indexes)
|
|
|
|
|
|
def _wait_for_predicate(
    predicate: Callable, err: str, timeout: float = 120, interval: float = 0.5
) -> None:
    """Block until the predicate returns a truthy value or the timeout elapses.

    The predicate is always evaluated at least once. Between evaluations the
    function sleeps for ``interval`` seconds, shortened to the time remaining
    so the total wait does not overshoot ``timeout`` by a full interval.

    Args:
        predicate (Callable[[], bool]): Zero-argument function polled for success
        err (str): Message of the TimeoutError raised on timeout
        timeout (float, optional): Maximum seconds to wait. Defaults to 120.
        interval (float, optional): Seconds between polls. Defaults to 0.5.

    Raises:
        TimeoutError: if the predicate is still falsy once ``timeout`` seconds
            have elapsed.
    """
    deadline = monotonic() + timeout
    while not predicate():
        remaining = deadline - monotonic()
        if remaining <= 0:
            raise TimeoutError(err)
        # Never sleep past the deadline; the loop then polls one last time.
        sleep(min(interval, remaining))
|
|
|
|
|
|
def create_fulltext_search_index(
    collection: Collection,
    index_name: str,
    field: str,
    *,
    wait_until_complete: Optional[float] = None,
    **kwargs: Any,
) -> None:
    """Experimental utility to create an Atlas Search (full-text) index.

    The index uses static mappings with a single string mapping for ``field``.

    Args:
        collection (Collection): MongoDB Collection
        index_name (str): Name of Index
        field (str): Field to index
        wait_until_complete (Optional[float]): If provided (and non-zero), number
            of seconds to block until the index reports ready.
        kwargs: Keyword arguments supplying any additional options to SearchIndexModel.

    Raises:
        OperationFailure: if the create command fails; the original error is
            chained as the cause.
        TimeoutError: if waiting was requested and the index was not ready in time.
    """
    logger.info("Creating Search Index %s on %s", index_name, collection.name)

    field_mappings = {field: [{"type": "string"}]}
    definition = {"mappings": {"dynamic": False, "fields": field_mappings}}
    model = SearchIndexModel(
        definition=definition,
        name=index_name,
        type="search",
        **kwargs,
    )

    try:
        result = collection.create_search_index(model)
    except OperationFailure as exc:
        raise OperationFailure(_search_index_error_message()) from exc

    if wait_until_complete:

        def _ready() -> bool:
            return _is_index_ready(collection, index_name)

        _wait_for_predicate(
            predicate=_ready,
            err=f"{index_name=} did not complete in {wait_until_complete}!",
            timeout=wait_until_complete,
        )
    logger.info(result)
|