langchain[minor], community[minor]: add CrossEncoderReranker with HuggingFaceCrossEncoder and SagemakerEndpointCrossEncoder (#13687)

- **Description:** Support reranking based on cross encoder models
available from HuggingFace.
      - Added `CrossEncoder` schema
- Implemented `HuggingFaceCrossEncoder` and
`SagemakerEndpointCrossEncoder`
- Implemented `CrossEncoderReranker` that performs similar functionality
to `CohereRerank`
- Added `cross-encoder-reranker.ipynb` to demonstrate how to use it.
Please let me know if anything else needs to be done to make it visible
on the table-of-contents navigation bar on the left, or on the card list
on [retrievers documentation
page](https://python.langchain.com/docs/integrations/retrievers).
  - **Issue:** N/A
  - **Dependencies:** None other than the existing ones.

---------

Co-authored-by: Kenny Choe <kchoe@amazon.com>
Co-authored-by: Bagatur <baskaryan@gmail.com>
This commit is contained in:
Kenneth Choe
2024-03-31 15:51:31 -05:00
committed by GitHub
parent 3f7da03dd8
commit f98d7f7494
11 changed files with 660 additions and 0 deletions

View File

@@ -0,0 +1,30 @@
"""**Cross encoders** are wrappers around cross encoder models from different APIs and
services.

**Cross encoder models** can be LLMs or not.

**Class hierarchy:**

.. code-block::

    BaseCrossEncoder --> <name>CrossEncoder  # Examples: SagemakerEndpointCrossEncoder
"""
import logging
from langchain_community.cross_encoders.base import BaseCrossEncoder
from langchain_community.cross_encoders.fake import FakeCrossEncoder
from langchain_community.cross_encoders.huggingface import HuggingFaceCrossEncoder
from langchain_community.cross_encoders.sagemaker_endpoint import (
SagemakerEndpointCrossEncoder,
)
# Module-level logger, named after this package via ``__name__``.
logger = logging.getLogger(__name__)
# Names exported by ``from langchain_community.cross_encoders import *``.
__all__ = [
"BaseCrossEncoder",
"FakeCrossEncoder",
"HuggingFaceCrossEncoder",
"SagemakerEndpointCrossEncoder",
]

View File

@@ -0,0 +1,17 @@
from abc import ABC, abstractmethod
from typing import List, Tuple
class BaseCrossEncoder(ABC):
    """Abstract interface that every cross encoder model implements."""

    @abstractmethod
    def score(self, text_pairs: List[Tuple[str, str]]) -> List[float]:
        """Compute similarity scores for pairs of texts.

        Args:
            text_pairs: The pairs of texts to compare.

        Returns:
            The scores, as a list of floats.
        """

View File

@@ -0,0 +1,18 @@
from difflib import SequenceMatcher
from typing import List, Tuple
from langchain_core.pydantic_v1 import BaseModel
from langchain_community.cross_encoders.base import BaseCrossEncoder
class FakeCrossEncoder(BaseCrossEncoder, BaseModel):
    """Fake cross encoder that scores pairs by plain string similarity."""

    def score(self, text_pairs: List[Tuple[str, str]]) -> List[float]:
        """Return the ``difflib.SequenceMatcher`` ratio of each text pair.

        Args:
            text_pairs: Pairs of texts; the first two items of each are compared.

        Returns:
            One ratio per pair, in input order.
        """
        return [
            SequenceMatcher(None, pair[0], pair[1]).ratio() for pair in text_pairs
        ]

View File

@@ -0,0 +1,63 @@
from typing import Any, Dict, List, Tuple
from langchain_core.pydantic_v1 import BaseModel, Extra, Field
from langchain_community.cross_encoders.base import BaseCrossEncoder
DEFAULT_MODEL_NAME = "BAAI/bge-reranker-base"


class HuggingFaceCrossEncoder(BaseModel, BaseCrossEncoder):
    """HuggingFace cross encoder models.

    Example:
        .. code-block:: python

            from langchain_community.cross_encoders import HuggingFaceCrossEncoder

            model_name = "BAAI/bge-reranker-base"
            model_kwargs = {'device': 'cpu'}
            hf = HuggingFaceCrossEncoder(
                model_name=model_name,
                model_kwargs=model_kwargs
            )
    """

    client: Any  #: :meta private:
    model_name: str = DEFAULT_MODEL_NAME
    """Model name to use."""
    model_kwargs: Dict[str, Any] = Field(default_factory=dict)
    """Keyword arguments to pass to the model."""

    def __init__(self, **kwargs: Any):
        """Validate the fields, then build the ``sentence_transformers`` client.

        Raises:
            ImportError: If the ``sentence_transformers`` package is missing.
        """
        super().__init__(**kwargs)
        try:
            import sentence_transformers
        except ImportError as exc:
            raise ImportError(
                "Could not import sentence_transformers python package. "
                "Please install it with `pip install sentence-transformers`."
            ) from exc

        self.client = sentence_transformers.CrossEncoder(
            self.model_name, **self.model_kwargs
        )

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid

    def score(self, text_pairs: List[Tuple[str, str]]) -> List[float]:
        """Compute similarity scores using a HuggingFace transformer model.

        Args:
            text_pairs: The list of text pairs to score the similarity of.

        Returns:
            List of scores, one for each pair. An empty input yields an
            empty list without calling the model.
        """
        # Nothing to score: skip the model call instead of handing the
        # underlying client an empty batch.
        if len(text_pairs) == 0:
            return []

        scores = self.client.predict(text_pairs)
        # ``predict`` may hand back an array-like object rather than a list;
        # convert it so the return value matches the declared ``List[float]``.
        if hasattr(scores, "tolist"):
            return scores.tolist()
        return list(scores)

View File

@@ -0,0 +1,151 @@
import json
from typing import Any, Dict, List, Optional, Tuple
from langchain_core.pydantic_v1 import BaseModel, Extra, root_validator
from langchain_community.cross_encoders.base import BaseCrossEncoder
class CrossEncoderContentHandler:
    """Content handler for CrossEncoder class.

    Builds JSON request bodies and extracts the scores from JSON responses.
    """

    content_type = "application/json"
    accepts = "application/json"

    def transform_input(self, text_pairs: List[Tuple[str, str]]) -> bytes:
        """Encode the pairs as a UTF-8 JSON body under the ``text_pairs`` key."""
        payload = {"text_pairs": text_pairs}
        return json.dumps(payload).encode("utf-8")

    def transform_output(self, output: Any) -> List[float]:
        """Read ``output``, parse it as UTF-8 JSON and return its ``scores``."""
        raw_body = output.read()
        parsed = json.loads(raw_body.decode("utf-8"))
        return parsed["scores"]
class SagemakerEndpointCrossEncoder(BaseModel, BaseCrossEncoder):
    """SageMaker Inference CrossEncoder endpoint.

    To use, you must supply the endpoint name from your deployed
    Sagemaker model & the region where it is deployed.

    To authenticate, the AWS client uses the following methods to
    automatically load credentials:
    https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html

    If a specific credential profile should be used, you must pass
    the name of the profile from the ~/.aws/credentials file that is to be used.

    Make sure the credentials / roles used have the required policies to
    access the Sagemaker endpoint.
    See: https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies.html

    Example:
        .. code-block:: python

            from langchain_community.cross_encoders import (
                SagemakerEndpointCrossEncoder,
            )

            endpoint_name = (
                "my-endpoint-name"
            )
            region_name = (
                "us-west-2"
            )
            credentials_profile_name = (
                "default"
            )
            se = SagemakerEndpointCrossEncoder(
                endpoint_name=endpoint_name,
                region_name=region_name,
                credentials_profile_name=credentials_profile_name
            )
    """

    client: Any  #: :meta private:

    endpoint_name: str = ""
    """The name of the endpoint from the deployed Sagemaker model.
    Must be unique within an AWS Region."""

    region_name: str = ""
    """The aws region where the Sagemaker model is deployed, eg. `us-west-2`."""

    credentials_profile_name: Optional[str] = None
    """The name of the profile in the ~/.aws/credentials or ~/.aws/config files, which
    has either access keys or role information specified.
    If not specified, the default credential profile or, if on an EC2 instance,
    credentials from IMDS will be used.
    See: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html
    """

    content_handler: CrossEncoderContentHandler = CrossEncoderContentHandler()

    # NOTE(review): nothing in this class reads ``model_kwargs``; confirm
    # whether it should be forwarded to the content handler.
    model_kwargs: Optional[Dict] = None
    """Keyword arguments to pass to the model."""

    endpoint_kwargs: Optional[Dict] = None
    """Optional attributes passed to the invoke_endpoint
    function. See `boto3`_. docs for more info.
    .. _boto3: <https://boto3.amazonaws.com/v1/documentation/api/latest/index.html>
    """

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid
        arbitrary_types_allowed = True

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that AWS credentials and the boto3 package exist in environment.

        Builds a ``sagemaker-runtime`` client from a boto3 session and stores it
        under ``values["client"]``.

        Raises:
            ImportError: If ``boto3`` cannot be imported.
            ValueError: If the session or client cannot be created.
        """
        try:
            import boto3
        except ImportError as e:
            raise ImportError(
                "Could not import boto3 python package. "
                "Please install it with `pip install boto3`."
            ) from e

        try:
            if values["credentials_profile_name"] is not None:
                session = boto3.Session(
                    profile_name=values["credentials_profile_name"]
                )
            else:
                # use default credentials
                session = boto3.Session()

            values["client"] = session.client(
                "sagemaker-runtime", region_name=values["region_name"]
            )
        except Exception as e:
            raise ValueError(
                "Could not load credentials to authenticate with AWS client. "
                "Please check that credentials in the specified "
                "profile name are valid."
            ) from e

        return values

    def score(self, text_pairs: List[Tuple[str, str]]) -> List[float]:
        """Call out to SageMaker Inference CrossEncoder endpoint.

        Args:
            text_pairs: Pairs of texts to score.

        Returns:
            The scores the content handler extracts from the endpoint
            response. An empty input yields an empty list without a request.

        Raises:
            ValueError: If invoking the endpoint fails.
        """
        # Nothing to score: avoid a network round trip for an empty batch.
        if len(text_pairs) == 0:
            return []

        _endpoint_kwargs = self.endpoint_kwargs or {}

        body = self.content_handler.transform_input(text_pairs)
        content_type = self.content_handler.content_type
        accepts = self.content_handler.accepts

        # send request
        try:
            response = self.client.invoke_endpoint(
                EndpointName=self.endpoint_name,
                Body=body,
                ContentType=content_type,
                Accept=accepts,
                **_endpoint_kwargs,
            )
        except Exception as e:
            # Chain explicitly so the original failure stays the direct cause.
            raise ValueError(f"Error raised by inference endpoint: {e}") from e

        return self.content_handler.transform_output(response["Body"])

View File

@@ -0,0 +1 @@
"""Test cross encoder integrations."""

View File

@@ -0,0 +1,22 @@
"""Test huggingface cross encoders."""
from langchain_community.cross_encoders import HuggingFaceCrossEncoder
def _assert(encoder: HuggingFaceCrossEncoder) -> None:
    """Check that scores strictly decrease across the candidate texts."""
    query = "I love you"
    texts = ["I love you", "I like you", "I don't like you", "I hate you"]
    pairs = [(query, candidate) for candidate in texts]
    output = encoder.score(pairs)
    # Compare each score with its successor, indexed over the texts.
    for idx, _ in enumerate(texts[:-1]):
        assert output[idx] > output[idx + 1]
def test_huggingface_cross_encoder() -> None:
    """Run the ordering check with the default model."""
    _assert(HuggingFaceCrossEncoder())
def test_huggingface_cross_encoder_with_designated_model_name() -> None:
    """Run the ordering check with an explicitly named model."""
    designated_model = "cross-encoder/ms-marco-MiniLM-L-6-v2"
    _assert(HuggingFaceCrossEncoder(model_name=designated_model))

View File

@@ -6,6 +6,9 @@ from langchain.retrievers.document_compressors.chain_filter import (
LLMChainFilter,
)
from langchain.retrievers.document_compressors.cohere_rerank import CohereRerank
from langchain.retrievers.document_compressors.cross_encoder_rerank import (
CrossEncoderReranker,
)
from langchain.retrievers.document_compressors.embeddings_filter import (
EmbeddingsFilter,
)
@@ -17,5 +20,6 @@ __all__ = [
"LLMChainExtractor",
"LLMChainFilter",
"CohereRerank",
"CrossEncoderReranker",
"FlashrankRerank",
]

View File

@@ -0,0 +1,47 @@
from __future__ import annotations
import operator
from typing import Optional, Sequence
from langchain_community.cross_encoders import BaseCrossEncoder
from langchain_core.callbacks import Callbacks
from langchain_core.documents import BaseDocumentCompressor, Document
from langchain_core.pydantic_v1 import Extra
class CrossEncoderReranker(BaseDocumentCompressor):
    """Document compressor that uses CrossEncoder for reranking."""

    model: BaseCrossEncoder
    """CrossEncoder model to use for scoring similarity
      between the query and documents."""
    top_n: int = 3
    """Number of documents to return."""

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid
        arbitrary_types_allowed = True

    def compress_documents(
        self,
        documents: Sequence[Document],
        query: str,
        callbacks: Optional[Callbacks] = None,
    ) -> Sequence[Document]:
        """
        Rerank documents using CrossEncoder.

        Args:
            documents: A sequence of documents to compress.
            query: The query to use for compressing the documents.
            callbacks: Callbacks to run during the compression process.
                Accepted for interface compatibility; not used here.

        Returns:
            The ``top_n`` highest-scoring documents, best first. An empty
            input yields an empty list without calling the model.
        """
        # Nothing to rerank: return early so the model is never asked to
        # score an empty batch.
        if len(documents) == 0:
            return []

        scores = self.model.score([(query, doc.page_content) for doc in documents])
        # Sort (document, score) pairs by score, highest first.
        ranked = sorted(
            zip(documents, scores), key=operator.itemgetter(1), reverse=True
        )
        return [doc for doc, _ in ranked[: self.top_n]]

View File

@@ -0,0 +1,34 @@
"""Integration test for CrossEncoderReranker."""
from typing import List
from langchain_community.cross_encoders import FakeCrossEncoder
from langchain_core.documents import Document
from langchain.retrievers.document_compressors import CrossEncoderReranker
def test_rerank() -> None:
    """The three texts closest to the query are returned, exact match first."""
    texts = ["aaa1", "bbb1", "aaa2", "bbb2", "aaa3", "bbb3"]
    docs = [Document(page_content=text) for text in texts]
    compressor = CrossEncoderReranker(model=FakeCrossEncoder())

    actual_docs = compressor.compress_documents(docs, "bbb2")
    actual = [doc.page_content for doc in actual_docs]

    expected_returned = ["bbb2", "bbb1", "bbb3"]
    expected_not_returned = ["aaa1", "aaa2", "aaa3"]
    assert all(text in actual for text in expected_returned)
    assert not any(text in actual for text in expected_not_returned)
    assert actual[0] == "bbb2"
def test_rerank_empty() -> None:
    """Compressing an empty document list yields an empty result."""
    no_docs: List[Document] = []
    reranker = CrossEncoderReranker(model=FakeCrossEncoder())
    reranked = reranker.compress_documents(no_docs, "query")
    assert len(reranked) == 0