mirror of
https://github.com/csunny/DB-GPT.git
synced 2025-09-03 01:54:44 +00:00
feat(agent): Multi agent sdk (#976)
Co-authored-by: xtyuns <xtyuns@163.com> Co-authored-by: Fangyin Cheng <staneyffer@gmail.com> Co-authored-by: csunny <cfqsunny@163.com> Co-authored-by: qidanrui <qidanrui@gmail.com>
This commit is contained in:
468
dbgpt/util/code_utils.py
Normal file
468
dbgpt/util/code_utils.py
Normal file
@@ -0,0 +1,468 @@
|
||||
import logging
|
||||
import os
|
||||
import pathlib
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor, TimeoutError
|
||||
from hashlib import md5
|
||||
from typing import Callable, Dict, List, Optional, Tuple, Union
|
||||
|
||||
# Regular expression for finding a fenced code block.
# ```[ \t]*(\w+)?[ \t]*\r?\n(.*?)\r?\n[ \t]*``` matches multi-line code blocks
# (apply it with re.DOTALL so that (.*?) can span several lines).
# The [ \t]* matches the potential spaces before the language name.
# The (\w+)? matches the language, where the ? indicates it is optional.
# The [ \t]* matches the potential spaces (not newlines) after the language name.
# The \r?\n makes sure there is a linebreak after the opening ```.
# The (.*?) matches the code itself (non-greedy).
# The \r?\n makes sure there is a linebreak before the closing ```.
# The [ \t]* matches the potential spaces before the closing ``` (the spec allows indentation).
CODE_BLOCK_PATTERN = r"```[ \t]*(\w+)?[ \t]*\r?\n(.*?)\r?\n[ \t]*```"
# Default directory in which code files are written and executed:
# the "extensions" folder next to this module.
WORKING_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "extensions")
# Language label used when no language can be determined.
UNKNOWN = "unknown"
# Log text returned when an execution exceeds its time budget.
TIMEOUT_MSG = "Timeout"
# Execution time budget, in seconds, used when the caller passes none.
DEFAULT_TIMEOUT = 60
WIN32 = sys.platform == "win32"
PATH_SEPARATOR = "\\" if WIN32 else "/"

logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def content_str(content: Union[str, List]) -> str:
    """Flatten a message content into a single plain string.

    Args:
        content: Either a string, which is returned unchanged, or a list of
            dict items. An item with ``"type" == "text"`` contributes its
            ``"text"`` value; an item with ``"type" == "image_url"``
            contributes the placeholder ``"<image>"``.

    Returns:
        str: The concatenation of all item contributions, in order.

    Raises:
        AssertionError: If a list item is not a dict or has an unsupported
            ``"type"``.
    """
    if isinstance(content, str):
        return content
    parts = []
    for item in content:
        # Validate the shape *before* indexing into the item, and raise
        # explicitly so the check also runs under ``python -O``.
        if not isinstance(item, dict) or item.get("type") not in (
            "text",
            "image_url",
        ):
            raise AssertionError("Wrong content format.")
        parts.append(item["text"] if item["type"] == "text" else "<image>")
    return "".join(parts)
|
||||
|
||||
|
||||
def infer_lang(code):
    """Guess the language of a code snippet.

    Snippets that start with ``"python "``, ``"pip"`` or ``"python3 "`` are
    treated as shell commands. Anything else that compiles as Python source
    is reported as ``"python"``; otherwise ``UNKNOWN`` is returned.

    TODO: make it robust.
    """
    shell_prefixes = ("python ", "pip", "python3 ")
    if code.startswith(shell_prefixes):
        return "sh"

    # Anything that byte-compiles is considered valid Python.
    try:
        compile(code, "test", "exec")
    except SyntaxError:
        # Not valid Python source.
        return UNKNOWN
    return "python"
