diff --git a/lib/realtime/database.ex b/lib/realtime/database.ex index 44523c9ba..9fcf2d15e 100644 --- a/lib/realtime/database.ex +++ b/lib/realtime/database.ex @@ -43,10 +43,9 @@ defmodule Realtime.Database do settings, application_name, backoff \\ :rand_exp, - decrypt \\ false, - pool \\ nil + decrypt \\ false ) do - pool = pool_size_by_application_name(application_name, settings, pool) + pool = pool_size_by_application_name(application_name, settings) settings = if decrypt do @@ -80,67 +79,56 @@ defmodule Realtime.Database do `realtime_migrations` will be handled as a special scenario as it requires 2 connections. ## Examples - iex> Realtime.Database.pool_size_by_application_name("realtime_connect", %{}, 1) - 1 - - iex> Realtime.Database.pool_size_by_application_name("realtime_connect", %{}) - 1 - - iex> Realtime.Database.pool_size_by_application_name("realtime_connect", %{"db_pool" => 10}, 1) - 1 + iex> Realtime.Database.pool_size_by_application_name("realtime_connect", %{}) + 1 - iex> Realtime.Database.pool_size_by_application_name("realtime_connect", %{"db_pool" => 10}, nil) - 10 + iex> Realtime.Database.pool_size_by_application_name("realtime_connect", %{"db_pool" => 10}) + 10 - iex> Realtime.Database.pool_size_by_application_name("realtime_potato", %{}, nil) - 1 + iex> Realtime.Database.pool_size_by_application_name("realtime_potato", %{}) + 1 - iex> Realtime.Database.pool_size_by_application_name("realtime_rls", %{"db_pool" => 10}, nil) - 1 + iex> Realtime.Database.pool_size_by_application_name("realtime_rls", %{"db_pool" => 10}) + 1 - iex> Realtime.Database.pool_size_by_application_name("realtime_rls", %{"db_pool" => 10}, 10) - 1 + iex> Realtime.Database.pool_size_by_application_name("realtime_rls", %{"subs_pool_size" => 10}) + 1 - iex> Realtime.Database.pool_size_by_application_name("realtime_rls", %{"db_pool" => 10}) - 1 + iex> Realtime.Database.pool_size_by_application_name("realtime_rls", %{"subcriber_pool_size" => 10}) + 1 - iex> Realtime.Database.pool_size_by_application_name("realtime_broadcast_changes", %{"db_pool" => 10}, nil) - 1 + iex> Realtime.Database.pool_size_by_application_name("realtime_broadcast_changes", %{"db_pool" => 10}) + 1 - iex> Realtime.Database.pool_size_by_application_name("realtime_broadcast_changes", %{"db_pool" => 10}, 10) - 1 + iex> Realtime.Database.pool_size_by_application_name("realtime_broadcast_changes", %{"subs_pool_size" => 10}) + 1 - iex> Realtime.Database.pool_size_by_application_name("realtime_broadcast_changes", %{"db_pool" => 10}) - 1 + iex> Realtime.Database.pool_size_by_application_name("realtime_broadcast_changes", %{"subcriber_pool_size" => 10}) + 1 - iex> Realtime.Database.pool_size_by_application_name("realtime_migrations", %{"db_pool" => 10}, nil) - 2 + iex> Realtime.Database.pool_size_by_application_name("realtime_migrations", %{"db_pool" => 10}) + 2 - iex> Realtime.Database.pool_size_by_application_name("realtime_migrations", %{"db_pool" => 10}, 10) - 2 + iex> Realtime.Database.pool_size_by_application_name("realtime_migrations", %{"subs_pool_size" => 10}) + 2 - iex> Realtime.Database.pool_size_by_application_name("realtime_migrations", %{"db_pool" => 10}) - 2 + iex> Realtime.Database.pool_size_by_application_name("realtime_migrations", %{"subcriber_pool_size" => 10}) + 2 """ - @spec pool_size_by_application_name(binary(), map(), non_neg_integer() | nil) :: - non_neg_integer() - def pool_size_by_application_name(application_name, settings, override_pool \\ nil) do - pool = - case application_name do - "realtime_subscription_manager" -> settings["subcriber_pool_size"] - "realtime_subscription_manager_pub" -> settings["subs_pool_size"] - "realtime_subscription_checker" -> settings["subs_pool_size"] - "realtime_connect" -> settings["db_pool"] - "realtime_health_check" -> 1 - "realtime_janitor" -> 1 - _ -> 1 - end - + @spec pool_size_by_application_name(binary(), map() | nil) :: non_neg_integer() + def pool_size_by_application_name(application_name, settings) do case application_name do - "realtime_rls" -> 1 - "realtime_broadcast_changes" -> 1 + "realtime_subscription_manager" -> settings["subcriber_pool_size"] || 1 + "realtime_subscription_manager_pub" -> settings["subs_pool_size"] || 1 + "realtime_subscription_checker" -> settings["subs_pool_size"] || 1 + "realtime_connect" -> settings["db_pool"] || 1 + "realtime_health_check" -> 1 + "realtime_janitor" -> 1 "realtime_migrations" -> 2 - _ -> if override_pool, do: override_pool, else: pool || 1 + "realtime_broadcast_changes" -> 1 + "realtime_rls" -> 1 + "realtime_replication_slot_teardown" -> 1 + _ -> 1 end end @@ -220,10 +208,10 @@ defmodule Realtime.Database do end) end - def connect(tenant, application_name, pool, backoff \\ :stop) do + def connect(tenant, application_name, backoff \\ :stop) do tenant |> then(&Realtime.PostgresCdc.filter_settings(@cdc, &1.extensions)) - |> then(&Realtime.Database.from_settings(&1, application_name, backoff, false, pool)) + |> then(&Realtime.Database.from_settings(&1, application_name, backoff, false)) |> connect_db() end @@ -286,7 +274,7 @@ defmodule Realtime.Database do """ @spec replication_slot_teardown(Tenant.t()) :: :ok def replication_slot_teardown(tenant) do - {:ok, conn} = connect(tenant, "realtime_replication_slot_teardown", 1) + {:ok, conn} = connect(tenant, "realtime_replication_slot_teardown") query = "select active_pid from pg_replication_slots where slot_name ilike '%realtime%'" diff --git a/lib/realtime/tenants.ex b/lib/realtime/tenants.ex index 20814a9c3..5be3e844a 100644 --- a/lib/realtime/tenants.ex +++ b/lib/realtime/tenants.ex @@ -105,7 +105,7 @@ defmodule Realtime.Tenants do connected_cluster when is_integer(connected_cluster) -> tenant = Cache.get_tenant_by_external_id(external_id) - {:ok, db_conn} = Database.connect(tenant, "realtime_health_check", 1) + {:ok, db_conn} = Database.connect(tenant, "realtime_health_check") Migrations.maybe_run_migrations(db_conn, tenant) Process.alive?(db_conn) && GenServer.stop(db_conn) diff --git a/lib/realtime/tenants/janitor.ex b/lib/realtime/tenants/janitor.ex index c5a649fa5..a2e57854b 100644 --- a/lib/realtime/tenants/janitor.ex +++ b/lib/realtime/tenants/janitor.ex @@ -120,7 +120,7 @@ defmodule Realtime.Tenants.Janitor do Logger.info("Janitor starting realtime.messages cleanup") with %Tenant{} = tenant <- Tenants.Cache.get_tenant_by_external_id(tenant_external_id), - {:ok, conn} <- Database.connect(tenant, "realtime_janitor", 1), + {:ok, conn} <- Database.connect(tenant, "realtime_janitor"), :ok <- Messages.delete_old_messages(conn), :ok <- Migrations.create_partitions(conn) do Logger.info("Janitor finished") diff --git a/mix.exs b/mix.exs index b647ad16f..2b2eaa9ae 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.33.64", + version: "2.33.65", elixir: "~> 1.17.3", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/test/integration/rt_channel_test.exs b/test/integration/rt_channel_test.exs index e52fc7ad0..e59bba653 100644 --- a/test/integration/rt_channel_test.exs +++ b/test/integration/rt_channel_test.exs @@ -977,7 +977,7 @@ defmodule Realtime.Integration.RtChannelTest do def rls_context(%{tenant: tenant} = context) do {:ok, db_conn} = - Database.connect(tenant, "realtime_test", 1) + Database.connect(tenant, "realtime_test") clean_table(db_conn, "realtime", "messages") topic = Map.get(context, :topic, random_string()) @@ -1028,7 +1028,7 @@ defmodule Realtime.Integration.RtChannelTest do Postgrex.query!(db_conn, query, []) on_exit(fn -> - {:ok, db_conn} = Database.connect(tenant, "realtime_test", 1) + {:ok, db_conn} = Database.connect(tenant, "realtime_test") query = "DROP TABLE #{random_name} CASCADE" Postgrex.query!(db_conn, query, []) Realtime.Tenants.Connect.shutdown(db_conn) diff --git a/test/realtime/broadcast_changes/handler_test.exs b/test/realtime/broadcast_changes/handler_test.exs index d201e8622..3d8e88aaa 100644 --- a/test/realtime/broadcast_changes/handler_test.exs +++ b/test/realtime/broadcast_changes/handler_test.exs @@ -21,7 +21,7 @@ defmodule Realtime.BroadcastChanges.HandlerTest do migrations = %Migrations{tenant_external_id: tenant.external_id, settings: settings} Migrations.run_migrations(migrations) - {:ok, conn} = Database.connect(tenant, "realtime_test", 1) + {:ok, conn} = Database.connect(tenant, "realtime_test") clean_table(conn, "realtime", "messages") publication = @@ -108,7 +108,7 @@ defmodule Realtime.BroadcastChanges.HandlerTest do }) end - Database.connect(tenant, "realtime_test", 1) + Database.connect(tenant, "realtime_test") Realtime.Repo.insert_all_entries(Message, messages, Message) :timer.sleep(500) diff --git a/test/realtime/database_test.exs b/test/realtime/database_test.exs index 15ac90a86..eaed2633c 100644 --- a/test/realtime/database_test.exs +++ b/test/realtime/database_test.exs @@ -19,7 +19,7 @@ defmodule Realtime.DatabaseTest do pid = start_supervised!({Extensions.PostgresCdcStream.Replication, args}, restart: :transient) - {:ok, conn} = Database.connect(tenant, "realtime_test", 1) + {:ok, conn} = Database.connect(tenant, "realtime_test") # Check replication slot was created assert %{rows: [["supabase_realtime_replication_slot"]]} = Postgrex.query!(conn, "SELECT slot_name FROM pg_replication_slots", []) @@ -34,7 +34,7 @@ defmodule Realtime.DatabaseTest do describe "transaction/1" do setup do tenant = tenant_fixture() - {:ok, db_conn} = Database.connect(tenant, "realtime_test", 1, :stop) + {:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop) %{db_conn: db_conn} end diff --git a/test/realtime/messages_test.exs b/test/realtime/messages_test.exs index cb1d7c488..d39133613 100644 --- a/test/realtime/messages_test.exs +++ b/test/realtime/messages_test.exs @@ -13,7 +13,7 @@ defmodule Realtime.MessagesTest do migrations = %Migrations{tenant_external_id: tenant.external_id, settings: settings} Migrations.run_migrations(migrations) - {:ok, conn} = Database.connect(tenant, "realtime_test", 1) + {:ok, conn} = Database.connect(tenant, "realtime_test") clean_table(conn, "realtime", "messages") date_start = Date.utc_today() |> Date.add(-10) date_end = Date.utc_today() diff --git a/test/realtime/repo_test.exs b/test/realtime/repo_test.exs index b468131c9..bead8ed9c 100644 --- a/test/realtime/repo_test.exs +++ b/test/realtime/repo_test.exs @@ -12,7 +12,7 @@ defmodule Realtime.RepoTest do setup do tenant = tenant_fixture() - {:ok, db_conn} = Database.connect(tenant, "realtime_test", 1) + {:ok, db_conn} = Database.connect(tenant, "realtime_test") [%{settings: settings} | _] = tenant.extensions migrations = %Migrations{tenant_external_id: tenant.external_id, settings: settings} diff --git a/test/realtime/tenants/authorization_test.exs b/test/realtime/tenants/authorization_test.exs index 12d589615..0e80dffd6 100644 --- a/test/realtime/tenants/authorization_test.exs +++ b/test/realtime/tenants/authorization_test.exs @@ -150,7 +150,7 @@ defmodule Realtime.Tenants.AuthorizationTest do context.authorization_context ) - {:ok, db_conn} = Database.connect(context.tenant, "realtime_test", 1) + {:ok, db_conn} = Database.connect(context.tenant, "realtime_test") assert {:ok, []} = Repo.all(db_conn, Message, Message) end end @@ -163,7 +163,7 @@ defmodule Realtime.Tenants.AuthorizationTest do migrations = %Migrations{tenant_external_id: tenant.external_id, settings: settings} Migrations.run_migrations(migrations) - {:ok, db_conn} = Database.connect(tenant, "realtime_test", 1) + {:ok, db_conn} = Database.connect(tenant, "realtime_test") clean_table(db_conn, "realtime", "messages") topic = random_string() diff --git a/test/realtime/tenants/janitor_test.exs b/test/realtime/tenants/janitor_test.exs index 0023bea59..c64f70b5d 100644 --- a/test/realtime/tenants/janitor_test.exs +++ b/test/realtime/tenants/janitor_test.exs @@ -31,7 +31,7 @@ defmodule Realtime.Tenants.JanitorTest do tenant = Repo.preload(tenant, :extensions) Connect.lookup_or_start_connection(tenant.external_id) :timer.sleep(250) - {:ok, conn} = Database.connect(tenant, "realtime_test", 1) + {:ok, conn} = Database.connect(tenant, "realtime_test") clean_table(conn, "realtime", "messages") tenant end @@ -80,7 +80,7 @@ defmodule Realtime.Tenants.JanitorTest do current = Enum.map(tenants, fn tenant -> - {:ok, conn} = Database.connect(tenant, "realtime_test", 1) + {:ok, conn} = Database.connect(tenant, "realtime_test") {:ok, res} = Repo.all(conn, from(m in Message), Message) res end) diff --git a/test/realtime/tenants/migrations_test.exs b/test/realtime/tenants/migrations_test.exs index aefe16c21..681d3a144 100644 --- a/test/realtime/tenants/migrations_test.exs +++ b/test/realtime/tenants/migrations_test.exs @@ -13,7 +13,7 @@ defmodule Realtime.Tenants.MigrationsTest do test "migrations for a given tenant only run once", %{tenant: tenant} do %{extensions: [%{settings: settings}]} = tenant - {:ok, conn} = Database.connect(tenant, "realtime_test", 1) + {:ok, conn} = Database.connect(tenant, "realtime_test") Postgrex.query!(conn, "DROP SCHEMA realtime CASCADE", []) Postgrex.query!(conn, "CREATE SCHEMA realtime", []) diff --git a/test/support/generators.ex b/test/support/generators.ex index 4b06cd1ba..adb0e97ca 100644 --- a/test/support/generators.ex +++ b/test/support/generators.ex @@ -44,7 +44,7 @@ defmodule Generators do end def message_fixture(tenant, override \\ %{}) do - {:ok, db_conn} = Database.connect(tenant, "realtime_test", 1) + {:ok, db_conn} = Database.connect(tenant, "realtime_test") Realtime.Tenants.Migrations.create_partitions(db_conn) create_attrs = %{