#!/usr/bin/env python3
#
# Copyright (c) 2024 Red Hat Inc.
#
# SPDX-License-Identifier: Apache-2.0

"""
Keeps checking the current PR until all required jobs pass

Env variables:

* REQUIRED_JOBS: semicolon separated list of required jobs (in form of
  "$workflow / $job")
* REQUIRED_REGEXPS: semicolon separated list of regexps for required jobs
* REQUIRED_LABELS: semicolon separated list of labels the PR must have
* COMMIT_HASH: Full commit hash we want to be watching
* GITHUB_REPOSITORY: Github repository (user/repo)
* GH_PR_NUMBER: PR number, used only for the required-labels check
* GITHUB_TOKEN: optional token sent as "Authorization: Bearer ..." header
* DEBUG: when "true", every fetched JSON is stored in ./$timestamp/
* DEBUG_INPUT: directory with previously stored JSONs to replay instead of
  querying the GitHub API

Sample execution (GH token can be excluded):

GITHUB_TOKEN="..." REQUIRED_JOBS="skipper / skipper"
REQUIRED_REGEXPS=".*" REQUIRED_LABELS="ok-to-test;bar"
COMMIT_HASH=b8382cea886ad9a8f77d237bcfc0eba0c98775dd
GITHUB_REPOSITORY=kata-containers/kata-containers
GH_PR_NUMBER=123 python3 jobs.py
"""

import json
import os
import re
import sys
import time

import requests


# Values returned by Checker.status() and used as the process exit code
PASS = 0
FAIL = 1
RUNNING = 127

_GH_HEADERS = {"Accept": "application/vnd.github.v3+json"}
if os.environ.get("GITHUB_TOKEN"):
    _GH_HEADERS["Authorization"] = f"Bearer {os.environ['GITHUB_TOKEN']}"
_GH_API_URL = f"https://api.github.com/repos/{os.environ['GITHUB_REPOSITORY']}"
_GH_RUNS_URL = f"{_GH_API_URL}/actions/runs"
# Link to the workflow run executing this script (printed after the step
# summary is written)
_GH_SUMMARY_URL = (
    f"{os.environ.get('GITHUB_SERVER_URL')}/"
    f"{os.environ.get('GITHUB_REPOSITORY')}/actions/runs/"
    f"{os.environ.get('GITHUB_RUN_ID')}"
)
if os.environ.get("DEBUG", "false") == "true":
    DEBUG_DIR = os.path.join(os.path.abspath('.'), str(int(time.time())))
    os.makedirs(DEBUG_DIR)
else:
    DEBUG_DIR = None
if os.environ.get("DEBUG_INPUT", "") == "":
    DEBUG_INPUT = None
else:
    DEBUG_INPUT = os.path.abspath(os.environ["DEBUG_INPUT"])


def _split_env_list(value):
    """
    Split a ';' separated env value into a list

    Empty entries (eg. caused by a trailing ';') are dropped, because an
    empty job name could never be satisfied and an empty regexp would
    match every job.

    :param value: raw env variable value (may be None or "")
    :returns: list of non-empty entries
    """
    if not value:
        return []
    return [item for item in value.split(";") if item]