|
||||
|
||||
|
||||
# TODO: In the future move, to better support https://spec.commonmark.org/0.30/#fenced-code-blocks
|
||||
# perhaps by using a full Markdown parser.
|
||||
def extract_code(
    text: Union[str, List],
    pattern: str = CODE_BLOCK_PATTERN,
    detect_single_line_code: bool = False,
) -> List[Tuple[str, str]]:
    """Extract code from a text.

    Args:
        text (str or List): The content to extract code from. The content can be
            a string or a list; it is flattened with ``content_str`` first.
        pattern (str, optional): The regular expression pattern for finding the
            code block. Defaults to CODE_BLOCK_PATTERN. Only used when
            ``detect_single_line_code`` is False.
        detect_single_line_code (bool, optional): Also extract inline code
            wrapped in single backticks. Defaults to False.

    Returns:
        list: A list of tuples, each containing the language and the code.
          With ``detect_single_line_code=False``: if there is no code block in
          the input text, a single ``("unknown", text)`` tuple is returned.
          If there is a code block but the language is not specified, the
          language is "".
          With ``detect_single_line_code=True``: fenced blocks and inline code
          are returned in order of appearance (inline code with language "");
          the list is empty when nothing matches.
    """
    text = content_str(text)
    if not detect_single_line_code:
        match = re.findall(pattern, text, flags=re.DOTALL)
        return match if match else [(UNKNOWN, text)]

    # Extract both multi-line and single-line code, separated by the | operator.
    # `([^`]+)`: matches inline code.
    # re.DOTALL is required so that the fenced-block alternative can span
    # several lines; without it a multi-line block is misread as inline code.
    code_pattern = re.compile(CODE_BLOCK_PATTERN + r"|`([^`]+)`", flags=re.DOTALL)

    extracted = []
    for lang, block_code, inline_code in code_pattern.findall(text):
        if block_code:
            extracted.append((lang.strip(), block_code.strip()))
        elif inline_code:
            extracted.append(("", inline_code.strip()))

    return extracted
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Manual smoke check: run extract_code on a sample model reply and print
    # whatever it returns.
    _sample_reply = """```python import requests from bs4 import BeautifulSoup from datetime import datetime, timedelta # Define the search query query = "LLM application" # Define the time range (last week) end_date = datetime.now().strftime("%Y-%m-%d") start_date = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d") # Create the search URL url = f"https://arxiv.org/search/advanced?advanced=&terms-0-operator=AND&terms-0-term={query}&terms-0-field=title&classification-physics_archives=all&classification-include_cross_list=include&date-filter_by=specific_date&date-year=&date-from_date={start_date}&date-to_date={end_date}&date-date_type=submitted_date&abstracts=show&size=200&order=-announced_date_first" # Send a GET request to the search URL response = requests.get(url) # Parse the HTML content soup = BeautifulSoup(response.content, "html.parser") # Find all the paper titles and authors titles = soup.find_all("p", class_="title is-5 mathjax") authors = soup.find_all("p", class_="authors") # Print the results for i in range(len(titles)): print(f"Title: {titles[i].text.strip()}") print(f"Authors: {authors[i].text.strip()}") print("-------------------------") ``` This code uses the `requests` library to send a GET request to the advanced search page of arXiv. It searches for papers with the specified query ("LLM application") that were submitted in the last week. The code then uses `BeautifulSoup` to parse the HTML content of the search results page and extracts the paper titles and authors. Finally, it prints the titles and authors of the found papers."""
    print(extract_code(_sample_reply))
|
||||
|
||||
|
||||
_IMPROVE_FUNCTION_CONFIG = {
|
||||
"prompt": """Improve the function '{func_name}' to achieve the objective '{objective}'.
|
||||
The current implementation of the function is as follows:
|
||||
{file_string}""",
|
||||
"model": "DEFAULT_MODEL",
|
||||
"request_timeout": 600,
|
||||
}
|
||||
|
||||
|
||||
_IMPROVE_CODE_CONFIG = {
|
||||
"prompt": """Analyze the code in the following files and return a list of suggestions for improvement{followup}, to achieve the objective of '{objective}'.
|
||||
{code}
|
||||
""",
|
||||
"model": "DEFAULT_MODEL",
|
||||
"request_timeout": 900,
|
||||
}
|
||||
|
||||
|
||||
def timeout_handler(signum, frame):
    """Abort by raising ``TimeoutError("Timed out!")``.

    The ``(signum, frame)`` signature matches what the ``signal`` module
    passes to a handler; both arguments are ignored.
    """
    error = TimeoutError("Timed out!")
    raise error
|
||||
|
||||
|
||||
def _cmd(lang):
|
||||
if lang.startswith("python") or lang in ["bash", "sh", "powershell"]:
|
||||
return lang
|
||||
if lang in ["shell"]:
|
||||
return "sh"
|
||||
if lang in ["ps1"]:
|
||||
return "powershell"
|
||||
raise NotImplementedError(f"{lang} not recognized in code execution")
|
||||
|
||||
|
||||
def execute_code(
    code: Optional[str] = None,
    timeout: Optional[int] = None,
    filename: Optional[str] = None,
    work_dir: Optional[str] = None,
    use_docker: Optional[Union[List[str], str, bool]] = None,
    lang: Optional[str] = "python",
) -> Tuple[int, str, str]:
    """Execute code natively or in a docker container.

    This function is not tested on MacOS.

    Args:
        code (Optional, str): The code to execute.
            If None, the code from the file specified by filename will be executed.
            Either code or filename must be provided.
        timeout (Optional, int): The maximum execution time in seconds.
            If None (or 0), DEFAULT_TIMEOUT is used. When running natively the
            child process is killed once the timeout expires.
        filename (Optional, str): The file name to save the code or where the code is stored when `code` is None.
            If None, a file named after the MD5 hash of the code will be created
            and deleted after execution.
            The file name must be a relative path. Relative paths are relative to the working directory.
        work_dir (Optional, str): The working directory for the code execution.
            If None, WORKING_DIR (the "extensions" directory next to this
            module) is used.
        use_docker (Optional, list, str or bool): The docker image to use for code execution.
            If a list or a str of image name(s) is provided, the code will be executed in a docker container
            with the first image that is found locally or successfully pulled.
            If True, a built-in list of python images is tried.
            If False or empty, the code will be executed in the current environment.
            Expected behaviour:
                - If `use_docker` is truthy and the docker package is available, the code will run in a Docker container.
                - If `use_docker` is truthy but the docker package is missing, an error will be raised.
                - If `use_docker` is None and the docker package is available, it defaults to True.
                - If `use_docker` is None and the docker package is not available, a warning is logged and the code runs natively.
                - If this process already runs inside a docker container ("/.dockerenv" exists), the code runs natively regardless of `use_docker`.
            If the code is executed in the current environment,
            the code must be trusted.
        lang (Optional, str): The language of the code. Default is "python".

    Returns:
        int: 0 if the code executes successfully.
        str: The error message if the code fails to execute; the stdout otherwise.
            On timeout this is TIMEOUT_MSG.
        image: The docker image name after container run when docker is used;
            None when the code ran natively.

    Raises:
        AssertionError: If neither `code` nor `filename` is provided.
        RuntimeError: If docker execution is requested but the docker package
            is not available.
        NotImplementedError: If `lang` is not a supported language.
    """
    if code is None and filename is None:
        error_msg = f"Either {code=} or {filename=} must be provided."
        logger.error(error_msg)
        raise AssertionError(error_msg)

    # The docker SDK is optional; fall back to ``None`` when it is absent.
    try:
        import docker

        try:
            docker.version
        except AttributeError:
            # Presumably an unrelated module/directory named "docker" was
            # imported instead of the SDK; treat it as unavailable.
            docker = None
    except ImportError:
        docker = None

    # Warn if use_docker was unspecified (or None) and docker is unavailable.
    # In this case the current behavior is to fall back to run natively, but
    # this behavior is subject to change.
    if use_docker is None:
        if docker is None:
            use_docker = False
            logger.warning(
                "execute_code was called without specifying a value for use_docker. Since the python docker package is not available, code will be run natively. Note: this fallback behavior is subject to change"
            )
        else:
            # Default to true
            use_docker = True

    timeout = timeout or DEFAULT_TIMEOUT
    original_filename = filename
    if WIN32 and lang in ["sh", "shell"] and (not use_docker):
        lang = "ps1"
    if filename is None:
        code_hash = md5(code.encode()).hexdigest()
        # create a file with an automatically generated name
        filename = f"tmp_code_{code_hash}.{'py' if lang.startswith('python') else lang}"
    if work_dir is None:
        work_dir = WORKING_DIR

    # When already inside a docker container, run in place instead of nesting.
    run_natively = not use_docker or os.path.exists("/.dockerenv")
    if not run_natively and docker is None:
        raise RuntimeError(
            "use_docker was requested but the python docker package is not available."
        )

    filepath = os.path.join(work_dir, filename)
    os.makedirs(os.path.dirname(filepath), exist_ok=True)
    if code is not None:
        with open(filepath, "w", encoding="utf-8") as fout:
            fout.write(code)

    try:
        if run_natively:
            cmd = [
                sys.executable if lang.startswith("python") else _cmd(lang),
                f".\\{filename}" if WIN32 else filename,
            ]
            try:
                # subprocess enforces the timeout itself and kills the child,
                # so a runaway script cannot outlive this call.
                result = subprocess.run(
                    cmd,
                    cwd=work_dir,
                    capture_output=True,
                    text=True,
                    timeout=timeout,
                )
            except subprocess.TimeoutExpired:
                return 1, TIMEOUT_MSG, None
            if not result.returncode:
                return result.returncode, result.stdout, None
            # Strip local path details from the error output.
            logs = result.stderr
            if original_filename is None:
                abs_path = str(pathlib.Path(filepath).absolute())
                logs = logs.replace(abs_path, "").replace(filename, "")
            else:
                abs_path = str(pathlib.Path(work_dir).absolute()) + PATH_SEPARATOR
                logs = logs.replace(abs_path, "")
            return result.returncode, logs, None

        # create a docker client
        client = docker.from_env()
        if use_docker is True:
            image_list = ["python:3-alpine", "python:3", "python:3-windowsservercore"]
        elif isinstance(use_docker, str):
            image_list = [use_docker]
        else:
            image_list = use_docker
        # Use the first image that exists locally or can be pulled. If none
        # qualifies, the last candidate is still handed to docker below.
        for image in image_list:
            try:
                client.images.get(image)
                break
            except docker.errors.ImageNotFound:
                print("Pulling image", image)
                try:
                    client.images.pull(image)
                    break
                except docker.errors.DockerException:
                    print("Failed to pull image", image)
        # get a randomized str based on current time to wrap the exit code
        exit_code_str = f"exitcode{time.time()}"
        abs_path = pathlib.Path(work_dir).absolute()
        cmd = [
            "sh",
            "-c",
            f"{_cmd(lang)} {filename}; exit_code=$?; echo -n {exit_code_str}; echo -n $exit_code; echo {exit_code_str}",
        ]
        # create a docker container with the working directory mounted
        container = client.containers.run(
            image,
            command=cmd,
            working_dir="/workspace",
            detach=True,
            volumes={abs_path: {"bind": "/workspace", "mode": "rw"}},
        )
        start_time = time.time()
        while container.status != "exited" and time.time() - start_time < timeout:
            # Reload the container object to refresh its status
            container.reload()
        if container.status != "exited":
            container.stop()
            container.remove()
            return 1, TIMEOUT_MSG, image
        # get the container logs
        logs = container.logs().decode("utf-8").rstrip()
        # commit the image
        tag = filename.replace("/", "")
        container.commit(repository="python", tag=tag)
        # remove the container
        container.remove()
        # check if the code executed successfully
        exit_code = container.attrs["State"]["ExitCode"]
        if exit_code == 0:
            # The wrapper shell always exits 0; the script's own exit code is
            # echoed between two markers. The marker contains a "." so it must
            # be escaped before being used in a regex.
            marker = re.escape(exit_code_str)
            pattern = re.compile(f"{marker}(\\d+){marker}")
            match = pattern.search(logs)
            exit_code = 1 if match is None else int(match.group(1))
            # remove the exit code from the logs
            logs = logs if match is None else pattern.sub("", logs)

        if exit_code:
            logs = logs.replace(
                f"/workspace/{filename if original_filename is None else ''}", ""
            )
        # return the exit code, logs and image
        return exit_code, logs, f"python:{tag}"
    finally:
        # Auto-generated files are temporary: always clean them up, even when
        # execution raised. The executed code may have removed it already.
        if original_filename is None:
            try:
                os.remove(filepath)
            except FileNotFoundError:
                pass
|
||||
|
||||
|
||||
_GENERATE_ASSERTIONS_CONFIG = {
|
||||
"prompt": """Given the signature and docstring, write the exactly same number of assertion(s) for the provided example(s) in the docstring, without assertion messages.
|
||||
|
||||
func signature:
|
||||
{definition}
|
||||
assertions:""",
|
||||
"model": "FAST_MODEL",
|
||||
"max_tokens": 256,
|
||||
"stop": "\n\n",
|
||||
}
|
||||
|
||||
|
||||
def _remove_check(response):
|
||||
"""Remove the check function from the response."""
|
||||
# find the position of the check function
|
||||
pos = response.find("def check(")
|
||||
if pos == -1:
|
||||
return response
|
||||
return response[:pos]
|
||||
|
||||
|
||||
def eval_function_completions(
    responses: List[str],
    definition: str,
    test: Optional[str] = None,
    entry_point: Optional[str] = None,
    assertions: Optional[Union[str, Callable[[str], Tuple[str, float]]]] = None,
    timeout: Optional[float] = 3,
    use_docker: Optional[bool] = True,
) -> Dict:
    """(openai<1) Select a response from a list of responses for the function completion task (using generated assertions), and/or evaluate if the task is successful using a gold test.

    Args:
        responses (list): The list of responses. Must not be empty. When
            assertions are used, entries are replaced in place by their
            version with any trailing ``def check(`` part removed.
        definition (str): The input definition. It is prepended to every
            response that does not itself start with ``def``.
        test (Optional, str): The test code.
        entry_point (Optional, str): The name of the function.
        assertions (Optional, str or Callable): The assertion code which serves as a filter of the responses, or an assertion generator.
            When provided, only the responses that pass the assertions will be considered for the actual test (if provided).
            A generator is called with `definition` and must return
            ``(assertion_code, cost)``; it is only invoked when there is more
            than one response.
        timeout (Optional, float): The timeout for executing the code.
        use_docker (Optional, bool): Forwarded to `execute_code`.

    Returns:
        dict: The success metrics.
            Without assertions: ``expected_success`` and ``success``.
            With assertions: ``index_selected``, ``succeed_assertions``,
            ``gen_cost``, ``assertions`` and, when `test` is given, ``success``.

    Raises:
        ValueError: If `responses` is empty.
    """
    n = len(responses)
    if n == 0:
        # Otherwise this fails later with ZeroDivisionError/UnboundLocalError.
        raise ValueError("responses must contain at least one response.")

    def _complete(response):
        # A response that redefines the function replaces the definition.
        return response if response.startswith("def") else f"{definition}{response}"

    def _runs_ok(code):
        return execute_code(code, timeout=timeout, use_docker=use_docker)[0] == 0

    if assertions is None:
        # no assertion filter: run the gold test on every response
        success_list = []
        for raw_response in responses:
            response = _remove_check(raw_response)
            code = f"{_complete(response)}\n{test}\ncheck({entry_point})"
            success_list.append(_runs_ok(code))
        return {
            "expected_success": 1 - pow(1 - sum(success_list) / n, n),
            "success": any(success_list),
        }

    gen_cost = 0
    if callable(assertions):
        if n > 1:
            # assertion generator
            assertions, gen_cost = assertions(definition)
        else:
            # A single response needs no filtering; skip the generation cost.
            assertions = None
    # A str is used as the assertion code as given.

    if n > 1 or test is None:
        # Select the first response passing the assertions; if none passes,
        # the last response stays selected.
        for i, raw_response in enumerate(responses):
            response = responses[i] = _remove_check(raw_response)
            code = f"{_complete(response)}\n{assertions}"
            succeed_assertions = _runs_ok(code)
            if succeed_assertions:
                break
    else:
        # just test, no need to check assertions
        succeed_assertions = False
        i, response = 0, responses[0]

    if test is None:
        # no test code
        return {
            "index_selected": i,
            "succeed_assertions": succeed_assertions,
            "gen_cost": gen_cost,
            "assertions": assertions,
        }

    code_test = f"{_complete(response)}\n{test}\ncheck({entry_point})"
    success = _runs_ok(code_test)
    return {
        "index_selected": i,
        "succeed_assertions": succeed_assertions,
        "success": success,
        "gen_cost": gen_cost,
        "assertions": assertions,
    }
|
||||
|
||||
|
||||
_FUNC_COMPLETION_PROMPT = "# Python 3{definition}"
|
||||
_FUNC_COMPLETION_STOP = ["\nclass", "\ndef", "\nif", "\nprint"]
|
16
dbgpt/util/error_types.py
Normal file
16
dbgpt/util/error_types.py
Normal file
@@ -0,0 +1,16 @@
|
||||
class LLMChatError(Exception):
    """Error for an LLM conversation whose result generation failed.

    Attributes are set in ``__init__``:
        message: The human-readable error message.
        original_exception: The underlying exception, if one was supplied.
    """

    def __init__(self, message="LLM Chat Generrate Error!", original_exception=None):
        self.message = message
        self.original_exception = original_exception
        super().__init__(message)

    def __str__(self):
        if not self.original_exception:
            return self.message
        # Return the custom message together with the original exception text.
        return f"{self.message}({self.original_exception})"
|
@@ -1,5 +1,16 @@
|
||||
"""Utilities for the json_fixes package."""
|
||||
import json
|
||||
from datetime import date, datetime
|
||||
from dataclasses import dataclass, asdict, is_dataclass
|
||||
import os.path
|
||||
import re
|
||||
import logging
|
||||
|
||||
from jsonschema import Draft7Validator
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
LLM_DEFAULT_RESPONSE_FORMAT = "llm_response_format_1"
|
||||
|
||||
|
||||
def serialize(obj):
|
||||
@@ -7,8 +18,105 @@ def serialize(obj):
|
||||
return obj.isoformat()
|
||||
|
||||
|
||||
class DateTimeEncoder(json.JSONEncoder):
|
||||
class EnhancedJSONEncoder(json.JSONEncoder):
    """JSON encoder that also handles dataclass instances and dates.

    Dataclass instances are converted with ``dataclasses.asdict``; ``date``
    and ``datetime`` values are written as ISO 8601 strings. Everything else
    falls through to the base encoder, which raises ``TypeError``.
    """

    def default(self, obj):
        # is_dataclass() is also true for dataclass *types*, which asdict()
        # rejects; only instances are converted.
        if is_dataclass(obj) and not isinstance(obj, type):
            return asdict(obj)
        # datetime is a subclass of date, so this covers both.
        if isinstance(obj, date):
            return obj.isoformat()
        return super().default(obj)
