[ElasticsearchStore] Enable custom Bulk Args (#11065)

This enables bulk args, such as `chunk_size`, to be passed down from the
ingest methods (`from_texts`, `from_documents`) to the bulk
API.

This helps alleviate timeouts that occurred when bulk-importing a large
number of documents into Elasticsearch.
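The pass-through is simple enough to sketch standalone. Below is a toy illustration, not the LangChain code: `fake_bulk` and this `add_texts` are hypothetical stand-ins for `elasticsearch.helpers.bulk` and `ElasticsearchStore.add_texts`. The point is that a smaller `chunk_size` yields more, smaller bulk requests, each less likely to time out.

```python
from typing import Any, Dict, List, Optional


def fake_bulk(client: Any, actions: List[dict], **kwargs: Any) -> int:
    # Hypothetical stand-in for elasticsearch.helpers.bulk:
    # chunk_size caps how many actions go into a single bulk request.
    chunk_size = kwargs.get("chunk_size", 500)
    return -(-len(actions) // chunk_size)  # ceil division = number of requests


def add_texts(texts: List[str], bulk_kwargs: Optional[Dict] = None) -> int:
    # The pattern this PR adds: default to {}, then forward verbatim.
    bulk_kwargs = bulk_kwargs or {}
    actions = [{"_source": {"text": t}} for t in texts]
    return fake_bulk(None, actions, **bulk_kwargs)


print(add_texts(["doc"] * 120))                                  # 1 (default chunk_size=500)
print(add_texts(["doc"] * 120, bulk_kwargs={"chunk_size": 50}))  # 3
```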

Contribution Shoutout
- @elastic

- [x] Updated Integration tests

---------

Co-authored-by: Bagatur <baskaryan@gmail.com>
Joseph McElroy 2023-09-26 20:53:50 +01:00 committed by GitHub
parent d19fd0cfae
commit 175ef0a55d
5 changed files with 161 additions and 16 deletions


@@ -18,7 +18,7 @@ Example: Run a single-node Elasticsearch instance with security disabled. This i
 #### Deploy Elasticsearch on Elastic Cloud
-Elastic Cloud is a managed Elasticsearch service. Signup for a [free trial](https://cloud.elastic.co/registration?storm=langchain-notebook).
+Elastic Cloud is a managed Elasticsearch service. Signup for a [free trial](https://cloud.elastic.co/registration?utm_source=langchain&utm_content=documentation).
 ### Install Client


@@ -44,7 +44,7 @@
    "source": [
     "There are two main ways to setup an Elasticsearch instance for use with:\n",
     "\n",
-    "1. Elastic Cloud: Elastic Cloud is a managed Elasticsearch service. Signup for a [free trial](https://cloud.elastic.co/registration?storm=langchain-notebook).\n",
+    "1. Elastic Cloud: Elastic Cloud is a managed Elasticsearch service. Signup for a [free trial](https://cloud.elastic.co/registration?utm_source=langchain&utm_content=documentation).\n",
     "\n",
     "To connect to an Elasticsearch instance that does not require\n",
     "login credentials (starting the docker instance with security enabled), pass the Elasticsearch URL and index name along with the\n",
@@ -662,7 +662,7 @@
    "id": "0960fa0a",
    "metadata": {},
    "source": [
-    "# Customise the Query\n",
+    "## Customise the Query\n",
     "With the `custom_query` parameter at search, you are able to adjust the query that is used to retrieve documents from Elasticsearch. This is useful if you want to use a more complex query, to support linear boosting of fields."
    ]
   },
@ -720,6 +720,35 @@
"print(results[0])" "print(results[0])"
] ]
}, },
{
"cell_type": "markdown",
"id": "3242fd42",
"metadata": {},
"source": [
"# FAQ\n",
"\n",
"## Question: Im getting timeout errors when indexing documents into Elasticsearch. How do I fix this?\n",
"One possible issue is your documents might take longer to index into Elasticsearch. ElasticsearchStore uses the Elasticsearch bulk API which has a few defaults that you can adjust to reduce the chance of timeout errors.\n",
"\n",
"This is also a good idea when you're using SparseVectorRetrievalStrategy.\n",
"\n",
"The defaults are:\n",
"- `chunk_size`: 500\n",
"- `max_chunk_bytes`: 100MB\n",
"\n",
"To adjust these, you can pass in the `chunk_size` and `max_chunk_bytes` parameters to the ElasticsearchStore `add_texts` method.\n",
"\n",
"```python\n",
" vector_store.add_texts(\n",
" texts,\n",
" bulk_kwargs={\n",
" \"chunk_size\": 50,\n",
" \"max_chunk_bytes\": 200000000\n",
" }\n",
" )\n",
"```"
]
},
{ {
"cell_type": "markdown", "cell_type": "markdown",
"id": "604c66ea", "id": "604c66ea",


@@ -506,7 +506,9 @@ class ElasticsearchStore(VectorStore):
         self.strategy = strategy

         if es_connection is not None:
-            self.client = es_connection
+            self.client = es_connection.options(
+                headers={"user-agent": self.get_user_agent()}
+            )
         elif es_url is not None or es_cloud_id is not None:
             self.client = ElasticsearchStore.connect_to_elasticsearch(
                 es_url=es_url,
@@ -521,6 +523,12 @@ class ElasticsearchStore(VectorStore):
                 or valid credentials for creating a new connection."""
             )

+    @staticmethod
+    def get_user_agent() -> str:
+        from langchain import __version__
+
+        return f"langchain-py-vs/{__version__}"
+
     @staticmethod
     def connect_to_elasticsearch(
         *,
@@ -557,7 +565,10 @@ class ElasticsearchStore(VectorStore):
         elif username and password:
             connection_params["basic_auth"] = (username, password)

-        es_client = elasticsearch.Elasticsearch(**connection_params)
+        es_client = elasticsearch.Elasticsearch(
+            **connection_params,
+            headers={"user-agent": ElasticsearchStore.get_user_agent()},
+        )
         try:
             es_client.info()
         except Exception as e:
@@ -791,6 +802,7 @@ class ElasticsearchStore(VectorStore):
         ids: Optional[List[str]] = None,
         refresh_indices: bool = True,
         create_index_if_not_exists: bool = True,
+        bulk_kwargs: Optional[Dict] = None,
         **kwargs: Any,
     ) -> List[str]:
         """Run more texts through the embeddings and add to the vectorstore.
@@ -803,6 +815,9 @@ class ElasticsearchStore(VectorStore):
                 after adding the texts.
             create_index_if_not_exists: Whether to create the Elasticsearch
                 index if it doesn't already exist.
+            bulk_kwargs: Additional arguments to pass to Elasticsearch bulk.
+                - chunk_size: Optional. Number of texts to add to the
+                    index at a time. Defaults to 500.

         Returns:
             List of ids from adding the texts into the vectorstore.
@@ -814,7 +829,7 @@ class ElasticsearchStore(VectorStore):
                 "Could not import elasticsearch python package. "
                 "Please install it with `pip install elasticsearch`."
             )
+        bulk_kwargs = bulk_kwargs or {}
         embeddings = []
         ids = ids or [str(uuid.uuid4()) for _ in texts]
         requests = []
@@ -866,7 +881,11 @@ class ElasticsearchStore(VectorStore):
         if len(requests) > 0:
             try:
                 success, failed = bulk(
-                    self.client, requests, stats_only=True, refresh=refresh_indices
+                    self.client,
+                    requests,
+                    stats_only=True,
+                    refresh=refresh_indices,
+                    **bulk_kwargs,
                 )
                 logger.debug(
                     f"Added {success} and failed to add {failed} texts to index"
@@ -890,6 +909,7 @@ class ElasticsearchStore(VectorStore):
         texts: List[str],
         embedding: Optional[Embeddings] = None,
         metadatas: Optional[List[Dict[str, Any]]] = None,
+        bulk_kwargs: Optional[Dict] = None,
         **kwargs: Any,
     ) -> "ElasticsearchStore":
         """Construct ElasticsearchStore wrapper from raw documents.
@@ -927,6 +947,8 @@ class ElasticsearchStore(VectorStore):
                 strategy to use. Defaults to "COSINE".
                 can be one of "COSINE",
                 "EUCLIDEAN_DISTANCE", "DOT_PRODUCT".
+            bulk_kwargs: Optional. Additional arguments to pass to
+                Elasticsearch bulk.
         """

         elasticsearchStore = ElasticsearchStore._create_cls_from_kwargs(
@@ -934,7 +956,9 @@ class ElasticsearchStore(VectorStore):
         )

         # Encode the provided texts and add them to the newly created index.
-        elasticsearchStore.add_texts(texts, metadatas=metadatas)
+        elasticsearchStore.add_texts(
+            texts, metadatas=metadatas, bulk_kwargs=bulk_kwargs
+        )

         return elasticsearchStore
@@ -985,6 +1009,7 @@ class ElasticsearchStore(VectorStore):
         cls,
         documents: List[Document],
         embedding: Optional[Embeddings] = None,
+        bulk_kwargs: Optional[Dict] = None,
         **kwargs: Any,
     ) -> "ElasticsearchStore":
         """Construct ElasticsearchStore wrapper from documents.
@@ -1018,13 +1043,15 @@ class ElasticsearchStore(VectorStore):
             vector_query_field: Optional. Name of the field
                 to store the embedding vectors in.
             query_field: Optional. Name of the field to store the texts in.
+            bulk_kwargs: Optional. Additional arguments to pass to
+                Elasticsearch bulk.
         """

         elasticsearchStore = ElasticsearchStore._create_cls_from_kwargs(
             embedding=embedding, **kwargs
         )

         # Encode the provided texts and add them to the newly created index.
-        elasticsearchStore.add_documents(documents)
+        elasticsearchStore.add_documents(documents, bulk_kwargs=bulk_kwargs)

         return elasticsearchStore


@@ -7,6 +7,7 @@ services:
       - discovery.type=single-node
       - xpack.security.enabled=false # security has been disabled, so no login or password is required.
       - xpack.security.http.ssl.enabled=false
+      - xpack.license.self_generated.type=trial
     ports:
       - "9200:9200"
     healthcheck:


@@ -1,8 +1,9 @@
 """Test ElasticSearch functionality."""
 import logging
 import os
+import re
 import uuid
-from typing import Generator, List, Union
+from typing import Any, Dict, Generator, List, Union

 import pytest
@@ -58,19 +59,20 @@ class TestElasticsearch:
         es_password = os.environ.get("ES_PASSWORD", "changeme")

         if cloud_id:
+            es = Elasticsearch(
+                cloud_id=cloud_id,
+                basic_auth=(es_username, es_password),
+            )
             yield {
                 "es_cloud_id": cloud_id,
                 "es_user": es_username,
                 "es_password": es_password,
             }
-            es = Elasticsearch(cloud_id=cloud_id, basic_auth=(es_username, es_password))
         else:
             # Running this integration test with local docker instance
+            yield {
+                "es_url": es_url,
+            }
             es = Elasticsearch(hosts=es_url)
-            yield {"es_url": es_url}

         # Clear all indexes
         index_names = es.indices.get(index="_all").keys()
@@ -92,6 +94,37 @@ class TestElasticsearch:
         except Exception:
             pass

+    @pytest.fixture(scope="function")
+    def es_client(self) -> Any:
+        # Required for in-stack inference testing (ELSER + model_id)
+        from elastic_transport import Transport
+        from elasticsearch import Elasticsearch
+
+        class CustomTransport(Transport):
+            requests = []
+
+            def perform_request(self, *args, **kwargs):  # type: ignore
+                self.requests.append(kwargs)
+                return super().perform_request(*args, **kwargs)
+
+        es_url = os.environ.get("ES_URL", "http://localhost:9200")
+        cloud_id = os.environ.get("ES_CLOUD_ID")
+        es_username = os.environ.get("ES_USERNAME", "elastic")
+        es_password = os.environ.get("ES_PASSWORD", "changeme")
+
+        if cloud_id:
+            # Running this integration test with Elastic Cloud
+            es = Elasticsearch(
+                cloud_id=cloud_id,
+                basic_auth=(es_username, es_password),
+                transport_class=CustomTransport,
+            )
+            return es
+        else:
+            # Running this integration test with local docker instance
+            es = Elasticsearch(hosts=es_url, transport_class=CustomTransport)
+            return es
+
     @pytest.fixture(scope="function")
     def index_name(self) -> str:
         """Return the index name."""
@@ -115,7 +148,6 @@ class TestElasticsearch:
             return query_body

         texts = ["foo", "bar", "baz"]
-        print(elasticsearch_connection)
         docsearch = ElasticsearchStore.from_texts(
             texts,
             FakeEmbeddings(),
@@ -131,7 +163,6 @@ class TestElasticsearch:
     ) -> None:
         """Test end to end construction and search without metadata."""
         texts = ["foo", "bar", "baz"]
-        print(elasticsearch_connection)
         docsearch = ElasticsearchStore.from_texts(
             texts,
             FakeEmbeddings(),
@@ -607,3 +638,60 @@ class TestElasticsearch:
             log_message = f"First error reason: {error_reason}"
             assert log_message in caplog.text
+
+    def test_elasticsearch_with_user_agent(
+        self, es_client: Any, index_name: str
+    ) -> None:
+        """Test to make sure the user-agent is set correctly."""
+        texts = ["foo", "bob", "baz"]
+        ElasticsearchStore.from_texts(
+            texts,
+            FakeEmbeddings(),
+            es_connection=es_client,
+            index_name=index_name,
+        )
+
+        user_agent = es_client.transport.requests[0]["headers"]["User-Agent"]
+        pattern = r"^langchain-py-vs/\d+\.\d+\.\d+$"
+        match = re.match(pattern, user_agent)
+
+        assert (
+            match is not None
+        ), f"The string '{user_agent}' does not match the expected pattern."
+
+    def test_elasticsearch_with_internal_user_agent(
+        self, elasticsearch_connection: Dict, index_name: str
+    ) -> None:
+        """Test to make sure the user-agent is set correctly."""
+        texts = ["foo"]
+        store = ElasticsearchStore.from_texts(
+            texts,
+            FakeEmbeddings(),
+            **elasticsearch_connection,
+            index_name=index_name,
+        )
+
+        user_agent = store.client._headers["User-Agent"]
+        pattern = r"^langchain-py-vs/\d+\.\d+\.\d+$"
+        match = re.match(pattern, user_agent)
+
+        assert (
+            match is not None
+        ), f"The string '{user_agent}' does not match the expected pattern."
+
+    def test_bulk_args(self, es_client: Any, index_name: str) -> None:
+        """Test to make sure the bulk arguments are passed through correctly."""
+        texts = ["foo", "bob", "baz"]
+        ElasticsearchStore.from_texts(
+            texts,
+            FakeEmbeddings(),
+            es_connection=es_client,
+            index_name=index_name,
+            bulk_kwargs={"chunk_size": 1},
+        )
+
+        # 1 for index exist, 1 for index create, 3 for index docs
+        assert len(es_client.transport.requests) == 5  # type: ignore
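The user-agent assertions in the tests above reduce to a single regex check; as a standalone sketch (the sample user-agent strings are illustrative):

```python
import re

# Same pattern the tests use: "langchain-py-vs/<major>.<minor>.<patch>"
PATTERN = r"^langchain-py-vs/\d+\.\d+\.\d+$"


def is_langchain_user_agent(ua: str) -> bool:
    # re.match anchors at the start; the trailing $ rejects any suffix,
    # so pre-release tags like "0.0.305rc1" do not match.
    return re.match(PATTERN, ua) is not None


print(is_langchain_user_agent("langchain-py-vs/0.0.305"))  # True
print(is_langchain_user_agent("elasticsearch-py/8.9.0"))   # False
```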