class Checker:
    """Object to keep watching required GH action workflows"""

    def __init__(self):
        """
        Read the configuration from env variables

        :raises RuntimeError: when neither REQUIRED_JOBS nor
            REQUIRED_REGEXPS define any job to wait for
        """
        self.latest_commit_sha = os.getenv("COMMIT_HASH")
        self.pr_number = os.getenv("GH_PR_NUMBER")
        # Always a set, check_required_labels relies on set operations
        self.required_labels = set(
            _split_env_list(os.getenv("REQUIRED_LABELS")))
        required_jobs = _split_env_list(os.getenv("REQUIRED_JOBS"))
        # TODO: Add way to specify minimum amount of tests
        # (eg. via \d+: prefix) and check it in status
        self.required_regexps = [
            re.compile(regexp)
            for regexp in _split_env_list(os.getenv("REQUIRED_REGEXPS"))]
        if not required_jobs and not self.required_regexps:
            raise RuntimeError("No REQUIRED_JOBS or REQUIRED_REGEXPS defined")
        # Set all required jobs as EXPECTED to enforce waiting for them
        self.results = {job: {"status": "EXPECTED", "run_id": -1}
                        for job in required_jobs}

    def record(self, workflow, job):
        """
        Records a job run

        Jobs that are neither listed in REQUIRED_JOBS nor matched by any of
        the REQUIRED_REGEXPS are ignored, as well as jobs coming from an
        older workflow run than the one already stored.

        :param workflow: name of the workflow the job belongs to
        :param job: job dictionary as reported by the GH API
        """
        job_name = f"{workflow} / {job['name']}"
        if job_name not in self.results:
            # Not explicitly required, check the regexps
            if not any(re_job.match(job_name)
                       for re_job in self.required_regexps):
                # Not a required job
                return
        elif job['run_id'] < self.results[job_name]['run_id']:
            # Newer results already stored
            print(f"older {job_name} - {job['status']} {job['conclusion']} "
                  f"{job['id']} (newer_id={self.results[job_name]['id']})",
                  file=sys.stderr)
            return
        print(f"{job_name} - {job['status']} {job['conclusion']} {job['id']}",
              file=sys.stderr)
        self.results[job_name] = job

    @staticmethod
    def _job_status(job):
        """
        Map job status to our status

        :param job: job dictionary (or the EXPECTED placeholder)
        :returns: RUNNING for not-yet-completed jobs, PASS for successful
            ones and the job's "conclusion" value otherwise
        """
        if job["status"] != "completed":
            return RUNNING
        if job["conclusion"] != "success":
            return job['conclusion']
        return PASS

    def status(self):
        """
        :returns: 0 - all tests passing; 127 - no failures but some
            tests in progress; 1 - any failure
        """
        if not self.results:
            # No results reported so far
            return FAIL
        running = False
        for job in self.results.values():
            status = self._job_status(job)
            if status == RUNNING:
                running = True
            elif status != PASS:
                # Status not passed
                return FAIL
        if running:
            return RUNNING
        return PASS

    def __str__(self):
        """Summarize the current status"""
        good = []
        bad = []
        warn = []
        for name, job in self.results.items():
            status = self._job_status(job)
            url = job.get("html_url", "")
            if status == RUNNING:
                warn.append(f"WARN: {name} - Still running {url}")
            elif status == PASS:
                good.append(f"PASS: {name} - success {url}")
            else:
                bad.append(f"FAIL: {name} - Not passed - {status} {url}")
        out = '\n'.join(sorted(good) + sorted(warn) + sorted(bad))
        stat = self.status()
        if stat == RUNNING:
            status = "Some jobs are still running."
        elif stat == PASS:
            status = "All required jobs passed"
        elif not self.results:
            status = ("No required jobs for regexps: " +
                      ";".join([_.pattern for _ in self.required_regexps]))
        else:
            status = "Not all required jobs passed!"
        return f"{out}\n\n{status}"

    def write_step_summary(self):
        """
        Write results to GitHub Step Summary if available

        Does nothing when the GITHUB_STEP_SUMMARY env variable is not set.
        The file is overwritten on every call.
        """
        def _section(name, items, icon='*'):
            """Format a MD section"""
            # NOTE(review): the HTML wrapper below was reconstructed (a
            # collapsible <details> block with a heading), confirm the
            # exact tags against the rendered step summary.
            lines = [f"<details>\n<summary><h3>{name}</h3></summary>\n"]
            if not items:
                lines.append("None")
            else:
                lines.extend(f"{icon} {item}" for item in items)
            lines.append("</details>\n")
            return lines

        summary_path = os.environ.get("GITHUB_STEP_SUMMARY")
        if not summary_path:
            return
        lines = []
        passing = []
        failing = []
        running = []
        for name, job in self.results.items():
            status = self._job_status(job)
            url = job.get("html_url", "")
            # EXPECTED placeholders have no url, report just the name
            link = f"[{name}]({url})" if url else name
            if status == RUNNING:
                running.append(link)
            elif status == PASS:
                passing.append(link)
            else:
                failing.append(f"{link} ({status})")
        lines.extend(_section("Failing checks", failing, "❌"))
        lines.extend(_section("In progress checks", running, "🔶"))
        lines.extend(_section("Successful checks", passing, "🟢"))
        summary = [f"Total: {len(self.results)}, "
                   f"Passed: {len(passing)}, "
                   f"Failed: {len(failing)}, Running: {len(running)}"]
        lines.extend(_section("Summary", summary))
        with open(summary_path, "w", encoding="utf8") as summary_file:
            summary_file.write("\n".join(lines) + "\n")
        print(f"Human-readable summary: {_GH_SUMMARY_URL}")

    def fetch_json_from_url(self, url, task, params=None):
        """
        Fetches URL and reports json output

        With DEBUG_INPUT set the output is loaded from "$DEBUG_INPUT/$task.json"
        instead of querying the URL; with DEBUG enabled the output is also
        stored into "$DEBUG_DIR/$task.json".

        :param url: URL to be fetched
        :param task: Task name for debug file naming
        :param params: Optional query parameters
        :returns: Decoded JSON output
        """
        print(url, file=sys.stderr)
        if DEBUG_INPUT:
            with open(f"{os.path.join(DEBUG_INPUT, task)}.json", "r",
                      encoding="utf8") as inp:
                output = json.load(inp)
        else:
            response = requests.get(url, headers=_GH_HEADERS, params=params,
                                    timeout=60)
            response.raise_for_status()
            output = response.json()
        if DEBUG_DIR:
            with open(f"{os.path.join(DEBUG_DIR, task)}.json", "w",
                      encoding="utf8") as out:
                json.dump(output, out)
        return output

    def paginated_fetch(self, url, items_key, task, params=None, per_page=30):
        """
        Fetches all items from a paginated GitHub API endpoint.

        :param url: The base URL to fetch from
        :param items_key: The key in the response JSON containing the items list
        :param task: Task name for debug file naming
        :param params: Additional query parameters
        :param per_page: Number of items per page (default 30)
        :returns: List of all items across all pages
        """
        items = []
        page = 1
        total_count = -1
        while True:
            page_params = {"per_page": per_page, "page": page}
            if params:
                page_params.update(params)
            output = self.fetch_json_from_url(
                url, f"{task}_page{page}", page_params)
            page_items = output[items_key]
            items.extend(page_items)
            total_count = max(total_count, output["total_count"])
            # An empty page means there is nothing more to fetch even when
            # fewer than total_count items were collected; stop here to
            # avoid requesting further pages forever.
            if not page_items or len(items) >= total_count:
                break
            page += 1
        return items

    def get_jobs_for_workflow_run(self, run_id):
        """
        Get jobs from a workflow id

        :param run_id: id of the workflow run
        :returns: List of job dictionaries of that workflow run
        """
        url = f"{_GH_RUNS_URL}/{run_id}/jobs"
        return self.paginated_fetch(
            url, "jobs", f"get_jobs_for_workflow_run__{run_id}")

    def check_workflow_runs_status(self, attempt):
        """
        Checks if all required jobs passed

        :param attempt: attempt number, used for debug file naming
        :returns: 0 - all passing; 1 - any failure; 127 some jobs running
        """
        workflow_runs = self.paginated_fetch(
            _GH_RUNS_URL, "workflow_runs",
            f"check_workflow_runs_status_{attempt}",
            {"head_sha": self.latest_commit_sha})
        for run in workflow_runs:
            jobs = self.get_jobs_for_workflow_run(run["id"])
            for job in jobs:
                self.record(run["name"], job)
        print(self)
        self.write_step_summary()
        return self.status()

    def wait_for_required_tests(self):
        """
        Wait for all required tests to pass or failfast

        :return: 0 - all passing; 1 - any failure
        """
        i = 0
        while True:
            i += 1
            ret = self.check_workflow_runs_status(i)
            if ret == RUNNING:
                running_jobs = sum(
                    1 for job in self.results.values()
                    if self._job_status(job) == RUNNING)
                print(f"{running_jobs} jobs are still running...")
                time.sleep(180)
                continue
            return ret

    def check_required_labels(self):
        """
        Check if all expected labels are present

        :return: True on success, False on failure
        """
        if not self.required_labels:
            # No required labels, exit
            return True

        if not self.pr_number:
            print("The GH_PR_NUMBER not specified, skipping the "
                  f"required-labels-check ({self.required_labels})")
            return True

        response = requests.get(
            f"{_GH_API_URL}/issues/{self.pr_number}",
            headers=_GH_HEADERS,
            timeout=60
        )
        response.raise_for_status()
        labels = set(_["name"] for _ in response.json()["labels"])
        if self.required_labels.issubset(labels):
            return True
        # Sorted to report the missing labels in a stable order
        missing = sorted(self.required_labels.difference(labels))
        print(f"To run all required tests the PR{self.pr_number} must have "
              f"{', '.join(missing)} labels "
              "set.")
        return False

    def run(self):
        """
        Keep checking the PR until all required jobs finish

        Never returns, the process exits with: 0 on success; 1 on check
        failure; 2 when PR missing labels
        """
        print(f"Gatekeeper for project={os.environ['GITHUB_REPOSITORY']} and "
              f"SHA={self.latest_commit_sha} PR={self.pr_number}")
        if not self.check_required_labels():
            sys.exit(2)
        sys.exit(self.wait_for_required_tests())


if __name__ == "__main__":
    Checker().run()