tools.testing: Add DEBUG support for gatekeeper

To avoid manual curling when analyzing GK issues, let's add a way to dump all
GK requests into a directory when the user specifies the "DEBUG" env variable.

Signed-off-by: Lukáš Doktor <ldoktor@redhat.com>
Author: Lukáš Doktor
Date:   2025-05-15 10:28:11 +02:00
parent 393cc61153
commit 1a15990ee1

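For context, a minimal standalone sketch of the debug-dump pattern this commit introduces (mirroring the diff below; the task name and payload in the example call are illustrative):

import json
import os
import time

# Create a timestamped dump directory only when the user runs with
# DEBUG=true in the environment (the same gate as the module-level code below).
if os.environ.get("DEBUG", "false") == "true":
    DEBUG_DIR = os.path.join(os.path.abspath('.'), str(int(time.time())))
    os.makedirs(DEBUG_DIR)
else:
    DEBUG_DIR = None

def dump_json(task, output):
    """Write a decoded API response as <DEBUG_DIR>/<task>.json, as the new
    fetch_json_from_url() does after every successful request."""
    if DEBUG_DIR:
        with open(f"{os.path.join(DEBUG_DIR, task)}.json", "w",
                  encoding="utf8") as out:
            json.dump(output, out)

# Illustrative call; the real task names come from the callers in the diff,
# e.g. "check_workflow_runs_status_1" or "get_jobs_for_workflow_run__<id>".
dump_json("check_workflow_runs_status_1", {"workflow_runs": []})

In practice this means DEBUG=true GH_PR_NUMBER=123 python3 jobs.py leaves one JSON file per API request behind in a fresh epoch-named directory.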

@@ -20,6 +20,7 @@ GITHUB_REPOSITORY=kata-containers/kata-containers
 GH_PR_NUMBER=123 python3 jobs.py
 """
 
+import json
 import os
 import re
 import sys
@@ -37,6 +38,11 @@ if os.environ.get("GITHUB_TOKEN"):
     _GH_HEADERS["Authorization"] = f"token {os.environ['GITHUB_TOKEN']}"
 _GH_API_URL = f"https://api.github.com/repos/{os.environ['GITHUB_REPOSITORY']}"
 _GH_RUNS_URL = f"{_GH_API_URL}/actions/runs"
+if os.environ.get("DEBUG", "false") == "true":
+    DEBUG_DIR = os.path.join(os.path.abspath('.'), str(int(time.time())))
+    os.makedirs(DEBUG_DIR)
+else:
+    DEBUG_DIR = None
 
 
 class Checker:
@@ -144,6 +150,19 @@ class Checker:
         status = "Not all required jobs passed!"
         return f"{out}\n\n{status}"
 
+    def fetch_json_from_url(self, url, task, params=None):
+        """Fetches URL and reports json output"""
+        print(url, file=sys.stderr)
+        response = requests.get(url, headers=_GH_HEADERS, params=params,
+                                timeout=60)
+        response.raise_for_status()
+        output = response.json()
+        if DEBUG_DIR:
+            with open(f"{os.path.join(DEBUG_DIR, task)}.json", "w",
+                      encoding="utf8") as out:
+                json.dump(output, out)
+        return output
+
     def get_jobs_for_workflow_run(self, run_id):
         """Get jobs from a workflow id"""
         total_count = -1
@@ -151,10 +170,8 @@ class Checker:
         page = 1
         while True:
             url = f"{_GH_RUNS_URL}/{run_id}/jobs?per_page=100&page={page}"
-            print(url, file=sys.stderr)
-            response = requests.get(url, headers=_GH_HEADERS, timeout=60)
-            response.raise_for_status()
-            output = response.json()
+            output = self.fetch_json_from_url(
+                url, f"get_jobs_for_workflow_run__{run_id}")
             jobs.extend(output["jobs"])
             total_count = max(total_count, output["total_count"])
             if len(jobs) >= total_count:
@@ -162,22 +179,17 @@
             page += 1
         return jobs
 
-    def check_workflow_runs_status(self):
+    def check_workflow_runs_status(self, attempt):
         """
         Checks if all required jobs passed
 
         :returns: 0 - all passing; 1 - any failure; 127 some jobs running
         """
         # TODO: Check if we need pagination here as well
-        print(_GH_RUNS_URL, file=sys.stderr)
-        response = requests.get(
-            _GH_RUNS_URL,
-            params={"head_sha": self.latest_commit_sha},
-            headers=_GH_HEADERS,
-            timeout=60
-        )
-        response.raise_for_status()
-        workflow_runs = response.json()["workflow_runs"]
+        response = self.fetch_json_from_url(
+            _GH_RUNS_URL, f"check_workflow_runs_status_{attempt}",
+            {"head_sha": self.latest_commit_sha})
+        workflow_runs = response["workflow_runs"]
         for run in workflow_runs:
             jobs = self.get_jobs_for_workflow_run(run["id"])
             for job in jobs:
@@ -191,8 +203,10 @@ class Checker:
         :return: 0 - all passing; 1 - any failure
         """
 
+        i = 0
         while True:
-            ret = self.check_workflow_runs_status()
+            i += 1
+            ret = self.check_workflow_runs_status(i)
             if ret == RUNNING:
                 running_jobs = len([name
                                     for name, job in self.results.items()
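
With those dumps in place, a gatekeeper failure can be analyzed offline instead of re-curling the GitHub API by hand. A hypothetical replay helper, not part of this commit (the directory name is an example epoch timestamp):

import json
import os

def load_dump(debug_dir, task):
    """Load a response previously written by fetch_json_from_url()."""
    with open(os.path.join(debug_dir, f"{task}.json"), encoding="utf8") as src:
        return json.load(src)

# Example: list the workflow runs recorded during the first polling attempt.
runs = load_dump("1747297691", "check_workflow_runs_status_1")
print([run["id"] for run in runs["workflow_runs"]])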