Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/telemetry.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

-export([attach/4,
attach_many/4,
persist/0,
detach/1,
list_handlers/1,
execute/2,
Expand Down Expand Up @@ -142,6 +143,17 @@ attach_many(HandlerId, EventNames, Function, Config) when is_function(Function,
end,
telemetry_handler_table:insert(HandlerId, EventNames, Function, Config).

?DOC("""
Persist telemetry handlers.

This will improve performance of calling Telemetry handlers at the cost of
reducing performance of attaching or detaching new handlers.

This function should be used with care.
""").
persist() ->
telemetry_handler_table:persist().

?DOC("""
Removes the existing handler.

Expand Down
69 changes: 69 additions & 0 deletions src/telemetry_ets.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
-module(telemetry_ets).

-include("telemetry.hrl").

-export([persist/1,
list_for_event/2,
list_by_prefix/2,
insert/5,
delete/2]).

persist(TID) ->
Handlers = ets:tab2list(TID),
Map = lists:foldl(fun(Handler, Acc) ->
maps:update_with(Handler#handler.event_name,
fun(L) -> [Handler | L] end,
[Handler],
Acc)
end,
#{},
Handlers),
{ok, Map}.

list_for_event(TID, EventName) ->
try
ets:lookup(TID, EventName)
catch
error:badarg ->
[]
end.

list_by_prefix(TID, EventPrefix) ->
Pattern = match_pattern_for_prefix(EventPrefix),
try
ets:match_object(TID, Pattern)
catch
error:badarg ->
[]
end.

match_pattern_for_prefix(EventPrefix) ->
#handler{event_name=match_for_prefix(EventPrefix),
_='_'}.

-dialyzer({nowarn_function, match_for_prefix/1}).
match_for_prefix([]) ->
'_';
match_for_prefix([Segment | Rest]) ->
[Segment | match_for_prefix(Rest)].

insert(TID, HandlerId, EventNames, Function, Config) ->
case ets:match(TID, #handler{id=HandlerId,
_='_'}) of
[] ->
Objects = [#handler{id=HandlerId,
event_name=EventName,
function=Function,
config=Config} || EventName <- EventNames],
ets:insert(TID, Objects),
{ok, TID};
_ ->
{error, already_exists}
end.

delete(TID, HandlerId) ->
case ets:select_delete(TID, [{#handler{id=HandlerId,
_='_'}, [], [true]}]) of
0 -> {error, not_found};
_ -> {ok, TID}
end.
80 changes: 46 additions & 34 deletions src/telemetry_handler_table.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
-behaviour(gen_server).

-export([start_link/0,
persist/0,
insert/4,
delete/1,
list_for_event/1,
Expand All @@ -25,6 +26,8 @@
code_change/3,
terminate/2]).

-compile({inline, [impl_get/0]}).

-include("telemetry.hrl").

start_link() ->
Expand All @@ -42,46 +45,64 @@ insert(HandlerId, EventNames, Function, Config) ->
delete(HandlerId) ->
gen_server:call(?MODULE, {delete, HandlerId}).

persist() ->
{Mod, State} = impl_get(),
case Mod:persist(State) of
{ok, NewState} ->
persistent_term:put(telemetry, {telemetry_pt, NewState}),
ok;
_ ->
ok
end.

impl_get() -> persistent_term:get(telemetry).

-spec list_for_event(telemetry:event_name()) -> [#handler{}].
list_for_event(EventName) ->
try
ets:lookup(?MODULE, EventName)
catch
error:badarg ->
case impl_get() of
{Mod, State} ->
Mod:list_for_event(State, EventName);
_ ->
?LOG_WARNING("Failed to lookup telemetry handlers. "
"Ensure the telemetry application has been started. ", []),
[]
end.

-spec list_by_prefix(telemetry:event_prefix()) -> [#handler{}].
list_by_prefix(EventPrefix) ->
Pattern = match_pattern_for_prefix(EventPrefix),
ets:match_object(?MODULE, Pattern).
case impl_get() of
{Mod, State} ->
Mod:list_by_prefix(State, EventPrefix);
_ ->
?LOG_WARNING("Failed to lookup telemetry handlers. "
"Ensure the telemetry application has been started. ", []),
[]
end.

init([]) ->
_ = create_table(),
TID = create_table(),

persistent_term:put(telemetry, {telemetry_ets, TID}),

{ok, []}.

handle_call({insert, HandlerId, EventNames, Function, Config}, _From, State) ->
case ets:match(?MODULE, #handler{id=HandlerId,
_='_'}) of
[] ->
Objects = [#handler{id=HandlerId,
event_name=EventName,
function=Function,
config=Config} || EventName <- EventNames],
ets:insert(?MODULE, Objects),
{Mod, MState} = impl_get(),
case Mod:insert(MState, HandlerId, EventNames, Function, Config) of
{ok, NewState} ->
persistent_term:put(telemetry, {Mod, NewState}),
{reply, ok, State};
_ ->
{reply, {error, already_exists}, State}
{error, _} = Error ->
{reply, Error, State}
end;
handle_call({delete, HandlerId}, _From, State) ->
case ets:select_delete(?MODULE, [{#handler{id=HandlerId,
_='_'}, [], [true]}]) of
0 ->
{reply, {error, not_found}, State};
_ ->
{reply, ok, State}
{Mod, MState} = impl_get(),
case Mod:delete(MState, HandlerId) of
{ok, NewState} ->
persistent_term:put(telemetry, {Mod, NewState}),
{reply, ok, State};
{error, _} = Error ->
{reply, Error, State}
end.

handle_cast(_Msg, State) ->
Expand All @@ -94,20 +115,11 @@ code_change(_, State, _) ->
{ok, State}.

terminate(_Reason, _State) ->
persistent_term:erase(telemetry),
ok.

%%

create_table() ->
ets:new(?MODULE, [duplicate_bag, protected, named_table,
ets:new(?MODULE, [duplicate_bag, protected,
{keypos, #handler.event_name}, {read_concurrency, true}]).

match_pattern_for_prefix(EventPrefix) ->
#handler{event_name=match_for_prefix(EventPrefix),
_='_'}.

-dialyzer({nowarn_function, match_for_prefix/1}).
match_for_prefix([]) ->
'_';
match_for_prefix([Segment | Rest]) ->
[Segment | match_for_prefix(Rest)].
62 changes: 62 additions & 0 deletions src/telemetry_pt.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
-module(telemetry_pt).

-include("telemetry.hrl").

-export([persist/1,
list_for_event/2,
list_by_prefix/2,
insert/5,
delete/2]).

persist(_Map) -> error.

list_for_event(Map, EventName) ->
case Map of
#{EventName := Handlers} -> Handlers;
_ -> []
end.

list_by_prefix(Map, EventPrefix) ->
[Handler ||
{EventName, Handlers} <- maps:to_list(Map),
starts_with(EventName, EventPrefix),
Handler <- Handlers].

starts_with(_Haystack, []) -> true;
starts_with([A | Haystack], [A | Needle]) -> starts_with(Haystack, Needle);
starts_with(_Haystack, _Needle) -> false.

insert(Map, _HandlerId, [], _Function, _Config) ->
{ok, Map};
insert(Map, HandlerId, [EventName | Rest], Function, Config) ->
Handler = #handler{id=HandlerId,
event_name=EventName,
function=Function,
config=Config},
OldHandlers = maps:get(EventName, Map, []),
case OldHandlers of
#{HandlerId := _} -> {error, already_exists};
_ ->
case put_new(Handler, OldHandlers) of
{ok, NewHandlers} ->
NewMap = Map#{EventName => NewHandlers},
insert(NewMap, HandlerId, Rest, Function, Config);
{error, _} = Error ->
Error
end
end.

put_new(Handler, List) ->
case lists:keymember(Handler#handler.id, #handler.id, List) of
true -> {error, already_exists};
false -> {ok, [Handler | List]}
end.

delete(Map, HandlerId) ->
Filtered = [{Event, lists:keydelete(HandlerId, #handler.id, Handlers)}
|| {Event, Handlers} <- maps:to_list(Map)],
NewMap = maps:from_list(Filtered),
case NewMap =:= Map of
true -> {error, not_found};
false -> {ok, NewMap}
end.
45 changes: 40 additions & 5 deletions test/telemetry_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,65 @@

-include("telemetry.hrl").

all() ->
[bad_event_names, duplicate_attach, invoke_handler,
all() -> [persist_with_existing_handlers, {group, ets}, {group, persisted}].

groups() ->
Tests = [bad_event_names, duplicate_attach, invoke_handler,
list_handlers, list_for_prefix, detach_on_exception,
no_execute_detached, no_execute_on_prefix, no_execute_on_specific,
handler_on_multiple_events, remove_all_handler_on_failure,
list_handler_on_many, detach_from_all, old_execute, default_metadata,
off_execute, invoke_successful_span_handlers, invoke_exception_span_handlers,
spans_generate_unique_default_contexts, logs_on_local_function].
spans_generate_unique_default_contexts, logs_on_local_function],

[{ets, [], Tests}, {persisted, [], Tests}].

init_per_suite(Config) ->
init_per_group(Name, Config) ->
application:ensure_all_started(telemetry),
case Name of
persisted -> ok = telemetry:persist();
_ -> ok
end,
Config.

end_per_suite(_Config) ->
end_per_group(_, _Config) ->
application:stop(telemetry).

init_per_testcase(_, Config) ->
HandlerId = crypto:strong_rand_bytes(16),
[{id, HandlerId} | Config].

end_per_testcase(persist_with_existing_handlers, _Config) ->
ok;
end_per_testcase(_, Config) ->
HandlerId = ?config(id, Config),
telemetry:detach(HandlerId).

persist_with_existing_handlers(Config) ->
application:ensure_all_started(telemetry),
HandlerId = ?config(id, Config),
Event = [a, test, event],
HandlerConfig = #{send_to => self()},
Measurements = #{data => 3},
Metadata = #{some => metadata},
telemetry:attach(HandlerId, Event, fun ?MODULE:echo_event/4, HandlerConfig),

telemetry:persist(),

telemetry:execute(Event, Measurements, Metadata),

try
receive
{event, Event, Measurements, Metadata, HandlerConfig} ->
ok
after
1000 ->
ct:fail(timeout_receive_echo)
end
after
application:stop(telemetry)
end.

bad_event_names(Config) ->
HandlerId = ?config(id, Config),
?assertError(badarg, telemetry:attach(HandlerId, ["some", event], fun ?MODULE:echo_event/4, [])),
Expand Down