mirror of
				https://github.com/hwchase17/langchain.git
				synced 2025-10-23 02:15:42 +00:00 
			
		
		
		
	The fix #14221 has broken default gitlab url which is forcing the users to specify GITLAB_URL for default one. With this fix if GITLAB_URL is not set, the default gitlab url will be taken. - **Description:** Add the GITHUB URL instead of None - **Issue:** the issue #14221 has broken the default github URL - **Dependencies:** None - **Tag maintainer:** @hwchase17 - **Twitter handle:** manjunath_shiva
		
			
				
	
	
		
			328 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			328 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """Util that calls gitlab."""
 | |
| from __future__ import annotations
 | |
| 
 | |
| import json
 | |
| from typing import TYPE_CHECKING, Any, Dict, List, Optional
 | |
| 
 | |
| from langchain_core.pydantic_v1 import BaseModel, Extra, root_validator
 | |
| from langchain_core.utils import get_from_dict_or_env
 | |
| 
 | |
| if TYPE_CHECKING:
 | |
|     from gitlab.v4.objects import Issue
 | |
| 
 | |
| 
 | |
| class GitLabAPIWrapper(BaseModel):
 | |
|     """Wrapper for GitLab API."""
 | |
| 
 | |
|     gitlab: Any  #: :meta private:
 | |
|     gitlab_repo_instance: Any  #: :meta private:
 | |
|     gitlab_repository: Optional[str] = None
 | |
|     """The name of the GitLab repository, in the form {username}/{repo-name}."""
 | |
|     gitlab_personal_access_token: Optional[str] = None
 | |
|     """Personal access token for the GitLab service, used for authentication."""
 | |
|     gitlab_branch: Optional[str] = None
 | |
|     """The specific branch in the GitLab repository where the bot will make 
 | |
|         its commits. Defaults to 'main'.
 | |
|     """
 | |
|     gitlab_base_branch: Optional[str] = None
 | |
|     """The base branch in the GitLab repository, used for comparisons. 
 | |
|         Usually 'main' or 'master'. Defaults to 'main'.
 | |
|     """
 | |
| 
 | |
|     class Config:
 | |
|         """Configuration for this pydantic object."""
 | |
| 
 | |
|         extra = Extra.forbid
 | |
| 
 | |
|     @root_validator()
 | |
|     def validate_environment(cls, values: Dict) -> Dict:
 | |
|         """Validate that api key and python package exists in environment."""
 | |
| 
 | |
|         gitlab_url = get_from_dict_or_env(
 | |
|             values, "gitlab_url", "GITLAB_URL", default="https://gitlab.com"
 | |
|         )
 | |
|         gitlab_repository = get_from_dict_or_env(
 | |
|             values, "gitlab_repository", "GITLAB_REPOSITORY"
 | |
|         )
 | |
| 
 | |
|         gitlab_personal_access_token = get_from_dict_or_env(
 | |
|             values, "gitlab_personal_access_token", "GITLAB_PERSONAL_ACCESS_TOKEN"
 | |
|         )
 | |
| 
 | |
|         gitlab_branch = get_from_dict_or_env(
 | |
|             values, "gitlab_branch", "GITLAB_BRANCH", default="main"
 | |
|         )
 | |
|         gitlab_base_branch = get_from_dict_or_env(
 | |
|             values, "gitlab_base_branch", "GITLAB_BASE_BRANCH", default="main"
 | |
|         )
 | |
| 
 | |
|         try:
 | |
|             import gitlab
 | |
| 
 | |
|         except ImportError:
 | |
|             raise ImportError(
 | |
|                 "python-gitlab is not installed. "
 | |
|                 "Please install it with `pip install python-gitlab`"
 | |
|             )
 | |
| 
 | |
|         g = gitlab.Gitlab(
 | |
|             url=gitlab_url,
 | |
|             private_token=gitlab_personal_access_token,
 | |
|             keep_base_url=True,
 | |
|         )
 | |
| 
 | |
|         g.auth()
 | |
| 
 | |
|         values["gitlab"] = g
 | |
|         values["gitlab_repo_instance"] = g.projects.get(gitlab_repository)
 | |
|         values["gitlab_repository"] = gitlab_repository
 | |
|         values["gitlab_personal_access_token"] = gitlab_personal_access_token
 | |
|         values["gitlab_branch"] = gitlab_branch
 | |
|         values["gitlab_base_branch"] = gitlab_base_branch
 | |
| 
 | |
|         return values
 | |
| 
 | |
|     def parse_issues(self, issues: List[Issue]) -> List[dict]:
 | |
|         """
 | |
|         Extracts title and number from each Issue and puts them in a dictionary
 | |
|         Parameters:
 | |
|             issues(List[Issue]): A list of gitlab Issue objects
 | |
|         Returns:
 | |
|             List[dict]: A dictionary of issue titles and numbers
 | |
|         """
 | |
|         parsed = []
 | |
|         for issue in issues:
 | |
|             title = issue.title
 | |
