mirror of
https://github.com/hwchase17/langchain.git
synced 2025-06-28 01:19:31 +00:00
support add_embeddings for elasticsearch (#11002)
- **Description:** Provide a way to use different text for the embedding than for the stored content. - For example, if you are ingesting Stack Overflow Q&As for RAG, you would want to embed the questions and return the answer(s) for the hits. With this change, consumers of langchain can implement that easily. - I noticed that a similar function was added to faiss.py in #1912 for performance reasons, but the same function can also be used to achieve what I had in mind. So instead of changing the Document class to have embedding_content, I mimicked the implementation in faiss.py. - The test should provide some guidance on how to use it. It would be more intuitive to pass texts and embedding_texts as separate arguments, but I chose to use a `zip`-ed object for consistency with the faiss.py implementation. - I plan to make a similar pull request for OpenSearch. - **Issue:** N/A - **Dependencies:** None other than the existing ones. Co-authored-by: Bagatur <baskaryan@gmail.com>
This commit is contained in:
parent
76d3afaef0
commit
62efe1ffb9
@ -866,6 +866,78 @@ class ElasticsearchStore(VectorStore):
|
|||||||
)
|
)
|
||||||
self.client.indices.create(index=index_name, **indexSettings)
|
self.client.indices.create(index=index_name, **indexSettings)
|
||||||
|
|
||||||
|
def __add(
    self,
    texts: Iterable[str],
    embeddings: Optional[List[List[float]]],
    metadatas: Optional[List[Dict[Any, Any]]] = None,
    ids: Optional[List[str]] = None,
    refresh_indices: bool = True,
    create_index_if_not_exists: bool = True,
    bulk_kwargs: Optional[Dict] = None,
    **kwargs: Any,
) -> List[str]:
    """Index texts, with optional pre-computed embeddings, via a bulk request.

    Args:
        texts: Texts to store under ``self.query_field``. Any iterable is
            accepted; it is materialized into a list once up front.
        embeddings: Optional vectors, positionally aligned with ``texts``,
            stored under ``self.vector_query_field``. When falsy, no vector
            field is written and the index is created with
            ``dims_length=None``.
        metadatas: Optional list of metadata dicts, positionally aligned
            with ``texts``. When falsy, each document gets ``{}``.
        ids: Optional list of document ids. When falsy, a random UUID4
            string is generated for every text.
        refresh_indices: Forwarded to ``bulk`` as ``refresh``.
        create_index_if_not_exists: Whether to call
            ``self._create_index_if_not_exists`` before indexing.
        bulk_kwargs: Extra keyword arguments forwarded to ``bulk``.
        **kwargs: Accepted for signature compatibility; not used here.

    Returns:
        The ids of the indexed documents, or ``[]`` when there were no
        texts to add.

    Raises:
        ImportError: If the ``elasticsearch`` package is not installed.
        BulkIndexError: Re-raised after logging when the bulk call fails.
    """
    try:
        from elasticsearch.helpers import BulkIndexError, bulk
    except ImportError:
        raise ImportError(
            "Could not import elasticsearch python package. "
            "Please install it with `pip install elasticsearch`."
        )

    # ``texts`` is only promised to be an Iterable, yet it is walked twice
    # below (id generation, then request building). Materialize it so a
    # generator is not exhausted by the first pass, which would silently
    # index nothing.
    texts = list(texts)
    bulk_kwargs = bulk_kwargs or {}
    ids = ids or [str(uuid.uuid4()) for _ in texts]

    if create_index_if_not_exists:
        # The vector dimension is taken from the first embedding, if any.
        dims_length = len(embeddings[0]) if embeddings else None
        self._create_index_if_not_exists(
            index_name=self.index_name, dims_length=dims_length
        )

    requests = []
    for i, text in enumerate(texts):
        request = {
            "_op_type": "index",
            "_index": self.index_name,
            self.query_field: text,
            "metadata": metadatas[i] if metadatas else {},
            "_id": ids[i],
        }
        if embeddings:
            request[self.vector_query_field] = embeddings[i]
        requests.append(request)

    if not requests:
        logger.debug("No texts to add to index")
        return []

    try:
        success, failed = bulk(
            self.client,
            requests,
            stats_only=True,
            refresh=refresh_indices,
            **bulk_kwargs,
        )
    except BulkIndexError as e:
        logger.error(f"Error adding texts: {e}")
        firstError = e.errors[0].get("index", {}).get("error", {})
        logger.error(f"First error reason: {firstError.get('reason')}")
        # Bare ``raise`` re-raises the active exception with its traceback.
        raise

    logger.debug(f"Added {success} and failed to add {failed} texts to index")
    logger.debug(f"added texts {ids} to index")
    return ids
