This PR proposes to create a rate limiter in the chat model directly,
and would replace: https://github.com/langchain-ai/langchain/pull/21992
It resolves most of the constraints that the Runnable rate limiter
introduced:
1. It's not annoying to apply the rate limiter to existing code; i.e.,
possible to roll out the change at the location where the model is
instantiated,
rather than at every location where the model is used! (Which is
necessary
if the model is used in different ways in a given application.)
2. batch rate limiting is enforced properly
3. the rate limiter works correctly with streaming
4. the rate limiter is aware of the cache
5. The rate limiter can take into account information about the inputs
into the
model (we can add optional inputs to it down-the road together with
outputs!)
The only downside is that information will not be properly reflected in
tracing
as we don't have any metadata evens about a rate limiter. So the total
time
spent on a model invocation will be:
* time spent waiting for the rate limiter
* time spend on the actual model request
## Example
```python
from langchain_core.rate_limiters import InMemoryRateLimiter
from langchain_groq import ChatGroq
groq = ChatGroq(rate_limiter=InMemoryRateLimiter(check_every_n_seconds=1))
groq.invoke('hello')
```
Thank you for contributing to LangChain!
- [ ] **PR title**: "package: description"
- Where "package" is whichever of langchain, community, core,
experimental, etc. is being modified. Use "docs: ..." for purely docs
changes, "templates: ..." for template changes, "infra: ..." for CI
changes.
- Example: "community: add foobar LLM"
- [ ] **PR message**: ***Delete this entire checklist*** and replace
with
- **Description:** a description of the change
- **Issue:** the issue # it fixes, if applicable
- **Dependencies:** any dependencies required for this change
- **Twitter handle:** if your PR gets announced, and you'd like a
mention, we'll gladly shout you out!
- [ ] **Add tests and docs**: If you're adding a new integration, please
include
1. a test for the integration, preferably unit tests that do not rely on
network access,
2. an example notebook showing its use. It lives in
`docs/docs/integrations` directory.
- [ ] **Lint and test**: Run `make format`, `make lint` and `make test`
from the root of the package(s) you've modified. See contribution
guidelines for more: https://python.langchain.com/docs/contributing/
Additional guidelines:
- Make sure optional dependencies are imported within a function.
- Please do not add dependencies to pyproject.toml files (even optional
ones) unless they are required for unit tests.
- Most PRs should not touch more than one package.
- Changes should be backwards compatible.
- If you are adding something to community, do not re-import it in
langchain.
If no one reviews your PR within a few days, please @-mention one of
baskaryan, efriis, eyurtsev, ccurme, vbarda, hwchase17.
This PR introduces the following Runnables:
1. BaseRateLimiter: an abstraction for specifying a time based rate
limiter as a Runnable
2. InMemoryRateLimiter: Provides an in-memory implementation of a rate
limiter
## Example
```python
from langchain_core.runnables import InMemoryRateLimiter, RunnableLambda
from datetime import datetime
foo = InMemoryRateLimiter(requests_per_second=0.5)
def meow(x):
print(datetime.now().strftime("%H:%M:%S.%f"))
return x
chain = foo | meow
for _ in range(10):
print(chain.invoke('hello'))
```
Produces:
```
17:12:07.530151
hello
17:12:09.537932
hello
17:12:11.548375
hello
17:12:13.558383
hello
17:12:15.568348
hello
17:12:17.578171
hello
17:12:19.587508
hello
17:12:21.597877
hello
17:12:23.607707
hello
17:12:25.617978
hello
```