|             number = issue.iid
 | |
|             parsed.append({"title": title, "number": number})
 | |
|         return parsed
 | |
| 
 | |
|     def get_issues(self) -> str:
 | |
|         """
 | |
|         Fetches all open issues from the repo
 | |
| 
 | |
|         Returns:
 | |
|             str: A plaintext report containing the number of issues
 | |
|             and each issue's title and number.
 | |
|         """
 | |
|         issues = self.gitlab_repo_instance.issues.list(state="opened")
 | |
|         if len(issues) > 0:
 | |
|             parsed_issues = self.parse_issues(issues)
 | |
|             parsed_issues_str = (
 | |
|                 "Found " + str(len(parsed_issues)) + " issues:\n" + str(parsed_issues)
 | |
|             )
 | |
|             return parsed_issues_str
 | |
|         else:
 | |
|             return "No open issues available"
 | |
| 
 | |
|     def get_issue(self, issue_number: int) -> Dict[str, Any]:
 | |
|         """
 | |
|         Fetches a specific issue and its first 10 comments
 | |
|         Parameters:
 | |
|             issue_number(int): The number for the gitlab issue
 | |
|         Returns:
 | |
|             dict: A dictionary containing the issue's title,
 | |
|             body, and comments as a string
 | |
|         """
 | |
|         issue = self.gitlab_repo_instance.issues.get(issue_number)
 | |
|         page = 0
 | |
|         comments: List[dict] = []
 | |
|         while len(comments) <= 10:
 | |
|             comments_page = issue.notes.list(page=page)
 | |
|             if len(comments_page) == 0:
 | |
|                 break
 | |
|             for comment in comments_page:
 | |
|                 comment = issue.notes.get(comment.id)
 | |
|                 comments.append(
 | |
|                     {"body": comment.body, "user": comment.author["username"]}
 | |
|                 )
 | |
|             page += 1
 | |
| 
 | |
|         return {
 | |
|             "title": issue.title,
 | |
|             "body": issue.description,
 | |
|             "comments": str(comments),
 | |
|         }
 | |
| 
 | |
|     def create_pull_request(self, pr_query: str) -> str:
 | |
|         """
 | |
|         Makes a pull request from the bot's branch to the base branch
 | |
|         Parameters:
 | |
|             pr_query(str): a string which contains the PR title
 | |
|             and the PR body. The title is the first line
 | |
|             in the string, and the body are the rest of the string.
 | |
|             For example, "Updated README\nmade changes to add info"
 | |
|         Returns:
 | |
|             str: A success or failure message
 | |
|         """
 | |
|         if self.gitlab_base_branch == self.gitlab_branch:
 | |
|             return """Cannot make a pull request because 
 | |
|             commits are already in the master branch"""
 | |
|         else:
 | |
|             try:
 | |
|                 title = pr_query.split("\n")[0]
 | |
|                 body = pr_query[len(title) + 2 :]
 | |
|                 pr = self.gitlab_repo_instance.mergerequests.create(
 | |
|                     {
 | |
|                         "source_branch": self.gitlab_branch,
 | |
|                         "target_branch": self.gitlab_base_branch,
 | |
|                         "title": title,
 | |
|                         "description": body,
 | |
|                         "labels": ["created-by-agent"],
 | |
|                     }
 | |
|                 )
 | |
|                 return f"Successfully created PR number {pr.iid}"
 | |
|             except Exception as e:
 | |
|                 return "Unable to make pull request due to error:\n" + str(e)
 | |
| 
 | |
|     def comment_on_issue(self, comment_query: str) -> str:
 | |
|         """
 | |
|         Adds a comment to a gitlab issue
 | |
|         Parameters:
 | |
|             comment_query(str): a string which contains the issue number,
 | |
|             two newlines, and the comment.
 | |
|             for example: "1\n\nWorking on it now"
 | |
|             adds the comment "working on it now" to issue 1
 | |
|         Returns:
 | |
|             str: A success or failure message
 | |
|         """
 | |
|         issue_number = int(comment_query.split("\n\n")[0])
 | |
|         comment = comment_query[len(str(issue_number)) + 2 :]
 | |
|         try:
 | |
|             issue = self.gitlab_repo_instance.issues.get(issue_number)
 | |
|             issue.notes.create({"body": comment})
 | |
|             return "Commented on issue " + str(issue_number)
 | |
|         except Exception as e:
 | |
|             return "Unable to make comment due to error:\n" + str(e)
 | |
| 
 | |
|     def create_file(self, file_query: str) -> str:
 | |
|         """
 | |
|         Creates a new file on the gitlab repo
 | |
|         Parameters:
 | |
|             file_query(str): a string which contains the file path
 | |
|             and the file contents. The file path is the first line
 | |
|             in the string, and the contents are the rest of the string.
 | |
|             For example, "hello_world.md\n# Hello World!"
 | |
|         Returns:
 | |
|             str: A success or failure message
 | |
|         """
 | |
|         file_path = file_query.split("\n")[0]
 | |
