From 9ab492e62e4f7b4e685c5ea82db9b970a3f8c6ba Mon Sep 17 00:00:00 2001 From: Maas-Maarten Zeeman Date: Thu, 23 Jan 2025 14:27:32 +0100 Subject: [PATCH 1/6] Monitor the process which creates the memo value --- src/depcache.erl | 85 +++++++++++++++++++++++++++-------------- test/depcache_tests.erl | 25 ++++++++++++ 2 files changed, 82 insertions(+), 28 deletions(-) diff --git a/src/depcache.erl b/src/depcache.erl index 8f665a2..4343d44 100644 --- a/src/depcache.erl +++ b/src/depcache.erl @@ -297,12 +297,14 @@ memo(Fun, Key, MaxAge, Server) -> Result :: any(). memo(Fun, Key, MaxAge, Dep, Server) -> Key1 = case Key of - undefined -> memo_key(Fun); - _ -> Key - end, + undefined -> memo_key(Fun); + _ -> Key + end, case ?MODULE:get_wait(Key1, Server) of {ok, Value} -> Value; + {throw, premature_exit} -> + ?MODULE:memo(Fun, Key, MaxAge, Dep, Server); {throw, R} -> throw(R); undefined -> @@ -323,31 +325,58 @@ memo(Fun, Key, MaxAge, Dep, Server) -> Server :: depcache_server(), Result :: any(). memo_key(Fun, Key, MaxAge, Dep, Server) -> - try - Value = - case Fun of - {M,F,A} -> erlang:apply(M,F,A); - {M,F} -> M:F(); - _ when is_function(Fun) -> Fun() - end, - {Value1, MaxAge1, Dep1} = - case Value of - #memo{value=V, max_age=MA, deps=D} -> - MA1 = case is_integer(MA) of true -> MA; false -> MaxAge end, - {V, MA1, Dep++D}; - _ -> - {Value, MaxAge, Dep} - end, - case MaxAge of - 0 -> memo_send_replies(Key, Value1, Server); - _ -> set(Key, Value1, MaxAge1, Dep1, Server) - end, - Value1 - catch - ?WITH_STACKTRACE(Class, R, S) - memo_send_errors(Key, {throw, R}, Server), - erlang:raise(Class, R, S) - end. + ExitWatcher = start_exit_watcher(Key, Server), + try + try + {Value1, MaxAge1, Dep1} = case apply_fun(Fun) of + #memo{value=V, max_age=MA, deps=D} -> + MA1 = case is_integer(MA) of + true -> MA; + false -> MaxAge + end, + {V, MA1, Dep++D}; + Value -> + {Value, MaxAge, Dep} + end, + case MaxAge of + 0 -> memo_send_replies(Key, Value1, Server); + _ -> set(Key, Value1, MaxAge1, Dep1, Server) + end, + + Value1 + catch + ?WITH_STACKTRACE(Class, R, S) + memo_send_errors(Key, {throw, R}, Server), + erlang:raise(Class, R, S) + end + after + stop_exit_watcher(ExitWatcher) + end. + +%% @private +%% @doc Monitors the current process... +%% Sends premature_exit throw to depcache server when it detects one. +start_exit_watcher(Key, Server) -> + Self = self(), + spawn(fun() -> + Ref = monitor(process, Self), + receive + done -> + erlang:demonitor(Ref); + {'DOWN', Ref, process, Self, _Reason} -> + memo_send_errors(Key, {throw, premature_exit}, Server) + end + end). + +stop_exit_watcher(Pid) -> + Pid ! done. + +%% @private +%% @doc Execute the memo function +%% Returns the result value +apply_fun({M,F,A}) -> erlang:apply(M,F,A); +apply_fun({M,F}) -> M:F(); +apply_fun(Fun) when is_function(Fun) -> Fun(). %% @private diff --git a/test/depcache_tests.erl b/test/depcache_tests.erl index 906892d..a511221 100644 --- a/test/depcache_tests.erl +++ b/test/depcache_tests.erl @@ -145,3 +145,28 @@ memo_raise_test() -> ?assertMatch({depcache_tests, raise_error, 0, _}, hd(S)) end, ok. + +memo_premature_kill_test() -> + {ok, C} = depcache:start_link(#{}), + + LongTask = fun() -> + Fun = fun() -> + timer:sleep(500), + done + end, + depcache:memo(Fun, test, C) + end, + + Pid = spawn(LongTask), + timer:kill_after(250, Pid), + timer:sleep(50), + ?assertEqual({throw, premature_exit}, depcache:get_wait(test, C)), + + % Check if another process takes over processing in case of pre-mature exits + Task = spawn(LongTask), + timer:kill_after(250, Task), + timer:sleep(50), + ?assertEqual(done, LongTask()), + + ok. + From d9c36bd39eccedbf6a5e4b4d51b055e66d19e8bc Mon Sep 17 00:00:00 2001 From: Maas-Maarten Zeeman Date: Thu, 23 Jan 2025 14:49:01 +0100 Subject: [PATCH 2/6] Do builds for otp 25, 26 and 27 --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 34d0e23..bf81f57 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -18,7 +18,7 @@ jobs: strategy: matrix: - otp_version: [20.3,21,22,23,24,25] + otp_version: [25,26,27] os: [ubuntu-latest] container: From e5d62f3d89663c530f26d48362cb7614e46f1842 Mon Sep 17 00:00:00 2001 From: Maas-Maarten Zeeman Date: Thu, 23 Jan 2025 14:58:09 +0100 Subject: [PATCH 3/6] Add proper as extra plt app to dialyzer config --- rebar.config | 1 + 1 file changed, 1 insertion(+) diff --git a/rebar.config b/rebar.config index 5956057..9259c99 100644 --- a/rebar.config +++ b/rebar.config @@ -19,6 +19,7 @@ ]}, {test, [ {dialyzer, [ + {plt_extra_apps, [proper]}, {warnings, [ no_return ]} From 41d35f7c79833d93379c6b6b1f691f074b520d69 Mon Sep 17 00:00:00 2001 From: Maas-Maarten Zeeman Date: Thu, 23 Jan 2025 15:26:36 +0100 Subject: [PATCH 4/6] Supress dialyzer warnings about use of proper:setup/2 --- rebar.config | 2 +- test/prop_depcache.erl | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index 9259c99..d65744c 100644 --- a/rebar.config +++ b/rebar.config @@ -19,7 +19,7 @@ ]}, {test, [ {dialyzer, [ - {plt_extra_apps, [proper]}, + {plt_extra_apps, [proper, eunit]}, {warnings, [ no_return ]} diff --git a/test/prop_depcache.erl b/test/prop_depcache.erl index 915ebf3..f02b4a6 100644 --- a/test/prop_depcache.erl +++ b/test/prop_depcache.erl @@ -1,6 +1,14 @@ -module(prop_depcache). -include_lib("proper/include/proper.hrl"). +-dialyzer({nowarn_function, prop_set_get/0}). +-dialyzer({nowarn_function, prop_flush_all/0}). +-dialyzer({nowarn_function, prop_get_set_maxage/0}). +-dialyzer({nowarn_function, prop_get_set_maxage_0/0}). +-dialyzer({nowarn_function, prop_get_set_depend/0}). +-dialyzer({nowarn_function, prop_get_set_depend_map/0}). +-dialyzer({nowarn_function, prop_memo/0}). + %%%%%%%%%%%%%%%%%% %%% Properties %%% %%%%%%%%%%%%%%%%%% From 654bc1bc710139d0d0ad7b4583e703e7903c50cc Mon Sep 17 00:00:00 2001 From: Maas-Maarten Zeeman Date: Thu, 23 Jan 2025 16:52:27 +0100 Subject: [PATCH 5/6] Let the depcache process itself do the monitoring, instead of an extra monitor process --- src/depcache.erl | 57 +++++++++++++++++++++++++++++------------------- 1 file changed, 34 insertions(+), 23 deletions(-) diff --git a/src/depcache.erl b/src/depcache.erl index 4343d44..a6668a5 100644 --- a/src/depcache.erl +++ b/src/depcache.erl @@ -79,7 +79,7 @@ data_table :: ets:tab() }). --record(state, {now :: sec(), serial :: non_neg_integer(), tables :: tables(), wait_pids :: map()}). +-record(state, {now :: sec(), serial :: non_neg_integer(), tables :: tables(), wait_pids :: map(), writers :: map() }). -record(meta, {key :: key(), expire :: sec(), serial :: non_neg_integer(), depend :: dependencies()}). -record(depend, {key :: key(), serial :: non_neg_integer()}). @@ -325,8 +325,8 @@ memo(Fun, Key, MaxAge, Dep, Server) -> Server :: depcache_server(), Result :: any(). memo_key(Fun, Key, MaxAge, Dep, Server) -> - ExitWatcher = start_exit_watcher(Key, Server), - try + %%ExitWatcher = start_exit_watcher(Key, Server), + %%try try {Value1, MaxAge1, Dep1} = case apply_fun(Fun) of #memo{value=V, max_age=MA, deps=D} -> @@ -348,28 +348,28 @@ memo_key(Fun, Key, MaxAge, Dep, Server) -> ?WITH_STACKTRACE(Class, R, S) memo_send_errors(Key, {throw, R}, Server), erlang:raise(Class, R, S) - end - after - stop_exit_watcher(ExitWatcher) - end. + end. + %%after + %% stop_exit_watcher(ExitWatcher) + %%end. %% @private %% @doc Monitors the current process... %% Sends premature_exit throw to depcache server when it detects one. -start_exit_watcher(Key, Server) -> - Self = self(), - spawn(fun() -> - Ref = monitor(process, Self), - receive - done -> - erlang:demonitor(Ref); - {'DOWN', Ref, process, Self, _Reason} -> - memo_send_errors(Key, {throw, premature_exit}, Server) - end - end). - -stop_exit_watcher(Pid) -> - Pid ! done. +%%start_exit_watcher(Key, Server) -> +%% Self = self(), +%% spawn(fun() -> +%% Ref = monitor(process, Self), +%% receive +%% done -> +%% erlang:demonitor(Ref); +%% {'DOWN', Ref, process, Self, _Reason} -> +%% memo_send_errors(Key, {throw, premature_exit}, Server) +%% end +%% end). +%% +%%stop_exit_watcher(Pid) -> +%% Pid ! done. %% @private %% @doc Execute the memo function @@ -805,7 +805,8 @@ init(Config) -> tables = Tables, now=now_sec(), serial=0, - wait_pids=#{} + wait_pids=#{}, + writers=#{} }, timer:send_after(1000, tick), spawn_link(?MODULE, @@ -940,6 +941,11 @@ handle_info(tick, State) -> erase_process_dict(), {noreply, State#state{now=now_sec()}}; +handle_info({'DOWN', Ref, process, _Pid, _Reason}, State) -> + io:fwrite(standard_error, "Down for ~p~n", [Ref]), + + {noreply, State}; + handle_info(_Msg, State) -> {noreply, State}. @@ -1007,9 +1013,14 @@ handle_call_get_wait(Key, From, #state{tables = Tables} = State) -> WaitPids = maps:update(Key, {MaxAge, [From|List]}, State#state.wait_pids), {noreply, State#state{wait_pids=WaitPids}}; _ -> + %% Monitor the sender as a writer for Key + {Pid, _} = From, + Ref = erlang:monitor(process, Pid), + Writers = maps:put(Ref, Key, State#state.writers), + %% Nobody waiting or we hit a timeout, let next requestors wait for this caller. WaitPids = maps:put(Key, {State#state.now+?MAX_GET_WAIT, []}, State#state.wait_pids), - {reply, undefined, State#state{wait_pids=WaitPids}} + {reply, undefined, State#state{wait_pids=WaitPids, writers=Writers}} end; {ok, _Value} = Found -> {reply, Found, State} From 240b57a33baba30fd13dc97ac67beaa7c98ff099 Mon Sep 17 00:00:00 2001 From: Maas-Maarten Zeeman Date: Fri, 24 Jan 2025 14:48:25 +0100 Subject: [PATCH 6/6] Cleanup, let the depcache monitor the writers --- src/depcache.erl | 126 ++++++++++++++++++++-------------------- test/depcache_tests.erl | 4 +- 2 files changed, 65 insertions(+), 65 deletions(-) diff --git a/src/depcache.erl b/src/depcache.erl index a6668a5..a532666 100644 --- a/src/depcache.erl +++ b/src/depcache.erl @@ -1,5 +1,5 @@ %% @author Arjan Scherpenisse -%% @copyright 2009-2020 Marc Worrell, Arjan Scherpenisse +%% @copyright 2009-2025 Marc Worrell, Arjan Scherpenisse %% @doc Depcache API %% %% == depcache API == @@ -14,7 +14,7 @@ %% {@link cleanup/1}, {@link cleanup/5} %% %% @end -%% Copyright 2009-2020 Marc Worrell, Arjan Scherpenisse +%% Copyright 2009-2025 Marc Worrell, Arjan Scherpenisse %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -92,7 +92,7 @@ }). -type tables() :: #tables{meta_table :: ets:tab(), deps_table :: ets:tab(), data_table :: ets:tab()}. --type state() :: #state{now :: sec(), serial :: non_neg_integer(), tables :: tables(), wait_pids :: map()}. +-type state() :: #state{now :: sec(), serial :: non_neg_integer(), tables :: tables(), wait_pids :: map(), writers :: map()}. -type depend() :: #depend{key :: key(), serial :: non_neg_integer()}. -type cleanup_state() :: #cleanup_state{pid :: pid(), tables :: tables(), name :: atom(), memory_max :: non_neg_integer(), callback :: callback() | undefined}. -type meta() :: #meta{key :: key(), expire :: sec(), serial :: non_neg_integer(), depend :: dependencies()}. @@ -303,12 +303,12 @@ memo(Fun, Key, MaxAge, Dep, Server) -> case ?MODULE:get_wait(Key1, Server) of {ok, Value} -> Value; - {throw, premature_exit} -> - ?MODULE:memo(Fun, Key, MaxAge, Dep, Server); {throw, R} -> throw(R); undefined -> - memo_key(Fun, Key, MaxAge, Dep, Server) + memo_key(Fun, Key, MaxAge, Dep, Server); + {error, premature_exit} -> + ?MODULE:memo(Fun, Key, MaxAge, Dep, Server) end. %% @private @@ -325,51 +325,29 @@ memo(Fun, Key, MaxAge, Dep, Server) -> Server :: depcache_server(), Result :: any(). memo_key(Fun, Key, MaxAge, Dep, Server) -> - %%ExitWatcher = start_exit_watcher(Key, Server), - %%try - try - {Value1, MaxAge1, Dep1} = case apply_fun(Fun) of - #memo{value=V, max_age=MA, deps=D} -> - MA1 = case is_integer(MA) of - true -> MA; - false -> MaxAge - end, - {V, MA1, Dep++D}; - Value -> - {Value, MaxAge, Dep} - end, - case MaxAge of - 0 -> memo_send_replies(Key, Value1, Server); - _ -> set(Key, Value1, MaxAge1, Dep1, Server) - end, + try + {Value1, MaxAge1, Dep1} = case apply_fun(Fun) of + #memo{value=V, max_age=MA, deps=D} -> + MA1 = case is_integer(MA) of + true -> MA; + false -> MaxAge + end, + {V, MA1, Dep++D}; + Value -> + {Value, MaxAge, Dep} + end, + case MaxAge of + 0 -> memo_send_replies(Key, Value1, Server); + _ -> set(Key, Value1, MaxAge1, Dep1, Server) + end, - Value1 - catch - ?WITH_STACKTRACE(Class, R, S) - memo_send_errors(Key, {throw, R}, Server), - erlang:raise(Class, R, S) - end. - %%after - %% stop_exit_watcher(ExitWatcher) - %%end. + Value1 + catch + ?WITH_STACKTRACE(Class, R, S) + memo_send_errors(Key, {throw, R}, Server), + erlang:raise(Class, R, S) + end. -%% @private -%% @doc Monitors the current process... -%% Sends premature_exit throw to depcache server when it detects one. -%%start_exit_watcher(Key, Server) -> -%% Self = self(), -%% spawn(fun() -> -%% Ref = monitor(process, Self), -%% receive -%% done -> -%% erlang:demonitor(Ref); -%% {'DOWN', Ref, process, Self, _Reason} -> -%% memo_send_errors(Key, {throw, premature_exit}, Server) -%% end -%% end). -%% -%%stop_exit_watcher(Pid) -> -%% Pid ! done. %% @private %% @doc Execute the memo function @@ -487,7 +465,7 @@ set(Key, Data, MaxAge, Depend, Server) -> -spec get_wait( Key, Server ) -> Result when Key :: key(), Server :: depcache_server(), - Result :: {ok, any()} | undefined | {throw, term()}. + Result :: {ok, any()} | undefined | {throw, term()} | {error, premature_exit}. get_wait(Key, Server) -> case get_process_dict(Key, Server) of NoValue when NoValue =:= undefined orelse NoValue =:= depcache_disabled -> @@ -941,10 +919,20 @@ handle_info(tick, State) -> erase_process_dict(), {noreply, State#state{now=now_sec()}}; -handle_info({'DOWN', Ref, process, _Pid, _Reason}, State) -> - io:fwrite(standard_error, "Down for ~p~n", [Ref]), - - {noreply, State}; +handle_info({'DOWN', Ref, process, _Pid, _Reason}, #state{ writers = Writers }=State) -> + case maps:take(Ref, Writers) of + error -> + {noreply, State}; + {Key, Writers1} -> + WaitPids1 = case maps:take(Key, State#state.wait_pids) of + error -> + State#state.wait_pids; + {{_MaxAge, List, _WriterRef}, WaitPids} -> + _ = [ catch gen_server:reply(From, {error, premature_exit}) || From <- List ], + WaitPids + end, + {noreply, State#state{ writers = Writers1, wait_pids = WaitPids1 }} + end; handle_info(_Msg, State) -> {noreply, State}. @@ -1008,19 +996,29 @@ handle_call_get_wait(Key, From, #state{tables = Tables} = State) -> undefined -> State end, case State#state.wait_pids of - #{Key := {MaxAge, List}} when State#state.now < MaxAge -> + #{Key := {MaxAge, List, WriterRef}} when State#state.now < MaxAge -> %% Another process is already calculating the value, let the caller wait. - WaitPids = maps:update(Key, {MaxAge, [From|List]}, State#state.wait_pids), + WaitPids = maps:update(Key, {MaxAge, [From|List], WriterRef}, State#state.wait_pids), {noreply, State#state{wait_pids=WaitPids}}; _ -> - %% Monitor the sender as a writer for Key + %% de-monitor an old writer, if any. + Writers = case maps:find(Key, State#state.wait_pids) of + error -> + State#state.writers; + {ok, {_, _WaitPids, OldWriter}} -> + erlang:demonitor(OldWriter), + maps:without([OldWriter], State#state.writers) + end, + + %% Monitor and register the writer {Pid, _} = From, Ref = erlang:monitor(process, Pid), - Writers = maps:put(Ref, Key, State#state.writers), + Writers1 = maps:put(Ref, Key, Writers), %% Nobody waiting or we hit a timeout, let next requestors wait for this caller. - WaitPids = maps:put(Key, {State#state.now+?MAX_GET_WAIT, []}, State#state.wait_pids), - {reply, undefined, State#state{wait_pids=WaitPids, writers=Writers}} + WaitPids = maps:put(Key, {State#state.now+?MAX_GET_WAIT, [], Ref}, State#state.wait_pids), + + {reply, undefined, State#state{wait_pids=WaitPids, writers=Writers1}} end; {ok, _Value} = Found -> {reply, Found, State} @@ -1039,7 +1037,7 @@ handle_call_get_wait(Key, From, #state{tables = Tables} = State) -> Tag :: atom(). handle_call_get_waiting_pids(Key, State) -> {State1, Pids} = case maps:take(Key, State#state.wait_pids) of - {{_MaxAge, List}, WaitPids} -> + {{_MaxAge, List, _WriterRef}, WaitPids} -> {State#state{wait_pids=WaitPids}, List}; error -> {State, []} @@ -1122,9 +1120,11 @@ handle_call_set({Key, Data, MaxAge, Depend}, #state{tables = Tables} = State) -> %% Check if other processes are waiting for this key, send them the data case maps:take(Key, State1#state.wait_pids) of - {{_MaxAge, List}, WaitPids} -> + {{_MaxAge, List, WriterRef}, WaitPids} -> _ = [ catch gen_server:reply(From, {ok, Data}) || From <- List ], - {reply, ok, State1#state{wait_pids=WaitPids}}; + _ = erlang:demonitor(WriterRef), + Writers = maps:without([WriterRef], State#state.writers), + {reply, ok, State1#state{writers=Writers, wait_pids=WaitPids}}; error -> {reply, ok, State1} end. diff --git a/test/depcache_tests.erl b/test/depcache_tests.erl index a511221..3f077bd 100644 --- a/test/depcache_tests.erl +++ b/test/depcache_tests.erl @@ -154,13 +154,13 @@ memo_premature_kill_test() -> timer:sleep(500), done end, - depcache:memo(Fun, test, C) + depcache:memo(Fun, premature_kill_test, C) end, Pid = spawn(LongTask), timer:kill_after(250, Pid), timer:sleep(50), - ?assertEqual({throw, premature_exit}, depcache:get_wait(test, C)), + ?assertEqual({error, premature_exit}, depcache:get_wait(premature_kill_test, C)), % Check if another process takes over processing in case of pre-mature exits Task = spawn(LongTask),