Skip to content

Commit

Permalink
fix: code cleanup on pool_size_by_application_name (#1242)
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco authored Dec 3, 2024
1 parent 4b9cfa1 commit 22c1ba8
Show file tree
Hide file tree
Showing 13 changed files with 58 additions and 70 deletions.
94 changes: 41 additions & 53 deletions lib/realtime/database.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,9 @@ defmodule Realtime.Database do
settings,
application_name,
backoff \\ :rand_exp,
decrypt \\ false,
pool \\ nil
decrypt \\ false
) do
pool = pool_size_by_application_name(application_name, settings, pool)
pool = pool_size_by_application_name(application_name, settings)

settings =
if decrypt do
Expand Down Expand Up @@ -80,67 +79,56 @@ defmodule Realtime.Database do
`realtime_migrations` will be handled as a special scenario as it requires 2 connections.
## Examples
iex> Realtime.Database.pool_size_by_application_name("realtime_connect", %{}, 1)
1
iex> Realtime.Database.pool_size_by_application_name("realtime_connect", %{})
1
iex> Realtime.Database.pool_size_by_application_name("realtime_connect", %{"db_pool" => 10}, 1)
1
iex> Realtime.Database.pool_size_by_application_name("realtime_connect", %{})
1
iex> Realtime.Database.pool_size_by_application_name("realtime_connect", %{"db_pool" => 10}, nil)
10
iex> Realtime.Database.pool_size_by_application_name("realtime_connect", %{"db_pool" => 10})
10
iex> Realtime.Database.pool_size_by_application_name("realtime_potato", %{}, nil)
1
iex> Realtime.Database.pool_size_by_application_name("realtime_potato", %{})
1
iex> Realtime.Database.pool_size_by_application_name("realtime_rls", %{"db_pool" => 10}, nil)
1
iex> Realtime.Database.pool_size_by_application_name("realtime_rls", %{"db_pool" => 10})
1
iex> Realtime.Database.pool_size_by_application_name("realtime_rls", %{"db_pool" => 10}, 10)
1
iex> Realtime.Database.pool_size_by_application_name("realtime_rls", %{"subs_pool_size" => 10})
1
iex> Realtime.Database.pool_size_by_application_name("realtime_rls", %{"db_pool" => 10})
1
iex> Realtime.Database.pool_size_by_application_name("realtime_rls", %{"subcriber_pool_size" => 10})
1
iex> Realtime.Database.pool_size_by_application_name("realtime_broadcast_changes", %{"db_pool" => 10}, nil)
1
iex> Realtime.Database.pool_size_by_application_name("realtime_broadcast_changes", %{"db_pool" => 10})
1
iex> Realtime.Database.pool_size_by_application_name("realtime_broadcast_changes", %{"db_pool" => 10}, 10)
1
iex> Realtime.Database.pool_size_by_application_name("realtime_broadcast_changes", %{"subs_pool_size" => 10})
1
iex> Realtime.Database.pool_size_by_application_name("realtime_broadcast_changes", %{"db_pool" => 10})
1
iex> Realtime.Database.pool_size_by_application_name("realtime_broadcast_changes", %{"subcriber_pool_size" => 10})
1
iex> Realtime.Database.pool_size_by_application_name("realtime_migrations", %{"db_pool" => 10}, nil)
2
iex> Realtime.Database.pool_size_by_application_name("realtime_migrations", %{"db_pool" => 10})
2
iex> Realtime.Database.pool_size_by_application_name("realtime_migrations", %{"db_pool" => 10}, 10)
2
iex> Realtime.Database.pool_size_by_application_name("realtime_migrations", %{"subs_pool_size" => 10})
2
iex> Realtime.Database.pool_size_by_application_name("realtime_migrations", %{"db_pool" => 10})
2
iex> Realtime.Database.pool_size_by_application_name("realtime_migrations", %{"subcriber_pool_size" => 10})
2
"""
@spec pool_size_by_application_name(binary(), map(), non_neg_integer() | nil) ::
non_neg_integer()
def pool_size_by_application_name(application_name, settings, override_pool \\ nil) do
pool =
case application_name do
"realtime_subscription_manager" -> settings["subcriber_pool_size"]
"realtime_subscription_manager_pub" -> settings["subs_pool_size"]
"realtime_subscription_checker" -> settings["subs_pool_size"]
"realtime_connect" -> settings["db_pool"]
"realtime_health_check" -> 1
"realtime_janitor" -> 1
_ -> 1
end

@spec pool_size_by_application_name(binary(), map() | nil) :: non_neg_integer()
def pool_size_by_application_name(application_name, settings) do
case application_name do
"realtime_rls" -> 1
"realtime_broadcast_changes" -> 1
"realtime_subscription_manager" -> settings["subcriber_pool_size"] || 1
"realtime_subscription_manager_pub" -> settings["subs_pool_size"] || 1
"realtime_subscription_checker" -> settings["subs_pool_size"] || 1
"realtime_connect" -> settings["db_pool"] || 1
"realtime_health_check" -> 1
"realtime_janitor" -> 1
"realtime_migrations" -> 2
_ -> if override_pool, do: override_pool, else: pool || 1
"realtime_broadcast_changes" -> 1
"realtime_rls" -> 1
"realtime_replication_slot_teardown" -> 1
_ -> 1
end
end

Expand Down Expand Up @@ -220,10 +208,10 @@ defmodule Realtime.Database do
end)
end

def connect(tenant, application_name, pool, backoff \\ :stop) do
def connect(tenant, application_name, backoff \\ :stop) do
tenant
|> then(&Realtime.PostgresCdc.filter_settings(@cdc, &1.extensions))
|> then(&Realtime.Database.from_settings(&1, application_name, backoff, false, pool))
|> then(&Realtime.Database.from_settings(&1, application_name, backoff, false))
|> connect_db()
end

Expand Down Expand Up @@ -286,7 +274,7 @@ defmodule Realtime.Database do
"""
@spec replication_slot_teardown(Tenant.t()) :: :ok
def replication_slot_teardown(tenant) do
{:ok, conn} = connect(tenant, "realtime_replication_slot_teardown", 1)
{:ok, conn} = connect(tenant, "realtime_replication_slot_teardown")

query =
"select active_pid from pg_replication_slots where slot_name ilike '%realtime%'"
Expand Down
2 changes: 1 addition & 1 deletion lib/realtime/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ defmodule Realtime.Tenants do

connected_cluster when is_integer(connected_cluster) ->
tenant = Cache.get_tenant_by_external_id(external_id)
{:ok, db_conn} = Database.connect(tenant, "realtime_health_check", 1)
{:ok, db_conn} = Database.connect(tenant, "realtime_health_check")
Migrations.maybe_run_migrations(db_conn, tenant)
Process.alive?(db_conn) && GenServer.stop(db_conn)

Expand Down
2 changes: 1 addition & 1 deletion lib/realtime/tenants/janitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ defmodule Realtime.Tenants.Janitor do
Logger.info("Janitor starting realtime.messages cleanup")

with %Tenant{} = tenant <- Tenants.Cache.get_tenant_by_external_id(tenant_external_id),
{:ok, conn} <- Database.connect(tenant, "realtime_janitor", 1),
{:ok, conn} <- Database.connect(tenant, "realtime_janitor"),
:ok <- Messages.delete_old_messages(conn),
:ok <- Migrations.create_partitions(conn) do
Logger.info("Janitor finished")
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.33.64",
version: "2.33.65",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
4 changes: 2 additions & 2 deletions test/integration/rt_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,7 @@ defmodule Realtime.Integration.RtChannelTest do

def rls_context(%{tenant: tenant} = context) do
{:ok, db_conn} =
Database.connect(tenant, "realtime_test", 1)
Database.connect(tenant, "realtime_test")

clean_table(db_conn, "realtime", "messages")
topic = Map.get(context, :topic, random_string())
Expand Down Expand Up @@ -1028,7 +1028,7 @@ defmodule Realtime.Integration.RtChannelTest do
Postgrex.query!(db_conn, query, [])

on_exit(fn ->
{:ok, db_conn} = Database.connect(tenant, "realtime_test", 1)
{:ok, db_conn} = Database.connect(tenant, "realtime_test")
query = "DROP TABLE #{random_name} CASCADE"
Postgrex.query!(db_conn, query, [])
Realtime.Tenants.Connect.shutdown(db_conn)
Expand Down
4 changes: 2 additions & 2 deletions test/realtime/broadcast_changes/handler_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ defmodule Realtime.BroadcastChanges.HandlerTest do
migrations = %Migrations{tenant_external_id: tenant.external_id, settings: settings}
Migrations.run_migrations(migrations)

{:ok, conn} = Database.connect(tenant, "realtime_test", 1)
{:ok, conn} = Database.connect(tenant, "realtime_test")
clean_table(conn, "realtime", "messages")

publication =
Expand Down Expand Up @@ -108,7 +108,7 @@ defmodule Realtime.BroadcastChanges.HandlerTest do
})
end

Database.connect(tenant, "realtime_test", 1)
Database.connect(tenant, "realtime_test")
Realtime.Repo.insert_all_entries(Message, messages, Message)
:timer.sleep(500)

