mirror of
				https://github.com/hwchase17/langchain.git
				synced 2025-10-25 21:03:11 +00:00 
			
		
		
		
	## Description This pull-request extends the existing vector search strategies of MongoDBAtlasVectorSearch to include Hybrid (Reciprocal Rank Fusion) and Full-text via new Retrievers. There is a small breaking change in the form of the `prefilter` kwarg to search. For this, and because we have now added a great deal of features, including programmatic Index creation/deletion since 0.1.0, we plan to bump the version to 0.2.0. ### Checklist * Unit tests have been extended * formatting has been applied * One mypy error remains which will either go away in CI or be simplified. --------- Signed-off-by: Casey Clements <casey.clements@mongodb.com> Co-authored-by: Erick Friis <erick@langchain.dev>
		
			
				
	
	
		
			161 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			161 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """Aggregation pipeline components used in Atlas Full-Text, Vector, and Hybrid Search
 | |
| 
 | |
| See the following for more:
 | |
|     - `Full-Text Search <https://www.mongodb.com/docs/atlas/atlas-search/aggregation-stages/search/#mongodb-pipeline-pipe.-search>`_
 | |
|     - `MongoDB Operators <https://www.mongodb.com/docs/atlas/atlas-search/operators-and-collectors/#std-label-operators-ref>`_
 | |
|     - `Vector Search <https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-stage/>`_
 | |
|     - `Filter Example <https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-stage/#atlas-vector-search-pre-filter>`_
 | |
| """
 | |
| 
 | |
| from typing import Any, Dict, List, Optional
 | |
| 
 | |
| 
 | |
| def text_search_stage(
 | |
|     query: str,
 | |
|     search_field: str,
 | |
|     index_name: str,
 | |
|     limit: Optional[int] = None,
 | |
|     filter: Optional[Dict[str, Any]] = None,
 | |
|     include_scores: Optional[bool] = True,
 | |
|     **kwargs: Any,
 | |
| ) -> List[Dict[str, Any]]:  # noqa: E501
 | |
|     """Full-Text search using Lucene's standard (BM25) analyzer
 | |
| 
 | |
|     Args:
 | |
|         query: Input text to search for
 | |
|         search_field: Field in Collection that will be searched
 | |
|         index_name: Atlas Search Index name
 | |
|         limit: Maximum number of documents to return. Default of no limit
 | |
|         filter: Any MQL match expression comparing an indexed field
 | |
|         include_scores: Scores provide measure of relative relevance
 | |
| 
 | |
|     Returns:
 | |
|         Dictionary defining the $search stage
 | |
|     """
 | |
|     pipeline = [
 | |
|         {
 | |
|             "$search": {
 | |
|                 "index": index_name,
 | |
|                 "text": {"query": query, "path": search_field},
 | |
|             }
 | |
|         }
 | |
|     ]
 | |
|     if filter:
 | |
|         pipeline.append({"$match": filter})  # type: ignore
 | |
|     if include_scores:
 | |
|         pipeline.append({"$set": {"score": {"$meta": "searchScore"}}})
 | |
|     if limit:
 | |
|         pipeline.append({"$limit": limit})  # type: ignore
 | |
| 
 | |
|     return pipeline  # type: ignore
 | |
| 
 | |
| 
 | |
| def vector_search_stage(
 | |
|     query_vector: List[float],
 | |
|     search_field: str,
 | |
|     index_name: str,
 | |
|     top_k: int = 4,
 | |
|     filter: Optional[Dict[str, Any]] = None,
 | |
|     oversampling_factor: int = 10,
 | |
|     **kwargs: Any,
 | |
| ) -> Dict[str, Any]:  # noqa: E501
 | |
