kata-containers/tools/testing/gatekeeper/jobs.py

#!/usr/bin/env python3
#
# Copyright (c) 2024 Red Hat Inc.
#
# SPDX-License-Identifier: Apache-2.0
"""
Keeps checking the current PR until all required jobs pass
Env variables:
* REQUIRED_JOBS: semicolon separated list of required jobs (in the form of
  "$workflow / $job")
* REQUIRED_REGEXPS: semicolon separated list of regexps for required jobs
* REQUIRED_LABELS: semicolon separated list of labels the PR must have
* COMMIT_HASH: Full commit hash we want to be watching
* GH_PR_NUMBER: Number of the PR, used for the required-labels check
* GITHUB_REPOSITORY: GitHub repository (user/repo)
* DEBUG: when set to "true", record each fetched response into a
  "$timestamp/$task.json" file
* DEBUG_INPUT: directory with responses recorded by a previous DEBUG run,
  replayed instead of querying the GitHub API
Sample execution (GH token can be excluded):
GITHUB_TOKEN="..." REQUIRED_JOBS="skipper / skipper"
REQUIRED_REGEXPS=".*" REQUIRED_LABELS="ok-to-test;bar"
COMMIT_HASH=b8382cea886ad9a8f77d237bcfc0eba0c98775dd
GITHUB_REPOSITORY=kata-containers/kata-containers
GH_PR_NUMBER=123 python3 jobs.py
"""
import json
import os
import re
import sys
import time
import requests
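# Status values double as process exit codes: 0 - all required jobs passed,
# 1 - at least one failure, 127 - some jobs still running (see Checker.status)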
PASS = 0
FAIL = 1
RUNNING = 127
_GH_HEADERS = {"Accept": "application/vnd.github.v3+json"}
if os.environ.get("GITHUB_TOKEN"):
_GH_HEADERS["Authorization"] = f"token {os.environ['GITHUB_TOKEN']}"
_GH_API_URL = f"https://api.github.com/repos/{os.environ['GITHUB_REPOSITORY']}"
_GH_RUNS_URL = f"{_GH_API_URL}/actions/runs"
if os.environ.get("DEBUG", "false") == "true":
DEBUG_DIR = os.path.join(os.path.abspath('.'), str(int(time.time())))
os.makedirs(DEBUG_DIR)
else:
DEBUG_DIR = None
if os.environ.get("DEBUG_INPUT", "") == "":
DEBUG_INPUT = None
else:
DEBUG_INPUT = os.path.abspath(os.environ["DEBUG_INPUT"])
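# A possible record/replay workflow (values illustrative; the record
# directory is named after the unix timestamp of the run):
#   DEBUG=true REQUIRED_JOBS="..." COMMIT_HASH=... python3 jobs.py
#   DEBUG_INPUT=./1715760730 REQUIRED_JOBS="..." COMMIT_HASH=... python3 jobs.py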
class Checker:
"""Object to keep watching required GH action workflows"""
def __init__(self):
self.latest_commit_sha = os.getenv("COMMIT_HASH")
self.pr_number = os.getenv("GH_PR_NUMBER")
required_labels = os.getenv("REQUIRED_LABELS")
if required_labels:
self.required_labels = set(required_labels.split(";"))
else:
            self.required_labels = set()
required_jobs = os.getenv("REQUIRED_JOBS")
if required_jobs:
required_jobs = required_jobs.split(";")
else:
required_jobs = []
required_regexps = os.getenv("REQUIRED_REGEXPS")
self.required_regexps = []
# TODO: Add way to specify minimum amount of tests
# (eg. via \d+: prefix) and check it in status
if required_regexps:
for regexp in required_regexps.split(";"):
self.required_regexps.append(re.compile(regexp))
if not required_jobs and not self.required_regexps:
raise RuntimeError("No REQUIRED_JOBS or REQUIRED_REGEXPS defined")
# Set all required jobs as EXPECTED to enforce waiting for them
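        # (the mapping goes from "workflow / job" names either to this
        # placeholder or to the raw GH job dict stored by record(), e.g.
        # {"status": "completed", "conclusion": "success", "run_id": 123,
        # "id": 456, ...} - field values illustrative)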
self.results = {job: {"status": "EXPECTED", "run_id": -1}
for job in required_jobs}
def record(self, workflow, job):
"""
Records a job run
"""
job_name = f"{workflow} / {job['name']}"
if job_name not in self.results:
for re_job in self.required_regexps:
# Required job via regexp
if re_job.match(job_name):
break
else:
# Not a required job
return
elif job['run_id'] < self.results[job_name]['run_id']:
# Newer results already stored
print(f"older {job_name} - {job['status']} {job['conclusion']} "
f"{job['id']} (newer_id={self.results[job_name]['id']})", file=sys.stderr)
return
print(f"{job_name} - {job['status']} {job['conclusion']} {job['id']}",
file=sys.stderr)
self.results[job_name] = job
@staticmethod
def _job_status(job):
"""Map job status to our status"""
if job["status"] != "completed":
return RUNNING
if job["conclusion"] != "success":
return job['conclusion']
return PASS
def status(self):
"""
:returns: 0 - all tests passing; 127 - no failures but some
tests in progress; 1 - any failure
"""
running = False
if not self.results:
# No results reported so far
return FAIL
for job in self.results.values():
status = self._job_status(job)
if status == RUNNING:
                running = True
elif status != PASS:
# Status not passed
return FAIL
if running:
return RUNNING
return PASS
def __str__(self):
"""Sumarize the current status"""
good = []
bad = []
warn = []
for name, job in self.results.items():
status = self._job_status(job)
if status == RUNNING:
warn.append(f"WARN: {name} - Still running")
elif status == PASS:
good.append(f"PASS: {name} - success")
else:
bad.append(f"FAIL: {name} - Not passed - {status}")
out = '\n'.join(sorted(good) + sorted(warn) + sorted(bad))
stat = self.status()
if stat == RUNNING:
status = "Some jobs are still running."
elif stat == PASS:
status = "All required jobs passed"
elif not self.results:
status = ("No required jobs for regexps: " +
";".join([_.pattern for _ in self.required_regexps]))
else:
status = "Not all required jobs passed!"
return f"{out}\n\n{status}"
def fetch_json_from_url(self, url, task, params=None):
"""Fetches URL and reports json output"""
print(url, file=sys.stderr)
if DEBUG_INPUT:
with open(f"{os.path.join(DEBUG_INPUT, task)}.json", "r",
encoding="utf8") as inp:
output = json.load(inp)
else:
response = requests.get(url, headers=_GH_HEADERS, params=params,
timeout=60)
response.raise_for_status()
output = response.json()
if DEBUG_DIR:
with open(f"{os.path.join(DEBUG_DIR, task)}.json", "w",
encoding="utf8") as out:
json.dump(output, out)
return output
def get_jobs_for_workflow_run(self, run_id):
"""Get jobs from a workflow id"""
total_count = -1
jobs = []
page = 1
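        # The jobs endpoint is paginated (per_page=100 is the GH maximum);
        # keep fetching pages until all "total_count" jobs are collected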
while True:
url = f"{_GH_RUNS_URL}/{run_id}/jobs?per_page=100&page={page}"
output = self.fetch_json_from_url(
url, f"get_jobs_for_workflow_run__{run_id}")
jobs.extend(output["jobs"])
total_count = max(total_count, output["total_count"])
if len(jobs) >= total_count:
break
page += 1
return jobs
def check_workflow_runs_status(self, attempt):
"""
Checks if all required jobs passed
        :returns: 0 - all passing; 1 - any failure; 127 - some jobs running
"""
# TODO: Check if we need pagination here as well
response = self.fetch_json_from_url(
_GH_RUNS_URL, f"check_workflow_runs_status_{attempt}",
{"head_sha": self.latest_commit_sha})
workflow_runs = response["workflow_runs"]
for run in workflow_runs:
jobs = self.get_jobs_for_workflow_run(run["id"])
for job in jobs:
self.record(run["name"], job)
print(self)
return self.status()
def wait_for_required_tests(self):
"""
        Wait for all required tests to pass or fail fast
:return: 0 - all passing; 1 - any failure
"""
i = 0
while True:
i += 1
ret = self.check_workflow_runs_status(i)
if ret == RUNNING:
running_jobs = len([name
for name, job in self.results.items()
if self._job_status(job) == RUNNING])
print(f"{running_jobs} jobs are still running...")
time.sleep(180)
continue
return ret
def check_required_labels(self):
"""
Check if all expected labels are present
:return: True on success, False on failure
"""
if not self.required_labels:
# No required labels, exit
return True
if not self.pr_number:
print("The GH_PR_NUMBER not specified, skipping the "
f"required-labels-check ({self.required_labels})")
return True
response = requests.get(
f"{_GH_API_URL}/issues/{self.pr_number}",
headers=_GH_HEADERS,
timeout=60
)
response.raise_for_status()
labels = set(_["name"] for _ in response.json()["labels"])
if self.required_labels.issubset(labels):
return True
print(f"To run all required tests the PR{self.pr_number} must have "
f"{', '.join(self.required_labels.difference(labels))} labels "
"set.")
return False
def run(self):
"""
Keep checking the PR until all required jobs finish
        :returns: 0 on success; 1 on check failure; 2 when the PR is missing
        required labels
"""
print(f"Gatekeeper for project={os.environ['GITHUB_REPOSITORY']} and "
f"SHA={self.latest_commit_sha} PR={self.pr_number}")
if not self.check_required_labels():
sys.exit(2)
sys.exit(self.wait_for_required_tests())
if __name__ == "__main__":
Checker().run()