Expand Down
4 changes: 2 additions & 2 deletions test/realtime/database_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ defmodule Realtime.DatabaseTest do
pid =
start_supervised!({Extensions.PostgresCdcStream.Replication, args}, restart: :transient)

{:ok, conn} = Database.connect(tenant, "realtime_test", 1)
{:ok, conn} = Database.connect(tenant, "realtime_test")
# Check replication slot was created
assert %{rows: [["supabase_realtime_replication_slot"]]} =
Postgrex.query!(conn, "SELECT slot_name FROM pg_replication_slots", [])
Expand All @@ -34,7 +34,7 @@ defmodule Realtime.DatabaseTest do
describe "transaction/1" do
setup do
tenant = tenant_fixture()
{:ok, db_conn} = Database.connect(tenant, "realtime_test", 1, :stop)
{:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop)
%{db_conn: db_conn}
end

Expand Down
2 changes: 1 addition & 1 deletion test/realtime/messages_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ defmodule Realtime.MessagesTest do
migrations = %Migrations{tenant_external_id: tenant.external_id, settings: settings}
Migrations.run_migrations(migrations)

{:ok, conn} = Database.connect(tenant, "realtime_test", 1)
{:ok, conn} = Database.connect(tenant, "realtime_test")
clean_table(conn, "realtime", "messages")
date_start = Date.utc_today() |> Date.add(-10)
date_end = Date.utc_today()
Expand Down
2 changes: 1 addition & 1 deletion test/realtime/repo_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule Realtime.RepoTest do

setup do
tenant = tenant_fixture()
{:ok, db_conn} = Database.connect(tenant, "realtime_test", 1)
{:ok, db_conn} = Database.connect(tenant, "realtime_test")

[%{settings: settings} | _] = tenant.extensions
migrations = %Migrations{tenant_external_id: tenant.external_id, settings: settings}
Expand Down
4 changes: 2 additions & 2 deletions test/realtime/tenants/authorization_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ defmodule Realtime.Tenants.AuthorizationTest do
context.authorization_context
)

{:ok, db_conn} = Database.connect(context.tenant, "realtime_test", 1)
{:ok, db_conn} = Database.connect(context.tenant, "realtime_test")
assert {:ok, []} = Repo.all(db_conn, Message, Message)
end
end
Expand All @@ -163,7 +163,7 @@ defmodule Realtime.Tenants.AuthorizationTest do
migrations = %Migrations{tenant_external_id: tenant.external_id, settings: settings}
Migrations.run_migrations(migrations)

{:ok, db_conn} = Database.connect(tenant, "realtime_test", 1)
{:ok, db_conn} = Database.connect(tenant, "realtime_test")

clean_table(db_conn, "realtime", "messages")
topic = random_string()
Expand Down
4 changes: 2 additions & 2 deletions test/realtime/tenants/janitor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ defmodule Realtime.Tenants.JanitorTest do
tenant = Repo.preload(tenant, :extensions)
Connect.lookup_or_start_connection(tenant.external_id)
:timer.sleep(250)
{:ok, conn} = Database.connect(tenant, "realtime_test", 1)
{:ok, conn} = Database.connect(tenant, "realtime_test")
clean_table(conn, "realtime", "messages")
tenant
end
Expand Down Expand Up @@ -80,7 +80,7 @@ defmodule Realtime.Tenants.JanitorTest do

current =
Enum.map(tenants, fn tenant ->
{:ok, conn} = Database.connect(tenant, "realtime_test", 1)
{:ok, conn} = Database.connect(tenant, "realtime_test")
{:ok, res} = Repo.all(conn, from(m in Message), Message)
res
end)
Expand Down
2 changes: 1 addition & 1 deletion test/realtime/tenants/migrations_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ defmodule Realtime.Tenants.MigrationsTest do

test "migrations for a given tenant only run once", %{tenant: tenant} do
%{extensions: [%{settings: settings}]} = tenant
{:ok, conn} = Database.connect(tenant, "realtime_test", 1)
{:ok, conn} = Database.connect(tenant, "realtime_test")

Postgrex.query!(conn, "DROP SCHEMA realtime CASCADE", [])
Postgrex.query!(conn, "CREATE SCHEMA realtime", [])
Expand Down
2 changes: 1 addition & 1 deletion test/support/generators.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ defmodule Generators do
end

def message_fixture(tenant, override \\ %{}) do
{:ok, db_conn} = Database.connect(tenant, "realtime_test", 1)
{:ok, db_conn} = Database.connect(tenant, "realtime_test")
Realtime.Tenants.Migrations.create_partitions(db_conn)

create_attrs = %{
Expand Down

0 comments on commit 22c1ba8

Please sign in to comment.