mirror of
				https://github.com/hwchase17/langchain.git
				synced 2025-10-24 20:20:50 +00:00 
			
		
		
		
	## Description This pull-request extends the existing vector search strategies of MongoDBAtlasVectorSearch to include Hybrid (Reciprocal Rank Fusion) and Full-text via new Retrievers. There is a small breaking change in the form of the `prefilter` kwarg to search. For this, and because we have now added a great deal of features, including programmatic Index creation/deletion since 0.1.0, we plan to bump the version to 0.2.0. ### Checklist * Unit tests have been extended * formatting has been applied * One mypy error remains which will either go away in CI or be simplified. --------- Signed-off-by: Casey Clements <casey.clements@mongodb.com> Co-authored-by: Erick Friis <erick@langchain.dev>
		
			
				
	
	
		
			271 lines
		
	
	
		
			8.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			271 lines
		
	
	
		
			8.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """Search Index Commands"""
 | |
| 
 | |
| import logging
 | |
| from time import monotonic, sleep
 | |
| from typing import Any, Callable, Dict, List, Optional
 | |
| 
 | |
| from pymongo.collection import Collection
 | |
| from pymongo.errors import OperationFailure
 | |
| from pymongo.operations import SearchIndexModel
 | |
| 
 | |
# Module-level logger. It is named after the module's dotted import name so
# that it can be configured through the standard dotted logger hierarchy;
# a filesystem path (``__file__``) is not a meaningful logger name.
logger = logging.getLogger(__name__)
 | |
| 
 | |
| 
 | |
| def _search_index_error_message() -> str:
 | |
|     return (
 | |
|         "Search index operations are not currently available on shared clusters, "
 | |
|         "such as MO. They require dedicated clusters >= M10. "
 | |
|         "You may still perform vector search. "
 | |
|         "You simply must set up indexes manually. Follow the instructions here: "
 | |
|         "https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-type/"
 | |
|     )
 | |
| 
 | |
| 
 | |
| def _vector_search_index_definition(
 | |
|     dimensions: int,
 | |
|     path: str,
 | |
|     similarity: str,
 | |
|     filters: Optional[List[str]] = None,
 | |
|     **kwargs: Any,
 | |
| ) -> Dict[str, Any]:
 | |
|     # https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-type/
 | |
|     fields = [
 | |
|         {
 | |
|             "numDimensions": dimensions,
 | |
|             "path": path,
 | |
|             "similarity": similarity,
 | |
|             "type": "vector",
 | |
|         },
 | |
|     ]
 | |
|     if filters:
 | |
|         for field in filters:
 | |
|             fields.append({"type": "filter", "path": field})
 | |
|     definition = {"fields": fields}
 | |
|     definition.update(kwargs)
 | |
|     return definition
 | |
| 
 | |
| 
 | |
def create_vector_search_index(
    collection: Collection,
    index_name: str,
    dimensions: int,
    path: str,
    similarity: str,
    filters: Optional[List[str]] = None,
    *,
    wait_until_complete: Optional[float] = None,
    **kwargs: Any,
) -> None:
    """Experimental helper that creates a ``vectorSearch`` index on a collection.

    Args:
        collection (Collection): MongoDB Collection to create the index on
        index_name (str): Name given to the new index
        dimensions (int): Number of dimensions in the embedding
        path (str): Field holding the vector embedding
        similarity (str): Similarity score used for the index
        filters (List[str]): Fields/paths to index to allow filtering in
            $vectorSearch
        wait_until_complete (Optional[float]): If truthy, number of seconds to
            poll for the index to be reported ready before giving up
        kwargs: Extra keys forwarded to ``_vector_search_index_definition``,
            which merges them into the index definition.

    Raises:
        OperationFailure: If the create command fails; the original error is
            chained as the cause.
        TimeoutError: If waiting was requested and the index was not ready in
            time.
    """
    logger.info("Creating Search Index %s on %s", index_name, collection.name)

    try:
        index_definition = _vector_search_index_definition(
            dimensions=dimensions,
            path=path,
            similarity=similarity,
            filters=filters,
            **kwargs,
        )
        model = SearchIndexModel(
            definition=index_definition,
            name=index_name,
            type="vectorSearch",
        )
        result = collection.create_search_index(model)
    except OperationFailure as e:
        raise OperationFailure(_search_index_error_message()) from e

    if wait_until_complete:

        def _ready() -> bool:
            return _is_index_ready(collection, index_name)

        _wait_for_predicate(
            predicate=_ready,
            err=f"{index_name=} did not complete in {wait_until_complete}!",
            timeout=wait_until_complete,
        )
    logger.info(result)
 | |