|         file_contents = file_query[len(file_path) + 2 :]
 | |
|         try:
 | |
|             self.gitlab_repo_instance.files.get(file_path, self.gitlab_branch)
 | |
|             return f"File already exists at {file_path}. Use update_file instead"
 | |
|         except Exception:
 | |
|             data = {
 | |
|                 "branch": self.gitlab_branch,
 | |
|                 "commit_message": "Create " + file_path,
 | |
|                 "file_path": file_path,
 | |
|                 "content": file_contents,
 | |
|             }
 | |
| 
 | |
|             self.gitlab_repo_instance.files.create(data)
 | |
| 
 | |
|             return "Created file " + file_path
 | |
| 
 | |
|     def read_file(self, file_path: str) -> str:
 | |
|         """
 | |
|         Reads a file from the gitlab repo
 | |
|         Parameters:
 | |
|             file_path(str): the file path
 | |
|         Returns:
 | |
|             str: The file decoded as a string
 | |
|         """
 | |
|         file = self.gitlab_repo_instance.files.get(file_path, self.gitlab_branch)
 | |
|         return file.decode().decode("utf-8")
 | |
| 
 | |
|     def update_file(self, file_query: str) -> str:
 | |
|         """
 | |
|         Updates a file with new content.
 | |
|         Parameters:
 | |
|             file_query(str): Contains the file path and the file contents.
 | |
|                 The old file contents is wrapped in OLD <<<< and >>>> OLD
 | |
|                 The new file contents is wrapped in NEW <<<< and >>>> NEW
 | |
|                 For example:
 | |
|                 test/hello.txt
 | |
|                 OLD <<<<
 | |
|                 Hello Earth!
 | |
|                 >>>> OLD
 | |
|                 NEW <<<<
 | |
|                 Hello Mars!
 | |
|                 >>>> NEW
 | |
|         Returns:
 | |
|             A success or failure message
 | |
|         """
 | |
|         try:
 | |
|             file_path = file_query.split("\n")[0]
 | |
|             old_file_contents = (
 | |
|                 file_query.split("OLD <<<<")[1].split(">>>> OLD")[0].strip()
 | |
|             )
 | |
|             new_file_contents = (
 | |
|                 file_query.split("NEW <<<<")[1].split(">>>> NEW")[0].strip()
 | |
|             )
 | |
| 
 | |
|             file_content = self.read_file(file_path)
 | |
|             updated_file_content = file_content.replace(
 | |
|                 old_file_contents, new_file_contents
 | |
|             )
 | |
| 
 | |
|             if file_content == updated_file_content:
 | |
|                 return (
 | |
|                     "File content was not updated because old content was not found."
 | |
|                     "It may be helpful to use the read_file action to get "
 | |
|                     "the current file contents."
 | |
|                 )
 | |
| 
 | |
|             commit = {
 | |
|                 "branch": self.gitlab_branch,
 | |
|                 "commit_message": "Create " + file_path,
 | |
|                 "actions": [
 | |
|                     {
 | |
|                         "action": "update",
 | |
|                         "file_path": file_path,
 | |
|                         "content": updated_file_content,
 | |
|                     }
 | |
|                 ],
 | |
|             }
 | |
| 
 | |
|             self.gitlab_repo_instance.commits.create(commit)
 | |
|             return "Updated file " + file_path
 | |
|         except Exception as e:
 | |
|             return "Unable to update file due to error:\n" + str(e)
 | |
| 
 | |
|     def delete_file(self, file_path: str) -> str:
 | |
|         """
 | |
|         Deletes a file from the repo
 | |
|         Parameters:
 | |
|             file_path(str): Where the file is
 | |
|         Returns:
 | |
|             str: Success or failure message
 | |
|         """
 | |
|         try:
 | |
|             self.gitlab_repo_instance.files.delete(
 | |
|                 file_path, self.gitlab_branch, "Delete " + file_path
 | |
|             )
 | |
|             return "Deleted file " + file_path
 | |
|         except Exception as e:
 | |
|             return "Unable to delete file due to error:\n" + str(e)
 | |
| 
 | |
|     def run(self, mode: str, query: str) -> str:
 | |
|         if mode == "get_issues":
 | |
|             return self.get_issues()
 | |
|         elif mode == "get_issue":
 | |
|             return json.dumps(self.get_issue(int(query)))
 | |
|         elif mode == "comment_on_issue":
 | |
|             return self.comment_on_issue(query)
 | |
|         elif mode == "create_file":
 | |
|             return self.create_file(query)
 | |
|         elif mode == "create_pull_request":
 | |
|             return self.create_pull_request(query)
 | |
|         elif mode == "read_file":
 | |
|             return self.read_file(query)
 | |
|         elif mode == "update_file":
 | |
|             return self.update_file(query)
 | |
|         elif mode == "delete_file":
 | |
|             return self.delete_file(query)
 | |
|         else:
 | |
|             raise ValueError("Invalid mode" + mode)
 |