mirror of
https://github.com/hwchase17/langchain.git
synced 2026-03-18 02:53:16 +00:00
fix(langchain-classic): patch ReDoS vulnerability in MRKL and ReAct action regex (CVE-2024-58340) (#35598)
The action-parsing regex in `MRKLOutputParser.parse()` and `ReActSingleInputOutputParser.parse()` used the pattern `(.*?)[\s]*Action`, which causes catastrophic backtracking on crafted input where whitespace characters sit between two partial `Action` tokens. Because both `(.*?)` and `[\s]*` can consume the same whitespace, the regex engine has to try every way of splitting a whitespace run between them whenever the rest of the pattern fails to match. An attacker can trigger near-infinite CPU consumption with a relatively short string. The fix removes the redundant `[\s]*` quantifier between the first capture group and the literal `Action` keyword. Since `re.DOTALL` is active and the preceding `(.*?)` already matches any character (including whitespace), the `[\s]*` was unnecessary and was the source of the ambiguity that enabled backtracking. This commit also adds regression tests for both parsers; they use `SIGALRM` timeouts to assert that the regex completes in bounded time on adversarial input, and are skipped on Windows, where `SIGALRM` is not available. This fix was reviewed manually. Created with [Deep Agents CLI](https://docs.langchain.com/oss/python/deepagents/cli/overview).
This commit is contained in:
@@ -41,9 +41,7 @@ class MRKLOutputParser(AgentOutputParser):
|
||||
OutputParserException: If the output could not be parsed.
|
||||
"""
|
||||
includes_answer = FINAL_ANSWER_ACTION in text
|
||||
regex = (
|
||||
r"Action\s*\d*\s*:[\s]*(.*?)[\s]*Action\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)"
|
||||
)
|
||||
regex = r"Action\s*\d*\s*:[\s]*(.*?)Action\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)"
|
||||
action_match = re.search(regex, text, re.DOTALL)
|
||||
if action_match and includes_answer:
|
||||
if text.find(FINAL_ANSWER_ACTION) < text.find(action_match.group(0)):
|
||||
|
||||
@@ -52,9 +52,7 @@ class ReActSingleInputOutputParser(AgentOutputParser):
|
||||
@override
|
||||
def parse(self, text: str) -> AgentAction | AgentFinish:
|
||||
includes_answer = FINAL_ANSWER_ACTION in text
|
||||
regex = (
|
||||
r"Action\s*\d*\s*:[\s]*(.*?)[\s]*Action\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)"
|
||||
)
|
||||
regex = r"Action\s*\d*\s*:[\s]*(.*?)Action\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)"
|
||||
action_match = re.search(regex, text, re.DOTALL)
|
||||
if action_match:
|
||||
if includes_answer:
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
import signal
|
||||
import sys
|
||||
|
||||
import pytest
|
||||
from langchain_core.agents import AgentAction, AgentFinish
|
||||
from langchain_core.exceptions import OutputParserException
|
||||
@@ -43,3 +46,32 @@ Action: search Final Answer:
|
||||
Action Input: what is the temperature in SF?"""
|
||||
with pytest.raises(OutputParserException):
|
||||
parser.invoke(_input)
|
||||
|
||||
|
||||
def _timeout_handler(_signum: int, _frame: object) -> None:
|
||||
msg = "ReDoS: regex took too long"
|
||||
raise TimeoutError(msg)
|
||||
|
||||
|
||||
@pytest.mark.skipif(
    sys.platform == "win32", reason="SIGALRM is not available on Windows"
)
def test_react_single_input_no_redos() -> None:
    """Regression test for ReDoS caused by catastrophic backtracking.

    Arms a 2-second ``SIGALRM`` and feeds ``ReActSingleInputOutputParser``
    an adversarial string. The test fails if the alarm fires before
    ``parse()`` returns or raises; the parse result itself is not checked.
    """
    parser = ReActSingleInputOutputParser()
    # "Action: " followed by 1000 space/tab pairs and a second, partial
    # "Action " token that never reaches "Input" -- a long whitespace run
    # between two partial Action tokens.
    malicious = "Action: " + " \t" * 1000 + "Action "
    # Remember the previous handler so it can be restored afterwards.
    old = signal.signal(signal.SIGALRM, _timeout_handler)
    signal.alarm(2)
    try:
        try:
            parser.parse(malicious)
        except OutputParserException:
            # Only the running time matters here; a parse failure on the
            # crafted input is an acceptable outcome.
            pass
    except TimeoutError:
        pytest.fail(
            "ReDoS detected: ReActSingleInputOutputParser.parse() "
            "hung on crafted input"
        )
    finally:
        # Cancel any pending alarm before restoring the previous handler.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, old)
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
import signal
|
||||
import sys
|
||||
|
||||
import pytest
|
||||
from langchain_core.agents import AgentAction, AgentFinish
|
||||
from langchain_core.exceptions import OutputParserException
|
||||
@@ -79,3 +82,30 @@ def test_final_answer_after_parsable_action() -> None:
|
||||
"Parsing LLM output produced both a final answer and a parse-able action"
|
||||
in exception_info.value.args[0]
|
||||
)
|
||||
|
||||
|
||||
def _timeout_handler(_signum: int, _frame: object) -> None:
|
||||
msg = "ReDoS: regex took too long"
|
||||
raise TimeoutError(msg)
|
||||
|
||||
|
||||
@pytest.mark.skipif(
    sys.platform == "win32", reason="SIGALRM is not available on Windows"
)
def test_mrkl_output_parser_no_redos() -> None:
    """Regression test for ReDoS caused by catastrophic backtracking.

    Arms a 2-second ``SIGALRM`` and feeds the module-level
    ``mrkl_output_parser`` an adversarial string. The test fails if the
    alarm fires before ``parse()`` returns or raises; the parse result
    itself is not checked.
    """
    # "Action: " followed by 1000 space/tab pairs and a second, partial
    # "Action " token that never reaches "Input" -- a long whitespace run
    # between two partial Action tokens.
    malicious = "Action: " + " \t" * 1000 + "Action "
    # Remember the previous handler so it can be restored afterwards.
    old = signal.signal(signal.SIGALRM, _timeout_handler)
    signal.alarm(2)
    try:
        try:
            mrkl_output_parser.parse(malicious)
        except OutputParserException:
            # Only the running time matters here; a parse failure on the
            # crafted input is an acceptable outcome.
            pass
    except TimeoutError:
        pytest.fail(
            "ReDoS detected: MRKLOutputParser.parse() hung on crafted input"
        )
    finally:
        # Cancel any pending alarm before restoring the previous handler.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, old)
|
||||
|
||||
Reference in New Issue
Block a user