mirror of
https://github.com/hwchase17/langchain.git
synced 2025-06-12 18:06:16 +00:00
<!-- Thank you for contributing to LangChain! Replace this comment with: - Description: a description of the change, - Issue: the issue # it fixes (if applicable), - Dependencies: any dependencies required for this change, - Tag maintainer: for a quicker response, tag the relevant maintainer (see below), - Twitter handle: we announce bigger features on Twitter. If your PR gets announced and you'd like a mention, we'll gladly shout you out! If you're adding a new integration, please include: 1. a test for the integration, preferably unit tests that do not rely on network access, 2. an example notebook showing its use. Maintainer responsibilities: - General / Misc / if you don't know who to tag: @baskaryan - DataLoaders / VectorStores / Retrievers: @rlancemartin, @eyurtsev - Models / Prompts: @hwchase17, @baskaryan - Memory: @hwchase17 - Agents / Tools / Toolkits: @hinthornw - Tracing / Callbacks: @agola11 - Async: @agola11 If no one reviews your PR within a few days, feel free to @-mention the same people again. See contribution guidelines for more information on how to write/run tests, lint, etc: https://github.com/hwchase17/langchain/blob/master/.github/CONTRIBUTING.md --> @hinthornw --------- Co-authored-by: Harrison Chase <hw.chase.17@gmail.com>
171 lines
6.5 KiB
Python
171 lines
6.5 KiB
Python
"""Lightweight wrapper around requests library, with async support."""
|
|
from contextlib import asynccontextmanager
|
|
from typing import Any, AsyncGenerator, Dict, Optional
|
|
|
|
import aiohttp
|
|
import requests
|
|
from pydantic import BaseModel, Extra
|
|
|
|
|
|
class Requests(BaseModel):
|
|
"""Wrapper around requests to handle auth and async.
|
|
|
|
The main purpose of this wrapper is to handle authentication (by saving
|
|
headers) and enable easy async methods on the same base object.
|
|
"""
|
|
|
|
headers: Optional[Dict[str, str]] = None
|
|
aiosession: Optional[aiohttp.ClientSession] = None
|
|
|
|
class Config:
|
|
"""Configuration for this pydantic object."""
|
|
|
|
extra = Extra.forbid
|
|
arbitrary_types_allowed = True
|
|
|
|
def get(self, url: str, **kwargs: Any) -> requests.Response:
|
|
"""GET the URL and return the text."""
|
|
return requests.get(url, headers=self.headers, **kwargs)
|
|
|
|
def post(self, url: str, data: Dict[str, Any], **kwargs: Any) -> requests.Response:
|
|
"""POST to the URL and return the text."""
|
|
return requests.post(url, json=data, headers=self.headers, **kwargs)
|
|
|
|
def patch(self, url: str, data: Dict[str, Any], **kwargs: Any) -> requests.Response:
|
|
"""PATCH the URL and return the text."""
|
|
return requests.patch(url, json=data, headers=self.headers, **kwargs)
|
|
|
|
def put(self, url: str, data: Dict[str, Any], **kwargs: Any) -> requests.Response:
|
|
"""PUT the URL and return the text."""
|
|
return requests.put(url, json=data, headers=self.headers, **kwargs)
|
|
|
|
def delete(self, url: str, **kwargs: Any) -> requests.Response:
|
|
"""DELETE the URL and return the text."""
|
|
return requests.delete(url, headers=self.headers, **kwargs)
|
|
|
|
@asynccontextmanager
|
|
async def _arequest(
|
|
self, method: str, url: str, **kwargs: Any
|
|
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
|
|
"""Make an async request."""
|
|
if not self.aiosession:
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.request(
|
|
method, url, headers=self.headers, **kwargs
|
|
) as response:
|
|
yield response
|
|
else:
|
|
async with self.aiosession.request(
|
|
method, url, headers=self.headers, **kwargs
|
|
) as response:
|
|
yield response
|
|
|
|
@asynccontextmanager
|
|
async def aget(
|
|
self, url: str, **kwargs: Any
|
|
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
|
|
"""GET the URL and return the text asynchronously."""
|
|
async with self._arequest("GET", url, **kwargs) as response:
|
|
yield response
|
|
|
|
@asynccontextmanager
|
|
async def apost(
|
|
self, url: str, data: Dict[str, Any], **kwargs: Any
|
|
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
|
|
"""POST to the URL and return the text asynchronously."""
|
|
async with self._arequest("POST", url, json=data, **kwargs) as response:
|
|
yield response
|
|
|
|
@asynccontextmanager
|
|
async def apatch(
|
|
self, url: str, data: Dict[str, Any], **kwargs: Any
|
|
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
|
|
"""PATCH the URL and return the text asynchronously."""
|
|
async with self._arequest("PATCH", url, json=data, **kwargs) as response:
|
|
yield response
|
|
|
|
@asynccontextmanager
|
|
async def aput(
|
|
self, url: str, data: Dict[str, Any], **kwargs: Any
|
|
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
|
|
"""PUT the URL and return the text asynchronously."""
|
|
async with self._arequest("PUT", url, json=data, **kwargs) as response:
|
|
yield response
|
|
|
|
@asynccontextmanager
|
|
async def adelete(
|
|
self, url: str, **kwargs: Any
|
|
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
|
|
"""DELETE the URL and return the text asynchronously."""
|
|
async with self._arequest("DELETE", url, **kwargs) as response:
|
|
yield response
|
|
|
|
|
|
class TextRequestsWrapper(BaseModel):
|
|
"""Lightweight wrapper around requests library.
|
|
|
|
The main purpose of this wrapper is to always return a text output.
|
|
"""
|
|
|
|
headers: Optional[Dict[str, str]] = None
|
|
aiosession: Optional[aiohttp.ClientSession] = None
|
|
|
|
class Config:
|
|
"""Configuration for this pydantic object."""
|
|
|
|
extra = Extra.forbid
|
|
arbitrary_types_allowed = True
|
|
|
|
@property
|
|
def requests(self) -> Requests:
|
|
return Requests(headers=self.headers, aiosession=self.aiosession)
|
|
|
|
def get(self, url: str, **kwargs: Any) -> str:
|
|
"""GET the URL and return the text."""
|
|
return self.requests.get(url, **kwargs).text
|
|
|
|
def post(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
|
|
"""POST to the URL and return the text."""
|
|
return self.requests.post(url, data, **kwargs).text
|
|
|
|
def patch(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
|
|
"""PATCH the URL and return the text."""
|
|
return self.requests.patch(url, data, **kwargs).text
|
|
|
|
def put(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
|
|
"""PUT the URL and return the text."""
|
|
return self.requests.put(url, data, **kwargs).text
|
|
|
|
def delete(self, url: str, **kwargs: Any) -> str:
|
|
"""DELETE the URL and return the text."""
|
|
return self.requests.delete(url, **kwargs).text
|
|
|
|
async def aget(self, url: str, **kwargs: Any) -> str:
|
|
"""GET the URL and return the text asynchronously."""
|
|
async with self.requests.aget(url, **kwargs) as response:
|
|
return await response.text()
|
|
|
|
async def apost(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
|
|
"""POST to the URL and return the text asynchronously."""
|
|
async with self.requests.apost(url, data, **kwargs) as response:
|
|
return await response.text()
|
|
|
|
async def apatch(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
|
|
"""PATCH the URL and return the text asynchronously."""
|
|
async with self.requests.apatch(url, data, **kwargs) as response:
|
|
return await response.text()
|
|
|
|
async def aput(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
|
|
"""PUT the URL and return the text asynchronously."""
|
|
async with self.requests.aput(url, data, **kwargs) as response:
|
|
return await response.text()
|
|
|
|
async def adelete(self, url: str, **kwargs: Any) -> str:
|
|
"""DELETE the URL and return the text asynchronously."""
|
|
async with self.requests.adelete(url, **kwargs) as response:
|
|
return await response.text()
|
|
|
|
|
|
# For backwards compatibility
|
|
RequestsWrapper = TextRequestsWrapper
|