| 
 | |
| 
 | |
def drop_vector_search_index(
    collection: Collection,
    index_name: str,
    *,
    wait_until_complete: Optional[float] = None,
) -> None:
    """Drop a search index, optionally waiting until it is gone.

    Args:
        collection (Collection): MongoDB Collection with index to be dropped
        index_name (str): Name of the MongoDB index
        wait_until_complete (Optional[float]): If truthy, number of seconds to
            wait until the named index no longer appears in the collection's
            search-index listing.

    Raises:
        OperationFailure: If the server reports ``CommandNotSupported``; the
            original error is chained as the cause.
        TimeoutError: If waiting was requested and the index is still listed
            when the timeout expires.
    """
    logger.info(
        "Dropping Search Index %s from Collection: %s", index_name, collection.name
    )
    try:
        collection.drop_search_index(index_name)
    except OperationFailure as e:
        if "CommandNotSupported" in str(e):
            raise OperationFailure(_search_index_error_message()) from e
        # Anything else most likely means a drop request is already ongoing,
        # so it is deliberately not treated as fatal; keep a trace of it.
        logger.debug("Ignoring failure while dropping %s: %s", index_name, e)
    if wait_until_complete:
        _wait_for_predicate(
            # Filter the listing by name: other search indexes on the same
            # collection must not keep this wait from completing.
            predicate=lambda: not list(collection.list_search_indexes(index_name)),
            err=f"Index {index_name} did not drop in {wait_until_complete}!",
            timeout=wait_until_complete,
        )
    logger.info("Vector Search index %s.%s dropped", collection.name, index_name)
 | |
| 
 | |
| 
 | |
def update_vector_search_index(
    collection: Collection,
    index_name: str,
    dimensions: int,
    path: str,
    similarity: str,
    filters: Optional[List[str]] = None,
    *,
    wait_until_complete: Optional[float] = None,
    **kwargs: Any,
) -> None:
    """Replace the definition of an existing search index.

    A new vector index definition is built from the arguments and sent to the
    server in place of the current one.

    Args:
        collection (Collection): MongoDB Collection owning the index
        index_name (str): Name of the index to update
        dimensions (int): Number of dimensions in the embedding
        path (str): Field holding the vector embedding
        similarity (str): Similarity score used for the index
        filters (List[str]): Fields/paths to index to allow filtering in
            $vectorSearch
        wait_until_complete (Optional[float]): If truthy, number of seconds to
            poll for the index to be reported ready before giving up
        kwargs: Extra keys forwarded to ``_vector_search_index_definition``,
            which merges them into the index definition.

    Raises:
        OperationFailure: If the update command fails; the original error is
            chained as the cause.
        TimeoutError: If waiting was requested and the index was not ready in
            time.
    """
    logger.info(
        "Updating Search Index %s from Collection: %s", index_name, collection.name
    )
    try:
        new_definition = _vector_search_index_definition(
            dimensions=dimensions,
            path=path,
            similarity=similarity,
            filters=filters,
            **kwargs,
        )
        collection.update_search_index(name=index_name, definition=new_definition)
    except OperationFailure as e:
        raise OperationFailure(_search_index_error_message()) from e

    if wait_until_complete:

        def _ready() -> bool:
            return _is_index_ready(collection, index_name)

        _wait_for_predicate(
            predicate=_ready,
            err=f"Index {index_name} update did not complete in {wait_until_complete}!",
            timeout=wait_until_complete,
        )
    logger.info("Update succeeded")
 | |
