mirror of
https://github.com/hwchase17/langchain.git
synced 2025-09-08 06:23:20 +00:00
community[minor]: add JsonRequestsWrapper tool (#15374)
**Description:** This new feature enhances the flexibility of pipeline integration, particularly when working with RESTful APIs. ``JsonRequestsWrapper`` allows for the decoding of JSON output, instead of the only option for text output. --------- Co-authored-by: Zhichao HAN <hanzhichao2000@hotmail.com>
This commit is contained in:
@@ -1,10 +1,11 @@
|
||||
"""Lightweight wrapper around requests library, with async support."""
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Any, AsyncGenerator, Dict, Optional
|
||||
from typing import Any, AsyncGenerator, Dict, Literal, Optional, Union
|
||||
|
||||
import aiohttp
|
||||
import requests
|
||||
from langchain_core.pydantic_v1 import BaseModel, Extra
|
||||
from requests import Response
|
||||
|
||||
|
||||
class Requests(BaseModel):
|
||||
@@ -108,15 +109,13 @@ class Requests(BaseModel):
|
||||
yield response
|
||||
|
||||
|
||||
class TextRequestsWrapper(BaseModel):
|
||||
"""Lightweight wrapper around requests library.
|
||||
|
||||
The main purpose of this wrapper is to always return a text output.
|
||||
"""
|
||||
class GenericRequestsWrapper(BaseModel):
|
||||
"""Lightweight wrapper around requests library."""
|
||||
|
||||
headers: Optional[Dict[str, str]] = None
|
||||
aiosession: Optional[aiohttp.ClientSession] = None
|
||||
auth: Optional[Any] = None
|
||||
response_content_type: Literal["text", "json"] = "text"
|
||||
|
||||
class Config:
|
||||
"""Configuration for this pydantic object."""
|
||||
@@ -130,50 +129,96 @@ class TextRequestsWrapper(BaseModel):
|
||||
headers=self.headers, aiosession=self.aiosession, auth=self.auth
|
||||
)
|
||||
|
||||
def get(self, url: str, **kwargs: Any) -> str:
|
||||
def _get_resp_content(self, response: Response) -> Union[str, Dict[str, Any]]:
|
||||
if self.response_content_type == "text":
|
||||
return response.text
|
||||
elif self.response_content_type == "json":
|
||||
return response.json()
|
||||
else:
|
||||
raise ValueError(f"Invalid return type: {self.response_content_type}")
|
||||
|
||||
def _aget_resp_content(
|
||||
self, response: aiohttp.ClientResponse
|
||||
) -> Union[str, Dict[str, Any]]:
|
||||
if self.response_content_type == "text":
|
||||
return response.text()
|
||||
elif self.response_content_type == "json":
|
||||
return response.json()
|
||||
else:
|
||||
raise ValueError(f"Invalid return type: {self.response_content_type}")
|
||||
|
||||
def get(self, url: str, **kwargs: Any) -> Union[str, Dict[str, Any]]:
|
||||
"""GET the URL and return the text."""
|
||||
return self.requests.get(url, **kwargs).text
|
||||
return self._get_resp_content(self.requests.get(url, **kwargs))
|
||||
|
||||
def post(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
|
||||
def post(
|
||||
self, url: str, data: Dict[str, Any], **kwargs: Any
|
||||
) -> Union[str, Dict[str, Any]]:
|
||||
"""POST to the URL and return the text."""
|
||||
return self.requests.post(url, data, **kwargs).text
|
||||
return self._get_resp_content(self.requests.post(url, data, **kwargs))
|
||||
|
||||
def patch(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
|
||||
def patch(
|
||||
self, url: str, data: Dict[str, Any], **kwargs: Any
|
||||
) -> Union[str, Dict[str, Any]]:
|
||||
"""PATCH the URL and return the text."""
|
||||
return self.requests.patch(url, data, **kwargs).text
|
||||
return self._get_resp_content(self.requests.patch(url, data, **kwargs))
|
||||
|
||||
def put(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
|
||||
def put(
|
||||
self, url: str, data: Dict[str, Any], **kwargs: Any
|
||||
) -> Union[str, Dict[str, Any]]:
|
||||
"""PUT the URL and return the text."""
|
||||
return self.requests.put(url, data, **kwargs).text
|
||||
return self._get_resp_content(self.requests.put(url, data, **kwargs))
|
||||
|
||||
def delete(self, url: str, **kwargs: Any) -> str:
|
||||
def delete(self, url: str, **kwargs: Any) -> Union[str, Dict[str, Any]]:
|
||||
"""DELETE the URL and return the text."""
|
||||
return self.requests.delete(url, **kwargs).text
|
||||
return self._get_resp_content(self.requests.delete(url, **kwargs))
|
||||
|
||||
async def aget(self, url: str, **kwargs: Any) -> str:
|
||||
async def aget(self, url: str, **kwargs: Any) -> Union[str, Dict[str, Any]]:
|
||||
"""GET the URL and return the text asynchronously."""
|
||||
async with self.requests.aget(url, **kwargs) as response:
|
||||
return await response.text()
|
||||
return await self._aget_resp_content(response)
|
||||
|
||||
async def apost(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
|
||||
async def apost(
|
||||
self, url: str, data: Dict[str, Any], **kwargs: Any
|
||||
) -> Union[str, Dict[str, Any]]:
|
||||
"""POST to the URL and return the text asynchronously."""
|
||||
async with self.requests.apost(url, data, **kwargs) as response:
|
||||
return await response.text()
|
||||
return await self._aget_resp_content(response)
|
||||
|
||||
async def apatch(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
|
||||
async def apatch(
|
||||
self, url: str, data: Dict[str, Any], **kwargs: Any
|
||||
) -> Union[str, Dict[str, Any]]:
|
||||
"""PATCH the URL and return the text asynchronously."""
|
||||
async with self.requests.apatch(url, data, **kwargs) as response:
|
||||
return await response.text()
|
||||
return await self._aget_resp_content(response)
|
||||
|
||||
async def aput(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
|
||||
async def aput(
|
||||
self, url: str, data: Dict[str, Any], **kwargs: Any
|
||||
) -> Union[str, Dict[str, Any]]:
|
||||
"""PUT the URL and return the text asynchronously."""
|
||||
async with self.requests.aput(url, data, **kwargs) as response:
|
||||
return await response.text()
|
||||
return await self._aget_resp_content(response)
|
||||
|
||||
async def adelete(self, url: str, **kwargs: Any) -> str:
|
||||
async def adelete(self, url: str, **kwargs: Any) -> Union[str, Dict[str, Any]]:
|
||||
"""DELETE the URL and return the text asynchronously."""
|
||||
async with self.requests.adelete(url, **kwargs) as response:
|
||||
return await response.text()
|
||||
return await self._aget_resp_content(response)
|
||||
|
||||
|
||||
class JsonRequestsWrapper(GenericRequestsWrapper):
|
||||
"""Lightweight wrapper around requests library, with async support.
|
||||
|
||||
The main purpose of this wrapper is to always return a json output."""
|
||||
|
||||
response_content_type: Literal["text", "json"] = "json"
|
||||
|
||||
|
||||
class TextRequestsWrapper(GenericRequestsWrapper):
|
||||
"""Lightweight wrapper around requests library, with async support.
|
||||
|
||||
The main purpose of this wrapper is to always return a text output."""
|
||||
|
||||
response_content_type: Literal["text", "json"] = "text"
|
||||
|
||||
|
||||
# For backwards compatibility
|
||||
|
Reference in New Issue
Block a user