mirror of
https://github.com/hwchase17/langchain.git
synced 2025-05-02 21:58:46 +00:00
```python """python scripts/update_mypy_ruff.py""" import glob import tomllib from pathlib import Path import toml import subprocess import re ROOT_DIR = Path(__file__).parents[1] def main(): for path in glob.glob(str(ROOT_DIR / "libs/**/pyproject.toml"), recursive=True): print(path) with open(path, "rb") as f: pyproject = tomllib.load(f) try: pyproject["tool"]["poetry"]["group"]["typing"]["dependencies"]["mypy"] = ( "^1.10" ) pyproject["tool"]["poetry"]["group"]["lint"]["dependencies"]["ruff"] = ( "^0.5" ) except KeyError: continue with open(path, "w") as f: toml.dump(pyproject, f) cwd = "/".join(path.split("/")[:-1]) completed = subprocess.run( "poetry lock --no-update; poetry install --with typing; poetry run mypy . --no-color", cwd=cwd, shell=True, capture_output=True, text=True, ) logs = completed.stdout.split("\n") to_ignore = {} for l in logs: if re.match("^(.*)\:(\d+)\: error:.*\[(.*)\]", l): path, line_no, error_type = re.match( "^(.*)\:(\d+)\: error:.*\[(.*)\]", l ).groups() if (path, line_no) in to_ignore: to_ignore[(path, line_no)].append(error_type) else: to_ignore[(path, line_no)] = [error_type] print(len(to_ignore)) for (error_path, line_no), error_types in to_ignore.items(): all_errors = ", ".join(error_types) full_path = f"{cwd}/{error_path}" try: with open(full_path, "r") as f: file_lines = f.readlines() except FileNotFoundError: continue file_lines[int(line_no) - 1] = ( file_lines[int(line_no) - 1][:-1] + f" # type: ignore[{all_errors}]\n" ) with open(full_path, "w") as f: f.write("".join(file_lines)) subprocess.run( "poetry run ruff format .; poetry run ruff --select I --fix .", cwd=cwd, shell=True, capture_output=True, text=True, ) if __name__ == "__main__": main() ```
179 lines
5.9 KiB
Python
179 lines
5.9 KiB
Python
"""Test Cassandra caches. Requires a running vector-capable Cassandra cluster."""
|
|
|
|
import asyncio
|
|
import os
|
|
import time
|
|
from typing import Any, Iterator, Tuple
|
|
|
|
import pytest
|
|
from langchain.globals import get_llm_cache, set_llm_cache
|
|
from langchain_core.outputs import Generation, LLMResult
|
|
|
|
from langchain_community.cache import CassandraCache, CassandraSemanticCache
|
|
from langchain_community.utilities.cassandra import SetupMode
|
|
from tests.integration_tests.cache.fake_embeddings import FakeEmbeddings
|
|
from tests.unit_tests.llms.fake_llm import FakeLLM
|
|
|
|
|
|
@pytest.fixture(scope="module")
def cassandra_connection() -> Iterator[Tuple[Any, str]]:
    """Yield a ``(session, keyspace)`` pair shared by the tests of this module.

    The cluster is built from the comma-separated hosts in the
    ``CASSANDRA_CONTACT_POINTS`` environment variable when it is set, and from
    the ``Cluster()`` defaults otherwise. The test keyspace is created if it
    does not exist yet, and the cluster is shut down once the module is done.
    """
    from cassandra.cluster import Cluster

    keyspace = "langchain_cache_test_keyspace"
    # get db connection
    if "CASSANDRA_CONTACT_POINTS" in os.environ:
        # Tolerate "host1, host2" and trailing commas in the variable.
        contact_points = [
            cp.strip()
            for cp in os.environ["CASSANDRA_CONTACT_POINTS"].split(",")
            if cp.strip()
        ]
        cluster = Cluster(contact_points)
    else:
        cluster = Cluster()

    try:
        session = cluster.connect()
        # ensure keyspace exists
        session.execute(
            (
                f"CREATE KEYSPACE IF NOT EXISTS {keyspace} "
                f"WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': 1}}"
            )
        )

        yield (session, keyspace)
    finally:
        # Release the driver's connections even if setup or a test failed.
        cluster.shutdown()
|
|
|
|
|
|
def test_cassandra_cache(cassandra_connection: Tuple[Any, str]) -> None:
    """Store one entry in a ``CassandraCache`` and expect the LLM to return it."""
    db_session, db_keyspace = cassandra_connection
    cache = CassandraCache(session=db_session, keyspace=db_keyspace)
    set_llm_cache(cache)

    llm = FakeLLM()
    llm_params = llm.dict()
    llm_params["stop"] = None
    llm_key = str(sorted(llm_params.items()))

    # Seed the globally registered cache for prompt "foo".
    get_llm_cache().update("foo", llm_key, [Generation(text="fizz")])

    expected = LLMResult(generations=[[Generation(text="fizz")]], llm_output={})
    actual = llm.generate(["foo"])
    assert actual == expected

    cache.clear()
