Skip to content
Open
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
a3dd06e
[_722] fix segfault and hung threads on SIGINT during parallel get
d-w-moore May 19, 2025
481952c
use subtest.
d-w-moore Jun 6, 2025
1213d2b
try to preserve latest synchronous parallel put/get for orderly shutd…
d-w-moore Dec 10, 2025
59fd131
can now abort parallel transfers with SIGINT/^C or SIGTERM
d-w-moore Dec 12, 2025
ba0407a
[_722] update readme for signals and parallel put/get
d-w-moore Dec 12, 2025
dddcc95
prevent auto_close
d-w-moore Dec 13, 2025
b96f805
satisfy static typing.
d-w-moore Dec 13, 2025
ed25eda
revise README
d-w-moore Dec 13, 2025
9d2ff7a
forward ref needed for mypy?
d-w-moore Dec 13, 2025
0aaf747
patch test
d-w-moore Dec 18, 2025
ade42ea
more informative error message when retcodes do not match
d-w-moore Dec 19, 2025
8272b5a
delete unnecessary "import irods"
d-w-moore Dec 20, 2025
7584b71
Update README.md
d-w-moore Jan 4, 2026
368e08e
add a finite timeout
d-w-moore Dec 20, 2025
0765f71
review comments
d-w-moore Jan 4, 2026
9ec506b
comments regarding futures returning None
d-w-moore Jan 7, 2026
1b42f97
test condition wait is ten minutes is the default, no need to specify…
d-w-moore Jan 7, 2026
1740e80
catch was a no-op
d-w-moore Jan 8, 2026
c5824cc
remove TODO's
d-w-moore Jan 8, 2026
92474be
[_722] test a data put is sanely interruptable
d-w-moore Jan 8, 2026
df05ce1
[squashed multiple commits] tighten up all the quit logic:
d-w-moore Jan 9, 2026
14037f9
[another_squash] tidy, fix, add put test
d-w-moore Jan 12, 2026
107ef8d
add tools.py with shared functions.
d-w-moore Jan 16, 2026
f7b5a73
make doc string more thorough, for abort_parallel_transfers().
d-w-moore Jan 16, 2026
d242b90
codacy, review
d-w-moore Jan 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 60 additions & 45 deletions irods/test/modules/test_signal_handling_in_multithread_get.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import irods
import irods.helpers
from irods.test import modules as test_modules
from irods.parallel import abort_parallel_transfers

OBJECT_SIZE = 2 * 1024**3
OBJECT_NAME = "data_get_issue__722"
Expand Down Expand Up @@ -39,53 +40,56 @@ def test(test_case, signal_names=("SIGTERM", "SIGINT")):

for signal_name in signal_names:

test_case.subTest(f"Testing with signal {signal_name}")

# Call into this same module as a command. This will initiate another Python process that
# performs a lengthy data object "get" operation (see the main body of the script, below.)
process = subprocess.Popen(
[sys.executable, program],
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
text=True,
)

# Wait for download process to reach the point of spawning data transfer threads. In Python 3.9+ versions
# of the concurrent.futures module, these are nondaemon threads and will block the exit of the main thread
# unless measures are taken (#722).
localfile = process.stdout.readline().strip()
test_case.assertTrue(
wait_till_true(
lambda: os.path.exists(localfile)
and os.stat(localfile).st_size > OBJECT_SIZE // 2
),
"Parallel download from data_objects.get() probably experienced a fatal error before spawning auxiliary data transfer threads.",
)

sig = getattr(signal, signal_name)

# Interrupt the subprocess with the given signal.
process.send_signal(sig)
# Assert that this signal is what killed the subprocess, rather than a timed out process "wait" or a natural exit
# due to misproper or incomplete handling of the signal.
try:
test_case.assertEqual(
process.wait(timeout=15),
-sig,
"Unexpected subprocess return code.",
)
except subprocess.TimeoutExpired as timeout_exc:
test_case.fail(
f"Subprocess timed out before terminating. "
"Non-daemon thread(s) probably prevented subprocess's main thread from exiting."
with test_case.subTest(f"Testing with signal {signal_name}"):

# Call into this same module as a command. This will initiate another Python process that
# performs a lengthy data object "get" operation (see the main body of the script, below.)
process = subprocess.Popen(
[sys.executable, program],
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
text=True,
)
# Assert that in the case of SIGINT, the process registered a KeyboardInterrupt.
if sig == signal.SIGINT:

# Wait for download process to reach the point of spawning data transfer threads. In Python 3.9+ versions
# of the concurrent.futures module, these are nondaemon threads and will block the exit of the main thread
# unless measures are taken (#722).
localfile = process.stdout.readline().strip()
test_case.assertTrue(
re.search("KeyboardInterrupt", process.stderr.read()),
"Did not find expected string 'KeyboardInterrupt' in log output.",
wait_till_true(
lambda: os.path.exists(localfile)
and os.stat(localfile).st_size > OBJECT_SIZE // 2
),
"Parallel download from data_objects.get() probably experienced a fatal error before spawning auxiliary data transfer threads.",
)

sig = getattr(signal, signal_name)

translate_return_code = lambda s: 128 - s if s < 0 else s

# Interrupt the subprocess with the given signal.
process.send_signal(sig)

# Assert that this signal is what killed the subprocess, rather than a timed out process "wait" or a natural exit
# due to misproper or incomplete handling of the signal.
try:
test_case.assertEqual(
translate_return_code(process.wait(timeout=15)),
128 + sig,
"Unexpected subprocess return code.",
)
except subprocess.TimeoutExpired as timeout_exc:
test_case.fail(
f"Subprocess timed out before terminating. "
"Non-daemon thread(s) probably prevented subprocess's main thread from exiting."
)
# Assert that in the case of SIGINT, the process registered a KeyboardInterrupt.
if sig == signal.SIGINT:
test_case.assertTrue(
re.search("KeyboardInterrupt", process.stderr.read()),
"Did not find expected string 'KeyboardInterrupt' in log output.",
)


if __name__ == "__main__":
# These lines are run only if the module is launched as a process.
Expand All @@ -110,8 +114,19 @@ def test(test_case, signal_names=("SIGTERM", "SIGINT")):
print(local_path)
sys.stdout.flush()

# "get" the object
session.data_objects.get(object_path, local_path)
def handler(sig,*_):
abort_parallel_transfers()
exit(128+sig)

signal.signal(signal.SIGTERM, handler)

try:
# download the object
session.data_objects.get(object_path, local_path)
except KeyboardInterrupt:
abort_parallel_transfers()
raise

finally:
# Clean up, whether or not the download succeeded.
if local_path is not None and os.path.exists(local_path):
Expand Down