Skip to content

Commit ca904f6

Browse files
Ensure records don't get stuck in processing state
If an exception occurred during an on_update callback, the "completion" function was never called, which meant the record's PACT flag was never reset. As a result, the affected record could never process again, i.e. its value could never be changed.
1 parent 5ce9345 commit ca904f6

File tree

4 files changed

+144
-5
lines changed

4 files changed

+144
-5
lines changed

CHANGELOG.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ Changed:
2424

2525
- `AsyncioDispatcher cleanup tasks atexit <../../pull/138>`_
2626
- `Ensure returned numpy arrays are not writeable <../../pull/164>`_
27+
- `Ensure records do not get stuck in processing state <../../pull/175>`_
2728

2829
Fixed:
2930

softioc/asyncio_dispatcher.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,11 @@ async def async_wrapper():
8484
ret = func(*func_args)
8585
if inspect.isawaitable(ret):
8686
await ret
87-
if completion:
88-
completion(*completion_args)
8987
except Exception:
9088
logging.exception("Exception when running dispatched callback")
89+
finally:
90+
if completion:
91+
completion(*completion_args)
9192
asyncio.run_coroutine_threadsafe(async_wrapper(), self.loop)
9293

9394
def __enter__(self):

softioc/cothread_dispatcher.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import inspect
2+
import logging
13

24
class CothreadDispatcher:
35
def __init__(self, dispatcher = None):
@@ -28,7 +30,12 @@ def __call__(
2830
completion = None,
2931
completion_args=()):
3032
def wrapper():
31-
func(*func_args)
32-
if completion:
33-
completion(*completion_args)
33+
try:
34+
func(*func_args)
35+
except Exception:
36+
logging.exception("Exception when running dispatched callback")
37+
finally:
38+
if completion:
39+
completion(*completion_args)
40+
3441
self.__dispatcher(wrapper)

tests/test_records.py

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -912,6 +912,136 @@ async def query_record(index):
912912
if process.exitcode is None:
913913
pytest.fail("Process did not terminate")
914914

915+
916+
def blocking_test_func_broken_on_update(
    self, device_name, conn, use_asyncio
):
    """Child-process body: run an IOC whose blocking on_update always fails.

    Creates a ``BLOCKING-COUNTER`` longIn record and a blocking
    ``BLOCKING-BROKEN-ON-UPDATE`` longOut record whose ``on_update``
    callback increments the counter and then raises. Once the IOC has
    been initialised, "R" is sent over ``conn``; the function then waits
    up to ``TIMEOUT`` for the parent to answer with "D".

    When ``use_asyncio`` is true the coroutine callback is registered and
    an ``AsyncioDispatcher`` is handed to ``iocInit``; otherwise the plain
    function callback is registered and ``None`` is handed to ``iocInit``.
    """
    builder.SetDeviceName(device_name)

    counter = builder.longIn("BLOCKING-COUNTER", initial_value=0)

    def bump_counter_then_fail():
        # Shared body of both callbacks: record in the counter that the
        # callback ran, then fail unconditionally.
        log("CHILD: blocking_broken_on_update starting")
        new_count = counter.get() + 1
        counter.set(new_count)
        log(f"CHILD: blocking_update_func updated count: {new_count}")
        raise Exception("on_update is broken!")

    async def async_blocking_broken_on_update(new_val):
        """on_update coroutine that always throws an exception"""
        bump_counter_then_fail()

    def sync_blocking_broken_on_update(new_val):
        """on_update function that always throws an exception"""
        bump_counter_then_fail()

    on_update_callback = (
        async_blocking_broken_on_update
        if use_asyncio
        else sync_blocking_broken_on_update
    )

    builder.longOut(
        "BLOCKING-BROKEN-ON-UPDATE",
        on_update=on_update_callback,
        always_update=True,
        blocking=True,
    )

    dispatcher = (
        asyncio_dispatcher.AsyncioDispatcher() if use_asyncio else None
    )
    builder.LoadDatabase()
    softioc.iocInit(dispatcher)

    conn.send("R")  # "Ready"

    log("CHILD: Sent R over Connection to Parent")

    # Keep the process alive while the parent performs its channel access
    # operations. Without asyncio, wait on the pipe through cothread's
    # poll_list first.
    if not use_asyncio:
        log("CHILD: Beginning cothread poll_list")
        import cothread
        cothread.poll_list([(conn.fileno(), cothread.POLLIN)], TIMEOUT)
    if conn.poll(TIMEOUT):
        reply = conn.recv()
        assert reply == "D", "Did not receive expected Done character"

    log("CHILD: Received exit command, child exiting")
977+
978+
@requires_cothread
@pytest.mark.asyncio
@pytest.mark.parametrize("use_asyncio", [True, False])
async def test_blocking_broken_on_update(self, use_asyncio):
    """Test that a blocking record whose on_update callback always throws
    an exception does not permanently block record processing.

    An IOC is started in a child process. Each caput to the broken record
    must complete within TIMEOUT, and the BLOCKING-COUNTER record must
    show that the callback ran once per put.

    Runs using both cothread and asyncio dispatchers in the IOC."""
    ctx = get_multiprocessing_context()
    parent_conn, child_conn = ctx.Pipe()

    device_name = create_random_prefix()
    counter_pv = device_name + ":BLOCKING-COUNTER"
    broken_pv = device_name + ":BLOCKING-BROKEN-ON-UPDATE"

    process = ctx.Process(
        target=self.blocking_test_func_broken_on_update,
        args=(device_name, child_conn, use_asyncio),
    )
    process.start()

    log("PARENT: Child started, waiting for R command")

    from aioca import caget, caput

    try:
        # Wait for message that IOC has started
        select_and_recv(parent_conn, "R")
        log("PARENT: received R command")

        assert await caget(counter_pv) == 0
        log("PARENT: BLOCKING-COUNTER was 0")

        # Put twice: the second put can only complete if the record was
        # released after the first callback raised.
        for expected_count, value in enumerate((1, 2), start=1):
            await caput(broken_pv, value, wait=True, timeout=TIMEOUT)
            assert await caget(counter_pv) == expected_count

    finally:
        # Clearing the cache before stopping the IOC stops
        # "channel disconnected" error messages
        aioca_cleanup()

        log("PARENT: Sending Done command to child")
        parent_conn.send("D")  # "Done"
        process.join(timeout=TIMEOUT)
        log(f"PARENT: Join completed with exitcode {process.exitcode}")
        if process.exitcode is None:
            pytest.fail("Process did not terminate")
1043+
1044+
9151045
class TestGetSetField:
9161046
"""Tests related to get_field and set_field on records"""
9171047

0 commit comments

Comments
 (0)