mirror of
https://github.com/csunny/DB-GPT.git
synced 2025-09-09 21:08:59 +00:00
load plugin
This commit is contained in:
@@ -1,23 +1,70 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding:utf-8 -*-
|
||||
"""Agent manager for managing GPT agents"""
|
||||
from __future__ import annotations
|
||||
|
||||
from pilot.configs.config import Config
|
||||
from pilot.singleton import Singleton
|
||||
|
||||
|
||||
class AgentManager(metaclass=Singleton):
|
||||
"""Agent manager for managing DB-GPT agents"""
|
||||
def __init__(self) -> None:
|
||||
"""Agent manager for managing GPT agents"""
|
||||
|
||||
self.agents = {} #TODO need to define
|
||||
def __init__(self):
|
||||
self.next_key = 0
|
||||
self.agents = {} # key, (task, full_message_history, model)
|
||||
self.cfg = Config()
|
||||
|
||||
def create_agent(self):
|
||||
pass
|
||||
# Create new GPT agent
|
||||
# TODO: Centralise use of create_chat_completion() to globally enforce token limit
|
||||
|
||||
def message_agent(self):
|
||||
pass
|
||||
def create_agent(self, task: str, prompt: str, model: str) -> tuple[int, str]:
|
||||
"""Create a new agent and return its key
|
||||
|
||||
def list_agents(self):
|
||||
pass
|
||||
Args:
|
||||
task: The task to perform
|
||||
prompt: The prompt to use
|
||||
model: The model to use
|
||||
|
||||
def delete_agent(self):
|
||||
pass
|
||||
Returns:
|
||||
The key of the new agent
|
||||
"""
|
||||
|
||||
|
||||
def message_agent(self, key: str | int, message: str) -> str:
|
||||
"""Send a message to an agent and return its response
|
||||
|
||||
Args:
|
||||
key: The key of the agent to message
|
||||
message: The message to send to the agent
|
||||
|
||||
Returns:
|
||||
The agent's response
|
||||
"""
|
||||
|
||||
|
||||
def list_agents(self) -> list[tuple[str | int, str]]:
|
||||
"""Return a list of all agents
|
||||
|
||||
Returns:
|
||||
A list of tuples of the form (key, task)
|
||||
"""
|
||||
|
||||
# Return a list of agent keys and their tasks
|
||||
return [(key, task) for key, (task, _, _) in self.agents.items()]
|
||||
|
||||
def delete_agent(self, key: str | int) -> bool:
|
||||
"""Delete an agent from the agent manager
|
||||
|
||||
Args:
|
||||
key: The key of the agent to delete
|
||||
|
||||
Returns:
|
||||
True if successful, False otherwise
|
||||
"""
|
||||
|
||||
try:
|
||||
del self.agents[int(key)]
|
||||
return True
|
||||
except KeyError:
|
||||
return False
|
||||
|
0
pilot/commands/__init__.py
Normal file
0
pilot/commands/__init__.py
Normal file
61
pilot/commands/audio_text.py
Normal file
61
pilot/commands/audio_text.py
Normal file
@@ -0,0 +1,61 @@
|
||||
"""Commands for converting audio to text."""
|
||||
import json
|
||||
|
||||
import requests
|
||||
|
||||
from autogpt.commands.command import command
|
||||
from autogpt.config import Config
|
||||
|
||||
CFG = Config()
|
||||
|
||||
|
||||
@command(
|
||||
"read_audio_from_file",
|
||||
"Convert Audio to text",
|
||||
'"filename": "<filename>"',
|
||||
CFG.huggingface_audio_to_text_model,
|
||||
"Configure huggingface_audio_to_text_model.",
|
||||
)
|
||||
def read_audio_from_file(filename: str) -> str:
|
||||
"""
|
||||
Convert audio to text.
|
||||
|
||||
Args:
|
||||
filename (str): The path to the audio file
|
||||
|
||||
Returns:
|
||||
str: The text from the audio
|
||||
"""
|
||||
with open(filename, "rb") as audio_file:
|
||||
audio = audio_file.read()
|
||||
return read_audio(audio)
|
||||
|
||||
|
||||
def read_audio(audio: bytes) -> str:
|
||||
"""
|
||||
Convert audio to text.
|
||||
|
||||
Args:
|
||||
audio (bytes): The audio to convert
|
||||
|
||||
Returns:
|
||||
str: The text from the audio
|
||||
"""
|
||||
model = CFG.huggingface_audio_to_text_model
|
||||
api_url = f"https://api-inference.huggingface.co/models/{model}"
|
||||
api_token = CFG.huggingface_api_token
|
||||
headers = {"Authorization": f"Bearer {api_token}"}
|
||||
|
||||
if api_token is None:
|
||||
raise ValueError(
|
||||
"You need to set your Hugging Face API token in the config file."
|
||||
)
|
||||
|
||||
response = requests.post(
|
||||
api_url,
|
||||
headers=headers,
|
||||
data=audio,
|
||||
)
|
||||
|
||||
text = json.loads(response.content.decode("utf-8"))["text"]
|
||||
return f"The audio says: {text}"
|
156
pilot/commands/command_mange.py
Normal file
156
pilot/commands/command_mange.py
Normal file
@@ -0,0 +1,156 @@
|
||||
import functools
|
||||
import importlib
|
||||
import inspect
|
||||
from typing import Any, Callable, Optional
|
||||
|
||||
# Unique identifier for auto-gpt commands
|
||||
AUTO_GPT_COMMAND_IDENTIFIER = "auto_gpt_command"
|
||||
|
||||
|
||||
class Command:
|
||||
"""A class representing a command.
|
||||
|
||||
Attributes:
|
||||
name (str): The name of the command.
|
||||
description (str): A brief description of what the command does.
|
||||
signature (str): The signature of the function that the command executes. Defaults to None.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
description: str,
|
||||
method: Callable[..., Any],
|
||||
signature: str = "",
|
||||
enabled: bool = True,
|
||||
disabled_reason: Optional[str] = None,
|
||||
):
|
||||
self.name = name
|
||||
self.description = description
|
||||
self.method = method
|
||||
self.signature = signature if signature else str(inspect.signature(self.method))
|
||||
self.enabled = enabled
|
||||
self.disabled_reason = disabled_reason
|
||||
|
||||
def __call__(self, *args, **kwargs) -> Any:
|
||||
if not self.enabled:
|
||||
return f"Command '{self.name}' is disabled: {self.disabled_reason}"
|
||||
return self.method(*args, **kwargs)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"{self.name}: {self.description}, args: {self.signature}"
|
||||
|
||||
|
||||
class CommandRegistry:
|
||||
"""
|
||||
The CommandRegistry class is a manager for a collection of Command objects.
|
||||
It allows the registration, modification, and retrieval of Command objects,
|
||||
as well as the scanning and loading of command plugins from a specified
|
||||
directory.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.commands = {}
|
||||
|
||||
def _import_module(self, module_name: str) -> Any:
|
||||
return importlib.import_module(module_name)
|
||||
|
||||
def _reload_module(self, module: Any) -> Any:
|
||||
return importlib.reload(module)
|
||||
|
||||
def register(self, cmd: Command) -> None:
|
||||
self.commands[cmd.name] = cmd
|
||||
|
||||
def unregister(self, command_name: str):
|
||||
if command_name in self.commands:
|
||||
del self.commands[command_name]
|
||||
else:
|
||||
raise KeyError(f"Command '{command_name}' not found in registry.")
|
||||
|
||||
def reload_commands(self) -> None:
|
||||
"""Reloads all loaded command plugins."""
|
||||
for cmd_name in self.commands:
|
||||
cmd = self.commands[cmd_name]
|
||||
module = self._import_module(cmd.__module__)
|
||||
reloaded_module = self._reload_module(module)
|
||||
if hasattr(reloaded_module, "register"):
|
||||
reloaded_module.register(self)
|
||||
|
||||
def get_command(self, name: str) -> Callable[..., Any]:
|
||||
return self.commands[name]
|
||||
|
||||
def call(self, command_name: str, **kwargs) -> Any:
|
||||
if command_name not in self.commands:
|
||||
raise KeyError(f"Command '{command_name}' not found in registry.")
|
||||
command = self.commands[command_name]
|
||||
return command(**kwargs)
|
||||
|
||||
def command_prompt(self) -> str:
|
||||
"""
|
||||
Returns a string representation of all registered `Command` objects for use in a prompt
|
||||
"""
|
||||
commands_list = [
|
||||
f"{idx + 1}. {str(cmd)}" for idx, cmd in enumerate(self.commands.values())
|
||||
]
|
||||
return "\n".join(commands_list)
|
||||
|
||||
def import_commands(self, module_name: str) -> None:
|
||||
"""
|
||||
Imports the specified Python module containing command plugins.
|
||||
|
||||
This method imports the associated module and registers any functions or
|
||||
classes that are decorated with the `AUTO_GPT_COMMAND_IDENTIFIER` attribute
|
||||
as `Command` objects. The registered `Command` objects are then added to the
|
||||
`commands` dictionary of the `CommandRegistry` object.
|
||||
|
||||
Args:
|
||||
module_name (str): The name of the module to import for command plugins.
|
||||
"""
|
||||
|
||||
module = importlib.import_module(module_name)
|
||||
|
||||
for attr_name in dir(module):
|
||||
attr = getattr(module, attr_name)
|
||||
# Register decorated functions
|
||||
if hasattr(attr, AUTO_GPT_COMMAND_IDENTIFIER) and getattr(
|
||||
attr, AUTO_GPT_COMMAND_IDENTIFIER
|
||||
):
|
||||
self.register(attr.command)
|
||||
# Register command classes
|
||||
elif (
|
||||
inspect.isclass(attr) and issubclass(attr, Command) and attr != Command
|
||||
):
|
||||
cmd_instance = attr()
|
||||
self.register(cmd_instance)
|
||||
|
||||
|
||||
def command(
|
||||
name: str,
|
||||
description: str,
|
||||
signature: str = "",
|
||||
enabled: bool = True,
|
||||
disabled_reason: Optional[str] = None,
|
||||
) -> Callable[..., Any]:
|
||||
"""The command decorator is used to create Command objects from ordinary functions."""
|
||||
|
||||
def decorator(func: Callable[..., Any]) -> Command:
|
||||
cmd = Command(
|
||||
name=name,
|
||||
description=description,
|
||||
method=func,
|
||||
signature=signature,
|
||||
enabled=enabled,
|
||||
disabled_reason=disabled_reason,
|
||||
)
|
||||
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs) -> Any:
|
||||
return func(*args, **kwargs)
|
||||
|
||||
wrapper.command = cmd
|
||||
|
||||
setattr(wrapper, AUTO_GPT_COMMAND_IDENTIFIER, True)
|
||||
|
||||
return wrapper
|
||||
|
||||
return decorator
|
107
pilot/commands/commands.py
Normal file
107
pilot/commands/commands.py
Normal file
@@ -0,0 +1,107 @@
|
||||
import json
|
||||
|
||||
|
||||
def is_valid_int(value: str) -> bool:
|
||||
"""Check if the value is a valid integer
|
||||
|
||||
Args:
|
||||
value (str): The value to check
|
||||
|
||||
Returns:
|
||||
bool: True if the value is a valid integer, False otherwise
|
||||
"""
|
||||
try:
|
||||
int(value)
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
|
||||
def get_command(response_json: Dict):
|
||||
"""Parse the response and return the command name and arguments
|
||||
|
||||
Args:
|
||||
response_json (json): The response from the AI
|
||||
|
||||
Returns:
|
||||
tuple: The command name and arguments
|
||||
|
||||
Raises:
|
||||
json.decoder.JSONDecodeError: If the response is not valid JSON
|
||||
|
||||
Exception: If any other error occurs
|
||||
"""
|
||||
try:
|
||||
if "command" not in response_json:
|
||||
return "Error:", "Missing 'command' object in JSON"
|
||||
|
||||
if not isinstance(response_json, dict):
|
||||
return "Error:", f"'response_json' object is not dictionary {response_json}"
|
||||
|
||||
command = response_json["command"]
|
||||
if not isinstance(command, dict):
|
||||
return "Error:", "'command' object is not a dictionary"
|
||||
|
||||
if "name" not in command:
|
||||
return "Error:", "Missing 'name' field in 'command' object"
|
||||
|
||||
command_name = command["name"]
|
||||
|
||||
# Use an empty dictionary if 'args' field is not present in 'command' object
|
||||
arguments = command.get("args", {})
|
||||
|
||||
return command_name, arguments
|
||||
except json.decoder.JSONDecodeError:
|
||||
return "Error:", "Invalid JSON"
|
||||
# All other errors, return "Error: + error message"
|
||||
except Exception as e:
|
||||
return "Error:", str(e)
|
||||
|
||||
|
||||
|
||||
|
||||
def execute_command(
|
||||
command_registry: CommandRegistry,
|
||||
command_name: str,
|
||||
arguments,
|
||||
prompt: PromptGenerator,
|
||||
):
|
||||
"""Execute the command and return the result
|
||||
|
||||
Args:
|
||||
command_name (str): The name of the command to execute
|
||||
arguments (dict): The arguments for the command
|
||||
|
||||
Returns:
|
||||
str: The result of the command
|
||||
"""
|
||||
try:
|
||||
cmd = command_registry.commands.get(command_name)
|
||||
|
||||
# If the command is found, call it with the provided arguments
|
||||
if cmd:
|
||||
return cmd(**arguments)
|
||||
|
||||
# TODO: Remove commands below after they are moved to the command registry.
|
||||
command_name = map_command_synonyms(command_name.lower())
|
||||
|
||||
if command_name == "memory_add":
|
||||
return get_memory(CFG).add(arguments["string"])
|
||||
|
||||
# TODO: Change these to take in a file rather than pasted code, if
|
||||
# non-file is given, return instructions "Input should be a python
|
||||
# filepath, write your code to file and try again
|
||||
else:
|
||||
for command in prompt.commands:
|
||||
if (
|
||||
command_name == command["label"].lower()
|
||||
or command_name == command["name"].lower()
|
||||
):
|
||||
return command["function"](**arguments)
|
||||
return (
|
||||
f"Unknown command '{command_name}'. Please refer to the 'COMMANDS'"
|
||||
" list for available commands and only respond in the specified JSON"
|
||||
" format."
|
||||
)
|
||||
except Exception as e:
|
||||
return f"Error: {str(e)}"
|
28
pilot/commands/commands_load.py
Normal file
28
pilot/commands/commands_load.py
Normal file
@@ -0,0 +1,28 @@
|
||||
from pilot.configs.config import Config
|
||||
from pilot.prompts.generator import PromptGenerator
|
||||
from __future__ import annotations
|
||||
from typing import Any, Optional, Type
|
||||
from pilot.prompts.prompt import build_default_prompt_generator
|
||||
|
||||
|
||||
class CommandsLoad:
|
||||
"""
|
||||
Load Plugins Commands Info , help build system prompt!
|
||||
"""
|
||||
|
||||
def __init__(self)->None:
|
||||
self.command_registry = None
|
||||
|
||||
|
||||
def getCommandInfos(self, prompt_generator: Optional[PromptGenerator] = None)-> str:
|
||||
cfg = Config()
|
||||
if prompt_generator is None:
|
||||
prompt_generator = build_default_prompt_generator()
|
||||
for plugin in cfg.plugins:
|
||||
if not plugin.can_handle_post_prompt():
|
||||
continue
|
||||
prompt_generator = plugin.post_prompt(prompt_generator)
|
||||
self.prompt_generator = prompt_generator
|
||||
command_infos = ""
|
||||
command_infos += f"\n\n{prompt_generator.commands()}"
|
||||
return command_infos
|
343
pilot/commands/file_operations.py
Normal file
343
pilot/commands/file_operations.py
Normal file
@@ -0,0 +1,343 @@
|
||||
"""File operations for AutoGPT"""
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import os
|
||||
import os.path
|
||||
from typing import Dict, Generator, Literal, Tuple
|
||||
|
||||
import charset_normalizer
|
||||
import requests
|
||||
from colorama import Back, Fore
|
||||
from requests.adapters import HTTPAdapter, Retry
|
||||
|
||||
from autogpt.commands.command import command
|
||||
from autogpt.config import Config
|
||||
from autogpt.logs import logger
|
||||
from autogpt.spinner import Spinner
|
||||
from autogpt.utils import readable_file_size
|
||||
|
||||
CFG = Config()
|
||||
|
||||
Operation = Literal["write", "append", "delete"]
|
||||
|
||||
|
||||
def text_checksum(text: str) -> str:
|
||||
"""Get the hex checksum for the given text."""
|
||||
return hashlib.md5(text.encode("utf-8")).hexdigest()
|
||||
|
||||
|
||||
def operations_from_log(log_path: str) -> Generator[Tuple[Operation, str, str | None]]:
|
||||
"""Parse the file operations log and return a tuple containing the log entries"""
|
||||
try:
|
||||
log = open(log_path, "r", encoding="utf-8")
|
||||
except FileNotFoundError:
|
||||
return
|
||||
|
||||
for line in log:
|
||||
line = line.replace("File Operation Logger", "").strip()
|
||||
if not line:
|
||||
continue
|
||||
operation, tail = line.split(": ", maxsplit=1)
|
||||
operation = operation.strip()
|
||||
if operation in ("write", "append"):
|
||||
try:
|
||||
path, checksum = (x.strip() for x in tail.rsplit(" #", maxsplit=1))
|
||||
except ValueError:
|
||||
path, checksum = tail.strip(), None
|
||||
yield (operation, path, checksum)
|
||||
elif operation == "delete":
|
||||
yield (operation, tail.strip(), None)
|
||||
|
||||
log.close()
|
||||
|
||||
|
||||
def file_operations_state(log_path: str) -> Dict:
|
||||
"""Iterates over the operations log and returns the expected state.
|
||||
|
||||
Parses a log file at CFG.file_logger_path to construct a dictionary that maps
|
||||
each file path written or appended to its checksum. Deleted files are removed
|
||||
from the dictionary.
|
||||
|
||||
Returns:
|
||||
A dictionary mapping file paths to their checksums.
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If CFG.file_logger_path is not found.
|
||||
ValueError: If the log file content is not in the expected format.
|
||||
"""
|
||||
state = {}
|
||||
for operation, path, checksum in operations_from_log(log_path):
|
||||
if operation in ("write", "append"):
|
||||
state[path] = checksum
|
||||
elif operation == "delete":
|
||||
del state[path]
|
||||
return state
|
||||
|
||||
|
||||
def is_duplicate_operation(
|
||||
operation: Operation, filename: str, checksum: str | None = None
|
||||
) -> bool:
|
||||
"""Check if the operation has already been performed
|
||||
|
||||
Args:
|
||||
operation: The operation to check for
|
||||
filename: The name of the file to check for
|
||||
checksum: The checksum of the contents to be written
|
||||
|
||||
Returns:
|
||||
True if the operation has already been performed on the file
|
||||
"""
|
||||
state = file_operations_state(CFG.file_logger_path)
|
||||
if operation == "delete" and filename not in state:
|
||||
return True
|
||||
if operation == "write" and state.get(filename) == checksum:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def log_operation(operation: str, filename: str, checksum: str | None = None) -> None:
|
||||
"""Log the file operation to the file_logger.txt
|
||||
|
||||
Args:
|
||||
operation: The operation to log
|
||||
filename: The name of the file the operation was performed on
|
||||
checksum: The checksum of the contents to be written
|
||||
"""
|
||||
log_entry = f"{operation}: {filename}"
|
||||
if checksum is not None:
|
||||
log_entry += f" #{checksum}"
|
||||
logger.debug(f"Logging file operation: {log_entry}")
|
||||
append_to_file(CFG.file_logger_path, f"{log_entry}\n", should_log=False)
|
||||
|
||||
|
||||
def split_file(
|
||||
content: str, max_length: int = 4000, overlap: int = 0
|
||||
) -> Generator[str, None, None]:
|
||||
"""
|
||||
Split text into chunks of a specified maximum length with a specified overlap
|
||||
between chunks.
|
||||
|
||||
:param content: The input text to be split into chunks
|
||||
:param max_length: The maximum length of each chunk,
|
||||
default is 4000 (about 1k token)
|
||||
:param overlap: The number of overlapping characters between chunks,
|
||||
default is no overlap
|
||||
:return: A generator yielding chunks of text
|
||||
"""
|
||||
start = 0
|
||||
content_length = len(content)
|
||||
|
||||
while start < content_length:
|
||||
end = start + max_length
|
||||
if end + overlap < content_length:
|
||||
chunk = content[start : end + overlap - 1]
|
||||
else:
|
||||
chunk = content[start:content_length]
|
||||
|
||||
# Account for the case where the last chunk is shorter than the overlap, so it has already been consumed
|
||||
if len(chunk) <= overlap:
|
||||
break
|
||||
|
||||
yield chunk
|
||||
start += max_length - overlap
|
||||
|
||||
|
||||
@command("read_file", "Read file", '"filename": "<filename>"')
|
||||
def read_file(filename: str) -> str:
|
||||
"""Read a file and return the contents
|
||||
|
||||
Args:
|
||||
filename (str): The name of the file to read
|
||||
|
||||
Returns:
|
||||
str: The contents of the file
|
||||
"""
|
||||
try:
|
||||
charset_match = charset_normalizer.from_path(filename).best()
|
||||
encoding = charset_match.encoding
|
||||
logger.debug(f"Read file '{filename}' with encoding '{encoding}'")
|
||||
return str(charset_match)
|
||||
except Exception as err:
|
||||
return f"Error: {err}"
|
||||
|
||||
|
||||
def ingest_file(
|
||||
filename: str, memory, max_length: int = 4000, overlap: int = 200
|
||||
) -> None:
|
||||
"""
|
||||
Ingest a file by reading its content, splitting it into chunks with a specified
|
||||
maximum length and overlap, and adding the chunks to the memory storage.
|
||||
|
||||
:param filename: The name of the file to ingest
|
||||
:param memory: An object with an add() method to store the chunks in memory
|
||||
:param max_length: The maximum length of each chunk, default is 4000
|
||||
:param overlap: The number of overlapping characters between chunks, default is 200
|
||||
"""
|
||||
try:
|
||||
logger.info(f"Working with file {filename}")
|
||||
content = read_file(filename)
|
||||
content_length = len(content)
|
||||
logger.info(f"File length: {content_length} characters")
|
||||
|
||||
chunks = list(split_file(content, max_length=max_length, overlap=overlap))
|
||||
|
||||
num_chunks = len(chunks)
|
||||
for i, chunk in enumerate(chunks):
|
||||
logger.info(f"Ingesting chunk {i + 1} / {num_chunks} into memory")
|
||||
memory_to_add = (
|
||||
f"Filename: {filename}\n" f"Content part#{i + 1}/{num_chunks}: {chunk}"
|
||||
)
|
||||
|
||||
memory.add(memory_to_add)
|
||||
|
||||
logger.info(f"Done ingesting {num_chunks} chunks from {filename}.")
|
||||
except Exception as err:
|
||||
logger.info(f"Error while ingesting file '{filename}': {err}")
|
||||
|
||||
|
||||
@command("write_to_file", "Write to file", '"filename": "<filename>", "text": "<text>"')
|
||||
def write_to_file(filename: str, text: str) -> str:
|
||||
"""Write text to a file
|
||||
|
||||
Args:
|
||||
filename (str): The name of the file to write to
|
||||
text (str): The text to write to the file
|
||||
|
||||
Returns:
|
||||
str: A message indicating success or failure
|
||||
"""
|
||||
checksum = text_checksum(text)
|
||||
if is_duplicate_operation("write", filename, checksum):
|
||||
return "Error: File has already been updated."
|
||||
try:
|
||||
directory = os.path.dirname(filename)
|
||||
os.makedirs(directory, exist_ok=True)
|
||||
with open(filename, "w", encoding="utf-8") as f:
|
||||
f.write(text)
|
||||
log_operation("write", filename, checksum)
|
||||
return "File written to successfully."
|
||||
except Exception as err:
|
||||
return f"Error: {err}"
|
||||
|
||||
|
||||
@command(
|
||||
"append_to_file", "Append to file", '"filename": "<filename>", "text": "<text>"'
|
||||
)
|
||||
def append_to_file(filename: str, text: str, should_log: bool = True) -> str:
|
||||
"""Append text to a file
|
||||
|
||||
Args:
|
||||
filename (str): The name of the file to append to
|
||||
text (str): The text to append to the file
|
||||
should_log (bool): Should log output
|
||||
|
||||
Returns:
|
||||
str: A message indicating success or failure
|
||||
"""
|
||||
try:
|
||||
directory = os.path.dirname(filename)
|
||||
os.makedirs(directory, exist_ok=True)
|
||||
with open(filename, "a", encoding="utf-8") as f:
|
||||
f.write(text)
|
||||
|
||||
if should_log:
|
||||
with open(filename, "r", encoding="utf-8") as f:
|
||||
checksum = text_checksum(f.read())
|
||||
log_operation("append", filename, checksum=checksum)
|
||||
|
||||
return "Text appended successfully."
|
||||
except Exception as err:
|
||||
return f"Error: {err}"
|
||||
|
||||
|
||||
@command("delete_file", "Delete file", '"filename": "<filename>"')
|
||||
def delete_file(filename: str) -> str:
|
||||
"""Delete a file
|
||||
|
||||
Args:
|
||||
filename (str): The name of the file to delete
|
||||
|
||||
Returns:
|
||||
str: A message indicating success or failure
|
||||
"""
|
||||
if is_duplicate_operation("delete", filename):
|
||||
return "Error: File has already been deleted."
|
||||
try:
|
||||
os.remove(filename)
|
||||
log_operation("delete", filename)
|
||||
return "File deleted successfully."
|
||||
except Exception as err:
|
||||
return f"Error: {err}"
|
||||
|
||||
|
||||
@command("list_files", "List Files in Directory", '"directory": "<directory>"')
|
||||
def list_files(directory: str) -> list[str]:
|
||||
"""lists files in a directory recursively
|
||||
|
||||
Args:
|
||||
directory (str): The directory to search in
|
||||
|
||||
Returns:
|
||||
list[str]: A list of files found in the directory
|
||||
"""
|
||||
found_files = []
|
||||
|
||||
for root, _, files in os.walk(directory):
|
||||
for file in files:
|
||||
if file.startswith("."):
|
||||
continue
|
||||
relative_path = os.path.relpath(
|
||||
os.path.join(root, file), CFG.workspace_path
|
||||
)
|
||||
found_files.append(relative_path)
|
||||
|
||||
return found_files
|
||||
|
||||
|
||||
@command(
|
||||
"download_file",
|
||||
"Download File",
|
||||
'"url": "<url>", "filename": "<filename>"',
|
||||
CFG.allow_downloads,
|
||||
"Error: You do not have user authorization to download files locally.",
|
||||
)
|
||||
def download_file(url, filename):
|
||||
"""Downloads a file
|
||||
Args:
|
||||
url (str): URL of the file to download
|
||||
filename (str): Filename to save the file as
|
||||
"""
|
||||
try:
|
||||
directory = os.path.dirname(filename)
|
||||
os.makedirs(directory, exist_ok=True)
|
||||
message = f"{Fore.YELLOW}Downloading file from {Back.LIGHTBLUE_EX}{url}{Back.RESET}{Fore.RESET}"
|
||||
with Spinner(message) as spinner:
|
||||
session = requests.Session()
|
||||
retry = Retry(total=3, backoff_factor=1, status_forcelist=[502, 503, 504])
|
||||
adapter = HTTPAdapter(max_retries=retry)
|
||||
session.mount("http://", adapter)
|
||||
session.mount("https://", adapter)
|
||||
|
||||
total_size = 0
|
||||
downloaded_size = 0
|
||||
|
||||
with session.get(url, allow_redirects=True, stream=True) as r:
|
||||
r.raise_for_status()
|
||||
total_size = int(r.headers.get("Content-Length", 0))
|
||||
downloaded_size = 0
|
||||
|
||||
with open(filename, "wb") as f:
|
||||
for chunk in r.iter_content(chunk_size=8192):
|
||||
f.write(chunk)
|
||||
downloaded_size += len(chunk)
|
||||
|
||||
# Update the progress message
|
||||
progress = f"{readable_file_size(downloaded_size)} / {readable_file_size(total_size)}"
|
||||
spinner.update_message(f"{message} {progress}")
|
||||
|
||||
return f'Successfully downloaded and locally stored file: "{filename}"! (Size: {readable_file_size(downloaded_size)})'
|
||||
except requests.HTTPError as err:
|
||||
return f"Got an HTTP Error whilst trying to download file: {err}"
|
||||
except Exception as err:
|
||||
return f"Error: {err}"
|
165
pilot/commands/image_gen.py
Normal file
165
pilot/commands/image_gen.py
Normal file
@@ -0,0 +1,165 @@
|
||||
""" Image Generation Module for AutoGPT."""
|
||||
import io
|
||||
import uuid
|
||||
from base64 import b64decode
|
||||
|
||||
import openai
|
||||
import requests
|
||||
from PIL import Image
|
||||
|
||||
from autogpt.commands.command import command
|
||||
from autogpt.config import Config
|
||||
from autogpt.logs import logger
|
||||
|
||||
CFG = Config()
|
||||
|
||||
|
||||
@command("generate_image", "Generate Image", '"prompt": "<prompt>"', CFG.image_provider)
|
||||
def generate_image(prompt: str, size: int = 256) -> str:
|
||||
"""Generate an image from a prompt.
|
||||
|
||||
Args:
|
||||
prompt (str): The prompt to use
|
||||
size (int, optional): The size of the image. Defaults to 256. (Not supported by HuggingFace)
|
||||
|
||||
Returns:
|
||||
str: The filename of the image
|
||||
"""
|
||||
filename = f"{CFG.workspace_path}/{str(uuid.uuid4())}.jpg"
|
||||
|
||||
# DALL-E
|
||||
if CFG.image_provider == "dalle":
|
||||
return generate_image_with_dalle(prompt, filename, size)
|
||||
# HuggingFace
|
||||
elif CFG.image_provider == "huggingface":
|
||||
return generate_image_with_hf(prompt, filename)
|
||||
# SD WebUI
|
||||
elif CFG.image_provider == "sdwebui":
|
||||
return generate_image_with_sd_webui(prompt, filename, size)
|
||||
return "No Image Provider Set"
|
||||
|
||||
|
||||
def generate_image_with_hf(prompt: str, filename: str) -> str:
|
||||
"""Generate an image with HuggingFace's API.
|
||||
|
||||
Args:
|
||||
prompt (str): The prompt to use
|
||||
filename (str): The filename to save the image to
|
||||
|
||||
Returns:
|
||||
str: The filename of the image
|
||||
"""
|
||||
API_URL = (
|
||||
f"https://api-inference.huggingface.co/models/{CFG.huggingface_image_model}"
|
||||
)
|
||||
if CFG.huggingface_api_token is None:
|
||||
raise ValueError(
|
||||
"You need to set your Hugging Face API token in the config file."
|
||||
)
|
||||
headers = {
|
||||
"Authorization": f"Bearer {CFG.huggingface_api_token}",
|
||||
"X-Use-Cache": "false",
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
API_URL,
|
||||
headers=headers,
|
||||
json={
|
||||
"inputs": prompt,
|
||||
},
|
||||
)
|
||||
|
||||
image = Image.open(io.BytesIO(response.content))
|
||||
logger.info(f"Image Generated for prompt:{prompt}")
|
||||
|
||||
image.save(filename)
|
||||
|
||||
return f"Saved to disk:{filename}"
|
||||
|
||||
|
||||
def generate_image_with_dalle(prompt: str, filename: str, size: int) -> str:
|
||||
"""Generate an image with DALL-E.
|
||||
|
||||
Args:
|
||||
prompt (str): The prompt to use
|
||||
filename (str): The filename to save the image to
|
||||
size (int): The size of the image
|
||||
|
||||
Returns:
|
||||
str: The filename of the image
|
||||
"""
|
||||
|
||||
# Check for supported image sizes
|
||||
if size not in [256, 512, 1024]:
|
||||
closest = min([256, 512, 1024], key=lambda x: abs(x - size))
|
||||
logger.info(
|
||||
f"DALL-E only supports image sizes of 256x256, 512x512, or 1024x1024. Setting to {closest}, was {size}."
|
||||
)
|
||||
size = closest
|
||||
|
||||
response = openai.Image.create(
|
||||
prompt=prompt,
|
||||
n=1,
|
||||
size=f"{size}x{size}",
|
||||
response_format="b64_json",
|
||||
api_key=CFG.openai_api_key,
|
||||
)
|
||||
|
||||
logger.info(f"Image Generated for prompt:{prompt}")
|
||||
|
||||
image_data = b64decode(response["data"][0]["b64_json"])
|
||||
|
||||
with open(filename, mode="wb") as png:
|
||||
png.write(image_data)
|
||||
|
||||
return f"Saved to disk:{filename}"
|
||||
|
||||
|
||||
def generate_image_with_sd_webui(
|
||||
prompt: str,
|
||||
filename: str,
|
||||
size: int = 512,
|
||||
negative_prompt: str = "",
|
||||
extra: dict = {},
|
||||
) -> str:
|
||||
"""Generate an image with Stable Diffusion webui.
|
||||
Args:
|
||||
prompt (str): The prompt to use
|
||||
filename (str): The filename to save the image to
|
||||
size (int, optional): The size of the image. Defaults to 256.
|
||||
negative_prompt (str, optional): The negative prompt to use. Defaults to "".
|
||||
extra (dict, optional): Extra parameters to pass to the API. Defaults to {}.
|
||||
Returns:
|
||||
str: The filename of the image
|
||||
"""
|
||||
# Create a session and set the basic auth if needed
|
||||
s = requests.Session()
|
||||
if CFG.sd_webui_auth:
|
||||
username, password = CFG.sd_webui_auth.split(":")
|
||||
s.auth = (username, password or "")
|
||||
|
||||
# Generate the images
|
||||
response = requests.post(
|
||||
f"{CFG.sd_webui_url}/sdapi/v1/txt2img",
|
||||
json={
|
||||
"prompt": prompt,
|
||||
"negative_prompt": negative_prompt,
|
||||
"sampler_index": "DDIM",
|
||||
"steps": 20,
|
||||
"cfg_scale": 7.0,
|
||||
"width": size,
|
||||
"height": size,
|
||||
"n_iter": 1,
|
||||
**extra,
|
||||
},
|
||||
)
|
||||
|
||||
logger.info(f"Image Generated for prompt:{prompt}")
|
||||
|
||||
# Save the image to disk
|
||||
response = response.json()
|
||||
b64 = b64decode(response["images"][0].split(",", 1)[0])
|
||||
image = Image.open(io.BytesIO(b64))
|
||||
image.save(filename)
|
||||
|
||||
return f"Saved to disk:{filename}"
|
10
pilot/commands/times.py
Normal file
10
pilot/commands/times.py
Normal file
@@ -0,0 +1,10 @@
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
def get_datetime() -> str:
|
||||
"""Return the current date and time
|
||||
|
||||
Returns:
|
||||
str: The current date and time
|
||||
"""
|
||||
return "Current date and time: " + datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
82
pilot/commands/web_playwright.py
Normal file
82
pilot/commands/web_playwright.py
Normal file
@@ -0,0 +1,82 @@
|
||||
"""Web scraping commands using Playwright"""
|
||||
from __future__ import annotations
|
||||
|
||||
from autogpt.logs import logger
|
||||
|
||||
try:
|
||||
from playwright.sync_api import sync_playwright
|
||||
except ImportError:
|
||||
logger.info(
|
||||
"Playwright not installed. Please install it with 'pip install playwright' to use."
|
||||
)
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from autogpt.processing.html import extract_hyperlinks, format_hyperlinks
|
||||
|
||||
|
||||
def scrape_text(url: str) -> str:
|
||||
"""Scrape text from a webpage
|
||||
|
||||
Args:
|
||||
url (str): The URL to scrape text from
|
||||
|
||||
Returns:
|
||||
str: The scraped text
|
||||
"""
|
||||
with sync_playwright() as p:
|
||||
browser = p.chromium.launch()
|
||||
page = browser.new_page()
|
||||
|
||||
try:
|
||||
page.goto(url)
|
||||
html_content = page.content()
|
||||
soup = BeautifulSoup(html_content, "html.parser")
|
||||
|
||||
for script in soup(["script", "style"]):
|
||||
script.extract()
|
||||
|
||||
text = soup.get_text()
|
||||
lines = (line.strip() for line in text.splitlines())
|
||||
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
|
||||
text = "\n".join(chunk for chunk in chunks if chunk)
|
||||
|
||||
except Exception as e:
|
||||
text = f"Error: {str(e)}"
|
||||
|
||||
finally:
|
||||
browser.close()
|
||||
|
||||
return text
|
||||
|
||||
|
||||
def scrape_links(url: str) -> str | list[str]:
|
||||
"""Scrape links from a webpage
|
||||
|
||||
Args:
|
||||
url (str): The URL to scrape links from
|
||||
|
||||
Returns:
|
||||
Union[str, List[str]]: The scraped links
|
||||
"""
|
||||
with sync_playwright() as p:
|
||||
browser = p.chromium.launch()
|
||||
page = browser.new_page()
|
||||
|
||||
try:
|
||||
page.goto(url)
|
||||
html_content = page.content()
|
||||
soup = BeautifulSoup(html_content, "html.parser")
|
||||
|
||||
for script in soup(["script", "style"]):
|
||||
script.extract()
|
||||
|
||||
hyperlinks = extract_hyperlinks(soup, url)
|
||||
formatted_links = format_hyperlinks(hyperlinks)
|
||||
|
||||
except Exception as e:
|
||||
formatted_links = f"Error: {str(e)}"
|
||||
|
||||
finally:
|
||||
browser.close()
|
||||
|
||||
return formatted_links
|
112
pilot/commands/web_requests.py
Normal file
112
pilot/commands/web_requests.py
Normal file
@@ -0,0 +1,112 @@
|
||||
"""Browse a webpage and summarize it using the LLM model"""
|
||||
from __future__ import annotations
|
||||
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
from requests import Response
|
||||
|
||||
from autogpt.config import Config
|
||||
from autogpt.processing.html import extract_hyperlinks, format_hyperlinks
|
||||
from autogpt.url_utils.validators import validate_url
|
||||
|
||||
CFG = Config()
|
||||
|
||||
session = requests.Session()
|
||||
session.headers.update({"User-Agent": CFG.user_agent})
|
||||
|
||||
|
||||
@validate_url
|
||||
def get_response(
|
||||
url: str, timeout: int = 10
|
||||
) -> tuple[None, str] | tuple[Response, None]:
|
||||
"""Get the response from a URL
|
||||
|
||||
Args:
|
||||
url (str): The URL to get the response from
|
||||
timeout (int): The timeout for the HTTP request
|
||||
|
||||
Returns:
|
||||
tuple[None, str] | tuple[Response, None]: The response and error message
|
||||
|
||||
Raises:
|
||||
ValueError: If the URL is invalid
|
||||
requests.exceptions.RequestException: If the HTTP request fails
|
||||
"""
|
||||
try:
|
||||
response = session.get(url, timeout=timeout)
|
||||
|
||||
# Check if the response contains an HTTP error
|
||||
if response.status_code >= 400:
|
||||
return None, f"Error: HTTP {str(response.status_code)} error"
|
||||
|
||||
return response, None
|
||||
except ValueError as ve:
|
||||
# Handle invalid URL format
|
||||
return None, f"Error: {str(ve)}"
|
||||
|
||||
except requests.exceptions.RequestException as re:
|
||||
# Handle exceptions related to the HTTP request
|
||||
# (e.g., connection errors, timeouts, etc.)
|
||||
return None, f"Error: {str(re)}"
|
||||
|
||||
|
||||
def scrape_text(url: str) -> str:
|
||||
"""Scrape text from a webpage
|
||||
|
||||
Args:
|
||||
url (str): The URL to scrape text from
|
||||
|
||||
Returns:
|
||||
str: The scraped text
|
||||
"""
|
||||
response, error_message = get_response(url)
|
||||
if error_message:
|
||||
return error_message
|
||||
if not response:
|
||||
return "Error: Could not get response"
|
||||
|
||||
soup = BeautifulSoup(response.text, "html.parser")
|
||||
|
||||
for script in soup(["script", "style"]):
|
||||
script.extract()
|
||||
|
||||
text = soup.get_text()
|
||||
lines = (line.strip() for line in text.splitlines())
|
||||
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
|
||||
text = "\n".join(chunk for chunk in chunks if chunk)
|
||||
|
||||
return text
|
||||
|
||||
|
||||
def scrape_links(url: str) -> str | list[str]:
|
||||
"""Scrape links from a webpage
|
||||
|
||||
Args:
|
||||
url (str): The URL to scrape links from
|
||||
|
||||
Returns:
|
||||
str | list[str]: The scraped links
|
||||
"""
|
||||
response, error_message = get_response(url)
|
||||
if error_message:
|
||||
return error_message
|
||||
if not response:
|
||||
return "Error: Could not get response"
|
||||
soup = BeautifulSoup(response.text, "html.parser")
|
||||
|
||||
for script in soup(["script", "style"]):
|
||||
script.extract()
|
||||
|
||||
hyperlinks = extract_hyperlinks(soup, url)
|
||||
|
||||
return format_hyperlinks(hyperlinks)
|
||||
|
||||
|
||||
def create_message(chunk, question):
|
||||
"""Create a message for the user to summarize a chunk of text"""
|
||||
return {
|
||||
"role": "user",
|
||||
"content": f'"""{chunk}""" Using the above text, answer the following'
|
||||
f' question: "{question}" -- if the question cannot be answered using the'
|
||||
" text, summarize the text.",
|
||||
}
|
183
pilot/commands/web_selenium.py
Normal file
183
pilot/commands/web_selenium.py
Normal file
@@ -0,0 +1,183 @@
|
||||
"""Selenium web scraping module."""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from sys import platform
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
from selenium import webdriver
|
||||
from selenium.common.exceptions import WebDriverException
|
||||
from selenium.webdriver.chrome.options import Options as ChromeOptions
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.firefox.options import Options as FirefoxOptions
|
||||
from selenium.webdriver.remote.webdriver import WebDriver
|
||||
from selenium.webdriver.safari.options import Options as SafariOptions
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
from selenium.webdriver.support.wait import WebDriverWait
|
||||
from webdriver_manager.chrome import ChromeDriverManager
|
||||
from webdriver_manager.firefox import GeckoDriverManager
|
||||
|
||||
import autogpt.processing.text as summary
|
||||
from autogpt.commands.command import command
|
||||
from autogpt.config import Config
|
||||
from autogpt.processing.html import extract_hyperlinks, format_hyperlinks
|
||||
from autogpt.url_utils.validators import validate_url
|
||||
|
||||
FILE_DIR = Path(__file__).parent.parent
|
||||
CFG = Config()
|
||||
|
||||
|
||||
@command(
|
||||
"browse_website",
|
||||
"Browse Website",
|
||||
'"url": "<url>", "question": "<what_you_want_to_find_on_website>"',
|
||||
)
|
||||
@validate_url
|
||||
def browse_website(url: str, question: str) -> str:
|
||||
"""Browse a website and return the answer and links to the user
|
||||
|
||||
Args:
|
||||
url (str): The url of the website to browse
|
||||
question (str): The question asked by the user
|
||||
|
||||
Returns:
|
||||
Tuple[str, WebDriver]: The answer and links to the user and the webdriver
|
||||
"""
|
||||
try:
|
||||
driver, text = scrape_text_with_selenium(url)
|
||||
except WebDriverException as e:
|
||||
# These errors are often quite long and include lots of context.
|
||||
# Just grab the first line.
|
||||
msg = e.msg.split("\n")[0]
|
||||
return f"Error: {msg}"
|
||||
|
||||
add_header(driver)
|
||||
summary_text = summary.summarize_text(url, text, question, driver)
|
||||
links = scrape_links_with_selenium(driver, url)
|
||||
|
||||
# Limit links to 5
|
||||
if len(links) > 5:
|
||||
links = links[:5]
|
||||
close_browser(driver)
|
||||
return f"Answer gathered from website: {summary_text} \n \n Links: {links}"
|
||||
|
||||
|
||||
def scrape_text_with_selenium(url: str) -> tuple[WebDriver, str]:
|
||||
"""Scrape text from a website using selenium
|
||||
|
||||
Args:
|
||||
url (str): The url of the website to scrape
|
||||
|
||||
Returns:
|
||||
Tuple[WebDriver, str]: The webdriver and the text scraped from the website
|
||||
"""
|
||||
logging.getLogger("selenium").setLevel(logging.CRITICAL)
|
||||
|
||||
options_available = {
|
||||
"chrome": ChromeOptions,
|
||||
"safari": SafariOptions,
|
||||
"firefox": FirefoxOptions,
|
||||
}
|
||||
|
||||
options = options_available[CFG.selenium_web_browser]()
|
||||
options.add_argument(
|
||||
"user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.5615.49 Safari/537.36"
|
||||
)
|
||||
|
||||
if CFG.selenium_web_browser == "firefox":
|
||||
if CFG.selenium_headless:
|
||||
options.headless = True
|
||||
options.add_argument("--disable-gpu")
|
||||
driver = webdriver.Firefox(
|
||||
executable_path=GeckoDriverManager().install(), options=options
|
||||
)
|
||||
elif CFG.selenium_web_browser == "safari":
|
||||
# Requires a bit more setup on the users end
|
||||
# See https://developer.apple.com/documentation/webkit/testing_with_webdriver_in_safari
|
||||
driver = webdriver.Safari(options=options)
|
||||
else:
|
||||
if platform == "linux" or platform == "linux2":
|
||||
options.add_argument("--disable-dev-shm-usage")
|
||||
options.add_argument("--remote-debugging-port=9222")
|
||||
|
||||
options.add_argument("--no-sandbox")
|
||||
if CFG.selenium_headless:
|
||||
options.add_argument("--headless=new")
|
||||
options.add_argument("--disable-gpu")
|
||||
|
||||
chromium_driver_path = Path("/usr/bin/chromedriver")
|
||||
|
||||
driver = webdriver.Chrome(
|
||||
executable_path=chromium_driver_path
|
||||
if chromium_driver_path.exists()
|
||||
else ChromeDriverManager().install(),
|
||||
options=options,
|
||||
)
|
||||
driver.get(url)
|
||||
|
||||
WebDriverWait(driver, 10).until(
|
||||
EC.presence_of_element_located((By.TAG_NAME, "body"))
|
||||
)
|
||||
|
||||
# Get the HTML content directly from the browser's DOM
|
||||
page_source = driver.execute_script("return document.body.outerHTML;")
|
||||
soup = BeautifulSoup(page_source, "html.parser")
|
||||
|
||||
for script in soup(["script", "style"]):
|
||||
script.extract()
|
||||
|
||||
text = soup.get_text()
|
||||
lines = (line.strip() for line in text.splitlines())
|
||||
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
|
||||
text = "\n".join(chunk for chunk in chunks if chunk)
|
||||
return driver, text
|
||||
|
||||
|
||||
def scrape_links_with_selenium(driver: WebDriver, url: str) -> list[str]:
|
||||
"""Scrape links from a website using selenium
|
||||
|
||||
Args:
|
||||
driver (WebDriver): The webdriver to use to scrape the links
|
||||
|
||||
Returns:
|
||||
List[str]: The links scraped from the website
|
||||
"""
|
||||
page_source = driver.page_source
|
||||
soup = BeautifulSoup(page_source, "html.parser")
|
||||
|
||||
for script in soup(["script", "style"]):
|
||||
script.extract()
|
||||
|
||||
hyperlinks = extract_hyperlinks(soup, url)
|
||||
|
||||
return format_hyperlinks(hyperlinks)
|
||||
|
||||
|
||||
def close_browser(driver: WebDriver) -> None:
|
||||
"""Close the browser
|
||||
|
||||
Args:
|
||||
driver (WebDriver): The webdriver to close
|
||||
|
||||
Returns:
|
||||
None
|
||||
"""
|
||||
driver.quit()
|
||||
|
||||
|
||||
def add_header(driver: WebDriver) -> None:
|
||||
"""Add a header to the website
|
||||
|
||||
Args:
|
||||
driver (WebDriver): The webdriver to use to add the header
|
||||
|
||||
Returns:
|
||||
None
|
||||
"""
|
||||
try:
|
||||
with open(f"{FILE_DIR}/js/overlay.js", "r") as overlay_file:
|
||||
overlay_script = overlay_file.read()
|
||||
driver.execute_script(overlay_script)
|
||||
except Exception as e:
|
||||
print(f"Error executing overlay.js: {e}")
|
@@ -6,8 +6,8 @@ from enum import auto, Enum
|
||||
from typing import List, Any
|
||||
from pilot.configs.model_config import DB_SETTINGS
|
||||
|
||||
class SeparatorStyle(Enum):
|
||||
|
||||
class SeparatorStyle(Enum):
|
||||
SINGLE = auto()
|
||||
TWO = auto()
|
||||
THREE = auto()
|
||||
@@ -52,7 +52,6 @@ class Conversation:
|
||||
else:
|
||||
raise ValueError(f"Invalid style: {self.sep_style}")
|
||||
|
||||
|
||||
def append_message(self, role, message):
|
||||
self.messages.append([role, message])
|
||||
|
||||
@@ -103,6 +102,7 @@ def gen_sqlgen_conversation(dbname):
|
||||
message += s["schema_info"] + ";"
|
||||
return f"数据库{dbname}的Schema信息如下: {message}\n"
|
||||
|
||||
|
||||
conv_one_shot = Conversation(
|
||||
system="A chat between a curious human and an artificial intelligence assistant, who very familiar with database related knowledge. "
|
||||
"The assistant gives helpful, detailed, professional and polite answers to the human's questions. ",
|
||||
@@ -251,8 +251,6 @@ conv_qa_prompt_template = """ 基于以下已知的信息, 专业、详细的回
|
||||
{question}
|
||||
"""
|
||||
|
||||
|
||||
|
||||
default_conversation = conv_one_shot
|
||||
|
||||
conversation_types = {
|
||||
@@ -266,7 +264,6 @@ conv_templates = {
|
||||
"vicuna_v1": conv_vicuna_v1,
|
||||
}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
message = gen_sqlgen_conversation("dbgpt")
|
||||
print(message)
|
0
pilot/prompts/__init__.py
Normal file
0
pilot/prompts/__init__.py
Normal file
155
pilot/prompts/generator.py
Normal file
155
pilot/prompts/generator.py
Normal file
@@ -0,0 +1,155 @@
|
||||
""" A module for generating custom prompt strings."""
|
||||
import json
|
||||
from typing import Any, Callable, Dict, List, Optional
|
||||
|
||||
|
||||
class PromptGenerator:
|
||||
"""
|
||||
A class for generating custom prompt strings based on constraints, commands,
|
||||
resources, and performance evaluations.
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""
|
||||
Initialize the PromptGenerator object with empty lists of constraints,
|
||||
commands, resources, and performance evaluations.
|
||||
"""
|
||||
self.constraints = []
|
||||
self.commands = []
|
||||
self.resources = []
|
||||
self.performance_evaluation = []
|
||||
self.goals = []
|
||||
self.command_registry = None
|
||||
self.name = "Bob"
|
||||
self.role = "AI"
|
||||
self.response_format = {
|
||||
"thoughts": {
|
||||
"text": "thought",
|
||||
"reasoning": "reasoning",
|
||||
"plan": "- short bulleted\n- list that conveys\n- long-term plan",
|
||||
"criticism": "constructive self-criticism",
|
||||
"speak": "thoughts summary to say to user",
|
||||
},
|
||||
"command": {"name": "command name", "args": {"arg name": "value"}},
|
||||
}
|
||||
|
||||
def add_constraint(self, constraint: str) -> None:
|
||||
"""
|
||||
Add a constraint to the constraints list.
|
||||
|
||||
Args:
|
||||
constraint (str): The constraint to be added.
|
||||
"""
|
||||
self.constraints.append(constraint)
|
||||
|
||||
def add_command(
|
||||
self,
|
||||
command_label: str,
|
||||
command_name: str,
|
||||
args=None,
|
||||
function: Optional[Callable] = None,
|
||||
) -> None:
|
||||
"""
|
||||
Add a command to the commands list with a label, name, and optional arguments.
|
||||
|
||||
Args:
|
||||
command_label (str): The label of the command.
|
||||
command_name (str): The name of the command.
|
||||
args (dict, optional): A dictionary containing argument names and their
|
||||
values. Defaults to None.
|
||||
function (callable, optional): A callable function to be called when
|
||||
the command is executed. Defaults to None.
|
||||
"""
|
||||
if args is None:
|
||||
args = {}
|
||||
|
||||
command_args = {arg_key: arg_value for arg_key, arg_value in args.items()}
|
||||
|
||||
command = {
|
||||
"label": command_label,
|
||||
"name": command_name,
|
||||
"args": command_args,
|
||||
"function": function,
|
||||
}
|
||||
|
||||
self.commands.append(command)
|
||||
|
||||
def _generate_command_string(self, command: Dict[str, Any]) -> str:
|
||||
"""
|
||||
Generate a formatted string representation of a command.
|
||||
|
||||
Args:
|
||||
command (dict): A dictionary containing command information.
|
||||
|
||||
Returns:
|
||||
str: The formatted command string.
|
||||
"""
|
||||
args_string = ", ".join(
|
||||
f'"{key}": "{value}"' for key, value in command["args"].items()
|
||||
)
|
||||
return f'{command["label"]}: "{command["name"]}", args: {args_string}'
|
||||
|
||||
def add_resource(self, resource: str) -> None:
|
||||
"""
|
||||
Add a resource to the resources list.
|
||||
|
||||
Args:
|
||||
resource (str): The resource to be added.
|
||||
"""
|
||||
self.resources.append(resource)
|
||||
|
||||
def add_performance_evaluation(self, evaluation: str) -> None:
|
||||
"""
|
||||
Add a performance evaluation item to the performance_evaluation list.
|
||||
|
||||
Args:
|
||||
evaluation (str): The evaluation item to be added.
|
||||
"""
|
||||
self.performance_evaluation.append(evaluation)
|
||||
|
||||
def _generate_numbered_list(self, items: List[Any], item_type="list") -> str:
|
||||
"""
|
||||
Generate a numbered list from given items based on the item_type.
|
||||
|
||||
Args:
|
||||
items (list): A list of items to be numbered.
|
||||
item_type (str, optional): The type of items in the list.
|
||||
Defaults to 'list'.
|
||||
|
||||
Returns:
|
||||
str: The formatted numbered list.
|
||||
"""
|
||||
if item_type == "command":
|
||||
command_strings = []
|
||||
if self.command_registry:
|
||||
command_strings += [
|
||||
str(item)
|
||||
for item in self.command_registry.commands.values()
|
||||
if item.enabled
|
||||
]
|
||||
# terminate command is added manually
|
||||
command_strings += [self._generate_command_string(item) for item in items]
|
||||
return "\n".join(f"{i+1}. {item}" for i, item in enumerate(command_strings))
|
||||
else:
|
||||
return "\n".join(f"{i+1}. {item}" for i, item in enumerate(items))
|
||||
|
||||
def generate_prompt_string(self) -> str:
|
||||
"""
|
||||
Generate a prompt string based on the constraints, commands, resources,
|
||||
and performance evaluations.
|
||||
|
||||
Returns:
|
||||
str: The generated prompt string.
|
||||
"""
|
||||
formatted_response_format = json.dumps(self.response_format, indent=4)
|
||||
return (
|
||||
f"Constraints:\n{self._generate_numbered_list(self.constraints)}\n\n"
|
||||
"Commands:\n"
|
||||
f"{self._generate_numbered_list(self.commands, item_type='command')}\n\n"
|
||||
f"Resources:\n{self._generate_numbered_list(self.resources)}\n\n"
|
||||
"Performance Evaluation:\n"
|
||||
f"{self._generate_numbered_list(self.performance_evaluation)}\n\n"
|
||||
"You should only respond in JSON format as described below \nResponse"
|
||||
f" Format: \n{formatted_response_format} \nEnsure the response can be"
|
||||
" parsed by Python json.loads"
|
||||
)
|
65
pilot/prompts/prompt.py
Normal file
65
pilot/prompts/prompt.py
Normal file
@@ -0,0 +1,65 @@
|
||||
|
||||
from pilot.configs.config import Config
|
||||
from pilot.prompts.generator import PromptGenerator
|
||||
|
||||
|
||||
CFG = Config()
|
||||
|
||||
DEFAULT_TRIGGERING_PROMPT = (
|
||||
"Determine which next command to use, and respond using the format specified above:"
|
||||
)
|
||||
|
||||
|
||||
def build_default_prompt_generator() -> PromptGenerator:
|
||||
"""
|
||||
This function generates a prompt string that includes various constraints,
|
||||
commands, resources, and performance evaluations.
|
||||
|
||||
Returns:
|
||||
str: The generated prompt string.
|
||||
"""
|
||||
|
||||
# Initialize the PromptGenerator object
|
||||
prompt_generator = PromptGenerator()
|
||||
|
||||
# Add constraints to the PromptGenerator object
|
||||
prompt_generator.add_constraint(
|
||||
"~4000 word limit for short term memory. Your short term memory is short, so"
|
||||
" immediately save important information to files."
|
||||
)
|
||||
prompt_generator.add_constraint(
|
||||
"If you are unsure how you previously did something or want to recall past"
|
||||
" events, thinking about similar events will help you remember."
|
||||
)
|
||||
prompt_generator.add_constraint("No user assistance")
|
||||
prompt_generator.add_constraint(
|
||||
'Exclusively use the commands listed in double quotes e.g. "command name"'
|
||||
)
|
||||
|
||||
# Add resources to the PromptGenerator object
|
||||
prompt_generator.add_resource(
|
||||
"Internet access for searches and information gathering."
|
||||
)
|
||||
prompt_generator.add_resource("Long Term memory management.")
|
||||
prompt_generator.add_resource(
|
||||
"GPT-3.5 powered Agents for delegation of simple tasks."
|
||||
)
|
||||
prompt_generator.add_resource("File output.")
|
||||
|
||||
# Add performance evaluations to the PromptGenerator object
|
||||
prompt_generator.add_performance_evaluation(
|
||||
"Continuously review and analyze your actions to ensure you are performing to"
|
||||
" the best of your abilities."
|
||||
)
|
||||
prompt_generator.add_performance_evaluation(
|
||||
"Constructively self-criticize your big-picture behavior constantly."
|
||||
)
|
||||
prompt_generator.add_performance_evaluation(
|
||||
"Reflect on past decisions and strategies to refine your approach."
|
||||
)
|
||||
prompt_generator.add_performance_evaluation(
|
||||
"Every command has a cost, so be smart and efficient. Aim to complete tasks in"
|
||||
" the least number of steps."
|
||||
)
|
||||
prompt_generator.add_performance_evaluation("Write all code to a file.")
|
||||
return prompt_generator
|
@@ -17,6 +17,10 @@ from pilot.vector_store.extract_tovec import get_vector_storelist, load_knownled
|
||||
|
||||
from pilot.configs.model_config import LOGDIR, VICUNA_MODEL_SERVER, LLM_MODEL, DATASETS_DIR
|
||||
|
||||
from pilot.plugins import scan_plugins
|
||||
from pilot.configs.config import Config
|
||||
from pilot.commands.command_mange import CommandRegistry
|
||||
|
||||
from pilot.conversation import (
|
||||
default_conversation,
|
||||
conv_templates,
|
||||
@@ -441,6 +445,29 @@ if __name__ == "__main__":
|
||||
logger.info(f"args: {args}")
|
||||
|
||||
dbs = get_database_list()
|
||||
|
||||
# 加载插件
|
||||
cfg = Config()
|
||||
cfg.set_plugins(scan_plugins(cfg, cfg.debug_mode))
|
||||
|
||||
# 加载插件可执行命令
|
||||
command_registry = CommandRegistry()
|
||||
command_categories = [
|
||||
"autogpt.commands.audio_text",
|
||||
"autogpt.commands.file_operations",
|
||||
"autogpt.commands.image_gen",
|
||||
"autogpt.commands.web_selenium",
|
||||
"autogpt.commands.write_tests",
|
||||
]
|
||||
# 排除禁用命令
|
||||
command_categories = [
|
||||
x for x in command_categories if x not in cfg.disabled_command_categories
|
||||
]
|
||||
for command_category in command_categories:
|
||||
command_registry.import_commands(command_category)
|
||||
|
||||
|
||||
|
||||
logger.info(args)
|
||||
demo = build_webdemo()
|
||||
demo.queue(
|
||||
|
Reference in New Issue
Block a user