Skip to content

Commit

Permalink
Let the depcache process itself do the monitoring, instead of an extr…
Browse files Browse the repository at this point in the history
…a monitor process
  • Loading branch information
mmzeeman committed Jan 23, 2025
1 parent 41d35f7 commit 654bc1b
Showing 1 changed file with 34 additions and 23 deletions.
57 changes: 34 additions & 23 deletions src/depcache.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
data_table :: ets:tab()
}).

-record(state, {now :: sec(), serial :: non_neg_integer(), tables :: tables(), wait_pids :: map()}).
-record(state, {now :: sec(), serial :: non_neg_integer(), tables :: tables(), wait_pids :: map(), writers :: map() }).
-record(meta, {key :: key(), expire :: sec(), serial :: non_neg_integer(), depend :: dependencies()}).
-record(depend, {key :: key(), serial :: non_neg_integer()}).

Expand Down Expand Up @@ -325,8 +325,8 @@ memo(Fun, Key, MaxAge, Dep, Server) ->
Server :: depcache_server(),
Result :: any().
memo_key(Fun, Key, MaxAge, Dep, Server) ->
ExitWatcher = start_exit_watcher(Key, Server),
try
%%ExitWatcher = start_exit_watcher(Key, Server),
%%try
try
{Value1, MaxAge1, Dep1} = case apply_fun(Fun) of
#memo{value=V, max_age=MA, deps=D} ->
Expand All @@ -348,28 +348,28 @@ memo_key(Fun, Key, MaxAge, Dep, Server) ->
?WITH_STACKTRACE(Class, R, S)
memo_send_errors(Key, {throw, R}, Server),
erlang:raise(Class, R, S)
end
after
stop_exit_watcher(ExitWatcher)
end.
end.
%%after
%% stop_exit_watcher(ExitWatcher)
%%end.

%% @private
%% @doc Monitors the current process...
%% Sends premature_exit throw to depcache server when it detects one.
start_exit_watcher(Key, Server) ->
Self = self(),
spawn(fun() ->
Ref = monitor(process, Self),
receive
done ->
erlang:demonitor(Ref);
{'DOWN', Ref, process, Self, _Reason} ->
memo_send_errors(Key, {throw, premature_exit}, Server)
end
end).

stop_exit_watcher(Pid) ->
Pid ! done.
%%start_exit_watcher(Key, Server) ->
%% Self = self(),
%% spawn(fun() ->
%% Ref = monitor(process, Self),
%% receive
%% done ->
%% erlang:demonitor(Ref);
%% {'DOWN', Ref, process, Self, _Reason} ->
%% memo_send_errors(Key, {throw, premature_exit}, Server)
%% end
%% end).
%%
%%stop_exit_watcher(Pid) ->
%% Pid ! done.

%% @private
%% @doc Execute the memo function
Expand Down Expand Up @@ -805,7 +805,8 @@ init(Config) ->
tables = Tables,
now=now_sec(),
serial=0,
wait_pids=#{}
wait_pids=#{},
writers=#{}
},
timer:send_after(1000, tick),
spawn_link(?MODULE,
Expand Down Expand Up @@ -940,6 +941,11 @@ handle_info(tick, State) ->
erase_process_dict(),
{noreply, State#state{now=now_sec()}};

handle_info({'DOWN', Ref, process, _Pid, _Reason}, State) ->
io:fwrite(standard_error, "Down for ~p~n", [Ref]),

{noreply, State};

handle_info(_Msg, State) ->
{noreply, State}.

Expand Down Expand Up @@ -1007,9 +1013,14 @@ handle_call_get_wait(Key, From, #state{tables = Tables} = State) ->
WaitPids = maps:update(Key, {MaxAge, [From|List]}, State#state.wait_pids),
{noreply, State#state{wait_pids=WaitPids}};
_ ->
%% Monitor the sender as a writer for Key
{Pid, _} = From,
Ref = erlang:monitor(process, Pid),
Writers = maps:put(Ref, Key, State#state.writers),

%% Nobody waiting or we hit a timeout, let next requestors wait for this caller.
WaitPids = maps:put(Key, {State#state.now+?MAX_GET_WAIT, []}, State#state.wait_pids),
{reply, undefined, State#state{wait_pids=WaitPids}}
{reply, undefined, State#state{wait_pids=WaitPids, writers=Writers}}
end;
{ok, _Value} = Found ->
{reply, Found, State}
Expand Down

0 comments on commit 654bc1b

Please sign in to comment.