diff --git a/docs/docs/integrations/document_loaders/kinetica.ipynb b/docs/docs/integrations/document_loaders/kinetica.ipynb
new file mode 100644
index 00000000000..0176557308f
--- /dev/null
+++ b/docs/docs/integrations/document_loaders/kinetica.ipynb
@@ -0,0 +1,125 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Kinetica\n",
+    "\n",
+    "This notebook goes over how to load documents from Kinetica."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%pip install gpudb==7.2.0.1"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from langchain_community.document_loaders.kinetica_loader import KineticaLoader"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "## Loading Environment Variables\n",
+    "import os\n",
+    "\n",
+    "from dotenv import load_dotenv\n",
+    "from langchain_community.vectorstores import (\n",
+    "    KineticaSettings,\n",
+    ")\n",
+    "\n",
+    "load_dotenv()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Kinetica needs a connection to the database.\n",
+    "# This is how to set it up.\n",
+    "HOST = os.getenv(\"KINETICA_HOST\", \"http://127.0.0.1:9191\")\n",
+    "USERNAME = os.getenv(\"KINETICA_USERNAME\", \"\")\n",
+    "PASSWORD = os.getenv(\"KINETICA_PASSWORD\", \"\")\n",
+    "\n",
+    "\n",
+    "def create_config() -> KineticaSettings:\n",
+    "    return KineticaSettings(host=HOST, username=USERNAME, password=PASSWORD)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from langchain_community.document_loaders.kinetica_loader import KineticaLoader\n",
+    "\n",
+    "# The following `QUERY` is an example and will not run as-is; replace it\n",
+    "# with a valid `QUERY` that returns data, and make sure the `SCHEMA.TABLE`\n",
+    "# combination exists in Kinetica.\n",
+    "\n",
+    "QUERY = \"select text, survey_id from SCHEMA.TABLE limit 10\"\n",
+    "kinetica_loader = KineticaLoader(\n",
+    "    QUERY,\n",
+    "    HOST,\n",
+    "    USERNAME,\n",
+    "    PASSWORD,\n",
+    ")\n",
+    "kinetica_documents = kinetica_loader.load()\n",
+    "print(kinetica_documents)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from langchain_community.document_loaders.kinetica_loader import KineticaLoader\n",
+    "\n",
+    "# As above, replace `QUERY` with a valid query that returns data against\n",
+    "# an existing `SCHEMA.TABLE` combination in Kinetica.\n",
+    "\n",
+    "QUERY = \"select text, survey_id as source from SCHEMA.TABLE limit 10\"\n",
+    "kinetica_loader = KineticaLoader(\n",
+    "    query=QUERY,\n",
+    "    host=HOST,\n",
+    "    username=USERNAME,\n",
+    "    password=PASSWORD,\n",
+    "    metadata_columns=[\"source\"],\n",
+    ")\n",
+    "kinetica_documents = kinetica_loader.load()\n",
+    "print(kinetica_documents)"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": ".venv",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "name": "python",
+   "version": "3.8.10"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
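A note on consumption: besides `load()`, the loader implemented later in this PR exposes `lazy_load()`, which yields one `Document` per result row instead of materializing the whole list. A minimal sketch, assuming a reachable Kinetica instance and an existing `SCHEMA.TABLE` (the query is a placeholder, as in the cells above):

```python
import os

from langchain_community.document_loaders.kinetica_loader import KineticaLoader

HOST = os.getenv("KINETICA_HOST", "http://127.0.0.1:9191")
USERNAME = os.getenv("KINETICA_USERNAME", "")
PASSWORD = os.getenv("KINETICA_PASSWORD", "")

# Placeholder query; replace SCHEMA.TABLE with a real table.
QUERY = "select text, survey_id from SCHEMA.TABLE limit 10"

loader = KineticaLoader(QUERY, HOST, USERNAME, PASSWORD)

# lazy_load() yields Documents one at a time, so large result sets can be
# processed without holding every row in memory at once.
for doc in loader.lazy_load():
    print(doc.page_content[:80])
```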
diff --git a/docs/docs/integrations/providers/kinetica.mdx b/docs/docs/integrations/providers/kinetica.mdx
index 317bdbf4543..29d039dfe74 100644
--- a/docs/docs/integrations/providers/kinetica.mdx
+++ b/docs/docs/integrations/providers/kinetica.mdx
@@ -26,3 +26,19 @@ See [Kinetica Vectorsore API](/docs/integrations/vectorstores/kinetica) for usag
 from langchain_community.vectorstores import Kinetica
 ```
 
+## Document Loader
+
+The Kinetica document loader can be used to load LangChain Documents from the
+Kinetica database.
+
+See [Kinetica Document Loader](/docs/integrations/document_loaders/kinetica) for usage.
+
+```python
+from langchain_community.document_loaders.kinetica_loader import KineticaLoader
+```
+
+## Retriever
+
+The Kinetica Retriever can return documents given an unstructured query.
+
+See [Kinetica VectorStore based Retriever](/docs/integrations/retrievers/kinetica) for usage.
diff --git a/docs/docs/integrations/retrievers/kinetica.ipynb b/docs/docs/integrations/retrievers/kinetica.ipynb
new file mode 100644
index 00000000000..63f2fd16d5a
--- /dev/null
+++ b/docs/docs/integrations/retrievers/kinetica.ipynb
@@ -0,0 +1,171 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Kinetica VectorStore based Retriever\n",
+    "\n",
+    ">[Kinetica](https://www.kinetica.com/) is a database with integrated support for vector similarity search.\n",
+    "\n",
+    "It supports:\n",
+    "- exact and approximate nearest neighbor search\n",
+    "- L2 distance, inner product, and cosine distance\n",
+    "\n",
+    "This notebook shows how to use a retriever based on the Kinetica vector store (`Kinetica`)."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Please ensure that this connector is installed in your working environment.\n",
+    "%pip install gpudb==7.2.0.1"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "We want to use `OpenAIEmbeddings`, so we have to get the OpenAI API key."
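The two integrations registered on the provider page above compose naturally: rows pulled with `KineticaLoader` can be indexed straight into the `Kinetica` vector store. A rough sketch, assuming a running Kinetica instance and an OpenAI key; the query and collection name are placeholders:

```python
import os

from langchain_community.document_loaders.kinetica_loader import KineticaLoader
from langchain_community.vectorstores import Kinetica, KineticaSettings
from langchain_openai import OpenAIEmbeddings

HOST = os.getenv("KINETICA_HOST", "http://127.0.0.1:9191")
USERNAME = os.getenv("KINETICA_USERNAME", "")
PASSWORD = os.getenv("KINETICA_PASSWORD", "")

# Load rows from Kinetica; `source` lands in each Document's metadata.
docs = KineticaLoader(
    query="select text, survey_id as source from SCHEMA.TABLE limit 10",
    host=HOST,
    username=USERNAME,
    password=PASSWORD,
    metadata_columns=["source"],
).load()

# Index the loaded Documents back into a Kinetica vector store collection.
db = Kinetica.from_documents(
    documents=docs,
    embedding=OpenAIEmbeddings(),
    collection_name="loader_roundtrip_demo",  # hypothetical collection name
    config=KineticaSettings(host=HOST, username=USERNAME, password=PASSWORD),
)
```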
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import getpass\n",
+    "import os\n",
+    "\n",
+    "os.environ[\"OPENAI_API_KEY\"] = getpass.getpass(\"OpenAI API Key:\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "## Loading Environment Variables\n",
+    "from dotenv import load_dotenv\n",
+    "\n",
+    "load_dotenv()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 5,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from langchain.docstore.document import Document\n",
+    "from langchain.text_splitter import CharacterTextSplitter\n",
+    "from langchain_community.document_loaders import TextLoader\n",
+    "from langchain_community.vectorstores import (\n",
+    "    Kinetica,\n",
+    "    KineticaSettings,\n",
+    ")\n",
+    "from langchain_openai import OpenAIEmbeddings"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 6,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Kinetica needs a connection to the database.\n",
+    "# This is how to set it up.\n",
+    "HOST = os.getenv(\"KINETICA_HOST\", \"http://127.0.0.1:9191\")\n",
+    "USERNAME = os.getenv(\"KINETICA_USERNAME\", \"\")\n",
+    "PASSWORD = os.getenv(\"KINETICA_PASSWORD\", \"\")\n",
+    "OPENAI_API_KEY = os.getenv(\"OPENAI_API_KEY\", \"\")\n",
+    "\n",
+    "\n",
+    "def create_config() -> KineticaSettings:\n",
+    "    return KineticaSettings(host=HOST, username=USERNAME, password=PASSWORD)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Create Retriever from vector store"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "loader = TextLoader(\"../../modules/state_of_the_union.txt\")\n",
+    "documents = loader.load()\n",
+    "text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)\n",
+    "docs = text_splitter.split_documents(documents)\n",
+    "\n",
+    "embeddings = OpenAIEmbeddings()\n",
+    "\n",
+    "# The Kinetica module will try to create a table with the name of the collection.\n",
+    "# So, make sure that the collection name is unique and that the user has permission to create a table.\n",
+    "\n",
+    "COLLECTION_NAME = \"state_of_the_union_test\"\n",
+    "connection = create_config()\n",
+    "\n",
+    "db = Kinetica.from_documents(\n",
+    "    embedding=embeddings,\n",
+    "    documents=docs,\n",
+    "    collection_name=COLLECTION_NAME,\n",
+    "    config=connection,\n",
+    ")\n",
+    "\n",
+    "# Create a retriever from the vector store.\n",
+    "retriever = db.as_retriever(search_kwargs={\"k\": 2})"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Search with retriever"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "result = retriever.get_relevant_documents(\n",
+    "    \"What did the president say about Ketanji Brown Jackson\"\n",
+    ")\n",
+    "print(result[0].page_content)"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": ".venv",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.8.10"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
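`as_retriever` here is the generic `VectorStore` implementation, so its other search modes should apply as well. A sketch, assuming `db` from the cells above and that the Kinetica store supports MMR search like most LangChain vector stores (worth verifying before relying on it):

```python
# Maximal-marginal-relevance search: fetch_k candidates are retrieved by
# similarity, then k are chosen to balance relevance against diversity.
mmr_retriever = db.as_retriever(
    search_type="mmr",
    search_kwargs={"k": 2, "fetch_k": 10},
)

# Retrievers are Runnables, so `invoke` works alongside
# `get_relevant_documents`.
result = mmr_retriever.invoke(
    "What did the president say about Ketanji Brown Jackson"
)
print(result[0].page_content)
```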
diff --git a/libs/community/langchain_community/document_loaders/__init__.py b/libs/community/langchain_community/document_loaders/__init__.py
index fe52b4ff3bf..0f0d511000f 100644
--- a/libs/community/langchain_community/document_loaders/__init__.py
+++ b/libs/community/langchain_community/document_loaders/__init__.py
@@ -781,6 +781,7 @@ _module_lookup = {
     "IuguLoader": "langchain_community.document_loaders.iugu",
     "JSONLoader": "langchain_community.document_loaders.json_loader",
     "JoplinLoader": "langchain_community.document_loaders.joplin",
+    "KineticaLoader": "langchain_community.document_loaders.kinetica_loader",
     "LakeFSLoader": "langchain_community.document_loaders.lakefs",
     "LarkSuiteDocLoader": "langchain_community.document_loaders.larksuite",
     "LLMSherpaFileLoader": "langchain_community.document_loaders.llmsherpa",
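The `_module_lookup` entry above feeds `langchain_community`'s lazy import machinery: the loader's module is only imported when the attribute is first accessed, keeping `import langchain_community.document_loaders` cheap. A simplified sketch of the pattern (not the exact implementation):

```python
import importlib
from typing import Any

_module_lookup = {
    "KineticaLoader": "langchain_community.document_loaders.kinetica_loader",
    # ... hundreds of other loader entries ...
}


def __getattr__(name: str) -> Any:
    """Import the backing module only when `name` is first requested."""
    if name in _module_lookup:
        module = importlib.import_module(_module_lookup[name])
        return getattr(module, name)
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
```

With this in place, `from langchain_community.document_loaders import KineticaLoader` resolves through the table without importing every other loader module.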
diff --git a/libs/community/langchain_community/document_loaders/kinetica_loader.py b/libs/community/langchain_community/document_loaders/kinetica_loader.py
new file mode 100644
index 00000000000..d5cb1296e08
--- /dev/null
+++ b/libs/community/langchain_community/document_loaders/kinetica_loader.py
@@ -0,0 +1,103 @@
+from __future__ import annotations
+
+from typing import Any, Dict, Iterator, List, Optional, Tuple
+
+from langchain_core.documents import Document
+
+from langchain_community.document_loaders.base import BaseLoader
+
+
+class KineticaLoader(BaseLoader):
+    """Load from the `Kinetica` API.
+
+    Each document represents one row of the result. The `page_content_columns`
+    are written into the `page_content` of the document. The `metadata_columns`
+    are written into the `metadata` of the document. By default, all columns
+    are written into the `page_content` and none into the `metadata`.
+    """
+
+    def __init__(
+        self,
+        query: str,
+        host: str,
+        username: str,
+        password: str,
+        parameters: Optional[Dict[str, Any]] = None,
+        page_content_columns: Optional[List[str]] = None,
+        metadata_columns: Optional[List[str]] = None,
+    ):
+        """Initialize Kinetica document loader.
+
+        Args:
+            query: The query to run in Kinetica.
+            host: The URL of the Kinetica instance.
+            username: The username used to connect.
+            password: The password used to connect.
+            parameters: Optional. Parameters to pass to the query.
+            page_content_columns: Optional. Columns written to Document `page_content`.
+            metadata_columns: Optional. Columns written to Document `metadata`.
+        """
+        self.query = query
+        self.host = host
+        self.username = username
+        self.password = password
+        self.parameters = parameters
+        self.page_content_columns = page_content_columns
+        self.metadata_columns = (
+            metadata_columns if metadata_columns is not None else []
+        )
+
+    def _execute_query(self) -> List[Dict[str, Any]]:
+        try:
+            from gpudb import GPUdb, GPUdbSqlIterator
+        except ImportError:
+            raise ImportError(
+                "Could not import Kinetica python API. "
+                "Please install it with `pip install gpudb==7.2.0.1`."
+            )
+
+        try:
+            options = GPUdb.Options()
+            options.username = self.username
+            options.password = self.password
+
+            conn = GPUdb(host=self.host, options=options)
+
+            with GPUdbSqlIterator(conn, self.query) as records:
+                column_names = records.type_map.keys()
+                query_result = [dict(zip(column_names, record)) for record in records]
+
+        except Exception as e:
+            print(f"An error occurred: {e}")  # noqa: T201
+            query_result = []
+
+        return query_result
+
+    def _get_columns(
+        self, query_result: List[Dict[str, Any]]
+    ) -> Tuple[List[str], List[str]]:
+        page_content_columns = self.page_content_columns
+        metadata_columns = self.metadata_columns if self.metadata_columns else []
+        # Default: with no explicit page_content_columns, every column of the
+        # result is written into `page_content`.
+        if page_content_columns is None and query_result:
+            page_content_columns = list(query_result[0].keys())
+        return page_content_columns or [], metadata_columns
+
+    def lazy_load(self) -> Iterator[Document]:
+        query_result = self._execute_query()
+        if not query_result:
+            return
+        page_content_columns, metadata_columns = self._get_columns(query_result)
+        if "*" in page_content_columns:
+            page_content_columns = list(query_result[0].keys())
+        for row in query_result:
+            page_content = "\n".join(
+                f"{k}: {v}" for k, v in row.items() if k in page_content_columns
+            )
+            metadata = {k: v for k, v in row.items() if k in metadata_columns}
+            yield Document(page_content=page_content, metadata=metadata)
+
+    def load(self) -> List[Document]:
+        """Load data into Document objects."""
+        return list(self.lazy_load())
diff --git a/libs/community/tests/unit_tests/document_loaders/test_imports.py b/libs/community/tests/unit_tests/document_loaders/test_imports.py
index 28c68459e76..dc28a03e686 100644
--- a/libs/community/tests/unit_tests/document_loaders/test_imports.py
+++ b/libs/community/tests/unit_tests/document_loaders/test_imports.py
@@ -89,6 +89,7 @@ EXPECTED_ALL = [
     "IuguLoader",
     "JSONLoader",
    "JoplinLoader",
+    "KineticaLoader",
    "LLMSherpaFileLoader",
    "LarkSuiteDocLoader",
    "LakeFSLoader",
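The import test above only checks that the symbol is exported. The loader's row-to-`Document` mapping can also be exercised without a live database by stubbing `_execute_query`; a hypothetical pytest-style sketch, not part of this PR:

```python
from unittest.mock import patch

from langchain_community.document_loaders import KineticaLoader


def test_kinetica_rows_become_documents() -> None:
    rows = [{"text": "hello world", "survey_id": 7}]
    loader = KineticaLoader(
        query="select 1",  # never executed; _execute_query is stubbed below
        host="http://127.0.0.1:9191",
        username="",
        password="",
        metadata_columns=["survey_id"],
    )
    with patch.object(KineticaLoader, "_execute_query", return_value=rows):
        docs = loader.load()
    assert len(docs) == 1
    # All columns land in page_content by default; survey_id also lands in
    # metadata because it was listed in metadata_columns.
    assert "hello world" in docs[0].page_content
    assert docs[0].metadata == {"survey_id": 7}
```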