|
||||||
|
|
||||||
def add_texts(
|
def add_texts(
|
||||||
self,
|
self,
|
||||||
texts: Iterable[str],
|
texts: Iterable[str],
|
||||||
@ -893,86 +965,65 @@ class ElasticsearchStore(VectorStore):
|
|||||||
Returns:
|
Returns:
|
||||||
List of ids from adding the texts into the vectorstore.
|
List of ids from adding the texts into the vectorstore.
|
||||||
"""
|
"""
|
||||||
try:
|
|
||||||
from elasticsearch.helpers import BulkIndexError, bulk
|
|
||||||
except ImportError:
|
|
||||||
raise ImportError(
|
|
||||||
"Could not import elasticsearch python package. "
|
|
||||||
"Please install it with `pip install elasticsearch`."
|
|
||||||
)
|
|
||||||
bulk_kwargs = bulk_kwargs or {}
|
|
||||||
embeddings = []
|
|
||||||
ids = ids or [str(uuid.uuid4()) for _ in texts]
|
|
||||||
requests = []
|
|
||||||
|
|
||||||
if self.embedding is not None:
|
if self.embedding is not None:
|
||||||
# If no search_type requires inference, we use the provided
|
# If no search_type requires inference, we use the provided
|
||||||
# embedding function to embed the texts.
|
# embedding function to embed the texts.
|
||||||
embeddings = self.embedding.embed_documents(list(texts))
|
embeddings = self.embedding.embed_documents(list(texts))
|
||||||
dims_length = len(embeddings[0])
|
|
||||||
|
|
||||||
if create_index_if_not_exists:
|
|
||||||
self._create_index_if_not_exists(
|
|
||||||
index_name=self.index_name, dims_length=dims_length
|
|
||||||
)
|
|
||||||
|
|
||||||
for i, (text, vector) in enumerate(zip(texts, embeddings)):
|
|
||||||
metadata = metadatas[i] if metadatas else {}
|
|
||||||
|
|
||||||
requests.append(
|
|
||||||
{
|
|
||||||
"_op_type": "index",
|
|
||||||
"_index": self.index_name,
|
|
||||||
self.query_field: text,
|
|
||||||
self.vector_query_field: vector,
|
|
||||||
"metadata": metadata,
|
|
||||||
"_id": ids[i],
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# the search_type doesn't require inference, so we don't need to
|
# the search_type doesn't require inference, so we don't need to
|
||||||
# embed the texts.
|
# embed the texts.
|
||||||
if create_index_if_not_exists:
|
embeddings = None
|
||||||
self._create_index_if_not_exists(index_name=self.index_name)
|
|
||||||
|
|
||||||
for i, text in enumerate(texts):
|
return self.__add(
|
||||||
metadata = metadatas[i] if metadatas else {}
|
texts,
|
||||||
|
embeddings,
|
||||||
requests.append(
|
metadatas=metadatas,
|
||||||
{
|
ids=ids,
|
||||||
"_op_type": "index",
|
refresh_indices=refresh_indices,
|
||||||
"_index": self.index_name,
|
create_index_if_not_exists=create_index_if_not_exists,
|
||||||
self.query_field: text,
|
bulk_kwargs=bulk_kwargs,
|
||||||
"metadata": metadata,
|
kwargs=kwargs,
|
||||||
"_id": ids[i],
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if len(requests) > 0:
|
def add_embeddings(
|
||||||
try:
|
self,
|
||||||
success, failed = bulk(
|
text_embeddings: Iterable[Tuple[str, List[float]]],
|
||||||
self.client,
|
metadatas: Optional[List[dict]] = None,
|
||||||
requests,
|
ids: Optional[List[str]] = None,
|
||||||
stats_only=True,
|
refresh_indices: bool = True,
|
||||||
refresh=refresh_indices,
|
create_index_if_not_exists: bool = True,
|
||||||
**bulk_kwargs,
|
bulk_kwargs: Optional[Dict] = None,
|
||||||
)
|
**kwargs: Any,
|
||||||
logger.debug(
|
) -> List[str]:
|
||||||
f"Added {success} and failed to add {failed} texts to index"
|
"""Add the given texts and embeddings to the vectorstore.
|
||||||
)
|
|
||||||
|
|
||||||
logger.debug(f"added texts {ids} to index")
|
Args:
|
||||||
return ids
|
text_embeddings: Iterable pairs of string and embedding to
|
||||||
except BulkIndexError as e:
|
add to the vectorstore.
|
||||||
logger.error(f"Error adding texts: {e}")
|
metadatas: Optional list of metadatas associated with the texts.
|
||||||
firstError = e.errors[0].get("index", {}).get("error", {})
|
ids: Optional list of unique IDs.
|
||||||
logger.error(f"First error reason: {firstError.get('reason')}")
|
refresh_indices: Whether to refresh the Elasticsearch indices
|
||||||
raise e
|
after adding the texts.
|
||||||
|
create_index_if_not_exists: Whether to create the Elasticsearch
|
||||||
|
index if it doesn't already exist.
|
||||||
|
*bulk_kwargs: Additional arguments to pass to Elasticsearch bulk.
|
||||||
|
- chunk_size: Optional. Number of texts to add to the
|
||||||
|
index at a time. Defaults to 500.
|
||||||
|
|
||||||
else:
|
Returns:
|
||||||
logger.debug("No texts to add to index")
|
List of ids from adding the texts into the vectorstore.
|
||||||
return []
|
"""
|
||||||
|
texts, embeddings = zip(*text_embeddings)
|
||||||
|
return self.__add(
|
||||||
|
list(texts),
|
||||||
|
list(embeddings),
|
||||||
|
metadatas=metadatas,
|
||||||
|
ids=ids,
|
||||||
|
refresh_indices=refresh_indices,
|
||||||
|
create_index_if_not_exists=create_index_if_not_exists,
|
||||||
|
bulk_kwargs=bulk_kwargs,
|
||||||
|
kwargs=kwargs,
|
||||||
|
)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_texts(
|
def from_texts(
|
||||||
|
@ -203,7 +203,7 @@ class FAISS(VectorStore):
|
|||||||
ids: Optional[List[str]] = None,
|
ids: Optional[List[str]] = None,
|
||||||
**kwargs: Any,
|
**kwargs: Any,
|
||||||
) -> List[str]:
|
) -> List[str]:
|
||||||
"""Run more texts through the embeddings and add to the vectorstore.
|
"""Add the given texts and embeddings to the vectorstore.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
text_embeddings: Iterable pairs of string and embedding to
|
text_embeddings: Iterable pairs of string and embedding to
|
||||||
|
@ -172,6 +172,34 @@ class TestElasticsearch:
|
|||||||
output = await docsearch.asimilarity_search("foo", k=1)
|
output = await docsearch.asimilarity_search("foo", k=1)
|
||||||
assert output == [Document(page_content="foo")]
|
assert output == [Document(page_content="foo")]
|
||||||
|
|
||||||
|
def test_add_embeddings(
    self, elasticsearch_connection: dict, index_name: str
) -> None:
    """Test add_embeddings, which accepts pre-built embeddings.

    Passing pre-built embeddings instead of running inference on the texts
    lets the embedded text differ from the stored page_content, for better
    proximity between a user's question and the embedded text. For example,
    the embedded text can be a question while page_content is the answer.
    """
    embeddings = ConsistentFakeEmbeddings()
    text_input = ["foo1", "foo2", "foo3"]
    metadatas = [{"page": i} for i in range(len(text_input))]

    # In a real use case, embedding_input can be questions for each text.
    embedding_input = ["foo2", "foo3", "foo1"]
    embedding_vectors = embeddings.embed_documents(embedding_input)

    docsearch = ElasticsearchStore._create_cls_from_kwargs(
        embeddings,
        **elasticsearch_connection,
        index_name=index_name,
    )
    docsearch.add_embeddings(list(zip(text_input, embedding_vectors)), metadatas)

    # The vector built from "foo1" was paired with the text "foo3" (page 2),
    # so querying "foo1" is expected to return that document. This presumes
    # the fake embeddings map equal strings to equal vectors.
    output = docsearch.similarity_search("foo1", k=1)
    assert output == [Document(page_content="foo3", metadata={"page": 2})]
|
||||||
|
|
||||||
def test_similarity_search_with_metadata(
|
def test_similarity_search_with_metadata(
|
||||||
self, elasticsearch_connection: dict, index_name: str
|
self, elasticsearch_connection: dict, index_name: str
|
||||||
) -> None:
|
) -> None:
|
||||||
|
Loading…
Reference in New Issue
Block a user