pinecone: delete from monorepo (#29889)

This now lives in https://github.com/langchain-ai/langchain-pinecone
ccurme 2025-02-19 12:55:15 -05:00 committed by GitHub
parent 6c1e21d128
commit 68b13e5172
23 changed files with 2 additions and 3734 deletions
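For downstream users the move should be import-compatible. A minimal smoke-test sketch (hypothetical, assuming the new repository keeps the exports of the deleted `__init__.py` shown below):

```python
# Hypothetical check: the package is now published from
# langchain-ai/langchain-pinecone, but the import path is unchanged.
# pip install -U langchain-pinecone
from langchain_pinecone import Pinecone, PineconeEmbeddings, PineconeVectorStore

print(PineconeVectorStore)  # same public API as the deleted modules below
```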

View File

@@ -64,8 +64,6 @@ jobs:
NOMIC_API_KEY: ${{ secrets.NOMIC_API_KEY }}
WATSONX_APIKEY: ${{ secrets.WATSONX_APIKEY }}
WATSONX_PROJECT_ID: ${{ secrets.WATSONX_PROJECT_ID }}
PINECONE_API_KEY: ${{ secrets.PINECONE_API_KEY }}
PINECONE_ENVIRONMENT: ${{ secrets.PINECONE_ENVIRONMENT }}
ASTRA_DB_API_ENDPOINT: ${{ secrets.ASTRA_DB_API_ENDPOINT }}
ASTRA_DB_APPLICATION_TOKEN: ${{ secrets.ASTRA_DB_APPLICATION_TOKEN }}
ASTRA_DB_KEYSPACE: ${{ secrets.ASTRA_DB_KEYSPACE }}

View File

@@ -297,8 +297,6 @@ jobs:
NOMIC_API_KEY: ${{ secrets.NOMIC_API_KEY }}
WATSONX_APIKEY: ${{ secrets.WATSONX_APIKEY }}
WATSONX_PROJECT_ID: ${{ secrets.WATSONX_PROJECT_ID }}
PINECONE_API_KEY: ${{ secrets.PINECONE_API_KEY }}
PINECONE_ENVIRONMENT: ${{ secrets.PINECONE_ENVIRONMENT }}
ASTRA_DB_API_ENDPOINT: ${{ secrets.ASTRA_DB_API_ENDPOINT }}
ASTRA_DB_APPLICATION_TOKEN: ${{ secrets.ASTRA_DB_APPLICATION_TOKEN }}
ASTRA_DB_KEYSPACE: ${{ secrets.ASTRA_DB_KEYSPACE }}

View File

@@ -1,21 +0,0 @@
MIT License
Copyright (c) 2023 LangChain, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -1,65 +0,0 @@
.PHONY: all format lint test tests integration_tests docker_tests help extended_tests
# Default target executed when no arguments are given to make.
all: help
.EXPORT_ALL_VARIABLES:
UV_FROZEN = true
# Define a variable for the test file path.
TEST_FILE ?= tests/unit_tests/
integration_test integration_tests: TEST_FILE = tests/integration_tests/
test tests:
uv run --group test pytest --disable-socket --allow-unix-socket $(TEST_FILE)
integration_test integration_tests:
uv run --group test --group test_integration pytest $(TEST_FILE)
test_watch:
uv run --group test ptw --snapshot-update --now . -- -vv $(TEST_FILE)
######################
# LINTING AND FORMATTING
######################
# Define a variable for Python and notebook files.
PYTHON_FILES=.
MYPY_CACHE=.mypy_cache
lint format: PYTHON_FILES=.
lint_diff format_diff: PYTHON_FILES=$(shell git diff --relative=libs/partners/pinecone --name-only --diff-filter=d master | grep -E '\.py$$|\.ipynb$$')
lint_package: PYTHON_FILES=langchain_pinecone
lint_tests: PYTHON_FILES=tests
lint_tests: MYPY_CACHE=.mypy_cache_test
lint lint_diff lint_package lint_tests:
[ "$(PYTHON_FILES)" = "" ] || uv run --all-groups ruff check $(PYTHON_FILES)
[ "$(PYTHON_FILES)" = "" ] || uv run --all-groups ruff format $(PYTHON_FILES) --diff
[ "$(PYTHON_FILES)" = "" ] || mkdir -p $(MYPY_CACHE) && uv run mypy $(PYTHON_FILES) --cache-dir $(MYPY_CACHE)
format format_diff:
[ "$(PYTHON_FILES)" = "" ] || uv run --all-groups ruff format $(PYTHON_FILES)
[ "$(PYTHON_FILES)" = "" ] || uv run --all-groups ruff check --select I --fix $(PYTHON_FILES)
spell_check:
uv run --all-groups codespell --toml pyproject.toml
spell_fix:
uv run --all-groups codespell --toml pyproject.toml -w
check_imports: $(shell find langchain_pinecone -name '*.py')
uv run --all-groups python ./scripts/check_imports.py $^
######################
# HELP
######################
help:
@echo '----'
@echo 'check_imports - check imports'
@echo 'format - run code formatters'
@echo 'lint - run linters'
@echo 'test - run unit tests'
@echo 'tests - run unit tests'
@echo 'test TEST_FILE=<test_file> - run all tests in file'

View File

@@ -1,26 +1,3 @@
# langchain-pinecone
This package has moved!
This package contains the LangChain integration with Pinecone.
## Installation
```bash
pip install -U langchain-pinecone
```
And you should configure credentials by setting the following environment variables:
- `PINECONE_API_KEY`
- `PINECONE_INDEX_NAME`
## Usage
The `PineconeVectorStore` class exposes the connection to the Pinecone vector store.
```python
from langchain_pinecone import PineconeVectorStore
embeddings = ... # use a LangChain Embeddings class
vectorstore = PineconeVectorStore(embeddings=embeddings)
```
https://github.com/langchain-ai/langchain-pinecone

View File

@@ -1,8 +0,0 @@
from langchain_pinecone.embeddings import PineconeEmbeddings
from langchain_pinecone.vectorstores import Pinecone, PineconeVectorStore
__all__ = [
"PineconeEmbeddings",
"PineconeVectorStore",
"Pinecone",
]

View File

@@ -1,78 +0,0 @@
from enum import Enum
from typing import List, Union
import numpy as np
Matrix = Union[List[List[float]], List[np.ndarray], np.ndarray]
class DistanceStrategy(str, Enum):
"""Enumerator of the Distance strategies for calculating distances
between vectors."""
EUCLIDEAN_DISTANCE = "EUCLIDEAN_DISTANCE"
MAX_INNER_PRODUCT = "MAX_INNER_PRODUCT"
COSINE = "COSINE"
def maximal_marginal_relevance(
query_embedding: np.ndarray,
embedding_list: list,
lambda_mult: float = 0.5,
k: int = 4,
) -> List[int]:
"""Calculate maximal marginal relevance."""
if min(k, len(embedding_list)) <= 0:
return []
if query_embedding.ndim == 1:
query_embedding = np.expand_dims(query_embedding, axis=0)
similarity_to_query = cosine_similarity(query_embedding, embedding_list)[0]
most_similar = int(np.argmax(similarity_to_query))
idxs = [most_similar]
selected = np.array([embedding_list[most_similar]])
while len(idxs) < min(k, len(embedding_list)):
best_score = -np.inf
idx_to_add = -1
similarity_to_selected = cosine_similarity(embedding_list, selected)
for i, query_score in enumerate(similarity_to_query):
if i in idxs:
continue
redundant_score = max(similarity_to_selected[i])
equation_score = (
lambda_mult * query_score - (1 - lambda_mult) * redundant_score
)
if equation_score > best_score:
best_score = equation_score
idx_to_add = i
idxs.append(idx_to_add)
selected = np.append(selected, [embedding_list[idx_to_add]], axis=0)
return idxs
def cosine_similarity(X: Matrix, Y: Matrix) -> np.ndarray:
"""Row-wise cosine similarity between two equal-width matrices."""
if len(X) == 0 or len(Y) == 0:
return np.array([])
X = np.array(X)
Y = np.array(Y)
if X.shape[1] != Y.shape[1]:
raise ValueError(
f"Number of columns in X and Y must be the same. X has shape {X.shape} "
f"and Y has shape {Y.shape}."
)
try:
import simsimd as simd
X = np.array(X, dtype=np.float32)
Y = np.array(Y, dtype=np.float32)
Z = 1 - np.array(simd.cdist(X, Y, metric="cosine"))
return Z
except ImportError:
X_norm = np.linalg.norm(X, axis=1)
Y_norm = np.linalg.norm(Y, axis=1)
# Ignore divide-by-zero and invalid-value runtime warnings; NaN/inf results are zeroed below.
with np.errstate(divide="ignore", invalid="ignore"):
similarity = np.dot(X, Y.T) / np.outer(X_norm, Y_norm)
similarity[np.isnan(similarity) | np.isinf(similarity)] = 0.0
return similarity
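A toy usage sketch of the two helpers above, exercising only the numpy fallback path (`simsimd` is an optional speedup); the vectors and `lambda_mult` value are illustrative:

```python
import numpy as np

from langchain_pinecone._utilities import cosine_similarity, maximal_marginal_relevance

query = np.array([1.0, 0.0], dtype=np.float32)
candidates = [
    np.array([0.9, 0.1]),  # close to the query
    np.array([0.9, 0.1]),  # exact duplicate of the first candidate
    np.array([0.1, 0.9]),  # nearly orthogonal to the query
]

print(cosine_similarity([query], candidates))  # row-wise similarities, shape (1, 3)

# With a low lambda_mult the diversity term dominates, so after picking the
# best match MMR prefers the orthogonal vector over the duplicate -> [0, 2]
print(maximal_marginal_relevance(query, candidates, lambda_mult=0.1, k=2))
```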

View File

@@ -1,186 +0,0 @@
import logging
from typing import Any, Dict, Iterable, List, Optional
import aiohttp
from langchain_core.embeddings import Embeddings
from langchain_core.utils import secret_from_env
from pinecone import Pinecone as PineconeClient # type: ignore[import-untyped]
from pydantic import (
BaseModel,
ConfigDict,
Field,
PrivateAttr,
SecretStr,
model_validator,
)
from typing_extensions import Self
logger = logging.getLogger(__name__)
DEFAULT_BATCH_SIZE = 64
class PineconeEmbeddings(BaseModel, Embeddings):
"""PineconeEmbeddings embedding model.
Example:
.. code-block:: python
from langchain_pinecone import PineconeEmbeddings
model = PineconeEmbeddings(model="multilingual-e5-large")
"""
# Clients
_client: PineconeClient = PrivateAttr(default=None)
_async_client: Optional[aiohttp.ClientSession] = PrivateAttr(default=None)
model: str
"""Model to use for example 'multilingual-e5-large'."""
# Config
batch_size: Optional[int] = None
"""Batch size for embedding documents."""
query_params: Dict = Field(default_factory=dict)
"""Parameters for embedding query."""
document_params: Dict = Field(default_factory=dict)
"""Parameters for embedding document"""
#
dimension: Optional[int] = None
#
show_progress_bar: bool = False
pinecone_api_key: SecretStr = Field(
default_factory=secret_from_env(
"PINECONE_API_KEY",
error_message="Pinecone API key not found. Please set the PINECONE_API_KEY "
"environment variable or pass it via `pinecone_api_key`.",
),
alias="api_key",
)
"""Pinecone API key.
If not provided, will look for the PINECONE_API_KEY environment variable."""
model_config = ConfigDict(
extra="forbid",
populate_by_name=True,
protected_namespaces=(),
)
@property
def async_client(self) -> aiohttp.ClientSession:
"""Lazily initialize the async client."""
if self._async_client is None:
self._async_client = aiohttp.ClientSession(
headers={
"Api-Key": self.pinecone_api_key.get_secret_value(),
"Content-Type": "application/json",
"X-Pinecone-API-Version": "2024-10",
}
)
return self._async_client
@model_validator(mode="before")
@classmethod
def set_default_config(cls, values: dict) -> Any:
"""Set default configuration based on model."""
default_config_map = {
"multilingual-e5-large": {
"batch_size": 96,
"query_params": {"input_type": "query", "truncation": "END"},
"document_params": {"input_type": "passage", "truncation": "END"},
"dimension": 1024,
}
}
model = values.get("model")
if model in default_config_map:
config = default_config_map[model]
for key, value in config.items():
if key not in values:
values[key] = value
return values
@model_validator(mode="after")
def validate_environment(self) -> Self:
"""Validate that Pinecone version and credentials exist in environment."""
api_key_str = self.pinecone_api_key.get_secret_value()
client = PineconeClient(api_key=api_key_str, source_tag="langchain")
self._client = client
# Ensure async_client is lazily initialized
return self
def _get_batch_iterator(self, texts: List[str]) -> Iterable:
if self.batch_size is None:
batch_size = DEFAULT_BATCH_SIZE
else:
batch_size = self.batch_size
if self.show_progress_bar:
try:
from tqdm.auto import tqdm # type: ignore
except ImportError as e:
raise ImportError(
"Must have tqdm installed if `show_progress_bar` is set to True. "
"Please install with `pip install tqdm`."
) from e
_iter = tqdm(range(0, len(texts), batch_size))
else:
_iter = range(0, len(texts), batch_size)
return _iter
def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""Embed search docs."""
embeddings: List[List[float]] = []
_iter = self._get_batch_iterator(texts)
for i in _iter:
response = self._client.inference.embed(
model=self.model,
parameters=self.document_params,
# batch_size may be None; mirror the fallback used in _get_batch_iterator
inputs=texts[i : i + (self.batch_size or DEFAULT_BATCH_SIZE)],
)
embeddings.extend([r["values"] for r in response])
return embeddings
async def aembed_documents(self, texts: List[str]) -> List[List[float]]:
embeddings: List[List[float]] = []
_iter = self._get_batch_iterator(texts)
for i in _iter:
response = await self._aembed_texts(
model=self.model,
parameters=self.document_params,
texts=texts[i : i + (self.batch_size or DEFAULT_BATCH_SIZE)],
)
embeddings.extend([r["values"] for r in response["data"]])
return embeddings
def embed_query(self, text: str) -> List[float]:
"""Embed query text."""
return self._client.inference.embed(
model=self.model, parameters=self.query_params, inputs=[text]
)[0]["values"]
async def aembed_query(self, text: str) -> List[float]:
"""Asynchronously embed query text."""
response = await self._aembed_texts(
model=self.model,
parameters=self.query_params,  # queries should use query_params, matching embed_query
texts=[text],
)
return response["data"][0]["values"]
async def _aembed_texts(
self, texts: List[str], model: str, parameters: dict
) -> Dict:
data = {
"model": model,
"inputs": [{"text": text} for text in texts],
"parameters": parameters,
}
async with self.async_client.post(
"https://api.pinecone.io/embed", json=data
) as response:
response_data = await response.json(content_type=None)
return response_data
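A minimal usage sketch of the deleted embeddings class, assuming a valid `PINECONE_API_KEY` in the environment (the model name matches the defaults table above):

```python
import asyncio

from langchain_pinecone import PineconeEmbeddings

# Assumes PINECONE_API_KEY is set. For this model, set_default_config above
# fills in batch_size=96, dimension=1024, and query/document parameters.
embeddings = PineconeEmbeddings(model="multilingual-e5-large")

query_vec = embeddings.embed_query("Hello, world!")  # 1024 floats
doc_vecs = embeddings.embed_documents(["doc one", "doc two"])

async def main() -> None:
    # The async path posts directly to https://api.pinecone.io/embed.
    vec = await embeddings.aembed_query("Hello, world!")
    print(len(query_vec), len(doc_vecs), len(vec))
    await embeddings.async_client.close()  # close the lazily created session

asyncio.run(main())
```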

View File

@@ -1,648 +0,0 @@
from __future__ import annotations
import logging
import os
import uuid
from typing import (
TYPE_CHECKING,
Any,
Callable,
Iterable,
List,
Optional,
Tuple,
TypeVar,
)
import numpy as np
from langchain_core._api.deprecation import deprecated
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.utils.iter import batch_iterate
from langchain_core.vectorstores import VectorStore
from pinecone import Pinecone as PineconeClient # type: ignore
from langchain_pinecone._utilities import DistanceStrategy, maximal_marginal_relevance
if TYPE_CHECKING:
from pinecone import Index
logger = logging.getLogger(__name__)
VST = TypeVar("VST", bound=VectorStore)
class PineconeVectorStore(VectorStore):
"""Pinecone vector store integration.
Setup:
Install ``langchain-pinecone`` and set the environment variable ``PINECONE_API_KEY``.
.. code-block:: bash
pip install -qU langchain-pinecone
export PINECONE_API_KEY="your-pinecone-api-key"
Key init args indexing params:
embedding: Embeddings
Embedding function to use.
Key init args client params:
index: Optional[Index]
Index to use.
# TODO: Replace with relevant init params.
Instantiate:
.. code-block:: python
import time
import os
from pinecone import Pinecone, ServerlessSpec
from langchain_pinecone import PineconeVectorStore
from langchain_openai import OpenAIEmbeddings
pc = Pinecone(api_key=os.environ.get("PINECONE_API_KEY"))
index_name = "langchain-test-index" # change if desired
existing_indexes = [index_info["name"] for index_info in pc.list_indexes()]
if index_name not in existing_indexes:
pc.create_index(
name=index_name,
dimension=1536,
metric="cosine",
spec=ServerlessSpec(cloud="aws", region="us-east-1"),
deletion_protection="enabled", # Defaults to "disabled"
)
while not pc.describe_index(index_name).status["ready"]:
time.sleep(1)
index = pc.Index(index_name)
vector_store = PineconeVectorStore(index=index, embedding=OpenAIEmbeddings())
Add Documents:
.. code-block:: python
from langchain_core.documents import Document
document_1 = Document(page_content="foo", metadata={"baz": "bar"})
document_2 = Document(page_content="thud", metadata={"bar": "baz"})
document_3 = Document(page_content="i will be deleted :(")
documents = [document_1, document_2, document_3]
ids = ["1", "2", "3"]
vector_store.add_documents(documents=documents, ids=ids)
Delete Documents:
.. code-block:: python
vector_store.delete(ids=["3"])
Search:
.. code-block:: python
results = vector_store.similarity_search(query="thud",k=1)
for doc in results:
print(f"* {doc.page_content} [{doc.metadata}]")
.. code-block:: python
* thud [{'bar': 'baz'}]
Search with filter:
.. code-block:: python
results = vector_store.similarity_search(query="thud",k=1,filter={"bar": "baz"})
for doc in results:
print(f"* {doc.page_content} [{doc.metadata}]")
.. code-block:: python
* thud [{'bar': 'baz'}]
Search with score:
.. code-block:: python
results = vector_store.similarity_search_with_score(query="qux",k=1)
for doc, score in results:
print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]")
.. code-block:: python
* [SIM=0.832268] foo [{'baz': 'bar'}]
Async:
.. code-block:: python
# add documents
# await vector_store.aadd_documents(documents=documents, ids=ids)
# delete documents
# await vector_store.adelete(ids=["3"])
# search
# results = await vector_store.asimilarity_search(query="thud", k=1)
# search with score
results = await vector_store.asimilarity_search_with_score(query="qux", k=1)
for doc, score in results:
print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]")
.. code-block:: python
* [SIM=0.832268] foo [{'baz': 'bar'}]
Use as Retriever:
.. code-block:: python
retriever = vector_store.as_retriever(
search_type="mmr",
search_kwargs={"k": 1, "fetch_k": 2, "lambda_mult": 0.5},
)
retriever.invoke("thud")
.. code-block:: python
[Document(metadata={'bar': 'baz'}, page_content='thud')]
""" # noqa: E501
def __init__(
self,
# setting default params to bypass having to pass in
# the index and embedding objects - manually throw
# exceptions if they are not passed in or set in environment
# (keeping param for backwards compatibility)
index: Optional[Any] = None,
embedding: Optional[Embeddings] = None,
text_key: Optional[str] = "text",
namespace: Optional[str] = None,
distance_strategy: Optional[DistanceStrategy] = DistanceStrategy.COSINE,
*,
pinecone_api_key: Optional[str] = None,
index_name: Optional[str] = None,
):
if embedding is None:
raise ValueError("Embedding must be provided")
self._embedding = embedding
if text_key is None:
raise ValueError("Text key must be provided")
self._text_key = text_key
self._namespace = namespace
self.distance_strategy = distance_strategy
if index:
# supports old way of initializing externally
self._index = index
else:
# all internal initialization
_pinecone_api_key = (
pinecone_api_key or os.environ.get("PINECONE_API_KEY") or ""
)
if not _pinecone_api_key:
raise ValueError(
"Pinecone API key must be provided in either `pinecone_api_key` "
"or `PINECONE_API_KEY` environment variable"
)
_index_name = index_name or os.environ.get("PINECONE_INDEX_NAME") or ""
if not _index_name:
raise ValueError(
"Pinecone index name must be provided in either `index_name` "
"or `PINECONE_INDEX_NAME` environment variable"
)
client = PineconeClient(api_key=_pinecone_api_key, source_tag="langchain")
self._index = client.Index(_index_name)
@property
def embeddings(self) -> Optional[Embeddings]:
"""Access the query embedding object if available."""
return self._embedding
def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
namespace: Optional[str] = None,
batch_size: int = 32,
embedding_chunk_size: int = 1000,
*,
async_req: bool = True,
id_prefix: Optional[str] = None,
**kwargs: Any,
) -> List[str]:
"""Run more texts through the embeddings and add to the vectorstore.
Upsert optimization is done by chunking the embeddings and upserting them.
This is done to avoid memory issues and optimize using HTTP based embeddings.
For OpenAI embeddings, use pool_threads>4 when constructing the pinecone.Index,
embedding_chunk_size>1000 and batch_size~64 for best performance.
Args:
texts: Iterable of strings to add to the vectorstore.
metadatas: Optional list of metadatas associated with the texts.
ids: Optional list of ids to associate with the texts.
namespace: Optional pinecone namespace to add the texts to.
batch_size: Batch size to use when adding the texts to the vectorstore.
embedding_chunk_size: Chunk size to use when embedding the texts.
async_req: Whether to run the upsert asynchronously.
id_prefix: Optional string to use as an ID prefix when upserting vectors.
Returns:
List of ids from adding the texts into the vectorstore.
"""
if namespace is None:
namespace = self._namespace
texts = list(texts)
ids = ids or [str(uuid.uuid4()) for _ in texts]
if id_prefix:
ids = [
id_prefix + "#" + id if id_prefix + "#" not in id else id for id in ids
]
metadatas = metadatas or [{} for _ in texts]
for metadata, text in zip(metadatas, texts):
metadata[self._text_key] = text
# For loops to avoid memory issues and to optimize when using HTTP-based embeddings.
# The first loop runs the embeddings; this benefits providers like OpenAI.
# The second loop runs the pinecone upsert asynchronously.
for i in range(0, len(texts), embedding_chunk_size):
chunk_texts = texts[i : i + embedding_chunk_size]
chunk_ids = ids[i : i + embedding_chunk_size]
chunk_metadatas = metadatas[i : i + embedding_chunk_size]
embeddings = self._embedding.embed_documents(chunk_texts)
vector_tuples = zip(chunk_ids, embeddings, chunk_metadatas)
if async_req:
# Runs the pinecone upsert asynchronously.
async_res = [
self._index.upsert(
vectors=batch_vector_tuples,
namespace=namespace,
async_req=async_req,
**kwargs,
)
for batch_vector_tuples in batch_iterate(batch_size, vector_tuples)
]
[res.get() for res in async_res]
else:
self._index.upsert(
vectors=vector_tuples,
namespace=namespace,
async_req=async_req,
**kwargs,
)
return ids
def similarity_search_with_score(
self,
query: str,
k: int = 4,
filter: Optional[dict] = None,
namespace: Optional[str] = None,
) -> List[Tuple[Document, float]]:
"""Return pinecone documents most similar to query, along with scores.
Args:
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter: Dictionary of argument(s) to filter on metadata
namespace: Namespace to search in. Default will search in '' namespace.
Returns:
List of Documents most similar to the query and score for each
"""
return self.similarity_search_by_vector_with_score(
self._embedding.embed_query(query), k=k, filter=filter, namespace=namespace
)
def similarity_search_by_vector_with_score(
self,
embedding: List[float],
*,
k: int = 4,
filter: Optional[dict] = None,
namespace: Optional[str] = None,
) -> List[Tuple[Document, float]]:
"""Return pinecone documents most similar to embedding, along with scores."""
if namespace is None:
namespace = self._namespace
docs = []
results = self._index.query(
vector=embedding,
top_k=k,
include_metadata=True,
namespace=namespace,
filter=filter,
)
for res in results["matches"]:
metadata = res["metadata"]
id = res.get("id")
if self._text_key in metadata:
text = metadata.pop(self._text_key)
score = res["score"]
docs.append(
(Document(id=id, page_content=text, metadata=metadata), score)
)
else:
logger.warning(
f"Found document with no `{self._text_key}` key. Skipping."
)
return docs
def similarity_search(
self,
query: str,
k: int = 4,
filter: Optional[dict] = None,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""Return pinecone documents most similar to query.
Args:
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter: Dictionary of argument(s) to filter on metadata
namespace: Namespace to search in. Default will search in '' namespace.
Returns:
List of Documents most similar to the query.
"""
docs_and_scores = self.similarity_search_with_score(
query, k=k, filter=filter, namespace=namespace, **kwargs
)
return [doc for doc, _ in docs_and_scores]
def _select_relevance_score_fn(self) -> Callable[[float], float]:
"""
The 'correct' relevance function
may differ depending on a few things, including:
- the distance / similarity metric used by the VectorStore
- the scale of your embeddings (OpenAI's are unit normed. Many others are not!)
- embedding dimensionality
- etc.
"""
if self.distance_strategy == DistanceStrategy.COSINE:
return self._cosine_relevance_score_fn
elif self.distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
return self._max_inner_product_relevance_score_fn
elif self.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
return self._euclidean_relevance_score_fn
else:
raise ValueError(
"Unknown distance strategy, must be cosine, max_inner_product "
"(dot product), or euclidean"
)
@staticmethod
def _cosine_relevance_score_fn(score: float) -> float:
"""Pinecone returns cosine similarity scores between [-1,1]"""
return (score + 1) / 2
def max_marginal_relevance_search_by_vector(
self,
embedding: List[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[dict] = None,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""Return docs selected using the maximal marginal relevance.
Maximal marginal relevance optimizes for similarity to query AND diversity
among selected documents.
Args:
embedding: Embedding to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
fetch_k: Number of Documents to fetch to pass to MMR algorithm.
lambda_mult: Number between 0 and 1 that determines the degree
of diversity among the results with 0 corresponding
to maximum diversity and 1 to minimum diversity.
Defaults to 0.5.
filter: Dictionary of argument(s) to filter on metadata
namespace: Namespace to search in. Default will search in '' namespace.
Returns:
List of Documents selected by maximal marginal relevance.
"""
if namespace is None:
namespace = self._namespace
results = self._index.query(
vector=[embedding],
top_k=fetch_k,
include_values=True,
include_metadata=True,
namespace=namespace,
filter=filter,
)
mmr_selected = maximal_marginal_relevance(
np.array([embedding], dtype=np.float32),
[item["values"] for item in results["matches"]],
k=k,
lambda_mult=lambda_mult,
)
selected = [results["matches"][i]["metadata"] for i in mmr_selected]
return [
Document(page_content=metadata.pop(self._text_key), metadata=metadata)
for metadata in selected
]
def max_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[dict] = None,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""Return docs selected using the maximal marginal relevance.
Maximal marginal relevance optimizes for similarity to query AND diversity
among selected documents.
Args:
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
fetch_k: Number of Documents to fetch to pass to MMR algorithm.
lambda_mult: Number between 0 and 1 that determines the degree
of diversity among the results with 0 corresponding
to maximum diversity and 1 to minimum diversity.
Defaults to 0.5.
filter: Dictionary of argument(s) to filter on metadata
namespace: Namespace to search in. Default will search in '' namespace.
Returns:
List of Documents selected by maximal marginal relevance.
"""
embedding = self._embedding.embed_query(query)
return self.max_marginal_relevance_search_by_vector(
embedding, k, fetch_k, lambda_mult, filter, namespace
)
@classmethod
def get_pinecone_index(
cls,
index_name: Optional[str],
pool_threads: int = 4,
*,
pinecone_api_key: Optional[str] = None,
) -> Index:
"""Return a Pinecone Index instance.
Args:
index_name: Name of the index to use.
pool_threads: Number of threads to use for index upsert.
pinecone_api_key: The Pinecone API key.
Returns:
Pinecone Index instance."""
_pinecone_api_key = pinecone_api_key or os.environ.get("PINECONE_API_KEY") or ""
client = PineconeClient(
api_key=_pinecone_api_key, pool_threads=pool_threads, source_tag="langchain"
)
indexes = client.list_indexes()
index_names = [i.name for i in indexes.index_list["indexes"]]
if index_name in index_names:
index = client.Index(index_name)
elif len(index_names) == 0:
raise ValueError(
"No active indexes found in your Pinecone project, "
"are you sure you're using the right Pinecone API key and Environment? "
"Please double check your Pinecone dashboard."
)
else:
raise ValueError(
f"Index '{index_name}' not found in your Pinecone project. "
f"Did you mean one of the following indexes: {', '.join(index_names)}"
)
return index
@classmethod
def from_texts(
cls,
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
batch_size: int = 32,
text_key: str = "text",
namespace: Optional[str] = None,
index_name: Optional[str] = None,
upsert_kwargs: Optional[dict] = None,
pool_threads: int = 4,
embeddings_chunk_size: int = 1000,
async_req: bool = True,
*,
id_prefix: Optional[str] = None,
**kwargs: Any,
) -> PineconeVectorStore:
"""Construct Pinecone wrapper from raw documents.
This is a user-friendly interface that:
1. Embeds documents.
2. Adds the documents to a provided Pinecone index.
This is intended to be a quick way to get started.
The `pool_threads` value affects the speed of the upsert operations.
Setup: set the `PINECONE_API_KEY` environment variable to your Pinecone API key.
Example:
.. code-block:: python
from langchain_pinecone import PineconeVectorStore, PineconeEmbeddings
embeddings = PineconeEmbeddings(model="multilingual-e5-large")
index_name = "my-index"
vectorstore = PineconeVectorStore.from_texts(
texts,
index_name=index_name,
embedding=embeddings,
namespace="my-namespace",  # illustrative
)
"""
pinecone_index = cls.get_pinecone_index(index_name, pool_threads)
pinecone = cls(pinecone_index, embedding, text_key, namespace, **kwargs)
pinecone.add_texts(
texts,
metadatas=metadatas,
ids=ids,
namespace=namespace,
batch_size=batch_size,
embedding_chunk_size=embeddings_chunk_size,
async_req=async_req,
id_prefix=id_prefix,
**(upsert_kwargs or {}),
)
return pinecone
@classmethod
def from_existing_index(
cls,
index_name: str,
embedding: Embeddings,
text_key: str = "text",
namespace: Optional[str] = None,
pool_threads: int = 4,
) -> PineconeVectorStore:
"""Load pinecone vectorstore from index name."""
pinecone_index = cls.get_pinecone_index(index_name, pool_threads)
return cls(pinecone_index, embedding, text_key, namespace)
def delete(
self,
ids: Optional[List[str]] = None,
delete_all: Optional[bool] = None,
namespace: Optional[str] = None,
filter: Optional[dict] = None,
**kwargs: Any,
) -> None:
"""Delete by vector IDs or filter.
Args:
ids: List of ids to delete.
delete_all: Whether to delete all vectors in the index.
filter: Dictionary of conditions to filter vectors to delete.
namespace: Namespace to delete from. Default is the '' namespace.
"""
if namespace is None:
namespace = self._namespace
if delete_all:
self._index.delete(delete_all=True, namespace=namespace, **kwargs)
elif ids is not None:
chunk_size = 1000
for i in range(0, len(ids), chunk_size):
chunk = ids[i : i + chunk_size]
self._index.delete(ids=chunk, namespace=namespace, **kwargs)
elif filter is not None:
self._index.delete(filter=filter, namespace=namespace, **kwargs)
else:
raise ValueError("Either ids, delete_all, or filter must be provided.")
return None
@deprecated(since="0.0.3", removal="1.0.0", alternative="PineconeVectorStore")
class Pinecone(PineconeVectorStore):
"""Deprecated. Use PineconeVectorStore instead."""
pass
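A short end-to-end sketch of the deleted store. Names are illustrative; it assumes `PINECONE_API_KEY` is set and an index already exists with a dimension matching the embedding model (here, the 1024-dim index used by the integration tests below):

```python
from langchain_pinecone import PineconeEmbeddings, PineconeVectorStore

# Illustrative index/namespace names; the internal-init path requires
# PINECONE_API_KEY (or pinecone_api_key=) plus an existing index.
store = PineconeVectorStore(
    embedding=PineconeEmbeddings(model="multilingual-e5-large"),
    index_name="langchain-pinecone-embeddings",
    namespace="demo",
)
ids = store.add_texts(["alpha", "beta", "gamma"])

# fetch_k candidates are re-ranked by the maximal_marginal_relevance helper.
docs = store.max_marginal_relevance_search("alpha", k=2, fetch_k=3)
store.delete(ids=ids, namespace="demo")
```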

View File

@@ -1,63 +0,0 @@
[build-system]
requires = ["pdm-backend"]
build-backend = "pdm.backend"
[project]
authors = []
license = { text = "MIT" }
requires-python = "<3.14,>=3.9"
dependencies = [
"langchain-core<1.0.0,>=0.3.34",
"pinecone<6.0.0,>=5.4.0",
"aiohttp<3.11,>=3.10",
"numpy<2.0.0,>=1.26.4",
"langchain-tests<1.0.0,>=0.3.7",
]
name = "langchain-pinecone"
version = "0.2.3"
description = "An integration package connecting Pinecone and LangChain"
readme = "README.md"
[project.urls]
"Source Code" = "https://github.com/langchain-ai/langchain/tree/master/libs/partners/pinecone"
"Release Notes" = "https://github.com/langchain-ai/langchain/releases?q=tag%3A%22langchain-pinecone%3D%3D0%22&expanded=true"
repository = "https://github.com/langchain-ai/langchain"
[dependency-groups]
test = [
"pytest<9,>=8",
"freezegun<2.0.0,>=1.2.2",
"pytest-mock<4.0.0,>=3.10.0",
"syrupy<5.0.0,>=4.0.2",
"pytest-watcher<1.0.0,>=0.3.4",
"pytest-asyncio<1,>=0.25.0",
"pytest-socket<1.0.0,>=0.7.0",
"langchain-core",
]
codespell = ["codespell<3.0.0,>=2.2.0"]
test_integration = ["langchain-openai"]
lint = ["ruff<1.0,>=0.5"]
dev = ["langchain-core"]
typing = ["mypy<2.0,>=1.10", "simsimd<6.0.0,>=5.0.0", "langchain-core"]
[tool.uv.sources]
langchain-core = { path = "../../core", editable = true }
langchain-openai = { path = "../openai", editable = true }
[tool.mypy]
disallow_untyped_defs = "True"
[tool.ruff.lint]
select = ["E", "F", "I", "T201"]
[tool.coverage.run]
omit = ["tests/*"]
[tool.pytest.ini_options]
addopts = "--snapshot-warn-unused --strict-markers --strict-config --durations=5"
markers = [
"requires: mark tests as requiring a specific library",
"compile: mark placeholder test used to compile integration tests without running them",
]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"

View File

@@ -1,17 +0,0 @@
import sys
import traceback
from importlib.util import module_from_spec, spec_from_file_location
if __name__ == "__main__":
files = sys.argv[1:]
has_failure = False
for file in files:
try:
SourceFileLoader("x", file).load_module()
except Exception:
has_failure = True
print(file) # noqa: T201
traceback.print_exc()
print() # noqa: T201
sys.exit(1 if has_failure else 0)

View File

@@ -1,17 +0,0 @@
#!/bin/bash
set -eu
# Initialize a variable to keep track of errors
errors=0
# make sure not importing from langchain or langchain_experimental
git --no-pager grep '^from langchain\.' . && errors=$((errors+1))
git --no-pager grep '^from langchain_experimental\.' . && errors=$((errors+1))
# Decide on an exit status based on the errors
if [ "$errors" -gt 0 ]; then
exit 1
else
exit 0
fi

View File

@@ -1,7 +0,0 @@
import pytest # type: ignore[import-not-found]
@pytest.mark.compile
def test_placeholder() -> None:
"""Used for compiling integration tests without running any real tests."""
pass

View File

@@ -1,84 +0,0 @@
import time
from typing import AsyncGenerator
import pytest
from langchain_core.documents import Document
from pinecone import Pinecone, ServerlessSpec # type: ignore
from langchain_pinecone import PineconeEmbeddings, PineconeVectorStore
from tests.integration_tests.test_vectorstores import DEFAULT_SLEEP
DIMENSION = 1024
INDEX_NAME = "langchain-pinecone-embeddings"
MODEL = "multilingual-e5-large"
NAMESPACE_NAME = "test_namespace"
@pytest.fixture(scope="function")
async def embd_client() -> AsyncGenerator[PineconeEmbeddings, None]:
client = PineconeEmbeddings(model=MODEL)
yield client
await client.async_client.close()
@pytest.fixture
def pc() -> Pinecone:
return Pinecone()
@pytest.fixture()
def pc_index(pc: Pinecone) -> Pinecone.Index:
if INDEX_NAME not in [index["name"] for index in pc.list_indexes()]:
pc.create_index(
name=INDEX_NAME,
dimension=DIMENSION,
metric="cosine",
spec=ServerlessSpec(cloud="aws", region="us-east-1"),
)
while not pc.describe_index(INDEX_NAME).status["ready"]:
time.sleep(1)
yield pc.Index(INDEX_NAME)
pc.delete_index(INDEX_NAME)
def test_embed_query(embd_client: PineconeEmbeddings) -> None:
out = embd_client.embed_query("Hello, world!")
assert isinstance(out, list)
assert len(out) == DIMENSION
@pytest.mark.asyncio
async def test_aembed_query(embd_client: PineconeEmbeddings) -> None:
out = await embd_client.aembed_query("Hello, world!")
assert isinstance(out, list)
assert len(out) == DIMENSION
def test_embed_documents(embd_client: PineconeEmbeddings) -> None:
out = embd_client.embed_documents(["Hello, world!", "This is a test."])
assert isinstance(out, list)
assert len(out) == 2
assert len(out[0]) == DIMENSION
@pytest.mark.asyncio
async def test_aembed_documents(embd_client: PineconeEmbeddings) -> None:
out = await embd_client.aembed_documents(["Hello, world!", "This is a test."])
assert isinstance(out, list)
assert len(out) == 2
assert len(out[0]) == DIMENSION
def test_vector_store(
embd_client: PineconeEmbeddings, pc_index: Pinecone.Index
) -> None:
vectorstore = PineconeVectorStore(index_name=INDEX_NAME, embedding=embd_client)
vectorstore.add_documents(
[Document("Hello, world!"), Document("This is a test.")],
namespace=NAMESPACE_NAME,
)
time.sleep(DEFAULT_SLEEP) # Increase wait time to ensure indexing is complete
resp = vectorstore.similarity_search(query="hello", namespace=NAMESPACE_NAME)
assert len(resp) == 2

View File

@@ -1,330 +0,0 @@
import os
import time
import uuid
from typing import List
import numpy as np
import pinecone # type: ignore
import pytest # type: ignore[import-not-found]
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings # type: ignore[import-not-found]
from langchain_tests.integration_tests.vectorstores import VectorStoreIntegrationTests
from pinecone import ServerlessSpec
from pytest_mock import MockerFixture # type: ignore[import-not-found]
from langchain_pinecone import PineconeVectorStore
INDEX_NAME = "langchain-test-index" # name of the index
NAMESPACE_NAME = "langchain-test-namespace" # name of the namespace
DIMENSION = 1536 # dimension of the embeddings
DEFAULT_SLEEP = 20
class TestPinecone(VectorStoreIntegrationTests):
index: "pinecone.Index"
pc: "pinecone.Pinecone"
@classmethod
def setup_class(cls) -> None:
import pinecone
client = pinecone.Pinecone(api_key=os.environ["PINECONE_API_KEY"])
index_list = client.list_indexes()
if INDEX_NAME in [i["name"] for i in index_list]:
client.delete_index(INDEX_NAME)
time.sleep(DEFAULT_SLEEP) # prevent race with subsequent creation
client.create_index(
name=INDEX_NAME,
dimension=DIMENSION,
metric="cosine",
spec=ServerlessSpec(cloud="aws", region="us-west-2"),
)
cls.index = client.Index(INDEX_NAME)
cls.pc = client
@classmethod
def teardown_class(cls) -> None:
cls.pc.delete_index(INDEX_NAME)
@pytest.fixture(autouse=True)
def setup(self) -> None:
# delete all the vectors in the index
print("called") # noqa: T201
index_stats = self.index.describe_index_stats()
if index_stats["total_vector_count"] > 0:
try:
self.index.delete(delete_all=True, namespace=NAMESPACE_NAME)
except Exception:
# if namespace not found
pass
@pytest.fixture
def embedding_openai(self) -> OpenAIEmbeddings:
return OpenAIEmbeddings()
@pytest.fixture
def texts(self) -> List[str]:
return ["foo", "bar", "baz"]
def test_from_texts(
self, texts: List[str], embedding_openai: OpenAIEmbeddings
) -> None:
"""Test end to end construction and search."""
unique_id = uuid.uuid4().hex
needs = f"foobuu {unique_id} booo"
texts.insert(0, needs)
docsearch = PineconeVectorStore.from_texts(
texts=texts,
embedding=embedding_openai,
index_name=INDEX_NAME,
namespace=NAMESPACE_NAME,
)
time.sleep(DEFAULT_SLEEP) # prevent race condition
output = docsearch.similarity_search(unique_id, k=1, namespace=NAMESPACE_NAME)
output[0].id = None # overwrite ID for ease of comparison
assert output == [Document(page_content=needs)]
def test_from_texts_with_metadatas(
self, texts: List[str], embedding_openai: OpenAIEmbeddings
) -> None:
"""Test end to end construction and search."""
unique_id = uuid.uuid4().hex
needs = f"foobuu {unique_id} booo"
texts = [needs] + texts
metadatas = [{"page": i} for i in range(len(texts))]
namespace = f"{NAMESPACE_NAME}-md"
docsearch = PineconeVectorStore.from_texts(
texts,
embedding_openai,
index_name=INDEX_NAME,
metadatas=metadatas,
namespace=namespace,
)
time.sleep(DEFAULT_SLEEP) # prevent race condition
output = docsearch.similarity_search(needs, k=1, namespace=namespace)
output[0].id = None
# TODO: why metadata={"page": 0.0}) instead of {"page": 0}?
assert output == [Document(page_content=needs, metadata={"page": 0.0})]
def test_from_texts_with_scores(self, embedding_openai: OpenAIEmbeddings) -> None:
"""Test end to end construction and search with scores and IDs."""
texts = ["foo", "bar", "baz"]
metadatas = [{"page": i} for i in range(len(texts))]
print("metadatas", metadatas) # noqa: T201
docsearch = PineconeVectorStore.from_texts(
texts,
embedding_openai,
index_name=INDEX_NAME,
metadatas=metadatas,
namespace=NAMESPACE_NAME,
)
print(texts) # noqa: T201
time.sleep(DEFAULT_SLEEP) # prevent race condition
output = docsearch.similarity_search_with_score(
"foo", k=3, namespace=NAMESPACE_NAME
)
docs = [o[0] for o in output]
scores = [o[1] for o in output]
sorted_documents = sorted(docs, key=lambda x: x.metadata["page"])
print(sorted_documents) # noqa: T201
for document in sorted_documents:
document.id = None # overwrite IDs for ease of comparison
# TODO: why metadata={"page": 0.0}) instead of {"page": 0}, etc???
assert sorted_documents == [
Document(page_content="foo", metadata={"page": 0.0}),
Document(page_content="bar", metadata={"page": 1.0}),
Document(page_content="baz", metadata={"page": 2.0}),
]
assert scores[0] > scores[1] > scores[2]
def test_from_existing_index_with_namespaces(
self, embedding_openai: OpenAIEmbeddings
) -> None:
"""Test that namespaces are properly handled."""
# Write two sets of texts to the same index under different namespaces
texts_1 = ["foo", "bar", "baz"]
metadatas = [{"page": i} for i in range(len(texts_1))]
PineconeVectorStore.from_texts(
texts_1,
embedding_openai,
index_name=INDEX_NAME,
metadatas=metadatas,
namespace=f"{INDEX_NAME}-1",
)
texts_2 = ["foo2", "bar2", "baz2"]
metadatas = [{"page": i} for i in range(len(texts_2))]
PineconeVectorStore.from_texts(
texts_2,
embedding_openai,
index_name=INDEX_NAME,
metadatas=metadatas,
namespace=f"{INDEX_NAME}-2",
)
time.sleep(DEFAULT_SLEEP) # prevent race condition
# Search with namespace
docsearch = PineconeVectorStore.from_existing_index(
index_name=INDEX_NAME,
embedding=embedding_openai,
namespace=f"{INDEX_NAME}-1",
)
output = docsearch.similarity_search("foo", k=20, namespace=f"{INDEX_NAME}-1")
# check that we don't get results from the other namespace
page_contents = sorted(set([o.page_content for o in output]))
assert all(content in ["foo", "bar", "baz"] for content in page_contents)
assert all(content not in ["foo2", "bar2", "baz2"] for content in page_contents)
def test_add_documents_with_ids(
self, texts: List[str], embedding_openai: OpenAIEmbeddings
) -> None:
ids = [uuid.uuid4().hex for _ in range(len(texts))]
PineconeVectorStore.from_texts(
texts=texts,
ids=ids,
embedding=embedding_openai,
index_name=INDEX_NAME,
namespace=NAMESPACE_NAME,
)
time.sleep(DEFAULT_SLEEP) # prevent race condition
index_stats = self.index.describe_index_stats()
assert index_stats["namespaces"][NAMESPACE_NAME]["vector_count"] == len(texts)
ids_1 = [uuid.uuid4().hex for _ in range(len(texts))]
PineconeVectorStore.from_texts(
texts=[t + "-1" for t in texts],
ids=ids_1,
embedding=embedding_openai,
index_name=INDEX_NAME,
namespace=NAMESPACE_NAME,
)
time.sleep(DEFAULT_SLEEP) # prevent race condition
index_stats = self.index.describe_index_stats()
assert (
index_stats["namespaces"][NAMESPACE_NAME]["vector_count"] == len(texts) * 2
)
# only focused on this namespace now
# assert index_stats["total_vector_count"] == len(texts) * 2
@pytest.mark.xfail(reason="relevance score just over 1")
def test_relevance_score_bound(self, embedding_openai: OpenAIEmbeddings) -> None:
"""Ensures all relevance scores are between 0 and 1."""
texts = ["foo", "bar", "baz"]
metadatas = [{"page": i} for i in range(len(texts))]
docsearch = PineconeVectorStore.from_texts(
texts,
embedding_openai,
index_name=INDEX_NAME,
metadatas=metadatas,
)
# wait for the index to be ready
time.sleep(DEFAULT_SLEEP)
output = docsearch.similarity_search_with_relevance_scores("foo", k=3)
print(output) # noqa: T201
assert all(
(1 >= score or np.isclose(score, 1)) and score >= 0 for _, score in output
)
@pytest.mark.skip(reason="slow to run for benchmark")
@pytest.mark.parametrize(
"pool_threads,batch_size,embeddings_chunk_size,data_multiplier",
[
(
1,
32,
32,
1000,
), # simulate single threaded with embeddings_chunk_size = batch_size = 32
(
1,
32,
1000,
1000,
), # simulate single threaded with embeddings_chunk_size = 1000
(
4,
32,
1000,
1000,
), # simulate 4 threaded with embeddings_chunk_size = 1000
(20, 64, 5000, 1000),  # simulate 20 threaded with embeddings_chunk_size = 5000
],
)
def test_from_texts_with_metadatas_benchmark(
self,
pool_threads: int,
batch_size: int,
embeddings_chunk_size: int,
data_multiplier: int,
documents: List[Document],
embedding_openai: OpenAIEmbeddings,
) -> None:
"""Test end to end construction and search."""
texts = [document.page_content for document in documents] * data_multiplier
uuids = [uuid.uuid4().hex for _ in range(len(texts))]
metadatas = [{"page": i} for i in range(len(texts))]
docsearch = PineconeVectorStore.from_texts(
texts,
embedding_openai,
ids=uuids,
metadatas=metadatas,
index_name=INDEX_NAME,
namespace=NAMESPACE_NAME,
pool_threads=pool_threads,
batch_size=batch_size,
embeddings_chunk_size=embeddings_chunk_size,
)
query = "What did the president say about Ketanji Brown Jackson"
_ = docsearch.similarity_search(query, k=1, namespace=NAMESPACE_NAME)
@pytest.fixture
def mock_pool_not_supported(self, mocker: MockerFixture) -> None:
"""
This is the error thrown when multiprocessing is not supported.
See https://github.com/langchain-ai/langchain/issues/11168
"""
mocker.patch(
"multiprocessing.synchronize.SemLock.__init__",
side_effect=OSError(
"FileNotFoundError: [Errno 2] No such file or directory"
),
)
@pytest.mark.usefixtures("mock_pool_not_supported")
def test_that_async_req_uses_multiprocessing(
self, texts: List[str], embedding_openai: OpenAIEmbeddings
) -> None:
with pytest.raises(OSError):
PineconeVectorStore.from_texts(
texts=texts,
embedding=embedding_openai,
index_name=INDEX_NAME,
namespace=NAMESPACE_NAME,
async_req=True,
)
@pytest.mark.usefixtures("mock_pool_not_supported")
def test_that_async_req_false_enables_singlethreading(
self, texts: List[str], embedding_openai: OpenAIEmbeddings
) -> None:
PineconeVectorStore.from_texts(
texts=texts,
embedding=embedding_openai,
index_name=INDEX_NAME,
namespace=NAMESPACE_NAME,
async_req=False,
)

View File

@@ -1,83 +0,0 @@
from typing import Any, Type
from unittest.mock import patch
import aiohttp
import pytest
from langchain_core.utils import convert_to_secret_str
from langchain_tests.unit_tests.embeddings import EmbeddingsTests
from langchain_pinecone import PineconeEmbeddings
API_KEY = convert_to_secret_str("NOT_A_VALID_KEY")
MODEL_NAME = "multilingual-e5-large"
@pytest.fixture(autouse=True)
def mock_pinecone() -> Any:
"""Mock Pinecone client for all tests."""
with patch("langchain_pinecone.embeddings.PineconeClient") as mock:
yield mock
class TestPineconeEmbeddingsStandard(EmbeddingsTests):
"""Standard LangChain embeddings tests."""
@property
def embeddings_class(self) -> Type[PineconeEmbeddings]:
"""Get the class under test."""
return PineconeEmbeddings
@property
def embedding_model_params(self) -> dict:
"""Get the parameters for initializing the embeddings model."""
return {
"model": MODEL_NAME,
"pinecone_api_key": API_KEY,
}
class TestPineconeEmbeddingsConfig:
"""Additional configuration tests for PineconeEmbeddings."""
def test_default_config(self) -> None:
"""Test default configuration is set correctly."""
embeddings = PineconeEmbeddings(model=MODEL_NAME, pinecone_api_key=API_KEY) # type: ignore
assert embeddings.batch_size == 96
assert embeddings.query_params == {"input_type": "query", "truncation": "END"}
assert embeddings.document_params == {
"input_type": "passage",
"truncation": "END",
}
assert embeddings.dimension == 1024
def test_custom_config(self) -> None:
"""Test custom configuration overrides defaults."""
embeddings = PineconeEmbeddings(
model=MODEL_NAME,
api_key=API_KEY,
batch_size=128,
query_params={"custom": "param"},
document_params={"other": "param"},
)
assert embeddings.batch_size == 128
assert embeddings.query_params == {"custom": "param"}
assert embeddings.document_params == {"other": "param"}
@pytest.mark.asyncio
async def test_async_client_initialization(self) -> None:
"""Test async client is initialized correctly and only when needed."""
embeddings = PineconeEmbeddings(model=MODEL_NAME, api_key=API_KEY)
assert embeddings._async_client is None
# Access async_client property
client = embeddings.async_client
assert client is not None
assert isinstance(client, aiohttp.ClientSession)
# Ensure headers are set correctly
expected_headers = {
"Api-Key": API_KEY.get_secret_value(),
"Content-Type": "application/json",
"X-Pinecone-API-Version": "2024-10",
}
assert client._default_headers == expected_headers

View File

@@ -1,11 +0,0 @@
from langchain_pinecone import __all__
EXPECTED_ALL = [
"PineconeVectorStore",
"Pinecone",
"PineconeEmbeddings",
]
def test_all_imports() -> None:
assert sorted(EXPECTED_ALL) == sorted(__all__)

View File

@@ -1,25 +0,0 @@
from unittest.mock import Mock
from langchain_pinecone.vectorstores import PineconeVectorStore
def test_initialization() -> None:
"""Test integration vectorstore initialization."""
# mock index
index = Mock()
embedding = Mock()
text_key = "xyz"
PineconeVectorStore(index, embedding, text_key)
def test_id_prefix() -> None:
"""Test integration of the id_prefix parameter."""
embedding = Mock()
# one (tiny) embedding vector per input text
embedding.embed_documents = Mock(return_value=[[0.1], [0.2], [0.3], [0.4], [0.5]])
index = Mock()
index.upsert = Mock(return_value=None)
text_key = "testing"
vectorstore = PineconeVectorStore(index, embedding, text_key)
texts = ["alpha", "beta", "gamma", "delta", "epsilon"]
id_prefix = "testing_prefixes"
vectorstore.add_texts(texts, id_prefix=id_prefix, async_req=False)

File diff suppressed because it is too large.