diff --git a/.gitignore b/.gitignore index 1ff4785..661dfab 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,6 @@ configs/templates.json -configs/config.ini \ No newline at end of file +configs/config.ini +__pycache__/ +*.py[cod] +*$py.class +.pytest_cache/ \ No newline at end of file diff --git a/README.md b/README.md index c8fef2e..1f92adf 100644 --- a/README.md +++ b/README.md @@ -13,8 +13,7 @@ - install python3 (make sure to include pip in install) - Install [glab](https://gitlab.com/gitlab-org/cli) - Authorize via glab `glab auth login` (you will need Gitlab access token, SSH recomended) -- `pip install inquirer` or `pip3 install inquirer` -- `pip install requests` or `pip3 install requests` +- `pip install -r requirements.txt` ### Setup @@ -32,7 +31,7 @@ To run gitHappens script anywhere in filesystem, make sure to create an alias. Add following line to your `.bashrc` or `.zshrc` file -`alias gh='python3 ~//gitHappens.py'` +`alias gh='python3 ~//main.py'` Run `source ~/.zshrc` or restart terminal. @@ -210,6 +209,30 @@ To configure production deployment detection, add project-specific mappings to y If you run just `gh` (or whatever alias you set) or `gh --help` you will see all available flags and a short explanation. +## Project Structure + +The codebase is organized into modular components: + +``` +. +├── main.py # Entry point and argument parsing +├── gitlab_api.py # GitLab API interactions (unified transport) +├── config.py # Configuration management +├── templates.py # Template processing +├── git_utils.py # Git operations +├── interactive.py # User prompts and CLI interactions +├── ai_code_review.py # AI-powered code review +├── commands/ # Command-specific logic +│ ├── create_issue.py +│ ├── review.py +│ ├── deploy.py +│ ├── open_mr.py +│ ├── report.py +│ ├── summary.py +│ └── ai_review.py +└── tests/ # Unit tests (pytest) +``` + ## Troubleshooting 🪲🔫 ### Recieving 401 Unauthorized error diff --git a/ai_code_review.py b/ai_code_review.py index 7c3b64f..aba0e06 100644 --- a/ai_code_review.py +++ b/ai_code_review.py @@ -1,9 +1,10 @@ #!/usr/bin/env python3 -import subprocess import json import sys -import os -import configparser + +import config +import git_utils +import gitlab_api # ANSI color codes class Colors: @@ -45,51 +46,10 @@ class Colors: - MEDIUM: Code smells, potential bugs, missing error handling - LOW: Minor improvements, suggestions, style inconsistencies""" -def get_branch_diff(): - """Get the diff of changed files in current branch vs main branch.""" - try: - # Get main branch name - main_branch = subprocess.check_output( - "git symbolic-ref refs/remotes/origin/HEAD | sed 's@^refs/remotes/origin/@@'", - shell=True, text=True, stderr=subprocess.STDOUT - ).strip() - except subprocess.CalledProcessError: - main_branch = 'master' - - try: - current_branch = subprocess.check_output( - ['git', 'rev-parse', '--abbrev-ref', 'HEAD'], - text=True - ).strip() - - if current_branch == main_branch: - print(f"{Colors.HIGH}⚠ You are on the main branch ({main_branch}). No changes to review.{Colors.RESET}") - return None - - # Get diff of changed files only - diff_output = subprocess.check_output( - ['git', 'diff', f'{main_branch}...HEAD'], - text=True, - stderr=subprocess.DEVNULL - ) - - if not diff_output.strip(): - print(f"{Colors.INFO}ℹ No changes detected between {current_branch} and {main_branch}{Colors.RESET}") - return None - - return diff_output - - except subprocess.CalledProcessError as e: - print(f"{Colors.CRITICAL}✗ Error getting git diff: {e}{Colors.RESET}") - return None def get_openai_client(): """Initialize OpenAI client with API key from config.""" - config = configparser.ConfigParser() - config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'configs/config.ini') - config.read(config_path) - - api_key = config.get('DEFAULT', 'OPENAI_API_KEY', fallback=None) + api_key = config.config.get('DEFAULT', 'OPENAI_API_KEY', fallback=None) if not api_key: print(f"{Colors.HIGH}⚠ OpenAI API key not set in configs/config.ini{Colors.RESET}") print(f"{Colors.DIM} Add: OPENAI_API_KEY = your_key_here{Colors.RESET}") @@ -213,123 +173,11 @@ def format_issues(issues, severity, emoji): return comment -def get_merge_request_changes(project_id, mr_id, gitlab_token, api_url): - """Get the changes (diffs) from the merge request to find commit SHAs.""" - import requests - - url = f"{api_url}/projects/{project_id}/merge_requests/{mr_id}/changes" - headers = {"Private-Token": gitlab_token} - - try: - response = requests.get(url, headers=headers) - if response.status_code == 200: - return response.json() - else: - print(f"{Colors.CRITICAL}✗ Failed to get MR changes: {response.status_code}{Colors.RESET}") - return None - except Exception as e: - print(f"{Colors.CRITICAL}✗ Error getting MR changes: {e}{Colors.RESET}") - return None - -def get_diff_refs(project_id, mr_id, gitlab_token, api_url): - """Get the diff refs (base_sha, head_sha, start_sha) from the merge request.""" - import requests - - url = f"{api_url}/projects/{project_id}/merge_requests/{mr_id}" - headers = {"Private-Token": gitlab_token} - - try: - response = requests.get(url, headers=headers) - if response.status_code == 200: - mr_data = response.json() - diff_refs = mr_data.get('diff_refs', {}) - return { - 'base_sha': diff_refs.get('base_sha'), - 'head_sha': diff_refs.get('head_sha'), - 'start_sha': diff_refs.get('start_sha') - } - return None - except Exception: - return None - -def post_inline_comment(issue, severity, project_id, mr_id, gitlab_token, api_url, diff_refs): - """Post an inline comment on a specific line in the merge request.""" - import requests - - url = f"{api_url}/projects/{project_id}/merge_requests/{mr_id}/discussions" - headers = {"Private-Token": gitlab_token} - - severity_emoji = { - 'critical': '🔴', - 'high': '🟡', - 'medium': '🔵', - 'low': '🟢' - } - - emoji = severity_emoji.get(severity, '⚪') - body = f"{emoji} **{severity.upper()}**: {issue['issue']}" - - file_path = issue['file'] - line = issue['line'] - - # Convert line to integer if it's a string - try: - line = int(line) if isinstance(line, str) else line - except (ValueError, TypeError): - print(f"{Colors.HIGH}⚠ Skipping comment (invalid line number): {file_path}:{line}{Colors.RESET}") - return False - - data = { - "body": body, - "position": { - "base_sha": diff_refs['base_sha'], - "head_sha": diff_refs['head_sha'], - "start_sha": diff_refs['start_sha'], - "position_type": "text", - "new_path": file_path, - "new_line": line - } - } - - try: - response = requests.post(url, headers=headers, json=data) - if response.status_code == 201: - return True - else: - # Print error details for debugging - error_msg = response.json() if response.headers.get('content-type') == 'application/json' else response.text - print(f"{Colors.DIM} Failed {file_path}:{line} - {response.status_code}: {error_msg}{Colors.RESET}") - return False - except Exception as e: - print(f"{Colors.DIM} Error posting inline comment: {e}{Colors.RESET}") - return False - -def post_to_merge_request(comment_body, project_id, mr_id, gitlab_token, api_url): - """Post AI review as a general comment on the GitLab merge request.""" - import requests - - url = f"{api_url}/projects/{project_id}/merge_requests/{mr_id}/notes" - headers = {"Private-Token": gitlab_token} - data = {"body": comment_body} - - try: - response = requests.post(url, headers=headers, json=data) - if response.status_code == 201: - print(f"{Colors.INFO}✓ AI review summary posted to merge request{Colors.RESET}") - return True - else: - print(f"{Colors.CRITICAL}✗ Failed to post comment: {response.status_code}{Colors.RESET}") - print(f"{Colors.DIM}{response.text}{Colors.RESET}") - return False - except Exception as e: - print(f"{Colors.CRITICAL}✗ Error posting to GitLab: {e}{Colors.RESET}") - return False - def run_review(): """Main entry point for AI code review (terminal output).""" print(f"{Colors.INFO}🔍 Analyzing code changes...{Colors.RESET}") - diff_content = get_branch_diff() + diff_content = git_utils.get_branch_diff() if not diff_content: sys.exit(0) @@ -340,11 +188,12 @@ def run_review(): sys.exit(0) display_review_results(results) -def run_review_for_mr(project_id, mr_id, gitlab_token, api_url): + +def run_review_for_mr(project_id, mr_id): """Run AI code review and post inline comments to GitLab merge request.""" print(f"{Colors.INFO}🤖 Running AI code review...{Colors.RESET}") - diff_content = get_branch_diff() + diff_content = git_utils.get_branch_diff() if not diff_content: return @@ -354,11 +203,11 @@ def run_review_for_mr(project_id, mr_id, gitlab_token, api_url): return # Get diff refs for inline comments - diff_refs = get_diff_refs(project_id, mr_id, gitlab_token, api_url) + diff_refs = gitlab_api.get_diff_refs(project_id, mr_id) if not diff_refs or not all(diff_refs.values()): print(f"{Colors.HIGH}⚠ Could not get diff refs, posting summary only{Colors.RESET}") comment = format_gitlab_comment(results) - post_to_merge_request(comment, project_id, mr_id, gitlab_token, api_url) + gitlab_api.post_to_merge_request(comment, project_id, mr_id) return # Post inline comments for each issue @@ -368,7 +217,17 @@ def run_review_for_mr(project_id, mr_id, gitlab_token, api_url): for severity in ['critical', 'high', 'medium', 'low']: issues = results.get(severity, []) for issue in issues: - success = post_inline_comment(issue, severity, project_id, mr_id, gitlab_token, api_url, diff_refs) + file_path = issue['file'] + line = issue['line'] + try: + line = int(line) if isinstance(line, str) else line + except (ValueError, TypeError): + print(f"{Colors.HIGH}⚠ Skipping comment (invalid line number): {file_path}:{line}{Colors.RESET}") + continue + + emoji = {'critical': '🔴', 'high': '🟡', 'medium': '🔵', 'low': '🟢'}.get(severity, '⚪') + body = f"{emoji} **{severity.upper()}**: {issue['issue']}" + success = gitlab_api.post_inline_comment(project_id, mr_id, body, diff_refs, file_path, line) if success: total_posted += 1 print(f"{Colors.INFO} ✓ Posted {severity} issue on {issue['file']}:{issue['line']}{Colors.RESET}") @@ -382,7 +241,7 @@ def run_review_for_mr(project_id, mr_id, gitlab_token, api_url): for severity, issue in failed_comments: emoji = {'critical': '🔴', 'high': '🟡', 'medium': '🔵', 'low': '🟢'}.get(severity, '⚪') summary_comment += f"- {emoji} **`{issue['file']}:{issue['line']}`** - {issue['issue']}\n" - post_to_merge_request(summary_comment, project_id, mr_id, gitlab_token, api_url) + gitlab_api.post_to_merge_request(summary_comment, project_id, mr_id) else: print(f"{Colors.INFO}✓ All {total_posted} issues posted as inline comments{Colors.RESET}") diff --git a/commands/__init__.py b/commands/__init__.py new file mode 100644 index 0000000..354dfbb --- /dev/null +++ b/commands/__init__.py @@ -0,0 +1 @@ +# commands package diff --git a/commands/ai_review.py b/commands/ai_review.py new file mode 100644 index 0000000..a5aff7e --- /dev/null +++ b/commands/ai_review.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 +from ai_code_review import run_review + + +def run(): + run_review() diff --git a/commands/create_issue.py b/commands/create_issue.py new file mode 100644 index 0000000..b7aff9b --- /dev/null +++ b/commands/create_issue.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python3 +import config +import gitlab_api +import interactive + + +def run(title, project_id, milestone, epic, iteration, selected_settings, only_issue): + estimated_time = interactive.prompt_estimated_time() + + if isinstance(project_id, list): + estimated_time_per_project = int(estimated_time) / len(project_id) if estimated_time else None + else: + estimated_time_per_project = estimated_time + + if estimated_time_per_project: + selected_settings = selected_settings.copy() if selected_settings else {} + selected_settings['estimated_time'] = int(estimated_time_per_project) + + issue_type = selected_settings.get('type') or 'issue' + created_issue = gitlab_api.execute_issue_create( + project_id, + title, + selected_settings.get('labels'), + milestone, + epic, + iteration, + selected_settings.get('weight'), + selected_settings.get('estimated_time'), + issue_type, + ) + print(f"Issue #{created_issue['iid']}: {created_issue['title']} created.") + + if only_issue: + return created_issue + + created_branch = gitlab_api.create_branch(project_id, created_issue) + created_mr = gitlab_api.create_merge_request( + project_id, + created_branch, + created_issue, + selected_settings.get('labels'), + milestone, + ) + print(f"Merge request #{created_mr['iid']}: {created_mr['title']} created.") + print("Run:") + print(" git fetch origin") + print(f" git checkout -b '{created_mr['source_branch']}' 'origin/{created_mr['source_branch']}'") + print("to switch to new branch.") + return created_issue diff --git a/commands/deploy.py b/commands/deploy.py new file mode 100644 index 0000000..4a688dc --- /dev/null +++ b/commands/deploy.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python3 +import datetime + +import config +import gitlab_api +import git_utils + + +def run(): + try: + project_id = gitlab_api.get_project_id() + params = { + "per_page": 50, + "order_by": "updated_at", + "sort": "desc", + } + if config.MAIN_BRANCH: + params["ref"] = config.MAIN_BRANCH + else: + try: + main_branch = git_utils.get_main_branch() + params["ref"] = main_branch + except Exception: + params["ref"] = "main" + + pipelines = gitlab_api.get_pipelines(project_id, params) + if not pipelines: + print("Failed to fetch pipelines.") + return + + production_pipeline = None + for pipeline in pipelines: + jobs = gitlab_api.get_pipeline_jobs(project_id, pipeline['id']) + if not jobs: + continue + + for job in jobs: + job_name = job.get('name', '') + stage = job.get('stage', '') + job_status = job.get('status', '').lower() + + if job_status != 'success': + continue + + project_mapping = config.PRODUCTION_MAPPINGS.get(str(project_id)) + if project_mapping: + expected_stage = project_mapping.get('stage', '').lower() + expected_job = project_mapping.get('job', '').lower() + + if (stage.lower() == expected_stage or + (expected_job and job_name.lower() == expected_job)): + production_pipeline = { + 'pipeline': pipeline, + 'production_job': job, + } + break + else: + print("Didn't find deployment pipeline") + + if production_pipeline: + break + + if not production_pipeline: + print("No production deployment found matching pattern") + return + + pipeline = production_pipeline['pipeline'] + job = production_pipeline['production_job'] + + print(f"Last Production Deployment:") + print(f" Pipeline: #{pipeline['id']} - {pipeline['status']}") + print(f" Job: {job['name']} ({job['status']})") + print(f" Branch/Tag: {pipeline['ref']}") + print(f" Started: {job.get('started_at', 'N/A')}") + print(f" Finished: {job.get('finished_at', 'N/A')}") + if job.get('duration'): + print(f" Duration: {job['duration']} seconds") + else: + print(" Duration: N/A") + print(f" Commit: {pipeline['sha'][:8]}") + print(f" URL: {pipeline['web_url']}") + + if job.get('finished_at'): + try: + finished_time = datetime.datetime.fromisoformat(job['finished_at'].replace('Z', '+00:00')) + time_diff = datetime.datetime.now(datetime.timezone.utc) - finished_time + if time_diff.days > 0: + print(f" {time_diff.days} days ago") + elif time_diff.seconds > 3600: + hours = time_diff.seconds // 3600 + print(f" {hours} hours ago") + else: + minutes = time_diff.seconds // 60 + print(f" {minutes} minutes ago") + except Exception: + pass + + except Exception as e: + print(f"Error fetching last production deploy: {str(e)}") diff --git a/commands/open_mr.py b/commands/open_mr.py new file mode 100644 index 0000000..1eeab3e --- /dev/null +++ b/commands/open_mr.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 +import git_utils + + +def run(): + git_utils.open_merge_request_in_browser() diff --git a/commands/report.py b/commands/report.py new file mode 100644 index 0000000..95f632a --- /dev/null +++ b/commands/report.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python3 +import config +import gitlab_api +import interactive + + +def run(text, minutes): + incident_project_id = config.config.get('DEFAULT', 'incident_project_id', fallback=None) + if not incident_project_id: + print("Error: incident_project_id not found in config.ini") + print("Please add your incident project ID to configs/config.ini under [DEFAULT] section:") + print("incident_project_id = your_project_id_here") + return + + issue_title = f"Incident Report: {text}" + selected_label = interactive.select_labels('Department') + + incident_settings = { + 'labels': ['incident', 'report'], + 'onlyIssue': True, + 'type': 'incident', + } + + if selected_label: + incident_settings['labels'].append(selected_label) + + try: + iteration = gitlab_api.get_active_iteration() + created_issue = gitlab_api.execute_issue_create( + incident_project_id, + issue_title, + incident_settings.get('labels'), + False, + False, + iteration, + None, + None, + incident_settings.get('type'), + ) + issue_iid = created_issue['iid'] + gitlab_api.close_opened_issue(issue_iid, incident_project_id) + print(f"Incident issue #{issue_iid} created successfully.") + print(f"Title: {issue_title}") + gitlab_api.add_spent_time(incident_project_id, issue_iid, minutes) + print(f"Added {minutes} minutes to issue time tracking.") + except Exception as e: + print(f"Error creating incident issue: {str(e)}") diff --git a/commands/review.py b/commands/review.py new file mode 100644 index 0000000..0132aba --- /dev/null +++ b/commands/review.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python3 +import config +import gitlab_api +import git_utils +import interactive + + +def run(auto_merge=False, select_reviewers=False): + project_id = gitlab_api.get_project_id() + mr = gitlab_api.get_merge_request_for_branch(git_utils.get_current_branch()) + if not mr: + print("No active merge request found for current branch.") + return + + issue_id = mr['description'].replace('"', '').replace('#', '').split()[1] + spent_time = interactive.prompt_spent_time() + gitlab_api.track_issue_time(project_id, issue_id, spent_time) + + reviewers = None + if select_reviewers: + reviewers = interactive.choose_reviewers_manually() + gitlab_api.add_reviewers_to_merge_request(reviewers=reviewers) + + # Run AI code review and post to MR + try: + from ai_code_review import run_review_for_mr + mr_id = gitlab_api.get_active_merge_request_id() + run_review_for_mr(project_id, mr_id) + except Exception as e: + print(f"AI review skipped: {e}") + + if auto_merge: + gitlab_api.set_merge_request_to_auto_merge() diff --git a/commands/summary.py b/commands/summary.py new file mode 100644 index 0000000..7153bb5 --- /dev/null +++ b/commands/summary.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python3 +import config +import git_utils + + +def run(ai=False): + if ai: + commits = git_utils.get_two_weeks_commits(return_output=True) + if not commits: + return + + openai_api_key = config.config.get('DEFAULT', 'OPENAI_API_KEY', fallback=None) + if not openai_api_key: + print("OpenAI API key not set. Skipping AI summary generation.") + return + + try: + import openai + except ImportError: + print("OpenAI package not installed. Please install it using: pip install openai") + return + + openai.api_key = openai_api_key + + try: + response = openai.chat.completions.create( + model="gpt-3.5-turbo", + messages=[ + {"role": "system", "content": "You are a helpful assistant that summarizes git commits. Provide a concise, well-organized summary of the main changes and themes."}, + {"role": "user", "content": f"Please summarize these git commits in a clear, bulleted format:\n\n{commits}"} + ] + ) + print("\nAI-Generated Summary of Recent Changes:\n") + print(response.choices[0].message.content) + except Exception as e: + print(f"Error generating AI summary: {e}") + else: + git_utils.get_two_weeks_commits() diff --git a/config.py b/config.py new file mode 100644 index 0000000..527d981 --- /dev/null +++ b/config.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python3 +import configparser +import json +import os + +config = configparser.ConfigParser() +_absolute_config_path = os.path.dirname(os.path.abspath(__file__)) +config_path = os.path.join(_absolute_config_path, 'configs', 'config.ini') +config.read(config_path) + +BASE_URL = config.get('DEFAULT', 'base_url') +API_URL = BASE_URL + '/api/v4' +GROUP_ID = config.get('DEFAULT', 'group_id') +CUSTOM_TEMPLATE = config.get('DEFAULT', 'custom_template') +GITLAB_TOKEN = config.get('DEFAULT', 'GITLAB_TOKEN').strip('"\'') +DELETE_BRANCH = config.get('DEFAULT', 'delete_branch_after_merge').lower() == 'true' +DEVELOPER_EMAIL = config.get('DEFAULT', 'developer_email', fallback=None) +SQUASH_COMMITS = config.get('DEFAULT', 'squash_commits').lower() == 'true' +PRODUCTION_PIPELINE_NAME = config.get('DEFAULT', 'production_pipeline_name', fallback='deploy') +PRODUCTION_JOB_NAME = config.get('DEFAULT', 'production_job_name', fallback=None) +PRODUCTION_REF = config.get('DEFAULT', 'production_ref', fallback=None) +MAIN_BRANCH = 'master' + +# Read templates from json config +_templates_json_path = os.path.join(_absolute_config_path, 'configs', 'templates.json') +if os.path.exists(_templates_json_path): + with open(_templates_json_path, 'r') as f: + _json_config = json.load(f) + TEMPLATES = _json_config.get('templates', []) + REVIEWERS = _json_config.get('reviewers', []) + PRODUCTION_MAPPINGS = _json_config.get('productionMappings', {}) +else: + TEMPLATES = [] + REVIEWERS = [] + PRODUCTION_MAPPINGS = {} diff --git a/gitHappens.py b/gitHappens.py deleted file mode 100755 index 27d47f3..0000000 --- a/gitHappens.py +++ /dev/null @@ -1,845 +0,0 @@ -#!/usr/bin/env python3 -import subprocess -import json -import argparse -import configparser -import inquirer -import datetime -import re -import os -import requests -import sys -import webbrowser - -# Setup config parser and read settings -config = configparser.ConfigParser() -absolute_config_path = os.path.dirname(os.path.abspath(__file__)) -config_path = os.path.join(absolute_config_path, 'configs/config.ini') -config.read(config_path) - -BASE_URL = config.get('DEFAULT', 'base_url') -API_URL = BASE_URL + '/api/v4' -GROUP_ID = config.get('DEFAULT', 'group_id') -CUSTOM_TEMPLATE = config.get('DEFAULT', 'custom_template') -GITLAB_TOKEN = config.get('DEFAULT', 'GITLAB_TOKEN').strip('\"\'') -DELETE_BRANCH = config.get('DEFAULT', 'delete_branch_after_merge').lower() == 'true' -DEVELOPER_EMAIL = config.get('DEFAULT', 'developer_email', fallback=None) -SQUASH_COMMITS = config.get('DEFAULT', 'squash_commits').lower() == 'true' -PRODUCTION_PIPELINE_NAME = config.get('DEFAULT', 'production_pipeline_name', fallback='deploy') -PRODUCTION_JOB_NAME = config.get('DEFAULT', 'production_job_name', fallback=None) -PRODUCTION_REF = config.get('DEFAULT', 'production_ref', fallback=None) -MAIN_BRANCH = 'master' - -# Read templates from json config -with open(os.path.join(absolute_config_path,'configs/templates.json'), 'r') as f: - jsonConfig = json.load(f) -TEMPLATES = jsonConfig['templates'] -REVIEWERS = jsonConfig['reviewers'] -PRODUCTION_MAPPINGS = jsonConfig.get('productionMappings', {}) - -def get_project_id(): - project_link = getProjectLinkFromCurrentDir() - if (project_link == -1): - return enterProjectId() - - allProjects = get_all_projects(project_link) - # Find projects id by project ssh link gathered from repo - matching_id = None - for project in allProjects: - if project.get("ssh_url_to_repo") == project_link: - matching_id = project.get("id") - break - return matching_id - -def get_all_projects(project_link): - url = API_URL + "/projects?membership=true&search=" + project_link.split('/')[-1].split('.')[0] - - headers = { - "PRIVATE-TOKEN": GITLAB_TOKEN - } - - response = requests.get(url, headers=headers) - - if response.status_code == 200: - return response.json() - elif response.status_code == 401: - print("Error: Unauthorized (401). Your GitLab token is probably expired, invalid, or missing required permissions.") - print("Please generate a new token and update your configs/config.ini.") - exit(1) - else: - print(f"Request failed with status code {response.status_code}") - return None - -def getProjectLinkFromCurrentDir(): - try: - cmd = 'git remote get-url origin' - result = subprocess.run(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) - if result.returncode == 0: - output = result.stdout.decode('utf-8').strip() - return output - else: - return -1 - except FileNotFoundError: - return -1 - -def enterProjectId(): - while True: - project_id = input('Please enter the ID of your GitLab project: ') - if project_id: - return project_id - exit('Invalid project ID.') - -def list_milestones(current=False): - cmd = f'glab api /groups/{GROUP_ID}/milestones?state=active' - result = subprocess.run(cmd.split(), stdout=subprocess.PIPE) - milestones = json.loads(result.stdout) - if current: - today = datetime.date.today().strftime('%Y-%m-%d') - active_milestones = [] - for milestone in milestones: - start_date = milestone['start_date'] - due_date = milestone['due_date'] - if start_date and due_date and start_date <= today and due_date >= today: - active_milestones.append(milestone) - active_milestones.sort(key=lambda x: x['due_date']) - return active_milestones[0] - return milestones - -def select_template(): - template_names = [t['name'] for t in TEMPLATES] - template_names.append(CUSTOM_TEMPLATE) - questions = [ - inquirer.List('template', - message="Select template:", - choices=template_names, - ), - ] - answer = inquirer.prompt(questions) - return answer['template'] - -def getIssueSettings(template_name): - if template_name == CUSTOM_TEMPLATE: - return {} - return next((t for t in TEMPLATES if t['name'] == template_name), None) - -def createIssue(title, project_id, milestoneId, epic, iteration, settings): - if settings: - issueType = settings.get('type') or 'issue' - return executeIssueCreate(project_id, title, settings.get('labels'), milestoneId, epic, iteration, settings.get('weight'), settings.get('estimated_time'), issueType) - print("No settings in template") - exit(2) - pass - -def executeIssueCreate(project_id, title, labels, milestoneId, epic, iteration, weight, estimated_time, issue_type='issue'): - labels = ",".join(labels) if type(labels) == list else labels - assignee_id = getAuthorizedUser()['id'] - issue_command = [ - "glab", "api", - f"/projects/{str(project_id)}/issues", - "-f", f'title={title}', - "-f", f'assignee_ids={assignee_id}', - "-f", f'issue_type={issue_type}' - ] - if labels: - issue_command.append("-f") - issue_command.append(f'labels={labels}') - - if weight: - issue_command.append("-f") - issue_command.append(f'weight={str(weight)}') - - if milestoneId: - issue_command.append("-f") - issue_command.append(f'milestone_id={str(milestoneId)}') - - if epic: - epicId = epic['id'] - issue_command.append("-f") - issue_command.append(f'epic_id={str(epicId)}') - - # Set the description, including iteration, estimated time, and other info - description = "" - if iteration: - iterationId = iteration['id'] - description += f"/iteration *iteration:{str(iterationId)} " - - if estimated_time: - description += f"\n/estimate {estimated_time}m " - - issue_command.extend(["-f", f'description={description}']) - - issue_output = subprocess.check_output(issue_command) - return json.loads(issue_output.decode()) - -def select_milestone(milestones): - milestones = [t['title'] for t in milestones] - questions = [ - inquirer.List('milestones', - message="Select milestone:", - choices=milestones, - ), - ] - answer = inquirer.prompt(questions) - return answer['milestones'] - -def getSelectedMilestone(milestone, milestones): - return next((t for t in milestones if t['title'] == milestone), None) - -def get_milestone(manual): - if manual: - milestones = list_milestones() - return getSelectedMilestone(select_milestone(milestones), milestones) - milestone = list_milestones(True) # select active for today - return milestone - -def get_iteration(manual): - if manual: - iterations = list_iterations() - return getSelectedIteration(select_iteration(iterations), iterations) - return getActiveIteration() - -def getSelectedIteration(iteration, iterations): - return next((t for t in iterations if t['start_date'] + ' - ' + t['due_date'] == iteration), None) - -def select_iteration(iterations): - iterations = [t['start_date'] + ' - ' + t['due_date'] for t in iterations] - questions = [ - inquirer.List('iterations', - message="Select iteration:", - choices=iterations, - ), - ] - answer = inquirer.prompt(questions) - return answer['iterations'] - -def list_iterations(): - cmd = f'glab api /groups/{GROUP_ID}/iterations?state=opened' - result = subprocess.run(cmd.split(), stdout=subprocess.PIPE) - iterations = json.loads(result.stdout) - return iterations - -def getActiveIteration(): - iterations = list_iterations() - today = datetime.date.today().strftime('%Y-%m-%d') - active_iterations = [] - for iteration in iterations: - start_date = iteration['start_date'] - due_date = iteration['due_date'] - if start_date and due_date and start_date <= today and due_date >= today: - active_iterations.append(iteration) - active_iterations.sort(key=lambda x: x['due_date']) - return active_iterations[0] - -def getAuthorizedUser(): - output = subprocess.check_output(["glab", "api", "/user"]) - return json.loads(output) - -def list_epics(): - cmd = f'glab api /groups/{GROUP_ID}/epics?per_page=1000&state=opened' - result = subprocess.run(cmd.split(), stdout=subprocess.PIPE) - return json.loads(result.stdout) - -def select_epic(epics): - epics = [t['title'] for t in epics] - search_query = inquirer.prompt([ - inquirer.Text('search_query', message='Search epic:'), - ])['search_query'] - - # Filter choices based on search query - filtered_epics = [c for c in epics if search_query.lower() in c.lower()] - questions = [ - inquirer.List('epics', - message="Select epic:", - choices=filtered_epics, - ), - ] - answer = inquirer.prompt(questions) - return answer['epics'] - -def getSelectedEpic(epic, epics): - return next((t for t in epics if t['title'] == epic), None) - -def get_epic(): - epics = list_epics() - return getSelectedEpic(select_epic(epics), epics) - -def create_branch(project_id, issue): - issueId = str(issue['iid']) - title = re.sub('\\s+', '-', issue['title']).lower() - title = issueId + '-' + title.replace(':','').replace('(',' ').replace(')', '').replace(' ','-') - branch_output = subprocess.check_output(["glab", "api", f"/projects/{str(project_id)}/repository/branches", "-f", f'branch={title}', "-f", f'ref={MAIN_BRANCH}', "-f", f'issue_iid={issueId}']) - return json.loads(branch_output.decode()) - -def create_merge_request(project_id, branch, issue, labels, milestoneId): - issueId = str(issue['iid']) - branch = branch['name'] - title = issue['title'] - assignee_id = getAuthorizedUser()['id'] - labels = ",".join(labels) if type(labels) == list else labels - merge_request_command = [ - "glab", "api", - f"/projects/{str(project_id)}/merge_requests", - "-f", f'title={title}', - "-f", f'description="Closes #{issueId}"', - "-f", f'source_branch={branch}', - "-f", f'target_branch={MAIN_BRANCH}', - "-f", f'issue_iid={issueId}', - "-f", f'assignee_ids={assignee_id}' - ] - - if SQUASH_COMMITS: - merge_request_command.append("-f") - merge_request_command.append("squash=true") - - if DELETE_BRANCH: - merge_request_command.append("-f") - merge_request_command.append("remove_source_branch=true") - - if labels: - merge_request_command.append("-f") - merge_request_command.append(f'labels={labels}') - - if milestoneId: - merge_request_command.append("-f") - merge_request_command.append(f'milestone_id={str(milestoneId)}') - - mr_output = subprocess.check_output(merge_request_command) - return json.loads(mr_output.decode()) - -def startIssueCreation(project_id, title, milestone, epic, iteration, selectedSettings, onlyIssue): - # Prompt for estimated time - estimated_time = inquirer.prompt([ - inquirer.Text('estimated_time', - message='Estimated time to complete this issue (in minutes, optional)', - validate=lambda _, x: x == '' or x.isdigit()) - ])['estimated_time'] - - # If multiple project IDs, split the estimated time - if isinstance(project_id, list): - estimated_time_per_project = int(estimated_time) / len(project_id) if estimated_time else None - else: - estimated_time_per_project = estimated_time - - # Modify settings to include estimated time - if estimated_time_per_project: - selectedSettings = selectedSettings.copy() if selectedSettings else {} - selectedSettings['estimated_time'] = int(estimated_time_per_project) - - createdIssue = createIssue(title, project_id, milestone, epic, iteration, selectedSettings) - print(f"Issue #{createdIssue['iid']}: {createdIssue['title']} created.") - - if onlyIssue: - return createdIssue - - createdBranch = create_branch(project_id, createdIssue) - - createdMergeRequest = create_merge_request(project_id, createdBranch, createdIssue, selectedSettings.get('labels'), milestone) - print(f"Merge request #{createdMergeRequest['iid']}: {createdMergeRequest['title']} created.") - - print("Run:") - print(" git fetch origin") - print(f" git checkout -b '{createdMergeRequest['source_branch']}' 'origin/{createdMergeRequest['source_branch']}'") - print("to switch to new branch.") - - return createdIssue - -def getCurrentBranch(): - return subprocess.check_output(['git', 'rev-parse', '--abbrev-ref', 'HEAD'], text=True).strip() - -def openMergeRequestInBrowser(): - try: - merge_request_id = getActiveMergeRequestId() - remote_url = subprocess.check_output(["git", "config", "--get", "remote.origin.url"], text=True).strip() - url = BASE_URL + '/' + remote_url.split(':')[1][:-4] - webbrowser.open(f"{url}/-/merge_requests/{merge_request_id}") - except subprocess.CalledProcessError: - return None - -def getActiveMergeRequestId(): - branch_to_find = getCurrentBranch() - return find_merge_request_id_by_branch(branch_to_find) - -def find_merge_request_id_by_branch(branch_name): - return getMergeRequestForBranch(branch_name)['iid'] - -def getMergeRequestForBranch(branchName): - project_id = get_project_id() - api_url = f"{API_URL}/projects/{project_id}/merge_requests" - headers = {"Private-Token": GITLAB_TOKEN} - - params = { - "source_branch": branchName, - } - - response = requests.get(api_url, headers=headers, params=params) - if response.status_code == 200: - merge_requests = response.json() - for mr in merge_requests: - if mr["source_branch"] == branchName: - return mr - else: - print(f"Failed to fetch Merge Requests: {response.status_code} - {response.text}") - return None - -def chooseReviewersManually(): - """Prompt the user to select reviewers manually from the available list, showing names.""" - # Fetch user details for each reviewer ID - reviewer_choices = [] - for reviewer_id in REVIEWERS: - api_url = f"{API_URL}/users/{reviewer_id}" - headers = {"Private-Token": GITLAB_TOKEN} - try: - response = requests.get(api_url, headers=headers) - if response.status_code == 200: - user = response.json() - display_name = f"{user.get('name')} ({user.get('username')})" - reviewer_choices.append((display_name, reviewer_id)) - else: - reviewer_choices.append((str(reviewer_id), reviewer_id)) - except Exception: - reviewer_choices.append((str(reviewer_id), reviewer_id)) - - questions = [ - inquirer.Checkbox( - "selected_reviewers", - message="Select reviewers", - choices=[(name, str(rid)) for name, rid in reviewer_choices], - ) - ] - answers = inquirer.prompt(questions) - if answers and "selected_reviewers" in answers: - return [int(r) for r in answers["selected_reviewers"]] - else: - return [] - -def addReviewersToMergeRequest(reviewers=None): - project_id = get_project_id() - mr_id = getActiveMergeRequestId() - api_url = f"{API_URL}/projects/{project_id}/merge_requests/{mr_id}" - headers = {"Private-Token": GITLAB_TOKEN} - - data = { - "reviewer_ids": reviewers if reviewers is not None else REVIEWERS - } - - requests.put(api_url, headers=headers, json=data) - -def setMergeRequestToAutoMerge(): - project_id = get_project_id() - mr_id = getActiveMergeRequestId() - api_url = f"{API_URL}/projects/{project_id}/merge_requests/{mr_id}/merge" - headers = {"Private-Token": GITLAB_TOKEN} - - data = { - "id": project_id, - "merge_request_iid": mr_id, - "should_remove_source_branch": True, - "merge_when_pipeline_succeeds": True, - "auto_merge_strategy": "merge_when_pipeline_succeeds", - } - - requests.put(api_url, headers=headers, json=data) - -def getMainBranch(): - command = "git symbolic-ref refs/remotes/origin/HEAD | sed 's@^refs/remotes/origin/@@'" - output = subprocess.check_output(command, shell=True, stderr=subprocess.STDOUT, universal_newlines=True) - return output.strip() - - -def get_two_weeks_commits(return_output=False): - two_weeks_ago = (datetime.datetime.now() - datetime.timedelta(weeks=2)).strftime('%Y-%m-%d') - - cmd = f'git log --since={two_weeks_ago} --format="%ad - %ae - %s" --date=short | grep -v "Merge branch"' - if (DEVELOPER_EMAIL): - cmd = f'{cmd} | grep {DEVELOPER_EMAIL}' - try: - output = subprocess.check_output(cmd, shell=True, text=True, stderr=subprocess.DEVNULL, universal_newlines=True).strip() - if output: - if return_output: - return output - print(output) - else: - print("No commits found.") - return "" if return_output else None - except subprocess.CalledProcessError as e: - print(f"No commits were found or an error occurred. (exit status {e.returncode})") - return "" if return_output else None - except FileNotFoundError: - print("Git is not installed or not found in PATH.") - return "" if return_output else None - -def generate_smart_summary(): - commits = get_two_weeks_commits(return_output=True) - if not commits: - return - - # Check if OpenAI API key is set - openai_api_key = config.get('DEFAULT', 'OPENAI_API_KEY', fallback=None) - if not openai_api_key: - print("OpenAI API key not set. Skipping AI summary generation.") - return - - # Dynamically import openai only if API key is present - try: - import openai - except ImportError: - print("OpenAI package not installed. Please install it using: pip install openai") - return - - openai.api_key = openai_api_key - - try: - response = openai.chat.completions.create( - model="gpt-3.5-turbo", - messages=[ - {"role": "system", "content": "You are a helpful assistant that summarizes git commits. Provide a concise, well-organized summary of the main changes and themes."}, - {"role": "user", "content": f"Please summarize these git commits in a clear, bulleted format:\n\n{commits}"} - ] - ) - - print("\n📋 AI-Generated Summary of Recent Changes:\n") - print(response.choices[0].message.content) - except Exception as e: - print(f"Error generating AI summary: {e}") - -def process_report(text, minutes): - # Get the incident project ID from config - try: - incident_project_id = config.get('DEFAULT', 'incident_project_id') - except (configparser.NoOptionError, configparser.NoSectionError): - print("Error: incident_project_id not found in config.ini") - print("Please add your incident project ID to configs/config.ini under [DEFAULT] section:") - print("incident_project_id = your_project_id_here") - return - - issue_title = f"Incident Report: {text}" - - selected_label = selectLabels('Department') - - incident_settings = { - 'labels': ['incident', 'report'], - 'onlyIssue': True, - 'type': 'incident' - } - - if selected_label: - incident_settings['labels'].append(selected_label) - - try: - # Create the incident issue - iteration = getActiveIteration() - created_issue = createIssue(issue_title, incident_project_id, False, False, iteration, incident_settings) - issue_iid = created_issue['iid'] - - closeOpenedIssue(issue_iid, incident_project_id) - print(f"Incident issue #{issue_iid} created successfully.") - print(f"Title: {issue_title}") - - # Add time tracking to the issue - time_tracking_command = [ - "glab", "api", - f"/projects/{incident_project_id}/issues/{issue_iid}/add_spent_time", - "-f", f"duration={minutes}m" - ] - - try: - subprocess.run(time_tracking_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - print(f"Added {minutes} minutes to issue time tracking.") - except subprocess.CalledProcessError as e: - print(f"Error adding time tracking: {str(e)}") - - except Exception as e: - print(f"Error creating incident issue: {str(e)}") - -def closeOpenedIssue(issue_iid, project_id): - issue_command = [ - "glab", "api", - f"/projects/{project_id}/issues/{issue_iid}", - '-X', 'PUT', - '-f', 'state_event=close' - ] - try: - subprocess.run(issue_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - except subprocess.CalledProcessError as e: - print(f"Error closing issue: {str(e)}") - -def selectLabels(search, multiple = False): - labels = getLabelsOfGroup(search) - labels = sorted([t['name'] for t in labels]) - - question_type = inquirer.Checkbox if multiple else inquirer.List - questions = [ - question_type( - 'labels', - message="Select one or more department labels:", - choices=labels, - ), - ] - answer = inquirer.prompt(questions) - return answer['labels'] - -def getLabelsOfGroup(search=''): - cmd = f'glab api /groups/{GROUP_ID}/labels?search={search}' - try: - result = subprocess.run(cmd.split(), stdout=subprocess.PIPE, check=True) - return json.loads(result.stdout) - except subprocess.CalledProcessError as e: - print(f"Error getting labels: {str(e)}") - return [] - -def getCurrentIssueId(): - mr = getMergeRequestForBranch(getCurrentBranch()) - return mr['description'].replace('"','').replace('#','').split()[1] - -def track_issue_time(): - # Get the current merge request - try: - project_id = get_project_id() - issue_id = getCurrentIssueId() - except Exception as e: - print(f"Error getting issue details: {str(e)}") - return - - # Prompt for actual time spent - spent_time = inquirer.prompt([ - inquirer.Text('spent_time', - message='How many minutes did you actually spend on this issue?', - validate=lambda _, x: x.isdigit()) - ])['spent_time'] - - # Add spent time to the issue description - time_tracking_command = [ - "glab", "api", - f"/projects/{project_id}/issues/{issue_id}/notes", - "-f", f"body=/spend {spent_time}m" - ] - - try: - subprocess.run(time_tracking_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True) - print(f"Added {spent_time} minutes to issue {issue_id} time tracking.") - except subprocess.CalledProcessError as e: - print(f"Error adding time tracking: {str(e)}") - except Exception as e: - print(f"Error tracking issue time: {str(e)}") - -def get_last_production_deploy(): - try: - project_id = get_project_id() - api_url = f"{API_URL}/projects/{project_id}/pipelines" - headers = {"Private-Token": GITLAB_TOKEN} - - # Set up parameters for the pipeline search - params = { - "per_page": 50, - "order_by": "updated_at", - "sort": "desc" - } - - # Add ref filter if specified in config - if MAIN_BRANCH: - params["ref"] = MAIN_BRANCH - else: - # Use main branch if no specific ref is configured - try: - main_branch = getMainBranch() - params["ref"] = main_branch - except: - # Fallback to common main branch names - params["ref"] = "main" - - response = requests.get(api_url, headers=headers, params=params) - - if response.status_code != 200: - print(f"Failed to fetch pipelines: {response.status_code} - {response.text}") - return - - pipelines = response.json() - production_pipeline = None - - # Look for production pipeline by name pattern - for pipeline in pipelines: - # Get pipeline details to check jobs - pipeline_detail_url = f"{API_URL}/projects/{project_id}/pipelines/{pipeline['id']}/jobs" - detail_response = requests.get(pipeline_detail_url, headers=headers) - - if detail_response.status_code == 200: - jobs = detail_response.json() - - # Check if this pipeline contains production deployment - for job in jobs: - job_name = job.get('name', '') - stage = job.get('stage', '') - job_status = job.get('status', '').lower() - - # Only consider successful jobs - if job_status != 'success': - continue - - # Check project-specific mapping first - project_mapping = PRODUCTION_MAPPINGS.get(str(project_id)) - if project_mapping: - expected_stage = project_mapping.get('stage', '').lower() - expected_job = project_mapping.get('job', '').lower() - - if (stage.lower() == expected_stage or - (expected_job and job_name.lower() == expected_job)): - production_pipeline = { - 'pipeline': pipeline, - 'production_job': job - } - break - else: - print('Didn\'t find deployment pipeline') - - if production_pipeline: - break - - if not production_pipeline: - print(f"No production deployment found matching pattern") - return - - # Display the results - pipeline = production_pipeline['pipeline'] - job = production_pipeline['production_job'] - - print(f"🚀 Last Production Deployment:") - print(f" Pipeline: #{pipeline['id']} - {pipeline['status']}") - print(f" Job: {job['name']} ({job['status']})") - print(f" Branch/Tag: {pipeline['ref']}") - print(f" Started: {job.get('started_at', 'N/A')}") - print(f" Finished: {job.get('finished_at', 'N/A')}") - print(f" Duration: {job.get('duration', 'N/A')} seconds" if job.get('duration') else " Duration: N/A") - print(f" Commit: {pipeline['sha'][:8]}") - print(f" URL: {pipeline['web_url']}") - - # Show time since deployment - if job.get('finished_at'): - try: - finished_time = datetime.datetime.fromisoformat(job['finished_at'].replace('Z', '+00:00')) - time_diff = datetime.datetime.now(datetime.timezone.utc) - finished_time - - if time_diff.days > 0: - print(f" ⏰ {time_diff.days} days ago") - elif time_diff.seconds > 3600: - hours = time_diff.seconds // 3600 - print(f" ⏰ {hours} hours ago") - else: - minutes = time_diff.seconds // 60 - print(f" ⏰ {minutes} minutes ago") - except: - pass - - except Exception as e: - print(f"Error fetching last production deploy: {str(e)}") - -def main(): - global MAIN_BRANCH - - parser = argparse.ArgumentParser("Argument description of Git happens") - parser.add_argument("title", nargs="+", help="Title of issue") - parser.add_argument(f"--project_id", type=str, help="Id or URL-encoded path of project") - parser.add_argument("-m", "--milestone", action='store_true', help="Add this flag, if you want to manually select milestone") - parser.add_argument("--no_epic", action="store_true", help="Add this flag if you don't want to pick epic") - parser.add_argument("--no_milestone", action="store_true", help="Add this flag if you don't want to pick milestone") - parser.add_argument("--no_iteration", action="store_true", help="Add this flag if you don't want to pick iteration") - parser.add_argument("--only_issue", action="store_true", help="Add this flag if you don't want to create merge request and branch alongside issue") - parser.add_argument("-am", "--auto_merge", action="store_true", help="Add this flag to review if you want to set merge request to auto merge when pipeline succeeds") - parser.add_argument("--select", action="store_true", help="Manually select reviewers for merge request (interactive)") - - # If no arguments passed, show help - if len(sys.argv) <= 1: - parser.print_help() - exit(1) - - args = parser.parse_args() - if args.title[0] == 'report': - parts = args.title - if len(parts) != 3: - print("Invalid report format. Use: gh report \"text\" minutes") - return - - text = parts[1] - try: - minutes = int(parts[2].strip()) - process_report(text, minutes) - except ValueError: - print("Invalid minutes. Please provide a valid number.") - return - - # So it takes all text until first known argument - title = " ".join(args.title) - - if title == 'open': - openMergeRequestInBrowser() - return - elif title == 'review': - track_issue_time() - reviewers = None - if getattr(args, "select", False): - reviewers = chooseReviewersManually() - addReviewersToMergeRequest(reviewers=reviewers) - - # Run AI code review and post to MR - try: - from ai_code_review import run_review_for_mr - project_id = get_project_id() - mr_id = getActiveMergeRequestId() - run_review_for_mr(project_id, mr_id, GITLAB_TOKEN, API_URL) - except Exception as e: - print(f"AI review skipped: {e}") - - if(args.auto_merge): - setMergeRequestToAutoMerge() - return - elif title == 'summary': - get_two_weeks_commits() - return - elif title == 'summaryAI': - generate_smart_summary() - return - elif title == 'last deploy': - get_last_production_deploy() - return - elif title == 'ai review': - from ai_code_review import run_review - run_review() - return - - # Get settings for issue from template - selectedSettings = getIssueSettings(select_template()) - - # If template is False, ask for each settings - if not len(selectedSettings): - print('Custom selection of issue settings is not supported yet') - pass - - if args.project_id and selectedSettings.get('projectIds'): - print('NOTE: Overwriting project id from argument...') - - project_id = selectedSettings.get('projectIds') or args.project_id or get_project_id() - - milestone = False - if not args.no_milestone: - milestone = get_milestone(args.milestone)['id'] - - iteration = False - if not args.no_iteration: - # manual pick iteration - iteration = get_iteration(True) - - epic = False - if not args.no_epic: - epic = get_epic() - - MAIN_BRANCH = getMainBranch() - - onlyIssue = selectedSettings.get('onlyIssue') or args.only_issue - - if type(project_id) == list: - for id in project_id: - startIssueCreation(id, title, milestone, epic, iteration, selectedSettings, onlyIssue) - else: - startIssueCreation(project_id, title, milestone, epic, iteration, selectedSettings, onlyIssue) - -if __name__ == '__main__': - main() \ No newline at end of file diff --git a/git_utils.py b/git_utils.py new file mode 100644 index 0000000..cc2805f --- /dev/null +++ b/git_utils.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python3 +import subprocess +import datetime +import webbrowser + +import config + + +class Colors: + CRITICAL = '\033[91m' + HIGH = '\033[93m' + INFO = '\033[96m' + RESET = '\033[0m' + + +def get_project_link_from_current_dir(): + try: + result = subprocess.run( + ['git', 'remote', 'get-url', 'origin'], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + if result.returncode == 0: + return result.stdout.decode('utf-8').strip() + return -1 + except FileNotFoundError: + return -1 + + +def get_current_branch(): + return subprocess.check_output( + ['git', 'rev-parse', '--abbrev-ref', 'HEAD'], + text=True, + ).strip() + + +def get_main_branch(): + command = "git symbolic-ref refs/remotes/origin/HEAD | sed 's@^refs/remotes/origin/@@'" + output = subprocess.check_output( + command, + shell=True, + stderr=subprocess.STDOUT, + universal_newlines=True, + ) + return output.strip() + + +def get_two_weeks_commits(return_output=False): + two_weeks_ago = (datetime.datetime.now() - datetime.timedelta(weeks=2)).strftime('%Y-%m-%d') + cmd = f'git log --since={two_weeks_ago} --format="%ad - %ae - %s" --date=short | grep -v "Merge branch"' + if config.DEVELOPER_EMAIL: + cmd = f'{cmd} | grep {config.DEVELOPER_EMAIL}' + try: + output = subprocess.check_output( + cmd, + shell=True, + text=True, + stderr=subprocess.DEVNULL, + universal_newlines=True, + ).strip() + if output: + if return_output: + return output + print(output) + else: + print("No commits found.") + return "" if return_output else None + except subprocess.CalledProcessError as e: + print(f"No commits were found or an error occurred. (exit status {e.returncode})") + return "" if return_output else None + except FileNotFoundError: + print("Git is not installed or not found in PATH.") + return "" if return_output else None + + +def open_merge_request_in_browser(): + import gitlab_api + try: + merge_request_id = gitlab_api.get_active_merge_request_id() + remote_url = subprocess.check_output( + ["git", "config", "--get", "remote.origin.url"], + text=True, + ).strip() + url = config.BASE_URL + '/' + remote_url.split(':')[1][:-4] + webbrowser.open(f"{url}/-/merge_requests/{merge_request_id}") + except subprocess.CalledProcessError: + return None + + +def get_branch_diff(): + """Get the diff of changed files in current branch vs main branch.""" + try: + main_branch = subprocess.check_output( + "git symbolic-ref refs/remotes/origin/HEAD | sed 's@^refs/remotes/origin/@@'", + shell=True, text=True, stderr=subprocess.STDOUT + ).strip() + except subprocess.CalledProcessError: + main_branch = 'master' + + try: + current_branch = subprocess.check_output( + ['git', 'rev-parse', '--abbrev-ref', 'HEAD'], + text=True, + ).strip() + + if current_branch == main_branch: + print(f"{Colors.HIGH}⚠ You are on the main branch ({main_branch}). No changes to review.{Colors.RESET}") + return None + + diff_output = subprocess.check_output( + ['git', 'diff', f'{main_branch}...HEAD'], + text=True, + stderr=subprocess.DEVNULL, + ) + + if not diff_output.strip(): + print(f"{Colors.INFO}ℹ No changes detected between {current_branch} and {main_branch}{Colors.RESET}") + return None + + return diff_output + except subprocess.CalledProcessError as e: + print(f"{Colors.CRITICAL}✗ Error getting git diff: {e}{Colors.RESET}") + return None diff --git a/gitlab_api.py b/gitlab_api.py new file mode 100644 index 0000000..2f8d81c --- /dev/null +++ b/gitlab_api.py @@ -0,0 +1,371 @@ +#!/usr/bin/env python3 +import json +import subprocess + +import requests + +import config + + +def _headers(): + return {"Private-Token": config.GITLAB_TOKEN} + + +def _glab(*args): + """Run a glab CLI command and return parsed JSON output.""" + cmd = ["glab", "api"] + list(args) + result = subprocess.check_output(cmd) + return json.loads(result.decode()) + + +def _glab_raw(*args): + """Run a glab CLI command and return raw stdout bytes.""" + cmd = ["glab", "api"] + list(args) + return subprocess.check_output(cmd) + + +def _api_get(path, params=None): + url = config.API_URL + path + response = requests.get(url, headers=_headers(), params=params) + if response.status_code == 200: + return response.json() + elif response.status_code == 401: + print("Error: Unauthorized (401). Your GitLab token is probably expired, invalid, or missing required permissions.") + print("Please generate a new token and update your configs/config.ini.") + exit(1) + else: + print(f"Request failed with status code {response.status_code}") + return None + + +def _api_post(path, data=None, json_data=None): + url = config.API_URL + path + response = requests.post(url, headers=_headers(), data=data, json=json_data) + return response + + +def _api_put(path, data=None, json_data=None): + url = config.API_URL + path + response = requests.put(url, headers=_headers(), data=data, json=json_data) + return response + + +def get_project_id(): + import git_utils + project_link = git_utils.get_project_link_from_current_dir() + if project_link == -1: + while True: + project_id = input('Please enter the ID of your GitLab project: ') + if project_id: + return project_id + exit('Invalid project ID.') + + all_projects = get_all_projects(project_link) + matching_id = None + for project in all_projects: + if project.get("ssh_url_to_repo") == project_link: + matching_id = project.get("id") + break + return matching_id + + +def get_all_projects(project_link): + search_term = project_link.split('/')[-1].split('.')[0] + path = f"/projects?membership=true&search={search_term}" + return _api_get(path) + + +def get_authorized_user(): + output = subprocess.check_output(["glab", "api", "/user"]) + return json.loads(output) + + +def list_milestones(current=False): + result = subprocess.run( + ["glab", "api", f"/groups/{config.GROUP_ID}/milestones?state=active"], + stdout=subprocess.PIPE, + ) + milestones = json.loads(result.stdout) + if current: + import datetime + today = datetime.date.today().strftime('%Y-%m-%d') + active = [ + m for m in milestones + if m.get('start_date') and m.get('due_date') + and m['start_date'] <= today and m['due_date'] >= today + ] + active.sort(key=lambda x: x['due_date']) + return active[0] if active else None + return milestones + + +def list_iterations(): + result = subprocess.run( + ["glab", "api", f"/groups/{config.GROUP_ID}/iterations?state=opened"], + stdout=subprocess.PIPE, + ) + return json.loads(result.stdout) + + +def get_active_iteration(): + import datetime + iterations = list_iterations() + today = datetime.date.today().strftime('%Y-%m-%d') + active = [ + i for i in iterations + if i.get('start_date') and i.get('due_date') + and i['start_date'] <= today and i['due_date'] >= today + ] + active.sort(key=lambda x: x['due_date']) + return active[0] if active else None + + +def list_epics(): + result = subprocess.run( + ["glab", "api", f"/groups/{config.GROUP_ID}/epics?per_page=1000&state=opened"], + stdout=subprocess.PIPE, + ) + return json.loads(result.stdout) + + +def execute_issue_create(project_id, title, labels, milestone_id, epic, iteration, weight, estimated_time, issue_type='issue'): + labels = ",".join(labels) if isinstance(labels, list) else labels + assignee_id = get_authorized_user()['id'] + cmd = [ + "glab", "api", + f"/projects/{str(project_id)}/issues", + "-f", f'title={title}', + "-f", f'assignee_ids={assignee_id}', + "-f", f'issue_type={issue_type}', + ] + if labels: + cmd.extend(["-f", f'labels={labels}']) + if weight: + cmd.extend(["-f", f'weight={str(weight)}']) + if milestone_id: + cmd.extend(["-f", f'milestone_id={str(milestone_id)}']) + if epic: + cmd.extend(["-f", f'epic_id={str(epic["id"])}']) + + description = "" + if iteration: + description += f"/iteration *iteration:{str(iteration['id'])} " + if estimated_time: + description += f"\n/estimate {estimated_time}m " + + cmd.extend(["-f", f'description={description}']) + output = subprocess.check_output(cmd) + return json.loads(output.decode()) + + +def create_branch(project_id, issue): + import re + issue_id = str(issue['iid']) + title = re.sub(r'\s+', '-', issue['title']).lower() + title = issue_id + '-' + title.replace(':', '').replace('(', ' ').replace(')', '').replace(' ', '-') + output = subprocess.check_output([ + "glab", "api", + f"/projects/{str(project_id)}/repository/branches", + "-f", f'branch={title}', + "-f", f'ref={config.MAIN_BRANCH}', + "-f", f'issue_iid={issue_id}', + ]) + return json.loads(output.decode()) + + +def create_merge_request(project_id, branch, issue, labels, milestone_id): + issue_id = str(issue['iid']) + branch_name = branch['name'] + title = issue['title'] + assignee_id = get_authorized_user()['id'] + labels = ",".join(labels) if isinstance(labels, list) else labels + cmd = [ + "glab", "api", + f"/projects/{str(project_id)}/merge_requests", + "-f", f'title={title}', + "-f", f'description="Closes #{issue_id}"', + "-f", f'source_branch={branch_name}', + "-f", f'target_branch={config.MAIN_BRANCH}', + "-f", f'issue_iid={issue_id}', + "-f", f'assignee_ids={assignee_id}', + ] + if config.SQUASH_COMMITS: + cmd.extend(["-f", "squash=true"]) + if config.DELETE_BRANCH: + cmd.extend(["-f", "remove_source_branch=true"]) + if labels: + cmd.extend(["-f", f'labels={labels}']) + if milestone_id: + cmd.extend(["-f", f'milestone_id={str(milestone_id)}']) + + output = subprocess.check_output(cmd) + return json.loads(output.decode()) + + +def get_merge_request_for_branch(branch_name): + project_id = get_project_id() + path = f"/projects/{project_id}/merge_requests" + params = {"source_branch": branch_name} + response = requests.get(config.API_URL + path, headers=_headers(), params=params) + if response.status_code == 200: + merge_requests = response.json() + for mr in merge_requests: + if mr.get("source_branch") == branch_name: + return mr + else: + print(f"Failed to fetch Merge Requests: {response.status_code} - {response.text}") + return None + + +def get_active_merge_request_id(): + import git_utils + branch = git_utils.get_current_branch() + mr = get_merge_request_for_branch(branch) + return mr['iid'] if mr else None + + +def add_reviewers_to_merge_request(reviewers=None): + project_id = get_project_id() + mr_id = get_active_merge_request_id() + path = f"/projects/{project_id}/merge_requests/{mr_id}" + data = {"reviewer_ids": reviewers if reviewers is not None else config.REVIEWERS} + _api_put(path, json_data=data) + + +def set_merge_request_to_auto_merge(): + project_id = get_project_id() + mr_id = get_active_merge_request_id() + path = f"/projects/{project_id}/merge_requests/{mr_id}/merge" + data = { + "id": project_id, + "merge_request_iid": mr_id, + "should_remove_source_branch": True, + "merge_when_pipeline_succeeds": True, + "auto_merge_strategy": "merge_when_pipeline_succeeds", + } + _api_put(path, json_data=data) + + +def get_labels_of_group(search=''): + cmd = ["glab", "api", f"/groups/{config.GROUP_ID}/labels?search={search}"] + try: + result = subprocess.run(cmd, stdout=subprocess.PIPE, check=True) + return json.loads(result.stdout) + except subprocess.CalledProcessError as e: + print(f"Error getting labels: {str(e)}") + return [] + + +def close_opened_issue(issue_iid, project_id): + cmd = [ + "glab", "api", + f"/projects/{project_id}/issues/{issue_iid}", + '-X', 'PUT', + '-f', 'state_event=close', + ] + try: + subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + except subprocess.CalledProcessError as e: + print(f"Error closing issue: {str(e)}") + + +def track_issue_time(project_id, issue_id, spent_time): + cmd = [ + "glab", "api", + f"/projects/{project_id}/issues/{issue_id}/notes", + "-f", f"body=/spend {spent_time}m", + ] + try: + subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True) + print(f"Added {spent_time} minutes to issue {issue_id} time tracking.") + except subprocess.CalledProcessError as e: + print(f"Error adding time tracking: {str(e)}") + except Exception as e: + print(f"Error tracking issue time: {str(e)}") + + +def add_spent_time(project_id, issue_iid, minutes): + cmd = [ + "glab", "api", + f"/projects/{project_id}/issues/{issue_iid}/add_spent_time", + "-f", f"duration={minutes}m", + ] + try: + subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + except subprocess.CalledProcessError as e: + print(f"Error adding time tracking: {str(e)}") + + +def get_pipelines(project_id, params): + path = f"/projects/{project_id}/pipelines" + return _api_get(path, params=params) + + +def get_pipeline_jobs(project_id, pipeline_id): + path = f"/projects/{project_id}/pipelines/{pipeline_id}/jobs" + response = requests.get(config.API_URL + path, headers=_headers()) + if response.status_code == 200: + return response.json() + return None + + +def get_user_details(user_id): + path = f"/users/{user_id}" + response = requests.get(config.API_URL + path, headers=_headers()) + if response.status_code == 200: + return response.json() + return None + + +def get_merge_request_changes(project_id, mr_id): + path = f"/projects/{project_id}/merge_requests/{mr_id}/changes" + response = requests.get(config.API_URL + path, headers=_headers()) + if response.status_code == 200: + return response.json() + else: + print(f"Failed to get MR changes: {response.status_code}") + return None + + +def get_diff_refs(project_id, mr_id): + path = f"/projects/{project_id}/merge_requests/{mr_id}" + response = requests.get(config.API_URL + path, headers=_headers()) + if response.status_code == 200: + mr_data = response.json() + diff_refs = mr_data.get('diff_refs', {}) + return { + 'base_sha': diff_refs.get('base_sha'), + 'head_sha': diff_refs.get('head_sha'), + 'start_sha': diff_refs.get('start_sha'), + } + return None + + +def post_inline_comment(project_id, mr_id, body, diff_refs, file_path, line): + path = f"/projects/{project_id}/merge_requests/{mr_id}/discussions" + data = { + "body": body, + "position": { + "base_sha": diff_refs['base_sha'], + "head_sha": diff_refs['head_sha'], + "start_sha": diff_refs['start_sha'], + "position_type": "text", + "new_path": file_path, + "new_line": line, + }, + } + response = requests.post(config.API_URL + path, headers=_headers(), json=data) + return response.status_code == 201 + + +def post_to_merge_request(comment_body, project_id, mr_id): + path = f"/projects/{project_id}/merge_requests/{mr_id}/notes" + data = {"body": comment_body} + response = requests.post(config.API_URL + path, headers=_headers(), json=data) + if response.status_code == 201: + print("AI review summary posted to merge request") + return True + else: + print(f"Failed to post comment: {response.status_code}") + print(response.text) + return False diff --git a/interactive.py b/interactive.py new file mode 100644 index 0000000..00beeb1 --- /dev/null +++ b/interactive.py @@ -0,0 +1,128 @@ +#!/usr/bin/env python3 +import inquirer + +import config +import gitlab_api + + +def select_milestone(milestones): + milestone_titles = [t['title'] for t in milestones] + questions = [ + inquirer.List('milestones', + message="Select milestone:", + choices=milestone_titles), + ] + answer = inquirer.prompt(questions) + return answer['milestones'] + + +def get_selected_milestone(title, milestones): + return next((t for t in milestones if t['title'] == title), None) + + +def get_milestone(manual=False): + if manual: + milestones = gitlab_api.list_milestones() + return get_selected_milestone(select_milestone(milestones), milestones) + return gitlab_api.list_milestones(current=True) + + +def select_iteration(iterations): + iteration_labels = [t['start_date'] + ' - ' + t['due_date'] for t in iterations] + questions = [ + inquirer.List('iterations', + message="Select iteration:", + choices=iteration_labels), + ] + answer = inquirer.prompt(questions) + return answer['iterations'] + + +def get_selected_iteration(label, iterations): + return next((t for t in iterations if t['start_date'] + ' - ' + t['due_date'] == label), None) + + +def get_iteration(manual=False): + if manual: + iterations = gitlab_api.list_iterations() + return get_selected_iteration(select_iteration(iterations), iterations) + return gitlab_api.get_active_iteration() + + +def select_epic(epics): + epic_titles = [t['title'] for t in epics] + search_query = inquirer.prompt([ + inquirer.Text('search_query', message='Search epic:'), + ])['search_query'] + + filtered_epics = [c for c in epic_titles if search_query.lower() in c.lower()] + questions = [ + inquirer.List('epics', + message="Select epic:", + choices=filtered_epics), + ] + answer = inquirer.prompt(questions) + return answer['epics'] + + +def get_selected_epic(title, epics): + return next((t for t in epics if t['title'] == title), None) + + +def get_epic(): + epics = gitlab_api.list_epics() + return get_selected_epic(select_epic(epics), epics) + + +def choose_reviewers_manually(): + reviewer_choices = [] + for reviewer_id in config.REVIEWERS: + user = gitlab_api.get_user_details(reviewer_id) + if user: + display_name = f"{user.get('name')} ({user.get('username')})" + reviewer_choices.append((display_name, reviewer_id)) + else: + reviewer_choices.append((str(reviewer_id), reviewer_id)) + + questions = [ + inquirer.Checkbox( + "selected_reviewers", + message="Select reviewers", + choices=[(name, str(rid)) for name, rid in reviewer_choices], + ) + ] + answers = inquirer.prompt(questions) + if answers and "selected_reviewers" in answers: + return [int(r) for r in answers["selected_reviewers"]] + return [] + + +def select_labels(search, multiple=False): + labels = gitlab_api.get_labels_of_group(search) + label_names = sorted([t['name'] for t in labels]) + question_type = inquirer.Checkbox if multiple else inquirer.List + questions = [ + question_type( + 'labels', + message="Select one or more department labels:", + choices=label_names, + ), + ] + answer = inquirer.prompt(questions) + return answer['labels'] + + +def prompt_estimated_time(): + return inquirer.prompt([ + inquirer.Text('estimated_time', + message='Estimated time to complete this issue (in minutes, optional)', + validate=lambda _, x: x == '' or x.isdigit()) + ])['estimated_time'] + + +def prompt_spent_time(): + return inquirer.prompt([ + inquirer.Text('spent_time', + message='How many minutes did you actually spend on this issue?', + validate=lambda _, x: x.isdigit()) + ])['spent_time'] diff --git a/main.py b/main.py new file mode 100644 index 0000000..51ba110 --- /dev/null +++ b/main.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python3 +import argparse +import sys + +import config +import gitlab_api +import git_utils +import interactive +import templates +from commands import create_issue, review, open_mr, deploy, report, summary, ai_review + +_parser = argparse.ArgumentParser("Argument description of Git happens") +_parser.add_argument("title", nargs="+", help="Title of issue") +_parser.add_argument("--project_id", type=str, help="Id or URL-encoded path of project") +_parser.add_argument("-m", "--milestone", action='store_true', help="Add this flag, if you want to manually select milestone") +_parser.add_argument("--no_epic", action="store_true", help="Add this flag if you don't want to pick epic") +_parser.add_argument("--no_milestone", action="store_true", help="Add this flag if you don't want to pick milestone") +_parser.add_argument("--no_iteration", action="store_true", help="Add this flag if you don't want to pick iteration") +_parser.add_argument("--only_issue", action="store_true", help="Add this flag if you don't want to create merge request and branch alongside issue") +_parser.add_argument("-am", "--auto_merge", action="store_true", help="Add this flag to review if you want to set merge request to auto merge when pipeline succeeds") +_parser.add_argument("--select", action="store_true", help="Manually select reviewers for merge request (interactive)") + + +def main(): + if len(sys.argv) <= 1: + _parser.print_help() + sys.exit(1) + + args = _parser.parse_args() + + if args.title[0] == 'report': + parts = args.title + if len(parts) != 3: + print("Invalid report format. Use: gh report \"text\" minutes") + return + text = parts[1] + try: + minutes = int(parts[2].strip()) + report.run(text, minutes) + except ValueError: + print("Invalid minutes. Please provide a valid number.") + return + + title = " ".join(args.title) + + if title == 'open': + open_mr.run() + return + elif title == 'review': + review.run(auto_merge=args.auto_merge, select_reviewers=args.select) + return + elif title == 'summary': + summary.run(ai=False) + return + elif title == 'summaryAI': + summary.run(ai=True) + return + elif title == 'last deploy': + deploy.run() + return + elif title == 'ai review': + ai_review.run() + return + + selected_settings = templates.get_issue_settings(templates.select_template()) + + if not len(selected_settings): + print('Custom selection of issue settings is not supported yet') + + if args.project_id and selected_settings.get('projectIds'): + print('NOTE: Overwriting project id from argument...') + + project_id = selected_settings.get('projectIds') or args.project_id or gitlab_api.get_project_id() + + milestone = False + if not args.no_milestone: + milestone = interactive.get_milestone(args.milestone)['id'] + + iteration = False + if not args.no_iteration: + iteration = interactive.get_iteration(manual=True) + + epic = False + if not args.no_epic: + epic = interactive.get_epic() + + config.MAIN_BRANCH = git_utils.get_main_branch() + + only_issue = selected_settings.get('onlyIssue') or args.only_issue + + if isinstance(project_id, list): + for pid in project_id: + create_issue.run(title, pid, milestone, epic, iteration, selected_settings, only_issue) + else: + create_issue.run(title, project_id, milestone, epic, iteration, selected_settings, only_issue) + + +if __name__ == '__main__': + main() diff --git a/requirements.txt b/requirements.txt index 7b7a7f0..0fffb0a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ inquirer>=3.1.3 -requests>=2.31.0 \ No newline at end of file +requests>=2.31.0 +pytest>=8.0.0 \ No newline at end of file diff --git a/templates.py b/templates.py new file mode 100644 index 0000000..4e9b8f2 --- /dev/null +++ b/templates.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python3 +import inquirer + +import config + + +def select_template(): + template_names = [t['name'] for t in config.TEMPLATES] + template_names.append(config.CUSTOM_TEMPLATE) + questions = [ + inquirer.List('template', + message="Select template:", + choices=template_names), + ] + answer = inquirer.prompt(questions) + return answer['template'] + + +def get_issue_settings(template_name): + if template_name == config.CUSTOM_TEMPLATE: + return {} + return next((t for t in config.TEMPLATES if t['name'] == template_name), None) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..65140f2 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1 @@ +# tests package diff --git a/tests/test_commands/__init__.py b/tests/test_commands/__init__.py new file mode 100644 index 0000000..42cecd8 --- /dev/null +++ b/tests/test_commands/__init__.py @@ -0,0 +1 @@ +# tests.commands package diff --git a/tests/test_commands/test_create_issue.py b/tests/test_commands/test_create_issue.py new file mode 100644 index 0000000..40f9918 --- /dev/null +++ b/tests/test_commands/test_create_issue.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python3 +import unittest +from unittest.mock import patch, MagicMock + +from commands import create_issue + + +class TestCreateIssue(unittest.TestCase): + @patch('commands.create_issue.interactive.prompt_estimated_time') + @patch('commands.create_issue.gitlab_api.execute_issue_create') + @patch('commands.create_issue.gitlab_api.create_branch') + @patch('commands.create_issue.gitlab_api.create_merge_request') + def test_run_creates_issue_branch_and_mr( + self, mock_mr, mock_branch, mock_issue, mock_time + ): + mock_time.return_value = '60' + mock_issue.return_value = {'iid': 1, 'title': 'Test'} + mock_branch.return_value = {'name': '1-test'} + mock_mr.return_value = {'iid': 2, 'title': 'Test', 'source_branch': '1-test'} + + result = create_issue.run( + 'Test issue', 123, 5, {'id': 10}, {'id': 20}, + {'labels': ['bug'], 'weight': 3}, False + ) + + self.assertEqual(result['iid'], 1) + mock_issue.assert_called_once_with( + 123, 'Test issue', ['bug'], 5, {'id': 10}, {'id': 20}, 3, 60, 'issue' + ) + mock_issue.assert_called_once() + mock_branch.assert_called_once() + mock_mr.assert_called_once() + + @patch('commands.create_issue.interactive.prompt_estimated_time') + @patch('commands.create_issue.gitlab_api.execute_issue_create') + def test_run_only_issue(self, mock_issue, mock_time): + mock_time.return_value = '' + mock_issue.return_value = {'iid': 1, 'title': 'Test'} + + result = create_issue.run( + 'Test issue', 123, 5, None, None, + {'labels': ['bug']}, True + ) + + self.assertEqual(result['iid'], 1) + mock_issue.assert_called_once_with( + 123, 'Test issue', ['bug'], 5, None, None, None, None, 'issue' + ) + + @patch('commands.create_issue.interactive.prompt_estimated_time') + @patch('commands.create_issue.gitlab_api.execute_issue_create') + @patch('commands.create_issue.gitlab_api.create_branch') + @patch('commands.create_issue.gitlab_api.create_merge_request') + def test_run_time_splitting(self, mock_mr, mock_branch, mock_issue, mock_time): + mock_time.return_value = '120' + mock_issue.return_value = {'iid': 1, 'title': 'Test'} + mock_branch.return_value = {'name': '1-test'} + mock_mr.return_value = {'iid': 2, 'title': 'Test', 'source_branch': '1-test'} + + create_issue.run( + 'Test issue', [111, 222], 5, None, None, + {'labels': ['bug']}, False + ) + + self.assertEqual(mock_issue.call_count, 1) + call_args = mock_issue.call_args[0] + self.assertEqual(call_args[7], 60) # estimated_time split across 2 projects + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_git_utils.py b/tests/test_git_utils.py new file mode 100644 index 0000000..73f5aaf --- /dev/null +++ b/tests/test_git_utils.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 +import unittest +from unittest.mock import patch, MagicMock + +import git_utils + + +class TestGitUtils(unittest.TestCase): + @patch('git_utils.subprocess.run') + def test_get_project_link_from_current_dir_success(self, mock_run): + mock_run.return_value.returncode = 0 + mock_run.return_value.stdout = b'git@gitlab.com:user/repo.git\n' + result = git_utils.get_project_link_from_current_dir() + self.assertEqual(result, 'git@gitlab.com:user/repo.git') + + @patch('git_utils.subprocess.run') + def test_get_project_link_from_current_dir_failure(self, mock_run): + mock_run.return_value.returncode = 1 + result = git_utils.get_project_link_from_current_dir() + self.assertEqual(result, -1) + + @patch('git_utils.subprocess.check_output') + def test_get_current_branch(self, mock_output): + mock_output.return_value = 'feature-branch\n' + result = git_utils.get_current_branch() + self.assertEqual(result, 'feature-branch') + + @patch('git_utils.subprocess.check_output') + def test_get_main_branch(self, mock_output): + mock_output.return_value = 'master\n' + result = git_utils.get_main_branch() + self.assertEqual(result, 'master') + + @patch('git_utils.subprocess.check_output') + def test_get_two_weeks_commits(self, mock_output): + mock_output.return_value = '2024-01-01 - user@example.com - commit msg\n' + result = git_utils.get_two_weeks_commits(return_output=True) + self.assertIn('commit msg', result) + + @patch('git_utils.webbrowser.open') + @patch('git_utils.subprocess.check_output') + @patch('gitlab_api.get_active_merge_request_id') + def test_open_merge_request_in_browser(self, mock_mr_id, mock_git, mock_open_browser): + mock_mr_id.return_value = 5 + mock_git.return_value = 'git@gitlab.com:user/repo.git\n' + git_utils.open_merge_request_in_browser() + mock_open_browser.assert_called_once() + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_gitlab_api.py b/tests/test_gitlab_api.py new file mode 100644 index 0000000..ad9218e --- /dev/null +++ b/tests/test_gitlab_api.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python3 +import json +import unittest +from unittest.mock import patch, MagicMock + +import gitlab_api + + +class TestGitlabApi(unittest.TestCase): + @patch('gitlab_api.subprocess.check_output') + def test_get_authorized_user(self, mock_output): + mock_output.return_value = json.dumps({'id': 1, 'username': 'test'}).encode() + result = gitlab_api.get_authorized_user() + self.assertEqual(result['id'], 1) + mock_output.assert_called_once_with(["glab", "api", "/user"]) + + @patch('gitlab_api.requests.get') + def test_api_get_success(self, mock_get): + mock_get.return_value.status_code = 200 + mock_get.return_value.json.return_value = [{'id': 1}] + result = gitlab_api._api_get('/projects') + self.assertEqual(result, [{'id': 1}]) + + @patch('gitlab_api.requests.get') + def test_api_get_401(self, mock_get): + mock_get.return_value.status_code = 401 + with self.assertRaises(SystemExit): + gitlab_api._api_get('/projects') + + @patch('gitlab_api.requests.put') + def test_api_put(self, mock_put): + mock_put.return_value.status_code = 200 + response = gitlab_api._api_put('/projects/1/merge_requests/2', json_data={'foo': 'bar'}) + self.assertEqual(response.status_code, 200) + + @patch('gitlab_api.requests.post') + def test_post_to_merge_request(self, mock_post): + mock_post.return_value.status_code = 201 + result = gitlab_api.post_to_merge_request('body', 1, 2) + self.assertTrue(result) + + @patch('gitlab_api.requests.post') + def test_post_inline_comment(self, mock_post): + mock_post.return_value.status_code = 201 + result = gitlab_api.post_inline_comment( + 1, 2, 'body text', {'base_sha': 'a', 'head_sha': 'b', 'start_sha': 'c'}, 'file.py', 10 + ) + self.assertTrue(result) + + @patch('gitlab_api.requests.get') + def test_get_diff_refs(self, mock_get): + mock_get.return_value.status_code = 200 + mock_get.return_value.json.return_value = { + 'diff_refs': {'base_sha': 'a', 'head_sha': 'b', 'start_sha': 'c'} + } + result = gitlab_api.get_diff_refs(1, 2) + self.assertEqual(result, {'base_sha': 'a', 'head_sha': 'b', 'start_sha': 'c'}) + + @patch('gitlab_api.requests.get') + def test_get_merge_request_changes(self, mock_get): + mock_get.return_value.status_code = 200 + mock_get.return_value.json.return_value = {'changes': []} + result = gitlab_api.get_merge_request_changes(1, 2) + self.assertEqual(result, {'changes': []}) + + @patch('gitlab_api.get_authorized_user') + @patch('gitlab_api.subprocess.check_output') + def test_execute_issue_create(self, mock_output, mock_user): + mock_user.return_value = {'id': 99} + mock_output.return_value = json.dumps({'iid': 42, 'title': 'Test'}).encode() + result = gitlab_api.execute_issue_create( + 1, 'Test', ['bug'], 5, None, None, 3, 60, 'issue' + ) + self.assertEqual(result['iid'], 42) + + @patch('gitlab_api.subprocess.check_output') + def test_create_branch(self, mock_output): + mock_output.return_value = json.dumps({'name': '42-test-branch'}).encode() + result = gitlab_api.create_branch(1, {'iid': 42, 'title': 'Test issue'}) + self.assertEqual(result['name'], '42-test-branch') + + @patch('gitlab_api.get_authorized_user') + @patch('gitlab_api.subprocess.check_output') + def test_create_merge_request(self, mock_output, mock_user): + mock_user.return_value = {'id': 99} + mock_output.return_value = json.dumps({'iid': 10, 'title': 'Test'}).encode() + result = gitlab_api.create_merge_request( + 1, {'name': 'branch'}, {'iid': 1, 'title': 'Test'}, ['bug'], 5 + ) + self.assertEqual(result['iid'], 10) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_main.py b/tests/test_main.py new file mode 100644 index 0000000..abd8032 --- /dev/null +++ b/tests/test_main.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python3 +import sys +import unittest +from unittest.mock import patch, MagicMock + +import main + + +class TestMain(unittest.TestCase): + @patch('main._parser.print_help') + @patch.object(sys, 'argv', ['main.py']) + def test_no_args_shows_help(self, mock_help): + with self.assertRaises(SystemExit) as ctx: + main.main() + self.assertEqual(ctx.exception.code, 1) + mock_help.assert_called_once() + + @patch('commands.open_mr.run') + @patch.object(sys, 'argv', ['main.py', 'open']) + def test_open_command(self, mock_open_mr): + main.main() + mock_open_mr.assert_called_once() + + @patch('commands.review.run') + @patch.object(sys, 'argv', ['main.py', 'review']) + def test_review_command(self, mock_review): + main.main() + mock_review.assert_called_once_with(auto_merge=False, select_reviewers=False) + + @patch('commands.review.run') + @patch.object(sys, 'argv', ['main.py', 'review', '--auto_merge', '--select']) + def test_review_with_flags(self, mock_review): + main.main() + mock_review.assert_called_once_with(auto_merge=True, select_reviewers=True) + + @patch('commands.summary.run') + @patch.object(sys, 'argv', ['main.py', 'summary']) + def test_summary_command(self, mock_summary): + main.main() + mock_summary.assert_called_once_with(ai=False) + + @patch('commands.summary.run') + @patch.object(sys, 'argv', ['main.py', 'summaryAI']) + def test_summary_ai_command(self, mock_summary): + main.main() + mock_summary.assert_called_once_with(ai=True) + + @patch('commands.deploy.run') + @patch.object(sys, 'argv', ['main.py', 'last', 'deploy']) + def test_deploy_command(self, mock_deploy): + main.main() + mock_deploy.assert_called_once() + + @patch('commands.report.run') + @patch.object(sys, 'argv', ['main.py', 'report', 'incident', '30']) + def test_report_command(self, mock_report): + main.main() + mock_report.assert_called_once_with('incident', 30) + + @patch('gitlab_api.get_project_id') + @patch('commands.create_issue.run') + @patch('main.templates.select_template') + @patch('main.templates.get_issue_settings') + @patch('main.interactive.get_milestone') + @patch('main.interactive.get_iteration') + @patch('main.interactive.get_epic') + @patch('main.git_utils.get_main_branch') + @patch.object(sys, 'argv', ['main.py', 'My', 'Issue']) + def test_issue_creation(self, mock_branch, mock_epic, mock_iter, mock_milestone, mock_settings, mock_select, mock_create, mock_pid): + mock_pid.return_value = '12345' + mock_select.return_value = 'Bug easy' + mock_settings.return_value = {'labels': ['Bug'], 'weight': 1} + mock_milestone.return_value = {'id': 5} + mock_iter.return_value = {'id': 10} + mock_epic.return_value = {'id': 20} + mock_branch.return_value = 'master' + mock_create.return_value = {'iid': 1} + + main.main() + mock_create.assert_called_once() + mock_create.assert_called_once_with( + 'My Issue', + '12345', + 5, + {'id': 20}, + {'id': 10}, + {'labels': ['Bug'], 'weight': 1}, + False, + ) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_templates.py b/tests/test_templates.py new file mode 100644 index 0000000..d93d5c0 --- /dev/null +++ b/tests/test_templates.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 +import unittest +from unittest.mock import patch, MagicMock + +import templates + + +class TestTemplates(unittest.TestCase): + @patch('templates.config.TEMPLATES', [ + {'name': 'Bug easy', 'weight': 1, 'labels': ['Bug']}, + {'name': 'Feature', 'weight': 3, 'labels': ['feature::Confirmed']}, + ]) + @patch('templates.config.CUSTOM_TEMPLATE', 'Custom') + @patch('inquirer.prompt') + def test_select_template(self, mock_prompt): + mock_prompt.return_value = {'template': 'Bug easy'} + result = templates.select_template() + self.assertEqual(result, 'Bug easy') + + @patch('templates.config.TEMPLATES', [ + {'name': 'Bug easy', 'weight': 1, 'labels': ['Bug']}, + ]) + @patch('templates.config.CUSTOM_TEMPLATE', 'Custom') + def test_get_issue_settings_found(self): + result = templates.get_issue_settings('Bug easy') + self.assertEqual(result['weight'], 1) + + @patch('templates.config.TEMPLATES', []) + @patch('templates.config.CUSTOM_TEMPLATE', 'Custom') + def test_get_issue_settings_custom(self): + result = templates.get_issue_settings('Custom') + self.assertEqual(result, {}) + + @patch('templates.config.TEMPLATES', []) + @patch('templates.config.CUSTOM_TEMPLATE', 'Custom') + def test_get_issue_settings_not_found(self): + result = templates.get_issue_settings('Missing') + self.assertIsNone(result) + + +if __name__ == '__main__': + unittest.main()