|     """Vector Search Stage without Scores.
 | |
| 
 | |
|     Scoring is applied later depending on strategy.
 | |
|     vector search includes a vectorSearchScore that is typically used.
 | |
|     hybrid uses Reciprocal Rank Fusion.
 | |
| 
 | |
|     Args:
 | |
|         query_vector: List of embedding vector
 | |
|         search_field: Field in Collection containing embedding vectors
 | |
|         index_name: Name of Atlas Vector Search Index tied to Collection
 | |
|         top_k: Number of documents to return
 | |
|         oversampling_factor: this times limit is the number of candidates
 | |
|         filter: MQL match expression comparing an indexed field.
 | |
|             Some operators are not supported.
 | |
|             See `vectorSearch filter docs <https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-stage/#atlas-vector-search-pre-filter>`_
 | |
| 
 | |
| 
 | |
|     Returns:
 | |
|         Dictionary defining the $vectorSearch
 | |
|     """
 | |
|     stage = {
 | |
|         "index": index_name,
 | |
|         "path": search_field,
 | |
|         "queryVector": query_vector,
 | |
|         "numCandidates": top_k * oversampling_factor,
 | |
|         "limit": top_k,
 | |
|     }
 | |
|     if filter:
 | |
|         stage["filter"] = filter
 | |
|     return {"$vectorSearch": stage}
 | |
| 
 | |
| 
 | |
| def combine_pipelines(
 | |
|     pipeline: List[Any], stage: List[Dict[str, Any]], collection_name: str
 | |
| ) -> None:
 | |
|     """Combines two aggregations into a single result set in-place."""
 | |
|     if pipeline:
 | |
|         pipeline.append({"$unionWith": {"coll": collection_name, "pipeline": stage}})
 | |
|     else:
 | |
|         pipeline.extend(stage)
 | |
| 
 | |
| 
 | |
| def reciprocal_rank_stage(
 | |
|     score_field: str, penalty: float = 0, **kwargs: Any
 | |
| ) -> List[Dict[str, Any]]:
 | |
|     """Stage adds Reciprocal Rank Fusion weighting.
 | |
| 
 | |
|         First, it pushes documents retrieved from previous stage
 | |
|         into a temporary sub-document. It then unwinds to establish
 | |
|         the rank to each and applies the penalty.
 | |
| 
 | |
|     Args:
 | |
|         score_field: A unique string to identify the search being ranked
 | |
|         penalty: A non-negative float.
 | |
|         extra_fields: Any fields other than text_field that one wishes to keep.
 | |
| 
 | |
|     Returns:
 | |
|         RRF score := \frac{1}{rank + penalty} with rank in [1,2,..,n]
 | |
|     """
 | |
| 
 | |
|     rrf_pipeline = [
 | |
|         {"$group": {"_id": None, "docs": {"$push": "$$ROOT"}}},
 | |
|         {"$unwind": {"path": "$docs", "includeArrayIndex": "rank"}},
 | |
|         {
 | |
|             "$addFields": {
 | |
|                 f"docs.{score_field}": {
 | |
|                     "$divide": [1.0, {"$add": ["$rank", penalty, 1]}]
 | |
|                 },
 | |
|                 "docs.rank": "$rank",
 | |
|                 "_id": "$docs._id",
 | |
|             }
 | |
|         },
 | |
|         {"$replaceRoot": {"newRoot": "$docs"}},
 | |
|     ]
 | |
| 
 | |
|     return rrf_pipeline  # type: ignore
 | |
| 
 | |
| 
 | |
| def final_hybrid_stage(
 | |
|     scores_fields: List[str], limit: int, **kwargs: Any
 | |
| ) -> List[Dict[str, Any]]:
 | |
|     """Sum weighted scores, sort, and apply limit.
 | |
| 
 | |
|     Args:
 | |
|         scores_fields: List of fields given to scores of vector and text searches
 | |
|         limit: Number of documents to return
 | |
| 
 | |
|     Returns:
 | |
|         Final aggregation stages
 | |
|     """
 | |
| 
 | |
|     return [
 | |
|         {"$group": {"_id": "$_id", "docs": {"$mergeObjects": "$$ROOT"}}},
 | |
|         {"$replaceRoot": {"newRoot": "$docs"}},
 | |
|         {"$set": {score: {"$ifNull": [f"${score}", 0]} for score in scores_fields}},
 | |
|         {"$addFields": {"score": {"$add": [f"${score}" for score in scores_fields]}}},
 | |
|         {"$sort": {"score": -1}},
 | |
|         {"$limit": limit},
 | |
|     ]
 |