| 
 | |
| 
 | |
| def _is_index_ready(collection: Collection, index_name: str) -> bool:
 | |
|     """Check for the index name in the list of available search indexes to see if the
 | |
|     specified index is of status READY
 | |
| 
 | |
|     Args:
 | |
|         collection (Collection): MongoDB Collection to for the search indexes
 | |
|         index_name (str): Vector Search Index name
 | |
| 
 | |
|     Returns:
 | |
|         bool : True if the index is present and READY false otherwise
 | |
|     """
 | |
|     try:
 | |
|         search_indexes = collection.list_search_indexes(index_name)
 | |
|     except OperationFailure as e:
 | |
|         raise OperationFailure(_search_index_error_message()) from e
 | |
| 
 | |
|     for index in search_indexes:
 | |
|         if index["type"] == "vectorSearch" and index["status"] == "READY":
 | |
|             return True
 | |
|     return False
 | |
| 
 | |
| 
 | |
| def _wait_for_predicate(
 | |
|     predicate: Callable, err: str, timeout: float = 120, interval: float = 0.5
 | |
| ) -> None:
 | |
|     """Generic to block until the predicate returns true
 | |
| 
 | |
|     Args:
 | |
|         predicate (Callable[, bool]): A function that returns a boolean value
 | |
|         err (str): Error message to raise if nothing occurs
 | |
|         timeout (float, optional): Wait time for predicate. Defaults to TIMEOUT.
 | |
|         interval (float, optional): Interval to check predicate. Defaults to DELAY.
 | |
| 
 | |
|     Raises:
 | |
|         TimeoutError: _description_
 | |
|     """
 | |
|     start = monotonic()
 | |
|     while not predicate():
 | |
|         if monotonic() - start > timeout:
 | |
|             raise TimeoutError(err)
 | |
|         sleep(interval)
 | |
| 
 | |
| 
 | |
def create_fulltext_search_index(
    collection: Collection,
    index_name: str,
    field: str,
    *,
    wait_until_complete: Optional[float] = None,
    **kwargs: Any,
) -> None:
    """Experimental helper that creates an Atlas Search (``search``) index.

    The index uses static mappings (``dynamic`` is False) with ``field``
    mapped as type ``string``.

    Args:
        collection (Collection): MongoDB Collection to create the index on
        index_name (str): Name given to the new index
        field (str): Field to index
        wait_until_complete (Optional[float]): If truthy, number of seconds to
            poll ``_is_index_ready`` before giving up
        kwargs: Keyword arguments supplying any additional options to
            SearchIndexModel.

    Raises:
        OperationFailure: If the create command fails; the original error is
            chained as the cause.
        TimeoutError: If waiting was requested and the index was not reported
            ready in time.
    """
    logger.info("Creating Search Index %s on %s", index_name, collection.name)

    field_mappings = {field: [{"type": "string"}]}
    definition = {"mappings": {"dynamic": False, "fields": field_mappings}}

    try:
        model = SearchIndexModel(
            definition=definition,
            name=index_name,
            type="search",
            **kwargs,
        )
        result = collection.create_search_index(model)
    except OperationFailure as e:
        raise OperationFailure(_search_index_error_message()) from e

    if wait_until_complete:

        def _ready() -> bool:
            return _is_index_ready(collection, index_name)

        _wait_for_predicate(
            predicate=_ready,
            err=f"{index_name=} did not complete in {wait_until_complete}!",
            timeout=wait_until_complete,
        )
    logger.info(result)
 |