Implement service to submit job retry #1257

Open: wants to merge 4 commits into main
9 changes: 8 additions & 1 deletion config/logger.conf
@@ -18,7 +18,8 @@ keys=root,
    timeout,
    timeout-closing,
    timeout-holdoff,
    trigger,
    job_retry

[handlers]
keys=consoleHandler, timedRotatingHandler
@@ -135,3 +136,9 @@ level=DEBUG
handlers=consoleHandler, timedRotatingHandler
qualname=trigger
propagate=0

[logger_job_retry]
level=DEBUG
handlers=consoleHandler, timedRotatingHandler
qualname=job_retry
propagate=0
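For reference, the new section follows the standard Python logging fileConfig layout: the qualname ties [logger_job_retry] to the logger the service requests by name. A minimal, illustrative sketch of how such a config is typically consumed; the pipeline's Service base class does the equivalent internally, and the path below is an assumption:

import logging
import logging.config

# Illustrative only: load the pipeline logger config and fetch the logger
# whose qualname matches the new [logger_job_retry] section.
logging.config.fileConfig('config/logger.conf', disable_existing_loggers=False)
log = logging.getLogger('job_retry')
log.debug("job_retry logger configured")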
13 changes: 13 additions & 0 deletions docker-compose.yaml
@@ -201,3 +201,16 @@ services:
      - './data/src:/home/kernelci/data/src'
      - './data/output:/home/kernelci/data/output'
      - './logs:/home/kernelci/pipeline/logs'

  job-retry:
    <<: *base-service
    container_name: 'kernelci-pipeline-job-retry'
    command:
      - './src/job_retry.py'
      - '--settings=${KCI_SETTINGS:-/home/kernelci/config/kernelci.toml}'
      - 'run'
    volumes:
      - './config:/home/kernelci/config'
      - './logs:/home/kernelci/pipeline/logs'
    extra_hosts:
      - "host.docker.internal:host-gateway"
103 changes: 103 additions & 0 deletions src/job_retry.py
@@ -0,0 +1,103 @@
#!/usr/bin/env python3
#
# SPDX-License-Identifier: LGPL-2.1-or-later
#
# Copyright (C) 2025 Collabora Limited
# Author: Jeny Sadadia <[email protected]>

import sys
import kernelci.config
from kernelci.legacy.cli import Args, Command, parse_opts

from base import Service


class JobRetry(Service):

    def __init__(self, configs, args):
        super().__init__(configs, args, 'job_retry')

    def _setup(self, args):
        return self._api_helper.subscribe_filters({
            "state": "done",
            "result": ("fail", "incomplete"),
            "kind": "job"
            # ToDo: Retry for build jobs
            # "kind": ("kbuild", "job")
        })

    def _stop(self, sub_id):
        if sub_id:
            self._api_helper.unsubscribe_filters(sub_id)
        sys.stdout.flush()

    def _find_parent_kind(self, node, api_helper, kind):
        parent_id = node.get('parent')
        if not parent_id:
            return None
        parent_node = api_helper.api.node.get(parent_id)
        if not parent_node:
            return None
        if parent_node.get('kind') == kind:
            return parent_node
        return self._find_parent_kind(parent_node, api_helper, kind)

    def _run(self, sub_id):
        self.log.info("Job retry: Listening for events...")
        self.log.info("Press Ctrl-C to stop.")

        while True:
            try:
                node, _ = self._api_helper.receive_event_node(sub_id)
                self.log.debug(f"Event received: {node['id']}")
            except Exception as e:
                self.log.error(f"Error receiving event: {e}")
                continue

            # Check retry count before submitting a retry
            retry_counter = node.get("retry_counter", 0)
            self.log.debug(f"{node['id']}: Node current retry_counter: {retry_counter}")
            if retry_counter >= 3:
                self.log.info(f"{node['id']}: Job has already been retried 3 times. "
                              "Not submitting a retry.")
                continue

            parent_kind = None
            if node.get('kind') == 'job':
                parent_kind = 'kbuild'
            # ToDo: retry build jobs
            # if node.get("kind") == "kbuild":
            # parent_kind = "checkout"
            if parent_kind:
                event_data = self._find_parent_kind(node, self._api_helper, parent_kind)
                if not event_data:
                    self.log.error(f"Not able to find parent node for {node['id']}")
                    continue
                event_data["jobfilter"] = [node["name"]]
                event_data["platform_filter"] = [node["data"].get("platform")]
                event_data["retry_counter"] = retry_counter + 1
                event_data["debug"] = {"retry_by": str(node["id"])}
                self.log.debug(f"{node['id']}: Event data retry_counter: {event_data['retry_counter']}")
                event = {'data': event_data}
                self._api_helper.api.send_event('retry', event)
                self.log.info(f"Job retry for node {node['id']} submitted. Parent node: {event_data['id']}")
                self.log.debug(f"Event: {event}")
            else:
                self.log.error(f"Not able to retry the job as parent kind is unknown: {node['id']}")
        return True


class cmd_run(Command):
    help = "Retry failed/incomplete builds and tests"
    args = [Args.api_config]

    def __call__(self, configs, args):
        return JobRetry(configs, args).run(args)


if __name__ == '__main__':
    opts = parse_opts('job_retry', globals())
    yaml_configs = opts.get_yaml_configs() or 'config'
    configs = kernelci.config.load(yaml_configs)
    status = opts.command(configs, opts)
    sys.exit(0 if status is True else 1)
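For reviewers, this is the rough shape of what _run() above publishes on the 'retry' channel. The field names come from the code; the values are invented for illustration, and other fields copied from the parent node are omitted:

# Illustrative payload for send_event('retry', event); values are made up.
event = {
    'data': {
        'id': 'kbuild-node-id',            # parent kbuild node found by _find_parent_kind()
        'kind': 'kbuild',
        'jobfilter': ['baseline-arm64'],   # name of the failed job to re-run
        'platform_filter': ['qemu-arm64'], # platform taken from the failed node's data
        'retry_counter': 1,                # previous counter + 1, capped at 3 retries
        'debug': {'retry_by': 'failed-job-node-id'},
    }
}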
72 changes: 55 additions & 17 deletions src/scheduler.py
@@ -85,6 +85,12 @@ def __init__(self, configs, args):
            self._storage_config, storage_cred
        )
        self._job_tmp_dirs = {}
        self._threads = []
        self._api_helper_lock = threading.Lock()
        self._stop_thread_lock = threading.Lock()
        self._context_lock = threading.Lock()
        self._context = {}
        self._stop_thread = False

    def _get_runtimes_configs(self, configs, runtimes):
        runtimes_configs = {}
@@ -105,11 +111,19 @@ def _cleanup_paths(self):
        # ToDo: if stat != 0 then report error to API?

    def _setup(self, args):
        return self._api.subscribe('node')

    def _stop(self, sub_id):
        if sub_id:
            self._api_helper.unsubscribe_filters(sub_id)
        node_sub_id = self._api.subscribe('node')
        self.log.debug(f"Node channel sub id: {node_sub_id}")
        retry_sub_id = self._api.subscribe('retry')
        self.log.debug(f"Retry channel sub id: {retry_sub_id}")
        self._context = {"node": node_sub_id, "retry": retry_sub_id}
        return {"node": node_sub_id, "retry": retry_sub_id}

    def _stop(self, context):
        self._stop_thread = True
        for _, sub_id in self._context.items():
            if sub_id:
                self.log.info(f"Unsubscribing: {sub_id}")
                self._api_helper.unsubscribe_filters(sub_id)
        self._cleanup_paths()

    def backup_cleanup(self):
@@ -144,11 +158,11 @@ def backup_job(self, filename, nodeid):
        except Exception as e:
            self.log.error(f"Failed to backup {filename} to {new_filename}: {e}")

    def _run_job(self, job_config, runtime, platform, input_node):
    def _run_job(self, job_config, runtime, platform, input_node, retry_counter):
        try:
            node = self._api_helper.create_job_node(job_config,
                                                    input_node,
                                                    runtime, platform)
                                                    runtime, platform, retry_counter)
        except KeyError as e:
            self.log.error(' '.join([
                input_node['id'],
Expand All @@ -162,6 +176,7 @@ def _run_job(self, job_config, runtime, platform, input_node):

        if not node:
            return
        self.log.debug(f"Job node created: {node['id']}. Parent: {node['parent']}")
        # Most of the time, the artifacts we need originate from the parent
        # node. Import those into the current node, working on a copy so the
        # original node doesn't get "polluted" with useless artifacts when we
@@ -371,43 +386,66 @@ def _verify_architecture_filter(self, job, node):
            return False
        return True

    def _run(self, sub_id):
    def _run(self, context):
        for channel, sub_id in self._context.items():
            thread = threading.Thread(target=self._run_scheduler, args=(channel, sub_id,))
            self._threads.append(thread)
            thread.start()

        for thread in self._threads:
            thread.join()

    def _run_scheduler(self, channel, sub_id):
        self.log.info("Listening for available checkout events")
        self.log.info("Press Ctrl-C to stop.")
        subscribe_retries = 0

        while True:
            with self._stop_thread_lock:
                if self._stop_thread:
                    break
            last_heartbeat['time'] = time.time()
            event = None
            try:
                event = self._api_helper.receive_event_data(sub_id, block=False)
                if not event:
                    # If we received a keep-alive event, just continue
                    continue
            except Exception as e:
                self.log.error(f"Error receiving event: {e}, re-subscribing in 10 seconds")
                time.sleep(10)
                sub_id = self._api.subscribe('node')
                with self._stop_thread_lock:
                    if self._stop_thread:
                        break
                self.log.error(f"Error receiving event: {e}")
                self.log.debug(f"Re-subscribing to channel: {channel}")
                sub_id = self._api.subscribe(channel)
                with self._context_lock:
                    self._context[channel] = sub_id
                subscribe_retries += 1
                if subscribe_retries > 3:
                    self.log.error("Failed to re-subscribe to node events")
                    self.log.error(f"Failed to re-subscribe to channel: {channel}")
                    return False
                continue
            if not event:
                # If we received a keep-alive event, just continue
                continue
            subscribe_retries = 0
            for job, runtime, platform, rules in self._sched.get_schedule(event):
                input_node = self._api.node.get(event['id'])
                jobfilter = event.get('jobfilter')
                # Add to node data the jobfilter if it exists in event
                if jobfilter and isinstance(jobfilter, list):
                    input_node['jobfilter'] = jobfilter
                platform_filter = event.get('platform_filter')
                if platform_filter and isinstance(platform_filter, list):
                    input_node['platform_filter'] = platform_filter
                # we cannot use rules, as we need to have info about job too
                if job.params.get('frequency', None):
                    if not self._verify_frequency(job, input_node, platform):
                        continue
                if not self._verify_architecture_filter(job, input_node):
                    continue
                if self._api_helper.should_create_node(rules, input_node):
                    self._run_job(job, runtime, platform, input_node)
                with self._api_helper_lock:
                    flag = self._api_helper.should_create_node(rules, input_node)
                if flag:
                    retry_counter = event.get('retry_counter', 0)
                    self._run_job(job, runtime, platform, input_node, retry_counter)

        return True

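One implication of the scheduler change above: create_job_node() now receives retry_counter, so the counter needs to end up stored on the created node for job_retry.py's node.get("retry_counter", 0) check to work. That change would live in the kernelci core helper rather than in this diff; a rough, hypothetical sketch of the expectation, with invented helper names:

# Hypothetical sketch only: not part of this PR or the current kernelci API.
def create_job_node(self, job_config, input_node, runtime, platform,
                    retry_counter=0):
    node = self._prepare_job_node(job_config, input_node, runtime, platform)  # invented helper
    node['retry_counter'] = retry_counter  # persisted so a later failure can read it back
    return self._submit_node(node)  # invented helper that submits the node to the API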