## Interface
The rate limiter uses the following interface for acquiring a token:
```python
class BaseRateLimiter(Runnable[Input, Output], abc.ABC):
@abc.abstractmethod
def acquire(self, *, blocking: bool = True) -> bool:
"""Attempt to acquire the necessary tokens for the rate limiter.```
```
The flag `blocking` has been added to the abstraction to allow
supporting streaming (which is easier if blocking=False).
## Limitations
- The rate limiter is not designed to work across different processes.
It is an in-memory rate limiter, but it is thread safe.
- The rate limiter only supports time-based rate limiting. It does not
take into account the size of the request or any other factors.
- The current implementation does not handle streaming inputs well and
will consume all inputs even if the rate limit has been reached. Better
support for streaming inputs will be added in the future.
- When the rate limiter is combined with another runnable via a
RunnableSequence, usage of .batch() or .abatch() will only respect the
average rate limit. There will be bursty behavior as .batch() and
.abatch() wait for each step to complete before starting the next step.
One way to mitigate this is to use batch_as_completed() or
abatch_as_completed().
## Bursty behavior in `batch` and `abatch`
When the rate limiter is combined with another runnable via a
RunnableSequence, usage of .batch() or .abatch() will only respect the
average rate limit. There will be bursty behavior as .batch() and
.abatch() wait for each step to complete before starting the next step.
This becomes a problem if users are using `batch` and `abatch` with many
inputs (e.g., 100). In this case, there will be a burst of 100 inputs
into the batch of the rate limited runnable.
1. Using a RunnableBinding
The API would look like:
```python
from langchain_core.runnables import InMemoryRateLimiter, RunnableLambda
rate_limiter = InMemoryRateLimiter(requests_per_second=0.5)
def meow(x):
return x
rate_limited_meow = RunnableLambda(meow).with_rate_limiter(rate_limiter)
```
2. Another option is to add some init option to RunnableSequence that
changes `.batch()` to be depth first (e.g., by delegating to
`batch_as_completed`)
```python
RunnableSequence(first=rate_limiter, last=model, how='batch-depth-first')
```
Pros: Does not require Runnable Binding
Cons: Feels over-complicated
Feedback that `RunnableWithMessageHistory` is unwieldy compared to
ConversationChain and similar legacy abstractions is common.
Legacy chains using memory typically had no explicit notion of threads
or separate sessions. To use `RunnableWithMessageHistory`, users are
forced to introduce this concept into their code. This possibly felt
like unnecessary boilerplate.
Here we enable `RunnableWithMessageHistory` to run without a config if
the `get_session_history` callable has no arguments. This enables
minimal implementations like the following:
```python
from langchain_core.chat_history import InMemoryChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-3.5-turbo-0125")
memory = InMemoryChatMessageHistory()
chain = RunnableWithMessageHistory(llm, lambda: memory)
chain.invoke("Hi I'm Bob") # Hello Bob!
chain.invoke("What is my name?") # Your name is Bob.
```
Before, if an exception was raised in the outer `try` block in
`Runnable._atransform_stream_with_config` before `iterator_` is
assigned, the corresponding `finally` block would blow up with an
`UnboundLocalError`:
```txt
UnboundLocalError: cannot access local variable 'iterator_' where it is not associated with a value
```
By assigning an initial value to `iterator_` before entering the `try`
block, this commit ensures that the `finally` can run, and not bury the
"true" exception under a "During handling of the above exception [...]"
traceback.
Thanks for your consideration!
Thank you for contributing to LangChain!
- [ ] **PR title**: "package: description"
- Where "package" is whichever of langchain, community, core,
experimental, etc. is being modified. Use "docs: ..." for purely docs
changes, "templates: ..." for template changes, "infra: ..." for CI
changes.
- Example: "community: add foobar LLM"
- [ ] **PR message**: ***Delete this entire checklist*** and replace
with
- **Description:** a description of the change
- **Issue:** the issue # it fixes, if applicable
- **Dependencies:** any dependencies required for this change
- **Twitter handle:** if your PR gets announced, and you'd like a
mention, we'll gladly shout you out!
- [ ] **Add tests and docs**: If you're adding a new integration, please
include
1. a test for the integration, preferably unit tests that do not rely on
network access,
2. an example notebook showing its use. It lives in
`docs/docs/integrations` directory.
- [ ] **Lint and test**: Run `make format`, `make lint` and `make test`
from the root of the package(s) you've modified. See contribution
guidelines for more: https://python.langchain.com/docs/contributing/
Additional guidelines:
- Make sure optional dependencies are imported within a function.
- Please do not add dependencies to pyproject.toml files (even optional
ones) unless they are required for unit tests.
- Most PRs should not touch more than one package.
- Changes should be backwards compatible.
- If you are adding something to community, do not re-import it in
langchain.
If no one reviews your PR within a few days, please @-mention one of
baskaryan, efriis, eyurtsev, ccurme, vbarda, hwchase17.
Fix#23716
Thank you for contributing to LangChain!
- [ ] **PR title**: "package: description"
- Where "package" is whichever of langchain, community, core,
experimental, etc. is being modified. Use "docs: ..." for purely docs
changes, "templates: ..." for template changes, "infra: ..." for CI
changes.
- Example: "community: add foobar LLM"
- [ ] **PR message**: ***Delete this entire checklist*** and replace
with
- **Description:** a description of the change
- **Issue:** the issue # it fixes, if applicable
- **Dependencies:** any dependencies required for this change
- **Twitter handle:** if your PR gets announced, and you'd like a
mention, we'll gladly shout you out!
- [ ] **Add tests and docs**: If you're adding a new integration, please
include
1. a test for the integration, preferably unit tests that do not rely on
network access,
2. an example notebook showing its use. It lives in
`docs/docs/integrations` directory.
- [ ] **Lint and test**: Run `make format`, `make lint` and `make test`
from the root of the package(s) you've modified. See contribution
guidelines for more: https://python.langchain.com/docs/contributing/
Additional guidelines:
- Make sure optional dependencies are imported within a function.
- Please do not add dependencies to pyproject.toml files (even optional
ones) unless they are required for unit tests.
- Most PRs should not touch more than one package.
- Changes should be backwards compatible.
- If you are adding something to community, do not re-import it in
langchain.
If no one reviews your PR within a few days, please @-mention one of
baskaryan, efriis, eyurtsev, ccurme, vbarda, hwchase17.
---------
Co-authored-by: Eugene Yurtsev <eyurtsev@gmail.com>
This fixes processing issue for nodes with numbers in their labels (e.g.
`"node_1"`, which would previously be relabeled as `"node__"`, and now
are correctly processed as `"node_1"`)
- **Description:** When use
RunnableWithMessageHistory/SQLChatMessageHistory in async mode, we'll
get the following error:
```
Error in RootListenersTracer.on_chain_end callback: RuntimeError("There is no current event loop in thread 'asyncio_3'.")
```
which throwed by
ddfbca38df/libs/community/langchain_community/chat_message_histories/sql.py (L259).
and no message history will be add to database.
In this patch, a new _aexit_history function which will'be called in
async mode is added, and in turn aadd_messages will be called.
In this patch, we use `afunc` attribute of a Runnable to check if the
end listener should be run in async mode or not.
- **Issue:** #22021, #22022
- **Dependencies:** N/A
This raises ImportError due to a circular import:
```python
from langchain_core import chat_history
```
This does not:
```python
from langchain_core import runnables
from langchain_core import chat_history
```
Here we update `test_imports` to run each import in a separate
subprocess. Open to other ways of doing this!
- StopIteration can't be set on an asyncio.Future it raises a TypeError
and leaves the Future pending forever so we need to convert it to a
RuntimeError
- [x] **Adding AsyncRootListener**: "langchain_core: Adding
AsyncRootListener"
- **Description:** Adding an AsyncBaseTracer, AsyncRootListener and
`with_alistener` function. This is to enable binding async root listener
to runnables. This currently only supported for sync listeners.
- **Issue:** None
- **Dependencies:** None
- [x] **Add tests and docs**: Added units tests and example snippet code
within the function description of `with_alistener`
- [x] **Lint and test**: Run make format_diff, make lint_diff and make
test
This PR adds deduplication of callback handlers in merge_configs.
Fix for this issue:
https://github.com/langchain-ai/langchain/issues/22227
The issue appears when the code is:
1) running python >=3.11
2) invokes a runnable from within a runnable
3) binds the callbacks to the child runnable from the parent runnable
using with_config
In this case, the same callbacks end up appearing twice: (1) the first
time from with_config, (2) the second time with langchain automatically
propagating them on behalf of the user.
Prior to this PR this will emit duplicate events:
```python
@tool
async def get_items(question: str, callbacks: Callbacks): # <--- Accept callbacks
"""Ask question"""
template = ChatPromptTemplate.from_messages(
[
(
"human",
"'{question}"
)
]
)
chain = template | chat_model.with_config(
{
"callbacks": callbacks, # <-- Propagate callbacks
}
)
return await chain.ainvoke({"question": question})
```
Prior to this PR this will work work correctly (no duplicate events):
```python
@tool
async def get_items(question: str, callbacks: Callbacks): # <--- Accept callbacks
"""Ask question"""
template = ChatPromptTemplate.from_messages(
[
(
"human",
"'{question}"
)
]
)
chain = template | chat_model
return await chain.ainvoke({"question": question}, {"callbacks": callbacks})
```
This will also work (as long as the user is using python >= 3.11) -- as
langchain will automatically propagate callbacks
```python
@tool
async def get_items(question: str,):
"""Ask question"""
template = ChatPromptTemplate.from_messages(
[
(
"human",
"'{question}"
)
]
)
chain = template | chat_model
return await chain.ainvoke({"question": question})
```
Thank you for contributing to LangChain!
- [ ] **PR title**: "package: description"
- Where "package" is whichever of langchain, community, core,
experimental, etc. is being modified. Use "docs: ..." for purely docs
changes, "templates: ..." for template changes, "infra: ..." for CI
changes.
- Example: "community: add foobar LLM"
- [ ] **PR message**: ***Delete this entire checklist*** and replace
with
- **Description:** a description of the change
- **Issue:** the issue # it fixes, if applicable
- **Dependencies:** any dependencies required for this change
- **Twitter handle:** if your PR gets announced, and you'd like a
mention, we'll gladly shout you out!
- [ ] **Add tests and docs**: If you're adding a new integration, please
include
1. a test for the integration, preferably unit tests that do not rely on
network access,
2. an example notebook showing its use. It lives in
`docs/docs/integrations` directory.
- [ ] **Lint and test**: Run `make format`, `make lint` and `make test`
from the root of the package(s) you've modified. See contribution
guidelines for more: https://python.langchain.com/docs/contributing/
Additional guidelines:
- Make sure optional dependencies are imported within a function.
- Please do not add dependencies to pyproject.toml files (even optional
ones) unless they are required for unit tests.
- Most PRs should not touch more than one package.
- Changes should be backwards compatible.
- If you are adding something to community, do not re-import it in
langchain.
If no one reviews your PR within a few days, please @-mention one of
baskaryan, efriis, eyurtsev, ccurme, vbarda, hwchase17.
- This is a pattern that shows up occasionally in langgraph questions,
people chain a graph to something else after, and want to pass the graph
some kwargs (eg. stream_mode)
LangSmith and LangChain context var handling evolved in parallel since
originally we didn't expect people to want to interweave the decorator
and langchain code.
Once we get a new langsmith release, this PR will let you seemlessly
hand off between @traceable context and runnable config context so you
can arbitrarily nest code.
It's expected that this fails right now until we get another release of
the SDK