community[minor]: VectorStore Infinispan. Adding TLS and authentication (#23522)

**Description**:
this PR enable VectorStore TLS and authentication (digest, basic) with
HTTP/2 for Infinispan server.
Based on httpx.

Added docker-compose facilities for testing
Added documentation

**Dependencies:**
requires `pip install httpx[http2]` if HTTP2 is needed

**Twitter handle:**
https://twitter.com/infinispan
This commit is contained in:
Vittorio Rigamonti
2024-10-09 16:51:39 +02:00
committed by GitHub
parent ff925d2ddc
commit 7da2efd9d3
7 changed files with 353 additions and 83 deletions

View File

@@ -5,9 +5,10 @@ from __future__ import annotations
import json
import logging
import uuid
from typing import Any, Iterable, List, Optional, Tuple, Type, cast
import warnings
from typing import Any, Iterable, List, Optional, Tuple, Type, Union, cast
import requests
from httpx import Response
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
@@ -49,7 +50,7 @@ class InfinispanVS(VectorStore):
embedding=RGBEmbeddings(),
output_fields: ["texture", "color"],
lambda_key: lambda text,meta: str(meta["_key"]),
lambda_content: lambda item: item["color"]})
lambda_content: lambda item: item["color"])
"""
def __init__(
@@ -58,13 +59,48 @@ class InfinispanVS(VectorStore):
ids: Optional[List[str]] = None,
**kwargs: Any,
):
"""
Parameters
----------
cache_name: str
Embeddings cache name. Default "vector"
entity_name: str
Protobuf entity name for the embeddings. Default "vector"
text_field: str
Protobuf field name for text. Default "text"
vector_field: str
Protobuf field name for vector. Default "vector"
lambda_content: lambda
Lambda returning the content part of an item. Default returns text_field
lambda_metadata: lambda
Lambda returning the metadata part of an item. Default returns items
fields excepts text_field, vector_field, _type
output_fields: List[str]
List of fields to be returned from item, if None return all fields.
Default None
kwargs: Any
Rest of arguments passed to Infinispan. See docs"""
self.ispn = Infinispan(**kwargs)
self._configuration = kwargs
self._cache_name = str(self._configuration.get("cache_name", "vector"))
self._entity_name = str(self._configuration.get("entity_name", "vector"))
self._embedding = embedding
self._textfield = self._configuration.get("textfield", "text")
self._vectorfield = self._configuration.get("vectorfield", "vector")
self._textfield = self._configuration.get("textfield", "")
if self._textfield == "":
self._textfield = self._configuration.get("text_field", "text")
else:
warnings.warn(
"`textfield` is deprecated. Please use `text_field` " "param.",
DeprecationWarning,
)
self._vectorfield = self._configuration.get("vectorfield", "")
if self._vectorfield == "":
self._vectorfield = self._configuration.get("vector_field", "vector")
else:
warnings.warn(
"`vectorfield` is deprecated. Please use `vector_field` " "param.",
DeprecationWarning,
)
self._to_content = self._configuration.get(
"lambda_content", lambda item: self._default_content(item)
)
@@ -121,7 +157,7 @@ repeated float %s = 1;
metadata_proto += "}\n"
return metadata_proto
def schema_create(self, proto: str) -> requests.Response:
def schema_create(self, proto: str) -> Response:
"""Deploy the schema for the vector db
Args:
proto(str): protobuf schema
@@ -130,14 +166,14 @@ repeated float %s = 1;
"""
return self.ispn.schema_post(self._entity_name + ".proto", proto)
def schema_delete(self) -> requests.Response:
def schema_delete(self) -> Response:
"""Delete the schema for the vector db
Returns:
An http Response containing the result of the operation
"""
return self.ispn.schema_delete(self._entity_name + ".proto")
def cache_create(self, config: str = "") -> requests.Response:
def cache_create(self, config: str = "") -> Response:
"""Create the cache for the vector db
Args:
config(str): configuration of the cache.
@@ -172,14 +208,14 @@ repeated float %s = 1;
)
return self.ispn.cache_post(self._cache_name, config)
def cache_delete(self) -> requests.Response:
def cache_delete(self) -> Response:
"""Delete the cache for the vector db
Returns:
An http Response containing the result of the operation
"""
return self.ispn.cache_delete(self._cache_name)
def cache_clear(self) -> requests.Response:
def cache_clear(self) -> Response:
"""Clear the cache for the vector db
Returns:
An http Response containing the result of the operation
@@ -193,14 +229,14 @@ repeated float %s = 1;
"""
return self.ispn.cache_exists(self._cache_name)
def cache_index_clear(self) -> requests.Response:
def cache_index_clear(self) -> Response:
"""Clear the index for the vector db
Returns:
An http Response containing the result of the operation
"""
return self.ispn.index_clear(self._cache_name)
def cache_index_reindex(self) -> requests.Response:
def cache_index_reindex(self) -> Response:
"""Rebuild the for the vector db
Returns:
An http Response containing the result of the operation
@@ -325,12 +361,16 @@ repeated float %s = 1;
def configure(self, metadata: dict, dimension: int) -> None:
schema = self.schema_builder(metadata, dimension)
output = self.schema_create(schema)
assert output.ok, "Unable to create schema. Already exists? "
assert (
output.status_code == self.ispn.Codes.OK
), "Unable to create schema. Already exists? "
"Consider using clear_old=True"
assert json.loads(output.text)["error"] is None
if not self.cache_exists():
output = self.cache_create()
assert output.ok, "Unable to create cache. Already exists? "
assert (
output.status_code == self.ispn.Codes.OK
), "Unable to create cache. Already exists? "
"Consider using clear_old=True"
# Ensure index is clean
self.cache_index_clear()
@@ -350,7 +390,24 @@ repeated float %s = 1;
auto_config: Optional[bool] = True,
**kwargs: Any,
) -> InfinispanVS:
"""Return VectorStore initialized from texts and embeddings."""
"""Return VectorStore initialized from texts and embeddings.
In addition to parameters described by the super method, this
implementation provides other configuration params if different
configuration from default is needed.
Parameters
----------
ids : List[str]
Additional list of keys associated to the embedding. If not
provided UUIDs will be generated
clear_old : bool
Whether old data must be deleted. Default True
auto_config: bool
Whether to do a complete server setup (caches,
protobuf definition...). Default True
kwargs: Any
Rest of arguments passed to InfinispanVS. See docs"""
infinispanvs = cls(embedding=embedding, ids=ids, **kwargs)
if auto_config and len(metadatas or []) > 0:
if clear_old:
@@ -381,20 +438,83 @@ class Infinispan:
https://github.com/rigazilla/infinispan-vector#run-infinispan
"""
def __init__(self, **kwargs: Any):
self._configuration = kwargs
self._schema = str(self._configuration.get("schema", "http"))
self._host = str(self._configuration.get("hosts", ["127.0.0.1:11222"])[0])
self._default_node = self._schema + "://" + self._host
self._cache_url = str(self._configuration.get("cache_url", "/rest/v2/caches"))
self._schema_url = str(self._configuration.get("cache_url", "/rest/v2/schemas"))
self._use_post_for_query = str(
self._configuration.get("use_post_for_query", True)
)
def __init__(
self,
schema: str = "http",
user: str = "",
password: str = "",
hosts: List[str] = ["127.0.0.1:11222"],
cache_url: str = "/rest/v2/caches",
schema_url: str = "/rest/v2/schemas",
use_post_for_query: bool = True,
http2: bool = True,
verify: bool = True,
**kwargs: Any,
):
"""
Parameters
----------
schema: str
Schema for HTTP request: "http" or "https". Default "http"
user, password: str
User and password if auth is required. Default None
hosts: List[str]
List of server addresses. Default ["127.0.0.1:11222"]
cache_url: str
URL endpoint for cache API. Default "/rest/v2/caches"
schema_url: str
URL endpoint for schema API. Default "/rest/v2/schemas"
use_post_for_query: bool
Whether POST method should be used for query. Default True
http2: bool
Whether HTTP/2 protocol should be used. `pip install "httpx[http2]"` is
needed for HTTP/2. Default True
verify: bool
Whether TLS certificate must be verified. Default True
"""
def req_query(
self, query: str, cache_name: str, local: bool = False
) -> requests.Response:
try:
import httpx
except ImportError:
raise ImportError(
"Could not import httpx python package. "
"Please install it with `pip install httpx`"
'or `pip install "httpx[http2]"` if you need HTTP/2.'
)
self.Codes = httpx.codes
self._configuration = kwargs
self._schema = schema
self._user = user
self._password = password
self._host = hosts[0]
self._default_node = self._schema + "://" + self._host
self._cache_url = cache_url
self._schema_url = schema_url
self._use_post_for_query = use_post_for_query
self._http2 = http2
if self._user and self._password:
if self._schema == "http":
auth: Union[Tuple[str, str], httpx.DigestAuth] = httpx.DigestAuth(
username=self._user, password=self._password
)
else:
auth = (self._user, self._password)
self._h2c = httpx.Client(
http2=self._http2,
http1=not self._http2,
auth=auth,
verify=verify,
)
else:
self._h2c = httpx.Client(
http2=self._http2,
http1=not self._http2,
verify=verify,
)
def req_query(self, query: str, cache_name: str, local: bool = False) -> Response:
"""Request a query
Args:
query(str): query requested
@@ -409,7 +529,7 @@ class Infinispan:
def _query_post(
self, query_str: str, cache_name: str, local: bool = False
) -> requests.Response:
) -> Response:
api_url = (
self._default_node
+ self._cache_url
@@ -420,9 +540,9 @@ class Infinispan:
)
data = {"query": query_str}
data_json = json.dumps(data)
response = requests.post(
response = self._h2c.post(
api_url,
data_json,
content=data_json,
headers={"Content-Type": "application/json"},
timeout=REST_TIMEOUT,
)
@@ -430,7 +550,7 @@ class Infinispan:
def _query_get(
self, query_str: str, cache_name: str, local: bool = False
) -> requests.Response:
) -> Response:
api_url = (
self._default_node
+ self._cache_url
@@ -441,10 +561,10 @@ class Infinispan:
+ "&local="
+ str(local)
)
response = requests.get(api_url, timeout=REST_TIMEOUT)
response = self._h2c.get(api_url, timeout=REST_TIMEOUT)
return response
def post(self, key: str, data: str, cache_name: str) -> requests.Response:
def post(self, key: str, data: str, cache_name: str) -> Response:
"""Post an entry
Args:
key(str): key of the entry
@@ -454,15 +574,15 @@ class Infinispan:
An http Response containing the result of the operation
"""
api_url = self._default_node + self._cache_url + "/" + cache_name + "/" + key
response = requests.post(
response = self._h2c.post(
api_url,
data,
content=data,
headers={"Content-Type": "application/json"},
timeout=REST_TIMEOUT,
)
return response
def put(self, key: str, data: str, cache_name: str) -> requests.Response:
def put(self, key: str, data: str, cache_name: str) -> Response:
"""Put an entry
Args:
key(str): key of the entry
@@ -472,15 +592,15 @@ class Infinispan:
An http Response containing the result of the operation
"""
api_url = self._default_node + self._cache_url + "/" + cache_name + "/" + key
response = requests.put(
response = self._h2c.put(
api_url,
data,
content=data,
headers={"Content-Type": "application/json"},
timeout=REST_TIMEOUT,
)
return response
def get(self, key: str, cache_name: str) -> requests.Response:
def get(self, key: str, cache_name: str) -> Response:
"""Get an entry
Args:
key(str): key of the entry
@@ -489,12 +609,12 @@ class Infinispan:
An http Response containing the entry or errors
"""
api_url = self._default_node + self._cache_url + "/" + cache_name + "/" + key
response = requests.get(
response = self._h2c.get(
api_url, headers={"Content-Type": "application/json"}, timeout=REST_TIMEOUT
)
return response
def schema_post(self, name: str, proto: str) -> requests.Response:
def schema_post(self, name: str, proto: str) -> Response:
"""Deploy a schema
Args:
name(str): name of the schema. Will be used as a key
@@ -503,10 +623,10 @@ class Infinispan:
An http Response containing the result of the operation
"""
api_url = self._default_node + self._schema_url + "/" + name
response = requests.post(api_url, proto, timeout=REST_TIMEOUT)
response = self._h2c.post(api_url, content=proto, timeout=REST_TIMEOUT)
return response
def cache_post(self, name: str, config: str) -> requests.Response:
def cache_post(self, name: str, config: str) -> Response:
"""Create a cache
Args:
name(str): name of the cache.
@@ -515,15 +635,15 @@ class Infinispan:
An http Response containing the result of the operation
"""
api_url = self._default_node + self._cache_url + "/" + name
response = requests.post(
response = self._h2c.post(
api_url,
config,
content=config,
headers={"Content-Type": "application/json"},
timeout=REST_TIMEOUT,
)
return response
def schema_delete(self, name: str) -> requests.Response:
def schema_delete(self, name: str) -> Response:
"""Delete a schema
Args:
name(str): name of the schema.
@@ -531,10 +651,10 @@ class Infinispan:
An http Response containing the result of the operation
"""
api_url = self._default_node + self._schema_url + "/" + name
response = requests.delete(api_url, timeout=REST_TIMEOUT)
response = self._h2c.delete(api_url, timeout=REST_TIMEOUT)
return response
def cache_delete(self, name: str) -> requests.Response:
def cache_delete(self, name: str) -> Response:
"""Delete a cache
Args:
name(str): name of the cache.
@@ -542,10 +662,10 @@ class Infinispan:
An http Response containing the result of the operation
"""
api_url = self._default_node + self._cache_url + "/" + name
response = requests.delete(api_url, timeout=REST_TIMEOUT)
response = self._h2c.delete(api_url, timeout=REST_TIMEOUT)
return response
def cache_clear(self, cache_name: str) -> requests.Response:
def cache_clear(self, cache_name: str) -> Response:
"""Clear a cache
Args:
cache_name(str): name of the cache.
@@ -555,7 +675,7 @@ class Infinispan:
api_url = (
self._default_node + self._cache_url + "/" + cache_name + "?action=clear"
)
response = requests.post(api_url, timeout=REST_TIMEOUT)
response = self._h2c.post(api_url, timeout=REST_TIMEOUT)
return response
def cache_exists(self, cache_name: str) -> bool:
@@ -570,18 +690,17 @@ class Infinispan:
)
return self.resource_exists(api_url)
@staticmethod
def resource_exists(api_url: str) -> bool:
def resource_exists(self, api_url: str) -> bool:
"""Check if a resource exists
Args:
api_url(str): url of the resource.
Returns:
true if resource exists
"""
response = requests.head(api_url, timeout=REST_TIMEOUT)
return response.ok
response = self._h2c.head(api_url, timeout=REST_TIMEOUT)
return response.status_code == self.Codes.OK
def index_clear(self, cache_name: str) -> requests.Response:
def index_clear(self, cache_name: str) -> Response:
"""Clear an index on a cache
Args:
cache_name(str): name of the cache.
@@ -595,9 +714,9 @@ class Infinispan:
+ cache_name
+ "/search/indexes?action=clear"
)
return requests.post(api_url, timeout=REST_TIMEOUT)
return self._h2c.post(api_url, timeout=REST_TIMEOUT)
def index_reindex(self, cache_name: str) -> requests.Response:
def index_reindex(self, cache_name: str) -> Response:
"""Rebuild index on a cache
Args:
cache_name(str): name of the cache.
@@ -611,4 +730,4 @@ class Infinispan:
+ cache_name
+ "/search/indexes?action=reindex"
)
return requests.post(api_url, timeout=REST_TIMEOUT)
return self._h2c.post(api_url, timeout=REST_TIMEOUT)