defmodule Cadet.Logger.CloudWatchLogger do
  @moduledoc """
  A custom Logger backend that sends logs to AWS CloudWatch.
  The backend can be configured with a minimum log level and a message format,
  and can include specific metadata keys in each log entry.
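
  A typical configuration is sketched below. This is illustrative only: the
  option keys mirror those read by `init/2`, but how the backend is attached
  depends on how this application wires up its Logger backends (for example
  via the `:backends` option or `Logger.add_backend/1`).

      config :logger, backends: [:console, Cadet.Logger.CloudWatchLogger]

      config :logger, Cadet.Logger.CloudWatchLogger,
        level: :info,
        format: "$time $metadata[$level] $message\\n",
        metadata: [:request_id],
        log_group: "cadet-logs",
        log_stream: "cadet-backend"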
  """

  @behaviour :gen_event
  require Logger

  defstruct [
    :level,
    :format,
    :metadata,
    :log_group,
    :log_stream,
    :buffer,
    :timer_ref
  ]

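  # Buffering and delivery tuning: events are batched in memory up to
  # @max_buffer_size, the buffer is flushed every @flush_interval ms, and each
  # PutLogEvents call is attempted up to @max_retries times, waiting
  # @retry_delay ms between attempts.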
  @max_buffer_size 1000
  @max_retries 3
  @retry_delay 200
  @flush_interval 5000
  @failed_message "Failed to send log to CloudWatch."

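  # The backend can be installed as {__MODULE__, opts}, in which case the
  # explicit options are merged over the application environment, or as
  # {__MODULE__, name}, in which case the application environment is used as-is.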
  @impl true
  def init({__MODULE__, opts}) when is_list(opts) do
    config = configure_merge(read_env(), opts)
    {:ok, init(config, %__MODULE__{})}
  end

  @impl true
  def init({__MODULE__, _name}) when is_atom(_name) do
    config = read_env()
    {:ok, init(config, %__MODULE__{})}
  end

  @impl true
  def handle_call({:configure, options}, state) do
    {:ok, :ok, configure(options, state)}
  end

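  # The Logger event manager delivers log events as
  # {level, group_leader, {Logger, message, timestamp, metadata}}. Events that
  # meet the configured level are formatted and buffered; once the buffer
  # reaches @max_buffer_size it is flushed asynchronously.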
  @impl true
  def handle_event({level, _gl, {Logger, msg, ts, md}}, state) do
    %{
      format: format,
      metadata: metadata,
      buffer: buffer,
      log_stream: log_stream,
      log_group: log_group
    } = state

    if meet_level?(level, state.level) and not meet_cloudwatch_error?(msg) do
      formatted_msg = Logger.Formatter.format(format, level, msg, ts, take_metadata(md, metadata))
      timestamp = timestamp_from_logger_ts(ts)

      log_event = %{
        "timestamp" => timestamp,
        "message" => IO.chardata_to_string(formatted_msg)
      }

      new_buffer = [log_event | buffer]

      new_buffer =
        if length(new_buffer) >= @max_buffer_size do
          flush_buffer_async(log_stream, log_group, new_buffer)
          []
        else
          new_buffer
        end

      {:ok, %{state | buffer: new_buffer}}
    else
      {:ok, state}
    end
  end

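  # Periodic flush scheduled by schedule_flush/1: synchronously send whatever
  # has accumulated, then re-arm the timer.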
  @impl true
  def handle_info(:flush_buffer, state) do
    %{buffer: buffer, timer_ref: timer_ref, log_stream: log_stream, log_group: log_group} = state

    if timer_ref, do: Process.cancel_timer(timer_ref)

    new_state =
      if length(buffer) > 0 do
        flush_buffer_sync(log_stream, log_group, buffer)
        %{state | buffer: []}
      else
        state
      end

    new_timer_ref = schedule_flush(@flush_interval)
    {:ok, %{new_state | timer_ref: new_timer_ref}}
  end

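  # Flush any buffered events when the backend is removed or the VM shuts down.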
  @impl true
  def terminate(_reason, state) do
    %{log_stream: log_stream, log_group: log_group, buffer: buffer, timer_ref: timer_ref} = state

    if timer_ref, do: Process.cancel_timer(timer_ref)
    flush_buffer_sync(log_stream, log_group, buffer)
    :ok
  end

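  # Catch-all clauses: ignore events, calls, and messages this backend does not
  # recognise.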
  def handle_event(_, state), do: {:ok, state}
  def handle_call(_, state), do: {:ok, :ok, state}
  def handle_info(_, state), do: {:ok, state}

  # Helpers
  defp configure(options, state) do
    config = configure_merge(read_env(), options)
    Application.put_env(:logger, __MODULE__, config)
    init(config, state)
  end

  defp normalize_level(lvl) when lvl in [:warn, :warning], do: :warning
  defp normalize_level(lvl), do: lvl

  defp meet_level?(_lvl, nil), do: true

  defp meet_level?(lvl, min) do
    lvl = normalize_level(lvl)
    min = normalize_level(min)
    Logger.compare_levels(lvl, min) != :lt
  end

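  # Skip messages produced by this backend's own delivery failures, so a
  # CloudWatch outage cannot create a feedback loop of error logs.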
  defp meet_cloudwatch_error?(msg) when is_binary(msg) do
    String.contains?(msg, @failed_message)
  end

  defp meet_cloudwatch_error?(_) do
    false
  end

  defp flush_buffer_async(log_stream, log_group, buffer) do
    if length(buffer) > 0 do
      Task.start(fn -> send_to_cloudwatch(log_stream, log_group, buffer) end)
    end
  end

  defp flush_buffer_sync(log_stream, log_group, buffer) do
    if length(buffer) > 0 do
      send_to_cloudwatch(log_stream, log_group, buffer)
    end
  end

  defp schedule_flush(interval) do
    Process.send_after(self(), :flush_buffer, interval)
  end

  defp send_to_cloudwatch(log_stream, log_group, buffer) do
    # Ensure ExAws authentication is configured before attempting to send
    with :ok <- check_exaws_config() do
      operation = build_log_operation(log_stream, log_group, buffer)

      operation
      |> send_with_retry()
    end
  end

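  # Builds the PutLogEvents request. The buffer is reversed so events are sent
  # oldest-first, since new events are prepended as they arrive.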
  defp build_log_operation(log_stream, log_group, buffer) do
    # The headers and body structure can be found in the AWS API documentation:
    # https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
    %ExAws.Operation.JSON{
      http_method: :post,
      service: :logs,
      headers: [
        {"x-amz-target", "Logs_20140328.PutLogEvents"},
        {"content-type", "application/x-amz-json-1.1"}
      ],
      data: %{
        "logGroupName" => log_group,
        "logStreamName" => log_stream,
        "logEvents" => Enum.reverse(buffer)
      }
    }
  end

  defp check_exaws_config do
    id = Application.get_env(:ex_aws, :access_key_id) || System.get_env("AWS_ACCESS_KEY_ID")

    secret =
      Application.get_env(:ex_aws, :secret_access_key) || System.get_env("AWS_SECRET_ACCESS_KEY")

    region = Application.get_env(:ex_aws, :region) || System.get_env("AWS_REGION")

    cond do
      is_nil(id) or id == "" or is_nil(secret) or secret == "" ->
        Logger.error(
          "#{@failed_message} AWS credentials missing. Ensure AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are set or configured in ex_aws."
        )

        :error

      region in [nil, ""] ->
        Logger.error(
          "#{@failed_message} AWS region not configured. Ensure AWS_REGION is set or configured in ex_aws."
        )

        :error

      true ->
        :ok
    end
  end

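  # Sends the request, retrying with a fixed delay and giving up (with a local
  # error log) once the attempts are exhausted.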
  defp send_with_retry(operation, retries \\ @max_retries)

  defp send_with_retry(operation, retries) when retries > 0 do
    case request(operation) do
      {:ok, _response} ->
        :ok

      {:error, reason} ->
        Logger.error("#{@failed_message} #{inspect(reason)}. Retrying...")
        # Wait before retrying
        :timer.sleep(@retry_delay)
        send_with_retry(operation, retries - 1)
    end
  end

  defp send_with_retry(_, 0) do
    Logger.error("#{@failed_message} Giving up after #{@max_retries} attempts.")
  end

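  # Builds the backend state from the given config, defaulting to the
  # "cadet-logs" log group and a log stream name derived from the node name and
  # the current time.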
  defp init(config, state) do
    level = Keyword.get(config, :level)
    format = Logger.Formatter.compile(Keyword.get(config, :format))
    raw_metadata = Keyword.get(config, :metadata, [])
    metadata = configure_metadata(raw_metadata)
    log_group = Keyword.get(config, :log_group, "cadet-logs")
    log_stream = Keyword.get(config, :log_stream, "#{node()}-#{:os.system_time(:second)}")
    timer_ref = schedule_flush(@flush_interval)

    %{
      state
      | level: level,
        format: format,
        metadata: metadata,
        log_group: log_group,
        log_stream: log_stream,
        buffer: [],
        timer_ref: timer_ref
    }
  end

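  # Metadata keys are stored reversed so that take_metadata/2, which prepends
  # matches as it reduces, returns them in the configured order.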
  defp configure_metadata(:all), do: :all
  defp configure_metadata(metadata), do: Enum.reverse(metadata)

  defp take_metadata(metadata, :all) do
    metadata
  end

  defp take_metadata(metadata, keys) do
    Enum.reduce(keys, [], fn key, acc ->
      case Keyword.fetch(metadata, key) do
        {:ok, val} -> [{key, val} | acc]
        :error -> acc
      end
    end)
  end

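  # Converts a legacy Logger timestamp ({{y, m, d}, {h, min, s, ms}}) into the
  # Unix millisecond timestamp CloudWatch expects. Assumes Logger is configured
  # with `utc_log: true`, since the tuple is treated as UTC.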
  defp timestamp_from_logger_ts({{year, month, day}, {hour, minute, second, millisecond}}) do
    datetime = %DateTime{
      year: year,
      month: month,
      day: day,
      hour: hour,
      minute: minute,
      second: second,
      microsecond: {millisecond * 1000, 3},
      time_zone: "Etc/UTC",
      zone_abbr: "UTC",
      utc_offset: 0,
      std_offset: 0
    }

    DateTime.to_unix(datetime, :millisecond)
  end

  defp read_env do
    Application.get_env(:logger, __MODULE__, Application.get_env(:logger, :cloudwatch_logger, []))
  end

  # Merges the given options with the existing environment configuration.
  # If a key exists in both, the value from `options` takes precedence.

  defp configure_merge(env, options) do
    Keyword.merge(env, options, fn
      _, _v1, v2 -> v2
    end)
  end

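  # Indirection over ExAws so tests can inject a mock client via the
  # :ex_aws_mock application environment key.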
  defp request(operation) do
    client = Application.get_env(:ex_aws, :ex_aws_mock, ExAws)
    client.request(operation)
  end
end