|
||||
|
||||
|
||||
def extract_char_position(error_message: str) -> int:
    """Extract the character position from the JSONDecodeError message.

    Args:
        error_message (str): The error message from the JSONDecodeError
          exception; it is searched for a ``(char N)`` fragment.

    Returns:
        int: The character position ``N`` of the first such fragment.

    Raises:
        ValueError: If the message contains no ``(char N)`` fragment.
    """
    found = re.search(r"\(char (\d+)\)", error_message)
    if found is None:
        raise ValueError("Character position not found in the error message.")
    return int(found.group(1))
|
||||
|
||||
|
||||
def find_json_objects(text):
    """Find and parse the top-level JSON objects/arrays embedded in a text.

    The text is scanned for balanced ``{...}`` / ``[...]`` spans, ignoring
    brackets that appear inside double-quoted strings. Each balanced span is
    parsed with ``json.loads``; spans that are not valid JSON are skipped.
    Raw control characters (such as newlines and tabs) inside strings are
    accepted.

    Args:
        text (str): The text to scan.

    Returns:
        list: The parsed values, in order of appearance.
    """
    json_objects = []
    inside_string = False
    escape_character = False
    stack = []
    start_index = -1

    for i, char in enumerate(text):
        # A backslash escapes the next character.
        if char == "\\" and not escape_character:
            escape_character = True
            continue

        # An unescaped quote opens or closes a string.
        if char == '"' and not escape_character:
            inside_string = not inside_string

        # The escape only ever applies to the character just examined.
        escape_character = False

        # Brackets inside strings are plain text.
        if inside_string:
            continue

        if char in "{[":
            stack.append(char)
            if len(stack) == 1:
                start_index = i
        elif char in "}]" and stack:
            if (char == "}" and stack[-1] == "{") or (char == "]" and stack[-1] == "["):
                stack.pop()
                if not stack:
                    end_index = i + 1
                    try:
                        # strict=False lets raw newlines/tabs inside strings
                        # through instead of discarding the whole object.
                        json_obj = json.loads(text[start_index:end_index], strict=False)
                        json_objects.append(json_obj)
                    except json.JSONDecodeError:
                        # Balanced brackets that are not JSON are skipped.
                        pass

    return json_objects
