langchain/langchain/requests.py
Kei Kamikawa 186ca9d3e4
fixed aiohttp.client_exceptions.ClientConnectionError: Connection closed (#2718)
I fixed an issue where an error would always occur when making a request
using the `TextRequestsWrapper` with async API.

This was caused by the response escaping the scope of its `async with`
context: the connection is released when the context exits, so reading the
response body afterwards fails.

The correct usage is described in the [official
tutorial](https://docs.aiohttp.org/en/stable/client_quickstart.html#make-a-request):
`response.text()` must also be awaited inside the context scope.

<details>

<summary>Stacktrace</summary>

```
  File "/home/vscode/.cache/pypoetry/virtualenvs/codehex-workspace-xS3fZVNL-py3.11/lib/python3.11/site-packages/langchain/tools/base.py", line 116, in arun
    raise e
  File "/home/vscode/.cache/pypoetry/virtualenvs/codehex-workspace-xS3fZVNL-py3.11/lib/python3.11/site-packages/langchain/tools/base.py", line 110, in arun
    observation = await self._arun(tool_input)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/vscode/.cache/pypoetry/virtualenvs/codehex-workspace-xS3fZVNL-py3.11/lib/python3.11/site-packages/langchain/agents/tools.py", line 22, in _arun
    return await self.coroutine(tool_input)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/vscode/.cache/pypoetry/virtualenvs/codehex-workspace-xS3fZVNL-py3.11/lib/python3.11/site-packages/langchain/chains/base.py", line 234, in arun
    return (await self.acall(args[0]))[self.output_keys[0]]
            ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/vscode/.cache/pypoetry/virtualenvs/codehex-workspace-xS3fZVNL-py3.11/lib/python3.11/site-packages/langchain/chains/base.py", line 154, in acall
    raise e
  File "/home/vscode/.cache/pypoetry/virtualenvs/codehex-workspace-xS3fZVNL-py3.11/lib/python3.11/site-packages/langchain/chains/base.py", line 148, in acall
    outputs = await self._acall(inputs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/workspace/src/tools/example.py", line 153, in _acall
    api_response = await self.requests_wrapper.aget("http://example.com")
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/vscode/.cache/pypoetry/virtualenvs/codehex-workspace-xS3fZVNL-py3.11/lib/python3.11/site-packages/langchain/requests.py", line 130, in aget
    return await response.text()
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/vscode/.cache/pypoetry/virtualenvs/codehex-workspace-xS3fZVNL-py3.11/lib/python3.11/site-packages/aiohttp/client_reqrep.py", line 1081, in text
    await self.read()
  File "/home/vscode/.cache/pypoetry/virtualenvs/codehex-workspace-xS3fZVNL-py3.11/lib/python3.11/site-packages/aiohttp/client_reqrep.py", line 1037, in read
    self._body = await self.content.read()
                 ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/vscode/.cache/pypoetry/virtualenvs/codehex-workspace-xS3fZVNL-py3.11/lib/python3.11/site-packages/aiohttp/streams.py", line 349, in read
  raise self._exception
aiohttp.client_exceptions.ClientConnectionError: Connection closed
```

</details>
2023-04-11 10:52:55 -07:00

171 lines
6.5 KiB
Python

"""Lightweight wrapper around requests library, with async support."""
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator, Dict, Optional
import aiohttp
import requests
from pydantic import BaseModel, Extra
class Requests(BaseModel):
    """Wrapper around requests to handle auth and async.

    The main purpose of this wrapper is to handle authentication (by saving
    headers) and enable easy async methods on the same base object.

    The synchronous methods return a ``requests.Response``. The asynchronous
    methods are async context managers that yield an
    ``aiohttp.ClientResponse``; the response body must be read *inside* the
    ``async with`` block, before the underlying connection is released.
    """

    # Headers sent with every request (sync and async).
    headers: Optional[Dict[str, str]] = None
    # Optional caller-owned session used for async requests. When it is not
    # set, a temporary session is created for each async request.
    aiosession: Optional[aiohttp.ClientSession] = None

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid
        arbitrary_types_allowed = True

    def get(self, url: str, **kwargs: Any) -> requests.Response:
        """GET the URL and return the response."""
        return requests.get(url, headers=self.headers, **kwargs)

    def post(self, url: str, data: Dict[str, Any], **kwargs: Any) -> requests.Response:
        """POST ``data`` as JSON to the URL and return the response."""
        return requests.post(url, json=data, headers=self.headers, **kwargs)

    def patch(self, url: str, data: Dict[str, Any], **kwargs: Any) -> requests.Response:
        """PATCH the URL with ``data`` as JSON and return the response."""
        return requests.patch(url, json=data, headers=self.headers, **kwargs)

    def put(self, url: str, data: Dict[str, Any], **kwargs: Any) -> requests.Response:
        """PUT ``data`` as JSON to the URL and return the response."""
        return requests.put(url, json=data, headers=self.headers, **kwargs)

    def delete(self, url: str, **kwargs: Any) -> requests.Response:
        """DELETE the URL and return the response."""
        return requests.delete(url, headers=self.headers, **kwargs)

    @asynccontextmanager
    async def _arequest(
        self, method: str, url: str, **kwargs: Any
    ) -> AsyncGenerator[aiohttp.ClientResponse, None]:
        """Make an async request and yield the response.

        Args:
            method: HTTP method name, e.g. ``"GET"``.
            url: Target URL.
            **kwargs: Extra keyword arguments forwarded to the session's
                ``request`` call.

        Yields:
            The ``aiohttp.ClientResponse``; it is only usable while the
            context is open.
        """
        if not self.aiosession:
            # No shared session configured: open one just for this request.
            async with aiohttp.ClientSession() as session:
                async with session.request(
                    method, url, headers=self.headers, **kwargs
                ) as response:
                    yield response
        else:
            # Reuse the caller's session; it is left open on exit.
            async with self.aiosession.request(
                method, url, headers=self.headers, **kwargs
            ) as response:
                yield response

    @asynccontextmanager
    async def aget(
        self, url: str, **kwargs: Any
    ) -> AsyncGenerator[aiohttp.ClientResponse, None]:
        """GET the URL and yield the response asynchronously."""
        async with self._arequest("GET", url, **kwargs) as response:
            yield response

    @asynccontextmanager
    async def apost(
        self, url: str, data: Dict[str, Any], **kwargs: Any
    ) -> AsyncGenerator[aiohttp.ClientResponse, None]:
        """POST ``data`` as JSON to the URL and yield the response asynchronously."""
        # Send the payload as JSON, mirroring the synchronous ``post``.
        async with self._arequest("POST", url, json=data, **kwargs) as response:
            yield response

    @asynccontextmanager
    async def apatch(
        self, url: str, data: Dict[str, Any], **kwargs: Any
    ) -> AsyncGenerator[aiohttp.ClientResponse, None]:
        """PATCH the URL with ``data`` as JSON and yield the response asynchronously."""
        # Send the payload as JSON, mirroring the synchronous ``patch``.
        async with self._arequest("PATCH", url, json=data, **kwargs) as response:
            yield response

    @asynccontextmanager
    async def aput(
        self, url: str, data: Dict[str, Any], **kwargs: Any
    ) -> AsyncGenerator[aiohttp.ClientResponse, None]:
        """PUT ``data`` as JSON to the URL and yield the response asynchronously."""
        # Send the payload as JSON, mirroring the synchronous ``put``.
        async with self._arequest("PUT", url, json=data, **kwargs) as response:
            yield response

    @asynccontextmanager
    async def adelete(
        self, url: str, **kwargs: Any
    ) -> AsyncGenerator[aiohttp.ClientResponse, None]:
        """DELETE the URL and yield the response asynchronously."""
        async with self._arequest("DELETE", url, **kwargs) as response:
            yield response
class TextRequestsWrapper(BaseModel):
    """Lightweight wrapper around requests library.

    The main purpose of this wrapper is to always return a text output.
    """

    # Headers sent with every request (sync and async).
    headers: Optional[Dict[str, str]] = None
    # Optional caller-owned session passed through for async requests.
    aiosession: Optional[aiohttp.ClientSession] = None

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid
        arbitrary_types_allowed = True

    @property
    def requests(self) -> Requests:
        """Return a ``Requests`` object built from this wrapper's settings."""
        return Requests(headers=self.headers, aiosession=self.aiosession)

    def get(self, url: str, **kwargs: Any) -> str:
        """GET the URL and return the text."""
        return self.requests.get(url, **kwargs).text

    def post(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
        """POST to the URL and return the text."""
        return self.requests.post(url, data, **kwargs).text

    def patch(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
        """PATCH the URL and return the text."""
        return self.requests.patch(url, data, **kwargs).text

    def put(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
        """PUT the URL and return the text."""
        return self.requests.put(url, data, **kwargs).text

    def delete(self, url: str, **kwargs: Any) -> str:
        """DELETE the URL and return the text."""
        return self.requests.delete(url, **kwargs).text

    async def aget(self, url: str, **kwargs: Any) -> str:
        """GET the URL and return the text asynchronously."""
        # The body is read inside the context, while the response is still open.
        async with self.requests.aget(url, **kwargs) as response:
            return await response.text()

    async def apost(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
        """POST to the URL and return the text asynchronously."""
        # ``data`` is a required positional argument of ``Requests.apost``.
        async with self.requests.apost(url, data, **kwargs) as response:
            return await response.text()

    async def apatch(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
        """PATCH the URL and return the text asynchronously."""
        # ``data`` is a required positional argument of ``Requests.apatch``.
        async with self.requests.apatch(url, data, **kwargs) as response:
            return await response.text()

    async def aput(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
        """PUT the URL and return the text asynchronously."""
        # ``data`` is a required positional argument of ``Requests.aput``.
        async with self.requests.aput(url, data, **kwargs) as response:
            return await response.text()

    async def adelete(self, url: str, **kwargs: Any) -> str:
        """DELETE the URL and return the text asynchronously."""
        async with self.requests.adelete(url, **kwargs) as response:
            return await response.text()
# For backwards compatibility: RequestsWrapper is kept as an alias of
# TextRequestsWrapper.
RequestsWrapper = TextRequestsWrapper