diff --git a/README.md b/README.md index c8fef2e..8b395b4 100644 --- a/README.md +++ b/README.md @@ -38,6 +38,27 @@ Run `source ~/.zshrc` or restart terminal. ## Usage ⚔ +## Project structure 🧱 + +GitHappens is split into small modules so CLI workflows can be tested and extended without editing a single large script. + +``` +gitHappens.py # Backwards-compatible executable wrapper +main.py # Entry point and argument parsing +config.py # Configuration and template file loading +templates.py # Template selection and lookup +gitlab_api.py # GitLab API and glab command interactions +git_utils.py # Local git helpers +interactive.py # User prompts and interactive selections +commands/ + create_issue.py # Issue, branch, and merge request creation workflow + review.py # Review workflow + deploy.py # Deployment lookup workflow + open_mr.py # Open merge request workflow + report.py # Incident report workflow + summary.py # Commit summary workflows +``` + ### Project selection - Project selection is made automatically if you run script in same path as your project is located. @@ -225,4 +246,3 @@ I suggest checking Gitlab's official API documentation: https://docs.gitlab.com/ ## Donating šŸ’œ Make sure to check this project on [OpenPledge](https://app.openpledge.io/repositories/zigcBenx/gitHappens). - diff --git a/commands/__init__.py b/commands/__init__.py new file mode 100644 index 0000000..f34b7ea --- /dev/null +++ b/commands/__init__.py @@ -0,0 +1 @@ +"""Command workflows for GitHappens.""" diff --git a/commands/create_issue.py b/commands/create_issue.py new file mode 100644 index 0000000..7bcd1f1 --- /dev/null +++ b/commands/create_issue.py @@ -0,0 +1,44 @@ +from gitlab_api import create_branch, create_issue, create_merge_request +from interactive import prompt_estimated_time + + +def start_issue_creation(project_id, title, milestone, epic, iteration, selected_settings, only_issue): + estimated_time = prompt_estimated_time() + + if isinstance(project_id, list): + estimated_time_per_project = int(estimated_time) / len(project_id) if estimated_time else None + else: + estimated_time_per_project = estimated_time + + if estimated_time_per_project: + selected_settings = selected_settings.copy() if selected_settings else {} + selected_settings["estimated_time"] = int(estimated_time_per_project) + + created_issue = create_issue(title, project_id, milestone, epic, iteration, selected_settings) + print(f"Issue #{created_issue['iid']}: {created_issue['title']} created.") + + if only_issue: + return created_issue + + created_branch = create_branch(project_id, created_issue) + created_merge_request = create_merge_request( + project_id, + created_branch, + created_issue, + selected_settings.get("labels"), + milestone, + ) + print(f"Merge request #{created_merge_request['iid']}: {created_merge_request['title']} created.") + print("Run:") + print(" git fetch origin") + print( + f" git checkout -b '{created_merge_request['source_branch']}' " + f"'origin/{created_merge_request['source_branch']}'" + ) + print("to switch to new branch.") + + return created_issue + + +# Backwards-compatible name. +startIssueCreation = start_issue_creation diff --git a/commands/deploy.py b/commands/deploy.py new file mode 100644 index 0000000..f436e6f --- /dev/null +++ b/commands/deploy.py @@ -0,0 +1,113 @@ +import datetime + +import requests + +import gitlab_api +from config import CONFIG, PRODUCTION_MAPPINGS +from git_utils import get_main_branch +from gitlab_api import get_project_id + + +def get_last_production_deploy(): + try: + project_id = get_project_id() + api_url = f"{CONFIG.api_url}/projects/{project_id}/pipelines" + headers = {"Private-Token": CONFIG.gitlab_token} + params = { + "per_page": 50, + "order_by": "updated_at", + "sort": "desc", + } + + if gitlab_api.MAIN_BRANCH: + params["ref"] = gitlab_api.MAIN_BRANCH + else: + try: + params["ref"] = get_main_branch() + except Exception: + params["ref"] = "main" + + response = requests.get(api_url, headers=headers, params=params) + if response.status_code != 200: + print(f"Failed to fetch pipelines: {response.status_code} - {response.text}") + return + + pipelines = response.json() + production_pipeline = None + + for pipeline in pipelines: + pipeline_detail_url = f"{CONFIG.api_url}/projects/{project_id}/pipelines/{pipeline['id']}/jobs" + detail_response = requests.get(pipeline_detail_url, headers=headers) + + if detail_response.status_code == 200: + jobs = detail_response.json() + + for job in jobs: + job_name = job.get("name", "") + stage = job.get("stage", "") + job_status = job.get("status", "").lower() + if job_status != "success": + continue + + project_mapping = PRODUCTION_MAPPINGS.get(str(project_id)) + if project_mapping: + expected_stage = project_mapping.get("stage", "").lower() + expected_job = project_mapping.get("job", "").lower() + + if stage.lower() == expected_stage or ( + expected_job and job_name.lower() == expected_job + ): + production_pipeline = { + "pipeline": pipeline, + "production_job": job, + } + break + else: + print("Didn't find deployment pipeline") + + if production_pipeline: + break + + if not production_pipeline: + print("No production deployment found matching pattern") + return + + pipeline = production_pipeline["pipeline"] + job = production_pipeline["production_job"] + + print("šŸš€ Last Production Deployment:") + print(f" Pipeline: #{pipeline['id']} - {pipeline['status']}") + print(f" Job: {job['name']} ({job['status']})") + print(f" Branch/Tag: {pipeline['ref']}") + print(f" Started: {job.get('started_at', 'N/A')}") + print(f" Finished: {job.get('finished_at', 'N/A')}") + print( + f" Duration: {job.get('duration', 'N/A')} seconds" + if job.get("duration") + else " Duration: N/A" + ) + print(f" Commit: {pipeline['sha'][:8]}") + print(f" URL: {pipeline['web_url']}") + + if job.get("finished_at"): + try: + finished_time = datetime.datetime.fromisoformat(job["finished_at"].replace("Z", "+00:00")) + time_diff = datetime.datetime.now(datetime.timezone.utc) - finished_time + + if time_diff.days > 0: + print(f" ā° {time_diff.days} days ago") + elif time_diff.seconds > 3600: + hours = time_diff.seconds // 3600 + print(f" ā° {hours} hours ago") + else: + minutes = time_diff.seconds // 60 + print(f" ā° {minutes} minutes ago") + except Exception: + pass + + except Exception as error: + print(f"Error fetching last production deploy: {str(error)}") + + +# Backwards-compatible name. +get_last_production_deploy = get_last_production_deploy diff --git a/commands/open_mr.py b/commands/open_mr.py new file mode 100644 index 0000000..d5a13c5 --- /dev/null +++ b/commands/open_mr.py @@ -0,0 +1,22 @@ +import subprocess +import webbrowser + +from config import CONFIG +from gitlab_api import get_active_merge_request_id + + +def open_merge_request_in_browser(): + try: + merge_request_id = get_active_merge_request_id() + remote_url = subprocess.check_output( + ["git", "config", "--get", "remote.origin.url"], + text=True, + ).strip() + url = CONFIG.base_url + "/" + remote_url.split(":")[1][:-4] + webbrowser.open(f"{url}/-/merge_requests/{merge_request_id}") + except subprocess.CalledProcessError: + return None + + +# Backwards-compatible name. +openMergeRequestInBrowser = open_merge_request_in_browser diff --git a/commands/report.py b/commands/report.py new file mode 100644 index 0000000..2afe778 --- /dev/null +++ b/commands/report.py @@ -0,0 +1,55 @@ +import subprocess + +from config import CONFIG +from gitlab_api import close_opened_issue, create_issue, get_active_iteration +from interactive import select_labels + + +def process_report(text, minutes): + incident_project_id = CONFIG.incident_project_id + if not incident_project_id: + print("Error: incident_project_id not found in config.ini") + print("Please add your incident project ID to configs/config.ini under [DEFAULT] section:") + print("incident_project_id = your_project_id_here") + return + + issue_title = f"Incident Report: {text}" + selected_label = select_labels("Department") + incident_settings = { + "labels": ["incident", "report"], + "onlyIssue": True, + "type": "incident", + } + + if selected_label: + incident_settings["labels"].append(selected_label) + + try: + iteration = get_active_iteration() + created_issue = create_issue(issue_title, incident_project_id, False, False, iteration, incident_settings) + issue_iid = created_issue["iid"] + + close_opened_issue(issue_iid, incident_project_id) + print(f"Incident issue #{issue_iid} created successfully.") + print(f"Title: {issue_title}") + + time_tracking_command = [ + "glab", + "api", + f"/projects/{incident_project_id}/issues/{issue_iid}/add_spent_time", + "-f", + f"duration={minutes}m", + ] + + try: + subprocess.run(time_tracking_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + print(f"Added {minutes} minutes to issue time tracking.") + except subprocess.CalledProcessError as error: + print(f"Error adding time tracking: {str(error)}") + + except Exception as error: + print(f"Error creating incident issue: {str(error)}") + + +# Backwards-compatible name. +process_report = process_report diff --git a/commands/review.py b/commands/review.py new file mode 100644 index 0000000..9d80903 --- /dev/null +++ b/commands/review.py @@ -0,0 +1,66 @@ +import subprocess + +from config import CONFIG +from git_utils import get_current_branch +from gitlab_api import ( + add_reviewers_to_merge_request, + get_active_merge_request_id, + get_merge_request_for_branch, + get_project_id, + set_merge_request_to_auto_merge, +) +from interactive import choose_reviewers_manually, prompt_spent_time + + +def get_current_issue_id(): + merge_request = get_merge_request_for_branch(get_current_branch()) + return merge_request["description"].replace('"', "").replace("#", "").split()[1] + + +def track_issue_time(): + try: + project_id = get_project_id() + issue_id = get_current_issue_id() + except Exception as error: + print(f"Error getting issue details: {str(error)}") + return + + spent_time = prompt_spent_time() + time_tracking_command = [ + "glab", + "api", + f"/projects/{project_id}/issues/{issue_id}/notes", + "-f", + f"body=/spend {spent_time}m", + ] + + try: + subprocess.run(time_tracking_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True) + print(f"Added {spent_time} minutes to issue {issue_id} time tracking.") + except subprocess.CalledProcessError as error: + print(f"Error adding time tracking: {str(error)}") + except Exception as error: + print(f"Error tracking issue time: {str(error)}") + + +def run_review_workflow(select_reviewers=False, auto_merge=False): + track_issue_time() + reviewers = choose_reviewers_manually() if select_reviewers else None + add_reviewers_to_merge_request(reviewers=reviewers) + + try: + from ai_code_review import run_review_for_mr + + project_id = get_project_id() + mr_id = get_active_merge_request_id() + run_review_for_mr(project_id, mr_id, CONFIG.gitlab_token, CONFIG.api_url) + except Exception as error: + print(f"AI review skipped: {error}") + + if auto_merge: + set_merge_request_to_auto_merge() + + +# Backwards-compatible names. +getCurrentIssueId = get_current_issue_id +track_issue_time = track_issue_time diff --git a/commands/summary.py b/commands/summary.py new file mode 100644 index 0000000..2d5f59b --- /dev/null +++ b/commands/summary.py @@ -0,0 +1,40 @@ +from config import CONFIG +from git_utils import get_two_weeks_commits + + +def generate_smart_summary(): + commits = get_two_weeks_commits(return_output=True) + if not commits: + return + + if not CONFIG.openai_api_key: + print("OpenAI API key not set. Skipping AI summary generation.") + return + + try: + import openai + except ImportError: + print("OpenAI package not installed. Please install it using: pip install openai") + return + + openai.api_key = CONFIG.openai_api_key + + try: + response = openai.chat.completions.create( + model="gpt-3.5-turbo", + messages=[ + { + "role": "system", + "content": "You are a helpful assistant that summarizes git commits. Provide a concise, well-organized summary of the main changes and themes.", + }, + { + "role": "user", + "content": f"Please summarize these git commits in a clear, bulleted format:\n\n{commits}", + }, + ], + ) + + print("\nšŸ“‹ AI-Generated Summary of Recent Changes:\n") + print(response.choices[0].message.content) + except Exception as error: + print(f"Error generating AI summary: {error}") diff --git a/config.py b/config.py new file mode 100644 index 0000000..e14dc38 --- /dev/null +++ b/config.py @@ -0,0 +1,80 @@ +from __future__ import annotations + +import configparser +import json +from dataclasses import dataclass +from pathlib import Path + + +ROOT_DIR = Path(__file__).resolve().parent +CONFIG_DIR = ROOT_DIR / "configs" +CONFIG_PATH = CONFIG_DIR / "config.ini" +TEMPLATES_PATH = CONFIG_DIR / "templates.json" + + +@dataclass(frozen=True) +class AppConfig: + base_url: str = "https://gitlab.com" + group_id: str = "" + custom_template: str = "Custom" + gitlab_token: str = "" + delete_branch_after_merge: bool = True + developer_email: str | None = None + squash_commits: bool = True + production_pipeline_name: str = "deploy" + production_job_name: str | None = None + production_ref: str | None = None + openai_api_key: str | None = None + incident_project_id: str | None = None + + @property + def api_url(self): + return f"{self.base_url}/api/v4" + + +def _get(parser, option, fallback=None): + return parser.get("DEFAULT", option, fallback=fallback) + + +def _get_bool(parser, option, fallback): + return _get(parser, option, str(fallback)).lower() == "true" + + +def load_config(path=CONFIG_PATH): + parser = configparser.ConfigParser() + parser.read(path) + return AppConfig( + base_url=_get(parser, "base_url", "https://gitlab.com"), + group_id=_get(parser, "group_id", ""), + custom_template=_get(parser, "custom_template", "Custom"), + gitlab_token=_get(parser, "GITLAB_TOKEN", "").strip("\"'"), + delete_branch_after_merge=_get_bool(parser, "delete_branch_after_merge", True), + developer_email=_get(parser, "developer_email", None), + squash_commits=_get_bool(parser, "squash_commits", True), + production_pipeline_name=_get(parser, "production_pipeline_name", "deploy"), + production_job_name=_get(parser, "production_job_name", None), + production_ref=_get(parser, "production_ref", None), + openai_api_key=_get(parser, "OPENAI_API_KEY", None), + incident_project_id=_get(parser, "incident_project_id", None), + ) + + +def load_template_config(path=TEMPLATES_PATH): + if not Path(path).exists(): + return {"templates": [], "reviewers": [], "productionMappings": {}} + + with open(path, "r") as file: + data = json.load(file) + + return { + "templates": data.get("templates", []), + "reviewers": data.get("reviewers", []), + "productionMappings": data.get("productionMappings", {}), + } + + +CONFIG = load_config() +TEMPLATE_CONFIG = load_template_config() +TEMPLATES = TEMPLATE_CONFIG["templates"] +REVIEWERS = TEMPLATE_CONFIG["reviewers"] +PRODUCTION_MAPPINGS = TEMPLATE_CONFIG["productionMappings"] diff --git a/gitHappens.py b/gitHappens.py index 27d47f3..32bd001 100755 --- a/gitHappens.py +++ b/gitHappens.py @@ -1,845 +1,6 @@ #!/usr/bin/env python3 -import subprocess -import json -import argparse -import configparser -import inquirer -import datetime -import re -import os -import requests -import sys -import webbrowser +from main import main -# Setup config parser and read settings -config = configparser.ConfigParser() -absolute_config_path = os.path.dirname(os.path.abspath(__file__)) -config_path = os.path.join(absolute_config_path, 'configs/config.ini') -config.read(config_path) -BASE_URL = config.get('DEFAULT', 'base_url') -API_URL = BASE_URL + '/api/v4' -GROUP_ID = config.get('DEFAULT', 'group_id') -CUSTOM_TEMPLATE = config.get('DEFAULT', 'custom_template') -GITLAB_TOKEN = config.get('DEFAULT', 'GITLAB_TOKEN').strip('\"\'') -DELETE_BRANCH = config.get('DEFAULT', 'delete_branch_after_merge').lower() == 'true' -DEVELOPER_EMAIL = config.get('DEFAULT', 'developer_email', fallback=None) -SQUASH_COMMITS = config.get('DEFAULT', 'squash_commits').lower() == 'true' -PRODUCTION_PIPELINE_NAME = config.get('DEFAULT', 'production_pipeline_name', fallback='deploy') -PRODUCTION_JOB_NAME = config.get('DEFAULT', 'production_job_name', fallback=None) -PRODUCTION_REF = config.get('DEFAULT', 'production_ref', fallback=None) -MAIN_BRANCH = 'master' - -# Read templates from json config -with open(os.path.join(absolute_config_path,'configs/templates.json'), 'r') as f: - jsonConfig = json.load(f) -TEMPLATES = jsonConfig['templates'] -REVIEWERS = jsonConfig['reviewers'] -PRODUCTION_MAPPINGS = jsonConfig.get('productionMappings', {}) - -def get_project_id(): - project_link = getProjectLinkFromCurrentDir() - if (project_link == -1): - return enterProjectId() - - allProjects = get_all_projects(project_link) - # Find projects id by project ssh link gathered from repo - matching_id = None - for project in allProjects: - if project.get("ssh_url_to_repo") == project_link: - matching_id = project.get("id") - break - return matching_id - -def get_all_projects(project_link): - url = API_URL + "/projects?membership=true&search=" + project_link.split('/')[-1].split('.')[0] - - headers = { - "PRIVATE-TOKEN": GITLAB_TOKEN - } - - response = requests.get(url, headers=headers) - - if response.status_code == 200: - return response.json() - elif response.status_code == 401: - print("Error: Unauthorized (401). Your GitLab token is probably expired, invalid, or missing required permissions.") - print("Please generate a new token and update your configs/config.ini.") - exit(1) - else: - print(f"Request failed with status code {response.status_code}") - return None - -def getProjectLinkFromCurrentDir(): - try: - cmd = 'git remote get-url origin' - result = subprocess.run(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) - if result.returncode == 0: - output = result.stdout.decode('utf-8').strip() - return output - else: - return -1 - except FileNotFoundError: - return -1 - -def enterProjectId(): - while True: - project_id = input('Please enter the ID of your GitLab project: ') - if project_id: - return project_id - exit('Invalid project ID.') - -def list_milestones(current=False): - cmd = f'glab api /groups/{GROUP_ID}/milestones?state=active' - result = subprocess.run(cmd.split(), stdout=subprocess.PIPE) - milestones = json.loads(result.stdout) - if current: - today = datetime.date.today().strftime('%Y-%m-%d') - active_milestones = [] - for milestone in milestones: - start_date = milestone['start_date'] - due_date = milestone['due_date'] - if start_date and due_date and start_date <= today and due_date >= today: - active_milestones.append(milestone) - active_milestones.sort(key=lambda x: x['due_date']) - return active_milestones[0] - return milestones - -def select_template(): - template_names = [t['name'] for t in TEMPLATES] - template_names.append(CUSTOM_TEMPLATE) - questions = [ - inquirer.List('template', - message="Select template:", - choices=template_names, - ), - ] - answer = inquirer.prompt(questions) - return answer['template'] - -def getIssueSettings(template_name): - if template_name == CUSTOM_TEMPLATE: - return {} - return next((t for t in TEMPLATES if t['name'] == template_name), None) - -def createIssue(title, project_id, milestoneId, epic, iteration, settings): - if settings: - issueType = settings.get('type') or 'issue' - return executeIssueCreate(project_id, title, settings.get('labels'), milestoneId, epic, iteration, settings.get('weight'), settings.get('estimated_time'), issueType) - print("No settings in template") - exit(2) - pass - -def executeIssueCreate(project_id, title, labels, milestoneId, epic, iteration, weight, estimated_time, issue_type='issue'): - labels = ",".join(labels) if type(labels) == list else labels - assignee_id = getAuthorizedUser()['id'] - issue_command = [ - "glab", "api", - f"/projects/{str(project_id)}/issues", - "-f", f'title={title}', - "-f", f'assignee_ids={assignee_id}', - "-f", f'issue_type={issue_type}' - ] - if labels: - issue_command.append("-f") - issue_command.append(f'labels={labels}') - - if weight: - issue_command.append("-f") - issue_command.append(f'weight={str(weight)}') - - if milestoneId: - issue_command.append("-f") - issue_command.append(f'milestone_id={str(milestoneId)}') - - if epic: - epicId = epic['id'] - issue_command.append("-f") - issue_command.append(f'epic_id={str(epicId)}') - - # Set the description, including iteration, estimated time, and other info - description = "" - if iteration: - iterationId = iteration['id'] - description += f"/iteration *iteration:{str(iterationId)} " - - if estimated_time: - description += f"\n/estimate {estimated_time}m " - - issue_command.extend(["-f", f'description={description}']) - - issue_output = subprocess.check_output(issue_command) - return json.loads(issue_output.decode()) - -def select_milestone(milestones): - milestones = [t['title'] for t in milestones] - questions = [ - inquirer.List('milestones', - message="Select milestone:", - choices=milestones, - ), - ] - answer = inquirer.prompt(questions) - return answer['milestones'] - -def getSelectedMilestone(milestone, milestones): - return next((t for t in milestones if t['title'] == milestone), None) - -def get_milestone(manual): - if manual: - milestones = list_milestones() - return getSelectedMilestone(select_milestone(milestones), milestones) - milestone = list_milestones(True) # select active for today - return milestone - -def get_iteration(manual): - if manual: - iterations = list_iterations() - return getSelectedIteration(select_iteration(iterations), iterations) - return getActiveIteration() - -def getSelectedIteration(iteration, iterations): - return next((t for t in iterations if t['start_date'] + ' - ' + t['due_date'] == iteration), None) - -def select_iteration(iterations): - iterations = [t['start_date'] + ' - ' + t['due_date'] for t in iterations] - questions = [ - inquirer.List('iterations', - message="Select iteration:", - choices=iterations, - ), - ] - answer = inquirer.prompt(questions) - return answer['iterations'] - -def list_iterations(): - cmd = f'glab api /groups/{GROUP_ID}/iterations?state=opened' - result = subprocess.run(cmd.split(), stdout=subprocess.PIPE) - iterations = json.loads(result.stdout) - return iterations - -def getActiveIteration(): - iterations = list_iterations() - today = datetime.date.today().strftime('%Y-%m-%d') - active_iterations = [] - for iteration in iterations: - start_date = iteration['start_date'] - due_date = iteration['due_date'] - if start_date and due_date and start_date <= today and due_date >= today: - active_iterations.append(iteration) - active_iterations.sort(key=lambda x: x['due_date']) - return active_iterations[0] - -def getAuthorizedUser(): - output = subprocess.check_output(["glab", "api", "/user"]) - return json.loads(output) - -def list_epics(): - cmd = f'glab api /groups/{GROUP_ID}/epics?per_page=1000&state=opened' - result = subprocess.run(cmd.split(), stdout=subprocess.PIPE) - return json.loads(result.stdout) - -def select_epic(epics): - epics = [t['title'] for t in epics] - search_query = inquirer.prompt([ - inquirer.Text('search_query', message='Search epic:'), - ])['search_query'] - - # Filter choices based on search query - filtered_epics = [c for c in epics if search_query.lower() in c.lower()] - questions = [ - inquirer.List('epics', - message="Select epic:", - choices=filtered_epics, - ), - ] - answer = inquirer.prompt(questions) - return answer['epics'] - -def getSelectedEpic(epic, epics): - return next((t for t in epics if t['title'] == epic), None) - -def get_epic(): - epics = list_epics() - return getSelectedEpic(select_epic(epics), epics) - -def create_branch(project_id, issue): - issueId = str(issue['iid']) - title = re.sub('\\s+', '-', issue['title']).lower() - title = issueId + '-' + title.replace(':','').replace('(',' ').replace(')', '').replace(' ','-') - branch_output = subprocess.check_output(["glab", "api", f"/projects/{str(project_id)}/repository/branches", "-f", f'branch={title}', "-f", f'ref={MAIN_BRANCH}', "-f", f'issue_iid={issueId}']) - return json.loads(branch_output.decode()) - -def create_merge_request(project_id, branch, issue, labels, milestoneId): - issueId = str(issue['iid']) - branch = branch['name'] - title = issue['title'] - assignee_id = getAuthorizedUser()['id'] - labels = ",".join(labels) if type(labels) == list else labels - merge_request_command = [ - "glab", "api", - f"/projects/{str(project_id)}/merge_requests", - "-f", f'title={title}', - "-f", f'description="Closes #{issueId}"', - "-f", f'source_branch={branch}', - "-f", f'target_branch={MAIN_BRANCH}', - "-f", f'issue_iid={issueId}', - "-f", f'assignee_ids={assignee_id}' - ] - - if SQUASH_COMMITS: - merge_request_command.append("-f") - merge_request_command.append("squash=true") - - if DELETE_BRANCH: - merge_request_command.append("-f") - merge_request_command.append("remove_source_branch=true") - - if labels: - merge_request_command.append("-f") - merge_request_command.append(f'labels={labels}') - - if milestoneId: - merge_request_command.append("-f") - merge_request_command.append(f'milestone_id={str(milestoneId)}') - - mr_output = subprocess.check_output(merge_request_command) - return json.loads(mr_output.decode()) - -def startIssueCreation(project_id, title, milestone, epic, iteration, selectedSettings, onlyIssue): - # Prompt for estimated time - estimated_time = inquirer.prompt([ - inquirer.Text('estimated_time', - message='Estimated time to complete this issue (in minutes, optional)', - validate=lambda _, x: x == '' or x.isdigit()) - ])['estimated_time'] - - # If multiple project IDs, split the estimated time - if isinstance(project_id, list): - estimated_time_per_project = int(estimated_time) / len(project_id) if estimated_time else None - else: - estimated_time_per_project = estimated_time - - # Modify settings to include estimated time - if estimated_time_per_project: - selectedSettings = selectedSettings.copy() if selectedSettings else {} - selectedSettings['estimated_time'] = int(estimated_time_per_project) - - createdIssue = createIssue(title, project_id, milestone, epic, iteration, selectedSettings) - print(f"Issue #{createdIssue['iid']}: {createdIssue['title']} created.") - - if onlyIssue: - return createdIssue - - createdBranch = create_branch(project_id, createdIssue) - - createdMergeRequest = create_merge_request(project_id, createdBranch, createdIssue, selectedSettings.get('labels'), milestone) - print(f"Merge request #{createdMergeRequest['iid']}: {createdMergeRequest['title']} created.") - - print("Run:") - print(" git fetch origin") - print(f" git checkout -b '{createdMergeRequest['source_branch']}' 'origin/{createdMergeRequest['source_branch']}'") - print("to switch to new branch.") - - return createdIssue - -def getCurrentBranch(): - return subprocess.check_output(['git', 'rev-parse', '--abbrev-ref', 'HEAD'], text=True).strip() - -def openMergeRequestInBrowser(): - try: - merge_request_id = getActiveMergeRequestId() - remote_url = subprocess.check_output(["git", "config", "--get", "remote.origin.url"], text=True).strip() - url = BASE_URL + '/' + remote_url.split(':')[1][:-4] - webbrowser.open(f"{url}/-/merge_requests/{merge_request_id}") - except subprocess.CalledProcessError: - return None - -def getActiveMergeRequestId(): - branch_to_find = getCurrentBranch() - return find_merge_request_id_by_branch(branch_to_find) - -def find_merge_request_id_by_branch(branch_name): - return getMergeRequestForBranch(branch_name)['iid'] - -def getMergeRequestForBranch(branchName): - project_id = get_project_id() - api_url = f"{API_URL}/projects/{project_id}/merge_requests" - headers = {"Private-Token": GITLAB_TOKEN} - - params = { - "source_branch": branchName, - } - - response = requests.get(api_url, headers=headers, params=params) - if response.status_code == 200: - merge_requests = response.json() - for mr in merge_requests: - if mr["source_branch"] == branchName: - return mr - else: - print(f"Failed to fetch Merge Requests: {response.status_code} - {response.text}") - return None - -def chooseReviewersManually(): - """Prompt the user to select reviewers manually from the available list, showing names.""" - # Fetch user details for each reviewer ID - reviewer_choices = [] - for reviewer_id in REVIEWERS: - api_url = f"{API_URL}/users/{reviewer_id}" - headers = {"Private-Token": GITLAB_TOKEN} - try: - response = requests.get(api_url, headers=headers) - if response.status_code == 200: - user = response.json() - display_name = f"{user.get('name')} ({user.get('username')})" - reviewer_choices.append((display_name, reviewer_id)) - else: - reviewer_choices.append((str(reviewer_id), reviewer_id)) - except Exception: - reviewer_choices.append((str(reviewer_id), reviewer_id)) - - questions = [ - inquirer.Checkbox( - "selected_reviewers", - message="Select reviewers", - choices=[(name, str(rid)) for name, rid in reviewer_choices], - ) - ] - answers = inquirer.prompt(questions) - if answers and "selected_reviewers" in answers: - return [int(r) for r in answers["selected_reviewers"]] - else: - return [] - -def addReviewersToMergeRequest(reviewers=None): - project_id = get_project_id() - mr_id = getActiveMergeRequestId() - api_url = f"{API_URL}/projects/{project_id}/merge_requests/{mr_id}" - headers = {"Private-Token": GITLAB_TOKEN} - - data = { - "reviewer_ids": reviewers if reviewers is not None else REVIEWERS - } - - requests.put(api_url, headers=headers, json=data) - -def setMergeRequestToAutoMerge(): - project_id = get_project_id() - mr_id = getActiveMergeRequestId() - api_url = f"{API_URL}/projects/{project_id}/merge_requests/{mr_id}/merge" - headers = {"Private-Token": GITLAB_TOKEN} - - data = { - "id": project_id, - "merge_request_iid": mr_id, - "should_remove_source_branch": True, - "merge_when_pipeline_succeeds": True, - "auto_merge_strategy": "merge_when_pipeline_succeeds", - } - - requests.put(api_url, headers=headers, json=data) - -def getMainBranch(): - command = "git symbolic-ref refs/remotes/origin/HEAD | sed 's@^refs/remotes/origin/@@'" - output = subprocess.check_output(command, shell=True, stderr=subprocess.STDOUT, universal_newlines=True) - return output.strip() - - -def get_two_weeks_commits(return_output=False): - two_weeks_ago = (datetime.datetime.now() - datetime.timedelta(weeks=2)).strftime('%Y-%m-%d') - - cmd = f'git log --since={two_weeks_ago} --format="%ad - %ae - %s" --date=short | grep -v "Merge branch"' - if (DEVELOPER_EMAIL): - cmd = f'{cmd} | grep {DEVELOPER_EMAIL}' - try: - output = subprocess.check_output(cmd, shell=True, text=True, stderr=subprocess.DEVNULL, universal_newlines=True).strip() - if output: - if return_output: - return output - print(output) - else: - print("No commits found.") - return "" if return_output else None - except subprocess.CalledProcessError as e: - print(f"No commits were found or an error occurred. (exit status {e.returncode})") - return "" if return_output else None - except FileNotFoundError: - print("Git is not installed or not found in PATH.") - return "" if return_output else None - -def generate_smart_summary(): - commits = get_two_weeks_commits(return_output=True) - if not commits: - return - - # Check if OpenAI API key is set - openai_api_key = config.get('DEFAULT', 'OPENAI_API_KEY', fallback=None) - if not openai_api_key: - print("OpenAI API key not set. Skipping AI summary generation.") - return - - # Dynamically import openai only if API key is present - try: - import openai - except ImportError: - print("OpenAI package not installed. Please install it using: pip install openai") - return - - openai.api_key = openai_api_key - - try: - response = openai.chat.completions.create( - model="gpt-3.5-turbo", - messages=[ - {"role": "system", "content": "You are a helpful assistant that summarizes git commits. Provide a concise, well-organized summary of the main changes and themes."}, - {"role": "user", "content": f"Please summarize these git commits in a clear, bulleted format:\n\n{commits}"} - ] - ) - - print("\nšŸ“‹ AI-Generated Summary of Recent Changes:\n") - print(response.choices[0].message.content) - except Exception as e: - print(f"Error generating AI summary: {e}") - -def process_report(text, minutes): - # Get the incident project ID from config - try: - incident_project_id = config.get('DEFAULT', 'incident_project_id') - except (configparser.NoOptionError, configparser.NoSectionError): - print("Error: incident_project_id not found in config.ini") - print("Please add your incident project ID to configs/config.ini under [DEFAULT] section:") - print("incident_project_id = your_project_id_here") - return - - issue_title = f"Incident Report: {text}" - - selected_label = selectLabels('Department') - - incident_settings = { - 'labels': ['incident', 'report'], - 'onlyIssue': True, - 'type': 'incident' - } - - if selected_label: - incident_settings['labels'].append(selected_label) - - try: - # Create the incident issue - iteration = getActiveIteration() - created_issue = createIssue(issue_title, incident_project_id, False, False, iteration, incident_settings) - issue_iid = created_issue['iid'] - - closeOpenedIssue(issue_iid, incident_project_id) - print(f"Incident issue #{issue_iid} created successfully.") - print(f"Title: {issue_title}") - - # Add time tracking to the issue - time_tracking_command = [ - "glab", "api", - f"/projects/{incident_project_id}/issues/{issue_iid}/add_spent_time", - "-f", f"duration={minutes}m" - ] - - try: - subprocess.run(time_tracking_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - print(f"Added {minutes} minutes to issue time tracking.") - except subprocess.CalledProcessError as e: - print(f"Error adding time tracking: {str(e)}") - - except Exception as e: - print(f"Error creating incident issue: {str(e)}") - -def closeOpenedIssue(issue_iid, project_id): - issue_command = [ - "glab", "api", - f"/projects/{project_id}/issues/{issue_iid}", - '-X', 'PUT', - '-f', 'state_event=close' - ] - try: - subprocess.run(issue_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - except subprocess.CalledProcessError as e: - print(f"Error closing issue: {str(e)}") - -def selectLabels(search, multiple = False): - labels = getLabelsOfGroup(search) - labels = sorted([t['name'] for t in labels]) - - question_type = inquirer.Checkbox if multiple else inquirer.List - questions = [ - question_type( - 'labels', - message="Select one or more department labels:", - choices=labels, - ), - ] - answer = inquirer.prompt(questions) - return answer['labels'] - -def getLabelsOfGroup(search=''): - cmd = f'glab api /groups/{GROUP_ID}/labels?search={search}' - try: - result = subprocess.run(cmd.split(), stdout=subprocess.PIPE, check=True) - return json.loads(result.stdout) - except subprocess.CalledProcessError as e: - print(f"Error getting labels: {str(e)}") - return [] - -def getCurrentIssueId(): - mr = getMergeRequestForBranch(getCurrentBranch()) - return mr['description'].replace('"','').replace('#','').split()[1] - -def track_issue_time(): - # Get the current merge request - try: - project_id = get_project_id() - issue_id = getCurrentIssueId() - except Exception as e: - print(f"Error getting issue details: {str(e)}") - return - - # Prompt for actual time spent - spent_time = inquirer.prompt([ - inquirer.Text('spent_time', - message='How many minutes did you actually spend on this issue?', - validate=lambda _, x: x.isdigit()) - ])['spent_time'] - - # Add spent time to the issue description - time_tracking_command = [ - "glab", "api", - f"/projects/{project_id}/issues/{issue_id}/notes", - "-f", f"body=/spend {spent_time}m" - ] - - try: - subprocess.run(time_tracking_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True) - print(f"Added {spent_time} minutes to issue {issue_id} time tracking.") - except subprocess.CalledProcessError as e: - print(f"Error adding time tracking: {str(e)}") - except Exception as e: - print(f"Error tracking issue time: {str(e)}") - -def get_last_production_deploy(): - try: - project_id = get_project_id() - api_url = f"{API_URL}/projects/{project_id}/pipelines" - headers = {"Private-Token": GITLAB_TOKEN} - - # Set up parameters for the pipeline search - params = { - "per_page": 50, - "order_by": "updated_at", - "sort": "desc" - } - - # Add ref filter if specified in config - if MAIN_BRANCH: - params["ref"] = MAIN_BRANCH - else: - # Use main branch if no specific ref is configured - try: - main_branch = getMainBranch() - params["ref"] = main_branch - except: - # Fallback to common main branch names - params["ref"] = "main" - - response = requests.get(api_url, headers=headers, params=params) - - if response.status_code != 200: - print(f"Failed to fetch pipelines: {response.status_code} - {response.text}") - return - - pipelines = response.json() - production_pipeline = None - - # Look for production pipeline by name pattern - for pipeline in pipelines: - # Get pipeline details to check jobs - pipeline_detail_url = f"{API_URL}/projects/{project_id}/pipelines/{pipeline['id']}/jobs" - detail_response = requests.get(pipeline_detail_url, headers=headers) - - if detail_response.status_code == 200: - jobs = detail_response.json() - - # Check if this pipeline contains production deployment - for job in jobs: - job_name = job.get('name', '') - stage = job.get('stage', '') - job_status = job.get('status', '').lower() - - # Only consider successful jobs - if job_status != 'success': - continue - - # Check project-specific mapping first - project_mapping = PRODUCTION_MAPPINGS.get(str(project_id)) - if project_mapping: - expected_stage = project_mapping.get('stage', '').lower() - expected_job = project_mapping.get('job', '').lower() - - if (stage.lower() == expected_stage or - (expected_job and job_name.lower() == expected_job)): - production_pipeline = { - 'pipeline': pipeline, - 'production_job': job - } - break - else: - print('Didn\'t find deployment pipeline') - - if production_pipeline: - break - - if not production_pipeline: - print(f"No production deployment found matching pattern") - return - - # Display the results - pipeline = production_pipeline['pipeline'] - job = production_pipeline['production_job'] - - print(f"šŸš€ Last Production Deployment:") - print(f" Pipeline: #{pipeline['id']} - {pipeline['status']}") - print(f" Job: {job['name']} ({job['status']})") - print(f" Branch/Tag: {pipeline['ref']}") - print(f" Started: {job.get('started_at', 'N/A')}") - print(f" Finished: {job.get('finished_at', 'N/A')}") - print(f" Duration: {job.get('duration', 'N/A')} seconds" if job.get('duration') else " Duration: N/A") - print(f" Commit: {pipeline['sha'][:8]}") - print(f" URL: {pipeline['web_url']}") - - # Show time since deployment - if job.get('finished_at'): - try: - finished_time = datetime.datetime.fromisoformat(job['finished_at'].replace('Z', '+00:00')) - time_diff = datetime.datetime.now(datetime.timezone.utc) - finished_time - - if time_diff.days > 0: - print(f" ā° {time_diff.days} days ago") - elif time_diff.seconds > 3600: - hours = time_diff.seconds // 3600 - print(f" ā° {hours} hours ago") - else: - minutes = time_diff.seconds // 60 - print(f" ā° {minutes} minutes ago") - except: - pass - - except Exception as e: - print(f"Error fetching last production deploy: {str(e)}") - -def main(): - global MAIN_BRANCH - - parser = argparse.ArgumentParser("Argument description of Git happens") - parser.add_argument("title", nargs="+", help="Title of issue") - parser.add_argument(f"--project_id", type=str, help="Id or URL-encoded path of project") - parser.add_argument("-m", "--milestone", action='store_true', help="Add this flag, if you want to manually select milestone") - parser.add_argument("--no_epic", action="store_true", help="Add this flag if you don't want to pick epic") - parser.add_argument("--no_milestone", action="store_true", help="Add this flag if you don't want to pick milestone") - parser.add_argument("--no_iteration", action="store_true", help="Add this flag if you don't want to pick iteration") - parser.add_argument("--only_issue", action="store_true", help="Add this flag if you don't want to create merge request and branch alongside issue") - parser.add_argument("-am", "--auto_merge", action="store_true", help="Add this flag to review if you want to set merge request to auto merge when pipeline succeeds") - parser.add_argument("--select", action="store_true", help="Manually select reviewers for merge request (interactive)") - - # If no arguments passed, show help - if len(sys.argv) <= 1: - parser.print_help() - exit(1) - - args = parser.parse_args() - if args.title[0] == 'report': - parts = args.title - if len(parts) != 3: - print("Invalid report format. Use: gh report \"text\" minutes") - return - - text = parts[1] - try: - minutes = int(parts[2].strip()) - process_report(text, minutes) - except ValueError: - print("Invalid minutes. Please provide a valid number.") - return - - # So it takes all text until first known argument - title = " ".join(args.title) - - if title == 'open': - openMergeRequestInBrowser() - return - elif title == 'review': - track_issue_time() - reviewers = None - if getattr(args, "select", False): - reviewers = chooseReviewersManually() - addReviewersToMergeRequest(reviewers=reviewers) - - # Run AI code review and post to MR - try: - from ai_code_review import run_review_for_mr - project_id = get_project_id() - mr_id = getActiveMergeRequestId() - run_review_for_mr(project_id, mr_id, GITLAB_TOKEN, API_URL) - except Exception as e: - print(f"AI review skipped: {e}") - - if(args.auto_merge): - setMergeRequestToAutoMerge() - return - elif title == 'summary': - get_two_weeks_commits() - return - elif title == 'summaryAI': - generate_smart_summary() - return - elif title == 'last deploy': - get_last_production_deploy() - return - elif title == 'ai review': - from ai_code_review import run_review - run_review() - return - - # Get settings for issue from template - selectedSettings = getIssueSettings(select_template()) - - # If template is False, ask for each settings - if not len(selectedSettings): - print('Custom selection of issue settings is not supported yet') - pass - - if args.project_id and selectedSettings.get('projectIds'): - print('NOTE: Overwriting project id from argument...') - - project_id = selectedSettings.get('projectIds') or args.project_id or get_project_id() - - milestone = False - if not args.no_milestone: - milestone = get_milestone(args.milestone)['id'] - - iteration = False - if not args.no_iteration: - # manual pick iteration - iteration = get_iteration(True) - - epic = False - if not args.no_epic: - epic = get_epic() - - MAIN_BRANCH = getMainBranch() - - onlyIssue = selectedSettings.get('onlyIssue') or args.only_issue - - if type(project_id) == list: - for id in project_id: - startIssueCreation(id, title, milestone, epic, iteration, selectedSettings, onlyIssue) - else: - startIssueCreation(project_id, title, milestone, epic, iteration, selectedSettings, onlyIssue) - -if __name__ == '__main__': - main() \ No newline at end of file +if __name__ == "__main__": + main() diff --git a/git_utils.py b/git_utils.py new file mode 100644 index 0000000..279cf57 --- /dev/null +++ b/git_utils.py @@ -0,0 +1,73 @@ +import datetime +import subprocess + +from config import CONFIG + + +def get_project_link_from_current_dir(): + try: + result = subprocess.run( + ["git", "remote", "get-url", "origin"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + if result.returncode == 0: + return result.stdout.decode("utf-8").strip() + return -1 + except FileNotFoundError: + return -1 + + +def get_current_branch(): + return subprocess.check_output( + ["git", "rev-parse", "--abbrev-ref", "HEAD"], + text=True, + ).strip() + + +def get_main_branch(): + command = "git symbolic-ref refs/remotes/origin/HEAD | sed 's@^refs/remotes/origin/@@'" + output = subprocess.check_output( + command, + shell=True, + stderr=subprocess.STDOUT, + universal_newlines=True, + ) + return output.strip() + + +def get_two_weeks_commits(return_output=False, developer_email=None): + two_weeks_ago = (datetime.datetime.now() - datetime.timedelta(weeks=2)).strftime("%Y-%m-%d") + email = developer_email if developer_email is not None else CONFIG.developer_email + + cmd = f'git log --since={two_weeks_ago} --format="%ad - %ae - %s" --date=short | grep -v "Merge branch"' + if email: + cmd = f"{cmd} | grep {email}" + + try: + output = subprocess.check_output( + cmd, + shell=True, + text=True, + stderr=subprocess.DEVNULL, + universal_newlines=True, + ).strip() + if output: + if return_output: + return output + print(output) + else: + print("No commits found.") + return "" if return_output else None + except subprocess.CalledProcessError as error: + print(f"No commits were found or an error occurred. (exit status {error.returncode})") + return "" if return_output else None + except FileNotFoundError: + print("Git is not installed or not found in PATH.") + return "" if return_output else None + + +# Backwards-compatible names for older imports/scripts. +getProjectLinkFromCurrentDir = get_project_link_from_current_dir +getCurrentBranch = get_current_branch +getMainBranch = get_main_branch diff --git a/gitlab_api.py b/gitlab_api.py new file mode 100644 index 0000000..8d2a801 --- /dev/null +++ b/gitlab_api.py @@ -0,0 +1,326 @@ +import datetime +import json +import re +import subprocess + +import requests + +from config import CONFIG, REVIEWERS +from git_utils import get_current_branch, get_project_link_from_current_dir + + +MAIN_BRANCH = "master" + + +def set_main_branch(branch_name): + global MAIN_BRANCH + MAIN_BRANCH = branch_name + + +def enter_project_id(): + from interactive import enter_project_id as prompt_enter_project_id + + return prompt_enter_project_id() + + +def get_project_id(): + project_link = get_project_link_from_current_dir() + if project_link == -1: + return enter_project_id() + + all_projects = get_all_projects(project_link) + matching_id = None + for project in all_projects or []: + if project.get("ssh_url_to_repo") == project_link: + matching_id = project.get("id") + break + return matching_id + + +def get_all_projects(project_link): + url = CONFIG.api_url + "/projects?membership=true&search=" + project_link.split("/")[-1].split(".")[0] + headers = {"PRIVATE-TOKEN": CONFIG.gitlab_token} + response = requests.get(url, headers=headers) + + if response.status_code == 200: + return response.json() + if response.status_code == 401: + print("Error: Unauthorized (401). Your GitLab token is probably expired, invalid, or missing required permissions.") + print("Please generate a new token and update your configs/config.ini.") + exit(1) + + print(f"Request failed with status code {response.status_code}") + return None + + +def list_milestones(current=False): + result = subprocess.run( + ["glab", "api", f"/groups/{CONFIG.group_id}/milestones?state=active"], + stdout=subprocess.PIPE, + ) + milestones = json.loads(result.stdout) + if current: + today = datetime.date.today().strftime("%Y-%m-%d") + active_milestones = [ + milestone + for milestone in milestones + if milestone["start_date"] + and milestone["due_date"] + and milestone["start_date"] <= today <= milestone["due_date"] + ] + active_milestones.sort(key=lambda item: item["due_date"]) + return active_milestones[0] + return milestones + + +def list_iterations(): + result = subprocess.run( + ["glab", "api", f"/groups/{CONFIG.group_id}/iterations?state=opened"], + stdout=subprocess.PIPE, + ) + return json.loads(result.stdout) + + +def get_active_iteration(): + iterations = list_iterations() + today = datetime.date.today().strftime("%Y-%m-%d") + active_iterations = [ + iteration + for iteration in iterations + if iteration["start_date"] + and iteration["due_date"] + and iteration["start_date"] <= today <= iteration["due_date"] + ] + active_iterations.sort(key=lambda item: item["due_date"]) + return active_iterations[0] + + +def list_epics(): + result = subprocess.run( + ["glab", "api", f"/groups/{CONFIG.group_id}/epics?per_page=1000&state=opened"], + stdout=subprocess.PIPE, + ) + return json.loads(result.stdout) + + +def get_authorized_user(): + output = subprocess.check_output(["glab", "api", "/user"]) + return json.loads(output) + + +def create_issue(title, project_id, milestone_id, epic, iteration, settings): + if settings: + issue_type = settings.get("type") or "issue" + return execute_issue_create( + project_id, + title, + settings.get("labels"), + milestone_id, + epic, + iteration, + settings.get("weight"), + settings.get("estimated_time"), + issue_type, + ) + print("No settings in template") + exit(2) + + +def execute_issue_create( + project_id, + title, + labels, + milestone_id, + epic, + iteration, + weight, + estimated_time, + issue_type="issue", +): + labels = ",".join(labels) if type(labels) == list else labels + assignee_id = get_authorized_user()["id"] + issue_command = [ + "glab", + "api", + f"/projects/{str(project_id)}/issues", + "-f", + f"title={title}", + "-f", + f"assignee_ids={assignee_id}", + "-f", + f"issue_type={issue_type}", + ] + + if labels: + issue_command.extend(["-f", f"labels={labels}"]) + if weight: + issue_command.extend(["-f", f"weight={str(weight)}"]) + if milestone_id: + issue_command.extend(["-f", f"milestone_id={str(milestone_id)}"]) + if epic: + issue_command.extend(["-f", f"epic_id={str(epic['id'])}"]) + + description = "" + if iteration: + description += f"/iteration *iteration:{str(iteration['id'])} " + if estimated_time: + description += f"\n/estimate {estimated_time}m " + + issue_command.extend(["-f", f"description={description}"]) + issue_output = subprocess.check_output(issue_command) + return json.loads(issue_output.decode()) + + +def create_branch(project_id, issue): + issue_id = str(issue["iid"]) + title = re.sub("\\s+", "-", issue["title"]).lower() + title = issue_id + "-" + title.replace(":", "").replace("(", " ").replace(")", "").replace(" ", "-") + branch_output = subprocess.check_output( + [ + "glab", + "api", + f"/projects/{str(project_id)}/repository/branches", + "-f", + f"branch={title}", + "-f", + f"ref={MAIN_BRANCH}", + "-f", + f"issue_iid={issue_id}", + ] + ) + return json.loads(branch_output.decode()) + + +def create_merge_request(project_id, branch, issue, labels, milestone_id): + issue_id = str(issue["iid"]) + branch = branch["name"] + title = issue["title"] + assignee_id = get_authorized_user()["id"] + labels = ",".join(labels) if type(labels) == list else labels + merge_request_command = [ + "glab", + "api", + f"/projects/{str(project_id)}/merge_requests", + "-f", + f"title={title}", + "-f", + f'description="Closes #{issue_id}"', + "-f", + f"source_branch={branch}", + "-f", + f"target_branch={MAIN_BRANCH}", + "-f", + f"issue_iid={issue_id}", + "-f", + f"assignee_ids={assignee_id}", + ] + + if CONFIG.squash_commits: + merge_request_command.extend(["-f", "squash=true"]) + if CONFIG.delete_branch_after_merge: + merge_request_command.extend(["-f", "remove_source_branch=true"]) + if labels: + merge_request_command.extend(["-f", f"labels={labels}"]) + if milestone_id: + merge_request_command.extend(["-f", f"milestone_id={str(milestone_id)}"]) + + mr_output = subprocess.check_output(merge_request_command) + return json.loads(mr_output.decode()) + + +def get_merge_request_for_branch(branch_name): + project_id = get_project_id() + api_url = f"{CONFIG.api_url}/projects/{project_id}/merge_requests" + headers = {"Private-Token": CONFIG.gitlab_token} + response = requests.get(api_url, headers=headers, params={"source_branch": branch_name}) + + if response.status_code == 200: + for merge_request in response.json(): + if merge_request["source_branch"] == branch_name: + return merge_request + else: + print(f"Failed to fetch Merge Requests: {response.status_code} - {response.text}") + return None + + +def find_merge_request_id_by_branch(branch_name): + return get_merge_request_for_branch(branch_name)["iid"] + + +def get_active_merge_request_id(): + return find_merge_request_id_by_branch(get_current_branch()) + + +def add_reviewers_to_merge_request(reviewers=None): + project_id = get_project_id() + mr_id = get_active_merge_request_id() + api_url = f"{CONFIG.api_url}/projects/{project_id}/merge_requests/{mr_id}" + headers = {"Private-Token": CONFIG.gitlab_token} + data = {"reviewer_ids": reviewers if reviewers is not None else REVIEWERS} + requests.put(api_url, headers=headers, json=data) + + +def set_merge_request_to_auto_merge(): + project_id = get_project_id() + mr_id = get_active_merge_request_id() + api_url = f"{CONFIG.api_url}/projects/{project_id}/merge_requests/{mr_id}/merge" + headers = {"Private-Token": CONFIG.gitlab_token} + data = { + "id": project_id, + "merge_request_iid": mr_id, + "should_remove_source_branch": True, + "merge_when_pipeline_succeeds": True, + "auto_merge_strategy": "merge_when_pipeline_succeeds", + } + requests.put(api_url, headers=headers, json=data) + + +def close_opened_issue(issue_iid, project_id): + issue_command = [ + "glab", + "api", + f"/projects/{project_id}/issues/{issue_iid}", + "-X", + "PUT", + "-f", + "state_event=close", + ] + try: + subprocess.run(issue_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + except subprocess.CalledProcessError as error: + print(f"Error closing issue: {str(error)}") + + +def get_labels_of_group(search=""): + try: + result = subprocess.run( + ["glab", "api", f"/groups/{CONFIG.group_id}/labels?search={search}"], + stdout=subprocess.PIPE, + check=True, + ) + return json.loads(result.stdout) + except subprocess.CalledProcessError as error: + print(f"Error getting labels: {str(error)}") + return [] + + +# Backwards-compatible names. +get_project_id = get_project_id +get_all_projects = get_all_projects +enterProjectId = enter_project_id +list_milestones = list_milestones +list_iterations = list_iterations +getActiveIteration = get_active_iteration +getAuthorizedUser = get_authorized_user +list_epics = list_epics +createIssue = create_issue +executeIssueCreate = execute_issue_create +create_branch = create_branch +create_merge_request = create_merge_request +getMergeRequestForBranch = get_merge_request_for_branch +find_merge_request_id_by_branch = find_merge_request_id_by_branch +getActiveMergeRequestId = get_active_merge_request_id +addReviewersToMergeRequest = add_reviewers_to_merge_request +setMergeRequestToAutoMerge = set_merge_request_to_auto_merge +closeOpenedIssue = close_opened_issue +getLabelsOfGroup = get_labels_of_group diff --git a/interactive.py b/interactive.py new file mode 100644 index 0000000..48cd225 --- /dev/null +++ b/interactive.py @@ -0,0 +1,192 @@ +import requests + +from config import CONFIG, REVIEWERS +from gitlab_api import ( + get_active_iteration, + get_labels_of_group, + list_epics, + list_iterations, + list_milestones, +) + + +def enter_project_id(): + while True: + project_id = input("Please enter the ID of your GitLab project: ") + if project_id: + return project_id + exit("Invalid project ID.") + + +def select_milestone(milestones): + import inquirer + + milestone_names = [milestone["title"] for milestone in milestones] + questions = [ + inquirer.List( + "milestones", + message="Select milestone:", + choices=milestone_names, + ), + ] + answer = inquirer.prompt(questions) + return answer["milestones"] + + +def get_selected_milestone(milestone, milestones): + return next((item for item in milestones if item["title"] == milestone), None) + + +def get_milestone(manual): + if manual: + milestones = list_milestones() + return get_selected_milestone(select_milestone(milestones), milestones) + return list_milestones(True) + + +def select_iteration(iterations): + import inquirer + + iteration_names = [item["start_date"] + " - " + item["due_date"] for item in iterations] + questions = [ + inquirer.List( + "iterations", + message="Select iteration:", + choices=iteration_names, + ), + ] + answer = inquirer.prompt(questions) + return answer["iterations"] + + +def get_selected_iteration(iteration, iterations): + return next( + (item for item in iterations if item["start_date"] + " - " + item["due_date"] == iteration), + None, + ) + + +def get_iteration(manual): + if manual: + iterations = list_iterations() + return get_selected_iteration(select_iteration(iterations), iterations) + return get_active_iteration() + + +def select_epic(epics): + import inquirer + + epic_names = [epic["title"] for epic in epics] + search_query = inquirer.prompt( + [inquirer.Text("search_query", message="Search epic:")] + )["search_query"] + filtered_epics = [choice for choice in epic_names if search_query.lower() in choice.lower()] + questions = [ + inquirer.List( + "epics", + message="Select epic:", + choices=filtered_epics, + ), + ] + answer = inquirer.prompt(questions) + return answer["epics"] + + +def get_selected_epic(epic, epics): + return next((item for item in epics if item["title"] == epic), None) + + +def get_epic(): + epics = list_epics() + return get_selected_epic(select_epic(epics), epics) + + +def choose_reviewers_manually(): + import inquirer + + reviewer_choices = [] + for reviewer_id in REVIEWERS: + api_url = f"{CONFIG.api_url}/users/{reviewer_id}" + headers = {"Private-Token": CONFIG.gitlab_token} + try: + response = requests.get(api_url, headers=headers) + if response.status_code == 200: + user = response.json() + display_name = f"{user.get('name')} ({user.get('username')})" + reviewer_choices.append((display_name, reviewer_id)) + else: + reviewer_choices.append((str(reviewer_id), reviewer_id)) + except Exception: + reviewer_choices.append((str(reviewer_id), reviewer_id)) + + questions = [ + inquirer.Checkbox( + "selected_reviewers", + message="Select reviewers", + choices=[(name, str(reviewer_id)) for name, reviewer_id in reviewer_choices], + ) + ] + answers = inquirer.prompt(questions) + if answers and "selected_reviewers" in answers: + return [int(reviewer) for reviewer in answers["selected_reviewers"]] + return [] + + +def select_labels(search, multiple=False): + import inquirer + + labels = get_labels_of_group(search) + label_names = sorted([label["name"] for label in labels]) + question_type = inquirer.Checkbox if multiple else inquirer.List + questions = [ + question_type( + "labels", + message="Select one or more department labels:", + choices=label_names, + ), + ] + answer = inquirer.prompt(questions) + return answer["labels"] + + +def prompt_estimated_time(): + import inquirer + + return inquirer.prompt( + [ + inquirer.Text( + "estimated_time", + message="Estimated time to complete this issue (in minutes, optional)", + validate=lambda _, value: value == "" or value.isdigit(), + ) + ] + )["estimated_time"] + + +def prompt_spent_time(): + import inquirer + + return inquirer.prompt( + [ + inquirer.Text( + "spent_time", + message="How many minutes did you actually spend on this issue?", + validate=lambda _, value: value.isdigit(), + ) + ] + )["spent_time"] + + +# Backwards-compatible names. +enterProjectId = enter_project_id +select_milestone = select_milestone +getSelectedMilestone = get_selected_milestone +get_milestone = get_milestone +select_iteration = select_iteration +getSelectedIteration = get_selected_iteration +get_iteration = get_iteration +select_epic = select_epic +getSelectedEpic = get_selected_epic +get_epic = get_epic +chooseReviewersManually = choose_reviewers_manually +selectLabels = select_labels diff --git a/main.py b/main.py new file mode 100644 index 0000000..8f96d72 --- /dev/null +++ b/main.py @@ -0,0 +1,107 @@ +import argparse +import sys + +from commands.create_issue import start_issue_creation +from commands.deploy import get_last_production_deploy +from commands.open_mr import open_merge_request_in_browser +from commands.report import process_report +from commands.review import run_review_workflow +from commands.summary import generate_smart_summary +from git_utils import get_main_branch, get_two_weeks_commits +from gitlab_api import get_project_id, set_main_branch +from interactive import get_epic, get_iteration, get_milestone +from templates import get_issue_settings, select_template + + +def build_parser(): + parser = argparse.ArgumentParser("Argument description of Git happens") + parser.add_argument("title", nargs="+", help="Title of issue") + parser.add_argument("--project_id", type=str, help="Id or URL-encoded path of project") + parser.add_argument("-m", "--milestone", action="store_true", help="Add this flag, if you want to manually select milestone") + parser.add_argument("--no_epic", action="store_true", help="Add this flag if you don't want to pick epic") + parser.add_argument("--no_milestone", action="store_true", help="Add this flag if you don't want to pick milestone") + parser.add_argument("--no_iteration", action="store_true", help="Add this flag if you don't want to pick iteration") + parser.add_argument("--only_issue", action="store_true", help="Add this flag if you don't want to create merge request and branch alongside issue") + parser.add_argument("-am", "--auto_merge", action="store_true", help="Add this flag to review if you want to set merge request to auto merge when pipeline succeeds") + parser.add_argument("--select", action="store_true", help="Manually select reviewers for merge request (interactive)") + return parser + + +def main(argv=None): + argv = sys.argv[1:] if argv is None else argv + parser = build_parser() + + if not argv: + parser.print_help() + exit(1) + + args = parser.parse_args(argv) + if args.title[0] == "report": + parts = args.title + if len(parts) != 3: + print('Invalid report format. Use: gh report "text" minutes') + return + + try: + process_report(parts[1], int(parts[2].strip())) + except ValueError: + print("Invalid minutes. Please provide a valid number.") + return + + title = " ".join(args.title) + + if title == "open": + open_merge_request_in_browser() + return + if title == "review": + run_review_workflow(select_reviewers=getattr(args, "select", False), auto_merge=args.auto_merge) + return + if title == "summary": + get_two_weeks_commits() + return + if title == "summaryAI": + generate_smart_summary() + return + if title == "last deploy": + get_last_production_deploy() + return + if title == "ai review": + from ai_code_review import run_review + + run_review() + return + + selected_settings = get_issue_settings(select_template()) + + if not len(selected_settings): + print("Custom selection of issue settings is not supported yet") + + if args.project_id and selected_settings.get("projectIds"): + print("NOTE: Overwriting project id from argument...") + + project_id = selected_settings.get("projectIds") or args.project_id or get_project_id() + + milestone = False + if not args.no_milestone: + milestone = get_milestone(args.milestone)["id"] + + iteration = False + if not args.no_iteration: + iteration = get_iteration(True) + + epic = False + if not args.no_epic: + epic = get_epic() + + set_main_branch(get_main_branch()) + only_issue = selected_settings.get("onlyIssue") or args.only_issue + + if type(project_id) == list: + for project in project_id: + start_issue_creation(project, title, milestone, epic, iteration, selected_settings, only_issue) + else: + start_issue_creation(project_id, title, milestone, epic, iteration, selected_settings, only_issue) + + +if __name__ == "__main__": + main() diff --git a/templates.py b/templates.py new file mode 100644 index 0000000..8e43222 --- /dev/null +++ b/templates.py @@ -0,0 +1,31 @@ +from config import CONFIG, TEMPLATES + + +def select_template(templates=None, custom_template=None): + import inquirer + + templates = TEMPLATES if templates is None else templates + custom_template = CONFIG.custom_template if custom_template is None else custom_template + template_names = [template["name"] for template in templates] + template_names.append(custom_template) + questions = [ + inquirer.List( + "template", + message="Select template:", + choices=template_names, + ), + ] + answer = inquirer.prompt(questions) + return answer["template"] + + +def get_issue_settings(template_name, templates=None, custom_template=None): + templates = TEMPLATES if templates is None else templates + custom_template = CONFIG.custom_template if custom_template is None else custom_template + if template_name == custom_template: + return {} + return next((template for template in templates if template["name"] == template_name), None) + + +# Backwards-compatible names. +getIssueSettings = get_issue_settings diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e901b5a --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,2 @@ +"""Unit tests for GitHappens modules.""" + diff --git a/tests/test_command_modules.py b/tests/test_command_modules.py new file mode 100644 index 0000000..056e9c2 --- /dev/null +++ b/tests/test_command_modules.py @@ -0,0 +1,106 @@ +import datetime +import io +import unittest +from types import SimpleNamespace +from unittest.mock import Mock, patch + +import gitlab_api +from commands.deploy import get_last_production_deploy +from commands.open_mr import open_merge_request_in_browser +from commands.report import process_report +from commands.review import get_current_issue_id +from commands.summary import generate_smart_summary + + +class CommandModuleTests(unittest.TestCase): + @patch("commands.open_mr.webbrowser.open") + @patch("commands.open_mr.subprocess.check_output", return_value="git@gitlab.com:group/repo.git\n") + @patch("commands.open_mr.get_active_merge_request_id", return_value=12) + def test_open_merge_request_in_browser_builds_gitlab_url(self, _mr_id, _remote, browser_open): + open_merge_request_in_browser() + + browser_open.assert_called_once_with("https://gitlab.com/group/repo/-/merge_requests/12") + + @patch("commands.review.get_current_branch", return_value="feature") + @patch( + "commands.review.get_merge_request_for_branch", + return_value={"description": '"Closes #99"'}, + ) + def test_get_current_issue_id_extracts_issue_from_mr_description(self, _mr, _branch): + self.assertEqual(get_current_issue_id(), "99") + + @patch("commands.summary.get_two_weeks_commits", return_value="") + def test_generate_smart_summary_stops_when_no_commits(self, commits): + generate_smart_summary() + + commits.assert_called_once_with(return_output=True) + + @patch("commands.report.subprocess.run") + @patch("commands.report.close_opened_issue") + @patch("commands.report.create_issue") + @patch("commands.report.get_active_iteration", return_value={"id": 5}) + @patch("commands.report.select_labels", return_value="Department") + def test_process_report_creates_and_closes_incident_issue( + self, + _labels, + _iteration, + create_issue, + close_opened_issue, + _run, + ): + create_issue.return_value = {"iid": 77} + + with patch("commands.report.CONFIG", SimpleNamespace(incident_project_id="123")): + process_report("Something happened", 15) + + create_issue.assert_called_once_with( + "Incident Report: Something happened", + "123", + False, + False, + {"id": 5}, + { + "labels": ["incident", "report", "Department"], + "onlyIssue": True, + "type": "incident", + }, + ) + close_opened_issue.assert_called_once_with(77, "123") + + @patch("commands.deploy.requests.get") + @patch("commands.deploy.get_project_id", return_value=123) + def test_get_last_production_deploy_prints_matching_successful_job(self, _project_id, get): + gitlab_api.set_main_branch("main") + pipeline_response = Mock(status_code=200) + pipeline_response.json.return_value = [ + { + "id": 10, + "status": "success", + "ref": "main", + "sha": "abcdef123456", + "web_url": "https://gitlab.example/pipeline/10", + } + ] + jobs_response = Mock(status_code=200) + jobs_response.json.return_value = [ + { + "name": "production:deploy", + "stage": "deploy", + "status": "success", + "started_at": "2026-01-01T00:00:00Z", + "finished_at": datetime.datetime.now(datetime.timezone.utc).isoformat(), + "duration": 42, + } + ] + get.side_effect = [pipeline_response, jobs_response] + + with patch("commands.deploy.PRODUCTION_MAPPINGS", {"123": {"stage": "deploy", "job": "production:deploy"}}): + with patch("sys.stdout", new_callable=io.StringIO) as stdout: + get_last_production_deploy() + + self.assertIn("Last Production Deployment", stdout.getvalue()) + self.assertIn("Pipeline: #10 - success", stdout.getvalue()) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_commands.py b/tests/test_commands.py new file mode 100644 index 0000000..ded1d45 --- /dev/null +++ b/tests/test_commands.py @@ -0,0 +1,74 @@ +import unittest +from unittest.mock import patch + +from commands.create_issue import start_issue_creation + + +class CommandTests(unittest.TestCase): + @patch("commands.create_issue.create_merge_request") + @patch("commands.create_issue.create_branch") + @patch("commands.create_issue.create_issue") + @patch("commands.create_issue.prompt_estimated_time", return_value="30") + def test_start_issue_creation_only_issue_skips_branch_and_mr( + self, + _prompt, + create_issue, + create_branch, + create_merge_request, + ): + create_issue.return_value = {"iid": 1, "title": "Test"} + + result = start_issue_creation( + project_id=1, + title="Test", + milestone=False, + epic=False, + iteration=False, + selected_settings={"labels": ["Bug"]}, + only_issue=True, + ) + + self.assertEqual(result, {"iid": 1, "title": "Test"}) + create_issue.assert_called_once() + create_branch.assert_not_called() + create_merge_request.assert_not_called() + + @patch("commands.create_issue.create_merge_request") + @patch("commands.create_issue.create_branch") + @patch("commands.create_issue.create_issue") + @patch("commands.create_issue.prompt_estimated_time", return_value="60") + def test_start_issue_creation_splits_estimate_for_multiple_projects( + self, + _prompt, + create_issue, + create_branch, + create_merge_request, + ): + create_issue.return_value = {"iid": 2, "title": "Test"} + create_branch.return_value = {"name": "branch"} + create_merge_request.return_value = {"iid": 3, "title": "Test", "source_branch": "branch"} + settings = {"labels": ["Bug"]} + + start_issue_creation( + project_id=[1, 2], + title="Test", + milestone=False, + epic=False, + iteration=False, + selected_settings=settings, + only_issue=False, + ) + + create_issue.assert_called_once_with( + "Test", + [1, 2], + False, + False, + False, + {"labels": ["Bug"], "estimated_time": 30}, + ) + self.assertEqual(settings, {"labels": ["Bug"]}) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_config.py b/tests/test_config.py new file mode 100644 index 0000000..89f85a0 --- /dev/null +++ b/tests/test_config.py @@ -0,0 +1,50 @@ +import tempfile +import unittest +from pathlib import Path + +from config import load_config, load_template_config + + +class ConfigTests(unittest.TestCase): + def test_load_config_reads_values_and_strips_token_quotes(self): + with tempfile.TemporaryDirectory() as directory: + config_path = Path(directory) / "config.ini" + config_path.write_text( + "\n".join( + [ + "[DEFAULT]", + "base_url=https://gitlab.example.com", + "group_id=42", + "custom_template=Other", + "GITLAB_TOKEN='secret'", + "delete_branch_after_merge=false", + "squash_commits=false", + "developer_email=dev@example.com", + "incident_project_id=99", + ] + ) + ) + + config = load_config(config_path) + + self.assertEqual(config.base_url, "https://gitlab.example.com") + self.assertEqual(config.api_url, "https://gitlab.example.com/api/v4") + self.assertEqual(config.group_id, "42") + self.assertEqual(config.custom_template, "Other") + self.assertEqual(config.gitlab_token, "secret") + self.assertFalse(config.delete_branch_after_merge) + self.assertFalse(config.squash_commits) + self.assertEqual(config.developer_email, "dev@example.com") + self.assertEqual(config.incident_project_id, "99") + + def test_load_template_config_returns_empty_defaults_when_file_is_missing(self): + with tempfile.TemporaryDirectory() as directory: + result = load_template_config(Path(directory) / "missing.json") + + self.assertEqual(result["templates"], []) + self.assertEqual(result["reviewers"], []) + self.assertEqual(result["productionMappings"], {}) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_git_utils.py b/tests/test_git_utils.py new file mode 100644 index 0000000..c53b963 --- /dev/null +++ b/tests/test_git_utils.py @@ -0,0 +1,29 @@ +import subprocess +import unittest +from unittest.mock import Mock, patch + +from git_utils import get_current_branch, get_project_link_from_current_dir + + +class GitUtilsTests(unittest.TestCase): + @patch("git_utils.subprocess.run") + def test_get_project_link_from_current_dir_returns_remote_url(self, run): + run.return_value = Mock(returncode=0, stdout=b"git@gitlab.com:group/repo.git\n") + + self.assertEqual(get_project_link_from_current_dir(), "git@gitlab.com:group/repo.git") + + @patch("git_utils.subprocess.run", side_effect=FileNotFoundError) + def test_get_project_link_from_current_dir_handles_missing_git(self, _run): + self.assertEqual(get_project_link_from_current_dir(), -1) + + @patch("git_utils.subprocess.check_output", return_value="feature\n") + def test_get_current_branch_strips_output(self, check_output): + self.assertEqual(get_current_branch(), "feature") + check_output.assert_called_once_with( + ["git", "rev-parse", "--abbrev-ref", "HEAD"], + text=True, + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_gitlab_api.py b/tests/test_gitlab_api.py new file mode 100644 index 0000000..2b84b30 --- /dev/null +++ b/tests/test_gitlab_api.py @@ -0,0 +1,66 @@ +import json +import unittest +from unittest.mock import patch + +import gitlab_api + + +class GitlabApiTests(unittest.TestCase): + @patch("gitlab_api.subprocess.check_output") + @patch("gitlab_api.get_authorized_user", return_value={"id": 7}) + def test_execute_issue_create_builds_expected_glab_command(self, _authorized_user, check_output): + check_output.return_value = json.dumps({"iid": 12, "title": "Test"}).encode() + + result = gitlab_api.execute_issue_create( + project_id=123, + title="Test issue", + labels=["Bug", "P::1"], + milestone_id=456, + epic={"id": 789}, + iteration={"id": 10}, + weight=3, + estimated_time=60, + issue_type="issue", + ) + + command = check_output.call_args.args[0] + self.assertEqual(result, {"iid": 12, "title": "Test"}) + self.assertIn("/projects/123/issues", command) + self.assertIn("title=Test issue", command) + self.assertIn("assignee_ids=7", command) + self.assertIn("labels=Bug,P::1", command) + self.assertIn("milestone_id=456", command) + self.assertIn("epic_id=789", command) + self.assertIn("weight=3", command) + self.assertIn("description=/iteration *iteration:10 \n/estimate 60m ", command) + + @patch("gitlab_api.subprocess.check_output") + def test_create_branch_slug_matches_legacy_format(self, check_output): + check_output.return_value = json.dumps({"name": "44-fix-login-flow"}).encode() + + gitlab_api.set_main_branch("main") + result = gitlab_api.create_branch(5, {"iid": 44, "title": "Fix: Login (Flow)"}) + + command = check_output.call_args.args[0] + self.assertEqual(result, {"name": "44-fix-login-flow"}) + self.assertIn("branch=44-fix-login--flow", command) + self.assertIn("ref=main", command) + self.assertIn("issue_iid=44", command) + + @patch("gitlab_api.requests.get") + @patch("gitlab_api.get_project_id", return_value=123) + def test_get_merge_request_for_branch_filters_matching_branch(self, _project_id, get): + get.return_value.status_code = 200 + get.return_value.json.return_value = [ + {"iid": 1, "source_branch": "other"}, + {"iid": 2, "source_branch": "feature"}, + ] + + self.assertEqual( + gitlab_api.get_merge_request_for_branch("feature"), + {"iid": 2, "source_branch": "feature"}, + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_interactive.py b/tests/test_interactive.py new file mode 100644 index 0000000..4c25a63 --- /dev/null +++ b/tests/test_interactive.py @@ -0,0 +1,30 @@ +import unittest + +from interactive import get_selected_epic, get_selected_iteration, get_selected_milestone + + +class InteractiveTests(unittest.TestCase): + def test_get_selected_milestone_returns_matching_item(self): + milestones = [{"title": "Sprint 1"}, {"title": "Sprint 2"}] + + self.assertEqual(get_selected_milestone("Sprint 2", milestones), {"title": "Sprint 2"}) + + def test_get_selected_iteration_matches_date_range_label(self): + iterations = [ + {"id": 1, "start_date": "2026-01-01", "due_date": "2026-01-15"}, + {"id": 2, "start_date": "2026-01-16", "due_date": "2026-01-31"}, + ] + + self.assertEqual( + get_selected_iteration("2026-01-16 - 2026-01-31", iterations), + {"id": 2, "start_date": "2026-01-16", "due_date": "2026-01-31"}, + ) + + def test_get_selected_epic_returns_matching_item(self): + epics = [{"title": "Platform"}, {"title": "Billing"}] + + self.assertEqual(get_selected_epic("Billing", epics), {"title": "Billing"}) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_main.py b/tests/test_main.py new file mode 100644 index 0000000..278e9a7 --- /dev/null +++ b/tests/test_main.py @@ -0,0 +1,16 @@ +import unittest + +from main import build_parser + + +class MainTests(unittest.TestCase): + def test_build_parser_supports_review_flags(self): + args = build_parser().parse_args(["review", "--select", "-am"]) + + self.assertEqual(args.title, ["review"]) + self.assertTrue(args.select) + self.assertTrue(args.auto_merge) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_templates.py b/tests/test_templates.py new file mode 100644 index 0000000..ca2120b --- /dev/null +++ b/tests/test_templates.py @@ -0,0 +1,26 @@ +import unittest + +from templates import get_issue_settings + + +class TemplateTests(unittest.TestCase): + def test_get_issue_settings_returns_empty_for_custom_template(self): + self.assertEqual( + get_issue_settings("Custom", templates=[{"name": "Bug"}], custom_template="Custom"), + {}, + ) + + def test_get_issue_settings_finds_matching_template(self): + templates = [ + {"name": "Bug", "labels": ["bug"]}, + {"name": "Feature", "labels": ["feature"]}, + ] + + self.assertEqual( + get_issue_settings("Feature", templates=templates, custom_template="Custom"), + {"name": "Feature", "labels": ["feature"]}, + ) + + +if __name__ == "__main__": + unittest.main()