Skip to content

Commit d347fe5

Browse files
author
Jeny Sadadia
committed
Test commit
Signed-off-by: Jeny Sadadia <[email protected]>
1 parent b80ea1a commit d347fe5

File tree

2 files changed: 48 additions and 19 deletions

2 files changed: 48 additions and 19 deletions

src/job_retry.py

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -80,12 +80,14 @@ def _run(self, sub_id):
8080
event_data["jobfilter"] = [node["name"]]
8181
event_data["platform_filter"] = [node["data"].get("platform")]
8282
event_data["retry_counter"] = retry_counter + 1
83+
event_data["debug"] = {"retry_by": str(node["id"])}
8384
self.log.debug(f"{node['id']}:Event data retry_counter: {event_data['retry_counter']}")
8485
# Added a flag `is_retry` to signal a job retry event
8586
event_data["is_retry"] = True
8687
event = {'data': event_data}
87-
self._api_helper.api.send_event('node', event)
88-
self.log.info(f"Job retry for node {node['id']} submitted")
88+
self._api_helper.api.send_event('retry', event)
89+
self.log.info(f"Job retry for node {node['id']} submitted. Parent node: {event_data['id']}")
90+
self.log.debug(f"Event:{event}")
8991
else:
9092
self.log.error(f"Not able to retry the job as parent kind is unknown: {node['id']}")
9193
return True

src/scheduler.py

Lines changed: 44 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,7 @@
22
#
33
# SPDX-License-Identifier: LGPL-2.1-or-later
44
#
5-
# Copyright (C) 2021, 2022, 2023 Collabora Limited
5+
# Copyright (C) 2021-2025 Collabora Limited
66
# Author: Guillaume Tucker <[email protected]>
77
# Author: Jeny Sadadia <[email protected]>
88

@@ -105,11 +105,17 @@ def _cleanup_paths(self):
105105
# ToDo: if stat != 0 then report error to API?
106106

107107
def _setup(self, args):
108-
return self._api.subscribe('node')
109-
110-
def _stop(self, sub_id):
111-
if sub_id:
112-
self._api_helper.unsubscribe_filters(sub_id)
108+
# return self._api.subscribe('node')
109+
node_sub_id = self._api.subscribe('node')
110+
self.log.debug(f"Node channel sub id: {node_sub_id}")
111+
retry_sub_id = self._api.subscribe('retry')
112+
self.log.debug(f"Retry channel sub id: {retry_sub_id}")
113+
return [node_sub_id, retry_sub_id]
114+
115+
def _stop(self, sub_ids):
116+
for sub_id in sub_ids:
117+
if sub_id:
118+
self._api_helper.unsubscribe_filters(sub_id)
113119
self._cleanup_paths()
114120

115121
def backup_cleanup(self):
@@ -149,6 +155,7 @@ def _run_job(self, job_config, runtime, platform, input_node, retry_counter):
149155
node = self._api_helper.create_job_node(job_config,
150156
input_node,
151157
runtime, platform, retry_counter)
158+
self.log.debug(f"Job node created: {node['id']}. Parent: f{node['parent']}")
152159
except KeyError as e:
153160
self.log.error(' '.join([
154161
input_node['id'],
@@ -371,7 +378,17 @@ def _verify_architecture_filter(self, job, node):
371378
return False
372379
return True
373380

374-
def _run(self, sub_id):
381+
def _run(self, sub_ids):
382+
threads = []
383+
for sub_id in sub_ids:
384+
thread = threading.Thread(target=self._run_scheduler, args=(sub_id,))
385+
threads.append(thread)
386+
thread.start()
387+
388+
for thread in threads:
389+
thread.join()
390+
391+
def _run_scheduler(self, sub_id):
375392
self.log.info("Listening for available checkout events")
376393
self.log.info("Press Ctrl-C to stop.")
377394
subscribe_retries = 0
@@ -381,25 +398,29 @@ def _run(self, sub_id):
381398
event = None
382399
try:
383400
event = self._api_helper.receive_event_data(sub_id, block=False)
401+
if not event:
402+
# If we received a keep-alive event, just continue
403+
continue
384404
except Exception as e:
385405
self.log.error(f"Error receiving event: {e}, re-subscribing in 10 seconds")
386-
time.sleep(10)
387-
sub_id = self._api.subscribe('node')
388-
subscribe_retries += 1
389-
if subscribe_retries > 3:
390-
self.log.error("Failed to re-subscribe to node events")
391-
return False
392-
continue
393-
if not event:
394-
# If we received a keep-alive event, just continue
406+
# time.sleep(10)
407+
# sub_id = self._api.subscribe('node')
408+
# subscribe_retries += 1
409+
# if subscribe_retries > 3:
410+
# self.log.error("Failed to re-subscribe to node events")
411+
# return False
395412
continue
396-
subscribe_retries = 0
413+
# subscribe_retries = 0
414+
self.log.debug(f"Event received: {sub_id}:{event['id']}:{event.get('debug')}:{event.get('retry_counter')}")
397415
for job, runtime, platform, rules in self._sched.get_schedule(event):
398416
input_node = self._api.node.get(event['id'])
399417
jobfilter = event.get('jobfilter')
400418
# Add to node data the jobfilter if it exists in event
401419
if jobfilter and isinstance(jobfilter, list):
402420
input_node['jobfilter'] = jobfilter
421+
platform_filter = event.get('platform_filter')
422+
if platform_filter and isinstance(platform_filter, list):
423+
input_node['platform_filter'] = platform_filter
403424
# we cannot use rules, as we need to have info about job too
404425
if job.params.get('frequency', None):
405426
if not self._verify_frequency(job, input_node, platform):
@@ -448,5 +469,11 @@ def __call__(self, configs, args):
448469
opts = parse_opts('scheduler', globals())
449470
yaml_configs = opts.get_yaml_configs() or 'config'
450471
configs = kernelci.config.load(yaml_configs)
472+
# sub_ids = sch.setup(opts)
473+
# threads = []
474+
# for sub_id in sub_ids:
475+
# thread = threading.Thread(target=sch._run_scheduler, args=(sub_id,))
476+
# threads.append(thread)
477+
# thread.start()
451478
status = opts.command(configs, opts)
452479
sys.exit(0 if status is True else 1)

0 commit comments

Comments
 (0)