mirror of
https://github.com/hwchase17/langchain.git
synced 2025-04-30 21:05:36 +00:00
"""python scripts/update_mypy_ruff.py"""

import glob
import re
import subprocess
import tomllib
from pathlib import Path

import toml

ROOT_DIR = Path(__file__).parents[1]

# Matches mypy error lines such as ``pkg/mod.py:12: error: message  [arg-type]``
# and captures (relative path, line number, error code).  Compiled once, as a
# raw string so the backslashes reach the regex engine untouched.
_MYPY_ERROR_RE = re.compile(r"^(.*):(\d+): error:.*\[(.*)\]")


def main():
    """Bump mypy/ruff in every ``libs/**/pyproject.toml`` and silence new errors.

    For each pyproject file found under ``ROOT_DIR / "libs"``:

    1. Set the ``mypy`` (typing group) and ``ruff`` (lint group) poetry
       dependency constraints; packages lacking either group are skipped
       without being rewritten.
    2. Re-lock, install the typing group and run mypy in the package directory.
    3. Append ``# type: ignore[<codes>]`` to every source line mypy reported.
    4. Run ruff's formatter and import sorter over the package.
    """
    for path in glob.glob(str(ROOT_DIR / "libs/**/pyproject.toml"), recursive=True):
        print(path)
        with open(path, "rb") as f:
            pyproject = tomllib.load(f)
        try:
            groups = pyproject["tool"]["poetry"]["group"]
            groups["typing"]["dependencies"]["mypy"] = "^1.10"
            groups["lint"]["dependencies"]["ruff"] = "^0.5"
        except KeyError:
            # Package does not define both groups: leave its file untouched.
            continue
        with open(path, "w", encoding="utf-8") as f:
            toml.dump(pyproject, f)

        cwd = Path(path).parent
        completed = subprocess.run(
            "poetry lock --no-update; poetry install --with typing; poetry run mypy . --no-color",
            cwd=cwd,
            shell=True,
            capture_output=True,
            text=True,
        )

        # Group the reported error codes by (file, line) so each offending
        # line receives a single combined ignore comment.
        to_ignore = {}
        for log_line in completed.stdout.split("\n"):
            match = _MYPY_ERROR_RE.match(log_line)
            if match is None:
                continue
            error_path, line_no, error_type = match.groups()
            to_ignore.setdefault((error_path, int(line_no)), []).append(error_type)
        print(len(to_ignore))

        for (error_path, line_no), error_types in to_ignore.items():
            all_errors = ", ".join(error_types)
            full_path = cwd / error_path
            try:
                with open(full_path, "r", encoding="utf-8") as f:
                    file_lines = f.readlines()
            except FileNotFoundError:
                continue
            # Strip only a trailing newline: the last line of a file may not
            # have one, and slicing off a character there would corrupt it.
            flagged = file_lines[line_no - 1].removesuffix("\n")
            file_lines[line_no - 1] = flagged + f" # type: ignore[{all_errors}]\n"
            with open(full_path, "w", encoding="utf-8") as f:
                f.write("".join(file_lines))

        subprocess.run(
            "poetry run ruff format .; poetry run ruff --select I --fix .",
            cwd=cwd,
            shell=True,
            capture_output=True,
            text=True,
        )


if __name__ == "__main__":
    main()
