Skip to content

Commit

Permalink
fix: Add rpc metrics to promex (#770)
Browse files Browse the repository at this point in the history
Add rpc metrics to promex
  • Loading branch information
filipecabaco authored Dec 27, 2023
1 parent 711f022 commit 3ca4c1a
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 40 deletions.
4 changes: 2 additions & 2 deletions lib/extensions/postgres_cdc_rls/cdc_rls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ defmodule Extensions.PostgresCdcRls do
Subscriptions,
:create,
opts,
15_000
timeout: 15_000
)
else
apply(Subscriptions, :create, opts)
Expand Down Expand Up @@ -68,7 +68,7 @@ defmodule Extensions.PostgresCdcRls do
"Starting distributed postgres extension #{inspect(lauch_node: launch_node, region: region, platform_region: platform_region)}"
)

case Rpc.call(launch_node, __MODULE__, :start, [args], 30_000) do
case Rpc.call(launch_node, __MODULE__, :start, [args], timeout: 30_000, tenant: tenant) do
{:ok, _pid} = ok ->
ok

Expand Down
2 changes: 1 addition & 1 deletion lib/extensions/postgres_cdc_rls/subscriptions_checker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ defmodule Extensions.PostgresCdcRls.SubscriptionsChecker do
if node == node() do
acc ++ not_alive_pids(pids)
else
case Rpc.call(node, __MODULE__, :not_alive_pids, [pids], 15_000) do
case Rpc.call(node, __MODULE__, :not_alive_pids, [pids], timeout: 15_000) do
{:badrpc, _} = error ->
Logger.error("Can't check pids on node #{inspect(node)}: #{inspect(error)}")
acc
Expand Down
2 changes: 1 addition & 1 deletion lib/extensions/postgres_cdc_stream/cdc_stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ defmodule Extensions.PostgresCdcStream do
"Starting distributed postgres extension #{inspect(lauch_node: launch_node, region: region, platform_region: platform_region)}"
)

case Rpc.call(launch_node, __MODULE__, :start, [args], 30_000) do
case Rpc.call(launch_node, __MODULE__, :start, [args], timeout: 30_000) do
{:ok, _pid} = ok ->
ok

Expand Down
4 changes: 3 additions & 1 deletion lib/realtime/helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,9 @@ defmodule Realtime.Helpers do
[node() | Node.list()]
|> Task.async_stream(
fn node ->
Rpc.ecall(node, __MODULE__, :kill_connections_to_tenant_id, [tenant_id, reason], 5000)
Rpc.enhanced_call(node, __MODULE__, :kill_connections_to_tenant_id, [tenant_id, reason],
timeout: 5000
)
end,
timeout: 5000
)
Expand Down
4 changes: 3 additions & 1 deletion lib/realtime/monitoring/latency.ex
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ defmodule Realtime.Latency do
for n <- [Node.self() | Node.list()] do
Task.Supervisor.async(Realtime.TaskSupervisor, fn ->
{latency, response} =
:timer.tc(fn -> Rpc.call(n, __MODULE__, :pong, [pong_timeout], timer_timeout) end)
:timer.tc(fn ->
Rpc.call(n, __MODULE__, :pong, [pong_timeout], timeout: timer_timeout)
end)

latency_ms = latency / 1_000
region = Application.get_env(:realtime, :region, "not_set")
Expand Down
23 changes: 20 additions & 3 deletions lib/realtime/monitoring/prom_ex/plugins/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,34 @@ defmodule Realtime.PromEx.Plugins.Tenants do
@moduledoc false

use PromEx.Plugin

alias PromEx.MetricTypes.Event

require Logger

@event_connected [:prom_ex, :plugin, :realtime, :tenants, :connected]

@impl true
def event_metrics(opts) do
rpc_metrics(opts)
end

defp rpc_metrics(_opts) do
Event.build(:realtime, [
sum(
[:realtime, :tenants, :rpc],
event_name: [:realtime, :tenants, :rpc],
description: "The total latency of rpc calls triggered by a tenant action",
measurement: :latency
)
])
end

@impl true
def polling_metrics(opts) do
poll_rate = Keyword.get(opts, :poll_rate)

[
metrics(poll_rate)
]
[metrics(poll_rate)]
end

defp metrics(poll_rate) do
Expand Down
56 changes: 28 additions & 28 deletions lib/realtime/rpc.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,48 +7,48 @@ defmodule Realtime.Rpc do
@doc """
Calls external node using :rpc.call/5 and collects telemetry
"""
def call(node, mod, func, opts \\ [], timeout \\ 15000) do
{latency, response} = :timer.tc(fn -> :rpc.call(node, mod, func, opts, timeout) end)
@spec call(atom(), atom(), atom(), any(), keyword()) :: any()
def call(node, mod, func, args, opts \\ []) do
timeout = Keyword.get(opts, :timeout, 15000)
{latency, response} = :timer.tc(fn -> :rpc.call(node, mod, func, args, timeout) end)
tenant = Keyword.get(opts, :tenant, nil)

Telemetry.execute([:rpc, :call], latency, %{
mod: mod,
func: func,
target_node: node,
origin_node: node()
})

response
rescue
_ ->
Telemetry.execute([:erpc, :call], timeout, %{
Telemetry.execute(
[:realtime, :tenants, :rpc],
%{latency: latency},
%{
tenant: tenant,
mod: mod,
func: func,
target_node: node,
origin_node: node()
})
}
)

response
end

@doc """
Calls external node using :erpc.call/5 and collects telemetry
"""
def ecall(node, mod, func, opts \\ [], timeout \\ 15000) do
{latency, response} = :timer.tc(fn -> :erpc.call(node, mod, func, opts, timeout) end)
@spec enhanced_call(atom(), atom(), atom(), any(), keyword()) :: any()
def enhanced_call(node, mod, func, args \\ [], opts \\ []) do
timeout = Keyword.get(opts, :timeout, 15000)
{latency, response} = :timer.tc(fn -> :erpc.call(node, mod, func, args, timeout) end)
tenant = Keyword.get(opts, :tenant, nil)

Telemetry.execute([:erpc, :call], latency, %{
mod: mod,
func: func,
target_node: node,
origin_node: node()
})

response
rescue
_ ->
Telemetry.execute([:erpc, :call], timeout, %{
Telemetry.execute(
[:realtime, :tenants, :rpc],
%{latency: latency},
%{
tenant: tenant,
mod: mod,
func: func,
target_node: node,
origin_node: node()
})
}
)

response
end
end
5 changes: 4 additions & 1 deletion lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,10 @@ defmodule Realtime.Tenants.Connect do
with tenant <- Tenants.Cache.get_tenant_by_external_id(tenant_id),
:ok <- tenant_suspended?(tenant),
{:ok, node} <- Realtime.Nodes.get_node_for_tenant(tenant) do
Rpc.ecall(node, __MODULE__, :connect, [tenant_id, opts], erpc_timeout)
Rpc.enhanced_call(node, __MODULE__, :connect, [tenant_id, opts],
timeout: erpc_timeout,
tenant: tenant_id
)
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/realtime_web/controllers/metrics_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule RealtimeWeb.MetricsController do
Node.list()
|> Task.async_stream(
fn node ->
{node, Rpc.call(node, PromEx, :get_metrics, [], 10_000)}
{node, Rpc.call(node, PromEx, :get_metrics, [], timeout: 10_000)}
end,
timeout: :infinity
)
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.25.52",
version: "2.25.53",
elixir: "~> 1.14.0",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down

0 comments on commit 3ca4c1a

Please sign in to comment.