|
||||
|
||||
|
||||
@staticmethod
|
||||
def _format_json_str(jstr):
|
||||
"""Remove newlines outside of quotes, and handle JSON escape sequences.
|
||||
|
||||
1. this function removes the newline in the query outside of quotes otherwise json.loads(s) will fail.
|
||||
Ex 1:
|
||||
"{\n"tool": "python",\n"query": "print('hello')\nprint('world')"\n}" -> "{"tool": "python","query": "print('hello')\nprint('world')"}"
|
||||
Ex 2:
|
||||
"{\n \"location\": \"Boston, MA\"\n}" -> "{"location": "Boston, MA"}"
|
||||
|
||||
2. this function also handles JSON escape sequences inside quotes,
|
||||
Ex 1:
|
||||
'{"args": "a\na\na\ta"}' -> '{"args": "a\\na\\na\\ta"}'
|
||||
"""
|
||||
result = []
|
||||
inside_quotes = False
|
||||
last_char = " "
|
||||
for char in jstr:
|
||||
if last_char != "\\" and char == '"':
|
||||
inside_quotes = not inside_quotes
|
||||
last_char = char
|
||||
if not inside_quotes and char == "\n":
|
||||
continue
|
||||
if inside_quotes and char == "\n":
|
||||
char = "\\n"
|
||||
if inside_quotes and char == "\t":
|
||||
char = "\\t"
|
||||
result.append(char)
|
||||
return "".join(result)
|
||||
|
@@ -73,6 +73,15 @@ def extract_content_open_ending(long_string, s1, s2, is_include: bool = False):
|
||||
return match_map
|
||||
|
||||
|
||||
def str_to_bool(s):
    """Interpret a string as a boolean, case-insensitively.

    Returns True for "true", "t", "1", "yes" and "y"; every other string,
    including the explicit false spellings and unrecognized text, gives False.
    """
    truthy = ("true", "t", "1", "yes", "y")
    return s.lower() in truthy
|
||||
|
||||
|
||||
def _to_str(x, charset="utf8", errors="strict"):
|
||||
if x is None or isinstance(x, str):
|
||||
return x
|
||||
|
Reference in New Issue
Block a user