couchbase: Migrate couchbase partner package to different repo (#29239)

**Description:** Migrate the couchbase partner package to the
[Couchbase-Ecosystem](https://github.com/Couchbase-Ecosystem/langchain-couchbase)
org.
Nithish Raghunandanan 2025-01-15 21:37:27 +01:00 committed by GitHub
parent eaf2fb287f
commit 1051fa5729
24 changed files with 4 additions and 4505 deletions


@@ -259,8 +259,8 @@ packages:
   downloads: 35495
   downloads_updated_at: '2024-12-23T20:10:11.816059+00:00'
 - name: langchain-couchbase
-  path: libs/partners/couchbase
-  repo: langchain-ai/langchain
+  path: .
+  repo: Couchbase-Ecosystem/langchain-couchbase
   downloads: 347
   downloads_updated_at: '2024-12-23T20:10:11.816059+00:00'
 - name: langchain-ollama


@@ -1,3 +0,0 @@
__pycache__
# mypy
.mypy_cache/


@@ -1,21 +0,0 @@
MIT License
Copyright (c) 2024 LangChain, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.


@@ -1,64 +0,0 @@
.PHONY: all format lint test tests integration_tests docker_tests help extended_tests
# Default target executed when no arguments are given to make.
all: help
# Define a variable for the test file path.
TEST_FILE ?= tests/unit_tests/
integration_test integration_tests: TEST_FILE = tests/integration_tests/
# unit tests are run with the --disable-socket flag to prevent network calls
test tests:
poetry run pytest --disable-socket --allow-unix-socket $(TEST_FILE)
test_watch:
poetry run ptw --snapshot-update --now . -- -vv $(TEST_FILE)
# integration tests are run without the --disable-socket flag to allow network calls
integration_test integration_tests:
poetry run pytest $(TEST_FILE)
######################
# LINTING AND FORMATTING
######################
# Define a variable for Python and notebook files.
PYTHON_FILES=.
MYPY_CACHE=.mypy_cache
lint format: PYTHON_FILES=.
lint_diff format_diff: PYTHON_FILES=$(shell git diff --relative=libs/partners/couchbase --name-only --diff-filter=d master | grep -E '\.py$$|\.ipynb$$')
lint_package: PYTHON_FILES=langchain_couchbase
lint_tests: PYTHON_FILES=tests
lint_tests: MYPY_CACHE=.mypy_cache_test
lint lint_diff lint_package lint_tests:
[ "$(PYTHON_FILES)" = "" ] || poetry run ruff check $(PYTHON_FILES)
[ "$(PYTHON_FILES)" = "" ] || poetry run ruff format $(PYTHON_FILES) --diff
[ "$(PYTHON_FILES)" = "" ] || mkdir -p $(MYPY_CACHE) && poetry run mypy $(PYTHON_FILES) --cache-dir $(MYPY_CACHE)
format format_diff:
[ "$(PYTHON_FILES)" = "" ] || poetry run ruff format $(PYTHON_FILES)
[ "$(PYTHON_FILES)" = "" ] || poetry run ruff check --select I --fix $(PYTHON_FILES)
spell_check:
poetry run codespell --toml pyproject.toml
spell_fix:
poetry run codespell --toml pyproject.toml -w
check_imports: $(shell find langchain_couchbase -name '*.py')
poetry run python ./scripts/check_imports.py $^
######################
# HELP
######################
help:
@echo '----'
@echo 'check_imports - check imports'
@echo 'format - run code formatters'
@echo 'lint - run linters'
@echo 'test - run unit tests'
@echo 'tests - run unit tests'
@echo 'test TEST_FILE=<test_file> - run all tests in file'


@@ -1,42 +1,3 @@
# langchain-couchbase
This package has moved!
This package contains the LangChain integration with Couchbase.
## Installation
```bash
pip install -U langchain-couchbase
```
## Usage
The `CouchbaseVectorStore` class exposes the connection to the Couchbase vector store.
```python
from langchain_couchbase.vectorstores import CouchbaseVectorStore
from langchain_openai import OpenAIEmbeddings
from couchbase.cluster import Cluster
from couchbase.auth import PasswordAuthenticator
from couchbase.options import ClusterOptions
from datetime import timedelta

# Replace with the credentials of your Couchbase cluster
username = "Administrator"
password = "password"

auth = PasswordAuthenticator(username, password)
options = ClusterOptions(auth)
connect_string = "couchbases://localhost"
cluster = Cluster(connect_string, options)
# Wait until the cluster is ready for use.
cluster.wait_until_ready(timedelta(seconds=5))
embeddings = OpenAIEmbeddings()
vectorstore = CouchbaseVectorStore(
cluster=cluster,
bucket_name="",
scope_name="",
collection_name="",
embedding=embeddings,
index_name="vector-search-index",
)
```
https://github.com/Couchbase-Ecosystem/langchain-couchbase/tree/main/langchain_couchbase


@@ -1,10 +0,0 @@
from langchain_couchbase.cache import CouchbaseCache, CouchbaseSemanticCache
from langchain_couchbase.chat_message_histories import CouchbaseChatMessageHistory
from langchain_couchbase.vectorstores import CouchbaseVectorStore
__all__ = [
"CouchbaseVectorStore",
"CouchbaseCache",
"CouchbaseSemanticCache",
"CouchbaseChatMessageHistory",
]
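
The `__init__.py` above is the package's entire public surface, so imports are stable across the move. A minimal smoke test (a sketch; assumes the relocated `langchain-couchbase` keeps this same `__all__`, as the README notice above implies):

```python
# Sketch: verify the migrated package still exposes the same public API.
import langchain_couchbase

expected = {
    "CouchbaseVectorStore",
    "CouchbaseCache",
    "CouchbaseSemanticCache",
    "CouchbaseChatMessageHistory",
}
assert expected.issubset(set(langchain_couchbase.__all__))
print("public API intact:", sorted(langchain_couchbase.__all__))
```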


@@ -1,386 +0,0 @@
"""
LangChain Couchbase Caches
Functions "_hash", "_loads_generations" and "_dumps_generations"
are duplicated in this utility from modules:
- "libs/community/langchain_community/cache.py"
"""
import hashlib
import json
import logging
from datetime import timedelta
from typing import Any, Dict, Optional, Union
from couchbase.cluster import Cluster
from langchain_core.caches import RETURN_VAL_TYPE, BaseCache
from langchain_core.embeddings import Embeddings
from langchain_core.load.dump import dumps
from langchain_core.load.load import loads
from langchain_core.outputs import Generation
from langchain_couchbase.vectorstores import CouchbaseVectorStore
logger = logging.getLogger(__name__)
def _hash(_input: str) -> str:
"""Use a deterministic hashing approach."""
return hashlib.md5(_input.encode()).hexdigest()
def _dumps_generations(generations: RETURN_VAL_TYPE) -> str:
"""
Serialization for generic RETURN_VAL_TYPE, i.e. sequence of `Generation`
Args:
generations (RETURN_VAL_TYPE): A list of language model generations.
Returns:
str: a single string representing a list of generations.
    This function (and its counterpart `_loads_generations`) relies on
    the dumps/loads pair with Reviver, so it is able to deal
    with all subclasses of Generation.
    Each item in the list is `dumps`ed to a string, and the whole
    list of strings is then JSON-dumped.
    """
return json.dumps([dumps(_item) for _item in generations])
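
# Example round trip (a sketch): a list of `Generation` objects survives
# `_dumps_generations` followed by `_loads_generations` unchanged:
#
#   >>> blob = _dumps_generations([Generation(text="fizz")])
#   >>> _loads_generations(blob)[0].text
#   'fizz'
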
def _loads_generations(generations_str: str) -> Union[RETURN_VAL_TYPE, None]:
"""
Deserialization of a string into a generic RETURN_VAL_TYPE
(i.e. a sequence of `Generation`).
See `_dumps_generations`, the inverse of this function.
Args:
generations_str (str): A string representing a list of generations.
    Compatible with the legacy cache-blob format.
    Does not raise exceptions for malformed entries, just logs a warning
    and returns None: the caller should be prepared for such a cache miss.
Returns:
RETURN_VAL_TYPE: A list of generations.
"""
try:
generations = [loads(_item_str) for _item_str in json.loads(generations_str)]
return generations
except (json.JSONDecodeError, TypeError):
# deferring the (soft) handling to after the legacy-format attempt
pass
try:
gen_dicts = json.loads(generations_str)
# not relying on `_load_generations_from_json` (which could disappear):
generations = [Generation(**generation_dict) for generation_dict in gen_dicts]
logger.warning(
f"Legacy 'Generation' cached blob encountered: '{generations_str}'"
)
return generations
except (json.JSONDecodeError, TypeError):
logger.warning(
f"Malformed/unparsable cached blob encountered: '{generations_str}'"
)
return None
def _validate_ttl(ttl: Optional[timedelta]) -> None:
"""Validate the time to live"""
if not isinstance(ttl, timedelta):
raise ValueError(f"ttl should be of type timedelta but was {type(ttl)}.")
if ttl <= timedelta(seconds=0):
raise ValueError(
f"ttl must be greater than 0 but was {ttl.total_seconds()} seconds."
)
class CouchbaseCache(BaseCache):
"""Couchbase LLM Cache
LLM Cache that uses Couchbase as the backend
"""
PROMPT = "prompt"
LLM = "llm"
RETURN_VAL = "return_val"
def _check_bucket_exists(self) -> bool:
"""Check if the bucket exists in the linked Couchbase cluster"""
bucket_manager = self._cluster.buckets()
try:
bucket_manager.get_bucket(self._bucket_name)
return True
except Exception:
return False
def _check_scope_and_collection_exists(self) -> bool:
"""Check if the scope and collection exists in the linked Couchbase bucket
Raises a ValueError if either is not found"""
scope_collection_map: Dict[str, Any] = {}
# Get a list of all scopes in the bucket
for scope in self._bucket.collections().get_all_scopes():
scope_collection_map[scope.name] = []
# Get a list of all the collections in the scope
for collection in scope.collections:
scope_collection_map[scope.name].append(collection.name)
# Check if the scope exists
if self._scope_name not in scope_collection_map.keys():
raise ValueError(
f"Scope {self._scope_name} not found in Couchbase "
f"bucket {self._bucket_name}"
)
# Check if the collection exists in the scope
if self._collection_name not in scope_collection_map[self._scope_name]:
raise ValueError(
f"Collection {self._collection_name} not found in scope "
f"{self._scope_name} in Couchbase bucket {self._bucket_name}"
)
return True
def __init__(
self,
cluster: Cluster,
bucket_name: str,
scope_name: str,
collection_name: str,
ttl: Optional[timedelta] = None,
**kwargs: Dict[str, Any],
) -> None:
"""Initialize the Couchbase LLM Cache
Args:
cluster (Cluster): couchbase cluster object with active connection.
bucket_name (str): name of the bucket to store documents in.
scope_name (str): name of the scope in bucket to store documents in.
collection_name (str): name of the collection in the scope to store
documents in.
            ttl (Optional[timedelta]): TTL or time for the document to live in the cache.
                After this time, the document will get deleted from the cache.
"""
if not isinstance(cluster, Cluster):
raise ValueError(
f"cluster should be an instance of couchbase.Cluster, "
f"got {type(cluster)}"
)
self._cluster = cluster
self._bucket_name = bucket_name
self._scope_name = scope_name
self._collection_name = collection_name
self._ttl = None
# Check if the bucket exists
if not self._check_bucket_exists():
raise ValueError(
f"Bucket {self._bucket_name} does not exist. "
" Please create the bucket before searching."
)
try:
self._bucket = self._cluster.bucket(self._bucket_name)
self._scope = self._bucket.scope(self._scope_name)
self._collection = self._scope.collection(self._collection_name)
except Exception as e:
raise ValueError(
"Error connecting to couchbase. "
"Please check the connection and credentials."
) from e
        # Check if the scope and collection exist. Raises a ValueError if they don't.
        self._check_scope_and_collection_exists()
# Check if the time to live is provided and valid
if ttl is not None:
_validate_ttl(ttl)
self._ttl = ttl
def lookup(self, prompt: str, llm_string: str) -> Optional[RETURN_VAL_TYPE]:
"""Look up from cache based on prompt and llm_string."""
try:
doc = self._collection.get(
self._generate_key(prompt, llm_string)
).content_as[dict]
return _loads_generations(doc[self.RETURN_VAL])
except Exception:
return None
def _generate_key(self, prompt: str, llm_string: str) -> str:
"""Generate the key based on prompt and llm_string."""
return _hash(prompt + llm_string)
def update(self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE) -> None:
"""Update cache based on prompt and llm_string."""
doc = {
self.PROMPT: prompt,
self.LLM: llm_string,
self.RETURN_VAL: _dumps_generations(return_val),
}
document_key = self._generate_key(prompt, llm_string)
try:
if self._ttl:
self._collection.upsert(
key=document_key,
value=doc,
expiry=self._ttl,
)
else:
self._collection.upsert(key=document_key, value=doc)
except Exception:
logger.error("Error updating cache")
def clear(self, **kwargs: Any) -> None:
"""Clear the cache.
This will delete all documents in the collection. This requires an index on the
collection.
"""
try:
query = f"DELETE FROM `{self._collection_name}`"
self._scope.query(query).execute()
except Exception:
logger.error("Error clearing cache. Please check if you have an index.")
class CouchbaseSemanticCache(BaseCache, CouchbaseVectorStore):
"""Couchbase Semantic Cache
Cache backed by a Couchbase Server with Vector Store support
"""
LLM = "llm_string"
RETURN_VAL = "return_val"
def __init__(
self,
cluster: Cluster,
embedding: Embeddings,
bucket_name: str,
scope_name: str,
collection_name: str,
index_name: str,
score_threshold: Optional[float] = None,
ttl: Optional[timedelta] = None,
) -> None:
"""Initialize the Couchbase LLM Cache
Args:
cluster (Cluster): couchbase cluster object with active connection.
embedding (Embeddings): embedding model to use.
bucket_name (str): name of the bucket to store documents in.
scope_name (str): name of the scope in bucket to store documents in.
collection_name (str): name of the collection in the scope to store
documents in.
index_name (str): name of the Search index to use.
score_threshold (float): score threshold to use for filtering results.
            ttl (Optional[timedelta]): TTL or time for the document to live in the cache.
                After this time, the document will get deleted from the cache.
"""
if not isinstance(cluster, Cluster):
raise ValueError(
f"cluster should be an instance of couchbase.Cluster, "
f"got {type(cluster)}"
)
self._cluster = cluster
self._bucket_name = bucket_name
self._scope_name = scope_name
self._collection_name = collection_name
self._ttl = None
# Check if the bucket exists
if not self._check_bucket_exists():
raise ValueError(
f"Bucket {self._bucket_name} does not exist. "
" Please create the bucket before searching."
)
try:
self._bucket = self._cluster.bucket(self._bucket_name)
self._scope = self._bucket.scope(self._scope_name)
self._collection = self._scope.collection(self._collection_name)
except Exception as e:
raise ValueError(
"Error connecting to couchbase. "
"Please check the connection and credentials."
) from e
        # Check if the scope and collection exist. Raises a ValueError if they don't.
        self._check_scope_and_collection_exists()
self.score_threshold = score_threshold
if ttl is not None:
_validate_ttl(ttl)
self._ttl = ttl
# Initialize the vector store
super().__init__(
cluster=cluster,
bucket_name=bucket_name,
scope_name=scope_name,
collection_name=collection_name,
embedding=embedding,
index_name=index_name,
)
def lookup(self, prompt: str, llm_string: str) -> Optional[RETURN_VAL_TYPE]:
"""Look up from cache based on the semantic similarity of the prompt"""
search_results = self.similarity_search_with_score(
prompt, k=1, search_options={f"metadata.{self.LLM}": llm_string}
)
if search_results:
selected_doc, score = search_results[0]
else:
return None
# Check if the score is above the threshold if a threshold is provided
if self.score_threshold:
if score < self.score_threshold:
return None
# Note that the llm_string might not match the vector search result.
# So if the llm_string does not match, do not return the result.
if selected_doc.metadata["llm_string"] != llm_string:
return None
return _loads_generations(selected_doc.metadata[self.RETURN_VAL])
def update(self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE) -> None:
"""Update cache based on the prompt and llm_string"""
try:
self.add_texts(
texts=[prompt],
metadatas=[
{
self.LLM: llm_string,
self.RETURN_VAL: _dumps_generations(return_val),
}
],
ttl=self._ttl,
)
except Exception:
logger.error("Error updating cache")
def clear(self, **kwargs: Any) -> None:
"""Clear the cache.
This will delete all documents in the collection.
This requires an index on the collection.
"""
try:
query = f"DELETE FROM `{self._collection_name}`"
self._scope.query(query).execute()
except Exception:
logger.error("Error clearing cache. Please check if you have an index.")


@@ -1,269 +0,0 @@
"""Couchbase Chat Message History"""
import logging
import time
import uuid
from datetime import timedelta
from typing import Any, Dict, List, Optional, Sequence
from couchbase.cluster import Cluster
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import (
BaseMessage,
message_to_dict,
messages_from_dict,
)
logger = logging.getLogger(__name__)
DEFAULT_SESSION_ID_KEY = "session_id"
DEFAULT_MESSAGE_KEY = "message"
DEFAULT_TS_KEY = "ts"
DEFAULT_INDEX_NAME = "LANGCHAIN_CHAT_HISTORY"
DEFAULT_BATCH_SIZE = 100
def _validate_ttl(ttl: Optional[timedelta]) -> None:
"""Validate the time to live"""
if not isinstance(ttl, timedelta):
raise ValueError(f"ttl should be of type timedelta but was {type(ttl)}.")
if ttl <= timedelta(seconds=0):
raise ValueError(
f"ttl must be greater than 0 but was {ttl.total_seconds()} seconds."
)
class CouchbaseChatMessageHistory(BaseChatMessageHistory):
"""Couchbase Chat Message History
Chat message history that uses Couchbase as the storage
"""
def _check_bucket_exists(self) -> bool:
"""Check if the bucket exists in the linked Couchbase cluster"""
bucket_manager = self._cluster.buckets()
try:
bucket_manager.get_bucket(self._bucket_name)
return True
except Exception:
return False
def _check_scope_and_collection_exists(self) -> bool:
"""Check if the scope and collection exists in the linked Couchbase bucket
Raises a ValueError if either is not found"""
scope_collection_map: Dict[str, Any] = {}
# Get a list of all scopes in the bucket
for scope in self._bucket.collections().get_all_scopes():
scope_collection_map[scope.name] = []
# Get a list of all the collections in the scope
for collection in scope.collections:
scope_collection_map[scope.name].append(collection.name)
# Check if the scope exists
if self._scope_name not in scope_collection_map.keys():
raise ValueError(
f"Scope {self._scope_name} not found in Couchbase "
f"bucket {self._bucket_name}"
)
# Check if the collection exists in the scope
if self._collection_name not in scope_collection_map[self._scope_name]:
raise ValueError(
f"Collection {self._collection_name} not found in scope "
f"{self._scope_name} in Couchbase bucket "
f"{self._bucket_name}"
)
return True
def __init__(
self,
*,
cluster: Cluster,
bucket_name: str,
scope_name: str,
collection_name: str,
session_id: str,
session_id_key: str = DEFAULT_SESSION_ID_KEY,
message_key: str = DEFAULT_MESSAGE_KEY,
create_index: bool = True,
ttl: Optional[timedelta] = None,
) -> None:
"""Initialize the Couchbase Chat Message History
Args:
cluster (Cluster): couchbase cluster object with active connection.
bucket_name (str): name of the bucket to store documents in.
scope_name (str): name of the scope in bucket to store documents in.
collection_name (str): name of the collection in the scope to store
documents in.
session_id (str): value for the session used to associate messages from
a single chat session. It is stored as a field in the chat message.
session_id_key (str): name of the field to use for the session id.
Set to "session_id" by default.
message_key (str): name of the field to use for the messages
Set to "message" by default.
create_index (bool): create an index if True. Set to True by default.
ttl (timedelta): time to live for the documents in the collection.
When set, the documents are automatically deleted after the ttl expires.
"""
if not isinstance(cluster, Cluster):
raise ValueError(
f"cluster should be an instance of couchbase.Cluster, "
f"got {type(cluster)}"
)
self._cluster = cluster
self._bucket_name = bucket_name
self._scope_name = scope_name
self._collection_name = collection_name
self._ttl = None
# Check if the bucket exists
if not self._check_bucket_exists():
raise ValueError(
f"Bucket {self._bucket_name} does not exist. "
" Please create the bucket before searching."
)
try:
self._bucket = self._cluster.bucket(self._bucket_name)
self._scope = self._bucket.scope(self._scope_name)
self._collection = self._scope.collection(self._collection_name)
except Exception as e:
raise ValueError(
"Error connecting to couchbase. "
"Please check the connection and credentials."
) from e
        # Check if the scope and collection exist. Raises a ValueError if they don't.
        self._check_scope_and_collection_exists()
self._session_id_key = session_id_key
self._message_key = message_key
self._create_index = create_index
self._session_id = session_id
self._ts_key = DEFAULT_TS_KEY
if ttl is not None:
_validate_ttl(ttl)
self._ttl = ttl
        # If requested, create the index if it does not already exist
if create_index:
index_fields = (
f"({self._session_id_key}, {self._ts_key}, {self._message_key})"
)
index_creation_query = (
f"CREATE INDEX {DEFAULT_INDEX_NAME} IF NOT EXISTS ON "
+ f"{self._collection_name}{index_fields} "
)
try:
self._scope.query(index_creation_query).execute()
except Exception as e:
logger.error("Error creating index: ", e)
def add_message(self, message: BaseMessage) -> None:
"""Add a message to the cache"""
# Generate a UUID for the document key
document_key = uuid.uuid4().hex
# get utc timestamp for ordering the messages
timestamp = time.time()
message_content = message_to_dict(message)
try:
if self._ttl:
self._collection.insert(
document_key,
value={
self._message_key: message_content,
self._session_id_key: self._session_id,
self._ts_key: timestamp,
},
expiry=self._ttl,
)
else:
self._collection.insert(
document_key,
value={
self._message_key: message_content,
self._session_id_key: self._session_id,
self._ts_key: timestamp,
},
)
except Exception as e:
logger.error("Error adding message: ", e)
def add_messages(self, messages: Sequence[BaseMessage]) -> None:
"""Add messages to the cache in a batched manner"""
batch_size = DEFAULT_BATCH_SIZE
messages_to_insert = []
for message in messages:
document_key = uuid.uuid4().hex
timestamp = time.time()
message_content = message_to_dict(message)
messages_to_insert.append(
{
document_key: {
self._message_key: message_content,
self._session_id_key: self._session_id,
self._ts_key: timestamp,
},
}
)
# Add the messages to the cache in batches of batch_size
try:
for i in range(0, len(messages_to_insert), batch_size):
batch = messages_to_insert[i : i + batch_size]
# Convert list of dictionaries to a single dictionary to insert
insert_batch = {list(d.keys())[0]: list(d.values())[0] for d in batch}
if self._ttl:
self._collection.insert_multi(insert_batch, expiry=self._ttl)
else:
self._collection.insert_multi(insert_batch)
except Exception as e:
logger.error("Error adding messages: ", e)
def clear(self) -> None:
"""Clear the cache"""
# Delete all documents in the collection with the session_id
        clear_query = (
            f"DELETE FROM `{self._collection_name}` "
            + f"WHERE {self._session_id_key}=$session_id"
        )
try:
self._scope.query(clear_query, session_id=self._session_id).execute()
except Exception as e:
logger.error("Error clearing cache: ", e)
@property
def messages(self) -> List[BaseMessage]:
"""Get all messages in the cache associated with the session_id"""
fetch_query = (
f"SELECT {self._message_key} FROM `{self._collection_name}` "
+ f"where {self._session_id_key}=$session_id"
+ f" ORDER BY {self._ts_key} ASC"
)
message_items = []
try:
result = self._scope.query(fetch_query, session_id=self._session_id)
for document in result:
                message_items.append(document[self._message_key])
except Exception as e:
logger.error("Error fetching messages: ", e)
return messages_from_dict(message_items)
@messages.setter
def messages(self, messages: List[BaseMessage]) -> None:
        raise NotImplementedError(
            "Direct assignment to 'messages' is not allowed."
            " Use the 'add_messages' method instead."
        )
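
Typical use of the removed chat history class, as exercised by the integration tests further down. A sketch; the connection details, collection name, and session id are placeholders:

```python
# Sketch based on tests/integration_tests/test_chat_message_history.py below.
from datetime import timedelta

from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions
from langchain_core.messages import AIMessage, HumanMessage

from langchain_couchbase.chat_message_histories import CouchbaseChatMessageHistory

cluster = Cluster(
    "couchbases://localhost",
    ClusterOptions(PasswordAuthenticator("Administrator", "password")),
)
cluster.wait_until_ready(timedelta(seconds=5))

history = CouchbaseChatMessageHistory(
    cluster=cluster,
    bucket_name="langchain-bucket",
    scope_name="_default",
    collection_name="chat-history",
    session_id="demo-session",
)
history.add_messages(
    [AIMessage(content="Hello, how are you?"), HumanMessage(content="I'm good!")]
)
print(history.messages)  # messages for this session, ordered by timestamp
```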


@@ -1,739 +0,0 @@
"""Couchbase vector stores."""
from __future__ import annotations
import uuid
from typing import (
Any,
Dict,
Iterable,
List,
Optional,
Tuple,
Type,
)
import couchbase.search as search
from couchbase.cluster import Cluster
from couchbase.exceptions import DocumentExistsException, DocumentNotFoundException
from couchbase.options import SearchOptions
from couchbase.vector_search import VectorQuery, VectorSearch
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
class CouchbaseVectorStore(VectorStore):
"""__ModuleName__ vector store integration.
Setup:
        Install ``langchain-couchbase`` and head over to the Couchbase website (https://cloud.couchbase.com) to create a new cluster with a bucket, scope, collection, and Search index.
.. code-block:: bash
pip install -U langchain-couchbase
.. code-block:: python
import getpass
COUCHBASE_CONNECTION_STRING = getpass.getpass("Enter the connection string for the Couchbase cluster: ")
DB_USERNAME = getpass.getpass("Enter the username for the Couchbase cluster: ")
DB_PASSWORD = getpass.getpass("Enter the password for the Couchbase cluster: ")
    Key init args - indexing params:
embedding: Embeddings
Embedding function to use.
    Key init args - client params:
cluster: Cluster
Couchbase cluster object with active connection.
bucket_name: str
Name of the bucket to store documents in.
scope_name: str
Name of the scope in the bucket to store documents in.
collection_name: str
Name of the collection in the scope to store documents in.
index_name: str
Name of the Search index to use.
Instantiate:
.. code-block:: python
from datetime import timedelta
from langchain_openai import OpenAIEmbeddings
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions
auth = PasswordAuthenticator(DB_USERNAME, DB_PASSWORD)
options = ClusterOptions(auth)
cluster = Cluster(COUCHBASE_CONNECTION_STRING, options)
# Wait until the cluster is ready for use.
cluster.wait_until_ready(timedelta(seconds=5))
BUCKET_NAME = "langchain_bucket"
SCOPE_NAME = "_default"
COLLECTION_NAME = "default"
SEARCH_INDEX_NAME = "langchain-test-index"
vector_store = CouchbaseVectorStore(
cluster=cluster,
bucket_name=BUCKET_NAME,
scope_name=SCOPE_NAME,
collection_name=COLLECTION_NAME,
embedding=embeddings,
index_name=SEARCH_INDEX_NAME,
)
Add Documents:
.. code-block:: python
from langchain_core.documents import Document
document_1 = Document(page_content="foo", metadata={"baz": "bar"})
document_2 = Document(page_content="thud", metadata={"bar": "baz"})
document_3 = Document(page_content="i will be deleted :(")
documents = [document_1, document_2, document_3]
ids = ["1", "2", "3"]
vector_store.add_documents(documents=documents, ids=ids)
Delete Documents:
.. code-block:: python
vector_store.delete(ids=["3"])
# TODO: Fill out with example output.
Search:
.. code-block:: python
results = vector_store.similarity_search(query="thud",k=1)
for doc in results:
print(f"* {doc.page_content} [{doc.metadata}]")
.. code-block:: python
# TODO: Example output
# TODO: Fill out with relevant variables and example output.
Search with filter:
.. code-block:: python
# TODO: Update filter to correct format
results = vector_store.similarity_search(query="thud",k=1,filter={"bar": "baz"})
for doc in results:
print(f"* {doc.page_content} [{doc.metadata}]")
.. code-block:: python
# TODO: Example output
# TODO: Fill out with example output.
Search with score:
.. code-block:: python
results = vector_store.similarity_search_with_score(query="qux",k=1)
for doc, score in results:
print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]")
.. code-block:: python
# TODO: Example output
# TODO: Fill out with example output.
Async:
.. code-block:: python
# add documents
# await vector_store.aadd_documents(documents=documents, ids=ids)
# delete documents
# await vector_store.adelete(ids=["3"])
# search
# results = vector_store.asimilarity_search(query="thud",k=1)
# search with score
results = await vector_store.asimilarity_search_with_score(query="qux",k=1)
for doc,score in results:
print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]")
.. code-block:: python
# TODO: Example output
# TODO: Fill out with example output.
Use as Retriever:
.. code-block:: python
retriever = vector_store.as_retriever(
search_type="mmr",
search_kwargs={"k": 1, "fetch_k": 2, "lambda_mult": 0.5},
)
retriever.invoke("thud")
.. code-block:: python
# TODO: Example output
""" # noqa: E501
# Default batch size
DEFAULT_BATCH_SIZE = 100
_metadata_key = "metadata"
_default_text_key = "text"
_default_embedding_key = "embedding"
def _check_bucket_exists(self) -> bool:
"""Check if the bucket exists in the linked Couchbase cluster"""
bucket_manager = self._cluster.buckets()
try:
bucket_manager.get_bucket(self._bucket_name)
return True
except Exception:
return False
def _check_scope_and_collection_exists(self) -> bool:
"""Check if the scope and collection exists in the linked Couchbase bucket
Raises a ValueError if either is not found"""
scope_collection_map: Dict[str, Any] = {}
# Get a list of all scopes in the bucket
for scope in self._bucket.collections().get_all_scopes():
scope_collection_map[scope.name] = []
# Get a list of all the collections in the scope
for collection in scope.collections:
scope_collection_map[scope.name].append(collection.name)
# Check if the scope exists
if self._scope_name not in scope_collection_map.keys():
raise ValueError(
f"Scope {self._scope_name} not found in Couchbase "
f"bucket {self._bucket_name}"
)
# Check if the collection exists in the scope
if self._collection_name not in scope_collection_map[self._scope_name]:
raise ValueError(
f"Collection {self._collection_name} not found in scope "
f"{self._scope_name} in Couchbase bucket {self._bucket_name}"
)
return True
def _check_index_exists(self) -> bool:
"""Check if the Search index exists in the linked Couchbase cluster
Raises a ValueError if the index does not exist"""
if self._scoped_index:
all_indexes = [
index.name for index in self._scope.search_indexes().get_all_indexes()
]
if self._index_name not in all_indexes:
raise ValueError(
f"Index {self._index_name} does not exist. "
" Please create the index before searching."
)
else:
all_indexes = [
index.name for index in self._cluster.search_indexes().get_all_indexes()
]
if self._index_name not in all_indexes:
raise ValueError(
f"Index {self._index_name} does not exist. "
" Please create the index before searching."
)
return True
def __init__(
self,
cluster: Cluster,
bucket_name: str,
scope_name: str,
collection_name: str,
embedding: Embeddings,
index_name: str,
*,
text_key: Optional[str] = _default_text_key,
embedding_key: Optional[str] = _default_embedding_key,
scoped_index: bool = True,
) -> None:
"""
Initialize the Couchbase Vector Store.
Args:
cluster (Cluster): couchbase cluster object with active connection.
bucket_name (str): name of bucket to store documents in.
scope_name (str): name of scope in the bucket to store documents in.
            collection_name (str): name of the collection in the scope to store documents in.
embedding (Embeddings): embedding function to use.
index_name (str): name of the Search index to use.
text_key (optional[str]): key in document to use as text.
Set to text by default.
embedding_key (optional[str]): key in document to use for the embeddings.
Set to embedding by default.
scoped_index (optional[bool]): specify whether the index is a scoped index.
Set to True by default.
"""
if not isinstance(cluster, Cluster):
raise ValueError(
f"cluster should be an instance of couchbase.Cluster, "
f"got {type(cluster)}"
)
self._cluster = cluster
if not embedding:
raise ValueError("Embeddings instance must be provided.")
if not bucket_name:
raise ValueError("bucket_name must be provided.")
if not scope_name:
raise ValueError("scope_name must be provided.")
if not collection_name:
raise ValueError("collection_name must be provided.")
if not index_name:
raise ValueError("index_name must be provided.")
self._bucket_name = bucket_name
self._scope_name = scope_name
self._collection_name = collection_name
self._embedding_function = embedding
self._text_key = text_key
self._embedding_key = embedding_key
self._index_name = index_name
self._scoped_index = scoped_index
# Check if the bucket exists
if not self._check_bucket_exists():
raise ValueError(
f"Bucket {self._bucket_name} does not exist. "
" Please create the bucket before searching."
)
try:
self._bucket = self._cluster.bucket(self._bucket_name)
self._scope = self._bucket.scope(self._scope_name)
self._collection = self._scope.collection(self._collection_name)
except Exception as e:
raise ValueError(
"Error connecting to couchbase. "
"Please check the connection and credentials."
) from e
        # Check if the scope and collection exist. Raises a ValueError if they don't.
        self._check_scope_and_collection_exists()
        # Check if the index exists. Raises a ValueError if it doesn't.
        self._check_index_exists()
def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
batch_size: Optional[int] = None,
**kwargs: Any,
) -> List[str]:
"""Run texts through the embeddings and persist in vectorstore.
If the document IDs are passed, the existing documents (if any) will be
overwritten with the new ones.
Args:
texts (Iterable[str]): Iterable of strings to add to the vectorstore.
metadatas (Optional[List[Dict]]): Optional list of metadatas associated
with the texts.
ids (Optional[List[str]]): Optional list of ids associated with the texts.
IDs have to be unique strings across the collection.
                If it is not specified, uuids are generated and used as ids.
batch_size (Optional[int]): Optional batch size for bulk insertions.
Default is 100.
Returns:
            List[str]: List of ids from adding the texts into the vectorstore.
"""
        if not batch_size:
            batch_size = self.DEFAULT_BATCH_SIZE
        # Materialize the iterable so it can be traversed more than once
        texts = list(texts)
        doc_ids: List[str] = []
        if ids is None:
            ids = [uuid.uuid4().hex for _ in texts]
        if metadatas is None:
            metadatas = [{} for _ in texts]
        # Check if TTL is provided
        ttl = kwargs.get("ttl", None)
        embedded_texts = self._embedding_function.embed_documents(texts)
        # Build a single {id: document} mapping for all the texts
        documents_to_insert = {
            doc_id: {
                self._text_key: text,
                self._embedding_key: vector,
                self._metadata_key: metadata,
            }
            for doc_id, text, vector, metadata in zip(
                ids, texts, embedded_texts, metadatas
            )
        }
        # Insert in batches of batch_size documents
        doc_items = list(documents_to_insert.items())
        for i in range(0, len(doc_items), batch_size):
            batch = dict(doc_items[i : i + batch_size])
            try:
                # Insert with TTL if provided
                if ttl:
                    result = self._collection.upsert_multi(batch, expiry=ttl)
                else:
                    result = self._collection.upsert_multi(batch)
                if result.all_ok:
                    doc_ids.extend(batch.keys())
            except DocumentExistsException as e:
                raise ValueError(f"Document already exists: {e}")
        return doc_ids
def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
"""Delete documents from the vector store by ids.
Args:
ids (List[str]): List of IDs of the documents to delete.
batch_size (Optional[int]): Optional batch size for bulk deletions.
Returns:
bool: True if all the documents were deleted successfully, False otherwise.
"""
if ids is None:
raise ValueError("No document ids provided to delete.")
batch_size = kwargs.get("batch_size", self.DEFAULT_BATCH_SIZE)
deletion_status = True
# Delete in batches
for i in range(0, len(ids), batch_size):
batch = ids[i : i + batch_size]
try:
result = self._collection.remove_multi(batch)
            except DocumentNotFoundException as e:
                raise ValueError(f"Document not found: {e}") from e
            deletion_status &= result.all_ok
return deletion_status
@property
def embeddings(self) -> Embeddings:
"""Return the query embedding object."""
return self._embedding_function
def _format_metadata(self, row_fields: Dict[str, Any]) -> Dict[str, Any]:
"""Helper method to format the metadata from the Couchbase Search API.
Args:
row_fields (Dict[str, Any]): The fields to format.
Returns:
Dict[str, Any]: The formatted metadata.
"""
metadata = {}
for key, value in row_fields.items():
# Couchbase Search returns the metadata key with a prefix
# `metadata.` We remove it to get the original metadata key
if key.startswith(self._metadata_key):
new_key = key.split(self._metadata_key + ".")[-1]
metadata[new_key] = value
else:
metadata[key] = value
return metadata
def similarity_search(
self,
query: str,
k: int = 4,
search_options: Optional[Dict[str, Any]] = {},
**kwargs: Any,
) -> List[Document]:
"""Return documents most similar to embedding vector with their scores.
Args:
query (str): Query to look up for similar documents
k (int): Number of Documents to return.
Defaults to 4.
search_options (Optional[Dict[str, Any]]): Optional search options that are
passed to Couchbase search.
Defaults to empty dictionary
fields (Optional[List[str]]): Optional list of fields to include in the
metadata of results. Note that these need to be stored in the index.
If nothing is specified, defaults to all the fields stored in the index.
Returns:
List of Documents most similar to the query.
"""
query_embedding = self.embeddings.embed_query(query)
docs_with_scores = self.similarity_search_with_score_by_vector(
query_embedding, k, search_options, **kwargs
)
return [doc for doc, _ in docs_with_scores]
def similarity_search_with_score_by_vector(
self,
embedding: List[float],
k: int = 4,
search_options: Optional[Dict[str, Any]] = {},
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""Return docs most similar to embedding vector with their scores.
Args:
embedding (List[float]): Embedding vector to look up documents similar to.
k (int): Number of Documents to return.
Defaults to 4.
search_options (Optional[Dict[str, Any]]): Optional search options that are
passed to Couchbase search.
Defaults to empty dictionary.
fields (Optional[List[str]]): Optional list of fields to include in the
metadata of results. Note that these need to be stored in the index.
If nothing is specified, defaults to all the fields stored in the index.
Returns:
List of (Document, score) that are the most similar to the query vector.
"""
fields = kwargs.get("fields", ["*"])
# Document text field needs to be returned from the search
if fields != ["*"] and self._text_key not in fields:
fields.append(self._text_key)
search_req = search.SearchRequest.create(
VectorSearch.from_vector_query(
VectorQuery(
self._embedding_key,
embedding,
k,
)
)
)
try:
if self._scoped_index:
search_iter = self._scope.search(
self._index_name,
search_req,
SearchOptions(
limit=k,
fields=fields,
raw=search_options,
),
)
else:
search_iter = self._cluster.search(
self._index_name,
search_req,
SearchOptions(limit=k, fields=fields, raw=search_options),
)
docs_with_score = []
# Parse the results
for row in search_iter.rows():
text = row.fields.pop(self._text_key, "")
id = row.id
# Format the metadata from Couchbase
metadata = self._format_metadata(row.fields)
score = row.score
doc = Document(id=id, page_content=text, metadata=metadata)
docs_with_score.append((doc, score))
except Exception as e:
raise ValueError(f"Search failed with error: {e}")
return docs_with_score
def similarity_search_with_score(
self,
query: str,
k: int = 4,
search_options: Optional[Dict[str, Any]] = {},
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""Return documents that are most similar to the query with their scores.
Args:
query (str): Query to look up for similar documents
k (int): Number of Documents to return.
Defaults to 4.
search_options (Optional[Dict[str, Any]]): Optional search options that are
passed to Couchbase search.
Defaults to empty dictionary.
fields (Optional[List[str]]): Optional list of fields to include in the
metadata of results. Note that these need to be stored in the index.
If nothing is specified, defaults to text and metadata fields.
Returns:
List of (Document, score) that are most similar to the query.
"""
query_embedding = self.embeddings.embed_query(query)
docs_with_score = self.similarity_search_with_score_by_vector(
query_embedding, k, search_options, **kwargs
)
return docs_with_score
def similarity_search_by_vector(
self,
embedding: List[float],
k: int = 4,
search_options: Optional[Dict[str, Any]] = {},
**kwargs: Any,
) -> List[Document]:
"""Return documents that are most similar to the vector embedding.
Args:
embedding (List[float]): Embedding to look up documents similar to.
k (int): Number of Documents to return.
Defaults to 4.
search_options (Optional[Dict[str, Any]]): Optional search options that are
passed to Couchbase search.
Defaults to empty dictionary.
fields (Optional[List[str]]): Optional list of fields to include in the
metadata of results. Note that these need to be stored in the index.
If nothing is specified, defaults to document text and metadata fields.
Returns:
List of Documents most similar to the query.
"""
docs_with_score = self.similarity_search_with_score_by_vector(
embedding, k, search_options, **kwargs
)
return [doc for doc, _ in docs_with_score]
@classmethod
def _from_kwargs(
cls: Type[CouchbaseVectorStore],
embedding: Embeddings,
**kwargs: Any,
) -> CouchbaseVectorStore:
"""Initialize the Couchbase vector store from keyword arguments for the
vector store.
Args:
embedding: Embedding object to use to embed text.
**kwargs: Keyword arguments to initialize the vector store with.
Accepted arguments are:
- cluster
- bucket_name
- scope_name
- collection_name
- index_name
- text_key
- embedding_key
- scoped_index
"""
cluster = kwargs.get("cluster", None)
bucket_name = kwargs.get("bucket_name", None)
scope_name = kwargs.get("scope_name", None)
collection_name = kwargs.get("collection_name", None)
index_name = kwargs.get("index_name", None)
text_key = kwargs.get("text_key", cls._default_text_key)
embedding_key = kwargs.get("embedding_key", cls._default_embedding_key)
scoped_index = kwargs.get("scoped_index", True)
return cls(
embedding=embedding,
cluster=cluster,
bucket_name=bucket_name,
scope_name=scope_name,
collection_name=collection_name,
index_name=index_name,
text_key=text_key,
embedding_key=embedding_key,
scoped_index=scoped_index,
)
@classmethod
def from_texts(
cls: Type[CouchbaseVectorStore],
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
**kwargs: Any,
) -> CouchbaseVectorStore:
"""Construct a Couchbase vector store from a list of texts.
Example:
.. code-block:: python
from langchain_couchbase import CouchbaseVectorStore
from langchain_openai import OpenAIEmbeddings
from couchbase.cluster import Cluster
from couchbase.auth import PasswordAuthenticator
from couchbase.options import ClusterOptions
from datetime import timedelta
auth = PasswordAuthenticator(username, password)
options = ClusterOptions(auth)
connect_string = "couchbases://localhost"
cluster = Cluster(connect_string, options)
# Wait until the cluster is ready for use.
cluster.wait_until_ready(timedelta(seconds=5))
embeddings = OpenAIEmbeddings()
texts = ["hello", "world"]
vectorstore = CouchbaseVectorStore.from_texts(
texts,
embedding=embeddings,
cluster=cluster,
bucket_name="",
scope_name="",
collection_name="",
index_name="vector-index",
)
Args:
texts (List[str]): list of texts to add to the vector store.
embedding (Embeddings): embedding function to use.
            metadatas (Optional[List[Dict]]): list of metadatas to add to documents.
**kwargs: Keyword arguments used to initialize the vector store with and/or
passed to `add_texts` method. Check the constructor and/or `add_texts`
for the list of accepted arguments.
Returns:
A Couchbase vector store.
"""
vector_store = cls._from_kwargs(embedding, **kwargs)
batch_size = kwargs.get("batch_size", vector_store.DEFAULT_BATCH_SIZE)
ids = kwargs.get("ids", None)
vector_store.add_texts(
texts, metadatas=metadatas, ids=ids, batch_size=batch_size
)
return vector_store
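
End to end, the removed vector store was used roughly as follows. A sketch assembled from the class docstring above; the connection string, credentials, and index/collection names are placeholders:

```python
# Sketch based on the CouchbaseVectorStore docstring above.
from datetime import timedelta

from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions
from langchain_openai import OpenAIEmbeddings

from langchain_couchbase.vectorstores import CouchbaseVectorStore

cluster = Cluster(
    "couchbases://localhost",
    ClusterOptions(PasswordAuthenticator("Administrator", "password")),
)
cluster.wait_until_ready(timedelta(seconds=5))

vector_store = CouchbaseVectorStore(
    cluster=cluster,
    bucket_name="langchain-bucket",
    scope_name="_default",
    collection_name="default",
    embedding=OpenAIEmbeddings(),
    index_name="vector-search-index",
)
vector_store.add_texts(["hello", "world"])
for doc, score in vector_store.similarity_search_with_score("hello", k=1):
    print(f"[SIM={score:.3f}] {doc.page_content}")
```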

File diff suppressed because it is too large.


@@ -1,81 +0,0 @@
[build-system]
requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "langchain-couchbase"
version = "0.2.2"
description = "An integration package connecting Couchbase and LangChain"
authors = []
readme = "README.md"
repository = "https://github.com/langchain-ai/langchain"
license = "MIT"
[tool.mypy]
disallow_untyped_defs = "True"
ignore_missing_imports = "True"
[tool.poetry.urls]
"Source Code" = "https://github.com/langchain-ai/langchain/tree/master/libs/partners/couchbase"
"Release Notes" = "https://github.com/langchain-ai/langchain/releases?q=tag%3A%22langchain-couchbase%3D%3D0%22&expanded=true"
[tool.poetry.dependencies]
python = ">=3.9,<4.0"
langchain-core = "^0.3.15"
couchbase = "^4.3.2"
[tool.ruff.lint]
select = [ "E", "F", "I", "T201",]
[tool.coverage.run]
omit = [ "tests/*",]
[tool.pytest.ini_options]
addopts = "--snapshot-warn-unused --strict-markers --strict-config --durations=5"
markers = [ "compile: mark placeholder test used to compile integration tests without running them",]
asyncio_mode = "auto"
[tool.poetry.group.test]
optional = true
[tool.poetry.group.codespell]
optional = true
[tool.poetry.group.test_integration]
optional = true
[tool.poetry.group.lint]
optional = true
[tool.poetry.group.dev]
optional = true
[tool.poetry.group.test.dependencies]
pytest = "^7.4.3"
pytest-asyncio = "^0.23.2"
pytest-socket = "^0.7.0"
syrupy = "^4.0.2"
langchain = "^0.3.0.dev"
[tool.poetry.group.codespell.dependencies]
codespell = "^2.2.6"
[tool.poetry.group.test_integration.dependencies]
[tool.poetry.group.lint.dependencies]
ruff = "^0.5"
[tool.poetry.group.typing.dependencies]
mypy = "^1.10"
[tool.poetry.group.test.dependencies.langchain-core]
path = "../../core"
develop = true
[tool.poetry.group.dev.dependencies.langchain-core]
path = "../../core"
develop = true
[tool.poetry.group.typing.dependencies.langchain-core]
path = "../../core"
develop = true


@@ -1,17 +0,0 @@
import sys
import traceback
from importlib.machinery import SourceFileLoader
if __name__ == "__main__":
files = sys.argv[1:]
has_failure = False
for file in files:
try:
SourceFileLoader("x", file).load_module()
except Exception:
has_failure = True
print(file) # noqa: T201
traceback.print_exc()
print() # noqa: T201
sys.exit(1 if has_failure else 0)


@@ -1,18 +0,0 @@
#!/bin/bash
set -eu
# Initialize a variable to keep track of errors
errors=0
# make sure not importing from langchain, langchain_experimental, or langchain_community
git --no-pager grep '^from langchain\.' . && errors=$((errors+1))
git --no-pager grep '^from langchain_experimental\.' . && errors=$((errors+1))
git --no-pager grep '^from langchain_community\.' . && errors=$((errors+1))
# Decide on an exit status based on the errors
if [ "$errors" -gt 0 ]; then
exit 1
else
exit 0
fi


@@ -1,218 +0,0 @@
"""Test Couchbase Cache functionality"""
import os
from datetime import datetime, timedelta
from typing import Any
import pytest
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions
from langchain_core.globals import get_llm_cache, set_llm_cache
from langchain_core.outputs import Generation
from langchain_couchbase.cache import CouchbaseCache, CouchbaseSemanticCache
from tests.utils import (
FakeEmbeddings,
FakeLLM,
cache_key_hash_function,
fetch_document_expiry_time,
get_document_keys,
)
CONNECTION_STRING = os.getenv("COUCHBASE_CONNECTION_STRING", "")
BUCKET_NAME = os.getenv("COUCHBASE_BUCKET_NAME", "")
SCOPE_NAME = os.getenv("COUCHBASE_SCOPE_NAME", "")
CACHE_COLLECTION_NAME = os.getenv("COUCHBASE_CACHE_COLLECTION_NAME", "")
SEMANTIC_CACHE_COLLECTION_NAME = os.getenv(
"COUCHBASE_SEMANTIC_CACHE_COLLECTION_NAME", ""
)
USERNAME = os.getenv("COUCHBASE_USERNAME", "")
PASSWORD = os.getenv("COUCHBASE_PASSWORD", "")
INDEX_NAME = os.getenv("COUCHBASE_SEMANTIC_CACHE_INDEX_NAME", "")
def set_all_env_vars() -> bool:
"""Check if all environment variables are set"""
return all(
[
CONNECTION_STRING,
BUCKET_NAME,
SCOPE_NAME,
CACHE_COLLECTION_NAME,
USERNAME,
PASSWORD,
INDEX_NAME,
]
)
def get_cluster() -> Any:
"""Get a couchbase cluster object"""
auth = PasswordAuthenticator(USERNAME, PASSWORD)
options = ClusterOptions(auth)
connect_string = CONNECTION_STRING
cluster = Cluster(connect_string, options)
# Wait until the cluster is ready for use.
cluster.wait_until_ready(timedelta(seconds=5))
return cluster
@pytest.fixture()
def cluster() -> Any:
"""Get a couchbase cluster object"""
return get_cluster()
@pytest.mark.skipif(
not set_all_env_vars(), reason="Missing Couchbase environment variables"
)
class TestCouchbaseCache:
def test_cache(self, cluster: Any) -> None:
"""Test standard LLM cache functionality"""
set_llm_cache(
CouchbaseCache(
cluster=cluster,
bucket_name=BUCKET_NAME,
scope_name=SCOPE_NAME,
collection_name=CACHE_COLLECTION_NAME,
)
)
llm = FakeLLM()
params = llm.dict()
params["stop"] = None
llm_string = str(sorted([(k, v) for k, v in params.items()]))
get_llm_cache().update("foo", llm_string, [Generation(text="fizz")])
cache_output = get_llm_cache().lookup("foo", llm_string)
assert cache_output == [Generation(text="fizz")]
get_llm_cache().clear()
output = get_llm_cache().lookup("bar", llm_string)
assert output != [Generation(text="fizz")]
def test_cache_with_ttl(self, cluster: Any) -> None:
"""Test standard LLM cache functionality with TTL"""
ttl = timedelta(minutes=10)
set_llm_cache(
CouchbaseCache(
cluster=cluster,
bucket_name=BUCKET_NAME,
scope_name=SCOPE_NAME,
collection_name=CACHE_COLLECTION_NAME,
ttl=ttl,
)
)
llm = FakeLLM()
params = llm.dict()
params["stop"] = None
llm_string = str(sorted([(k, v) for k, v in params.items()]))
get_llm_cache().update("foo", llm_string, [Generation(text="fizz")])
cache_output = get_llm_cache().lookup("foo", llm_string)
assert cache_output == [Generation(text="fizz")]
# Check the document's expiry time by fetching it from the database
document_key = cache_key_hash_function("foo" + llm_string)
document_expiry_time = fetch_document_expiry_time(
cluster, BUCKET_NAME, SCOPE_NAME, CACHE_COLLECTION_NAME, document_key
)
current_time = datetime.now()
time_to_expiry = document_expiry_time - current_time
assert time_to_expiry < ttl
def test_semantic_cache(self, cluster: Any) -> None:
"""Test semantic LLM cache functionality"""
set_llm_cache(
CouchbaseSemanticCache(
cluster=cluster,
embedding=FakeEmbeddings(),
index_name=INDEX_NAME,
bucket_name=BUCKET_NAME,
scope_name=SCOPE_NAME,
collection_name=SEMANTIC_CACHE_COLLECTION_NAME,
)
)
llm = FakeLLM()
params = llm.dict()
params["stop"] = None
llm_string = str(sorted([(k, v) for k, v in params.items()]))
get_llm_cache().update(
"foo", llm_string, [Generation(text="fizz"), Generation(text="Buzz")]
)
# foo and bar will have the same embedding produced by FakeEmbeddings
cache_output = get_llm_cache().lookup("bar", llm_string)
assert cache_output == [Generation(text="fizz"), Generation(text="Buzz")]
# clear the cache
get_llm_cache().clear()
output = get_llm_cache().lookup("bar", llm_string)
assert output != [Generation(text="fizz"), Generation(text="Buzz")]
def test_semantic_cache_with_ttl(self, cluster: Any) -> None:
"""Test semantic LLM cache functionality with TTL"""
ttl = timedelta(minutes=10)
set_llm_cache(
CouchbaseSemanticCache(
cluster=cluster,
embedding=FakeEmbeddings(),
index_name=INDEX_NAME,
bucket_name=BUCKET_NAME,
scope_name=SCOPE_NAME,
collection_name=SEMANTIC_CACHE_COLLECTION_NAME,
ttl=ttl,
)
)
llm = FakeLLM()
params = llm.dict()
params["stop"] = None
llm_string = str(sorted([(k, v) for k, v in params.items()]))
# Add a document to the cache
seed_prompt = "foo"
get_llm_cache().update(
seed_prompt, llm_string, [Generation(text="fizz"), Generation(text="Buzz")]
)
# foo and bar will have the same embedding produced by FakeEmbeddings
cache_output = get_llm_cache().lookup("bar", llm_string)
assert cache_output == [Generation(text="fizz"), Generation(text="Buzz")]
# Check the document's expiry time by fetching it from the database
fetch_document_query = (
f"SELECT meta().id, * from `{SEMANTIC_CACHE_COLLECTION_NAME}` doc "
f"WHERE doc.text = '{seed_prompt}'"
)
document_keys = get_document_keys(
cluster=cluster,
bucket_name=BUCKET_NAME,
scope_name=SCOPE_NAME,
query=fetch_document_query,
)
assert len(document_keys) == 1
document_expiry_time = fetch_document_expiry_time(
cluster=cluster,
bucket_name=BUCKET_NAME,
scope_name=SCOPE_NAME,
collection_name=SEMANTIC_CACHE_COLLECTION_NAME,
document_key=document_keys[0],
)
current_time = datetime.now()
time_to_expiry = document_expiry_time - current_time
assert time_to_expiry < ttl


@@ -1,294 +0,0 @@
"""Test Couchbase Chat Message History functionality"""
import os
import time
from datetime import datetime, timedelta
from typing import Any
import pytest
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions
from langchain.memory import ConversationBufferMemory
from langchain_core.messages import AIMessage, HumanMessage
from langchain_couchbase.chat_message_histories import CouchbaseChatMessageHistory
from tests.utils import fetch_document_expiry_time, get_document_keys
CONNECTION_STRING = os.getenv("COUCHBASE_CONNECTION_STRING", "")
BUCKET_NAME = os.getenv("COUCHBASE_BUCKET_NAME", "")
SCOPE_NAME = os.getenv("COUCHBASE_SCOPE_NAME", "")
MESSAGE_HISTORY_COLLECTION_NAME = os.getenv(
"COUCHBASE_CHAT_HISTORY_COLLECTION_NAME", ""
)
USERNAME = os.getenv("COUCHBASE_USERNAME", "")
PASSWORD = os.getenv("COUCHBASE_PASSWORD", "")
SLEEP_DURATION = 0.2
def set_all_env_vars() -> bool:
"""Check if all environment variables are set"""
return all(
[
CONNECTION_STRING,
BUCKET_NAME,
SCOPE_NAME,
MESSAGE_HISTORY_COLLECTION_NAME,
USERNAME,
PASSWORD,
]
)
def get_cluster() -> Any:
"""Get a couchbase cluster object"""
auth = PasswordAuthenticator(USERNAME, PASSWORD)
options = ClusterOptions(auth)
connect_string = CONNECTION_STRING
cluster = Cluster(connect_string, options)
# Wait until the cluster is ready for use.
cluster.wait_until_ready(timedelta(seconds=5))
return cluster
@pytest.fixture()
def cluster() -> Any:
"""Get a couchbase cluster object"""
return get_cluster()
@pytest.mark.skipif(
not set_all_env_vars(), reason="Missing Couchbase environment variables"
)
class TestCouchbaseChatMessageHistory:
def test_memory_with_message_store(self, cluster: Any) -> None:
"""Test chat message history with a message store"""
message_history = CouchbaseChatMessageHistory(
cluster=cluster,
bucket_name=BUCKET_NAME,
scope_name=SCOPE_NAME,
collection_name=MESSAGE_HISTORY_COLLECTION_NAME,
session_id="test-session",
)
memory = ConversationBufferMemory(
memory_key="baz", chat_memory=message_history, return_messages=True
)
# clear the memory
memory.chat_memory.clear()
# wait for the messages to be cleared
time.sleep(SLEEP_DURATION)
assert memory.chat_memory.messages == []
# add some messages
ai_message = AIMessage(content="Hello, how are you doing ?")
user_message = HumanMessage(content="I'm good, how are you?")
memory.chat_memory.add_messages([ai_message, user_message])
# wait until the messages can be retrieved
time.sleep(SLEEP_DURATION)
# check that the messages are in the memory
messages = memory.chat_memory.messages
assert len(messages) == 2
# check that the messages are in the order of creation
assert messages == [ai_message, user_message]
# clear the memory
memory.chat_memory.clear()
time.sleep(SLEEP_DURATION)
assert memory.chat_memory.messages == []
def test_memory_with_separate_sessions(self, cluster: Any) -> None:
"""Test the chat message history with multiple sessions"""
message_history_a = CouchbaseChatMessageHistory(
cluster=cluster,
bucket_name=BUCKET_NAME,
scope_name=SCOPE_NAME,
collection_name=MESSAGE_HISTORY_COLLECTION_NAME,
session_id="test-session-a",
)
message_history_b = CouchbaseChatMessageHistory(
cluster=cluster,
bucket_name=BUCKET_NAME,
scope_name=SCOPE_NAME,
collection_name=MESSAGE_HISTORY_COLLECTION_NAME,
session_id="test-session-b",
)
memory_a = ConversationBufferMemory(
memory_key="a", chat_memory=message_history_a, return_messages=True
)
memory_b = ConversationBufferMemory(
memory_key="b", chat_memory=message_history_b, return_messages=True
)
# clear the memory
memory_a.chat_memory.clear()
memory_b.chat_memory.clear()
# add some messages
ai_message = AIMessage(content="Hello, how are you doing ?")
user_message = HumanMessage(content="I'm good, how are you?")
memory_a.chat_memory.add_ai_message(ai_message)
memory_b.chat_memory.add_user_message(user_message)
# wait until the messages can be retrieved
time.sleep(SLEEP_DURATION)
# check that the messages are in the memory
messages_a = memory_a.chat_memory.messages
messages_b = memory_b.chat_memory.messages
assert len(messages_a) == 1
assert len(messages_b) == 1
assert messages_a == [ai_message]
assert messages_b == [user_message]
# clear the memory
memory_a.chat_memory.clear()
time.sleep(SLEEP_DURATION)
# ensure that only the session that is cleared is empty
assert memory_a.chat_memory.messages == []
assert memory_b.chat_memory.messages == [user_message]
# clear the other session's memory
memory_b.chat_memory.clear()
time.sleep(SLEEP_DURATION)
assert memory_b.chat_memory.messages == []
def test_memory_message_with_ttl(self, cluster: Any) -> None:
"""Test chat message history with a message being saved with a TTL"""
ttl = timedelta(minutes=5)
session_id = "test-session-ttl"
message_history = CouchbaseChatMessageHistory(
cluster=cluster,
bucket_name=BUCKET_NAME,
scope_name=SCOPE_NAME,
collection_name=MESSAGE_HISTORY_COLLECTION_NAME,
session_id=session_id,
ttl=ttl,
)
memory = ConversationBufferMemory(
memory_key="baz", chat_memory=message_history, return_messages=True
)
# clear the memory
memory.chat_memory.clear()
# wait for the messages to be cleared
time.sleep(SLEEP_DURATION)
assert memory.chat_memory.messages == []
# add some messages
ai_message = AIMessage(content="Hello, how are you doing?")
memory.chat_memory.add_ai_message(ai_message)
# wait until the messages can be retrieved
time.sleep(SLEEP_DURATION)
# check that the messages are in the memory
messages = memory.chat_memory.messages
assert len(messages) == 1
# check that the messages are in the order of creation
assert messages == [ai_message]
# Check the document's expiry time by fetching it from the database
fetch_documents_query = (
f"SELECT meta().id, * from `{MESSAGE_HISTORY_COLLECTION_NAME}` doc"
f" WHERE doc.session_id = '{session_id}'"
)
document_keys = get_document_keys(
cluster=cluster,
bucket_name=BUCKET_NAME,
scope_name=SCOPE_NAME,
query=fetch_documents_query,
)
assert len(document_keys) == 1
# Ensure that the document will expire within the TTL
document_expiry_time = fetch_document_expiry_time(
cluster=cluster,
bucket_name=BUCKET_NAME,
scope_name=SCOPE_NAME,
collection_name=MESSAGE_HISTORY_COLLECTION_NAME,
document_key=document_keys[0],
)
current_time = datetime.now()
assert document_expiry_time - current_time < ttl
def test_memory_messages_with_ttl(self, cluster: Any) -> None:
"""Test chat message history with messages being stored with a TTL"""
ttl = timedelta(minutes=5)
session_id = "test-session-ttl"
message_history = CouchbaseChatMessageHistory(
cluster=cluster,
bucket_name=BUCKET_NAME,
scope_name=SCOPE_NAME,
collection_name=MESSAGE_HISTORY_COLLECTION_NAME,
session_id=session_id,
ttl=ttl,
)
memory = ConversationBufferMemory(
memory_key="baz", chat_memory=message_history, return_messages=True
)
# clear the memory
memory.chat_memory.clear()
# wait for the messages to be cleared
time.sleep(SLEEP_DURATION)
assert memory.chat_memory.messages == []
# add some messages
ai_message = AIMessage(content="Hello, how are you doing?")
user_message = HumanMessage(content="I'm good, how are you?")
memory.chat_memory.add_messages([ai_message, user_message])
# wait until the messages can be retrieved
time.sleep(SLEEP_DURATION)
# check that the messages are in the memory
messages = memory.chat_memory.messages
assert len(messages) == 2
# check that the messages are in the order of creation
assert messages == [ai_message, user_message]
# Check the documents' expiry time by fetching the documents from the database
fetch_documents_query = (
f"SELECT meta().id, * from `{MESSAGE_HISTORY_COLLECTION_NAME}` doc"
f" WHERE doc.session_id = '{session_id}'"
)
document_keys = get_document_keys(
cluster=cluster,
bucket_name=BUCKET_NAME,
scope_name=SCOPE_NAME,
query=fetch_documents_query,
)
assert len(document_keys) == 2
# Ensure that each document will expire within the TTL
for document_key in document_keys:
document_expiry_time = fetch_document_expiry_time(
cluster=cluster,
bucket_name=BUCKET_NAME,
scope_name=SCOPE_NAME,
collection_name=MESSAGE_HISTORY_COLLECTION_NAME,
document_key=document_key,
)
current_time = datetime.now()
assert document_expiry_time - current_time < ttl
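# A minimal usage sketch for the class exercised above (the bucket/scope/
# collection names are placeholders): messages stored with a ttl are evicted
# by Couchbase itself, so an application gets expiring chat sessions without
# running its own cleanup job.
#
#   history = CouchbaseChatMessageHistory(
#       cluster=cluster,
#       bucket_name="app-bucket",
#       scope_name="chat",
#       collection_name="histories",
#       session_id="user-42",
#       ttl=timedelta(hours=1),
#   )
#   history.add_user_message("Remember me for an hour only.")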

View File

@@ -1,7 +0,0 @@
import pytest
@pytest.mark.compile
def test_placeholder() -> None:
"""Used for compiling integration tests without running any real tests."""
pass

View File

@@ -1,396 +0,0 @@
"""Test Couchbase Vector Store functionality"""
import os
import time
from typing import Any
import pytest
from langchain_core.documents import Document
from langchain_couchbase import CouchbaseVectorStore
from tests.utils import (
ConsistentFakeEmbeddings,
)
CONNECTION_STRING = os.getenv("COUCHBASE_CONNECTION_STRING", "")
BUCKET_NAME = os.getenv("COUCHBASE_BUCKET_NAME", "")
SCOPE_NAME = os.getenv("COUCHBASE_SCOPE_NAME", "")
COLLECTION_NAME = os.getenv("COUCHBASE_COLLECTION_NAME", "")
USERNAME = os.getenv("COUCHBASE_USERNAME", "")
PASSWORD = os.getenv("COUCHBASE_PASSWORD", "")
INDEX_NAME = os.getenv("COUCHBASE_INDEX_NAME", "")
SLEEP_DURATION = 1
def set_all_env_vars() -> bool:
return all(
[
CONNECTION_STRING,
BUCKET_NAME,
SCOPE_NAME,
COLLECTION_NAME,
USERNAME,
PASSWORD,
INDEX_NAME,
]
)
def get_cluster() -> Any:
"""Get a couchbase cluster object"""
from datetime import timedelta
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions
auth = PasswordAuthenticator(USERNAME, PASSWORD)
options = ClusterOptions(auth)
connect_string = CONNECTION_STRING
cluster = Cluster(connect_string, options)
# Wait until the cluster is ready for use.
cluster.wait_until_ready(timedelta(seconds=5))
return cluster
@pytest.fixture()
def cluster() -> Any:
"""Get a couchbase cluster object"""
return get_cluster()
def delete_documents(
cluster: Any, bucket_name: str, scope_name: str, collection_name: str
) -> None:
"""Delete all the documents in the collection"""
query = f"DELETE FROM `{bucket_name}`.`{scope_name}`.`{collection_name}`"
cluster.query(query).execute()
@pytest.mark.skipif(
not set_all_env_vars(), reason="Missing Couchbase environment variables"
)
class TestCouchbaseVectorStore:
def setup_method(self) -> None:
cluster = get_cluster()
# Delete all the documents in the collection
delete_documents(cluster, BUCKET_NAME, SCOPE_NAME, COLLECTION_NAME)
def test_from_documents(self, cluster: Any) -> None:
"""Test end to end search using a list of documents."""
documents = [
Document(page_content="foo", metadata={"page": 1}),
Document(page_content="bar", metadata={"page": 2}),
Document(page_content="baz", metadata={"page": 3}),
]
vectorstore = CouchbaseVectorStore.from_documents(
documents,
ConsistentFakeEmbeddings(),
cluster=cluster,
bucket_name=BUCKET_NAME,
scope_name=SCOPE_NAME,
collection_name=COLLECTION_NAME,
index_name=INDEX_NAME,
)
# Wait for the documents to be indexed
time.sleep(SLEEP_DURATION)
output = vectorstore.similarity_search("baz", k=1)
assert output[0].page_content == "baz"
assert output[0].metadata["page"] == 3
def test_from_texts(self, cluster: Any) -> None:
"""Test end to end search using a list of texts."""
texts = [
"foo",
"bar",
"baz",
]
vectorstore = CouchbaseVectorStore.from_texts(
texts,
ConsistentFakeEmbeddings(),
cluster=cluster,
index_name=INDEX_NAME,
bucket_name=BUCKET_NAME,
scope_name=SCOPE_NAME,
collection_name=COLLECTION_NAME,
)
# Wait for the documents to be indexed
time.sleep(SLEEP_DURATION)
output = vectorstore.similarity_search("foo", k=1)
assert len(output) == 1
assert output[0].page_content == "foo"
def test_from_texts_with_metadatas(self, cluster: Any) -> None:
"""Test end to end search using a list of texts and metadatas."""
texts = [
"foo",
"bar",
"baz",
]
metadatas = [{"a": 1}, {"b": 2}, {"c": 3}]
vectorstore = CouchbaseVectorStore.from_texts(
texts,
ConsistentFakeEmbeddings(),
metadatas=metadatas,
cluster=cluster,
index_name=INDEX_NAME,
bucket_name=BUCKET_NAME,
scope_name=SCOPE_NAME,
collection_name=COLLECTION_NAME,
)
# Wait for the documents to be indexed
time.sleep(SLEEP_DURATION)
output = vectorstore.similarity_search("baz", k=1)
assert output[0].page_content == "baz"
assert output[0].metadata["c"] == 3
def test_add_texts_with_ids_and_metadatas(self, cluster: Any) -> None:
"""Test end to end search by adding a list of texts, ids and metadatas."""
texts = [
"foo",
"bar",
"baz",
]
ids = ["a", "b", "c"]
metadatas = [{"a": 1}, {"b": 2}, {"c": 3}]
vectorstore = CouchbaseVectorStore(
cluster=cluster,
embedding=ConsistentFakeEmbeddings(),
index_name=INDEX_NAME,
bucket_name=BUCKET_NAME,
scope_name=SCOPE_NAME,
collection_name=COLLECTION_NAME,
)
results = vectorstore.add_texts(
texts,
ids=ids,
metadatas=metadatas,
)
assert results == ids
# Wait for the documents to be indexed
time.sleep(SLEEP_DURATION)
output = vectorstore.similarity_search("foo", k=1)
assert output[0].id == "a"
assert output[0].page_content == "foo"
assert output[0].metadata["a"] == 1
def test_delete_texts_with_ids(self, cluster: Any) -> None:
"""Test deletion of documents by ids."""
texts = [
"foo",
"bar",
"baz",
]
ids = ["a", "b", "c"]
metadatas = [{"a": 1}, {"b": 2}, {"c": 3}]
vectorstore = CouchbaseVectorStore(
cluster=cluster,
embedding=ConsistentFakeEmbeddings(),
index_name=INDEX_NAME,
bucket_name=BUCKET_NAME,
scope_name=SCOPE_NAME,
collection_name=COLLECTION_NAME,
)
results = vectorstore.add_texts(
texts,
ids=ids,
metadatas=metadatas,
)
assert results == ids
assert vectorstore.delete(ids)
# Wait for the documents to be indexed
time.sleep(SLEEP_DURATION)
output = vectorstore.similarity_search("foo", k=1)
assert len(output) == 0
def test_similarity_search_with_scores(self, cluster: Any) -> None:
"""Test similarity search with scores."""
texts = ["foo", "bar", "baz"]
metadatas = [{"a": 1}, {"b": 2}, {"c": 3}]
vectorstore = CouchbaseVectorStore(
cluster=cluster,
embedding=ConsistentFakeEmbeddings(),
index_name=INDEX_NAME,
bucket_name=BUCKET_NAME,
scope_name=SCOPE_NAME,
collection_name=COLLECTION_NAME,
)
vectorstore.add_texts(texts, metadatas=metadatas)
# Wait for the documents to be indexed
time.sleep(SLEEP_DURATION)
output = vectorstore.similarity_search_with_score("foo", k=2)
assert len(output) == 2
assert output[0][0].page_content == "foo"
assert output[0][0].metadata["a"] == 1
# check that the scores are sorted in descending order
assert output[0][1] > output[1][1]
def test_similarity_search_by_vector(self, cluster: Any) -> None:
"""Test similarity search by vector."""
texts = ["foo", "bar", "baz"]
metadatas = [{"a": 1}, {"b": 2}, {"c": 3}]
vectorstore = CouchbaseVectorStore(
cluster=cluster,
embedding=ConsistentFakeEmbeddings(),
index_name=INDEX_NAME,
bucket_name=BUCKET_NAME,
scope_name=SCOPE_NAME,
collection_name=COLLECTION_NAME,
)
vectorstore.add_texts(texts, metadatas=metadatas)
# Wait for the documents to be indexed
time.sleep(SLEEP_DURATION)
vector = ConsistentFakeEmbeddings().embed_query("foo")
vector_output = vectorstore.similarity_search_by_vector(vector, k=1)
assert vector_output[0].page_content == "foo"
similarity_output = vectorstore.similarity_search("foo", k=1)
assert similarity_output == vector_output
def test_output_fields(self, cluster: Any) -> None:
"""Test that output fields are set correctly."""
texts = [
"foo",
"bar",
"baz",
]
metadatas = [{"page": 1, "a": 1}, {"page": 2, "b": 2}, {"page": 3, "c": 3}]
vectorstore = CouchbaseVectorStore(
cluster=cluster,
embedding=ConsistentFakeEmbeddings(),
index_name=INDEX_NAME,
bucket_name=BUCKET_NAME,
scope_name=SCOPE_NAME,
collection_name=COLLECTION_NAME,
)
ids = vectorstore.add_texts(texts, metadatas)
assert len(ids) == len(texts)
# Wait for the documents to be indexed
time.sleep(SLEEP_DURATION)
output = vectorstore.similarity_search("foo", k=1, fields=["metadata.page"])
assert output[0].page_content == "foo"
assert output[0].metadata["page"] == 1
assert "a" not in output[0].metadata
def test_hybrid_search(self, cluster: Any) -> None:
"""Test hybrid search."""
texts = [
"foo",
"bar",
"baz",
]
metadatas = [
{"section": "index"},
{"section": "glossary"},
{"section": "appendix"},
]
vectorstore = CouchbaseVectorStore(
cluster=cluster,
embedding=ConsistentFakeEmbeddings(),
index_name=INDEX_NAME,
bucket_name=BUCKET_NAME,
scope_name=SCOPE_NAME,
collection_name=COLLECTION_NAME,
)
vectorstore.add_texts(texts, metadatas=metadatas)
# Wait for the documents to be indexed
time.sleep(SLEEP_DURATION)
result, score = vectorstore.similarity_search_with_score("foo", k=1)[0]
# Wait for the documents to be indexed for hybrid search
time.sleep(SLEEP_DURATION)
hybrid_result, hybrid_score = vectorstore.similarity_search_with_score(
"foo",
k=1,
search_options={"query": {"match": "index", "field": "metadata.section"}},
)[0]
assert result == hybrid_result
assert score <= hybrid_score
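# The search_options payload above follows the Couchbase FTS query JSON; a
# sketch of other shapes the same parameter should accept (field names here
# are placeholders, not part of this change):
#
#   # exact match on a metadata field, as in the test above
#   {"query": {"match": "index", "field": "metadata.section"}}
#   # combine several required clauses with a conjunction
#   {"query": {"conjuncts": [
#       {"match": "index", "field": "metadata.section"},
#       {"min": 1, "max": 10, "inclusive_min": True, "field": "metadata.page"},
#   ]}}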
def test_id_in_results(self, cluster: Any) -> None:
"""Test that the id is returned in the result documents."""
texts = [
"foo",
"bar",
"baz",
]
metadatas = [{"a": 1}, {"b": 2}, {"c": 3}]
vectorstore = CouchbaseVectorStore(
cluster=cluster,
embedding=ConsistentFakeEmbeddings(),
index_name=INDEX_NAME,
bucket_name=BUCKET_NAME,
scope_name=SCOPE_NAME,
collection_name=COLLECTION_NAME,
)
ids = vectorstore.add_texts(texts, metadatas=metadatas)
assert len(ids) == len(texts)
# Wait for the documents to be indexed
time.sleep(SLEEP_DURATION)
output = vectorstore.similarity_search("foo", k=1)
assert output[0].id == ids[0]

View File

@@ -1,12 +0,0 @@
from langchain_couchbase import __all__
EXPECTED_ALL = [
"CouchbaseVectorStore",
"CouchbaseCache",
"CouchbaseSemanticCache",
"CouchbaseChatMessageHistory",
]
def test_all_imports() -> None:
assert sorted(EXPECTED_ALL) == sorted(__all__)

View File

@@ -1,136 +0,0 @@
"""Utilities for testing purposes."""
import hashlib
from datetime import datetime
from typing import Any, Dict, List, Mapping, Optional, cast
from couchbase.cluster import Cluster
from couchbase.options import GetOptions
from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.embeddings import Embeddings
from langchain_core.language_models.llms import LLM
class FakeEmbeddings(Embeddings):
"""Fake embeddings functionality for testing."""
def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""Return simple embeddings.
Embeddings encode each text as its index."""
return [[float(1.0)] * 9 + [float(i)] for i in range(len(texts))]
async def aembed_documents(self, texts: List[str]) -> List[List[float]]:
return self.embed_documents(texts)
def embed_query(self, text: str) -> List[float]:
"""Return constant query embeddings.
Embeddings are identical to embed_documents(texts)[0].
Distance to each text will be that text's index,
as it was passed to embed_documents."""
return [float(1.0)] * 9 + [float(0.0)]
async def aembed_query(self, text: str) -> List[float]:
return self.embed_query(text)
class ConsistentFakeEmbeddings(FakeEmbeddings):
"""Fake embeddings which remember all the texts seen so far to return consistent
vectors for the same texts."""
def __init__(self, dimensionality: int = 10) -> None:
self.known_texts: List[str] = []
self.dimensionality = dimensionality
def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""Return consistent embeddings for each text seen so far."""
out_vectors = []
for text in texts:
if text not in self.known_texts:
self.known_texts.append(text)
vector = [float(1.0)] * (self.dimensionality - 1) + [
float(self.known_texts.index(text))
]
out_vectors.append(vector)
return out_vectors
def embed_query(self, text: str) -> List[float]:
"""Return consistent embeddings for the text: the same vector that
embed_documents would produce, registering the text if unseen."""
return self.embed_documents([text])[0]
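# The "consistent" property the vector store tests rely on, as a sketch:
# the same text always maps to the same vector, and a query embeds exactly
# like a document, so nearest-neighbour results are deterministic.
#
#   emb = ConsistentFakeEmbeddings()
#   first = emb.embed_documents(["foo", "bar"])  # "foo" -> [..., 0.0], "bar" -> [..., 1.0]
#   again = emb.embed_documents(["bar"])         # still [..., 1.0]
#   assert emb.embed_query("foo") == first[0]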
class FakeLLM(LLM):
"""Fake LLM wrapper for testing purposes."""
queries: Optional[Mapping] = None
sequential_responses: Optional[bool] = False
response_index: int = 0
def get_num_tokens(self, text: str) -> int:
"""Return number of tokens."""
return len(text.split())
@property
def _llm_type(self) -> str:
"""Return type of llm."""
return "fake"
def _call(
self,
prompt: str,
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> str:
if self.sequential_responses:
return self._get_next_response_in_sequence
if self.queries is not None:
return self.queries[prompt]
if stop is None:
return "foo"
else:
return "bar"
@property
def _identifying_params(self) -> Dict[str, Any]:
return {}
@property
def _get_next_response_in_sequence(self) -> str:
queries = cast(Mapping, self.queries)
response = queries[list(queries.keys())[self.response_index]]
self.response_index = self.response_index + 1
return response
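# A sketch of driving FakeLLM in its two modes (the prompts are placeholders):
#
#   canned = FakeLLM(queries={"ping": "pong"})
#   assert canned.invoke("ping") == "pong"      # looks the prompt up directly
#
#   scripted = FakeLLM(queries={"a": "first", "b": "second"},
#                      sequential_responses=True)
#   assert scripted.invoke("anything") == "first"   # ignores the prompt and
#   assert scripted.invoke("anything") == "second"  # walks the mapping in order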
def cache_key_hash_function(_input: str) -> str:
"""Use a deterministic hashing approach."""
return hashlib.md5(_input.encode()).hexdigest()
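# md5 is chosen over the built-in hash() because Python randomizes string
# hashing per process (PYTHONHASHSEED), while cache keys must be stable
# across runs; a sketch:
#
#   assert cache_key_hash_function("SELECT 1") == cache_key_hash_function("SELECT 1")
#   # hash("SELECT 1") would differ between interpreter invocations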
def fetch_document_expiry_time(
cluster: Cluster,
bucket_name: str,
scope_name: str,
collection_name: str,
document_key: str,
) -> datetime:
"""Fetch the document's expiry time from the database."""
collection = (
cluster.bucket(bucket_name).scope(scope_name).collection(collection_name)
)
result = collection.get(document_key, GetOptions(with_expiry=True))
return result.expiryTime
def get_document_keys(
cluster: Cluster, bucket_name: str, scope_name: str, query: str
) -> List[str]:
"""Get the document key from the database based on the query using meta().id."""
scope = cluster.bucket(bucket_name).scope(scope_name)
result = scope.query(query).execute()
document_keys = [row["id"] for row in result]
return document_keys
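# A sketch tying the two helpers together, mirroring the TTL tests earlier in
# this change (bucket/scope/collection names are placeholders):
#
#   query = (
#       "SELECT meta().id, * FROM `histories` doc"
#       " WHERE doc.session_id = 'user-42'"
#   )
#   keys = get_document_keys(cluster, "app-bucket", "chat", query)
#   for key in keys:
#       expiry = fetch_document_expiry_time(
#           cluster, "app-bucket", "chat", "histories", key
#       )
#       print(key, "expires at", expiry)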