From 62efe1ffb9b2ffe369c294be1392de35c517bc0b Mon Sep 17 00:00:00 2001 From: Kenneth Choe Date: Thu, 19 Oct 2023 11:43:51 -0500 Subject: [PATCH] support add_embeddings for elasticsearch (#11002) - **Description:** Provide a way to use a different text for the embedding than the text stored as the page content. - For example, if you are ingesting Stack Overflow Q&As for RAG, you would want to embed the questions and return the answer(s) for the hits. With this change, consumers of langchain can implement that easily. - I noticed that a similar function was added to faiss.py in #1912 for performance reasons, but the same function can be used to achieve what I had in mind. So instead of changing the Document class to have embedding_content, I mimicked the implementation of faiss.py. - The test should provide some guidance on how to use it. It would be more intuitive to pass texts and embedding_texts as separate arguments, but I chose to use a `zip`-ed object for consistency with the faiss.py implementation. - I plan to make a similar pull request for OpenSearch. - **Issue:** N/A - **Dependencies:** None other than the existing ones. 
Co-authored-by: Bagatur --- .../langchain/vectorstores/elasticsearch.py | 189 +++++++++++------- .../langchain/langchain/vectorstores/faiss.py | 2 +- .../vectorstores/test_elasticsearch.py | 28 +++ 3 files changed, 149 insertions(+), 70 deletions(-) diff --git a/libs/langchain/langchain/vectorstores/elasticsearch.py b/libs/langchain/langchain/vectorstores/elasticsearch.py index be034547b84..e1eed36f3c9 100644 --- a/libs/langchain/langchain/vectorstores/elasticsearch.py +++ b/libs/langchain/langchain/vectorstores/elasticsearch.py @@ -866,6 +866,78 @@ class ElasticsearchStore(VectorStore): ) self.client.indices.create(index=index_name, **indexSettings) + def __add( + self, + texts: Iterable[str], + embeddings: Optional[List[List[float]]], + metadatas: Optional[List[Dict[Any, Any]]] = None, + ids: Optional[List[str]] = None, + refresh_indices: bool = True, + create_index_if_not_exists: bool = True, + bulk_kwargs: Optional[Dict] = None, + **kwargs: Any, + ) -> List[str]: + try: + from elasticsearch.helpers import BulkIndexError, bulk + except ImportError: + raise ImportError( + "Could not import elasticsearch python package. " + "Please install it with `pip install elasticsearch`." 
+ ) + bulk_kwargs = bulk_kwargs or {} + ids = ids or [str(uuid.uuid4()) for _ in texts] + requests = [] + + if create_index_if_not_exists: + if embeddings: + dims_length = len(embeddings[0]) + else: + dims_length = None + + self._create_index_if_not_exists( + index_name=self.index_name, dims_length=dims_length + ) + + for i, text in enumerate(texts): + metadata = metadatas[i] if metadatas else {} + + request = { + "_op_type": "index", + "_index": self.index_name, + self.query_field: text, + "metadata": metadata, + "_id": ids[i], + } + if embeddings: + request[self.vector_query_field] = embeddings[i] + + requests.append(request) + + if len(requests) > 0: + try: + success, failed = bulk( + self.client, + requests, + stats_only=True, + refresh=refresh_indices, + **bulk_kwargs, + ) + logger.debug( + f"Added {success} and failed to add {failed} texts to index" + ) + + logger.debug(f"added texts {ids} to index") + return ids + except BulkIndexError as e: + logger.error(f"Error adding texts: {e}") + firstError = e.errors[0].get("index", {}).get("error", {}) + logger.error(f"First error reason: {firstError.get('reason')}") + raise e + + else: + logger.debug("No texts to add to index") + return [] + def add_texts( self, texts: Iterable[str], @@ -893,86 +965,65 @@ class ElasticsearchStore(VectorStore): Returns: List of ids from adding the texts into the vectorstore. """ - try: - from elasticsearch.helpers import BulkIndexError, bulk - except ImportError: - raise ImportError( - "Could not import elasticsearch python package. " - "Please install it with `pip install elasticsearch`." - ) - bulk_kwargs = bulk_kwargs or {} - embeddings = [] - ids = ids or [str(uuid.uuid4()) for _ in texts] - requests = [] - if self.embedding is not None: # If no search_type requires inference, we use the provided # embedding function to embed the texts. 
embeddings = self.embedding.embed_documents(list(texts)) - dims_length = len(embeddings[0]) - - if create_index_if_not_exists: - self._create_index_if_not_exists( - index_name=self.index_name, dims_length=dims_length - ) - - for i, (text, vector) in enumerate(zip(texts, embeddings)): - metadata = metadatas[i] if metadatas else {} - - requests.append( - { - "_op_type": "index", - "_index": self.index_name, - self.query_field: text, - self.vector_query_field: vector, - "metadata": metadata, - "_id": ids[i], - } - ) - else: # the search_type doesn't require inference, so we don't need to # embed the texts. - if create_index_if_not_exists: - self._create_index_if_not_exists(index_name=self.index_name) + embeddings = None - for i, text in enumerate(texts): - metadata = metadatas[i] if metadatas else {} + return self.__add( + texts, + embeddings, + metadatas=metadatas, + ids=ids, + refresh_indices=refresh_indices, + create_index_if_not_exists=create_index_if_not_exists, + bulk_kwargs=bulk_kwargs, + kwargs=kwargs, + ) - requests.append( - { - "_op_type": "index", - "_index": self.index_name, - self.query_field: text, - "metadata": metadata, - "_id": ids[i], - } - ) + def add_embeddings( + self, + text_embeddings: Iterable[Tuple[str, List[float]]], + metadatas: Optional[List[dict]] = None, + ids: Optional[List[str]] = None, + refresh_indices: bool = True, + create_index_if_not_exists: bool = True, + bulk_kwargs: Optional[Dict] = None, + **kwargs: Any, + ) -> List[str]: + """Add the given texts and embeddings to the vectorstore. - if len(requests) > 0: - try: - success, failed = bulk( - self.client, - requests, - stats_only=True, - refresh=refresh_indices, - **bulk_kwargs, - ) - logger.debug( - f"Added {success} and failed to add {failed} texts to index" - ) + Args: + text_embeddings: Iterable pairs of string and embedding to + add to the vectorstore. + metadatas: Optional list of metadatas associated with the texts. + ids: Optional list of unique IDs. 
+ refresh_indices: Whether to refresh the Elasticsearch indices + after adding the texts. + create_index_if_not_exists: Whether to create the Elasticsearch + index if it doesn't already exist. + *bulk_kwargs: Additional arguments to pass to Elasticsearch bulk. + - chunk_size: Optional. Number of texts to add to the + index at a time. Defaults to 500. - logger.debug(f"added texts {ids} to index") - return ids - except BulkIndexError as e: - logger.error(f"Error adding texts: {e}") - firstError = e.errors[0].get("index", {}).get("error", {}) - logger.error(f"First error reason: {firstError.get('reason')}") - raise e - - else: - logger.debug("No texts to add to index") - return [] + Returns: + List of ids from adding the texts into the vectorstore. + """ + texts, embeddings = zip(*text_embeddings) + return self.__add( + list(texts), + list(embeddings), + metadatas=metadatas, + ids=ids, + refresh_indices=refresh_indices, + create_index_if_not_exists=create_index_if_not_exists, + bulk_kwargs=bulk_kwargs, + kwargs=kwargs, + ) @classmethod def from_texts( diff --git a/libs/langchain/langchain/vectorstores/faiss.py b/libs/langchain/langchain/vectorstores/faiss.py index 0b867d27098..53b2e8e31a6 100644 --- a/libs/langchain/langchain/vectorstores/faiss.py +++ b/libs/langchain/langchain/vectorstores/faiss.py @@ -203,7 +203,7 @@ class FAISS(VectorStore): ids: Optional[List[str]] = None, **kwargs: Any, ) -> List[str]: - """Run more texts through the embeddings and add to the vectorstore. + """Add the given texts and embeddings to the vectorstore. 
Args: text_embeddings: Iterable pairs of string and embedding to diff --git a/libs/langchain/tests/integration_tests/vectorstores/test_elasticsearch.py b/libs/langchain/tests/integration_tests/vectorstores/test_elasticsearch.py index fd549cf7f41..d0a0d7f546e 100644 --- a/libs/langchain/tests/integration_tests/vectorstores/test_elasticsearch.py +++ b/libs/langchain/tests/integration_tests/vectorstores/test_elasticsearch.py @@ -172,6 +172,34 @@ class TestElasticsearch: output = await docsearch.asimilarity_search("foo", k=1) assert output == [Document(page_content="foo")] + def test_add_embeddings( + self, elasticsearch_connection: dict, index_name: str + ) -> None: + """ + Test add_embeddings, which accepts pre-built embeddings instead of + using inference for the texts. + This allows you to separate the embeddings text and the page_content + for better proximity between user's question and embedded text. + For example, your embedding text can be a question, whereas page_content + is the answer. + """ + embeddings = ConsistentFakeEmbeddings() + text_input = ["foo1", "foo2", "foo3"] + metadatas = [{"page": i} for i in range(len(text_input))] + + """In real use case, embedding_input can be questions for each text""" + embedding_input = ["foo2", "foo3", "foo1"] + embedding_vectors = embeddings.embed_documents(embedding_input) + + docsearch = ElasticsearchStore._create_cls_from_kwargs( + embeddings, + **elasticsearch_connection, + index_name=index_name, + ) + docsearch.add_embeddings(list(zip(text_input, embedding_vectors)), metadatas) + output = docsearch.similarity_search("foo1", k=1) + assert output == [Document(page_content="foo3", metadata={"page": 2})] + def test_similarity_search_with_metadata( self, elasticsearch_connection: dict, index_name: str ) -> None: