Skip to content

Commit

Permalink
fix: Disconnect when tenant has no users (#701)
Browse files Browse the repository at this point in the history
To limit the number of open connections to a tenant database, we periodically check whether the tenant has any connected users and, if it has none, we terminate the database connection.
  • Loading branch information
filipecabaco authored Oct 20, 2023
1 parent 0c6328c commit e14c1c8
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 74 deletions.
103 changes: 88 additions & 15 deletions lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,40 @@ defmodule Realtime.Tenants.Connect do
@moduledoc """
This module is responsible for attempting to connect to a tenant's database and store the DBConnection in a Syn registry.
"""
use GenServer
use GenServer, restart: :transient

require Logger

alias Realtime.Helpers
alias Realtime.Tenants
alias Realtime.UsersCounter

defstruct tenant_id: nil, db_conn_reference: nil
@erpc_timeout_default 5000
@check_connected_user_interval_default 1000
@connected_users_bucket_shutdown [0, 0, 0, 0, 0, 0]

defstruct tenant_id: nil,
db_conn_reference: nil,
db_conn_pid: nil,
check_connected_user_interval: nil,
connected_users_bucket: [1]

@doc """
Returns the database connection for a tenant. If the tenant is not connected, it will attempt to connect to the tenant's database.
"""
@spec lookup_or_start_connection(binary()) :: {:ok, DBConnection.t()} | {:error, term()}
def lookup_or_start_connection(tenant_id) do
@spec lookup_or_start_connection(binary(), keyword()) ::
{:ok, DBConnection.t()} | {:error, term()}
def lookup_or_start_connection(tenant_id, opts \\ []) do
case get_status(tenant_id) do
{:ok, conn} -> {:ok, conn}
{:error, :tenant_database_unavailable} -> call_external_node(tenant_id)
{:error, :tenant_database_unavailable} -> call_external_node(tenant_id, opts)
{:error, :initializing} -> {:error, :tenant_database_unavailable}
end
end

@doc """
Returns the database connection pid from :syn if it exists.
"""

@spec get_status(binary()) ::
{:ok, DBConnection.t()} | {:error, :tenant_database_unavailable | :initializing}
def get_status(tenant_id) do
Expand All @@ -40,10 +49,10 @@ defmodule Realtime.Tenants.Connect do
@doc """
Connects to a tenant's database and stores the DBConnection in the process :syn metadata
"""
@spec connect(binary()) :: {:ok, DBConnection.t()} | {:error, term()}
def connect(tenant_id) do
@spec connect(binary(), keyword()) :: {:ok, DBConnection.t()} | {:error, term()}
def connect(tenant_id, opts \\ []) do
supervisor = {:via, PartitionSupervisor, {Realtime.Tenants.Connect.DynamicSupervisor, self()}}
spec = {__MODULE__, tenant_id: tenant_id}
spec = {__MODULE__, [tenant_id: tenant_id] ++ opts}

case DynamicSupervisor.start_child(supervisor, spec) do
{:ok, _} -> get_status(tenant_id)
Expand All @@ -52,9 +61,20 @@ defmodule Realtime.Tenants.Connect do
end
end

def start_link(tenant_id: tenant_id) do
def start_link(opts) do
tenant_id = Keyword.get(opts, :tenant_id)

check_connected_user_interval =
Keyword.get(opts, :check_connected_user_interval, @check_connected_user_interval_default)

name = {__MODULE__, tenant_id, %{conn: nil}}
GenServer.start_link(__MODULE__, %__MODULE__{tenant_id: tenant_id}, name: {:via, :syn, name})

state = %__MODULE__{
tenant_id: tenant_id,
check_connected_user_interval: check_connected_user_interval
}

GenServer.start_link(__MODULE__, state, name: {:via, :syn, name})
end

## GenServer callbacks
Expand All @@ -66,9 +86,9 @@ defmodule Realtime.Tenants.Connect do
case res do
{:ok, conn} ->
:syn.update_registry(__MODULE__, tenant_id, fn _pid, meta -> %{meta | conn: conn} end)
state = %{state | db_conn_reference: Process.monitor(conn)}
state = %{state | db_conn_reference: Process.monitor(conn), db_conn_pid: conn}

{:ok, state}
{:ok, state, {:continue, :setup_connected_users}}

{:error, error} ->
Logger.error("Error connecting to tenant database: #{inspect(error)}")
Expand All @@ -77,6 +97,41 @@ defmodule Realtime.Tenants.Connect do
end
end

# Continuation requested by `init/1` once the tenant database connection is up.
# It kicks off the connected-users polling loop by scheduling the first message
# through `send_connected_user_check_message/2`, using the bucket and interval
# carried in the state.
@impl GenServer
def handle_continue(:setup_connected_users, state) do
  %{check_connected_user_interval: interval, connected_users_bucket: bucket} = state

  # Only the side effect (the scheduled message) matters here; the state is
  # returned untouched.
  send_connected_user_check_message(bucket, interval)

  {:noreply, state}
end

# Periodic tick of the connected-users polling loop.
#
# Appends the tenant's current connected-user count to the sliding bucket kept
# in the state, then asks `send_connected_user_check_message/2` to schedule the
# follow-up message (another check or a shutdown, depending on the bucket).
@impl GenServer
def handle_info(
      :check_connected_users,
      %{
        tenant_id: tenant_id,
        check_connected_user_interval: check_connected_user_interval,
        connected_users_bucket: connected_users_bucket
      } = state
    ) do
  connected_users_bucket = update_connected_users_bucket(tenant_id, connected_users_bucket)

  # The new state is built from the bucket computed above, never from the
  # scheduling helper's return value, so `connected_users_bucket` is always a
  # list of counts regardless of which message was scheduled.
  send_connected_user_check_message(connected_users_bucket, check_connected_user_interval)

  {:noreply, %{state | connected_users_bucket: connected_users_bucket}}
end

# Last step of the idle-tenant flow: stops the tenant's database connection
# process and then terminates this GenServer with reason `:normal`.
#
# NOTE(review): the connected-user count is not re-read here, so a user who
# joins between the scheduling of `:shutdown` and its delivery would still have
# the connection stopped — confirm this is acceptable.
def handle_info(:shutdown, %{db_conn_pid: conn_pid} = state) do
  Logger.info("Tenant has no connected users, database connection will be terminated")

  # Asserting on `:ok` makes an unexpected result from stopping the connection
  # crash this process instead of being silently ignored.
  :ok = GenServer.stop(conn_pid)

  {:stop, :normal, state}
end

@impl GenServer
def handle_info(
{:DOWN, db_conn_reference, _, _, _},
Expand All @@ -88,10 +143,28 @@ defmodule Realtime.Tenants.Connect do

## Private functions

defp call_external_node(tenant_id) do
defp call_external_node(tenant_id, opts) do
with tenant <- Tenants.Cache.get_tenant_by_external_id(tenant_id),
{:ok, node} <- Realtime.Nodes.get_node_for_tenant(tenant) do
:erpc.call(node, __MODULE__, :connect, [tenant_id], 5000)
:erpc.call(node, __MODULE__, :connect, [tenant_id, opts], @erpc_timeout_default)
end
end

# Appends the tenant's current connected-user count (as reported by
# `UsersCounter.tenant_users/1`) to the bucket and keeps only the most recent
# samples.
#
# The window size is derived from `@connected_users_bucket_shutdown` so the
# bucket can always grow to exactly the length of the pattern that triggers a
# shutdown; the two can no longer drift apart if that pattern is resized.
defp update_connected_users_bucket(tenant_id, connected_users_bucket) do
  window = length(@connected_users_bucket_shutdown)
  current_count = UsersCounter.tenant_users(tenant_id)

  # Appending keeps the samples in chronological order; the list is bounded by
  # `window`, so the O(n) append is negligible.
  Enum.take(connected_users_bucket ++ [current_count], -window)
end

# Schedules the next message for this process after
# `check_connected_user_interval` milliseconds and returns the bucket unchanged.
#
#   * When the bucket equals `@connected_users_bucket_shutdown` (every sample in
#     the window is zero), `:shutdown` is scheduled.
#   * Otherwise `:check_connected_users` is scheduled to take another sample.
#
# Both clauses return the bucket so callers can pipe it straight into the state.
# Returning the `Process.send_after/3` timer reference from one clause would put
# a non-list into `connected_users_bucket`.
defp send_connected_user_check_message(
       @connected_users_bucket_shutdown = connected_users_bucket,
       check_connected_user_interval
     ) do
  Process.send_after(self(), :shutdown, check_connected_user_interval)
  connected_users_bucket
end

defp send_connected_user_check_message(connected_users_bucket, check_connected_user_interval) do
  Process.send_after(self(), :check_connected_users, check_connected_user_interval)
  connected_users_bucket
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.25.4",
version: "2.25.5",
elixir: "~> 1.14.0",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
119 changes: 61 additions & 58 deletions test/integration/rt_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -112,72 +112,75 @@ defmodule Realtime.Integration.RtChannelTest do
P.query!(conn, "insert into test (details) values ('test')", [])

assert_receive %Message{
event: "postgres_changes",
payload: %{
"data" => %{
"columns" => [
%{"name" => "id", "type" => "int4"},
%{"name" => "details", "type" => "text"}
],
"commit_timestamp" => _ts,
"errors" => nil,
"record" => %{"details" => "test", "id" => id},
"schema" => "public",
"table" => "test",
"type" => "INSERT"
},
"ids" => [^sub_id]
},
ref: nil,
topic: "realtime:any"
}
event: "postgres_changes",
payload: %{
"data" => %{
"columns" => [
%{"name" => "id", "type" => "int4"},
%{"name" => "details", "type" => "text"}
],
"commit_timestamp" => _ts,
"errors" => nil,
"record" => %{"details" => "test", "id" => id},
"schema" => "public",
"table" => "test",
"type" => "INSERT"
},
"ids" => [^sub_id]
},
ref: nil,
topic: "realtime:any"
},
2000

P.query!(conn, "update test set details = 'test' where id = #{id}", [])

assert_receive %Message{
event: "postgres_changes",
payload: %{
"data" => %{
"columns" => [
%{"name" => "id", "type" => "int4"},
%{"name" => "details", "type" => "text"}
],
"commit_timestamp" => _ts,
"errors" => nil,
"old_record" => %{"id" => ^id},
"record" => %{"details" => "test", "id" => ^id},
"schema" => "public",
"table" => "test",
"type" => "UPDATE"
},
"ids" => [^sub_id]
},
ref: nil,
topic: "realtime:any"
}
event: "postgres_changes",
payload: %{
"data" => %{
"columns" => [
%{"name" => "id", "type" => "int4"},
%{"name" => "details", "type" => "text"}
],
"commit_timestamp" => _ts,
"errors" => nil,
"old_record" => %{"id" => ^id},
"record" => %{"details" => "test", "id" => ^id},
"schema" => "public",
"table" => "test",
"type" => "UPDATE"
},
"ids" => [^sub_id]
},
ref: nil,
topic: "realtime:any"
},
2000

P.query!(conn, "delete from test where id = #{id}", [])

assert_receive %Message{
event: "postgres_changes",
payload: %{
"data" => %{
"columns" => [
%{"name" => "id", "type" => "int4"},
%{"name" => "details", "type" => "text"}
],
"commit_timestamp" => _ts,
"errors" => nil,
"old_record" => %{"id" => ^id},
"schema" => "public",
"table" => "test",
"type" => "DELETE"
},
"ids" => [^sub_id]
},
ref: nil,
topic: "realtime:any"
}
event: "postgres_changes",
payload: %{
"data" => %{
"columns" => [
%{"name" => "id", "type" => "int4"},
%{"name" => "details", "type" => "text"}
],
"commit_timestamp" => _ts,
"errors" => nil,
"old_record" => %{"id" => ^id},
"schema" => "public",
"table" => "test",
"type" => "DELETE"
},
"ids" => [^sub_id]
},
ref: nil,
topic: "realtime:any"
},
2000
end

test "broadcast" do
Expand Down
48 changes: 48 additions & 0 deletions test/realtime/tenants/connect_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
defmodule Realtime.Tenants.ConnectTest do
use Realtime.DataCase, async: false

alias Realtime.Tenants.Connect
alias Realtime.UsersCounter

describe "lookup_or_start_connection/1" do
setup do
Expand Down Expand Up @@ -50,5 +52,51 @@ defmodule Realtime.Tenants.ConnectTest do
test "if tenant does not exist, returns error" do
assert Connect.lookup_or_start_connection("none") == {:error, :tenant_not_found}
end

test "if no users are connected to a tenant channel, stop the connection", %{tenant: tenant} do
  tenant_id = tenant.external_id
  opts = [check_connected_user_interval: 50]

  {:ok, db_conn} = Connect.lookup_or_start_connection(tenant_id, opts)

  # With a 50ms interval only a couple of samples have been taken after 100ms,
  # which is fewer than the six consecutive zero samples required before a
  # shutdown is scheduled, so the process must still be registered.
  Process.sleep(100)
  assert {_, %{conn: _}} = :syn.lookup(Connect, tenant_id)

  # After another second the bucket has filled with zeros and the shutdown
  # message has fired: the registry entry is gone and the connection is dead.
  Process.sleep(1000)
  assert :undefined = :syn.lookup(Connect, tenant_id)
  refute Process.alive?(db_conn)
end

test "if users are connected to a tenant channel, keep the connection", %{tenant: tenant} do
  tenant_id = tenant.external_id

  # Register the test process as a connected user of the tenant.
  UsersCounter.add(self(), tenant_id)

  opts = [check_connected_user_interval: 10]
  {:ok, db_conn} = Connect.lookup_or_start_connection(tenant_id, opts)

  assert {pid, %{conn: conn_pid}} = :syn.lookup(Connect, tenant_id)

  # 300ms at a 10ms interval is far more than the six samples that would be
  # needed to trigger a shutdown if the tenant were considered idle.
  Process.sleep(300)

  # Same registered process and same connection: nothing was stopped or restarted.
  assert {^pid, %{conn: ^conn_pid}} = :syn.lookup(Connect, tenant_id)
  assert Process.alive?(db_conn)
end

test "connection is killed after user leaving", %{tenant: tenant} do
  tenant_id = tenant.external_id

  # Start with one connected user (the test process) so the connection is kept.
  UsersCounter.add(self(), tenant_id)

  opts = [check_connected_user_interval: 10]
  {:ok, db_conn} = Connect.lookup_or_start_connection(tenant_id, opts)

  assert {_pid, %{conn: _conn_pid}} = :syn.lookup(Connect, tenant_id)
  Process.sleep(300)

  # The user leaves; from now on every sample should be zero.
  :syn.leave(:users, tenant_id, self())

  # 300ms at a 10ms interval leaves ample time for six zero samples plus the
  # delayed shutdown message.
  Process.sleep(300)
  assert :undefined = :syn.lookup(Connect, tenant_id)
  refute Process.alive?(db_conn)
end
end
end

0 comments on commit e14c1c8

Please sign in to comment.