diff --git a/libs/langchain/langchain/vectorstores/pinecone.py b/libs/langchain/langchain/vectorstores/pinecone.py
index c6793b4de81..ec9fff790ab 100644
--- a/libs/langchain/langchain/vectorstores/pinecone.py
+++ b/libs/langchain/langchain/vectorstores/pinecone.py
@@ -3,15 +3,19 @@ from __future__ import annotations
 
 import logging
 import uuid
 import warnings
-from typing import Any, Callable, Iterable, List, Optional, Tuple, Union
+from typing import TYPE_CHECKING, Any, Callable, Iterable, List, Optional, Tuple, Union
 
 import numpy as np
 
 from langchain.docstore.document import Document
 from langchain.embeddings.base import Embeddings
+from langchain.utils.iter import batch_iterate
 from langchain.vectorstores.base import VectorStore
 from langchain.vectorstores.utils import DistanceStrategy, maximal_marginal_relevance
 
+if TYPE_CHECKING:
+    from pinecone import Index
+
 logger = logging.getLogger(__name__)
 
@@ -51,16 +55,16 @@ class Pinecone(VectorStore):
                 "Could not import pinecone python package. "
                 "Please install it with `pip install pinecone-client`."
             )
-        if not isinstance(index, pinecone.index.Index):
-            raise ValueError(
-                f"client should be an instance of pinecone.index.Index, "
-                f"got {type(index)}"
-            )
         if not isinstance(embedding, Embeddings):
             warnings.warn(
                 "Passing in `embedding` as a Callable is deprecated. Please pass in an"
                 " Embeddings object instead."
             )
+        if not isinstance(index, pinecone.index.Index):
+            raise ValueError(
+                f"client should be an instance of pinecone.index.Index, "
+                f"got {type(index)}"
+            )
         self._index = index
         self._embedding = embedding
         self._text_key = text_key
@@ -93,15 +97,22 @@ class Pinecone(VectorStore):
         ids: Optional[List[str]] = None,
         namespace: Optional[str] = None,
         batch_size: int = 32,
+        embedding_chunk_size: int = 1000,
         **kwargs: Any,
     ) -> List[str]:
         """Run more texts through the embeddings and add to the vectorstore.
 
+        Upsert optimization is done by chunking the embeddings and upserting them.
+        This avoids memory issues and improves throughput for HTTP-based embeddings.
+        For OpenAI embeddings, use pool_threads>4 when constructing the pinecone.Index,
+        embedding_chunk_size>1000 and batch_size~64 for best performance.
         Args:
             texts: Iterable of strings to add to the vectorstore.
             metadatas: Optional list of metadatas associated with the texts.
             ids: Optional list of ids to associate with the texts.
             namespace: Optional pinecone namespace to add the texts to.
+            batch_size: Batch size to use when adding the texts to the vectorstore.
+            embedding_chunk_size: Chunk size to use when embedding the texts.
 
         Returns:
             List of ids from adding the texts into the vectorstore.
@@ -109,18 +120,34 @@ class Pinecone(VectorStore):
         """
         if namespace is None:
             namespace = self._namespace
-        # Embed and create the documents
-        docs = []
+
+        texts = list(texts)
         ids = ids or [str(uuid.uuid4()) for _ in texts]
-        embeddings = self._embed_documents(texts)
-        for i, (text, embedding) in enumerate(zip(texts, embeddings)):
-            metadata = metadatas[i] if metadatas else {}
+        metadatas = metadatas or [{} for _ in texts]
+        for metadata, text in zip(metadatas, texts):
             metadata[self._text_key] = text
-            docs.append((ids[i], embedding, metadata))
-        # upsert to Pinecone
-        self._index.upsert(
-            vectors=docs, namespace=namespace, batch_size=batch_size, **kwargs
-        )
+
+        # Two loops: chunking avoids memory issues and optimizes HTTP-based embeddings.
+        # The first loop runs the embeddings; this benefits OpenAI embeddings.
+        # The second loop runs the pinecone upsert asynchronously.
+        for i in range(0, len(texts), embedding_chunk_size):
+            chunk_texts = texts[i : i + embedding_chunk_size]
+            chunk_ids = ids[i : i + embedding_chunk_size]
+            chunk_metadatas = metadatas[i : i + embedding_chunk_size]
+            embeddings = self._embed_documents(chunk_texts)
+            async_res = [
+                self._index.upsert(
+                    vectors=batch,
+                    namespace=namespace,
+                    async_req=True,
+                    **kwargs,
+                )
+                for batch in batch_iterate(
+                    batch_size, zip(chunk_ids, embeddings, chunk_metadatas)
+                )
+            ]
+            [res.get() for res in async_res]
+
         return ids
 
     def similarity_search_with_score(
@@ -302,6 +329,45 @@ class Pinecone(VectorStore):
             embedding, k, fetch_k, lambda_mult, filter, namespace
         )
 
+    @classmethod
+    def get_pinecone_index(
+        cls,
+        index_name: Optional[str],
+        pool_threads: int = 4,
+    ) -> Index:
+        """Return a Pinecone Index instance.
+
+        Args:
+            index_name: Name of the index to use.
+            pool_threads: Number of threads to use for index upsert.
+        Returns:
+            Pinecone Index instance."""
+
+        try:
+            import pinecone
+        except ImportError:
+            raise ValueError(
+                "Could not import pinecone python package. "
+                "Please install it with `pip install pinecone-client`."
+            )
+
+        indexes = pinecone.list_indexes()  # checks if provided index exists
+
+        if index_name in indexes:
+            index = pinecone.Index(index_name, pool_threads=pool_threads)
+        elif len(indexes) == 0:
+            raise ValueError(
+                "No active indexes found in your Pinecone project; "
+                "are you sure you're using the right Pinecone API key and environment? "
+                "Please double-check your Pinecone dashboard."
+            )
+        else:
+            raise ValueError(
+                f"Index '{index_name}' not found in your Pinecone project. "
+                f"Did you mean one of the following indexes: {', '.join(indexes)}?"
+            )
+        return index
+
     @classmethod
     def from_texts(
         cls,
@@ -311,9 +377,11 @@ class Pinecone(VectorStore):
         ids: Optional[List[str]] = None,
         batch_size: int = 32,
         text_key: str = "text",
-        index_name: Optional[str] = None,
         namespace: Optional[str] = None,
+        index_name: Optional[str] = None,
         upsert_kwargs: Optional[dict] = None,
+        pool_threads: int = 4,
+        embeddings_chunk_size: int = 1000,
         **kwargs: Any,
     ) -> Pinecone:
         """Construct Pinecone wrapper from raw documents.
@@ -324,6 +392,7 @@ class Pinecone(VectorStore):
 
         This is intended to be a quick way to get started.
 
+        The `pool_threads` setting affects the speed of the upsert operations.
         Example:
             .. code-block:: python
 
@@ -341,54 +410,19 @@ class Pinecone(VectorStore):
                 index_name="langchain-demo"
             )
         """
-        try:
-            import pinecone
-        except ImportError:
-            raise ValueError(
-                "Could not import pinecone python package. "
-                "Please install it with `pip install pinecone-client`."
-            )
+        pinecone_index = cls.get_pinecone_index(index_name, pool_threads)
+        pinecone = cls(pinecone_index, embedding, text_key, namespace, **kwargs)
 
-        indexes = pinecone.list_indexes()  # checks if provided index exists
-
-        if index_name in indexes:
-            index = pinecone.Index(index_name)
-        elif len(indexes) == 0:
-            raise ValueError(
-                "No active indexes found in your Pinecone project, "
-                "are you sure you're using the right API key and environment?"
-            )
-        else:
-            raise ValueError(
-                f"Index '{index_name}' not found in your Pinecone project. "
" - f"Did you mean one of the following indexes: {', '.join(indexes)}" - ) - - for i in range(0, len(texts), batch_size): - # set end position of batch - i_end = min(i + batch_size, len(texts)) - # get batch of texts and ids - lines_batch = texts[i:i_end] - # create ids if not provided - if ids: - ids_batch = ids[i:i_end] - else: - ids_batch = [str(uuid.uuid4()) for n in range(i, i_end)] - # create embeddings - embeds = embedding.embed_documents(lines_batch) - # prep metadata and upsert batch - if metadatas: - metadata = metadatas[i:i_end] - else: - metadata = [{} for _ in range(i, i_end)] - for j, line in enumerate(lines_batch): - metadata[j][text_key] = line - to_upsert = zip(ids_batch, embeds, metadata) - - # upsert to Pinecone - _upsert_kwargs = upsert_kwargs or {} - index.upsert(vectors=list(to_upsert), namespace=namespace, **_upsert_kwargs) - return cls(index, embedding, text_key, namespace, **kwargs) + pinecone.add_texts( + texts, + metadatas=metadatas, + ids=ids, + namespace=namespace, + batch_size=batch_size, + embedding_chunk_size=embeddings_chunk_size, + **(upsert_kwargs or {}), + ) + return pinecone @classmethod def from_existing_index( @@ -397,17 +431,11 @@ class Pinecone(VectorStore): embedding: Embeddings, text_key: str = "text", namespace: Optional[str] = None, + pool_threads: int = 4, ) -> Pinecone: """Load pinecone vectorstore from index name.""" - try: - import pinecone - except ImportError: - raise ValueError( - "Could not import pinecone python package. " - "Please install it with `pip install pinecone-client`." - ) - - return cls(pinecone.Index(index_name), embedding, text_key, namespace) + pinecone_index = cls.get_pinecone_index(index_name, pool_threads) + return cls(pinecone_index, embedding, text_key, namespace) def delete( self, diff --git a/libs/langchain/tests/integration_tests/vectorstores/test_pinecone.py b/libs/langchain/tests/integration_tests/vectorstores/test_pinecone.py index db6b6cdbd93..66fd3929b80 100644 --- a/libs/langchain/tests/integration_tests/vectorstores/test_pinecone.py +++ b/libs/langchain/tests/integration_tests/vectorstores/test_pinecone.py @@ -231,3 +231,57 @@ class TestPinecone: assert all( (1 >= score or np.isclose(score, 1)) and score >= 0 for _, score in output ) + + @pytest.mark.skipif(reason="slow to run for benchmark") + @pytest.mark.parametrize( + "pool_threads,batch_size,embeddings_chunk_size,data_multiplier", + [ + ( + 1, + 32, + 32, + 1000, + ), # simulate single threaded with embeddings_chunk_size = batch_size = 32 + ( + 1, + 32, + 1000, + 1000, + ), # simulate single threaded with embeddings_chunk_size = 1000 + ( + 4, + 32, + 1000, + 1000, + ), # simulate 4 threaded with embeddings_chunk_size = 1000 + (20, 64, 5000, 1000), + ], # simulate 20 threaded with embeddings_chunk_size = 5000 + ) + def test_from_texts_with_metadatas_benchmark( + self, + pool_threads: int, + batch_size: int, + embeddings_chunk_size: int, + data_multiplier: int, + documents: List[Document], + embedding_openai: OpenAIEmbeddings, + ) -> None: + """Test end to end construction and search.""" + + texts = [document.page_content for document in documents] * data_multiplier + uuids = [uuid.uuid4().hex for _ in range(len(texts))] + metadatas = [{"page": i} for i in range(len(texts))] + docsearch = Pinecone.from_texts( + texts, + embedding_openai, + ids=uuids, + metadatas=metadatas, + index_name=index_name, + namespace=namespace_name, + pool_threads=pool_threads, + batch_size=batch_size, + embeddings_chunk_size=embeddings_chunk_size, + ) + + query = "What did the 
+        _ = docsearch.similarity_search(query, k=1, namespace=namespace_name)
diff --git a/pyproject.toml b/pyproject.toml
index dcf5c938a81..c2d3edde5eb 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -40,4 +40,4 @@ ignore-regex = '.*(Stati Uniti|Tense=Pres).*'
 # whats is a typo but used frequently in queries so kept as is
 # aapply - async apply
 # unsecure - typo but part of API, decided to not bother for now
-ignore-words-list = 'momento,collison,ned,foor,reworkd,parth,whats,aapply,mysogyny,unsecure,damon,crate'
\ No newline at end of file
+ignore-words-list = 'momento,collison,ned,foor,reworkd,parth,whats,aapply,mysogyny,unsecure,damon,crate'
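
The heart of the change is the two-level loop in `add_texts`: the outer loop embeds `embedding_chunk_size` texts at a time, and the inner comprehension fans each chunk out to Pinecone in `batch_size`-sized batches with `async_req=True`, joining on the returned results before the next chunk is embedded. In the pinecone-client REST API, an upsert issued with `async_req=True` returns a `multiprocessing.pool.ApplyResult` backed by the index's `pool_threads` thread pool, and `.get()` blocks until that request completes (re-raising any failure). A condensed sketch of the pattern; `index` and `vectors` are hypothetical stand-ins for `self._index` and the zipped `(id, embedding, metadata)` tuples, and the batch size of 64 is illustrative:

```python
from langchain.utils.iter import batch_iterate

# Assumes `index` is a pinecone.Index built with pool_threads > 1 and
# `vectors` is an iterable of (id, values, metadata) tuples.
async_res = [
    index.upsert(vectors=batch, async_req=True)  # returns an ApplyResult immediately
    for batch in batch_iterate(64, vectors)
]
for res in async_res:
    res.get()  # block until this batch is acknowledged; re-raises upsert errors
```

Joining once per chunk caps memory at a single chunk of embeddings while still overlapping the HTTP upsert requests within that chunk.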
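The inner comprehension leans on the newly imported `langchain.utils.iter.batch_iterate` to slice the lazily consumed `zip` of ids, embeddings, and metadatas into upsert-sized lists. A rough functional equivalent, written here only to document the batching semantics assumed above (`(size, iterable)` argument order, short final batch allowed):

```python
import itertools
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batch_iterate(size: int, iterable: Iterable[T]) -> Iterator[List[T]]:
    """Lazily yield successive lists of at most `size` items from `iterable`."""
    it = iter(iterable)
    while batch := list(itertools.islice(it, size)):
        yield batch


# The zip is consumed lazily, so at most `size` tuples are materialized per call.
for batch in batch_iterate(2, zip("abc", [1, 2, 3])):
    print(batch)  # [('a', 1), ('b', 2)] then [('c', 3)]
```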
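`get_pinecone_index` centralizes the import and index-existence checks that `from_texts` and `from_existing_index` previously duplicated, and it is the point where `pool_threads` is handed to the client. A minimal usage sketch; the index name is a placeholder, and `pinecone.init` is assumed to have been called already:

```python
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import Pinecone

# "langchain-demo" is a placeholder; pool_threads sizes the thread pool
# that backs async_req=True upserts on this index.
index = Pinecone.get_pinecone_index("langchain-demo", pool_threads=8)
store = Pinecone(index, OpenAIEmbeddings(), text_key="text")
```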
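End to end, the tuning knobs this diff exposes are `pool_threads` (on `from_texts` and `from_existing_index`), `batch_size`, and `embeddings_chunk_size`. A usage sketch with placeholder credentials and index name, using values in the range the new `add_texts` docstring recommends for OpenAI embeddings:

```python
import pinecone

from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import Pinecone

# Placeholder credentials and index name.
pinecone.init(api_key="YOUR_API_KEY", environment="YOUR_ENVIRONMENT")

texts = ["doc one", "doc two", "doc three"]

vectorstore = Pinecone.from_texts(
    texts,
    OpenAIEmbeddings(),
    index_name="langchain-demo",
    namespace="my-namespace",
    pool_threads=8,  # threads backing the async upserts
    batch_size=64,  # vectors per upsert request
    embeddings_chunk_size=1000,  # texts embedded per outer-loop chunk
)
```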