Skip to content

Commit

Permalink
feat: Connect to tenant database on channel join (#677)
Browse files Browse the repository at this point in the history
Before Realtime Channel join it checks if we're able to connect to Tenant database. If not we exit and avoid the client from connecting with the Channel.

How it works:

* One running GenServer for each node in the node closest to the tenant
* On GenServer init/1 check database connection with Postgrex and a SELECT 1 query
  * If successful, store in GenServer metadata the connection
  * If failed, GenServer dies
* GenServer tracks shutdown of DBConnection process and dies, cleaning the :syn registry in the process

Other changes:
* SynHandler made more reusable by allowing multiple modules
* Realtime.Nodes made with common functions required for node management
* Realtime.Helpers with extra functions for future refactors
  • Loading branch information
filipecabaco authored Oct 12, 2023
1 parent 6f77a0d commit 29152cb
Show file tree
Hide file tree
Showing 24 changed files with 956 additions and 629 deletions.
5 changes: 2 additions & 3 deletions lib/extensions/postgres_cdc_rls/cdc_rls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ defmodule Extensions.PostgresCdcRls do
require Logger

alias RealtimeWeb.Endpoint
alias Realtime.PostgresCdc
alias Extensions.PostgresCdcRls, as: Rls
alias Rls.Subscriptions

Expand Down Expand Up @@ -61,8 +60,8 @@ defmodule Extensions.PostgresCdcRls do
## Internal functions

def start_distributed(%{"region" => region, "id" => tenant} = args) do
platform_region = PostgresCdc.platform_region_translator(region)
launch_node = PostgresCdc.launch_node(tenant, platform_region, node())
platform_region = Realtime.Nodes.platform_region_translator(region)
launch_node = Realtime.Nodes.launch_node(tenant, platform_region, node())

Logger.warning(
"Starting distributed postgres extension #{inspect(lauch_node: launch_node, region: region, platform_region: platform_region)}"
Expand Down
8 changes: 4 additions & 4 deletions lib/extensions/postgres_cdc_rls/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Extensions.PostgresCdcRls.Supervisor do
"""
use Supervisor

alias Extensions.PostgresCdcRls, as: Rls
alias Extensions.PostgresCdcRls

@spec start_link :: :ignore | {:error, any} | {:ok, pid}
def start_link() do
Expand All @@ -15,16 +15,16 @@ defmodule Extensions.PostgresCdcRls.Supervisor do
def init(_args) do
load_migrations_modules()

:syn.set_event_handler(Rls.SynHandler)
:syn.add_node_to_scopes([Rls])
:syn.set_event_handler(Realtime.SynHandler)
:syn.add_node_to_scopes([PostgresCdcRls])

children = [
{
PartitionSupervisor,
partitions: 20,
child_spec: DynamicSupervisor,
strategy: :one_for_one,
name: Rls.DynamicSupervisor
name: PostgresCdcRls.DynamicSupervisor
}
]

Expand Down
69 changes: 0 additions & 69 deletions lib/extensions/postgres_cdc_rls/syn_handler.ex

This file was deleted.

12 changes: 4 additions & 8 deletions lib/extensions/postgres_cdc_stream/cdc_stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ defmodule Extensions.PostgresCdcStream do

require Logger

alias Realtime.PostgresCdc
alias Extensions.PostgresCdcStream, as: Stream

def handle_connect(opts) do
Expand Down Expand Up @@ -47,17 +46,14 @@ defmodule Extensions.PostgresCdcStream do
def get_manager_conn(id) do
Phoenix.Tracker.get_by_key(Stream.Tracker, "postgres_cdc_stream", id)
|> case do
[] ->
{:error, nil}

[{_, %{manager_pid: pid, conn: conn}}] ->
{:ok, pid, conn}
[] -> {:error, nil}
[{_, %{manager_pid: pid, conn: conn}}] -> {:ok, pid, conn}
end
end

def start_distributed(%{"region" => region, "id" => tenant} = args) do
platform_region = PostgresCdc.platform_region_translator(region)
launch_node = PostgresCdc.launch_node(tenant, platform_region, node())
platform_region = Realtime.Nodes.platform_region_translator(region)
launch_node = Realtime.Nodes.launch_node(tenant, platform_region, node())

Logger.warning(
"Starting distributed postgres extension #{inspect(lauch_node: launch_node, region: region, platform_region: platform_region)}"
Expand Down
9 changes: 7 additions & 2 deletions lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ defmodule Realtime.Application do
name: Realtime.Registry.Unique
)

:syn.add_node_to_scopes([:users, RegionNodes])
:ok = :syn.add_node_to_scopes([Realtime.Tenants.Connect])
:ok = :syn.add_node_to_scopes([:users, RegionNodes])
region = Application.get_env(:realtime, :region)
:syn.join(RegionNodes, region, self(), node: node())

Expand All @@ -62,7 +63,11 @@ defmodule Realtime.Application do
RealtimeWeb.Presence,
{Task.Supervisor, name: Realtime.TaskSupervisor},
Realtime.Latency,
Realtime.Telemetry.Logger
Realtime.Telemetry.Logger,
{PartitionSupervisor,
child_spec: DynamicSupervisor,
strategy: :one_for_one,
name: Realtime.Tenants.Connect.DynamicSupervisor}
] ++ extensions_supervisors()

children =
Expand Down
80 changes: 78 additions & 2 deletions lib/realtime/helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ defmodule Realtime.Helpers do
This module includes helper functions for different contexts that can't be union in one module.
"""

alias Realtime.Api.Tenant
alias Realtime.PostgresCdc

require Logger

@spec cancel_timer(reference() | nil) :: non_neg_integer() | false | :ok | nil
def cancel_timer(nil), do: nil
def cancel_timer(ref), do: Process.cancel_timer(ref)
Expand All @@ -21,6 +26,32 @@ defmodule Realtime.Helpers do
|> unpad()
end

@spec connect_db(%{
:host => binary,
:name => binary,
:pass => binary,
:pool => non_neg_integer,
:port => binary,
:queue_target => non_neg_integer,
:socket_opts => list,
:ssl_enforced => boolean,
:user => binary,
optional(any) => any
}) :: {:error, any} | {:ok, pid}
def connect_db(%{
host: host,
port: port,
name: name,
user: user,
pass: pass,
socket_opts: socket_opts,
pool: pool,
queue_target: queue_target,
ssl_enforced: ssl_enforced
}) do
connect_db(host, port, name, user, pass, socket_opts, pool, queue_target, ssl_enforced)
end

@spec connect_db(
String.t(),
String.t(),
Expand Down Expand Up @@ -65,6 +96,51 @@ defmodule Realtime.Helpers do
|> Postgrex.start_link()
end

@cdc "postgres_cdc_rls"
@doc """
Checks if the Tenant CDC extension information is properly configured and that we're able to query against the tenant database.
"""
@spec check_tenant_connection(Tenant.t()) :: {:error, atom()} | {:ok, pid()}
def check_tenant_connection(nil), do: {:error, :tenant_not_found}

def check_tenant_connection(tenant) do
tenant
|> then(&PostgresCdc.filter_settings(@cdc, &1.extensions))
|> then(fn settings ->
ssl_enforced = default_ssl_param(settings)

host = settings["db_host"]
port = settings["db_port"]
name = settings["db_name"]
user = settings["db_user"]
password = settings["db_password"]
socket_opts = settings["db_socket_opts"]

opts = %{
host: host,
port: port,
name: name,
user: user,
pass: password,
socket_opts: socket_opts,
pool: 1,
queue_target: 1000,
ssl_enforced: ssl_enforced
}

with {:ok, conn} <- connect_db(opts) do
case Postgrex.query(conn, "SELECT 1", []) do
{:ok, _} ->
{:ok, conn}

{:error, e} ->
Logger.error("Error connecting to tenant database: #{inspect(e)}")
{:error, :tenant_database_unavailable}
end
end
end)
end

@spec default_ssl_param(map) :: boolean
def default_ssl_param(%{"ssl_enforced" => ssl_enforced}) when is_boolean(ssl_enforced),
do: ssl_enforced
Expand Down Expand Up @@ -198,8 +274,8 @@ defmodule Realtime.Helpers do
Enum.reduce(:syn.group_names(:users), 0, fn tenant, acc ->
case :syn.lookup(Extensions.PostgresCdcRls, tenant) do
{pid, %{region: region}} ->
platform_region = Realtime.PostgresCdc.platform_region_translator(region)
launch_node = Realtime.PostgresCdc.launch_node(tenant, platform_region, false)
platform_region = Realtime.Nodes.platform_region_translator(region)
launch_node = Realtime.Nodes.launch_node(tenant, platform_region, false)
current_node = node(pid)

case launch_node do
Expand Down
118 changes: 118 additions & 0 deletions lib/realtime/nodes.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
defmodule Realtime.Nodes do
@moduledoc """
Handles common needs for :syn module operations
"""
require Logger
alias Realtime.Api.Tenant

@doc """
Gets the node to launch the Postgres connection on for a tenant.
"""
@spec get_node_for_tenant(Tenant.t()) :: {:ok, node()} | {:error, term()}
def get_node_for_tenant(nil), do: {:error, :tenant_not_found}

def get_node_for_tenant(%Tenant{extensions: extensions, external_id: tenant_id}) do
with region <- get_region(extensions),
tenant_region <- platform_region_translator(region),
node <- launch_node(tenant_id, tenant_region, node()) do
{:ok, node}
end
end

defp get_region(extensions) do
extensions
|> Enum.map(fn %{settings: %{"region" => region}} -> region end)
|> Enum.uniq()
|> hd()
end

@doc """
Translates a region from a platform to the closest Supabase tenant region
"""
@spec platform_region_translator(String.t()) :: nil | binary()
def platform_region_translator(tenant_region) when is_binary(tenant_region) do
platform = Application.get_env(:realtime, :platform)
region_mapping(platform, tenant_region)
end

defp region_mapping(:aws, tenant_region) do
case tenant_region do
"us-west-1" -> "us-west-1"
"us-west-2" -> "us-west-1"
"us-east-1" -> "us-east-1"
"sa-east-1" -> "us-east-1"
"ca-central-1" -> "us-east-1"
"ap-southeast-1" -> "ap-southeast-1"
"ap-northeast-1" -> "ap-southeast-1"
"ap-northeast-2" -> "ap-southeast-1"
"ap-southeast-2" -> "ap-southeast-2"
"ap-south-1" -> "ap-southeast-1"
"eu-west-1" -> "eu-west-2"
"eu-west-2" -> "eu-west-2"
"eu-west-3" -> "eu-west-2"
"eu-central-1" -> "eu-west-2"
_ -> nil
end
end

defp region_mapping(:fly, tenant_region) do
case tenant_region do
"us-east-1" -> "iad"
"us-west-1" -> "sea"
"sa-east-1" -> "iad"
"ca-central-1" -> "iad"
"ap-southeast-1" -> "syd"
"ap-northeast-1" -> "syd"
"ap-northeast-2" -> "syd"
"ap-southeast-2" -> "syd"
"ap-south-1" -> "syd"
"eu-west-1" -> "lhr"
"eu-west-2" -> "lhr"
"eu-west-3" -> "lhr"
"eu-central-1" -> "lhr"
_ -> nil
end
end

defp region_mapping(_, tenant_region), do: tenant_region

@doc """
Lists the nodes in a region. Sorts by node name in case the list order
is unstable.
"""

@spec region_nodes(String.t()) :: [atom()]
def region_nodes(region) when is_binary(region) do
:syn.members(RegionNodes, region)
|> Enum.map(fn {_pid, [node: node]} -> node end)
|> Enum.sort()
end

@doc """
Picks the node to launch the Postgres connection on.
If there are not two nodes in a region the connection is established from
the `default` node given.
"""
@spec launch_node(String.t(), String.t(), atom()) :: atom()
def launch_node(tenant_id, region, default) do
case region_nodes(region) do
[node] ->
Logger.warning(
"Only one region node (#{inspect(node)}) for #{region} using default #{inspect(default)}"
)

default

[] ->
Logger.warning("Zero region nodes for #{region} using #{inspect(default)}")
default

regions_nodes ->
member_count = Enum.count(regions_nodes)
index = :erlang.phash2(tenant_id, member_count)

Enum.fetch!(regions_nodes, index)
end
end
end
Loading

0 comments on commit 29152cb

Please sign in to comment.