diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 34d0e23..bf81f57 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -18,7 +18,7 @@ jobs: strategy: matrix: - otp_version: [20.3,21,22,23,24,25] + otp_version: [25,26,27] os: [ubuntu-latest] container: diff --git a/rebar.config b/rebar.config index 5956057..d65744c 100644 --- a/rebar.config +++ b/rebar.config @@ -19,6 +19,7 @@ ]}, {test, [ {dialyzer, [ + {plt_extra_apps, [proper, eunit]}, {warnings, [ no_return ]} diff --git a/src/depcache.erl b/src/depcache.erl index 8f665a2..a532666 100644 --- a/src/depcache.erl +++ b/src/depcache.erl @@ -1,5 +1,5 @@ %% @author Arjan Scherpenisse -%% @copyright 2009-2020 Marc Worrell, Arjan Scherpenisse +%% @copyright 2009-2025 Marc Worrell, Arjan Scherpenisse %% @doc Depcache API %% %% == depcache API == @@ -14,7 +14,7 @@ %% {@link cleanup/1}, {@link cleanup/5} %% %% @end -%% Copyright 2009-2020 Marc Worrell, Arjan Scherpenisse +%% Copyright 2009-2025 Marc Worrell, Arjan Scherpenisse %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -79,7 +79,7 @@ data_table :: ets:tab() }). --record(state, {now :: sec(), serial :: non_neg_integer(), tables :: tables(), wait_pids :: map()}). +-record(state, {now :: sec(), serial :: non_neg_integer(), tables :: tables(), wait_pids :: map(), writers :: map() }). -record(meta, {key :: key(), expire :: sec(), serial :: non_neg_integer(), depend :: dependencies()}). -record(depend, {key :: key(), serial :: non_neg_integer()}). @@ -92,7 +92,7 @@ }). -type tables() :: #tables{meta_table :: ets:tab(), deps_table :: ets:tab(), data_table :: ets:tab()}. --type state() :: #state{now :: sec(), serial :: non_neg_integer(), tables :: tables(), wait_pids :: map()}. +-type state() :: #state{now :: sec(), serial :: non_neg_integer(), tables :: tables(), wait_pids :: map(), writers :: map()}. -type depend() :: #depend{key :: key(), serial :: non_neg_integer()}. -type cleanup_state() :: #cleanup_state{pid :: pid(), tables :: tables(), name :: atom(), memory_max :: non_neg_integer(), callback :: callback() | undefined}. -type meta() :: #meta{key :: key(), expire :: sec(), serial :: non_neg_integer(), depend :: dependencies()}. @@ -297,16 +297,18 @@ memo(Fun, Key, MaxAge, Server) -> Result :: any(). memo(Fun, Key, MaxAge, Dep, Server) -> Key1 = case Key of - undefined -> memo_key(Fun); - _ -> Key - end, + undefined -> memo_key(Fun); + _ -> Key + end, case ?MODULE:get_wait(Key1, Server) of {ok, Value} -> Value; {throw, R} -> throw(R); undefined -> - memo_key(Fun, Key, MaxAge, Dep, Server) + memo_key(Fun, Key, MaxAge, Dep, Server); + {error, premature_exit} -> + ?MODULE:memo(Fun, Key, MaxAge, Dep, Server) end. %% @private @@ -323,31 +325,36 @@ memo(Fun, Key, MaxAge, Dep, Server) -> Server :: depcache_server(), Result :: any(). memo_key(Fun, Key, MaxAge, Dep, Server) -> - try - Value = - case Fun of - {M,F,A} -> erlang:apply(M,F,A); - {M,F} -> M:F(); - _ when is_function(Fun) -> Fun() - end, - {Value1, MaxAge1, Dep1} = - case Value of - #memo{value=V, max_age=MA, deps=D} -> - MA1 = case is_integer(MA) of true -> MA; false -> MaxAge end, - {V, MA1, Dep++D}; - _ -> - {Value, MaxAge, Dep} - end, - case MaxAge of - 0 -> memo_send_replies(Key, Value1, Server); - _ -> set(Key, Value1, MaxAge1, Dep1, Server) - end, - Value1 - catch - ?WITH_STACKTRACE(Class, R, S) - memo_send_errors(Key, {throw, R}, Server), - erlang:raise(Class, R, S) - end. + try + {Value1, MaxAge1, Dep1} = case apply_fun(Fun) of + #memo{value=V, max_age=MA, deps=D} -> + MA1 = case is_integer(MA) of + true -> MA; + false -> MaxAge + end, + {V, MA1, Dep++D}; + Value -> + {Value, MaxAge, Dep} + end, + case MaxAge of + 0 -> memo_send_replies(Key, Value1, Server); + _ -> set(Key, Value1, MaxAge1, Dep1, Server) + end, + + Value1 + catch + ?WITH_STACKTRACE(Class, R, S) + memo_send_errors(Key, {throw, R}, Server), + erlang:raise(Class, R, S) + end. + + +%% @private +%% @doc Execute the memo function +%% Returns the result value +apply_fun({M,F,A}) -> erlang:apply(M,F,A); +apply_fun({M,F}) -> M:F(); +apply_fun(Fun) when is_function(Fun) -> Fun(). %% @private @@ -458,7 +465,7 @@ set(Key, Data, MaxAge, Depend, Server) -> -spec get_wait( Key, Server ) -> Result when Key :: key(), Server :: depcache_server(), - Result :: {ok, any()} | undefined | {throw, term()}. + Result :: {ok, any()} | undefined | {throw, term()} | {error, premature_exit}. get_wait(Key, Server) -> case get_process_dict(Key, Server) of NoValue when NoValue =:= undefined orelse NoValue =:= depcache_disabled -> @@ -776,7 +783,8 @@ init(Config) -> tables = Tables, now=now_sec(), serial=0, - wait_pids=#{} + wait_pids=#{}, + writers=#{} }, timer:send_after(1000, tick), spawn_link(?MODULE, @@ -911,6 +919,21 @@ handle_info(tick, State) -> erase_process_dict(), {noreply, State#state{now=now_sec()}}; +handle_info({'DOWN', Ref, process, _Pid, _Reason}, #state{ writers = Writers }=State) -> + case maps:take(Ref, Writers) of + error -> + {noreply, State}; + {Key, Writers1} -> + WaitPids1 = case maps:take(Key, State#state.wait_pids) of + error -> + State#state.wait_pids; + {{_MaxAge, List, _WriterRef}, WaitPids} -> + _ = [ catch gen_server:reply(From, {error, premature_exit}) || From <- List ], + WaitPids + end, + {noreply, State#state{ writers = Writers1, wait_pids = WaitPids1 }} + end; + handle_info(_Msg, State) -> {noreply, State}. @@ -973,14 +996,29 @@ handle_call_get_wait(Key, From, #state{tables = Tables} = State) -> undefined -> State end, case State#state.wait_pids of - #{Key := {MaxAge, List}} when State#state.now < MaxAge -> + #{Key := {MaxAge, List, WriterRef}} when State#state.now < MaxAge -> %% Another process is already calculating the value, let the caller wait. - WaitPids = maps:update(Key, {MaxAge, [From|List]}, State#state.wait_pids), + WaitPids = maps:update(Key, {MaxAge, [From|List], WriterRef}, State#state.wait_pids), {noreply, State#state{wait_pids=WaitPids}}; _ -> + %% de-monitor an old writer, if any. + Writers = case maps:find(Key, State#state.wait_pids) of + error -> + State#state.writers; + {ok, {_, _WaitPids, OldWriter}} -> + erlang:demonitor(OldWriter), + maps:without([OldWriter], State#state.writers) + end, + + %% Monitor and register the writer + {Pid, _} = From, + Ref = erlang:monitor(process, Pid), + Writers1 = maps:put(Ref, Key, Writers), + %% Nobody waiting or we hit a timeout, let next requestors wait for this caller. - WaitPids = maps:put(Key, {State#state.now+?MAX_GET_WAIT, []}, State#state.wait_pids), - {reply, undefined, State#state{wait_pids=WaitPids}} + WaitPids = maps:put(Key, {State#state.now+?MAX_GET_WAIT, [], Ref}, State#state.wait_pids), + + {reply, undefined, State#state{wait_pids=WaitPids, writers=Writers1}} end; {ok, _Value} = Found -> {reply, Found, State} @@ -999,7 +1037,7 @@ handle_call_get_wait(Key, From, #state{tables = Tables} = State) -> Tag :: atom(). handle_call_get_waiting_pids(Key, State) -> {State1, Pids} = case maps:take(Key, State#state.wait_pids) of - {{_MaxAge, List}, WaitPids} -> + {{_MaxAge, List, _WriterRef}, WaitPids} -> {State#state{wait_pids=WaitPids}, List}; error -> {State, []} @@ -1082,9 +1120,11 @@ handle_call_set({Key, Data, MaxAge, Depend}, #state{tables = Tables} = State) -> %% Check if other processes are waiting for this key, send them the data case maps:take(Key, State1#state.wait_pids) of - {{_MaxAge, List}, WaitPids} -> + {{_MaxAge, List, WriterRef}, WaitPids} -> _ = [ catch gen_server:reply(From, {ok, Data}) || From <- List ], - {reply, ok, State1#state{wait_pids=WaitPids}}; + _ = erlang:demonitor(WriterRef), + Writers = maps:without([WriterRef], State#state.writers), + {reply, ok, State1#state{writers=Writers, wait_pids=WaitPids}}; error -> {reply, ok, State1} end. diff --git a/test/depcache_tests.erl b/test/depcache_tests.erl index 906892d..3f077bd 100644 --- a/test/depcache_tests.erl +++ b/test/depcache_tests.erl @@ -145,3 +145,28 @@ memo_raise_test() -> ?assertMatch({depcache_tests, raise_error, 0, _}, hd(S)) end, ok. + +memo_premature_kill_test() -> + {ok, C} = depcache:start_link(#{}), + + LongTask = fun() -> + Fun = fun() -> + timer:sleep(500), + done + end, + depcache:memo(Fun, premature_kill_test, C) + end, + + Pid = spawn(LongTask), + timer:kill_after(250, Pid), + timer:sleep(50), + ?assertEqual({error, premature_exit}, depcache:get_wait(premature_kill_test, C)), + + % Check if another process takes over processing in case of pre-mature exits + Task = spawn(LongTask), + timer:kill_after(250, Task), + timer:sleep(50), + ?assertEqual(done, LongTask()), + + ok. + diff --git a/test/prop_depcache.erl b/test/prop_depcache.erl index 915ebf3..f02b4a6 100644 --- a/test/prop_depcache.erl +++ b/test/prop_depcache.erl @@ -1,6 +1,14 @@ -module(prop_depcache). -include_lib("proper/include/proper.hrl"). +-dialyzer({nowarn_function, prop_set_get/0}). +-dialyzer({nowarn_function, prop_flush_all/0}). +-dialyzer({nowarn_function, prop_get_set_maxage/0}). +-dialyzer({nowarn_function, prop_get_set_maxage_0/0}). +-dialyzer({nowarn_function, prop_get_set_depend/0}). +-dialyzer({nowarn_function, prop_get_set_depend_map/0}). +-dialyzer({nowarn_function, prop_memo/0}). + %%%%%%%%%%%%%%%%%% %%% Properties %%% %%%%%%%%%%%%%%%%%%