#!/usr/bin/env python3
#
# Copyright (c) 2024 Red Hat Inc.
#
# SPDX-License-Identifier: Apache-2.0

"""
Keeps checking the current PR until all required jobs pass

Env variables:

* REQUIRED_JOBS: semicolon-separated list of required jobs (in the form
  "$workflow / $job")
* REQUIRED_REGEXPS: semicolon-separated list of regexps for required jobs
* REQUIRED_LABELS: semicolon-separated list of labels the PR must carry
* COMMIT_HASH: full commit hash we want to be watching
* GITHUB_REPOSITORY: GitHub repository (user/repo)
* GH_PR_NUMBER: PR number, used for the required-labels check
* GITHUB_TOKEN: optional GitHub API token (without it, stricter API rate
  limits apply)

Sample execution (GH token can be excluded):

GITHUB_TOKEN="..." REQUIRED_JOBS="skipper / skipper"
REQUIRED_REGEXPS=".*" REQUIRED_LABELS="ok-to-test;bar"
COMMIT_HASH=b8382cea886ad9a8f77d237bcfc0eba0c98775dd
GITHUB_REPOSITORY=kata-containers/kata-containers
GH_PR_NUMBER=123 python3 jobs.py
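
Debugging (based on the DEBUG/DEBUG_INPUT handling below):

* DEBUG: when set to "true", every fetched API response is also stored
  as a JSON file under "./<unix-timestamp>/"
* DEBUG_INPUT: path to a directory gathered by a previous DEBUG run;
  when set, responses are replayed from those files instead of querying
  the GitHub API (eg. DEBUG_INPUT=./<unix-timestamp> python3 jobs.py)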
"""

import json
import os
import re
import sys
import time

import requests
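

# Status codes: RUNNING (127) means no failure was reported yet, but some
# required jobs are still in progress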
PASS = 0
FAIL = 1
RUNNING = 127


_GH_HEADERS = {"Accept": "application/vnd.github.v3+json"}
if os.environ.get("GITHUB_TOKEN"):
    _GH_HEADERS["Authorization"] = f"token {os.environ['GITHUB_TOKEN']}"
_GH_API_URL = f"https://api.github.com/repos/{os.environ['GITHUB_REPOSITORY']}"
_GH_RUNS_URL = f"{_GH_API_URL}/actions/runs"
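# DEBUG=true records every fetched API response under ./<timestamp>/;
# DEBUG_INPUT replays the files gathered by such a run instead of querying
# the GitHub API (useful for gatekeeper development)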
if os.environ.get("DEBUG", "false") == "true":
    DEBUG_DIR = os.path.join(os.path.abspath('.'), str(int(time.time())))
    os.makedirs(DEBUG_DIR)
else:
    DEBUG_DIR = None
if os.environ.get("DEBUG_INPUT", "") == "":
    DEBUG_INPUT = None
else:
    DEBUG_INPUT = os.path.abspath(os.environ["DEBUG_INPUT"])
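

# Typical flow: Checker.run() first verifies the PR carries all
# REQUIRED_LABELS, then keeps polling the workflow runs of COMMIT_HASH
# until every required job passes or any of them fails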
class Checker:
    """Object to keep watching required GH action workflows"""
    def __init__(self):
        self.latest_commit_sha = os.getenv("COMMIT_HASH")
        self.pr_number = os.getenv("GH_PR_NUMBER")
        required_labels = os.getenv("REQUIRED_LABELS")
        if required_labels:
            self.required_labels = set(required_labels.split(";"))
        else:
            # Empty set keeps the type consistent with the branch above
            self.required_labels = set()
        required_jobs = os.getenv("REQUIRED_JOBS")
        if required_jobs:
            required_jobs = required_jobs.split(";")
        else:
            required_jobs = []
        required_regexps = os.getenv("REQUIRED_REGEXPS")
        self.required_regexps = []
        # TODO: Add way to specify minimum amount of tests
        # (eg. via \d+: prefix) and check it in status
        if required_regexps:
            for regexp in required_regexps.split(";"):
                self.required_regexps.append(re.compile(regexp))
        if not required_jobs and not self.required_regexps:
            raise RuntimeError("No REQUIRED_JOBS or REQUIRED_REGEXPS defined")
        # Set all required jobs as EXPECTED to enforce waiting for them
        self.results = {job: {"status": "EXPECTED", "run_id": -1}
                        for job in required_jobs}
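
    # Results are keyed by "workflow / job" name; record() keeps only the
    # job dict of the newest run for each required job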
    def record(self, workflow, job):
        """
        Records a job run
        """
        job_name = f"{workflow} / {job['name']}"
        if job_name not in self.results:
            for re_job in self.required_regexps:
                # Required job via regexp
                if re_job.match(job_name):
                    break
            else:
                # Not a required job
                return
        elif job['run_id'] < self.results[job_name]['run_id']:
            # Newer results already stored
            print(f"older {job_name} - {job['status']} {job['conclusion']} "
                  f"{job['id']} (newer_id={self.results[job_name]['id']})",
                  file=sys.stderr)
            return
        print(f"{job_name} - {job['status']} {job['conclusion']} {job['id']}",
              file=sys.stderr)
        self.results[job_name] = job

    @staticmethod
    def _job_status(job):
        """Map job status to our status"""
        if job["status"] != "completed":
            return RUNNING
        if job["conclusion"] != "success":
            return job['conclusion']
        return PASS

    def status(self):
        """
        :returns: 0 - all tests passing; 127 - no failures but some
            tests in progress; 1 - any failure
        """
        running = False
        if not self.results:
            # No results reported so far
            return FAIL
        for job in self.results.values():
            status = self._job_status(job)
            if status == RUNNING:
                running = True
            elif status != PASS:
                # Status not passed
                return FAIL
        if running:
            return RUNNING
        return PASS

    def __str__(self):
        """Summarize the current status"""
        good = []
        bad = []
        warn = []
        for name, job in self.results.items():
            status = self._job_status(job)
            if status == RUNNING:
                warn.append(f"WARN: {name} - Still running")
            elif status == PASS:
                good.append(f"PASS: {name} - success")
            else:
                bad.append(f"FAIL: {name} - Not passed - {status}")
        out = '\n'.join(sorted(good) + sorted(warn) + sorted(bad))
        stat = self.status()
        if stat == RUNNING:
            status = "Some jobs are still running."
        elif stat == PASS:
            status = "All required jobs passed"
        elif not self.results:
            status = ("No required jobs for regexps: " +
                      ";".join([_.pattern for _ in self.required_regexps]))
        else:
            status = "Not all required jobs passed!"
        return f"{out}\n\n{status}"

    def fetch_json_from_url(self, url, task, params=None):
        """Fetches the URL (or replays a recorded response) as json"""
        print(url, file=sys.stderr)
        if DEBUG_INPUT:
            # Replay the response recorded by a previous DEBUG run
            with open(f"{os.path.join(DEBUG_INPUT, task)}.json", "r",
                      encoding="utf8") as inp:
                output = json.load(inp)
        else:
            response = requests.get(url, headers=_GH_HEADERS, params=params,
                                    timeout=60)
            response.raise_for_status()
            output = response.json()
        if DEBUG_DIR:
            # Record the response so it can be replayed via DEBUG_INPUT
            with open(f"{os.path.join(DEBUG_DIR, task)}.json", "w",
                      encoding="utf8") as out:
                json.dump(output, out)
        return output

    def get_jobs_for_workflow_run(self, run_id):
        """Get jobs from a workflow id"""
        total_count = -1
        jobs = []
        page = 1
        while True:
            url = f"{_GH_RUNS_URL}/{run_id}/jobs?per_page=100&page={page}"
            # Include the page in the task name so each page is recorded to
            # (and replayed from) its own debug file
            output = self.fetch_json_from_url(
                url, f"get_jobs_for_workflow_run__{run_id}_{page}")
            jobs.extend(output["jobs"])
            total_count = max(total_count, output["total_count"])
            if len(jobs) >= total_count:
                break
            page += 1
        return jobs

    def check_workflow_runs_status(self, attempt):
        """
        Checks if all required jobs passed

        :returns: 0 - all passing; 1 - any failure; 127 - some jobs running
        """
        # TODO: Check if we need pagination here as well
        response = self.fetch_json_from_url(
            _GH_RUNS_URL, f"check_workflow_runs_status_{attempt}",
            {"head_sha": self.latest_commit_sha})
        workflow_runs = response["workflow_runs"]
        for run in workflow_runs:
            jobs = self.get_jobs_for_workflow_run(run["id"])
            for job in jobs:
                self.record(run["name"], job)
        print(self)
        return self.status()

    def wait_for_required_tests(self):
        """
        Wait for all required tests to pass or fail fast on any failure

        :return: 0 - all passing; 1 - any failure
        """
        i = 0
        while True:
            i += 1
            ret = self.check_workflow_runs_status(i)
            if ret == RUNNING:
                running_jobs = len([name
                                    for name, job in self.results.items()
                                    if self._job_status(job) == RUNNING])
                print(f"{running_jobs} jobs are still running...")
                # Check again in 3 minutes
                time.sleep(180)
                continue
            return ret

    def check_required_labels(self):
        """
        Check if all expected labels are present

        :return: True on success, False on failure
        """
        if not self.required_labels:
            # No required labels, exit
            return True

        if not self.pr_number:
            print("GH_PR_NUMBER not specified, skipping the "
                  f"required-labels-check ({self.required_labels})")
            return True

        response = requests.get(
            f"{_GH_API_URL}/issues/{self.pr_number}",
            headers=_GH_HEADERS,
            timeout=60
        )
        response.raise_for_status()
        labels = set(_["name"] for _ in response.json()["labels"])
        if self.required_labels.issubset(labels):
            return True
        print(f"To run all required tests, PR #{self.pr_number} must have "
              "the following labels set: "
              f"{', '.join(self.required_labels.difference(labels))}")
        return False

    def run(self):
        """
        Keep checking the PR until all required jobs finish

        :returns: 0 on success; 1 on check failure; 2 when PR missing labels
        """
        print(f"Gatekeeper for project={os.environ['GITHUB_REPOSITORY']} and "
              f"SHA={self.latest_commit_sha} PR={self.pr_number}")
        if not self.check_required_labels():
            sys.exit(2)
        sys.exit(self.wait_for_required_tests())


if __name__ == "__main__":
    Checker().run()