diff --git a/src/logplex_api_v3_channel_logs.erl b/src/logplex_api_v3_channel_logs.erl index 3488fd2f..4ee4440f 100644 --- a/src/logplex_api_v3_channel_logs.erl +++ b/src/logplex_api_v3_channel_logs.erl @@ -8,6 +8,7 @@ rest_init/2, service_available/2, allowed_methods/2, + malformed_request/2, is_authorized/2, resource_exists/2, content_types_provided/2, @@ -15,7 +16,8 @@ ]). -record(state, { - channel_id :: binary() + channel_id :: binary(), + num_logs = 100 :: integer() %% number of log lines to fetch from log buffers }). %% @private @@ -38,6 +40,22 @@ service_available(Req, State) -> allowed_methods(Req, State) -> {[<<"GET">>], Req, State}. +%% @private +malformed_request(Req, State) -> + case cowboy_req:qs_val(<<"num">>, Req) of + {undefined, Req1} -> + {false, Req1, State}; + {Val, Req1} -> + try + Num = list_to_integer(binary_to_list(Val)), + NewState = State#state{ num_logs = Num }, + {false, Req1, NewState} + catch + _:_ -> + {false, Req1, State} + end + end. + %% @private is_authorized(Req, State) -> logplex_api_v3:is_authorized(Req, State). @@ -57,9 +75,10 @@ content_types_provided(Req, State) -> {[{{<<"application">>, <<"logplex-1">>, []}, to_logs}], Req, State}. %% @private -to_logs(Req, #state{ channel_id = ChannelId } = State) -> - %% fetch all messages from log buffer - case logplex_channel:logs(ChannelId, -1) of +to_logs(Req, #state{ channel_id = ChannelId, + num_logs = NumLogs } = State) -> + %% fetch messages from log buffer + case logplex_channel:logs(ChannelId, NumLogs) of {error, Reason} -> ?ERR("channel_id=~s err='failed to fetch channel logs' reason='~p'", [ChannelId, Reason]), diff --git a/test/logplex_api_v3_SUITE.erl b/test/logplex_api_v3_SUITE.erl index 1e6c95e2..e3a4c583 100644 --- a/test/logplex_api_v3_SUITE.erl +++ b/test/logplex_api_v3_SUITE.erl @@ -57,6 +57,8 @@ groups() -> , fetch_channel_logs , channel_logs_format , channel_logs_bad_syslog_message + , channel_logs_with_num_query_string + , channel_logs_with_malformed_query_string ]}, {sessions, [sessions_service_unavailable @@ -630,6 +632,73 @@ channel_logs_bad_syslog_message(Config0) -> ?assertEqual("64 <1>1 aaaa host cccc dddd - " ++ Uuid ++ "\n", Body). +channel_logs_with_num_query_string(Config0) -> + Config = create_channel_with_tokens(Config0), + Channel = ?config(channel, Config), + [{TokenName, TokenId} | _] = ?config(tokens, Config), + NumLogs = 5, + ExpectedLogMsgs = [{N, uuid:to_string(uuid:v4())} || N <- lists:seq(1, 3)], + LogMsgs = [{msg, new_log_msg(N, Msg)} || {N, Msg} <- ExpectedLogMsgs], + logplex_message:process_msgs(LogMsgs, list_to_binary(Channel), TokenId, TokenName), + Props = stream_channel_logs(Channel, [{num_logs, NumLogs} | Config]), + Headers = proplists:get_value(headers, Props), + ?assertEqual("application/logplex-1", proplists:get_value("content-type", Headers)), + ?assertEqual("3", proplists:get_value("logplex-msg-count", Headers)), + ?assertEqual("chunked", proplists:get_value("transfer-encoding", Headers)), + ?assertEqual("close", proplists:get_value("connection", Headers)), + Lines = re:split(proplists:get_value(body, Props), "\n", [trim]), + [begin + ?assertEqual(match, re:run(Line, Expected, [{capture, none}])), + NBin = list_to_binary(integer_to_list(N)), + ?assertMatch(<<"64 <", NBin:1/binary, _/binary>>, Line) + end || {{N, Expected}, Line} <- lists:zip(ExpectedLogMsgs, Lines)], + Config. + +channel_logs_with_num_query_limit(Config0) -> + Config = create_channel_with_tokens(Config0), + Channel = ?config(channel, Config), + [{TokenName, TokenId} | _] = ?config(tokens, Config), + NumLogs = 3, + ExpectedLogMsgs = [{N, uuid:to_string(uuid:v4())} || N <- lists:seq(1, 5)], + LogMsgs = [{msg, new_log_msg(N, Msg)} || {N, Msg} <- ExpectedLogMsgs], + logplex_message:process_msgs(LogMsgs, list_to_binary(Channel), TokenId, TokenName), + Props = stream_channel_logs(Channel, [{num_logs, NumLogs} | Config]), + Headers = proplists:get_value(headers, Props), + ?assertEqual("application/logplex-1", proplists:get_value("content-type", Headers)), + %% note: spits out one more line to indicate there's more line than requested in the buffer + ?assertEqual("4", proplists:get_value("logplex-msg-count", Headers)), + ?assertEqual("chunked", proplists:get_value("transfer-encoding", Headers)), + ?assertEqual("close", proplists:get_value("connection", Headers)), + Lines = re:split(proplists:get_value(body, Props), "\n", [trim]), + [begin + ?assertEqual(match, re:run(Line, Expected, [{capture, none}])), + NBin = list_to_binary(integer_to_list(N)), + ?assertMatch(<<"64 <", NBin:1/binary, _/binary>>, Line) + end || {{N, Expected}, Line} <- lists:zip(ExpectedLogMsgs, Lines), N > 1], + Config. + +channel_logs_with_malformed_query_string(Config0) -> + Config = create_channel_with_tokens(Config0), + Channel = ?config(channel, Config), + [{TokenName, TokenId} | _] = ?config(tokens, Config), + ExpectedLogMsgs = [{N, uuid:to_string(uuid:v4())} || N <- lists:seq(1, 5)], + LogMsgs = [{msg, new_log_msg(N, Msg)} || {N, Msg} <- ExpectedLogMsgs], + logplex_message:process_msgs(LogMsgs, list_to_binary(Channel), TokenId, TokenName), + Props = stream_channel_logs(Channel, [{num_logs, "invalid"} | Config]), + Headers = proplists:get_value(headers, Props), + ?assertEqual("application/logplex-1", proplists:get_value("content-type", Headers)), + ?assertEqual("5", proplists:get_value("logplex-msg-count", Headers)), + ?assertEqual("chunked", proplists:get_value("transfer-encoding", Headers)), + ?assertEqual("close", proplists:get_value("connection", Headers)), + Lines = re:split(proplists:get_value(body, Props), "\n", [trim]), + [begin + ct:pal("~p -- ~p~n", [Line, Expected]), + ?assertEqual(match, re:run(Line, Expected, [{capture, none}])), + NBin = list_to_binary(integer_to_list(N)), + ?assertMatch(<<"64 <", NBin:1/binary, _/binary>>, Line) + end || {{N, Expected}, Line} <- lists:zip(ExpectedLogMsgs, Lines)], + Config. + %% ----------------------------------------------------------------------------- %% sessions %% ----------------------------------------------------------------------------- @@ -750,7 +819,15 @@ get_channel_logs(Channel, Config) -> logplex_api_SUITE:get_(Url, Opts). stream_channel_logs(Channel, Config) -> - Url = ?config(api_v3_url, Config) ++ "/v3/channels/" ++ Channel ++ "/logs", + NumLogs = ?config(num_logs, Config), + QueryString = case NumLogs of + undefined -> ""; + _ when is_integer(NumLogs) -> + "?num=" ++ integer_to_list(NumLogs); + _ when is_list(NumLogs) -> + "?num=" ++ NumLogs + end, + Url = ?config(api_v3_url, Config) ++ "/v3/channels/" ++ Channel ++ "/logs" ++ QueryString, Headers = [{"Authorization", ?config(auth, Config)}], Opts = [{headers, Headers}, {timeout, timer:seconds(10)},