mirror of
				https://github.com/hwchase17/langchain.git
				synced 2025-11-04 02:03:32 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			905 lines
		
	
	
		
			33 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			905 lines
		
	
	
		
			33 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""Util that calls GitHub."""
 | 
						|
 | 
						|
from __future__ import annotations
 | 
						|
 | 
						|
import json
 | 
						|
from typing import TYPE_CHECKING, Any, Dict, List, Optional
 | 
						|
 | 
						|
import requests
 | 
						|
from langchain_core.utils import get_from_dict_or_env
 | 
						|
from pydantic import BaseModel, ConfigDict, model_validator
 | 
						|
 | 
						|
if TYPE_CHECKING:
 | 
						|
    from github.Issue import Issue
 | 
						|
    from github.PullRequest import PullRequest
 | 
						|
 | 
						|
 | 
						|
def _import_tiktoken() -> Any:
    """Import and return the ``tiktoken`` module.

    The import happens at call time, so ``tiktoken`` is only needed by the
    code paths that actually count tokens.

    Returns:
        The imported ``tiktoken`` module.

    Raises:
        ImportError: If ``tiktoken`` is not installed. The original import
            failure is attached as ``__cause__``.
    """
    try:
        import tiktoken
    except ImportError as e:
        # Chain the original error so the real failure reason stays visible.
        raise ImportError(
            "tiktoken is not installed. Please install it with `pip install tiktoken`"
        ) from e
    return tiktoken
 | 
						|
 | 
						|
 | 
						|
class GitHubAPIWrapper(BaseModel):
    """Wrapper for GitHub API.

    The fields below are filled in by ``validate_environment``, which reads
    each setting from the constructor arguments or from an environment
    variable and then builds the GitHub client objects.
    """

    # GitHub client returned by `installation.get_github_for_installation()`;
    # assigned in `validate_environment`.
    github: Any = None  #: :meta private:
    # Repository object returned by `github.get_repo(github_repository)`;
    # every method in this class talks to GitHub through it.
    github_repo_instance: Any = None  #: :meta private:
    # Repository identifier passed to `get_repo`; read from the
    # `github_repository` argument or the GITHUB_REPOSITORY env variable.
    github_repository: Optional[str] = None
    # GitHub App id; read from `github_app_id` or GITHUB_APP_ID.
    github_app_id: Optional[str] = None
    # GitHub App private key; read from `github_app_private_key` or
    # GITHUB_APP_PRIVATE_KEY. Tried first as a file path, then used verbatim.
    github_app_private_key: Optional[str] = None
    # Branch the wrapper reads from and commits to; read from `active_branch`
    # or ACTIVE_BRANCH, defaulting to the repository's default branch.
    active_branch: Optional[str] = None
    # Branch pull requests are opened against; read from `github_base_branch`
    # or GITHUB_BASE_BRANCH, defaulting to the repository's default branch.
    github_base_branch: Optional[str] = None

    # extra="forbid": pydantic rejects fields that are not declared above.
    model_config = ConfigDict(
        extra="forbid",
    )
 | 
						|
 | 
						|
    @model_validator(mode="before")
    @classmethod
    def validate_environment(cls, values: Dict) -> Any:
        """Validate that api key and python package exists in environment.

        Resolves the repository name, app id and private key from ``values``
        or the matching environment variables, authenticates as the GitHub
        App, and stores the resulting client and repository objects (plus the
        resolved branch names) back into ``values``.

        Parameters:
            values(Dict): Raw constructor arguments.
        Returns:
            Dict: ``values`` with ``github``, ``github_repo_instance``,
            ``github_repository``, ``github_app_id``,
            ``github_app_private_key``, ``active_branch`` and
            ``github_base_branch`` populated.
        Raises:
            ImportError: If PyGithub is not installed.
            ValueError: If the app has no installation or the GitHub
                parameters are rejected.
        """
        github_repository = get_from_dict_or_env(
            values, "github_repository", "GITHUB_REPOSITORY"
        )
        github_app_id = get_from_dict_or_env(values, "github_app_id", "GITHUB_APP_ID")
        github_app_private_key = get_from_dict_or_env(
            values, "github_app_private_key", "GITHUB_APP_PRIVATE_KEY"
        )

        try:
            from github import Auth, GithubIntegration
        except ImportError as e:
            raise ImportError(
                "PyGithub is not installed. "
                "Please install it with `pip install PyGithub`"
            ) from e

        try:
            # interpret the key as a file path
            with open(github_app_private_key, "r") as f:
                private_key = f.read()
        except Exception:
            # Deliberate best-effort: any failure to read it as a file
            # (missing file, over-long "path", ...) means the value is
            # treated as the key itself.
            private_key = github_app_private_key

        auth = Auth.AppAuth(
            github_app_id,
            private_key,
        )
        gi = GithubIntegration(auth=auth)
        installations = gi.get_installations()

        not_installed_message = (
            f"Please make sure to install the created github app with id "
            f"{github_app_id} on the repo: {github_repository}. "
            "More instructions can be found at "
            "https://docs.github.com/en/apps/using-"
            "github-apps/installing-your-own-github-app"
        )
        if not installations:
            raise ValueError(not_installed_message)
        try:
            installation = installations[0]
        except IndexError as e:
            # A lazily paginated result is truthy even when it holds no
            # items, so the empty case only shows up on indexing.
            raise ValueError(not_installed_message) from e
        except ValueError as e:
            raise ValueError(
                f"Please make sure to give correct github parameters Error message: {e}"
            ) from e

        # create a GitHub instance:
        g = installation.get_github_for_installation()
        repo = g.get_repo(github_repository)

        github_base_branch = get_from_dict_or_env(
            values,
            "github_base_branch",
            "GITHUB_BASE_BRANCH",
            default=repo.default_branch,
        )

        active_branch = get_from_dict_or_env(
            values,
            "active_branch",
            "ACTIVE_BRANCH",
            default=repo.default_branch,
        )

        values["github"] = g
        values["github_repo_instance"] = repo
        values["github_repository"] = github_repository
        values["github_app_id"] = github_app_id
        values["github_app_private_key"] = github_app_private_key
        values["active_branch"] = active_branch
        values["github_base_branch"] = github_base_branch

        return values
 | 
						|
 | 
						|
    def parse_issues(self, issues: List[Issue]) -> List[dict]:
        """
        Summarizes each Issue as a small dictionary
        Parameters:
            issues(List[Issue]): A list of Github Issue objects
        Returns:
            List[dict]: One dictionary per issue with its ``title`` and
            ``number``, plus ``opened_by`` when the author login is known
        """
        summaries: List[dict] = []
        for item in issues:
            entry = {"title": item.title, "number": item.number}
            author = item.user.login if item.user else None
            # Only report the author when GitHub actually gave us one.
            if author is not None:
                entry["opened_by"] = author
            summaries.append(entry)
        return summaries
 | 
						|
 | 
						|
    def parse_pull_requests(self, pull_requests: List[PullRequest]) -> List[dict]:
        """
        Summarizes each PullRequest as a small dictionary
        Parameters:
            pull_requests(List[PullRequest]): A list of Github PullRequest
            objects
        Returns:
            List[dict]: One dictionary per pull request with its ``title``
            and ``number``, and its ``commits`` and ``comments`` values
            converted to strings
        """
        return [
            {
                "title": pull.title,
                "number": pull.number,
                "commits": str(pull.commits),
                "comments": str(pull.comments),
            }
            for pull in pull_requests
        ]
 | 
						|
 | 
						|
    def get_issues(self) -> str:
        """
        Fetches all open issues from the repo excluding pull requests

        Returns:
            str: A plaintext report containing the number of issues
            and each issue's title and number, or a fixed message when
            there are none.
        """
        open_items = self.github_repo_instance.get_issues(state="open")
        # The issues listing also contains pull requests; keep real issues only.
        only_issues = [item for item in open_items if not item.pull_request]
        if not only_issues:
            return "No open issues available"
        summaries = self.parse_issues(only_issues)
        return f"Found {len(summaries)} issues:\n{summaries}"
 | 
						|
 | 
						|
    def list_open_pull_requests(self) -> str:
        """
        Fetches all open PRs from the repo

        Returns:
            str: A plaintext report containing the number of PRs
            and each PR's title and number, or a fixed message when
            there are none.
        """
        open_pulls = self.github_repo_instance.get_pulls(state="open")
        if open_pulls.totalCount <= 0:
            return "No open pull requests available"
        summaries = self.parse_pull_requests(open_pulls)
        return f"Found {len(summaries)} pull requests:\n{summaries}"
 | 
						|
 | 
						|
    def list_files_in_main_branch(self) -> str:
        """
        Fetches all files in the main branch of the repo.

        Every directory level is read with ``ref=self.github_base_branch``,
        so nested files come from the base branch even when a different
        branch is currently active.

        Returns:
            str: A plaintext report containing the paths and names of the
            files, a fixed message when there are none, or the error text
            if the listing fails.
        """
        repo = self.github_repo_instance
        base_ref = self.github_base_branch

        def walk(path: str) -> List[str]:
            # Depth-first listing that stays on the base branch throughout.
            found: List[str] = []
            for content in repo.get_contents(path, ref=base_ref):
                if content.type == "dir":
                    found.extend(walk(content.path))
                else:
                    found.append(content.path)
            return found

        try:
            files = walk("")
            if files:
                files_str = "\n".join(files)
                return f"Found {len(files)} files in the main branch:\n{files_str}"
            else:
                return "No files found in the main branch"
        except Exception as e:
            return str(e)
 | 
						|
 | 
						|
    def set_active_branch(self, branch_name: str) -> str:
        """Equivalent to `git checkout branch_name` for this Agent.
        Clones formatting from Github.

        Parameters:
            branch_name(str): Name of an existing branch to switch to.
        Returns:
            str: A confirmation message, or an error message listing the
            repository's branches if ``branch_name`` doesn't exist.
        """
        existing = [branch.name for branch in self.github_repo_instance.get_branches()]
        if branch_name not in existing:
            # Unknown branch: leave the active branch untouched.
            return (
                f"Error {branch_name} does not exist, "
                f"in repo with current branches: {existing}"
            )
        self.active_branch = branch_name
        return f"Switched to branch `{branch_name}`"
 | 
						|
 | 
						|
    def list_branches_in_repo(self) -> str:
        """
        Fetches a list of all branches in the repository.

        Returns:
            str: A plaintext report containing the names of the branches,
            a fixed message when there are none, or the error text if the
            lookup fails.
        """
        try:
            names = [item.name for item in self.github_repo_instance.get_branches()]
            if not names:
                return "No branches found in the repository"
            listing = "\n".join(names)
            return f"Found {len(names)} branches in the repository:\n{listing}"
        except Exception as e:
            return str(e)
 | 
						|
 | 
						|
    def create_branch(self, proposed_branch_name: str) -> str:
        """
        Create a new branch, and set it as the active bot branch.
        Equivalent to `git switch -c proposed_branch_name`
        If the proposed branch already exists, we append _v1 then _v2...
        until a unique name is found.

        The new branch starts at the head commit of the repository's
        default branch.

        Parameters:
            proposed_branch_name(str): Preferred name for the new branch.
        Returns:
            str: A plaintext success message, or a failure message when no
            free name was found within the attempt limit.
        Raises:
            Exception: If GitHub rejects the request for any reason other
                than the branch name already existing.
        """
        from github import GithubException

        max_attempts = 1000
        repo = self.github_repo_instance
        base_branch = repo.get_branch(repo.default_branch)
        new_branch_name = proposed_branch_name
        for attempt in range(1, max_attempts + 1):
            try:
                repo.create_git_ref(
                    ref=f"refs/heads/{new_branch_name}", sha=base_branch.commit.sha
                )
            except GithubException as e:
                # The error payload is not guaranteed to be a dict with a
                # "message" key, so read it defensively.
                data = e.data if isinstance(e.data, dict) else {}
                message = str(data.get("message", ""))
                if e.status == 422 and "Reference already exists" in message:
                    # Name taken: retry with the next numbered suffix.
                    new_branch_name = f"{proposed_branch_name}_v{attempt}"
                    continue
                # Handle any other exceptions
                print(f"Failed to create branch. Error: {e}")  # noqa: T201
                raise Exception(
                    "Unable to create branch name from proposed_branch_name: "
                    f"{proposed_branch_name}"
                ) from e
            self.active_branch = new_branch_name
            return (
                f"Branch '{new_branch_name}' "
                "created successfully, and set as current active branch."
            )
        return (
            "Unable to create branch. "
            "At least 1000 branches exist with names derived from "
            f"proposed_branch_name: `{proposed_branch_name}`"
        )
 | 
						|
 | 
						|
    def list_files_in_bot_branch(self) -> str:
        """
        Fetches all files in the active branch of the repo,
        the branch the bot uses to make changes.

        Returns:
            str: A plaintext list containing the filepaths in the branch,
            a fixed message when there are none, or an error message if the
            listing fails.
        """
        try:
            # `_list_files` already walks `self.active_branch` recursively,
            # so listing the repository root yields every file path.
            paths = self._list_files("")
            if not paths:
                return f"No files found in branch: `{self.active_branch}`"
            listing = "\n".join(paths)
            return (
                f"Found {len(paths)} files in branch `{self.active_branch}`:\n"
                f"{listing}"
            )
        except Exception as e:
            return f"Error: {e}"
 | 
						|
 | 
						|
    def get_files_from_directory(self, directory_path: str) -> str:
        """
        Recursively fetches files from a directory in the repo.

        Parameters:
            directory_path (str): Path to the directory

        Returns:
            str: The string form of the list of file paths, or an error
            message carrying the GitHub status code.
        """
        from github import GithubException

        try:
            listing = self._list_files(directory_path)
        except GithubException as e:
            return f"Error: status code {e.status}, {e.message}"
        return str(listing)
 | 
						|
 | 
						|
    def _list_files(self, directory_path: str) -> List[str]:
        """
        Recursively collects file paths under a path on the active branch.

        Parameters:
            directory_path (str): Path to list, read at ``self.active_branch``
        Returns:
            List[str]: Paths of all non-directory entries, depth first. If
            ``directory_path`` points at a single file, a one-element list
            with that file's path.
        """
        files: List[str] = []

        contents = self.github_repo_instance.get_contents(
            directory_path, ref=self.active_branch
        )
        # For a file path the API hands back one object instead of a list;
        # normalise so the loop below works for both shapes.
        if not isinstance(contents, list):
            contents = [contents]

        for content in contents:
            if content.type == "dir":
                files.extend(self._list_files(content.path))
            else:
                files.append(content.path)
        return files
 | 
						|
 | 
						|
    def get_issue(self, issue_number: int) -> Dict[str, Any]:
        """
        Fetches a specific issue and its first 10 comments
        Parameters:
            issue_number(int): The number for the github issue
        Returns:
            dict: A dictionary containing the issue's number, title,
            body, comments as a string, and the username of the user
            who opened the issue (the string "None" when unknown)
        """
        max_comments = 10
        issue = self.github_repo_instance.get_issue(number=issue_number)
        paginated_comments = issue.get_comments()
        page = 0
        comments: List[dict] = []
        while len(comments) < max_comments:
            comments_page = paginated_comments.get_page(page)
            if len(comments_page) == 0:
                break
            # A page can hold more than we still need; take only the
            # remainder so the documented limit is honoured.
            for comment in comments_page[: max_comments - len(comments)]:
                comments.append({"body": comment.body, "user": comment.user.login})
            page += 1

        opened_by = None
        if issue.user and issue.user.login:
            opened_by = issue.user.login

        return {
            "number": issue_number,
            "title": issue.title,
            "body": issue.body,
            "comments": str(comments),
            "opened_by": str(opened_by),
        }
 | 
						|
 | 
						|
    def list_pull_request_files(self, pr_number: int) -> List[Dict[str, Any]]:
        """Fetches the full text of all files in a PR. Truncates after first 3k tokens.
        # TODO: Enhancement to summarize files with ctags if they're getting long.

        Files are downloaded one by one; a file is included only while the
        running token total stays under the limit. Files that fail to
        download are reported on stdout and skipped.

        Args:
            pr_number(int): The number of the pull request on Github

        Returns:
            List[Dict[str, Any]]: One dictionary per included file with the
            keys ``filename``, ``contents``, ``additions`` and ``deletions``
        """
        tiktoken = _import_tiktoken()
        MAX_TOKENS_FOR_FILES = 3_000
        # Without a timeout a stalled connection would block the caller
        # indefinitely.
        REQUEST_TIMEOUT_SECONDS = 30
        pr_files = []
        pr = self.github_repo_instance.get_pull(number=int(pr_number))
        paginated_files = pr.get_files()
        total_tokens = 0
        page = 0
        while True:  # or while (total_tokens + tiktoken()) < MAX_TOKENS_FOR_FILES:
            files_page = paginated_files.get_page(page)
            if len(files_page) == 0:
                break
            for file in files_page:
                try:
                    file_metadata_response = requests.get(
                        file.contents_url, timeout=REQUEST_TIMEOUT_SECONDS
                    )
                    if file_metadata_response.status_code == 200:
                        download_url = json.loads(file_metadata_response.text)[
                            "download_url"
                        ]
                    else:
                        print(f"Failed to download file: {file.contents_url}, skipping")  # noqa: T201
                        continue

                    file_content_response = requests.get(
                        download_url, timeout=REQUEST_TIMEOUT_SECONDS
                    )
                    if file_content_response.status_code == 200:
                        # Save the content as a UTF-8 string
                        file_content = file_content_response.text
                    else:
                        print(  # noqa: T201
                            "Failed downloading file content "
                            f"(Error {file_content_response.status_code}). Skipping"
                        )
                        continue

                    file_tokens = len(
                        tiktoken.get_encoding("cl100k_base").encode(
                            file_content + file.filename + "file_name file_contents"
                        )
                    )
                    # Files that would push the total over budget are left
                    # out; later, smaller files may still fit.
                    if (total_tokens + file_tokens) < MAX_TOKENS_FOR_FILES:
                        pr_files.append(
                            {
                                "filename": file.filename,
                                "contents": file_content,
                                "additions": file.additions,
                                "deletions": file.deletions,
                            }
                        )
                        total_tokens += file_tokens
                except Exception as e:
                    # Best effort: one bad file (including a timed-out
                    # request) must not abort the whole listing.
                    print(f"Error when reading files from a PR on github. {e}")  # noqa: T201
            page += 1
        return pr_files
 | 
						|
 | 
						|
    def get_pull_request(self, pr_number: int) -> Dict[str, Any]:
        """
        Fetches a specific pull request and up to its first 10 comments
        and first 10 commit messages, limited by a shared budget of
        2,000 tokens.

        Parameters:
            pr_number(int): The number for the Github pull
        Returns:
            dict: A dictionary containing the pull's title, number and body
            (each only if it fits the token budget), plus ``comments`` and
            ``commits`` as stringified lists
        """
        max_tokens = 2_000
        max_items = 10
        pull = self.github_repo_instance.get_pull(number=pr_number)
        encoding = _import_tiktoken().get_encoding("cl100k_base")
        total_tokens = 0

        def get_tokens(text: str) -> int:
            return len(encoding.encode(text))

        def add_to_dict(data_dict: Dict[str, Any], key: str, value: str) -> None:
            # Adds the value only when it still fits the shared budget.
            nonlocal total_tokens
            tokens = get_tokens(value)
            if total_tokens + tokens <= max_tokens:
                data_dict[key] = value
                total_tokens += tokens

        def collect(paginated: Any, render: Any) -> List[str]:
            # Gathers at most `max_items` rendered entries, stopping as soon
            # as the next entry would exceed the shared token budget.
            nonlocal total_tokens
            collected: List[str] = []
            page = 0
            while len(collected) < max_items:
                items = paginated.get_page(page)
                if len(items) == 0:
                    break
                for item in items[: max_items - len(collected)]:
                    text = render(item)
                    tokens = get_tokens(text)
                    if total_tokens + tokens > max_tokens:
                        return collected
                    collected.append(text)
                    total_tokens += tokens
                page += 1
            return collected

        response_dict: Dict[str, str] = {}
        add_to_dict(response_dict, "title", pull.title)
        add_to_dict(response_dict, "number", str(pr_number))
        add_to_dict(response_dict, "body", pull.body if pull.body else "")

        comments = collect(
            pull.get_issue_comments(),
            lambda comment: str({"body": comment.body, "user": comment.user.login}),
        )
        # These entries were already charged against the budget one by one;
        # charging the joined string again could drop the whole key.
        response_dict["comments"] = str(comments)

        commits = collect(
            pull.get_commits(),
            lambda commit: str({"message": commit.commit.message}),
        )
        response_dict["commits"] = str(commits)
        return response_dict
 | 
						|
 | 
						|
    def create_pull_request(self, pr_query: str) -> str:
        """
        Makes a pull request from the bot's branch to the base branch
        Parameters:
            pr_query(str): a string which contains the PR title
            and the PR body. The title is the first line
            in the string, and the body are the rest of the string.
            For example, "Updated README\\nmade changes to add info".
            A blank line between title and body is also accepted.
        Returns:
            str: A success or failure message
        """
        if self.github_base_branch == self.active_branch:
            return (
                "Cannot make a pull request because "
                "commits are already in the main or master branch."
            )
        try:
            title, _, body = pr_query.partition("\n")
            # Accept both "title\nbody" and "title\n\nbody" without losing
            # the first character of the body.
            if body.startswith("\n"):
                body = body[1:]
            pr = self.github_repo_instance.create_pull(
                title=title,
                body=body,
                head=self.active_branch,
                base=self.github_base_branch,
            )
            return f"Successfully created PR number {pr.number}"
        except Exception as e:
            return "Unable to make pull request due to error:\n" + str(e)
 | 
						|
 | 
						|
    def comment_on_issue(self, comment_query: str) -> str:
        """
        Adds a comment to a github issue
        Parameters:
            comment_query(str): a string which contains the issue number,
            two newlines, and the comment.
            for example: "1\\n\\nWorking on it now"
            adds the comment "working on it now" to issue 1
        Returns:
            str: A success or failure message
        Raises:
            ValueError: If the text before the first blank line is not an
                integer issue number
        """
        # Split on the separator itself rather than on the length of the
        # parsed number, which is wrong when the number is written with
        # padding or leading zeros.
        number_text, _, comment = comment_query.partition("\n\n")
        issue_number = int(number_text)
        try:
            issue = self.github_repo_instance.get_issue(number=issue_number)
            issue.create_comment(comment)
            return "Commented on issue " + str(issue_number)
        except Exception as e:
            return "Unable to make comment due to error:\n" + str(e)
 | 
						|
 | 
						|
    def create_file(self, file_query: str) -> str:
        """
        Creates a new file on the Github repo
        Parameters:
            file_query(str): a string which contains the file path
            and the file contents. The file path is the first line
            in the string, and the contents are the rest of the string.
            For example, "hello_world.md\\n# Hello World!".
            A blank line between path and contents is also accepted.
        Returns:
            str: A success or failure message
        """
        if self.active_branch == self.github_base_branch:
            return (
                "You're attempting to commit directly to the "
                f"{self.github_base_branch} branch, which is protected. "
                "Please create a new branch and try again."
            )

        file_path, _, file_contents = file_query.partition("\n")
        # Accept both "path\ncontents" and "path\n\ncontents" without losing
        # the first character of the contents.
        if file_contents.startswith("\n"):
            file_contents = file_contents[1:]

        try:
            try:
                file = self.github_repo_instance.get_contents(
                    file_path, ref=self.active_branch
                )
                if file:
                    return (
                        f"File already exists at `{file_path}` "
                        f"on branch `{self.active_branch}`. You must use "
                        "`update_file` to modify it."
                    )
            except Exception:
                # expected behavior, file shouldn't exist yet
                pass

            self.github_repo_instance.create_file(
                path=file_path,
                message="Create " + file_path,
                content=file_contents,
                branch=self.active_branch,
            )
            return "Created file " + file_path
        except Exception as e:
            return "Unable to make file due to error:\n" + str(e)
 | 
						|
 | 
						|
    def read_file(self, file_path: str) -> str:
        """
        Read a file from this agent's branch, defined by self.active_branch,
        which supports PR branches.
        Parameters:
            file_path(str): the file path
        Returns:
            str: The file decoded as a UTF-8 string, or an error message if
            it could not be fetched or decoded
        """
        try:
            file = self.github_repo_instance.get_contents(
                file_path, ref=self.active_branch
            )
            return file.decoded_content.decode("utf-8")
        except Exception as e:
            # Any failure is reported as text rather than raised, so the
            # caller always gets a string back.
            return (
                f"File not found `{file_path}` on branch "
                f"`{self.active_branch}`. Error: {str(e)}"
            )
 | 
						|
 | 
						|
    def update_file(self, file_query: str) -> str:
        """
        Updates a file with new content.

        The first occurrence-independent replacement is done with
        ``str.replace``, so every occurrence of the old text in the file is
        replaced by the new text.

        Parameters:
            file_query(str): Contains the file path and the file contents.
                The first line is the file path.
                The old file contents is wrapped in OLD <<<< and >>>> OLD
                The new file contents is wrapped in NEW <<<< and >>>> NEW
                For example:
                /test/hello.txt
                OLD <<<<
                Hello Earth!
                >>>> OLD
                NEW <<<<
                Hello Mars!
                >>>> NEW
        Returns:
            A success or failure message. Failures (protected branch,
            malformed query, missing file, API errors) are returned as text
            and never raised.
        """
        if self.active_branch == self.github_base_branch:
            return (
                "You're attempting to commit directly "
                f"to the {self.github_base_branch} branch, which is protected. "
                "Please create a new branch and try again."
            )
        try:
            # Strip so a trailing "\r" (CRLF input) or stray spaces do not
            # become part of the path.
            file_path: str = file_query.split("\n")[0].strip()
            old_file_contents = (
                file_query.split("OLD <<<<")[1].split(">>>> OLD")[0].strip()
            )
            new_file_contents = (
                file_query.split("NEW <<<<")[1].split(">>>> NEW")[0].strip()
            )

            # str.replace("", new) would insert `new` between every
            # character of the file, so an empty OLD block is rejected.
            if not old_file_contents:
                return (
                    "File content was not updated because the old content "
                    "between OLD <<<< and >>>> OLD is empty."
                )

            # Fetch once: the same object supplies both the current text and
            # the blob sha required by the update call. A missing file raises
            # here and is reported by the handler below.
            current_file = self.github_repo_instance.get_contents(
                file_path, ref=self.active_branch
            )
            file_content = current_file.decoded_content.decode("utf-8")
            updated_file_content = file_content.replace(
                old_file_contents, new_file_contents
            )

            # Unchanged text means the old content was absent (or identical
            # to the new content); there is nothing to commit.
            if file_content == updated_file_content:
                return (
                    "File content was not updated because old content was not found. "
                    "It may be helpful to use the read_file action to get "
                    "the current file contents."
                )

            self.github_repo_instance.update_file(
                path=file_path,
                message="Update " + str(file_path),
                content=updated_file_content,
                branch=self.active_branch,
                sha=current_file.sha,
            )
            return "Updated file " + str(file_path)
        except Exception as e:
            return "Unable to update file due to error:\n" + str(e)
 | 
						|
 | 
						|
    def delete_file(self, file_path: str) -> str:
        """
        Deletes a file from the repo on the branch given by
        self.active_branch.

        Parameters:
            file_path(str): Where the file is
        Returns:
            str: Success or failure message. Refuses (with a message) when the
                active branch is the base branch; any exception raised while
                looking up or deleting the file is returned as text.
        """
        if self.active_branch == self.github_base_branch:
            return (
                "You're attempting to commit directly "
                f"to the {self.github_base_branch} branch, which is protected. "
                "Please create a new branch and try again."
            )
        try:
            # The delete call needs the sha of the file's current blob on the
            # active branch.
            current_sha = self.github_repo_instance.get_contents(
                file_path, ref=self.active_branch
            ).sha
            self.github_repo_instance.delete_file(
                path=file_path,
                message="Delete " + file_path,
                branch=self.active_branch,
                sha=current_sha,
            )
            return "Deleted file " + file_path
        except Exception as e:
            return "Unable to delete file due to error:\n" + str(e)
 | 
						|
 | 
						|
    def search_issues_and_prs(self, query: str) -> str:
        """
        Search the repository's issues and pull requests.

        Parameters:
            query(str): The search query

        Returns:
            str: A header line followed by one line (title, number, state)
                for each of the first 5 matches, or fewer if fewer exist
        """
        matches = self.github.search_issues(query, repo=self.github_repository)
        limit = min(5, matches.totalCount)
        lines = [f"Top {limit} results:"]
        lines.extend(
            f"Title: {item.title}, Number: {item.number}, State: {item.state}"
            for item in matches[:limit]
        )
        return "\n".join(lines)
 | 
						|
 | 
						|
    def search_code(self, query: str) -> str:
        """
        Search the repository's code and return the matching files' contents.
        # Todo: limit total tokens returned...

        Parameters:
            query(str): The search query

        Returns:
            str: "0 results found." when nothing matches; otherwise a header
                line followed by the path and full contents of, at most, the
                top 5 matching files as read from self.active_branch
        """
        hits = self.github.search_code(query=query, repo=self.github_repository)
        if hits.totalCount == 0:
            return "0 results found."

        limit = min(5, hits.totalCount)
        sections = [f"Showing top {limit} of {hits.totalCount} results:"]
        for position, hit in enumerate(hits):
            if position >= limit:
                break
            # The search hit only carries the path; the file body is fetched
            # separately from the active branch.
            blob = self.github_repo_instance.get_contents(
                hit.path, ref=self.active_branch
            )
            text = blob.decoded_content.decode()
            sections.append(
                f"Filepath: `{hit.path}`\nFile contents: {text}\n<END OF FILE>"
            )
        return "\n".join(sections)
 | 
						|
 | 
						|
    def create_review_request(self, reviewer_username: str) -> str:
        """
        Request a review on the first open pull request whose head branch
        is the current active_branch.

        Parameters:
            reviewer_username(str): The username of the person who is being requested

        Returns:
            str: A confirmation naming the reviewer and PR number, a message
                saying no open PR exists for the branch, or a failure message
                carrying the raised error
        """
        open_prs = self.github_repo_instance.get_pulls(state="open", sort="created")

        # Stop at the first PR opened from the active branch.
        target = None
        for candidate in open_prs:
            if candidate.head.ref == self.active_branch:
                target = candidate
                break

        if target is None:
            return (
                "No open pull request found for the "
                f"current branch `{self.active_branch}`"
            )

        try:
            target.create_review_request(reviewers=[reviewer_username])
            return (
                f"Review request created for user {reviewer_username} "
                f"on PR #{target.number}"
            )
        except Exception as e:
            return f"Failed to create a review request with error {e}"
 | 
						|
 | 
						|
    def get_latest_release(self) -> str:
        """
        Fetch the repository's latest release.

        Returns:
            str: A single line with the latest release's title, tag name
                and body
        """
        latest = self.github_repo_instance.get_latest_release()
        title = latest.title
        tag = latest.tag_name
        body = latest.body
        return f"Latest title: {title} tag: {tag} body: {body}"
 | 
						|
 | 
						|
    def get_releases(self) -> str:
        """
        Fetch the repository's releases.

        Returns:
            str: A header line followed by one line (title, tag, body) for
                each of the first 5 releases, or fewer if fewer exist
        """
        all_releases = self.github_repo_instance.get_releases()
        limit = min(5, all_releases.totalCount)

        lines = [f"Top {limit} results:"]
        lines.extend(
            f"Title: {item.title}, Tag: {item.tag_name}, Body: {item.body}"
            for item in all_releases[:limit]
        )
        return "\n".join(lines)
 | 
						|
 | 
						|
    def get_release(self, tag_name: str) -> str:
        """
        Fetch one release of the repository, looked up by tag.

        Parameters:
            tag_name(str): The tag name of the release

        Returns:
            str: A single line with the release's title, tag name and body
        """
        found = self.github_repo_instance.get_release(tag_name)
        title = found.title
        tag = found.tag_name
        body = found.body
        return f"Release: {title} tag: {tag} body: {body}"
 | 
						|
 | 
						|
    def run(self, mode: str, query: str) -> str:
        """
        Run the action named by ``mode`` and return its result as a string.

        Every supported mode has the same name as the method that handles it.

        Parameters:
            mode(str): The name of the action to run
            query(str): The argument for the action. It is converted with
                int() for the id-based modes and ignored by the modes that
                take no argument.

        Returns:
            str: The handler's result; JSON-encoded for the id-based modes

        Raises:
            ValueError: If ``mode`` is not a supported action (also raised by
                int() when an id-based mode gets a non-numeric query)
        """
        # Handlers that take an integer id; their result is JSON-encoded.
        json_id_modes = (
            "get_issue",
            "get_pull_request",
            "list_pull_request_files",
        )
        # Handlers that take no argument; ``query`` is ignored.
        no_query_modes = (
            "get_issues",
            "list_open_pull_requests",
            "list_files_in_main_branch",
            "list_files_in_bot_branch",
            "list_branches_in_repo",
            "get_latest_release",
            "get_releases",
        )
        # Handlers that receive ``query`` unchanged.
        query_modes = (
            "comment_on_issue",
            "create_file",
            "create_pull_request",
            "read_file",
            "update_file",
            "delete_file",
            "set_active_branch",
            "create_branch",
            "get_files_from_directory",
            "search_issues_and_prs",
            "search_code",
            "create_review_request",
            "get_release",
        )

        # Only whitelisted names reach getattr, so ``mode`` can never select
        # an arbitrary attribute.
        if mode in json_id_modes:
            return json.dumps(getattr(self, mode)(int(query)))
        if mode in no_query_modes:
            return getattr(self, mode)()
        if mode in query_modes:
            return getattr(self, mode)(query)
        raise ValueError("Invalid mode: " + str(mode))
 |