mirror of
https://github.com/hwchase17/langchain.git
synced 2025-09-16 06:53:16 +00:00
Langchain vectorstore integration with Kinetica (#18102)
- **Description:** New vectorstore integration with the Kinetica database - **Issue:** - **Dependencies:** the Kinetica Python API `pip install gpudb==7.2.0.1`, - **Tag maintainer:** @baskaryan, @hwchase17 - **Twitter handle:** --------- Co-authored-by: Chad Juliano <cjuliano@kinetica.com>
This commit is contained in:
919
libs/community/langchain_community/vectorstores/kinetica.py
Normal file
919
libs/community/langchain_community/vectorstores/kinetica.py
Normal file
@@ -0,0 +1,919 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import enum
|
||||
import json
|
||||
import logging
|
||||
import struct
|
||||
import uuid
|
||||
from collections import OrderedDict
|
||||
from enum import Enum
|
||||
from functools import partial
|
||||
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Type
|
||||
|
||||
import numpy as np
|
||||
from langchain_core.documents import Document
|
||||
from langchain_core.embeddings import Embeddings
|
||||
from langchain_core.pydantic_v1 import BaseSettings
|
||||
from langchain_core.vectorstores import VectorStore
|
||||
|
||||
from langchain_community.vectorstores.utils import maximal_marginal_relevance
|
||||
|
||||
|
||||
class DistanceStrategy(str, enum.Enum):
    """Enumerator of the distance strategies supported by Kinetica.

    The member values are the short metric names; the SQL function actually
    used per strategy is resolved in ``Kinetica.distance_strategy``.
    """

    EUCLIDEAN = "l2"  # L2 (Euclidean) distance
    COSINE = "cosine"  # cosine distance
    MAX_INNER_PRODUCT = "inner"  # dot-product similarity
|
||||
|
||||
|
||||
def _results_to_docs(docs_and_scores: Any) -> List[Document]:
|
||||
"""Return docs from docs and scores."""
|
||||
return [doc for doc, _ in docs_and_scores]
|
||||
|
||||
|
||||
class Dimension(int, Enum):
    """Some default dimensions for known embeddings."""

    OPENAI = 1536  # default vector length for OpenAI embeddings
|
||||
|
||||
|
||||
# Distance metric applied when the caller does not specify one.
DEFAULT_DISTANCE_STRATEGY = DistanceStrategy.EUCLIDEAN

_LANGCHAIN_DEFAULT_SCHEMA_NAME = "langchain"  # Default Kinetica schema name
_LANGCHAIN_DEFAULT_COLLECTION_NAME = (
    "langchain_kinetica_embeddings"  # Default Kinetica table name
)
|
||||
|
||||
|
||||
class KineticaSettings(BaseSettings):
    """`Kinetica` client configuration.

    Attributes:
        host (str): URL of the Kinetica server.
            Defaults to 'http://127.0.0.1'.
        port (int): Port to connect with over HTTP. Defaults to 9191.
        username (Optional[str]): Username to login. Defaults to None.
        password (Optional[str]): Password to login. Defaults to None.
        database (str): Schema name containing the table.
            Defaults to 'langchain'.
        table (str): Table name to operate on.
            Defaults to 'langchain_kinetica_embeddings'.
        metric (str): Metric used to compute distance; one of
            'l2', 'cosine' or 'inner'. Defaults to 'l2'.

    Values may also be supplied through environment variables prefixed with
    ``kinetica_`` or an ``.env`` file (see the nested ``Config`` class).
    """

    host: str = "http://127.0.0.1"
    port: int = 9191

    username: Optional[str] = None
    password: Optional[str] = None

    database: str = _LANGCHAIN_DEFAULT_SCHEMA_NAME
    table: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME
    metric: str = DEFAULT_DISTANCE_STRATEGY.value

    def __getitem__(self, item: str) -> Any:
        # Allow dict-style access, e.g. ``settings["host"]``.
        return getattr(self, item)

    class Config:
        # Pydantic settings: read values from an ``.env`` file and from
        # environment variables prefixed with ``kinetica_``.
        env_file = ".env"
        env_prefix = "kinetica_"
        env_file_encoding = "utf-8"
|
||||
|
||||
|
||||
class Kinetica(VectorStore):
    """`Kinetica` vector store.

    To use, you should have the ``gpudb`` python package installed.

    Args:
        config: Kinetica connection settings class (`KineticaSettings`).
        embedding_function: Any embedding function implementing
            `langchain.embeddings.base.Embeddings` interface.
        collection_name: The name of the Kinetica table to use.
            (default: langchain_kinetica_embeddings)
            The table is created when initializing the store (if not exists),
            so make sure the user has the right permissions to create tables.
        schema_name: The Kinetica schema holding the table.
            (default: langchain)
        distance_strategy: The distance strategy to use. (default: EUCLIDEAN)
        pre_delete_collection: If True, will delete the schema if it exists.
            (default: False). Useful for testing.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import Kinetica, KineticaSettings
            from langchain_community.embeddings.openai import OpenAIEmbeddings

            kinetica_settings = KineticaSettings(
                host="http://127.0.0.1", username="", password=""
            )
            COLLECTION_NAME = "kinetica_store"
            embeddings = OpenAIEmbeddings()
            vectorstore = Kinetica.from_documents(
                documents=docs,
                embedding=embeddings,
                collection_name=COLLECTION_NAME,
                config=kinetica_settings,
            )
    """
|
||||
|
||||
    def __init__(
        self,
        config: KineticaSettings,
        embedding_function: Embeddings,
        collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
        schema_name: str = _LANGCHAIN_DEFAULT_SCHEMA_NAME,
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        pre_delete_collection: bool = False,
        logger: Optional[logging.Logger] = None,
        relevance_score_fn: Optional[Callable[[float], float]] = None,
    ) -> None:
        """Constructor for the Kinetica class

        Args:
            config (KineticaSettings): a `KineticaSettings` instance
            embedding_function (Embeddings): embedding function to use
            collection_name (str, optional): the Kinetica table name.
                Defaults to _LANGCHAIN_DEFAULT_COLLECTION_NAME.
            schema_name (str, optional): the Kinetica schema name.
                Defaults to _LANGCHAIN_DEFAULT_SCHEMA_NAME.
            distance_strategy (DistanceStrategy, optional): distance metric
                used for similarity search.
                Defaults to DEFAULT_DISTANCE_STRATEGY.
            pre_delete_collection (bool, optional): whether to drop the schema
                (cascading) before creating the table. Defaults to False.
            logger (Optional[logging.Logger], optional): logger to use; a
                module-level logger is created when None. Defaults to None.
            relevance_score_fn (Optional[Callable[[float], float]], optional):
                override for the relevance-score normalization function.
                Defaults to None.
        """

        self._config = config
        self.embedding_function = embedding_function
        self.collection_name = collection_name
        self.schema_name = schema_name
        self._distance_strategy = distance_strategy
        self.pre_delete_collection = pre_delete_collection
        self.logger = logger or logging.getLogger(__name__)
        self.override_relevance_score_fn = relevance_score_fn
        # Eagerly open the DB connection; schema/table creation is deferred to
        # __post_init__ (run once the embedding dimension is known).
        self._db = self.__get_db(self._config)
|
||||
|
||||
    def __post_init__(self, dimensions: int) -> None:
        """
        Initialize the store.

        Creates the target schema and the embeddings table (with a vector
        column of ``dimensions`` elements) if they do not already exist.

        Raises:
            ImportError: if the ``gpudb`` package is not installed.
        """
        try:
            from gpudb import GPUdbTable
        except ImportError:
            raise ImportError(
                "Could not import Kinetica python API. "
                "Please install it with `pip install gpudb==7.2.0.1`."
            )

        self.dimensions = dimensions
        dimension_field = f"vector({dimensions})"

        if self.pre_delete_collection:
            self.delete_schema()

        # Fully-qualified table name: "<schema>.<collection>" when a schema
        # name is set, otherwise the bare collection name.
        self.table_name = self.collection_name
        if self.schema_name is not None and len(self.schema_name) > 0:
            self.table_name = f"{self.schema_name}.{self.collection_name}"

        # Column layout: raw text, packed float vector, JSON metadata, UUID id.
        self.table_schema = [
            ["text", "string"],
            ["embedding", "bytes", dimension_field],
            ["metadata", "string", "json"],
            ["id", "string", "uuid"],
        ]

        self.create_schema()
        self.EmbeddingStore: GPUdbTable = self.create_tables_if_not_exists()
|
||||
|
||||
    def __get_db(self, config: KineticaSettings) -> Any:
        """Create a ``GPUdb`` connection from the given settings.

        Raises:
            ImportError: if the ``gpudb`` package is not installed.
        """
        try:
            from gpudb import GPUdb
        except ImportError:
            raise ImportError(
                "Could not import Kinetica python API. "
                "Please install it with `pip install gpudb==7.2.0.1`."
            )

        options = GPUdb.Options()
        options.username = config.username
        options.password = config.password
        # NOTE(review): SSL certificate verification is disabled
        # unconditionally here — confirm this is acceptable for production.
        options.skip_ssl_cert_verification = True
        return GPUdb(host=config.host, options=options)
|
||||
|
||||
    @property
    def embeddings(self) -> Embeddings:
        """The embedding function used by this store."""
        return self.embedding_function
|
||||
|
||||
@classmethod
|
||||
def __from(
|
||||
cls,
|
||||
config: KineticaSettings,
|
||||
texts: List[str],
|
||||
embeddings: List[List[float]],
|
||||
embedding: Embeddings,
|
||||
dimensions: int,
|
||||
metadatas: Optional[List[dict]] = None,
|
||||
ids: Optional[List[str]] = None,
|
||||
collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
|
||||
distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
|
||||
pre_delete_collection: bool = False,
|
||||
logger: Optional[logging.Logger] = None,
|
||||
**kwargs: Any,
|
||||
) -> Kinetica:
|
||||
"""Class method to assist in constructing the `Kinetica` store instance
|
||||
using different combinations of parameters
|
||||
|
||||
Args:
|
||||
config (KineticaSettings): a `KineticaSettings` instance
|
||||
texts (List[str]): The list of texts to generate embeddings for and store
|
||||
embeddings (List[List[float]]): List of embeddings
|
||||
embedding (Embeddings): the Embedding function
|
||||
dimensions (int): The number of dimensions the embeddings have
|
||||
metadatas (Optional[List[dict]], optional): List of JSON data associated
|
||||
with each text. Defaults to None.
|
||||
ids (Optional[List[str]], optional): List of unique IDs (UUID by default)
|
||||
associated with each text. Defaults to None.
|
||||
collection_name (str, optional): Kinetica schema name.
|
||||
Defaults to _LANGCHAIN_DEFAULT_COLLECTION_NAME.
|
||||
distance_strategy (DistanceStrategy, optional): Not used for now.
|
||||
Defaults to DEFAULT_DISTANCE_STRATEGY.
|
||||
pre_delete_collection (bool, optional): Whether to delete the Kinetica
|
||||
schema or not. Defaults to False.
|
||||
logger (Optional[logging.Logger], optional): Logger to use for logging at
|
||||
different levels. Defaults to None.
|
||||
|
||||
Returns:
|
||||
Kinetica: An instance of Kinetica class
|
||||
"""
|
||||
if ids is None:
|
||||
ids = [str(uuid.uuid1()) for _ in texts]
|
||||
|
||||
if not metadatas:
|
||||
metadatas = [{} for _ in texts]
|
||||
|
||||
store = cls(
|
||||
config=config,
|
||||
collection_name=collection_name,
|
||||
embedding_function=embedding,
|
||||
# dimensions=dimensions,
|
||||
distance_strategy=distance_strategy,
|
||||
pre_delete_collection=pre_delete_collection,
|
||||
logger=logger,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
store.__post_init__(dimensions)
|
||||
|
||||
store.add_embeddings(
|
||||
texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs
|
||||
)
|
||||
|
||||
return store
|
||||
|
||||
def create_tables_if_not_exists(self) -> Any:
|
||||
"""Create the table to store the texts and embeddings"""
|
||||
|
||||
try:
|
||||
from gpudb import GPUdbTable
|
||||
except ImportError:
|
||||
raise ImportError(
|
||||
"Could not import Kinetica python API. "
|
||||
"Please install it with `pip install gpudb==7.2.0.1`."
|
||||
)
|
||||
return GPUdbTable(
|
||||
_type=self.table_schema,
|
||||
name=self.table_name,
|
||||
db=self._db,
|
||||
options={"is_replicated": "true"},
|
||||
)
|
||||
|
||||
def drop_tables(self) -> None:
|
||||
"""Delete the table"""
|
||||
self._db.clear_table(
|
||||
f"{self.table_name}", options={"no_error_if_not_exists": "true"}
|
||||
)
|
||||
|
||||
    def create_schema(self) -> None:
        """Create a new Kinetica schema named ``self.schema_name``."""
        self._db.create_schema(self.schema_name)
|
||||
|
||||
    def delete_schema(self) -> None:
        """Delete a Kinetica schema with cascade set to `true`

        This method will delete a schema with all tables in it.
        """
        self.logger.debug("Trying to delete collection")
        # Cascade drops every table inside the schema; missing schema is OK.
        self._db.drop_schema(
            self.schema_name, {"no_error_if_not_exists": "true", "cascade": "true"}
        )
|
||||
|
||||
def add_embeddings(
|
||||
self,
|
||||
texts: Iterable[str],
|
||||
embeddings: List[List[float]],
|
||||
metadatas: Optional[List[dict]] = None,
|
||||
ids: Optional[List[str]] = None,
|
||||
**kwargs: Any,
|
||||
) -> List[str]:
|
||||
"""Add embeddings to the vectorstore.
|
||||
|
||||
Args:
|
||||
texts: Iterable of strings to add to the vectorstore.
|
||||
embeddings: List of list of embedding vectors.
|
||||
metadatas: List of metadatas associated with the texts.
|
||||
ids: List of ids for the text embedding pairs
|
||||
kwargs: vectorstore specific parameters
|
||||
"""
|
||||
if ids is None:
|
||||
ids = [str(uuid.uuid1()) for _ in texts]
|
||||
|
||||
if not metadatas:
|
||||
metadatas = [{} for _ in texts]
|
||||
|
||||
records = []
|
||||
for text, embedding, metadata, id in zip(texts, embeddings, metadatas, ids):
|
||||
buf = struct.pack("%sf" % self.dimensions, *embedding)
|
||||
records.append([text, buf, json.dumps(metadata), id])
|
||||
|
||||
self.EmbeddingStore.insert_records(records)
|
||||
|
||||
return ids
|
||||
|
||||
def add_texts(
|
||||
self,
|
||||
texts: Iterable[str],
|
||||
metadatas: Optional[List[dict]] = None,
|
||||
ids: Optional[List[str]] = None,
|
||||
**kwargs: Any,
|
||||
) -> List[str]:
|
||||
"""Run more texts through the embeddings and add to the vectorstore.
|
||||
|
||||
Args:
|
||||
texts: Iterable of strings to add to the vectorstore.
|
||||
metadatas: Optional list of metadatas (JSON data) associated with the texts.
|
||||
ids: List of IDs (UUID) for the texts supplied; will be generated if None
|
||||
kwargs: vectorstore specific parameters
|
||||
|
||||
Returns:
|
||||
List of ids from adding the texts into the vectorstore.
|
||||
"""
|
||||
embeddings = self.embedding_function.embed_documents(list(texts))
|
||||
self.dimensions = len(embeddings[0])
|
||||
if not hasattr(self, "EmbeddingStore"):
|
||||
self.__post_init__(self.dimensions)
|
||||
return self.add_embeddings(
|
||||
texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs
|
||||
)
|
||||
|
||||
def similarity_search(
|
||||
self,
|
||||
query: str,
|
||||
k: int = 4,
|
||||
filter: Optional[dict] = None,
|
||||
**kwargs: Any,
|
||||
) -> List[Document]:
|
||||
"""Run similarity search with Kinetica with distance.
|
||||
|
||||
Args:
|
||||
query (str): Query text to search for.
|
||||
k (int): Number of results to return. Defaults to 4.
|
||||
filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
|
||||
|
||||
Returns:
|
||||
List of Documents most similar to the query.
|
||||
"""
|
||||
embedding = self.embedding_function.embed_query(text=query)
|
||||
return self.similarity_search_by_vector(
|
||||
embedding=embedding,
|
||||
k=k,
|
||||
filter=filter,
|
||||
)
|
||||
|
||||
def similarity_search_with_score(
|
||||
self,
|
||||
query: str,
|
||||
k: int = 4,
|
||||
filter: Optional[dict] = None,
|
||||
) -> List[Tuple[Document, float]]:
|
||||
"""Return docs most similar to query.
|
||||
|
||||
Args:
|
||||
query: Text to look up documents similar to.
|
||||
k: Number of Documents to return. Defaults to 4.
|
||||
filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
|
||||
|
||||
Returns:
|
||||
List of Documents most similar to the query and score for each
|
||||
"""
|
||||
embedding = self.embedding_function.embed_query(query)
|
||||
docs = self.similarity_search_with_score_by_vector(
|
||||
embedding=embedding, k=k, filter=filter
|
||||
)
|
||||
return docs
|
||||
|
||||
    def similarity_search_with_score_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[dict] = None,
    ) -> List[Tuple[Document, float]]:
        """Return the ``k`` nearest ``(Document, distance)`` pairs for a vector."""
        resp: Dict = self.__query_collection(embedding, k, filter)

        # ``records`` maps column name -> list of column values; transpose it
        # into one tuple per result row.
        records: OrderedDict = resp["records"]
        results = list(zip(*list(records.values())))

        return self._results_to_docs_and_scores(results)
|
||||
|
||||
def similarity_search_by_vector(
|
||||
self,
|
||||
embedding: List[float],
|
||||
k: int = 4,
|
||||
filter: Optional[dict] = None,
|
||||
**kwargs: Any,
|
||||
) -> List[Document]:
|
||||
"""Return docs most similar to embedding vector.
|
||||
|
||||
Args:
|
||||
embedding: Embedding to look up documents similar to.
|
||||
k: Number of Documents to return. Defaults to 4.
|
||||
filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
|
||||
|
||||
Returns:
|
||||
List of Documents most similar to the query vector.
|
||||
"""
|
||||
docs_and_scores = self.similarity_search_with_score_by_vector(
|
||||
embedding=embedding, k=k, filter=filter
|
||||
)
|
||||
return [doc for doc, _ in docs_and_scores]
|
||||
|
||||
def _results_to_docs_and_scores(self, results: Any) -> List[Tuple[Document, float]]:
|
||||
"""Return docs and scores from results."""
|
||||
docs = [
|
||||
(
|
||||
Document(
|
||||
page_content=result[0],
|
||||
metadata=json.loads(result[1]),
|
||||
),
|
||||
result[2] if self.embedding_function is not None else None,
|
||||
)
|
||||
for result in results
|
||||
]
|
||||
return docs
|
||||
|
||||
    def _select_relevance_score_fn(self) -> Callable[[float], float]:
        """
        The 'correct' relevance function
        may differ depending on a few things, including:
        - the distance / similarity metric used by the VectorStore
        - the scale of your embeddings (OpenAI's are unit normed. Many others are not!)
        - embedding dimensionality
        - etc.

        Raises:
            ValueError: when the distance strategy has no known normalization.
        """
        # An explicit override always wins.
        if self.override_relevance_score_fn is not None:
            return self.override_relevance_score_fn

        # Default strategy is to rely on distance strategy provided
        # in vectorstore constructor
        if self._distance_strategy == DistanceStrategy.COSINE:
            return self._cosine_relevance_score_fn
        elif self._distance_strategy == DistanceStrategy.EUCLIDEAN:
            return self._euclidean_relevance_score_fn
        elif self._distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
            return self._max_inner_product_relevance_score_fn
        else:
            raise ValueError(
                "No supported normalization function"
                f" for distance_strategy of {self._distance_strategy}."
                "Consider providing relevance_score_fn to Kinetica constructor."
            )
|
||||
|
||||
@property
|
||||
def distance_strategy(self) -> str:
|
||||
if self._distance_strategy == DistanceStrategy.EUCLIDEAN:
|
||||
return "l2_distance"
|
||||
elif self._distance_strategy == DistanceStrategy.COSINE:
|
||||
return "cosine_distance"
|
||||
elif self._distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
|
||||
return "dot_product"
|
||||
else:
|
||||
raise ValueError(
|
||||
f"Got unexpected value for distance: {self._distance_strategy}. "
|
||||
f"Should be one of {', '.join([ds.value for ds in DistanceStrategy])}."
|
||||
)
|
||||
|
||||
def __query_collection(
|
||||
self,
|
||||
embedding: List[float],
|
||||
k: int = 4,
|
||||
filter: Optional[Dict[str, str]] = None,
|
||||
) -> Dict:
|
||||
"""Query the collection."""
|
||||
# if filter is not None:
|
||||
# filter_clauses = []
|
||||
# for key, value in filter.items():
|
||||
# IN = "in"
|
||||
# if isinstance(value, dict) and IN in map(str.lower, value):
|
||||
# value_case_insensitive = {
|
||||
# k.lower(): v for k, v in value.items()
|
||||
# }
|
||||
# filter_by_metadata = self.EmbeddingStore.cmetadata[
|
||||
# key
|
||||
# ].astext.in_(value_case_insensitive[IN])
|
||||
# filter_clauses.append(filter_by_metadata)
|
||||
# else:
|
||||
# filter_by_metadata = self.EmbeddingStore.cmetadata[
|
||||
# key
|
||||
# ].astext == str(value)
|
||||
# filter_clauses.append(filter_by_metadata)
|
||||
|
||||
json_filter = json.dumps(filter) if filter is not None else None
|
||||
where_clause = (
|
||||
f" where '{json_filter}' = JSON(metadata) "
|
||||
if json_filter is not None
|
||||
else ""
|
||||
)
|
||||
|
||||
embedding_str = "[" + ",".join([str(x) for x in embedding]) + "]"
|
||||
|
||||
dist_strategy = self.distance_strategy
|
||||
|
||||
query_string = f"""
|
||||
SELECT text, metadata, {dist_strategy}(embedding, '{embedding_str}')
|
||||
as distance, embedding
|
||||
FROM {self.table_name}
|
||||
{where_clause}
|
||||
ORDER BY distance asc NULLS LAST
|
||||
LIMIT {k}
|
||||
"""
|
||||
|
||||
self.logger.debug(query_string)
|
||||
resp = self._db.execute_sql_and_decode(query_string)
|
||||
self.logger.debug(resp)
|
||||
return resp
|
||||
|
||||
    def max_marginal_relevance_search_with_score_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[Dict[str, str]] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs selected using the maximal marginal relevance with score
        to embedding vector.

        Maximal marginal relevance optimizes for similarity to query AND diversity
        among selected documents.

        Args:
            embedding: Embedding to look up documents similar to.
            k (int): Number of Documents to return. Defaults to 4.
            fetch_k (int): Number of Documents to fetch to pass to MMR algorithm.
                Defaults to 20.
            lambda_mult (float): Number between 0 and 1 that determines the degree
                of diversity among the results with 0 corresponding
                to maximum diversity and 1 to minimum diversity.
                Defaults to 0.5.
            filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.

        Returns:
            List[Tuple[Document, float]]: List of Documents selected by maximal marginal
                relevance to the query and score for each.
        """
        # Fetch fetch_k nearest rows, then let MMR pick k diverse ones.
        resp = self.__query_collection(embedding=embedding, k=fetch_k, filter=filter)
        records: OrderedDict = resp["records"]
        # Transpose column-oriented records into one tuple per row.
        results = list(zip(*list(records.values())))

        # Unpack each stored float32 byte blob back into a tuple of floats.
        # NOTE: the loop variable shadows the ``embedding`` parameter, but the
        # comprehension has its own scope, so the parameter is untouched.
        embedding_list = [
            struct.unpack("%sf" % self.dimensions, embedding)
            for embedding in records["embedding"]
        ]

        mmr_selected = maximal_marginal_relevance(
            np.array(embedding, dtype=np.float32),
            embedding_list,
            k=k,
            lambda_mult=lambda_mult,
        )

        candidates = self._results_to_docs_and_scores(results)

        # Keep only the rows MMR selected, preserving result order.
        return [r for i, r in enumerate(candidates) if i in mmr_selected]
|
||||
|
||||
def max_marginal_relevance_search(
|
||||
self,
|
||||
query: str,
|
||||
k: int = 4,
|
||||
fetch_k: int = 20,
|
||||
lambda_mult: float = 0.5,
|
||||
filter: Optional[Dict[str, str]] = None,
|
||||
**kwargs: Any,
|
||||
) -> List[Document]:
|
||||
"""Return docs selected using the maximal marginal relevance.
|
||||
|
||||
Maximal marginal relevance optimizes for similarity to query AND diversity
|
||||
among selected documents.
|
||||
|
||||
Args:
|
||||
query (str): Text to look up documents similar to.
|
||||
k (int): Number of Documents to return. Defaults to 4.
|
||||
fetch_k (int): Number of Documents to fetch to pass to MMR algorithm.
|
||||
Defaults to 20.
|
||||
lambda_mult (float): Number between 0 and 1 that determines the degree
|
||||
of diversity among the results with 0 corresponding
|
||||
to maximum diversity and 1 to minimum diversity.
|
||||
Defaults to 0.5.
|
||||
filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
|
||||
|
||||
Returns:
|
||||
List[Document]: List of Documents selected by maximal marginal relevance.
|
||||
"""
|
||||
embedding = self.embedding_function.embed_query(query)
|
||||
return self.max_marginal_relevance_search_by_vector(
|
||||
embedding,
|
||||
k=k,
|
||||
fetch_k=fetch_k,
|
||||
lambda_mult=lambda_mult,
|
||||
filter=filter,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
def max_marginal_relevance_search_with_score(
|
||||
self,
|
||||
query: str,
|
||||
k: int = 4,
|
||||
fetch_k: int = 20,
|
||||
lambda_mult: float = 0.5,
|
||||
filter: Optional[dict] = None,
|
||||
**kwargs: Any,
|
||||
) -> List[Tuple[Document, float]]:
|
||||
"""Return docs selected using the maximal marginal relevance with score.
|
||||
|
||||
Maximal marginal relevance optimizes for similarity to query AND diversity
|
||||
among selected documents.
|
||||
|
||||
Args:
|
||||
query (str): Text to look up documents similar to.
|
||||
k (int): Number of Documents to return. Defaults to 4.
|
||||
fetch_k (int): Number of Documents to fetch to pass to MMR algorithm.
|
||||
Defaults to 20.
|
||||
lambda_mult (float): Number between 0 and 1 that determines the degree
|
||||
of diversity among the results with 0 corresponding
|
||||
to maximum diversity and 1 to minimum diversity.
|
||||
Defaults to 0.5.
|
||||
filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
|
||||
|
||||
Returns:
|
||||
List[Tuple[Document, float]]: List of Documents selected by maximal marginal
|
||||
relevance to the query and score for each.
|
||||
"""
|
||||
embedding = self.embedding_function.embed_query(query)
|
||||
docs = self.max_marginal_relevance_search_with_score_by_vector(
|
||||
embedding=embedding,
|
||||
k=k,
|
||||
fetch_k=fetch_k,
|
||||
lambda_mult=lambda_mult,
|
||||
filter=filter,
|
||||
**kwargs,
|
||||
)
|
||||
return docs
|
||||
|
||||
def max_marginal_relevance_search_by_vector(
|
||||
self,
|
||||
embedding: List[float],
|
||||
k: int = 4,
|
||||
fetch_k: int = 20,
|
||||
lambda_mult: float = 0.5,
|
||||
filter: Optional[Dict[str, str]] = None,
|
||||
**kwargs: Any,
|
||||
) -> List[Document]:
|
||||
"""Return docs selected using the maximal marginal relevance
|
||||
to embedding vector.
|
||||
|
||||
Maximal marginal relevance optimizes for similarity to query AND diversity
|
||||
among selected documents.
|
||||
|
||||
Args:
|
||||
embedding (str): Text to look up documents similar to.
|
||||
k (int): Number of Documents to return. Defaults to 4.
|
||||
fetch_k (int): Number of Documents to fetch to pass to MMR algorithm.
|
||||
Defaults to 20.
|
||||
lambda_mult (float): Number between 0 and 1 that determines the degree
|
||||
of diversity among the results with 0 corresponding
|
||||
to maximum diversity and 1 to minimum diversity.
|
||||
Defaults to 0.5.
|
||||
filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
|
||||
|
||||
Returns:
|
||||
List[Document]: List of Documents selected by maximal marginal relevance.
|
||||
"""
|
||||
docs_and_scores = self.max_marginal_relevance_search_with_score_by_vector(
|
||||
embedding,
|
||||
k=k,
|
||||
fetch_k=fetch_k,
|
||||
lambda_mult=lambda_mult,
|
||||
filter=filter,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
return _results_to_docs(docs_and_scores)
|
||||
|
||||
async def amax_marginal_relevance_search_by_vector(
|
||||
self,
|
||||
embedding: List[float],
|
||||
k: int = 4,
|
||||
fetch_k: int = 20,
|
||||
lambda_mult: float = 0.5,
|
||||
filter: Optional[Dict[str, str]] = None,
|
||||
**kwargs: Any,
|
||||
) -> List[Document]:
|
||||
"""Return docs selected using the maximal marginal relevance."""
|
||||
|
||||
# This is a temporary workaround to make the similarity search
|
||||
# asynchronous. The proper solution is to make the similarity search
|
||||
# asynchronous in the vector store implementations.
|
||||
func = partial(
|
||||
self.max_marginal_relevance_search_by_vector,
|
||||
embedding,
|
||||
k=k,
|
||||
fetch_k=fetch_k,
|
||||
lambda_mult=lambda_mult,
|
||||
filter=filter,
|
||||
**kwargs,
|
||||
)
|
||||
return await asyncio.get_event_loop().run_in_executor(None, func)
|
||||
|
||||
@classmethod
|
||||
def from_texts(
|
||||
cls: Type[Kinetica],
|
||||
texts: List[str],
|
||||
embedding: Embeddings,
|
||||
metadatas: Optional[List[dict]] = None,
|
||||
config: KineticaSettings = KineticaSettings(),
|
||||
collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
|
||||
distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
|
||||
ids: Optional[List[str]] = None,
|
||||
pre_delete_collection: bool = False,
|
||||
**kwargs: Any,
|
||||
) -> Kinetica:
|
||||
"""Adds the texts passed in to the vector store and returns it
|
||||
|
||||
Args:
|
||||
cls (Type[Kinetica]): Kinetica class
|
||||
texts (List[str]): A list of texts for which the embeddings are generated
|
||||
embedding (Embeddings): List of embeddings
|
||||
metadatas (Optional[List[dict]], optional): List of dicts, JSON
|
||||
describing the texts/documents. Defaults to None.
|
||||
config (KineticaSettings): a `KineticaSettings` instance
|
||||
collection_name (str, optional): Kinetica schema name.
|
||||
Defaults to _LANGCHAIN_DEFAULT_COLLECTION_NAME.
|
||||
distance_strategy (DistanceStrategy, optional): Distance strategy
|
||||
e.g., l2, cosine etc.. Defaults to DEFAULT_DISTANCE_STRATEGY.
|
||||
ids (Optional[List[str]], optional): A list of UUIDs for each
|
||||
text/document. Defaults to None.
|
||||
pre_delete_collection (bool, optional): Indicates whether the Kinetica
|
||||
schema is to be deleted or not. Defaults to False.
|
||||
|
||||
Returns:
|
||||
Kinetica: a `Kinetica` instance
|
||||
"""
|
||||
|
||||
if len(texts) == 0:
|
||||
raise ValueError("texts is empty")
|
||||
|
||||
try:
|
||||
first_embedding = embedding.embed_documents(texts[0:1])
|
||||
except NotImplementedError:
|
||||
first_embedding = [embedding.embed_query(texts[0])]
|
||||
|
||||
dimensions = len(first_embedding[0])
|
||||
embeddings = embedding.embed_documents(list(texts))
|
||||
|
||||
kinetica_store = cls.__from(
|
||||
texts=texts,
|
||||
embeddings=embeddings,
|
||||
embedding=embedding,
|
||||
dimensions=dimensions,
|
||||
config=config,
|
||||
metadatas=metadatas,
|
||||
ids=ids,
|
||||
collection_name=collection_name,
|
||||
distance_strategy=distance_strategy,
|
||||
pre_delete_collection=pre_delete_collection,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
return kinetica_store
|
||||
|
||||
@classmethod
|
||||
def from_embeddings(
|
||||
cls: Type[Kinetica],
|
||||
text_embeddings: List[Tuple[str, List[float]]],
|
||||
embedding: Embeddings,
|
||||
metadatas: Optional[List[dict]] = None,
|
||||
config: KineticaSettings = KineticaSettings(),
|
||||
dimensions: int = Dimension.OPENAI,
|
||||
collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
|
||||
distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
|
||||
ids: Optional[List[str]] = None,
|
||||
pre_delete_collection: bool = False,
|
||||
**kwargs: Any,
|
||||
) -> Kinetica:
|
||||
"""Adds the embeddings passed in to the vector store and returns it
|
||||
|
||||
Args:
|
||||
cls (Type[Kinetica]): Kinetica class
|
||||
text_embeddings (List[Tuple[str, List[float]]]): A list of texts
|
||||
and the embeddings
|
||||
embedding (Embeddings): List of embeddings
|
||||
metadatas (Optional[List[dict]], optional): List of dicts, JSON describing
|
||||
the texts/documents. Defaults to None.
|
||||
config (KineticaSettings): a `KineticaSettings` instance
|
||||
dimensions (int, optional): Dimension for the vector data, if not passed a
|
||||
default will be used. Defaults to Dimension.OPENAI.
|
||||
collection_name (str, optional): Kinetica schema name.
|
||||
Defaults to _LANGCHAIN_DEFAULT_COLLECTION_NAME.
|
||||
distance_strategy (DistanceStrategy, optional): Distance strategy
|
||||
e.g., l2, cosine etc.. Defaults to DEFAULT_DISTANCE_STRATEGY.
|
||||
ids (Optional[List[str]], optional): A list of UUIDs for each text/document.
|
||||
Defaults to None.
|
||||
pre_delete_collection (bool, optional): Indicates whether the
|
||||
Kinetica schema is to be deleted or not. Defaults to False.
|
||||
|
||||
Returns:
|
||||
Kinetica: a `Kinetica` instance
|
||||
"""
|
||||
|
||||
texts = [t[0] for t in text_embeddings]
|
||||
embeddings = [t[1] for t in text_embeddings]
|
||||
dimensions = len(embeddings[0])
|
||||
|
||||
return cls.__from(
|
||||
texts=texts,
|
||||
embeddings=embeddings,
|
||||
embedding=embedding,
|
||||
dimensions=dimensions,
|
||||
config=config,
|
||||
metadatas=metadatas,
|
||||
ids=ids,
|
||||
collection_name=collection_name,
|
||||
distance_strategy=distance_strategy,
|
||||
pre_delete_collection=pre_delete_collection,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_documents(
|
||||
cls: Type[Kinetica],
|
||||
documents: List[Document],
|
||||
embedding: Embeddings,
|
||||
config: KineticaSettings = KineticaSettings(),
|
||||
metadatas: Optional[List[dict]] = None,
|
||||
collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
|
||||
distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
|
||||
ids: Optional[List[str]] = None,
|
||||
pre_delete_collection: bool = False,
|
||||
**kwargs: Any,
|
||||
) -> Kinetica:
|
||||
"""Adds the list of `Document` passed in to the vector store and returns it
|
||||
|
||||
Args:
|
||||
cls (Type[Kinetica]): Kinetica class
|
||||
texts (List[str]): A list of texts for which the embeddings are generated
|
||||
embedding (Embeddings): List of embeddings
|
||||
config (KineticaSettings): a `KineticaSettings` instance
|
||||
metadatas (Optional[List[dict]], optional): List of dicts, JSON describing
|
||||
the texts/documents. Defaults to None.
|
||||
collection_name (str, optional): Kinetica schema name.
|
||||
Defaults to _LANGCHAIN_DEFAULT_COLLECTION_NAME.
|
||||
distance_strategy (DistanceStrategy, optional): Distance strategy
|
||||
e.g., l2, cosine etc.. Defaults to DEFAULT_DISTANCE_STRATEGY.
|
||||
ids (Optional[List[str]], optional): A list of UUIDs for each text/document.
|
||||
Defaults to None.
|
||||
pre_delete_collection (bool, optional): Indicates whether the Kinetica
|
||||
schema is to be deleted or not. Defaults to False.
|
||||
|
||||
Returns:
|
||||
Kinetica: a `Kinetica` instance
|
||||
"""
|
||||
|
||||
texts = [d.page_content for d in documents]
|
||||
metadatas = [d.metadata for d in documents]
|
||||
|
||||
return cls.from_texts(
|
||||
texts=texts,
|
||||
embedding=embedding,
|
||||
metadatas=metadatas,
|
||||
config=config,
|
||||
collection_name=collection_name,
|
||||
distance_strategy=distance_strategy,
|
||||
ids=ids,
|
||||
pre_delete_collection=pre_delete_collection,
|
||||
**kwargs,
|
||||
)
|
Reference in New Issue
Block a user