Community: Adding bulk_size as a setable param for OpenSearchVectorSearch (#28325)

Description:
When using langchain.retrievers.parent_document_retriever.py with
vectorstore is OpenSearchVectorSearch, I found that the bulk_size param
I passed into OpenSearchVectorSearch class did not work on my
ParentDocumentRetriever.add_documents() function correctly, it will be
overwrite with int 500 the function which OpenSearchVectorSearch class
had (e.g., add_texts(), add_embeddings()...).

So I made this PR requset to fix this, thanks!

---------

Co-authored-by: Erick Friis <erick@langchain.dev>
This commit is contained in:
manukychen 2024-12-12 09:45:22 +08:00 committed by GitHub
parent 0af5ad8262
commit ba9b95cd23
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -402,6 +402,7 @@ class OpenSearchVectorSearch(VectorStore):
self.client = _get_opensearch_client(opensearch_url, **kwargs)
self.async_client = _get_async_opensearch_client(opensearch_url, **kwargs)
self.engine = kwargs.get("engine", "nmslib")
self.bulk_size = kwargs.get("bulk_size", 500)
@property
def embeddings(self) -> Embeddings:
@ -413,9 +414,10 @@ class OpenSearchVectorSearch(VectorStore):
embeddings: List[List[float]],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
bulk_size: int = 500,
bulk_size: Optional[int] = None,
**kwargs: Any,
) -> List[str]:
bulk_size = bulk_size if bulk_size is not None else self.bulk_size
_validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
index_name = kwargs.get("index_name", self.index_name)
text_field = kwargs.get("text_field", "text")
@ -454,9 +456,10 @@ class OpenSearchVectorSearch(VectorStore):
embeddings: List[List[float]],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
bulk_size: int = 500,
bulk_size: Optional[int] = None,
**kwargs: Any,
) -> List[str]:
bulk_size = bulk_size if bulk_size is not None else self.bulk_size
_validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
index_name = kwargs.get("index_name", self.index_name)
text_field = kwargs.get("text_field", "text")
@ -560,7 +563,7 @@ class OpenSearchVectorSearch(VectorStore):
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
bulk_size: int = 500,
bulk_size: Optional[int] = None,
**kwargs: Any,
) -> List[str]:
"""Run more texts through the embeddings and add to the vectorstore.
@ -582,6 +585,7 @@ class OpenSearchVectorSearch(VectorStore):
to "text".
"""
embeddings = self.embedding_function.embed_documents(list(texts))
bulk_size = bulk_size if bulk_size is not None else self.bulk_size
return self.__add(
texts,
embeddings,
@ -596,7 +600,7 @@ class OpenSearchVectorSearch(VectorStore):
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
bulk_size: int = 500,
bulk_size: Optional[int] = None,
**kwargs: Any,
) -> List[str]:
"""
@ -604,6 +608,7 @@ class OpenSearchVectorSearch(VectorStore):
and add to the vectorstore.
"""
embeddings = await self.embedding_function.aembed_documents(list(texts))
bulk_size = bulk_size if bulk_size is not None else self.bulk_size
return await self.__aadd(
texts,
embeddings,
@ -618,7 +623,7 @@ class OpenSearchVectorSearch(VectorStore):
text_embeddings: Iterable[Tuple[str, List[float]]],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
bulk_size: int = 500,
bulk_size: Optional[int] = None,
**kwargs: Any,
) -> List[str]:
"""Add the given texts and embeddings to the vectorstore.
@ -641,6 +646,7 @@ class OpenSearchVectorSearch(VectorStore):
to "text".
"""
texts, embeddings = zip(*text_embeddings)
bulk_size = bulk_size if bulk_size is not None else self.bulk_size
return self.__add(
list(texts),
list(embeddings),
@ -1085,7 +1091,7 @@ class OpenSearchVectorSearch(VectorStore):
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
bulk_size: int = 500,
bulk_size: Optional[int] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> OpenSearchVectorSearch:
@ -1134,6 +1140,7 @@ class OpenSearchVectorSearch(VectorStore):
"""
embeddings = embedding.embed_documents(texts)
bulk_size = bulk_size if bulk_size is not None else cls.bulk_size
return cls.from_embeddings(
embeddings,
texts,
@ -1150,7 +1157,7 @@ class OpenSearchVectorSearch(VectorStore):
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
bulk_size: int = 500,
bulk_size: Optional[int] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> OpenSearchVectorSearch:
@ -1199,6 +1206,7 @@ class OpenSearchVectorSearch(VectorStore):
"""
embeddings = await embedding.aembed_documents(texts)
bulk_size = bulk_size if bulk_size is not None else cls.bulk_size
return await cls.afrom_embeddings(
embeddings,
texts,
@ -1216,7 +1224,7 @@ class OpenSearchVectorSearch(VectorStore):
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
bulk_size: int = 500,
bulk_size: Optional[int] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> OpenSearchVectorSearch:
@ -1285,6 +1293,7 @@ class OpenSearchVectorSearch(VectorStore):
"max_chunk_bytes",
"is_aoss",
]
bulk_size = bulk_size if bulk_size is not None else cls.bulk_size
_validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
dim = len(embeddings[0])
# Get the index name from either from kwargs or ENV Variable
@ -1346,7 +1355,7 @@ class OpenSearchVectorSearch(VectorStore):
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
bulk_size: int = 500,
bulk_size: Optional[int] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> OpenSearchVectorSearch:
@ -1417,6 +1426,7 @@ class OpenSearchVectorSearch(VectorStore):
"max_chunk_bytes",
"is_aoss",
]
bulk_size = bulk_size if bulk_size is not None else cls.bulk_size
_validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
dim = len(embeddings[0])
# Get the index name from either from kwargs or ENV Variable