Merge pull request #11270 from ldoktor/gk

tools.testing: Add methods to simplify gatekeeper development
Wainer Moschetta 2025-05-26 12:04:07 -03:00 committed by GitHub
commit c249769bb8

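The diff below gives the gatekeeper's jobs.py a record/replay facility: with DEBUG=true every GitHub API response is dumped as JSON into a timestamped directory, and with DEBUG_INPUT=<dir> those dumps are read back instead of querying the API. As a rough standalone sketch of the same pattern (the function name and keyword arguments here are illustrative, not the script's exact API):

    import json
    import os

    import requests

    def fetch_json(url, task, record_dir=None, replay_dir=None, params=None):
        """Fetch JSON from url, optionally replaying from / recording to disk."""
        if replay_dir:
            # Replay mode: read the response captured during an earlier run.
            with open(os.path.join(replay_dir, f"{task}.json"),
                      encoding="utf8") as inp:
                output = json.load(inp)
        else:
            response = requests.get(url, params=params, timeout=60)
            response.raise_for_status()
            output = response.json()
        if record_dir:
            # Record mode: persist the response for later offline replay.
            with open(os.path.join(record_dir, f"{task}.json"), "w",
                      encoding="utf8") as out:
                json.dump(output, out)
        return output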

@@ -20,6 +20,7 @@ GITHUB_REPOSITORY=kata-containers/kata-containers
 GH_PR_NUMBER=123 python3 jobs.py
 """
 
+import json
 import os
 import re
 import sys
@@ -37,6 +38,15 @@ if os.environ.get("GITHUB_TOKEN"):
     _GH_HEADERS["Authorization"] = f"token {os.environ['GITHUB_TOKEN']}"
 _GH_API_URL = f"https://api.github.com/repos/{os.environ['GITHUB_REPOSITORY']}"
 _GH_RUNS_URL = f"{_GH_API_URL}/actions/runs"
+if os.environ.get("DEBUG", "false") == "true":
+    DEBUG_DIR = os.path.join(os.path.abspath('.'), str(int(time.time())))
+    os.makedirs(DEBUG_DIR)
+else:
+    DEBUG_DIR = None
+if os.environ.get("DEBUG_INPUT", "") == "":
+    DEBUG_INPUT = None
+else:
+    DEBUG_INPUT = os.path.abspath(os.environ["DEBUG_INPUT"])
 
 
 class Checker:
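The two toggles extend the usage line already shown in the module docstring: DEBUG=true records every response under a fresh ./<epoch-seconds>/ directory, while pointing DEBUG_INPUT at such a directory replays the evaluation offline from the saved files (the directory name and PR number below are made up):

    DEBUG=true GH_PR_NUMBER=123 python3 jobs.py
    DEBUG_INPUT=./1716720000 GH_PR_NUMBER=123 python3 jobs.py

Both are evaluated at import time, so they have to be set before the script starts.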
@@ -144,6 +154,24 @@ class Checker:
             status = "Not all required jobs passed!"
         return f"{out}\n\n{status}"
 
+    def fetch_json_from_url(self, url, task, params=None):
+        """Fetches URL and reports json output"""
+        print(url, file=sys.stderr)
+        if DEBUG_INPUT:
+            with open(f"{os.path.join(DEBUG_INPUT, task)}.json", "r",
+                      encoding="utf8") as inp:
+                output = json.load(inp)
+        else:
+            response = requests.get(url, headers=_GH_HEADERS, params=params,
+                                    timeout=60)
+            response.raise_for_status()
+            output = response.json()
+        if DEBUG_DIR:
+            with open(f"{os.path.join(DEBUG_DIR, task)}.json", "w",
+                      encoding="utf8") as out:
+                json.dump(output, out)
+        return output
+
     def get_jobs_for_workflow_run(self, run_id):
         """Get jobs from a workflow id"""
         total_count = -1
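Given the task names used by the call sites below, one recorded gatekeeper run leaves the debug directory looking roughly like this (timestamp and workflow run id are hypothetical):

    1716720000/
        check_workflow_runs_status_1.json
        check_workflow_runs_status_2.json
        get_jobs_for_workflow_run__9123456789.json

Each file holds the raw JSON body of one API response, so calls routed through fetch_json_from_url can be replayed without network access.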
@@ -151,10 +179,8 @@
         page = 1
         while True:
             url = f"{_GH_RUNS_URL}/{run_id}/jobs?per_page=100&page={page}"
-            print(url, file=sys.stderr)
-            response = requests.get(url, headers=_GH_HEADERS, timeout=60)
-            response.raise_for_status()
-            output = response.json()
+            output = self.fetch_json_from_url(
+                url, f"get_jobs_for_workflow_run__{run_id}")
             jobs.extend(output["jobs"])
             total_count = max(total_count, output["total_count"])
             if len(jobs) >= total_count:
@@ -162,22 +188,17 @@
             page += 1
         return jobs
 
-    def check_workflow_runs_status(self):
+    def check_workflow_runs_status(self, attempt):
         """
         Checks if all required jobs passed
 
         :returns: 0 - all passing; 1 - any failure; 127 some jobs running
         """
         # TODO: Check if we need pagination here as well
-        print(_GH_RUNS_URL, file=sys.stderr)
-        response = requests.get(
-            _GH_RUNS_URL,
-            params={"head_sha": self.latest_commit_sha},
-            headers=_GH_HEADERS,
-            timeout=60
-        )
-        response.raise_for_status()
-        workflow_runs = response.json()["workflow_runs"]
+        response = self.fetch_json_from_url(
+            _GH_RUNS_URL, f"check_workflow_runs_status_{attempt}",
+            {"head_sha": self.latest_commit_sha})
+        workflow_runs = response["workflow_runs"]
         for run in workflow_runs:
             jobs = self.get_jobs_for_workflow_run(run["id"])
             for job in jobs:
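Note that fetch_json_from_url returns the already-parsed body, which is why the caller now indexes response["workflow_runs"] directly instead of calling .json(). The new attempt argument exists purely to keep per-poll recordings apart; a quick sketch of the naming scheme (same f-string as above, illustrative DEBUG_DIR value):

    import os

    DEBUG_DIR = "./1716720000"  # illustrative recording directory
    for attempt in (1, 2):
        task = f"check_workflow_runs_status_{attempt}"
        print(f"{os.path.join(DEBUG_DIR, task)}.json")
    # ./1716720000/check_workflow_runs_status_1.json
    # ./1716720000/check_workflow_runs_status_2.json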
@@ -191,8 +212,10 @@
 
         :return: 0 - all passing; 1 - any failure
         """
+        i = 0
         while True:
-            ret = self.check_workflow_runs_status()
+            i += 1
+            ret = self.check_workflow_runs_status(i)
             if ret == RUNNING:
                 running_jobs = len([name
                                     for name, job in self.results.items()