75 lines
2.6 KiB
Python
75 lines
2.6 KiB
Python
"""Math utils."""
|
|
|
|
import logging
|
|
from typing import List, Optional, Tuple, Union
|
|
|
|
import numpy as np
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
Matrix = Union[List[List[float]], List[np.ndarray], np.ndarray]
|
|
|
|
|
|
def cosine_similarity(X: Matrix, Y: Matrix) -> np.ndarray:
    """Compute the cosine similarity between every row of X and every row of Y.

    Args:
        X: Matrix whose rows are the first set of vectors.
        Y: Matrix whose rows are the second set of vectors; must have the
            same number of columns as X.

    Returns:
        Array whose ``[i, j]`` entry is the cosine similarity of ``X[i]`` and
        ``Y[j]``. An empty 1-D array is returned when either input has no rows.

    Raises:
        ValueError: If X and Y have a different number of columns.
    """
    # Nothing to compare when either side has no rows.
    if not len(X) or not len(Y):
        return np.array([])

    lhs = np.array(X)
    rhs = np.array(Y)
    if lhs.shape[1] != rhs.shape[1]:
        raise ValueError(
            f"Number of columns in X and Y must be the same. X has shape {lhs.shape} "
            f"and Y has shape {rhs.shape}."
        )

    try:
        import simsimd as simd

        # simsimd yields cosine *distances*; similarity is one minus distance.
        lhs = np.array(lhs, dtype=np.float32)
        rhs = np.array(rhs, dtype=np.float32)
        return 1 - np.array(simd.cdist(lhs, rhs, metric="cosine"))
    except ImportError:
        logger.debug(
            "Unable to import simsimd, defaulting to NumPy implementation. If you want "
            "to use simsimd please install with `pip install simsimd`."
        )
        lhs_norms = np.linalg.norm(lhs, axis=1)
        rhs_norms = np.linalg.norm(rhs, axis=1)
        # Division warnings are silenced here because the non-finite results
        # they would announce are overwritten with 0.0 right after.
        with np.errstate(divide="ignore", invalid="ignore"):
            scores = np.dot(lhs, rhs.T) / np.outer(lhs_norms, rhs_norms)
        scores[~np.isfinite(scores)] = 0.0
        return scores
|
|
|
|
|
|
def cosine_similarity_top_k(
    X: Matrix,
    Y: Matrix,
    top_k: Optional[int] = 5,
    score_threshold: Optional[float] = None,
) -> Tuple[List[Tuple[int, int]], List[float]]:
    """Row-wise cosine similarity with optional top-k and score threshold filtering.

    Args:
        X: Matrix.
        Y: Matrix, same width as X.
        top_k: Max number of results to return. ``None`` (or ``0``, kept for
            backward compatibility) means no limit.
        score_threshold: Minimum cosine similarity of results. ``None`` means
            no minimum; an explicit ``0.0`` is honoured and filters out
            negative similarities.

    Returns:
        Tuple of two lists. First contains two-tuples of indices (X_idx, Y_idx),
        second contains corresponding cosine similarities. Both are ordered by
        descending similarity, and both are empty when no pair qualifies.
    """
    if len(X) == 0 or len(Y) == 0:
        return [], []

    score_array = cosine_similarity(X, Y)
    flat_scores = score_array.ravel()

    # Select the qualifying flat positions explicitly instead of overwriting
    # rejected scores with 0: a zeroed entry would otherwise outrank (and be
    # returned instead of) a legitimately negative score.
    if score_threshold is None:
        candidates = np.arange(flat_scores.size)
    else:
        candidates = np.flatnonzero(flat_scores >= score_threshold)

    # "No limit" means every qualifying pair, not just one per row of X.
    limit = min(top_k, candidates.size) if top_k else candidates.size
    if limit <= 0:
        # Guard required because a ``[-0:]`` slice would select everything.
        return [], []

    candidate_scores = flat_scores[candidates]
    if limit < candidates.size:
        # Partial sort: isolate the ``limit`` best candidates first.
        best = np.argpartition(candidate_scores, -limit)[-limit:]
        candidates = candidates[best]
        candidate_scores = candidate_scores[best]

    descending = np.argsort(candidate_scores)[::-1]
    top_idxs = candidates[descending]

    rows, cols = np.unravel_index(top_idxs, score_array.shape)
    ret_idxs = [(int(row), int(col)) for row, col in zip(rows, cols)]
    scores = flat_scores[top_idxs].tolist()
    return ret_idxs, scores
|