langchain/libs/community/langchain_community/retrievers/chatgpt_plugin_retriever.py
Erick Friis c2a3021bb0
multiple: pydantic 2 compatibility, v0.3 (#26443)
Signed-off-by: ChengZi <chen.zhang@zilliz.com>
Co-authored-by: Eugene Yurtsev <eyurtsev@gmail.com>
Co-authored-by: Bagatur <22008038+baskaryan@users.noreply.github.com>
Co-authored-by: Dan O'Donovan <dan.odonovan@gmail.com>
Co-authored-by: Tom Daniel Grande <tomdgrande@gmail.com>
Co-authored-by: Grande <Tom.Daniel.Grande@statsbygg.no>
Co-authored-by: Bagatur <baskaryan@gmail.com>
Co-authored-by: ccurme <chester.curme@gmail.com>
Co-authored-by: Harrison Chase <hw.chase.17@gmail.com>
Co-authored-by: Tomaz Bratanic <bratanic.tomaz@gmail.com>
Co-authored-by: ZhangShenao <15201440436@163.com>
Co-authored-by: Friso H. Kingma <fhkingma@gmail.com>
Co-authored-by: ChengZi <chen.zhang@zilliz.com>
Co-authored-by: Nuno Campos <nuno@langchain.dev>
Co-authored-by: Morgante Pell <morgantep@google.com>
2024-09-13 14:38:45 -07:00

90 lines
2.9 KiB
Python

from __future__ import annotations
from typing import List, Optional
import aiohttp
import requests
from langchain_core.callbacks import (
AsyncCallbackManagerForRetrieverRun,
CallbackManagerForRetrieverRun,
)
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever
from pydantic import ConfigDict
class ChatGPTPluginRetriever(BaseRetriever):
    """`ChatGPT plugin` retriever.

    Posts the query to the plugin's ``/query`` endpoint (authenticated with
    a bearer token) and converts every returned result into a ``Document``.
    """

    url: str
    """URL of the ChatGPT plugin."""
    bearer_token: str
    """Bearer token for the ChatGPT plugin."""
    top_k: int = 3
    """Number of documents to return."""
    filter: Optional[dict] = None
    """Filter to apply to the results."""
    aiosession: Optional[aiohttp.ClientSession] = None
    """Aiohttp session to use for requests."""

    model_config = ConfigDict(
        arbitrary_types_allowed=True,
    )

    def _get_relevant_documents(
        self, query: str, *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        """Query the plugin synchronously and return the matching documents.

        Args:
            query: Text to search for.
            run_manager: Callback manager for this retriever run (not used
                by this implementation).

        Returns:
            One ``Document`` per result returned by the plugin.
        """
        url, payload, headers = self._create_request(query)
        response = requests.post(url, json=payload, headers=headers)
        return self._parse_response(response.json())

    async def _aget_relevant_documents(
        self, query: str, *, run_manager: AsyncCallbackManagerForRetrieverRun
    ) -> List[Document]:
        """Query the plugin asynchronously and return the matching documents.

        Uses ``self.aiosession`` when one was supplied; otherwise a temporary
        ``aiohttp.ClientSession`` is opened for this single request and closed
        afterwards.

        Args:
            query: Text to search for.
            run_manager: Callback manager for this retriever run (not used
                by this implementation).

        Returns:
            One ``Document`` per result returned by the plugin.
        """
        url, payload, headers = self._create_request(query)

        if not self.aiosession:
            # No caller-provided session: create one scoped to this request.
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    url, headers=headers, json=payload
                ) as response:
                    body = await response.json()
        else:
            # Reuse the caller's session; it is left open for the caller.
            async with self.aiosession.post(
                url, headers=headers, json=payload
            ) as response:
                body = await response.json()

        return self._parse_response(body)

    @staticmethod
    def _parse_response(body: dict) -> List[Document]:
        """Convert a decoded ``/query`` response body into documents.

        Shared by the sync and async code paths so both interpret the
        response identically.

        Args:
            body: Decoded JSON body of the plugin response.

        Returns:
            One ``Document`` per entry in ``body["results"][0]["results"]``.

        Raises:
            KeyError: If ``"results"`` is missing from the body or a result
                entry has no ``"text"`` key.
            IndexError: If the top-level ``"results"`` list is empty.
        """
        # The request carries exactly one query, so only the first entry of
        # the top-level "results" list is read.
        results = body["results"][0]["results"]

        docs = []
        for result in results:
            content = result.pop("text")
            # When the result has no "metadata" key, the remaining fields of
            # the result itself (everything but "text") serve as metadata.
            metadata = result.pop("metadata", result)
            # Expose a truthy "source_id" under the "source" key instead.
            if metadata.get("source_id"):
                metadata["source"] = metadata.pop("source_id")
            docs.append(Document(page_content=content, metadata=metadata))
        return docs

    def _create_request(self, query: str) -> tuple[str, dict, dict]:
        """Build the URL, JSON payload and headers for a ``/query`` call.

        Args:
            query: Text to search for.

        Returns:
            A ``(url, payload, headers)`` tuple ready to pass to an HTTP POST.
        """
        # Strip trailing slashes so a configured URL such as
        # "https://host/" does not produce "https://host//query".
        base_url = self.url.rstrip("/")
        url = f"{base_url}/query"
        payload = {
            "queries": [
                {
                    "query": query,
                    "filter": self.filter,
                    "top_k": self.top_k,
                }
            ]
        }
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.bearer_token}",
        }
        return url, payload, headers