langchain/libs/partners/mongodb/langchain_mongodb/index.py
Casey Clements 6e9a8b188f
mongodb: Add Hybrid and Full-Text Search Retrievers, release 0.2.0 (#25057)
## Description

This pull-request extends the existing vector search strategies of
MongoDBAtlasVectorSearch to include Hybrid (Reciprocal Rank Fusion) and
Full-text via new Retrievers.

There is a small breaking change in the form of the `prefilter` kwarg to
search. For this, and because we have now added a great deal of
features, including programmatic Index creation/deletion since 0.1.0, we
plan to bump the version to 0.2.0.

### Checklist
* Unit tests have been extended
* formatting has been applied
* One mypy error remains which will either go away in CI or be
simplified.

---------

Signed-off-by: Casey Clements <casey.clements@mongodb.com>
Co-authored-by: Erick Friis <erick@langchain.dev>
2024-08-07 20:10:29 +00:00

271 lines
8.9 KiB
Python

"""Search Index Commands"""
import logging
from time import monotonic, sleep
from typing import Any, Callable, Dict, List, Optional
from pymongo.collection import Collection
from pymongo.errors import OperationFailure
from pymongo.operations import SearchIndexModel
# Name the logger after the module's import path (e.g. "langchain_mongodb.index")
# rather than its filesystem path, so it inherits configuration applied to the
# parent "langchain_mongodb" logger.
logger = logging.getLogger(__name__)
def _search_index_error_message() -> str:
return (
"Search index operations are not currently available on shared clusters, "
"such as MO. They require dedicated clusters >= M10. "
"You may still perform vector search. "
"You simply must set up indexes manually. Follow the instructions here: "
"https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-type/"
)
def _vector_search_index_definition(
dimensions: int,
path: str,
similarity: str,
filters: Optional[List[str]] = None,
**kwargs: Any,
) -> Dict[str, Any]:
# https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-type/
fields = [
{
"numDimensions": dimensions,
"path": path,
"similarity": similarity,
"type": "vector",
},
]
if filters:
for field in filters:
fields.append({"type": "filter", "path": field})
definition = {"fields": fields}
definition.update(kwargs)
return definition
def create_vector_search_index(
    collection: Collection,
    index_name: str,
    dimensions: int,
    path: str,
    similarity: str,
    filters: Optional[List[str]] = None,
    *,
    wait_until_complete: Optional[float] = None,
    **kwargs: Any,
) -> None:
    """Experimental utility that creates an Atlas Vector Search index.

    Args:
        collection (Collection): MongoDB Collection to create the index on.
        index_name (str): Name of the index.
        dimensions (int): Number of dimensions in the embedding.
        path (str): Field holding the vector embedding.
        similarity (str): Similarity function used by the index.
        filters (List[str]): Fields/paths to index so they can be used to
            filter in $vectorSearch.
        wait_until_complete (Optional[float]): If provided, number of seconds
            to wait until the search index is ready.
        kwargs: Additional keys merged into the index definition document.

    Raises:
        OperationFailure: If the server rejects the request. The message is
            replaced with shared-cluster guidance; the original error is
            chained as ``__cause__``.
        TimeoutError: If waiting was requested and the index is not ready
            within ``wait_until_complete`` seconds.
    """
    logger.info("Creating Search Index %s on %s", index_name, collection.name)
    index_model = SearchIndexModel(
        definition=_vector_search_index_definition(
            dimensions=dimensions,
            path=path,
            similarity=similarity,
            filters=filters,
            **kwargs,
        ),
        name=index_name,
        type="vectorSearch",
    )
    try:
        created = collection.create_search_index(index_model)
    except OperationFailure as exc:
        raise OperationFailure(_search_index_error_message()) from exc

    if wait_until_complete:
        _wait_for_predicate(
            predicate=lambda: _is_index_ready(collection, index_name),
            err=f"{index_name=} did not complete in {wait_until_complete}!",
            timeout=wait_until_complete,
        )
    logger.info(created)
def drop_vector_search_index(
    collection: Collection,
    index_name: str,
    *,
    wait_until_complete: Optional[float] = None,
) -> None:
    """Drop a created search index by name.

    Args:
        collection (Collection): MongoDB Collection with index to be dropped.
        index_name (str): Name of the search index to drop.
        wait_until_complete (Optional[float]): If provided, number of seconds
            to wait until the named index no longer appears in
            ``list_search_indexes``.

    Raises:
        OperationFailure: If the server reports ``CommandNotSupported``. The
            message is replaced with shared-cluster guidance; the original
            error is chained as ``__cause__``.
        TimeoutError: If waiting was requested and the index is still listed
            after ``wait_until_complete`` seconds.
    """
    logger.info(
        "Dropping Search Index %s from Collection: %s", index_name, collection.name
    )
    try:
        collection.drop_search_index(index_name)
    except OperationFailure as e:
        if "CommandNotSupported" in str(e):
            raise OperationFailure(_search_index_error_message()) from e
        # Any other failure most likely means a drop request for this index is
        # already in progress, so continue (and optionally wait for it).
        logger.debug("Ignoring error while dropping index %s: %s", index_name, e)
    if wait_until_complete:
        _wait_for_predicate(
            # Only wait for *this* index to disappear; other search indexes on
            # the collection (e.g. a full-text index) must not block the wait.
            predicate=lambda: not list(collection.list_search_indexes(index_name)),
            err=f"Index {index_name} did not drop in {wait_until_complete}!",
            timeout=wait_until_complete,
        )
    logger.info("Vector Search index %s.%s dropped", collection.name, index_name)
def update_vector_search_index(
    collection: Collection,
    index_name: str,
    dimensions: int,
    path: str,
    similarity: str,
    filters: Optional[List[str]] = None,
    *,
    wait_until_complete: Optional[float] = None,
    **kwargs: Any,
) -> None:
    """Replace the definition of an existing vector search index.

    Args:
        collection (Collection): MongoDB Collection holding the index.
        index_name (str): Name of the index to update.
        dimensions (int): Number of dimensions in the embedding.
        path (str): Field holding the vector embedding.
        similarity (str): Similarity function used by the index.
        filters (List[str]): Fields/paths to index so they can be used to
            filter in $vectorSearch.
        wait_until_complete (Optional[float]): If provided, number of seconds
            to wait until the search index is ready.
        kwargs: Additional keys merged into the new index definition document.

    Raises:
        OperationFailure: If the server rejects the request. The message is
            replaced with shared-cluster guidance; the original error is
            chained as ``__cause__``.
        TimeoutError: If waiting was requested and the index is not ready
            within ``wait_until_complete`` seconds.
    """
    logger.info(
        "Updating Search Index %s from Collection: %s", index_name, collection.name
    )
    new_definition = _vector_search_index_definition(
        dimensions=dimensions,
        path=path,
        similarity=similarity,
        filters=filters,
        **kwargs,
    )
    try:
        collection.update_search_index(name=index_name, definition=new_definition)
    except OperationFailure as exc:
        raise OperationFailure(_search_index_error_message()) from exc

    if wait_until_complete:
        # NOTE(review): right after the update request the index may still
        # report READY for its previous definition; confirm whether this wait
        # can return before the new definition is built.
        _wait_for_predicate(
            predicate=lambda: _is_index_ready(collection, index_name),
            err=f"Index {index_name} update did not complete in {wait_until_complete}!",
            timeout=wait_until_complete,
        )
    logger.info("Update succeeded")
def _is_index_ready(collection: Collection, index_name: str) -> bool:
    """Check whether the named search index reports status READY.

    Works for both vector search (``vectorSearch``) and full-text (``search``)
    indexes, so it can be used to wait on either kind.

    Args:
        collection (Collection): MongoDB Collection to query for search indexes.
        index_name (str): Name of the search index.

    Returns:
        bool: True if the index is present and READY, False otherwise.

    Raises:
        OperationFailure: If listing search indexes fails. The message is
            replaced with shared-cluster guidance; the original error is
            chained as ``__cause__``.
    """
    try:
        # Materialize the cursor inside the try so that errors raised while
        # fetching results are translated too.
        search_indexes = list(collection.list_search_indexes(index_name))
    except OperationFailure as e:
        raise OperationFailure(_search_index_error_message()) from e
    # list_search_indexes(index_name) already filters by name. The index type
    # is deliberately not checked, so full-text indexes can become ready too.
    return any(index.get("status") == "READY" for index in search_indexes)
def _wait_for_predicate(
predicate: Callable, err: str, timeout: float = 120, interval: float = 0.5
) -> None:
"""Generic to block until the predicate returns true
Args:
predicate (Callable[, bool]): A function that returns a boolean value
err (str): Error message to raise if nothing occurs
timeout (float, optional): Wait time for predicate. Defaults to TIMEOUT.
interval (float, optional): Interval to check predicate. Defaults to DELAY.
Raises:
TimeoutError: _description_
"""
start = monotonic()
while not predicate():
if monotonic() - start > timeout:
raise TimeoutError(err)
sleep(interval)
def create_fulltext_search_index(
    collection: Collection,
    index_name: str,
    field: str,
    *,
    wait_until_complete: Optional[float] = None,
    **kwargs: Any,
) -> None:
    """Experimental utility that creates an Atlas Search (full-text) index.

    The index uses static mappings (``"dynamic": False``) and covers only
    ``field``, mapped as type ``string``.

    Args:
        collection (Collection): MongoDB Collection to create the index on.
        index_name (str): Name of the index.
        field (str): Field to index.
        wait_until_complete (Optional[float]): If provided, number of seconds
            to wait until the search index is ready.
        kwargs: Keyword arguments passed through to ``SearchIndexModel``.

    Raises:
        OperationFailure: If the server rejects the request. The message is
            replaced with shared-cluster guidance; the original error is
            chained as ``__cause__``.
        TimeoutError: If waiting was requested and ``_is_index_ready`` does
            not report the index ready within ``wait_until_complete`` seconds.
    """
    logger.info("Creating Search Index %s on %s", index_name, collection.name)
    mappings = {"dynamic": False, "fields": {field: [{"type": "string"}]}}
    index_model = SearchIndexModel(
        definition={"mappings": mappings},
        name=index_name,
        type="search",
        **kwargs,
    )
    try:
        created = collection.create_search_index(index_model)
    except OperationFailure as exc:
        raise OperationFailure(_search_index_error_message()) from exc

    if wait_until_complete:
        _wait_for_predicate(
            predicate=lambda: _is_index_ready(collection, index_name),
            err=f"{index_name=} did not complete in {wait_until_complete}!",
            timeout=wait_until_complete,
        )
    logger.info(created)