|
|
|
|
|
|
async def test_cassandra_cache_async(cassandra_connection: Tuple[Any, str]) -> None:
    """Async variant: store one entry via ``aupdate`` and read it via ``agenerate``."""
    db_session, db_keyspace = cassandra_connection
    cache = CassandraCache(
        session=db_session,
        keyspace=db_keyspace,
        setup_mode=SetupMode.ASYNC,
    )
    set_llm_cache(cache)

    llm = FakeLLM()
    llm_params = llm.dict()
    llm_params["stop"] = None
    llm_key = str(sorted(llm_params.items()))

    # Seed the globally registered cache for prompt "foo".
    await get_llm_cache().aupdate("foo", llm_key, [Generation(text="fizz")])

    expected = LLMResult(generations=[[Generation(text="fizz")]], llm_output={})
    actual = await llm.agenerate(["foo"])
    assert actual == expected

    await cache.aclear()
|
|
|
|
|
|
def test_cassandra_cache_ttl(cassandra_connection: Tuple[Any, str]) -> None:
    """Check that an entry written with ``ttl_seconds=2`` is gone after 2.5 s."""
    db_session, db_keyspace = cassandra_connection
    cache = CassandraCache(session=db_session, keyspace=db_keyspace, ttl_seconds=2)
    set_llm_cache(cache)

    llm = FakeLLM()
    llm_params = llm.dict()
    llm_params["stop"] = None
    llm_key = str(sorted(llm_params.items()))

    expected = LLMResult(generations=[[Generation(text="fizz")]], llm_output={})
    get_llm_cache().update("foo", llm_key, [Generation(text="fizz")])

    # Right after the write the cached generation is returned.
    fresh = llm.generate(["foo"])
    assert fresh == expected

    # Wait past the TTL: the entry has expired away.
    time.sleep(2.5)
    stale = llm.generate(["foo"])
    assert stale != expected

    cache.clear()
|
|
|
|
|
|
async def test_cassandra_cache_ttl_async(cassandra_connection: Tuple[Any, str]) -> None:
    """Async variant: an entry written with ``ttl_seconds=2`` is gone after 2.5 s."""
    db_session, db_keyspace = cassandra_connection
    cache = CassandraCache(
        session=db_session,
        keyspace=db_keyspace,
        ttl_seconds=2,
        setup_mode=SetupMode.ASYNC,
    )
    set_llm_cache(cache)

    llm = FakeLLM()
    llm_params = llm.dict()
    llm_params["stop"] = None
    llm_key = str(sorted(llm_params.items()))

    expected = LLMResult(generations=[[Generation(text="fizz")]], llm_output={})
    await get_llm_cache().aupdate("foo", llm_key, [Generation(text="fizz")])

    # Right after the write the cached generation is returned.
    fresh = await llm.agenerate(["foo"])
    assert fresh == expected

    # Wait past the TTL: the entry has expired away.
    await asyncio.sleep(2.5)
    stale = await llm.agenerate(["foo"])
    assert stale != expected

    await cache.aclear()
|
|
|
|
|
|
def test_cassandra_semantic_cache(cassandra_connection: Tuple[Any, str]) -> None:
    """Check hit-then-miss behavior of ``CassandraSemanticCache`` around ``clear()``."""
    db_session, db_keyspace = cassandra_connection
    sem_cache = CassandraSemanticCache(
        session=db_session, keyspace=db_keyspace, embedding=FakeEmbeddings()
    )
    set_llm_cache(sem_cache)

    llm = FakeLLM()
    llm_params = llm.dict()
    llm_params["stop"] = None
    llm_key = str(sorted(llm_params.items()))

    expected = LLMResult(generations=[[Generation(text="fizz")]], llm_output={})
    get_llm_cache().update("foo", llm_key, [Generation(text="fizz")])

    # 'bar' has the same embedding as 'foo', so the stored entry is returned.
    hit = llm.generate(["bar"])
    assert hit == expected

    # clear the cache: 'fizz' is erased away now
    sem_cache.clear()
    miss = llm.generate(["bar"])
    assert miss != expected

    sem_cache.clear()
|
|
|
|
|
|
async def test_cassandra_semantic_cache_async(
    cassandra_connection: Tuple[Any, str],
) -> None:
    """Async variant of the semantic-cache hit-then-miss check around ``aclear()``."""
    db_session, db_keyspace = cassandra_connection
    sem_cache = CassandraSemanticCache(
        session=db_session,
        keyspace=db_keyspace,
        embedding=FakeEmbeddings(),
        setup_mode=SetupMode.ASYNC,
    )
    set_llm_cache(sem_cache)

    llm = FakeLLM()
    llm_params = llm.dict()
    llm_params["stop"] = None
    llm_key = str(sorted(llm_params.items()))

    expected = LLMResult(generations=[[Generation(text="fizz")]], llm_output={})
    await get_llm_cache().aupdate("foo", llm_key, [Generation(text="fizz")])

    # 'bar' has the same embedding as 'foo', so the stored entry is returned.
    hit = await llm.agenerate(["bar"])
    assert hit == expected

    # clear the cache: 'fizz' is erased away now
    await sem_cache.aclear()
    miss = await llm.agenerate(["bar"])
    assert miss != expected

    await sem_cache.aclear()
|