From 3ca4c1a8912f1504e6bc2971b49d9aa6d5369b2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Wed, 27 Dec 2023 23:31:13 +0000 Subject: [PATCH] fix: Add rpc metrics to promex (#770) Add rpc metrics to promex --- lib/extensions/postgres_cdc_rls/cdc_rls.ex | 4 +- .../postgres_cdc_rls/subscriptions_checker.ex | 2 +- .../postgres_cdc_stream/cdc_stream.ex | 2 +- lib/realtime/helpers.ex | 4 +- lib/realtime/monitoring/latency.ex | 4 +- .../monitoring/prom_ex/plugins/tenants.ex | 23 +++++++- lib/realtime/rpc.ex | 56 +++++++++---------- lib/realtime/tenants/connect.ex | 5 +- .../controllers/metrics_controller.ex | 2 +- mix.exs | 2 +- 10 files changed, 64 insertions(+), 40 deletions(-) diff --git a/lib/extensions/postgres_cdc_rls/cdc_rls.ex b/lib/extensions/postgres_cdc_rls/cdc_rls.ex index 883d2a6b8..62b50b7c4 100644 --- a/lib/extensions/postgres_cdc_rls/cdc_rls.ex +++ b/lib/extensions/postgres_cdc_rls/cdc_rls.ex @@ -37,7 +37,7 @@ defmodule Extensions.PostgresCdcRls do Subscriptions, :create, opts, - 15_000 + timeout: 15_000 ) else apply(Subscriptions, :create, opts) @@ -68,7 +68,7 @@ defmodule Extensions.PostgresCdcRls do "Starting distributed postgres extension #{inspect(lauch_node: launch_node, region: region, platform_region: platform_region)}" ) - case Rpc.call(launch_node, __MODULE__, :start, [args], 30_000) do + case Rpc.call(launch_node, __MODULE__, :start, [args], timeout: 30_000, tenant: tenant) do {:ok, _pid} = ok -> ok diff --git a/lib/extensions/postgres_cdc_rls/subscriptions_checker.ex b/lib/extensions/postgres_cdc_rls/subscriptions_checker.ex index 28a9d3c0c..8f9310f96 100644 --- a/lib/extensions/postgres_cdc_rls/subscriptions_checker.ex +++ b/lib/extensions/postgres_cdc_rls/subscriptions_checker.ex @@ -178,7 +178,7 @@ defmodule Extensions.PostgresCdcRls.SubscriptionsChecker do if node == node() do acc ++ not_alive_pids(pids) else - case Rpc.call(node, __MODULE__, :not_alive_pids, [pids], 15_000) do + case Rpc.call(node, __MODULE__, :not_alive_pids, [pids], timeout: 15_000) do {:badrpc, _} = error -> Logger.error("Can't check pids on node #{inspect(node)}: #{inspect(error)}") acc diff --git a/lib/extensions/postgres_cdc_stream/cdc_stream.ex b/lib/extensions/postgres_cdc_stream/cdc_stream.ex index 82a8f5447..95bf3dede 100644 --- a/lib/extensions/postgres_cdc_stream/cdc_stream.ex +++ b/lib/extensions/postgres_cdc_stream/cdc_stream.ex @@ -60,7 +60,7 @@ defmodule Extensions.PostgresCdcStream do "Starting distributed postgres extension #{inspect(lauch_node: launch_node, region: region, platform_region: platform_region)}" ) - case Rpc.call(launch_node, __MODULE__, :start, [args], 30_000) do + case Rpc.call(launch_node, __MODULE__, :start, [args], timeout: 30_000) do {:ok, _pid} = ok -> ok diff --git a/lib/realtime/helpers.ex b/lib/realtime/helpers.ex index 24f406c09..cac4b1bcd 100644 --- a/lib/realtime/helpers.ex +++ b/lib/realtime/helpers.ex @@ -325,7 +325,9 @@ defmodule Realtime.Helpers do [node() | Node.list()] |> Task.async_stream( fn node -> - Rpc.ecall(node, __MODULE__, :kill_connections_to_tenant_id, [tenant_id, reason], 5000) + Rpc.enhanced_call(node, __MODULE__, :kill_connections_to_tenant_id, [tenant_id, reason], + timeout: 5000 + ) end, timeout: 5000 ) diff --git a/lib/realtime/monitoring/latency.ex b/lib/realtime/monitoring/latency.ex index 6ddb5d98e..665700596 100644 --- a/lib/realtime/monitoring/latency.ex +++ b/lib/realtime/monitoring/latency.ex @@ -95,7 +95,9 @@ defmodule Realtime.Latency do for n <- [Node.self() | Node.list()] do Task.Supervisor.async(Realtime.TaskSupervisor, fn -> {latency, response} = - :timer.tc(fn -> Rpc.call(n, __MODULE__, :pong, [pong_timeout], timer_timeout) end) + :timer.tc(fn -> + Rpc.call(n, __MODULE__, :pong, [pong_timeout], timeout: timer_timeout) + end) latency_ms = latency / 1_000 region = Application.get_env(:realtime, :region, "not_set") diff --git a/lib/realtime/monitoring/prom_ex/plugins/tenants.ex b/lib/realtime/monitoring/prom_ex/plugins/tenants.ex index 49a78b1e8..f564b845f 100644 --- a/lib/realtime/monitoring/prom_ex/plugins/tenants.ex +++ b/lib/realtime/monitoring/prom_ex/plugins/tenants.ex @@ -2,17 +2,34 @@ defmodule Realtime.PromEx.Plugins.Tenants do @moduledoc false use PromEx.Plugin + + alias PromEx.MetricTypes.Event + require Logger @event_connected [:prom_ex, :plugin, :realtime, :tenants, :connected] + @impl true + def event_metrics(opts) do + rpc_metrics(opts) + end + + defp rpc_metrics(_opts) do + Event.build(:realtime, [ + sum( + [:realtime, :tenants, :rpc], + event_name: [:realtime, :tenants, :rpc], + description: "The total latency of rpc calls triggered by a tenant action", + measurement: :latency + ) + ]) + end + @impl true def polling_metrics(opts) do poll_rate = Keyword.get(opts, :poll_rate) - [ - metrics(poll_rate) - ] + [metrics(poll_rate)] end defp metrics(poll_rate) do diff --git a/lib/realtime/rpc.ex b/lib/realtime/rpc.ex index 8c7b1e756..b3d79f198 100644 --- a/lib/realtime/rpc.ex +++ b/lib/realtime/rpc.ex @@ -7,48 +7,48 @@ defmodule Realtime.Rpc do @doc """ Calls external node using :rpc.call/5 and collects telemetry """ - def call(node, mod, func, opts \\ [], timeout \\ 15000) do - {latency, response} = :timer.tc(fn -> :rpc.call(node, mod, func, opts, timeout) end) + @spec call(atom(), atom(), atom(), any(), keyword()) :: any() + def call(node, mod, func, args, opts \\ []) do + timeout = Keyword.get(opts, :timeout, 15000) + {latency, response} = :timer.tc(fn -> :rpc.call(node, mod, func, args, timeout) end) + tenant = Keyword.get(opts, :tenant, nil) - Telemetry.execute([:rpc, :call], latency, %{ - mod: mod, - func: func, - target_node: node, - origin_node: node() - }) - - response - rescue - _ -> - Telemetry.execute([:erpc, :call], timeout, %{ + Telemetry.execute( + [:realtime, :tenants, :rpc], + %{latency: latency}, + %{ + tenant: tenant, mod: mod, func: func, target_node: node, origin_node: node() - }) + } + ) + + response end @doc """ Calls external node using :erpc.call/5 and collects telemetry """ - def ecall(node, mod, func, opts \\ [], timeout \\ 15000) do - {latency, response} = :timer.tc(fn -> :erpc.call(node, mod, func, opts, timeout) end) + @spec enhanced_call(atom(), atom(), atom(), any(), keyword()) :: any() + def enhanced_call(node, mod, func, args \\ [], opts \\ []) do + timeout = Keyword.get(opts, :timeout, 15000) + {latency, response} = :timer.tc(fn -> :erpc.call(node, mod, func, args, timeout) end) + tenant = Keyword.get(opts, :tenant, nil) - Telemetry.execute([:erpc, :call], latency, %{ - mod: mod, - func: func, - target_node: node, - origin_node: node() - }) - - response - rescue - _ -> - Telemetry.execute([:erpc, :call], timeout, %{ + Telemetry.execute( + [:realtime, :tenants, :rpc], + %{latency: latency}, + %{ + tenant: tenant, mod: mod, func: func, target_node: node, origin_node: node() - }) + } + ) + + response end end diff --git a/lib/realtime/tenants/connect.ex b/lib/realtime/tenants/connect.ex index 9a055877a..4076982ea 100644 --- a/lib/realtime/tenants/connect.ex +++ b/lib/realtime/tenants/connect.ex @@ -174,7 +174,10 @@ defmodule Realtime.Tenants.Connect do with tenant <- Tenants.Cache.get_tenant_by_external_id(tenant_id), :ok <- tenant_suspended?(tenant), {:ok, node} <- Realtime.Nodes.get_node_for_tenant(tenant) do - Rpc.ecall(node, __MODULE__, :connect, [tenant_id, opts], erpc_timeout) + Rpc.enhanced_call(node, __MODULE__, :connect, [tenant_id, opts], + timeout: erpc_timeout, + tenant: tenant_id + ) end end diff --git a/lib/realtime_web/controllers/metrics_controller.ex b/lib/realtime_web/controllers/metrics_controller.ex index 1c435dc1e..c3196214b 100644 --- a/lib/realtime_web/controllers/metrics_controller.ex +++ b/lib/realtime_web/controllers/metrics_controller.ex @@ -9,7 +9,7 @@ defmodule RealtimeWeb.MetricsController do Node.list() |> Task.async_stream( fn node -> - {node, Rpc.call(node, PromEx, :get_metrics, [], 10_000)} + {node, Rpc.call(node, PromEx, :get_metrics, [], timeout: 10_000)} end, timeout: :infinity ) diff --git a/mix.exs b/mix.exs index fdcfa5830..059cf23e2 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.25.52", + version: "2.25.53", elixir: "~> 1.14.0", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod,